Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux

xsk: Batched buffer allocation for the pool

Add a new driver interface xsk_buff_alloc_batch() offering batched
buffer allocations to improve performance. The new interface takes
three arguments: the buffer pool to allocate from, a pointer to an
array of struct xdp_buff pointers which will contain pointers to the
allocated xdp_buffs, and an unsigned integer specifying the max number
of buffers to allocate. The return value is the actual number of
buffers that the allocator managed to allocate and it will be in the
range 0 <= N <= max, where max is the third parameter to the function.

u32 xsk_buff_alloc_batch(struct xsk_buff_pool *pool, struct xdp_buff **xdp,
u32 max);

A second driver interface is also introduced that needs to be used in
conjunction with xsk_buff_alloc_batch(). It is a helper that sets the
size of struct xdp_buff and is used by the NIC Rx irq routine when
receiving a packet. This helper sets the three struct members data,
data_meta, and data_end. The first two are, in the xsk_buff_alloc()
case set in the allocation routine and data_end is set when a packet
is received in the receive irq function. This unfortunately leads to
worse performance since the xdp_buff is touched twice with a long time
period in between leading to an extra cache miss. Instead, we fill out
the xdp_buff with all 3 fields at one single point in time in the
driver, when the size of the packet is known. Hence this helper. Note
that the driver has to use this helper (or set all three fields
itself) when using xsk_buff_alloc_batch(). xsk_buff_alloc() works as
before and does not require this.

void xsk_buff_set_size(struct xdp_buff *xdp, u32 size);

Signed-off-by: Magnus Karlsson <magnus.karlsson@intel.com>
Signed-off-by: Daniel Borkmann <daniel@iogearbox.net>
Link: https://lore.kernel.org/bpf/20210922075613.12186-3-magnus.karlsson@gmail.com

authored by

Magnus Karlsson and committed by
Daniel Borkmann
47e4075d 10a5e009

