Linux kernel mirror (for testing)
git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel
os
linux
1// SPDX-License-Identifier: GPL-2.0-only
2/******************************************************************************
3*******************************************************************************
4**
5** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
6** Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved.
7**
8**
9*******************************************************************************
10******************************************************************************/
11
12/*
13 * lowcomms.c
14 *
15 * This is the "low-level" comms layer.
16 *
17 * It is responsible for sending/receiving messages
18 * from other nodes in the cluster.
19 *
20 * Cluster nodes are referred to by their nodeids. nodeids are
21 * simply 32 bit numbers to the locking module - if they need to
22 * be expanded for the cluster infrastructure then that is its
23 * responsibility. It is this layer's
24 * responsibility to resolve these into IP address or
25 * whatever it needs for inter-node communication.
26 *
27 * The comms level is two kernel threads that deal mainly with
28 * the receiving of messages from other nodes and passing them
29 * up to the mid-level comms layer (which understands the
30 * message format) for execution by the locking core, and
31 * a send thread which does all the setting up of connections
32 * to remote nodes and the sending of data. Threads are not allowed
33 * to send their own data because it may cause them to wait in times
34 * of high load. Also, this way, the sending thread can collect together
35 * messages bound for one node and send them in one block.
36 *
37 * lowcomms will choose to use either TCP or SCTP as its transport layer
38 * depending on the configuration variable 'protocol'. This should be set
39 * to 0 (default) for TCP or 1 for SCTP. It should be configured using a
40 * cluster-wide mechanism as it must be the same on all nodes of the cluster
41 * for the DLM to function.
42 *
43 */
44
45#include <asm/ioctls.h>
46#include <net/sock.h>
47#include <net/tcp.h>
48#include <linux/pagemap.h>
49#include <linux/file.h>
50#include <linux/mutex.h>
51#include <linux/sctp.h>
52#include <linux/slab.h>
53#include <net/sctp/sctp.h>
54#include <net/ipv6.h>
55
56#include "dlm_internal.h"
57#include "lowcomms.h"
58#include "midcomms.h"
59#include "config.h"
60
61#define NEEDED_RMEM (4*1024*1024)
62#define CONN_HASH_SIZE 32
63
64/* Number of messages to send before rescheduling */
65#define MAX_SEND_MSG_COUNT 25
66#define DLM_SHUTDOWN_WAIT_TIMEOUT msecs_to_jiffies(10000)
67
/* Per-peer connection state. One instance exists per remote nodeid
 * (hashed into connection_hash); an optional second instance hangs off
 * ->othercon when two nodes connect to each other simultaneously.
 */
struct connection {
	struct socket *sock;	/* NULL if not connected */
	uint32_t nodeid;	/* So we know who we are in the list */
	struct mutex sock_mutex;	/* serializes ->sock setup/teardown and use */
	unsigned long flags;	/* CF_* bit numbers below, atomic bitops */
#define CF_READ_PENDING 1
#define CF_WRITE_PENDING 2
#define CF_INIT_PENDING 4
#define CF_IS_OTHERCON 5
#define CF_CLOSE 6
#define CF_APP_LIMITED 7
#define CF_CLOSING 8
#define CF_SHUTDOWN 9
#define CF_CONNECTED 10
	struct list_head writequeue;  /* List of outgoing writequeue_entries */
	spinlock_t writequeue_lock;
	void (*connect_action) (struct connection *);	/* What to do to connect */
	void (*shutdown_action)(struct connection *con); /* What to do to shutdown */
	int retries;		/* connect attempts so far, capped below */
#define MAX_CONNECT_RETRIES 3
	struct hlist_node list;		/* link in connection_hash bucket */
	struct connection *othercon;	/* crossed-connect peer socket, if any */
	struct work_struct rwork; /* Receive workqueue */
	struct work_struct swork; /* Send workqueue */
	wait_queue_head_t shutdown_wait; /* wait for graceful shutdown */
	unsigned char *rx_buf;	/* receive staging buffer */
	int rx_buflen;		/* size of rx_buf */
	int rx_leftover;	/* unparsed bytes kept at front of rx_buf */
	struct rcu_head rcu;	/* deferred free under SRCU */
};
/* Recover the connection from a socket's sk_user_data */
#define sock2con(x) ((struct connection *)(x)->sk_user_data)
99
/* State for the single listening socket that accepts peer connects */
struct listen_connection {
	struct socket *sock;		/* listening socket, NULL when down */
	struct work_struct rwork;	/* runs accept handling */
};
104
/* An entry waiting to be sent */
struct writequeue_entry {
	struct list_head list;	/* link in con->writequeue */
	struct page *page;	/* page backing the outgoing data */
	int offset;		/* start of unsent data within the page */
	int len;		/* bytes still to send */
	int end;		/* end of committed data in the page */
	int users;		/* writers still composing into this entry */
	struct connection *con;	/* owning connection */
};
115
/* Cluster address book entry: maps a nodeid to one or more addresses.
 * Protected by dlm_node_addrs_spin.
 */
struct dlm_node_addr {
	struct list_head list;		/* link in dlm_node_addrs */
	int nodeid;
	int addr_count;			/* valid slots in addr[] */
	int curr_addr_index;		/* round-robin cursor for SCTP retries */
	struct sockaddr_storage *addr[DLM_MAX_ADDR_COUNT];
};
123
/* Original socket callbacks, saved from the listening socket so they can
 * be restored when a socket is torn down (accepted sockets inherit them).
 */
static struct listen_sock_callbacks {
	void (*sk_error_report)(struct sock *);
	void (*sk_data_ready)(struct sock *);
	void (*sk_state_change)(struct sock *);
	void (*sk_write_space)(struct sock *);
} listen_sock;
130
131static LIST_HEAD(dlm_node_addrs);
132static DEFINE_SPINLOCK(dlm_node_addrs_spin);
133
134static struct listen_connection listen_con;
135static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];
136static int dlm_local_count;
137static int dlm_allow_conn;
138
139/* Work queues */
140static struct workqueue_struct *recv_workqueue;
141static struct workqueue_struct *send_workqueue;
142
143static struct hlist_head connection_hash[CONN_HASH_SIZE];
144static DEFINE_SPINLOCK(connections_lock);
145DEFINE_STATIC_SRCU(connections_srcu);
146
147static void process_recv_sockets(struct work_struct *work);
148static void process_send_sockets(struct work_struct *work);
149
150static void sctp_connect_to_sock(struct connection *con);
151static void tcp_connect_to_sock(struct connection *con);
152static void dlm_tcp_shutdown(struct connection *con);
153
/* This is deliberately very simple because most clusters have simple
   sequential nodeids, so we should be able to go straight to a connection
   struct in the array */
static inline int nodeid_hash(int nodeid)
{
	/* CONN_HASH_SIZE is a power of two, so this selects the bucket */
	return nodeid & (CONN_HASH_SIZE-1);
}
161
162static struct connection *__find_con(int nodeid)
163{
164 int r, idx;
165 struct connection *con;
166
167 r = nodeid_hash(nodeid);
168
169 idx = srcu_read_lock(&connections_srcu);
170 hlist_for_each_entry_rcu(con, &connection_hash[r], list) {
171 if (con->nodeid == nodeid) {
172 srcu_read_unlock(&connections_srcu, idx);
173 return con;
174 }
175 }
176 srcu_read_unlock(&connections_srcu, idx);
177
178 return NULL;
179}
180
/* Initialize a freshly allocated connection for the given nodeid.
 * Allocates the receive buffer and wires up the protocol-specific
 * connect/shutdown actions. Returns 0 or -ENOMEM.
 */
