Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

at v6.6-rc2 376 lines 11 kB view raw
1// SPDX-License-Identifier: GPL-2.0-only 2/****************************************************************************** 3 4(c) 2007 Network Appliance, Inc. All Rights Reserved. 5(c) 2009 NetApp. All Rights Reserved. 6 7 8******************************************************************************/ 9 10#include <linux/tcp.h> 11#include <linux/slab.h> 12#include <linux/sunrpc/xprt.h> 13#include <linux/export.h> 14#include <linux/sunrpc/bc_xprt.h> 15 16#if IS_ENABLED(CONFIG_SUNRPC_DEBUG) 17#define RPCDBG_FACILITY RPCDBG_TRANS 18#endif 19 20#define BC_MAX_SLOTS 64U 21 22unsigned int xprt_bc_max_slots(struct rpc_xprt *xprt) 23{ 24 return BC_MAX_SLOTS; 25} 26 27/* 28 * Helper routines that track the number of preallocation elements 29 * on the transport. 30 */ 31static inline int xprt_need_to_requeue(struct rpc_xprt *xprt) 32{ 33 return xprt->bc_alloc_count < xprt->bc_alloc_max; 34} 35 36/* 37 * Free the preallocated rpc_rqst structure and the memory 38 * buffers hanging off of it. 39 */ 40static void xprt_free_allocation(struct rpc_rqst *req) 41{ 42 struct xdr_buf *xbufp; 43 44 dprintk("RPC: free allocations for req= %p\n", req); 45 WARN_ON_ONCE(test_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state)); 46 xbufp = &req->rq_rcv_buf; 47 free_page((unsigned long)xbufp->head[0].iov_base); 48 xbufp = &req->rq_snd_buf; 49 free_page((unsigned long)xbufp->head[0].iov_base); 50 kfree(req); 51} 52 53static void xprt_bc_reinit_xdr_buf(struct xdr_buf *buf) 54{ 55 buf->head[0].iov_len = PAGE_SIZE; 56 buf->tail[0].iov_len = 0; 57 buf->pages = NULL; 58 buf->page_len = 0; 59 buf->flags = 0; 60 buf->len = 0; 61 buf->buflen = PAGE_SIZE; 62} 63 64static int xprt_alloc_xdr_buf(struct xdr_buf *buf, gfp_t gfp_flags) 65{ 66 struct page *page; 67 /* Preallocate one XDR receive buffer */ 68 page = alloc_page(gfp_flags); 69 if (page == NULL) 70 return -ENOMEM; 71 xdr_buf_init(buf, page_address(page), PAGE_SIZE); 72 return 0; 73} 74 75static struct rpc_rqst *xprt_alloc_bc_req(struct rpc_xprt *xprt) 76{ 77 gfp_t gfp_flags = GFP_KERNEL | __GFP_NORETRY | __GFP_NOWARN; 78 struct rpc_rqst *req; 79 80 /* Pre-allocate one backchannel rpc_rqst */ 81 req = kzalloc(sizeof(*req), gfp_flags); 82 if (req == NULL) 83 return NULL; 84 85 req->rq_xprt = xprt; 86 INIT_LIST_HEAD(&req->rq_bc_list); 87 88 /* Preallocate one XDR receive buffer */ 89 if (xprt_alloc_xdr_buf(&req->rq_rcv_buf, gfp_flags) < 0) { 90 printk(KERN_ERR "Failed to create bc receive xbuf\n"); 91 goto out_free; 92 } 93 req->rq_rcv_buf.len = PAGE_SIZE; 94 95 /* Preallocate one XDR send buffer */ 96 if (xprt_alloc_xdr_buf(&req->rq_snd_buf, gfp_flags) < 0) { 97 printk(KERN_ERR "Failed to create bc snd xbuf\n"); 98 goto out_free; 99 } 100 return req; 101out_free: 102 xprt_free_allocation(req); 103 return NULL; 104} 105 106/* 107 * Preallocate up to min_reqs structures and related buffers for use 108 * by the backchannel. This function can be called multiple times 109 * when creating new sessions that use the same rpc_xprt. The 110 * preallocated buffers are added to the pool of resources used by 111 * the rpc_xprt. Any one of these resources may be used by an 112 * incoming callback request. It's up to the higher levels in the 113 * stack to enforce that the maximum number of session slots is not 114 * being exceeded. 115 * 116 * Some callback arguments can be large. For example, a pNFS server 117 * using multiple deviceids. The list can be unbound, but the client 118 * has the ability to tell the server the maximum size of the callback 119 * requests. Each deviceID is 16 bytes, so allocate one page 120 * for the arguments to have enough room to receive a number of these 121 * deviceIDs. The NFS client indicates to the pNFS server that its 122 * callback requests can be up to 4096 bytes in size. 123 */ 124int xprt_setup_backchannel(struct rpc_xprt *xprt, unsigned int min_reqs) 125{ 126 if (!xprt->ops->bc_setup) 127 return 0; 128 return xprt->ops->bc_setup(xprt, min_reqs); 129} 130EXPORT_SYMBOL_GPL(xprt_setup_backchannel); 131 132int xprt_setup_bc(struct rpc_xprt *xprt, unsigned int min_reqs) 133{ 134 struct rpc_rqst *req; 135 struct list_head tmp_list; 136 int i; 137 138 dprintk("RPC: setup backchannel transport\n"); 139 140 if (min_reqs > BC_MAX_SLOTS) 141 min_reqs = BC_MAX_SLOTS; 142 143 /* 144 * We use a temporary list to keep track of the preallocated 145 * buffers. Once we're done building the list we splice it 146 * into the backchannel preallocation list off of the rpc_xprt 147 * struct. This helps minimize the amount of time the list 148 * lock is held on the rpc_xprt struct. It also makes cleanup 149 * easier in case of memory allocation errors. 150 */ 151 INIT_LIST_HEAD(&tmp_list); 152 for (i = 0; i < min_reqs; i++) { 153 /* Pre-allocate one backchannel rpc_rqst */ 154 req = xprt_alloc_bc_req(xprt); 155 if (req == NULL) { 156 printk(KERN_ERR "Failed to create bc rpc_rqst\n"); 157 goto out_free; 158 } 159 160 /* Add the allocated buffer to the tmp list */ 161 dprintk("RPC: adding req= %p\n", req); 162 list_add(&req->rq_bc_pa_list, &tmp_list); 163 } 164 165 /* 166 * Add the temporary list to the backchannel preallocation list 167 */ 168 spin_lock(&xprt->bc_pa_lock); 169 list_splice(&tmp_list, &xprt->bc_pa_list); 170 xprt->bc_alloc_count += min_reqs; 171 xprt->bc_alloc_max += min_reqs; 172 atomic_add(min_reqs, &xprt->bc_slot_count); 173 spin_unlock(&xprt->bc_pa_lock); 174 175 dprintk("RPC: setup backchannel transport done\n"); 176 return 0; 177 178out_free: 179 /* 180 * Memory allocation failed, free the temporary list 181 */ 182 while (!list_empty(&tmp_list)) { 183 req = list_first_entry(&tmp_list, 184 struct rpc_rqst, 185 rq_bc_pa_list); 186 list_del(&req->rq_bc_pa_list); 187 xprt_free_allocation(req); 188 } 189 190 dprintk("RPC: setup backchannel transport failed\n"); 191 return -ENOMEM; 192} 193 194/** 195 * xprt_destroy_backchannel - Destroys the backchannel preallocated structures. 196 * @xprt: the transport holding the preallocated strucures 197 * @max_reqs: the maximum number of preallocated structures to destroy 198 * 199 * Since these structures may have been allocated by multiple calls 200 * to xprt_setup_backchannel, we only destroy up to the maximum number 201 * of reqs specified by the caller. 202 */ 203void xprt_destroy_backchannel(struct rpc_xprt *xprt, unsigned int max_reqs) 204{ 205 if (xprt->ops->bc_destroy) 206 xprt->ops->bc_destroy(xprt, max_reqs); 207} 208EXPORT_SYMBOL_GPL(xprt_destroy_backchannel); 209 210void xprt_destroy_bc(struct rpc_xprt *xprt, unsigned int max_reqs) 211{ 212 struct rpc_rqst *req = NULL, *tmp = NULL; 213 214 dprintk("RPC: destroy backchannel transport\n"); 215 216 if (max_reqs == 0) 217 goto out; 218 219 spin_lock_bh(&xprt->bc_pa_lock); 220 xprt->bc_alloc_max -= min(max_reqs, xprt->bc_alloc_max); 221 list_for_each_entry_safe(req, tmp, &xprt->bc_pa_list, rq_bc_pa_list) { 222 dprintk("RPC: req=%p\n", req); 223 list_del(&req->rq_bc_pa_list); 224 xprt_free_allocation(req); 225 xprt->bc_alloc_count--; 226 atomic_dec(&xprt->bc_slot_count); 227 if (--max_reqs == 0) 228 break; 229 } 230 spin_unlock_bh(&xprt->bc_pa_lock); 231 232out: 233 dprintk("RPC: backchannel list empty= %s\n", 234 list_empty(&xprt->bc_pa_list) ? "true" : "false"); 235} 236 237static struct rpc_rqst *xprt_get_bc_request(struct rpc_xprt *xprt, __be32 xid, 238 struct rpc_rqst *new) 239{ 240 struct rpc_rqst *req = NULL; 241 242 dprintk("RPC: allocate a backchannel request\n"); 243 if (list_empty(&xprt->bc_pa_list)) { 244 if (!new) 245 goto not_found; 246 if (atomic_read(&xprt->bc_slot_count) >= BC_MAX_SLOTS) 247 goto not_found; 248 list_add_tail(&new->rq_bc_pa_list, &xprt->bc_pa_list); 249 xprt->bc_alloc_count++; 250 atomic_inc(&xprt->bc_slot_count); 251 } 252 req = list_first_entry(&xprt->bc_pa_list, struct rpc_rqst, 253 rq_bc_pa_list); 254 req->rq_reply_bytes_recvd = 0; 255 memcpy(&req->rq_private_buf, &req->rq_rcv_buf, 256 sizeof(req->rq_private_buf)); 257 req->rq_xid = xid; 258 req->rq_connect_cookie = xprt->connect_cookie; 259 dprintk("RPC: backchannel req=%p\n", req); 260not_found: 261 return req; 262} 263 264/* 265 * Return the preallocated rpc_rqst structure and XDR buffers 266 * associated with this rpc_task. 267 */ 268void xprt_free_bc_request(struct rpc_rqst *req) 269{ 270 struct rpc_xprt *xprt = req->rq_xprt; 271 272 xprt->ops->bc_free_rqst(req); 273} 274 275void xprt_free_bc_rqst(struct rpc_rqst *req) 276{ 277 struct rpc_xprt *xprt = req->rq_xprt; 278 279 dprintk("RPC: free backchannel req=%p\n", req); 280 281 req->rq_connect_cookie = xprt->connect_cookie - 1; 282 smp_mb__before_atomic(); 283 clear_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state); 284 smp_mb__after_atomic(); 285 286 /* 287 * Return it to the list of preallocations so that it 288 * may be reused by a new callback request. 289 */ 290 spin_lock_bh(&xprt->bc_pa_lock); 291 if (xprt_need_to_requeue(xprt)) { 292 xprt_bc_reinit_xdr_buf(&req->rq_snd_buf); 293 xprt_bc_reinit_xdr_buf(&req->rq_rcv_buf); 294 req->rq_rcv_buf.len = PAGE_SIZE; 295 list_add_tail(&req->rq_bc_pa_list, &xprt->bc_pa_list); 296 xprt->bc_alloc_count++; 297 atomic_inc(&xprt->bc_slot_count); 298 req = NULL; 299 } 300 spin_unlock_bh(&xprt->bc_pa_lock); 301 if (req != NULL) { 302 /* 303 * The last remaining session was destroyed while this 304 * entry was in use. Free the entry and don't attempt 305 * to add back to the list because there is no need to 306 * have anymore preallocated entries. 307 */ 308 dprintk("RPC: Last session removed req=%p\n", req); 309 xprt_free_allocation(req); 310 } 311 xprt_put(xprt); 312} 313 314/* 315 * One or more rpc_rqst structure have been preallocated during the 316 * backchannel setup. Buffer space for the send and private XDR buffers 317 * has been preallocated as well. Use xprt_alloc_bc_request to allocate 318 * to this request. Use xprt_free_bc_request to return it. 319 * 320 * We know that we're called in soft interrupt context, grab the spin_lock 321 * since there is no need to grab the bottom half spin_lock. 322 * 323 * Return an available rpc_rqst, otherwise NULL if non are available. 324 */ 325struct rpc_rqst *xprt_lookup_bc_request(struct rpc_xprt *xprt, __be32 xid) 326{ 327 struct rpc_rqst *req, *new = NULL; 328 329 do { 330 spin_lock(&xprt->bc_pa_lock); 331 list_for_each_entry(req, &xprt->bc_pa_list, rq_bc_pa_list) { 332 if (req->rq_connect_cookie != xprt->connect_cookie) 333 continue; 334 if (req->rq_xid == xid) 335 goto found; 336 } 337 req = xprt_get_bc_request(xprt, xid, new); 338found: 339 spin_unlock(&xprt->bc_pa_lock); 340 if (new) { 341 if (req != new) 342 xprt_free_allocation(new); 343 break; 344 } else if (req) 345 break; 346 new = xprt_alloc_bc_req(xprt); 347 } while (new); 348 return req; 349} 350 351/* 352 * Add callback request to callback list. The callback 353 * service sleeps on the sv_cb_waitq waiting for new 354 * requests. Wake it up after adding enqueing the 355 * request. 356 */ 357void xprt_complete_bc_request(struct rpc_rqst *req, uint32_t copied) 358{ 359 struct rpc_xprt *xprt = req->rq_xprt; 360 struct svc_serv *bc_serv = xprt->bc_serv; 361 362 spin_lock(&xprt->bc_pa_lock); 363 list_del(&req->rq_bc_pa_list); 364 xprt->bc_alloc_count--; 365 spin_unlock(&xprt->bc_pa_lock); 366 367 req->rq_private_buf.len = copied; 368 set_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state); 369 370 dprintk("RPC: add callback request to list\n"); 371 xprt_get(xprt); 372 spin_lock(&bc_serv->sv_cb_lock); 373 list_add(&req->rq_bc_list, &bc_serv->sv_cb_list); 374 wake_up(&bc_serv->sv_cb_waitq); 375 spin_unlock(&bc_serv->sv_cb_lock); 376}