Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux

tipc: permit overlapping service ranges in name table

With the new RB tree structure for service ranges it becomes possible to
solve an old problem; - we can now allow overlapping service ranges in
the table.

When inserting a new service range to the tree, we use 'lower' as primary
key, and when necessary 'upper' as secondary key.

Since there may now be multiple service ranges matching an indicated
'lower' value, we must also add the 'upper' value to the functions
used for removing publications, so that the correct, corresponding
range item can be found.

These changes guarantee that a well-formed publication/withdrawal item
from a peer node never will be rejected, and make it possible to
eliminate the problematic backlog functionality we currently have for
handling such cases.

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>

authored by

Jon Maloy and committed by
David S. Miller
37922ea4 f20889f7

+60 -111
+22 -68
net/tipc/name_distr.c
··· 204 204 */ 205 205 static void tipc_publ_purge(struct net *net, struct publication *publ, u32 addr) 206 206 { 207 - struct tipc_net *tn = net_generic(net, tipc_net_id); 207 + struct tipc_net *tn = tipc_net(net); 208 208 struct publication *p; 209 209 210 210 spin_lock_bh(&tn->nametbl_lock); 211 - p = tipc_nametbl_remove_publ(net, publ->type, publ->lower, 212 - publ->node, publ->port, publ->key); 211 + p = tipc_nametbl_remove_publ(net, publ->type, publ->lower, publ->upper, 212 + publ->node, publ->key); 213 213 if (p) 214 214 tipc_node_unsubscribe(net, &p->binding_node, addr); 215 215 spin_unlock_bh(&tn->nametbl_lock); ··· 261 261 static bool tipc_update_nametbl(struct net *net, struct distr_item *i, 262 262 u32 node, u32 dtype) 263 263 { 264 - struct publication *publ = NULL; 264 + struct publication *p = NULL; 265 + u32 lower = ntohl(i->lower); 266 + u32 upper = ntohl(i->upper); 267 + u32 type = ntohl(i->type); 268 + u32 port = ntohl(i->port); 269 + u32 key = ntohl(i->key); 265 270 266 271 if (dtype == PUBLICATION) { 267 - publ = tipc_nametbl_insert_publ(net, ntohl(i->type), 268 - ntohl(i->lower), 269 - ntohl(i->upper), 270 - TIPC_CLUSTER_SCOPE, node, 271 - ntohl(i->port), ntohl(i->key)); 272 - if (publ) { 273 - tipc_node_subscribe(net, &publ->binding_node, node); 272 + p = tipc_nametbl_insert_publ(net, type, lower, upper, 273 + TIPC_CLUSTER_SCOPE, node, 274 + port, key); 275 + if (p) { 276 + tipc_node_subscribe(net, &p->binding_node, node); 274 277 return true; 275 278 } 276 279 } else if (dtype == WITHDRAWAL) { 277 - publ = tipc_nametbl_remove_publ(net, ntohl(i->type), 278 - ntohl(i->lower), 279 - node, ntohl(i->port), 280 - ntohl(i->key)); 281 - if (publ) { 282 - tipc_node_unsubscribe(net, &publ->binding_node, node); 283 - kfree_rcu(publ, rcu); 280 + p = tipc_nametbl_remove_publ(net, type, lower, 281 + upper, node, key); 282 + if (p) { 283 + tipc_node_unsubscribe(net, &p->binding_node, node); 284 + kfree_rcu(p, rcu); 284 285 return true; 285 286 } 287 + pr_warn_ratelimited("Failed to remove binding %u,%u from %x\n", 288 + type, lower, node); 286 289 } else { 287 290 pr_warn("Unrecognized name table message received\n"); 288 291 } 289 292 return false; 290 - } 291 - 292 - /** 293 - * tipc_named_add_backlog - add a failed name table update to the backlog 294 - * 295 - */ 296 - static void tipc_named_add_backlog(struct net *net, struct distr_item *i, 297 - u32 type, u32 node) 298 - { 299 - struct distr_queue_item *e; 300 - struct tipc_net *tn = net_generic(net, tipc_net_id); 301 - unsigned long now = get_jiffies_64(); 302 - 303 - e = kzalloc(sizeof(*e), GFP_ATOMIC); 304 - if (!e) 305 - return; 306 - e->dtype = type; 307 - e->node = node; 308 - e->expires = now + msecs_to_jiffies(sysctl_tipc_named_timeout); 309 - memcpy(e, i, sizeof(*i)); 310 - list_add_tail(&e->next, &tn->dist_queue); 311 - } 312 - 313 - /** 314 - * tipc_named_process_backlog - try to process any pending name table updates 315 - * from the network. 316 - */ 317 - void tipc_named_process_backlog(struct net *net) 318 - { 319 - struct distr_queue_item *e, *tmp; 320 - struct tipc_net *tn = net_generic(net, tipc_net_id); 321 - unsigned long now = get_jiffies_64(); 322 - 323 - list_for_each_entry_safe(e, tmp, &tn->dist_queue, next) { 324 - if (time_after(e->expires, now)) { 325 - if (!tipc_update_nametbl(net, &e->i, e->node, e->dtype)) 326 - continue; 327 - } else { 328 - pr_warn_ratelimited("Dropping name table update (%d) of {%u, %u, %u} from %x key=%u\n", 329 - e->dtype, ntohl(e->i.type), 330 - ntohl(e->i.lower), 331 - ntohl(e->i.upper), 332 - e->node, ntohl(e->i.key)); 333 - } 334 - list_del(&e->next); 335 - kfree(e); 336 - } 337 293 } 338 294 339 295 /** ··· 314 358 count = msg_data_sz(msg) / ITEM_SIZE; 315 359 node = msg_orignode(msg); 316 360 while (count--) { 317 - if (!tipc_update_nametbl(net, item, node, mtype)) 318 - tipc_named_add_backlog(net, item, mtype, node); 361 + tipc_update_nametbl(net, item, node, mtype); 319 362 item++; 320 363 } 321 364 kfree_skb(skb); 322 - tipc_named_process_backlog(net); 323 365 } 324 366 spin_unlock_bh(&tn->nametbl_lock); 325 367 }
-1
net/tipc/name_distr.h
··· 72 72 void tipc_named_node_up(struct net *net, u32 dnode); 73 73 void tipc_named_rcv(struct net *net, struct sk_buff_head *msg_queue); 74 74 void tipc_named_reinit(struct net *net); 75 - void tipc_named_process_backlog(struct net *net); 76 75 void tipc_publ_notify(struct net *net, struct list_head *nsub_list, u32 addr); 77 76 78 77 #endif
+30 -34
net/tipc/name_table.c
··· 171 171 tmp = container_of(parent, struct service_range, tree_node); 172 172 if (lower < tmp->lower) 173 173 n = &(*n)->rb_left; 174 + else if (lower > tmp->lower) 175 + n = &(*n)->rb_right; 176 + else if (upper < tmp->upper) 177 + n = &(*n)->rb_left; 174 178 else if (upper > tmp->upper) 175 179 n = &(*n)->rb_right; 176 180 else 177 - return NULL; 181 + return tmp; 178 182 } 179 183 sr = kzalloc(sizeof(*sr), GFP_ATOMIC); 180 184 if (!sr) ··· 204 200 struct publication *p; 205 201 bool first = false; 206 202 207 - sr = tipc_service_find_range(sc, lower); 208 - if (!sr) { 209 - sr = tipc_service_create_range(sc, lower, upper); 210 - if (!sr) 211 - goto err; 212 - first = true; 213 - } 203 + sr = tipc_service_create_range(sc, lower, upper); 204 + if (!sr) 205 + goto err; 214 206 215 - /* Lower end overlaps existing entry, but we need an exact match */ 216 - if (sr->lower != lower || sr->upper != upper) 217 - return NULL; 207 + first = list_empty(&sr->all_publ); 218 208 219 209 /* Return if the publication already exists */ 220 210 list_for_each_entry(p, &sr->all_publ, all_publ) { ··· 237 239 238 240 /** 239 241 * tipc_service_remove_publ - remove a publication from a service 240 - * 241 - * NOTE: There may be cases where TIPC is asked to remove a publication 242 - * that is not in the name table. For example, if another node issues a 243 - * publication for a name range that overlaps an existing name range 244 - * the publication will not be recorded, which means the publication won't 245 - * be found when the name range is later withdrawn by that node. 246 - * A failed withdraw request simply returns a failure indication and lets the 247 - * caller issue any error or warning messages associated with such a problem. 248 242 */ 249 243 static struct publication *tipc_service_remove_publ(struct net *net, 250 244 struct tipc_service *sc, 251 - u32 inst, u32 node, 252 - u32 port, u32 key) 245 + u32 lower, u32 upper, 246 + u32 node, u32 key) 253 247 { 254 248 struct tipc_subscription *sub, *tmp; 255 249 struct service_range *sr; 256 250 struct publication *p; 257 251 bool found = false; 258 252 bool last = false; 253 + struct rb_node *n; 259 254 260 - sr = tipc_service_find_range(sc, inst); 255 + sr = tipc_service_find_range(sc, lower); 261 256 if (!sr) 257 + return NULL; 258 + 259 + /* Find exact matching service range */ 260 + for (n = &sr->tree_node; n; n = rb_next(n)) { 261 + sr = container_of(n, struct service_range, tree_node); 262 + if (sr->upper == upper) 263 + break; 264 + } 265 + if (!n || sr->lower != lower || sr->upper != upper) 262 266 return NULL; 263 267 264 268 /* Find publication, if it exists */ ··· 375 375 } 376 376 377 377 struct publication *tipc_nametbl_remove_publ(struct net *net, u32 type, 378 - u32 lower, u32 node, u32 port, 379 - u32 key) 378 + u32 lower, u32 upper, 379 + u32 node, u32 key) 380 380 { 381 381 struct tipc_service *sc = tipc_service_find(net, type); 382 382 struct publication *p = NULL; ··· 385 385 return NULL; 386 386 387 387 spin_lock_bh(&sc->lock); 388 - p = tipc_service_remove_publ(net, sc, lower, node, port, key); 388 + p = tipc_service_remove_publ(net, sc, lower, upper, node, key); 389 389 390 390 /* Delete service item if this no more publications and subscriptions */ 391 391 if (RB_EMPTY_ROOT(&sc->ranges) && list_empty(&sc->subscriptions)) { ··· 620 620 if (p) { 621 621 nt->local_publ_count++; 622 622 skb = tipc_named_publish(net, p); 623 - /* Any pending external events? */ 624 - tipc_named_process_backlog(net); 625 623 } 626 624 exit: 627 625 spin_unlock_bh(&tn->nametbl_lock); ··· 633 635 * tipc_nametbl_withdraw - withdraw a service binding 634 636 */ 635 637 int tipc_nametbl_withdraw(struct net *net, u32 type, u32 lower, 636 - u32 port, u32 key) 638 + u32 upper, u32 key) 637 639 { 638 640 struct name_table *nt = tipc_name_table(net); 639 641 struct tipc_net *tn = tipc_net(net); ··· 643 645 644 646 spin_lock_bh(&tn->nametbl_lock); 645 647 646 - p = tipc_nametbl_remove_publ(net, type, lower, self, port, key); 648 + p = tipc_nametbl_remove_publ(net, type, lower, upper, self, key); 647 649 if (p) { 648 650 nt->local_publ_count--; 649 651 skb = tipc_named_withdraw(net, p); 650 - /* Any pending external events? */ 651 - tipc_named_process_backlog(net); 652 652 list_del_init(&p->binding_sock); 653 653 kfree_rcu(p, rcu); 654 654 } else { 655 655 pr_err("Failed to remove local publication {%u,%u,%u}/%u\n", 656 - type, lower, port, key); 656 + type, lower, upper, key); 657 657 } 658 658 spin_unlock_bh(&tn->nametbl_lock); 659 659 ··· 750 754 rbtree_postorder_for_each_entry_safe(sr, tmpr, &sc->ranges, tree_node) { 751 755 list_for_each_entry_safe(p, tmpb, 752 756 &sr->all_publ, all_publ) { 753 - tipc_service_remove_publ(net, sc, p->lower, p->node, 754 - p->port, p->key); 757 + tipc_service_remove_publ(net, sc, p->lower, p->upper, 758 + p->node, p->key); 755 759 kfree_rcu(p, rcu); 756 760 } 757 761 }
+4 -4
net/tipc/name_table.h
··· 116 116 struct list_head *dsts, int *dstcnt, u32 exclude, 117 117 bool all); 118 118 struct publication *tipc_nametbl_publish(struct net *net, u32 type, u32 lower, 119 - u32 upper, u32 scope, u32 port_ref, 119 + u32 upper, u32 scope, u32 port, 120 120 u32 key); 121 - int tipc_nametbl_withdraw(struct net *net, u32 type, u32 lower, u32 ref, 121 + int tipc_nametbl_withdraw(struct net *net, u32 type, u32 lower, u32 upper, 122 122 u32 key); 123 123 struct publication *tipc_nametbl_insert_publ(struct net *net, u32 type, 124 124 u32 lower, u32 upper, u32 scope, 125 125 u32 node, u32 ref, u32 key); 126 126 struct publication *tipc_nametbl_remove_publ(struct net *net, u32 type, 127 - u32 lower, u32 node, u32 ref, 128 - u32 key); 127 + u32 lower, u32 upper, 128 + u32 node, u32 key); 129 129 void tipc_nametbl_subscribe(struct tipc_subscription *s); 130 130 void tipc_nametbl_unsubscribe(struct tipc_subscription *s); 131 131 int tipc_nametbl_init(struct net *net);
+1 -1
net/tipc/net.c
··· 136 136 if (!self) 137 137 return; 138 138 139 - tipc_nametbl_withdraw(net, TIPC_CFG_SRV, self, 0, self); 139 + tipc_nametbl_withdraw(net, TIPC_CFG_SRV, self, self, self); 140 140 rtnl_lock(); 141 141 tipc_bearer_stop(net); 142 142 tipc_node_stop(net);
+1 -1
net/tipc/node.c
··· 329 329 if (flags & TIPC_NOTIFY_LINK_DOWN) { 330 330 tipc_mon_peer_down(net, addr, bearer_id); 331 331 tipc_nametbl_withdraw(net, TIPC_LINK_STATE, addr, 332 - link_id, link_id); 332 + addr, link_id); 333 333 } 334 334 } 335 335
+2 -2
net/tipc/socket.c
··· 2634 2634 if (publ->upper != seq->upper) 2635 2635 break; 2636 2636 tipc_nametbl_withdraw(net, publ->type, publ->lower, 2637 - publ->port, publ->key); 2637 + publ->upper, publ->key); 2638 2638 rc = 0; 2639 2639 break; 2640 2640 } 2641 2641 tipc_nametbl_withdraw(net, publ->type, publ->lower, 2642 - publ->port, publ->key); 2642 + publ->upper, publ->key); 2643 2643 rc = 0; 2644 2644 } 2645 2645 if (list_empty(&tsk->publications))