Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux

rds: deliver zerocopy completion notification with data

This commit is an optimization over commit 01883eda72bd
("rds: support for zcopy completion notification") for PF_RDS sockets.

RDS applications are predominantly request-response transactions, so
it is more efficient to reduce the number of system calls and have
zerocopy completion notification delivered as ancillary data on the
POLLIN channel.

Cookies are passed up as ancillary data (at level SOL_RDS) in a
struct rds_zcopy_cookies when the value returned by recvmsg() is
greater than or equal to 0. A maximum of RDS_MAX_ZCOOKIES may be passed
with each message.

This commit removes support for zerocopy completion notification on
MSG_ERRQUEUE for PF_RDS sockets.

Signed-off-by: Sowmini Varadhan <sowmini.varadhan@oracle.com>
Acked-by: Willem de Bruijn <willemb@google.com>
Acked-by: Santosh Shilimkar <santosh.shilimkar@oracle.com>
Signed-off-by: David S. Miller <davem@davemloft.net>

authored by

Sowmini Varadhan and committed by
David S. Miller
401910db 67490e34

+60 -27
-2
include/uapi/linux/errqueue.h
··· 20 20 #define SO_EE_ORIGIN_ICMP6 3 21 21 #define SO_EE_ORIGIN_TXSTATUS 4 22 22 #define SO_EE_ORIGIN_ZEROCOPY 5 23 - #define SO_EE_ORIGIN_ZCOOKIE 6 24 23 #define SO_EE_ORIGIN_TIMESTAMPING SO_EE_ORIGIN_TXSTATUS 25 24 26 25 #define SO_EE_OFFENDER(ee) ((struct sockaddr*)((ee)+1)) 27 26 28 27 #define SO_EE_CODE_ZEROCOPY_COPIED 1 29 - #define SO_EE_ORIGIN_MAX_ZCOOKIES 8 30 28 31 29 /** 32 30 * struct scm_timestamping - timestamps exposed through cmsg
+7
include/uapi/linux/rds.h
··· 104 104 #define RDS_CMSG_MASKED_ATOMIC_CSWP 9 105 105 #define RDS_CMSG_RXPATH_LATENCY 11 106 106 #define RDS_CMSG_ZCOPY_COOKIE 12 107 + #define RDS_CMSG_ZCOPY_COMPLETION 13 107 108 108 109 #define RDS_INFO_FIRST 10000 109 110 #define RDS_INFO_COUNTERS 10000 ··· 317 316 #define RDS_RDMA_CANCELED 2 318 317 #define RDS_RDMA_DROPPED 3 319 318 #define RDS_RDMA_OTHER_ERROR 4 319 + 320 + #define RDS_MAX_ZCOOKIES 8 321 + struct rds_zcopy_cookies { 322 + __u32 num; 323 + __u32 cookies[RDS_MAX_ZCOOKIES]; 324 + }; 320 325 321 326 /* 322 327 * Common set of flags for all RDMA related structs
+5 -2
net/rds/af_rds.c
··· 77 77 rds_send_drop_to(rs, NULL); 78 78 rds_rdma_drop_keys(rs); 79 79 rds_notify_queue_get(rs, NULL); 80 + __skb_queue_purge(&rs->rs_zcookie_queue); 80 81 81 82 spin_lock_bh(&rds_sock_lock); 82 83 list_del_init(&rs->rs_item); ··· 145 144 * - to signal that a previously congested destination may have become 146 145 * uncongested 147 146 * - A notification has been queued to the socket (this can be a congestion 148 - * update, or a RDMA completion). 147 + * update, or a RDMA completion, or a MSG_ZEROCOPY completion). 149 148 * 150 149 * EPOLLOUT is asserted if there is room on the send queue. This does not mean 151 150 * however, that the next sendmsg() call will succeed. If the application tries ··· 179 178 spin_unlock(&rs->rs_lock); 180 179 } 181 180 if (!list_empty(&rs->rs_recv_queue) || 182 - !list_empty(&rs->rs_notify_queue)) 181 + !list_empty(&rs->rs_notify_queue) || 182 + !skb_queue_empty(&rs->rs_zcookie_queue)) 183 183 mask |= (EPOLLIN | EPOLLRDNORM); 184 184 if (rs->rs_snd_bytes < rds_sk_sndbuf(rs)) 185 185 mask |= (EPOLLOUT | EPOLLWRNORM); ··· 515 513 INIT_LIST_HEAD(&rs->rs_recv_queue); 516 514 INIT_LIST_HEAD(&rs->rs_notify_queue); 517 515 INIT_LIST_HEAD(&rs->rs_cong_list); 516 + skb_queue_head_init(&rs->rs_zcookie_queue); 518 517 spin_lock_init(&rs->rs_rdma_lock); 519 518 rs->rs_rdma_keys = RB_ROOT; 520 519 rs->rs_rx_traces = 0;
+16 -22
net/rds/message.c
··· 58 58 59 59 static inline bool skb_zcookie_add(struct sk_buff *skb, u32 cookie) 60 60 { 61 - struct sock_exterr_skb *serr = SKB_EXT_ERR(skb); 62 - int ncookies; 63 - u32 *ptr; 61 + struct rds_zcopy_cookies *ck = (struct rds_zcopy_cookies *)skb->cb; 62 + int ncookies = ck->num; 64 63 65 - if (serr->ee.ee_origin != SO_EE_ORIGIN_ZCOOKIE) 64 + if (ncookies == RDS_MAX_ZCOOKIES) 66 65 return false; 67 - ncookies = serr->ee.ee_data; 68 - if (ncookies == SO_EE_ORIGIN_MAX_ZCOOKIES) 69 - return false; 70 - ptr = skb_put(skb, sizeof(u32)); 71 - *ptr = cookie; 72 - serr->ee.ee_data = ++ncookies; 66 + ck->cookies[ncookies] = cookie; 67 + ck->num = ++ncookies; 73 68 return true; 74 69 } 75 70 76 71 static void rds_rm_zerocopy_callback(struct rds_sock *rs, 77 72 struct rds_znotifier *znotif) 78 73 { 79 - struct sock *sk = rds_rs_to_sk(rs); 80 74 struct sk_buff *skb, *tail; 81 - struct sock_exterr_skb *serr; 82 75 unsigned long flags; 83 76 struct sk_buff_head *q; 84 77 u32 cookie = znotif->z_cookie; 78 + struct rds_zcopy_cookies *ck; 85 79 86 - q = &sk->sk_error_queue; 80 + q = &rs->rs_zcookie_queue; 87 81 spin_lock_irqsave(&q->lock, flags); 88 82 tail = skb_peek_tail(q); 89 83 ··· 85 91 spin_unlock_irqrestore(&q->lock, flags); 86 92 mm_unaccount_pinned_pages(&znotif->z_mmp); 87 93 consume_skb(rds_skb_from_znotifier(znotif)); 88 - sk->sk_error_report(sk); 94 + /* caller invokes rds_wake_sk_sleep() */ 89 95 return; 90 96 } 91 97 92 98 skb = rds_skb_from_znotifier(znotif); 93 - serr = SKB_EXT_ERR(skb); 94 - memset(&serr->ee, 0, sizeof(serr->ee)); 95 - serr->ee.ee_errno = 0; 96 - serr->ee.ee_origin = SO_EE_ORIGIN_ZCOOKIE; 97 - serr->ee.ee_info = 0; 99 + ck = (struct rds_zcopy_cookies *)skb->cb; 100 + memset(ck, 0, sizeof(*ck)); 98 101 WARN_ON(!skb_zcookie_add(skb, cookie)); 99 102 100 103 __skb_queue_tail(q, skb); 101 104 102 105 spin_unlock_irqrestore(&q->lock, flags); 103 - sk->sk_error_report(sk); 106 + /* caller invokes rds_wake_sk_sleep() */ 104 107 105 108 
mm_unaccount_pinned_pages(&znotif->z_mmp); 106 109 } ··· 120 129 if (rm->data.op_mmp_znotifier) { 121 130 zcopy = true; 122 131 rds_rm_zerocopy_callback(rs, rm->data.op_mmp_znotifier); 132 + rds_wake_sk_sleep(rs); 123 133 rm->data.op_mmp_znotifier = NULL; 124 134 } 125 135 sock_put(rds_rs_to_sk(rs)); ··· 354 362 int total_copied = 0; 355 363 struct sk_buff *skb; 356 364 357 - skb = alloc_skb(SO_EE_ORIGIN_MAX_ZCOOKIES * sizeof(u32), 358 - GFP_KERNEL); 365 + skb = alloc_skb(0, GFP_KERNEL); 359 366 if (!skb) 360 367 return -ENOMEM; 368 + BUILD_BUG_ON(sizeof(skb->cb) < 369 + max_t(int, sizeof(struct rds_znotifier), 370 + sizeof(struct rds_zcopy_cookies))); 361 371 rm->data.op_mmp_znotifier = RDS_ZCOPY_SKB(skb); 362 372 if (mm_account_pinned_pages(&rm->data.op_mmp_znotifier->z_mmp, 363 373 length)) {
+2
net/rds/rds.h
··· 603 603 /* Socket receive path trace points*/ 604 604 u8 rs_rx_traces; 605 605 u8 rs_rx_trace[RDS_MSG_RX_DGRAM_TRACE_MAX]; 606 + 607 + struct sk_buff_head rs_zcookie_queue; 606 608 }; 607 609 608 610 static inline struct rds_sock *rds_sk_to_rs(const struct sock *sk)
+30 -1
net/rds/recv.c
··· 577 577 return ret; 578 578 } 579 579 580 + static bool rds_recvmsg_zcookie(struct rds_sock *rs, struct msghdr *msg) 581 + { 582 + struct sk_buff *skb; 583 + struct sk_buff_head *q = &rs->rs_zcookie_queue; 584 + struct rds_zcopy_cookies *done; 585 + 586 + if (!msg->msg_control) 587 + return false; 588 + 589 + if (!sock_flag(rds_rs_to_sk(rs), SOCK_ZEROCOPY) || 590 + msg->msg_controllen < CMSG_SPACE(sizeof(*done))) 591 + return false; 592 + 593 + skb = skb_dequeue(q); 594 + if (!skb) 595 + return false; 596 + done = (struct rds_zcopy_cookies *)skb->cb; 597 + if (put_cmsg(msg, SOL_RDS, RDS_CMSG_ZCOPY_COMPLETION, sizeof(*done), 598 + done)) { 599 + skb_queue_head(q, skb); 600 + return false; 601 + } 602 + consume_skb(skb); 603 + return true; 604 + } 605 + 580 606 int rds_recvmsg(struct socket *sock, struct msghdr *msg, size_t size, 581 607 int msg_flags) 582 608 { ··· 637 611 638 612 if (!rds_next_incoming(rs, &inc)) { 639 613 if (nonblock) { 640 - ret = -EAGAIN; 614 + bool reaped = rds_recvmsg_zcookie(rs, msg); 615 + 616 + ret = reaped ? 0 : -EAGAIN; 641 617 break; 642 618 } 643 619 ··· 688 660 ret = -EFAULT; 689 661 goto out; 690 662 } 663 + rds_recvmsg_zcookie(rs, msg); 691 664 692 665 rds_stats_inc(s_recv_delivered); 693 666