Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git

net/sunrpc/svc_xprt.c at v2.6.26-rc3 (1077 lines, 29 kB)
/*
 * linux/net/sunrpc/svc_xprt.c
 *
 * Author: Tom Tucker <tom@opengridcomputing.com>
 */

#include <linux/sched.h>
#include <linux/errno.h>
#include <linux/fcntl.h>
#include <linux/net.h>
#include <linux/in.h>
#include <linux/inet.h>
#include <linux/udp.h>
#include <linux/tcp.h>
#include <linux/unistd.h>
#include <linux/slab.h>
#include <linux/netdevice.h>
#include <linux/skbuff.h>
#include <linux/file.h>
#include <linux/freezer.h>
#include <linux/kthread.h>
#include <net/sock.h>
#include <net/checksum.h>
#include <net/ip.h>
#include <net/ipv6.h>
#include <net/tcp_states.h>
#include <linux/uaccess.h>
#include <asm/ioctls.h>

#include <linux/sunrpc/types.h>
#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/xdr.h>
#include <linux/sunrpc/stats.h>
#include <linux/sunrpc/svc_xprt.h>

#define RPCDBG_FACILITY	RPCDBG_SVCXPRT

static struct svc_deferred_req *svc_deferred_dequeue(struct svc_xprt *xprt);
static int svc_deferred_recv(struct svc_rqst *rqstp);
static struct cache_deferred_req *svc_defer(struct cache_req *req);
static void svc_age_temp_xprts(unsigned long closure);

/* apparently the "standard" is that clients close
 * idle connections after 5 minutes, servers after
 * 6 minutes
 *   http://www.connectathon.org/talks96/nfstcp.pdf
 */
static int svc_conn_age_period = 6*60;

/* List of registered transport classes */
static DEFINE_SPINLOCK(svc_xprt_class_lock);
static LIST_HEAD(svc_xprt_class_list);

/* SMP locking strategy:
 *
 *	svc_pool->sp_lock protects most of the fields of that pool.
 *	svc_serv->sv_lock protects sv_tempsocks, sv_permsocks, sv_tmpcnt.
 *	when both need to be taken (rare), svc_serv->sv_lock is first.
 *	BKL protects svc_serv->sv_nrthread.
 *	svc_sock->sk_lock protects the svc_sock->sk_deferred list
 *	and the ->sk_info_authunix cache.
 *
 *	The XPT_BUSY bit in xprt->xpt_flags prevents a transport being
 *	enqueued multiply. During normal transport processing this bit
 *	is set by svc_xprt_enqueue and cleared by svc_xprt_received.
 *	Providers should not manipulate this bit directly.
 *
 *	Some flags can be set to certain values at any time
 *	providing that certain rules are followed:
 *
 *	XPT_CONN, XPT_DATA:
 *		- Can be set or cleared at any time.
 *		- After a set, svc_xprt_enqueue must be called to enqueue
 *		  the transport for processing.
 *		- After a clear, the transport must be read/accepted.
 *		  If this succeeds, it must be set again.
 *	XPT_CLOSE:
 *		- Can set at any time. It is never cleared.
 *	XPT_DEAD:
 *		- Can only be set while XPT_BUSY is held which ensures
 *		  that no other thread will be using the transport or will
 *		  try to set XPT_DEAD.
 */

int svc_reg_xprt_class(struct svc_xprt_class *xcl)
{
	struct svc_xprt_class *cl;
	int res = -EEXIST;

	dprintk("svc: Adding svc transport class '%s'\n", xcl->xcl_name);

	INIT_LIST_HEAD(&xcl->xcl_list);
	spin_lock(&svc_xprt_class_lock);
	/* Make sure there isn't already a class with the same name */
	list_for_each_entry(cl, &svc_xprt_class_list, xcl_list) {
		if (strcmp(xcl->xcl_name, cl->xcl_name) == 0)
			goto out;
	}
	list_add_tail(&xcl->xcl_list, &svc_xprt_class_list);
	res = 0;
out:
	spin_unlock(&svc_xprt_class_lock);
	return res;
}
EXPORT_SYMBOL_GPL(svc_reg_xprt_class);

void svc_unreg_xprt_class(struct svc_xprt_class *xcl)
{
	dprintk("svc: Removing svc transport class '%s'\n", xcl->xcl_name);
	spin_lock(&svc_xprt_class_lock);
	list_del_init(&xcl->xcl_list);
	spin_unlock(&svc_xprt_class_lock);
}
EXPORT_SYMBOL_GPL(svc_unreg_xprt_class);

/*
 * Format the transport list for printing
 */
int svc_print_xprts(char *buf, int maxlen)
{
	struct list_head *le;
	char tmpstr[80];
	int len = 0;
	buf[0] = '\0';

	spin_lock(&svc_xprt_class_lock);
	list_for_each(le, &svc_xprt_class_list) {
		int slen;
		struct svc_xprt_class *xcl =
			list_entry(le, struct svc_xprt_class, xcl_list);

		sprintf(tmpstr, "%s %d\n", xcl->xcl_name, xcl->xcl_max_payload);
		slen = strlen(tmpstr);
		if (len + slen > maxlen)
			break;
		len += slen;
		strcat(buf, tmpstr);
	}
	spin_unlock(&svc_xprt_class_lock);

	return len;
}

static void svc_xprt_free(struct kref *kref)
{
	struct svc_xprt *xprt =
		container_of(kref, struct svc_xprt, xpt_ref);
	struct module *owner = xprt->xpt_class->xcl_owner;
	if (test_bit(XPT_CACHE_AUTH, &xprt->xpt_flags)
	    && xprt->xpt_auth_cache != NULL)
		svcauth_unix_info_release(xprt->xpt_auth_cache);
	xprt->xpt_ops->xpo_free(xprt);
	module_put(owner);
}

void svc_xprt_put(struct svc_xprt *xprt)
{
	kref_put(&xprt->xpt_ref, svc_xprt_free);
}
EXPORT_SYMBOL_GPL(svc_xprt_put);

/*
 * Called by transport drivers to initialize the transport independent
 * portion of the transport instance.
 */
void svc_xprt_init(struct svc_xprt_class *xcl, struct svc_xprt *xprt,
		   struct svc_serv *serv)
{
	memset(xprt, 0, sizeof(*xprt));
	xprt->xpt_class = xcl;
	xprt->xpt_ops = xcl->xcl_ops;
	kref_init(&xprt->xpt_ref);
	xprt->xpt_server = serv;
	INIT_LIST_HEAD(&xprt->xpt_list);
	INIT_LIST_HEAD(&xprt->xpt_ready);
	INIT_LIST_HEAD(&xprt->xpt_deferred);
	mutex_init(&xprt->xpt_mutex);
	spin_lock_init(&xprt->xpt_lock);
	set_bit(XPT_BUSY, &xprt->xpt_flags);
}
EXPORT_SYMBOL_GPL(svc_xprt_init);

int svc_create_xprt(struct svc_serv *serv, char *xprt_name, unsigned short port,
		    int flags)
{
	struct svc_xprt_class *xcl;
	struct sockaddr_in sin = {
		.sin_family		= AF_INET,
		.sin_addr.s_addr	= htonl(INADDR_ANY),
		.sin_port		= htons(port),
	};
	dprintk("svc: creating transport %s[%d]\n", xprt_name, port);
	spin_lock(&svc_xprt_class_lock);
	list_for_each_entry(xcl, &svc_xprt_class_list, xcl_list) {
		struct svc_xprt *newxprt;

		if (strcmp(xprt_name, xcl->xcl_name))
			continue;

		if (!try_module_get(xcl->xcl_owner))
			goto err;

		spin_unlock(&svc_xprt_class_lock);
		newxprt = xcl->xcl_ops->
			xpo_create(serv, (struct sockaddr *)&sin, sizeof(sin),
				   flags);
		if (IS_ERR(newxprt)) {
			module_put(xcl->xcl_owner);
			return PTR_ERR(newxprt);
		}

		clear_bit(XPT_TEMP, &newxprt->xpt_flags);
		spin_lock_bh(&serv->sv_lock);
		list_add(&newxprt->xpt_list, &serv->sv_permsocks);
		spin_unlock_bh(&serv->sv_lock);
		clear_bit(XPT_BUSY, &newxprt->xpt_flags);
		return svc_xprt_local_port(newxprt);
	}
 err:
	spin_unlock(&svc_xprt_class_lock);
	dprintk("svc: transport %s not found\n", xprt_name);
	return -ENOENT;
}
EXPORT_SYMBOL_GPL(svc_create_xprt);

/*
 * Copy the local and remote xprt addresses to the rqstp structure
 */
void svc_xprt_copy_addrs(struct svc_rqst *rqstp, struct svc_xprt *xprt)
{
	struct sockaddr *sin;

	memcpy(&rqstp->rq_addr, &xprt->xpt_remote, xprt->xpt_remotelen);
	rqstp->rq_addrlen = xprt->xpt_remotelen;

	/*
	 * Destination address in request is needed for binding the
	 * source address in RPC replies/callbacks later.
	 */
	sin = (struct sockaddr *)&xprt->xpt_local;
	switch (sin->sa_family) {
	case AF_INET:
		rqstp->rq_daddr.addr = ((struct sockaddr_in *)sin)->sin_addr;
		break;
	case AF_INET6:
		rqstp->rq_daddr.addr6 = ((struct sockaddr_in6 *)sin)->sin6_addr;
		break;
	}
}
EXPORT_SYMBOL_GPL(svc_xprt_copy_addrs);

/**
 * svc_print_addr - Format rq_addr field for printing
 * @rqstp: svc_rqst struct containing address to print
 * @buf: target buffer for formatted address
 * @len: length of target buffer
 *
 */
char *svc_print_addr(struct svc_rqst *rqstp, char *buf, size_t len)
{
	return __svc_print_addr(svc_addr(rqstp), buf, len);
}
EXPORT_SYMBOL_GPL(svc_print_addr);

/*
 * Queue up an idle server thread.  Must have pool->sp_lock held.
 * Note: this is really a stack rather than a queue, so that we only
 * use as many different threads as we need, and the rest don't pollute
 * the cache.
 */
static void svc_thread_enqueue(struct svc_pool *pool, struct svc_rqst *rqstp)
{
	list_add(&rqstp->rq_list, &pool->sp_threads);
}

/*
 * Dequeue an nfsd thread.  Must have pool->sp_lock held.
 */
static void svc_thread_dequeue(struct svc_pool *pool, struct svc_rqst *rqstp)
{
	list_del(&rqstp->rq_list);
}

/*
 * Queue up a transport with data pending. If there are idle nfsd
 * processes, wake 'em up.
 *
 */
void svc_xprt_enqueue(struct svc_xprt *xprt)
{
	struct svc_serv *serv = xprt->xpt_server;
	struct svc_pool *pool;
	struct svc_rqst *rqstp;
	int cpu;

	if (!(xprt->xpt_flags &
	      ((1<<XPT_CONN)|(1<<XPT_DATA)|(1<<XPT_CLOSE)|(1<<XPT_DEFERRED))))
		return;
	if (test_bit(XPT_DEAD, &xprt->xpt_flags))
		return;

	cpu = get_cpu();
	pool = svc_pool_for_cpu(xprt->xpt_server, cpu);
	put_cpu();

	spin_lock_bh(&pool->sp_lock);

	if (!list_empty(&pool->sp_threads) &&
	    !list_empty(&pool->sp_sockets))
		printk(KERN_ERR
		       "svc_xprt_enqueue: "
		       "threads and transports both waiting??\n");

	if (test_bit(XPT_DEAD, &xprt->xpt_flags)) {
		/* Don't enqueue dead transports */
		dprintk("svc: transport %p is dead, not enqueued\n", xprt);
		goto out_unlock;
	}

	/* Mark transport as busy. It will remain in this state until
	 * the provider calls svc_xprt_received. We update XPT_BUSY
	 * atomically because it also guards against trying to enqueue
	 * the transport twice.
	 */
	if (test_and_set_bit(XPT_BUSY, &xprt->xpt_flags)) {
		/* Don't enqueue transport while already enqueued */
		dprintk("svc: transport %p busy, not enqueued\n", xprt);
		goto out_unlock;
	}
	BUG_ON(xprt->xpt_pool != NULL);
	xprt->xpt_pool = pool;

	/* Handle pending connection */
	if (test_bit(XPT_CONN, &xprt->xpt_flags))
		goto process;

	/* Handle close in-progress */
	if (test_bit(XPT_CLOSE, &xprt->xpt_flags))
		goto process;

	/* Check if we have space to reply to a request */
	if (!xprt->xpt_ops->xpo_has_wspace(xprt)) {
		/* Don't enqueue while not enough space for reply */
		dprintk("svc: no write space, transport %p not enqueued\n",
			xprt);
		xprt->xpt_pool = NULL;
		clear_bit(XPT_BUSY, &xprt->xpt_flags);
		goto out_unlock;
	}

 process:
	if (!list_empty(&pool->sp_threads)) {
		rqstp = list_entry(pool->sp_threads.next,
				   struct svc_rqst,
				   rq_list);
		dprintk("svc: transport %p served by daemon %p\n",
			xprt, rqstp);
		svc_thread_dequeue(pool, rqstp);
		if (rqstp->rq_xprt)
			printk(KERN_ERR
				"svc_xprt_enqueue: server %p, rq_xprt=%p!\n",
				rqstp, rqstp->rq_xprt);
		rqstp->rq_xprt = xprt;
		svc_xprt_get(xprt);
		rqstp->rq_reserved = serv->sv_max_mesg;
		atomic_add(rqstp->rq_reserved, &xprt->xpt_reserved);
		BUG_ON(xprt->xpt_pool != pool);
		wake_up(&rqstp->rq_wait);
	} else {
		dprintk("svc: transport %p put into queue\n", xprt);
		list_add_tail(&xprt->xpt_ready, &pool->sp_sockets);
		BUG_ON(xprt->xpt_pool != pool);
	}

out_unlock:
	spin_unlock_bh(&pool->sp_lock);
}
EXPORT_SYMBOL_GPL(svc_xprt_enqueue);

/*
 * Dequeue the first transport.  Must be called with the pool->sp_lock held.
 */
static struct svc_xprt *svc_xprt_dequeue(struct svc_pool *pool)
{
	struct svc_xprt *xprt;

	if (list_empty(&pool->sp_sockets))
		return NULL;

	xprt = list_entry(pool->sp_sockets.next,
			  struct svc_xprt, xpt_ready);
	list_del_init(&xprt->xpt_ready);

	dprintk("svc: transport %p dequeued, inuse=%d\n",
		xprt, atomic_read(&xprt->xpt_ref.refcount));

	return xprt;
}

/*
 * svc_xprt_received conditionally queues the transport for processing
 * by another thread. The caller must hold the XPT_BUSY bit and must
 * not thereafter touch transport data.
 *
 * Note: XPT_DATA only gets cleared when a read-attempt finds no (or
 * insufficient) data.
 */
void svc_xprt_received(struct svc_xprt *xprt)
{
	BUG_ON(!test_bit(XPT_BUSY, &xprt->xpt_flags));
	xprt->xpt_pool = NULL;
	clear_bit(XPT_BUSY, &xprt->xpt_flags);
	svc_xprt_enqueue(xprt);
}
EXPORT_SYMBOL_GPL(svc_xprt_received);

/**
 * svc_reserve - change the space reserved for the reply to a request.
 * @rqstp:  The request in question
 * @space: new max space to reserve
 *
 * Each request reserves some space on the output queue of the transport
 * to make sure the reply fits.  This function reduces that reserved
 * space to be the amount of space used already, plus @space.
 *
 */
void svc_reserve(struct svc_rqst *rqstp, int space)
{
	space += rqstp->rq_res.head[0].iov_len;

	if (space < rqstp->rq_reserved) {
		struct svc_xprt *xprt = rqstp->rq_xprt;
		atomic_sub((rqstp->rq_reserved - space), &xprt->xpt_reserved);
		rqstp->rq_reserved = space;

		svc_xprt_enqueue(xprt);
	}
}
EXPORT_SYMBOL(svc_reserve);

static void svc_xprt_release(struct svc_rqst *rqstp)
{
	struct svc_xprt	*xprt = rqstp->rq_xprt;

	rqstp->rq_xprt->xpt_ops->xpo_release_rqst(rqstp);

	svc_free_res_pages(rqstp);
	rqstp->rq_res.page_len = 0;
	rqstp->rq_res.page_base = 0;

	/* Reset response buffer and release
	 * the reservation.
	 * But first, check that enough space was reserved
	 * for the reply, otherwise we have a bug!
	 */
	if ((rqstp->rq_res.len) > rqstp->rq_reserved)
		printk(KERN_ERR "RPC request reserved %d but used %d\n",
		       rqstp->rq_reserved,
		       rqstp->rq_res.len);

	rqstp->rq_res.head[0].iov_len = 0;
	svc_reserve(rqstp, 0);
	rqstp->rq_xprt = NULL;

	svc_xprt_put(xprt);
}

/*
 * External function to wake up a server waiting for data
 * This really only makes sense for services like lockd
 * which have exactly one thread anyway.
 */
void svc_wake_up(struct svc_serv *serv)
{
	struct svc_rqst	*rqstp;
	unsigned int i;
	struct svc_pool *pool;

	for (i = 0; i < serv->sv_nrpools; i++) {
		pool = &serv->sv_pools[i];

		spin_lock_bh(&pool->sp_lock);
		if (!list_empty(&pool->sp_threads)) {
			rqstp = list_entry(pool->sp_threads.next,
					   struct svc_rqst,
					   rq_list);
			dprintk("svc: daemon %p woken up.\n", rqstp);
			/*
			svc_thread_dequeue(pool, rqstp);
			rqstp->rq_xprt = NULL;
			 */
			wake_up(&rqstp->rq_wait);
		}
		spin_unlock_bh(&pool->sp_lock);
	}
}
EXPORT_SYMBOL(svc_wake_up);

int svc_port_is_privileged(struct sockaddr *sin)
{
	switch (sin->sa_family) {
	case AF_INET:
		return ntohs(((struct sockaddr_in *)sin)->sin_port)
			< PROT_SOCK;
	case AF_INET6:
		return ntohs(((struct sockaddr_in6 *)sin)->sin6_port)
			< PROT_SOCK;
	default:
		return 0;
	}
}

/*
 * Make sure that we don't have too many active connections. If we
 * have, something must be dropped.
 *
 * There's no point in trying to do random drop here for DoS
 * prevention. The NFS clients does 1 reconnect in 15 seconds. An
 * attacker can easily beat that.
 *
 * The only somewhat efficient mechanism would be if drop old
 * connections from the same IP first. But right now we don't even
 * record the client IP in svc_sock.
 */
static void svc_check_conn_limits(struct svc_serv *serv)
{
	if (serv->sv_tmpcnt > (serv->sv_nrthreads+3)*20) {
		struct svc_xprt *xprt = NULL;
		spin_lock_bh(&serv->sv_lock);
		if (!list_empty(&serv->sv_tempsocks)) {
			if (net_ratelimit()) {
				/* Try to help the admin */
				printk(KERN_NOTICE "%s: too many open "
				       "connections, consider increasing the "
				       "number of nfsd threads\n",
				       serv->sv_name);
			}
			/*
			 * Always select the oldest connection. It's not fair,
			 * but so is life
			 */
			xprt = list_entry(serv->sv_tempsocks.prev,
					  struct svc_xprt,
					  xpt_list);
			set_bit(XPT_CLOSE, &xprt->xpt_flags);
			svc_xprt_get(xprt);
		}
		spin_unlock_bh(&serv->sv_lock);

		if (xprt) {
			svc_xprt_enqueue(xprt);
			svc_xprt_put(xprt);
		}
	}
}

/*
 * Receive the next request on any transport.  This code is carefully
 * organised not to touch any cachelines in the shared svc_serv
 * structure, only cachelines in the local svc_pool.
 */
int svc_recv(struct svc_rqst *rqstp, long timeout)
{
	struct svc_xprt		*xprt = NULL;
	struct svc_serv		*serv = rqstp->rq_server;
	struct svc_pool		*pool = rqstp->rq_pool;
	int			len, i;
	int			pages;
	struct xdr_buf		*arg;
	DECLARE_WAITQUEUE(wait, current);

	dprintk("svc: server %p waiting for data (to = %ld)\n",
		rqstp, timeout);

	if (rqstp->rq_xprt)
		printk(KERN_ERR
			"svc_recv: service %p, transport not NULL!\n",
			rqstp);
	if (waitqueue_active(&rqstp->rq_wait))
		printk(KERN_ERR
			"svc_recv: service %p, wait queue active!\n",
			rqstp);

	/* now allocate needed pages.  If we get a failure, sleep briefly */
	pages = (serv->sv_max_mesg + PAGE_SIZE) / PAGE_SIZE;
	for (i = 0; i < pages ; i++)
		while (rqstp->rq_pages[i] == NULL) {
			struct page *p = alloc_page(GFP_KERNEL);
			if (!p) {
				set_current_state(TASK_INTERRUPTIBLE);
				if (signalled() || kthread_should_stop()) {
					set_current_state(TASK_RUNNING);
					return -EINTR;
				}
				schedule_timeout(msecs_to_jiffies(500));
			}
			rqstp->rq_pages[i] = p;
		}
	rqstp->rq_pages[i++] = NULL; /* this might be seen in nfs_read_actor */
	BUG_ON(pages >= RPCSVC_MAXPAGES);

	/* Make arg->head point to first page and arg->pages point to rest */
	arg = &rqstp->rq_arg;
	arg->head[0].iov_base = page_address(rqstp->rq_pages[0]);
	arg->head[0].iov_len = PAGE_SIZE;
	arg->pages = rqstp->rq_pages + 1;
	arg->page_base = 0;
	/* save at least one page for response */
	arg->page_len = (pages-2)*PAGE_SIZE;
	arg->len = (pages-1)*PAGE_SIZE;
	arg->tail[0].iov_len = 0;

	try_to_freeze();
	cond_resched();
	if (signalled() || kthread_should_stop())
		return -EINTR;

	spin_lock_bh(&pool->sp_lock);
	xprt = svc_xprt_dequeue(pool);
	if (xprt) {
		rqstp->rq_xprt = xprt;
		svc_xprt_get(xprt);
		rqstp->rq_reserved = serv->sv_max_mesg;
		atomic_add(rqstp->rq_reserved, &xprt->xpt_reserved);
	} else {
		/* No data pending. Go to sleep */
		svc_thread_enqueue(pool, rqstp);

		/*
		 * We have to be able to interrupt this wait
		 * to bring down the daemons ...
		 */
		set_current_state(TASK_INTERRUPTIBLE);

		/*
		 * checking kthread_should_stop() here allows us to avoid
		 * locking and signalling when stopping kthreads that call
		 * svc_recv. If the thread has already been woken up, then
		 * we can exit here without sleeping. If not, then it
		 * it'll be woken up quickly during the schedule_timeout
		 */
		if (kthread_should_stop()) {
			set_current_state(TASK_RUNNING);
			spin_unlock_bh(&pool->sp_lock);
			return -EINTR;
		}

		add_wait_queue(&rqstp->rq_wait, &wait);
		spin_unlock_bh(&pool->sp_lock);

		schedule_timeout(timeout);

		try_to_freeze();

		spin_lock_bh(&pool->sp_lock);
		remove_wait_queue(&rqstp->rq_wait, &wait);

		xprt = rqstp->rq_xprt;
		if (!xprt) {
			svc_thread_dequeue(pool, rqstp);
			spin_unlock_bh(&pool->sp_lock);
			dprintk("svc: server %p, no data yet\n", rqstp);
			if (signalled() || kthread_should_stop())
				return -EINTR;
			else
				return -EAGAIN;
		}
	}
	spin_unlock_bh(&pool->sp_lock);

	len = 0;
	if (test_bit(XPT_CLOSE, &xprt->xpt_flags)) {
		dprintk("svc_recv: found XPT_CLOSE\n");
		svc_delete_xprt(xprt);
	} else if (test_bit(XPT_LISTENER, &xprt->xpt_flags)) {
		struct svc_xprt *newxpt;
		newxpt = xprt->xpt_ops->xpo_accept(xprt);
		if (newxpt) {
			/*
			 * We know this module_get will succeed because the
			 * listener holds a reference too
			 */
			__module_get(newxpt->xpt_class->xcl_owner);
			svc_check_conn_limits(xprt->xpt_server);
			spin_lock_bh(&serv->sv_lock);
			set_bit(XPT_TEMP, &newxpt->xpt_flags);
			list_add(&newxpt->xpt_list, &serv->sv_tempsocks);
			serv->sv_tmpcnt++;
			if (serv->sv_temptimer.function == NULL) {
				/* setup timer to age temp transports */
				setup_timer(&serv->sv_temptimer,
					    svc_age_temp_xprts,
					    (unsigned long)serv);
				mod_timer(&serv->sv_temptimer,
					  jiffies + svc_conn_age_period * HZ);
			}
			spin_unlock_bh(&serv->sv_lock);
			svc_xprt_received(newxpt);
		}
		svc_xprt_received(xprt);
	} else {
		dprintk("svc: server %p, pool %u, transport %p, inuse=%d\n",
			rqstp, pool->sp_id, xprt,
			atomic_read(&xprt->xpt_ref.refcount));
		rqstp->rq_deferred = svc_deferred_dequeue(xprt);
		if (rqstp->rq_deferred) {
			svc_xprt_received(xprt);
			len = svc_deferred_recv(rqstp);
		} else
			len = xprt->xpt_ops->xpo_recvfrom(rqstp);
		dprintk("svc: got len=%d\n", len);
	}

	/* No data, incomplete (TCP) read, or accept() */
	if (len == 0 || len == -EAGAIN) {
		rqstp->rq_res.len = 0;
		svc_xprt_release(rqstp);
		return -EAGAIN;
	}
	clear_bit(XPT_OLD, &xprt->xpt_flags);

	rqstp->rq_secure = svc_port_is_privileged(svc_addr(rqstp));
	rqstp->rq_chandle.defer = svc_defer;

	if (serv->sv_stats)
		serv->sv_stats->netcnt++;
	return len;
}
EXPORT_SYMBOL(svc_recv);

/*
 * Drop request
 */
void svc_drop(struct svc_rqst *rqstp)
{
	dprintk("svc: xprt %p dropped request\n", rqstp->rq_xprt);
	svc_xprt_release(rqstp);
}
EXPORT_SYMBOL(svc_drop);

/*
 * Return reply to client.
 */
int svc_send(struct svc_rqst *rqstp)
{
	struct svc_xprt	*xprt;
	int		len;
	struct xdr_buf	*xb;

	xprt = rqstp->rq_xprt;
	if (!xprt)
		return -EFAULT;

	/* release the receive skb before sending the reply */
	rqstp->rq_xprt->xpt_ops->xpo_release_rqst(rqstp);

	/* calculate over-all length */
	xb = &rqstp->rq_res;
	xb->len = xb->head[0].iov_len +
		xb->page_len +
		xb->tail[0].iov_len;

	/* Grab mutex to serialize outgoing data. */
	mutex_lock(&xprt->xpt_mutex);
	if (test_bit(XPT_DEAD, &xprt->xpt_flags))
		len = -ENOTCONN;
	else
		len = xprt->xpt_ops->xpo_sendto(rqstp);
	mutex_unlock(&xprt->xpt_mutex);
	svc_xprt_release(rqstp);

	if (len == -ECONNREFUSED || len == -ENOTCONN || len == -EAGAIN)
		return 0;
	return len;
}

/*
 * Timer function to close old temporary transports, using
 * a mark-and-sweep algorithm.
 */
static void svc_age_temp_xprts(unsigned long closure)
{
	struct svc_serv *serv = (struct svc_serv *)closure;
	struct svc_xprt *xprt;
	struct list_head *le, *next;
	LIST_HEAD(to_be_aged);

	dprintk("svc_age_temp_xprts\n");

	if (!spin_trylock_bh(&serv->sv_lock)) {
		/* busy, try again 1 sec later */
		dprintk("svc_age_temp_xprts: busy\n");
		mod_timer(&serv->sv_temptimer, jiffies + HZ);
		return;
	}

	list_for_each_safe(le, next, &serv->sv_tempsocks) {
		xprt = list_entry(le, struct svc_xprt, xpt_list);

		/* First time through, just mark it OLD. Second time
		 * through, close it. */
		if (!test_and_set_bit(XPT_OLD, &xprt->xpt_flags))
			continue;
		if (atomic_read(&xprt->xpt_ref.refcount) > 1
		    || test_bit(XPT_BUSY, &xprt->xpt_flags))
			continue;
		svc_xprt_get(xprt);
		list_move(le, &to_be_aged);
		set_bit(XPT_CLOSE, &xprt->xpt_flags);
		set_bit(XPT_DETACHED, &xprt->xpt_flags);
	}
	spin_unlock_bh(&serv->sv_lock);

	while (!list_empty(&to_be_aged)) {
		le = to_be_aged.next;
		/* fiddling the xpt_list node is safe 'cos we're XPT_DETACHED */
		list_del_init(le);
		xprt = list_entry(le, struct svc_xprt, xpt_list);

		dprintk("queuing xprt %p for closing\n", xprt);

		/* a thread will dequeue and close it soon */
		svc_xprt_enqueue(xprt);
		svc_xprt_put(xprt);
	}

	mod_timer(&serv->sv_temptimer, jiffies + svc_conn_age_period * HZ);
}

/*
 * Remove a dead transport
 */
void svc_delete_xprt(struct svc_xprt *xprt)
{
	struct svc_serv	*serv = xprt->xpt_server;

	dprintk("svc: svc_delete_xprt(%p)\n", xprt);
	xprt->xpt_ops->xpo_detach(xprt);

	spin_lock_bh(&serv->sv_lock);
	if (!test_and_set_bit(XPT_DETACHED, &xprt->xpt_flags))
		list_del_init(&xprt->xpt_list);
	/*
	 * We used to delete the transport from whichever list
	 * it's sk_xprt.xpt_ready node was on, but we don't actually
	 * need to.  This is because the only time we're called
	 * while still attached to a queue, the queue itself
	 * is about to be destroyed (in svc_destroy).
	 */
	if (!test_and_set_bit(XPT_DEAD, &xprt->xpt_flags)) {
		BUG_ON(atomic_read(&xprt->xpt_ref.refcount) < 2);
		if (test_bit(XPT_TEMP, &xprt->xpt_flags))
			serv->sv_tmpcnt--;
		svc_xprt_put(xprt);
	}
	spin_unlock_bh(&serv->sv_lock);
}

void svc_close_xprt(struct svc_xprt *xprt)
{
	set_bit(XPT_CLOSE, &xprt->xpt_flags);
	if (test_and_set_bit(XPT_BUSY, &xprt->xpt_flags))
		/* someone else will have to effect the close */
		return;

	svc_xprt_get(xprt);
	svc_delete_xprt(xprt);
	clear_bit(XPT_BUSY, &xprt->xpt_flags);
	svc_xprt_put(xprt);
}
EXPORT_SYMBOL_GPL(svc_close_xprt);

void svc_close_all(struct list_head *xprt_list)
{
	struct svc_xprt *xprt;
	struct svc_xprt *tmp;

	list_for_each_entry_safe(xprt, tmp, xprt_list, xpt_list) {
		set_bit(XPT_CLOSE, &xprt->xpt_flags);
		if (test_bit(XPT_BUSY, &xprt->xpt_flags)) {
			/* Waiting to be processed, but no threads left,
			 * So just remove it from the waiting list
			 */
			list_del_init(&xprt->xpt_ready);
			clear_bit(XPT_BUSY, &xprt->xpt_flags);
		}
		svc_close_xprt(xprt);
	}
}

/*
 * Handle defer and revisit of requests
 */

static void svc_revisit(struct cache_deferred_req *dreq, int too_many)
{
	struct svc_deferred_req *dr =
		container_of(dreq, struct svc_deferred_req, handle);
	struct svc_xprt *xprt = dr->xprt;

	if (too_many) {
		svc_xprt_put(xprt);
		kfree(dr);
		return;
	}
	dprintk("revisit queued\n");
	dr->xprt = NULL;
	spin_lock(&xprt->xpt_lock);
	list_add(&dr->handle.recent, &xprt->xpt_deferred);
	spin_unlock(&xprt->xpt_lock);
	set_bit(XPT_DEFERRED, &xprt->xpt_flags);
	svc_xprt_enqueue(xprt);
	svc_xprt_put(xprt);
}

/*
 * Save the request off for later processing. The request buffer looks
 * like this:
 *
 * <xprt-header><rpc-header><rpc-pagelist><rpc-tail>
 *
 * This code can only handle requests that consist of an xprt-header
 * and rpc-header.
 */
static struct cache_deferred_req *svc_defer(struct cache_req *req)
{
	struct svc_rqst *rqstp = container_of(req, struct svc_rqst, rq_chandle);
	struct svc_deferred_req *dr;

	if (rqstp->rq_arg.page_len)
		return NULL; /* if more than a page, give up FIXME */
	if (rqstp->rq_deferred) {
		dr = rqstp->rq_deferred;
		rqstp->rq_deferred = NULL;
	} else {
		size_t skip;
		size_t size;
		/* FIXME maybe discard if size too large */
		size = sizeof(struct svc_deferred_req) + rqstp->rq_arg.len;
		dr = kmalloc(size, GFP_KERNEL);
		if (dr == NULL)
			return NULL;

		dr->handle.owner = rqstp->rq_server;
		dr->prot = rqstp->rq_prot;
		memcpy(&dr->addr, &rqstp->rq_addr, rqstp->rq_addrlen);
		dr->addrlen = rqstp->rq_addrlen;
		dr->daddr = rqstp->rq_daddr;
		dr->argslen = rqstp->rq_arg.len >> 2;
		dr->xprt_hlen = rqstp->rq_xprt_hlen;

		/* back up head to the start of the buffer and copy */
		skip = rqstp->rq_arg.len - rqstp->rq_arg.head[0].iov_len;
		memcpy(dr->args, rqstp->rq_arg.head[0].iov_base - skip,
		       dr->argslen << 2);
	}
	svc_xprt_get(rqstp->rq_xprt);
	dr->xprt = rqstp->rq_xprt;

	dr->handle.revisit = svc_revisit;
	return &dr->handle;
}

/*
 * recv data from a deferred request into an active one
 */
static int svc_deferred_recv(struct svc_rqst *rqstp)
{
	struct svc_deferred_req *dr = rqstp->rq_deferred;

	/* setup iov_base past transport header */
	rqstp->rq_arg.head[0].iov_base = dr->args + (dr->xprt_hlen>>2);
	/* The iov_len does not include the transport header bytes */
	rqstp->rq_arg.head[0].iov_len = (dr->argslen<<2) - dr->xprt_hlen;
	rqstp->rq_arg.page_len = 0;
	/* The rq_arg.len includes the transport header bytes */
	rqstp->rq_arg.len = dr->argslen<<2;
	rqstp->rq_prot = dr->prot;
	memcpy(&rqstp->rq_addr, &dr->addr, dr->addrlen);
	rqstp->rq_addrlen = dr->addrlen;
	/* Save off transport header len in case we get deferred again */
	rqstp->rq_xprt_hlen = dr->xprt_hlen;
	rqstp->rq_daddr = dr->daddr;
	rqstp->rq_respages = rqstp->rq_pages;
	return (dr->argslen<<2) - dr->xprt_hlen;
}


static struct svc_deferred_req *svc_deferred_dequeue(struct svc_xprt *xprt)
{
	struct svc_deferred_req *dr = NULL;

	if (!test_bit(XPT_DEFERRED, &xprt->xpt_flags))
		return NULL;
	spin_lock(&xprt->xpt_lock);
	clear_bit(XPT_DEFERRED, &xprt->xpt_flags);
	if (!list_empty(&xprt->xpt_deferred)) {
		dr = list_entry(xprt->xpt_deferred.next,
				struct svc_deferred_req,
				handle.recent);
		list_del_init(&dr->handle.recent);
		set_bit(XPT_DEFERRED, &xprt->xpt_flags);
	}
	spin_unlock(&xprt->xpt_lock);
	return dr;
}

/*
 * Return the transport instance pointer for the endpoint accepting
 * connections/peer traffic from the specified transport class,
 * address family and port.
 *
 * Specifying 0 for the address family or port is effectively a
 * wild-card, and will result in matching the first transport in the
 * service's list that has a matching class name.
 */
struct svc_xprt *svc_find_xprt(struct svc_serv *serv, char *xcl_name,
			       int af, int port)
{
	struct svc_xprt *xprt;
	struct svc_xprt *found = NULL;

	/* Sanity check the args */
	if (!serv || !xcl_name)
		return found;

	spin_lock_bh(&serv->sv_lock);
	list_for_each_entry(xprt, &serv->sv_permsocks, xpt_list) {
		if (strcmp(xprt->xpt_class->xcl_name, xcl_name))
			continue;
		if (af != AF_UNSPEC && af != xprt->xpt_local.ss_family)
			continue;
		if (port && port != svc_xprt_local_port(xprt))
			continue;
		found = xprt;
		svc_xprt_get(xprt);
		break;
	}
	spin_unlock_bh(&serv->sv_lock);
	return found;
}
EXPORT_SYMBOL_GPL(svc_find_xprt);

/*
 * Format a buffer with a list of the active transports. A zero for
 * the buflen parameter disables target buffer overflow checking.
 */
int svc_xprt_names(struct svc_serv *serv, char *buf, int buflen)
{
	struct svc_xprt *xprt;
	char xprt_str[64];
	int totlen = 0;
	int len;

	/* Sanity check args */
	if (!serv)
		return 0;

	spin_lock_bh(&serv->sv_lock);
	list_for_each_entry(xprt, &serv->sv_permsocks, xpt_list) {
		len = snprintf(xprt_str, sizeof(xprt_str),
			       "%s %d\n", xprt->xpt_class->xcl_name,
			       svc_xprt_local_port(xprt));
		/* If the string was truncated, replace with error string */
		if (len >= sizeof(xprt_str))
			strcpy(xprt_str, "name-too-long\n");
		/* Don't overflow buffer */
		len = strlen(xprt_str);
		if (buflen && (len + totlen >= buflen))
			break;
		strcpy(buf+totlen, xprt_str);
		totlen += len;
	}
	spin_unlock_bh(&serv->sv_lock);
	return totlen;
}
EXPORT_SYMBOL_GPL(svc_xprt_names);
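
For orientation, the registration API at the top of this file (svc_reg_xprt_class / svc_unreg_xprt_class) would be exercised by a transport provider roughly as follows. This is a minimal, hypothetical sketch, not part of the file above: the class name "example", the empty ops table, and the payload size are illustrative assumptions, and a real provider must also supply working xpo_create/xpo_accept/xpo_recvfrom/xpo_sendto (and the other xpo_* callbacks used in svc_xprt.c) and call svc_xprt_init() on each transport it creates.

/* Hypothetical provider sketch: register a transport class with the
 * svc_xprt layer at module load, remove it at unload. */
#include <linux/module.h>
#include <linux/sunrpc/svc_xprt.h>

static struct svc_xprt_ops example_xprt_ops = {
	/* A real provider fills in .xpo_create, .xpo_accept, .xpo_recvfrom,
	 * .xpo_sendto, .xpo_release_rqst, .xpo_has_wspace, .xpo_detach,
	 * .xpo_free here. */
};

static struct svc_xprt_class example_xprt_class = {
	.xcl_name        = "example",	/* matched by svc_create_xprt()/svc_find_xprt() */
	.xcl_owner       = THIS_MODULE,	/* pinned via try_module_get() in svc_create_xprt() */
	.xcl_ops         = &example_xprt_ops,
	.xcl_max_payload = 4096,	/* illustrative; reported by svc_print_xprts() */
};

static int __init example_init(void)
{
	/* Returns -EEXIST if a class with this name is already registered. */
	return svc_reg_xprt_class(&example_xprt_class);
}

static void __exit example_exit(void)
{
	svc_unreg_xprt_class(&example_xprt_class);
}

module_init(example_init);
module_exit(example_exit);
MODULE_LICENSE("GPL");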