diff options
Diffstat (limited to 'net/sunrpc/xprtrdma')
-rw-r--r-- | net/sunrpc/xprtrdma/backchannel.c | 12 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/frwr_ops.c | 266 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/rpc_rdma.c | 488 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/svc_rdma.c | 7 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/svc_rdma_backchannel.c | 1 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 24 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/svc_rdma_sendto.c | 8 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/svc_rdma_transport.c | 14 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/transport.c | 65 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/verbs.c | 636 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/xprt_rdma.h | 91 |
11 files changed, 854 insertions, 758 deletions
diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c index 59e624b1d7a0..1a0ae0c61353 100644 --- a/net/sunrpc/xprtrdma/backchannel.c +++ b/net/sunrpc/xprtrdma/backchannel.c @@ -54,9 +54,7 @@ size_t xprt_rdma_bc_maxpayload(struct rpc_xprt *xprt) unsigned int xprt_rdma_bc_max_slots(struct rpc_xprt *xprt) { - struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); - - return r_xprt->rx_buf.rb_bc_srv_max_requests; + return RPCRDMA_BACKWARD_WRS >> 1; } static int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst) @@ -81,7 +79,7 @@ static int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst) *p = xdr_zero; if (rpcrdma_prepare_send_sges(r_xprt, req, RPCRDMA_HDRLEN_MIN, - &rqst->rq_snd_buf, rpcrdma_noch)) + &rqst->rq_snd_buf, rpcrdma_noch_pullup)) return -EIO; trace_xprtrdma_cb_reply(rqst); @@ -165,6 +163,7 @@ void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst) spin_lock(&xprt->bc_pa_lock); list_add_tail(&rqst->rq_bc_pa_list, &xprt->bc_pa_list); spin_unlock(&xprt->bc_pa_lock); + xprt_put(xprt); } static struct rpc_rqst *rpcrdma_bc_rqst_get(struct rpcrdma_xprt *r_xprt) @@ -195,6 +194,10 @@ create_req: req = rpcrdma_req_create(r_xprt, size, GFP_KERNEL); if (!req) return NULL; + if (rpcrdma_req_setup(r_xprt, req)) { + rpcrdma_req_destroy(req); + return NULL; + } xprt->bc_alloc_count++; rqst = &req->rl_slot; @@ -261,6 +264,7 @@ void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt, /* Queue rqst for ULP's callback service */ bc_serv = xprt->bc_serv; + xprt_get(xprt); spin_lock(&bc_serv->sv_cb_lock); list_add(&rqst->rq_bc_list, &bc_serv->sv_cb_list); spin_unlock(&bc_serv->sv_cb_lock); diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c index 0b6dad7580a1..125297c9aa3e 100644 --- a/net/sunrpc/xprtrdma/frwr_ops.c +++ b/net/sunrpc/xprtrdma/frwr_ops.c @@ -7,67 +7,37 @@ /* Lightweight memory registration using Fast Registration Work * Requests (FRWR). * - * FRWR features ordered asynchronous registration and deregistration - * of arbitrarily sized memory regions. This is the fastest and safest + * FRWR features ordered asynchronous registration and invalidation + * of arbitrarily-sized memory regions. This is the fastest and safest * but most complex memory registration mode. */ /* Normal operation * - * A Memory Region is prepared for RDMA READ or WRITE using a FAST_REG + * A Memory Region is prepared for RDMA Read or Write using a FAST_REG * Work Request (frwr_map). When the RDMA operation is finished, this * Memory Region is invalidated using a LOCAL_INV Work Request - * (frwr_unmap_sync). + * (frwr_unmap_async and frwr_unmap_sync). * - * Typically these Work Requests are not signaled, and neither are RDMA - * SEND Work Requests (with the exception of signaling occasionally to - * prevent provider work queue overflows). This greatly reduces HCA + * Typically FAST_REG Work Requests are not signaled, and neither are + * RDMA Send Work Requests (with the exception of signaling occasionally + * to prevent provider work queue overflows). This greatly reduces HCA * interrupt workload. - * - * As an optimization, frwr_unmap marks MRs INVALID before the - * LOCAL_INV WR is posted. If posting succeeds, the MR is placed on - * rb_mrs immediately so that no work (like managing a linked list - * under a spinlock) is needed in the completion upcall. - * - * But this means that frwr_map() can occasionally encounter an MR - * that is INVALID but the LOCAL_INV WR has not completed. Work Queue - * ordering prevents a subsequent FAST_REG WR from executing against - * that MR while it is still being invalidated. */ /* Transport recovery * - * ->op_map and the transport connect worker cannot run at the same - * time, but ->op_unmap can fire while the transport connect worker - * is running. Thus MR recovery is handled in ->op_map, to guarantee - * that recovered MRs are owned by a sending RPC, and not one where - * ->op_unmap could fire at the same time transport reconnect is - * being done. - * - * When the underlying transport disconnects, MRs are left in one of - * four states: - * - * INVALID: The MR was not in use before the QP entered ERROR state. - * - * VALID: The MR was registered before the QP entered ERROR state. - * - * FLUSHED_FR: The MR was being registered when the QP entered ERROR - * state, and the pending WR was flushed. - * - * FLUSHED_LI: The MR was being invalidated when the QP entered ERROR - * state, and the pending WR was flushed. - * - * When frwr_map encounters FLUSHED and VALID MRs, they are recovered - * with ib_dereg_mr and then are re-initialized. Because MR recovery - * allocates fresh resources, it is deferred to a workqueue, and the - * recovered MRs are placed back on the rb_mrs list when recovery is - * complete. frwr_map allocates another MR for the current RPC while - * the broken MR is reset. - * - * To ensure that frwr_map doesn't encounter an MR that is marked - * INVALID but that is about to be flushed due to a previous transport - * disconnect, the transport connect worker attempts to drain all - * pending send queue WRs before the transport is reconnected. + * frwr_map and frwr_unmap_* cannot run at the same time the transport + * connect worker is running. The connect worker holds the transport + * send lock, just as ->send_request does. This prevents frwr_map and + * the connect worker from running concurrently. When a connection is + * closed, the Receive completion queue is drained before the allowing + * the connect worker to get control. This prevents frwr_unmap and the + * connect worker from running concurrently. + * + * When the underlying transport disconnects, MRs that are in flight + * are flushed and are likely unusable. Thus all MRs are destroyed. + * New MRs are created on demand. */ #include <linux/sunrpc/rpc_rdma.h> @@ -81,28 +51,6 @@ #endif /** - * frwr_is_supported - Check if device supports FRWR - * @device: interface adapter to check - * - * Returns true if device supports FRWR, otherwise false - */ -bool frwr_is_supported(struct ib_device *device) -{ - struct ib_device_attr *attrs = &device->attrs; - - if (!(attrs->device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS)) - goto out_not_supported; - if (attrs->max_fast_reg_page_list_len == 0) - goto out_not_supported; - return true; - -out_not_supported: - pr_info("rpcrdma: 'frwr' mode is not supported by device %s\n", - device->name); - return false; -} - -/** * frwr_release_mr - Destroy one MR * @mr: MR allocated by frwr_init_mr * @@ -118,13 +66,8 @@ void frwr_release_mr(struct rpcrdma_mr *mr) kfree(mr); } -/* MRs are dynamically allocated, so simply clean up and release the MR. - * A replacement MR will subsequently be allocated on demand. - */ -static void -frwr_mr_recycle_worker(struct work_struct *work) +static void frwr_mr_recycle(struct rpcrdma_mr *mr) { - struct rpcrdma_mr *mr = container_of(work, struct rpcrdma_mr, mr_recycle); struct rpcrdma_xprt *r_xprt = mr->mr_xprt; trace_xprtrdma_mr_recycle(mr); @@ -136,10 +79,10 @@ frwr_mr_recycle_worker(struct work_struct *work) mr->mr_dir = DMA_NONE; } - spin_lock(&r_xprt->rx_buf.rb_mrlock); + spin_lock(&r_xprt->rx_buf.rb_lock); list_del(&mr->mr_all); r_xprt->rx_stats.mrs_recycled++; - spin_unlock(&r_xprt->rx_buf.rb_mrlock); + spin_unlock(&r_xprt->rx_buf.rb_lock); frwr_release_mr(mr); } @@ -156,12 +99,10 @@ frwr_mr_recycle_worker(struct work_struct *work) */ void frwr_reset(struct rpcrdma_req *req) { - while (!list_empty(&req->rl_registered)) { - struct rpcrdma_mr *mr; + struct rpcrdma_mr *mr; - mr = rpcrdma_mr_pop(&req->rl_registered); - rpcrdma_mr_unmap_and_put(mr); - } + while ((mr = rpcrdma_mr_pop(&req->rl_registered))) + rpcrdma_mr_put(mr); } /** @@ -183,14 +124,13 @@ int frwr_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr) if (IS_ERR(frmr)) goto out_mr_err; - sg = kcalloc(depth, sizeof(*sg), GFP_KERNEL); + sg = kcalloc(depth, sizeof(*sg), GFP_NOFS); if (!sg) goto out_list_err; mr->frwr.fr_mr = frmr; mr->mr_dir = DMA_NONE; INIT_LIST_HEAD(&mr->mr_list); - INIT_WORK(&mr->mr_recycle, frwr_mr_recycle_worker); init_completion(&mr->frwr.fr_linv_done); sg_init_table(sg, depth); @@ -203,33 +143,53 @@ out_mr_err: return rc; out_list_err: - dprintk("RPC: %s: sg allocation failure\n", - __func__); ib_dereg_mr(frmr); return -ENOMEM; } /** - * frwr_open - Prepare an endpoint for use with FRWR - * @ia: interface adapter this endpoint will use - * @ep: endpoint to prepare + * frwr_query_device - Prepare a transport for use with FRWR + * @r_xprt: controlling transport instance + * @device: RDMA device to query * * On success, sets: - * ep->rep_attr.cap.max_send_wr - * ep->rep_attr.cap.max_recv_wr + * ep->rep_attr * ep->rep_max_requests - * ia->ri_max_segs + * ia->ri_max_rdma_segs * * And these FRWR-related fields: * ia->ri_max_frwr_depth * ia->ri_mrtype * - * On failure, a negative errno is returned. + * Return values: + * On success, returns zero. + * %-EINVAL - the device does not support FRWR memory registration + * %-ENOMEM - the device is not sufficiently capable for NFS/RDMA */ -int frwr_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep) +int frwr_query_device(struct rpcrdma_xprt *r_xprt, + const struct ib_device *device) { - struct ib_device_attr *attrs = &ia->ri_id->device->attrs; + const struct ib_device_attr *attrs = &device->attrs; + struct rpcrdma_ia *ia = &r_xprt->rx_ia; + struct rpcrdma_ep *ep = &r_xprt->rx_ep; int max_qp_wr, depth, delta; + unsigned int max_sge; + + if (!(attrs->device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS) || + attrs->max_fast_reg_page_list_len == 0) { + pr_err("rpcrdma: 'frwr' mode is not supported by device %s\n", + device->name); + return -EINVAL; + } + + max_sge = min_t(unsigned int, attrs->max_send_sge, + RPCRDMA_MAX_SEND_SGES); + if (max_sge < RPCRDMA_MIN_SEND_SGES) { + pr_err("rpcrdma: HCA provides only %u send SGEs\n", max_sge); + return -ENOMEM; + } + ep->rep_attr.cap.max_send_sge = max_sge; + ep->rep_attr.cap.max_recv_sge = 1; ia->ri_mrtype = IB_MR_TYPE_MEM_REG; if (attrs->device_cap_flags & IB_DEVICE_SG_GAPS_REG) @@ -239,14 +199,12 @@ int frwr_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep) * capability, but perform optimally when the MRs are not larger * than a page. */ - if (attrs->max_sge_rd > 1) + if (attrs->max_sge_rd > RPCRDMA_MAX_HDR_SEGS) ia->ri_max_frwr_depth = attrs->max_sge_rd; else ia->ri_max_frwr_depth = attrs->max_fast_reg_page_list_len; if (ia->ri_max_frwr_depth > RPCRDMA_MAX_DATA_SEGS) ia->ri_max_frwr_depth = RPCRDMA_MAX_DATA_SEGS; - dprintk("RPC: %s: max FR page list depth = %u\n", - __func__, ia->ri_max_frwr_depth); /* Add room for frwr register and invalidate WRs. * 1. FRWR reg WR for head @@ -270,7 +228,7 @@ int frwr_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep) } while (delta > 0); } - max_qp_wr = ia->ri_id->device->attrs.max_qp_wr; + max_qp_wr = attrs->max_qp_wr; max_qp_wr -= RPCRDMA_BACKWARD_WRS; max_qp_wr -= 1; if (max_qp_wr < RPCRDMA_MIN_SLOT_TABLE) @@ -281,7 +239,7 @@ int frwr_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep) if (ep->rep_attr.cap.max_send_wr > max_qp_wr) { ep->rep_max_requests = max_qp_wr / depth; if (!ep->rep_max_requests) - return -EINVAL; + return -ENOMEM; ep->rep_attr.cap.max_send_wr = ep->rep_max_requests * depth; } ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS; @@ -290,30 +248,22 @@ int frwr_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep) ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS; ep->rep_attr.cap.max_recv_wr += 1; /* for ib_drain_rq */ - ia->ri_max_segs = max_t(unsigned int, 1, RPCRDMA_MAX_DATA_SEGS / - ia->ri_max_frwr_depth); + ia->ri_max_rdma_segs = + DIV_ROUND_UP(RPCRDMA_MAX_DATA_SEGS, ia->ri_max_frwr_depth); /* Reply chunks require segments for head and tail buffers */ - ia->ri_max_segs += 2; - if (ia->ri_max_segs > RPCRDMA_MAX_HDR_SEGS) - ia->ri_max_segs = RPCRDMA_MAX_HDR_SEGS; - return 0; -} - -/** - * frwr_maxpages - Compute size of largest payload - * @r_xprt: transport - * - * Returns maximum size of an RPC message, in pages. - * - * FRWR mode conveys a list of pages per chunk segment. The - * maximum length of that list is the FRWR page list depth. - */ -size_t frwr_maxpages(struct rpcrdma_xprt *r_xprt) -{ - struct rpcrdma_ia *ia = &r_xprt->rx_ia; + ia->ri_max_rdma_segs += 2; + if (ia->ri_max_rdma_segs > RPCRDMA_MAX_HDR_SEGS) + ia->ri_max_rdma_segs = RPCRDMA_MAX_HDR_SEGS; + + /* Ensure the underlying device is capable of conveying the + * largest r/wsize NFS will ask for. This guarantees that + * failing over from one RDMA device to another will not + * break NFS I/O. + */ + if ((ia->ri_max_rdma_segs * ia->ri_max_frwr_depth) < RPCRDMA_MAX_SEGS) + return -ENOMEM; - return min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS, - (ia->ri_max_segs - 2) * ia->ri_max_frwr_depth); + return 0; } /** @@ -323,31 +273,25 @@ size_t frwr_maxpages(struct rpcrdma_xprt *r_xprt) * @nsegs: number of segments remaining * @writing: true when RDMA Write will be used * @xid: XID of RPC using the registered memory - * @out: initialized MR + * @mr: MR to fill in * * Prepare a REG_MR Work Request to register a memory region * for remote access via RDMA READ or RDMA WRITE. * * Returns the next segment or a negative errno pointer. - * On success, the prepared MR is planted in @out. + * On success, @mr is filled in. */ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, int nsegs, bool writing, __be32 xid, - struct rpcrdma_mr **out) + struct rpcrdma_mr *mr) { struct rpcrdma_ia *ia = &r_xprt->rx_ia; - bool holes_ok = ia->ri_mrtype == IB_MR_TYPE_SG_GAPS; - struct rpcrdma_mr *mr; - struct ib_mr *ibmr; struct ib_reg_wr *reg_wr; - int i, n; + int i, n, dma_nents; + struct ib_mr *ibmr; u8 key; - mr = rpcrdma_mr_get(r_xprt); - if (!mr) - goto out_getmr_err; - if (nsegs > ia->ri_max_frwr_depth) nsegs = ia->ri_max_frwr_depth; for (i = 0; i < nsegs;) { @@ -362,22 +306,23 @@ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt, ++seg; ++i; - if (holes_ok) + if (ia->ri_mrtype == IB_MR_TYPE_SG_GAPS) continue; if ((i < nsegs && offset_in_page(seg->mr_offset)) || offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len)) break; } mr->mr_dir = rpcrdma_data_dir(writing); + mr->mr_nents = i; - mr->mr_nents = - ib_dma_map_sg(ia->ri_id->device, mr->mr_sg, i, mr->mr_dir); - if (!mr->mr_nents) + dma_nents = ib_dma_map_sg(ia->ri_id->device, mr->mr_sg, mr->mr_nents, + mr->mr_dir); + if (!dma_nents) goto out_dmamap_err; ibmr = mr->frwr.fr_mr; - n = ib_map_mr_sg(ibmr, mr->mr_sg, mr->mr_nents, NULL, PAGE_SIZE); - if (unlikely(n != mr->mr_nents)) + n = ib_map_mr_sg(ibmr, mr->mr_sg, dma_nents, NULL, PAGE_SIZE); + if (n != dma_nents) goto out_mapmr_err; ibmr->iova &= 0x00000000ffffffff; @@ -397,22 +342,15 @@ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt, mr->mr_offset = ibmr->iova; trace_xprtrdma_mr_map(mr); - *out = mr; return seg; -out_getmr_err: - xprt_wait_for_buffer_space(&r_xprt->rx_xprt); - return ERR_PTR(-EAGAIN); - out_dmamap_err: mr->mr_dir = DMA_NONE; trace_xprtrdma_frwr_sgerr(mr, i); - rpcrdma_mr_put(mr); return ERR_PTR(-EIO); out_mapmr_err: trace_xprtrdma_frwr_maperr(mr, n); - rpcrdma_mr_recycle(mr); return ERR_PTR(-EIO); } @@ -449,7 +387,7 @@ int frwr_send(struct rpcrdma_ia *ia, struct rpcrdma_req *req) struct ib_send_wr *post_wr; struct rpcrdma_mr *mr; - post_wr = &req->rl_sendctx->sc_wr; + post_wr = &req->rl_wr; list_for_each_entry(mr, &req->rl_registered, mr_list) { struct rpcrdma_frwr *frwr; @@ -465,9 +403,6 @@ int frwr_send(struct rpcrdma_ia *ia, struct rpcrdma_req *req) post_wr = &frwr->fr_regwr.wr; } - /* If ib_post_send fails, the next ->send_request for - * @req will queue these MRs for recovery. - */ return ib_post_send(ia->ri_id->qp, post_wr, NULL); } @@ -485,7 +420,7 @@ void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs) if (mr->mr_handle == rep->rr_inv_rkey) { list_del_init(&mr->mr_list); trace_xprtrdma_mr_remoteinv(mr); - rpcrdma_mr_unmap_and_put(mr); + rpcrdma_mr_put(mr); break; /* only one invalidated MR per RPC */ } } @@ -493,9 +428,9 @@ void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs) static void __frwr_release_mr(struct ib_wc *wc, struct rpcrdma_mr *mr) { if (wc->status != IB_WC_SUCCESS) - rpcrdma_mr_recycle(mr); + frwr_mr_recycle(mr); else - rpcrdma_mr_unmap_and_put(mr); + rpcrdma_mr_put(mr); } /** @@ -532,8 +467,8 @@ static void frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc) /* WARNING: Only wr_cqe and status are reliable at this point */ trace_xprtrdma_wc_li_wake(wc, frwr); - complete(&frwr->fr_linv_done); __frwr_release_mr(wc, mr); + complete(&frwr->fr_linv_done); } /** @@ -562,8 +497,7 @@ void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) */ frwr = NULL; prev = &first; - while (!list_empty(&req->rl_registered)) { - mr = rpcrdma_mr_pop(&req->rl_registered); + while ((mr = rpcrdma_mr_pop(&req->rl_registered))) { trace_xprtrdma_mr_localinv(mr); r_xprt->rx_stats.local_inv_needed++; @@ -596,7 +530,6 @@ void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) */ bad_wr = NULL; rc = ib_post_send(r_xprt->rx_ia.ri_id->qp, first, &bad_wr); - trace_xprtrdma_post_send(req, rc); /* The final LOCAL_INV WR in the chain is supposed to * do the wake. If it was never posted, the wake will @@ -609,6 +542,7 @@ void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) /* Recycle MRs in the LOCAL_INV chain that did not get posted. */ + trace_xprtrdma_post_linv(req, rc); while (bad_wr) { frwr = container_of(bad_wr, struct rpcrdma_frwr, fr_invwr); @@ -616,7 +550,7 @@ void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) bad_wr = bad_wr->next; list_del_init(&mr->mr_list); - rpcrdma_mr_recycle(mr); + frwr_mr_recycle(mr); } } @@ -632,11 +566,15 @@ static void frwr_wc_localinv_done(struct ib_cq *cq, struct ib_wc *wc) struct rpcrdma_frwr *frwr = container_of(cqe, struct rpcrdma_frwr, fr_cqe); struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr); + struct rpcrdma_rep *rep = mr->mr_req->rl_reply; /* WARNING: Only wr_cqe and status are reliable at this point */ trace_xprtrdma_wc_li_done(wc, frwr); - rpcrdma_complete_rqst(frwr->fr_req->rl_reply); __frwr_release_mr(wc, mr); + + /* Ensure @rep is generated before __frwr_release_mr */ + smp_rmb(); + rpcrdma_complete_rqst(rep); } /** @@ -662,15 +600,13 @@ void frwr_unmap_async(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) */ frwr = NULL; prev = &first; - while (!list_empty(&req->rl_registered)) { - mr = rpcrdma_mr_pop(&req->rl_registered); + while ((mr = rpcrdma_mr_pop(&req->rl_registered))) { trace_xprtrdma_mr_localinv(mr); r_xprt->rx_stats.local_inv_needed++; frwr = &mr->frwr; frwr->fr_cqe.done = frwr_wc_localinv; - frwr->fr_req = req; last = &frwr->fr_invwr; last->next = NULL; last->wr_cqe = &frwr->fr_cqe; @@ -697,18 +633,18 @@ void frwr_unmap_async(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) */ bad_wr = NULL; rc = ib_post_send(r_xprt->rx_ia.ri_id->qp, first, &bad_wr); - trace_xprtrdma_post_send(req, rc); if (!rc) return; /* Recycle MRs in the LOCAL_INV chain that did not get posted. */ + trace_xprtrdma_post_linv(req, rc); while (bad_wr) { frwr = container_of(bad_wr, struct rpcrdma_frwr, fr_invwr); mr = container_of(frwr, struct rpcrdma_mr, frwr); bad_wr = bad_wr->next; - rpcrdma_mr_recycle(mr); + frwr_mr_recycle(mr); } /* The final LOCAL_INV WR in the chain is supposed to diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index 4345e6912392..28020ec104d4 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -78,8 +78,6 @@ static unsigned int rpcrdma_max_call_header_size(unsigned int maxsegs) size += rpcrdma_segment_maxsz * sizeof(__be32); size += sizeof(__be32); /* list discriminator */ - dprintk("RPC: %s: max call header size = %u\n", - __func__, size); return size; } @@ -100,8 +98,6 @@ static unsigned int rpcrdma_max_reply_header_size(unsigned int maxsegs) size += maxsegs * rpcrdma_segment_maxsz * sizeof(__be32); size += sizeof(__be32); /* list discriminator */ - dprintk("RPC: %s: max reply header size = %u\n", - __func__, size); return size; } @@ -115,7 +111,7 @@ static unsigned int rpcrdma_max_reply_header_size(unsigned int maxsegs) */ void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *r_xprt) { - unsigned int maxsegs = r_xprt->rx_ia.ri_max_segs; + unsigned int maxsegs = r_xprt->rx_ia.ri_max_rdma_segs; struct rpcrdma_ep *ep = &r_xprt->rx_ep; ep->rep_max_inline_send = @@ -149,7 +145,7 @@ static bool rpcrdma_args_inline(struct rpcrdma_xprt *r_xprt, remaining -= min_t(unsigned int, PAGE_SIZE - offset, remaining); offset = 0; - if (++count > r_xprt->rx_ia.ri_max_send_sges) + if (++count > r_xprt->rx_ep.rep_attr.cap.max_send_sge) return false; } } @@ -342,6 +338,31 @@ encode_read_segment(struct xdr_stream *xdr, struct rpcrdma_mr *mr, return 0; } +static struct rpcrdma_mr_seg *rpcrdma_mr_prepare(struct rpcrdma_xprt *r_xprt, + struct rpcrdma_req *req, + struct rpcrdma_mr_seg *seg, + int nsegs, bool writing, + struct rpcrdma_mr **mr) +{ + *mr = rpcrdma_mr_pop(&req->rl_free_mrs); + if (!*mr) { + *mr = rpcrdma_mr_get(r_xprt); + if (!*mr) + goto out_getmr_err; + trace_xprtrdma_mr_get(req); + (*mr)->mr_req = req; + } + + rpcrdma_mr_push(*mr, &req->rl_registered); + return frwr_map(r_xprt, seg, nsegs, writing, req->rl_slot.rq_xid, *mr); + +out_getmr_err: + trace_xprtrdma_nomrs(req); + xprt_wait_for_buffer_space(&r_xprt->rx_xprt); + rpcrdma_mrs_refresh(r_xprt); + return ERR_PTR(-EAGAIN); +} + /* Register and XDR encode the Read list. Supports encoding a list of read * segments that belong to a single read chunk. * @@ -356,9 +377,10 @@ encode_read_segment(struct xdr_stream *xdr, struct rpcrdma_mr *mr, * * Only a single @pos value is currently supported. */ -static noinline int -rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, - struct rpc_rqst *rqst, enum rpcrdma_chunktype rtype) +static int rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, + struct rpcrdma_req *req, + struct rpc_rqst *rqst, + enum rpcrdma_chunktype rtype) { struct xdr_stream *xdr = &req->rl_stream; struct rpcrdma_mr_seg *seg; @@ -366,7 +388,7 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, unsigned int pos; int nsegs; - if (rtype == rpcrdma_noch) + if (rtype == rpcrdma_noch_pullup || rtype == rpcrdma_noch_mapped) goto done; pos = rqst->rq_snd_buf.head[0].iov_len; @@ -379,10 +401,9 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, return nsegs; do { - seg = frwr_map(r_xprt, seg, nsegs, false, rqst->rq_xid, &mr); + seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, false, &mr); if (IS_ERR(seg)) return PTR_ERR(seg); - rpcrdma_mr_push(mr, &req->rl_registered); if (encode_read_segment(xdr, mr, pos) < 0) return -EMSGSIZE; @@ -411,9 +432,10 @@ done: * * Only a single Write chunk is currently supported. */ -static noinline int -rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, - struct rpc_rqst *rqst, enum rpcrdma_chunktype wtype) +static int rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, + struct rpcrdma_req *req, + struct rpc_rqst *rqst, + enum rpcrdma_chunktype wtype) { struct xdr_stream *xdr = &req->rl_stream; struct rpcrdma_mr_seg *seg; @@ -440,10 +462,9 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, nchunks = 0; do { - seg = frwr_map(r_xprt, seg, nsegs, true, rqst->rq_xid, &mr); + seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, true, &mr); if (IS_ERR(seg)) return PTR_ERR(seg); - rpcrdma_mr_push(mr, &req->rl_registered); if (encode_rdma_segment(xdr, mr) < 0) return -EMSGSIZE; @@ -474,9 +495,10 @@ done: * Returns zero on success, or a negative errno if a failure occurred. * @xdr is advanced to the next position in the stream. */ -static noinline int -rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, - struct rpc_rqst *rqst, enum rpcrdma_chunktype wtype) +static int rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, + struct rpcrdma_req *req, + struct rpc_rqst *rqst, + enum rpcrdma_chunktype wtype) { struct xdr_stream *xdr = &req->rl_stream; struct rpcrdma_mr_seg *seg; @@ -501,10 +523,9 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, nchunks = 0; do { - seg = frwr_map(r_xprt, seg, nsegs, true, rqst->rq_xid, &mr); + seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, true, &mr); if (IS_ERR(seg)) return PTR_ERR(seg); - rpcrdma_mr_push(mr, &req->rl_registered); if (encode_rdma_segment(xdr, mr) < 0) return -EMSGSIZE; @@ -539,6 +560,7 @@ static void rpcrdma_sendctx_done(struct kref *kref) */ void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc) { + struct rpcrdma_regbuf *rb = sc->sc_req->rl_sendbuf; struct ib_sge *sge; if (!sc->sc_unmap_count) @@ -550,7 +572,7 @@ void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc) */ for (sge = &sc->sc_sges[2]; sc->sc_unmap_count; ++sge, --sc->sc_unmap_count) - ib_dma_unmap_page(sc->sc_device, sge->addr, sge->length, + ib_dma_unmap_page(rdmab_device(rb), sge->addr, sge->length, DMA_TO_DEVICE); kref_put(&sc->sc_req->rl_kref, rpcrdma_sendctx_done); @@ -558,152 +580,228 @@ void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc) /* Prepare an SGE for the RPC-over-RDMA transport header. */ -static bool rpcrdma_prepare_hdr_sge(struct rpcrdma_xprt *r_xprt, +static void rpcrdma_prepare_hdr_sge(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, u32 len) { struct rpcrdma_sendctx *sc = req->rl_sendctx; struct rpcrdma_regbuf *rb = req->rl_rdmabuf; - struct ib_sge *sge = sc->sc_sges; + struct ib_sge *sge = &sc->sc_sges[req->rl_wr.num_sge++]; + + sge->addr = rdmab_addr(rb); + sge->length = len; + sge->lkey = rdmab_lkey(rb); + + ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr, sge->length, + DMA_TO_DEVICE); +} + +/* The head iovec is straightforward, as it is usually already + * DMA-mapped. Sync the content that has changed. + */ +static bool rpcrdma_prepare_head_iov(struct rpcrdma_xprt *r_xprt, + struct rpcrdma_req *req, unsigned int len) +{ + struct rpcrdma_sendctx *sc = req->rl_sendctx; + struct ib_sge *sge = &sc->sc_sges[req->rl_wr.num_sge++]; + struct rpcrdma_regbuf *rb = req->rl_sendbuf; if (!rpcrdma_regbuf_dma_map(r_xprt, rb)) - goto out_regbuf; + return false; + sge->addr = rdmab_addr(rb); sge->length = len; sge->lkey = rdmab_lkey(rb); ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr, sge->length, DMA_TO_DEVICE); - sc->sc_wr.num_sge++; return true; +} -out_regbuf: - pr_err("rpcrdma: failed to DMA map a Send buffer\n"); +/* If there is a page list present, DMA map and prepare an + * SGE for each page to be sent. + */ +static bool rpcrdma_prepare_pagelist(struct rpcrdma_req *req, + struct xdr_buf *xdr) +{ + struct rpcrdma_sendctx *sc = req->rl_sendctx; + struct rpcrdma_regbuf *rb = req->rl_sendbuf; + unsigned int page_base, len, remaining; + struct page **ppages; + struct ib_sge *sge; + + ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT); + page_base = offset_in_page(xdr->page_base); + remaining = xdr->page_len; + while (remaining) { + sge = &sc->sc_sges[req->rl_wr.num_sge++]; + len = min_t(unsigned int, PAGE_SIZE - page_base, remaining); + sge->addr = ib_dma_map_page(rdmab_device(rb), *ppages, + page_base, len, DMA_TO_DEVICE); + if (ib_dma_mapping_error(rdmab_device(rb), sge->addr)) + goto out_mapping_err; + + sge->length = len; + sge->lkey = rdmab_lkey(rb); + + sc->sc_unmap_count++; + ppages++; + remaining -= len; + page_base = 0; + } + + return true; + +out_mapping_err: + trace_xprtrdma_dma_maperr(sge->addr); return false; } -/* Prepare the Send SGEs. The head and tail iovec, and each entry - * in the page list, gets its own SGE. +/* The tail iovec may include an XDR pad for the page list, + * as well as additional content, and may not reside in the + * same page as the head iovec. */ -static bool rpcrdma_prepare_msg_sges(struct rpcrdma_xprt *r_xprt, - struct rpcrdma_req *req, +static bool rpcrdma_prepare_tail_iov(struct rpcrdma_req *req, struct xdr_buf *xdr, - enum rpcrdma_chunktype rtype) + unsigned int page_base, unsigned int len) { struct rpcrdma_sendctx *sc = req->rl_sendctx; - unsigned int sge_no, page_base, len, remaining; + struct ib_sge *sge = &sc->sc_sges[req->rl_wr.num_sge++]; struct rpcrdma_regbuf *rb = req->rl_sendbuf; - struct ib_sge *sge = sc->sc_sges; - struct page *page, **ppages; + struct page *page = virt_to_page(xdr->tail[0].iov_base); - /* The head iovec is straightforward, as it is already - * DMA-mapped. Sync the content that has changed. - */ - if (!rpcrdma_regbuf_dma_map(r_xprt, rb)) - goto out_regbuf; - sc->sc_device = rdmab_device(rb); - sge_no = 1; - sge[sge_no].addr = rdmab_addr(rb); - sge[sge_no].length = xdr->head[0].iov_len; - sge[sge_no].lkey = rdmab_lkey(rb); - ib_dma_sync_single_for_device(rdmab_device(rb), sge[sge_no].addr, - sge[sge_no].length, DMA_TO_DEVICE); + sge->addr = ib_dma_map_page(rdmab_device(rb), page, page_base, len, + DMA_TO_DEVICE); + if (ib_dma_mapping_error(rdmab_device(rb), sge->addr)) + goto out_mapping_err; - /* If there is a Read chunk, the page list is being handled - * via explicit RDMA, and thus is skipped here. However, the - * tail iovec may include an XDR pad for the page list, as - * well as additional content, and may not reside in the - * same page as the head iovec. - */ - if (rtype == rpcrdma_readch) { - len = xdr->tail[0].iov_len; + sge->length = len; + sge->lkey = rdmab_lkey(rb); + ++sc->sc_unmap_count; + return true; - /* Do not include the tail if it is only an XDR pad */ - if (len < 4) - goto out; +out_mapping_err: + trace_xprtrdma_dma_maperr(sge->addr); + return false; +} - page = virt_to_page(xdr->tail[0].iov_base); - page_base = offset_in_page(xdr->tail[0].iov_base); +/* Copy the tail to the end of the head buffer. + */ +static void rpcrdma_pullup_tail_iov(struct rpcrdma_xprt *r_xprt, + struct rpcrdma_req *req, + struct xdr_buf *xdr) +{ + unsigned char *dst; - /* If the content in the page list is an odd length, - * xdr_write_pages() has added a pad at the beginning - * of the tail iovec. Force the tail's non-pad content - * to land at the next XDR position in the Send message. - */ - page_base += len & 3; - len -= len & 3; - goto map_tail; - } + dst = (unsigned char *)xdr->head[0].iov_base; + dst += xdr->head[0].iov_len + xdr->page_len; + memmove(dst, xdr->tail[0].iov_base, xdr->tail[0].iov_len); + r_xprt->rx_stats.pullup_copy_count += xdr->tail[0].iov_len; +} - /* If there is a page list present, temporarily DMA map - * and prepare an SGE for each page to be sent. - */ - if (xdr->page_len) { - ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT); - page_base = offset_in_page(xdr->page_base); - remaining = xdr->page_len; - while (remaining) { - sge_no++; - if (sge_no > RPCRDMA_MAX_SEND_SGES - 2) - goto out_mapping_overflow; - - len = min_t(u32, PAGE_SIZE - page_base, remaining); - sge[sge_no].addr = - ib_dma_map_page(rdmab_device(rb), *ppages, - page_base, len, DMA_TO_DEVICE); - if (ib_dma_mapping_error(rdmab_device(rb), - sge[sge_no].addr)) - goto out_mapping_err; - sge[sge_no].length = len; - sge[sge_no].lkey = rdmab_lkey(rb); - - sc->sc_unmap_count++; - ppages++; - remaining -= len; - page_base = 0; - } +/* Copy pagelist content into the head buffer. + */ +static void rpcrdma_pullup_pagelist(struct rpcrdma_xprt *r_xprt, + struct rpcrdma_req *req, + struct xdr_buf *xdr) +{ + unsigned int len, page_base, remaining; + struct page **ppages; + unsigned char *src, *dst; + + dst = (unsigned char *)xdr->head[0].iov_base; + dst += xdr->head[0].iov_len; + ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT); + page_base = offset_in_page(xdr->page_base); + remaining = xdr->page_len; + while (remaining) { + src = page_address(*ppages); + src += page_base; + len = min_t(unsigned int, PAGE_SIZE - page_base, remaining); + memcpy(dst, src, len); + r_xprt->rx_stats.pullup_copy_count += len; + + ppages++; + dst += len; + remaining -= len; + page_base = 0; } +} - /* The tail iovec is not always constructed in the same - * page where the head iovec resides (see, for example, - * gss_wrap_req_priv). To neatly accommodate that case, - * DMA map it separately. - */ - if (xdr->tail[0].iov_len) { - page = virt_to_page(xdr->tail[0].iov_base); - page_base = offset_in_page(xdr->tail[0].iov_base); - len = xdr->tail[0].iov_len; +/* Copy the contents of @xdr into @rl_sendbuf and DMA sync it. + * When the head, pagelist, and tail are small, a pull-up copy + * is considerably less costly than DMA mapping the components + * of @xdr. + * + * Assumptions: + * - the caller has already verified that the total length + * of the RPC Call body will fit into @rl_sendbuf. + */ +static bool rpcrdma_prepare_noch_pullup(struct rpcrdma_xprt *r_xprt, + struct rpcrdma_req *req, + struct xdr_buf *xdr) +{ + if (unlikely(xdr->tail[0].iov_len)) + rpcrdma_pullup_tail_iov(r_xprt, req, xdr); -map_tail: - sge_no++; - sge[sge_no].addr = - ib_dma_map_page(rdmab_device(rb), page, page_base, len, - DMA_TO_DEVICE); - if (ib_dma_mapping_error(rdmab_device(rb), sge[sge_no].addr)) - goto out_mapping_err; - sge[sge_no].length = len; - sge[sge_no].lkey = rdmab_lkey(rb); - sc->sc_unmap_count++; - } + if (unlikely(xdr->page_len)) + rpcrdma_pullup_pagelist(r_xprt, req, xdr); -out: - sc->sc_wr.num_sge += sge_no; - if (sc->sc_unmap_count) + /* The whole RPC message resides in the head iovec now */ + return rpcrdma_prepare_head_iov(r_xprt, req, xdr->len); +} + +static bool rpcrdma_prepare_noch_mapped(struct rpcrdma_xprt *r_xprt, + struct rpcrdma_req *req, + struct xdr_buf *xdr) +{ + struct kvec *tail = &xdr->tail[0]; + + if (!rpcrdma_prepare_head_iov(r_xprt, req, xdr->head[0].iov_len)) + return false; + if (xdr->page_len) + if (!rpcrdma_prepare_pagelist(req, xdr)) + return false; + if (tail->iov_len) + if (!rpcrdma_prepare_tail_iov(req, xdr, + offset_in_page(tail->iov_base), + tail->iov_len)) + return false; + + if (req->rl_sendctx->sc_unmap_count) kref_get(&req->rl_kref); return true; +} -out_regbuf: - pr_err("rpcrdma: failed to DMA map a Send buffer\n"); - return false; +static bool rpcrdma_prepare_readch(struct rpcrdma_xprt *r_xprt, + struct rpcrdma_req *req, + struct xdr_buf *xdr) +{ + if (!rpcrdma_prepare_head_iov(r_xprt, req, xdr->head[0].iov_len)) + return false; -out_mapping_overflow: - rpcrdma_sendctx_unmap(sc); - pr_err("rpcrdma: too many Send SGEs (%u)\n", sge_no); - return false; + /* If there is a Read chunk, the page list is being handled + * via explicit RDMA, and thus is skipped here. + */ -out_mapping_err: - rpcrdma_sendctx_unmap(sc); - trace_xprtrdma_dma_maperr(sge[sge_no].addr); - return false; + /* Do not include the tail if it is only an XDR pad */ + if (xdr->tail[0].iov_len > 3) { + unsigned int page_base, len; + + /* If the content in the page list is an odd length, + * xdr_write_pages() adds a pad at the beginning of + * the tail iovec. Force the tail's non-pad content to + * land at the next XDR position in the Send message. + */ + page_base = offset_in_page(xdr->tail[0].iov_base); + len = xdr->tail[0].iov_len; + page_base += len & 3; + len -= len & 3; + if (!rpcrdma_prepare_tail_iov(req, xdr, page_base, len)) + return false; + kref_get(&req->rl_kref); + } + + return true; } /** @@ -716,31 +814,52 @@ out_mapping_err: * * Returns 0 on success; otherwise a negative errno is returned. */ -int -rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt, - struct rpcrdma_req *req, u32 hdrlen, - struct xdr_buf *xdr, enum rpcrdma_chunktype rtype) +inline int rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt, + struct rpcrdma_req *req, u32 hdrlen, + struct xdr_buf *xdr, + enum rpcrdma_chunktype rtype) { int ret; ret = -EAGAIN; req->rl_sendctx = rpcrdma_sendctx_get_locked(r_xprt); if (!req->rl_sendctx) - goto err; - req->rl_sendctx->sc_wr.num_sge = 0; + goto out_nosc; req->rl_sendctx->sc_unmap_count = 0; req->rl_sendctx->sc_req = req; kref_init(&req->rl_kref); + req->rl_wr.wr_cqe = &req->rl_sendctx->sc_cqe; + req->rl_wr.sg_list = req->rl_sendctx->sc_sges; + req->rl_wr.num_sge = 0; + req->rl_wr.opcode = IB_WR_SEND; + + rpcrdma_prepare_hdr_sge(r_xprt, req, hdrlen); ret = -EIO; - if (!rpcrdma_prepare_hdr_sge(r_xprt, req, hdrlen)) - goto err; - if (rtype != rpcrdma_areadch) - if (!rpcrdma_prepare_msg_sges(r_xprt, req, xdr, rtype)) - goto err; + switch (rtype) { + case rpcrdma_noch_pullup: + if (!rpcrdma_prepare_noch_pullup(r_xprt, req, xdr)) + goto out_unmap; + break; + case rpcrdma_noch_mapped: + if (!rpcrdma_prepare_noch_mapped(r_xprt, req, xdr)) + goto out_unmap; + break; + case rpcrdma_readch: + if (!rpcrdma_prepare_readch(r_xprt, req, xdr)) + goto out_unmap; + break; + case rpcrdma_areadch: + break; + default: + goto out_unmap; + } + return 0; -err: +out_unmap: + rpcrdma_sendctx_unmap(req->rl_sendctx); +out_nosc: trace_xprtrdma_prepsend_failed(&req->rl_slot, ret); return ret; } @@ -770,6 +889,7 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst) struct rpcrdma_req *req = rpcr_to_rdmar(rqst); struct xdr_stream *xdr = &req->rl_stream; enum rpcrdma_chunktype rtype, wtype; + struct xdr_buf *buf = &rqst->rq_snd_buf; bool ddp_allowed; __be32 *p; int ret; @@ -785,7 +905,7 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst) goto out_err; *p++ = rqst->rq_xid; *p++ = rpcrdma_version; - *p++ = cpu_to_be32(r_xprt->rx_buf.rb_max_requests); + *p++ = r_xprt->rx_buf.rb_max_requests; /* When the ULP employs a GSS flavor that guarantees integrity * or privacy, direct data placement of individual data items @@ -827,8 +947,9 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst) */ if (rpcrdma_args_inline(r_xprt, rqst)) { *p++ = rdma_msg; - rtype = rpcrdma_noch; - } else if (ddp_allowed && rqst->rq_snd_buf.flags & XDRBUF_WRITE) { + rtype = buf->len < rdmab_length(req->rl_sendbuf) ? + rpcrdma_noch_pullup : rpcrdma_noch_mapped; + } else if (ddp_allowed && buf->flags & XDRBUF_WRITE) { *p++ = rdma_msg; rtype = rpcrdma_readch; } else { @@ -837,17 +958,6 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst) rtype = rpcrdma_areadch; } - /* If this is a retransmit, discard previously registered - * chunks. Very likely the connection has been replaced, - * so these registrations are invalid and unusable. - */ - while (unlikely(!list_empty(&req->rl_registered))) { - struct rpcrdma_mr *mr; - - mr = rpcrdma_mr_pop(&req->rl_registered); - rpcrdma_mr_recycle(mr); - } - /* This implementation supports the following combinations * of chunk lists in one RPC-over-RDMA Call message: * @@ -881,7 +991,7 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst) goto out_err; ret = rpcrdma_prepare_send_sges(r_xprt, req, req->rl_hdrbuf.len, - &rqst->rq_snd_buf, rtype); + buf, rtype); if (ret) goto out_err; @@ -895,6 +1005,40 @@ out_err: return ret; } +static void __rpcrdma_update_cwnd_locked(struct rpc_xprt *xprt, + struct rpcrdma_buffer *buf, + u32 grant) +{ + buf->rb_credits = grant; + xprt->cwnd = grant << RPC_CWNDSHIFT; +} + +static void rpcrdma_update_cwnd(struct rpcrdma_xprt *r_xprt, u32 grant) +{ + struct rpc_xprt *xprt = &r_xprt->rx_xprt; + + spin_lock(&xprt->transport_lock); + __rpcrdma_update_cwnd_locked(xprt, &r_xprt->rx_buf, grant); + spin_unlock(&xprt->transport_lock); +} + +/** + * rpcrdma_reset_cwnd - Reset the xprt's congestion window + * @r_xprt: controlling transport instance + * + * Prepare @r_xprt for the next connection by reinitializing + * its credit grant to one (see RFC 8166, Section 3.3.3). + */ +void rpcrdma_reset_cwnd(struct rpcrdma_xprt *r_xprt) +{ + struct rpc_xprt *xprt = &r_xprt->rx_xprt; + + spin_lock(&xprt->transport_lock); + xprt->cong = 0; + __rpcrdma_update_cwnd_locked(xprt, &r_xprt->rx_buf, 1); + spin_unlock(&xprt->transport_lock); +} + /** * rpcrdma_inline_fixup - Scatter inline received data into rqst's iovecs * @rqst: controlling RPC request @@ -934,7 +1078,6 @@ rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad) curlen = rqst->rq_rcv_buf.head[0].iov_len; if (curlen > copy_len) curlen = copy_len; - trace_xprtrdma_fixup(rqst, copy_len, curlen); srcp += curlen; copy_len -= curlen; @@ -954,8 +1097,6 @@ rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad) if (curlen > pagelist_len) curlen = pagelist_len; - trace_xprtrdma_fixup_pg(rqst, i, srcp, - copy_len, curlen); destp = kmap_atomic(ppages[i]); memcpy(destp + page_base, srcp, curlen); flush_dcache_page(ppages[i]); @@ -987,6 +1128,8 @@ rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad) rqst->rq_private_buf.tail[0].iov_base = srcp; } + if (fixup_copy_count) + trace_xprtrdma_fixup(rqst, fixup_copy_count); return fixup_copy_count; } @@ -1240,8 +1383,6 @@ void rpcrdma_complete_rqst(struct rpcrdma_rep *rep) struct rpc_rqst *rqst = rep->rr_rqst; int status; - xprt->reestablish_timeout = 0; - switch (rep->rr_proc) { case rdma_msg: status = rpcrdma_decode_msg(r_xprt, rep, rqst); @@ -1300,6 +1441,12 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep) u32 credits; __be32 *p; + /* Any data means we had a useful conversation, so + * then we don't need to delay the next reconnect. + */ + if (xprt->reestablish_timeout) + xprt->reestablish_timeout = 0; + /* Fixed transport header fields */ xdr_init_decode(&rep->rr_stream, &rep->rr_hdrbuf, rep->rr_hdrbuf.head[0].iov_base, NULL); @@ -1329,14 +1476,11 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep) if (credits == 0) credits = 1; /* don't deadlock */ - else if (credits > buf->rb_max_requests) - credits = buf->rb_max_requests; - if (buf->rb_credits != credits) { - spin_lock(&xprt->transport_lock); - buf->rb_credits = credits; - xprt->cwnd = credits << RPC_CWNDSHIFT; - spin_unlock(&xprt->transport_lock); - } + else if (credits > r_xprt->rx_ep.rep_max_requests) + credits = r_xprt->rx_ep.rep_max_requests; + if (buf->rb_credits != credits) + rpcrdma_update_cwnd(r_xprt, credits); + rpcrdma_post_recvs(r_xprt, false); req = rpcr_to_rdmar(rqst); if (req->rl_reply) { diff --git a/net/sunrpc/xprtrdma/svc_rdma.c b/net/sunrpc/xprtrdma/svc_rdma.c index abdb3004a1e3..97bca509a391 100644 --- a/net/sunrpc/xprtrdma/svc_rdma.c +++ b/net/sunrpc/xprtrdma/svc_rdma.c @@ -73,8 +73,6 @@ atomic_t rdma_stat_rq_prod; atomic_t rdma_stat_sq_poll; atomic_t rdma_stat_sq_prod; -struct workqueue_struct *svc_rdma_wq; - /* * This function implements reading and resetting an atomic_t stat * variable through read/write to a proc file. Any write to the file @@ -230,7 +228,6 @@ static struct ctl_table svcrdma_root_table[] = { void svc_rdma_cleanup(void) { dprintk("SVCRDMA Module Removed, deregister RPC RDMA transport\n"); - destroy_workqueue(svc_rdma_wq); if (svcrdma_table_header) { unregister_sysctl_table(svcrdma_table_header); svcrdma_table_header = NULL; @@ -246,10 +243,6 @@ int svc_rdma_init(void) dprintk("\tmax_bc_requests : %u\n", svcrdma_max_bc_requests); dprintk("\tmax_inline : %d\n", svcrdma_max_req_size); - svc_rdma_wq = alloc_workqueue("svc_rdma", 0, 0); - if (!svc_rdma_wq) - return -ENOMEM; - if (!svcrdma_table_header) svcrdma_table_header = register_sysctl_table(svcrdma_root_table); diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c index d1fcc41d5eb5..908e78bb87c6 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c +++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c @@ -195,6 +195,7 @@ rpcrdma_bc_send_request(struct svcxprt_rdma *rdma, struct rpc_rqst *rqst) pr_info("%s: %*ph\n", __func__, 64, rqst->rq_buffer); #endif + rqst->rq_xtime = ktime_get(); rc = svc_rdma_bc_sendto(rdma, rqst, ctxt); if (rc) { svc_rdma_send_ctxt_put(rdma, ctxt); diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c index 65e2fb9aac65..96bccd398469 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c +++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c @@ -172,9 +172,10 @@ static void svc_rdma_recv_ctxt_destroy(struct svcxprt_rdma *rdma, void svc_rdma_recv_ctxts_destroy(struct svcxprt_rdma *rdma) { struct svc_rdma_recv_ctxt *ctxt; + struct llist_node *node; - while ((ctxt = svc_rdma_next_recv_ctxt(&rdma->sc_recv_ctxts))) { - list_del(&ctxt->rc_list); + while ((node = llist_del_first(&rdma->sc_recv_ctxts))) { + ctxt = llist_entry(node, struct svc_rdma_recv_ctxt, rc_node); svc_rdma_recv_ctxt_destroy(rdma, ctxt); } } @@ -183,21 +184,18 @@ static struct svc_rdma_recv_ctxt * svc_rdma_recv_ctxt_get(struct svcxprt_rdma *rdma) { struct svc_rdma_recv_ctxt *ctxt; + struct llist_node *node; - spin_lock(&rdma->sc_recv_lock); - ctxt = svc_rdma_next_recv_ctxt(&rdma->sc_recv_ctxts); - if (!ctxt) + node = llist_del_first(&rdma->sc_recv_ctxts); + if (!node) goto out_empty; - list_del(&ctxt->rc_list); - spin_unlock(&rdma->sc_recv_lock); + ctxt = llist_entry(node, struct svc_rdma_recv_ctxt, rc_node); out: ctxt->rc_page_count = 0; return ctxt; out_empty: - spin_unlock(&rdma->sc_recv_lock); - ctxt = svc_rdma_recv_ctxt_alloc(rdma); if (!ctxt) return NULL; @@ -218,11 +216,9 @@ void svc_rdma_recv_ctxt_put(struct svcxprt_rdma *rdma, for (i = 0; i < ctxt->rc_page_count; i++) put_page(ctxt->rc_pages[i]); - if (!ctxt->rc_temp) { - spin_lock(&rdma->sc_recv_lock); - list_add(&ctxt->rc_list, &rdma->sc_recv_ctxts); - spin_unlock(&rdma->sc_recv_lock); - } else + if (!ctxt->rc_temp) + llist_add(&ctxt->rc_node, &rdma->sc_recv_ctxts); + else svc_rdma_recv_ctxt_destroy(rdma, ctxt); } diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c index 6fdba72f89f4..f3f108090aa4 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c +++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c @@ -233,11 +233,15 @@ void svc_rdma_send_ctxt_put(struct svcxprt_rdma *rdma, /* The first SGE contains the transport header, which * remains mapped until @ctxt is destroyed. */ - for (i = 1; i < ctxt->sc_send_wr.num_sge; i++) + for (i = 1; i < ctxt->sc_send_wr.num_sge; i++) { ib_dma_unmap_page(device, ctxt->sc_sges[i].addr, ctxt->sc_sges[i].length, DMA_TO_DEVICE); + trace_svcrdma_dma_unmap_page(rdma, + ctxt->sc_sges[i].addr, + ctxt->sc_sges[i].length); + } for (i = 0; i < ctxt->sc_page_count; ++i) put_page(ctxt->sc_pages[i]); @@ -490,6 +494,7 @@ static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma, dma_addr_t dma_addr; dma_addr = ib_dma_map_page(dev, page, offset, len, DMA_TO_DEVICE); + trace_svcrdma_dma_map_page(rdma, dma_addr, len); if (ib_dma_mapping_error(dev, dma_addr)) goto out_maperr; @@ -499,7 +504,6 @@ static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma, return 0; out_maperr: - trace_svcrdma_dma_map_page(rdma, page); return -EIO; } diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c index 3fe665152d95..145a3615c319 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_transport.c +++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c @@ -140,14 +140,13 @@ static struct svcxprt_rdma *svc_rdma_create_xprt(struct svc_serv *serv, INIT_LIST_HEAD(&cma_xprt->sc_rq_dto_q); INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q); INIT_LIST_HEAD(&cma_xprt->sc_send_ctxts); - INIT_LIST_HEAD(&cma_xprt->sc_recv_ctxts); + init_llist_head(&cma_xprt->sc_recv_ctxts); INIT_LIST_HEAD(&cma_xprt->sc_rw_ctxts); init_waitqueue_head(&cma_xprt->sc_send_wait); spin_lock_init(&cma_xprt->sc_lock); spin_lock_init(&cma_xprt->sc_rq_dto_lock); spin_lock_init(&cma_xprt->sc_send_lock); - spin_lock_init(&cma_xprt->sc_recv_lock); spin_lock_init(&cma_xprt->sc_rw_ctxt_lock); /* @@ -454,14 +453,14 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt) dprintk("svcrdma: error creating PD for connect request\n"); goto errout; } - newxprt->sc_sq_cq = ib_alloc_cq(dev, newxprt, newxprt->sc_sq_depth, - 0, IB_POLL_WORKQUEUE); + newxprt->sc_sq_cq = ib_alloc_cq_any(dev, newxprt, newxprt->sc_sq_depth, + IB_POLL_WORKQUEUE); if (IS_ERR(newxprt->sc_sq_cq)) { dprintk("svcrdma: error creating SQ CQ for connect request\n"); goto errout; } - newxprt->sc_rq_cq = ib_alloc_cq(dev, newxprt, rq_depth, - 0, IB_POLL_WORKQUEUE); + newxprt->sc_rq_cq = + ib_alloc_cq_any(dev, newxprt, rq_depth, IB_POLL_WORKQUEUE); if (IS_ERR(newxprt->sc_rq_cq)) { dprintk("svcrdma: error creating RQ CQ for connect request\n"); goto errout; @@ -630,8 +629,9 @@ static void svc_rdma_free(struct svc_xprt *xprt) { struct svcxprt_rdma *rdma = container_of(xprt, struct svcxprt_rdma, sc_xprt); + INIT_WORK(&rdma->sc_work, __svc_rdma_free); - queue_work(svc_rdma_wq, &rdma->sc_work); + schedule_work(&rdma->sc_work); } static int svc_rdma_has_wspace(struct svc_xprt *xprt) diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c index 2ec349ed4770..3cfeba68ee9a 100644 --- a/net/sunrpc/xprtrdma/transport.c +++ b/net/sunrpc/xprtrdma/transport.c @@ -243,16 +243,13 @@ xprt_rdma_connect_worker(struct work_struct *work) rc = rpcrdma_ep_connect(&r_xprt->rx_ep, &r_xprt->rx_ia); xprt_clear_connecting(xprt); if (r_xprt->rx_ep.rep_connected > 0) { - if (!xprt_test_and_set_connected(xprt)) { - xprt->stat.connect_count++; - xprt->stat.connect_time += (long)jiffies - - xprt->stat.connect_start; - xprt_wake_pending_tasks(xprt, -EAGAIN); - } - } else { - if (xprt_test_and_clear_connected(xprt)) - xprt_wake_pending_tasks(xprt, rc); + xprt->stat.connect_count++; + xprt->stat.connect_time += (long)jiffies - + xprt->stat.connect_start; + xprt_set_connected(xprt); + rc = -EAGAIN; } + xprt_wake_pending_tasks(xprt, rc); } /** @@ -319,7 +316,8 @@ xprt_setup_rdma(struct xprt_create *args) if (args->addrlen > sizeof(xprt->addr)) return ERR_PTR(-EBADF); - xprt = xprt_alloc(args->net, sizeof(struct rpcrdma_xprt), 0, 0); + xprt = xprt_alloc(args->net, sizeof(struct rpcrdma_xprt), 0, + xprt_rdma_slot_table_entries); if (!xprt) return ERR_PTR(-ENOMEM); @@ -361,19 +359,13 @@ xprt_setup_rdma(struct xprt_create *args) if (rc) goto out3; - INIT_DELAYED_WORK(&new_xprt->rx_connect_worker, - xprt_rdma_connect_worker); - - xprt->max_payload = frwr_maxpages(new_xprt); - if (xprt->max_payload == 0) - goto out4; - xprt->max_payload <<= PAGE_SHIFT; - dprintk("RPC: %s: transport data payload maximum: %zu bytes\n", - __func__, xprt->max_payload); - if (!try_module_get(THIS_MODULE)) goto out4; + INIT_DELAYED_WORK(&new_xprt->rx_connect_worker, + xprt_rdma_connect_worker); + xprt->max_payload = RPCRDMA_MAX_DATA_SEGS << PAGE_SHIFT; + dprintk("RPC: %s: %s:%s\n", __func__, xprt->address_strings[RPC_DISPLAY_ADDR], xprt->address_strings[RPC_DISPLAY_PORT]); @@ -423,17 +415,10 @@ void xprt_rdma_close(struct rpc_xprt *xprt) if (ep->rep_connected == -ENODEV) return; - if (ep->rep_connected > 0) - xprt->reestablish_timeout = 0; rpcrdma_ep_disconnect(ep, ia); - /* Prepare @xprt for the next connection by reinitializing - * its credit grant to one (see RFC 8166, Section 3.3.3). - */ - r_xprt->rx_buf.rb_credits = 1; - xprt->cwnd = RPC_CWNDSHIFT; - out: + xprt->reestablish_timeout = 0; ++xprt->connect_cookie; xprt_disconnect_done(xprt); } @@ -451,12 +436,6 @@ xprt_rdma_set_port(struct rpc_xprt *xprt, u16 port) struct sockaddr *sap = (struct sockaddr *)&xprt->addr; char buf[8]; - dprintk("RPC: %s: setting port for xprt %p (%s:%s) to %u\n", - __func__, xprt, - xprt->address_strings[RPC_DISPLAY_ADDR], - xprt->address_strings[RPC_DISPLAY_PORT], - port); - rpc_set_port(sap, port); kfree(xprt->address_strings[RPC_DISPLAY_PORT]); @@ -466,6 +445,9 @@ xprt_rdma_set_port(struct rpc_xprt *xprt, u16 port) kfree(xprt->address_strings[RPC_DISPLAY_HEX_PORT]); snprintf(buf, sizeof(buf), "%4hx", port); xprt->address_strings[RPC_DISPLAY_HEX_PORT] = kstrdup(buf, GFP_KERNEL); + + trace_xprtrdma_op_setport(container_of(xprt, struct rpcrdma_xprt, + rx_xprt)); } /** @@ -494,9 +476,9 @@ xprt_rdma_timer(struct rpc_xprt *xprt, struct rpc_task *task) * @reconnect_timeout: reconnect timeout after server disconnects * */ -static void xprt_rdma_tcp_set_connect_timeout(struct rpc_xprt *xprt, - unsigned long connect_timeout, - unsigned long reconnect_timeout) +static void xprt_rdma_set_connect_timeout(struct rpc_xprt *xprt, + unsigned long connect_timeout, + unsigned long reconnect_timeout) { struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); @@ -537,13 +519,12 @@ xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task) struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); unsigned long delay; - trace_xprtrdma_op_connect(r_xprt); - delay = 0; if (r_xprt->rx_ep.rep_connected != 0) { delay = xprt_reconnect_delay(xprt); xprt_reconnect_backoff(xprt, RPCRDMA_INIT_REEST_TO); } + trace_xprtrdma_op_connect(r_xprt, delay); queue_delayed_work(xprtiod_workqueue, &r_xprt->rx_connect_worker, delay); } @@ -571,6 +552,7 @@ xprt_rdma_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task) return; out_sleep: + set_bit(XPRT_CONGESTED, &xprt->state); rpc_sleep_on(&xprt->backlog, task, NULL); task->tk_status = -EAGAIN; } @@ -589,7 +571,8 @@ xprt_rdma_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *rqst) memset(rqst, 0, sizeof(*rqst)); rpcrdma_buffer_put(&r_xprt->rx_buf, rpcr_to_rdmar(rqst)); - rpc_wake_up_next(&xprt->backlog); + if (unlikely(!rpc_wake_up_next(&xprt->backlog))) + clear_bit(XPRT_CONGESTED, &xprt->state); } static bool rpcrdma_check_regbuf(struct rpcrdma_xprt *r_xprt, @@ -803,7 +786,7 @@ static const struct rpc_xprt_ops xprt_rdma_procs = { .send_request = xprt_rdma_send_request, .close = xprt_rdma_close, .destroy = xprt_rdma_destroy, - .set_connect_timeout = xprt_rdma_tcp_set_connect_timeout, + .set_connect_timeout = xprt_rdma_set_connect_timeout, .print_stats = xprt_rdma_print_stats, .enable_swap = xprt_rdma_enable_swap, .disable_swap = xprt_rdma_disable_swap, diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index 805b1f35e1ca..353f61ac8d51 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -53,6 +53,7 @@ #include <linux/slab.h> #include <linux/sunrpc/addr.h> #include <linux/sunrpc/svc_rdma.h> +#include <linux/log2.h> #include <asm-generic/barrier.h> #include <asm/bitops.h> @@ -73,15 +74,21 @@ /* * internal functions */ -static void rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc); +static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt); +static void rpcrdma_sendctxs_destroy(struct rpcrdma_xprt *r_xprt); +static void rpcrdma_sendctx_put_locked(struct rpcrdma_xprt *r_xprt, + struct rpcrdma_sendctx *sc); +static int rpcrdma_reqs_setup(struct rpcrdma_xprt *r_xprt); +static void rpcrdma_reqs_reset(struct rpcrdma_xprt *r_xprt); +static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep); +static void rpcrdma_reps_unmap(struct rpcrdma_xprt *r_xprt); static void rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt); -static void rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf); +static void rpcrdma_mrs_destroy(struct rpcrdma_xprt *r_xprt); static struct rpcrdma_regbuf * rpcrdma_regbuf_alloc(size_t size, enum dma_data_direction direction, gfp_t flags); static void rpcrdma_regbuf_dma_unmap(struct rpcrdma_regbuf *rb); static void rpcrdma_regbuf_free(struct rpcrdma_regbuf *rb); -static void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp); /* Wait for outstanding transport work to finish. ib_drain_qp * handles the drains in the wrong order for us, so open code @@ -122,7 +129,7 @@ rpcrdma_qp_event_handler(struct ib_event *event, void *context) /** * rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC - * @cq: completion queue (ignored) + * @cq: completion queue * @wc: completed WR * */ @@ -135,7 +142,7 @@ rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc) /* WARNING: Only wr_cqe and status are reliable at this point */ trace_xprtrdma_wc_send(sc, wc); - rpcrdma_sendctx_put_locked(sc); + rpcrdma_sendctx_put_locked((struct rpcrdma_xprt *)cq->cq_context, sc); } /** @@ -167,19 +174,18 @@ rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc) rdmab_addr(rep->rr_rdmabuf), wc->byte_len, DMA_FROM_DEVICE); - rpcrdma_post_recvs(r_xprt, false); rpcrdma_reply_handler(rep); return; out_flushed: - rpcrdma_recv_buffer_put(rep); + rpcrdma_rep_destroy(rep); } -static void -rpcrdma_update_connect_private(struct rpcrdma_xprt *r_xprt, - struct rdma_conn_param *param) +static void rpcrdma_update_cm_private(struct rpcrdma_xprt *r_xprt, + struct rdma_conn_param *param) { const struct rpcrdma_connect_private *pmsg = param->private_data; + struct rpcrdma_ep *ep = &r_xprt->rx_ep; unsigned int rsize, wsize; /* Default settings for RPC-over-RDMA Version One */ @@ -195,13 +201,11 @@ rpcrdma_update_connect_private(struct rpcrdma_xprt *r_xprt, wsize = rpcrdma_decode_buffer_size(pmsg->cp_recv_size); } - if (rsize < r_xprt->rx_ep.rep_inline_recv) - r_xprt->rx_ep.rep_inline_recv = rsize; - if (wsize < r_xprt->rx_ep.rep_inline_send) - r_xprt->rx_ep.rep_inline_send = wsize; - dprintk("RPC: %s: max send %u, max recv %u\n", __func__, - r_xprt->rx_ep.rep_inline_send, - r_xprt->rx_ep.rep_inline_recv); + if (rsize < ep->rep_inline_recv) + ep->rep_inline_recv = rsize; + if (wsize < ep->rep_inline_send) + ep->rep_inline_send = wsize; + rpcrdma_set_max_header_sizes(r_xprt); } @@ -244,6 +248,7 @@ rpcrdma_cm_event_handler(struct rdma_cm_id *id, struct rdma_cm_event *event) ia->ri_id->device->name, rpcrdma_addrstr(r_xprt), rpcrdma_portstr(r_xprt)); #endif + init_completion(&ia->ri_remove_done); set_bit(RPCRDMA_IAF_REMOVING, &ia->ri_flags); ep->rep_connected = -ENODEV; xprt_force_disconnect(xprt); @@ -255,7 +260,8 @@ rpcrdma_cm_event_handler(struct rdma_cm_id *id, struct rdma_cm_event *event) case RDMA_CM_EVENT_ESTABLISHED: ++xprt->connect_cookie; ep->rep_connected = 1; - rpcrdma_update_connect_private(r_xprt, &event->param.conn); + rpcrdma_update_cm_private(r_xprt, &event->param.conn); + trace_xprtrdma_inline_thresh(r_xprt); wake_up_all(&ep->rep_connect_wait); break; case RDMA_CM_EVENT_CONNECT_ERROR: @@ -295,10 +301,7 @@ rpcrdma_create_id(struct rpcrdma_xprt *xprt, struct rpcrdma_ia *ia) struct rdma_cm_id *id; int rc; - trace_xprtrdma_conn_start(xprt); - init_completion(&ia->ri_done); - init_completion(&ia->ri_remove_done); id = rdma_create_id(xprt->rx_xprt.xprt_net, rpcrdma_cm_event_handler, xprt, RDMA_PS_TCP, IB_QPT_RC); @@ -312,10 +315,8 @@ rpcrdma_create_id(struct rpcrdma_xprt *xprt, struct rpcrdma_ia *ia) if (rc) goto out; rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout); - if (rc < 0) { - trace_xprtrdma_conn_tout(xprt); + if (rc < 0) goto out; - } rc = ia->ri_async_rc; if (rc) @@ -326,10 +327,8 @@ rpcrdma_create_id(struct rpcrdma_xprt *xprt, struct rpcrdma_ia *ia) if (rc) goto out; rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout); - if (rc < 0) { - trace_xprtrdma_conn_tout(xprt); + if (rc < 0) goto out; - } rc = ia->ri_async_rc; if (rc) goto out; @@ -371,18 +370,6 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt) goto out_err; } - switch (xprt_rdma_memreg_strategy) { - case RPCRDMA_FRWR: - if (frwr_is_supported(ia->ri_id->device)) - break; - /*FALLTHROUGH*/ - default: - pr_err("rpcrdma: Device %s does not support memreg mode %d\n", - ia->ri_id->device->name, xprt_rdma_memreg_strategy); - rc = -EINVAL; - goto out_err; - } - return 0; out_err: @@ -396,6 +383,8 @@ out_err: * * Divest transport H/W resources associated with this adapter, * but allow it to be restored later. + * + * Caller must hold the transport send lock. */ void rpcrdma_ia_remove(struct rpcrdma_ia *ia) @@ -403,11 +392,6 @@ rpcrdma_ia_remove(struct rpcrdma_ia *ia) struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt, rx_ia); struct rpcrdma_ep *ep = &r_xprt->rx_ep; - struct rpcrdma_buffer *buf = &r_xprt->rx_buf; - struct rpcrdma_req *req; - struct rpcrdma_rep *rep; - - cancel_delayed_work_sync(&buf->rb_refresh_worker); /* This is similar to rpcrdma_ep_destroy, but: * - Don't cancel the connect worker. @@ -429,14 +413,10 @@ rpcrdma_ia_remove(struct rpcrdma_ia *ia) /* The ULP is responsible for ensuring all DMA * mappings and MRs are gone. */ - list_for_each_entry(rep, &buf->rb_recv_bufs, rr_list) - rpcrdma_regbuf_dma_unmap(rep->rr_rdmabuf); - list_for_each_entry(req, &buf->rb_allreqs, rl_all) { - rpcrdma_regbuf_dma_unmap(req->rl_rdmabuf); - rpcrdma_regbuf_dma_unmap(req->rl_sendbuf); - rpcrdma_regbuf_dma_unmap(req->rl_recvbuf); - } - rpcrdma_mrs_destroy(buf); + rpcrdma_reps_unmap(r_xprt); + rpcrdma_reqs_reset(r_xprt); + rpcrdma_mrs_destroy(r_xprt); + rpcrdma_sendctxs_destroy(r_xprt); ib_dealloc_pd(ia->ri_pd); ia->ri_pd = NULL; @@ -479,30 +459,20 @@ int rpcrdma_ep_create(struct rpcrdma_xprt *r_xprt) struct rpcrdma_ia *ia = &r_xprt->rx_ia; struct rpcrdma_connect_private *pmsg = &ep->rep_cm_private; struct ib_cq *sendcq, *recvcq; - unsigned int max_sge; int rc; - ep->rep_max_requests = xprt_rdma_slot_table_entries; + ep->rep_max_requests = r_xprt->rx_xprt.max_reqs; ep->rep_inline_send = xprt_rdma_max_inline_write; ep->rep_inline_recv = xprt_rdma_max_inline_read; - max_sge = min_t(unsigned int, ia->ri_id->device->attrs.max_send_sge, - RPCRDMA_MAX_SEND_SGES); - if (max_sge < RPCRDMA_MIN_SEND_SGES) { - pr_warn("rpcrdma: HCA provides only %d send SGEs\n", max_sge); - return -ENOMEM; - } - ia->ri_max_send_sges = max_sge; - - rc = frwr_open(ia, ep); + rc = frwr_query_device(r_xprt, ia->ri_id->device); if (rc) return rc; + r_xprt->rx_buf.rb_max_requests = cpu_to_be32(ep->rep_max_requests); ep->rep_attr.event_handler = rpcrdma_qp_event_handler; ep->rep_attr.qp_context = ep; ep->rep_attr.srq = NULL; - ep->rep_attr.cap.max_send_sge = max_sge; - ep->rep_attr.cap.max_recv_sge = 1; ep->rep_attr.cap.max_inline_data = 0; ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR; ep->rep_attr.qp_type = IB_QPT_RC; @@ -521,18 +491,17 @@ int rpcrdma_ep_create(struct rpcrdma_xprt *r_xprt) init_waitqueue_head(&ep->rep_connect_wait); ep->rep_receive_count = 0; - sendcq = ib_alloc_cq(ia->ri_id->device, NULL, - ep->rep_attr.cap.max_send_wr + 1, - ia->ri_id->device->num_comp_vectors > 1 ? 1 : 0, - IB_POLL_WORKQUEUE); + sendcq = ib_alloc_cq_any(ia->ri_id->device, r_xprt, + ep->rep_attr.cap.max_send_wr + 1, + IB_POLL_WORKQUEUE); if (IS_ERR(sendcq)) { rc = PTR_ERR(sendcq); goto out1; } - recvcq = ib_alloc_cq(ia->ri_id->device, NULL, - ep->rep_attr.cap.max_recv_wr + 1, - 0, IB_POLL_WORKQUEUE); + recvcq = ib_alloc_cq_any(ia->ri_id->device, NULL, + ep->rep_attr.cap.max_recv_wr + 1, + IB_POLL_WORKQUEUE); if (IS_ERR(recvcq)) { rc = PTR_ERR(recvcq); goto out2; @@ -605,10 +574,11 @@ void rpcrdma_ep_destroy(struct rpcrdma_xprt *r_xprt) * Unlike a normal reconnection, a fresh PD and a new set * of MRs and buffers is needed. */ -static int -rpcrdma_ep_recreate_xprt(struct rpcrdma_xprt *r_xprt, - struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) +static int rpcrdma_ep_recreate_xprt(struct rpcrdma_xprt *r_xprt, + struct ib_qp_init_attr *qp_init_attr) { + struct rpcrdma_ia *ia = &r_xprt->rx_ia; + struct rpcrdma_ep *ep = &r_xprt->rx_ep; int rc, err; trace_xprtrdma_reinsert(r_xprt); @@ -623,15 +593,14 @@ rpcrdma_ep_recreate_xprt(struct rpcrdma_xprt *r_xprt, pr_err("rpcrdma: rpcrdma_ep_create returned %d\n", err); goto out2; } + memcpy(qp_init_attr, &ep->rep_attr, sizeof(*qp_init_attr)); rc = -ENETUNREACH; - err = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr); + err = rdma_create_qp(ia->ri_id, ia->ri_pd, qp_init_attr); if (err) { pr_err("rpcrdma: rdma_create_qp returned %d\n", err); goto out3; } - - rpcrdma_mrs_create(r_xprt); return 0; out3: @@ -642,16 +611,14 @@ out1: return rc; } -static int -rpcrdma_ep_reconnect(struct rpcrdma_xprt *r_xprt, struct rpcrdma_ep *ep, - struct rpcrdma_ia *ia) +static int rpcrdma_ep_reconnect(struct rpcrdma_xprt *r_xprt, + struct ib_qp_init_attr *qp_init_attr) { + struct rpcrdma_ia *ia = &r_xprt->rx_ia; struct rdma_cm_id *id, *old; int err, rc; - trace_xprtrdma_reconnect(r_xprt); - - rpcrdma_ep_disconnect(ep, ia); + rpcrdma_ep_disconnect(&r_xprt->rx_ep, ia); rc = -EHOSTUNREACH; id = rpcrdma_create_id(r_xprt, ia); @@ -673,7 +640,7 @@ rpcrdma_ep_reconnect(struct rpcrdma_xprt *r_xprt, struct rpcrdma_ep *ep, goto out_destroy; } - err = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr); + err = rdma_create_qp(id, ia->ri_pd, qp_init_attr); if (err) goto out_destroy; @@ -698,25 +665,26 @@ rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt, rx_ia); struct rpc_xprt *xprt = &r_xprt->rx_xprt; + struct ib_qp_init_attr qp_init_attr; int rc; retry: + memcpy(&qp_init_attr, &ep->rep_attr, sizeof(qp_init_attr)); switch (ep->rep_connected) { case 0: - dprintk("RPC: %s: connecting...\n", __func__); - rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr); + rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &qp_init_attr); if (rc) { rc = -ENETUNREACH; goto out_noupdate; } break; case -ENODEV: - rc = rpcrdma_ep_recreate_xprt(r_xprt, ep, ia); + rc = rpcrdma_ep_recreate_xprt(r_xprt, &qp_init_attr); if (rc) goto out_noupdate; break; default: - rc = rpcrdma_ep_reconnect(r_xprt, ep, ia); + rc = rpcrdma_ep_reconnect(r_xprt, &qp_init_attr); if (rc) goto out; } @@ -724,12 +692,19 @@ retry: ep->rep_connected = 0; xprt_clear_connected(xprt); + rpcrdma_reset_cwnd(r_xprt); rpcrdma_post_recvs(r_xprt, true); + rc = rpcrdma_sendctxs_create(r_xprt); + if (rc) + goto out; + rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma); if (rc) goto out; + if (xprt->reestablish_timeout < RPCRDMA_INIT_REEST_TO) + xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO; wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0); if (ep->rep_connected <= 0) { if (ep->rep_connected == -EAGAIN) @@ -738,13 +713,19 @@ retry: goto out; } - dprintk("RPC: %s: connected\n", __func__); + rc = rpcrdma_reqs_setup(r_xprt); + if (rc) { + rpcrdma_ep_disconnect(ep, ia); + goto out; + } + rpcrdma_mrs_create(r_xprt); out: if (rc) ep->rep_connected = rc; out_noupdate: + trace_xprtrdma_connect(r_xprt, rc); return rc; } @@ -753,11 +734,8 @@ out_noupdate: * @ep: endpoint to disconnect * @ia: associated interface adapter * - * This is separate from destroy to facilitate the ability - * to reconnect without recreating the endpoint. - * - * This call is not reentrant, and must not be made in parallel - * on the same endpoint. + * Caller serializes. Either the transport send lock is held, + * or we're being called to destroy the transport. */ void rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) @@ -776,6 +754,9 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) trace_xprtrdma_disconnect(r_xprt, rc); rpcrdma_xprt_drain(r_xprt); + rpcrdma_reqs_reset(r_xprt); + rpcrdma_mrs_destroy(r_xprt); + rpcrdma_sendctxs_destroy(r_xprt); } /* Fixed-size circular FIFO queue. This implementation is wait-free and @@ -795,27 +776,28 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) * queue activity, and rpcrdma_xprt_drain has flushed all remaining * Send requests. */ -static void rpcrdma_sendctxs_destroy(struct rpcrdma_buffer *buf) +static void rpcrdma_sendctxs_destroy(struct rpcrdma_xprt *r_xprt) { + struct rpcrdma_buffer *buf = &r_xprt->rx_buf; unsigned long i; + if (!buf->rb_sc_ctxs) + return; for (i = 0; i <= buf->rb_sc_last; i++) kfree(buf->rb_sc_ctxs[i]); kfree(buf->rb_sc_ctxs); + buf->rb_sc_ctxs = NULL; } -static struct rpcrdma_sendctx *rpcrdma_sendctx_create(struct rpcrdma_ia *ia) +static struct rpcrdma_sendctx *rpcrdma_sendctx_create(struct rpcrdma_ep *ep) { struct rpcrdma_sendctx *sc; - sc = kzalloc(struct_size(sc, sc_sges, ia->ri_max_send_sges), + sc = kzalloc(struct_size(sc, sc_sges, ep->rep_attr.cap.max_send_sge), GFP_KERNEL); if (!sc) return NULL; - sc->sc_wr.wr_cqe = &sc->sc_cqe; - sc->sc_wr.sg_list = sc->sc_sges; - sc->sc_wr.opcode = IB_WR_SEND; sc->sc_cqe.done = rpcrdma_wc_send; return sc; } @@ -831,22 +813,22 @@ static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt) * the ->send_request call to fail temporarily before too many * Sends are posted. */ - i = buf->rb_max_requests + RPCRDMA_MAX_BC_REQUESTS; - dprintk("RPC: %s: allocating %lu send_ctxs\n", __func__, i); + i = r_xprt->rx_ep.rep_max_requests + RPCRDMA_MAX_BC_REQUESTS; buf->rb_sc_ctxs = kcalloc(i, sizeof(sc), GFP_KERNEL); if (!buf->rb_sc_ctxs) return -ENOMEM; buf->rb_sc_last = i - 1; for (i = 0; i <= buf->rb_sc_last; i++) { - sc = rpcrdma_sendctx_create(&r_xprt->rx_ia); + sc = rpcrdma_sendctx_create(&r_xprt->rx_ep); if (!sc) return -ENOMEM; - sc->sc_xprt = r_xprt; buf->rb_sc_ctxs[i] = sc; } + buf->rb_sc_head = 0; + buf->rb_sc_tail = 0; return 0; } @@ -906,6 +888,7 @@ out_emptyq: /** * rpcrdma_sendctx_put_locked - Release a send context + * @r_xprt: controlling transport instance * @sc: send context to release * * Usage: Called from Send completion to return a sendctxt @@ -913,10 +896,10 @@ out_emptyq: * * The caller serializes calls to this function (per transport). */ -static void -rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc) +static void rpcrdma_sendctx_put_locked(struct rpcrdma_xprt *r_xprt, + struct rpcrdma_sendctx *sc) { - struct rpcrdma_buffer *buf = &sc->sc_xprt->rx_buf; + struct rpcrdma_buffer *buf = &r_xprt->rx_buf; unsigned long next_tail; /* Unmap SGEs of previously completed but unsignaled @@ -934,7 +917,7 @@ rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc) /* Paired with READ_ONCE */ smp_store_release(&buf->rb_sc_tail, next_tail); - xprt_write_space(&sc->sc_xprt->rx_xprt); + xprt_write_space(&r_xprt->rx_xprt); } static void @@ -943,14 +926,12 @@ rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt) struct rpcrdma_buffer *buf = &r_xprt->rx_buf; struct rpcrdma_ia *ia = &r_xprt->rx_ia; unsigned int count; - LIST_HEAD(free); - LIST_HEAD(all); - for (count = 0; count < ia->ri_max_segs; count++) { + for (count = 0; count < ia->ri_max_rdma_segs; count++) { struct rpcrdma_mr *mr; int rc; - mr = kzalloc(sizeof(*mr), GFP_KERNEL); + mr = kzalloc(sizeof(*mr), GFP_NOFS); if (!mr) break; @@ -962,15 +943,13 @@ rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt) mr->mr_xprt = r_xprt; - list_add(&mr->mr_list, &free); - list_add(&mr->mr_all, &all); + spin_lock(&buf->rb_lock); + rpcrdma_mr_push(mr, &buf->rb_mrs); + list_add(&mr->mr_all, &buf->rb_all_mrs); + spin_unlock(&buf->rb_lock); } - spin_lock(&buf->rb_mrlock); - list_splice(&free, &buf->rb_mrs); - list_splice(&all, &buf->rb_all); r_xprt->rx_stats.mrs_allocated += count; - spin_unlock(&buf->rb_mrlock); trace_xprtrdma_createmrs(r_xprt, count); } @@ -978,7 +957,7 @@ static void rpcrdma_mr_refresh_worker(struct work_struct *work) { struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer, - rb_refresh_worker.work); + rb_refresh_worker); struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt, rx_buf); @@ -987,6 +966,28 @@ rpcrdma_mr_refresh_worker(struct work_struct *work) } /** + * rpcrdma_mrs_refresh - Wake the MR refresh worker + * @r_xprt: controlling transport instance + * + */ +void rpcrdma_mrs_refresh(struct rpcrdma_xprt *r_xprt) +{ + struct rpcrdma_buffer *buf = &r_xprt->rx_buf; + struct rpcrdma_ep *ep = &r_xprt->rx_ep; + + /* If there is no underlying device, it's no use to + * wake the refresh worker. + */ + if (ep->rep_connected != -ENODEV) { + /* The work is scheduled on a WQ_MEM_RECLAIM + * workqueue in order to prevent MR allocation + * from recursing into NFS during direct reclaim. + */ + queue_work(xprtiod_workqueue, &buf->rb_refresh_worker); + } +} + +/** * rpcrdma_req_create - Allocate an rpcrdma_req object * @r_xprt: controlling r_xprt * @size: initial size, in bytes, of send and receive buffers @@ -998,45 +999,120 @@ struct rpcrdma_req *rpcrdma_req_create(struct rpcrdma_xprt *r_xprt, size_t size, gfp_t flags) { struct rpcrdma_buffer *buffer = &r_xprt->rx_buf; - struct rpcrdma_regbuf *rb; struct rpcrdma_req *req; req = kzalloc(sizeof(*req), flags); if (req == NULL) goto out1; - rb = rpcrdma_regbuf_alloc(RPCRDMA_HDRBUF_SIZE, DMA_TO_DEVICE, flags); - if (!rb) - goto out2; - req->rl_rdmabuf = rb; - xdr_buf_init(&req->rl_hdrbuf, rdmab_data(rb), rdmab_length(rb)); - req->rl_sendbuf = rpcrdma_regbuf_alloc(size, DMA_TO_DEVICE, flags); if (!req->rl_sendbuf) - goto out3; + goto out2; req->rl_recvbuf = rpcrdma_regbuf_alloc(size, DMA_NONE, flags); if (!req->rl_recvbuf) - goto out4; + goto out3; + INIT_LIST_HEAD(&req->rl_free_mrs); INIT_LIST_HEAD(&req->rl_registered); spin_lock(&buffer->rb_lock); list_add(&req->rl_all, &buffer->rb_allreqs); spin_unlock(&buffer->rb_lock); return req; -out4: - kfree(req->rl_sendbuf); out3: - kfree(req->rl_rdmabuf); + kfree(req->rl_sendbuf); out2: kfree(req); out1: return NULL; } -static struct rpcrdma_rep *rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt, - bool temp) +/** + * rpcrdma_req_setup - Per-connection instance setup of an rpcrdma_req object + * @r_xprt: controlling transport instance + * @req: rpcrdma_req object to set up + * + * Returns zero on success, and a negative errno on failure. + */ +int rpcrdma_req_setup(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) +{ + struct rpcrdma_regbuf *rb; + size_t maxhdrsize; + + /* Compute maximum header buffer size in bytes */ + maxhdrsize = rpcrdma_fixed_maxsz + 3 + + r_xprt->rx_ia.ri_max_rdma_segs * rpcrdma_readchunk_maxsz; + maxhdrsize *= sizeof(__be32); + rb = rpcrdma_regbuf_alloc(__roundup_pow_of_two(maxhdrsize), + DMA_TO_DEVICE, GFP_KERNEL); + if (!rb) + goto out; + + if (!__rpcrdma_regbuf_dma_map(r_xprt, rb)) + goto out_free; + + req->rl_rdmabuf = rb; + xdr_buf_init(&req->rl_hdrbuf, rdmab_data(rb), rdmab_length(rb)); + return 0; + +out_free: + rpcrdma_regbuf_free(rb); +out: + return -ENOMEM; +} + +/* ASSUMPTION: the rb_allreqs list is stable for the duration, + * and thus can be walked without holding rb_lock. Eg. the + * caller is holding the transport send lock to exclude + * device removal or disconnection. + */ +static int rpcrdma_reqs_setup(struct rpcrdma_xprt *r_xprt) +{ + struct rpcrdma_buffer *buf = &r_xprt->rx_buf; + struct rpcrdma_req *req; + int rc; + + list_for_each_entry(req, &buf->rb_allreqs, rl_all) { + rc = rpcrdma_req_setup(r_xprt, req); + if (rc) + return rc; + } + return 0; +} + +static void rpcrdma_req_reset(struct rpcrdma_req *req) +{ + /* Credits are valid for only one connection */ + req->rl_slot.rq_cong = 0; + + rpcrdma_regbuf_free(req->rl_rdmabuf); + req->rl_rdmabuf = NULL; + + rpcrdma_regbuf_dma_unmap(req->rl_sendbuf); + rpcrdma_regbuf_dma_unmap(req->rl_recvbuf); +} + +/* ASSUMPTION: the rb_allreqs list is stable for the duration, + * and thus can be walked without holding rb_lock. Eg. the + * caller is holding the transport send lock to exclude + * device removal or disconnection. + */ +static void rpcrdma_reqs_reset(struct rpcrdma_xprt *r_xprt) +{ + struct rpcrdma_buffer *buf = &r_xprt->rx_buf; + struct rpcrdma_req *req; + + list_for_each_entry(req, &buf->rb_allreqs, rl_all) + rpcrdma_req_reset(req); +} + +/* No locking needed here. This function is called only by the + * Receive completion handler. + */ +static noinline +struct rpcrdma_rep *rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt, + bool temp) { struct rpcrdma_rep *rep; @@ -1049,6 +1125,9 @@ static struct rpcrdma_rep *rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt, if (!rep->rr_rdmabuf) goto out_free; + if (!rpcrdma_regbuf_dma_map(r_xprt, rep->rr_rdmabuf)) + goto out_free_regbuf; + xdr_buf_init(&rep->rr_hdrbuf, rdmab_data(rep->rr_rdmabuf), rdmab_length(rep->rr_rdmabuf)); rep->rr_cqe.done = rpcrdma_wc_receive; @@ -1058,14 +1137,63 @@ static struct rpcrdma_rep *rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt, rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov; rep->rr_recv_wr.num_sge = 1; rep->rr_temp = temp; + list_add(&rep->rr_all, &r_xprt->rx_buf.rb_all_reps); return rep; +out_free_regbuf: + rpcrdma_regbuf_free(rep->rr_rdmabuf); out_free: kfree(rep); out: return NULL; } +/* No locking needed here. This function is invoked only by the + * Receive completion handler, or during transport shutdown. + */ +static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep) +{ + list_del(&rep->rr_all); + rpcrdma_regbuf_free(rep->rr_rdmabuf); + kfree(rep); +} + +static struct rpcrdma_rep *rpcrdma_rep_get_locked(struct rpcrdma_buffer *buf) +{ + struct llist_node *node; + + /* Calls to llist_del_first are required to be serialized */ + node = llist_del_first(&buf->rb_free_reps); + if (!node) + return NULL; + return llist_entry(node, struct rpcrdma_rep, rr_node); +} + +static void rpcrdma_rep_put(struct rpcrdma_buffer *buf, + struct rpcrdma_rep *rep) +{ + llist_add(&rep->rr_node, &buf->rb_free_reps); +} + +static void rpcrdma_reps_unmap(struct rpcrdma_xprt *r_xprt) +{ + struct rpcrdma_buffer *buf = &r_xprt->rx_buf; + struct rpcrdma_rep *rep; + + list_for_each_entry(rep, &buf->rb_all_reps, rr_all) { + rpcrdma_regbuf_dma_unmap(rep->rr_rdmabuf); + rep->rr_temp = true; + } +} + +static void rpcrdma_reps_destroy(struct rpcrdma_buffer *buf) +{ + struct rpcrdma_rep *rep; + + while ((rep = rpcrdma_rep_get_locked(buf)) != NULL) + rpcrdma_rep_destroy(rep); +} + /** * rpcrdma_buffer_create - Create initial set of req/rep objects * @r_xprt: transport instance to (re)initialize @@ -1077,37 +1205,28 @@ int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt) struct rpcrdma_buffer *buf = &r_xprt->rx_buf; int i, rc; - buf->rb_max_requests = r_xprt->rx_ep.rep_max_requests; buf->rb_bc_srv_max_requests = 0; - spin_lock_init(&buf->rb_mrlock); spin_lock_init(&buf->rb_lock); INIT_LIST_HEAD(&buf->rb_mrs); - INIT_LIST_HEAD(&buf->rb_all); - INIT_DELAYED_WORK(&buf->rb_refresh_worker, - rpcrdma_mr_refresh_worker); - - rpcrdma_mrs_create(r_xprt); + INIT_LIST_HEAD(&buf->rb_all_mrs); + INIT_WORK(&buf->rb_refresh_worker, rpcrdma_mr_refresh_worker); INIT_LIST_HEAD(&buf->rb_send_bufs); INIT_LIST_HEAD(&buf->rb_allreqs); + INIT_LIST_HEAD(&buf->rb_all_reps); rc = -ENOMEM; - for (i = 0; i < buf->rb_max_requests; i++) { + for (i = 0; i < r_xprt->rx_xprt.max_reqs; i++) { struct rpcrdma_req *req; - req = rpcrdma_req_create(r_xprt, RPCRDMA_V1_DEF_INLINE_SIZE, + req = rpcrdma_req_create(r_xprt, RPCRDMA_V1_DEF_INLINE_SIZE * 2, GFP_KERNEL); if (!req) goto out; list_add(&req->rl_list, &buf->rb_send_bufs); } - buf->rb_credits = 1; - INIT_LIST_HEAD(&buf->rb_recv_bufs); - - rc = rpcrdma_sendctxs_create(r_xprt); - if (rc) - goto out; + init_llist_head(&buf->rb_free_reps); return 0; out: @@ -1115,58 +1234,62 @@ out: return rc; } -static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep) -{ - rpcrdma_regbuf_free(rep->rr_rdmabuf); - kfree(rep); -} - /** * rpcrdma_req_destroy - Destroy an rpcrdma_req object * @req: unused object to be destroyed * - * This function assumes that the caller prevents concurrent device - * unload and transport tear-down. + * Relies on caller holding the transport send lock to protect + * removing req->rl_all from buf->rb_all_reqs safely. */ -void -rpcrdma_req_destroy(struct rpcrdma_req *req) +void rpcrdma_req_destroy(struct rpcrdma_req *req) { + struct rpcrdma_mr *mr; + list_del(&req->rl_all); + while ((mr = rpcrdma_mr_pop(&req->rl_free_mrs))) { + struct rpcrdma_buffer *buf = &mr->mr_xprt->rx_buf; + + spin_lock(&buf->rb_lock); + list_del(&mr->mr_all); + spin_unlock(&buf->rb_lock); + + frwr_release_mr(mr); + } + rpcrdma_regbuf_free(req->rl_recvbuf); rpcrdma_regbuf_free(req->rl_sendbuf); rpcrdma_regbuf_free(req->rl_rdmabuf); kfree(req); } -static void -rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf) +/** + * rpcrdma_mrs_destroy - Release all of a transport's MRs + * @r_xprt: controlling transport instance + * + * Relies on caller holding the transport send lock to protect + * removing mr->mr_list from req->rl_free_mrs safely. + */ +static void rpcrdma_mrs_destroy(struct rpcrdma_xprt *r_xprt) { - struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt, - rx_buf); + struct rpcrdma_buffer *buf = &r_xprt->rx_buf; struct rpcrdma_mr *mr; - unsigned int count; - count = 0; - spin_lock(&buf->rb_mrlock); - while (!list_empty(&buf->rb_all)) { - mr = list_entry(buf->rb_all.next, struct rpcrdma_mr, mr_all); - list_del(&mr->mr_all); + cancel_work_sync(&buf->rb_refresh_worker); - spin_unlock(&buf->rb_mrlock); - - /* Ensure MW is not on any rl_registered list */ - if (!list_empty(&mr->mr_list)) - list_del(&mr->mr_list); + spin_lock(&buf->rb_lock); + while ((mr = list_first_entry_or_null(&buf->rb_all_mrs, + struct rpcrdma_mr, + mr_all)) != NULL) { + list_del(&mr->mr_list); + list_del(&mr->mr_all); + spin_unlock(&buf->rb_lock); frwr_release_mr(mr); - count++; - spin_lock(&buf->rb_mrlock); - } - spin_unlock(&buf->rb_mrlock); - r_xprt->rx_stats.mrs_allocated = 0; - dprintk("RPC: %s: released %u MRs\n", __func__, count); + spin_lock(&buf->rb_lock); + } + spin_unlock(&buf->rb_lock); } /** @@ -1180,18 +1303,7 @@ rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf) void rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf) { - cancel_delayed_work_sync(&buf->rb_refresh_worker); - - rpcrdma_sendctxs_destroy(buf); - - while (!list_empty(&buf->rb_recv_bufs)) { - struct rpcrdma_rep *rep; - - rep = list_first_entry(&buf->rb_recv_bufs, - struct rpcrdma_rep, rr_list); - list_del(&rep->rr_list); - rpcrdma_rep_destroy(rep); - } + rpcrdma_reps_destroy(buf); while (!list_empty(&buf->rb_send_bufs)) { struct rpcrdma_req *req; @@ -1201,8 +1313,6 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf) list_del(&req->rl_list); rpcrdma_req_destroy(req); } - - rpcrdma_mrs_destroy(buf); } /** @@ -1216,54 +1326,20 @@ struct rpcrdma_mr * rpcrdma_mr_get(struct rpcrdma_xprt *r_xprt) { struct rpcrdma_buffer *buf = &r_xprt->rx_buf; - struct rpcrdma_mr *mr = NULL; - - spin_lock(&buf->rb_mrlock); - if (!list_empty(&buf->rb_mrs)) - mr = rpcrdma_mr_pop(&buf->rb_mrs); - spin_unlock(&buf->rb_mrlock); + struct rpcrdma_mr *mr; - if (!mr) - goto out_nomrs; + spin_lock(&buf->rb_lock); + mr = rpcrdma_mr_pop(&buf->rb_mrs); + spin_unlock(&buf->rb_lock); return mr; - -out_nomrs: - trace_xprtrdma_nomrs(r_xprt); - if (r_xprt->rx_ep.rep_connected != -ENODEV) - schedule_delayed_work(&buf->rb_refresh_worker, 0); - - /* Allow the reply handler and refresh worker to run */ - cond_resched(); - - return NULL; -} - -static void -__rpcrdma_mr_put(struct rpcrdma_buffer *buf, struct rpcrdma_mr *mr) -{ - spin_lock(&buf->rb_mrlock); - rpcrdma_mr_push(mr, &buf->rb_mrs); - spin_unlock(&buf->rb_mrlock); } /** - * rpcrdma_mr_put - Release an rpcrdma_mr object - * @mr: object to release + * rpcrdma_mr_put - DMA unmap an MR and release it + * @mr: MR to release * */ -void -rpcrdma_mr_put(struct rpcrdma_mr *mr) -{ - __rpcrdma_mr_put(&mr->mr_xprt->rx_buf, mr); -} - -/** - * rpcrdma_mr_unmap_and_put - DMA unmap an MR and release it - * @mr: object to release - * - */ -void -rpcrdma_mr_unmap_and_put(struct rpcrdma_mr *mr) +void rpcrdma_mr_put(struct rpcrdma_mr *mr) { struct rpcrdma_xprt *r_xprt = mr->mr_xprt; @@ -1273,7 +1349,8 @@ rpcrdma_mr_unmap_and_put(struct rpcrdma_mr *mr) mr->mr_sg, mr->mr_nents, mr->mr_dir); mr->mr_dir = DMA_NONE; } - __rpcrdma_mr_put(&r_xprt->rx_buf, mr); + + rpcrdma_mr_push(mr, &mr->mr_req->rl_free_mrs); } /** @@ -1304,39 +1381,24 @@ rpcrdma_buffer_get(struct rpcrdma_buffer *buffers) */ void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req) { - struct rpcrdma_rep *rep = req->rl_reply; - + if (req->rl_reply) + rpcrdma_rep_put(buffers, req->rl_reply); req->rl_reply = NULL; spin_lock(&buffers->rb_lock); list_add(&req->rl_list, &buffers->rb_send_bufs); - if (rep) { - if (!rep->rr_temp) { - list_add(&rep->rr_list, &buffers->rb_recv_bufs); - rep = NULL; - } - } spin_unlock(&buffers->rb_lock); - if (rep) - rpcrdma_rep_destroy(rep); } -/* - * Put reply buffers back into pool when not attached to - * request. This happens in error conditions. +/** + * rpcrdma_recv_buffer_put - Release rpcrdma_rep back to free list + * @rep: rep to release + * + * Used after error conditions. */ -void -rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep) +void rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep) { - struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf; - - if (!rep->rr_temp) { - spin_lock(&buffers->rb_lock); - list_add(&rep->rr_list, &buffers->rb_recv_bufs); - spin_unlock(&buffers->rb_lock); - } else { - rpcrdma_rep_destroy(rep); - } + rpcrdma_rep_put(&rep->rr_rxprt->rx_buf, rep); } /* Returns a pointer to a rpcrdma_regbuf object, or NULL. @@ -1453,7 +1515,7 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep, struct rpcrdma_req *req) { - struct ib_send_wr *send_wr = &req->rl_sendctx->sc_wr; + struct ib_send_wr *send_wr = &req->rl_wr; int rc; if (!ep->rep_send_count || kref_read(&req->rl_kref) > 1) { @@ -1471,12 +1533,17 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia, return 0; } -static void -rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp) +/** + * rpcrdma_post_recvs - Refill the Receive Queue + * @r_xprt: controlling transport instance + * @temp: mark Receive buffers to be deleted after use + * + */ +void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp) { struct rpcrdma_buffer *buf = &r_xprt->rx_buf; struct rpcrdma_ep *ep = &r_xprt->rx_ep; - struct ib_recv_wr *i, *wr, *bad_wr; + struct ib_recv_wr *wr, *bad_wr; struct rpcrdma_rep *rep; int needed, count, rc; @@ -1484,7 +1551,7 @@ rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp) count = 0; needed = buf->rb_credits + (buf->rb_bc_srv_max_requests << 1); - if (ep->rep_receive_count > needed) + if (likely(ep->rep_receive_count > needed)) goto out; needed -= ep->rep_receive_count; if (!temp) @@ -1492,42 +1559,26 @@ rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp) /* fast path: all needed reps can be found on the free list */ wr = NULL; - spin_lock(&buf->rb_lock); while (needed) { - rep = list_first_entry_or_null(&buf->rb_recv_bufs, - struct rpcrdma_rep, rr_list); + rep = rpcrdma_rep_get_locked(buf); + if (rep && rep->rr_temp) { + rpcrdma_rep_destroy(rep); + continue; + } if (!rep) - break; - - list_del(&rep->rr_list); - rep->rr_recv_wr.next = wr; - wr = &rep->rr_recv_wr; - --needed; - } - spin_unlock(&buf->rb_lock); - - while (needed) { - rep = rpcrdma_rep_create(r_xprt, temp); + rep = rpcrdma_rep_create(r_xprt, temp); if (!rep) break; + trace_xprtrdma_post_recv(rep); rep->rr_recv_wr.next = wr; wr = &rep->rr_recv_wr; --needed; + ++count; } if (!wr) goto out; - for (i = wr; i; i = i->next) { - rep = container_of(i, struct rpcrdma_rep, rr_recv_wr); - - if (!rpcrdma_regbuf_dma_map(r_xprt, rep->rr_rdmabuf)) - goto release_wrs; - - trace_xprtrdma_post_recv(rep->rr_recv_wr.wr_cqe); - ++count; - } - rc = ib_post_recv(r_xprt->rx_ia.ri_id->qp, wr, (const struct ib_recv_wr **)&bad_wr); out: @@ -1544,11 +1595,4 @@ out: } ep->rep_receive_count += count; return; - -release_wrs: - for (i = wr; i;) { - rep = container_of(i, struct rpcrdma_rep, rr_recv_wr); - i = i->next; - rpcrdma_recv_buffer_put(rep); - } } diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index 92ce09fcea74..37d5080c250b 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -47,6 +47,7 @@ #include <linux/atomic.h> /* atomic_t, etc */ #include <linux/kref.h> /* struct kref */ #include <linux/workqueue.h> /* struct work_struct */ +#include <linux/llist.h> #include <rdma/rdma_cm.h> /* RDMA connection api */ #include <rdma/ib_verbs.h> /* RDMA verbs api */ @@ -70,9 +71,8 @@ struct rpcrdma_ia { struct rdma_cm_id *ri_id; struct ib_pd *ri_pd; int ri_async_rc; - unsigned int ri_max_segs; + unsigned int ri_max_rdma_segs; unsigned int ri_max_frwr_depth; - unsigned int ri_max_send_sges; bool ri_implicit_roundup; enum ib_mr_type ri_mrtype; unsigned long ri_flags; @@ -98,7 +98,7 @@ struct rpcrdma_ep { wait_queue_head_t rep_connect_wait; struct rpcrdma_connect_private rep_cm_private; struct rdma_conn_param rep_remote_cma; - unsigned int rep_max_requests; /* set by /proc */ + unsigned int rep_max_requests; /* depends on device */ unsigned int rep_inline_send; /* negotiated */ unsigned int rep_inline_recv; /* negotiated */ int rep_receive_count; @@ -117,9 +117,6 @@ struct rpcrdma_ep { #endif /* Registered buffer -- registered kmalloc'd memory for RDMA SEND/RECV - * - * The below structure appears at the front of a large region of kmalloc'd - * memory, which always starts on a good alignment boundary. */ struct rpcrdma_regbuf { @@ -158,25 +155,22 @@ static inline void *rdmab_data(const struct rpcrdma_regbuf *rb) /* To ensure a transport can always make forward progress, * the number of RDMA segments allowed in header chunk lists - * is capped at 8. This prevents less-capable devices and - * memory registrations from overrunning the Send buffer - * while building chunk lists. + * is capped at 16. This prevents less-capable devices from + * overrunning the Send buffer while building chunk lists. * * Elements of the Read list take up more room than the - * Write list or Reply chunk. 8 read segments means the Read - * list (or Write list or Reply chunk) cannot consume more - * than - * - * ((8 + 2) * read segment size) + 1 XDR words, or 244 bytes. + * Write list or Reply chunk. 16 read segments means the + * chunk lists cannot consume more than * - * And the fixed part of the header is another 24 bytes. + * ((16 + 2) * read segment size) + 1 XDR words, * - * The smallest inline threshold is 1024 bytes, ensuring that - * at least 750 bytes are available for RPC messages. + * or about 400 bytes. The fixed part of the header is + * another 24 bytes. Thus when the inline threshold is + * 1024 bytes, at least 600 bytes are available for RPC + * message bodies. */ enum { - RPCRDMA_MAX_HDR_SEGS = 8, - RPCRDMA_HDRBUF_SIZE = 256, + RPCRDMA_MAX_HDR_SEGS = 16, }; /* @@ -206,8 +200,9 @@ struct rpcrdma_rep { struct rpc_rqst *rr_rqst; struct xdr_buf rr_hdrbuf; struct xdr_stream rr_stream; - struct list_head rr_list; + struct llist_node rr_node; struct ib_recv_wr rr_recv_wr; + struct list_head rr_all; }; /* To reduce the rate at which a transport invokes ib_post_recv @@ -223,12 +218,8 @@ enum { /* struct rpcrdma_sendctx - DMA mapped SGEs to unmap after Send completes */ struct rpcrdma_req; -struct rpcrdma_xprt; struct rpcrdma_sendctx { - struct ib_send_wr sc_wr; struct ib_cqe sc_cqe; - struct ib_device *sc_device; - struct rpcrdma_xprt *sc_xprt; struct rpcrdma_req *sc_req; unsigned int sc_unmap_count; struct ib_sge sc_sges[]; @@ -240,20 +231,20 @@ struct rpcrdma_sendctx { * An external memory region is any buffer or page that is registered * on the fly (ie, not pre-registered). */ -struct rpcrdma_req; struct rpcrdma_frwr { struct ib_mr *fr_mr; struct ib_cqe fr_cqe; struct completion fr_linv_done; - struct rpcrdma_req *fr_req; union { struct ib_reg_wr fr_regwr; struct ib_send_wr fr_invwr; }; }; +struct rpcrdma_req; struct rpcrdma_mr { struct list_head mr_list; + struct rpcrdma_req *mr_req; struct scatterlist *mr_sg; int mr_nents; enum dma_data_direction mr_dir; @@ -262,7 +253,6 @@ struct rpcrdma_mr { u32 mr_handle; u32 mr_length; u64 mr_offset; - struct work_struct mr_recycle; struct list_head mr_all; }; @@ -323,6 +313,7 @@ struct rpcrdma_req { struct rpcrdma_rep *rl_reply; struct xdr_stream rl_stream; struct xdr_buf rl_hdrbuf; + struct ib_send_wr rl_wr; struct rpcrdma_sendctx *rl_sendctx; struct rpcrdma_regbuf *rl_rdmabuf; /* xprt header */ struct rpcrdma_regbuf *rl_sendbuf; /* rq_snd_buf */ @@ -331,7 +322,8 @@ struct rpcrdma_req { struct list_head rl_all; struct kref rl_kref; - struct list_head rl_registered; /* registered segments */ + struct list_head rl_free_mrs; + struct list_head rl_registered; struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS]; }; @@ -344,7 +336,7 @@ rpcr_to_rdmar(const struct rpc_rqst *rqst) static inline void rpcrdma_mr_push(struct rpcrdma_mr *mr, struct list_head *list) { - list_add_tail(&mr->mr_list, list); + list_add(&mr->mr_list, list); } static inline struct rpcrdma_mr * @@ -352,8 +344,9 @@ rpcrdma_mr_pop(struct list_head *list) { struct rpcrdma_mr *mr; - mr = list_first_entry(list, struct rpcrdma_mr, mr_list); - list_del_init(&mr->mr_list); + mr = list_first_entry_or_null(list, struct rpcrdma_mr, mr_list); + if (mr) + list_del_init(&mr->mr_list); return mr; } @@ -364,27 +357,28 @@ rpcrdma_mr_pop(struct list_head *list) * One of these is associated with a transport instance */ struct rpcrdma_buffer { - spinlock_t rb_mrlock; /* protect rb_mrs list */ + spinlock_t rb_lock; + struct list_head rb_send_bufs; struct list_head rb_mrs; - struct list_head rb_all; unsigned long rb_sc_head; unsigned long rb_sc_tail; unsigned long rb_sc_last; struct rpcrdma_sendctx **rb_sc_ctxs; - spinlock_t rb_lock; /* protect buf lists */ - struct list_head rb_send_bufs; - struct list_head rb_recv_bufs; struct list_head rb_allreqs; + struct list_head rb_all_mrs; + struct list_head rb_all_reps; - u32 rb_max_requests; + struct llist_head rb_free_reps; + + __be32 rb_max_requests; u32 rb_credits; /* most recent credit grant */ u32 rb_bc_srv_max_requests; u32 rb_bc_max_requests; - struct delayed_work rb_refresh_worker; + struct work_struct rb_refresh_worker; }; /* @@ -477,12 +471,14 @@ void rpcrdma_ep_disconnect(struct rpcrdma_ep *, struct rpcrdma_ia *); int rpcrdma_ep_post(struct rpcrdma_ia *, struct rpcrdma_ep *, struct rpcrdma_req *); +void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp); /* * Buffer calls - xprtrdma/verbs.c */ struct rpcrdma_req *rpcrdma_req_create(struct rpcrdma_xprt *r_xprt, size_t size, gfp_t flags); +int rpcrdma_req_setup(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req); void rpcrdma_req_destroy(struct rpcrdma_req *req); int rpcrdma_buffer_create(struct rpcrdma_xprt *); void rpcrdma_buffer_destroy(struct rpcrdma_buffer *); @@ -490,13 +486,7 @@ struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_xprt *r_xprt); struct rpcrdma_mr *rpcrdma_mr_get(struct rpcrdma_xprt *r_xprt); void rpcrdma_mr_put(struct rpcrdma_mr *mr); -void rpcrdma_mr_unmap_and_put(struct rpcrdma_mr *mr); - -static inline void -rpcrdma_mr_recycle(struct rpcrdma_mr *mr) -{ - schedule_work(&mr->mr_recycle); -} +void rpcrdma_mrs_refresh(struct rpcrdma_xprt *r_xprt); struct rpcrdma_req *rpcrdma_buffer_get(struct rpcrdma_buffer *); void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers, @@ -545,16 +535,15 @@ rpcrdma_data_dir(bool writing) /* Memory registration calls xprtrdma/frwr_ops.c */ -bool frwr_is_supported(struct ib_device *device); void frwr_reset(struct rpcrdma_req *req); -int frwr_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep); +int frwr_query_device(struct rpcrdma_xprt *r_xprt, + const struct ib_device *device); int frwr_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr); void frwr_release_mr(struct rpcrdma_mr *mr); -size_t frwr_maxpages(struct rpcrdma_xprt *r_xprt); struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, int nsegs, bool writing, __be32 xid, - struct rpcrdma_mr **mr); + struct rpcrdma_mr *mr); int frwr_send(struct rpcrdma_ia *ia, struct rpcrdma_req *req); void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs); void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req); @@ -566,6 +555,8 @@ void frwr_unmap_async(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req); enum rpcrdma_chunktype { rpcrdma_noch = 0, + rpcrdma_noch_pullup, + rpcrdma_noch_mapped, rpcrdma_readch, rpcrdma_areadch, rpcrdma_writech, @@ -579,6 +570,7 @@ int rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt, void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc); int rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst); void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *); +void rpcrdma_reset_cwnd(struct rpcrdma_xprt *r_xprt); void rpcrdma_complete_rqst(struct rpcrdma_rep *rep); void rpcrdma_reply_handler(struct rpcrdma_rep *rep); @@ -590,7 +582,6 @@ static inline void rpcrdma_set_xdrlen(struct xdr_buf *xdr, size_t len) /* RPC/RDMA module init - xprtrdma/transport.c */ -extern unsigned int xprt_rdma_slot_table_entries; extern unsigned int xprt_rdma_max_inline_read; extern unsigned int xprt_rdma_max_inline_write; void xprt_rdma_format_addresses(struct rpc_xprt *xprt, struct sockaddr *sap); |