/*
 * Definitions for the 'struct ptr_ring' data structure.
 *
 * Author:
 *	Michael S. Tsirkin <mst@redhat.com>
 *
 * Copyright (C) 2016 Red Hat, Inc.
 *
 * This program is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; either version 2 of the License, or (at your
 * option) any later version.
 *
 * This is a limited-size FIFO maintaining pointers in FIFO order, with
 * one CPU producing entries and another consuming entries from the FIFO.
 *
 * This implementation tries to minimize cache contention when there is a
 * single producer and a single consumer CPU.
 */

#ifndef _LINUX_PTR_RING_H
#define _LINUX_PTR_RING_H 1

#ifdef __KERNEL__
#include <linux/spinlock.h>
#include <linux/cache.h>
#include <linux/types.h>
#include <linux/compiler.h>
#include <linux/slab.h>
#include <asm/errno.h>
#endif

struct ptr_ring {
	int producer ____cacheline_aligned_in_smp;
	spinlock_t producer_lock;
	int consumer_head ____cacheline_aligned_in_smp; /* next valid entry */
	int consumer_tail; /* next entry to invalidate */
	spinlock_t consumer_lock;
	/* Shared consumer/producer data */
	/* Read-only by both the producer and the consumer */
	int size ____cacheline_aligned_in_smp; /* max entries in queue */
	int batch; /* number of entries to consume in a batch */
	void **queue;
};

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). If the ring is ever resized, callers must hold
 * producer_lock - see e.g. ptr_ring_full. Otherwise, if callers don't hold
 * producer_lock, the next call to __ptr_ring_produce may fail.
 */
static inline bool __ptr_ring_full(struct ptr_ring *r)
{
	return r->queue[r->producer];
}

static inline bool ptr_ring_full(struct ptr_ring *r)
{
	bool ret;

	spin_lock(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock(&r->producer_lock);

	return ret;
}

static inline bool ptr_ring_full_irq(struct ptr_ring *r)
{
	bool ret;

	spin_lock_irq(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock_irq(&r->producer_lock);

	return ret;
}

static inline bool ptr_ring_full_any(struct ptr_ring *r)
{
	unsigned long flags;
	bool ret;

	spin_lock_irqsave(&r->producer_lock, flags);
	ret = __ptr_ring_full(r);
	spin_unlock_irqrestore(&r->producer_lock, flags);

	return ret;
}

static inline bool ptr_ring_full_bh(struct ptr_ring *r)
{
	bool ret;

	spin_lock_bh(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock_bh(&r->producer_lock);

	return ret;
}
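/*
 * Illustrative sketch, not part of the original header: per the note above
 * __ptr_ring_full, a single producer that never resizes the ring may poll
 * for free space without taking producer_lock. example_wait_for_space() is
 * a hypothetical caller-side name.
 */
static inline void example_wait_for_space(struct ptr_ring *r)
{
	while (__ptr_ring_full(r))
		cpu_relax();	/* compiler barrier between re-reads */
}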
/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). Callers must hold producer_lock.
 * Callers are responsible for making sure the pointer that is being queued
 * points to valid data.
 */
static inline int __ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
	if (unlikely(!r->size) || r->queue[r->producer])
		return -ENOSPC;

	/* Make sure the pointer we are storing points to valid data. */
	/* Pairs with smp_read_barrier_depends in __ptr_ring_consume. */
	smp_wmb();

	r->queue[r->producer++] = ptr;
	if (unlikely(r->producer >= r->size))
		r->producer = 0;
	return 0;
}

/*
 * Note: resize (below) nests producer lock within consumer lock, so if you
 * consume in interrupt or BH context, you must disable interrupts/BH when
 * calling this.
 */
static inline int ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock(&r->producer_lock);

	return ret;
}

static inline int ptr_ring_produce_irq(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock_irq(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_irq(&r->producer_lock);

	return ret;
}

static inline int ptr_ring_produce_any(struct ptr_ring *r, void *ptr)
{
	unsigned long flags;
	int ret;

	spin_lock_irqsave(&r->producer_lock, flags);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_irqrestore(&r->producer_lock, flags);

	return ret;
}

static inline int ptr_ring_produce_bh(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock_bh(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_bh(&r->producer_lock);

	return ret;
}
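/*
 * Illustrative sketch, not part of the original header: per the note above
 * ptr_ring_produce, a ring that is consumed from BH context must be fed
 * with BH disabled. example_xmit() is a hypothetical producer path, and
 * kfree() stands in for whatever destructor matches the queued objects.
 */
static inline void example_xmit(struct ptr_ring *r, void *obj)
{
	if (ptr_ring_produce_bh(r, obj))
		kfree(obj);	/* ring full: drop rather than wait */
}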
/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). Callers must take consumer_lock
 * if they dereference the pointer - see e.g. PTR_RING_PEEK_CALL.
 * If the ring is never resized, and if the pointer is merely
 * tested, there's no need to take the lock - see e.g. __ptr_ring_empty.
 * However, if called outside the lock, and if some other CPU
 * consumes ring entries at the same time, the value returned
 * is not guaranteed to be correct.
 * In this case - to avoid incorrectly detecting the ring
 * as empty - the CPU consuming the ring entries is responsible
 * for either consuming all ring entries until the ring is empty,
 * or synchronizing with some other CPU and causing it to
 * execute __ptr_ring_peek and/or consume the ring entries
 * after the synchronization point.
 */
static inline void *__ptr_ring_peek(struct ptr_ring *r)
{
	if (likely(r->size))
		return r->queue[r->consumer_head];
	return NULL;
}

/* See __ptr_ring_peek above for locking rules. */
static inline bool __ptr_ring_empty(struct ptr_ring *r)
{
	return !__ptr_ring_peek(r);
}

static inline bool ptr_ring_empty(struct ptr_ring *r)
{
	bool ret;

	spin_lock(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock(&r->consumer_lock);

	return ret;
}

static inline bool ptr_ring_empty_irq(struct ptr_ring *r)
{
	bool ret;

	spin_lock_irq(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock_irq(&r->consumer_lock);

	return ret;
}

static inline bool ptr_ring_empty_any(struct ptr_ring *r)
{
	unsigned long flags;
	bool ret;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ret = __ptr_ring_empty(r);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ret;
}

static inline bool ptr_ring_empty_bh(struct ptr_ring *r)
{
	bool ret;

	spin_lock_bh(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock_bh(&r->consumer_lock);

	return ret;
}

/* Must only be called after __ptr_ring_peek returned !NULL */
static inline void __ptr_ring_discard_one(struct ptr_ring *r)
{
	/* Fundamentally, what we want to do is update consumer
	 * index and zero out the entry so producer can reuse it.
	 * Doing it naively at each consume would be as simple as:
	 *	r->queue[r->consumer++] = NULL;
	 *	if (unlikely(r->consumer >= r->size))
	 *		r->consumer = 0;
	 * but that is suboptimal when the ring is full as producer is writing
	 * out new entries in the same cache line. Defer these updates until a
	 * batch of entries has been consumed.
	 */
	int head = r->consumer_head++;

	/* Once we have processed enough entries invalidate them in
	 * the ring all at once so producer can reuse their space in the ring.
	 * We also do this when we reach the end of the ring - not mandatory
	 * but helps keep the implementation simple.
	 */
	if (unlikely(r->consumer_head - r->consumer_tail >= r->batch ||
		     r->consumer_head >= r->size)) {
		/* Zero out entries in reverse order: this way we touch the
		 * cache line that the producer might currently be reading
		 * last; the producer won't make progress and touch other
		 * cache lines besides the first one until we write out all
		 * entries.
		 */
		while (likely(head >= r->consumer_tail))
			r->queue[head--] = NULL;
		r->consumer_tail = r->consumer_head;
	}
	if (unlikely(r->consumer_head >= r->size)) {
		r->consumer_head = 0;
		r->consumer_tail = 0;
	}
}

static inline void *__ptr_ring_consume(struct ptr_ring *r)
{
	void *ptr;

	ptr = __ptr_ring_peek(r);
	if (ptr)
		__ptr_ring_discard_one(r);

	/* Make sure anyone accessing data through the pointer is up to date. */
	/* Pairs with smp_wmb in __ptr_ring_produce. */
	smp_read_barrier_depends();
	return ptr;
}

static inline int __ptr_ring_consume_batched(struct ptr_ring *r,
					     void **array, int n)
{
	void *ptr;
	int i;

	for (i = 0; i < n; i++) {
		ptr = __ptr_ring_consume(r);
		if (!ptr)
			break;
		array[i] = ptr;
	}

	return i;
}
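/*
 * Illustrative sketch, not part of the original header: a consumer that
 * drains the ring completely, which is one way to satisfy the requirement
 * above __ptr_ring_peek when another CPU checks __ptr_ring_empty without
 * the lock. example_drain() and the recycle callback are hypothetical.
 */
static inline void example_drain(struct ptr_ring *r, void (*recycle)(void *))
{
	void *ptr;

	spin_lock(&r->consumer_lock);
	while ((ptr = __ptr_ring_consume(r)))
		recycle(ptr);
	spin_unlock(&r->consumer_lock);
}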
/*
 * Note: resize (below) nests producer lock within consumer lock, so if you
 * call this in interrupt or BH context, you must disable interrupts/BH when
 * producing.
 */
static inline void *ptr_ring_consume(struct ptr_ring *r)
{
	void *ptr;

	spin_lock(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock(&r->consumer_lock);

	return ptr;
}

static inline void *ptr_ring_consume_irq(struct ptr_ring *r)
{
	void *ptr;

	spin_lock_irq(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock_irq(&r->consumer_lock);

	return ptr;
}

static inline void *ptr_ring_consume_any(struct ptr_ring *r)
{
	unsigned long flags;
	void *ptr;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ptr = __ptr_ring_consume(r);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ptr;
}

static inline void *ptr_ring_consume_bh(struct ptr_ring *r)
{
	void *ptr;

	spin_lock_bh(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock_bh(&r->consumer_lock);

	return ptr;
}

static inline int ptr_ring_consume_batched(struct ptr_ring *r,
					   void **array, int n)
{
	int ret;

	spin_lock(&r->consumer_lock);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock(&r->consumer_lock);

	return ret;
}

static inline int ptr_ring_consume_batched_irq(struct ptr_ring *r,
					       void **array, int n)
{
	int ret;

	spin_lock_irq(&r->consumer_lock);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock_irq(&r->consumer_lock);

	return ret;
}

static inline int ptr_ring_consume_batched_any(struct ptr_ring *r,
					       void **array, int n)
{
	unsigned long flags;
	int ret;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ret;
}

static inline int ptr_ring_consume_batched_bh(struct ptr_ring *r,
					      void **array, int n)
{
	int ret;

	spin_lock_bh(&r->consumer_lock);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock_bh(&r->consumer_lock);

	return ret;
}

/* Cast to structure type and call a function without discarding from FIFO.
 * Function must return a value.
 * Callers must take consumer_lock.
 */
#define __PTR_RING_PEEK_CALL(r, f) ((f)(__ptr_ring_peek(r)))

#define PTR_RING_PEEK_CALL(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_IRQ(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock_irq(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_irq(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_BH(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock_bh(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_bh(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_ANY(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	unsigned long __PTR_RING_PEEK_CALL_f; \
	\
	spin_lock_irqsave(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_irqrestore(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
	__PTR_RING_PEEK_CALL_v; \
})
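/*
 * Illustrative sketch, not part of the original header: peeking at the head
 * entry without consuming it. struct example_entry and example_peek_len()
 * are hypothetical; the callback must tolerate a NULL argument because the
 * ring may be empty, e.g.:
 *
 *	int len = PTR_RING_PEEK_CALL(r, example_peek_len);
 */
struct example_entry {
	int len;
};

static inline int example_peek_len(void *ptr)
{
	struct example_entry *e = ptr;

	return e ? e->len : 0;
}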
414 */ 415#define __PTR_RING_PEEK_CALL(r, f) ((f)(__ptr_ring_peek(r))) 416 417#define PTR_RING_PEEK_CALL(r, f) ({ \ 418 typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \ 419 \ 420 spin_lock(&(r)->consumer_lock); \ 421 __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \ 422 spin_unlock(&(r)->consumer_lock); \ 423 __PTR_RING_PEEK_CALL_v; \ 424}) 425 426#define PTR_RING_PEEK_CALL_IRQ(r, f) ({ \ 427 typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \ 428 \ 429 spin_lock_irq(&(r)->consumer_lock); \ 430 __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \ 431 spin_unlock_irq(&(r)->consumer_lock); \ 432 __PTR_RING_PEEK_CALL_v; \ 433}) 434 435#define PTR_RING_PEEK_CALL_BH(r, f) ({ \ 436 typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \ 437 \ 438 spin_lock_bh(&(r)->consumer_lock); \ 439 __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \ 440 spin_unlock_bh(&(r)->consumer_lock); \ 441 __PTR_RING_PEEK_CALL_v; \ 442}) 443 444#define PTR_RING_PEEK_CALL_ANY(r, f) ({ \ 445 typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \ 446 unsigned long __PTR_RING_PEEK_CALL_f;\ 447 \ 448 spin_lock_irqsave(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \ 449 __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \ 450 spin_unlock_irqrestore(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \ 451 __PTR_RING_PEEK_CALL_v; \ 452}) 453 454static inline void **__ptr_ring_init_queue_alloc(unsigned int size, gfp_t gfp) 455{ 456 return kcalloc(size, sizeof(void *), gfp); 457} 458 459static inline void __ptr_ring_set_size(struct ptr_ring *r, int size) 460{ 461 r->size = size; 462 r->batch = SMP_CACHE_BYTES * 2 / sizeof(*(r->queue)); 463 /* We need to set batch at least to 1 to make logic 464 * in __ptr_ring_discard_one work correctly. 465 * Batching too much (because ring is small) would cause a lot of 466 * burstiness. Needs tuning, for now disable batching. 467 */ 468 if (r->batch > r->size / 2 || !r->batch) 469 r->batch = 1; 470} 471 472static inline int ptr_ring_init(struct ptr_ring *r, int size, gfp_t gfp) 473{ 474 r->queue = __ptr_ring_init_queue_alloc(size, gfp); 475 if (!r->queue) 476 return -ENOMEM; 477 478 __ptr_ring_set_size(r, size); 479 r->producer = r->consumer_head = r->consumer_tail = 0; 480 spin_lock_init(&r->producer_lock); 481 spin_lock_init(&r->consumer_lock); 482 483 return 0; 484} 485 486/* 487 * Return entries into ring. Destroy entries that don't fit. 488 * 489 * Note: this is expected to be a rare slow path operation. 490 * 491 * Note: producer lock is nested within consumer lock, so if you 492 * resize you must make sure all uses nest correctly. 493 * In particular if you consume ring in interrupt or BH context, you must 494 * disable interrupts/BH when doing so. 495 */ 496static inline void ptr_ring_unconsume(struct ptr_ring *r, void **batch, int n, 497 void (*destroy)(void *)) 498{ 499 unsigned long flags; 500 int head; 501 502 spin_lock_irqsave(&r->consumer_lock, flags); 503 spin_lock(&r->producer_lock); 504 505 if (!r->size) 506 goto done; 507 508 /* 509 * Clean out buffered entries (for simplicity). This way following code 510 * can test entries for NULL and if not assume they are valid. 511 */ 512 head = r->consumer_head - 1; 513 while (likely(head >= r->consumer_tail)) 514 r->queue[head--] = NULL; 515 r->consumer_tail = r->consumer_head; 516 517 /* 518 * Go over entries in batch, start moving head back and copy entries. 519 * Stop when we run into previously unconsumed entries. 
520 */ 521 while (n) { 522 head = r->consumer_head - 1; 523 if (head < 0) 524 head = r->size - 1; 525 if (r->queue[head]) { 526 /* This batch entry will have to be destroyed. */ 527 goto done; 528 } 529 r->queue[head] = batch[--n]; 530 r->consumer_tail = r->consumer_head = head; 531 } 532 533done: 534 /* Destroy all entries left in the batch. */ 535 while (n) 536 destroy(batch[--n]); 537 spin_unlock(&r->producer_lock); 538 spin_unlock_irqrestore(&r->consumer_lock, flags); 539} 540 541static inline void **__ptr_ring_swap_queue(struct ptr_ring *r, void **queue, 542 int size, gfp_t gfp, 543 void (*destroy)(void *)) 544{ 545 int producer = 0; 546 void **old; 547 void *ptr; 548 549 while ((ptr = __ptr_ring_consume(r))) 550 if (producer < size) 551 queue[producer++] = ptr; 552 else if (destroy) 553 destroy(ptr); 554 555 __ptr_ring_set_size(r, size); 556 r->producer = producer; 557 r->consumer_head = 0; 558 r->consumer_tail = 0; 559 old = r->queue; 560 r->queue = queue; 561 562 return old; 563} 564 565/* 566 * Note: producer lock is nested within consumer lock, so if you 567 * resize you must make sure all uses nest correctly. 568 * In particular if you consume ring in interrupt or BH context, you must 569 * disable interrupts/BH when doing so. 570 */ 571static inline int ptr_ring_resize(struct ptr_ring *r, int size, gfp_t gfp, 572 void (*destroy)(void *)) 573{ 574 unsigned long flags; 575 void **queue = __ptr_ring_init_queue_alloc(size, gfp); 576 void **old; 577 578 if (!queue) 579 return -ENOMEM; 580 581 spin_lock_irqsave(&(r)->consumer_lock, flags); 582 spin_lock(&(r)->producer_lock); 583 584 old = __ptr_ring_swap_queue(r, queue, size, gfp, destroy); 585 586 spin_unlock(&(r)->producer_lock); 587 spin_unlock_irqrestore(&(r)->consumer_lock, flags); 588 589 kfree(old); 590 591 return 0; 592} 593 594/* 595 * Note: producer lock is nested within consumer lock, so if you 596 * resize you must make sure all uses nest correctly. 597 * In particular if you consume ring in interrupt or BH context, you must 598 * disable interrupts/BH when doing so. 599 */ 600static inline int ptr_ring_resize_multiple(struct ptr_ring **rings, 601 unsigned int nrings, 602 int size, 603 gfp_t gfp, void (*destroy)(void *)) 604{ 605 unsigned long flags; 606 void ***queues; 607 int i; 608 609 queues = kmalloc_array(nrings, sizeof(*queues), gfp); 610 if (!queues) 611 goto noqueues; 612 613 for (i = 0; i < nrings; ++i) { 614 queues[i] = __ptr_ring_init_queue_alloc(size, gfp); 615 if (!queues[i]) 616 goto nomem; 617 } 618 619 for (i = 0; i < nrings; ++i) { 620 spin_lock_irqsave(&(rings[i])->consumer_lock, flags); 621 spin_lock(&(rings[i])->producer_lock); 622 queues[i] = __ptr_ring_swap_queue(rings[i], queues[i], 623 size, gfp, destroy); 624 spin_unlock(&(rings[i])->producer_lock); 625 spin_unlock_irqrestore(&(rings[i])->consumer_lock, flags); 626 } 627 628 for (i = 0; i < nrings; ++i) 629 kfree(queues[i]); 630 631 kfree(queues); 632 633 return 0; 634 635nomem: 636 while (--i >= 0) 637 kfree(queues[i]); 638 639 kfree(queues); 640 641noqueues: 642 return -ENOMEM; 643} 644 645static inline void ptr_ring_cleanup(struct ptr_ring *r, void (*destroy)(void *)) 646{ 647 void *ptr; 648 649 if (destroy) 650 while ((ptr = ptr_ring_consume(r))) 651 destroy(ptr); 652 kfree(r->queue); 653} 654 655#endif /* _LINUX_PTR_RING_H */