diff options
Diffstat (limited to 'net')
-rw-r--r-- | net/sunrpc/auth.c | 9 | ||||
-rw-r--r-- | net/sunrpc/auth_generic.c | 13 | ||||
-rw-r--r-- | net/sunrpc/auth_gss/auth_gss.c | 6 | ||||
-rw-r--r-- | net/sunrpc/auth_unix.c | 6 | ||||
-rw-r--r-- | net/sunrpc/clnt.c | 17 | ||||
-rw-r--r-- | net/sunrpc/xdr.c | 2 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/backchannel.c | 16 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/fmr_ops.c | 134 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/frwr_ops.c | 214 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/physical_ops.c | 39 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/rpc_rdma.c | 517 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/transport.c | 16 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/verbs.c | 78 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/xprt_rdma.h | 47 | ||||
-rw-r--r-- | net/sunrpc/xprtsock.c | 6 |
15 files changed, 676 insertions, 444 deletions
diff --git a/net/sunrpc/auth.c b/net/sunrpc/auth.c index 02f53674dc39..040ff627c18a 100644 --- a/net/sunrpc/auth.c +++ b/net/sunrpc/auth.c @@ -543,7 +543,7 @@ rpcauth_cache_enforce_limit(void) */ struct rpc_cred * rpcauth_lookup_credcache(struct rpc_auth *auth, struct auth_cred * acred, - int flags) + int flags, gfp_t gfp) { LIST_HEAD(free); struct rpc_cred_cache *cache = auth->au_credcache; @@ -580,7 +580,7 @@ rpcauth_lookup_credcache(struct rpc_auth *auth, struct auth_cred * acred, if (flags & RPCAUTH_LOOKUP_RCU) return ERR_PTR(-ECHILD); - new = auth->au_ops->crcreate(auth, acred, flags); + new = auth->au_ops->crcreate(auth, acred, flags, gfp); if (IS_ERR(new)) { cred = new; goto out; @@ -703,8 +703,7 @@ rpcauth_bindcred(struct rpc_task *task, struct rpc_cred *cred, int flags) new = rpcauth_bind_new_cred(task, lookupflags); if (IS_ERR(new)) return PTR_ERR(new); - if (req->rq_cred != NULL) - put_rpccred(req->rq_cred); + put_rpccred(req->rq_cred); req->rq_cred = new; return 0; } @@ -712,6 +711,8 @@ rpcauth_bindcred(struct rpc_task *task, struct rpc_cred *cred, int flags) void put_rpccred(struct rpc_cred *cred) { + if (cred == NULL) + return; /* Fast path for unhashed credentials */ if (test_bit(RPCAUTH_CRED_HASHED, &cred->cr_flags) == 0) { if (atomic_dec_and_test(&cred->cr_count)) diff --git a/net/sunrpc/auth_generic.c b/net/sunrpc/auth_generic.c index 41248b1820c7..54dd3fdead54 100644 --- a/net/sunrpc/auth_generic.c +++ b/net/sunrpc/auth_generic.c @@ -38,6 +38,13 @@ struct rpc_cred *rpc_lookup_cred(void) } EXPORT_SYMBOL_GPL(rpc_lookup_cred); +struct rpc_cred * +rpc_lookup_generic_cred(struct auth_cred *acred, int flags, gfp_t gfp) +{ + return rpcauth_lookup_credcache(&generic_auth, acred, flags, gfp); +} +EXPORT_SYMBOL_GPL(rpc_lookup_generic_cred); + struct rpc_cred *rpc_lookup_cred_nonblock(void) { return rpcauth_lookupcred(&generic_auth, RPCAUTH_LOOKUP_RCU); @@ -77,15 +84,15 @@ static struct rpc_cred *generic_bind_cred(struct rpc_task *task, static struct rpc_cred * generic_lookup_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags) { - return rpcauth_lookup_credcache(&generic_auth, acred, flags); + return rpcauth_lookup_credcache(&generic_auth, acred, flags, GFP_KERNEL); } static struct rpc_cred * -generic_create_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags) +generic_create_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags, gfp_t gfp) { struct generic_cred *gcred; - gcred = kmalloc(sizeof(*gcred), GFP_KERNEL); + gcred = kmalloc(sizeof(*gcred), gfp); if (gcred == NULL) return ERR_PTR(-ENOMEM); diff --git a/net/sunrpc/auth_gss/auth_gss.c b/net/sunrpc/auth_gss/auth_gss.c index 15612ffa8d57..e64ae93d5b4f 100644 --- a/net/sunrpc/auth_gss/auth_gss.c +++ b/net/sunrpc/auth_gss/auth_gss.c @@ -1299,11 +1299,11 @@ gss_destroy_cred(struct rpc_cred *cred) static struct rpc_cred * gss_lookup_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags) { - return rpcauth_lookup_credcache(auth, acred, flags); + return rpcauth_lookup_credcache(auth, acred, flags, GFP_NOFS); } static struct rpc_cred * -gss_create_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags) +gss_create_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags, gfp_t gfp) { struct gss_auth *gss_auth = container_of(auth, struct gss_auth, rpc_auth); struct gss_cred *cred = NULL; @@ -1313,7 +1313,7 @@ gss_create_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags) __func__, from_kuid(&init_user_ns, acred->uid), auth->au_flavor); - if (!(cred = kzalloc(sizeof(*cred), GFP_NOFS))) + if (!(cred = kzalloc(sizeof(*cred), gfp))) goto out_err; rpcauth_init_cred(&cred->gc_base, acred, auth, &gss_credops); diff --git a/net/sunrpc/auth_unix.c b/net/sunrpc/auth_unix.c index 0d3dd364c22f..9f65452b7cbc 100644 --- a/net/sunrpc/auth_unix.c +++ b/net/sunrpc/auth_unix.c @@ -52,11 +52,11 @@ unx_destroy(struct rpc_auth *auth) static struct rpc_cred * unx_lookup_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags) { - return rpcauth_lookup_credcache(auth, acred, flags); + return rpcauth_lookup_credcache(auth, acred, flags, GFP_NOFS); } static struct rpc_cred * -unx_create_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags) +unx_create_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags, gfp_t gfp) { struct unx_cred *cred; unsigned int groups = 0; @@ -66,7 +66,7 @@ unx_create_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags) from_kuid(&init_user_ns, acred->uid), from_kgid(&init_user_ns, acred->gid)); - if (!(cred = kmalloc(sizeof(*cred), GFP_NOFS))) + if (!(cred = kmalloc(sizeof(*cred), gfp))) return ERR_PTR(-ENOMEM); rpcauth_init_cred(&cred->uc_base, acred, auth, &unix_credops); diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c index 7e0c9bf22df8..06b4df9faaa1 100644 --- a/net/sunrpc/clnt.c +++ b/net/sunrpc/clnt.c @@ -1414,6 +1414,23 @@ size_t rpc_max_payload(struct rpc_clnt *clnt) EXPORT_SYMBOL_GPL(rpc_max_payload); /** + * rpc_max_bc_payload - Get maximum backchannel payload size, in bytes + * @clnt: RPC client to query + */ +size_t rpc_max_bc_payload(struct rpc_clnt *clnt) +{ + struct rpc_xprt *xprt; + size_t ret; + + rcu_read_lock(); + xprt = rcu_dereference(clnt->cl_xprt); + ret = xprt->ops->bc_maxpayload(xprt); + rcu_read_unlock(); + return ret; +} +EXPORT_SYMBOL_GPL(rpc_max_bc_payload); + +/** * rpc_get_timeout - Get timeout for transport in units of HZ * @clnt: RPC client to query */ diff --git a/net/sunrpc/xdr.c b/net/sunrpc/xdr.c index 6bdb3865212d..c4f3cc0c0775 100644 --- a/net/sunrpc/xdr.c +++ b/net/sunrpc/xdr.c @@ -797,6 +797,8 @@ void xdr_init_decode(struct xdr_stream *xdr, struct xdr_buf *buf, __be32 *p) xdr_set_iov(xdr, buf->head, buf->len); else if (buf->page_len != 0) xdr_set_page_base(xdr, 0, buf->len); + else + xdr_set_iov(xdr, buf->head, buf->len); if (p != NULL && p > xdr->p && xdr->end >= p) { xdr->nwords -= p - xdr->p; xdr->p = p; diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c index 2dcd7640eeb5..87762d976b63 100644 --- a/net/sunrpc/xprtrdma/backchannel.c +++ b/net/sunrpc/xprtrdma/backchannel.c @@ -192,6 +192,22 @@ int xprt_rdma_bc_up(struct svc_serv *serv, struct net *net) } /** + * xprt_rdma_bc_maxpayload - Return maximum backchannel message size + * @xprt: transport + * + * Returns maximum size, in bytes, of a backchannel message + */ +size_t xprt_rdma_bc_maxpayload(struct rpc_xprt *xprt) +{ + struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); + struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data; + size_t maxmsg; + + maxmsg = min_t(unsigned int, cdata->inline_rsize, cdata->inline_wsize); + return maxmsg - RPCRDMA_HDRLEN_MIN; +} + +/** * rpcrdma_bc_marshal_reply - Send backwards direction reply * @rqst: buffer containing RPC reply data * diff --git a/net/sunrpc/xprtrdma/fmr_ops.c b/net/sunrpc/xprtrdma/fmr_ops.c index b289e106540b..6326ebe8b595 100644 --- a/net/sunrpc/xprtrdma/fmr_ops.c +++ b/net/sunrpc/xprtrdma/fmr_ops.c @@ -35,10 +35,71 @@ /* Maximum scatter/gather per FMR */ #define RPCRDMA_MAX_FMR_SGES (64) +static struct workqueue_struct *fmr_recovery_wq; + +#define FMR_RECOVERY_WQ_FLAGS (WQ_UNBOUND) + +int +fmr_alloc_recovery_wq(void) +{ + fmr_recovery_wq = alloc_workqueue("fmr_recovery", WQ_UNBOUND, 0); + return !fmr_recovery_wq ? -ENOMEM : 0; +} + +void +fmr_destroy_recovery_wq(void) +{ + struct workqueue_struct *wq; + + if (!fmr_recovery_wq) + return; + + wq = fmr_recovery_wq; + fmr_recovery_wq = NULL; + destroy_workqueue(wq); +} + +static int +__fmr_unmap(struct rpcrdma_mw *mw) +{ + LIST_HEAD(l); + + list_add(&mw->fmr.fmr->list, &l); + return ib_unmap_fmr(&l); +} + +/* Deferred reset of a single FMR. Generate a fresh rkey by + * replacing the MR. There's no recovery if this fails. + */ +static void +__fmr_recovery_worker(struct work_struct *work) +{ + struct rpcrdma_mw *mw = container_of(work, struct rpcrdma_mw, + mw_work); + struct rpcrdma_xprt *r_xprt = mw->mw_xprt; + + __fmr_unmap(mw); + rpcrdma_put_mw(r_xprt, mw); + return; +} + +/* A broken MR was discovered in a context that can't sleep. + * Defer recovery to the recovery worker. + */ +static void +__fmr_queue_recovery(struct rpcrdma_mw *mw) +{ + INIT_WORK(&mw->mw_work, __fmr_recovery_worker); + queue_work(fmr_recovery_wq, &mw->mw_work); +} + static int fmr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep, struct rpcrdma_create_data_internal *cdata) { + rpcrdma_set_max_header_sizes(ia, cdata, max_t(unsigned int, 1, + RPCRDMA_MAX_DATA_SEGS / + RPCRDMA_MAX_FMR_SGES)); return 0; } @@ -48,7 +109,7 @@ static size_t fmr_op_maxpages(struct rpcrdma_xprt *r_xprt) { return min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS, - rpcrdma_max_segments(r_xprt) * RPCRDMA_MAX_FMR_SGES); + RPCRDMA_MAX_HDR_SEGS * RPCRDMA_MAX_FMR_SGES); } static int @@ -89,6 +150,7 @@ fmr_op_init(struct rpcrdma_xprt *r_xprt) if (IS_ERR(r->fmr.fmr)) goto out_fmr_err; + r->mw_xprt = r_xprt; list_add(&r->mw_list, &buf->rb_mws); list_add(&r->mw_all, &buf->rb_all); } @@ -104,15 +166,6 @@ out: return rc; } -static int -__fmr_unmap(struct rpcrdma_mw *r) -{ - LIST_HEAD(l); - - list_add(&r->fmr.fmr->list, &l); - return ib_unmap_fmr(&l); -} - /* Use the ib_map_phys_fmr() verb to register a memory region * for remote access via RDMA READ or RDMA WRITE. */ @@ -183,15 +236,10 @@ static void __fmr_dma_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg) { struct ib_device *device = r_xprt->rx_ia.ri_device; - struct rpcrdma_mw *mw = seg->rl_mw; int nsegs = seg->mr_nsegs; - seg->rl_mw = NULL; - while (nsegs--) rpcrdma_unmap_one(device, seg++); - - rpcrdma_put_mw(r_xprt, mw); } /* Invalidate all memory regions that were registered for "req". @@ -234,42 +282,50 @@ fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) seg = &req->rl_segments[i]; __fmr_dma_unmap(r_xprt, seg); + rpcrdma_put_mw(r_xprt, seg->rl_mw); i += seg->mr_nsegs; seg->mr_nsegs = 0; + seg->rl_mw = NULL; } req->rl_nchunks = 0; } -/* Use the ib_unmap_fmr() verb to prevent further remote - * access via RDMA READ or RDMA WRITE. +/* Use a slow, safe mechanism to invalidate all memory regions + * that were registered for "req". + * + * In the asynchronous case, DMA unmapping occurs first here + * because the rpcrdma_mr_seg is released immediately after this + * call. It's contents won't be available in __fmr_dma_unmap later. + * FIXME. */ -static int -fmr_op_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg) +static void +fmr_op_unmap_safe(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, + bool sync) { - struct rpcrdma_ia *ia = &r_xprt->rx_ia; - struct rpcrdma_mr_seg *seg1 = seg; - struct rpcrdma_mw *mw = seg1->rl_mw; - int rc, nsegs = seg->mr_nsegs; + struct rpcrdma_mr_seg *seg; + struct rpcrdma_mw *mw; + unsigned int i; - dprintk("RPC: %s: FMR %p\n", __func__, mw); + for (i = 0; req->rl_nchunks; req->rl_nchunks--) { + seg = &req->rl_segments[i]; + mw = seg->rl_mw; - seg1->rl_mw = NULL; - while (seg1->mr_nsegs--) - rpcrdma_unmap_one(ia->ri_device, seg++); - rc = __fmr_unmap(mw); - if (rc) - goto out_err; - rpcrdma_put_mw(r_xprt, mw); - return nsegs; + if (sync) { + /* ORDER */ + __fmr_unmap(mw); + __fmr_dma_unmap(r_xprt, seg); + rpcrdma_put_mw(r_xprt, mw); + } else { + __fmr_dma_unmap(r_xprt, seg); + __fmr_queue_recovery(mw); + } -out_err: - /* The FMR is abandoned, but remains in rb_all. fmr_op_destroy - * will attempt to release it when the transport is destroyed. - */ - dprintk("RPC: %s: ib_unmap_fmr status %i\n", __func__, rc); - return nsegs; + i += seg->mr_nsegs; + seg->mr_nsegs = 0; + seg->rl_mw = NULL; + } } static void @@ -295,7 +351,7 @@ fmr_op_destroy(struct rpcrdma_buffer *buf) const struct rpcrdma_memreg_ops rpcrdma_fmr_memreg_ops = { .ro_map = fmr_op_map, .ro_unmap_sync = fmr_op_unmap_sync, - .ro_unmap = fmr_op_unmap, + .ro_unmap_safe = fmr_op_unmap_safe, .ro_open = fmr_op_open, .ro_maxpages = fmr_op_maxpages, .ro_init = fmr_op_init, diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c index 94c3fa910b85..c0947544babe 100644 --- a/net/sunrpc/xprtrdma/frwr_ops.c +++ b/net/sunrpc/xprtrdma/frwr_ops.c @@ -98,6 +98,47 @@ frwr_destroy_recovery_wq(void) destroy_workqueue(wq); } +static int +__frwr_reset_mr(struct rpcrdma_ia *ia, struct rpcrdma_mw *r) +{ + struct rpcrdma_frmr *f = &r->frmr; + int rc; + + rc = ib_dereg_mr(f->fr_mr); + if (rc) { + pr_warn("rpcrdma: ib_dereg_mr status %d, frwr %p orphaned\n", + rc, r); + return rc; + } + + f->fr_mr = ib_alloc_mr(ia->ri_pd, IB_MR_TYPE_MEM_REG, + ia->ri_max_frmr_depth); + if (IS_ERR(f->fr_mr)) { + pr_warn("rpcrdma: ib_alloc_mr status %ld, frwr %p orphaned\n", + PTR_ERR(f->fr_mr), r); + return PTR_ERR(f->fr_mr); + } + + dprintk("RPC: %s: recovered FRMR %p\n", __func__, r); + f->fr_state = FRMR_IS_INVALID; + return 0; +} + +static void +__frwr_reset_and_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw) +{ + struct rpcrdma_ia *ia = &r_xprt->rx_ia; + struct rpcrdma_frmr *f = &mw->frmr; + int rc; + + rc = __frwr_reset_mr(ia, mw); + ib_dma_unmap_sg(ia->ri_device, f->fr_sg, f->fr_nents, f->fr_dir); + if (rc) + return; + + rpcrdma_put_mw(r_xprt, mw); +} + /* Deferred reset of a single FRMR. Generate a fresh rkey by * replacing the MR. * @@ -109,26 +150,10 @@ static void __frwr_recovery_worker(struct work_struct *work) { struct rpcrdma_mw *r = container_of(work, struct rpcrdma_mw, - frmr.fr_work); - struct rpcrdma_xprt *r_xprt = r->frmr.fr_xprt; - unsigned int depth = r_xprt->rx_ia.ri_max_frmr_depth; - struct ib_pd *pd = r_xprt->rx_ia.ri_pd; - - if (ib_dereg_mr(r->frmr.fr_mr)) - goto out_fail; + mw_work); - r->frmr.fr_mr = ib_alloc_mr(pd, IB_MR_TYPE_MEM_REG, depth); - if (IS_ERR(r->frmr.fr_mr)) - goto out_fail; - - dprintk("RPC: %s: recovered FRMR %p\n", __func__, r); - r->frmr.fr_state = FRMR_IS_INVALID; - rpcrdma_put_mw(r_xprt, r); + __frwr_reset_and_unmap(r->mw_xprt, r); return; - -out_fail: - pr_warn("RPC: %s: FRMR %p unrecovered\n", - __func__, r); } /* A broken MR was discovered in a context that can't sleep. @@ -137,8 +162,8 @@ out_fail: static void __frwr_queue_recovery(struct rpcrdma_mw *r) { - INIT_WORK(&r->frmr.fr_work, __frwr_recovery_worker); - queue_work(frwr_recovery_wq, &r->frmr.fr_work); + INIT_WORK(&r->mw_work, __frwr_recovery_worker); + queue_work(frwr_recovery_wq, &r->mw_work); } static int @@ -152,11 +177,11 @@ __frwr_init(struct rpcrdma_mw *r, struct ib_pd *pd, struct ib_device *device, if (IS_ERR(f->fr_mr)) goto out_mr_err; - f->sg = kcalloc(depth, sizeof(*f->sg), GFP_KERNEL); - if (!f->sg) + f->fr_sg = kcalloc(depth, sizeof(*f->fr_sg), GFP_KERNEL); + if (!f->fr_sg) goto out_list_err; - sg_init_table(f->sg, depth); + sg_init_table(f->fr_sg, depth); init_completion(&f->fr_linv_done); @@ -185,7 +210,7 @@ __frwr_release(struct rpcrdma_mw *r) if (rc) dprintk("RPC: %s: ib_dereg_mr status %i\n", __func__, rc); - kfree(r->frmr.sg); + kfree(r->frmr.fr_sg); } static int @@ -231,6 +256,9 @@ frwr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep, depth; } + rpcrdma_set_max_header_sizes(ia, cdata, max_t(unsigned int, 1, + RPCRDMA_MAX_DATA_SEGS / + ia->ri_max_frmr_depth)); return 0; } @@ -243,7 +271,7 @@ frwr_op_maxpages(struct rpcrdma_xprt *r_xprt) struct rpcrdma_ia *ia = &r_xprt->rx_ia; return min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS, - rpcrdma_max_segments(r_xprt) * ia->ri_max_frmr_depth); + RPCRDMA_MAX_HDR_SEGS * ia->ri_max_frmr_depth); } static void @@ -350,9 +378,9 @@ frwr_op_init(struct rpcrdma_xprt *r_xprt) return rc; } + r->mw_xprt = r_xprt; list_add(&r->mw_list, &buf->rb_mws); list_add(&r->mw_all, &buf->rb_all); - r->frmr.fr_xprt = r_xprt; } return 0; @@ -396,12 +424,12 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, for (i = 0; i < nsegs;) { if (seg->mr_page) - sg_set_page(&frmr->sg[i], + sg_set_page(&frmr->fr_sg[i], seg->mr_page, seg->mr_len, offset_in_page(seg->mr_offset)); else - sg_set_buf(&frmr->sg[i], seg->mr_offset, + sg_set_buf(&frmr->fr_sg[i], seg->mr_offset, seg->mr_len); ++seg; @@ -412,25 +440,26 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len)) break; } - frmr->sg_nents = i; + frmr->fr_nents = i; + frmr->fr_dir = direction; - dma_nents = ib_dma_map_sg(device, frmr->sg, frmr->sg_nents, direction); + dma_nents = ib_dma_map_sg(device, frmr->fr_sg, frmr->fr_nents, direction); if (!dma_nents) { pr_err("RPC: %s: failed to dma map sg %p sg_nents %u\n", - __func__, frmr->sg, frmr->sg_nents); + __func__, frmr->fr_sg, frmr->fr_nents); return -ENOMEM; } - n = ib_map_mr_sg(mr, frmr->sg, frmr->sg_nents, NULL, PAGE_SIZE); - if (unlikely(n != frmr->sg_nents)) { + n = ib_map_mr_sg(mr, frmr->fr_sg, frmr->fr_nents, NULL, PAGE_SIZE); + if (unlikely(n != frmr->fr_nents)) { pr_err("RPC: %s: failed to map mr %p (%u/%u)\n", - __func__, frmr->fr_mr, n, frmr->sg_nents); + __func__, frmr->fr_mr, n, frmr->fr_nents); rc = n < 0 ? n : -EINVAL; goto out_senderr; } dprintk("RPC: %s: Using frmr %p to map %u segments (%u bytes)\n", - __func__, mw, frmr->sg_nents, mr->length); + __func__, mw, frmr->fr_nents, mr->length); key = (u8)(mr->rkey & 0x000000FF); ib_update_fast_reg_key(mr, ++key); @@ -452,18 +481,16 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, if (rc) goto out_senderr; - seg1->mr_dir = direction; seg1->rl_mw = mw; seg1->mr_rkey = mr->rkey; seg1->mr_base = mr->iova; - seg1->mr_nsegs = frmr->sg_nents; + seg1->mr_nsegs = frmr->fr_nents; seg1->mr_len = mr->length; - return frmr->sg_nents; + return frmr->fr_nents; out_senderr: dprintk("RPC: %s: ib_post_send status %i\n", __func__, rc); - ib_dma_unmap_sg(device, frmr->sg, dma_nents, direction); __frwr_queue_recovery(mw); return rc; } @@ -487,24 +514,6 @@ __frwr_prepare_linv_wr(struct rpcrdma_mr_seg *seg) return invalidate_wr; } -static void -__frwr_dma_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, - int rc) -{ - struct ib_device *device = r_xprt->rx_ia.ri_device; - struct rpcrdma_mw *mw = seg->rl_mw; - struct rpcrdma_frmr *f = &mw->frmr; - - seg->rl_mw = NULL; - - ib_dma_unmap_sg(device, f->sg, f->sg_nents, seg->mr_dir); - - if (!rc) - rpcrdma_put_mw(r_xprt, mw); - else - __frwr_queue_recovery(mw); -} - /* Invalidate all memory regions that were registered for "req". * * Sleeps until it is safe for the host CPU to access the @@ -518,6 +527,7 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) struct rpcrdma_mr_seg *seg; unsigned int i, nchunks; struct rpcrdma_frmr *f; + struct rpcrdma_mw *mw; int rc; dprintk("RPC: %s: req %p\n", __func__, req); @@ -558,11 +568,8 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) * unless ri_id->qp is a valid pointer. */ rc = ib_post_send(ia->ri_id->qp, invalidate_wrs, &bad_wr); - if (rc) { - pr_warn("%s: ib_post_send failed %i\n", __func__, rc); - rdma_disconnect(ia->ri_id); - goto unmap; - } + if (rc) + goto reset_mrs; wait_for_completion(&f->fr_linv_done); @@ -572,56 +579,65 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) unmap: for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) { seg = &req->rl_segments[i]; + mw = seg->rl_mw; + seg->rl_mw = NULL; - __frwr_dma_unmap(r_xprt, seg, rc); + ib_dma_unmap_sg(ia->ri_device, f->fr_sg, f->fr_nents, + f->fr_dir); + rpcrdma_put_mw(r_xprt, mw); i += seg->mr_nsegs; seg->mr_nsegs = 0; } req->rl_nchunks = 0; -} + return; -/* Post a LOCAL_INV Work Request to prevent further remote access - * via RDMA READ or RDMA WRITE. - */ -static int -frwr_op_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg) -{ - struct rpcrdma_mr_seg *seg1 = seg; - struct rpcrdma_ia *ia = &r_xprt->rx_ia; - struct rpcrdma_mw *mw = seg1->rl_mw; - struct rpcrdma_frmr *frmr = &mw->frmr; - struct ib_send_wr *invalidate_wr, *bad_wr; - int rc, nsegs = seg->mr_nsegs; +reset_mrs: + pr_warn("%s: ib_post_send failed %i\n", __func__, rc); - dprintk("RPC: %s: FRMR %p\n", __func__, mw); + /* Find and reset the MRs in the LOCAL_INV WRs that did not + * get posted. This is synchronous, and slow. + */ + for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) { + seg = &req->rl_segments[i]; + mw = seg->rl_mw; + f = &mw->frmr; - seg1->rl_mw = NULL; - frmr->fr_state = FRMR_IS_INVALID; - invalidate_wr = &mw->frmr.fr_invwr; + if (mw->frmr.fr_mr->rkey == bad_wr->ex.invalidate_rkey) { + __frwr_reset_mr(ia, mw); + bad_wr = bad_wr->next; + } - memset(invalidate_wr, 0, sizeof(*invalidate_wr)); - frmr->fr_cqe.done = frwr_wc_localinv; - invalidate_wr->wr_cqe = &frmr->fr_cqe; - invalidate_wr->opcode = IB_WR_LOCAL_INV; - invalidate_wr->ex.invalidate_rkey = frmr->fr_mr->rkey; - DECR_CQCOUNT(&r_xprt->rx_ep); + i += seg->mr_nsegs; + } + goto unmap; +} - ib_dma_unmap_sg(ia->ri_device, frmr->sg, frmr->sg_nents, seg1->mr_dir); - read_lock(&ia->ri_qplock); - rc = ib_post_send(ia->ri_id->qp, invalidate_wr, &bad_wr); - read_unlock(&ia->ri_qplock); - if (rc) - goto out_err; +/* Use a slow, safe mechanism to invalidate all memory regions + * that were registered for "req". + */ +static void +frwr_op_unmap_safe(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, + bool sync) +{ + struct rpcrdma_mr_seg *seg; + struct rpcrdma_mw *mw; + unsigned int i; - rpcrdma_put_mw(r_xprt, mw); - return nsegs; + for (i = 0; req->rl_nchunks; req->rl_nchunks--) { + seg = &req->rl_segments[i]; + mw = seg->rl_mw; -out_err: - dprintk("RPC: %s: ib_post_send status %i\n", __func__, rc); - __frwr_queue_recovery(mw); - return nsegs; + if (sync) + __frwr_reset_and_unmap(r_xprt, mw); + else + __frwr_queue_recovery(mw); + + i += seg->mr_nsegs; + seg->mr_nsegs = 0; + seg->rl_mw = NULL; + } } static void @@ -643,7 +659,7 @@ frwr_op_destroy(struct rpcrdma_buffer *buf) const struct rpcrdma_memreg_ops rpcrdma_frwr_memreg_ops = { .ro_map = frwr_op_map, .ro_unmap_sync = frwr_op_unmap_sync, - .ro_unmap = frwr_op_unmap, + .ro_unmap_safe = frwr_op_unmap_safe, .ro_open = frwr_op_open, .ro_maxpages = frwr_op_maxpages, .ro_init = frwr_op_init, diff --git a/net/sunrpc/xprtrdma/physical_ops.c b/net/sunrpc/xprtrdma/physical_ops.c index 481b9b6f4a15..3750596cc432 100644 --- a/net/sunrpc/xprtrdma/physical_ops.c +++ b/net/sunrpc/xprtrdma/physical_ops.c @@ -36,8 +36,11 @@ physical_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep, __func__, PTR_ERR(mr)); return -ENOMEM; } - ia->ri_dma_mr = mr; + + rpcrdma_set_max_header_sizes(ia, cdata, min_t(unsigned int, + RPCRDMA_MAX_DATA_SEGS, + RPCRDMA_MAX_HDR_SEGS)); return 0; } @@ -47,7 +50,7 @@ static size_t physical_op_maxpages(struct rpcrdma_xprt *r_xprt) { return min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS, - rpcrdma_max_segments(r_xprt)); + RPCRDMA_MAX_HDR_SEGS); } static int @@ -71,17 +74,6 @@ physical_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, return 1; } -/* Unmap a memory region, but leave it registered. - */ -static int -physical_op_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg) -{ - struct rpcrdma_ia *ia = &r_xprt->rx_ia; - - rpcrdma_unmap_one(ia->ri_device, seg); - return 1; -} - /* DMA unmap all memory regions that were mapped for "req". */ static void @@ -94,6 +86,25 @@ physical_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) rpcrdma_unmap_one(device, &req->rl_segments[i++]); } +/* Use a slow, safe mechanism to invalidate all memory regions + * that were registered for "req". + * + * For physical memory registration, there is no good way to + * fence a single MR that has been advertised to the server. The + * client has already handed the server an R_key that cannot be + * invalidated and is shared by all MRs on this connection. + * Tearing down the PD might be the only safe choice, but it's + * not clear that a freshly acquired DMA R_key would be different + * than the one used by the PD that was just destroyed. + * FIXME. + */ +static void +physical_op_unmap_safe(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, + bool sync) +{ + physical_op_unmap_sync(r_xprt, req); +} + static void physical_op_destroy(struct rpcrdma_buffer *buf) { @@ -102,7 +113,7 @@ physical_op_destroy(struct rpcrdma_buffer *buf) const struct rpcrdma_memreg_ops rpcrdma_physical_memreg_ops = { .ro_map = physical_op_map, .ro_unmap_sync = physical_op_unmap_sync, - .ro_unmap = physical_op_unmap, + .ro_unmap_safe = physical_op_unmap_safe, .ro_open = physical_op_open, .ro_maxpages = physical_op_maxpages, .ro_init = physical_op_init, diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index 888823bb6dae..35a81096e83d 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -61,26 +61,84 @@ enum rpcrdma_chunktype { rpcrdma_replych }; -#if IS_ENABLED(CONFIG_SUNRPC_DEBUG) static const char transfertypes[][12] = { - "pure inline", /* no chunks */ - " read chunk", /* some argument via rdma read */ - "*read chunk", /* entire request via rdma read */ - "write chunk", /* some result via rdma write */ + "inline", /* no chunks */ + "read list", /* some argument via rdma read */ + "*read list", /* entire request via rdma read */ + "write list", /* some result via rdma write */ "reply chunk" /* entire reply via rdma write */ }; -#endif + +/* Returns size of largest RPC-over-RDMA header in a Call message + * + * The largest Call header contains a full-size Read list and a + * minimal Reply chunk. + */ +static unsigned int rpcrdma_max_call_header_size(unsigned int maxsegs) +{ + unsigned int size; + + /* Fixed header fields and list discriminators */ + size = RPCRDMA_HDRLEN_MIN; + + /* Maximum Read list size */ + maxsegs += 2; /* segment for head and tail buffers */ + size = maxsegs * sizeof(struct rpcrdma_read_chunk); + + /* Minimal Read chunk size */ + size += sizeof(__be32); /* segment count */ + size += sizeof(struct rpcrdma_segment); + size += sizeof(__be32); /* list discriminator */ + + dprintk("RPC: %s: max call header size = %u\n", + __func__, size); + return size; +} + +/* Returns size of largest RPC-over-RDMA header in a Reply message + * + * There is only one Write list or one Reply chunk per Reply + * message. The larger list is the Write list. + */ +static unsigned int rpcrdma_max_reply_header_size(unsigned int maxsegs) +{ + unsigned int size; + + /* Fixed header fields and list discriminators */ + size = RPCRDMA_HDRLEN_MIN; + + /* Maximum Write list size */ + maxsegs += 2; /* segment for head and tail buffers */ + size = sizeof(__be32); /* segment count */ + size += maxsegs * sizeof(struct rpcrdma_segment); + size += sizeof(__be32); /* list discriminator */ + + dprintk("RPC: %s: max reply header size = %u\n", + __func__, size); + return size; +} + +void rpcrdma_set_max_header_sizes(struct rpcrdma_ia *ia, + struct rpcrdma_create_data_internal *cdata, + unsigned int maxsegs) +{ + ia->ri_max_inline_write = cdata->inline_wsize - + rpcrdma_max_call_header_size(maxsegs); + ia->ri_max_inline_read = cdata->inline_rsize - + rpcrdma_max_reply_header_size(maxsegs); +} /* The client can send a request inline as long as the RPCRDMA header * plus the RPC call fit under the transport's inline limit. If the * combined call message size exceeds that limit, the client must use * the read chunk list for this operation. */ -static bool rpcrdma_args_inline(struct rpc_rqst *rqst) +static bool rpcrdma_args_inline(struct rpcrdma_xprt *r_xprt, + struct rpc_rqst *rqst) { - unsigned int callsize = RPCRDMA_HDRLEN_MIN + rqst->rq_snd_buf.len; + struct rpcrdma_ia *ia = &r_xprt->rx_ia; - return callsize <= RPCRDMA_INLINE_WRITE_THRESHOLD(rqst); + return rqst->rq_snd_buf.len <= ia->ri_max_inline_write; } /* The client can't know how large the actual reply will be. Thus it @@ -89,11 +147,12 @@ static bool rpcrdma_args_inline(struct rpc_rqst *rqst) * limit, the client must provide a write list or a reply chunk for * this request. */ -static bool rpcrdma_results_inline(struct rpc_rqst *rqst) +static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt, + struct rpc_rqst *rqst) { - unsigned int repsize = RPCRDMA_HDRLEN_MIN + rqst->rq_rcv_buf.buflen; + struct rpcrdma_ia *ia = &r_xprt->rx_ia; - return repsize <= RPCRDMA_INLINE_READ_THRESHOLD(rqst); + return rqst->rq_rcv_buf.buflen <= ia->ri_max_inline_read; } static int @@ -226,23 +285,16 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos, return n; } -/* - * Create read/write chunk lists, and reply chunks, for RDMA - * - * Assume check against THRESHOLD has been done, and chunks are required. - * Assume only encoding one list entry for read|write chunks. The NFSv3 - * protocol is simple enough to allow this as it only has a single "bulk - * result" in each procedure - complicated NFSv4 COMPOUNDs are not. (The - * RDMA/Sessions NFSv4 proposal addresses this for future v4 revs.) - * - * When used for a single reply chunk (which is a special write - * chunk used for the entire reply, rather than just the data), it - * is used primarily for READDIR and READLINK which would otherwise - * be severely size-limited by a small rdma inline read max. The server - * response will come back as an RDMA Write, followed by a message - * of type RDMA_NOMSG carrying the xid and length. As a result, reply - * chunks do not provide data alignment, however they do not require - * "fixup" (moving the response to the upper layer buffer) either. +static inline __be32 * +xdr_encode_rdma_segment(__be32 *iptr, struct rpcrdma_mr_seg *seg) +{ + *iptr++ = cpu_to_be32(seg->mr_rkey); + *iptr++ = cpu_to_be32(seg->mr_len); + return xdr_encode_hyper(iptr, seg->mr_base); +} + +/* XDR-encode the Read list. Supports encoding a list of read + * segments that belong to a single read chunk. * * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64): * @@ -250,131 +302,190 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos, * N elements, position P (same P for all chunks of same arg!): * 1 - PHLOO - 1 - PHLOO - ... - 1 - PHLOO - 0 * + * Returns a pointer to the XDR word in the RDMA header following + * the end of the Read list, or an error pointer. + */ +static __be32 * +rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, + struct rpcrdma_req *req, struct rpc_rqst *rqst, + __be32 *iptr, enum rpcrdma_chunktype rtype) +{ + struct rpcrdma_mr_seg *seg = req->rl_nextseg; + unsigned int pos; + int n, nsegs; + + if (rtype == rpcrdma_noch) { + *iptr++ = xdr_zero; /* item not present */ + return iptr; + } + + pos = rqst->rq_snd_buf.head[0].iov_len; + if (rtype == rpcrdma_areadch) + pos = 0; + nsegs = rpcrdma_convert_iovs(&rqst->rq_snd_buf, pos, rtype, seg, + RPCRDMA_MAX_SEGS - req->rl_nchunks); + if (nsegs < 0) + return ERR_PTR(nsegs); + + do { + n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, false); + if (n <= 0) + return ERR_PTR(n); + + *iptr++ = xdr_one; /* item present */ + + /* All read segments in this chunk + * have the same "position". + */ + *iptr++ = cpu_to_be32(pos); + iptr = xdr_encode_rdma_segment(iptr, seg); + + dprintk("RPC: %5u %s: read segment pos %u " + "%d@0x%016llx:0x%08x (%s)\n", + rqst->rq_task->tk_pid, __func__, pos, + seg->mr_len, (unsigned long long)seg->mr_base, + seg->mr_rkey, n < nsegs ? "more" : "last"); + + r_xprt->rx_stats.read_chunk_count++; + req->rl_nchunks++; + seg += n; + nsegs -= n; + } while (nsegs); + req->rl_nextseg = seg; + + /* Finish Read list */ + *iptr++ = xdr_zero; /* Next item not present */ + return iptr; +} + +/* XDR-encode the Write list. Supports encoding a list containing + * one array of plain segments that belong to a single write chunk. + * + * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64): + * * Write chunklist (a list of (one) counted array): * N elements: * 1 - N - HLOO - HLOO - ... - HLOO - 0 * + * Returns a pointer to the XDR word in the RDMA header following + * the end of the Write list, or an error pointer. + */ +static __be32 * +rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, + struct rpc_rqst *rqst, __be32 *iptr, + enum rpcrdma_chunktype wtype) +{ + struct rpcrdma_mr_seg *seg = req->rl_nextseg; + int n, nsegs, nchunks; + __be32 *segcount; + + if (wtype != rpcrdma_writech) { + *iptr++ = xdr_zero; /* no Write list present */ + return iptr; + } + + nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf, + rqst->rq_rcv_buf.head[0].iov_len, + wtype, seg, + RPCRDMA_MAX_SEGS - req->rl_nchunks); + if (nsegs < 0) + return ERR_PTR(nsegs); + + *iptr++ = xdr_one; /* Write list present */ + segcount = iptr++; /* save location of segment count */ + + nchunks = 0; + do { + n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, true); + if (n <= 0) + return ERR_PTR(n); + + iptr = xdr_encode_rdma_segment(iptr, seg); + + dprintk("RPC: %5u %s: write segment " + "%d@0x016%llx:0x%08x (%s)\n", + rqst->rq_task->tk_pid, __func__, + seg->mr_len, (unsigned long long)seg->mr_base, + seg->mr_rkey, n < nsegs ? "more" : "last"); + + r_xprt->rx_stats.write_chunk_count++; + r_xprt->rx_stats.total_rdma_request += seg->mr_len; + req->rl_nchunks++; + nchunks++; + seg += n; + nsegs -= n; + } while (nsegs); + req->rl_nextseg = seg; + + /* Update count of segments in this Write chunk */ + *segcount = cpu_to_be32(nchunks); + + /* Finish Write list */ + *iptr++ = xdr_zero; /* Next item not present */ + return iptr; +} + +/* XDR-encode the Reply chunk. Supports encoding an array of plain + * segments that belong to a single write (reply) chunk. + * + * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64): + * * Reply chunk (a counted array): * N elements: * 1 - N - HLOO - HLOO - ... - HLOO * - * Returns positive RPC/RDMA header size, or negative errno. + * Returns a pointer to the XDR word in the RDMA header following + * the end of the Reply chunk, or an error pointer. */ - -static ssize_t -rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target, - struct rpcrdma_msg *headerp, enum rpcrdma_chunktype type) +static __be32 * +rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, + struct rpcrdma_req *req, struct rpc_rqst *rqst, + __be32 *iptr, enum rpcrdma_chunktype wtype) { - struct rpcrdma_req *req = rpcr_to_rdmar(rqst); - struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt); - int n, nsegs, nchunks = 0; - unsigned int pos; - struct rpcrdma_mr_seg *seg = req->rl_segments; - struct rpcrdma_read_chunk *cur_rchunk = NULL; - struct rpcrdma_write_array *warray = NULL; - struct rpcrdma_write_chunk *cur_wchunk = NULL; - __be32 *iptr = headerp->rm_body.rm_chunks; - int (*map)(struct rpcrdma_xprt *, struct rpcrdma_mr_seg *, int, bool); - - if (type == rpcrdma_readch || type == rpcrdma_areadch) { - /* a read chunk - server will RDMA Read our memory */ - cur_rchunk = (struct rpcrdma_read_chunk *) iptr; - } else { - /* a write or reply chunk - server will RDMA Write our memory */ - *iptr++ = xdr_zero; /* encode a NULL read chunk list */ - if (type == rpcrdma_replych) - *iptr++ = xdr_zero; /* a NULL write chunk list */ - warray = (struct rpcrdma_write_array *) iptr; - cur_wchunk = (struct rpcrdma_write_chunk *) (warray + 1); - } + struct rpcrdma_mr_seg *seg = req->rl_nextseg; + int n, nsegs, nchunks; + __be32 *segcount; - if (type == rpcrdma_replych || type == rpcrdma_areadch) - pos = 0; - else - pos = target->head[0].iov_len; + if (wtype != rpcrdma_replych) { + *iptr++ = xdr_zero; /* no Reply chunk present */ + return iptr; + } - nsegs = rpcrdma_convert_iovs(target, pos, type, seg, RPCRDMA_MAX_SEGS); + nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf, 0, wtype, seg, + RPCRDMA_MAX_SEGS - req->rl_nchunks); if (nsegs < 0) - return nsegs; + return ERR_PTR(nsegs); - map = r_xprt->rx_ia.ri_ops->ro_map; + *iptr++ = xdr_one; /* Reply chunk present */ + segcount = iptr++; /* save location of segment count */ + + nchunks = 0; do { - n = map(r_xprt, seg, nsegs, cur_wchunk != NULL); + n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, true); if (n <= 0) - goto out; - if (cur_rchunk) { /* read */ - cur_rchunk->rc_discrim = xdr_one; - /* all read chunks have the same "position" */ - cur_rchunk->rc_position = cpu_to_be32(pos); - cur_rchunk->rc_target.rs_handle = - cpu_to_be32(seg->mr_rkey); - cur_rchunk->rc_target.rs_length = - cpu_to_be32(seg->mr_len); - xdr_encode_hyper( - (__be32 *)&cur_rchunk->rc_target.rs_offset, - seg->mr_base); - dprintk("RPC: %s: read chunk " - "elem %d@0x%llx:0x%x pos %u (%s)\n", __func__, - seg->mr_len, (unsigned long long)seg->mr_base, - seg->mr_rkey, pos, n < nsegs ? "more" : "last"); - cur_rchunk++; - r_xprt->rx_stats.read_chunk_count++; - } else { /* write/reply */ - cur_wchunk->wc_target.rs_handle = - cpu_to_be32(seg->mr_rkey); - cur_wchunk->wc_target.rs_length = - cpu_to_be32(seg->mr_len); - xdr_encode_hyper( - (__be32 *)&cur_wchunk->wc_target.rs_offset, - seg->mr_base); - dprintk("RPC: %s: %s chunk " - "elem %d@0x%llx:0x%x (%s)\n", __func__, - (type == rpcrdma_replych) ? "reply" : "write", - seg->mr_len, (unsigned long long)seg->mr_base, - seg->mr_rkey, n < nsegs ? "more" : "last"); - cur_wchunk++; - if (type == rpcrdma_replych) - r_xprt->rx_stats.reply_chunk_count++; - else - r_xprt->rx_stats.write_chunk_count++; - r_xprt->rx_stats.total_rdma_request += seg->mr_len; - } + return ERR_PTR(n); + + iptr = xdr_encode_rdma_segment(iptr, seg); + + dprintk("RPC: %5u %s: reply segment " + "%d@0x%016llx:0x%08x (%s)\n", + rqst->rq_task->tk_pid, __func__, + seg->mr_len, (unsigned long long)seg->mr_base, + seg->mr_rkey, n < nsegs ? "more" : "last"); + + r_xprt->rx_stats.reply_chunk_count++; + r_xprt->rx_stats.total_rdma_request += seg->mr_len; + req->rl_nchunks++; nchunks++; seg += n; nsegs -= n; } while (nsegs); + req->rl_nextseg = seg; - /* success. all failures return above */ - req->rl_nchunks = nchunks; - - /* - * finish off header. If write, marshal discrim and nchunks. - */ - if (cur_rchunk) { - iptr = (__be32 *) cur_rchunk; - *iptr++ = xdr_zero; /* finish the read chunk list */ - *iptr++ = xdr_zero; /* encode a NULL write chunk list */ - *iptr++ = xdr_zero; /* encode a NULL reply chunk */ - } else { - warray->wc_discrim = xdr_one; - warray->wc_nchunks = cpu_to_be32(nchunks); - iptr = (__be32 *) cur_wchunk; - if (type == rpcrdma_writech) { - *iptr++ = xdr_zero; /* finish the write chunk list */ - *iptr++ = xdr_zero; /* encode a NULL reply chunk */ - } - } - - /* - * Return header size. - */ - return (unsigned char *)iptr - (unsigned char *)headerp; + /* Update count of segments in the Reply chunk */ + *segcount = cpu_to_be32(nchunks); -out: - for (pos = 0; nchunks--;) - pos += r_xprt->rx_ia.ri_ops->ro_unmap(r_xprt, - &req->rl_segments[pos]); - return n; + return iptr; } /* @@ -440,13 +551,10 @@ static void rpcrdma_inline_pullup(struct rpc_rqst *rqst) * Marshal a request: the primary job of this routine is to choose * the transfer modes. See comments below. * - * Uses multiple RDMA IOVs for a request: - * [0] -- RPC RDMA header, which uses memory from the *start* of the - * preregistered buffer that already holds the RPC data in - * its middle. - * [1] -- the RPC header/data, marshaled by RPC and the NFS protocol. - * [2] -- optional padding. - * [3] -- if padded, header only in [1] and data here. + * Prepares up to two IOVs per Call message: + * + * [0] -- RPC RDMA header + * [1] -- the RPC header/data * * Returns zero on success, otherwise a negative errno. */ @@ -457,24 +565,17 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst) struct rpc_xprt *xprt = rqst->rq_xprt; struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); struct rpcrdma_req *req = rpcr_to_rdmar(rqst); - char *base; - size_t rpclen; - ssize_t hdrlen; enum rpcrdma_chunktype rtype, wtype; struct rpcrdma_msg *headerp; + ssize_t hdrlen; + size_t rpclen; + __be32 *iptr; #if defined(CONFIG_SUNRPC_BACKCHANNEL) if (test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state)) return rpcrdma_bc_marshal_reply(rqst); #endif - /* - * rpclen gets amount of data in first buffer, which is the - * pre-registered buffer. - */ - base = rqst->rq_svec[0].iov_base; - rpclen = rqst->rq_svec[0].iov_len; - headerp = rdmab_to_msg(req->rl_rdmabuf); /* don't byte-swap XID, it's already done in request */ headerp->rm_xid = rqst->rq_xid; @@ -485,15 +586,16 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst) /* * Chunks needed for results? * - * o Read ops return data as write chunk(s), header as inline. * o If the expected result is under the inline threshold, all ops * return as inline. + * o Large read ops return data as write chunk(s), header as + * inline. * o Large non-read ops return as a single reply chunk. */ - if (rqst->rq_rcv_buf.flags & XDRBUF_READ) - wtype = rpcrdma_writech; - else if (rpcrdma_results_inline(rqst)) + if (rpcrdma_results_inline(r_xprt, rqst)) wtype = rpcrdma_noch; + else if (rqst->rq_rcv_buf.flags & XDRBUF_READ) + wtype = rpcrdma_writech; else wtype = rpcrdma_replych; @@ -511,10 +613,14 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst) * that both has a data payload, and whose non-data arguments * by themselves are larger than the inline threshold. */ - if (rpcrdma_args_inline(rqst)) { + if (rpcrdma_args_inline(r_xprt, rqst)) { rtype = rpcrdma_noch; + rpcrdma_inline_pullup(rqst); + rpclen = rqst->rq_svec[0].iov_len; } else if (rqst->rq_snd_buf.flags & XDRBUF_WRITE) { rtype = rpcrdma_readch; + rpclen = rqst->rq_svec[0].iov_len; + rpclen += rpcrdma_tail_pullup(&rqst->rq_snd_buf); } else { r_xprt->rx_stats.nomsg_call_count++; headerp->rm_type = htonl(RDMA_NOMSG); @@ -522,57 +628,50 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst) rpclen = 0; } - /* The following simplification is not true forever */ - if (rtype != rpcrdma_noch && wtype == rpcrdma_replych) - wtype = rpcrdma_noch; - if (rtype != rpcrdma_noch && wtype != rpcrdma_noch) { - dprintk("RPC: %s: cannot marshal multiple chunk lists\n", - __func__); - return -EIO; - } - - hdrlen = RPCRDMA_HDRLEN_MIN; - - /* - * Pull up any extra send data into the preregistered buffer. - * When padding is in use and applies to the transfer, insert - * it and change the message type. + /* This implementation supports the following combinations + * of chunk lists in one RPC-over-RDMA Call message: + * + * - Read list + * - Write list + * - Reply chunk + * - Read list + Reply chunk + * + * It might not yet support the following combinations: + * + * - Read list + Write list + * + * It does not support the following combinations: + * + * - Write list + Reply chunk + * - Read list + Write list + Reply chunk + * + * This implementation supports only a single chunk in each + * Read or Write list. Thus for example the client cannot + * send a Call message with a Position Zero Read chunk and a + * regular Read chunk at the same time. */ - if (rtype == rpcrdma_noch) { - - rpcrdma_inline_pullup(rqst); - - headerp->rm_body.rm_nochunks.rm_empty[0] = xdr_zero; - headerp->rm_body.rm_nochunks.rm_empty[1] = xdr_zero; - headerp->rm_body.rm_nochunks.rm_empty[2] = xdr_zero; - /* new length after pullup */ - rpclen = rqst->rq_svec[0].iov_len; - } else if (rtype == rpcrdma_readch) - rpclen += rpcrdma_tail_pullup(&rqst->rq_snd_buf); - if (rtype != rpcrdma_noch) { - hdrlen = rpcrdma_create_chunks(rqst, &rqst->rq_snd_buf, - headerp, rtype); - wtype = rtype; /* simplify dprintk */ - - } else if (wtype != rpcrdma_noch) { - hdrlen = rpcrdma_create_chunks(rqst, &rqst->rq_rcv_buf, - headerp, wtype); - } - if (hdrlen < 0) - return hdrlen; + req->rl_nchunks = 0; + req->rl_nextseg = req->rl_segments; + iptr = headerp->rm_body.rm_chunks; + iptr = rpcrdma_encode_read_list(r_xprt, req, rqst, iptr, rtype); + if (IS_ERR(iptr)) + goto out_unmap; + iptr = rpcrdma_encode_write_list(r_xprt, req, rqst, iptr, wtype); + if (IS_ERR(iptr)) + goto out_unmap; + iptr = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, iptr, wtype); + if (IS_ERR(iptr)) + goto out_unmap; + hdrlen = (unsigned char *)iptr - (unsigned char *)headerp; + + if (hdrlen + rpclen > RPCRDMA_INLINE_WRITE_THRESHOLD(rqst)) + goto out_overflow; + + dprintk("RPC: %5u %s: %s/%s: hdrlen %zd rpclen %zd\n", + rqst->rq_task->tk_pid, __func__, + transfertypes[rtype], transfertypes[wtype], + hdrlen, rpclen); - dprintk("RPC: %s: %s: hdrlen %zd rpclen %zd" - " headerp 0x%p base 0x%p lkey 0x%x\n", - __func__, transfertypes[wtype], hdrlen, rpclen, - headerp, base, rdmab_lkey(req->rl_rdmabuf)); - - /* - * initialize send_iov's - normally only two: rdma chunk header and - * single preregistered RPC header buffer, but if padding is present, - * then use a preregistered (and zeroed) pad buffer between the RPC - * header and any write data. In all non-rdma cases, any following - * data has been copied into the RPC header buffer. - */ req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf); req->rl_send_iov[0].length = hdrlen; req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf); @@ -587,6 +686,18 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst) req->rl_niovs = 2; return 0; + +out_overflow: + pr_err("rpcrdma: send overflow: hdrlen %zd rpclen %zu %s/%s\n", + hdrlen, rpclen, transfertypes[rtype], transfertypes[wtype]); + /* Terminate this RPC. Chunks registered above will be + * released by xprt_release -> xprt_rmda_free . + */ + return -EIO; + +out_unmap: + r_xprt->rx_ia.ri_ops->ro_unmap_safe(r_xprt, req, false); + return PTR_ERR(iptr); } /* diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c index b1b009f10ea3..99d2e5b72726 100644 --- a/net/sunrpc/xprtrdma/transport.c +++ b/net/sunrpc/xprtrdma/transport.c @@ -73,6 +73,8 @@ static unsigned int xprt_rdma_memreg_strategy = RPCRDMA_FRMR; static unsigned int min_slot_table_size = RPCRDMA_MIN_SLOT_TABLE; static unsigned int max_slot_table_size = RPCRDMA_MAX_SLOT_TABLE; +static unsigned int min_inline_size = RPCRDMA_MIN_INLINE; +static unsigned int max_inline_size = RPCRDMA_MAX_INLINE; static unsigned int zero; static unsigned int max_padding = PAGE_SIZE; static unsigned int min_memreg = RPCRDMA_BOUNCEBUFFERS; @@ -96,6 +98,8 @@ static struct ctl_table xr_tunables_table[] = { .maxlen = sizeof(unsigned int), .mode = 0644, .proc_handler = proc_dointvec, + .extra1 = &min_inline_size, + .extra2 = &max_inline_size, }, { .procname = "rdma_max_inline_write", @@ -103,6 +107,8 @@ static struct ctl_table xr_tunables_table[] = { .maxlen = sizeof(unsigned int), .mode = 0644, .proc_handler = proc_dointvec, + .extra1 = &min_inline_size, + .extra2 = &max_inline_size, }, { .procname = "rdma_inline_write_padding", @@ -508,6 +514,7 @@ xprt_rdma_allocate(struct rpc_task *task, size_t size) out: dprintk("RPC: %s: size %zd, request 0x%p\n", __func__, size, req); req->rl_connect_cookie = 0; /* our reserved value */ + req->rl_task = task; return req->rl_sendbuf->rg_base; out_rdmabuf: @@ -564,7 +571,6 @@ xprt_rdma_free(void *buffer) struct rpcrdma_req *req; struct rpcrdma_xprt *r_xprt; struct rpcrdma_regbuf *rb; - int i; if (buffer == NULL) return; @@ -578,11 +584,8 @@ xprt_rdma_free(void *buffer) dprintk("RPC: %s: called on 0x%p\n", __func__, req->rl_reply); - for (i = 0; req->rl_nchunks;) { - --req->rl_nchunks; - i += r_xprt->rx_ia.ri_ops->ro_unmap(r_xprt, - &req->rl_segments[i]); - } + r_xprt->rx_ia.ri_ops->ro_unmap_safe(r_xprt, req, + !RPC_IS_ASYNC(req->rl_task)); rpcrdma_buffer_put(req); } @@ -707,6 +710,7 @@ static struct rpc_xprt_ops xprt_rdma_procs = { #if defined(CONFIG_SUNRPC_BACKCHANNEL) .bc_setup = xprt_rdma_bc_setup, .bc_up = xprt_rdma_bc_up, + .bc_maxpayload = xprt_rdma_bc_maxpayload, .bc_free_rqst = xprt_rdma_bc_free_rqst, .bc_destroy = xprt_rdma_bc_destroy, #endif diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index f5ed9f982cd7..b044d98a1370 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -203,15 +203,6 @@ out_fail: goto out_schedule; } -static void -rpcrdma_flush_cqs(struct rpcrdma_ep *ep) -{ - struct ib_wc wc; - - while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0) - rpcrdma_receive_wc(NULL, &wc); -} - static int rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event) { @@ -374,23 +365,6 @@ out: } /* - * Drain any cq, prior to teardown. - */ -static void -rpcrdma_clean_cq(struct ib_cq *cq) -{ - struct ib_wc wc; - int count = 0; - - while (1 == ib_poll_cq(cq, 1, &wc)) - ++count; - - if (count) - dprintk("RPC: %s: flushed %d events (last 0x%x)\n", - __func__, count, wc.opcode); -} - -/* * Exported functions. */ @@ -459,7 +433,6 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg) dprintk("RPC: %s: memory registration strategy is '%s'\n", __func__, ia->ri_ops->ro_displayname); - rwlock_init(&ia->ri_qplock); return 0; out3: @@ -515,7 +488,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, __func__); return -ENOMEM; } - max_qp_wr = ia->ri_device->attrs.max_qp_wr - RPCRDMA_BACKWARD_WRS; + max_qp_wr = ia->ri_device->attrs.max_qp_wr - RPCRDMA_BACKWARD_WRS - 1; /* check provider's send/recv wr limits */ if (cdata->max_requests > max_qp_wr) @@ -526,11 +499,13 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, ep->rep_attr.srq = NULL; ep->rep_attr.cap.max_send_wr = cdata->max_requests; ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS; + ep->rep_attr.cap.max_send_wr += 1; /* drain cqe */ rc = ia->ri_ops->ro_open(ia, ep, cdata); if (rc) return rc; ep->rep_attr.cap.max_recv_wr = cdata->max_requests; ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS; + ep->rep_attr.cap.max_recv_wr += 1; /* drain cqe */ ep->rep_attr.cap.max_send_sge = RPCRDMA_MAX_IOVS; ep->rep_attr.cap.max_recv_sge = 1; ep->rep_attr.cap.max_inline_data = 0; @@ -578,6 +553,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, ep->rep_attr.recv_cq = recvcq; /* Initialize cma parameters */ + memset(&ep->rep_remote_cma, 0, sizeof(ep->rep_remote_cma)); /* RPC/RDMA does not use private data */ ep->rep_remote_cma.private_data = NULL; @@ -591,7 +567,16 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, ep->rep_remote_cma.responder_resources = ia->ri_device->attrs.max_qp_rd_atom; - ep->rep_remote_cma.retry_count = 7; + /* Limit transport retries so client can detect server + * GID changes quickly. RPC layer handles re-establishing + * transport connection and retransmission. + */ + ep->rep_remote_cma.retry_count = 6; + + /* RPC-over-RDMA handles its own flow control. In addition, + * make all RNR NAKs visible so we know that RPC-over-RDMA + * flow control is working correctly (no NAKs should be seen). + */ ep->rep_remote_cma.flow_control = 0; ep->rep_remote_cma.rnr_retry_count = 0; @@ -622,13 +607,8 @@ rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) cancel_delayed_work_sync(&ep->rep_connect_worker); - if (ia->ri_id->qp) - rpcrdma_ep_disconnect(ep, ia); - - rpcrdma_clean_cq(ep->rep_attr.recv_cq); - rpcrdma_clean_cq(ep->rep_attr.send_cq); - if (ia->ri_id->qp) { + rpcrdma_ep_disconnect(ep, ia); rdma_destroy_qp(ia->ri_id); ia->ri_id->qp = NULL; } @@ -659,7 +639,6 @@ retry: dprintk("RPC: %s: reconnecting...\n", __func__); rpcrdma_ep_disconnect(ep, ia); - rpcrdma_flush_cqs(ep); xprt = container_of(ia, struct rpcrdma_xprt, rx_ia); id = rpcrdma_create_id(xprt, ia, @@ -692,10 +671,8 @@ retry: goto out; } - write_lock(&ia->ri_qplock); old = ia->ri_id; ia->ri_id = id; - write_unlock(&ia->ri_qplock); rdma_destroy_qp(old); rpcrdma_destroy_id(old); @@ -785,7 +762,6 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) { int rc; - rpcrdma_flush_cqs(ep); rc = rdma_disconnect(ia->ri_id); if (!rc) { /* returns without wait if not connected */ @@ -797,6 +773,8 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) dprintk("RPC: %s: rdma_disconnect %i\n", __func__, rc); ep->rep_connected = rc; } + + ib_drain_qp(ia->ri_id->qp); } struct rpcrdma_req * @@ -1271,25 +1249,3 @@ out_rc: rpcrdma_recv_buffer_put(rep); return rc; } - -/* How many chunk list items fit within our inline buffers? - */ -unsigned int -rpcrdma_max_segments(struct rpcrdma_xprt *r_xprt) -{ - struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data; - int bytes, segments; - - bytes = min_t(unsigned int, cdata->inline_wsize, cdata->inline_rsize); - bytes -= RPCRDMA_HDRLEN_MIN; - if (bytes < sizeof(struct rpcrdma_segment) * 2) { - pr_warn("RPC: %s: inline threshold too small\n", - __func__); - return 0; - } - - segments = 1 << (fls(bytes / sizeof(struct rpcrdma_segment)) - 1); - dprintk("RPC: %s: max chunk list size = %d segments\n", - __func__, segments); - return segments; -} diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index 2ebc743cb96f..95cdc66225ee 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -65,7 +65,6 @@ */ struct rpcrdma_ia { const struct rpcrdma_memreg_ops *ri_ops; - rwlock_t ri_qplock; struct ib_device *ri_device; struct rdma_cm_id *ri_id; struct ib_pd *ri_pd; @@ -73,6 +72,8 @@ struct rpcrdma_ia { struct completion ri_done; int ri_async_rc; unsigned int ri_max_frmr_depth; + unsigned int ri_max_inline_write; + unsigned int ri_max_inline_read; struct ib_qp_attr ri_qp_attr; struct ib_qp_init_attr ri_qp_init_attr; }; @@ -144,6 +145,26 @@ rdmab_to_msg(struct rpcrdma_regbuf *rb) #define RPCRDMA_DEF_GFP (GFP_NOIO | __GFP_NOWARN) +/* To ensure a transport can always make forward progress, + * the number of RDMA segments allowed in header chunk lists + * is capped at 8. This prevents less-capable devices and + * memory registrations from overrunning the Send buffer + * while building chunk lists. + * + * Elements of the Read list take up more room than the + * Write list or Reply chunk. 8 read segments means the Read + * list (or Write list or Reply chunk) cannot consume more + * than + * + * ((8 + 2) * read segment size) + 1 XDR words, or 244 bytes. + * + * And the fixed part of the header is another 24 bytes. + * + * The smallest inline threshold is 1024 bytes, ensuring that + * at least 750 bytes are available for RPC messages. + */ +#define RPCRDMA_MAX_HDR_SEGS (8) + /* * struct rpcrdma_rep -- this structure encapsulates state required to recv * and complete a reply, asychronously. It needs several pieces of @@ -162,7 +183,9 @@ rdmab_to_msg(struct rpcrdma_regbuf *rb) */ #define RPCRDMA_MAX_DATA_SEGS ((1 * 1024 * 1024) / PAGE_SIZE) -#define RPCRDMA_MAX_SEGS (RPCRDMA_MAX_DATA_SEGS + 2) /* head+tail = 2 */ + +/* data segments + head/tail for Call + head/tail for Reply */ +#define RPCRDMA_MAX_SEGS (RPCRDMA_MAX_DATA_SEGS + 4) struct rpcrdma_buffer; @@ -198,14 +221,13 @@ enum rpcrdma_frmr_state { }; struct rpcrdma_frmr { - struct scatterlist *sg; - int sg_nents; + struct scatterlist *fr_sg; + int fr_nents; + enum dma_data_direction fr_dir; struct ib_mr *fr_mr; struct ib_cqe fr_cqe; enum rpcrdma_frmr_state fr_state; struct completion fr_linv_done; - struct work_struct fr_work; - struct rpcrdma_xprt *fr_xprt; union { struct ib_reg_wr fr_regwr; struct ib_send_wr fr_invwr; @@ -222,6 +244,8 @@ struct rpcrdma_mw { struct rpcrdma_fmr fmr; struct rpcrdma_frmr frmr; }; + struct work_struct mw_work; + struct rpcrdma_xprt *mw_xprt; struct list_head mw_list; struct list_head mw_all; }; @@ -270,12 +294,14 @@ struct rpcrdma_req { unsigned int rl_niovs; unsigned int rl_nchunks; unsigned int rl_connect_cookie; + struct rpc_task *rl_task; struct rpcrdma_buffer *rl_buffer; struct rpcrdma_rep *rl_reply;/* holder for reply buffer */ struct ib_sge rl_send_iov[RPCRDMA_MAX_IOVS]; struct rpcrdma_regbuf *rl_rdmabuf; struct rpcrdma_regbuf *rl_sendbuf; struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS]; + struct rpcrdma_mr_seg *rl_nextseg; struct ib_cqe rl_cqe; struct list_head rl_all; @@ -372,8 +398,8 @@ struct rpcrdma_memreg_ops { struct rpcrdma_mr_seg *, int, bool); void (*ro_unmap_sync)(struct rpcrdma_xprt *, struct rpcrdma_req *); - int (*ro_unmap)(struct rpcrdma_xprt *, - struct rpcrdma_mr_seg *); + void (*ro_unmap_safe)(struct rpcrdma_xprt *, + struct rpcrdma_req *, bool); int (*ro_open)(struct rpcrdma_ia *, struct rpcrdma_ep *, struct rpcrdma_create_data_internal *); @@ -456,7 +482,6 @@ struct rpcrdma_regbuf *rpcrdma_alloc_regbuf(struct rpcrdma_ia *, void rpcrdma_free_regbuf(struct rpcrdma_ia *, struct rpcrdma_regbuf *); -unsigned int rpcrdma_max_segments(struct rpcrdma_xprt *); int rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *, unsigned int); int frwr_alloc_recovery_wq(void); @@ -519,6 +544,9 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *); * RPC/RDMA protocol calls - xprtrdma/rpc_rdma.c */ int rpcrdma_marshal_req(struct rpc_rqst *); +void rpcrdma_set_max_header_sizes(struct rpcrdma_ia *, + struct rpcrdma_create_data_internal *, + unsigned int); /* RPC/RDMA module init - xprtrdma/transport.c */ @@ -534,6 +562,7 @@ void xprt_rdma_cleanup(void); #if defined(CONFIG_SUNRPC_BACKCHANNEL) int xprt_rdma_bc_setup(struct rpc_xprt *, unsigned int); int xprt_rdma_bc_up(struct svc_serv *, struct net *); +size_t xprt_rdma_bc_maxpayload(struct rpc_xprt *); int rpcrdma_bc_post_recv(struct rpcrdma_xprt *, unsigned int); void rpcrdma_bc_receive_call(struct rpcrdma_xprt *, struct rpcrdma_rep *); int rpcrdma_bc_marshal_reply(struct rpc_rqst *); diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index b90c5397b5e1..2d3e0c42361e 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -1364,6 +1364,11 @@ static int xs_tcp_bc_up(struct svc_serv *serv, struct net *net) return ret; return 0; } + +static size_t xs_tcp_bc_maxpayload(struct rpc_xprt *xprt) +{ + return PAGE_SIZE; +} #else static inline int _xs_tcp_read_data(struct rpc_xprt *xprt, struct xdr_skb_reader *desc) @@ -2661,6 +2666,7 @@ static struct rpc_xprt_ops xs_tcp_ops = { #ifdef CONFIG_SUNRPC_BACKCHANNEL .bc_setup = xprt_setup_bc, .bc_up = xs_tcp_bc_up, + .bc_maxpayload = xs_tcp_bc_maxpayload, .bc_free_rqst = xprt_free_bc_rqst, .bc_destroy = xprt_destroy_bc, #endif |