Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux

svcrdma: Add a "parsed chunk list" data structure

This simple data structure binds the location of each data payload
inside an RPC message to the chunk that will be used to push it
to or pull it from the client.

There are several benefits to this small additional overhead:

* It enables support for more than one chunk in incoming Read and
Write lists.

* It translates the version-specific on-the-wire format into a
generic in-memory structure, enabling support for multiple
versions of the RPC/RDMA transport protocol.

* It enables the server to re-organize a chunk list if it needs to
adjust where Read chunk data lands in server memory without
altering the contents of the XDR-encoded Receive buffer.

Construction of these lists is done while sanity checking each
incoming RPC/RDMA header. Subsequent patches will make use of the
generated data structures.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>

+635 -84
+12
include/linux/sunrpc/svc_rdma.h
··· 47 47 #include <linux/sunrpc/svcsock.h> 48 48 #include <linux/sunrpc/rpc_rdma.h> 49 49 #include <linux/sunrpc/rpc_rdma_cid.h> 50 + #include <linux/sunrpc/svc_rdma_pcl.h> 51 + 50 52 #include <rdma/ib_verbs.h> 51 53 #include <rdma/rdma_cm.h> 52 54 ··· 144 142 unsigned int rc_page_count; 145 143 unsigned int rc_hdr_count; 146 144 u32 rc_inv_rkey; 145 + 146 + struct svc_rdma_pcl rc_call_pcl; 147 + 148 + struct svc_rdma_pcl rc_read_pcl; 149 + 147 150 __be32 *rc_write_list; 151 + struct svc_rdma_chunk *rc_cur_result_payload; 152 + struct svc_rdma_pcl rc_write_pcl; 153 + 148 154 __be32 *rc_reply_chunk; 155 + struct svc_rdma_pcl rc_reply_pcl; 156 + 149 157 unsigned int rc_read_payload_offset; 150 158 unsigned int rc_read_payload_length; 151 159 struct page *rc_pages[RPCSVC_MAXPAGES];
+128
include/linux/sunrpc/svc_rdma_pcl.h
··· 1 + /* SPDX-License-Identifier: GPL-2.0 */ 2 + /* 3 + * Copyright (c) 2020, Oracle and/or its affiliates 4 + */ 5 + 6 + #ifndef SVC_RDMA_PCL_H 7 + #define SVC_RDMA_PCL_H 8 + 9 + #include <linux/list.h> 10 + 11 + struct svc_rdma_segment { 12 + u32 rs_handle; 13 + u32 rs_length; 14 + u64 rs_offset; 15 + }; 16 + 17 + struct svc_rdma_chunk { 18 + struct list_head ch_list; 19 + 20 + u32 ch_position; 21 + u32 ch_length; 22 + u32 ch_payload_length; 23 + 24 + u32 ch_segcount; 25 + struct svc_rdma_segment ch_segments[]; 26 + }; 27 + 28 + struct svc_rdma_pcl { 29 + unsigned int cl_count; 30 + struct list_head cl_chunks; 31 + }; 32 + 33 + /** 34 + * pcl_init - Initialize a parsed chunk list 35 + * @pcl: parsed chunk list to initialize 36 + * 37 + */ 38 + static inline void pcl_init(struct svc_rdma_pcl *pcl) 39 + { 40 + INIT_LIST_HEAD(&pcl->cl_chunks); 41 + } 42 + 43 + /** 44 + * pcl_is_empty - Return true if parsed chunk list is empty 45 + * @pcl: parsed chunk list 46 + * 47 + */ 48 + static inline bool pcl_is_empty(const struct svc_rdma_pcl *pcl) 49 + { 50 + return list_empty(&pcl->cl_chunks); 51 + } 52 + 53 + /** 54 + * pcl_first_chunk - Return first chunk in a parsed chunk list 55 + * @pcl: parsed chunk list 56 + * 57 + * Returns the first chunk in the list, or NULL if the list is empty. 58 + */ 59 + static inline struct svc_rdma_chunk * 60 + pcl_first_chunk(const struct svc_rdma_pcl *pcl) 61 + { 62 + if (pcl_is_empty(pcl)) 63 + return NULL; 64 + return list_first_entry(&pcl->cl_chunks, struct svc_rdma_chunk, 65 + ch_list); 66 + } 67 + 68 + /** 69 + * pcl_next_chunk - Return next chunk in a parsed chunk list 70 + * @pcl: a parsed chunk list 71 + * @chunk: chunk in @pcl 72 + * 73 + * Returns the next chunk in the list, or NULL if @chunk is already last. 
74 + */ 75 + static inline struct svc_rdma_chunk * 76 + pcl_next_chunk(const struct svc_rdma_pcl *pcl, struct svc_rdma_chunk *chunk) 77 + { 78 + if (list_is_last(&chunk->ch_list, &pcl->cl_chunks)) 79 + return NULL; 80 + return list_next_entry(chunk, ch_list); 81 + } 82 + 83 + /** 84 + * pcl_for_each_chunk - Iterate over chunks in a parsed chunk list 85 + * @pos: the loop cursor 86 + * @pcl: a parsed chunk list 87 + */ 88 + #define pcl_for_each_chunk(pos, pcl) \ 89 + for (pos = list_first_entry(&(pcl)->cl_chunks, struct svc_rdma_chunk, ch_list); \ 90 + &pos->ch_list != &(pcl)->cl_chunks; \ 91 + pos = list_next_entry(pos, ch_list)) 92 + 93 + /** 94 + * pcl_for_each_segment - Iterate over segments in a parsed chunk 95 + * @pos: the loop cursor 96 + * @chunk: a parsed chunk 97 + */ 98 + #define pcl_for_each_segment(pos, chunk) \ 99 + for (pos = &(chunk)->ch_segments[0]; \ 100 + pos <= &(chunk)->ch_segments[(chunk)->ch_segcount - 1]; \ 101 + pos++) 102 + 103 + /** 104 + * pcl_chunk_end_offset - Return offset of byte range following @chunk 105 + * @chunk: chunk in @pcl 106 + * 107 + * Returns starting offset of the region just after @chunk 108 + */ 109 + static inline unsigned int 110 + pcl_chunk_end_offset(const struct svc_rdma_chunk *chunk) 111 + { 112 + return xdr_align_size(chunk->ch_position + chunk->ch_payload_length); 113 + } 114 + 115 + struct svc_rdma_recv_ctxt; 116 + 117 + extern void pcl_free(struct svc_rdma_pcl *pcl); 118 + extern bool pcl_alloc_call(struct svc_rdma_recv_ctxt *rctxt, __be32 *p); 119 + extern bool pcl_alloc_read(struct svc_rdma_recv_ctxt *rctxt, __be32 *p); 120 + extern bool pcl_alloc_write(struct svc_rdma_recv_ctxt *rctxt, 121 + struct svc_rdma_pcl *pcl, __be32 *p); 122 + extern int pcl_process_nonpayloads(const struct svc_rdma_pcl *pcl, 123 + const struct xdr_buf *xdr, 124 + int (*actor)(const struct xdr_buf *, 125 + void *), 126 + void *data); 127 + 128 + #endif /* SVC_RDMA_PCL_H */
+73 -2
include/trace/events/rpcrdma.h
··· 1446 1446 ), \ 1447 1447 TP_ARGS(handle, length, offset)) 1448 1448 1449 - DEFINE_SEGMENT_EVENT(decode_wseg); 1450 - DEFINE_SEGMENT_EVENT(encode_rseg); 1451 1449 DEFINE_SEGMENT_EVENT(send_rseg); 1452 1450 DEFINE_SEGMENT_EVENT(encode_wseg); 1453 1451 DEFINE_SEGMENT_EVENT(send_wseg); 1452 + 1453 + TRACE_EVENT(svcrdma_decode_rseg, 1454 + TP_PROTO( 1455 + const struct rpc_rdma_cid *cid, 1456 + const struct svc_rdma_chunk *chunk, 1457 + const struct svc_rdma_segment *segment 1458 + ), 1459 + 1460 + TP_ARGS(cid, chunk, segment), 1461 + 1462 + TP_STRUCT__entry( 1463 + __field(u32, cq_id) 1464 + __field(int, completion_id) 1465 + __field(u32, segno) 1466 + __field(u32, position) 1467 + __field(u32, handle) 1468 + __field(u32, length) 1469 + __field(u64, offset) 1470 + ), 1471 + 1472 + TP_fast_assign( 1473 + __entry->cq_id = cid->ci_queue_id; 1474 + __entry->completion_id = cid->ci_completion_id; 1475 + __entry->segno = chunk->ch_segcount; 1476 + __entry->position = chunk->ch_position; 1477 + __entry->handle = segment->rs_handle; 1478 + __entry->length = segment->rs_length; 1479 + __entry->offset = segment->rs_offset; 1480 + ), 1481 + 1482 + TP_printk("cq_id=%u cid=%d segno=%u position=%u %u@0x%016llx:0x%08x", 1483 + __entry->cq_id, __entry->completion_id, 1484 + __entry->segno, __entry->position, __entry->length, 1485 + (unsigned long long)__entry->offset, __entry->handle 1486 + ) 1487 + ); 1488 + 1489 + TRACE_EVENT(svcrdma_decode_wseg, 1490 + TP_PROTO( 1491 + const struct rpc_rdma_cid *cid, 1492 + const struct svc_rdma_chunk *chunk, 1493 + u32 segno 1494 + ), 1495 + 1496 + TP_ARGS(cid, chunk, segno), 1497 + 1498 + TP_STRUCT__entry( 1499 + __field(u32, cq_id) 1500 + __field(int, completion_id) 1501 + __field(u32, segno) 1502 + __field(u32, handle) 1503 + __field(u32, length) 1504 + __field(u64, offset) 1505 + ), 1506 + 1507 + TP_fast_assign( 1508 + const struct svc_rdma_segment *segment = 1509 + &chunk->ch_segments[segno]; 1510 + 1511 + __entry->cq_id = 
cid->ci_queue_id; 1512 + __entry->completion_id = cid->ci_completion_id; 1513 + __entry->segno = segno; 1514 + __entry->handle = segment->rs_handle; 1515 + __entry->length = segment->rs_length; 1516 + __entry->offset = segment->rs_offset; 1517 + ), 1518 + 1519 + TP_printk("cq_id=%u cid=%d segno=%u %u@0x%016llx:0x%08x", 1520 + __entry->cq_id, __entry->completion_id, 1521 + __entry->segno, __entry->length, 1522 + (unsigned long long)__entry->offset, __entry->handle 1523 + ) 1524 + ); 1454 1525 1455 1526 DECLARE_EVENT_CLASS(svcrdma_chunk_event, 1456 1527 TP_PROTO(
+1 -1
net/sunrpc/xprtrdma/Makefile
··· 4 4 rpcrdma-y := transport.o rpc_rdma.o verbs.o frwr_ops.o \ 5 5 svc_rdma.o svc_rdma_backchannel.o svc_rdma_transport.o \ 6 6 svc_rdma_sendto.o svc_rdma_recvfrom.o svc_rdma_rw.o \ 7 - module.o 7 + svc_rdma_pcl.o module.o 8 8 rpcrdma-$(CONFIG_SUNRPC_BACKCHANNEL) += backchannel.o
+306
net/sunrpc/xprtrdma/svc_rdma_pcl.c
··· 1 + // SPDX-License-Identifier: GPL-2.0 2 + /* 3 + * Copyright (c) 2020 Oracle. All rights reserved. 4 + */ 5 + 6 + #include <linux/sunrpc/svc_rdma.h> 7 + #include <linux/sunrpc/rpc_rdma.h> 8 + 9 + #include "xprt_rdma.h" 10 + #include <trace/events/rpcrdma.h> 11 + 12 + /** 13 + * pcl_free - Release all memory associated with a parsed chunk list 14 + * @pcl: parsed chunk list 15 + * 16 + */ 17 + void pcl_free(struct svc_rdma_pcl *pcl) 18 + { 19 + while (!list_empty(&pcl->cl_chunks)) { 20 + struct svc_rdma_chunk *chunk; 21 + 22 + chunk = pcl_first_chunk(pcl); 23 + list_del(&chunk->ch_list); 24 + kfree(chunk); 25 + } 26 + } 27 + 28 + static struct svc_rdma_chunk *pcl_alloc_chunk(u32 segcount, u32 position) 29 + { 30 + struct svc_rdma_chunk *chunk; 31 + 32 + chunk = kmalloc(struct_size(chunk, ch_segments, segcount), GFP_KERNEL); 33 + if (!chunk) 34 + return NULL; 35 + 36 + chunk->ch_position = position; 37 + chunk->ch_length = 0; 38 + chunk->ch_payload_length = 0; 39 + chunk->ch_segcount = 0; 40 + return chunk; 41 + } 42 + 43 + static struct svc_rdma_chunk * 44 + pcl_lookup_position(struct svc_rdma_pcl *pcl, u32 position) 45 + { 46 + struct svc_rdma_chunk *pos; 47 + 48 + pcl_for_each_chunk(pos, pcl) { 49 + if (pos->ch_position == position) 50 + return pos; 51 + } 52 + return NULL; 53 + } 54 + 55 + static void pcl_insert_position(struct svc_rdma_pcl *pcl, 56 + struct svc_rdma_chunk *chunk) 57 + { 58 + struct svc_rdma_chunk *pos; 59 + 60 + pcl_for_each_chunk(pos, pcl) { 61 + if (pos->ch_position > chunk->ch_position) 62 + break; 63 + } 64 + __list_add(&chunk->ch_list, pos->ch_list.prev, &pos->ch_list); 65 + pcl->cl_count++; 66 + } 67 + 68 + static void pcl_set_read_segment(const struct svc_rdma_recv_ctxt *rctxt, 69 + struct svc_rdma_chunk *chunk, 70 + u32 handle, u32 length, u64 offset) 71 + { 72 + struct svc_rdma_segment *segment; 73 + 74 + segment = &chunk->ch_segments[chunk->ch_segcount]; 75 + segment->rs_handle = handle; 76 + segment->rs_length = length; 77 + 
segment->rs_offset = offset; 78 + 79 + trace_svcrdma_decode_rseg(&rctxt->rc_cid, chunk, segment); 80 + 81 + chunk->ch_length += length; 82 + chunk->ch_segcount++; 83 + } 84 + 85 + /** 86 + * pcl_alloc_call - Construct a parsed chunk list for the Call body 87 + * @rctxt: Ingress receive context 88 + * @p: Start of an un-decoded Read list 89 + * 90 + * Assumptions: 91 + * - The incoming Read list has already been sanity checked. 92 + * - cl_count is already set to the number of segments in 93 + * the un-decoded list. 94 + * - The list might not be in order by position. 95 + * 96 + * Return values: 97 + * %true: Parsed chunk list was successfully constructed, and 98 + * cl_count is updated to be the number of chunks (ie. 99 + * unique positions) in the Read list. 100 + * %false: Memory allocation failed. 101 + */ 102 + bool pcl_alloc_call(struct svc_rdma_recv_ctxt *rctxt, __be32 *p) 103 + { 104 + struct svc_rdma_pcl *pcl = &rctxt->rc_call_pcl; 105 + unsigned int i, segcount = pcl->cl_count; 106 + 107 + pcl->cl_count = 0; 108 + for (i = 0; i < segcount; i++) { 109 + struct svc_rdma_chunk *chunk; 110 + u32 position, handle, length; 111 + u64 offset; 112 + 113 + p++; /* skip the list discriminator */ 114 + p = xdr_decode_read_segment(p, &position, &handle, 115 + &length, &offset); 116 + if (position != 0) 117 + continue; 118 + 119 + if (pcl_is_empty(pcl)) { 120 + chunk = pcl_alloc_chunk(segcount, position); 121 + if (!chunk) 122 + return false; 123 + pcl_insert_position(pcl, chunk); 124 + } else { 125 + chunk = list_first_entry(&pcl->cl_chunks, 126 + struct svc_rdma_chunk, 127 + ch_list); 128 + } 129 + 130 + pcl_set_read_segment(rctxt, chunk, handle, length, offset); 131 + } 132 + 133 + return true; 134 + } 135 + 136 + /** 137 + * pcl_alloc_read - Construct a parsed chunk list for normal Read chunks 138 + * @rctxt: Ingress receive context 139 + * @p: Start of an un-decoded Read list 140 + * 141 + * Assumptions: 142 + * - The incoming Read list has already been sanity 
checked. 143 + * - cl_count is already set to the number of segments in 144 + * the un-decoded list. 145 + * - The list might not be in order by position. 146 + * 147 + * Return values: 148 + * %true: Parsed chunk list was successfully constructed, and 149 + * cl_count is updated to be the number of chunks (ie. 150 + * unique position values) in the Read list. 151 + * %false: Memory allocation failed. 152 + * 153 + * TODO: 154 + * - Check for chunk range overlaps 155 + */ 156 + bool pcl_alloc_read(struct svc_rdma_recv_ctxt *rctxt, __be32 *p) 157 + { 158 + struct svc_rdma_pcl *pcl = &rctxt->rc_read_pcl; 159 + unsigned int i, segcount = pcl->cl_count; 160 + 161 + pcl->cl_count = 0; 162 + for (i = 0; i < segcount; i++) { 163 + struct svc_rdma_chunk *chunk; 164 + u32 position, handle, length; 165 + u64 offset; 166 + 167 + p++; /* skip the list discriminator */ 168 + p = xdr_decode_read_segment(p, &position, &handle, 169 + &length, &offset); 170 + if (position == 0) 171 + continue; 172 + 173 + chunk = pcl_lookup_position(pcl, position); 174 + if (!chunk) { 175 + chunk = pcl_alloc_chunk(segcount, position); 176 + if (!chunk) 177 + return false; 178 + pcl_insert_position(pcl, chunk); 179 + } 180 + 181 + pcl_set_read_segment(rctxt, chunk, handle, length, offset); 182 + } 183 + 184 + return true; 185 + } 186 + 187 + /** 188 + * pcl_alloc_write - Construct a parsed chunk list from a Write list 189 + * @rctxt: Ingress receive context 190 + * @pcl: Parsed chunk list to populate 191 + * @p: Start of an un-decoded Write list 192 + * 193 + * Assumptions: 194 + * - The incoming Write list has already been sanity checked, and 195 + * - cl_count is set to the number of chunks in the un-decoded list. 196 + * 197 + * Return values: 198 + * %true: Parsed chunk list was successfully constructed. 199 + * %false: Memory allocation failed. 
200 + */ 201 + bool pcl_alloc_write(struct svc_rdma_recv_ctxt *rctxt, 202 + struct svc_rdma_pcl *pcl, __be32 *p) 203 + { 204 + struct svc_rdma_segment *segment; 205 + struct svc_rdma_chunk *chunk; 206 + unsigned int i, j; 207 + u32 segcount; 208 + 209 + for (i = 0; i < pcl->cl_count; i++) { 210 + p++; /* skip the list discriminator */ 211 + segcount = be32_to_cpup(p++); 212 + 213 + chunk = pcl_alloc_chunk(segcount, 0); 214 + if (!chunk) 215 + return false; 216 + list_add_tail(&chunk->ch_list, &pcl->cl_chunks); 217 + 218 + for (j = 0; j < segcount; j++) { 219 + segment = &chunk->ch_segments[j]; 220 + p = xdr_decode_rdma_segment(p, &segment->rs_handle, 221 + &segment->rs_length, 222 + &segment->rs_offset); 223 + trace_svcrdma_decode_wseg(&rctxt->rc_cid, chunk, j); 224 + 225 + chunk->ch_length += segment->rs_length; 226 + chunk->ch_segcount++; 227 + } 228 + } 229 + return true; 230 + } 231 + 232 + static int pcl_process_region(const struct xdr_buf *xdr, 233 + unsigned int offset, unsigned int length, 234 + int (*actor)(const struct xdr_buf *, void *), 235 + void *data) 236 + { 237 + struct xdr_buf subbuf; 238 + 239 + if (!length) 240 + return 0; 241 + if (xdr_buf_subsegment(xdr, &subbuf, offset, length)) 242 + return -EMSGSIZE; 243 + return actor(&subbuf, data); 244 + } 245 + 246 + /** 247 + * pcl_process_nonpayloads - Process non-payload regions inside @xdr 248 + * @pcl: Chunk list to process 249 + * @xdr: xdr_buf to process 250 + * @actor: Function to invoke on each non-payload region 251 + * @data: Arguments for @actor 252 + * 253 + * This mechanism must ignore not only result payloads that were already 254 + * sent via RDMA Write, but also XDR padding for those payloads that 255 + * the upper layer has added. 256 + * 257 + * Assumptions: 258 + * The xdr->len and ch_position fields are aligned to 4-byte multiples. 
259 + * 260 + * Returns: 261 + * On success, zero, 262 + * %-EMSGSIZE on XDR buffer overflow, or 263 + * The return value of @actor 264 + */ 265 + int pcl_process_nonpayloads(const struct svc_rdma_pcl *pcl, 266 + const struct xdr_buf *xdr, 267 + int (*actor)(const struct xdr_buf *, void *), 268 + void *data) 269 + { 270 + struct svc_rdma_chunk *chunk, *next; 271 + unsigned int start; 272 + int ret; 273 + 274 + chunk = pcl_first_chunk(pcl); 275 + 276 + /* No result payloads were generated */ 277 + if (!chunk || !chunk->ch_payload_length) 278 + return actor(xdr, data); 279 + 280 + /* Process the region before the first result payload */ 281 + ret = pcl_process_region(xdr, 0, chunk->ch_position, actor, data); 282 + if (ret < 0) 283 + return ret; 284 + 285 + /* Process the regions between each middle result payload */ 286 + while ((next = pcl_next_chunk(pcl, chunk))) { 287 + if (!next->ch_payload_length) 288 + break; 289 + 290 + start = pcl_chunk_end_offset(chunk); 291 + ret = pcl_process_region(xdr, start, next->ch_position - start, 292 + actor, data); 293 + if (ret < 0) 294 + return ret; 295 + 296 + chunk = next; 297 + } 298 + 299 + /* Process the region after the last result payload */ 300 + start = pcl_chunk_end_offset(chunk); 301 + ret = pcl_process_region(xdr, start, xdr->len - start, actor, data); 302 + if (ret < 0) 303 + return ret; 304 + 305 + return 0; 306 + }
+115 -81
net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
··· 93 93 * (see rdma_read_complete() below). 94 94 */ 95 95 96 + #include <linux/slab.h> 96 97 #include <linux/spinlock.h> 97 98 #include <asm/unaligned.h> 98 99 #include <rdma/ib_verbs.h> ··· 144 143 goto fail2; 145 144 146 145 svc_rdma_recv_cid_init(rdma, &ctxt->rc_cid); 146 + pcl_init(&ctxt->rc_call_pcl); 147 + pcl_init(&ctxt->rc_read_pcl); 148 + pcl_init(&ctxt->rc_write_pcl); 149 + pcl_init(&ctxt->rc_reply_pcl); 147 150 148 151 ctxt->rc_recv_wr.next = NULL; 149 152 ctxt->rc_recv_wr.wr_cqe = &ctxt->rc_cqe; ··· 230 225 231 226 for (i = 0; i < ctxt->rc_page_count; i++) 232 227 put_page(ctxt->rc_pages[i]); 228 + 229 + pcl_free(&ctxt->rc_call_pcl); 230 + pcl_free(&ctxt->rc_read_pcl); 231 + pcl_free(&ctxt->rc_write_pcl); 232 + pcl_free(&ctxt->rc_reply_pcl); 233 233 234 234 if (!ctxt->rc_temp) 235 235 llist_add(&ctxt->rc_node, &rdma->sc_recv_ctxts); ··· 395 385 arg->len = ctxt->rc_byte_len; 396 386 } 397 387 398 - /* This accommodates the largest possible Write chunk. 399 - */ 400 - #define MAX_BYTES_WRITE_CHUNK ((u32)(RPCSVC_MAXPAGES << PAGE_SHIFT)) 401 - 402 - /* This accommodates the largest possible Position-Zero 403 - * Read chunk or Reply chunk. 404 - */ 405 - #define MAX_BYTES_SPECIAL_CHUNK ((u32)((RPCSVC_MAXPAGES + 2) << PAGE_SHIFT)) 406 - 407 - /* Sanity check the Read list. 388 + /** 389 + * xdr_count_read_segments - Count number of Read segments in Read list 390 + * @rctxt: Ingress receive context 391 + * @p: Start of an un-decoded Read list 408 392 * 409 - * Implementation limits: 410 - * - This implementation supports only one Read chunk. 393 + * Before allocating anything, ensure the ingress Read list is safe 394 + * to use. 411 395 * 412 - * Sanity checks: 413 - * - Read list does not overflow Receive buffer. 414 - * - Segment size limited by largest NFS data payload. 415 - * 416 - * The segment count is limited to how many segments can 417 - * fit in the transport header without overflowing the 418 - * buffer. 
That's about 40 Read segments for a 1KB inline 419 - * threshold. 396 + * The segment count is limited to how many segments can fit in the 397 + * transport header without overflowing the buffer. That's about 40 398 + * Read segments for a 1KB inline threshold. 420 399 * 421 400 * Return values: 422 - * %true: Read list is valid. @rctxt's xdr_stream is updated 423 - * to point to the first byte past the Read list. 424 - * %false: Read list is corrupt. @rctxt's xdr_stream is left 425 - * in an unknown state. 401 + * %true: Read list is valid. @rctxt's xdr_stream is updated to point 402 + * to the first byte past the Read list. rc_read_pcl and 403 + * rc_call_pcl cl_count fields are set to the number of 404 + * Read segments in the list. 405 + * %false: Read list is corrupt. @rctxt's xdr_stream is left in an 406 + * unknown state. 426 407 */ 427 - static bool xdr_check_read_list(struct svc_rdma_recv_ctxt *rctxt) 408 + static bool xdr_count_read_segments(struct svc_rdma_recv_ctxt *rctxt, __be32 *p) 428 409 { 429 - u32 position, len; 430 - bool first; 431 - __be32 *p; 432 - 433 - p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p)); 434 - if (!p) 435 - return false; 436 - 437 - len = 0; 438 - first = true; 410 + rctxt->rc_call_pcl.cl_count = 0; 411 + rctxt->rc_read_pcl.cl_count = 0; 439 412 while (xdr_item_is_present(p)) { 413 + u32 position, handle, length; 414 + u64 offset; 415 + 440 416 p = xdr_inline_decode(&rctxt->rc_stream, 441 417 rpcrdma_readseg_maxsz * sizeof(*p)); 442 418 if (!p) 443 419 return false; 444 420 445 - if (first) { 446 - position = be32_to_cpup(p); 447 - first = false; 448 - } else if (be32_to_cpup(p) != position) { 449 - return false; 421 + xdr_decode_read_segment(p, &position, &handle, 422 + &length, &offset); 423 + if (position) { 424 + if (position & 3) 425 + return false; 426 + ++rctxt->rc_read_pcl.cl_count; 427 + } else { 428 + ++rctxt->rc_call_pcl.cl_count; 450 429 } 451 - p += 2; 452 - len += be32_to_cpup(p); 453 430 454 431 p = 
xdr_inline_decode(&rctxt->rc_stream, sizeof(*p)); 455 432 if (!p) 456 433 return false; 457 434 } 458 - return len <= MAX_BYTES_SPECIAL_CHUNK; 435 + return true; 459 436 } 460 437 461 - /* The segment count is limited to how many segments can 462 - * fit in the transport header without overflowing the 463 - * buffer. That's about 60 Write segments for a 1KB inline 464 - * threshold. 438 + /* Sanity check the Read list. 439 + * 440 + * Sanity checks: 441 + * - Read list does not overflow Receive buffer. 442 + * - Chunk size limited by largest NFS data payload. 443 + * 444 + * Return values: 445 + * %true: Read list is valid. @rctxt's xdr_stream is updated 446 + * to point to the first byte past the Read list. 447 + * %false: Read list is corrupt. @rctxt's xdr_stream is left 448 + * in an unknown state. 465 449 */ 466 - static bool xdr_check_write_chunk(struct svc_rdma_recv_ctxt *rctxt, u32 maxlen) 450 + static bool xdr_check_read_list(struct svc_rdma_recv_ctxt *rctxt) 467 451 { 468 - u32 i, segcount, total; 469 452 __be32 *p; 470 453 471 454 p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p)); 472 455 if (!p) 473 456 return false; 474 - segcount = be32_to_cpup(p); 457 + if (!xdr_count_read_segments(rctxt, p)) 458 + return false; 459 + if (!pcl_alloc_call(rctxt, p)) 460 + return false; 461 + return pcl_alloc_read(rctxt, p); 462 + } 475 463 476 - total = 0; 477 - for (i = 0; i < segcount; i++) { 478 - u32 handle, length; 479 - u64 offset; 464 + static bool xdr_check_write_chunk(struct svc_rdma_recv_ctxt *rctxt) 465 + { 466 + u32 segcount; 467 + __be32 *p; 480 468 481 - p = xdr_inline_decode(&rctxt->rc_stream, 482 - rpcrdma_segment_maxsz * sizeof(*p)); 469 + if (xdr_stream_decode_u32(&rctxt->rc_stream, &segcount)) 470 + return false; 471 + 472 + /* A bogus segcount causes this buffer overflow check to fail. 
*/ 473 + p = xdr_inline_decode(&rctxt->rc_stream, 474 + segcount * rpcrdma_segment_maxsz * sizeof(*p)); 475 + return p != NULL; 476 + } 477 + 478 + /** 479 + * xdr_count_write_chunks - Count number of Write chunks in Write list 480 + * @rctxt: Received header and decoding state 481 + * @p: start of an un-decoded Write list 482 + * 483 + * Before allocating anything, ensure the ingress Write list is 484 + * safe to use. 485 + * 486 + * Return values: 487 + * %true: Write list is valid. @rctxt's xdr_stream is updated 488 + * to point to the first byte past the Write list, and 489 + * the number of Write chunks is in rc_write_pcl.cl_count. 490 + * %false: Write list is corrupt. @rctxt's xdr_stream is left 491 + * in an indeterminate state. 492 + */ 493 + static bool xdr_count_write_chunks(struct svc_rdma_recv_ctxt *rctxt, __be32 *p) 494 + { 495 + rctxt->rc_write_pcl.cl_count = 0; 496 + while (xdr_item_is_present(p)) { 497 + if (!xdr_check_write_chunk(rctxt)) 498 + return false; 499 + ++rctxt->rc_write_pcl.cl_count; 500 + p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p)); 483 501 if (!p) 484 502 return false; 485 - 486 - xdr_decode_rdma_segment(p, &handle, &length, &offset); 487 - trace_svcrdma_decode_wseg(handle, length, offset); 488 - 489 - total += length; 490 503 } 491 - return total <= maxlen; 504 + return true; 492 505 } 493 506 494 507 /* Sanity check the Write list. 
··· 531 498 */ 532 499 static bool xdr_check_write_list(struct svc_rdma_recv_ctxt *rctxt) 533 500 { 534 - u32 chcount = 0; 535 501 __be32 *p; 536 502 537 503 p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p)); 538 504 if (!p) 539 505 return false; 540 - rctxt->rc_write_list = p; 541 - while (xdr_item_is_present(p)) { 542 - if (!xdr_check_write_chunk(rctxt, MAX_BYTES_WRITE_CHUNK)) 543 - return false; 544 - ++chcount; 545 - p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p)); 546 - if (!p) 547 - return false; 548 - } 549 - if (!chcount) 550 - rctxt->rc_write_list = NULL; 551 - return chcount < 2; 506 + 507 + rctxt->rc_write_list = NULL; 508 + if (!xdr_count_write_chunks(rctxt, p)) 509 + return false; 510 + if (!pcl_alloc_write(rctxt, &rctxt->rc_write_pcl, p)) 511 + return false; 512 + 513 + if (!pcl_is_empty(&rctxt->rc_write_pcl)) 514 + rctxt->rc_write_list = p; 515 + rctxt->rc_cur_result_payload = pcl_first_chunk(&rctxt->rc_write_pcl); 516 + return rctxt->rc_write_pcl.cl_count < 2; 552 517 } 553 518 554 519 /* Sanity check the Reply chunk. ··· 568 537 p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p)); 569 538 if (!p) 570 539 return false; 540 + 571 541 rctxt->rc_reply_chunk = NULL; 572 - if (xdr_item_is_present(p)) { 573 - if (!xdr_check_write_chunk(rctxt, MAX_BYTES_SPECIAL_CHUNK)) 574 - return false; 575 - rctxt->rc_reply_chunk = p; 576 - } 577 - return true; 542 + if (!xdr_item_is_present(p)) 543 + return true; 544 + if (!xdr_check_write_chunk(rctxt)) 545 + return false; 546 + 547 + rctxt->rc_reply_chunk = p; 548 + rctxt->rc_reply_pcl.cl_count = 1; 549 + return pcl_alloc_write(rctxt, &rctxt->rc_reply_pcl, p); 578 550 } 579 551 580 552 /* RPC-over-RDMA Version One private extension: Remote Invalidation.