Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux

nfsd41: sunrpc: Added rpc server-side backchannel handling

When the call direction is a reply, copy the xid and call direction into the
req->rq_private_buf.head[0].iov_base otherwise rpc_verify_header returns
rpc_garbage.

Signed-off-by: Rahul Iyer <iyer@netapp.com>
Signed-off-by: Mike Sager <sager@netapp.com>
Signed-off-by: Marc Eshel <eshel@almaden.ibm.com>
Signed-off-by: Benny Halevy <bhalevy@panasas.com>
Signed-off-by: Ricardo Labiaga <Ricardo.Labiaga@netapp.com>
Signed-off-by: Andy Adamson <andros@netapp.com>
Signed-off-by: Benny Halevy <bhalevy@panasas.com>
[get rid of CONFIG_NFSD_V4_1]
[sunrpc: refactoring of svc_tcp_recvfrom]
[nfsd41: sunrpc: create common send routine for the fore and the back channels]
[nfsd41: sunrpc: Use free_page() to free server backchannel pages]
[nfsd41: sunrpc: Document server backchannel locking]
[nfsd41: sunrpc: remove bc_connect_worker()]
[nfsd41: sunrpc: Define xprt_server_backchannel()]
[nfsd41: sunrpc: remove bc_close and bc_init_auto_disconnect dummy functions]
[nfsd41: sunrpc: eliminate unneeded switch statement in xs_setup_tcp()]
[nfsd41: sunrpc: Don't auto close the server backchannel connection]
[nfsd41: sunrpc: Remove unused functions]
Signed-off-by: Alexandros Batsakis <batsakis@netapp.com>
Signed-off-by: Ricardo Labiaga <Ricardo.Labiaga@netapp.com>
Signed-off-by: Benny Halevy <bhalevy@panasas.com>
[nfsd41: change bc_sock to bc_xprt]
[nfsd41: sunrpc: move struct rpc_buffer def into a common header file]
[nfsd41: sunrpc: use rpc_sleep in bc_send_request so not to block on mutex]
[removed cosmetic changes]
Signed-off-by: Benny Halevy <bhalevy@panasas.com>
[sunrpc: add new xprt class for nfsv4.1 backchannel]
[sunrpc: v2.1 change handling of auto_close and init_auto_disconnect operations for the nfsv4.1 backchannel]
Signed-off-by: Alexandros Batsakis <batsakis@netapp.com>
[reverted more cosmetic leftovers]
[got rid of xprt_server_backchannel]
[separated "nfsd41: sunrpc: add new xprt class for nfsv4.1 backchannel"]
Signed-off-by: Benny Halevy <bhalevy@panasas.com>
Cc: Trond Myklebust <trond.myklebust@netapp.com>
[sunrpc: change idle timeout value for the backchannel]
Signed-off-by: Alexandros Batsakis <batsakis@netapp.com>
Signed-off-by: Benny Halevy <bhalevy@panasas.com>
Acked-by: Trond Myklebust <trond.myklebust@netapp.com>
Signed-off-by: J. Bruce Fields <bfields@citi.umich.edu>

authored by

Rahul Iyer and committed by
J. Bruce Fields
4cfc7e60 6951867b

+303 -39
+1
include/linux/sunrpc/svc_xprt.h
··· 65 65 size_t xpt_locallen; /* length of address */ 66 66 struct sockaddr_storage xpt_remote; /* remote peer's address */ 67 67 size_t xpt_remotelen; /* length of address */ 68 + struct rpc_wait_queue xpt_bc_pending; /* backchannel wait queue */ 68 69 }; 69 70 70 71 int svc_reg_xprt_class(struct svc_xprt_class *);
+1
include/linux/sunrpc/svcsock.h
··· 28 28 /* private TCP part */ 29 29 u32 sk_reclen; /* length of record */ 30 30 u32 sk_tcplen; /* current read length */ 31 + struct rpc_xprt *sk_bc_xprt; /* NFSv4.1 backchannel xprt */ 31 32 }; 32 33 33 34 /*
+1
include/linux/sunrpc/xprt.h
··· 179 179 spinlock_t reserve_lock; /* lock slot table */ 180 180 u32 xid; /* Next XID value to use */ 181 181 struct rpc_task * snd_task; /* Task blocked in send */ 182 + struct svc_xprt *bc_xprt; /* NFSv4.1 backchannel */ 182 183 #if defined(CONFIG_NFS_V4_1) 183 184 struct svc_serv *bc_serv; /* The RPC service which will */ 184 185 /* process the callback */
+4
net/sunrpc/sunrpc.h
··· 43 43 (task->tk_msg.rpc_proc->p_decode != NULL); 44 44 } 45 45 46 + int svc_send_common(struct socket *sock, struct xdr_buf *xdr, 47 + struct page *headpage, unsigned long headoffset, 48 + struct page *tailpage, unsigned long tailoffset); 49 + 46 50 #endif /* _NET_SUNRPC_SUNRPC_H */ 47 51
+2
net/sunrpc/svc_xprt.c
··· 160 160 mutex_init(&xprt->xpt_mutex); 161 161 spin_lock_init(&xprt->xpt_lock); 162 162 set_bit(XPT_BUSY, &xprt->xpt_flags); 163 + rpc_init_wait_queue(&xprt->xpt_bc_pending, "xpt_bc_pending"); 163 164 } 164 165 EXPORT_SYMBOL_GPL(svc_xprt_init); 165 166 ··· 811 810 else 812 811 len = xprt->xpt_ops->xpo_sendto(rqstp); 813 812 mutex_unlock(&xprt->xpt_mutex); 813 + rpc_wake_up(&xprt->xpt_bc_pending); 814 814 svc_xprt_release(rqstp); 815 815 816 816 if (len == -ECONNREFUSED || len == -ENOTCONN || len == -EAGAIN)
+137 -35
net/sunrpc/svcsock.c
··· 49 49 #include <linux/sunrpc/msg_prot.h> 50 50 #include <linux/sunrpc/svcsock.h> 51 51 #include <linux/sunrpc/stats.h> 52 + #include <linux/sunrpc/xprt.h> 52 53 53 54 #define RPCDBG_FACILITY RPCDBG_SVCXPRT 54 55 ··· 154 153 } 155 154 156 155 /* 157 - * Generic sendto routine 156 + * send routine intended to be shared by the fore- and back-channel 158 157 */ 159 - static int svc_sendto(struct svc_rqst *rqstp, struct xdr_buf *xdr) 158 + int svc_send_common(struct socket *sock, struct xdr_buf *xdr, 159 + struct page *headpage, unsigned long headoffset, 160 + struct page *tailpage, unsigned long tailoffset) 160 161 { 161 - struct svc_sock *svsk = 162 - container_of(rqstp->rq_xprt, struct svc_sock, sk_xprt); 163 - struct socket *sock = svsk->sk_sock; 164 - int slen; 165 - union { 166 - struct cmsghdr hdr; 167 - long all[SVC_PKTINFO_SPACE / sizeof(long)]; 168 - } buffer; 169 - struct cmsghdr *cmh = &buffer.hdr; 170 - int len = 0; 171 162 int result; 172 163 int size; 173 164 struct page **ppage = xdr->pages; 174 165 size_t base = xdr->page_base; 175 166 unsigned int pglen = xdr->page_len; 176 167 unsigned int flags = MSG_MORE; 177 - RPC_IFDEBUG(char buf[RPC_MAX_ADDRBUFLEN]); 168 + int slen; 169 + int len = 0; 178 170 179 171 slen = xdr->len; 180 - 181 - if (rqstp->rq_prot == IPPROTO_UDP) { 182 - struct msghdr msg = { 183 - .msg_name = &rqstp->rq_addr, 184 - .msg_namelen = rqstp->rq_addrlen, 185 - .msg_control = cmh, 186 - .msg_controllen = sizeof(buffer), 187 - .msg_flags = MSG_MORE, 188 - }; 189 - 190 - svc_set_cmsg_data(rqstp, cmh); 191 - 192 - if (sock_sendmsg(sock, &msg, 0) < 0) 193 - goto out; 194 - } 195 172 196 173 /* send head */ 197 174 if (slen == xdr->head[0].iov_len) 198 175 flags = 0; 199 - len = kernel_sendpage(sock, rqstp->rq_respages[0], 0, 176 + len = kernel_sendpage(sock, headpage, headoffset, 200 177 xdr->head[0].iov_len, flags); 201 178 if (len != xdr->head[0].iov_len) 202 179 goto out; ··· 198 219 base = 0; 199 220 ppage++; 200 221 } 222 + 201 
223 /* send tail */ 202 224 if (xdr->tail[0].iov_len) { 203 - result = kernel_sendpage(sock, rqstp->rq_respages[0], 204 - ((unsigned long)xdr->tail[0].iov_base) 205 - & (PAGE_SIZE-1), 206 - xdr->tail[0].iov_len, 0); 207 - 225 + result = kernel_sendpage(sock, tailpage, tailoffset, 226 + xdr->tail[0].iov_len, 0); 208 227 if (result > 0) 209 228 len += result; 210 229 } 230 + 231 + out: 232 + return len; 233 + } 234 + 235 + 236 + /* 237 + * Generic sendto routine 238 + */ 239 + static int svc_sendto(struct svc_rqst *rqstp, struct xdr_buf *xdr) 240 + { 241 + struct svc_sock *svsk = 242 + container_of(rqstp->rq_xprt, struct svc_sock, sk_xprt); 243 + struct socket *sock = svsk->sk_sock; 244 + union { 245 + struct cmsghdr hdr; 246 + long all[SVC_PKTINFO_SPACE / sizeof(long)]; 247 + } buffer; 248 + struct cmsghdr *cmh = &buffer.hdr; 249 + int len = 0; 250 + unsigned long tailoff; 251 + unsigned long headoff; 252 + RPC_IFDEBUG(char buf[RPC_MAX_ADDRBUFLEN]); 253 + 254 + if (rqstp->rq_prot == IPPROTO_UDP) { 255 + struct msghdr msg = { 256 + .msg_name = &rqstp->rq_addr, 257 + .msg_namelen = rqstp->rq_addrlen, 258 + .msg_control = cmh, 259 + .msg_controllen = sizeof(buffer), 260 + .msg_flags = MSG_MORE, 261 + }; 262 + 263 + svc_set_cmsg_data(rqstp, cmh); 264 + 265 + if (sock_sendmsg(sock, &msg, 0) < 0) 266 + goto out; 267 + } 268 + 269 + tailoff = ((unsigned long)xdr->tail[0].iov_base) & (PAGE_SIZE-1); 270 + headoff = 0; 271 + len = svc_send_common(sock, xdr, rqstp->rq_respages[0], headoff, 272 + rqstp->rq_respages[0], tailoff); 273 + 211 274 out: 212 275 dprintk("svc: socket %p sendto([%p %Zu... 
], %d) = %d (addr %s)\n", 213 276 svsk, xdr->head[0].iov_base, xdr->head[0].iov_len, ··· 972 951 return -EAGAIN; 973 952 } 974 953 954 + static int svc_process_calldir(struct svc_sock *svsk, struct svc_rqst *rqstp, 955 + struct rpc_rqst **reqpp, struct kvec *vec) 956 + { 957 + struct rpc_rqst *req = NULL; 958 + u32 *p; 959 + u32 xid; 960 + u32 calldir; 961 + int len; 962 + 963 + len = svc_recvfrom(rqstp, vec, 1, 8); 964 + if (len < 0) 965 + goto error; 966 + 967 + p = (u32 *)rqstp->rq_arg.head[0].iov_base; 968 + xid = *p++; 969 + calldir = *p; 970 + 971 + if (calldir == 0) { 972 + /* REQUEST is the most common case */ 973 + vec[0] = rqstp->rq_arg.head[0]; 974 + } else { 975 + /* REPLY */ 976 + if (svsk->sk_bc_xprt) 977 + req = xprt_lookup_rqst(svsk->sk_bc_xprt, xid); 978 + 979 + if (!req) { 980 + printk(KERN_NOTICE 981 + "%s: Got unrecognized reply: " 982 + "calldir 0x%x sk_bc_xprt %p xid %08x\n", 983 + __func__, ntohl(calldir), 984 + svsk->sk_bc_xprt, xid); 985 + vec[0] = rqstp->rq_arg.head[0]; 986 + goto out; 987 + } 988 + 989 + memcpy(&req->rq_private_buf, &req->rq_rcv_buf, 990 + sizeof(struct xdr_buf)); 991 + /* copy the xid and call direction */ 992 + memcpy(req->rq_private_buf.head[0].iov_base, 993 + rqstp->rq_arg.head[0].iov_base, 8); 994 + vec[0] = req->rq_private_buf.head[0]; 995 + } 996 + out: 997 + vec[0].iov_base += 8; 998 + vec[0].iov_len -= 8; 999 + len = svsk->sk_reclen - 8; 1000 + error: 1001 + *reqpp = req; 1002 + return len; 1003 + } 1004 + 975 1005 /* 976 1006 * Receive data from a TCP socket. 977 1007 */ ··· 1034 962 int len; 1035 963 struct kvec *vec; 1036 964 int pnum, vlen; 965 + struct rpc_rqst *req = NULL; 1037 966 1038 967 dprintk("svc: tcp_recv %p data %d conn %d close %d\n", 1039 968 svsk, test_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags), ··· 1048 975 vec = rqstp->rq_vec; 1049 976 vec[0] = rqstp->rq_arg.head[0]; 1050 977 vlen = PAGE_SIZE; 978 + 979 + /* 980 + * We have enough data for the whole tcp record. 
Let's try and read the 981 + * first 8 bytes to get the xid and the call direction. We can use this 982 + * to figure out if this is a call or a reply to a callback. If 983 + * sk_reclen is < 8 (xid and calldir), then this is a malformed packet. 984 + * In that case, don't bother with the calldir and just read the data. 985 + * It will be rejected in svc_process. 986 + */ 987 + if (len >= 8) { 988 + len = svc_process_calldir(svsk, rqstp, &req, vec); 989 + if (len < 0) 990 + goto err_again; 991 + vlen -= 8; 992 + } 993 + 1051 994 pnum = 1; 1052 995 while (vlen < len) { 1053 - vec[pnum].iov_base = page_address(rqstp->rq_pages[pnum]); 996 + vec[pnum].iov_base = (req) ? 997 + page_address(req->rq_private_buf.pages[pnum - 1]) : 998 + page_address(rqstp->rq_pages[pnum]); 1054 999 vec[pnum].iov_len = PAGE_SIZE; 1055 1000 pnum++; 1056 1001 vlen += PAGE_SIZE; ··· 1080 989 if (len < 0) 1081 990 goto err_again; 1082 991 992 + /* 993 + * Account for the 8 bytes we read earlier 994 + */ 995 + len += 8; 996 + 997 + if (req) { 998 + xprt_complete_rqst(req->rq_task, len); 999 + len = 0; 1000 + goto out; 1001 + } 1083 1002 dprintk("svc: TCP complete record (%d bytes)\n", len); 1084 1003 rqstp->rq_arg.len = len; 1085 1004 rqstp->rq_arg.page_base = 0; ··· 1103 1002 rqstp->rq_xprt_ctxt = NULL; 1104 1003 rqstp->rq_prot = IPPROTO_TCP; 1105 1004 1005 + out: 1106 1006 /* Reset TCP read info */ 1107 1007 svsk->sk_reclen = 0; 1108 1008 svsk->sk_tcplen = 0;
+11 -4
net/sunrpc/xprt.c
··· 832 832 spin_unlock_bh(&xprt->transport_lock); 833 833 } 834 834 835 + static inline int xprt_has_timer(struct rpc_xprt *xprt) 836 + { 837 + return xprt->idle_timeout != 0; 838 + } 839 + 835 840 /** 836 841 * xprt_prepare_transmit - reserve the transport before sending a request 837 842 * @task: RPC task about to send a request ··· 1018 1013 if (!list_empty(&req->rq_list)) 1019 1014 list_del(&req->rq_list); 1020 1015 xprt->last_used = jiffies; 1021 - if (list_empty(&xprt->recv)) 1016 + if (list_empty(&xprt->recv) && xprt_has_timer(xprt)) 1022 1017 mod_timer(&xprt->timer, 1023 1018 xprt->last_used + xprt->idle_timeout); 1024 1019 spin_unlock_bh(&xprt->transport_lock); ··· 1087 1082 #endif /* CONFIG_NFS_V4_1 */ 1088 1083 1089 1084 INIT_WORK(&xprt->task_cleanup, xprt_autoclose); 1090 - setup_timer(&xprt->timer, xprt_init_autodisconnect, 1091 - (unsigned long)xprt); 1085 + if (xprt_has_timer(xprt)) 1086 + setup_timer(&xprt->timer, xprt_init_autodisconnect, 1087 + (unsigned long)xprt); 1088 + else 1089 + init_timer(&xprt->timer); 1092 1090 xprt->last_used = jiffies; 1093 1091 xprt->cwnd = RPC_INITCWND; 1094 1092 xprt->bind_index = 0; ··· 1110 1102 1111 1103 dprintk("RPC: created transport %p with %u slots\n", xprt, 1112 1104 xprt->max_reqs); 1113 - 1114 1105 return xprt; 1115 1106 } 1116 1107
+146
net/sunrpc/xprtsock.c
··· 32 32 #include <linux/tcp.h> 33 33 #include <linux/sunrpc/clnt.h> 34 34 #include <linux/sunrpc/sched.h> 35 + #include <linux/sunrpc/svcsock.h> 35 36 #include <linux/sunrpc/xprtsock.h> 36 37 #include <linux/file.h> 37 38 #ifdef CONFIG_NFS_V4_1 ··· 44 43 #include <net/udp.h> 45 44 #include <net/tcp.h> 46 45 46 + #include "sunrpc.h" 47 47 /* 48 48 * xprtsock tunables 49 49 */ ··· 2100 2098 xprt->stat.bklog_u); 2101 2099 } 2102 2100 2101 + /* 2102 + * Allocate a bunch of pages for a scratch buffer for the rpc code. The reason 2103 + * we allocate pages instead doing a kmalloc like rpc_malloc is because we want 2104 + * to use the server side send routines. 2105 + */ 2106 + void *bc_malloc(struct rpc_task *task, size_t size) 2107 + { 2108 + struct page *page; 2109 + struct rpc_buffer *buf; 2110 + 2111 + BUG_ON(size > PAGE_SIZE - sizeof(struct rpc_buffer)); 2112 + page = alloc_page(GFP_KERNEL); 2113 + 2114 + if (!page) 2115 + return NULL; 2116 + 2117 + buf = page_address(page); 2118 + buf->len = PAGE_SIZE; 2119 + 2120 + return buf->data; 2121 + } 2122 + 2123 + /* 2124 + * Free the space allocated in the bc_alloc routine 2125 + */ 2126 + void bc_free(void *buffer) 2127 + { 2128 + struct rpc_buffer *buf; 2129 + 2130 + if (!buffer) 2131 + return; 2132 + 2133 + buf = container_of(buffer, struct rpc_buffer, data); 2134 + free_page((unsigned long)buf); 2135 + } 2136 + 2137 + /* 2138 + * Use the svc_sock to send the callback. Must be called with svsk->sk_mutex 2139 + * held. Borrows heavily from svc_tcp_sendto and xs_tcp_send_request. 
2140 + */ 2141 + static int bc_sendto(struct rpc_rqst *req) 2142 + { 2143 + int len; 2144 + struct xdr_buf *xbufp = &req->rq_snd_buf; 2145 + struct rpc_xprt *xprt = req->rq_xprt; 2146 + struct sock_xprt *transport = 2147 + container_of(xprt, struct sock_xprt, xprt); 2148 + struct socket *sock = transport->sock; 2149 + unsigned long headoff; 2150 + unsigned long tailoff; 2151 + 2152 + /* 2153 + * Set up the rpc header and record marker stuff 2154 + */ 2155 + xs_encode_tcp_record_marker(xbufp); 2156 + 2157 + tailoff = (unsigned long)xbufp->tail[0].iov_base & ~PAGE_MASK; 2158 + headoff = (unsigned long)xbufp->head[0].iov_base & ~PAGE_MASK; 2159 + len = svc_send_common(sock, xbufp, 2160 + virt_to_page(xbufp->head[0].iov_base), headoff, 2161 + xbufp->tail[0].iov_base, tailoff); 2162 + 2163 + if (len != xbufp->len) { 2164 + printk(KERN_NOTICE "Error sending entire callback!\n"); 2165 + len = -EAGAIN; 2166 + } 2167 + 2168 + return len; 2169 + } 2170 + 2171 + /* 2172 + * The send routine. Borrows from svc_send 2173 + */ 2174 + static int bc_send_request(struct rpc_task *task) 2175 + { 2176 + struct rpc_rqst *req = task->tk_rqstp; 2177 + struct svc_xprt *xprt; 2178 + struct svc_sock *svsk; 2179 + u32 len; 2180 + 2181 + dprintk("sending request with xid: %08x\n", ntohl(req->rq_xid)); 2182 + /* 2183 + * Get the server socket associated with this callback xprt 2184 + */ 2185 + xprt = req->rq_xprt->bc_xprt; 2186 + svsk = container_of(xprt, struct svc_sock, sk_xprt); 2187 + 2188 + /* 2189 + * Grab the mutex to serialize data as the connection is shared 2190 + * with the fore channel 2191 + */ 2192 + if (!mutex_trylock(&xprt->xpt_mutex)) { 2193 + rpc_sleep_on(&xprt->xpt_bc_pending, task, NULL); 2194 + if (!mutex_trylock(&xprt->xpt_mutex)) 2195 + return -EAGAIN; 2196 + rpc_wake_up_queued_task(&xprt->xpt_bc_pending, task); 2197 + } 2198 + if (test_bit(XPT_DEAD, &xprt->xpt_flags)) 2199 + len = -ENOTCONN; 2200 + else 2201 + len = bc_sendto(req); 2202 + mutex_unlock(&xprt->xpt_mutex); 
2203 + 2204 + if (len > 0) 2205 + len = 0; 2206 + 2207 + return len; 2208 + } 2209 + 2210 + /* 2211 + * The close routine. Since this is client initiated, we do nothing 2212 + */ 2213 + 2214 + static void bc_close(struct rpc_xprt *xprt) 2215 + { 2216 + return; 2217 + } 2218 + 2219 + /* 2220 + * The xprt destroy routine. Again, because this connection is client 2221 + * initiated, we do nothing 2222 + */ 2223 + 2224 + static void bc_destroy(struct rpc_xprt *xprt) 2225 + { 2226 + return; 2227 + } 2228 + 2103 2229 static struct rpc_xprt_ops xs_udp_ops = { 2104 2230 .set_buffer_size = xs_udp_set_buffer_size, 2105 2231 .reserve_xprt = xprt_reserve_xprt_cong, ··· 2261 2131 #endif /* CONFIG_NFS_V4_1 */ 2262 2132 .close = xs_tcp_close, 2263 2133 .destroy = xs_destroy, 2134 + .print_stats = xs_tcp_print_stats, 2135 + }; 2136 + 2137 + /* 2138 + * The rpc_xprt_ops for the server backchannel 2139 + */ 2140 + 2141 + static struct rpc_xprt_ops bc_tcp_ops = { 2142 + .reserve_xprt = xprt_reserve_xprt, 2143 + .release_xprt = xprt_release_xprt, 2144 + .buf_alloc = bc_malloc, 2145 + .buf_free = bc_free, 2146 + .send_request = bc_send_request, 2147 + .set_retrans_timeout = xprt_set_retrans_timeout_def, 2148 + .close = bc_close, 2149 + .destroy = bc_destroy, 2264 2150 .print_stats = xs_tcp_print_stats, 2265 2151 }; 2266 2152