Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux

tipc: fix name table rbtree issues

The current rbtree for service ranges in the name table is built based
on the 'lower' & 'upper' range values resulting in a flaw in the rbtree
searching. Some issues have been observed in case of range overlapping:

Case #1: unable to withdraw a name entry:
After some name services are bound, all of them are withdrawn by user
but one remains in the name table forever. This corrupts the table and
that service becomes dummy i.e. no real port.
E.g.

/
{22, 22}
/
/
---> {10, 50}
/ \
/ \
{10, 30} {20, 60}

The node {10, 30} cannot be removed since the rbtree searching stops at
the node's ancestor i.e. {10, 50}, so starting from it will never reach
the finding node.

Case #2: failed to send data in some cases:
E.g. Two service ranges: {20, 60}, {10, 50} are bound. The rbtree for
this service will be one of the two cases below depending on the order
of the bindings:

{20, 60} {10, 50} <--
/ \ / \
/ \ / \
{10, 50} NIL <-- NIL {20, 60}

(a) (b)

Now, try to send some data to service {30}, there will be two results:
(a): Failed, no route to host.
(b): Ok.

The reason is that the rbtree searching will stop at the pointing node
as shown above.

Case #3: Same as case #2b above but if the data sending's scope is
local and the {10, 50} is published by a peer node, then it will result
in 'no route to host' even though the other {20, 60} is for example on
the local node which should be able to get the data.

The issues are actually due to the way we built the rbtree. This commit
fixes it by introducing an additional field to each node - named 'max',
which is the largest 'upper' of that node subtree. The 'max' value for
each subtrees will be propagated correctly whenever a node is inserted/
removed or the tree is rebalanced by the augmented rbtree callbacks.

By this way, we can change the rbtree searching appoarch to solve the
issues above. Another benefit from this is that we can now improve the
searching for a next range matching e.g. in case of multicast, so get
rid of the unneeded looping over all nodes in the tree.

Acked-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: Tuong Lien <tuong.t.lien@dektech.com.au>
Signed-off-by: David S. Miller <davem@davemloft.net>

authored by

Tuong Lien and committed by
David S. Miller
d5162f34 ac397934

