Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux

Merge branch 'bulk-cpumap-redirect'

Jesper Dangaard Brouer says:

====================
This patchset utilizes a number of different kernel bulk APIs for optimizing
the performance of the XDP cpumap redirect feature.

Benchmark details are available here:
https://github.com/xdp-project/xdp-project/blob/master/areas/cpumap/cpumap03-optimizations.org

Performance measurements can be considered micro benchmarks, as they measure
dropping packets at different stages in the network stack.
Summary based on above:

Baseline benchmarks
- baseline-redirect: UdpNoPorts: 3,180,074
- baseline-redirect: iptables-raw drop: 6,193,534

Patch1: bpf: cpumap use ptr_ring_consume_batched
- redirect: UdpNoPorts: 3,327,729
- redirect: iptables-raw drop: 6,321,540

Patch2: net: core: introduce build_skb_around
- redirect: UdpNoPorts: 3,221,303
- redirect: iptables-raw drop: 6,320,066

Patch3: bpf: cpumap do bulk allocation of SKBs
- redirect: UdpNoPorts: 3,290,563
- redirect: iptables-raw drop: 6,650,112

Patch4: bpf: cpumap memory prefetchw optimizations for struct page
- redirect: UdpNoPorts: 3,520,250
- redirect: iptables-raw drop: 7,649,604

In this V2 submission I have chosen to drop the SKB-list patch using
netif_receive_skb_list(), as it was not showing a performance improvement for
these micro benchmarks.
====================

Signed-off-by: Alexei Starovoitov <ast@kernel.org>

