Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

at v2.6.22-rc4 1711 lines 42 kB view raw
1/* 2 * net/tipc/port.c: TIPC port code 3 * 4 * Copyright (c) 1992-2006, Ericsson AB 5 * Copyright (c) 2004-2005, Wind River Systems 6 * All rights reserved. 7 * 8 * Redistribution and use in source and binary forms, with or without 9 * modification, are permitted provided that the following conditions are met: 10 * 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in the 15 * documentation and/or other materials provided with the distribution. 16 * 3. Neither the names of the copyright holders nor the names of its 17 * contributors may be used to endorse or promote products derived from 18 * this software without specific prior written permission. 19 * 20 * Alternatively, this software may be distributed under the terms of the 21 * GNU General Public License ("GPL") version 2 as published by the Free 22 * Software Foundation. 23 * 24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 25 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 26 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 27 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 28 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 29 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 30 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 33 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 34 * POSSIBILITY OF SUCH DAMAGE. 35 */ 36 37#include "core.h" 38#include "config.h" 39#include "dbg.h" 40#include "port.h" 41#include "addr.h" 42#include "link.h" 43#include "node.h" 44#include "port.h" 45#include "name_table.h" 46#include "user_reg.h" 47#include "msg.h" 48#include "bcast.h" 49 50/* Connection management: */ 51#define PROBING_INTERVAL 3600000 /* [ms] => 1 h */ 52#define CONFIRMED 0 53#define PROBING 1 54 55#define MAX_REJECT_SIZE 1024 56 57static struct sk_buff *msg_queue_head = NULL; 58static struct sk_buff *msg_queue_tail = NULL; 59 60DEFINE_SPINLOCK(tipc_port_list_lock); 61static DEFINE_SPINLOCK(queue_lock); 62 63static LIST_HEAD(ports); 64static void port_handle_node_down(unsigned long ref); 65static struct sk_buff* port_build_self_abort_msg(struct port *,u32 err); 66static struct sk_buff* port_build_peer_abort_msg(struct port *,u32 err); 67static void port_timeout(unsigned long ref); 68 69 70static u32 port_peernode(struct port *p_ptr) 71{ 72 return msg_destnode(&p_ptr->publ.phdr); 73} 74 75static u32 port_peerport(struct port *p_ptr) 76{ 77 return msg_destport(&p_ptr->publ.phdr); 78} 79 80static u32 port_out_seqno(struct port *p_ptr) 81{ 82 return msg_transp_seqno(&p_ptr->publ.phdr); 83} 84 85static void port_incr_out_seqno(struct port *p_ptr) 86{ 87 struct tipc_msg *m = &p_ptr->publ.phdr; 88 89 if (likely(!msg_routed(m))) 90 return; 91 msg_set_transp_seqno(m, (msg_transp_seqno(m) + 1)); 92} 93 94/** 95 * tipc_multicast - send a multicast message to local and remote destinations 96 */ 97 98int tipc_multicast(u32 ref, struct tipc_name_seq const *seq, u32 domain, 99 u32 num_sect, struct iovec const *msg_sect) 100{ 101 struct tipc_msg *hdr; 102 struct sk_buff *buf; 103 struct sk_buff *ibuf = NULL; 104 struct port_list dports = {0, NULL, }; 105 struct port *oport = tipc_port_deref(ref); 106 int ext_targets; 107 int res; 108 109 if (unlikely(!oport)) 110 return -EINVAL; 111 112 /* Create multicast message */ 113 114 hdr = &oport->publ.phdr; 115 msg_set_type(hdr, TIPC_MCAST_MSG); 116 msg_set_nametype(hdr, seq->type); 117 msg_set_namelower(hdr, seq->lower); 118 msg_set_nameupper(hdr, seq->upper); 119 msg_set_hdr_sz(hdr, MCAST_H_SIZE); 120 res = msg_build(hdr, msg_sect, num_sect, MAX_MSG_SIZE, 121 !oport->user_port, &buf); 122 if (unlikely(!buf)) 123 return res; 124 125 /* Figure out where to send multicast message */ 126 127 ext_targets = tipc_nametbl_mc_translate(seq->type, seq->lower, seq->upper, 128 TIPC_NODE_SCOPE, &dports); 129 130 /* Send message to destinations (duplicate it only if necessary) */ 131 132 if (ext_targets) { 133 if (dports.count != 0) { 134 ibuf = skb_copy(buf, GFP_ATOMIC); 135 if (ibuf == NULL) { 136 tipc_port_list_free(&dports); 137 buf_discard(buf); 138 return -ENOMEM; 139 } 140 } 141 res = tipc_bclink_send_msg(buf); 142 if ((res < 0) && (dports.count != 0)) { 143 buf_discard(ibuf); 144 } 145 } else { 146 ibuf = buf; 147 } 148 149 if (res >= 0) { 150 if (ibuf) 151 tipc_port_recv_mcast(ibuf, &dports); 152 } else { 153 tipc_port_list_free(&dports); 154 } 155 return res; 156} 157 158/** 159 * tipc_port_recv_mcast - deliver multicast message to all destination ports 160 * 161 * If there is no port list, perform a lookup to create one 162 */ 163 164void tipc_port_recv_mcast(struct sk_buff *buf, struct port_list *dp) 165{ 166 struct tipc_msg* msg; 167 struct port_list dports = {0, NULL, }; 168 struct port_list *item = dp; 169 int cnt = 0; 170 171 msg = buf_msg(buf); 172 173 /* Create destination port list, if one wasn't supplied */ 174 175 if (dp == NULL) { 176 tipc_nametbl_mc_translate(msg_nametype(msg), 177 msg_namelower(msg), 178 msg_nameupper(msg), 179 TIPC_CLUSTER_SCOPE, 180 &dports); 181 item = dp = &dports; 182 } 183 184 /* Deliver a copy of message to each destination port */ 185 186 if (dp->count != 0) { 187 if (dp->count == 1) { 188 msg_set_destport(msg, dp->ports[0]); 189 tipc_port_recv_msg(buf); 190 tipc_port_list_free(dp); 191 return; 192 } 193 for (; cnt < dp->count; cnt++) { 194 int index = cnt % PLSIZE; 195 struct sk_buff *b = skb_clone(buf, GFP_ATOMIC); 196 197 if (b == NULL) { 198 warn("Unable to deliver multicast message(s)\n"); 199 msg_dbg(msg, "LOST:"); 200 goto exit; 201 } 202 if ((index == 0) && (cnt != 0)) { 203 item = item->next; 204 } 205 msg_set_destport(buf_msg(b),item->ports[index]); 206 tipc_port_recv_msg(b); 207 } 208 } 209exit: 210 buf_discard(buf); 211 tipc_port_list_free(dp); 212} 213 214/** 215 * tipc_createport_raw - create a native TIPC port 216 * 217 * Returns local port reference 218 */ 219 220u32 tipc_createport_raw(void *usr_handle, 221 u32 (*dispatcher)(struct tipc_port *, struct sk_buff *), 222 void (*wakeup)(struct tipc_port *), 223 const u32 importance) 224{ 225 struct port *p_ptr; 226 struct tipc_msg *msg; 227 u32 ref; 228 229 p_ptr = kzalloc(sizeof(*p_ptr), GFP_ATOMIC); 230 if (!p_ptr) { 231 warn("Port creation failed, no memory\n"); 232 return 0; 233 } 234 ref = tipc_ref_acquire(p_ptr, &p_ptr->publ.lock); 235 if (!ref) { 236 warn("Port creation failed, reference table exhausted\n"); 237 kfree(p_ptr); 238 return 0; 239 } 240 241 tipc_port_lock(ref); 242 p_ptr->publ.ref = ref; 243 msg = &p_ptr->publ.phdr; 244 msg_init(msg, DATA_LOW, TIPC_NAMED_MSG, TIPC_OK, LONG_H_SIZE, 0); 245 msg_set_orignode(msg, tipc_own_addr); 246 msg_set_prevnode(msg, tipc_own_addr); 247 msg_set_origport(msg, ref); 248 msg_set_importance(msg,importance); 249 p_ptr->last_in_seqno = 41; 250 p_ptr->sent = 1; 251 p_ptr->publ.usr_handle = usr_handle; 252 INIT_LIST_HEAD(&p_ptr->wait_list); 253 INIT_LIST_HEAD(&p_ptr->subscription.nodesub_list); 254 p_ptr->congested_link = NULL; 255 p_ptr->max_pkt = MAX_PKT_DEFAULT; 256 p_ptr->dispatcher = dispatcher; 257 p_ptr->wakeup = wakeup; 258 p_ptr->user_port = NULL; 259 k_init_timer(&p_ptr->timer, (Handler)port_timeout, ref); 260 spin_lock_bh(&tipc_port_list_lock); 261 INIT_LIST_HEAD(&p_ptr->publications); 262 INIT_LIST_HEAD(&p_ptr->port_list); 263 list_add_tail(&p_ptr->port_list, &ports); 264 spin_unlock_bh(&tipc_port_list_lock); 265 tipc_port_unlock(p_ptr); 266 return ref; 267} 268 269int tipc_deleteport(u32 ref) 270{ 271 struct port *p_ptr; 272 struct sk_buff *buf = NULL; 273 274 tipc_withdraw(ref, 0, NULL); 275 p_ptr = tipc_port_lock(ref); 276 if (!p_ptr) 277 return -EINVAL; 278 279 tipc_ref_discard(ref); 280 tipc_port_unlock(p_ptr); 281 282 k_cancel_timer(&p_ptr->timer); 283 if (p_ptr->publ.connected) { 284 buf = port_build_peer_abort_msg(p_ptr, TIPC_ERR_NO_PORT); 285 tipc_nodesub_unsubscribe(&p_ptr->subscription); 286 } 287 if (p_ptr->user_port) { 288 tipc_reg_remove_port(p_ptr->user_port); 289 kfree(p_ptr->user_port); 290 } 291 292 spin_lock_bh(&tipc_port_list_lock); 293 list_del(&p_ptr->port_list); 294 list_del(&p_ptr->wait_list); 295 spin_unlock_bh(&tipc_port_list_lock); 296 k_term_timer(&p_ptr->timer); 297 kfree(p_ptr); 298 dbg("Deleted port %u\n", ref); 299 tipc_net_route_msg(buf); 300 return TIPC_OK; 301} 302 303/** 304 * tipc_get_port() - return port associated with 'ref' 305 * 306 * Note: Port is not locked. 307 */ 308 309struct tipc_port *tipc_get_port(const u32 ref) 310{ 311 return (struct tipc_port *)tipc_ref_deref(ref); 312} 313 314/** 315 * tipc_get_handle - return user handle associated to port 'ref' 316 */ 317 318void *tipc_get_handle(const u32 ref) 319{ 320 struct port *p_ptr; 321 void * handle; 322 323 p_ptr = tipc_port_lock(ref); 324 if (!p_ptr) 325 return NULL; 326 handle = p_ptr->publ.usr_handle; 327 tipc_port_unlock(p_ptr); 328 return handle; 329} 330 331static int port_unreliable(struct port *p_ptr) 332{ 333 return msg_src_droppable(&p_ptr->publ.phdr); 334} 335 336int tipc_portunreliable(u32 ref, unsigned int *isunreliable) 337{ 338 struct port *p_ptr; 339 340 p_ptr = tipc_port_lock(ref); 341 if (!p_ptr) 342 return -EINVAL; 343 *isunreliable = port_unreliable(p_ptr); 344 spin_unlock_bh(p_ptr->publ.lock); 345 return TIPC_OK; 346} 347 348int tipc_set_portunreliable(u32 ref, unsigned int isunreliable) 349{ 350 struct port *p_ptr; 351 352 p_ptr = tipc_port_lock(ref); 353 if (!p_ptr) 354 return -EINVAL; 355 msg_set_src_droppable(&p_ptr->publ.phdr, (isunreliable != 0)); 356 tipc_port_unlock(p_ptr); 357 return TIPC_OK; 358} 359 360static int port_unreturnable(struct port *p_ptr) 361{ 362 return msg_dest_droppable(&p_ptr->publ.phdr); 363} 364 365int tipc_portunreturnable(u32 ref, unsigned int *isunrejectable) 366{ 367 struct port *p_ptr; 368 369 p_ptr = tipc_port_lock(ref); 370 if (!p_ptr) 371 return -EINVAL; 372 *isunrejectable = port_unreturnable(p_ptr); 373 spin_unlock_bh(p_ptr->publ.lock); 374 return TIPC_OK; 375} 376 377int tipc_set_portunreturnable(u32 ref, unsigned int isunrejectable) 378{ 379 struct port *p_ptr; 380 381 p_ptr = tipc_port_lock(ref); 382 if (!p_ptr) 383 return -EINVAL; 384 msg_set_dest_droppable(&p_ptr->publ.phdr, (isunrejectable != 0)); 385 tipc_port_unlock(p_ptr); 386 return TIPC_OK; 387} 388 389/* 390 * port_build_proto_msg(): build a port level protocol 391 * or a connection abortion message. Called with 392 * tipc_port lock on. 393 */ 394static struct sk_buff *port_build_proto_msg(u32 destport, u32 destnode, 395 u32 origport, u32 orignode, 396 u32 usr, u32 type, u32 err, 397 u32 seqno, u32 ack) 398{ 399 struct sk_buff *buf; 400 struct tipc_msg *msg; 401 402 buf = buf_acquire(LONG_H_SIZE); 403 if (buf) { 404 msg = buf_msg(buf); 405 msg_init(msg, usr, type, err, LONG_H_SIZE, destnode); 406 msg_set_destport(msg, destport); 407 msg_set_origport(msg, origport); 408 msg_set_destnode(msg, destnode); 409 msg_set_orignode(msg, orignode); 410 msg_set_transp_seqno(msg, seqno); 411 msg_set_msgcnt(msg, ack); 412 msg_dbg(msg, "PORT>SEND>:"); 413 } 414 return buf; 415} 416 417int tipc_set_msg_option(struct tipc_port *tp_ptr, const char *opt, const u32 sz) 418{ 419 msg_expand(&tp_ptr->phdr, msg_destnode(&tp_ptr->phdr)); 420 msg_set_options(&tp_ptr->phdr, opt, sz); 421 return TIPC_OK; 422} 423 424int tipc_reject_msg(struct sk_buff *buf, u32 err) 425{ 426 struct tipc_msg *msg = buf_msg(buf); 427 struct sk_buff *rbuf; 428 struct tipc_msg *rmsg; 429 int hdr_sz; 430 u32 imp = msg_importance(msg); 431 u32 data_sz = msg_data_sz(msg); 432 433 if (data_sz > MAX_REJECT_SIZE) 434 data_sz = MAX_REJECT_SIZE; 435 if (msg_connected(msg) && (imp < TIPC_CRITICAL_IMPORTANCE)) 436 imp++; 437 msg_dbg(msg, "port->rej: "); 438 439 /* discard rejected message if it shouldn't be returned to sender */ 440 if (msg_errcode(msg) || msg_dest_droppable(msg)) { 441 buf_discard(buf); 442 return data_sz; 443 } 444 445 /* construct rejected message */ 446 if (msg_mcast(msg)) 447 hdr_sz = MCAST_H_SIZE; 448 else 449 hdr_sz = LONG_H_SIZE; 450 rbuf = buf_acquire(data_sz + hdr_sz); 451 if (rbuf == NULL) { 452 buf_discard(buf); 453 return data_sz; 454 } 455 rmsg = buf_msg(rbuf); 456 msg_init(rmsg, imp, msg_type(msg), err, hdr_sz, msg_orignode(msg)); 457 msg_set_destport(rmsg, msg_origport(msg)); 458 msg_set_prevnode(rmsg, tipc_own_addr); 459 msg_set_origport(rmsg, msg_destport(msg)); 460 if (msg_short(msg)) 461 msg_set_orignode(rmsg, tipc_own_addr); 462 else 463 msg_set_orignode(rmsg, msg_destnode(msg)); 464 msg_set_size(rmsg, data_sz + hdr_sz); 465 msg_set_nametype(rmsg, msg_nametype(msg)); 466 msg_set_nameinst(rmsg, msg_nameinst(msg)); 467 skb_copy_to_linear_data_offset(rbuf, hdr_sz, msg_data(msg), data_sz); 468 469 /* send self-abort message when rejecting on a connected port */ 470 if (msg_connected(msg)) { 471 struct sk_buff *abuf = NULL; 472 struct port *p_ptr = tipc_port_lock(msg_destport(msg)); 473 474 if (p_ptr) { 475 if (p_ptr->publ.connected) 476 abuf = port_build_self_abort_msg(p_ptr, err); 477 tipc_port_unlock(p_ptr); 478 } 479 tipc_net_route_msg(abuf); 480 } 481 482 /* send rejected message */ 483 buf_discard(buf); 484 tipc_net_route_msg(rbuf); 485 return data_sz; 486} 487 488int tipc_port_reject_sections(struct port *p_ptr, struct tipc_msg *hdr, 489 struct iovec const *msg_sect, u32 num_sect, 490 int err) 491{ 492 struct sk_buff *buf; 493 int res; 494 495 res = msg_build(hdr, msg_sect, num_sect, MAX_MSG_SIZE, 496 !p_ptr->user_port, &buf); 497 if (!buf) 498 return res; 499 500 return tipc_reject_msg(buf, err); 501} 502 503static void port_timeout(unsigned long ref) 504{ 505 struct port *p_ptr = tipc_port_lock(ref); 506 struct sk_buff *buf = NULL; 507 508 if (!p_ptr) 509 return; 510 511 if (!p_ptr->publ.connected) { 512 tipc_port_unlock(p_ptr); 513 return; 514 } 515 516 /* Last probe answered ? */ 517 if (p_ptr->probing_state == PROBING) { 518 buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_PORT); 519 } else { 520 buf = port_build_proto_msg(port_peerport(p_ptr), 521 port_peernode(p_ptr), 522 p_ptr->publ.ref, 523 tipc_own_addr, 524 CONN_MANAGER, 525 CONN_PROBE, 526 TIPC_OK, 527 port_out_seqno(p_ptr), 528 0); 529 port_incr_out_seqno(p_ptr); 530 p_ptr->probing_state = PROBING; 531 k_start_timer(&p_ptr->timer, p_ptr->probing_interval); 532 } 533 tipc_port_unlock(p_ptr); 534 tipc_net_route_msg(buf); 535} 536 537 538static void port_handle_node_down(unsigned long ref) 539{ 540 struct port *p_ptr = tipc_port_lock(ref); 541 struct sk_buff* buf = NULL; 542 543 if (!p_ptr) 544 return; 545 buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_NODE); 546 tipc_port_unlock(p_ptr); 547 tipc_net_route_msg(buf); 548} 549 550 551static struct sk_buff *port_build_self_abort_msg(struct port *p_ptr, u32 err) 552{ 553 u32 imp = msg_importance(&p_ptr->publ.phdr); 554 555 if (!p_ptr->publ.connected) 556 return NULL; 557 if (imp < TIPC_CRITICAL_IMPORTANCE) 558 imp++; 559 return port_build_proto_msg(p_ptr->publ.ref, 560 tipc_own_addr, 561 port_peerport(p_ptr), 562 port_peernode(p_ptr), 563 imp, 564 TIPC_CONN_MSG, 565 err, 566 p_ptr->last_in_seqno + 1, 567 0); 568} 569 570 571static struct sk_buff *port_build_peer_abort_msg(struct port *p_ptr, u32 err) 572{ 573 u32 imp = msg_importance(&p_ptr->publ.phdr); 574 575 if (!p_ptr->publ.connected) 576 return NULL; 577 if (imp < TIPC_CRITICAL_IMPORTANCE) 578 imp++; 579 return port_build_proto_msg(port_peerport(p_ptr), 580 port_peernode(p_ptr), 581 p_ptr->publ.ref, 582 tipc_own_addr, 583 imp, 584 TIPC_CONN_MSG, 585 err, 586 port_out_seqno(p_ptr), 587 0); 588} 589 590void tipc_port_recv_proto_msg(struct sk_buff *buf) 591{ 592 struct tipc_msg *msg = buf_msg(buf); 593 struct port *p_ptr = tipc_port_lock(msg_destport(msg)); 594 u32 err = TIPC_OK; 595 struct sk_buff *r_buf = NULL; 596 struct sk_buff *abort_buf = NULL; 597 598 msg_dbg(msg, "PORT<RECV<:"); 599 600 if (!p_ptr) { 601 err = TIPC_ERR_NO_PORT; 602 } else if (p_ptr->publ.connected) { 603 if (port_peernode(p_ptr) != msg_orignode(msg)) 604 err = TIPC_ERR_NO_PORT; 605 if (port_peerport(p_ptr) != msg_origport(msg)) 606 err = TIPC_ERR_NO_PORT; 607 if (!err && msg_routed(msg)) { 608 u32 seqno = msg_transp_seqno(msg); 609 u32 myno = ++p_ptr->last_in_seqno; 610 if (seqno != myno) { 611 err = TIPC_ERR_NO_PORT; 612 abort_buf = port_build_self_abort_msg(p_ptr, err); 613 } 614 } 615 if (msg_type(msg) == CONN_ACK) { 616 int wakeup = tipc_port_congested(p_ptr) && 617 p_ptr->publ.congested && 618 p_ptr->wakeup; 619 p_ptr->acked += msg_msgcnt(msg); 620 if (tipc_port_congested(p_ptr)) 621 goto exit; 622 p_ptr->publ.congested = 0; 623 if (!wakeup) 624 goto exit; 625 p_ptr->wakeup(&p_ptr->publ); 626 goto exit; 627 } 628 } else if (p_ptr->publ.published) { 629 err = TIPC_ERR_NO_PORT; 630 } 631 if (err) { 632 r_buf = port_build_proto_msg(msg_origport(msg), 633 msg_orignode(msg), 634 msg_destport(msg), 635 tipc_own_addr, 636 DATA_HIGH, 637 TIPC_CONN_MSG, 638 err, 639 0, 640 0); 641 goto exit; 642 } 643 644 /* All is fine */ 645 if (msg_type(msg) == CONN_PROBE) { 646 r_buf = port_build_proto_msg(msg_origport(msg), 647 msg_orignode(msg), 648 msg_destport(msg), 649 tipc_own_addr, 650 CONN_MANAGER, 651 CONN_PROBE_REPLY, 652 TIPC_OK, 653 port_out_seqno(p_ptr), 654 0); 655 } 656 p_ptr->probing_state = CONFIRMED; 657 port_incr_out_seqno(p_ptr); 658exit: 659 if (p_ptr) 660 tipc_port_unlock(p_ptr); 661 tipc_net_route_msg(r_buf); 662 tipc_net_route_msg(abort_buf); 663 buf_discard(buf); 664} 665 666static void port_print(struct port *p_ptr, struct print_buf *buf, int full_id) 667{ 668 struct publication *publ; 669 670 if (full_id) 671 tipc_printf(buf, "<%u.%u.%u:%u>:", 672 tipc_zone(tipc_own_addr), tipc_cluster(tipc_own_addr), 673 tipc_node(tipc_own_addr), p_ptr->publ.ref); 674 else 675 tipc_printf(buf, "%-10u:", p_ptr->publ.ref); 676 677 if (p_ptr->publ.connected) { 678 u32 dport = port_peerport(p_ptr); 679 u32 destnode = port_peernode(p_ptr); 680 681 tipc_printf(buf, " connected to <%u.%u.%u:%u>", 682 tipc_zone(destnode), tipc_cluster(destnode), 683 tipc_node(destnode), dport); 684 if (p_ptr->publ.conn_type != 0) 685 tipc_printf(buf, " via {%u,%u}", 686 p_ptr->publ.conn_type, 687 p_ptr->publ.conn_instance); 688 } 689 else if (p_ptr->publ.published) { 690 tipc_printf(buf, " bound to"); 691 list_for_each_entry(publ, &p_ptr->publications, pport_list) { 692 if (publ->lower == publ->upper) 693 tipc_printf(buf, " {%u,%u}", publ->type, 694 publ->lower); 695 else 696 tipc_printf(buf, " {%u,%u,%u}", publ->type, 697 publ->lower, publ->upper); 698 } 699 } 700 tipc_printf(buf, "\n"); 701} 702 703#define MAX_PORT_QUERY 32768 704 705struct sk_buff *tipc_port_get_ports(void) 706{ 707 struct sk_buff *buf; 708 struct tlv_desc *rep_tlv; 709 struct print_buf pb; 710 struct port *p_ptr; 711 int str_len; 712 713 buf = tipc_cfg_reply_alloc(TLV_SPACE(MAX_PORT_QUERY)); 714 if (!buf) 715 return NULL; 716 rep_tlv = (struct tlv_desc *)buf->data; 717 718 tipc_printbuf_init(&pb, TLV_DATA(rep_tlv), MAX_PORT_QUERY); 719 spin_lock_bh(&tipc_port_list_lock); 720 list_for_each_entry(p_ptr, &ports, port_list) { 721 spin_lock_bh(p_ptr->publ.lock); 722 port_print(p_ptr, &pb, 0); 723 spin_unlock_bh(p_ptr->publ.lock); 724 } 725 spin_unlock_bh(&tipc_port_list_lock); 726 str_len = tipc_printbuf_validate(&pb); 727 728 skb_put(buf, TLV_SPACE(str_len)); 729 TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len); 730 731 return buf; 732} 733 734#if 0 735 736#define MAX_PORT_STATS 2000 737 738struct sk_buff *port_show_stats(const void *req_tlv_area, int req_tlv_space) 739{ 740 u32 ref; 741 struct port *p_ptr; 742 struct sk_buff *buf; 743 struct tlv_desc *rep_tlv; 744 struct print_buf pb; 745 int str_len; 746 747 if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_PORT_REF)) 748 return cfg_reply_error_string(TIPC_CFG_TLV_ERROR); 749 750 ref = *(u32 *)TLV_DATA(req_tlv_area); 751 ref = ntohl(ref); 752 753 p_ptr = tipc_port_lock(ref); 754 if (!p_ptr) 755 return cfg_reply_error_string("port not found"); 756 757 buf = tipc_cfg_reply_alloc(TLV_SPACE(MAX_PORT_STATS)); 758 if (!buf) { 759 tipc_port_unlock(p_ptr); 760 return NULL; 761 } 762 rep_tlv = (struct tlv_desc *)buf->data; 763 764 tipc_printbuf_init(&pb, TLV_DATA(rep_tlv), MAX_PORT_STATS); 765 port_print(p_ptr, &pb, 1); 766 /* NEED TO FILL IN ADDITIONAL PORT STATISTICS HERE */ 767 tipc_port_unlock(p_ptr); 768 str_len = tipc_printbuf_validate(&pb); 769 770 skb_put(buf, TLV_SPACE(str_len)); 771 TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len); 772 773 return buf; 774} 775 776#endif 777 778void tipc_port_reinit(void) 779{ 780 struct port *p_ptr; 781 struct tipc_msg *msg; 782 783 spin_lock_bh(&tipc_port_list_lock); 784 list_for_each_entry(p_ptr, &ports, port_list) { 785 msg = &p_ptr->publ.phdr; 786 if (msg_orignode(msg) == tipc_own_addr) 787 break; 788 msg_set_orignode(msg, tipc_own_addr); 789 } 790 spin_unlock_bh(&tipc_port_list_lock); 791} 792 793 794/* 795 * port_dispatcher_sigh(): Signal handler for messages destinated 796 * to the tipc_port interface. 797 */ 798 799static void port_dispatcher_sigh(void *dummy) 800{ 801 struct sk_buff *buf; 802 803 spin_lock_bh(&queue_lock); 804 buf = msg_queue_head; 805 msg_queue_head = NULL; 806 spin_unlock_bh(&queue_lock); 807 808 while (buf) { 809 struct port *p_ptr; 810 struct user_port *up_ptr; 811 struct tipc_portid orig; 812 struct tipc_name_seq dseq; 813 void *usr_handle; 814 int connected; 815 int published; 816 u32 message_type; 817 818 struct sk_buff *next = buf->next; 819 struct tipc_msg *msg = buf_msg(buf); 820 u32 dref = msg_destport(msg); 821 822 message_type = msg_type(msg); 823 if (message_type > TIPC_DIRECT_MSG) 824 goto reject; /* Unsupported message type */ 825 826 p_ptr = tipc_port_lock(dref); 827 if (!p_ptr) 828 goto reject; /* Port deleted while msg in queue */ 829 830 orig.ref = msg_origport(msg); 831 orig.node = msg_orignode(msg); 832 up_ptr = p_ptr->user_port; 833 usr_handle = up_ptr->usr_handle; 834 connected = p_ptr->publ.connected; 835 published = p_ptr->publ.published; 836 837 if (unlikely(msg_errcode(msg))) 838 goto err; 839 840 switch (message_type) { 841 842 case TIPC_CONN_MSG:{ 843 tipc_conn_msg_event cb = up_ptr->conn_msg_cb; 844 u32 peer_port = port_peerport(p_ptr); 845 u32 peer_node = port_peernode(p_ptr); 846 847 spin_unlock_bh(p_ptr->publ.lock); 848 if (unlikely(!connected)) { 849 if (unlikely(published)) 850 goto reject; 851 tipc_connect2port(dref,&orig); 852 } 853 if (unlikely(msg_origport(msg) != peer_port)) 854 goto reject; 855 if (unlikely(msg_orignode(msg) != peer_node)) 856 goto reject; 857 if (unlikely(!cb)) 858 goto reject; 859 if (unlikely(++p_ptr->publ.conn_unacked >= 860 TIPC_FLOW_CONTROL_WIN)) 861 tipc_acknowledge(dref, 862 p_ptr->publ.conn_unacked); 863 skb_pull(buf, msg_hdr_sz(msg)); 864 cb(usr_handle, dref, &buf, msg_data(msg), 865 msg_data_sz(msg)); 866 break; 867 } 868 case TIPC_DIRECT_MSG:{ 869 tipc_msg_event cb = up_ptr->msg_cb; 870 871 spin_unlock_bh(p_ptr->publ.lock); 872 if (unlikely(connected)) 873 goto reject; 874 if (unlikely(!cb)) 875 goto reject; 876 skb_pull(buf, msg_hdr_sz(msg)); 877 cb(usr_handle, dref, &buf, msg_data(msg), 878 msg_data_sz(msg), msg_importance(msg), 879 &orig); 880 break; 881 } 882 case TIPC_MCAST_MSG: 883 case TIPC_NAMED_MSG:{ 884 tipc_named_msg_event cb = up_ptr->named_msg_cb; 885 886 spin_unlock_bh(p_ptr->publ.lock); 887 if (unlikely(connected)) 888 goto reject; 889 if (unlikely(!cb)) 890 goto reject; 891 if (unlikely(!published)) 892 goto reject; 893 dseq.type = msg_nametype(msg); 894 dseq.lower = msg_nameinst(msg); 895 dseq.upper = (message_type == TIPC_NAMED_MSG) 896 ? dseq.lower : msg_nameupper(msg); 897 skb_pull(buf, msg_hdr_sz(msg)); 898 cb(usr_handle, dref, &buf, msg_data(msg), 899 msg_data_sz(msg), msg_importance(msg), 900 &orig, &dseq); 901 break; 902 } 903 } 904 if (buf) 905 buf_discard(buf); 906 buf = next; 907 continue; 908err: 909 switch (message_type) { 910 911 case TIPC_CONN_MSG:{ 912 tipc_conn_shutdown_event cb = 913 up_ptr->conn_err_cb; 914 u32 peer_port = port_peerport(p_ptr); 915 u32 peer_node = port_peernode(p_ptr); 916 917 spin_unlock_bh(p_ptr->publ.lock); 918 if (!connected || !cb) 919 break; 920 if (msg_origport(msg) != peer_port) 921 break; 922 if (msg_orignode(msg) != peer_node) 923 break; 924 tipc_disconnect(dref); 925 skb_pull(buf, msg_hdr_sz(msg)); 926 cb(usr_handle, dref, &buf, msg_data(msg), 927 msg_data_sz(msg), msg_errcode(msg)); 928 break; 929 } 930 case TIPC_DIRECT_MSG:{ 931 tipc_msg_err_event cb = up_ptr->err_cb; 932 933 spin_unlock_bh(p_ptr->publ.lock); 934 if (connected || !cb) 935 break; 936 skb_pull(buf, msg_hdr_sz(msg)); 937 cb(usr_handle, dref, &buf, msg_data(msg), 938 msg_data_sz(msg), msg_errcode(msg), &orig); 939 break; 940 } 941 case TIPC_MCAST_MSG: 942 case TIPC_NAMED_MSG:{ 943 tipc_named_msg_err_event cb = 944 up_ptr->named_err_cb; 945 946 spin_unlock_bh(p_ptr->publ.lock); 947 if (connected || !cb) 948 break; 949 dseq.type = msg_nametype(msg); 950 dseq.lower = msg_nameinst(msg); 951 dseq.upper = (message_type == TIPC_NAMED_MSG) 952 ? dseq.lower : msg_nameupper(msg); 953 skb_pull(buf, msg_hdr_sz(msg)); 954 cb(usr_handle, dref, &buf, msg_data(msg), 955 msg_data_sz(msg), msg_errcode(msg), &dseq); 956 break; 957 } 958 } 959 if (buf) 960 buf_discard(buf); 961 buf = next; 962 continue; 963reject: 964 tipc_reject_msg(buf, TIPC_ERR_NO_PORT); 965 buf = next; 966 } 967} 968 969/* 970 * port_dispatcher(): Dispatcher for messages destinated 971 * to the tipc_port interface. Called with port locked. 972 */ 973 974static u32 port_dispatcher(struct tipc_port *dummy, struct sk_buff *buf) 975{ 976 buf->next = NULL; 977 spin_lock_bh(&queue_lock); 978 if (msg_queue_head) { 979 msg_queue_tail->next = buf; 980 msg_queue_tail = buf; 981 } else { 982 msg_queue_tail = msg_queue_head = buf; 983 tipc_k_signal((Handler)port_dispatcher_sigh, 0); 984 } 985 spin_unlock_bh(&queue_lock); 986 return TIPC_OK; 987} 988 989/* 990 * Wake up port after congestion: Called with port locked, 991 * 992 */ 993 994static void port_wakeup_sh(unsigned long ref) 995{ 996 struct port *p_ptr; 997 struct user_port *up_ptr; 998 tipc_continue_event cb = NULL; 999 void *uh = NULL; 1000 1001 p_ptr = tipc_port_lock(ref); 1002 if (p_ptr) { 1003 up_ptr = p_ptr->user_port; 1004 if (up_ptr) { 1005 cb = up_ptr->continue_event_cb; 1006 uh = up_ptr->usr_handle; 1007 } 1008 tipc_port_unlock(p_ptr); 1009 } 1010 if (cb) 1011 cb(uh, ref); 1012} 1013 1014 1015static void port_wakeup(struct tipc_port *p_ptr) 1016{ 1017 tipc_k_signal((Handler)port_wakeup_sh, p_ptr->ref); 1018} 1019 1020void tipc_acknowledge(u32 ref, u32 ack) 1021{ 1022 struct port *p_ptr; 1023 struct sk_buff *buf = NULL; 1024 1025 p_ptr = tipc_port_lock(ref); 1026 if (!p_ptr) 1027 return; 1028 if (p_ptr->publ.connected) { 1029 p_ptr->publ.conn_unacked -= ack; 1030 buf = port_build_proto_msg(port_peerport(p_ptr), 1031 port_peernode(p_ptr), 1032 ref, 1033 tipc_own_addr, 1034 CONN_MANAGER, 1035 CONN_ACK, 1036 TIPC_OK, 1037 port_out_seqno(p_ptr), 1038 ack); 1039 } 1040 tipc_port_unlock(p_ptr); 1041 tipc_net_route_msg(buf); 1042} 1043 1044/* 1045 * tipc_createport(): user level call. Will add port to 1046 * registry if non-zero user_ref. 1047 */ 1048 1049int tipc_createport(u32 user_ref, 1050 void *usr_handle, 1051 unsigned int importance, 1052 tipc_msg_err_event error_cb, 1053 tipc_named_msg_err_event named_error_cb, 1054 tipc_conn_shutdown_event conn_error_cb, 1055 tipc_msg_event msg_cb, 1056 tipc_named_msg_event named_msg_cb, 1057 tipc_conn_msg_event conn_msg_cb, 1058 tipc_continue_event continue_event_cb,/* May be zero */ 1059 u32 *portref) 1060{ 1061 struct user_port *up_ptr; 1062 struct port *p_ptr; 1063 u32 ref; 1064 1065 up_ptr = kmalloc(sizeof(*up_ptr), GFP_ATOMIC); 1066 if (!up_ptr) { 1067 warn("Port creation failed, no memory\n"); 1068 return -ENOMEM; 1069 } 1070 ref = tipc_createport_raw(NULL, port_dispatcher, port_wakeup, importance); 1071 p_ptr = tipc_port_lock(ref); 1072 if (!p_ptr) { 1073 kfree(up_ptr); 1074 return -ENOMEM; 1075 } 1076 1077 p_ptr->user_port = up_ptr; 1078 up_ptr->user_ref = user_ref; 1079 up_ptr->usr_handle = usr_handle; 1080 up_ptr->ref = p_ptr->publ.ref; 1081 up_ptr->err_cb = error_cb; 1082 up_ptr->named_err_cb = named_error_cb; 1083 up_ptr->conn_err_cb = conn_error_cb; 1084 up_ptr->msg_cb = msg_cb; 1085 up_ptr->named_msg_cb = named_msg_cb; 1086 up_ptr->conn_msg_cb = conn_msg_cb; 1087 up_ptr->continue_event_cb = continue_event_cb; 1088 INIT_LIST_HEAD(&up_ptr->uport_list); 1089 tipc_reg_add_port(up_ptr); 1090 *portref = p_ptr->publ.ref; 1091 dbg(" tipc_createport: %x with ref %u\n", p_ptr, p_ptr->publ.ref); 1092 tipc_port_unlock(p_ptr); 1093 return TIPC_OK; 1094} 1095 1096int tipc_ownidentity(u32 ref, struct tipc_portid *id) 1097{ 1098 id->ref = ref; 1099 id->node = tipc_own_addr; 1100 return TIPC_OK; 1101} 1102 1103int tipc_portimportance(u32 ref, unsigned int *importance) 1104{ 1105 struct port *p_ptr; 1106 1107 p_ptr = tipc_port_lock(ref); 1108 if (!p_ptr) 1109 return -EINVAL; 1110 *importance = (unsigned int)msg_importance(&p_ptr->publ.phdr); 1111 spin_unlock_bh(p_ptr->publ.lock); 1112 return TIPC_OK; 1113} 1114 1115int tipc_set_portimportance(u32 ref, unsigned int imp) 1116{ 1117 struct port *p_ptr; 1118 1119 if (imp > TIPC_CRITICAL_IMPORTANCE) 1120 return -EINVAL; 1121 1122 p_ptr = tipc_port_lock(ref); 1123 if (!p_ptr) 1124 return -EINVAL; 1125 msg_set_importance(&p_ptr->publ.phdr, (u32)imp); 1126 spin_unlock_bh(p_ptr->publ.lock); 1127 return TIPC_OK; 1128} 1129 1130 1131int tipc_publish(u32 ref, unsigned int scope, struct tipc_name_seq const *seq) 1132{ 1133 struct port *p_ptr; 1134 struct publication *publ; 1135 u32 key; 1136 int res = -EINVAL; 1137 1138 p_ptr = tipc_port_lock(ref); 1139 if (!p_ptr) 1140 return -EINVAL; 1141 1142 dbg("tipc_publ %u, p_ptr = %x, conn = %x, scope = %x, " 1143 "lower = %u, upper = %u\n", 1144 ref, p_ptr, p_ptr->publ.connected, scope, seq->lower, seq->upper); 1145 if (p_ptr->publ.connected) 1146 goto exit; 1147 if (seq->lower > seq->upper) 1148 goto exit; 1149 if ((scope < TIPC_ZONE_SCOPE) || (scope > TIPC_NODE_SCOPE)) 1150 goto exit; 1151 key = ref + p_ptr->pub_count + 1; 1152 if (key == ref) { 1153 res = -EADDRINUSE; 1154 goto exit; 1155 } 1156 publ = tipc_nametbl_publish(seq->type, seq->lower, seq->upper, 1157 scope, p_ptr->publ.ref, key); 1158 if (publ) { 1159 list_add(&publ->pport_list, &p_ptr->publications); 1160 p_ptr->pub_count++; 1161 p_ptr->publ.published = 1; 1162 res = TIPC_OK; 1163 } 1164exit: 1165 tipc_port_unlock(p_ptr); 1166 return res; 1167} 1168 1169int tipc_withdraw(u32 ref, unsigned int scope, struct tipc_name_seq const *seq) 1170{ 1171 struct port *p_ptr; 1172 struct publication *publ; 1173 struct publication *tpubl; 1174 int res = -EINVAL; 1175 1176 p_ptr = tipc_port_lock(ref); 1177 if (!p_ptr) 1178 return -EINVAL; 1179 if (!seq) { 1180 list_for_each_entry_safe(publ, tpubl, 1181 &p_ptr->publications, pport_list) { 1182 tipc_nametbl_withdraw(publ->type, publ->lower, 1183 publ->ref, publ->key); 1184 } 1185 res = TIPC_OK; 1186 } else { 1187 list_for_each_entry_safe(publ, tpubl, 1188 &p_ptr->publications, pport_list) { 1189 if (publ->scope != scope) 1190 continue; 1191 if (publ->type != seq->type) 1192 continue; 1193 if (publ->lower != seq->lower) 1194 continue; 1195 if (publ->upper != seq->upper) 1196 break; 1197 tipc_nametbl_withdraw(publ->type, publ->lower, 1198 publ->ref, publ->key); 1199 res = TIPC_OK; 1200 break; 1201 } 1202 } 1203 if (list_empty(&p_ptr->publications)) 1204 p_ptr->publ.published = 0; 1205 tipc_port_unlock(p_ptr); 1206 return res; 1207} 1208 1209int tipc_connect2port(u32 ref, struct tipc_portid const *peer) 1210{ 1211 struct port *p_ptr; 1212 struct tipc_msg *msg; 1213 int res = -EINVAL; 1214 1215 p_ptr = tipc_port_lock(ref); 1216 if (!p_ptr) 1217 return -EINVAL; 1218 if (p_ptr->publ.published || p_ptr->publ.connected) 1219 goto exit; 1220 if (!peer->ref) 1221 goto exit; 1222 1223 msg = &p_ptr->publ.phdr; 1224 msg_set_destnode(msg, peer->node); 1225 msg_set_destport(msg, peer->ref); 1226 msg_set_orignode(msg, tipc_own_addr); 1227 msg_set_origport(msg, p_ptr->publ.ref); 1228 msg_set_transp_seqno(msg, 42); 1229 msg_set_type(msg, TIPC_CONN_MSG); 1230 if (!may_route(peer->node)) 1231 msg_set_hdr_sz(msg, SHORT_H_SIZE); 1232 else 1233 msg_set_hdr_sz(msg, LONG_H_SIZE); 1234 1235 p_ptr->probing_interval = PROBING_INTERVAL; 1236 p_ptr->probing_state = CONFIRMED; 1237 p_ptr->publ.connected = 1; 1238 k_start_timer(&p_ptr->timer, p_ptr->probing_interval); 1239 1240 tipc_nodesub_subscribe(&p_ptr->subscription,peer->node, 1241 (void *)(unsigned long)ref, 1242 (net_ev_handler)port_handle_node_down); 1243 res = TIPC_OK; 1244exit: 1245 tipc_port_unlock(p_ptr); 1246 p_ptr->max_pkt = tipc_link_get_max_pkt(peer->node, ref); 1247 return res; 1248} 1249 1250/* 1251 * tipc_disconnect(): Disconnect port form peer. 1252 * This is a node local operation. 1253 */ 1254 1255int tipc_disconnect(u32 ref) 1256{ 1257 struct port *p_ptr; 1258 int res = -ENOTCONN; 1259 1260 p_ptr = tipc_port_lock(ref); 1261 if (!p_ptr) 1262 return -EINVAL; 1263 if (p_ptr->publ.connected) { 1264 p_ptr->publ.connected = 0; 1265 /* let timer expire on it's own to avoid deadlock! */ 1266 tipc_nodesub_unsubscribe(&p_ptr->subscription); 1267 res = TIPC_OK; 1268 } 1269 tipc_port_unlock(p_ptr); 1270 return res; 1271} 1272 1273/* 1274 * tipc_shutdown(): Send a SHUTDOWN msg to peer and disconnect 1275 */ 1276int tipc_shutdown(u32 ref) 1277{ 1278 struct port *p_ptr; 1279 struct sk_buff *buf = NULL; 1280 1281 p_ptr = tipc_port_lock(ref); 1282 if (!p_ptr) 1283 return -EINVAL; 1284 1285 if (p_ptr->publ.connected) { 1286 u32 imp = msg_importance(&p_ptr->publ.phdr); 1287 if (imp < TIPC_CRITICAL_IMPORTANCE) 1288 imp++; 1289 buf = port_build_proto_msg(port_peerport(p_ptr), 1290 port_peernode(p_ptr), 1291 ref, 1292 tipc_own_addr, 1293 imp, 1294 TIPC_CONN_MSG, 1295 TIPC_CONN_SHUTDOWN, 1296 port_out_seqno(p_ptr), 1297 0); 1298 } 1299 tipc_port_unlock(p_ptr); 1300 tipc_net_route_msg(buf); 1301 return tipc_disconnect(ref); 1302} 1303 1304int tipc_isconnected(u32 ref, int *isconnected) 1305{ 1306 struct port *p_ptr; 1307 1308 p_ptr = tipc_port_lock(ref); 1309 if (!p_ptr) 1310 return -EINVAL; 1311 *isconnected = p_ptr->publ.connected; 1312 tipc_port_unlock(p_ptr); 1313 return TIPC_OK; 1314} 1315 1316int tipc_peer(u32 ref, struct tipc_portid *peer) 1317{ 1318 struct port *p_ptr; 1319 int res; 1320 1321 p_ptr = tipc_port_lock(ref); 1322 if (!p_ptr) 1323 return -EINVAL; 1324 if (p_ptr->publ.connected) { 1325 peer->ref = port_peerport(p_ptr); 1326 peer->node = port_peernode(p_ptr); 1327 res = TIPC_OK; 1328 } else 1329 res = -ENOTCONN; 1330 tipc_port_unlock(p_ptr); 1331 return res; 1332} 1333 1334int tipc_ref_valid(u32 ref) 1335{ 1336 /* Works irrespective of type */ 1337 return !!tipc_ref_deref(ref); 1338} 1339 1340 1341/* 1342 * tipc_port_recv_sections(): Concatenate and deliver sectioned 1343 * message for this node. 1344 */ 1345 1346int tipc_port_recv_sections(struct port *sender, unsigned int num_sect, 1347 struct iovec const *msg_sect) 1348{ 1349 struct sk_buff *buf; 1350 int res; 1351 1352 res = msg_build(&sender->publ.phdr, msg_sect, num_sect, 1353 MAX_MSG_SIZE, !sender->user_port, &buf); 1354 if (likely(buf)) 1355 tipc_port_recv_msg(buf); 1356 return res; 1357} 1358 1359/** 1360 * tipc_send - send message sections on connection 1361 */ 1362 1363int tipc_send(u32 ref, unsigned int num_sect, struct iovec const *msg_sect) 1364{ 1365 struct port *p_ptr; 1366 u32 destnode; 1367 int res; 1368 1369 p_ptr = tipc_port_deref(ref); 1370 if (!p_ptr || !p_ptr->publ.connected) 1371 return -EINVAL; 1372 1373 p_ptr->publ.congested = 1; 1374 if (!tipc_port_congested(p_ptr)) { 1375 destnode = port_peernode(p_ptr); 1376 if (likely(destnode != tipc_own_addr)) 1377 res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect, 1378 destnode); 1379 else 1380 res = tipc_port_recv_sections(p_ptr, num_sect, msg_sect); 1381 1382 if (likely(res != -ELINKCONG)) { 1383 port_incr_out_seqno(p_ptr); 1384 p_ptr->publ.congested = 0; 1385 p_ptr->sent++; 1386 return res; 1387 } 1388 } 1389 if (port_unreliable(p_ptr)) { 1390 p_ptr->publ.congested = 0; 1391 /* Just calculate msg length and return */ 1392 return msg_calc_data_size(msg_sect, num_sect); 1393 } 1394 return -ELINKCONG; 1395} 1396 1397/** 1398 * tipc_send_buf - send message buffer on connection 1399 */ 1400 1401int tipc_send_buf(u32 ref, struct sk_buff *buf, unsigned int dsz) 1402{ 1403 struct port *p_ptr; 1404 struct tipc_msg *msg; 1405 u32 destnode; 1406 u32 hsz; 1407 u32 sz; 1408 u32 res; 1409 1410 p_ptr = tipc_port_deref(ref); 1411 if (!p_ptr || !p_ptr->publ.connected) 1412 return -EINVAL; 1413 1414 msg = &p_ptr->publ.phdr; 1415 hsz = msg_hdr_sz(msg); 1416 sz = hsz + dsz; 1417 msg_set_size(msg, sz); 1418 if (skb_cow(buf, hsz)) 1419 return -ENOMEM; 1420 1421 skb_push(buf, hsz); 1422 skb_copy_to_linear_data(buf, msg, hsz); 1423 destnode = msg_destnode(msg); 1424 p_ptr->publ.congested = 1; 1425 if (!tipc_port_congested(p_ptr)) { 1426 if (likely(destnode != tipc_own_addr)) 1427 res = tipc_send_buf_fast(buf, destnode); 1428 else { 1429 tipc_port_recv_msg(buf); 1430 res = sz; 1431 } 1432 if (likely(res != -ELINKCONG)) { 1433 port_incr_out_seqno(p_ptr); 1434 p_ptr->sent++; 1435 p_ptr->publ.congested = 0; 1436 return res; 1437 } 1438 } 1439 if (port_unreliable(p_ptr)) { 1440 p_ptr->publ.congested = 0; 1441 return dsz; 1442 } 1443 return -ELINKCONG; 1444} 1445 1446/** 1447 * tipc_forward2name - forward message sections to port name 1448 */ 1449 1450int tipc_forward2name(u32 ref, 1451 struct tipc_name const *name, 1452 u32 domain, 1453 u32 num_sect, 1454 struct iovec const *msg_sect, 1455 struct tipc_portid const *orig, 1456 unsigned int importance) 1457{ 1458 struct port *p_ptr; 1459 struct tipc_msg *msg; 1460 u32 destnode = domain; 1461 u32 destport = 0; 1462 int res; 1463 1464 p_ptr = tipc_port_deref(ref); 1465 if (!p_ptr || p_ptr->publ.connected) 1466 return -EINVAL; 1467 1468 msg = &p_ptr->publ.phdr; 1469 msg_set_type(msg, TIPC_NAMED_MSG); 1470 msg_set_orignode(msg, orig->node); 1471 msg_set_origport(msg, orig->ref); 1472 msg_set_hdr_sz(msg, LONG_H_SIZE); 1473 msg_set_nametype(msg, name->type); 1474 msg_set_nameinst(msg, name->instance); 1475 msg_set_lookup_scope(msg, addr_scope(domain)); 1476 if (importance <= TIPC_CRITICAL_IMPORTANCE) 1477 msg_set_importance(msg,importance); 1478 destport = tipc_nametbl_translate(name->type, name->instance, &destnode); 1479 msg_set_destnode(msg, destnode); 1480 msg_set_destport(msg, destport); 1481 1482 if (likely(destport || destnode)) { 1483 p_ptr->sent++; 1484 if (likely(destnode == tipc_own_addr)) 1485 return tipc_port_recv_sections(p_ptr, num_sect, msg_sect); 1486 res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect, 1487 destnode); 1488 if (likely(res != -ELINKCONG)) 1489 return res; 1490 if (port_unreliable(p_ptr)) { 1491 /* Just calculate msg length and return */ 1492 return msg_calc_data_size(msg_sect, num_sect); 1493 } 1494 return -ELINKCONG; 1495 } 1496 return tipc_port_reject_sections(p_ptr, msg, msg_sect, num_sect, 1497 TIPC_ERR_NO_NAME); 1498} 1499 1500/** 1501 * tipc_send2name - send message sections to port name 1502 */ 1503 1504int tipc_send2name(u32 ref, 1505 struct tipc_name const *name, 1506 unsigned int domain, 1507 unsigned int num_sect, 1508 struct iovec const *msg_sect) 1509{ 1510 struct tipc_portid orig; 1511 1512 orig.ref = ref; 1513 orig.node = tipc_own_addr; 1514 return tipc_forward2name(ref, name, domain, num_sect, msg_sect, &orig, 1515 TIPC_PORT_IMPORTANCE); 1516} 1517 1518/** 1519 * tipc_forward_buf2name - forward message buffer to port name 1520 */ 1521 1522int tipc_forward_buf2name(u32 ref, 1523 struct tipc_name const *name, 1524 u32 domain, 1525 struct sk_buff *buf, 1526 unsigned int dsz, 1527 struct tipc_portid const *orig, 1528 unsigned int importance) 1529{ 1530 struct port *p_ptr; 1531 struct tipc_msg *msg; 1532 u32 destnode = domain; 1533 u32 destport = 0; 1534 int res; 1535 1536 p_ptr = (struct port *)tipc_ref_deref(ref); 1537 if (!p_ptr || p_ptr->publ.connected) 1538 return -EINVAL; 1539 1540 msg = &p_ptr->publ.phdr; 1541 if (importance <= TIPC_CRITICAL_IMPORTANCE) 1542 msg_set_importance(msg, importance); 1543 msg_set_type(msg, TIPC_NAMED_MSG); 1544 msg_set_orignode(msg, orig->node); 1545 msg_set_origport(msg, orig->ref); 1546 msg_set_nametype(msg, name->type); 1547 msg_set_nameinst(msg, name->instance); 1548 msg_set_lookup_scope(msg, addr_scope(domain)); 1549 msg_set_hdr_sz(msg, LONG_H_SIZE); 1550 msg_set_size(msg, LONG_H_SIZE + dsz); 1551 destport = tipc_nametbl_translate(name->type, name->instance, &destnode); 1552 msg_set_destnode(msg, destnode); 1553 msg_set_destport(msg, destport); 1554 msg_dbg(msg, "forw2name ==> "); 1555 if (skb_cow(buf, LONG_H_SIZE)) 1556 return -ENOMEM; 1557 skb_push(buf, LONG_H_SIZE); 1558 skb_copy_to_linear_data(buf, msg, LONG_H_SIZE); 1559 msg_dbg(buf_msg(buf),"PREP:"); 1560 if (likely(destport || destnode)) { 1561 p_ptr->sent++; 1562 if (destnode == tipc_own_addr) 1563 return tipc_port_recv_msg(buf); 1564 res = tipc_send_buf_fast(buf, destnode); 1565 if (likely(res != -ELINKCONG)) 1566 return res; 1567 if (port_unreliable(p_ptr)) 1568 return dsz; 1569 return -ELINKCONG; 1570 } 1571 return tipc_reject_msg(buf, TIPC_ERR_NO_NAME); 1572} 1573 1574/** 1575 * tipc_send_buf2name - send message buffer to port name 1576 */ 1577 1578int tipc_send_buf2name(u32 ref, 1579 struct tipc_name const *dest, 1580 u32 domain, 1581 struct sk_buff *buf, 1582 unsigned int dsz) 1583{ 1584 struct tipc_portid orig; 1585 1586 orig.ref = ref; 1587 orig.node = tipc_own_addr; 1588 return tipc_forward_buf2name(ref, dest, domain, buf, dsz, &orig, 1589 TIPC_PORT_IMPORTANCE); 1590} 1591 1592/** 1593 * tipc_forward2port - forward message sections to port identity 1594 */ 1595 1596int tipc_forward2port(u32 ref, 1597 struct tipc_portid const *dest, 1598 unsigned int num_sect, 1599 struct iovec const *msg_sect, 1600 struct tipc_portid const *orig, 1601 unsigned int importance) 1602{ 1603 struct port *p_ptr; 1604 struct tipc_msg *msg; 1605 int res; 1606 1607 p_ptr = tipc_port_deref(ref); 1608 if (!p_ptr || p_ptr->publ.connected) 1609 return -EINVAL; 1610 1611 msg = &p_ptr->publ.phdr; 1612 msg_set_type(msg, TIPC_DIRECT_MSG); 1613 msg_set_orignode(msg, orig->node); 1614 msg_set_origport(msg, orig->ref); 1615 msg_set_destnode(msg, dest->node); 1616 msg_set_destport(msg, dest->ref); 1617 msg_set_hdr_sz(msg, DIR_MSG_H_SIZE); 1618 if (importance <= TIPC_CRITICAL_IMPORTANCE) 1619 msg_set_importance(msg, importance); 1620 p_ptr->sent++; 1621 if (dest->node == tipc_own_addr) 1622 return tipc_port_recv_sections(p_ptr, num_sect, msg_sect); 1623 res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect, dest->node); 1624 if (likely(res != -ELINKCONG)) 1625 return res; 1626 if (port_unreliable(p_ptr)) { 1627 /* Just calculate msg length and return */ 1628 return msg_calc_data_size(msg_sect, num_sect); 1629 } 1630 return -ELINKCONG; 1631} 1632 1633/** 1634 * tipc_send2port - send message sections to port identity 1635 */ 1636 1637int tipc_send2port(u32 ref, 1638 struct tipc_portid const *dest, 1639 unsigned int num_sect, 1640 struct iovec const *msg_sect) 1641{ 1642 struct tipc_portid orig; 1643 1644 orig.ref = ref; 1645 orig.node = tipc_own_addr; 1646 return tipc_forward2port(ref, dest, num_sect, msg_sect, &orig, 1647 TIPC_PORT_IMPORTANCE); 1648} 1649 1650/** 1651 * tipc_forward_buf2port - forward message buffer to port identity 1652 */ 1653int tipc_forward_buf2port(u32 ref, 1654 struct tipc_portid const *dest, 1655 struct sk_buff *buf, 1656 unsigned int dsz, 1657 struct tipc_portid const *orig, 1658 unsigned int importance) 1659{ 1660 struct port *p_ptr; 1661 struct tipc_msg *msg; 1662 int res; 1663 1664 p_ptr = (struct port *)tipc_ref_deref(ref); 1665 if (!p_ptr || p_ptr->publ.connected) 1666 return -EINVAL; 1667 1668 msg = &p_ptr->publ.phdr; 1669 msg_set_type(msg, TIPC_DIRECT_MSG); 1670 msg_set_orignode(msg, orig->node); 1671 msg_set_origport(msg, orig->ref); 1672 msg_set_destnode(msg, dest->node); 1673 msg_set_destport(msg, dest->ref); 1674 msg_set_hdr_sz(msg, DIR_MSG_H_SIZE); 1675 if (importance <= TIPC_CRITICAL_IMPORTANCE) 1676 msg_set_importance(msg, importance); 1677 msg_set_size(msg, DIR_MSG_H_SIZE + dsz); 1678 if (skb_cow(buf, DIR_MSG_H_SIZE)) 1679 return -ENOMEM; 1680 1681 skb_push(buf, DIR_MSG_H_SIZE); 1682 skb_copy_to_linear_data(buf, msg, DIR_MSG_H_SIZE); 1683 msg_dbg(msg, "buf2port: "); 1684 p_ptr->sent++; 1685 if (dest->node == tipc_own_addr) 1686 return tipc_port_recv_msg(buf); 1687 res = tipc_send_buf_fast(buf, dest->node); 1688 if (likely(res != -ELINKCONG)) 1689 return res; 1690 if (port_unreliable(p_ptr)) 1691 return dsz; 1692 return -ELINKCONG; 1693} 1694 1695/** 1696 * tipc_send_buf2port - send message buffer to port identity 1697 */ 1698 1699int tipc_send_buf2port(u32 ref, 1700 struct tipc_portid const *dest, 1701 struct sk_buff *buf, 1702 unsigned int dsz) 1703{ 1704 struct tipc_portid orig; 1705 1706 orig.ref = ref; 1707 orig.node = tipc_own_addr; 1708 return tipc_forward_buf2port(ref, dest, buf, dsz, &orig, 1709 TIPC_PORT_IMPORTANCE); 1710} 1711