Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux

xdp: Add proper __rcu annotations to redirect map entries

XDP_REDIRECT works by a three-step process: the bpf_redirect() and
bpf_redirect_map() helpers will lookup the target of the redirect and store
it (along with some other metadata) in a per-CPU struct bpf_redirect_info.
Next, when the program returns the XDP_REDIRECT return code, the driver
will call xdp_do_redirect() which will use the information thus stored to
actually enqueue the frame into a bulk queue structure (that differs
slightly by map type, but shares the same principle). Finally, before
exiting its NAPI poll loop, the driver will call xdp_do_flush(), which will
flush all the different bulk queues, thus completing the redirect.

Pointers to the map entries will be kept around for this whole sequence of
steps, protected by RCU. However, there is no top-level rcu_read_lock() in
the core code; instead drivers add their own rcu_read_lock() around the XDP
portions of the code, but somewhat inconsistently as Martin discovered[0].
Nevertheless, things still work because everything happens inside a single NAPI
poll sequence, which means it's between a pair of calls to
local_bh_disable()/local_bh_enable(). So Paul suggested[1] that we could
document this intention by using rcu_dereference_check() with
rcu_read_lock_bh_held() as a second parameter, thus allowing sparse and
lockdep to verify that everything is done correctly.

This patch does just that: we add an __rcu annotation to the map entry
pointers and remove the various comments explaining the NAPI poll assurance
strewn through devmap.c in favour of a longer explanation in filter.c. The
goal is to have one coherent documentation of the entire flow, and rely on
the RCU annotations as a "standard" way of communicating the flow in the
map code (which can additionally be understood by sparse and lockdep).

The RCU annotation replacements result in a fairly straightforward
replacement where READ_ONCE() becomes rcu_dereference_check(), WRITE_ONCE()
becomes rcu_assign_pointer(), and xchg() and cmpxchg() get wrapped in the
proper constructs to cast the pointer back and forth between __rcu and
__kernel address space (for the benefit of sparse). The one complication is
that xskmap has a few constructions where double-pointers are passed back
and forth; these simply all gain __rcu annotations, and only the final
reference/dereference to the inner-most pointer gets changed.

With this, everything can be run through sparse without eliciting
complaints, and lockdep can verify correctness even without the use of
rcu_read_lock() in the drivers. Subsequent patches will clean these up from
the drivers.

[0] https://lore.kernel.org/bpf/20210415173551.7ma4slcbqeyiba2r@kafai-mbp.dhcp.thefacebook.com/
[1] https://lore.kernel.org/bpf/20210419165837.GA975577@paulmck-ThinkPad-P17-Gen-1/

Signed-off-by: Toke Høiland-Jørgensen <toke@redhat.com>
Signed-off-by: Daniel Borkmann <daniel@iogearbox.net>
Link: https://lore.kernel.org/bpf/20210624160609.292325-6-toke@redhat.com

authored by

Toke Høiland-Jørgensen and committed by
Daniel Borkmann
782347b6 694cea39

