Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux

bpf: Add libbpf logic for user-space ring buffer

Now that all of the logic is in place in the kernel to support user-space
produced ring buffers, we can add the user-space logic to libbpf. This
patch therefore adds the following public symbols to libbpf:

struct user_ring_buffer *
user_ring_buffer__new(int map_fd,
const struct user_ring_buffer_opts *opts);
void *user_ring_buffer__reserve(struct user_ring_buffer *rb, __u32 size);
void *user_ring_buffer__reserve_blocking(struct user_ring_buffer *rb,
__u32 size, int timeout_ms);
void user_ring_buffer__submit(struct user_ring_buffer *rb, void *sample);
void user_ring_buffer__discard(struct user_ring_buffer *rb, void *sample);
void user_ring_buffer__free(struct user_ring_buffer *rb);

A user-space producer must first create a struct user_ring_buffer * object
with user_ring_buffer__new(), and can then reserve samples in the
ring buffer using one of the following two symbols:

void *user_ring_buffer__reserve(struct user_ring_buffer *rb, __u32 size);
void *user_ring_buffer__reserve_blocking(struct user_ring_buffer *rb,
__u32 size, int timeout_ms);

With user_ring_buffer__reserve(), a pointer to a 'size' region of the ring
buffer will be returned if sufficient space is available in the buffer.
user_ring_buffer__reserve_blocking() provides similar semantics, but will
block for up to 'timeout_ms' in epoll_wait if there is insufficient space
in the buffer. This function has the guarantee from the kernel that it will
receive at least one event-notification per invocation to
bpf_ringbuf_drain(), provided that at least one sample is drained, and the
BPF program did not pass the BPF_RB_NO_WAKEUP flag to bpf_ringbuf_drain().

Once a sample is reserved, it must either be committed to the ring buffer
with user_ring_buffer__submit(), or discarded with
user_ring_buffer__discard().

Signed-off-by: David Vernet <void@manifault.com>
Signed-off-by: Andrii Nakryiko <andrii@kernel.org>
Link: https://lore.kernel.org/bpf/20220920000100.477320-4-void@manifault.com

authored by

David Vernet and committed by
Andrii Nakryiko
b66ccae0 20571567

