Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux

tipc: use temporary, non-protected skb queue for bundle reception

Currently, when we extract small messages from a message bundle, or
when many messages have accumulated in the link arrival queue, those
messages are added one by one to the lock-protected link input queue.
This may increase contention with the reader of that queue in the
function tipc_sk_rcv().

This commit introduces a temporary, unprotected input queue in
tipc_link_rcv() for such cases. Only when the arrival queue has been
emptied, and the function is ready to return, does it splice the whole
temporary queue into the real input queue.

Tested-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>

authored by

Jon Paul Maloy and committed by
David S. Miller
9073fb8b 23d8335d

+19 -15
+19 -15
net/tipc/link.c
··· 111 111 static void link_reset_statistics(struct tipc_link *l_ptr); 112 112 static void link_print(struct tipc_link *l_ptr, const char *str); 113 113 static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf); 114 - static int tipc_link_input(struct tipc_link *l, struct sk_buff *skb); 115 - static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb); 116 114 117 115 /* 118 116 * Simple non-static link routines (i.e. referenced outside this file) ··· 958 960 * Consumes buffer if message is of right type 959 961 * Node lock must be held 960 962 */ 961 - static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb) 963 + static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb, 964 + struct sk_buff_head *inputq) 962 965 { 963 966 struct tipc_node *node = link->owner; 964 - struct tipc_msg *msg = buf_msg(skb); 965 967 966 - switch (msg_user(msg)) { 968 + switch (msg_user(buf_msg(skb))) { 967 969 case TIPC_LOW_IMPORTANCE: 968 970 case TIPC_MEDIUM_IMPORTANCE: 969 971 case TIPC_HIGH_IMPORTANCE: 970 972 case TIPC_CRITICAL_IMPORTANCE: 971 973 case CONN_MANAGER: 972 - skb_queue_tail(link->inputq, skb); 974 + __skb_queue_tail(inputq, skb); 973 975 return true; 974 976 case NAME_DISTRIBUTOR: 975 977 node->bclink.recv_permitted = true; ··· 991 993 * 992 994 * Consumes buffer 993 995 */ 994 - static int tipc_link_input(struct tipc_link *l, struct sk_buff *skb) 996 + static int tipc_link_input(struct tipc_link *l, struct sk_buff *skb, 997 + struct sk_buff_head *inputq) 995 998 { 996 999 struct tipc_node *node = l->owner; 997 1000 struct tipc_msg *hdr = buf_msg(skb); ··· 1015 1016 hdr = buf_msg(skb); 1016 1017 if (less(msg_seqno(hdr), l->drop_point)) 1017 1018 goto drop; 1018 - if (tipc_data_input(l, skb)) 1019 + if (tipc_data_input(l, skb, inputq)) 1019 1020 return rc; 1020 1021 usr = msg_user(hdr); 1021 1022 reasm_skb = &l->failover_reasm_skb; ··· 1025 1026 l->stats.recv_bundles++; 1026 1027 l->stats.recv_bundled += 
msg_msgcnt(hdr); 1027 1028 while (tipc_msg_extract(skb, &iskb, &pos)) 1028 - tipc_data_input(l, iskb); 1029 + tipc_data_input(l, iskb, inputq); 1029 1030 return 0; 1030 1031 } else if (usr == MSG_FRAGMENTER) { 1031 1032 l->stats.recv_fragments++; 1032 1033 if (tipc_buf_append(reasm_skb, &skb)) { 1033 1034 l->stats.recv_fragmented++; 1034 - tipc_data_input(l, skb); 1035 + tipc_data_input(l, skb, inputq); 1035 1036 } else if (!*reasm_skb) { 1036 1037 return tipc_link_fsm_evt(l, LINK_FAILURE_EVT); 1037 1038 } ··· 1069 1070 struct sk_buff_head *xmitq) 1070 1071 { 1071 1072 struct sk_buff_head *arrvq = &l->deferdq; 1073 + struct sk_buff_head tmpq; 1072 1074 struct tipc_msg *hdr; 1073 1075 u16 seqno, rcv_nxt; 1074 1076 int rc = 0; 1077 + 1078 + __skb_queue_head_init(&tmpq); 1075 1079 1076 1080 if (unlikely(!__tipc_skb_queue_sorted(arrvq, skb))) { 1077 1081 if (!(skb_queue_len(arrvq) % TIPC_NACK_INTV)) ··· 1097 1095 rc = tipc_link_fsm_evt(l, LINK_ESTABLISH_EVT); 1098 1096 if (!link_is_up(l)) { 1099 1097 kfree_skb(__skb_dequeue(arrvq)); 1100 - return rc; 1098 + goto exit; 1101 1099 } 1102 1100 } 1103 1101 ··· 1115 1113 rcv_nxt = l->rcv_nxt; 1116 1114 if (unlikely(less(rcv_nxt, seqno))) { 1117 1115 l->stats.deferred_recv++; 1118 - return rc; 1116 + goto exit; 1119 1117 } 1120 1118 1121 1119 __skb_dequeue(arrvq); ··· 1124 1122 if (unlikely(more(rcv_nxt, seqno))) { 1125 1123 l->stats.duplicates++; 1126 1124 kfree_skb(skb); 1127 - return rc; 1125 + goto exit; 1128 1126 } 1129 1127 1130 1128 /* Packet can be delivered */ 1131 1129 l->rcv_nxt++; 1132 1130 l->stats.recv_info++; 1133 - if (unlikely(!tipc_data_input(l, skb))) 1134 - rc = tipc_link_input(l, skb); 1131 + if (unlikely(!tipc_data_input(l, skb, &tmpq))) 1132 + rc = tipc_link_input(l, skb, &tmpq); 1135 1133 1136 1134 /* Ack at regular intervals */ 1137 1135 if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN)) { ··· 1141 1139 0, 0, 0, 0, xmitq); 1142 1140 } 1143 1141 } 1142 + exit: 1143 + tipc_skb_queue_splice_tail(&tmpq, 
l->inputq); 1144 1144 return rc; 1145 1145 } 1146 1146