+83 -54
+3 -5
include/linux/filter.h
··· 763 763 static __always_inline u32 bpf_prog_run_xdp(const struct bpf_prog *prog, 764 764 struct xdp_buff *xdp) 765 765 { 766 - /* Caller needs to hold rcu_read_lock() (!), otherwise program 767 - * can be released while still running, or map elements could be 768 - * freed early while still having concurrent users. XDP fastpath 769 - * already takes rcu_read_lock() when fetching the program, so 770 - * it's not necessary here anymore. 766 + /* Driver XDP hooks are invoked within a single NAPI poll cycle and thus 767 + * under local_bh_disable(), which provides the needed RCU protection 768 + * for accessing map entries. 771 769 */ 772 770 return __BPF_PROG_RUN(prog, xdp, BPF_DISPATCHER_FUNC(xdp)); 773 771 }
+1 -1
include/net/xdp_sock.h
··· 37 37 struct xsk_map { 38 38 struct bpf_map map; 39 39 spinlock_t lock; /* Synchronize map updates */ 40 - struct xdp_sock *xsk_map[]; 40 + struct xdp_sock __rcu *xsk_map[]; 41 41 }; 42 42 43 43 struct xdp_sock {
+9 -4
kernel/bpf/cpumap.c
··· 74 74 struct bpf_cpu_map { 75 75 struct bpf_map map; 76 76 /* Below members specific for map type */ 77 - struct bpf_cpu_map_entry **cpu_map; 77 + struct bpf_cpu_map_entry __rcu **cpu_map; 78 78 }; 79 79 80 80 static DEFINE_PER_CPU(struct list_head, cpu_map_flush_list); ··· 469 469 { 470 470 struct bpf_cpu_map_entry *old_rcpu; 471 471 472 - old_rcpu = xchg(&cmap->cpu_map[key_cpu], rcpu); 472 + old_rcpu = unrcu_pointer(xchg(&cmap->cpu_map[key_cpu], RCU_INITIALIZER(rcpu))); 473 473 if (old_rcpu) { 474 474 call_rcu(&old_rcpu->rcu, __cpu_map_entry_free); 475 475 INIT_WORK(&old_rcpu->kthread_stop_wq, cpu_map_kthread_stop); ··· 551 551 for (i = 0; i < cmap->map.max_entries; i++) { 552 552 struct bpf_cpu_map_entry *rcpu; 553 553 554 - rcpu = READ_ONCE(cmap->cpu_map[i]); 554 + rcpu = rcu_dereference_raw(cmap->cpu_map[i]); 555 555 if (!rcpu) 556 556 continue; 557 557 ··· 562 562 kfree(cmap); 563 563 } 564 564 565 + /* Elements are kept alive by RCU; either by rcu_read_lock() (from syscall) or 566 + * by local_bh_disable() (from XDP calls inside NAPI). The 567 + * rcu_read_lock_bh_held() below makes lockdep accept both. 568 + */ 565 569 static void *__cpu_map_lookup_elem(struct bpf_map *map, u32 key) 566 570 { 567 571 struct bpf_cpu_map *cmap = container_of(map, struct bpf_cpu_map, map); ··· 574 570 if (key >= map->max_entries) 575 571 return NULL; 576 572 577 - rcpu = READ_ONCE(cmap->cpu_map[key]); 573 + rcpu = rcu_dereference_check(cmap->cpu_map[key], 574 + rcu_read_lock_bh_held()); 578 575 return rcpu; 579 576 } 580 577
+21 -28
kernel/bpf/devmap.c
··· 73 73 74 74 struct bpf_dtab { 75 75 struct bpf_map map; 76 - struct bpf_dtab_netdev **netdev_map; /* DEVMAP type only */ 76 + struct bpf_dtab_netdev __rcu **netdev_map; /* DEVMAP type only */ 77 77 struct list_head list; 78 78 79 79 /* these are only used for DEVMAP_HASH type maps */ ··· 226 226 for (i = 0; i < dtab->map.max_entries; i++) { 227 227 struct bpf_dtab_netdev *dev; 228 228 229 - dev = dtab->netdev_map[i]; 229 + dev = rcu_dereference_raw(dtab->netdev_map[i]); 230 230 if (!dev) 231 231 continue; 232 232 ··· 259 259 return 0; 260 260 } 261 261 262 + /* Elements are kept alive by RCU; either by rcu_read_lock() (from syscall) or 263 + * by local_bh_disable() (from XDP calls inside NAPI). The 264 + * rcu_read_lock_bh_held() below makes lockdep accept both. 265 + */ 262 266 static void *__dev_map_hash_lookup_elem(struct bpf_map *map, u32 key) 263 267 { 264 268 struct bpf_dtab *dtab = container_of(map, struct bpf_dtab, map); ··· 414 410 trace_xdp_devmap_xmit(bq->dev_rx, dev, sent, cnt - sent, err); 415 411 } 416 412 417 - /* __dev_flush is called from xdp_do_flush() which _must_ be signaled 418 - * from the driver before returning from its napi->poll() routine. The poll() 419 - * routine is called either from busy_poll context or net_rx_action signaled 420 - * from NET_RX_SOFTIRQ. Either way the poll routine must complete before the 421 - * net device can be torn down. On devmap tear down we ensure the flush list 422 - * is empty before completing to ensure all flush operations have completed. 423 - * When drivers update the bpf program they may need to ensure any flush ops 424 - * are also complete. Using synchronize_rcu or call_rcu will suffice for this 425 - * because both wait for napi context to exit. 413 + /* __dev_flush is called from xdp_do_flush() which _must_ be signalled from the 414 + * driver before returning from its napi->poll() routine. See the comment above 415 + * xdp_do_flush() in filter.c. 
426 416 */ 427 417 void __dev_flush(void) 428 418 { ··· 431 433 } 432 434 } 433 435 434 - /* rcu_read_lock (from syscall and BPF contexts) ensures that if a delete and/or 435 - * update happens in parallel here a dev_put won't happen until after reading 436 - * the ifindex. 436 + /* Elements are kept alive by RCU; either by rcu_read_lock() (from syscall) or 437 + * by local_bh_disable() (from XDP calls inside NAPI). The 438 + * rcu_read_lock_bh_held() below makes lockdep accept both. 437 439 */ 438 440 static void *__dev_map_lookup_elem(struct bpf_map *map, u32 key) 439 441 { ··· 443 445 if (key >= map->max_entries) 444 446 return NULL; 445 447 446 - obj = READ_ONCE(dtab->netdev_map[key]); 448 + obj = rcu_dereference_check(dtab->netdev_map[key], 449 + rcu_read_lock_bh_held()); 447 450 return obj; 448 451 } 449 452 450 - /* Runs under RCU-read-side, plus in softirq under NAPI protection. 451 - * Thus, safe percpu variable access. 453 + /* Runs in NAPI, i.e., softirq under local_bh_disable(). Thus, safe percpu 454 + * variable access, and map elements stick around. See comment above 455 + * xdp_do_flush() in filter.c. 452 456 */ 453 457 static void bq_enqueue(struct net_device *dev, struct xdp_frame *xdpf, 454 458 struct net_device *dev_rx, struct bpf_prog *xdp_prog) ··· 735 735 if (k >= map->max_entries) 736 736 return -EINVAL; 737 737 738 - /* Use call_rcu() here to ensure any rcu critical sections have 739 - * completed as well as any flush operations because call_rcu 740 - * will wait for preempt-disable region to complete, NAPI in this 741 - * context. And additionally, the driver tear down ensures all 742 - * soft irqs are complete before removing the net device in the 743 - * case of dev_put equals zero. 
744 - */ 745 - old_dev = xchg(&dtab->netdev_map[k], NULL); 738 + old_dev = unrcu_pointer(xchg(&dtab->netdev_map[k], NULL)); 746 739 if (old_dev) 747 740 call_rcu(&old_dev->rcu, __dev_map_entry_free); 748 741 return 0; ··· 844 851 * Remembering the driver side flush operation will happen before the 845 852 * net device is removed. 846 853 */ 847 - old_dev = xchg(&dtab->netdev_map[i], dev); 854 + old_dev = unrcu_pointer(xchg(&dtab->netdev_map[i], RCU_INITIALIZER(dev))); 848 855 if (old_dev) 849 856 call_rcu(&old_dev->rcu, __dev_map_entry_free); 850 857 ··· 1024 1031 for (i = 0; i < dtab->map.max_entries; i++) { 1025 1032 struct bpf_dtab_netdev *dev, *odev; 1026 1033 1027 - dev = READ_ONCE(dtab->netdev_map[i]); 1034 + dev = rcu_dereference(dtab->netdev_map[i]); 1028 1035 if (!dev || netdev != dev->dev) 1029 1036 continue; 1030 - odev = cmpxchg(&dtab->netdev_map[i], dev, NULL); 1037 + odev = unrcu_pointer(cmpxchg(&dtab->netdev_map[i], RCU_INITIALIZER(dev), NULL)); 1031 1038 if (dev == odev) 1032 1039 call_rcu(&dev->rcu, 1033 1040 __dev_map_entry_free);
+28
net/core/filter.c
··· 3897 3897 .arg2_type = ARG_ANYTHING, 3898 3898 }; 3899 3899 3900 + /* XDP_REDIRECT works by a three-step process, implemented in the functions 3901 + * below: 3902 + * 3903 + * 1. The bpf_redirect() and bpf_redirect_map() helpers will lookup the target 3904 + * of the redirect and store it (along with some other metadata) in a per-CPU 3905 + * struct bpf_redirect_info. 3906 + * 3907 + * 2. When the program returns the XDP_REDIRECT return code, the driver will 3908 + * call xdp_do_redirect() which will use the information in struct 3909 + * bpf_redirect_info to actually enqueue the frame into a map type-specific 3910 + * bulk queue structure. 3911 + * 3912 + * 3. Before exiting its NAPI poll loop, the driver will call xdp_do_flush(), 3913 + * which will flush all the different bulk queues, thus completing the 3914 + * redirect. 3915 + * 3916 + * Pointers to the map entries will be kept around for this whole sequence of 3917 + * steps, protected by RCU. However, there is no top-level rcu_read_lock() in 3918 + * the core code; instead, the RCU protection relies on everything happening 3919 + * inside a single NAPI poll sequence, which means it's between a pair of calls 3920 + * to local_bh_disable()/local_bh_enable(). 3921 + * 3922 + * The map entries are marked as __rcu and the map code makes sure to 3923 + * dereference those pointers with rcu_dereference_check() in a way that works 3924 + * for both sections that to hold an rcu_read_lock() and sections that are 3925 + * called from NAPI without a separate rcu_read_lock(). The code below does not 3926 + * use RCU annotations, but relies on those in the map code. 3927 + */ 3900 3928 void xdp_do_flush(void) 3901 3929 { 3902 3930 __dev_flush();
+2 -2
net/xdp/xsk.c
··· 749 749 } 750 750 751 751 static struct xsk_map *xsk_get_map_list_entry(struct xdp_sock *xs, 752 - struct xdp_sock ***map_entry) 752 + struct xdp_sock __rcu ***map_entry) 753 753 { 754 754 struct xsk_map *map = NULL; 755 755 struct xsk_map_node *node; ··· 785 785 * might be updates to the map between 786 786 * xsk_get_map_list_entry() and xsk_map_try_sock_delete(). 787 787 */ 788 - struct xdp_sock **map_entry = NULL; 788 + struct xdp_sock __rcu **map_entry = NULL; 789 789 struct xsk_map *map; 790 790 791 791 while ((map = xsk_get_map_list_entry(xs, &map_entry))) {
+2 -2
net/xdp/xsk.h
··· 31 31 struct xsk_map_node { 32 32 struct list_head node; 33 33 struct xsk_map *map; 34 - struct xdp_sock **map_entry; 34 + struct xdp_sock __rcu **map_entry; 35 35 }; 36 36 37 37 static inline struct xdp_sock *xdp_sk(struct sock *sk) ··· 40 40 } 41 41 42 42 void xsk_map_try_sock_delete(struct xsk_map *map, struct xdp_sock *xs, 43 - struct xdp_sock **map_entry); 43 + struct xdp_sock __rcu **map_entry); 44 44 void xsk_clear_pool_at_qid(struct net_device *dev, u16 queue_id); 45 45 int xsk_reg_pool_at_qid(struct net_device *dev, struct xsk_buff_pool *pool, 46 46 u16 queue_id);
+17 -12
net/xdp/xskmap.c
··· 12 12 #include "xsk.h" 13 13 14 14 static struct xsk_map_node *xsk_map_node_alloc(struct xsk_map *map, 15 - struct xdp_sock **map_entry) 15 + struct xdp_sock __rcu **map_entry) 16 16 { 17 17 struct xsk_map_node *node; 18 18 ··· 42 42 } 43 43 44 44 static void xsk_map_sock_delete(struct xdp_sock *xs, 45 - struct xdp_sock **map_entry) 45 + struct xdp_sock __rcu **map_entry) 46 46 { 47 47 struct xsk_map_node *n, *tmp; 48 48 ··· 124 124 return insn - insn_buf; 125 125 } 126 126 127 + /* Elements are kept alive by RCU; either by rcu_read_lock() (from syscall) or 128 + * by local_bh_disable() (from XDP calls inside NAPI). The 129 + * rcu_read_lock_bh_held() below makes lockdep accept both. 130 + */ 127 131 static void *__xsk_map_lookup_elem(struct bpf_map *map, u32 key) 128 132 { 129 133 struct xsk_map *m = container_of(map, struct xsk_map, map); ··· 135 131 if (key >= map->max_entries) 136 132 return NULL; 137 133 138 - return READ_ONCE(m->xsk_map[key]); 134 + return rcu_dereference_check(m->xsk_map[key], rcu_read_lock_bh_held()); 139 135 } 140 136 141 137 static void *xsk_map_lookup_elem(struct bpf_map *map, void *key) 142 138 { 143 - WARN_ON_ONCE(!rcu_read_lock_held()); 144 139 return __xsk_map_lookup_elem(map, *(u32 *)key); 145 140 } 146 141 ··· 152 149 u64 map_flags) 153 150 { 154 151 struct xsk_map *m = container_of(map, struct xsk_map, map); 155 - struct xdp_sock *xs, *old_xs, **map_entry; 152 + struct xdp_sock __rcu **map_entry; 153 + struct xdp_sock *xs, *old_xs; 156 154 u32 i = *(u32 *)key, fd = *(u32 *)value; 157 155 struct xsk_map_node *node; 158 156 struct socket *sock; ··· 183 179 } 184 180 185 181 spin_lock_bh(&m->lock); 186 - old_xs = READ_ONCE(*map_entry); 182 + old_xs = rcu_dereference_protected(*map_entry, lockdep_is_held(&m->lock)); 187 183 if (old_xs == xs) { 188 184 err = 0; 189 185 goto out; ··· 195 191 goto out; 196 192 } 197 193 xsk_map_sock_add(xs, node); 198 - WRITE_ONCE(*map_entry, xs); 194 + rcu_assign_pointer(*map_entry, xs); 199 195 if 
(old_xs) 200 196 xsk_map_sock_delete(old_xs, map_entry); 201 197 spin_unlock_bh(&m->lock); ··· 212 208 static int xsk_map_delete_elem(struct bpf_map *map, void *key) 213 209 { 214 210 struct xsk_map *m = container_of(map, struct xsk_map, map); 215 - struct xdp_sock *old_xs, **map_entry; 211 + struct xdp_sock __rcu **map_entry; 212 + struct xdp_sock *old_xs; 216 213 int k = *(u32 *)key; 217 214 218 215 if (k >= map->max_entries) ··· 221 216 222 217 spin_lock_bh(&m->lock); 223 218 map_entry = &m->xsk_map[k]; 224 - old_xs = xchg(map_entry, NULL); 219 + old_xs = unrcu_pointer(xchg(map_entry, NULL)); 225 220 if (old_xs) 226 221 xsk_map_sock_delete(old_xs, map_entry); 227 222 spin_unlock_bh(&m->lock); ··· 236 231 } 237 232 238 233 void xsk_map_try_sock_delete(struct xsk_map *map, struct xdp_sock *xs, 239 - struct xdp_sock **map_entry) 234 + struct xdp_sock __rcu **map_entry) 240 235 { 241 236 spin_lock_bh(&map->lock); 242 - if (READ_ONCE(*map_entry) == xs) { 243 - WRITE_ONCE(*map_entry, NULL); 237 + if (rcu_access_pointer(*map_entry) == xs) { 238 + rcu_assign_pointer(*map_entry, NULL); 244 239 xsk_map_sock_delete(xs, map_entry); 245 240 } 246 241 spin_unlock_bh(&map->lock);