Linux kernel mirror (for testing)
git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel
os
linux
1// SPDX-License-Identifier: GPL-2.0-only
2/******************************************************************************
3*******************************************************************************
4**
5** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
6** Copyright (C) 2004-2011 Red Hat, Inc. All rights reserved.
7**
8**
9*******************************************************************************
10******************************************************************************/
11
#include <linux/module.h>
#include <linux/sysfs.h>

#include "dlm_internal.h"
#include "lockspace.h"
#include "member.h"
#include "recoverd.h"
#include "dir.h"
#include "midcomms.h"
#include "config.h"
#include "memory.h"
#include "lock.h"
#include "recover.h"
#include "requestqueue.h"
#include "user.h"
#include "ast.h"
27
/*
 * Module-wide lockspace bookkeeping; all four are set up in
 * dlm_lockspace_init().
 */

/* number of lockspaces successfully created; modified with ls_lock held */
static int		ls_count;
/* serializes lockspace creation and release */
static struct mutex	ls_lock;
/* every lockspace on this node, linked through dlm_ls.ls_list */
static struct list_head	lslist;
/* protects lslist (always taken with the _bh variants) */
static spinlock_t	lslist_lock;
32
33static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
34{
35 ssize_t ret = len;
36 int n;
37 int rc = kstrtoint(buf, 0, &n);
38
39 if (rc)
40 return rc;
41 ls = dlm_find_lockspace_local(ls);
42 if (!ls)
43 return -EINVAL;
44
45 switch (n) {
46 case 0:
47 dlm_ls_stop(ls);
48 break;
49 case 1:
50 dlm_ls_start(ls);
51 break;
52 default:
53 ret = -EINVAL;
54 }
55 dlm_put_lockspace(ls);
56 return ret;
57}
58
59static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
60{
61 int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);
62
63 if (rc)
64 return rc;
65 set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
66 wake_up(&ls->ls_uevent_wait);
67 return len;
68}
69
70static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
71{
72 return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
73}
74
75static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
76{
77 int rc = kstrtouint(buf, 0, &ls->ls_global_id);
78
79 if (rc)
80 return rc;
81 return len;
82}
83
84static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
85{
86 return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
87}
88
89static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
90{
91 int val;
92 int rc = kstrtoint(buf, 0, &val);
93
94 if (rc)
95 return rc;
96 if (val == 1)
97 set_bit(LSFL_NODIR, &ls->ls_flags);
98 return len;
99}
100
101static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
102{
103 uint32_t status = dlm_recover_status(ls);
104 return snprintf(buf, PAGE_SIZE, "%x\n", status);
105}
106
107static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
108{
109 return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
110}
111
/*
 * Per-lockspace sysfs attribute: the generic attribute plus typed callbacks
 * that receive the lockspace directly. Either callback may be left NULL;
 * dlm_attr_show() and dlm_attr_store() check before calling.
 */