+118 -4
+22
include/net/xdp_sock_drv.h
··· 77 77 return xp_alloc(pool); 78 78 } 79 79 80 + /* Returns as many entries as possible up to max. 0 <= N <= max. */ 81 + static inline u32 xsk_buff_alloc_batch(struct xsk_buff_pool *pool, struct xdp_buff **xdp, u32 max) 82 + { 83 + return xp_alloc_batch(pool, xdp, max); 84 + } 85 + 80 86 static inline bool xsk_buff_can_alloc(struct xsk_buff_pool *pool, u32 count) 81 87 { 82 88 return xp_can_alloc(pool, count); ··· 93 87 struct xdp_buff_xsk *xskb = container_of(xdp, struct xdp_buff_xsk, xdp); 94 88 95 89 xp_free(xskb); 90 + } 91 + 92 + static inline void xsk_buff_set_size(struct xdp_buff *xdp, u32 size) 93 + { 94 + xdp->data = xdp->data_hard_start + XDP_PACKET_HEADROOM; 95 + xdp->data_meta = xdp->data; 96 + xdp->data_end = xdp->data + size; 96 97 } 97 98 98 99 static inline dma_addr_t xsk_buff_raw_get_dma(struct xsk_buff_pool *pool, ··· 225 212 return NULL; 226 213 } 227 214 215 + static inline u32 xsk_buff_alloc_batch(struct xsk_buff_pool *pool, struct xdp_buff **xdp, u32 max) 216 + { 217 + return 0; 218 + } 219 + 228 220 static inline bool xsk_buff_can_alloc(struct xsk_buff_pool *pool, u32 count) 229 221 { 230 222 return false; 231 223 } 232 224 233 225 static inline void xsk_buff_free(struct xdp_buff *xdp) 226 + { 227 + } 228 + 229 + static inline void xsk_buff_set_size(struct xdp_buff *xdp, u32 size) 234 230 { 235 231 } 236 232
+1
include/net/xsk_buff_pool.h
··· 104 104 unsigned long attrs, struct page **pages, u32 nr_pages); 105 105 void xp_dma_unmap(struct xsk_buff_pool *pool, unsigned long attrs); 106 106 struct xdp_buff *xp_alloc(struct xsk_buff_pool *pool); 107 + u32 xp_alloc_batch(struct xsk_buff_pool *pool, struct xdp_buff **xdp, u32 max); 107 108 bool xp_can_alloc(struct xsk_buff_pool *pool, u32 count); 108 109 void *xp_raw_get_data(struct xsk_buff_pool *pool, u64 addr); 109 110 dma_addr_t xp_raw_get_dma(struct xsk_buff_pool *pool, u64 addr);
+87
net/xdp/xsk_buff_pool.c
··· 507 507 } 508 508 EXPORT_SYMBOL(xp_alloc); 509 509 510 + static u32 xp_alloc_new_from_fq(struct xsk_buff_pool *pool, struct xdp_buff **xdp, u32 max) 511 + { 512 + u32 i, cached_cons, nb_entries; 513 + 514 + if (max > pool->free_heads_cnt) 515 + max = pool->free_heads_cnt; 516 + max = xskq_cons_nb_entries(pool->fq, max); 517 + 518 + cached_cons = pool->fq->cached_cons; 519 + nb_entries = max; 520 + i = max; 521 + while (i--) { 522 + struct xdp_buff_xsk *xskb; 523 + u64 addr; 524 + bool ok; 525 + 526 + __xskq_cons_read_addr_unchecked(pool->fq, cached_cons++, &addr); 527 + 528 + ok = pool->unaligned ? xp_check_unaligned(pool, &addr) : 529 + xp_check_aligned(pool, &addr); 530 + if (unlikely(!ok)) { 531 + pool->fq->invalid_descs++; 532 + nb_entries--; 533 + continue; 534 + } 535 + 536 + xskb = pool->free_heads[--pool->free_heads_cnt]; 537 + *xdp = &xskb->xdp; 538 + xskb->orig_addr = addr; 539 + xskb->xdp.data_hard_start = pool->addrs + addr + pool->headroom; 540 + xskb->frame_dma = (pool->dma_pages[addr >> PAGE_SHIFT] & 541 + ~XSK_NEXT_PG_CONTIG_MASK) + (addr & ~PAGE_MASK); 542 + xskb->dma = xskb->frame_dma + pool->headroom + XDP_PACKET_HEADROOM; 543 + xdp++; 544 + } 545 + 546 + xskq_cons_release_n(pool->fq, max); 547 + return nb_entries; 548 + } 549 + 550 + static u32 xp_alloc_reused(struct xsk_buff_pool *pool, struct xdp_buff **xdp, u32 nb_entries) 551 + { 552 + struct xdp_buff_xsk *xskb; 553 + u32 i; 554 + 555 + nb_entries = min_t(u32, nb_entries, pool->free_list_cnt); 556 + 557 + i = nb_entries; 558 + while (i--) { 559 + xskb = list_first_entry(&pool->free_list, struct xdp_buff_xsk, free_list_node); 560 + list_del(&xskb->free_list_node); 561 + 562 + *xdp = &xskb->xdp; 563 + xdp++; 564 + } 565 + pool->free_list_cnt -= nb_entries; 566 + 567 + return nb_entries; 568 + } 569 + 570 + u32 xp_alloc_batch(struct xsk_buff_pool *pool, struct xdp_buff **xdp, u32 max) 571 + { 572 + u32 nb_entries1 = 0, nb_entries2; 573 + 574 + if (unlikely(pool->dma_need_sync)) { 575 + /* 
Slow path */ 576 + *xdp = xp_alloc(pool); 577 + return !!*xdp; 578 + } 579 + 580 + if (unlikely(pool->free_list_cnt)) { 581 + nb_entries1 = xp_alloc_reused(pool, xdp, max); 582 + if (nb_entries1 == max) 583 + return nb_entries1; 584 + 585 + max -= nb_entries1; 586 + xdp += nb_entries1; 587 + } 588 + 589 + nb_entries2 = xp_alloc_new_from_fq(pool, xdp, max); 590 + if (!nb_entries2) 591 + pool->fq->queue_empty_descs++; 592 + 593 + return nb_entries1 + nb_entries2; 594 + } 595 + EXPORT_SYMBOL(xp_alloc_batch); 596 + 510 597 bool xp_can_alloc(struct xsk_buff_pool *pool, u32 count) 511 598 { 512 599 if (pool->free_list_cnt >= count)
+8 -4
net/xdp/xsk_queue.h
··· 111 111 112 112 /* Functions that read and validate content from consumer rings. */ 113 113 114 - static inline bool xskq_cons_read_addr_unchecked(struct xsk_queue *q, u64 *addr) 114 + static inline void __xskq_cons_read_addr_unchecked(struct xsk_queue *q, u32 cached_cons, u64 *addr) 115 115 { 116 116 struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring; 117 + u32 idx = cached_cons & q->ring_mask; 117 118 119 + *addr = ring->desc[idx]; 120 + } 121 + 122 + static inline bool xskq_cons_read_addr_unchecked(struct xsk_queue *q, u64 *addr) 123 + { 118 124 if (q->cached_cons != q->cached_prod) { 119 - u32 idx = q->cached_cons & q->ring_mask; 120 - 121 - *addr = ring->desc[idx]; 125 + __xskq_cons_read_addr_unchecked(q, q->cached_cons, addr); 122 126 return true; 123 127 } 124 128