Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux

Merge tag 'trace-ring-buffer-v6.8-rc7-2' of git://git.kernel.org/pub/scm/linux/kernel/git/trace/linux-trace

Pull tracing updates from Steven Rostedt:

- Do not update shortest_full in rb_watermark_hit() if the watermark is
hit. The shortest_full field was being updated regardless of whether the task
was going to wait or not. If the watermark is hit, then the task is
not going to wait, so do not update the shortest_full field (used by
the waker).

- Update shortest_full field before setting the full_waiters_pending
flag

In the poll logic, the full_waiters_pending flag was being set before
the shortest_full field was set. If the full_waiters_pending flag is
set, writers will check the shortest_full field which has the least
percentage of data that the ring buffer needs to be filled before
waking up. The writer will check shortest_full if
full_waiters_pending is set, and if the ring buffer percentage filled
is greater than shortest full, then it will call the irq_work to wake
up the waiters.

The problem was that the poll logic set the full_waiters_pending flag
before updating shortest_full, which, when zero, will always trigger
the writer to call the irq_work to wake up the waiters. The irq_work
will reset the shortest_full field back to zero, as the woken waiters
are supposed to reset it.

- There's some optimized logic in the rb_watermark_hit() that is used
in ring_buffer_wait(). Use that helper function in the poll logic as
well.

- Restructure ring_buffer_wait() to use wait_event_interruptible()

The logic to wake up pending readers when the file descriptor is
closed is racy. Restructure ring_buffer_wait() to allow callers to
pass in conditions besides the ring buffer having enough data in it
by using wait_event_interruptible().

- Update the tracing_wait_on_pipe() to call ring_buffer_wait() with its
own conditions to exit the wait loop.

* tag 'trace-ring-buffer-v6.8-rc7-2' of git://git.kernel.org/pub/scm/linux/kernel/git/trace/linux-trace:
tracing/ring-buffer: Fix wait_on_pipe() race
ring-buffer: Use wait_event_interruptible() in ring_buffer_wait()
ring-buffer: Reuse rb_watermark_hit() for the poll logic
ring-buffer: Fix full_waiters_pending in poll
ring-buffer: Do not set shortest_full when full target is hit

