Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux

ipvs: add support for sync threads

Allow master and backup servers to use many threads
for sync traffic. Add the sysctl var "sync_ports" to define the
number of threads. Every thread will use a single UDP port;
thread 0 will use the default port 8848, while the last thread
will use port 8848+sync_ports-1.

The sync traffic for connections is scheduled to many
master threads based on the cp address, but one connection is
always assigned to the same thread to avoid reordering of the
sync messages.

Remove ip_vs_sync_switch_mode because this check
for a sync mode change is still risky. Instead, check for a mode
change under sync_buff_lock.

Make sure the backup socks do not block on reading.

Special thanks to Aleksey Chudov for helping in all tests.

Signed-off-by: Julian Anastasov <ja@ssi.bg>
Tested-by: Aleksey Chudov <aleksey.chudov@gmail.com>
Signed-off-by: Simon Horman <horms@verge.net.au>

+305 -166
+25 -9
include/net/ip_vs.h
··· 784 784 void (*timeout_change)(struct ip_vs_app *app, int flags); 785 785 }; 786 786 787 + struct ipvs_master_sync_state { 788 + struct list_head sync_queue; 789 + struct ip_vs_sync_buff *sync_buff; 790 + int sync_queue_len; 791 + unsigned int sync_queue_delay; 792 + struct task_struct *master_thread; 793 + struct delayed_work master_wakeup_work; 794 + struct netns_ipvs *ipvs; 795 + }; 796 + 787 797 /* IPVS in network namespace */ 788 798 struct netns_ipvs { 789 799 int gen; /* Generation */ ··· 880 870 #endif 881 871 int sysctl_snat_reroute; 882 872 int sysctl_sync_ver; 873 + int sysctl_sync_ports; 883 874 int sysctl_sync_qlen_max; 884 875 int sysctl_sync_sock_size; 885 876 int sysctl_cache_bypass; ··· 904 893 spinlock_t est_lock; 905 894 struct timer_list est_timer; /* Estimation timer */ 906 895 /* ip_vs_sync */ 907 - struct list_head sync_queue; 908 - int sync_queue_len; 909 - unsigned int sync_queue_delay; 910 - struct delayed_work master_wakeup_work; 911 896 spinlock_t sync_lock; 912 - struct ip_vs_sync_buff *sync_buff; 897 + struct ipvs_master_sync_state *ms; 913 898 spinlock_t sync_buff_lock; 914 - struct sockaddr_in sync_mcast_addr; 915 - struct task_struct *master_thread; 916 - struct task_struct *backup_thread; 899 + struct task_struct **backup_threads; 900 + int threads_mask; 917 901 int send_mesg_maxlen; 918 902 int recv_mesg_maxlen; 919 903 volatile int sync_state; ··· 932 926 #define IPVS_SYNC_SEND_DELAY (HZ / 50) 933 927 #define IPVS_SYNC_CHECK_PERIOD HZ 934 928 #define IPVS_SYNC_FLUSH_TIME (HZ * 2) 929 + #define IPVS_SYNC_PORTS_MAX (1 << 6) 935 930 936 931 #ifdef CONFIG_SYSCTL 937 932 ··· 959 952 static inline int sysctl_sync_ver(struct netns_ipvs *ipvs) 960 953 { 961 954 return ipvs->sysctl_sync_ver; 955 + } 956 + 957 + static inline int sysctl_sync_ports(struct netns_ipvs *ipvs) 958 + { 959 + return ACCESS_ONCE(ipvs->sysctl_sync_ports); 962 960 } 963 961 964 962 static inline int sysctl_sync_qlen_max(struct netns_ipvs *ipvs) ··· 1001 989 
static inline int sysctl_sync_ver(struct netns_ipvs *ipvs) 1002 990 { 1003 991 return DEFAULT_SYNC_VER; 992 + } 993 + 994 + static inline int sysctl_sync_ports(struct netns_ipvs *ipvs) 995 + { 996 + return 1; 1004 997 } 1005 998 1006 999 static inline int sysctl_sync_qlen_max(struct netns_ipvs *ipvs) ··· 1257 1240 extern struct ip_vs_stats ip_vs_stats; 1258 1241 extern int sysctl_ip_vs_sync_ver; 1259 1242 1260 - extern void ip_vs_sync_switch_mode(struct net *net, int mode); 1261 1243 extern struct ip_vs_service * 1262 1244 ip_vs_service_get(struct net *net, int af, __u32 fwmark, __u16 protocol, 1263 1245 const union nf_inet_addr *vaddr, __be16 vport);
+7
net/netfilter/ipvs/ip_vs_conn.c
··· 619 619 if (dest) { 620 620 struct ip_vs_proto_data *pd; 621 621 622 + spin_lock(&cp->lock); 623 + if (cp->dest) { 624 + spin_unlock(&cp->lock); 625 + return dest; 626 + } 627 + 622 628 /* Applications work depending on the forwarding method 623 629 * but better to reassign them always when binding dest */ 624 630 if (cp->app) 625 631 ip_vs_unbind_app(cp); 626 632 627 633 ip_vs_bind_dest(cp, dest); 634 + spin_unlock(&cp->lock); 628 635 629 636 /* Update its packet transmitter */ 630 637 cp->packet_xmit = NULL;
+26 -3
net/netfilter/ipvs/ip_vs_ctl.c
··· 1657 1657 if ((*valp < 0) || (*valp > 1)) { 1658 1658 /* Restore the correct value */ 1659 1659 *valp = val; 1660 - } else { 1661 - struct net *net = current->nsproxy->net_ns; 1662 - ip_vs_sync_switch_mode(net, val); 1660 + } 1661 + } 1662 + return rc; 1663 + } 1664 + 1665 + static int 1666 + proc_do_sync_ports(ctl_table *table, int write, 1667 + void __user *buffer, size_t *lenp, loff_t *ppos) 1668 + { 1669 + int *valp = table->data; 1670 + int val = *valp; 1671 + int rc; 1672 + 1673 + rc = proc_dointvec(table, write, buffer, lenp, ppos); 1674 + if (write && (*valp != val)) { 1675 + if (*valp < 1 || !is_power_of_2(*valp)) { 1676 + /* Restore the correct value */ 1677 + *valp = val; 1663 1678 } 1664 1679 } 1665 1680 return rc; ··· 1736 1721 .maxlen = sizeof(int), 1737 1722 .mode = 0644, 1738 1723 .proc_handler = &proc_do_sync_mode, 1724 + }, 1725 + { 1726 + .procname = "sync_ports", 1727 + .maxlen = sizeof(int), 1728 + .mode = 0644, 1729 + .proc_handler = &proc_do_sync_ports, 1739 1730 }, 1740 1731 { 1741 1732 .procname = "sync_qlen_max", ··· 3707 3686 tbl[idx++].data = &ipvs->sysctl_snat_reroute; 3708 3687 ipvs->sysctl_sync_ver = 1; 3709 3688 tbl[idx++].data = &ipvs->sysctl_sync_ver; 3689 + ipvs->sysctl_sync_ports = 1; 3690 + tbl[idx++].data = &ipvs->sysctl_sync_ports; 3710 3691 ipvs->sysctl_sync_qlen_max = nr_free_buffer_pages() / 32; 3711 3692 tbl[idx++].data = &ipvs->sysctl_sync_qlen_max; 3712 3693 ipvs->sysctl_sync_sock_size = 0;
+247 -154
net/netfilter/ipvs/ip_vs_sync.c
··· 196 196 struct net *net; 197 197 struct socket *sock; 198 198 char *buf; 199 + int id; 199 200 }; 200 201 201 202 /* Version 0 definition of packet sizes */ ··· 272 271 unsigned char *end; 273 272 }; 274 273 275 - /* multicast addr */ 276 - static struct sockaddr_in mcast_addr = { 277 - .sin_family = AF_INET, 278 - .sin_port = cpu_to_be16(IP_VS_SYNC_PORT), 279 - .sin_addr.s_addr = cpu_to_be32(IP_VS_SYNC_GROUP), 280 - }; 281 - 282 274 /* 283 275 * Copy of struct ip_vs_seq 284 276 * From unaligned network order to aligned host order ··· 294 300 put_unaligned_be32(ho->previous_delta, &no->previous_delta); 295 301 } 296 302 297 - static inline struct ip_vs_sync_buff *sb_dequeue(struct netns_ipvs *ipvs) 303 + static inline struct ip_vs_sync_buff * 304 + sb_dequeue(struct netns_ipvs *ipvs, struct ipvs_master_sync_state *ms) 298 305 { 299 306 struct ip_vs_sync_buff *sb; 300 307 301 308 spin_lock_bh(&ipvs->sync_lock); 302 - if (list_empty(&ipvs->sync_queue)) { 309 + if (list_empty(&ms->sync_queue)) { 303 310 sb = NULL; 304 311 __set_current_state(TASK_INTERRUPTIBLE); 305 312 } else { 306 - sb = list_entry(ipvs->sync_queue.next, 307 - struct ip_vs_sync_buff, 313 + sb = list_entry(ms->sync_queue.next, struct ip_vs_sync_buff, 308 314 list); 309 315 list_del(&sb->list); 310 - ipvs->sync_queue_len--; 311 - if (!ipvs->sync_queue_len) 312 - ipvs->sync_queue_delay = 0; 316 + ms->sync_queue_len--; 317 + if (!ms->sync_queue_len) 318 + ms->sync_queue_delay = 0; 313 319 } 314 320 spin_unlock_bh(&ipvs->sync_lock); 315 321 ··· 332 338 kfree(sb); 333 339 return NULL; 334 340 } 335 - sb->mesg->reserved = 0; /* old nr_conns i.e. must be zeo now */ 341 + sb->mesg->reserved = 0; /* old nr_conns i.e. 
must be zero now */ 336 342 sb->mesg->version = SYNC_PROTO_VER; 337 343 sb->mesg->syncid = ipvs->master_syncid; 338 344 sb->mesg->size = sizeof(struct ip_vs_sync_mesg); ··· 351 357 kfree(sb); 352 358 } 353 359 354 - static inline void sb_queue_tail(struct netns_ipvs *ipvs) 360 + static inline void sb_queue_tail(struct netns_ipvs *ipvs, 361 + struct ipvs_master_sync_state *ms) 355 362 { 356 - struct ip_vs_sync_buff *sb = ipvs->sync_buff; 363 + struct ip_vs_sync_buff *sb = ms->sync_buff; 357 364 358 365 spin_lock(&ipvs->sync_lock); 359 366 if (ipvs->sync_state & IP_VS_STATE_MASTER && 360 - ipvs->sync_queue_len < sysctl_sync_qlen_max(ipvs)) { 361 - if (!ipvs->sync_queue_len) 362 - schedule_delayed_work(&ipvs->master_wakeup_work, 367 + ms->sync_queue_len < sysctl_sync_qlen_max(ipvs)) { 368 + if (!ms->sync_queue_len) 369 + schedule_delayed_work(&ms->master_wakeup_work, 363 370 max(IPVS_SYNC_SEND_DELAY, 1)); 364 - ipvs->sync_queue_len++; 365 - list_add_tail(&sb->list, &ipvs->sync_queue); 366 - if ((++ipvs->sync_queue_delay) == IPVS_SYNC_WAKEUP_RATE) 367 - wake_up_process(ipvs->master_thread); 371 + ms->sync_queue_len++; 372 + list_add_tail(&sb->list, &ms->sync_queue); 373 + if ((++ms->sync_queue_delay) == IPVS_SYNC_WAKEUP_RATE) 374 + wake_up_process(ms->master_thread); 368 375 } else 369 376 ip_vs_sync_buff_release(sb); 370 377 spin_unlock(&ipvs->sync_lock); ··· 376 381 * than the specified time or the specified time is zero. 
377 382 */ 378 383 static inline struct ip_vs_sync_buff * 379 - get_curr_sync_buff(struct netns_ipvs *ipvs, unsigned long time) 384 + get_curr_sync_buff(struct netns_ipvs *ipvs, struct ipvs_master_sync_state *ms, 385 + unsigned long time) 380 386 { 381 387 struct ip_vs_sync_buff *sb; 382 388 383 389 spin_lock_bh(&ipvs->sync_buff_lock); 384 - if (ipvs->sync_buff && 385 - time_after_eq(jiffies - ipvs->sync_buff->firstuse, time)) { 386 - sb = ipvs->sync_buff; 387 - ipvs->sync_buff = NULL; 390 + sb = ms->sync_buff; 391 + if (sb && time_after_eq(jiffies - sb->firstuse, time)) { 392 + ms->sync_buff = NULL; 388 393 __set_current_state(TASK_RUNNING); 389 394 } else 390 395 sb = NULL; ··· 392 397 return sb; 393 398 } 394 399 395 - /* 396 - * Switch mode from sending version 0 or 1 397 - * - must handle sync_buf 398 - */ 399 - void ip_vs_sync_switch_mode(struct net *net, int mode) 400 + static inline int 401 + select_master_thread_id(struct netns_ipvs *ipvs, struct ip_vs_conn *cp) 400 402 { 401 - struct netns_ipvs *ipvs = net_ipvs(net); 402 - struct ip_vs_sync_buff *sb; 403 - 404 - spin_lock_bh(&ipvs->sync_buff_lock); 405 - if (!(ipvs->sync_state & IP_VS_STATE_MASTER)) 406 - goto unlock; 407 - sb = ipvs->sync_buff; 408 - if (mode == sysctl_sync_ver(ipvs) || !sb) 409 - goto unlock; 410 - 411 - /* Buffer empty ? 
then let buf_create do the job */ 412 - if (sb->mesg->size <= sizeof(struct ip_vs_sync_mesg)) { 413 - ip_vs_sync_buff_release(sb); 414 - ipvs->sync_buff = NULL; 415 - } else 416 - sb_queue_tail(ipvs); 417 - 418 - unlock: 419 - spin_unlock_bh(&ipvs->sync_buff_lock); 403 + return ((long) cp >> (1 + ilog2(sizeof(*cp)))) & ipvs->threads_mask; 420 404 } 421 405 422 406 /* ··· 517 543 struct netns_ipvs *ipvs = net_ipvs(net); 518 544 struct ip_vs_sync_mesg_v0 *m; 519 545 struct ip_vs_sync_conn_v0 *s; 546 + struct ip_vs_sync_buff *buff; 547 + struct ipvs_master_sync_state *ms; 548 + int id; 520 549 int len; 521 550 522 551 if (unlikely(cp->af != AF_INET)) ··· 532 555 return; 533 556 534 557 spin_lock(&ipvs->sync_buff_lock); 535 - if (!ipvs->sync_buff) { 536 - ipvs->sync_buff = 537 - ip_vs_sync_buff_create_v0(ipvs); 538 - if (!ipvs->sync_buff) { 558 + if (!(ipvs->sync_state & IP_VS_STATE_MASTER)) { 559 + spin_unlock(&ipvs->sync_buff_lock); 560 + return; 561 + } 562 + 563 + id = select_master_thread_id(ipvs, cp); 564 + ms = &ipvs->ms[id]; 565 + buff = ms->sync_buff; 566 + if (buff) { 567 + m = (struct ip_vs_sync_mesg_v0 *) buff->mesg; 568 + /* Send buffer if it is for v1 */ 569 + if (!m->nr_conns) { 570 + sb_queue_tail(ipvs, ms); 571 + ms->sync_buff = NULL; 572 + buff = NULL; 573 + } 574 + } 575 + if (!buff) { 576 + buff = ip_vs_sync_buff_create_v0(ipvs); 577 + if (!buff) { 539 578 spin_unlock(&ipvs->sync_buff_lock); 540 579 pr_err("ip_vs_sync_buff_create failed.\n"); 541 580 return; 542 581 } 582 + ms->sync_buff = buff; 543 583 } 544 584 545 585 len = (cp->flags & IP_VS_CONN_F_SEQ_MASK) ? 
FULL_CONN_SIZE : 546 586 SIMPLE_CONN_SIZE; 547 - m = (struct ip_vs_sync_mesg_v0 *)ipvs->sync_buff->mesg; 548 - s = (struct ip_vs_sync_conn_v0 *)ipvs->sync_buff->head; 587 + m = (struct ip_vs_sync_mesg_v0 *) buff->mesg; 588 + s = (struct ip_vs_sync_conn_v0 *) buff->head; 549 589 550 590 /* copy members */ 551 591 s->reserved = 0; ··· 583 589 584 590 m->nr_conns++; 585 591 m->size += len; 586 - ipvs->sync_buff->head += len; 592 + buff->head += len; 587 593 588 594 /* check if there is a space for next one */ 589 - if (ipvs->sync_buff->head + FULL_CONN_SIZE > ipvs->sync_buff->end) { 590 - sb_queue_tail(ipvs); 591 - ipvs->sync_buff = NULL; 595 + if (buff->head + FULL_CONN_SIZE > buff->end) { 596 + sb_queue_tail(ipvs, ms); 597 + ms->sync_buff = NULL; 592 598 } 593 599 spin_unlock(&ipvs->sync_buff_lock); 594 600 ··· 613 619 struct netns_ipvs *ipvs = net_ipvs(net); 614 620 struct ip_vs_sync_mesg *m; 615 621 union ip_vs_sync_conn *s; 622 + struct ip_vs_sync_buff *buff; 623 + struct ipvs_master_sync_state *ms; 624 + int id; 616 625 __u8 *p; 617 626 unsigned int len, pe_name_len, pad; 618 627 ··· 642 645 } 643 646 644 647 spin_lock(&ipvs->sync_buff_lock); 648 + if (!(ipvs->sync_state & IP_VS_STATE_MASTER)) { 649 + spin_unlock(&ipvs->sync_buff_lock); 650 + return; 651 + } 652 + 653 + id = select_master_thread_id(ipvs, cp); 654 + ms = &ipvs->ms[id]; 645 655 646 656 #ifdef CONFIG_IP_VS_IPV6 647 657 if (cp->af == AF_INET6) ··· 667 663 668 664 /* check if there is a space for this one */ 669 665 pad = 0; 670 - if (ipvs->sync_buff) { 671 - pad = (4 - (size_t)ipvs->sync_buff->head) & 3; 672 - if (ipvs->sync_buff->head + len + pad > ipvs->sync_buff->end) { 673 - sb_queue_tail(ipvs); 674 - ipvs->sync_buff = NULL; 666 + buff = ms->sync_buff; 667 + if (buff) { 668 + m = buff->mesg; 669 + pad = (4 - (size_t) buff->head) & 3; 670 + /* Send buffer if it is for v0 */ 671 + if (buff->head + len + pad > buff->end || m->reserved) { 672 + sb_queue_tail(ipvs, ms); 673 + ms->sync_buff = NULL; 
674 + buff = NULL; 675 675 pad = 0; 676 676 } 677 677 } 678 678 679 - if (!ipvs->sync_buff) { 680 - ipvs->sync_buff = ip_vs_sync_buff_create(ipvs); 681 - if (!ipvs->sync_buff) { 679 + if (!buff) { 680 + buff = ip_vs_sync_buff_create(ipvs); 681 + if (!buff) { 682 682 spin_unlock(&ipvs->sync_buff_lock); 683 683 pr_err("ip_vs_sync_buff_create failed.\n"); 684 684 return; 685 685 } 686 + ms->sync_buff = buff; 687 + m = buff->mesg; 686 688 } 687 689 688 - m = ipvs->sync_buff->mesg; 689 - p = ipvs->sync_buff->head; 690 - ipvs->sync_buff->head += pad + len; 690 + p = buff->head; 691 + buff->head += pad + len; 691 692 m->size += pad + len; 692 693 /* Add ev. padding from prev. sync_conn */ 693 694 while (pad--) ··· 843 834 kfree(param->pe_data); 844 835 845 836 dest = cp->dest; 837 + spin_lock(&cp->lock); 846 838 if ((cp->flags ^ flags) & IP_VS_CONN_F_INACTIVE && 847 839 !(flags & IP_VS_CONN_F_TEMPLATE) && dest) { 848 840 if (flags & IP_VS_CONN_F_INACTIVE) { ··· 857 847 flags &= IP_VS_CONN_F_BACKUP_UPD_MASK; 858 848 flags |= cp->flags & ~IP_VS_CONN_F_BACKUP_UPD_MASK; 859 849 cp->flags = flags; 850 + spin_unlock(&cp->lock); 860 851 if (!dest) { 861 852 dest = ip_vs_try_bind_dest(cp); 862 853 if (dest) ··· 1410 1399 /* 1411 1400 * Set up sending multicast socket over UDP 1412 1401 */ 1413 - static struct socket *make_send_sock(struct net *net) 1402 + static struct socket *make_send_sock(struct net *net, int id) 1414 1403 { 1415 1404 struct netns_ipvs *ipvs = net_ipvs(net); 1405 + /* multicast addr */ 1406 + struct sockaddr_in mcast_addr = { 1407 + .sin_family = AF_INET, 1408 + .sin_port = cpu_to_be16(IP_VS_SYNC_PORT + id), 1409 + .sin_addr.s_addr = cpu_to_be32(IP_VS_SYNC_GROUP), 1410 + }; 1416 1411 struct socket *sock; 1417 1412 int result; 1418 1413 ··· 1470 1453 /* 1471 1454 * Set up receiving multicast socket over UDP 1472 1455 */ 1473 - static struct socket *make_receive_sock(struct net *net) 1456 + static struct socket *make_receive_sock(struct net *net, int id) 1474 
1457 { 1475 1458 struct netns_ipvs *ipvs = net_ipvs(net); 1459 + /* multicast addr */ 1460 + struct sockaddr_in mcast_addr = { 1461 + .sin_family = AF_INET, 1462 + .sin_port = cpu_to_be16(IP_VS_SYNC_PORT + id), 1463 + .sin_addr.s_addr = cpu_to_be32(IP_VS_SYNC_GROUP), 1464 + }; 1476 1465 struct socket *sock; 1477 1466 int result; 1478 1467 ··· 1572 1549 iov.iov_base = buffer; 1573 1550 iov.iov_len = (size_t)buflen; 1574 1551 1575 - len = kernel_recvmsg(sock, &msg, &iov, 1, buflen, 0); 1552 + len = kernel_recvmsg(sock, &msg, &iov, 1, buflen, MSG_DONTWAIT); 1576 1553 1577 1554 if (len < 0) 1578 - return -1; 1555 + return len; 1579 1556 1580 1557 LeaveFunction(7); 1581 1558 return len; ··· 1584 1561 /* Wakeup the master thread for sending */ 1585 1562 static void master_wakeup_work_handler(struct work_struct *work) 1586 1563 { 1587 - struct netns_ipvs *ipvs = container_of(work, struct netns_ipvs, 1588 - master_wakeup_work.work); 1564 + struct ipvs_master_sync_state *ms = 1565 + container_of(work, struct ipvs_master_sync_state, 1566 + master_wakeup_work.work); 1567 + struct netns_ipvs *ipvs = ms->ipvs; 1589 1568 1590 1569 spin_lock_bh(&ipvs->sync_lock); 1591 - if (ipvs->sync_queue_len && 1592 - ipvs->sync_queue_delay < IPVS_SYNC_WAKEUP_RATE) { 1593 - ipvs->sync_queue_delay = IPVS_SYNC_WAKEUP_RATE; 1594 - wake_up_process(ipvs->master_thread); 1570 + if (ms->sync_queue_len && 1571 + ms->sync_queue_delay < IPVS_SYNC_WAKEUP_RATE) { 1572 + ms->sync_queue_delay = IPVS_SYNC_WAKEUP_RATE; 1573 + wake_up_process(ms->master_thread); 1595 1574 } 1596 1575 spin_unlock_bh(&ipvs->sync_lock); 1597 1576 } 1598 1577 1599 1578 /* Get next buffer to send */ 1600 1579 static inline struct ip_vs_sync_buff * 1601 - next_sync_buff(struct netns_ipvs *ipvs) 1580 + next_sync_buff(struct netns_ipvs *ipvs, struct ipvs_master_sync_state *ms) 1602 1581 { 1603 1582 struct ip_vs_sync_buff *sb; 1604 1583 1605 - sb = sb_dequeue(ipvs); 1584 + sb = sb_dequeue(ipvs, ms); 1606 1585 if (sb) 1607 1586 return 
sb; 1608 1587 /* Do not delay entries in buffer for more than 2 seconds */ 1609 - return get_curr_sync_buff(ipvs, IPVS_SYNC_FLUSH_TIME); 1588 + return get_curr_sync_buff(ipvs, ms, IPVS_SYNC_FLUSH_TIME); 1610 1589 } 1611 1590 1612 1591 static int sync_thread_master(void *data) 1613 1592 { 1614 1593 struct ip_vs_sync_thread_data *tinfo = data; 1615 1594 struct netns_ipvs *ipvs = net_ipvs(tinfo->net); 1595 + struct ipvs_master_sync_state *ms = &ipvs->ms[tinfo->id]; 1616 1596 struct sock *sk = tinfo->sock->sk; 1617 1597 struct ip_vs_sync_buff *sb; 1618 1598 1619 1599 pr_info("sync thread started: state = MASTER, mcast_ifn = %s, " 1620 - "syncid = %d\n", 1621 - ipvs->master_mcast_ifn, ipvs->master_syncid); 1600 + "syncid = %d, id = %d\n", 1601 + ipvs->master_mcast_ifn, ipvs->master_syncid, tinfo->id); 1622 1602 1623 1603 for (;;) { 1624 - sb = next_sync_buff(ipvs); 1604 + sb = next_sync_buff(ipvs, ms); 1625 1605 if (unlikely(kthread_should_stop())) 1626 1606 break; 1627 1607 if (!sb) { ··· 1650 1624 ip_vs_sync_buff_release(sb); 1651 1625 1652 1626 /* clean up the sync_buff queue */ 1653 - while ((sb = sb_dequeue(ipvs))) 1627 + while ((sb = sb_dequeue(ipvs, ms))) 1654 1628 ip_vs_sync_buff_release(sb); 1655 1629 __set_current_state(TASK_RUNNING); 1656 1630 1657 1631 /* clean up the current sync_buff */ 1658 - sb = get_curr_sync_buff(ipvs, 0); 1632 + sb = get_curr_sync_buff(ipvs, ms, 0); 1659 1633 if (sb) 1660 1634 ip_vs_sync_buff_release(sb); 1661 1635 ··· 1674 1648 int len; 1675 1649 1676 1650 pr_info("sync thread started: state = BACKUP, mcast_ifn = %s, " 1677 - "syncid = %d\n", 1678 - ipvs->backup_mcast_ifn, ipvs->backup_syncid); 1651 + "syncid = %d, id = %d\n", 1652 + ipvs->backup_mcast_ifn, ipvs->backup_syncid, tinfo->id); 1679 1653 1680 1654 while (!kthread_should_stop()) { 1681 1655 wait_event_interruptible(*sk_sleep(tinfo->sock->sk), ··· 1687 1661 len = ip_vs_receive(tinfo->sock, tinfo->buf, 1688 1662 ipvs->recv_mesg_maxlen); 1689 1663 if (len <= 0) { 1690 - 
pr_err("receiving message error\n"); 1664 + if (len != -EAGAIN) 1665 + pr_err("receiving message error\n"); 1691 1666 break; 1692 1667 } 1693 1668 ··· 1712 1685 int start_sync_thread(struct net *net, int state, char *mcast_ifn, __u8 syncid) 1713 1686 { 1714 1687 struct ip_vs_sync_thread_data *tinfo; 1715 - struct task_struct **realtask, *task; 1688 + struct task_struct **array = NULL, *task; 1716 1689 struct socket *sock; 1717 1690 struct netns_ipvs *ipvs = net_ipvs(net); 1718 - char *name, *buf = NULL; 1691 + char *name; 1719 1692 int (*threadfn)(void *data); 1693 + int id, count; 1720 1694 int result = -ENOMEM; 1721 1695 1722 1696 IP_VS_DBG(7, "%s(): pid %d\n", __func__, task_pid_nr(current)); 1723 1697 IP_VS_DBG(7, "Each ip_vs_sync_conn entry needs %Zd bytes\n", 1724 1698 sizeof(struct ip_vs_sync_conn_v0)); 1725 1699 1700 + if (!ipvs->sync_state) { 1701 + count = clamp(sysctl_sync_ports(ipvs), 1, IPVS_SYNC_PORTS_MAX); 1702 + ipvs->threads_mask = count - 1; 1703 + } else 1704 + count = ipvs->threads_mask + 1; 1726 1705 1727 1706 if (state == IP_VS_STATE_MASTER) { 1728 - if (ipvs->master_thread) 1707 + if (ipvs->ms) 1729 1708 return -EEXIST; 1730 1709 1731 1710 strlcpy(ipvs->master_mcast_ifn, mcast_ifn, 1732 1711 sizeof(ipvs->master_mcast_ifn)); 1733 1712 ipvs->master_syncid = syncid; 1734 - realtask = &ipvs->master_thread; 1735 - name = "ipvs_master:%d"; 1713 + name = "ipvs-m:%d:%d"; 1736 1714 threadfn = sync_thread_master; 1737 - ipvs->sync_queue_len = 0; 1738 - ipvs->sync_queue_delay = 0; 1739 - INIT_DELAYED_WORK(&ipvs->master_wakeup_work, 1740 - master_wakeup_work_handler); 1741 - sock = make_send_sock(net); 1742 1715 } else if (state == IP_VS_STATE_BACKUP) { 1743 - if (ipvs->backup_thread) 1716 + if (ipvs->backup_threads) 1744 1717 return -EEXIST; 1745 1718 1746 1719 strlcpy(ipvs->backup_mcast_ifn, mcast_ifn, 1747 1720 sizeof(ipvs->backup_mcast_ifn)); 1748 1721 ipvs->backup_syncid = syncid; 1749 - realtask = &ipvs->backup_thread; 1750 - name = 
"ipvs_backup:%d"; 1722 + name = "ipvs-b:%d:%d"; 1751 1723 threadfn = sync_thread_backup; 1752 - sock = make_receive_sock(net); 1753 1724 } else { 1754 1725 return -EINVAL; 1755 1726 } 1756 1727 1757 - if (IS_ERR(sock)) { 1758 - result = PTR_ERR(sock); 1759 - goto out; 1760 - } 1728 + if (state == IP_VS_STATE_MASTER) { 1729 + struct ipvs_master_sync_state *ms; 1761 1730 1731 + ipvs->ms = kzalloc(count * sizeof(ipvs->ms[0]), GFP_KERNEL); 1732 + if (!ipvs->ms) 1733 + goto out; 1734 + ms = ipvs->ms; 1735 + for (id = 0; id < count; id++, ms++) { 1736 + INIT_LIST_HEAD(&ms->sync_queue); 1737 + ms->sync_queue_len = 0; 1738 + ms->sync_queue_delay = 0; 1739 + INIT_DELAYED_WORK(&ms->master_wakeup_work, 1740 + master_wakeup_work_handler); 1741 + ms->ipvs = ipvs; 1742 + } 1743 + } else { 1744 + array = kzalloc(count * sizeof(struct task_struct *), 1745 + GFP_KERNEL); 1746 + if (!array) 1747 + goto out; 1748 + } 1762 1749 set_sync_mesg_maxlen(net, state); 1763 - if (state == IP_VS_STATE_BACKUP) { 1764 - buf = kmalloc(ipvs->recv_mesg_maxlen, GFP_KERNEL); 1765 - if (!buf) 1750 + 1751 + tinfo = NULL; 1752 + for (id = 0; id < count; id++) { 1753 + if (state == IP_VS_STATE_MASTER) 1754 + sock = make_send_sock(net, id); 1755 + else 1756 + sock = make_receive_sock(net, id); 1757 + if (IS_ERR(sock)) { 1758 + result = PTR_ERR(sock); 1759 + goto outtinfo; 1760 + } 1761 + tinfo = kmalloc(sizeof(*tinfo), GFP_KERNEL); 1762 + if (!tinfo) 1766 1763 goto outsocket; 1767 - } 1764 + tinfo->net = net; 1765 + tinfo->sock = sock; 1766 + if (state == IP_VS_STATE_BACKUP) { 1767 + tinfo->buf = kmalloc(ipvs->recv_mesg_maxlen, 1768 + GFP_KERNEL); 1769 + if (!tinfo->buf) 1770 + goto outtinfo; 1771 + } 1772 + tinfo->id = id; 1768 1773 1769 - tinfo = kmalloc(sizeof(*tinfo), GFP_KERNEL); 1770 - if (!tinfo) 1771 - goto outbuf; 1772 - 1773 - tinfo->net = net; 1774 - tinfo->sock = sock; 1775 - tinfo->buf = buf; 1776 - 1777 - task = kthread_run(threadfn, tinfo, name, ipvs->gen); 1778 - if (IS_ERR(task)) { 1779 - 
result = PTR_ERR(task); 1780 - goto outtinfo; 1774 + task = kthread_run(threadfn, tinfo, name, ipvs->gen, id); 1775 + if (IS_ERR(task)) { 1776 + result = PTR_ERR(task); 1777 + goto outtinfo; 1778 + } 1779 + tinfo = NULL; 1780 + if (state == IP_VS_STATE_MASTER) 1781 + ipvs->ms[id].master_thread = task; 1782 + else 1783 + array[id] = task; 1781 1784 } 1782 1785 1783 1786 /* mark as active */ 1784 - *realtask = task; 1787 + 1788 + if (state == IP_VS_STATE_BACKUP) 1789 + ipvs->backup_threads = array; 1790 + spin_lock_bh(&ipvs->sync_buff_lock); 1785 1791 ipvs->sync_state |= state; 1792 + spin_unlock_bh(&ipvs->sync_buff_lock); 1786 1793 1787 1794 /* increase the module use count */ 1788 1795 ip_vs_use_count_inc(); 1789 1796 1790 1797 return 0; 1791 1798 1792 - outtinfo: 1793 - kfree(tinfo); 1794 - outbuf: 1795 - kfree(buf); 1796 1799 outsocket: 1797 1800 sk_release_kernel(sock->sk); 1801 + 1802 + outtinfo: 1803 + if (tinfo) { 1804 + sk_release_kernel(tinfo->sock->sk); 1805 + kfree(tinfo->buf); 1806 + kfree(tinfo); 1807 + } 1808 + count = id; 1809 + while (count-- > 0) { 1810 + if (state == IP_VS_STATE_MASTER) 1811 + kthread_stop(ipvs->ms[count].master_thread); 1812 + else 1813 + kthread_stop(array[count]); 1814 + } 1815 + kfree(array); 1816 + 1798 1817 out: 1818 + if (!(ipvs->sync_state & IP_VS_STATE_MASTER)) { 1819 + kfree(ipvs->ms); 1820 + ipvs->ms = NULL; 1821 + } 1799 1822 return result; 1800 1823 } 1801 1824 ··· 1853 1776 int stop_sync_thread(struct net *net, int state) 1854 1777 { 1855 1778 struct netns_ipvs *ipvs = net_ipvs(net); 1779 + struct task_struct **array; 1780 + int id; 1856 1781 int retc = -EINVAL; 1857 1782 1858 1783 IP_VS_DBG(7, "%s(): pid %d\n", __func__, task_pid_nr(current)); 1859 1784 1860 1785 if (state == IP_VS_STATE_MASTER) { 1861 - if (!ipvs->master_thread) 1786 + if (!ipvs->ms) 1862 1787 return -ESRCH; 1863 - 1864 - pr_info("stopping master sync thread %d ...\n", 1865 - task_pid_nr(ipvs->master_thread)); 1866 1788 1867 1789 /* 1868 1790 * The 
lock synchronizes with sb_queue_tail(), so that we don't ··· 1869 1793 * progress of stopping the master sync daemon. 1870 1794 */ 1871 1795 1872 - spin_lock_bh(&ipvs->sync_lock); 1796 + spin_lock_bh(&ipvs->sync_buff_lock); 1797 + spin_lock(&ipvs->sync_lock); 1873 1798 ipvs->sync_state &= ~IP_VS_STATE_MASTER; 1874 - spin_unlock_bh(&ipvs->sync_lock); 1875 - cancel_delayed_work_sync(&ipvs->master_wakeup_work); 1876 - retc = kthread_stop(ipvs->master_thread); 1877 - ipvs->master_thread = NULL; 1799 + spin_unlock(&ipvs->sync_lock); 1800 + spin_unlock_bh(&ipvs->sync_buff_lock); 1801 + 1802 + retc = 0; 1803 + for (id = ipvs->threads_mask; id >= 0; id--) { 1804 + struct ipvs_master_sync_state *ms = &ipvs->ms[id]; 1805 + int ret; 1806 + 1807 + pr_info("stopping master sync thread %d ...\n", 1808 + task_pid_nr(ms->master_thread)); 1809 + cancel_delayed_work_sync(&ms->master_wakeup_work); 1810 + ret = kthread_stop(ms->master_thread); 1811 + if (retc >= 0) 1812 + retc = ret; 1813 + } 1814 + kfree(ipvs->ms); 1815 + ipvs->ms = NULL; 1878 1816 } else if (state == IP_VS_STATE_BACKUP) { 1879 - if (!ipvs->backup_thread) 1817 + if (!ipvs->backup_threads) 1880 1818 return -ESRCH; 1881 1819 1882 - pr_info("stopping backup sync thread %d ...\n", 1883 - task_pid_nr(ipvs->backup_thread)); 1884 - 1885 1820 ipvs->sync_state &= ~IP_VS_STATE_BACKUP; 1886 - retc = kthread_stop(ipvs->backup_thread); 1887 - ipvs->backup_thread = NULL; 1821 + array = ipvs->backup_threads; 1822 + retc = 0; 1823 + for (id = ipvs->threads_mask; id >= 0; id--) { 1824 + int ret; 1825 + 1826 + pr_info("stopping backup sync thread %d ...\n", 1827 + task_pid_nr(array[id])); 1828 + ret = kthread_stop(array[id]); 1829 + if (retc >= 0) 1830 + retc = ret; 1831 + } 1832 + kfree(array); 1833 + ipvs->backup_threads = NULL; 1888 1834 } 1889 1835 1890 1836 /* decrease the module use count */ ··· 1923 1825 struct netns_ipvs *ipvs = net_ipvs(net); 1924 1826 1925 1827 __mutex_init(&ipvs->sync_mutex, "ipvs->sync_mutex", 
&__ipvs_sync_key); 1926 - INIT_LIST_HEAD(&ipvs->sync_queue); 1927 1828 spin_lock_init(&ipvs->sync_lock); 1928 1829 spin_lock_init(&ipvs->sync_buff_lock); 1929 - 1930 - ipvs->sync_mcast_addr.sin_family = AF_INET; 1931 - ipvs->sync_mcast_addr.sin_port = cpu_to_be16(IP_VS_SYNC_PORT); 1932 - ipvs->sync_mcast_addr.sin_addr.s_addr = cpu_to_be32(IP_VS_SYNC_GROUP); 1933 1830 return 0; 1934 1831 } 1935 1832