Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux

Merge branch 'tipc-link-improvements'

Jon Maloy says:

====================
tipc: some link level code improvements

Extensive testing has revealed some weaknesses and non-optimal solutions
in the link level code.

This commit series addresses those issues.
====================

Signed-off-by: David S. Miller <davem@davemloft.net>

+172 -120
+3 -5
net/tipc/bearer.c
··· 362 362 b_ptr->media->disable_media(b_ptr); 363 363 364 364 tipc_node_delete_links(net, b_ptr->identity); 365 + RCU_INIT_POINTER(b_ptr->media_ptr, NULL); 365 366 if (b_ptr->link_req) 366 367 tipc_disc_delete(b_ptr->link_req); 367 368 ··· 400 399 401 400 /* tipc_disable_l2_media - detach TIPC bearer from an L2 interface 402 401 * 403 - * Mark L2 bearer as inactive so that incoming buffers are thrown away, 404 - * then get worker thread to complete bearer cleanup. (Can't do cleanup 405 - * here because cleanup code needs to sleep and caller holds spinlocks.) 402 + * Mark L2 bearer as inactive so that incoming buffers are thrown away 406 403 */ 407 404 void tipc_disable_l2_media(struct tipc_bearer *b) 408 405 { 409 406 struct net_device *dev; 410 407 411 408 dev = (struct net_device *)rtnl_dereference(b->media_ptr); 412 - RCU_INIT_POINTER(b->media_ptr, NULL); 413 409 RCU_INIT_POINTER(dev->tipc_ptr, NULL); 414 410 synchronize_net(); 415 411 dev_put(dev); ··· 552 554 case NETDEV_CHANGE: 553 555 if (netif_carrier_ok(dev)) 554 556 break; 555 - case NETDEV_DOWN: 557 + case NETDEV_GOING_DOWN: 556 558 case NETDEV_CHANGEMTU: 557 559 tipc_reset_bearer(net, b_ptr); 558 560 break;
+102 -68
net/tipc/link.c
··· 120 120 return link_is_up(l); 121 121 } 122 122 123 + bool tipc_link_peer_is_down(struct tipc_link *l) 124 + { 125 + return l->state == LINK_PEER_RESET; 126 + } 127 + 123 128 bool tipc_link_is_reset(struct tipc_link *l) 124 129 { 125 130 return l->state & (LINK_RESET | LINK_FAILINGOVER | LINK_ESTABLISHING); 131 + } 132 + 133 + bool tipc_link_is_establishing(struct tipc_link *l) 134 + { 135 + return l->state == LINK_ESTABLISHING; 126 136 } 127 137 128 138 bool tipc_link_is_synching(struct tipc_link *l) ··· 331 321 switch (evt) { 332 322 case LINK_ESTABLISH_EVT: 333 323 l->state = LINK_ESTABLISHED; 334 - rc |= TIPC_LINK_UP_EVT; 335 324 break; 336 325 case LINK_FAILOVER_BEGIN_EVT: 337 326 l->state = LINK_FAILINGOVER; 338 327 break; 339 - case LINK_PEER_RESET_EVT: 340 328 case LINK_RESET_EVT: 329 + l->state = LINK_RESET; 330 + break; 341 331 case LINK_FAILURE_EVT: 332 + case LINK_PEER_RESET_EVT: 342 333 case LINK_SYNCH_BEGIN_EVT: 343 334 case LINK_FAILOVER_END_EVT: 344 335 break; ··· 589 578 590 579 void tipc_link_reset(struct tipc_link *l) 591 580 { 592 - tipc_link_fsm_evt(l, LINK_RESET_EVT); 593 - 594 581 /* Link is down, accept any session */ 595 582 l->peer_session = WILDCARD_SESSION; 596 583 ··· 962 953 case TIPC_HIGH_IMPORTANCE: 963 954 case TIPC_CRITICAL_IMPORTANCE: 964 955 case CONN_MANAGER: 965 - __skb_queue_tail(inputq, skb); 956 + skb_queue_tail(inputq, skb); 966 957 return true; 967 958 case NAME_DISTRIBUTOR: 968 959 node->bclink.recv_permitted = true; ··· 991 982 struct tipc_msg *hdr = buf_msg(skb); 992 983 struct sk_buff **reasm_skb = &l->reasm_buf; 993 984 struct sk_buff *iskb; 985 + struct sk_buff_head tmpq; 994 986 int usr = msg_user(hdr); 995 987 int rc = 0; 996 988 int pos = 0; ··· 1016 1006 } 1017 1007 1018 1008 if (usr == MSG_BUNDLER) { 1009 + skb_queue_head_init(&tmpq); 1019 1010 l->stats.recv_bundles++; 1020 1011 l->stats.recv_bundled += msg_msgcnt(hdr); 1021 1012 while (tipc_msg_extract(skb, &iskb, &pos)) 1022 - tipc_data_input(l, iskb, inputq); 1013 + tipc_data_input(l, iskb, &tmpq); 1014 + tipc_skb_queue_splice_tail(&tmpq, inputq); 1023 1015 return 0; 1024 1016 } else if (usr == MSG_FRAGMENTER) { 1025 1017 l->stats.recv_fragments++; ··· 1056 1044 return released; 1057 1045 } 1058 1046 1047 + /* tipc_link_build_ack_msg: prepare link acknowledge message for transmission 1048 + */ 1049 + void tipc_link_build_ack_msg(struct tipc_link *l, struct sk_buff_head *xmitq) 1050 + { 1051 + l->rcv_unacked = 0; 1052 + l->stats.sent_acks++; 1053 + tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, 0, xmitq); 1054 + } 1055 + 1056 + /* tipc_link_build_reset_msg: prepare link RESET or ACTIVATE message 1057 + */ 1058 + void tipc_link_build_reset_msg(struct tipc_link *l, struct sk_buff_head *xmitq) 1059 + { 1060 + int mtyp = RESET_MSG; 1061 + 1062 + if (l->state == LINK_ESTABLISHING) 1063 + mtyp = ACTIVATE_MSG; 1064 + 1065 + tipc_link_build_proto_msg(l, mtyp, 0, 0, 0, 0, xmitq); 1066 + } 1067 + 1068 + /* tipc_link_build_nack_msg: prepare link nack message for transmission 1069 + */ 1070 + static void tipc_link_build_nack_msg(struct tipc_link *l, 1071 + struct sk_buff_head *xmitq) 1072 + { 1073 + u32 def_cnt = ++l->stats.deferred_recv; 1074 + 1075 + if ((skb_queue_len(&l->deferdq) == 1) || !(def_cnt % TIPC_NACK_INTV)) 1076 + tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, 0, xmitq); 1077 + } 1078 + 1059 1079 /* tipc_link_rcv - process TIPC packets/messages arriving from off-node 1060 - * @link: the link that should handle the message 1080 + * @l: the link that should handle the message 1061 1081 * @skb: TIPC packet 1062 1082 * @xmitq: queue to place packets to be sent after this call 1063 1083 */ 1064 1084 int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb, 1065 1085 struct sk_buff_head *xmitq) 1066 1086 { 1067 - struct sk_buff_head *arrvq = &l->deferdq; 1068 - struct sk_buff_head tmpq; 1087 + struct sk_buff_head *defq = &l->deferdq; 1069 1088 struct tipc_msg *hdr; 1070 - u16 seqno, rcv_nxt; 1089 + u16 seqno, rcv_nxt, win_lim; 1071 1090 int rc = 0; 1072 1091 1073 - __skb_queue_head_init(&tmpq); 1074 - 1075 - if (unlikely(!__tipc_skb_queue_sorted(arrvq, skb))) { 1076 - if (!(skb_queue_len(arrvq) % TIPC_NACK_INTV)) 1077 - tipc_link_build_proto_msg(l, STATE_MSG, 0, 1078 - 0, 0, 0, xmitq); 1079 - return rc; 1080 - } 1081 - 1082 - while ((skb = skb_peek(arrvq))) { 1092 + do { 1083 1093 hdr = buf_msg(skb); 1094 + seqno = msg_seqno(hdr); 1095 + rcv_nxt = l->rcv_nxt; 1096 + win_lim = rcv_nxt + TIPC_MAX_LINK_WIN; 1084 1097 1085 1098 /* Verify and update link state */ 1086 - if (unlikely(msg_user(hdr) == LINK_PROTOCOL)) { 1087 - __skb_dequeue(arrvq); 1088 - rc = tipc_link_proto_rcv(l, skb, xmitq); 1089 - continue; 1090 - } 1099 + if (unlikely(msg_user(hdr) == LINK_PROTOCOL)) 1100 + return tipc_link_proto_rcv(l, skb, xmitq); 1091 1101 1092 1102 if (unlikely(!link_is_up(l))) { 1093 - rc = tipc_link_fsm_evt(l, LINK_ESTABLISH_EVT); 1094 - if (!link_is_up(l)) { 1095 - kfree_skb(__skb_dequeue(arrvq)); 1096 - goto exit; 1097 - } 1103 + if (l->state == LINK_ESTABLISHING) 1104 + rc = TIPC_LINK_UP_EVT; 1105 + goto drop; 1098 1106 } 1099 1107 1108 + /* Don't send probe at next timeout expiration */ 1100 1109 l->silent_intv_cnt = 0; 1110 + 1111 + /* Drop if outside receive window */ 1112 + if (unlikely(less(seqno, rcv_nxt) || more(seqno, win_lim))) { 1113 + l->stats.duplicates++; 1114 + goto drop; 1115 + } 1101 1116 1102 1117 /* Forward queues and wake up waiting users */ 1103 1118 if (likely(tipc_link_release_pkts(l, msg_ack(hdr)))) { ··· 1133 1094 link_prepare_wakeup(l); 1134 1095 } 1135 1096 1136 - /* Defer reception if there is a gap in the sequence */ 1137 - seqno = msg_seqno(hdr); 1138 - rcv_nxt = l->rcv_nxt; 1139 - if (unlikely(less(rcv_nxt, seqno))) { 1140 - l->stats.deferred_recv++; 1141 - goto exit; 1097 + /* Defer delivery if sequence gap */ 1098 + if (unlikely(seqno != rcv_nxt)) { 1099 + __tipc_skb_queue_sorted(defq, seqno, skb); 1100 + tipc_link_build_nack_msg(l, xmitq); 1101 + break; 1142 1102 } 1143 1103 1144 - __skb_dequeue(arrvq); 1145 - 1146 - /* Drop if packet already received */ 1147 - if (unlikely(more(rcv_nxt, seqno))) { 1148 - l->stats.duplicates++; 1149 - kfree_skb(skb); 1150 - goto exit; 1151 - } 1152 - 1153 - /* Packet can be delivered */ 1104 + /* Deliver packet */ 1154 1105 l->rcv_nxt++; 1155 1106 l->stats.recv_info++; 1156 - if (unlikely(!tipc_data_input(l, skb, &tmpq))) 1157 - rc = tipc_link_input(l, skb, &tmpq); 1107 + if (!tipc_data_input(l, skb, l->inputq)) 1108 + rc = tipc_link_input(l, skb, l->inputq); 1109 + if (unlikely(rc)) 1110 + break; 1111 + if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN)) 1112 + tipc_link_build_ack_msg(l, xmitq); 1158 1113 1159 - /* Ack at regular intervals */ 1160 - if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN)) { 1161 - l->rcv_unacked = 0; 1162 - l->stats.sent_acks++; 1163 - tipc_link_build_proto_msg(l, STATE_MSG, 1164 - 0, 0, 0, 0, xmitq); 1165 - } 1166 - } 1167 - exit: 1168 - tipc_skb_queue_splice_tail(&tmpq, l->inputq); 1114 + } while ((skb = __skb_dequeue(defq))); 1115 + 1116 + return rc; 1117 + drop: 1118 + kfree_skb(skb); 1169 1119 return rc; 1170 1120 } 1171 1121 ··· 1278 1250 } 1279 1251 1280 1252 /* tipc_link_tnl_prepare(): prepare and return a list of tunnel packets 1281 - * with contents of the link's tranmsit and backlog queues. 1253 + * with contents of the link's transmit and backlog queues. 1282 1254 */ 1283 1255 void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl, 1284 1256 int mtyp, struct sk_buff_head *xmitq) ··· 1359 1331 u16 peers_tol = msg_link_tolerance(hdr); 1360 1332 u16 peers_prio = msg_linkprio(hdr); 1361 1333 u16 rcv_nxt = l->rcv_nxt; 1334 + int mtyp = msg_type(hdr); 1362 1335 char *if_name; 1363 1336 int rc = 0; 1364 1337 ··· 1369 1340 if (link_own_addr(l) > msg_prevnode(hdr)) 1370 1341 l->net_plane = msg_net_plane(hdr); 1371 1342 1372 - switch (msg_type(hdr)) { 1343 + switch (mtyp) { 1373 1344 case RESET_MSG: 1374 1345 1375 1346 /* Ignore duplicate RESET with old session number */ ··· 1396 1367 if (in_range(peers_prio, l->priority + 1, TIPC_MAX_LINK_PRI)) 1397 1368 l->priority = peers_prio; 1398 1369 1399 - if (msg_type(hdr) == RESET_MSG) { 1400 - rc |= tipc_link_fsm_evt(l, LINK_PEER_RESET_EVT); 1401 - } else if (!link_is_up(l)) { 1402 - tipc_link_fsm_evt(l, LINK_PEER_RESET_EVT); 1403 - rc |= tipc_link_fsm_evt(l, LINK_ESTABLISH_EVT); 1404 - } 1370 + /* ACTIVATE_MSG serves as PEER_RESET if link is already down */ 1371 + if ((mtyp == RESET_MSG) || !link_is_up(l)) 1372 + rc = tipc_link_fsm_evt(l, LINK_PEER_RESET_EVT); 1373 + 1374 + /* ACTIVATE_MSG takes up link if it was already locally reset */ 1375 + if ((mtyp == ACTIVATE_MSG) && (l->state == LINK_ESTABLISHING)) 1376 + rc = TIPC_LINK_UP_EVT; 1377 + 1405 1378 l->peer_session = msg_session(hdr); 1406 1379 l->peer_bearer_id = msg_bearer_id(hdr); 1407 1380 if (l->mtu > msg_max_pkt(hdr)) ··· 1420 1389 l->stats.recv_states++; 1421 1390 if (msg_probe(hdr)) 1422 1391 l->stats.recv_probes++; 1423 - rc = tipc_link_fsm_evt(l, LINK_ESTABLISH_EVT); 1424 - if (!link_is_up(l)) 1392 + 1393 + if (!link_is_up(l)) { 1394 + if (l->state == LINK_ESTABLISHING) 1395 + rc = TIPC_LINK_UP_EVT; 1425 1396 break; 1397 + } 1426 1398 1427 1399 /* Send NACK if peer has sent pkts we haven't received yet */ 1428 1400 if (more(peers_snd_nxt, rcv_nxt) && !tipc_link_is_synching(l))
+4 -1
net/tipc/link.h
··· 185 185 } backlog[5]; 186 186 u16 snd_nxt; 187 187 u16 last_retransm; 188 - u32 window; 188 + u16 window; 189 189 u32 stale_count; 190 190 191 191 /* Reception */ ··· 213 213 int mtyp, struct sk_buff_head *xmitq); 214 214 void tipc_link_build_bcast_sync_msg(struct tipc_link *l, 215 215 struct sk_buff_head *xmitq); 216 + void tipc_link_build_reset_msg(struct tipc_link *l, struct sk_buff_head *xmitq); 216 217 int tipc_link_fsm_evt(struct tipc_link *l, int evt); 217 218 void tipc_link_reset_fragments(struct tipc_link *l_ptr); 218 219 bool tipc_link_is_up(struct tipc_link *l); 220 + bool tipc_link_peer_is_down(struct tipc_link *l); 219 221 bool tipc_link_is_reset(struct tipc_link *l); 222 + bool tipc_link_is_establishing(struct tipc_link *l); 220 223 bool tipc_link_is_synching(struct tipc_link *l); 221 224 bool tipc_link_is_failingover(struct tipc_link *l); 222 225 bool tipc_link_is_blocked(struct tipc_link *l);
+31
net/tipc/msg.c
··· 590 590 kfree_skb(head); 591 591 return NULL; 592 592 } 593 + 594 + /* tipc_skb_queue_sorted(); sort pkt into list according to sequence number 595 + * @list: list to be appended to 596 + * @seqno: sequence number of buffer to add 597 + * @skb: buffer to add 598 + */ 599 + void __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 seqno, 600 + struct sk_buff *skb) 601 + { 602 + struct sk_buff *_skb, *tmp; 603 + 604 + if (skb_queue_empty(list) || less(seqno, buf_seqno(skb_peek(list)))) { 605 + __skb_queue_head(list, skb); 606 + return; 607 + } 608 + 609 + if (more(seqno, buf_seqno(skb_peek_tail(list)))) { 610 + __skb_queue_tail(list, skb); 611 + return; 612 + } 613 + 614 + skb_queue_walk_safe(list, _skb, tmp) { 615 + if (more(seqno, buf_seqno(_skb))) 616 + continue; 617 + if (seqno == buf_seqno(_skb)) 618 + break; 619 + __skb_queue_before(list, _skb, skb); 620 + return; 621 + } 622 + kfree_skb(skb); 623 + }
+2 -32
net/tipc/msg.h
··· 790 790 int offset, int dsz, int mtu, struct sk_buff_head *list); 791 791 bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err); 792 792 struct sk_buff *tipc_msg_reassemble(struct sk_buff_head *list); 793 + void __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 seqno, 794 + struct sk_buff *skb); 793 795 794 796 static inline u16 buf_seqno(struct sk_buff *skb) 795 797 { ··· 862 860 } 863 861 spin_unlock_bh(&list->lock); 864 862 return skb; 865 - } 866 - 867 - /* tipc_skb_queue_sorted(); sort pkt into list according to sequence number 868 - * @list: list to be appended to 869 - * @skb: buffer to add 870 - * Returns true if queue should treated further, otherwise false 871 - */ 872 - static inline bool __tipc_skb_queue_sorted(struct sk_buff_head *list, 873 - struct sk_buff *skb) 874 - { 875 - struct sk_buff *_skb, *tmp; 876 - struct tipc_msg *hdr = buf_msg(skb); 877 - u16 seqno = msg_seqno(hdr); 878 - 879 - if (skb_queue_empty(list) || (msg_user(hdr) == LINK_PROTOCOL)) { 880 - __skb_queue_head(list, skb); 881 - return true; 882 - } 883 - if (likely(less(seqno, buf_seqno(skb_peek(list))))) { 884 - __skb_queue_head(list, skb); 885 - return true; 886 - } 887 - if (!more(seqno, buf_seqno(skb_peek_tail(list)))) { 888 - skb_queue_walk_safe(list, _skb, tmp) { 889 - if (likely(less(seqno, buf_seqno(_skb)))) { 890 - __skb_queue_before(list, _skb, skb); 891 - return true; 892 - } 893 - } 894 - } 895 - __skb_queue_tail(list, skb); 896 - return false; 897 863 } 898 864 899 865 /* tipc_skb_queue_splice_tail - append an skb list to lock protected list
+30 -13
net/tipc/node.c
··· 317 317 struct tipc_link *ol = node_active_link(n, 0); 318 318 struct tipc_link *nl = n->links[bearer_id].link; 319 319 320 - if (!nl || !tipc_link_is_up(nl)) 320 + if (!nl) 321 + return; 322 + 323 + tipc_link_fsm_evt(nl, LINK_ESTABLISH_EVT); 324 + if (!tipc_link_is_up(nl)) 321 325 return; 322 326 323 327 n->working_links++; ··· 420 416 } 421 417 422 418 if (!tipc_node_is_up(n)) { 419 + if (tipc_link_peer_is_down(l)) 420 + tipc_node_fsm_evt(n, PEER_LOST_CONTACT_EVT); 421 + tipc_node_fsm_evt(n, SELF_LOST_CONTACT_EVT); 422 + tipc_link_fsm_evt(l, LINK_RESET_EVT); 423 423 tipc_link_reset(l); 424 + tipc_link_build_reset_msg(l, xmitq); 425 + *maddr = &n->links[*bearer_id].maddr; 424 426 node_lost_contact(n, &le->inputq); 425 427 return; 426 428 } ··· 438 428 n->sync_point = tnl->rcv_nxt + (U16_MAX / 2 - 1); 439 429 tipc_link_tnl_prepare(l, tnl, FAILOVER_MSG, xmitq); 440 430 tipc_link_reset(l); 431 + tipc_link_fsm_evt(l, LINK_RESET_EVT); 441 432 tipc_link_fsm_evt(l, LINK_FAILOVER_BEGIN_EVT); 442 433 tipc_node_fsm_evt(n, NODE_FAILOVER_BEGIN_EVT); 443 434 *maddr = &n->links[tnl->bearer_id].maddr; ··· 448 437 static void tipc_node_link_down(struct tipc_node *n, int bearer_id, bool delete) 449 438 { 450 439 struct tipc_link_entry *le = &n->links[bearer_id]; 440 + struct tipc_link *l = le->link; 451 441 struct tipc_media_addr *maddr; 452 442 struct sk_buff_head xmitq; 443 + 444 + if (!l) 445 + return; 453 446 454 447 __skb_queue_head_init(&xmitq); 455 448 456 449 tipc_node_lock(n); 457 - __tipc_node_link_down(n, &bearer_id, &xmitq, &maddr); 458 - if (delete && le->link) { 459 - kfree(le->link); 460 - le->link = NULL; 461 - n->link_cnt--; 450 + if (!tipc_link_is_establishing(l)) { 451 + __tipc_node_link_down(n, &bearer_id, &xmitq, &maddr); 452 + if (delete) { 453 + kfree(l); 454 + le->link = NULL; 455 + n->link_cnt--; 456 + } 457 + } else { 458 + /* Defuse pending tipc_node_link_up() */ 459 + tipc_link_fsm_evt(l, LINK_RESET_EVT); 462 460 } 463 461 tipc_node_unlock(n); 464 - 465 462 tipc_bearer_xmit(n->net, bearer_id, &xmitq, maddr); 466 463 tipc_sk_rcv(n->net, &le->inputq); 467 464 } ··· 586 567 goto exit; 587 568 } 588 569 tipc_link_reset(l); 570 + tipc_link_fsm_evt(l, LINK_RESET_EVT); 589 571 if (n->state == NODE_FAILINGOVER) 590 572 tipc_link_fsm_evt(l, LINK_FAILOVER_BEGIN_EVT); 591 573 le->link = l; ··· 599 579 memcpy(&le->maddr, maddr, sizeof(*maddr)); 600 580 exit: 601 581 tipc_node_unlock(n); 602 - if (reset) 582 + if (reset && !tipc_link_is_reset(l)) 603 583 tipc_node_link_down(n, b->identity, false); 604 584 tipc_node_put(n); 605 585 } ··· 706 686 break; 707 687 case SELF_ESTABL_CONTACT_EVT: 708 688 case PEER_LOST_CONTACT_EVT: 709 - break; 710 689 case NODE_SYNCH_END_EVT: 711 - case NODE_SYNCH_BEGIN_EVT: 712 690 case NODE_FAILOVER_BEGIN_EVT: 691 + break; 692 + case NODE_SYNCH_BEGIN_EVT: 713 693 case NODE_FAILOVER_END_EVT: 714 694 default: 715 695 goto illegal_evt; ··· 868 848 if (l) 869 849 tipc_link_fsm_evt(l, LINK_FAILOVER_END_EVT); 870 850 } 871 - 872 - /* Prevent re-contact with node until cleanup is done */ 873 - tipc_node_fsm_evt(n_ptr, SELF_LOST_CONTACT_EVT); 874 851 875 852 /* Notify publications from this node */ 876 853 n_ptr->action_flags |= TIPC_NOTIFY_NODE_DOWN;
-1
net/tipc/udp_media.c
··· 425 425 } 426 426 if (ub->ubsock) 427 427 sock_set_flag(ub->ubsock->sk, SOCK_DEAD); 428 - RCU_INIT_POINTER(b->media_ptr, NULL); 429 428 RCU_INIT_POINTER(ub->bearer, NULL); 430 429 431 430 /* sock_release need to be done outside of rtnl lock */