static int dlm_con_init(struct connection *con, int nodeid)
{
	con->rx_buflen = dlm_config.ci_buffer_size;
	con->rx_buf = kmalloc(con->rx_buflen, GFP_NOFS);
	if (!con->rx_buf)
		return -ENOMEM;

	con->nodeid = nodeid;
	mutex_init(&con->sock_mutex);
	INIT_LIST_HEAD(&con->writequeue);
	spin_lock_init(&con->writequeue_lock);
	INIT_WORK(&con->swork, process_send_sockets);
	INIT_WORK(&con->rwork, process_recv_sockets);
	init_waitqueue_head(&con->shutdown_wait);

	/* ci_protocol == 0 selects TCP, anything else SCTP; only TCP has
	 * a dedicated graceful-shutdown action
	 */
	if (dlm_config.ci_protocol == 0) {
		con->connect_action = tcp_connect_to_sock;
		con->shutdown_action = dlm_tcp_shutdown;
	} else {
		con->connect_action = sctp_connect_to_sock;
	}

	return 0;
}
205
/*
 * Return the connection struct for a nodeid, creating one if needed.
 * If 'alloc' is zero (GFP_NOWAIT-style "don't allocate") we only look
 * up an existing connection and may return NULL.
 */
static struct connection *nodeid2con(int nodeid, gfp_t alloc)
{
	struct connection *con, *tmp;
	int r, ret;

	con = __find_con(nodeid);
	if (con || !alloc)
		return con;

	/* allocate and initialize outside the lock; may sleep */
	con = kzalloc(sizeof(*con), alloc);
	if (!con)
		return NULL;

	ret = dlm_con_init(con, nodeid);
	if (ret) {
		kfree(con);
		return NULL;
	}

	r = nodeid_hash(nodeid);

	spin_lock(&connections_lock);
	/* Because multiple workqueues/threads calls this function it can
	 * race on multiple cpu's. Instead of locking hot path __find_con()
	 * we just check in rare cases of recently added nodes again
	 * under protection of connections_lock. If this is the case we
	 * abort our connection creation and return the existing connection.
	 */
	tmp = __find_con(nodeid);
	if (tmp) {
		spin_unlock(&connections_lock);
		/* lost the race: discard our copy, use the winner's */
		kfree(con->rx_buf);
		kfree(con);
		return tmp;
	}

	hlist_add_head_rcu(&con->list, &connection_hash[r]);
	spin_unlock(&connections_lock);

	return con;
}
251
252/* Loop round all connections */
253static void foreach_conn(void (*conn_func)(struct connection *c))
254{
255 int i, idx;
256 struct connection *con;
257
258 idx = srcu_read_lock(&connections_srcu);
259 for (i = 0; i < CONN_HASH_SIZE; i++) {
260 hlist_for_each_entry_rcu(con, &connection_hash[i], list)
261 conn_func(con);
262 }
263 srcu_read_unlock(&connections_srcu, idx);
264}
265
266static struct dlm_node_addr *find_node_addr(int nodeid)
267{
268 struct dlm_node_addr *na;
269
270 list_for_each_entry(na, &dlm_node_addrs, list) {
271 if (na->nodeid == nodeid)
272 return na;
273 }
274 return NULL;
275}
276
277static int addr_compare(const struct sockaddr_storage *x,
278 const struct sockaddr_storage *y)
279{
280 switch (x->ss_family) {
281 case AF_INET: {
282 struct sockaddr_in *sinx = (struct sockaddr_in *)x;
283 struct sockaddr_in *siny = (struct sockaddr_in *)y;
284 if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr)
285 return 0;
286 if (sinx->sin_port != siny->sin_port)
287 return 0;
288 break;
289 }
290 case AF_INET6: {
291 struct sockaddr_in6 *sinx = (struct sockaddr_in6 *)x;
292 struct sockaddr_in6 *siny = (struct sockaddr_in6 *)y;
293 if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr))
294 return 0;
295 if (sinx->sin6_port != siny->sin6_port)
296 return 0;
297 break;
298 }
299 default:
300 return 0;
301 }
302 return 1;
303}
304
/* Resolve a nodeid into an address. On success copies a full
 * sockaddr_storage into *sas_out (if non-NULL) and just the address part
 * (no port) into *sa_out (if non-NULL). With try_new_addr, the per-node
 * round-robin cursor is advanced so SCTP retries rotate through the
 * node's addresses. Returns 0, or a negative error when the node or its
 * addresses are unknown.
 */
static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out,
			  struct sockaddr *sa_out, bool try_new_addr)
{
	struct sockaddr_storage sas;
	struct dlm_node_addr *na;

	/* no local addresses configured yet: cannot resolve anything */
	if (!dlm_local_count)
		return -1;

	spin_lock(&dlm_node_addrs_spin);
	na = find_node_addr(nodeid);
	if (na && na->addr_count) {
		/* copy under the lock so a concurrent update can't tear it */
		memcpy(&sas, na->addr[na->curr_addr_index],
		       sizeof(struct sockaddr_storage));

		if (try_new_addr) {
			na->curr_addr_index++;
			if (na->curr_addr_index == na->addr_count)
				na->curr_addr_index = 0;
		}
	}
	spin_unlock(&dlm_node_addrs_spin);

	if (!na)
		return -EEXIST;

	/* NOTE(review): addr_count is re-read after the unlock; assumes
	 * entries are not shrunk concurrently — confirm against callers
	 * of dlm_lowcomms_addr()
	 */
	if (!na->addr_count)
		return -ENOENT;

	if (sas_out)
		memcpy(sas_out, &sas, sizeof(struct sockaddr_storage));

	if (!sa_out)
		return 0;

