Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux

Merge branch 'for-2.6.26' of git://linux-nfs.org/~bfields/linux

* 'for-2.6.26' of git://linux-nfs.org/~bfields/linux: (25 commits)
svcrdma: Verify read-list fits within RPCSVC_MAXPAGES
svcrdma: Change svc_rdma_send_error return type to void
svcrdma: Copy transport address and arm CQ before calling rdma_accept
svcrdma: Set rqstp transport address in rdma_read_complete function
svcrdma: Use ib verbs version of dma_unmap
svcrdma: Cleanup queued, but unprocessed I/O in svc_rdma_free
svcrdma: Move the QP and cm_id destruction to svc_rdma_free
svcrdma: Add reference for each SQ/RQ WR
svcrdma: Move destroy to kernel thread
svcrdma: Shrink scope of spinlock on RQ CQ
svcrdma: Use standard Linux lists for context cache
svcrdma: Simplify RDMA_READ deferral buffer management
svcrdma: Remove unused READ_DONE context flags bit
svcrdma: Return error from rdma_read_xdr so caller knows to free context
svcrdma: Fix error handling during listening endpoint creation
svcrdma: Free context on post_recv error in send_reply
svcrdma: Free context on ib_post_recv error
svcrdma: Add put of connection ESTABLISHED reference in rdma_cma_handler
svcrdma: Fix return value in svc_rdma_send
svcrdma: Fix race with dto_tasklet in svc_rdma_send
...

