Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git

xsk: support ZC Tx multi-buffer in batch API

Modify xskq_cons_read_desc_batch() so that each processed descriptor
is checked for being an EOP one or not and handled accordingly.

Change the behavior of the mentioned function to break the processing
when stumbling upon an invalid descriptor instead of skipping it.
Furthermore, hand only full packets down to the ZC driver.
With these two guarantees, ZC drivers will not have to take care of an
intermediate state of incomplete frames, which simplifies their
implementations a lot.

Last but not least, stop processing when the count of frags would
exceed the max supported segments on the underlying device.

Signed-off-by: Maciej Fijalkowski <maciej.fijalkowski@intel.com>
Link: https://lore.kernel.org/r/20230719132421.584801-15-maciej.fijalkowski@intel.com
Signed-off-by: Alexei Starovoitov <ast@kernel.org>

authored by Maciej Fijalkowski
committed by Alexei Starovoitov
d5581966 1c9ba9c1

+37 -10
net/xdp/xsk_queue.h
@@ -48,6 +48,11 @@
 	size_t ring_vmalloc_size;
 };
 
+struct parsed_desc {
+	u32 mb;
+	u32 valid;
+};
+
 /* The structure of the shared state of the rings are a simple
  * circular buffer, as outlined in
  * Documentation/core-api/circular-buffers.rst. For the Rx and
@@ -223,30 +218,52 @@
 	q->cached_cons += cnt;
 }
 
-static inline u32 xskq_cons_read_desc_batch(struct xsk_queue *q, struct xsk_buff_pool *pool,
-					    u32 max)
+static inline void parse_desc(struct xsk_queue *q, struct xsk_buff_pool *pool,
+			      struct xdp_desc *desc, struct parsed_desc *parsed)
+{
+	parsed->valid = xskq_cons_is_valid_desc(q, desc, pool);
+	parsed->mb = xp_mb_desc(desc);
+}
+
+static inline
+u32 xskq_cons_read_desc_batch(struct xsk_queue *q, struct xsk_buff_pool *pool,
+			      u32 max)
 {
 	u32 cached_cons = q->cached_cons, nb_entries = 0;
 	struct xdp_desc *descs = pool->tx_descs;
+	u32 total_descs = 0, nr_frags = 0;
 
+	/* track first entry, if stumble upon *any* invalid descriptor, rewind
+	 * current packet that consists of frags and stop the processing
+	 */
 	while (cached_cons != q->cached_prod && nb_entries < max) {
 		struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
 		u32 idx = cached_cons & q->ring_mask;
+		struct parsed_desc parsed;
 
 		descs[nb_entries] = ring->desc[idx];
-		if (unlikely(!xskq_cons_is_valid_desc(q, &descs[nb_entries], pool))) {
-			/* Skip the entry */
-			cached_cons++;
-			continue;
-		}
-
-		nb_entries++;
 		cached_cons++;
+		parse_desc(q, pool, &descs[nb_entries], &parsed);
+		if (unlikely(!parsed.valid))
+			break;
+
+		if (likely(!parsed.mb)) {
+			total_descs += (nr_frags + 1);
+			nr_frags = 0;
+		} else {
+			nr_frags++;
+			if (nr_frags == pool->netdev->xdp_zc_max_segs) {
+				nr_frags = 0;
+				break;
+			}
+		}
+		nb_entries++;
 	}
 
+	cached_cons -= nr_frags;
 	/* Release valid plus any invalid entries */
 	xskq_cons_release_n(q, cached_cons - q->cached_cons);
-	return nb_entries;
+	return total_descs;
 }
 
 /* Functions for consumers */