Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux

tipc: clean up socket layer message reception

When a message is received in a socket, one of the call chains
tipc_sk_rcv()->tipc_sk_enqueue()->filter_rcv()(->tipc_sk_proto_rcv())
or
tipc_sk_backlog_rcv()->filter_rcv()(->tipc_sk_proto_rcv())
is followed. At each of these levels we may encounter situations
where the message may need to be rejected, or a new message
produced for transfer back to the sender. Despite recent
improvements, the current code for doing this is perceived
as awkward and hard to follow.

Leveraging the two previous commits in this series, we now
introduce a more uniform handling of such situations. We
let each of the functions in the chain itself produce/reverse
the message to be returned to the sender, but also perform the
actual forwarding. This simplifies the necessary logic within
each function.

Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>

authored by

Jon Paul Maloy and committed by
David S. Miller
cda3696d bcd3ffd4

+134 -146
+9 -11
net/tipc/msg.c
··· 520 520 /** 521 521 * tipc_msg_lookup_dest(): try to find new destination for named message 522 522 * @skb: the buffer containing the message. 523 - * @dnode: return value: next-hop node, if destination found 524 - * @err: return value: error code to use, if message to be rejected 523 + * @err: error code to be used by caller if lookup fails 525 524 * Does not consume buffer 526 525 * Returns true if a destination is found, false otherwise 527 526 */ 528 - bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, 529 - u32 *dnode, int *err) 527 + bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err) 530 528 { 531 529 struct tipc_msg *msg = buf_msg(skb); 532 - u32 dport; 533 - u32 own_addr = tipc_own_addr(net); 530 + u32 dport, dnode; 531 + u32 onode = tipc_own_addr(net); 534 532 535 533 if (!msg_isdata(msg)) 536 534 return false; ··· 541 543 return false; 542 544 if (msg_reroute_cnt(msg)) 543 545 return false; 544 - *dnode = addr_domain(net, msg_lookup_scope(msg)); 546 + dnode = addr_domain(net, msg_lookup_scope(msg)); 545 547 dport = tipc_nametbl_translate(net, msg_nametype(msg), 546 - msg_nameinst(msg), dnode); 548 + msg_nameinst(msg), &dnode); 547 549 if (!dport) 548 550 return false; 549 551 msg_incr_reroute_cnt(msg); 550 - if (*dnode != own_addr) 551 - msg_set_prevnode(msg, own_addr); 552 - msg_set_destnode(msg, *dnode); 552 + if (dnode != onode) 553 + msg_set_prevnode(msg, onode); 554 + msg_set_destnode(msg, dnode); 553 555 msg_set_destport(msg, dport); 554 556 *err = TIPC_OK; 555 557 return true;
+1 -2
net/tipc/msg.h
··· 798 798 bool tipc_msg_extract(struct sk_buff *skb, struct sk_buff **iskb, int *pos); 799 799 int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m, 800 800 int offset, int dsz, int mtu, struct sk_buff_head *list); 801 - bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, u32 *dnode, 802 - int *err); 801 + bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err); 803 802 struct sk_buff *tipc_msg_reassemble(struct sk_buff_head *list); 804 803 805 804 static inline u16 buf_seqno(struct sk_buff *skb)
+123 -132
net/tipc/socket.c
··· 1520 1520 * @tsk: TIPC socket 1521 1521 * @skb: pointer to message buffer. Set to NULL if buffer is consumed 1522 1522 * 1523 - * Returns 0 (TIPC_OK) if everything ok, -TIPC_ERR_NO_PORT otherwise 1523 + * Returns true if everything ok, false otherwise 1524 1524 */ 1525 - static int filter_connect(struct tipc_sock *tsk, struct sk_buff **skb) 1525 + static bool filter_connect(struct tipc_sock *tsk, struct sk_buff *skb) 1526 1526 { 1527 1527 struct sock *sk = &tsk->sk; 1528 1528 struct net *net = sock_net(sk); 1529 1529 struct socket *sock = sk->sk_socket; 1530 - struct tipc_msg *msg = buf_msg(*skb); 1531 - int retval = -TIPC_ERR_NO_PORT; 1530 + struct tipc_msg *hdr = buf_msg(skb); 1532 1531 1533 - if (msg_mcast(msg)) 1534 - return retval; 1532 + if (unlikely(msg_mcast(hdr))) 1533 + return false; 1535 1534 1536 1535 switch ((int)sock->state) { 1537 1536 case SS_CONNECTED: 1537 + 1538 1538 /* Accept only connection-based messages sent by peer */ 1539 - if (tsk_peer_msg(tsk, msg)) { 1540 - if (unlikely(msg_errcode(msg))) { 1541 - sock->state = SS_DISCONNECTING; 1542 - tsk->connected = 0; 1543 - /* let timer expire on it's own */ 1544 - tipc_node_remove_conn(net, tsk_peer_node(tsk), 1545 - tsk->portid); 1546 - } 1547 - retval = TIPC_OK; 1539 + if (unlikely(!tsk_peer_msg(tsk, hdr))) 1540 + return false; 1541 + 1542 + if (unlikely(msg_errcode(hdr))) { 1543 + sock->state = SS_DISCONNECTING; 1544 + tsk->connected = 0; 1545 + /* Let timer expire on it's own */ 1546 + tipc_node_remove_conn(net, tsk_peer_node(tsk), 1547 + tsk->portid); 1548 1548 } 1549 - break; 1549 + return true; 1550 + 1550 1551 case SS_CONNECTING: 1552 + 1551 1553 /* Accept only ACK or NACK message */ 1554 + if (unlikely(!msg_connected(hdr))) 1555 + return false; 1552 1556 1553 - if (unlikely(!msg_connected(msg))) 1554 - break; 1555 - 1556 - if (unlikely(msg_errcode(msg))) { 1557 + if (unlikely(msg_errcode(hdr))) { 1557 1558 sock->state = SS_DISCONNECTING; 1558 1559 sk->sk_err = ECONNREFUSED; 1559 - 
retval = TIPC_OK; 1560 - break; 1560 + return true; 1561 1561 } 1562 1562 1563 - if (unlikely(msg_importance(msg) > TIPC_CRITICAL_IMPORTANCE)) { 1563 + if (unlikely(!msg_isdata(hdr))) { 1564 1564 sock->state = SS_DISCONNECTING; 1565 1565 sk->sk_err = EINVAL; 1566 - retval = TIPC_OK; 1567 - break; 1566 + return true; 1568 1567 } 1569 1568 1570 - tipc_sk_finish_conn(tsk, msg_origport(msg), msg_orignode(msg)); 1571 - msg_set_importance(&tsk->phdr, msg_importance(msg)); 1569 + tipc_sk_finish_conn(tsk, msg_origport(hdr), msg_orignode(hdr)); 1570 + msg_set_importance(&tsk->phdr, msg_importance(hdr)); 1572 1571 sock->state = SS_CONNECTED; 1573 1572 1574 - /* If an incoming message is an 'ACK-', it should be 1575 - * discarded here because it doesn't contain useful 1576 - * data. In addition, we should try to wake up 1577 - * connect() routine if sleeping. 1578 - */ 1579 - if (msg_data_sz(msg) == 0) { 1580 - kfree_skb(*skb); 1581 - *skb = NULL; 1582 - if (waitqueue_active(sk_sleep(sk))) 1583 - wake_up_interruptible(sk_sleep(sk)); 1584 - } 1585 - retval = TIPC_OK; 1586 - break; 1573 + /* If 'ACK+' message, add to socket receive queue */ 1574 + if (msg_data_sz(hdr)) 1575 + return true; 1576 + 1577 + /* If empty 'ACK-' message, wake up sleeping connect() */ 1578 + if (waitqueue_active(sk_sleep(sk))) 1579 + wake_up_interruptible(sk_sleep(sk)); 1580 + 1581 + /* 'ACK-' message is neither accepted nor rejected: */ 1582 + msg_set_dest_droppable(hdr, 1); 1583 + return false; 1584 + 1587 1585 case SS_LISTENING: 1588 1586 case SS_UNCONNECTED: 1587 + 1589 1588 /* Accept only SYN message */ 1590 - if (!msg_connected(msg) && !(msg_errcode(msg))) 1591 - retval = TIPC_OK; 1589 + if (!msg_connected(hdr) && !(msg_errcode(hdr))) 1590 + return true; 1592 1591 break; 1593 1592 case SS_DISCONNECTING: 1594 1593 break; 1595 1594 default: 1596 1595 pr_err("Unknown socket state %u\n", sock->state); 1597 1596 } 1598 - return retval; 1597 + return false; 1599 1598 } 1600 1599 1601 1600 /** ··· 1629 
1630 /** 1630 1631 * filter_rcv - validate incoming message 1631 1632 * @sk: socket 1632 - * @skb: pointer to message. Set to NULL if buffer is consumed. 1633 + * @skb: pointer to message. 1633 1634 * 1634 1635 * Enqueues message on receive queue if acceptable; optionally handles 1635 1636 * disconnect indication for a connected socket. 1636 1637 * 1637 1638 * Called with socket lock already taken 1638 1639 * 1639 - * Returns 0 (TIPC_OK) if message was ok, -TIPC error code if rejected 1640 + * Returns true if message was added to socket receive queue, otherwise false 1640 1641 */ 1641 - static int filter_rcv(struct sock *sk, struct sk_buff **skb) 1642 + static bool filter_rcv(struct sock *sk, struct sk_buff *skb) 1642 1643 { 1643 1644 struct socket *sock = sk->sk_socket; 1644 1645 struct tipc_sock *tsk = tipc_sk(sk); 1645 - struct tipc_msg *msg = buf_msg(*skb); 1646 - unsigned int limit = rcvbuf_limit(sk, *skb); 1647 - int rc = TIPC_OK; 1646 + struct tipc_msg *hdr = buf_msg(skb); 1647 + unsigned int limit = rcvbuf_limit(sk, skb); 1648 + int err = TIPC_OK; 1649 + int usr = msg_user(hdr); 1648 1650 1649 - if (unlikely(msg_user(msg) == CONN_MANAGER)) { 1650 - tipc_sk_proto_rcv(tsk, *skb); 1651 - return TIPC_OK; 1651 + if (unlikely(msg_user(hdr) == CONN_MANAGER)) { 1652 + tipc_sk_proto_rcv(tsk, skb); 1653 + return false; 1652 1654 } 1653 1655 1654 - if (unlikely(msg_user(msg) == SOCK_WAKEUP)) { 1655 - kfree_skb(*skb); 1656 + if (unlikely(usr == SOCK_WAKEUP)) { 1657 + kfree_skb(skb); 1656 1658 tsk->link_cong = 0; 1657 1659 sk->sk_write_space(sk); 1658 - *skb = NULL; 1659 - return TIPC_OK; 1660 + return false; 1660 1661 } 1661 1662 1662 - /* Reject message if it is wrong sort of message for socket */ 1663 - if (msg_type(msg) > TIPC_DIRECT_MSG) 1664 - return -TIPC_ERR_NO_PORT; 1663 + /* Drop if illegal message type */ 1664 + if (unlikely(msg_type(hdr) > TIPC_DIRECT_MSG)) { 1665 + kfree_skb(skb); 1666 + return false; 1667 + } 1665 1668 1666 - if (sock->state == SS_READY) { 
1667 - if (msg_connected(msg)) 1668 - return -TIPC_ERR_NO_PORT; 1669 - } else { 1670 - rc = filter_connect(tsk, skb); 1671 - if (rc != TIPC_OK || !*skb) 1672 - return rc; 1669 + /* Reject if wrong message type for current socket state */ 1670 + if (unlikely(sock->state == SS_READY)) { 1671 + if (msg_connected(hdr)) { 1672 + err = TIPC_ERR_NO_PORT; 1673 + goto reject; 1674 + } 1675 + } else if (unlikely(!filter_connect(tsk, skb))) { 1676 + err = TIPC_ERR_NO_PORT; 1677 + goto reject; 1673 1678 } 1674 1679 1675 1680 /* Reject message if there isn't room to queue it */ 1676 - if (sk_rmem_alloc_get(sk) + (*skb)->truesize >= limit) 1677 - return -TIPC_ERR_OVERLOAD; 1681 + if (unlikely(sk_rmem_alloc_get(sk) + skb->truesize >= limit)) { 1682 + err = TIPC_ERR_OVERLOAD; 1683 + goto reject; 1684 + } 1678 1685 1679 1686 /* Enqueue message */ 1680 - TIPC_SKB_CB(*skb)->handle = NULL; 1681 - __skb_queue_tail(&sk->sk_receive_queue, *skb); 1682 - skb_set_owner_r(*skb, sk); 1687 + TIPC_SKB_CB(skb)->handle = NULL; 1688 + __skb_queue_tail(&sk->sk_receive_queue, skb); 1689 + skb_set_owner_r(skb, sk); 1683 1690 1684 1691 sk->sk_data_ready(sk); 1685 - *skb = NULL; 1686 - return TIPC_OK; 1692 + return true; 1693 + 1694 + reject: 1695 + tipc_sk_respond(sk, skb, err); 1696 + return false; 1687 1697 } 1688 1698 1689 1699 /** ··· 1706 1698 */ 1707 1699 static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb) 1708 1700 { 1709 - int err; 1710 - atomic_t *dcnt; 1711 - u32 dnode = msg_prevnode(buf_msg(skb)); 1712 - struct tipc_sock *tsk = tipc_sk(sk); 1713 - struct net *net = sock_net(sk); 1714 - uint truesize = skb->truesize; 1701 + unsigned int truesize = skb->truesize; 1715 1702 1716 - err = filter_rcv(sk, &skb); 1717 - if (likely(!skb)) { 1718 - dcnt = &tsk->dupl_rcvcnt; 1719 - if (atomic_read(dcnt) < TIPC_CONN_OVERLOAD_LIMIT) 1720 - atomic_add(truesize, dcnt); 1721 - return 0; 1722 - } 1723 - if (!err || tipc_msg_reverse(tsk_own_node(tsk), &skb, -err)) 1724 - tipc_node_xmit_skb(net, 
skb, dnode, tsk->portid); 1703 + if (likely(filter_rcv(sk, skb))) 1704 + atomic_add(truesize, &tipc_sk(sk)->dupl_rcvcnt); 1725 1705 return 0; 1726 1706 } 1727 1707 ··· 1719 1723 * @inputq: list of incoming buffers with potentially different destinations 1720 1724 * @sk: socket where the buffers should be enqueued 1721 1725 * @dport: port number for the socket 1722 - * @_skb: returned buffer to be forwarded or rejected, if applicable 1723 1726 * 1724 1727 * Caller must hold socket lock 1725 - * 1726 - * Returns TIPC_OK if all buffers enqueued, otherwise -TIPC_ERR_OVERLOAD 1727 - * or -TIPC_ERR_NO_PORT 1728 1728 */ 1729 - static int tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk, 1730 - u32 dport, struct sk_buff **_skb) 1729 + static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk, 1730 + u32 dport) 1731 1731 { 1732 1732 unsigned int lim; 1733 1733 atomic_t *dcnt; 1734 - int err; 1735 1734 struct sk_buff *skb; 1736 1735 unsigned long time_limit = jiffies + 2; 1737 1736 1738 1737 while (skb_queue_len(inputq)) { 1739 1738 if (unlikely(time_after_eq(jiffies, time_limit))) 1740 - return TIPC_OK; 1739 + return; 1740 + 1741 1741 skb = tipc_skb_dequeue(inputq, dport); 1742 1742 if (unlikely(!skb)) 1743 - return TIPC_OK; 1743 + return; 1744 + 1745 + /* Add message directly to receive queue if possible */ 1744 1746 if (!sock_owned_by_user(sk)) { 1745 - err = filter_rcv(sk, &skb); 1746 - if (likely(!skb)) 1747 - continue; 1748 - *_skb = skb; 1749 - return err; 1747 + filter_rcv(sk, skb); 1748 + continue; 1750 1749 } 1750 + 1751 + /* Try backlog, compensating for double-counted bytes */ 1751 1752 dcnt = &tipc_sk(sk)->dupl_rcvcnt; 1752 1753 if (sk->sk_backlog.len) 1753 1754 atomic_set(dcnt, 0); 1754 1755 lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt); 1755 1756 if (likely(!sk_add_backlog(sk, skb, lim))) 1756 1757 continue; 1757 - *_skb = skb; 1758 - return -TIPC_ERR_OVERLOAD; 1758 + 1759 + /* Overload => reject message back to sender */ 1760 + 
tipc_sk_respond(sk, skb, TIPC_ERR_OVERLOAD); 1761 + break; 1759 1762 } 1760 - return TIPC_OK; 1761 1763 } 1762 1764 1763 1765 /** ··· 1763 1769 * @inputq: buffer list containing the buffers 1764 1770 * Consumes all buffers in list until inputq is empty 1765 1771 * Note: may be called in multiple threads referring to the same queue 1766 - * Returns 0 if last buffer was accepted, otherwise -EHOSTUNREACH 1767 - * Only node local calls check the return value, sending single-buffer queues 1768 1772 */ 1769 - int tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq) 1773 + void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq) 1770 1774 { 1771 1775 u32 dnode, dport = 0; 1772 1776 int err; 1773 - struct sk_buff *skb; 1774 1777 struct tipc_sock *tsk; 1775 - struct tipc_net *tn; 1776 1778 struct sock *sk; 1779 + struct sk_buff *skb; 1777 1780 1778 1781 while (skb_queue_len(inputq)) { 1779 - err = -TIPC_ERR_NO_PORT; 1780 - skb = NULL; 1781 1782 dport = tipc_skb_peek_port(inputq, dport); 1782 1783 tsk = tipc_sk_lookup(net, dport); 1784 + 1783 1785 if (likely(tsk)) { 1784 1786 sk = &tsk->sk; 1785 1787 if (likely(spin_trylock_bh(&sk->sk_lock.slock))) { 1786 - err = tipc_sk_enqueue(inputq, sk, dport, &skb); 1788 + tipc_sk_enqueue(inputq, sk, dport); 1787 1789 spin_unlock_bh(&sk->sk_lock.slock); 1788 - dport = 0; 1789 1790 } 1790 1791 sock_put(sk); 1791 - } else { 1792 - skb = tipc_skb_dequeue(inputq, dport); 1793 - } 1794 - if (likely(!skb)) 1795 1792 continue; 1796 - if (tipc_msg_lookup_dest(net, skb, &dnode, &err)) 1797 - goto xmit; 1798 - if (!err) { 1799 - dnode = msg_destnode(buf_msg(skb)); 1800 - goto xmit; 1801 - } else { 1802 - dnode = msg_prevnode(buf_msg(skb)); 1803 1793 } 1804 - tn = net_generic(net, tipc_net_id); 1805 - if (!tipc_msg_reverse(tn->own_addr, &skb, -err)) 1794 + 1795 + /* No destination socket => dequeue skb if still there */ 1796 + skb = tipc_skb_dequeue(inputq, dport); 1797 + if (!skb) 1798 + return; 1799 + 1800 + /* Try secondary lookup if 
unresolved named message */ 1801 + err = TIPC_ERR_NO_PORT; 1802 + if (tipc_msg_lookup_dest(net, skb, &err)) 1803 + goto xmit; 1804 + 1805 + /* Prepare for message rejection */ 1806 + if (!tipc_msg_reverse(tipc_own_addr(net), &skb, err)) 1806 1807 continue; 1807 1808 xmit: 1809 + dnode = msg_destnode(buf_msg(skb)); 1808 1810 tipc_node_xmit_skb(net, skb, dnode, dport); 1809 1811 } 1810 - return err ? -EHOSTUNREACH : 0; 1811 1812 } 1812 1813 1813 1814 static int tipc_wait_for_connect(struct socket *sock, long *timeo_p) ··· 2071 2082 struct net *net = sock_net(sk); 2072 2083 struct tipc_sock *tsk = tipc_sk(sk); 2073 2084 struct sk_buff *skb; 2074 - u32 dnode; 2085 + u32 dnode = tsk_peer_node(tsk); 2086 + u32 dport = tsk_peer_port(tsk); 2087 + u32 onode = tipc_own_addr(net); 2088 + u32 oport = tsk->portid; 2075 2089 int res; 2076 2090 2077 2091 if (how != SHUT_RDWR) ··· 2100 2108 } else { 2101 2109 skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, 2102 2110 TIPC_CONN_MSG, SHORT_H_SIZE, 2103 - 0, dnode, tsk_own_node(tsk), 2104 - tsk_peer_port(tsk), 2105 - tsk->portid, TIPC_CONN_SHUTDOWN); 2111 + 0, dnode, onode, dport, oport, 2112 + TIPC_CONN_SHUTDOWN); 2106 2113 tipc_node_xmit_skb(net, skb, dnode, tsk->portid); 2107 2114 } 2108 2115 tsk->connected = 0;
+1 -1
net/tipc/socket.h
··· 44 44 SKB_TRUESIZE(TIPC_MAX_USER_MSG_SIZE)) 45 45 int tipc_socket_init(void); 46 46 void tipc_socket_stop(void); 47 - int tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq); 47 + void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq); 48 48 void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq, 49 49 struct sk_buff_head *inputq); 50 50 void tipc_sk_reinit(struct net *net);