Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

at v3.5-rc1 1345 lines 33 kB view raw
1/* 2 * net/tipc/port.c: TIPC port code 3 * 4 * Copyright (c) 1992-2007, Ericsson AB 5 * Copyright (c) 2004-2008, 2010-2011, Wind River Systems 6 * All rights reserved. 7 * 8 * Redistribution and use in source and binary forms, with or without 9 * modification, are permitted provided that the following conditions are met: 10 * 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in the 15 * documentation and/or other materials provided with the distribution. 16 * 3. Neither the names of the copyright holders nor the names of its 17 * contributors may be used to endorse or promote products derived from 18 * this software without specific prior written permission. 19 * 20 * Alternatively, this software may be distributed under the terms of the 21 * GNU General Public License ("GPL") version 2 as published by the Free 22 * Software Foundation. 23 * 24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 25 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 26 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 27 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 28 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 29 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 30 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 33 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 34 * POSSIBILITY OF SUCH DAMAGE. 35 */ 36 37#include "core.h" 38#include "config.h" 39#include "port.h" 40#include "name_table.h" 41 42/* Connection management: */ 43#define PROBING_INTERVAL 3600000 /* [ms] => 1 h */ 44#define CONFIRMED 0 45#define PROBING 1 46 47#define MAX_REJECT_SIZE 1024 48 49static struct sk_buff *msg_queue_head; 50static struct sk_buff *msg_queue_tail; 51 52DEFINE_SPINLOCK(tipc_port_list_lock); 53static DEFINE_SPINLOCK(queue_lock); 54 55static LIST_HEAD(ports); 56static void port_handle_node_down(unsigned long ref); 57static struct sk_buff *port_build_self_abort_msg(struct tipc_port *, u32 err); 58static struct sk_buff *port_build_peer_abort_msg(struct tipc_port *, u32 err); 59static void port_timeout(unsigned long ref); 60 61 62static u32 port_peernode(struct tipc_port *p_ptr) 63{ 64 return msg_destnode(&p_ptr->phdr); 65} 66 67static u32 port_peerport(struct tipc_port *p_ptr) 68{ 69 return msg_destport(&p_ptr->phdr); 70} 71 72/* 73 * tipc_port_peer_msg - verify message was sent by connected port's peer 74 * 75 * Handles cases where the node's network address has changed from 76 * the default of <0.0.0> to its configured setting. 77 */ 78int tipc_port_peer_msg(struct tipc_port *p_ptr, struct tipc_msg *msg) 79{ 80 u32 peernode; 81 u32 orignode; 82 83 if (msg_origport(msg) != port_peerport(p_ptr)) 84 return 0; 85 86 orignode = msg_orignode(msg); 87 peernode = port_peernode(p_ptr); 88 return (orignode == peernode) || 89 (!orignode && (peernode == tipc_own_addr)) || 90 (!peernode && (orignode == tipc_own_addr)); 91} 92 93/** 94 * tipc_multicast - send a multicast message to local and remote destinations 95 */ 96int tipc_multicast(u32 ref, struct tipc_name_seq const *seq, 97 u32 num_sect, struct iovec const *msg_sect, 98 unsigned int total_len) 99{ 100 struct tipc_msg *hdr; 101 struct sk_buff *buf; 102 struct sk_buff *ibuf = NULL; 103 struct tipc_port_list dports = {0, NULL, }; 104 struct tipc_port *oport = tipc_port_deref(ref); 105 int ext_targets; 106 int res; 107 108 if (unlikely(!oport)) 109 return -EINVAL; 110 111 /* Create multicast message */ 112 hdr = &oport->phdr; 113 msg_set_type(hdr, TIPC_MCAST_MSG); 114 msg_set_lookup_scope(hdr, TIPC_CLUSTER_SCOPE); 115 msg_set_destport(hdr, 0); 116 msg_set_destnode(hdr, 0); 117 msg_set_nametype(hdr, seq->type); 118 msg_set_namelower(hdr, seq->lower); 119 msg_set_nameupper(hdr, seq->upper); 120 msg_set_hdr_sz(hdr, MCAST_H_SIZE); 121 res = tipc_msg_build(hdr, msg_sect, num_sect, total_len, MAX_MSG_SIZE, 122 !oport->user_port, &buf); 123 if (unlikely(!buf)) 124 return res; 125 126 /* Figure out where to send multicast message */ 127 ext_targets = tipc_nametbl_mc_translate(seq->type, seq->lower, seq->upper, 128 TIPC_NODE_SCOPE, &dports); 129 130 /* Send message to destinations (duplicate it only if necessary) */ 131 if (ext_targets) { 132 if (dports.count != 0) { 133 ibuf = skb_copy(buf, GFP_ATOMIC); 134 if (ibuf == NULL) { 135 tipc_port_list_free(&dports); 136 kfree_skb(buf); 137 return -ENOMEM; 138 } 139 } 140 res = tipc_bclink_send_msg(buf); 141 if ((res < 0) && (dports.count != 0)) 142 kfree_skb(ibuf); 143 } else { 144 ibuf = buf; 145 } 146 147 if (res >= 0) { 148 if (ibuf) 149 tipc_port_recv_mcast(ibuf, &dports); 150 } else { 151 tipc_port_list_free(&dports); 152 } 153 return res; 154} 155 156/** 157 * tipc_port_recv_mcast - deliver multicast message to all destination ports 158 * 159 * If there is no port list, perform a lookup to create one 160 */ 161void tipc_port_recv_mcast(struct sk_buff *buf, struct tipc_port_list *dp) 162{ 163 struct tipc_msg *msg; 164 struct tipc_port_list dports = {0, NULL, }; 165 struct tipc_port_list *item = dp; 166 int cnt = 0; 167 168 msg = buf_msg(buf); 169 170 /* Create destination port list, if one wasn't supplied */ 171 if (dp == NULL) { 172 tipc_nametbl_mc_translate(msg_nametype(msg), 173 msg_namelower(msg), 174 msg_nameupper(msg), 175 TIPC_CLUSTER_SCOPE, 176 &dports); 177 item = dp = &dports; 178 } 179 180 /* Deliver a copy of message to each destination port */ 181 if (dp->count != 0) { 182 msg_set_destnode(msg, tipc_own_addr); 183 if (dp->count == 1) { 184 msg_set_destport(msg, dp->ports[0]); 185 tipc_port_recv_msg(buf); 186 tipc_port_list_free(dp); 187 return; 188 } 189 for (; cnt < dp->count; cnt++) { 190 int index = cnt % PLSIZE; 191 struct sk_buff *b = skb_clone(buf, GFP_ATOMIC); 192 193 if (b == NULL) { 194 warn("Unable to deliver multicast message(s)\n"); 195 goto exit; 196 } 197 if ((index == 0) && (cnt != 0)) 198 item = item->next; 199 msg_set_destport(buf_msg(b), item->ports[index]); 200 tipc_port_recv_msg(b); 201 } 202 } 203exit: 204 kfree_skb(buf); 205 tipc_port_list_free(dp); 206} 207 208/** 209 * tipc_createport_raw - create a generic TIPC port 210 * 211 * Returns pointer to (locked) TIPC port, or NULL if unable to create it 212 */ 213struct tipc_port *tipc_createport_raw(void *usr_handle, 214 u32 (*dispatcher)(struct tipc_port *, struct sk_buff *), 215 void (*wakeup)(struct tipc_port *), 216 const u32 importance) 217{ 218 struct tipc_port *p_ptr; 219 struct tipc_msg *msg; 220 u32 ref; 221 222 p_ptr = kzalloc(sizeof(*p_ptr), GFP_ATOMIC); 223 if (!p_ptr) { 224 warn("Port creation failed, no memory\n"); 225 return NULL; 226 } 227 ref = tipc_ref_acquire(p_ptr, &p_ptr->lock); 228 if (!ref) { 229 warn("Port creation failed, reference table exhausted\n"); 230 kfree(p_ptr); 231 return NULL; 232 } 233 234 p_ptr->usr_handle = usr_handle; 235 p_ptr->max_pkt = MAX_PKT_DEFAULT; 236 p_ptr->ref = ref; 237 INIT_LIST_HEAD(&p_ptr->wait_list); 238 INIT_LIST_HEAD(&p_ptr->subscription.nodesub_list); 239 p_ptr->dispatcher = dispatcher; 240 p_ptr->wakeup = wakeup; 241 p_ptr->user_port = NULL; 242 k_init_timer(&p_ptr->timer, (Handler)port_timeout, ref); 243 INIT_LIST_HEAD(&p_ptr->publications); 244 INIT_LIST_HEAD(&p_ptr->port_list); 245 246 /* 247 * Must hold port list lock while initializing message header template 248 * to ensure a change to node's own network address doesn't result 249 * in template containing out-dated network address information 250 */ 251 spin_lock_bh(&tipc_port_list_lock); 252 msg = &p_ptr->phdr; 253 tipc_msg_init(msg, importance, TIPC_NAMED_MSG, NAMED_H_SIZE, 0); 254 msg_set_origport(msg, ref); 255 list_add_tail(&p_ptr->port_list, &ports); 256 spin_unlock_bh(&tipc_port_list_lock); 257 return p_ptr; 258} 259 260int tipc_deleteport(u32 ref) 261{ 262 struct tipc_port *p_ptr; 263 struct sk_buff *buf = NULL; 264 265 tipc_withdraw(ref, 0, NULL); 266 p_ptr = tipc_port_lock(ref); 267 if (!p_ptr) 268 return -EINVAL; 269 270 tipc_ref_discard(ref); 271 tipc_port_unlock(p_ptr); 272 273 k_cancel_timer(&p_ptr->timer); 274 if (p_ptr->connected) { 275 buf = port_build_peer_abort_msg(p_ptr, TIPC_ERR_NO_PORT); 276 tipc_nodesub_unsubscribe(&p_ptr->subscription); 277 } 278 kfree(p_ptr->user_port); 279 280 spin_lock_bh(&tipc_port_list_lock); 281 list_del(&p_ptr->port_list); 282 list_del(&p_ptr->wait_list); 283 spin_unlock_bh(&tipc_port_list_lock); 284 k_term_timer(&p_ptr->timer); 285 kfree(p_ptr); 286 tipc_net_route_msg(buf); 287 return 0; 288} 289 290static int port_unreliable(struct tipc_port *p_ptr) 291{ 292 return msg_src_droppable(&p_ptr->phdr); 293} 294 295int tipc_portunreliable(u32 ref, unsigned int *isunreliable) 296{ 297 struct tipc_port *p_ptr; 298 299 p_ptr = tipc_port_lock(ref); 300 if (!p_ptr) 301 return -EINVAL; 302 *isunreliable = port_unreliable(p_ptr); 303 tipc_port_unlock(p_ptr); 304 return 0; 305} 306 307int tipc_set_portunreliable(u32 ref, unsigned int isunreliable) 308{ 309 struct tipc_port *p_ptr; 310 311 p_ptr = tipc_port_lock(ref); 312 if (!p_ptr) 313 return -EINVAL; 314 msg_set_src_droppable(&p_ptr->phdr, (isunreliable != 0)); 315 tipc_port_unlock(p_ptr); 316 return 0; 317} 318 319static int port_unreturnable(struct tipc_port *p_ptr) 320{ 321 return msg_dest_droppable(&p_ptr->phdr); 322} 323 324int tipc_portunreturnable(u32 ref, unsigned int *isunrejectable) 325{ 326 struct tipc_port *p_ptr; 327 328 p_ptr = tipc_port_lock(ref); 329 if (!p_ptr) 330 return -EINVAL; 331 *isunrejectable = port_unreturnable(p_ptr); 332 tipc_port_unlock(p_ptr); 333 return 0; 334} 335 336int tipc_set_portunreturnable(u32 ref, unsigned int isunrejectable) 337{ 338 struct tipc_port *p_ptr; 339 340 p_ptr = tipc_port_lock(ref); 341 if (!p_ptr) 342 return -EINVAL; 343 msg_set_dest_droppable(&p_ptr->phdr, (isunrejectable != 0)); 344 tipc_port_unlock(p_ptr); 345 return 0; 346} 347 348/* 349 * port_build_proto_msg(): create connection protocol message for port 350 * 351 * On entry the port must be locked and connected. 352 */ 353static struct sk_buff *port_build_proto_msg(struct tipc_port *p_ptr, 354 u32 type, u32 ack) 355{ 356 struct sk_buff *buf; 357 struct tipc_msg *msg; 358 359 buf = tipc_buf_acquire(INT_H_SIZE); 360 if (buf) { 361 msg = buf_msg(buf); 362 tipc_msg_init(msg, CONN_MANAGER, type, INT_H_SIZE, 363 port_peernode(p_ptr)); 364 msg_set_destport(msg, port_peerport(p_ptr)); 365 msg_set_origport(msg, p_ptr->ref); 366 msg_set_msgcnt(msg, ack); 367 } 368 return buf; 369} 370 371int tipc_reject_msg(struct sk_buff *buf, u32 err) 372{ 373 struct tipc_msg *msg = buf_msg(buf); 374 struct sk_buff *rbuf; 375 struct tipc_msg *rmsg; 376 int hdr_sz; 377 u32 imp; 378 u32 data_sz = msg_data_sz(msg); 379 u32 src_node; 380 u32 rmsg_sz; 381 382 /* discard rejected message if it shouldn't be returned to sender */ 383 if (WARN(!msg_isdata(msg), 384 "attempt to reject message with user=%u", msg_user(msg))) { 385 dump_stack(); 386 goto exit; 387 } 388 if (msg_errcode(msg) || msg_dest_droppable(msg)) 389 goto exit; 390 391 /* 392 * construct returned message by copying rejected message header and 393 * data (or subset), then updating header fields that need adjusting 394 */ 395 hdr_sz = msg_hdr_sz(msg); 396 rmsg_sz = hdr_sz + min_t(u32, data_sz, MAX_REJECT_SIZE); 397 398 rbuf = tipc_buf_acquire(rmsg_sz); 399 if (rbuf == NULL) 400 goto exit; 401 402 rmsg = buf_msg(rbuf); 403 skb_copy_to_linear_data(rbuf, msg, rmsg_sz); 404 405 if (msg_connected(rmsg)) { 406 imp = msg_importance(rmsg); 407 if (imp < TIPC_CRITICAL_IMPORTANCE) 408 msg_set_importance(rmsg, ++imp); 409 } 410 msg_set_non_seq(rmsg, 0); 411 msg_set_size(rmsg, rmsg_sz); 412 msg_set_errcode(rmsg, err); 413 msg_set_prevnode(rmsg, tipc_own_addr); 414 msg_swap_words(rmsg, 4, 5); 415 if (!msg_short(rmsg)) 416 msg_swap_words(rmsg, 6, 7); 417 418 /* send self-abort message when rejecting on a connected port */ 419 if (msg_connected(msg)) { 420 struct tipc_port *p_ptr = tipc_port_lock(msg_destport(msg)); 421 422 if (p_ptr) { 423 struct sk_buff *abuf = NULL; 424 425 if (p_ptr->connected) 426 abuf = port_build_self_abort_msg(p_ptr, err); 427 tipc_port_unlock(p_ptr); 428 tipc_net_route_msg(abuf); 429 } 430 } 431 432 /* send returned message & dispose of rejected message */ 433 src_node = msg_prevnode(msg); 434 if (in_own_node(src_node)) 435 tipc_port_recv_msg(rbuf); 436 else 437 tipc_link_send(rbuf, src_node, msg_link_selector(rmsg)); 438exit: 439 kfree_skb(buf); 440 return data_sz; 441} 442 443int tipc_port_reject_sections(struct tipc_port *p_ptr, struct tipc_msg *hdr, 444 struct iovec const *msg_sect, u32 num_sect, 445 unsigned int total_len, int err) 446{ 447 struct sk_buff *buf; 448 int res; 449 450 res = tipc_msg_build(hdr, msg_sect, num_sect, total_len, MAX_MSG_SIZE, 451 !p_ptr->user_port, &buf); 452 if (!buf) 453 return res; 454 455 return tipc_reject_msg(buf, err); 456} 457 458static void port_timeout(unsigned long ref) 459{ 460 struct tipc_port *p_ptr = tipc_port_lock(ref); 461 struct sk_buff *buf = NULL; 462 463 if (!p_ptr) 464 return; 465 466 if (!p_ptr->connected) { 467 tipc_port_unlock(p_ptr); 468 return; 469 } 470 471 /* Last probe answered ? */ 472 if (p_ptr->probing_state == PROBING) { 473 buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_PORT); 474 } else { 475 buf = port_build_proto_msg(p_ptr, CONN_PROBE, 0); 476 p_ptr->probing_state = PROBING; 477 k_start_timer(&p_ptr->timer, p_ptr->probing_interval); 478 } 479 tipc_port_unlock(p_ptr); 480 tipc_net_route_msg(buf); 481} 482 483 484static void port_handle_node_down(unsigned long ref) 485{ 486 struct tipc_port *p_ptr = tipc_port_lock(ref); 487 struct sk_buff *buf = NULL; 488 489 if (!p_ptr) 490 return; 491 buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_NODE); 492 tipc_port_unlock(p_ptr); 493 tipc_net_route_msg(buf); 494} 495 496 497static struct sk_buff *port_build_self_abort_msg(struct tipc_port *p_ptr, u32 err) 498{ 499 struct sk_buff *buf = port_build_peer_abort_msg(p_ptr, err); 500 501 if (buf) { 502 struct tipc_msg *msg = buf_msg(buf); 503 msg_swap_words(msg, 4, 5); 504 msg_swap_words(msg, 6, 7); 505 } 506 return buf; 507} 508 509 510static struct sk_buff *port_build_peer_abort_msg(struct tipc_port *p_ptr, u32 err) 511{ 512 struct sk_buff *buf; 513 struct tipc_msg *msg; 514 u32 imp; 515 516 if (!p_ptr->connected) 517 return NULL; 518 519 buf = tipc_buf_acquire(BASIC_H_SIZE); 520 if (buf) { 521 msg = buf_msg(buf); 522 memcpy(msg, &p_ptr->phdr, BASIC_H_SIZE); 523 msg_set_hdr_sz(msg, BASIC_H_SIZE); 524 msg_set_size(msg, BASIC_H_SIZE); 525 imp = msg_importance(msg); 526 if (imp < TIPC_CRITICAL_IMPORTANCE) 527 msg_set_importance(msg, ++imp); 528 msg_set_errcode(msg, err); 529 } 530 return buf; 531} 532 533void tipc_port_recv_proto_msg(struct sk_buff *buf) 534{ 535 struct tipc_msg *msg = buf_msg(buf); 536 struct tipc_port *p_ptr; 537 struct sk_buff *r_buf = NULL; 538 u32 destport = msg_destport(msg); 539 int wakeable; 540 541 /* Validate connection */ 542 p_ptr = tipc_port_lock(destport); 543 if (!p_ptr || !p_ptr->connected || !tipc_port_peer_msg(p_ptr, msg)) { 544 r_buf = tipc_buf_acquire(BASIC_H_SIZE); 545 if (r_buf) { 546 msg = buf_msg(r_buf); 547 tipc_msg_init(msg, TIPC_HIGH_IMPORTANCE, TIPC_CONN_MSG, 548 BASIC_H_SIZE, msg_orignode(msg)); 549 msg_set_errcode(msg, TIPC_ERR_NO_PORT); 550 msg_set_origport(msg, destport); 551 msg_set_destport(msg, msg_origport(msg)); 552 } 553 if (p_ptr) 554 tipc_port_unlock(p_ptr); 555 goto exit; 556 } 557 558 /* Process protocol message sent by peer */ 559 switch (msg_type(msg)) { 560 case CONN_ACK: 561 wakeable = tipc_port_congested(p_ptr) && p_ptr->congested && 562 p_ptr->wakeup; 563 p_ptr->acked += msg_msgcnt(msg); 564 if (!tipc_port_congested(p_ptr)) { 565 p_ptr->congested = 0; 566 if (wakeable) 567 p_ptr->wakeup(p_ptr); 568 } 569 break; 570 case CONN_PROBE: 571 r_buf = port_build_proto_msg(p_ptr, CONN_PROBE_REPLY, 0); 572 break; 573 default: 574 /* CONN_PROBE_REPLY or unrecognized - no action required */ 575 break; 576 } 577 p_ptr->probing_state = CONFIRMED; 578 tipc_port_unlock(p_ptr); 579exit: 580 tipc_net_route_msg(r_buf); 581 kfree_skb(buf); 582} 583 584static void port_print(struct tipc_port *p_ptr, struct print_buf *buf, int full_id) 585{ 586 struct publication *publ; 587 588 if (full_id) 589 tipc_printf(buf, "<%u.%u.%u:%u>:", 590 tipc_zone(tipc_own_addr), tipc_cluster(tipc_own_addr), 591 tipc_node(tipc_own_addr), p_ptr->ref); 592 else 593 tipc_printf(buf, "%-10u:", p_ptr->ref); 594 595 if (p_ptr->connected) { 596 u32 dport = port_peerport(p_ptr); 597 u32 destnode = port_peernode(p_ptr); 598 599 tipc_printf(buf, " connected to <%u.%u.%u:%u>", 600 tipc_zone(destnode), tipc_cluster(destnode), 601 tipc_node(destnode), dport); 602 if (p_ptr->conn_type != 0) 603 tipc_printf(buf, " via {%u,%u}", 604 p_ptr->conn_type, 605 p_ptr->conn_instance); 606 } else if (p_ptr->published) { 607 tipc_printf(buf, " bound to"); 608 list_for_each_entry(publ, &p_ptr->publications, pport_list) { 609 if (publ->lower == publ->upper) 610 tipc_printf(buf, " {%u,%u}", publ->type, 611 publ->lower); 612 else 613 tipc_printf(buf, " {%u,%u,%u}", publ->type, 614 publ->lower, publ->upper); 615 } 616 } 617 tipc_printf(buf, "\n"); 618} 619 620#define MAX_PORT_QUERY 32768 621 622struct sk_buff *tipc_port_get_ports(void) 623{ 624 struct sk_buff *buf; 625 struct tlv_desc *rep_tlv; 626 struct print_buf pb; 627 struct tipc_port *p_ptr; 628 int str_len; 629 630 buf = tipc_cfg_reply_alloc(TLV_SPACE(MAX_PORT_QUERY)); 631 if (!buf) 632 return NULL; 633 rep_tlv = (struct tlv_desc *)buf->data; 634 635 tipc_printbuf_init(&pb, TLV_DATA(rep_tlv), MAX_PORT_QUERY); 636 spin_lock_bh(&tipc_port_list_lock); 637 list_for_each_entry(p_ptr, &ports, port_list) { 638 spin_lock_bh(p_ptr->lock); 639 port_print(p_ptr, &pb, 0); 640 spin_unlock_bh(p_ptr->lock); 641 } 642 spin_unlock_bh(&tipc_port_list_lock); 643 str_len = tipc_printbuf_validate(&pb); 644 645 skb_put(buf, TLV_SPACE(str_len)); 646 TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len); 647 648 return buf; 649} 650 651void tipc_port_reinit(void) 652{ 653 struct tipc_port *p_ptr; 654 struct tipc_msg *msg; 655 656 spin_lock_bh(&tipc_port_list_lock); 657 list_for_each_entry(p_ptr, &ports, port_list) { 658 msg = &p_ptr->phdr; 659 msg_set_prevnode(msg, tipc_own_addr); 660 msg_set_orignode(msg, tipc_own_addr); 661 } 662 spin_unlock_bh(&tipc_port_list_lock); 663} 664 665 666/* 667 * port_dispatcher_sigh(): Signal handler for messages destinated 668 * to the tipc_port interface. 669 */ 670static void port_dispatcher_sigh(void *dummy) 671{ 672 struct sk_buff *buf; 673 674 spin_lock_bh(&queue_lock); 675 buf = msg_queue_head; 676 msg_queue_head = NULL; 677 spin_unlock_bh(&queue_lock); 678 679 while (buf) { 680 struct tipc_port *p_ptr; 681 struct user_port *up_ptr; 682 struct tipc_portid orig; 683 struct tipc_name_seq dseq; 684 void *usr_handle; 685 int connected; 686 int peer_invalid; 687 int published; 688 u32 message_type; 689 690 struct sk_buff *next = buf->next; 691 struct tipc_msg *msg = buf_msg(buf); 692 u32 dref = msg_destport(msg); 693 694 message_type = msg_type(msg); 695 if (message_type > TIPC_DIRECT_MSG) 696 goto reject; /* Unsupported message type */ 697 698 p_ptr = tipc_port_lock(dref); 699 if (!p_ptr) 700 goto reject; /* Port deleted while msg in queue */ 701 702 orig.ref = msg_origport(msg); 703 orig.node = msg_orignode(msg); 704 up_ptr = p_ptr->user_port; 705 usr_handle = up_ptr->usr_handle; 706 connected = p_ptr->connected; 707 peer_invalid = connected && !tipc_port_peer_msg(p_ptr, msg); 708 published = p_ptr->published; 709 710 if (unlikely(msg_errcode(msg))) 711 goto err; 712 713 switch (message_type) { 714 715 case TIPC_CONN_MSG:{ 716 tipc_conn_msg_event cb = up_ptr->conn_msg_cb; 717 u32 dsz; 718 719 tipc_port_unlock(p_ptr); 720 if (unlikely(!cb)) 721 goto reject; 722 if (unlikely(!connected)) { 723 if (tipc_connect2port(dref, &orig)) 724 goto reject; 725 } else if (peer_invalid) 726 goto reject; 727 dsz = msg_data_sz(msg); 728 if (unlikely(dsz && 729 (++p_ptr->conn_unacked >= 730 TIPC_FLOW_CONTROL_WIN))) 731 tipc_acknowledge(dref, 732 p_ptr->conn_unacked); 733 skb_pull(buf, msg_hdr_sz(msg)); 734 cb(usr_handle, dref, &buf, msg_data(msg), dsz); 735 break; 736 } 737 case TIPC_DIRECT_MSG:{ 738 tipc_msg_event cb = up_ptr->msg_cb; 739 740 tipc_port_unlock(p_ptr); 741 if (unlikely(!cb || connected)) 742 goto reject; 743 skb_pull(buf, msg_hdr_sz(msg)); 744 cb(usr_handle, dref, &buf, msg_data(msg), 745 msg_data_sz(msg), msg_importance(msg), 746 &orig); 747 break; 748 } 749 case TIPC_MCAST_MSG: 750 case TIPC_NAMED_MSG:{ 751 tipc_named_msg_event cb = up_ptr->named_msg_cb; 752 753 tipc_port_unlock(p_ptr); 754 if (unlikely(!cb || connected || !published)) 755 goto reject; 756 dseq.type = msg_nametype(msg); 757 dseq.lower = msg_nameinst(msg); 758 dseq.upper = (message_type == TIPC_NAMED_MSG) 759 ? dseq.lower : msg_nameupper(msg); 760 skb_pull(buf, msg_hdr_sz(msg)); 761 cb(usr_handle, dref, &buf, msg_data(msg), 762 msg_data_sz(msg), msg_importance(msg), 763 &orig, &dseq); 764 break; 765 } 766 } 767 if (buf) 768 kfree_skb(buf); 769 buf = next; 770 continue; 771err: 772 switch (message_type) { 773 774 case TIPC_CONN_MSG:{ 775 tipc_conn_shutdown_event cb = 776 up_ptr->conn_err_cb; 777 778 tipc_port_unlock(p_ptr); 779 if (!cb || !connected || peer_invalid) 780 break; 781 tipc_disconnect(dref); 782 skb_pull(buf, msg_hdr_sz(msg)); 783 cb(usr_handle, dref, &buf, msg_data(msg), 784 msg_data_sz(msg), msg_errcode(msg)); 785 break; 786 } 787 case TIPC_DIRECT_MSG:{ 788 tipc_msg_err_event cb = up_ptr->err_cb; 789 790 tipc_port_unlock(p_ptr); 791 if (!cb || connected) 792 break; 793 skb_pull(buf, msg_hdr_sz(msg)); 794 cb(usr_handle, dref, &buf, msg_data(msg), 795 msg_data_sz(msg), msg_errcode(msg), &orig); 796 break; 797 } 798 case TIPC_MCAST_MSG: 799 case TIPC_NAMED_MSG:{ 800 tipc_named_msg_err_event cb = 801 up_ptr->named_err_cb; 802 803 tipc_port_unlock(p_ptr); 804 if (!cb || connected) 805 break; 806 dseq.type = msg_nametype(msg); 807 dseq.lower = msg_nameinst(msg); 808 dseq.upper = (message_type == TIPC_NAMED_MSG) 809 ? dseq.lower : msg_nameupper(msg); 810 skb_pull(buf, msg_hdr_sz(msg)); 811 cb(usr_handle, dref, &buf, msg_data(msg), 812 msg_data_sz(msg), msg_errcode(msg), &dseq); 813 break; 814 } 815 } 816 if (buf) 817 kfree_skb(buf); 818 buf = next; 819 continue; 820reject: 821 tipc_reject_msg(buf, TIPC_ERR_NO_PORT); 822 buf = next; 823 } 824} 825 826/* 827 * port_dispatcher(): Dispatcher for messages destinated 828 * to the tipc_port interface. Called with port locked. 829 */ 830static u32 port_dispatcher(struct tipc_port *dummy, struct sk_buff *buf) 831{ 832 buf->next = NULL; 833 spin_lock_bh(&queue_lock); 834 if (msg_queue_head) { 835 msg_queue_tail->next = buf; 836 msg_queue_tail = buf; 837 } else { 838 msg_queue_tail = msg_queue_head = buf; 839 tipc_k_signal((Handler)port_dispatcher_sigh, 0); 840 } 841 spin_unlock_bh(&queue_lock); 842 return 0; 843} 844 845/* 846 * Wake up port after congestion: Called with port locked 847 */ 848static void port_wakeup_sh(unsigned long ref) 849{ 850 struct tipc_port *p_ptr; 851 struct user_port *up_ptr; 852 tipc_continue_event cb = NULL; 853 void *uh = NULL; 854 855 p_ptr = tipc_port_lock(ref); 856 if (p_ptr) { 857 up_ptr = p_ptr->user_port; 858 if (up_ptr) { 859 cb = up_ptr->continue_event_cb; 860 uh = up_ptr->usr_handle; 861 } 862 tipc_port_unlock(p_ptr); 863 } 864 if (cb) 865 cb(uh, ref); 866} 867 868 869static void port_wakeup(struct tipc_port *p_ptr) 870{ 871 tipc_k_signal((Handler)port_wakeup_sh, p_ptr->ref); 872} 873 874void tipc_acknowledge(u32 ref, u32 ack) 875{ 876 struct tipc_port *p_ptr; 877 struct sk_buff *buf = NULL; 878 879 p_ptr = tipc_port_lock(ref); 880 if (!p_ptr) 881 return; 882 if (p_ptr->connected) { 883 p_ptr->conn_unacked -= ack; 884 buf = port_build_proto_msg(p_ptr, CONN_ACK, ack); 885 } 886 tipc_port_unlock(p_ptr); 887 tipc_net_route_msg(buf); 888} 889 890/* 891 * tipc_createport(): user level call. 892 */ 893int tipc_createport(void *usr_handle, 894 unsigned int importance, 895 tipc_msg_err_event error_cb, 896 tipc_named_msg_err_event named_error_cb, 897 tipc_conn_shutdown_event conn_error_cb, 898 tipc_msg_event msg_cb, 899 tipc_named_msg_event named_msg_cb, 900 tipc_conn_msg_event conn_msg_cb, 901 tipc_continue_event continue_event_cb, /* May be zero */ 902 u32 *portref) 903{ 904 struct user_port *up_ptr; 905 struct tipc_port *p_ptr; 906 907 up_ptr = kmalloc(sizeof(*up_ptr), GFP_ATOMIC); 908 if (!up_ptr) { 909 warn("Port creation failed, no memory\n"); 910 return -ENOMEM; 911 } 912 p_ptr = (struct tipc_port *)tipc_createport_raw(NULL, port_dispatcher, 913 port_wakeup, importance); 914 if (!p_ptr) { 915 kfree(up_ptr); 916 return -ENOMEM; 917 } 918 919 p_ptr->user_port = up_ptr; 920 up_ptr->usr_handle = usr_handle; 921 up_ptr->ref = p_ptr->ref; 922 up_ptr->err_cb = error_cb; 923 up_ptr->named_err_cb = named_error_cb; 924 up_ptr->conn_err_cb = conn_error_cb; 925 up_ptr->msg_cb = msg_cb; 926 up_ptr->named_msg_cb = named_msg_cb; 927 up_ptr->conn_msg_cb = conn_msg_cb; 928 up_ptr->continue_event_cb = continue_event_cb; 929 *portref = p_ptr->ref; 930 tipc_port_unlock(p_ptr); 931 return 0; 932} 933 934int tipc_portimportance(u32 ref, unsigned int *importance) 935{ 936 struct tipc_port *p_ptr; 937 938 p_ptr = tipc_port_lock(ref); 939 if (!p_ptr) 940 return -EINVAL; 941 *importance = (unsigned int)msg_importance(&p_ptr->phdr); 942 tipc_port_unlock(p_ptr); 943 return 0; 944} 945 946int tipc_set_portimportance(u32 ref, unsigned int imp) 947{ 948 struct tipc_port *p_ptr; 949 950 if (imp > TIPC_CRITICAL_IMPORTANCE) 951 return -EINVAL; 952 953 p_ptr = tipc_port_lock(ref); 954 if (!p_ptr) 955 return -EINVAL; 956 msg_set_importance(&p_ptr->phdr, (u32)imp); 957 tipc_port_unlock(p_ptr); 958 return 0; 959} 960 961 962int tipc_publish(u32 ref, unsigned int scope, struct tipc_name_seq const *seq) 963{ 964 struct tipc_port *p_ptr; 965 struct publication *publ; 966 u32 key; 967 int res = -EINVAL; 968 969 p_ptr = tipc_port_lock(ref); 970 if (!p_ptr) 971 return -EINVAL; 972 973 if (p_ptr->connected) 974 goto exit; 975 key = ref + p_ptr->pub_count + 1; 976 if (key == ref) { 977 res = -EADDRINUSE; 978 goto exit; 979 } 980 publ = tipc_nametbl_publish(seq->type, seq->lower, seq->upper, 981 scope, p_ptr->ref, key); 982 if (publ) { 983 list_add(&publ->pport_list, &p_ptr->publications); 984 p_ptr->pub_count++; 985 p_ptr->published = 1; 986 res = 0; 987 } 988exit: 989 tipc_port_unlock(p_ptr); 990 return res; 991} 992 993int tipc_withdraw(u32 ref, unsigned int scope, struct tipc_name_seq const *seq) 994{ 995 struct tipc_port *p_ptr; 996 struct publication *publ; 997 struct publication *tpubl; 998 int res = -EINVAL; 999 1000 p_ptr = tipc_port_lock(ref); 1001 if (!p_ptr) 1002 return -EINVAL; 1003 if (!seq) { 1004 list_for_each_entry_safe(publ, tpubl, 1005 &p_ptr->publications, pport_list) { 1006 tipc_nametbl_withdraw(publ->type, publ->lower, 1007 publ->ref, publ->key); 1008 } 1009 res = 0; 1010 } else { 1011 list_for_each_entry_safe(publ, tpubl, 1012 &p_ptr->publications, pport_list) { 1013 if (publ->scope != scope) 1014 continue; 1015 if (publ->type != seq->type) 1016 continue; 1017 if (publ->lower != seq->lower) 1018 continue; 1019 if (publ->upper != seq->upper) 1020 break; 1021 tipc_nametbl_withdraw(publ->type, publ->lower, 1022 publ->ref, publ->key); 1023 res = 0; 1024 break; 1025 } 1026 } 1027 if (list_empty(&p_ptr->publications)) 1028 p_ptr->published = 0; 1029 tipc_port_unlock(p_ptr); 1030 return res; 1031} 1032 1033int tipc_connect2port(u32 ref, struct tipc_portid const *peer) 1034{ 1035 struct tipc_port *p_ptr; 1036 struct tipc_msg *msg; 1037 int res = -EINVAL; 1038 1039 p_ptr = tipc_port_lock(ref); 1040 if (!p_ptr) 1041 return -EINVAL; 1042 if (p_ptr->published || p_ptr->connected) 1043 goto exit; 1044 if (!peer->ref) 1045 goto exit; 1046 1047 msg = &p_ptr->phdr; 1048 msg_set_destnode(msg, peer->node); 1049 msg_set_destport(msg, peer->ref); 1050 msg_set_type(msg, TIPC_CONN_MSG); 1051 msg_set_lookup_scope(msg, 0); 1052 msg_set_hdr_sz(msg, SHORT_H_SIZE); 1053 1054 p_ptr->probing_interval = PROBING_INTERVAL; 1055 p_ptr->probing_state = CONFIRMED; 1056 p_ptr->connected = 1; 1057 k_start_timer(&p_ptr->timer, p_ptr->probing_interval); 1058 1059 tipc_nodesub_subscribe(&p_ptr->subscription, peer->node, 1060 (void *)(unsigned long)ref, 1061 (net_ev_handler)port_handle_node_down); 1062 res = 0; 1063exit: 1064 tipc_port_unlock(p_ptr); 1065 p_ptr->max_pkt = tipc_link_get_max_pkt(peer->node, ref); 1066 return res; 1067} 1068 1069/** 1070 * tipc_disconnect_port - disconnect port from peer 1071 * 1072 * Port must be locked. 1073 */ 1074int tipc_disconnect_port(struct tipc_port *tp_ptr) 1075{ 1076 int res; 1077 1078 if (tp_ptr->connected) { 1079 tp_ptr->connected = 0; 1080 /* let timer expire on it's own to avoid deadlock! */ 1081 tipc_nodesub_unsubscribe( 1082 &((struct tipc_port *)tp_ptr)->subscription); 1083 res = 0; 1084 } else { 1085 res = -ENOTCONN; 1086 } 1087 return res; 1088} 1089 1090/* 1091 * tipc_disconnect(): Disconnect port form peer. 1092 * This is a node local operation. 1093 */ 1094int tipc_disconnect(u32 ref) 1095{ 1096 struct tipc_port *p_ptr; 1097 int res; 1098 1099 p_ptr = tipc_port_lock(ref); 1100 if (!p_ptr) 1101 return -EINVAL; 1102 res = tipc_disconnect_port((struct tipc_port *)p_ptr); 1103 tipc_port_unlock(p_ptr); 1104 return res; 1105} 1106 1107/* 1108 * tipc_shutdown(): Send a SHUTDOWN msg to peer and disconnect 1109 */ 1110int tipc_shutdown(u32 ref) 1111{ 1112 struct tipc_port *p_ptr; 1113 struct sk_buff *buf = NULL; 1114 1115 p_ptr = tipc_port_lock(ref); 1116 if (!p_ptr) 1117 return -EINVAL; 1118 1119 buf = port_build_peer_abort_msg(p_ptr, TIPC_CONN_SHUTDOWN); 1120 tipc_port_unlock(p_ptr); 1121 tipc_net_route_msg(buf); 1122 return tipc_disconnect(ref); 1123} 1124 1125/** 1126 * tipc_port_recv_msg - receive message from lower layer and deliver to port user 1127 */ 1128int tipc_port_recv_msg(struct sk_buff *buf) 1129{ 1130 struct tipc_port *p_ptr; 1131 struct tipc_msg *msg = buf_msg(buf); 1132 u32 destport = msg_destport(msg); 1133 u32 dsz = msg_data_sz(msg); 1134 u32 err; 1135 1136 /* forward unresolved named message */ 1137 if (unlikely(!destport)) { 1138 tipc_net_route_msg(buf); 1139 return dsz; 1140 } 1141 1142 /* validate destination & pass to port, otherwise reject message */ 1143 p_ptr = tipc_port_lock(destport); 1144 if (likely(p_ptr)) { 1145 err = p_ptr->dispatcher(p_ptr, buf); 1146 tipc_port_unlock(p_ptr); 1147 if (likely(!err)) 1148 return dsz; 1149 } else { 1150 err = TIPC_ERR_NO_PORT; 1151 } 1152 1153 return tipc_reject_msg(buf, err); 1154} 1155 1156/* 1157 * tipc_port_recv_sections(): Concatenate and deliver sectioned 1158 * message for this node. 1159 */ 1160static int tipc_port_recv_sections(struct tipc_port *sender, unsigned int num_sect, 1161 struct iovec const *msg_sect, 1162 unsigned int total_len) 1163{ 1164 struct sk_buff *buf; 1165 int res; 1166 1167 res = tipc_msg_build(&sender->phdr, msg_sect, num_sect, total_len, 1168 MAX_MSG_SIZE, !sender->user_port, &buf); 1169 if (likely(buf)) 1170 tipc_port_recv_msg(buf); 1171 return res; 1172} 1173 1174/** 1175 * tipc_send - send message sections on connection 1176 */ 1177int tipc_send(u32 ref, unsigned int num_sect, struct iovec const *msg_sect, 1178 unsigned int total_len) 1179{ 1180 struct tipc_port *p_ptr; 1181 u32 destnode; 1182 int res; 1183 1184 p_ptr = tipc_port_deref(ref); 1185 if (!p_ptr || !p_ptr->connected) 1186 return -EINVAL; 1187 1188 p_ptr->congested = 1; 1189 if (!tipc_port_congested(p_ptr)) { 1190 destnode = port_peernode(p_ptr); 1191 if (likely(!in_own_node(destnode))) 1192 res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect, 1193 total_len, destnode); 1194 else 1195 res = tipc_port_recv_sections(p_ptr, num_sect, msg_sect, 1196 total_len); 1197 1198 if (likely(res != -ELINKCONG)) { 1199 p_ptr->congested = 0; 1200 if (res > 0) 1201 p_ptr->sent++; 1202 return res; 1203 } 1204 } 1205 if (port_unreliable(p_ptr)) { 1206 p_ptr->congested = 0; 1207 return total_len; 1208 } 1209 return -ELINKCONG; 1210} 1211 1212/** 1213 * tipc_send2name - send message sections to port name 1214 */ 1215int tipc_send2name(u32 ref, struct tipc_name const *name, unsigned int domain, 1216 unsigned int num_sect, struct iovec const *msg_sect, 1217 unsigned int total_len) 1218{ 1219 struct tipc_port *p_ptr; 1220 struct tipc_msg *msg; 1221 u32 destnode = domain; 1222 u32 destport; 1223 int res; 1224 1225 p_ptr = tipc_port_deref(ref); 1226 if (!p_ptr || p_ptr->connected) 1227 return -EINVAL; 1228 1229 msg = &p_ptr->phdr; 1230 msg_set_type(msg, TIPC_NAMED_MSG); 1231 msg_set_hdr_sz(msg, NAMED_H_SIZE); 1232 msg_set_nametype(msg, name->type); 1233 msg_set_nameinst(msg, name->instance); 1234 msg_set_lookup_scope(msg, tipc_addr_scope(domain)); 1235 destport = tipc_nametbl_translate(name->type, name->instance, &destnode); 1236 msg_set_destnode(msg, destnode); 1237 msg_set_destport(msg, destport); 1238 1239 if (likely(destport || destnode)) { 1240 if (likely(in_own_node(destnode))) 1241 res = tipc_port_recv_sections(p_ptr, num_sect, 1242 msg_sect, total_len); 1243 else if (tipc_own_addr) 1244 res = tipc_link_send_sections_fast(p_ptr, msg_sect, 1245 num_sect, total_len, 1246 destnode); 1247 else 1248 res = tipc_port_reject_sections(p_ptr, msg, msg_sect, 1249 num_sect, total_len, 1250 TIPC_ERR_NO_NODE); 1251 if (likely(res != -ELINKCONG)) { 1252 if (res > 0) 1253 p_ptr->sent++; 1254 return res; 1255 } 1256 if (port_unreliable(p_ptr)) { 1257 return total_len; 1258 } 1259 return -ELINKCONG; 1260 } 1261 return tipc_port_reject_sections(p_ptr, msg, msg_sect, num_sect, 1262 total_len, TIPC_ERR_NO_NAME); 1263} 1264 1265/** 1266 * tipc_send2port - send message sections to port identity 1267 */ 1268int tipc_send2port(u32 ref, struct tipc_portid const *dest, 1269 unsigned int num_sect, struct iovec const *msg_sect, 1270 unsigned int total_len) 1271{ 1272 struct tipc_port *p_ptr; 1273 struct tipc_msg *msg; 1274 int res; 1275 1276 p_ptr = tipc_port_deref(ref); 1277 if (!p_ptr || p_ptr->connected) 1278 return -EINVAL; 1279 1280 msg = &p_ptr->phdr; 1281 msg_set_type(msg, TIPC_DIRECT_MSG); 1282 msg_set_lookup_scope(msg, 0); 1283 msg_set_destnode(msg, dest->node); 1284 msg_set_destport(msg, dest->ref); 1285 msg_set_hdr_sz(msg, BASIC_H_SIZE); 1286 1287 if (in_own_node(dest->node)) 1288 res = tipc_port_recv_sections(p_ptr, num_sect, msg_sect, 1289 total_len); 1290 else if (tipc_own_addr) 1291 res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect, 1292 total_len, dest->node); 1293 else 1294 res = tipc_port_reject_sections(p_ptr, msg, msg_sect, num_sect, 1295 total_len, TIPC_ERR_NO_NODE); 1296 if (likely(res != -ELINKCONG)) { 1297 if (res > 0) 1298 p_ptr->sent++; 1299 return res; 1300 } 1301 if (port_unreliable(p_ptr)) { 1302 return total_len; 1303 } 1304 return -ELINKCONG; 1305} 1306 1307/** 1308 * tipc_send_buf2port - send message buffer to port identity 1309 */ 1310int tipc_send_buf2port(u32 ref, struct tipc_portid const *dest, 1311 struct sk_buff *buf, unsigned int dsz) 1312{ 1313 struct tipc_port *p_ptr; 1314 struct tipc_msg *msg; 1315 int res; 1316 1317 p_ptr = (struct tipc_port *)tipc_ref_deref(ref); 1318 if (!p_ptr || p_ptr->connected) 1319 return -EINVAL; 1320 1321 msg = &p_ptr->phdr; 1322 msg_set_type(msg, TIPC_DIRECT_MSG); 1323 msg_set_destnode(msg, dest->node); 1324 msg_set_destport(msg, dest->ref); 1325 msg_set_hdr_sz(msg, BASIC_H_SIZE); 1326 msg_set_size(msg, BASIC_H_SIZE + dsz); 1327 if (skb_cow(buf, BASIC_H_SIZE)) 1328 return -ENOMEM; 1329 1330 skb_push(buf, BASIC_H_SIZE); 1331 skb_copy_to_linear_data(buf, msg, BASIC_H_SIZE); 1332 1333 if (in_own_node(dest->node)) 1334 res = tipc_port_recv_msg(buf); 1335 else 1336 res = tipc_send_buf_fast(buf, dest->node); 1337 if (likely(res != -ELINKCONG)) { 1338 if (res > 0) 1339 p_ptr->sent++; 1340 return res; 1341 } 1342 if (port_unreliable(p_ptr)) 1343 return dsz; 1344 return -ELINKCONG; 1345}