diff options
Diffstat (limited to 'net/sunrpc/xprtrdma/verbs.c')
-rw-r--r-- | net/sunrpc/xprtrdma/verbs.c | 636 |
1 files changed, 340 insertions, 296 deletions
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index 805b1f35e1ca..353f61ac8d51 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -53,6 +53,7 @@ #include <linux/slab.h> #include <linux/sunrpc/addr.h> #include <linux/sunrpc/svc_rdma.h> +#include <linux/log2.h> #include <asm-generic/barrier.h> #include <asm/bitops.h> @@ -73,15 +74,21 @@ /* * internal functions */ -static void rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc); +static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt); +static void rpcrdma_sendctxs_destroy(struct rpcrdma_xprt *r_xprt); +static void rpcrdma_sendctx_put_locked(struct rpcrdma_xprt *r_xprt, + struct rpcrdma_sendctx *sc); +static int rpcrdma_reqs_setup(struct rpcrdma_xprt *r_xprt); +static void rpcrdma_reqs_reset(struct rpcrdma_xprt *r_xprt); +static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep); +static void rpcrdma_reps_unmap(struct rpcrdma_xprt *r_xprt); static void rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt); -static void rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf); +static void rpcrdma_mrs_destroy(struct rpcrdma_xprt *r_xprt); static struct rpcrdma_regbuf * rpcrdma_regbuf_alloc(size_t size, enum dma_data_direction direction, gfp_t flags); static void rpcrdma_regbuf_dma_unmap(struct rpcrdma_regbuf *rb); static void rpcrdma_regbuf_free(struct rpcrdma_regbuf *rb); -static void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp); /* Wait for outstanding transport work to finish. ib_drain_qp * handles the drains in the wrong order for us, so open code @@ -122,7 +129,7 @@ rpcrdma_qp_event_handler(struct ib_event *event, void *context) /** * rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC - * @cq: completion queue (ignored) + * @cq: completion queue * @wc: completed WR * */ @@ -135,7 +142,7 @@ rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc) /* WARNING: Only wr_cqe and status are reliable at this point */ trace_xprtrdma_wc_send(sc, wc); - rpcrdma_sendctx_put_locked(sc); + rpcrdma_sendctx_put_locked((struct rpcrdma_xprt *)cq->cq_context, sc); } /** @@ -167,19 +174,18 @@ rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc) rdmab_addr(rep->rr_rdmabuf), wc->byte_len, DMA_FROM_DEVICE); - rpcrdma_post_recvs(r_xprt, false); rpcrdma_reply_handler(rep); return; out_flushed: - rpcrdma_recv_buffer_put(rep); + rpcrdma_rep_destroy(rep); } -static void -rpcrdma_update_connect_private(struct rpcrdma_xprt *r_xprt, - struct rdma_conn_param *param) +static void rpcrdma_update_cm_private(struct rpcrdma_xprt *r_xprt, + struct rdma_conn_param *param) { const struct rpcrdma_connect_private *pmsg = param->private_data; + struct rpcrdma_ep *ep = &r_xprt->rx_ep; unsigned int rsize, wsize; /* Default settings for RPC-over-RDMA Version One */ @@ -195,13 +201,11 @@ rpcrdma_update_connect_private(struct rpcrdma_xprt *r_xprt, wsize = rpcrdma_decode_buffer_size(pmsg->cp_recv_size); } - if (rsize < r_xprt->rx_ep.rep_inline_recv) - r_xprt->rx_ep.rep_inline_recv = rsize; - if (wsize < r_xprt->rx_ep.rep_inline_send) - r_xprt->rx_ep.rep_inline_send = wsize; - dprintk("RPC: %s: max send %u, max recv %u\n", __func__, - r_xprt->rx_ep.rep_inline_send, - r_xprt->rx_ep.rep_inline_recv); + if (rsize < ep->rep_inline_recv) + ep->rep_inline_recv = rsize; + if (wsize < ep->rep_inline_send) + ep->rep_inline_send = wsize; + rpcrdma_set_max_header_sizes(r_xprt); } @@ -244,6 +248,7 @@ rpcrdma_cm_event_handler(struct rdma_cm_id *id, struct rdma_cm_event *event) ia->ri_id->device->name, rpcrdma_addrstr(r_xprt), rpcrdma_portstr(r_xprt)); #endif + init_completion(&ia->ri_remove_done); set_bit(RPCRDMA_IAF_REMOVING, &ia->ri_flags); ep->rep_connected = -ENODEV; xprt_force_disconnect(xprt); @@ -255,7 +260,8 @@ rpcrdma_cm_event_handler(struct rdma_cm_id *id, struct rdma_cm_event *event) case RDMA_CM_EVENT_ESTABLISHED: ++xprt->connect_cookie; ep->rep_connected = 1; - rpcrdma_update_connect_private(r_xprt, &event->param.conn); + rpcrdma_update_cm_private(r_xprt, &event->param.conn); + trace_xprtrdma_inline_thresh(r_xprt); wake_up_all(&ep->rep_connect_wait); break; case RDMA_CM_EVENT_CONNECT_ERROR: @@ -295,10 +301,7 @@ rpcrdma_create_id(struct rpcrdma_xprt *xprt, struct rpcrdma_ia *ia) struct rdma_cm_id *id; int rc; - trace_xprtrdma_conn_start(xprt); - init_completion(&ia->ri_done); - init_completion(&ia->ri_remove_done); id = rdma_create_id(xprt->rx_xprt.xprt_net, rpcrdma_cm_event_handler, xprt, RDMA_PS_TCP, IB_QPT_RC); @@ -312,10 +315,8 @@ rpcrdma_create_id(struct rpcrdma_xprt *xprt, struct rpcrdma_ia *ia) if (rc) goto out; rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout); - if (rc < 0) { - trace_xprtrdma_conn_tout(xprt); + if (rc < 0) goto out; - } rc = ia->ri_async_rc; if (rc) @@ -326,10 +327,8 @@ rpcrdma_create_id(struct rpcrdma_xprt *xprt, struct rpcrdma_ia *ia) if (rc) goto out; rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout); - if (rc < 0) { - trace_xprtrdma_conn_tout(xprt); + if (rc < 0) goto out; - } rc = ia->ri_async_rc; if (rc) goto out; @@ -371,18 +370,6 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt) goto out_err; } - switch (xprt_rdma_memreg_strategy) { - case RPCRDMA_FRWR: - if (frwr_is_supported(ia->ri_id->device)) - break; - /*FALLTHROUGH*/ - default: - pr_err("rpcrdma: Device %s does not support memreg mode %d\n", - ia->ri_id->device->name, xprt_rdma_memreg_strategy); - rc = -EINVAL; - goto out_err; - } - return 0; out_err: @@ -396,6 +383,8 @@ out_err: * * Divest transport H/W resources associated with this adapter, * but allow it to be restored later. + * + * Caller must hold the transport send lock. */ void rpcrdma_ia_remove(struct rpcrdma_ia *ia) @@ -403,11 +392,6 @@ rpcrdma_ia_remove(struct rpcrdma_ia *ia) struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt, rx_ia); struct rpcrdma_ep *ep = &r_xprt->rx_ep; - struct rpcrdma_buffer *buf = &r_xprt->rx_buf; - struct rpcrdma_req *req; - struct rpcrdma_rep *rep; - - cancel_delayed_work_sync(&buf->rb_refresh_worker); /* This is similar to rpcrdma_ep_destroy, but: * - Don't cancel the connect worker. @@ -429,14 +413,10 @@ rpcrdma_ia_remove(struct rpcrdma_ia *ia) /* The ULP is responsible for ensuring all DMA * mappings and MRs are gone. */ - list_for_each_entry(rep, &buf->rb_recv_bufs, rr_list) - rpcrdma_regbuf_dma_unmap(rep->rr_rdmabuf); - list_for_each_entry(req, &buf->rb_allreqs, rl_all) { - rpcrdma_regbuf_dma_unmap(req->rl_rdmabuf); - rpcrdma_regbuf_dma_unmap(req->rl_sendbuf); - rpcrdma_regbuf_dma_unmap(req->rl_recvbuf); - } - rpcrdma_mrs_destroy(buf); + rpcrdma_reps_unmap(r_xprt); + rpcrdma_reqs_reset(r_xprt); + rpcrdma_mrs_destroy(r_xprt); + rpcrdma_sendctxs_destroy(r_xprt); ib_dealloc_pd(ia->ri_pd); ia->ri_pd = NULL; @@ -479,30 +459,20 @@ int rpcrdma_ep_create(struct rpcrdma_xprt *r_xprt) struct rpcrdma_ia *ia = &r_xprt->rx_ia; struct rpcrdma_connect_private *pmsg = &ep->rep_cm_private; struct ib_cq *sendcq, *recvcq; - unsigned int max_sge; int rc; - ep->rep_max_requests = xprt_rdma_slot_table_entries; + ep->rep_max_requests = r_xprt->rx_xprt.max_reqs; ep->rep_inline_send = xprt_rdma_max_inline_write; ep->rep_inline_recv = xprt_rdma_max_inline_read; - max_sge = min_t(unsigned int, ia->ri_id->device->attrs.max_send_sge, - RPCRDMA_MAX_SEND_SGES); - if (max_sge < RPCRDMA_MIN_SEND_SGES) { - pr_warn("rpcrdma: HCA provides only %d send SGEs\n", max_sge); - return -ENOMEM; - } - ia->ri_max_send_sges = max_sge; - - rc = frwr_open(ia, ep); + rc = frwr_query_device(r_xprt, ia->ri_id->device); if (rc) return rc; + r_xprt->rx_buf.rb_max_requests = cpu_to_be32(ep->rep_max_requests); ep->rep_attr.event_handler = rpcrdma_qp_event_handler; ep->rep_attr.qp_context = ep; ep->rep_attr.srq = NULL; - ep->rep_attr.cap.max_send_sge = max_sge; - ep->rep_attr.cap.max_recv_sge = 1; ep->rep_attr.cap.max_inline_data = 0; ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR; ep->rep_attr.qp_type = IB_QPT_RC; @@ -521,18 +491,17 @@ int rpcrdma_ep_create(struct rpcrdma_xprt *r_xprt) init_waitqueue_head(&ep->rep_connect_wait); ep->rep_receive_count = 0; - sendcq = ib_alloc_cq(ia->ri_id->device, NULL, - ep->rep_attr.cap.max_send_wr + 1, - ia->ri_id->device->num_comp_vectors > 1 ? 1 : 0, - IB_POLL_WORKQUEUE); + sendcq = ib_alloc_cq_any(ia->ri_id->device, r_xprt, + ep->rep_attr.cap.max_send_wr + 1, + IB_POLL_WORKQUEUE); if (IS_ERR(sendcq)) { rc = PTR_ERR(sendcq); goto out1; } - recvcq = ib_alloc_cq(ia->ri_id->device, NULL, - ep->rep_attr.cap.max_recv_wr + 1, - 0, IB_POLL_WORKQUEUE); + recvcq = ib_alloc_cq_any(ia->ri_id->device, NULL, + ep->rep_attr.cap.max_recv_wr + 1, + IB_POLL_WORKQUEUE); if (IS_ERR(recvcq)) { rc = PTR_ERR(recvcq); goto out2; @@ -605,10 +574,11 @@ void rpcrdma_ep_destroy(struct rpcrdma_xprt *r_xprt) * Unlike a normal reconnection, a fresh PD and a new set * of MRs and buffers is needed. */ -static int -rpcrdma_ep_recreate_xprt(struct rpcrdma_xprt *r_xprt, - struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) +static int rpcrdma_ep_recreate_xprt(struct rpcrdma_xprt *r_xprt, + struct ib_qp_init_attr *qp_init_attr) { + struct rpcrdma_ia *ia = &r_xprt->rx_ia; + struct rpcrdma_ep *ep = &r_xprt->rx_ep; int rc, err; trace_xprtrdma_reinsert(r_xprt); @@ -623,15 +593,14 @@ rpcrdma_ep_recreate_xprt(struct rpcrdma_xprt *r_xprt, pr_err("rpcrdma: rpcrdma_ep_create returned %d\n", err); goto out2; } + memcpy(qp_init_attr, &ep->rep_attr, sizeof(*qp_init_attr)); rc = -ENETUNREACH; - err = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr); + err = rdma_create_qp(ia->ri_id, ia->ri_pd, qp_init_attr); if (err) { pr_err("rpcrdma: rdma_create_qp returned %d\n", err); goto out3; } - - rpcrdma_mrs_create(r_xprt); return 0; out3: @@ -642,16 +611,14 @@ out1: return rc; } -static int -rpcrdma_ep_reconnect(struct rpcrdma_xprt *r_xprt, struct rpcrdma_ep *ep, - struct rpcrdma_ia *ia) +static int rpcrdma_ep_reconnect(struct rpcrdma_xprt *r_xprt, + struct ib_qp_init_attr *qp_init_attr) { + struct rpcrdma_ia *ia = &r_xprt->rx_ia; struct rdma_cm_id *id, *old; int err, rc; - trace_xprtrdma_reconnect(r_xprt); - - rpcrdma_ep_disconnect(ep, ia); + rpcrdma_ep_disconnect(&r_xprt->rx_ep, ia); rc = -EHOSTUNREACH; id = rpcrdma_create_id(r_xprt, ia); @@ -673,7 +640,7 @@ rpcrdma_ep_reconnect(struct rpcrdma_xprt *r_xprt, struct rpcrdma_ep *ep, goto out_destroy; } - err = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr); + err = rdma_create_qp(id, ia->ri_pd, qp_init_attr); if (err) goto out_destroy; @@ -698,25 +665,26 @@ rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt, rx_ia); struct rpc_xprt *xprt = &r_xprt->rx_xprt; + struct ib_qp_init_attr qp_init_attr; int rc; retry: + memcpy(&qp_init_attr, &ep->rep_attr, sizeof(qp_init_attr)); switch (ep->rep_connected) { case 0: - dprintk("RPC: %s: connecting...\n", __func__); - rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr); + rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &qp_init_attr); if (rc) { rc = -ENETUNREACH; goto out_noupdate; } break; case -ENODEV: - rc = rpcrdma_ep_recreate_xprt(r_xprt, ep, ia); + rc = rpcrdma_ep_recreate_xprt(r_xprt, &qp_init_attr); if (rc) goto out_noupdate; break; default: - rc = rpcrdma_ep_reconnect(r_xprt, ep, ia); + rc = rpcrdma_ep_reconnect(r_xprt, &qp_init_attr); if (rc) goto out; } @@ -724,12 +692,19 @@ retry: ep->rep_connected = 0; xprt_clear_connected(xprt); + rpcrdma_reset_cwnd(r_xprt); rpcrdma_post_recvs(r_xprt, true); + rc = rpcrdma_sendctxs_create(r_xprt); + if (rc) + goto out; + rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma); if (rc) goto out; + if (xprt->reestablish_timeout < RPCRDMA_INIT_REEST_TO) + xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO; wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0); if (ep->rep_connected <= 0) { if (ep->rep_connected == -EAGAIN) @@ -738,13 +713,19 @@ retry: goto out; } - dprintk("RPC: %s: connected\n", __func__); + rc = rpcrdma_reqs_setup(r_xprt); + if (rc) { + rpcrdma_ep_disconnect(ep, ia); + goto out; + } + rpcrdma_mrs_create(r_xprt); out: if (rc) ep->rep_connected = rc; out_noupdate: + trace_xprtrdma_connect(r_xprt, rc); return rc; } @@ -753,11 +734,8 @@ out_noupdate: * @ep: endpoint to disconnect * @ia: associated interface adapter * - * This is separate from destroy to facilitate the ability - * to reconnect without recreating the endpoint. - * - * This call is not reentrant, and must not be made in parallel - * on the same endpoint. + * Caller serializes. Either the transport send lock is held, + * or we're being called to destroy the transport. */ void rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) @@ -776,6 +754,9 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) trace_xprtrdma_disconnect(r_xprt, rc); rpcrdma_xprt_drain(r_xprt); + rpcrdma_reqs_reset(r_xprt); + rpcrdma_mrs_destroy(r_xprt); + rpcrdma_sendctxs_destroy(r_xprt); } /* Fixed-size circular FIFO queue. This implementation is wait-free and @@ -795,27 +776,28 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) * queue activity, and rpcrdma_xprt_drain has flushed all remaining * Send requests. */ -static void rpcrdma_sendctxs_destroy(struct rpcrdma_buffer *buf) +static void rpcrdma_sendctxs_destroy(struct rpcrdma_xprt *r_xprt) { + struct rpcrdma_buffer *buf = &r_xprt->rx_buf; unsigned long i; + if (!buf->rb_sc_ctxs) + return; for (i = 0; i <= buf->rb_sc_last; i++) kfree(buf->rb_sc_ctxs[i]); kfree(buf->rb_sc_ctxs); + buf->rb_sc_ctxs = NULL; } -static struct rpcrdma_sendctx *rpcrdma_sendctx_create(struct rpcrdma_ia *ia) +static struct rpcrdma_sendctx *rpcrdma_sendctx_create(struct rpcrdma_ep *ep) { struct rpcrdma_sendctx *sc; - sc = kzalloc(struct_size(sc, sc_sges, ia->ri_max_send_sges), + sc = kzalloc(struct_size(sc, sc_sges, ep->rep_attr.cap.max_send_sge), GFP_KERNEL); if (!sc) return NULL; - sc->sc_wr.wr_cqe = &sc->sc_cqe; - sc->sc_wr.sg_list = sc->sc_sges; - sc->sc_wr.opcode = IB_WR_SEND; sc->sc_cqe.done = rpcrdma_wc_send; return sc; } @@ -831,22 +813,22 @@ static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt) * the ->send_request call to fail temporarily before too many * Sends are posted. */ - i = buf->rb_max_requests + RPCRDMA_MAX_BC_REQUESTS; - dprintk("RPC: %s: allocating %lu send_ctxs\n", __func__, i); + i = r_xprt->rx_ep.rep_max_requests + RPCRDMA_MAX_BC_REQUESTS; buf->rb_sc_ctxs = kcalloc(i, sizeof(sc), GFP_KERNEL); if (!buf->rb_sc_ctxs) return -ENOMEM; buf->rb_sc_last = i - 1; for (i = 0; i <= buf->rb_sc_last; i++) { - sc = rpcrdma_sendctx_create(&r_xprt->rx_ia); + sc = rpcrdma_sendctx_create(&r_xprt->rx_ep); if (!sc) return -ENOMEM; - sc->sc_xprt = r_xprt; buf->rb_sc_ctxs[i] = sc; } + buf->rb_sc_head = 0; + buf->rb_sc_tail = 0; return 0; } @@ -906,6 +888,7 @@ out_emptyq: /** * rpcrdma_sendctx_put_locked - Release a send context + * @r_xprt: controlling transport instance * @sc: send context to release * * Usage: Called from Send completion to return a sendctxt @@ -913,10 +896,10 @@ out_emptyq: * * The caller serializes calls to this function (per transport). */ -static void -rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc) +static void rpcrdma_sendctx_put_locked(struct rpcrdma_xprt *r_xprt, + struct rpcrdma_sendctx *sc) { - struct rpcrdma_buffer *buf = &sc->sc_xprt->rx_buf; + struct rpcrdma_buffer *buf = &r_xprt->rx_buf; unsigned long next_tail; /* Unmap SGEs of previously completed but unsignaled @@ -934,7 +917,7 @@ rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc) /* Paired with READ_ONCE */ smp_store_release(&buf->rb_sc_tail, next_tail); - xprt_write_space(&sc->sc_xprt->rx_xprt); + xprt_write_space(&r_xprt->rx_xprt); } static void @@ -943,14 +926,12 @@ rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt) struct rpcrdma_buffer *buf = &r_xprt->rx_buf; struct rpcrdma_ia *ia = &r_xprt->rx_ia; unsigned int count; - LIST_HEAD(free); - LIST_HEAD(all); - for (count = 0; count < ia->ri_max_segs; count++) { + for (count = 0; count < ia->ri_max_rdma_segs; count++) { struct rpcrdma_mr *mr; int rc; - mr = kzalloc(sizeof(*mr), GFP_KERNEL); + mr = kzalloc(sizeof(*mr), GFP_NOFS); if (!mr) break; @@ -962,15 +943,13 @@ rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt) mr->mr_xprt = r_xprt; - list_add(&mr->mr_list, &free); - list_add(&mr->mr_all, &all); + spin_lock(&buf->rb_lock); + rpcrdma_mr_push(mr, &buf->rb_mrs); + list_add(&mr->mr_all, &buf->rb_all_mrs); + spin_unlock(&buf->rb_lock); } - spin_lock(&buf->rb_mrlock); - list_splice(&free, &buf->rb_mrs); - list_splice(&all, &buf->rb_all); r_xprt->rx_stats.mrs_allocated += count; - spin_unlock(&buf->rb_mrlock); trace_xprtrdma_createmrs(r_xprt, count); } @@ -978,7 +957,7 @@ static void rpcrdma_mr_refresh_worker(struct work_struct *work) { struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer, - rb_refresh_worker.work); + rb_refresh_worker); struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt, rx_buf); @@ -987,6 +966,28 @@ rpcrdma_mr_refresh_worker(struct work_struct *work) } /** + * rpcrdma_mrs_refresh - Wake the MR refresh worker + * @r_xprt: controlling transport instance + * + */ +void rpcrdma_mrs_refresh(struct rpcrdma_xprt *r_xprt) +{ + struct rpcrdma_buffer *buf = &r_xprt->rx_buf; + struct rpcrdma_ep *ep = &r_xprt->rx_ep; + + /* If there is no underlying device, it's no use to + * wake the refresh worker. + */ + if (ep->rep_connected != -ENODEV) { + /* The work is scheduled on a WQ_MEM_RECLAIM + * workqueue in order to prevent MR allocation + * from recursing into NFS during direct reclaim. + */ + queue_work(xprtiod_workqueue, &buf->rb_refresh_worker); + } +} + +/** * rpcrdma_req_create - Allocate an rpcrdma_req object * @r_xprt: controlling r_xprt * @size: initial size, in bytes, of send and receive buffers @@ -998,45 +999,120 @@ struct rpcrdma_req *rpcrdma_req_create(struct rpcrdma_xprt *r_xprt, size_t size, gfp_t flags) { struct rpcrdma_buffer *buffer = &r_xprt->rx_buf; - struct rpcrdma_regbuf *rb; struct rpcrdma_req *req; req = kzalloc(sizeof(*req), flags); if (req == NULL) goto out1; - rb = rpcrdma_regbuf_alloc(RPCRDMA_HDRBUF_SIZE, DMA_TO_DEVICE, flags); - if (!rb) - goto out2; - req->rl_rdmabuf = rb; - xdr_buf_init(&req->rl_hdrbuf, rdmab_data(rb), rdmab_length(rb)); - req->rl_sendbuf = rpcrdma_regbuf_alloc(size, DMA_TO_DEVICE, flags); if (!req->rl_sendbuf) - goto out3; + goto out2; req->rl_recvbuf = rpcrdma_regbuf_alloc(size, DMA_NONE, flags); if (!req->rl_recvbuf) - goto out4; + goto out3; + INIT_LIST_HEAD(&req->rl_free_mrs); INIT_LIST_HEAD(&req->rl_registered); spin_lock(&buffer->rb_lock); list_add(&req->rl_all, &buffer->rb_allreqs); spin_unlock(&buffer->rb_lock); return req; -out4: - kfree(req->rl_sendbuf); out3: - kfree(req->rl_rdmabuf); + kfree(req->rl_sendbuf); out2: kfree(req); out1: return NULL; } -static struct rpcrdma_rep *rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt, - bool temp) +/** + * rpcrdma_req_setup - Per-connection instance setup of an rpcrdma_req object + * @r_xprt: controlling transport instance + * @req: rpcrdma_req object to set up + * + * Returns zero on success, and a negative errno on failure. + */ +int rpcrdma_req_setup(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) +{ + struct rpcrdma_regbuf *rb; + size_t maxhdrsize; + + /* Compute maximum header buffer size in bytes */ + maxhdrsize = rpcrdma_fixed_maxsz + 3 + + r_xprt->rx_ia.ri_max_rdma_segs * rpcrdma_readchunk_maxsz; + maxhdrsize *= sizeof(__be32); + rb = rpcrdma_regbuf_alloc(__roundup_pow_of_two(maxhdrsize), + DMA_TO_DEVICE, GFP_KERNEL); + if (!rb) + goto out; + + if (!__rpcrdma_regbuf_dma_map(r_xprt, rb)) + goto out_free; + + req->rl_rdmabuf = rb; + xdr_buf_init(&req->rl_hdrbuf, rdmab_data(rb), rdmab_length(rb)); + return 0; + +out_free: + rpcrdma_regbuf_free(rb); +out: + return -ENOMEM; +} + +/* ASSUMPTION: the rb_allreqs list is stable for the duration, + * and thus can be walked without holding rb_lock. Eg. the + * caller is holding the transport send lock to exclude + * device removal or disconnection. + */ +static int rpcrdma_reqs_setup(struct rpcrdma_xprt *r_xprt) +{ + struct rpcrdma_buffer *buf = &r_xprt->rx_buf; + struct rpcrdma_req *req; + int rc; + + list_for_each_entry(req, &buf->rb_allreqs, rl_all) { + rc = rpcrdma_req_setup(r_xprt, req); + if (rc) + return rc; + } + return 0; +} + +static void rpcrdma_req_reset(struct rpcrdma_req *req) +{ + /* Credits are valid for only one connection */ + req->rl_slot.rq_cong = 0; + + rpcrdma_regbuf_free(req->rl_rdmabuf); + req->rl_rdmabuf = NULL; + + rpcrdma_regbuf_dma_unmap(req->rl_sendbuf); + rpcrdma_regbuf_dma_unmap(req->rl_recvbuf); +} + +/* ASSUMPTION: the rb_allreqs list is stable for the duration, + * and thus can be walked without holding rb_lock. Eg. the + * caller is holding the transport send lock to exclude + * device removal or disconnection. + */ +static void rpcrdma_reqs_reset(struct rpcrdma_xprt *r_xprt) +{ + struct rpcrdma_buffer *buf = &r_xprt->rx_buf; + struct rpcrdma_req *req; + + list_for_each_entry(req, &buf->rb_allreqs, rl_all) + rpcrdma_req_reset(req); +} + +/* No locking needed here. This function is called only by the + * Receive completion handler. + */ +static noinline +struct rpcrdma_rep *rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt, + bool temp) { struct rpcrdma_rep *rep; @@ -1049,6 +1125,9 @@ static struct rpcrdma_rep *rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt, if (!rep->rr_rdmabuf) goto out_free; + if (!rpcrdma_regbuf_dma_map(r_xprt, rep->rr_rdmabuf)) + goto out_free_regbuf; + xdr_buf_init(&rep->rr_hdrbuf, rdmab_data(rep->rr_rdmabuf), rdmab_length(rep->rr_rdmabuf)); rep->rr_cqe.done = rpcrdma_wc_receive; @@ -1058,14 +1137,63 @@ static struct rpcrdma_rep *rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt, rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov; rep->rr_recv_wr.num_sge = 1; rep->rr_temp = temp; + list_add(&rep->rr_all, &r_xprt->rx_buf.rb_all_reps); return rep; +out_free_regbuf: + rpcrdma_regbuf_free(rep->rr_rdmabuf); out_free: kfree(rep); out: return NULL; } +/* No locking needed here. This function is invoked only by the + * Receive completion handler, or during transport shutdown. + */ +static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep) +{ + list_del(&rep->rr_all); + rpcrdma_regbuf_free(rep->rr_rdmabuf); + kfree(rep); +} + +static struct rpcrdma_rep *rpcrdma_rep_get_locked(struct rpcrdma_buffer *buf) +{ + struct llist_node *node; + + /* Calls to llist_del_first are required to be serialized */ + node = llist_del_first(&buf->rb_free_reps); + if (!node) + return NULL; + return llist_entry(node, struct rpcrdma_rep, rr_node); +} + +static void rpcrdma_rep_put(struct rpcrdma_buffer *buf, + struct rpcrdma_rep *rep) +{ + llist_add(&rep->rr_node, &buf->rb_free_reps); +} + +static void rpcrdma_reps_unmap(struct rpcrdma_xprt *r_xprt) +{ + struct rpcrdma_buffer *buf = &r_xprt->rx_buf; + struct rpcrdma_rep *rep; + + list_for_each_entry(rep, &buf->rb_all_reps, rr_all) { + rpcrdma_regbuf_dma_unmap(rep->rr_rdmabuf); + rep->rr_temp = true; + } +} + +static void rpcrdma_reps_destroy(struct rpcrdma_buffer *buf) +{ + struct rpcrdma_rep *rep; + + while ((rep = rpcrdma_rep_get_locked(buf)) != NULL) + rpcrdma_rep_destroy(rep); +} + /** * rpcrdma_buffer_create - Create initial set of req/rep objects * @r_xprt: transport instance to (re)initialize @@ -1077,37 +1205,28 @@ int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt) struct rpcrdma_buffer *buf = &r_xprt->rx_buf; int i, rc; - buf->rb_max_requests = r_xprt->rx_ep.rep_max_requests; buf->rb_bc_srv_max_requests = 0; - spin_lock_init(&buf->rb_mrlock); spin_lock_init(&buf->rb_lock); INIT_LIST_HEAD(&buf->rb_mrs); - INIT_LIST_HEAD(&buf->rb_all); - INIT_DELAYED_WORK(&buf->rb_refresh_worker, - rpcrdma_mr_refresh_worker); - - rpcrdma_mrs_create(r_xprt); + INIT_LIST_HEAD(&buf->rb_all_mrs); + INIT_WORK(&buf->rb_refresh_worker, rpcrdma_mr_refresh_worker); INIT_LIST_HEAD(&buf->rb_send_bufs); INIT_LIST_HEAD(&buf->rb_allreqs); + INIT_LIST_HEAD(&buf->rb_all_reps); rc = -ENOMEM; - for (i = 0; i < buf->rb_max_requests; i++) { + for (i = 0; i < r_xprt->rx_xprt.max_reqs; i++) { struct rpcrdma_req *req; - req = rpcrdma_req_create(r_xprt, RPCRDMA_V1_DEF_INLINE_SIZE, + req = rpcrdma_req_create(r_xprt, RPCRDMA_V1_DEF_INLINE_SIZE * 2, GFP_KERNEL); if (!req) goto out; list_add(&req->rl_list, &buf->rb_send_bufs); } - buf->rb_credits = 1; - INIT_LIST_HEAD(&buf->rb_recv_bufs); - - rc = rpcrdma_sendctxs_create(r_xprt); - if (rc) - goto out; + init_llist_head(&buf->rb_free_reps); return 0; out: @@ -1115,58 +1234,62 @@ out: return rc; } -static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep) -{ - rpcrdma_regbuf_free(rep->rr_rdmabuf); - kfree(rep); -} - /** * rpcrdma_req_destroy - Destroy an rpcrdma_req object * @req: unused object to be destroyed * - * This function assumes that the caller prevents concurrent device - * unload and transport tear-down. + * Relies on caller holding the transport send lock to protect + * removing req->rl_all from buf->rb_all_reqs safely. */ -void -rpcrdma_req_destroy(struct rpcrdma_req *req) +void rpcrdma_req_destroy(struct rpcrdma_req *req) { + struct rpcrdma_mr *mr; + list_del(&req->rl_all); + while ((mr = rpcrdma_mr_pop(&req->rl_free_mrs))) { + struct rpcrdma_buffer *buf = &mr->mr_xprt->rx_buf; + + spin_lock(&buf->rb_lock); + list_del(&mr->mr_all); + spin_unlock(&buf->rb_lock); + + frwr_release_mr(mr); + } + rpcrdma_regbuf_free(req->rl_recvbuf); rpcrdma_regbuf_free(req->rl_sendbuf); rpcrdma_regbuf_free(req->rl_rdmabuf); kfree(req); } -static void -rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf) +/** + * rpcrdma_mrs_destroy - Release all of a transport's MRs + * @r_xprt: controlling transport instance + * + * Relies on caller holding the transport send lock to protect + * removing mr->mr_list from req->rl_free_mrs safely. + */ +static void rpcrdma_mrs_destroy(struct rpcrdma_xprt *r_xprt) { - struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt, - rx_buf); + struct rpcrdma_buffer *buf = &r_xprt->rx_buf; struct rpcrdma_mr *mr; - unsigned int count; - count = 0; - spin_lock(&buf->rb_mrlock); - while (!list_empty(&buf->rb_all)) { - mr = list_entry(buf->rb_all.next, struct rpcrdma_mr, mr_all); - list_del(&mr->mr_all); + cancel_work_sync(&buf->rb_refresh_worker); - spin_unlock(&buf->rb_mrlock); - - /* Ensure MW is not on any rl_registered list */ - if (!list_empty(&mr->mr_list)) - list_del(&mr->mr_list); + spin_lock(&buf->rb_lock); + while ((mr = list_first_entry_or_null(&buf->rb_all_mrs, + struct rpcrdma_mr, + mr_all)) != NULL) { + list_del(&mr->mr_list); + list_del(&mr->mr_all); + spin_unlock(&buf->rb_lock); frwr_release_mr(mr); - count++; - spin_lock(&buf->rb_mrlock); - } - spin_unlock(&buf->rb_mrlock); - r_xprt->rx_stats.mrs_allocated = 0; - dprintk("RPC: %s: released %u MRs\n", __func__, count); + spin_lock(&buf->rb_lock); + } + spin_unlock(&buf->rb_lock); } /** @@ -1180,18 +1303,7 @@ rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf) void rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf) { - cancel_delayed_work_sync(&buf->rb_refresh_worker); - - rpcrdma_sendctxs_destroy(buf); - - while (!list_empty(&buf->rb_recv_bufs)) { - struct rpcrdma_rep *rep; - - rep = list_first_entry(&buf->rb_recv_bufs, - struct rpcrdma_rep, rr_list); - list_del(&rep->rr_list); - rpcrdma_rep_destroy(rep); - } + rpcrdma_reps_destroy(buf); while (!list_empty(&buf->rb_send_bufs)) { struct rpcrdma_req *req; @@ -1201,8 +1313,6 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf) list_del(&req->rl_list); rpcrdma_req_destroy(req); } - - rpcrdma_mrs_destroy(buf); } /** @@ -1216,54 +1326,20 @@ struct rpcrdma_mr * rpcrdma_mr_get(struct rpcrdma_xprt *r_xprt) { struct rpcrdma_buffer *buf = &r_xprt->rx_buf; - struct rpcrdma_mr *mr = NULL; - - spin_lock(&buf->rb_mrlock); - if (!list_empty(&buf->rb_mrs)) - mr = rpcrdma_mr_pop(&buf->rb_mrs); - spin_unlock(&buf->rb_mrlock); + struct rpcrdma_mr *mr; - if (!mr) - goto out_nomrs; + spin_lock(&buf->rb_lock); + mr = rpcrdma_mr_pop(&buf->rb_mrs); + spin_unlock(&buf->rb_lock); return mr; - -out_nomrs: - trace_xprtrdma_nomrs(r_xprt); - if (r_xprt->rx_ep.rep_connected != -ENODEV) - schedule_delayed_work(&buf->rb_refresh_worker, 0); - - /* Allow the reply handler and refresh worker to run */ - cond_resched(); - - return NULL; -} - -static void -__rpcrdma_mr_put(struct rpcrdma_buffer *buf, struct rpcrdma_mr *mr) -{ - spin_lock(&buf->rb_mrlock); - rpcrdma_mr_push(mr, &buf->rb_mrs); - spin_unlock(&buf->rb_mrlock); } /** - * rpcrdma_mr_put - Release an rpcrdma_mr object - * @mr: object to release + * rpcrdma_mr_put - DMA unmap an MR and release it + * @mr: MR to release * */ -void -rpcrdma_mr_put(struct rpcrdma_mr *mr) -{ - __rpcrdma_mr_put(&mr->mr_xprt->rx_buf, mr); -} - -/** - * rpcrdma_mr_unmap_and_put - DMA unmap an MR and release it - * @mr: object to release - * - */ -void -rpcrdma_mr_unmap_and_put(struct rpcrdma_mr *mr) +void rpcrdma_mr_put(struct rpcrdma_mr *mr) { struct rpcrdma_xprt *r_xprt = mr->mr_xprt; @@ -1273,7 +1349,8 @@ rpcrdma_mr_unmap_and_put(struct rpcrdma_mr *mr) mr->mr_sg, mr->mr_nents, mr->mr_dir); mr->mr_dir = DMA_NONE; } - __rpcrdma_mr_put(&r_xprt->rx_buf, mr); + + rpcrdma_mr_push(mr, &mr->mr_req->rl_free_mrs); } /** @@ -1304,39 +1381,24 @@ rpcrdma_buffer_get(struct rpcrdma_buffer *buffers) */ void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req) { - struct rpcrdma_rep *rep = req->rl_reply; - + if (req->rl_reply) + rpcrdma_rep_put(buffers, req->rl_reply); req->rl_reply = NULL; spin_lock(&buffers->rb_lock); list_add(&req->rl_list, &buffers->rb_send_bufs); - if (rep) { - if (!rep->rr_temp) { - list_add(&rep->rr_list, &buffers->rb_recv_bufs); - rep = NULL; - } - } spin_unlock(&buffers->rb_lock); - if (rep) - rpcrdma_rep_destroy(rep); } -/* - * Put reply buffers back into pool when not attached to - * request. This happens in error conditions. +/** + * rpcrdma_recv_buffer_put - Release rpcrdma_rep back to free list + * @rep: rep to release + * + * Used after error conditions. */ -void -rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep) +void rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep) { - struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf; - - if (!rep->rr_temp) { - spin_lock(&buffers->rb_lock); - list_add(&rep->rr_list, &buffers->rb_recv_bufs); - spin_unlock(&buffers->rb_lock); - } else { - rpcrdma_rep_destroy(rep); - } + rpcrdma_rep_put(&rep->rr_rxprt->rx_buf, rep); } /* Returns a pointer to a rpcrdma_regbuf object, or NULL. @@ -1453,7 +1515,7 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep, struct rpcrdma_req *req) { - struct ib_send_wr *send_wr = &req->rl_sendctx->sc_wr; + struct ib_send_wr *send_wr = &req->rl_wr; int rc; if (!ep->rep_send_count || kref_read(&req->rl_kref) > 1) { @@ -1471,12 +1533,17 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia, return 0; } -static void -rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp) +/** + * rpcrdma_post_recvs - Refill the Receive Queue + * @r_xprt: controlling transport instance + * @temp: mark Receive buffers to be deleted after use + * + */ +void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp) { struct rpcrdma_buffer *buf = &r_xprt->rx_buf; struct rpcrdma_ep *ep = &r_xprt->rx_ep; - struct ib_recv_wr *i, *wr, *bad_wr; + struct ib_recv_wr *wr, *bad_wr; struct rpcrdma_rep *rep; int needed, count, rc; @@ -1484,7 +1551,7 @@ rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp) count = 0; needed = buf->rb_credits + (buf->rb_bc_srv_max_requests << 1); - if (ep->rep_receive_count > needed) + if (likely(ep->rep_receive_count > needed)) goto out; needed -= ep->rep_receive_count; if (!temp) @@ -1492,42 +1559,26 @@ rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp) /* fast path: all needed reps can be found on the free list */ wr = NULL; - spin_lock(&buf->rb_lock); while (needed) { - rep = list_first_entry_or_null(&buf->rb_recv_bufs, - struct rpcrdma_rep, rr_list); + rep = rpcrdma_rep_get_locked(buf); + if (rep && rep->rr_temp) { + rpcrdma_rep_destroy(rep); + continue; + } if (!rep) - break; - - list_del(&rep->rr_list); - rep->rr_recv_wr.next = wr; - wr = &rep->rr_recv_wr; - --needed; - } - spin_unlock(&buf->rb_lock); - - while (needed) { - rep = rpcrdma_rep_create(r_xprt, temp); + rep = rpcrdma_rep_create(r_xprt, temp); if (!rep) break; + trace_xprtrdma_post_recv(rep); rep->rr_recv_wr.next = wr; wr = &rep->rr_recv_wr; --needed; + ++count; } if (!wr) goto out; - for (i = wr; i; i = i->next) { - rep = container_of(i, struct rpcrdma_rep, rr_recv_wr); - - if (!rpcrdma_regbuf_dma_map(r_xprt, rep->rr_rdmabuf)) - goto release_wrs; - - trace_xprtrdma_post_recv(rep->rr_recv_wr.wr_cqe); - ++count; - } - rc = ib_post_recv(r_xprt->rx_ia.ri_id->qp, wr, (const struct ib_recv_wr **)&bad_wr); out: @@ -1544,11 +1595,4 @@ out: } ep->rep_receive_count += count; return; - -release_wrs: - for (i = wr; i;) { - rep = container_of(i, struct rpcrdma_rep, rr_recv_wr); - i = i->next; - rpcrdma_recv_buffer_put(rep); - } } |