Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux

svcrdma: Use generic RDMA R/W API in RPC Call path

The current svcrdma recvfrom code path has a lot of detail about
registration mode and the type of port (iWARP, IB, etc).

Instead, use the RDMA core's generic R/W API. This shares with
other RDMA-enabled ULPs the code that manages the gory details of
buffer registration and the posting of RDMA Read Work Requests.

Since the Read list marshaling code is being replaced, I took the
opportunity to replace C structure-based XDR encoding code with more
portable code that uses pointer arithmetic.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: J. Bruce Fields <bfields@redhat.com>

authored by

Chuck Lever and committed by
J. Bruce Fields
cafc7398 026d958b

+106 -468
-14
include/linux/sunrpc/svc_rdma.h
··· 82 82 int hdr_count; 83 83 struct xdr_buf arg; 84 84 struct ib_cqe cqe; 85 - struct ib_cqe reg_cqe; 86 - struct ib_cqe inv_cqe; 87 85 u32 byte_len; 88 - u32 position; 89 86 struct svcxprt_rdma *xprt; 90 87 unsigned long flags; 91 88 enum dma_data_direction direction; ··· 113 116 struct list_head sc_accept_q; /* Conn. waiting accept */ 114 117 int sc_ord; /* RDMA read limit */ 115 118 int sc_max_sge; 116 - int sc_max_sge_rd; /* max sge for read target */ 117 119 bool sc_snd_w_inv; /* OK to use Send With Invalidate */ 118 120 119 121 atomic_t sc_sq_avail; /* SQEs ready to be consumed */ ··· 137 141 struct ib_qp *sc_qp; 138 142 struct ib_cq *sc_rq_cq; 139 143 struct ib_cq *sc_sq_cq; 140 - int (*sc_reader)(struct svcxprt_rdma *, 141 - struct svc_rqst *, 142 - struct svc_rdma_op_ctxt *, 143 - int *, u32 *, u32, u32, u64, bool); 144 144 u32 sc_dev_caps; /* distilled device caps */ 145 145 unsigned int sc_frmr_pg_list_len; 146 146 struct list_head sc_frmr_q; ··· 179 187 180 188 /* svc_rdma_recvfrom.c */ 181 189 extern int svc_rdma_recvfrom(struct svc_rqst *); 182 - extern int rdma_read_chunk_lcl(struct svcxprt_rdma *, struct svc_rqst *, 183 - struct svc_rdma_op_ctxt *, int *, u32 *, 184 - u32, u32, u64, bool); 185 - extern int rdma_read_chunk_frmr(struct svcxprt_rdma *, struct svc_rqst *, 186 - struct svc_rdma_op_ctxt *, int *, u32 *, 187 - u32, u32, u64, bool); 188 190 189 191 /* svc_rdma_rw.c */ 190 192 extern void svc_rdma_destroy_rw_ctxts(struct svcxprt_rdma *rdma);
+106 -441
net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
··· 41 41 * Author: Tom Tucker <tom@opengridcomputing.com> 42 42 */ 43 43 44 - #include <linux/sunrpc/xdr.h> 45 - #include <linux/sunrpc/debug.h> 46 - #include <linux/sunrpc/rpc_rdma.h> 47 - #include <linux/spinlock.h> 44 + /* Operation 45 + * 46 + * The main entry point is svc_rdma_recvfrom. This is called from 47 + * svc_recv when the transport indicates there is incoming data to 48 + * be read. "Data Ready" is signaled when an RDMA Receive completes, 49 + * or when a set of RDMA Reads complete. 50 + * 51 + * An svc_rqst is passed in. This structure contains an array of 52 + * free pages (rq_pages) that will contain the incoming RPC message. 53 + * 54 + * Short messages are moved directly into svc_rqst::rq_arg, and 55 + * the RPC Call is ready to be processed by the Upper Layer. 56 + * svc_rdma_recvfrom returns the length of the RPC Call message, 57 + * completing the reception of the RPC Call. 58 + * 59 + * However, when an incoming message has Read chunks, 60 + * svc_rdma_recvfrom must post RDMA Reads to pull the RPC Call's 61 + * data payload from the client. svc_rdma_recvfrom sets up the 62 + * RDMA Reads using pages in svc_rqst::rq_pages, which are 63 + * transferred to an svc_rdma_op_ctxt for the duration of the 64 + * I/O. svc_rdma_recvfrom then returns zero, since the RPC message 65 + * is still not yet ready. 66 + * 67 + * When the Read chunk payloads have become available on the 68 + * server, "Data Ready" is raised again, and svc_recv calls 69 + * svc_rdma_recvfrom again. This second call may use a different 70 + * svc_rqst than the first one, thus any information that needs 71 + * to be preserved across these two calls is kept in an 72 + * svc_rdma_op_ctxt. 73 + * 74 + * The second call to svc_rdma_recvfrom performs final assembly 75 + * of the RPC Call message, using the RDMA Read sink pages kept in 76 + * the svc_rdma_op_ctxt. The xdr_buf is copied from the 77 + * svc_rdma_op_ctxt to the second svc_rqst. 
The second call returns 78 + * the length of the completed RPC Call message. 79 + * 80 + * Page Management 81 + * 82 + * Pages under I/O must be transferred from the first svc_rqst to an 83 + * svc_rdma_op_ctxt before the first svc_rdma_recvfrom call returns. 84 + * 85 + * The first svc_rqst supplies pages for RDMA Reads. These are moved 86 + * from rqstp::rq_pages into ctxt::pages. The consumed elements of 87 + * the rq_pages array are set to NULL and refilled with the first 88 + * svc_rdma_recvfrom call returns. 89 + * 90 + * During the second svc_rdma_recvfrom call, RDMA Read sink pages 91 + * are transferred from the svc_rdma_op_ctxt to the second svc_rqst 92 + * (see rdma_read_complete() below). 93 + */ 94 + 48 95 #include <asm/unaligned.h> 49 96 #include <rdma/ib_verbs.h> 50 97 #include <rdma/rdma_cm.h> 98 + 99 + #include <linux/spinlock.h> 100 + 101 + #include <linux/sunrpc/xdr.h> 102 + #include <linux/sunrpc/debug.h> 103 + #include <linux/sunrpc/rpc_rdma.h> 51 104 #include <linux/sunrpc/svc_rdma.h> 52 105 53 106 #define RPCDBG_FACILITY RPCDBG_SVCXPRT ··· 114 61 struct svc_rdma_op_ctxt *ctxt, 115 62 u32 byte_count) 116 63 { 117 - struct rpcrdma_msg *rmsgp; 118 64 struct page *page; 119 65 u32 bc; 120 66 int sge_no; ··· 136 84 /* If data remains, store it in the pagelist */ 137 85 rqstp->rq_arg.page_len = bc; 138 86 rqstp->rq_arg.page_base = 0; 139 - 140 - /* RDMA_NOMSG: RDMA READ data should land just after RDMA RECV data */ 141 - rmsgp = (struct rpcrdma_msg *)rqstp->rq_arg.head[0].iov_base; 142 - if (rmsgp->rm_type == rdma_nomsg) 143 - rqstp->rq_arg.pages = &rqstp->rq_pages[0]; 144 - else 145 - rqstp->rq_arg.pages = &rqstp->rq_pages[1]; 146 87 147 88 sge_no = 1; 148 89 while (bc && sge_no < ctxt->count) { ··· 365 320 return -EINVAL; 366 321 } 367 322 368 - /* Issue an RDMA_READ using the local lkey to map the data sink */ 369 - int rdma_read_chunk_lcl(struct svcxprt_rdma *xprt, 370 - struct svc_rqst *rqstp, 371 - struct svc_rdma_op_ctxt *head, 372 - int 
*page_no, 373 - u32 *page_offset, 374 - u32 rs_handle, 375 - u32 rs_length, 376 - u64 rs_offset, 377 - bool last) 378 - { 379 - struct ib_rdma_wr read_wr; 380 - int pages_needed = PAGE_ALIGN(*page_offset + rs_length) >> PAGE_SHIFT; 381 - struct svc_rdma_op_ctxt *ctxt = svc_rdma_get_context(xprt); 382 - int ret, read, pno; 383 - u32 pg_off = *page_offset; 384 - u32 pg_no = *page_no; 385 - 386 - ctxt->direction = DMA_FROM_DEVICE; 387 - ctxt->read_hdr = head; 388 - pages_needed = min_t(int, pages_needed, xprt->sc_max_sge_rd); 389 - read = min_t(int, (pages_needed << PAGE_SHIFT) - *page_offset, 390 - rs_length); 391 - 392 - for (pno = 0; pno < pages_needed; pno++) { 393 - int len = min_t(int, rs_length, PAGE_SIZE - pg_off); 394 - 395 - head->arg.pages[pg_no] = rqstp->rq_arg.pages[pg_no]; 396 - head->arg.page_len += len; 397 - 398 - head->arg.len += len; 399 - if (!pg_off) 400 - head->count++; 401 - rqstp->rq_respages = &rqstp->rq_arg.pages[pg_no+1]; 402 - rqstp->rq_next_page = rqstp->rq_respages + 1; 403 - ctxt->sge[pno].addr = 404 - ib_dma_map_page(xprt->sc_cm_id->device, 405 - head->arg.pages[pg_no], pg_off, 406 - PAGE_SIZE - pg_off, 407 - DMA_FROM_DEVICE); 408 - ret = ib_dma_mapping_error(xprt->sc_cm_id->device, 409 - ctxt->sge[pno].addr); 410 - if (ret) 411 - goto err; 412 - svc_rdma_count_mappings(xprt, ctxt); 413 - 414 - ctxt->sge[pno].lkey = xprt->sc_pd->local_dma_lkey; 415 - ctxt->sge[pno].length = len; 416 - ctxt->count++; 417 - 418 - /* adjust offset and wrap to next page if needed */ 419 - pg_off += len; 420 - if (pg_off == PAGE_SIZE) { 421 - pg_off = 0; 422 - pg_no++; 423 - } 424 - rs_length -= len; 425 - } 426 - 427 - if (last && rs_length == 0) 428 - set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags); 429 - else 430 - clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags); 431 - 432 - memset(&read_wr, 0, sizeof(read_wr)); 433 - ctxt->cqe.done = svc_rdma_wc_read; 434 - read_wr.wr.wr_cqe = &ctxt->cqe; 435 - read_wr.wr.opcode = IB_WR_RDMA_READ; 436 - read_wr.wr.send_flags = 
IB_SEND_SIGNALED; 437 - read_wr.rkey = rs_handle; 438 - read_wr.remote_addr = rs_offset; 439 - read_wr.wr.sg_list = ctxt->sge; 440 - read_wr.wr.num_sge = pages_needed; 441 - 442 - ret = svc_rdma_send(xprt, &read_wr.wr); 443 - if (ret) { 444 - pr_err("svcrdma: Error %d posting RDMA_READ\n", ret); 445 - set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags); 446 - goto err; 447 - } 448 - 449 - /* return current location in page array */ 450 - *page_no = pg_no; 451 - *page_offset = pg_off; 452 - ret = read; 453 - atomic_inc(&rdma_stat_read); 454 - return ret; 455 - err: 456 - svc_rdma_unmap_dma(ctxt); 457 - svc_rdma_put_context(ctxt, 0); 458 - return ret; 459 - } 460 - 461 - /* Issue an RDMA_READ using an FRMR to map the data sink */ 462 - int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt, 463 - struct svc_rqst *rqstp, 464 - struct svc_rdma_op_ctxt *head, 465 - int *page_no, 466 - u32 *page_offset, 467 - u32 rs_handle, 468 - u32 rs_length, 469 - u64 rs_offset, 470 - bool last) 471 - { 472 - struct ib_rdma_wr read_wr; 473 - struct ib_send_wr inv_wr; 474 - struct ib_reg_wr reg_wr; 475 - u8 key; 476 - int nents = PAGE_ALIGN(*page_offset + rs_length) >> PAGE_SHIFT; 477 - struct svc_rdma_op_ctxt *ctxt = svc_rdma_get_context(xprt); 478 - struct svc_rdma_fastreg_mr *frmr = svc_rdma_get_frmr(xprt); 479 - int ret, read, pno, dma_nents, n; 480 - u32 pg_off = *page_offset; 481 - u32 pg_no = *page_no; 482 - 483 - if (IS_ERR(frmr)) 484 - return -ENOMEM; 485 - 486 - ctxt->direction = DMA_FROM_DEVICE; 487 - ctxt->frmr = frmr; 488 - nents = min_t(unsigned int, nents, xprt->sc_frmr_pg_list_len); 489 - read = min_t(int, (nents << PAGE_SHIFT) - *page_offset, rs_length); 490 - 491 - frmr->direction = DMA_FROM_DEVICE; 492 - frmr->access_flags = (IB_ACCESS_LOCAL_WRITE|IB_ACCESS_REMOTE_WRITE); 493 - frmr->sg_nents = nents; 494 - 495 - for (pno = 0; pno < nents; pno++) { 496 - int len = min_t(int, rs_length, PAGE_SIZE - pg_off); 497 - 498 - head->arg.pages[pg_no] = rqstp->rq_arg.pages[pg_no]; 499 - 
head->arg.page_len += len; 500 - head->arg.len += len; 501 - if (!pg_off) 502 - head->count++; 503 - 504 - sg_set_page(&frmr->sg[pno], rqstp->rq_arg.pages[pg_no], 505 - len, pg_off); 506 - 507 - rqstp->rq_respages = &rqstp->rq_arg.pages[pg_no+1]; 508 - rqstp->rq_next_page = rqstp->rq_respages + 1; 509 - 510 - /* adjust offset and wrap to next page if needed */ 511 - pg_off += len; 512 - if (pg_off == PAGE_SIZE) { 513 - pg_off = 0; 514 - pg_no++; 515 - } 516 - rs_length -= len; 517 - } 518 - 519 - if (last && rs_length == 0) 520 - set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags); 521 - else 522 - clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags); 523 - 524 - dma_nents = ib_dma_map_sg(xprt->sc_cm_id->device, 525 - frmr->sg, frmr->sg_nents, 526 - frmr->direction); 527 - if (!dma_nents) { 528 - pr_err("svcrdma: failed to dma map sg %p\n", 529 - frmr->sg); 530 - return -ENOMEM; 531 - } 532 - 533 - n = ib_map_mr_sg(frmr->mr, frmr->sg, frmr->sg_nents, NULL, PAGE_SIZE); 534 - if (unlikely(n != frmr->sg_nents)) { 535 - pr_err("svcrdma: failed to map mr %p (%d/%d elements)\n", 536 - frmr->mr, n, frmr->sg_nents); 537 - return n < 0 ? 
n : -EINVAL; 538 - } 539 - 540 - /* Bump the key */ 541 - key = (u8)(frmr->mr->lkey & 0x000000FF); 542 - ib_update_fast_reg_key(frmr->mr, ++key); 543 - 544 - ctxt->sge[0].addr = frmr->mr->iova; 545 - ctxt->sge[0].lkey = frmr->mr->lkey; 546 - ctxt->sge[0].length = frmr->mr->length; 547 - ctxt->count = 1; 548 - ctxt->read_hdr = head; 549 - 550 - /* Prepare REG WR */ 551 - ctxt->reg_cqe.done = svc_rdma_wc_reg; 552 - reg_wr.wr.wr_cqe = &ctxt->reg_cqe; 553 - reg_wr.wr.opcode = IB_WR_REG_MR; 554 - reg_wr.wr.send_flags = IB_SEND_SIGNALED; 555 - reg_wr.wr.num_sge = 0; 556 - reg_wr.mr = frmr->mr; 557 - reg_wr.key = frmr->mr->lkey; 558 - reg_wr.access = frmr->access_flags; 559 - reg_wr.wr.next = &read_wr.wr; 560 - 561 - /* Prepare RDMA_READ */ 562 - memset(&read_wr, 0, sizeof(read_wr)); 563 - ctxt->cqe.done = svc_rdma_wc_read; 564 - read_wr.wr.wr_cqe = &ctxt->cqe; 565 - read_wr.wr.send_flags = IB_SEND_SIGNALED; 566 - read_wr.rkey = rs_handle; 567 - read_wr.remote_addr = rs_offset; 568 - read_wr.wr.sg_list = ctxt->sge; 569 - read_wr.wr.num_sge = 1; 570 - if (xprt->sc_dev_caps & SVCRDMA_DEVCAP_READ_W_INV) { 571 - read_wr.wr.opcode = IB_WR_RDMA_READ_WITH_INV; 572 - read_wr.wr.ex.invalidate_rkey = ctxt->frmr->mr->lkey; 573 - } else { 574 - read_wr.wr.opcode = IB_WR_RDMA_READ; 575 - read_wr.wr.next = &inv_wr; 576 - /* Prepare invalidate */ 577 - memset(&inv_wr, 0, sizeof(inv_wr)); 578 - ctxt->inv_cqe.done = svc_rdma_wc_inv; 579 - inv_wr.wr_cqe = &ctxt->inv_cqe; 580 - inv_wr.opcode = IB_WR_LOCAL_INV; 581 - inv_wr.send_flags = IB_SEND_SIGNALED | IB_SEND_FENCE; 582 - inv_wr.ex.invalidate_rkey = frmr->mr->lkey; 583 - } 584 - 585 - /* Post the chain */ 586 - ret = svc_rdma_send(xprt, &reg_wr.wr); 587 - if (ret) { 588 - pr_err("svcrdma: Error %d posting RDMA_READ\n", ret); 589 - set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags); 590 - goto err; 591 - } 592 - 593 - /* return current location in page array */ 594 - *page_no = pg_no; 595 - *page_offset = pg_off; 596 - ret = read; 597 - 
atomic_inc(&rdma_stat_read); 598 - return ret; 599 - err: 600 - svc_rdma_put_context(ctxt, 0); 601 - svc_rdma_put_frmr(xprt, frmr); 602 - return ret; 603 - } 604 - 605 - /* If there was additional inline content, append it to the end of arg.pages. 606 - * Tail copy has to be done after the reader function has determined how many 607 - * pages are needed for RDMA READ. 608 - */ 609 - static int 610 - rdma_copy_tail(struct svc_rqst *rqstp, struct svc_rdma_op_ctxt *head, 611 - u32 position, u32 byte_count, u32 page_offset, int page_no) 612 - { 613 - char *srcp, *destp; 614 - 615 - srcp = head->arg.head[0].iov_base + position; 616 - byte_count = head->arg.head[0].iov_len - position; 617 - if (byte_count > PAGE_SIZE) { 618 - dprintk("svcrdma: large tail unsupported\n"); 619 - return 0; 620 - } 621 - 622 - /* Fit as much of the tail on the current page as possible */ 623 - if (page_offset != PAGE_SIZE) { 624 - destp = page_address(rqstp->rq_arg.pages[page_no]); 625 - destp += page_offset; 626 - while (byte_count--) { 627 - *destp++ = *srcp++; 628 - page_offset++; 629 - if (page_offset == PAGE_SIZE && byte_count) 630 - goto more; 631 - } 632 - goto done; 633 - } 634 - 635 - more: 636 - /* Fit the rest on the next page */ 637 - page_no++; 638 - destp = page_address(rqstp->rq_arg.pages[page_no]); 639 - while (byte_count--) 640 - *destp++ = *srcp++; 641 - 642 - rqstp->rq_respages = &rqstp->rq_arg.pages[page_no+1]; 643 - rqstp->rq_next_page = rqstp->rq_respages + 1; 644 - 645 - done: 646 - byte_count = head->arg.head[0].iov_len - position; 647 - head->arg.page_len += byte_count; 648 - head->arg.len += byte_count; 649 - head->arg.buflen += byte_count; 650 - return 1; 651 - } 652 - 653 - /* Returns the address of the first read chunk or <nul> if no read chunk 654 - * is present 655 - */ 656 - static struct rpcrdma_read_chunk * 657 - svc_rdma_get_read_chunk(struct rpcrdma_msg *rmsgp) 658 - { 659 - struct rpcrdma_read_chunk *ch = 660 - (struct rpcrdma_read_chunk 
*)&rmsgp->rm_body.rm_chunks[0]; 661 - 662 - if (ch->rc_discrim == xdr_zero) 663 - return NULL; 664 - return ch; 665 - } 666 - 667 - static int rdma_read_chunks(struct svcxprt_rdma *xprt, 668 - struct rpcrdma_msg *rmsgp, 669 - struct svc_rqst *rqstp, 670 - struct svc_rdma_op_ctxt *head) 671 - { 672 - int page_no, ret; 673 - struct rpcrdma_read_chunk *ch; 674 - u32 handle, page_offset, byte_count; 675 - u32 position; 676 - u64 rs_offset; 677 - bool last; 678 - 679 - /* If no read list is present, return 0 */ 680 - ch = svc_rdma_get_read_chunk(rmsgp); 681 - if (!ch) 682 - return 0; 683 - 684 - /* The request is completed when the RDMA_READs complete. The 685 - * head context keeps all the pages that comprise the 686 - * request. 687 - */ 688 - head->arg.head[0] = rqstp->rq_arg.head[0]; 689 - head->arg.tail[0] = rqstp->rq_arg.tail[0]; 690 - head->hdr_count = head->count; 691 - head->arg.page_base = 0; 692 - head->arg.page_len = 0; 693 - head->arg.len = rqstp->rq_arg.len; 694 - head->arg.buflen = rqstp->rq_arg.buflen; 695 - 696 - /* RDMA_NOMSG: RDMA READ data should land just after RDMA RECV data */ 697 - position = be32_to_cpu(ch->rc_position); 698 - if (position == 0) { 699 - head->arg.pages = &head->pages[0]; 700 - page_offset = head->byte_len; 701 - } else { 702 - head->arg.pages = &head->pages[head->count]; 703 - page_offset = 0; 704 - } 705 - 706 - ret = 0; 707 - page_no = 0; 708 - for (; ch->rc_discrim != xdr_zero; ch++) { 709 - if (be32_to_cpu(ch->rc_position) != position) 710 - goto err; 711 - 712 - handle = be32_to_cpu(ch->rc_target.rs_handle), 713 - byte_count = be32_to_cpu(ch->rc_target.rs_length); 714 - xdr_decode_hyper((__be32 *)&ch->rc_target.rs_offset, 715 - &rs_offset); 716 - 717 - while (byte_count > 0) { 718 - last = (ch + 1)->rc_discrim == xdr_zero; 719 - ret = xprt->sc_reader(xprt, rqstp, head, 720 - &page_no, &page_offset, 721 - handle, byte_count, 722 - rs_offset, last); 723 - if (ret < 0) 724 - goto err; 725 - byte_count -= ret; 726 - rs_offset 
+= ret; 727 - head->arg.buflen += ret; 728 - } 729 - } 730 - 731 - /* Read list may need XDR round-up (see RFC 5666, s. 3.7) */ 732 - if (page_offset & 3) { 733 - u32 pad = 4 - (page_offset & 3); 734 - 735 - head->arg.tail[0].iov_len += pad; 736 - head->arg.len += pad; 737 - head->arg.buflen += pad; 738 - page_offset += pad; 739 - } 740 - 741 - ret = 1; 742 - if (position && position < head->arg.head[0].iov_len) 743 - ret = rdma_copy_tail(rqstp, head, position, 744 - byte_count, page_offset, page_no); 745 - head->arg.head[0].iov_len = position; 746 - head->position = position; 747 - 748 - err: 749 - /* Detach arg pages. svc_recv will replenish them */ 750 - for (page_no = 0; 751 - &rqstp->rq_pages[page_no] < rqstp->rq_respages; page_no++) 752 - rqstp->rq_pages[page_no] = NULL; 753 - 754 - return ret; 755 - } 756 - 757 323 static void rdma_read_complete(struct svc_rqst *rqstp, 758 324 struct svc_rdma_op_ctxt *head) 759 325 { ··· 376 720 rqstp->rq_pages[page_no] = head->pages[page_no]; 377 721 } 378 722 379 - /* Adjustments made for RDMA_NOMSG type requests */ 380 - if (head->position == 0) { 381 - if (head->arg.len <= head->sge[0].length) { 382 - head->arg.head[0].iov_len = head->arg.len - 383 - head->byte_len; 384 - head->arg.page_len = 0; 385 - } else { 386 - head->arg.head[0].iov_len = head->sge[0].length - 387 - head->byte_len; 388 - head->arg.page_len = head->arg.len - 389 - head->sge[0].length; 390 - } 391 - } 392 - 393 723 /* Point rq_arg.pages past header */ 394 724 rqstp->rq_arg.pages = &rqstp->rq_pages[head->hdr_count]; 395 725 rqstp->rq_arg.page_len = head->arg.page_len; 396 - rqstp->rq_arg.page_base = head->arg.page_base; 397 726 398 727 /* rq_respages starts after the last arg page */ 399 728 rqstp->rq_respages = &rqstp->rq_pages[page_no]; ··· 475 834 return true; 476 835 } 477 836 478 - /* 479 - * Set up the rqstp thread context to point to the RQ buffer. If 480 - * necessary, pull additional data from the client with an RDMA_READ 481 - * request. 
837 + /** 838 + * svc_rdma_recvfrom - Receive an RPC call 839 + * @rqstp: request structure into which to receive an RPC Call 840 + * 841 + * Returns: 842 + * The positive number of bytes in the RPC Call message, 843 + * %0 if there were no Calls ready to return, 844 + * %-EINVAL if the Read chunk data is too large, 845 + * %-ENOMEM if rdma_rw context pool was exhausted, 846 + * %-ENOTCONN if posting failed (connection is lost), 847 + * %-EIO if rdma_rw initialization failed (DMA mapping, etc). 848 + * 849 + * Called in a loop when XPT_DATA is set. XPT_DATA is cleared only 850 + * when there are no remaining ctxt's to process. 851 + * 852 + * The next ctxt is removed from the "receive" lists. 853 + * 854 + * - If the ctxt completes a Read, then finish assembling the Call 855 + * message and return the number of bytes in the message. 856 + * 857 + * - If the ctxt completes a Receive, then construct the Call 858 + * message from the contents of the Receive buffer. 859 + * 860 + * - If there are no Read chunks in this message, then finish 861 + * assembling the Call message and return the number of bytes 862 + * in the message. 863 + * 864 + * - If there are Read chunks in this message, post Read WRs to 865 + * pull that payload and return 0. 
482 866 */ 483 867 int svc_rdma_recvfrom(struct svc_rqst *rqstp) 484 868 { ··· 511 845 struct svcxprt_rdma *rdma_xprt = 512 846 container_of(xprt, struct svcxprt_rdma, sc_xprt); 513 847 struct svc_rdma_op_ctxt *ctxt; 514 - struct rpcrdma_msg *rmsgp; 848 + __be32 *p; 515 849 int ret; 516 - 517 - dprintk("svcrdma: rqstp=%p\n", rqstp); 518 850 519 851 spin_lock(&rdma_xprt->sc_rq_dto_lock); 520 852 if (!list_empty(&rdma_xprt->sc_read_complete_q)) { ··· 534 870 } 535 871 spin_unlock(&rdma_xprt->sc_rq_dto_lock); 536 872 537 - dprintk("svcrdma: processing ctxt=%p on xprt=%p, rqstp=%p\n", 873 + dprintk("svcrdma: recvfrom: ctxt=%p on xprt=%p, rqstp=%p\n", 538 874 ctxt, rdma_xprt, rqstp); 539 875 atomic_inc(&rdma_stat_recv); 540 876 ··· 542 878 rdma_build_arg_xdr(rqstp, ctxt, ctxt->byte_len); 543 879 544 880 /* Decode the RDMA header. */ 545 - rmsgp = (struct rpcrdma_msg *)rqstp->rq_arg.head[0].iov_base; 881 + p = (__be32 *)rqstp->rq_arg.head[0].iov_base; 546 882 ret = svc_rdma_xdr_decode_req(&rqstp->rq_arg); 547 883 if (ret < 0) 548 884 goto out_err; ··· 550 886 goto out_drop; 551 887 rqstp->rq_xprt_hlen = ret; 552 888 553 - if (svc_rdma_is_backchannel_reply(xprt, &rmsgp->rm_xid)) { 554 - ret = svc_rdma_handle_bc_reply(xprt->xpt_bc_xprt, 555 - &rmsgp->rm_xid, 889 + if (svc_rdma_is_backchannel_reply(xprt, p)) { 890 + ret = svc_rdma_handle_bc_reply(xprt->xpt_bc_xprt, p, 556 891 &rqstp->rq_arg); 557 892 svc_rdma_put_context(ctxt, 0); 558 893 if (ret) ··· 559 896 return ret; 560 897 } 561 898 562 - /* Read read-list data. */ 563 - ret = rdma_read_chunks(rdma_xprt, rmsgp, rqstp, ctxt); 564 - if (ret > 0) { 565 - /* read-list posted, defer until data received from client. */ 566 - goto defer; 567 - } else if (ret < 0) { 568 - /* Post of read-list failed, free context. 
*/ 569 - svc_rdma_put_context(ctxt, 1); 570 - return 0; 571 - } 899 + p += rpcrdma_fixed_maxsz; 900 + if (*p != xdr_zero) 901 + goto out_readchunk; 572 902 573 903 complete: 574 904 ret = rqstp->rq_arg.head[0].iov_len ··· 577 921 svc_xprt_copy_addrs(rqstp, xprt); 578 922 return ret; 579 923 924 + out_readchunk: 925 + ret = svc_rdma_recv_read_chunk(rdma_xprt, rqstp, ctxt, p); 926 + if (ret < 0) 927 + goto out_postfail; 928 + return 0; 929 + 580 930 out_err: 581 - svc_rdma_send_error(rdma_xprt, &rmsgp->rm_xid, ret); 931 + svc_rdma_send_error(rdma_xprt, p, ret); 582 932 svc_rdma_put_context(ctxt, 0); 583 933 return 0; 584 934 585 - defer: 586 - return 0; 935 + out_postfail: 936 + if (ret == -EINVAL) 937 + svc_rdma_send_error(rdma_xprt, p, ret); 938 + svc_rdma_put_context(ctxt, 1); 939 + return ret; 587 940 588 941 out_drop: 589 942 svc_rdma_put_context(ctxt, 1);
-13
net/sunrpc/xprtrdma/svc_rdma_transport.c
··· 908 908 * capabilities of this particular device */ 909 909 newxprt->sc_max_sge = min((size_t)dev->attrs.max_sge, 910 910 (size_t)RPCSVC_MAXPAGES); 911 - newxprt->sc_max_sge_rd = min_t(size_t, dev->attrs.max_sge_rd, 912 - RPCSVC_MAXPAGES); 913 911 newxprt->sc_max_req_size = svcrdma_max_req_size; 914 912 newxprt->sc_max_requests = min_t(u32, dev->attrs.max_qp_wr, 915 913 svcrdma_max_requests); ··· 996 998 * NB: iWARP requires remote write access for the data sink 997 999 * of an RDMA_READ. IB does not. 998 1000 */ 999 - newxprt->sc_reader = rdma_read_chunk_lcl; 1000 1001 if (dev->attrs.device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS) { 1001 1002 newxprt->sc_frmr_pg_list_len = 1002 1003 dev->attrs.max_fast_reg_page_list_len; 1003 1004 newxprt->sc_dev_caps |= SVCRDMA_DEVCAP_FAST_REG; 1004 - newxprt->sc_reader = rdma_read_chunk_frmr; 1005 1005 } else 1006 1006 newxprt->sc_snd_w_inv = false; 1007 1007 ··· 1052 1056 sap = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr; 1053 1057 dprintk(" remote address : %pIS:%u\n", sap, rpc_get_port(sap)); 1054 1058 dprintk(" max_sge : %d\n", newxprt->sc_max_sge); 1055 - dprintk(" max_sge_rd : %d\n", newxprt->sc_max_sge_rd); 1056 1059 dprintk(" sq_depth : %d\n", newxprt->sc_sq_depth); 1057 1060 dprintk(" max_requests : %d\n", newxprt->sc_max_requests); 1058 1061 dprintk(" ord : %d\n", newxprt->sc_ord); ··· 1112 1117 pr_err("svcrdma: sc_xprt still in use? (%d)\n", 1113 1118 kref_read(&xprt->xpt_ref)); 1114 1119 1115 - /* 1116 - * Destroy queued, but not processed read completions. Note 1117 - * that this cleanup has to be done before destroying the 1118 - * cm_id because the device ptr is needed to unmap the dma in 1119 - * svc_rdma_put_context. 
1120 - */ 1121 1120 while (!list_empty(&rdma->sc_read_complete_q)) { 1122 1121 struct svc_rdma_op_ctxt *ctxt; 1123 1122 ctxt = list_first_entry(&rdma->sc_read_complete_q, ··· 1119 1130 list_del(&ctxt->list); 1120 1131 svc_rdma_put_context(ctxt, 1); 1121 1132 } 1122 - 1123 - /* Destroy queued, but not processed recv completions */ 1124 1133 while (!list_empty(&rdma->sc_rq_dto_q)) { 1125 1134 struct svc_rdma_op_ctxt *ctxt; 1126 1135 ctxt = list_first_entry(&rdma->sc_rq_dto_q,