+398 -3
+8 -2
tools/lib/bpf/libbpf.c
··· 2373 2373 return sz; 2374 2374 } 2375 2375 2376 + static bool map_is_ringbuf(const struct bpf_map *map) 2377 + { 2378 + return map->def.type == BPF_MAP_TYPE_RINGBUF || 2379 + map->def.type == BPF_MAP_TYPE_USER_RINGBUF; 2380 + } 2381 + 2376 2382 static void fill_map_from_def(struct bpf_map *map, const struct btf_map_def *def) 2377 2383 { 2378 2384 map->def.type = def->map_type; ··· 2393 2387 map->btf_value_type_id = def->value_type_id; 2394 2388 2395 2389 /* auto-adjust BPF ringbuf map max_entries to be a multiple of page size */ 2396 - if (map->def.type == BPF_MAP_TYPE_RINGBUF) 2390 + if (map_is_ringbuf(map)) 2397 2391 map->def.max_entries = adjust_ringbuf_sz(map->def.max_entries); 2398 2392 2399 2393 if (def->parts & MAP_DEF_MAP_TYPE) ··· 4376 4370 map->def.max_entries = max_entries; 4377 4371 4378 4372 /* auto-adjust BPF ringbuf map max_entries to be a multiple of page size */ 4379 - if (map->def.type == BPF_MAP_TYPE_RINGBUF) 4373 + if (map_is_ringbuf(map)) 4380 4374 map->def.max_entries = adjust_ringbuf_sz(map->def.max_entries); 4381 4375 4382 4376 return 0;
+107
tools/lib/bpf/libbpf.h
··· 1011 1011 1012 1012 /* Ring buffer APIs */ 1013 1013 struct ring_buffer; 1014 + struct user_ring_buffer; 1014 1015 1015 1016 typedef int (*ring_buffer_sample_fn)(void *ctx, void *data, size_t size); 1016 1017 ··· 1030 1029 LIBBPF_API int ring_buffer__poll(struct ring_buffer *rb, int timeout_ms); 1031 1030 LIBBPF_API int ring_buffer__consume(struct ring_buffer *rb); 1032 1031 LIBBPF_API int ring_buffer__epoll_fd(const struct ring_buffer *rb); 1032 + 1033 + struct user_ring_buffer_opts { 1034 + size_t sz; /* size of this struct, for forward/backward compatibility */ 1035 + }; 1036 + 1037 + #define user_ring_buffer_opts__last_field sz 1038 + 1039 + /* @brief **user_ring_buffer__new()** creates a new instance of a user ring 1040 + * buffer. 1041 + * 1042 + * @param map_fd A file descriptor to a BPF_MAP_TYPE_USER_RINGBUF map. 1043 + * @param opts Options for how the ring buffer should be created. 1044 + * @return A user ring buffer on success; NULL and errno being set on a 1045 + * failure. 1046 + */ 1047 + LIBBPF_API struct user_ring_buffer * 1048 + user_ring_buffer__new(int map_fd, const struct user_ring_buffer_opts *opts); 1049 + 1050 + /* @brief **user_ring_buffer__reserve()** reserves a pointer to a sample in the 1051 + * user ring buffer. 1052 + * @param rb A pointer to a user ring buffer. 1053 + * @param size The size of the sample, in bytes. 1054 + * @return A pointer to an 8-byte aligned reserved region of the user ring 1055 + * buffer; NULL, and errno being set if a sample could not be reserved. 1056 + * 1057 + * This function is *not* thread safe, and callers must synchronize accessing 1058 + * this function if there are multiple producers. If a size is requested that 1059 + * is larger than the size of the entire ring buffer, errno will be set to 1060 + * E2BIG and NULL is returned. If the ring buffer could accommodate the size, 1061 + * but currently does not have enough space, errno is set to ENOSPC and NULL is 1062 + * returned. 
1063 + * 1064 + * After initializing the sample, callers must invoke 1065 + * **user_ring_buffer__submit()** to post the sample to the kernel. Otherwise, 1066 + * the sample must be freed with **user_ring_buffer__discard()**. 1067 + */ 1068 + LIBBPF_API void *user_ring_buffer__reserve(struct user_ring_buffer *rb, __u32 size); 1069 + 1070 + /* @brief **user_ring_buffer__reserve_blocking()** reserves a record in the 1071 + * ring buffer, possibly blocking for up to @timeout_ms until a sample becomes 1072 + * available. 1073 + * @param rb The user ring buffer. 1074 + * @param size The size of the sample, in bytes. 1075 + * @param timeout_ms The amount of time, in milliseconds, for which the caller 1076 + * should block when waiting for a sample. -1 causes the caller to block 1077 + * indefinitely. 1078 + * @return A pointer to an 8-byte aligned reserved region of the user ring 1079 + * buffer; NULL, and errno being set if a sample could not be reserved. 1080 + * 1081 + * This function is *not* thread safe, and callers must synchronize 1082 + * accessing this function if there are multiple producers 1083 + * 1084 + * If **timeout_ms** is -1, the function will block indefinitely until a sample 1085 + * becomes available. Otherwise, **timeout_ms** must be non-negative, or errno 1086 + * is set to EINVAL, and NULL is returned. If **timeout_ms** is 0, no blocking 1087 + * will occur and the function will return immediately after attempting to 1088 + * reserve a sample. 1089 + * 1090 + * If **size** is larger than the size of the entire ring buffer, errno is set 1091 + * to E2BIG and NULL is returned. If the ring buffer could accommodate 1092 + * **size**, but currently does not have enough space, the caller will block 1093 + * until at most **timeout_ms** has elapsed. If insufficient space is available 1094 + * at that time, errno is set to ENOSPC, and NULL is returned. 
1095 + * 1096 + * The kernel guarantees that it will wake up this thread to check if 1097 + * sufficient space is available in the ring buffer at least once per 1098 + * invocation of the **bpf_ringbuf_drain()** helper function, provided that at 1099 + * least one sample is consumed, and the BPF program did not invoke the 1100 + * function with BPF_RB_NO_WAKEUP. A wakeup may occur sooner than that, but the 1101 + * kernel does not guarantee this. If the helper function is invoked with 1102 + * BPF_RB_FORCE_WAKEUP, a wakeup event will be sent even if no sample is 1103 + * consumed. 1104 + * 1105 + * When a sample of size **size** is found within **timeout_ms**, a pointer to 1106 + * the sample is returned. After initializing the sample, callers must invoke 1107 + * **user_ring_buffer__submit()** to post the sample to the ring buffer. 1108 + * Otherwise, the sample must be freed with **user_ring_buffer__discard()**. 1109 + */ 1110 + LIBBPF_API void *user_ring_buffer__reserve_blocking(struct user_ring_buffer *rb, 1111 + __u32 size, 1112 + int timeout_ms); 1113 + 1114 + /* @brief **user_ring_buffer__submit()** submits a previously reserved sample 1115 + * into the ring buffer. 1116 + * @param rb The user ring buffer. 1117 + * @param sample A reserved sample. 1118 + * 1119 + * It is not necessary to synchronize amongst multiple producers when invoking 1120 + * this function. 1121 + */ 1122 + LIBBPF_API void user_ring_buffer__submit(struct user_ring_buffer *rb, void *sample); 1123 + 1124 + /* @brief **user_ring_buffer__discard()** discards a previously reserved sample. 1125 + * @param rb The user ring buffer. 1126 + * @param sample A reserved sample. 1127 + * 1128 + * It is not necessary to synchronize amongst multiple producers when invoking 1129 + * this function. 
1130 + */ 1131 + LIBBPF_API void user_ring_buffer__discard(struct user_ring_buffer *rb, void *sample); 1132 + 1133 + /* @brief **user_ring_buffer__free()** frees a ring buffer that was previously 1134 + * created with **user_ring_buffer__new()**. 1135 + * @param rb The user ring buffer being freed. 1136 + */ 1137 + LIBBPF_API void user_ring_buffer__free(struct user_ring_buffer *rb); 1033 1138 1034 1139 /* Perf buffer APIs */ 1035 1140 struct perf_buffer;
+10
tools/lib/bpf/libbpf.map
··· 368 368 libbpf_bpf_prog_type_str; 369 369 perf_buffer__buffer; 370 370 }; 371 + 372 + LIBBPF_1.1.0 { 373 + global: 374 + user_ring_buffer__discard; 375 + user_ring_buffer__free; 376 + user_ring_buffer__new; 377 + user_ring_buffer__reserve; 378 + user_ring_buffer__reserve_blocking; 379 + user_ring_buffer__submit; 380 + } LIBBPF_1.0.0;
+1
tools/lib/bpf/libbpf_probes.c
··· 231 231 return btf_fd; 232 232 break; 233 233 case BPF_MAP_TYPE_RINGBUF: 234 + case BPF_MAP_TYPE_USER_RINGBUF: 234 235 key_size = 0; 235 236 value_size = 0; 236 237 max_entries = 4096;
+1 -1
tools/lib/bpf/libbpf_version.h
··· 4 4 #define __LIBBPF_VERSION_H 5 5 6 6 #define LIBBPF_MAJOR_VERSION 1 7 - #define LIBBPF_MINOR_VERSION 0 7 + #define LIBBPF_MINOR_VERSION 1 8 8 9 9 #endif /* __LIBBPF_VERSION_H */
+271
tools/lib/bpf/ringbuf.c
··· 16 16 #include <asm/barrier.h> 17 17 #include <sys/mman.h> 18 18 #include <sys/epoll.h> 19 + #include <time.h> 19 20 20 21 #include "libbpf.h" 21 22 #include "libbpf_internal.h" ··· 38 37 size_t page_size; 39 38 int epoll_fd; 40 39 int ring_cnt; 40 + }; 41 + 42 + struct user_ring_buffer { 43 + struct epoll_event event; 44 + unsigned long *consumer_pos; 45 + unsigned long *producer_pos; 46 + void *data; 47 + unsigned long mask; 48 + size_t page_size; 49 + int map_fd; 50 + int epoll_fd; 51 + }; 52 + 53 + /* 8-byte ring buffer header structure */ 54 + struct ringbuf_hdr { 55 + __u32 len; 56 + __u32 pad; 41 57 }; 42 58 43 59 static void ringbuf_unmap_ring(struct ring_buffer *rb, struct ring *r) ··· 317 299 int ring_buffer__epoll_fd(const struct ring_buffer *rb) 318 300 { 319 301 return rb->epoll_fd; 302 + } 303 + 304 + static void user_ringbuf_unmap_ring(struct user_ring_buffer *rb) 305 + { 306 + if (rb->consumer_pos) { 307 + munmap(rb->consumer_pos, rb->page_size); 308 + rb->consumer_pos = NULL; 309 + } 310 + if (rb->producer_pos) { 311 + munmap(rb->producer_pos, rb->page_size + 2 * (rb->mask + 1)); 312 + rb->producer_pos = NULL; 313 + } 314 + } 315 + 316 + void user_ring_buffer__free(struct user_ring_buffer *rb) 317 + { 318 + if (!rb) 319 + return; 320 + 321 + user_ringbuf_unmap_ring(rb); 322 + 323 + if (rb->epoll_fd >= 0) 324 + close(rb->epoll_fd); 325 + 326 + free(rb); 327 + } 328 + 329 + static int user_ringbuf_map(struct user_ring_buffer *rb, int map_fd) 330 + { 331 + struct bpf_map_info info; 332 + __u32 len = sizeof(info); 333 + void *tmp; 334 + struct epoll_event *rb_epoll; 335 + int err; 336 + 337 + memset(&info, 0, sizeof(info)); 338 + 339 + err = bpf_obj_get_info_by_fd(map_fd, &info, &len); 340 + if (err) { 341 + err = -errno; 342 + pr_warn("user ringbuf: failed to get map info for fd=%d: %d\n", map_fd, err); 343 + return err; 344 + } 345 + 346 + if (info.type != BPF_MAP_TYPE_USER_RINGBUF) { 347 + pr_warn("user ringbuf: map fd=%d is not 
BPF_MAP_TYPE_USER_RINGBUF\n", map_fd); 348 + return -EINVAL; 349 + } 350 + 351 + rb->map_fd = map_fd; 352 + rb->mask = info.max_entries - 1; 353 + 354 + /* Map read-only consumer page */ 355 + tmp = mmap(NULL, rb->page_size, PROT_READ, MAP_SHARED, map_fd, 0); 356 + if (tmp == MAP_FAILED) { 357 + err = -errno; 358 + pr_warn("user ringbuf: failed to mmap consumer page for map fd=%d: %d\n", 359 + map_fd, err); 360 + return err; 361 + } 362 + rb->consumer_pos = tmp; 363 + 364 + /* Map read-write the producer page and data pages. We map the data 365 + * region as twice the total size of the ring buffer to allow the 366 + * simple reading and writing of samples that wrap around the end of 367 + * the buffer. See the kernel implementation for details. 368 + */ 369 + tmp = mmap(NULL, rb->page_size + 2 * info.max_entries, 370 + PROT_READ | PROT_WRITE, MAP_SHARED, map_fd, rb->page_size); 371 + if (tmp == MAP_FAILED) { 372 + err = -errno; 373 + pr_warn("user ringbuf: failed to mmap data pages for map fd=%d: %d\n", 374 + map_fd, err); 375 + return err; 376 + } 377 + 378 + rb->producer_pos = tmp; 379 + rb->data = tmp + rb->page_size; 380 + 381 + rb_epoll = &rb->event; 382 + rb_epoll->events = EPOLLOUT; 383 + if (epoll_ctl(rb->epoll_fd, EPOLL_CTL_ADD, map_fd, rb_epoll) < 0) { 384 + err = -errno; 385 + pr_warn("user ringbuf: failed to epoll add map fd=%d: %d\n", map_fd, err); 386 + return err; 387 + } 388 + 389 + return 0; 390 + } 391 + 392 + struct user_ring_buffer * 393 + user_ring_buffer__new(int map_fd, const struct user_ring_buffer_opts *opts) 394 + { 395 + struct user_ring_buffer *rb; 396 + int err; 397 + 398 + if (!OPTS_VALID(opts, user_ring_buffer_opts)) 399 + return errno = EINVAL, NULL; 400 + 401 + rb = calloc(1, sizeof(*rb)); 402 + if (!rb) 403 + return errno = ENOMEM, NULL; 404 + 405 + rb->page_size = getpagesize(); 406 + 407 + rb->epoll_fd = epoll_create1(EPOLL_CLOEXEC); 408 + if (rb->epoll_fd < 0) { 409 + err = -errno; 410 + pr_warn("user ringbuf: failed to create 
epoll instance: %d\n", err); 411 + goto err_out; 412 + } 413 + 414 + err = user_ringbuf_map(rb, map_fd); 415 + if (err) 416 + goto err_out; 417 + 418 + return rb; 419 + 420 + err_out: 421 + user_ring_buffer__free(rb); 422 + return errno = -err, NULL; 423 + } 424 + 425 + static void user_ringbuf_commit(struct user_ring_buffer *rb, void *sample, bool discard) 426 + { 427 + __u32 new_len; 428 + struct ringbuf_hdr *hdr; 429 + uintptr_t hdr_offset; 430 + 431 + hdr_offset = rb->mask + 1 + (sample - rb->data) - BPF_RINGBUF_HDR_SZ; 432 + hdr = rb->data + (hdr_offset & rb->mask); 433 + 434 + new_len = hdr->len & ~BPF_RINGBUF_BUSY_BIT; 435 + if (discard) 436 + new_len |= BPF_RINGBUF_DISCARD_BIT; 437 + 438 + /* Synchronizes with smp_load_acquire() in __bpf_user_ringbuf_peek() in 439 + * the kernel. 440 + */ 441 + __atomic_exchange_n(&hdr->len, new_len, __ATOMIC_ACQ_REL); 442 + } 443 + 444 + void user_ring_buffer__discard(struct user_ring_buffer *rb, void *sample) 445 + { 446 + user_ringbuf_commit(rb, sample, true); 447 + } 448 + 449 + void user_ring_buffer__submit(struct user_ring_buffer *rb, void *sample) 450 + { 451 + user_ringbuf_commit(rb, sample, false); 452 + } 453 + 454 + void *user_ring_buffer__reserve(struct user_ring_buffer *rb, __u32 size) 455 + { 456 + __u32 avail_size, total_size, max_size; 457 + /* 64-bit to avoid overflow in case of extreme application behavior */ 458 + __u64 cons_pos, prod_pos; 459 + struct ringbuf_hdr *hdr; 460 + 461 + /* Synchronizes with smp_store_release() in __bpf_user_ringbuf_peek() in 462 + * the kernel. 463 + */ 464 + cons_pos = smp_load_acquire(rb->consumer_pos); 465 + /* Synchronizes with smp_store_release() in user_ringbuf_commit() */ 466 + prod_pos = smp_load_acquire(rb->producer_pos); 467 + 468 + max_size = rb->mask + 1; 469 + avail_size = max_size - (prod_pos - cons_pos); 470 + /* Round up total size to a multiple of 8. 
*/ 471 + total_size = (size + BPF_RINGBUF_HDR_SZ + 7) / 8 * 8; 472 + 473 + if (total_size > max_size) 474 + return errno = E2BIG, NULL; 475 + 476 + if (avail_size < total_size) 477 + return errno = ENOSPC, NULL; 478 + 479 + hdr = rb->data + (prod_pos & rb->mask); 480 + hdr->len = size | BPF_RINGBUF_BUSY_BIT; 481 + hdr->pad = 0; 482 + 483 + /* Synchronizes with smp_load_acquire() in __bpf_user_ringbuf_peek() in 484 + * the kernel. 485 + */ 486 + smp_store_release(rb->producer_pos, prod_pos + total_size); 487 + 488 + return (void *)rb->data + ((prod_pos + BPF_RINGBUF_HDR_SZ) & rb->mask); 489 + } 490 + 491 + static __u64 ns_elapsed_timespec(const struct timespec *start, const struct timespec *end) 492 + { 493 + __u64 start_ns, end_ns, ns_per_s = 1000000000; 494 + 495 + start_ns = (__u64)start->tv_sec * ns_per_s + start->tv_nsec; 496 + end_ns = (__u64)end->tv_sec * ns_per_s + end->tv_nsec; 497 + 498 + return end_ns - start_ns; 499 + } 500 + 501 + void *user_ring_buffer__reserve_blocking(struct user_ring_buffer *rb, __u32 size, int timeout_ms) 502 + { 503 + void *sample; 504 + int err, ms_remaining = timeout_ms; 505 + struct timespec start; 506 + 507 + if (timeout_ms < 0 && timeout_ms != -1) 508 + return errno = EINVAL, NULL; 509 + 510 + if (timeout_ms != -1) { 511 + err = clock_gettime(CLOCK_MONOTONIC, &start); 512 + if (err) 513 + return NULL; 514 + } 515 + 516 + do { 517 + int cnt, ms_elapsed; 518 + struct timespec curr; 519 + __u64 ns_per_ms = 1000000; 520 + 521 + sample = user_ring_buffer__reserve(rb, size); 522 + if (sample) 523 + return sample; 524 + else if (errno != ENOSPC) 525 + return NULL; 526 + 527 + /* The kernel guarantees at least one event notification 528 + * delivery whenever at least one sample is drained from the 529 + * ring buffer in an invocation to bpf_ringbuf_drain(). 
Other 530 + * additional events may be delivered at any time, but only one 531 + * event is guaranteed per bpf_ringbuf_drain() invocation, 532 + * provided that a sample is drained, and the BPF program did 533 + * not pass BPF_RB_NO_WAKEUP to bpf_ringbuf_drain(). If 534 + * BPF_RB_FORCE_WAKEUP is passed to bpf_ringbuf_drain(), a 535 + * wakeup event will be delivered even if no samples are 536 + * drained. 537 + */ 538 + cnt = epoll_wait(rb->epoll_fd, &rb->event, 1, ms_remaining); 539 + if (cnt < 0) 540 + return NULL; 541 + 542 + if (timeout_ms == -1) 543 + continue; 544 + 545 + err = clock_gettime(CLOCK_MONOTONIC, &curr); 546 + if (err) 547 + return NULL; 548 + 549 + ms_elapsed = ns_elapsed_timespec(&start, &curr) / ns_per_ms; 550 + ms_remaining = timeout_ms - ms_elapsed; 551 + } while (ms_remaining > 0); 552 + 553 + /* Try one more time to reserve a sample after the specified timeout has elapsed. */ 554 + return user_ring_buffer__reserve(rb, size); 320 555 }