summary | refs | log | tree | commit | diff | stats
path: root/net/sunrpc/xprtrdma
diff options
context:
space:
mode:
Diffstat (limited to 'net/sunrpc/xprtrdma')
-rw-r--r-- net/sunrpc/xprtrdma/backchannel.c 12
-rw-r--r-- net/sunrpc/xprtrdma/frwr_ops.c 266
-rw-r--r-- net/sunrpc/xprtrdma/rpc_rdma.c 488
-rw-r--r-- net/sunrpc/xprtrdma/svc_rdma.c 7
-rw-r--r-- net/sunrpc/xprtrdma/svc_rdma_backchannel.c 1
-rw-r--r-- net/sunrpc/xprtrdma/svc_rdma_recvfrom.c 24
-rw-r--r-- net/sunrpc/xprtrdma/svc_rdma_sendto.c 8
-rw-r--r-- net/sunrpc/xprtrdma/svc_rdma_transport.c 14
-rw-r--r-- net/sunrpc/xprtrdma/transport.c 65
-rw-r--r-- net/sunrpc/xprtrdma/verbs.c 636
-rw-r--r-- net/sunrpc/xprtrdma/xprt_rdma.h 91
11 files changed, 854 insertions, 758 deletions
diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
index 59e624b1d7a0..1a0ae0c61353 100644
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -54,9 +54,7 @@ size_t xprt_rdma_bc_maxpayload(struct rpc_xprt *xprt)
unsigned int xprt_rdma_bc_max_slots(struct rpc_xprt *xprt)
{
- struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
-
- return r_xprt->rx_buf.rb_bc_srv_max_requests;
+ return RPCRDMA_BACKWARD_WRS >> 1;
}
static int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
@@ -81,7 +79,7 @@ static int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
*p = xdr_zero;
if (rpcrdma_prepare_send_sges(r_xprt, req, RPCRDMA_HDRLEN_MIN,
- &rqst->rq_snd_buf, rpcrdma_noch))
+ &rqst->rq_snd_buf, rpcrdma_noch_pullup))
return -EIO;
trace_xprtrdma_cb_reply(rqst);
@@ -165,6 +163,7 @@ void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst)
spin_lock(&xprt->bc_pa_lock);
list_add_tail(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
spin_unlock(&xprt->bc_pa_lock);
+ xprt_put(xprt);
}
static struct rpc_rqst *rpcrdma_bc_rqst_get(struct rpcrdma_xprt *r_xprt)
@@ -195,6 +194,10 @@ create_req:
req = rpcrdma_req_create(r_xprt, size, GFP_KERNEL);
if (!req)
return NULL;
+ if (rpcrdma_req_setup(r_xprt, req)) {
+ rpcrdma_req_destroy(req);
+ return NULL;
+ }
xprt->bc_alloc_count++;
rqst = &req->rl_slot;
@@ -261,6 +264,7 @@ void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt,
/* Queue rqst for ULP's callback service */
bc_serv = xprt->bc_serv;
+ xprt_get(xprt);
spin_lock(&bc_serv->sv_cb_lock);
list_add(&rqst->rq_bc_list, &bc_serv->sv_cb_list);
spin_unlock(&bc_serv->sv_cb_lock);
diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index 0b6dad7580a1..125297c9aa3e 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -7,67 +7,37 @@
/* Lightweight memory registration using Fast Registration Work
* Requests (FRWR).
*
- * FRWR features ordered asynchronous registration and deregistration
- * of arbitrarily sized memory regions. This is the fastest and safest
+ * FRWR features ordered asynchronous registration and invalidation
+ * of arbitrarily-sized memory regions. This is the fastest and safest
* but most complex memory registration mode.
*/
/* Normal operation
*
- * A Memory Region is prepared for RDMA READ or WRITE using a FAST_REG
+ * A Memory Region is prepared for RDMA Read or Write using a FAST_REG
* Work Request (frwr_map). When the RDMA operation is finished, this
* Memory Region is invalidated using a LOCAL_INV Work Request
- * (frwr_unmap_sync).
+ * (frwr_unmap_async and frwr_unmap_sync).
*
- * Typically these Work Requests are not signaled, and neither are RDMA
- * SEND Work Requests (with the exception of signaling occasionally to
- * prevent provider work queue overflows). This greatly reduces HCA
+ * Typically FAST_REG Work Requests are not signaled, and neither are
+ * RDMA Send Work Requests (with the exception of signaling occasionally
+ * to prevent provider work queue overflows). This greatly reduces HCA
* interrupt workload.
- *
- * As an optimization, frwr_unmap marks MRs INVALID before the
- * LOCAL_INV WR is posted. If posting succeeds, the MR is placed on
- * rb_mrs immediately so that no work (like managing a linked list
- * under a spinlock) is needed in the completion upcall.
- *
- * But this means that frwr_map() can occasionally encounter an MR
- * that is INVALID but the LOCAL_INV WR has not completed. Work Queue
- * ordering prevents a subsequent FAST_REG WR from executing against
- * that MR while it is still being invalidated.
*/
/* Transport recovery
*
- * ->op_map and the transport connect worker cannot run at the same
- * time, but ->op_unmap can fire while the transport connect worker
- * is running. Thus MR recovery is handled in ->op_map, to guarantee
- * that recovered MRs are owned by a sending RPC, and not one where
- * ->op_unmap could fire at the same time transport reconnect is
- * being done.
- *
- * When the underlying transport disconnects, MRs are left in one of
- * four states:
- *
- * INVALID: The MR was not in use before the QP entered ERROR state.
- *
- * VALID: The MR was registered before the QP entered ERROR state.
- *
- * FLUSHED_FR: The MR was being registered when the QP entered ERROR
- * state, and the pending WR was flushed.
- *
- * FLUSHED_LI: The MR was being invalidated when the QP entered ERROR
- * state, and the pending WR was flushed.
- *
- * When frwr_map encounters FLUSHED and VALID MRs, they are recovered
- * with ib_dereg_mr and then are re-initialized. Because MR recovery
- * allocates fresh resources, it is deferred to a workqueue, and the
- * recovered MRs are placed back on the rb_mrs list when recovery is
- * complete. frwr_map allocates another MR for the current RPC while
- * the broken MR is reset.
- *
- * To ensure that frwr_map doesn't encounter an MR that is marked
- * INVALID but that is about to be flushed due to a previous transport
- * disconnect, the transport connect worker attempts to drain all
- * pending send queue WRs before the transport is reconnected.
+ * frwr_map and frwr_unmap_* cannot run at the same time the transport
+ * connect worker is running. The connect worker holds the transport
+ * send lock, just as ->send_request does. This prevents frwr_map and
+ * the connect worker from running concurrently. When a connection is
+ * closed, the Receive completion queue is drained before allowing
+ * the connect worker to get control. This prevents frwr_unmap and the
+ * connect worker from running concurrently.
+ *
+ * When the underlying transport disconnects, MRs that are in flight
+ * are flushed and are likely unusable. Thus all MRs are destroyed.
+ * New MRs are created on demand.
*/
#include <linux/sunrpc/rpc_rdma.h>
@@ -81,28 +51,6 @@
#endif
/**
- * frwr_is_supported - Check if device supports FRWR
- * @device: interface adapter to check
- *
- * Returns true if device supports FRWR, otherwise false
- */
-bool frwr_is_supported(struct ib_device *device)
-{
- struct ib_device_attr *attrs = &device->attrs;
-
- if (!(attrs->device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS))
- goto out_not_supported;
- if (attrs->max_fast_reg_page_list_len == 0)
- goto out_not_supported;
- return true;
-
-out_not_supported:
- pr_info("rpcrdma: 'frwr' mode is not supported by device %s\n",
- device->name);
- return false;
-}
-
-/**
* frwr_release_mr - Destroy one MR
* @mr: MR allocated by frwr_init_mr
*
@@ -118,13 +66,8 @@ void frwr_release_mr(struct rpcrdma_mr *mr)
kfree(mr);
}
-/* MRs are dynamically allocated, so simply clean up and release the MR.
- * A replacement MR will subsequently be allocated on demand.
- */
-static void
-frwr_mr_recycle_worker(struct work_struct *work)
+static void frwr_mr_recycle(struct rpcrdma_mr *mr)
{
- struct rpcrdma_mr *mr = container_of(work, struct rpcrdma_mr, mr_recycle);
struct rpcrdma_xprt *r_xprt = mr->mr_xprt;
trace_xprtrdma_mr_recycle(mr);
@@ -136,10 +79,10 @@ frwr_mr_recycle_worker(struct work_struct *work)
mr->mr_dir = DMA_NONE;
}
- spin_lock(&r_xprt->rx_buf.rb_mrlock);
+ spin_lock(&r_xprt->rx_buf.rb_lock);
list_del(&mr->mr_all);
r_xprt->rx_stats.mrs_recycled++;
- spin_unlock(&r_xprt->rx_buf.rb_mrlock);
+ spin_unlock(&r_xprt->rx_buf.rb_lock);
frwr_release_mr(mr);
}
@@ -156,12 +99,10 @@ frwr_mr_recycle_worker(struct work_struct *work)
*/
void frwr_reset(struct rpcrdma_req *req)
{
- while (!list_empty(&req->rl_registered)) {
- struct rpcrdma_mr *mr;
+ struct rpcrdma_mr *mr;
- mr = rpcrdma_mr_pop(&req->rl_registered);
- rpcrdma_mr_unmap_and_put(mr);
- }
+ while ((mr = rpcrdma_mr_pop(&req->rl_registered)))
+ rpcrdma_mr_put(mr);
}
/**
@@ -183,14 +124,13 @@ int frwr_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr)
if (IS_ERR(frmr))
goto out_mr_err;
- sg = kcalloc(depth, sizeof(*sg), GFP_KERNEL);
+ sg = kcalloc(depth, sizeof(*sg), GFP_NOFS);
if (!sg)
goto out_list_err;
mr->frwr.fr_mr = frmr;
mr->mr_dir = DMA_NONE;
INIT_LIST_HEAD(&mr->mr_list);
- INIT_WORK(&mr->mr_recycle, frwr_mr_recycle_worker);
init_completion(&mr->frwr.fr_linv_done);
sg_init_table(sg, depth);
@@ -203,33 +143,53 @@ out_mr_err:
return rc;
out_list_err:
- dprintk("RPC: %s: sg allocation failure\n",
- __func__);
ib_dereg_mr(frmr);
return -ENOMEM;
}
/**
- * frwr_open - Prepare an endpoint for use with FRWR
- * @ia: interface adapter this endpoint will use
- * @ep: endpoint to prepare
+ * frwr_query_device - Prepare a transport for use with FRWR
+ * @r_xprt: controlling transport instance
+ * @device: RDMA device to query
*
* On success, sets:
- * ep->rep_attr.cap.max_send_wr
- * ep->rep_attr.cap.max_recv_wr
+ * ep->rep_attr
* ep->rep_max_requests
- * ia->ri_max_segs
+ * ia->ri_max_rdma_segs
*
* And these FRWR-related fields:
* ia->ri_max_frwr_depth
* ia->ri_mrtype
*
- * On failure, a negative errno is returned.
+ * Return values:
+ * On success, returns zero.
+ * %-EINVAL - the device does not support FRWR memory registration
+ * %-ENOMEM - the device is not sufficiently capable for NFS/RDMA
*/
-int frwr_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep)
+int frwr_query_device(struct rpcrdma_xprt *r_xprt,
+ const struct ib_device *device)
{
- struct ib_device_attr *attrs = &ia->ri_id->device->attrs;
+ const struct ib_device_attr *attrs = &device->attrs;
+ struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+ struct rpcrdma_ep *ep = &r_xprt->rx_ep;
int max_qp_wr, depth, delta;
+ unsigned int max_sge;
+
+ if (!(attrs->device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS) ||
+ attrs->max_fast_reg_page_list_len == 0) {
+ pr_err("rpcrdma: 'frwr' mode is not supported by device %s\n",
+ device->name);
+ return -EINVAL;
+ }
+
+ max_sge = min_t(unsigned int, attrs->max_send_sge,
+ RPCRDMA_MAX_SEND_SGES);
+ if (max_sge < RPCRDMA_MIN_SEND_SGES) {
+ pr_err("rpcrdma: HCA provides only %u send SGEs\n", max_sge);
+ return -ENOMEM;
+ }
+ ep->rep_attr.cap.max_send_sge = max_sge;
+ ep->rep_attr.cap.max_recv_sge = 1;
ia->ri_mrtype = IB_MR_TYPE_MEM_REG;
if (attrs->device_cap_flags & IB_DEVICE_SG_GAPS_REG)
@@ -239,14 +199,12 @@ int frwr_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep)
* capability, but perform optimally when the MRs are not larger
* than a page.
*/
- if (attrs->max_sge_rd > 1)
+ if (attrs->max_sge_rd > RPCRDMA_MAX_HDR_SEGS)
ia->ri_max_frwr_depth = attrs->max_sge_rd;
else
ia->ri_max_frwr_depth = attrs->max_fast_reg_page_list_len;
if (ia->ri_max_frwr_depth > RPCRDMA_MAX_DATA_SEGS)
ia->ri_max_frwr_depth = RPCRDMA_MAX_DATA_SEGS;
- dprintk("RPC: %s: max FR page list depth = %u\n",
- __func__, ia->ri_max_frwr_depth);
/* Add room for frwr register and invalidate WRs.
* 1. FRWR reg WR for head
@@ -270,7 +228,7 @@ int frwr_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep)
} while (delta > 0);
}
- max_qp_wr = ia->ri_id->device->attrs.max_qp_wr;
+ max_qp_wr = attrs->max_qp_wr;
max_qp_wr -= RPCRDMA_BACKWARD_WRS;
max_qp_wr -= 1;
if (max_qp_wr < RPCRDMA_MIN_SLOT_TABLE)
@@ -281,7 +239,7 @@ int frwr_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep)
if (ep->rep_attr.cap.max_send_wr > max_qp_wr) {
ep->rep_max_requests = max_qp_wr / depth;
if (!ep->rep_max_requests)
- return -EINVAL;
+ return -ENOMEM;
ep->rep_attr.cap.max_send_wr = ep->rep_max_requests * depth;
}
ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS;
@@ -290,30 +248,22 @@ int frwr_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep)
ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
ep->rep_attr.cap.max_recv_wr += 1; /* for ib_drain_rq */
- ia->ri_max_segs = max_t(unsigned int, 1, RPCRDMA_MAX_DATA_SEGS /
- ia->ri_max_frwr_depth);
+ ia->ri_max_rdma_segs =
+ DIV_ROUND_UP(RPCRDMA_MAX_DATA_SEGS, ia->ri_max_frwr_depth);
/* Reply chunks require segments for head and tail buffers */
- ia->ri_max_segs += 2;
- if (ia->ri_max_segs > RPCRDMA_MAX_HDR_SEGS)
- ia->ri_max_segs = RPCRDMA_MAX_HDR_SEGS;
- return 0;
-}
-
-/**
- * frwr_maxpages - Compute size of largest payload
- * @r_xprt: transport
- *
- * Returns maximum size of an RPC message, in pages.
- *
- * FRWR mode conveys a list of pages per chunk segment. The
- * maximum length of that list is the FRWR page list depth.
- */
-size_t frwr_maxpages(struct rpcrdma_xprt *r_xprt)
-{
- struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+ ia->ri_max_rdma_segs += 2;
+ if (ia->ri_max_rdma_segs > RPCRDMA_MAX_HDR_SEGS)
+ ia->ri_max_rdma_segs = RPCRDMA_MAX_HDR_SEGS;
+
+ /* Ensure the underlying device is capable of conveying the
+ * largest r/wsize NFS will ask for. This guarantees that
+ * failing over from one RDMA device to another will not
+ * break NFS I/O.
+ */
+ if ((ia->ri_max_rdma_segs * ia->ri_max_frwr_depth) < RPCRDMA_MAX_SEGS)
+ return -ENOMEM;
- return min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS,
- (ia->ri_max_segs - 2) * ia->ri_max_frwr_depth);
+ return 0;
}
/**
@@ -323,31 +273,25 @@ size_t frwr_maxpages(struct rpcrdma_xprt *r_xprt)
* @nsegs: number of segments remaining
* @writing: true when RDMA Write will be used
* @xid: XID of RPC using the registered memory
- * @out: initialized MR
+ * @mr: MR to fill in
*
* Prepare a REG_MR Work Request to register a memory region
* for remote access via RDMA READ or RDMA WRITE.
*
* Returns the next segment or a negative errno pointer.
- * On success, the prepared MR is planted in @out.
+ * On success, @mr is filled in.
*/
struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,
struct rpcrdma_mr_seg *seg,
int nsegs, bool writing, __be32 xid,
- struct rpcrdma_mr **out)
+ struct rpcrdma_mr *mr)
{
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
- bool holes_ok = ia->ri_mrtype == IB_MR_TYPE_SG_GAPS;
- struct rpcrdma_mr *mr;
- struct ib_mr *ibmr;
struct ib_reg_wr *reg_wr;
- int i, n;
+ int i, n, dma_nents;
+ struct ib_mr *ibmr;
u8 key;
- mr = rpcrdma_mr_get(r_xprt);
- if (!mr)
- goto out_getmr_err;
-
if (nsegs > ia->ri_max_frwr_depth)
nsegs = ia->ri_max_frwr_depth;
for (i = 0; i < nsegs;) {
@@ -362,22 +306,23 @@ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,
++seg;
++i;
- if (holes_ok)
+ if (ia->ri_mrtype == IB_MR_TYPE_SG_GAPS)
continue;
if ((i < nsegs && offset_in_page(seg->mr_offset)) ||
offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
break;
}
mr->mr_dir = rpcrdma_data_dir(writing);
+ mr->mr_nents = i;
- mr->mr_nents =
- ib_dma_map_sg(ia->ri_id->device, mr->mr_sg, i, mr->mr_dir);
- if (!mr->mr_nents)
+ dma_nents = ib_dma_map_sg(ia->ri_id->device, mr->mr_sg, mr->mr_nents,
+ mr->mr_dir);
+ if (!dma_nents)
goto out_dmamap_err;
ibmr = mr->frwr.fr_mr;
- n = ib_map_mr_sg(ibmr, mr->mr_sg, mr->mr_nents, NULL, PAGE_SIZE);
- if (unlikely(n != mr->mr_nents))
+ n = ib_map_mr_sg(ibmr, mr->mr_sg, dma_nents, NULL, PAGE_SIZE);
+ if (n != dma_nents)
goto out_mapmr_err;
ibmr->iova &= 0x00000000ffffffff;
@@ -397,22 +342,15 @@ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,
mr->mr_offset = ibmr->iova;
trace_xprtrdma_mr_map(mr);
- *out = mr;
return seg;
-out_getmr_err:
- xprt_wait_for_buffer_space(&r_xprt->rx_xprt);
- return ERR_PTR(-EAGAIN);
-
out_dmamap_err:
mr->mr_dir = DMA_NONE;
trace_xprtrdma_frwr_sgerr(mr, i);
- rpcrdma_mr_put(mr);
return ERR_PTR(-EIO);
out_mapmr_err:
trace_xprtrdma_frwr_maperr(mr, n);
- rpcrdma_mr_recycle(mr);
return ERR_PTR(-EIO);
}
@@ -449,7 +387,7 @@ int frwr_send(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
struct ib_send_wr *post_wr;
struct rpcrdma_mr *mr;
- post_wr = &req->rl_sendctx->sc_wr;
+ post_wr = &req->rl_wr;
list_for_each_entry(mr, &req->rl_registered, mr_list) {
struct rpcrdma_frwr *frwr;
@@ -465,9 +403,6 @@ int frwr_send(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
post_wr = &frwr->fr_regwr.wr;
}
- /* If ib_post_send fails, the next ->send_request for
- * @req will queue these MRs for recovery.
- */
return ib_post_send(ia->ri_id->qp, post_wr, NULL);
}
@@ -485,7 +420,7 @@ void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs)
if (mr->mr_handle == rep->rr_inv_rkey) {
list_del_init(&mr->mr_list);
trace_xprtrdma_mr_remoteinv(mr);
- rpcrdma_mr_unmap_and_put(mr);
+ rpcrdma_mr_put(mr);
break; /* only one invalidated MR per RPC */
}
}
@@ -493,9 +428,9 @@ void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs)
static void __frwr_release_mr(struct ib_wc *wc, struct rpcrdma_mr *mr)
{
if (wc->status != IB_WC_SUCCESS)
- rpcrdma_mr_recycle(mr);
+ frwr_mr_recycle(mr);
else
- rpcrdma_mr_unmap_and_put(mr);
+ rpcrdma_mr_put(mr);
}
/**
@@ -532,8 +467,8 @@ static void frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc)
/* WARNING: Only wr_cqe and status are reliable at this point */
trace_xprtrdma_wc_li_wake(wc, frwr);
- complete(&frwr->fr_linv_done);
__frwr_release_mr(wc, mr);
+ complete(&frwr->fr_linv_done);
}
/**
@@ -562,8 +497,7 @@ void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
*/
frwr = NULL;
prev = &first;
- while (!list_empty(&req->rl_registered)) {
- mr = rpcrdma_mr_pop(&req->rl_registered);
+ while ((mr = rpcrdma_mr_pop(&req->rl_registered))) {
trace_xprtrdma_mr_localinv(mr);
r_xprt->rx_stats.local_inv_needed++;
@@ -596,7 +530,6 @@ void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
*/
bad_wr = NULL;
rc = ib_post_send(r_xprt->rx_ia.ri_id->qp, first, &bad_wr);
- trace_xprtrdma_post_send(req, rc);
/* The final LOCAL_INV WR in the chain is supposed to
* do the wake. If it was never posted, the wake will
@@ -609,6 +542,7 @@ void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
/* Recycle MRs in the LOCAL_INV chain that did not get posted.
*/
+ trace_xprtrdma_post_linv(req, rc);
while (bad_wr) {
frwr = container_of(bad_wr, struct rpcrdma_frwr,
fr_invwr);
@@ -616,7 +550,7 @@ void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
bad_wr = bad_wr->next;
list_del_init(&mr->mr_list);
- rpcrdma_mr_recycle(mr);
+ frwr_mr_recycle(mr);
}
}
@@ -632,11 +566,15 @@ static void frwr_wc_localinv_done(struct ib_cq *cq, struct ib_wc *wc)
struct rpcrdma_frwr *frwr =
container_of(cqe, struct rpcrdma_frwr, fr_cqe);
struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr);
+ struct rpcrdma_rep *rep = mr->mr_req->rl_reply;
/* WARNING: Only wr_cqe and status are reliable at this point */
trace_xprtrdma_wc_li_done(wc, frwr);
- rpcrdma_complete_rqst(frwr->fr_req->rl_reply);
__frwr_release_mr(wc, mr);
+
+ /* Ensure @rep is generated before __frwr_release_mr */
+ smp_rmb();
+ rpcrdma_complete_rqst(rep);
}
/**
@@ -662,15 +600,13 @@ void frwr_unmap_async(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
*/
frwr = NULL;
prev = &first;
- while (!list_empty(&req->rl_registered)) {
- mr = rpcrdma_mr_pop(&req->rl_registered);
+ while ((mr = rpcrdma_mr_pop(&req->rl_registered))) {
trace_xprtrdma_mr_localinv(mr);
r_xprt->rx_stats.local_inv_needed++;
frwr = &mr->frwr;
frwr->fr_cqe.done = frwr_wc_localinv;
- frwr->fr_req = req;
last = &frwr->fr_invwr;
last->next = NULL;
last->wr_cqe = &frwr->fr_cqe;
@@ -697,18 +633,18 @@ void frwr_unmap_async(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
*/
bad_wr = NULL;
rc = ib_post_send(r_xprt->rx_ia.ri_id->qp, first, &bad_wr);
- trace_xprtrdma_post_send(req, rc);
if (!rc)
return;
/* Recycle MRs in the LOCAL_INV chain that did not get posted.
*/
+ trace_xprtrdma_post_linv(req, rc);
while (bad_wr) {
frwr = container_of(bad_wr, struct rpcrdma_frwr, fr_invwr);
mr = container_of(frwr, struct rpcrdma_mr, frwr);
bad_wr = bad_wr->next;
- rpcrdma_mr_recycle(mr);
+ frwr_mr_recycle(mr);
}
/* The final LOCAL_INV WR in the chain is supposed to
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 4345e6912392..28020ec104d4 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -78,8 +78,6 @@ static unsigned int rpcrdma_max_call_header_size(unsigned int maxsegs)
size += rpcrdma_segment_maxsz * sizeof(__be32);
size += sizeof(__be32); /* list discriminator */
- dprintk("RPC: %s: max call header size = %u\n",
- __func__, size);
return size;
}
@@ -100,8 +98,6 @@ static unsigned int rpcrdma_max_reply_header_size(unsigned int maxsegs)
size += maxsegs * rpcrdma_segment_maxsz * sizeof(__be32);
size += sizeof(__be32); /* list discriminator */
- dprintk("RPC: %s: max reply header size = %u\n",
- __func__, size);
return size;
}
@@ -115,7 +111,7 @@ static unsigned int rpcrdma_max_reply_header_size(unsigned int maxsegs)
*/
void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *r_xprt)
{
- unsigned int maxsegs = r_xprt->rx_ia.ri_max_segs;
+ unsigned int maxsegs = r_xprt->rx_ia.ri_max_rdma_segs;
struct rpcrdma_ep *ep = &r_xprt->rx_ep;
ep->rep_max_inline_send =
@@ -149,7 +145,7 @@ static bool rpcrdma_args_inline(struct rpcrdma_xprt *r_xprt,
remaining -= min_t(unsigned int,
PAGE_SIZE - offset, remaining);
offset = 0;
- if (++count > r_xprt->rx_ia.ri_max_send_sges)
+ if (++count > r_xprt->rx_ep.rep_attr.cap.max_send_sge)
return false;
}
}
@@ -342,6 +338,31 @@ encode_read_segment(struct xdr_stream *xdr, struct rpcrdma_mr *mr,
return 0;
}
+static struct rpcrdma_mr_seg *rpcrdma_mr_prepare(struct rpcrdma_xprt *r_xprt,
+ struct rpcrdma_req *req,
+ struct rpcrdma_mr_seg *seg,
+ int nsegs, bool writing,
+ struct rpcrdma_mr **mr)
+{
+ *mr = rpcrdma_mr_pop(&req->rl_free_mrs);
+ if (!*mr) {
+ *mr = rpcrdma_mr_get(r_xprt);
+ if (!*mr)
+ goto out_getmr_err;
+ trace_xprtrdma_mr_get(req);
+ (*mr)->mr_req = req;
+ }
+
+ rpcrdma_mr_push(*mr, &req->rl_registered);
+ return frwr_map(r_xprt, seg, nsegs, writing, req->rl_slot.rq_xid, *mr);
+
+out_getmr_err:
+ trace_xprtrdma_nomrs(req);
+ xprt_wait_for_buffer_space(&r_xprt->rx_xprt);
+ rpcrdma_mrs_refresh(r_xprt);
+ return ERR_PTR(-EAGAIN);
+}
+
/* Register and XDR encode the Read list. Supports encoding a list of read
* segments that belong to a single read chunk.
*
@@ -356,9 +377,10 @@ encode_read_segment(struct xdr_stream *xdr, struct rpcrdma_mr *mr,
*
* Only a single @pos value is currently supported.
*/
-static noinline int
-rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
- struct rpc_rqst *rqst, enum rpcrdma_chunktype rtype)
+static int rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
+ struct rpcrdma_req *req,
+ struct rpc_rqst *rqst,
+ enum rpcrdma_chunktype rtype)
{
struct xdr_stream *xdr = &req->rl_stream;
struct rpcrdma_mr_seg *seg;
@@ -366,7 +388,7 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
unsigned int pos;
int nsegs;
- if (rtype == rpcrdma_noch)
+ if (rtype == rpcrdma_noch_pullup || rtype == rpcrdma_noch_mapped)
goto done;
pos = rqst->rq_snd_buf.head[0].iov_len;
@@ -379,10 +401,9 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
return nsegs;
do {
- seg = frwr_map(r_xprt, seg, nsegs, false, rqst->rq_xid, &mr);
+ seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, false, &mr);
if (IS_ERR(seg))
return PTR_ERR(seg);
- rpcrdma_mr_push(mr, &req->rl_registered);
if (encode_read_segment(xdr, mr, pos) < 0)
return -EMSGSIZE;
@@ -411,9 +432,10 @@ done:
*
* Only a single Write chunk is currently supported.
*/
-static noinline int
-rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
- struct rpc_rqst *rqst, enum rpcrdma_chunktype wtype)
+static int rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt,
+ struct rpcrdma_req *req,
+ struct rpc_rqst *rqst,
+ enum rpcrdma_chunktype wtype)
{
struct xdr_stream *xdr = &req->rl_stream;
struct rpcrdma_mr_seg *seg;
@@ -440,10 +462,9 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
nchunks = 0;
do {
- seg = frwr_map(r_xprt, seg, nsegs, true, rqst->rq_xid, &mr);
+ seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, true, &mr);
if (IS_ERR(seg))
return PTR_ERR(seg);
- rpcrdma_mr_push(mr, &req->rl_registered);
if (encode_rdma_segment(xdr, mr) < 0)
return -EMSGSIZE;
@@ -474,9 +495,10 @@ done:
* Returns zero on success, or a negative errno if a failure occurred.
* @xdr is advanced to the next position in the stream.
*/
-static noinline int
-rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
- struct rpc_rqst *rqst, enum rpcrdma_chunktype wtype)
+static int rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
+ struct rpcrdma_req *req,
+ struct rpc_rqst *rqst,
+ enum rpcrdma_chunktype wtype)
{
struct xdr_stream *xdr = &req->rl_stream;
struct rpcrdma_mr_seg *seg;
@@ -501,10 +523,9 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
nchunks = 0;
do {
- seg = frwr_map(r_xprt, seg, nsegs, true, rqst->rq_xid, &mr);
+ seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, true, &mr);
if (IS_ERR(seg))
return PTR_ERR(seg);
- rpcrdma_mr_push(mr, &req->rl_registered);
if (encode_rdma_segment(xdr, mr) < 0)
return -EMSGSIZE;
@@ -539,6 +560,7 @@ static void rpcrdma_sendctx_done(struct kref *kref)
*/
void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc)
{
+ struct rpcrdma_regbuf *rb = sc->sc_req->rl_sendbuf;
struct ib_sge *sge;
if (!sc->sc_unmap_count)
@@ -550,7 +572,7 @@ void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc)
*/
for (sge = &sc->sc_sges[2]; sc->sc_unmap_count;
++sge, --sc->sc_unmap_count)
- ib_dma_unmap_page(sc->sc_device, sge->addr, sge->length,
+ ib_dma_unmap_page(rdmab_device(rb), sge->addr, sge->length,
DMA_TO_DEVICE);
kref_put(&sc->sc_req->rl_kref, rpcrdma_sendctx_done);
@@ -558,152 +580,228 @@ void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc)
/* Prepare an SGE for the RPC-over-RDMA transport header.
*/
-static bool rpcrdma_prepare_hdr_sge(struct rpcrdma_xprt *r_xprt,
+static void rpcrdma_prepare_hdr_sge(struct rpcrdma_xprt *r_xprt,
struct rpcrdma_req *req, u32 len)
{
struct rpcrdma_sendctx *sc = req->rl_sendctx;
struct rpcrdma_regbuf *rb = req->rl_rdmabuf;
- struct ib_sge *sge = sc->sc_sges;
+ struct ib_sge *sge = &sc->sc_sges[req->rl_wr.num_sge++];
+
+ sge->addr = rdmab_addr(rb);
+ sge->length = len;
+ sge->lkey = rdmab_lkey(rb);
+
+ ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr, sge->length,
+ DMA_TO_DEVICE);
+}
+
+/* The head iovec is straightforward, as it is usually already
+ * DMA-mapped. Sync the content that has changed.
+ */
+static bool rpcrdma_prepare_head_iov(struct rpcrdma_xprt *r_xprt,
+ struct rpcrdma_req *req, unsigned int len)
+{
+ struct rpcrdma_sendctx *sc = req->rl_sendctx;
+ struct ib_sge *sge = &sc->sc_sges[req->rl_wr.num_sge++];
+ struct rpcrdma_regbuf *rb = req->rl_sendbuf;
if (!rpcrdma_regbuf_dma_map(r_xprt, rb))
- goto out_regbuf;
+ return false;
+
sge->addr = rdmab_addr(rb);
sge->length = len;
sge->lkey = rdmab_lkey(rb);
ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr, sge->length,
DMA_TO_DEVICE);
- sc->sc_wr.num_sge++;
return true;
+}
-out_regbuf:
- pr_err("rpcrdma: failed to DMA map a Send buffer\n");
+/* If there is a page list present, DMA map and prepare an
+ * SGE for each page to be sent.
+ */
+static bool rpcrdma_prepare_pagelist(struct rpcrdma_req *req,
+ struct xdr_buf *xdr)
+{
+ struct rpcrdma_sendctx *sc = req->rl_sendctx;
+ struct rpcrdma_regbuf *rb = req->rl_sendbuf;
+ unsigned int page_base, len, remaining;
+ struct page **ppages;
+ struct ib_sge *sge;
+
+ ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
+ page_base = offset_in_page(xdr->page_base);
+ remaining = xdr->page_len;
+ while (remaining) {
+ sge = &sc->sc_sges[req->rl_wr.num_sge++];
+ len = min_t(unsigned int, PAGE_SIZE - page_base, remaining);
+ sge->addr = ib_dma_map_page(rdmab_device(rb), *ppages,
+ page_base, len, DMA_TO_DEVICE);
+ if (ib_dma_mapping_error(rdmab_device(rb), sge->addr))
+ goto out_mapping_err;
+
+ sge->length = len;
+ sge->lkey = rdmab_lkey(rb);
+
+ sc->sc_unmap_count++;
+ ppages++;
+ remaining -= len;
+ page_base = 0;
+ }
+
+ return true;
+
+out_mapping_err:
+ trace_xprtrdma_dma_maperr(sge->addr);
return false;
}
-/* Prepare the Send SGEs. The head and tail iovec, and each entry
- * in the page list, gets its own SGE.
+/* The tail iovec may include an XDR pad for the page list,
+ * as well as additional content, and may not reside in the
+ * same page as the head iovec.
*/
-static bool rpcrdma_prepare_msg_sges(struct rpcrdma_xprt *r_xprt,
- struct rpcrdma_req *req,
+static bool rpcrdma_prepare_tail_iov(struct rpcrdma_req *req,
struct xdr_buf *xdr,
- enum rpcrdma_chunktype rtype)
+ unsigned int page_base, unsigned int len)
{
struct rpcrdma_sendctx *sc = req->rl_sendctx;
- unsigned int sge_no, page_base, len, remaining;
+ struct ib_sge *sge = &sc->sc_sges[req->rl_wr.num_sge++];
struct rpcrdma_regbuf *rb = req->rl_sendbuf;
- struct ib_sge *sge = sc->sc_sges;
- struct page *page, **ppages;
+ struct page *page = virt_to_page(xdr->tail[0].iov_base);
- /* The head iovec is straightforward, as it is already
- * DMA-mapped. Sync the content that has changed.
- */
- if (!rpcrdma_regbuf_dma_map(r_xprt, rb))
- goto out_regbuf;
- sc->sc_device = rdmab_device(rb);
- sge_no = 1;
- sge[sge_no].addr = rdmab_addr(rb);
- sge[sge_no].length = xdr->head[0].iov_len;
- sge[sge_no].lkey = rdmab_lkey(rb);
- ib_dma_sync_single_for_device(rdmab_device(rb), sge[sge_no].addr,
- sge[sge_no].length, DMA_TO_DEVICE);
+ sge->addr = ib_dma_map_page(rdmab_device(rb), page, page_base, len,
+ DMA_TO_DEVICE);
+ if (ib_dma_mapping_error(rdmab_device(rb), sge->addr))
+ goto out_mapping_err;
- /* If there is a Read chunk, the page list is being handled
- * via explicit RDMA, and thus is skipped here. However, the
- * tail iovec may include an XDR pad for the page list, as
- * well as additional content, and may not reside in the
- * same page as the head iovec.
- */
- if (rtype == rpcrdma_readch) {
- len = xdr->tail[0].iov_len;
+ sge->length = len;
+ sge->lkey = rdmab_lkey(rb);
+ ++sc->sc_unmap_count;
+ return true;
- /* Do not include the tail if it is only an XDR pad */
- if (len < 4)
- goto out;
+out_mapping_err:
+ trace_xprtrdma_dma_maperr(sge->addr);
+ return false;
+}
- page = virt_to_page(xdr->tail[0].iov_base);
- page_base = offset_in_page(xdr->tail[0].iov_base);
+/* Copy the tail to the end of the head buffer.
+ */
+static void rpcrdma_pullup_tail_iov(struct rpcrdma_xprt *r_xprt,
+ struct rpcrdma_req *req,
+ struct xdr_buf *xdr)
+{
+ unsigned char *dst;
- /* If the content in the page list is an odd length,
- * xdr_write_pages() has added a pad at the beginning
- * of the tail iovec. Force the tail's non-pad content
- * to land at the next XDR position in the Send message.
- */
- page_base += len & 3;
- len -= len & 3;
- goto map_tail;
- }
+ dst = (unsigned char *)xdr->head[0].iov_base;
+ dst += xdr->head[0].iov_len + xdr->page_len;
+ memmove(dst, xdr->tail[0].iov_base, xdr->tail[0].iov_len);
+ r_xprt->rx_stats.pullup_copy_count += xdr->tail[0].iov_len;
+}
- /* If there is a page list present, temporarily DMA map
- * and prepare an SGE for each page to be sent.
- */
- if (xdr->page_len) {
- ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
- page_base = offset_in_page(xdr->page_base);
- remaining = xdr->page_len;
- while (remaining) {
- sge_no++;
- if (sge_no > RPCRDMA_MAX_SEND_SGES - 2)
- goto out_mapping_overflow;
-
- len = min_t(u32, PAGE_SIZE - page_base, remaining);
- sge[sge_no].addr =
- ib_dma_map_page(rdmab_device(rb), *ppages,
- page_base, len, DMA_TO_DEVICE);
- if (ib_dma_mapping_error(rdmab_device(rb),
- sge[sge_no].addr))
- goto out_mapping_err;
- sge[sge_no].length = len;
- sge[sge_no].lkey = rdmab_lkey(rb);
-
- sc->sc_unmap_count++;
- ppages++;
- remaining -= len;
- page_base = 0;
- }
+/* Copy pagelist content into the head buffer.
+ */
+static void rpcrdma_pullup_pagelist(struct rpcrdma_xprt *r_xprt,
+ struct rpcrdma_req *req,
+ struct xdr_buf *xdr)
+{
+ unsigned int len, page_base, remaining;
+ struct page **ppages;
+ unsigned char *src, *dst;
+
+ dst = (unsigned char *)xdr->head[0].iov_base;
+ dst += xdr->head[0].iov_len;
+ ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
+ page_base = offset_in_page(xdr->page_base);
+ remaining = xdr->page_len;
+ while (remaining) {
+ src = page_address(*ppages);
+ src += page_base;
+ len = min_t(unsigned int, PAGE_SIZE - page_base, remaining);
+ memcpy(dst, src, len);
+ r_xprt->rx_stats.pullup_copy_count += len;
+
+ ppages++;
+ dst += len;
+ remaining -= len;
+ page_base = 0;
}
+}
- /* The tail iovec is not always constructed in the same
- * page where the head iovec resides (see, for example,
- * gss_wrap_req_priv). To neatly accommodate that case,
- * DMA map it separately.
- */
- if (xdr->tail[0].iov_len) {
- page = virt_to_page(xdr->tail[0].iov_base);
- page_base = offset_in_page(xdr->tail[0].iov_base);
- len = xdr->tail[0].iov_len;
+/* Copy the contents of @xdr into @rl_sendbuf and DMA sync it.
+ * When the head, pagelist, and tail are small, a pull-up copy
+ * is considerably less costly than DMA mapping the components
+ * of @xdr.
+ *
+ * Assumptions:
+ * - the caller has already verified that the total length
+ * of the RPC Call body will fit into @rl_sendbuf.
+ */
+static bool rpcrdma_prepare_noch_pullup(struct rpcrdma_xprt *r_xprt,
+ struct rpcrdma_req *req,
+ struct xdr_buf *xdr)
+{
+ if (unlikely(xdr->tail[0].iov_len))
+ rpcrdma_pullup_tail_iov(r_xprt, req, xdr);
-map_tail:
- sge_no++;
- sge[sge_no].addr =
- ib_dma_map_page(rdmab_device(rb), page, page_base, len,
- DMA_TO_DEVICE);
- if (ib_dma_mapping_error(rdmab_device(rb), sge[sge_no].addr))
- goto out_mapping_err;
- sge[sge_no].length = len;
- sge[sge_no].lkey = rdmab_lkey(rb);
- sc->sc_unmap_count++;
- }
+ if (unlikely(xdr->page_len))
+ rpcrdma_pullup_pagelist(r_xprt, req, xdr);
-out:
- sc->sc_wr.num_sge += sge_no;
- if (sc->sc_unmap_count)
+ /* The whole RPC message resides in the head iovec now */
+ return rpcrdma_prepare_head_iov(r_xprt, req, xdr->len);
+}
+
+static bool rpcrdma_prepare_noch_mapped(struct rpcrdma_xprt *r_xprt,
+ struct rpcrdma_req *req,
+ struct xdr_buf *xdr)
+{
+ struct kvec *tail = &xdr->tail[0];
+
+ if (!rpcrdma_prepare_head_iov(r_xprt, req, xdr->head[0].iov_len))
+ return false;
+ if (xdr->page_len)
+ if (!rpcrdma_prepare_pagelist(req, xdr))
+ return false;
+ if (tail->iov_len)
+ if (!rpcrdma_prepare_tail_iov(req, xdr,
+ offset_in_page(tail->iov_base),
+ tail->iov_len))
+ return false;
+
+ if (req->rl_sendctx->sc_unmap_count)
kref_get(&req->rl_kref);
return true;
+}
-out_regbuf:
- pr_err("rpcrdma: failed to DMA map a Send buffer\n");
- return false;
+static bool rpcrdma_prepare_readch(struct rpcrdma_xprt *r_xprt,
+ struct rpcrdma_req *req,
+ struct xdr_buf *xdr)
+{
+ if (!rpcrdma_prepare_head_iov(r_xprt, req, xdr->head[0].iov_len))
+ return false;
-out_mapping_overflow:
- rpcrdma_sendctx_unmap(sc);
- pr_err("rpcrdma: too many Send SGEs (%u)\n", sge_no);
- return false;
+ /* If there is a Read chunk, the page list is being handled
+ * via explicit RDMA, and thus is skipped here.
+ */
-out_mapping_err:
- rpcrdma_sendctx_unmap(sc);
- trace_xprtrdma_dma_maperr(sge[sge_no].addr);
- return false;
+ /* Do not include the tail if it is only an XDR pad */
+ if (xdr->tail[0].iov_len > 3) {
+ unsigned int page_base, len;
+
+ /* If the content in the page list is an odd length,
+ * xdr_write_pages() adds a pad at the beginning of
+ * the tail iovec. Force the tail's non-pad content to
+ * land at the next XDR position in the Send message.
+ */
+ page_base = offset_in_page(xdr->tail[0].iov_base);
+ len = xdr->tail[0].iov_len;
+ page_base += len & 3;
+ len -= len & 3;
+ if (!rpcrdma_prepare_tail_iov(req, xdr, page_base, len))
+ return false;
+ kref_get(&req->rl_kref);
+ }
+
+ return true;
}
/**
@@ -716,31 +814,52 @@ out_mapping_err:
*
* Returns 0 on success; otherwise a negative errno is returned.
*/
-int
-rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
- struct rpcrdma_req *req, u32 hdrlen,
- struct xdr_buf *xdr, enum rpcrdma_chunktype rtype)
+inline int rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
+ struct rpcrdma_req *req, u32 hdrlen,
+ struct xdr_buf *xdr,
+ enum rpcrdma_chunktype rtype)
{
int ret;
ret = -EAGAIN;
req->rl_sendctx = rpcrdma_sendctx_get_locked(r_xprt);
if (!req->rl_sendctx)
- goto err;
- req->rl_sendctx->sc_wr.num_sge = 0;
+ goto out_nosc;
req->rl_sendctx->sc_unmap_count = 0;
req->rl_sendctx->sc_req = req;
kref_init(&req->rl_kref);
+ req->rl_wr.wr_cqe = &req->rl_sendctx->sc_cqe;
+ req->rl_wr.sg_list = req->rl_sendctx->sc_sges;
+ req->rl_wr.num_sge = 0;
+ req->rl_wr.opcode = IB_WR_SEND;
+
+ rpcrdma_prepare_hdr_sge(r_xprt, req, hdrlen);
ret = -EIO;
- if (!rpcrdma_prepare_hdr_sge(r_xprt, req, hdrlen))
- goto err;
- if (rtype != rpcrdma_areadch)
- if (!rpcrdma_prepare_msg_sges(r_xprt, req, xdr, rtype))
- goto err;
+ switch (rtype) {
+ case rpcrdma_noch_pullup:
+ if (!rpcrdma_prepare_noch_pullup(r_xprt, req, xdr))
+ goto out_unmap;
+ break;
+ case rpcrdma_noch_mapped:
+ if (!rpcrdma_prepare_noch_mapped(r_xprt, req, xdr))
+ goto out_unmap;
+ break;
+ case rpcrdma_readch:
+ if (!rpcrdma_prepare_readch(r_xprt, req, xdr))
+ goto out_unmap;
+ break;
+ case rpcrdma_areadch:
+ break;
+ default:
+ goto out_unmap;
+ }
+
return 0;
-err:
+out_unmap:
+ rpcrdma_sendctx_unmap(req->rl_sendctx);
+out_nosc:
trace_xprtrdma_prepsend_failed(&req->rl_slot, ret);
return ret;
}
@@ -770,6 +889,7 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst)
struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
struct xdr_stream *xdr = &req->rl_stream;
enum rpcrdma_chunktype rtype, wtype;
+ struct xdr_buf *buf = &rqst->rq_snd_buf;
bool ddp_allowed;
__be32 *p;
int ret;
@@ -785,7 +905,7 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst)
goto out_err;
*p++ = rqst->rq_xid;
*p++ = rpcrdma_version;
- *p++ = cpu_to_be32(r_xprt->rx_buf.rb_max_requests);
+ *p++ = r_xprt->rx_buf.rb_max_requests;
/* When the ULP employs a GSS flavor that guarantees integrity
* or privacy, direct data placement of individual data items
@@ -827,8 +947,9 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst)
*/
if (rpcrdma_args_inline(r_xprt, rqst)) {
*p++ = rdma_msg;
- rtype = rpcrdma_noch;
- } else if (ddp_allowed && rqst->rq_snd_buf.flags & XDRBUF_WRITE) {
+ rtype = buf->len < rdmab_length(req->rl_sendbuf) ?
+ rpcrdma_noch_pullup : rpcrdma_noch_mapped;
+ } else if (ddp_allowed && buf->flags & XDRBUF_WRITE) {
*p++ = rdma_msg;
rtype = rpcrdma_readch;
} else {
@@ -837,17 +958,6 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst)
rtype = rpcrdma_areadch;
}
- /* If this is a retransmit, discard previously registered
- * chunks. Very likely the connection has been replaced,
- * so these registrations are invalid and unusable.
- */
- while (unlikely(!list_empty(&req->rl_registered))) {
- struct rpcrdma_mr *mr;
-
- mr = rpcrdma_mr_pop(&req->rl_registered);
- rpcrdma_mr_recycle(mr);
- }
-
/* This implementation supports the following combinations
* of chunk lists in one RPC-over-RDMA Call message:
*
@@ -881,7 +991,7 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst)
goto out_err;
ret = rpcrdma_prepare_send_sges(r_xprt, req, req->rl_hdrbuf.len,
- &rqst->rq_snd_buf, rtype);
+ buf, rtype);
if (ret)
goto out_err;
@@ -895,6 +1005,40 @@ out_err:
return ret;
}
+static void __rpcrdma_update_cwnd_locked(struct rpc_xprt *xprt,
+ struct rpcrdma_buffer *buf,
+ u32 grant)
+{
+ buf->rb_credits = grant;
+ xprt->cwnd = grant << RPC_CWNDSHIFT;
+}
+
+static void rpcrdma_update_cwnd(struct rpcrdma_xprt *r_xprt, u32 grant)
+{
+ struct rpc_xprt *xprt = &r_xprt->rx_xprt;
+
+ spin_lock(&xprt->transport_lock);
+ __rpcrdma_update_cwnd_locked(xprt, &r_xprt->rx_buf, grant);
+ spin_unlock(&xprt->transport_lock);
+}
+
+/**
+ * rpcrdma_reset_cwnd - Reset the xprt's congestion window
+ * @r_xprt: controlling transport instance
+ *
+ * Prepare @r_xprt for the next connection by reinitializing
+ * its credit grant to one (see RFC 8166, Section 3.3.3).
+ */
+void rpcrdma_reset_cwnd(struct rpcrdma_xprt *r_xprt)
+{
+ struct rpc_xprt *xprt = &r_xprt->rx_xprt;
+
+ spin_lock(&xprt->transport_lock);
+ xprt->cong = 0;
+ __rpcrdma_update_cwnd_locked(xprt, &r_xprt->rx_buf, 1);
+ spin_unlock(&xprt->transport_lock);
+}
+
/**
* rpcrdma_inline_fixup - Scatter inline received data into rqst's iovecs
* @rqst: controlling RPC request
@@ -934,7 +1078,6 @@ rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
curlen = rqst->rq_rcv_buf.head[0].iov_len;
if (curlen > copy_len)
curlen = copy_len;
- trace_xprtrdma_fixup(rqst, copy_len, curlen);
srcp += curlen;
copy_len -= curlen;
@@ -954,8 +1097,6 @@ rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
if (curlen > pagelist_len)
curlen = pagelist_len;
- trace_xprtrdma_fixup_pg(rqst, i, srcp,
- copy_len, curlen);
destp = kmap_atomic(ppages[i]);
memcpy(destp + page_base, srcp, curlen);
flush_dcache_page(ppages[i]);
@@ -987,6 +1128,8 @@ rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
rqst->rq_private_buf.tail[0].iov_base = srcp;
}
+ if (fixup_copy_count)
+ trace_xprtrdma_fixup(rqst, fixup_copy_count);
return fixup_copy_count;
}
@@ -1240,8 +1383,6 @@ void rpcrdma_complete_rqst(struct rpcrdma_rep *rep)
struct rpc_rqst *rqst = rep->rr_rqst;
int status;
- xprt->reestablish_timeout = 0;
-
switch (rep->rr_proc) {
case rdma_msg:
status = rpcrdma_decode_msg(r_xprt, rep, rqst);
@@ -1300,6 +1441,12 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
u32 credits;
__be32 *p;
+ /* Any data means we had a useful conversation, so
+ * then we don't need to delay the next reconnect.
+ */
+ if (xprt->reestablish_timeout)
+ xprt->reestablish_timeout = 0;
+
/* Fixed transport header fields */
xdr_init_decode(&rep->rr_stream, &rep->rr_hdrbuf,
rep->rr_hdrbuf.head[0].iov_base, NULL);
@@ -1329,14 +1476,11 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
if (credits == 0)
credits = 1; /* don't deadlock */
- else if (credits > buf->rb_max_requests)
- credits = buf->rb_max_requests;
- if (buf->rb_credits != credits) {
- spin_lock(&xprt->transport_lock);
- buf->rb_credits = credits;
- xprt->cwnd = credits << RPC_CWNDSHIFT;
- spin_unlock(&xprt->transport_lock);
- }
+ else if (credits > r_xprt->rx_ep.rep_max_requests)
+ credits = r_xprt->rx_ep.rep_max_requests;
+ if (buf->rb_credits != credits)
+ rpcrdma_update_cwnd(r_xprt, credits);
+ rpcrdma_post_recvs(r_xprt, false);
req = rpcr_to_rdmar(rqst);
if (req->rl_reply) {
diff --git a/net/sunrpc/xprtrdma/svc_rdma.c b/net/sunrpc/xprtrdma/svc_rdma.c
index abdb3004a1e3..97bca509a391 100644
--- a/net/sunrpc/xprtrdma/svc_rdma.c
+++ b/net/sunrpc/xprtrdma/svc_rdma.c
@@ -73,8 +73,6 @@ atomic_t rdma_stat_rq_prod;
atomic_t rdma_stat_sq_poll;
atomic_t rdma_stat_sq_prod;
-struct workqueue_struct *svc_rdma_wq;
-
/*
* This function implements reading and resetting an atomic_t stat
* variable through read/write to a proc file. Any write to the file
@@ -230,7 +228,6 @@ static struct ctl_table svcrdma_root_table[] = {
void svc_rdma_cleanup(void)
{
dprintk("SVCRDMA Module Removed, deregister RPC RDMA transport\n");
- destroy_workqueue(svc_rdma_wq);
if (svcrdma_table_header) {
unregister_sysctl_table(svcrdma_table_header);
svcrdma_table_header = NULL;
@@ -246,10 +243,6 @@ int svc_rdma_init(void)
dprintk("\tmax_bc_requests : %u\n", svcrdma_max_bc_requests);
dprintk("\tmax_inline : %d\n", svcrdma_max_req_size);
- svc_rdma_wq = alloc_workqueue("svc_rdma", 0, 0);
- if (!svc_rdma_wq)
- return -ENOMEM;
-
if (!svcrdma_table_header)
svcrdma_table_header =
register_sysctl_table(svcrdma_root_table);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
index d1fcc41d5eb5..908e78bb87c6 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
@@ -195,6 +195,7 @@ rpcrdma_bc_send_request(struct svcxprt_rdma *rdma, struct rpc_rqst *rqst)
pr_info("%s: %*ph\n", __func__, 64, rqst->rq_buffer);
#endif
+ rqst->rq_xtime = ktime_get();
rc = svc_rdma_bc_sendto(rdma, rqst, ctxt);
if (rc) {
svc_rdma_send_ctxt_put(rdma, ctxt);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index 65e2fb9aac65..96bccd398469 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -172,9 +172,10 @@ static void svc_rdma_recv_ctxt_destroy(struct svcxprt_rdma *rdma,
void svc_rdma_recv_ctxts_destroy(struct svcxprt_rdma *rdma)
{
struct svc_rdma_recv_ctxt *ctxt;
+ struct llist_node *node;
- while ((ctxt = svc_rdma_next_recv_ctxt(&rdma->sc_recv_ctxts))) {
- list_del(&ctxt->rc_list);
+ while ((node = llist_del_first(&rdma->sc_recv_ctxts))) {
+ ctxt = llist_entry(node, struct svc_rdma_recv_ctxt, rc_node);
svc_rdma_recv_ctxt_destroy(rdma, ctxt);
}
}
@@ -183,21 +184,18 @@ static struct svc_rdma_recv_ctxt *
svc_rdma_recv_ctxt_get(struct svcxprt_rdma *rdma)
{
struct svc_rdma_recv_ctxt *ctxt;
+ struct llist_node *node;
- spin_lock(&rdma->sc_recv_lock);
- ctxt = svc_rdma_next_recv_ctxt(&rdma->sc_recv_ctxts);
- if (!ctxt)
+ node = llist_del_first(&rdma->sc_recv_ctxts);
+ if (!node)
goto out_empty;
- list_del(&ctxt->rc_list);
- spin_unlock(&rdma->sc_recv_lock);
+ ctxt = llist_entry(node, struct svc_rdma_recv_ctxt, rc_node);
out:
ctxt->rc_page_count = 0;
return ctxt;
out_empty:
- spin_unlock(&rdma->sc_recv_lock);
-
ctxt = svc_rdma_recv_ctxt_alloc(rdma);
if (!ctxt)
return NULL;
@@ -218,11 +216,9 @@ void svc_rdma_recv_ctxt_put(struct svcxprt_rdma *rdma,
for (i = 0; i < ctxt->rc_page_count; i++)
put_page(ctxt->rc_pages[i]);
- if (!ctxt->rc_temp) {
- spin_lock(&rdma->sc_recv_lock);
- list_add(&ctxt->rc_list, &rdma->sc_recv_ctxts);
- spin_unlock(&rdma->sc_recv_lock);
- } else
+ if (!ctxt->rc_temp)
+ llist_add(&ctxt->rc_node, &rdma->sc_recv_ctxts);
+ else
svc_rdma_recv_ctxt_destroy(rdma, ctxt);
}
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index 6fdba72f89f4..f3f108090aa4 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -233,11 +233,15 @@ void svc_rdma_send_ctxt_put(struct svcxprt_rdma *rdma,
/* The first SGE contains the transport header, which
* remains mapped until @ctxt is destroyed.
*/
- for (i = 1; i < ctxt->sc_send_wr.num_sge; i++)
+ for (i = 1; i < ctxt->sc_send_wr.num_sge; i++) {
ib_dma_unmap_page(device,
ctxt->sc_sges[i].addr,
ctxt->sc_sges[i].length,
DMA_TO_DEVICE);
+ trace_svcrdma_dma_unmap_page(rdma,
+ ctxt->sc_sges[i].addr,
+ ctxt->sc_sges[i].length);
+ }
for (i = 0; i < ctxt->sc_page_count; ++i)
put_page(ctxt->sc_pages[i]);
@@ -490,6 +494,7 @@ static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma,
dma_addr_t dma_addr;
dma_addr = ib_dma_map_page(dev, page, offset, len, DMA_TO_DEVICE);
+ trace_svcrdma_dma_map_page(rdma, dma_addr, len);
if (ib_dma_mapping_error(dev, dma_addr))
goto out_maperr;
@@ -499,7 +504,6 @@ static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma,
return 0;
out_maperr:
- trace_svcrdma_dma_map_page(rdma, page);
return -EIO;
}
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index 3fe665152d95..145a3615c319 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -140,14 +140,13 @@ static struct svcxprt_rdma *svc_rdma_create_xprt(struct svc_serv *serv,
INIT_LIST_HEAD(&cma_xprt->sc_rq_dto_q);
INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q);
INIT_LIST_HEAD(&cma_xprt->sc_send_ctxts);
- INIT_LIST_HEAD(&cma_xprt->sc_recv_ctxts);
+ init_llist_head(&cma_xprt->sc_recv_ctxts);
INIT_LIST_HEAD(&cma_xprt->sc_rw_ctxts);
init_waitqueue_head(&cma_xprt->sc_send_wait);
spin_lock_init(&cma_xprt->sc_lock);
spin_lock_init(&cma_xprt->sc_rq_dto_lock);
spin_lock_init(&cma_xprt->sc_send_lock);
- spin_lock_init(&cma_xprt->sc_recv_lock);
spin_lock_init(&cma_xprt->sc_rw_ctxt_lock);
/*
@@ -454,14 +453,14 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
dprintk("svcrdma: error creating PD for connect request\n");
goto errout;
}
- newxprt->sc_sq_cq = ib_alloc_cq(dev, newxprt, newxprt->sc_sq_depth,
- 0, IB_POLL_WORKQUEUE);
+ newxprt->sc_sq_cq = ib_alloc_cq_any(dev, newxprt, newxprt->sc_sq_depth,
+ IB_POLL_WORKQUEUE);
if (IS_ERR(newxprt->sc_sq_cq)) {
dprintk("svcrdma: error creating SQ CQ for connect request\n");
goto errout;
}
- newxprt->sc_rq_cq = ib_alloc_cq(dev, newxprt, rq_depth,
- 0, IB_POLL_WORKQUEUE);
+ newxprt->sc_rq_cq =
+ ib_alloc_cq_any(dev, newxprt, rq_depth, IB_POLL_WORKQUEUE);
if (IS_ERR(newxprt->sc_rq_cq)) {
dprintk("svcrdma: error creating RQ CQ for connect request\n");
goto errout;
@@ -630,8 +629,9 @@ static void svc_rdma_free(struct svc_xprt *xprt)
{
struct svcxprt_rdma *rdma =
container_of(xprt, struct svcxprt_rdma, sc_xprt);
+
INIT_WORK(&rdma->sc_work, __svc_rdma_free);
- queue_work(svc_rdma_wq, &rdma->sc_work);
+ schedule_work(&rdma->sc_work);
}
static int svc_rdma_has_wspace(struct svc_xprt *xprt)
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 2ec349ed4770..3cfeba68ee9a 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -243,16 +243,13 @@ xprt_rdma_connect_worker(struct work_struct *work)
rc = rpcrdma_ep_connect(&r_xprt->rx_ep, &r_xprt->rx_ia);
xprt_clear_connecting(xprt);
if (r_xprt->rx_ep.rep_connected > 0) {
- if (!xprt_test_and_set_connected(xprt)) {
- xprt->stat.connect_count++;
- xprt->stat.connect_time += (long)jiffies -
- xprt->stat.connect_start;
- xprt_wake_pending_tasks(xprt, -EAGAIN);
- }
- } else {
- if (xprt_test_and_clear_connected(xprt))
- xprt_wake_pending_tasks(xprt, rc);
+ xprt->stat.connect_count++;
+ xprt->stat.connect_time += (long)jiffies -
+ xprt->stat.connect_start;
+ xprt_set_connected(xprt);
+ rc = -EAGAIN;
}
+ xprt_wake_pending_tasks(xprt, rc);
}
/**
@@ -319,7 +316,8 @@ xprt_setup_rdma(struct xprt_create *args)
if (args->addrlen > sizeof(xprt->addr))
return ERR_PTR(-EBADF);
- xprt = xprt_alloc(args->net, sizeof(struct rpcrdma_xprt), 0, 0);
+ xprt = xprt_alloc(args->net, sizeof(struct rpcrdma_xprt), 0,
+ xprt_rdma_slot_table_entries);
if (!xprt)
return ERR_PTR(-ENOMEM);
@@ -361,19 +359,13 @@ xprt_setup_rdma(struct xprt_create *args)
if (rc)
goto out3;
- INIT_DELAYED_WORK(&new_xprt->rx_connect_worker,
- xprt_rdma_connect_worker);
-
- xprt->max_payload = frwr_maxpages(new_xprt);
- if (xprt->max_payload == 0)
- goto out4;
- xprt->max_payload <<= PAGE_SHIFT;
- dprintk("RPC: %s: transport data payload maximum: %zu bytes\n",
- __func__, xprt->max_payload);
-
if (!try_module_get(THIS_MODULE))
goto out4;
+ INIT_DELAYED_WORK(&new_xprt->rx_connect_worker,
+ xprt_rdma_connect_worker);
+ xprt->max_payload = RPCRDMA_MAX_DATA_SEGS << PAGE_SHIFT;
+
dprintk("RPC: %s: %s:%s\n", __func__,
xprt->address_strings[RPC_DISPLAY_ADDR],
xprt->address_strings[RPC_DISPLAY_PORT]);
@@ -423,17 +415,10 @@ void xprt_rdma_close(struct rpc_xprt *xprt)
if (ep->rep_connected == -ENODEV)
return;
- if (ep->rep_connected > 0)
- xprt->reestablish_timeout = 0;
rpcrdma_ep_disconnect(ep, ia);
- /* Prepare @xprt for the next connection by reinitializing
- * its credit grant to one (see RFC 8166, Section 3.3.3).
- */
- r_xprt->rx_buf.rb_credits = 1;
- xprt->cwnd = RPC_CWNDSHIFT;
-
out:
+ xprt->reestablish_timeout = 0;
++xprt->connect_cookie;
xprt_disconnect_done(xprt);
}
@@ -451,12 +436,6 @@ xprt_rdma_set_port(struct rpc_xprt *xprt, u16 port)
struct sockaddr *sap = (struct sockaddr *)&xprt->addr;
char buf[8];
- dprintk("RPC: %s: setting port for xprt %p (%s:%s) to %u\n",
- __func__, xprt,
- xprt->address_strings[RPC_DISPLAY_ADDR],
- xprt->address_strings[RPC_DISPLAY_PORT],
- port);
-
rpc_set_port(sap, port);
kfree(xprt->address_strings[RPC_DISPLAY_PORT]);
@@ -466,6 +445,9 @@ xprt_rdma_set_port(struct rpc_xprt *xprt, u16 port)
kfree(xprt->address_strings[RPC_DISPLAY_HEX_PORT]);
snprintf(buf, sizeof(buf), "%4hx", port);
xprt->address_strings[RPC_DISPLAY_HEX_PORT] = kstrdup(buf, GFP_KERNEL);
+
+ trace_xprtrdma_op_setport(container_of(xprt, struct rpcrdma_xprt,
+ rx_xprt));
}
/**
@@ -494,9 +476,9 @@ xprt_rdma_timer(struct rpc_xprt *xprt, struct rpc_task *task)
* @reconnect_timeout: reconnect timeout after server disconnects
*
*/
-static void xprt_rdma_tcp_set_connect_timeout(struct rpc_xprt *xprt,
- unsigned long connect_timeout,
- unsigned long reconnect_timeout)
+static void xprt_rdma_set_connect_timeout(struct rpc_xprt *xprt,
+ unsigned long connect_timeout,
+ unsigned long reconnect_timeout)
{
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
@@ -537,13 +519,12 @@ xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task)
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
unsigned long delay;
- trace_xprtrdma_op_connect(r_xprt);
-
delay = 0;
if (r_xprt->rx_ep.rep_connected != 0) {
delay = xprt_reconnect_delay(xprt);
xprt_reconnect_backoff(xprt, RPCRDMA_INIT_REEST_TO);
}
+ trace_xprtrdma_op_connect(r_xprt, delay);
queue_delayed_work(xprtiod_workqueue, &r_xprt->rx_connect_worker,
delay);
}
@@ -571,6 +552,7 @@ xprt_rdma_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task)
return;
out_sleep:
+ set_bit(XPRT_CONGESTED, &xprt->state);
rpc_sleep_on(&xprt->backlog, task, NULL);
task->tk_status = -EAGAIN;
}
@@ -589,7 +571,8 @@ xprt_rdma_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *rqst)
memset(rqst, 0, sizeof(*rqst));
rpcrdma_buffer_put(&r_xprt->rx_buf, rpcr_to_rdmar(rqst));
- rpc_wake_up_next(&xprt->backlog);
+ if (unlikely(!rpc_wake_up_next(&xprt->backlog)))
+ clear_bit(XPRT_CONGESTED, &xprt->state);
}
static bool rpcrdma_check_regbuf(struct rpcrdma_xprt *r_xprt,
@@ -803,7 +786,7 @@ static const struct rpc_xprt_ops xprt_rdma_procs = {
.send_request = xprt_rdma_send_request,
.close = xprt_rdma_close,
.destroy = xprt_rdma_destroy,
- .set_connect_timeout = xprt_rdma_tcp_set_connect_timeout,
+ .set_connect_timeout = xprt_rdma_set_connect_timeout,
.print_stats = xprt_rdma_print_stats,
.enable_swap = xprt_rdma_enable_swap,
.disable_swap = xprt_rdma_disable_swap,
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 805b1f35e1ca..353f61ac8d51 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -53,6 +53,7 @@
#include <linux/slab.h>
#include <linux/sunrpc/addr.h>
#include <linux/sunrpc/svc_rdma.h>
+#include <linux/log2.h>
#include <asm-generic/barrier.h>
#include <asm/bitops.h>
@@ -73,15 +74,21 @@
/*
* internal functions
*/
-static void rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc);
+static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt);
+static void rpcrdma_sendctxs_destroy(struct rpcrdma_xprt *r_xprt);
+static void rpcrdma_sendctx_put_locked(struct rpcrdma_xprt *r_xprt,
+ struct rpcrdma_sendctx *sc);
+static int rpcrdma_reqs_setup(struct rpcrdma_xprt *r_xprt);
+static void rpcrdma_reqs_reset(struct rpcrdma_xprt *r_xprt);
+static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep);
+static void rpcrdma_reps_unmap(struct rpcrdma_xprt *r_xprt);
static void rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt);
-static void rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf);
+static void rpcrdma_mrs_destroy(struct rpcrdma_xprt *r_xprt);
static struct rpcrdma_regbuf *
rpcrdma_regbuf_alloc(size_t size, enum dma_data_direction direction,
gfp_t flags);
static void rpcrdma_regbuf_dma_unmap(struct rpcrdma_regbuf *rb);
static void rpcrdma_regbuf_free(struct rpcrdma_regbuf *rb);
-static void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp);
/* Wait for outstanding transport work to finish. ib_drain_qp
* handles the drains in the wrong order for us, so open code
@@ -122,7 +129,7 @@ rpcrdma_qp_event_handler(struct ib_event *event, void *context)
/**
* rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC
- * @cq: completion queue (ignored)
+ * @cq: completion queue
* @wc: completed WR
*
*/
@@ -135,7 +142,7 @@ rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
/* WARNING: Only wr_cqe and status are reliable at this point */
trace_xprtrdma_wc_send(sc, wc);
- rpcrdma_sendctx_put_locked(sc);
+ rpcrdma_sendctx_put_locked((struct rpcrdma_xprt *)cq->cq_context, sc);
}
/**
@@ -167,19 +174,18 @@ rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
rdmab_addr(rep->rr_rdmabuf),
wc->byte_len, DMA_FROM_DEVICE);
- rpcrdma_post_recvs(r_xprt, false);
rpcrdma_reply_handler(rep);
return;
out_flushed:
- rpcrdma_recv_buffer_put(rep);
+ rpcrdma_rep_destroy(rep);
}
-static void
-rpcrdma_update_connect_private(struct rpcrdma_xprt *r_xprt,
- struct rdma_conn_param *param)
+static void rpcrdma_update_cm_private(struct rpcrdma_xprt *r_xprt,
+ struct rdma_conn_param *param)
{
const struct rpcrdma_connect_private *pmsg = param->private_data;
+ struct rpcrdma_ep *ep = &r_xprt->rx_ep;
unsigned int rsize, wsize;
/* Default settings for RPC-over-RDMA Version One */
@@ -195,13 +201,11 @@ rpcrdma_update_connect_private(struct rpcrdma_xprt *r_xprt,
wsize = rpcrdma_decode_buffer_size(pmsg->cp_recv_size);
}
- if (rsize < r_xprt->rx_ep.rep_inline_recv)
- r_xprt->rx_ep.rep_inline_recv = rsize;
- if (wsize < r_xprt->rx_ep.rep_inline_send)
- r_xprt->rx_ep.rep_inline_send = wsize;
- dprintk("RPC: %s: max send %u, max recv %u\n", __func__,
- r_xprt->rx_ep.rep_inline_send,
- r_xprt->rx_ep.rep_inline_recv);
+ if (rsize < ep->rep_inline_recv)
+ ep->rep_inline_recv = rsize;
+ if (wsize < ep->rep_inline_send)
+ ep->rep_inline_send = wsize;
+
rpcrdma_set_max_header_sizes(r_xprt);
}
@@ -244,6 +248,7 @@ rpcrdma_cm_event_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
ia->ri_id->device->name,
rpcrdma_addrstr(r_xprt), rpcrdma_portstr(r_xprt));
#endif
+ init_completion(&ia->ri_remove_done);
set_bit(RPCRDMA_IAF_REMOVING, &ia->ri_flags);
ep->rep_connected = -ENODEV;
xprt_force_disconnect(xprt);
@@ -255,7 +260,8 @@ rpcrdma_cm_event_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
case RDMA_CM_EVENT_ESTABLISHED:
++xprt->connect_cookie;
ep->rep_connected = 1;
- rpcrdma_update_connect_private(r_xprt, &event->param.conn);
+ rpcrdma_update_cm_private(r_xprt, &event->param.conn);
+ trace_xprtrdma_inline_thresh(r_xprt);
wake_up_all(&ep->rep_connect_wait);
break;
case RDMA_CM_EVENT_CONNECT_ERROR:
@@ -295,10 +301,7 @@ rpcrdma_create_id(struct rpcrdma_xprt *xprt, struct rpcrdma_ia *ia)
struct rdma_cm_id *id;
int rc;
- trace_xprtrdma_conn_start(xprt);
-
init_completion(&ia->ri_done);
- init_completion(&ia->ri_remove_done);
id = rdma_create_id(xprt->rx_xprt.xprt_net, rpcrdma_cm_event_handler,
xprt, RDMA_PS_TCP, IB_QPT_RC);
@@ -312,10 +315,8 @@ rpcrdma_create_id(struct rpcrdma_xprt *xprt, struct rpcrdma_ia *ia)
if (rc)
goto out;
rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout);
- if (rc < 0) {
- trace_xprtrdma_conn_tout(xprt);
+ if (rc < 0)
goto out;
- }
rc = ia->ri_async_rc;
if (rc)
@@ -326,10 +327,8 @@ rpcrdma_create_id(struct rpcrdma_xprt *xprt, struct rpcrdma_ia *ia)
if (rc)
goto out;
rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout);
- if (rc < 0) {
- trace_xprtrdma_conn_tout(xprt);
+ if (rc < 0)
goto out;
- }
rc = ia->ri_async_rc;
if (rc)
goto out;
@@ -371,18 +370,6 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt)
goto out_err;
}
- switch (xprt_rdma_memreg_strategy) {
- case RPCRDMA_FRWR:
- if (frwr_is_supported(ia->ri_id->device))
- break;
- /*FALLTHROUGH*/
- default:
- pr_err("rpcrdma: Device %s does not support memreg mode %d\n",
- ia->ri_id->device->name, xprt_rdma_memreg_strategy);
- rc = -EINVAL;
- goto out_err;
- }
-
return 0;
out_err:
@@ -396,6 +383,8 @@ out_err:
*
* Divest transport H/W resources associated with this adapter,
* but allow it to be restored later.
+ *
+ * Caller must hold the transport send lock.
*/
void
rpcrdma_ia_remove(struct rpcrdma_ia *ia)
@@ -403,11 +392,6 @@ rpcrdma_ia_remove(struct rpcrdma_ia *ia)
struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt,
rx_ia);
struct rpcrdma_ep *ep = &r_xprt->rx_ep;
- struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
- struct rpcrdma_req *req;
- struct rpcrdma_rep *rep;
-
- cancel_delayed_work_sync(&buf->rb_refresh_worker);
/* This is similar to rpcrdma_ep_destroy, but:
* - Don't cancel the connect worker.
@@ -429,14 +413,10 @@ rpcrdma_ia_remove(struct rpcrdma_ia *ia)
/* The ULP is responsible for ensuring all DMA
* mappings and MRs are gone.
*/
- list_for_each_entry(rep, &buf->rb_recv_bufs, rr_list)
- rpcrdma_regbuf_dma_unmap(rep->rr_rdmabuf);
- list_for_each_entry(req, &buf->rb_allreqs, rl_all) {
- rpcrdma_regbuf_dma_unmap(req->rl_rdmabuf);
- rpcrdma_regbuf_dma_unmap(req->rl_sendbuf);
- rpcrdma_regbuf_dma_unmap(req->rl_recvbuf);
- }
- rpcrdma_mrs_destroy(buf);
+ rpcrdma_reps_unmap(r_xprt);
+ rpcrdma_reqs_reset(r_xprt);
+ rpcrdma_mrs_destroy(r_xprt);
+ rpcrdma_sendctxs_destroy(r_xprt);
ib_dealloc_pd(ia->ri_pd);
ia->ri_pd = NULL;
@@ -479,30 +459,20 @@ int rpcrdma_ep_create(struct rpcrdma_xprt *r_xprt)
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
struct rpcrdma_connect_private *pmsg = &ep->rep_cm_private;
struct ib_cq *sendcq, *recvcq;
- unsigned int max_sge;
int rc;
- ep->rep_max_requests = xprt_rdma_slot_table_entries;
+ ep->rep_max_requests = r_xprt->rx_xprt.max_reqs;
ep->rep_inline_send = xprt_rdma_max_inline_write;
ep->rep_inline_recv = xprt_rdma_max_inline_read;
- max_sge = min_t(unsigned int, ia->ri_id->device->attrs.max_send_sge,
- RPCRDMA_MAX_SEND_SGES);
- if (max_sge < RPCRDMA_MIN_SEND_SGES) {
- pr_warn("rpcrdma: HCA provides only %d send SGEs\n", max_sge);
- return -ENOMEM;
- }
- ia->ri_max_send_sges = max_sge;
-
- rc = frwr_open(ia, ep);
+ rc = frwr_query_device(r_xprt, ia->ri_id->device);
if (rc)
return rc;
+ r_xprt->rx_buf.rb_max_requests = cpu_to_be32(ep->rep_max_requests);
ep->rep_attr.event_handler = rpcrdma_qp_event_handler;
ep->rep_attr.qp_context = ep;
ep->rep_attr.srq = NULL;
- ep->rep_attr.cap.max_send_sge = max_sge;
- ep->rep_attr.cap.max_recv_sge = 1;
ep->rep_attr.cap.max_inline_data = 0;
ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
ep->rep_attr.qp_type = IB_QPT_RC;
@@ -521,18 +491,17 @@ int rpcrdma_ep_create(struct rpcrdma_xprt *r_xprt)
init_waitqueue_head(&ep->rep_connect_wait);
ep->rep_receive_count = 0;
- sendcq = ib_alloc_cq(ia->ri_id->device, NULL,
- ep->rep_attr.cap.max_send_wr + 1,
- ia->ri_id->device->num_comp_vectors > 1 ? 1 : 0,
- IB_POLL_WORKQUEUE);
+ sendcq = ib_alloc_cq_any(ia->ri_id->device, r_xprt,
+ ep->rep_attr.cap.max_send_wr + 1,
+ IB_POLL_WORKQUEUE);
if (IS_ERR(sendcq)) {
rc = PTR_ERR(sendcq);
goto out1;
}
- recvcq = ib_alloc_cq(ia->ri_id->device, NULL,
- ep->rep_attr.cap.max_recv_wr + 1,
- 0, IB_POLL_WORKQUEUE);
+ recvcq = ib_alloc_cq_any(ia->ri_id->device, NULL,
+ ep->rep_attr.cap.max_recv_wr + 1,
+ IB_POLL_WORKQUEUE);
if (IS_ERR(recvcq)) {
rc = PTR_ERR(recvcq);
goto out2;
@@ -605,10 +574,11 @@ void rpcrdma_ep_destroy(struct rpcrdma_xprt *r_xprt)
* Unlike a normal reconnection, a fresh PD and a new set
* of MRs and buffers is needed.
*/
-static int
-rpcrdma_ep_recreate_xprt(struct rpcrdma_xprt *r_xprt,
- struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
+static int rpcrdma_ep_recreate_xprt(struct rpcrdma_xprt *r_xprt,
+ struct ib_qp_init_attr *qp_init_attr)
{
+ struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+ struct rpcrdma_ep *ep = &r_xprt->rx_ep;
int rc, err;
trace_xprtrdma_reinsert(r_xprt);
@@ -623,15 +593,14 @@ rpcrdma_ep_recreate_xprt(struct rpcrdma_xprt *r_xprt,
pr_err("rpcrdma: rpcrdma_ep_create returned %d\n", err);
goto out2;
}
+ memcpy(qp_init_attr, &ep->rep_attr, sizeof(*qp_init_attr));
rc = -ENETUNREACH;
- err = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
+ err = rdma_create_qp(ia->ri_id, ia->ri_pd, qp_init_attr);
if (err) {
pr_err("rpcrdma: rdma_create_qp returned %d\n", err);
goto out3;
}
-
- rpcrdma_mrs_create(r_xprt);
return 0;
out3:
@@ -642,16 +611,14 @@ out1:
return rc;
}
-static int
-rpcrdma_ep_reconnect(struct rpcrdma_xprt *r_xprt, struct rpcrdma_ep *ep,
- struct rpcrdma_ia *ia)
+static int rpcrdma_ep_reconnect(struct rpcrdma_xprt *r_xprt,
+ struct ib_qp_init_attr *qp_init_attr)
{
+ struct rpcrdma_ia *ia = &r_xprt->rx_ia;
struct rdma_cm_id *id, *old;
int err, rc;
- trace_xprtrdma_reconnect(r_xprt);
-
- rpcrdma_ep_disconnect(ep, ia);
+ rpcrdma_ep_disconnect(&r_xprt->rx_ep, ia);
rc = -EHOSTUNREACH;
id = rpcrdma_create_id(r_xprt, ia);
@@ -673,7 +640,7 @@ rpcrdma_ep_reconnect(struct rpcrdma_xprt *r_xprt, struct rpcrdma_ep *ep,
goto out_destroy;
}
- err = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
+ err = rdma_create_qp(id, ia->ri_pd, qp_init_attr);
if (err)
goto out_destroy;
@@ -698,25 +665,26 @@ rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt,
rx_ia);
struct rpc_xprt *xprt = &r_xprt->rx_xprt;
+ struct ib_qp_init_attr qp_init_attr;
int rc;
retry:
+ memcpy(&qp_init_attr, &ep->rep_attr, sizeof(qp_init_attr));
switch (ep->rep_connected) {
case 0:
- dprintk("RPC: %s: connecting...\n", __func__);
- rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
+ rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &qp_init_attr);
if (rc) {
rc = -ENETUNREACH;
goto out_noupdate;
}
break;
case -ENODEV:
- rc = rpcrdma_ep_recreate_xprt(r_xprt, ep, ia);
+ rc = rpcrdma_ep_recreate_xprt(r_xprt, &qp_init_attr);
if (rc)
goto out_noupdate;
break;
default:
- rc = rpcrdma_ep_reconnect(r_xprt, ep, ia);
+ rc = rpcrdma_ep_reconnect(r_xprt, &qp_init_attr);
if (rc)
goto out;
}
@@ -724,12 +692,19 @@ retry:
ep->rep_connected = 0;
xprt_clear_connected(xprt);
+ rpcrdma_reset_cwnd(r_xprt);
rpcrdma_post_recvs(r_xprt, true);
+ rc = rpcrdma_sendctxs_create(r_xprt);
+ if (rc)
+ goto out;
+
rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
if (rc)
goto out;
+ if (xprt->reestablish_timeout < RPCRDMA_INIT_REEST_TO)
+ xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
if (ep->rep_connected <= 0) {
if (ep->rep_connected == -EAGAIN)
@@ -738,13 +713,19 @@ retry:
goto out;
}
- dprintk("RPC: %s: connected\n", __func__);
+ rc = rpcrdma_reqs_setup(r_xprt);
+ if (rc) {
+ rpcrdma_ep_disconnect(ep, ia);
+ goto out;
+ }
+ rpcrdma_mrs_create(r_xprt);
out:
if (rc)
ep->rep_connected = rc;
out_noupdate:
+ trace_xprtrdma_connect(r_xprt, rc);
return rc;
}
@@ -753,11 +734,8 @@ out_noupdate:
* @ep: endpoint to disconnect
* @ia: associated interface adapter
*
- * This is separate from destroy to facilitate the ability
- * to reconnect without recreating the endpoint.
- *
- * This call is not reentrant, and must not be made in parallel
- * on the same endpoint.
+ * Caller serializes. Either the transport send lock is held,
+ * or we're being called to destroy the transport.
*/
void
rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
@@ -776,6 +754,9 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
trace_xprtrdma_disconnect(r_xprt, rc);
rpcrdma_xprt_drain(r_xprt);
+ rpcrdma_reqs_reset(r_xprt);
+ rpcrdma_mrs_destroy(r_xprt);
+ rpcrdma_sendctxs_destroy(r_xprt);
}
/* Fixed-size circular FIFO queue. This implementation is wait-free and
@@ -795,27 +776,28 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
* queue activity, and rpcrdma_xprt_drain has flushed all remaining
* Send requests.
*/
-static void rpcrdma_sendctxs_destroy(struct rpcrdma_buffer *buf)
+static void rpcrdma_sendctxs_destroy(struct rpcrdma_xprt *r_xprt)
{
+ struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
unsigned long i;
+ if (!buf->rb_sc_ctxs)
+ return;
for (i = 0; i <= buf->rb_sc_last; i++)
kfree(buf->rb_sc_ctxs[i]);
kfree(buf->rb_sc_ctxs);
+ buf->rb_sc_ctxs = NULL;
}
-static struct rpcrdma_sendctx *rpcrdma_sendctx_create(struct rpcrdma_ia *ia)
+static struct rpcrdma_sendctx *rpcrdma_sendctx_create(struct rpcrdma_ep *ep)
{
struct rpcrdma_sendctx *sc;
- sc = kzalloc(struct_size(sc, sc_sges, ia->ri_max_send_sges),
+ sc = kzalloc(struct_size(sc, sc_sges, ep->rep_attr.cap.max_send_sge),
GFP_KERNEL);
if (!sc)
return NULL;
- sc->sc_wr.wr_cqe = &sc->sc_cqe;
- sc->sc_wr.sg_list = sc->sc_sges;
- sc->sc_wr.opcode = IB_WR_SEND;
sc->sc_cqe.done = rpcrdma_wc_send;
return sc;
}
@@ -831,22 +813,22 @@ static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt)
* the ->send_request call to fail temporarily before too many
* Sends are posted.
*/
- i = buf->rb_max_requests + RPCRDMA_MAX_BC_REQUESTS;
- dprintk("RPC: %s: allocating %lu send_ctxs\n", __func__, i);
+ i = r_xprt->rx_ep.rep_max_requests + RPCRDMA_MAX_BC_REQUESTS;
buf->rb_sc_ctxs = kcalloc(i, sizeof(sc), GFP_KERNEL);
if (!buf->rb_sc_ctxs)
return -ENOMEM;
buf->rb_sc_last = i - 1;
for (i = 0; i <= buf->rb_sc_last; i++) {
- sc = rpcrdma_sendctx_create(&r_xprt->rx_ia);
+ sc = rpcrdma_sendctx_create(&r_xprt->rx_ep);
if (!sc)
return -ENOMEM;
- sc->sc_xprt = r_xprt;
buf->rb_sc_ctxs[i] = sc;
}
+ buf->rb_sc_head = 0;
+ buf->rb_sc_tail = 0;
return 0;
}
@@ -906,6 +888,7 @@ out_emptyq:
/**
* rpcrdma_sendctx_put_locked - Release a send context
+ * @r_xprt: controlling transport instance
* @sc: send context to release
*
* Usage: Called from Send completion to return a sendctxt
@@ -913,10 +896,10 @@ out_emptyq:
*
* The caller serializes calls to this function (per transport).
*/
-static void
-rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc)
+static void rpcrdma_sendctx_put_locked(struct rpcrdma_xprt *r_xprt,
+ struct rpcrdma_sendctx *sc)
{
- struct rpcrdma_buffer *buf = &sc->sc_xprt->rx_buf;
+ struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
unsigned long next_tail;
/* Unmap SGEs of previously completed but unsignaled
@@ -934,7 +917,7 @@ rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc)
/* Paired with READ_ONCE */
smp_store_release(&buf->rb_sc_tail, next_tail);
- xprt_write_space(&sc->sc_xprt->rx_xprt);
+ xprt_write_space(&r_xprt->rx_xprt);
}
static void
@@ -943,14 +926,12 @@ rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt)
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
unsigned int count;
- LIST_HEAD(free);
- LIST_HEAD(all);
- for (count = 0; count < ia->ri_max_segs; count++) {
+ for (count = 0; count < ia->ri_max_rdma_segs; count++) {
struct rpcrdma_mr *mr;
int rc;
- mr = kzalloc(sizeof(*mr), GFP_KERNEL);
+ mr = kzalloc(sizeof(*mr), GFP_NOFS);
if (!mr)
break;
@@ -962,15 +943,13 @@ rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt)
mr->mr_xprt = r_xprt;
- list_add(&mr->mr_list, &free);
- list_add(&mr->mr_all, &all);
+ spin_lock(&buf->rb_lock);
+ rpcrdma_mr_push(mr, &buf->rb_mrs);
+ list_add(&mr->mr_all, &buf->rb_all_mrs);
+ spin_unlock(&buf->rb_lock);
}
- spin_lock(&buf->rb_mrlock);
- list_splice(&free, &buf->rb_mrs);
- list_splice(&all, &buf->rb_all);
r_xprt->rx_stats.mrs_allocated += count;
- spin_unlock(&buf->rb_mrlock);
trace_xprtrdma_createmrs(r_xprt, count);
}
@@ -978,7 +957,7 @@ static void
rpcrdma_mr_refresh_worker(struct work_struct *work)
{
struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
- rb_refresh_worker.work);
+ rb_refresh_worker);
struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
rx_buf);
@@ -987,6 +966,28 @@ rpcrdma_mr_refresh_worker(struct work_struct *work)
}
/**
+ * rpcrdma_mrs_refresh - Wake the MR refresh worker
+ * @r_xprt: controlling transport instance
+ *
+ */
+void rpcrdma_mrs_refresh(struct rpcrdma_xprt *r_xprt)
+{
+ struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+ struct rpcrdma_ep *ep = &r_xprt->rx_ep;
+
+ /* If there is no underlying device, it's no use to
+ * wake the refresh worker.
+ */
+ if (ep->rep_connected != -ENODEV) {
+ /* The work is scheduled on a WQ_MEM_RECLAIM
+ * workqueue in order to prevent MR allocation
+ * from recursing into NFS during direct reclaim.
+ */
+ queue_work(xprtiod_workqueue, &buf->rb_refresh_worker);
+ }
+}
+
+/**
* rpcrdma_req_create - Allocate an rpcrdma_req object
* @r_xprt: controlling r_xprt
* @size: initial size, in bytes, of send and receive buffers
@@ -998,45 +999,120 @@ struct rpcrdma_req *rpcrdma_req_create(struct rpcrdma_xprt *r_xprt, size_t size,
gfp_t flags)
{
struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
- struct rpcrdma_regbuf *rb;
struct rpcrdma_req *req;
req = kzalloc(sizeof(*req), flags);
if (req == NULL)
goto out1;
- rb = rpcrdma_regbuf_alloc(RPCRDMA_HDRBUF_SIZE, DMA_TO_DEVICE, flags);
- if (!rb)
- goto out2;
- req->rl_rdmabuf = rb;
- xdr_buf_init(&req->rl_hdrbuf, rdmab_data(rb), rdmab_length(rb));
-
req->rl_sendbuf = rpcrdma_regbuf_alloc(size, DMA_TO_DEVICE, flags);
if (!req->rl_sendbuf)
- goto out3;
+ goto out2;
req->rl_recvbuf = rpcrdma_regbuf_alloc(size, DMA_NONE, flags);
if (!req->rl_recvbuf)
- goto out4;
+ goto out3;
+ INIT_LIST_HEAD(&req->rl_free_mrs);
INIT_LIST_HEAD(&req->rl_registered);
spin_lock(&buffer->rb_lock);
list_add(&req->rl_all, &buffer->rb_allreqs);
spin_unlock(&buffer->rb_lock);
return req;
-out4:
- kfree(req->rl_sendbuf);
out3:
- kfree(req->rl_rdmabuf);
+ kfree(req->rl_sendbuf);
out2:
kfree(req);
out1:
return NULL;
}
-static struct rpcrdma_rep *rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt,
- bool temp)
+/**
+ * rpcrdma_req_setup - Per-connection instance setup of an rpcrdma_req object
+ * @r_xprt: controlling transport instance
+ * @req: rpcrdma_req object to set up
+ *
+ * Returns zero on success, and a negative errno on failure.
+ */
+int rpcrdma_req_setup(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
+{
+ struct rpcrdma_regbuf *rb;
+ size_t maxhdrsize;
+
+ /* Compute maximum header buffer size in bytes */
+ maxhdrsize = rpcrdma_fixed_maxsz + 3 +
+ r_xprt->rx_ia.ri_max_rdma_segs * rpcrdma_readchunk_maxsz;
+ maxhdrsize *= sizeof(__be32);
+ rb = rpcrdma_regbuf_alloc(__roundup_pow_of_two(maxhdrsize),
+ DMA_TO_DEVICE, GFP_KERNEL);
+ if (!rb)
+ goto out;
+
+ if (!__rpcrdma_regbuf_dma_map(r_xprt, rb))
+ goto out_free;
+
+ req->rl_rdmabuf = rb;
+ xdr_buf_init(&req->rl_hdrbuf, rdmab_data(rb), rdmab_length(rb));
+ return 0;
+
+out_free:
+ rpcrdma_regbuf_free(rb);
+out:
+ return -ENOMEM;
+}
+
+/* ASSUMPTION: the rb_allreqs list is stable for the duration,
+ * and thus can be walked without holding rb_lock. E.g., the
+ * caller is holding the transport send lock to exclude
+ * device removal or disconnection.
+ */
+static int rpcrdma_reqs_setup(struct rpcrdma_xprt *r_xprt)
+{
+ struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+ struct rpcrdma_req *req;
+ int rc;
+
+ list_for_each_entry(req, &buf->rb_allreqs, rl_all) {
+ rc = rpcrdma_req_setup(r_xprt, req);
+ if (rc)
+ return rc;
+ }
+ return 0;
+}
+
+static void rpcrdma_req_reset(struct rpcrdma_req *req)
+{
+ /* Credits are valid for only one connection */
+ req->rl_slot.rq_cong = 0;
+
+ rpcrdma_regbuf_free(req->rl_rdmabuf);
+ req->rl_rdmabuf = NULL;
+
+ rpcrdma_regbuf_dma_unmap(req->rl_sendbuf);
+ rpcrdma_regbuf_dma_unmap(req->rl_recvbuf);
+}
+
+/* ASSUMPTION: the rb_allreqs list is stable for the duration,
+ * and thus can be walked without holding rb_lock. E.g., the
+ * caller is holding the transport send lock to exclude
+ * device removal or disconnection.
+ */
+static void rpcrdma_reqs_reset(struct rpcrdma_xprt *r_xprt)
+{
+ struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+ struct rpcrdma_req *req;
+
+ list_for_each_entry(req, &buf->rb_allreqs, rl_all)
+ rpcrdma_req_reset(req);
+}
+
+/* No locking needed here. This function is called only by the
+ * Receive completion handler.
+ */
+static noinline
+struct rpcrdma_rep *rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt,
+ bool temp)
{
struct rpcrdma_rep *rep;
@@ -1049,6 +1125,9 @@ static struct rpcrdma_rep *rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt,
if (!rep->rr_rdmabuf)
goto out_free;
+ if (!rpcrdma_regbuf_dma_map(r_xprt, rep->rr_rdmabuf))
+ goto out_free_regbuf;
+
xdr_buf_init(&rep->rr_hdrbuf, rdmab_data(rep->rr_rdmabuf),
rdmab_length(rep->rr_rdmabuf));
rep->rr_cqe.done = rpcrdma_wc_receive;
@@ -1058,14 +1137,63 @@ static struct rpcrdma_rep *rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt,
rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
rep->rr_recv_wr.num_sge = 1;
rep->rr_temp = temp;
+ list_add(&rep->rr_all, &r_xprt->rx_buf.rb_all_reps);
return rep;
+out_free_regbuf:
+ rpcrdma_regbuf_free(rep->rr_rdmabuf);
out_free:
kfree(rep);
out:
return NULL;
}
+/* No locking needed here. This function is invoked only by the
+ * Receive completion handler, or during transport shutdown.
+ */
+static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep)
+{
+ list_del(&rep->rr_all);
+ rpcrdma_regbuf_free(rep->rr_rdmabuf);
+ kfree(rep);
+}
+
+static struct rpcrdma_rep *rpcrdma_rep_get_locked(struct rpcrdma_buffer *buf)
+{
+ struct llist_node *node;
+
+ /* Calls to llist_del_first are required to be serialized */
+ node = llist_del_first(&buf->rb_free_reps);
+ if (!node)
+ return NULL;
+ return llist_entry(node, struct rpcrdma_rep, rr_node);
+}
+
+static void rpcrdma_rep_put(struct rpcrdma_buffer *buf,
+ struct rpcrdma_rep *rep)
+{
+ llist_add(&rep->rr_node, &buf->rb_free_reps);
+}
+
+static void rpcrdma_reps_unmap(struct rpcrdma_xprt *r_xprt)
+{
+ struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+ struct rpcrdma_rep *rep;
+
+ list_for_each_entry(rep, &buf->rb_all_reps, rr_all) {
+ rpcrdma_regbuf_dma_unmap(rep->rr_rdmabuf);
+ rep->rr_temp = true;
+ }
+}
+
+static void rpcrdma_reps_destroy(struct rpcrdma_buffer *buf)
+{
+ struct rpcrdma_rep *rep;
+
+ while ((rep = rpcrdma_rep_get_locked(buf)) != NULL)
+ rpcrdma_rep_destroy(rep);
+}
+
/**
* rpcrdma_buffer_create - Create initial set of req/rep objects
* @r_xprt: transport instance to (re)initialize
@@ -1077,37 +1205,28 @@ int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
int i, rc;
- buf->rb_max_requests = r_xprt->rx_ep.rep_max_requests;
buf->rb_bc_srv_max_requests = 0;
- spin_lock_init(&buf->rb_mrlock);
spin_lock_init(&buf->rb_lock);
INIT_LIST_HEAD(&buf->rb_mrs);
- INIT_LIST_HEAD(&buf->rb_all);
- INIT_DELAYED_WORK(&buf->rb_refresh_worker,
- rpcrdma_mr_refresh_worker);
-
- rpcrdma_mrs_create(r_xprt);
+ INIT_LIST_HEAD(&buf->rb_all_mrs);
+ INIT_WORK(&buf->rb_refresh_worker, rpcrdma_mr_refresh_worker);
INIT_LIST_HEAD(&buf->rb_send_bufs);
INIT_LIST_HEAD(&buf->rb_allreqs);
+ INIT_LIST_HEAD(&buf->rb_all_reps);
rc = -ENOMEM;
- for (i = 0; i < buf->rb_max_requests; i++) {
+ for (i = 0; i < r_xprt->rx_xprt.max_reqs; i++) {
struct rpcrdma_req *req;
- req = rpcrdma_req_create(r_xprt, RPCRDMA_V1_DEF_INLINE_SIZE,
+ req = rpcrdma_req_create(r_xprt, RPCRDMA_V1_DEF_INLINE_SIZE * 2,
GFP_KERNEL);
if (!req)
goto out;
list_add(&req->rl_list, &buf->rb_send_bufs);
}
- buf->rb_credits = 1;
- INIT_LIST_HEAD(&buf->rb_recv_bufs);
-
- rc = rpcrdma_sendctxs_create(r_xprt);
- if (rc)
- goto out;
+ init_llist_head(&buf->rb_free_reps);
return 0;
out:
@@ -1115,58 +1234,62 @@ out:
return rc;
}
-static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep)
-{
- rpcrdma_regbuf_free(rep->rr_rdmabuf);
- kfree(rep);
-}
-
/**
* rpcrdma_req_destroy - Destroy an rpcrdma_req object
* @req: unused object to be destroyed
*
- * This function assumes that the caller prevents concurrent device
- * unload and transport tear-down.
+ * Relies on caller holding the transport send lock to protect
+ * removing req->rl_all from buf->rb_allreqs safely.
*/
-void
-rpcrdma_req_destroy(struct rpcrdma_req *req)
+void rpcrdma_req_destroy(struct rpcrdma_req *req)
{
+ struct rpcrdma_mr *mr;
+
list_del(&req->rl_all);
+ while ((mr = rpcrdma_mr_pop(&req->rl_free_mrs))) {
+ struct rpcrdma_buffer *buf = &mr->mr_xprt->rx_buf;
+
+ spin_lock(&buf->rb_lock);
+ list_del(&mr->mr_all);
+ spin_unlock(&buf->rb_lock);
+
+ frwr_release_mr(mr);
+ }
+
rpcrdma_regbuf_free(req->rl_recvbuf);
rpcrdma_regbuf_free(req->rl_sendbuf);
rpcrdma_regbuf_free(req->rl_rdmabuf);
kfree(req);
}
-static void
-rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf)
+/**
+ * rpcrdma_mrs_destroy - Release all of a transport's MRs
+ * @r_xprt: controlling transport instance
+ *
+ * Relies on caller holding the transport send lock to protect
+ * removing mr->mr_list from req->rl_free_mrs safely.
+ */
+static void rpcrdma_mrs_destroy(struct rpcrdma_xprt *r_xprt)
{
- struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
- rx_buf);
+ struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
struct rpcrdma_mr *mr;
- unsigned int count;
- count = 0;
- spin_lock(&buf->rb_mrlock);
- while (!list_empty(&buf->rb_all)) {
- mr = list_entry(buf->rb_all.next, struct rpcrdma_mr, mr_all);
- list_del(&mr->mr_all);
+ cancel_work_sync(&buf->rb_refresh_worker);
- spin_unlock(&buf->rb_mrlock);
-
- /* Ensure MW is not on any rl_registered list */
- if (!list_empty(&mr->mr_list))
- list_del(&mr->mr_list);
+ spin_lock(&buf->rb_lock);
+ while ((mr = list_first_entry_or_null(&buf->rb_all_mrs,
+ struct rpcrdma_mr,
+ mr_all)) != NULL) {
+ list_del(&mr->mr_list);
+ list_del(&mr->mr_all);
+ spin_unlock(&buf->rb_lock);
frwr_release_mr(mr);
- count++;
- spin_lock(&buf->rb_mrlock);
- }
- spin_unlock(&buf->rb_mrlock);
- r_xprt->rx_stats.mrs_allocated = 0;
- dprintk("RPC: %s: released %u MRs\n", __func__, count);
+ spin_lock(&buf->rb_lock);
+ }
+ spin_unlock(&buf->rb_lock);
}
/**
@@ -1180,18 +1303,7 @@ rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf)
void
rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
- cancel_delayed_work_sync(&buf->rb_refresh_worker);
-
- rpcrdma_sendctxs_destroy(buf);
-
- while (!list_empty(&buf->rb_recv_bufs)) {
- struct rpcrdma_rep *rep;
-
- rep = list_first_entry(&buf->rb_recv_bufs,
- struct rpcrdma_rep, rr_list);
- list_del(&rep->rr_list);
- rpcrdma_rep_destroy(rep);
- }
+ rpcrdma_reps_destroy(buf);
while (!list_empty(&buf->rb_send_bufs)) {
struct rpcrdma_req *req;
@@ -1201,8 +1313,6 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
list_del(&req->rl_list);
rpcrdma_req_destroy(req);
}
-
- rpcrdma_mrs_destroy(buf);
}
/**
@@ -1216,54 +1326,20 @@ struct rpcrdma_mr *
rpcrdma_mr_get(struct rpcrdma_xprt *r_xprt)
{
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
- struct rpcrdma_mr *mr = NULL;
-
- spin_lock(&buf->rb_mrlock);
- if (!list_empty(&buf->rb_mrs))
- mr = rpcrdma_mr_pop(&buf->rb_mrs);
- spin_unlock(&buf->rb_mrlock);
+ struct rpcrdma_mr *mr;
- if (!mr)
- goto out_nomrs;
+ spin_lock(&buf->rb_lock);
+ mr = rpcrdma_mr_pop(&buf->rb_mrs);
+ spin_unlock(&buf->rb_lock);
return mr;
-
-out_nomrs:
- trace_xprtrdma_nomrs(r_xprt);
- if (r_xprt->rx_ep.rep_connected != -ENODEV)
- schedule_delayed_work(&buf->rb_refresh_worker, 0);
-
- /* Allow the reply handler and refresh worker to run */
- cond_resched();
-
- return NULL;
-}
-
-static void
-__rpcrdma_mr_put(struct rpcrdma_buffer *buf, struct rpcrdma_mr *mr)
-{
- spin_lock(&buf->rb_mrlock);
- rpcrdma_mr_push(mr, &buf->rb_mrs);
- spin_unlock(&buf->rb_mrlock);
}
/**
- * rpcrdma_mr_put - Release an rpcrdma_mr object
- * @mr: object to release
+ * rpcrdma_mr_put - DMA unmap an MR and release it
+ * @mr: MR to release
*
*/
-void
-rpcrdma_mr_put(struct rpcrdma_mr *mr)
-{
- __rpcrdma_mr_put(&mr->mr_xprt->rx_buf, mr);
-}
-
-/**
- * rpcrdma_mr_unmap_and_put - DMA unmap an MR and release it
- * @mr: object to release
- *
- */
-void
-rpcrdma_mr_unmap_and_put(struct rpcrdma_mr *mr)
+void rpcrdma_mr_put(struct rpcrdma_mr *mr)
{
struct rpcrdma_xprt *r_xprt = mr->mr_xprt;
@@ -1273,7 +1349,8 @@ rpcrdma_mr_unmap_and_put(struct rpcrdma_mr *mr)
mr->mr_sg, mr->mr_nents, mr->mr_dir);
mr->mr_dir = DMA_NONE;
}
- __rpcrdma_mr_put(&r_xprt->rx_buf, mr);
+
+ rpcrdma_mr_push(mr, &mr->mr_req->rl_free_mrs);
}
/**
@@ -1304,39 +1381,24 @@ rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
*/
void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
{
- struct rpcrdma_rep *rep = req->rl_reply;
-
+ if (req->rl_reply)
+ rpcrdma_rep_put(buffers, req->rl_reply);
req->rl_reply = NULL;
spin_lock(&buffers->rb_lock);
list_add(&req->rl_list, &buffers->rb_send_bufs);
- if (rep) {
- if (!rep->rr_temp) {
- list_add(&rep->rr_list, &buffers->rb_recv_bufs);
- rep = NULL;
- }
- }
spin_unlock(&buffers->rb_lock);
- if (rep)
- rpcrdma_rep_destroy(rep);
}
-/*
- * Put reply buffers back into pool when not attached to
- * request. This happens in error conditions.
+/**
+ * rpcrdma_recv_buffer_put - Release rpcrdma_rep back to free list
+ * @rep: rep to release
+ *
+ * Used after error conditions.
*/
-void
-rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
+void rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
{
- struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;
-
- if (!rep->rr_temp) {
- spin_lock(&buffers->rb_lock);
- list_add(&rep->rr_list, &buffers->rb_recv_bufs);
- spin_unlock(&buffers->rb_lock);
- } else {
- rpcrdma_rep_destroy(rep);
- }
+ rpcrdma_rep_put(&rep->rr_rxprt->rx_buf, rep);
}
/* Returns a pointer to a rpcrdma_regbuf object, or NULL.
@@ -1453,7 +1515,7 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
struct rpcrdma_ep *ep,
struct rpcrdma_req *req)
{
- struct ib_send_wr *send_wr = &req->rl_sendctx->sc_wr;
+ struct ib_send_wr *send_wr = &req->rl_wr;
int rc;
if (!ep->rep_send_count || kref_read(&req->rl_kref) > 1) {
@@ -1471,12 +1533,17 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
return 0;
}
-static void
-rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
+/**
+ * rpcrdma_post_recvs - Refill the Receive Queue
+ * @r_xprt: controlling transport instance
+ * @temp: mark Receive buffers to be deleted after use
+ *
+ */
+void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
{
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
struct rpcrdma_ep *ep = &r_xprt->rx_ep;
- struct ib_recv_wr *i, *wr, *bad_wr;
+ struct ib_recv_wr *wr, *bad_wr;
struct rpcrdma_rep *rep;
int needed, count, rc;
@@ -1484,7 +1551,7 @@ rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
count = 0;
needed = buf->rb_credits + (buf->rb_bc_srv_max_requests << 1);
- if (ep->rep_receive_count > needed)
+ if (likely(ep->rep_receive_count > needed))
goto out;
needed -= ep->rep_receive_count;
if (!temp)
@@ -1492,42 +1559,26 @@ rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
/* fast path: all needed reps can be found on the free list */
wr = NULL;
- spin_lock(&buf->rb_lock);
while (needed) {
- rep = list_first_entry_or_null(&buf->rb_recv_bufs,
- struct rpcrdma_rep, rr_list);
+ rep = rpcrdma_rep_get_locked(buf);
+ if (rep && rep->rr_temp) {
+ rpcrdma_rep_destroy(rep);
+ continue;
+ }
if (!rep)
- break;
-
- list_del(&rep->rr_list);
- rep->rr_recv_wr.next = wr;
- wr = &rep->rr_recv_wr;
- --needed;
- }
- spin_unlock(&buf->rb_lock);
-
- while (needed) {
- rep = rpcrdma_rep_create(r_xprt, temp);
+ rep = rpcrdma_rep_create(r_xprt, temp);
if (!rep)
break;
+ trace_xprtrdma_post_recv(rep);
rep->rr_recv_wr.next = wr;
wr = &rep->rr_recv_wr;
--needed;
+ ++count;
}
if (!wr)
goto out;
- for (i = wr; i; i = i->next) {
- rep = container_of(i, struct rpcrdma_rep, rr_recv_wr);
-
- if (!rpcrdma_regbuf_dma_map(r_xprt, rep->rr_rdmabuf))
- goto release_wrs;
-
- trace_xprtrdma_post_recv(rep->rr_recv_wr.wr_cqe);
- ++count;
- }
-
rc = ib_post_recv(r_xprt->rx_ia.ri_id->qp, wr,
(const struct ib_recv_wr **)&bad_wr);
out:
@@ -1544,11 +1595,4 @@ out:
}
ep->rep_receive_count += count;
return;
-
-release_wrs:
- for (i = wr; i;) {
- rep = container_of(i, struct rpcrdma_rep, rr_recv_wr);
- i = i->next;
- rpcrdma_recv_buffer_put(rep);
- }
}
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 92ce09fcea74..37d5080c250b 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -47,6 +47,7 @@
#include <linux/atomic.h> /* atomic_t, etc */
#include <linux/kref.h> /* struct kref */
#include <linux/workqueue.h> /* struct work_struct */
+#include <linux/llist.h>
#include <rdma/rdma_cm.h> /* RDMA connection api */
#include <rdma/ib_verbs.h> /* RDMA verbs api */
@@ -70,9 +71,8 @@ struct rpcrdma_ia {
struct rdma_cm_id *ri_id;
struct ib_pd *ri_pd;
int ri_async_rc;
- unsigned int ri_max_segs;
+ unsigned int ri_max_rdma_segs;
unsigned int ri_max_frwr_depth;
- unsigned int ri_max_send_sges;
bool ri_implicit_roundup;
enum ib_mr_type ri_mrtype;
unsigned long ri_flags;
@@ -98,7 +98,7 @@ struct rpcrdma_ep {
wait_queue_head_t rep_connect_wait;
struct rpcrdma_connect_private rep_cm_private;
struct rdma_conn_param rep_remote_cma;
- unsigned int rep_max_requests; /* set by /proc */
+ unsigned int rep_max_requests; /* depends on device */
unsigned int rep_inline_send; /* negotiated */
unsigned int rep_inline_recv; /* negotiated */
int rep_receive_count;
@@ -117,9 +117,6 @@ struct rpcrdma_ep {
#endif
/* Registered buffer -- registered kmalloc'd memory for RDMA SEND/RECV
- *
- * The below structure appears at the front of a large region of kmalloc'd
- * memory, which always starts on a good alignment boundary.
*/
struct rpcrdma_regbuf {
@@ -158,25 +155,22 @@ static inline void *rdmab_data(const struct rpcrdma_regbuf *rb)
/* To ensure a transport can always make forward progress,
* the number of RDMA segments allowed in header chunk lists
- * is capped at 8. This prevents less-capable devices and
- * memory registrations from overrunning the Send buffer
- * while building chunk lists.
+ * is capped at 16. This prevents less-capable devices from
+ * overrunning the Send buffer while building chunk lists.
*
* Elements of the Read list take up more room than the
- * Write list or Reply chunk. 8 read segments means the Read
- * list (or Write list or Reply chunk) cannot consume more
- * than
- *
- * ((8 + 2) * read segment size) + 1 XDR words, or 244 bytes.
+ * Write list or Reply chunk. 16 read segments means the
+ * chunk lists cannot consume more than
*
- * And the fixed part of the header is another 24 bytes.
+ * ((16 + 2) * read segment size) + 1 XDR words,
*
- * The smallest inline threshold is 1024 bytes, ensuring that
- * at least 750 bytes are available for RPC messages.
+ * or about 400 bytes. The fixed part of the header is
+ * another 24 bytes. Thus when the inline threshold is
+ * 1024 bytes, at least 600 bytes are available for RPC
+ * message bodies.
*/
enum {
- RPCRDMA_MAX_HDR_SEGS = 8,
- RPCRDMA_HDRBUF_SIZE = 256,
+ RPCRDMA_MAX_HDR_SEGS = 16,
};
/*
@@ -206,8 +200,9 @@ struct rpcrdma_rep {
struct rpc_rqst *rr_rqst;
struct xdr_buf rr_hdrbuf;
struct xdr_stream rr_stream;
- struct list_head rr_list;
+ struct llist_node rr_node;
struct ib_recv_wr rr_recv_wr;
+ struct list_head rr_all;
};
/* To reduce the rate at which a transport invokes ib_post_recv
@@ -223,12 +218,8 @@ enum {
/* struct rpcrdma_sendctx - DMA mapped SGEs to unmap after Send completes
*/
struct rpcrdma_req;
-struct rpcrdma_xprt;
struct rpcrdma_sendctx {
- struct ib_send_wr sc_wr;
struct ib_cqe sc_cqe;
- struct ib_device *sc_device;
- struct rpcrdma_xprt *sc_xprt;
struct rpcrdma_req *sc_req;
unsigned int sc_unmap_count;
struct ib_sge sc_sges[];
@@ -240,20 +231,20 @@ struct rpcrdma_sendctx {
* An external memory region is any buffer or page that is registered
* on the fly (ie, not pre-registered).
*/
-struct rpcrdma_req;
struct rpcrdma_frwr {
struct ib_mr *fr_mr;
struct ib_cqe fr_cqe;
struct completion fr_linv_done;
- struct rpcrdma_req *fr_req;
union {
struct ib_reg_wr fr_regwr;
struct ib_send_wr fr_invwr;
};
};
+struct rpcrdma_req;
struct rpcrdma_mr {
struct list_head mr_list;
+ struct rpcrdma_req *mr_req;
struct scatterlist *mr_sg;
int mr_nents;
enum dma_data_direction mr_dir;
@@ -262,7 +253,6 @@ struct rpcrdma_mr {
u32 mr_handle;
u32 mr_length;
u64 mr_offset;
- struct work_struct mr_recycle;
struct list_head mr_all;
};
@@ -323,6 +313,7 @@ struct rpcrdma_req {
struct rpcrdma_rep *rl_reply;
struct xdr_stream rl_stream;
struct xdr_buf rl_hdrbuf;
+ struct ib_send_wr rl_wr;
struct rpcrdma_sendctx *rl_sendctx;
struct rpcrdma_regbuf *rl_rdmabuf; /* xprt header */
struct rpcrdma_regbuf *rl_sendbuf; /* rq_snd_buf */
@@ -331,7 +322,8 @@ struct rpcrdma_req {
struct list_head rl_all;
struct kref rl_kref;
- struct list_head rl_registered; /* registered segments */
+ struct list_head rl_free_mrs;
+ struct list_head rl_registered;
struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS];
};
@@ -344,7 +336,7 @@ rpcr_to_rdmar(const struct rpc_rqst *rqst)
static inline void
rpcrdma_mr_push(struct rpcrdma_mr *mr, struct list_head *list)
{
- list_add_tail(&mr->mr_list, list);
+ list_add(&mr->mr_list, list);
}
static inline struct rpcrdma_mr *
@@ -352,8 +344,9 @@ rpcrdma_mr_pop(struct list_head *list)
{
struct rpcrdma_mr *mr;
- mr = list_first_entry(list, struct rpcrdma_mr, mr_list);
- list_del_init(&mr->mr_list);
+ mr = list_first_entry_or_null(list, struct rpcrdma_mr, mr_list);
+ if (mr)
+ list_del_init(&mr->mr_list);
return mr;
}
@@ -364,27 +357,28 @@ rpcrdma_mr_pop(struct list_head *list)
* One of these is associated with a transport instance
*/
struct rpcrdma_buffer {
- spinlock_t rb_mrlock; /* protect rb_mrs list */
+ spinlock_t rb_lock;
+ struct list_head rb_send_bufs;
struct list_head rb_mrs;
- struct list_head rb_all;
unsigned long rb_sc_head;
unsigned long rb_sc_tail;
unsigned long rb_sc_last;
struct rpcrdma_sendctx **rb_sc_ctxs;
- spinlock_t rb_lock; /* protect buf lists */
- struct list_head rb_send_bufs;
- struct list_head rb_recv_bufs;
struct list_head rb_allreqs;
+ struct list_head rb_all_mrs;
+ struct list_head rb_all_reps;
- u32 rb_max_requests;
+ struct llist_head rb_free_reps;
+
+ u32 rb_max_requests;
u32 rb_credits; /* most recent credit grant */
u32 rb_bc_srv_max_requests;
u32 rb_bc_max_requests;
- struct delayed_work rb_refresh_worker;
+ struct work_struct rb_refresh_worker;
};
/*
@@ -477,12 +471,14 @@ void rpcrdma_ep_disconnect(struct rpcrdma_ep *, struct rpcrdma_ia *);
int rpcrdma_ep_post(struct rpcrdma_ia *, struct rpcrdma_ep *,
struct rpcrdma_req *);
+void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp);
/*
* Buffer calls - xprtrdma/verbs.c
*/
struct rpcrdma_req *rpcrdma_req_create(struct rpcrdma_xprt *r_xprt, size_t size,
gfp_t flags);
+int rpcrdma_req_setup(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req);
void rpcrdma_req_destroy(struct rpcrdma_req *req);
int rpcrdma_buffer_create(struct rpcrdma_xprt *);
void rpcrdma_buffer_destroy(struct rpcrdma_buffer *);
@@ -490,13 +486,7 @@ struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_xprt *r_xprt);
struct rpcrdma_mr *rpcrdma_mr_get(struct rpcrdma_xprt *r_xprt);
void rpcrdma_mr_put(struct rpcrdma_mr *mr);
-void rpcrdma_mr_unmap_and_put(struct rpcrdma_mr *mr);
-
-static inline void
-rpcrdma_mr_recycle(struct rpcrdma_mr *mr)
-{
- schedule_work(&mr->mr_recycle);
-}
+void rpcrdma_mrs_refresh(struct rpcrdma_xprt *r_xprt);
struct rpcrdma_req *rpcrdma_buffer_get(struct rpcrdma_buffer *);
void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers,
@@ -545,16 +535,15 @@ rpcrdma_data_dir(bool writing)
/* Memory registration calls xprtrdma/frwr_ops.c
*/
-bool frwr_is_supported(struct ib_device *device);
void frwr_reset(struct rpcrdma_req *req);
-int frwr_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep);
+int frwr_query_device(struct rpcrdma_xprt *r_xprt,
+ const struct ib_device *device);
int frwr_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr);
void frwr_release_mr(struct rpcrdma_mr *mr);
-size_t frwr_maxpages(struct rpcrdma_xprt *r_xprt);
struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,
struct rpcrdma_mr_seg *seg,
int nsegs, bool writing, __be32 xid,
- struct rpcrdma_mr **mr);
+ struct rpcrdma_mr *mr);
int frwr_send(struct rpcrdma_ia *ia, struct rpcrdma_req *req);
void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs);
void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req);
@@ -566,6 +555,8 @@ void frwr_unmap_async(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req);
enum rpcrdma_chunktype {
rpcrdma_noch = 0,
+ rpcrdma_noch_pullup,
+ rpcrdma_noch_mapped,
rpcrdma_readch,
rpcrdma_areadch,
rpcrdma_writech,
@@ -579,6 +570,7 @@ int rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc);
int rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst);
void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *);
+void rpcrdma_reset_cwnd(struct rpcrdma_xprt *r_xprt);
void rpcrdma_complete_rqst(struct rpcrdma_rep *rep);
void rpcrdma_reply_handler(struct rpcrdma_rep *rep);
@@ -590,7 +582,6 @@ static inline void rpcrdma_set_xdrlen(struct xdr_buf *xdr, size_t len)
/* RPC/RDMA module init - xprtrdma/transport.c
*/
-extern unsigned int xprt_rdma_slot_table_entries;
extern unsigned int xprt_rdma_max_inline_read;
extern unsigned int xprt_rdma_max_inline_write;
void xprt_rdma_format_addresses(struct rpc_xprt *xprt, struct sockaddr *sap);
OpenPOWER on IntegriCloud