Skip to content

Commit f29e544

Browse files
Hakon-Buggevijay-suman
authored andcommitted
rds: ib: Simplify ib_ring and use atomic ops
The original struct rds_ib_work_ring was implemented using double set of counters / pointers for the consumer and producer of the ring. It is complicated and error prone, due to the compilers ability to re-order execution of the statements and moderns CPUs ability to re-order execution. Hence, use only a single set of free-running counters and use only atomic ops, which do not rely on explicit memory barriers. Without this commit, we will encounter the following BUG_ON() running uek7/ga (v5.15.0-0.30.9) on ARM: kernel BUG at net/rds/ib_send.c:886! Internal error: Oops - BUG: 0 [#1] SMP [] rds_ib_xmit+0xa1c/0xb70 [rds_rdma] rds_send_xmit+0x4a0/0xd50 [rds] rds_sendmsg+0xe7c/0xfa0 [rds] sock_sendmsg+0x74/0x80 ____sys_sendmsg+0x270/0x29c ___sys_sendmsg+0xb0/0x118 __sys_sendmsg+0x90/0x104 __arm64_sys_sendmsg+0x30/0x50 invoke_syscall+0x50/0x15c The bug may also manifest itself as: kernel: RDS/IB: ib_post_send to ::ffff:192.168.0.4 returned -22 or as: kernel: RDS/IB: rds_ib_send_unmap_op: unexpected opcode 0xdead in WR! Orabug: 34137460 Reviewed-by: Konrad Rzeszutek Wilk <konrad.wilk@oracle.com> Fixes: f528efe ("RDS/IB: Ring-handling code.") Tested-by: Gerald Gibson <gerald.gibson@oracle.com> Signed-off-by: Håkon Bugge <haakon.bugge@oracle.com>
1 parent 9ffbe20 commit f29e544

File tree

3 files changed

+35
-47
lines changed

3 files changed

+35
-47
lines changed

net/rds/ib.c

+9-13
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2006, 2021 Oracle and/or its affiliates.
2+
* Copyright (c) 2006, 2022 Oracle and/or its affiliates.
33
*
44
* This software is available to you under a choice of one of two
55
* licenses. You may choose to be licensed under the terms of the GNU
@@ -904,18 +904,16 @@ static int rds_ib_conn_info_visitor(struct rds_connection *conn,
904904
iinfo->max_send_sge = rds_ibdev->max_sge;
905905
iinfo->qp_num = ic->i_qp_num;
906906
iinfo->dst_qp_num = ic->i_dst_qp_num;
907-
iinfo->recv_alloc_ctr = ic->i_recv_ring.w_alloc_ctr;
908-
iinfo->recv_free_ctr =
909-
(uint32_t)atomic_read(&ic->i_recv_ring.w_free_ctr);
907+
iinfo->recv_alloc_ctr = (uint32_t)atomic64_read(&ic->i_recv_ring.w_alloc_ctr);
908+
iinfo->recv_free_ctr = (uint32_t)atomic64_read(&ic->i_recv_ring.w_free_ctr);
910909
iinfo->flow_ctl_post_credit =
911910
IB_GET_POST_CREDITS(atomic_read(&ic->i_credits));
912911
iinfo->flow_ctl_send_credit =
913912
IB_GET_SEND_CREDITS(atomic_read(&ic->i_credits));
914913
rds_ib_get_mr_info(rds_ibdev, iinfo);
915914
iinfo->cache_allocs = atomic_read(&ic->i_cache_info.ci_cache_allocs);
916-
iinfo->send_alloc_ctr = ic->i_send_ring.w_alloc_ctr;
917-
iinfo->send_free_ctr =
918-
(uint32_t)atomic_read(&ic->i_send_ring.w_free_ctr);
915+
iinfo->send_alloc_ctr = (uint32_t)atomic64_read(&ic->i_send_ring.w_alloc_ctr);
916+
iinfo->send_free_ctr = (uint32_t)atomic64_read(&ic->i_send_ring.w_free_ctr);
919917
iinfo->send_bytes =
920918
(uint64_t)atomic64_read(&conn->c_send_bytes);
921919
iinfo->recv_bytes =
@@ -978,18 +976,16 @@ static int rds6_ib_conn_info_visitor(struct rds_connection *conn,
978976
iinfo6->max_send_sge = rds_ibdev->max_sge;
979977
iinfo6->qp_num = ic->i_qp_num;
980978
iinfo6->dst_qp_num = ic->i_dst_qp_num;
981-
iinfo6->recv_alloc_ctr = ic->i_recv_ring.w_alloc_ctr;
982-
iinfo6->recv_free_ctr =
983-
(uint32_t)atomic_read(&ic->i_recv_ring.w_free_ctr);
979+
iinfo6->recv_alloc_ctr = (uint32_t)atomic64_read(&ic->i_recv_ring.w_alloc_ctr);
980+
iinfo6->recv_free_ctr = (uint32_t)atomic64_read(&ic->i_recv_ring.w_free_ctr);
984981
iinfo6->flow_ctl_post_credit =
985982
IB_GET_POST_CREDITS(atomic_read(&ic->i_credits));
986983
iinfo6->flow_ctl_send_credit =
987984
IB_GET_SEND_CREDITS(atomic_read(&ic->i_credits));
988985
rds6_ib_get_mr_info(rds_ibdev, iinfo6);
989986
iinfo6->cache_allocs = atomic_read(&ic->i_cache_info.ci_cache_allocs);
990-
iinfo6->send_alloc_ctr = ic->i_send_ring.w_alloc_ctr;
991-
iinfo6->send_free_ctr =
992-
(uint32_t)atomic_read(&ic->i_send_ring.w_free_ctr);
987+
iinfo6->send_alloc_ctr = (uint32_t)atomic64_read(&ic->i_send_ring.w_alloc_ctr);
988+
iinfo6->send_free_ctr = (uint32_t)atomic64_read(&ic->i_send_ring.w_free_ctr);
993989
iinfo6->send_bytes =
994990
(uint64_t)atomic64_read(&conn->c_send_bytes);
995991
iinfo6->recv_bytes =

net/rds/ib.h

+2-4
Original file line numberDiff line numberDiff line change
@@ -201,10 +201,8 @@ struct rds_ib_recv_work {
201201

202202
struct rds_ib_work_ring {
203203
u32 w_nr;
204-
u32 w_alloc_ptr;
205-
u32 w_alloc_ctr;
206-
u32 w_free_ptr;
207-
atomic_t w_free_ctr;
204+
atomic64_t w_alloc_ctr;
205+
atomic64_t w_free_ctr;
208206
};
209207

210208
/*

net/rds/ib_ring.c

+24-30
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2006 Oracle. All rights reserved.
2+
* Copyright (c) 2006, 2022, Oracle and/or its affiliates.
33
*
44
* This software is available to you under a choice of one of two
55
* licenses. You may choose to be licensed under the terms of the GNU
@@ -30,6 +30,7 @@
3030
* SOFTWARE.
3131
*
3232
*/
33+
#include <linux/atomic.h>
3334
#include <linux/kernel.h>
3435

3536
#include "rds.h"
@@ -44,18 +45,17 @@
4445
* Freeing always happens in an interrupt, and hence only
4546
* races with allocations, but not with other free()s.
4647
*
47-
* The interaction between allocation and freeing is that
48-
* the alloc code has to determine the number of free entries.
49-
* To this end, we maintain two counters; an allocation counter
50-
* and a free counter. Both are allowed to run freely, and wrap
51-
* around.
48+
* The interaction between allocation and freeing is that the alloc
49+
* code has to determine the number of free entries. To this end, we
50+
* maintain two free-running counters; an allocation counter and a
51+
* free counter.
52+
*
5253
* The number of used entries is always (alloc_ctr - free_ctr) % NR.
5354
*
54-
* The current implementation makes free_ctr atomic. When the
55-
* caller finds an allocation fails, it should set an "alloc fail"
56-
* bit and retry the allocation. The "alloc fail" bit essentially tells
57-
* the CQ completion handlers to wake it up after freeing some
58-
* more entries.
55+
* When the caller finds an allocation fails, it should set an "alloc
56+
* fail" bit and retry the allocation. The "alloc fail" bit
57+
* essentially tells the CQ completion handlers to wake it up after
58+
* freeing some more entries.
5959
*/
6060

6161
/*
@@ -72,12 +72,7 @@ void rds_ib_ring_init(struct rds_ib_work_ring *ring, u32 nr)
7272

7373
static inline u32 __rds_ib_ring_used(struct rds_ib_work_ring *ring)
7474
{
75-
u32 diff;
76-
77-
/* This assumes that atomic_t has at least as many bits as u32 */
78-
diff = ring->w_alloc_ctr - (u32) atomic_read(&ring->w_free_ctr);
79-
80-
return diff;
75+
return (u64)atomic64_read(&ring->w_alloc_ctr) - (u64)atomic64_read(&ring->w_free_ctr);
8176
}
8277

8378
void rds_ib_ring_resize(struct rds_ib_work_ring *ring, u32 nr)
@@ -99,31 +94,30 @@ u32 rds_ib_ring_alloc(struct rds_ib_work_ring *ring, u32 val, u32 *pos)
9994

10095
avail = ring->w_nr - __rds_ib_ring_used(ring);
10196

102-
rdsdebug("ring %p val %u next %u free %u\n", ring, val,
103-
ring->w_alloc_ptr, avail);
104-
10597
if (val && avail) {
106-
ret = min(val, avail);
107-
*pos = ring->w_alloc_ptr;
98+
u64 new_ctr;
10899

109-
ring->w_alloc_ptr = (ring->w_alloc_ptr + ret) % ring->w_nr;
110-
ring->w_alloc_ctr += ret;
100+
ret = min(val, avail);
101+
new_ctr = (u64)atomic64_add_return(ret, &ring->w_alloc_ctr);
102+
*pos = (new_ctr - ret) % ring->w_nr;
111103
}
112104

113105
return ret;
114106
}
115107

116108
void rds_ib_ring_free(struct rds_ib_work_ring *ring, u32 val)
117109
{
118-
ring->w_free_ptr = (ring->w_free_ptr + val) % ring->w_nr;
119-
atomic_add(val, &ring->w_free_ctr);
120-
smp_mb();
110+
smp_mb__before_atomic();
111+
atomic64_add(val, &ring->w_free_ctr);
112+
smp_mb__after_atomic();
113+
121114
}
122115

123116
void rds_ib_ring_unalloc(struct rds_ib_work_ring *ring, u32 val)
124117
{
125-
ring->w_alloc_ptr = (ring->w_alloc_ptr + ring->w_nr - val) % ring->w_nr;
126-
ring->w_alloc_ctr -= val;
118+
smp_mb__before_atomic();
119+
atomic64_sub(val, &ring->w_alloc_ctr);
120+
smp_mb__after_atomic();
127121
}
128122

129123
int rds_ib_ring_empty(struct rds_ib_work_ring *ring)
@@ -142,7 +136,7 @@ int rds_ib_ring_low(struct rds_ib_work_ring *ring)
142136
*/
143137
u32 rds_ib_ring_oldest(struct rds_ib_work_ring *ring)
144138
{
145-
return ring->w_free_ptr;
139+
return (u64)atomic64_read(&ring->w_free_ctr) % ring->w_nr;
146140
}
147141

148142
/*

0 commit comments

Comments
 (0)