Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux

netfs: Abstract out a rolling folio buffer implementation

A rolling buffer is a series of folios held in a list of folio_queues. New
folios and folio_queue structs may be inserted at the head simultaneously
with spent ones being removed from the tail without the need for locking.

The rolling buffer includes an iov_iter and it has to be careful managing
this as the list of folio_queues is extended such that an oops isn't
incurred because the iterator was pointing to the end of a folio_queue
segment that got appended to and then removed.

We need to use the mechanism twice, once for read and once for write, and,
in future patches, we will use a second rolling buffer to handle bounce
buffering for content encryption.

Signed-off-by: David Howells <dhowells@redhat.com>
Link: https://lore.kernel.org/r/20241216204124.3752367-6-dhowells@redhat.com
cc: Jeff Layton <jlayton@kernel.org>
cc: netfs@lists.linux.dev
cc: linux-fsdevel@vger.kernel.org
Signed-off-by: Christian Brauner <brauner@kernel.org>

authored by

David Howells and committed by
Christian Brauner
06fa229c aabcabf2

+374 -299
+1
fs/netfs/Makefile
··· 13 13 read_collect.o \ 14 14 read_pgpriv2.o \ 15 15 read_retry.o \ 16 + rolling_buffer.o \ 16 17 write_collect.o \ 17 18 write_issue.o 18 19
+22 -95
fs/netfs/buffered_read.c
··· 64 64 } 65 65 66 66 /* 67 - * Decant the list of folios to read into a rolling buffer. 68 - */ 69 - static size_t netfs_load_buffer_from_ra(struct netfs_io_request *rreq, 70 - struct folio_queue *folioq, 71 - struct folio_batch *put_batch) 72 - { 73 - unsigned int order, nr; 74 - size_t size = 0; 75 - 76 - nr = __readahead_batch(rreq->ractl, (struct page **)folioq->vec.folios, 77 - ARRAY_SIZE(folioq->vec.folios)); 78 - folioq->vec.nr = nr; 79 - for (int i = 0; i < nr; i++) { 80 - struct folio *folio = folioq_folio(folioq, i); 81 - 82 - trace_netfs_folio(folio, netfs_folio_trace_read); 83 - order = folio_order(folio); 84 - folioq->orders[i] = order; 85 - size += PAGE_SIZE << order; 86 - 87 - if (!folio_batch_add(put_batch, folio)) 88 - folio_batch_release(put_batch); 89 - } 90 - 91 - for (int i = nr; i < folioq_nr_slots(folioq); i++) 92 - folioq_clear(folioq, i); 93 - 94 - return size; 95 - } 96 - 97 - /* 98 67 * netfs_prepare_read_iterator - Prepare the subreq iterator for I/O 99 68 * @subreq: The subrequest to be set up 100 69 * ··· 97 128 98 129 folio_batch_init(&put_batch); 99 130 while (rreq->submitted < subreq->start + rsize) { 100 - struct folio_queue *tail = rreq->buffer_tail, *new; 101 - size_t added; 131 + ssize_t added; 102 132 103 - new = netfs_folioq_alloc(rreq->debug_id, GFP_NOFS, 104 - netfs_trace_folioq_alloc_read_prep); 105 - if (!new) 106 - return -ENOMEM; 107 - new->prev = tail; 108 - tail->next = new; 109 - rreq->buffer_tail = new; 110 - added = netfs_load_buffer_from_ra(rreq, new, &put_batch); 111 - rreq->iter.count += added; 133 + added = rolling_buffer_load_from_ra(&rreq->buffer, rreq->ractl, 134 + &put_batch); 135 + if (added < 0) 136 + return added; 112 137 rreq->submitted += added; 113 138 } 114 139 folio_batch_release(&put_batch); ··· 110 147 111 148 subreq->len = rsize; 112 149 if (unlikely(rreq->io_streams[0].sreq_max_segs)) { 113 - size_t limit = netfs_limit_iter(&rreq->iter, 0, rsize, 150 + size_t limit = 
netfs_limit_iter(&rreq->buffer.iter, 0, rsize, 114 151 rreq->io_streams[0].sreq_max_segs); 115 152 116 153 if (limit < rsize) { ··· 119 156 } 120 157 } 121 158 122 - subreq->io_iter = rreq->iter; 159 + subreq->io_iter = rreq->buffer.iter; 123 160 124 161 if (iov_iter_is_folioq(&subreq->io_iter)) { 125 - if (subreq->io_iter.folioq_slot >= folioq_nr_slots(subreq->io_iter.folioq)) { 126 - subreq->io_iter.folioq = subreq->io_iter.folioq->next; 127 - subreq->io_iter.folioq_slot = 0; 128 - } 129 162 subreq->curr_folioq = (struct folio_queue *)subreq->io_iter.folioq; 130 163 subreq->curr_folioq_slot = subreq->io_iter.folioq_slot; 131 164 subreq->curr_folio_order = subreq->curr_folioq->orders[subreq->curr_folioq_slot]; 132 165 } 133 166 134 167 iov_iter_truncate(&subreq->io_iter, subreq->len); 135 - iov_iter_advance(&rreq->iter, subreq->len); 168 + rolling_buffer_advance(&rreq->buffer, subreq->len); 136 169 return subreq->len; 137 170 } 138 171 ··· 311 352 return ret; 312 353 } 313 354 314 - /* 315 - * Set up the initial folioq of buffer folios in the rolling buffer and set the 316 - * iterator to refer to it. 
317 - */ 318 - static int netfs_prime_buffer(struct netfs_io_request *rreq) 319 - { 320 - struct folio_queue *folioq; 321 - struct folio_batch put_batch; 322 - size_t added; 323 - 324 - folioq = netfs_folioq_alloc(rreq->debug_id, GFP_KERNEL, 325 - netfs_trace_folioq_alloc_read_prime); 326 - if (!folioq) 327 - return -ENOMEM; 328 - 329 - rreq->buffer = folioq; 330 - rreq->buffer_tail = folioq; 331 - rreq->submitted = rreq->start; 332 - iov_iter_folio_queue(&rreq->iter, ITER_DEST, folioq, 0, 0, 0); 333 - 334 - folio_batch_init(&put_batch); 335 - added = netfs_load_buffer_from_ra(rreq, folioq, &put_batch); 336 - folio_batch_release(&put_batch); 337 - rreq->iter.count += added; 338 - rreq->submitted += added; 339 - return 0; 340 - } 341 - 342 355 /** 343 356 * netfs_readahead - Helper to manage a read request 344 357 * @ractl: The description of the readahead request ··· 350 419 netfs_rreq_expand(rreq, ractl); 351 420 352 421 rreq->ractl = ractl; 353 - if (netfs_prime_buffer(rreq) < 0) 422 + rreq->submitted = rreq->start; 423 + if (rolling_buffer_init(&rreq->buffer, rreq->debug_id, ITER_DEST) < 0) 354 424 goto cleanup_free; 355 425 netfs_read_to_pagecache(rreq); 356 426 ··· 367 435 /* 368 436 * Create a rolling buffer with a single occupying folio. 
369 437 */ 370 - static int netfs_create_singular_buffer(struct netfs_io_request *rreq, struct folio *folio) 438 + static int netfs_create_singular_buffer(struct netfs_io_request *rreq, struct folio *folio, 439 + unsigned int rollbuf_flags) 371 440 { 372 - struct folio_queue *folioq; 441 + ssize_t added; 373 442 374 - folioq = netfs_folioq_alloc(rreq->debug_id, GFP_KERNEL, 375 - netfs_trace_folioq_alloc_read_sing); 376 - if (!folioq) 443 + if (rolling_buffer_init(&rreq->buffer, rreq->debug_id, ITER_DEST) < 0) 377 444 return -ENOMEM; 378 445 379 - folioq_append(folioq, folio); 380 - BUG_ON(folioq_folio(folioq, 0) != folio); 381 - BUG_ON(folioq_folio_order(folioq, 0) != folio_order(folio)); 382 - rreq->buffer = folioq; 383 - rreq->buffer_tail = folioq; 384 - rreq->submitted = rreq->start + rreq->len; 385 - iov_iter_folio_queue(&rreq->iter, ITER_DEST, folioq, 0, 0, rreq->len); 446 + added = rolling_buffer_append(&rreq->buffer, folio, rollbuf_flags); 447 + if (added < 0) 448 + return added; 449 + rreq->submitted = rreq->start + added; 386 450 rreq->ractl = (struct readahead_control *)1UL; 387 451 return 0; 388 452 } ··· 446 518 } 447 519 if (to < flen) 448 520 bvec_set_folio(&bvec[i++], folio, flen - to, to); 449 - iov_iter_bvec(&rreq->iter, ITER_DEST, bvec, i, rreq->len); 521 + iov_iter_bvec(&rreq->buffer.iter, ITER_DEST, bvec, i, rreq->len); 450 522 rreq->submitted = rreq->start + flen; 451 523 452 524 netfs_read_to_pagecache(rreq); ··· 514 586 trace_netfs_read(rreq, rreq->start, rreq->len, netfs_read_trace_readpage); 515 587 516 588 /* Set up the output buffer */ 517 - ret = netfs_create_singular_buffer(rreq, folio); 589 + ret = netfs_create_singular_buffer(rreq, folio, 0); 518 590 if (ret < 0) 519 591 goto discard; 520 592 ··· 671 743 trace_netfs_read(rreq, pos, len, netfs_read_trace_write_begin); 672 744 673 745 /* Set up the output buffer */ 674 - ret = netfs_create_singular_buffer(rreq, folio); 746 + ret = netfs_create_singular_buffer(rreq, folio, 0); 675 747 if 
(ret < 0) 676 748 goto error_put; 677 749 ··· 736 808 trace_netfs_read(rreq, start, flen, netfs_read_trace_prefetch_for_write); 737 809 738 810 /* Set up the output buffer */ 739 - ret = netfs_create_singular_buffer(rreq, folio); 811 + ret = netfs_create_singular_buffer(rreq, folio, NETFS_ROLLBUF_PAGECACHE_MARK); 740 812 if (ret < 0) 741 813 goto error_put; 742 814 743 - folioq_mark2(rreq->buffer, 0); 744 815 netfs_read_to_pagecache(rreq); 745 816 ret = netfs_wait_for_read(rreq); 746 817 netfs_put_request(rreq, false, netfs_rreq_trace_put_return);
+7 -7
fs/netfs/direct_read.c
··· 25 25 subreq->len = rsize; 26 26 27 27 if (unlikely(rreq->io_streams[0].sreq_max_segs)) { 28 - size_t limit = netfs_limit_iter(&rreq->iter, 0, rsize, 28 + size_t limit = netfs_limit_iter(&rreq->buffer.iter, 0, rsize, 29 29 rreq->io_streams[0].sreq_max_segs); 30 30 31 31 if (limit < rsize) { ··· 36 36 37 37 trace_netfs_sreq(subreq, netfs_sreq_trace_prepare); 38 38 39 - subreq->io_iter = rreq->iter; 39 + subreq->io_iter = rreq->buffer.iter; 40 40 iov_iter_truncate(&subreq->io_iter, subreq->len); 41 - iov_iter_advance(&rreq->iter, subreq->len); 41 + iov_iter_advance(&rreq->buffer.iter, subreq->len); 42 42 } 43 43 44 44 /* ··· 199 199 * the request. 200 200 */ 201 201 if (user_backed_iter(iter)) { 202 - ret = netfs_extract_user_iter(iter, rreq->len, &rreq->iter, 0); 202 + ret = netfs_extract_user_iter(iter, rreq->len, &rreq->buffer.iter, 0); 203 203 if (ret < 0) 204 204 goto out; 205 - rreq->direct_bv = (struct bio_vec *)rreq->iter.bvec; 205 + rreq->direct_bv = (struct bio_vec *)rreq->buffer.iter.bvec; 206 206 rreq->direct_bv_count = ret; 207 207 rreq->direct_bv_unpin = iov_iter_extract_will_pin(iter); 208 - rreq->len = iov_iter_count(&rreq->iter); 208 + rreq->len = iov_iter_count(&rreq->buffer.iter); 209 209 } else { 210 - rreq->iter = *iter; 210 + rreq->buffer.iter = *iter; 211 211 rreq->len = orig_count; 212 212 rreq->direct_bv_unpin = false; 213 213 iov_iter_advance(iter, orig_count);
+4 -6
fs/netfs/direct_write.c
··· 68 68 * request. 69 69 */ 70 70 if (async || user_backed_iter(iter)) { 71 - n = netfs_extract_user_iter(iter, len, &wreq->iter, 0); 71 + n = netfs_extract_user_iter(iter, len, &wreq->buffer.iter, 0); 72 72 if (n < 0) { 73 73 ret = n; 74 74 goto out; 75 75 } 76 - wreq->direct_bv = (struct bio_vec *)wreq->iter.bvec; 76 + wreq->direct_bv = (struct bio_vec *)wreq->buffer.iter.bvec; 77 77 wreq->direct_bv_count = n; 78 78 wreq->direct_bv_unpin = iov_iter_extract_will_pin(iter); 79 79 } else { 80 - wreq->iter = *iter; 80 + wreq->buffer.iter = *iter; 81 81 } 82 - 83 - wreq->io_iter = wreq->iter; 84 82 } 85 83 86 84 __set_bit(NETFS_RREQ_USE_IO_ITER, &wreq->flags); ··· 90 92 __set_bit(NETFS_RREQ_UPLOAD_TO_SERVER, &wreq->flags); 91 93 if (async) 92 94 wreq->iocb = iocb; 93 - wreq->len = iov_iter_count(&wreq->io_iter); 95 + wreq->len = iov_iter_count(&wreq->buffer.iter); 94 96 wreq->cleanup = netfs_cleanup_dio_write; 95 97 ret = netfs_unbuffered_write(wreq, is_sync_kiocb(iocb), wreq->len); 96 98 if (ret < 0) {
-4
fs/netfs/internal.h
··· 60 60 */ 61 61 struct folio_queue *netfs_buffer_make_space(struct netfs_io_request *rreq, 62 62 enum netfs_folioq_trace trace); 63 - int netfs_buffer_append_folio(struct netfs_io_request *rreq, struct folio *folio, 64 - bool needs_put); 65 - struct folio_queue *netfs_delete_buffer_head(struct netfs_io_request *wreq); 66 - void netfs_clear_buffer(struct netfs_io_request *rreq); 67 63 void netfs_reset_iter(struct netfs_io_subrequest *subreq); 68 64 69 65 /*
-147
fs/netfs/misc.c
··· 8 8 #include <linux/swap.h> 9 9 #include "internal.h" 10 10 11 - /** 12 - * netfs_folioq_alloc - Allocate a folio_queue struct 13 - * @rreq_id: Associated debugging ID for tracing purposes 14 - * @gfp: Allocation constraints 15 - * @trace: Trace tag to indicate the purpose of the allocation 16 - * 17 - * Allocate, initialise and account the folio_queue struct and log a trace line 18 - * to mark the allocation. 19 - */ 20 - struct folio_queue *netfs_folioq_alloc(unsigned int rreq_id, gfp_t gfp, 21 - unsigned int /*enum netfs_folioq_trace*/ trace) 22 - { 23 - static atomic_t debug_ids; 24 - struct folio_queue *fq; 25 - 26 - fq = kmalloc(sizeof(*fq), gfp); 27 - if (fq) { 28 - netfs_stat(&netfs_n_folioq); 29 - folioq_init(fq, rreq_id); 30 - fq->debug_id = atomic_inc_return(&debug_ids); 31 - trace_netfs_folioq(fq, trace); 32 - } 33 - return fq; 34 - } 35 - EXPORT_SYMBOL(netfs_folioq_alloc); 36 - 37 - /** 38 - * netfs_folioq_free - Free a folio_queue struct 39 - * @folioq: The object to free 40 - * @trace: Trace tag to indicate which free 41 - * 42 - * Free and unaccount the folio_queue struct. 43 - */ 44 - void netfs_folioq_free(struct folio_queue *folioq, 45 - unsigned int /*enum netfs_trace_folioq*/ trace) 46 - { 47 - trace_netfs_folioq(folioq, trace); 48 - netfs_stat_d(&netfs_n_folioq); 49 - kfree(folioq); 50 - } 51 - EXPORT_SYMBOL(netfs_folioq_free); 52 - 53 - /* 54 - * Make sure there's space in the rolling queue. 
55 - */ 56 - struct folio_queue *netfs_buffer_make_space(struct netfs_io_request *rreq, 57 - enum netfs_folioq_trace trace) 58 - { 59 - struct folio_queue *tail = rreq->buffer_tail, *prev; 60 - unsigned int prev_nr_slots = 0; 61 - 62 - if (WARN_ON_ONCE(!rreq->buffer && tail) || 63 - WARN_ON_ONCE(rreq->buffer && !tail)) 64 - return ERR_PTR(-EIO); 65 - 66 - prev = tail; 67 - if (prev) { 68 - if (!folioq_full(tail)) 69 - return tail; 70 - prev_nr_slots = folioq_nr_slots(tail); 71 - } 72 - 73 - tail = netfs_folioq_alloc(rreq->debug_id, GFP_NOFS, trace); 74 - if (!tail) 75 - return ERR_PTR(-ENOMEM); 76 - tail->prev = prev; 77 - if (prev) 78 - /* [!] NOTE: After we set prev->next, the consumer is entirely 79 - * at liberty to delete prev. 80 - */ 81 - WRITE_ONCE(prev->next, tail); 82 - 83 - rreq->buffer_tail = tail; 84 - if (!rreq->buffer) { 85 - rreq->buffer = tail; 86 - iov_iter_folio_queue(&rreq->io_iter, ITER_SOURCE, tail, 0, 0, 0); 87 - } else { 88 - /* Make sure we don't leave the master iterator pointing to a 89 - * block that might get immediately consumed. 90 - */ 91 - if (rreq->io_iter.folioq == prev && 92 - rreq->io_iter.folioq_slot == prev_nr_slots) { 93 - rreq->io_iter.folioq = tail; 94 - rreq->io_iter.folioq_slot = 0; 95 - } 96 - } 97 - rreq->buffer_tail_slot = 0; 98 - return tail; 99 - } 100 - 101 - /* 102 - * Append a folio to the rolling queue. 103 - */ 104 - int netfs_buffer_append_folio(struct netfs_io_request *rreq, struct folio *folio, 105 - bool needs_put) 106 - { 107 - struct folio_queue *tail; 108 - unsigned int slot, order = folio_order(folio); 109 - 110 - tail = netfs_buffer_make_space(rreq, netfs_trace_folioq_alloc_append_folio); 111 - if (IS_ERR(tail)) 112 - return PTR_ERR(tail); 113 - 114 - rreq->io_iter.count += PAGE_SIZE << order; 115 - 116 - slot = folioq_append(tail, folio); 117 - /* Store the counter after setting the slot. 
*/ 118 - smp_store_release(&rreq->buffer_tail_slot, slot); 119 - return 0; 120 - } 121 - 122 - /* 123 - * Delete the head of a rolling queue. 124 - */ 125 - struct folio_queue *netfs_delete_buffer_head(struct netfs_io_request *wreq) 126 - { 127 - struct folio_queue *head = wreq->buffer, *next = head->next; 128 - 129 - if (next) 130 - next->prev = NULL; 131 - netfs_folioq_free(head, netfs_trace_folioq_delete); 132 - wreq->buffer = next; 133 - return next; 134 - } 135 - 136 - /* 137 - * Clear out a rolling queue. 138 - */ 139 - void netfs_clear_buffer(struct netfs_io_request *rreq) 140 - { 141 - struct folio_queue *p; 142 - 143 - while ((p = rreq->buffer)) { 144 - rreq->buffer = p->next; 145 - for (int slot = 0; slot < folioq_count(p); slot++) { 146 - struct folio *folio = folioq_folio(p, slot); 147 - if (!folio) 148 - continue; 149 - if (folioq_is_marked(p, slot)) { 150 - trace_netfs_folio(folio, netfs_folio_trace_put); 151 - folio_put(folio); 152 - } 153 - } 154 - netfs_folioq_free(p, netfs_trace_folioq_clear); 155 - } 156 - } 157 - 158 11 /* 159 12 * Reset the subrequest iterator to refer just to the region remaining to be 160 13 * read. The iterator may or may not have been advanced by socket ops or
+1 -1
fs/netfs/objects.c
··· 143 143 } 144 144 kvfree(rreq->direct_bv); 145 145 } 146 - netfs_clear_buffer(rreq); 146 + rolling_buffer_clear(&rreq->buffer); 147 147 148 148 if (atomic_dec_and_test(&ictx->io_count)) 149 149 wake_up_var(&ictx->io_count);
+17 -15
fs/netfs/read_pgpriv2.c
··· 34 34 * [DEPRECATED] Cancel PG_private_2 on all marked folios in the event of an 35 35 * unrecoverable error. 36 36 */ 37 - static void netfs_pgpriv2_cancel(struct folio_queue *folioq) 37 + static void netfs_pgpriv2_cancel(struct rolling_buffer *buffer) 38 38 { 39 + struct folio_queue *folioq = buffer->tail; 39 40 struct folio *folio; 40 41 int slot; 41 42 ··· 95 94 trace_netfs_folio(folio, netfs_folio_trace_store_copy); 96 95 97 96 /* Attach the folio to the rolling buffer. */ 98 - if (netfs_buffer_append_folio(wreq, folio, false) < 0) 97 + if (rolling_buffer_append(&wreq->buffer, folio, 0) < 0) 99 98 return -ENOMEM; 100 99 101 100 cache->submit_extendable_to = fsize; ··· 110 109 do { 111 110 ssize_t part; 112 111 113 - wreq->io_iter.iov_offset = cache->submit_off; 112 + wreq->buffer.iter.iov_offset = cache->submit_off; 114 113 115 114 atomic64_set(&wreq->issued_to, fpos + cache->submit_off); 116 115 cache->submit_extendable_to = fsize - cache->submit_off; ··· 123 122 cache->submit_len -= part; 124 123 } while (cache->submit_len > 0); 125 124 126 - wreq->io_iter.iov_offset = 0; 127 - iov_iter_advance(&wreq->io_iter, fsize); 125 + wreq->buffer.iter.iov_offset = 0; 126 + rolling_buffer_advance(&wreq->buffer, fsize); 128 127 atomic64_set(&wreq->issued_to, fpos + fsize); 129 128 130 129 if (flen < fsize) ··· 152 151 goto couldnt_start; 153 152 154 153 /* Need the first folio to be able to set up the op. 
*/ 155 - for (folioq = rreq->buffer; folioq; folioq = folioq->next) { 154 + for (folioq = rreq->buffer.tail; folioq; folioq = folioq->next) { 156 155 if (folioq->marks3) { 157 156 slot = __ffs(folioq->marks3); 158 157 break; ··· 199 198 netfs_put_request(wreq, false, netfs_rreq_trace_put_return); 200 199 _leave(" = %d", error); 201 200 couldnt_start: 202 - netfs_pgpriv2_cancel(rreq->buffer); 201 + netfs_pgpriv2_cancel(&rreq->buffer); 203 202 } 204 203 205 204 /* ··· 208 207 */ 209 208 bool netfs_pgpriv2_unlock_copied_folios(struct netfs_io_request *wreq) 210 209 { 211 - struct folio_queue *folioq = wreq->buffer; 210 + struct folio_queue *folioq = wreq->buffer.tail; 212 211 unsigned long long collected_to = wreq->collected_to; 213 - unsigned int slot = wreq->buffer_head_slot; 212 + unsigned int slot = wreq->buffer.first_tail_slot; 214 213 bool made_progress = false; 215 214 216 215 if (slot >= folioq_nr_slots(folioq)) { 217 - folioq = netfs_delete_buffer_head(wreq); 216 + folioq = rolling_buffer_delete_spent(&wreq->buffer); 218 217 slot = 0; 219 218 } 220 219 ··· 253 252 folioq_clear(folioq, slot); 254 253 slot++; 255 254 if (slot >= folioq_nr_slots(folioq)) { 256 - if (READ_ONCE(wreq->buffer_tail) == folioq) 257 - break; 258 - folioq = netfs_delete_buffer_head(wreq); 255 + folioq = rolling_buffer_delete_spent(&wreq->buffer); 256 + if (!folioq) 257 + goto done; 259 258 slot = 0; 260 259 } 261 260 ··· 263 262 break; 264 263 } 265 264 266 - wreq->buffer = folioq; 267 - wreq->buffer_head_slot = slot; 265 + wreq->buffer.tail = folioq; 266 + done: 267 + wreq->buffer.first_tail_slot = slot; 268 268 return made_progress; 269 269 }
+1 -1
fs/netfs/read_retry.c
··· 245 245 { 246 246 struct folio_queue *p; 247 247 248 - for (p = rreq->buffer; p; p = p->next) { 248 + for (p = rreq->buffer.tail; p; p = p->next) { 249 249 for (int slot = 0; slot < folioq_count(p); slot++) { 250 250 struct folio *folio = folioq_folio(p, slot); 251 251
+226
fs/netfs/rolling_buffer.c
··· 1 + // SPDX-License-Identifier: GPL-2.0-or-later 2 + /* Rolling buffer helpers 3 + * 4 + * Copyright (C) 2024 Red Hat, Inc. All Rights Reserved. 5 + * Written by David Howells (dhowells@redhat.com) 6 + */ 7 + 8 + #include <linux/bitops.h> 9 + #include <linux/pagemap.h> 10 + #include <linux/rolling_buffer.h> 11 + #include <linux/slab.h> 12 + #include "internal.h" 13 + 14 + static atomic_t debug_ids; 15 + 16 + /** 17 + * netfs_folioq_alloc - Allocate a folio_queue struct 18 + * @rreq_id: Associated debugging ID for tracing purposes 19 + * @gfp: Allocation constraints 20 + * @trace: Trace tag to indicate the purpose of the allocation 21 + * 22 + * Allocate, initialise and account the folio_queue struct and log a trace line 23 + * to mark the allocation. 24 + */ 25 + struct folio_queue *netfs_folioq_alloc(unsigned int rreq_id, gfp_t gfp, 26 + unsigned int /*enum netfs_folioq_trace*/ trace) 27 + { 28 + struct folio_queue *fq; 29 + 30 + fq = kmalloc(sizeof(*fq), gfp); 31 + if (fq) { 32 + netfs_stat(&netfs_n_folioq); 33 + folioq_init(fq, rreq_id); 34 + fq->debug_id = atomic_inc_return(&debug_ids); 35 + trace_netfs_folioq(fq, trace); 36 + } 37 + return fq; 38 + } 39 + EXPORT_SYMBOL(netfs_folioq_alloc); 40 + 41 + /** 42 + * netfs_folioq_free - Free a folio_queue struct 43 + * @folioq: The object to free 44 + * @trace: Trace tag to indicate which free 45 + * 46 + * Free and unaccount the folio_queue struct. 47 + */ 48 + void netfs_folioq_free(struct folio_queue *folioq, 49 + unsigned int /*enum netfs_trace_folioq*/ trace) 50 + { 51 + trace_netfs_folioq(folioq, trace); 52 + netfs_stat_d(&netfs_n_folioq); 53 + kfree(folioq); 54 + } 55 + EXPORT_SYMBOL(netfs_folioq_free); 56 + 57 + /* 58 + * Initialise a rolling buffer. We allocate an empty folio queue struct to so 59 + * that the pointers can be independently driven by the producer and the 60 + * consumer. 
61 + */ 62 + int rolling_buffer_init(struct rolling_buffer *roll, unsigned int rreq_id, 63 + unsigned int direction) 64 + { 65 + struct folio_queue *fq; 66 + 67 + fq = netfs_folioq_alloc(rreq_id, GFP_NOFS, netfs_trace_folioq_rollbuf_init); 68 + if (!fq) 69 + return -ENOMEM; 70 + 71 + roll->head = fq; 72 + roll->tail = fq; 73 + iov_iter_folio_queue(&roll->iter, direction, fq, 0, 0, 0); 74 + return 0; 75 + } 76 + 77 + /* 78 + * Add another folio_queue to a rolling buffer if there's no space left. 79 + */ 80 + int rolling_buffer_make_space(struct rolling_buffer *roll) 81 + { 82 + struct folio_queue *fq, *head = roll->head; 83 + 84 + if (!folioq_full(head)) 85 + return 0; 86 + 87 + fq = netfs_folioq_alloc(head->rreq_id, GFP_NOFS, netfs_trace_folioq_make_space); 88 + if (!fq) 89 + return -ENOMEM; 90 + fq->prev = head; 91 + 92 + roll->head = fq; 93 + if (folioq_full(head)) { 94 + /* Make sure we don't leave the master iterator pointing to a 95 + * block that might get immediately consumed. 96 + */ 97 + if (roll->iter.folioq == head && 98 + roll->iter.folioq_slot == folioq_nr_slots(head)) { 99 + roll->iter.folioq = fq; 100 + roll->iter.folioq_slot = 0; 101 + } 102 + } 103 + 104 + /* Make sure the initialisation is stored before the next pointer. 105 + * 106 + * [!] NOTE: After we set head->next, the consumer is at liberty to 107 + * immediately delete the old head. 108 + */ 109 + smp_store_release(&head->next, fq); 110 + return 0; 111 + } 112 + 113 + /* 114 + * Decant the list of folios to read into a rolling buffer. 
115 + */ 116 + ssize_t rolling_buffer_load_from_ra(struct rolling_buffer *roll, 117 + struct readahead_control *ractl, 118 + struct folio_batch *put_batch) 119 + { 120 + struct folio_queue *fq; 121 + struct page **vec; 122 + int nr, ix, to; 123 + ssize_t size = 0; 124 + 125 + if (rolling_buffer_make_space(roll) < 0) 126 + return -ENOMEM; 127 + 128 + fq = roll->head; 129 + vec = (struct page **)fq->vec.folios; 130 + nr = __readahead_batch(ractl, vec + folio_batch_count(&fq->vec), 131 + folio_batch_space(&fq->vec)); 132 + ix = fq->vec.nr; 133 + to = ix + nr; 134 + fq->vec.nr = to; 135 + for (; ix < to; ix++) { 136 + struct folio *folio = folioq_folio(fq, ix); 137 + unsigned int order = folio_order(folio); 138 + 139 + fq->orders[ix] = order; 140 + size += PAGE_SIZE << order; 141 + trace_netfs_folio(folio, netfs_folio_trace_read); 142 + if (!folio_batch_add(put_batch, folio)) 143 + folio_batch_release(put_batch); 144 + } 145 + WRITE_ONCE(roll->iter.count, roll->iter.count + size); 146 + 147 + /* Store the counter after setting the slot. */ 148 + smp_store_release(&roll->next_head_slot, to); 149 + 150 + for (; ix < folioq_nr_slots(fq); ix++) 151 + folioq_clear(fq, ix); 152 + 153 + return size; 154 + } 155 + 156 + /* 157 + * Append a folio to the rolling buffer. 158 + */ 159 + ssize_t rolling_buffer_append(struct rolling_buffer *roll, struct folio *folio, 160 + unsigned int flags) 161 + { 162 + ssize_t size = folio_size(folio); 163 + int slot; 164 + 165 + if (rolling_buffer_make_space(roll) < 0) 166 + return -ENOMEM; 167 + 168 + slot = folioq_append(roll->head, folio); 169 + if (flags & ROLLBUF_MARK_1) 170 + folioq_mark(roll->head, slot); 171 + if (flags & ROLLBUF_MARK_2) 172 + folioq_mark2(roll->head, slot); 173 + 174 + WRITE_ONCE(roll->iter.count, roll->iter.count + size); 175 + 176 + /* Store the counter after setting the slot. 
*/ 177 + smp_store_release(&roll->next_head_slot, slot); 178 + return size; 179 + } 180 + 181 + /* 182 + * Delete a spent buffer from a rolling queue and return the next in line. We 183 + * don't return the last buffer to keep the pointers independent, but return 184 + * NULL instead. 185 + */ 186 + struct folio_queue *rolling_buffer_delete_spent(struct rolling_buffer *roll) 187 + { 188 + struct folio_queue *spent = roll->tail, *next = READ_ONCE(spent->next); 189 + 190 + if (!next) 191 + return NULL; 192 + next->prev = NULL; 193 + netfs_folioq_free(spent, netfs_trace_folioq_delete); 194 + roll->tail = next; 195 + return next; 196 + } 197 + 198 + /* 199 + * Clear out a rolling queue. Folios that have mark 1 set are put. 200 + */ 201 + void rolling_buffer_clear(struct rolling_buffer *roll) 202 + { 203 + struct folio_batch fbatch; 204 + struct folio_queue *p; 205 + 206 + folio_batch_init(&fbatch); 207 + 208 + while ((p = roll->tail)) { 209 + roll->tail = p->next; 210 + for (int slot = 0; slot < folioq_count(p); slot++) { 211 + struct folio *folio = folioq_folio(p, slot); 212 + 213 + if (!folio) 214 + continue; 215 + if (folioq_is_marked(p, slot)) { 216 + trace_netfs_folio(folio, netfs_folio_trace_put); 217 + if (!folio_batch_add(&fbatch, folio)) 218 + folio_batch_release(&fbatch); 219 + } 220 + } 221 + 222 + netfs_folioq_free(p, netfs_trace_folioq_clear); 223 + } 224 + 225 + folio_batch_release(&fbatch); 226 + }
+11 -8
fs/netfs/write_collect.c
··· 83 83 static void netfs_writeback_unlock_folios(struct netfs_io_request *wreq, 84 84 unsigned int *notes) 85 85 { 86 - struct folio_queue *folioq = wreq->buffer; 86 + struct folio_queue *folioq = wreq->buffer.tail; 87 87 unsigned long long collected_to = wreq->collected_to; 88 - unsigned int slot = wreq->buffer_head_slot; 88 + unsigned int slot = wreq->buffer.first_tail_slot; 89 89 90 90 if (wreq->origin == NETFS_PGPRIV2_COPY_TO_CACHE) { 91 91 if (netfs_pgpriv2_unlock_copied_folios(wreq)) ··· 94 94 } 95 95 96 96 if (slot >= folioq_nr_slots(folioq)) { 97 - folioq = netfs_delete_buffer_head(wreq); 97 + folioq = rolling_buffer_delete_spent(&wreq->buffer); 98 + if (!folioq) 99 + return; 98 100 slot = 0; 99 101 } 100 102 ··· 136 134 folioq_clear(folioq, slot); 137 135 slot++; 138 136 if (slot >= folioq_nr_slots(folioq)) { 139 - if (READ_ONCE(wreq->buffer_tail) == folioq) 140 - break; 141 - folioq = netfs_delete_buffer_head(wreq); 137 + folioq = rolling_buffer_delete_spent(&wreq->buffer); 138 + if (!folioq) 139 + goto done; 142 140 slot = 0; 143 141 } 144 142 ··· 146 144 break; 147 145 } 148 146 149 - wreq->buffer = folioq; 150 - wreq->buffer_head_slot = slot; 147 + wreq->buffer.tail = folioq; 148 + done: 149 + wreq->buffer.first_tail_slot = slot; 151 150 } 152 151 153 152 /*
+17 -9
fs/netfs/write_issue.c
··· 107 107 ictx = netfs_inode(wreq->inode); 108 108 if (is_buffered && netfs_is_cache_enabled(ictx)) 109 109 fscache_begin_write_operation(&wreq->cache_resources, netfs_i_cookie(ictx)); 110 + if (rolling_buffer_init(&wreq->buffer, wreq->debug_id, ITER_SOURCE) < 0) 111 + goto nomem; 110 112 111 113 wreq->cleaned_to = wreq->start; 112 114 ··· 131 129 } 132 130 133 131 return wreq; 132 + nomem: 133 + wreq->error = -ENOMEM; 134 + netfs_put_request(wreq, false, netfs_rreq_trace_put_failed); 135 + return ERR_PTR(-ENOMEM); 134 136 } 135 137 136 138 /** ··· 159 153 loff_t start) 160 154 { 161 155 struct netfs_io_subrequest *subreq; 162 - struct iov_iter *wreq_iter = &wreq->io_iter; 156 + struct iov_iter *wreq_iter = &wreq->buffer.iter; 163 157 164 158 /* Make sure we don't point the iterator at a used-up folio_queue 165 159 * struct being used as a placeholder to prevent the queue from 166 160 * collapsing. In such a case, extend the queue. 167 161 */ 168 162 if (iov_iter_is_folioq(wreq_iter) && 169 - wreq_iter->folioq_slot >= folioq_nr_slots(wreq_iter->folioq)) { 170 - netfs_buffer_make_space(wreq, netfs_trace_folioq_prep_write); 171 - } 163 + wreq_iter->folioq_slot >= folioq_nr_slots(wreq_iter->folioq)) 164 + rolling_buffer_make_space(&wreq->buffer); 172 165 173 166 subreq = netfs_alloc_subrequest(wreq); 174 167 subreq->source = stream->source; ··· 332 327 333 328 _enter(""); 334 329 330 + if (rolling_buffer_make_space(&wreq->buffer) < 0) 331 + return -ENOMEM; 332 + 335 333 /* netfs_perform_write() may shift i_size around the page or from out 336 334 * of the page to beyond it, but cannot move i_size into or through the 337 335 * page since we have it locked. ··· 439 431 } 440 432 441 433 /* Attach the folio to the rolling buffer. 
*/ 442 - netfs_buffer_append_folio(wreq, folio, false); 434 + rolling_buffer_append(&wreq->buffer, folio, 0); 443 435 444 436 /* Move the submission point forward to allow for write-streaming data 445 437 * not starting at the front of the page. We don't do write-streaming ··· 486 478 487 479 /* Advance the iterator(s). */ 488 480 if (stream->submit_off > iter_off) { 489 - iov_iter_advance(&wreq->io_iter, stream->submit_off - iter_off); 481 + rolling_buffer_advance(&wreq->buffer, stream->submit_off - iter_off); 490 482 iter_off = stream->submit_off; 491 483 } 492 484 ··· 504 496 } 505 497 506 498 if (fsize > iter_off) 507 - iov_iter_advance(&wreq->io_iter, fsize - iter_off); 499 + rolling_buffer_advance(&wreq->buffer, fsize - iter_off); 508 500 atomic64_set(&wreq->issued_to, fpos + fsize); 509 501 510 502 if (!debug) ··· 643 635 struct folio **writethrough_cache) 644 636 { 645 637 _enter("R=%x ic=%zu ws=%u cp=%zu tp=%u", 646 - wreq->debug_id, wreq->iter.count, wreq->wsize, copied, to_page_end); 638 + wreq->debug_id, wreq->buffer.iter.count, wreq->wsize, copied, to_page_end); 647 639 648 640 if (!*writethrough_cache) { 649 641 if (folio_test_dirty(folio)) ··· 718 710 part = netfs_advance_write(wreq, upload, start, len, false); 719 711 start += part; 720 712 len -= part; 721 - iov_iter_advance(&wreq->io_iter, part); 713 + rolling_buffer_advance(&wreq->buffer, part); 722 714 if (test_bit(NETFS_RREQ_PAUSE, &wreq->flags)) { 723 715 trace_netfs_rreq(wreq, netfs_rreq_trace_wait_pause); 724 716 wait_on_bit(&wreq->flags, NETFS_RREQ_PAUSE, TASK_UNINTERRUPTIBLE);
+4 -6
include/linux/netfs.h
··· 18 18 #include <linux/fs.h> 19 19 #include <linux/pagemap.h> 20 20 #include <linux/uio.h> 21 + #include <linux/rolling_buffer.h> 21 22 22 23 enum netfs_sreq_ref_trace; 23 24 typedef struct mempool_s mempool_t; ··· 239 238 struct netfs_io_stream io_streams[2]; /* Streams of parallel I/O operations */ 240 239 #define NR_IO_STREAMS 2 //wreq->nr_io_streams 241 240 struct netfs_group *group; /* Writeback group being written back */ 242 - struct folio_queue *buffer; /* Head of I/O buffer */ 243 - struct folio_queue *buffer_tail; /* Tail of I/O buffer */ 244 - struct iov_iter iter; /* Unencrypted-side iterator */ 245 - struct iov_iter io_iter; /* I/O (Encrypted-side) iterator */ 241 + struct rolling_buffer buffer; /* Unencrypted buffer */ 242 + #define NETFS_ROLLBUF_PUT_MARK ROLLBUF_MARK_1 243 + #define NETFS_ROLLBUF_PAGECACHE_MARK ROLLBUF_MARK_2 246 244 void *netfs_priv; /* Private data for the netfs */ 247 245 void *netfs_priv2; /* Private data for the netfs */ 248 246 struct bio_vec *direct_bv; /* DIO buffer list (when handling iovec-iter) */ ··· 259 259 long error; /* 0 or error that occurred */ 260 260 enum netfs_io_origin origin; /* Origin of the request */ 261 261 bool direct_bv_unpin; /* T if direct_bv[] must be unpinned */ 262 - u8 buffer_head_slot; /* First slot in ->buffer */ 263 - u8 buffer_tail_slot; /* Next slot in ->buffer_tail */ 264 262 unsigned long long i_size; /* Size of the file */ 265 263 unsigned long long start; /* Start position */ 266 264 atomic64_t issued_to; /* Write issuer folio cursor */
+61
include/linux/rolling_buffer.h
··· 1 + /* SPDX-License-Identifier: GPL-2.0-or-later */ 2 + /* Rolling buffer of folios 3 + * 4 + * Copyright (C) 2024 Red Hat, Inc. All Rights Reserved. 5 + * Written by David Howells (dhowells@redhat.com) 6 + */ 7 + 8 + #ifndef _ROLLING_BUFFER_H 9 + #define _ROLLING_BUFFER_H 10 + 11 + #include <linux/folio_queue.h> 12 + #include <linux/uio.h> 13 + 14 + /* 15 + * Rolling buffer. Whilst the buffer is live and in use, folios and folio 16 + * queue segments can be added to one end by one thread and removed from the 17 + * other end by another thread. The buffer isn't allowed to be empty; it must 18 + * always have at least one folio_queue in it so that neither side has to 19 + * modify both queue pointers. 20 + * 21 + * The iterator in the buffer is extended as buffers are inserted. It can be 22 + * snapshotted to use a segment of the buffer. 23 + */ 24 + struct rolling_buffer { 25 + struct folio_queue *head; /* Producer's insertion point */ 26 + struct folio_queue *tail; /* Consumer's removal point */ 27 + struct iov_iter iter; /* Iterator tracking what's left in the buffer */ 28 + u8 next_head_slot; /* Next slot in ->head */ 29 + u8 first_tail_slot; /* First slot in ->tail */ 30 + }; 31 + 32 + /* 33 + * Snapshot of a rolling buffer. 34 + */ 35 + struct rolling_buffer_snapshot { 36 + struct folio_queue *curr_folioq; /* Queue segment in which current folio resides */ 37 + unsigned char curr_slot; /* Folio currently being read */ 38 + unsigned char curr_order; /* Order of folio */ 39 + }; 40 + 41 + /* Marks to store per-folio in the internal folio_queue structs. 
*/ 42 + #define ROLLBUF_MARK_1 BIT(0) 43 + #define ROLLBUF_MARK_2 BIT(1) 44 + 45 + int rolling_buffer_init(struct rolling_buffer *roll, unsigned int rreq_id, 46 + unsigned int direction); 47 + int rolling_buffer_make_space(struct rolling_buffer *roll); 48 + ssize_t rolling_buffer_load_from_ra(struct rolling_buffer *roll, 49 + struct readahead_control *ractl, 50 + struct folio_batch *put_batch); 51 + ssize_t rolling_buffer_append(struct rolling_buffer *roll, struct folio *folio, 52 + unsigned int flags); 53 + struct folio_queue *rolling_buffer_delete_spent(struct rolling_buffer *roll); 54 + void rolling_buffer_clear(struct rolling_buffer *roll); 55 + 56 + static inline void rolling_buffer_advance(struct rolling_buffer *roll, size_t amount) 57 + { 58 + iov_iter_advance(&roll->iter, amount); 59 + } 60 + 61 + #endif /* _ROLLING_BUFFER_H */
+2
include/trace/events/netfs.h
··· 198 198 EM(netfs_trace_folioq_alloc_read_sing, "alloc-r-sing") \ 199 199 EM(netfs_trace_folioq_clear, "clear") \ 200 200 EM(netfs_trace_folioq_delete, "delete") \ 201 + EM(netfs_trace_folioq_make_space, "make-space") \ 201 202 EM(netfs_trace_folioq_prep_write, "prep-wr") \ 203 + EM(netfs_trace_folioq_rollbuf_init, "roll-init") \ 202 204 E_(netfs_trace_folioq_read_progress, "r-progress") 203 205 204 206 #ifndef __NETFS_DECLARE_TRACE_ENUMS_ONCE_ONLY