at v4.15-rc2 705 lines 17 kB view raw
1/* 2 * net/tipc/server.c: TIPC server infrastructure 3 * 4 * Copyright (c) 2012-2013, Wind River Systems 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions are met: 9 * 10 * 1. Redistributions of source code must retain the above copyright 11 * notice, this list of conditions and the following disclaimer. 12 * 2. Redistributions in binary form must reproduce the above copyright 13 * notice, this list of conditions and the following disclaimer in the 14 * documentation and/or other materials provided with the distribution. 15 * 3. Neither the names of the copyright holders nor the names of its 16 * contributors may be used to endorse or promote products derived from 17 * this software without specific prior written permission. 18 * 19 * Alternatively, this software may be distributed under the terms of the 20 * GNU General Public License ("GPL") version 2 as published by the Free 21 * Software Foundation. 22 * 23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 24 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 25 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 26 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 27 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 28 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 29 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 30 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 31 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 32 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 33 * POSSIBILITY OF SUCH DAMAGE. 34 */ 35 36#include "server.h" 37#include "core.h" 38#include "socket.h" 39#include "addr.h" 40#include "msg.h" 41#include <net/sock.h> 42#include <linux/module.h> 43 44/* Number of messages to send before rescheduling */ 45#define MAX_SEND_MSG_COUNT 25 46#define MAX_RECV_MSG_COUNT 25 47#define CF_CONNECTED 1 48#define CF_SERVER 2 49 50#define sock2con(x) ((struct tipc_conn *)(x)->sk_user_data) 51 52/** 53 * struct tipc_conn - TIPC connection structure 54 * @kref: reference counter to connection object 55 * @conid: connection identifier 56 * @sock: socket handler associated with connection 57 * @flags: indicates connection state 58 * @server: pointer to connected server 59 * @rwork: receive work item 60 * @usr_data: user-specified field 61 * @rx_action: what to do when connection socket is active 62 * @outqueue: pointer to first outbound message in queue 63 * @outqueue_lock: control access to the outqueue 64 * @outqueue: list of connection objects for its server 65 * @swork: send work item 66 */ 67struct tipc_conn { 68 struct kref kref; 69 int conid; 70 struct socket *sock; 71 unsigned long flags; 72 struct tipc_server *server; 73 struct work_struct rwork; 74 int (*rx_action) (struct tipc_conn *con); 75 void *usr_data; 76 struct list_head outqueue; 77 spinlock_t outqueue_lock; 78 struct work_struct swork; 79}; 80 81/* An entry waiting to be sent */ 82struct outqueue_entry { 83 struct list_head list; 84 struct kvec iov; 85 struct sockaddr_tipc dest; 86}; 87 88static void tipc_recv_work(struct work_struct *work); 89static void tipc_send_work(struct work_struct *work); 90static void tipc_clean_outqueues(struct tipc_conn *con); 91 92static void tipc_conn_kref_release(struct kref *kref) 93{ 94 struct tipc_conn *con = container_of(kref, struct tipc_conn, kref); 95 struct tipc_server *s = con->server; 96 struct sockaddr_tipc *saddr = s->saddr; 97 struct socket *sock = con->sock; 98 struct sock *sk; 99 100 if (sock) { 101 sk = sock->sk; 102 if (test_bit(CF_SERVER, &con->flags)) { 103 __module_get(sock->ops->owner); 104 __module_get(sk->sk_prot_creator->owner); 105 } 106 saddr->scope = -TIPC_NODE_SCOPE; 107 kernel_bind(sock, (struct sockaddr *)saddr, sizeof(*saddr)); 108 sock_release(sock); 109 con->sock = NULL; 110 } 111 spin_lock_bh(&s->idr_lock); 112 idr_remove(&s->conn_idr, con->conid); 113 s->idr_in_use--; 114 spin_unlock_bh(&s->idr_lock); 115 tipc_clean_outqueues(con); 116 kfree(con); 117} 118 119static void conn_put(struct tipc_conn *con) 120{ 121 kref_put(&con->kref, tipc_conn_kref_release); 122} 123 124static void conn_get(struct tipc_conn *con) 125{ 126 kref_get(&con->kref); 127} 128 129static struct tipc_conn *tipc_conn_lookup(struct tipc_server *s, int conid) 130{ 131 struct tipc_conn *con; 132 133 spin_lock_bh(&s->idr_lock); 134 con = idr_find(&s->conn_idr, conid); 135 if (con && test_bit(CF_CONNECTED, &con->flags)) 136 conn_get(con); 137 else 138 con = NULL; 139 spin_unlock_bh(&s->idr_lock); 140 return con; 141} 142 143static void sock_data_ready(struct sock *sk) 144{ 145 struct tipc_conn *con; 146 147 read_lock_bh(&sk->sk_callback_lock); 148 con = sock2con(sk); 149 if (con && test_bit(CF_CONNECTED, &con->flags)) { 150 conn_get(con); 151 if (!queue_work(con->server->rcv_wq, &con->rwork)) 152 conn_put(con); 153 } 154 read_unlock_bh(&sk->sk_callback_lock); 155} 156 157static void sock_write_space(struct sock *sk) 158{ 159 struct tipc_conn *con; 160 161 read_lock_bh(&sk->sk_callback_lock); 162 con = sock2con(sk); 163 if (con && test_bit(CF_CONNECTED, &con->flags)) { 164 conn_get(con); 165 if (!queue_work(con->server->send_wq, &con->swork)) 166 conn_put(con); 167 } 168 read_unlock_bh(&sk->sk_callback_lock); 169} 170 171static void tipc_register_callbacks(struct socket *sock, struct tipc_conn *con) 172{ 173 struct sock *sk = sock->sk; 174 175 write_lock_bh(&sk->sk_callback_lock); 176 177 sk->sk_data_ready = sock_data_ready; 178 sk->sk_write_space = sock_write_space; 179 sk->sk_user_data = con; 180 181 con->sock = sock; 182 183 write_unlock_bh(&sk->sk_callback_lock); 184} 185 186static void tipc_unregister_callbacks(struct tipc_conn *con) 187{ 188 struct sock *sk = con->sock->sk; 189 190 write_lock_bh(&sk->sk_callback_lock); 191 sk->sk_user_data = NULL; 192 write_unlock_bh(&sk->sk_callback_lock); 193} 194 195static void tipc_close_conn(struct tipc_conn *con) 196{ 197 struct tipc_server *s = con->server; 198 199 if (test_and_clear_bit(CF_CONNECTED, &con->flags)) { 200 if (con->sock) 201 tipc_unregister_callbacks(con); 202 203 if (con->conid) 204 s->tipc_conn_release(con->conid, con->usr_data); 205 206 /* We shouldn't flush pending works as we may be in the 207 * thread. In fact the races with pending rx/tx work structs 208 * are harmless for us here as we have already deleted this 209 * connection from server connection list. 210 */ 211 if (con->sock) 212 kernel_sock_shutdown(con->sock, SHUT_RDWR); 213 conn_put(con); 214 } 215} 216 217static struct tipc_conn *tipc_alloc_conn(struct tipc_server *s) 218{ 219 struct tipc_conn *con; 220 int ret; 221 222 con = kzalloc(sizeof(struct tipc_conn), GFP_ATOMIC); 223 if (!con) 224 return ERR_PTR(-ENOMEM); 225 226 kref_init(&con->kref); 227 INIT_LIST_HEAD(&con->outqueue); 228 spin_lock_init(&con->outqueue_lock); 229 INIT_WORK(&con->swork, tipc_send_work); 230 INIT_WORK(&con->rwork, tipc_recv_work); 231 232 spin_lock_bh(&s->idr_lock); 233 ret = idr_alloc(&s->conn_idr, con, 0, 0, GFP_ATOMIC); 234 if (ret < 0) { 235 kfree(con); 236 spin_unlock_bh(&s->idr_lock); 237 return ERR_PTR(-ENOMEM); 238 } 239 con->conid = ret; 240 s->idr_in_use++; 241 spin_unlock_bh(&s->idr_lock); 242 243 set_bit(CF_CONNECTED, &con->flags); 244 con->server = s; 245 246 return con; 247} 248 249static int tipc_receive_from_sock(struct tipc_conn *con) 250{ 251 struct msghdr msg = {}; 252 struct tipc_server *s = con->server; 253 struct sockaddr_tipc addr; 254 struct kvec iov; 255 void *buf; 256 int ret; 257 258 buf = kmem_cache_alloc(s->rcvbuf_cache, GFP_ATOMIC); 259 if (!buf) { 260 ret = -ENOMEM; 261 goto out_close; 262 } 263 264 iov.iov_base = buf; 265 iov.iov_len = s->max_rcvbuf_size; 266 msg.msg_name = &addr; 267 ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len, 268 MSG_DONTWAIT); 269 if (ret <= 0) { 270 kmem_cache_free(s->rcvbuf_cache, buf); 271 goto out_close; 272 } 273 274 s->tipc_conn_recvmsg(sock_net(con->sock->sk), con->conid, &addr, 275 con->usr_data, buf, ret); 276 277 kmem_cache_free(s->rcvbuf_cache, buf); 278 279 return 0; 280 281out_close: 282 if (ret != -EWOULDBLOCK) 283 tipc_close_conn(con); 284 else if (ret == 0) 285 /* Don't return success if we really got EOF */ 286 ret = -EAGAIN; 287 288 return ret; 289} 290 291static int tipc_accept_from_sock(struct tipc_conn *con) 292{ 293 struct tipc_server *s = con->server; 294 struct socket *sock = con->sock; 295 struct socket *newsock; 296 struct tipc_conn *newcon; 297 int ret; 298 299 ret = kernel_accept(sock, &newsock, O_NONBLOCK); 300 if (ret < 0) 301 return ret; 302 303 newcon = tipc_alloc_conn(con->server); 304 if (IS_ERR(newcon)) { 305 ret = PTR_ERR(newcon); 306 sock_release(newsock); 307 return ret; 308 } 309 310 newcon->rx_action = tipc_receive_from_sock; 311 tipc_register_callbacks(newsock, newcon); 312 313 /* Notify that new connection is incoming */ 314 newcon->usr_data = s->tipc_conn_new(newcon->conid); 315 if (!newcon->usr_data) { 316 sock_release(newsock); 317 return -ENOMEM; 318 } 319 320 /* Wake up receive process in case of 'SYN+' message */ 321 newsock->sk->sk_data_ready(newsock->sk); 322 return ret; 323} 324 325static struct socket *tipc_create_listen_sock(struct tipc_conn *con) 326{ 327 struct tipc_server *s = con->server; 328 struct socket *sock = NULL; 329 int ret; 330 331 ret = sock_create_kern(s->net, AF_TIPC, SOCK_SEQPACKET, 0, &sock); 332 if (ret < 0) 333 return NULL; 334 ret = kernel_setsockopt(sock, SOL_TIPC, TIPC_IMPORTANCE, 335 (char *)&s->imp, sizeof(s->imp)); 336 if (ret < 0) 337 goto create_err; 338 ret = kernel_bind(sock, (struct sockaddr *)s->saddr, sizeof(*s->saddr)); 339 if (ret < 0) 340 goto create_err; 341 342 switch (s->type) { 343 case SOCK_STREAM: 344 case SOCK_SEQPACKET: 345 con->rx_action = tipc_accept_from_sock; 346 347 ret = kernel_listen(sock, 0); 348 if (ret < 0) 349 goto create_err; 350 break; 351 case SOCK_DGRAM: 352 case SOCK_RDM: 353 con->rx_action = tipc_receive_from_sock; 354 break; 355 default: 356 pr_err("Unknown socket type %d\n", s->type); 357 goto create_err; 358 } 359 360 /* As server's listening socket owner and creator is the same module, 361 * we have to decrease TIPC module reference count to guarantee that 362 * it remains zero after the server socket is created, otherwise, 363 * executing "rmmod" command is unable to make TIPC module deleted 364 * after TIPC module is inserted successfully. 365 * 366 * However, the reference count is ever increased twice in 367 * sock_create_kern(): one is to increase the reference count of owner 368 * of TIPC socket's proto_ops struct; another is to increment the 369 * reference count of owner of TIPC proto struct. Therefore, we must 370 * decrement the module reference count twice to ensure that it keeps 371 * zero after server's listening socket is created. Of course, we 372 * must bump the module reference count twice as well before the socket 373 * is closed. 374 */ 375 module_put(sock->ops->owner); 376 module_put(sock->sk->sk_prot_creator->owner); 377 set_bit(CF_SERVER, &con->flags); 378 379 return sock; 380 381create_err: 382 kernel_sock_shutdown(sock, SHUT_RDWR); 383 sock_release(sock); 384 return NULL; 385} 386 387static int tipc_open_listening_sock(struct tipc_server *s) 388{ 389 struct socket *sock; 390 struct tipc_conn *con; 391 392 con = tipc_alloc_conn(s); 393 if (IS_ERR(con)) 394 return PTR_ERR(con); 395 396 sock = tipc_create_listen_sock(con); 397 if (!sock) { 398 idr_remove(&s->conn_idr, con->conid); 399 s->idr_in_use--; 400 kfree(con); 401 return -EINVAL; 402 } 403 404 tipc_register_callbacks(sock, con); 405 return 0; 406} 407 408static struct outqueue_entry *tipc_alloc_entry(void *data, int len) 409{ 410 struct outqueue_entry *entry; 411 void *buf; 412 413 entry = kmalloc(sizeof(struct outqueue_entry), GFP_ATOMIC); 414 if (!entry) 415 return NULL; 416 417 buf = kmemdup(data, len, GFP_ATOMIC); 418 if (!buf) { 419 kfree(entry); 420 return NULL; 421 } 422 423 entry->iov.iov_base = buf; 424 entry->iov.iov_len = len; 425 426 return entry; 427} 428 429static void tipc_free_entry(struct outqueue_entry *e) 430{ 431 kfree(e->iov.iov_base); 432 kfree(e); 433} 434 435static void tipc_clean_outqueues(struct tipc_conn *con) 436{ 437 struct outqueue_entry *e, *safe; 438 439 spin_lock_bh(&con->outqueue_lock); 440 list_for_each_entry_safe(e, safe, &con->outqueue, list) { 441 list_del(&e->list); 442 tipc_free_entry(e); 443 } 444 spin_unlock_bh(&con->outqueue_lock); 445} 446 447int tipc_conn_sendmsg(struct tipc_server *s, int conid, 448 struct sockaddr_tipc *addr, void *data, size_t len) 449{ 450 struct outqueue_entry *e; 451 struct tipc_conn *con; 452 453 con = tipc_conn_lookup(s, conid); 454 if (!con) 455 return -EINVAL; 456 457 if (!test_bit(CF_CONNECTED, &con->flags)) { 458 conn_put(con); 459 return 0; 460 } 461 462 e = tipc_alloc_entry(data, len); 463 if (!e) { 464 conn_put(con); 465 return -ENOMEM; 466 } 467 468 if (addr) 469 memcpy(&e->dest, addr, sizeof(struct sockaddr_tipc)); 470 471 spin_lock_bh(&con->outqueue_lock); 472 list_add_tail(&e->list, &con->outqueue); 473 spin_unlock_bh(&con->outqueue_lock); 474 475 if (!queue_work(s->send_wq, &con->swork)) 476 conn_put(con); 477 return 0; 478} 479 480void tipc_conn_terminate(struct tipc_server *s, int conid) 481{ 482 struct tipc_conn *con; 483 484 con = tipc_conn_lookup(s, conid); 485 if (con) { 486 tipc_close_conn(con); 487 conn_put(con); 488 } 489} 490 491bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, 492 u32 lower, u32 upper, int *conid) 493{ 494 struct tipc_subscriber *scbr; 495 struct tipc_subscr sub; 496 struct tipc_server *s; 497 struct tipc_conn *con; 498 499 sub.seq.type = type; 500 sub.seq.lower = lower; 501 sub.seq.upper = upper; 502 sub.timeout = TIPC_WAIT_FOREVER; 503 sub.filter = TIPC_SUB_PORTS; 504 *(u32 *)&sub.usr_handle = port; 505 506 con = tipc_alloc_conn(tipc_topsrv(net)); 507 if (IS_ERR(con)) 508 return false; 509 510 *conid = con->conid; 511 s = con->server; 512 scbr = s->tipc_conn_new(*conid); 513 if (!scbr) { 514 tipc_close_conn(con); 515 return false; 516 } 517 518 con->usr_data = scbr; 519 con->sock = NULL; 520 s->tipc_conn_recvmsg(net, *conid, NULL, scbr, &sub, sizeof(sub)); 521 return true; 522} 523 524void tipc_topsrv_kern_unsubscr(struct net *net, int conid) 525{ 526 struct tipc_conn *con; 527 528 con = tipc_conn_lookup(tipc_topsrv(net), conid); 529 if (!con) 530 return; 531 tipc_close_conn(con); 532 conn_put(con); 533} 534 535static void tipc_send_kern_top_evt(struct net *net, struct tipc_event *evt) 536{ 537 u32 port = *(u32 *)&evt->s.usr_handle; 538 u32 self = tipc_own_addr(net); 539 struct sk_buff_head evtq; 540 struct sk_buff *skb; 541 542 skb = tipc_msg_create(TOP_SRV, 0, INT_H_SIZE, sizeof(*evt), 543 self, self, port, port, 0); 544 if (!skb) 545 return; 546 msg_set_dest_droppable(buf_msg(skb), true); 547 memcpy(msg_data(buf_msg(skb)), evt, sizeof(*evt)); 548 skb_queue_head_init(&evtq); 549 __skb_queue_tail(&evtq, skb); 550 tipc_sk_rcv(net, &evtq); 551} 552 553static void tipc_send_to_sock(struct tipc_conn *con) 554{ 555 struct tipc_server *s = con->server; 556 struct outqueue_entry *e; 557 struct tipc_event *evt; 558 struct msghdr msg; 559 int count = 0; 560 int ret; 561 562 spin_lock_bh(&con->outqueue_lock); 563 while (test_bit(CF_CONNECTED, &con->flags)) { 564 e = list_entry(con->outqueue.next, struct outqueue_entry, list); 565 if ((struct list_head *) e == &con->outqueue) 566 break; 567 568 spin_unlock_bh(&con->outqueue_lock); 569 570 if (con->sock) { 571 memset(&msg, 0, sizeof(msg)); 572 msg.msg_flags = MSG_DONTWAIT; 573 if (s->type == SOCK_DGRAM || s->type == SOCK_RDM) { 574 msg.msg_name = &e->dest; 575 msg.msg_namelen = sizeof(struct sockaddr_tipc); 576 } 577 ret = kernel_sendmsg(con->sock, &msg, &e->iov, 1, 578 e->iov.iov_len); 579 if (ret == -EWOULDBLOCK || ret == 0) { 580 cond_resched(); 581 goto out; 582 } else if (ret < 0) { 583 goto send_err; 584 } 585 } else { 586 evt = e->iov.iov_base; 587 tipc_send_kern_top_evt(s->net, evt); 588 } 589 /* Don't starve users filling buffers */ 590 if (++count >= MAX_SEND_MSG_COUNT) { 591 cond_resched(); 592 count = 0; 593 } 594 595 spin_lock_bh(&con->outqueue_lock); 596 list_del(&e->list); 597 tipc_free_entry(e); 598 } 599 spin_unlock_bh(&con->outqueue_lock); 600out: 601 return; 602 603send_err: 604 tipc_close_conn(con); 605} 606 607static void tipc_recv_work(struct work_struct *work) 608{ 609 struct tipc_conn *con = container_of(work, struct tipc_conn, rwork); 610 int count = 0; 611 612 while (test_bit(CF_CONNECTED, &con->flags)) { 613 if (con->rx_action(con)) 614 break; 615 616 /* Don't flood Rx machine */ 617 if (++count >= MAX_RECV_MSG_COUNT) { 618 cond_resched(); 619 count = 0; 620 } 621 } 622 conn_put(con); 623} 624 625static void tipc_send_work(struct work_struct *work) 626{ 627 struct tipc_conn *con = container_of(work, struct tipc_conn, swork); 628 629 if (test_bit(CF_CONNECTED, &con->flags)) 630 tipc_send_to_sock(con); 631 632 conn_put(con); 633} 634 635static void tipc_work_stop(struct tipc_server *s) 636{ 637 destroy_workqueue(s->rcv_wq); 638 destroy_workqueue(s->send_wq); 639} 640 641static int tipc_work_start(struct tipc_server *s) 642{ 643 s->rcv_wq = alloc_ordered_workqueue("tipc_rcv", 0); 644 if (!s->rcv_wq) { 645 pr_err("can't start tipc receive workqueue\n"); 646 return -ENOMEM; 647 } 648 649 s->send_wq = alloc_ordered_workqueue("tipc_send", 0); 650 if (!s->send_wq) { 651 pr_err("can't start tipc send workqueue\n"); 652 destroy_workqueue(s->rcv_wq); 653 return -ENOMEM; 654 } 655 656 return 0; 657} 658 659int tipc_server_start(struct tipc_server *s) 660{ 661 int ret; 662 663 spin_lock_init(&s->idr_lock); 664 idr_init(&s->conn_idr); 665 s->idr_in_use = 0; 666 667 s->rcvbuf_cache = kmem_cache_create(s->name, s->max_rcvbuf_size, 668 0, SLAB_HWCACHE_ALIGN, NULL); 669 if (!s->rcvbuf_cache) 670 return -ENOMEM; 671 672 ret = tipc_work_start(s); 673 if (ret < 0) { 674 kmem_cache_destroy(s->rcvbuf_cache); 675 return ret; 676 } 677 ret = tipc_open_listening_sock(s); 678 if (ret < 0) { 679 tipc_work_stop(s); 680 kmem_cache_destroy(s->rcvbuf_cache); 681 return ret; 682 } 683 return ret; 684} 685 686void tipc_server_stop(struct tipc_server *s) 687{ 688 struct tipc_conn *con; 689 int id; 690 691 spin_lock_bh(&s->idr_lock); 692 for (id = 0; s->idr_in_use; id++) { 693 con = idr_find(&s->conn_idr, id); 694 if (con) { 695 spin_unlock_bh(&s->idr_lock); 696 tipc_close_conn(con); 697 spin_lock_bh(&s->idr_lock); 698 } 699 } 700 spin_unlock_bh(&s->idr_lock); 701 702 tipc_work_stop(s); 703 kmem_cache_destroy(s->rcvbuf_cache); 704 idr_destroy(&s->conn_idr); 705}