	/* copy only the address; the caller fills in port/family itself */
	if (dlm_local_addr[0]->ss_family == AF_INET) {
		struct sockaddr_in *in4  = (struct sockaddr_in *) &sas;
		struct sockaddr_in *ret4 = (struct sockaddr_in *) sa_out;
		ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
	} else {
		struct sockaddr_in6 *in6  = (struct sockaddr_in6 *) &sas;
		struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) sa_out;
		ret6->sin6_addr = in6->sin6_addr;
	}

	return 0;
}
352
353static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid)
354{
355 struct dlm_node_addr *na;
356 int rv = -EEXIST;
357 int addr_i;
358
359 spin_lock(&dlm_node_addrs_spin);
360 list_for_each_entry(na, &dlm_node_addrs, list) {
361 if (!na->addr_count)
362 continue;
363
364 for (addr_i = 0; addr_i < na->addr_count; addr_i++) {
365 if (addr_compare(na->addr[addr_i], addr)) {
366 *nodeid = na->nodeid;
367 rv = 0;
368 goto unlock;
369 }
370 }
371 }
372unlock:
373 spin_unlock(&dlm_node_addrs_spin);
374 return rv;
375}
376
377/* caller need to held dlm_node_addrs_spin lock */
378static bool dlm_lowcomms_na_has_addr(const struct dlm_node_addr *na,
379 const struct sockaddr_storage *addr)
380{
381 int i;
382
383 for (i = 0; i < na->addr_count; i++) {
384 if (addr_compare(na->addr[i], addr))
385 return true;
386 }
387
388 return false;
389}
390
391int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len)
392{
393 struct sockaddr_storage *new_addr;
394 struct dlm_node_addr *new_node, *na;
395 bool ret;
396
397 new_node = kzalloc(sizeof(struct dlm_node_addr), GFP_NOFS);
398 if (!new_node)
399 return -ENOMEM;
400
401 new_addr = kzalloc(sizeof(struct sockaddr_storage), GFP_NOFS);
402 if (!new_addr) {
403 kfree(new_node);
404 return -ENOMEM;
405 }
406
407 memcpy(new_addr, addr, len);
408
409 spin_lock(&dlm_node_addrs_spin);
410 na = find_node_addr(nodeid);
411 if (!na) {
412 new_node->nodeid = nodeid;
413 new_node->addr[0] = new_addr;
414 new_node->addr_count = 1;
415 list_add(&new_node->list, &dlm_node_addrs);
416 spin_unlock(&dlm_node_addrs_spin);
417 return 0;
418 }
419
420 ret = dlm_lowcomms_na_has_addr(na, addr);
421 if (ret) {
422 spin_unlock(&dlm_node_addrs_spin);
423 kfree(new_addr);
424 kfree(new_node);
425 return -EEXIST;
426 }
427
428 if (na->addr_count >= DLM_MAX_ADDR_COUNT) {
429 spin_unlock(&dlm_node_addrs_spin);
430 kfree(new_addr);
431 kfree(new_node);
432 return -ENOSPC;
433 }
434
435 na->addr[na->addr_count++] = new_addr;
436 spin_unlock(&dlm_node_addrs_spin);
437 kfree(new_node);
438 return 0;
439}
440
441/* Data available on socket or listen socket received a connect */
442static void lowcomms_data_ready(struct sock *sk)
443{
444 struct connection *con;
445
446 read_lock_bh(&sk->sk_callback_lock);
447 con = sock2con(sk);
448 if (con && !test_and_set_bit(CF_READ_PENDING, &con->flags))
449 queue_work(recv_workqueue, &con->rwork);
450 read_unlock_bh(&sk->sk_callback_lock);
451}
452
/* Listening socket has an incoming connect: hand it to the accept worker */
static void lowcomms_listen_data_ready(struct sock *sk)
{
	queue_work(recv_workqueue, &listen_con.rwork);
}
457
/* Socket has write space again: kick the send worker. Also doubles as the
 * "connect finished" notification, since the first write-space callback
 * after connect means the socket is usable.
 */
static void lowcomms_write_space(struct sock *sk)
{
	struct connection *con;

	read_lock_bh(&sk->sk_callback_lock);
	con = sock2con(sk);
	if (!con)
		goto out;

	/* first callback after connect: log once and start sending */
	if (!test_and_set_bit(CF_CONNECTED, &con->flags)) {
		log_print("successful connected to node %d", con->nodeid);
		queue_work(send_workqueue, &con->swork);
		goto out;
	}

	clear_bit(SOCK_NOSPACE, &con->sock->flags);

	/* undo the bookkeeping done when the app hit the send limit */
	if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) {
		con->sock->sk->sk_write_pending--;
		clear_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags);
	}

	queue_work(send_workqueue, &con->swork);
out:
	read_unlock_bh(&sk->sk_callback_lock);
}
484
485static inline void lowcomms_connect_sock(struct connection *con)
486{
487 if (test_bit(CF_CLOSE, &con->flags))
488 return;
489 queue_work(send_workqueue, &con->swork);
490 cond_resched();
491}
492
/* Socket state transition callback, shared by TCP and SCTP sockets. */
static void lowcomms_state_change(struct sock *sk)
{
	/* SCTP layer is not calling sk_data_ready when the connection
	 * is done, so we catch the signal through here. Also, it
	 * doesn't switch socket state when entering shutdown, so we
	 * skip the write in that case.
	 */
	if (sk->sk_shutdown) {
		if (sk->sk_shutdown == RCV_SHUTDOWN)
			lowcomms_data_ready(sk);
	} else if (sk->sk_state == TCP_ESTABLISHED) {
		lowcomms_write_space(sk);
	}
}
507
/* Ensure a connection to the given node exists and is being brought up.
 * Connecting to ourselves is a no-op. Returns 0 or -ENOMEM.
 */