struct dlm_attr {
	struct attribute attr;
	/* format the value into the buffer, return bytes written */
	ssize_t (*show)(struct dlm_ls *, char *);
	/* consume the written buffer of the given length */
	ssize_t (*store)(struct dlm_ls *, const char *, size_t);
};
117
118static struct dlm_attr dlm_attr_control = {
119 .attr = {.name = "control", .mode = S_IWUSR},
120 .store = dlm_control_store
121};
122
123static struct dlm_attr dlm_attr_event = {
124 .attr = {.name = "event_done", .mode = S_IWUSR},
125 .store = dlm_event_store
126};
127
128static struct dlm_attr dlm_attr_id = {
129 .attr = {.name = "id", .mode = S_IRUGO | S_IWUSR},
130 .show = dlm_id_show,
131 .store = dlm_id_store
132};
133
134static struct dlm_attr dlm_attr_nodir = {
135 .attr = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
136 .show = dlm_nodir_show,
137 .store = dlm_nodir_store
138};
139
140static struct dlm_attr dlm_attr_recover_status = {
141 .attr = {.name = "recover_status", .mode = S_IRUGO},
142 .show = dlm_recover_status_show
143};
144
145static struct dlm_attr dlm_attr_recover_nodeid = {
146 .attr = {.name = "recover_nodeid", .mode = S_IRUGO},
147 .show = dlm_recover_nodeid_show
148};
149
150static struct attribute *dlm_attrs[] = {
151 &dlm_attr_control.attr,
152 &dlm_attr_event.attr,
153 &dlm_attr_id.attr,
154 &dlm_attr_nodir.attr,
155 &dlm_attr_recover_status.attr,
156 &dlm_attr_recover_nodeid.attr,
157 NULL,
158};
159ATTRIBUTE_GROUPS(dlm);
160
161static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
162 char *buf)
163{
164 struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
165 struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
166 return a->show ? a->show(ls, buf) : 0;
167}
168
169static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
170 const char *buf, size_t len)
171{
172 struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
173 struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
174 return a->store ? a->store(ls, buf, len) : len;
175}
176
177static void lockspace_kobj_release(struct kobject *k)
178{
179 struct dlm_ls *ls = container_of(k, struct dlm_ls, ls_kobj);
180 kfree(ls);
181}
182
/* sysfs entry points shared by every lockspace attribute */
static const struct sysfs_ops dlm_attr_ops = {
	.show  = dlm_attr_show,
	.store = dlm_attr_store,
};
187
188static struct kobj_type dlm_ktype = {
189 .default_groups = dlm_groups,
190 .sysfs_ops = &dlm_attr_ops,
191 .release = lockspace_kobj_release,
192};
193
/* "dlm" kset created in dlm_lockspace_init(); parent of every lockspace kobject */
static struct kset *dlm_kset;
195
196static int do_uevent(struct dlm_ls *ls, int in)
197{
198 if (in)
199 kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
200 else
201 kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
202
203 log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");
204
205 /* dlm_controld will see the uevent, do the necessary group management
206 and then write to sysfs to wake us */
207
208 wait_event(ls->ls_uevent_wait,
209 test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
210
211 log_rinfo(ls, "group event done %d", ls->ls_uevent_result);
212
213 return ls->ls_uevent_result;
214}
215
216static int dlm_uevent(const struct kobject *kobj, struct kobj_uevent_env *env)
217{
218 const struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
219
220 add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
221 return 0;
222}
223
/* uevent operations installed on the "dlm" kset */
static const struct kset_uevent_ops dlm_uevent_ops = {
	.uevent = dlm_uevent,
};
227
228int __init dlm_lockspace_init(void)
229{
230 ls_count = 0;
231 mutex_init(&ls_lock);
232 INIT_LIST_HEAD(&lslist);
233 spin_lock_init(&lslist_lock);
234
235 dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
236 if (!dlm_kset) {
237 printk(KERN_WARNING "%s: can not create kset\n", __func__);
238 return -ENOMEM;
239 }
240 return 0;
241}
242
243void dlm_lockspace_exit(void)
244{
245 kset_unregister(dlm_kset);
246}
247
248struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
249{
250 struct dlm_ls *ls;
251
252 spin_lock_bh(&lslist_lock);
253
254 list_for_each_entry(ls, &lslist, ls_list) {
255 if (ls->ls_global_id == id) {
256 atomic_inc(&ls->ls_count);
257 goto out;
258 }
259 }
260 ls = NULL;
261 out:
262 spin_unlock_bh(&lslist_lock);
263 return ls;
264}
265
266struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
267{
268 struct dlm_ls *ls = lockspace;
269
270 atomic_inc(&ls->ls_count);
271 return ls;
272}
273
274struct dlm_ls *dlm_find_lockspace_device(int minor)
275{
276 struct dlm_ls *ls;
277
278 spin_lock_bh(&lslist_lock);
279 list_for_each_entry(ls, &lslist, ls_list) {
280 if (ls->ls_device.minor == minor) {
281 atomic_inc(&ls->ls_count);
282 goto out;
283 }
284 }
285 ls = NULL;
286 out:
287 spin_unlock_bh(&lslist_lock);
288 return ls;
289}
290
291void dlm_put_lockspace(struct dlm_ls *ls)
292{
293 if (atomic_dec_and_test(&ls->ls_count))
294 wake_up(&ls->ls_count_wait);
295}
296
297static void remove_lockspace(struct dlm_ls *ls)
298{
299retry:
300 wait_event(ls->ls_count_wait, atomic_read(&ls->ls_count) == 0);
301
302 spin_lock_bh(&lslist_lock);
303 if (atomic_read(&ls->ls_count) != 0) {
304 spin_unlock_bh(&lslist_lock);
305 goto retry;
306 }
307
308 WARN_ON(ls->ls_create_count != 0);
309 list_del(&ls->ls_list);
310 spin_unlock_bh(&lslist_lock);
311}
312
/*
 * Start the midcomms layer, which sends/receives messages for all
 * lockspaces. Logs on failure and returns the dlm_midcomms_start() result.
 */
static int threads_start(void)
{
	int rv = dlm_midcomms_start();

	if (rv)
		log_print("cannot start dlm midcomms %d", rv);

	return rv;
}
324
/*
 * new_lockspace() - join lockspace @name, creating it if it does not exist.
 * @name:       lockspace name; length must be 1..DLM_LOCKSPACE_LEN
 * @cluster:    cluster name supplied by the application, may be NULL
 * @flags:      DLM_LSFL_* flags
 * @lvblen:     lock value block length, must be a multiple of 8
 * @ops:        optional recovery callbacks
 * @ops_arg:    argument stored alongside @ops
 * @ops_result: if @ops and @ops_result are both set, receives 0 when recovery
 *              callbacks are enabled in the config, else -EOPNOTSUPP
 * @lockspace:  receives the lockspace handle on success (return 0 or 1)
 *
 * Return:
 *   0        a new lockspace was created and joined
 *   1        a lockspace with this name already existed; its create count
 *            was incremented and it is returned in @lockspace
 *   -EINVAL  bad @name length, bad @lvblen, or module reference unavailable
 *   -EUNATCH the dlm user daemon is not available
 *   -EBADR   @cluster does not match the configured cluster name
 *   -EEXIST  the lockspace exists and DLM_LSFL_NEWEXCL was requested
 *   -ENOMEM  allocation failure
 *   other    error from a start-up step, the join uevent, or recovery
 *
 * A module reference is taken on entry; it is kept only when 0 is returned
 * and is dropped on every other exit (including the "already exists" case).
 */
static int new_lockspace(const char *name, const char *cluster,
			 uint32_t flags, int lvblen,
			 const struct dlm_lockspace_ops *ops, void *ops_arg,
			 int *ops_result, dlm_lockspace_t **lockspace)
{
	struct dlm_ls *ls;
	int do_unreg = 0;	/* set once the kobject owns freeing of ls */
	int namelen = strlen(name);
	int error;

	if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
		return -EINVAL;

	if (lvblen % 8)
		return -EINVAL;

	if (!try_module_get(THIS_MODULE))
		return -EINVAL;

	if (!dlm_user_daemon_available()) {
		log_print("dlm user daemon not available");
		error = -EUNATCH;
		goto out;
	}

	/* tell the caller whether its recovery callbacks will be used */
	if (ops && ops_result) {
		if (!dlm_config.ci_recover_callbacks)
			*ops_result = -EOPNOTSUPP;
		else
			*ops_result = 0;
	}

	if (!cluster)
		log_print("dlm cluster name '%s' is being used without an application provided cluster name",
			  dlm_config.ci_cluster_name);

	if (dlm_config.ci_recover_callbacks && cluster &&
	    strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
		log_print("dlm cluster name '%s' does not match "
			  "the application cluster name '%s'",
			  dlm_config.ci_cluster_name, cluster);
		error = -EBADR;
		goto out;
	}

	error = 0;

	/*
	 * Look for an existing lockspace with the same name. If found, either
	 * fail (NEWEXCL) or bump its create count and hand it back with
	 * error = 1.
	 */
	spin_lock_bh(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		WARN_ON(ls->ls_create_count <= 0);
		if (ls->ls_namelen != namelen)
			continue;
		if (memcmp(ls->ls_name, name, namelen))
			continue;
		if (flags & DLM_LSFL_NEWEXCL) {
			error = -EEXIST;
			break;
		}
		ls->ls_create_count++;
		*lockspace = ls;
		error = 1;
		break;
	}
	spin_unlock_bh(&lslist_lock);

	if (error)
		goto out;

	error = -ENOMEM;

	/* no existing lockspace: allocate and initialize a new one */
	ls = kzalloc(sizeof(*ls), GFP_NOFS);
	if (!ls)
		goto out;
	memcpy(ls->ls_name, name, namelen);
	ls->ls_namelen = namelen;
	ls->ls_lvblen = lvblen;
	atomic_set(&ls->ls_count, 0);
	init_waitqueue_head(&ls->ls_count_wait);
	ls->ls_flags = 0;

	/* callbacks are only installed when enabled in the config */
	if (ops && dlm_config.ci_recover_callbacks) {
		ls->ls_ops = ops;
		ls->ls_ops_arg = ops_arg;
	}

	if (flags & DLM_LSFL_SOFTIRQ)
		set_bit(LSFL_SOFTIRQ, &ls->ls_flags);

	/* ls_exflags are forced to match among nodes, and we don't
	 * need to require all nodes to have some flags set
	 */
	ls->ls_exflags = (flags & ~(DLM_LSFL_FS | DLM_LSFL_NEWEXCL |
				    DLM_LSFL_SOFTIRQ));

	INIT_LIST_HEAD(&ls->ls_slow_inactive);
	INIT_LIST_HEAD(&ls->ls_slow_active);
	rwlock_init(&ls->ls_rsbtbl_lock);

	error = rhashtable_init(&ls->ls_rsbtbl, &dlm_rhash_rsb_params);
	if (error)
		goto out_lsfree;

	xa_init_flags(&ls->ls_lkbxa, XA_FLAGS_ALLOC | XA_FLAGS_LOCK_BH);
	rwlock_init(&ls->ls_lkbxa_lock);

	INIT_LIST_HEAD(&ls->ls_waiters);
	spin_lock_init(&ls->ls_waiters_lock);
	INIT_LIST_HEAD(&ls->ls_orphans);
	spin_lock_init(&ls->ls_orphans_lock);

	INIT_LIST_HEAD(&ls->ls_nodes);
	INIT_LIST_HEAD(&ls->ls_nodes_gone);
	ls->ls_num_nodes = 0;
	ls->ls_low_nodeid = 0;
	ls->ls_total_weight = 0;
	ls->ls_node_array = NULL;

	memset(&ls->ls_local_rsb, 0, sizeof(struct dlm_rsb));
	ls->ls_local_rsb.res_ls = ls;

	ls->ls_debug_rsb_dentry = NULL;
	ls->ls_debug_waiters_dentry = NULL;

	init_waitqueue_head(&ls->ls_uevent_wait);
	ls->ls_uevent_result = 0;
	init_completion(&ls->ls_recovery_done);
	ls->ls_recovery_result = -1;

	spin_lock_init(&ls->ls_cb_lock);
	INIT_LIST_HEAD(&ls->ls_cb_delay);

	ls->ls_recoverd_task = NULL;
	mutex_init(&ls->ls_recoverd_active);
	spin_lock_init(&ls->ls_recover_lock);
	spin_lock_init(&ls->ls_rcom_spin);
	get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
	ls->ls_recover_status = 0;
	ls->ls_recover_seq = get_random_u64();
	ls->ls_recover_args = NULL;
	init_rwsem(&ls->ls_in_recovery);
	rwlock_init(&ls->ls_recv_active);
	INIT_LIST_HEAD(&ls->ls_requestqueue);
	rwlock_init(&ls->ls_requestqueue_lock);
	spin_lock_init(&ls->ls_clear_proc_locks);

	/* Due backwards compatibility with 3.1 we need to use maximum
	 * possible dlm message size to be sure the message will fit and
	 * not having out of bounds issues. However on sending side 3.2
	 * might send less.
	 */
	ls->ls_recover_buf = kmalloc(DLM_MAX_SOCKET_BUFSIZE, GFP_NOFS);
	if (!ls->ls_recover_buf) {
		error = -ENOMEM;
		goto out_lkbxa;
	}

	ls->ls_slot = 0;
	ls->ls_num_slots = 0;
	ls->ls_slots_size = 0;
	ls->ls_slots = NULL;

	INIT_LIST_HEAD(&ls->ls_recover_list);
	spin_lock_init(&ls->ls_recover_list_lock);
	xa_init_flags(&ls->ls_recover_xa, XA_FLAGS_ALLOC | XA_FLAGS_LOCK_BH);
	spin_lock_init(&ls->ls_recover_xa_lock);
	ls->ls_recover_list_count = 0;
	init_waitqueue_head(&ls->ls_wait_general);
	INIT_LIST_HEAD(&ls->ls_masters_list);
	rwlock_init(&ls->ls_masters_lock);
	INIT_LIST_HEAD(&ls->ls_dir_dump_list);
	rwlock_init(&ls->ls_dir_dump_lock);

	INIT_LIST_HEAD(&ls->ls_scan_list);
	spin_lock_init(&ls->ls_scan_lock);
	timer_setup(&ls->ls_scan_timer, dlm_rsb_scan, TIMER_DEFERRABLE);

	/* publish the lockspace; from here on lookups can find it */
	spin_lock_bh(&lslist_lock);
	ls->ls_create_count = 1;
	list_add(&ls->ls_list, &lslist);
	spin_unlock_bh(&lslist_lock);

	if (flags & DLM_LSFL_FS)
		set_bit(LSFL_FS, &ls->ls_flags);

	error = dlm_callback_start(ls);
	if (error) {
		log_error(ls, "can't start dlm_callback %d", error);
		goto out_delist;
	}

	init_waitqueue_head(&ls->ls_recover_lock_wait);

	/*
	 * Once started, dlm_recoverd first looks for ls in lslist, then
	 * initializes ls_in_recovery as locked in "down" mode.  We need
	 * to wait for the wakeup from dlm_recoverd because in_recovery
	 * has to start out in down mode.
	 */

	error = dlm_recoverd_start(ls);
	if (error) {
		log_error(ls, "can't start dlm_recoverd %d", error);
		goto out_callback;
	}

	wait_event(ls->ls_recover_lock_wait,
		   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));

	/* let kobject handle freeing of ls if there's an error */
	do_unreg = 1;

	ls->ls_kobj.kset = dlm_kset;
	error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
				     "%s", ls->ls_name);
	if (error)
		goto out_recoverd;
	kobject_uevent(&ls->ls_kobj, KOBJ_ADD);

	/* This uevent triggers dlm_controld in userspace to add us to the
	   group of nodes that are members of this lockspace (managed by the
	   cluster infrastructure.)  Once it's done that, it tells us who the
	   current lockspace members are (via configfs) and then tells the
	   lockspace to start running (via sysfs) in dlm_ls_start(). */

	error = do_uevent(ls, 1);
	if (error)
		goto out_recoverd;

	/* wait until recovery is successful or failed */
	wait_for_completion(&ls->ls_recovery_done);
	error = ls->ls_recovery_result;
	if (error)
		goto out_members;

	dlm_create_debug_file(ls);

	log_rinfo(ls, "join complete");
	*lockspace = ls;
	return 0;

	/*
	 * Error unwinding: each label undoes the steps completed before the
	 * corresponding failure point, in reverse order of setup.
	 */
 out_members:
	do_uevent(ls, 0);
	dlm_clear_members(ls);
	kfree(ls->ls_node_array);
 out_recoverd:
	dlm_recoverd_stop(ls);
 out_callback:
	dlm_callback_stop(ls);
 out_delist:
	spin_lock_bh(&lslist_lock);
	list_del(&ls->ls_list);
	spin_unlock_bh(&lslist_lock);
	xa_destroy(&ls->ls_recover_xa);
	kfree(ls->ls_recover_buf);
 out_lkbxa:
	xa_destroy(&ls->ls_lkbxa);
	rhashtable_destroy(&ls->ls_rsbtbl);
 out_lsfree:
	/* after kobject init, ls is freed by lockspace_kobj_release() */
	if (do_unreg)
		kobject_put(&ls->ls_kobj);
	else
		kfree(ls);
 out:
	module_put(THIS_MODULE);
	return error;
}
591
592static int __dlm_new_lockspace(const char *name, const char *cluster,
593 uint32_t flags, int lvblen,
594 const struct dlm_lockspace_ops *ops,
595 void *ops_arg, int *ops_result,
596 dlm_lockspace_t **lockspace)
597{
598 int error = 0;
599
600 mutex_lock(&ls_lock);
601 if (!ls_count)
602 error = threads_start();
603 if (error)
604 goto out;
605
606 error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
607 ops_result, lockspace);
608 if (!error)
609 ls_count++;
610 if (error > 0)
611 error = 0;
612 if (!ls_count) {
613 dlm_midcomms_shutdown();
614 dlm_midcomms_stop();
615 }
616 out:
617 mutex_unlock(&ls_lock);
618 return error;
619}
620
621int dlm_new_lockspace(const char *name, const char *cluster, uint32_t flags,
622 int lvblen, const struct dlm_lockspace_ops *ops,
623 void *ops_arg, int *ops_result,
624 dlm_lockspace_t **lockspace)
625{
626 return __dlm_new_lockspace(name, cluster, flags | DLM_LSFL_FS, lvblen,
627 ops, ops_arg, ops_result, lockspace);
628}
629
630int dlm_new_user_lockspace(const char *name, const char *cluster,
631 uint32_t flags, int lvblen,
632 const struct dlm_lockspace_ops *ops,
633 void *ops_arg, int *ops_result,
634 dlm_lockspace_t **lockspace)
635{
636 if (flags & DLM_LSFL_SOFTIRQ)
637 return -EINVAL;
638
639 return __dlm_new_lockspace(name, cluster, flags, lvblen, ops,
640 ops_arg, ops_result, lockspace);
641}
642
643static int lkb_idr_free(struct dlm_lkb *lkb)
644{
645 if (lkb->lkb_lvbptr && test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags))
646 dlm_free_lvb(lkb->lkb_lvbptr);
647
648 dlm_free_lkb(lkb);
649 return 0;
650}
651
652/* NOTE: We check the lkbxa here rather than the resource table.
653 This is because there may be LKBs queued as ASTs that have been unlinked
654 from their RSBs and are pending deletion once the AST has been delivered */
655
656static int lockspace_busy(struct dlm_ls *ls, int force)
657{
658 struct dlm_lkb *lkb;
659 unsigned long id;
660 int rv = 0;
661
662 read_lock_bh(&ls->ls_lkbxa_lock);
663 if (force == 0) {
664 xa_for_each(&ls->ls_lkbxa, id, lkb) {
665 rv = 1;
666 break;
667 }
668 } else if (force == 1) {
669 xa_for_each(&ls->ls_lkbxa, id, lkb) {
670 if (lkb->lkb_nodeid == 0 &&
671 lkb->lkb_grmode != DLM_LOCK_IV) {
672 rv = 1;
673 break;
674 }
675 }
676 } else {
677 rv = 0;
678 }
679 read_unlock_bh(&ls->ls_lkbxa_lock);
680 return rv;
681}
682
/*
 * rhashtable_free_and_destroy() element callback: @ptr is the rsb to free,
 * @arg is unused.
 */
