Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux

[TIPC]: Optimize stream send routine to avoid fragmentation

This patch enhances TIPC's stream socket send routine so that
it avoids transmitting data in chunks that require fragmentation
and reassembly, thereby improving performance at both the
sending and receiving ends of the connection.

The "maximum packet size" hint that records MTU info allows
the socket to decide how big a chunk it should send; in the
event that the hint has become stale, fragmentation may still
occur, but the data will be passed correctly and the hint will
be updated in time for the following send. Note: The 66060 byte
pseudo-MTU used for intra-node connections requires the send
routine to perform an additional check to ensure it does not
exceed TIPC's limit of 66000 bytes of user data per chunk.

Signed-off-by: Allan Stephens <allan.stephens@windriver.com>
Signed-off-by: Jon Paul Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>

authored by

Allan Stephens and committed by
David S. Miller
05646c91 5eee6a6d

+36 -27
+4 -2
include/net/tipc/tipc_port.h
··· 1 1 /* 2 2 * include/net/tipc/tipc_port.h: Include file for privileged access to TIPC ports 3 3 * 4 - * Copyright (c) 1994-2006, Ericsson AB 5 - * Copyright (c) 2005, Wind River Systems 4 + * Copyright (c) 1994-2007, Ericsson AB 5 + * Copyright (c) 2005-2007, Wind River Systems 6 6 * All rights reserved. 7 7 * 8 8 * Redistribution and use in source and binary forms, with or without ··· 55 55 * @conn_unacked: number of unacknowledged messages received from peer port 56 56 * @published: non-zero if port has one or more associated names 57 57 * @congested: non-zero if cannot send because of link or port congestion 58 + * @max_pkt: maximum packet size "hint" used when building messages sent by port 58 59 * @ref: unique reference to port in TIPC object registry 59 60 * @phdr: preformatted message header used when sending messages 60 61 */ ··· 69 68 u32 conn_unacked; 70 69 int published; 71 70 u32 congested; 71 + u32 max_pkt; 72 72 u32 ref; 73 73 struct tipc_msg phdr; 74 74 };
+8 -8
net/tipc/link.c
··· 1 1 /* 2 2 * net/tipc/link.c: TIPC link code 3 3 * 4 - * Copyright (c) 1996-2006, Ericsson AB 5 - * Copyright (c) 2004-2006, Wind River Systems 4 + * Copyright (c) 1996-2007, Ericsson AB 5 + * Copyright (c) 2004-2007, Wind River Systems 6 6 * All rights reserved. 7 7 * 8 8 * Redistribution and use in source and binary forms, with or without ··· 1260 1260 * (Must not hold any locks while building message.) 1261 1261 */ 1262 1262 1263 - res = msg_build(hdr, msg_sect, num_sect, sender->max_pkt, 1263 + res = msg_build(hdr, msg_sect, num_sect, sender->publ.max_pkt, 1264 1264 !sender->user_port, &buf); 1265 1265 1266 1266 read_lock_bh(&tipc_net_lock); ··· 1271 1271 if (likely(l_ptr)) { 1272 1272 if (likely(buf)) { 1273 1273 res = link_send_buf_fast(l_ptr, buf, 1274 - &sender->max_pkt); 1274 + &sender->publ.max_pkt); 1275 1275 if (unlikely(res < 0)) 1276 1276 buf_discard(buf); 1277 1277 exit: ··· 1299 1299 * then re-try fast path or fragment the message 1300 1300 */ 1301 1301 1302 - sender->max_pkt = link_max_pkt(l_ptr); 1302 + sender->publ.max_pkt = link_max_pkt(l_ptr); 1303 1303 tipc_node_unlock(node); 1304 1304 read_unlock_bh(&tipc_net_lock); 1305 1305 1306 1306 1307 - if ((msg_hdr_sz(hdr) + res) <= sender->max_pkt) 1307 + if ((msg_hdr_sz(hdr) + res) <= sender->publ.max_pkt) 1308 1308 goto again; 1309 1309 1310 1310 return link_send_sections_long(sender, msg_sect, ··· 1357 1357 1358 1358 again: 1359 1359 fragm_no = 1; 1360 - max_pkt = sender->max_pkt - INT_H_SIZE; 1360 + max_pkt = sender->publ.max_pkt - INT_H_SIZE; 1361 1361 /* leave room for tunnel header in case of link changeover */ 1362 1362 fragm_sz = max_pkt - INT_H_SIZE; 1363 1363 /* leave room for fragmentation header in each fragment */ ··· 1463 1463 goto reject; 1464 1464 } 1465 1465 if (link_max_pkt(l_ptr) < max_pkt) { 1466 - sender->max_pkt = link_max_pkt(l_ptr); 1466 + sender->publ.max_pkt = link_max_pkt(l_ptr); 1467 1467 tipc_node_unlock(node); 1468 1468 for (; buf_chain; buf_chain = buf) { 1469 1469 
buf = buf_chain->next;
+5 -5
net/tipc/port.c
··· 1 1 /* 2 2 * net/tipc/port.c: TIPC port code 3 3 * 4 - * Copyright (c) 1992-2006, Ericsson AB 5 - * Copyright (c) 2004-2005, Wind River Systems 4 + * Copyright (c) 1992-2007, Ericsson AB 5 + * Copyright (c) 2004-2007, Wind River Systems 6 6 * All rights reserved. 7 7 * 8 8 * Redistribution and use in source and binary forms, with or without ··· 239 239 } 240 240 241 241 tipc_port_lock(ref); 242 + p_ptr->publ.usr_handle = usr_handle; 243 + p_ptr->publ.max_pkt = MAX_PKT_DEFAULT; 242 244 p_ptr->publ.ref = ref; 243 245 msg = &p_ptr->publ.phdr; 244 246 msg_init(msg, DATA_LOW, TIPC_NAMED_MSG, TIPC_OK, LONG_H_SIZE, 0); ··· 250 248 msg_set_importance(msg,importance); 251 249 p_ptr->last_in_seqno = 41; 252 250 p_ptr->sent = 1; 253 - p_ptr->publ.usr_handle = usr_handle; 254 251 INIT_LIST_HEAD(&p_ptr->wait_list); 255 252 INIT_LIST_HEAD(&p_ptr->subscription.nodesub_list); 256 253 p_ptr->congested_link = NULL; 257 - p_ptr->max_pkt = MAX_PKT_DEFAULT; 258 254 p_ptr->dispatcher = dispatcher; 259 255 p_ptr->wakeup = wakeup; 260 256 p_ptr->user_port = NULL; ··· 1243 1243 res = TIPC_OK; 1244 1244 exit: 1245 1245 tipc_port_unlock(p_ptr); 1246 - p_ptr->max_pkt = tipc_link_get_max_pkt(peer->node, ref); 1246 + p_ptr->publ.max_pkt = tipc_link_get_max_pkt(peer->node, ref); 1247 1247 return res; 1248 1248 } 1249 1249
+2 -4
net/tipc/port.h
··· 1 1 /* 2 2 * net/tipc/port.h: Include file for TIPC port code 3 3 * 4 - * Copyright (c) 1994-2006, Ericsson AB 5 - * Copyright (c) 2004-2005, Wind River Systems 4 + * Copyright (c) 1994-2007, Ericsson AB 5 + * Copyright (c) 2004-2007, Wind River Systems 6 6 * All rights reserved. 7 7 * 8 8 * Redistribution and use in source and binary forms, with or without ··· 81 81 * @acked: 82 82 * @publications: list of publications for port 83 83 * @pub_count: total # of publications port has made during its lifetime 84 - * @max_pkt: maximum packet size "hint" used when building messages sent by port 85 84 * @probing_state: 86 85 * @probing_interval: 87 86 * @last_in_seqno: ··· 101 102 u32 acked; 102 103 struct list_head publications; 103 104 u32 pub_count; 104 - u32 max_pkt; 105 105 u32 probing_state; 106 106 u32 probing_interval; 107 107 u32 last_in_seqno;
+17 -8
net/tipc/socket.c
··· 607 607 static int send_stream(struct kiocb *iocb, struct socket *sock, 608 608 struct msghdr *m, size_t total_len) 609 609 { 610 + struct tipc_port *tport; 610 611 struct msghdr my_msg; 611 612 struct iovec my_iov; 612 613 struct iovec *curr_iov; 613 614 int curr_iovlen; 614 615 char __user *curr_start; 616 + u32 hdr_size; 615 617 int curr_left; 616 618 int bytes_to_send; 617 619 int bytes_sent; 618 620 int res; 619 621 620 - if (likely(total_len <= TIPC_MAX_USER_MSG_SIZE)) 621 - return send_packet(iocb, sock, m, total_len); 622 - 623 - /* Can only send large data streams if already connected */ 622 + /* Handle special cases where there is no connection */ 624 623 625 624 if (unlikely(sock->state != SS_CONNECTED)) { 626 - if (sock->state == SS_DISCONNECTING) 625 + if (sock->state == SS_UNCONNECTED) 626 + return send_packet(iocb, sock, m, total_len); 627 + else if (sock->state == SS_DISCONNECTING) 627 628 return -EPIPE; 628 629 else 629 630 return -ENOTCONN; ··· 649 648 my_msg.msg_name = NULL; 650 649 bytes_sent = 0; 651 650 651 + tport = tipc_sk(sock->sk)->p; 652 + hdr_size = msg_hdr_sz(&tport->phdr); 653 + 652 654 while (curr_iovlen--) { 653 655 curr_start = curr_iov->iov_base; 654 656 curr_left = curr_iov->iov_len; 655 657 656 658 while (curr_left) { 657 - bytes_to_send = (curr_left < TIPC_MAX_USER_MSG_SIZE) 658 - ? curr_left : TIPC_MAX_USER_MSG_SIZE; 659 + bytes_to_send = tport->max_pkt - hdr_size; 660 + if (bytes_to_send > TIPC_MAX_USER_MSG_SIZE) 661 + bytes_to_send = TIPC_MAX_USER_MSG_SIZE; 662 + if (curr_left < bytes_to_send) 663 + bytes_to_send = curr_left; 659 664 my_iov.iov_base = curr_start; 660 665 my_iov.iov_len = bytes_to_send; 661 666 if ((res = send_packet(iocb, sock, &my_msg, 0)) < 0) { 662 - return bytes_sent ? bytes_sent : res; 667 + if (bytes_sent != 0) 668 + res = bytes_sent; 669 + return res; 663 670 } 664 671 curr_left -= bytes_to_send; 665 672 curr_start += bytes_to_send;