rdma: SVCRDMA Core Transport Services

This file implements the core transport data management and I/O
path. The I/O path for RDMA involves receiving callbacks on interrupt
context. Since all the svc transport locks are _bh locks we enqueue the
transport on a list, schedule a tasklet to dequeue data indications from
the RDMA completion queue. The tasklet in turn takes _bh locks to
enqueue receive data indications on a list for the transport. The
svc_rdma_recvfrom transport function dequeues data from this list in an
NFSD thread context.

Signed-off-by: Tom Tucker <tom@opengridcomputing.com>
Acked-by: Neil Brown <neilb@suse.de>
Signed-off-by: J. Bruce Fields <bfields@citi.umich.edu>

authored by Tom Tucker and committed by J. Bruce Fields 377f9b2f ef7fbf59

+1080
+1080
net/sunrpc/xprtrdma/svc_rdma_transport.c
··· 1 + /* 2 + * Copyright (c) 2005-2007 Network Appliance, Inc. All rights reserved. 3 + * 4 + * This software is available to you under a choice of one of two 5 + * licenses. You may choose to be licensed under the terms of the GNU 6 + * General Public License (GPL) Version 2, available from the file 7 + * COPYING in the main directory of this source tree, or the BSD-type 8 + * license below: 9 + * 10 + * Redistribution and use in source and binary forms, with or without 11 + * modification, are permitted provided that the following conditions 12 + * are met: 13 + * 14 + * Redistributions of source code must retain the above copyright 15 + * notice, this list of conditions and the following disclaimer. 16 + * 17 + * Redistributions in binary form must reproduce the above 18 + * copyright notice, this list of conditions and the following 19 + * disclaimer in the documentation and/or other materials provided 20 + * with the distribution. 21 + * 22 + * Neither the name of the Network Appliance, Inc. nor the names of 23 + * its contributors may be used to endorse or promote products 24 + * derived from this software without specific prior written 25 + * permission. 26 + * 27 + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 28 + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 29 + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 30 + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 31 + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 32 + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 33 + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 34 + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 35 + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 36 + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 37 + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 38 + * 39 + * Author: Tom Tucker <tom@opengridcomputing.com> 40 + */ 41 + 42 + #include <linux/sunrpc/svc_xprt.h> 43 + #include <linux/sunrpc/debug.h> 44 + #include <linux/sunrpc/rpc_rdma.h> 45 + #include <linux/spinlock.h> 46 + #include <rdma/ib_verbs.h> 47 + #include <rdma/rdma_cm.h> 48 + #include <linux/sunrpc/svc_rdma.h> 49 + 50 + #define RPCDBG_FACILITY RPCDBG_SVCXPRT 51 + 52 + static struct svc_xprt *svc_rdma_create(struct svc_serv *serv, 53 + struct sockaddr *sa, int salen, 54 + int flags); 55 + static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt); 56 + static void svc_rdma_release_rqst(struct svc_rqst *); 57 + static void rdma_destroy_xprt(struct svcxprt_rdma *xprt); 58 + static void dto_tasklet_func(unsigned long data); 59 + static void svc_rdma_detach(struct svc_xprt *xprt); 60 + static void svc_rdma_free(struct svc_xprt *xprt); 61 + static int svc_rdma_has_wspace(struct svc_xprt *xprt); 62 + static void rq_cq_reap(struct svcxprt_rdma *xprt); 63 + static void sq_cq_reap(struct svcxprt_rdma *xprt); 64 + 65 + DECLARE_TASKLET(dto_tasklet, dto_tasklet_func, 0UL); 66 + static DEFINE_SPINLOCK(dto_lock); 67 + static LIST_HEAD(dto_xprt_q); 68 + 69 + static struct svc_xprt_ops svc_rdma_ops = { 70 + .xpo_create = svc_rdma_create, 71 + .xpo_recvfrom = svc_rdma_recvfrom, 72 + .xpo_sendto = svc_rdma_sendto, 73 + .xpo_release_rqst = svc_rdma_release_rqst, 74 + .xpo_detach = svc_rdma_detach, 75 + .xpo_free = svc_rdma_free, 76 + .xpo_prep_reply_hdr = svc_rdma_prep_reply_hdr, 77 + .xpo_has_wspace = svc_rdma_has_wspace, 78 + .xpo_accept = svc_rdma_accept, 79 + }; 80 + 81 + struct svc_xprt_class svc_rdma_class = { 82 + .xcl_name = "rdma", 83 + .xcl_owner = THIS_MODULE, 84 + .xcl_ops = &svc_rdma_ops, 85 + .xcl_max_payload = RPCSVC_MAXPAYLOAD_TCP, 86 + }; 87 + 88 + static int rdma_bump_context_cache(struct svcxprt_rdma *xprt) 89 + { 90 + int target; 91 + int at_least_one = 0; 92 + struct svc_rdma_op_ctxt *ctxt; 93 + 94 + target = min(xprt->sc_ctxt_cnt + xprt->sc_ctxt_bump, 95 + xprt->sc_ctxt_max); 96 + 97 + spin_lock_bh(&xprt->sc_ctxt_lock); 98 + while (xprt->sc_ctxt_cnt < target) { 99 + xprt->sc_ctxt_cnt++; 100 + spin_unlock_bh(&xprt->sc_ctxt_lock); 101 + 102 + ctxt = kmalloc(sizeof(*ctxt), GFP_KERNEL); 103 + 104 + spin_lock_bh(&xprt->sc_ctxt_lock); 105 + if (ctxt) { 106 + at_least_one = 1; 107 + ctxt->next = xprt->sc_ctxt_head; 108 + xprt->sc_ctxt_head = ctxt; 109 + } else { 110 + /* kmalloc failed...give up for now */ 111 + xprt->sc_ctxt_cnt--; 112 + break; 113 + } 114 + } 115 + spin_unlock_bh(&xprt->sc_ctxt_lock); 116 + dprintk("svcrdma: sc_ctxt_max=%d, sc_ctxt_cnt=%d\n", 117 + xprt->sc_ctxt_max, xprt->sc_ctxt_cnt); 118 + return at_least_one; 119 + } 120 + 121 + struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt) 122 + { 123 + struct svc_rdma_op_ctxt *ctxt; 124 + 125 + while (1) { 126 + spin_lock_bh(&xprt->sc_ctxt_lock); 127 + if (unlikely(xprt->sc_ctxt_head == NULL)) { 128 + /* Try to bump my cache. */ 129 + spin_unlock_bh(&xprt->sc_ctxt_lock); 130 + 131 + if (rdma_bump_context_cache(xprt)) 132 + continue; 133 + 134 + printk(KERN_INFO "svcrdma: sleeping waiting for " 135 + "context memory on xprt=%p\n", 136 + xprt); 137 + schedule_timeout_uninterruptible(msecs_to_jiffies(500)); 138 + continue; 139 + } 140 + ctxt = xprt->sc_ctxt_head; 141 + xprt->sc_ctxt_head = ctxt->next; 142 + spin_unlock_bh(&xprt->sc_ctxt_lock); 143 + ctxt->xprt = xprt; 144 + INIT_LIST_HEAD(&ctxt->dto_q); 145 + ctxt->count = 0; 146 + break; 147 + } 148 + return ctxt; 149 + } 150 + 151 + void svc_rdma_put_context(struct svc_rdma_op_ctxt *ctxt, int free_pages) 152 + { 153 + struct svcxprt_rdma *xprt; 154 + int i; 155 + 156 + BUG_ON(!ctxt); 157 + xprt = ctxt->xprt; 158 + if (free_pages) 159 + for (i = 0; i < ctxt->count; i++) 160 + put_page(ctxt->pages[i]); 161 + 162 + for (i = 0; i < ctxt->count; i++) 163 + dma_unmap_single(xprt->sc_cm_id->device->dma_device, 164 + ctxt->sge[i].addr, 165 + ctxt->sge[i].length, 166 + ctxt->direction); 167 + spin_lock_bh(&xprt->sc_ctxt_lock); 168 + ctxt->next = xprt->sc_ctxt_head; 169 + xprt->sc_ctxt_head = ctxt; 170 + spin_unlock_bh(&xprt->sc_ctxt_lock); 171 + } 172 + 173 + /* ib_cq event handler */ 174 + static void cq_event_handler(struct ib_event *event, void *context) 175 + { 176 + struct svc_xprt *xprt = context; 177 + dprintk("svcrdma: received CQ event id=%d, context=%p\n", 178 + event->event, context); 179 + set_bit(XPT_CLOSE, &xprt->xpt_flags); 180 + } 181 + 182 + /* QP event handler */ 183 + static void qp_event_handler(struct ib_event *event, void *context) 184 + { 185 + struct svc_xprt *xprt = context; 186 + 187 + switch (event->event) { 188 + /* These are considered benign events */ 189 + case IB_EVENT_PATH_MIG: 190 + case IB_EVENT_COMM_EST: 191 + case IB_EVENT_SQ_DRAINED: 192 + case IB_EVENT_QP_LAST_WQE_REACHED: 193 + dprintk("svcrdma: QP event %d received for QP=%p\n", 194 + event->event, event->element.qp); 195 + break; 196 + /* These are considered fatal events */ 197 + case IB_EVENT_PATH_MIG_ERR: 198 + case IB_EVENT_QP_FATAL: 199 + case IB_EVENT_QP_REQ_ERR: 200 + case IB_EVENT_QP_ACCESS_ERR: 201 + case IB_EVENT_DEVICE_FATAL: 202 + default: 203 + dprintk("svcrdma: QP ERROR event %d received for QP=%p, " 204 + "closing transport\n", 205 + event->event, event->element.qp); 206 + set_bit(XPT_CLOSE, &xprt->xpt_flags); 207 + break; 208 + } 209 + } 210 + 211 + /* 212 + * Data Transfer Operation Tasklet 213 + * 214 + * Walks a list of transports with I/O pending, removing entries as 215 + * they are added to the server's I/O pending list. Two bits indicate 216 + * if SQ, RQ, or both have I/O pending. The dto_lock is an irqsave 217 + * spinlock that serializes access to the transport list with the RQ 218 + * and SQ interrupt handlers. 219 + */ 220 + static void dto_tasklet_func(unsigned long data) 221 + { 222 + struct svcxprt_rdma *xprt; 223 + unsigned long flags; 224 + 225 + spin_lock_irqsave(&dto_lock, flags); 226 + while (!list_empty(&dto_xprt_q)) { 227 + xprt = list_entry(dto_xprt_q.next, 228 + struct svcxprt_rdma, sc_dto_q); 229 + list_del_init(&xprt->sc_dto_q); 230 + spin_unlock_irqrestore(&dto_lock, flags); 231 + 232 + if (test_and_clear_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags)) { 233 + ib_req_notify_cq(xprt->sc_rq_cq, IB_CQ_NEXT_COMP); 234 + rq_cq_reap(xprt); 235 + set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags); 236 + /* 237 + * If data arrived before established event, 238 + * don't enqueue. This defers RPC I/O until the 239 + * RDMA connection is complete. 240 + */ 241 + if (!test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags)) 242 + svc_xprt_enqueue(&xprt->sc_xprt); 243 + } 244 + 245 + if (test_and_clear_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags)) { 246 + ib_req_notify_cq(xprt->sc_sq_cq, IB_CQ_NEXT_COMP); 247 + sq_cq_reap(xprt); 248 + } 249 + 250 + spin_lock_irqsave(&dto_lock, flags); 251 + } 252 + spin_unlock_irqrestore(&dto_lock, flags); 253 + } 254 + 255 + /* 256 + * Receive Queue Completion Handler 257 + * 258 + * Since an RQ completion handler is called on interrupt context, we 259 + * need to defer the handling of the I/O to a tasklet 260 + */ 261 + static void rq_comp_handler(struct ib_cq *cq, void *cq_context) 262 + { 263 + struct svcxprt_rdma *xprt = cq_context; 264 + unsigned long flags; 265 + 266 + /* 267 + * Set the bit regardless of whether or not it's on the list 268 + * because it may be on the list already due to an SQ 269 + * completion. 270 + */ 271 + set_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags); 272 + 273 + /* 274 + * If this transport is not already on the DTO transport queue, 275 + * add it 276 + */ 277 + spin_lock_irqsave(&dto_lock, flags); 278 + if (list_empty(&xprt->sc_dto_q)) 279 + list_add_tail(&xprt->sc_dto_q, &dto_xprt_q); 280 + spin_unlock_irqrestore(&dto_lock, flags); 281 + 282 + /* Tasklet does all the work to avoid irqsave locks. */ 283 + tasklet_schedule(&dto_tasklet); 284 + } 285 + 286 + /* 287 + * rq_cq_reap - Process the RQ CQ. 288 + * 289 + * Take all completing WC off the CQE and enqueue the associated DTO 290 + * context on the dto_q for the transport. 291 + */ 292 + static void rq_cq_reap(struct svcxprt_rdma *xprt) 293 + { 294 + int ret; 295 + struct ib_wc wc; 296 + struct svc_rdma_op_ctxt *ctxt = NULL; 297 + 298 + atomic_inc(&rdma_stat_rq_poll); 299 + 300 + spin_lock_bh(&xprt->sc_rq_dto_lock); 301 + while ((ret = ib_poll_cq(xprt->sc_rq_cq, 1, &wc)) > 0) { 302 + ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id; 303 + ctxt->wc_status = wc.status; 304 + ctxt->byte_len = wc.byte_len; 305 + if (wc.status != IB_WC_SUCCESS) { 306 + /* Close the transport */ 307 + set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags); 308 + svc_rdma_put_context(ctxt, 1); 309 + continue; 310 + } 311 + list_add_tail(&ctxt->dto_q, &xprt->sc_rq_dto_q); 312 + } 313 + spin_unlock_bh(&xprt->sc_rq_dto_lock); 314 + 315 + if (ctxt) 316 + atomic_inc(&rdma_stat_rq_prod); 317 + } 318 + 319 + /* 320 + * Send Queue Completion Handler - potentially called on interrupt context. 321 + */ 322 + static void sq_cq_reap(struct svcxprt_rdma *xprt) 323 + { 324 + struct svc_rdma_op_ctxt *ctxt = NULL; 325 + struct ib_wc wc; 326 + struct ib_cq *cq = xprt->sc_sq_cq; 327 + int ret; 328 + 329 + atomic_inc(&rdma_stat_sq_poll); 330 + while ((ret = ib_poll_cq(cq, 1, &wc)) > 0) { 331 + ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id; 332 + xprt = ctxt->xprt; 333 + 334 + if (wc.status != IB_WC_SUCCESS) 335 + /* Close the transport */ 336 + set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags); 337 + 338 + /* Decrement used SQ WR count */ 339 + atomic_dec(&xprt->sc_sq_count); 340 + wake_up(&xprt->sc_send_wait); 341 + 342 + switch (ctxt->wr_op) { 343 + case IB_WR_SEND: 344 + case IB_WR_RDMA_WRITE: 345 + svc_rdma_put_context(ctxt, 1); 346 + break; 347 + 348 + case IB_WR_RDMA_READ: 349 + if (test_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags)) { 350 + set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags); 351 + set_bit(RDMACTXT_F_READ_DONE, &ctxt->flags); 352 + spin_lock_bh(&xprt->sc_read_complete_lock); 353 + list_add_tail(&ctxt->dto_q, 354 + &xprt->sc_read_complete_q); 355 + spin_unlock_bh(&xprt->sc_read_complete_lock); 356 + svc_xprt_enqueue(&xprt->sc_xprt); 357 + } 358 + break; 359 + 360 + default: 361 + printk(KERN_ERR "svcrdma: unexpected completion type, " 362 + "opcode=%d, status=%d\n", 363 + wc.opcode, wc.status); 364 + break; 365 + } 366 + } 367 + 368 + if (ctxt) 369 + atomic_inc(&rdma_stat_sq_prod); 370 + } 371 + 372 + static void sq_comp_handler(struct ib_cq *cq, void *cq_context) 373 + { 374 + struct svcxprt_rdma *xprt = cq_context; 375 + unsigned long flags; 376 + 377 + /* 378 + * Set the bit regardless of whether or not it's on the list 379 + * because it may be on the list already due to an RQ 380 + * completion. 381 + */ 382 + set_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags); 383 + 384 + /* 385 + * If this transport is not already on the DTO transport queue, 386 + * add it 387 + */ 388 + spin_lock_irqsave(&dto_lock, flags); 389 + if (list_empty(&xprt->sc_dto_q)) 390 + list_add_tail(&xprt->sc_dto_q, &dto_xprt_q); 391 + spin_unlock_irqrestore(&dto_lock, flags); 392 + 393 + /* Tasklet does all the work to avoid irqsave locks. */ 394 + tasklet_schedule(&dto_tasklet); 395 + } 396 + 397 + static void create_context_cache(struct svcxprt_rdma *xprt, 398 + int ctxt_count, int ctxt_bump, int ctxt_max) 399 + { 400 + struct svc_rdma_op_ctxt *ctxt; 401 + int i; 402 + 403 + xprt->sc_ctxt_max = ctxt_max; 404 + xprt->sc_ctxt_bump = ctxt_bump; 405 + xprt->sc_ctxt_cnt = 0; 406 + xprt->sc_ctxt_head = NULL; 407 + for (i = 0; i < ctxt_count; i++) { 408 + ctxt = kmalloc(sizeof(*ctxt), GFP_KERNEL); 409 + if (ctxt) { 410 + ctxt->next = xprt->sc_ctxt_head; 411 + xprt->sc_ctxt_head = ctxt; 412 + xprt->sc_ctxt_cnt++; 413 + } 414 + } 415 + } 416 + 417 + static void destroy_context_cache(struct svc_rdma_op_ctxt *ctxt) 418 + { 419 + struct svc_rdma_op_ctxt *next; 420 + if (!ctxt) 421 + return; 422 + 423 + do { 424 + next = ctxt->next; 425 + kfree(ctxt); 426 + ctxt = next; 427 + } while (next); 428 + } 429 + 430 + static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv, 431 + int listener) 432 + { 433 + struct svcxprt_rdma *cma_xprt = kzalloc(sizeof *cma_xprt, GFP_KERNEL); 434 + 435 + if (!cma_xprt) 436 + return NULL; 437 + svc_xprt_init(&svc_rdma_class, &cma_xprt->sc_xprt, serv); 438 + INIT_LIST_HEAD(&cma_xprt->sc_accept_q); 439 + INIT_LIST_HEAD(&cma_xprt->sc_dto_q); 440 + INIT_LIST_HEAD(&cma_xprt->sc_rq_dto_q); 441 + INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q); 442 + init_waitqueue_head(&cma_xprt->sc_send_wait); 443 + 444 + spin_lock_init(&cma_xprt->sc_lock); 445 + spin_lock_init(&cma_xprt->sc_read_complete_lock); 446 + spin_lock_init(&cma_xprt->sc_ctxt_lock); 447 + spin_lock_init(&cma_xprt->sc_rq_dto_lock); 448 + 449 + cma_xprt->sc_ord = svcrdma_ord; 450 + 451 + cma_xprt->sc_max_req_size = svcrdma_max_req_size; 452 + cma_xprt->sc_max_requests = svcrdma_max_requests; 453 + cma_xprt->sc_sq_depth = svcrdma_max_requests * RPCRDMA_SQ_DEPTH_MULT; 454 + atomic_set(&cma_xprt->sc_sq_count, 0); 455 + 456 + if (!listener) { 457 + int reqs = cma_xprt->sc_max_requests; 458 + create_context_cache(cma_xprt, 459 + reqs << 1, /* starting size */ 460 + reqs, /* bump amount */ 461 + reqs + 462 + cma_xprt->sc_sq_depth + 463 + RPCRDMA_MAX_THREADS + 1); /* max */ 464 + if (!cma_xprt->sc_ctxt_head) { 465 + kfree(cma_xprt); 466 + return NULL; 467 + } 468 + clear_bit(XPT_LISTENER, &cma_xprt->sc_xprt.xpt_flags); 469 + } else 470 + set_bit(XPT_LISTENER, &cma_xprt->sc_xprt.xpt_flags); 471 + 472 + return cma_xprt; 473 + } 474 + 475 + struct page *svc_rdma_get_page(void) 476 + { 477 + struct page *page; 478 + 479 + while ((page = alloc_page(GFP_KERNEL)) == NULL) { 480 + /* If we can't get memory, wait a bit and try again */ 481 + printk(KERN_INFO "svcrdma: out of memory...retrying in 1000 " 482 + "jiffies.\n"); 483 + schedule_timeout_uninterruptible(msecs_to_jiffies(1000)); 484 + } 485 + return page; 486 + } 487 + 488 + int svc_rdma_post_recv(struct svcxprt_rdma *xprt) 489 + { 490 + struct ib_recv_wr recv_wr, *bad_recv_wr; 491 + struct svc_rdma_op_ctxt *ctxt; 492 + struct page *page; 493 + unsigned long pa; 494 + int sge_no; 495 + int buflen; 496 + int ret; 497 + 498 + ctxt = svc_rdma_get_context(xprt); 499 + buflen = 0; 500 + ctxt->direction = DMA_FROM_DEVICE; 501 + for (sge_no = 0; buflen < xprt->sc_max_req_size; sge_no++) { 502 + BUG_ON(sge_no >= xprt->sc_max_sge); 503 + page = svc_rdma_get_page(); 504 + ctxt->pages[sge_no] = page; 505 + pa = ib_dma_map_page(xprt->sc_cm_id->device, 506 + page, 0, PAGE_SIZE, 507 + DMA_FROM_DEVICE); 508 + ctxt->sge[sge_no].addr = pa; 509 + ctxt->sge[sge_no].length = PAGE_SIZE; 510 + ctxt->sge[sge_no].lkey = xprt->sc_phys_mr->lkey; 511 + buflen += PAGE_SIZE; 512 + } 513 + ctxt->count = sge_no; 514 + recv_wr.next = NULL; 515 + recv_wr.sg_list = &ctxt->sge[0]; 516 + recv_wr.num_sge = ctxt->count; 517 + recv_wr.wr_id = (u64)(unsigned long)ctxt; 518 + 519 + ret = ib_post_recv(xprt->sc_qp, &recv_wr, &bad_recv_wr); 520 + return ret; 521 + } 522 + 523 + /* 524 + * This function handles the CONNECT_REQUEST event on a listening 525 + * endpoint. It is passed the cma_id for the _new_ connection. The context in 526 + * this cma_id is inherited from the listening cma_id and is the svc_xprt 527 + * structure for the listening endpoint. 528 + * 529 + * This function creates a new xprt for the new connection and enqueues it on 530 + * the accept queue for the listent xprt. When the listen thread is kicked, it 531 + * will call the recvfrom method on the listen xprt which will accept the new 532 + * connection. 533 + */ 534 + static void handle_connect_req(struct rdma_cm_id *new_cma_id) 535 + { 536 + struct svcxprt_rdma *listen_xprt = new_cma_id->context; 537 + struct svcxprt_rdma *newxprt; 538 + 539 + /* Create a new transport */ 540 + newxprt = rdma_create_xprt(listen_xprt->sc_xprt.xpt_server, 0); 541 + if (!newxprt) { 542 + dprintk("svcrdma: failed to create new transport\n"); 543 + return; 544 + } 545 + newxprt->sc_cm_id = new_cma_id; 546 + new_cma_id->context = newxprt; 547 + dprintk("svcrdma: Creating newxprt=%p, cm_id=%p, listenxprt=%p\n", 548 + newxprt, newxprt->sc_cm_id, listen_xprt); 549 + 550 + /* 551 + * Enqueue the new transport on the accept queue of the listening 552 + * transport 553 + */ 554 + spin_lock_bh(&listen_xprt->sc_lock); 555 + list_add_tail(&newxprt->sc_accept_q, &listen_xprt->sc_accept_q); 556 + spin_unlock_bh(&listen_xprt->sc_lock); 557 + 558 + /* 559 + * Can't use svc_xprt_received here because we are not on a 560 + * rqstp thread 561 + */ 562 + set_bit(XPT_CONN, &listen_xprt->sc_xprt.xpt_flags); 563 + svc_xprt_enqueue(&listen_xprt->sc_xprt); 564 + } 565 + 566 + /* 567 + * Handles events generated on the listening endpoint. These events will be 568 + * either be incoming connect requests or adapter removal events. 569 + */ 570 + static int rdma_listen_handler(struct rdma_cm_id *cma_id, 571 + struct rdma_cm_event *event) 572 + { 573 + struct svcxprt_rdma *xprt = cma_id->context; 574 + int ret = 0; 575 + 576 + switch (event->event) { 577 + case RDMA_CM_EVENT_CONNECT_REQUEST: 578 + dprintk("svcrdma: Connect request on cma_id=%p, xprt = %p, " 579 + "event=%d\n", cma_id, cma_id->context, event->event); 580 + handle_connect_req(cma_id); 581 + break; 582 + 583 + case RDMA_CM_EVENT_ESTABLISHED: 584 + /* Accept complete */ 585 + dprintk("svcrdma: Connection completed on LISTEN xprt=%p, " 586 + "cm_id=%p\n", xprt, cma_id); 587 + break; 588 + 589 + case RDMA_CM_EVENT_DEVICE_REMOVAL: 590 + dprintk("svcrdma: Device removal xprt=%p, cm_id=%p\n", 591 + xprt, cma_id); 592 + if (xprt) 593 + set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags); 594 + break; 595 + 596 + default: 597 + dprintk("svcrdma: Unexpected event on listening endpoint %p, " 598 + "event=%d\n", cma_id, event->event); 599 + break; 600 + } 601 + 602 + return ret; 603 + } 604 + 605 + static int rdma_cma_handler(struct rdma_cm_id *cma_id, 606 + struct rdma_cm_event *event) 607 + { 608 + struct svc_xprt *xprt = cma_id->context; 609 + struct svcxprt_rdma *rdma = 610 + container_of(xprt, struct svcxprt_rdma, sc_xprt); 611 + switch (event->event) { 612 + case RDMA_CM_EVENT_ESTABLISHED: 613 + /* Accept complete */ 614 + dprintk("svcrdma: Connection completed on DTO xprt=%p, " 615 + "cm_id=%p\n", xprt, cma_id); 616 + clear_bit(RDMAXPRT_CONN_PENDING, &rdma->sc_flags); 617 + svc_xprt_enqueue(xprt); 618 + break; 619 + case RDMA_CM_EVENT_DISCONNECTED: 620 + dprintk("svcrdma: Disconnect on DTO xprt=%p, cm_id=%p\n", 621 + xprt, cma_id); 622 + if (xprt) { 623 + set_bit(XPT_CLOSE, &xprt->xpt_flags); 624 + svc_xprt_enqueue(xprt); 625 + } 626 + break; 627 + case RDMA_CM_EVENT_DEVICE_REMOVAL: 628 + dprintk("svcrdma: Device removal cma_id=%p, xprt = %p, " 629 + "event=%d\n", cma_id, xprt, event->event); 630 + if (xprt) { 631 + set_bit(XPT_CLOSE, &xprt->xpt_flags); 632 + svc_xprt_enqueue(xprt); 633 + } 634 + break; 635 + default: 636 + dprintk("svcrdma: Unexpected event on DTO endpoint %p, " 637 + "event=%d\n", cma_id, event->event); 638 + break; 639 + } 640 + return 0; 641 + } 642 + 643 + /* 644 + * Create a listening RDMA service endpoint. 645 + */ 646 + static struct svc_xprt *svc_rdma_create(struct svc_serv *serv, 647 + struct sockaddr *sa, int salen, 648 + int flags) 649 + { 650 + struct rdma_cm_id *listen_id; 651 + struct svcxprt_rdma *cma_xprt; 652 + struct svc_xprt *xprt; 653 + int ret; 654 + 655 + dprintk("svcrdma: Creating RDMA socket\n"); 656 + 657 + cma_xprt = rdma_create_xprt(serv, 1); 658 + if (!cma_xprt) 659 + return ERR_PTR(ENOMEM); 660 + xprt = &cma_xprt->sc_xprt; 661 + 662 + listen_id = rdma_create_id(rdma_listen_handler, cma_xprt, RDMA_PS_TCP); 663 + if (IS_ERR(listen_id)) { 664 + rdma_destroy_xprt(cma_xprt); 665 + dprintk("svcrdma: rdma_create_id failed = %ld\n", 666 + PTR_ERR(listen_id)); 667 + return (void *)listen_id; 668 + } 669 + ret = rdma_bind_addr(listen_id, sa); 670 + if (ret) { 671 + rdma_destroy_xprt(cma_xprt); 672 + rdma_destroy_id(listen_id); 673 + dprintk("svcrdma: rdma_bind_addr failed = %d\n", ret); 674 + return ERR_PTR(ret); 675 + } 676 + cma_xprt->sc_cm_id = listen_id; 677 + 678 + ret = rdma_listen(listen_id, RPCRDMA_LISTEN_BACKLOG); 679 + if (ret) { 680 + rdma_destroy_id(listen_id); 681 + rdma_destroy_xprt(cma_xprt); 682 + dprintk("svcrdma: rdma_listen failed = %d\n", ret); 683 + } 684 + 685 + /* 686 + * We need to use the address from the cm_id in case the 687 + * caller specified 0 for the port number. 688 + */ 689 + sa = (struct sockaddr *)&cma_xprt->sc_cm_id->route.addr.src_addr; 690 + svc_xprt_set_local(&cma_xprt->sc_xprt, sa, salen); 691 + 692 + return &cma_xprt->sc_xprt; 693 + } 694 + 695 + /* 696 + * This is the xpo_recvfrom function for listening endpoints. Its 697 + * purpose is to accept incoming connections. The CMA callback handler 698 + * has already created a new transport and attached it to the new CMA 699 + * ID. 700 + * 701 + * There is a queue of pending connections hung on the listening 702 + * transport. This queue contains the new svc_xprt structure. This 703 + * function takes svc_xprt structures off the accept_q and completes 704 + * the connection. 705 + */ 706 + static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt) 707 + { 708 + struct svcxprt_rdma *listen_rdma; 709 + struct svcxprt_rdma *newxprt = NULL; 710 + struct rdma_conn_param conn_param; 711 + struct ib_qp_init_attr qp_attr; 712 + struct ib_device_attr devattr; 713 + struct sockaddr *sa; 714 + int ret; 715 + int i; 716 + 717 + listen_rdma = container_of(xprt, struct svcxprt_rdma, sc_xprt); 718 + clear_bit(XPT_CONN, &xprt->xpt_flags); 719 + /* Get the next entry off the accept list */ 720 + spin_lock_bh(&listen_rdma->sc_lock); 721 + if (!list_empty(&listen_rdma->sc_accept_q)) { 722 + newxprt = list_entry(listen_rdma->sc_accept_q.next, 723 + struct svcxprt_rdma, sc_accept_q); 724 + list_del_init(&newxprt->sc_accept_q); 725 + } 726 + if (!list_empty(&listen_rdma->sc_accept_q)) 727 + set_bit(XPT_CONN, &listen_rdma->sc_xprt.xpt_flags); 728 + spin_unlock_bh(&listen_rdma->sc_lock); 729 + if (!newxprt) 730 + return NULL; 731 + 732 + dprintk("svcrdma: newxprt from accept queue = %p, cm_id=%p\n", 733 + newxprt, newxprt->sc_cm_id); 734 + 735 + ret = ib_query_device(newxprt->sc_cm_id->device, &devattr); 736 + if (ret) { 737 + dprintk("svcrdma: could not query device attributes on " 738 + "device %p, rc=%d\n", newxprt->sc_cm_id->device, ret); 739 + goto errout; 740 + } 741 + 742 + /* Qualify the transport resource defaults with the 743 + * capabilities of this particular device */ 744 + newxprt->sc_max_sge = min((size_t)devattr.max_sge, 745 + (size_t)RPCSVC_MAXPAGES); 746 + newxprt->sc_max_requests = min((size_t)devattr.max_qp_wr, 747 + (size_t)svcrdma_max_requests); 748 + newxprt->sc_sq_depth = RPCRDMA_SQ_DEPTH_MULT * newxprt->sc_max_requests; 749 + 750 + newxprt->sc_ord = min((size_t)devattr.max_qp_rd_atom, 751 + (size_t)svcrdma_ord); 752 + 753 + newxprt->sc_pd = ib_alloc_pd(newxprt->sc_cm_id->device); 754 + if (IS_ERR(newxprt->sc_pd)) { 755 + dprintk("svcrdma: error creating PD for connect request\n"); 756 + goto errout; 757 + } 758 + newxprt->sc_sq_cq = ib_create_cq(newxprt->sc_cm_id->device, 759 + sq_comp_handler, 760 + cq_event_handler, 761 + newxprt, 762 + newxprt->sc_sq_depth, 763 + 0); 764 + if (IS_ERR(newxprt->sc_sq_cq)) { 765 + dprintk("svcrdma: error creating SQ CQ for connect request\n"); 766 + goto errout; 767 + } 768 + newxprt->sc_rq_cq = ib_create_cq(newxprt->sc_cm_id->device, 769 + rq_comp_handler, 770 + cq_event_handler, 771 + newxprt, 772 + newxprt->sc_max_requests, 773 + 0); 774 + if (IS_ERR(newxprt->sc_rq_cq)) { 775 + dprintk("svcrdma: error creating RQ CQ for connect request\n"); 776 + goto errout; 777 + } 778 + 779 + memset(&qp_attr, 0, sizeof qp_attr); 780 + qp_attr.event_handler = qp_event_handler; 781 + qp_attr.qp_context = &newxprt->sc_xprt; 782 + qp_attr.cap.max_send_wr = newxprt->sc_sq_depth; 783 + qp_attr.cap.max_recv_wr = newxprt->sc_max_requests; 784 + qp_attr.cap.max_send_sge = newxprt->sc_max_sge; 785 + qp_attr.cap.max_recv_sge = newxprt->sc_max_sge; 786 + qp_attr.sq_sig_type = IB_SIGNAL_REQ_WR; 787 + qp_attr.qp_type = IB_QPT_RC; 788 + qp_attr.send_cq = newxprt->sc_sq_cq; 789 + qp_attr.recv_cq = newxprt->sc_rq_cq; 790 + dprintk("svcrdma: newxprt->sc_cm_id=%p, newxprt->sc_pd=%p\n" 791 + " cm_id->device=%p, sc_pd->device=%p\n" 792 + " cap.max_send_wr = %d\n" 793 + " cap.max_recv_wr = %d\n" 794 + " cap.max_send_sge = %d\n" 795 + " cap.max_recv_sge = %d\n", 796 + newxprt->sc_cm_id, newxprt->sc_pd, 797 + newxprt->sc_cm_id->device, newxprt->sc_pd->device, 798 + qp_attr.cap.max_send_wr, 799 + qp_attr.cap.max_recv_wr, 800 + qp_attr.cap.max_send_sge, 801 + qp_attr.cap.max_recv_sge); 802 + 803 + ret = rdma_create_qp(newxprt->sc_cm_id, newxprt->sc_pd, &qp_attr); 804 + if (ret) { 805 + /* 806 + * XXX: This is a hack. We need a xx_request_qp interface 807 + * that will adjust the qp_attr's with a best-effort 808 + * number 809 + */ 810 + qp_attr.cap.max_send_sge -= 2; 811 + qp_attr.cap.max_recv_sge -= 2; 812 + ret = rdma_create_qp(newxprt->sc_cm_id, newxprt->sc_pd, 813 + &qp_attr); 814 + if (ret) { 815 + dprintk("svcrdma: failed to create QP, ret=%d\n", ret); 816 + goto errout; 817 + } 818 + newxprt->sc_max_sge = qp_attr.cap.max_send_sge; 819 + newxprt->sc_max_sge = qp_attr.cap.max_recv_sge; 820 + newxprt->sc_sq_depth = qp_attr.cap.max_send_wr; 821 + newxprt->sc_max_requests = qp_attr.cap.max_recv_wr; 822 + } 823 + newxprt->sc_qp = newxprt->sc_cm_id->qp; 824 + 825 + /* Register all of physical memory */ 826 + newxprt->sc_phys_mr = ib_get_dma_mr(newxprt->sc_pd, 827 + IB_ACCESS_LOCAL_WRITE | 828 + IB_ACCESS_REMOTE_WRITE); 829 + if (IS_ERR(newxprt->sc_phys_mr)) { 830 + dprintk("svcrdma: Failed to create DMA MR ret=%d\n", ret); 831 + goto errout; 832 + } 833 + 834 + /* Post receive buffers */ 835 + for (i = 0; i < newxprt->sc_max_requests; i++) { 836 + ret = svc_rdma_post_recv(newxprt); 837 + if (ret) { 838 + dprintk("svcrdma: failure posting receive buffers\n"); 839 + goto errout; 840 + } 841 + } 842 + 843 + /* Swap out the handler */ 844 + newxprt->sc_cm_id->event_handler = rdma_cma_handler; 845 + 846 + /* Accept Connection */ 847 + set_bit(RDMAXPRT_CONN_PENDING, &newxprt->sc_flags); 848 + memset(&conn_param, 0, sizeof conn_param); 849 + conn_param.responder_resources = 0; 850 + conn_param.initiator_depth = newxprt->sc_ord; 851 + ret = rdma_accept(newxprt->sc_cm_id, &conn_param); 852 + if (ret) { 853 + dprintk("svcrdma: failed to accept new connection, ret=%d\n", 854 + ret); 855 + goto errout; 856 + } 857 + 858 + dprintk("svcrdma: new connection %p accepted with the following " 859 + "attributes:\n" 860 + " local_ip : %d.%d.%d.%d\n" 861 + " local_port : %d\n" 862 + " remote_ip : %d.%d.%d.%d\n" 863 + " remote_port : %d\n" 864 + " max_sge : %d\n" 865 + " sq_depth : %d\n" 866 + " max_requests : %d\n" 867 + " ord : %d\n", 868 + newxprt, 869 + NIPQUAD(((struct sockaddr_in *)&newxprt->sc_cm_id-> 870 + route.addr.src_addr)->sin_addr.s_addr), 871 + ntohs(((struct sockaddr_in *)&newxprt->sc_cm_id-> 872 + route.addr.src_addr)->sin_port), 873 + NIPQUAD(((struct sockaddr_in *)&newxprt->sc_cm_id-> 874 + route.addr.dst_addr)->sin_addr.s_addr), 875 + ntohs(((struct sockaddr_in *)&newxprt->sc_cm_id-> 876 + route.addr.dst_addr)->sin_port), 877 + newxprt->sc_max_sge, 878 + newxprt->sc_sq_depth, 879 + newxprt->sc_max_requests, 880 + newxprt->sc_ord); 881 + 882 + /* Set the local and remote addresses in the transport */ 883 + sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr; 884 + svc_xprt_set_remote(&newxprt->sc_xprt, sa, svc_addr_len(sa)); 885 + sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.src_addr; 886 + svc_xprt_set_local(&newxprt->sc_xprt, sa, svc_addr_len(sa)); 887 + 888 + ib_req_notify_cq(newxprt->sc_sq_cq, IB_CQ_NEXT_COMP); 889 + ib_req_notify_cq(newxprt->sc_rq_cq, IB_CQ_NEXT_COMP); 890 + return &newxprt->sc_xprt; 891 + 892 + errout: 893 + dprintk("svcrdma: failure accepting new connection rc=%d.\n", ret); 894 + rdma_destroy_id(newxprt->sc_cm_id); 895 + rdma_destroy_xprt(newxprt); 896 + return NULL; 897 + } 898 + 899 + /* 900 + * Post an RQ WQE to the RQ when the rqst is being released. This 901 + * effectively returns an RQ credit to the client. The rq_xprt_ctxt 902 + * will be null if the request is deferred due to an RDMA_READ or the 903 + * transport had no data ready (EAGAIN). Note that an RPC deferred in 904 + * svc_process will still return the credit, this is because the data 905 + * is copied and no longer consume a WQE/WC. 906 + */ 907 + static void svc_rdma_release_rqst(struct svc_rqst *rqstp) 908 + { 909 + int err; 910 + struct svcxprt_rdma *rdma = 911 + container_of(rqstp->rq_xprt, struct svcxprt_rdma, sc_xprt); 912 + if (rqstp->rq_xprt_ctxt) { 913 + BUG_ON(rqstp->rq_xprt_ctxt != rdma); 914 + err = svc_rdma_post_recv(rdma); 915 + if (err) 916 + dprintk("svcrdma: failed to post an RQ WQE error=%d\n", 917 + err); 918 + } 919 + rqstp->rq_xprt_ctxt = NULL; 920 + } 921 + 922 + /* Disable data ready events for this connection */ 923 + static void svc_rdma_detach(struct svc_xprt *xprt) 924 + { 925 + struct svcxprt_rdma *rdma = 926 + container_of(xprt, struct svcxprt_rdma, sc_xprt); 927 + unsigned long flags; 928 + 929 + dprintk("svc: svc_rdma_detach(%p)\n", xprt); 930 + /* 931 + * Shutdown the connection. This will ensure we don't get any 932 + * more events from the provider. 933 + */ 934 + rdma_disconnect(rdma->sc_cm_id); 935 + rdma_destroy_id(rdma->sc_cm_id); 936 + 937 + /* We may already be on the DTO list */ 938 + spin_lock_irqsave(&dto_lock, flags); 939 + if (!list_empty(&rdma->sc_dto_q)) 940 + list_del_init(&rdma->sc_dto_q); 941 + spin_unlock_irqrestore(&dto_lock, flags); 942 + } 943 + 944 + static void svc_rdma_free(struct svc_xprt *xprt) 945 + { 946 + struct svcxprt_rdma *rdma = (struct svcxprt_rdma *)xprt; 947 + dprintk("svcrdma: svc_rdma_free(%p)\n", rdma); 948 + rdma_destroy_xprt(rdma); 949 + kfree(rdma); 950 + } 951 + 952 + static void rdma_destroy_xprt(struct svcxprt_rdma *xprt) 953 + { 954 + if (xprt->sc_qp && !IS_ERR(xprt->sc_qp)) 955 + ib_destroy_qp(xprt->sc_qp); 956 + 957 + if (xprt->sc_sq_cq && !IS_ERR(xprt->sc_sq_cq)) 958 + ib_destroy_cq(xprt->sc_sq_cq); 959 + 960 + if (xprt->sc_rq_cq && !IS_ERR(xprt->sc_rq_cq)) 961 + ib_destroy_cq(xprt->sc_rq_cq); 962 + 963 + if (xprt->sc_phys_mr && !IS_ERR(xprt->sc_phys_mr)) 964 + ib_dereg_mr(xprt->sc_phys_mr); 965 + 966 + if (xprt->sc_pd && !IS_ERR(xprt->sc_pd)) 967 + ib_dealloc_pd(xprt->sc_pd); 968 + 969 + destroy_context_cache(xprt->sc_ctxt_head); 970 + } 971 + 972 + static int svc_rdma_has_wspace(struct svc_xprt *xprt) 973 + { 974 + struct svcxprt_rdma *rdma = 975 + container_of(xprt, struct svcxprt_rdma, sc_xprt); 976 + 977 + /* 978 + * If there are fewer SQ WR available than required to send a 979 + * simple response, return false. 980 + */ 981 + if ((rdma->sc_sq_depth - atomic_read(&rdma->sc_sq_count) < 3)) 982 + return 0; 983 + 984 + /* 985 + * ...or there are already waiters on the SQ, 986 + * return false. 987 + */ 988 + if (waitqueue_active(&rdma->sc_send_wait)) 989 + return 0; 990 + 991 + /* Otherwise return true. */ 992 + return 1; 993 + } 994 + 995 + int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr) 996 + { 997 + struct ib_send_wr *bad_wr; 998 + int ret; 999 + 1000 + if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags)) 1001 + return 0; 1002 + 1003 + BUG_ON(wr->send_flags != IB_SEND_SIGNALED); 1004 + BUG_ON(((struct svc_rdma_op_ctxt *)(unsigned long)wr->wr_id)->wr_op != 1005 + wr->opcode); 1006 + /* If the SQ is full, wait until an SQ entry is available */ 1007 + while (1) { 1008 + spin_lock_bh(&xprt->sc_lock); 1009 + if (xprt->sc_sq_depth == atomic_read(&xprt->sc_sq_count)) { 1010 + spin_unlock_bh(&xprt->sc_lock); 1011 + atomic_inc(&rdma_stat_sq_starve); 1012 + /* See if we can reap some SQ WR */ 1013 + sq_cq_reap(xprt); 1014 + 1015 + /* Wait until SQ WR available if SQ still full */ 1016 + wait_event(xprt->sc_send_wait, 1017 + atomic_read(&xprt->sc_sq_count) < 1018 + xprt->sc_sq_depth); 1019 + continue; 1020 + } 1021 + /* Bumped used SQ WR count and post */ 1022 + ret = ib_post_send(xprt->sc_qp, wr, &bad_wr); 1023 + if (!ret) 1024 + atomic_inc(&xprt->sc_sq_count); 1025 + else 1026 + dprintk("svcrdma: failed to post SQ WR rc=%d, " 1027 + "sc_sq_count=%d, sc_sq_depth=%d\n", 1028 + ret, atomic_read(&xprt->sc_sq_count), 1029 + xprt->sc_sq_depth); 1030 + spin_unlock_bh(&xprt->sc_lock); 1031 + break; 1032 + } 1033 + return ret; 1034 + } 1035 + 1036 + int svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp, 1037 + enum rpcrdma_errcode err) 1038 + { 1039 + struct ib_send_wr err_wr; 1040 + struct ib_sge sge; 1041 + struct page *p; 1042 + struct svc_rdma_op_ctxt *ctxt; 1043 + u32 *va; 1044 + int length; 1045 + int ret; 1046 + 1047 + p = svc_rdma_get_page(); 1048 + va = page_address(p); 1049 + 1050 + /* XDR encode error */ 1051 + length = svc_rdma_xdr_encode_error(xprt, rmsgp, err, va); 1052 + 1053 + /* Prepare SGE for local address */ 1054 + sge.addr = ib_dma_map_page(xprt->sc_cm_id->device, 1055 + p, 0, PAGE_SIZE, DMA_FROM_DEVICE); 1056 + sge.lkey = xprt->sc_phys_mr->lkey; 1057 + sge.length = length; 1058 + 1059 + ctxt = svc_rdma_get_context(xprt); 1060 + ctxt->count = 1; 1061 + ctxt->pages[0] = p; 1062 + 1063 + /* Prepare SEND WR */ 1064 + memset(&err_wr, 0, sizeof err_wr); 1065 + ctxt->wr_op = IB_WR_SEND; 1066 + err_wr.wr_id = (unsigned long)ctxt; 1067 + err_wr.sg_list = &sge; 1068 + err_wr.num_sge = 1; 1069 + err_wr.opcode = IB_WR_SEND; 1070 + err_wr.send_flags = IB_SEND_SIGNALED; 1071 + 1072 + /* Post It */ 1073 + ret = svc_rdma_send(xprt, &err_wr); 1074 + if (ret) { 1075 + dprintk("svcrdma: Error posting send = %d\n", ret); 1076 + svc_rdma_put_context(ctxt, 1); 1077 + } 1078 + 1079 + return ret; 1080 + }