Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

at v3.6-rc6 1350 lines 34 kB view raw
1/* 2 * net/tipc/port.c: TIPC port code 3 * 4 * Copyright (c) 1992-2007, Ericsson AB 5 * Copyright (c) 2004-2008, 2010-2011, Wind River Systems 6 * All rights reserved. 7 * 8 * Redistribution and use in source and binary forms, with or without 9 * modification, are permitted provided that the following conditions are met: 10 * 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in the 15 * documentation and/or other materials provided with the distribution. 16 * 3. Neither the names of the copyright holders nor the names of its 17 * contributors may be used to endorse or promote products derived from 18 * this software without specific prior written permission. 19 * 20 * Alternatively, this software may be distributed under the terms of the 21 * GNU General Public License ("GPL") version 2 as published by the Free 22 * Software Foundation. 23 * 24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 25 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 26 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 27 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 28 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 29 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 30 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 33 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 34 * POSSIBILITY OF SUCH DAMAGE. 35 */ 36 37#include "core.h" 38#include "config.h" 39#include "port.h" 40#include "name_table.h" 41 42/* Connection management: */ 43#define PROBING_INTERVAL 3600000 /* [ms] => 1 h */ 44#define CONFIRMED 0 45#define PROBING 1 46 47#define MAX_REJECT_SIZE 1024 48 49static struct sk_buff *msg_queue_head; 50static struct sk_buff *msg_queue_tail; 51 52DEFINE_SPINLOCK(tipc_port_list_lock); 53static DEFINE_SPINLOCK(queue_lock); 54 55static LIST_HEAD(ports); 56static void port_handle_node_down(unsigned long ref); 57static struct sk_buff *port_build_self_abort_msg(struct tipc_port *, u32 err); 58static struct sk_buff *port_build_peer_abort_msg(struct tipc_port *, u32 err); 59static void port_timeout(unsigned long ref); 60 61 62static u32 port_peernode(struct tipc_port *p_ptr) 63{ 64 return msg_destnode(&p_ptr->phdr); 65} 66 67static u32 port_peerport(struct tipc_port *p_ptr) 68{ 69 return msg_destport(&p_ptr->phdr); 70} 71 72/** 73 * tipc_port_peer_msg - verify message was sent by connected port's peer 74 * 75 * Handles cases where the node's network address has changed from 76 * the default of <0.0.0> to its configured setting. 77 */ 78int tipc_port_peer_msg(struct tipc_port *p_ptr, struct tipc_msg *msg) 79{ 80 u32 peernode; 81 u32 orignode; 82 83 if (msg_origport(msg) != port_peerport(p_ptr)) 84 return 0; 85 86 orignode = msg_orignode(msg); 87 peernode = port_peernode(p_ptr); 88 return (orignode == peernode) || 89 (!orignode && (peernode == tipc_own_addr)) || 90 (!peernode && (orignode == tipc_own_addr)); 91} 92 93/** 94 * tipc_multicast - send a multicast message to local and remote destinations 95 */ 96int tipc_multicast(u32 ref, struct tipc_name_seq const *seq, 97 u32 num_sect, struct iovec const *msg_sect, 98 unsigned int total_len) 99{ 100 struct tipc_msg *hdr; 101 struct sk_buff *buf; 102 struct sk_buff *ibuf = NULL; 103 struct tipc_port_list dports = {0, NULL, }; 104 struct tipc_port *oport = tipc_port_deref(ref); 105 int ext_targets; 106 int res; 107 108 if (unlikely(!oport)) 109 return -EINVAL; 110 111 /* Create multicast message */ 112 hdr = &oport->phdr; 113 msg_set_type(hdr, TIPC_MCAST_MSG); 114 msg_set_lookup_scope(hdr, TIPC_CLUSTER_SCOPE); 115 msg_set_destport(hdr, 0); 116 msg_set_destnode(hdr, 0); 117 msg_set_nametype(hdr, seq->type); 118 msg_set_namelower(hdr, seq->lower); 119 msg_set_nameupper(hdr, seq->upper); 120 msg_set_hdr_sz(hdr, MCAST_H_SIZE); 121 res = tipc_msg_build(hdr, msg_sect, num_sect, total_len, MAX_MSG_SIZE, 122 !oport->user_port, &buf); 123 if (unlikely(!buf)) 124 return res; 125 126 /* Figure out where to send multicast message */ 127 ext_targets = tipc_nametbl_mc_translate(seq->type, seq->lower, seq->upper, 128 TIPC_NODE_SCOPE, &dports); 129 130 /* Send message to destinations (duplicate it only if necessary) */ 131 if (ext_targets) { 132 if (dports.count != 0) { 133 ibuf = skb_copy(buf, GFP_ATOMIC); 134 if (ibuf == NULL) { 135 tipc_port_list_free(&dports); 136 kfree_skb(buf); 137 return -ENOMEM; 138 } 139 } 140 res = tipc_bclink_send_msg(buf); 141 if ((res < 0) && (dports.count != 0)) 142 kfree_skb(ibuf); 143 } else { 144 ibuf = buf; 145 } 146 147 if (res >= 0) { 148 if (ibuf) 149 tipc_port_recv_mcast(ibuf, &dports); 150 } else { 151 tipc_port_list_free(&dports); 152 } 153 return res; 154} 155 156/** 157 * tipc_port_recv_mcast - deliver multicast message to all destination ports 158 * 159 * If there is no port list, perform a lookup to create one 160 */ 161void tipc_port_recv_mcast(struct sk_buff *buf, struct tipc_port_list *dp) 162{ 163 struct tipc_msg *msg; 164 struct tipc_port_list dports = {0, NULL, }; 165 struct tipc_port_list *item = dp; 166 int cnt = 0; 167 168 msg = buf_msg(buf); 169 170 /* Create destination port list, if one wasn't supplied */ 171 if (dp == NULL) { 172 tipc_nametbl_mc_translate(msg_nametype(msg), 173 msg_namelower(msg), 174 msg_nameupper(msg), 175 TIPC_CLUSTER_SCOPE, 176 &dports); 177 item = dp = &dports; 178 } 179 180 /* Deliver a copy of message to each destination port */ 181 if (dp->count != 0) { 182 msg_set_destnode(msg, tipc_own_addr); 183 if (dp->count == 1) { 184 msg_set_destport(msg, dp->ports[0]); 185 tipc_port_recv_msg(buf); 186 tipc_port_list_free(dp); 187 return; 188 } 189 for (; cnt < dp->count; cnt++) { 190 int index = cnt % PLSIZE; 191 struct sk_buff *b = skb_clone(buf, GFP_ATOMIC); 192 193 if (b == NULL) { 194 pr_warn("Unable to deliver multicast message(s)\n"); 195 goto exit; 196 } 197 if ((index == 0) && (cnt != 0)) 198 item = item->next; 199 msg_set_destport(buf_msg(b), item->ports[index]); 200 tipc_port_recv_msg(b); 201 } 202 } 203exit: 204 kfree_skb(buf); 205 tipc_port_list_free(dp); 206} 207 208/** 209 * tipc_createport_raw - create a generic TIPC port 210 * 211 * Returns pointer to (locked) TIPC port, or NULL if unable to create it 212 */ 213struct tipc_port *tipc_createport_raw(void *usr_handle, 214 u32 (*dispatcher)(struct tipc_port *, struct sk_buff *), 215 void (*wakeup)(struct tipc_port *), 216 const u32 importance) 217{ 218 struct tipc_port *p_ptr; 219 struct tipc_msg *msg; 220 u32 ref; 221 222 p_ptr = kzalloc(sizeof(*p_ptr), GFP_ATOMIC); 223 if (!p_ptr) { 224 pr_warn("Port creation failed, no memory\n"); 225 return NULL; 226 } 227 ref = tipc_ref_acquire(p_ptr, &p_ptr->lock); 228 if (!ref) { 229 pr_warn("Port creation failed, ref. table exhausted\n"); 230 kfree(p_ptr); 231 return NULL; 232 } 233 234 p_ptr->usr_handle = usr_handle; 235 p_ptr->max_pkt = MAX_PKT_DEFAULT; 236 p_ptr->ref = ref; 237 INIT_LIST_HEAD(&p_ptr->wait_list); 238 INIT_LIST_HEAD(&p_ptr->subscription.nodesub_list); 239 p_ptr->dispatcher = dispatcher; 240 p_ptr->wakeup = wakeup; 241 p_ptr->user_port = NULL; 242 k_init_timer(&p_ptr->timer, (Handler)port_timeout, ref); 243 INIT_LIST_HEAD(&p_ptr->publications); 244 INIT_LIST_HEAD(&p_ptr->port_list); 245 246 /* 247 * Must hold port list lock while initializing message header template 248 * to ensure a change to node's own network address doesn't result 249 * in template containing out-dated network address information 250 */ 251 spin_lock_bh(&tipc_port_list_lock); 252 msg = &p_ptr->phdr; 253 tipc_msg_init(msg, importance, TIPC_NAMED_MSG, NAMED_H_SIZE, 0); 254 msg_set_origport(msg, ref); 255 list_add_tail(&p_ptr->port_list, &ports); 256 spin_unlock_bh(&tipc_port_list_lock); 257 return p_ptr; 258} 259 260int tipc_deleteport(u32 ref) 261{ 262 struct tipc_port *p_ptr; 263 struct sk_buff *buf = NULL; 264 265 tipc_withdraw(ref, 0, NULL); 266 p_ptr = tipc_port_lock(ref); 267 if (!p_ptr) 268 return -EINVAL; 269 270 tipc_ref_discard(ref); 271 tipc_port_unlock(p_ptr); 272 273 k_cancel_timer(&p_ptr->timer); 274 if (p_ptr->connected) { 275 buf = port_build_peer_abort_msg(p_ptr, TIPC_ERR_NO_PORT); 276 tipc_nodesub_unsubscribe(&p_ptr->subscription); 277 } 278 kfree(p_ptr->user_port); 279 280 spin_lock_bh(&tipc_port_list_lock); 281 list_del(&p_ptr->port_list); 282 list_del(&p_ptr->wait_list); 283 spin_unlock_bh(&tipc_port_list_lock); 284 k_term_timer(&p_ptr->timer); 285 kfree(p_ptr); 286 tipc_net_route_msg(buf); 287 return 0; 288} 289 290static int port_unreliable(struct tipc_port *p_ptr) 291{ 292 return msg_src_droppable(&p_ptr->phdr); 293} 294 295int tipc_portunreliable(u32 ref, unsigned int *isunreliable) 296{ 297 struct tipc_port *p_ptr; 298 299 p_ptr = tipc_port_lock(ref); 300 if (!p_ptr) 301 return -EINVAL; 302 *isunreliable = port_unreliable(p_ptr); 303 tipc_port_unlock(p_ptr); 304 return 0; 305} 306 307int tipc_set_portunreliable(u32 ref, unsigned int isunreliable) 308{ 309 struct tipc_port *p_ptr; 310 311 p_ptr = tipc_port_lock(ref); 312 if (!p_ptr) 313 return -EINVAL; 314 msg_set_src_droppable(&p_ptr->phdr, (isunreliable != 0)); 315 tipc_port_unlock(p_ptr); 316 return 0; 317} 318 319static int port_unreturnable(struct tipc_port *p_ptr) 320{ 321 return msg_dest_droppable(&p_ptr->phdr); 322} 323 324int tipc_portunreturnable(u32 ref, unsigned int *isunrejectable) 325{ 326 struct tipc_port *p_ptr; 327 328 p_ptr = tipc_port_lock(ref); 329 if (!p_ptr) 330 return -EINVAL; 331 *isunrejectable = port_unreturnable(p_ptr); 332 tipc_port_unlock(p_ptr); 333 return 0; 334} 335 336int tipc_set_portunreturnable(u32 ref, unsigned int isunrejectable) 337{ 338 struct tipc_port *p_ptr; 339 340 p_ptr = tipc_port_lock(ref); 341 if (!p_ptr) 342 return -EINVAL; 343 msg_set_dest_droppable(&p_ptr->phdr, (isunrejectable != 0)); 344 tipc_port_unlock(p_ptr); 345 return 0; 346} 347 348/* 349 * port_build_proto_msg(): create connection protocol message for port 350 * 351 * On entry the port must be locked and connected. 352 */ 353static struct sk_buff *port_build_proto_msg(struct tipc_port *p_ptr, 354 u32 type, u32 ack) 355{ 356 struct sk_buff *buf; 357 struct tipc_msg *msg; 358 359 buf = tipc_buf_acquire(INT_H_SIZE); 360 if (buf) { 361 msg = buf_msg(buf); 362 tipc_msg_init(msg, CONN_MANAGER, type, INT_H_SIZE, 363 port_peernode(p_ptr)); 364 msg_set_destport(msg, port_peerport(p_ptr)); 365 msg_set_origport(msg, p_ptr->ref); 366 msg_set_msgcnt(msg, ack); 367 } 368 return buf; 369} 370 371int tipc_reject_msg(struct sk_buff *buf, u32 err) 372{ 373 struct tipc_msg *msg = buf_msg(buf); 374 struct sk_buff *rbuf; 375 struct tipc_msg *rmsg; 376 int hdr_sz; 377 u32 imp; 378 u32 data_sz = msg_data_sz(msg); 379 u32 src_node; 380 u32 rmsg_sz; 381 382 /* discard rejected message if it shouldn't be returned to sender */ 383 if (WARN(!msg_isdata(msg), 384 "attempt to reject message with user=%u", msg_user(msg))) { 385 dump_stack(); 386 goto exit; 387 } 388 if (msg_errcode(msg) || msg_dest_droppable(msg)) 389 goto exit; 390 391 /* 392 * construct returned message by copying rejected message header and 393 * data (or subset), then updating header fields that need adjusting 394 */ 395 hdr_sz = msg_hdr_sz(msg); 396 rmsg_sz = hdr_sz + min_t(u32, data_sz, MAX_REJECT_SIZE); 397 398 rbuf = tipc_buf_acquire(rmsg_sz); 399 if (rbuf == NULL) 400 goto exit; 401 402 rmsg = buf_msg(rbuf); 403 skb_copy_to_linear_data(rbuf, msg, rmsg_sz); 404 405 if (msg_connected(rmsg)) { 406 imp = msg_importance(rmsg); 407 if (imp < TIPC_CRITICAL_IMPORTANCE) 408 msg_set_importance(rmsg, ++imp); 409 } 410 msg_set_non_seq(rmsg, 0); 411 msg_set_size(rmsg, rmsg_sz); 412 msg_set_errcode(rmsg, err); 413 msg_set_prevnode(rmsg, tipc_own_addr); 414 msg_swap_words(rmsg, 4, 5); 415 if (!msg_short(rmsg)) 416 msg_swap_words(rmsg, 6, 7); 417 418 /* send self-abort message when rejecting on a connected port */ 419 if (msg_connected(msg)) { 420 struct tipc_port *p_ptr = tipc_port_lock(msg_destport(msg)); 421 422 if (p_ptr) { 423 struct sk_buff *abuf = NULL; 424 425 if (p_ptr->connected) 426 abuf = port_build_self_abort_msg(p_ptr, err); 427 tipc_port_unlock(p_ptr); 428 tipc_net_route_msg(abuf); 429 } 430 } 431 432 /* send returned message & dispose of rejected message */ 433 src_node = msg_prevnode(msg); 434 if (in_own_node(src_node)) 435 tipc_port_recv_msg(rbuf); 436 else 437 tipc_link_send(rbuf, src_node, msg_link_selector(rmsg)); 438exit: 439 kfree_skb(buf); 440 return data_sz; 441} 442 443int tipc_port_reject_sections(struct tipc_port *p_ptr, struct tipc_msg *hdr, 444 struct iovec const *msg_sect, u32 num_sect, 445 unsigned int total_len, int err) 446{ 447 struct sk_buff *buf; 448 int res; 449 450 res = tipc_msg_build(hdr, msg_sect, num_sect, total_len, MAX_MSG_SIZE, 451 !p_ptr->user_port, &buf); 452 if (!buf) 453 return res; 454 455 return tipc_reject_msg(buf, err); 456} 457 458static void port_timeout(unsigned long ref) 459{ 460 struct tipc_port *p_ptr = tipc_port_lock(ref); 461 struct sk_buff *buf = NULL; 462 463 if (!p_ptr) 464 return; 465 466 if (!p_ptr->connected) { 467 tipc_port_unlock(p_ptr); 468 return; 469 } 470 471 /* Last probe answered ? */ 472 if (p_ptr->probing_state == PROBING) { 473 buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_PORT); 474 } else { 475 buf = port_build_proto_msg(p_ptr, CONN_PROBE, 0); 476 p_ptr->probing_state = PROBING; 477 k_start_timer(&p_ptr->timer, p_ptr->probing_interval); 478 } 479 tipc_port_unlock(p_ptr); 480 tipc_net_route_msg(buf); 481} 482 483 484static void port_handle_node_down(unsigned long ref) 485{ 486 struct tipc_port *p_ptr = tipc_port_lock(ref); 487 struct sk_buff *buf = NULL; 488 489 if (!p_ptr) 490 return; 491 buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_NODE); 492 tipc_port_unlock(p_ptr); 493 tipc_net_route_msg(buf); 494} 495 496 497static struct sk_buff *port_build_self_abort_msg(struct tipc_port *p_ptr, u32 err) 498{ 499 struct sk_buff *buf = port_build_peer_abort_msg(p_ptr, err); 500 501 if (buf) { 502 struct tipc_msg *msg = buf_msg(buf); 503 msg_swap_words(msg, 4, 5); 504 msg_swap_words(msg, 6, 7); 505 } 506 return buf; 507} 508 509 510static struct sk_buff *port_build_peer_abort_msg(struct tipc_port *p_ptr, u32 err) 511{ 512 struct sk_buff *buf; 513 struct tipc_msg *msg; 514 u32 imp; 515 516 if (!p_ptr->connected) 517 return NULL; 518 519 buf = tipc_buf_acquire(BASIC_H_SIZE); 520 if (buf) { 521 msg = buf_msg(buf); 522 memcpy(msg, &p_ptr->phdr, BASIC_H_SIZE); 523 msg_set_hdr_sz(msg, BASIC_H_SIZE); 524 msg_set_size(msg, BASIC_H_SIZE); 525 imp = msg_importance(msg); 526 if (imp < TIPC_CRITICAL_IMPORTANCE) 527 msg_set_importance(msg, ++imp); 528 msg_set_errcode(msg, err); 529 } 530 return buf; 531} 532 533void tipc_port_recv_proto_msg(struct sk_buff *buf) 534{ 535 struct tipc_msg *msg = buf_msg(buf); 536 struct tipc_port *p_ptr; 537 struct sk_buff *r_buf = NULL; 538 u32 destport = msg_destport(msg); 539 int wakeable; 540 541 /* Validate connection */ 542 p_ptr = tipc_port_lock(destport); 543 if (!p_ptr || !p_ptr->connected || !tipc_port_peer_msg(p_ptr, msg)) { 544 r_buf = tipc_buf_acquire(BASIC_H_SIZE); 545 if (r_buf) { 546 msg = buf_msg(r_buf); 547 tipc_msg_init(msg, TIPC_HIGH_IMPORTANCE, TIPC_CONN_MSG, 548 BASIC_H_SIZE, msg_orignode(msg)); 549 msg_set_errcode(msg, TIPC_ERR_NO_PORT); 550 msg_set_origport(msg, destport); 551 msg_set_destport(msg, msg_origport(msg)); 552 } 553 if (p_ptr) 554 tipc_port_unlock(p_ptr); 555 goto exit; 556 } 557 558 /* Process protocol message sent by peer */ 559 switch (msg_type(msg)) { 560 case CONN_ACK: 561 wakeable = tipc_port_congested(p_ptr) && p_ptr->congested && 562 p_ptr->wakeup; 563 p_ptr->acked += msg_msgcnt(msg); 564 if (!tipc_port_congested(p_ptr)) { 565 p_ptr->congested = 0; 566 if (wakeable) 567 p_ptr->wakeup(p_ptr); 568 } 569 break; 570 case CONN_PROBE: 571 r_buf = port_build_proto_msg(p_ptr, CONN_PROBE_REPLY, 0); 572 break; 573 default: 574 /* CONN_PROBE_REPLY or unrecognized - no action required */ 575 break; 576 } 577 p_ptr->probing_state = CONFIRMED; 578 tipc_port_unlock(p_ptr); 579exit: 580 tipc_net_route_msg(r_buf); 581 kfree_skb(buf); 582} 583 584static int port_print(struct tipc_port *p_ptr, char *buf, int len, int full_id) 585{ 586 struct publication *publ; 587 int ret; 588 589 if (full_id) 590 ret = tipc_snprintf(buf, len, "<%u.%u.%u:%u>:", 591 tipc_zone(tipc_own_addr), 592 tipc_cluster(tipc_own_addr), 593 tipc_node(tipc_own_addr), p_ptr->ref); 594 else 595 ret = tipc_snprintf(buf, len, "%-10u:", p_ptr->ref); 596 597 if (p_ptr->connected) { 598 u32 dport = port_peerport(p_ptr); 599 u32 destnode = port_peernode(p_ptr); 600 601 ret += tipc_snprintf(buf + ret, len - ret, 602 " connected to <%u.%u.%u:%u>", 603 tipc_zone(destnode), 604 tipc_cluster(destnode), 605 tipc_node(destnode), dport); 606 if (p_ptr->conn_type != 0) 607 ret += tipc_snprintf(buf + ret, len - ret, 608 " via {%u,%u}", p_ptr->conn_type, 609 p_ptr->conn_instance); 610 } else if (p_ptr->published) { 611 ret += tipc_snprintf(buf + ret, len - ret, " bound to"); 612 list_for_each_entry(publ, &p_ptr->publications, pport_list) { 613 if (publ->lower == publ->upper) 614 ret += tipc_snprintf(buf + ret, len - ret, 615 " {%u,%u}", publ->type, 616 publ->lower); 617 else 618 ret += tipc_snprintf(buf + ret, len - ret, 619 " {%u,%u,%u}", publ->type, 620 publ->lower, publ->upper); 621 } 622 } 623 ret += tipc_snprintf(buf + ret, len - ret, "\n"); 624 return ret; 625} 626 627struct sk_buff *tipc_port_get_ports(void) 628{ 629 struct sk_buff *buf; 630 struct tlv_desc *rep_tlv; 631 char *pb; 632 int pb_len; 633 struct tipc_port *p_ptr; 634 int str_len = 0; 635 636 buf = tipc_cfg_reply_alloc(TLV_SPACE(ULTRA_STRING_MAX_LEN)); 637 if (!buf) 638 return NULL; 639 rep_tlv = (struct tlv_desc *)buf->data; 640 pb = TLV_DATA(rep_tlv); 641 pb_len = ULTRA_STRING_MAX_LEN; 642 643 spin_lock_bh(&tipc_port_list_lock); 644 list_for_each_entry(p_ptr, &ports, port_list) { 645 spin_lock_bh(p_ptr->lock); 646 str_len += port_print(p_ptr, pb, pb_len, 0); 647 spin_unlock_bh(p_ptr->lock); 648 } 649 spin_unlock_bh(&tipc_port_list_lock); 650 str_len += 1; /* for "\0" */ 651 skb_put(buf, TLV_SPACE(str_len)); 652 TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len); 653 654 return buf; 655} 656 657void tipc_port_reinit(void) 658{ 659 struct tipc_port *p_ptr; 660 struct tipc_msg *msg; 661 662 spin_lock_bh(&tipc_port_list_lock); 663 list_for_each_entry(p_ptr, &ports, port_list) { 664 msg = &p_ptr->phdr; 665 msg_set_prevnode(msg, tipc_own_addr); 666 msg_set_orignode(msg, tipc_own_addr); 667 } 668 spin_unlock_bh(&tipc_port_list_lock); 669} 670 671 672/* 673 * port_dispatcher_sigh(): Signal handler for messages destinated 674 * to the tipc_port interface. 675 */ 676static void port_dispatcher_sigh(void *dummy) 677{ 678 struct sk_buff *buf; 679 680 spin_lock_bh(&queue_lock); 681 buf = msg_queue_head; 682 msg_queue_head = NULL; 683 spin_unlock_bh(&queue_lock); 684 685 while (buf) { 686 struct tipc_port *p_ptr; 687 struct user_port *up_ptr; 688 struct tipc_portid orig; 689 struct tipc_name_seq dseq; 690 void *usr_handle; 691 int connected; 692 int peer_invalid; 693 int published; 694 u32 message_type; 695 696 struct sk_buff *next = buf->next; 697 struct tipc_msg *msg = buf_msg(buf); 698 u32 dref = msg_destport(msg); 699 700 message_type = msg_type(msg); 701 if (message_type > TIPC_DIRECT_MSG) 702 goto reject; /* Unsupported message type */ 703 704 p_ptr = tipc_port_lock(dref); 705 if (!p_ptr) 706 goto reject; /* Port deleted while msg in queue */ 707 708 orig.ref = msg_origport(msg); 709 orig.node = msg_orignode(msg); 710 up_ptr = p_ptr->user_port; 711 usr_handle = up_ptr->usr_handle; 712 connected = p_ptr->connected; 713 peer_invalid = connected && !tipc_port_peer_msg(p_ptr, msg); 714 published = p_ptr->published; 715 716 if (unlikely(msg_errcode(msg))) 717 goto err; 718 719 switch (message_type) { 720 721 case TIPC_CONN_MSG:{ 722 tipc_conn_msg_event cb = up_ptr->conn_msg_cb; 723 u32 dsz; 724 725 tipc_port_unlock(p_ptr); 726 if (unlikely(!cb)) 727 goto reject; 728 if (unlikely(!connected)) { 729 if (tipc_connect2port(dref, &orig)) 730 goto reject; 731 } else if (peer_invalid) 732 goto reject; 733 dsz = msg_data_sz(msg); 734 if (unlikely(dsz && 735 (++p_ptr->conn_unacked >= 736 TIPC_FLOW_CONTROL_WIN))) 737 tipc_acknowledge(dref, 738 p_ptr->conn_unacked); 739 skb_pull(buf, msg_hdr_sz(msg)); 740 cb(usr_handle, dref, &buf, msg_data(msg), dsz); 741 break; 742 } 743 case TIPC_DIRECT_MSG:{ 744 tipc_msg_event cb = up_ptr->msg_cb; 745 746 tipc_port_unlock(p_ptr); 747 if (unlikely(!cb || connected)) 748 goto reject; 749 skb_pull(buf, msg_hdr_sz(msg)); 750 cb(usr_handle, dref, &buf, msg_data(msg), 751 msg_data_sz(msg), msg_importance(msg), 752 &orig); 753 break; 754 } 755 case TIPC_MCAST_MSG: 756 case TIPC_NAMED_MSG:{ 757 tipc_named_msg_event cb = up_ptr->named_msg_cb; 758 759 tipc_port_unlock(p_ptr); 760 if (unlikely(!cb || connected || !published)) 761 goto reject; 762 dseq.type = msg_nametype(msg); 763 dseq.lower = msg_nameinst(msg); 764 dseq.upper = (message_type == TIPC_NAMED_MSG) 765 ? dseq.lower : msg_nameupper(msg); 766 skb_pull(buf, msg_hdr_sz(msg)); 767 cb(usr_handle, dref, &buf, msg_data(msg), 768 msg_data_sz(msg), msg_importance(msg), 769 &orig, &dseq); 770 break; 771 } 772 } 773 if (buf) 774 kfree_skb(buf); 775 buf = next; 776 continue; 777err: 778 switch (message_type) { 779 780 case TIPC_CONN_MSG:{ 781 tipc_conn_shutdown_event cb = 782 up_ptr->conn_err_cb; 783 784 tipc_port_unlock(p_ptr); 785 if (!cb || !connected || peer_invalid) 786 break; 787 tipc_disconnect(dref); 788 skb_pull(buf, msg_hdr_sz(msg)); 789 cb(usr_handle, dref, &buf, msg_data(msg), 790 msg_data_sz(msg), msg_errcode(msg)); 791 break; 792 } 793 case TIPC_DIRECT_MSG:{ 794 tipc_msg_err_event cb = up_ptr->err_cb; 795 796 tipc_port_unlock(p_ptr); 797 if (!cb || connected) 798 break; 799 skb_pull(buf, msg_hdr_sz(msg)); 800 cb(usr_handle, dref, &buf, msg_data(msg), 801 msg_data_sz(msg), msg_errcode(msg), &orig); 802 break; 803 } 804 case TIPC_MCAST_MSG: 805 case TIPC_NAMED_MSG:{ 806 tipc_named_msg_err_event cb = 807 up_ptr->named_err_cb; 808 809 tipc_port_unlock(p_ptr); 810 if (!cb || connected) 811 break; 812 dseq.type = msg_nametype(msg); 813 dseq.lower = msg_nameinst(msg); 814 dseq.upper = (message_type == TIPC_NAMED_MSG) 815 ? dseq.lower : msg_nameupper(msg); 816 skb_pull(buf, msg_hdr_sz(msg)); 817 cb(usr_handle, dref, &buf, msg_data(msg), 818 msg_data_sz(msg), msg_errcode(msg), &dseq); 819 break; 820 } 821 } 822 if (buf) 823 kfree_skb(buf); 824 buf = next; 825 continue; 826reject: 827 tipc_reject_msg(buf, TIPC_ERR_NO_PORT); 828 buf = next; 829 } 830} 831 832/* 833 * port_dispatcher(): Dispatcher for messages destinated 834 * to the tipc_port interface. Called with port locked. 835 */ 836static u32 port_dispatcher(struct tipc_port *dummy, struct sk_buff *buf) 837{ 838 buf->next = NULL; 839 spin_lock_bh(&queue_lock); 840 if (msg_queue_head) { 841 msg_queue_tail->next = buf; 842 msg_queue_tail = buf; 843 } else { 844 msg_queue_tail = msg_queue_head = buf; 845 tipc_k_signal((Handler)port_dispatcher_sigh, 0); 846 } 847 spin_unlock_bh(&queue_lock); 848 return 0; 849} 850 851/* 852 * Wake up port after congestion: Called with port locked 853 */ 854static void port_wakeup_sh(unsigned long ref) 855{ 856 struct tipc_port *p_ptr; 857 struct user_port *up_ptr; 858 tipc_continue_event cb = NULL; 859 void *uh = NULL; 860 861 p_ptr = tipc_port_lock(ref); 862 if (p_ptr) { 863 up_ptr = p_ptr->user_port; 864 if (up_ptr) { 865 cb = up_ptr->continue_event_cb; 866 uh = up_ptr->usr_handle; 867 } 868 tipc_port_unlock(p_ptr); 869 } 870 if (cb) 871 cb(uh, ref); 872} 873 874 875static void port_wakeup(struct tipc_port *p_ptr) 876{ 877 tipc_k_signal((Handler)port_wakeup_sh, p_ptr->ref); 878} 879 880void tipc_acknowledge(u32 ref, u32 ack) 881{ 882 struct tipc_port *p_ptr; 883 struct sk_buff *buf = NULL; 884 885 p_ptr = tipc_port_lock(ref); 886 if (!p_ptr) 887 return; 888 if (p_ptr->connected) { 889 p_ptr->conn_unacked -= ack; 890 buf = port_build_proto_msg(p_ptr, CONN_ACK, ack); 891 } 892 tipc_port_unlock(p_ptr); 893 tipc_net_route_msg(buf); 894} 895 896/* 897 * tipc_createport(): user level call. 898 */ 899int tipc_createport(void *usr_handle, 900 unsigned int importance, 901 tipc_msg_err_event error_cb, 902 tipc_named_msg_err_event named_error_cb, 903 tipc_conn_shutdown_event conn_error_cb, 904 tipc_msg_event msg_cb, 905 tipc_named_msg_event named_msg_cb, 906 tipc_conn_msg_event conn_msg_cb, 907 tipc_continue_event continue_event_cb, /* May be zero */ 908 u32 *portref) 909{ 910 struct user_port *up_ptr; 911 struct tipc_port *p_ptr; 912 913 up_ptr = kmalloc(sizeof(*up_ptr), GFP_ATOMIC); 914 if (!up_ptr) { 915 pr_warn("Port creation failed, no memory\n"); 916 return -ENOMEM; 917 } 918 p_ptr = tipc_createport_raw(NULL, port_dispatcher, port_wakeup, 919 importance); 920 if (!p_ptr) { 921 kfree(up_ptr); 922 return -ENOMEM; 923 } 924 925 p_ptr->user_port = up_ptr; 926 up_ptr->usr_handle = usr_handle; 927 up_ptr->ref = p_ptr->ref; 928 up_ptr->err_cb = error_cb; 929 up_ptr->named_err_cb = named_error_cb; 930 up_ptr->conn_err_cb = conn_error_cb; 931 up_ptr->msg_cb = msg_cb; 932 up_ptr->named_msg_cb = named_msg_cb; 933 up_ptr->conn_msg_cb = conn_msg_cb; 934 up_ptr->continue_event_cb = continue_event_cb; 935 *portref = p_ptr->ref; 936 tipc_port_unlock(p_ptr); 937 return 0; 938} 939 940int tipc_portimportance(u32 ref, unsigned int *importance) 941{ 942 struct tipc_port *p_ptr; 943 944 p_ptr = tipc_port_lock(ref); 945 if (!p_ptr) 946 return -EINVAL; 947 *importance = (unsigned int)msg_importance(&p_ptr->phdr); 948 tipc_port_unlock(p_ptr); 949 return 0; 950} 951 952int tipc_set_portimportance(u32 ref, unsigned int imp) 953{ 954 struct tipc_port *p_ptr; 955 956 if (imp > TIPC_CRITICAL_IMPORTANCE) 957 return -EINVAL; 958 959 p_ptr = tipc_port_lock(ref); 960 if (!p_ptr) 961 return -EINVAL; 962 msg_set_importance(&p_ptr->phdr, (u32)imp); 963 tipc_port_unlock(p_ptr); 964 return 0; 965} 966 967 968int tipc_publish(u32 ref, unsigned int scope, struct tipc_name_seq const *seq) 969{ 970 struct tipc_port *p_ptr; 971 struct publication *publ; 972 u32 key; 973 int res = -EINVAL; 974 975 p_ptr = tipc_port_lock(ref); 976 if (!p_ptr) 977 return -EINVAL; 978 979 if (p_ptr->connected) 980 goto exit; 981 key = ref + p_ptr->pub_count + 1; 982 if (key == ref) { 983 res = -EADDRINUSE; 984 goto exit; 985 } 986 publ = tipc_nametbl_publish(seq->type, seq->lower, seq->upper, 987 scope, p_ptr->ref, key); 988 if (publ) { 989 list_add(&publ->pport_list, &p_ptr->publications); 990 p_ptr->pub_count++; 991 p_ptr->published = 1; 992 res = 0; 993 } 994exit: 995 tipc_port_unlock(p_ptr); 996 return res; 997} 998 999int tipc_withdraw(u32 ref, unsigned int scope, struct tipc_name_seq const *seq) 1000{ 1001 struct tipc_port *p_ptr; 1002 struct publication *publ; 1003 struct publication *tpubl; 1004 int res = -EINVAL; 1005 1006 p_ptr = tipc_port_lock(ref); 1007 if (!p_ptr) 1008 return -EINVAL; 1009 if (!seq) { 1010 list_for_each_entry_safe(publ, tpubl, 1011 &p_ptr->publications, pport_list) { 1012 tipc_nametbl_withdraw(publ->type, publ->lower, 1013 publ->ref, publ->key); 1014 } 1015 res = 0; 1016 } else { 1017 list_for_each_entry_safe(publ, tpubl, 1018 &p_ptr->publications, pport_list) { 1019 if (publ->scope != scope) 1020 continue; 1021 if (publ->type != seq->type) 1022 continue; 1023 if (publ->lower != seq->lower) 1024 continue; 1025 if (publ->upper != seq->upper) 1026 break; 1027 tipc_nametbl_withdraw(publ->type, publ->lower, 1028 publ->ref, publ->key); 1029 res = 0; 1030 break; 1031 } 1032 } 1033 if (list_empty(&p_ptr->publications)) 1034 p_ptr->published = 0; 1035 tipc_port_unlock(p_ptr); 1036 return res; 1037} 1038 1039int tipc_connect2port(u32 ref, struct tipc_portid const *peer) 1040{ 1041 struct tipc_port *p_ptr; 1042 struct tipc_msg *msg; 1043 int res = -EINVAL; 1044 1045 p_ptr = tipc_port_lock(ref); 1046 if (!p_ptr) 1047 return -EINVAL; 1048 if (p_ptr->published || p_ptr->connected) 1049 goto exit; 1050 if (!peer->ref) 1051 goto exit; 1052 1053 msg = &p_ptr->phdr; 1054 msg_set_destnode(msg, peer->node); 1055 msg_set_destport(msg, peer->ref); 1056 msg_set_type(msg, TIPC_CONN_MSG); 1057 msg_set_lookup_scope(msg, 0); 1058 msg_set_hdr_sz(msg, SHORT_H_SIZE); 1059 1060 p_ptr->probing_interval = PROBING_INTERVAL; 1061 p_ptr->probing_state = CONFIRMED; 1062 p_ptr->connected = 1; 1063 k_start_timer(&p_ptr->timer, p_ptr->probing_interval); 1064 1065 tipc_nodesub_subscribe(&p_ptr->subscription, peer->node, 1066 (void *)(unsigned long)ref, 1067 (net_ev_handler)port_handle_node_down); 1068 res = 0; 1069exit: 1070 tipc_port_unlock(p_ptr); 1071 p_ptr->max_pkt = tipc_link_get_max_pkt(peer->node, ref); 1072 return res; 1073} 1074 1075/** 1076 * tipc_disconnect_port - disconnect port from peer 1077 * 1078 * Port must be locked. 1079 */ 1080int tipc_disconnect_port(struct tipc_port *tp_ptr) 1081{ 1082 int res; 1083 1084 if (tp_ptr->connected) { 1085 tp_ptr->connected = 0; 1086 /* let timer expire on it's own to avoid deadlock! */ 1087 tipc_nodesub_unsubscribe(&tp_ptr->subscription); 1088 res = 0; 1089 } else { 1090 res = -ENOTCONN; 1091 } 1092 return res; 1093} 1094 1095/* 1096 * tipc_disconnect(): Disconnect port form peer. 1097 * This is a node local operation. 1098 */ 1099int tipc_disconnect(u32 ref) 1100{ 1101 struct tipc_port *p_ptr; 1102 int res; 1103 1104 p_ptr = tipc_port_lock(ref); 1105 if (!p_ptr) 1106 return -EINVAL; 1107 res = tipc_disconnect_port(p_ptr); 1108 tipc_port_unlock(p_ptr); 1109 return res; 1110} 1111 1112/* 1113 * tipc_shutdown(): Send a SHUTDOWN msg to peer and disconnect 1114 */ 1115int tipc_shutdown(u32 ref) 1116{ 1117 struct tipc_port *p_ptr; 1118 struct sk_buff *buf = NULL; 1119 1120 p_ptr = tipc_port_lock(ref); 1121 if (!p_ptr) 1122 return -EINVAL; 1123 1124 buf = port_build_peer_abort_msg(p_ptr, TIPC_CONN_SHUTDOWN); 1125 tipc_port_unlock(p_ptr); 1126 tipc_net_route_msg(buf); 1127 return tipc_disconnect(ref); 1128} 1129 1130/** 1131 * tipc_port_recv_msg - receive message from lower layer and deliver to port user 1132 */ 1133int tipc_port_recv_msg(struct sk_buff *buf) 1134{ 1135 struct tipc_port *p_ptr; 1136 struct tipc_msg *msg = buf_msg(buf); 1137 u32 destport = msg_destport(msg); 1138 u32 dsz = msg_data_sz(msg); 1139 u32 err; 1140 1141 /* forward unresolved named message */ 1142 if (unlikely(!destport)) { 1143 tipc_net_route_msg(buf); 1144 return dsz; 1145 } 1146 1147 /* validate destination & pass to port, otherwise reject message */ 1148 p_ptr = tipc_port_lock(destport); 1149 if (likely(p_ptr)) { 1150 err = p_ptr->dispatcher(p_ptr, buf); 1151 tipc_port_unlock(p_ptr); 1152 if (likely(!err)) 1153 return dsz; 1154 } else { 1155 err = TIPC_ERR_NO_PORT; 1156 } 1157 1158 return tipc_reject_msg(buf, err); 1159} 1160 1161/* 1162 * tipc_port_recv_sections(): Concatenate and deliver sectioned 1163 * message for this node. 1164 */ 1165static int tipc_port_recv_sections(struct tipc_port *sender, unsigned int num_sect, 1166 struct iovec const *msg_sect, 1167 unsigned int total_len) 1168{ 1169 struct sk_buff *buf; 1170 int res; 1171 1172 res = tipc_msg_build(&sender->phdr, msg_sect, num_sect, total_len, 1173 MAX_MSG_SIZE, !sender->user_port, &buf); 1174 if (likely(buf)) 1175 tipc_port_recv_msg(buf); 1176 return res; 1177} 1178 1179/** 1180 * tipc_send - send message sections on connection 1181 */ 1182int tipc_send(u32 ref, unsigned int num_sect, struct iovec const *msg_sect, 1183 unsigned int total_len) 1184{ 1185 struct tipc_port *p_ptr; 1186 u32 destnode; 1187 int res; 1188 1189 p_ptr = tipc_port_deref(ref); 1190 if (!p_ptr || !p_ptr->connected) 1191 return -EINVAL; 1192 1193 p_ptr->congested = 1; 1194 if (!tipc_port_congested(p_ptr)) { 1195 destnode = port_peernode(p_ptr); 1196 if (likely(!in_own_node(destnode))) 1197 res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect, 1198 total_len, destnode); 1199 else 1200 res = tipc_port_recv_sections(p_ptr, num_sect, msg_sect, 1201 total_len); 1202 1203 if (likely(res != -ELINKCONG)) { 1204 p_ptr->congested = 0; 1205 if (res > 0) 1206 p_ptr->sent++; 1207 return res; 1208 } 1209 } 1210 if (port_unreliable(p_ptr)) { 1211 p_ptr->congested = 0; 1212 return total_len; 1213 } 1214 return -ELINKCONG; 1215} 1216 1217/** 1218 * tipc_send2name - send message sections to port name 1219 */ 1220int tipc_send2name(u32 ref, struct tipc_name const *name, unsigned int domain, 1221 unsigned int num_sect, struct iovec const *msg_sect, 1222 unsigned int total_len) 1223{ 1224 struct tipc_port *p_ptr; 1225 struct tipc_msg *msg; 1226 u32 destnode = domain; 1227 u32 destport; 1228 int res; 1229 1230 p_ptr = tipc_port_deref(ref); 1231 if (!p_ptr || p_ptr->connected) 1232 return -EINVAL; 1233 1234 msg = &p_ptr->phdr; 1235 msg_set_type(msg, TIPC_NAMED_MSG); 1236 msg_set_hdr_sz(msg, NAMED_H_SIZE); 1237 msg_set_nametype(msg, name->type); 1238 msg_set_nameinst(msg, name->instance); 1239 msg_set_lookup_scope(msg, tipc_addr_scope(domain)); 1240 destport = tipc_nametbl_translate(name->type, name->instance, &destnode); 1241 msg_set_destnode(msg, destnode); 1242 msg_set_destport(msg, destport); 1243 1244 if (likely(destport || destnode)) { 1245 if (likely(in_own_node(destnode))) 1246 res = tipc_port_recv_sections(p_ptr, num_sect, 1247 msg_sect, total_len); 1248 else if (tipc_own_addr) 1249 res = tipc_link_send_sections_fast(p_ptr, msg_sect, 1250 num_sect, total_len, 1251 destnode); 1252 else 1253 res = tipc_port_reject_sections(p_ptr, msg, msg_sect, 1254 num_sect, total_len, 1255 TIPC_ERR_NO_NODE); 1256 if (likely(res != -ELINKCONG)) { 1257 if (res > 0) 1258 p_ptr->sent++; 1259 return res; 1260 } 1261 if (port_unreliable(p_ptr)) { 1262 return total_len; 1263 } 1264 return -ELINKCONG; 1265 } 1266 return tipc_port_reject_sections(p_ptr, msg, msg_sect, num_sect, 1267 total_len, TIPC_ERR_NO_NAME); 1268} 1269 1270/** 1271 * tipc_send2port - send message sections to port identity 1272 */ 1273int tipc_send2port(u32 ref, struct tipc_portid const *dest, 1274 unsigned int num_sect, struct iovec const *msg_sect, 1275 unsigned int total_len) 1276{ 1277 struct tipc_port *p_ptr; 1278 struct tipc_msg *msg; 1279 int res; 1280 1281 p_ptr = tipc_port_deref(ref); 1282 if (!p_ptr || p_ptr->connected) 1283 return -EINVAL; 1284 1285 msg = &p_ptr->phdr; 1286 msg_set_type(msg, TIPC_DIRECT_MSG); 1287 msg_set_lookup_scope(msg, 0); 1288 msg_set_destnode(msg, dest->node); 1289 msg_set_destport(msg, dest->ref); 1290 msg_set_hdr_sz(msg, BASIC_H_SIZE); 1291 1292 if (in_own_node(dest->node)) 1293 res = tipc_port_recv_sections(p_ptr, num_sect, msg_sect, 1294 total_len); 1295 else if (tipc_own_addr) 1296 res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect, 1297 total_len, dest->node); 1298 else 1299 res = tipc_port_reject_sections(p_ptr, msg, msg_sect, num_sect, 1300 total_len, TIPC_ERR_NO_NODE); 1301 if (likely(res != -ELINKCONG)) { 1302 if (res > 0) 1303 p_ptr->sent++; 1304 return res; 1305 } 1306 if (port_unreliable(p_ptr)) { 1307 return total_len; 1308 } 1309 return -ELINKCONG; 1310} 1311 1312/** 1313 * tipc_send_buf2port - send message buffer to port identity 1314 */ 1315int tipc_send_buf2port(u32 ref, struct tipc_portid const *dest, 1316 struct sk_buff *buf, unsigned int dsz) 1317{ 1318 struct tipc_port *p_ptr; 1319 struct tipc_msg *msg; 1320 int res; 1321 1322 p_ptr = (struct tipc_port *)tipc_ref_deref(ref); 1323 if (!p_ptr || p_ptr->connected) 1324 return -EINVAL; 1325 1326 msg = &p_ptr->phdr; 1327 msg_set_type(msg, TIPC_DIRECT_MSG); 1328 msg_set_destnode(msg, dest->node); 1329 msg_set_destport(msg, dest->ref); 1330 msg_set_hdr_sz(msg, BASIC_H_SIZE); 1331 msg_set_size(msg, BASIC_H_SIZE + dsz); 1332 if (skb_cow(buf, BASIC_H_SIZE)) 1333 return -ENOMEM; 1334 1335 skb_push(buf, BASIC_H_SIZE); 1336 skb_copy_to_linear_data(buf, msg, BASIC_H_SIZE); 1337 1338 if (in_own_node(dest->node)) 1339 res = tipc_port_recv_msg(buf); 1340 else 1341 res = tipc_send_buf_fast(buf, dest->node); 1342 if (likely(res != -ELINKCONG)) { 1343 if (res > 0) 1344 p_ptr->sent++; 1345 return res; 1346 } 1347 if (port_unreliable(p_ptr)) 1348 return dsz; 1349 return -ELINKCONG; 1350}