int dlm_lowcomms_connect_node(int nodeid)
{
	struct connection *con;

	if (nodeid == dlm_our_nodeid())
		return 0;

	con = nodeid2con(nodeid, GFP_NOFS);
	if (!con)
		return -ENOMEM;
	lowcomms_connect_sock(con);
	return 0;
}
521
522static void lowcomms_error_report(struct sock *sk)
523{
524 struct connection *con;
525 struct sockaddr_storage saddr;
526 void (*orig_report)(struct sock *) = NULL;
527
528 read_lock_bh(&sk->sk_callback_lock);
529 con = sock2con(sk);
530 if (con == NULL)
531 goto out;
532
533 orig_report = listen_sock.sk_error_report;
534 if (con->sock == NULL ||
535 kernel_getpeername(con->sock, (struct sockaddr *)&saddr) < 0) {
536 printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
537 "sending to node %d, port %d, "
538 "sk_err=%d/%d\n", dlm_our_nodeid(),
539 con->nodeid, dlm_config.ci_tcp_port,
540 sk->sk_err, sk->sk_err_soft);
541 } else if (saddr.ss_family == AF_INET) {
542 struct sockaddr_in *sin4 = (struct sockaddr_in *)&saddr;
543
544 printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
545 "sending to node %d at %pI4, port %d, "
546 "sk_err=%d/%d\n", dlm_our_nodeid(),
547 con->nodeid, &sin4->sin_addr.s_addr,
548 dlm_config.ci_tcp_port, sk->sk_err,
549 sk->sk_err_soft);
550 } else {
551 struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&saddr;
552
553 printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
554 "sending to node %d at %u.%u.%u.%u, "
555 "port %d, sk_err=%d/%d\n", dlm_our_nodeid(),
556 con->nodeid, sin6->sin6_addr.s6_addr32[0],
557 sin6->sin6_addr.s6_addr32[1],
558 sin6->sin6_addr.s6_addr32[2],
559 sin6->sin6_addr.s6_addr32[3],
560 dlm_config.ci_tcp_port, sk->sk_err,
561 sk->sk_err_soft);
562 }
563out:
564 read_unlock_bh(&sk->sk_callback_lock);
565 if (orig_report)
566 orig_report(sk);
567}
568
569/* Note: sk_callback_lock must be locked before calling this function. */
570static void save_listen_callbacks(struct socket *sock)
571{
572 struct sock *sk = sock->sk;
573
574 listen_sock.sk_data_ready = sk->sk_data_ready;
575 listen_sock.sk_state_change = sk->sk_state_change;
576 listen_sock.sk_write_space = sk->sk_write_space;
577 listen_sock.sk_error_report = sk->sk_error_report;
578}
579
580static void restore_callbacks(struct socket *sock)
581{
582 struct sock *sk = sock->sk;
583
584 write_lock_bh(&sk->sk_callback_lock);
585 sk->sk_user_data = NULL;
586 sk->sk_data_ready = listen_sock.sk_data_ready;
587 sk->sk_state_change = listen_sock.sk_state_change;
588 sk->sk_write_space = listen_sock.sk_write_space;
589 sk->sk_error_report = listen_sock.sk_error_report;
590 write_unlock_bh(&sk->sk_callback_lock);
591}
592
593static void add_listen_sock(struct socket *sock, struct listen_connection *con)
594{
595 struct sock *sk = sock->sk;
596
597 write_lock_bh(&sk->sk_callback_lock);
598 save_listen_callbacks(sock);
599 con->sock = sock;
600
601 sk->sk_user_data = con;
602 sk->sk_allocation = GFP_NOFS;
603 /* Install a data_ready callback */
604 sk->sk_data_ready = lowcomms_listen_data_ready;
605 write_unlock_bh(&sk->sk_callback_lock);
606}
607
608/* Make a socket active */
609static void add_sock(struct socket *sock, struct connection *con)
610{
611 struct sock *sk = sock->sk;
612
613 write_lock_bh(&sk->sk_callback_lock);
614 con->sock = sock;
615
616 sk->sk_user_data = con;
617 /* Install a data_ready callback */
618 sk->sk_data_ready = lowcomms_data_ready;
619 sk->sk_write_space = lowcomms_write_space;
620 sk->sk_state_change = lowcomms_state_change;
621 sk->sk_allocation = GFP_NOFS;
622 sk->sk_error_report = lowcomms_error_report;
623 write_unlock_bh(&sk->sk_callback_lock);
624}
625
626/* Add the port number to an IPv6 or 4 sockaddr and return the address
627 length */
628static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
629 int *addr_len)
630{
631 saddr->ss_family = dlm_local_addr[0]->ss_family;
632 if (saddr->ss_family == AF_INET) {
633 struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
634 in4_addr->sin_port = cpu_to_be16(port);
635 *addr_len = sizeof(struct sockaddr_in);
636 memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
637 } else {
638 struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
639 in6_addr->sin6_port = cpu_to_be16(port);
640 *addr_len = sizeof(struct sockaddr_in6);
641 }
642 memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len);
643}
644
645static void dlm_close_sock(struct socket **sock)
646{
647 if (*sock) {
648 restore_callbacks(*sock);
649 sock_release(*sock);
650 *sock = NULL;
651 }
652}
653
/* Close a remote connection and tidy up */
static void close_connection(struct connection *con, bool and_other,
			     bool tx, bool rx)
{
	/* CF_CLOSING marks a close in progress; if one is already running,
	 * skip cancelling the workers again (avoids self-deadlock when a
	 * worker closes its own connection)
	 */
	bool closing = test_and_set_bit(CF_CLOSING, &con->flags);

	if (tx && !closing && cancel_work_sync(&con->swork)) {
		log_print("canceled swork for node %d", con->nodeid);
		clear_bit(CF_WRITE_PENDING, &con->flags);
	}
	if (rx && !closing && cancel_work_sync(&con->rwork)) {
		log_print("canceled rwork for node %d", con->nodeid);
		clear_bit(CF_READ_PENDING, &con->flags);
	}

	mutex_lock(&con->sock_mutex);
	dlm_close_sock(&con->sock);

	if (con->othercon && and_other) {
		/* Will only re-enter once. */
		close_connection(con->othercon, false, true, true);
	}

	/* reset receive/retry state so a future reconnect starts clean */
	con->rx_leftover = 0;
	con->retries = 0;
	clear_bit(CF_CONNECTED, &con->flags);
	mutex_unlock(&con->sock_mutex);
	clear_bit(CF_CLOSING, &con->flags);
}
683
/* Gracefully shut down one connection: stop the send worker, half-close
 * the socket (SHUT_WR) and wait for the peer's EOF, which the receive
 * path signals by clearing CF_SHUTDOWN and waking shutdown_wait. On any
 * failure or timeout fall back to a forced close.
 */
static void shutdown_connection(struct connection *con)
{
	int ret;

	if (cancel_work_sync(&con->swork)) {
		log_print("canceled swork for node %d", con->nodeid);
		clear_bit(CF_WRITE_PENDING, &con->flags);
	}

	mutex_lock(&con->sock_mutex);
	/* nothing to shutdown */
	if (!con->sock) {
		mutex_unlock(&con->sock_mutex);
		return;
	}

	set_bit(CF_SHUTDOWN, &con->flags);
	ret = kernel_sock_shutdown(con->sock, SHUT_WR);
	mutex_unlock(&con->sock_mutex);
	if (ret) {
		log_print("Connection %p failed to shutdown: %d will force close",
			  con, ret);
		goto force_close;
	} else {
		/* receive_from_sock() clears CF_SHUTDOWN on peer EOF */
		ret = wait_event_timeout(con->shutdown_wait,
					 !test_bit(CF_SHUTDOWN, &con->flags),
					 DLM_SHUTDOWN_WAIT_TIMEOUT);
		if (ret == 0) {
			log_print("Connection %p shutdown timed out, will force close",
				  con);
			goto force_close;
		}
	}

	return;

force_close:
	clear_bit(CF_SHUTDOWN, &con->flags);
	close_connection(con, false, true, true);
}
724
/* TCP shutdown action: gracefully shut down the crossed-connect socket
 * first (if any), then the primary connection.
 */
