diff options
Diffstat (limited to 'net/sunrpc/xprtrdma/verbs.c')
-rw-r--r-- | net/sunrpc/xprtrdma/verbs.c | 291 |
1 files changed, 112 insertions, 179 deletions
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index 7f913ece5038..16161a36dc73 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -1,3 +1,4 @@ +// SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause /* * Copyright (c) 2014-2017 Oracle. All rights reserved. * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved. @@ -72,8 +73,10 @@ /* * internal functions */ +static void rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc); static void rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt); static void rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf); +static int rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt, bool temp); static void rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb); struct workqueue_struct *rpcrdma_receive_wq __read_mostly; @@ -160,7 +163,7 @@ rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc) rr_cqe); /* WARNING: Only wr_id and status are reliable at this point */ - trace_xprtrdma_wc_receive(rep, wc); + trace_xprtrdma_wc_receive(wc); if (wc->status != IB_WC_SUCCESS) goto out_fail; @@ -232,7 +235,7 @@ rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event) complete(&ia->ri_done); break; case RDMA_CM_EVENT_ADDR_ERROR: - ia->ri_async_rc = -EHOSTUNREACH; + ia->ri_async_rc = -EPROTO; complete(&ia->ri_done); break; case RDMA_CM_EVENT_ROUTE_ERROR: @@ -263,7 +266,7 @@ rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event) connstate = -ENOTCONN; goto connected; case RDMA_CM_EVENT_UNREACHABLE: - connstate = -ENETDOWN; + connstate = -ENETUNREACH; goto connected; case RDMA_CM_EVENT_REJECTED: dprintk("rpcrdma: connection to %s:%s rejected: %s\n", @@ -306,8 +309,8 @@ rpcrdma_create_id(struct rpcrdma_xprt *xprt, struct rpcrdma_ia *ia) init_completion(&ia->ri_done); init_completion(&ia->ri_remove_done); - id = rdma_create_id(&init_net, rpcrdma_conn_upcall, xprt, RDMA_PS_TCP, - IB_QPT_RC); + id = rdma_create_id(xprt->rx_xprt.xprt_net, rpcrdma_conn_upcall, + xprt, RDMA_PS_TCP, IB_QPT_RC); if (IS_ERR(id)) { rc = PTR_ERR(id); dprintk("RPC: %s: rdma_create_id() failed %i\n", @@ -501,8 +504,8 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, struct rpcrdma_create_data_internal *cdata) { struct rpcrdma_connect_private *pmsg = &ep->rep_cm_private; - unsigned int max_qp_wr, max_sge; struct ib_cq *sendcq, *recvcq; + unsigned int max_sge; int rc; max_sge = min_t(unsigned int, ia->ri_device->attrs.max_sge, @@ -513,29 +516,13 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, } ia->ri_max_send_sges = max_sge; - if (ia->ri_device->attrs.max_qp_wr <= RPCRDMA_BACKWARD_WRS) { - dprintk("RPC: %s: insufficient wqe's available\n", - __func__); - return -ENOMEM; - } - max_qp_wr = ia->ri_device->attrs.max_qp_wr - RPCRDMA_BACKWARD_WRS - 1; - - /* check provider's send/recv wr limits */ - if (cdata->max_requests > max_qp_wr) - cdata->max_requests = max_qp_wr; + rc = ia->ri_ops->ro_open(ia, ep, cdata); + if (rc) + return rc; ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall; ep->rep_attr.qp_context = ep; ep->rep_attr.srq = NULL; - ep->rep_attr.cap.max_send_wr = cdata->max_requests; - ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS; - ep->rep_attr.cap.max_send_wr += 1; /* drain cqe */ - rc = ia->ri_ops->ro_open(ia, ep, cdata); - if (rc) - return rc; - ep->rep_attr.cap.max_recv_wr = cdata->max_requests; - ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS; - ep->rep_attr.cap.max_recv_wr += 1; /* drain cqe */ ep->rep_attr.cap.max_send_sge = max_sge; ep->rep_attr.cap.max_recv_sge = 1; ep->rep_attr.cap.max_inline_data = 0; @@ -742,7 +729,6 @@ rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) { struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt, rx_ia); - unsigned int extras; int rc; retry: @@ -786,9 +772,8 @@ retry: } dprintk("RPC: %s: connected\n", __func__); - extras = r_xprt->rx_buf.rb_bc_srv_max_requests; - if (extras) - rpcrdma_ep_post_extra_recv(r_xprt, extras); + + rpcrdma_post_recvs(r_xprt, true); out: if (rc) @@ -894,6 +879,7 @@ static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt) sc->sc_xprt = r_xprt; buf->rb_sc_ctxs[i] = sc; } + buf->rb_flags = 0; return 0; @@ -951,7 +937,7 @@ out_emptyq: * completions recently. This is a sign the Send Queue is * backing up. Cause the caller to pause and try again. */ - dprintk("RPC: %s: empty sendctx queue\n", __func__); + set_bit(RPCRDMA_BUF_F_EMPTY_SCQ, &buf->rb_flags); r_xprt = container_of(buf, struct rpcrdma_xprt, rx_buf); r_xprt->rx_stats.empty_sendctx_q++; return NULL; @@ -966,7 +952,8 @@ out_emptyq: * * The caller serializes calls to this function (per rpcrdma_buffer). */ -void rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc) +static void +rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc) { struct rpcrdma_buffer *buf = &sc->sc_xprt->rx_buf; unsigned long next_tail; @@ -985,6 +972,11 @@ void rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc) /* Paired with READ_ONCE */ smp_store_release(&buf->rb_sc_tail, next_tail); + + if (test_and_clear_bit(RPCRDMA_BUF_F_EMPTY_SCQ, &buf->rb_flags)) { + smp_mb__after_atomic(); + xprt_write_space(&sc->sc_xprt->rx_xprt); + } } static void @@ -1098,14 +1090,8 @@ rpcrdma_create_req(struct rpcrdma_xprt *r_xprt) return req; } -/** - * rpcrdma_create_rep - Allocate an rpcrdma_rep object - * @r_xprt: controlling transport - * - * Returns 0 on success or a negative errno on failure. - */ -int -rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt) +static int +rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt, bool temp) { struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data; struct rpcrdma_buffer *buf = &r_xprt->rx_buf; @@ -1133,6 +1119,7 @@ rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt) rep->rr_recv_wr.wr_cqe = &rep->rr_cqe; rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov; rep->rr_recv_wr.num_sge = 1; + rep->rr_temp = temp; spin_lock(&buf->rb_lock); list_add(&rep->rr_list, &buf->rb_recv_bufs); @@ -1184,12 +1171,8 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt) list_add(&req->rl_list, &buf->rb_send_bufs); } + buf->rb_posted_receives = 0; INIT_LIST_HEAD(&buf->rb_recv_bufs); - for (i = 0; i <= buf->rb_max_requests; i++) { - rc = rpcrdma_create_rep(r_xprt); - if (rc) - goto out; - } rc = rpcrdma_sendctxs_create(r_xprt); if (rc) @@ -1201,28 +1184,6 @@ out: return rc; } -static struct rpcrdma_req * -rpcrdma_buffer_get_req_locked(struct rpcrdma_buffer *buf) -{ - struct rpcrdma_req *req; - - req = list_first_entry(&buf->rb_send_bufs, - struct rpcrdma_req, rl_list); - list_del_init(&req->rl_list); - return req; -} - -static struct rpcrdma_rep * -rpcrdma_buffer_get_rep_locked(struct rpcrdma_buffer *buf) -{ - struct rpcrdma_rep *rep; - - rep = list_first_entry(&buf->rb_recv_bufs, - struct rpcrdma_rep, rr_list); - list_del(&rep->rr_list); - return rep; -} - static void rpcrdma_destroy_rep(struct rpcrdma_rep *rep) { @@ -1281,10 +1242,11 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf) while (!list_empty(&buf->rb_recv_bufs)) { struct rpcrdma_rep *rep; - rep = rpcrdma_buffer_get_rep_locked(buf); + rep = list_first_entry(&buf->rb_recv_bufs, + struct rpcrdma_rep, rr_list); + list_del(&rep->rr_list); rpcrdma_destroy_rep(rep); } - buf->rb_send_count = 0; spin_lock(&buf->rb_reqslock); while (!list_empty(&buf->rb_allreqs)) { @@ -1299,7 +1261,6 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf) spin_lock(&buf->rb_reqslock); } spin_unlock(&buf->rb_reqslock); - buf->rb_recv_count = 0; rpcrdma_mrs_destroy(buf); } @@ -1372,27 +1333,11 @@ rpcrdma_mr_unmap_and_put(struct rpcrdma_mr *mr) __rpcrdma_mr_put(&r_xprt->rx_buf, mr); } -static struct rpcrdma_rep * -rpcrdma_buffer_get_rep(struct rpcrdma_buffer *buffers) -{ - /* If an RPC previously completed without a reply (say, a - * credential problem or a soft timeout occurs) then hold off - * on supplying more Receive buffers until the number of new - * pending RPCs catches up to the number of posted Receives. - */ - if (unlikely(buffers->rb_send_count < buffers->rb_recv_count)) - return NULL; - - if (unlikely(list_empty(&buffers->rb_recv_bufs))) - return NULL; - buffers->rb_recv_count++; - return rpcrdma_buffer_get_rep_locked(buffers); -} - -/* - * Get a set of request/reply buffers. +/** + * rpcrdma_buffer_get - Get a request buffer + * @buffers: Buffer pool from which to obtain a buffer * - * Reply buffer (if available) is attached to send buffer upon return. + * Returns a fresh rpcrdma_req, or NULL if none are available. */ struct rpcrdma_req * rpcrdma_buffer_get(struct rpcrdma_buffer *buffers) @@ -1400,23 +1345,18 @@ rpcrdma_buffer_get(struct rpcrdma_buffer *buffers) struct rpcrdma_req *req; spin_lock(&buffers->rb_lock); - if (list_empty(&buffers->rb_send_bufs)) - goto out_reqbuf; - buffers->rb_send_count++; - req = rpcrdma_buffer_get_req_locked(buffers); - req->rl_reply = rpcrdma_buffer_get_rep(buffers); + req = list_first_entry_or_null(&buffers->rb_send_bufs, + struct rpcrdma_req, rl_list); + if (req) + list_del_init(&req->rl_list); spin_unlock(&buffers->rb_lock); - return req; - -out_reqbuf: - spin_unlock(&buffers->rb_lock); - return NULL; } -/* - * Put request/reply buffers back into pool. - * Pre-decrement counter/array index. +/** + * rpcrdma_buffer_put - Put request/reply buffers back into pool + * @req: object to return + * */ void rpcrdma_buffer_put(struct rpcrdma_req *req) @@ -1427,27 +1367,16 @@ rpcrdma_buffer_put(struct rpcrdma_req *req) req->rl_reply = NULL; spin_lock(&buffers->rb_lock); - buffers->rb_send_count--; - list_add_tail(&req->rl_list, &buffers->rb_send_bufs); + list_add(&req->rl_list, &buffers->rb_send_bufs); if (rep) { - buffers->rb_recv_count--; - list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs); + if (!rep->rr_temp) { + list_add(&rep->rr_list, &buffers->rb_recv_bufs); + rep = NULL; + } } spin_unlock(&buffers->rb_lock); -} - -/* - * Recover reply buffers from pool. - * This happens when recovering from disconnect. - */ -void -rpcrdma_recv_buffer_get(struct rpcrdma_req *req) -{ - struct rpcrdma_buffer *buffers = req->rl_buffer; - - spin_lock(&buffers->rb_lock); - req->rl_reply = rpcrdma_buffer_get_rep(buffers); - spin_unlock(&buffers->rb_lock); + if (rep) + rpcrdma_destroy_rep(rep); } /* @@ -1459,10 +1388,13 @@ rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep) { struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf; - spin_lock(&buffers->rb_lock); - buffers->rb_recv_count--; - list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs); - spin_unlock(&buffers->rb_lock); + if (!rep->rr_temp) { + spin_lock(&buffers->rb_lock); + list_add(&rep->rr_list, &buffers->rb_recv_bufs); + spin_unlock(&buffers->rb_lock); + } else { + rpcrdma_destroy_rep(rep); + } } /** @@ -1558,13 +1490,6 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia, struct ib_send_wr *send_wr = &req->rl_sendctx->sc_wr; int rc; - if (req->rl_reply) { - rc = rpcrdma_ep_post_recv(ia, req->rl_reply); - if (rc) - return rc; - req->rl_reply = NULL; - } - if (!ep->rep_send_count || test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) { send_wr->send_flags |= IB_SEND_SIGNALED; @@ -1581,61 +1506,69 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia, return 0; } -int -rpcrdma_ep_post_recv(struct rpcrdma_ia *ia, - struct rpcrdma_rep *rep) -{ - struct ib_recv_wr *recv_wr_fail; - int rc; - - if (!rpcrdma_dma_map_regbuf(ia, rep->rr_rdmabuf)) - goto out_map; - rc = ib_post_recv(ia->ri_id->qp, &rep->rr_recv_wr, &recv_wr_fail); - trace_xprtrdma_post_recv(rep, rc); - if (rc) - return -ENOTCONN; - return 0; - -out_map: - pr_err("rpcrdma: failed to DMA map the Receive buffer\n"); - return -EIO; -} - /** - * rpcrdma_ep_post_extra_recv - Post buffers for incoming backchannel requests - * @r_xprt: transport associated with these backchannel resources - * @count: minimum number of incoming requests expected + * rpcrdma_post_recvs - Maybe post some Receive buffers + * @r_xprt: controlling transport + * @temp: when true, allocate temp rpcrdma_rep objects * - * Returns zero if all requested buffers were posted, or a negative errno. */ -int -rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count) +void +rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp) { - struct rpcrdma_buffer *buffers = &r_xprt->rx_buf; - struct rpcrdma_ia *ia = &r_xprt->rx_ia; - struct rpcrdma_rep *rep; - int rc; + struct rpcrdma_buffer *buf = &r_xprt->rx_buf; + struct ib_recv_wr *wr, *bad_wr; + int needed, count, rc; - while (count--) { - spin_lock(&buffers->rb_lock); - if (list_empty(&buffers->rb_recv_bufs)) - goto out_reqbuf; - rep = rpcrdma_buffer_get_rep_locked(buffers); - spin_unlock(&buffers->rb_lock); + needed = buf->rb_credits + (buf->rb_bc_srv_max_requests << 1); + if (buf->rb_posted_receives > needed) + return; + needed -= buf->rb_posted_receives; - rc = rpcrdma_ep_post_recv(ia, rep); - if (rc) - goto out_rc; - } + count = 0; + wr = NULL; + while (needed) { + struct rpcrdma_regbuf *rb; + struct rpcrdma_rep *rep; - return 0; + spin_lock(&buf->rb_lock); + rep = list_first_entry_or_null(&buf->rb_recv_bufs, + struct rpcrdma_rep, rr_list); + if (likely(rep)) + list_del(&rep->rr_list); + spin_unlock(&buf->rb_lock); + if (!rep) { + if (rpcrdma_create_rep(r_xprt, temp)) + break; + continue; + } -out_reqbuf: - spin_unlock(&buffers->rb_lock); - trace_xprtrdma_noreps(r_xprt); - return -ENOMEM; + rb = rep->rr_rdmabuf; + if (!rpcrdma_regbuf_is_mapped(rb)) { + if (!__rpcrdma_dma_map_regbuf(&r_xprt->rx_ia, rb)) { + rpcrdma_recv_buffer_put(rep); + break; + } + } -out_rc: - rpcrdma_recv_buffer_put(rep); - return rc; + trace_xprtrdma_post_recv(rep->rr_recv_wr.wr_cqe); + rep->rr_recv_wr.next = wr; + wr = &rep->rr_recv_wr; + ++count; + --needed; + } + if (!count) + return; + + rc = ib_post_recv(r_xprt->rx_ia.ri_id->qp, wr, &bad_wr); + if (rc) { + for (wr = bad_wr; wr; wr = wr->next) { + struct rpcrdma_rep *rep; + + rep = container_of(wr, struct rpcrdma_rep, rr_recv_wr); + rpcrdma_recv_buffer_put(rep); + --count; + } + } + buf->rb_posted_receives += count; + trace_xprtrdma_post_recvs(r_xprt, count, rc); } |