Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux

lib: add light-weight queuing mechanism.

lwq is a FIFO single-linked queue that only requires a spinlock
for dequeueing, which happens in process context. Enqueueing is atomic
with no spinlock and can happen in any context.

This is particularly useful when work items are queued from BH or IRQ
context, and when they are handled one at a time by dedicated threads.

Avoiding any locking when enqueueing means there is no need to disable
BH or interrupts, which is generally best avoided (particularly when
there are any RT tasks on the machine).

This solution is superior to using "list_head" links because we need
half as many pointers in the data structures, and because list_head
lists would need locking to add items to the queue.

This solution is superior to a bespoke solution as all locking and
container_of casting is integrated, so the interface is simple.

Despite the similar name, this solution meets a distinctly different
need from kfifo. kfifo provides a fixed-size circular buffer to which
data can be added at one end and removed at the other, and does not
provide any locking. lwq does not have any size limit and works with
data structures (objects) rather than data (bytes).

A unit test for basic functionality, which runs at boot time, is included.

Signed-off-by: NeilBrown <neilb@suse.de>
Cc: Andrew Morton <akpm@linux-foundation.org>
Cc: "Liam R. Howlett" <Liam.Howlett@oracle.com>
Cc: Kees Cook <keescook@chromium.org>
Cc: Greg Kroah-Hartman <gregkh@linuxfoundation.org>
Cc: David Gow <davidgow@google.com>
Cc: linux-kernel@vger.kernel.org
Message-Id: <20230911111333.4d1a872330e924a00acb905b@linux-foundation.org>
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>

authored by

NeilBrown and committed by
Chuck Lever
de9e82c3 8a3e5975

+288 -1
+124
include/linux/lwq.h
/* SPDX-License-Identifier: GPL-2.0-only */

#ifndef LWQ_H
#define LWQ_H
/*
 * Light-weight single-linked queue built from llist
 *
 * Entries can be enqueued from any context with no locking.
 * Entries can be dequeued from process context with integrated locking.
 *
 * This is particularly suitable when work items are queued in
 * BH or IRQ context, and where work items are handled one at a time
 * by dedicated threads.
 */
#include <linux/container_of.h>
#include <linux/spinlock.h>
#include <linux/llist.h>

/* Embed one of these in each object that may be placed on an lwq. */
struct lwq_node {
	struct llist_node node;
};

/*
 * The queue itself.  Enqueue pushes onto @new (an llist, newest-first,
 * lock-free); dequeue drains @new in reverse (oldest-first) order into
 * @ready under @lock.
 */
struct lwq {
	spinlock_t lock;		/* serialises dequeue only */
	struct llist_node *ready;	/* entries to be dequeued */
	struct llist_head new;		/* entries being enqueued */
};

/**
 * lwq_init - initialise a lwq
 * @q: the lwq object
 */
static inline void lwq_init(struct lwq *q)
{
	spin_lock_init(&q->lock);
	q->ready = NULL;
	init_llist_head(&q->new);
}

/**
 * lwq_empty - test if lwq contains any entry
 * @q: the lwq object
 *
 * This empty test contains an acquire barrier so that if a wakeup
 * is sent when lwq_dequeue returns true, it is safe to go to sleep after
 * a test on lwq_empty().
 */
static inline bool lwq_empty(struct lwq *q)
{
	/* acquire ensures ordering wrt lwq_enqueue() */
	return smp_load_acquire(&q->ready) == NULL && llist_empty(&q->new);
}

struct llist_node *__lwq_dequeue(struct lwq *q);
/**
 * lwq_dequeue - dequeue first (oldest) entry from lwq
 * @q: the queue to dequeue from
 * @type: the type of object to return
 * @member: the member in returned object which is an lwq_node.
 *
 * Remove a single object from the lwq and return it.  This will take
 * a spinlock and so must always be called in the same context, typically
 * process context.
 */
#define lwq_dequeue(q, type, member)					\
	({ struct llist_node *_n = __lwq_dequeue(q);			\
	   _n ? container_of(_n, type, member.node) : NULL; })

struct llist_node *lwq_dequeue_all(struct lwq *q);

/**
 * lwq_for_each_safe - iterate over detached queue allowing deletion
 * @_n: iterator variable
 * @_t1: temporary struct llist_node **
 * @_t2: temporary struct llist_node *
 * @_l: address of llist_node pointer from lwq_dequeue_all()
 * @_member: member in _n where lwq_node is found.
 *
 * Iterate over members in a dequeued list.  If the iterator variable
 * is set to NULL, the iterator removes that entry from the queue.
 */