static void dlm_tcp_shutdown(struct connection *con)
{
	if (con->othercon)
		shutdown_connection(con->othercon);
	shutdown_connection(con);
}
731
732static int con_realloc_receive_buf(struct connection *con, int newlen)
733{
734 unsigned char *newbuf;
735
736 newbuf = kmalloc(newlen, GFP_NOFS);
737 if (!newbuf)
738 return -ENOMEM;
739
740 /* copy any leftover from last receive */
741 if (con->rx_leftover)
742 memmove(newbuf, con->rx_buf, con->rx_leftover);
743
744 /* swap to new buffer space */
745 kfree(con->rx_buf);
746 con->rx_buflen = newlen;
747 con->rx_buf = newbuf;
748
749 return 0;
750}
751
752/* Data received from remote end */
753static int receive_from_sock(struct connection *con)
754{
755 int call_again_soon = 0;
756 struct msghdr msg;
757 struct kvec iov;
758 int ret, buflen;
759
760 mutex_lock(&con->sock_mutex);
761
762 if (con->sock == NULL) {
763 ret = -EAGAIN;
764 goto out_close;
765 }
766
767 /* realloc if we get new buffer size to read out */
768 buflen = dlm_config.ci_buffer_size;
769 if (con->rx_buflen != buflen && con->rx_leftover <= buflen) {
770 ret = con_realloc_receive_buf(con, buflen);
771 if (ret < 0)
772 goto out_resched;
773 }
774
775 /* calculate new buffer parameter regarding last receive and
776 * possible leftover bytes
777 */
778 iov.iov_base = con->rx_buf + con->rx_leftover;
779 iov.iov_len = con->rx_buflen - con->rx_leftover;
780
781 memset(&msg, 0, sizeof(msg));
782 msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
783 ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len,
784 msg.msg_flags);
785 if (ret <= 0)
786 goto out_close;
787 else if (ret == iov.iov_len)
788 call_again_soon = 1;
789
790 /* new buflen according readed bytes and leftover from last receive */
791 buflen = ret + con->rx_leftover;
792 ret = dlm_process_incoming_buffer(con->nodeid, con->rx_buf, buflen);
793 if (ret < 0)
794 goto out_close;
795
796 /* calculate leftover bytes from process and put it into begin of
797 * the receive buffer, so next receive we have the full message
798 * at the start address of the receive buffer.
799 */
800 con->rx_leftover = buflen - ret;
801 if (con->rx_leftover) {
802 memmove(con->rx_buf, con->rx_buf + ret,
803 con->rx_leftover);
804 call_again_soon = true;
805 }
806
807 if (call_again_soon)
808 goto out_resched;
809
810 mutex_unlock(&con->sock_mutex);
811 return 0;
812
813out_resched:
814 if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
815 queue_work(recv_workqueue, &con->rwork);
816 mutex_unlock(&con->sock_mutex);
817 return -EAGAIN;
818
819out_close:
820 mutex_unlock(&con->sock_mutex);
821 if (ret != -EAGAIN) {
822 /* Reconnect when there is something to send */
823 close_connection(con, false, true, false);
824 if (ret == 0) {
825 log_print("connection %p got EOF from %d",
826 con, con->nodeid);
827 /* handling for tcp shutdown */
828 clear_bit(CF_SHUTDOWN, &con->flags);
829 wake_up(&con->shutdown_wait);
830 /* signal to breaking receive worker */
831 ret = -1;
832 }
833 }
834 return ret;
835}
836
837/* Listening socket is busy, accept a connection */
838static int accept_from_sock(struct listen_connection *con)
839{
840 int result;
841 struct sockaddr_storage peeraddr;
842 struct socket *newsock;
843 int len;
844 int nodeid;
845 struct connection *newcon;
846 struct connection *addcon;
847 unsigned int mark;
848
849 if (!dlm_allow_conn) {
850 return -1;
851 }
852
853 if (!con->sock)
854 return -ENOTCONN;
855
856 result = kernel_accept(con->sock, &newsock, O_NONBLOCK);
857 if (result < 0)
858 goto accept_err;
859
860 /* Get the connected socket's peer */
861 memset(&peeraddr, 0, sizeof(peeraddr));
862 len = newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr, 2);
863 if (len < 0) {
864 result = -ECONNABORTED;
865 goto accept_err;
866 }
867
868 /* Get the new node's NODEID */
869 make_sockaddr(&peeraddr, 0, &len);
870 if (addr_to_nodeid(&peeraddr, &nodeid)) {
871 unsigned char *b=(unsigned char *)&peeraddr;
872 log_print("connect from non cluster node");
873 print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE,
874 b, sizeof(struct sockaddr_storage));
875 sock_release(newsock);
876 return -1;
877 }
878
879 dlm_comm_mark(nodeid, &mark);
880 sock_set_mark(newsock->sk, mark);
881
882 log_print("got connection from %d", nodeid);
883
884 /* Check to see if we already have a connection to this node. This
885 * could happen if the two nodes initiate a connection at roughly
886 * the same time and the connections cross on the wire.
887 * In this case we store the incoming one in "othercon"
888 */
889 newcon = nodeid2con(nodeid, GFP_NOFS);
890 if (!newcon) {
891 result = -ENOMEM;
892 goto accept_err;
893 }
894
895 mutex_lock(&newcon->sock_mutex);
896 if (newcon->sock) {
897 struct connection *othercon = newcon->othercon;
898
899 if (!othercon) {
900 othercon = kzalloc(sizeof(*othercon), GFP_NOFS);
901 if (!othercon) {
902 log_print("failed to allocate incoming socket");
903 mutex_unlock(&newcon->sock_mutex);
904 result = -ENOMEM;
905 goto accept_err;
906 }
907
908 result = dlm_con_init(othercon, nodeid);
909 if (result < 0) {
910 kfree(othercon);
911 goto accept_err;
912 }
913
914 newcon->othercon = othercon;
915 } else {
916 /* close other sock con if we have something new */
917 close_connection(othercon, false, true, false);
918 }
919
920 mutex_lock_nested(&othercon->sock_mutex, 1);
921 add_sock(newsock, othercon);
922 addcon = othercon;
923 mutex_unlock(&othercon->sock_mutex);
924 }
925 else {
926 /* accept copies the sk after we've saved the callbacks, so we
927 don't want to save them a second time or comm errors will
928 result in calling sk_error_report recursively. */
929 add_sock(newsock, newcon);
930 addcon = newcon;
931 }
932
933 mutex_unlock(&newcon->sock_mutex);
934
935 /*
936 * Add it to the active queue in case we got data
937 * between processing the accept adding the socket
938 * to the read_sockets list
939 */
940 if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags))
941 queue_work(recv_workqueue, &addcon->rwork);
942
943 return 0;
944
945accept_err:
946 if (newsock)
947 sock_release(newsock);
948
949 if (result != -EAGAIN)
950 log_print("error accepting connection from node: %d", result);
951 return result;
952}
953
/* Release a writequeue entry and the page backing its data */
static void free_entry(struct writequeue_entry *e)
{
	__free_page(e->page);
	kfree(e);
}
959
960/*
961 * writequeue_entry_complete - try to delete and free write queue entry
962 * @e: write queue entry to try to delete
963 * @completed: bytes completed
964 *
965 * writequeue_lock must be held.
966 */
967static void writequeue_entry_complete(struct writequeue_entry *e, int completed)
968{
969 e->offset += completed;
970 e->len -= completed;
971
972 if (e->len == 0 && e->users == 0) {
973 list_del(&e->list);
974 free_entry(e);
975 }
976}
977
978/*
979 * sctp_bind_addrs - bind a SCTP socket to all our addresses
980 */
981static int sctp_bind_addrs(struct socket *sock, uint16_t port)
982{
983 struct sockaddr_storage localaddr;
984 struct sockaddr *addr = (struct sockaddr *)&localaddr;
985 int i, addr_len, result = 0;
986
987 for (i = 0; i < dlm_local_count; i++) {
988 memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr));
989 make_sockaddr(&localaddr, port, &addr_len);
990
991 if (!i)
992 result = kernel_bind(sock, addr, addr_len);
993 else
994 result = sock_bind_add(sock->sk, addr, addr_len);
995
996 if (result < 0) {
997 log_print("Can't bind to %d addr number %d, %d.\n",
998 port, i + 1, result);
999 break;
1000 }
1001 }
1002 return result;
1003}
1004
/* Initiate an SCTP association.
   This is a special case of send_to_sock() in that we don't yet have a
   peeled-off socket for this association, so we use the listening socket
   and add the primary IP address of the remote node.
 */
static void sctp_connect_to_sock(struct connection *con)
{
	struct sockaddr_storage daddr;
	int result;
	int addr_len;
	struct socket *sock;
	unsigned int mark;

	dlm_comm_mark(con->nodeid, &mark);

	mutex_lock(&con->sock_mutex);

	/* Some odd races can cause double-connects, ignore them */
	if (con->retries++ > MAX_CONNECT_RETRIES)
		goto out;

	if (con->sock) {
		log_print("node %d already connected.", con->nodeid);
		goto out;
	}

	/* try_new_addr=true rotates through the node's addresses on retry */
	memset(&daddr, 0, sizeof(daddr));
	result = nodeid_to_addr(con->nodeid, &daddr, NULL, true);
	if (result < 0) {
		log_print("no address for nodeid %d", con->nodeid);
		goto out;
	}

	/* Create a socket to communicate with */
	result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
				  SOCK_STREAM, IPPROTO_SCTP, &sock);
	if (result < 0)
		goto socket_err;

	sock_set_mark(sock->sk, mark);

	add_sock(sock, con);

	/* Bind to all addresses. */
	if (sctp_bind_addrs(con->sock, 0))
		goto bind_err;

	make_sockaddr(&daddr, dlm_config.ci_tcp_port, &addr_len);

	log_print("connecting to %d", con->nodeid);

	/* Turn off Nagle's algorithm */
	sctp_sock_set_nodelay(sock->sk);

	/*
	 * Make sock->ops->connect() function return in specified time,
	 * since O_NONBLOCK argument in connect() function does not work here,
	 * then, we should restore the default value of this attribute.
	 */
	sock_set_sndtimeo(sock->sk, 5);
	result = sock->ops->connect(sock, (struct sockaddr *)&daddr, addr_len,
				    0);
	sock_set_sndtimeo(sock->sk, 0);

	/* -EINPROGRESS counts as success: completion arrives asynchronously */
	if (result == -EINPROGRESS)
		result = 0;
	if (result == 0) {
		if (!test_and_set_bit(CF_CONNECTED, &con->flags))
			log_print("successful connected to node %d", con->nodeid);
		goto out;
	}

