/*
 * Definitions for the 'struct ptr_ring' datastructure.
 *
 * Author:
 *	Michael S. Tsirkin <mst@redhat.com>
 *
 * Copyright (C) 2016 Red Hat, Inc.
 *
 * This program is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; either version 2 of the License, or (at your
 * option) any later version.
 *
 * This is a limited-size FIFO maintaining pointers in FIFO order, with
 * one CPU producing entries and another consuming entries from the FIFO.
 *
 * This implementation tries to minimize cache-contention when there is a
 * single producer and a single consumer CPU.
 */

#ifndef _LINUX_PTR_RING_H
#define _LINUX_PTR_RING_H 1

#ifdef __KERNEL__
#include <linux/spinlock.h>
#include <linux/cache.h>
#include <linux/types.h>
#include <linux/compiler.h>
#include <linux/slab.h>
#include <asm/errno.h>
#endif

struct ptr_ring {
	int producer ____cacheline_aligned_in_smp;
	spinlock_t producer_lock;
	int consumer_head ____cacheline_aligned_in_smp; /* next valid entry */
	int consumer_tail; /* next entry to invalidate */
	spinlock_t consumer_lock;
	/* Shared consumer/producer data */
	/* Read-only by both the producer and the consumer */
	int size ____cacheline_aligned_in_smp; /* max entries in queue */
	int batch; /* number of entries to consume in a batch */
	void **queue;
};

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). If the ring is ever resized, callers must hold
 * producer_lock - see e.g. ptr_ring_full. Otherwise, if callers don't hold
 * producer_lock, the next call to __ptr_ring_produce may fail.
 */
static inline bool __ptr_ring_full(struct ptr_ring *r)
{
	return r->queue[r->producer];
}

static inline bool ptr_ring_full(struct ptr_ring *r)
{
	bool ret;

	spin_lock(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock(&r->producer_lock);

	return ret;
}

static inline bool ptr_ring_full_irq(struct ptr_ring *r)
{
	bool ret;

	spin_lock_irq(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock_irq(&r->producer_lock);

	return ret;
}

static inline bool ptr_ring_full_any(struct ptr_ring *r)
{
	unsigned long flags;
	bool ret;

	spin_lock_irqsave(&r->producer_lock, flags);
	ret = __ptr_ring_full(r);
	spin_unlock_irqrestore(&r->producer_lock, flags);

	return ret;
}

static inline bool ptr_ring_full_bh(struct ptr_ring *r)
{
	bool ret;

	spin_lock_bh(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock_bh(&r->producer_lock);

	return ret;
}

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). Callers must hold producer_lock.
 */
static inline int __ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
	if (unlikely(!r->size) || r->queue[r->producer])
		return -ENOSPC;

	r->queue[r->producer++] = ptr;
	if (unlikely(r->producer >= r->size))
		r->producer = 0;
	return 0;
}
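
/* Illustrative sketch (not part of the original header): produce a pointer
 * under producer_lock, exactly as ptr_ring_produce() below does, and apply a
 * caller-supplied drop callback when the ring is full. The function name and
 * the drop-on-full policy are hypothetical.
 */
static inline int example_ptr_ring_produce_or_drop(struct ptr_ring *r,
						   void *ptr,
						   void (*drop)(void *))
{
	int err;

	spin_lock(&r->producer_lock);
	err = __ptr_ring_produce(r, ptr);	/* -ENOSPC when the ring is full */
	spin_unlock(&r->producer_lock);

	if (err && drop)
		drop(ptr);	/* caller-supplied policy for a full ring */
	return err;
}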

/*
 * Note: resize (below) nests producer lock within consumer lock, so if you
 * consume in interrupt or BH context, you must disable interrupts/BH when
 * calling this.
 */
static inline int ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock(&r->producer_lock);

	return ret;
}

static inline int ptr_ring_produce_irq(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock_irq(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_irq(&r->producer_lock);

	return ret;
}

static inline int ptr_ring_produce_any(struct ptr_ring *r, void *ptr)
{
	unsigned long flags;
	int ret;

	spin_lock_irqsave(&r->producer_lock, flags);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_irqrestore(&r->producer_lock, flags);

	return ret;
}

static inline int ptr_ring_produce_bh(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock_bh(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_bh(&r->producer_lock);

	return ret;
}

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). Callers must take consumer_lock
 * if they dereference the pointer - see e.g. PTR_RING_PEEK_CALL.
 * If the ring is never resized, and if the pointer is merely
 * tested, there's no need to take the lock - see e.g. __ptr_ring_empty.
 */
static inline void *__ptr_ring_peek(struct ptr_ring *r)
{
	if (likely(r->size))
		return r->queue[r->consumer_head];
	return NULL;
}

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). Callers must take consumer_lock
 * if the ring is ever resized - see e.g. ptr_ring_empty.
 */
static inline bool __ptr_ring_empty(struct ptr_ring *r)
{
	return !__ptr_ring_peek(r);
}

static inline bool ptr_ring_empty(struct ptr_ring *r)
{
	bool ret;

	spin_lock(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock(&r->consumer_lock);

	return ret;
}

static inline bool ptr_ring_empty_irq(struct ptr_ring *r)
{
	bool ret;

	spin_lock_irq(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock_irq(&r->consumer_lock);

	return ret;
}

static inline bool ptr_ring_empty_any(struct ptr_ring *r)
{
	unsigned long flags;
	bool ret;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ret = __ptr_ring_empty(r);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ret;
}

static inline bool ptr_ring_empty_bh(struct ptr_ring *r)
{
	bool ret;

	spin_lock_bh(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock_bh(&r->consumer_lock);

	return ret;
}
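
/* Illustrative sketch (not part of the original header): a consumer-side
 * busy-wait on a ring that is never resized, relying on the notes above that
 * __ptr_ring_empty() may then be tested without consumer_lock as long as a
 * compiler barrier such as cpu_relax() is used in the loop. The function
 * name and the spinning policy are hypothetical.
 */
static inline void example_ptr_ring_wait_nonempty(struct ptr_ring *r)
{
	while (__ptr_ring_empty(r))
		cpu_relax();	/* compiler barrier; forces a fresh re-read */
}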

/* Must only be called after __ptr_ring_peek returned !NULL */
static inline void __ptr_ring_discard_one(struct ptr_ring *r)
{
	/* Fundamentally, what we want to do is update the consumer
	 * index and zero out the entry so the producer can reuse it.
	 * Doing it naively at each consume would be as simple as:
	 * r->queue[r->consumer++] = NULL;
	 * if (unlikely(r->consumer >= r->size))
	 *	r->consumer = 0;
	 * but that is suboptimal when the ring is full, as the producer is
	 * writing out new entries in the same cache line. Defer these updates
	 * until a batch of entries has been consumed.
	 */
	int head = r->consumer_head++;

	/* Once we have processed enough entries, invalidate them in
	 * the ring all at once so the producer can reuse their space.
	 * We also do this when we reach the end of the ring - not mandatory,
	 * but it helps keep the implementation simple.
	 */
	if (unlikely(r->consumer_head - r->consumer_tail >= r->batch ||
		     r->consumer_head >= r->size)) {
		/* Zero out entries in reverse order: this way the cache line
		 * that the producer might currently be reading is touched
		 * last; the producer won't make progress and touch other
		 * cache lines besides the first one until we have written out
		 * all entries.
		 */
		while (likely(head >= r->consumer_tail))
			r->queue[head--] = NULL;
		r->consumer_tail = r->consumer_head;
	}
	if (unlikely(r->consumer_head >= r->size)) {
		r->consumer_head = 0;
		r->consumer_tail = 0;
	}
}

static inline void *__ptr_ring_consume(struct ptr_ring *r)
{
	void *ptr;

	ptr = __ptr_ring_peek(r);
	if (ptr)
		__ptr_ring_discard_one(r);

	return ptr;
}

static inline int __ptr_ring_consume_batched(struct ptr_ring *r,
					     void **array, int n)
{
	void *ptr;
	int i;

	for (i = 0; i < n; i++) {
		ptr = __ptr_ring_consume(r);
		if (!ptr)
			break;
		array[i] = ptr;
	}

	return i;
}

/*
 * Note: resize (below) nests producer lock within consumer lock, so if you
 * call this in interrupt or BH context, you must disable interrupts/BH when
 * producing.
 */
static inline void *ptr_ring_consume(struct ptr_ring *r)
{
	void *ptr;

	spin_lock(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock(&r->consumer_lock);

	return ptr;
}

static inline void *ptr_ring_consume_irq(struct ptr_ring *r)
{
	void *ptr;

	spin_lock_irq(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock_irq(&r->consumer_lock);

	return ptr;
}

static inline void *ptr_ring_consume_any(struct ptr_ring *r)
{
	unsigned long flags;
	void *ptr;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ptr = __ptr_ring_consume(r);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ptr;
}

static inline void *ptr_ring_consume_bh(struct ptr_ring *r)
{
	void *ptr;

	spin_lock_bh(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock_bh(&r->consumer_lock);

	return ptr;
}

static inline int ptr_ring_consume_batched(struct ptr_ring *r,
					   void **array, int n)
{
	int ret;

	spin_lock(&r->consumer_lock);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock(&r->consumer_lock);

	return ret;
}

static inline int ptr_ring_consume_batched_irq(struct ptr_ring *r,
					       void **array, int n)
{
	int ret;

	spin_lock_irq(&r->consumer_lock);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock_irq(&r->consumer_lock);

	return ret;
}

static inline int ptr_ring_consume_batched_any(struct ptr_ring *r,
					       void **array, int n)
{
	unsigned long flags;
	int ret;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ret;
}

static inline int ptr_ring_consume_batched_bh(struct ptr_ring *r,
					      void **array, int n)
{
	int ret;

	spin_lock_bh(&r->consumer_lock);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock_bh(&r->consumer_lock);

	return ret;
}
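
/* Illustrative sketch (not part of the original header): drain a ring in
 * fixed-size batches so consumer_lock is dropped between batches and each
 * entry is processed outside the lock. The function name, the batch size of
 * 16 and the process() callback are hypothetical.
 */
static inline void example_ptr_ring_drain(struct ptr_ring *r,
					  void (*process)(void *))
{
	void *batch[16];
	int n, i;

	do {
		n = ptr_ring_consume_batched(r, batch, 16);
		for (i = 0; i < n; i++)
			process(batch[i]);	/* handled outside consumer_lock */
	} while (n == 16);	/* a short batch means the ring ran empty */
}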

/* Cast to structure type and call a function without discarding from FIFO.
 * Function must return a value.
 * Callers must take consumer_lock.
 */
#define __PTR_RING_PEEK_CALL(r, f) ((f)(__ptr_ring_peek(r)))

#define PTR_RING_PEEK_CALL(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_IRQ(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock_irq(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_irq(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_BH(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock_bh(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_bh(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_ANY(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	unsigned long __PTR_RING_PEEK_CALL_f; \
	\
	spin_lock_irqsave(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_irqrestore(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
	__PTR_RING_PEEK_CALL_v; \
})

static inline void **__ptr_ring_init_queue_alloc(unsigned int size, gfp_t gfp)
{
	return kcalloc(size, sizeof(void *), gfp);
}

static inline void __ptr_ring_set_size(struct ptr_ring *r, int size)
{
	r->size = size;
	r->batch = SMP_CACHE_BYTES * 2 / sizeof(*(r->queue));
	/* We need to set batch to at least 1 to make the logic
	 * in __ptr_ring_discard_one work correctly.
	 * Batching too much (because the ring is small) would cause a lot of
	 * burstiness. Needs tuning; for now, disable batching.
	 */
	if (r->batch > r->size / 2 || !r->batch)
		r->batch = 1;
}

static inline int ptr_ring_init(struct ptr_ring *r, int size, gfp_t gfp)
{
	r->queue = __ptr_ring_init_queue_alloc(size, gfp);
	if (!r->queue)
		return -ENOMEM;

	__ptr_ring_set_size(r, size);
	r->producer = r->consumer_head = r->consumer_tail = 0;
	spin_lock_init(&r->producer_lock);
	spin_lock_init(&r->consumer_lock);

	return 0;
}
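
/* Illustrative sketch (not part of the original header): initialize a ring
 * and pre-fill it with a caller-supplied array of pointers. The function
 * name is hypothetical and only helpers already defined above are used.
 * The objs[] entries must be non-NULL, since a NULL slot marks a free entry
 * in the ring.
 */
static inline int example_ptr_ring_init_filled(struct ptr_ring *r,
					       void **objs, int n, gfp_t gfp)
{
	int err, i;

	/* Size the ring so all n entries fit; kcalloc() zeroes the slots. */
	err = ptr_ring_init(r, n, gfp);
	if (err)
		return err;	/* -ENOMEM */

	for (i = 0; i < n; i++) {
		err = ptr_ring_produce(r, objs[i]);
		if (err)	/* not expected: the ring has exactly n slots */
			break;
	}
	return err;
}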

/*
 * Return entries into ring. Destroy entries that don't fit.
 *
 * Note: this is expected to be a rare slow path operation.
 *
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular, if you consume the ring in interrupt or BH context, you
 * must disable interrupts/BH when doing so.
 */
static inline void ptr_ring_unconsume(struct ptr_ring *r, void **batch, int n,
				      void (*destroy)(void *))
{
	unsigned long flags;
	int head;

	spin_lock_irqsave(&r->consumer_lock, flags);
	spin_lock(&r->producer_lock);

	if (!r->size)
		goto done;

	/*
	 * Clean out buffered entries (for simplicity). This way the following
	 * code can test entries for NULL and, if not NULL, assume they are
	 * valid.
	 */
	head = r->consumer_head - 1;
	while (likely(head >= r->consumer_tail))
		r->queue[head--] = NULL;
	r->consumer_tail = r->consumer_head;

	/*
	 * Go over the entries in the batch, moving the head back and copying
	 * entries in. Stop when we run into previously unconsumed entries.
	 */
	while (n) {
		head = r->consumer_head - 1;
		if (head < 0)
			head = r->size - 1;
		if (r->queue[head]) {
			/* This batch entry will have to be destroyed. */
			goto done;
		}
		r->queue[head] = batch[--n];
		r->consumer_tail = r->consumer_head = head;
	}

done:
	/* Destroy all entries left in the batch. */
	while (n)
		destroy(batch[--n]);
	spin_unlock(&r->producer_lock);
	spin_unlock_irqrestore(&r->consumer_lock, flags);
}

static inline void **__ptr_ring_swap_queue(struct ptr_ring *r, void **queue,
					   int size, gfp_t gfp,
					   void (*destroy)(void *))
{
	int producer = 0;
	void **old;
	void *ptr;

	while ((ptr = __ptr_ring_consume(r)))
		if (producer < size)
			queue[producer++] = ptr;
		else if (destroy)
			destroy(ptr);

	__ptr_ring_set_size(r, size);
	r->producer = producer;
	r->consumer_head = 0;
	r->consumer_tail = 0;
	old = r->queue;
	r->queue = queue;

	return old;
}

/*
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular, if you consume the ring in interrupt or BH context, you
 * must disable interrupts/BH when doing so.
 */
static inline int ptr_ring_resize(struct ptr_ring *r, int size, gfp_t gfp,
				  void (*destroy)(void *))
{
	unsigned long flags;
	void **queue = __ptr_ring_init_queue_alloc(size, gfp);
	void **old;

	if (!queue)
		return -ENOMEM;

	spin_lock_irqsave(&(r)->consumer_lock, flags);
	spin_lock(&(r)->producer_lock);

	old = __ptr_ring_swap_queue(r, queue, size, gfp, destroy);

	spin_unlock(&(r)->producer_lock);
	spin_unlock_irqrestore(&(r)->consumer_lock, flags);

	kfree(old);

	return 0;
}

/*
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular, if you consume the ring in interrupt or BH context, you
 * must disable interrupts/BH when doing so.
 */
static inline int ptr_ring_resize_multiple(struct ptr_ring **rings,
					   unsigned int nrings,
					   int size,
					   gfp_t gfp, void (*destroy)(void *))
{
	unsigned long flags;
	void ***queues;
	int i;

	queues = kmalloc_array(nrings, sizeof(*queues), gfp);
	if (!queues)
		goto noqueues;

	for (i = 0; i < nrings; ++i) {
		queues[i] = __ptr_ring_init_queue_alloc(size, gfp);
		if (!queues[i])
			goto nomem;
	}

	for (i = 0; i < nrings; ++i) {
		spin_lock_irqsave(&(rings[i])->consumer_lock, flags);
		spin_lock(&(rings[i])->producer_lock);
		queues[i] = __ptr_ring_swap_queue(rings[i], queues[i],
						  size, gfp, destroy);
		spin_unlock(&(rings[i])->producer_lock);
		spin_unlock_irqrestore(&(rings[i])->consumer_lock, flags);
	}

	for (i = 0; i < nrings; ++i)
		kfree(queues[i]);

	kfree(queues);

	return 0;

nomem:
	while (--i >= 0)
		kfree(queues[i]);

	kfree(queues);

noqueues:
	return -ENOMEM;
}

static inline void ptr_ring_cleanup(struct ptr_ring *r, void (*destroy)(void *))
{
	void *ptr;

	if (destroy)
		while ((ptr = ptr_ring_consume(r)))
			destroy(ptr);
	kfree(r->queue);
}

#endif /* _LINUX_PTR_RING_H */
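
/* Illustrative sketch (not part of the original header), kept as a comment
 * since it sits outside the include guard: growing a ring at runtime.
 * ptr_ring_resize() takes consumer_lock and producer_lock itself (disabling
 * interrupts), carries surviving entries over in FIFO order and hands
 * entries that no longer fit to the destructor. kfree() below stands in for
 * a caller's real destructor and new_size is hypothetical.
 *
 *	err = ptr_ring_resize(&ring, new_size, GFP_KERNEL, kfree);
 *	if (err)
 *		return err;	// -ENOMEM: the ring is left unchanged
 */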