/*
 * Definitions for the 'struct ptr_ring' data structure.
 *
 * Author:
 *	Michael S. Tsirkin <mst@redhat.com>
 *
 *	Copyright (C) 2016 Red Hat, Inc.
 *
 *	This program is free software; you can redistribute it and/or modify it
 *	under the terms of the GNU General Public License as published by the
 *	Free Software Foundation; either version 2 of the License, or (at your
 *	option) any later version.
 *
 *	This is a limited-size FIFO maintaining pointers in FIFO order, with
 *	one CPU producing entries and another consuming them.
 *
 *	This implementation tries to minimize cache contention when there is a
 *	single producer and a single consumer CPU.
 */

#ifndef _LINUX_PTR_RING_H
#define _LINUX_PTR_RING_H 1

#ifdef __KERNEL__
#include <linux/spinlock.h>
#include <linux/cache.h>
#include <linux/types.h>
#include <linux/compiler.h>
#include <linux/slab.h>
#include <asm/errno.h>
#endif

struct ptr_ring {
	int producer ____cacheline_aligned_in_smp;
	spinlock_t producer_lock;
	int consumer_head ____cacheline_aligned_in_smp; /* next valid entry */
	int consumer_tail; /* next entry to invalidate */
	spinlock_t consumer_lock;
	/* Shared consumer/producer data */
	/* Read-only by both the producer and the consumer */
	int size ____cacheline_aligned_in_smp; /* max entries in queue */
	int batch; /* number of entries to consume in a batch */
	void **queue;
};

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). If the ring is ever resized, callers must hold
 * producer_lock - see e.g. ptr_ring_full. Otherwise, if callers don't hold
 * producer_lock, the next call to __ptr_ring_produce may fail.
 */
static inline bool __ptr_ring_full(struct ptr_ring *r)
{
	return r->queue[r->producer];
}

static inline bool ptr_ring_full(struct ptr_ring *r)
{
	bool ret;

	spin_lock(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock(&r->producer_lock);

	return ret;
}

static inline bool ptr_ring_full_irq(struct ptr_ring *r)
{
	bool ret;

	spin_lock_irq(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock_irq(&r->producer_lock);

	return ret;
}

static inline bool ptr_ring_full_any(struct ptr_ring *r)
{
	unsigned long flags;
	bool ret;

	spin_lock_irqsave(&r->producer_lock, flags);
	ret = __ptr_ring_full(r);
	spin_unlock_irqrestore(&r->producer_lock, flags);

	return ret;
}

static inline bool ptr_ring_full_bh(struct ptr_ring *r)
{
	bool ret;

	spin_lock_bh(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock_bh(&r->producer_lock);

	return ret;
}

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). Callers must hold producer_lock.
 */
static inline int __ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
	if (unlikely(!r->size) || r->queue[r->producer])
		return -ENOSPC;

	r->queue[r->producer++] = ptr;
	if (unlikely(r->producer >= r->size))
		r->producer = 0;
	return 0;
}
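
/* Illustrative sketch, not part of the original header: a producer that
 * holds producer_lock and busy-waits for a free slot, using cpu_relax()
 * as the compiler barrier the notes above call for.  The function name is
 * an assumption for the example; cpu_relax() is assumed to be available
 * via the usual arch headers in this context.
 */
static inline void example_produce_spin(struct ptr_ring *r, void *ptr)
{
	spin_lock(&r->producer_lock);
	/* __ptr_ring_produce() returns -ENOSPC while the ring is full. */
	while (__ptr_ring_produce(r, ptr))
		cpu_relax();
	spin_unlock(&r->producer_lock);
}
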
/*
 * Note: resize (below) nests producer lock within consumer lock, so if you
 * consume in interrupt or BH context, you must disable interrupts/BH when
 * calling this.
 */
static inline int ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock(&r->producer_lock);

	return ret;
}

static inline int ptr_ring_produce_irq(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock_irq(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_irq(&r->producer_lock);

	return ret;
}

static inline int ptr_ring_produce_any(struct ptr_ring *r, void *ptr)
{
	unsigned long flags;
	int ret;

	spin_lock_irqsave(&r->producer_lock, flags);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_irqrestore(&r->producer_lock, flags);

	return ret;
}

static inline int ptr_ring_produce_bh(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock_bh(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_bh(&r->producer_lock);

	return ret;
}

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). Callers must take consumer_lock
 * if they dereference the pointer - see e.g. PTR_RING_PEEK_CALL.
 * If the ring is never resized and the pointer is merely tested,
 * there's no need to take the lock - see e.g. __ptr_ring_empty.
 */
static inline void *__ptr_ring_peek(struct ptr_ring *r)
{
	if (likely(r->size))
		return r->queue[r->consumer_head];
	return NULL;
}

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). Callers must take consumer_lock
 * if the ring is ever resized - see e.g. ptr_ring_empty.
 */
static inline bool __ptr_ring_empty(struct ptr_ring *r)
{
	return !__ptr_ring_peek(r);
}

static inline bool ptr_ring_empty(struct ptr_ring *r)
{
	bool ret;

	spin_lock(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock(&r->consumer_lock);

	return ret;
}

static inline bool ptr_ring_empty_irq(struct ptr_ring *r)
{
	bool ret;

	spin_lock_irq(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock_irq(&r->consumer_lock);

	return ret;
}

static inline bool ptr_ring_empty_any(struct ptr_ring *r)
{
	unsigned long flags;
	bool ret;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ret = __ptr_ring_empty(r);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ret;
}

static inline bool ptr_ring_empty_bh(struct ptr_ring *r)
{
	bool ret;

	spin_lock_bh(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock_bh(&r->consumer_lock);

	return ret;
}
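
/* Illustrative sketch, not part of the original header: waiting for work
 * with the lockless __ptr_ring_empty() test, which the notes above allow
 * when the ring is never resized and the head entry is only tested, never
 * dereferenced.  The function name is an assumption for the example.
 */
static inline void example_wait_nonempty(struct ptr_ring *r)
{
	while (__ptr_ring_empty(r))
		cpu_relax();	/* compiler barrier between lockless reads */
}
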
/* Must only be called after __ptr_ring_peek returned !NULL */
static inline void __ptr_ring_discard_one(struct ptr_ring *r)
{
	/* Fundamentally, what we want to do is update the consumer
	 * index and zero out the entry so the producer can reuse it.
	 * Doing it naively at each consume would be as simple as:
	 *	r->queue[r->consumer++] = NULL;
	 *	if (unlikely(r->consumer >= r->size))
	 *		r->consumer = 0;
	 * but that is suboptimal when the ring is full, as the producer is
	 * writing out new entries in the same cache line. Defer these updates
	 * until a batch of entries has been consumed.
	 */
	int head = r->consumer_head++;

	/* Once we have processed enough entries, invalidate them in
	 * the ring all at once so the producer can reuse their space.
	 * We also do this when we reach the end of the ring - not mandatory,
	 * but it helps keep the implementation simple.
	 */
	if (unlikely(r->consumer_head - r->consumer_tail >= r->batch ||
		     r->consumer_head >= r->size)) {
		/* Zero out entries in reverse order: this way the cache line
		 * that the producer might currently be reading is the last
		 * one we touch; the producer won't make progress and touch
		 * other cache lines besides the first one until we have
		 * written out all entries.
		 */
		while (likely(head >= r->consumer_tail))
			r->queue[head--] = NULL;
		r->consumer_tail = r->consumer_head;
	}
	if (unlikely(r->consumer_head >= r->size)) {
		r->consumer_head = 0;
		r->consumer_tail = 0;
	}
}

static inline void *__ptr_ring_consume(struct ptr_ring *r)
{
	void *ptr;

	ptr = __ptr_ring_peek(r);
	if (ptr)
		__ptr_ring_discard_one(r);

	return ptr;
}

/*
 * Note: resize (below) nests producer lock within consumer lock, so if you
 * call this in interrupt or BH context, you must disable interrupts/BH when
 * producing.
 */
static inline void *ptr_ring_consume(struct ptr_ring *r)
{
	void *ptr;

	spin_lock(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock(&r->consumer_lock);

	return ptr;
}

static inline void *ptr_ring_consume_irq(struct ptr_ring *r)
{
	void *ptr;

	spin_lock_irq(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock_irq(&r->consumer_lock);

	return ptr;
}

static inline void *ptr_ring_consume_any(struct ptr_ring *r)
{
	unsigned long flags;
	void *ptr;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ptr = __ptr_ring_consume(r);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ptr;
}

static inline void *ptr_ring_consume_bh(struct ptr_ring *r)
{
	void *ptr;

	spin_lock_bh(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock_bh(&r->consumer_lock);

	return ptr;
}
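
/* Illustrative sketch, not part of the original header: draining the ring
 * from softirq/BH context with ptr_ring_consume_bh().  Per the note above,
 * if the consumer runs in BH context and the ring is ever resized, the
 * producer side must disable BH as well (e.g. via ptr_ring_produce_bh()).
 * example_free() is an assumed destructor for the queued entries.
 */
static inline void example_drain_bh(struct ptr_ring *r,
				    void (*example_free)(void *))
{
	void *ptr;

	while ((ptr = ptr_ring_consume_bh(r)))
		example_free(ptr);
}
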
/* Cast to structure type and call a function without discarding from the FIFO.
 * Function must return a value.
 * Callers must take consumer_lock.
 */
#define __PTR_RING_PEEK_CALL(r, f) ((f)(__ptr_ring_peek(r)))

#define PTR_RING_PEEK_CALL(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_IRQ(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock_irq(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_irq(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_BH(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock_bh(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_bh(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_ANY(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	unsigned long __PTR_RING_PEEK_CALL_f;\
	\
	spin_lock_irqsave(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_irqrestore(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
	__PTR_RING_PEEK_CALL_v; \
})

static inline void **__ptr_ring_init_queue_alloc(int size, gfp_t gfp)
{
	return kzalloc(ALIGN(size * sizeof(void *), SMP_CACHE_BYTES), gfp);
}

static inline void __ptr_ring_set_size(struct ptr_ring *r, int size)
{
	r->size = size;
	r->batch = SMP_CACHE_BYTES * 2 / sizeof(*(r->queue));
	/* We need to set batch to at least 1 to make the logic
	 * in __ptr_ring_discard_one work correctly.
	 * Batching too much (because the ring is small) would cause a lot of
	 * burstiness. Needs tuning; for now disable batching.
	 */
	if (r->batch > r->size / 2 || !r->batch)
		r->batch = 1;
}

static inline int ptr_ring_init(struct ptr_ring *r, int size, gfp_t gfp)
{
	r->queue = __ptr_ring_init_queue_alloc(size, gfp);
	if (!r->queue)
		return -ENOMEM;

	__ptr_ring_set_size(r, size);
	r->producer = r->consumer_head = r->consumer_tail = 0;
	spin_lock_init(&r->producer_lock);
	spin_lock_init(&r->consumer_lock);

	return 0;
}

static inline void **__ptr_ring_swap_queue(struct ptr_ring *r, void **queue,
					   int size, gfp_t gfp,
					   void (*destroy)(void *))
{
	int producer = 0;
	void **old;
	void *ptr;

	while ((ptr = __ptr_ring_consume(r)))
		if (producer < size)
			queue[producer++] = ptr;
		else if (destroy)
			destroy(ptr);

	__ptr_ring_set_size(r, size);
	r->producer = producer;
	r->consumer_head = 0;
	r->consumer_tail = 0;
	old = r->queue;
	r->queue = queue;

	return old;
}
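
/* Illustrative sketch, not part of the original header: using
 * PTR_RING_PEEK_CALL to inspect the head-of-queue entry without consuming
 * it, e.g. to report its length.  struct example_entry and the two helpers
 * are assumptions for the example; note that the callback must tolerate a
 * NULL argument, since the ring may be empty when it is peeked.
 */
struct example_entry {
	size_t len;
};

static inline size_t example_entry_len(void *ptr)
{
	struct example_entry *e = ptr;

	return e ? e->len : 0;
}

static inline size_t example_peek_len(struct ptr_ring *r)
{
	/* Takes and releases consumer_lock around the call, as documented. */
	return PTR_RING_PEEK_CALL(r, example_entry_len);
}
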
/*
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline int ptr_ring_resize(struct ptr_ring *r, int size, gfp_t gfp,
				  void (*destroy)(void *))
{
	unsigned long flags;
	void **queue = __ptr_ring_init_queue_alloc(size, gfp);
	void **old;

	if (!queue)
		return -ENOMEM;

	spin_lock_irqsave(&(r)->consumer_lock, flags);
	spin_lock(&(r)->producer_lock);

	old = __ptr_ring_swap_queue(r, queue, size, gfp, destroy);

	spin_unlock(&(r)->producer_lock);
	spin_unlock_irqrestore(&(r)->consumer_lock, flags);

	kfree(old);

	return 0;
}

/*
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline int ptr_ring_resize_multiple(struct ptr_ring **rings, int nrings,
					   int size,
					   gfp_t gfp, void (*destroy)(void *))
{
	unsigned long flags;
	void ***queues;
	int i;

	queues = kmalloc(nrings * sizeof *queues, gfp);
	if (!queues)
		goto noqueues;

	for (i = 0; i < nrings; ++i) {
		queues[i] = __ptr_ring_init_queue_alloc(size, gfp);
		if (!queues[i])
			goto nomem;
	}

	for (i = 0; i < nrings; ++i) {
		spin_lock_irqsave(&(rings[i])->consumer_lock, flags);
		spin_lock(&(rings[i])->producer_lock);
		queues[i] = __ptr_ring_swap_queue(rings[i], queues[i],
						  size, gfp, destroy);
		spin_unlock(&(rings[i])->producer_lock);
		spin_unlock_irqrestore(&(rings[i])->consumer_lock, flags);
	}

	for (i = 0; i < nrings; ++i)
		kfree(queues[i]);

	kfree(queues);

	return 0;

nomem:
	while (--i >= 0)
		kfree(queues[i]);

	kfree(queues);

noqueues:
	return -ENOMEM;
}

static inline void ptr_ring_cleanup(struct ptr_ring *r, void (*destroy)(void *))
{
	void *ptr;

	if (destroy)
		while ((ptr = ptr_ring_consume(r)))
			destroy(ptr);
	kfree(r->queue);
}

#endif /* _LINUX_PTR_RING_H */
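
/* Illustrative sketch, not part of the original header: the basic lifecycle
 * of a ring - init, produce, consume, cleanup - as a caller might use it
 * from process context.  The ring size, the values queued, and the function
 * name are assumptions for the example; only non-NULL pointers may be
 * queued, since a NULL slot is what marks a free entry.
 */
static inline int example_ring_lifecycle(void)
{
	static int values[] = { 1, 2, 3, 4 };
	struct ptr_ring ring;
	void *ptr;
	int i, err;

	err = ptr_ring_init(&ring, 8, GFP_KERNEL);
	if (err)
		return err;

	for (i = 0; i < 4; i++)
		if (ptr_ring_produce(&ring, &values[i]))
			break;	/* -ENOSPC: the ring is full */

	while ((ptr = ptr_ring_consume(&ring)))
		;	/* work on *(int *)ptr here */

	/* No destroy callback needed: the ring has already been drained. */
	ptr_ring_cleanup(&ring, NULL);
	return 0;
}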