Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

at v2.6.18-rc6 1705 lines 42 kB view raw
1/* 2 * net/tipc/port.c: TIPC port code 3 * 4 * Copyright (c) 1992-2006, Ericsson AB 5 * Copyright (c) 2004-2005, Wind River Systems 6 * All rights reserved. 7 * 8 * Redistribution and use in source and binary forms, with or without 9 * modification, are permitted provided that the following conditions are met: 10 * 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in the 15 * documentation and/or other materials provided with the distribution. 16 * 3. Neither the names of the copyright holders nor the names of its 17 * contributors may be used to endorse or promote products derived from 18 * this software without specific prior written permission. 19 * 20 * Alternatively, this software may be distributed under the terms of the 21 * GNU General Public License ("GPL") version 2 as published by the Free 22 * Software Foundation. 23 * 24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 25 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 26 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 27 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 28 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 29 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 30 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 33 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 34 * POSSIBILITY OF SUCH DAMAGE. 35 */ 36 37#include "core.h" 38#include "config.h" 39#include "dbg.h" 40#include "port.h" 41#include "addr.h" 42#include "link.h" 43#include "node.h" 44#include "port.h" 45#include "name_table.h" 46#include "user_reg.h" 47#include "msg.h" 48#include "bcast.h" 49 50/* Connection management: */ 51#define PROBING_INTERVAL 3600000 /* [ms] => 1 h */ 52#define CONFIRMED 0 53#define PROBING 1 54 55#define MAX_REJECT_SIZE 1024 56 57static struct sk_buff *msg_queue_head = NULL; 58static struct sk_buff *msg_queue_tail = NULL; 59 60DEFINE_SPINLOCK(tipc_port_list_lock); 61static DEFINE_SPINLOCK(queue_lock); 62 63static LIST_HEAD(ports); 64static void port_handle_node_down(unsigned long ref); 65static struct sk_buff* port_build_self_abort_msg(struct port *,u32 err); 66static struct sk_buff* port_build_peer_abort_msg(struct port *,u32 err); 67static void port_timeout(unsigned long ref); 68 69 70static u32 port_peernode(struct port *p_ptr) 71{ 72 return msg_destnode(&p_ptr->publ.phdr); 73} 74 75static u32 port_peerport(struct port *p_ptr) 76{ 77 return msg_destport(&p_ptr->publ.phdr); 78} 79 80static u32 port_out_seqno(struct port *p_ptr) 81{ 82 return msg_transp_seqno(&p_ptr->publ.phdr); 83} 84 85static void port_incr_out_seqno(struct port *p_ptr) 86{ 87 struct tipc_msg *m = &p_ptr->publ.phdr; 88 89 if (likely(!msg_routed(m))) 90 return; 91 msg_set_transp_seqno(m, (msg_transp_seqno(m) + 1)); 92} 93 94/** 95 * tipc_multicast - send a multicast message to local and remote destinations 96 */ 97 98int tipc_multicast(u32 ref, struct tipc_name_seq const *seq, u32 domain, 99 u32 num_sect, struct iovec const *msg_sect) 100{ 101 struct tipc_msg *hdr; 102 struct sk_buff *buf; 103 struct sk_buff *ibuf = NULL; 104 struct port_list dports = {0, NULL, }; 105 struct port *oport = tipc_port_deref(ref); 106 int ext_targets; 107 int res; 108 109 if (unlikely(!oport)) 110 return -EINVAL; 111 112 /* Create multicast message */ 113 114 hdr = &oport->publ.phdr; 115 msg_set_type(hdr, TIPC_MCAST_MSG); 116 msg_set_nametype(hdr, seq->type); 117 msg_set_namelower(hdr, seq->lower); 118 msg_set_nameupper(hdr, seq->upper); 119 msg_set_hdr_sz(hdr, MCAST_H_SIZE); 120 res = msg_build(hdr, msg_sect, num_sect, MAX_MSG_SIZE, 121 !oport->user_port, &buf); 122 if (unlikely(!buf)) 123 return res; 124 125 /* Figure out where to send multicast message */ 126 127 ext_targets = tipc_nametbl_mc_translate(seq->type, seq->lower, seq->upper, 128 TIPC_NODE_SCOPE, &dports); 129 130 /* Send message to destinations (duplicate it only if necessary) */ 131 132 if (ext_targets) { 133 if (dports.count != 0) { 134 ibuf = skb_copy(buf, GFP_ATOMIC); 135 if (ibuf == NULL) { 136 tipc_port_list_free(&dports); 137 buf_discard(buf); 138 return -ENOMEM; 139 } 140 } 141 res = tipc_bclink_send_msg(buf); 142 if ((res < 0) && (dports.count != 0)) { 143 buf_discard(ibuf); 144 } 145 } else { 146 ibuf = buf; 147 } 148 149 if (res >= 0) { 150 if (ibuf) 151 tipc_port_recv_mcast(ibuf, &dports); 152 } else { 153 tipc_port_list_free(&dports); 154 } 155 return res; 156} 157 158/** 159 * tipc_port_recv_mcast - deliver multicast message to all destination ports 160 * 161 * If there is no port list, perform a lookup to create one 162 */ 163 164void tipc_port_recv_mcast(struct sk_buff *buf, struct port_list *dp) 165{ 166 struct tipc_msg* msg; 167 struct port_list dports = {0, NULL, }; 168 struct port_list *item = dp; 169 int cnt = 0; 170 171 msg = buf_msg(buf); 172 173 /* Create destination port list, if one wasn't supplied */ 174 175 if (dp == NULL) { 176 tipc_nametbl_mc_translate(msg_nametype(msg), 177 msg_namelower(msg), 178 msg_nameupper(msg), 179 TIPC_CLUSTER_SCOPE, 180 &dports); 181 item = dp = &dports; 182 } 183 184 /* Deliver a copy of message to each destination port */ 185 186 if (dp->count != 0) { 187 if (dp->count == 1) { 188 msg_set_destport(msg, dp->ports[0]); 189 tipc_port_recv_msg(buf); 190 tipc_port_list_free(dp); 191 return; 192 } 193 for (; cnt < dp->count; cnt++) { 194 int index = cnt % PLSIZE; 195 struct sk_buff *b = skb_clone(buf, GFP_ATOMIC); 196 197 if (b == NULL) { 198 warn("Unable to deliver multicast message(s)\n"); 199 msg_dbg(msg, "LOST:"); 200 goto exit; 201 } 202 if ((index == 0) && (cnt != 0)) { 203 item = item->next; 204 } 205 msg_set_destport(buf_msg(b),item->ports[index]); 206 tipc_port_recv_msg(b); 207 } 208 } 209exit: 210 buf_discard(buf); 211 tipc_port_list_free(dp); 212} 213 214/** 215 * tipc_createport_raw - create a native TIPC port 216 * 217 * Returns local port reference 218 */ 219 220u32 tipc_createport_raw(void *usr_handle, 221 u32 (*dispatcher)(struct tipc_port *, struct sk_buff *), 222 void (*wakeup)(struct tipc_port *), 223 const u32 importance) 224{ 225 struct port *p_ptr; 226 struct tipc_msg *msg; 227 u32 ref; 228 229 p_ptr = kzalloc(sizeof(*p_ptr), GFP_ATOMIC); 230 if (!p_ptr) { 231 warn("Port creation failed, no memory\n"); 232 return 0; 233 } 234 ref = tipc_ref_acquire(p_ptr, &p_ptr->publ.lock); 235 if (!ref) { 236 warn("Port creation failed, reference table exhausted\n"); 237 kfree(p_ptr); 238 return 0; 239 } 240 241 tipc_port_lock(ref); 242 p_ptr->publ.ref = ref; 243 msg = &p_ptr->publ.phdr; 244 msg_init(msg, DATA_LOW, TIPC_NAMED_MSG, TIPC_OK, LONG_H_SIZE, 0); 245 msg_set_orignode(msg, tipc_own_addr); 246 msg_set_prevnode(msg, tipc_own_addr); 247 msg_set_origport(msg, ref); 248 msg_set_importance(msg,importance); 249 p_ptr->last_in_seqno = 41; 250 p_ptr->sent = 1; 251 p_ptr->publ.usr_handle = usr_handle; 252 INIT_LIST_HEAD(&p_ptr->wait_list); 253 INIT_LIST_HEAD(&p_ptr->subscription.nodesub_list); 254 p_ptr->congested_link = NULL; 255 p_ptr->max_pkt = MAX_PKT_DEFAULT; 256 p_ptr->dispatcher = dispatcher; 257 p_ptr->wakeup = wakeup; 258 p_ptr->user_port = NULL; 259 k_init_timer(&p_ptr->timer, (Handler)port_timeout, ref); 260 spin_lock_bh(&tipc_port_list_lock); 261 INIT_LIST_HEAD(&p_ptr->publications); 262 INIT_LIST_HEAD(&p_ptr->port_list); 263 list_add_tail(&p_ptr->port_list, &ports); 264 spin_unlock_bh(&tipc_port_list_lock); 265 tipc_port_unlock(p_ptr); 266 return ref; 267} 268 269int tipc_deleteport(u32 ref) 270{ 271 struct port *p_ptr; 272 struct sk_buff *buf = NULL; 273 274 tipc_withdraw(ref, 0, NULL); 275 p_ptr = tipc_port_lock(ref); 276 if (!p_ptr) 277 return -EINVAL; 278 279 tipc_ref_discard(ref); 280 tipc_port_unlock(p_ptr); 281 282 k_cancel_timer(&p_ptr->timer); 283 if (p_ptr->publ.connected) { 284 buf = port_build_peer_abort_msg(p_ptr, TIPC_ERR_NO_PORT); 285 tipc_nodesub_unsubscribe(&p_ptr->subscription); 286 } 287 if (p_ptr->user_port) { 288 tipc_reg_remove_port(p_ptr->user_port); 289 kfree(p_ptr->user_port); 290 } 291 292 spin_lock_bh(&tipc_port_list_lock); 293 list_del(&p_ptr->port_list); 294 list_del(&p_ptr->wait_list); 295 spin_unlock_bh(&tipc_port_list_lock); 296 k_term_timer(&p_ptr->timer); 297 kfree(p_ptr); 298 dbg("Deleted port %u\n", ref); 299 tipc_net_route_msg(buf); 300 return TIPC_OK; 301} 302 303/** 304 * tipc_get_port() - return port associated with 'ref' 305 * 306 * Note: Port is not locked. 307 */ 308 309struct tipc_port *tipc_get_port(const u32 ref) 310{ 311 return (struct tipc_port *)tipc_ref_deref(ref); 312} 313 314/** 315 * tipc_get_handle - return user handle associated to port 'ref' 316 */ 317 318void *tipc_get_handle(const u32 ref) 319{ 320 struct port *p_ptr; 321 void * handle; 322 323 p_ptr = tipc_port_lock(ref); 324 if (!p_ptr) 325 return NULL; 326 handle = p_ptr->publ.usr_handle; 327 tipc_port_unlock(p_ptr); 328 return handle; 329} 330 331static int port_unreliable(struct port *p_ptr) 332{ 333 return msg_src_droppable(&p_ptr->publ.phdr); 334} 335 336int tipc_portunreliable(u32 ref, unsigned int *isunreliable) 337{ 338 struct port *p_ptr; 339 340 p_ptr = tipc_port_lock(ref); 341 if (!p_ptr) 342 return -EINVAL; 343 *isunreliable = port_unreliable(p_ptr); 344 spin_unlock_bh(p_ptr->publ.lock); 345 return TIPC_OK; 346} 347 348int tipc_set_portunreliable(u32 ref, unsigned int isunreliable) 349{ 350 struct port *p_ptr; 351 352 p_ptr = tipc_port_lock(ref); 353 if (!p_ptr) 354 return -EINVAL; 355 msg_set_src_droppable(&p_ptr->publ.phdr, (isunreliable != 0)); 356 tipc_port_unlock(p_ptr); 357 return TIPC_OK; 358} 359 360static int port_unreturnable(struct port *p_ptr) 361{ 362 return msg_dest_droppable(&p_ptr->publ.phdr); 363} 364 365int tipc_portunreturnable(u32 ref, unsigned int *isunrejectable) 366{ 367 struct port *p_ptr; 368 369 p_ptr = tipc_port_lock(ref); 370 if (!p_ptr) 371 return -EINVAL; 372 *isunrejectable = port_unreturnable(p_ptr); 373 spin_unlock_bh(p_ptr->publ.lock); 374 return TIPC_OK; 375} 376 377int tipc_set_portunreturnable(u32 ref, unsigned int isunrejectable) 378{ 379 struct port *p_ptr; 380 381 p_ptr = tipc_port_lock(ref); 382 if (!p_ptr) 383 return -EINVAL; 384 msg_set_dest_droppable(&p_ptr->publ.phdr, (isunrejectable != 0)); 385 tipc_port_unlock(p_ptr); 386 return TIPC_OK; 387} 388 389/* 390 * port_build_proto_msg(): build a port level protocol 391 * or a connection abortion message. Called with 392 * tipc_port lock on. 393 */ 394static struct sk_buff *port_build_proto_msg(u32 destport, u32 destnode, 395 u32 origport, u32 orignode, 396 u32 usr, u32 type, u32 err, 397 u32 seqno, u32 ack) 398{ 399 struct sk_buff *buf; 400 struct tipc_msg *msg; 401 402 buf = buf_acquire(LONG_H_SIZE); 403 if (buf) { 404 msg = buf_msg(buf); 405 msg_init(msg, usr, type, err, LONG_H_SIZE, destnode); 406 msg_set_destport(msg, destport); 407 msg_set_origport(msg, origport); 408 msg_set_destnode(msg, destnode); 409 msg_set_orignode(msg, orignode); 410 msg_set_transp_seqno(msg, seqno); 411 msg_set_msgcnt(msg, ack); 412 msg_dbg(msg, "PORT>SEND>:"); 413 } 414 return buf; 415} 416 417int tipc_set_msg_option(struct tipc_port *tp_ptr, const char *opt, const u32 sz) 418{ 419 msg_expand(&tp_ptr->phdr, msg_destnode(&tp_ptr->phdr)); 420 msg_set_options(&tp_ptr->phdr, opt, sz); 421 return TIPC_OK; 422} 423 424int tipc_reject_msg(struct sk_buff *buf, u32 err) 425{ 426 struct tipc_msg *msg = buf_msg(buf); 427 struct sk_buff *rbuf; 428 struct tipc_msg *rmsg; 429 int hdr_sz; 430 u32 imp = msg_importance(msg); 431 u32 data_sz = msg_data_sz(msg); 432 433 if (data_sz > MAX_REJECT_SIZE) 434 data_sz = MAX_REJECT_SIZE; 435 if (msg_connected(msg) && (imp < TIPC_CRITICAL_IMPORTANCE)) 436 imp++; 437 msg_dbg(msg, "port->rej: "); 438 439 /* discard rejected message if it shouldn't be returned to sender */ 440 if (msg_errcode(msg) || msg_dest_droppable(msg)) { 441 buf_discard(buf); 442 return data_sz; 443 } 444 445 /* construct rejected message */ 446 if (msg_mcast(msg)) 447 hdr_sz = MCAST_H_SIZE; 448 else 449 hdr_sz = LONG_H_SIZE; 450 rbuf = buf_acquire(data_sz + hdr_sz); 451 if (rbuf == NULL) { 452 buf_discard(buf); 453 return data_sz; 454 } 455 rmsg = buf_msg(rbuf); 456 msg_init(rmsg, imp, msg_type(msg), err, hdr_sz, msg_orignode(msg)); 457 msg_set_destport(rmsg, msg_origport(msg)); 458 msg_set_prevnode(rmsg, tipc_own_addr); 459 msg_set_origport(rmsg, msg_destport(msg)); 460 if (msg_short(msg)) 461 msg_set_orignode(rmsg, tipc_own_addr); 462 else 463 msg_set_orignode(rmsg, msg_destnode(msg)); 464 msg_set_size(rmsg, data_sz + hdr_sz); 465 msg_set_nametype(rmsg, msg_nametype(msg)); 466 msg_set_nameinst(rmsg, msg_nameinst(msg)); 467 memcpy(rbuf->data + hdr_sz, msg_data(msg), data_sz); 468 469 /* send self-abort message when rejecting on a connected port */ 470 if (msg_connected(msg)) { 471 struct sk_buff *abuf = NULL; 472 struct port *p_ptr = tipc_port_lock(msg_destport(msg)); 473 474 if (p_ptr) { 475 if (p_ptr->publ.connected) 476 abuf = port_build_self_abort_msg(p_ptr, err); 477 tipc_port_unlock(p_ptr); 478 } 479 tipc_net_route_msg(abuf); 480 } 481 482 /* send rejected message */ 483 buf_discard(buf); 484 tipc_net_route_msg(rbuf); 485 return data_sz; 486} 487 488int tipc_port_reject_sections(struct port *p_ptr, struct tipc_msg *hdr, 489 struct iovec const *msg_sect, u32 num_sect, 490 int err) 491{ 492 struct sk_buff *buf; 493 int res; 494 495 res = msg_build(hdr, msg_sect, num_sect, MAX_MSG_SIZE, 496 !p_ptr->user_port, &buf); 497 if (!buf) 498 return res; 499 500 return tipc_reject_msg(buf, err); 501} 502 503static void port_timeout(unsigned long ref) 504{ 505 struct port *p_ptr = tipc_port_lock(ref); 506 struct sk_buff *buf = NULL; 507 508 if (!p_ptr || !p_ptr->publ.connected) 509 return; 510 511 /* Last probe answered ? */ 512 if (p_ptr->probing_state == PROBING) { 513 buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_PORT); 514 } else { 515 buf = port_build_proto_msg(port_peerport(p_ptr), 516 port_peernode(p_ptr), 517 p_ptr->publ.ref, 518 tipc_own_addr, 519 CONN_MANAGER, 520 CONN_PROBE, 521 TIPC_OK, 522 port_out_seqno(p_ptr), 523 0); 524 port_incr_out_seqno(p_ptr); 525 p_ptr->probing_state = PROBING; 526 k_start_timer(&p_ptr->timer, p_ptr->probing_interval); 527 } 528 tipc_port_unlock(p_ptr); 529 tipc_net_route_msg(buf); 530} 531 532 533static void port_handle_node_down(unsigned long ref) 534{ 535 struct port *p_ptr = tipc_port_lock(ref); 536 struct sk_buff* buf = NULL; 537 538 if (!p_ptr) 539 return; 540 buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_NODE); 541 tipc_port_unlock(p_ptr); 542 tipc_net_route_msg(buf); 543} 544 545 546static struct sk_buff *port_build_self_abort_msg(struct port *p_ptr, u32 err) 547{ 548 u32 imp = msg_importance(&p_ptr->publ.phdr); 549 550 if (!p_ptr->publ.connected) 551 return NULL; 552 if (imp < TIPC_CRITICAL_IMPORTANCE) 553 imp++; 554 return port_build_proto_msg(p_ptr->publ.ref, 555 tipc_own_addr, 556 port_peerport(p_ptr), 557 port_peernode(p_ptr), 558 imp, 559 TIPC_CONN_MSG, 560 err, 561 p_ptr->last_in_seqno + 1, 562 0); 563} 564 565 566static struct sk_buff *port_build_peer_abort_msg(struct port *p_ptr, u32 err) 567{ 568 u32 imp = msg_importance(&p_ptr->publ.phdr); 569 570 if (!p_ptr->publ.connected) 571 return NULL; 572 if (imp < TIPC_CRITICAL_IMPORTANCE) 573 imp++; 574 return port_build_proto_msg(port_peerport(p_ptr), 575 port_peernode(p_ptr), 576 p_ptr->publ.ref, 577 tipc_own_addr, 578 imp, 579 TIPC_CONN_MSG, 580 err, 581 port_out_seqno(p_ptr), 582 0); 583} 584 585void tipc_port_recv_proto_msg(struct sk_buff *buf) 586{ 587 struct tipc_msg *msg = buf_msg(buf); 588 struct port *p_ptr = tipc_port_lock(msg_destport(msg)); 589 u32 err = TIPC_OK; 590 struct sk_buff *r_buf = NULL; 591 struct sk_buff *abort_buf = NULL; 592 593 msg_dbg(msg, "PORT<RECV<:"); 594 595 if (!p_ptr) { 596 err = TIPC_ERR_NO_PORT; 597 } else if (p_ptr->publ.connected) { 598 if (port_peernode(p_ptr) != msg_orignode(msg)) 599 err = TIPC_ERR_NO_PORT; 600 if (port_peerport(p_ptr) != msg_origport(msg)) 601 err = TIPC_ERR_NO_PORT; 602 if (!err && msg_routed(msg)) { 603 u32 seqno = msg_transp_seqno(msg); 604 u32 myno = ++p_ptr->last_in_seqno; 605 if (seqno != myno) { 606 err = TIPC_ERR_NO_PORT; 607 abort_buf = port_build_self_abort_msg(p_ptr, err); 608 } 609 } 610 if (msg_type(msg) == CONN_ACK) { 611 int wakeup = tipc_port_congested(p_ptr) && 612 p_ptr->publ.congested && 613 p_ptr->wakeup; 614 p_ptr->acked += msg_msgcnt(msg); 615 if (tipc_port_congested(p_ptr)) 616 goto exit; 617 p_ptr->publ.congested = 0; 618 if (!wakeup) 619 goto exit; 620 p_ptr->wakeup(&p_ptr->publ); 621 goto exit; 622 } 623 } else if (p_ptr->publ.published) { 624 err = TIPC_ERR_NO_PORT; 625 } 626 if (err) { 627 r_buf = port_build_proto_msg(msg_origport(msg), 628 msg_orignode(msg), 629 msg_destport(msg), 630 tipc_own_addr, 631 DATA_HIGH, 632 TIPC_CONN_MSG, 633 err, 634 0, 635 0); 636 goto exit; 637 } 638 639 /* All is fine */ 640 if (msg_type(msg) == CONN_PROBE) { 641 r_buf = port_build_proto_msg(msg_origport(msg), 642 msg_orignode(msg), 643 msg_destport(msg), 644 tipc_own_addr, 645 CONN_MANAGER, 646 CONN_PROBE_REPLY, 647 TIPC_OK, 648 port_out_seqno(p_ptr), 649 0); 650 } 651 p_ptr->probing_state = CONFIRMED; 652 port_incr_out_seqno(p_ptr); 653exit: 654 if (p_ptr) 655 tipc_port_unlock(p_ptr); 656 tipc_net_route_msg(r_buf); 657 tipc_net_route_msg(abort_buf); 658 buf_discard(buf); 659} 660 661static void port_print(struct port *p_ptr, struct print_buf *buf, int full_id) 662{ 663 struct publication *publ; 664 665 if (full_id) 666 tipc_printf(buf, "<%u.%u.%u:%u>:", 667 tipc_zone(tipc_own_addr), tipc_cluster(tipc_own_addr), 668 tipc_node(tipc_own_addr), p_ptr->publ.ref); 669 else 670 tipc_printf(buf, "%-10u:", p_ptr->publ.ref); 671 672 if (p_ptr->publ.connected) { 673 u32 dport = port_peerport(p_ptr); 674 u32 destnode = port_peernode(p_ptr); 675 676 tipc_printf(buf, " connected to <%u.%u.%u:%u>", 677 tipc_zone(destnode), tipc_cluster(destnode), 678 tipc_node(destnode), dport); 679 if (p_ptr->publ.conn_type != 0) 680 tipc_printf(buf, " via {%u,%u}", 681 p_ptr->publ.conn_type, 682 p_ptr->publ.conn_instance); 683 } 684 else if (p_ptr->publ.published) { 685 tipc_printf(buf, " bound to"); 686 list_for_each_entry(publ, &p_ptr->publications, pport_list) { 687 if (publ->lower == publ->upper) 688 tipc_printf(buf, " {%u,%u}", publ->type, 689 publ->lower); 690 else 691 tipc_printf(buf, " {%u,%u,%u}", publ->type, 692 publ->lower, publ->upper); 693 } 694 } 695 tipc_printf(buf, "\n"); 696} 697 698#define MAX_PORT_QUERY 32768 699 700struct sk_buff *tipc_port_get_ports(void) 701{ 702 struct sk_buff *buf; 703 struct tlv_desc *rep_tlv; 704 struct print_buf pb; 705 struct port *p_ptr; 706 int str_len; 707 708 buf = tipc_cfg_reply_alloc(TLV_SPACE(MAX_PORT_QUERY)); 709 if (!buf) 710 return NULL; 711 rep_tlv = (struct tlv_desc *)buf->data; 712 713 tipc_printbuf_init(&pb, TLV_DATA(rep_tlv), MAX_PORT_QUERY); 714 spin_lock_bh(&tipc_port_list_lock); 715 list_for_each_entry(p_ptr, &ports, port_list) { 716 spin_lock_bh(p_ptr->publ.lock); 717 port_print(p_ptr, &pb, 0); 718 spin_unlock_bh(p_ptr->publ.lock); 719 } 720 spin_unlock_bh(&tipc_port_list_lock); 721 str_len = tipc_printbuf_validate(&pb); 722 723 skb_put(buf, TLV_SPACE(str_len)); 724 TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len); 725 726 return buf; 727} 728 729#if 0 730 731#define MAX_PORT_STATS 2000 732 733struct sk_buff *port_show_stats(const void *req_tlv_area, int req_tlv_space) 734{ 735 u32 ref; 736 struct port *p_ptr; 737 struct sk_buff *buf; 738 struct tlv_desc *rep_tlv; 739 struct print_buf pb; 740 int str_len; 741 742 if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_PORT_REF)) 743 return cfg_reply_error_string(TIPC_CFG_TLV_ERROR); 744 745 ref = *(u32 *)TLV_DATA(req_tlv_area); 746 ref = ntohl(ref); 747 748 p_ptr = tipc_port_lock(ref); 749 if (!p_ptr) 750 return cfg_reply_error_string("port not found"); 751 752 buf = tipc_cfg_reply_alloc(TLV_SPACE(MAX_PORT_STATS)); 753 if (!buf) { 754 tipc_port_unlock(p_ptr); 755 return NULL; 756 } 757 rep_tlv = (struct tlv_desc *)buf->data; 758 759 tipc_printbuf_init(&pb, TLV_DATA(rep_tlv), MAX_PORT_STATS); 760 port_print(p_ptr, &pb, 1); 761 /* NEED TO FILL IN ADDITIONAL PORT STATISTICS HERE */ 762 tipc_port_unlock(p_ptr); 763 str_len = tipc_printbuf_validate(&pb); 764 765 skb_put(buf, TLV_SPACE(str_len)); 766 TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len); 767 768 return buf; 769} 770 771#endif 772 773void tipc_port_reinit(void) 774{ 775 struct port *p_ptr; 776 struct tipc_msg *msg; 777 778 spin_lock_bh(&tipc_port_list_lock); 779 list_for_each_entry(p_ptr, &ports, port_list) { 780 msg = &p_ptr->publ.phdr; 781 if (msg_orignode(msg) == tipc_own_addr) 782 break; 783 msg_set_orignode(msg, tipc_own_addr); 784 } 785 spin_unlock_bh(&tipc_port_list_lock); 786} 787 788 789/* 790 * port_dispatcher_sigh(): Signal handler for messages destinated 791 * to the tipc_port interface. 792 */ 793 794static void port_dispatcher_sigh(void *dummy) 795{ 796 struct sk_buff *buf; 797 798 spin_lock_bh(&queue_lock); 799 buf = msg_queue_head; 800 msg_queue_head = NULL; 801 spin_unlock_bh(&queue_lock); 802 803 while (buf) { 804 struct port *p_ptr; 805 struct user_port *up_ptr; 806 struct tipc_portid orig; 807 struct tipc_name_seq dseq; 808 void *usr_handle; 809 int connected; 810 int published; 811 u32 message_type; 812 813 struct sk_buff *next = buf->next; 814 struct tipc_msg *msg = buf_msg(buf); 815 u32 dref = msg_destport(msg); 816 817 message_type = msg_type(msg); 818 if (message_type > TIPC_DIRECT_MSG) 819 goto reject; /* Unsupported message type */ 820 821 p_ptr = tipc_port_lock(dref); 822 if (!p_ptr) 823 goto reject; /* Port deleted while msg in queue */ 824 825 orig.ref = msg_origport(msg); 826 orig.node = msg_orignode(msg); 827 up_ptr = p_ptr->user_port; 828 usr_handle = up_ptr->usr_handle; 829 connected = p_ptr->publ.connected; 830 published = p_ptr->publ.published; 831 832 if (unlikely(msg_errcode(msg))) 833 goto err; 834 835 switch (message_type) { 836 837 case TIPC_CONN_MSG:{ 838 tipc_conn_msg_event cb = up_ptr->conn_msg_cb; 839 u32 peer_port = port_peerport(p_ptr); 840 u32 peer_node = port_peernode(p_ptr); 841 842 spin_unlock_bh(p_ptr->publ.lock); 843 if (unlikely(!connected)) { 844 if (unlikely(published)) 845 goto reject; 846 tipc_connect2port(dref,&orig); 847 } 848 if (unlikely(msg_origport(msg) != peer_port)) 849 goto reject; 850 if (unlikely(msg_orignode(msg) != peer_node)) 851 goto reject; 852 if (unlikely(!cb)) 853 goto reject; 854 if (unlikely(++p_ptr->publ.conn_unacked >= 855 TIPC_FLOW_CONTROL_WIN)) 856 tipc_acknowledge(dref, 857 p_ptr->publ.conn_unacked); 858 skb_pull(buf, msg_hdr_sz(msg)); 859 cb(usr_handle, dref, &buf, msg_data(msg), 860 msg_data_sz(msg)); 861 break; 862 } 863 case TIPC_DIRECT_MSG:{ 864 tipc_msg_event cb = up_ptr->msg_cb; 865 866 spin_unlock_bh(p_ptr->publ.lock); 867 if (unlikely(connected)) 868 goto reject; 869 if (unlikely(!cb)) 870 goto reject; 871 skb_pull(buf, msg_hdr_sz(msg)); 872 cb(usr_handle, dref, &buf, msg_data(msg), 873 msg_data_sz(msg), msg_importance(msg), 874 &orig); 875 break; 876 } 877 case TIPC_MCAST_MSG: 878 case TIPC_NAMED_MSG:{ 879 tipc_named_msg_event cb = up_ptr->named_msg_cb; 880 881 spin_unlock_bh(p_ptr->publ.lock); 882 if (unlikely(connected)) 883 goto reject; 884 if (unlikely(!cb)) 885 goto reject; 886 if (unlikely(!published)) 887 goto reject; 888 dseq.type = msg_nametype(msg); 889 dseq.lower = msg_nameinst(msg); 890 dseq.upper = (message_type == TIPC_NAMED_MSG) 891 ? dseq.lower : msg_nameupper(msg); 892 skb_pull(buf, msg_hdr_sz(msg)); 893 cb(usr_handle, dref, &buf, msg_data(msg), 894 msg_data_sz(msg), msg_importance(msg), 895 &orig, &dseq); 896 break; 897 } 898 } 899 if (buf) 900 buf_discard(buf); 901 buf = next; 902 continue; 903err: 904 switch (message_type) { 905 906 case TIPC_CONN_MSG:{ 907 tipc_conn_shutdown_event cb = 908 up_ptr->conn_err_cb; 909 u32 peer_port = port_peerport(p_ptr); 910 u32 peer_node = port_peernode(p_ptr); 911 912 spin_unlock_bh(p_ptr->publ.lock); 913 if (!connected || !cb) 914 break; 915 if (msg_origport(msg) != peer_port) 916 break; 917 if (msg_orignode(msg) != peer_node) 918 break; 919 tipc_disconnect(dref); 920 skb_pull(buf, msg_hdr_sz(msg)); 921 cb(usr_handle, dref, &buf, msg_data(msg), 922 msg_data_sz(msg), msg_errcode(msg)); 923 break; 924 } 925 case TIPC_DIRECT_MSG:{ 926 tipc_msg_err_event cb = up_ptr->err_cb; 927 928 spin_unlock_bh(p_ptr->publ.lock); 929 if (connected || !cb) 930 break; 931 skb_pull(buf, msg_hdr_sz(msg)); 932 cb(usr_handle, dref, &buf, msg_data(msg), 933 msg_data_sz(msg), msg_errcode(msg), &orig); 934 break; 935 } 936 case TIPC_MCAST_MSG: 937 case TIPC_NAMED_MSG:{ 938 tipc_named_msg_err_event cb = 939 up_ptr->named_err_cb; 940 941 spin_unlock_bh(p_ptr->publ.lock); 942 if (connected || !cb) 943 break; 944 dseq.type = msg_nametype(msg); 945 dseq.lower = msg_nameinst(msg); 946 dseq.upper = (message_type == TIPC_NAMED_MSG) 947 ? dseq.lower : msg_nameupper(msg); 948 skb_pull(buf, msg_hdr_sz(msg)); 949 cb(usr_handle, dref, &buf, msg_data(msg), 950 msg_data_sz(msg), msg_errcode(msg), &dseq); 951 break; 952 } 953 } 954 if (buf) 955 buf_discard(buf); 956 buf = next; 957 continue; 958reject: 959 tipc_reject_msg(buf, TIPC_ERR_NO_PORT); 960 buf = next; 961 } 962} 963 964/* 965 * port_dispatcher(): Dispatcher for messages destinated 966 * to the tipc_port interface. Called with port locked. 967 */ 968 969static u32 port_dispatcher(struct tipc_port *dummy, struct sk_buff *buf) 970{ 971 buf->next = NULL; 972 spin_lock_bh(&queue_lock); 973 if (msg_queue_head) { 974 msg_queue_tail->next = buf; 975 msg_queue_tail = buf; 976 } else { 977 msg_queue_tail = msg_queue_head = buf; 978 tipc_k_signal((Handler)port_dispatcher_sigh, 0); 979 } 980 spin_unlock_bh(&queue_lock); 981 return TIPC_OK; 982} 983 984/* 985 * Wake up port after congestion: Called with port locked, 986 * 987 */ 988 989static void port_wakeup_sh(unsigned long ref) 990{ 991 struct port *p_ptr; 992 struct user_port *up_ptr; 993 tipc_continue_event cb = NULL; 994 void *uh = NULL; 995 996 p_ptr = tipc_port_lock(ref); 997 if (p_ptr) { 998 up_ptr = p_ptr->user_port; 999 if (up_ptr) { 1000 cb = up_ptr->continue_event_cb; 1001 uh = up_ptr->usr_handle; 1002 } 1003 tipc_port_unlock(p_ptr); 1004 } 1005 if (cb) 1006 cb(uh, ref); 1007} 1008 1009 1010static void port_wakeup(struct tipc_port *p_ptr) 1011{ 1012 tipc_k_signal((Handler)port_wakeup_sh, p_ptr->ref); 1013} 1014 1015void tipc_acknowledge(u32 ref, u32 ack) 1016{ 1017 struct port *p_ptr; 1018 struct sk_buff *buf = NULL; 1019 1020 p_ptr = tipc_port_lock(ref); 1021 if (!p_ptr) 1022 return; 1023 if (p_ptr->publ.connected) { 1024 p_ptr->publ.conn_unacked -= ack; 1025 buf = port_build_proto_msg(port_peerport(p_ptr), 1026 port_peernode(p_ptr), 1027 ref, 1028 tipc_own_addr, 1029 CONN_MANAGER, 1030 CONN_ACK, 1031 TIPC_OK, 1032 port_out_seqno(p_ptr), 1033 ack); 1034 } 1035 tipc_port_unlock(p_ptr); 1036 tipc_net_route_msg(buf); 1037} 1038 1039/* 1040 * tipc_createport(): user level call. Will add port to 1041 * registry if non-zero user_ref. 1042 */ 1043 1044int tipc_createport(u32 user_ref, 1045 void *usr_handle, 1046 unsigned int importance, 1047 tipc_msg_err_event error_cb, 1048 tipc_named_msg_err_event named_error_cb, 1049 tipc_conn_shutdown_event conn_error_cb, 1050 tipc_msg_event msg_cb, 1051 tipc_named_msg_event named_msg_cb, 1052 tipc_conn_msg_event conn_msg_cb, 1053 tipc_continue_event continue_event_cb,/* May be zero */ 1054 u32 *portref) 1055{ 1056 struct user_port *up_ptr; 1057 struct port *p_ptr; 1058 u32 ref; 1059 1060 up_ptr = kmalloc(sizeof(*up_ptr), GFP_ATOMIC); 1061 if (!up_ptr) { 1062 warn("Port creation failed, no memory\n"); 1063 return -ENOMEM; 1064 } 1065 ref = tipc_createport_raw(NULL, port_dispatcher, port_wakeup, importance); 1066 p_ptr = tipc_port_lock(ref); 1067 if (!p_ptr) { 1068 kfree(up_ptr); 1069 return -ENOMEM; 1070 } 1071 1072 p_ptr->user_port = up_ptr; 1073 up_ptr->user_ref = user_ref; 1074 up_ptr->usr_handle = usr_handle; 1075 up_ptr->ref = p_ptr->publ.ref; 1076 up_ptr->err_cb = error_cb; 1077 up_ptr->named_err_cb = named_error_cb; 1078 up_ptr->conn_err_cb = conn_error_cb; 1079 up_ptr->msg_cb = msg_cb; 1080 up_ptr->named_msg_cb = named_msg_cb; 1081 up_ptr->conn_msg_cb = conn_msg_cb; 1082 up_ptr->continue_event_cb = continue_event_cb; 1083 INIT_LIST_HEAD(&up_ptr->uport_list); 1084 tipc_reg_add_port(up_ptr); 1085 *portref = p_ptr->publ.ref; 1086 dbg(" tipc_createport: %x with ref %u\n", p_ptr, p_ptr->publ.ref); 1087 tipc_port_unlock(p_ptr); 1088 return TIPC_OK; 1089} 1090 1091int tipc_ownidentity(u32 ref, struct tipc_portid *id) 1092{ 1093 id->ref = ref; 1094 id->node = tipc_own_addr; 1095 return TIPC_OK; 1096} 1097 1098int tipc_portimportance(u32 ref, unsigned int *importance) 1099{ 1100 struct port *p_ptr; 1101 1102 p_ptr = tipc_port_lock(ref); 1103 if (!p_ptr) 1104 return -EINVAL; 1105 *importance = (unsigned int)msg_importance(&p_ptr->publ.phdr); 1106 spin_unlock_bh(p_ptr->publ.lock); 1107 return TIPC_OK; 1108} 1109 1110int tipc_set_portimportance(u32 ref, unsigned int imp) 1111{ 1112 struct port *p_ptr; 1113 1114 if (imp > TIPC_CRITICAL_IMPORTANCE) 1115 return -EINVAL; 1116 1117 p_ptr = tipc_port_lock(ref); 1118 if (!p_ptr) 1119 return -EINVAL; 1120 msg_set_importance(&p_ptr->publ.phdr, (u32)imp); 1121 spin_unlock_bh(p_ptr->publ.lock); 1122 return TIPC_OK; 1123} 1124 1125 1126int tipc_publish(u32 ref, unsigned int scope, struct tipc_name_seq const *seq) 1127{ 1128 struct port *p_ptr; 1129 struct publication *publ; 1130 u32 key; 1131 int res = -EINVAL; 1132 1133 p_ptr = tipc_port_lock(ref); 1134 dbg("tipc_publ %u, p_ptr = %x, conn = %x, scope = %x, " 1135 "lower = %u, upper = %u\n", 1136 ref, p_ptr, p_ptr->publ.connected, scope, seq->lower, seq->upper); 1137 if (!p_ptr) 1138 return -EINVAL; 1139 if (p_ptr->publ.connected) 1140 goto exit; 1141 if (seq->lower > seq->upper) 1142 goto exit; 1143 if ((scope < TIPC_ZONE_SCOPE) || (scope > TIPC_NODE_SCOPE)) 1144 goto exit; 1145 key = ref + p_ptr->pub_count + 1; 1146 if (key == ref) { 1147 res = -EADDRINUSE; 1148 goto exit; 1149 } 1150 publ = tipc_nametbl_publish(seq->type, seq->lower, seq->upper, 1151 scope, p_ptr->publ.ref, key); 1152 if (publ) { 1153 list_add(&publ->pport_list, &p_ptr->publications); 1154 p_ptr->pub_count++; 1155 p_ptr->publ.published = 1; 1156 res = TIPC_OK; 1157 } 1158exit: 1159 tipc_port_unlock(p_ptr); 1160 return res; 1161} 1162 1163int tipc_withdraw(u32 ref, unsigned int scope, struct tipc_name_seq const *seq) 1164{ 1165 struct port *p_ptr; 1166 struct publication *publ; 1167 struct publication *tpubl; 1168 int res = -EINVAL; 1169 1170 p_ptr = tipc_port_lock(ref); 1171 if (!p_ptr) 1172 return -EINVAL; 1173 if (!seq) { 1174 list_for_each_entry_safe(publ, tpubl, 1175 &p_ptr->publications, pport_list) { 1176 tipc_nametbl_withdraw(publ->type, publ->lower, 1177 publ->ref, publ->key); 1178 } 1179 res = TIPC_OK; 1180 } else { 1181 list_for_each_entry_safe(publ, tpubl, 1182 &p_ptr->publications, pport_list) { 1183 if (publ->scope != scope) 1184 continue; 1185 if (publ->type != seq->type) 1186 continue; 1187 if (publ->lower != seq->lower) 1188 continue; 1189 if (publ->upper != seq->upper) 1190 break; 1191 tipc_nametbl_withdraw(publ->type, publ->lower, 1192 publ->ref, publ->key); 1193 res = TIPC_OK; 1194 break; 1195 } 1196 } 1197 if (list_empty(&p_ptr->publications)) 1198 p_ptr->publ.published = 0; 1199 tipc_port_unlock(p_ptr); 1200 return res; 1201} 1202 1203int tipc_connect2port(u32 ref, struct tipc_portid const *peer) 1204{ 1205 struct port *p_ptr; 1206 struct tipc_msg *msg; 1207 int res = -EINVAL; 1208 1209 p_ptr = tipc_port_lock(ref); 1210 if (!p_ptr) 1211 return -EINVAL; 1212 if (p_ptr->publ.published || p_ptr->publ.connected) 1213 goto exit; 1214 if (!peer->ref) 1215 goto exit; 1216 1217 msg = &p_ptr->publ.phdr; 1218 msg_set_destnode(msg, peer->node); 1219 msg_set_destport(msg, peer->ref); 1220 msg_set_orignode(msg, tipc_own_addr); 1221 msg_set_origport(msg, p_ptr->publ.ref); 1222 msg_set_transp_seqno(msg, 42); 1223 msg_set_type(msg, TIPC_CONN_MSG); 1224 if (!may_route(peer->node)) 1225 msg_set_hdr_sz(msg, SHORT_H_SIZE); 1226 else 1227 msg_set_hdr_sz(msg, LONG_H_SIZE); 1228 1229 p_ptr->probing_interval = PROBING_INTERVAL; 1230 p_ptr->probing_state = CONFIRMED; 1231 p_ptr->publ.connected = 1; 1232 k_start_timer(&p_ptr->timer, p_ptr->probing_interval); 1233 1234 tipc_nodesub_subscribe(&p_ptr->subscription,peer->node, 1235 (void *)(unsigned long)ref, 1236 (net_ev_handler)port_handle_node_down); 1237 res = TIPC_OK; 1238exit: 1239 tipc_port_unlock(p_ptr); 1240 p_ptr->max_pkt = tipc_link_get_max_pkt(peer->node, ref); 1241 return res; 1242} 1243 1244/* 1245 * tipc_disconnect(): Disconnect port form peer. 1246 * This is a node local operation. 1247 */ 1248 1249int tipc_disconnect(u32 ref) 1250{ 1251 struct port *p_ptr; 1252 int res = -ENOTCONN; 1253 1254 p_ptr = tipc_port_lock(ref); 1255 if (!p_ptr) 1256 return -EINVAL; 1257 if (p_ptr->publ.connected) { 1258 p_ptr->publ.connected = 0; 1259 /* let timer expire on it's own to avoid deadlock! */ 1260 tipc_nodesub_unsubscribe(&p_ptr->subscription); 1261 res = TIPC_OK; 1262 } 1263 tipc_port_unlock(p_ptr); 1264 return res; 1265} 1266 1267/* 1268 * tipc_shutdown(): Send a SHUTDOWN msg to peer and disconnect 1269 */ 1270int tipc_shutdown(u32 ref) 1271{ 1272 struct port *p_ptr; 1273 struct sk_buff *buf = NULL; 1274 1275 p_ptr = tipc_port_lock(ref); 1276 if (!p_ptr) 1277 return -EINVAL; 1278 1279 if (p_ptr->publ.connected) { 1280 u32 imp = msg_importance(&p_ptr->publ.phdr); 1281 if (imp < TIPC_CRITICAL_IMPORTANCE) 1282 imp++; 1283 buf = port_build_proto_msg(port_peerport(p_ptr), 1284 port_peernode(p_ptr), 1285 ref, 1286 tipc_own_addr, 1287 imp, 1288 TIPC_CONN_MSG, 1289 TIPC_CONN_SHUTDOWN, 1290 port_out_seqno(p_ptr), 1291 0); 1292 } 1293 tipc_port_unlock(p_ptr); 1294 tipc_net_route_msg(buf); 1295 return tipc_disconnect(ref); 1296} 1297 1298int tipc_isconnected(u32 ref, int *isconnected) 1299{ 1300 struct port *p_ptr; 1301 1302 p_ptr = tipc_port_lock(ref); 1303 if (!p_ptr) 1304 return -EINVAL; 1305 *isconnected = p_ptr->publ.connected; 1306 tipc_port_unlock(p_ptr); 1307 return TIPC_OK; 1308} 1309 1310int tipc_peer(u32 ref, struct tipc_portid *peer) 1311{ 1312 struct port *p_ptr; 1313 int res; 1314 1315 p_ptr = tipc_port_lock(ref); 1316 if (!p_ptr) 1317 return -EINVAL; 1318 if (p_ptr->publ.connected) { 1319 peer->ref = port_peerport(p_ptr); 1320 peer->node = port_peernode(p_ptr); 1321 res = TIPC_OK; 1322 } else 1323 res = -ENOTCONN; 1324 tipc_port_unlock(p_ptr); 1325 return res; 1326} 1327 1328int tipc_ref_valid(u32 ref) 1329{ 1330 /* Works irrespective of type */ 1331 return !!tipc_ref_deref(ref); 1332} 1333 1334 1335/* 1336 * tipc_port_recv_sections(): Concatenate and deliver sectioned 1337 * message for this node. 1338 */ 1339 1340int tipc_port_recv_sections(struct port *sender, unsigned int num_sect, 1341 struct iovec const *msg_sect) 1342{ 1343 struct sk_buff *buf; 1344 int res; 1345 1346 res = msg_build(&sender->publ.phdr, msg_sect, num_sect, 1347 MAX_MSG_SIZE, !sender->user_port, &buf); 1348 if (likely(buf)) 1349 tipc_port_recv_msg(buf); 1350 return res; 1351} 1352 1353/** 1354 * tipc_send - send message sections on connection 1355 */ 1356 1357int tipc_send(u32 ref, unsigned int num_sect, struct iovec const *msg_sect) 1358{ 1359 struct port *p_ptr; 1360 u32 destnode; 1361 int res; 1362 1363 p_ptr = tipc_port_deref(ref); 1364 if (!p_ptr || !p_ptr->publ.connected) 1365 return -EINVAL; 1366 1367 p_ptr->publ.congested = 1; 1368 if (!tipc_port_congested(p_ptr)) { 1369 destnode = port_peernode(p_ptr); 1370 if (likely(destnode != tipc_own_addr)) 1371 res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect, 1372 destnode); 1373 else 1374 res = tipc_port_recv_sections(p_ptr, num_sect, msg_sect); 1375 1376 if (likely(res != -ELINKCONG)) { 1377 port_incr_out_seqno(p_ptr); 1378 p_ptr->publ.congested = 0; 1379 p_ptr->sent++; 1380 return res; 1381 } 1382 } 1383 if (port_unreliable(p_ptr)) { 1384 p_ptr->publ.congested = 0; 1385 /* Just calculate msg length and return */ 1386 return msg_calc_data_size(msg_sect, num_sect); 1387 } 1388 return -ELINKCONG; 1389} 1390 1391/** 1392 * tipc_send_buf - send message buffer on connection 1393 */ 1394 1395int tipc_send_buf(u32 ref, struct sk_buff *buf, unsigned int dsz) 1396{ 1397 struct port *p_ptr; 1398 struct tipc_msg *msg; 1399 u32 destnode; 1400 u32 hsz; 1401 u32 sz; 1402 u32 res; 1403 1404 p_ptr = tipc_port_deref(ref); 1405 if (!p_ptr || !p_ptr->publ.connected) 1406 return -EINVAL; 1407 1408 msg = &p_ptr->publ.phdr; 1409 hsz = msg_hdr_sz(msg); 1410 sz = hsz + dsz; 1411 msg_set_size(msg, sz); 1412 if (skb_cow(buf, hsz)) 1413 return -ENOMEM; 1414 1415 skb_push(buf, hsz); 1416 memcpy(buf->data, (unchar *)msg, hsz); 1417 destnode = msg_destnode(msg); 1418 p_ptr->publ.congested = 1; 1419 if (!tipc_port_congested(p_ptr)) { 1420 if (likely(destnode != tipc_own_addr)) 1421 res = tipc_send_buf_fast(buf, destnode); 1422 else { 1423 tipc_port_recv_msg(buf); 1424 res = sz; 1425 } 1426 if (likely(res != -ELINKCONG)) { 1427 port_incr_out_seqno(p_ptr); 1428 p_ptr->sent++; 1429 p_ptr->publ.congested = 0; 1430 return res; 1431 } 1432 } 1433 if (port_unreliable(p_ptr)) { 1434 p_ptr->publ.congested = 0; 1435 return dsz; 1436 } 1437 return -ELINKCONG; 1438} 1439 1440/** 1441 * tipc_forward2name - forward message sections to port name 1442 */ 1443 1444int tipc_forward2name(u32 ref, 1445 struct tipc_name const *name, 1446 u32 domain, 1447 u32 num_sect, 1448 struct iovec const *msg_sect, 1449 struct tipc_portid const *orig, 1450 unsigned int importance) 1451{ 1452 struct port *p_ptr; 1453 struct tipc_msg *msg; 1454 u32 destnode = domain; 1455 u32 destport = 0; 1456 int res; 1457 1458 p_ptr = tipc_port_deref(ref); 1459 if (!p_ptr || p_ptr->publ.connected) 1460 return -EINVAL; 1461 1462 msg = &p_ptr->publ.phdr; 1463 msg_set_type(msg, TIPC_NAMED_MSG); 1464 msg_set_orignode(msg, orig->node); 1465 msg_set_origport(msg, orig->ref); 1466 msg_set_hdr_sz(msg, LONG_H_SIZE); 1467 msg_set_nametype(msg, name->type); 1468 msg_set_nameinst(msg, name->instance); 1469 msg_set_lookup_scope(msg, addr_scope(domain)); 1470 if (importance <= TIPC_CRITICAL_IMPORTANCE) 1471 msg_set_importance(msg,importance); 1472 destport = tipc_nametbl_translate(name->type, name->instance, &destnode); 1473 msg_set_destnode(msg, destnode); 1474 msg_set_destport(msg, destport); 1475 1476 if (likely(destport || destnode)) { 1477 p_ptr->sent++; 1478 if (likely(destnode == tipc_own_addr)) 1479 return tipc_port_recv_sections(p_ptr, num_sect, msg_sect); 1480 res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect, 1481 destnode); 1482 if (likely(res != -ELINKCONG)) 1483 return res; 1484 if (port_unreliable(p_ptr)) { 1485 /* Just calculate msg length and return */ 1486 return msg_calc_data_size(msg_sect, num_sect); 1487 } 1488 return -ELINKCONG; 1489 } 1490 return tipc_port_reject_sections(p_ptr, msg, msg_sect, num_sect, 1491 TIPC_ERR_NO_NAME); 1492} 1493 1494/** 1495 * tipc_send2name - send message sections to port name 1496 */ 1497 1498int tipc_send2name(u32 ref, 1499 struct tipc_name const *name, 1500 unsigned int domain, 1501 unsigned int num_sect, 1502 struct iovec const *msg_sect) 1503{ 1504 struct tipc_portid orig; 1505 1506 orig.ref = ref; 1507 orig.node = tipc_own_addr; 1508 return tipc_forward2name(ref, name, domain, num_sect, msg_sect, &orig, 1509 TIPC_PORT_IMPORTANCE); 1510} 1511 1512/** 1513 * tipc_forward_buf2name - forward message buffer to port name 1514 */ 1515 1516int tipc_forward_buf2name(u32 ref, 1517 struct tipc_name const *name, 1518 u32 domain, 1519 struct sk_buff *buf, 1520 unsigned int dsz, 1521 struct tipc_portid const *orig, 1522 unsigned int importance) 1523{ 1524 struct port *p_ptr; 1525 struct tipc_msg *msg; 1526 u32 destnode = domain; 1527 u32 destport = 0; 1528 int res; 1529 1530 p_ptr = (struct port *)tipc_ref_deref(ref); 1531 if (!p_ptr || p_ptr->publ.connected) 1532 return -EINVAL; 1533 1534 msg = &p_ptr->publ.phdr; 1535 if (importance <= TIPC_CRITICAL_IMPORTANCE) 1536 msg_set_importance(msg, importance); 1537 msg_set_type(msg, TIPC_NAMED_MSG); 1538 msg_set_orignode(msg, orig->node); 1539 msg_set_origport(msg, orig->ref); 1540 msg_set_nametype(msg, name->type); 1541 msg_set_nameinst(msg, name->instance); 1542 msg_set_lookup_scope(msg, addr_scope(domain)); 1543 msg_set_hdr_sz(msg, LONG_H_SIZE); 1544 msg_set_size(msg, LONG_H_SIZE + dsz); 1545 destport = tipc_nametbl_translate(name->type, name->instance, &destnode); 1546 msg_set_destnode(msg, destnode); 1547 msg_set_destport(msg, destport); 1548 msg_dbg(msg, "forw2name ==> "); 1549 if (skb_cow(buf, LONG_H_SIZE)) 1550 return -ENOMEM; 1551 skb_push(buf, LONG_H_SIZE); 1552 memcpy(buf->data, (unchar *)msg, LONG_H_SIZE); 1553 msg_dbg(buf_msg(buf),"PREP:"); 1554 if (likely(destport || destnode)) { 1555 p_ptr->sent++; 1556 if (destnode == tipc_own_addr) 1557 return tipc_port_recv_msg(buf); 1558 res = tipc_send_buf_fast(buf, destnode); 1559 if (likely(res != -ELINKCONG)) 1560 return res; 1561 if (port_unreliable(p_ptr)) 1562 return dsz; 1563 return -ELINKCONG; 1564 } 1565 return tipc_reject_msg(buf, TIPC_ERR_NO_NAME); 1566} 1567 1568/** 1569 * tipc_send_buf2name - send message buffer to port name 1570 */ 1571 1572int tipc_send_buf2name(u32 ref, 1573 struct tipc_name const *dest, 1574 u32 domain, 1575 struct sk_buff *buf, 1576 unsigned int dsz) 1577{ 1578 struct tipc_portid orig; 1579 1580 orig.ref = ref; 1581 orig.node = tipc_own_addr; 1582 return tipc_forward_buf2name(ref, dest, domain, buf, dsz, &orig, 1583 TIPC_PORT_IMPORTANCE); 1584} 1585 1586/** 1587 * tipc_forward2port - forward message sections to port identity 1588 */ 1589 1590int tipc_forward2port(u32 ref, 1591 struct tipc_portid const *dest, 1592 unsigned int num_sect, 1593 struct iovec const *msg_sect, 1594 struct tipc_portid const *orig, 1595 unsigned int importance) 1596{ 1597 struct port *p_ptr; 1598 struct tipc_msg *msg; 1599 int res; 1600 1601 p_ptr = tipc_port_deref(ref); 1602 if (!p_ptr || p_ptr->publ.connected) 1603 return -EINVAL; 1604 1605 msg = &p_ptr->publ.phdr; 1606 msg_set_type(msg, TIPC_DIRECT_MSG); 1607 msg_set_orignode(msg, orig->node); 1608 msg_set_origport(msg, orig->ref); 1609 msg_set_destnode(msg, dest->node); 1610 msg_set_destport(msg, dest->ref); 1611 msg_set_hdr_sz(msg, DIR_MSG_H_SIZE); 1612 if (importance <= TIPC_CRITICAL_IMPORTANCE) 1613 msg_set_importance(msg, importance); 1614 p_ptr->sent++; 1615 if (dest->node == tipc_own_addr) 1616 return tipc_port_recv_sections(p_ptr, num_sect, msg_sect); 1617 res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect, dest->node); 1618 if (likely(res != -ELINKCONG)) 1619 return res; 1620 if (port_unreliable(p_ptr)) { 1621 /* Just calculate msg length and return */ 1622 return msg_calc_data_size(msg_sect, num_sect); 1623 } 1624 return -ELINKCONG; 1625} 1626 1627/** 1628 * tipc_send2port - send message sections to port identity 1629 */ 1630 1631int tipc_send2port(u32 ref, 1632 struct tipc_portid const *dest, 1633 unsigned int num_sect, 1634 struct iovec const *msg_sect) 1635{ 1636 struct tipc_portid orig; 1637 1638 orig.ref = ref; 1639 orig.node = tipc_own_addr; 1640 return tipc_forward2port(ref, dest, num_sect, msg_sect, &orig, 1641 TIPC_PORT_IMPORTANCE); 1642} 1643 1644/** 1645 * tipc_forward_buf2port - forward message buffer to port identity 1646 */ 1647int tipc_forward_buf2port(u32 ref, 1648 struct tipc_portid const *dest, 1649 struct sk_buff *buf, 1650 unsigned int dsz, 1651 struct tipc_portid const *orig, 1652 unsigned int importance) 1653{ 1654 struct port *p_ptr; 1655 struct tipc_msg *msg; 1656 int res; 1657 1658 p_ptr = (struct port *)tipc_ref_deref(ref); 1659 if (!p_ptr || p_ptr->publ.connected) 1660 return -EINVAL; 1661 1662 msg = &p_ptr->publ.phdr; 1663 msg_set_type(msg, TIPC_DIRECT_MSG); 1664 msg_set_orignode(msg, orig->node); 1665 msg_set_origport(msg, orig->ref); 1666 msg_set_destnode(msg, dest->node); 1667 msg_set_destport(msg, dest->ref); 1668 msg_set_hdr_sz(msg, DIR_MSG_H_SIZE); 1669 if (importance <= TIPC_CRITICAL_IMPORTANCE) 1670 msg_set_importance(msg, importance); 1671 msg_set_size(msg, DIR_MSG_H_SIZE + dsz); 1672 if (skb_cow(buf, DIR_MSG_H_SIZE)) 1673 return -ENOMEM; 1674 1675 skb_push(buf, DIR_MSG_H_SIZE); 1676 memcpy(buf->data, (unchar *)msg, DIR_MSG_H_SIZE); 1677 msg_dbg(msg, "buf2port: "); 1678 p_ptr->sent++; 1679 if (dest->node == tipc_own_addr) 1680 return tipc_port_recv_msg(buf); 1681 res = tipc_send_buf_fast(buf, dest->node); 1682 if (likely(res != -ELINKCONG)) 1683 return res; 1684 if (port_unreliable(p_ptr)) 1685 return dsz; 1686 return -ELINKCONG; 1687} 1688 1689/** 1690 * tipc_send_buf2port - send message buffer to port identity 1691 */ 1692 1693int tipc_send_buf2port(u32 ref, 1694 struct tipc_portid const *dest, 1695 struct sk_buff *buf, 1696 unsigned int dsz) 1697{ 1698 struct tipc_portid orig; 1699 1700 orig.ref = ref; 1701 orig.node = tipc_own_addr; 1702 return tipc_forward_buf2port(ref, dest, buf, dsz, &orig, 1703 TIPC_PORT_IMPORTANCE); 1704} 1705