/* SPDX-License-Identifier: GPL-2.0-or-later */
/*
 *	Definitions for the 'struct ptr_ring' data structure.
 *
 *	Author:
 *		Michael S. Tsirkin <mst@redhat.com>
 *
 *	Copyright (C) 2016 Red Hat, Inc.
 *
 *	This is a limited-size FIFO maintaining pointers in FIFO order, with
 *	one CPU producing entries and another consuming them.
 *
 *	This implementation tries to minimize cache contention when there is a
 *	single producer and a single consumer CPU.
 */

#ifndef _LINUX_PTR_RING_H
#define _LINUX_PTR_RING_H 1

#ifdef __KERNEL__
#include <linux/spinlock.h>
#include <linux/cache.h>
#include <linux/types.h>
#include <linux/compiler.h>
#include <linux/slab.h>
#include <asm/errno.h>
#endif

struct ptr_ring {
        int producer ____cacheline_aligned_in_smp;
        spinlock_t producer_lock;
        int consumer_head ____cacheline_aligned_in_smp; /* next valid entry */
        int consumer_tail; /* next entry to invalidate */
        spinlock_t consumer_lock;
        /* Shared consumer/producer data */
        /* Read-only by both the producer and the consumer */
        int size ____cacheline_aligned_in_smp; /* max entries in queue */
        int batch; /* number of entries to consume in a batch */
        void **queue;
};

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax().
 *
 * NB: this is unlike __ptr_ring_empty in that callers must hold producer_lock:
 * see e.g. ptr_ring_full.
 */
static inline bool __ptr_ring_full(struct ptr_ring *r)
{
        return r->queue[r->producer];
}

static inline bool ptr_ring_full(struct ptr_ring *r)
{
        bool ret;

        spin_lock(&r->producer_lock);
        ret = __ptr_ring_full(r);
        spin_unlock(&r->producer_lock);

        return ret;
}

static inline bool ptr_ring_full_irq(struct ptr_ring *r)
{
        bool ret;

        spin_lock_irq(&r->producer_lock);
        ret = __ptr_ring_full(r);
        spin_unlock_irq(&r->producer_lock);

        return ret;
}

static inline bool ptr_ring_full_any(struct ptr_ring *r)
{
        unsigned long flags;
        bool ret;

        spin_lock_irqsave(&r->producer_lock, flags);
        ret = __ptr_ring_full(r);
        spin_unlock_irqrestore(&r->producer_lock, flags);

        return ret;
}

static inline bool ptr_ring_full_bh(struct ptr_ring *r)
{
        bool ret;

        spin_lock_bh(&r->producer_lock);
        ret = __ptr_ring_full(r);
        spin_unlock_bh(&r->producer_lock);

        return ret;
}

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). Callers must hold producer_lock.
 * Callers are responsible for making sure the pointer being queued
 * points to valid data.
 */
static inline int __ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
        if (unlikely(!r->size) || r->queue[r->producer])
                return -ENOSPC;

        /* Make sure the pointer we are storing points to valid data. */
        /* Pairs with smp_read_barrier_depends in __ptr_ring_consume. */
        smp_wmb();

        WRITE_ONCE(r->queue[r->producer++], ptr);
        if (unlikely(r->producer >= r->size))
                r->producer = 0;
        return 0;
}
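/*
 * Example (illustrative sketch, not part of the original header): the
 * unlocked primitive above makes it possible to queue a whole batch of
 * pointers under a single producer_lock acquisition, mirroring
 * __ptr_ring_consume_batched() further down.  The example_* name is
 * hypothetical, and plain spin_lock() assumes both producers and consumers
 * run in process context only.
 */
static inline int example_ptr_ring_produce_batch(struct ptr_ring *r,
                                                 void **batch, int n)
{
        int i;

        spin_lock(&r->producer_lock);
        for (i = 0; i < n; i++)
                if (__ptr_ring_produce(r, batch[i]))
                        break;  /* ring full: caller keeps batch[i..n) */
        spin_unlock(&r->producer_lock);

        return i;       /* number of entries actually queued */
}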
/*
 * Note: resize (below) nests producer lock within consumer lock, so if you
 * consume in interrupt or BH context, you must disable interrupts/BH when
 * calling this.
 */
static inline int ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
        int ret;

        spin_lock(&r->producer_lock);
        ret = __ptr_ring_produce(r, ptr);
        spin_unlock(&r->producer_lock);

        return ret;
}

static inline int ptr_ring_produce_irq(struct ptr_ring *r, void *ptr)
{
        int ret;

        spin_lock_irq(&r->producer_lock);
        ret = __ptr_ring_produce(r, ptr);
        spin_unlock_irq(&r->producer_lock);

        return ret;
}

static inline int ptr_ring_produce_any(struct ptr_ring *r, void *ptr)
{
        unsigned long flags;
        int ret;

        spin_lock_irqsave(&r->producer_lock, flags);
        ret = __ptr_ring_produce(r, ptr);
        spin_unlock_irqrestore(&r->producer_lock, flags);

        return ret;
}

static inline int ptr_ring_produce_bh(struct ptr_ring *r, void *ptr)
{
        int ret;

        spin_lock_bh(&r->producer_lock);
        ret = __ptr_ring_produce(r, ptr);
        spin_unlock_bh(&r->producer_lock);

        return ret;
}
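/*
 * Example (illustrative sketch, not part of the original header): picking
 * the right variant.  Here the consumer is assumed to run in softirq (BH)
 * context, so per the note above the producer disables BH around
 * producer_lock; a full ring is handled by dropping the object.  The
 * example_* name and the kfree()-on-drop policy are hypothetical.
 */
static inline bool example_produce_for_bh_consumer(struct ptr_ring *r,
                                                   void *obj)
{
        if (ptr_ring_produce_bh(r, obj)) {
                kfree(obj);     /* ring full: drop the object */
                return false;
        }
        return true;
}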
static inline void *__ptr_ring_peek(struct ptr_ring *r)
{
        if (likely(r->size))
                return READ_ONCE(r->queue[r->consumer_head]);
        return NULL;
}

/*
 * Test ring empty status without taking any locks.
 *
 * NB: This is only safe to call if the ring is never resized.
 *
 * However, if some other CPU consumes ring entries at the same time, the value
 * returned is not guaranteed to be correct.
 *
 * In this case - to avoid incorrectly detecting the ring
 * as empty - the CPU consuming the ring entries is responsible
 * for either consuming all ring entries until the ring is empty,
 * or synchronizing with some other CPU and causing it to
 * re-test __ptr_ring_empty and/or consume the ring entries
 * after the synchronization point.
 *
 * Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax().
 */
static inline bool __ptr_ring_empty(struct ptr_ring *r)
{
        if (likely(r->size))
                return !r->queue[READ_ONCE(r->consumer_head)];
        return true;
}

static inline bool ptr_ring_empty(struct ptr_ring *r)
{
        bool ret;

        spin_lock(&r->consumer_lock);
        ret = __ptr_ring_empty(r);
        spin_unlock(&r->consumer_lock);

        return ret;
}

static inline bool ptr_ring_empty_irq(struct ptr_ring *r)
{
        bool ret;

        spin_lock_irq(&r->consumer_lock);
        ret = __ptr_ring_empty(r);
        spin_unlock_irq(&r->consumer_lock);

        return ret;
}

static inline bool ptr_ring_empty_any(struct ptr_ring *r)
{
        unsigned long flags;
        bool ret;

        spin_lock_irqsave(&r->consumer_lock, flags);
        ret = __ptr_ring_empty(r);
        spin_unlock_irqrestore(&r->consumer_lock, flags);

        return ret;
}

static inline bool ptr_ring_empty_bh(struct ptr_ring *r)
{
        bool ret;

        spin_lock_bh(&r->consumer_lock);
        ret = __ptr_ring_empty(r);
        spin_unlock_bh(&r->consumer_lock);

        return ret;
}
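/*
 * Example (illustrative sketch, not part of the original header): the
 * consuming CPU polling for new entries with the lockless test above,
 * bounded by a budget.  As the comment above requires, this is only valid
 * if the ring is never resized and only the consuming CPU (or a CPU it
 * synchronizes with) trusts the result; cpu_relax() provides the required
 * compiler barrier.  The example_* name and the budget are hypothetical.
 */
static inline bool example_poll_until_nonempty(struct ptr_ring *r, int budget)
{
        while (__ptr_ring_empty(r) && budget-- > 0)
                cpu_relax();    /* compiler barrier, as required above */

        return !__ptr_ring_empty(r);
}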
/* Must only be called after __ptr_ring_peek returned !NULL */
static inline void __ptr_ring_discard_one(struct ptr_ring *r)
{
        /* Fundamentally, what we want to do is update the consumer
         * index and zero out the entry so the producer can reuse it.
         * Doing it naively at each consume would be as simple as:
         *       consumer = r->consumer;
         *       r->queue[consumer++] = NULL;
         *       if (unlikely(consumer >= r->size))
         *               consumer = 0;
         *       r->consumer = consumer;
         * but that is suboptimal when the ring is full as the producer is
         * writing out new entries in the same cache line.  Defer these
         * updates until a batch of entries has been consumed.
         */
        /* Note: we must keep consumer_head valid at all times for __ptr_ring_empty
         * to work correctly.
         */
        int consumer_head = r->consumer_head;
        int head = consumer_head++;

        /* Once we have processed enough entries, invalidate them in
         * the ring all at once so the producer can reuse their space.
         * We also do this when we reach the end of the ring - not mandatory
         * but it helps keep the implementation simple.
         */
        if (unlikely(consumer_head - r->consumer_tail >= r->batch ||
                     consumer_head >= r->size)) {
                /* Zero out entries in reverse order: this way the cache line
                 * that the producer might currently be reading is touched
                 * last; the producer won't make progress and touch other
                 * cache lines besides the first one until we write out all
                 * entries.
                 */
                while (likely(head >= r->consumer_tail))
                        r->queue[head--] = NULL;
                r->consumer_tail = consumer_head;
        }
        if (unlikely(consumer_head >= r->size)) {
                consumer_head = 0;
                r->consumer_tail = 0;
        }
        /* matching READ_ONCE in __ptr_ring_empty for lockless tests */
        WRITE_ONCE(r->consumer_head, consumer_head);
}

static inline void *__ptr_ring_consume(struct ptr_ring *r)
{
        void *ptr;

        /* The READ_ONCE in __ptr_ring_peek guarantees that data
         * accessed through the pointer is up to date. Pairs
         * with smp_wmb in __ptr_ring_produce.
         */
        ptr = __ptr_ring_peek(r);
        if (ptr)
                __ptr_ring_discard_one(r);

        return ptr;
}

static inline int __ptr_ring_consume_batched(struct ptr_ring *r,
                                             void **array, int n)
{
        void *ptr;
        int i;

        for (i = 0; i < n; i++) {
                ptr = __ptr_ring_consume(r);
                if (!ptr)
                        break;
                array[i] = ptr;
        }

        return i;
}
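/*
 * Example (illustrative sketch, not part of the original header): consume
 * up to one batch of entries while holding consumer_lock, then free them
 * outside the lock to keep the critical section short.  The example_* name,
 * the batch size and the kfree() disposal policy are hypothetical; plain
 * spin_lock() again assumes process context.
 */
static inline void example_free_one_batch(struct ptr_ring *r)
{
        void *batch[16];
        int i, n;

        spin_lock(&r->consumer_lock);
        n = __ptr_ring_consume_batched(r, batch, 16);
        spin_unlock(&r->consumer_lock);

        for (i = 0; i < n; i++)
                kfree(batch[i]);
}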
/*
 * Note: resize (below) nests producer lock within consumer lock, so if you
 * call this in interrupt or BH context, you must disable interrupts/BH when
 * producing.
 */
static inline void *ptr_ring_consume(struct ptr_ring *r)
{
        void *ptr;

        spin_lock(&r->consumer_lock);
        ptr = __ptr_ring_consume(r);
        spin_unlock(&r->consumer_lock);

        return ptr;
}

static inline void *ptr_ring_consume_irq(struct ptr_ring *r)
{
        void *ptr;

        spin_lock_irq(&r->consumer_lock);
        ptr = __ptr_ring_consume(r);
        spin_unlock_irq(&r->consumer_lock);

        return ptr;
}

static inline void *ptr_ring_consume_any(struct ptr_ring *r)
{
        unsigned long flags;
        void *ptr;

        spin_lock_irqsave(&r->consumer_lock, flags);
        ptr = __ptr_ring_consume(r);
        spin_unlock_irqrestore(&r->consumer_lock, flags);

        return ptr;
}

static inline void *ptr_ring_consume_bh(struct ptr_ring *r)
{
        void *ptr;

        spin_lock_bh(&r->consumer_lock);
        ptr = __ptr_ring_consume(r);
        spin_unlock_bh(&r->consumer_lock);

        return ptr;
}

static inline int ptr_ring_consume_batched(struct ptr_ring *r,
                                           void **array, int n)
{
        int ret;

        spin_lock(&r->consumer_lock);
        ret = __ptr_ring_consume_batched(r, array, n);
        spin_unlock(&r->consumer_lock);

        return ret;
}

static inline int ptr_ring_consume_batched_irq(struct ptr_ring *r,
                                               void **array, int n)
{
        int ret;

        spin_lock_irq(&r->consumer_lock);
        ret = __ptr_ring_consume_batched(r, array, n);
        spin_unlock_irq(&r->consumer_lock);

        return ret;
}

static inline int ptr_ring_consume_batched_any(struct ptr_ring *r,
                                               void **array, int n)
{
        unsigned long flags;
        int ret;

        spin_lock_irqsave(&r->consumer_lock, flags);
        ret = __ptr_ring_consume_batched(r, array, n);
        spin_unlock_irqrestore(&r->consumer_lock, flags);

        return ret;
}

static inline int ptr_ring_consume_batched_bh(struct ptr_ring *r,
                                              void **array, int n)
{
        int ret;

        spin_lock_bh(&r->consumer_lock);
        ret = __ptr_ring_consume_batched(r, array, n);
        spin_unlock_bh(&r->consumer_lock);

        return ret;
}

/* Cast to structure type and call a function without discarding from FIFO.
 * Function must return a value.
 * Callers must take consumer_lock.
 */
#define __PTR_RING_PEEK_CALL(r, f) ((f)(__ptr_ring_peek(r)))

#define PTR_RING_PEEK_CALL(r, f) ({ \
        typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
        \
        spin_lock(&(r)->consumer_lock); \
        __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
        spin_unlock(&(r)->consumer_lock); \
        __PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_IRQ(r, f) ({ \
        typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
        \
        spin_lock_irq(&(r)->consumer_lock); \
        __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
        spin_unlock_irq(&(r)->consumer_lock); \
        __PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_BH(r, f) ({ \
        typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
        \
        spin_lock_bh(&(r)->consumer_lock); \
        __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
        spin_unlock_bh(&(r)->consumer_lock); \
        __PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_ANY(r, f) ({ \
        typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
        unsigned long __PTR_RING_PEEK_CALL_f; \
        \
        spin_lock_irqsave(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
        __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
        spin_unlock_irqrestore(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
        __PTR_RING_PEEK_CALL_v; \
})
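/*
 * Example (illustrative sketch, not part of the original header): peek at
 * the head entry without consuming it.  The callback runs under
 * consumer_lock and must accept a NULL argument, both because the ring may
 * be empty and because the macro types its result via typeof((f)(NULL)).
 * struct example_item, example_item_len() and the -1 "empty" convention
 * are hypothetical.
 */
struct example_item {
        int len;
};

static inline int example_item_len(void *ptr)
{
        struct example_item *item = ptr;

        return item ? item->len : -1;   /* -1: ring is empty */
}

static inline int example_peek_head_len(struct ptr_ring *r)
{
        return PTR_RING_PEEK_CALL(r, example_item_len);
}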
/* Not all gfp_t flags (besides GFP_KERNEL) are allowed. See the
 * vmalloc() documentation for which of them are legal.
 */
static inline void **__ptr_ring_init_queue_alloc(unsigned int size, gfp_t gfp)
{
        if (size > KMALLOC_MAX_SIZE / sizeof(void *))
                return NULL;
        return kvmalloc_array(size, sizeof(void *), gfp | __GFP_ZERO);
}

static inline void __ptr_ring_set_size(struct ptr_ring *r, int size)
{
        r->size = size;
        r->batch = SMP_CACHE_BYTES * 2 / sizeof(*(r->queue));
        /* We need to set batch to at least 1 for the logic
         * in __ptr_ring_discard_one to work correctly.
         * Batching too much (because the ring is small) would cause a lot of
         * burstiness. This needs tuning; for now, disable batching.
         */
        if (r->batch > r->size / 2 || !r->batch)
                r->batch = 1;
}

static inline int ptr_ring_init(struct ptr_ring *r, int size, gfp_t gfp)
{
        r->queue = __ptr_ring_init_queue_alloc(size, gfp);
        if (!r->queue)
                return -ENOMEM;

        __ptr_ring_set_size(r, size);
        r->producer = r->consumer_head = r->consumer_tail = 0;
        spin_lock_init(&r->producer_lock);
        spin_lock_init(&r->consumer_lock);

        return 0;
}

/*
 * Return entries to the ring. Destroy entries that don't fit.
 *
 * Note: this is expected to be a rare slow path operation.
 *
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular, if you consume the ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline void ptr_ring_unconsume(struct ptr_ring *r, void **batch, int n,
                                      void (*destroy)(void *))
{
        unsigned long flags;
        int head;

        spin_lock_irqsave(&r->consumer_lock, flags);
        spin_lock(&r->producer_lock);

        if (!r->size)
                goto done;

        /*
         * Clean out buffered entries (for simplicity). This way the following
         * code can test entries for NULL and, if non-NULL, assume they are
         * valid.
         */
        head = r->consumer_head - 1;
        while (likely(head >= r->consumer_tail))
                r->queue[head--] = NULL;
        r->consumer_tail = r->consumer_head;

        /*
         * Go over the entries in the batch, moving the head back and copying
         * entries. Stop when we run into previously unconsumed entries.
         */
        while (n) {
                head = r->consumer_head - 1;
                if (head < 0)
                        head = r->size - 1;
                if (r->queue[head]) {
                        /* This batch entry will have to be destroyed. */
                        goto done;
                }
                r->queue[head] = batch[--n];
                r->consumer_tail = head;
                /* matching READ_ONCE in __ptr_ring_empty for lockless tests */
                WRITE_ONCE(r->consumer_head, head);
        }

done:
        /* Destroy all entries left in the batch. */
        while (n)
                destroy(batch[--n]);
        spin_unlock(&r->producer_lock);
        spin_unlock_irqrestore(&r->consumer_lock, flags);
}

static inline void **__ptr_ring_swap_queue(struct ptr_ring *r, void **queue,
                                           int size, gfp_t gfp,
                                           void (*destroy)(void *))
{
        int producer = 0;
        void **old;
        void *ptr;

        while ((ptr = __ptr_ring_consume(r)))
                if (producer < size)
                        queue[producer++] = ptr;
                else if (destroy)
                        destroy(ptr);

        if (producer >= size)
                producer = 0;
        __ptr_ring_set_size(r, size);
        r->producer = producer;
        r->consumer_head = 0;
        r->consumer_tail = 0;
        old = r->queue;
        r->queue = queue;

        return old;
}

/*
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular, if you consume the ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline int ptr_ring_resize(struct ptr_ring *r, int size, gfp_t gfp,
                                  void (*destroy)(void *))
{
        unsigned long flags;
        void **queue = __ptr_ring_init_queue_alloc(size, gfp);
        void **old;

        if (!queue)
                return -ENOMEM;

        spin_lock_irqsave(&(r)->consumer_lock, flags);
        spin_lock(&(r)->producer_lock);

        old = __ptr_ring_swap_queue(r, queue, size, gfp, destroy);

        spin_unlock(&(r)->producer_lock);
        spin_unlock_irqrestore(&(r)->consumer_lock, flags);

        kvfree(old);

        return 0;
}

/*
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular, if you consume the ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline int ptr_ring_resize_multiple(struct ptr_ring **rings,
                                           unsigned int nrings,
                                           int size,
                                           gfp_t gfp, void (*destroy)(void *))
{
        unsigned long flags;
        void ***queues;
        int i;

        queues = kmalloc_array(nrings, sizeof(*queues), gfp);
        if (!queues)
                goto noqueues;

        for (i = 0; i < nrings; ++i) {
                queues[i] = __ptr_ring_init_queue_alloc(size, gfp);
                if (!queues[i])
                        goto nomem;
        }

        for (i = 0; i < nrings; ++i) {
                spin_lock_irqsave(&(rings[i])->consumer_lock, flags);
                spin_lock(&(rings[i])->producer_lock);
                queues[i] = __ptr_ring_swap_queue(rings[i], queues[i],
                                                  size, gfp, destroy);
                spin_unlock(&(rings[i])->producer_lock);
                spin_unlock_irqrestore(&(rings[i])->consumer_lock, flags);
        }

        for (i = 0; i < nrings; ++i)
                kvfree(queues[i]);

        kfree(queues);

        return 0;

nomem:
        while (--i >= 0)
                kvfree(queues[i]);

        kfree(queues);

noqueues:
        return -ENOMEM;
}

static inline void ptr_ring_cleanup(struct ptr_ring *r, void (*destroy)(void *))
{
        void *ptr;

        if (destroy)
                while ((ptr = ptr_ring_consume(r)))
                        destroy(ptr);
        kvfree(r->queue);
}
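/*
 * Example (illustrative sketch, not part of the original header): a full
 * lifecycle from process context - initialize the ring, queue objects and
 * grow the ring when it fills up, then drain and tear it down.  All
 * example_* names, the sizes and the grow-on-full policy are hypothetical;
 * error handling is kept minimal.
 */
static inline void example_destroy_obj(void *ptr)
{
        kfree(ptr);
}

static inline int example_ptr_ring_lifecycle(void)
{
        struct ptr_ring ring;
        void *obj;
        int i, err;

        err = ptr_ring_init(&ring, 8, GFP_KERNEL);
        if (err)
                return err;

        for (i = 0; i < 16; i++) {
                obj = kzalloc(32, GFP_KERNEL);
                if (!obj)
                        break;
                if (ptr_ring_produce(&ring, obj)) {
                        /* Ring full: double it, then retry once or drop. */
                        if (ptr_ring_resize(&ring, ring.size * 2, GFP_KERNEL,
                                            example_destroy_obj) ||
                            ptr_ring_produce(&ring, obj))
                                kfree(obj);
                }
        }

        while ((obj = ptr_ring_consume(&ring)))
                kfree(obj);

        /* Entries still queued are released via the destroy callback. */
        ptr_ring_cleanup(&ring, example_destroy_obj);
        return 0;
}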
#endif /* _LINUX_PTR_RING_H */