Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux

rds: fix reordering with composite message notification

RDS composite message (rdma + control) user notification needs to be
triggered once the full message is delivered, and such a fix was
added as part of commit 941f8d55f6d61 ("RDS: RDMA: Fix the composite
message user notification"). But rds_send_remove_from_sock is missing
the data part notify check, and hence at times the user doesn't get
the notification, which isn't desirable.

One way is to fix rds_send_remove_from_sock to check for that case,
but considering the ordering complexity with the completion handler, and
that rdma + control messages are always dispatched back to back in the
same send context, just delaying the signaled completion on the rdma work
request also gets the desired behaviour, i.e. notifying the application
only after the RDMA + control message send completes. So this patch
updates the earlier fix with this approach. The fix of delaying the
signaled completion of the rdma op until the control message send
completes was done by Venkat Venkatsubra in the downstream kernel.

Reviewed-and-tested-by: Zhu Yanjun <yanjun.zhu@oracle.com>
Reviewed-by: Gerd Rausch <gerd.rausch@oracle.com>
Signed-off-by: Santosh Shilimkar <santosh.shilimkar@oracle.com>

+14 -30
+13 -16
net/rds/ib_send.c
··· 69 69 complete(rm, notify_status); 70 70 } 71 71 72 + static void rds_ib_send_unmap_data(struct rds_ib_connection *ic, 73 + struct rm_data_op *op, 74 + int wc_status) 75 + { 76 + if (op->op_nents) 77 + ib_dma_unmap_sg(ic->i_cm_id->device, 78 + op->op_sg, op->op_nents, 79 + DMA_TO_DEVICE); 80 + } 81 + 72 82 static void rds_ib_send_unmap_rdma(struct rds_ib_connection *ic, 73 83 struct rm_rdma_op *op, 74 84 int wc_status) ··· 137 127 rds_ib_stats_inc(s_ib_atomic_cswp); 138 128 else 139 129 rds_ib_stats_inc(s_ib_atomic_fadd); 140 - } 141 - 142 - static void rds_ib_send_unmap_data(struct rds_ib_connection *ic, 143 - struct rm_data_op *op, 144 - int wc_status) 145 - { 146 - struct rds_message *rm = container_of(op, struct rds_message, data); 147 - 148 - if (op->op_nents) 149 - ib_dma_unmap_sg(ic->i_cm_id->device, 150 - op->op_sg, op->op_nents, 151 - DMA_TO_DEVICE); 152 - 153 - if (rm->rdma.op_active && rm->data.op_notify) 154 - rds_ib_send_unmap_rdma(ic, &rm->rdma, wc_status); 155 130 } 156 131 157 132 /* ··· 897 902 send->s_queued = jiffies; 898 903 send->s_op = NULL; 899 904 900 - nr_sig += rds_ib_set_wr_signal_state(ic, send, op->op_notify); 905 + if (!op->op_notify) 906 + nr_sig += rds_ib_set_wr_signal_state(ic, send, 907 + op->op_notify); 901 908 902 909 send->s_wr.opcode = op->op_write ? IB_WR_RDMA_WRITE : IB_WR_RDMA_READ; 903 910 send->s_rdma_wr.remote_addr = remote_addr;
-10
net/rds/rdma.c
··· 641 641 } 642 642 op->op_notifier->n_user_token = args->user_token; 643 643 op->op_notifier->n_status = RDS_RDMA_SUCCESS; 644 - 645 - /* Enable rmda notification on data operation for composite 646 - * rds messages and make sure notification is enabled only 647 - * for the data operation which follows it so that application 648 - * gets notified only after full message gets delivered. 649 - */ 650 - if (rm->data.op_sg) { 651 - rm->rdma.op_notify = 0; 652 - rm->data.op_notify = !!(args->flags & RDS_RDMA_NOTIFY_ME); 653 - } 654 644 } 655 645 656 646 /* The cookie contains the R_Key of the remote memory region, and
-1
net/rds/rds.h
··· 476 476 } rdma; 477 477 struct rm_data_op { 478 478 unsigned int op_active:1; 479 - unsigned int op_notify:1; 480 479 unsigned int op_nents; 481 480 unsigned int op_count; 482 481 unsigned int op_dmasg;
+1 -3
net/rds/send.c
··· 491 491 struct rm_rdma_op *ro; 492 492 struct rds_notifier *notifier; 493 493 unsigned long flags; 494 - unsigned int notify = 0; 495 494 496 495 spin_lock_irqsave(&rm->m_rs_lock, flags); 497 496 498 - notify = rm->rdma.op_notify | rm->data.op_notify; 499 497 ro = &rm->rdma; 500 498 if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags) && 501 - ro->op_active && notify && ro->op_notifier) { 499 + ro->op_active && ro->op_notify && ro->op_notifier) { 502 500 notifier = ro->op_notifier; 503 501 rs = rm->m_rs; 504 502 sock_hold(rds_rs_to_sk(rs));