Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

at v2.6.19 1441 lines 38 kB view raw
1/* 2 * linux/net/sunrpc/xprtsock.c 3 * 4 * Client-side transport implementation for sockets. 5 * 6 * TCP callback races fixes (C) 1998 Red Hat Software <alan@redhat.com> 7 * TCP send fixes (C) 1998 Red Hat Software <alan@redhat.com> 8 * TCP NFS related read + write fixes 9 * (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie> 10 * 11 * Rewrite of larges part of the code in order to stabilize TCP stuff. 12 * Fix behaviour when socket buffer is full. 13 * (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no> 14 * 15 * IP socket transport implementation, (C) 2005 Chuck Lever <cel@netapp.com> 16 */ 17 18#include <linux/types.h> 19#include <linux/slab.h> 20#include <linux/capability.h> 21#include <linux/sched.h> 22#include <linux/pagemap.h> 23#include <linux/errno.h> 24#include <linux/socket.h> 25#include <linux/in.h> 26#include <linux/net.h> 27#include <linux/mm.h> 28#include <linux/udp.h> 29#include <linux/tcp.h> 30#include <linux/sunrpc/clnt.h> 31#include <linux/sunrpc/sched.h> 32#include <linux/file.h> 33 34#include <net/sock.h> 35#include <net/checksum.h> 36#include <net/udp.h> 37#include <net/tcp.h> 38 39/* 40 * xprtsock tunables 41 */ 42unsigned int xprt_udp_slot_table_entries = RPC_DEF_SLOT_TABLE; 43unsigned int xprt_tcp_slot_table_entries = RPC_DEF_SLOT_TABLE; 44 45unsigned int xprt_min_resvport = RPC_DEF_MIN_RESVPORT; 46unsigned int xprt_max_resvport = RPC_DEF_MAX_RESVPORT; 47 48/* 49 * How many times to try sending a request on a socket before waiting 50 * for the socket buffer to clear. 51 */ 52#define XS_SENDMSG_RETRY (10U) 53 54/* 55 * Time out for an RPC UDP socket connect. UDP socket connects are 56 * synchronous, but we set a timeout anyway in case of resource 57 * exhaustion on the local host. 58 */ 59#define XS_UDP_CONN_TO (5U * HZ) 60 61/* 62 * Wait duration for an RPC TCP connection to be established. Solaris 63 * NFS over TCP uses 60 seconds, for example, which is in line with how 64 * long a server takes to reboot. 65 */ 66#define XS_TCP_CONN_TO (60U * HZ) 67 68/* 69 * Wait duration for a reply from the RPC portmapper. 70 */ 71#define XS_BIND_TO (60U * HZ) 72 73/* 74 * Delay if a UDP socket connect error occurs. This is most likely some 75 * kind of resource problem on the local host. 76 */ 77#define XS_UDP_REEST_TO (2U * HZ) 78 79/* 80 * The reestablish timeout allows clients to delay for a bit before attempting 81 * to reconnect to a server that just dropped our connection. 82 * 83 * We implement an exponential backoff when trying to reestablish a TCP 84 * transport connection with the server. Some servers like to drop a TCP 85 * connection when they are overworked, so we start with a short timeout and 86 * increase over time if the server is down or not responding. 87 */ 88#define XS_TCP_INIT_REEST_TO (3U * HZ) 89#define XS_TCP_MAX_REEST_TO (5U * 60 * HZ) 90 91/* 92 * TCP idle timeout; client drops the transport socket if it is idle 93 * for this long. Note that we also timeout UDP sockets to prevent 94 * holding port numbers when there is no RPC traffic. 95 */ 96#define XS_IDLE_DISC_TO (5U * 60 * HZ) 97 98#ifdef RPC_DEBUG 99# undef RPC_DEBUG_DATA 100# define RPCDBG_FACILITY RPCDBG_TRANS 101#endif 102 103#ifdef RPC_DEBUG_DATA 104static void xs_pktdump(char *msg, u32 *packet, unsigned int count) 105{ 106 u8 *buf = (u8 *) packet; 107 int j; 108 109 dprintk("RPC: %s\n", msg); 110 for (j = 0; j < count && j < 128; j += 4) { 111 if (!(j & 31)) { 112 if (j) 113 dprintk("\n"); 114 dprintk("0x%04x ", j); 115 } 116 dprintk("%02x%02x%02x%02x ", 117 buf[j], buf[j+1], buf[j+2], buf[j+3]); 118 } 119 dprintk("\n"); 120} 121#else 122static inline void xs_pktdump(char *msg, u32 *packet, unsigned int count) 123{ 124 /* NOP */ 125} 126#endif 127 128static void xs_format_peer_addresses(struct rpc_xprt *xprt) 129{ 130 struct sockaddr_in *addr = (struct sockaddr_in *) &xprt->addr; 131 char *buf; 132 133 buf = kzalloc(20, GFP_KERNEL); 134 if (buf) { 135 snprintf(buf, 20, "%u.%u.%u.%u", 136 NIPQUAD(addr->sin_addr.s_addr)); 137 } 138 xprt->address_strings[RPC_DISPLAY_ADDR] = buf; 139 140 buf = kzalloc(8, GFP_KERNEL); 141 if (buf) { 142 snprintf(buf, 8, "%u", 143 ntohs(addr->sin_port)); 144 } 145 xprt->address_strings[RPC_DISPLAY_PORT] = buf; 146 147 if (xprt->prot == IPPROTO_UDP) 148 xprt->address_strings[RPC_DISPLAY_PROTO] = "udp"; 149 else 150 xprt->address_strings[RPC_DISPLAY_PROTO] = "tcp"; 151 152 buf = kzalloc(48, GFP_KERNEL); 153 if (buf) { 154 snprintf(buf, 48, "addr=%u.%u.%u.%u port=%u proto=%s", 155 NIPQUAD(addr->sin_addr.s_addr), 156 ntohs(addr->sin_port), 157 xprt->prot == IPPROTO_UDP ? "udp" : "tcp"); 158 } 159 xprt->address_strings[RPC_DISPLAY_ALL] = buf; 160} 161 162static void xs_free_peer_addresses(struct rpc_xprt *xprt) 163{ 164 kfree(xprt->address_strings[RPC_DISPLAY_ADDR]); 165 kfree(xprt->address_strings[RPC_DISPLAY_PORT]); 166 kfree(xprt->address_strings[RPC_DISPLAY_ALL]); 167} 168 169#define XS_SENDMSG_FLAGS (MSG_DONTWAIT | MSG_NOSIGNAL) 170 171static inline int xs_send_head(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base, unsigned int len) 172{ 173 struct kvec iov = { 174 .iov_base = xdr->head[0].iov_base + base, 175 .iov_len = len - base, 176 }; 177 struct msghdr msg = { 178 .msg_name = addr, 179 .msg_namelen = addrlen, 180 .msg_flags = XS_SENDMSG_FLAGS, 181 }; 182 183 if (xdr->len > len) 184 msg.msg_flags |= MSG_MORE; 185 186 if (likely(iov.iov_len)) 187 return kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len); 188 return kernel_sendmsg(sock, &msg, NULL, 0, 0); 189} 190 191static int xs_send_tail(struct socket *sock, struct xdr_buf *xdr, unsigned int base, unsigned int len) 192{ 193 struct kvec iov = { 194 .iov_base = xdr->tail[0].iov_base + base, 195 .iov_len = len - base, 196 }; 197 struct msghdr msg = { 198 .msg_flags = XS_SENDMSG_FLAGS, 199 }; 200 201 return kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len); 202} 203 204/** 205 * xs_sendpages - write pages directly to a socket 206 * @sock: socket to send on 207 * @addr: UDP only -- address of destination 208 * @addrlen: UDP only -- length of destination address 209 * @xdr: buffer containing this request 210 * @base: starting position in the buffer 211 * 212 */ 213static inline int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base) 214{ 215 struct page **ppage = xdr->pages; 216 unsigned int len, pglen = xdr->page_len; 217 int err, ret = 0; 218 219 if (unlikely(!sock)) 220 return -ENOTCONN; 221 222 clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags); 223 224 len = xdr->head[0].iov_len; 225 if (base < len || (addr != NULL && base == 0)) { 226 err = xs_send_head(sock, addr, addrlen, xdr, base, len); 227 if (ret == 0) 228 ret = err; 229 else if (err > 0) 230 ret += err; 231 if (err != (len - base)) 232 goto out; 233 base = 0; 234 } else 235 base -= len; 236 237 if (unlikely(pglen == 0)) 238 goto copy_tail; 239 if (unlikely(base >= pglen)) { 240 base -= pglen; 241 goto copy_tail; 242 } 243 if (base || xdr->page_base) { 244 pglen -= base; 245 base += xdr->page_base; 246 ppage += base >> PAGE_CACHE_SHIFT; 247 base &= ~PAGE_CACHE_MASK; 248 } 249 250 do { 251 int flags = XS_SENDMSG_FLAGS; 252 253 len = PAGE_CACHE_SIZE; 254 if (base) 255 len -= base; 256 if (pglen < len) 257 len = pglen; 258 259 if (pglen != len || xdr->tail[0].iov_len != 0) 260 flags |= MSG_MORE; 261 262 err = kernel_sendpage(sock, *ppage, base, len, flags); 263 if (ret == 0) 264 ret = err; 265 else if (err > 0) 266 ret += err; 267 if (err != len) 268 goto out; 269 base = 0; 270 ppage++; 271 } while ((pglen -= len) != 0); 272copy_tail: 273 len = xdr->tail[0].iov_len; 274 if (base < len) { 275 err = xs_send_tail(sock, xdr, base, len); 276 if (ret == 0) 277 ret = err; 278 else if (err > 0) 279 ret += err; 280 } 281out: 282 return ret; 283} 284 285/** 286 * xs_nospace - place task on wait queue if transmit was incomplete 287 * @task: task to put to sleep 288 * 289 */ 290static void xs_nospace(struct rpc_task *task) 291{ 292 struct rpc_rqst *req = task->tk_rqstp; 293 struct rpc_xprt *xprt = req->rq_xprt; 294 295 dprintk("RPC: %4d xmit incomplete (%u left of %u)\n", 296 task->tk_pid, req->rq_slen - req->rq_bytes_sent, 297 req->rq_slen); 298 299 if (test_bit(SOCK_ASYNC_NOSPACE, &xprt->sock->flags)) { 300 /* Protect against races with write_space */ 301 spin_lock_bh(&xprt->transport_lock); 302 303 /* Don't race with disconnect */ 304 if (!xprt_connected(xprt)) 305 task->tk_status = -ENOTCONN; 306 else if (test_bit(SOCK_NOSPACE, &xprt->sock->flags)) 307 xprt_wait_for_buffer_space(task); 308 309 spin_unlock_bh(&xprt->transport_lock); 310 } else 311 /* Keep holding the socket if it is blocked */ 312 rpc_delay(task, HZ>>4); 313} 314 315/** 316 * xs_udp_send_request - write an RPC request to a UDP socket 317 * @task: address of RPC task that manages the state of an RPC request 318 * 319 * Return values: 320 * 0: The request has been sent 321 * EAGAIN: The socket was blocked, please call again later to 322 * complete the request 323 * ENOTCONN: Caller needs to invoke connect logic then call again 324 * other: Some other error occured, the request was not sent 325 */ 326static int xs_udp_send_request(struct rpc_task *task) 327{ 328 struct rpc_rqst *req = task->tk_rqstp; 329 struct rpc_xprt *xprt = req->rq_xprt; 330 struct xdr_buf *xdr = &req->rq_snd_buf; 331 int status; 332 333 xs_pktdump("packet data:", 334 req->rq_svec->iov_base, 335 req->rq_svec->iov_len); 336 337 req->rq_xtime = jiffies; 338 status = xs_sendpages(xprt->sock, (struct sockaddr *) &xprt->addr, 339 xprt->addrlen, xdr, req->rq_bytes_sent); 340 341 dprintk("RPC: xs_udp_send_request(%u) = %d\n", 342 xdr->len - req->rq_bytes_sent, status); 343 344 if (likely(status >= (int) req->rq_slen)) 345 return 0; 346 347 /* Still some bytes left; set up for a retry later. */ 348 if (status > 0) 349 status = -EAGAIN; 350 351 switch (status) { 352 case -ENETUNREACH: 353 case -EPIPE: 354 case -ECONNREFUSED: 355 /* When the server has died, an ICMP port unreachable message 356 * prompts ECONNREFUSED. */ 357 break; 358 case -EAGAIN: 359 xs_nospace(task); 360 break; 361 default: 362 dprintk("RPC: sendmsg returned unrecognized error %d\n", 363 -status); 364 break; 365 } 366 367 return status; 368} 369 370static inline void xs_encode_tcp_record_marker(struct xdr_buf *buf) 371{ 372 u32 reclen = buf->len - sizeof(rpc_fraghdr); 373 rpc_fraghdr *base = buf->head[0].iov_base; 374 *base = htonl(RPC_LAST_STREAM_FRAGMENT | reclen); 375} 376 377/** 378 * xs_tcp_send_request - write an RPC request to a TCP socket 379 * @task: address of RPC task that manages the state of an RPC request 380 * 381 * Return values: 382 * 0: The request has been sent 383 * EAGAIN: The socket was blocked, please call again later to 384 * complete the request 385 * ENOTCONN: Caller needs to invoke connect logic then call again 386 * other: Some other error occured, the request was not sent 387 * 388 * XXX: In the case of soft timeouts, should we eventually give up 389 * if sendmsg is not able to make progress? 390 */ 391static int xs_tcp_send_request(struct rpc_task *task) 392{ 393 struct rpc_rqst *req = task->tk_rqstp; 394 struct rpc_xprt *xprt = req->rq_xprt; 395 struct xdr_buf *xdr = &req->rq_snd_buf; 396 int status, retry = 0; 397 398 xs_encode_tcp_record_marker(&req->rq_snd_buf); 399 400 xs_pktdump("packet data:", 401 req->rq_svec->iov_base, 402 req->rq_svec->iov_len); 403 404 /* Continue transmitting the packet/record. We must be careful 405 * to cope with writespace callbacks arriving _after_ we have 406 * called sendmsg(). */ 407 while (1) { 408 req->rq_xtime = jiffies; 409 status = xs_sendpages(xprt->sock, NULL, 0, xdr, 410 req->rq_bytes_sent); 411 412 dprintk("RPC: xs_tcp_send_request(%u) = %d\n", 413 xdr->len - req->rq_bytes_sent, status); 414 415 if (unlikely(status < 0)) 416 break; 417 418 /* If we've sent the entire packet, immediately 419 * reset the count of bytes sent. */ 420 req->rq_bytes_sent += status; 421 task->tk_bytes_sent += status; 422 if (likely(req->rq_bytes_sent >= req->rq_slen)) { 423 req->rq_bytes_sent = 0; 424 return 0; 425 } 426 427 status = -EAGAIN; 428 if (retry++ > XS_SENDMSG_RETRY) 429 break; 430 } 431 432 switch (status) { 433 case -EAGAIN: 434 xs_nospace(task); 435 break; 436 case -ECONNREFUSED: 437 case -ECONNRESET: 438 case -ENOTCONN: 439 case -EPIPE: 440 status = -ENOTCONN; 441 break; 442 default: 443 dprintk("RPC: sendmsg returned unrecognized error %d\n", 444 -status); 445 xprt_disconnect(xprt); 446 break; 447 } 448 449 return status; 450} 451 452/** 453 * xs_tcp_release_xprt - clean up after a tcp transmission 454 * @xprt: transport 455 * @task: rpc task 456 * 457 * This cleans up if an error causes us to abort the transmission of a request. 458 * In this case, the socket may need to be reset in order to avoid confusing 459 * the server. 460 */ 461static void xs_tcp_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task) 462{ 463 struct rpc_rqst *req; 464 465 if (task != xprt->snd_task) 466 return; 467 if (task == NULL) 468 goto out_release; 469 req = task->tk_rqstp; 470 if (req->rq_bytes_sent == 0) 471 goto out_release; 472 if (req->rq_bytes_sent == req->rq_snd_buf.len) 473 goto out_release; 474 set_bit(XPRT_CLOSE_WAIT, &task->tk_xprt->state); 475out_release: 476 xprt_release_xprt(xprt, task); 477} 478 479/** 480 * xs_close - close a socket 481 * @xprt: transport 482 * 483 * This is used when all requests are complete; ie, no DRC state remains 484 * on the server we want to save. 485 */ 486static void xs_close(struct rpc_xprt *xprt) 487{ 488 struct socket *sock = xprt->sock; 489 struct sock *sk = xprt->inet; 490 491 if (!sk) 492 goto clear_close_wait; 493 494 dprintk("RPC: xs_close xprt %p\n", xprt); 495 496 write_lock_bh(&sk->sk_callback_lock); 497 xprt->inet = NULL; 498 xprt->sock = NULL; 499 500 sk->sk_user_data = NULL; 501 sk->sk_data_ready = xprt->old_data_ready; 502 sk->sk_state_change = xprt->old_state_change; 503 sk->sk_write_space = xprt->old_write_space; 504 write_unlock_bh(&sk->sk_callback_lock); 505 506 sk->sk_no_check = 0; 507 508 sock_release(sock); 509clear_close_wait: 510 smp_mb__before_clear_bit(); 511 clear_bit(XPRT_CLOSE_WAIT, &xprt->state); 512 smp_mb__after_clear_bit(); 513} 514 515/** 516 * xs_destroy - prepare to shutdown a transport 517 * @xprt: doomed transport 518 * 519 */ 520static void xs_destroy(struct rpc_xprt *xprt) 521{ 522 dprintk("RPC: xs_destroy xprt %p\n", xprt); 523 524 cancel_delayed_work(&xprt->connect_worker); 525 flush_scheduled_work(); 526 527 xprt_disconnect(xprt); 528 xs_close(xprt); 529 xs_free_peer_addresses(xprt); 530 kfree(xprt->slot); 531} 532 533static inline struct rpc_xprt *xprt_from_sock(struct sock *sk) 534{ 535 return (struct rpc_xprt *) sk->sk_user_data; 536} 537 538/** 539 * xs_udp_data_ready - "data ready" callback for UDP sockets 540 * @sk: socket with data to read 541 * @len: how much data to read 542 * 543 */ 544static void xs_udp_data_ready(struct sock *sk, int len) 545{ 546 struct rpc_task *task; 547 struct rpc_xprt *xprt; 548 struct rpc_rqst *rovr; 549 struct sk_buff *skb; 550 int err, repsize, copied; 551 u32 _xid; 552 __be32 *xp; 553 554 read_lock(&sk->sk_callback_lock); 555 dprintk("RPC: xs_udp_data_ready...\n"); 556 if (!(xprt = xprt_from_sock(sk))) 557 goto out; 558 559 if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL) 560 goto out; 561 562 if (xprt->shutdown) 563 goto dropit; 564 565 repsize = skb->len - sizeof(struct udphdr); 566 if (repsize < 4) { 567 dprintk("RPC: impossible RPC reply size %d!\n", repsize); 568 goto dropit; 569 } 570 571 /* Copy the XID from the skb... */ 572 xp = skb_header_pointer(skb, sizeof(struct udphdr), 573 sizeof(_xid), &_xid); 574 if (xp == NULL) 575 goto dropit; 576 577 /* Look up and lock the request corresponding to the given XID */ 578 spin_lock(&xprt->transport_lock); 579 rovr = xprt_lookup_rqst(xprt, *xp); 580 if (!rovr) 581 goto out_unlock; 582 task = rovr->rq_task; 583 584 if ((copied = rovr->rq_private_buf.buflen) > repsize) 585 copied = repsize; 586 587 /* Suck it into the iovec, verify checksum if not done by hw. */ 588 if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb)) 589 goto out_unlock; 590 591 /* Something worked... */ 592 dst_confirm(skb->dst); 593 594 xprt_adjust_cwnd(task, copied); 595 xprt_update_rtt(task); 596 xprt_complete_rqst(task, copied); 597 598 out_unlock: 599 spin_unlock(&xprt->transport_lock); 600 dropit: 601 skb_free_datagram(sk, skb); 602 out: 603 read_unlock(&sk->sk_callback_lock); 604} 605 606static inline size_t xs_tcp_copy_data(skb_reader_t *desc, void *p, size_t len) 607{ 608 if (len > desc->count) 609 len = desc->count; 610 if (skb_copy_bits(desc->skb, desc->offset, p, len)) { 611 dprintk("RPC: failed to copy %zu bytes from skb. %zu bytes remain\n", 612 len, desc->count); 613 return 0; 614 } 615 desc->offset += len; 616 desc->count -= len; 617 dprintk("RPC: copied %zu bytes from skb. %zu bytes remain\n", 618 len, desc->count); 619 return len; 620} 621 622static inline void xs_tcp_read_fraghdr(struct rpc_xprt *xprt, skb_reader_t *desc) 623{ 624 size_t len, used; 625 char *p; 626 627 p = ((char *) &xprt->tcp_recm) + xprt->tcp_offset; 628 len = sizeof(xprt->tcp_recm) - xprt->tcp_offset; 629 used = xs_tcp_copy_data(desc, p, len); 630 xprt->tcp_offset += used; 631 if (used != len) 632 return; 633 634 xprt->tcp_reclen = ntohl(xprt->tcp_recm); 635 if (xprt->tcp_reclen & RPC_LAST_STREAM_FRAGMENT) 636 xprt->tcp_flags |= XPRT_LAST_FRAG; 637 else 638 xprt->tcp_flags &= ~XPRT_LAST_FRAG; 639 xprt->tcp_reclen &= RPC_FRAGMENT_SIZE_MASK; 640 641 xprt->tcp_flags &= ~XPRT_COPY_RECM; 642 xprt->tcp_offset = 0; 643 644 /* Sanity check of the record length */ 645 if (unlikely(xprt->tcp_reclen < 4)) { 646 dprintk("RPC: invalid TCP record fragment length\n"); 647 xprt_disconnect(xprt); 648 return; 649 } 650 dprintk("RPC: reading TCP record fragment of length %d\n", 651 xprt->tcp_reclen); 652} 653 654static void xs_tcp_check_recm(struct rpc_xprt *xprt) 655{ 656 dprintk("RPC: xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u, tcp_flags = %lx\n", 657 xprt, xprt->tcp_copied, xprt->tcp_offset, xprt->tcp_reclen, xprt->tcp_flags); 658 if (xprt->tcp_offset == xprt->tcp_reclen) { 659 xprt->tcp_flags |= XPRT_COPY_RECM; 660 xprt->tcp_offset = 0; 661 if (xprt->tcp_flags & XPRT_LAST_FRAG) { 662 xprt->tcp_flags &= ~XPRT_COPY_DATA; 663 xprt->tcp_flags |= XPRT_COPY_XID; 664 xprt->tcp_copied = 0; 665 } 666 } 667} 668 669static inline void xs_tcp_read_xid(struct rpc_xprt *xprt, skb_reader_t *desc) 670{ 671 size_t len, used; 672 char *p; 673 674 len = sizeof(xprt->tcp_xid) - xprt->tcp_offset; 675 dprintk("RPC: reading XID (%Zu bytes)\n", len); 676 p = ((char *) &xprt->tcp_xid) + xprt->tcp_offset; 677 used = xs_tcp_copy_data(desc, p, len); 678 xprt->tcp_offset += used; 679 if (used != len) 680 return; 681 xprt->tcp_flags &= ~XPRT_COPY_XID; 682 xprt->tcp_flags |= XPRT_COPY_DATA; 683 xprt->tcp_copied = 4; 684 dprintk("RPC: reading reply for XID %08x\n", 685 ntohl(xprt->tcp_xid)); 686 xs_tcp_check_recm(xprt); 687} 688 689static inline void xs_tcp_read_request(struct rpc_xprt *xprt, skb_reader_t *desc) 690{ 691 struct rpc_rqst *req; 692 struct xdr_buf *rcvbuf; 693 size_t len; 694 ssize_t r; 695 696 /* Find and lock the request corresponding to this xid */ 697 spin_lock(&xprt->transport_lock); 698 req = xprt_lookup_rqst(xprt, xprt->tcp_xid); 699 if (!req) { 700 xprt->tcp_flags &= ~XPRT_COPY_DATA; 701 dprintk("RPC: XID %08x request not found!\n", 702 ntohl(xprt->tcp_xid)); 703 spin_unlock(&xprt->transport_lock); 704 return; 705 } 706 707 rcvbuf = &req->rq_private_buf; 708 len = desc->count; 709 if (len > xprt->tcp_reclen - xprt->tcp_offset) { 710 skb_reader_t my_desc; 711 712 len = xprt->tcp_reclen - xprt->tcp_offset; 713 memcpy(&my_desc, desc, sizeof(my_desc)); 714 my_desc.count = len; 715 r = xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied, 716 &my_desc, xs_tcp_copy_data); 717 desc->count -= r; 718 desc->offset += r; 719 } else 720 r = xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied, 721 desc, xs_tcp_copy_data); 722 723 if (r > 0) { 724 xprt->tcp_copied += r; 725 xprt->tcp_offset += r; 726 } 727 if (r != len) { 728 /* Error when copying to the receive buffer, 729 * usually because we weren't able to allocate 730 * additional buffer pages. All we can do now 731 * is turn off XPRT_COPY_DATA, so the request 732 * will not receive any additional updates, 733 * and time out. 734 * Any remaining data from this record will 735 * be discarded. 736 */ 737 xprt->tcp_flags &= ~XPRT_COPY_DATA; 738 dprintk("RPC: XID %08x truncated request\n", 739 ntohl(xprt->tcp_xid)); 740 dprintk("RPC: xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u\n", 741 xprt, xprt->tcp_copied, xprt->tcp_offset, xprt->tcp_reclen); 742 goto out; 743 } 744 745 dprintk("RPC: XID %08x read %Zd bytes\n", 746 ntohl(xprt->tcp_xid), r); 747 dprintk("RPC: xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u\n", 748 xprt, xprt->tcp_copied, xprt->tcp_offset, xprt->tcp_reclen); 749 750 if (xprt->tcp_copied == req->rq_private_buf.buflen) 751 xprt->tcp_flags &= ~XPRT_COPY_DATA; 752 else if (xprt->tcp_offset == xprt->tcp_reclen) { 753 if (xprt->tcp_flags & XPRT_LAST_FRAG) 754 xprt->tcp_flags &= ~XPRT_COPY_DATA; 755 } 756 757out: 758 if (!(xprt->tcp_flags & XPRT_COPY_DATA)) 759 xprt_complete_rqst(req->rq_task, xprt->tcp_copied); 760 spin_unlock(&xprt->transport_lock); 761 xs_tcp_check_recm(xprt); 762} 763 764static inline void xs_tcp_read_discard(struct rpc_xprt *xprt, skb_reader_t *desc) 765{ 766 size_t len; 767 768 len = xprt->tcp_reclen - xprt->tcp_offset; 769 if (len > desc->count) 770 len = desc->count; 771 desc->count -= len; 772 desc->offset += len; 773 xprt->tcp_offset += len; 774 dprintk("RPC: discarded %Zu bytes\n", len); 775 xs_tcp_check_recm(xprt); 776} 777 778static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, unsigned int offset, size_t len) 779{ 780 struct rpc_xprt *xprt = rd_desc->arg.data; 781 skb_reader_t desc = { 782 .skb = skb, 783 .offset = offset, 784 .count = len, 785 .csum = 0 786 }; 787 788 dprintk("RPC: xs_tcp_data_recv started\n"); 789 do { 790 /* Read in a new fragment marker if necessary */ 791 /* Can we ever really expect to get completely empty fragments? */ 792 if (xprt->tcp_flags & XPRT_COPY_RECM) { 793 xs_tcp_read_fraghdr(xprt, &desc); 794 continue; 795 } 796 /* Read in the xid if necessary */ 797 if (xprt->tcp_flags & XPRT_COPY_XID) { 798 xs_tcp_read_xid(xprt, &desc); 799 continue; 800 } 801 /* Read in the request data */ 802 if (xprt->tcp_flags & XPRT_COPY_DATA) { 803 xs_tcp_read_request(xprt, &desc); 804 continue; 805 } 806 /* Skip over any trailing bytes on short reads */ 807 xs_tcp_read_discard(xprt, &desc); 808 } while (desc.count); 809 dprintk("RPC: xs_tcp_data_recv done\n"); 810 return len - desc.count; 811} 812 813/** 814 * xs_tcp_data_ready - "data ready" callback for TCP sockets 815 * @sk: socket with data to read 816 * @bytes: how much data to read 817 * 818 */ 819static void xs_tcp_data_ready(struct sock *sk, int bytes) 820{ 821 struct rpc_xprt *xprt; 822 read_descriptor_t rd_desc; 823 824 read_lock(&sk->sk_callback_lock); 825 dprintk("RPC: xs_tcp_data_ready...\n"); 826 if (!(xprt = xprt_from_sock(sk))) 827 goto out; 828 if (xprt->shutdown) 829 goto out; 830 831 /* We use rd_desc to pass struct xprt to xs_tcp_data_recv */ 832 rd_desc.arg.data = xprt; 833 rd_desc.count = 65536; 834 tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv); 835out: 836 read_unlock(&sk->sk_callback_lock); 837} 838 839/** 840 * xs_tcp_state_change - callback to handle TCP socket state changes 841 * @sk: socket whose state has changed 842 * 843 */ 844static void xs_tcp_state_change(struct sock *sk) 845{ 846 struct rpc_xprt *xprt; 847 848 read_lock(&sk->sk_callback_lock); 849 if (!(xprt = xprt_from_sock(sk))) 850 goto out; 851 dprintk("RPC: xs_tcp_state_change client %p...\n", xprt); 852 dprintk("RPC: state %x conn %d dead %d zapped %d\n", 853 sk->sk_state, xprt_connected(xprt), 854 sock_flag(sk, SOCK_DEAD), 855 sock_flag(sk, SOCK_ZAPPED)); 856 857 switch (sk->sk_state) { 858 case TCP_ESTABLISHED: 859 spin_lock_bh(&xprt->transport_lock); 860 if (!xprt_test_and_set_connected(xprt)) { 861 /* Reset TCP record info */ 862 xprt->tcp_offset = 0; 863 xprt->tcp_reclen = 0; 864 xprt->tcp_copied = 0; 865 xprt->tcp_flags = XPRT_COPY_RECM | XPRT_COPY_XID; 866 xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO; 867 xprt_wake_pending_tasks(xprt, 0); 868 } 869 spin_unlock_bh(&xprt->transport_lock); 870 break; 871 case TCP_SYN_SENT: 872 case TCP_SYN_RECV: 873 break; 874 case TCP_CLOSE_WAIT: 875 /* Try to schedule an autoclose RPC calls */ 876 set_bit(XPRT_CLOSE_WAIT, &xprt->state); 877 if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0) 878 schedule_work(&xprt->task_cleanup); 879 default: 880 xprt_disconnect(xprt); 881 } 882 out: 883 read_unlock(&sk->sk_callback_lock); 884} 885 886/** 887 * xs_udp_write_space - callback invoked when socket buffer space 888 * becomes available 889 * @sk: socket whose state has changed 890 * 891 * Called when more output buffer space is available for this socket. 892 * We try not to wake our writers until they can make "significant" 893 * progress, otherwise we'll waste resources thrashing kernel_sendmsg 894 * with a bunch of small requests. 895 */ 896static void xs_udp_write_space(struct sock *sk) 897{ 898 read_lock(&sk->sk_callback_lock); 899 900 /* from net/core/sock.c:sock_def_write_space */ 901 if (sock_writeable(sk)) { 902 struct socket *sock; 903 struct rpc_xprt *xprt; 904 905 if (unlikely(!(sock = sk->sk_socket))) 906 goto out; 907 if (unlikely(!(xprt = xprt_from_sock(sk)))) 908 goto out; 909 if (unlikely(!test_and_clear_bit(SOCK_NOSPACE, &sock->flags))) 910 goto out; 911 912 xprt_write_space(xprt); 913 } 914 915 out: 916 read_unlock(&sk->sk_callback_lock); 917} 918 919/** 920 * xs_tcp_write_space - callback invoked when socket buffer space 921 * becomes available 922 * @sk: socket whose state has changed 923 * 924 * Called when more output buffer space is available for this socket. 925 * We try not to wake our writers until they can make "significant" 926 * progress, otherwise we'll waste resources thrashing kernel_sendmsg 927 * with a bunch of small requests. 928 */ 929static void xs_tcp_write_space(struct sock *sk) 930{ 931 read_lock(&sk->sk_callback_lock); 932 933 /* from net/core/stream.c:sk_stream_write_space */ 934 if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) { 935 struct socket *sock; 936 struct rpc_xprt *xprt; 937 938 if (unlikely(!(sock = sk->sk_socket))) 939 goto out; 940 if (unlikely(!(xprt = xprt_from_sock(sk)))) 941 goto out; 942 if (unlikely(!test_and_clear_bit(SOCK_NOSPACE, &sock->flags))) 943 goto out; 944 945 xprt_write_space(xprt); 946 } 947 948 out: 949 read_unlock(&sk->sk_callback_lock); 950} 951 952static void xs_udp_do_set_buffer_size(struct rpc_xprt *xprt) 953{ 954 struct sock *sk = xprt->inet; 955 956 if (xprt->rcvsize) { 957 sk->sk_userlocks |= SOCK_RCVBUF_LOCK; 958 sk->sk_rcvbuf = xprt->rcvsize * xprt->max_reqs * 2; 959 } 960 if (xprt->sndsize) { 961 sk->sk_userlocks |= SOCK_SNDBUF_LOCK; 962 sk->sk_sndbuf = xprt->sndsize * xprt->max_reqs * 2; 963 sk->sk_write_space(sk); 964 } 965} 966 967/** 968 * xs_udp_set_buffer_size - set send and receive limits 969 * @xprt: generic transport 970 * @sndsize: requested size of send buffer, in bytes 971 * @rcvsize: requested size of receive buffer, in bytes 972 * 973 * Set socket send and receive buffer size limits. 974 */ 975static void xs_udp_set_buffer_size(struct rpc_xprt *xprt, size_t sndsize, size_t rcvsize) 976{ 977 xprt->sndsize = 0; 978 if (sndsize) 979 xprt->sndsize = sndsize + 1024; 980 xprt->rcvsize = 0; 981 if (rcvsize) 982 xprt->rcvsize = rcvsize + 1024; 983 984 xs_udp_do_set_buffer_size(xprt); 985} 986 987/** 988 * xs_udp_timer - called when a retransmit timeout occurs on a UDP transport 989 * @task: task that timed out 990 * 991 * Adjust the congestion window after a retransmit timeout has occurred. 992 */ 993static void xs_udp_timer(struct rpc_task *task) 994{ 995 xprt_adjust_cwnd(task, -ETIMEDOUT); 996} 997 998static unsigned short xs_get_random_port(void) 999{ 1000 unsigned short range = xprt_max_resvport - xprt_min_resvport; 1001 unsigned short rand = (unsigned short) net_random() % range; 1002 return rand + xprt_min_resvport; 1003} 1004 1005/** 1006 * xs_print_peer_address - format an IPv4 address for printing 1007 * @xprt: generic transport 1008 * @format: flags field indicating which parts of the address to render 1009 */ 1010static char *xs_print_peer_address(struct rpc_xprt *xprt, enum rpc_display_format_t format) 1011{ 1012 if (xprt->address_strings[format] != NULL) 1013 return xprt->address_strings[format]; 1014 else 1015 return "unprintable"; 1016} 1017 1018/** 1019 * xs_set_port - reset the port number in the remote endpoint address 1020 * @xprt: generic transport 1021 * @port: new port number 1022 * 1023 */ 1024static void xs_set_port(struct rpc_xprt *xprt, unsigned short port) 1025{ 1026 struct sockaddr_in *sap = (struct sockaddr_in *) &xprt->addr; 1027 1028 dprintk("RPC: setting port for xprt %p to %u\n", xprt, port); 1029 1030 sap->sin_port = htons(port); 1031} 1032 1033static int xs_bindresvport(struct rpc_xprt *xprt, struct socket *sock) 1034{ 1035 struct sockaddr_in myaddr = { 1036 .sin_family = AF_INET, 1037 }; 1038 int err; 1039 unsigned short port = xprt->port; 1040 1041 do { 1042 myaddr.sin_port = htons(port); 1043 err = kernel_bind(sock, (struct sockaddr *) &myaddr, 1044 sizeof(myaddr)); 1045 if (err == 0) { 1046 xprt->port = port; 1047 dprintk("RPC: xs_bindresvport bound to port %u\n", 1048 port); 1049 return 0; 1050 } 1051 if (port <= xprt_min_resvport) 1052 port = xprt_max_resvport; 1053 else 1054 port--; 1055 } while (err == -EADDRINUSE && port != xprt->port); 1056 1057 dprintk("RPC: can't bind to reserved port (%d).\n", -err); 1058 return err; 1059} 1060 1061/** 1062 * xs_udp_connect_worker - set up a UDP socket 1063 * @args: RPC transport to connect 1064 * 1065 * Invoked by a work queue tasklet. 1066 */ 1067static void xs_udp_connect_worker(void *args) 1068{ 1069 struct rpc_xprt *xprt = (struct rpc_xprt *) args; 1070 struct socket *sock = xprt->sock; 1071 int err, status = -EIO; 1072 1073 if (xprt->shutdown || !xprt_bound(xprt)) 1074 goto out; 1075 1076 /* Start by resetting any existing state */ 1077 xs_close(xprt); 1078 1079 if ((err = sock_create_kern(PF_INET, SOCK_DGRAM, IPPROTO_UDP, &sock)) < 0) { 1080 dprintk("RPC: can't create UDP transport socket (%d).\n", -err); 1081 goto out; 1082 } 1083 1084 if (xprt->resvport && xs_bindresvport(xprt, sock) < 0) { 1085 sock_release(sock); 1086 goto out; 1087 } 1088 1089 dprintk("RPC: worker connecting xprt %p to address: %s\n", 1090 xprt, xs_print_peer_address(xprt, RPC_DISPLAY_ALL)); 1091 1092 if (!xprt->inet) { 1093 struct sock *sk = sock->sk; 1094 1095 write_lock_bh(&sk->sk_callback_lock); 1096 1097 sk->sk_user_data = xprt; 1098 xprt->old_data_ready = sk->sk_data_ready; 1099 xprt->old_state_change = sk->sk_state_change; 1100 xprt->old_write_space = sk->sk_write_space; 1101 sk->sk_data_ready = xs_udp_data_ready; 1102 sk->sk_write_space = xs_udp_write_space; 1103 sk->sk_no_check = UDP_CSUM_NORCV; 1104 sk->sk_allocation = GFP_ATOMIC; 1105 1106 xprt_set_connected(xprt); 1107 1108 /* Reset to new socket */ 1109 xprt->sock = sock; 1110 xprt->inet = sk; 1111 1112 write_unlock_bh(&sk->sk_callback_lock); 1113 } 1114 xs_udp_do_set_buffer_size(xprt); 1115 status = 0; 1116out: 1117 xprt_wake_pending_tasks(xprt, status); 1118 xprt_clear_connecting(xprt); 1119} 1120 1121/* 1122 * We need to preserve the port number so the reply cache on the server can 1123 * find our cached RPC replies when we get around to reconnecting. 1124 */ 1125static void xs_tcp_reuse_connection(struct rpc_xprt *xprt) 1126{ 1127 int result; 1128 struct socket *sock = xprt->sock; 1129 struct sockaddr any; 1130 1131 dprintk("RPC: disconnecting xprt %p to reuse port\n", xprt); 1132 1133 /* 1134 * Disconnect the transport socket by doing a connect operation 1135 * with AF_UNSPEC. This should return immediately... 1136 */ 1137 memset(&any, 0, sizeof(any)); 1138 any.sa_family = AF_UNSPEC; 1139 result = kernel_connect(sock, &any, sizeof(any), 0); 1140 if (result) 1141 dprintk("RPC: AF_UNSPEC connect return code %d\n", 1142 result); 1143} 1144 1145/** 1146 * xs_tcp_connect_worker - connect a TCP socket to a remote endpoint 1147 * @args: RPC transport to connect 1148 * 1149 * Invoked by a work queue tasklet. 1150 */ 1151static void xs_tcp_connect_worker(void *args) 1152{ 1153 struct rpc_xprt *xprt = (struct rpc_xprt *)args; 1154 struct socket *sock = xprt->sock; 1155 int err, status = -EIO; 1156 1157 if (xprt->shutdown || !xprt_bound(xprt)) 1158 goto out; 1159 1160 if (!xprt->sock) { 1161 /* start from scratch */ 1162 if ((err = sock_create_kern(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock)) < 0) { 1163 dprintk("RPC: can't create TCP transport socket (%d).\n", -err); 1164 goto out; 1165 } 1166 1167 if (xprt->resvport && xs_bindresvport(xprt, sock) < 0) { 1168 sock_release(sock); 1169 goto out; 1170 } 1171 } else 1172 /* "close" the socket, preserving the local port */ 1173 xs_tcp_reuse_connection(xprt); 1174 1175 dprintk("RPC: worker connecting xprt %p to address: %s\n", 1176 xprt, xs_print_peer_address(xprt, RPC_DISPLAY_ALL)); 1177 1178 if (!xprt->inet) { 1179 struct sock *sk = sock->sk; 1180 1181 write_lock_bh(&sk->sk_callback_lock); 1182 1183 sk->sk_user_data = xprt; 1184 xprt->old_data_ready = sk->sk_data_ready; 1185 xprt->old_state_change = sk->sk_state_change; 1186 xprt->old_write_space = sk->sk_write_space; 1187 sk->sk_data_ready = xs_tcp_data_ready; 1188 sk->sk_state_change = xs_tcp_state_change; 1189 sk->sk_write_space = xs_tcp_write_space; 1190 sk->sk_allocation = GFP_ATOMIC; 1191 1192 /* socket options */ 1193 sk->sk_userlocks |= SOCK_BINDPORT_LOCK; 1194 sock_reset_flag(sk, SOCK_LINGER); 1195 tcp_sk(sk)->linger2 = 0; 1196 tcp_sk(sk)->nonagle |= TCP_NAGLE_OFF; 1197 1198 xprt_clear_connected(xprt); 1199 1200 /* Reset to new socket */ 1201 xprt->sock = sock; 1202 xprt->inet = sk; 1203 1204 write_unlock_bh(&sk->sk_callback_lock); 1205 } 1206 1207 /* Tell the socket layer to start connecting... */ 1208 xprt->stat.connect_count++; 1209 xprt->stat.connect_start = jiffies; 1210 status = kernel_connect(sock, (struct sockaddr *) &xprt->addr, 1211 xprt->addrlen, O_NONBLOCK); 1212 dprintk("RPC: %p connect status %d connected %d sock state %d\n", 1213 xprt, -status, xprt_connected(xprt), sock->sk->sk_state); 1214 if (status < 0) { 1215 switch (status) { 1216 case -EINPROGRESS: 1217 case -EALREADY: 1218 goto out_clear; 1219 case -ECONNREFUSED: 1220 case -ECONNRESET: 1221 /* retry with existing socket, after a delay */ 1222 break; 1223 default: 1224 /* get rid of existing socket, and retry */ 1225 xs_close(xprt); 1226 break; 1227 } 1228 } 1229out: 1230 xprt_wake_pending_tasks(xprt, status); 1231out_clear: 1232 xprt_clear_connecting(xprt); 1233} 1234 1235/** 1236 * xs_connect - connect a socket to a remote endpoint 1237 * @task: address of RPC task that manages state of connect request 1238 * 1239 * TCP: If the remote end dropped the connection, delay reconnecting. 1240 * 1241 * UDP socket connects are synchronous, but we use a work queue anyway 1242 * to guarantee that even unprivileged user processes can set up a 1243 * socket on a privileged port. 1244 * 1245 * If a UDP socket connect fails, the delay behavior here prevents 1246 * retry floods (hard mounts). 1247 */ 1248static void xs_connect(struct rpc_task *task) 1249{ 1250 struct rpc_xprt *xprt = task->tk_xprt; 1251 1252 if (xprt_test_and_set_connecting(xprt)) 1253 return; 1254 1255 if (xprt->sock != NULL) { 1256 dprintk("RPC: xs_connect delayed xprt %p for %lu seconds\n", 1257 xprt, xprt->reestablish_timeout / HZ); 1258 schedule_delayed_work(&xprt->connect_worker, 1259 xprt->reestablish_timeout); 1260 xprt->reestablish_timeout <<= 1; 1261 if (xprt->reestablish_timeout > XS_TCP_MAX_REEST_TO) 1262 xprt->reestablish_timeout = XS_TCP_MAX_REEST_TO; 1263 } else { 1264 dprintk("RPC: xs_connect scheduled xprt %p\n", xprt); 1265 schedule_work(&xprt->connect_worker); 1266 1267 /* flush_scheduled_work can sleep... */ 1268 if (!RPC_IS_ASYNC(task)) 1269 flush_scheduled_work(); 1270 } 1271} 1272 1273/** 1274 * xs_udp_print_stats - display UDP socket-specifc stats 1275 * @xprt: rpc_xprt struct containing statistics 1276 * @seq: output file 1277 * 1278 */ 1279static void xs_udp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq) 1280{ 1281 seq_printf(seq, "\txprt:\tudp %u %lu %lu %lu %lu %Lu %Lu\n", 1282 xprt->port, 1283 xprt->stat.bind_count, 1284 xprt->stat.sends, 1285 xprt->stat.recvs, 1286 xprt->stat.bad_xids, 1287 xprt->stat.req_u, 1288 xprt->stat.bklog_u); 1289} 1290 1291/** 1292 * xs_tcp_print_stats - display TCP socket-specifc stats 1293 * @xprt: rpc_xprt struct containing statistics 1294 * @seq: output file 1295 * 1296 */ 1297static void xs_tcp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq) 1298{ 1299 long idle_time = 0; 1300 1301 if (xprt_connected(xprt)) 1302 idle_time = (long)(jiffies - xprt->last_used) / HZ; 1303 1304 seq_printf(seq, "\txprt:\ttcp %u %lu %lu %lu %ld %lu %lu %lu %Lu %Lu\n", 1305 xprt->port, 1306 xprt->stat.bind_count, 1307 xprt->stat.connect_count, 1308 xprt->stat.connect_time, 1309 idle_time, 1310 xprt->stat.sends, 1311 xprt->stat.recvs, 1312 xprt->stat.bad_xids, 1313 xprt->stat.req_u, 1314 xprt->stat.bklog_u); 1315} 1316 1317static struct rpc_xprt_ops xs_udp_ops = { 1318 .set_buffer_size = xs_udp_set_buffer_size, 1319 .print_addr = xs_print_peer_address, 1320 .reserve_xprt = xprt_reserve_xprt_cong, 1321 .release_xprt = xprt_release_xprt_cong, 1322 .rpcbind = rpc_getport, 1323 .set_port = xs_set_port, 1324 .connect = xs_connect, 1325 .buf_alloc = rpc_malloc, 1326 .buf_free = rpc_free, 1327 .send_request = xs_udp_send_request, 1328 .set_retrans_timeout = xprt_set_retrans_timeout_rtt, 1329 .timer = xs_udp_timer, 1330 .release_request = xprt_release_rqst_cong, 1331 .close = xs_close, 1332 .destroy = xs_destroy, 1333 .print_stats = xs_udp_print_stats, 1334}; 1335 1336static struct rpc_xprt_ops xs_tcp_ops = { 1337 .print_addr = xs_print_peer_address, 1338 .reserve_xprt = xprt_reserve_xprt, 1339 .release_xprt = xs_tcp_release_xprt, 1340 .rpcbind = rpc_getport, 1341 .set_port = xs_set_port, 1342 .connect = xs_connect, 1343 .buf_alloc = rpc_malloc, 1344 .buf_free = rpc_free, 1345 .send_request = xs_tcp_send_request, 1346 .set_retrans_timeout = xprt_set_retrans_timeout_def, 1347 .close = xs_close, 1348 .destroy = xs_destroy, 1349 .print_stats = xs_tcp_print_stats, 1350}; 1351 1352/** 1353 * xs_setup_udp - Set up transport to use a UDP socket 1354 * @xprt: transport to set up 1355 * @to: timeout parameters 1356 * 1357 */ 1358int xs_setup_udp(struct rpc_xprt *xprt, struct rpc_timeout *to) 1359{ 1360 size_t slot_table_size; 1361 struct sockaddr_in *addr = (struct sockaddr_in *) &xprt->addr; 1362 1363 xprt->max_reqs = xprt_udp_slot_table_entries; 1364 slot_table_size = xprt->max_reqs * sizeof(xprt->slot[0]); 1365 xprt->slot = kzalloc(slot_table_size, GFP_KERNEL); 1366 if (xprt->slot == NULL) 1367 return -ENOMEM; 1368 1369 if (ntohs(addr->sin_port) != 0) 1370 xprt_set_bound(xprt); 1371 xprt->port = xs_get_random_port(); 1372 1373 xprt->prot = IPPROTO_UDP; 1374 xprt->tsh_size = 0; 1375 /* XXX: header size can vary due to auth type, IPv6, etc. */ 1376 xprt->max_payload = (1U << 16) - (MAX_HEADER << 3); 1377 1378 INIT_WORK(&xprt->connect_worker, xs_udp_connect_worker, xprt); 1379 xprt->bind_timeout = XS_BIND_TO; 1380 xprt->connect_timeout = XS_UDP_CONN_TO; 1381 xprt->reestablish_timeout = XS_UDP_REEST_TO; 1382 xprt->idle_timeout = XS_IDLE_DISC_TO; 1383 1384 xprt->ops = &xs_udp_ops; 1385 1386 if (to) 1387 xprt->timeout = *to; 1388 else 1389 xprt_set_timeout(&xprt->timeout, 5, 5 * HZ); 1390 1391 xs_format_peer_addresses(xprt); 1392 dprintk("RPC: set up transport to address %s\n", 1393 xs_print_peer_address(xprt, RPC_DISPLAY_ALL)); 1394 1395 return 0; 1396} 1397 1398/** 1399 * xs_setup_tcp - Set up transport to use a TCP socket 1400 * @xprt: transport to set up 1401 * @to: timeout parameters 1402 * 1403 */ 1404int xs_setup_tcp(struct rpc_xprt *xprt, struct rpc_timeout *to) 1405{ 1406 size_t slot_table_size; 1407 struct sockaddr_in *addr = (struct sockaddr_in *) &xprt->addr; 1408 1409 xprt->max_reqs = xprt_tcp_slot_table_entries; 1410 slot_table_size = xprt->max_reqs * sizeof(xprt->slot[0]); 1411 xprt->slot = kzalloc(slot_table_size, GFP_KERNEL); 1412 if (xprt->slot == NULL) 1413 return -ENOMEM; 1414 1415 if (ntohs(addr->sin_port) != 0) 1416 xprt_set_bound(xprt); 1417 xprt->port = xs_get_random_port(); 1418 1419 xprt->prot = IPPROTO_TCP; 1420 xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32); 1421 xprt->max_payload = RPC_MAX_FRAGMENT_SIZE; 1422 1423 INIT_WORK(&xprt->connect_worker, xs_tcp_connect_worker, xprt); 1424 xprt->bind_timeout = XS_BIND_TO; 1425 xprt->connect_timeout = XS_TCP_CONN_TO; 1426 xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO; 1427 xprt->idle_timeout = XS_IDLE_DISC_TO; 1428 1429 xprt->ops = &xs_tcp_ops; 1430 1431 if (to) 1432 xprt->timeout = *to; 1433 else 1434 xprt_set_timeout(&xprt->timeout, 2, 60 * HZ); 1435 1436 xs_format_peer_addresses(xprt); 1437 dprintk("RPC: set up transport to address %s\n", 1438 xs_print_peer_address(xprt, RPC_DISPLAY_ALL)); 1439 1440 return 0; 1441}