Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

at v2.6.26-rc6 1721 lines 42 kB view raw
1/* 2 * net/tipc/port.c: TIPC port code 3 * 4 * Copyright (c) 1992-2007, Ericsson AB 5 * Copyright (c) 2004-2007, Wind River Systems 6 * All rights reserved. 7 * 8 * Redistribution and use in source and binary forms, with or without 9 * modification, are permitted provided that the following conditions are met: 10 * 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in the 15 * documentation and/or other materials provided with the distribution. 16 * 3. Neither the names of the copyright holders nor the names of its 17 * contributors may be used to endorse or promote products derived from 18 * this software without specific prior written permission. 19 * 20 * Alternatively, this software may be distributed under the terms of the 21 * GNU General Public License ("GPL") version 2 as published by the Free 22 * Software Foundation. 23 * 24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 25 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 26 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 27 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 28 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 29 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 30 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 33 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 34 * POSSIBILITY OF SUCH DAMAGE. 35 */ 36 37#include "core.h" 38#include "config.h" 39#include "dbg.h" 40#include "port.h" 41#include "addr.h" 42#include "link.h" 43#include "node.h" 44#include "name_table.h" 45#include "user_reg.h" 46#include "msg.h" 47#include "bcast.h" 48 49/* Connection management: */ 50#define PROBING_INTERVAL 3600000 /* [ms] => 1 h */ 51#define CONFIRMED 0 52#define PROBING 1 53 54#define MAX_REJECT_SIZE 1024 55 56static struct sk_buff *msg_queue_head = NULL; 57static struct sk_buff *msg_queue_tail = NULL; 58 59DEFINE_SPINLOCK(tipc_port_list_lock); 60static DEFINE_SPINLOCK(queue_lock); 61 62static LIST_HEAD(ports); 63static void port_handle_node_down(unsigned long ref); 64static struct sk_buff* port_build_self_abort_msg(struct port *,u32 err); 65static struct sk_buff* port_build_peer_abort_msg(struct port *,u32 err); 66static void port_timeout(unsigned long ref); 67 68 69static u32 port_peernode(struct port *p_ptr) 70{ 71 return msg_destnode(&p_ptr->publ.phdr); 72} 73 74static u32 port_peerport(struct port *p_ptr) 75{ 76 return msg_destport(&p_ptr->publ.phdr); 77} 78 79static u32 port_out_seqno(struct port *p_ptr) 80{ 81 return msg_transp_seqno(&p_ptr->publ.phdr); 82} 83 84static void port_incr_out_seqno(struct port *p_ptr) 85{ 86 struct tipc_msg *m = &p_ptr->publ.phdr; 87 88 if (likely(!msg_routed(m))) 89 return; 90 msg_set_transp_seqno(m, (msg_transp_seqno(m) + 1)); 91} 92 93/** 94 * tipc_multicast - send a multicast message to local and remote destinations 95 */ 96 97int tipc_multicast(u32 ref, struct tipc_name_seq const *seq, u32 domain, 98 u32 num_sect, struct iovec const *msg_sect) 99{ 100 struct tipc_msg *hdr; 101 struct sk_buff *buf; 102 struct sk_buff *ibuf = NULL; 103 struct port_list dports = {0, NULL, }; 104 struct port *oport = tipc_port_deref(ref); 105 int ext_targets; 106 int res; 107 108 if (unlikely(!oport)) 109 return -EINVAL; 110 111 /* Create multicast message */ 112 113 hdr = &oport->publ.phdr; 114 msg_set_type(hdr, TIPC_MCAST_MSG); 115 msg_set_nametype(hdr, seq->type); 116 msg_set_namelower(hdr, seq->lower); 117 msg_set_nameupper(hdr, seq->upper); 118 msg_set_hdr_sz(hdr, MCAST_H_SIZE); 119 res = msg_build(hdr, msg_sect, num_sect, MAX_MSG_SIZE, 120 !oport->user_port, &buf); 121 if (unlikely(!buf)) 122 return res; 123 124 /* Figure out where to send multicast message */ 125 126 ext_targets = tipc_nametbl_mc_translate(seq->type, seq->lower, seq->upper, 127 TIPC_NODE_SCOPE, &dports); 128 129 /* Send message to destinations (duplicate it only if necessary) */ 130 131 if (ext_targets) { 132 if (dports.count != 0) { 133 ibuf = skb_copy(buf, GFP_ATOMIC); 134 if (ibuf == NULL) { 135 tipc_port_list_free(&dports); 136 buf_discard(buf); 137 return -ENOMEM; 138 } 139 } 140 res = tipc_bclink_send_msg(buf); 141 if ((res < 0) && (dports.count != 0)) { 142 buf_discard(ibuf); 143 } 144 } else { 145 ibuf = buf; 146 } 147 148 if (res >= 0) { 149 if (ibuf) 150 tipc_port_recv_mcast(ibuf, &dports); 151 } else { 152 tipc_port_list_free(&dports); 153 } 154 return res; 155} 156 157/** 158 * tipc_port_recv_mcast - deliver multicast message to all destination ports 159 * 160 * If there is no port list, perform a lookup to create one 161 */ 162 163void tipc_port_recv_mcast(struct sk_buff *buf, struct port_list *dp) 164{ 165 struct tipc_msg* msg; 166 struct port_list dports = {0, NULL, }; 167 struct port_list *item = dp; 168 int cnt = 0; 169 170 msg = buf_msg(buf); 171 172 /* Create destination port list, if one wasn't supplied */ 173 174 if (dp == NULL) { 175 tipc_nametbl_mc_translate(msg_nametype(msg), 176 msg_namelower(msg), 177 msg_nameupper(msg), 178 TIPC_CLUSTER_SCOPE, 179 &dports); 180 item = dp = &dports; 181 } 182 183 /* Deliver a copy of message to each destination port */ 184 185 if (dp->count != 0) { 186 if (dp->count == 1) { 187 msg_set_destport(msg, dp->ports[0]); 188 tipc_port_recv_msg(buf); 189 tipc_port_list_free(dp); 190 return; 191 } 192 for (; cnt < dp->count; cnt++) { 193 int index = cnt % PLSIZE; 194 struct sk_buff *b = skb_clone(buf, GFP_ATOMIC); 195 196 if (b == NULL) { 197 warn("Unable to deliver multicast message(s)\n"); 198 msg_dbg(msg, "LOST:"); 199 goto exit; 200 } 201 if ((index == 0) && (cnt != 0)) { 202 item = item->next; 203 } 204 msg_set_destport(buf_msg(b),item->ports[index]); 205 tipc_port_recv_msg(b); 206 } 207 } 208exit: 209 buf_discard(buf); 210 tipc_port_list_free(dp); 211} 212 213/** 214 * tipc_createport_raw - create a native TIPC port 215 * 216 * Returns local port reference 217 */ 218 219u32 tipc_createport_raw(void *usr_handle, 220 u32 (*dispatcher)(struct tipc_port *, struct sk_buff *), 221 void (*wakeup)(struct tipc_port *), 222 const u32 importance) 223{ 224 struct port *p_ptr; 225 struct tipc_msg *msg; 226 u32 ref; 227 228 p_ptr = kzalloc(sizeof(*p_ptr), GFP_ATOMIC); 229 if (!p_ptr) { 230 warn("Port creation failed, no memory\n"); 231 return 0; 232 } 233 ref = tipc_ref_acquire(p_ptr, &p_ptr->publ.lock); 234 if (!ref) { 235 warn("Port creation failed, reference table exhausted\n"); 236 kfree(p_ptr); 237 return 0; 238 } 239 240 tipc_port_lock(ref); 241 p_ptr->publ.usr_handle = usr_handle; 242 p_ptr->publ.max_pkt = MAX_PKT_DEFAULT; 243 p_ptr->publ.ref = ref; 244 msg = &p_ptr->publ.phdr; 245 msg_init(msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG, TIPC_OK, LONG_H_SIZE, 246 0); 247 msg_set_orignode(msg, tipc_own_addr); 248 msg_set_prevnode(msg, tipc_own_addr); 249 msg_set_origport(msg, ref); 250 msg_set_importance(msg,importance); 251 p_ptr->last_in_seqno = 41; 252 p_ptr->sent = 1; 253 INIT_LIST_HEAD(&p_ptr->wait_list); 254 INIT_LIST_HEAD(&p_ptr->subscription.nodesub_list); 255 p_ptr->congested_link = NULL; 256 p_ptr->dispatcher = dispatcher; 257 p_ptr->wakeup = wakeup; 258 p_ptr->user_port = NULL; 259 k_init_timer(&p_ptr->timer, (Handler)port_timeout, ref); 260 spin_lock_bh(&tipc_port_list_lock); 261 INIT_LIST_HEAD(&p_ptr->publications); 262 INIT_LIST_HEAD(&p_ptr->port_list); 263 list_add_tail(&p_ptr->port_list, &ports); 264 spin_unlock_bh(&tipc_port_list_lock); 265 tipc_port_unlock(p_ptr); 266 return ref; 267} 268 269int tipc_deleteport(u32 ref) 270{ 271 struct port *p_ptr; 272 struct sk_buff *buf = NULL; 273 274 tipc_withdraw(ref, 0, NULL); 275 p_ptr = tipc_port_lock(ref); 276 if (!p_ptr) 277 return -EINVAL; 278 279 tipc_ref_discard(ref); 280 tipc_port_unlock(p_ptr); 281 282 k_cancel_timer(&p_ptr->timer); 283 if (p_ptr->publ.connected) { 284 buf = port_build_peer_abort_msg(p_ptr, TIPC_ERR_NO_PORT); 285 tipc_nodesub_unsubscribe(&p_ptr->subscription); 286 } 287 if (p_ptr->user_port) { 288 tipc_reg_remove_port(p_ptr->user_port); 289 kfree(p_ptr->user_port); 290 } 291 292 spin_lock_bh(&tipc_port_list_lock); 293 list_del(&p_ptr->port_list); 294 list_del(&p_ptr->wait_list); 295 spin_unlock_bh(&tipc_port_list_lock); 296 k_term_timer(&p_ptr->timer); 297 kfree(p_ptr); 298 dbg("Deleted port %u\n", ref); 299 tipc_net_route_msg(buf); 300 return TIPC_OK; 301} 302 303/** 304 * tipc_get_port() - return port associated with 'ref' 305 * 306 * Note: Port is not locked. 307 */ 308 309struct tipc_port *tipc_get_port(const u32 ref) 310{ 311 return (struct tipc_port *)tipc_ref_deref(ref); 312} 313 314/** 315 * tipc_get_handle - return user handle associated to port 'ref' 316 */ 317 318void *tipc_get_handle(const u32 ref) 319{ 320 struct port *p_ptr; 321 void * handle; 322 323 p_ptr = tipc_port_lock(ref); 324 if (!p_ptr) 325 return NULL; 326 handle = p_ptr->publ.usr_handle; 327 tipc_port_unlock(p_ptr); 328 return handle; 329} 330 331static int port_unreliable(struct port *p_ptr) 332{ 333 return msg_src_droppable(&p_ptr->publ.phdr); 334} 335 336int tipc_portunreliable(u32 ref, unsigned int *isunreliable) 337{ 338 struct port *p_ptr; 339 340 p_ptr = tipc_port_lock(ref); 341 if (!p_ptr) 342 return -EINVAL; 343 *isunreliable = port_unreliable(p_ptr); 344 tipc_port_unlock(p_ptr); 345 return TIPC_OK; 346} 347 348int tipc_set_portunreliable(u32 ref, unsigned int isunreliable) 349{ 350 struct port *p_ptr; 351 352 p_ptr = tipc_port_lock(ref); 353 if (!p_ptr) 354 return -EINVAL; 355 msg_set_src_droppable(&p_ptr->publ.phdr, (isunreliable != 0)); 356 tipc_port_unlock(p_ptr); 357 return TIPC_OK; 358} 359 360static int port_unreturnable(struct port *p_ptr) 361{ 362 return msg_dest_droppable(&p_ptr->publ.phdr); 363} 364 365int tipc_portunreturnable(u32 ref, unsigned int *isunrejectable) 366{ 367 struct port *p_ptr; 368 369 p_ptr = tipc_port_lock(ref); 370 if (!p_ptr) 371 return -EINVAL; 372 *isunrejectable = port_unreturnable(p_ptr); 373 tipc_port_unlock(p_ptr); 374 return TIPC_OK; 375} 376 377int tipc_set_portunreturnable(u32 ref, unsigned int isunrejectable) 378{ 379 struct port *p_ptr; 380 381 p_ptr = tipc_port_lock(ref); 382 if (!p_ptr) 383 return -EINVAL; 384 msg_set_dest_droppable(&p_ptr->publ.phdr, (isunrejectable != 0)); 385 tipc_port_unlock(p_ptr); 386 return TIPC_OK; 387} 388 389/* 390 * port_build_proto_msg(): build a port level protocol 391 * or a connection abortion message. Called with 392 * tipc_port lock on. 393 */ 394static struct sk_buff *port_build_proto_msg(u32 destport, u32 destnode, 395 u32 origport, u32 orignode, 396 u32 usr, u32 type, u32 err, 397 u32 seqno, u32 ack) 398{ 399 struct sk_buff *buf; 400 struct tipc_msg *msg; 401 402 buf = buf_acquire(LONG_H_SIZE); 403 if (buf) { 404 msg = buf_msg(buf); 405 msg_init(msg, usr, type, err, LONG_H_SIZE, destnode); 406 msg_set_destport(msg, destport); 407 msg_set_origport(msg, origport); 408 msg_set_destnode(msg, destnode); 409 msg_set_orignode(msg, orignode); 410 msg_set_transp_seqno(msg, seqno); 411 msg_set_msgcnt(msg, ack); 412 msg_dbg(msg, "PORT>SEND>:"); 413 } 414 return buf; 415} 416 417int tipc_reject_msg(struct sk_buff *buf, u32 err) 418{ 419 struct tipc_msg *msg = buf_msg(buf); 420 struct sk_buff *rbuf; 421 struct tipc_msg *rmsg; 422 int hdr_sz; 423 u32 imp = msg_importance(msg); 424 u32 data_sz = msg_data_sz(msg); 425 426 if (data_sz > MAX_REJECT_SIZE) 427 data_sz = MAX_REJECT_SIZE; 428 if (msg_connected(msg) && (imp < TIPC_CRITICAL_IMPORTANCE)) 429 imp++; 430 msg_dbg(msg, "port->rej: "); 431 432 /* discard rejected message if it shouldn't be returned to sender */ 433 if (msg_errcode(msg) || msg_dest_droppable(msg)) { 434 buf_discard(buf); 435 return data_sz; 436 } 437 438 /* construct rejected message */ 439 if (msg_mcast(msg)) 440 hdr_sz = MCAST_H_SIZE; 441 else 442 hdr_sz = LONG_H_SIZE; 443 rbuf = buf_acquire(data_sz + hdr_sz); 444 if (rbuf == NULL) { 445 buf_discard(buf); 446 return data_sz; 447 } 448 rmsg = buf_msg(rbuf); 449 msg_init(rmsg, imp, msg_type(msg), err, hdr_sz, msg_orignode(msg)); 450 msg_set_destport(rmsg, msg_origport(msg)); 451 msg_set_prevnode(rmsg, tipc_own_addr); 452 msg_set_origport(rmsg, msg_destport(msg)); 453 if (msg_short(msg)) 454 msg_set_orignode(rmsg, tipc_own_addr); 455 else 456 msg_set_orignode(rmsg, msg_destnode(msg)); 457 msg_set_size(rmsg, data_sz + hdr_sz); 458 msg_set_nametype(rmsg, msg_nametype(msg)); 459 msg_set_nameinst(rmsg, msg_nameinst(msg)); 460 skb_copy_to_linear_data_offset(rbuf, hdr_sz, msg_data(msg), data_sz); 461 462 /* send self-abort message when rejecting on a connected port */ 463 if (msg_connected(msg)) { 464 struct sk_buff *abuf = NULL; 465 struct port *p_ptr = tipc_port_lock(msg_destport(msg)); 466 467 if (p_ptr) { 468 if (p_ptr->publ.connected) 469 abuf = port_build_self_abort_msg(p_ptr, err); 470 tipc_port_unlock(p_ptr); 471 } 472 tipc_net_route_msg(abuf); 473 } 474 475 /* send rejected message */ 476 buf_discard(buf); 477 tipc_net_route_msg(rbuf); 478 return data_sz; 479} 480 481int tipc_port_reject_sections(struct port *p_ptr, struct tipc_msg *hdr, 482 struct iovec const *msg_sect, u32 num_sect, 483 int err) 484{ 485 struct sk_buff *buf; 486 int res; 487 488 res = msg_build(hdr, msg_sect, num_sect, MAX_MSG_SIZE, 489 !p_ptr->user_port, &buf); 490 if (!buf) 491 return res; 492 493 return tipc_reject_msg(buf, err); 494} 495 496static void port_timeout(unsigned long ref) 497{ 498 struct port *p_ptr = tipc_port_lock(ref); 499 struct sk_buff *buf = NULL; 500 501 if (!p_ptr) 502 return; 503 504 if (!p_ptr->publ.connected) { 505 tipc_port_unlock(p_ptr); 506 return; 507 } 508 509 /* Last probe answered ? */ 510 if (p_ptr->probing_state == PROBING) { 511 buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_PORT); 512 } else { 513 buf = port_build_proto_msg(port_peerport(p_ptr), 514 port_peernode(p_ptr), 515 p_ptr->publ.ref, 516 tipc_own_addr, 517 CONN_MANAGER, 518 CONN_PROBE, 519 TIPC_OK, 520 port_out_seqno(p_ptr), 521 0); 522 port_incr_out_seqno(p_ptr); 523 p_ptr->probing_state = PROBING; 524 k_start_timer(&p_ptr->timer, p_ptr->probing_interval); 525 } 526 tipc_port_unlock(p_ptr); 527 tipc_net_route_msg(buf); 528} 529 530 531static void port_handle_node_down(unsigned long ref) 532{ 533 struct port *p_ptr = tipc_port_lock(ref); 534 struct sk_buff* buf = NULL; 535 536 if (!p_ptr) 537 return; 538 buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_NODE); 539 tipc_port_unlock(p_ptr); 540 tipc_net_route_msg(buf); 541} 542 543 544static struct sk_buff *port_build_self_abort_msg(struct port *p_ptr, u32 err) 545{ 546 u32 imp = msg_importance(&p_ptr->publ.phdr); 547 548 if (!p_ptr->publ.connected) 549 return NULL; 550 if (imp < TIPC_CRITICAL_IMPORTANCE) 551 imp++; 552 return port_build_proto_msg(p_ptr->publ.ref, 553 tipc_own_addr, 554 port_peerport(p_ptr), 555 port_peernode(p_ptr), 556 imp, 557 TIPC_CONN_MSG, 558 err, 559 p_ptr->last_in_seqno + 1, 560 0); 561} 562 563 564static struct sk_buff *port_build_peer_abort_msg(struct port *p_ptr, u32 err) 565{ 566 u32 imp = msg_importance(&p_ptr->publ.phdr); 567 568 if (!p_ptr->publ.connected) 569 return NULL; 570 if (imp < TIPC_CRITICAL_IMPORTANCE) 571 imp++; 572 return port_build_proto_msg(port_peerport(p_ptr), 573 port_peernode(p_ptr), 574 p_ptr->publ.ref, 575 tipc_own_addr, 576 imp, 577 TIPC_CONN_MSG, 578 err, 579 port_out_seqno(p_ptr), 580 0); 581} 582 583void tipc_port_recv_proto_msg(struct sk_buff *buf) 584{ 585 struct tipc_msg *msg = buf_msg(buf); 586 struct port *p_ptr = tipc_port_lock(msg_destport(msg)); 587 u32 err = TIPC_OK; 588 struct sk_buff *r_buf = NULL; 589 struct sk_buff *abort_buf = NULL; 590 591 msg_dbg(msg, "PORT<RECV<:"); 592 593 if (!p_ptr) { 594 err = TIPC_ERR_NO_PORT; 595 } else if (p_ptr->publ.connected) { 596 if (port_peernode(p_ptr) != msg_orignode(msg)) 597 err = TIPC_ERR_NO_PORT; 598 if (port_peerport(p_ptr) != msg_origport(msg)) 599 err = TIPC_ERR_NO_PORT; 600 if (!err && msg_routed(msg)) { 601 u32 seqno = msg_transp_seqno(msg); 602 u32 myno = ++p_ptr->last_in_seqno; 603 if (seqno != myno) { 604 err = TIPC_ERR_NO_PORT; 605 abort_buf = port_build_self_abort_msg(p_ptr, err); 606 } 607 } 608 if (msg_type(msg) == CONN_ACK) { 609 int wakeup = tipc_port_congested(p_ptr) && 610 p_ptr->publ.congested && 611 p_ptr->wakeup; 612 p_ptr->acked += msg_msgcnt(msg); 613 if (tipc_port_congested(p_ptr)) 614 goto exit; 615 p_ptr->publ.congested = 0; 616 if (!wakeup) 617 goto exit; 618 p_ptr->wakeup(&p_ptr->publ); 619 goto exit; 620 } 621 } else if (p_ptr->publ.published) { 622 err = TIPC_ERR_NO_PORT; 623 } 624 if (err) { 625 r_buf = port_build_proto_msg(msg_origport(msg), 626 msg_orignode(msg), 627 msg_destport(msg), 628 tipc_own_addr, 629 TIPC_HIGH_IMPORTANCE, 630 TIPC_CONN_MSG, 631 err, 632 0, 633 0); 634 goto exit; 635 } 636 637 /* All is fine */ 638 if (msg_type(msg) == CONN_PROBE) { 639 r_buf = port_build_proto_msg(msg_origport(msg), 640 msg_orignode(msg), 641 msg_destport(msg), 642 tipc_own_addr, 643 CONN_MANAGER, 644 CONN_PROBE_REPLY, 645 TIPC_OK, 646 port_out_seqno(p_ptr), 647 0); 648 } 649 p_ptr->probing_state = CONFIRMED; 650 port_incr_out_seqno(p_ptr); 651exit: 652 if (p_ptr) 653 tipc_port_unlock(p_ptr); 654 tipc_net_route_msg(r_buf); 655 tipc_net_route_msg(abort_buf); 656 buf_discard(buf); 657} 658 659static void port_print(struct port *p_ptr, struct print_buf *buf, int full_id) 660{ 661 struct publication *publ; 662 663 if (full_id) 664 tipc_printf(buf, "<%u.%u.%u:%u>:", 665 tipc_zone(tipc_own_addr), tipc_cluster(tipc_own_addr), 666 tipc_node(tipc_own_addr), p_ptr->publ.ref); 667 else 668 tipc_printf(buf, "%-10u:", p_ptr->publ.ref); 669 670 if (p_ptr->publ.connected) { 671 u32 dport = port_peerport(p_ptr); 672 u32 destnode = port_peernode(p_ptr); 673 674 tipc_printf(buf, " connected to <%u.%u.%u:%u>", 675 tipc_zone(destnode), tipc_cluster(destnode), 676 tipc_node(destnode), dport); 677 if (p_ptr->publ.conn_type != 0) 678 tipc_printf(buf, " via {%u,%u}", 679 p_ptr->publ.conn_type, 680 p_ptr->publ.conn_instance); 681 } 682 else if (p_ptr->publ.published) { 683 tipc_printf(buf, " bound to"); 684 list_for_each_entry(publ, &p_ptr->publications, pport_list) { 685 if (publ->lower == publ->upper) 686 tipc_printf(buf, " {%u,%u}", publ->type, 687 publ->lower); 688 else 689 tipc_printf(buf, " {%u,%u,%u}", publ->type, 690 publ->lower, publ->upper); 691 } 692 } 693 tipc_printf(buf, "\n"); 694} 695 696#define MAX_PORT_QUERY 32768 697 698struct sk_buff *tipc_port_get_ports(void) 699{ 700 struct sk_buff *buf; 701 struct tlv_desc *rep_tlv; 702 struct print_buf pb; 703 struct port *p_ptr; 704 int str_len; 705 706 buf = tipc_cfg_reply_alloc(TLV_SPACE(MAX_PORT_QUERY)); 707 if (!buf) 708 return NULL; 709 rep_tlv = (struct tlv_desc *)buf->data; 710 711 tipc_printbuf_init(&pb, TLV_DATA(rep_tlv), MAX_PORT_QUERY); 712 spin_lock_bh(&tipc_port_list_lock); 713 list_for_each_entry(p_ptr, &ports, port_list) { 714 spin_lock_bh(p_ptr->publ.lock); 715 port_print(p_ptr, &pb, 0); 716 spin_unlock_bh(p_ptr->publ.lock); 717 } 718 spin_unlock_bh(&tipc_port_list_lock); 719 str_len = tipc_printbuf_validate(&pb); 720 721 skb_put(buf, TLV_SPACE(str_len)); 722 TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len); 723 724 return buf; 725} 726 727#if 0 728 729#define MAX_PORT_STATS 2000 730 731struct sk_buff *port_show_stats(const void *req_tlv_area, int req_tlv_space) 732{ 733 u32 ref; 734 struct port *p_ptr; 735 struct sk_buff *buf; 736 struct tlv_desc *rep_tlv; 737 struct print_buf pb; 738 int str_len; 739 740 if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_PORT_REF)) 741 return cfg_reply_error_string(TIPC_CFG_TLV_ERROR); 742 743 ref = *(u32 *)TLV_DATA(req_tlv_area); 744 ref = ntohl(ref); 745 746 p_ptr = tipc_port_lock(ref); 747 if (!p_ptr) 748 return cfg_reply_error_string("port not found"); 749 750 buf = tipc_cfg_reply_alloc(TLV_SPACE(MAX_PORT_STATS)); 751 if (!buf) { 752 tipc_port_unlock(p_ptr); 753 return NULL; 754 } 755 rep_tlv = (struct tlv_desc *)buf->data; 756 757 tipc_printbuf_init(&pb, TLV_DATA(rep_tlv), MAX_PORT_STATS); 758 port_print(p_ptr, &pb, 1); 759 /* NEED TO FILL IN ADDITIONAL PORT STATISTICS HERE */ 760 tipc_port_unlock(p_ptr); 761 str_len = tipc_printbuf_validate(&pb); 762 763 skb_put(buf, TLV_SPACE(str_len)); 764 TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len); 765 766 return buf; 767} 768 769#endif 770 771void tipc_port_reinit(void) 772{ 773 struct port *p_ptr; 774 struct tipc_msg *msg; 775 776 spin_lock_bh(&tipc_port_list_lock); 777 list_for_each_entry(p_ptr, &ports, port_list) { 778 msg = &p_ptr->publ.phdr; 779 if (msg_orignode(msg) == tipc_own_addr) 780 break; 781 msg_set_orignode(msg, tipc_own_addr); 782 } 783 spin_unlock_bh(&tipc_port_list_lock); 784} 785 786 787/* 788 * port_dispatcher_sigh(): Signal handler for messages destinated 789 * to the tipc_port interface. 790 */ 791 792static void port_dispatcher_sigh(void *dummy) 793{ 794 struct sk_buff *buf; 795 796 spin_lock_bh(&queue_lock); 797 buf = msg_queue_head; 798 msg_queue_head = NULL; 799 spin_unlock_bh(&queue_lock); 800 801 while (buf) { 802 struct port *p_ptr; 803 struct user_port *up_ptr; 804 struct tipc_portid orig; 805 struct tipc_name_seq dseq; 806 void *usr_handle; 807 int connected; 808 int published; 809 u32 message_type; 810 811 struct sk_buff *next = buf->next; 812 struct tipc_msg *msg = buf_msg(buf); 813 u32 dref = msg_destport(msg); 814 815 message_type = msg_type(msg); 816 if (message_type > TIPC_DIRECT_MSG) 817 goto reject; /* Unsupported message type */ 818 819 p_ptr = tipc_port_lock(dref); 820 if (!p_ptr) 821 goto reject; /* Port deleted while msg in queue */ 822 823 orig.ref = msg_origport(msg); 824 orig.node = msg_orignode(msg); 825 up_ptr = p_ptr->user_port; 826 usr_handle = up_ptr->usr_handle; 827 connected = p_ptr->publ.connected; 828 published = p_ptr->publ.published; 829 830 if (unlikely(msg_errcode(msg))) 831 goto err; 832 833 switch (message_type) { 834 835 case TIPC_CONN_MSG:{ 836 tipc_conn_msg_event cb = up_ptr->conn_msg_cb; 837 u32 peer_port = port_peerport(p_ptr); 838 u32 peer_node = port_peernode(p_ptr); 839 840 tipc_port_unlock(p_ptr); 841 if (unlikely(!connected)) { 842 if (unlikely(published)) 843 goto reject; 844 tipc_connect2port(dref,&orig); 845 } 846 if (unlikely(msg_origport(msg) != peer_port)) 847 goto reject; 848 if (unlikely(msg_orignode(msg) != peer_node)) 849 goto reject; 850 if (unlikely(!cb)) 851 goto reject; 852 if (unlikely(++p_ptr->publ.conn_unacked >= 853 TIPC_FLOW_CONTROL_WIN)) 854 tipc_acknowledge(dref, 855 p_ptr->publ.conn_unacked); 856 skb_pull(buf, msg_hdr_sz(msg)); 857 cb(usr_handle, dref, &buf, msg_data(msg), 858 msg_data_sz(msg)); 859 break; 860 } 861 case TIPC_DIRECT_MSG:{ 862 tipc_msg_event cb = up_ptr->msg_cb; 863 864 tipc_port_unlock(p_ptr); 865 if (unlikely(connected)) 866 goto reject; 867 if (unlikely(!cb)) 868 goto reject; 869 skb_pull(buf, msg_hdr_sz(msg)); 870 cb(usr_handle, dref, &buf, msg_data(msg), 871 msg_data_sz(msg), msg_importance(msg), 872 &orig); 873 break; 874 } 875 case TIPC_MCAST_MSG: 876 case TIPC_NAMED_MSG:{ 877 tipc_named_msg_event cb = up_ptr->named_msg_cb; 878 879 tipc_port_unlock(p_ptr); 880 if (unlikely(connected)) 881 goto reject; 882 if (unlikely(!cb)) 883 goto reject; 884 if (unlikely(!published)) 885 goto reject; 886 dseq.type = msg_nametype(msg); 887 dseq.lower = msg_nameinst(msg); 888 dseq.upper = (message_type == TIPC_NAMED_MSG) 889 ? dseq.lower : msg_nameupper(msg); 890 skb_pull(buf, msg_hdr_sz(msg)); 891 cb(usr_handle, dref, &buf, msg_data(msg), 892 msg_data_sz(msg), msg_importance(msg), 893 &orig, &dseq); 894 break; 895 } 896 } 897 if (buf) 898 buf_discard(buf); 899 buf = next; 900 continue; 901err: 902 switch (message_type) { 903 904 case TIPC_CONN_MSG:{ 905 tipc_conn_shutdown_event cb = 906 up_ptr->conn_err_cb; 907 u32 peer_port = port_peerport(p_ptr); 908 u32 peer_node = port_peernode(p_ptr); 909 910 tipc_port_unlock(p_ptr); 911 if (!connected || !cb) 912 break; 913 if (msg_origport(msg) != peer_port) 914 break; 915 if (msg_orignode(msg) != peer_node) 916 break; 917 tipc_disconnect(dref); 918 skb_pull(buf, msg_hdr_sz(msg)); 919 cb(usr_handle, dref, &buf, msg_data(msg), 920 msg_data_sz(msg), msg_errcode(msg)); 921 break; 922 } 923 case TIPC_DIRECT_MSG:{ 924 tipc_msg_err_event cb = up_ptr->err_cb; 925 926 tipc_port_unlock(p_ptr); 927 if (connected || !cb) 928 break; 929 skb_pull(buf, msg_hdr_sz(msg)); 930 cb(usr_handle, dref, &buf, msg_data(msg), 931 msg_data_sz(msg), msg_errcode(msg), &orig); 932 break; 933 } 934 case TIPC_MCAST_MSG: 935 case TIPC_NAMED_MSG:{ 936 tipc_named_msg_err_event cb = 937 up_ptr->named_err_cb; 938 939 tipc_port_unlock(p_ptr); 940 if (connected || !cb) 941 break; 942 dseq.type = msg_nametype(msg); 943 dseq.lower = msg_nameinst(msg); 944 dseq.upper = (message_type == TIPC_NAMED_MSG) 945 ? dseq.lower : msg_nameupper(msg); 946 skb_pull(buf, msg_hdr_sz(msg)); 947 cb(usr_handle, dref, &buf, msg_data(msg), 948 msg_data_sz(msg), msg_errcode(msg), &dseq); 949 break; 950 } 951 } 952 if (buf) 953 buf_discard(buf); 954 buf = next; 955 continue; 956reject: 957 tipc_reject_msg(buf, TIPC_ERR_NO_PORT); 958 buf = next; 959 } 960} 961 962/* 963 * port_dispatcher(): Dispatcher for messages destinated 964 * to the tipc_port interface. Called with port locked. 965 */ 966 967static u32 port_dispatcher(struct tipc_port *dummy, struct sk_buff *buf) 968{ 969 buf->next = NULL; 970 spin_lock_bh(&queue_lock); 971 if (msg_queue_head) { 972 msg_queue_tail->next = buf; 973 msg_queue_tail = buf; 974 } else { 975 msg_queue_tail = msg_queue_head = buf; 976 tipc_k_signal((Handler)port_dispatcher_sigh, 0); 977 } 978 spin_unlock_bh(&queue_lock); 979 return TIPC_OK; 980} 981 982/* 983 * Wake up port after congestion: Called with port locked, 984 * 985 */ 986 987static void port_wakeup_sh(unsigned long ref) 988{ 989 struct port *p_ptr; 990 struct user_port *up_ptr; 991 tipc_continue_event cb = NULL; 992 void *uh = NULL; 993 994 p_ptr = tipc_port_lock(ref); 995 if (p_ptr) { 996 up_ptr = p_ptr->user_port; 997 if (up_ptr) { 998 cb = up_ptr->continue_event_cb; 999 uh = up_ptr->usr_handle; 1000 } 1001 tipc_port_unlock(p_ptr); 1002 } 1003 if (cb) 1004 cb(uh, ref); 1005} 1006 1007 1008static void port_wakeup(struct tipc_port *p_ptr) 1009{ 1010 tipc_k_signal((Handler)port_wakeup_sh, p_ptr->ref); 1011} 1012 1013void tipc_acknowledge(u32 ref, u32 ack) 1014{ 1015 struct port *p_ptr; 1016 struct sk_buff *buf = NULL; 1017 1018 p_ptr = tipc_port_lock(ref); 1019 if (!p_ptr) 1020 return; 1021 if (p_ptr->publ.connected) { 1022 p_ptr->publ.conn_unacked -= ack; 1023 buf = port_build_proto_msg(port_peerport(p_ptr), 1024 port_peernode(p_ptr), 1025 ref, 1026 tipc_own_addr, 1027 CONN_MANAGER, 1028 CONN_ACK, 1029 TIPC_OK, 1030 port_out_seqno(p_ptr), 1031 ack); 1032 } 1033 tipc_port_unlock(p_ptr); 1034 tipc_net_route_msg(buf); 1035} 1036 1037/* 1038 * tipc_createport(): user level call. Will add port to 1039 * registry if non-zero user_ref. 1040 */ 1041 1042int tipc_createport(u32 user_ref, 1043 void *usr_handle, 1044 unsigned int importance, 1045 tipc_msg_err_event error_cb, 1046 tipc_named_msg_err_event named_error_cb, 1047 tipc_conn_shutdown_event conn_error_cb, 1048 tipc_msg_event msg_cb, 1049 tipc_named_msg_event named_msg_cb, 1050 tipc_conn_msg_event conn_msg_cb, 1051 tipc_continue_event continue_event_cb,/* May be zero */ 1052 u32 *portref) 1053{ 1054 struct user_port *up_ptr; 1055 struct port *p_ptr; 1056 u32 ref; 1057 1058 up_ptr = kmalloc(sizeof(*up_ptr), GFP_ATOMIC); 1059 if (!up_ptr) { 1060 warn("Port creation failed, no memory\n"); 1061 return -ENOMEM; 1062 } 1063 ref = tipc_createport_raw(NULL, port_dispatcher, port_wakeup, importance); 1064 p_ptr = tipc_port_lock(ref); 1065 if (!p_ptr) { 1066 kfree(up_ptr); 1067 return -ENOMEM; 1068 } 1069 1070 p_ptr->user_port = up_ptr; 1071 up_ptr->user_ref = user_ref; 1072 up_ptr->usr_handle = usr_handle; 1073 up_ptr->ref = p_ptr->publ.ref; 1074 up_ptr->err_cb = error_cb; 1075 up_ptr->named_err_cb = named_error_cb; 1076 up_ptr->conn_err_cb = conn_error_cb; 1077 up_ptr->msg_cb = msg_cb; 1078 up_ptr->named_msg_cb = named_msg_cb; 1079 up_ptr->conn_msg_cb = conn_msg_cb; 1080 up_ptr->continue_event_cb = continue_event_cb; 1081 INIT_LIST_HEAD(&up_ptr->uport_list); 1082 tipc_reg_add_port(up_ptr); 1083 *portref = p_ptr->publ.ref; 1084 dbg(" tipc_createport: %x with ref %u\n", p_ptr, p_ptr->publ.ref); 1085 tipc_port_unlock(p_ptr); 1086 return TIPC_OK; 1087} 1088 1089int tipc_ownidentity(u32 ref, struct tipc_portid *id) 1090{ 1091 id->ref = ref; 1092 id->node = tipc_own_addr; 1093 return TIPC_OK; 1094} 1095 1096int tipc_portimportance(u32 ref, unsigned int *importance) 1097{ 1098 struct port *p_ptr; 1099 1100 p_ptr = tipc_port_lock(ref); 1101 if (!p_ptr) 1102 return -EINVAL; 1103 *importance = (unsigned int)msg_importance(&p_ptr->publ.phdr); 1104 tipc_port_unlock(p_ptr); 1105 return TIPC_OK; 1106} 1107 1108int tipc_set_portimportance(u32 ref, unsigned int imp) 1109{ 1110 struct port *p_ptr; 1111 1112 if (imp > TIPC_CRITICAL_IMPORTANCE) 1113 return -EINVAL; 1114 1115 p_ptr = tipc_port_lock(ref); 1116 if (!p_ptr) 1117 return -EINVAL; 1118 msg_set_importance(&p_ptr->publ.phdr, (u32)imp); 1119 tipc_port_unlock(p_ptr); 1120 return TIPC_OK; 1121} 1122 1123 1124int tipc_publish(u32 ref, unsigned int scope, struct tipc_name_seq const *seq) 1125{ 1126 struct port *p_ptr; 1127 struct publication *publ; 1128 u32 key; 1129 int res = -EINVAL; 1130 1131 p_ptr = tipc_port_lock(ref); 1132 if (!p_ptr) 1133 return -EINVAL; 1134 1135 dbg("tipc_publ %u, p_ptr = %x, conn = %x, scope = %x, " 1136 "lower = %u, upper = %u\n", 1137 ref, p_ptr, p_ptr->publ.connected, scope, seq->lower, seq->upper); 1138 if (p_ptr->publ.connected) 1139 goto exit; 1140 if (seq->lower > seq->upper) 1141 goto exit; 1142 if ((scope < TIPC_ZONE_SCOPE) || (scope > TIPC_NODE_SCOPE)) 1143 goto exit; 1144 key = ref + p_ptr->pub_count + 1; 1145 if (key == ref) { 1146 res = -EADDRINUSE; 1147 goto exit; 1148 } 1149 publ = tipc_nametbl_publish(seq->type, seq->lower, seq->upper, 1150 scope, p_ptr->publ.ref, key); 1151 if (publ) { 1152 list_add(&publ->pport_list, &p_ptr->publications); 1153 p_ptr->pub_count++; 1154 p_ptr->publ.published = 1; 1155 res = TIPC_OK; 1156 } 1157exit: 1158 tipc_port_unlock(p_ptr); 1159 return res; 1160} 1161 1162int tipc_withdraw(u32 ref, unsigned int scope, struct tipc_name_seq const *seq) 1163{ 1164 struct port *p_ptr; 1165 struct publication *publ; 1166 struct publication *tpubl; 1167 int res = -EINVAL; 1168 1169 p_ptr = tipc_port_lock(ref); 1170 if (!p_ptr) 1171 return -EINVAL; 1172 if (!seq) { 1173 list_for_each_entry_safe(publ, tpubl, 1174 &p_ptr->publications, pport_list) { 1175 tipc_nametbl_withdraw(publ->type, publ->lower, 1176 publ->ref, publ->key); 1177 } 1178 res = TIPC_OK; 1179 } else { 1180 list_for_each_entry_safe(publ, tpubl, 1181 &p_ptr->publications, pport_list) { 1182 if (publ->scope != scope) 1183 continue; 1184 if (publ->type != seq->type) 1185 continue; 1186 if (publ->lower != seq->lower) 1187 continue; 1188 if (publ->upper != seq->upper) 1189 break; 1190 tipc_nametbl_withdraw(publ->type, publ->lower, 1191 publ->ref, publ->key); 1192 res = TIPC_OK; 1193 break; 1194 } 1195 } 1196 if (list_empty(&p_ptr->publications)) 1197 p_ptr->publ.published = 0; 1198 tipc_port_unlock(p_ptr); 1199 return res; 1200} 1201 1202int tipc_connect2port(u32 ref, struct tipc_portid const *peer) 1203{ 1204 struct port *p_ptr; 1205 struct tipc_msg *msg; 1206 int res = -EINVAL; 1207 1208 p_ptr = tipc_port_lock(ref); 1209 if (!p_ptr) 1210 return -EINVAL; 1211 if (p_ptr->publ.published || p_ptr->publ.connected) 1212 goto exit; 1213 if (!peer->ref) 1214 goto exit; 1215 1216 msg = &p_ptr->publ.phdr; 1217 msg_set_destnode(msg, peer->node); 1218 msg_set_destport(msg, peer->ref); 1219 msg_set_orignode(msg, tipc_own_addr); 1220 msg_set_origport(msg, p_ptr->publ.ref); 1221 msg_set_transp_seqno(msg, 42); 1222 msg_set_type(msg, TIPC_CONN_MSG); 1223 if (!may_route(peer->node)) 1224 msg_set_hdr_sz(msg, SHORT_H_SIZE); 1225 else 1226 msg_set_hdr_sz(msg, LONG_H_SIZE); 1227 1228 p_ptr->probing_interval = PROBING_INTERVAL; 1229 p_ptr->probing_state = CONFIRMED; 1230 p_ptr->publ.connected = 1; 1231 k_start_timer(&p_ptr->timer, p_ptr->probing_interval); 1232 1233 tipc_nodesub_subscribe(&p_ptr->subscription,peer->node, 1234 (void *)(unsigned long)ref, 1235 (net_ev_handler)port_handle_node_down); 1236 res = TIPC_OK; 1237exit: 1238 tipc_port_unlock(p_ptr); 1239 p_ptr->publ.max_pkt = tipc_link_get_max_pkt(peer->node, ref); 1240 return res; 1241} 1242 1243/** 1244 * tipc_disconnect_port - disconnect port from peer 1245 * 1246 * Port must be locked. 1247 */ 1248 1249int tipc_disconnect_port(struct tipc_port *tp_ptr) 1250{ 1251 int res; 1252 1253 if (tp_ptr->connected) { 1254 tp_ptr->connected = 0; 1255 /* let timer expire on it's own to avoid deadlock! */ 1256 tipc_nodesub_unsubscribe( 1257 &((struct port *)tp_ptr)->subscription); 1258 res = TIPC_OK; 1259 } else { 1260 res = -ENOTCONN; 1261 } 1262 return res; 1263} 1264 1265/* 1266 * tipc_disconnect(): Disconnect port form peer. 1267 * This is a node local operation. 1268 */ 1269 1270int tipc_disconnect(u32 ref) 1271{ 1272 struct port *p_ptr; 1273 int res; 1274 1275 p_ptr = tipc_port_lock(ref); 1276 if (!p_ptr) 1277 return -EINVAL; 1278 res = tipc_disconnect_port((struct tipc_port *)p_ptr); 1279 tipc_port_unlock(p_ptr); 1280 return res; 1281} 1282 1283/* 1284 * tipc_shutdown(): Send a SHUTDOWN msg to peer and disconnect 1285 */ 1286int tipc_shutdown(u32 ref) 1287{ 1288 struct port *p_ptr; 1289 struct sk_buff *buf = NULL; 1290 1291 p_ptr = tipc_port_lock(ref); 1292 if (!p_ptr) 1293 return -EINVAL; 1294 1295 if (p_ptr->publ.connected) { 1296 u32 imp = msg_importance(&p_ptr->publ.phdr); 1297 if (imp < TIPC_CRITICAL_IMPORTANCE) 1298 imp++; 1299 buf = port_build_proto_msg(port_peerport(p_ptr), 1300 port_peernode(p_ptr), 1301 ref, 1302 tipc_own_addr, 1303 imp, 1304 TIPC_CONN_MSG, 1305 TIPC_CONN_SHUTDOWN, 1306 port_out_seqno(p_ptr), 1307 0); 1308 } 1309 tipc_port_unlock(p_ptr); 1310 tipc_net_route_msg(buf); 1311 return tipc_disconnect(ref); 1312} 1313 1314int tipc_isconnected(u32 ref, int *isconnected) 1315{ 1316 struct port *p_ptr; 1317 1318 p_ptr = tipc_port_lock(ref); 1319 if (!p_ptr) 1320 return -EINVAL; 1321 *isconnected = p_ptr->publ.connected; 1322 tipc_port_unlock(p_ptr); 1323 return TIPC_OK; 1324} 1325 1326int tipc_peer(u32 ref, struct tipc_portid *peer) 1327{ 1328 struct port *p_ptr; 1329 int res; 1330 1331 p_ptr = tipc_port_lock(ref); 1332 if (!p_ptr) 1333 return -EINVAL; 1334 if (p_ptr->publ.connected) { 1335 peer->ref = port_peerport(p_ptr); 1336 peer->node = port_peernode(p_ptr); 1337 res = TIPC_OK; 1338 } else 1339 res = -ENOTCONN; 1340 tipc_port_unlock(p_ptr); 1341 return res; 1342} 1343 1344int tipc_ref_valid(u32 ref) 1345{ 1346 /* Works irrespective of type */ 1347 return !!tipc_ref_deref(ref); 1348} 1349 1350 1351/* 1352 * tipc_port_recv_sections(): Concatenate and deliver sectioned 1353 * message for this node. 1354 */ 1355 1356int tipc_port_recv_sections(struct port *sender, unsigned int num_sect, 1357 struct iovec const *msg_sect) 1358{ 1359 struct sk_buff *buf; 1360 int res; 1361 1362 res = msg_build(&sender->publ.phdr, msg_sect, num_sect, 1363 MAX_MSG_SIZE, !sender->user_port, &buf); 1364 if (likely(buf)) 1365 tipc_port_recv_msg(buf); 1366 return res; 1367} 1368 1369/** 1370 * tipc_send - send message sections on connection 1371 */ 1372 1373int tipc_send(u32 ref, unsigned int num_sect, struct iovec const *msg_sect) 1374{ 1375 struct port *p_ptr; 1376 u32 destnode; 1377 int res; 1378 1379 p_ptr = tipc_port_deref(ref); 1380 if (!p_ptr || !p_ptr->publ.connected) 1381 return -EINVAL; 1382 1383 p_ptr->publ.congested = 1; 1384 if (!tipc_port_congested(p_ptr)) { 1385 destnode = port_peernode(p_ptr); 1386 if (likely(destnode != tipc_own_addr)) 1387 res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect, 1388 destnode); 1389 else 1390 res = tipc_port_recv_sections(p_ptr, num_sect, msg_sect); 1391 1392 if (likely(res != -ELINKCONG)) { 1393 port_incr_out_seqno(p_ptr); 1394 p_ptr->publ.congested = 0; 1395 p_ptr->sent++; 1396 return res; 1397 } 1398 } 1399 if (port_unreliable(p_ptr)) { 1400 p_ptr->publ.congested = 0; 1401 /* Just calculate msg length and return */ 1402 return msg_calc_data_size(msg_sect, num_sect); 1403 } 1404 return -ELINKCONG; 1405} 1406 1407/** 1408 * tipc_send_buf - send message buffer on connection 1409 */ 1410 1411int tipc_send_buf(u32 ref, struct sk_buff *buf, unsigned int dsz) 1412{ 1413 struct port *p_ptr; 1414 struct tipc_msg *msg; 1415 u32 destnode; 1416 u32 hsz; 1417 u32 sz; 1418 u32 res; 1419 1420 p_ptr = tipc_port_deref(ref); 1421 if (!p_ptr || !p_ptr->publ.connected) 1422 return -EINVAL; 1423 1424 msg = &p_ptr->publ.phdr; 1425 hsz = msg_hdr_sz(msg); 1426 sz = hsz + dsz; 1427 msg_set_size(msg, sz); 1428 if (skb_cow(buf, hsz)) 1429 return -ENOMEM; 1430 1431 skb_push(buf, hsz); 1432 skb_copy_to_linear_data(buf, msg, hsz); 1433 destnode = msg_destnode(msg); 1434 p_ptr->publ.congested = 1; 1435 if (!tipc_port_congested(p_ptr)) { 1436 if (likely(destnode != tipc_own_addr)) 1437 res = tipc_send_buf_fast(buf, destnode); 1438 else { 1439 tipc_port_recv_msg(buf); 1440 res = sz; 1441 } 1442 if (likely(res != -ELINKCONG)) { 1443 port_incr_out_seqno(p_ptr); 1444 p_ptr->sent++; 1445 p_ptr->publ.congested = 0; 1446 return res; 1447 } 1448 } 1449 if (port_unreliable(p_ptr)) { 1450 p_ptr->publ.congested = 0; 1451 return dsz; 1452 } 1453 return -ELINKCONG; 1454} 1455 1456/** 1457 * tipc_forward2name - forward message sections to port name 1458 */ 1459 1460int tipc_forward2name(u32 ref, 1461 struct tipc_name const *name, 1462 u32 domain, 1463 u32 num_sect, 1464 struct iovec const *msg_sect, 1465 struct tipc_portid const *orig, 1466 unsigned int importance) 1467{ 1468 struct port *p_ptr; 1469 struct tipc_msg *msg; 1470 u32 destnode = domain; 1471 u32 destport = 0; 1472 int res; 1473 1474 p_ptr = tipc_port_deref(ref); 1475 if (!p_ptr || p_ptr->publ.connected) 1476 return -EINVAL; 1477 1478 msg = &p_ptr->publ.phdr; 1479 msg_set_type(msg, TIPC_NAMED_MSG); 1480 msg_set_orignode(msg, orig->node); 1481 msg_set_origport(msg, orig->ref); 1482 msg_set_hdr_sz(msg, LONG_H_SIZE); 1483 msg_set_nametype(msg, name->type); 1484 msg_set_nameinst(msg, name->instance); 1485 msg_set_lookup_scope(msg, addr_scope(domain)); 1486 if (importance <= TIPC_CRITICAL_IMPORTANCE) 1487 msg_set_importance(msg,importance); 1488 destport = tipc_nametbl_translate(name->type, name->instance, &destnode); 1489 msg_set_destnode(msg, destnode); 1490 msg_set_destport(msg, destport); 1491 1492 if (likely(destport || destnode)) { 1493 p_ptr->sent++; 1494 if (likely(destnode == tipc_own_addr)) 1495 return tipc_port_recv_sections(p_ptr, num_sect, msg_sect); 1496 res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect, 1497 destnode); 1498 if (likely(res != -ELINKCONG)) 1499 return res; 1500 if (port_unreliable(p_ptr)) { 1501 /* Just calculate msg length and return */ 1502 return msg_calc_data_size(msg_sect, num_sect); 1503 } 1504 return -ELINKCONG; 1505 } 1506 return tipc_port_reject_sections(p_ptr, msg, msg_sect, num_sect, 1507 TIPC_ERR_NO_NAME); 1508} 1509 1510/** 1511 * tipc_send2name - send message sections to port name 1512 */ 1513 1514int tipc_send2name(u32 ref, 1515 struct tipc_name const *name, 1516 unsigned int domain, 1517 unsigned int num_sect, 1518 struct iovec const *msg_sect) 1519{ 1520 struct tipc_portid orig; 1521 1522 orig.ref = ref; 1523 orig.node = tipc_own_addr; 1524 return tipc_forward2name(ref, name, domain, num_sect, msg_sect, &orig, 1525 TIPC_PORT_IMPORTANCE); 1526} 1527 1528/** 1529 * tipc_forward_buf2name - forward message buffer to port name 1530 */ 1531 1532int tipc_forward_buf2name(u32 ref, 1533 struct tipc_name const *name, 1534 u32 domain, 1535 struct sk_buff *buf, 1536 unsigned int dsz, 1537 struct tipc_portid const *orig, 1538 unsigned int importance) 1539{ 1540 struct port *p_ptr; 1541 struct tipc_msg *msg; 1542 u32 destnode = domain; 1543 u32 destport = 0; 1544 int res; 1545 1546 p_ptr = (struct port *)tipc_ref_deref(ref); 1547 if (!p_ptr || p_ptr->publ.connected) 1548 return -EINVAL; 1549 1550 msg = &p_ptr->publ.phdr; 1551 if (importance <= TIPC_CRITICAL_IMPORTANCE) 1552 msg_set_importance(msg, importance); 1553 msg_set_type(msg, TIPC_NAMED_MSG); 1554 msg_set_orignode(msg, orig->node); 1555 msg_set_origport(msg, orig->ref); 1556 msg_set_nametype(msg, name->type); 1557 msg_set_nameinst(msg, name->instance); 1558 msg_set_lookup_scope(msg, addr_scope(domain)); 1559 msg_set_hdr_sz(msg, LONG_H_SIZE); 1560 msg_set_size(msg, LONG_H_SIZE + dsz); 1561 destport = tipc_nametbl_translate(name->type, name->instance, &destnode); 1562 msg_set_destnode(msg, destnode); 1563 msg_set_destport(msg, destport); 1564 msg_dbg(msg, "forw2name ==> "); 1565 if (skb_cow(buf, LONG_H_SIZE)) 1566 return -ENOMEM; 1567 skb_push(buf, LONG_H_SIZE); 1568 skb_copy_to_linear_data(buf, msg, LONG_H_SIZE); 1569 msg_dbg(buf_msg(buf),"PREP:"); 1570 if (likely(destport || destnode)) { 1571 p_ptr->sent++; 1572 if (destnode == tipc_own_addr) 1573 return tipc_port_recv_msg(buf); 1574 res = tipc_send_buf_fast(buf, destnode); 1575 if (likely(res != -ELINKCONG)) 1576 return res; 1577 if (port_unreliable(p_ptr)) 1578 return dsz; 1579 return -ELINKCONG; 1580 } 1581 return tipc_reject_msg(buf, TIPC_ERR_NO_NAME); 1582} 1583 1584/** 1585 * tipc_send_buf2name - send message buffer to port name 1586 */ 1587 1588int tipc_send_buf2name(u32 ref, 1589 struct tipc_name const *dest, 1590 u32 domain, 1591 struct sk_buff *buf, 1592 unsigned int dsz) 1593{ 1594 struct tipc_portid orig; 1595 1596 orig.ref = ref; 1597 orig.node = tipc_own_addr; 1598 return tipc_forward_buf2name(ref, dest, domain, buf, dsz, &orig, 1599 TIPC_PORT_IMPORTANCE); 1600} 1601 1602/** 1603 * tipc_forward2port - forward message sections to port identity 1604 */ 1605 1606int tipc_forward2port(u32 ref, 1607 struct tipc_portid const *dest, 1608 unsigned int num_sect, 1609 struct iovec const *msg_sect, 1610 struct tipc_portid const *orig, 1611 unsigned int importance) 1612{ 1613 struct port *p_ptr; 1614 struct tipc_msg *msg; 1615 int res; 1616 1617 p_ptr = tipc_port_deref(ref); 1618 if (!p_ptr || p_ptr->publ.connected) 1619 return -EINVAL; 1620 1621 msg = &p_ptr->publ.phdr; 1622 msg_set_type(msg, TIPC_DIRECT_MSG); 1623 msg_set_orignode(msg, orig->node); 1624 msg_set_origport(msg, orig->ref); 1625 msg_set_destnode(msg, dest->node); 1626 msg_set_destport(msg, dest->ref); 1627 msg_set_hdr_sz(msg, DIR_MSG_H_SIZE); 1628 if (importance <= TIPC_CRITICAL_IMPORTANCE) 1629 msg_set_importance(msg, importance); 1630 p_ptr->sent++; 1631 if (dest->node == tipc_own_addr) 1632 return tipc_port_recv_sections(p_ptr, num_sect, msg_sect); 1633 res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect, dest->node); 1634 if (likely(res != -ELINKCONG)) 1635 return res; 1636 if (port_unreliable(p_ptr)) { 1637 /* Just calculate msg length and return */ 1638 return msg_calc_data_size(msg_sect, num_sect); 1639 } 1640 return -ELINKCONG; 1641} 1642 1643/** 1644 * tipc_send2port - send message sections to port identity 1645 */ 1646 1647int tipc_send2port(u32 ref, 1648 struct tipc_portid const *dest, 1649 unsigned int num_sect, 1650 struct iovec const *msg_sect) 1651{ 1652 struct tipc_portid orig; 1653 1654 orig.ref = ref; 1655 orig.node = tipc_own_addr; 1656 return tipc_forward2port(ref, dest, num_sect, msg_sect, &orig, 1657 TIPC_PORT_IMPORTANCE); 1658} 1659 1660/** 1661 * tipc_forward_buf2port - forward message buffer to port identity 1662 */ 1663int tipc_forward_buf2port(u32 ref, 1664 struct tipc_portid const *dest, 1665 struct sk_buff *buf, 1666 unsigned int dsz, 1667 struct tipc_portid const *orig, 1668 unsigned int importance) 1669{ 1670 struct port *p_ptr; 1671 struct tipc_msg *msg; 1672 int res; 1673 1674 p_ptr = (struct port *)tipc_ref_deref(ref); 1675 if (!p_ptr || p_ptr->publ.connected) 1676 return -EINVAL; 1677 1678 msg = &p_ptr->publ.phdr; 1679 msg_set_type(msg, TIPC_DIRECT_MSG); 1680 msg_set_orignode(msg, orig->node); 1681 msg_set_origport(msg, orig->ref); 1682 msg_set_destnode(msg, dest->node); 1683 msg_set_destport(msg, dest->ref); 1684 msg_set_hdr_sz(msg, DIR_MSG_H_SIZE); 1685 if (importance <= TIPC_CRITICAL_IMPORTANCE) 1686 msg_set_importance(msg, importance); 1687 msg_set_size(msg, DIR_MSG_H_SIZE + dsz); 1688 if (skb_cow(buf, DIR_MSG_H_SIZE)) 1689 return -ENOMEM; 1690 1691 skb_push(buf, DIR_MSG_H_SIZE); 1692 skb_copy_to_linear_data(buf, msg, DIR_MSG_H_SIZE); 1693 msg_dbg(msg, "buf2port: "); 1694 p_ptr->sent++; 1695 if (dest->node == tipc_own_addr) 1696 return tipc_port_recv_msg(buf); 1697 res = tipc_send_buf_fast(buf, dest->node); 1698 if (likely(res != -ELINKCONG)) 1699 return res; 1700 if (port_unreliable(p_ptr)) 1701 return dsz; 1702 return -ELINKCONG; 1703} 1704 1705/** 1706 * tipc_send_buf2port - send message buffer to port identity 1707 */ 1708 1709int tipc_send_buf2port(u32 ref, 1710 struct tipc_portid const *dest, 1711 struct sk_buff *buf, 1712 unsigned int dsz) 1713{ 1714 struct tipc_portid orig; 1715 1716 orig.ref = ref; 1717 orig.node = tipc_own_addr; 1718 return tipc_forward_buf2port(ref, dest, buf, dsz, &orig, 1719 TIPC_PORT_IMPORTANCE); 1720} 1721