Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux

Merge branch 'smc-next'

Ursula Braun says:

====================
patches 2018-05-23

Here are more SMC patches for net-next:

Patch 1 fixes an ioctl problem detected by syzbot.

Patch 2 improves smc_lgr_list locking in case of abnormal link
group termination. If you want to receive a version for the net-tree,
please let me know. It would look somewhat different, since the port
terminate code has been moved to smc_core.c on net-next.

Patch 3 enables SMC to deal with urgent data.

Patch 4 is a minor improvement to avoid out-of-sync link groups
between two peers.
====================

Signed-off-by: David S. Miller <davem@davemloft.net>

+267 -43
+37 -5
net/smc/af_smc.c
··· 8 8 * 9 9 * Initial restrictions: 10 10 * - support for alternate links postponed 11 - * - partial support for non-blocking sockets only 12 - * - support for urgent data postponed 13 11 * 14 12 * Copyright IBM Corp. 2016, 2018 15 13 * ··· 1336 1338 if (sk->sk_state == SMC_APPCLOSEWAIT1) 1337 1339 mask |= EPOLLIN; 1338 1340 } 1341 + if (smc->conn.urg_state == SMC_URG_VALID) 1342 + mask |= EPOLLPRI; 1339 1343 1340 1344 } 1341 1345 release_sock(sk); ··· 1477 1477 static int smc_ioctl(struct socket *sock, unsigned int cmd, 1478 1478 unsigned long arg) 1479 1479 { 1480 + union smc_host_cursor cons, urg; 1481 + struct smc_connection *conn; 1480 1482 struct smc_sock *smc; 1481 1483 int answ; 1482 1484 1483 1485 smc = smc_sk(sock->sk); 1486 + conn = &smc->conn; 1484 1487 if (smc->use_fallback) { 1485 1488 if (!smc->clcsock) 1486 1489 return -EBADF; ··· 1493 1490 case SIOCINQ: /* same as FIONREAD */ 1494 1491 if (smc->sk.sk_state == SMC_LISTEN) 1495 1492 return -EINVAL; 1496 - answ = atomic_read(&smc->conn.bytes_to_rcv); 1493 + if (smc->sk.sk_state == SMC_INIT || 1494 + smc->sk.sk_state == SMC_CLOSED) 1495 + answ = 0; 1496 + else 1497 + answ = atomic_read(&smc->conn.bytes_to_rcv); 1497 1498 break; 1498 1499 case SIOCOUTQ: 1499 1500 /* output queue size (not send + not acked) */ 1500 1501 if (smc->sk.sk_state == SMC_LISTEN) 1501 1502 return -EINVAL; 1502 - answ = smc->conn.sndbuf_desc->len - 1503 + if (smc->sk.sk_state == SMC_INIT || 1504 + smc->sk.sk_state == SMC_CLOSED) 1505 + answ = 0; 1506 + else 1507 + answ = smc->conn.sndbuf_desc->len - 1503 1508 atomic_read(&smc->conn.sndbuf_space); 1504 1509 break; 1505 1510 case SIOCOUTQNSD: 1506 1511 /* output queue size (not send only) */ 1507 1512 if (smc->sk.sk_state == SMC_LISTEN) 1508 1513 return -EINVAL; 1509 - answ = smc_tx_prepared_sends(&smc->conn); 1514 + if (smc->sk.sk_state == SMC_INIT || 1515 + smc->sk.sk_state == SMC_CLOSED) 1516 + answ = 0; 1517 + else 1518 + answ = smc_tx_prepared_sends(&smc->conn); 1519 + 
break; 1520 + case SIOCATMARK: 1521 + if (smc->sk.sk_state == SMC_LISTEN) 1522 + return -EINVAL; 1523 + if (smc->sk.sk_state == SMC_INIT || 1524 + smc->sk.sk_state == SMC_CLOSED) { 1525 + answ = 0; 1526 + } else { 1527 + smc_curs_write(&cons, 1528 + smc_curs_read(&conn->local_tx_ctrl.cons, conn), 1529 + conn); 1530 + smc_curs_write(&urg, 1531 + smc_curs_read(&conn->urg_curs, conn), 1532 + conn); 1533 + answ = smc_curs_diff(conn->rmb_desc->len, 1534 + &cons, &urg) == 1; 1535 + } 1510 1536 break; 1511 1537 default: 1512 1538 return -ENOIOCTLCMD;
+15
net/smc/smc.h
··· 114 114 u8 reserved[18]; 115 115 } __aligned(8); 116 116 117 + enum smc_urg_state { 118 + SMC_URG_VALID, /* data present */ 119 + SMC_URG_NOTYET, /* data pending */ 120 + SMC_URG_READ /* data was already read */ 121 + }; 122 + 117 123 struct smc_connection { 118 124 struct rb_node alert_node; 119 125 struct smc_link_group *lgr; /* link group of connection */ ··· 166 160 union smc_host_cursor rx_curs_confirmed; /* confirmed to peer 167 161 * source of snd_una ? 168 162 */ 163 + union smc_host_cursor urg_curs; /* points at urgent byte */ 164 + enum smc_urg_state urg_state; 165 + bool urg_tx_pend; /* urgent data staged */ 166 + bool urg_rx_skip_pend; 167 + /* indicate urgent oob data 168 + * read, but previous regular 169 + * data still pending 170 + */ 171 + char urg_rx_byte; /* urgent byte */ 169 172 atomic_t bytes_to_rcv; /* arrived data, 170 173 * not yet received 171 174 */
+41 -3
net/smc/smc_cdc.c
··· 164 164 return (s16)(seq1 - seq2) < 0; 165 165 } 166 166 167 + static void smc_cdc_handle_urg_data_arrival(struct smc_sock *smc, 168 + int *diff_prod) 169 + { 170 + struct smc_connection *conn = &smc->conn; 171 + char *base; 172 + 173 + /* new data included urgent business */ 174 + smc_curs_write(&conn->urg_curs, 175 + smc_curs_read(&conn->local_rx_ctrl.prod, conn), 176 + conn); 177 + conn->urg_state = SMC_URG_VALID; 178 + if (!sock_flag(&smc->sk, SOCK_URGINLINE)) 179 + /* we'll skip the urgent byte, so don't account for it */ 180 + (*diff_prod)--; 181 + base = (char *)conn->rmb_desc->cpu_addr; 182 + if (conn->urg_curs.count) 183 + conn->urg_rx_byte = *(base + conn->urg_curs.count - 1); 184 + else 185 + conn->urg_rx_byte = *(base + conn->rmb_desc->len - 1); 186 + sk_send_sigurg(&smc->sk); 187 + } 188 + 167 189 static void smc_cdc_msg_recv_action(struct smc_sock *smc, 168 190 struct smc_cdc_msg *cdc) 169 191 { ··· 216 194 diff_prod = smc_curs_diff(conn->rmb_desc->len, &prod_old, 217 195 &conn->local_rx_ctrl.prod); 218 196 if (diff_prod) { 197 + if (conn->local_rx_ctrl.prod_flags.urg_data_present) 198 + smc_cdc_handle_urg_data_arrival(smc, &diff_prod); 219 199 /* bytes_to_rcv is decreased in smc_recvmsg */ 220 200 smp_mb__before_atomic(); 221 201 atomic_add(diff_prod, &conn->bytes_to_rcv); 222 202 /* guarantee 0 <= bytes_to_rcv <= rmb_desc->len */ 223 203 smp_mb__after_atomic(); 224 204 smc->sk.sk_data_ready(&smc->sk); 225 - } else if ((conn->local_rx_ctrl.prod_flags.write_blocked) || 226 - (conn->local_rx_ctrl.prod_flags.cons_curs_upd_req)) { 227 - smc->sk.sk_data_ready(&smc->sk); 205 + } else { 206 + if (conn->local_rx_ctrl.prod_flags.write_blocked || 207 + conn->local_rx_ctrl.prod_flags.cons_curs_upd_req || 208 + conn->local_rx_ctrl.prod_flags.urg_data_pending) { 209 + if (conn->local_rx_ctrl.prod_flags.urg_data_pending) 210 + conn->urg_state = SMC_URG_NOTYET; 211 + /* force immediate tx of current consumer cursor, but 212 + * under send_lock to guarantee 
arrival in seqno-order 213 + */ 214 + smc_tx_sndbuf_nonempty(conn); 215 + } 228 216 } 229 217 230 218 /* piggy backed tx info */ ··· 243 211 smc_tx_sndbuf_nonempty(conn); 244 212 /* trigger socket release if connection closed */ 245 213 smc_close_wake_tx_prepared(smc); 214 + } 215 + if (diff_cons && conn->urg_tx_pend && 216 + atomic_read(&conn->peer_rmbe_space) == conn->peer_rmbe_size) { 217 + /* urg data confirmed by peer, indicate we're ready for more */ 218 + conn->urg_tx_pend = false; 219 + smc->sk.sk_write_space(&smc->sk); 246 220 } 247 221 248 222 if (conn->local_rx_ctrl.conn_state_flags.peer_conn_abort) {
+13
net/smc/smc_cdc.h
··· 146 146 return max_t(int, 0, (new->count - old->count)); 147 147 } 148 148 149 + /* calculate cursor difference between old and new - returns negative 150 + * value in case old > new 151 + */ 152 + static inline int smc_curs_comp(unsigned int size, 153 + union smc_host_cursor *old, 154 + union smc_host_cursor *new) 155 + { 156 + if (old->wrap > new->wrap || 157 + (old->wrap == new->wrap && old->count > new->count)) 158 + return -smc_curs_diff(size, new, old); 159 + return smc_curs_diff(size, old, new); 160 + } 161 + 149 162 static inline void smc_host_cursor_to_cdc(union smc_cdc_cursor *peer, 150 163 union smc_host_cursor *local, 151 164 struct smc_connection *conn)
+15 -4
net/smc/smc_core.c
··· 28 28 29 29 #define SMC_LGR_NUM_INCR 256 30 30 #define SMC_LGR_FREE_DELAY_SERV (600 * HZ) 31 - #define SMC_LGR_FREE_DELAY_CLNT (SMC_LGR_FREE_DELAY_SERV + 10) 31 + #define SMC_LGR_FREE_DELAY_CLNT (SMC_LGR_FREE_DELAY_SERV + 10 * HZ) 32 32 33 33 static struct smc_lgr_list smc_lgr_list = { /* established link groups */ 34 34 .lock = __SPIN_LOCK_UNLOCKED(smc_lgr_list.lock), ··· 346 346 } 347 347 348 348 /* terminate linkgroup abnormally */ 349 - void smc_lgr_terminate(struct smc_link_group *lgr) 349 + static void __smc_lgr_terminate(struct smc_link_group *lgr) 350 350 { 351 351 struct smc_connection *conn; 352 352 struct smc_sock *smc; ··· 355 355 if (lgr->terminating) 356 356 return; /* lgr already terminating */ 357 357 lgr->terminating = 1; 358 - smc_lgr_forget(lgr); 358 + if (!list_empty(&lgr->list)) /* forget lgr */ 359 + list_del_init(&lgr->list); 359 360 smc_llc_link_inactive(&lgr->lnk[SMC_SINGLE_LINK]); 360 361 361 362 write_lock_bh(&lgr->conns_lock); ··· 378 377 smc_lgr_schedule_free_work(lgr); 379 378 } 380 379 380 + void smc_lgr_terminate(struct smc_link_group *lgr) 381 + { 382 + spin_lock_bh(&smc_lgr_list.lock); 383 + __smc_lgr_terminate(lgr); 384 + spin_unlock_bh(&smc_lgr_list.lock); 385 + } 386 + 381 387 /* Called when IB port is terminated */ 382 388 void smc_port_terminate(struct smc_ib_device *smcibdev, u8 ibport) 383 389 { 384 390 struct smc_link_group *lgr, *l; 385 391 392 + spin_lock_bh(&smc_lgr_list.lock); 386 393 list_for_each_entry_safe(lgr, l, &smc_lgr_list.list, list) { 387 394 if (lgr->lnk[SMC_SINGLE_LINK].smcibdev == smcibdev && 388 395 lgr->lnk[SMC_SINGLE_LINK].ibport == ibport) 389 - smc_lgr_terminate(lgr); 396 + __smc_lgr_terminate(lgr); 390 397 } 398 + spin_unlock_bh(&smc_lgr_list.lock); 391 399 } 392 400 393 401 /* Determine vlan of internal TCP socket. 
··· 544 534 } 545 535 conn->local_tx_ctrl.common.type = SMC_CDC_MSG_TYPE; 546 536 conn->local_tx_ctrl.len = SMC_WR_TX_SIZE; 537 + conn->urg_state = SMC_URG_READ; 547 538 #ifndef KERNEL_HAS_ATOMIC64 548 539 spin_lock_init(&conn->acurs_lock); 549 540 #endif
+107 -13
net/smc/smc_rx.c
··· 47 47 * @conn connection to update 48 48 * @cons consumer cursor 49 49 * @len number of Bytes consumed 50 + * Returns: 51 + * 1 if we should end our receive, 0 otherwise 50 52 */ 51 - static void smc_rx_update_consumer(struct smc_connection *conn, 52 - union smc_host_cursor cons, size_t len) 53 + static int smc_rx_update_consumer(struct smc_sock *smc, 54 + union smc_host_cursor cons, size_t len) 53 55 { 56 + struct smc_connection *conn = &smc->conn; 57 + struct sock *sk = &smc->sk; 58 + bool force = false; 59 + int diff, rc = 0; 60 + 54 61 smc_curs_add(conn->rmb_desc->len, &cons, len); 62 + 63 + /* did we process urgent data? */ 64 + if (conn->urg_state == SMC_URG_VALID || conn->urg_rx_skip_pend) { 65 + diff = smc_curs_comp(conn->rmb_desc->len, &cons, 66 + &conn->urg_curs); 67 + if (sock_flag(sk, SOCK_URGINLINE)) { 68 + if (diff == 0) { 69 + force = true; 70 + rc = 1; 71 + conn->urg_state = SMC_URG_READ; 72 + } 73 + } else { 74 + if (diff == 1) { 75 + /* skip urgent byte */ 76 + force = true; 77 + smc_curs_add(conn->rmb_desc->len, &cons, 1); 78 + conn->urg_rx_skip_pend = false; 79 + } else if (diff < -1) 80 + /* we read past urgent byte */ 81 + conn->urg_state = SMC_URG_READ; 82 + } 83 + } 84 + 55 85 smc_curs_write(&conn->local_tx_ctrl.cons, smc_curs_read(&cons, conn), 56 86 conn); 87 + 57 88 /* send consumer cursor update if required */ 58 89 /* similar to advertising new TCP rcv_wnd if required */ 59 - smc_tx_consumer_update(conn); 90 + smc_tx_consumer_update(conn, force); 91 + 92 + return rc; 93 + } 94 + 95 + static void smc_rx_update_cons(struct smc_sock *smc, size_t len) 96 + { 97 + struct smc_connection *conn = &smc->conn; 98 + union smc_host_cursor cons; 99 + 100 + smc_curs_write(&cons, smc_curs_read(&conn->local_tx_ctrl.cons, conn), 101 + conn); 102 + smc_rx_update_consumer(smc, cons, len); 60 103 } 61 104 62 105 struct smc_spd_priv { ··· 113 70 struct smc_spd_priv *priv = (struct smc_spd_priv *)buf->private; 114 71 struct smc_sock *smc = priv->smc; 115 
72 struct smc_connection *conn; 116 - union smc_host_cursor cons; 117 73 struct sock *sk = &smc->sk; 118 74 119 75 if (sk->sk_state == SMC_CLOSED || ··· 121 79 goto out; 122 80 conn = &smc->conn; 123 81 lock_sock(sk); 124 - smc_curs_write(&cons, smc_curs_read(&conn->local_tx_ctrl.cons, conn), 125 - conn); 126 - smc_rx_update_consumer(conn, cons, priv->len); 82 + smc_rx_update_cons(smc, priv->len); 127 83 release_sock(sk); 128 84 if (atomic_sub_and_test(priv->len, &conn->splice_pending)) 129 85 smc_rx_wake_up(sk); ··· 224 184 return rc; 225 185 } 226 186 187 + static int smc_rx_recv_urg(struct smc_sock *smc, struct msghdr *msg, int len, 188 + int flags) 189 + { 190 + struct smc_connection *conn = &smc->conn; 191 + union smc_host_cursor cons; 192 + struct sock *sk = &smc->sk; 193 + int rc = 0; 194 + 195 + if (sock_flag(sk, SOCK_URGINLINE) || 196 + !(conn->urg_state == SMC_URG_VALID) || 197 + conn->urg_state == SMC_URG_READ) 198 + return -EINVAL; 199 + 200 + if (conn->urg_state == SMC_URG_VALID) { 201 + if (!(flags & MSG_PEEK)) 202 + smc->conn.urg_state = SMC_URG_READ; 203 + msg->msg_flags |= MSG_OOB; 204 + if (len > 0) { 205 + if (!(flags & MSG_TRUNC)) 206 + rc = memcpy_to_msg(msg, &conn->urg_rx_byte, 1); 207 + len = 1; 208 + smc_curs_write(&cons, 209 + smc_curs_read(&conn->local_tx_ctrl.cons, 210 + conn), 211 + conn); 212 + if (smc_curs_diff(conn->rmb_desc->len, &cons, 213 + &conn->urg_curs) > 1) 214 + conn->urg_rx_skip_pend = true; 215 + /* Urgent Byte was already accounted for, but trigger 216 + * skipping the urgent byte in non-inline case 217 + */ 218 + if (!(flags & MSG_PEEK)) 219 + smc_rx_update_consumer(smc, cons, 0); 220 + } else { 221 + msg->msg_flags |= MSG_TRUNC; 222 + } 223 + 224 + return rc ? 
-EFAULT : len; 225 + } 226 + 227 + if (sk->sk_state == SMC_CLOSED || sk->sk_shutdown & RCV_SHUTDOWN) 228 + return 0; 229 + 230 + return -EAGAIN; 231 + } 232 + 227 233 /* smc_rx_recvmsg - receive data from RMBE 228 234 * @msg: copy data to receive buffer 229 235 * @pipe: copy data to pipe if set - indicates splice() call ··· 295 209 296 210 if (unlikely(flags & MSG_ERRQUEUE)) 297 211 return -EINVAL; /* future work for sk.sk_family == AF_SMC */ 298 - if (flags & MSG_OOB) 299 - return -EINVAL; /* future work */ 300 212 301 213 sk = &smc->sk; 302 214 if (sk->sk_state == SMC_LISTEN) 303 215 return -ENOTCONN; 216 + if (flags & MSG_OOB) 217 + return smc_rx_recv_urg(smc, msg, len, flags); 304 218 timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT); 305 219 target = sock_rcvlowat(sk, flags & MSG_WAITALL, len); 306 220 ··· 313 227 314 228 if (atomic_read(&conn->bytes_to_rcv)) 315 229 goto copy; 230 + else if (conn->urg_state == SMC_URG_VALID) 231 + /* we received a single urgent Byte - skip */ 232 + smc_rx_update_cons(smc, 0); 316 233 317 234 if (sk->sk_shutdown & RCV_SHUTDOWN || 318 235 smc_cdc_rxed_any_close_or_senddone(conn) || ··· 370 281 continue; 371 282 } 372 283 373 - /* not more than what user space asked for */ 374 - copylen = min_t(size_t, read_remaining, readable); 375 284 smc_curs_write(&cons, 376 285 smc_curs_read(&conn->local_tx_ctrl.cons, conn), 377 286 conn); 378 287 /* subsequent splice() calls pick up where previous left */ 379 288 if (splbytes) 380 289 smc_curs_add(conn->rmb_desc->len, &cons, splbytes); 290 + if (conn->urg_state == SMC_URG_VALID && 291 + sock_flag(&smc->sk, SOCK_URGINLINE) && 292 + readable > 1) 293 + readable--; /* always stop at urgent Byte */ 294 + /* not more than what user space asked for */ 295 + copylen = min_t(size_t, read_remaining, readable); 381 296 /* determine chunks where to read from rcvbuf */ 382 297 /* either unwrapped case, or 1st chunk of wrapped case */ 383 298 chunk_len = min_t(size_t, copylen, conn->rmb_desc->len - ··· 
426 333 atomic_sub(copylen, &conn->bytes_to_rcv); 427 334 /* guarantee 0 <= bytes_to_rcv <= rmb_desc->len */ 428 335 smp_mb__after_atomic(); 429 - if (msg) 430 - smc_rx_update_consumer(conn, cons, copylen); 336 + if (msg && smc_rx_update_consumer(smc, cons, copylen)) 337 + goto out; 431 338 } 432 339 } while (read_remaining); 433 340 out: ··· 439 346 { 440 347 smc->sk.sk_data_ready = smc_rx_wake_up; 441 348 atomic_set(&smc->conn.splice_pending, 0); 349 + smc->conn.urg_state = SMC_URG_READ; 442 350 }
+38 -17
net/smc/smc_tx.c
··· 32 32 /***************************** sndbuf producer *******************************/ 33 33 34 34 /* callback implementation for sk.sk_write_space() 35 - * to wakeup sndbuf producers that blocked with smc_tx_wait_memory(). 35 + * to wakeup sndbuf producers that blocked with smc_tx_wait(). 36 36 * called under sk_socket lock. 37 37 */ 38 38 static void smc_tx_write_space(struct sock *sk) ··· 56 56 } 57 57 } 58 58 59 - /* Wakeup sndbuf producers that blocked with smc_tx_wait_memory(). 59 + /* Wakeup sndbuf producers that blocked with smc_tx_wait(). 60 60 * Cf. tcp_data_snd_check()=>tcp_check_space()=>tcp_new_space(). 61 61 */ 62 62 void smc_tx_sndbuf_nonfull(struct smc_sock *smc) ··· 66 66 smc->sk.sk_write_space(&smc->sk); 67 67 } 68 68 69 - /* blocks sndbuf producer until at least one byte of free space available */ 70 - static int smc_tx_wait_memory(struct smc_sock *smc, int flags) 69 + /* blocks sndbuf producer until at least one byte of free space available 70 + * or urgent Byte was consumed 71 + */ 72 + static int smc_tx_wait(struct smc_sock *smc, int flags) 71 73 { 72 74 DEFINE_WAIT_FUNC(wait, woken_wake_function); 73 75 struct smc_connection *conn = &smc->conn; ··· 105 103 break; 106 104 } 107 105 sk_clear_bit(SOCKWQ_ASYNC_NOSPACE, sk); 108 - if (atomic_read(&conn->sndbuf_space)) 109 - break; /* at least 1 byte of free space available */ 106 + if (atomic_read(&conn->sndbuf_space) && !conn->urg_tx_pend) 107 + break; /* at least 1 byte of free & no urgent data */ 110 108 set_bit(SOCK_NOSPACE, &sk->sk_socket->flags); 111 109 sk_wait_event(sk, &timeo, 112 110 sk->sk_err || 113 111 (sk->sk_shutdown & SEND_SHUTDOWN) || 114 112 smc_cdc_rxed_any_close(conn) || 115 - atomic_read(&conn->sndbuf_space), 113 + (atomic_read(&conn->sndbuf_space) && 114 + !conn->urg_tx_pend), 116 115 &wait); 117 116 } 118 117 remove_wait_queue(sk_sleep(sk), &wait); ··· 160 157 if (smc_cdc_rxed_any_close(conn)) 161 158 return send_done ?: -ECONNRESET; 162 159 163 - if 
(!atomic_read(&conn->sndbuf_space)) { 164 - rc = smc_tx_wait_memory(smc, msg->msg_flags); 160 + if (msg->msg_flags & MSG_OOB) 161 + conn->local_tx_ctrl.prod_flags.urg_data_pending = 1; 162 + 163 + if (!atomic_read(&conn->sndbuf_space) || conn->urg_tx_pend) { 164 + rc = smc_tx_wait(smc, msg->msg_flags); 165 165 if (rc) { 166 166 if (send_done) 167 167 return send_done; ··· 174 168 } 175 169 176 170 /* initialize variables for 1st iteration of subsequent loop */ 177 - /* could be just 1 byte, even after smc_tx_wait_memory above */ 171 + /* could be just 1 byte, even after smc_tx_wait above */ 178 172 writespace = atomic_read(&conn->sndbuf_space); 179 173 /* not more than what user space asked for */ 180 174 copylen = min_t(size_t, send_remaining, writespace); ··· 224 218 /* since we just produced more new data into sndbuf, 225 219 * trigger sndbuf consumer: RDMA write into peer RMBE and CDC 226 220 */ 221 + if ((msg->msg_flags & MSG_OOB) && !send_remaining) 222 + conn->urg_tx_pend = true; 227 223 if ((msg->msg_flags & MSG_MORE || smc_tx_is_corked(smc)) && 228 224 (atomic_read(&conn->sndbuf_space) > 229 225 (conn->sndbuf_desc->len >> 1))) ··· 307 299 union smc_host_cursor sent, prep, prod, cons; 308 300 struct ib_sge sges[SMC_IB_MAX_SEND_SGE]; 309 301 struct smc_link_group *lgr = conn->lgr; 302 + struct smc_cdc_producer_flags *pflags; 310 303 int to_send, rmbespace; 311 304 struct smc_link *link; 312 305 dma_addr_t dma_addr; ··· 335 326 conn); 336 327 337 328 /* if usable snd_wnd closes ask peer to advertise once it opens again */ 338 - conn->local_tx_ctrl.prod_flags.write_blocked = (to_send >= rmbespace); 329 + pflags = &conn->local_tx_ctrl.prod_flags; 330 + pflags->write_blocked = (to_send >= rmbespace); 339 331 /* cf. 
usable snd_wnd */ 340 332 len = min(to_send, rmbespace); 341 333 ··· 401 391 src_len_sum = src_len; 402 392 } 403 393 394 + if (conn->urg_tx_pend && len == to_send) 395 + pflags->urg_data_present = 1; 404 396 smc_tx_advance_cursors(conn, &prod, &sent, len); 405 397 /* update connection's cursors with advanced local cursors */ 406 398 smc_curs_write(&conn->local_tx_ctrl.prod, ··· 422 410 */ 423 411 int smc_tx_sndbuf_nonempty(struct smc_connection *conn) 424 412 { 413 + struct smc_cdc_producer_flags *pflags; 425 414 struct smc_cdc_tx_pend *pend; 426 415 struct smc_wr_buf *wr_buf; 427 416 int rc; ··· 446 433 goto out_unlock; 447 434 } 448 435 449 - rc = smc_tx_rdma_writes(conn); 450 - if (rc) { 451 - smc_wr_tx_put_slot(&conn->lgr->lnk[SMC_SINGLE_LINK], 452 - (struct smc_wr_tx_pend_priv *)pend); 453 - goto out_unlock; 436 + if (!conn->local_tx_ctrl.prod_flags.urg_data_present) { 437 + rc = smc_tx_rdma_writes(conn); 438 + if (rc) { 439 + smc_wr_tx_put_slot(&conn->lgr->lnk[SMC_SINGLE_LINK], 440 + (struct smc_wr_tx_pend_priv *)pend); 441 + goto out_unlock; 442 + } 454 443 } 455 444 456 445 rc = smc_cdc_msg_send(conn, wr_buf, pend); 446 + pflags = &conn->local_tx_ctrl.prod_flags; 447 + if (!rc && pflags->urg_data_present) { 448 + pflags->urg_data_pending = 0; 449 + pflags->urg_data_present = 0; 450 + } 457 451 458 452 out_unlock: 459 453 spin_unlock_bh(&conn->send_lock); ··· 493 473 release_sock(&smc->sk); 494 474 } 495 475 496 - void smc_tx_consumer_update(struct smc_connection *conn) 476 + void smc_tx_consumer_update(struct smc_connection *conn, bool force) 497 477 { 498 478 union smc_host_cursor cfed, cons; 499 479 int to_confirm; ··· 507 487 to_confirm = smc_curs_diff(conn->rmb_desc->len, &cfed, &cons); 508 488 509 489 if (conn->local_rx_ctrl.prod_flags.cons_curs_upd_req || 490 + force || 510 491 ((to_confirm > conn->rmbe_update_limit) && 511 492 ((to_confirm > (conn->rmb_desc->len / 2)) || 512 493 conn->local_rx_ctrl.prod_flags.write_blocked))) {
+1 -1
net/smc/smc_tx.h
··· 32 32 int smc_tx_sendmsg(struct smc_sock *smc, struct msghdr *msg, size_t len); 33 33 int smc_tx_sndbuf_nonempty(struct smc_connection *conn); 34 34 void smc_tx_sndbuf_nonfull(struct smc_sock *smc); 35 - void smc_tx_consumer_update(struct smc_connection *conn); 35 + void smc_tx_consumer_update(struct smc_connection *conn, bool force); 36 36 37 37 #endif /* SMC_TX_H */