bind_err:
	/* detach before release; add_sock() already published con->sock */
	con->sock = NULL;
	sock_release(sock);

socket_err:
	/*
	 * Some errors are fatal and this list might need adjusting. For other
	 * errors we try again until the max number of retries is reached.
	 */
	if (result != -EHOSTUNREACH &&
	    result != -ENETUNREACH &&
	    result != -ENETDOWN &&
	    result != -EINVAL &&
	    result != -EPROTONOSUPPORT) {
		log_print("connect %d try %d error %d", con->nodeid,
			  con->retries, result);
		mutex_unlock(&con->sock_mutex);
		msleep(1000);
		lowcomms_connect_sock(con);
		return;
	}

out:
	mutex_unlock(&con->sock_mutex);
}
1102
/* Connect a new socket to its peer */
static void tcp_connect_to_sock(struct connection *con)
{
	struct sockaddr_storage saddr, src_addr;
	int addr_len;
	struct socket *sock = NULL;
	unsigned int mark;
	int result;

	dlm_comm_mark(con->nodeid, &mark);

	mutex_lock(&con->sock_mutex);
	if (con->retries++ > MAX_CONNECT_RETRIES)
		goto out;

	/* Some odd races can cause double-connects, ignore them */
	if (con->sock)
		goto out;

	/* Create a socket to communicate with */
	result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
				  SOCK_STREAM, IPPROTO_TCP, &sock);
	if (result < 0)
		goto out_err;

	sock_set_mark(sock->sk, mark);

	memset(&saddr, 0, sizeof(saddr));
	result = nodeid_to_addr(con->nodeid, &saddr, NULL, false);
	if (result < 0) {
		log_print("no address for nodeid %d", con->nodeid);
		goto out_err;
	}

	add_sock(sock, con);

	/* Bind to our cluster-known address connecting to avoid
	   routing problems */
	memcpy(&src_addr, dlm_local_addr[0], sizeof(src_addr));
	make_sockaddr(&src_addr, 0, &addr_len);
	result = sock->ops->bind(sock, (struct sockaddr *) &src_addr,
				 addr_len);
	if (result < 0) {
		log_print("could not bind for connect: %d", result);
		/* This *may* not indicate a critical error */
	}

	make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len);

	log_print("connecting to %d", con->nodeid);

	/* Turn off Nagle's algorithm */
	tcp_sock_set_nodelay(sock->sk);

	/* non-blocking connect: -EINPROGRESS means completion will arrive
	 * through the state-change callback
	 */
	result = sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
				    O_NONBLOCK);
	if (result == -EINPROGRESS)
		result = 0;
	if (result == 0)
		goto out;

out_err:
	/* con->sock is set once add_sock() ran; release whichever we hold */
	if (con->sock) {
		sock_release(con->sock);
		con->sock = NULL;
	} else if (sock) {
		sock_release(sock);
	}
	/*
	 * Some errors are fatal and this list might need adjusting. For other
	 * errors we try again until the max number of retries is reached.
	 */
	if (result != -EHOSTUNREACH &&
	    result != -ENETUNREACH &&
	    result != -ENETDOWN &&
	    result != -EINVAL &&
	    result != -EPROTONOSUPPORT) {
		log_print("connect %d try %d error %d", con->nodeid,
			  con->retries, result);
		mutex_unlock(&con->sock_mutex);
		msleep(1000);
		lowcomms_connect_sock(con);
		return;
	}
