Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux

ring-buffer: Read and write to ring buffers with custom sub buffer size

As the size of the ring buffer's sub-buffer page can be changed dynamically,
the logic that reads from and writes to the buffer must be updated to take
that into account. Some internal ring buffer APIs are changed:
ring_buffer_alloc_read_page()
ring_buffer_free_read_page()
ring_buffer_read_page()
A new API is introduced:
ring_buffer_read_page_data()

Link: https://lore.kernel.org/linux-trace-devel/20211213094825.61876-6-tz.stoyanov@gmail.com
Link: https://lore.kernel.org/linux-trace-kernel/20231219185628.875145995@goodmis.org

Cc: Masami Hiramatsu <mhiramat@kernel.org>
Cc: Mark Rutland <mark.rutland@arm.com>
Cc: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
Cc: Andrew Morton <akpm@linux-foundation.org>
Cc: Vincent Donnefort <vdonnefort@google.com>
Cc: Kent Overstreet <kent.overstreet@gmail.com>
Signed-off-by: Tzvetomir Stoyanov (VMware) <tz.stoyanov@gmail.com>
[ Fixed kerneldoc on data_page parameter in ring_buffer_free_read_page() ]
Signed-off-by: Steven Rostedt (Google) <rostedt@goodmis.org>

Authored by Tzvetomir Stoyanov (VMware); committed by Steven Rostedt (Google).
bce761d7 f9b94daa

