Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git

net: x25: Queue received packets in the drivers instead of per-CPU queues

X.25 Layer 3 (the Packet Layer) expects Layer 2 to provide a reliable
datalink service, such that no packets are reordered or dropped. And
X.25 Layer 2 (the LAPB layer) is indeed designed to provide such a service.

However, this reliability is not preserved when a driver calls "netif_rx"
to deliver received packets to Layer 3, because "netif_rx" puts the
packets into per-CPU queues before they are delivered. If there are
multiple CPUs, the order of the packets may not be preserved. The per-CPU
queues may also drop packets if too many accumulate.

Therefore, we should not call "netif_rx" to let it queue the packets.
Instead, we should use our own queue that won't reorder or drop packets.

This patch changes all X.25 drivers to use their own queues instead of
calling "netif_rx". The patch also documents this requirement in the
"x25-iface" documentation.

Cc: Martin Schiller <ms@dev.tdt.de>
Signed-off-by: Xie He <xie.he.0141@gmail.com>
Signed-off-by: David S. Miller <davem@davemloft.net>

Authored by Xie He, committed by David S. Miller
514e1150 7d42e84e

+79 -63
+8 -55   Documentation/networking/x25-iface.rst
···
     LAPB parameters. To be defined.
 
 
+Requirements for the device driver
+----------------------------------
 
-Possible Problems
-=================
+Packets should not be reordered or dropped when delivering between the
+Packet Layer and the device driver.
 
-(Henner Eisen, 2000-10-28)
-
-The X.25 packet layer protocol depends on a reliable datalink service.
-The LAPB protocol provides such reliable service. But this reliability
-is not preserved by the Linux network device driver interface:
-
-- With Linux 2.4.x (and above) SMP kernels, packet ordering is not
-  preserved. Even if a device driver calls netif_rx(skb1) and later
-  netif_rx(skb2), skb2 might be delivered to the network layer
-  earlier that skb1.
-- Data passed upstream by means of netif_rx() might be dropped by the
-  kernel if the backlog queue is congested.
-
-The X.25 packet layer protocol will detect this and reset the virtual
-call in question. But many upper layer protocols are not designed to
-handle such N-Reset events gracefully. And frequent N-Reset events
-will always degrade performance.
-
-Thus, driver authors should make netif_rx() as reliable as possible:
-
-SMP re-ordering will not occur if the driver's interrupt handler is
-always executed on the same CPU. Thus,
-
-- Driver authors should use irq affinity for the interrupt handler.
-
-The probability of packet loss due to backlog congestion can be
-reduced by the following measures or a combination thereof:
-
-(1) Drivers for kernel versions 2.4.x and above should always check the
-    return value of netif_rx(). If it returns NET_RX_DROP, the
-    driver's LAPB protocol must not confirm reception of the frame
-    to the peer.
-    This will reliably suppress packet loss. The LAPB protocol will
-    automatically cause the peer to re-transmit the dropped packet
-    later.
-    The lapb module interface was modified to support this. Its
-    data_indication() method should now transparently pass the
-    netif_rx() return value to the (lapb module) caller.
-(2) Drivers for kernel versions 2.2.x should always check the global
-    variable netdev_dropping when a new frame is received. The driver
-    should only call netif_rx() if netdev_dropping is zero. Otherwise
-    the driver should not confirm delivery of the frame and drop it.
-    Alternatively, the driver can queue the frame internally and call
-    netif_rx() later when netif_dropping is 0 again. In that case, delivery
-    confirmation should also be deferred such that the internal queue
-    cannot grow to much.
-    This will not reliably avoid packet loss, but the probability
-    of packet loss in netif_rx() path will be significantly reduced.
-(3) Additionally, driver authors might consider to support
-    CONFIG_NET_HW_FLOWCONTROL. This allows the driver to be woken up
-    when a previously congested backlog queue becomes empty again.
-    The driver could uses this for flow-controlling the peer by means
-    of the LAPB protocol's flow-control service.
+To avoid packets from being reordered or dropped when delivering from
+the device driver to the Packet Layer, the device driver should not
+call "netif_rx" to deliver the received packets. Instead, it should
+call "netif_receive_skb_core" from softirq context to deliver them.
+27 -3   drivers/net/wan/hdlc_x25.c
···
 	x25_hdlc_proto settings;
 	bool up;
 	spinlock_t up_lock; /* Protects "up" */
+	struct sk_buff_head rx_queue;
+	struct tasklet_struct rx_tasklet;
 };
 
 static int x25_ioctl(struct net_device *dev, struct ifreq *ifr);
···
 	return hdlc->state;
 }
 