+226 -218
+1 -1
fs/nfsd/nfs4callback.c
··· 419 419 out_release_client: 420 420 rpc_shutdown_client(client); 421 421 out_err: 422 - put_nfs4_client(clp); 423 422 dprintk("NFSD: warning: no callback path to client %.*s\n", 424 423 (int)clp->cl_name.len, clp->cl_name.data); 424 + put_nfs4_client(clp); 425 425 return status; 426 426 } 427 427
+7 -5
include/linux/sunrpc/svc_rdma.h
··· 71 71 * completes. 72 72 */ 73 73 struct svc_rdma_op_ctxt { 74 - struct svc_rdma_op_ctxt *next; 74 + struct svc_rdma_op_ctxt *read_hdr; 75 + struct list_head free_list; 75 76 struct xdr_buf arg; 76 77 struct list_head dto_q; 77 78 enum ib_wr_opcode wr_op; ··· 86 85 struct page *pages[RPCSVC_MAXPAGES]; 87 86 }; 88 87 89 - #define RDMACTXT_F_READ_DONE 1 90 88 #define RDMACTXT_F_LAST_CTXT 2 91 89 92 90 struct svcxprt_rdma { ··· 104 104 105 105 struct ib_pd *sc_pd; 106 106 107 - struct svc_rdma_op_ctxt *sc_ctxt_head; 107 + atomic_t sc_ctxt_used; 108 + struct list_head sc_ctxt_free; 108 109 int sc_ctxt_cnt; 109 110 int sc_ctxt_bump; 110 111 int sc_ctxt_max; ··· 124 123 struct list_head sc_dto_q; /* DTO tasklet I/O pending Q */ 125 124 struct list_head sc_read_complete_q; 126 125 spinlock_t sc_read_complete_lock; 126 + struct work_struct sc_work; 127 127 }; 128 128 /* sc_flags */ 129 129 #define RDMAXPRT_RQ_PENDING 1 ··· 166 164 167 165 /* svc_rdma_transport.c */ 168 166 extern int svc_rdma_send(struct svcxprt_rdma *, struct ib_send_wr *); 169 - extern int svc_rdma_send_error(struct svcxprt_rdma *, struct rpcrdma_msg *, 170 - enum rpcrdma_errcode); 167 + extern void svc_rdma_send_error(struct svcxprt_rdma *, struct rpcrdma_msg *, 168 + enum rpcrdma_errcode); 171 169 struct page *svc_rdma_get_page(void); 172 170 extern int svc_rdma_post_recv(struct svcxprt_rdma *); 173 171 extern int svc_rdma_create_listen(struct svc_serv *, int, struct sockaddr *);
-23
net/sunrpc/svc_xprt.c
··· 6 6 7 7 #include <linux/sched.h> 8 8 #include <linux/errno.h> 9 - #include <linux/fcntl.h> 10 - #include <linux/net.h> 11 - #include <linux/in.h> 12 - #include <linux/inet.h> 13 - #include <linux/udp.h> 14 - #include <linux/tcp.h> 15 - #include <linux/unistd.h> 16 - #include <linux/slab.h> 17 - #include <linux/netdevice.h> 18 - #include <linux/skbuff.h> 19 - #include <linux/file.h> 20 9 #include <linux/freezer.h> 21 10 #include <linux/kthread.h> 22 11 #include <net/sock.h> 23 - #include <net/checksum.h> 24 - #include <net/ip.h> 25 - #include <net/ipv6.h> 26 - #include <net/tcp_states.h> 27 - #include <linux/uaccess.h> 28 - #include <asm/ioctls.h> 29 - 30 - #include <linux/sunrpc/types.h> 31 - #include <linux/sunrpc/clnt.h> 32 - #include <linux/sunrpc/xdr.h> 33 12 #include <linux/sunrpc/stats.h> 34 13 #include <linux/sunrpc/svc_xprt.h> 35 14 ··· 274 295 275 296 if (!(xprt->xpt_flags & 276 297 ((1<<XPT_CONN)|(1<<XPT_DATA)|(1<<XPT_CLOSE)|(1<<XPT_DEFERRED)))) 277 - return; 278 - if (test_bit(XPT_DEAD, &xprt->xpt_flags)) 279 298 return; 280 299 281 300 cpu = get_cpu();
+2 -2
net/sunrpc/svcauth_unix.c
··· 278 278 dom = im->m_client->h.name; 279 279 280 280 if (ipv6_addr_v4mapped(&addr)) { 281 - seq_printf(m, "%s" NIPQUAD_FMT "%s\n", 281 + seq_printf(m, "%s " NIPQUAD_FMT " %s\n", 282 282 im->m_class, 283 283 ntohl(addr.s6_addr32[3]) >> 24 & 0xff, 284 284 ntohl(addr.s6_addr32[3]) >> 16 & 0xff, ··· 286 286 ntohl(addr.s6_addr32[3]) >> 0 & 0xff, 287 287 dom); 288 288 } else { 289 - seq_printf(m, "%s" NIP6_FMT "%s\n", 289 + seq_printf(m, "%s " NIP6_FMT " %s\n", 290 290 im->m_class, NIP6(addr), dom); 291 291 } 292 292 return 0;
+31 -71
net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
··· 260 260 * On our side, we need to read into a pagelist. The first page immediately 261 261 * follows the RPC header. 262 262 * 263 - * This function returns 1 to indicate success. The data is not yet in 263 + * This function returns: 264 + * 0 - No error and no read-list found. 265 + * 266 + * 1 - Successful read-list processing. The data is not yet in 264 267 * the pagelist and therefore the RPC request must be deferred. The 265 268 * I/O completion will enqueue the transport again and 266 269 * svc_rdma_recvfrom will complete the request. 270 + * 271 + * <0 - Error processing/posting read-list. 267 272 * 268 273 * NOTE: The ctxt must not be touched after the last WR has been posted 269 274 * because the I/O completion processing may occur on another ··· 289 284 u64 sgl_offset; 290 285 struct rpcrdma_read_chunk *ch; 291 286 struct svc_rdma_op_ctxt *ctxt = NULL; 292 - struct svc_rdma_op_ctxt *head; 293 287 struct svc_rdma_op_ctxt *tmp_sge_ctxt; 294 288 struct svc_rdma_op_ctxt *tmp_ch_ctxt; 295 289 struct chunk_sge *ch_sge_ary; ··· 306 302 ch_sge_ary = (struct chunk_sge *)tmp_ch_ctxt->sge; 307 303 308 304 svc_rdma_rcl_chunk_counts(ch, &ch_count, &byte_count); 305 + if (ch_count > RPCSVC_MAXPAGES) 306 + return -EINVAL; 309 307 sge_count = rdma_rcl_to_sge(xprt, rqstp, hdr_ctxt, rmsgp, 310 308 sge, ch_sge_ary, 311 309 ch_count, byte_count); 312 - head = svc_rdma_get_context(xprt); 313 310 sgl_offset = 0; 314 311 ch_no = 0; 315 312 316 313 for (ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0]; 317 314 ch->rc_discrim != 0; ch++, ch_no++) { 318 315 next_sge: 319 - if (!ctxt) 320 - ctxt = head; 321 - else { 322 - ctxt->next = svc_rdma_get_context(xprt); 323 - ctxt = ctxt->next; 324 - } 325 - ctxt->next = NULL; 316 + ctxt = svc_rdma_get_context(xprt); 326 317 ctxt->direction = DMA_FROM_DEVICE; 327 - clear_bit(RDMACTXT_F_READ_DONE, &ctxt->flags); 328 318 clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags); 329 319 330 320 /* Prepare READ WR */ ··· 345 347 * the client and the RPC needs to be enqueued. 346 348 */ 347 349 set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags); 348 - ctxt->next = hdr_ctxt; 349 - hdr_ctxt->next = head; 350 + ctxt->read_hdr = hdr_ctxt; 350 351 } 351 352 /* Post the read */ 352 353 err = svc_rdma_send(xprt, &read_wr); 353 354 if (err) { 354 - printk(KERN_ERR "svcrdma: Error posting send = %d\n", 355 + printk(KERN_ERR "svcrdma: Error %d posting RDMA_READ\n", 355 356 err); 356 - /* 357 - * Break the circular list so free knows when 358 - * to stop if the error happened to occur on 359 - * the last read 360 - */ 361 - ctxt->next = NULL; 357 + set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags); 358 + svc_rdma_put_context(ctxt, 0); 362 359 goto out; 363 360 } 364 361 atomic_inc(&rdma_stat_read); ··· 364 371 goto next_sge; 365 372 } 366 373 sgl_offset = 0; 367 - err = 0; 374 + err = 1; 368 375 } 369 376 370 377 out: ··· 382 389 while (rqstp->rq_resused) 383 390 rqstp->rq_respages[--rqstp->rq_resused] = NULL; 384 391 385 - if (err) { 386 - printk(KERN_ERR "svcrdma : RDMA_READ error = %d\n", err); 387 - set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags); 388 - /* Free the linked list of read contexts */ 389 - while (head != NULL) { 390 - ctxt = head->next; 391 - svc_rdma_put_context(head, 1); 392 - head = ctxt; 393 - } 394 - return 0; 395 - } 396 - 397 - return 1; 392 + return err; 398 393 } 399 394 400 395 static int rdma_read_complete(struct svc_rqst *rqstp, 401 - struct svc_rdma_op_ctxt *data) 396 + struct svc_rdma_op_ctxt *head) 402 397 { 403 - struct svc_rdma_op_ctxt *head = data->next; 404 398 int page_no; 405 399 int ret; 406 400 ··· 413 433 rqstp->rq_arg.len = head->arg.len; 414 434 rqstp->rq_arg.buflen = head->arg.buflen; 415 435 436 + /* Free the context */ 437 + svc_rdma_put_context(head, 0); 438 + 416 439 /* XXX: What should this be? */ 417 440 rqstp->rq_prot = IPPROTO_MAX; 418 - 419 - /* 420 - * Free the contexts we used to build the RDMA_READ. We have 421 - * to be careful here because the context list uses the same 422 - * next pointer used to chain the contexts associated with the 423 - * RDMA_READ 424 - */ 425 - data->next = NULL; /* terminate circular list */ 426 - do { 427 - data = head->next; 428 - svc_rdma_put_context(head, 0); 429 - head = data; 430 - } while (head != NULL); 441 + svc_xprt_copy_addrs(rqstp, rqstp->rq_xprt); 431 442 432 443 ret = rqstp->rq_arg.head[0].iov_len 433 444 + rqstp->rq_arg.page_len ··· 428 457 ret, rqstp->rq_arg.len, rqstp->rq_arg.head[0].iov_base, 429 458 rqstp->rq_arg.head[0].iov_len); 430 459 431 - /* Indicate that we've consumed an RQ credit */ 432 - rqstp->rq_xprt_ctxt = rqstp->rq_xprt; 433 460 svc_xprt_received(rqstp->rq_xprt); 434 461 return ret; 435 462 } ··· 448 479 int len; 449 480 450 481 dprintk("svcrdma: rqstp=%p\n", rqstp); 451 - 452 - /* 453 - * The rq_xprt_ctxt indicates if we've consumed an RQ credit 454 - * or not. It is used in the rdma xpo_release_rqst function to 455 - * determine whether or not to return an RQ WQE to the RQ. 456 - */ 457 - rqstp->rq_xprt_ctxt = NULL; 458 482 459 483 spin_lock_bh(&rdma_xprt->sc_read_complete_lock); 460 484 if (!list_empty(&rdma_xprt->sc_read_complete_q)) { ··· 499 537 /* If the request is invalid, reply with an error */ 500 538 if (len < 0) { 501 539 if (len == -ENOSYS) 502 - (void)svc_rdma_send_error(rdma_xprt, rmsgp, ERR_VERS); 540 + svc_rdma_send_error(rdma_xprt, rmsgp, ERR_VERS); 503 541 goto close_out; 504 542 } 505 543 506 - /* Read read-list data. If we would need to wait, defer 507 - * it. Not that in this case, we don't return the RQ credit 508 - * until after the read completes. 509 - */ 510 - if (rdma_read_xdr(rdma_xprt, rmsgp, rqstp, ctxt)) { 544 + /* Read read-list data. */ 545 + ret = rdma_read_xdr(rdma_xprt, rmsgp, rqstp, ctxt); 546 + if (ret > 0) { 547 + /* read-list posted, defer until data received from client. */ 511 548 svc_xprt_received(xprt); 512 549 return 0; 513 550 } 514 - 515 - /* Indicate we've consumed an RQ credit */ 516 - rqstp->rq_xprt_ctxt = rqstp->rq_xprt; 551 + if (ret < 0) { 552 + /* Post of read-list failed, free context. */ 553 + svc_rdma_put_context(ctxt, 1); 554 + return 0; 555 + } 517 556 518 557 ret = rqstp->rq_arg.head[0].iov_len 519 558 + rqstp->rq_arg.page_len ··· 532 569 return ret; 533 570 534 571 close_out: 535 - if (ctxt) { 572 + if (ctxt) 536 573 svc_rdma_put_context(ctxt, 1); 537 - /* Indicate we've consumed an RQ credit */ 538 - rqstp->rq_xprt_ctxt = rqstp->rq_xprt; 539 - } 540 574 dprintk("svcrdma: transport %p is closing\n", xprt); 541 575 /* 542 576 * Set the close bit and enqueue it. svc_recv will see the
+11
net/sunrpc/xprtrdma/svc_rdma_sendto.c
··· 389 389 int page_no; 390 390 int ret; 391 391 392 + /* Post a recv buffer to handle another request. */ 393 + ret = svc_rdma_post_recv(rdma); 394 + if (ret) { 395 + printk(KERN_INFO 396 + "svcrdma: could not post a receive buffer, err=%d." 397 + "Closing transport %p.\n", ret, rdma); 398 + set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags); 399 + svc_rdma_put_context(ctxt, 0); 400 + return -ENOTCONN; 401 + } 402 + 392 403 /* Prepare the context */ 393 404 ctxt->pages[0] = page; 394 405 ctxt->count = 1;
+174 -116
net/sunrpc/xprtrdma/svc_rdma_transport.c
··· 103 103 spin_lock_bh(&xprt->sc_ctxt_lock); 104 104 if (ctxt) { 105 105 at_least_one = 1; 106 - ctxt->next = xprt->sc_ctxt_head; 107 - xprt->sc_ctxt_head = ctxt; 106 + INIT_LIST_HEAD(&ctxt->free_list); 107 + list_add(&ctxt->free_list, &xprt->sc_ctxt_free); 108 108 } else { 109 109 /* kmalloc failed...give up for now */ 110 110 xprt->sc_ctxt_cnt--; ··· 123 123 124 124 while (1) { 125 125 spin_lock_bh(&xprt->sc_ctxt_lock); 126 - if (unlikely(xprt->sc_ctxt_head == NULL)) { 126 + if (unlikely(list_empty(&xprt->sc_ctxt_free))) { 127 127 /* Try to bump my cache. */ 128 128 spin_unlock_bh(&xprt->sc_ctxt_lock); 129 129 ··· 136 136 schedule_timeout_uninterruptible(msecs_to_jiffies(500)); 137 137 continue; 138 138 } 139 - ctxt = xprt->sc_ctxt_head; 140 - xprt->sc_ctxt_head = ctxt->next; 139 + ctxt = list_entry(xprt->sc_ctxt_free.next, 140 + struct svc_rdma_op_ctxt, 141 + free_list); 142 + list_del_init(&ctxt->free_list); 141 143 spin_unlock_bh(&xprt->sc_ctxt_lock); 142 144 ctxt->xprt = xprt; 143 145 INIT_LIST_HEAD(&ctxt->dto_q); 144 146 ctxt->count = 0; 147 + atomic_inc(&xprt->sc_ctxt_used); 145 148 break; 146 149 } 147 150 return ctxt; ··· 162 159 put_page(ctxt->pages[i]); 163 160 164 161 for (i = 0; i < ctxt->count; i++) 165 - dma_unmap_single(xprt->sc_cm_id->device->dma_device, 166 - ctxt->sge[i].addr, 167 - ctxt->sge[i].length, 168 - ctxt->direction); 162 + ib_dma_unmap_single(xprt->sc_cm_id->device, 163 + ctxt->sge[i].addr, 164 + ctxt->sge[i].length, 165 + ctxt->direction); 166 + 169 167 spin_lock_bh(&xprt->sc_ctxt_lock); 170 - ctxt->next = xprt->sc_ctxt_head; 171 - xprt->sc_ctxt_head = ctxt; 168 + list_add(&ctxt->free_list, &xprt->sc_ctxt_free); 172 169 spin_unlock_bh(&xprt->sc_ctxt_lock); 170 + atomic_dec(&xprt->sc_ctxt_used); 173 171 } 174 172 175 173 /* ib_cq event handler */ ··· 232 228 list_del_init(&xprt->sc_dto_q); 233 229 spin_unlock_irqrestore(&dto_lock, flags); 234 230 235 - if (test_and_clear_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags)) { 236 - ib_req_notify_cq(xprt->sc_rq_cq, IB_CQ_NEXT_COMP); 237 - rq_cq_reap(xprt); 238 - set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags); 239 - /* 240 - * If data arrived before established event, 241 - * don't enqueue. This defers RPC I/O until the 242 - * RDMA connection is complete. 243 - */ 244 - if (!test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags)) 245 - svc_xprt_enqueue(&xprt->sc_xprt); 246 - } 247 - 248 - if (test_and_clear_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags)) { 249 - ib_req_notify_cq(xprt->sc_sq_cq, IB_CQ_NEXT_COMP); 250 - sq_cq_reap(xprt); 251 - } 231 + rq_cq_reap(xprt); 232 + sq_cq_reap(xprt); 252 233 253 234 svc_xprt_put(&xprt->sc_xprt); 254 235 spin_lock_irqsave(&dto_lock, flags); ··· 252 263 struct svcxprt_rdma *xprt = cq_context; 253 264 unsigned long flags; 254 265 266 + /* Guard against unconditional flush call for destroyed QP */ 267 + if (atomic_read(&xprt->sc_xprt.xpt_ref.refcount)==0) 268 + return; 269 + 255 270 /* 256 271 * Set the bit regardless of whether or not it's on the list 257 272 * because it may be on the list already due to an SQ 258 273 * completion. 259 - */ 274 + */ 260 275 set_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags); 261 276 262 277 /* ··· 283 290 * 284 291 * Take all completing WC off the CQE and enqueue the associated DTO 285 292 * context on the dto_q for the transport. 293 + * 294 + * Note that caller must hold a transport reference. 286 295 */ 287 296 static void rq_cq_reap(struct svcxprt_rdma *xprt) 288 297 { ··· 292 297 struct ib_wc wc; 293 298 struct svc_rdma_op_ctxt *ctxt = NULL; 294 299 300 + if (!test_and_clear_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags)) 301 + return; 302 + 303 + ib_req_notify_cq(xprt->sc_rq_cq, IB_CQ_NEXT_COMP); 295 304 atomic_inc(&rdma_stat_rq_poll); 296 305 297 - spin_lock_bh(&xprt->sc_rq_dto_lock); 298 306 while ((ret = ib_poll_cq(xprt->sc_rq_cq, 1, &wc)) > 0) { 299 307 ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id; 300 308 ctxt->wc_status = wc.status; 301 309 ctxt->byte_len = wc.byte_len; 302 310 if (wc.status != IB_WC_SUCCESS) { 303 311 /* Close the transport */ 312 + dprintk("svcrdma: transport closing putting ctxt %p\n", ctxt); 304 313 set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags); 305 314 svc_rdma_put_context(ctxt, 1); 315 + svc_xprt_put(&xprt->sc_xprt); 306 316 continue; 307 317 } 318 + spin_lock_bh(&xprt->sc_rq_dto_lock); 308 319 list_add_tail(&ctxt->dto_q, &xprt->sc_rq_dto_q); 320 + spin_unlock_bh(&xprt->sc_rq_dto_lock); 321 + svc_xprt_put(&xprt->sc_xprt); 309 322 } 310 - spin_unlock_bh(&xprt->sc_rq_dto_lock); 311 323 312 324 if (ctxt) 313 325 atomic_inc(&rdma_stat_rq_prod); 326 + 327 + set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags); 328 + /* 329 + * If data arrived before established event, 330 + * don't enqueue. This defers RPC I/O until the 331 + * RDMA connection is complete. 332 + */ 333 + if (!test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags)) 334 + svc_xprt_enqueue(&xprt->sc_xprt); 314 335 } 315 336 316 337 /* 317 338 * Send Queue Completion Handler - potentially called on interrupt context. 339 + * 340 + * Note that caller must hold a transport reference. 318 341 */ 319 342 static void sq_cq_reap(struct svcxprt_rdma *xprt) 320 343 { ··· 341 328 struct ib_cq *cq = xprt->sc_sq_cq; 342 329 int ret; 343 330 331 + 332 + if (!test_and_clear_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags)) 333 + return; 334 + 335 + ib_req_notify_cq(xprt->sc_sq_cq, IB_CQ_NEXT_COMP); 344 336 atomic_inc(&rdma_stat_sq_poll); 345 337 while ((ret = ib_poll_cq(cq, 1, &wc)) > 0) { 346 338 ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id; ··· 367 349 368 350 case IB_WR_RDMA_READ: 369 351 if (test_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags)) { 352 + struct svc_rdma_op_ctxt *read_hdr = ctxt->read_hdr; 353 + BUG_ON(!read_hdr); 370 354 set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags); 371 - set_bit(RDMACTXT_F_READ_DONE, &ctxt->flags); 372 355 spin_lock_bh(&xprt->sc_read_complete_lock); 373 - list_add_tail(&ctxt->dto_q, 356 + list_add_tail(&read_hdr->dto_q, 374 357 &xprt->sc_read_complete_q); 375 358 spin_unlock_bh(&xprt->sc_read_complete_lock); 376 359 svc_xprt_enqueue(&xprt->sc_xprt); 377 360 } 361 + svc_rdma_put_context(ctxt, 0); 378 362 break; 379 363 380 364 default: ··· 385 365 wc.opcode, wc.status); 386 366 break; 387 367 } 368 + svc_xprt_put(&xprt->sc_xprt); 388 369 } 389 370 390 371 if (ctxt) ··· 397 376 struct svcxprt_rdma *xprt = cq_context; 398 377 unsigned long flags; 399 378 379 + /* Guard against unconditional flush call for destroyed QP */ 380 + if (atomic_read(&xprt->sc_xprt.xpt_ref.refcount)==0) 381 + return; 382 + 400 383 /* 401 384 * Set the bit regardless of whether or not it's on the list 402 385 * because it may be on the list already due to an RQ 403 386 * completion. 404 - */ 387 + */ 405 388 set_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags); 406 389 407 390 /* ··· 432 407 xprt->sc_ctxt_max = ctxt_max; 433 408 xprt->sc_ctxt_bump = ctxt_bump; 434 409 xprt->sc_ctxt_cnt = 0; 435 - xprt->sc_ctxt_head = NULL; 410 + atomic_set(&xprt->sc_ctxt_used, 0); 411 + 412 + INIT_LIST_HEAD(&xprt->sc_ctxt_free); 436 413 for (i = 0; i < ctxt_count; i++) { 437 414 ctxt = kmalloc(sizeof(*ctxt), GFP_KERNEL); 438 415 if (ctxt) { 439 - ctxt->next = xprt->sc_ctxt_head; 440 - xprt->sc_ctxt_head = ctxt; 416 + INIT_LIST_HEAD(&ctxt->free_list); 417 + list_add(&ctxt->free_list, &xprt->sc_ctxt_free); 441 418 xprt->sc_ctxt_cnt++; 442 419 } 443 420 } 444 421 } 445 422 446 - static void destroy_context_cache(struct svc_rdma_op_ctxt *ctxt) 423 + static void destroy_context_cache(struct svcxprt_rdma *xprt) 447 424 { 448 - struct svc_rdma_op_ctxt *next; 449 - if (!ctxt) 450 - return; 451 - 452 - do { 453 - next = ctxt->next; 425 + while (!list_empty(&xprt->sc_ctxt_free)) { 426 + struct svc_rdma_op_ctxt *ctxt; 427 + ctxt = list_entry(xprt->sc_ctxt_free.next, 428 + struct svc_rdma_op_ctxt, 429 + free_list); 430 + list_del_init(&ctxt->free_list); 454 431 kfree(ctxt); 455 - ctxt = next; 456 - } while (next); 432 + } 457 433 } 458 434 459 435 static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv, ··· 491 465 reqs + 492 466 cma_xprt->sc_sq_depth + 493 467 RPCRDMA_MAX_THREADS + 1); /* max */ 494 - if (!cma_xprt->sc_ctxt_head) { 468 + if (list_empty(&cma_xprt->sc_ctxt_free)) { 495 469 kfree(cma_xprt); 496 470 return NULL; 497 471 } ··· 546 520 recv_wr.num_sge = ctxt->count; 547 521 recv_wr.wr_id = (u64)(unsigned long)ctxt; 548 522 523 + svc_xprt_get(&xprt->sc_xprt); 549 524 ret = ib_post_recv(xprt->sc_qp, &recv_wr, &bad_recv_wr); 525 + if (ret) { 526 + svc_xprt_put(&xprt->sc_xprt); 527 + svc_rdma_put_context(ctxt, 1); 528 + } 550 529 return ret; 551 530 } 552 531 ··· 570 539 { 571 540 struct svcxprt_rdma *listen_xprt = new_cma_id->context; 572 541 struct svcxprt_rdma *newxprt; 542 + struct sockaddr *sa; 573 543 574 544 /* Create a new transport */ 575 545 newxprt = rdma_create_xprt(listen_xprt->sc_xprt.xpt_server, 0); ··· 582 550 new_cma_id->context = newxprt; 583 551 dprintk("svcrdma: Creating newxprt=%p, cm_id=%p, listenxprt=%p\n", 584 552 newxprt, newxprt->sc_cm_id, listen_xprt); 553 + 554 + /* Set the local and remote addresses in the transport */ 555 + sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr; 556 + svc_xprt_set_remote(&newxprt->sc_xprt, sa, svc_addr_len(sa)); 557 + sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.src_addr; 558 + svc_xprt_set_local(&newxprt->sc_xprt, sa, svc_addr_len(sa)); 585 559 586 560 /* 587 561 * Enqueue the new transport on the accept queue of the listening ··· 665 627 if (xprt) { 666 628 set_bit(XPT_CLOSE, &xprt->xpt_flags); 667 629 svc_xprt_enqueue(xprt); 630 + svc_xprt_put(xprt); 668 631 } 669 632 break; 670 633 case RDMA_CM_EVENT_DEVICE_REMOVAL: ··· 700 661 701 662 cma_xprt = rdma_create_xprt(serv, 1); 702 663 if (!cma_xprt) 703 - return ERR_PTR(ENOMEM); 664 + return ERR_PTR(-ENOMEM); 704 665 xprt = &cma_xprt->sc_xprt; 705 666 706 667 listen_id = rdma_create_id(rdma_listen_handler, cma_xprt, RDMA_PS_TCP); 707 668 if (IS_ERR(listen_id)) { 708 - svc_xprt_put(&cma_xprt->sc_xprt); 709 - dprintk("svcrdma: rdma_create_id failed = %ld\n", 710 - PTR_ERR(listen_id)); 711 - return (void *)listen_id; 669 + ret = PTR_ERR(listen_id); 670 + dprintk("svcrdma: rdma_create_id failed = %d\n", ret); 671 + goto err0; 712 672 } 673 + 713 674 ret = rdma_bind_addr(listen_id, sa); 714 675 if (ret) { 715 - rdma_destroy_id(listen_id); 716 - svc_xprt_put(&cma_xprt->sc_xprt); 717 676 dprintk("svcrdma: rdma_bind_addr failed = %d\n", ret); 718 - return ERR_PTR(ret); 677 + goto err1; 719 678 } 720 679 cma_xprt->sc_cm_id = listen_id; 721 680 722 681 ret = rdma_listen(listen_id, RPCRDMA_LISTEN_BACKLOG); 723 682 if (ret) { 724 - rdma_destroy_id(listen_id); 725 - svc_xprt_put(&cma_xprt->sc_xprt); 726 683 dprintk("svcrdma: rdma_listen failed = %d\n", ret); 727 - return ERR_PTR(ret); 684 + goto err1; 728 685 } 729 686 730 687 /* ··· 731 696 svc_xprt_set_local(&cma_xprt->sc_xprt, sa, salen); 732 697 733 698 return &cma_xprt->sc_xprt; 699 + 700 + err1: 701 + rdma_destroy_id(listen_id); 702 + err0: 703 + kfree(cma_xprt); 704 + return ERR_PTR(ret); 734 705 } 735 706 736 707 /* ··· 757 716 struct rdma_conn_param conn_param; 758 717 struct ib_qp_init_attr qp_attr; 759 718 struct ib_device_attr devattr; 760 - struct sockaddr *sa; 761 719 int ret; 762 720 int i; 763 721 ··· 866 826 newxprt->sc_sq_depth = qp_attr.cap.max_send_wr; 867 827 newxprt->sc_max_requests = qp_attr.cap.max_recv_wr; 868 828 } 869 - svc_xprt_get(&newxprt->sc_xprt); 870 829 newxprt->sc_qp = newxprt->sc_cm_id->qp; 871 830 872 831 /* Register all of physical memory */ ··· 888 849 889 850 /* Swap out the handler */ 890 851 newxprt->sc_cm_id->event_handler = rdma_cma_handler; 852 + 853 + /* 854 + * Arm the CQs for the SQ and RQ before accepting so we can't 855 + * miss the first message 856 + */ 857 + ib_req_notify_cq(newxprt->sc_sq_cq, IB_CQ_NEXT_COMP); 858 + ib_req_notify_cq(newxprt->sc_rq_cq, IB_CQ_NEXT_COMP); 891 859 892 860 /* Accept Connection */ 893 861 set_bit(RDMAXPRT_CONN_PENDING, &newxprt->sc_flags); ··· 932 886 newxprt->sc_max_requests, 933 887 newxprt->sc_ord); 934 888 935 - /* Set the local and remote addresses in the transport */ 936 - sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr; 937 - svc_xprt_set_remote(&newxprt->sc_xprt, sa, svc_addr_len(sa)); 938 - sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.src_addr; 939 - svc_xprt_set_local(&newxprt->sc_xprt, sa, svc_addr_len(sa)); 940 - 941 - ib_req_notify_cq(newxprt->sc_sq_cq, IB_CQ_NEXT_COMP); 942 - ib_req_notify_cq(newxprt->sc_rq_cq, IB_CQ_NEXT_COMP); 943 889 return &newxprt->sc_xprt; 944 890 945 891 errout: 946 892 dprintk("svcrdma: failure accepting new connection rc=%d.\n", ret); 947 893 /* Take a reference in case the DTO handler runs */ 948 894 svc_xprt_get(&newxprt->sc_xprt); 949 - if (newxprt->sc_qp && !IS_ERR(newxprt->sc_qp)) { 895 + if (newxprt->sc_qp && !IS_ERR(newxprt->sc_qp)) 950 896 ib_destroy_qp(newxprt->sc_qp); 951 - svc_xprt_put(&newxprt->sc_xprt); 952 - } 953 897 rdma_destroy_id(newxprt->sc_cm_id); 954 898 /* This call to put will destroy the transport */ 955 899 svc_xprt_put(&newxprt->sc_xprt); 956 900 return NULL; 957 901 } 958 902 959 - /* 960 - * Post an RQ WQE to the RQ when the rqst is being released. This 961 - * effectively returns an RQ credit to the client. The rq_xprt_ctxt 962 - * will be null if the request is deferred due to an RDMA_READ or the 963 - * transport had no data ready (EAGAIN). Note that an RPC deferred in 964 - * svc_process will still return the credit, this is because the data 965 - * is copied and no longer consume a WQE/WC. 966 - */ 967 903 static void svc_rdma_release_rqst(struct svc_rqst *rqstp) 968 904 { 969 - int err; 970 - struct svcxprt_rdma *rdma = 971 - container_of(rqstp->rq_xprt, struct svcxprt_rdma, sc_xprt); 972 - if (rqstp->rq_xprt_ctxt) { 973 - BUG_ON(rqstp->rq_xprt_ctxt != rdma); 974 - err = svc_rdma_post_recv(rdma); 975 - if (err) 976 - dprintk("svcrdma: failed to post an RQ WQE error=%d\n", 977 - err); 978 - } 979 - rqstp->rq_xprt_ctxt = NULL; 980 905 } 981 906 982 907 /* 983 - * When connected, an svc_xprt has at least three references: 984 - * 985 - * - A reference held by the QP. We still hold that here because this 986 - * code deletes the QP and puts the reference. 908 + * When connected, an svc_xprt has at least two references: 987 909 * 988 910 * - A reference held by the cm_id between the ESTABLISHED and 989 911 * DISCONNECTED events. If the remote peer disconnected first, this ··· 960 946 * - A reference held by the svc_recv code that called this function 961 947 * as part of close processing. 962 948 * 963 - * At a minimum two references should still be held. 949 + * At a minimum one references should still be held. 964 950 */ 965 951 static void svc_rdma_detach(struct svc_xprt *xprt) 966 952 { ··· 970 956 971 957 /* Disconnect and flush posted WQE */ 972 958 rdma_disconnect(rdma->sc_cm_id); 973 - 974 - /* Destroy the QP if present (not a listener) */ 975 - if (rdma->sc_qp && !IS_ERR(rdma->sc_qp)) { 976 - ib_destroy_qp(rdma->sc_qp); 977 - svc_xprt_put(xprt); 978 - } 979 - 980 - /* Destroy the CM ID */ 981 - rdma_destroy_id(rdma->sc_cm_id); 982 959 } 983 960 984 - static void svc_rdma_free(struct svc_xprt *xprt) 961 + static void __svc_rdma_free(struct work_struct *work) 985 962 { 986 - struct svcxprt_rdma *rdma = (struct svcxprt_rdma *)xprt; 963 + struct svcxprt_rdma *rdma = 964 + container_of(work, struct svcxprt_rdma, sc_work); 987 965 dprintk("svcrdma: svc_rdma_free(%p)\n", rdma); 966 + 988 967 /* We should only be called from kref_put */ 989 - BUG_ON(atomic_read(&xprt->xpt_ref.refcount) != 0); 968 + BUG_ON(atomic_read(&rdma->sc_xprt.xpt_ref.refcount) != 0); 969 + 970 + /* 971 + * Destroy queued, but not processed read completions. Note 972 + * that this cleanup has to be done before destroying the 973 + * cm_id because the device ptr is needed to unmap the dma in 974 + * svc_rdma_put_context. 975 + */ 976 + spin_lock_bh(&rdma->sc_read_complete_lock); 977 + while (!list_empty(&rdma->sc_read_complete_q)) { 978 + struct svc_rdma_op_ctxt *ctxt; 979 + ctxt = list_entry(rdma->sc_read_complete_q.next, 980 + struct svc_rdma_op_ctxt, 981 + dto_q); 982 + list_del_init(&ctxt->dto_q); 983 + svc_rdma_put_context(ctxt, 1); 984 + } 985 + spin_unlock_bh(&rdma->sc_read_complete_lock); 986 + 987 + /* Destroy queued, but not processed recv completions */ 988 + spin_lock_bh(&rdma->sc_rq_dto_lock); 989 + while (!list_empty(&rdma->sc_rq_dto_q)) { 990 + struct svc_rdma_op_ctxt *ctxt; 991 + ctxt = list_entry(rdma->sc_rq_dto_q.next, 992 + struct svc_rdma_op_ctxt, 993 + dto_q); 994 + list_del_init(&ctxt->dto_q); 995 + svc_rdma_put_context(ctxt, 1); 996 + } 997 + spin_unlock_bh(&rdma->sc_rq_dto_lock); 998 + 999 + /* Warn if we leaked a resource or under-referenced */ 1000 + WARN_ON(atomic_read(&rdma->sc_ctxt_used) != 0); 1001 + 1002 + /* Destroy the QP if present (not a listener) */ 1003 + if (rdma->sc_qp && !IS_ERR(rdma->sc_qp)) 1004 + ib_destroy_qp(rdma->sc_qp); 1005 + 990 1006 if (rdma->sc_sq_cq && !IS_ERR(rdma->sc_sq_cq)) 991 1007 ib_destroy_cq(rdma->sc_sq_cq); 992 1008 ··· 1029 985 if (rdma->sc_pd && !IS_ERR(rdma->sc_pd)) 1030 986 ib_dealloc_pd(rdma->sc_pd); 1031 987 1032 - destroy_context_cache(rdma->sc_ctxt_head); 988 + /* Destroy the CM ID */ 989 + rdma_destroy_id(rdma->sc_cm_id); 990 + 991 + destroy_context_cache(rdma); 1033 992 kfree(rdma); 993 + } 994 + 995 + static void svc_rdma_free(struct svc_xprt *xprt) 996 + { 997 + struct svcxprt_rdma *rdma = 998 + container_of(xprt, struct svcxprt_rdma, sc_xprt); 999 + INIT_WORK(&rdma->sc_work, __svc_rdma_free); 1000 + schedule_work(&rdma->sc_work); 1034 1001 } 1035 1002 1036 1003 static int svc_rdma_has_wspace(struct svc_xprt *xprt) ··· 1073 1018 int ret; 1074 1019 1075 1020 if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags)) 1076 - return 0; 1021 + return -ENOTCONN; 1077 1022 1078 1023 BUG_ON(wr->send_flags != IB_SEND_SIGNALED); 1079 1024 BUG_ON(((struct svc_rdma_op_ctxt *)(unsigned long)wr->wr_id)->wr_op != ··· 1084 1029 if (xprt->sc_sq_depth == atomic_read(&xprt->sc_sq_count)) { 1085 1030 spin_unlock_bh(&xprt->sc_lock); 1086 1031 atomic_inc(&rdma_stat_sq_starve); 1087 - /* See if we can reap some SQ WR */ 1032 + 1033 + /* See if we can opportunistically reap SQ WR to make room */ 1088 1034 sq_cq_reap(xprt); 1089 1035 1090 1036 /* Wait until SQ WR available if SQ still full */ ··· 1097 1041 continue; 1098 1042 } 1099 1043 /* Bumped used SQ WR count and post */ 1044 + svc_xprt_get(&xprt->sc_xprt); 1100 1045 ret = ib_post_send(xprt->sc_qp, wr, &bad_wr); 1101 1046 if (!ret) 1102 1047 atomic_inc(&xprt->sc_sq_count); 1103 - else 1048 + else { 1049 + svc_xprt_put(&xprt->sc_xprt); 1104 1050 dprintk("svcrdma: failed to post SQ WR rc=%d, " 1105 1051 "sc_sq_count=%d, sc_sq_depth=%d\n", 1106 1052 ret, atomic_read(&xprt->sc_sq_count), 1107 1053 xprt->sc_sq_depth); 1054 + } 1108 1055 spin_unlock_bh(&xprt->sc_lock); 1109 1056 break; 1110 1057 } 1111 1058 return ret; 1112 1059 } 1113 1060 1114 - int svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp, 1115 - enum rpcrdma_errcode err) 1061 + void svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp, 1062 + enum rpcrdma_errcode err) 1116 1063 { 1117 1064 struct ib_send_wr err_wr; 1118 1065 struct ib_sge sge; ··· 1153 1094 /* Post It */ 1154 1095 ret = svc_rdma_send(xprt, &err_wr); 1155 1096 if (ret) { 1156 - dprintk("svcrdma: Error posting send = %d\n", ret); 1097 + dprintk("svcrdma: Error %d posting send for protocol error\n", 1098 + ret); 1157 1099 svc_rdma_put_context(ctxt, 1); 1158 1100 } 1159 - 1160 - return ret; 1161 1101 }