Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux

[PATCH] RPC: separate TCP and UDP socket write paths

Split the RPC client's main socket write path into a TCP version and a UDP
version to eliminate another dependency on the "xprt->stream" variable.

Compiler optimization removes unneeded code from xs_sendpages, as this
function is now called with some constant arguments.

We can now cleanly perform transport protocol-specific return code testing
and error recovery in each path.

Test-plan:
Run millions of fsx operations. Characterize performance with tools such
as "sio" or "iozone". Compare oprofile results taken before and after this
patch is applied and examine them for any changes.

Version: Thu, 11 Aug 2005 16:08:46 -0400

Signed-off-by: Chuck Lever <cel@netapp.com>
Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>

Authored by Chuck Lever and committed by Trond Myklebust
262965f5 b0d93ad5

+128 -87
+128 -87
net/sunrpc/xprtsock.c
··· 40 40 */ 41 41 #define XS_MAX_RESVPORT (800U) 42 42 43 + /* 44 + * How many times to try sending a request on a socket before waiting 45 + * for the socket buffer to clear. 46 + */ 47 + #define XS_SENDMSG_RETRY (10U) 48 + 43 49 #ifdef RPC_DEBUG 44 50 # undef RPC_DEBUG_DATA 45 51 # define RPCDBG_FACILITY RPCDBG_TRANS ··· 120 114 * @base: starting position in the buffer 121 115 * 122 116 */ 123 - static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base) 117 + static inline int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base) 124 118 { 125 119 struct page **ppage = xdr->pages; 126 120 unsigned int len, pglen = xdr->page_len; 127 121 int err, ret = 0; 128 122 ssize_t (*sendpage)(struct socket *, struct page *, int, size_t, int); 123 + 124 + if (unlikely(!sock)) 125 + return -ENOTCONN; 126 + 127 + clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags); 129 128 130 129 len = xdr->head[0].iov_len; 131 130 if (base < len || (addr != NULL && base == 0)) { ··· 198 187 } 199 188 200 189 /** 201 - * xs_sendmsg - write an RPC request to a socket 202 - * @xprt: generic transport 203 - * @req: the RPC request to write 190 + * xs_nospace - place task on wait queue if transmit was incomplete 191 + * @task: task to put to sleep 204 192 * 205 193 */ 206 - static int xs_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req) 194 + static void xs_nospace(struct rpc_task *task) 207 195 { 208 - struct socket *sock = xprt->sock; 209 - struct xdr_buf *xdr = &req->rq_snd_buf; 210 - struct sockaddr *addr = NULL; 211 - int addrlen = 0; 212 - unsigned int skip; 213 - int result; 196 + struct rpc_rqst *req = task->tk_rqstp; 197 + struct rpc_xprt *xprt = req->rq_xprt; 214 198 215 - if (!sock) 216 - return -ENOTCONN; 199 + dprintk("RPC: %4d xmit incomplete (%u left of %u)\n", 200 + task->tk_pid, req->rq_slen - req->rq_bytes_sent, 201 + req->rq_slen); 202 + 203 + if 
(test_bit(SOCK_ASYNC_NOSPACE, &xprt->sock->flags)) { 204 + /* Protect against races with write_space */ 205 + spin_lock_bh(&xprt->transport_lock); 206 + 207 + /* Don't race with disconnect */ 208 + if (!xprt_connected(xprt)) 209 + task->tk_status = -ENOTCONN; 210 + else if (test_bit(SOCK_NOSPACE, &xprt->sock->flags)) 211 + xprt_wait_for_buffer_space(task); 212 + 213 + spin_unlock_bh(&xprt->transport_lock); 214 + } else 215 + /* Keep holding the socket if it is blocked */ 216 + rpc_delay(task, HZ>>4); 217 + } 218 + 219 + /** 220 + * xs_udp_send_request - write an RPC request to a UDP socket 221 + * @task: address of RPC task that manages the state of an RPC request 222 + * 223 + * Return values: 224 + * 0: The request has been sent 225 + * EAGAIN: The socket was blocked, please call again later to 226 + * complete the request 227 + * ENOTCONN: Caller needs to invoke connect logic then call again 228 + * other: Some other error occured, the request was not sent 229 + */ 230 + static int xs_udp_send_request(struct rpc_task *task) 231 + { 232 + struct rpc_rqst *req = task->tk_rqstp; 233 + struct rpc_xprt *xprt = req->rq_xprt; 234 + struct xdr_buf *xdr = &req->rq_snd_buf; 235 + int status; 217 236 218 237 xs_pktdump("packet data:", 219 238 req->rq_svec->iov_base, 220 239 req->rq_svec->iov_len); 221 240 222 - /* For UDP, we need to provide an address */ 223 - if (!xprt->stream) { 224 - addr = (struct sockaddr *) &xprt->addr; 225 - addrlen = sizeof(xprt->addr); 226 - } 227 - /* Don't repeat bytes */ 228 - skip = req->rq_bytes_sent; 241 + req->rq_xtime = jiffies; 242 + status = xs_sendpages(xprt->sock, (struct sockaddr *) &xprt->addr, 243 + sizeof(xprt->addr), xdr, req->rq_bytes_sent); 229 244 230 - clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags); 231 - result = xs_sendpages(sock, addr, addrlen, xdr, skip); 245 + dprintk("RPC: xs_udp_send_request(%u) = %d\n", 246 + xdr->len - req->rq_bytes_sent, status); 232 247 233 - dprintk("RPC: xs_sendmsg(%d) = %d\n", xdr->len - skip, 
result); 248 + if (likely(status >= (int) req->rq_slen)) 249 + return 0; 234 250 235 - if (result >= 0) 236 - return result; 251 + /* Still some bytes left; set up for a retry later. */ 252 + if (status > 0) 253 + status = -EAGAIN; 237 254 238 - switch (result) { 255 + switch (status) { 256 + case -ENETUNREACH: 257 + case -EPIPE: 239 258 case -ECONNREFUSED: 240 259 /* When the server has died, an ICMP port unreachable message 241 260 * prompts ECONNREFUSED. */ 242 - case -EAGAIN: 243 261 break; 244 - case -ECONNRESET: 245 - case -ENOTCONN: 246 - case -EPIPE: 247 - /* connection broken */ 248 - if (xprt->stream) 249 - result = -ENOTCONN; 262 + case -EAGAIN: 263 + xs_nospace(task); 250 264 break; 251 265 default: 266 + dprintk("RPC: sendmsg returned unrecognized error %d\n", 267 + -status); 252 268 break; 253 269 } 254 - return result; 270 + 271 + return status; 255 272 } 256 273 257 274 /** 258 - * xs_send_request - write an RPC request to a socket 275 + * xs_tcp_send_request - write an RPC request to a TCP socket 259 276 * @task: address of RPC task that manages the state of an RPC request 260 277 * 261 278 * Return values: 262 - * 0: The request has been sent 263 - * EAGAIN: The socket was blocked, please call again later to 264 - * complete the request 265 - * other: Some other error occured, the request was not sent 279 + * 0: The request has been sent 280 + * EAGAIN: The socket was blocked, please call again later to 281 + * complete the request 282 + * ENOTCONN: Caller needs to invoke connect logic then call again 283 + * other: Some other error occured, the request was not sent 266 284 * 267 285 * XXX: In the case of soft timeouts, should we eventually give up 268 - * if the socket is not able to make progress? 286 + * if sendmsg is not able to make progress? 
269 287 */ 270 - static int xs_send_request(struct rpc_task *task) 288 + static int xs_tcp_send_request(struct rpc_task *task) 271 289 { 272 290 struct rpc_rqst *req = task->tk_rqstp; 273 291 struct rpc_xprt *xprt = req->rq_xprt; 292 + struct xdr_buf *xdr = &req->rq_snd_buf; 293 + u32 *marker = req->rq_svec[0].iov_base; 274 294 int status, retry = 0; 275 295 276 - /* set up everything as needed. */ 277 296 /* Write the record marker */ 278 - if (xprt->stream) { 279 - u32 *marker = req->rq_svec[0].iov_base; 297 + *marker = htonl(0x80000000|(req->rq_slen-sizeof(*marker))); 280 298 281 - *marker = htonl(0x80000000|(req->rq_slen-sizeof(*marker))); 282 - } 299 + xs_pktdump("packet data:", 300 + req->rq_svec->iov_base, 301 + req->rq_svec->iov_len); 283 302 284 303 /* Continue transmitting the packet/record. We must be careful 285 304 * to cope with writespace callbacks arriving _after_ we have 286 - * called sendmsg(). 287 - */ 305 + * called sendmsg(). */ 288 306 while (1) { 289 307 req->rq_xtime = jiffies; 290 - status = xs_sendmsg(xprt, req); 308 + status = xs_sendpages(xprt->sock, NULL, 0, xdr, 309 + req->rq_bytes_sent); 291 310 292 - if (status < 0) 311 + dprintk("RPC: xs_tcp_send_request(%u) = %d\n", 312 + xdr->len - req->rq_bytes_sent, status); 313 + 314 + if (unlikely(status < 0)) 293 315 break; 294 316 295 - if (xprt->stream) { 296 - req->rq_bytes_sent += status; 297 - 298 - /* If we've sent the entire packet, immediately 299 - * reset the count of bytes sent. */ 300 - if (req->rq_bytes_sent >= req->rq_slen) { 301 - req->rq_bytes_sent = 0; 302 - return 0; 303 - } 304 - } else { 305 - if (status >= req->rq_slen) 306 - return 0; 307 - status = -EAGAIN; 308 - break; 317 + /* If we've sent the entire packet, immediately 318 + * reset the count of bytes sent. 
*/ 319 + req->rq_bytes_sent += status; 320 + if (likely(req->rq_bytes_sent >= req->rq_slen)) { 321 + req->rq_bytes_sent = 0; 322 + return 0; 309 323 } 310 - 311 - dprintk("RPC: %4d xmit incomplete (%d left of %d)\n", 312 - task->tk_pid, req->rq_slen - req->rq_bytes_sent, 313 - req->rq_slen); 314 324 315 325 status = -EAGAIN; 316 - if (retry++ > 50) 326 + if (retry++ > XS_SENDMSG_RETRY) 317 327 break; 318 328 } 319 329 320 - if (status == -EAGAIN) { 321 - if (test_bit(SOCK_ASYNC_NOSPACE, &xprt->sock->flags)) { 322 - /* Protect against races with write_space */ 323 - spin_lock_bh(&xprt->transport_lock); 324 - /* Don't race with disconnect */ 325 - if (!xprt_connected(xprt)) 326 - task->tk_status = -ENOTCONN; 327 - else if (test_bit(SOCK_NOSPACE, &xprt->sock->flags)) 328 - xprt_wait_for_buffer_space(task); 329 - spin_unlock_bh(&xprt->transport_lock); 330 - return status; 331 - } 332 - /* Keep holding the socket if it is blocked */ 333 - rpc_delay(task, HZ>>4); 330 + switch (status) { 331 + case -EAGAIN: 332 + xs_nospace(task); 333 + break; 334 + case -ECONNREFUSED: 335 + case -ECONNRESET: 336 + case -ENOTCONN: 337 + case -EPIPE: 338 + status = -ENOTCONN; 339 + break; 340 + default: 341 + dprintk("RPC: sendmsg returned unrecognized error %d\n", 342 + -status); 343 + break; 334 344 } 345 + 335 346 return status; 336 347 } 337 348 ··· 1025 992 } 1026 993 } 1027 994 1028 - static struct rpc_xprt_ops xs_ops = { 995 + static struct rpc_xprt_ops xs_udp_ops = { 1029 996 .set_buffer_size = xs_set_buffer_size, 1030 997 .connect = xs_connect, 1031 - .send_request = xs_send_request, 998 + .send_request = xs_udp_send_request, 999 + .close = xs_close, 1000 + .destroy = xs_destroy, 1001 + }; 1002 + 1003 + static struct rpc_xprt_ops xs_tcp_ops = { 1004 + .set_buffer_size = xs_set_buffer_size, 1005 + .connect = xs_connect, 1006 + .send_request = xs_tcp_send_request, 1032 1007 .close = xs_close, 1033 1008 .destroy = xs_destroy, 1034 1009 }; ··· 1074 1033 1075 1034 
INIT_WORK(&xprt->connect_worker, xs_udp_connect_worker, xprt); 1076 1035 1077 - xprt->ops = &xs_ops; 1036 + xprt->ops = &xs_udp_ops; 1078 1037 1079 1038 if (to) 1080 1039 xprt->timeout = *to; ··· 1113 1072 1114 1073 INIT_WORK(&xprt->connect_worker, xs_tcp_connect_worker, xprt); 1115 1074 1116 - xprt->ops = &xs_ops; 1075 + xprt->ops = &xs_tcp_ops; 1117 1076 1118 1077 if (to) 1119 1078 xprt->timeout = *to;