Linux kernel mirror (for testing): git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
at v5.18-rc7 (1088 lines, 26 kB)
// SPDX-License-Identifier: GPL-2.0
/* Copyright(c) 2020 Intel Corporation. */

#define _GNU_SOURCE
#include <poll.h>
#include <pthread.h>
#include <signal.h>
#include <sched.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/resource.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>
#include <getopt.h>
#include <netinet/ether.h>
#include <net/if.h>

#include <linux/bpf.h>
#include <linux/if_link.h>
#include <linux/if_xdp.h>

#include <bpf/libbpf.h>
#include <bpf/xsk.h>
#include <bpf/bpf.h>

/* libbpf APIs for AF_XDP are deprecated starting from v0.7 */
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"

#define ARRAY_SIZE(x) (sizeof(x) / sizeof((x)[0]))

typedef __u64 u64;
typedef __u32 u32;
typedef __u16 u16;
typedef __u8 u8;

/* This program illustrates packet forwarding between multiple AF_XDP sockets
 * in a multi-threaded environment. All threads share a common buffer pool,
 * with each socket having its own private buffer cache.
 *
 * Example 1: Single thread handling two sockets. The packets received by socket
 * A (interface IFA, queue QA) are forwarded to socket B (interface IFB, queue
 * QB), while the packets received by socket B are forwarded to socket A. The
 * thread is running on CPU core X:
 *
 *      ./xsk_fwd -i IFA -q QA -i IFB -q QB -c X
 *
 * Example 2: Two threads, each handling two sockets. The thread running on CPU
 * core CX forwards all the packets received by socket A to socket B, and all
 * the packets received by socket B to socket A. The thread running on CPU core
 * CY is performing the same packet forwarding between sockets C and D:
 *
 *      ./xsk_fwd -i IFA -q QA -i IFB -q QB -i IFC -q QC -i IFD -q QD
 *                -c CX -c CY
 */

/*
 * Buffer pool and buffer cache
 *
 * For packet forwarding, the packet buffers are typically allocated from the
 * pool for packet reception and freed back to the pool for further reuse once
 * the packet transmission is completed.
 *
 * The buffer pool is shared between multiple threads. In order to minimize the
 * access latency to the shared buffer pool, each thread creates one (or
 * several) buffer caches, which, unlike the buffer pool, are private to the
 * thread that creates them and therefore cannot be shared with other threads.
 * Access to the shared pool is only needed either (A) when the cache gets
 * empty due to repeated buffer allocations and needs to be replenished from
 * the pool, or (B) when the cache gets full due to repeated buffer frees and
 * needs to be flushed back to the pool.
 *
 * In a packet forwarding system, a packet received on any input port can
 * potentially be transmitted on any output port, depending on the forwarding
 * configuration. For this to work with zero-copy of the packet buffers for
 * AF_XDP sockets, it is required that the buffer pool memory fits into the
 * UMEM area shared by all the sockets.
 */

struct bpool_params {
        u32 n_buffers;
        u32 buffer_size;
        int mmap_flags;

        u32 n_users_max;
        u32 n_buffers_per_slab;
};

/* This buffer pool implementation organizes the buffers into equally sized
 * slabs of *n_buffers_per_slab*. Initially, there are *n_slabs* slabs in the
 * pool that are completely filled with buffer pointers (full slabs).
 *
 * Each buffer cache has a slab for buffer allocation and a slab for buffer
 * free, with both of these slabs initially empty. When the cache's allocation
 * slab goes empty, it is swapped with one of the available full slabs from the
 * pool, if any is available. When the cache's free slab goes full, it is
 * swapped for one of the empty slabs from the pool, which is guaranteed to
 * succeed.
 *
 * Partially filled slabs never get traded between the cache and the pool
 * (except when the cache itself is destroyed), which enables fast operation
 * through pointer swapping.
 */
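/* For illustration, with the default parameters defined further down in this
 * file, and assuming XSK_RING_PROD__DEFAULT_NUM_DESCS has its usual libbpf
 * value of 2048: n_buffers = 64 * 1024 and n_buffers_per_slab = 2 * 2048 =
 * 4096, so the pool starts out with 65536 / 4096 = 16 full slabs. With
 * n_users_max = 16, another 2 * 16 = 32 initially empty "reserved" slabs are
 * set aside, since every cache permanently holds two slabs (one for
 * allocation, one for free).
 */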
struct bpool {
        struct bpool_params params;
        pthread_mutex_t lock;
        void *addr;

        u64 **slabs;
        u64 **slabs_reserved;
        u64 *buffers;
        u64 *buffers_reserved;

        u64 n_slabs;
        u64 n_slabs_reserved;
        u64 n_buffers;

        u64 n_slabs_available;
        u64 n_slabs_reserved_available;

        struct xsk_umem_config umem_cfg;
        struct xsk_ring_prod umem_fq;
        struct xsk_ring_cons umem_cq;
        struct xsk_umem *umem;
};

static struct bpool *
bpool_init(struct bpool_params *params,
           struct xsk_umem_config *umem_cfg)
{
        struct rlimit r = {RLIM_INFINITY, RLIM_INFINITY};
        u64 n_slabs, n_slabs_reserved, n_buffers, n_buffers_reserved;
        u64 slabs_size, slabs_reserved_size;
        u64 buffers_size, buffers_reserved_size;
        u64 total_size, i;
        struct bpool *bp;
        u8 *p;
        int status;

        /* mmap prep. */
        if (setrlimit(RLIMIT_MEMLOCK, &r))
                return NULL;

        /* bpool internals dimensioning. */
        n_slabs = (params->n_buffers + params->n_buffers_per_slab - 1) /
                  params->n_buffers_per_slab;
        n_slabs_reserved = params->n_users_max * 2;
        n_buffers = n_slabs * params->n_buffers_per_slab;
        n_buffers_reserved = n_slabs_reserved * params->n_buffers_per_slab;

        slabs_size = n_slabs * sizeof(u64 *);
        slabs_reserved_size = n_slabs_reserved * sizeof(u64 *);
        buffers_size = n_buffers * sizeof(u64);
        buffers_reserved_size = n_buffers_reserved * sizeof(u64);

        total_size = sizeof(struct bpool) +
                     slabs_size + slabs_reserved_size +
                     buffers_size + buffers_reserved_size;

        /* bpool memory allocation. */
        p = calloc(total_size, sizeof(u8));
        if (!p)
                return NULL;

        /* bpool memory initialization. */
        bp = (struct bpool *)p;
        memcpy(&bp->params, params, sizeof(*params));
        bp->params.n_buffers = n_buffers;

        bp->slabs = (u64 **)&p[sizeof(struct bpool)];
        bp->slabs_reserved = (u64 **)&p[sizeof(struct bpool) +
                                        slabs_size];
        bp->buffers = (u64 *)&p[sizeof(struct bpool) +
                                slabs_size + slabs_reserved_size];
        bp->buffers_reserved = (u64 *)&p[sizeof(struct bpool) +
                                         slabs_size + slabs_reserved_size +
                                         buffers_size];

        bp->n_slabs = n_slabs;
        bp->n_slabs_reserved = n_slabs_reserved;
        bp->n_buffers = n_buffers;

        for (i = 0; i < n_slabs; i++)
                bp->slabs[i] = &bp->buffers[i * params->n_buffers_per_slab];
        bp->n_slabs_available = n_slabs;

        for (i = 0; i < n_slabs_reserved; i++)
                bp->slabs_reserved[i] = &bp->buffers_reserved[i *
                        params->n_buffers_per_slab];
        bp->n_slabs_reserved_available = n_slabs_reserved;

        for (i = 0; i < n_buffers; i++)
                bp->buffers[i] = i * params->buffer_size;

        /* lock. */
        status = pthread_mutex_init(&bp->lock, NULL);
        if (status) {
                free(p);
                return NULL;
        }

        /* mmap. */
        bp->addr = mmap(NULL,
                        n_buffers * params->buffer_size,
                        PROT_READ | PROT_WRITE,
                        MAP_PRIVATE | MAP_ANONYMOUS | params->mmap_flags,
                        -1,
                        0);
        if (bp->addr == MAP_FAILED) {
                pthread_mutex_destroy(&bp->lock);
                free(p);
                return NULL;
        }

        /* umem. */
        status = xsk_umem__create(&bp->umem,
                                  bp->addr,
                                  bp->params.n_buffers * bp->params.buffer_size,
                                  &bp->umem_fq,
                                  &bp->umem_cq,
                                  umem_cfg);
        if (status) {
                munmap(bp->addr, bp->params.n_buffers * bp->params.buffer_size);
                pthread_mutex_destroy(&bp->lock);
                free(p);
                return NULL;
        }
        memcpy(&bp->umem_cfg, umem_cfg, sizeof(*umem_cfg));

        return bp;
}
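/* Memory layout produced by bpool_init() above, all carved out of a single
 * calloc() region:
 *
 *      [struct bpool][slab pointers][reserved slab pointers]
 *      [buffer handles][reserved buffer handles]
 *
 * Each buffer handle is a byte offset into the UMEM area (i * buffer_size),
 * which is the address format that the UMEM fill and completion rings use.
 */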
static void
bpool_free(struct bpool *bp)
{
        if (!bp)
                return;

        xsk_umem__delete(bp->umem);
        munmap(bp->addr, bp->params.n_buffers * bp->params.buffer_size);
        pthread_mutex_destroy(&bp->lock);
        free(bp);
}

struct bcache {
        struct bpool *bp;

        u64 *slab_cons;
        u64 *slab_prod;

        u64 n_buffers_cons;
        u64 n_buffers_prod;
};

static u32
bcache_slab_size(struct bcache *bc)
{
        struct bpool *bp = bc->bp;

        return bp->params.n_buffers_per_slab;
}

static struct bcache *
bcache_init(struct bpool *bp)
{
        struct bcache *bc;

        bc = calloc(1, sizeof(struct bcache));
        if (!bc)
                return NULL;

        bc->bp = bp;
        bc->n_buffers_cons = 0;
        bc->n_buffers_prod = 0;

        pthread_mutex_lock(&bp->lock);
        if (bp->n_slabs_reserved_available == 0) {
                pthread_mutex_unlock(&bp->lock);
                free(bc);
                return NULL;
        }

        /* Each cache takes two reserved slabs: one consumer (allocation)
         * slab and one producer (free) slab. The available count is always
         * even, so checking it against zero is sufficient.
         */
        bc->slab_cons = bp->slabs_reserved[bp->n_slabs_reserved_available - 1];
        bc->slab_prod = bp->slabs_reserved[bp->n_slabs_reserved_available - 2];
        bp->n_slabs_reserved_available -= 2;
        pthread_mutex_unlock(&bp->lock);

        return bc;
}

static void
bcache_free(struct bcache *bc)
{
        struct bpool *bp;

        if (!bc)
                return;

        /* In order to keep this example simple, the case of freeing any
         * existing buffers from the cache back to the pool is ignored.
         */

        bp = bc->bp;
        pthread_mutex_lock(&bp->lock);
        bp->slabs_reserved[bp->n_slabs_reserved_available] = bc->slab_prod;
        bp->slabs_reserved[bp->n_slabs_reserved_available + 1] = bc->slab_cons;
        bp->n_slabs_reserved_available += 2;
        pthread_mutex_unlock(&bp->lock);

        free(bc);
}

/* To work correctly, the implementation requires that the *n_buffers* input
 * argument is never greater than the buffer pool's *n_buffers_per_slab*. This
 * is typically the case, with one exception taking place when a large number
 * of buffers is allocated at init time (e.g. for the UMEM fill queue setup).
 */
static inline u32
bcache_cons_check(struct bcache *bc, u32 n_buffers)
{
        struct bpool *bp = bc->bp;
        u64 n_buffers_per_slab = bp->params.n_buffers_per_slab;
        u64 n_buffers_cons = bc->n_buffers_cons;
        u64 n_slabs_available;
        u64 *slab_full;

        /*
         * Consumer slab is not empty: use what's available locally. Do not
         * look for more buffers from the pool when the ask can only be
         * partially satisfied.
         */
        if (n_buffers_cons)
                return (n_buffers_cons < n_buffers) ?
                        n_buffers_cons :
                        n_buffers;

        /*
         * Consumer slab is empty: look to trade the current consumer slab
         * (empty) for a full slab from the pool, if any is available.
         */
        pthread_mutex_lock(&bp->lock);
        n_slabs_available = bp->n_slabs_available;
        if (!n_slabs_available) {
                pthread_mutex_unlock(&bp->lock);
                return 0;
        }

        n_slabs_available--;
        slab_full = bp->slabs[n_slabs_available];
        bp->slabs[n_slabs_available] = bc->slab_cons;
        bp->n_slabs_available = n_slabs_available;
        pthread_mutex_unlock(&bp->lock);

        bc->slab_cons = slab_full;
        bc->n_buffers_cons = n_buffers_per_slab;
        return n_buffers;
}

/* The caller must guarantee, via a prior bcache_cons_check() call, that at
 * least one buffer is available in the consumer slab.
 */
static inline u64
bcache_cons(struct bcache *bc)
{
        u64 n_buffers_cons = bc->n_buffers_cons - 1;
        u64 buffer;

        buffer = bc->slab_cons[n_buffers_cons];
        bc->n_buffers_cons = n_buffers_cons;
        return buffer;
}

static inline void
bcache_prod(struct bcache *bc, u64 buffer)
{
        struct bpool *bp = bc->bp;
        u64 n_buffers_per_slab = bp->params.n_buffers_per_slab;
        u64 n_buffers_prod = bc->n_buffers_prod;
        u64 n_slabs_available;
        u64 *slab_empty;

        /*
         * Producer slab is not yet full: store the current buffer to it.
         */
        if (n_buffers_prod < n_buffers_per_slab) {
                bc->slab_prod[n_buffers_prod] = buffer;
                bc->n_buffers_prod = n_buffers_prod + 1;
                return;
        }

        /*
         * Producer slab is full: trade the cache's current producer slab
         * (full) for an empty slab from the pool, then store the current
         * buffer to the new producer slab. As one full slab exists in the
         * cache, it is guaranteed that there is at least one empty slab
         * available in the pool.
         */
        pthread_mutex_lock(&bp->lock);
        n_slabs_available = bp->n_slabs_available;
        slab_empty = bp->slabs[n_slabs_available];
        bp->slabs[n_slabs_available] = bc->slab_prod;
        bp->n_slabs_available = n_slabs_available + 1;
        pthread_mutex_unlock(&bp->lock);

        slab_empty[0] = buffer;
        bc->slab_prod = slab_empty;
        bc->n_buffers_prod = 1;
}
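/* For illustration: the typical allocation pattern, as used by the port code
 * below, is to secure credit first and then pull buffers one at a time:
 *
 *      n = bcache_cons_check(bc, n_wanted);
 *      for (i = 0; i < n; i++)
 *              addr[i] = bcache_cons(bc);
 *
 * with each buffer later returned via bcache_prod(bc, addr[i]). Both fast
 * paths touch only the thread-private slabs; the pool mutex is taken only on
 * the slab-swap slow paths above.
 */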
/*
 * Port
 *
 * Each of the forwarding ports sits on top of an AF_XDP socket. In order for
 * packet forwarding to happen with no packet buffer copy, all the sockets need
 * to share the same UMEM area, which is used as the buffer pool memory.
 */
#ifndef MAX_BURST_RX
#define MAX_BURST_RX 64
#endif

#ifndef MAX_BURST_TX
#define MAX_BURST_TX 64
#endif

struct burst_rx {
        u64 addr[MAX_BURST_RX];
        u32 len[MAX_BURST_RX];
};

struct burst_tx {
        u64 addr[MAX_BURST_TX];
        u32 len[MAX_BURST_TX];
        u32 n_pkts;
};

struct port_params {
        struct xsk_socket_config xsk_cfg;
        struct bpool *bp;
        const char *iface;
        u32 iface_queue;
};

struct port {
        struct port_params params;

        struct bcache *bc;

        struct xsk_ring_cons rxq;
        struct xsk_ring_prod txq;
        struct xsk_ring_prod umem_fq;
        struct xsk_ring_cons umem_cq;
        struct xsk_socket *xsk;
        int umem_fq_initialized;

        u64 n_pkts_rx;
        u64 n_pkts_tx;
};

static void
port_free(struct port *p)
{
        if (!p)
                return;

        /* To keep this example simple, the code to free the buffers from the
         * socket's receive and transmit queues, as well as from the UMEM fill
         * and completion queues, is not included.
         */

        if (p->xsk)
                xsk_socket__delete(p->xsk);

        bcache_free(p->bc);

        free(p);
}

static struct port *
port_init(struct port_params *params)
{
        struct port *p;
        u32 umem_fq_size, pos = 0;
        int status, i;

        /* Memory allocation and initialization. */
        p = calloc(sizeof(struct port), 1);
        if (!p)
                return NULL;

        memcpy(&p->params, params, sizeof(p->params));
        umem_fq_size = params->bp->umem_cfg.fill_size;

        /* bcache. */
        p->bc = bcache_init(params->bp);
        if (!p->bc ||
            (bcache_slab_size(p->bc) < umem_fq_size) ||
            (bcache_cons_check(p->bc, umem_fq_size) < umem_fq_size)) {
                port_free(p);
                return NULL;
        }

        /* xsk socket. */
        status = xsk_socket__create_shared(&p->xsk,
                                           params->iface,
                                           params->iface_queue,
                                           params->bp->umem,
                                           &p->rxq,
                                           &p->txq,
                                           &p->umem_fq,
                                           &p->umem_cq,
                                           &params->xsk_cfg);
        if (status) {
                port_free(p);
                return NULL;
        }

        /* umem fq. */
        xsk_ring_prod__reserve(&p->umem_fq, umem_fq_size, &pos);

        for (i = 0; i < umem_fq_size; i++)
                *xsk_ring_prod__fill_addr(&p->umem_fq, pos + i) =
                        bcache_cons(p->bc);

        xsk_ring_prod__submit(&p->umem_fq, umem_fq_size);
        p->umem_fq_initialized = 1;

        return p;
}
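/* Note on port_init() above: every port gets its own fill and completion
 * rings on top of the one shared UMEM (this is what
 * xsk_socket__create_shared() provides), and the fill ring is pre-populated
 * with fill_size buffers drawn from the shared pool through the port's cache,
 * so the kernel can start writing received frames immediately.
 */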
static inline u32
port_rx_burst(struct port *p, struct burst_rx *b)
{
        u32 n_pkts, pos, i;

        /* Secure free buffers for the FQ replenish that follows the RX
         * burst.
         */
        n_pkts = ARRAY_SIZE(b->addr);

        n_pkts = bcache_cons_check(p->bc, n_pkts);
        if (!n_pkts)
                return 0;

        /* RXQ. */
        n_pkts = xsk_ring_cons__peek(&p->rxq, n_pkts, &pos);
        if (!n_pkts) {
                if (xsk_ring_prod__needs_wakeup(&p->umem_fq)) {
                        struct pollfd pollfd = {
                                .fd = xsk_socket__fd(p->xsk),
                                .events = POLLIN,
                        };

                        poll(&pollfd, 1, 0);
                }
                return 0;
        }

        for (i = 0; i < n_pkts; i++) {
                b->addr[i] = xsk_ring_cons__rx_desc(&p->rxq, pos + i)->addr;
                b->len[i] = xsk_ring_cons__rx_desc(&p->rxq, pos + i)->len;
        }

        xsk_ring_cons__release(&p->rxq, n_pkts);
        p->n_pkts_rx += n_pkts;

        /* UMEM FQ. */
        for ( ; ; ) {
                int status;

                status = xsk_ring_prod__reserve(&p->umem_fq, n_pkts, &pos);
                if (status == n_pkts)
                        break;

                if (xsk_ring_prod__needs_wakeup(&p->umem_fq)) {
                        struct pollfd pollfd = {
                                .fd = xsk_socket__fd(p->xsk),
                                .events = POLLIN,
                        };

                        poll(&pollfd, 1, 0);
                }
        }

        for (i = 0; i < n_pkts; i++)
                *xsk_ring_prod__fill_addr(&p->umem_fq, pos + i) =
                        bcache_cons(p->bc);

        xsk_ring_prod__submit(&p->umem_fq, n_pkts);

        return n_pkts;
}

static inline void
port_tx_burst(struct port *p, struct burst_tx *b)
{
        u32 n_pkts, pos, i;
        int status;

        /* UMEM CQ. */
        n_pkts = p->params.bp->umem_cfg.comp_size;

        n_pkts = xsk_ring_cons__peek(&p->umem_cq, n_pkts, &pos);

        for (i = 0; i < n_pkts; i++) {
                u64 addr = *xsk_ring_cons__comp_addr(&p->umem_cq, pos + i);

                bcache_prod(p->bc, addr);
        }

        xsk_ring_cons__release(&p->umem_cq, n_pkts);

        /* TXQ. */
        n_pkts = b->n_pkts;

        for ( ; ; ) {
                status = xsk_ring_prod__reserve(&p->txq, n_pkts, &pos);
                if (status == n_pkts)
                        break;

                if (xsk_ring_prod__needs_wakeup(&p->txq))
                        sendto(xsk_socket__fd(p->xsk), NULL, 0, MSG_DONTWAIT,
                               NULL, 0);
        }

        for (i = 0; i < n_pkts; i++) {
                xsk_ring_prod__tx_desc(&p->txq, pos + i)->addr = b->addr[i];
                xsk_ring_prod__tx_desc(&p->txq, pos + i)->len = b->len[i];
        }

        xsk_ring_prod__submit(&p->txq, n_pkts);
        if (xsk_ring_prod__needs_wakeup(&p->txq))
                sendto(xsk_socket__fd(p->xsk), NULL, 0, MSG_DONTWAIT, NULL, 0);
        p->n_pkts_tx += n_pkts;
}
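/* Taken together, the two functions above implement one forwarding hop.
 * port_rx_burst() consumes RX descriptors and immediately replenishes the
 * fill queue from the cache (one fresh buffer per received packet), while
 * port_tx_burst() first recycles completed TX buffers from the completion
 * queue back into the cache and only then posts the new TX descriptors.
 * Buffer ownership thus circulates: FQ -> RXQ -> TXQ -> CQ -> cache/pool.
 */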
/*
 * Thread
 *
 * Packet forwarding threads.
 */
#ifndef MAX_PORTS_PER_THREAD
#define MAX_PORTS_PER_THREAD 16
#endif

struct thread_data {
        struct port *ports_rx[MAX_PORTS_PER_THREAD];
        struct port *ports_tx[MAX_PORTS_PER_THREAD];
        u32 n_ports_rx;
        struct burst_rx burst_rx;
        struct burst_tx burst_tx[MAX_PORTS_PER_THREAD];
        u32 cpu_core_id;
        int quit;
};

static void swap_mac_addresses(void *data)
{
        struct ether_header *eth = (struct ether_header *)data;
        struct ether_addr *src_addr = (struct ether_addr *)&eth->ether_shost;
        struct ether_addr *dst_addr = (struct ether_addr *)&eth->ether_dhost;
        struct ether_addr tmp;

        tmp = *src_addr;
        *src_addr = *dst_addr;
        *dst_addr = tmp;
}

static void *
thread_func(void *arg)
{
        struct thread_data *t = arg;
        cpu_set_t cpu_cores;
        u32 i;

        CPU_ZERO(&cpu_cores);
        CPU_SET(t->cpu_core_id, &cpu_cores);
        pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpu_cores);

        /* Round-robin over this thread's RX ports. The modulo keeps the
         * wrap-around correct even when the number of ports per thread is
         * not a power of two.
         */
        for (i = 0; !t->quit; i = (i + 1) % t->n_ports_rx) {
                struct port *port_rx = t->ports_rx[i];
                struct port *port_tx = t->ports_tx[i];
                struct burst_rx *brx = &t->burst_rx;
                struct burst_tx *btx = &t->burst_tx[i];
                u32 n_pkts, j;

                /* RX. */
                n_pkts = port_rx_burst(port_rx, brx);
                if (!n_pkts)
                        continue;

                /* Process & TX. */
                for (j = 0; j < n_pkts; j++) {
                        u64 addr = xsk_umem__add_offset_to_addr(brx->addr[j]);
                        u8 *pkt = xsk_umem__get_data(port_rx->params.bp->addr,
                                                     addr);

                        swap_mac_addresses(pkt);

                        btx->addr[btx->n_pkts] = brx->addr[j];
                        btx->len[btx->n_pkts] = brx->len[j];
                        btx->n_pkts++;

                        if (btx->n_pkts == MAX_BURST_TX) {
                                port_tx_burst(port_tx, btx);
                                btx->n_pkts = 0;
                        }
                }
        }

        return NULL;
}
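/* Note on thread_func() above: a burst is flushed to the TX port only once it
 * holds MAX_BURST_TX packets, so under light traffic the tail of a burst can
 * wait in *btx* until enough further packets arrive on the same RX port.
 * Keeping one burst_tx per RX port inside thread_data makes all of this state
 * private to the thread, so the forwarding path needs no locking.
 */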
/*
 * Process
 */
static const struct bpool_params bpool_params_default = {
        .n_buffers = 64 * 1024,
        .buffer_size = XSK_UMEM__DEFAULT_FRAME_SIZE,
        .mmap_flags = 0,

        .n_users_max = 16,
        .n_buffers_per_slab = XSK_RING_PROD__DEFAULT_NUM_DESCS * 2,
};

static const struct xsk_umem_config umem_cfg_default = {
        .fill_size = XSK_RING_PROD__DEFAULT_NUM_DESCS * 2,
        .comp_size = XSK_RING_CONS__DEFAULT_NUM_DESCS,
        .frame_size = XSK_UMEM__DEFAULT_FRAME_SIZE,
        .frame_headroom = XSK_UMEM__DEFAULT_FRAME_HEADROOM,
        .flags = 0,
};

static const struct port_params port_params_default = {
        .xsk_cfg = {
                .rx_size = XSK_RING_CONS__DEFAULT_NUM_DESCS,
                .tx_size = XSK_RING_PROD__DEFAULT_NUM_DESCS,
                .libbpf_flags = 0,
                .xdp_flags = XDP_FLAGS_DRV_MODE,
                .bind_flags = XDP_USE_NEED_WAKEUP | XDP_ZEROCOPY,
        },

        .bp = NULL,
        .iface = NULL,
        .iface_queue = 0,
};

#ifndef MAX_PORTS
#define MAX_PORTS 64
#endif

#ifndef MAX_THREADS
#define MAX_THREADS 64
#endif

static struct bpool_params bpool_params;
static struct xsk_umem_config umem_cfg;
static struct bpool *bp;

static struct port_params port_params[MAX_PORTS];
static struct port *ports[MAX_PORTS];
static u64 n_pkts_rx[MAX_PORTS];
static u64 n_pkts_tx[MAX_PORTS];
static int n_ports;

static pthread_t threads[MAX_THREADS];
static struct thread_data thread_data[MAX_THREADS];
static int n_threads;

static void
print_usage(char *prog_name)
{
        const char *usage =
                "Usage:\n"
                "\t%s [ -b SIZE ] -c CORE -i INTERFACE [ -q QUEUE ]\n"
                "\n"
                "-c CORE        CPU core to run a packet forwarding thread\n"
                "               on. May be invoked multiple times.\n"
                "\n"
                "-b SIZE        Number of buffers in the buffer pool shared\n"
                "               by all the forwarding threads. Default: %u.\n"
                "\n"
                "-i INTERFACE   Network interface. Each (INTERFACE, QUEUE)\n"
                "               pair specifies one forwarding port. May be\n"
                "               invoked multiple times.\n"
                "\n"
                "-q QUEUE       Network interface queue for RX and TX. Each\n"
                "               (INTERFACE, QUEUE) pair specifies one\n"
                "               forwarding port. Default: %u. May be invoked\n"
                "               multiple times.\n"
                "\n";
        printf(usage,
               prog_name,
               bpool_params_default.n_buffers,
               port_params_default.iface_queue);
}

static int
parse_args(int argc, char **argv)
{
        struct option lgopts[] = {
                { NULL, 0, 0, 0 }
        };
        int opt, option_index;

        /* Parse the input arguments. */
        for ( ; ; ) {
                opt = getopt_long(argc, argv, "b:c:i:q:", lgopts,
                                  &option_index);
                if (opt == EOF)
                        break;

                switch (opt) {
                case 'b':
                        bpool_params.n_buffers = atoi(optarg);
                        break;

                case 'c':
                        if (n_threads == MAX_THREADS) {
                                printf("Max number of threads (%d) reached.\n",
                                       MAX_THREADS);
                                return -1;
                        }

                        thread_data[n_threads].cpu_core_id = atoi(optarg);
                        n_threads++;
                        break;

                case 'i':
                        if (n_ports == MAX_PORTS) {
                                printf("Max number of ports (%d) reached.\n",
                                       MAX_PORTS);
                                return -1;
                        }

                        port_params[n_ports].iface = optarg;
                        port_params[n_ports].iface_queue = 0;
                        n_ports++;
                        break;

                case 'q':
                        if (n_ports == 0) {
                                printf("No port specified for queue.\n");
                                return -1;
                        }
                        port_params[n_ports - 1].iface_queue = atoi(optarg);
                        break;

                default:
                        printf("Illegal argument.\n");
                        return -1;
                }
        }

        optind = 1; /* reset getopt lib */

        /* Check the input arguments. */
        if (!n_ports) {
                printf("No ports specified.\n");
                return -1;
        }

        if (!n_threads) {
                printf("No threads specified.\n");
                return -1;
        }

        if (n_ports % n_threads) {
                printf("Ports cannot be evenly distributed to threads.\n");
                return -1;
        }

        return 0;
}
static void
print_port(u32 port_id)
{
        struct port *port = ports[port_id];

        printf("Port %u: interface = %s, queue = %u\n",
               port_id, port->params.iface, port->params.iface_queue);
}

static void
print_thread(u32 thread_id)
{
        struct thread_data *t = &thread_data[thread_id];
        u32 i;

        printf("Thread %u (CPU core %u): ",
               thread_id, t->cpu_core_id);

        for (i = 0; i < t->n_ports_rx; i++) {
                struct port *port_rx = t->ports_rx[i];
                struct port *port_tx = t->ports_tx[i];

                printf("(%s, %u) -> (%s, %u), ",
                       port_rx->params.iface,
                       port_rx->params.iface_queue,
                       port_tx->params.iface,
                       port_tx->params.iface_queue);
        }

        printf("\n");
}

static void
print_port_stats_separator(void)
{
        printf("+-%4s-+-%12s-+-%13s-+-%12s-+-%13s-+\n",
               "----",
               "------------",
               "-------------",
               "------------",
               "-------------");
}

static void
print_port_stats_header(void)
{
        print_port_stats_separator();
        printf("| %4s | %12s | %13s | %12s | %13s |\n",
               "Port",
               "RX packets",
               "RX rate (pps)",
               "TX packets",
               "TX rate (pps)");
        print_port_stats_separator();
}

static void
print_port_stats_trailer(void)
{
        print_port_stats_separator();
        printf("\n");
}

static void
print_port_stats(int port_id, u64 ns_diff)
{
        struct port *p = ports[port_id];
        double rx_pps, tx_pps;

        rx_pps = (p->n_pkts_rx - n_pkts_rx[port_id]) * 1000000000. / ns_diff;
        tx_pps = (p->n_pkts_tx - n_pkts_tx[port_id]) * 1000000000. / ns_diff;

        printf("| %4d | %12llu | %13.0f | %12llu | %13.0f |\n",
               port_id,
               p->n_pkts_rx,
               rx_pps,
               p->n_pkts_tx,
               tx_pps);

        n_pkts_rx[port_id] = p->n_pkts_rx;
        n_pkts_tx[port_id] = p->n_pkts_tx;
}

static void
print_port_stats_all(u64 ns_diff)
{
        int i;

        print_port_stats_header();
        for (i = 0; i < n_ports; i++)
                print_port_stats(i, ns_diff);
        print_port_stats_trailer();
}

static int quit;

static void
signal_handler(int sig)
{
        quit = 1;
}

static void remove_xdp_program(void)
{
        int i;

        for (i = 0; i < n_ports; i++)
                bpf_xdp_detach(if_nametoindex(port_params[i].iface),
                               port_params[i].xsk_cfg.xdp_flags, NULL);
}

int main(int argc, char **argv)
{
        struct timespec time;
        u64 ns0;
        int i;

        /* Parse args. */
        memcpy(&bpool_params, &bpool_params_default,
               sizeof(struct bpool_params));
        memcpy(&umem_cfg, &umem_cfg_default,
               sizeof(struct xsk_umem_config));
        for (i = 0; i < MAX_PORTS; i++)
                memcpy(&port_params[i], &port_params_default,
                       sizeof(struct port_params));

        if (parse_args(argc, argv)) {
                print_usage(argv[0]);
                return -1;
        }

        /* Buffer pool initialization. */
        bp = bpool_init(&bpool_params, &umem_cfg);
        if (!bp) {
                printf("Buffer pool initialization failed.\n");
                return -1;
        }
        printf("Buffer pool created successfully.\n");
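        /* All ports are pointed at the same buffer pool below, i.e. at one
         * shared UMEM; this is what allows a buffer received on any port to
         * be transmitted on any other port without copying the packet.
         */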
        /* Ports initialization. */
        for (i = 0; i < MAX_PORTS; i++)
                port_params[i].bp = bp;

        for (i = 0; i < n_ports; i++) {
                ports[i] = port_init(&port_params[i]);
                if (!ports[i]) {
                        printf("Port %d initialization failed.\n", i);
                        return -1;
                }
                print_port(i);
        }
        printf("All ports created successfully.\n");

        /* Threads. */
        for (i = 0; i < n_threads; i++) {
                struct thread_data *t = &thread_data[i];
                u32 n_ports_per_thread = n_ports / n_threads, j;

                /* Pair each RX port with the next port of the same thread's
                 * group (wrapping around), so that with two ports per thread
                 * the packets of A go out on B and the packets of B go out
                 * on A.
                 */
                for (j = 0; j < n_ports_per_thread; j++) {
                        t->ports_rx[j] = ports[i * n_ports_per_thread + j];
                        t->ports_tx[j] = ports[i * n_ports_per_thread +
                                (j + 1) % n_ports_per_thread];
                }

                t->n_ports_rx = n_ports_per_thread;

                print_thread(i);
        }

        for (i = 0; i < n_threads; i++) {
                int status;

                status = pthread_create(&threads[i],
                                        NULL,
                                        thread_func,
                                        &thread_data[i]);
                if (status) {
                        printf("Thread %d creation failed.\n", i);
                        return -1;
                }
        }
        printf("All threads created successfully.\n");

        /* Print statistics. */
        signal(SIGINT, signal_handler);
        signal(SIGTERM, signal_handler);
        signal(SIGABRT, signal_handler);

        clock_gettime(CLOCK_MONOTONIC, &time);
        ns0 = time.tv_sec * 1000000000UL + time.tv_nsec;
        for ( ; !quit; ) {
                u64 ns1, ns_diff;

                sleep(1);
                clock_gettime(CLOCK_MONOTONIC, &time);
                ns1 = time.tv_sec * 1000000000UL + time.tv_nsec;
                ns_diff = ns1 - ns0;
                ns0 = ns1;

                print_port_stats_all(ns_diff);
        }

        /* Threads completion. */
        printf("Quit.\n");
        for (i = 0; i < n_threads; i++)
                thread_data[i].quit = 1;

        for (i = 0; i < n_threads; i++)
                pthread_join(threads[i], NULL);

        for (i = 0; i < n_ports; i++)
                port_free(ports[i]);

        bpool_free(bp);

        remove_xdp_program();

        return 0;
}
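/* Example invocation (illustrative only; the interface names and core number
 * are made up): forward between the first queue of two interfaces with a
 * single thread pinned to core 4:
 *
 *      sudo ./xsk_fwd -i ens1f0 -q 0 -i ens1f1 -q 0 -c 4
 *
 * The defaults above request XDP_FLAGS_DRV_MODE with XDP_ZEROCOPY, so both
 * NIC drivers must support zero-copy AF_XDP; root privileges are assumed,
 * since the program raises RLIMIT_MEMLOCK and attaches XDP programs.
 */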