Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux

tipc: make subscriber server support net namespace

TIPC establishes one subscriber server which allows users to subscribe
their interesting name service status. After tipc supports namespace,
one dedicated tipc stack instance is created for each namespace, and
each instance can be deemed as one independent TIPC node. As a result,
subscriber server must be built for each namespace.

Signed-off-by: Ying Xue <ying.xue@windriver.com>
Tested-by: Tero Aho <Tero.Aho@coriant.com>
Reviewed-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>

authored by

Ying Xue and committed by
David S. Miller
a62fbcce 34747539

+86 -65
+12 -12
net/tipc/core.c
··· 68 68 err = tipc_nametbl_init(net); 69 69 if (err) 70 70 goto out_nametbl; 71 + 72 + err = tipc_subscr_start(net); 73 + if (err) 74 + goto out_subscr; 71 75 return 0; 72 76 77 + out_subscr: 78 + tipc_nametbl_stop(net); 73 79 out_nametbl: 74 80 tipc_sk_rht_destroy(net); 75 81 out_sk_rht: ··· 84 78 85 79 static void __net_exit tipc_exit_net(struct net *net) 86 80 { 81 + tipc_subscr_stop(net); 87 82 tipc_net_stop(net); 88 83 tipc_nametbl_stop(net); 89 84 tipc_sk_rht_destroy(net); ··· 111 104 112 105 get_random_bytes(&tipc_random, sizeof(tipc_random)); 113 106 114 - err = register_pernet_subsys(&tipc_net_ops); 115 - if (err) 116 - goto out_pernet; 117 - 118 107 err = tipc_netlink_start(); 119 108 if (err) 120 109 goto out_netlink; ··· 123 120 if (err) 124 121 goto out_sysctl; 125 122 126 - err = tipc_subscr_start(); 123 + err = register_pernet_subsys(&tipc_net_ops); 127 124 if (err) 128 - goto out_subscr; 125 + goto out_pernet; 129 126 130 127 err = tipc_bearer_setup(); 131 128 if (err) ··· 134 131 pr_info("Started in single node mode\n"); 135 132 return 0; 136 133 out_bearer: 137 - tipc_subscr_stop(); 138 - out_subscr: 134 + unregister_pernet_subsys(&tipc_net_ops); 135 + out_pernet: 139 136 tipc_unregister_sysctl(); 140 137 out_sysctl: 141 138 tipc_socket_stop(); 142 139 out_socket: 143 140 tipc_netlink_stop(); 144 141 out_netlink: 145 - unregister_pernet_subsys(&tipc_net_ops); 146 - out_pernet: 147 142 pr_err("Unable to start in single node mode\n"); 148 143 return err; 149 144 } 150 145 151 146 static void __exit tipc_exit(void) 152 147 { 153 - unregister_pernet_subsys(&tipc_net_ops); 154 148 tipc_bearer_cleanup(); 155 149 tipc_netlink_stop(); 156 - tipc_subscr_stop(); 157 150 tipc_socket_stop(); 158 151 tipc_unregister_sysctl(); 152 + unregister_pernet_subsys(&tipc_net_ops); 159 153 160 154 pr_info("Deactivated\n"); 161 155 }
+4
net/tipc/core.h
··· 106 106 /* Name table */ 107 107 spinlock_t nametbl_lock; 108 108 struct name_table *nametbl; 109 + 110 + /* Topology subscription server */ 111 + struct tipc_server *topsrv; 112 + atomic_t subscription_count; 109 113 }; 110 114 111 115 #ifdef CONFIG_SYSCTL
+1 -1
net/tipc/server.c
··· 309 309 struct socket *sock = NULL; 310 310 int ret; 311 311 312 - ret = tipc_sock_create_local(s->type, &sock); 312 + ret = tipc_sock_create_local(s->net, s->type, &sock); 313 313 if (ret < 0) 314 314 return NULL; 315 315 ret = kernel_setsockopt(sock, SOL_TIPC, TIPC_IMPORTANCE,
+3 -1
net/tipc/server.h
··· 47 47 * @conn_idr: identifier set of connection 48 48 * @idr_lock: protect the connection identifier set 49 49 * @idr_in_use: amount of allocated identifier entry 50 + * @net: network namspace instance 50 51 * @rcvbuf_cache: memory cache of server receive buffer 51 52 * @rcv_wq: receive workqueue 52 53 * @send_wq: send workqueue ··· 64 63 struct idr conn_idr; 65 64 spinlock_t idr_lock; 66 65 int idr_in_use; 66 + struct net *net; 67 67 struct kmem_cache *rcvbuf_cache; 68 68 struct workqueue_struct *rcv_wq; 69 69 struct workqueue_struct *send_wq; ··· 75 73 struct sockaddr_tipc *addr, void *usr_data, 76 74 void *buf, size_t len); 77 75 struct sockaddr_tipc *saddr; 78 - const char name[TIPC_SERVER_NAME_LEN]; 76 + char name[TIPC_SERVER_NAME_LEN]; 79 77 int imp; 80 78 int type; 81 79 };
+2 -2
net/tipc/socket.c
··· 388 388 * 389 389 * Returns 0 on success, errno otherwise 390 390 */ 391 - int tipc_sock_create_local(int type, struct socket **res) 391 + int tipc_sock_create_local(struct net *net, int type, struct socket **res) 392 392 { 393 393 int rc; 394 394 ··· 397 397 pr_err("Failed to create kernel socket\n"); 398 398 return rc; 399 399 } 400 - tipc_sk_create(&init_net, *res, 0, 1); 400 + tipc_sk_create(net, *res, 0, 1); 401 401 402 402 return 0; 403 403 }
+1 -1
net/tipc/socket.h
··· 45 45 46 46 int tipc_socket_init(void); 47 47 void tipc_socket_stop(void); 48 - int tipc_sock_create_local(int type, struct socket **res); 48 + int tipc_sock_create_local(struct net *net, int type, struct socket **res); 49 49 void tipc_sock_release_local(struct socket *sock); 50 50 int tipc_sock_accept_local(struct socket *sock, struct socket **newsock, 51 51 int flags);
+61 -43
net/tipc/subscr.c
··· 50 50 struct list_head subscription_list; 51 51 }; 52 52 53 - static void subscr_conn_msg_event(struct net *net, int conid, 54 - struct sockaddr_tipc *addr, void *usr_data, 55 - void *buf, size_t len); 56 - static void *subscr_named_msg_event(int conid); 57 - static void subscr_conn_shutdown_event(int conid, void *usr_data); 58 - 59 - static atomic_t subscription_count = ATOMIC_INIT(0); 60 - 61 - static struct sockaddr_tipc topsrv_addr __read_mostly = { 62 - .family = AF_TIPC, 63 - .addrtype = TIPC_ADDR_NAMESEQ, 64 - .addr.nameseq.type = TIPC_TOP_SRV, 65 - .addr.nameseq.lower = TIPC_TOP_SRV, 66 - .addr.nameseq.upper = TIPC_TOP_SRV, 67 - .scope = TIPC_NODE_SCOPE 68 - }; 69 - 70 - static struct tipc_server topsrv __read_mostly = { 71 - .saddr = &topsrv_addr, 72 - .imp = TIPC_CRITICAL_IMPORTANCE, 73 - .type = SOCK_SEQPACKET, 74 - .max_rcvbuf_size = sizeof(struct tipc_subscr), 75 - .name = "topology_server", 76 - .tipc_conn_recvmsg = subscr_conn_msg_event, 77 - .tipc_conn_new = subscr_named_msg_event, 78 - .tipc_conn_shutdown = subscr_conn_shutdown_event, 79 - }; 80 - 81 53 /** 82 54 * htohl - convert value to endianness used by destination 83 55 * @in: value to convert ··· 66 94 u32 found_upper, u32 event, u32 port_ref, 67 95 u32 node) 68 96 { 97 + struct tipc_net *tn = net_generic(sub->net, tipc_net_id); 69 98 struct tipc_subscriber *subscriber = sub->subscriber; 70 99 struct kvec msg_sect; 71 100 ··· 77 104 sub->evt.found_upper = htohl(found_upper, sub->swap); 78 105 sub->evt.port.ref = htohl(port_ref, sub->swap); 79 106 sub->evt.port.node = htohl(node, sub->swap); 80 - tipc_conn_sendmsg(&topsrv, subscriber->conid, NULL, msg_sect.iov_base, 81 - msg_sect.iov_len); 107 + tipc_conn_sendmsg(tn->topsrv, subscriber->conid, NULL, 108 + msg_sect.iov_base, msg_sect.iov_len); 82 109 } 83 110 84 111 /** ··· 119 146 { 120 147 struct tipc_subscription *sub = (struct tipc_subscription *)data; 121 148 struct tipc_subscriber *subscriber = sub->subscriber; 149 + struct tipc_net *tn = net_generic(sub->net, tipc_net_id); 122 150 123 151 /* The spin lock per subscriber is used to protect its members */ 124 152 spin_lock_bh(&subscriber->lock); ··· 144 170 145 171 /* Now destroy subscription */ 146 172 kfree(sub); 147 - atomic_dec(&subscription_count); 173 + atomic_dec(&tn->subscription_count); 148 174 } 149 175 150 176 /** ··· 154 180 */ 155 181 static void subscr_del(struct tipc_subscription *sub) 156 182 { 183 + struct tipc_net *tn = net_generic(sub->net, tipc_net_id); 184 + 157 185 tipc_nametbl_unsubscribe(sub); 158 186 list_del(&sub->subscription_list); 159 187 kfree(sub); 160 - atomic_dec(&subscription_count); 188 + atomic_dec(&tn->subscription_count); 161 189 } 162 190 163 191 /** ··· 167 191 * 168 192 * Note: Must call it in process context since it might sleep. 169 193 */ 170 - static void subscr_terminate(struct tipc_subscriber *subscriber) 194 + static void subscr_terminate(struct tipc_subscription *sub) 171 195 { 172 - tipc_conn_terminate(&topsrv, subscriber->conid); 196 + struct tipc_subscriber *subscriber = sub->subscriber; 197 + struct tipc_net *tn = net_generic(sub->net, tipc_net_id); 198 + 199 + tipc_conn_terminate(tn->topsrv, subscriber->conid); 173 200 } 174 201 175 202 static void subscr_release(struct tipc_subscriber *subscriber) ··· 242 263 */ 243 264 static int subscr_subscribe(struct net *net, struct tipc_subscr *s, 244 265 struct tipc_subscriber *subscriber, 245 - struct tipc_subscription **sub_p) { 266 + struct tipc_subscription **sub_p) 267 + { 268 + struct tipc_net *tn = net_generic(net, tipc_net_id); 246 269 struct tipc_subscription *sub; 247 270 int swap; 248 271 ··· 259 278 } 260 279 261 280 /* Refuse subscription if global limit exceeded */ 262 - if (atomic_read(&subscription_count) >= TIPC_MAX_SUBSCRIPTIONS) { 281 + if (atomic_read(&tn->subscription_count) >= TIPC_MAX_SUBSCRIPTIONS) { 263 282 pr_warn("Subscription rejected, limit reached (%u)\n", 264 283 TIPC_MAX_SUBSCRIPTIONS); 265 284 return -EINVAL; ··· 290 309 sub->subscriber = subscriber; 291 310 sub->swap = swap; 292 311 memcpy(&sub->evt.s, s, sizeof(struct tipc_subscr)); 293 - atomic_inc(&subscription_count); 312 + atomic_inc(&tn->subscription_count); 294 313 if (sub->timeout != TIPC_WAIT_FOREVER) { 295 314 setup_timer(&sub->timer, subscr_timeout, (unsigned long)sub); 296 315 mod_timer(&sub->timer, jiffies + sub->timeout); ··· 317 336 if (subscr_subscribe(net, (struct tipc_subscr *)buf, subscriber, 318 337 &sub) < 0) { 319 338 spin_unlock_bh(&subscriber->lock); 320 - subscr_terminate(subscriber); 339 + subscr_terminate(sub); 321 340 return; 322 341 } 323 342 if (sub) 324 343 tipc_nametbl_subscribe(sub); 325 344 spin_unlock_bh(&subscriber->lock); 326 345 } 327 - 328 346 329 347 /* Handle one request to establish a new subscriber */ 330 348 static void *subscr_named_msg_event(int conid) ··· 343 363 return (void *)subscriber; 344 364 } 345 365 346 - int tipc_subscr_start(void) 366 + int tipc_subscr_start(struct net *net) 347 367 { 348 - return tipc_server_start(&topsrv); 368 + struct tipc_net *tn = net_generic(net, tipc_net_id); 369 + const char name[] = "topology_server"; 370 + struct tipc_server *topsrv; 371 + struct sockaddr_tipc *saddr; 372 + 373 + saddr = kzalloc(sizeof(*saddr), GFP_ATOMIC); 374 + if (!saddr) 375 + return -ENOMEM; 376 + saddr->family = AF_TIPC; 377 + saddr->addrtype = TIPC_ADDR_NAMESEQ; 378 + saddr->addr.nameseq.type = TIPC_TOP_SRV; 379 + saddr->addr.nameseq.lower = TIPC_TOP_SRV; 380 + saddr->addr.nameseq.upper = TIPC_TOP_SRV; 381 + saddr->scope = TIPC_NODE_SCOPE; 382 + 383 + topsrv = kzalloc(sizeof(*topsrv), GFP_ATOMIC); 384 + if (!topsrv) { 385 + kfree(saddr); 386 + return -ENOMEM; 387 + } 388 + topsrv->net = net; 389 + topsrv->saddr = saddr; 390 + topsrv->imp = TIPC_CRITICAL_IMPORTANCE; 391 + topsrv->type = SOCK_SEQPACKET; 392 + topsrv->max_rcvbuf_size = sizeof(struct tipc_subscr); 393 + topsrv->tipc_conn_recvmsg = subscr_conn_msg_event; 394 + topsrv->tipc_conn_new = subscr_named_msg_event; 395 + topsrv->tipc_conn_shutdown = subscr_conn_shutdown_event; 396 + 397 + strncpy(topsrv->name, name, strlen(name) + 1); 398 + tn->topsrv = topsrv; 399 + atomic_set(&tn->subscription_count, 0); 400 + 401 + return tipc_server_start(topsrv); 349 402 } 350 403 351 - void tipc_subscr_stop(void) 404 + void tipc_subscr_stop(struct net *net) 352 405 { 353 - tipc_server_stop(&topsrv); 406 + struct tipc_net *tn = net_generic(net, tipc_net_id); 407 + struct tipc_server *topsrv = tn->topsrv; 408 + 409 + tipc_server_stop(topsrv); 410 + kfree(topsrv->saddr); 411 + kfree(topsrv); 354 412 }
+2 -5
net/tipc/subscr.h
··· 74 74 75 75 int tipc_subscr_overlap(struct tipc_subscription *sub, u32 found_lower, 76 76 u32 found_upper); 77 - 78 77 void tipc_subscr_report_overlap(struct tipc_subscription *sub, u32 found_lower, 79 78 u32 found_upper, u32 event, u32 port_ref, 80 79 u32 node, int must); 81 - 82 - int tipc_subscr_start(void); 83 - 84 - void tipc_subscr_stop(void); 80 + int tipc_subscr_start(struct net *net); 81 + void tipc_subscr_stop(struct net *net); 85 82 86 83 #endif