Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

at v2.6.27-rc7 1706 lines 41 kB view raw
1/* 2 * net/tipc/port.c: TIPC port code 3 * 4 * Copyright (c) 1992-2007, Ericsson AB 5 * Copyright (c) 2004-2008, Wind River Systems 6 * All rights reserved. 7 * 8 * Redistribution and use in source and binary forms, with or without 9 * modification, are permitted provided that the following conditions are met: 10 * 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in the 15 * documentation and/or other materials provided with the distribution. 16 * 3. Neither the names of the copyright holders nor the names of its 17 * contributors may be used to endorse or promote products derived from 18 * this software without specific prior written permission. 19 * 20 * Alternatively, this software may be distributed under the terms of the 21 * GNU General Public License ("GPL") version 2 as published by the Free 22 * Software Foundation. 23 * 24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 25 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 26 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 27 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 28 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 29 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 30 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 33 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 34 * POSSIBILITY OF SUCH DAMAGE. 35 */ 36 37#include "core.h" 38#include "config.h" 39#include "dbg.h" 40#include "port.h" 41#include "addr.h" 42#include "link.h" 43#include "node.h" 44#include "name_table.h" 45#include "user_reg.h" 46#include "msg.h" 47#include "bcast.h" 48 49/* Connection management: */ 50#define PROBING_INTERVAL 3600000 /* [ms] => 1 h */ 51#define CONFIRMED 0 52#define PROBING 1 53 54#define MAX_REJECT_SIZE 1024 55 56static struct sk_buff *msg_queue_head = NULL; 57static struct sk_buff *msg_queue_tail = NULL; 58 59DEFINE_SPINLOCK(tipc_port_list_lock); 60static DEFINE_SPINLOCK(queue_lock); 61 62static LIST_HEAD(ports); 63static void port_handle_node_down(unsigned long ref); 64static struct sk_buff* port_build_self_abort_msg(struct port *,u32 err); 65static struct sk_buff* port_build_peer_abort_msg(struct port *,u32 err); 66static void port_timeout(unsigned long ref); 67 68 69static u32 port_peernode(struct port *p_ptr) 70{ 71 return msg_destnode(&p_ptr->publ.phdr); 72} 73 74static u32 port_peerport(struct port *p_ptr) 75{ 76 return msg_destport(&p_ptr->publ.phdr); 77} 78 79static u32 port_out_seqno(struct port *p_ptr) 80{ 81 return msg_transp_seqno(&p_ptr->publ.phdr); 82} 83 84static void port_incr_out_seqno(struct port *p_ptr) 85{ 86 struct tipc_msg *m = &p_ptr->publ.phdr; 87 88 if (likely(!msg_routed(m))) 89 return; 90 msg_set_transp_seqno(m, (msg_transp_seqno(m) + 1)); 91} 92 93/** 94 * tipc_multicast - send a multicast message to local and remote destinations 95 */ 96 97int tipc_multicast(u32 ref, struct tipc_name_seq const *seq, u32 domain, 98 u32 num_sect, struct iovec const *msg_sect) 99{ 100 struct tipc_msg *hdr; 101 struct sk_buff *buf; 102 struct sk_buff *ibuf = NULL; 103 struct port_list dports = {0, NULL, }; 104 struct port *oport = tipc_port_deref(ref); 105 int ext_targets; 106 int res; 107 108 if (unlikely(!oport)) 109 return -EINVAL; 110 111 /* Create multicast message */ 112 113 hdr = &oport->publ.phdr; 114 msg_set_type(hdr, TIPC_MCAST_MSG); 115 msg_set_nametype(hdr, seq->type); 116 msg_set_namelower(hdr, seq->lower); 117 msg_set_nameupper(hdr, seq->upper); 118 msg_set_hdr_sz(hdr, MCAST_H_SIZE); 119 res = msg_build(hdr, msg_sect, num_sect, MAX_MSG_SIZE, 120 !oport->user_port, &buf); 121 if (unlikely(!buf)) 122 return res; 123 124 /* Figure out where to send multicast message */ 125 126 ext_targets = tipc_nametbl_mc_translate(seq->type, seq->lower, seq->upper, 127 TIPC_NODE_SCOPE, &dports); 128 129 /* Send message to destinations (duplicate it only if necessary) */ 130 131 if (ext_targets) { 132 if (dports.count != 0) { 133 ibuf = skb_copy(buf, GFP_ATOMIC); 134 if (ibuf == NULL) { 135 tipc_port_list_free(&dports); 136 buf_discard(buf); 137 return -ENOMEM; 138 } 139 } 140 res = tipc_bclink_send_msg(buf); 141 if ((res < 0) && (dports.count != 0)) { 142 buf_discard(ibuf); 143 } 144 } else { 145 ibuf = buf; 146 } 147 148 if (res >= 0) { 149 if (ibuf) 150 tipc_port_recv_mcast(ibuf, &dports); 151 } else { 152 tipc_port_list_free(&dports); 153 } 154 return res; 155} 156 157/** 158 * tipc_port_recv_mcast - deliver multicast message to all destination ports 159 * 160 * If there is no port list, perform a lookup to create one 161 */ 162 163void tipc_port_recv_mcast(struct sk_buff *buf, struct port_list *dp) 164{ 165 struct tipc_msg* msg; 166 struct port_list dports = {0, NULL, }; 167 struct port_list *item = dp; 168 int cnt = 0; 169 170 msg = buf_msg(buf); 171 172 /* Create destination port list, if one wasn't supplied */ 173 174 if (dp == NULL) { 175 tipc_nametbl_mc_translate(msg_nametype(msg), 176 msg_namelower(msg), 177 msg_nameupper(msg), 178 TIPC_CLUSTER_SCOPE, 179 &dports); 180 item = dp = &dports; 181 } 182 183 /* Deliver a copy of message to each destination port */ 184 185 if (dp->count != 0) { 186 if (dp->count == 1) { 187 msg_set_destport(msg, dp->ports[0]); 188 tipc_port_recv_msg(buf); 189 tipc_port_list_free(dp); 190 return; 191 } 192 for (; cnt < dp->count; cnt++) { 193 int index = cnt % PLSIZE; 194 struct sk_buff *b = skb_clone(buf, GFP_ATOMIC); 195 196 if (b == NULL) { 197 warn("Unable to deliver multicast message(s)\n"); 198 msg_dbg(msg, "LOST:"); 199 goto exit; 200 } 201 if ((index == 0) && (cnt != 0)) { 202 item = item->next; 203 } 204 msg_set_destport(buf_msg(b),item->ports[index]); 205 tipc_port_recv_msg(b); 206 } 207 } 208exit: 209 buf_discard(buf); 210 tipc_port_list_free(dp); 211} 212 213/** 214 * tipc_createport_raw - create a generic TIPC port 215 * 216 * Returns pointer to (locked) TIPC port, or NULL if unable to create it 217 */ 218 219struct tipc_port *tipc_createport_raw(void *usr_handle, 220 u32 (*dispatcher)(struct tipc_port *, struct sk_buff *), 221 void (*wakeup)(struct tipc_port *), 222 const u32 importance) 223{ 224 struct port *p_ptr; 225 struct tipc_msg *msg; 226 u32 ref; 227 228 p_ptr = kzalloc(sizeof(*p_ptr), GFP_ATOMIC); 229 if (!p_ptr) { 230 warn("Port creation failed, no memory\n"); 231 return NULL; 232 } 233 ref = tipc_ref_acquire(p_ptr, &p_ptr->publ.lock); 234 if (!ref) { 235 warn("Port creation failed, reference table exhausted\n"); 236 kfree(p_ptr); 237 return NULL; 238 } 239 240 p_ptr->publ.usr_handle = usr_handle; 241 p_ptr->publ.max_pkt = MAX_PKT_DEFAULT; 242 p_ptr->publ.ref = ref; 243 msg = &p_ptr->publ.phdr; 244 msg_init(msg, importance, TIPC_NAMED_MSG, LONG_H_SIZE, 0); 245 msg_set_origport(msg, ref); 246 p_ptr->last_in_seqno = 41; 247 p_ptr->sent = 1; 248 INIT_LIST_HEAD(&p_ptr->wait_list); 249 INIT_LIST_HEAD(&p_ptr->subscription.nodesub_list); 250 p_ptr->congested_link = NULL; 251 p_ptr->dispatcher = dispatcher; 252 p_ptr->wakeup = wakeup; 253 p_ptr->user_port = NULL; 254 k_init_timer(&p_ptr->timer, (Handler)port_timeout, ref); 255 spin_lock_bh(&tipc_port_list_lock); 256 INIT_LIST_HEAD(&p_ptr->publications); 257 INIT_LIST_HEAD(&p_ptr->port_list); 258 list_add_tail(&p_ptr->port_list, &ports); 259 spin_unlock_bh(&tipc_port_list_lock); 260 return &(p_ptr->publ); 261} 262 263int tipc_deleteport(u32 ref) 264{ 265 struct port *p_ptr; 266 struct sk_buff *buf = NULL; 267 268 tipc_withdraw(ref, 0, NULL); 269 p_ptr = tipc_port_lock(ref); 270 if (!p_ptr) 271 return -EINVAL; 272 273 tipc_ref_discard(ref); 274 tipc_port_unlock(p_ptr); 275 276 k_cancel_timer(&p_ptr->timer); 277 if (p_ptr->publ.connected) { 278 buf = port_build_peer_abort_msg(p_ptr, TIPC_ERR_NO_PORT); 279 tipc_nodesub_unsubscribe(&p_ptr->subscription); 280 } 281 if (p_ptr->user_port) { 282 tipc_reg_remove_port(p_ptr->user_port); 283 kfree(p_ptr->user_port); 284 } 285 286 spin_lock_bh(&tipc_port_list_lock); 287 list_del(&p_ptr->port_list); 288 list_del(&p_ptr->wait_list); 289 spin_unlock_bh(&tipc_port_list_lock); 290 k_term_timer(&p_ptr->timer); 291 kfree(p_ptr); 292 dbg("Deleted port %u\n", ref); 293 tipc_net_route_msg(buf); 294 return 0; 295} 296 297/** 298 * tipc_get_port() - return port associated with 'ref' 299 * 300 * Note: Port is not locked. 301 */ 302 303struct tipc_port *tipc_get_port(const u32 ref) 304{ 305 return (struct tipc_port *)tipc_ref_deref(ref); 306} 307 308/** 309 * tipc_get_handle - return user handle associated to port 'ref' 310 */ 311 312void *tipc_get_handle(const u32 ref) 313{ 314 struct port *p_ptr; 315 void * handle; 316 317 p_ptr = tipc_port_lock(ref); 318 if (!p_ptr) 319 return NULL; 320 handle = p_ptr->publ.usr_handle; 321 tipc_port_unlock(p_ptr); 322 return handle; 323} 324 325static int port_unreliable(struct port *p_ptr) 326{ 327 return msg_src_droppable(&p_ptr->publ.phdr); 328} 329 330int tipc_portunreliable(u32 ref, unsigned int *isunreliable) 331{ 332 struct port *p_ptr; 333 334 p_ptr = tipc_port_lock(ref); 335 if (!p_ptr) 336 return -EINVAL; 337 *isunreliable = port_unreliable(p_ptr); 338 tipc_port_unlock(p_ptr); 339 return 0; 340} 341 342int tipc_set_portunreliable(u32 ref, unsigned int isunreliable) 343{ 344 struct port *p_ptr; 345 346 p_ptr = tipc_port_lock(ref); 347 if (!p_ptr) 348 return -EINVAL; 349 msg_set_src_droppable(&p_ptr->publ.phdr, (isunreliable != 0)); 350 tipc_port_unlock(p_ptr); 351 return 0; 352} 353 354static int port_unreturnable(struct port *p_ptr) 355{ 356 return msg_dest_droppable(&p_ptr->publ.phdr); 357} 358 359int tipc_portunreturnable(u32 ref, unsigned int *isunrejectable) 360{ 361 struct port *p_ptr; 362 363 p_ptr = tipc_port_lock(ref); 364 if (!p_ptr) 365 return -EINVAL; 366 *isunrejectable = port_unreturnable(p_ptr); 367 tipc_port_unlock(p_ptr); 368 return 0; 369} 370 371int tipc_set_portunreturnable(u32 ref, unsigned int isunrejectable) 372{ 373 struct port *p_ptr; 374 375 p_ptr = tipc_port_lock(ref); 376 if (!p_ptr) 377 return -EINVAL; 378 msg_set_dest_droppable(&p_ptr->publ.phdr, (isunrejectable != 0)); 379 tipc_port_unlock(p_ptr); 380 return 0; 381} 382 383/* 384 * port_build_proto_msg(): build a port level protocol 385 * or a connection abortion message. Called with 386 * tipc_port lock on. 387 */ 388static struct sk_buff *port_build_proto_msg(u32 destport, u32 destnode, 389 u32 origport, u32 orignode, 390 u32 usr, u32 type, u32 err, 391 u32 seqno, u32 ack) 392{ 393 struct sk_buff *buf; 394 struct tipc_msg *msg; 395 396 buf = buf_acquire(LONG_H_SIZE); 397 if (buf) { 398 msg = buf_msg(buf); 399 msg_init(msg, usr, type, LONG_H_SIZE, destnode); 400 msg_set_errcode(msg, err); 401 msg_set_destport(msg, destport); 402 msg_set_origport(msg, origport); 403 msg_set_orignode(msg, orignode); 404 msg_set_transp_seqno(msg, seqno); 405 msg_set_msgcnt(msg, ack); 406 msg_dbg(msg, "PORT>SEND>:"); 407 } 408 return buf; 409} 410 411int tipc_reject_msg(struct sk_buff *buf, u32 err) 412{ 413 struct tipc_msg *msg = buf_msg(buf); 414 struct sk_buff *rbuf; 415 struct tipc_msg *rmsg; 416 int hdr_sz; 417 u32 imp = msg_importance(msg); 418 u32 data_sz = msg_data_sz(msg); 419 420 if (data_sz > MAX_REJECT_SIZE) 421 data_sz = MAX_REJECT_SIZE; 422 if (msg_connected(msg) && (imp < TIPC_CRITICAL_IMPORTANCE)) 423 imp++; 424 msg_dbg(msg, "port->rej: "); 425 426 /* discard rejected message if it shouldn't be returned to sender */ 427 if (msg_errcode(msg) || msg_dest_droppable(msg)) { 428 buf_discard(buf); 429 return data_sz; 430 } 431 432 /* construct rejected message */ 433 if (msg_mcast(msg)) 434 hdr_sz = MCAST_H_SIZE; 435 else 436 hdr_sz = LONG_H_SIZE; 437 rbuf = buf_acquire(data_sz + hdr_sz); 438 if (rbuf == NULL) { 439 buf_discard(buf); 440 return data_sz; 441 } 442 rmsg = buf_msg(rbuf); 443 msg_init(rmsg, imp, msg_type(msg), hdr_sz, msg_orignode(msg)); 444 msg_set_errcode(rmsg, err); 445 msg_set_destport(rmsg, msg_origport(msg)); 446 msg_set_origport(rmsg, msg_destport(msg)); 447 if (msg_short(msg)) { 448 msg_set_orignode(rmsg, tipc_own_addr); 449 /* leave name type & instance as zeroes */ 450 } else { 451 msg_set_orignode(rmsg, msg_destnode(msg)); 452 msg_set_nametype(rmsg, msg_nametype(msg)); 453 msg_set_nameinst(rmsg, msg_nameinst(msg)); 454 } 455 msg_set_size(rmsg, data_sz + hdr_sz); 456 skb_copy_to_linear_data_offset(rbuf, hdr_sz, msg_data(msg), data_sz); 457 458 /* send self-abort message when rejecting on a connected port */ 459 if (msg_connected(msg)) { 460 struct sk_buff *abuf = NULL; 461 struct port *p_ptr = tipc_port_lock(msg_destport(msg)); 462 463 if (p_ptr) { 464 if (p_ptr->publ.connected) 465 abuf = port_build_self_abort_msg(p_ptr, err); 466 tipc_port_unlock(p_ptr); 467 } 468 tipc_net_route_msg(abuf); 469 } 470 471 /* send rejected message */ 472 buf_discard(buf); 473 tipc_net_route_msg(rbuf); 474 return data_sz; 475} 476 477int tipc_port_reject_sections(struct port *p_ptr, struct tipc_msg *hdr, 478 struct iovec const *msg_sect, u32 num_sect, 479 int err) 480{ 481 struct sk_buff *buf; 482 int res; 483 484 res = msg_build(hdr, msg_sect, num_sect, MAX_MSG_SIZE, 485 !p_ptr->user_port, &buf); 486 if (!buf) 487 return res; 488 489 return tipc_reject_msg(buf, err); 490} 491 492static void port_timeout(unsigned long ref) 493{ 494 struct port *p_ptr = tipc_port_lock(ref); 495 struct sk_buff *buf = NULL; 496 497 if (!p_ptr) 498 return; 499 500 if (!p_ptr->publ.connected) { 501 tipc_port_unlock(p_ptr); 502 return; 503 } 504 505 /* Last probe answered ? */ 506 if (p_ptr->probing_state == PROBING) { 507 buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_PORT); 508 } else { 509 buf = port_build_proto_msg(port_peerport(p_ptr), 510 port_peernode(p_ptr), 511 p_ptr->publ.ref, 512 tipc_own_addr, 513 CONN_MANAGER, 514 CONN_PROBE, 515 TIPC_OK, 516 port_out_seqno(p_ptr), 517 0); 518 port_incr_out_seqno(p_ptr); 519 p_ptr->probing_state = PROBING; 520 k_start_timer(&p_ptr->timer, p_ptr->probing_interval); 521 } 522 tipc_port_unlock(p_ptr); 523 tipc_net_route_msg(buf); 524} 525 526 527static void port_handle_node_down(unsigned long ref) 528{ 529 struct port *p_ptr = tipc_port_lock(ref); 530 struct sk_buff* buf = NULL; 531 532 if (!p_ptr) 533 return; 534 buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_NODE); 535 tipc_port_unlock(p_ptr); 536 tipc_net_route_msg(buf); 537} 538 539 540static struct sk_buff *port_build_self_abort_msg(struct port *p_ptr, u32 err) 541{ 542 u32 imp = msg_importance(&p_ptr->publ.phdr); 543 544 if (!p_ptr->publ.connected) 545 return NULL; 546 if (imp < TIPC_CRITICAL_IMPORTANCE) 547 imp++; 548 return port_build_proto_msg(p_ptr->publ.ref, 549 tipc_own_addr, 550 port_peerport(p_ptr), 551 port_peernode(p_ptr), 552 imp, 553 TIPC_CONN_MSG, 554 err, 555 p_ptr->last_in_seqno + 1, 556 0); 557} 558 559 560static struct sk_buff *port_build_peer_abort_msg(struct port *p_ptr, u32 err) 561{ 562 u32 imp = msg_importance(&p_ptr->publ.phdr); 563 564 if (!p_ptr->publ.connected) 565 return NULL; 566 if (imp < TIPC_CRITICAL_IMPORTANCE) 567 imp++; 568 return port_build_proto_msg(port_peerport(p_ptr), 569 port_peernode(p_ptr), 570 p_ptr->publ.ref, 571 tipc_own_addr, 572 imp, 573 TIPC_CONN_MSG, 574 err, 575 port_out_seqno(p_ptr), 576 0); 577} 578 579void tipc_port_recv_proto_msg(struct sk_buff *buf) 580{ 581 struct tipc_msg *msg = buf_msg(buf); 582 struct port *p_ptr = tipc_port_lock(msg_destport(msg)); 583 u32 err = TIPC_OK; 584 struct sk_buff *r_buf = NULL; 585 struct sk_buff *abort_buf = NULL; 586 587 msg_dbg(msg, "PORT<RECV<:"); 588 589 if (!p_ptr) { 590 err = TIPC_ERR_NO_PORT; 591 } else if (p_ptr->publ.connected) { 592 if (port_peernode(p_ptr) != msg_orignode(msg)) 593 err = TIPC_ERR_NO_PORT; 594 if (port_peerport(p_ptr) != msg_origport(msg)) 595 err = TIPC_ERR_NO_PORT; 596 if (!err && msg_routed(msg)) { 597 u32 seqno = msg_transp_seqno(msg); 598 u32 myno = ++p_ptr->last_in_seqno; 599 if (seqno != myno) { 600 err = TIPC_ERR_NO_PORT; 601 abort_buf = port_build_self_abort_msg(p_ptr, err); 602 } 603 } 604 if (msg_type(msg) == CONN_ACK) { 605 int wakeup = tipc_port_congested(p_ptr) && 606 p_ptr->publ.congested && 607 p_ptr->wakeup; 608 p_ptr->acked += msg_msgcnt(msg); 609 if (tipc_port_congested(p_ptr)) 610 goto exit; 611 p_ptr->publ.congested = 0; 612 if (!wakeup) 613 goto exit; 614 p_ptr->wakeup(&p_ptr->publ); 615 goto exit; 616 } 617 } else if (p_ptr->publ.published) { 618 err = TIPC_ERR_NO_PORT; 619 } 620 if (err) { 621 r_buf = port_build_proto_msg(msg_origport(msg), 622 msg_orignode(msg), 623 msg_destport(msg), 624 tipc_own_addr, 625 TIPC_HIGH_IMPORTANCE, 626 TIPC_CONN_MSG, 627 err, 628 0, 629 0); 630 goto exit; 631 } 632 633 /* All is fine */ 634 if (msg_type(msg) == CONN_PROBE) { 635 r_buf = port_build_proto_msg(msg_origport(msg), 636 msg_orignode(msg), 637 msg_destport(msg), 638 tipc_own_addr, 639 CONN_MANAGER, 640 CONN_PROBE_REPLY, 641 TIPC_OK, 642 port_out_seqno(p_ptr), 643 0); 644 } 645 p_ptr->probing_state = CONFIRMED; 646 port_incr_out_seqno(p_ptr); 647exit: 648 if (p_ptr) 649 tipc_port_unlock(p_ptr); 650 tipc_net_route_msg(r_buf); 651 tipc_net_route_msg(abort_buf); 652 buf_discard(buf); 653} 654 655static void port_print(struct port *p_ptr, struct print_buf *buf, int full_id) 656{ 657 struct publication *publ; 658 659 if (full_id) 660 tipc_printf(buf, "<%u.%u.%u:%u>:", 661 tipc_zone(tipc_own_addr), tipc_cluster(tipc_own_addr), 662 tipc_node(tipc_own_addr), p_ptr->publ.ref); 663 else 664 tipc_printf(buf, "%-10u:", p_ptr->publ.ref); 665 666 if (p_ptr->publ.connected) { 667 u32 dport = port_peerport(p_ptr); 668 u32 destnode = port_peernode(p_ptr); 669 670 tipc_printf(buf, " connected to <%u.%u.%u:%u>", 671 tipc_zone(destnode), tipc_cluster(destnode), 672 tipc_node(destnode), dport); 673 if (p_ptr->publ.conn_type != 0) 674 tipc_printf(buf, " via {%u,%u}", 675 p_ptr->publ.conn_type, 676 p_ptr->publ.conn_instance); 677 } 678 else if (p_ptr->publ.published) { 679 tipc_printf(buf, " bound to"); 680 list_for_each_entry(publ, &p_ptr->publications, pport_list) { 681 if (publ->lower == publ->upper) 682 tipc_printf(buf, " {%u,%u}", publ->type, 683 publ->lower); 684 else 685 tipc_printf(buf, " {%u,%u,%u}", publ->type, 686 publ->lower, publ->upper); 687 } 688 } 689 tipc_printf(buf, "\n"); 690} 691 692#define MAX_PORT_QUERY 32768 693 694struct sk_buff *tipc_port_get_ports(void) 695{ 696 struct sk_buff *buf; 697 struct tlv_desc *rep_tlv; 698 struct print_buf pb; 699 struct port *p_ptr; 700 int str_len; 701 702 buf = tipc_cfg_reply_alloc(TLV_SPACE(MAX_PORT_QUERY)); 703 if (!buf) 704 return NULL; 705 rep_tlv = (struct tlv_desc *)buf->data; 706 707 tipc_printbuf_init(&pb, TLV_DATA(rep_tlv), MAX_PORT_QUERY); 708 spin_lock_bh(&tipc_port_list_lock); 709 list_for_each_entry(p_ptr, &ports, port_list) { 710 spin_lock_bh(p_ptr->publ.lock); 711 port_print(p_ptr, &pb, 0); 712 spin_unlock_bh(p_ptr->publ.lock); 713 } 714 spin_unlock_bh(&tipc_port_list_lock); 715 str_len = tipc_printbuf_validate(&pb); 716 717 skb_put(buf, TLV_SPACE(str_len)); 718 TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len); 719 720 return buf; 721} 722 723#if 0 724 725#define MAX_PORT_STATS 2000 726 727struct sk_buff *port_show_stats(const void *req_tlv_area, int req_tlv_space) 728{ 729 u32 ref; 730 struct port *p_ptr; 731 struct sk_buff *buf; 732 struct tlv_desc *rep_tlv; 733 struct print_buf pb; 734 int str_len; 735 736 if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_PORT_REF)) 737 return cfg_reply_error_string(TIPC_CFG_TLV_ERROR); 738 739 ref = *(u32 *)TLV_DATA(req_tlv_area); 740 ref = ntohl(ref); 741 742 p_ptr = tipc_port_lock(ref); 743 if (!p_ptr) 744 return cfg_reply_error_string("port not found"); 745 746 buf = tipc_cfg_reply_alloc(TLV_SPACE(MAX_PORT_STATS)); 747 if (!buf) { 748 tipc_port_unlock(p_ptr); 749 return NULL; 750 } 751 rep_tlv = (struct tlv_desc *)buf->data; 752 753 tipc_printbuf_init(&pb, TLV_DATA(rep_tlv), MAX_PORT_STATS); 754 port_print(p_ptr, &pb, 1); 755 /* NEED TO FILL IN ADDITIONAL PORT STATISTICS HERE */ 756 tipc_port_unlock(p_ptr); 757 str_len = tipc_printbuf_validate(&pb); 758 759 skb_put(buf, TLV_SPACE(str_len)); 760 TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len); 761 762 return buf; 763} 764 765#endif 766 767void tipc_port_reinit(void) 768{ 769 struct port *p_ptr; 770 struct tipc_msg *msg; 771 772 spin_lock_bh(&tipc_port_list_lock); 773 list_for_each_entry(p_ptr, &ports, port_list) { 774 msg = &p_ptr->publ.phdr; 775 if (msg_orignode(msg) == tipc_own_addr) 776 break; 777 msg_set_prevnode(msg, tipc_own_addr); 778 msg_set_orignode(msg, tipc_own_addr); 779 } 780 spin_unlock_bh(&tipc_port_list_lock); 781} 782 783 784/* 785 * port_dispatcher_sigh(): Signal handler for messages destinated 786 * to the tipc_port interface. 787 */ 788 789static void port_dispatcher_sigh(void *dummy) 790{ 791 struct sk_buff *buf; 792 793 spin_lock_bh(&queue_lock); 794 buf = msg_queue_head; 795 msg_queue_head = NULL; 796 spin_unlock_bh(&queue_lock); 797 798 while (buf) { 799 struct port *p_ptr; 800 struct user_port *up_ptr; 801 struct tipc_portid orig; 802 struct tipc_name_seq dseq; 803 void *usr_handle; 804 int connected; 805 int published; 806 u32 message_type; 807 808 struct sk_buff *next = buf->next; 809 struct tipc_msg *msg = buf_msg(buf); 810 u32 dref = msg_destport(msg); 811 812 message_type = msg_type(msg); 813 if (message_type > TIPC_DIRECT_MSG) 814 goto reject; /* Unsupported message type */ 815 816 p_ptr = tipc_port_lock(dref); 817 if (!p_ptr) 818 goto reject; /* Port deleted while msg in queue */ 819 820 orig.ref = msg_origport(msg); 821 orig.node = msg_orignode(msg); 822 up_ptr = p_ptr->user_port; 823 usr_handle = up_ptr->usr_handle; 824 connected = p_ptr->publ.connected; 825 published = p_ptr->publ.published; 826 827 if (unlikely(msg_errcode(msg))) 828 goto err; 829 830 switch (message_type) { 831 832 case TIPC_CONN_MSG:{ 833 tipc_conn_msg_event cb = up_ptr->conn_msg_cb; 834 u32 peer_port = port_peerport(p_ptr); 835 u32 peer_node = port_peernode(p_ptr); 836 837 tipc_port_unlock(p_ptr); 838 if (unlikely(!cb)) 839 goto reject; 840 if (unlikely(!connected)) { 841 if (tipc_connect2port(dref, &orig)) 842 goto reject; 843 } else if ((msg_origport(msg) != peer_port) || 844 (msg_orignode(msg) != peer_node)) 845 goto reject; 846 if (unlikely(++p_ptr->publ.conn_unacked >= 847 TIPC_FLOW_CONTROL_WIN)) 848 tipc_acknowledge(dref, 849 p_ptr->publ.conn_unacked); 850 skb_pull(buf, msg_hdr_sz(msg)); 851 cb(usr_handle, dref, &buf, msg_data(msg), 852 msg_data_sz(msg)); 853 break; 854 } 855 case TIPC_DIRECT_MSG:{ 856 tipc_msg_event cb = up_ptr->msg_cb; 857 858 tipc_port_unlock(p_ptr); 859 if (unlikely(!cb || connected)) 860 goto reject; 861 skb_pull(buf, msg_hdr_sz(msg)); 862 cb(usr_handle, dref, &buf, msg_data(msg), 863 msg_data_sz(msg), msg_importance(msg), 864 &orig); 865 break; 866 } 867 case TIPC_MCAST_MSG: 868 case TIPC_NAMED_MSG:{ 869 tipc_named_msg_event cb = up_ptr->named_msg_cb; 870 871 tipc_port_unlock(p_ptr); 872 if (unlikely(!cb || connected || !published)) 873 goto reject; 874 dseq.type = msg_nametype(msg); 875 dseq.lower = msg_nameinst(msg); 876 dseq.upper = (message_type == TIPC_NAMED_MSG) 877 ? dseq.lower : msg_nameupper(msg); 878 skb_pull(buf, msg_hdr_sz(msg)); 879 cb(usr_handle, dref, &buf, msg_data(msg), 880 msg_data_sz(msg), msg_importance(msg), 881 &orig, &dseq); 882 break; 883 } 884 } 885 if (buf) 886 buf_discard(buf); 887 buf = next; 888 continue; 889err: 890 switch (message_type) { 891 892 case TIPC_CONN_MSG:{ 893 tipc_conn_shutdown_event cb = 894 up_ptr->conn_err_cb; 895 u32 peer_port = port_peerport(p_ptr); 896 u32 peer_node = port_peernode(p_ptr); 897 898 tipc_port_unlock(p_ptr); 899 if (!cb || !connected) 900 break; 901 if ((msg_origport(msg) != peer_port) || 902 (msg_orignode(msg) != peer_node)) 903 break; 904 tipc_disconnect(dref); 905 skb_pull(buf, msg_hdr_sz(msg)); 906 cb(usr_handle, dref, &buf, msg_data(msg), 907 msg_data_sz(msg), msg_errcode(msg)); 908 break; 909 } 910 case TIPC_DIRECT_MSG:{ 911 tipc_msg_err_event cb = up_ptr->err_cb; 912 913 tipc_port_unlock(p_ptr); 914 if (!cb || connected) 915 break; 916 skb_pull(buf, msg_hdr_sz(msg)); 917 cb(usr_handle, dref, &buf, msg_data(msg), 918 msg_data_sz(msg), msg_errcode(msg), &orig); 919 break; 920 } 921 case TIPC_MCAST_MSG: 922 case TIPC_NAMED_MSG:{ 923 tipc_named_msg_err_event cb = 924 up_ptr->named_err_cb; 925 926 tipc_port_unlock(p_ptr); 927 if (!cb || connected) 928 break; 929 dseq.type = msg_nametype(msg); 930 dseq.lower = msg_nameinst(msg); 931 dseq.upper = (message_type == TIPC_NAMED_MSG) 932 ? dseq.lower : msg_nameupper(msg); 933 skb_pull(buf, msg_hdr_sz(msg)); 934 cb(usr_handle, dref, &buf, msg_data(msg), 935 msg_data_sz(msg), msg_errcode(msg), &dseq); 936 break; 937 } 938 } 939 if (buf) 940 buf_discard(buf); 941 buf = next; 942 continue; 943reject: 944 tipc_reject_msg(buf, TIPC_ERR_NO_PORT); 945 buf = next; 946 } 947} 948 949/* 950 * port_dispatcher(): Dispatcher for messages destinated 951 * to the tipc_port interface. Called with port locked. 952 */ 953 954static u32 port_dispatcher(struct tipc_port *dummy, struct sk_buff *buf) 955{ 956 buf->next = NULL; 957 spin_lock_bh(&queue_lock); 958 if (msg_queue_head) { 959 msg_queue_tail->next = buf; 960 msg_queue_tail = buf; 961 } else { 962 msg_queue_tail = msg_queue_head = buf; 963 tipc_k_signal((Handler)port_dispatcher_sigh, 0); 964 } 965 spin_unlock_bh(&queue_lock); 966 return 0; 967} 968 969/* 970 * Wake up port after congestion: Called with port locked, 971 * 972 */ 973 974static void port_wakeup_sh(unsigned long ref) 975{ 976 struct port *p_ptr; 977 struct user_port *up_ptr; 978 tipc_continue_event cb = NULL; 979 void *uh = NULL; 980 981 p_ptr = tipc_port_lock(ref); 982 if (p_ptr) { 983 up_ptr = p_ptr->user_port; 984 if (up_ptr) { 985 cb = up_ptr->continue_event_cb; 986 uh = up_ptr->usr_handle; 987 } 988 tipc_port_unlock(p_ptr); 989 } 990 if (cb) 991 cb(uh, ref); 992} 993 994 995static void port_wakeup(struct tipc_port *p_ptr) 996{ 997 tipc_k_signal((Handler)port_wakeup_sh, p_ptr->ref); 998} 999 1000void tipc_acknowledge(u32 ref, u32 ack) 1001{ 1002 struct port *p_ptr; 1003 struct sk_buff *buf = NULL; 1004 1005 p_ptr = tipc_port_lock(ref); 1006 if (!p_ptr) 1007 return; 1008 if (p_ptr->publ.connected) { 1009 p_ptr->publ.conn_unacked -= ack; 1010 buf = port_build_proto_msg(port_peerport(p_ptr), 1011 port_peernode(p_ptr), 1012 ref, 1013 tipc_own_addr, 1014 CONN_MANAGER, 1015 CONN_ACK, 1016 TIPC_OK, 1017 port_out_seqno(p_ptr), 1018 ack); 1019 } 1020 tipc_port_unlock(p_ptr); 1021 tipc_net_route_msg(buf); 1022} 1023 1024/* 1025 * tipc_createport(): user level call. Will add port to 1026 * registry if non-zero user_ref. 1027 */ 1028 1029int tipc_createport(u32 user_ref, 1030 void *usr_handle, 1031 unsigned int importance, 1032 tipc_msg_err_event error_cb, 1033 tipc_named_msg_err_event named_error_cb, 1034 tipc_conn_shutdown_event conn_error_cb, 1035 tipc_msg_event msg_cb, 1036 tipc_named_msg_event named_msg_cb, 1037 tipc_conn_msg_event conn_msg_cb, 1038 tipc_continue_event continue_event_cb,/* May be zero */ 1039 u32 *portref) 1040{ 1041 struct user_port *up_ptr; 1042 struct port *p_ptr; 1043 1044 up_ptr = kmalloc(sizeof(*up_ptr), GFP_ATOMIC); 1045 if (!up_ptr) { 1046 warn("Port creation failed, no memory\n"); 1047 return -ENOMEM; 1048 } 1049 p_ptr = (struct port *)tipc_createport_raw(NULL, port_dispatcher, 1050 port_wakeup, importance); 1051 if (!p_ptr) { 1052 kfree(up_ptr); 1053 return -ENOMEM; 1054 } 1055 1056 p_ptr->user_port = up_ptr; 1057 up_ptr->user_ref = user_ref; 1058 up_ptr->usr_handle = usr_handle; 1059 up_ptr->ref = p_ptr->publ.ref; 1060 up_ptr->err_cb = error_cb; 1061 up_ptr->named_err_cb = named_error_cb; 1062 up_ptr->conn_err_cb = conn_error_cb; 1063 up_ptr->msg_cb = msg_cb; 1064 up_ptr->named_msg_cb = named_msg_cb; 1065 up_ptr->conn_msg_cb = conn_msg_cb; 1066 up_ptr->continue_event_cb = continue_event_cb; 1067 INIT_LIST_HEAD(&up_ptr->uport_list); 1068 tipc_reg_add_port(up_ptr); 1069 *portref = p_ptr->publ.ref; 1070 tipc_port_unlock(p_ptr); 1071 return 0; 1072} 1073 1074int tipc_ownidentity(u32 ref, struct tipc_portid *id) 1075{ 1076 id->ref = ref; 1077 id->node = tipc_own_addr; 1078 return 0; 1079} 1080 1081int tipc_portimportance(u32 ref, unsigned int *importance) 1082{ 1083 struct port *p_ptr; 1084 1085 p_ptr = tipc_port_lock(ref); 1086 if (!p_ptr) 1087 return -EINVAL; 1088 *importance = (unsigned int)msg_importance(&p_ptr->publ.phdr); 1089 tipc_port_unlock(p_ptr); 1090 return 0; 1091} 1092 1093int tipc_set_portimportance(u32 ref, unsigned int imp) 1094{ 1095 struct port *p_ptr; 1096 1097 if (imp > TIPC_CRITICAL_IMPORTANCE) 1098 return -EINVAL; 1099 1100 p_ptr = tipc_port_lock(ref); 1101 if (!p_ptr) 1102 return -EINVAL; 1103 msg_set_importance(&p_ptr->publ.phdr, (u32)imp); 1104 tipc_port_unlock(p_ptr); 1105 return 0; 1106} 1107 1108 1109int tipc_publish(u32 ref, unsigned int scope, struct tipc_name_seq const *seq) 1110{ 1111 struct port *p_ptr; 1112 struct publication *publ; 1113 u32 key; 1114 int res = -EINVAL; 1115 1116 p_ptr = tipc_port_lock(ref); 1117 if (!p_ptr) 1118 return -EINVAL; 1119 1120 dbg("tipc_publ %u, p_ptr = %x, conn = %x, scope = %x, " 1121 "lower = %u, upper = %u\n", 1122 ref, p_ptr, p_ptr->publ.connected, scope, seq->lower, seq->upper); 1123 if (p_ptr->publ.connected) 1124 goto exit; 1125 if (seq->lower > seq->upper) 1126 goto exit; 1127 if ((scope < TIPC_ZONE_SCOPE) || (scope > TIPC_NODE_SCOPE)) 1128 goto exit; 1129 key = ref + p_ptr->pub_count + 1; 1130 if (key == ref) { 1131 res = -EADDRINUSE; 1132 goto exit; 1133 } 1134 publ = tipc_nametbl_publish(seq->type, seq->lower, seq->upper, 1135 scope, p_ptr->publ.ref, key); 1136 if (publ) { 1137 list_add(&publ->pport_list, &p_ptr->publications); 1138 p_ptr->pub_count++; 1139 p_ptr->publ.published = 1; 1140 res = 0; 1141 } 1142exit: 1143 tipc_port_unlock(p_ptr); 1144 return res; 1145} 1146 1147int tipc_withdraw(u32 ref, unsigned int scope, struct tipc_name_seq const *seq) 1148{ 1149 struct port *p_ptr; 1150 struct publication *publ; 1151 struct publication *tpubl; 1152 int res = -EINVAL; 1153 1154 p_ptr = tipc_port_lock(ref); 1155 if (!p_ptr) 1156 return -EINVAL; 1157 if (!seq) { 1158 list_for_each_entry_safe(publ, tpubl, 1159 &p_ptr->publications, pport_list) { 1160 tipc_nametbl_withdraw(publ->type, publ->lower, 1161 publ->ref, publ->key); 1162 } 1163 res = 0; 1164 } else { 1165 list_for_each_entry_safe(publ, tpubl, 1166 &p_ptr->publications, pport_list) { 1167 if (publ->scope != scope) 1168 continue; 1169 if (publ->type != seq->type) 1170 continue; 1171 if (publ->lower != seq->lower) 1172 continue; 1173 if (publ->upper != seq->upper) 1174 break; 1175 tipc_nametbl_withdraw(publ->type, publ->lower, 1176 publ->ref, publ->key); 1177 res = 0; 1178 break; 1179 } 1180 } 1181 if (list_empty(&p_ptr->publications)) 1182 p_ptr->publ.published = 0; 1183 tipc_port_unlock(p_ptr); 1184 return res; 1185} 1186 1187int tipc_connect2port(u32 ref, struct tipc_portid const *peer) 1188{ 1189 struct port *p_ptr; 1190 struct tipc_msg *msg; 1191 int res = -EINVAL; 1192 1193 p_ptr = tipc_port_lock(ref); 1194 if (!p_ptr) 1195 return -EINVAL; 1196 if (p_ptr->publ.published || p_ptr->publ.connected) 1197 goto exit; 1198 if (!peer->ref) 1199 goto exit; 1200 1201 msg = &p_ptr->publ.phdr; 1202 msg_set_destnode(msg, peer->node); 1203 msg_set_destport(msg, peer->ref); 1204 msg_set_orignode(msg, tipc_own_addr); 1205 msg_set_origport(msg, p_ptr->publ.ref); 1206 msg_set_transp_seqno(msg, 42); 1207 msg_set_type(msg, TIPC_CONN_MSG); 1208 if (!may_route(peer->node)) 1209 msg_set_hdr_sz(msg, SHORT_H_SIZE); 1210 else 1211 msg_set_hdr_sz(msg, LONG_H_SIZE); 1212 1213 p_ptr->probing_interval = PROBING_INTERVAL; 1214 p_ptr->probing_state = CONFIRMED; 1215 p_ptr->publ.connected = 1; 1216 k_start_timer(&p_ptr->timer, p_ptr->probing_interval); 1217 1218 tipc_nodesub_subscribe(&p_ptr->subscription,peer->node, 1219 (void *)(unsigned long)ref, 1220 (net_ev_handler)port_handle_node_down); 1221 res = 0; 1222exit: 1223 tipc_port_unlock(p_ptr); 1224 p_ptr->publ.max_pkt = tipc_link_get_max_pkt(peer->node, ref); 1225 return res; 1226} 1227 1228/** 1229 * tipc_disconnect_port - disconnect port from peer 1230 * 1231 * Port must be locked. 1232 */ 1233 1234int tipc_disconnect_port(struct tipc_port *tp_ptr) 1235{ 1236 int res; 1237 1238 if (tp_ptr->connected) { 1239 tp_ptr->connected = 0; 1240 /* let timer expire on it's own to avoid deadlock! */ 1241 tipc_nodesub_unsubscribe( 1242 &((struct port *)tp_ptr)->subscription); 1243 res = 0; 1244 } else { 1245 res = -ENOTCONN; 1246 } 1247 return res; 1248} 1249 1250/* 1251 * tipc_disconnect(): Disconnect port form peer. 1252 * This is a node local operation. 1253 */ 1254 1255int tipc_disconnect(u32 ref) 1256{ 1257 struct port *p_ptr; 1258 int res; 1259 1260 p_ptr = tipc_port_lock(ref); 1261 if (!p_ptr) 1262 return -EINVAL; 1263 res = tipc_disconnect_port((struct tipc_port *)p_ptr); 1264 tipc_port_unlock(p_ptr); 1265 return res; 1266} 1267 1268/* 1269 * tipc_shutdown(): Send a SHUTDOWN msg to peer and disconnect 1270 */ 1271int tipc_shutdown(u32 ref) 1272{ 1273 struct port *p_ptr; 1274 struct sk_buff *buf = NULL; 1275 1276 p_ptr = tipc_port_lock(ref); 1277 if (!p_ptr) 1278 return -EINVAL; 1279 1280 if (p_ptr->publ.connected) { 1281 u32 imp = msg_importance(&p_ptr->publ.phdr); 1282 if (imp < TIPC_CRITICAL_IMPORTANCE) 1283 imp++; 1284 buf = port_build_proto_msg(port_peerport(p_ptr), 1285 port_peernode(p_ptr), 1286 ref, 1287 tipc_own_addr, 1288 imp, 1289 TIPC_CONN_MSG, 1290 TIPC_CONN_SHUTDOWN, 1291 port_out_seqno(p_ptr), 1292 0); 1293 } 1294 tipc_port_unlock(p_ptr); 1295 tipc_net_route_msg(buf); 1296 return tipc_disconnect(ref); 1297} 1298 1299int tipc_isconnected(u32 ref, int *isconnected) 1300{ 1301 struct port *p_ptr; 1302 1303 p_ptr = tipc_port_lock(ref); 1304 if (!p_ptr) 1305 return -EINVAL; 1306 *isconnected = p_ptr->publ.connected; 1307 tipc_port_unlock(p_ptr); 1308 return 0; 1309} 1310 1311int tipc_peer(u32 ref, struct tipc_portid *peer) 1312{ 1313 struct port *p_ptr; 1314 int res; 1315 1316 p_ptr = tipc_port_lock(ref); 1317 if (!p_ptr) 1318 return -EINVAL; 1319 if (p_ptr->publ.connected) { 1320 peer->ref = port_peerport(p_ptr); 1321 peer->node = port_peernode(p_ptr); 1322 res = 0; 1323 } else 1324 res = -ENOTCONN; 1325 tipc_port_unlock(p_ptr); 1326 return res; 1327} 1328 1329int tipc_ref_valid(u32 ref) 1330{ 1331 /* Works irrespective of type */ 1332 return !!tipc_ref_deref(ref); 1333} 1334 1335 1336/* 1337 * tipc_port_recv_sections(): Concatenate and deliver sectioned 1338 * message for this node. 1339 */ 1340 1341int tipc_port_recv_sections(struct port *sender, unsigned int num_sect, 1342 struct iovec const *msg_sect) 1343{ 1344 struct sk_buff *buf; 1345 int res; 1346 1347 res = msg_build(&sender->publ.phdr, msg_sect, num_sect, 1348 MAX_MSG_SIZE, !sender->user_port, &buf); 1349 if (likely(buf)) 1350 tipc_port_recv_msg(buf); 1351 return res; 1352} 1353 1354/** 1355 * tipc_send - send message sections on connection 1356 */ 1357 1358int tipc_send(u32 ref, unsigned int num_sect, struct iovec const *msg_sect) 1359{ 1360 struct port *p_ptr; 1361 u32 destnode; 1362 int res; 1363 1364 p_ptr = tipc_port_deref(ref); 1365 if (!p_ptr || !p_ptr->publ.connected) 1366 return -EINVAL; 1367 1368 p_ptr->publ.congested = 1; 1369 if (!tipc_port_congested(p_ptr)) { 1370 destnode = port_peernode(p_ptr); 1371 if (likely(destnode != tipc_own_addr)) 1372 res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect, 1373 destnode); 1374 else 1375 res = tipc_port_recv_sections(p_ptr, num_sect, msg_sect); 1376 1377 if (likely(res != -ELINKCONG)) { 1378 port_incr_out_seqno(p_ptr); 1379 p_ptr->publ.congested = 0; 1380 p_ptr->sent++; 1381 return res; 1382 } 1383 } 1384 if (port_unreliable(p_ptr)) { 1385 p_ptr->publ.congested = 0; 1386 /* Just calculate msg length and return */ 1387 return msg_calc_data_size(msg_sect, num_sect); 1388 } 1389 return -ELINKCONG; 1390} 1391 1392/** 1393 * tipc_send_buf - send message buffer on connection 1394 */ 1395 1396int tipc_send_buf(u32 ref, struct sk_buff *buf, unsigned int dsz) 1397{ 1398 struct port *p_ptr; 1399 struct tipc_msg *msg; 1400 u32 destnode; 1401 u32 hsz; 1402 u32 sz; 1403 u32 res; 1404 1405 p_ptr = tipc_port_deref(ref); 1406 if (!p_ptr || !p_ptr->publ.connected) 1407 return -EINVAL; 1408 1409 msg = &p_ptr->publ.phdr; 1410 hsz = msg_hdr_sz(msg); 1411 sz = hsz + dsz; 1412 msg_set_size(msg, sz); 1413 if (skb_cow(buf, hsz)) 1414 return -ENOMEM; 1415 1416 skb_push(buf, hsz); 1417 skb_copy_to_linear_data(buf, msg, hsz); 1418 destnode = msg_destnode(msg); 1419 p_ptr->publ.congested = 1; 1420 if (!tipc_port_congested(p_ptr)) { 1421 if (likely(destnode != tipc_own_addr)) 1422 res = tipc_send_buf_fast(buf, destnode); 1423 else { 1424 tipc_port_recv_msg(buf); 1425 res = sz; 1426 } 1427 if (likely(res != -ELINKCONG)) { 1428 port_incr_out_seqno(p_ptr); 1429 p_ptr->sent++; 1430 p_ptr->publ.congested = 0; 1431 return res; 1432 } 1433 } 1434 if (port_unreliable(p_ptr)) { 1435 p_ptr->publ.congested = 0; 1436 return dsz; 1437 } 1438 return -ELINKCONG; 1439} 1440 1441/** 1442 * tipc_forward2name - forward message sections to port name 1443 */ 1444 1445int tipc_forward2name(u32 ref, 1446 struct tipc_name const *name, 1447 u32 domain, 1448 u32 num_sect, 1449 struct iovec const *msg_sect, 1450 struct tipc_portid const *orig, 1451 unsigned int importance) 1452{ 1453 struct port *p_ptr; 1454 struct tipc_msg *msg; 1455 u32 destnode = domain; 1456 u32 destport = 0; 1457 int res; 1458 1459 p_ptr = tipc_port_deref(ref); 1460 if (!p_ptr || p_ptr->publ.connected) 1461 return -EINVAL; 1462 1463 msg = &p_ptr->publ.phdr; 1464 msg_set_type(msg, TIPC_NAMED_MSG); 1465 msg_set_orignode(msg, orig->node); 1466 msg_set_origport(msg, orig->ref); 1467 msg_set_hdr_sz(msg, LONG_H_SIZE); 1468 msg_set_nametype(msg, name->type); 1469 msg_set_nameinst(msg, name->instance); 1470 msg_set_lookup_scope(msg, addr_scope(domain)); 1471 if (importance <= TIPC_CRITICAL_IMPORTANCE) 1472 msg_set_importance(msg,importance); 1473 destport = tipc_nametbl_translate(name->type, name->instance, &destnode); 1474 msg_set_destnode(msg, destnode); 1475 msg_set_destport(msg, destport); 1476 1477 if (likely(destport || destnode)) { 1478 p_ptr->sent++; 1479 if (likely(destnode == tipc_own_addr)) 1480 return tipc_port_recv_sections(p_ptr, num_sect, msg_sect); 1481 res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect, 1482 destnode); 1483 if (likely(res != -ELINKCONG)) 1484 return res; 1485 if (port_unreliable(p_ptr)) { 1486 /* Just calculate msg length and return */ 1487 return msg_calc_data_size(msg_sect, num_sect); 1488 } 1489 return -ELINKCONG; 1490 } 1491 return tipc_port_reject_sections(p_ptr, msg, msg_sect, num_sect, 1492 TIPC_ERR_NO_NAME); 1493} 1494 1495/** 1496 * tipc_send2name - send message sections to port name 1497 */ 1498 1499int tipc_send2name(u32 ref, 1500 struct tipc_name const *name, 1501 unsigned int domain, 1502 unsigned int num_sect, 1503 struct iovec const *msg_sect) 1504{ 1505 struct tipc_portid orig; 1506 1507 orig.ref = ref; 1508 orig.node = tipc_own_addr; 1509 return tipc_forward2name(ref, name, domain, num_sect, msg_sect, &orig, 1510 TIPC_PORT_IMPORTANCE); 1511} 1512 1513/** 1514 * tipc_forward_buf2name - forward message buffer to port name 1515 */ 1516 1517int tipc_forward_buf2name(u32 ref, 1518 struct tipc_name const *name, 1519 u32 domain, 1520 struct sk_buff *buf, 1521 unsigned int dsz, 1522 struct tipc_portid const *orig, 1523 unsigned int importance) 1524{ 1525 struct port *p_ptr; 1526 struct tipc_msg *msg; 1527 u32 destnode = domain; 1528 u32 destport = 0; 1529 int res; 1530 1531 p_ptr = (struct port *)tipc_ref_deref(ref); 1532 if (!p_ptr || p_ptr->publ.connected) 1533 return -EINVAL; 1534 1535 msg = &p_ptr->publ.phdr; 1536 if (importance <= TIPC_CRITICAL_IMPORTANCE) 1537 msg_set_importance(msg, importance); 1538 msg_set_type(msg, TIPC_NAMED_MSG); 1539 msg_set_orignode(msg, orig->node); 1540 msg_set_origport(msg, orig->ref); 1541 msg_set_nametype(msg, name->type); 1542 msg_set_nameinst(msg, name->instance); 1543 msg_set_lookup_scope(msg, addr_scope(domain)); 1544 msg_set_hdr_sz(msg, LONG_H_SIZE); 1545 msg_set_size(msg, LONG_H_SIZE + dsz); 1546 destport = tipc_nametbl_translate(name->type, name->instance, &destnode); 1547 msg_set_destnode(msg, destnode); 1548 msg_set_destport(msg, destport); 1549 msg_dbg(msg, "forw2name ==> "); 1550 if (skb_cow(buf, LONG_H_SIZE)) 1551 return -ENOMEM; 1552 skb_push(buf, LONG_H_SIZE); 1553 skb_copy_to_linear_data(buf, msg, LONG_H_SIZE); 1554 msg_dbg(buf_msg(buf),"PREP:"); 1555 if (likely(destport || destnode)) { 1556 p_ptr->sent++; 1557 if (destnode == tipc_own_addr) 1558 return tipc_port_recv_msg(buf); 1559 res = tipc_send_buf_fast(buf, destnode); 1560 if (likely(res != -ELINKCONG)) 1561 return res; 1562 if (port_unreliable(p_ptr)) 1563 return dsz; 1564 return -ELINKCONG; 1565 } 1566 return tipc_reject_msg(buf, TIPC_ERR_NO_NAME); 1567} 1568 1569/** 1570 * tipc_send_buf2name - send message buffer to port name 1571 */ 1572 1573int tipc_send_buf2name(u32 ref, 1574 struct tipc_name const *dest, 1575 u32 domain, 1576 struct sk_buff *buf, 1577 unsigned int dsz) 1578{ 1579 struct tipc_portid orig; 1580 1581 orig.ref = ref; 1582 orig.node = tipc_own_addr; 1583 return tipc_forward_buf2name(ref, dest, domain, buf, dsz, &orig, 1584 TIPC_PORT_IMPORTANCE); 1585} 1586 1587/** 1588 * tipc_forward2port - forward message sections to port identity 1589 */ 1590 1591int tipc_forward2port(u32 ref, 1592 struct tipc_portid const *dest, 1593 unsigned int num_sect, 1594 struct iovec const *msg_sect, 1595 struct tipc_portid const *orig, 1596 unsigned int importance) 1597{ 1598 struct port *p_ptr; 1599 struct tipc_msg *msg; 1600 int res; 1601 1602 p_ptr = tipc_port_deref(ref); 1603 if (!p_ptr || p_ptr->publ.connected) 1604 return -EINVAL; 1605 1606 msg = &p_ptr->publ.phdr; 1607 msg_set_type(msg, TIPC_DIRECT_MSG); 1608 msg_set_orignode(msg, orig->node); 1609 msg_set_origport(msg, orig->ref); 1610 msg_set_destnode(msg, dest->node); 1611 msg_set_destport(msg, dest->ref); 1612 msg_set_hdr_sz(msg, DIR_MSG_H_SIZE); 1613 if (importance <= TIPC_CRITICAL_IMPORTANCE) 1614 msg_set_importance(msg, importance); 1615 p_ptr->sent++; 1616 if (dest->node == tipc_own_addr) 1617 return tipc_port_recv_sections(p_ptr, num_sect, msg_sect); 1618 res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect, dest->node); 1619 if (likely(res != -ELINKCONG)) 1620 return res; 1621 if (port_unreliable(p_ptr)) { 1622 /* Just calculate msg length and return */ 1623 return msg_calc_data_size(msg_sect, num_sect); 1624 } 1625 return -ELINKCONG; 1626} 1627 1628/** 1629 * tipc_send2port - send message sections to port identity 1630 */ 1631 1632int tipc_send2port(u32 ref, 1633 struct tipc_portid const *dest, 1634 unsigned int num_sect, 1635 struct iovec const *msg_sect) 1636{ 1637 struct tipc_portid orig; 1638 1639 orig.ref = ref; 1640 orig.node = tipc_own_addr; 1641 return tipc_forward2port(ref, dest, num_sect, msg_sect, &orig, 1642 TIPC_PORT_IMPORTANCE); 1643} 1644 1645/** 1646 * tipc_forward_buf2port - forward message buffer to port identity 1647 */ 1648int tipc_forward_buf2port(u32 ref, 1649 struct tipc_portid const *dest, 1650 struct sk_buff *buf, 1651 unsigned int dsz, 1652 struct tipc_portid const *orig, 1653 unsigned int importance) 1654{ 1655 struct port *p_ptr; 1656 struct tipc_msg *msg; 1657 int res; 1658 1659 p_ptr = (struct port *)tipc_ref_deref(ref); 1660 if (!p_ptr || p_ptr->publ.connected) 1661 return -EINVAL; 1662 1663 msg = &p_ptr->publ.phdr; 1664 msg_set_type(msg, TIPC_DIRECT_MSG); 1665 msg_set_orignode(msg, orig->node); 1666 msg_set_origport(msg, orig->ref); 1667 msg_set_destnode(msg, dest->node); 1668 msg_set_destport(msg, dest->ref); 1669 msg_set_hdr_sz(msg, DIR_MSG_H_SIZE); 1670 if (importance <= TIPC_CRITICAL_IMPORTANCE) 1671 msg_set_importance(msg, importance); 1672 msg_set_size(msg, DIR_MSG_H_SIZE + dsz); 1673 if (skb_cow(buf, DIR_MSG_H_SIZE)) 1674 return -ENOMEM; 1675 1676 skb_push(buf, DIR_MSG_H_SIZE); 1677 skb_copy_to_linear_data(buf, msg, DIR_MSG_H_SIZE); 1678 msg_dbg(msg, "buf2port: "); 1679 p_ptr->sent++; 1680 if (dest->node == tipc_own_addr) 1681 return tipc_port_recv_msg(buf); 1682 res = tipc_send_buf_fast(buf, dest->node); 1683 if (likely(res != -ELINKCONG)) 1684 return res; 1685 if (port_unreliable(p_ptr)) 1686 return dsz; 1687 return -ELINKCONG; 1688} 1689 1690/** 1691 * tipc_send_buf2port - send message buffer to port identity 1692 */ 1693 1694int tipc_send_buf2port(u32 ref, 1695 struct tipc_portid const *dest, 1696 struct sk_buff *buf, 1697 unsigned int dsz) 1698{ 1699 struct tipc_portid orig; 1700 1701 orig.ref = ref; 1702 orig.node = tipc_own_addr; 1703 return tipc_forward_buf2port(ref, dest, buf, dsz, &orig, 1704 TIPC_PORT_IMPORTANCE); 1705} 1706