#define lwq_for_each_safe(_n, _t1, _t2, _l, _member)			\
	for (_t1 = (_l);						\
	     *(_t1) ? (_n = container_of(*(_t1), typeof(*(_n)), _member.node),\
		       _t2 = ((*_t1)->next),				\
		       true)						\
		    : false;						\
	     (_n) ? (_t1 = &(_n)->_member.node.next, 0)			\
		  : ((*(_t1) = (_t2)), 0))

/**
 * lwq_enqueue - add a new item to the end of the queue
 * @n: the lwq_node embedded in the item to be added
 * @q: the lwq to append to.
 *
 * No locking is needed to append to the queue so this can
 * be called from any context.
 * Return %true if the list may have previously been empty.
 */
static inline bool lwq_enqueue(struct lwq_node *n, struct lwq *q)
{
	/* acquire ensures ordering wrt lwq_dequeue */
	return llist_add(&n->node, &q->new) &&
		smp_load_acquire(&q->ready) == NULL;
}

/**
 * lwq_enqueue_batch - add a list of new items to the end of the queue
 * @n: the lwq_node embedded in the first item to be added
 * @q: the lwq to append to.
 *
 * No locking is needed to append to the queue so this can
 * be called from any context.
 * Return %true if the list may have previously been empty.
 */
static inline bool lwq_enqueue_batch(struct llist_node *n, struct lwq *q)
{
	struct llist_node *e = n;

	/*
	 * acquire ensures ordering wrt lwq_dequeue.
	 * The batch is reversed before adding so that, together with the
	 * reversal done in __lwq_dequeue(), entries are dequeued in the
	 * order the caller supplied them.
	 */
	return llist_add_batch(llist_reverse_order(n), e, &q->new) &&
		smp_load_acquire(&q->ready) == NULL;
}
#endif /* LWQ_H */
+5
lib/Kconfig
··· 729 729 config OBJAGG 730 730 tristate "objagg" if COMPILE_TEST 731 731 732 + config LWQ_TEST 733 + bool "Boot-time test for lwq queuing" 734 + help 735 + Run boot-time test of light-weight queuing. 736 + 732 737 endmenu 733 738 734 739 config GENERIC_IOREMAP
+1 -1
lib/Makefile
··· 45 45 obj-y += bcd.o sort.o parser.o debug_locks.o random32.o \ 46 46 bust_spinlocks.o kasprintf.o bitmap.o scatterlist.o \ 47 47 list_sort.o uuid.o iov_iter.o clz_ctz.o \ 48 - bsearch.o find_bit.o llist.o memweight.o kfifo.o \ 48 + bsearch.o find_bit.o llist.o lwq.o memweight.o kfifo.o \ 49 49 percpu-refcount.o rhashtable.o base64.o \ 50 50 once.o refcount.o rcuref.o usercopy.o errseq.o bucket_locks.o \ 51 51 generic-radix-tree.o
+158
lib/lwq.c
// SPDX-License-Identifier: GPL-2.0-only
/*
 * Light-weight single-linked queue.
 *
 * Entries are enqueued to the head of an llist, with no blocking.
 * This can happen in any context.
 *
 * Entries are dequeued using a spinlock to protect against multiple
 * access.  The llist is staged in reverse order, and refreshed
 * from the llist when it exhausts.
 *
 * This is particularly suitable when work items are queued in BH or
 * IRQ context, and where work items are handled one at a time by
 * dedicated threads.
 */
#include <linux/rcupdate.h>
#include <linux/lwq.h>

/**
 * __lwq_dequeue - dequeue one entry from the queue
 * @q: the queue to dequeue from
 *
 * Remove and return the oldest enqueued llist_node, or NULL if the
 * queue is empty.  Takes q->lock, so must be called from a context
 * where spinning is permitted (typically process context).
 */
struct llist_node *__lwq_dequeue(struct lwq *q)
{
	struct llist_node *this;

	/* Lockless fast path: avoid taking the lock when there is nothing queued */
	if (lwq_empty(q))
		return NULL;
	spin_lock(&q->lock);
	this = q->ready;
	if (!this && !llist_empty(&q->new)) {
		/*
		 * ensure queue doesn't appear transiently lwq_empty:
		 * (void *)1 is a placeholder for "ready" that is visibly
		 * non-NULL while the new entries are being transferred.
		 */
		smp_store_release(&q->ready, (void *)1);
		/* llist is newest-first; reverse so we dequeue oldest-first */
		this = llist_reverse_order(llist_del_all(&q->new));
		if (!this)
			q->ready = NULL;
	}
	if (this)
		q->ready = llist_next(this);
	spin_unlock(&q->lock);
	return this;
}
EXPORT_SYMBOL_GPL(__lwq_dequeue);

