Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

at v2.6.37-rc2 1464 lines 36 kB view raw
1/* 2 * net/tipc/port.c: TIPC port code 3 * 4 * Copyright (c) 1992-2007, Ericsson AB 5 * Copyright (c) 2004-2008, Wind River Systems 6 * All rights reserved. 7 * 8 * Redistribution and use in source and binary forms, with or without 9 * modification, are permitted provided that the following conditions are met: 10 * 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in the 15 * documentation and/or other materials provided with the distribution. 16 * 3. Neither the names of the copyright holders nor the names of its 17 * contributors may be used to endorse or promote products derived from 18 * this software without specific prior written permission. 19 * 20 * Alternatively, this software may be distributed under the terms of the 21 * GNU General Public License ("GPL") version 2 as published by the Free 22 * Software Foundation. 23 * 24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 25 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 26 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 27 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 28 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 29 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 30 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 33 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 34 * POSSIBILITY OF SUCH DAMAGE. 35 */ 36 37#include "core.h" 38#include "config.h" 39#include "dbg.h" 40#include "port.h" 41#include "addr.h" 42#include "link.h" 43#include "node.h" 44#include "name_table.h" 45#include "user_reg.h" 46#include "msg.h" 47#include "bcast.h" 48 49/* Connection management: */ 50#define PROBING_INTERVAL 3600000 /* [ms] => 1 h */ 51#define CONFIRMED 0 52#define PROBING 1 53 54#define MAX_REJECT_SIZE 1024 55 56static struct sk_buff *msg_queue_head = NULL; 57static struct sk_buff *msg_queue_tail = NULL; 58 59DEFINE_SPINLOCK(tipc_port_list_lock); 60static DEFINE_SPINLOCK(queue_lock); 61 62static LIST_HEAD(ports); 63static void port_handle_node_down(unsigned long ref); 64static struct sk_buff* port_build_self_abort_msg(struct port *,u32 err); 65static struct sk_buff* port_build_peer_abort_msg(struct port *,u32 err); 66static void port_timeout(unsigned long ref); 67 68 69static u32 port_peernode(struct port *p_ptr) 70{ 71 return msg_destnode(&p_ptr->publ.phdr); 72} 73 74static u32 port_peerport(struct port *p_ptr) 75{ 76 return msg_destport(&p_ptr->publ.phdr); 77} 78 79static u32 port_out_seqno(struct port *p_ptr) 80{ 81 return msg_transp_seqno(&p_ptr->publ.phdr); 82} 83 84static void port_incr_out_seqno(struct port *p_ptr) 85{ 86 struct tipc_msg *m = &p_ptr->publ.phdr; 87 88 if (likely(!msg_routed(m))) 89 return; 90 msg_set_transp_seqno(m, (msg_transp_seqno(m) + 1)); 91} 92 93/** 94 * tipc_multicast - send a multicast message to local and remote destinations 95 */ 96 97int tipc_multicast(u32 ref, struct tipc_name_seq const *seq, u32 domain, 98 u32 num_sect, struct iovec const *msg_sect) 99{ 100 struct tipc_msg *hdr; 101 struct sk_buff *buf; 102 struct sk_buff *ibuf = NULL; 103 struct port_list dports = {0, NULL, }; 104 struct port *oport = tipc_port_deref(ref); 105 int ext_targets; 106 int res; 107 108 if (unlikely(!oport)) 109 return -EINVAL; 110 111 /* Create multicast message */ 112 113 hdr = &oport->publ.phdr; 114 msg_set_type(hdr, TIPC_MCAST_MSG); 115 msg_set_nametype(hdr, seq->type); 116 msg_set_namelower(hdr, seq->lower); 117 msg_set_nameupper(hdr, seq->upper); 118 msg_set_hdr_sz(hdr, MCAST_H_SIZE); 119 res = tipc_msg_build(hdr, msg_sect, num_sect, MAX_MSG_SIZE, 120 !oport->user_port, &buf); 121 if (unlikely(!buf)) 122 return res; 123 124 /* Figure out where to send multicast message */ 125 126 ext_targets = tipc_nametbl_mc_translate(seq->type, seq->lower, seq->upper, 127 TIPC_NODE_SCOPE, &dports); 128 129 /* Send message to destinations (duplicate it only if necessary) */ 130 131 if (ext_targets) { 132 if (dports.count != 0) { 133 ibuf = skb_copy(buf, GFP_ATOMIC); 134 if (ibuf == NULL) { 135 tipc_port_list_free(&dports); 136 buf_discard(buf); 137 return -ENOMEM; 138 } 139 } 140 res = tipc_bclink_send_msg(buf); 141 if ((res < 0) && (dports.count != 0)) { 142 buf_discard(ibuf); 143 } 144 } else { 145 ibuf = buf; 146 } 147 148 if (res >= 0) { 149 if (ibuf) 150 tipc_port_recv_mcast(ibuf, &dports); 151 } else { 152 tipc_port_list_free(&dports); 153 } 154 return res; 155} 156 157/** 158 * tipc_port_recv_mcast - deliver multicast message to all destination ports 159 * 160 * If there is no port list, perform a lookup to create one 161 */ 162 163void tipc_port_recv_mcast(struct sk_buff *buf, struct port_list *dp) 164{ 165 struct tipc_msg* msg; 166 struct port_list dports = {0, NULL, }; 167 struct port_list *item = dp; 168 int cnt = 0; 169 170 msg = buf_msg(buf); 171 172 /* Create destination port list, if one wasn't supplied */ 173 174 if (dp == NULL) { 175 tipc_nametbl_mc_translate(msg_nametype(msg), 176 msg_namelower(msg), 177 msg_nameupper(msg), 178 TIPC_CLUSTER_SCOPE, 179 &dports); 180 item = dp = &dports; 181 } 182 183 /* Deliver a copy of message to each destination port */ 184 185 if (dp->count != 0) { 186 if (dp->count == 1) { 187 msg_set_destport(msg, dp->ports[0]); 188 tipc_port_recv_msg(buf); 189 tipc_port_list_free(dp); 190 return; 191 } 192 for (; cnt < dp->count; cnt++) { 193 int index = cnt % PLSIZE; 194 struct sk_buff *b = skb_clone(buf, GFP_ATOMIC); 195 196 if (b == NULL) { 197 warn("Unable to deliver multicast message(s)\n"); 198 msg_dbg(msg, "LOST:"); 199 goto exit; 200 } 201 if ((index == 0) && (cnt != 0)) { 202 item = item->next; 203 } 204 msg_set_destport(buf_msg(b),item->ports[index]); 205 tipc_port_recv_msg(b); 206 } 207 } 208exit: 209 buf_discard(buf); 210 tipc_port_list_free(dp); 211} 212 213/** 214 * tipc_createport_raw - create a generic TIPC port 215 * 216 * Returns pointer to (locked) TIPC port, or NULL if unable to create it 217 */ 218 219struct tipc_port *tipc_createport_raw(void *usr_handle, 220 u32 (*dispatcher)(struct tipc_port *, struct sk_buff *), 221 void (*wakeup)(struct tipc_port *), 222 const u32 importance) 223{ 224 struct port *p_ptr; 225 struct tipc_msg *msg; 226 u32 ref; 227 228 p_ptr = kzalloc(sizeof(*p_ptr), GFP_ATOMIC); 229 if (!p_ptr) { 230 warn("Port creation failed, no memory\n"); 231 return NULL; 232 } 233 ref = tipc_ref_acquire(p_ptr, &p_ptr->publ.lock); 234 if (!ref) { 235 warn("Port creation failed, reference table exhausted\n"); 236 kfree(p_ptr); 237 return NULL; 238 } 239 240 p_ptr->publ.usr_handle = usr_handle; 241 p_ptr->publ.max_pkt = MAX_PKT_DEFAULT; 242 p_ptr->publ.ref = ref; 243 msg = &p_ptr->publ.phdr; 244 tipc_msg_init(msg, importance, TIPC_NAMED_MSG, LONG_H_SIZE, 0); 245 msg_set_origport(msg, ref); 246 p_ptr->last_in_seqno = 41; 247 p_ptr->sent = 1; 248 INIT_LIST_HEAD(&p_ptr->wait_list); 249 INIT_LIST_HEAD(&p_ptr->subscription.nodesub_list); 250 p_ptr->dispatcher = dispatcher; 251 p_ptr->wakeup = wakeup; 252 p_ptr->user_port = NULL; 253 k_init_timer(&p_ptr->timer, (Handler)port_timeout, ref); 254 spin_lock_bh(&tipc_port_list_lock); 255 INIT_LIST_HEAD(&p_ptr->publications); 256 INIT_LIST_HEAD(&p_ptr->port_list); 257 list_add_tail(&p_ptr->port_list, &ports); 258 spin_unlock_bh(&tipc_port_list_lock); 259 return &(p_ptr->publ); 260} 261 262int tipc_deleteport(u32 ref) 263{ 264 struct port *p_ptr; 265 struct sk_buff *buf = NULL; 266 267 tipc_withdraw(ref, 0, NULL); 268 p_ptr = tipc_port_lock(ref); 269 if (!p_ptr) 270 return -EINVAL; 271 272 tipc_ref_discard(ref); 273 tipc_port_unlock(p_ptr); 274 275 k_cancel_timer(&p_ptr->timer); 276 if (p_ptr->publ.connected) { 277 buf = port_build_peer_abort_msg(p_ptr, TIPC_ERR_NO_PORT); 278 tipc_nodesub_unsubscribe(&p_ptr->subscription); 279 } 280 if (p_ptr->user_port) { 281 tipc_reg_remove_port(p_ptr->user_port); 282 kfree(p_ptr->user_port); 283 } 284 285 spin_lock_bh(&tipc_port_list_lock); 286 list_del(&p_ptr->port_list); 287 list_del(&p_ptr->wait_list); 288 spin_unlock_bh(&tipc_port_list_lock); 289 k_term_timer(&p_ptr->timer); 290 kfree(p_ptr); 291 dbg("Deleted port %u\n", ref); 292 tipc_net_route_msg(buf); 293 return 0; 294} 295 296static int port_unreliable(struct port *p_ptr) 297{ 298 return msg_src_droppable(&p_ptr->publ.phdr); 299} 300 301int tipc_portunreliable(u32 ref, unsigned int *isunreliable) 302{ 303 struct port *p_ptr; 304 305 p_ptr = tipc_port_lock(ref); 306 if (!p_ptr) 307 return -EINVAL; 308 *isunreliable = port_unreliable(p_ptr); 309 tipc_port_unlock(p_ptr); 310 return 0; 311} 312 313int tipc_set_portunreliable(u32 ref, unsigned int isunreliable) 314{ 315 struct port *p_ptr; 316 317 p_ptr = tipc_port_lock(ref); 318 if (!p_ptr) 319 return -EINVAL; 320 msg_set_src_droppable(&p_ptr->publ.phdr, (isunreliable != 0)); 321 tipc_port_unlock(p_ptr); 322 return 0; 323} 324 325static int port_unreturnable(struct port *p_ptr) 326{ 327 return msg_dest_droppable(&p_ptr->publ.phdr); 328} 329 330int tipc_portunreturnable(u32 ref, unsigned int *isunrejectable) 331{ 332 struct port *p_ptr; 333 334 p_ptr = tipc_port_lock(ref); 335 if (!p_ptr) 336 return -EINVAL; 337 *isunrejectable = port_unreturnable(p_ptr); 338 tipc_port_unlock(p_ptr); 339 return 0; 340} 341 342int tipc_set_portunreturnable(u32 ref, unsigned int isunrejectable) 343{ 344 struct port *p_ptr; 345 346 p_ptr = tipc_port_lock(ref); 347 if (!p_ptr) 348 return -EINVAL; 349 msg_set_dest_droppable(&p_ptr->publ.phdr, (isunrejectable != 0)); 350 tipc_port_unlock(p_ptr); 351 return 0; 352} 353 354/* 355 * port_build_proto_msg(): build a port level protocol 356 * or a connection abortion message. Called with 357 * tipc_port lock on. 358 */ 359static struct sk_buff *port_build_proto_msg(u32 destport, u32 destnode, 360 u32 origport, u32 orignode, 361 u32 usr, u32 type, u32 err, 362 u32 seqno, u32 ack) 363{ 364 struct sk_buff *buf; 365 struct tipc_msg *msg; 366 367 buf = tipc_buf_acquire(LONG_H_SIZE); 368 if (buf) { 369 msg = buf_msg(buf); 370 tipc_msg_init(msg, usr, type, LONG_H_SIZE, destnode); 371 msg_set_errcode(msg, err); 372 msg_set_destport(msg, destport); 373 msg_set_origport(msg, origport); 374 msg_set_orignode(msg, orignode); 375 msg_set_transp_seqno(msg, seqno); 376 msg_set_msgcnt(msg, ack); 377 msg_dbg(msg, "PORT>SEND>:"); 378 } 379 return buf; 380} 381 382int tipc_reject_msg(struct sk_buff *buf, u32 err) 383{ 384 struct tipc_msg *msg = buf_msg(buf); 385 struct sk_buff *rbuf; 386 struct tipc_msg *rmsg; 387 int hdr_sz; 388 u32 imp = msg_importance(msg); 389 u32 data_sz = msg_data_sz(msg); 390 391 if (data_sz > MAX_REJECT_SIZE) 392 data_sz = MAX_REJECT_SIZE; 393 if (msg_connected(msg) && (imp < TIPC_CRITICAL_IMPORTANCE)) 394 imp++; 395 msg_dbg(msg, "port->rej: "); 396 397 /* discard rejected message if it shouldn't be returned to sender */ 398 if (msg_errcode(msg) || msg_dest_droppable(msg)) { 399 buf_discard(buf); 400 return data_sz; 401 } 402 403 /* construct rejected message */ 404 if (msg_mcast(msg)) 405 hdr_sz = MCAST_H_SIZE; 406 else 407 hdr_sz = LONG_H_SIZE; 408 rbuf = tipc_buf_acquire(data_sz + hdr_sz); 409 if (rbuf == NULL) { 410 buf_discard(buf); 411 return data_sz; 412 } 413 rmsg = buf_msg(rbuf); 414 tipc_msg_init(rmsg, imp, msg_type(msg), hdr_sz, msg_orignode(msg)); 415 msg_set_errcode(rmsg, err); 416 msg_set_destport(rmsg, msg_origport(msg)); 417 msg_set_origport(rmsg, msg_destport(msg)); 418 if (msg_short(msg)) { 419 msg_set_orignode(rmsg, tipc_own_addr); 420 /* leave name type & instance as zeroes */ 421 } else { 422 msg_set_orignode(rmsg, msg_destnode(msg)); 423 msg_set_nametype(rmsg, msg_nametype(msg)); 424 msg_set_nameinst(rmsg, msg_nameinst(msg)); 425 } 426 msg_set_size(rmsg, data_sz + hdr_sz); 427 skb_copy_to_linear_data_offset(rbuf, hdr_sz, msg_data(msg), data_sz); 428 429 /* send self-abort message when rejecting on a connected port */ 430 if (msg_connected(msg)) { 431 struct sk_buff *abuf = NULL; 432 struct port *p_ptr = tipc_port_lock(msg_destport(msg)); 433 434 if (p_ptr) { 435 if (p_ptr->publ.connected) 436 abuf = port_build_self_abort_msg(p_ptr, err); 437 tipc_port_unlock(p_ptr); 438 } 439 tipc_net_route_msg(abuf); 440 } 441 442 /* send rejected message */ 443 buf_discard(buf); 444 tipc_net_route_msg(rbuf); 445 return data_sz; 446} 447 448int tipc_port_reject_sections(struct port *p_ptr, struct tipc_msg *hdr, 449 struct iovec const *msg_sect, u32 num_sect, 450 int err) 451{ 452 struct sk_buff *buf; 453 int res; 454 455 res = tipc_msg_build(hdr, msg_sect, num_sect, MAX_MSG_SIZE, 456 !p_ptr->user_port, &buf); 457 if (!buf) 458 return res; 459 460 return tipc_reject_msg(buf, err); 461} 462 463static void port_timeout(unsigned long ref) 464{ 465 struct port *p_ptr = tipc_port_lock(ref); 466 struct sk_buff *buf = NULL; 467 468 if (!p_ptr) 469 return; 470 471 if (!p_ptr->publ.connected) { 472 tipc_port_unlock(p_ptr); 473 return; 474 } 475 476 /* Last probe answered ? */ 477 if (p_ptr->probing_state == PROBING) { 478 buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_PORT); 479 } else { 480 buf = port_build_proto_msg(port_peerport(p_ptr), 481 port_peernode(p_ptr), 482 p_ptr->publ.ref, 483 tipc_own_addr, 484 CONN_MANAGER, 485 CONN_PROBE, 486 TIPC_OK, 487 port_out_seqno(p_ptr), 488 0); 489 port_incr_out_seqno(p_ptr); 490 p_ptr->probing_state = PROBING; 491 k_start_timer(&p_ptr->timer, p_ptr->probing_interval); 492 } 493 tipc_port_unlock(p_ptr); 494 tipc_net_route_msg(buf); 495} 496 497 498static void port_handle_node_down(unsigned long ref) 499{ 500 struct port *p_ptr = tipc_port_lock(ref); 501 struct sk_buff* buf = NULL; 502 503 if (!p_ptr) 504 return; 505 buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_NODE); 506 tipc_port_unlock(p_ptr); 507 tipc_net_route_msg(buf); 508} 509 510 511static struct sk_buff *port_build_self_abort_msg(struct port *p_ptr, u32 err) 512{ 513 u32 imp = msg_importance(&p_ptr->publ.phdr); 514 515 if (!p_ptr->publ.connected) 516 return NULL; 517 if (imp < TIPC_CRITICAL_IMPORTANCE) 518 imp++; 519 return port_build_proto_msg(p_ptr->publ.ref, 520 tipc_own_addr, 521 port_peerport(p_ptr), 522 port_peernode(p_ptr), 523 imp, 524 TIPC_CONN_MSG, 525 err, 526 p_ptr->last_in_seqno + 1, 527 0); 528} 529 530 531static struct sk_buff *port_build_peer_abort_msg(struct port *p_ptr, u32 err) 532{ 533 u32 imp = msg_importance(&p_ptr->publ.phdr); 534 535 if (!p_ptr->publ.connected) 536 return NULL; 537 if (imp < TIPC_CRITICAL_IMPORTANCE) 538 imp++; 539 return port_build_proto_msg(port_peerport(p_ptr), 540 port_peernode(p_ptr), 541 p_ptr->publ.ref, 542 tipc_own_addr, 543 imp, 544 TIPC_CONN_MSG, 545 err, 546 port_out_seqno(p_ptr), 547 0); 548} 549 550void tipc_port_recv_proto_msg(struct sk_buff *buf) 551{ 552 struct tipc_msg *msg = buf_msg(buf); 553 struct port *p_ptr = tipc_port_lock(msg_destport(msg)); 554 u32 err = TIPC_OK; 555 struct sk_buff *r_buf = NULL; 556 struct sk_buff *abort_buf = NULL; 557 558 msg_dbg(msg, "PORT<RECV<:"); 559 560 if (!p_ptr) { 561 err = TIPC_ERR_NO_PORT; 562 } else if (p_ptr->publ.connected) { 563 if ((port_peernode(p_ptr) != msg_orignode(msg)) || 564 (port_peerport(p_ptr) != msg_origport(msg))) { 565 err = TIPC_ERR_NO_PORT; 566 } else if (msg_type(msg) == CONN_ACK) { 567 int wakeup = tipc_port_congested(p_ptr) && 568 p_ptr->publ.congested && 569 p_ptr->wakeup; 570 p_ptr->acked += msg_msgcnt(msg); 571 if (tipc_port_congested(p_ptr)) 572 goto exit; 573 p_ptr->publ.congested = 0; 574 if (!wakeup) 575 goto exit; 576 p_ptr->wakeup(&p_ptr->publ); 577 goto exit; 578 } 579 } else if (p_ptr->publ.published) { 580 err = TIPC_ERR_NO_PORT; 581 } 582 if (err) { 583 r_buf = port_build_proto_msg(msg_origport(msg), 584 msg_orignode(msg), 585 msg_destport(msg), 586 tipc_own_addr, 587 TIPC_HIGH_IMPORTANCE, 588 TIPC_CONN_MSG, 589 err, 590 0, 591 0); 592 goto exit; 593 } 594 595 /* All is fine */ 596 if (msg_type(msg) == CONN_PROBE) { 597 r_buf = port_build_proto_msg(msg_origport(msg), 598 msg_orignode(msg), 599 msg_destport(msg), 600 tipc_own_addr, 601 CONN_MANAGER, 602 CONN_PROBE_REPLY, 603 TIPC_OK, 604 port_out_seqno(p_ptr), 605 0); 606 } 607 p_ptr->probing_state = CONFIRMED; 608 port_incr_out_seqno(p_ptr); 609exit: 610 if (p_ptr) 611 tipc_port_unlock(p_ptr); 612 tipc_net_route_msg(r_buf); 613 tipc_net_route_msg(abort_buf); 614 buf_discard(buf); 615} 616 617static void port_print(struct port *p_ptr, struct print_buf *buf, int full_id) 618{ 619 struct publication *publ; 620 621 if (full_id) 622 tipc_printf(buf, "<%u.%u.%u:%u>:", 623 tipc_zone(tipc_own_addr), tipc_cluster(tipc_own_addr), 624 tipc_node(tipc_own_addr), p_ptr->publ.ref); 625 else 626 tipc_printf(buf, "%-10u:", p_ptr->publ.ref); 627 628 if (p_ptr->publ.connected) { 629 u32 dport = port_peerport(p_ptr); 630 u32 destnode = port_peernode(p_ptr); 631 632 tipc_printf(buf, " connected to <%u.%u.%u:%u>", 633 tipc_zone(destnode), tipc_cluster(destnode), 634 tipc_node(destnode), dport); 635 if (p_ptr->publ.conn_type != 0) 636 tipc_printf(buf, " via {%u,%u}", 637 p_ptr->publ.conn_type, 638 p_ptr->publ.conn_instance); 639 } 640 else if (p_ptr->publ.published) { 641 tipc_printf(buf, " bound to"); 642 list_for_each_entry(publ, &p_ptr->publications, pport_list) { 643 if (publ->lower == publ->upper) 644 tipc_printf(buf, " {%u,%u}", publ->type, 645 publ->lower); 646 else 647 tipc_printf(buf, " {%u,%u,%u}", publ->type, 648 publ->lower, publ->upper); 649 } 650 } 651 tipc_printf(buf, "\n"); 652} 653 654#define MAX_PORT_QUERY 32768 655 656struct sk_buff *tipc_port_get_ports(void) 657{ 658 struct sk_buff *buf; 659 struct tlv_desc *rep_tlv; 660 struct print_buf pb; 661 struct port *p_ptr; 662 int str_len; 663 664 buf = tipc_cfg_reply_alloc(TLV_SPACE(MAX_PORT_QUERY)); 665 if (!buf) 666 return NULL; 667 rep_tlv = (struct tlv_desc *)buf->data; 668 669 tipc_printbuf_init(&pb, TLV_DATA(rep_tlv), MAX_PORT_QUERY); 670 spin_lock_bh(&tipc_port_list_lock); 671 list_for_each_entry(p_ptr, &ports, port_list) { 672 spin_lock_bh(p_ptr->publ.lock); 673 port_print(p_ptr, &pb, 0); 674 spin_unlock_bh(p_ptr->publ.lock); 675 } 676 spin_unlock_bh(&tipc_port_list_lock); 677 str_len = tipc_printbuf_validate(&pb); 678 679 skb_put(buf, TLV_SPACE(str_len)); 680 TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len); 681 682 return buf; 683} 684 685void tipc_port_reinit(void) 686{ 687 struct port *p_ptr; 688 struct tipc_msg *msg; 689 690 spin_lock_bh(&tipc_port_list_lock); 691 list_for_each_entry(p_ptr, &ports, port_list) { 692 msg = &p_ptr->publ.phdr; 693 if (msg_orignode(msg) == tipc_own_addr) 694 break; 695 msg_set_prevnode(msg, tipc_own_addr); 696 msg_set_orignode(msg, tipc_own_addr); 697 } 698 spin_unlock_bh(&tipc_port_list_lock); 699} 700 701 702/* 703 * port_dispatcher_sigh(): Signal handler for messages destinated 704 * to the tipc_port interface. 705 */ 706 707static void port_dispatcher_sigh(void *dummy) 708{ 709 struct sk_buff *buf; 710 711 spin_lock_bh(&queue_lock); 712 buf = msg_queue_head; 713 msg_queue_head = NULL; 714 spin_unlock_bh(&queue_lock); 715 716 while (buf) { 717 struct port *p_ptr; 718 struct user_port *up_ptr; 719 struct tipc_portid orig; 720 struct tipc_name_seq dseq; 721 void *usr_handle; 722 int connected; 723 int published; 724 u32 message_type; 725 726 struct sk_buff *next = buf->next; 727 struct tipc_msg *msg = buf_msg(buf); 728 u32 dref = msg_destport(msg); 729 730 message_type = msg_type(msg); 731 if (message_type > TIPC_DIRECT_MSG) 732 goto reject; /* Unsupported message type */ 733 734 p_ptr = tipc_port_lock(dref); 735 if (!p_ptr) 736 goto reject; /* Port deleted while msg in queue */ 737 738 orig.ref = msg_origport(msg); 739 orig.node = msg_orignode(msg); 740 up_ptr = p_ptr->user_port; 741 usr_handle = up_ptr->usr_handle; 742 connected = p_ptr->publ.connected; 743 published = p_ptr->publ.published; 744 745 if (unlikely(msg_errcode(msg))) 746 goto err; 747 748 switch (message_type) { 749 750 case TIPC_CONN_MSG:{ 751 tipc_conn_msg_event cb = up_ptr->conn_msg_cb; 752 u32 peer_port = port_peerport(p_ptr); 753 u32 peer_node = port_peernode(p_ptr); 754 755 tipc_port_unlock(p_ptr); 756 if (unlikely(!cb)) 757 goto reject; 758 if (unlikely(!connected)) { 759 if (tipc_connect2port(dref, &orig)) 760 goto reject; 761 } else if ((msg_origport(msg) != peer_port) || 762 (msg_orignode(msg) != peer_node)) 763 goto reject; 764 if (unlikely(++p_ptr->publ.conn_unacked >= 765 TIPC_FLOW_CONTROL_WIN)) 766 tipc_acknowledge(dref, 767 p_ptr->publ.conn_unacked); 768 skb_pull(buf, msg_hdr_sz(msg)); 769 cb(usr_handle, dref, &buf, msg_data(msg), 770 msg_data_sz(msg)); 771 break; 772 } 773 case TIPC_DIRECT_MSG:{ 774 tipc_msg_event cb = up_ptr->msg_cb; 775 776 tipc_port_unlock(p_ptr); 777 if (unlikely(!cb || connected)) 778 goto reject; 779 skb_pull(buf, msg_hdr_sz(msg)); 780 cb(usr_handle, dref, &buf, msg_data(msg), 781 msg_data_sz(msg), msg_importance(msg), 782 &orig); 783 break; 784 } 785 case TIPC_MCAST_MSG: 786 case TIPC_NAMED_MSG:{ 787 tipc_named_msg_event cb = up_ptr->named_msg_cb; 788 789 tipc_port_unlock(p_ptr); 790 if (unlikely(!cb || connected || !published)) 791 goto reject; 792 dseq.type = msg_nametype(msg); 793 dseq.lower = msg_nameinst(msg); 794 dseq.upper = (message_type == TIPC_NAMED_MSG) 795 ? dseq.lower : msg_nameupper(msg); 796 skb_pull(buf, msg_hdr_sz(msg)); 797 cb(usr_handle, dref, &buf, msg_data(msg), 798 msg_data_sz(msg), msg_importance(msg), 799 &orig, &dseq); 800 break; 801 } 802 } 803 if (buf) 804 buf_discard(buf); 805 buf = next; 806 continue; 807err: 808 switch (message_type) { 809 810 case TIPC_CONN_MSG:{ 811 tipc_conn_shutdown_event cb = 812 up_ptr->conn_err_cb; 813 u32 peer_port = port_peerport(p_ptr); 814 u32 peer_node = port_peernode(p_ptr); 815 816 tipc_port_unlock(p_ptr); 817 if (!cb || !connected) 818 break; 819 if ((msg_origport(msg) != peer_port) || 820 (msg_orignode(msg) != peer_node)) 821 break; 822 tipc_disconnect(dref); 823 skb_pull(buf, msg_hdr_sz(msg)); 824 cb(usr_handle, dref, &buf, msg_data(msg), 825 msg_data_sz(msg), msg_errcode(msg)); 826 break; 827 } 828 case TIPC_DIRECT_MSG:{ 829 tipc_msg_err_event cb = up_ptr->err_cb; 830 831 tipc_port_unlock(p_ptr); 832 if (!cb || connected) 833 break; 834 skb_pull(buf, msg_hdr_sz(msg)); 835 cb(usr_handle, dref, &buf, msg_data(msg), 836 msg_data_sz(msg), msg_errcode(msg), &orig); 837 break; 838 } 839 case TIPC_MCAST_MSG: 840 case TIPC_NAMED_MSG:{ 841 tipc_named_msg_err_event cb = 842 up_ptr->named_err_cb; 843 844 tipc_port_unlock(p_ptr); 845 if (!cb || connected) 846 break; 847 dseq.type = msg_nametype(msg); 848 dseq.lower = msg_nameinst(msg); 849 dseq.upper = (message_type == TIPC_NAMED_MSG) 850 ? dseq.lower : msg_nameupper(msg); 851 skb_pull(buf, msg_hdr_sz(msg)); 852 cb(usr_handle, dref, &buf, msg_data(msg), 853 msg_data_sz(msg), msg_errcode(msg), &dseq); 854 break; 855 } 856 } 857 if (buf) 858 buf_discard(buf); 859 buf = next; 860 continue; 861reject: 862 tipc_reject_msg(buf, TIPC_ERR_NO_PORT); 863 buf = next; 864 } 865} 866 867/* 868 * port_dispatcher(): Dispatcher for messages destinated 869 * to the tipc_port interface. Called with port locked. 870 */ 871 872static u32 port_dispatcher(struct tipc_port *dummy, struct sk_buff *buf) 873{ 874 buf->next = NULL; 875 spin_lock_bh(&queue_lock); 876 if (msg_queue_head) { 877 msg_queue_tail->next = buf; 878 msg_queue_tail = buf; 879 } else { 880 msg_queue_tail = msg_queue_head = buf; 881 tipc_k_signal((Handler)port_dispatcher_sigh, 0); 882 } 883 spin_unlock_bh(&queue_lock); 884 return 0; 885} 886 887/* 888 * Wake up port after congestion: Called with port locked, 889 * 890 */ 891 892static void port_wakeup_sh(unsigned long ref) 893{ 894 struct port *p_ptr; 895 struct user_port *up_ptr; 896 tipc_continue_event cb = NULL; 897 void *uh = NULL; 898 899 p_ptr = tipc_port_lock(ref); 900 if (p_ptr) { 901 up_ptr = p_ptr->user_port; 902 if (up_ptr) { 903 cb = up_ptr->continue_event_cb; 904 uh = up_ptr->usr_handle; 905 } 906 tipc_port_unlock(p_ptr); 907 } 908 if (cb) 909 cb(uh, ref); 910} 911 912 913static void port_wakeup(struct tipc_port *p_ptr) 914{ 915 tipc_k_signal((Handler)port_wakeup_sh, p_ptr->ref); 916} 917 918void tipc_acknowledge(u32 ref, u32 ack) 919{ 920 struct port *p_ptr; 921 struct sk_buff *buf = NULL; 922 923 p_ptr = tipc_port_lock(ref); 924 if (!p_ptr) 925 return; 926 if (p_ptr->publ.connected) { 927 p_ptr->publ.conn_unacked -= ack; 928 buf = port_build_proto_msg(port_peerport(p_ptr), 929 port_peernode(p_ptr), 930 ref, 931 tipc_own_addr, 932 CONN_MANAGER, 933 CONN_ACK, 934 TIPC_OK, 935 port_out_seqno(p_ptr), 936 ack); 937 } 938 tipc_port_unlock(p_ptr); 939 tipc_net_route_msg(buf); 940} 941 942/* 943 * tipc_createport(): user level call. Will add port to 944 * registry if non-zero user_ref. 945 */ 946 947int tipc_createport(u32 user_ref, 948 void *usr_handle, 949 unsigned int importance, 950 tipc_msg_err_event error_cb, 951 tipc_named_msg_err_event named_error_cb, 952 tipc_conn_shutdown_event conn_error_cb, 953 tipc_msg_event msg_cb, 954 tipc_named_msg_event named_msg_cb, 955 tipc_conn_msg_event conn_msg_cb, 956 tipc_continue_event continue_event_cb,/* May be zero */ 957 u32 *portref) 958{ 959 struct user_port *up_ptr; 960 struct port *p_ptr; 961 962 up_ptr = kmalloc(sizeof(*up_ptr), GFP_ATOMIC); 963 if (!up_ptr) { 964 warn("Port creation failed, no memory\n"); 965 return -ENOMEM; 966 } 967 p_ptr = (struct port *)tipc_createport_raw(NULL, port_dispatcher, 968 port_wakeup, importance); 969 if (!p_ptr) { 970 kfree(up_ptr); 971 return -ENOMEM; 972 } 973 974 p_ptr->user_port = up_ptr; 975 up_ptr->user_ref = user_ref; 976 up_ptr->usr_handle = usr_handle; 977 up_ptr->ref = p_ptr->publ.ref; 978 up_ptr->err_cb = error_cb; 979 up_ptr->named_err_cb = named_error_cb; 980 up_ptr->conn_err_cb = conn_error_cb; 981 up_ptr->msg_cb = msg_cb; 982 up_ptr->named_msg_cb = named_msg_cb; 983 up_ptr->conn_msg_cb = conn_msg_cb; 984 up_ptr->continue_event_cb = continue_event_cb; 985 INIT_LIST_HEAD(&up_ptr->uport_list); 986 tipc_reg_add_port(up_ptr); 987 *portref = p_ptr->publ.ref; 988 tipc_port_unlock(p_ptr); 989 return 0; 990} 991 992int tipc_ownidentity(u32 ref, struct tipc_portid *id) 993{ 994 id->ref = ref; 995 id->node = tipc_own_addr; 996 return 0; 997} 998 999int tipc_portimportance(u32 ref, unsigned int *importance) 1000{ 1001 struct port *p_ptr; 1002 1003 p_ptr = tipc_port_lock(ref); 1004 if (!p_ptr) 1005 return -EINVAL; 1006 *importance = (unsigned int)msg_importance(&p_ptr->publ.phdr); 1007 tipc_port_unlock(p_ptr); 1008 return 0; 1009} 1010 1011int tipc_set_portimportance(u32 ref, unsigned int imp) 1012{ 1013 struct port *p_ptr; 1014 1015 if (imp > TIPC_CRITICAL_IMPORTANCE) 1016 return -EINVAL; 1017 1018 p_ptr = tipc_port_lock(ref); 1019 if (!p_ptr) 1020 return -EINVAL; 1021 msg_set_importance(&p_ptr->publ.phdr, (u32)imp); 1022 tipc_port_unlock(p_ptr); 1023 return 0; 1024} 1025 1026 1027int tipc_publish(u32 ref, unsigned int scope, struct tipc_name_seq const *seq) 1028{ 1029 struct port *p_ptr; 1030 struct publication *publ; 1031 u32 key; 1032 int res = -EINVAL; 1033 1034 p_ptr = tipc_port_lock(ref); 1035 if (!p_ptr) 1036 return -EINVAL; 1037 1038 dbg("tipc_publ %u, p_ptr = %x, conn = %x, scope = %x, " 1039 "lower = %u, upper = %u\n", 1040 ref, p_ptr, p_ptr->publ.connected, scope, seq->lower, seq->upper); 1041 if (p_ptr->publ.connected) 1042 goto exit; 1043 if (seq->lower > seq->upper) 1044 goto exit; 1045 if ((scope < TIPC_ZONE_SCOPE) || (scope > TIPC_NODE_SCOPE)) 1046 goto exit; 1047 key = ref + p_ptr->pub_count + 1; 1048 if (key == ref) { 1049 res = -EADDRINUSE; 1050 goto exit; 1051 } 1052 publ = tipc_nametbl_publish(seq->type, seq->lower, seq->upper, 1053 scope, p_ptr->publ.ref, key); 1054 if (publ) { 1055 list_add(&publ->pport_list, &p_ptr->publications); 1056 p_ptr->pub_count++; 1057 p_ptr->publ.published = 1; 1058 res = 0; 1059 } 1060exit: 1061 tipc_port_unlock(p_ptr); 1062 return res; 1063} 1064 1065int tipc_withdraw(u32 ref, unsigned int scope, struct tipc_name_seq const *seq) 1066{ 1067 struct port *p_ptr; 1068 struct publication *publ; 1069 struct publication *tpubl; 1070 int res = -EINVAL; 1071 1072 p_ptr = tipc_port_lock(ref); 1073 if (!p_ptr) 1074 return -EINVAL; 1075 if (!seq) { 1076 list_for_each_entry_safe(publ, tpubl, 1077 &p_ptr->publications, pport_list) { 1078 tipc_nametbl_withdraw(publ->type, publ->lower, 1079 publ->ref, publ->key); 1080 } 1081 res = 0; 1082 } else { 1083 list_for_each_entry_safe(publ, tpubl, 1084 &p_ptr->publications, pport_list) { 1085 if (publ->scope != scope) 1086 continue; 1087 if (publ->type != seq->type) 1088 continue; 1089 if (publ->lower != seq->lower) 1090 continue; 1091 if (publ->upper != seq->upper) 1092 break; 1093 tipc_nametbl_withdraw(publ->type, publ->lower, 1094 publ->ref, publ->key); 1095 res = 0; 1096 break; 1097 } 1098 } 1099 if (list_empty(&p_ptr->publications)) 1100 p_ptr->publ.published = 0; 1101 tipc_port_unlock(p_ptr); 1102 return res; 1103} 1104 1105int tipc_connect2port(u32 ref, struct tipc_portid const *peer) 1106{ 1107 struct port *p_ptr; 1108 struct tipc_msg *msg; 1109 int res = -EINVAL; 1110 1111 p_ptr = tipc_port_lock(ref); 1112 if (!p_ptr) 1113 return -EINVAL; 1114 if (p_ptr->publ.published || p_ptr->publ.connected) 1115 goto exit; 1116 if (!peer->ref) 1117 goto exit; 1118 1119 msg = &p_ptr->publ.phdr; 1120 msg_set_destnode(msg, peer->node); 1121 msg_set_destport(msg, peer->ref); 1122 msg_set_orignode(msg, tipc_own_addr); 1123 msg_set_origport(msg, p_ptr->publ.ref); 1124 msg_set_transp_seqno(msg, 42); 1125 msg_set_type(msg, TIPC_CONN_MSG); 1126 if (!may_route(peer->node)) 1127 msg_set_hdr_sz(msg, SHORT_H_SIZE); 1128 else 1129 msg_set_hdr_sz(msg, LONG_H_SIZE); 1130 1131 p_ptr->probing_interval = PROBING_INTERVAL; 1132 p_ptr->probing_state = CONFIRMED; 1133 p_ptr->publ.connected = 1; 1134 k_start_timer(&p_ptr->timer, p_ptr->probing_interval); 1135 1136 tipc_nodesub_subscribe(&p_ptr->subscription,peer->node, 1137 (void *)(unsigned long)ref, 1138 (net_ev_handler)port_handle_node_down); 1139 res = 0; 1140exit: 1141 tipc_port_unlock(p_ptr); 1142 p_ptr->publ.max_pkt = tipc_link_get_max_pkt(peer->node, ref); 1143 return res; 1144} 1145 1146/** 1147 * tipc_disconnect_port - disconnect port from peer 1148 * 1149 * Port must be locked. 1150 */ 1151 1152int tipc_disconnect_port(struct tipc_port *tp_ptr) 1153{ 1154 int res; 1155 1156 if (tp_ptr->connected) { 1157 tp_ptr->connected = 0; 1158 /* let timer expire on it's own to avoid deadlock! */ 1159 tipc_nodesub_unsubscribe( 1160 &((struct port *)tp_ptr)->subscription); 1161 res = 0; 1162 } else { 1163 res = -ENOTCONN; 1164 } 1165 return res; 1166} 1167 1168/* 1169 * tipc_disconnect(): Disconnect port form peer. 1170 * This is a node local operation. 1171 */ 1172 1173int tipc_disconnect(u32 ref) 1174{ 1175 struct port *p_ptr; 1176 int res; 1177 1178 p_ptr = tipc_port_lock(ref); 1179 if (!p_ptr) 1180 return -EINVAL; 1181 res = tipc_disconnect_port((struct tipc_port *)p_ptr); 1182 tipc_port_unlock(p_ptr); 1183 return res; 1184} 1185 1186/* 1187 * tipc_shutdown(): Send a SHUTDOWN msg to peer and disconnect 1188 */ 1189int tipc_shutdown(u32 ref) 1190{ 1191 struct port *p_ptr; 1192 struct sk_buff *buf = NULL; 1193 1194 p_ptr = tipc_port_lock(ref); 1195 if (!p_ptr) 1196 return -EINVAL; 1197 1198 if (p_ptr->publ.connected) { 1199 u32 imp = msg_importance(&p_ptr->publ.phdr); 1200 if (imp < TIPC_CRITICAL_IMPORTANCE) 1201 imp++; 1202 buf = port_build_proto_msg(port_peerport(p_ptr), 1203 port_peernode(p_ptr), 1204 ref, 1205 tipc_own_addr, 1206 imp, 1207 TIPC_CONN_MSG, 1208 TIPC_CONN_SHUTDOWN, 1209 port_out_seqno(p_ptr), 1210 0); 1211 } 1212 tipc_port_unlock(p_ptr); 1213 tipc_net_route_msg(buf); 1214 return tipc_disconnect(ref); 1215} 1216 1217/* 1218 * tipc_port_recv_sections(): Concatenate and deliver sectioned 1219 * message for this node. 1220 */ 1221 1222static int tipc_port_recv_sections(struct port *sender, unsigned int num_sect, 1223 struct iovec const *msg_sect) 1224{ 1225 struct sk_buff *buf; 1226 int res; 1227 1228 res = tipc_msg_build(&sender->publ.phdr, msg_sect, num_sect, 1229 MAX_MSG_SIZE, !sender->user_port, &buf); 1230 if (likely(buf)) 1231 tipc_port_recv_msg(buf); 1232 return res; 1233} 1234 1235/** 1236 * tipc_send - send message sections on connection 1237 */ 1238 1239int tipc_send(u32 ref, unsigned int num_sect, struct iovec const *msg_sect) 1240{ 1241 struct port *p_ptr; 1242 u32 destnode; 1243 int res; 1244 1245 p_ptr = tipc_port_deref(ref); 1246 if (!p_ptr || !p_ptr->publ.connected) 1247 return -EINVAL; 1248 1249 p_ptr->publ.congested = 1; 1250 if (!tipc_port_congested(p_ptr)) { 1251 destnode = port_peernode(p_ptr); 1252 if (likely(destnode != tipc_own_addr)) 1253 res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect, 1254 destnode); 1255 else 1256 res = tipc_port_recv_sections(p_ptr, num_sect, msg_sect); 1257 1258 if (likely(res != -ELINKCONG)) { 1259 port_incr_out_seqno(p_ptr); 1260 p_ptr->publ.congested = 0; 1261 p_ptr->sent++; 1262 return res; 1263 } 1264 } 1265 if (port_unreliable(p_ptr)) { 1266 p_ptr->publ.congested = 0; 1267 /* Just calculate msg length and return */ 1268 return tipc_msg_calc_data_size(msg_sect, num_sect); 1269 } 1270 return -ELINKCONG; 1271} 1272 1273/** 1274 * tipc_forward2name - forward message sections to port name 1275 */ 1276 1277static int tipc_forward2name(u32 ref, 1278 struct tipc_name const *name, 1279 u32 domain, 1280 u32 num_sect, 1281 struct iovec const *msg_sect, 1282 struct tipc_portid const *orig, 1283 unsigned int importance) 1284{ 1285 struct port *p_ptr; 1286 struct tipc_msg *msg; 1287 u32 destnode = domain; 1288 u32 destport; 1289 int res; 1290 1291 p_ptr = tipc_port_deref(ref); 1292 if (!p_ptr || p_ptr->publ.connected) 1293 return -EINVAL; 1294 1295 msg = &p_ptr->publ.phdr; 1296 msg_set_type(msg, TIPC_NAMED_MSG); 1297 msg_set_orignode(msg, orig->node); 1298 msg_set_origport(msg, orig->ref); 1299 msg_set_hdr_sz(msg, LONG_H_SIZE); 1300 msg_set_nametype(msg, name->type); 1301 msg_set_nameinst(msg, name->instance); 1302 msg_set_lookup_scope(msg, tipc_addr_scope(domain)); 1303 if (importance <= TIPC_CRITICAL_IMPORTANCE) 1304 msg_set_importance(msg,importance); 1305 destport = tipc_nametbl_translate(name->type, name->instance, &destnode); 1306 msg_set_destnode(msg, destnode); 1307 msg_set_destport(msg, destport); 1308 1309 if (likely(destport)) { 1310 p_ptr->sent++; 1311 if (likely(destnode == tipc_own_addr)) 1312 return tipc_port_recv_sections(p_ptr, num_sect, msg_sect); 1313 res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect, 1314 destnode); 1315 if (likely(res != -ELINKCONG)) 1316 return res; 1317 if (port_unreliable(p_ptr)) { 1318 /* Just calculate msg length and return */ 1319 return tipc_msg_calc_data_size(msg_sect, num_sect); 1320 } 1321 return -ELINKCONG; 1322 } 1323 return tipc_port_reject_sections(p_ptr, msg, msg_sect, num_sect, 1324 TIPC_ERR_NO_NAME); 1325} 1326 1327/** 1328 * tipc_send2name - send message sections to port name 1329 */ 1330 1331int tipc_send2name(u32 ref, 1332 struct tipc_name const *name, 1333 unsigned int domain, 1334 unsigned int num_sect, 1335 struct iovec const *msg_sect) 1336{ 1337 struct tipc_portid orig; 1338 1339 orig.ref = ref; 1340 orig.node = tipc_own_addr; 1341 return tipc_forward2name(ref, name, domain, num_sect, msg_sect, &orig, 1342 TIPC_PORT_IMPORTANCE); 1343} 1344 1345/** 1346 * tipc_forward2port - forward message sections to port identity 1347 */ 1348 1349static int tipc_forward2port(u32 ref, 1350 struct tipc_portid const *dest, 1351 unsigned int num_sect, 1352 struct iovec const *msg_sect, 1353 struct tipc_portid const *orig, 1354 unsigned int importance) 1355{ 1356 struct port *p_ptr; 1357 struct tipc_msg *msg; 1358 int res; 1359 1360 p_ptr = tipc_port_deref(ref); 1361 if (!p_ptr || p_ptr->publ.connected) 1362 return -EINVAL; 1363 1364 msg = &p_ptr->publ.phdr; 1365 msg_set_type(msg, TIPC_DIRECT_MSG); 1366 msg_set_orignode(msg, orig->node); 1367 msg_set_origport(msg, orig->ref); 1368 msg_set_destnode(msg, dest->node); 1369 msg_set_destport(msg, dest->ref); 1370 msg_set_hdr_sz(msg, DIR_MSG_H_SIZE); 1371 if (importance <= TIPC_CRITICAL_IMPORTANCE) 1372 msg_set_importance(msg, importance); 1373 p_ptr->sent++; 1374 if (dest->node == tipc_own_addr) 1375 return tipc_port_recv_sections(p_ptr, num_sect, msg_sect); 1376 res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect, dest->node); 1377 if (likely(res != -ELINKCONG)) 1378 return res; 1379 if (port_unreliable(p_ptr)) { 1380 /* Just calculate msg length and return */ 1381 return tipc_msg_calc_data_size(msg_sect, num_sect); 1382 } 1383 return -ELINKCONG; 1384} 1385 1386/** 1387 * tipc_send2port - send message sections to port identity 1388 */ 1389 1390int tipc_send2port(u32 ref, 1391 struct tipc_portid const *dest, 1392 unsigned int num_sect, 1393 struct iovec const *msg_sect) 1394{ 1395 struct tipc_portid orig; 1396 1397 orig.ref = ref; 1398 orig.node = tipc_own_addr; 1399 return tipc_forward2port(ref, dest, num_sect, msg_sect, &orig, 1400 TIPC_PORT_IMPORTANCE); 1401} 1402 1403/** 1404 * tipc_forward_buf2port - forward message buffer to port identity 1405 */ 1406static int tipc_forward_buf2port(u32 ref, 1407 struct tipc_portid const *dest, 1408 struct sk_buff *buf, 1409 unsigned int dsz, 1410 struct tipc_portid const *orig, 1411 unsigned int importance) 1412{ 1413 struct port *p_ptr; 1414 struct tipc_msg *msg; 1415 int res; 1416 1417 p_ptr = (struct port *)tipc_ref_deref(ref); 1418 if (!p_ptr || p_ptr->publ.connected) 1419 return -EINVAL; 1420 1421 msg = &p_ptr->publ.phdr; 1422 msg_set_type(msg, TIPC_DIRECT_MSG); 1423 msg_set_orignode(msg, orig->node); 1424 msg_set_origport(msg, orig->ref); 1425 msg_set_destnode(msg, dest->node); 1426 msg_set_destport(msg, dest->ref); 1427 msg_set_hdr_sz(msg, DIR_MSG_H_SIZE); 1428 if (importance <= TIPC_CRITICAL_IMPORTANCE) 1429 msg_set_importance(msg, importance); 1430 msg_set_size(msg, DIR_MSG_H_SIZE + dsz); 1431 if (skb_cow(buf, DIR_MSG_H_SIZE)) 1432 return -ENOMEM; 1433 1434 skb_push(buf, DIR_MSG_H_SIZE); 1435 skb_copy_to_linear_data(buf, msg, DIR_MSG_H_SIZE); 1436 msg_dbg(msg, "buf2port: "); 1437 p_ptr->sent++; 1438 if (dest->node == tipc_own_addr) 1439 return tipc_port_recv_msg(buf); 1440 res = tipc_send_buf_fast(buf, dest->node); 1441 if (likely(res != -ELINKCONG)) 1442 return res; 1443 if (port_unreliable(p_ptr)) 1444 return dsz; 1445 return -ELINKCONG; 1446} 1447 1448/** 1449 * tipc_send_buf2port - send message buffer to port identity 1450 */ 1451 1452int tipc_send_buf2port(u32 ref, 1453 struct tipc_portid const *dest, 1454 struct sk_buff *buf, 1455 unsigned int dsz) 1456{ 1457 struct tipc_portid orig; 1458 1459 orig.ref = ref; 1460 orig.node = tipc_own_addr; 1461 return tipc_forward_buf2port(ref, dest, buf, dsz, &orig, 1462 TIPC_PORT_IMPORTANCE); 1463} 1464