/*
 *	Definitions for the 'struct ptr_ring' datastructure.
 *
 *	Author:
 *		Michael S. Tsirkin <mst@redhat.com>
 *
 *	Copyright (C) 2016 Red Hat, Inc.
 *
 *	This program is free software; you can redistribute it and/or modify it
 *	under the terms of the GNU General Public License as published by the
 *	Free Software Foundation; either version 2 of the License, or (at your
 *	option) any later version.
 *
 *	This is a limited-size FIFO maintaining pointers in FIFO order, with
 *	one CPU producing entries and another consuming them.
 *
 *	This implementation tries to minimize cache-contention when there is a
 *	single producer and a single consumer CPU.
 */

#ifndef _LINUX_PTR_RING_H
#define _LINUX_PTR_RING_H 1

#ifdef __KERNEL__
#include <linux/spinlock.h>
#include <linux/cache.h>
#include <linux/types.h>
#include <linux/compiler.h>
#include <linux/slab.h>
#include <asm/errno.h>
#endif

struct ptr_ring {
	int producer ____cacheline_aligned_in_smp;
	spinlock_t producer_lock;
	int consumer_head ____cacheline_aligned_in_smp; /* next valid entry */
	int consumer_tail; /* next entry to invalidate */
	spinlock_t consumer_lock;
	/* Shared consumer/producer data */
	/* Read-only by both the producer and the consumer */
	int size ____cacheline_aligned_in_smp; /* max entries in queue */
	int batch; /* number of entries to consume in a batch */
	void **queue;
};

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). If the ring is ever resized, callers must hold
 * producer_lock - see e.g. ptr_ring_full. Otherwise, if callers don't hold
 * producer_lock, the next call to __ptr_ring_produce may fail.
 */
static inline bool __ptr_ring_full(struct ptr_ring *r)
{
	return r->queue[r->producer];
}

static inline bool ptr_ring_full(struct ptr_ring *r)
{
	bool ret;

	spin_lock(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock(&r->producer_lock);

	return ret;
}

static inline bool ptr_ring_full_irq(struct ptr_ring *r)
{
	bool ret;

	spin_lock_irq(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock_irq(&r->producer_lock);

	return ret;
}

static inline bool ptr_ring_full_any(struct ptr_ring *r)
{
	unsigned long flags;
	bool ret;

	spin_lock_irqsave(&r->producer_lock, flags);
	ret = __ptr_ring_full(r);
	spin_unlock_irqrestore(&r->producer_lock, flags);

	return ret;
}

static inline bool ptr_ring_full_bh(struct ptr_ring *r)
{
	bool ret;

	spin_lock_bh(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock_bh(&r->producer_lock);

	return ret;
}

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). Callers must hold producer_lock.
 * Callers are responsible for making sure the pointer that is being
 * queued points to valid data.
 */
static inline int __ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
	if (unlikely(!r->size) || r->queue[r->producer])
		return -ENOSPC;

	/* Make sure the pointer we are storing points to valid data. */
	/* Pairs with smp_read_barrier_depends in __ptr_ring_consume. */
	smp_wmb();

	r->queue[r->producer++] = ptr;
	if (unlikely(r->producer >= r->size))
		r->producer = 0;
	return 0;
}
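
/*
 * Illustrative sketch (editorial addition, not part of the original
 * header): publishing several pointers with a single producer_lock
 * round trip by calling the unlocked __ptr_ring_produce() directly,
 * assuming producer, consumer and any resize all run in process
 * context. The helper name is hypothetical.
 */
static inline int example_produce_batch(struct ptr_ring *r,
					void **batch, int n)
{
	int i;

	spin_lock(&r->producer_lock);
	for (i = 0; i < n; i++)
		if (__ptr_ring_produce(r, batch[i]))
			break;	/* ring full (or zero-sized): stop early */
	spin_unlock(&r->producer_lock);

	return i;	/* number of entries actually queued */
}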

/*
 * Note: resize (below) nests producer lock within consumer lock, so if you
 * consume in interrupt or BH context, you must disable interrupts/BH when
 * calling this.
 */
static inline int ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock(&r->producer_lock);

	return ret;
}

static inline int ptr_ring_produce_irq(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock_irq(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_irq(&r->producer_lock);

	return ret;
}

static inline int ptr_ring_produce_any(struct ptr_ring *r, void *ptr)
{
	unsigned long flags;
	int ret;

	spin_lock_irqsave(&r->producer_lock, flags);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_irqrestore(&r->producer_lock, flags);

	return ret;
}

static inline int ptr_ring_produce_bh(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock_bh(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_bh(&r->producer_lock);

	return ret;
}

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). Callers must take consumer_lock
 * if they dereference the pointer - see e.g. PTR_RING_PEEK_CALL.
 * If ring is never resized, and if the pointer is merely
 * tested, there's no need to take the lock - see e.g. __ptr_ring_empty.
 */
static inline void *__ptr_ring_peek(struct ptr_ring *r)
{
	if (likely(r->size))
		return r->queue[r->consumer_head];
	return NULL;
}

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). Callers must take consumer_lock
 * if the ring is ever resized - see e.g. ptr_ring_empty.
 */
static inline bool __ptr_ring_empty(struct ptr_ring *r)
{
	return !__ptr_ring_peek(r);
}

static inline bool ptr_ring_empty(struct ptr_ring *r)
{
	bool ret;

	spin_lock(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock(&r->consumer_lock);

	return ret;
}

static inline bool ptr_ring_empty_irq(struct ptr_ring *r)
{
	bool ret;

	spin_lock_irq(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock_irq(&r->consumer_lock);

	return ret;
}

static inline bool ptr_ring_empty_any(struct ptr_ring *r)
{
	unsigned long flags;
	bool ret;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ret = __ptr_ring_empty(r);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ret;
}

static inline bool ptr_ring_empty_bh(struct ptr_ring *r)
{
	bool ret;

	spin_lock_bh(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock_bh(&r->consumer_lock);

	return ret;
}
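
/*
 * Illustrative sketch (editorial addition, not part of the original
 * header): a cheap "any work pending?" test on the consumer side. Per
 * the comments above, the lockless __ptr_ring_empty() form is only safe
 * if the ring is never resized; otherwise use ptr_ring_empty() or the
 * _irq/_any/_bh variant that matches the consume context. The function
 * name is hypothetical.
 */
static inline bool example_work_pending(struct ptr_ring *r)
{
	/* Assumes a fixed-size ring, so consumer_lock is not needed. */
	return !__ptr_ring_empty(r);
}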

/* Must only be called after __ptr_ring_peek returned !NULL */
static inline void __ptr_ring_discard_one(struct ptr_ring *r)
{
	/* Fundamentally, what we want to do is update consumer
	 * index and zero out the entry so producer can reuse it.
	 * Doing it naively at each consume would be as simple as:
	 *       r->queue[r->consumer++] = NULL;
	 *       if (unlikely(r->consumer >= r->size))
	 *               r->consumer = 0;
	 * but that is suboptimal when the ring is full as producer is writing
	 * out new entries in the same cache line.  Defer these updates until a
	 * batch of entries has been consumed.
	 */
	int head = r->consumer_head++;

	/* Once we have processed enough entries, invalidate them in
	 * the ring all at once so producer can reuse their space in the ring.
	 * We also do this when we reach the end of the ring - not mandatory
	 * but helps keep the implementation simple.
	 */
	if (unlikely(r->consumer_head - r->consumer_tail >= r->batch ||
		     r->consumer_head >= r->size)) {
		/* Zero out entries in the reverse order: this way we touch the
		 * cache line that producer might currently be reading last;
		 * producer won't make progress and touch other cache lines
		 * besides the first one until we write out all entries.
		 */
		while (likely(head >= r->consumer_tail))
			r->queue[head--] = NULL;
		r->consumer_tail = r->consumer_head;
	}
	if (unlikely(r->consumer_head >= r->size)) {
		r->consumer_head = 0;
		r->consumer_tail = 0;
	}
}

static inline void *__ptr_ring_consume(struct ptr_ring *r)
{
	void *ptr;

	ptr = __ptr_ring_peek(r);
	if (ptr)
		__ptr_ring_discard_one(r);

	/* Make sure anyone accessing data through the pointer is up to date. */
	/* Pairs with smp_wmb in __ptr_ring_produce. */
	smp_read_barrier_depends();
	return ptr;
}

static inline int __ptr_ring_consume_batched(struct ptr_ring *r,
					     void **array, int n)
{
	void *ptr;
	int i;

	for (i = 0; i < n; i++) {
		ptr = __ptr_ring_consume(r);
		if (!ptr)
			break;
		array[i] = ptr;
	}

	return i;
}
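
/*
 * Illustrative sketch (editorial addition, not part of the original
 * header): draining the ring from the consumer side with a single
 * consumer_lock round trip, handing each entry to a caller-supplied
 * callback. Assumes process context on both sides; the helper name is
 * hypothetical.
 */
static inline void example_drain(struct ptr_ring *r, void (*fn)(void *))
{
	void *ptr;

	spin_lock(&r->consumer_lock);
	while ((ptr = __ptr_ring_consume(r)))
		fn(ptr);
	spin_unlock(&r->consumer_lock);
}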

/*
 * Note: resize (below) nests producer lock within consumer lock, so if you
 * call this in interrupt or BH context, you must disable interrupts/BH when
 * producing.
 */
static inline void *ptr_ring_consume(struct ptr_ring *r)
{
	void *ptr;

	spin_lock(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock(&r->consumer_lock);

	return ptr;
}

static inline void *ptr_ring_consume_irq(struct ptr_ring *r)
{
	void *ptr;

	spin_lock_irq(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock_irq(&r->consumer_lock);

	return ptr;
}

static inline void *ptr_ring_consume_any(struct ptr_ring *r)
{
	unsigned long flags;
	void *ptr;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ptr = __ptr_ring_consume(r);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ptr;
}

static inline void *ptr_ring_consume_bh(struct ptr_ring *r)
{
	void *ptr;

	spin_lock_bh(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock_bh(&r->consumer_lock);

	return ptr;
}

static inline int ptr_ring_consume_batched(struct ptr_ring *r,
					   void **array, int n)
{
	int ret;

	spin_lock(&r->consumer_lock);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock(&r->consumer_lock);

	return ret;
}

static inline int ptr_ring_consume_batched_irq(struct ptr_ring *r,
					       void **array, int n)
{
	int ret;

	spin_lock_irq(&r->consumer_lock);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock_irq(&r->consumer_lock);

	return ret;
}

static inline int ptr_ring_consume_batched_any(struct ptr_ring *r,
					       void **array, int n)
{
	unsigned long flags;
	int ret;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ret;
}

static inline int ptr_ring_consume_batched_bh(struct ptr_ring *r,
					      void **array, int n)
{
	int ret;

	spin_lock_bh(&r->consumer_lock);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock_bh(&r->consumer_lock);

	return ret;
}
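
/*
 * Illustrative sketch (editorial addition, not part of the original
 * header): consuming in fixed-size batches, as a budget-limited poll
 * loop might. Assumes process context on both sides; all names here are
 * hypothetical.
 */
static inline int example_poll(struct ptr_ring *r, void (*fn)(void *))
{
	enum { EXAMPLE_BATCH = 16 };	/* arbitrary budget for illustration */
	void *batch[EXAMPLE_BATCH];
	int i, n, total = 0;

	do {
		n = ptr_ring_consume_batched(r, batch, EXAMPLE_BATCH);
		for (i = 0; i < n; i++)
			fn(batch[i]);
		total += n;
	} while (n == EXAMPLE_BATCH);

	return total;	/* number of entries handled */
}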

/* Cast to structure type and call a function without discarding from FIFO.
 * Function must return a value.
 * Callers must take consumer_lock.
 */
#define __PTR_RING_PEEK_CALL(r, f) ((f)(__ptr_ring_peek(r)))

#define PTR_RING_PEEK_CALL(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_IRQ(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock_irq(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_irq(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_BH(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock_bh(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_bh(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_ANY(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	unsigned long __PTR_RING_PEEK_CALL_f; \
	\
	spin_lock_irqsave(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_irqrestore(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
	__PTR_RING_PEEK_CALL_v; \
})

static inline void **__ptr_ring_init_queue_alloc(unsigned int size, gfp_t gfp)
{
	return kcalloc(size, sizeof(void *), gfp);
}

static inline void __ptr_ring_set_size(struct ptr_ring *r, int size)
{
	r->size = size;
	r->batch = SMP_CACHE_BYTES * 2 / sizeof(*(r->queue));
	/* We need to set batch to at least 1 to make logic
	 * in __ptr_ring_discard_one work correctly.
	 * Batching too much (because ring is small) would cause a lot of
	 * burstiness.  Needs tuning, for now disable batching.
	 */
	if (r->batch > r->size / 2 || !r->batch)
		r->batch = 1;
}

static inline int ptr_ring_init(struct ptr_ring *r, int size, gfp_t gfp)
{
	r->queue = __ptr_ring_init_queue_alloc(size, gfp);
	if (!r->queue)
		return -ENOMEM;

	__ptr_ring_set_size(r, size);
	r->producer = r->consumer_head = r->consumer_tail = 0;
	spin_lock_init(&r->producer_lock);
	spin_lock_init(&r->consumer_lock);

	return 0;
}
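
/*
 * Illustrative sketch (editorial addition, not part of the original
 * header): peeking at the head entry without consuming it, e.g. to read
 * a length field. The element type and both helpers are hypothetical;
 * only the PTR_RING_PEEK_CALL usage pattern comes from this header.
 * Note the callback must tolerate NULL, since the ring may be empty.
 */
struct example_elem {
	int len;
};

static inline int example_elem_len(void *ptr)
{
	struct example_elem *e = ptr;

	return e ? e->len : 0;
}

static inline int example_peek_len(struct ptr_ring *r)
{
	return PTR_RING_PEEK_CALL(r, example_elem_len);
}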

/*
 * Return entries into ring. Destroy entries that don't fit.
 *
 * Note: this is expected to be a rare slow path operation.
 *
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume the ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline void ptr_ring_unconsume(struct ptr_ring *r, void **batch, int n,
				      void (*destroy)(void *))
{
	unsigned long flags;
	int head;

	spin_lock_irqsave(&r->consumer_lock, flags);
	spin_lock(&r->producer_lock);

	if (!r->size)
		goto done;

	/*
	 * Clean out buffered entries (for simplicity). This way the following
	 * code can test entries for NULL and, if not NULL, assume they are
	 * valid.
	 */
	head = r->consumer_head - 1;
	while (likely(head >= r->consumer_tail))
		r->queue[head--] = NULL;
	r->consumer_tail = r->consumer_head;

	/*
	 * Go over the entries in batch, start moving head back and copy
	 * entries. Stop when we run into previously unconsumed entries.
	 */
	while (n) {
		head = r->consumer_head - 1;
		if (head < 0)
			head = r->size - 1;
		if (r->queue[head]) {
			/* This batch entry will have to be destroyed. */
			goto done;
		}
		r->queue[head] = batch[--n];
		r->consumer_tail = r->consumer_head = head;
	}

done:
	/* Destroy all entries left in the batch. */
	while (n)
		destroy(batch[--n]);
	spin_unlock(&r->producer_lock);
	spin_unlock_irqrestore(&r->consumer_lock, flags);
}

static inline void **__ptr_ring_swap_queue(struct ptr_ring *r, void **queue,
					   int size, gfp_t gfp,
					   void (*destroy)(void *))
{
	int producer = 0;
	void **old;
	void *ptr;

	while ((ptr = __ptr_ring_consume(r)))
		if (producer < size)
			queue[producer++] = ptr;
		else if (destroy)
			destroy(ptr);

	__ptr_ring_set_size(r, size);
	r->producer = producer;
	r->consumer_head = 0;
	r->consumer_tail = 0;
	old = r->queue;
	r->queue = queue;

	return old;
}

/*
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume the ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline int ptr_ring_resize(struct ptr_ring *r, int size, gfp_t gfp,
				  void (*destroy)(void *))
{
	unsigned long flags;
	void **queue = __ptr_ring_init_queue_alloc(size, gfp);
	void **old;

	if (!queue)
		return -ENOMEM;

	spin_lock_irqsave(&(r)->consumer_lock, flags);
	spin_lock(&(r)->producer_lock);

	old = __ptr_ring_swap_queue(r, queue, size, gfp, destroy);

	spin_unlock(&(r)->producer_lock);
	spin_unlock_irqrestore(&(r)->consumer_lock, flags);

	kfree(old);

	return 0;
}

/*
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume the ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline int ptr_ring_resize_multiple(struct ptr_ring **rings,
					   unsigned int nrings,
					   int size,
					   gfp_t gfp, void (*destroy)(void *))
{
	unsigned long flags;
	void ***queues;
	int i;

	queues = kmalloc_array(nrings, sizeof(*queues), gfp);
	if (!queues)
		goto noqueues;

	for (i = 0; i < nrings; ++i) {
		queues[i] = __ptr_ring_init_queue_alloc(size, gfp);
		if (!queues[i])
			goto nomem;
	}

	for (i = 0; i < nrings; ++i) {
		spin_lock_irqsave(&(rings[i])->consumer_lock, flags);
		spin_lock(&(rings[i])->producer_lock);
		queues[i] = __ptr_ring_swap_queue(rings[i], queues[i],
						  size, gfp, destroy);
		spin_unlock(&(rings[i])->producer_lock);
		spin_unlock_irqrestore(&(rings[i])->consumer_lock, flags);
	}

	for (i = 0; i < nrings; ++i)
		kfree(queues[i]);

	kfree(queues);

	return 0;

nomem:
	while (--i >= 0)
		kfree(queues[i]);

	kfree(queues);

noqueues:
	return -ENOMEM;
}

static inline void ptr_ring_cleanup(struct ptr_ring *r, void (*destroy)(void *))
{
	void *ptr;

	if (destroy)
		while ((ptr = ptr_ring_consume(r)))
			destroy(ptr);
	kfree(r->queue);
}

#endif /* _LINUX_PTR_RING_H */
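
/*
 * Illustrative lifecycle sketch (editorial addition, kept in a comment
 * since it sits outside the include guard). All names prefixed with my_
 * are hypothetical; only the ptr_ring calls come from this header.
 * Assumes process context throughout.
 *
 *	struct ptr_ring ring;
 *	void *obj;
 *	int err;
 *
 *	err = ptr_ring_init(&ring, 256, GFP_KERNEL);
 *	if (err)
 *		return err;		// -ENOMEM: queue allocation failed
 *
 *	err = ptr_ring_produce(&ring, my_obj);	// 0 on success, -ENOSPC if full
 *
 *	obj = ptr_ring_consume(&ring);		// NULL when the ring is empty
 *	if (obj)
 *		my_obj_free(obj);
 *
 *	// Optionally grow the ring, destroying entries that would not fit:
 *	err = ptr_ring_resize(&ring, 512, GFP_KERNEL, my_obj_free);
 *
 *	// Tear down, freeing any entries still queued:
 *	ptr_ring_cleanup(&ring, my_obj_free);
 */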