+179 -100
+179 -100
net/tipc/name_table.c
··· 36 36 37 37 #include <net/sock.h> 38 38 #include <linux/list_sort.h> 39 + #include <linux/rbtree_augmented.h> 39 40 #include "core.h" 40 41 #include "netlink.h" 41 42 #include "name_table.h" ··· 52 51 * @lower: service range lower bound 53 52 * @upper: service range upper bound 54 53 * @tree_node: member of service range RB tree 54 + * @max: largest 'upper' in this node subtree 55 55 * @local_publ: list of identical publications made from this node 56 56 * Used by closest_first lookup and multicast lookup algorithm 57 57 * @all_publ: all publications identical to this one, whatever node and scope ··· 62 60 u32 lower; 63 61 u32 upper; 64 62 struct rb_node tree_node; 63 + u32 max; 65 64 struct list_head local_publ; 66 65 struct list_head all_publ; 67 66 }; ··· 86 83 spinlock_t lock; /* Covers service range list */ 87 84 struct rcu_head rcu; 88 85 }; 86 + 87 + #define service_range_upper(sr) ((sr)->upper) 88 + RB_DECLARE_CALLBACKS_MAX(static, sr_callbacks, 89 + struct service_range, tree_node, u32, max, 90 + service_range_upper) 91 + 92 + #define service_range_entry(rbtree_node) \ 93 + (container_of(rbtree_node, struct service_range, tree_node)) 94 + 95 + #define service_range_overlap(sr, start, end) \ 96 + ((sr)->lower <= (end) && (sr)->upper >= (start)) 97 + 98 + /** 99 + * service_range_foreach_match - iterate over tipc service rbtree for each 100 + * range match 101 + * @sr: the service range pointer as a loop cursor 102 + * @sc: the pointer to tipc service which holds the service range rbtree 103 + * @start, end: the range (end >= start) for matching 104 + */ 105 + #define service_range_foreach_match(sr, sc, start, end) \ 106 + for (sr = service_range_match_first((sc)->ranges.rb_node, \ 107 + start, \ 108 + end); \ 109 + sr; \ 110 + sr = service_range_match_next(&(sr)->tree_node, \ 111 + start, \ 112 + end)) 113 + 114 + /** 115 + * service_range_match_first - find first service range matching a range 116 + * @n: the root node of service range rbtree for searching 117 + * @start, end: the range (end >= start) for matching 118 + * 119 + * Return: the leftmost service range node in the rbtree that overlaps the 120 + * specific range if any. Otherwise, returns NULL. 121 + */ 122 + static struct service_range *service_range_match_first(struct rb_node *n, 123 + u32 start, u32 end) 124 + { 125 + struct service_range *sr; 126 + struct rb_node *l, *r; 127 + 128 + /* Non overlaps in tree at all? */ 129 + if (!n || service_range_entry(n)->max < start) 130 + return NULL; 131 + 132 + while (n) { 133 + l = n->rb_left; 134 + if (l && service_range_entry(l)->max >= start) { 135 + /* A leftmost overlap range node must be one in the left 136 + * subtree. If not, it has lower > end, then nodes on 137 + * the right side cannot satisfy the condition either. 138 + */ 139 + n = l; 140 + continue; 141 + } 142 + 143 + /* No one in the left subtree can match, return if this node is 144 + * an overlap i.e. leftmost. 145 + */ 146 + sr = service_range_entry(n); 147 + if (service_range_overlap(sr, start, end)) 148 + return sr; 149 + 150 + /* Ok, try to lookup on the right side */ 151 + r = n->rb_right; 152 + if (sr->lower <= end && 153 + r && service_range_entry(r)->max >= start) { 154 + n = r; 155 + continue; 156 + } 157 + break; 158 + } 159 + 160 + return NULL; 161 + } 162 + 163 + /** 164 + * service_range_match_next - find next service range matching a range 165 + * @n: a node in service range rbtree from which the searching starts 166 + * @start, end: the range (end >= start) for matching 167 + * 168 + * Return: the next service range node to the given node in the rbtree that 169 + * overlaps the specific range if any. Otherwise, returns NULL. 170 + */ 171 + static struct service_range *service_range_match_next(struct rb_node *n, 172 + u32 start, u32 end) 173 + { 174 + struct service_range *sr; 175 + struct rb_node *p, *r; 176 + 177 + while (n) { 178 + r = n->rb_right; 179 + if (r && service_range_entry(r)->max >= start) 180 + /* A next overlap range node must be one in the right 181 + * subtree. If not, it has lower > end, then any next 182 + * successor (- an ancestor) of this node cannot 183 + * satisfy the condition either. 184 + */ 185 + return service_range_match_first(r, start, end); 186 + 187 + /* No one in the right subtree can match, go up to find an 188 + * ancestor of this node which is parent of a left-hand child. 189 + */ 190 + while ((p = rb_parent(n)) && n == p->rb_right) 191 + n = p; 192 + if (!p) 193 + break; 194 + 195 + /* Return if this ancestor is an overlap */ 196 + sr = service_range_entry(p); 197 + if (service_range_overlap(sr, start, end)) 198 + return sr; 199 + 200 + /* Ok, try to lookup more from this ancestor */ 201 + if (sr->lower <= end) { 202 + n = p; 203 + continue; 204 + } 205 + break; 206 + } 207 + 208 + return NULL; 209 + } 89 210 90 211 static int hash(int x) 91 212 { ··· 266 139 return service; 267 140 } 268 141 269 - /** 270 - * tipc_service_first_range - find first service range in tree matching instance 271 - * 272 - * Very time-critical, so binary search through range rb tree 273 - */ 274 - static struct service_range *tipc_service_first_range(struct tipc_service *sc, 275 - u32 instance) 276 - { 277 - struct rb_node *n = sc->ranges.rb_node; 278 - struct service_range *sr; 279 - 280 - while (n) { 281 - sr = container_of(n, struct service_range, tree_node); 282 - if (sr->lower > instance) 283 - n = n->rb_left; 284 - else if (sr->upper < instance) 285 - n = n->rb_right; 286 - else 287 - return sr; 288 - } 289 - return NULL; 290 - } 291 - 292 142 /* tipc_service_find_range - find service range matching publication parameters 293 143 */ 294 144 static struct service_range *tipc_service_find_range(struct tipc_service *sc, 295 145 u32 lower, u32 upper) 296 146 { 297 - struct rb_node *n = sc->ranges.rb_node; 298 147 struct service_range *sr; 299 148 300 - sr = tipc_service_first_range(sc, lower); 301 - if (!sr) 302 - return NULL; 303 - 304 - /* Look for exact match */ 305 - for (n = &sr->tree_node; n; n = rb_next(n)) { 306 - sr = container_of(n, struct service_range, tree_node); 307 - if (sr->upper == upper) 308 - break; 149 + service_range_foreach_match(sr, sc, lower, upper) { 150 + /* Look for exact match */ 151 + if (sr->lower == lower && sr->upper == upper) 152 + return sr; 309 153 } 310 - if (!n || sr->lower != lower || sr->upper != upper) 311 - return NULL; 312 154 313 - return sr; 155 + return NULL; 314 156 } 315 157 316 158 static struct service_range *tipc_service_create_range(struct tipc_service *sc, 317 159 u32 lower, u32 upper) 318 160 { 319 161 struct rb_node **n, *parent = NULL; 320 - struct service_range *sr, *tmp; 162 + struct service_range *sr; 321 163 322 164 n = &sc->ranges.rb_node; 323 165 while (*n) { 324 - tmp = container_of(*n, struct service_range, tree_node); 325 166 parent = *n; 326 - tmp = container_of(parent, struct service_range, tree_node); 327 - if (lower < tmp->lower) 328 - n = &(*n)->rb_left; 329 - else if (lower > tmp->lower) 330 - n = &(*n)->rb_right; 331 - else if (upper < tmp->upper) 332 - n = &(*n)->rb_left; 333 - else if (upper > tmp->upper) 334 - n = &(*n)->rb_right; 167 + sr = service_range_entry(parent); 168 + if (lower == sr->lower && upper == sr->upper) 169 + return sr; 170 + if (sr->max < upper) 171 + sr->max = upper; 172 + if (lower <= sr->lower) 173 + n = &parent->rb_left; 335 174 else 336 - return tmp; 175 + n = &parent->rb_right; 337 176 } 338 177 sr = kzalloc(sizeof(*sr), GFP_ATOMIC); 339 178 if (!sr) 340 179 return NULL; 341 180 sr->lower = lower; 342 181 sr->upper = upper; 182 + sr->max = upper; 343 183 INIT_LIST_HEAD(&sr->local_publ); 344 184 INIT_LIST_HEAD(&sr->all_publ); 345 185 rb_link_node(&sr->tree_node, parent, n); 346 - rb_insert_color(&sr->tree_node, &sc->ranges); 186 + rb_insert_augmented(&sr->tree_node, &sc->ranges, &sr_callbacks); 347 187 return sr; 348 188 } 349 189 ··· 404 310 struct list_head publ_list; 405 311 struct service_range *sr; 406 312 struct tipc_name_seq ns; 407 - struct rb_node *n; 408 313 u32 filter; 409 314 410 315 ns.type = tipc_sub_read(sb, seq.type); ··· 418 325 return; 419 326 420 327 INIT_LIST_HEAD(&publ_list); 421 - for (n = rb_first(&service->ranges); n; n = rb_next(n)) { 422 - sr = container_of(n, struct service_range, tree_node); 423 - if (sr->lower > ns.upper) 424 - break; 425 - if (!tipc_sub_check_overlap(&ns, sr->lower, sr->upper)) 426 - continue; 427 - 328 + service_range_foreach_match(sr, service, ns.lower, ns.upper) { 428 329 first = NULL; 429 330 list_for_each_entry(p, &sr->all_publ, all_publ) { 430 331 if (filter & TIPC_SUB_PORTS) ··· 512 425 513 426 /* Remove service range item if this was its last publication */ 514 427 if (list_empty(&sr->all_publ)) { 515 - rb_erase(&sr->tree_node, &sc->ranges); 428 + rb_erase_augmented(&sr->tree_node, &sc->ranges, &sr_callbacks); 516 429 kfree(sr); 517 430 } 518 431 ··· 560 473 rcu_read_lock(); 561 474 sc = tipc_service_find(net, type); 562 475 if (unlikely(!sc)) 563 - goto not_found; 476 + goto exit; 564 477 565 478 spin_lock_bh(&sc->lock); 566 - sr = tipc_service_first_range(sc, instance); 567 - if (unlikely(!sr)) 568 - goto no_match; 569 - 570 - /* Select lookup algorithm: local, closest-first or round-robin */ 571 - if (*dnode == self) { 572 - list = &sr->local_publ; 573 - if (list_empty(list)) 574 - goto no_match; 575 - p = list_first_entry(list, struct publication, local_publ); 576 - list_move_tail(&p->local_publ, &sr->local_publ); 577 - } else if (legacy && !*dnode && !list_empty(&sr->local_publ)) { 578 - list = &sr->local_publ; 579 - p = list_first_entry(list, struct publication, local_publ); 580 - list_move_tail(&p->local_publ, &sr->local_publ); 581 - } else { 582 - list = &sr->all_publ; 583 - p = list_first_entry(list, struct publication, all_publ); 584 - list_move_tail(&p->all_publ, &sr->all_publ); 479 + service_range_foreach_match(sr, sc, instance, instance) { 480 + /* Select lookup algo: local, closest-first or round-robin */ 481 + if (*dnode == self) { 482 + list = &sr->local_publ; 483 + if (list_empty(list)) 484 + continue; 485 + p = list_first_entry(list, struct publication, 486 + local_publ); 487 + list_move_tail(&p->local_publ, &sr->local_publ); 488 + } else if (legacy && !*dnode && !list_empty(&sr->local_publ)) { 489 + list = &sr->local_publ; 490 + p = list_first_entry(list, struct publication, 491 + local_publ); 492 + list_move_tail(&p->local_publ, &sr->local_publ); 493 + } else { 494 + list = &sr->all_publ; 495 + p = list_first_entry(list, struct publication, 496 + all_publ); 497 + list_move_tail(&p->all_publ, &sr->all_publ); 498 + } 499 + port = p->port; 500 + node = p->node; 501 + /* Todo: as for legacy, pick the first matching range only, a 502 + * "true" round-robin will be performed as needed. 503 + */ 504 + break; 585 505 } 586 - port = p->port; 587 - node = p->node; 588 - no_match: 589 506 spin_unlock_bh(&sc->lock); 590 - not_found: 507 + 508 + exit: 591 509 rcu_read_unlock(); 592 510 *dnode = node; 593 511 return port; ··· 615 523 616 524 spin_lock_bh(&sc->lock); 617 525 618 - sr = tipc_service_first_range(sc, instance); 526 + /* Todo: a full search i.e. service_range_foreach_match() instead? */ 527 + sr = service_range_match_first(sc->ranges.rb_node, instance, instance); 619 528 if (!sr) 620 529 goto no_match; 621 530 ··· 645 552 struct service_range *sr; 646 553 struct tipc_service *sc; 647 554 struct publication *p; 648 - struct rb_node *n; 649 555 650 556 rcu_read_lock(); 651 557 sc = tipc_service_find(net, type); ··· 652 560 goto exit; 653 561 654 562 spin_lock_bh(&sc->lock); 655 - 656 - for (n = rb_first(&sc->ranges); n; n = rb_next(n)) { 657 - sr = container_of(n, struct service_range, tree_node); 658 - if (sr->upper < lower) 659 - continue; 660 - if (sr->lower > upper) 661 - break; 563 + service_range_foreach_match(sr, sc, lower, upper) { 662 564 list_for_each_entry(p, &sr->local_publ, local_publ) { 663 565 if (p->scope == scope || (!exact && p->scope < scope)) 664 566 tipc_dest_push(dports, 0, p->port); ··· 673 587 struct service_range *sr; 674 588 struct tipc_service *sc; 675 589 struct publication *p; 676 - struct rb_node *n; 677 590 678 591 rcu_read_lock(); 679 592 sc = tipc_service_find(net, type); ··· 680 595 goto exit; 681 596 682 597 spin_lock_bh(&sc->lock); 683 - 684 - for (n = rb_first(&sc->ranges); n; n = rb_next(n)) { 685 - sr = container_of(n, struct service_range, tree_node); 686 - if (sr->upper < lower) 687 - continue; 688 - if (sr->lower > upper) 689 - break; 598 + service_range_foreach_match(sr, sc, lower, upper) { 690 599 list_for_each_entry(p, &sr->all_publ, all_publ) { 691 600 tipc_nlist_add(nodes, p->node); 692 601 } ··· 878 799 tipc_service_remove_publ(sr, p->node, p->key); 879 800 kfree_rcu(p, rcu); 880 801 } 881 - rb_erase(&sr->tree_node, &sc->ranges); 802 + rb_erase_augmented(&sr->tree_node, &sc->ranges, &sr_callbacks); 882 803 kfree(sr); 883 804 } 884 805 hlist_del_init_rcu(&sc->service_list);