+91 -35
+2
include/linux/skbuff.h
··· 1042 1042 int node); 1043 1043 struct sk_buff *__build_skb(void *data, unsigned int frag_size); 1044 1044 struct sk_buff *build_skb(void *data, unsigned int frag_size); 1045 + struct sk_buff *build_skb_around(struct sk_buff *skb, 1046 + void *data, unsigned int frag_size); 1045 1047 1046 1048 /** 1047 1049 * alloc_skb - allocate a network buffer
+37 -16
kernel/bpf/cpumap.c
··· 160 160 } 161 161 162 162 static struct sk_buff *cpu_map_build_skb(struct bpf_cpu_map_entry *rcpu, 163 - struct xdp_frame *xdpf) 163 + struct xdp_frame *xdpf, 164 + struct sk_buff *skb) 164 165 { 165 166 unsigned int hard_start_headroom; 166 167 unsigned int frame_size; 167 168 void *pkt_data_start; 168 - struct sk_buff *skb; 169 169 170 170 /* Part of headroom was reserved to xdpf */ 171 171 hard_start_headroom = sizeof(struct xdp_frame) + xdpf->headroom; ··· 191 191 SKB_DATA_ALIGN(sizeof(struct skb_shared_info)); 192 192 193 193 pkt_data_start = xdpf->data - hard_start_headroom; 194 - skb = build_skb(pkt_data_start, frame_size); 195 - if (!skb) 194 + skb = build_skb_around(skb, pkt_data_start, frame_size); 195 + if (unlikely(!skb)) 196 196 return NULL; 197 197 198 198 skb_reserve(skb, hard_start_headroom); ··· 240 240 } 241 241 } 242 242 243 + #define CPUMAP_BATCH 8 244 + 243 245 static int cpu_map_kthread_run(void *data) 244 246 { 245 247 struct bpf_cpu_map_entry *rcpu = data; ··· 254 252 * kthread_stop signal until queue is empty. 255 253 */ 256 254 while (!kthread_should_stop() || !__ptr_ring_empty(rcpu->queue)) { 257 - unsigned int processed = 0, drops = 0, sched = 0; 258 - struct xdp_frame *xdpf; 255 + unsigned int drops = 0, sched = 0; 256 + void *frames[CPUMAP_BATCH]; 257 + void *skbs[CPUMAP_BATCH]; 258 + gfp_t gfp = __GFP_ZERO | GFP_ATOMIC; 259 + int i, n, m; 259 260 260 261 /* Release CPU reschedule checks */ 261 262 if (__ptr_ring_empty(rcpu->queue)) { ··· 274 269 sched = cond_resched(); 275 270 } 276 271 277 - /* Process packets in rcpu->queue */ 278 - local_bh_disable(); 279 272 /* 280 273 * The bpf_cpu_map_entry is single consumer, with this 281 274 * kthread CPU pinned. Lockless access to ptr_ring 282 275 * consume side valid as no-resize allowed of queue. 
283 276 */ 284 - while ((xdpf = __ptr_ring_consume(rcpu->queue))) { 285 - struct sk_buff *skb; 277 + n = ptr_ring_consume_batched(rcpu->queue, frames, CPUMAP_BATCH); 278 + 279 + for (i = 0; i < n; i++) { 280 + void *f = frames[i]; 281 + struct page *page = virt_to_page(f); 282 + 283 + /* Bring struct page memory area to curr CPU. Read by 284 + * build_skb_around via page_is_pfmemalloc(), and when 285 + * freed written by page_frag_free call. 286 + */ 287 + prefetchw(page); 288 + } 289 + 290 + m = kmem_cache_alloc_bulk(skbuff_head_cache, gfp, n, skbs); 291 + if (unlikely(m == 0)) { 292 + for (i = 0; i < n; i++) 293 + skbs[i] = NULL; /* effect: xdp_return_frame */ 294 + drops = n; 295 + } 296 + 297 + local_bh_disable(); 298 + for (i = 0; i < n; i++) { 299 + struct xdp_frame *xdpf = frames[i]; 300 + struct sk_buff *skb = skbs[i]; 286 301 int ret; 287 302 288 - skb = cpu_map_build_skb(rcpu, xdpf); 303 + skb = cpu_map_build_skb(rcpu, xdpf, skb); 289 304 if (!skb) { 290 305 xdp_return_frame(xdpf); 291 306 continue; ··· 315 290 ret = netif_receive_skb_core(skb); 316 291 if (ret == NET_RX_DROP) 317 292 drops++; 318 - 319 - /* Limit BH-disable period */ 320 - if (++processed == 8) 321 - break; 322 293 } 323 294 /* Feedback loop via tracepoint */ 324 - trace_xdp_cpumap_kthread(rcpu->map_id, processed, drops, sched); 295 + trace_xdp_cpumap_kthread(rcpu->map_id, n, drops, sched); 325 296 326 297 local_bh_enable(); /* resched point, may call do_softirq() */ 327 298 }
+52 -19
net/core/skbuff.c
··· 258 258 } 259 259 EXPORT_SYMBOL(__alloc_skb); 260 260 261 + /* Caller must provide SKB that is memset cleared */ 262 + static struct sk_buff *__build_skb_around(struct sk_buff *skb, 263 + void *data, unsigned int frag_size) 264 + { 265 + struct skb_shared_info *shinfo; 266 + unsigned int size = frag_size ? : ksize(data); 267 + 268 + size -= SKB_DATA_ALIGN(sizeof(struct skb_shared_info)); 269 + 270 + /* Assumes caller memset cleared SKB */ 271 + skb->truesize = SKB_TRUESIZE(size); 272 + refcount_set(&skb->users, 1); 273 + skb->head = data; 274 + skb->data = data; 275 + skb_reset_tail_pointer(skb); 276 + skb->end = skb->tail + size; 277 + skb->mac_header = (typeof(skb->mac_header))~0U; 278 + skb->transport_header = (typeof(skb->transport_header))~0U; 279 + 280 + /* make sure we initialize shinfo sequentially */ 281 + shinfo = skb_shinfo(skb); 282 + memset(shinfo, 0, offsetof(struct skb_shared_info, dataref)); 283 + atomic_set(&shinfo->dataref, 1); 284 + 285 + return skb; 286 + } 287 + 261 288 /** 262 289 * __build_skb - build a network buffer 263 290 * @data: data buffer provided by caller ··· 306 279 */ 307 280 struct sk_buff *__build_skb(void *data, unsigned int frag_size) 308 281 { 309 - struct skb_shared_info *shinfo; 310 282 struct sk_buff *skb; 311 - unsigned int size = frag_size ? 
: ksize(data); 312 283 313 284 skb = kmem_cache_alloc(skbuff_head_cache, GFP_ATOMIC); 314 - if (!skb) 285 + if (unlikely(!skb)) 315 286 return NULL; 316 287 317 - size -= SKB_DATA_ALIGN(sizeof(struct skb_shared_info)); 318 - 319 288 memset(skb, 0, offsetof(struct sk_buff, tail)); 320 - skb->truesize = SKB_TRUESIZE(size); 321 - refcount_set(&skb->users, 1); 322 - skb->head = data; 323 - skb->data = data; 324 - skb_reset_tail_pointer(skb); 325 - skb->end = skb->tail + size; 326 - skb->mac_header = (typeof(skb->mac_header))~0U; 327 - skb->transport_header = (typeof(skb->transport_header))~0U; 328 289 329 - /* make sure we initialize shinfo sequentially */ 330 - shinfo = skb_shinfo(skb); 331 - memset(shinfo, 0, offsetof(struct skb_shared_info, dataref)); 332 - atomic_set(&shinfo->dataref, 1); 333 - 334 - return skb; 290 + return __build_skb_around(skb, data, frag_size); 335 291 } 336 292 337 293 /* build_skb() is wrapper over __build_skb(), that specifically ··· 334 324 return skb; 335 325 } 336 326 EXPORT_SYMBOL(build_skb); 327 + 328 + /** 329 + * build_skb_around - build a network buffer around provided skb 330 + * @skb: sk_buff provide by caller, must be memset cleared 331 + * @data: data buffer provided by caller 332 + * @frag_size: size of data, or 0 if head was kmalloced 333 + */ 334 + struct sk_buff *build_skb_around(struct sk_buff *skb, 335 + void *data, unsigned int frag_size) 336 + { 337 + if (unlikely(!skb)) 338 + return NULL; 339 + 340 + skb = __build_skb_around(skb, data, frag_size); 341 + 342 + if (skb && frag_size) { 343 + skb->head_frag = 1; 344 + if (page_is_pfmemalloc(virt_to_head_page(data))) 345 + skb->pfmemalloc = 1; 346 + } 347 + return skb; 348 + } 349 + EXPORT_SYMBOL(build_skb_around); 337 350 338 351 #define NAPI_SKB_CACHE_SIZE 64 339 352