Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

at v3.2-rc3 1329 lines 32 kB view raw
1/* 2 * net/tipc/port.c: TIPC port code 3 * 4 * Copyright (c) 1992-2007, Ericsson AB 5 * Copyright (c) 2004-2008, 2010-2011, Wind River Systems 6 * All rights reserved. 7 * 8 * Redistribution and use in source and binary forms, with or without 9 * modification, are permitted provided that the following conditions are met: 10 * 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in the 15 * documentation and/or other materials provided with the distribution. 16 * 3. Neither the names of the copyright holders nor the names of its 17 * contributors may be used to endorse or promote products derived from 18 * this software without specific prior written permission. 19 * 20 * Alternatively, this software may be distributed under the terms of the 21 * GNU General Public License ("GPL") version 2 as published by the Free 22 * Software Foundation. 23 * 24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 25 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 26 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 27 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 28 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 29 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 30 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 33 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 34 * POSSIBILITY OF SUCH DAMAGE. 35 */ 36 37#include "core.h" 38#include "config.h" 39#include "port.h" 40#include "name_table.h" 41 42/* Connection management: */ 43#define PROBING_INTERVAL 3600000 /* [ms] => 1 h */ 44#define CONFIRMED 0 45#define PROBING 1 46 47#define MAX_REJECT_SIZE 1024 48 49static struct sk_buff *msg_queue_head; 50static struct sk_buff *msg_queue_tail; 51 52DEFINE_SPINLOCK(tipc_port_list_lock); 53static DEFINE_SPINLOCK(queue_lock); 54 55static LIST_HEAD(ports); 56static void port_handle_node_down(unsigned long ref); 57static struct sk_buff *port_build_self_abort_msg(struct tipc_port *, u32 err); 58static struct sk_buff *port_build_peer_abort_msg(struct tipc_port *, u32 err); 59static void port_timeout(unsigned long ref); 60 61 62static u32 port_peernode(struct tipc_port *p_ptr) 63{ 64 return msg_destnode(&p_ptr->phdr); 65} 66 67static u32 port_peerport(struct tipc_port *p_ptr) 68{ 69 return msg_destport(&p_ptr->phdr); 70} 71 72/** 73 * tipc_multicast - send a multicast message to local and remote destinations 74 */ 75 76int tipc_multicast(u32 ref, struct tipc_name_seq const *seq, 77 u32 num_sect, struct iovec const *msg_sect, 78 unsigned int total_len) 79{ 80 struct tipc_msg *hdr; 81 struct sk_buff *buf; 82 struct sk_buff *ibuf = NULL; 83 struct port_list dports = {0, NULL, }; 84 struct tipc_port *oport = tipc_port_deref(ref); 85 int ext_targets; 86 int res; 87 88 if (unlikely(!oport)) 89 return -EINVAL; 90 91 /* Create multicast message */ 92 93 hdr = &oport->phdr; 94 msg_set_type(hdr, TIPC_MCAST_MSG); 95 msg_set_lookup_scope(hdr, TIPC_CLUSTER_SCOPE); 96 msg_set_destport(hdr, 0); 97 msg_set_destnode(hdr, 0); 98 msg_set_nametype(hdr, seq->type); 99 msg_set_namelower(hdr, seq->lower); 100 msg_set_nameupper(hdr, seq->upper); 101 msg_set_hdr_sz(hdr, MCAST_H_SIZE); 102 res = tipc_msg_build(hdr, msg_sect, num_sect, total_len, MAX_MSG_SIZE, 103 !oport->user_port, &buf); 104 if (unlikely(!buf)) 105 return res; 106 107 /* Figure out where to send multicast message */ 108 109 ext_targets = tipc_nametbl_mc_translate(seq->type, seq->lower, seq->upper, 110 TIPC_NODE_SCOPE, &dports); 111 112 /* Send message to destinations (duplicate it only if necessary) */ 113 114 if (ext_targets) { 115 if (dports.count != 0) { 116 ibuf = skb_copy(buf, GFP_ATOMIC); 117 if (ibuf == NULL) { 118 tipc_port_list_free(&dports); 119 buf_discard(buf); 120 return -ENOMEM; 121 } 122 } 123 res = tipc_bclink_send_msg(buf); 124 if ((res < 0) && (dports.count != 0)) 125 buf_discard(ibuf); 126 } else { 127 ibuf = buf; 128 } 129 130 if (res >= 0) { 131 if (ibuf) 132 tipc_port_recv_mcast(ibuf, &dports); 133 } else { 134 tipc_port_list_free(&dports); 135 } 136 return res; 137} 138 139/** 140 * tipc_port_recv_mcast - deliver multicast message to all destination ports 141 * 142 * If there is no port list, perform a lookup to create one 143 */ 144 145void tipc_port_recv_mcast(struct sk_buff *buf, struct port_list *dp) 146{ 147 struct tipc_msg *msg; 148 struct port_list dports = {0, NULL, }; 149 struct port_list *item = dp; 150 int cnt = 0; 151 152 msg = buf_msg(buf); 153 154 /* Create destination port list, if one wasn't supplied */ 155 156 if (dp == NULL) { 157 tipc_nametbl_mc_translate(msg_nametype(msg), 158 msg_namelower(msg), 159 msg_nameupper(msg), 160 TIPC_CLUSTER_SCOPE, 161 &dports); 162 item = dp = &dports; 163 } 164 165 /* Deliver a copy of message to each destination port */ 166 167 if (dp->count != 0) { 168 msg_set_destnode(msg, tipc_own_addr); 169 if (dp->count == 1) { 170 msg_set_destport(msg, dp->ports[0]); 171 tipc_port_recv_msg(buf); 172 tipc_port_list_free(dp); 173 return; 174 } 175 for (; cnt < dp->count; cnt++) { 176 int index = cnt % PLSIZE; 177 struct sk_buff *b = skb_clone(buf, GFP_ATOMIC); 178 179 if (b == NULL) { 180 warn("Unable to deliver multicast message(s)\n"); 181 goto exit; 182 } 183 if ((index == 0) && (cnt != 0)) 184 item = item->next; 185 msg_set_destport(buf_msg(b), item->ports[index]); 186 tipc_port_recv_msg(b); 187 } 188 } 189exit: 190 buf_discard(buf); 191 tipc_port_list_free(dp); 192} 193 194/** 195 * tipc_createport_raw - create a generic TIPC port 196 * 197 * Returns pointer to (locked) TIPC port, or NULL if unable to create it 198 */ 199 200struct tipc_port *tipc_createport_raw(void *usr_handle, 201 u32 (*dispatcher)(struct tipc_port *, struct sk_buff *), 202 void (*wakeup)(struct tipc_port *), 203 const u32 importance) 204{ 205 struct tipc_port *p_ptr; 206 struct tipc_msg *msg; 207 u32 ref; 208 209 p_ptr = kzalloc(sizeof(*p_ptr), GFP_ATOMIC); 210 if (!p_ptr) { 211 warn("Port creation failed, no memory\n"); 212 return NULL; 213 } 214 ref = tipc_ref_acquire(p_ptr, &p_ptr->lock); 215 if (!ref) { 216 warn("Port creation failed, reference table exhausted\n"); 217 kfree(p_ptr); 218 return NULL; 219 } 220 221 p_ptr->usr_handle = usr_handle; 222 p_ptr->max_pkt = MAX_PKT_DEFAULT; 223 p_ptr->ref = ref; 224 msg = &p_ptr->phdr; 225 tipc_msg_init(msg, importance, TIPC_NAMED_MSG, NAMED_H_SIZE, 0); 226 msg_set_origport(msg, ref); 227 INIT_LIST_HEAD(&p_ptr->wait_list); 228 INIT_LIST_HEAD(&p_ptr->subscription.nodesub_list); 229 p_ptr->dispatcher = dispatcher; 230 p_ptr->wakeup = wakeup; 231 p_ptr->user_port = NULL; 232 k_init_timer(&p_ptr->timer, (Handler)port_timeout, ref); 233 spin_lock_bh(&tipc_port_list_lock); 234 INIT_LIST_HEAD(&p_ptr->publications); 235 INIT_LIST_HEAD(&p_ptr->port_list); 236 list_add_tail(&p_ptr->port_list, &ports); 237 spin_unlock_bh(&tipc_port_list_lock); 238 return p_ptr; 239} 240 241int tipc_deleteport(u32 ref) 242{ 243 struct tipc_port *p_ptr; 244 struct sk_buff *buf = NULL; 245 246 tipc_withdraw(ref, 0, NULL); 247 p_ptr = tipc_port_lock(ref); 248 if (!p_ptr) 249 return -EINVAL; 250 251 tipc_ref_discard(ref); 252 tipc_port_unlock(p_ptr); 253 254 k_cancel_timer(&p_ptr->timer); 255 if (p_ptr->connected) { 256 buf = port_build_peer_abort_msg(p_ptr, TIPC_ERR_NO_PORT); 257 tipc_nodesub_unsubscribe(&p_ptr->subscription); 258 } 259 kfree(p_ptr->user_port); 260 261 spin_lock_bh(&tipc_port_list_lock); 262 list_del(&p_ptr->port_list); 263 list_del(&p_ptr->wait_list); 264 spin_unlock_bh(&tipc_port_list_lock); 265 k_term_timer(&p_ptr->timer); 266 kfree(p_ptr); 267 tipc_net_route_msg(buf); 268 return 0; 269} 270 271static int port_unreliable(struct tipc_port *p_ptr) 272{ 273 return msg_src_droppable(&p_ptr->phdr); 274} 275 276int tipc_portunreliable(u32 ref, unsigned int *isunreliable) 277{ 278 struct tipc_port *p_ptr; 279 280 p_ptr = tipc_port_lock(ref); 281 if (!p_ptr) 282 return -EINVAL; 283 *isunreliable = port_unreliable(p_ptr); 284 tipc_port_unlock(p_ptr); 285 return 0; 286} 287 288int tipc_set_portunreliable(u32 ref, unsigned int isunreliable) 289{ 290 struct tipc_port *p_ptr; 291 292 p_ptr = tipc_port_lock(ref); 293 if (!p_ptr) 294 return -EINVAL; 295 msg_set_src_droppable(&p_ptr->phdr, (isunreliable != 0)); 296 tipc_port_unlock(p_ptr); 297 return 0; 298} 299 300static int port_unreturnable(struct tipc_port *p_ptr) 301{ 302 return msg_dest_droppable(&p_ptr->phdr); 303} 304 305int tipc_portunreturnable(u32 ref, unsigned int *isunrejectable) 306{ 307 struct tipc_port *p_ptr; 308 309 p_ptr = tipc_port_lock(ref); 310 if (!p_ptr) 311 return -EINVAL; 312 *isunrejectable = port_unreturnable(p_ptr); 313 tipc_port_unlock(p_ptr); 314 return 0; 315} 316 317int tipc_set_portunreturnable(u32 ref, unsigned int isunrejectable) 318{ 319 struct tipc_port *p_ptr; 320 321 p_ptr = tipc_port_lock(ref); 322 if (!p_ptr) 323 return -EINVAL; 324 msg_set_dest_droppable(&p_ptr->phdr, (isunrejectable != 0)); 325 tipc_port_unlock(p_ptr); 326 return 0; 327} 328 329/* 330 * port_build_proto_msg(): create connection protocol message for port 331 * 332 * On entry the port must be locked and connected. 333 */ 334static struct sk_buff *port_build_proto_msg(struct tipc_port *p_ptr, 335 u32 type, u32 ack) 336{ 337 struct sk_buff *buf; 338 struct tipc_msg *msg; 339 340 buf = tipc_buf_acquire(INT_H_SIZE); 341 if (buf) { 342 msg = buf_msg(buf); 343 tipc_msg_init(msg, CONN_MANAGER, type, INT_H_SIZE, 344 port_peernode(p_ptr)); 345 msg_set_destport(msg, port_peerport(p_ptr)); 346 msg_set_origport(msg, p_ptr->ref); 347 msg_set_msgcnt(msg, ack); 348 } 349 return buf; 350} 351 352int tipc_reject_msg(struct sk_buff *buf, u32 err) 353{ 354 struct tipc_msg *msg = buf_msg(buf); 355 struct sk_buff *rbuf; 356 struct tipc_msg *rmsg; 357 int hdr_sz; 358 u32 imp; 359 u32 data_sz = msg_data_sz(msg); 360 u32 src_node; 361 u32 rmsg_sz; 362 363 /* discard rejected message if it shouldn't be returned to sender */ 364 365 if (WARN(!msg_isdata(msg), 366 "attempt to reject message with user=%u", msg_user(msg))) { 367 dump_stack(); 368 goto exit; 369 } 370 if (msg_errcode(msg) || msg_dest_droppable(msg)) 371 goto exit; 372 373 /* 374 * construct returned message by copying rejected message header and 375 * data (or subset), then updating header fields that need adjusting 376 */ 377 378 hdr_sz = msg_hdr_sz(msg); 379 rmsg_sz = hdr_sz + min_t(u32, data_sz, MAX_REJECT_SIZE); 380 381 rbuf = tipc_buf_acquire(rmsg_sz); 382 if (rbuf == NULL) 383 goto exit; 384 385 rmsg = buf_msg(rbuf); 386 skb_copy_to_linear_data(rbuf, msg, rmsg_sz); 387 388 if (msg_connected(rmsg)) { 389 imp = msg_importance(rmsg); 390 if (imp < TIPC_CRITICAL_IMPORTANCE) 391 msg_set_importance(rmsg, ++imp); 392 } 393 msg_set_non_seq(rmsg, 0); 394 msg_set_size(rmsg, rmsg_sz); 395 msg_set_errcode(rmsg, err); 396 msg_set_prevnode(rmsg, tipc_own_addr); 397 msg_swap_words(rmsg, 4, 5); 398 if (!msg_short(rmsg)) 399 msg_swap_words(rmsg, 6, 7); 400 401 /* send self-abort message when rejecting on a connected port */ 402 if (msg_connected(msg)) { 403 struct sk_buff *abuf = NULL; 404 struct tipc_port *p_ptr = tipc_port_lock(msg_destport(msg)); 405 406 if (p_ptr) { 407 if (p_ptr->connected) 408 abuf = port_build_self_abort_msg(p_ptr, err); 409 tipc_port_unlock(p_ptr); 410 } 411 tipc_net_route_msg(abuf); 412 } 413 414 /* send returned message & dispose of rejected message */ 415 416 src_node = msg_prevnode(msg); 417 if (src_node == tipc_own_addr) 418 tipc_port_recv_msg(rbuf); 419 else 420 tipc_link_send(rbuf, src_node, msg_link_selector(rmsg)); 421exit: 422 buf_discard(buf); 423 return data_sz; 424} 425 426int tipc_port_reject_sections(struct tipc_port *p_ptr, struct tipc_msg *hdr, 427 struct iovec const *msg_sect, u32 num_sect, 428 unsigned int total_len, int err) 429{ 430 struct sk_buff *buf; 431 int res; 432 433 res = tipc_msg_build(hdr, msg_sect, num_sect, total_len, MAX_MSG_SIZE, 434 !p_ptr->user_port, &buf); 435 if (!buf) 436 return res; 437 438 return tipc_reject_msg(buf, err); 439} 440 441static void port_timeout(unsigned long ref) 442{ 443 struct tipc_port *p_ptr = tipc_port_lock(ref); 444 struct sk_buff *buf = NULL; 445 446 if (!p_ptr) 447 return; 448 449 if (!p_ptr->connected) { 450 tipc_port_unlock(p_ptr); 451 return; 452 } 453 454 /* Last probe answered ? */ 455 if (p_ptr->probing_state == PROBING) { 456 buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_PORT); 457 } else { 458 buf = port_build_proto_msg(p_ptr, CONN_PROBE, 0); 459 p_ptr->probing_state = PROBING; 460 k_start_timer(&p_ptr->timer, p_ptr->probing_interval); 461 } 462 tipc_port_unlock(p_ptr); 463 tipc_net_route_msg(buf); 464} 465 466 467static void port_handle_node_down(unsigned long ref) 468{ 469 struct tipc_port *p_ptr = tipc_port_lock(ref); 470 struct sk_buff *buf = NULL; 471 472 if (!p_ptr) 473 return; 474 buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_NODE); 475 tipc_port_unlock(p_ptr); 476 tipc_net_route_msg(buf); 477} 478 479 480static struct sk_buff *port_build_self_abort_msg(struct tipc_port *p_ptr, u32 err) 481{ 482 struct sk_buff *buf = port_build_peer_abort_msg(p_ptr, err); 483 484 if (buf) { 485 struct tipc_msg *msg = buf_msg(buf); 486 msg_swap_words(msg, 4, 5); 487 msg_swap_words(msg, 6, 7); 488 } 489 return buf; 490} 491 492 493static struct sk_buff *port_build_peer_abort_msg(struct tipc_port *p_ptr, u32 err) 494{ 495 struct sk_buff *buf; 496 struct tipc_msg *msg; 497 u32 imp; 498 499 if (!p_ptr->connected) 500 return NULL; 501 502 buf = tipc_buf_acquire(BASIC_H_SIZE); 503 if (buf) { 504 msg = buf_msg(buf); 505 memcpy(msg, &p_ptr->phdr, BASIC_H_SIZE); 506 msg_set_hdr_sz(msg, BASIC_H_SIZE); 507 msg_set_size(msg, BASIC_H_SIZE); 508 imp = msg_importance(msg); 509 if (imp < TIPC_CRITICAL_IMPORTANCE) 510 msg_set_importance(msg, ++imp); 511 msg_set_errcode(msg, err); 512 } 513 return buf; 514} 515 516void tipc_port_recv_proto_msg(struct sk_buff *buf) 517{ 518 struct tipc_msg *msg = buf_msg(buf); 519 struct tipc_port *p_ptr; 520 struct sk_buff *r_buf = NULL; 521 u32 orignode = msg_orignode(msg); 522 u32 origport = msg_origport(msg); 523 u32 destport = msg_destport(msg); 524 int wakeable; 525 526 /* Validate connection */ 527 528 p_ptr = tipc_port_lock(destport); 529 if (!p_ptr || !p_ptr->connected || 530 (port_peernode(p_ptr) != orignode) || 531 (port_peerport(p_ptr) != origport)) { 532 r_buf = tipc_buf_acquire(BASIC_H_SIZE); 533 if (r_buf) { 534 msg = buf_msg(r_buf); 535 tipc_msg_init(msg, TIPC_HIGH_IMPORTANCE, TIPC_CONN_MSG, 536 BASIC_H_SIZE, orignode); 537 msg_set_errcode(msg, TIPC_ERR_NO_PORT); 538 msg_set_origport(msg, destport); 539 msg_set_destport(msg, origport); 540 } 541 if (p_ptr) 542 tipc_port_unlock(p_ptr); 543 goto exit; 544 } 545 546 /* Process protocol message sent by peer */ 547 548 switch (msg_type(msg)) { 549 case CONN_ACK: 550 wakeable = tipc_port_congested(p_ptr) && p_ptr->congested && 551 p_ptr->wakeup; 552 p_ptr->acked += msg_msgcnt(msg); 553 if (!tipc_port_congested(p_ptr)) { 554 p_ptr->congested = 0; 555 if (wakeable) 556 p_ptr->wakeup(p_ptr); 557 } 558 break; 559 case CONN_PROBE: 560 r_buf = port_build_proto_msg(p_ptr, CONN_PROBE_REPLY, 0); 561 break; 562 default: 563 /* CONN_PROBE_REPLY or unrecognized - no action required */ 564 break; 565 } 566 p_ptr->probing_state = CONFIRMED; 567 tipc_port_unlock(p_ptr); 568exit: 569 tipc_net_route_msg(r_buf); 570 buf_discard(buf); 571} 572 573static void port_print(struct tipc_port *p_ptr, struct print_buf *buf, int full_id) 574{ 575 struct publication *publ; 576 577 if (full_id) 578 tipc_printf(buf, "<%u.%u.%u:%u>:", 579 tipc_zone(tipc_own_addr), tipc_cluster(tipc_own_addr), 580 tipc_node(tipc_own_addr), p_ptr->ref); 581 else 582 tipc_printf(buf, "%-10u:", p_ptr->ref); 583 584 if (p_ptr->connected) { 585 u32 dport = port_peerport(p_ptr); 586 u32 destnode = port_peernode(p_ptr); 587 588 tipc_printf(buf, " connected to <%u.%u.%u:%u>", 589 tipc_zone(destnode), tipc_cluster(destnode), 590 tipc_node(destnode), dport); 591 if (p_ptr->conn_type != 0) 592 tipc_printf(buf, " via {%u,%u}", 593 p_ptr->conn_type, 594 p_ptr->conn_instance); 595 } else if (p_ptr->published) { 596 tipc_printf(buf, " bound to"); 597 list_for_each_entry(publ, &p_ptr->publications, pport_list) { 598 if (publ->lower == publ->upper) 599 tipc_printf(buf, " {%u,%u}", publ->type, 600 publ->lower); 601 else 602 tipc_printf(buf, " {%u,%u,%u}", publ->type, 603 publ->lower, publ->upper); 604 } 605 } 606 tipc_printf(buf, "\n"); 607} 608 609#define MAX_PORT_QUERY 32768 610 611struct sk_buff *tipc_port_get_ports(void) 612{ 613 struct sk_buff *buf; 614 struct tlv_desc *rep_tlv; 615 struct print_buf pb; 616 struct tipc_port *p_ptr; 617 int str_len; 618 619 buf = tipc_cfg_reply_alloc(TLV_SPACE(MAX_PORT_QUERY)); 620 if (!buf) 621 return NULL; 622 rep_tlv = (struct tlv_desc *)buf->data; 623 624 tipc_printbuf_init(&pb, TLV_DATA(rep_tlv), MAX_PORT_QUERY); 625 spin_lock_bh(&tipc_port_list_lock); 626 list_for_each_entry(p_ptr, &ports, port_list) { 627 spin_lock_bh(p_ptr->lock); 628 port_print(p_ptr, &pb, 0); 629 spin_unlock_bh(p_ptr->lock); 630 } 631 spin_unlock_bh(&tipc_port_list_lock); 632 str_len = tipc_printbuf_validate(&pb); 633 634 skb_put(buf, TLV_SPACE(str_len)); 635 TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len); 636 637 return buf; 638} 639 640void tipc_port_reinit(void) 641{ 642 struct tipc_port *p_ptr; 643 struct tipc_msg *msg; 644 645 spin_lock_bh(&tipc_port_list_lock); 646 list_for_each_entry(p_ptr, &ports, port_list) { 647 msg = &p_ptr->phdr; 648 if (msg_orignode(msg) == tipc_own_addr) 649 break; 650 msg_set_prevnode(msg, tipc_own_addr); 651 msg_set_orignode(msg, tipc_own_addr); 652 } 653 spin_unlock_bh(&tipc_port_list_lock); 654} 655 656 657/* 658 * port_dispatcher_sigh(): Signal handler for messages destinated 659 * to the tipc_port interface. 660 */ 661 662static void port_dispatcher_sigh(void *dummy) 663{ 664 struct sk_buff *buf; 665 666 spin_lock_bh(&queue_lock); 667 buf = msg_queue_head; 668 msg_queue_head = NULL; 669 spin_unlock_bh(&queue_lock); 670 671 while (buf) { 672 struct tipc_port *p_ptr; 673 struct user_port *up_ptr; 674 struct tipc_portid orig; 675 struct tipc_name_seq dseq; 676 void *usr_handle; 677 int connected; 678 int published; 679 u32 message_type; 680 681 struct sk_buff *next = buf->next; 682 struct tipc_msg *msg = buf_msg(buf); 683 u32 dref = msg_destport(msg); 684 685 message_type = msg_type(msg); 686 if (message_type > TIPC_DIRECT_MSG) 687 goto reject; /* Unsupported message type */ 688 689 p_ptr = tipc_port_lock(dref); 690 if (!p_ptr) 691 goto reject; /* Port deleted while msg in queue */ 692 693 orig.ref = msg_origport(msg); 694 orig.node = msg_orignode(msg); 695 up_ptr = p_ptr->user_port; 696 usr_handle = up_ptr->usr_handle; 697 connected = p_ptr->connected; 698 published = p_ptr->published; 699 700 if (unlikely(msg_errcode(msg))) 701 goto err; 702 703 switch (message_type) { 704 705 case TIPC_CONN_MSG:{ 706 tipc_conn_msg_event cb = up_ptr->conn_msg_cb; 707 u32 peer_port = port_peerport(p_ptr); 708 u32 peer_node = port_peernode(p_ptr); 709 u32 dsz; 710 711 tipc_port_unlock(p_ptr); 712 if (unlikely(!cb)) 713 goto reject; 714 if (unlikely(!connected)) { 715 if (tipc_connect2port(dref, &orig)) 716 goto reject; 717 } else if ((msg_origport(msg) != peer_port) || 718 (msg_orignode(msg) != peer_node)) 719 goto reject; 720 dsz = msg_data_sz(msg); 721 if (unlikely(dsz && 722 (++p_ptr->conn_unacked >= 723 TIPC_FLOW_CONTROL_WIN))) 724 tipc_acknowledge(dref, 725 p_ptr->conn_unacked); 726 skb_pull(buf, msg_hdr_sz(msg)); 727 cb(usr_handle, dref, &buf, msg_data(msg), dsz); 728 break; 729 } 730 case TIPC_DIRECT_MSG:{ 731 tipc_msg_event cb = up_ptr->msg_cb; 732 733 tipc_port_unlock(p_ptr); 734 if (unlikely(!cb || connected)) 735 goto reject; 736 skb_pull(buf, msg_hdr_sz(msg)); 737 cb(usr_handle, dref, &buf, msg_data(msg), 738 msg_data_sz(msg), msg_importance(msg), 739 &orig); 740 break; 741 } 742 case TIPC_MCAST_MSG: 743 case TIPC_NAMED_MSG:{ 744 tipc_named_msg_event cb = up_ptr->named_msg_cb; 745 746 tipc_port_unlock(p_ptr); 747 if (unlikely(!cb || connected || !published)) 748 goto reject; 749 dseq.type = msg_nametype(msg); 750 dseq.lower = msg_nameinst(msg); 751 dseq.upper = (message_type == TIPC_NAMED_MSG) 752 ? dseq.lower : msg_nameupper(msg); 753 skb_pull(buf, msg_hdr_sz(msg)); 754 cb(usr_handle, dref, &buf, msg_data(msg), 755 msg_data_sz(msg), msg_importance(msg), 756 &orig, &dseq); 757 break; 758 } 759 } 760 if (buf) 761 buf_discard(buf); 762 buf = next; 763 continue; 764err: 765 switch (message_type) { 766 767 case TIPC_CONN_MSG:{ 768 tipc_conn_shutdown_event cb = 769 up_ptr->conn_err_cb; 770 u32 peer_port = port_peerport(p_ptr); 771 u32 peer_node = port_peernode(p_ptr); 772 773 tipc_port_unlock(p_ptr); 774 if (!cb || !connected) 775 break; 776 if ((msg_origport(msg) != peer_port) || 777 (msg_orignode(msg) != peer_node)) 778 break; 779 tipc_disconnect(dref); 780 skb_pull(buf, msg_hdr_sz(msg)); 781 cb(usr_handle, dref, &buf, msg_data(msg), 782 msg_data_sz(msg), msg_errcode(msg)); 783 break; 784 } 785 case TIPC_DIRECT_MSG:{ 786 tipc_msg_err_event cb = up_ptr->err_cb; 787 788 tipc_port_unlock(p_ptr); 789 if (!cb || connected) 790 break; 791 skb_pull(buf, msg_hdr_sz(msg)); 792 cb(usr_handle, dref, &buf, msg_data(msg), 793 msg_data_sz(msg), msg_errcode(msg), &orig); 794 break; 795 } 796 case TIPC_MCAST_MSG: 797 case TIPC_NAMED_MSG:{ 798 tipc_named_msg_err_event cb = 799 up_ptr->named_err_cb; 800 801 tipc_port_unlock(p_ptr); 802 if (!cb || connected) 803 break; 804 dseq.type = msg_nametype(msg); 805 dseq.lower = msg_nameinst(msg); 806 dseq.upper = (message_type == TIPC_NAMED_MSG) 807 ? dseq.lower : msg_nameupper(msg); 808 skb_pull(buf, msg_hdr_sz(msg)); 809 cb(usr_handle, dref, &buf, msg_data(msg), 810 msg_data_sz(msg), msg_errcode(msg), &dseq); 811 break; 812 } 813 } 814 if (buf) 815 buf_discard(buf); 816 buf = next; 817 continue; 818reject: 819 tipc_reject_msg(buf, TIPC_ERR_NO_PORT); 820 buf = next; 821 } 822} 823 824/* 825 * port_dispatcher(): Dispatcher for messages destinated 826 * to the tipc_port interface. Called with port locked. 827 */ 828 829static u32 port_dispatcher(struct tipc_port *dummy, struct sk_buff *buf) 830{ 831 buf->next = NULL; 832 spin_lock_bh(&queue_lock); 833 if (msg_queue_head) { 834 msg_queue_tail->next = buf; 835 msg_queue_tail = buf; 836 } else { 837 msg_queue_tail = msg_queue_head = buf; 838 tipc_k_signal((Handler)port_dispatcher_sigh, 0); 839 } 840 spin_unlock_bh(&queue_lock); 841 return 0; 842} 843 844/* 845 * Wake up port after congestion: Called with port locked, 846 * 847 */ 848 849static void port_wakeup_sh(unsigned long ref) 850{ 851 struct tipc_port *p_ptr; 852 struct user_port *up_ptr; 853 tipc_continue_event cb = NULL; 854 void *uh = NULL; 855 856 p_ptr = tipc_port_lock(ref); 857 if (p_ptr) { 858 up_ptr = p_ptr->user_port; 859 if (up_ptr) { 860 cb = up_ptr->continue_event_cb; 861 uh = up_ptr->usr_handle; 862 } 863 tipc_port_unlock(p_ptr); 864 } 865 if (cb) 866 cb(uh, ref); 867} 868 869 870static void port_wakeup(struct tipc_port *p_ptr) 871{ 872 tipc_k_signal((Handler)port_wakeup_sh, p_ptr->ref); 873} 874 875void tipc_acknowledge(u32 ref, u32 ack) 876{ 877 struct tipc_port *p_ptr; 878 struct sk_buff *buf = NULL; 879 880 p_ptr = tipc_port_lock(ref); 881 if (!p_ptr) 882 return; 883 if (p_ptr->connected) { 884 p_ptr->conn_unacked -= ack; 885 buf = port_build_proto_msg(p_ptr, CONN_ACK, ack); 886 } 887 tipc_port_unlock(p_ptr); 888 tipc_net_route_msg(buf); 889} 890 891/* 892 * tipc_createport(): user level call. 893 */ 894 895int tipc_createport(void *usr_handle, 896 unsigned int importance, 897 tipc_msg_err_event error_cb, 898 tipc_named_msg_err_event named_error_cb, 899 tipc_conn_shutdown_event conn_error_cb, 900 tipc_msg_event msg_cb, 901 tipc_named_msg_event named_msg_cb, 902 tipc_conn_msg_event conn_msg_cb, 903 tipc_continue_event continue_event_cb,/* May be zero */ 904 u32 *portref) 905{ 906 struct user_port *up_ptr; 907 struct tipc_port *p_ptr; 908 909 up_ptr = kmalloc(sizeof(*up_ptr), GFP_ATOMIC); 910 if (!up_ptr) { 911 warn("Port creation failed, no memory\n"); 912 return -ENOMEM; 913 } 914 p_ptr = (struct tipc_port *)tipc_createport_raw(NULL, port_dispatcher, 915 port_wakeup, importance); 916 if (!p_ptr) { 917 kfree(up_ptr); 918 return -ENOMEM; 919 } 920 921 p_ptr->user_port = up_ptr; 922 up_ptr->usr_handle = usr_handle; 923 up_ptr->ref = p_ptr->ref; 924 up_ptr->err_cb = error_cb; 925 up_ptr->named_err_cb = named_error_cb; 926 up_ptr->conn_err_cb = conn_error_cb; 927 up_ptr->msg_cb = msg_cb; 928 up_ptr->named_msg_cb = named_msg_cb; 929 up_ptr->conn_msg_cb = conn_msg_cb; 930 up_ptr->continue_event_cb = continue_event_cb; 931 *portref = p_ptr->ref; 932 tipc_port_unlock(p_ptr); 933 return 0; 934} 935 936int tipc_portimportance(u32 ref, unsigned int *importance) 937{ 938 struct tipc_port *p_ptr; 939 940 p_ptr = tipc_port_lock(ref); 941 if (!p_ptr) 942 return -EINVAL; 943 *importance = (unsigned int)msg_importance(&p_ptr->phdr); 944 tipc_port_unlock(p_ptr); 945 return 0; 946} 947 948int tipc_set_portimportance(u32 ref, unsigned int imp) 949{ 950 struct tipc_port *p_ptr; 951 952 if (imp > TIPC_CRITICAL_IMPORTANCE) 953 return -EINVAL; 954 955 p_ptr = tipc_port_lock(ref); 956 if (!p_ptr) 957 return -EINVAL; 958 msg_set_importance(&p_ptr->phdr, (u32)imp); 959 tipc_port_unlock(p_ptr); 960 return 0; 961} 962 963 964int tipc_publish(u32 ref, unsigned int scope, struct tipc_name_seq const *seq) 965{ 966 struct tipc_port *p_ptr; 967 struct publication *publ; 968 u32 key; 969 int res = -EINVAL; 970 971 p_ptr = tipc_port_lock(ref); 972 if (!p_ptr) 973 return -EINVAL; 974 975 if (p_ptr->connected) 976 goto exit; 977 if (seq->lower > seq->upper) 978 goto exit; 979 if ((scope < TIPC_ZONE_SCOPE) || (scope > TIPC_NODE_SCOPE)) 980 goto exit; 981 key = ref + p_ptr->pub_count + 1; 982 if (key == ref) { 983 res = -EADDRINUSE; 984 goto exit; 985 } 986 publ = tipc_nametbl_publish(seq->type, seq->lower, seq->upper, 987 scope, p_ptr->ref, key); 988 if (publ) { 989 list_add(&publ->pport_list, &p_ptr->publications); 990 p_ptr->pub_count++; 991 p_ptr->published = 1; 992 res = 0; 993 } 994exit: 995 tipc_port_unlock(p_ptr); 996 return res; 997} 998 999int tipc_withdraw(u32 ref, unsigned int scope, struct tipc_name_seq const *seq) 1000{ 1001 struct tipc_port *p_ptr; 1002 struct publication *publ; 1003 struct publication *tpubl; 1004 int res = -EINVAL; 1005 1006 p_ptr = tipc_port_lock(ref); 1007 if (!p_ptr) 1008 return -EINVAL; 1009 if (!seq) { 1010 list_for_each_entry_safe(publ, tpubl, 1011 &p_ptr->publications, pport_list) { 1012 tipc_nametbl_withdraw(publ->type, publ->lower, 1013 publ->ref, publ->key); 1014 } 1015 res = 0; 1016 } else { 1017 list_for_each_entry_safe(publ, tpubl, 1018 &p_ptr->publications, pport_list) { 1019 if (publ->scope != scope) 1020 continue; 1021 if (publ->type != seq->type) 1022 continue; 1023 if (publ->lower != seq->lower) 1024 continue; 1025 if (publ->upper != seq->upper) 1026 break; 1027 tipc_nametbl_withdraw(publ->type, publ->lower, 1028 publ->ref, publ->key); 1029 res = 0; 1030 break; 1031 } 1032 } 1033 if (list_empty(&p_ptr->publications)) 1034 p_ptr->published = 0; 1035 tipc_port_unlock(p_ptr); 1036 return res; 1037} 1038 1039int tipc_connect2port(u32 ref, struct tipc_portid const *peer) 1040{ 1041 struct tipc_port *p_ptr; 1042 struct tipc_msg *msg; 1043 int res = -EINVAL; 1044 1045 p_ptr = tipc_port_lock(ref); 1046 if (!p_ptr) 1047 return -EINVAL; 1048 if (p_ptr->published || p_ptr->connected) 1049 goto exit; 1050 if (!peer->ref) 1051 goto exit; 1052 1053 msg = &p_ptr->phdr; 1054 msg_set_destnode(msg, peer->node); 1055 msg_set_destport(msg, peer->ref); 1056 msg_set_orignode(msg, tipc_own_addr); 1057 msg_set_origport(msg, p_ptr->ref); 1058 msg_set_type(msg, TIPC_CONN_MSG); 1059 msg_set_lookup_scope(msg, 0); 1060 msg_set_hdr_sz(msg, SHORT_H_SIZE); 1061 1062 p_ptr->probing_interval = PROBING_INTERVAL; 1063 p_ptr->probing_state = CONFIRMED; 1064 p_ptr->connected = 1; 1065 k_start_timer(&p_ptr->timer, p_ptr->probing_interval); 1066 1067 tipc_nodesub_subscribe(&p_ptr->subscription, peer->node, 1068 (void *)(unsigned long)ref, 1069 (net_ev_handler)port_handle_node_down); 1070 res = 0; 1071exit: 1072 tipc_port_unlock(p_ptr); 1073 p_ptr->max_pkt = tipc_link_get_max_pkt(peer->node, ref); 1074 return res; 1075} 1076 1077/** 1078 * tipc_disconnect_port - disconnect port from peer 1079 * 1080 * Port must be locked. 1081 */ 1082 1083int tipc_disconnect_port(struct tipc_port *tp_ptr) 1084{ 1085 int res; 1086 1087 if (tp_ptr->connected) { 1088 tp_ptr->connected = 0; 1089 /* let timer expire on it's own to avoid deadlock! */ 1090 tipc_nodesub_unsubscribe( 1091 &((struct tipc_port *)tp_ptr)->subscription); 1092 res = 0; 1093 } else { 1094 res = -ENOTCONN; 1095 } 1096 return res; 1097} 1098 1099/* 1100 * tipc_disconnect(): Disconnect port form peer. 1101 * This is a node local operation. 1102 */ 1103 1104int tipc_disconnect(u32 ref) 1105{ 1106 struct tipc_port *p_ptr; 1107 int res; 1108 1109 p_ptr = tipc_port_lock(ref); 1110 if (!p_ptr) 1111 return -EINVAL; 1112 res = tipc_disconnect_port((struct tipc_port *)p_ptr); 1113 tipc_port_unlock(p_ptr); 1114 return res; 1115} 1116 1117/* 1118 * tipc_shutdown(): Send a SHUTDOWN msg to peer and disconnect 1119 */ 1120int tipc_shutdown(u32 ref) 1121{ 1122 struct tipc_port *p_ptr; 1123 struct sk_buff *buf = NULL; 1124 1125 p_ptr = tipc_port_lock(ref); 1126 if (!p_ptr) 1127 return -EINVAL; 1128 1129 buf = port_build_peer_abort_msg(p_ptr, TIPC_CONN_SHUTDOWN); 1130 tipc_port_unlock(p_ptr); 1131 tipc_net_route_msg(buf); 1132 return tipc_disconnect(ref); 1133} 1134 1135/* 1136 * tipc_port_recv_sections(): Concatenate and deliver sectioned 1137 * message for this node. 1138 */ 1139 1140static int tipc_port_recv_sections(struct tipc_port *sender, unsigned int num_sect, 1141 struct iovec const *msg_sect, 1142 unsigned int total_len) 1143{ 1144 struct sk_buff *buf; 1145 int res; 1146 1147 res = tipc_msg_build(&sender->phdr, msg_sect, num_sect, total_len, 1148 MAX_MSG_SIZE, !sender->user_port, &buf); 1149 if (likely(buf)) 1150 tipc_port_recv_msg(buf); 1151 return res; 1152} 1153 1154/** 1155 * tipc_send - send message sections on connection 1156 */ 1157 1158int tipc_send(u32 ref, unsigned int num_sect, struct iovec const *msg_sect, 1159 unsigned int total_len) 1160{ 1161 struct tipc_port *p_ptr; 1162 u32 destnode; 1163 int res; 1164 1165 p_ptr = tipc_port_deref(ref); 1166 if (!p_ptr || !p_ptr->connected) 1167 return -EINVAL; 1168 1169 p_ptr->congested = 1; 1170 if (!tipc_port_congested(p_ptr)) { 1171 destnode = port_peernode(p_ptr); 1172 if (likely(destnode != tipc_own_addr)) 1173 res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect, 1174 total_len, destnode); 1175 else 1176 res = tipc_port_recv_sections(p_ptr, num_sect, msg_sect, 1177 total_len); 1178 1179 if (likely(res != -ELINKCONG)) { 1180 p_ptr->congested = 0; 1181 if (res > 0) 1182 p_ptr->sent++; 1183 return res; 1184 } 1185 } 1186 if (port_unreliable(p_ptr)) { 1187 p_ptr->congested = 0; 1188 return total_len; 1189 } 1190 return -ELINKCONG; 1191} 1192 1193/** 1194 * tipc_send2name - send message sections to port name 1195 */ 1196 1197int tipc_send2name(u32 ref, struct tipc_name const *name, unsigned int domain, 1198 unsigned int num_sect, struct iovec const *msg_sect, 1199 unsigned int total_len) 1200{ 1201 struct tipc_port *p_ptr; 1202 struct tipc_msg *msg; 1203 u32 destnode = domain; 1204 u32 destport; 1205 int res; 1206 1207 p_ptr = tipc_port_deref(ref); 1208 if (!p_ptr || p_ptr->connected) 1209 return -EINVAL; 1210 1211 msg = &p_ptr->phdr; 1212 msg_set_type(msg, TIPC_NAMED_MSG); 1213 msg_set_orignode(msg, tipc_own_addr); 1214 msg_set_origport(msg, ref); 1215 msg_set_hdr_sz(msg, NAMED_H_SIZE); 1216 msg_set_nametype(msg, name->type); 1217 msg_set_nameinst(msg, name->instance); 1218 msg_set_lookup_scope(msg, tipc_addr_scope(domain)); 1219 destport = tipc_nametbl_translate(name->type, name->instance, &destnode); 1220 msg_set_destnode(msg, destnode); 1221 msg_set_destport(msg, destport); 1222 1223 if (likely(destport)) { 1224 if (likely(destnode == tipc_own_addr)) 1225 res = tipc_port_recv_sections(p_ptr, num_sect, 1226 msg_sect, total_len); 1227 else 1228 res = tipc_link_send_sections_fast(p_ptr, msg_sect, 1229 num_sect, total_len, 1230 destnode); 1231 if (likely(res != -ELINKCONG)) { 1232 if (res > 0) 1233 p_ptr->sent++; 1234 return res; 1235 } 1236 if (port_unreliable(p_ptr)) { 1237 return total_len; 1238 } 1239 return -ELINKCONG; 1240 } 1241 return tipc_port_reject_sections(p_ptr, msg, msg_sect, num_sect, 1242 total_len, TIPC_ERR_NO_NAME); 1243} 1244 1245/** 1246 * tipc_send2port - send message sections to port identity 1247 */ 1248 1249int tipc_send2port(u32 ref, struct tipc_portid const *dest, 1250 unsigned int num_sect, struct iovec const *msg_sect, 1251 unsigned int total_len) 1252{ 1253 struct tipc_port *p_ptr; 1254 struct tipc_msg *msg; 1255 int res; 1256 1257 p_ptr = tipc_port_deref(ref); 1258 if (!p_ptr || p_ptr->connected) 1259 return -EINVAL; 1260 1261 msg = &p_ptr->phdr; 1262 msg_set_type(msg, TIPC_DIRECT_MSG); 1263 msg_set_lookup_scope(msg, 0); 1264 msg_set_orignode(msg, tipc_own_addr); 1265 msg_set_origport(msg, ref); 1266 msg_set_destnode(msg, dest->node); 1267 msg_set_destport(msg, dest->ref); 1268 msg_set_hdr_sz(msg, BASIC_H_SIZE); 1269 1270 if (dest->node == tipc_own_addr) 1271 res = tipc_port_recv_sections(p_ptr, num_sect, msg_sect, 1272 total_len); 1273 else 1274 res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect, 1275 total_len, dest->node); 1276 if (likely(res != -ELINKCONG)) { 1277 if (res > 0) 1278 p_ptr->sent++; 1279 return res; 1280 } 1281 if (port_unreliable(p_ptr)) { 1282 return total_len; 1283 } 1284 return -ELINKCONG; 1285} 1286 1287/** 1288 * tipc_send_buf2port - send message buffer to port identity 1289 */ 1290 1291int tipc_send_buf2port(u32 ref, struct tipc_portid const *dest, 1292 struct sk_buff *buf, unsigned int dsz) 1293{ 1294 struct tipc_port *p_ptr; 1295 struct tipc_msg *msg; 1296 int res; 1297 1298 p_ptr = (struct tipc_port *)tipc_ref_deref(ref); 1299 if (!p_ptr || p_ptr->connected) 1300 return -EINVAL; 1301 1302 msg = &p_ptr->phdr; 1303 msg_set_type(msg, TIPC_DIRECT_MSG); 1304 msg_set_orignode(msg, tipc_own_addr); 1305 msg_set_origport(msg, ref); 1306 msg_set_destnode(msg, dest->node); 1307 msg_set_destport(msg, dest->ref); 1308 msg_set_hdr_sz(msg, BASIC_H_SIZE); 1309 msg_set_size(msg, BASIC_H_SIZE + dsz); 1310 if (skb_cow(buf, BASIC_H_SIZE)) 1311 return -ENOMEM; 1312 1313 skb_push(buf, BASIC_H_SIZE); 1314 skb_copy_to_linear_data(buf, msg, BASIC_H_SIZE); 1315 1316 if (dest->node == tipc_own_addr) 1317 res = tipc_port_recv_msg(buf); 1318 else 1319 res = tipc_send_buf_fast(buf, dest->node); 1320 if (likely(res != -ELINKCONG)) { 1321 if (res > 0) 1322 p_ptr->sent++; 1323 return res; 1324 } 1325 if (port_unreliable(p_ptr)) 1326 return dsz; 1327 return -ELINKCONG; 1328} 1329