Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux

tipc: improve groupcast scope handling

When a member joins a group, it also indicates a binding scope. This
makes it possible to create both node local groups, invisible to other
nodes, as well as cluster global groups, visible everywhere.

In order to prevent different members from ending up with permanently
differing views of group size and membership, we must inhibit locally
and globally bound members from joining the same group.

We do this by using the binding scope as an additional separator between
groups. I.e., a member must ignore all membership events from sockets
using a different scope than itself, and all lookups for message
destinations must require an exact match between the message's lookup
scope and the potential target's binding scope.

Apart from making it possible to create local groups using the same
identity on different nodes, a side effect of this is that it now also
becomes possible to create a cluster global group with the same identity
across the same nodes, without interfering with the local groups.

Acked-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>

authored by

Jon Maloy and committed by
David S. Miller
232d07b7 8348500f

+98 -74
+3 -4
include/uapi/linux/tipc.h
··· 117 117 /* 118 118 * Publication scopes when binding port names and port name sequences 119 119 */ 120 - 121 - #define TIPC_ZONE_SCOPE 1 122 - #define TIPC_CLUSTER_SCOPE 2 123 - #define TIPC_NODE_SCOPE 3 120 + #define TIPC_ZONE_SCOPE 1 121 + #define TIPC_CLUSTER_SCOPE 2 122 + #define TIPC_NODE_SCOPE 3 124 123 125 124 /* 126 125 * Limiting values for messages
+8 -5
net/tipc/group.c
··· 87 87 int subid; 88 88 u32 type; 89 89 u32 instance; 90 - u32 domain; 91 90 u32 scope; 92 91 u32 portid; 93 92 u16 member_cnt; ··· 157 158 struct tipc_group *tipc_group_create(struct net *net, u32 portid, 158 159 struct tipc_group_req *mreq) 159 160 { 161 + u32 filter = TIPC_SUB_PORTS | TIPC_SUB_NO_STATUS; 162 + bool global = mreq->scope != TIPC_NODE_SCOPE; 160 163 struct tipc_group *grp; 161 164 u32 type = mreq->type; 162 165 ··· 172 171 grp->members = RB_ROOT; 173 172 grp->net = net; 174 173 grp->portid = portid; 175 - grp->domain = addr_domain(net, mreq->scope); 176 174 grp->type = type; 177 175 grp->instance = mreq->instance; 178 176 grp->scope = mreq->scope; 179 177 grp->loopback = mreq->flags & TIPC_GROUP_LOOPBACK; 180 178 grp->events = mreq->flags & TIPC_GROUP_MEMBER_EVTS; 181 - if (tipc_topsrv_kern_subscr(net, portid, type, 182 - TIPC_SUB_PORTS | TIPC_SUB_NO_STATUS, 183 - 0, ~0, &grp->subid)) 179 + filter |= global ? TIPC_SUB_CLUSTER_SCOPE : TIPC_SUB_NODE_SCOPE; 180 + if (tipc_topsrv_kern_subscr(net, portid, type, 0, ~0, 181 + filter, &grp->subid)) 184 182 return grp; 185 183 kfree(grp); 186 184 return NULL; ··· 730 730 u16 remitted, in_flight; 731 731 732 732 if (!grp) 733 + return; 734 + 735 + if (grp->scope == TIPC_NODE_SCOPE && node != tipc_own_addr(grp->net)) 733 736 return; 734 737 735 738 m = tipc_group_find_member(grp, node, port);
+19 -21
net/tipc/name_table.c
··· 328 328 list_for_each_entry_safe(s, st, &nseq->subscriptions, nameseq_list) { 329 329 tipc_subscrp_report_overlap(s, publ->lower, publ->upper, 330 330 TIPC_PUBLISHED, publ->ref, 331 - publ->node, created_subseq); 331 + publ->node, publ->scope, 332 + created_subseq); 332 333 } 333 334 return publ; 334 335 } ··· 399 398 list_for_each_entry_safe(s, st, &nseq->subscriptions, nameseq_list) { 400 399 tipc_subscrp_report_overlap(s, publ->lower, publ->upper, 401 400 TIPC_WITHDRAWN, publ->ref, 402 - publ->node, removed_subseq); 401 + publ->node, publ->scope, 402 + removed_subseq); 403 403 } 404 404 405 405 return publ; ··· 437 435 sseq->upper, 438 436 TIPC_PUBLISHED, 439 437 crs->ref, crs->node, 438 + crs->scope, 440 439 must_report); 441 440 must_report = 0; 442 441 } ··· 601 598 return ref; 602 599 } 603 600 604 - bool tipc_nametbl_lookup(struct net *net, u32 type, u32 instance, u32 domain, 601 + bool tipc_nametbl_lookup(struct net *net, u32 type, u32 instance, u32 scope, 605 602 struct list_head *dsts, int *dstcnt, u32 exclude, 606 603 bool all) 607 604 { ··· 610 607 struct name_info *info; 611 608 struct name_seq *seq; 612 609 struct sub_seq *sseq; 613 - 614 - if (!tipc_in_scope(domain, self)) 615 - return false; 616 610 617 611 *dstcnt = 0; 618 612 rcu_read_lock(); ··· 621 621 if (likely(sseq)) { 622 622 info = sseq->info; 623 623 list_for_each_entry(publ, &info->zone_list, zone_list) { 624 - if (!tipc_in_scope(domain, publ->node)) 624 + if (publ->scope != scope) 625 625 continue; 626 626 if (publ->ref == exclude && publ->node == self) 627 627 continue; ··· 639 639 return !list_empty(dsts); 640 640 } 641 641 642 - int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper, 643 - u32 limit, struct list_head *dports) 642 + int tipc_nametbl_mc_lookup(struct net *net, u32 type, u32 lower, u32 upper, 643 + u32 scope, bool exact, struct list_head *dports) 644 644 { 645 - struct name_seq *seq; 646 - struct sub_seq *sseq; 647 645 struct sub_seq 
*sseq_stop; 648 646 struct name_info *info; 647 + struct publication *p; 648 + struct name_seq *seq; 649 + struct sub_seq *sseq; 649 650 int res = 0; 650 651 651 652 rcu_read_lock(); ··· 658 657 sseq = seq->sseqs + nameseq_locate_subseq(seq, lower); 659 658 sseq_stop = seq->sseqs + seq->first_free; 660 659 for (; sseq != sseq_stop; sseq++) { 661 - struct publication *publ; 662 - 663 660 if (sseq->lower > upper) 664 661 break; 665 - 666 662 info = sseq->info; 667 - list_for_each_entry(publ, &info->node_list, node_list) { 668 - if (publ->scope <= limit) 669 - tipc_dest_push(dports, 0, publ->ref); 663 + list_for_each_entry(p, &info->node_list, node_list) { 664 + if (p->scope == scope || (!exact && p->scope < scope)) 665 + tipc_dest_push(dports, 0, p->ref); 670 666 } 671 667 672 668 if (info->cluster_list_size != info->node_list_size) ··· 680 682 * - Determines if any node local ports overlap 681 683 */ 682 684 void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower, 683 - u32 upper, u32 domain, 685 + u32 upper, u32 scope, 684 686 struct tipc_nlist *nodes) 685 687 { 686 688 struct sub_seq *sseq, *stop; ··· 699 701 for (; sseq != stop && sseq->lower <= upper; sseq++) { 700 702 info = sseq->info; 701 703 list_for_each_entry(publ, &info->zone_list, zone_list) { 702 - if (tipc_in_scope(domain, publ->node)) 704 + if (publ->scope == scope) 703 705 tipc_nlist_add(nodes, publ->node); 704 706 } 705 707 } ··· 711 713 /* tipc_nametbl_build_group - build list of communication group members 712 714 */ 713 715 void tipc_nametbl_build_group(struct net *net, struct tipc_group *grp, 714 - u32 type, u32 domain) 716 + u32 type, u32 scope) 715 717 { 716 718 struct sub_seq *sseq, *stop; 717 719 struct name_info *info; ··· 729 731 for (; sseq != stop; sseq++) { 730 732 info = sseq->info; 731 733 list_for_each_entry(p, &info->zone_list, zone_list) { 732 - if (!tipc_in_scope(domain, p->node)) 734 + if (p->scope != scope) 733 735 continue; 734 736 tipc_group_add_member(grp, 
p->node, p->ref, p->lower); 735 737 }
+2 -2
net/tipc/name_table.h
··· 100 100 int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb); 101 101 102 102 u32 tipc_nametbl_translate(struct net *net, u32 type, u32 instance, u32 *node); 103 - int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper, 104 - u32 limit, struct list_head *dports); 103 + int tipc_nametbl_mc_lookup(struct net *net, u32 type, u32 lower, u32 upper, 104 + u32 scope, bool exact, struct list_head *dports); 105 105 void tipc_nametbl_build_group(struct net *net, struct tipc_group *grp, 106 106 u32 type, u32 domain); 107 107 void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower,
+2 -2
net/tipc/server.c
··· 489 489 } 490 490 } 491 491 492 - bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, 493 - u32 filter, u32 lower, u32 upper, int *conid) 492 + bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower, 493 + u32 upper, u32 filter, int *conid) 494 494 { 495 495 struct tipc_subscriber *scbr; 496 496 struct tipc_subscr sub;
+4 -2
net/tipc/server.h
··· 41 41 #include <net/net_namespace.h> 42 42 43 43 #define TIPC_SERVER_NAME_LEN 32 44 + #define TIPC_SUB_CLUSTER_SCOPE 0x20 45 + #define TIPC_SUB_NODE_SCOPE 0x40 44 46 #define TIPC_SUB_NO_STATUS 0x80 45 47 46 48 /** ··· 86 84 int tipc_conn_sendmsg(struct tipc_server *s, int conid, 87 85 struct sockaddr_tipc *addr, void *data, size_t len); 88 86 89 - bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, 90 - u32 filter, u32 lower, u32 upper, int *conid); 87 + bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower, 88 + u32 upper, u32 filter, int *conid); 91 89 void tipc_topsrv_kern_unsubscr(struct net *net, int conid); 92 90 93 91 /**
+52 -34
net/tipc/socket.c
··· 928 928 struct list_head *cong_links = &tsk->cong_links; 929 929 int blks = tsk_blocks(GROUP_H_SIZE + dlen); 930 930 struct tipc_group *grp = tsk->group; 931 + struct tipc_msg *hdr = &tsk->phdr; 931 932 struct tipc_member *first = NULL; 932 933 struct tipc_member *mbr = NULL; 933 934 struct net *net = sock_net(sk); 934 935 u32 node, port, exclude; 935 - u32 type, inst, domain; 936 936 struct list_head dsts; 937 + u32 type, inst, scope; 937 938 int lookups = 0; 938 939 int dstcnt, rc; 939 940 bool cong; 940 941 941 942 INIT_LIST_HEAD(&dsts); 942 943 943 - type = dest->addr.name.name.type; 944 + type = msg_nametype(hdr); 944 945 inst = dest->addr.name.name.instance; 945 - domain = addr_domain(net, dest->scope); 946 + scope = msg_lookup_scope(hdr); 946 947 exclude = tipc_group_exclude(grp); 947 948 948 949 while (++lookups < 4) { ··· 951 950 952 951 /* Look for a non-congested destination member, if any */ 953 952 while (1) { 954 - if (!tipc_nametbl_lookup(net, type, inst, domain, &dsts, 953 + if (!tipc_nametbl_lookup(net, type, inst, scope, &dsts, 955 954 &dstcnt, exclude, false)) 956 955 return -EHOSTUNREACH; 957 956 tipc_dest_pop(&dsts, &node, &port); ··· 1080 1079 { 1081 1080 struct sock *sk = sock->sk; 1082 1081 DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name); 1083 - struct tipc_name_seq *seq = &dest->addr.nameseq; 1084 1082 struct tipc_sock *tsk = tipc_sk(sk); 1085 1083 struct tipc_group *grp = tsk->group; 1084 + struct tipc_msg *hdr = &tsk->phdr; 1086 1085 struct net *net = sock_net(sk); 1087 - u32 domain, exclude, dstcnt; 1086 + u32 type, inst, scope, exclude; 1088 1087 struct list_head dsts; 1088 + u32 dstcnt; 1089 1089 1090 1090 INIT_LIST_HEAD(&dsts); 1091 1091 1092 - if (seq->lower != seq->upper) 1093 - return -ENOTSUPP; 1094 - 1095 - domain = addr_domain(net, dest->scope); 1092 + type = msg_nametype(hdr); 1093 + inst = dest->addr.name.name.instance; 1094 + scope = msg_lookup_scope(hdr); 1096 1095 exclude = tipc_group_exclude(grp); 1097 - if 
(!tipc_nametbl_lookup(net, seq->type, seq->lower, domain, 1098 - &dsts, &dstcnt, exclude, true)) 1096 + 1097 + if (!tipc_nametbl_lookup(net, type, inst, scope, &dsts, 1098 + &dstcnt, exclude, true)) 1099 1099 return -EHOSTUNREACH; 1100 1100 1101 1101 if (dstcnt == 1) { ··· 1118 1116 void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq, 1119 1117 struct sk_buff_head *inputq) 1120 1118 { 1121 - u32 scope = TIPC_CLUSTER_SCOPE; 1122 1119 u32 self = tipc_own_addr(net); 1120 + u32 type, lower, upper, scope; 1123 1121 struct sk_buff *skb, *_skb; 1124 - u32 lower = 0, upper = ~0; 1125 - struct sk_buff_head tmpq; 1126 1122 u32 portid, oport, onode; 1123 + struct sk_buff_head tmpq; 1127 1124 struct list_head dports; 1128 - struct tipc_msg *msg; 1129 - int user, mtyp, hsz; 1125 + struct tipc_msg *hdr; 1126 + int user, mtyp, hlen; 1127 + bool exact; 1130 1128 1131 1129 __skb_queue_head_init(&tmpq); 1132 1130 INIT_LIST_HEAD(&dports); 1133 1131 1134 1132 skb = tipc_skb_peek(arrvq, &inputq->lock); 1135 1133 for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) { 1136 - msg = buf_msg(skb); 1137 - user = msg_user(msg); 1138 - mtyp = msg_type(msg); 1134 + hdr = buf_msg(skb); 1135 + user = msg_user(hdr); 1136 + mtyp = msg_type(hdr); 1137 + hlen = skb_headroom(skb) + msg_hdr_sz(hdr); 1138 + oport = msg_origport(hdr); 1139 + onode = msg_orignode(hdr); 1140 + type = msg_nametype(hdr); 1141 + 1139 1142 if (mtyp == TIPC_GRP_UCAST_MSG || user == GROUP_PROTOCOL) { 1140 1143 spin_lock_bh(&inputq->lock); 1141 1144 if (skb_peek(arrvq) == skb) { ··· 1151 1144 spin_unlock_bh(&inputq->lock); 1152 1145 continue; 1153 1146 } 1154 - hsz = skb_headroom(skb) + msg_hdr_sz(msg); 1155 - oport = msg_origport(msg); 1156 - onode = msg_orignode(msg); 1157 - if (onode == self) 1158 - scope = TIPC_NODE_SCOPE; 1159 1147 1160 - /* Create destination port list and message clones: */ 1161 - if (!msg_in_group(msg)) { 1162 - lower = msg_namelower(msg); 1163 - upper = msg_nameupper(msg); 1148 + /* 
Group messages require exact scope match */ 1149 + if (msg_in_group(hdr)) { 1150 + lower = 0; 1151 + upper = ~0; 1152 + scope = msg_lookup_scope(hdr); 1153 + exact = true; 1154 + } else { 1155 + /* TIPC_NODE_SCOPE means "any scope" in this context */ 1156 + if (onode == self) 1157 + scope = TIPC_NODE_SCOPE; 1158 + else 1159 + scope = TIPC_CLUSTER_SCOPE; 1160 + exact = false; 1161 + lower = msg_namelower(hdr); 1162 + upper = msg_nameupper(hdr); 1164 1163 } 1165 - tipc_nametbl_mc_translate(net, msg_nametype(msg), lower, upper, 1166 - scope, &dports); 1164 + 1165 + /* Create destination port list: */ 1166 + tipc_nametbl_mc_lookup(net, type, lower, upper, 1167 + scope, exact, &dports); 1168 + 1169 + /* Clone message per destination */ 1167 1170 while (tipc_dest_pop(&dports, NULL, &portid)) { 1168 - _skb = __pskb_copy(skb, hsz, GFP_ATOMIC); 1171 + _skb = __pskb_copy(skb, hlen, GFP_ATOMIC); 1169 1172 if (_skb) { 1170 1173 msg_set_destport(buf_msg(_skb), portid); 1171 1174 __skb_queue_tail(&tmpq, _skb); ··· 2748 2731 static int tipc_sk_join(struct tipc_sock *tsk, struct tipc_group_req *mreq) 2749 2732 { 2750 2733 struct net *net = sock_net(&tsk->sk); 2751 - u32 domain = addr_domain(net, mreq->scope); 2752 2734 struct tipc_group *grp = tsk->group; 2753 2735 struct tipc_msg *hdr = &tsk->phdr; 2754 2736 struct tipc_name_seq seq; ··· 2755 2739 2756 2740 if (mreq->type < TIPC_RESERVED_TYPES) 2757 2741 return -EACCES; 2742 + if (mreq->scope > TIPC_NODE_SCOPE) 2743 + return -EINVAL; 2758 2744 if (grp) 2759 2745 return -EACCES; 2760 2746 grp = tipc_group_create(net, tsk->portid, mreq); ··· 2769 2751 seq.type = mreq->type; 2770 2752 seq.lower = mreq->instance; 2771 2753 seq.upper = seq.lower; 2772 - tipc_nametbl_build_group(net, grp, mreq->type, domain); 2754 + tipc_nametbl_build_group(net, grp, mreq->type, mreq->scope); 2773 2755 rc = tipc_sk_publish(tsk, mreq->scope, &seq); 2774 2756 if (rc) { 2775 2757 tipc_group_delete(net, grp);
+7 -3
net/tipc/subscr.c
··· 118 118 119 119 void tipc_subscrp_report_overlap(struct tipc_subscription *sub, u32 found_lower, 120 120 u32 found_upper, u32 event, u32 port_ref, 121 - u32 node, int must) 121 + u32 node, u32 scope, int must) 122 122 { 123 + u32 filter = htohl(sub->evt.s.filter, sub->swap); 123 124 struct tipc_name_seq seq; 124 125 125 126 tipc_subscrp_convert_seq(&sub->evt.s.seq, sub->swap, &seq); 126 127 if (!tipc_subscrp_check_overlap(&seq, found_lower, found_upper)) 127 128 return; 128 - if (!must && 129 - !(htohl(sub->evt.s.filter, sub->swap) & TIPC_SUB_PORTS)) 129 + if (!must && !(filter & TIPC_SUB_PORTS)) 130 + return; 131 + if (filter & TIPC_SUB_CLUSTER_SCOPE && scope == TIPC_NODE_SCOPE) 132 + return; 133 + if (filter & TIPC_SUB_NODE_SCOPE && scope != TIPC_NODE_SCOPE) 130 134 return; 131 135 132 136 tipc_subscrp_send_event(sub, found_lower, found_upper, event, port_ref,
+1 -1
net/tipc/subscr.h
··· 71 71 u32 found_upper); 72 72 void tipc_subscrp_report_overlap(struct tipc_subscription *sub, 73 73 u32 found_lower, u32 found_upper, u32 event, 74 - u32 port_ref, u32 node, int must); 74 + u32 port_ref, u32 node, u32 scope, int must); 75 75 void tipc_subscrp_convert_seq(struct tipc_name_seq *in, int swap, 76 76 struct tipc_name_seq *out); 77 77 u32 tipc_subscrp_convert_seq_type(u32 type, int swap);