Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux

Merge branch 'tipc-next'

Jon Maloy says:

====================
tipc: some improvements and fixes

We introduce a better algorithm for selecting when and which
users should be subject to link congestion control, plus clean
up some code for that mechanism.
Commit #3 fixes another rare race condition during packet reception.
====================

Signed-off-by: David S. Miller <davem@davemloft.net>

+157 -97
+1 -1
net/tipc/bcast.c
··· 831 831 prop = nla_nest_start(msg->skb, TIPC_NLA_LINK_PROP); 832 832 if (!prop) 833 833 goto attr_msg_full; 834 - if (nla_put_u32(msg->skb, TIPC_NLA_PROP_WIN, bcl->queue_limit[0])) 834 + if (nla_put_u32(msg->skb, TIPC_NLA_PROP_WIN, bcl->window)) 835 835 goto prop_msg_full; 836 836 nla_nest_end(msg->skb, prop); 837 837
+130 -77
net/tipc/link.c
··· 139 139 kref_put(&l_ptr->ref, tipc_link_release); 140 140 } 141 141 142 + static struct tipc_link *tipc_parallel_link(struct tipc_link *l) 143 + { 144 + if (l->owner->active_links[0] != l) 145 + return l->owner->active_links[0]; 146 + return l->owner->active_links[1]; 147 + } 148 + 142 149 static void link_init_max_pkt(struct tipc_link *l_ptr) 143 150 { 144 151 struct tipc_node *node = l_ptr->owner; ··· 317 310 link_init_max_pkt(l_ptr); 318 311 l_ptr->priority = b_ptr->priority; 319 312 tipc_link_set_queue_limits(l_ptr, b_ptr->window); 320 - 321 313 l_ptr->next_out_no = 1; 322 314 __skb_queue_head_init(&l_ptr->transmq); 323 315 __skb_queue_head_init(&l_ptr->backlogq); ··· 374 368 } 375 369 376 370 /** 377 - * link_schedule_user - schedule user for wakeup after congestion 371 + * link_schedule_user - schedule a message sender for wakeup after congestion 378 372 * @link: congested link 379 - * @oport: sending port 380 - * @chain_sz: size of buffer chain that was attempted sent 381 - * @imp: importance of message attempted sent 373 + * @list: message that was attempted sent 382 374 * Create pseudo msg to send back to user when congestion abates 375 + * Only consumes message if there is an error 383 376 */ 384 - static bool link_schedule_user(struct tipc_link *link, u32 oport, 385 - uint chain_sz, uint imp) 377 + static int link_schedule_user(struct tipc_link *link, struct sk_buff_head *list) 386 378 { 387 - struct sk_buff *buf; 379 + struct tipc_msg *msg = buf_msg(skb_peek(list)); 380 + int imp = msg_importance(msg); 381 + u32 oport = msg_origport(msg); 382 + u32 addr = link_own_addr(link); 383 + struct sk_buff *skb; 388 384 389 - buf = tipc_msg_create(SOCK_WAKEUP, 0, INT_H_SIZE, 0, 390 - link_own_addr(link), link_own_addr(link), 391 - oport, 0, 0); 392 - if (!buf) 393 - return false; 394 - TIPC_SKB_CB(buf)->chain_sz = chain_sz; 395 - TIPC_SKB_CB(buf)->chain_imp = imp; 396 - skb_queue_tail(&link->wakeupq, buf); 385 + /* This really cannot happen... 
*/ 386 + if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) { 387 + pr_warn("%s<%s>, send queue full", link_rst_msg, link->name); 388 + tipc_link_reset(link); 389 + goto err; 390 + } 391 + /* Non-blocking sender: */ 392 + if (TIPC_SKB_CB(skb_peek(list))->wakeup_pending) 393 + return -ELINKCONG; 394 + 395 + /* Create and schedule wakeup pseudo message */ 396 + skb = tipc_msg_create(SOCK_WAKEUP, 0, INT_H_SIZE, 0, 397 + addr, addr, oport, 0, 0); 398 + if (!skb) 399 + goto err; 400 + TIPC_SKB_CB(skb)->chain_sz = skb_queue_len(list); 401 + TIPC_SKB_CB(skb)->chain_imp = imp; 402 + skb_queue_tail(&link->wakeupq, skb); 397 403 link->stats.link_congs++; 398 - return true; 404 + return -ELINKCONG; 405 + err: 406 + __skb_queue_purge(list); 407 + return -ENOBUFS; 399 408 } 400 409 401 410 /** ··· 419 398 * Move a number of waiting users, as permitted by available space in 420 399 * the send queue, from link wait queue to node wait queue for wakeup 421 400 */ 422 - void link_prepare_wakeup(struct tipc_link *link) 401 + void link_prepare_wakeup(struct tipc_link *l) 423 402 { 424 - uint pend_qsz = skb_queue_len(&link->backlogq); 403 + int pnd[TIPC_SYSTEM_IMPORTANCE + 1] = {0,}; 404 + int imp, lim; 425 405 struct sk_buff *skb, *tmp; 426 406 427 - skb_queue_walk_safe(&link->wakeupq, skb, tmp) { 428 - if (pend_qsz >= link->queue_limit[TIPC_SKB_CB(skb)->chain_imp]) 407 + skb_queue_walk_safe(&l->wakeupq, skb, tmp) { 408 + imp = TIPC_SKB_CB(skb)->chain_imp; 409 + lim = l->window + l->backlog[imp].limit; 410 + pnd[imp] += TIPC_SKB_CB(skb)->chain_sz; 411 + if ((pnd[imp] + l->backlog[imp].len) >= lim) 429 412 break; 430 - pend_qsz += TIPC_SKB_CB(skb)->chain_sz; 431 - skb_unlink(skb, &link->wakeupq); 432 - skb_queue_tail(&link->inputq, skb); 433 - link->owner->inputq = &link->inputq; 434 - link->owner->action_flags |= TIPC_MSG_EVT; 413 + skb_unlink(skb, &l->wakeupq); 414 + skb_queue_tail(&l->inputq, skb); 415 + l->owner->inputq = &l->inputq; 416 + l->owner->action_flags |= TIPC_MSG_EVT; 435 417 
} 436 418 } 437 419 ··· 448 424 l_ptr->reasm_buf = NULL; 449 425 } 450 426 427 + static void tipc_link_purge_backlog(struct tipc_link *l) 428 + { 429 + __skb_queue_purge(&l->backlogq); 430 + l->backlog[TIPC_LOW_IMPORTANCE].len = 0; 431 + l->backlog[TIPC_MEDIUM_IMPORTANCE].len = 0; 432 + l->backlog[TIPC_HIGH_IMPORTANCE].len = 0; 433 + l->backlog[TIPC_CRITICAL_IMPORTANCE].len = 0; 434 + l->backlog[TIPC_SYSTEM_IMPORTANCE].len = 0; 435 + } 436 + 451 437 /** 452 438 * tipc_link_purge_queues - purge all pkt queues associated with link 453 439 * @l_ptr: pointer to link ··· 466 432 { 467 433 __skb_queue_purge(&l_ptr->deferdq); 468 434 __skb_queue_purge(&l_ptr->transmq); 469 - __skb_queue_purge(&l_ptr->backlogq); 435 + tipc_link_purge_backlog(l_ptr); 470 436 tipc_link_reset_fragments(l_ptr); 471 437 } 472 438 ··· 500 466 501 467 /* Clean up all queues, except inputq: */ 502 468 __skb_queue_purge(&l_ptr->transmq); 503 - __skb_queue_purge(&l_ptr->backlogq); 504 469 __skb_queue_purge(&l_ptr->deferdq); 505 470 if (!owner->inputq) 506 471 owner->inputq = &l_ptr->inputq; 507 472 skb_queue_splice_init(&l_ptr->wakeupq, owner->inputq); 508 473 if (!skb_queue_empty(owner->inputq)) 509 474 owner->action_flags |= TIPC_MSG_EVT; 475 + tipc_link_purge_backlog(l_ptr); 510 476 l_ptr->rcv_unacked = 0; 511 477 l_ptr->checkpoint = 1; 512 478 l_ptr->next_out_no = 1; ··· 730 696 } 731 697 } 732 698 733 - /* tipc_link_cong: determine return value and how to treat the 734 - * sent buffer during link congestion. 735 - * - For plain, errorless user data messages we keep the buffer and 736 - * return -ELINKONG. 
737 - * - For all other messages we discard the buffer and return -EHOSTUNREACH 738 - * - For TIPC internal messages we also reset the link 739 - */ 740 - static int tipc_link_cong(struct tipc_link *link, struct sk_buff_head *list) 741 - { 742 - struct sk_buff *skb = skb_peek(list); 743 - struct tipc_msg *msg = buf_msg(skb); 744 - int imp = msg_importance(msg); 745 - u32 oport = msg_tot_origport(msg); 746 - 747 - if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) { 748 - pr_warn("%s<%s>, send queue full", link_rst_msg, link->name); 749 - tipc_link_reset(link); 750 - goto drop; 751 - } 752 - if (unlikely(msg_errcode(msg))) 753 - goto drop; 754 - if (unlikely(msg_reroute_cnt(msg))) 755 - goto drop; 756 - if (TIPC_SKB_CB(skb)->wakeup_pending) 757 - return -ELINKCONG; 758 - if (link_schedule_user(link, oport, skb_queue_len(list), imp)) 759 - return -ELINKCONG; 760 - drop: 761 - __skb_queue_purge(list); 762 - return -EHOSTUNREACH; 763 - } 764 - 765 699 /** 766 700 * __tipc_link_xmit(): same as tipc_link_xmit, but destlink is known & locked 767 701 * @link: link to use 768 702 * @list: chain of buffers containing message 769 703 * 770 - * Consumes the buffer chain, except when returning -ELINKCONG 771 - * Returns 0 if success, otherwise errno: -ELINKCONG, -EMSGSIZE (plain socket 772 - * user data messages) or -EHOSTUNREACH (all other messages/senders) 773 - * Only the socket functions tipc_send_stream() and tipc_send_packet() need 774 - * to act on the return value, since they may need to do more send attempts. 704 + * Consumes the buffer chain, except when returning -ELINKCONG, 705 + * since the caller then may want to make more send attempts. 
706 + * Returns 0 if success, or errno: -ELINKCONG, -EMSGSIZE or -ENOBUFS 707 + * Messages at TIPC_SYSTEM_IMPORTANCE are always accepted 775 708 */ 776 709 int __tipc_link_xmit(struct net *net, struct tipc_link *link, 777 710 struct sk_buff_head *list) ··· 755 754 struct sk_buff_head *backlogq = &link->backlogq; 756 755 struct sk_buff *skb, *tmp; 757 756 758 - /* Match queue limit against msg importance: */ 759 - if (unlikely(skb_queue_len(backlogq) >= link->queue_limit[imp])) 760 - return tipc_link_cong(link, list); 757 + /* Match backlog limit against msg importance: */ 758 + if (unlikely(link->backlog[imp].len >= link->backlog[imp].limit)) 759 + return link_schedule_user(link, list); 761 760 762 - /* Has valid packet limit been used ? */ 763 761 if (unlikely(msg_size(msg) > mtu)) { 764 762 __skb_queue_purge(list); 765 763 return -EMSGSIZE; 766 764 } 767 - 768 765 /* Prepare each packet for sending, and add to relevant queue: */ 769 766 skb_queue_walk_safe(list, skb, tmp) { 770 767 __skb_unlink(skb, list); ··· 785 786 if (tipc_msg_make_bundle(&skb, mtu, link->addr)) { 786 787 link->stats.sent_bundled++; 787 788 link->stats.sent_bundles++; 789 + imp = msg_importance(buf_msg(skb)); 788 790 } 789 791 __skb_queue_tail(backlogq, skb); 792 + link->backlog[imp].len++; 790 793 seqno++; 791 794 } 792 795 link->next_out_no = seqno; ··· 809 808 return __tipc_link_xmit(link->owner->net, link, &head); 810 809 } 811 810 811 + /* tipc_link_xmit_skb(): send single buffer to destination 812 + * Buffers sent via this function are generally TIPC_SYSTEM_IMPORTANCE 813 + * messages, which will not be rejected 814 + * The only exception is datagram messages rerouted after secondary 815 + * lookup, which are rare and safe to dispose of anyway. 
816 + * TODO: Return real return value, and let callers use 817 + * tipc_wait_for_sendpkt() where applicable 818 + */ 812 819 int tipc_link_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode, 813 820 u32 selector) 814 821 { 815 822 struct sk_buff_head head; 823 + int rc; 816 824 817 825 skb2list(skb, &head); 818 - return tipc_link_xmit(net, &head, dnode, selector); 826 + rc = tipc_link_xmit(net, &head, dnode, selector); 827 + if (rc == -ELINKCONG) 828 + kfree_skb(skb); 829 + return 0; 819 830 } 820 831 821 832 /** ··· 927 914 if (!skb) 928 915 break; 929 916 msg = buf_msg(skb); 917 + link->backlog[msg_importance(msg)].len--; 930 918 msg_set_ack(msg, ack); 931 919 msg_set_bcast_ack(msg, link->owner->bclink.last_in); 932 920 link->rcv_unacked = 0; ··· 1031 1017 retransmits--; 1032 1018 l_ptr->stats.retransmitted++; 1033 1019 } 1020 + } 1021 + 1022 + /* link_synch(): check if all packets arrived before the synch 1023 + * point have been consumed 1024 + * Returns true if the parallel links are synched, otherwise false 1025 + */ 1026 + static bool link_synch(struct tipc_link *l) 1027 + { 1028 + unsigned int post_synch; 1029 + struct tipc_link *pl; 1030 + 1031 + pl = tipc_parallel_link(l); 1032 + if (pl == l) 1033 + goto synched; 1034 + 1035 + /* Was last pre-synch packet added to input queue ? */ 1036 + if (less_eq(pl->next_in_no, l->synch_point)) 1037 + return false; 1038 + 1039 + /* Is it still in the input queue ? 
*/ 1040 + post_synch = mod(pl->next_in_no - l->synch_point) - 1; 1041 + if (skb_queue_len(&pl->inputq) > post_synch) 1042 + return false; 1043 + synched: 1044 + l->flags &= ~LINK_SYNCHING; 1045 + return true; 1034 1046 } 1035 1047 1036 1048 static void link_retrieve_defq(struct tipc_link *link, ··· 1189 1149 skb = NULL; 1190 1150 goto unlock; 1191 1151 } 1152 + /* Synchronize with parallel link if applicable */ 1153 + if (unlikely((l_ptr->flags & LINK_SYNCHING) && !msg_dup(msg))) { 1154 + link_handle_out_of_seq_msg(l_ptr, skb); 1155 + if (link_synch(l_ptr)) 1156 + link_retrieve_defq(l_ptr, &head); 1157 + skb = NULL; 1158 + goto unlock; 1159 + } 1192 1160 l_ptr->next_in_no++; 1193 1161 if (unlikely(!skb_queue_empty(&l_ptr->deferdq))) 1194 1162 link_retrieve_defq(l_ptr, &head); ··· 1272 1224 1273 1225 switch (msg_user(msg)) { 1274 1226 case CHANGEOVER_PROTOCOL: 1227 + if (msg_dup(msg)) { 1228 + link->flags |= LINK_SYNCHING; 1229 + link->synch_point = msg_seqno(msg_get_wrapped(msg)); 1230 + } 1275 1231 if (!tipc_link_tunnel_rcv(node, &skb)) 1276 1232 break; 1277 1233 if (msg_user(buf_msg(skb)) != MSG_BUNDLER) { ··· 1662 1610 tipc_msg_init(link_own_addr(l_ptr), &tunnel_hdr, CHANGEOVER_PROTOCOL, 1663 1611 ORIGINAL_MSG, INT_H_SIZE, l_ptr->addr); 1664 1612 skb_queue_splice_tail_init(&l_ptr->backlogq, &l_ptr->transmq); 1613 + tipc_link_purge_backlog(l_ptr); 1665 1614 msgcount = skb_queue_len(&l_ptr->transmq); 1666 1615 msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id); 1667 1616 msg_set_msgcnt(&tunnel_hdr, msgcount); ··· 1870 1817 int max_bulk = TIPC_MAX_PUBLICATIONS / (l->max_pkt / ITEM_SIZE); 1871 1818 1872 1819 l->window = win; 1873 - l->queue_limit[TIPC_LOW_IMPORTANCE] = win / 2; 1874 - l->queue_limit[TIPC_MEDIUM_IMPORTANCE] = win; 1875 - l->queue_limit[TIPC_HIGH_IMPORTANCE] = win / 2 * 3; 1876 - l->queue_limit[TIPC_CRITICAL_IMPORTANCE] = win * 2; 1877 - l->queue_limit[TIPC_SYSTEM_IMPORTANCE] = max_bulk; 1820 + l->backlog[TIPC_LOW_IMPORTANCE].limit = win / 2; 1821 
+ l->backlog[TIPC_MEDIUM_IMPORTANCE].limit = win; 1822 + l->backlog[TIPC_HIGH_IMPORTANCE].limit = win / 2 * 3; 1823 + l->backlog[TIPC_CRITICAL_IMPORTANCE].limit = win * 2; 1824 + l->backlog[TIPC_SYSTEM_IMPORTANCE].limit = max_bulk; 1878 1825 } 1879 1826 1880 1827 /* tipc_link_find_owner - locate owner node of link by link's name ··· 2173 2120 if (nla_put_u32(msg->skb, TIPC_NLA_PROP_TOL, link->tolerance)) 2174 2121 goto prop_msg_full; 2175 2122 if (nla_put_u32(msg->skb, TIPC_NLA_PROP_WIN, 2176 - link->queue_limit[TIPC_LOW_IMPORTANCE])) 2123 + link->window)) 2177 2124 goto prop_msg_full; 2178 2125 if (nla_put_u32(msg->skb, TIPC_NLA_PROP_PRIO, link->priority)) 2179 2126 goto prop_msg_full;
+7 -2
net/tipc/link.h
··· 60 60 */ 61 61 #define LINK_STARTED 0x0001 62 62 #define LINK_STOPPED 0x0002 63 + #define LINK_SYNCHING 0x0004 63 64 64 65 /* Starting value for maximum packet size negotiation on unicast links 65 66 * (unless bearer MTU is less) ··· 119 118 * @pmsg: convenience pointer to "proto_msg" field 120 119 * @priority: current link priority 121 120 * @net_plane: current link network plane ('A' through 'H') 122 - * @queue_limit: outbound message queue congestion thresholds (indexed by user) 121 + * @backlog_limit: backlog queue congestion thresholds (indexed by importance) 123 122 * @exp_msg_count: # of tunnelled messages expected during link changeover 124 123 * @reset_checkpoint: seq # of last acknowledged message at time of link reset 125 124 * @max_pkt: current maximum packet size for this link ··· 167 166 struct tipc_msg *pmsg; 168 167 u32 priority; 169 168 char net_plane; 170 - u32 queue_limit[15]; /* queue_limit[0]==window limit */ 171 169 172 170 /* Changeover */ 173 171 u32 exp_msg_count; 174 172 u32 reset_checkpoint; 173 + u32 synch_point; 175 174 176 175 /* Max packet negotiation */ 177 176 u32 max_pkt; ··· 181 180 /* Sending */ 182 181 struct sk_buff_head transmq; 183 182 struct sk_buff_head backlogq; 183 + struct { 184 + u16 len; 185 + u16 limit; 186 + } backlog[5]; 184 187 u32 next_out_no; 185 188 u32 window; 186 189 u32 last_retransmitted;
+19 -17
net/tipc/msg.h
··· 240 240 m->hdr[0] = htonl((msg_word(m, 0) & ~0x1ffff) | sz); 241 241 } 242 242 243 + static inline unchar *msg_data(struct tipc_msg *m) 244 + { 245 + return ((unchar *)m) + msg_hdr_sz(m); 246 + } 247 + 248 + static inline struct tipc_msg *msg_get_wrapped(struct tipc_msg *m) 249 + { 250 + return (struct tipc_msg *)msg_data(m); 251 + } 243 252 244 253 /* 245 254 * Word 1 ··· 381 372 382 373 static inline u32 msg_origport(struct tipc_msg *m) 383 374 { 375 + if (msg_user(m) == MSG_FRAGMENTER) 376 + m = msg_get_wrapped(m); 384 377 return msg_word(m, 4); 385 378 } 386 379 ··· 478 467 msg_set_word(m, 10, n); 479 468 } 480 469 481 - static inline unchar *msg_data(struct tipc_msg *m) 482 - { 483 - return ((unchar *)m) + msg_hdr_sz(m); 484 - } 485 - 486 - static inline struct tipc_msg *msg_get_wrapped(struct tipc_msg *m) 487 - { 488 - return (struct tipc_msg *)msg_data(m); 489 - } 490 - 491 470 /* 492 471 * Constants and routines used to read and write TIPC internal message headers 493 472 */ ··· 554 553 msg_set_bits(m, 1, 15, 0x1fff, n); 555 554 } 556 555 556 + static inline bool msg_dup(struct tipc_msg *m) 557 + { 558 + if (likely(msg_user(m) != CHANGEOVER_PROTOCOL)) 559 + return false; 560 + if (msg_type(m) != DUPLICATE_MSG) 561 + return false; 562 + return true; 563 + } 557 564 558 565 /* 559 566 * Word 2 ··· 760 751 static inline void msg_set_link_tolerance(struct tipc_msg *m, u32 n) 761 752 { 762 753 msg_set_bits(m, 9, 0, 0xffff, n); 763 - } 764 - 765 - static inline u32 msg_tot_origport(struct tipc_msg *m) 766 - { 767 - if ((msg_user(m) == MSG_FRAGMENTER) && (msg_type(m) == FIRST_FRAGMENT)) 768 - return msg_origport(msg_get_wrapped(m)); 769 - return msg_origport(m); 770 754 } 771 755 772 756 struct sk_buff *tipc_buf_acquire(u32 size);