Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git

net/rds: Handle ODP mr registration/unregistration

On-Demand-Paging MRs are registered using ib_reg_user_mr and
unregistered with ib_dereg_mr.

Signed-off-by: Hans Westgaard Ry <hans.westgaard.ry@oracle.com>
Acked-by: Santosh Shilimkar <santosh.shilimkar@oracle.com>
Signed-off-by: Leon Romanovsky <leonro@mellanox.com>

Authored by Hans Westgaard Ry; committed by Leon Romanovsky
2eafa174 c4c86abb

7 files changed, 243 insertions(+), 55 deletions(-)
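The commit message names the verbs pair that carries the whole change: ib_reg_user_mr() to create an On-Demand-Paging MR and ib_dereg_mr() to destroy it. For orientation before the per-file hunks, a minimal sketch of that round trip (not part of the patch; the helper and its parameters are illustrative, error handling trimmed):

	#include <rdma/ib_verbs.h>

	/* Register a length-byte ODP MR over the user range at start, then
	 * deregister it again -- the same calls rds_ib_get_mr() and
	 * rds_ib_odp_mr_worker() make below.
	 */
	static int odp_mr_roundtrip(struct ib_pd *pd, u64 start, u64 length)
	{
		struct ib_mr *mr;
		int access = IB_ACCESS_LOCAL_WRITE | IB_ACCESS_REMOTE_READ |
			     IB_ACCESS_REMOTE_WRITE | IB_ACCESS_ON_DEMAND;

		mr = ib_reg_user_mr(pd, start, length, start, access);
		if (IS_ERR(mr))
			return PTR_ERR(mr);

		return ib_dereg_mr(mr); /* may sleep; not for atomic context */
	}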
net/rds/ib.c (+7)

@@ -156,6 +156,13 @@
 	has_fmr = (device->ops.alloc_fmr && device->ops.dealloc_fmr &&
 		   device->ops.map_phys_fmr && device->ops.unmap_fmr);
 	rds_ibdev->use_fastreg = (has_fr && !has_fmr);
+	rds_ibdev->odp_capable =
+		!!(device->attrs.device_cap_flags &
+		   IB_DEVICE_ON_DEMAND_PAGING) &&
+		!!(device->attrs.odp_caps.per_transport_caps.rc_odp_caps &
+		   IB_ODP_SUPPORT_WRITE) &&
+		!!(device->attrs.odp_caps.per_transport_caps.rc_odp_caps &
+		   IB_ODP_SUPPORT_READ);
 
 	rds_ibdev->fmr_max_remaps = device->attrs.max_map_per_fmr?: 32;
 	rds_ibdev->max_1m_mrs = device->attrs.max_mr ?
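Restated as a standalone predicate (a sketch, not code from the patch): the device must advertise the global ODP capability bit plus per-transport ODP support for RC reads and writes, since RDS runs its RDMA traffic over RC connections.

	/* True when the device can fault pages on demand for RC READ/WRITE. */
	static bool rc_odp_usable(struct ib_device *device)
	{
		u32 rc_caps = device->attrs.odp_caps.per_transport_caps.rc_odp_caps;

		return (device->attrs.device_cap_flags & IB_DEVICE_ON_DEMAND_PAGING) &&
		       (rc_caps & IB_ODP_SUPPORT_WRITE) &&
		       (rc_caps & IB_ODP_SUPPORT_READ);
	}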
net/rds/ib.h (+2 -1)

@@ -247,7 +247,8 @@
 	struct ib_device	*dev;
 	struct ib_pd		*pd;
 	struct dma_pool		*rid_hdrs_pool; /* RDS headers DMA pool */
-	bool			use_fastreg;
+	u8			use_fastreg:1;
+	u8			odp_capable:1;
 
 	unsigned int		max_mrs;
 	struct rds_ib_mr_pool	*mr_1m_pool;
net/rds/ib_mr.h (+6 -1)

@@ -67,6 +67,7 @@
 
 /* This is stored as mr->r_trans_private. */
 struct rds_ib_mr {
+	struct delayed_work	work;
 	struct rds_ib_device	*device;
 	struct rds_ib_mr_pool	*pool;
 	struct rds_ib_connection *ic;
@@ -81,8 +82,10 @@
 	unsigned int		sg_len;
 	int			sg_dma_len;
 
+	u8			odp:1;
 	union {
 		struct rds_ib_fmr	fmr;
 		struct rds_ib_frmr	frmr;
+		struct ib_mr		*mr;
 	} u;
 };
@@ -122,12 +125,14 @@
 void rds_ib_destroy_mr_pool(struct rds_ib_mr_pool *);
 void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
		    struct rds_sock *rs, u32 *key_ret,
-		    struct rds_connection *conn);
+		    struct rds_connection *conn, u64 start, u64 length,
+		    int need_odp);
 void rds_ib_sync_mr(void *trans_private, int dir);
 void rds_ib_free_mr(void *trans_private, int invalidate);
 void rds_ib_flush_mrs(void);
 int rds_ib_mr_init(void);
 void rds_ib_mr_exit(void);
+u32 rds_ib_get_lkey(void *trans_private);
 
 void __rds_ib_teardown_mr(struct rds_ib_mr *);
 void rds_ib_teardown_mr(struct rds_ib_mr *);
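The new odp bit discriminates the union: u.mr is only valid for ODP registrations, while pool-backed MRs keep their state in u.fmr or u.frmr. A hypothetical guarded accessor makes the invariant explicit (sketch only; the patch's own rds_ib_get_lkey() below omits the guard because only ODP MRs ever reach it):

	/* Return the lkey of an ODP MR; pool-backed MRs keep their keys in
	 * u.fmr / u.frmr instead, so warn and bail out for those.
	 */
	static u32 rds_ib_odp_lkey(struct rds_ib_mr *ibmr)
	{
		if (WARN_ON_ONCE(!ibmr->odp))
			return 0;
		return ibmr->u.mr->lkey;
	}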
net/rds/ib_rdma.c (+74 -1)

@@ -37,8 +37,15 @@
 
 #include "rds_single_path.h"
 #include "ib_mr.h"
+#include "rds.h"
 
 struct workqueue_struct *rds_ib_mr_wq;
+struct rds_ib_dereg_odp_mr {
+	struct work_struct work;
+	struct ib_mr *mr;
+};
+
+static void rds_ib_odp_mr_worker(struct work_struct *work);
 
 static struct rds_ib_device *rds_ib_get_device(__be32 ipaddr)
 {
@@ -212,6 +219,9 @@
 {
 	struct rds_ib_mr *ibmr = trans_private;
 	struct rds_ib_device *rds_ibdev = ibmr->device;
+
+	if (ibmr->odp)
+		return;
 
 	switch (direction) {
 	case DMA_FROM_DEVICE:
@@ -482,6 +492,16 @@
 
 	rdsdebug("RDS/IB: free_mr nents %u\n", ibmr->sg_len);
 
+	if (ibmr->odp) {
+		/* An MR created and marked as use_once. We use delayed work,
+		 * because there is a chance that we are in interrupt and can't
+		 * call to ib_dereg_mr() directly.
+		 */
+		INIT_DELAYED_WORK(&ibmr->work, rds_ib_odp_mr_worker);
+		queue_delayed_work(rds_ib_mr_wq, &ibmr->work, 0);
+		return;
+	}
+
 	/* Return it to the pool's free list */
 	if (rds_ibdev->use_fastreg)
 		rds_ib_free_frmr_list(ibmr);
@@ -526,8 +546,16 @@
 	up_read(&rds_ib_devices_lock);
 }
 
+u32 rds_ib_get_lkey(void *trans_private)
+{
+	struct rds_ib_mr *ibmr = trans_private;
+
+	return ibmr->u.mr->lkey;
+}
+
 void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
		    struct rds_sock *rs, u32 *key_ret,
-		    struct rds_connection *conn)
+		    struct rds_connection *conn,
+		    u64 start, u64 length, int need_odp)
 {
 	struct rds_ib_mr *ibmr = NULL;
@@ -539,6 +567,42 @@
 	if (!rds_ibdev) {
 		ret = -ENODEV;
 		goto out;
+	}
+
+	if (need_odp == ODP_ZEROBASED || need_odp == ODP_VIRTUAL) {
+		u64 virt_addr = need_odp == ODP_ZEROBASED ? 0 : start;
+		int access_flags =
+			(IB_ACCESS_LOCAL_WRITE | IB_ACCESS_REMOTE_READ |
+			IB_ACCESS_REMOTE_WRITE | IB_ACCESS_REMOTE_ATOMIC |
+			IB_ACCESS_ON_DEMAND);
+		struct ib_mr *ib_mr;
+
+		if (!rds_ibdev->odp_capable) {
+			ret = -EOPNOTSUPP;
+			goto out;
+		}
+
+		ib_mr = ib_reg_user_mr(rds_ibdev->pd, start, length, virt_addr,
+				       access_flags);
+
+		if (IS_ERR(ib_mr)) {
+			rdsdebug("rds_ib_get_user_mr returned %d\n",
+				 IS_ERR(ib_mr));
+			ret = PTR_ERR(ib_mr);
+			goto out;
+		}
+		if (key_ret)
+			*key_ret = ib_mr->rkey;
+
+		ibmr = kzalloc(sizeof(*ibmr), GFP_KERNEL);
+		if (!ibmr) {
+			ib_dereg_mr(ib_mr);
+			ret = -ENOMEM;
+			goto out;
+		}
+		ibmr->u.mr = ib_mr;
+		ibmr->odp = 1;
+		return ibmr;
 	}
 
 	if (conn)
@@ -628,4 +692,13 @@
 void rds_ib_mr_exit(void)
 {
 	destroy_workqueue(rds_ib_mr_wq);
+}
+
+static void rds_ib_odp_mr_worker(struct work_struct *work)
+{
+	struct rds_ib_mr *ibmr;
+
+	ibmr = container_of(work, struct rds_ib_mr, work.work);
+	ib_dereg_mr(ibmr->u.mr);
+	kfree(ibmr);
 }
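The free path above is a common kernel pattern worth calling out: ib_dereg_mr() may sleep, but rds_ib_free_mr() can be called from interrupt context, so the MR embeds a delayed_work, deregistration is bounced to a workqueue, and the worker recovers its object with container_of(). A generic, self-contained sketch of the same pattern (illustrative names, not from the patch):

	#include <linux/workqueue.h>
	#include <linux/slab.h>

	struct deferred_obj {
		struct delayed_work work;
		/* resource needing a sleepable release lives here */
	};

	static void deferred_release(struct work_struct *work)
	{
		/* work points at the work_struct embedded at work.work */
		struct deferred_obj *obj =
			container_of(work, struct deferred_obj, work.work);

		/* sleepable teardown runs here, in process context */
		kfree(obj);
	}

	/* Safe to call from atomic context; the release runs later. */
	static void schedule_release(struct deferred_obj *obj)
	{
		INIT_DELAYED_WORK(&obj->work, deferred_release);
		queue_delayed_work(system_wq, &obj->work, 0);
	}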
net/rds/ib_send.c (+31 -13)

@@ -39,6 +39,7 @@
 #include "rds_single_path.h"
 #include "rds.h"
 #include "ib.h"
+#include "ib_mr.h"
 
 /*
  * Convert IB-specific error message to RDS error message and call core
@@ -635,5 +636,6 @@
 		send->s_sge[0].addr = ic->i_send_hdrs_dma[pos];
 		send->s_sge[0].length = sizeof(struct rds_header);
+		send->s_sge[0].lkey = ic->i_pd->local_dma_lkey;
 
 		memcpy(ic->i_send_hdrs[pos], &rm->m_inc.i_hdr,
 		       sizeof(struct rds_header));
@@ -650,6 +652,7 @@
 			send->s_sge[1].addr = sg_dma_address(scat);
 			send->s_sge[1].addr += rm->data.op_dmaoff;
 			send->s_sge[1].length = len;
+			send->s_sge[1].lkey = ic->i_pd->local_dma_lkey;
 
 			bytes_sent += len;
 			rm->data.op_dmaoff += len;
@@ -858,20 +861,29 @@
 	int ret;
 	int num_sge;
 	int nr_sig = 0;
+	u64 odp_addr = op->op_odp_addr;
+	u32 odp_lkey = 0;
 
 	/* map the op the first time we see it */
-	if (!op->op_mapped) {
-		op->op_count = ib_dma_map_sg(ic->i_cm_id->device,
-					     op->op_sg, op->op_nents, (op->op_write) ?
-					     DMA_TO_DEVICE : DMA_FROM_DEVICE);
-		rdsdebug("ic %p mapping op %p: %d\n", ic, op, op->op_count);
-		if (op->op_count == 0) {
-			rds_ib_stats_inc(s_ib_tx_sg_mapping_failure);
-			ret = -ENOMEM; /* XXX ? */
-			goto out;
+	if (!op->op_odp_mr) {
+		if (!op->op_mapped) {
+			op->op_count =
+				ib_dma_map_sg(ic->i_cm_id->device, op->op_sg,
+					      op->op_nents,
+					      (op->op_write) ? DMA_TO_DEVICE :
							       DMA_FROM_DEVICE);
+			rdsdebug("ic %p mapping op %p: %d\n", ic, op,
+				 op->op_count);
+			if (op->op_count == 0) {
+				rds_ib_stats_inc(s_ib_tx_sg_mapping_failure);
+				ret = -ENOMEM; /* XXX ? */
+				goto out;
+			}
+			op->op_mapped = 1;
 		}
-
-		op->op_mapped = 1;
+	} else {
+		op->op_count = op->op_nents;
+		odp_lkey = rds_ib_get_lkey(op->op_odp_mr->r_trans_private);
 	}
 
 	/*
@@ -923,13 +935,19 @@
 		for (j = 0; j < send->s_rdma_wr.wr.num_sge &&
		     scat != &op->op_sg[op->op_count]; j++) {
 			len = sg_dma_len(scat);
-			send->s_sge[j].addr = sg_dma_address(scat);
+			if (!op->op_odp_mr) {
+				send->s_sge[j].addr = sg_dma_address(scat);
+				send->s_sge[j].lkey = ic->i_pd->local_dma_lkey;
+			} else {
+				send->s_sge[j].addr = odp_addr;
+				send->s_sge[j].lkey = odp_lkey;
+			}
 			send->s_sge[j].length = len;
-			send->s_sge[j].lkey = ic->i_pd->local_dma_lkey;
 
 			sent += len;
 			rdsdebug("ic %p sent %d remote_addr %llu\n", ic, sent, remote_addr);
 
 			remote_addr += len;
+			odp_addr += len;
 			scat++;
 		}
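The send-side change reduces to which (addr, lkey) pair lands in each ib_sge: DMA-mapped scatterlist entries use bus addresses with the PD's local_dma_lkey, while an ODP MR is addressed by user virtual address plus that MR's own lkey (and rdma.c below fills in sg_dma_len() by hand for the never-DMA-mapped ODP scatterlist, so the common length read stays valid). A hypothetical helper mirroring the hunk above:

	/* Fill one ib_sge from either a DMA-mapped scatterlist entry or an
	 * ODP MR (user virtual address + that MR's lkey).
	 */
	static void fill_sge(struct ib_sge *sge, struct scatterlist *scat,
			     bool use_odp, u64 odp_addr, u32 odp_lkey,
			     u32 dma_lkey)
	{
		if (!use_odp) {
			sge->addr = sg_dma_address(scat);
			sge->lkey = dma_lkey;
		} else {
			sge->addr = odp_addr;
			sge->lkey = odp_lkey;
		}
		sge->length = sg_dma_len(scat);
	}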
net/rds/rdma.c (+112 -37)

@@ -177,12 +177,13 @@
 			  struct rds_conn_path *cp)
 {
 	struct rds_mr *mr = NULL, *found;
+	struct scatterlist *sg = NULL;
 	unsigned int nr_pages;
 	struct page **pages = NULL;
-	struct scatterlist *sg;
 	void *trans_private;
 	unsigned long flags;
 	rds_rdma_cookie_t cookie;
-	unsigned int nents;
+	unsigned int nents = 0;
+	int need_odp = 0;
 	long i;
 	int ret;
@@ -194,5 +195,20 @@
 
 	if (!rs->rs_transport->get_mr) {
 		ret = -EOPNOTSUPP;
+		goto out;
+	}
+
+	/* If the combination of the addr and size requested for this memory
+	 * region causes an integer overflow, return error.
+	 */
+	if (((args->vec.addr + args->vec.bytes) < args->vec.addr) ||
+	    PAGE_ALIGN(args->vec.addr + args->vec.bytes) <
+	    (args->vec.addr + args->vec.bytes)) {
+		ret = -EINVAL;
+		goto out;
+	}
+
+	if (!can_do_mlock()) {
+		ret = -EPERM;
 		goto out;
 	}
@@ -250,36 +266,44 @@
 	 * the zero page.
 	 */
 	ret = rds_pin_pages(args->vec.addr, nr_pages, pages, 1);
-	if (ret < 0)
+	if (ret == -EOPNOTSUPP) {
+		need_odp = 1;
+	} else if (ret <= 0) {
 		goto out;
+	} else {
+		nents = ret;
+		sg = kcalloc(nents, sizeof(*sg), GFP_KERNEL);
+		if (!sg) {
+			ret = -ENOMEM;
+			goto out;
+		}
+		WARN_ON(!nents);
+		sg_init_table(sg, nents);
 
-	nents = ret;
-	sg = kcalloc(nents, sizeof(*sg), GFP_KERNEL);
-	if (!sg) {
-		ret = -ENOMEM;
-		goto out;
+		/* Stick all pages into the scatterlist */
+		for (i = 0 ; i < nents; i++)
+			sg_set_page(&sg[i], pages[i], PAGE_SIZE, 0);
+
+		rdsdebug("RDS: trans_private nents is %u\n", nents);
 	}
-	WARN_ON(!nents);
-	sg_init_table(sg, nents);
-
-	/* Stick all pages into the scatterlist */
-	for (i = 0 ; i < nents; i++)
-		sg_set_page(&sg[i], pages[i], PAGE_SIZE, 0);
-
-	rdsdebug("RDS: trans_private nents is %u\n", nents);
-
 	/* Obtain a transport specific MR. If this succeeds, the
 	 * s/g list is now owned by the MR.
 	 * Note that dma_map() implies that pending writes are
 	 * flushed to RAM, so no dma_sync is needed here. */
-	trans_private = rs->rs_transport->get_mr(sg, nents, rs,
-						 &mr->r_key,
-						 cp ? cp->cp_conn : NULL);
+	trans_private = rs->rs_transport->get_mr(
+		sg, nents, rs, &mr->r_key, cp ? cp->cp_conn : NULL,
+		args->vec.addr, args->vec.bytes,
+		need_odp ? ODP_ZEROBASED : ODP_NOT_NEEDED);
 
 	if (IS_ERR(trans_private)) {
-		for (i = 0 ; i < nents; i++)
-			put_page(sg_page(&sg[i]));
-		kfree(sg);
+		/* In ODP case, we don't GUP pages, so don't need
+		 * to release anything.
+		 */
+		if (!need_odp) {
+			for (i = 0 ; i < nents; i++)
+				put_page(sg_page(&sg[i]));
+			kfree(sg);
+		}
 		ret = PTR_ERR(trans_private);
 		goto out;
 	}
@@ -293,7 +317,11 @@
 	 * map page aligned regions. So we keep the offset, and build
 	 * a 64bit cookie containing <R_Key, offset> and pass that
 	 * around.
 	 */
-	cookie = rds_rdma_make_cookie(mr->r_key, args->vec.addr & ~PAGE_MASK);
+	if (need_odp)
+		cookie = rds_rdma_make_cookie(mr->r_key, 0);
+	else
+		cookie = rds_rdma_make_cookie(mr->r_key,
+					      args->vec.addr & ~PAGE_MASK);
 	if (cookie_ret)
 		*cookie_ret = cookie;
@@ -458,20 +486,24 @@
 {
 	unsigned int i;
 
-	for (i = 0; i < ro->op_nents; i++) {
-		struct page *page = sg_page(&ro->op_sg[i]);
+	if (ro->op_odp_mr) {
+		rds_mr_put(ro->op_odp_mr);
+	} else {
+		for (i = 0; i < ro->op_nents; i++) {
+			struct page *page = sg_page(&ro->op_sg[i]);
 
-		/* Mark page dirty if it was possibly modified, which
-		 * is the case for a RDMA_READ which copies from remote
-		 * to local memory */
-		if (!ro->op_write) {
-			WARN_ON(!page->mapping && irqs_disabled());
-			set_page_dirty(page);
+			/* Mark page dirty if it was possibly modified, which
+			 * is the case for a RDMA_READ which copies from remote
+			 * to local memory
+			 */
+			if (!ro->op_write)
+				set_page_dirty(page);
+			put_page(page);
 		}
-		put_page(page);
 	}
 
 	kfree(ro->op_notifier);
 	ro->op_notifier = NULL;
 	ro->op_active = 0;
+	ro->op_odp_mr = NULL;
 }
@@ -583,6 +615,7 @@
 	struct rds_iovec *iovs;
 	unsigned int i, j;
 	int ret = 0;
+	bool odp_supported = true;
 
 	if (cmsg->cmsg_len < CMSG_LEN(sizeof(struct rds_rdma_args))
	    || rm->rdma.op_active)
@@ -604,5 +637,8 @@
 		ret = -EINVAL;
 		goto out_ret;
 	}
+	/* odp-mr is not supported for multiple requests within one message */
+	if (args->nr_local != 1)
+		odp_supported = false;
 
 	iovs = vec->iov;
@@ -625,6 +661,8 @@
 	op->op_silent = !!(args->flags & RDS_RDMA_SILENT);
 	op->op_active = 1;
 	op->op_recverr = rs->rs_recverr;
+	op->op_odp_mr = NULL;
+
 	WARN_ON(!nr_pages);
 	op->op_sg = rds_message_alloc_sgs(rm, nr_pages, &ret);
 	if (!op->op_sg)
@@ -674,10 +712,44 @@
 	 * If it's a READ operation, we need to pin the pages for writing.
 	 */
 	ret = rds_pin_pages(iov->addr, nr, pages, !op->op_write);
-	if (ret < 0)
+	if ((!odp_supported && ret <= 0) ||
+	    (odp_supported && ret <= 0 && ret != -EOPNOTSUPP))
 		goto out_pages;
-	else
-		ret = 0;
+
+	if (ret == -EOPNOTSUPP) {
+		struct rds_mr *local_odp_mr;
+
+		if (!rs->rs_transport->get_mr) {
+			ret = -EOPNOTSUPP;
+			goto out_pages;
+		}
+		local_odp_mr =
+			kzalloc(sizeof(*local_odp_mr), GFP_KERNEL);
+		if (!local_odp_mr) {
+			ret = -ENOMEM;
+			goto out_pages;
+		}
+		RB_CLEAR_NODE(&local_odp_mr->r_rb_node);
+		refcount_set(&local_odp_mr->r_refcount, 1);
+		local_odp_mr->r_trans = rs->rs_transport;
+		local_odp_mr->r_sock = rs;
+		local_odp_mr->r_trans_private =
+			rs->rs_transport->get_mr(
+				NULL, 0, rs, &local_odp_mr->r_key, NULL,
+				iov->addr, iov->bytes, ODP_VIRTUAL);
+		if (IS_ERR(local_odp_mr->r_trans_private)) {
+			ret = IS_ERR(local_odp_mr->r_trans_private);
+			rdsdebug("get_mr ret %d %p\"", ret,
+				 local_odp_mr->r_trans_private);
+			kfree(local_odp_mr);
+			ret = -EOPNOTSUPP;
+			goto out_pages;
+		}
+		rdsdebug("Need odp; local_odp_mr %p trans_private %p\n",
+			 local_odp_mr, local_odp_mr->r_trans_private);
+		op->op_odp_mr = local_odp_mr;
+		op->op_odp_addr = iov->addr;
+	}
 
 	rdsdebug("RDS: nr_bytes %u nr %u iov->bytes %llu iov->addr %llx\n",
 		 nr_bytes, nr, iov->bytes, iov->addr);
@@ -693,5 +765,6 @@
 				    min_t(unsigned int, iov->bytes, PAGE_SIZE - offset),
 				    offset);
 
+			sg_dma_len(sg) = sg->length;
 			rdsdebug("RDS: sg->offset %x sg->len %x iov->addr %llx iov->bytes %llu\n",
 				 sg->offset, sg->length, iov->addr, iov->bytes);
@@ -711,6 +784,7 @@
 		goto out_pages;
 	}
 	op->op_bytes = nr_bytes;
+	ret = 0;
 
 out_pages:
 	kfree(pages);
@@ -757,7 +831,8 @@
 	spin_unlock_irqrestore(&rs->rs_rdma_lock, flags);
 
 	if (mr) {
-		mr->r_trans->sync_mr(mr->r_trans_private, DMA_TO_DEVICE);
+		mr->r_trans->sync_mr(mr->r_trans_private,
+				     DMA_TO_DEVICE);
 		rm->rdma.op_rdma_mr = mr;
 	}
 	return err;
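End to end, the fallback stays invisible to userspace: an application still registers memory with the RDS_GET_MR socket option and receives a cookie, whether the kernel pinned the pages or, as of this patch, fell back to an ODP MR. Roughly (an illustrative userspace fragment, not from the patch; error handling omitted):

	#include <stdint.h>
	#include <sys/socket.h>
	#include <linux/rds.h>

	/* Register buf for RDMA on an RDS socket; the kernel chooses pinned
	 * or ODP registration internally and fills in the rkey cookie.
	 */
	static uint64_t rds_register(int sock, void *buf, size_t len)
	{
		uint64_t cookie = 0;
		struct rds_get_mr_args mr_args = {
			.vec = { .addr = (uintptr_t)buf, .bytes = len },
			.cookie_addr = (uintptr_t)&cookie,
			.flags = 0,
		};

		setsockopt(sock, SOL_RDS, RDS_GET_MR, &mr_args, sizeof(mr_args));
		return cookie;
	}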
net/rds/rds.h (+11 -2)

@@ -40,7 +40,6 @@
 #ifdef ATOMIC64_INIT
 #define KERNEL_HAS_ATOMIC64
 #endif
-
 #ifdef RDS_DEBUG
 #define rdsdebug(fmt, args...) pr_debug("%s(): " fmt, __func__ , ##args)
 #else
@@ -478,6 +477,9 @@
 		struct rds_notifier	*op_notifier;
 
 		struct rds_mr		*op_rdma_mr;
+
+		u64			op_odp_addr;
+		struct rds_mr		*op_odp_mr;
 	} rdma;
 	struct rm_data_op {
 		unsigned int		op_active:1;
@@ -573,7 +575,8 @@
 	void (*exit)(void);
 	void *(*get_mr)(struct scatterlist *sg, unsigned long nr_sg,
			struct rds_sock *rs, u32 *key_ret,
-			struct rds_connection *conn);
+			struct rds_connection *conn,
+			u64 start, u64 length, int need_odp);
 	void (*sync_mr)(void *trans_private, int direction);
 	void (*free_mr)(void *trans_private, int invalidate);
 	void (*flush_mrs)(void);
@@ -955,6 +958,12 @@
 	return !check_net(rds_conn_net(conn)) ||
	       (conn->c_trans->t_unloading && conn->c_trans->t_unloading(conn));
 }
+
+enum {
+	ODP_NOT_NEEDED,
+	ODP_ZEROBASED,
+	ODP_VIRTUAL
+};
 
 /* stats.c */
 DECLARE_PER_CPU_SHARED_ALIGNED(struct rds_statistics, rds_stats);
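The three-valued enum exists because the two call sites register differently: RDS_GET_MR-style MRs are zero-based (matching the zero cookie offset in rdma.c), while the local MR built in rds_cmsg_rdma_args() keeps virtual addressing. A sketch of how need_odp maps to the IOVA handed to ib_reg_user_mr(), following rds_ib_get_mr() above:

	/* IOVA per registration flavour (mirrors rds_ib_get_mr()). */
	static u64 odp_iova(int need_odp, u64 start)
	{
		if (need_odp == ODP_ZEROBASED)
			return 0;	/* peer addresses the MR by plain offset */
		return start;		/* ODP_VIRTUAL: keep user virtual addresses */
	}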