+89 -41
+8 -3
include/linux/ring_buffer.h
··· 192 192 size_t ring_buffer_nr_pages(struct trace_buffer *buffer, int cpu); 193 193 size_t ring_buffer_nr_dirty_pages(struct trace_buffer *buffer, int cpu); 194 194 195 - void *ring_buffer_alloc_read_page(struct trace_buffer *buffer, int cpu); 196 - void ring_buffer_free_read_page(struct trace_buffer *buffer, int cpu, void *data); 197 - int ring_buffer_read_page(struct trace_buffer *buffer, void **data_page, 195 + struct buffer_data_read_page; 196 + struct buffer_data_read_page * 197 + ring_buffer_alloc_read_page(struct trace_buffer *buffer, int cpu); 198 + void ring_buffer_free_read_page(struct trace_buffer *buffer, int cpu, 199 + struct buffer_data_read_page *page); 200 + int ring_buffer_read_page(struct trace_buffer *buffer, 201 + struct buffer_data_read_page *data_page, 198 202 size_t len, int cpu, int full); 203 + void *ring_buffer_read_page_data(struct buffer_data_read_page *page); 199 204 200 205 struct trace_seq; 201 206
+55 -20
kernel/trace/ring_buffer.c
··· 318 318 unsigned char data[] RB_ALIGN_DATA; /* data of buffer page */ 319 319 }; 320 320 321 + struct buffer_data_read_page { 322 + unsigned order; /* order of the page */ 323 + struct buffer_data_page *data; /* actual data, stored in this page */ 324 + }; 325 + 321 326 /* 322 327 * Note, the buffer_page list must be first. The buffer pages 323 328 * are allocated in cache lines, which means that each buffer ··· 5488 5483 * Returns: 5489 5484 * The page allocated, or ERR_PTR 5490 5485 */ 5491 - void *ring_buffer_alloc_read_page(struct trace_buffer *buffer, int cpu) 5486 + struct buffer_data_read_page * 5487 + ring_buffer_alloc_read_page(struct trace_buffer *buffer, int cpu) 5492 5488 { 5493 5489 struct ring_buffer_per_cpu *cpu_buffer; 5494 - struct buffer_data_page *bpage = NULL; 5490 + struct buffer_data_read_page *bpage = NULL; 5495 5491 unsigned long flags; 5496 5492 struct page *page; 5497 5493 5498 5494 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5499 5495 return ERR_PTR(-ENODEV); 5500 5496 5497 + bpage = kzalloc(sizeof(*bpage), GFP_KERNEL); 5498 + if (!bpage) 5499 + return ERR_PTR(-ENOMEM); 5500 + 5501 + bpage->order = buffer->subbuf_order; 5501 5502 cpu_buffer = buffer->buffers[cpu]; 5502 5503 local_irq_save(flags); 5503 5504 arch_spin_lock(&cpu_buffer->lock); 5504 5505 5505 5506 if (cpu_buffer->free_page) { 5506 - bpage = cpu_buffer->free_page; 5507 + bpage->data = cpu_buffer->free_page; 5507 5508 cpu_buffer->free_page = NULL; 5508 5509 } 5509 5510 5510 5511 arch_spin_unlock(&cpu_buffer->lock); 5511 5512 local_irq_restore(flags); 5512 5513 5513 - if (bpage) 5514 + if (bpage->data) 5514 5515 goto out; 5515 5516 5516 5517 page = alloc_pages_node(cpu_to_node(cpu), GFP_KERNEL | __GFP_NORETRY, 5517 5518 cpu_buffer->buffer->subbuf_order); 5518 - if (!page) 5519 + if (!page) { 5520 + kfree(bpage); 5519 5521 return ERR_PTR(-ENOMEM); 5522 + } 5520 5523 5521 - bpage = page_address(page); 5524 + bpage->data = page_address(page); 5522 5525 5523 5526 out: 5524 - 
rb_init_page(bpage); 5527 + rb_init_page(bpage->data); 5525 5528 5526 5529 return bpage; 5527 5530 } ··· 5539 5526 * ring_buffer_free_read_page - free an allocated read page 5540 5527 * @buffer: the buffer the page was allocate for 5541 5528 * @cpu: the cpu buffer the page came from 5542 - * @data: the page to free 5529 + * @data_page: the page to free 5543 5530 * 5544 5531 * Free a page allocated from ring_buffer_alloc_read_page. 5545 5532 */ 5546 - void ring_buffer_free_read_page(struct trace_buffer *buffer, int cpu, void *data) 5533 + void ring_buffer_free_read_page(struct trace_buffer *buffer, int cpu, 5534 + struct buffer_data_read_page *data_page) 5547 5535 { 5548 5536 struct ring_buffer_per_cpu *cpu_buffer; 5549 - struct buffer_data_page *bpage = data; 5537 + struct buffer_data_page *bpage = data_page->data; 5550 5538 struct page *page = virt_to_page(bpage); 5551 5539 unsigned long flags; 5552 5540 ··· 5556 5542 5557 5543 cpu_buffer = buffer->buffers[cpu]; 5558 5544 5559 - /* If the page is still in use someplace else, we can't reuse it */ 5560 - if (page_ref_count(page) > 1) 5545 + /* 5546 + * If the page is still in use someplace else, or order of the page 5547 + * is different from the subbuffer order of the buffer - 5548 + * we can't reuse it 5549 + */ 5550 + if (page_ref_count(page) > 1 || data_page->order != buffer->subbuf_order) 5561 5551 goto out; 5562 5552 5563 5553 local_irq_save(flags); ··· 5576 5558 local_irq_restore(flags); 5577 5559 5578 5560 out: 5579 - free_pages((unsigned long)bpage, buffer->subbuf_order); 5561 + free_pages((unsigned long)bpage, data_page->order); 5562 + kfree(data_page); 5580 5563 } 5581 5564 EXPORT_SYMBOL_GPL(ring_buffer_free_read_page); 5582 5565 ··· 5598 5579 * rpage = ring_buffer_alloc_read_page(buffer, cpu); 5599 5580 * if (IS_ERR(rpage)) 5600 5581 * return PTR_ERR(rpage); 5601 - * ret = ring_buffer_read_page(buffer, &rpage, len, cpu, 0); 5582 + * ret = ring_buffer_read_page(buffer, rpage, len, cpu, 0); 5602 5583 * if 
(ret >= 0) 5603 - * process_page(rpage, ret); 5584 + * process_page(ring_buffer_read_page_data(rpage), ret); 5585 + * ring_buffer_free_read_page(buffer, cpu, rpage); 5604 5586 * 5605 5587 * When @full is set, the function will not return true unless 5606 5588 * the writer is off the reader page. ··· 5616 5596 * <0 if no data has been transferred. 5617 5597 */ 5618 5598 int ring_buffer_read_page(struct trace_buffer *buffer, 5619 - void **data_page, size_t len, int cpu, int full) 5599 + struct buffer_data_read_page *data_page, 5600 + size_t len, int cpu, int full) 5620 5601 { 5621 5602 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 5622 5603 struct ring_buffer_event *event; ··· 5642 5621 5643 5622 len -= BUF_PAGE_HDR_SIZE; 5644 5623 5645 - if (!data_page) 5624 + if (!data_page || !data_page->data) 5625 + goto out; 5626 + if (data_page->order != buffer->subbuf_order) 5646 5627 goto out; 5647 5628 5648 - bpage = *data_page; 5629 + bpage = data_page->data; 5649 5630 if (!bpage) 5650 5631 goto out; 5651 5632 ··· 5741 5718 /* swap the pages */ 5742 5719 rb_init_page(bpage); 5743 5720 bpage = reader->page; 5744 - reader->page = *data_page; 5721 + reader->page = data_page->data; 5745 5722 local_set(&reader->write, 0); 5746 5723 local_set(&reader->entries, 0); 5747 5724 reader->read = 0; 5748 - *data_page = bpage; 5725 + data_page->data = bpage; 5749 5726 5750 5727 /* 5751 5728 * Use the real_end for the data size, ··· 5789 5766 return ret; 5790 5767 } 5791 5768 EXPORT_SYMBOL_GPL(ring_buffer_read_page); 5769 + 5770 + /** 5771 + * ring_buffer_read_page_data - get pointer to the data in the page. 5772 + * @page: the page to get the data from 5773 + * 5774 + * Returns pointer to the actual data in this page. 
5775 + */ 5776 + void *ring_buffer_read_page_data(struct buffer_data_read_page *page) 5777 + { 5778 + return page->data; 5779 + } 5780 + EXPORT_SYMBOL_GPL(ring_buffer_read_page_data); 5792 5781 5793 5782 /** 5794 5783 * ring_buffer_subbuf_size_get - get size of the sub buffer.
+6 -4
kernel/trace/ring_buffer_benchmark.c
··· 104 104 105 105 static enum event_status read_page(int cpu) 106 106 { 107 + struct buffer_data_read_page *bpage; 107 108 struct ring_buffer_event *event; 108 109 struct rb_page *rpage; 109 110 unsigned long commit; 110 - void *bpage; 111 + int page_size; 111 112 int *entry; 112 113 int ret; 113 114 int inc; ··· 118 117 if (IS_ERR(bpage)) 119 118 return EVENT_DROPPED; 120 119 121 - ret = ring_buffer_read_page(buffer, &bpage, PAGE_SIZE, cpu, 1); 120 + page_size = ring_buffer_subbuf_size_get(buffer); 121 + ret = ring_buffer_read_page(buffer, bpage, page_size, cpu, 1); 122 122 if (ret >= 0) { 123 - rpage = bpage; 123 + rpage = ring_buffer_read_page_data(bpage); 124 124 /* The commit may have missed event flags set, clear them */ 125 125 commit = local_read(&rpage->commit) & 0xfffff; 126 126 for (i = 0; i < commit && !test_error ; i += inc) { 127 127 128 - if (i >= (PAGE_SIZE - offsetof(struct rb_page, data))) { 128 + if (i >= (page_size - offsetof(struct rb_page, data))) { 129 129 TEST_ERROR(); 130 130 break; 131 131 }
+20 -14
kernel/trace/trace.c
··· 8286 8286 { 8287 8287 struct ftrace_buffer_info *info = filp->private_data; 8288 8288 struct trace_iterator *iter = &info->iter; 8289 + void *trace_data; 8290 + int page_size; 8289 8291 ssize_t ret = 0; 8290 8292 ssize_t size; 8291 8293 ··· 8298 8296 if (iter->snapshot && iter->tr->current_trace->use_max_tr) 8299 8297 return -EBUSY; 8300 8298 #endif 8299 + 8300 + page_size = ring_buffer_subbuf_size_get(iter->array_buffer->buffer); 8301 8301 8302 8302 if (!info->spare) { 8303 8303 info->spare = ring_buffer_alloc_read_page(iter->array_buffer->buffer, ··· 8315 8311 return ret; 8316 8312 8317 8313 /* Do we have previous read data to read? */ 8318 - if (info->read < PAGE_SIZE) 8314 + if (info->read < page_size) 8319 8315 goto read; 8320 8316 8321 8317 again: 8322 8318 trace_access_lock(iter->cpu_file); 8323 8319 ret = ring_buffer_read_page(iter->array_buffer->buffer, 8324 - &info->spare, 8320 + info->spare, 8325 8321 count, 8326 8322 iter->cpu_file, 0); 8327 8323 trace_access_unlock(iter->cpu_file); ··· 8342 8338 8343 8339 info->read = 0; 8344 8340 read: 8345 - size = PAGE_SIZE - info->read; 8341 + size = page_size - info->read; 8346 8342 if (size > count) 8347 8343 size = count; 8348 - 8349 - ret = copy_to_user(ubuf, info->spare + info->read, size); 8344 + trace_data = ring_buffer_read_page_data(info->spare); 8345 + ret = copy_to_user(ubuf, trace_data + info->read, size); 8350 8346 if (ret == size) 8351 8347 return -EFAULT; 8352 8348 ··· 8457 8453 .spd_release = buffer_spd_release, 8458 8454 }; 8459 8455 struct buffer_ref *ref; 8456 + int page_size; 8460 8457 int entries, i; 8461 8458 ssize_t ret = 0; 8462 8459 ··· 8466 8461 return -EBUSY; 8467 8462 #endif 8468 8463 8469 - if (*ppos & (PAGE_SIZE - 1)) 8464 + page_size = ring_buffer_subbuf_size_get(iter->array_buffer->buffer); 8465 + if (*ppos & (page_size - 1)) 8470 8466 return -EINVAL; 8471 8467 8472 - if (len & (PAGE_SIZE - 1)) { 8473 - if (len < PAGE_SIZE) 8468 + if (len & (page_size - 1)) { 8469 + if (len < 
page_size) 8474 8470 return -EINVAL; 8475 - len &= PAGE_MASK; 8471 + len &= (~(page_size - 1)); 8476 8472 } 8477 8473 8478 8474 if (splice_grow_spd(pipe, &spd)) ··· 8483 8477 trace_access_lock(iter->cpu_file); 8484 8478 entries = ring_buffer_entries_cpu(iter->array_buffer->buffer, iter->cpu_file); 8485 8479 8486 - for (i = 0; i < spd.nr_pages_max && len && entries; i++, len -= PAGE_SIZE) { 8480 + for (i = 0; i < spd.nr_pages_max && len && entries; i++, len -= page_size) { 8487 8481 struct page *page; 8488 8482 int r; 8489 8483 ··· 8504 8498 } 8505 8499 ref->cpu = iter->cpu_file; 8506 8500 8507 - r = ring_buffer_read_page(ref->buffer, &ref->page, 8501 + r = ring_buffer_read_page(ref->buffer, ref->page, 8508 8502 len, iter->cpu_file, 1); 8509 8503 if (r < 0) { 8510 8504 ring_buffer_free_read_page(ref->buffer, ref->cpu, ··· 8513 8507 break; 8514 8508 } 8515 8509 8516 - page = virt_to_page(ref->page); 8510 + page = virt_to_page(ring_buffer_read_page_data(ref->page)); 8517 8511 8518 8512 spd.pages[i] = page; 8519 - spd.partial[i].len = PAGE_SIZE; 8513 + spd.partial[i].len = page_size; 8520 8514 spd.partial[i].offset = 0; 8521 8515 spd.partial[i].private = (unsigned long)ref; 8522 8516 spd.nr_pages++; 8523 - *ppos += PAGE_SIZE; 8517 + *ppos += page_size; 8524 8518 8525 8519 entries = ring_buffer_entries_cpu(iter->array_buffer->buffer, iter->cpu_file); 8526 8520 }