Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux

xsk: Introduce batched Tx descriptor interfaces

Introduce batched descriptor interfaces in the xsk core code for the
Tx path to be used in the driver to write a code path with higher
performance. This interface will be used by the i40e driver in the
next patch, though other drivers would likely benefit from this new
interface too.

Note that batching is only implemented for the common case where
exactly one socket is bound to a given device and queue id. When
this is not the case, we fall back to the old non-batched version of
the function.

Signed-off-by: Magnus Karlsson <magnus.karlsson@intel.com>
Signed-off-by: Daniel Borkmann <daniel@iogearbox.net>
Acked-by: John Fastabend <john.fastabend@gmail.com>
Link: https://lore.kernel.org/bpf/1605525167-14450-5-git-send-email-magnus.karlsson@gmail.com

authored by

Magnus Karlsson and committed by
Daniel Borkmann
9349eb3a b8c7aece

+140 -13
+7
include/net/xdp_sock_drv.h
··· 13 13 14 14 void xsk_tx_completed(struct xsk_buff_pool *pool, u32 nb_entries); 15 15 bool xsk_tx_peek_desc(struct xsk_buff_pool *pool, struct xdp_desc *desc); 16 + u32 xsk_tx_peek_release_desc_batch(struct xsk_buff_pool *pool, struct xdp_desc *desc, u32 max); 16 17 void xsk_tx_release(struct xsk_buff_pool *pool); 17 18 struct xsk_buff_pool *xsk_get_pool_from_qid(struct net_device *dev, 18 19 u16 queue_id); ··· 127 126 struct xdp_desc *desc) 128 127 { 129 128 return false; 129 + } 130 + 131 + static inline u32 xsk_tx_peek_release_desc_batch(struct xsk_buff_pool *pool, struct xdp_desc *desc, 132 + u32 max) 133 + { 134 + return 0; 130 135 } 131 136 132 137 static inline void xsk_tx_release(struct xsk_buff_pool *pool)
+57
net/xdp/xsk.c
··· 332 332 } 333 333 EXPORT_SYMBOL(xsk_tx_peek_desc); 334 334 335 + static u32 xsk_tx_peek_release_fallback(struct xsk_buff_pool *pool, struct xdp_desc *descs, 336 + u32 max_entries) 337 + { 338 + u32 nb_pkts = 0; 339 + 340 + while (nb_pkts < max_entries && xsk_tx_peek_desc(pool, &descs[nb_pkts])) 341 + nb_pkts++; 342 + 343 + xsk_tx_release(pool); 344 + return nb_pkts; 345 + } 346 + 347 + u32 xsk_tx_peek_release_desc_batch(struct xsk_buff_pool *pool, struct xdp_desc *descs, 348 + u32 max_entries) 349 + { 350 + struct xdp_sock *xs; 351 + u32 nb_pkts; 352 + 353 + rcu_read_lock(); 354 + if (!list_is_singular(&pool->xsk_tx_list)) { 355 + /* Fallback to the non-batched version */ 356 + rcu_read_unlock(); 357 + return xsk_tx_peek_release_fallback(pool, descs, max_entries); 358 + } 359 + 360 + xs = list_first_or_null_rcu(&pool->xsk_tx_list, struct xdp_sock, tx_list); 361 + if (!xs) { 362 + nb_pkts = 0; 363 + goto out; 364 + } 365 + 366 + nb_pkts = xskq_cons_peek_desc_batch(xs->tx, descs, pool, max_entries); 367 + if (!nb_pkts) { 368 + xs->tx->queue_empty_descs++; 369 + goto out; 370 + } 371 + 372 + /* This is the backpressure mechanism for the Tx path. Try to 373 + * reserve space in the completion queue for all packets, but 374 + * if there are fewer slots available, just process that many 375 + * packets. This avoids having to implement any buffering in 376 + * the Tx path. 377 + */ 378 + nb_pkts = xskq_prod_reserve_addr_batch(pool->cq, descs, nb_pkts); 379 + if (!nb_pkts) 380 + goto out; 381 + 382 + xskq_cons_release_n(xs->tx, nb_pkts); 383 + __xskq_cons_release(xs->tx); 384 + xs->sk.sk_write_space(&xs->sk); 385 + 386 + out: 387 + rcu_read_unlock(); 388 + return nb_pkts; 389 + } 390 + EXPORT_SYMBOL(xsk_tx_peek_release_desc_batch); 391 + 335 392 static int xsk_wakeup(struct xdp_sock *xs, u8 flags) 336 393 { 337 394 struct net_device *dev = xs->dev;
+76 -13
net/xdp/xsk_queue.h
··· 199 199 return false; 200 200 } 201 201 202 + static inline u32 xskq_cons_read_desc_batch(struct xsk_queue *q, 203 + struct xdp_desc *descs, 204 + struct xsk_buff_pool *pool, u32 max) 205 + { 206 + u32 cached_cons = q->cached_cons, nb_entries = 0; 207 + 208 + while (cached_cons != q->cached_prod && nb_entries < max) { 209 + struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring; 210 + u32 idx = cached_cons & q->ring_mask; 211 + 212 + descs[nb_entries] = ring->desc[idx]; 213 + if (unlikely(!xskq_cons_is_valid_desc(q, &descs[nb_entries], pool))) { 214 + /* Skip the entry */ 215 + cached_cons++; 216 + continue; 217 + } 218 + 219 + nb_entries++; 220 + cached_cons++; 221 + } 222 + 223 + return nb_entries; 224 + } 225 + 202 226 /* Functions for consumers */ 203 227 204 228 static inline void __xskq_cons_release(struct xsk_queue *q) ··· 244 220 __xskq_cons_peek(q); 245 221 } 246 222 247 - static inline bool xskq_cons_has_entries(struct xsk_queue *q, u32 cnt) 223 + static inline u32 xskq_cons_nb_entries(struct xsk_queue *q, u32 max) 248 224 { 249 225 u32 entries = q->cached_prod - q->cached_cons; 250 226 251 - if (entries >= cnt) 252 - return true; 227 + if (entries >= max) 228 + return max; 253 229 254 230 __xskq_cons_peek(q); 255 231 entries = q->cached_prod - q->cached_cons; 256 232 257 - return entries >= cnt; 233 + return entries >= max ? max : entries; 234 + } 235 + 236 + static inline bool xskq_cons_has_entries(struct xsk_queue *q, u32 cnt) 237 + { 238 + return xskq_cons_nb_entries(q, cnt) >= cnt ? 
true : false; 258 239 } 259 240 260 241 static inline bool xskq_cons_peek_addr_unchecked(struct xsk_queue *q, u64 *addr) ··· 278 249 return xskq_cons_read_desc(q, desc, pool); 279 250 } 280 251 252 + static inline u32 xskq_cons_peek_desc_batch(struct xsk_queue *q, struct xdp_desc *descs, 253 + struct xsk_buff_pool *pool, u32 max) 254 + { 255 + u32 entries = xskq_cons_nb_entries(q, max); 256 + 257 + return xskq_cons_read_desc_batch(q, descs, pool, entries); 258 + } 259 + 260 + /* To improve performance in the xskq_cons_release functions, only update local state here. 261 + * Reflect this to global state when we get new entries from the ring in 262 + * xskq_cons_get_entries() and whenever Rx or Tx processing are completed in the NAPI loop. 263 + */ 281 264 static inline void xskq_cons_release(struct xsk_queue *q) 282 265 { 283 - /* To improve performance, only update local state here. 284 - * Reflect this to global state when we get new entries 285 - * from the ring in xskq_cons_get_entries() and whenever 286 - * Rx or Tx processing are completed in the NAPI loop. 287 - */ 288 266 q->cached_cons++; 267 + } 268 + 269 + static inline void xskq_cons_release_n(struct xsk_queue *q, u32 cnt) 270 + { 271 + q->cached_cons += cnt; 289 272 } 290 273 291 274 static inline bool xskq_cons_is_full(struct xsk_queue *q) ··· 309 268 310 269 /* Functions for producers */ 311 270 312 - static inline bool xskq_prod_is_full(struct xsk_queue *q) 271 + static inline u32 xskq_prod_nb_free(struct xsk_queue *q, u32 max) 313 272 { 314 273 u32 free_entries = q->nentries - (q->cached_prod - q->cached_cons); 315 274 316 - if (free_entries) 317 - return false; 275 + if (free_entries >= max) 276 + return max; 318 277 319 278 /* Refresh the local tail pointer */ 320 279 q->cached_cons = READ_ONCE(q->ring->consumer); 321 280 free_entries = q->nentries - (q->cached_prod - q->cached_cons); 322 281 323 - return !free_entries; 282 + return free_entries >= max ? 
max : free_entries; 283 + } 284 + 285 + static inline bool xskq_prod_is_full(struct xsk_queue *q) 286 + { 287 + return xskq_prod_nb_free(q, 1) ? false : true; 324 288 } 325 289 326 290 static inline int xskq_prod_reserve(struct xsk_queue *q) ··· 348 302 /* A, matches D */ 349 303 ring->desc[q->cached_prod++ & q->ring_mask] = addr; 350 304 return 0; 305 + } 306 + 307 + static inline u32 xskq_prod_reserve_addr_batch(struct xsk_queue *q, struct xdp_desc *descs, 308 + u32 max) 309 + { 310 + struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring; 311 + u32 nb_entries, i, cached_prod; 312 + 313 + nb_entries = xskq_prod_nb_free(q, max); 314 + 315 + /* A, matches D */ 316 + cached_prod = q->cached_prod; 317 + for (i = 0; i < nb_entries; i++) 318 + ring->desc[cached_prod++ & q->ring_mask] = descs[i].addr; 319 + q->cached_prod = cached_prod; 320 + 321 + return nb_entries; 351 322 } 352 323 353 324 static inline int xskq_prod_reserve_desc(struct xsk_queue *q,