+static void x25_rx_queue_kick(struct tasklet_struct *t)
+{
+	struct x25_state *x25st = from_tasklet(x25st, t, rx_tasklet);
+	struct sk_buff *skb = skb_dequeue(&x25st->rx_queue);
+
+	while (skb) {
+		netif_receive_skb_core(skb);
+		skb = skb_dequeue(&x25st->rx_queue);
+	}
+}
+
 /* These functions are callbacks called by LAPB layer */
 
 static void x25_connect_disconnect(struct net_device *dev, int reason, int code)
 {
+	struct x25_state *x25st = state(dev_to_hdlc(dev));
 	struct sk_buff *skb;
 	unsigned char *ptr;
 
-	if ((skb = dev_alloc_skb(1)) == NULL) {
+	skb = __dev_alloc_skb(1, GFP_ATOMIC | __GFP_NOMEMALLOC);
+	if (!skb) {
 		netdev_err(dev, "out of memory\n");
 		return;
 	}
···
 	*ptr = code;
 
 	skb->protocol = x25_type_trans(skb, dev);
-	netif_rx(skb);
+
+	skb_queue_tail(&x25st->rx_queue, skb);
+	tasklet_schedule(&x25st->rx_tasklet);
 }
 
···
 
 static int x25_data_indication(struct net_device *dev, struct sk_buff *skb)
 {
+	struct x25_state *x25st = state(dev_to_hdlc(dev));
 	unsigned char *ptr;
 
 	if (skb_cow(skb, 1)) {
···
 	*ptr = X25_IFACE_DATA;
 
 	skb->protocol = x25_type_trans(skb, dev);
-	return netif_rx(skb);
+
+	skb_queue_tail(&x25st->rx_queue, skb);
+	tasklet_schedule(&x25st->rx_tasklet);
+	return NET_RX_SUCCESS;
 }
 
···
 	spin_unlock_bh(&x25st->up_lock);
 
 	lapb_unregister(dev);
+	tasklet_kill(&x25st->rx_tasklet);
 }
 
···
 	memcpy(&state(hdlc)->settings, &new_settings, size);
 
 	state(hdlc)->up = false;
 	spin_lock_init(&state(hdlc)->up_lock);
+	skb_queue_head_init(&state(hdlc)->rx_queue);
+	tasklet_setup(&state(hdlc)->rx_tasklet, x25_rx_queue_kick);
 
 	/* There's no header_ops so hard_header_len should be 0. */
 	dev->hard_header_len = 0;
+44 -5   drivers/net/wan/lapbether.c
···
 	struct net_device *axdev; /* lapbeth device (lapb#) */
 	bool up;
 	spinlock_t up_lock; /* Protects "up" */
+	struct sk_buff_head rx_queue;
+	struct napi_struct napi;
 };
 
 static LIST_HEAD(lapbeth_devices);
···
 }
 
 /* ------------------------------------------------------------------------ */
+
+static int lapbeth_napi_poll(struct napi_struct *napi, int budget)
+{
+	struct lapbethdev *lapbeth = container_of(napi, struct lapbethdev,
+						  napi);
+	struct sk_buff *skb;
+	int processed = 0;
+
+	for (; processed < budget; ++processed) {
+		skb = skb_dequeue(&lapbeth->rx_queue);
+		if (!skb)
+			break;
+		netif_receive_skb_core(skb);
+	}
+
+	if (processed < budget)
+		napi_complete(napi);
+
+	return processed;
+}
 
 /*
  * Receive a LAPB frame via an ethernet interface.
···
 
 static int lapbeth_data_indication(struct net_device *dev, struct sk_buff *skb)
 {
+	struct lapbethdev *lapbeth = netdev_priv(dev);
 	unsigned char *ptr;
 
 	if (skb_cow(skb, 1)) {
···
 	*ptr = X25_IFACE_DATA;
 
 	skb->protocol = x25_type_trans(skb, dev);
-	return netif_rx(skb);
+
+	skb_queue_tail(&lapbeth->rx_queue, skb);
+	napi_schedule(&lapbeth->napi);
+	return NET_RX_SUCCESS;
 }
 
 /*
···
 
 static void lapbeth_connected(struct net_device *dev, int reason)
 {
+	struct lapbethdev *lapbeth = netdev_priv(dev);
 	unsigned char *ptr;
-	struct sk_buff *skb = dev_alloc_skb(1);
+	struct sk_buff *skb = __dev_alloc_skb(1, GFP_ATOMIC | __GFP_NOMEMALLOC);
 
 	if (!skb) {
 		pr_err("out of memory\n");
···
 	*ptr = X25_IFACE_CONNECT;
 
 	skb->protocol = x25_type_trans(skb, dev);
-	netif_rx(skb);
+
+	skb_queue_tail(&lapbeth->rx_queue, skb);
+	napi_schedule(&lapbeth->napi);
 }
 
 static void lapbeth_disconnected(struct net_device *dev, int reason)
 {
+	struct lapbethdev *lapbeth = netdev_priv(dev);
 	unsigned char *ptr;
-	struct sk_buff *skb = dev_alloc_skb(1);
+	struct sk_buff *skb = __dev_alloc_skb(1, GFP_ATOMIC | __GFP_NOMEMALLOC);
 
 	if (!skb) {
 		pr_err("out of memory\n");
···
 	*ptr = X25_IFACE_DISCONNECT;
 
 	skb->protocol = x25_type_trans(skb, dev);
-	netif_rx(skb);
+
+	skb_queue_tail(&lapbeth->rx_queue, skb);
+	napi_schedule(&lapbeth->napi);
 }
 
 /*
···
 	struct lapbethdev *lapbeth = netdev_priv(dev);
 	int err;
 
+	napi_enable(&lapbeth->napi);
+
 	if ((err = lapb_register(dev, &lapbeth_callbacks)) != LAPB_OK) {
 		pr_err("lapb_register error: %d\n", err);
 		return -ENODEV;
···
 
 	if ((err = lapb_unregister(dev)) != LAPB_OK)
 		pr_err("lapb_unregister error: %d\n", err);
+
+	napi_disable(&lapbeth->napi);
 
 	return 0;
 }
···
 
 	lapbeth->up = false;
 	spin_lock_init(&lapbeth->up_lock);
+
+	skb_queue_head_init(&lapbeth->rx_queue);
+	netif_napi_add(ndev, &lapbeth->napi, lapbeth_napi_poll, 16);
 
 	rc = -EIO;
 	if (register_netdevice(ndev))