Diffstat (limited to 'net/sunrpc/xprtrdma/rpc_rdma.c')
-rw-r--r--  net/sunrpc/xprtrdma/rpc_rdma.c | 488
1 file changed, 316 insertions(+), 172 deletions(-)
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 4345e6912392..28020ec104d4 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -78,8 +78,6 @@ static unsigned int rpcrdma_max_call_header_size(unsigned int maxsegs)
     size += rpcrdma_segment_maxsz * sizeof(__be32);
     size += sizeof(__be32);	/* list discriminator */
 
-    dprintk("RPC: %s: max call header size = %u\n",
-            __func__, size);
     return size;
 }
 
@@ -100,8 +98,6 @@ static unsigned int rpcrdma_max_reply_header_size(unsigned int maxsegs)
     size += maxsegs * rpcrdma_segment_maxsz * sizeof(__be32);
     size += sizeof(__be32);	/* list discriminator */
 
-    dprintk("RPC: %s: max reply header size = %u\n",
-            __func__, size);
     return size;
 }
 
@@ -115,7 +111,7 @@ static unsigned int rpcrdma_max_reply_header_size(unsigned int maxsegs)
  */
 void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *r_xprt)
 {
-    unsigned int maxsegs = r_xprt->rx_ia.ri_max_segs;
+    unsigned int maxsegs = r_xprt->rx_ia.ri_max_rdma_segs;
     struct rpcrdma_ep *ep = &r_xprt->rx_ep;
 
     ep->rep_max_inline_send =
@@ -149,7 +145,7 @@ static bool rpcrdma_args_inline(struct rpcrdma_xprt *r_xprt,
             remaining -= min_t(unsigned int,
                                PAGE_SIZE - offset, remaining);
             offset = 0;
-            if (++count > r_xprt->rx_ia.ri_max_send_sges)
+            if (++count > r_xprt->rx_ep.rep_attr.cap.max_send_sge)
                 return false;
         }
     }
@@ -342,6 +338,31 @@ encode_read_segment(struct xdr_stream *xdr, struct rpcrdma_mr *mr,
     return 0;
 }
 
+static struct rpcrdma_mr_seg *rpcrdma_mr_prepare(struct rpcrdma_xprt *r_xprt,
+                                                 struct rpcrdma_req *req,
+                                                 struct rpcrdma_mr_seg *seg,
+                                                 int nsegs, bool writing,
+                                                 struct rpcrdma_mr **mr)
+{
+    *mr = rpcrdma_mr_pop(&req->rl_free_mrs);
+    if (!*mr) {
+        *mr = rpcrdma_mr_get(r_xprt);
+        if (!*mr)
+            goto out_getmr_err;
+        trace_xprtrdma_mr_get(req);
+        (*mr)->mr_req = req;
+    }
+
+    rpcrdma_mr_push(*mr, &req->rl_registered);
+    return frwr_map(r_xprt, seg, nsegs, writing, req->rl_slot.rq_xid, *mr);
+
+out_getmr_err:
+    trace_xprtrdma_nomrs(req);
+    xprt_wait_for_buffer_space(&r_xprt->rx_xprt);
+    rpcrdma_mrs_refresh(r_xprt);
+    return ERR_PTR(-EAGAIN);
+}
+
 /* Register and XDR encode the Read list. Supports encoding a list of read
  * segments that belong to a single read chunk.
  *
@@ -356,9 +377,10 @@ encode_read_segment(struct xdr_stream *xdr, struct rpcrdma_mr *mr,
  *
  * Only a single @pos value is currently supported.
  */
-static noinline int
-rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
-                         struct rpc_rqst *rqst, enum rpcrdma_chunktype rtype)
+static int rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
+                                    struct rpcrdma_req *req,
+                                    struct rpc_rqst *rqst,
+                                    enum rpcrdma_chunktype rtype)
 {
     struct xdr_stream *xdr = &req->rl_stream;
     struct rpcrdma_mr_seg *seg;
@@ -366,7 +388,7 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
     unsigned int pos;
     int nsegs;
 
-    if (rtype == rpcrdma_noch)
+    if (rtype == rpcrdma_noch_pullup || rtype == rpcrdma_noch_mapped)
         goto done;
 
     pos = rqst->rq_snd_buf.head[0].iov_len;
@@ -379,10 +401,9 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
         return nsegs;
 
     do {
-        seg = frwr_map(r_xprt, seg, nsegs, false, rqst->rq_xid, &mr);
+        seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, false, &mr);
         if (IS_ERR(seg))
             return PTR_ERR(seg);
-        rpcrdma_mr_push(mr, &req->rl_registered);
 
         if (encode_read_segment(xdr, mr, pos) < 0)
             return -EMSGSIZE;
@@ -411,9 +432,10 @@ done:
  *
 * Only a single Write chunk is currently supported.
 */
-static noinline int
-rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
-                          struct rpc_rqst *rqst, enum rpcrdma_chunktype wtype)
+static int rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt,
+                                     struct rpcrdma_req *req,
+                                     struct rpc_rqst *rqst,
+                                     enum rpcrdma_chunktype wtype)
 {
     struct xdr_stream *xdr = &req->rl_stream;
     struct rpcrdma_mr_seg *seg;
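The rpcrdma_mr_prepare() helper introduced above replaces the direct frwr_map() calls in the three chunk encoders. Its fallback order -- take an MR from the request's own free list, then from the transport-wide pool, otherwise give up with -EAGAIN and ask for a refresh -- can be modelled in plain C. The sketch below is illustrative only; the structure and function names are stand-ins, not the kernel's:

    #include <stdbool.h>
    #include <stddef.h>
    #include <stdio.h>

    /* Illustrative stand-ins for the kernel's MR free lists. */
    struct mr { struct mr *next; };

    struct mr_pools {
        struct mr *req_free;    /* models req->rl_free_mrs */
        struct mr *xprt_pool;   /* models the transport-wide pool */
        bool refresh_wanted;    /* models a call to the refresh worker */
    };

    static struct mr *pop(struct mr **list)
    {
        struct mr *mr = *list;

        if (mr)
            *list = mr->next;
        return mr;
    }

    /* Same fallback order as rpcrdma_mr_prepare(): request list first,
     * then the shared pool, otherwise signal "try again later". */
    static struct mr *mr_prepare(struct mr_pools *p)
    {
        struct mr *mr = pop(&p->req_free);

        if (!mr) {
            mr = pop(&p->xprt_pool);
            if (!mr) {
                p->refresh_wanted = true;  /* wake the refresh path */
                return NULL;               /* caller sees -EAGAIN */
            }
        }
        return mr;
    }

    int main(void)
    {
        struct mr a = { NULL }, b = { NULL };
        struct mr_pools p = { .req_free = &a, .xprt_pool = &b };

        printf("first:  %s\n", mr_prepare(&p) == &a ? "from request list" : "?");
        printf("second: %s\n", mr_prepare(&p) == &b ? "from xprt pool" : "?");
        printf("third:  %s\n", mr_prepare(&p) ? "?" : "none, refresh requested");
        return 0;
    }

Returning NULL here corresponds to the ERR_PTR(-EAGAIN) path above, which parks the RPC until the MR pool has been replenished.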
@@ -440,10 +462,9 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
 
     nchunks = 0;
     do {
-        seg = frwr_map(r_xprt, seg, nsegs, true, rqst->rq_xid, &mr);
+        seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, true, &mr);
         if (IS_ERR(seg))
             return PTR_ERR(seg);
-        rpcrdma_mr_push(mr, &req->rl_registered);
 
         if (encode_rdma_segment(xdr, mr) < 0)
             return -EMSGSIZE;
@@ -474,9 +495,10 @@ done:
  * Returns zero on success, or a negative errno if a failure occurred.
  * @xdr is advanced to the next position in the stream.
  */
-static noinline int
-rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
-                           struct rpc_rqst *rqst, enum rpcrdma_chunktype wtype)
+static int rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
+                                      struct rpcrdma_req *req,
+                                      struct rpc_rqst *rqst,
+                                      enum rpcrdma_chunktype wtype)
 {
     struct xdr_stream *xdr = &req->rl_stream;
     struct rpcrdma_mr_seg *seg;
@@ -501,10 +523,9 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
 
     nchunks = 0;
     do {
-        seg = frwr_map(r_xprt, seg, nsegs, true, rqst->rq_xid, &mr);
+        seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, true, &mr);
         if (IS_ERR(seg))
             return PTR_ERR(seg);
-        rpcrdma_mr_push(mr, &req->rl_registered);
 
         if (encode_rdma_segment(xdr, mr) < 0)
             return -EMSGSIZE;
@@ -539,6 +560,7 @@ static void rpcrdma_sendctx_done(struct kref *kref)
  */
 void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc)
 {
+    struct rpcrdma_regbuf *rb = sc->sc_req->rl_sendbuf;
     struct ib_sge *sge;
 
     if (!sc->sc_unmap_count)
@@ -550,7 +572,7 @@ void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc)
     */
     for (sge = &sc->sc_sges[2]; sc->sc_unmap_count;
          ++sge, --sc->sc_unmap_count)
-        ib_dma_unmap_page(sc->sc_device, sge->addr, sge->length,
+        ib_dma_unmap_page(rdmab_device(rb), sge->addr, sge->length,
                           DMA_TO_DEVICE);
 
     kref_put(&sc->sc_req->rl_kref, rpcrdma_sendctx_done);
@@ -558,152 +580,228 @@ void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc)
 
 /* Prepare an SGE for the RPC-over-RDMA transport header.
  */
-static bool rpcrdma_prepare_hdr_sge(struct rpcrdma_xprt *r_xprt,
+static void rpcrdma_prepare_hdr_sge(struct rpcrdma_xprt *r_xprt,
                                     struct rpcrdma_req *req, u32 len)
 {
     struct rpcrdma_sendctx *sc = req->rl_sendctx;
     struct rpcrdma_regbuf *rb = req->rl_rdmabuf;
-    struct ib_sge *sge = sc->sc_sges;
+    struct ib_sge *sge = &sc->sc_sges[req->rl_wr.num_sge++];
+
+    sge->addr = rdmab_addr(rb);
+    sge->length = len;
+    sge->lkey = rdmab_lkey(rb);
+
+    ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr, sge->length,
+                                  DMA_TO_DEVICE);
+}
+
+/* The head iovec is straightforward, as it is usually already
+ * DMA-mapped. Sync the content that has changed.
+ */
+static bool rpcrdma_prepare_head_iov(struct rpcrdma_xprt *r_xprt,
+                                     struct rpcrdma_req *req, unsigned int len)
+{
+    struct rpcrdma_sendctx *sc = req->rl_sendctx;
+    struct ib_sge *sge = &sc->sc_sges[req->rl_wr.num_sge++];
+    struct rpcrdma_regbuf *rb = req->rl_sendbuf;
 
     if (!rpcrdma_regbuf_dma_map(r_xprt, rb))
-        goto out_regbuf;
+        return false;
+
     sge->addr = rdmab_addr(rb);
     sge->length = len;
     sge->lkey = rdmab_lkey(rb);
 
     ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr, sge->length,
                                   DMA_TO_DEVICE);
-    sc->sc_wr.num_sge++;
     return true;
+}
 
-out_regbuf:
-    pr_err("rpcrdma: failed to DMA map a Send buffer\n");
+/* If there is a page list present, DMA map and prepare an
+ * SGE for each page to be sent.
+ */
+static bool rpcrdma_prepare_pagelist(struct rpcrdma_req *req,
+                                     struct xdr_buf *xdr)
+{
+    struct rpcrdma_sendctx *sc = req->rl_sendctx;
+    struct rpcrdma_regbuf *rb = req->rl_sendbuf;
+    unsigned int page_base, len, remaining;
+    struct page **ppages;
+    struct ib_sge *sge;
+
+    ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
+    page_base = offset_in_page(xdr->page_base);
+    remaining = xdr->page_len;
+    while (remaining) {
+        sge = &sc->sc_sges[req->rl_wr.num_sge++];
+        len = min_t(unsigned int, PAGE_SIZE - page_base, remaining);
+        sge->addr = ib_dma_map_page(rdmab_device(rb), *ppages,
+                                    page_base, len, DMA_TO_DEVICE);
+        if (ib_dma_mapping_error(rdmab_device(rb), sge->addr))
+            goto out_mapping_err;
+
+        sge->length = len;
+        sge->lkey = rdmab_lkey(rb);
+
+        sc->sc_unmap_count++;
+        ppages++;
+        remaining -= len;
+        page_base = 0;
+    }
+
+    return true;
+
+out_mapping_err:
+    trace_xprtrdma_dma_maperr(sge->addr);
     return false;
 }
 
-/* Prepare the Send SGEs. The head and tail iovec, and each entry
- * in the page list, gets its own SGE.
+/* The tail iovec may include an XDR pad for the page list,
+ * as well as additional content, and may not reside in the
+ * same page as the head iovec.
  */
-static bool rpcrdma_prepare_msg_sges(struct rpcrdma_xprt *r_xprt,
-                                     struct rpcrdma_req *req,
+static bool rpcrdma_prepare_tail_iov(struct rpcrdma_req *req,
                                      struct xdr_buf *xdr,
-                                     enum rpcrdma_chunktype rtype)
+                                     unsigned int page_base, unsigned int len)
 {
     struct rpcrdma_sendctx *sc = req->rl_sendctx;
-    unsigned int sge_no, page_base, len, remaining;
+    struct ib_sge *sge = &sc->sc_sges[req->rl_wr.num_sge++];
     struct rpcrdma_regbuf *rb = req->rl_sendbuf;
-    struct ib_sge *sge = sc->sc_sges;
-    struct page *page, **ppages;
+    struct page *page = virt_to_page(xdr->tail[0].iov_base);
 
-    /* The head iovec is straightforward, as it is already
-     * DMA-mapped. Sync the content that has changed.
-     */
-    if (!rpcrdma_regbuf_dma_map(r_xprt, rb))
-        goto out_regbuf;
-    sc->sc_device = rdmab_device(rb);
-    sge_no = 1;
-    sge[sge_no].addr = rdmab_addr(rb);
-    sge[sge_no].length = xdr->head[0].iov_len;
-    sge[sge_no].lkey = rdmab_lkey(rb);
-    ib_dma_sync_single_for_device(rdmab_device(rb), sge[sge_no].addr,
-                                  sge[sge_no].length, DMA_TO_DEVICE);
+    sge->addr = ib_dma_map_page(rdmab_device(rb), page, page_base, len,
+                                DMA_TO_DEVICE);
+    if (ib_dma_mapping_error(rdmab_device(rb), sge->addr))
+        goto out_mapping_err;
 
-    /* If there is a Read chunk, the page list is being handled
-     * via explicit RDMA, and thus is skipped here. However, the
-     * tail iovec may include an XDR pad for the page list, as
-     * well as additional content, and may not reside in the
-     * same page as the head iovec.
-     */
-    if (rtype == rpcrdma_readch) {
-        len = xdr->tail[0].iov_len;
+    sge->length = len;
+    sge->lkey = rdmab_lkey(rb);
+    ++sc->sc_unmap_count;
+    return true;
 
-        /* Do not include the tail if it is only an XDR pad */
-        if (len < 4)
-            goto out;
+out_mapping_err:
+    trace_xprtrdma_dma_maperr(sge->addr);
+    return false;
+}
 
-        page = virt_to_page(xdr->tail[0].iov_base);
-        page_base = offset_in_page(xdr->tail[0].iov_base);
+/* Copy the tail to the end of the head buffer.
+ */
+static void rpcrdma_pullup_tail_iov(struct rpcrdma_xprt *r_xprt,
+                                    struct rpcrdma_req *req,
+                                    struct xdr_buf *xdr)
+{
+    unsigned char *dst;
 
-        /* If the content in the page list is an odd length,
-         * xdr_write_pages() has added a pad at the beginning
-         * of the tail iovec. Force the tail's non-pad content
-         * to land at the next XDR position in the Send message.
-         */
-        page_base += len & 3;
-        len -= len & 3;
-        goto map_tail;
-    }
+    dst = (unsigned char *)xdr->head[0].iov_base;
+    dst += xdr->head[0].iov_len + xdr->page_len;
+    memmove(dst, xdr->tail[0].iov_base, xdr->tail[0].iov_len);
+    r_xprt->rx_stats.pullup_copy_count += xdr->tail[0].iov_len;
+}
 
-    /* If there is a page list present, temporarily DMA map
-     * and prepare an SGE for each page to be sent.
-     */
-    if (xdr->page_len) {
-        ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
-        page_base = offset_in_page(xdr->page_base);
-        remaining = xdr->page_len;
-        while (remaining) {
-            sge_no++;
-            if (sge_no > RPCRDMA_MAX_SEND_SGES - 2)
-                goto out_mapping_overflow;
-
-            len = min_t(u32, PAGE_SIZE - page_base, remaining);
-            sge[sge_no].addr =
-                ib_dma_map_page(rdmab_device(rb), *ppages,
-                                page_base, len, DMA_TO_DEVICE);
-            if (ib_dma_mapping_error(rdmab_device(rb),
-                                     sge[sge_no].addr))
-                goto out_mapping_err;
-            sge[sge_no].length = len;
-            sge[sge_no].lkey = rdmab_lkey(rb);
-
-            sc->sc_unmap_count++;
-            ppages++;
-            remaining -= len;
-            page_base = 0;
-        }
+/* Copy pagelist content into the head buffer.
+ */
+static void rpcrdma_pullup_pagelist(struct rpcrdma_xprt *r_xprt,
+                                    struct rpcrdma_req *req,
+                                    struct xdr_buf *xdr)
+{
+    unsigned int len, page_base, remaining;
+    struct page **ppages;
+    unsigned char *src, *dst;
+
+    dst = (unsigned char *)xdr->head[0].iov_base;
+    dst += xdr->head[0].iov_len;
+    ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
+    page_base = offset_in_page(xdr->page_base);
+    remaining = xdr->page_len;
+    while (remaining) {
+        src = page_address(*ppages);
+        src += page_base;
+        len = min_t(unsigned int, PAGE_SIZE - page_base, remaining);
+        memcpy(dst, src, len);
+        r_xprt->rx_stats.pullup_copy_count += len;
+
+        ppages++;
+        dst += len;
+        remaining -= len;
+        page_base = 0;
     }
+}
 
-    /* The tail iovec is not always constructed in the same
-     * page where the head iovec resides (see, for example,
-     * gss_wrap_req_priv). To neatly accommodate that case,
-     * DMA map it separately.
-     */
-    if (xdr->tail[0].iov_len) {
-        page = virt_to_page(xdr->tail[0].iov_base);
-        page_base = offset_in_page(xdr->tail[0].iov_base);
-        len = xdr->tail[0].iov_len;
+/* Copy the contents of @xdr into @rl_sendbuf and DMA sync it.
+ * When the head, pagelist, and tail are small, a pull-up copy
+ * is considerably less costly than DMA mapping the components
+ * of @xdr.
+ *
+ * Assumptions:
+ *  - the caller has already verified that the total length
+ *    of the RPC Call body will fit into @rl_sendbuf.
+ */
+static bool rpcrdma_prepare_noch_pullup(struct rpcrdma_xprt *r_xprt,
+                                        struct rpcrdma_req *req,
+                                        struct xdr_buf *xdr)
+{
+    if (unlikely(xdr->tail[0].iov_len))
+        rpcrdma_pullup_tail_iov(r_xprt, req, xdr);
 
-map_tail:
-        sge_no++;
-        sge[sge_no].addr =
-            ib_dma_map_page(rdmab_device(rb), page, page_base, len,
-                            DMA_TO_DEVICE);
-        if (ib_dma_mapping_error(rdmab_device(rb), sge[sge_no].addr))
-            goto out_mapping_err;
-        sge[sge_no].length = len;
-        sge[sge_no].lkey = rdmab_lkey(rb);
-        sc->sc_unmap_count++;
-    }
 
+    if (unlikely(xdr->page_len))
+        rpcrdma_pullup_pagelist(r_xprt, req, xdr);
 
-out:
-    sc->sc_wr.num_sge += sge_no;
-    if (sc->sc_unmap_count)
+    /* The whole RPC message resides in the head iovec now */
+    return rpcrdma_prepare_head_iov(r_xprt, req, xdr->len);
+}
+
+static bool rpcrdma_prepare_noch_mapped(struct rpcrdma_xprt *r_xprt,
+                                        struct rpcrdma_req *req,
+                                        struct xdr_buf *xdr)
+{
+    struct kvec *tail = &xdr->tail[0];
+
+    if (!rpcrdma_prepare_head_iov(r_xprt, req, xdr->head[0].iov_len))
+        return false;
+    if (xdr->page_len)
+        if (!rpcrdma_prepare_pagelist(req, xdr))
+            return false;
+    if (tail->iov_len)
+        if (!rpcrdma_prepare_tail_iov(req, xdr,
+                                      offset_in_page(tail->iov_base),
+                                      tail->iov_len))
+            return false;
+
+    if (req->rl_sendctx->sc_unmap_count)
         kref_get(&req->rl_kref);
     return true;
+}
 
-out_regbuf:
-    pr_err("rpcrdma: failed to DMA map a Send buffer\n");
-    return false;
+static bool rpcrdma_prepare_readch(struct rpcrdma_xprt *r_xprt,
+                                   struct rpcrdma_req *req,
+                                   struct xdr_buf *xdr)
+{
+    if (!rpcrdma_prepare_head_iov(r_xprt, req, xdr->head[0].iov_len))
+        return false;
 
-out_mapping_overflow:
-    rpcrdma_sendctx_unmap(sc);
-    pr_err("rpcrdma: too many Send SGEs (%u)\n", sge_no);
-    return false;
+    /* If there is a Read chunk, the page list is being handled
+     * via explicit RDMA, and thus is skipped here.
+     */
 
-out_mapping_err:
-    rpcrdma_sendctx_unmap(sc);
-    trace_xprtrdma_dma_maperr(sge[sge_no].addr);
-    return false;
+    /* Do not include the tail if it is only an XDR pad */
+    if (xdr->tail[0].iov_len > 3) {
+        unsigned int page_base, len;
+
+        /* If the content in the page list is an odd length,
+         * xdr_write_pages() adds a pad at the beginning of
+         * the tail iovec. Force the tail's non-pad content to
+         * land at the next XDR position in the Send message.
+         */
+        page_base = offset_in_page(xdr->tail[0].iov_base);
+        len = xdr->tail[0].iov_len;
+        page_base += len & 3;
+        len -= len & 3;
+        if (!rpcrdma_prepare_tail_iov(req, xdr, page_base, len))
+            return false;
+        kref_get(&req->rl_kref);
+    }
+
+    return true;
 }
 
 /**
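rpcrdma_prepare_noch_pullup() and its helpers above flatten a small Call message into the head buffer so it can be posted from a single pre-mapped SGE instead of DMA-mapping each component. A minimal user-space model of that copy, assuming a simplified xdr_buf with one page's worth of data (the struct and names below are illustrative, not the kernel's):

    #include <stdio.h>
    #include <string.h>

    /* Simplified stand-in for struct xdr_buf: head, one "page", and tail. */
    struct model_xdr_buf {
        char head[64];  size_t head_len;
        char page[64];  size_t page_len;
        char tail[16];  size_t tail_len;
    };

    /* Tail first, then the pagelist, mirroring the order used by
     * rpcrdma_prepare_noch_pullup(); afterwards the whole message is
     * contiguous in the head buffer. */
    static size_t pullup(struct model_xdr_buf *x)
    {
        memmove(x->head + x->head_len + x->page_len, x->tail, x->tail_len);
        memcpy(x->head + x->head_len, x->page, x->page_len);
        return x->head_len + x->page_len + x->tail_len;
    }

    int main(void)
    {
        struct model_xdr_buf x = {
            .head = "CALL|", .head_len = 5,
            .page = "payload", .page_len = 7,
            .tail = "|end", .tail_len = 4,
        };
        size_t len = pullup(&x);

        printf("%.*s (%zu bytes in one SGE)\n", (int)len, x.head, len);
        return 0;
    }

In the kernel the same idea is gated by the caller, which only chooses the pull-up path when the whole Call body is known to fit in rl_sendbuf.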
@@ -716,31 +814,52 @@ out_mapping_err:
  *
  * Returns 0 on success; otherwise a negative errno is returned.
  */
-int
-rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
-                          struct rpcrdma_req *req, u32 hdrlen,
-                          struct xdr_buf *xdr, enum rpcrdma_chunktype rtype)
+inline int rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
+                                     struct rpcrdma_req *req, u32 hdrlen,
+                                     struct xdr_buf *xdr,
+                                     enum rpcrdma_chunktype rtype)
 {
     int ret;
 
     ret = -EAGAIN;
     req->rl_sendctx = rpcrdma_sendctx_get_locked(r_xprt);
     if (!req->rl_sendctx)
-        goto err;
-    req->rl_sendctx->sc_wr.num_sge = 0;
+        goto out_nosc;
     req->rl_sendctx->sc_unmap_count = 0;
     req->rl_sendctx->sc_req = req;
     kref_init(&req->rl_kref);
+    req->rl_wr.wr_cqe = &req->rl_sendctx->sc_cqe;
+    req->rl_wr.sg_list = req->rl_sendctx->sc_sges;
+    req->rl_wr.num_sge = 0;
+    req->rl_wr.opcode = IB_WR_SEND;
+
+    rpcrdma_prepare_hdr_sge(r_xprt, req, hdrlen);
 
     ret = -EIO;
-    if (!rpcrdma_prepare_hdr_sge(r_xprt, req, hdrlen))
-        goto err;
-    if (rtype != rpcrdma_areadch)
-        if (!rpcrdma_prepare_msg_sges(r_xprt, req, xdr, rtype))
-            goto err;
+    switch (rtype) {
+    case rpcrdma_noch_pullup:
+        if (!rpcrdma_prepare_noch_pullup(r_xprt, req, xdr))
+            goto out_unmap;
+        break;
+    case rpcrdma_noch_mapped:
+        if (!rpcrdma_prepare_noch_mapped(r_xprt, req, xdr))
+            goto out_unmap;
+        break;
+    case rpcrdma_readch:
+        if (!rpcrdma_prepare_readch(r_xprt, req, xdr))
+            goto out_unmap;
+        break;
+    case rpcrdma_areadch:
+        break;
+    default:
+        goto out_unmap;
+    }
+
     return 0;
 
-err:
+out_unmap:
+    rpcrdma_sendctx_unmap(req->rl_sendctx);
+out_nosc:
     trace_xprtrdma_prepsend_failed(&req->rl_slot, ret);
     return ret;
 }
@@ -770,6 +889,7 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst)
     struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
     struct xdr_stream *xdr = &req->rl_stream;
     enum rpcrdma_chunktype rtype, wtype;
+    struct xdr_buf *buf = &rqst->rq_snd_buf;
     bool ddp_allowed;
     __be32 *p;
     int ret;
@@ -785,7 +905,7 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst)
         goto out_err;
     *p++ = rqst->rq_xid;
     *p++ = rpcrdma_version;
-    *p++ = cpu_to_be32(r_xprt->rx_buf.rb_max_requests);
+    *p++ = r_xprt->rx_buf.rb_max_requests;
 
     /* When the ULP employs a GSS flavor that guarantees integrity
      * or privacy, direct data placement of individual data items
@@ -827,8 +947,9 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst)
      */
     if (rpcrdma_args_inline(r_xprt, rqst)) {
         *p++ = rdma_msg;
-        rtype = rpcrdma_noch;
-    } else if (ddp_allowed && rqst->rq_snd_buf.flags & XDRBUF_WRITE) {
+        rtype = buf->len < rdmab_length(req->rl_sendbuf) ?
+                rpcrdma_noch_pullup : rpcrdma_noch_mapped;
+    } else if (ddp_allowed && buf->flags & XDRBUF_WRITE) {
         *p++ = rdma_msg;
         rtype = rpcrdma_readch;
     } else {
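The hunk above is where the transport now picks between the two inline strategies: a Call that fits in the pre-registered Send buffer is pulled up (rpcrdma_noch_pullup), while a larger inline Call has its components DMA-mapped individually (rpcrdma_noch_mapped). A compact model of that selection logic -- the enum values and thresholds here are illustrative stand-ins, not the kernel's:

    #include <stdbool.h>
    #include <stdio.h>

    enum chunktype { NOCH_PULLUP, NOCH_MAPPED, READCH, AREADCH };

    /* Models the rtype selection in rpcrdma_marshal_req(): an
     * inline-eligible Call is pulled up only when it fits in the
     * pre-registered send buffer. */
    static enum chunktype choose_rtype(bool args_inline, bool ddp_allowed,
                                       bool write_heavy, size_t body_len,
                                       size_t sendbuf_len)
    {
        if (args_inline)
            return body_len < sendbuf_len ? NOCH_PULLUP : NOCH_MAPPED;
        if (ddp_allowed && write_heavy)
            return READCH;   /* payload moves via a Read chunk */
        return AREADCH;      /* whole message as a Position Zero Read chunk */
    }

    int main(void)
    {
        printf("%d\n", choose_rtype(true, true, false, 512, 1024));   /* NOCH_PULLUP */
        printf("%d\n", choose_rtype(true, true, false, 2048, 1024));  /* NOCH_MAPPED */
        printf("%d\n", choose_rtype(false, true, true, 65536, 1024)); /* READCH */
        return 0;
    }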
@@ -837,17 +958,6 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst)
         rtype = rpcrdma_areadch;
     }
 
-    /* If this is a retransmit, discard previously registered
-     * chunks. Very likely the connection has been replaced,
-     * so these registrations are invalid and unusable.
-     */
-    while (unlikely(!list_empty(&req->rl_registered))) {
-        struct rpcrdma_mr *mr;
-
-        mr = rpcrdma_mr_pop(&req->rl_registered);
-        rpcrdma_mr_recycle(mr);
-    }
-
     /* This implementation supports the following combinations
      * of chunk lists in one RPC-over-RDMA Call message:
      *
@@ -881,7 +991,7 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst)
         goto out_err;
 
     ret = rpcrdma_prepare_send_sges(r_xprt, req, req->rl_hdrbuf.len,
-                                    &rqst->rq_snd_buf, rtype);
+                                    buf, rtype);
     if (ret)
         goto out_err;
 
@@ -895,6 +1005,40 @@ out_err:
     return ret;
 }
 
+static void __rpcrdma_update_cwnd_locked(struct rpc_xprt *xprt,
+                                         struct rpcrdma_buffer *buf,
+                                         u32 grant)
+{
+    buf->rb_credits = grant;
+    xprt->cwnd = grant << RPC_CWNDSHIFT;
+}
+
+static void rpcrdma_update_cwnd(struct rpcrdma_xprt *r_xprt, u32 grant)
+{
+    struct rpc_xprt *xprt = &r_xprt->rx_xprt;
+
+    spin_lock(&xprt->transport_lock);
+    __rpcrdma_update_cwnd_locked(xprt, &r_xprt->rx_buf, grant);
+    spin_unlock(&xprt->transport_lock);
+}
+
+/**
+ * rpcrdma_reset_cwnd - Reset the xprt's congestion window
+ * @r_xprt: controlling transport instance
+ *
+ * Prepare @r_xprt for the next connection by reinitializing
+ * its credit grant to one (see RFC 8166, Section 3.3.3).
+ */
+void rpcrdma_reset_cwnd(struct rpcrdma_xprt *r_xprt)
+{
+    struct rpc_xprt *xprt = &r_xprt->rx_xprt;
+
+    spin_lock(&xprt->transport_lock);
+    xprt->cong = 0;
+    __rpcrdma_update_cwnd_locked(xprt, &r_xprt->rx_buf, 1);
+    spin_unlock(&xprt->transport_lock);
+}
+
 /**
  * rpcrdma_inline_fixup - Scatter inline received data into rqst's iovecs
  * @rqst: controlling RPC request
@@ -934,7 +1078,6 @@ rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
     curlen = rqst->rq_rcv_buf.head[0].iov_len;
     if (curlen > copy_len)
         curlen = copy_len;
-    trace_xprtrdma_fixup(rqst, copy_len, curlen);
 
     srcp += curlen;
     copy_len -= curlen;
@@ -954,8 +1097,6 @@ rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
             if (curlen > pagelist_len)
                 curlen = pagelist_len;
 
-            trace_xprtrdma_fixup_pg(rqst, i, srcp,
-                                    copy_len, curlen);
             destp = kmap_atomic(ppages[i]);
             memcpy(destp + page_base, srcp, curlen);
             flush_dcache_page(ppages[i]);
@@ -987,6 +1128,8 @@ rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
         rqst->rq_private_buf.tail[0].iov_base = srcp;
     }
 
+    if (fixup_copy_count)
+        trace_xprtrdma_fixup(rqst, fixup_copy_count);
     return fixup_copy_count;
 }
 
@@ -1240,8 +1383,6 @@ void rpcrdma_complete_rqst(struct rpcrdma_rep *rep)
     struct rpc_rqst *rqst = rep->rr_rqst;
     int status;
 
-    xprt->reestablish_timeout = 0;
-
     switch (rep->rr_proc) {
     case rdma_msg:
         status = rpcrdma_decode_msg(r_xprt, rep, rqst);
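rpcrdma_update_cwnd() and rpcrdma_reset_cwnd() above move the credit-to-congestion-window bookkeeping behind one helper: the window is the credit grant shifted left by RPC_CWNDSHIFT, and a fresh connection starts over at a single credit per RFC 8166, Section 3.3.3. A small stand-alone model of that arithmetic (the shift constant below is a placeholder, not necessarily the kernel's RPC_CWNDSHIFT value):

    #include <stdio.h>
    #include <stdint.h>

    /* MODEL_CWNDSHIFT is an assumed example value; the point is only how a
     * credit grant scales the congestion window and how a reconnect resets
     * the grant to one. */
    #define MODEL_CWNDSHIFT 8

    struct model_xprt {
        uint32_t credits;
        unsigned long cwnd;
    };

    static void update_cwnd(struct model_xprt *x, uint32_t grant)
    {
        x->credits = grant;
        x->cwnd = (unsigned long)grant << MODEL_CWNDSHIFT;
    }

    static void reset_cwnd(struct model_xprt *x)
    {
        update_cwnd(x, 1);   /* a fresh connection gets a single credit */
    }

    int main(void)
    {
        struct model_xprt x = { 0, 0 };

        update_cwnd(&x, 128);
        printf("granted 128 credits -> cwnd %lu\n", x.cwnd);
        reset_cwnd(&x);
        printf("after reconnect     -> cwnd %lu\n", x.cwnd);
        return 0;
    }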
@@ -1300,6 +1441,12 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
     u32 credits;
     __be32 *p;
 
+    /* Any data means we had a useful conversation, so
+     * then we don't need to delay the next reconnect.
+     */
+    if (xprt->reestablish_timeout)
+        xprt->reestablish_timeout = 0;
+
     /* Fixed transport header fields */
     xdr_init_decode(&rep->rr_stream, &rep->rr_hdrbuf,
                     rep->rr_hdrbuf.head[0].iov_base, NULL);
@@ -1329,14 +1476,11 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
     if (credits == 0)
         credits = 1;	/* don't deadlock */
-    else if (credits > buf->rb_max_requests)
-        credits = buf->rb_max_requests;
-    if (buf->rb_credits != credits) {
-        spin_lock(&xprt->transport_lock);
-        buf->rb_credits = credits;
-        xprt->cwnd = credits << RPC_CWNDSHIFT;
-        spin_unlock(&xprt->transport_lock);
-    }
+    else if (credits > r_xprt->rx_ep.rep_max_requests)
+        credits = r_xprt->rx_ep.rep_max_requests;
+    if (buf->rb_credits != credits)
+        rpcrdma_update_cwnd(r_xprt, credits);
+    rpcrdma_post_recvs(r_xprt, false);
 
     req = rpcr_to_rdmar(rqst);
     if (req->rl_reply) {
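The last hunk also clamps the credit grant advertised by the server before applying it: a grant of zero would deadlock the transport, and a grant larger than the number of requests this endpoint configured (rep_max_requests) cannot be honored. A self-contained model of that clamp (the maximum used in the example is arbitrary):

    #include <stdio.h>
    #include <stdint.h>

    /* Models the credit sanity checks near the end of
     * rpcrdma_reply_handler(). */
    static uint32_t clamp_credits(uint32_t credits, uint32_t rep_max_requests)
    {
        if (credits == 0)
            return 1;               /* don't deadlock */
        if (credits > rep_max_requests)
            return rep_max_requests;
        return credits;
    }

    int main(void)
    {
        const uint32_t max = 128;

        printf("%u %u %u\n", clamp_credits(0, max), clamp_credits(64, max),
               clamp_credits(1000, max));
        return 0;
    }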