/*
 * Definitions for the 'struct ptr_ring' datastructure.
 *
 * Author:
 *	Michael S. Tsirkin <mst@redhat.com>
 *
 * Copyright (C) 2016 Red Hat, Inc.
 *
 * This program is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; either version 2 of the License, or (at your
 * option) any later version.
 *
 * This is a limited-size FIFO maintaining pointers in FIFO order, with
 * one CPU producing entries and another CPU consuming them.
 *
 * This implementation tries to minimize cache-contention when there is a
 * single producer and a single consumer CPU.
 */

#ifndef _LINUX_PTR_RING_H
#define _LINUX_PTR_RING_H 1

#ifdef __KERNEL__
#include <linux/spinlock.h>
#include <linux/cache.h>
#include <linux/types.h>
#include <linux/compiler.h>
#include <linux/slab.h>
#include <asm/errno.h>
#endif

struct ptr_ring {
	int producer ____cacheline_aligned_in_smp;
	spinlock_t producer_lock;
	int consumer_head ____cacheline_aligned_in_smp; /* next valid entry */
	int consumer_tail; /* next entry to invalidate */
	spinlock_t consumer_lock;
	/* Shared consumer/producer data */
	/* Read-only by both the producer and the consumer */
	int size ____cacheline_aligned_in_smp; /* max entries in queue */
	int batch; /* number of entries to consume in a batch */
	void **queue;
};

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax().
 *
 * NB: this is unlike __ptr_ring_empty in that callers must hold producer_lock:
 * see e.g. ptr_ring_full.
 */
static inline bool __ptr_ring_full(struct ptr_ring *r)
{
	return r->queue[r->producer];
}

static inline bool ptr_ring_full(struct ptr_ring *r)
{
	bool ret;

	spin_lock(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock(&r->producer_lock);

	return ret;
}

static inline bool ptr_ring_full_irq(struct ptr_ring *r)
{
	bool ret;

	spin_lock_irq(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock_irq(&r->producer_lock);

	return ret;
}

static inline bool ptr_ring_full_any(struct ptr_ring *r)
{
	unsigned long flags;
	bool ret;

	spin_lock_irqsave(&r->producer_lock, flags);
	ret = __ptr_ring_full(r);
	spin_unlock_irqrestore(&r->producer_lock, flags);

	return ret;
}

static inline bool ptr_ring_full_bh(struct ptr_ring *r)
{
	bool ret;

	spin_lock_bh(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock_bh(&r->producer_lock);

	return ret;
}

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). Callers must hold producer_lock.
 * Callers are responsible for making sure the pointer being queued
 * points to valid data.
 */
static inline int __ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
	if (unlikely(!r->size) || r->queue[r->producer])
		return -ENOSPC;

	/* Make sure the pointer we are storing points to valid data. */
	/* Pairs with the READ_ONCE in __ptr_ring_peek, called from __ptr_ring_consume. */
	smp_wmb();

	WRITE_ONCE(r->queue[r->producer++], ptr);
	if (unlikely(r->producer >= r->size))
		r->producer = 0;
	return 0;
}
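/* Illustrative sketch only (hypothetical helper, not part of this header):
 * waiting for a free slot before producing, per the note above
 * __ptr_ring_full() that loops need a compiler barrier such as cpu_relax().
 * Assumes a concurrent consumer is draining the ring; it takes consumer_lock,
 * so holding producer_lock here does not block it.
 */
static inline void example_busy_wait_produce(struct ptr_ring *r, void *ptr)
{
	spin_lock(&r->producer_lock);
	while (__ptr_ring_full(r))
		cpu_relax();		/* barrier between re-reads of the slot */
	__ptr_ring_produce(r, ptr);	/* a free slot was just observed */
	spin_unlock(&r->producer_lock);
}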
/*
 * Note: resize (below) nests producer lock within consumer lock, so if you
 * consume in interrupt or BH context, you must disable interrupts/BH when
 * calling this.
 */
static inline int ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock(&r->producer_lock);

	return ret;
}

static inline int ptr_ring_produce_irq(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock_irq(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_irq(&r->producer_lock);

	return ret;
}

static inline int ptr_ring_produce_any(struct ptr_ring *r, void *ptr)
{
	unsigned long flags;
	int ret;

	spin_lock_irqsave(&r->producer_lock, flags);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_irqrestore(&r->producer_lock, flags);

	return ret;
}

static inline int ptr_ring_produce_bh(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock_bh(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_bh(&r->producer_lock);

	return ret;
}

static inline void *__ptr_ring_peek(struct ptr_ring *r)
{
	if (likely(r->size))
		return READ_ONCE(r->queue[r->consumer_head]);
	return NULL;
}

/*
 * Test ring empty status without taking any locks.
 *
 * NB: This is only safe to call if the ring is never resized.
 *
 * However, if some other CPU consumes ring entries at the same time, the value
 * returned is not guaranteed to be correct.
 *
 * In this case - to avoid incorrectly detecting the ring
 * as empty - the CPU consuming the ring entries is responsible
 * for either consuming all ring entries until the ring is empty,
 * or synchronizing with some other CPU and causing it to
 * re-test __ptr_ring_empty and/or consume the ring entries
 * after the synchronization point.
 *
 * Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax().
 */
static inline bool __ptr_ring_empty(struct ptr_ring *r)
{
	if (likely(r->size))
		return !r->queue[READ_ONCE(r->consumer_head)];
	return true;
}

static inline bool ptr_ring_empty(struct ptr_ring *r)
{
	bool ret;

	spin_lock(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock(&r->consumer_lock);

	return ret;
}

static inline bool ptr_ring_empty_irq(struct ptr_ring *r)
{
	bool ret;

	spin_lock_irq(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock_irq(&r->consumer_lock);

	return ret;
}

static inline bool ptr_ring_empty_any(struct ptr_ring *r)
{
	unsigned long flags;
	bool ret;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ret = __ptr_ring_empty(r);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ret;
}

static inline bool ptr_ring_empty_bh(struct ptr_ring *r)
{
	bool ret;

	spin_lock_bh(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock_bh(&r->consumer_lock);

	return ret;
}
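/* Illustrative sketch only (hypothetical helpers, not part of this header):
 * picking the lock flavour that matches the calling context, as the resize
 * note above requires when the ring is also consumed from IRQ/BH context.
 */
static inline int example_produce_from_task(struct ptr_ring *r, void *ptr)
{
	/* Process context, consumer runs in BH: keep BH off while the
	 * producer lock is held.
	 */
	return ptr_ring_produce_bh(r, ptr);
}

static inline int example_produce_any_context(struct ptr_ring *r, void *ptr)
{
	/* Unknown or mixed context: the irqsave variant is always safe. */
	return ptr_ring_produce_any(r, ptr);
}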
/* Must only be called after __ptr_ring_peek returned !NULL */
static inline void __ptr_ring_discard_one(struct ptr_ring *r)
{
	/* Fundamentally, what we want to do is update consumer
	 * index and zero out the entry so producer can reuse it.
	 * Doing it naively at each consume would be as simple as:
	 *       consumer = r->consumer;
	 *       r->queue[consumer++] = NULL;
	 *       if (unlikely(consumer >= r->size))
	 *               consumer = 0;
	 *       r->consumer = consumer;
	 * but that is suboptimal when the ring is full as producer is writing
	 * out new entries in the same cache line.  Defer these updates until a
	 * batch of entries has been consumed.
	 */
	/* Note: we must keep consumer_head valid at all times for __ptr_ring_empty
	 * to work correctly.
	 */
	int consumer_head = r->consumer_head;
	int head = consumer_head++;

	/* Once we have processed enough entries invalidate them in
	 * the ring all at once so producer can reuse their space in the ring.
	 * We also do this when we reach end of the ring - not mandatory
	 * but helps keep the implementation simple.
	 */
	if (unlikely(consumer_head - r->consumer_tail >= r->batch ||
		     consumer_head >= r->size)) {
		/* Zero out entries in reverse order: this way the cache line
		 * that the producer might currently be reading is touched last;
		 * the producer won't make progress and touch other cache lines
		 * besides the first one until we write out all entries.
		 */
		while (likely(head >= r->consumer_tail))
			r->queue[head--] = NULL;
		r->consumer_tail = consumer_head;
	}
	if (unlikely(consumer_head >= r->size)) {
		consumer_head = 0;
		r->consumer_tail = 0;
	}
	/* matching READ_ONCE in __ptr_ring_empty for lockless tests */
	WRITE_ONCE(r->consumer_head, consumer_head);
}

static inline void *__ptr_ring_consume(struct ptr_ring *r)
{
	void *ptr;

	/* The READ_ONCE in __ptr_ring_peek guarantees that anyone
	 * accessing data through the pointer is up to date. Pairs
	 * with smp_wmb in __ptr_ring_produce.
	 */
	ptr = __ptr_ring_peek(r);
	if (ptr)
		__ptr_ring_discard_one(r);

	return ptr;
}

static inline int __ptr_ring_consume_batched(struct ptr_ring *r,
					     void **array, int n)
{
	void *ptr;
	int i;

	for (i = 0; i < n; i++) {
		ptr = __ptr_ring_consume(r);
		if (!ptr)
			break;
		array[i] = ptr;
	}

	return i;
}
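/* Illustrative sketch only (hypothetical helper, not part of this header):
 * drain up to a small fixed batch while holding consumer_lock, then process
 * the entries with the lock dropped.  process() is a caller-supplied callback.
 */
static inline int example_drain_batch(struct ptr_ring *r,
				      void (*process)(void *))
{
	void *batch[16];
	int i, n;

	spin_lock(&r->consumer_lock);
	n = __ptr_ring_consume_batched(r, batch, 16);
	spin_unlock(&r->consumer_lock);

	for (i = 0; i < n; i++)
		process(batch[i]);

	return n;
}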
/*
 * Note: resize (below) nests producer lock within consumer lock, so if you
 * call this in interrupt or BH context, you must disable interrupts/BH when
 * producing.
 */
static inline void *ptr_ring_consume(struct ptr_ring *r)
{
	void *ptr;

	spin_lock(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock(&r->consumer_lock);

	return ptr;
}

static inline void *ptr_ring_consume_irq(struct ptr_ring *r)
{
	void *ptr;

	spin_lock_irq(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock_irq(&r->consumer_lock);

	return ptr;
}

static inline void *ptr_ring_consume_any(struct ptr_ring *r)
{
	unsigned long flags;
	void *ptr;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ptr = __ptr_ring_consume(r);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ptr;
}

static inline void *ptr_ring_consume_bh(struct ptr_ring *r)
{
	void *ptr;

	spin_lock_bh(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock_bh(&r->consumer_lock);

	return ptr;
}

static inline int ptr_ring_consume_batched(struct ptr_ring *r,
					   void **array, int n)
{
	int ret;

	spin_lock(&r->consumer_lock);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock(&r->consumer_lock);

	return ret;
}

static inline int ptr_ring_consume_batched_irq(struct ptr_ring *r,
					       void **array, int n)
{
	int ret;

	spin_lock_irq(&r->consumer_lock);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock_irq(&r->consumer_lock);

	return ret;
}

static inline int ptr_ring_consume_batched_any(struct ptr_ring *r,
					       void **array, int n)
{
	unsigned long flags;
	int ret;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ret;
}

static inline int ptr_ring_consume_batched_bh(struct ptr_ring *r,
					      void **array, int n)
{
	int ret;

	spin_lock_bh(&r->consumer_lock);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock_bh(&r->consumer_lock);

	return ret;
}
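/* Illustrative sketch only (hypothetical helper, not part of this header):
 * a budget-limited consumer loop in the style of a NAPI poll handler, using
 * the _bh variant on the assumption that producers follow the locking rules
 * in the comment above ptr_ring_consume().
 */
static inline int example_consume_budget(struct ptr_ring *r, int budget,
					 void (*process)(void *))
{
	void *ptr;
	int done = 0;

	while (done < budget && (ptr = ptr_ring_consume_bh(r))) {
		process(ptr);
		done++;
	}

	return done;
}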
/* Cast to structure type and call a function without discarding from FIFO.
 * Function must return a value.
 * Callers must take consumer_lock.
 */
#define __PTR_RING_PEEK_CALL(r, f) ((f)(__ptr_ring_peek(r)))

#define PTR_RING_PEEK_CALL(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_IRQ(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock_irq(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_irq(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_BH(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock_bh(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_bh(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_ANY(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	unsigned long __PTR_RING_PEEK_CALL_f; \
	\
	spin_lock_irqsave(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_irqrestore(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
	__PTR_RING_PEEK_CALL_v; \
})

/* Not all gfp_t flags (besides GFP_KERNEL) are allowed. See the vmalloc
 * documentation for which of them are legal.
 */
static inline void **__ptr_ring_init_queue_alloc(unsigned int size, gfp_t gfp)
{
	if (size > KMALLOC_MAX_SIZE / sizeof(void *))
		return NULL;
	return kvmalloc_array(size, sizeof(void *), gfp | __GFP_ZERO);
}

static inline void __ptr_ring_set_size(struct ptr_ring *r, int size)
{
	r->size = size;
	r->batch = SMP_CACHE_BYTES * 2 / sizeof(*(r->queue));
	/* We need to set batch at least to 1 to make logic
	 * in __ptr_ring_discard_one work correctly.
	 * Batching too much (because ring is small) would cause a lot of
	 * burstiness. Needs tuning, for now disable batching.
	 */
	if (r->batch > r->size / 2 || !r->batch)
		r->batch = 1;
}

static inline int ptr_ring_init(struct ptr_ring *r, int size, gfp_t gfp)
{
	r->queue = __ptr_ring_init_queue_alloc(size, gfp);
	if (!r->queue)
		return -ENOMEM;

	__ptr_ring_set_size(r, size);
	r->producer = r->consumer_head = r->consumer_tail = 0;
	spin_lock_init(&r->producer_lock);
	spin_lock_init(&r->consumer_lock);

	return 0;
}
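/* Illustrative sketch only (hypothetical helpers, not part of this header):
 * inspecting the head entry without dequeueing it via PTR_RING_PEEK_CALL.
 * The callback must tolerate NULL, which __ptr_ring_peek() returns when the
 * ring is empty.
 */
static inline int example_entry_len(void *ptr)
{
	return ptr ? 1 : 0;	/* a real user might return e.g. skb->len */
}

static inline int example_peek_len(struct ptr_ring *r)
{
	return PTR_RING_PEEK_CALL(r, example_entry_len);
}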
/*
 * Return entries into ring. Destroy entries that don't fit.
 *
 * Note: this is expected to be a rare slow path operation.
 *
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline void ptr_ring_unconsume(struct ptr_ring *r, void **batch, int n,
				      void (*destroy)(void *))
{
	unsigned long flags;
	int head;

	spin_lock_irqsave(&r->consumer_lock, flags);
	spin_lock(&r->producer_lock);

	if (!r->size)
		goto done;

	/*
	 * Clean out buffered entries (for simplicity). This way the following
	 * code can test entries for NULL and, if non-NULL, assume they are valid.
	 */
	head = r->consumer_head - 1;
	while (likely(head >= r->consumer_tail))
		r->queue[head--] = NULL;
	r->consumer_tail = r->consumer_head;

	/*
	 * Go over entries in batch, start moving head back and copy entries.
	 * Stop when we run into previously unconsumed entries.
	 */
	while (n) {
		head = r->consumer_head - 1;
		if (head < 0)
			head = r->size - 1;
		if (r->queue[head]) {
			/* This batch entry will have to be destroyed. */
			goto done;
		}
		r->queue[head] = batch[--n];
		r->consumer_tail = head;
		/* matching READ_ONCE in __ptr_ring_empty for lockless tests */
		WRITE_ONCE(r->consumer_head, head);
	}

done:
	/* Destroy all entries left in the batch. */
	while (n)
		destroy(batch[--n]);
	spin_unlock(&r->producer_lock);
	spin_unlock_irqrestore(&r->consumer_lock, flags);
}

static inline void **__ptr_ring_swap_queue(struct ptr_ring *r, void **queue,
					   int size, gfp_t gfp,
					   void (*destroy)(void *))
{
	int producer = 0;
	void **old;
	void *ptr;

	while ((ptr = __ptr_ring_consume(r)))
		if (producer < size)
			queue[producer++] = ptr;
		else if (destroy)
			destroy(ptr);

	if (producer >= size)
		producer = 0;
	__ptr_ring_set_size(r, size);
	r->producer = producer;
	r->consumer_head = 0;
	r->consumer_tail = 0;
	old = r->queue;
	r->queue = queue;

	return old;
}

/*
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline int ptr_ring_resize(struct ptr_ring *r, int size, gfp_t gfp,
				  void (*destroy)(void *))
{
	unsigned long flags;
	void **queue = __ptr_ring_init_queue_alloc(size, gfp);
	void **old;

	if (!queue)
		return -ENOMEM;

	spin_lock_irqsave(&(r)->consumer_lock, flags);
	spin_lock(&(r)->producer_lock);

	old = __ptr_ring_swap_queue(r, queue, size, gfp, destroy);

	spin_unlock(&(r)->producer_lock);
	spin_unlock_irqrestore(&(r)->consumer_lock, flags);

	kvfree(old);

	return 0;
}
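/* Illustrative sketch only (hypothetical helper, not part of this header):
 * doubling a ring's size.  No destroy callback is needed since nothing is
 * dropped when growing; reading r->size outside the locks is only safe if no
 * concurrent resize can run.  The caller must respect the IRQ/BH nesting
 * rules in the comment above ptr_ring_resize().
 */
static inline int example_grow_ring(struct ptr_ring *r, gfp_t gfp)
{
	return ptr_ring_resize(r, r->size * 2, gfp, NULL);
}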
/*
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline int ptr_ring_resize_multiple(struct ptr_ring **rings,
					   unsigned int nrings,
					   int size,
					   gfp_t gfp, void (*destroy)(void *))
{
	unsigned long flags;
	void ***queues;
	int i;

	queues = kmalloc_array(nrings, sizeof(*queues), gfp);
	if (!queues)
		goto noqueues;

	for (i = 0; i < nrings; ++i) {
		queues[i] = __ptr_ring_init_queue_alloc(size, gfp);
		if (!queues[i])
			goto nomem;
	}

	for (i = 0; i < nrings; ++i) {
		spin_lock_irqsave(&(rings[i])->consumer_lock, flags);
		spin_lock(&(rings[i])->producer_lock);
		queues[i] = __ptr_ring_swap_queue(rings[i], queues[i],
						  size, gfp, destroy);
		spin_unlock(&(rings[i])->producer_lock);
		spin_unlock_irqrestore(&(rings[i])->consumer_lock, flags);
	}

	for (i = 0; i < nrings; ++i)
		kvfree(queues[i]);

	kfree(queues);

	return 0;

nomem:
	while (--i >= 0)
		kvfree(queues[i]);

	kfree(queues);

noqueues:
	return -ENOMEM;
}

static inline void ptr_ring_cleanup(struct ptr_ring *r, void (*destroy)(void *))
{
	void *ptr;

	if (destroy)
		while ((ptr = ptr_ring_consume(r)))
			destroy(ptr);
	kvfree(r->queue);
}

#endif /* _LINUX_PTR_RING_H */
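/*
 * Example usage (an illustrative sketch, not part of this header): a typical
 * single-producer/single-consumer lifecycle.  "struct foo", obj and obj_free()
 * (which takes a void *) are hypothetical.
 *
 *	struct ptr_ring ring;
 *	struct foo *obj;
 *
 *	if (ptr_ring_init(&ring, 256, GFP_KERNEL))
 *		return -ENOMEM;
 *
 *	// producer context
 *	if (ptr_ring_produce(&ring, obj))
 *		obj_free(obj);			// ring was full
 *
 *	// consumer context
 *	while ((obj = ptr_ring_consume(&ring)))
 *		obj_free(obj);
 *
 *	// teardown, once producer and consumer can no longer run
 *	ptr_ring_cleanup(&ring, obj_free);
 */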