out:
	mutex_unlock(&con->sock_mutex);
	return;
}
1191
1192/* On error caller must run dlm_close_sock() for the
1193 * listen connection socket.
1194 */
1195static int tcp_create_listen_sock(struct listen_connection *con,
1196 struct sockaddr_storage *saddr)
1197{
1198 struct socket *sock = NULL;
1199 int result = 0;
1200 int addr_len;
1201
1202 if (dlm_local_addr[0]->ss_family == AF_INET)
1203 addr_len = sizeof(struct sockaddr_in);
1204 else
1205 addr_len = sizeof(struct sockaddr_in6);
1206
1207 /* Create a socket to communicate with */
1208 result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
1209 SOCK_STREAM, IPPROTO_TCP, &sock);
1210 if (result < 0) {
1211 log_print("Can't create listening comms socket");
1212 goto create_out;
1213 }
1214
1215 sock_set_mark(sock->sk, dlm_config.ci_mark);
1216
1217 /* Turn off Nagle's algorithm */
1218 tcp_sock_set_nodelay(sock->sk);
1219
1220 sock_set_reuseaddr(sock->sk);
1221
1222 add_listen_sock(sock, con);
1223
1224 /* Bind to our port */
1225 make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len);
1226 result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len);
1227 if (result < 0) {
1228 log_print("Can't bind to port %d", dlm_config.ci_tcp_port);
1229 goto create_out;
1230 }
1231 sock_set_keepalive(sock->sk);
1232
1233 result = sock->ops->listen(sock, 5);
1234 if (result < 0) {
1235 log_print("Can't listen on port %d", dlm_config.ci_tcp_port);
1236 goto create_out;
1237 }
1238
1239 return 0;
1240
1241create_out:
1242 return result;
1243}
1244
1245/* Get local addresses */
1246static void init_local(void)
1247{
1248 struct sockaddr_storage sas, *addr;
1249 int i;
1250
1251 dlm_local_count = 0;
1252 for (i = 0; i < DLM_MAX_ADDR_COUNT; i++) {
1253 if (dlm_our_addr(&sas, i))
1254 break;
1255
1256 addr = kmemdup(&sas, sizeof(*addr), GFP_NOFS);
1257 if (!addr)
1258 break;
1259 dlm_local_addr[dlm_local_count++] = addr;
1260 }
1261}
1262
1263static void deinit_local(void)
1264{
1265 int i;
1266
1267 for (i = 0; i < dlm_local_count; i++)
1268 kfree(dlm_local_addr[i]);
1269}
1270
1271/* Initialise SCTP socket and bind to all interfaces
1272 * On error caller must run dlm_close_sock() for the
1273 * listen connection socket.
1274 */
1275static int sctp_listen_for_all(struct listen_connection *con)
1276{
1277 struct socket *sock = NULL;
1278 int result = -EINVAL;
1279
1280 log_print("Using SCTP for communications");
1281
1282 result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
1283 SOCK_STREAM, IPPROTO_SCTP, &sock);
1284 if (result < 0) {
1285 log_print("Can't create comms socket, check SCTP is loaded");
1286 goto out;
1287 }
1288
1289 sock_set_rcvbuf(sock->sk, NEEDED_RMEM);
1290 sock_set_mark(sock->sk, dlm_config.ci_mark);
1291 sctp_sock_set_nodelay(sock->sk);
1292
1293 add_listen_sock(sock, con);
1294
1295 /* Bind to all addresses. */
1296 result = sctp_bind_addrs(con->sock, dlm_config.ci_tcp_port);
1297 if (result < 0)
1298 goto out;
1299
1300 result = sock->ops->listen(sock, 5);
1301 if (result < 0) {
1302 log_print("Can't set socket listening");
1303 goto out;
1304 }
1305
1306 return 0;
1307
1308out:
1309 return result;
1310}
1311
1312static int tcp_listen_for_all(void)
1313{
1314 /* We don't support multi-homed hosts */
1315 if (dlm_local_count > 1) {
1316 log_print("TCP protocol can't handle multi-homed hosts, "
1317 "try SCTP");
1318 return -EINVAL;
1319 }
1320
1321 log_print("Using TCP for communications");
1322
1323 return tcp_create_listen_sock(&listen_con, dlm_local_addr[0]);
1324}
1325
1326
1327
1328static struct writequeue_entry *new_writequeue_entry(struct connection *con,
1329 gfp_t allocation)
1330{
1331 struct writequeue_entry *entry;
1332
1333 entry = kmalloc(sizeof(struct writequeue_entry), allocation);
1334 if (!entry)
1335 return NULL;
1336
1337 entry->page = alloc_page(allocation);
1338 if (!entry->page) {
1339 kfree(entry);
1340 return NULL;
1341 }
1342
1343 entry->offset = 0;
1344 entry->len = 0;
1345 entry->end = 0;
1346 entry->users = 0;
1347 entry->con = con;
1348
1349 return entry;
1350}
1351
1352void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
1353{
1354 struct connection *con;
1355 struct writequeue_entry *e;
1356 int offset = 0;
1357
1358 if (len > LOWCOMMS_MAX_TX_BUFFER_LEN) {
1359 BUILD_BUG_ON(PAGE_SIZE < LOWCOMMS_MAX_TX_BUFFER_LEN);
1360 log_print("failed to allocate a buffer of size %d", len);
1361 return NULL;
1362 }
1363
1364 con = nodeid2con(nodeid, allocation);
1365 if (!con)
1366 return NULL;
1367
1368 spin_lock(&con->writequeue_lock);
1369 e = list_entry(con->writequeue.prev, struct writequeue_entry, list);
1370 if ((&e->list == &con->writequeue) ||
1371 (PAGE_SIZE - e->end < len)) {
1372 e = NULL;
1373 } else {
1374 offset = e->end;
1375 e->end += len;
1376 e->users++;
1377 }
1378 spin_unlock(&con->writequeue_lock);
1379
1380 if (e) {
1381 got_one:
1382 *ppc = page_address(e->page) + offset;
1383 return e;
1384 }
1385
1386 e = new_writequeue_entry(con, allocation);
1387 if (e) {
1388 spin_lock(&con->writequeue_lock);
1389 offset = e->end;
1390 e->end += len;
1391 e->users++;
1392 list_add_tail(&e->list, &con->writequeue);
1393 spin_unlock(&con->writequeue_lock);
1394 goto got_one;
1395 }
1396 return NULL;
1397}
1398
1399void dlm_lowcomms_commit_buffer(void *mh)
1400{
1401 struct writequeue_entry *e = (struct writequeue_entry *)mh;
1402 struct connection *con = e->con;
1403 int users;
1404
1405 spin_lock(&con->writequeue_lock);
1406 users = --e->users;
1407 if (users)
1408 goto out;
1409 e->len = e->end - e->offset;
1410 spin_unlock(&con->writequeue_lock);
1411
1412 queue_work(send_workqueue, &con->swork);
1413 return;
1414
1415out:
1416 spin_unlock(&con->writequeue_lock);
1417 return;
1418}
1419
1420/* Send a message */
static void send_to_sock(struct connection *con)
{
	int ret = 0;
	const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
	struct writequeue_entry *e;
	int len, offset;
	int count = 0;

	mutex_lock(&con->sock_mutex);
	if (con->sock == NULL)
		goto out_connect;

	spin_lock(&con->writequeue_lock);
	for (;;) {
		/* Peek the oldest entry; reaching the list head means the
		 * queue is empty.
		 */
		e = list_entry(con->writequeue.next, struct writequeue_entry,
			       list);
		if ((struct list_head *) e == &con->writequeue)
			break;

		len = e->len;
		offset = e->offset;
		BUG_ON(len == 0 && e->users == 0);
		/* Drop the spinlock around the send, which may block. */
		spin_unlock(&con->writequeue_lock);

		ret = 0;
		if (len) {
			ret = kernel_sendpage(con->sock, e->page, offset, len,
					      msg_flags);
			if (ret == -EAGAIN || ret == 0) {
				if (ret == -EAGAIN &&
				    test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) &&
				    !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
					/* Notify TCP that we're limited by the
					 * application window size.
					 */
					set_bit(SOCK_NOSPACE, &con->sock->flags);
					con->sock->sk->sk_write_pending++;
				}
				cond_resched();
				goto out;
			} else if (ret < 0)
				goto send_error;
		}

		/* Don't starve people filling buffers */
		if (++count >= MAX_SEND_MSG_COUNT) {
			cond_resched();
			count = 0;
		}

		spin_lock(&con->writequeue_lock);
		writequeue_entry_complete(e, ret);
	}
	spin_unlock(&con->writequeue_lock);
out:
	mutex_unlock(&con->sock_mutex);
	return;

send_error:
	mutex_unlock(&con->sock_mutex);
	close_connection(con, false, false, true);
	/* Requeue the send work. When the work daemon runs again, it will try
	   a new connection, then call this function again. */
	queue_work(send_workqueue, &con->swork);
	return;

