/* SPDX-License-Identifier: GPL-2.0 */
/* XDP user-space ring structure
 * Copyright(c) 2018 Intel Corporation.
 */

#ifndef _LINUX_XSK_QUEUE_H
#define _LINUX_XSK_QUEUE_H

#include <linux/types.h>
#include <linux/if_xdp.h>
#include <net/xdp_sock.h>
#include <net/xsk_buff_pool.h>

#include "xsk.h"

struct xdp_ring {
	u32 producer ____cacheline_aligned_in_smp;
	/* Hinder the adjacent cache prefetcher from prefetching the consumer
	 * pointer if the producer pointer is touched and vice versa.
	 */
	u32 pad1 ____cacheline_aligned_in_smp;
	u32 consumer ____cacheline_aligned_in_smp;
	u32 pad2 ____cacheline_aligned_in_smp;
	u32 flags;
	u32 pad3 ____cacheline_aligned_in_smp;
};

/* Used for the RX and TX queues for packets */
struct xdp_rxtx_ring {
	struct xdp_ring ptrs;
	struct xdp_desc desc[] ____cacheline_aligned_in_smp;
};

/* Used for the fill and completion queues for buffers */
struct xdp_umem_ring {
	struct xdp_ring ptrs;
	u64 desc[] ____cacheline_aligned_in_smp;
};

struct xsk_queue {
	u32 ring_mask;
	u32 nentries;
	u32 cached_prod;
	u32 cached_cons;
	struct xdp_ring *ring;
	u64 invalid_descs;
	u64 queue_empty_descs;
	size_t ring_vmalloc_size;
};

/* The structure of the shared state of the rings is a simple
 * circular buffer, as outlined in
 * Documentation/core-api/circular-buffers.rst. For the Rx and
 * completion ring, the kernel is the producer and user space is the
 * consumer. For the Tx and fill rings, the kernel is the consumer and
 * user space is the producer.
 *
 * producer                                consumer
 *
 * if (LOAD ->consumer) {       (A)        LOAD.acq ->producer       (C)
 *    STORE $data                          LOAD $data
 *    STORE.rel ->producer      (B)        STORE.rel ->consumer      (D)
 * }
 *
 * (A) pairs with (D), and (B) pairs with (C).
 *
 * Starting with (B), it protects the data from being written after
 * the producer pointer is updated. If this barrier was missing, the
 * consumer could observe the producer pointer being set and thus load
 * the data before the producer has written the new data. The consumer
 * would in this case load the old data.
 *
 * (C) protects the consumer from speculatively loading the data before
 * the producer pointer actually has been read. If we do not have this
 * barrier, some architectures could load old data as speculative loads
 * are not discarded as the CPU does not know there is a dependency
 * between ->producer and data.
 *
 * (A) is a control dependency that separates the load of ->consumer
 * from the stores of $data. In case ->consumer indicates there is no
 * room in the buffer to store $data we do not store it. The dependency
 * will order both of the stores after the loads. So no barrier is
 * needed.
 *
 * (D) protects the load of the data from being observed to happen
 * after the store of the consumer pointer. If we did not have this
 * memory barrier, the producer could observe the consumer pointer
 * being set and overwrite the data with a new value before the
 * consumer got the chance to read the old value. The consumer would
 * thus miss reading the old entry and very likely read the new entry
 * twice, once right now and again after circling through the ring.
 */
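
/* As a sketch of how the pairing above maps onto the primitives used in
 * this file (condensed and simplified, not the exact code):
 *
 * producer side, e.g. xskq_prod_reserve_desc() followed by xskq_prod_submit():
 *
 *	if (xskq_prod_nb_free(q, 1)) {				(A) reads ->consumer
 *		ring->desc[idx].addr = addr;			STORE $data
 *		smp_store_release(&q->ring->producer, prod);	(B)
 *	}
 *
 * consumer side, e.g. __xskq_cons_peek() followed by __xskq_cons_release():
 *
 *	q->cached_prod = smp_load_acquire(&q->ring->producer);	(C)
 *	*desc = ring->desc[idx];				LOAD $data
 *	smp_store_release(&q->ring->consumer, q->cached_cons);	(D)
 */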

/* The operations on the rings are the following:
 *
 * producer                           consumer
 *
 * RESERVE entries                    PEEK in the ring for entries
 * WRITE data into the ring           READ data from the ring
 * SUBMIT entries                     RELEASE entries
 *
 * The producer reserves one or more entries in the ring. It can then
 * fill in these entries and finally submit them so that they can be
 * seen and read by the consumer.
 *
 * The consumer peeks into the ring to see if the producer has written
 * any new entries. If so, the consumer can then read these entries
 * and when it is done reading them release them back to the producer
 * so that the producer can use these slots to fill in new entries.
 *
 * The function names below reflect these operations.
 */
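
/* A rough sketch of that flow in terms of the helpers defined below, with
 * error handling and batching omitted:
 *
 * kernel as producer (e.g. the Rx ring):
 *
 *	if (!xskq_prod_reserve_desc(q, addr, len))	RESERVE + WRITE
 *		xskq_prod_submit(q);			SUBMIT
 *
 * kernel as consumer (e.g. the Tx ring):
 *
 *	while (xskq_cons_peek_desc(q, &desc, pool)) {	PEEK
 *		... use desc ...			READ
 *		xskq_cons_release(q);			RELEASE (local state only)
 *	}
 *	__xskq_cons_release(q);				RELEASE (publish to the ring)
 */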

/* Functions that read and validate content from consumer rings. */

static inline void __xskq_cons_read_addr_unchecked(struct xsk_queue *q, u32 cached_cons, u64 *addr)
{
	struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
	u32 idx = cached_cons & q->ring_mask;

	*addr = ring->desc[idx];
}

static inline bool xskq_cons_read_addr_unchecked(struct xsk_queue *q, u64 *addr)
{
	if (q->cached_cons != q->cached_prod) {
		__xskq_cons_read_addr_unchecked(q, q->cached_cons, addr);
		return true;
	}

	return false;
}

static inline bool xp_aligned_validate_desc(struct xsk_buff_pool *pool,
					    struct xdp_desc *desc)
{
	u64 offset = desc->addr & (pool->chunk_size - 1);

	if (offset + desc->len > pool->chunk_size)
		return false;

	if (desc->addr >= pool->addrs_cnt)
		return false;

	if (desc->options)
		return false;
	return true;
}
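
/* Worked example of the aligned check, assuming a pool with chunk_size 2048
 * (a power of two, so the mask above isolates the offset within a chunk):
 * addr 0x1800 yields offset 0x1800 & 0x7ff = 0, so any len up to 2048 fits;
 * addr 0x1900 yields offset 0x100, so len may be at most 2048 - 0x100 = 1792
 * before the descriptor would spill into the next chunk and be rejected.
 */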

static inline bool xp_unaligned_validate_desc(struct xsk_buff_pool *pool,
					      struct xdp_desc *desc)
{
	u64 addr = xp_unaligned_add_offset_to_addr(desc->addr);

	if (desc->len > pool->chunk_size)
		return false;

	if (addr >= pool->addrs_cnt || addr + desc->len > pool->addrs_cnt ||
	    xp_desc_crosses_non_contig_pg(pool, addr, desc->len))
		return false;

	if (desc->options)
		return false;
	return true;
}
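
/* In unaligned mode the upper 16 bits of desc->addr carry an offset and the
 * lower 48 bits the base address (XSK_UNALIGNED_BUF_OFFSET_SHIFT in
 * include/uapi/linux/if_xdp.h); xp_unaligned_add_offset_to_addr() folds them
 * together before the range and contiguity checks above are applied. For
 * example, desc->addr = (0x10ULL << 48) | 0x1000 is checked as addr 0x1010.
 */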

static inline bool xp_validate_desc(struct xsk_buff_pool *pool,
				    struct xdp_desc *desc)
{
	return pool->unaligned ? xp_unaligned_validate_desc(pool, desc) :
		xp_aligned_validate_desc(pool, desc);
}

static inline bool xskq_cons_is_valid_desc(struct xsk_queue *q,
					   struct xdp_desc *d,
					   struct xsk_buff_pool *pool)
{
	if (!xp_validate_desc(pool, d)) {
		q->invalid_descs++;
		return false;
	}
	return true;
}

static inline bool xskq_cons_read_desc(struct xsk_queue *q,
				       struct xdp_desc *desc,
				       struct xsk_buff_pool *pool)
{
	while (q->cached_cons != q->cached_prod) {
		struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
		u32 idx = q->cached_cons & q->ring_mask;

		*desc = ring->desc[idx];
		if (xskq_cons_is_valid_desc(q, desc, pool))
			return true;

		q->cached_cons++;
	}

	return false;
}

static inline void xskq_cons_release_n(struct xsk_queue *q, u32 cnt)
{
	q->cached_cons += cnt;
}

static inline u32 xskq_cons_read_desc_batch(struct xsk_queue *q, struct xsk_buff_pool *pool,
					    u32 max)
{
	u32 cached_cons = q->cached_cons, nb_entries = 0;
	struct xdp_desc *descs = pool->tx_descs;

	while (cached_cons != q->cached_prod && nb_entries < max) {
		struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
		u32 idx = cached_cons & q->ring_mask;

		descs[nb_entries] = ring->desc[idx];
		if (unlikely(!xskq_cons_is_valid_desc(q, &descs[nb_entries], pool))) {
			/* Skip the entry */
			cached_cons++;
			continue;
		}

		nb_entries++;
		cached_cons++;
	}

	/* Release valid plus any invalid entries */
	xskq_cons_release_n(q, cached_cons - q->cached_cons);
	return nb_entries;
}
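
/* Note that nb_entries only counts valid descriptors while the release above
 * covers every slot that was looked at. For example, with max = 8 and five
 * entries pending of which one is invalid, the caller gets four descriptors
 * in pool->tx_descs but cached_cons still advances by five, so the invalid
 * slot is consumed rather than revisited.
 */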

/* Functions for consumers */

static inline void __xskq_cons_release(struct xsk_queue *q)
{
	smp_store_release(&q->ring->consumer, q->cached_cons); /* D, matches A */
}

static inline void __xskq_cons_peek(struct xsk_queue *q)
{
	/* Refresh the local pointer */
	q->cached_prod = smp_load_acquire(&q->ring->producer);  /* C, matches B */
}

static inline void xskq_cons_get_entries(struct xsk_queue *q)
{
	__xskq_cons_release(q);
	__xskq_cons_peek(q);
}

static inline u32 xskq_cons_nb_entries(struct xsk_queue *q, u32 max)
{
	u32 entries = q->cached_prod - q->cached_cons;

	if (entries >= max)
		return max;

	__xskq_cons_peek(q);
	entries = q->cached_prod - q->cached_cons;

	return entries >= max ? max : entries;
}
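
/* The cached producer and consumer indices are free-running u32 counters, so
 * the subtraction above is well-defined modulo 2^32. For example,
 * cached_prod = 0x00000002 and cached_cons = 0xfffffffe yields four pending
 * entries even though the producer counter has wrapped.
 */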

static inline bool xskq_cons_has_entries(struct xsk_queue *q, u32 cnt)
{
	return xskq_cons_nb_entries(q, cnt) >= cnt;
}

static inline bool xskq_cons_peek_addr_unchecked(struct xsk_queue *q, u64 *addr)
{
	if (q->cached_prod == q->cached_cons)
		xskq_cons_get_entries(q);
	return xskq_cons_read_addr_unchecked(q, addr);
}

static inline bool xskq_cons_peek_desc(struct xsk_queue *q,
				       struct xdp_desc *desc,
				       struct xsk_buff_pool *pool)
{
	if (q->cached_prod == q->cached_cons)
		xskq_cons_get_entries(q);
	return xskq_cons_read_desc(q, desc, pool);
}

/* To improve performance in the xskq_cons_release functions, only update local state here.
 * Reflect this to global state when we get new entries from the ring in
 * xskq_cons_get_entries() and whenever Rx or Tx processing is completed in the NAPI loop.
 */
static inline void xskq_cons_release(struct xsk_queue *q)
{
	q->cached_cons++;
}
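
/* A hypothetical driver Tx loop illustrating the split between local and
 * global state (drv_xmit_one() is a made-up helper, not a real function):
 *
 *	while (budget-- && xskq_cons_peek_desc(q, &desc, pool)) {
 *		drv_xmit_one(&desc);
 *		xskq_cons_release(q);		only bumps q->cached_cons
 *	}
 *	__xskq_cons_release(q);			publishes ->consumer to user space
 */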

static inline u32 xskq_cons_present_entries(struct xsk_queue *q)
{
	/* No barriers needed since data is not accessed */
	return READ_ONCE(q->ring->producer) - READ_ONCE(q->ring->consumer);
}

/* Functions for producers */

static inline u32 xskq_prod_nb_free(struct xsk_queue *q, u32 max)
{
	u32 free_entries = q->nentries - (q->cached_prod - q->cached_cons);

	if (free_entries >= max)
		return max;

	/* Refresh the local tail pointer */
	q->cached_cons = READ_ONCE(q->ring->consumer);
	free_entries = q->nentries - (q->cached_prod - q->cached_cons);

	return free_entries >= max ? max : free_entries;
}

static inline bool xskq_prod_is_full(struct xsk_queue *q)
{
	return xskq_prod_nb_free(q, 1) ? false : true;
}

static inline void xskq_prod_cancel(struct xsk_queue *q)
{
	q->cached_prod--;
}

static inline int xskq_prod_reserve(struct xsk_queue *q)
{
	if (xskq_prod_is_full(q))
		return -ENOSPC;

	/* A, matches D */
	q->cached_prod++;
	return 0;
}

static inline int xskq_prod_reserve_addr(struct xsk_queue *q, u64 addr)
{
	struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;

	if (xskq_prod_is_full(q))
		return -ENOSPC;

	/* A, matches D */
	ring->desc[q->cached_prod++ & q->ring_mask] = addr;
	return 0;
}

static inline void xskq_prod_write_addr_batch(struct xsk_queue *q, struct xdp_desc *descs,
					      u32 nb_entries)
{
	struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
	u32 i, cached_prod;

	/* A, matches D */
	cached_prod = q->cached_prod;
	for (i = 0; i < nb_entries; i++)
		ring->desc[cached_prod++ & q->ring_mask] = descs[i].addr;
	q->cached_prod = cached_prod;
}

static inline int xskq_prod_reserve_desc(struct xsk_queue *q,
					 u64 addr, u32 len)
{
	struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
	u32 idx;

	if (xskq_prod_is_full(q))
		return -ENOBUFS;

	/* A, matches D */
	idx = q->cached_prod++ & q->ring_mask;
	ring->desc[idx].addr = addr;
	ring->desc[idx].len = len;

	return 0;
}

static inline void __xskq_prod_submit(struct xsk_queue *q, u32 idx)
{
	smp_store_release(&q->ring->producer, idx); /* B, matches C */
}

static inline void xskq_prod_submit(struct xsk_queue *q)
{
	__xskq_prod_submit(q, q->cached_prod);
}
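
/* A condensed sketch of the producer side for a descriptor ring, with error
 * handling omitted (see the call sites in net/xdp/xsk.c):
 *
 *	if (xskq_prod_reserve_desc(q, addr, len))
 *		return -ENOBUFS;			ring full
 *	...
 *	xskq_prod_submit(q);				make the entries visible
 *
 * The fill and completion rings follow the same pattern with the *_addr
 * variants, which carry bare addresses instead of full descriptors.
 */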

static inline void xskq_prod_submit_addr(struct xsk_queue *q, u64 addr)
{
	struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
	u32 idx = q->ring->producer;

	ring->desc[idx++ & q->ring_mask] = addr;

	__xskq_prod_submit(q, idx);
}

static inline void xskq_prod_submit_n(struct xsk_queue *q, u32 nb_entries)
{
	__xskq_prod_submit(q, q->ring->producer + nb_entries);
}

static inline bool xskq_prod_is_empty(struct xsk_queue *q)
{
	/* No barriers needed since data is not accessed */
	return READ_ONCE(q->ring->consumer) == READ_ONCE(q->ring->producer);
}

/* For both producers and consumers */

static inline u64 xskq_nb_invalid_descs(struct xsk_queue *q)
{
	return q ? q->invalid_descs : 0;
}

static inline u64 xskq_nb_queue_empty_descs(struct xsk_queue *q)
{
	return q ? q->queue_empty_descs : 0;
}

struct xsk_queue *xskq_create(u32 nentries, bool umem_queue);
void xskq_destroy(struct xsk_queue *q_ops);

#endif /* _LINUX_XSK_QUEUE_H */