SVCRDMA: Add xprt refs to fix close/unmount crash

RDMA connection shutdown on an SMP machine can cause a kernel crash due
to the transport close path racing with the I/O tasklet.

Additional transport references were added as follows (a sketch of the
underlying get/put pattern follows the list):
- A reference when on the DTO Q to avoid having the transport
deleted while queued for I/O.
- A reference while there is a QP able to generate events.
- A reference until the DISCONNECTED event is received on the CM ID.
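
The fix is the classic take-a-reference-before-queueing idiom. What
follows is a minimal userspace model of the DTO-queue case, not the
kernel code itself: toy_xprt, xprt_get, xprt_put, dto_enqueue, and
dto_process are illustrative names standing in for svc_xprt_get(),
svc_xprt_put(), and the completion-handler/tasklet pair, and the
dto_lock serialization the kernel relies on is omitted for brevity.

        #include <stdatomic.h>
        #include <stdlib.h>

        /* Toy refcounted transport; the lock-protected list_empty()
         * test on sc_dto_q is modelled as a plain flag. */
        struct toy_xprt {
                atomic_int refcnt;
                int on_dto_q;
        };

        static void xprt_get(struct toy_xprt *x)
        {
                atomic_fetch_add(&x->refcnt, 1);
        }

        static void xprt_put(struct toy_xprt *x)
        {
                /* The final put frees the object, much as
                 * svc_xprt_put() does via its kref. */
                if (atomic_fetch_sub(&x->refcnt, 1) == 1)
                        free(x);
        }

        /* Interrupt side: queue for I/O at most once, pinning the
         * transport so a racing close cannot free it while queued. */
        static void dto_enqueue(struct toy_xprt *x)
        {
                if (!x->on_dto_q) {
                        xprt_get(x);
                        x->on_dto_q = 1;
                }
        }

        /* Tasklet side: do the I/O work, then drop the queue's pin. */
        static void dto_process(struct toy_xprt *x)
        {
                x->on_dto_q = 0;
                /* rq_cq_reap()/sq_cq_reap() would run here */
                xprt_put(x);            /* may be the final put */
        }

        int main(void)
        {
                struct toy_xprt *x = malloc(sizeof(*x));

                if (!x)
                        return 1;
                atomic_init(&x->refcnt, 1);     /* creation reference */
                x->on_dto_q = 0;
                dto_enqueue(x);         /* pinned while queued */
                xprt_put(x);            /* close path drops its ref */
                dto_process(x);         /* final put frees safely */
                return 0;
        }

Without the xprt_get() in dto_enqueue(), the put on the close path
would free the transport while it still sat on the queue, which is the
crash this patch addresses.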

Signed-off-by: Tom Tucker <tom@opengridcomputing.com>
Signed-off-by: J. Bruce Fields <bfields@citi.umich.edu>
Signed-off-by: Linus Torvalds <torvalds@linux-foundation.org>

Authored by Tom Tucker and committed by Linus Torvalds · c48cbb40 ee27a558

+62 -42
net/sunrpc/xprtrdma/svc_rdma_transport.c
···
         int flags);
 static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt);
 static void svc_rdma_release_rqst(struct svc_rqst *);
-static void rdma_destroy_xprt(struct svcxprt_rdma *xprt);
 static void dto_tasklet_func(unsigned long data);
 static void svc_rdma_detach(struct svc_xprt *xprt);
 static void svc_rdma_free(struct svc_xprt *xprt);
···
                        sq_cq_reap(xprt);
                }

+               svc_xprt_put(&xprt->sc_xprt);
                spin_lock_irqsave(&dto_lock, flags);
        }
        spin_unlock_irqrestore(&dto_lock, flags);
···
         * add it
         */
        spin_lock_irqsave(&dto_lock, flags);
-       if (list_empty(&xprt->sc_dto_q))
+       if (list_empty(&xprt->sc_dto_q)) {
+               svc_xprt_get(&xprt->sc_xprt);
                list_add_tail(&xprt->sc_dto_q, &dto_xprt_q);
+       }
        spin_unlock_irqrestore(&dto_lock, flags);

        /* Tasklet does all the work to avoid irqsave locks. */
···
         * add it
         */
        spin_lock_irqsave(&dto_lock, flags);
-       if (list_empty(&xprt->sc_dto_q))
+       if (list_empty(&xprt->sc_dto_q)) {
+               svc_xprt_get(&xprt->sc_xprt);
                list_add_tail(&xprt->sc_dto_q, &dto_xprt_q);
+       }
        spin_unlock_irqrestore(&dto_lock, flags);

        /* Tasklet does all the work to avoid irqsave locks. */
···
        switch (event->event) {
        case RDMA_CM_EVENT_ESTABLISHED:
                /* Accept complete */
+               svc_xprt_get(xprt);
                dprintk("svcrdma: Connection completed on DTO xprt=%p, "
                        "cm_id=%p\n", xprt, cma_id);
                clear_bit(RDMAXPRT_CONN_PENDING, &rdma->sc_flags);
···
        listen_id = rdma_create_id(rdma_listen_handler, cma_xprt, RDMA_PS_TCP);
        if (IS_ERR(listen_id)) {
-               rdma_destroy_xprt(cma_xprt);
+               svc_xprt_put(&cma_xprt->sc_xprt);
                dprintk("svcrdma: rdma_create_id failed = %ld\n",
                        PTR_ERR(listen_id));
                return (void *)listen_id;
        }
        ret = rdma_bind_addr(listen_id, sa);
        if (ret) {
-               rdma_destroy_xprt(cma_xprt);
                rdma_destroy_id(listen_id);
+               svc_xprt_put(&cma_xprt->sc_xprt);
                dprintk("svcrdma: rdma_bind_addr failed = %d\n", ret);
                return ERR_PTR(ret);
        }
···
        ret = rdma_listen(listen_id, RPCRDMA_LISTEN_BACKLOG);
        if (ret) {
                rdma_destroy_id(listen_id);
-               rdma_destroy_xprt(cma_xprt);
+               svc_xprt_put(&cma_xprt->sc_xprt);
                dprintk("svcrdma: rdma_listen failed = %d\n", ret);
+               return ERR_PTR(ret);
        }

        /*
···
                newxprt->sc_sq_depth = qp_attr.cap.max_send_wr;
                newxprt->sc_max_requests = qp_attr.cap.max_recv_wr;
        }
+       svc_xprt_get(&newxprt->sc_xprt);
        newxprt->sc_qp = newxprt->sc_cm_id->qp;

        /* Register all of physical memory */
···
 errout:
        dprintk("svcrdma: failure accepting new connection rc=%d.\n", ret);
+       /* Take a reference in case the DTO handler runs */
+       svc_xprt_get(&newxprt->sc_xprt);
+       if (newxprt->sc_qp && !IS_ERR(newxprt->sc_qp)) {
+               ib_destroy_qp(newxprt->sc_qp);
+               svc_xprt_put(&newxprt->sc_xprt);
+       }
        rdma_destroy_id(newxprt->sc_cm_id);
-       rdma_destroy_xprt(newxprt);
+       /* This call to put will destroy the transport */
+       svc_xprt_put(&newxprt->sc_xprt);
        return NULL;
 }
···
        rqstp->rq_xprt_ctxt = NULL;
 }

-/* Disable data ready events for this connection */
+/*
+ * When connected, an svc_xprt has at least three references:
+ *
+ * - A reference held by the QP. We still hold that here because this
+ *   code deletes the QP and puts the reference.
+ *
+ * - A reference held by the cm_id between the ESTABLISHED and
+ *   DISCONNECTED events. If the remote peer disconnected first, this
+ *   reference could be gone.
+ *
+ * - A reference held by the svc_recv code that called this function
+ *   as part of close processing.
+ *
+ * At a minimum two references should still be held.
+ */
 static void svc_rdma_detach(struct svc_xprt *xprt)
 {
        struct svcxprt_rdma *rdma =
                container_of(xprt, struct svcxprt_rdma, sc_xprt);
-       unsigned long flags;
-
        dprintk("svc: svc_rdma_detach(%p)\n", xprt);
-       /*
-        * Shutdown the connection. This will ensure we don't get any
-        * more events from the provider.
-        */
-       rdma_disconnect(rdma->sc_cm_id);
-       rdma_destroy_id(rdma->sc_cm_id);

-       /* We may already be on the DTO list */
-       spin_lock_irqsave(&dto_lock, flags);
-       if (!list_empty(&rdma->sc_dto_q))
-               list_del_init(&rdma->sc_dto_q);
-       spin_unlock_irqrestore(&dto_lock, flags);
+       /* Disconnect and flush posted WQE */
+       rdma_disconnect(rdma->sc_cm_id);
+
+       /* Destroy the QP if present (not a listener) */
+       if (rdma->sc_qp && !IS_ERR(rdma->sc_qp)) {
+               ib_destroy_qp(rdma->sc_qp);
+               svc_xprt_put(xprt);
+       }
+
+       /* Destroy the CM ID */
+       rdma_destroy_id(rdma->sc_cm_id);
 }

 static void svc_rdma_free(struct svc_xprt *xprt)
 {
        struct svcxprt_rdma *rdma = (struct svcxprt_rdma *)xprt;
        dprintk("svcrdma: svc_rdma_free(%p)\n", rdma);
-       rdma_destroy_xprt(rdma);
+       /* We should only be called from kref_put */
+       BUG_ON(atomic_read(&xprt->xpt_ref.refcount) != 0);
+       if (rdma->sc_sq_cq && !IS_ERR(rdma->sc_sq_cq))
+               ib_destroy_cq(rdma->sc_sq_cq);
+
+       if (rdma->sc_rq_cq && !IS_ERR(rdma->sc_rq_cq))
+               ib_destroy_cq(rdma->sc_rq_cq);
+
+       if (rdma->sc_phys_mr && !IS_ERR(rdma->sc_phys_mr))
+               ib_dereg_mr(rdma->sc_phys_mr);
+
+       if (rdma->sc_pd && !IS_ERR(rdma->sc_pd))
+               ib_dealloc_pd(rdma->sc_pd);
+
+       destroy_context_cache(rdma->sc_ctxt_head);
        kfree(rdma);
-}
-
-static void rdma_destroy_xprt(struct svcxprt_rdma *xprt)
-{
-       if (xprt->sc_qp && !IS_ERR(xprt->sc_qp))
-               ib_destroy_qp(xprt->sc_qp);
-
-       if (xprt->sc_sq_cq && !IS_ERR(xprt->sc_sq_cq))
-               ib_destroy_cq(xprt->sc_sq_cq);
-
-       if (xprt->sc_rq_cq && !IS_ERR(xprt->sc_rq_cq))
-               ib_destroy_cq(xprt->sc_rq_cq);
-
-       if (xprt->sc_phys_mr && !IS_ERR(xprt->sc_phys_mr))
-               ib_dereg_mr(xprt->sc_phys_mr);
-
-       if (xprt->sc_pd && !IS_ERR(xprt->sc_pd))
-               ib_dealloc_pd(xprt->sc_pd);
-
-       destroy_context_cache(xprt->sc_ctxt_head);
 }

 static int svc_rdma_has_wspace(struct svc_xprt *xprt)
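
Taken together, these hunks give a connected transport the lifetime
spelled out in the comment added above svc_rdma_detach(). One plausible
accounting for close processing is shown below; the reference counts
are illustrative, and the put performed when the DISCONNECTED event
arrives is implied by the commit message but lies outside the hunks
shown above (it may also happen earlier, if the remote peer disconnects
first):

        connected, idle                      refs >= 3 (QP, cm_id, svc_recv)
        DISCONNECTED received on the cm_id   cm_id ref dropped,  refs >= 2
        svc_rdma_detach(): ib_destroy_qp()   QP ref dropped,     refs >= 1
        close path drops its last reference  refs = 0, svc_rdma_free() runs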