/**
 * lwq_dequeue_all - dequeue all currently enqueued objects
 * @q: the queue to dequeue from
 *
 * Remove and return a linked list of llist_nodes of all the objects that were
 * in the queue.  The first on the list will be the object that was least
 * recently enqueued.
 */
struct llist_node *lwq_dequeue_all(struct lwq *q)
{
	struct llist_node *r, *t, **ep;

	if (lwq_empty(q))
		return NULL;

	spin_lock(&q->lock);
	r = q->ready;
	q->ready = NULL;
	t = llist_del_all(&q->new);
	spin_unlock(&q->lock);
	/* walk to the end of the already-staged (oldest-first) list ... */
	ep = &r;
	while (*ep)
		ep = &(*ep)->next;
	/* ... and append the new entries, reversed into oldest-first order */
	*ep = llist_reverse_order(t);
	return r;
}
EXPORT_SYMBOL_GPL(lwq_dequeue_all);

#if IS_ENABLED(CONFIG_LWQ_TEST)

#include <linux/module.h>
#include <linux/slab.h>
#include <linux/wait_bit.h>
#include <linux/kthread.h>
#include <linux/delay.h>

/* Test payload: @i identifies the node, @c counts how often it was cycled. */
struct tnode {
	struct lwq_node n;
	int i;
	int c;
};

/*
 * Worker thread for the boot-time test: repeatedly dequeue a node,
 * bump its cycle count, and re-enqueue it, waking other waiters when
 * the queue may have been empty.
 */
static int lwq_exercise(void *qv)
{
	struct lwq *q = qv;
	int cnt;
	struct tnode *t;

	for (cnt = 0; cnt < 10000; cnt++) {
		wait_var_event(q, (t = lwq_dequeue(q, struct tnode, n)) != NULL);
		t->c++;
		if (lwq_enqueue(&t->n, q))
			wake_up_var(q);
	}
	/* done with our quota; idle until the main test stops us */
	while (!kthread_should_stop())
		schedule_timeout_idle(1);
	return 0;
}

/*
 * Boot-time self test: enqueue 100 nodes, churn them through 8 threads,
 * then exercise lwq_dequeue(), lwq_dequeue_all(), lwq_for_each_safe()
 * and lwq_enqueue_batch(), printing results for manual inspection.
 */
static int lwq_test(void)
{
	int i;
	struct lwq q;
	struct llist_node *l, **t1, *t2;
	struct tnode *t;
	struct task_struct *threads[8];

	printk(KERN_INFO "testing lwq....\n");
	lwq_init(&q);
	printk(KERN_INFO " lwq: run some threads\n");
	for (i = 0; i < ARRAY_SIZE(threads); i++)
		threads[i] = kthread_run(lwq_exercise, &q, "lwq-test-%d", i);
	for (i = 0; i < 100; i++) {
		t = kmalloc(sizeof(*t), GFP_KERNEL);
		if (!t)
			break;
		t->i = i;
		t->c = 0;
		if (lwq_enqueue(&t->n, &q))
			wake_up_var(&q);
	}
	/* stop each thread; kthread_stop() waits for it to exit */
	for (i = 0; i < ARRAY_SIZE(threads); i++)
		if (!IS_ERR_OR_NULL(threads[i]))
			kthread_stop(threads[i]);
	printk(KERN_INFO " lwq: dequeue first 50:");
	for (i = 0; i < 50 ; i++) {
		if (i && (i % 10) == 0) {
			printk(KERN_CONT "\n");
			printk(KERN_INFO " lwq: ... ");
		}
		t = lwq_dequeue(&q, struct tnode, n);
		if (t)
			printk(KERN_CONT " %d(%d)", t->i, t->c);
		kfree(t);
	}
	printk(KERN_CONT "\n");
	l = lwq_dequeue_all(&q);
	printk(KERN_INFO " lwq: delete the multiples of 3 (test lwq_for_each_safe())\n");
	lwq_for_each_safe(t, t1, t2, &l, n) {
		if ((t->i % 3) == 0) {
			t->i = -1;
			kfree(t);
			/* NULL tells the iterator to unlink this entry */
			t = NULL;
		}
	}
	if (l)
		lwq_enqueue_batch(l, &q);
	printk(KERN_INFO " lwq: dequeue remaining:");
	while ((t = lwq_dequeue(&q, struct tnode, n)) != NULL) {
		printk(KERN_CONT " %d", t->i);
		kfree(t);
	}
	printk(KERN_CONT "\n");
	return 0;
}

module_init(lwq_test);
#endif /* CONFIG_LWQ_TEST*/