static void rhash_free_rsb(void *ptr, void *arg)
{
	dlm_free_rsb(ptr);
}
689
/*
 * release_lockspace() - drop one creator's use of @ls, tearing it down when
 * it was the last one.
 * @ls:    lockspace to release
 * @force: 0..3, see the comment above dlm_release_lockspace()
 *
 * Called with ls_lock held by dlm_release_lockspace().
 *
 * Return:
 *   0        this was the last user; the lockspace has been torn down
 *   > 0      remaining ls_create_count after this release; nothing torn down
 *   -EBUSY   last user, but lockspace_busy() reported locks for this @force
 *   -EINVAL  ls_create_count was not positive
 */
static int release_lockspace(struct dlm_ls *ls, int force)
{
	struct dlm_lkb *lkb;
	unsigned long id;
	int busy, rv;

	busy = lockspace_busy(ls, force);

	spin_lock_bh(&lslist_lock);
	if (ls->ls_create_count == 1) {
		if (busy) {
			rv = -EBUSY;
		} else {
			/* remove_lockspace takes ls off lslist */
			ls->ls_create_count = 0;
			rv = 0;
		}
	} else if (ls->ls_create_count > 1) {
		rv = --ls->ls_create_count;
	} else {
		rv = -EINVAL;
	}
	spin_unlock_bh(&lslist_lock);

	/* anything but 0 means the lockspace stays alive */
	if (rv) {
		log_debug(ls, "release_lockspace no remove %d", rv);
		return rv;
	}

	/* last lockspace on this node */
	if (ls_count == 1)
		dlm_midcomms_version_wait();

	dlm_device_deregister(ls);

	/* tell userspace we are leaving, unless this is a forced shutdown */
	if (force < 3 && dlm_user_daemon_available())
		do_uevent(ls, 0);

	dlm_recoverd_stop(ls);

	/* clear the LSFL_RUNNING flag to speed up
	 * timer_shutdown_sync(), we don't care anymore
	 */
	clear_bit(LSFL_RUNNING, &ls->ls_flags);
	timer_shutdown_sync(&ls->ls_scan_timer);

	if (ls_count == 1) {
		dlm_clear_members(ls);
		dlm_midcomms_shutdown();
	}

	dlm_callback_stop(ls);

	/* waits for all references to go away, then unlinks ls from lslist */
	remove_lockspace(ls);

	dlm_delete_debug_file(ls);

	xa_destroy(&ls->ls_recover_xa);
	kfree(ls->ls_recover_buf);

	/*
	 * Free all lkb's in xa
	 */
	xa_for_each(&ls->ls_lkbxa, id, lkb) {
		lkb_idr_free(lkb);
	}
	xa_destroy(&ls->ls_lkbxa);

	/*
	 * Free all rsb's on rsbtbl
	 */
	rhashtable_free_and_destroy(&ls->ls_rsbtbl, rhash_free_rsb, NULL);

	/*
	 * Free structures on any other lists
	 */

	dlm_purge_requestqueue(ls);
	kfree(ls->ls_recover_args);
	dlm_clear_members(ls);
	dlm_clear_members_gone(ls);
	kfree(ls->ls_node_array);
	log_rinfo(ls, "release_lockspace final free");
	kobject_put(&ls->ls_kobj);
	/* The ls structure will be freed when the kobject is done with */

	/* drop the module reference kept by new_lockspace() on success */
	module_put(THIS_MODULE);
	return 0;
}
778
779/*
780 * Called when a system has released all its locks and is not going to use the
781 * lockspace any longer. We free everything we're managing for this lockspace.
782 * Remaining nodes will go through the recovery process as if we'd died. The
783 * lockspace must continue to function as usual, participating in recoveries,
784 * until this returns.
785 *
786 * Force has 4 possible values:
787 * 0 - don't destroy lockspace if it has any LKBs
788 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
789 * 2 - destroy lockspace regardless of LKBs
790 * 3 - destroy lockspace as part of a forced shutdown
791 */
792
793int dlm_release_lockspace(void *lockspace, int force)
794{
795 struct dlm_ls *ls;
796 int error;
797
798 ls = dlm_find_lockspace_local(lockspace);
799 if (!ls)
800 return -EINVAL;
801 dlm_put_lockspace(ls);
802
803 mutex_lock(&ls_lock);
804 error = release_lockspace(ls, force);
805 if (!error)
806 ls_count--;
807 if (!ls_count)
808 dlm_midcomms_stop();
809 mutex_unlock(&ls_lock);
810
811 return error;
812}
813
814void dlm_stop_lockspaces(void)
815{
816 struct dlm_ls *ls;
817 int count;
818
819 restart:
820 count = 0;
821 spin_lock_bh(&lslist_lock);
822 list_for_each_entry(ls, &lslist, ls_list) {
823 if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
824 count++;
825 continue;
826 }
827 spin_unlock_bh(&lslist_lock);
828 log_error(ls, "no userland control daemon, stopping lockspace");
829 dlm_ls_stop(ls);
830 goto restart;
831 }
832 spin_unlock_bh(&lslist_lock);
833
834 if (count)
835 log_print("dlm user daemon left %d lockspaces", count);
836}