out_connect:
	/* No socket yet: requeue ourselves so process_send_sockets() can
	 * run the connect action first.
	 */
	mutex_unlock(&con->sock_mutex);
	queue_work(send_workqueue, &con->swork);
	cond_resched();
}
1492
1493static void clean_one_writequeue(struct connection *con)
1494{
1495 struct writequeue_entry *e, *safe;
1496
1497 spin_lock(&con->writequeue_lock);
1498 list_for_each_entry_safe(e, safe, &con->writequeue, list) {
1499 list_del(&e->list);
1500 free_entry(e);
1501 }
1502 spin_unlock(&con->writequeue_lock);
1503}
1504
1505/* Called from recovery when it knows that a node has
1506 left the cluster */
int dlm_lowcomms_close(int nodeid)
{
	struct connection *con;
	struct dlm_node_addr *na;

	log_print("closing connection to node %d", nodeid);
	/* NOTE(review): a gfp of 0 presumably makes this a lookup-only call
	 * (no allocation) — verify against nodeid2con().
	 */
	con = nodeid2con(nodeid, 0);
	if (con) {
		set_bit(CF_CLOSE, &con->flags);
		close_connection(con, true, true, true);
		clean_one_writequeue(con);
		if (con->othercon)
			clean_one_writequeue(con->othercon);
	}

	/* Drop the node's cached addresses so a rejoining node can
	 * register fresh ones.
	 */
	spin_lock(&dlm_node_addrs_spin);
	na = find_node_addr(nodeid);
	if (na) {
		list_del(&na->list);
		while (na->addr_count--)
			kfree(na->addr[na->addr_count]);
		kfree(na);
	}
	spin_unlock(&dlm_node_addrs_spin);

	return 0;
}
1534
1535/* Receive workqueue function */
1536static void process_recv_sockets(struct work_struct *work)
1537{
1538 struct connection *con = container_of(work, struct connection, rwork);
1539 int err;
1540
1541 clear_bit(CF_READ_PENDING, &con->flags);
1542 do {
1543 err = receive_from_sock(con);
1544 } while (!err);
1545}
1546
static void process_listen_recv_socket(struct work_struct *work)
{
	/* Runs on the recv workqueue: accept one incoming connection on
	 * the cluster listening socket.
	 */
	accept_from_sock(&listen_con);
}
1551
1552/* Send workqueue function */
static void process_send_sockets(struct work_struct *work)
{
	struct connection *con = container_of(work, struct connection, swork);

	clear_bit(CF_WRITE_PENDING, &con->flags);
	/* Establish the connection first if there is no socket yet. */
	if (con->sock == NULL) /* not mutex protected so check it inside too */
		con->connect_action(con);
	if (!list_empty(&con->writequeue))
		send_to_sock(con);
}
1563
1564static void work_stop(void)
1565{
1566 if (recv_workqueue)
1567 destroy_workqueue(recv_workqueue);
1568 if (send_workqueue)
1569 destroy_workqueue(send_workqueue);
1570}
1571
1572static int work_start(void)
1573{
1574 recv_workqueue = alloc_workqueue("dlm_recv",
1575 WQ_UNBOUND | WQ_MEM_RECLAIM, 1);
1576 if (!recv_workqueue) {
1577 log_print("can't start dlm_recv");
1578 return -ENOMEM;
1579 }
1580
1581 send_workqueue = alloc_workqueue("dlm_send",
1582 WQ_UNBOUND | WQ_MEM_RECLAIM, 1);
1583 if (!send_workqueue) {
1584 log_print("can't start dlm_send");
1585 destroy_workqueue(recv_workqueue);
1586 return -ENOMEM;
1587 }
1588
1589 return 0;
1590}
1591
static void _stop_conn(struct connection *con, bool and_other)
{
	mutex_lock(&con->sock_mutex);
	/* Setting the PENDING bits here lets work_flush() treat the
	 * connection as quiesced once no worker clears them again.
	 */
	set_bit(CF_CLOSE, &con->flags);
	set_bit(CF_READ_PENDING, &con->flags);
	set_bit(CF_WRITE_PENDING, &con->flags);
	if (con->sock && con->sock->sk) {
		/* Detach our user data under the socket callback lock so no
		 * socket callback can race with this teardown.
		 */
		write_lock_bh(&con->sock->sk->sk_callback_lock);
		con->sock->sk->sk_user_data = NULL;
		write_unlock_bh(&con->sock->sk->sk_callback_lock);
	}
	/* Recurse (once) into the paired othercon when requested. */
	if (con->othercon && and_other)
		_stop_conn(con->othercon, false);
	mutex_unlock(&con->sock_mutex);
}
1607
static void stop_conn(struct connection *con)
{
	/* foreach_conn() callback: stop a connection and its othercon. */
	_stop_conn(con, true);
}
1612
static void shutdown_conn(struct connection *con)
{
	/* Invoke the per-connection shutdown hook, if one is set. */
	if (con->shutdown_action)
		con->shutdown_action(con);
}
1618
static void connection_release(struct rcu_head *rcu)
{
	struct connection *con = container_of(rcu, struct connection, rcu);

	/* SRCU callback: free the receive buffer and the connection once
	 * all readers of the connection hash have drained.
	 */
	kfree(con->rx_buf);
	kfree(con);
}
1626
static void free_conn(struct connection *con)
{
	close_connection(con, true, true, true);
	spin_lock(&connections_lock);
	hlist_del_rcu(&con->list);
	spin_unlock(&connections_lock);
	if (con->othercon) {
		clean_one_writequeue(con->othercon);
		/* Defer the free via SRCU so concurrent hash walkers that
		 * still see the entry remain safe.
		 */
		call_srcu(&connections_srcu, &con->othercon->rcu,
			  connection_release);
	}
	clean_one_writequeue(con);
	call_srcu(&connections_srcu, &con->rcu, connection_release);
}
1641
static void work_flush(void)
{
	int ok, idx;
	int i;
	struct connection *con;

	/* Repeatedly stop every connection and flush both workqueues until
	 * a full pass sees all READ/WRITE_PENDING bits still set, i.e. no
	 * worker ran in between and cleared them.
	 */
	do {
		ok = 1;
		foreach_conn(stop_conn);
		if (recv_workqueue)
			flush_workqueue(recv_workqueue);
		if (send_workqueue)
			flush_workqueue(send_workqueue);
		idx = srcu_read_lock(&connections_srcu);
		for (i = 0; i < CONN_HASH_SIZE && ok; i++) {
			hlist_for_each_entry_rcu(con, &connection_hash[i],
						 list) {
				ok &= test_bit(CF_READ_PENDING, &con->flags);
				ok &= test_bit(CF_WRITE_PENDING, &con->flags);
				if (con->othercon) {
					ok &= test_bit(CF_READ_PENDING,
						       &con->othercon->flags);
					ok &= test_bit(CF_WRITE_PENDING,
						       &con->othercon->flags);
				}
			}
		}
		srcu_read_unlock(&connections_srcu, idx);
	} while (!ok);
}
1672
void dlm_lowcomms_stop(void)
{
	/* Set all the flags to prevent any
	   socket activity.
	*/
	dlm_allow_conn = 0;

	if (recv_workqueue)
		flush_workqueue(recv_workqueue);
	if (send_workqueue)
		flush_workqueue(send_workqueue);

	/* Stop accepting new peers before tearing the connections down. */
	dlm_close_sock(&listen_con.sock);

	/* Shut down, quiesce and free every connection, then the
	 * workqueues and the cached local addresses.
	 */
	foreach_conn(shutdown_conn);
	work_flush();
	foreach_conn(free_conn);
	work_stop();
	deinit_local();
}
1693
1694int dlm_lowcomms_start(void)
1695{
1696 int error = -EINVAL;
1697 int i;
1698
1699 for (i = 0; i < CONN_HASH_SIZE; i++)
1700 INIT_HLIST_HEAD(&connection_hash[i]);
1701
1702 init_local();
1703 if (!dlm_local_count) {
1704 error = -ENOTCONN;
1705 log_print("no local IP address has been set");
1706 goto fail;
1707 }
1708
1709 INIT_WORK(&listen_con.rwork, process_listen_recv_socket);
1710
1711 error = work_start();
1712 if (error)
1713 goto fail;
1714
1715 dlm_allow_conn = 1;
1716
1717 /* Start listening */
1718 if (dlm_config.ci_protocol == 0)
1719 error = tcp_listen_for_all();
1720 else
1721 error = sctp_listen_for_all(&listen_con);
1722 if (error)
1723 goto fail_unlisten;
1724
1725 return 0;
1726
1727fail_unlisten:
1728 dlm_allow_conn = 0;
1729 dlm_close_sock(&listen_con.sock);
1730fail:
1731 return error;
1732}
1733
1734void dlm_lowcomms_exit(void)
1735{
1736 struct dlm_node_addr *na, *safe;
1737
1738 spin_lock(&dlm_node_addrs_spin);
1739 list_for_each_entry_safe(na, safe, &dlm_node_addrs, list) {
1740 list_del(&na->list);
1741 while (na->addr_count--)
1742 kfree(na->addr[na->addr_count]);
1743 kfree(na);
1744 }
1745 spin_unlock(&dlm_node_addrs_spin);
1746}