+134 -80
+3 -1
include/linux/ring_buffer.h
··· 98 98 __ring_buffer_alloc((size), (flags), &__key); \ 99 99 }) 100 100 101 - int ring_buffer_wait(struct trace_buffer *buffer, int cpu, int full); 101 + typedef bool (*ring_buffer_cond_fn)(void *data); 102 + int ring_buffer_wait(struct trace_buffer *buffer, int cpu, int full, 103 + ring_buffer_cond_fn cond, void *data); 102 104 __poll_t ring_buffer_poll_wait(struct trace_buffer *buffer, int cpu, 103 105 struct file *filp, poll_table *poll_table, int full); 104 106 void ring_buffer_wake_waiters(struct trace_buffer *buffer, int cpu);
+4 -1
include/linux/trace_events.h
··· 103 103 unsigned int temp_size; 104 104 char *fmt; /* modified format holder */ 105 105 unsigned int fmt_size; 106 - long wait_index; 106 + atomic_t wait_index; 107 107 108 108 /* trace_seq for __print_flags() and __print_symbolic() etc. */ 109 109 struct trace_seq tmp_seq; 110 110 111 111 cpumask_var_t started; 112 + 113 + /* Set when the file is closed to prevent new waiters */ 114 + bool closed; 112 115 113 116 /* it's true when current open file is snapshot */ 114 117 bool snapshot;
+96 -66
kernel/trace/ring_buffer.c
··· 834 834 pagebusy = cpu_buffer->reader_page == cpu_buffer->commit_page; 835 835 ret = !pagebusy && full_hit(buffer, cpu, full); 836 836 837 - if (!cpu_buffer->shortest_full || 838 - cpu_buffer->shortest_full > full) 839 - cpu_buffer->shortest_full = full; 837 + if (!ret && (!cpu_buffer->shortest_full || 838 + cpu_buffer->shortest_full > full)) { 839 + cpu_buffer->shortest_full = full; 840 + } 840 841 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 841 842 } 842 843 return ret; 843 844 } 844 845 845 - /** 846 - * ring_buffer_wait - wait for input to the ring buffer 847 - * @buffer: buffer to wait on 848 - * @cpu: the cpu buffer to wait on 849 - * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS 850 - * 851 - * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon 852 - * as data is added to any of the @buffer's cpu buffers. Otherwise 853 - * it will wait for data to be added to a specific cpu buffer. 854 - */ 855 - int ring_buffer_wait(struct trace_buffer *buffer, int cpu, int full) 846 + static inline bool 847 + rb_wait_cond(struct rb_irq_work *rbwork, struct trace_buffer *buffer, 848 + int cpu, int full, ring_buffer_cond_fn cond, void *data) 856 849 { 857 - struct ring_buffer_per_cpu *cpu_buffer; 858 - DEFINE_WAIT(wait); 859 - struct rb_irq_work *work; 860 - int ret = 0; 850 + if (rb_watermark_hit(buffer, cpu, full)) 851 + return true; 861 852 862 - /* 863 - * Depending on what the caller is waiting for, either any 864 - * data in any cpu buffer, or a specific buffer, put the 865 - * caller on the appropriate wait queue. 
866 - */ 867 - if (cpu == RING_BUFFER_ALL_CPUS) { 868 - work = &buffer->irq_work; 869 - /* Full only makes sense on per cpu reads */ 870 - full = 0; 871 - } else { 872 - if (!cpumask_test_cpu(cpu, buffer->cpumask)) 873 - return -ENODEV; 874 - cpu_buffer = buffer->buffers[cpu]; 875 - work = &cpu_buffer->irq_work; 876 - } 877 - 878 - if (full) 879 - prepare_to_wait(&work->full_waiters, &wait, TASK_INTERRUPTIBLE); 880 - else 881 - prepare_to_wait(&work->waiters, &wait, TASK_INTERRUPTIBLE); 853 + if (cond(data)) 854 + return true; 882 855 883 856 /* 884 857 * The events can happen in critical sections where ··· 874 901 * a task has been queued. It's OK for spurious wake ups. 875 902 */ 876 903 if (full) 877 - work->full_waiters_pending = true; 904 + rbwork->full_waiters_pending = true; 878 905 else 879 - work->waiters_pending = true; 906 + rbwork->waiters_pending = true; 880 907 881 - if (rb_watermark_hit(buffer, cpu, full)) 882 - goto out; 908 + return false; 909 + } 883 910 884 - if (signal_pending(current)) { 885 - ret = -EINTR; 886 - goto out; 911 + /* 912 + * The default wait condition for ring_buffer_wait() is to just to exit the 913 + * wait loop the first time it is woken up. 914 + */ 915 + static bool rb_wait_once(void *data) 916 + { 917 + long *once = data; 918 + 919 + /* wait_event() actually calls this twice before scheduling*/ 920 + if (*once > 1) 921 + return true; 922 + 923 + (*once)++; 924 + return false; 925 + } 926 + 927 + /** 928 + * ring_buffer_wait - wait for input to the ring buffer 929 + * @buffer: buffer to wait on 930 + * @cpu: the cpu buffer to wait on 931 + * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS 932 + * @cond: condition function to break out of wait (NULL to run once) 933 + * @data: the data to pass to @cond. 934 + * 935 + * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon 936 + * as data is added to any of the @buffer's cpu buffers. 
Otherwise 937 + * it will wait for data to be added to a specific cpu buffer. 938 + */ 939 + int ring_buffer_wait(struct trace_buffer *buffer, int cpu, int full, 940 + ring_buffer_cond_fn cond, void *data) 941 + { 942 + struct ring_buffer_per_cpu *cpu_buffer; 943 + struct wait_queue_head *waitq; 944 + struct rb_irq_work *rbwork; 945 + long once = 0; 946 + int ret = 0; 947 + 948 + if (!cond) { 949 + cond = rb_wait_once; 950 + data = &once; 887 951 } 888 952 889 - schedule(); 890 - out: 891 - if (full) 892 - finish_wait(&work->full_waiters, &wait); 893 - else 894 - finish_wait(&work->waiters, &wait); 953 + /* 954 + * Depending on what the caller is waiting for, either any 955 + * data in any cpu buffer, or a specific buffer, put the 956 + * caller on the appropriate wait queue. 957 + */ 958 + if (cpu == RING_BUFFER_ALL_CPUS) { 959 + rbwork = &buffer->irq_work; 960 + /* Full only makes sense on per cpu reads */ 961 + full = 0; 962 + } else { 963 + if (!cpumask_test_cpu(cpu, buffer->cpumask)) 964 + return -ENODEV; 965 + cpu_buffer = buffer->buffers[cpu]; 966 + rbwork = &cpu_buffer->irq_work; 967 + } 895 968 896 - if (!ret && !rb_watermark_hit(buffer, cpu, full) && signal_pending(current)) 897 - ret = -EINTR; 969 + if (full) 970 + waitq = &rbwork->full_waiters; 971 + else 972 + waitq = &rbwork->waiters; 973 + 974 + ret = wait_event_interruptible((*waitq), 975 + rb_wait_cond(rbwork, buffer, cpu, full, cond, data)); 898 976 899 977 return ret; 900 978 } ··· 983 959 } 984 960 985 961 if (full) { 986 - unsigned long flags; 987 - 988 962 poll_wait(filp, &rbwork->full_waiters, poll_table); 989 963 990 - raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 964 + if (rb_watermark_hit(buffer, cpu, full)) 965 + return EPOLLIN | EPOLLRDNORM; 966 + /* 967 + * Only allow full_waiters_pending update to be seen after 968 + * the shortest_full is set (in rb_watermark_hit). 
If the 969 + * writer sees the full_waiters_pending flag set, it will 970 + * compare the amount in the ring buffer to shortest_full. 971 + * If the amount in the ring buffer is greater than the 972 + * shortest_full percent, it will call the irq_work handler 973 + * to wake up this list. The irq_handler will reset shortest_full 974 + * back to zero. That's done under the reader_lock, but 975 + * the below smp_mb() makes sure that the update to 976 + * full_waiters_pending doesn't leak up into the above. 977 + */ 978 + smp_mb(); 991 979 rbwork->full_waiters_pending = true; 992 - if (!cpu_buffer->shortest_full || 993 - cpu_buffer->shortest_full > full) 994 - cpu_buffer->shortest_full = full; 995 - raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 996 - } else { 997 - poll_wait(filp, &rbwork->waiters, poll_table); 998 - rbwork->waiters_pending = true; 980 + return 0; 999 981 } 982 + 983 + poll_wait(filp, &rbwork->waiters, poll_table); 984 + rbwork->waiters_pending = true; 1000 985 1001 986 /* 1002 987 * There's a tight race between setting the waiters_pending and ··· 1021 988 * will fix it later. 1022 989 */ 1023 990 smp_mb(); 1024 - 1025 - if (full) 1026 - return full_hit(buffer, cpu, full) ? EPOLLIN | EPOLLRDNORM : 0; 1027 991 1028 992 if ((cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer)) || 1029 993 (cpu != RING_BUFFER_ALL_CPUS && !ring_buffer_empty_cpu(buffer, cpu)))
+31 -12
kernel/trace/trace.c
··· 1955 1955 1956 1956 #endif /* CONFIG_TRACER_MAX_TRACE */ 1957 1957 1958 + struct pipe_wait { 1959 + struct trace_iterator *iter; 1960 + int wait_index; 1961 + }; 1962 + 1963 + static bool wait_pipe_cond(void *data) 1964 + { 1965 + struct pipe_wait *pwait = data; 1966 + struct trace_iterator *iter = pwait->iter; 1967 + 1968 + if (atomic_read_acquire(&iter->wait_index) != pwait->wait_index) 1969 + return true; 1970 + 1971 + return iter->closed; 1972 + } 1973 + 1958 1974 static int wait_on_pipe(struct trace_iterator *iter, int full) 1959 1975 { 1976 + struct pipe_wait pwait; 1960 1977 int ret; 1961 1978 1962 1979 /* Iterators are static, they should be filled or empty */ 1963 1980 if (trace_buffer_iter(iter, iter->cpu_file)) 1964 1981 return 0; 1965 1982 1966 - ret = ring_buffer_wait(iter->array_buffer->buffer, iter->cpu_file, full); 1983 + pwait.wait_index = atomic_read_acquire(&iter->wait_index); 1984 + pwait.iter = iter; 1985 + 1986 + ret = ring_buffer_wait(iter->array_buffer->buffer, iter->cpu_file, full, 1987 + wait_pipe_cond, &pwait); 1967 1988 1968 1989 #ifdef CONFIG_TRACER_MAX_TRACE 1969 1990 /* ··· 8418 8397 struct ftrace_buffer_info *info = file->private_data; 8419 8398 struct trace_iterator *iter = &info->iter; 8420 8399 8421 - iter->wait_index++; 8400 + iter->closed = true; 8422 8401 /* Make sure the waiters see the new wait_index */ 8423 - smp_wmb(); 8402 + (void)atomic_fetch_inc_release(&iter->wait_index); 8424 8403 8425 8404 ring_buffer_wake_waiters(iter->array_buffer->buffer, iter->cpu_file); 8426 8405 ··· 8520 8499 .spd_release = buffer_spd_release, 8521 8500 }; 8522 8501 struct buffer_ref *ref; 8502 + bool woken = false; 8523 8503 int page_size; 8524 8504 int entries, i; 8525 8505 ssize_t ret = 0; ··· 8594 8572 8595 8573 /* did we read anything? 
*/ 8596 8574 if (!spd.nr_pages) { 8597 - long wait_index; 8598 8575 8599 8576 if (ret) 8577 + goto out; 8578 + 8579 + if (woken) 8600 8580 goto out; 8601 8581 8602 8582 ret = -EAGAIN; 8603 8583 if ((file->f_flags & O_NONBLOCK) || (flags & SPLICE_F_NONBLOCK)) 8604 8584 goto out; 8605 - 8606 - wait_index = READ_ONCE(iter->wait_index); 8607 8585 8608 8586 ret = wait_on_pipe(iter, iter->snapshot ? 0 : iter->tr->buffer_percent); 8609 8587 if (ret) ··· 8613 8591 if (!tracer_tracing_is_on(iter->tr)) 8614 8592 goto out; 8615 8593 8616 - /* Make sure we see the new wait_index */ 8617 - smp_rmb(); 8618 - if (wait_index != iter->wait_index) 8619 - goto out; 8594 + /* Iterate one more time to collect any new data then exit */ 8595 + woken = true; 8620 8596 8621 8597 goto again; 8622 8598 } ··· 8637 8617 8638 8618 mutex_lock(&trace_types_lock); 8639 8619 8640 - iter->wait_index++; 8641 8620 /* Make sure the waiters see the new wait_index */ 8642 - smp_wmb(); 8621 + (void)atomic_fetch_inc_release(&iter->wait_index); 8643 8622 8644 8623 ring_buffer_wake_waiters(iter->array_buffer->buffer, iter->cpu_file); 8645 8624