Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git

RDS: Connection handling

While arguably the fact that the underlying transport needs a
connection to convey RDS's datagrams reliably is not important
to rds proper, the transports implemented so far (IB and TCP)
have both been connection-oriented, and so the connection
state machine-related code lives in the common rds code.

This patch also includes the work items that handle connecting,
sending, receiving, and shutdown.

Signed-off-by: Andy Grover <andy.grover@oracle.com>
Signed-off-by: David S. Miller <davem@davemloft.net>

Authored by Andy Grover, committed by David S. Miller
00e0f34c a8c879a7
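
The connection state machine and the work items described in the commit message appear in the two new files below. As a rough, hypothetical illustration of how a transport driver plugs into them (not part of the patch), a connection-management callback might look like the sketch below; my_transport_cm_handler and event_ok are made-up names, while rds_connect_complete() and rds_conn_drop() are entry points added by this patch.

/* Hypothetical sketch, not part of this patch: a transport's connection
 * management callback driving the common RDS connection state machine. */
static void my_transport_cm_handler(struct rds_connection *conn, bool event_ok)
{
	if (event_ok) {
		/* CONNECTING -> UP; also kicks the send and recv work items */
		rds_connect_complete(conn);
	} else {
		/* force ERROR and queue the shutdown work item, which will
		 * later queue a reconnect attempt with backoff */
		rds_conn_drop(conn);
	}
}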

2 files changed, 752 insertions(+)

net/rds/connection.c (new file, +487 lines)
/*
 * Copyright (c) 2006 Oracle.  All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * OpenIB.org BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 *
 */
#include <linux/kernel.h>
#include <linux/list.h>
#include <net/inet_hashtables.h>

#include "rds.h"
#include "loop.h"
#include "rdma.h"

#define RDS_CONNECTION_HASH_BITS 12
#define RDS_CONNECTION_HASH_ENTRIES (1 << RDS_CONNECTION_HASH_BITS)
#define RDS_CONNECTION_HASH_MASK (RDS_CONNECTION_HASH_ENTRIES - 1)

/* converting this to RCU is a chore for another day.. */
static DEFINE_SPINLOCK(rds_conn_lock);
static unsigned long rds_conn_count;
static struct hlist_head rds_conn_hash[RDS_CONNECTION_HASH_ENTRIES];
static struct kmem_cache *rds_conn_slab;

static struct hlist_head *rds_conn_bucket(__be32 laddr, __be32 faddr)
{
	/* Pass NULL, don't need struct net for hash */
	unsigned long hash = inet_ehashfn(NULL,
					  be32_to_cpu(laddr), 0,
					  be32_to_cpu(faddr), 0);
	return &rds_conn_hash[hash & RDS_CONNECTION_HASH_MASK];
}

#define rds_conn_info_set(var, test, suffix) do {		\
	if (test)						\
		var |= RDS_INFO_CONNECTION_FLAG_##suffix;	\
} while (0)

static inline int rds_conn_is_sending(struct rds_connection *conn)
{
	int ret = 0;

	if (!mutex_trylock(&conn->c_send_lock))
		ret = 1;
	else
		mutex_unlock(&conn->c_send_lock);

	return ret;
}

static struct rds_connection *rds_conn_lookup(struct hlist_head *head,
					      __be32 laddr, __be32 faddr,
					      struct rds_transport *trans)
{
	struct rds_connection *conn, *ret = NULL;
	struct hlist_node *pos;

	hlist_for_each_entry(conn, pos, head, c_hash_node) {
		if (conn->c_faddr == faddr && conn->c_laddr == laddr &&
		    conn->c_trans == trans) {
			ret = conn;
			break;
		}
	}
	rdsdebug("returning conn %p for %pI4 -> %pI4\n", ret,
		 &laddr, &faddr);
	return ret;
}

/*
 * This is called by transports as they're bringing down a connection.
 * It clears partial message state so that the transport can start sending
 * and receiving over this connection again in the future.  It is up to
 * the transport to have serialized this call with its send and recv.
 */
void rds_conn_reset(struct rds_connection *conn)
{
	rdsdebug("connection %pI4 to %pI4 reset\n",
		 &conn->c_laddr, &conn->c_faddr);

	rds_stats_inc(s_conn_reset);
	rds_send_reset(conn);
	conn->c_flags = 0;

	/* Do not clear next_rx_seq here, else we cannot distinguish
	 * retransmitted packets from new packets, and will hand all
	 * of them to the application. That is not consistent with the
	 * reliability guarantees of RDS. */
}

/*
 * There is only every one 'conn' for a given pair of addresses in the
 * system at a time.  They contain messages to be retransmitted and so
 * span the lifetime of the actual underlying transport connections.
 *
 * For now they are not garbage collected once they're created.  They
 * are torn down as the module is removed, if ever.
 */
static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
				       struct rds_transport *trans, gfp_t gfp,
				       int is_outgoing)
{
	struct rds_connection *conn, *tmp, *parent = NULL;
	struct hlist_head *head = rds_conn_bucket(laddr, faddr);
	unsigned long flags;
	int ret;

	spin_lock_irqsave(&rds_conn_lock, flags);
	conn = rds_conn_lookup(head, laddr, faddr, trans);
	if (conn
	 && conn->c_loopback
	 && conn->c_trans != &rds_loop_transport
	 && !is_outgoing) {
		/* This is a looped back IB connection, and we're
		 * called by the code handling the incoming connect.
		 * We need a second connection object into which we
		 * can stick the other QP. */
		parent = conn;
		conn = parent->c_passive;
	}
	spin_unlock_irqrestore(&rds_conn_lock, flags);
	if (conn)
		goto out;

	conn = kmem_cache_alloc(rds_conn_slab, gfp);
	if (conn == NULL) {
		conn = ERR_PTR(-ENOMEM);
		goto out;
	}

	memset(conn, 0, sizeof(*conn));

	INIT_HLIST_NODE(&conn->c_hash_node);
	conn->c_version = RDS_PROTOCOL_3_0;
	conn->c_laddr = laddr;
	conn->c_faddr = faddr;
	spin_lock_init(&conn->c_lock);
	conn->c_next_tx_seq = 1;

	mutex_init(&conn->c_send_lock);
	INIT_LIST_HEAD(&conn->c_send_queue);
	INIT_LIST_HEAD(&conn->c_retrans);

	ret = rds_cong_get_maps(conn);
	if (ret) {
		kmem_cache_free(rds_conn_slab, conn);
		conn = ERR_PTR(ret);
		goto out;
	}

	/*
	 * This is where a connection becomes loopback.  If *any* RDS sockets
	 * can bind to the destination address then we'd rather the messages
	 * flow through loopback rather than either transport.
	 */
	if (rds_trans_get_preferred(faddr)) {
		conn->c_loopback = 1;
		if (is_outgoing && trans->t_prefer_loopback) {
			/* "outgoing" connection - and the transport
			 * says it wants the connection handled by the
			 * loopback transport. This is what TCP does.
			 */
			trans = &rds_loop_transport;
		}
	}

	conn->c_trans = trans;

	ret = trans->conn_alloc(conn, gfp);
	if (ret) {
		kmem_cache_free(rds_conn_slab, conn);
		conn = ERR_PTR(ret);
		goto out;
	}

	atomic_set(&conn->c_state, RDS_CONN_DOWN);
	conn->c_reconnect_jiffies = 0;
	INIT_DELAYED_WORK(&conn->c_send_w, rds_send_worker);
	INIT_DELAYED_WORK(&conn->c_recv_w, rds_recv_worker);
	INIT_DELAYED_WORK(&conn->c_conn_w, rds_connect_worker);
	INIT_WORK(&conn->c_down_w, rds_shutdown_worker);
	mutex_init(&conn->c_cm_lock);
	conn->c_flags = 0;

	rdsdebug("allocated conn %p for %pI4 -> %pI4 over %s %s\n",
		 conn, &laddr, &faddr,
		 trans->t_name ? trans->t_name : "[unknown]",
		 is_outgoing ? "(outgoing)" : "");

	spin_lock_irqsave(&rds_conn_lock, flags);
	if (parent == NULL) {
		tmp = rds_conn_lookup(head, laddr, faddr, trans);
		if (tmp == NULL)
			hlist_add_head(&conn->c_hash_node, head);
	} else {
		tmp = parent->c_passive;
		if (!tmp)
			parent->c_passive = conn;
	}

	if (tmp) {
		trans->conn_free(conn->c_transport_data);
		kmem_cache_free(rds_conn_slab, conn);
		conn = tmp;
	} else {
		rds_cong_add_conn(conn);
		rds_conn_count++;
	}

	spin_unlock_irqrestore(&rds_conn_lock, flags);

out:
	return conn;
}

struct rds_connection *rds_conn_create(__be32 laddr, __be32 faddr,
				       struct rds_transport *trans, gfp_t gfp)
{
	return __rds_conn_create(laddr, faddr, trans, gfp, 0);
}

struct rds_connection *rds_conn_create_outgoing(__be32 laddr, __be32 faddr,
				       struct rds_transport *trans, gfp_t gfp)
{
	return __rds_conn_create(laddr, faddr, trans, gfp, 1);
}

void rds_conn_destroy(struct rds_connection *conn)
{
	struct rds_message *rm, *rtmp;

	rdsdebug("freeing conn %p for %pI4 -> "
		 "%pI4\n", conn, &conn->c_laddr,
		 &conn->c_faddr);

	hlist_del_init(&conn->c_hash_node);

	/* wait for the rds thread to shut it down */
	atomic_set(&conn->c_state, RDS_CONN_ERROR);
	cancel_delayed_work(&conn->c_conn_w);
	queue_work(rds_wq, &conn->c_down_w);
	flush_workqueue(rds_wq);

	/* tear down queued messages */
	list_for_each_entry_safe(rm, rtmp,
				 &conn->c_send_queue,
				 m_conn_item) {
		list_del_init(&rm->m_conn_item);
		BUG_ON(!list_empty(&rm->m_sock_item));
		rds_message_put(rm);
	}
	if (conn->c_xmit_rm)
		rds_message_put(conn->c_xmit_rm);

	conn->c_trans->conn_free(conn->c_transport_data);

	/*
	 * The congestion maps aren't freed up here.  They're
	 * freed by rds_cong_exit() after all the connections
	 * have been freed.
	 */
	rds_cong_remove_conn(conn);

	BUG_ON(!list_empty(&conn->c_retrans));
	kmem_cache_free(rds_conn_slab, conn);

	rds_conn_count--;
}

static void rds_conn_message_info(struct socket *sock, unsigned int len,
				  struct rds_info_iterator *iter,
				  struct rds_info_lengths *lens,
				  int want_send)
{
	struct hlist_head *head;
	struct hlist_node *pos;
	struct list_head *list;
	struct rds_connection *conn;
	struct rds_message *rm;
	unsigned long flags;
	unsigned int total = 0;
	size_t i;

	len /= sizeof(struct rds_info_message);

	spin_lock_irqsave(&rds_conn_lock, flags);

	for (i = 0, head = rds_conn_hash; i < ARRAY_SIZE(rds_conn_hash);
	     i++, head++) {
		hlist_for_each_entry(conn, pos, head, c_hash_node) {
			if (want_send)
				list = &conn->c_send_queue;
			else
				list = &conn->c_retrans;

			spin_lock(&conn->c_lock);

			/* XXX too lazy to maintain counts.. */
			list_for_each_entry(rm, list, m_conn_item) {
				total++;
				if (total <= len)
					rds_inc_info_copy(&rm->m_inc, iter,
							  conn->c_laddr,
							  conn->c_faddr, 0);
			}

			spin_unlock(&conn->c_lock);
		}
	}

	spin_unlock_irqrestore(&rds_conn_lock, flags);

	lens->nr = total;
	lens->each = sizeof(struct rds_info_message);
}

static void rds_conn_message_info_send(struct socket *sock, unsigned int len,
				       struct rds_info_iterator *iter,
				       struct rds_info_lengths *lens)
{
	rds_conn_message_info(sock, len, iter, lens, 1);
}

static void rds_conn_message_info_retrans(struct socket *sock,
					  unsigned int len,
					  struct rds_info_iterator *iter,
					  struct rds_info_lengths *lens)
{
	rds_conn_message_info(sock, len, iter, lens, 0);
}

void rds_for_each_conn_info(struct socket *sock, unsigned int len,
			    struct rds_info_iterator *iter,
			    struct rds_info_lengths *lens,
			    int (*visitor)(struct rds_connection *, void *),
			    size_t item_len)
{
	uint64_t buffer[(item_len + 7) / 8];
	struct hlist_head *head;
	struct hlist_node *pos;
	struct hlist_node *tmp;
	struct rds_connection *conn;
	unsigned long flags;
	size_t i;

	spin_lock_irqsave(&rds_conn_lock, flags);

	lens->nr = 0;
	lens->each = item_len;

	for (i = 0, head = rds_conn_hash; i < ARRAY_SIZE(rds_conn_hash);
	     i++, head++) {
		hlist_for_each_entry_safe(conn, pos, tmp, head, c_hash_node) {

			/* XXX no c_lock usage.. */
			if (!visitor(conn, buffer))
				continue;

			/* We copy as much as we can fit in the buffer,
			 * but we count all items so that the caller
			 * can resize the buffer.
			 */
			if (len >= item_len) {
				rds_info_copy(iter, buffer, item_len);
				len -= item_len;
			}
			lens->nr++;
		}
	}

	spin_unlock_irqrestore(&rds_conn_lock, flags);
}

static int rds_conn_info_visitor(struct rds_connection *conn,
				 void *buffer)
{
	struct rds_info_connection *cinfo = buffer;

	cinfo->next_tx_seq = conn->c_next_tx_seq;
	cinfo->next_rx_seq = conn->c_next_rx_seq;
	cinfo->laddr = conn->c_laddr;
	cinfo->faddr = conn->c_faddr;
	strncpy(cinfo->transport, conn->c_trans->t_name,
		sizeof(cinfo->transport));
	cinfo->flags = 0;

	rds_conn_info_set(cinfo->flags,
			  rds_conn_is_sending(conn), SENDING);
	/* XXX Future: return the state rather than these funky bits */
	rds_conn_info_set(cinfo->flags,
			  atomic_read(&conn->c_state) == RDS_CONN_CONNECTING,
			  CONNECTING);
	rds_conn_info_set(cinfo->flags,
			  atomic_read(&conn->c_state) == RDS_CONN_UP,
			  CONNECTED);
	return 1;
}

static void rds_conn_info(struct socket *sock, unsigned int len,
			  struct rds_info_iterator *iter,
			  struct rds_info_lengths *lens)
{
	rds_for_each_conn_info(sock, len, iter, lens,
			       rds_conn_info_visitor,
			       sizeof(struct rds_info_connection));
}

int __init rds_conn_init(void)
{
	rds_conn_slab = kmem_cache_create("rds_connection",
					  sizeof(struct rds_connection),
					  0, 0, NULL);
	if (rds_conn_slab == NULL)
		return -ENOMEM;

	rds_info_register_func(RDS_INFO_CONNECTIONS, rds_conn_info);
	rds_info_register_func(RDS_INFO_SEND_MESSAGES,
			       rds_conn_message_info_send);
	rds_info_register_func(RDS_INFO_RETRANS_MESSAGES,
			       rds_conn_message_info_retrans);

	return 0;
}

void rds_conn_exit(void)
{
	rds_loop_exit();

	WARN_ON(!hlist_empty(rds_conn_hash));

	kmem_cache_destroy(rds_conn_slab);

	rds_info_deregister_func(RDS_INFO_CONNECTIONS, rds_conn_info);
	rds_info_deregister_func(RDS_INFO_SEND_MESSAGES,
				 rds_conn_message_info_send);
	rds_info_deregister_func(RDS_INFO_RETRANS_MESSAGES,
				 rds_conn_message_info_retrans);
}

/*
 * Force a disconnect
 */
void rds_conn_drop(struct rds_connection *conn)
{
	atomic_set(&conn->c_state, RDS_CONN_ERROR);
	queue_work(rds_wq, &conn->c_down_w);
}

/*
 * An error occurred on the connection
 */
void
__rds_conn_error(struct rds_connection *conn, const char *fmt, ...)
{
	va_list ap;

	va_start(ap, fmt);
	vprintk(fmt, ap);
	va_end(ap);

	rds_conn_drop(conn);
}
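
For context, a minimal hypothetical caller of the API above might look like the following sketch. The function my_get_conn is a made-up name and GFP_KERNEL is an assumption about the calling context; rds_conn_create_outgoing() and its ERR_PTR() error convention come from the definitions above.

/* Hypothetical usage sketch, not part of this patch. */
static struct rds_connection *my_get_conn(__be32 laddr, __be32 faddr,
					  struct rds_transport *trans)
{
	struct rds_connection *conn;

	/* Returns the existing conn for (laddr, faddr, trans) or allocates,
	 * hashes and returns a new one; ERR_PTR() on failure. */
	conn = rds_conn_create_outgoing(laddr, faddr, trans, GFP_KERNEL);
	if (IS_ERR(conn))
		return NULL;

	return conn;
}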
net/rds/threads.c (new file, +265 lines)
/*
 * Copyright (c) 2006 Oracle.  All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * OpenIB.org BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 *
 */
#include <linux/kernel.h>
#include <linux/random.h>

#include "rds.h"

/*
 * All of connection management is simplified by serializing it through
 * work queues that execute in a connection managing thread.
 *
 * TCP wants to send acks through sendpage() in response to data_ready(),
 * but it needs a process context to do so.
 *
 * The receive paths need to allocate but can't drop packets (!) so we have
 * a thread around to block allocating if the receive fast path sees an
 * allocation failure.
 */

/* Grand Unified Theory of connection life cycle:
 * At any point in time, the connection can be in one of these states:
 * DOWN, CONNECTING, UP, DISCONNECTING, ERROR
 *
 * The following transitions are possible:
 *  ANY		  -> ERROR
 *  UP		  -> DISCONNECTING
 *  ERROR	  -> DISCONNECTING
 *  DISCONNECTING -> DOWN
 *  DOWN	  -> CONNECTING
 *  CONNECTING	  -> UP
 *
 * Transition to state DISCONNECTING/DOWN:
 *  -	Inside the shutdown worker; synchronizes with xmit path
 *	through c_send_lock, and with connection management callbacks
 *	via c_cm_lock.
 *
 *	For receive callbacks, we rely on the underlying transport
 *	(TCP, IB/RDMA) to provide the necessary synchronisation.
 */
struct workqueue_struct *rds_wq;

void rds_connect_complete(struct rds_connection *conn)
{
	if (!rds_conn_transition(conn, RDS_CONN_CONNECTING, RDS_CONN_UP)) {
		printk(KERN_WARNING "%s: Cannot transition to state UP, "
				"current state is %d\n",
				__func__,
				atomic_read(&conn->c_state));
		atomic_set(&conn->c_state, RDS_CONN_ERROR);
		queue_work(rds_wq, &conn->c_down_w);
		return;
	}

	rdsdebug("conn %p for %pI4 to %pI4 complete\n",
		 conn, &conn->c_laddr, &conn->c_faddr);

	conn->c_reconnect_jiffies = 0;
	set_bit(0, &conn->c_map_queued);
	queue_delayed_work(rds_wq, &conn->c_send_w, 0);
	queue_delayed_work(rds_wq, &conn->c_recv_w, 0);
}

/*
 * This random exponential backoff is relied on to eventually resolve racing
 * connects.
 *
 * If connect attempts race then both parties drop both connections and come
 * here to wait for a random amount of time before trying again.  Eventually
 * the backoff range will be so much greater than the time it takes to
 * establish a connection that one of the pair will establish the connection
 * before the other's random delay fires.
 *
 * Connection attempts that arrive while a connection is already established
 * are also considered to be racing connects.  This lets a connection from
 * a rebooted machine replace an existing stale connection before the transport
 * notices that the connection has failed.
 *
 * We should *always* start with a random backoff; otherwise a broken connection
 * will always take several iterations to be re-established.
 */
static void rds_queue_reconnect(struct rds_connection *conn)
{
	unsigned long rand;

	rdsdebug("conn %p for %pI4 to %pI4 reconnect jiffies %lu\n",
		 conn, &conn->c_laddr, &conn->c_faddr,
		 conn->c_reconnect_jiffies);

	set_bit(RDS_RECONNECT_PENDING, &conn->c_flags);
	if (conn->c_reconnect_jiffies == 0) {
		conn->c_reconnect_jiffies = rds_sysctl_reconnect_min_jiffies;
		queue_delayed_work(rds_wq, &conn->c_conn_w, 0);
		return;
	}

	get_random_bytes(&rand, sizeof(rand));
	rdsdebug("%lu delay %lu ceil conn %p for %pI4 -> %pI4\n",
		 rand % conn->c_reconnect_jiffies, conn->c_reconnect_jiffies,
		 conn, &conn->c_laddr, &conn->c_faddr);
	queue_delayed_work(rds_wq, &conn->c_conn_w,
			   rand % conn->c_reconnect_jiffies);

	conn->c_reconnect_jiffies = min(conn->c_reconnect_jiffies * 2,
					rds_sysctl_reconnect_max_jiffies);
}

void rds_connect_worker(struct work_struct *work)
{
	struct rds_connection *conn = container_of(work, struct rds_connection, c_conn_w.work);
	int ret;

	clear_bit(RDS_RECONNECT_PENDING, &conn->c_flags);
	if (rds_conn_transition(conn, RDS_CONN_DOWN, RDS_CONN_CONNECTING)) {
		ret = conn->c_trans->conn_connect(conn);
		rdsdebug("conn %p for %pI4 to %pI4 dispatched, ret %d\n",
			 conn, &conn->c_laddr, &conn->c_faddr, ret);

		if (ret) {
			if (rds_conn_transition(conn, RDS_CONN_CONNECTING, RDS_CONN_DOWN))
				rds_queue_reconnect(conn);
			else
				rds_conn_error(conn, "RDS: connect failed\n");
		}
	}
}

void rds_shutdown_worker(struct work_struct *work)
{
	struct rds_connection *conn = container_of(work, struct rds_connection, c_down_w);

	/* shut it down unless it's down already */
	if (!rds_conn_transition(conn, RDS_CONN_DOWN, RDS_CONN_DOWN)) {
		/*
		 * Quiesce the connection mgmt handlers before we start tearing
		 * things down. We don't hold the mutex for the entire
		 * duration of the shutdown operation, else we may be
		 * deadlocking with the CM handler. Instead, the CM event
		 * handler is supposed to check for state DISCONNECTING
		 */
		mutex_lock(&conn->c_cm_lock);
		if (!rds_conn_transition(conn, RDS_CONN_UP, RDS_CONN_DISCONNECTING)
		 && !rds_conn_transition(conn, RDS_CONN_ERROR, RDS_CONN_DISCONNECTING)) {
			rds_conn_error(conn, "shutdown called in state %d\n",
					atomic_read(&conn->c_state));
			mutex_unlock(&conn->c_cm_lock);
			return;
		}
		mutex_unlock(&conn->c_cm_lock);

		mutex_lock(&conn->c_send_lock);
		conn->c_trans->conn_shutdown(conn);
		rds_conn_reset(conn);
		mutex_unlock(&conn->c_send_lock);

		if (!rds_conn_transition(conn, RDS_CONN_DISCONNECTING, RDS_CONN_DOWN)) {
			/* This can happen - eg when we're in the middle of tearing
			 * down the connection, and someone unloads the rds module.
			 * Quite reproduceable with loopback connections.
			 * Mostly harmless.
			 */
			rds_conn_error(conn,
				"%s: failed to transition to state DOWN, "
				"current state is %d\n",
				__func__,
				atomic_read(&conn->c_state));
			return;
		}
	}

	/* Then reconnect if it's still live.
	 * The passive side of an IB loopback connection is never added
	 * to the conn hash, so we never trigger a reconnect on this
	 * conn - the reconnect is always triggered by the active peer. */
	cancel_delayed_work(&conn->c_conn_w);
	if (!hlist_unhashed(&conn->c_hash_node))
		rds_queue_reconnect(conn);
}

void rds_send_worker(struct work_struct *work)
{
	struct rds_connection *conn = container_of(work, struct rds_connection, c_send_w.work);
	int ret;

	if (rds_conn_state(conn) == RDS_CONN_UP) {
		ret = rds_send_xmit(conn);
		rdsdebug("conn %p ret %d\n", conn, ret);
		switch (ret) {
		case -EAGAIN:
			rds_stats_inc(s_send_immediate_retry);
			queue_delayed_work(rds_wq, &conn->c_send_w, 0);
			break;
		case -ENOMEM:
			rds_stats_inc(s_send_delayed_retry);
			queue_delayed_work(rds_wq, &conn->c_send_w, 2);
		default:
			break;
		}
	}
}

void rds_recv_worker(struct work_struct *work)
{
	struct rds_connection *conn = container_of(work, struct rds_connection, c_recv_w.work);
	int ret;

	if (rds_conn_state(conn) == RDS_CONN_UP) {
		ret = conn->c_trans->recv(conn);
		rdsdebug("conn %p ret %d\n", conn, ret);
		switch (ret) {
		case -EAGAIN:
			rds_stats_inc(s_recv_immediate_retry);
			queue_delayed_work(rds_wq, &conn->c_recv_w, 0);
			break;
		case -ENOMEM:
			rds_stats_inc(s_recv_delayed_retry);
			queue_delayed_work(rds_wq, &conn->c_recv_w, 2);
		default:
			break;
		}
	}
}

void rds_threads_exit(void)
{
	destroy_workqueue(rds_wq);
}

int __init rds_threads_init(void)
{
	rds_wq = create_singlethread_workqueue("krdsd");
	if (rds_wq == NULL)
		return -ENOMEM;

	return 0;
}
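
The reconnect backoff schedule implemented by rds_queue_reconnect() above can be illustrated with a small stand-alone sketch. The min/max values below are made-up placeholders for the rds_sysctl_reconnect_min_jiffies and rds_sysctl_reconnect_max_jiffies sysctls; the first retry is queued immediately, and each later retry picks a random delay below a ceiling that doubles until it is capped.

/* Illustrative user-space sketch of the backoff ceiling progression;
 * not part of the patch, values are assumptions. */
#include <stdio.h>

int main(void)
{
	unsigned long min_j = 100, max_j = 1000;	/* assumed sysctl values */
	unsigned long ceil = 0;				/* c_reconnect_jiffies */
	int attempt;

	for (attempt = 1; attempt <= 6; attempt++) {
		if (ceil == 0) {
			/* first retry is queued with no delay */
			printf("attempt %d: delay 0 jiffies\n", attempt);
			ceil = min_j;
			continue;
		}
		/* later retries pick a random delay below the ceiling,
		 * then the ceiling doubles, capped at the maximum */
		printf("attempt %d: delay in [0, %lu) jiffies\n", attempt, ceil);
		ceil = ceil * 2 > max_j ? max_j : ceil * 2;
	}
	return 0;
}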