Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux

Merge branch 'tcp-zero-copy-receive'

Eric Dumazet says:

====================
tcp: add zero copy receive

This patch series adds mmap() support to TCP sockets for RX zero copy.

While tcp_mmap() patch itself is quite small (~100 LOC), optimal support
for asynchronous mmap() required better SO_RCVLOWAT behavior, and a
test program to demonstrate how mmap() on TCP sockets can be used.

Note that mmap() (and associated munmap()) calls add more
pressure on the per-process VM semaphore, so they might not show a benefit
for processes with a high number of threads.
====================

Signed-off-by: David S. Miller <davem@davemloft.net>

+608 -7
+1
include/linux/net.h
··· 197 197 int offset, size_t size, int flags); 198 198 int (*sendmsg_locked)(struct sock *sk, struct msghdr *msg, 199 199 size_t size); 200 + int (*set_rcvlowat)(struct sock *sk, int val); 200 201 }; 201 202 202 203 #define DECLARE_SOCKADDR(type, dst, src) \
+4
include/net/tcp.h
··· 402 402 void tcp_syn_ack_timeout(const struct request_sock *req); 403 403 int tcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len, int nonblock, 404 404 int flags, int *addr_len); 405 + int tcp_set_rcvlowat(struct sock *sk, int val); 406 + void tcp_data_ready(struct sock *sk); 407 + int tcp_mmap(struct file *file, struct socket *sock, 408 + struct vm_area_struct *vma); 405 409 void tcp_parse_options(const struct net *net, const struct sk_buff *skb, 406 410 struct tcp_options_received *opt_rx, 407 411 int estab, struct tcp_fastopen_cookie *foc);
+4 -1
net/core/sock.c
··· 905 905 case SO_RCVLOWAT: 906 906 if (val < 0) 907 907 val = INT_MAX; 908 - sk->sk_rcvlowat = val ? : 1; 908 + if (sock->ops->set_rcvlowat) 909 + ret = sock->ops->set_rcvlowat(sk, val); 910 + else 911 + sk->sk_rcvlowat = val ? : 1; 909 912 break; 910 913 911 914 case SO_RCVTIMEO:
+2 -1
net/ipv4/af_inet.c
··· 994 994 .getsockopt = sock_common_getsockopt, 995 995 .sendmsg = inet_sendmsg, 996 996 .recvmsg = inet_recvmsg, 997 - .mmap = sock_no_mmap, 997 + .mmap = tcp_mmap, 998 998 .sendpage = inet_sendpage, 999 999 .splice_read = tcp_splice_read, 1000 1000 .read_sock = tcp_read_sock, ··· 1006 1006 .compat_getsockopt = compat_sock_common_getsockopt, 1007 1007 .compat_ioctl = inet_compat_ioctl, 1008 1008 #endif 1009 + .set_rcvlowat = tcp_set_rcvlowat, 1009 1010 }; 1010 1011 EXPORT_SYMBOL(inet_stream_ops); 1011 1012
+138
net/ipv4/tcp.c
··· 1701 1701 } 1702 1702 EXPORT_SYMBOL(tcp_peek_len); 1703 1703 1704 + /* Make sure sk_rcvbuf is big enough to satisfy SO_RCVLOWAT hint */ 1705 + int tcp_set_rcvlowat(struct sock *sk, int val) 1706 + { 1707 + sk->sk_rcvlowat = val ? : 1; 1708 + 1709 + /* Check if we need to signal EPOLLIN right now */ 1710 + tcp_data_ready(sk); 1711 + 1712 + if (sk->sk_userlocks & SOCK_RCVBUF_LOCK) 1713 + return 0; 1714 + 1715 + /* val comes from user space and might be close to INT_MAX */ 1716 + val <<= 1; 1717 + if (val < 0) 1718 + val = INT_MAX; 1719 + 1720 + val = min(val, sock_net(sk)->ipv4.sysctl_tcp_rmem[2]); 1721 + if (val > sk->sk_rcvbuf) { 1722 + sk->sk_rcvbuf = val; 1723 + tcp_sk(sk)->window_clamp = tcp_win_from_space(sk, val); 1724 + } 1725 + return 0; 1726 + } 1727 + EXPORT_SYMBOL(tcp_set_rcvlowat); 1728 + 1729 + /* When user wants to mmap X pages, we first need to perform the mapping 1730 + * before freeing any skbs in receive queue, otherwise user would be unable 1731 + * to fallback to standard recvmsg(). This happens if some data in the 1732 + * requested block is not exactly fitting in a page. 1733 + * 1734 + * We only support order-0 pages for the moment. 1735 + * mmap() on TCP is very strict, there is no point 1736 + * trying to accommodate with pathological layouts. 
1737 + */ 1738 + int tcp_mmap(struct file *file, struct socket *sock, 1739 + struct vm_area_struct *vma) 1740 + { 1741 + unsigned long size = vma->vm_end - vma->vm_start; 1742 + unsigned int nr_pages = size >> PAGE_SHIFT; 1743 + struct page **pages_array = NULL; 1744 + u32 seq, len, offset, nr = 0; 1745 + struct sock *sk = sock->sk; 1746 + const skb_frag_t *frags; 1747 + struct tcp_sock *tp; 1748 + struct sk_buff *skb; 1749 + int ret; 1750 + 1751 + if (vma->vm_pgoff || !nr_pages) 1752 + return -EINVAL; 1753 + 1754 + if (vma->vm_flags & VM_WRITE) 1755 + return -EPERM; 1756 + /* TODO: Maybe the following is not needed if pages are COW */ 1757 + vma->vm_flags &= ~VM_MAYWRITE; 1758 + 1759 + lock_sock(sk); 1760 + 1761 + ret = -ENOTCONN; 1762 + if (sk->sk_state == TCP_LISTEN) 1763 + goto out; 1764 + 1765 + sock_rps_record_flow(sk); 1766 + 1767 + if (tcp_inq(sk) < size) { 1768 + ret = sock_flag(sk, SOCK_DONE) ? -EIO : -EAGAIN; 1769 + goto out; 1770 + } 1771 + tp = tcp_sk(sk); 1772 + seq = tp->copied_seq; 1773 + /* Abort if urgent data is in the area */ 1774 + if (unlikely(tp->urg_data)) { 1775 + u32 urg_offset = tp->urg_seq - seq; 1776 + 1777 + ret = -EINVAL; 1778 + if (urg_offset < size) 1779 + goto out; 1780 + } 1781 + ret = -ENOMEM; 1782 + pages_array = kvmalloc_array(nr_pages, sizeof(struct page *), 1783 + GFP_KERNEL); 1784 + if (!pages_array) 1785 + goto out; 1786 + skb = tcp_recv_skb(sk, seq, &offset); 1787 + ret = -EINVAL; 1788 + skb_start: 1789 + /* We do not support anything not in page frags */ 1790 + offset -= skb_headlen(skb); 1791 + if ((int)offset < 0) 1792 + goto out; 1793 + if (skb_has_frag_list(skb)) 1794 + goto out; 1795 + len = skb->data_len - offset; 1796 + frags = skb_shinfo(skb)->frags; 1797 + while (offset) { 1798 + if (frags->size > offset) 1799 + goto out; 1800 + offset -= frags->size; 1801 + frags++; 1802 + } 1803 + while (nr < nr_pages) { 1804 + if (len) { 1805 + if (len < PAGE_SIZE) 1806 + goto out; 1807 + if (frags->size != PAGE_SIZE || 
frags->page_offset) 1808 + goto out; 1809 + pages_array[nr++] = skb_frag_page(frags); 1810 + frags++; 1811 + len -= PAGE_SIZE; 1812 + seq += PAGE_SIZE; 1813 + continue; 1814 + } 1815 + skb = skb->next; 1816 + offset = seq - TCP_SKB_CB(skb)->seq; 1817 + goto skb_start; 1818 + } 1819 + /* OK, we have a full set of pages ready to be inserted into vma */ 1820 + for (nr = 0; nr < nr_pages; nr++) { 1821 + ret = vm_insert_page(vma, vma->vm_start + (nr << PAGE_SHIFT), 1822 + pages_array[nr]); 1823 + if (ret) 1824 + goto out; 1825 + } 1826 + /* operation is complete, we can 'consume' all skbs */ 1827 + tp->copied_seq = seq; 1828 + tcp_rcv_space_adjust(sk); 1829 + 1830 + /* Clean up data we have read: This will do ACK frames. */ 1831 + tcp_recv_skb(sk, seq, &offset); 1832 + tcp_cleanup_rbuf(sk, size); 1833 + 1834 + ret = 0; 1835 + out: 1836 + release_sock(sk); 1837 + kvfree(pages_array); 1838 + return ret; 1839 + } 1840 + EXPORT_SYMBOL(tcp_mmap); 1841 + 1704 1842 static void tcp_update_recv_tstamps(struct sk_buff *skb, 1705 1843 struct scm_timestamping *tss) 1706 1844 {
+18 -4
net/ipv4/tcp_input.c
··· 4576 4576 4577 4577 } 4578 4578 4579 + void tcp_data_ready(struct sock *sk) 4580 + { 4581 + const struct tcp_sock *tp = tcp_sk(sk); 4582 + int avail = tp->rcv_nxt - tp->copied_seq; 4583 + 4584 + if (avail < sk->sk_rcvlowat && !sock_flag(sk, SOCK_DONE)) 4585 + return; 4586 + 4587 + sk->sk_data_ready(sk); 4588 + } 4589 + 4579 4590 static void tcp_data_queue(struct sock *sk, struct sk_buff *skb) 4580 4591 { 4581 4592 struct tcp_sock *tp = tcp_sk(sk); ··· 4644 4633 if (eaten > 0) 4645 4634 kfree_skb_partial(skb, fragstolen); 4646 4635 if (!sock_flag(sk, SOCK_DEAD)) 4647 - sk->sk_data_ready(sk); 4636 + tcp_data_ready(sk); 4648 4637 return; 4649 4638 } 4650 4639 ··· 5037 5026 /* More than one full frame received... */ 5038 5027 if (((tp->rcv_nxt - tp->rcv_wup) > inet_csk(sk)->icsk_ack.rcv_mss && 5039 5028 /* ... and right edge of window advances far enough. 5040 - * (tcp_recvmsg() will send ACK otherwise). Or... 5029 + * (tcp_recvmsg() will send ACK otherwise). 5030 + * If application uses SO_RCVLOWAT, we want send ack now if 5031 + * we have not received enough bytes to satisfy the condition. 5041 5032 */ 5042 - __tcp_select_window(sk) >= tp->rcv_wnd) || 5033 + (tp->rcv_nxt - tp->copied_seq < sk->sk_rcvlowat || 5034 + __tcp_select_window(sk) >= tp->rcv_wnd)) || 5043 5035 /* We ACK each frame or... */ 5044 5036 tcp_in_quickack_mode(sk) || 5045 5037 /* We have out of order data. */ ··· 5445 5431 no_ack: 5446 5432 if (eaten) 5447 5433 kfree_skb_partial(skb, fragstolen); 5448 - sk->sk_data_ready(sk); 5434 + tcp_data_ready(sk); 5449 5435 return; 5450 5436 } 5451 5437 }
+2 -1
net/ipv6/af_inet6.c
··· 579 579 .getsockopt = sock_common_getsockopt, /* ok */ 580 580 .sendmsg = inet_sendmsg, /* ok */ 581 581 .recvmsg = inet_recvmsg, /* ok */ 582 - .mmap = sock_no_mmap, 582 + .mmap = tcp_mmap, 583 583 .sendpage = inet_sendpage, 584 584 .sendmsg_locked = tcp_sendmsg_locked, 585 585 .sendpage_locked = tcp_sendpage_locked, ··· 590 590 .compat_setsockopt = compat_sock_common_setsockopt, 591 591 .compat_getsockopt = compat_sock_common_getsockopt, 592 592 #endif 593 + .set_rcvlowat = tcp_set_rcvlowat, 593 594 }; 594 595 595 596 const struct proto_ops inet6_dgram_ops = {
+2
tools/testing/selftests/net/Makefile
··· 8 8 TEST_PROGS += fib_tests.sh fib-onlink-tests.sh pmtu.sh 9 9 TEST_GEN_FILES = socket 10 10 TEST_GEN_FILES += psock_fanout psock_tpacket msg_zerocopy 11 + TEST_GEN_FILES += tcp_mmap 11 12 TEST_GEN_PROGS = reuseport_bpf reuseport_bpf_cpu reuseport_bpf_numa 12 13 TEST_GEN_PROGS += reuseport_dualstack reuseaddr_conflict 13 14 14 15 include ../lib.mk 15 16 16 17 $(OUTPUT)/reuseport_bpf_numa: LDFLAGS += -lnuma 18 + $(OUTPUT)/tcp_mmap: LDFLAGS += -lpthread
+437
tools/testing/selftests/net/tcp_mmap.c
··· 1 + /* 2 + * Copyright 2018 Google Inc. 3 + * Author: Eric Dumazet (edumazet@google.com) 4 + * 5 + * Reference program demonstrating tcp mmap() usage, 6 + * and SO_RCVLOWAT hints for receiver. 7 + * 8 + * Note : NIC with header split is needed to use mmap() on TCP : 9 + * Each incoming frame must be a multiple of PAGE_SIZE bytes of TCP payload. 10 + * 11 + * How to use on loopback interface : 12 + * 13 + * ifconfig lo mtu 61512 # 15*4096 + 40 (ipv6 header) + 32 (TCP with TS option header) 14 + * tcp_mmap -s -z & 15 + * tcp_mmap -H ::1 -z 16 + * 17 + * Or leave default lo mtu, but use -M option to set TCP_MAXSEG option to (4096 + 12) 18 + * (4096 : page size on x86, 12: TCP TS option length) 19 + * tcp_mmap -s -z -M $((4096+12)) & 20 + * tcp_mmap -H ::1 -z -M $((4096+12)) 21 + * 22 + * Note: -z option on sender uses MSG_ZEROCOPY, which forces a copy when packets go through loopback interface. 23 + * We might use sendfile() instead, but really this test program is about mmap(), for receivers ;) 24 + * 25 + * $ ./tcp_mmap -s & # Without mmap() 26 + * $ for i in {1..4}; do ./tcp_mmap -H ::1 -z ; done 27 + * received 32768 MB (0 % mmap'ed) in 14.1157 s, 19.4732 Gbit 28 + * cpu usage user:0.057 sys:7.815, 240.234 usec per MB, 65531 c-switches 29 + * received 32768 MB (0 % mmap'ed) in 14.6833 s, 18.7204 Gbit 30 + * cpu usage user:0.043 sys:8.103, 248.596 usec per MB, 65524 c-switches 31 + * received 32768 MB (0 % mmap'ed) in 11.143 s, 24.6682 Gbit 32 + * cpu usage user:0.044 sys:6.576, 202.026 usec per MB, 65519 c-switches 33 + * received 32768 MB (0 % mmap'ed) in 14.9056 s, 18.4413 Gbit 34 + * cpu usage user:0.036 sys:8.193, 251.129 usec per MB, 65530 c-switches 35 + * $ kill %1 # kill tcp_mmap server 36 + * 37 + * $ ./tcp_mmap -s -z & # With mmap() 38 + * $ for i in {1..4}; do ./tcp_mmap -H ::1 -z ; done 39 + * received 32768 MB (99.9939 % mmap'ed) in 6.73792 s, 40.7956 Gbit 40 + * cpu usage user:0.045 sys:2.827, 87.6465 usec per MB, 65532 c-switches 41 + * received 
32768 MB (99.9939 % mmap'ed) in 7.26732 s, 37.8238 Gbit 42 + * cpu usage user:0.037 sys:3.087, 95.3369 usec per MB, 65532 c-switches 43 + * received 32768 MB (99.9939 % mmap'ed) in 7.61661 s, 36.0893 Gbit 44 + * cpu usage user:0.046 sys:3.559, 110.016 usec per MB, 65529 c-switches 45 + * received 32768 MB (99.9939 % mmap'ed) in 7.43764 s, 36.9577 Gbit 46 + * cpu usage user:0.035 sys:3.467, 106.873 usec per MB, 65530 c-switches 47 + * 48 + * License (GPLv2): 49 + * 50 + * This program is free software; you can redistribute it and/or modify it 51 + * under the terms and conditions of the GNU General Public License, 52 + * version 2, as published by the Free Software Foundation. 53 + * 54 + * This program is distributed in the hope it will be useful, but WITHOUT 55 + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 56 + * FITNESS FOR A PARTICULAR PURPOSE. * See the GNU General Public License for 57 + * more details. 58 + * 59 + * You should have received a copy of the GNU General Public License along with 60 + * this program; if not, write to the Free Software Foundation, Inc., 61 + * 51 Franklin St - Fifth Floor, Boston, MA 02110-1301 USA. 62 + */ 63 + #define _GNU_SOURCE 64 + #include <pthread.h> 65 + #include <sys/types.h> 66 + #include <fcntl.h> 67 + #include <error.h> 68 + #include <sys/socket.h> 69 + #include <sys/mman.h> 70 + #include <sys/resource.h> 71 + #include <unistd.h> 72 + #include <string.h> 73 + #include <stdlib.h> 74 + #include <stdio.h> 75 + #include <errno.h> 76 + #include <time.h> 77 + #include <sys/time.h> 78 + #include <netinet/in.h> 79 + #include <netinet/tcp.h> 80 + #include <arpa/inet.h> 81 + #include <poll.h> 82 + 83 + #ifndef MSG_ZEROCOPY 84 + #define MSG_ZEROCOPY 0x4000000 85 + #endif 86 + 87 + #define FILE_SZ (1UL << 35) 88 + static int cfg_family = AF_INET6; 89 + static socklen_t cfg_alen = sizeof(struct sockaddr_in6); 90 + static int cfg_port = 8787; 91 + 92 + static int rcvbuf; /* Default: autotuning. 
Can be set with -r <integer> option */ 93 + static int sndbuf; /* Default: autotuning. Can be set with -w <integer> option */ 94 + static int zflg; /* zero copy option. (MSG_ZEROCOPY for sender, mmap() for receiver */ 95 + static int xflg; /* hash received data (simple xor) (-h option) */ 96 + static int keepflag; /* -k option: receiver shall keep all received file in memory (no munmap() calls) */ 97 + 98 + static int chunk_size = 512*1024; 99 + 100 + unsigned long htotal; 101 + 102 + static inline void prefetch(const void *x) 103 + { 104 + #if defined(__x86_64__) 105 + asm volatile("prefetcht0 %P0" : : "m" (*(const char *)x)); 106 + #endif 107 + } 108 + 109 + void hash_zone(void *zone, unsigned int length) 110 + { 111 + unsigned long temp = htotal; 112 + 113 + while (length >= 8*sizeof(long)) { 114 + prefetch(zone + 384); 115 + temp ^= *(unsigned long *)zone; 116 + temp ^= *(unsigned long *)(zone + sizeof(long)); 117 + temp ^= *(unsigned long *)(zone + 2*sizeof(long)); 118 + temp ^= *(unsigned long *)(zone + 3*sizeof(long)); 119 + temp ^= *(unsigned long *)(zone + 4*sizeof(long)); 120 + temp ^= *(unsigned long *)(zone + 5*sizeof(long)); 121 + temp ^= *(unsigned long *)(zone + 6*sizeof(long)); 122 + temp ^= *(unsigned long *)(zone + 7*sizeof(long)); 123 + zone += 8*sizeof(long); 124 + length -= 8*sizeof(long); 125 + } 126 + while (length >= 1) { 127 + temp ^= *(unsigned char *)zone; 128 + zone += 1; 129 + length--; 130 + } 131 + htotal = temp; 132 + } 133 + 134 + void *child_thread(void *arg) 135 + { 136 + unsigned long total_mmap = 0, total = 0; 137 + unsigned long delta_usec; 138 + int flags = MAP_SHARED; 139 + struct timeval t0, t1; 140 + char *buffer = NULL; 141 + void *oaddr = NULL; 142 + double throughput; 143 + struct rusage ru; 144 + int lu, fd; 145 + 146 + fd = (int)(unsigned long)arg; 147 + 148 + gettimeofday(&t0, NULL); 149 + 150 + fcntl(fd, F_SETFL, O_NDELAY); 151 + buffer = malloc(chunk_size); 152 + if (!buffer) { 153 + perror("malloc"); 154 + goto 
error; 155 + } 156 + while (1) { 157 + struct pollfd pfd = { .fd = fd, .events = POLLIN, }; 158 + int sub; 159 + 160 + poll(&pfd, 1, 10000); 161 + if (zflg) { 162 + void *naddr; 163 + 164 + naddr = mmap(oaddr, chunk_size, PROT_READ, flags, fd, 0); 165 + if (naddr == (void *)-1) { 166 + if (errno == EAGAIN) { 167 + /* That is if SO_RCVLOWAT is buggy */ 168 + usleep(1000); 169 + continue; 170 + } 171 + if (errno == EINVAL) { 172 + flags = MAP_SHARED; 173 + oaddr = NULL; 174 + goto fallback; 175 + } 176 + if (errno != EIO) 177 + perror("mmap()"); 178 + break; 179 + } 180 + total_mmap += chunk_size; 181 + if (xflg) 182 + hash_zone(naddr, chunk_size); 183 + total += chunk_size; 184 + if (!keepflag) { 185 + flags |= MAP_FIXED; 186 + oaddr = naddr; 187 + } 188 + continue; 189 + } 190 + fallback: 191 + sub = 0; 192 + while (sub < chunk_size) { 193 + lu = read(fd, buffer + sub, chunk_size - sub); 194 + if (lu == 0) 195 + goto end; 196 + if (lu < 0) 197 + break; 198 + if (xflg) 199 + hash_zone(buffer + sub, lu); 200 + total += lu; 201 + sub += lu; 202 + } 203 + } 204 + end: 205 + gettimeofday(&t1, NULL); 206 + delta_usec = (t1.tv_sec - t0.tv_sec) * 1000000 + t1.tv_usec - t0.tv_usec; 207 + 208 + throughput = 0; 209 + if (delta_usec) 210 + throughput = total * 8.0 / (double)delta_usec / 1000.0; 211 + getrusage(RUSAGE_THREAD, &ru); 212 + if (total > 1024*1024) { 213 + unsigned long total_usec; 214 + unsigned long mb = total >> 20; 215 + total_usec = 1000000*ru.ru_utime.tv_sec + ru.ru_utime.tv_usec + 216 + 1000000*ru.ru_stime.tv_sec + ru.ru_stime.tv_usec; 217 + printf("received %lg MB (%lg %% mmap'ed) in %lg s, %lg Gbit\n" 218 + " cpu usage user:%lg sys:%lg, %lg usec per MB, %lu c-switches\n", 219 + total / (1024.0 * 1024.0), 220 + 100.0*total_mmap/total, 221 + (double)delta_usec / 1000000.0, 222 + throughput, 223 + (double)ru.ru_utime.tv_sec + (double)ru.ru_utime.tv_usec / 1000000.0, 224 + (double)ru.ru_stime.tv_sec + (double)ru.ru_stime.tv_usec / 1000000.0, 225 + 
(double)total_usec/mb, 226 + ru.ru_nvcsw); 227 + } 228 + error: 229 + free(buffer); 230 + close(fd); 231 + pthread_exit(0); 232 + } 233 + 234 + static void apply_rcvsnd_buf(int fd) 235 + { 236 + if (rcvbuf && setsockopt(fd, SOL_SOCKET, 237 + SO_RCVBUF, &rcvbuf, sizeof(rcvbuf)) == -1) { 238 + perror("setsockopt SO_RCVBUF"); 239 + } 240 + 241 + if (sndbuf && setsockopt(fd, SOL_SOCKET, 242 + SO_SNDBUF, &sndbuf, sizeof(sndbuf)) == -1) { 243 + perror("setsockopt SO_SNDBUF"); 244 + } 245 + } 246 + 247 + 248 + static void setup_sockaddr(int domain, const char *str_addr, 249 + struct sockaddr_storage *sockaddr) 250 + { 251 + struct sockaddr_in6 *addr6 = (void *) sockaddr; 252 + struct sockaddr_in *addr4 = (void *) sockaddr; 253 + 254 + switch (domain) { 255 + case PF_INET: 256 + memset(addr4, 0, sizeof(*addr4)); 257 + addr4->sin_family = AF_INET; 258 + addr4->sin_port = htons(cfg_port); 259 + if (str_addr && 260 + inet_pton(AF_INET, str_addr, &(addr4->sin_addr)) != 1) 261 + error(1, 0, "ipv4 parse error: %s", str_addr); 262 + break; 263 + case PF_INET6: 264 + memset(addr6, 0, sizeof(*addr6)); 265 + addr6->sin6_family = AF_INET6; 266 + addr6->sin6_port = htons(cfg_port); 267 + if (str_addr && 268 + inet_pton(AF_INET6, str_addr, &(addr6->sin6_addr)) != 1) 269 + error(1, 0, "ipv6 parse error: %s", str_addr); 270 + break; 271 + default: 272 + error(1, 0, "illegal domain"); 273 + } 274 + } 275 + 276 + static void do_accept(int fdlisten) 277 + { 278 + if (setsockopt(fdlisten, SOL_SOCKET, SO_RCVLOWAT, 279 + &chunk_size, sizeof(chunk_size)) == -1) { 280 + perror("setsockopt SO_RCVLOWAT"); 281 + } 282 + 283 + apply_rcvsnd_buf(fdlisten); 284 + 285 + while (1) { 286 + struct sockaddr_in addr; 287 + socklen_t addrlen = sizeof(addr); 288 + pthread_t th; 289 + int fd, res; 290 + 291 + fd = accept(fdlisten, (struct sockaddr *)&addr, &addrlen); 292 + if (fd == -1) { 293 + perror("accept"); 294 + continue; 295 + } 296 + res = pthread_create(&th, NULL, child_thread, 297 + (void *)(unsigned 
long)fd); 298 + if (res) { 299 + errno = res; 300 + perror("pthread_create"); 301 + close(fd); 302 + } 303 + } 304 + } 305 + 306 + int main(int argc, char *argv[]) 307 + { 308 + struct sockaddr_storage listenaddr, addr; 309 + unsigned int max_pacing_rate = 0; 310 + unsigned long total = 0; 311 + char *host = NULL; 312 + int fd, c, on = 1; 313 + char *buffer; 314 + int sflg = 0; 315 + int mss = 0; 316 + 317 + while ((c = getopt(argc, argv, "46p:svr:w:H:zxkP:M:")) != -1) { 318 + switch (c) { 319 + case '4': 320 + cfg_family = PF_INET; 321 + cfg_alen = sizeof(struct sockaddr_in); 322 + break; 323 + case '6': 324 + cfg_family = PF_INET6; 325 + cfg_alen = sizeof(struct sockaddr_in6); 326 + break; 327 + case 'p': 328 + cfg_port = atoi(optarg); 329 + break; 330 + case 'H': 331 + host = optarg; 332 + break; 333 + case 's': /* server : listen for incoming connections */ 334 + sflg++; 335 + break; 336 + case 'r': 337 + rcvbuf = atoi(optarg); 338 + break; 339 + case 'w': 340 + sndbuf = atoi(optarg); 341 + break; 342 + case 'z': 343 + zflg = 1; 344 + break; 345 + case 'M': 346 + mss = atoi(optarg); 347 + break; 348 + case 'x': 349 + xflg = 1; 350 + break; 351 + case 'k': 352 + keepflag = 1; 353 + break; 354 + case 'P': 355 + max_pacing_rate = atoi(optarg) ; 356 + break; 357 + default: 358 + exit(1); 359 + } 360 + } 361 + if (sflg) { 362 + int fdlisten = socket(cfg_family, SOCK_STREAM, 0); 363 + 364 + if (fdlisten == -1) { 365 + perror("socket"); 366 + exit(1); 367 + } 368 + apply_rcvsnd_buf(fdlisten); 369 + setsockopt(fdlisten, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); 370 + 371 + setup_sockaddr(cfg_family, host, &listenaddr); 372 + 373 + if (mss && 374 + setsockopt(fdlisten, SOL_TCP, TCP_MAXSEG, &mss, sizeof(mss)) == -1) { 375 + perror("setsockopt TCP_MAXSEG"); 376 + exit(1); 377 + } 378 + if (bind(fdlisten, (const struct sockaddr *)&listenaddr, cfg_alen) == -1) { 379 + perror("bind"); 380 + exit(1); 381 + } 382 + if (listen(fdlisten, 128) == -1) { 383 + perror("listen"); 
384 + exit(1); 385 + } 386 + do_accept(fdlisten); 387 + } 388 + buffer = mmap(NULL, chunk_size, PROT_READ | PROT_WRITE, 389 + MAP_PRIVATE | MAP_ANONYMOUS, -1, 0); 390 + if (buffer == (char *)-1) { 391 + perror("mmap"); 392 + exit(1); 393 + } 394 + 395 + fd = socket(AF_INET6, SOCK_STREAM, 0); 396 + if (fd == -1) { 397 + perror("socket"); 398 + exit(1); 399 + } 400 + apply_rcvsnd_buf(fd); 401 + 402 + setup_sockaddr(cfg_family, host, &addr); 403 + 404 + if (mss && 405 + setsockopt(fd, SOL_TCP, TCP_MAXSEG, &mss, sizeof(mss)) == -1) { 406 + perror("setsockopt TCP_MAXSEG"); 407 + exit(1); 408 + } 409 + if (connect(fd, (const struct sockaddr *)&addr, cfg_alen) == -1) { 410 + perror("connect"); 411 + exit(1); 412 + } 413 + if (max_pacing_rate && 414 + setsockopt(fd, SOL_SOCKET, SO_MAX_PACING_RATE, 415 + &max_pacing_rate, sizeof(max_pacing_rate)) == -1) 416 + perror("setsockopt SO_MAX_PACING_RATE"); 417 + 418 + if (zflg && setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, 419 + &on, sizeof(on)) == -1) { 420 + perror("setsockopt SO_ZEROCOPY, (-z option disabled)"); 421 + zflg = 0; 422 + } 423 + while (total < FILE_SZ) { 424 + long wr = FILE_SZ - total; 425 + 426 + if (wr > chunk_size) 427 + wr = chunk_size; 428 + /* Note : we just want to fill the pipe with 0 bytes */ 429 + wr = send(fd, buffer, wr, zflg ? MSG_ZEROCOPY : 0); 430 + if (wr <= 0) 431 + break; 432 + total += wr; 433 + } 434 + close(fd); 435 + munmap(buffer, chunk_size); 436 + return 0; 437 + }