/* SPDX-License-Identifier: GPL-2.0-or-later */
/*
 * Definitions for the 'struct ptr_ring' data structure.
 *
 * Author:
 *	Michael S. Tsirkin <mst@redhat.com>
 *
 * Copyright (C) 2016 Red Hat, Inc.
 *
 * This is a limited-size FIFO maintaining pointers in FIFO order, with
 * one CPU producing entries and another consuming them.
 *
 * This implementation tries to minimize cache-contention when there is a
 * single producer and a single consumer CPU.
 */

#ifndef _LINUX_PTR_RING_H
#define _LINUX_PTR_RING_H 1

#ifdef __KERNEL__
#include <linux/spinlock.h>
#include <linux/cache.h>
#include <linux/types.h>
#include <linux/compiler.h>
#include <linux/slab.h>
#include <linux/mm.h>
#include <asm/errno.h>
#endif

struct ptr_ring {
	int producer ____cacheline_aligned_in_smp;
	spinlock_t producer_lock;
	int consumer_head ____cacheline_aligned_in_smp; /* next valid entry */
	int consumer_tail; /* next entry to invalidate */
	spinlock_t consumer_lock;
	/* Shared consumer/producer data */
	/* Read-only by both the producer and the consumer */
	int size ____cacheline_aligned_in_smp; /* max entries in queue */
	int batch; /* number of entries to consume in a batch */
	void **queue;
};

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax().
 *
 * NB: this is unlike __ptr_ring_empty in that callers must hold producer_lock:
 * see e.g. ptr_ring_full.
 */
static inline bool __ptr_ring_full(struct ptr_ring *r)
{
	return r->queue[r->producer];
}

static inline bool ptr_ring_full(struct ptr_ring *r)
{
	bool ret;

	spin_lock(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock(&r->producer_lock);

	return ret;
}

static inline bool ptr_ring_full_irq(struct ptr_ring *r)
{
	bool ret;

	spin_lock_irq(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock_irq(&r->producer_lock);

	return ret;
}

static inline bool ptr_ring_full_any(struct ptr_ring *r)
{
	unsigned long flags;
	bool ret;

	spin_lock_irqsave(&r->producer_lock, flags);
	ret = __ptr_ring_full(r);
	spin_unlock_irqrestore(&r->producer_lock, flags);

	return ret;
}

static inline bool ptr_ring_full_bh(struct ptr_ring *r)
{
	bool ret;

	spin_lock_bh(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock_bh(&r->producer_lock);

	return ret;
}

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). Callers must hold producer_lock.
 * Callers are responsible for making sure the pointer that is being queued
 * points to valid data.
 */
static inline int __ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
	if (unlikely(!r->size) || r->queue[r->producer])
		return -ENOSPC;

	/* Make sure the pointer we are storing points to valid data. */
	/* Pairs with the dependency ordering in __ptr_ring_consume. */
	smp_wmb();

	WRITE_ONCE(r->queue[r->producer++], ptr);
	if (unlikely(r->producer >= r->size))
		r->producer = 0;
	return 0;
}

/*
 * Note: resize (below) nests producer lock within consumer lock, so if you
 * consume in interrupt or BH context, you must disable interrupts/BH when
 * calling this.
 */
static inline int ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock(&r->producer_lock);

	return ret;
}

static inline int ptr_ring_produce_irq(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock_irq(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_irq(&r->producer_lock);

	return ret;
}

static inline int ptr_ring_produce_any(struct ptr_ring *r, void *ptr)
{
	unsigned long flags;
	int ret;

	spin_lock_irqsave(&r->producer_lock, flags);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_irqrestore(&r->producer_lock, flags);

	return ret;
}

static inline int ptr_ring_produce_bh(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock_bh(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_bh(&r->producer_lock);

	return ret;
}
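/*
 * Editorial usage sketch (not part of the upstream header): a minimal
 * process-context producer enqueueing an object and handling a full ring.
 * The object type and drop_obj() are hypothetical names used only for
 * illustration.
 *
 *	int queue_obj(struct ptr_ring *r, struct my_obj *obj)
 *	{
 *		int err = ptr_ring_produce(r, obj);
 *
 *		if (err)
 *			drop_obj(obj);	(-ENOSPC: ring is full)
 *		return err;
 *	}
 *
 * See the note above for which locking variant (_irq, _any, _bh) to use
 * when the ring is consumed or resized from interrupt or BH context.
 */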
static inline void *__ptr_ring_peek(struct ptr_ring *r)
{
	if (likely(r->size))
		return READ_ONCE(r->queue[r->consumer_head]);
	return NULL;
}

/*
 * Test ring empty status without taking any locks.
 *
 * NB: This is only safe to call if the ring is never resized.
 *
 * However, if some other CPU consumes ring entries at the same time, the value
 * returned is not guaranteed to be correct.
 *
 * In this case - to avoid incorrectly detecting the ring
 * as empty - the CPU consuming the ring entries is responsible
 * for either consuming all ring entries until the ring is empty,
 * or synchronizing with some other CPU and causing it to
 * re-test __ptr_ring_empty and/or consume the ring entries
 * after the synchronization point.
 *
 * Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax().
 */
static inline bool __ptr_ring_empty(struct ptr_ring *r)
{
	if (likely(r->size))
		return !r->queue[READ_ONCE(r->consumer_head)];
	return true;
}

static inline bool ptr_ring_empty(struct ptr_ring *r)
{
	bool ret;

	spin_lock(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock(&r->consumer_lock);

	return ret;
}

static inline bool ptr_ring_empty_irq(struct ptr_ring *r)
{
	bool ret;

	spin_lock_irq(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock_irq(&r->consumer_lock);

	return ret;
}

static inline bool ptr_ring_empty_any(struct ptr_ring *r)
{
	unsigned long flags;
	bool ret;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ret = __ptr_ring_empty(r);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ret;
}

static inline bool ptr_ring_empty_bh(struct ptr_ring *r)
{
	bool ret;

	spin_lock_bh(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock_bh(&r->consumer_lock);

	return ret;
}
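/*
 * Editorial usage sketch (not part of the upstream header): one way for the
 * single consumer to keep the lockless __ptr_ring_empty() test meaningful,
 * per the comment above - once it starts consuming, it drains until the ring
 * is empty. ptr_ring_consume() is defined further down in this header;
 * process_obj() is a hypothetical callback.
 *
 *	void drain_ring(struct ptr_ring *r)
 *	{
 *		void *obj;
 *
 *		while ((obj = ptr_ring_consume(r)))
 *			process_obj(obj);
 *	}
 *
 * With that rule in place, another CPU may use __ptr_ring_empty() as a hint
 * without taking consumer_lock, provided the ring is never resized.
 */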
/* Zero entries from tail to the specified head.
 * NB: if consumer_head can be >= r->size, the caller needs to fix up the
 * tail later.
 */
static inline void __ptr_ring_zero_tail(struct ptr_ring *r, int consumer_head)
{
	int head = consumer_head;

	/* Zero out entries in reverse order: this way the cache line that the
	 * producer might currently be reading is touched last; the producer
	 * won't make progress and touch other cache lines besides the first
	 * one until we write out all entries.
	 */
	while (likely(head > r->consumer_tail))
		r->queue[--head] = NULL;

	r->consumer_tail = consumer_head;
}

/* Must only be called after __ptr_ring_peek returned !NULL */
static inline void __ptr_ring_discard_one(struct ptr_ring *r)
{
	/* Fundamentally, what we want to do is update the consumer
	 * index and zero out the entry so the producer can reuse it.
	 * Doing it naively at each consume would be as simple as:
	 *	consumer = r->consumer;
	 *	r->queue[consumer++] = NULL;
	 *	if (unlikely(consumer >= r->size))
	 *		consumer = 0;
	 *	r->consumer = consumer;
	 * but that is suboptimal when the ring is full as the producer is
	 * writing out new entries in the same cache line. Defer these
	 * updates until a batch of entries has been consumed.
	 */
	/* Note: we must keep consumer_head valid at all times for
	 * __ptr_ring_empty to work correctly.
	 */
	int consumer_head = r->consumer_head + 1;

	/* Once we have processed enough entries, invalidate them in
	 * the ring all at once so the producer can reuse their space.
	 * We also do this when we reach the end of the ring - not mandatory
	 * but it helps keep the implementation simple.
	 */
	if (unlikely(consumer_head - r->consumer_tail >= r->batch ||
		     consumer_head >= r->size))
		__ptr_ring_zero_tail(r, consumer_head);

	if (unlikely(consumer_head >= r->size)) {
		consumer_head = 0;
		r->consumer_tail = 0;
	}
	/* matching READ_ONCE in __ptr_ring_empty for lockless tests */
	WRITE_ONCE(r->consumer_head, consumer_head);
}

static inline void *__ptr_ring_consume(struct ptr_ring *r)
{
	void *ptr;

	/* The READ_ONCE in __ptr_ring_peek guarantees that anyone
	 * accessing data through the pointer is up to date. Pairs
	 * with smp_wmb in __ptr_ring_produce.
	 */
	ptr = __ptr_ring_peek(r);
	if (ptr)
		__ptr_ring_discard_one(r);

	return ptr;
}

static inline int __ptr_ring_consume_batched(struct ptr_ring *r,
					     void **array, int n)
{
	void *ptr;
	int i;

	for (i = 0; i < n; i++) {
		ptr = __ptr_ring_consume(r);
		if (!ptr)
			break;
		array[i] = ptr;
	}

	return i;
}

/*
 * Note: resize (below) nests producer lock within consumer lock, so if you
 * call this in interrupt or BH context, you must disable interrupts/BH when
 * producing.
 */
static inline void *ptr_ring_consume(struct ptr_ring *r)
{
	void *ptr;

	spin_lock(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock(&r->consumer_lock);

	return ptr;
}

static inline void *ptr_ring_consume_irq(struct ptr_ring *r)
{
	void *ptr;

	spin_lock_irq(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock_irq(&r->consumer_lock);

	return ptr;
}

static inline void *ptr_ring_consume_any(struct ptr_ring *r)
{
	unsigned long flags;
	void *ptr;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ptr = __ptr_ring_consume(r);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ptr;
}

static inline void *ptr_ring_consume_bh(struct ptr_ring *r)
{
	void *ptr;

	spin_lock_bh(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock_bh(&r->consumer_lock);

	return ptr;
}

static inline int ptr_ring_consume_batched(struct ptr_ring *r,
					   void **array, int n)
{
	int ret;

	spin_lock(&r->consumer_lock);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock(&r->consumer_lock);

	return ret;
}

static inline int ptr_ring_consume_batched_irq(struct ptr_ring *r,
					       void **array, int n)
{
	int ret;

	spin_lock_irq(&r->consumer_lock);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock_irq(&r->consumer_lock);

	return ret;
}

static inline int ptr_ring_consume_batched_any(struct ptr_ring *r,
					       void **array, int n)
{
	unsigned long flags;
	int ret;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ret;
}

static inline int ptr_ring_consume_batched_bh(struct ptr_ring *r,
					      void **array, int n)
{
	int ret;

	spin_lock_bh(&r->consumer_lock);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock_bh(&r->consumer_lock);

	return ret;
}
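/*
 * Editorial usage sketch (not part of the upstream header): draining up to a
 * fixed budget of entries per call from BH context, e.g. from a poll-style
 * handler. MY_BUDGET and process_obj() are hypothetical names.
 *
 *	#define MY_BUDGET 16
 *
 *	int poll_ring(struct ptr_ring *r)
 *	{
 *		void *batch[MY_BUDGET];
 *		int i, n;
 *
 *		n = ptr_ring_consume_batched_bh(r, batch, MY_BUDGET);
 *		for (i = 0; i < n; i++)
 *			process_obj(batch[i]);
 *		return n;
 *	}
 */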
/* Cast to structure type and call a function without discarding from FIFO.
 * Function must return a value.
 * Callers must take consumer_lock.
 */
#define __PTR_RING_PEEK_CALL(r, f) ((f)(__ptr_ring_peek(r)))

#define PTR_RING_PEEK_CALL(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_IRQ(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock_irq(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_irq(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_BH(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock_bh(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_bh(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_ANY(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	unsigned long __PTR_RING_PEEK_CALL_f; \
	\
	spin_lock_irqsave(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_irqrestore(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
	__PTR_RING_PEEK_CALL_v; \
})
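/*
 * Editorial usage sketch (not part of the upstream header): peeking at the
 * head entry without dequeueing it, e.g. to check whether the next object
 * fits a budget. struct my_obj and my_obj_len() are hypothetical; note that
 * the callback must tolerate a NULL argument, since __ptr_ring_peek()
 * returns NULL on an empty ring.
 *
 *	static int my_obj_len(void *ptr)
 *	{
 *		struct my_obj *obj = ptr;
 *
 *		return obj ? obj->len : 0;
 *	}
 *
 *	int len = PTR_RING_PEEK_CALL(&ring, my_obj_len);
 */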
/* Not all gfp_t flags (besides GFP_KERNEL) are allowed. See
 * the vmalloc documentation for which of them are legal.
 */
static inline void **__ptr_ring_init_queue_alloc_noprof(unsigned int size, gfp_t gfp)
{
	if (size > KMALLOC_MAX_SIZE / sizeof(void *))
		return NULL;
	return kvmalloc_array_noprof(size, sizeof(void *), gfp | __GFP_ZERO);
}

static inline void __ptr_ring_set_size(struct ptr_ring *r, int size)
{
	r->size = size;
	r->batch = SMP_CACHE_BYTES * 2 / sizeof(*(r->queue));
	/* We need to set batch to at least 1 to make the logic
	 * in __ptr_ring_discard_one work correctly.
	 * Batching too much (because the ring is small) would cause a lot of
	 * burstiness. Needs tuning; for now disable batching.
	 */
	if (r->batch > r->size / 2 || !r->batch)
		r->batch = 1;
}

static inline int ptr_ring_init_noprof(struct ptr_ring *r, int size, gfp_t gfp)
{
	r->queue = __ptr_ring_init_queue_alloc_noprof(size, gfp);
	if (!r->queue)
		return -ENOMEM;

	__ptr_ring_set_size(r, size);
	r->producer = r->consumer_head = r->consumer_tail = 0;
	spin_lock_init(&r->producer_lock);
	spin_lock_init(&r->consumer_lock);

	return 0;
}
#define ptr_ring_init(...)	alloc_hooks(ptr_ring_init_noprof(__VA_ARGS__))
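/*
 * Editorial usage sketch (not part of the upstream header): a complete ring
 * lifecycle. The size 128 is arbitrary; free_obj() is a hypothetical destroy
 * callback that releases whatever was queued. Entries still in the ring at
 * cleanup time are passed to it by ptr_ring_cleanup(), which is defined at
 * the end of this header.
 *
 *	struct ptr_ring ring;
 *
 *	if (ptr_ring_init(&ring, 128, GFP_KERNEL))
 *		return -ENOMEM;
 *
 *	ptr_ring_produce(&ring, obj);
 *	...
 *	obj = ptr_ring_consume(&ring);
 *	...
 *	ptr_ring_cleanup(&ring, free_obj);
 */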
/*
 * Return entries into the ring. Destroy entries that don't fit.
 *
 * Note: this is expected to be a rare slow path operation.
 *
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume the ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline void ptr_ring_unconsume(struct ptr_ring *r, void **batch, int n,
				      void (*destroy)(void *))
{
	unsigned long flags;

	spin_lock_irqsave(&r->consumer_lock, flags);
	spin_lock(&r->producer_lock);

	if (!r->size)
		goto done;

	/*
	 * Clean out buffered entries (for simplicity). This way the following
	 * code can test entries for NULL and, if not NULL, assume they are
	 * valid.
	 */
	__ptr_ring_zero_tail(r, r->consumer_head);

	/*
	 * Go over entries in batch, start moving head back and copy entries.
	 * Stop when we run into previously unconsumed entries.
	 */
	while (n) {
		int head = r->consumer_head - 1;
		if (head < 0)
			head = r->size - 1;
		if (r->queue[head]) {
			/* This batch entry will have to be destroyed. */
			goto done;
		}
		r->queue[head] = batch[--n];
		r->consumer_tail = head;
		/* matching READ_ONCE in __ptr_ring_empty for lockless tests */
		WRITE_ONCE(r->consumer_head, head);
	}

done:
	/* Destroy all entries left in the batch. */
	while (n)
		destroy(batch[--n]);
	spin_unlock(&r->producer_lock);
	spin_unlock_irqrestore(&r->consumer_lock, flags);
}

static inline void **__ptr_ring_swap_queue(struct ptr_ring *r, void **queue,
					   int size, gfp_t gfp,
					   void (*destroy)(void *))
{
	int producer = 0;
	void **old;
	void *ptr;

	while ((ptr = __ptr_ring_consume(r)))
		if (producer < size)
			queue[producer++] = ptr;
		else if (destroy)
			destroy(ptr);

	if (producer >= size)
		producer = 0;
	__ptr_ring_set_size(r, size);
	r->producer = producer;
	r->consumer_head = 0;
	r->consumer_tail = 0;
	old = r->queue;
	r->queue = queue;

	return old;
}

/*
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume the ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline int ptr_ring_resize_noprof(struct ptr_ring *r, int size, gfp_t gfp,
					 void (*destroy)(void *))
{
	unsigned long flags;
	void **queue = __ptr_ring_init_queue_alloc_noprof(size, gfp);
	void **old;

	if (!queue)
		return -ENOMEM;

	spin_lock_irqsave(&(r)->consumer_lock, flags);
	spin_lock(&(r)->producer_lock);

	old = __ptr_ring_swap_queue(r, queue, size, gfp, destroy);

	spin_unlock(&(r)->producer_lock);
	spin_unlock_irqrestore(&(r)->consumer_lock, flags);

	kvfree(old);

	return 0;
}
#define ptr_ring_resize(...)	alloc_hooks(ptr_ring_resize_noprof(__VA_ARGS__))

/*
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume the ring in BH context, you must
 * disable BH when doing so.
 */
static inline int ptr_ring_resize_multiple_bh_noprof(struct ptr_ring **rings,
						     unsigned int nrings,
						     int size, gfp_t gfp,
						     void (*destroy)(void *))
{
	void ***queues;
	int i;

	queues = kmalloc_array_noprof(nrings, sizeof(*queues), gfp);
	if (!queues)
		goto noqueues;

	for (i = 0; i < nrings; ++i) {
		queues[i] = __ptr_ring_init_queue_alloc_noprof(size, gfp);
		if (!queues[i])
			goto nomem;
	}

	for (i = 0; i < nrings; ++i) {
		spin_lock_bh(&(rings[i])->consumer_lock);
		spin_lock(&(rings[i])->producer_lock);
		queues[i] = __ptr_ring_swap_queue(rings[i], queues[i],
						  size, gfp, destroy);
		spin_unlock(&(rings[i])->producer_lock);
		spin_unlock_bh(&(rings[i])->consumer_lock);
	}

	for (i = 0; i < nrings; ++i)
		kvfree(queues[i]);

	kfree(queues);

	return 0;

nomem:
	while (--i >= 0)
		kvfree(queues[i]);

	kfree(queues);

noqueues:
	return -ENOMEM;
}
#define ptr_ring_resize_multiple_bh(...) \
	alloc_hooks(ptr_ring_resize_multiple_bh_noprof(__VA_ARGS__))
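/*
 * Editorial usage sketch (not part of the upstream header): growing a ring at
 * runtime. Entries that do not fit into the new queue are passed to the
 * destroy callback; on allocation failure the old queue is left in place.
 * free_obj() is a hypothetical callback. As the notes above explain, the
 * consumer lock is taken with interrupts disabled here, so interrupt/BH
 * consumers must use the matching locking variants.
 *
 *	if (ptr_ring_resize(&ring, 256, GFP_KERNEL, free_obj))
 *		return -ENOMEM;
 */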
static inline void ptr_ring_cleanup(struct ptr_ring *r, void (*destroy)(void *))
{
	void *ptr;

	if (destroy)
		while ((ptr = ptr_ring_consume(r)))
			destroy(ptr);
	kvfree(r->queue);
}

#endif /* _LINUX_PTR_RING_H */