Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux

Merge tag 'dlm-6.11' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm

Pull dlm updates from David Teigland:

- New flag DLM_LSFL_SOFTIRQ_SAFE can be set by code using dlm to
indicate callbacks can be run from softirq

- Change md-cluster to set DLM_LSFL_SOFTIRQ_SAFE

- Clean up for previous changes, e.g. unused code and parameters

- Remove custom pre-allocation of rsb structs which is unnecessary with
kmem caches

- Change idr to xarray for lkb structs in use

- Change idr to xarray for rsb structs being recovered

- Change outdated naming related to internal rsb states

- Fix some incorrect add/remove of rsb on scan list

- Use rcu to free rsb structs

* tag 'dlm-6.11' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm:
dlm: add rcu_barrier before destroy kmem cache
dlm: remove DLM_LSFL_SOFTIRQ from exflags
fs: dlm: remove unused struct 'dlm_processed_nodes'
md-cluster: use DLM_LSFL_SOFTIRQ for dlm_new_lockspace()
dlm: implement LSFL_SOFTIRQ_SAFE
dlm: introduce DLM_LSFL_SOFTIRQ_SAFE
dlm: use LSFL_FS to check for kernel lockspace
dlm: use rcu to avoid an extra rsb struct lookup
dlm: fix add_scan and del_scan usage
dlm: change list and timer names
dlm: move recover idr to xarray datastructure
dlm: move lkb idr to xarray datastructure
dlm: drop own rsb pre allocation mechanism
dlm: remove ls_local_handle from struct dlm_ls
dlm: remove unused parameter in dlm_midcomms_addr
dlm: don't kref_init rsbs created for toss list
dlm: remove scand leftovers

+583 -561
+1 -1
drivers/md/md-cluster.c
··· 887 887 memset(str, 0, 64); 888 888 sprintf(str, "%pU", mddev->uuid); 889 889 ret = dlm_new_lockspace(str, mddev->bitmap_info.cluster_name, 890 - 0, LVB_SIZE, &md_ls_ops, mddev, 890 + DLM_LSFL_SOFTIRQ, LVB_SIZE, &md_ls_ops, mddev, 891 891 &ops_rv, &cinfo->lockspace); 892 892 if (ret) 893 893 goto err;
+110 -60
fs/dlm/ast.c
··· 18 18 #include "user.h" 19 19 #include "ast.h" 20 20 21 + static void dlm_run_callback(uint32_t ls_id, uint32_t lkb_id, int8_t mode, 22 + uint32_t flags, uint8_t sb_flags, int sb_status, 23 + struct dlm_lksb *lksb, 24 + void (*astfn)(void *astparam), 25 + void (*bastfn)(void *astparam, int mode), 26 + void *astparam, const char *res_name, 27 + size_t res_length) 28 + { 29 + if (flags & DLM_CB_BAST) { 30 + trace_dlm_bast(ls_id, lkb_id, mode, res_name, res_length); 31 + bastfn(astparam, mode); 32 + } else if (flags & DLM_CB_CAST) { 33 + trace_dlm_ast(ls_id, lkb_id, sb_status, sb_flags, res_name, 34 + res_length); 35 + lksb->sb_status = sb_status; 36 + lksb->sb_flags = sb_flags; 37 + astfn(astparam); 38 + } 39 + } 40 + 41 + static void dlm_do_callback(struct dlm_callback *cb) 42 + { 43 + dlm_run_callback(cb->ls_id, cb->lkb_id, cb->mode, cb->flags, 44 + cb->sb_flags, cb->sb_status, cb->lkb_lksb, 45 + cb->astfn, cb->bastfn, cb->astparam, 46 + cb->res_name, cb->res_length); 47 + dlm_free_cb(cb); 48 + } 49 + 21 50 static void dlm_callback_work(struct work_struct *work) 22 51 { 23 52 struct dlm_callback *cb = container_of(work, struct dlm_callback, work); 24 53 25 - if (cb->flags & DLM_CB_BAST) { 26 - trace_dlm_bast(cb->ls_id, cb->lkb_id, cb->mode, cb->res_name, 27 - cb->res_length); 28 - cb->bastfn(cb->astparam, cb->mode); 29 - } else if (cb->flags & DLM_CB_CAST) { 30 - trace_dlm_ast(cb->ls_id, cb->lkb_id, cb->sb_status, 31 - cb->sb_flags, cb->res_name, cb->res_length); 32 - cb->lkb_lksb->sb_status = cb->sb_status; 33 - cb->lkb_lksb->sb_flags = cb->sb_flags; 34 - cb->astfn(cb->astparam); 35 - } 36 - 37 - dlm_free_cb(cb); 54 + dlm_do_callback(cb); 38 55 } 39 56 40 - int dlm_queue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode, 41 - int status, uint32_t sbflags, 42 - struct dlm_callback **cb) 57 + bool dlm_may_skip_callback(struct dlm_lkb *lkb, uint32_t flags, int mode, 58 + int status, uint32_t sbflags, int *copy_lvb) 43 59 { 44 60 struct dlm_rsb *rsb = 
lkb->lkb_resource; 45 - int rv = DLM_ENQUEUE_CALLBACK_SUCCESS; 46 61 struct dlm_ls *ls = rsb->res_ls; 47 - int copy_lvb = 0; 48 62 int prev_mode; 63 + 64 + if (copy_lvb) 65 + *copy_lvb = 0; 49 66 50 67 if (flags & DLM_CB_BAST) { 51 68 /* if cb is a bast, it should be skipped if the blocking mode is ··· 73 56 log_debug(ls, "skip %x bast mode %d for cast mode %d", 74 57 lkb->lkb_id, mode, 75 58 lkb->lkb_last_cast_cb_mode); 76 - goto out; 59 + return true; 77 60 } 78 61 } 79 62 ··· 91 74 (prev_mode > mode && prev_mode > DLM_LOCK_PR)) { 92 75 log_debug(ls, "skip %x add bast mode %d for bast mode %d", 93 76 lkb->lkb_id, mode, prev_mode); 94 - goto out; 77 + return true; 95 78 } 96 79 } 97 80 ··· 102 85 prev_mode = lkb->lkb_last_cast_cb_mode; 103 86 104 87 if (!status && lkb->lkb_lksb->sb_lvbptr && 105 - dlm_lvb_operations[prev_mode + 1][mode + 1]) 106 - copy_lvb = 1; 88 + dlm_lvb_operations[prev_mode + 1][mode + 1]) { 89 + if (copy_lvb) 90 + *copy_lvb = 1; 91 + } 107 92 } 108 93 109 94 lkb->lkb_last_cast_cb_mode = mode; ··· 115 96 lkb->lkb_last_cb_mode = mode; 116 97 lkb->lkb_last_cb_flags = flags; 117 98 99 + return false; 100 + } 101 + 102 + int dlm_get_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, 103 + int status, uint32_t sbflags, 104 + struct dlm_callback **cb) 105 + { 106 + struct dlm_rsb *rsb = lkb->lkb_resource; 107 + struct dlm_ls *ls = rsb->res_ls; 108 + 118 109 *cb = dlm_allocate_cb(); 119 - if (!*cb) { 120 - rv = DLM_ENQUEUE_CALLBACK_FAILURE; 121 - goto out; 122 - } 110 + if (WARN_ON_ONCE(!*cb)) 111 + return -ENOMEM; 123 112 124 113 /* for tracing */ 125 114 (*cb)->lkb_id = lkb->lkb_id; ··· 139 112 (*cb)->mode = mode; 140 113 (*cb)->sb_status = status; 141 114 (*cb)->sb_flags = (sbflags & 0x000000FF); 142 - (*cb)->copy_lvb = copy_lvb; 143 115 (*cb)->lkb_lksb = lkb->lkb_lksb; 144 116 145 - rv = DLM_ENQUEUE_CALLBACK_NEED_SCHED; 117 + return 0; 118 + } 146 119 147 - out: 148 - return rv; 120 + static int dlm_get_queue_cb(struct dlm_lkb *lkb, uint32_t 
flags, int mode, 121 + int status, uint32_t sbflags, 122 + struct dlm_callback **cb) 123 + { 124 + int rv; 125 + 126 + rv = dlm_get_cb(lkb, flags, mode, status, sbflags, cb); 127 + if (rv) 128 + return rv; 129 + 130 + (*cb)->astfn = lkb->lkb_astfn; 131 + (*cb)->bastfn = lkb->lkb_bastfn; 132 + (*cb)->astparam = lkb->lkb_astparam; 133 + INIT_WORK(&(*cb)->work, dlm_callback_work); 134 + 135 + return 0; 149 136 } 150 137 151 138 void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status, 152 - uint32_t sbflags) 139 + uint32_t sbflags) 153 140 { 154 - struct dlm_ls *ls = lkb->lkb_resource->res_ls; 141 + struct dlm_rsb *rsb = lkb->lkb_resource; 142 + struct dlm_ls *ls = rsb->res_ls; 155 143 struct dlm_callback *cb; 156 144 int rv; 157 145 ··· 175 133 return; 176 134 } 177 135 178 - rv = dlm_queue_lkb_callback(lkb, flags, mode, status, sbflags, 179 - &cb); 180 - switch (rv) { 181 - case DLM_ENQUEUE_CALLBACK_NEED_SCHED: 182 - cb->astfn = lkb->lkb_astfn; 183 - cb->bastfn = lkb->lkb_bastfn; 184 - cb->astparam = lkb->lkb_astparam; 185 - INIT_WORK(&cb->work, dlm_callback_work); 136 + if (dlm_may_skip_callback(lkb, flags, mode, status, sbflags, NULL)) 137 + return; 186 138 187 - spin_lock_bh(&ls->ls_cb_lock); 188 - if (test_bit(LSFL_CB_DELAY, &ls->ls_flags)) 139 + spin_lock_bh(&ls->ls_cb_lock); 140 + if (test_bit(LSFL_CB_DELAY, &ls->ls_flags)) { 141 + rv = dlm_get_queue_cb(lkb, flags, mode, status, sbflags, &cb); 142 + if (!rv) 189 143 list_add(&cb->list, &ls->ls_cb_delay); 190 - else 191 - queue_work(ls->ls_callback_wq, &cb->work); 192 - spin_unlock_bh(&ls->ls_cb_lock); 193 - break; 194 - case DLM_ENQUEUE_CALLBACK_SUCCESS: 195 - break; 196 - case DLM_ENQUEUE_CALLBACK_FAILURE: 197 - fallthrough; 198 - default: 199 - WARN_ON_ONCE(1); 200 - break; 144 + } else { 145 + if (test_bit(LSFL_SOFTIRQ, &ls->ls_flags)) { 146 + dlm_run_callback(ls->ls_global_id, lkb->lkb_id, mode, flags, 147 + sbflags, status, lkb->lkb_lksb, 148 + lkb->lkb_astfn, lkb->lkb_bastfn, 149 + 
lkb->lkb_astparam, rsb->res_name, 150 + rsb->res_length); 151 + } else { 152 + rv = dlm_get_queue_cb(lkb, flags, mode, status, sbflags, &cb); 153 + if (!rv) 154 + queue_work(ls->ls_callback_wq, &cb->work); 155 + } 201 156 } 157 + spin_unlock_bh(&ls->ls_cb_lock); 202 158 } 203 159 204 160 int dlm_callback_start(struct dlm_ls *ls) 205 161 { 162 + if (!test_bit(LSFL_FS, &ls->ls_flags) || 163 + test_bit(LSFL_SOFTIRQ, &ls->ls_flags)) 164 + return 0; 165 + 206 166 ls->ls_callback_wq = alloc_ordered_workqueue("dlm_callback", 207 167 WQ_HIGHPRI | WQ_MEM_RECLAIM); 208 168 if (!ls->ls_callback_wq) { ··· 222 178 223 179 void dlm_callback_suspend(struct dlm_ls *ls) 224 180 { 225 - if (ls->ls_callback_wq) { 226 - spin_lock_bh(&ls->ls_cb_lock); 227 - set_bit(LSFL_CB_DELAY, &ls->ls_flags); 228 - spin_unlock_bh(&ls->ls_cb_lock); 181 + if (!test_bit(LSFL_FS, &ls->ls_flags)) 182 + return; 229 183 184 + spin_lock_bh(&ls->ls_cb_lock); 185 + set_bit(LSFL_CB_DELAY, &ls->ls_flags); 186 + spin_unlock_bh(&ls->ls_cb_lock); 187 + 188 + if (ls->ls_callback_wq) 230 189 flush_workqueue(ls->ls_callback_wq); 231 - } 232 190 } 233 191 234 192 #define MAX_CB_QUEUE 25 ··· 241 195 int count = 0, sum = 0; 242 196 bool empty; 243 197 244 - if (!ls->ls_callback_wq) 198 + if (!test_bit(LSFL_FS, &ls->ls_flags)) 245 199 return; 246 200 247 201 more: 248 202 spin_lock_bh(&ls->ls_cb_lock); 249 203 list_for_each_entry_safe(cb, safe, &ls->ls_cb_delay, list) { 250 204 list_del(&cb->list); 251 - queue_work(ls->ls_callback_wq, &cb->work); 205 + if (test_bit(LSFL_SOFTIRQ, &ls->ls_flags)) 206 + dlm_do_callback(cb); 207 + else 208 + queue_work(ls->ls_callback_wq, &cb->work); 209 + 252 210 count++; 253 211 if (count == MAX_CB_QUEUE) 254 212 break;
+5 -6
fs/dlm/ast.h
··· 11 11 #ifndef __ASTD_DOT_H__ 12 12 #define __ASTD_DOT_H__ 13 13 14 - #define DLM_ENQUEUE_CALLBACK_NEED_SCHED 1 15 - #define DLM_ENQUEUE_CALLBACK_SUCCESS 0 16 - #define DLM_ENQUEUE_CALLBACK_FAILURE -1 17 - int dlm_queue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode, 18 - int status, uint32_t sbflags, 19 - struct dlm_callback **cb); 14 + bool dlm_may_skip_callback(struct dlm_lkb *lkb, uint32_t flags, int mode, 15 + int status, uint32_t sbflags, int *copy_lvb); 16 + int dlm_get_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, 17 + int status, uint32_t sbflags, 18 + struct dlm_callback **cb); 20 19 void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status, 21 20 uint32_t sbflags); 22 21
+1 -1
fs/dlm/config.c
··· 672 672 673 673 memcpy(addr, buf, len); 674 674 675 - rv = dlm_midcomms_addr(cm->nodeid, addr, len); 675 + rv = dlm_midcomms_addr(cm->nodeid, addr); 676 676 if (rv) { 677 677 kfree(addr); 678 678 return rv;
+5 -5
fs/dlm/debug_fs.c
··· 380 380 381 381 static int table_seq_show(struct seq_file *seq, void *iter_ptr) 382 382 { 383 - struct dlm_rsb *rsb = list_entry(iter_ptr, struct dlm_rsb, res_rsbs_list); 383 + struct dlm_rsb *rsb = list_entry(iter_ptr, struct dlm_rsb, res_slow_list); 384 384 385 385 if (seq->op == &format1_seq_ops) 386 386 print_format1(rsb, seq); ··· 409 409 } 410 410 411 411 if (seq->op == &format4_seq_ops) 412 - list = &ls->ls_toss; 412 + list = &ls->ls_slow_inactive; 413 413 else 414 - list = &ls->ls_keep; 414 + list = &ls->ls_slow_active; 415 415 416 416 read_lock_bh(&ls->ls_rsbtbl_lock); 417 417 return seq_list_start(list, *pos); ··· 423 423 struct list_head *list; 424 424 425 425 if (seq->op == &format4_seq_ops) 426 - list = &ls->ls_toss; 426 + list = &ls->ls_slow_inactive; 427 427 else 428 - list = &ls->ls_keep; 428 + list = &ls->ls_slow_active; 429 429 430 430 return seq_list_next(iter_ptr, list, pos); 431 431 }
+22 -38
fs/dlm/dlm_internal.h
··· 36 36 #include <linux/miscdevice.h> 37 37 #include <linux/rhashtable.h> 38 38 #include <linux/mutex.h> 39 - #include <linux/idr.h> 39 + #include <linux/xarray.h> 40 40 #include <linux/ratelimit.h> 41 41 #include <linux/uaccess.h> 42 42 ··· 316 316 int res_nodeid; 317 317 int res_master_nodeid; 318 318 int res_dir_nodeid; 319 - int res_id; /* for ls_recover_idr */ 319 + unsigned long res_id; /* for ls_recover_xa */ 320 320 uint32_t res_lvbseq; 321 321 uint32_t res_hash; 322 322 unsigned long res_toss_time; 323 323 uint32_t res_first_lkid; 324 324 struct list_head res_lookup; /* lkbs waiting on first */ 325 - union { 326 - struct list_head res_hashchain; 327 - struct rhash_head res_node; /* rsbtbl */ 328 - }; 325 + struct rhash_head res_node; /* rsbtbl */ 329 326 struct list_head res_grantqueue; 330 327 struct list_head res_convertqueue; 331 328 struct list_head res_waitqueue; 332 329 333 - struct list_head res_rsbs_list; 330 + struct list_head res_slow_list; /* ls_slow_* */ 331 + struct list_head res_scan_list; 334 332 struct list_head res_root_list; /* used for recovery */ 335 333 struct list_head res_masters_list; /* used for recovery */ 336 334 struct list_head res_recover_list; /* used for recovery */ 337 - struct list_head res_toss_q_list; 338 335 int res_recover_locks_count; 336 + struct rcu_head rcu; 339 337 340 338 char *res_lvbptr; 341 339 char res_name[DLM_RESNAME_MAXLEN+1]; ··· 366 368 RSB_RECOVER_CONVERT, 367 369 RSB_RECOVER_GRANT, 368 370 RSB_RECOVER_LVB_INVAL, 369 - RSB_TOSS, 371 + RSB_INACTIVE, 372 + RSB_HASHED, /* set while rsb is on ls_rsbtbl */ 370 373 }; 371 374 372 375 static inline void rsb_set_flag(struct dlm_rsb *r, enum rsb_flags flag) ··· 558 559 char rl_lvb[]; 559 560 }; 560 561 561 - /* 562 - * The max number of resources per rsbtbl bucket that shrink will attempt 563 - * to remove in each iteration. 
564 - */ 565 - 566 - #define DLM_REMOVE_NAMES_MAX 8 567 - 568 562 struct dlm_ls { 569 563 struct list_head ls_list; /* list of lockspaces */ 570 - dlm_lockspace_t *ls_local_handle; 571 564 uint32_t ls_global_id; /* global unique lockspace ID */ 572 565 uint32_t ls_generation; 573 566 uint32_t ls_exflags; ··· 569 578 wait_queue_head_t ls_count_wait; 570 579 int ls_create_count; /* create/release refcount */ 571 580 unsigned long ls_flags; /* LSFL_ */ 572 - unsigned long ls_scan_time; 573 581 struct kobject ls_kobj; 574 582 575 - struct idr ls_lkbidr; 576 - rwlock_t ls_lkbidr_lock; 583 + struct xarray ls_lkbxa; 584 + rwlock_t ls_lkbxa_lock; 577 585 586 + /* an rsb is on rsbtl for primary locking functions, 587 + and on a slow list for recovery/dump iteration */ 578 588 struct rhashtable ls_rsbtbl; 579 - rwlock_t ls_rsbtbl_lock; 589 + rwlock_t ls_rsbtbl_lock; /* for ls_rsbtbl and ls_slow */ 590 + struct list_head ls_slow_inactive; /* to iterate rsbtbl */ 591 + struct list_head ls_slow_active; /* to iterate rsbtbl */ 580 592 581 - struct list_head ls_toss; 582 - struct list_head ls_keep; 583 - 584 - struct timer_list ls_timer; 585 - /* this queue is ordered according the 586 - * absolute res_toss_time jiffies time 587 - * to mod_timer() with the first element 588 - * if necessary. 
589 - */ 590 - struct list_head ls_toss_q; 591 - spinlock_t ls_toss_q_lock; 593 + struct timer_list ls_scan_timer; /* based on first scan_list rsb toss_time */ 594 + struct list_head ls_scan_list; /* rsbs ordered by res_toss_time */ 595 + spinlock_t ls_scan_lock; 592 596 593 597 spinlock_t ls_waiters_lock; 594 598 struct list_head ls_waiters; /* lkbs needing a reply */ 595 599 596 600 spinlock_t ls_orphans_lock; 597 601 struct list_head ls_orphans; 598 - 599 - spinlock_t ls_new_rsb_spin; 600 - int ls_new_rsb_count; 601 - struct list_head ls_new_rsb; /* new rsb structs */ 602 602 603 603 struct list_head ls_nodes; /* current nodes in ls */ 604 604 struct list_head ls_nodes_gone; /* dead node list, recovery */ ··· 646 664 struct list_head ls_recover_list; 647 665 spinlock_t ls_recover_list_lock; 648 666 int ls_recover_list_count; 649 - struct idr ls_recover_idr; 650 - spinlock_t ls_recover_idr_lock; 667 + struct xarray ls_recover_xa; 668 + spinlock_t ls_recover_xa_lock; 651 669 wait_queue_head_t ls_wait_general; 652 670 wait_queue_head_t ls_recover_lock_wait; 653 671 spinlock_t ls_clear_proc_locks; ··· 698 716 #define LSFL_CB_DELAY 9 699 717 #define LSFL_NODIR 10 700 718 #define LSFL_RECV_MSG_BLOCKED 11 719 + #define LSFL_FS 12 720 + #define LSFL_SOFTIRQ 13 701 721 702 722 #define DLM_PROC_FLAGS_CLOSING 1 703 723 #define DLM_PROC_FLAGS_COMPAT 2
+279 -289
fs/dlm/lock.c
··· 89 89 const struct dlm_message *ms, bool local); 90 90 static int receive_extralen(const struct dlm_message *ms); 91 91 static void do_purge(struct dlm_ls *ls, int nodeid, int pid); 92 - static void toss_rsb(struct kref *kref); 92 + static void deactivate_rsb(struct kref *kref); 93 93 94 94 /* 95 95 * Lock compatibilty matrix - thanks Steve ··· 330 330 331 331 static inline void hold_rsb(struct dlm_rsb *r) 332 332 { 333 - /* rsbs in toss state never get referenced */ 334 - WARN_ON(rsb_flag(r, RSB_TOSS)); 333 + /* inactive rsbs are not ref counted */ 334 + WARN_ON(rsb_flag(r, RSB_INACTIVE)); 335 335 kref_get(&r->res_ref); 336 336 } 337 337 ··· 370 370 return 0; 371 371 } 372 372 373 - /* When all references to the rsb are gone it's transferred to 374 - the tossed list for later disposal. */ 375 - 376 373 static void put_rsb(struct dlm_rsb *r) 377 374 { 378 375 struct dlm_ls *ls = r->res_ls; 379 376 int rv; 380 377 381 - rv = dlm_kref_put_write_lock_bh(&r->res_ref, toss_rsb, 378 + rv = dlm_kref_put_write_lock_bh(&r->res_ref, deactivate_rsb, 382 379 &ls->ls_rsbtbl_lock); 383 380 if (rv) 384 381 write_unlock_bh(&ls->ls_rsbtbl_lock); ··· 386 389 put_rsb(r); 387 390 } 388 391 389 - static int pre_rsb_struct(struct dlm_ls *ls) 390 - { 391 - struct dlm_rsb *r1, *r2; 392 - int count = 0; 393 - 394 - spin_lock_bh(&ls->ls_new_rsb_spin); 395 - if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) { 396 - spin_unlock_bh(&ls->ls_new_rsb_spin); 397 - return 0; 398 - } 399 - spin_unlock_bh(&ls->ls_new_rsb_spin); 400 - 401 - r1 = dlm_allocate_rsb(ls); 402 - r2 = dlm_allocate_rsb(ls); 403 - 404 - spin_lock_bh(&ls->ls_new_rsb_spin); 405 - if (r1) { 406 - list_add(&r1->res_hashchain, &ls->ls_new_rsb); 407 - ls->ls_new_rsb_count++; 408 - } 409 - if (r2) { 410 - list_add(&r2->res_hashchain, &ls->ls_new_rsb); 411 - ls->ls_new_rsb_count++; 412 - } 413 - count = ls->ls_new_rsb_count; 414 - spin_unlock_bh(&ls->ls_new_rsb_spin); 415 - 416 - if (!count) 417 - return -ENOMEM; 418 - 
return 0; 419 - } 420 - 421 392 /* connected with timer_delete_sync() in dlm_ls_stop() to stop 422 393 * new timers when recovery is triggered and don't run them 423 - * again until a dlm_timer_resume() tries it again. 394 + * again until a resume_scan_timer() tries it again. 424 395 */ 425 - static void __rsb_mod_timer(struct dlm_ls *ls, unsigned long jiffies) 396 + static void enable_scan_timer(struct dlm_ls *ls, unsigned long jiffies) 426 397 { 427 398 if (!dlm_locking_stopped(ls)) 428 - mod_timer(&ls->ls_timer, jiffies); 399 + mod_timer(&ls->ls_scan_timer, jiffies); 429 400 } 430 401 431 402 /* This function tries to resume the timer callback if a rsb 432 - * is on the toss list and no timer is pending. It might that 403 + * is on the scan list and no timer is pending. It might that 433 404 * the first entry is on currently executed as timer callback 434 405 * but we don't care if a timer queued up again and does 435 406 * nothing. Should be a rare case. 436 407 */ 437 - void dlm_timer_resume(struct dlm_ls *ls) 408 + void resume_scan_timer(struct dlm_ls *ls) 438 409 { 439 410 struct dlm_rsb *r; 440 411 441 - spin_lock_bh(&ls->ls_toss_q_lock); 442 - r = list_first_entry_or_null(&ls->ls_toss_q, struct dlm_rsb, 443 - res_toss_q_list); 444 - if (r && !timer_pending(&ls->ls_timer)) 445 - __rsb_mod_timer(ls, r->res_toss_time); 446 - spin_unlock_bh(&ls->ls_toss_q_lock); 412 + spin_lock_bh(&ls->ls_scan_lock); 413 + r = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb, 414 + res_scan_list); 415 + if (r && !timer_pending(&ls->ls_scan_timer)) 416 + enable_scan_timer(ls, r->res_toss_time); 417 + spin_unlock_bh(&ls->ls_scan_lock); 447 418 } 448 419 449 - /* ls_rsbtbl_lock must be held and being sure the rsb is in toss state */ 450 - static void rsb_delete_toss_timer(struct dlm_ls *ls, struct dlm_rsb *r) 420 + /* ls_rsbtbl_lock must be held */ 421 + 422 + static void del_scan(struct dlm_ls *ls, struct dlm_rsb *r) 451 423 { 452 424 struct dlm_rsb *first; 453 425 454 
- spin_lock_bh(&ls->ls_toss_q_lock); 426 + /* active rsbs should never be on the scan list */ 427 + WARN_ON(!rsb_flag(r, RSB_INACTIVE)); 428 + 429 + spin_lock_bh(&ls->ls_scan_lock); 455 430 r->res_toss_time = 0; 456 431 457 432 /* if the rsb is not queued do nothing */ 458 - if (list_empty(&r->res_toss_q_list)) 433 + if (list_empty(&r->res_scan_list)) 459 434 goto out; 460 435 461 436 /* get the first element before delete */ 462 - first = list_first_entry(&ls->ls_toss_q, struct dlm_rsb, 463 - res_toss_q_list); 464 - list_del_init(&r->res_toss_q_list); 437 + first = list_first_entry(&ls->ls_scan_list, struct dlm_rsb, 438 + res_scan_list); 439 + list_del_init(&r->res_scan_list); 465 440 /* check if the first element was the rsb we deleted */ 466 441 if (first == r) { 467 442 /* try to get the new first element, if the list ··· 443 474 * if the list isn't empty and a new first element got 444 475 * in place, set the new timer expire time. 445 476 */ 446 - first = list_first_entry_or_null(&ls->ls_toss_q, struct dlm_rsb, 447 - res_toss_q_list); 477 + first = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb, 478 + res_scan_list); 448 479 if (!first) 449 - timer_delete(&ls->ls_timer); 480 + timer_delete(&ls->ls_scan_timer); 450 481 else 451 - __rsb_mod_timer(ls, first->res_toss_time); 482 + enable_scan_timer(ls, first->res_toss_time); 452 483 } 453 484 454 485 out: 455 - spin_unlock_bh(&ls->ls_toss_q_lock); 486 + spin_unlock_bh(&ls->ls_scan_lock); 456 487 } 457 488 458 - /* Caller must held ls_rsbtbl_lock and need to be called every time 459 - * when either the rsb enters toss state or the toss state changes 460 - * the dir/master nodeid. 
461 - */ 462 - static void rsb_mod_timer(struct dlm_ls *ls, struct dlm_rsb *r) 489 + static void add_scan(struct dlm_ls *ls, struct dlm_rsb *r) 463 490 { 464 491 int our_nodeid = dlm_our_nodeid(); 465 492 struct dlm_rsb *first; 466 493 467 - /* If we're the directory record for this rsb, and 468 - * we're not the master of it, then we need to wait 469 - * for the master node to send us a dir remove for 470 - * before removing the dir record. 471 - */ 472 - if (!dlm_no_directory(ls) && 473 - (r->res_master_nodeid != our_nodeid) && 474 - (dlm_dir_nodeid(r) == our_nodeid)) { 475 - rsb_delete_toss_timer(ls, r); 476 - return; 477 - } 494 + /* A dir record for a remote master rsb should never be on the scan list. */ 495 + WARN_ON(!dlm_no_directory(ls) && 496 + (r->res_master_nodeid != our_nodeid) && 497 + (dlm_dir_nodeid(r) == our_nodeid)); 478 498 479 - spin_lock_bh(&ls->ls_toss_q_lock); 499 + /* An active rsb should never be on the scan list. */ 500 + WARN_ON(!rsb_flag(r, RSB_INACTIVE)); 501 + 502 + /* An rsb should not already be on the scan list. 
*/ 503 + WARN_ON(!list_empty(&r->res_scan_list)); 504 + 505 + spin_lock_bh(&ls->ls_scan_lock); 480 506 /* set the new rsb absolute expire time in the rsb */ 481 507 r->res_toss_time = rsb_toss_jiffies(); 482 - if (list_empty(&ls->ls_toss_q)) { 508 + if (list_empty(&ls->ls_scan_list)) { 483 509 /* if the queue is empty add the element and it's 484 510 * our new expire time 485 511 */ 486 - list_add_tail(&r->res_toss_q_list, &ls->ls_toss_q); 487 - __rsb_mod_timer(ls, r->res_toss_time); 512 + list_add_tail(&r->res_scan_list, &ls->ls_scan_list); 513 + enable_scan_timer(ls, r->res_toss_time); 488 514 } else { 489 - /* check if the rsb was already queued, if so delete 490 - * it from the toss queue 491 - */ 492 - if (!list_empty(&r->res_toss_q_list)) 493 - list_del(&r->res_toss_q_list); 494 - 495 515 /* try to get the maybe new first element and then add 496 516 * to this rsb with the oldest expire time to the end 497 517 * of the queue. If the list was empty before this 498 518 * rsb expire time is our next expiration if it wasn't 499 519 * the now new first elemet is our new expiration time 500 520 */ 501 - first = list_first_entry_or_null(&ls->ls_toss_q, struct dlm_rsb, 502 - res_toss_q_list); 503 - list_add_tail(&r->res_toss_q_list, &ls->ls_toss_q); 521 + first = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb, 522 + res_scan_list); 523 + list_add_tail(&r->res_scan_list, &ls->ls_scan_list); 504 524 if (!first) 505 - __rsb_mod_timer(ls, r->res_toss_time); 525 + enable_scan_timer(ls, r->res_toss_time); 506 526 else 507 - __rsb_mod_timer(ls, first->res_toss_time); 527 + enable_scan_timer(ls, first->res_toss_time); 508 528 } 509 - spin_unlock_bh(&ls->ls_toss_q_lock); 529 + spin_unlock_bh(&ls->ls_scan_lock); 510 530 } 511 531 512 532 /* if we hit contention we do in 250 ms a retry to trylock. 
··· 505 547 */ 506 548 #define DLM_TOSS_TIMER_RETRY (jiffies + msecs_to_jiffies(250)) 507 549 508 - void dlm_rsb_toss_timer(struct timer_list *timer) 550 + /* Called by lockspace scan_timer to free unused rsb's. */ 551 + 552 + void dlm_rsb_scan(struct timer_list *timer) 509 553 { 510 - struct dlm_ls *ls = from_timer(ls, timer, ls_timer); 554 + struct dlm_ls *ls = from_timer(ls, timer, ls_scan_timer); 511 555 int our_nodeid = dlm_our_nodeid(); 512 556 struct dlm_rsb *r; 513 557 int rv; ··· 517 557 while (1) { 518 558 /* interrupting point to leave iteration when 519 559 * recovery waits for timer_delete_sync(), recovery 520 - * will take care to delete everything in toss queue. 560 + * will take care to delete everything in scan list. 521 561 */ 522 562 if (dlm_locking_stopped(ls)) 523 563 break; 524 564 525 - rv = spin_trylock(&ls->ls_toss_q_lock); 565 + rv = spin_trylock(&ls->ls_scan_lock); 526 566 if (!rv) { 527 567 /* rearm again try timer */ 528 - __rsb_mod_timer(ls, DLM_TOSS_TIMER_RETRY); 568 + enable_scan_timer(ls, DLM_TOSS_TIMER_RETRY); 529 569 break; 530 570 } 531 571 532 - r = list_first_entry_or_null(&ls->ls_toss_q, struct dlm_rsb, 533 - res_toss_q_list); 572 + r = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb, 573 + res_scan_list); 534 574 if (!r) { 535 - /* nothing to do anymore next rsb queue will 536 - * set next mod_timer() expire. 537 - */ 538 - spin_unlock(&ls->ls_toss_q_lock); 575 + /* the next add_scan will enable the timer again */ 576 + spin_unlock(&ls->ls_scan_lock); 539 577 break; 540 578 } 541 579 542 - /* test if the first rsb isn't expired yet, if 543 - * so we stop freeing rsb from toss queue as 544 - * the order in queue is ascending to the 545 - * absolute res_toss_time jiffies 580 + /* 581 + * If the first rsb is not yet expired, then stop because the 582 + * list is sorted with nearest expiration first. 
546 583 */ 547 584 if (time_before(jiffies, r->res_toss_time)) { 548 585 /* rearm with the next rsb to expire in the future */ 549 - __rsb_mod_timer(ls, r->res_toss_time); 550 - spin_unlock(&ls->ls_toss_q_lock); 586 + enable_scan_timer(ls, r->res_toss_time); 587 + spin_unlock(&ls->ls_scan_lock); 551 588 break; 552 589 } 553 590 554 591 /* in find_rsb_dir/nodir there is a reverse order of this 555 592 * lock, however this is only a trylock if we hit some 556 593 * possible contention we try it again. 557 - * 558 - * This lock synchronized while holding ls_toss_q_lock 559 - * synchronize everything that rsb_delete_toss_timer() 560 - * or rsb_mod_timer() can't run after this timer callback 561 - * deletes the rsb from the ls_toss_q. Whereas the other 562 - * holders have always a priority to run as this is only 563 - * a caching handling and the other holders might to put 564 - * this rsb out of the toss state. 565 594 */ 566 595 rv = write_trylock(&ls->ls_rsbtbl_lock); 567 596 if (!rv) { 568 - spin_unlock(&ls->ls_toss_q_lock); 597 + spin_unlock(&ls->ls_scan_lock); 569 598 /* rearm again try timer */ 570 - __rsb_mod_timer(ls, DLM_TOSS_TIMER_RETRY); 599 + enable_scan_timer(ls, DLM_TOSS_TIMER_RETRY); 571 600 break; 572 601 } 573 602 574 - list_del(&r->res_rsbs_list); 603 + list_del(&r->res_slow_list); 575 604 rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node, 576 605 dlm_rhash_rsb_params); 606 + rsb_clear_flag(r, RSB_HASHED); 577 607 578 - /* not necessary to held the ls_rsbtbl_lock when 579 - * calling send_remove() 580 - */ 608 + /* ls_rsbtbl_lock is not needed when calling send_remove() */ 581 609 write_unlock(&ls->ls_rsbtbl_lock); 582 610 583 - /* remove the rsb out of the toss queue its gone 584 - * drom DLM now 585 - */ 586 - list_del_init(&r->res_toss_q_list); 587 - spin_unlock(&ls->ls_toss_q_lock); 611 + list_del_init(&r->res_scan_list); 612 + spin_unlock(&ls->ls_scan_lock); 588 613 589 - /* no rsb in this state should ever run a timer */ 614 + /* An rsb that 
is a dir record for a remote master rsb 615 + * cannot be removed, and should not have a timer enabled. 616 + */ 590 617 WARN_ON(!dlm_no_directory(ls) && 591 618 (r->res_master_nodeid != our_nodeid) && 592 619 (dlm_dir_nodeid(r) == our_nodeid)); ··· 587 640 (dlm_dir_nodeid(r) != our_nodeid)) 588 641 send_remove(r); 589 642 590 - free_toss_rsb(r); 643 + free_inactive_rsb(r); 591 644 } 592 645 } 593 646 ··· 599 652 struct dlm_rsb **r_ret) 600 653 { 601 654 struct dlm_rsb *r; 602 - int count; 603 655 604 - spin_lock_bh(&ls->ls_new_rsb_spin); 605 - if (list_empty(&ls->ls_new_rsb)) { 606 - count = ls->ls_new_rsb_count; 607 - spin_unlock_bh(&ls->ls_new_rsb_spin); 608 - log_debug(ls, "find_rsb retry %d %d %s", 609 - count, dlm_config.ci_new_rsb_count, 610 - (const char *)name); 611 - return -EAGAIN; 612 - } 613 - 614 - r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain); 615 - list_del(&r->res_hashchain); 616 - ls->ls_new_rsb_count--; 617 - spin_unlock_bh(&ls->ls_new_rsb_spin); 656 + r = dlm_allocate_rsb(ls); 657 + if (!r) 658 + return -ENOMEM; 618 659 619 660 r->res_ls = ls; 620 661 r->res_length = len; ··· 614 679 INIT_LIST_HEAD(&r->res_convertqueue); 615 680 INIT_LIST_HEAD(&r->res_waitqueue); 616 681 INIT_LIST_HEAD(&r->res_root_list); 617 - INIT_LIST_HEAD(&r->res_toss_q_list); 682 + INIT_LIST_HEAD(&r->res_scan_list); 618 683 INIT_LIST_HEAD(&r->res_recover_list); 619 684 INIT_LIST_HEAD(&r->res_masters_list); 620 685 ··· 637 702 638 703 static int rsb_insert(struct dlm_rsb *rsb, struct rhashtable *rhash) 639 704 { 640 - return rhashtable_insert_fast(rhash, &rsb->res_node, 641 - dlm_rhash_rsb_params); 705 + int rv; 706 + 707 + rv = rhashtable_insert_fast(rhash, &rsb->res_node, 708 + dlm_rhash_rsb_params); 709 + if (!rv) 710 + rsb_set_flag(rsb, RSB_HASHED); 711 + 712 + return rv; 642 713 } 643 714 644 715 /* ··· 674 733 * So, if the given rsb is on the toss list, it is moved to the keep list 675 734 * before being returned. 
676 735 * 677 - * toss_rsb() happens when all local usage of the rsb is done, i.e. no 736 + * deactivate_rsb() happens when all local usage of the rsb is done, i.e. no 678 737 * more refcounts exist, so the rsb is moved from the keep list to the 679 738 * toss list. 680 739 * ··· 722 781 * 723 782 * If someone sends us a request, we are the dir node, and we do 724 783 * not find the rsb anywhere, then recreate it. This happens if 725 - * someone sends us a request after we have removed/freed an rsb 726 - * from our toss list. (They sent a request instead of lookup 727 - * because they are using an rsb from their toss list.) 784 + * someone sends us a request after we have removed/freed an rsb. 785 + * (They sent a request instead of lookup because they are using 786 + * an rsb taken from their scan list.) 728 787 */ 729 788 730 789 if (from_local || from_dir || ··· 733 792 } 734 793 735 794 retry: 736 - if (create) { 737 - error = pre_rsb_struct(ls); 738 - if (error < 0) 739 - goto out; 740 - } 741 795 742 - retry_lookup: 743 - 744 - /* check if the rsb is in keep state under read lock - likely path */ 796 + /* check if the rsb is active under read lock - likely path */ 745 797 read_lock_bh(&ls->ls_rsbtbl_lock); 746 798 error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); 747 799 if (error) { ··· 746 812 * rsb is active, so we can't check master_nodeid without lock_rsb. 747 813 */ 748 814 749 - if (rsb_flag(r, RSB_TOSS)) { 815 + if (rsb_flag(r, RSB_INACTIVE)) { 750 816 read_unlock_bh(&ls->ls_rsbtbl_lock); 751 - goto do_toss; 817 + goto do_inactive; 752 818 } 753 819 754 820 kref_get(&r->res_ref); ··· 756 822 goto out; 757 823 758 824 759 - do_toss: 825 + do_inactive: 760 826 write_lock_bh(&ls->ls_rsbtbl_lock); 761 827 762 - /* retry lookup under write lock to see if its still in toss state 763 - * if not it's in keep state and we relookup - unlikely path. 
828 + /* 829 + * The expectation here is that the rsb will have HASHED and 830 + * INACTIVE flags set, and that the rsb can be moved from 831 + * inactive back to active again. However, between releasing 832 + * the read lock and acquiring the write lock, this rsb could 833 + * have been removed from rsbtbl, and had HASHED cleared, to 834 + * be freed. To deal with this case, we would normally need 835 + * to repeat dlm_search_rsb_tree while holding the write lock, 836 + * but rcu allows us to simply check the HASHED flag, because 837 + * the rcu read lock means the rsb will not be freed yet. 838 + * If the HASHED flag is not set, then the rsb is being freed, 839 + * so we add a new rsb struct. If the HASHED flag is set, 840 + * and INACTIVE is not set, it means another thread has 841 + * made the rsb active, as we're expecting to do here, and 842 + * we just repeat the lookup (this will be very unlikely.) 764 843 */ 765 - error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); 766 - if (!error) { 767 - if (!rsb_flag(r, RSB_TOSS)) { 844 + if (rsb_flag(r, RSB_HASHED)) { 845 + if (!rsb_flag(r, RSB_INACTIVE)) { 768 846 write_unlock_bh(&ls->ls_rsbtbl_lock); 769 - goto retry_lookup; 847 + goto retry; 770 848 } 771 849 } else { 772 850 write_unlock_bh(&ls->ls_rsbtbl_lock); ··· 788 842 /* 789 843 * rsb found inactive (master_nodeid may be out of date unless 790 844 * we are the dir_nodeid or were the master) No other thread 791 - * is using this rsb because it's on the toss list, so we can 845 + * is using this rsb because it's inactive, so we can 792 846 * look at or update res_master_nodeid without lock_rsb. 
793 847 */ 794 848 795 849 if ((r->res_master_nodeid != our_nodeid) && from_other) { 796 850 /* our rsb was not master, and another node (not the dir node) 797 851 has sent us a request */ 798 - log_debug(ls, "find_rsb toss from_other %d master %d dir %d %s", 852 + log_debug(ls, "find_rsb inactive from_other %d master %d dir %d %s", 799 853 from_nodeid, r->res_master_nodeid, dir_nodeid, 800 854 r->res_name); 801 855 write_unlock_bh(&ls->ls_rsbtbl_lock); ··· 805 859 806 860 if ((r->res_master_nodeid != our_nodeid) && from_dir) { 807 861 /* don't think this should ever happen */ 808 - log_error(ls, "find_rsb toss from_dir %d master %d", 862 + log_error(ls, "find_rsb inactive from_dir %d master %d", 809 863 from_nodeid, r->res_master_nodeid); 810 864 dlm_print_rsb(r); 811 865 /* fix it and go on */ ··· 822 876 r->res_first_lkid = 0; 823 877 } 824 878 825 - list_move(&r->res_rsbs_list, &ls->ls_keep); 826 - rsb_clear_flag(r, RSB_TOSS); 827 - /* rsb got out of toss state, it becomes alive again 828 - * and we reinit the reference counter that is only 829 - * valid for keep state rsbs 830 - */ 831 - kref_init(&r->res_ref); 832 - rsb_delete_toss_timer(ls, r); 879 + /* A dir record will not be on the scan list. 
*/ 880 + if (r->res_dir_nodeid != our_nodeid) 881 + del_scan(ls, r); 882 + list_move(&r->res_slow_list, &ls->ls_slow_active); 883 + rsb_clear_flag(r, RSB_INACTIVE); 884 + kref_init(&r->res_ref); /* ref is now used in active state */ 833 885 write_unlock_bh(&ls->ls_rsbtbl_lock); 834 886 835 887 goto out; ··· 842 898 goto out; 843 899 844 900 error = get_rsb_struct(ls, name, len, &r); 845 - if (error == -EAGAIN) 846 - goto retry; 847 - if (error) 901 + if (WARN_ON_ONCE(error)) 848 902 goto out; 849 903 850 904 r->res_hash = hash; ··· 894 952 */ 895 953 write_unlock_bh(&ls->ls_rsbtbl_lock); 896 954 dlm_free_rsb(r); 897 - goto retry_lookup; 955 + goto retry; 898 956 } else if (!error) { 899 - list_add(&r->res_rsbs_list, &ls->ls_keep); 957 + list_add(&r->res_slow_list, &ls->ls_slow_active); 900 958 } 901 959 write_unlock_bh(&ls->ls_rsbtbl_lock); 902 960 out: ··· 918 976 int error; 919 977 920 978 retry: 921 - error = pre_rsb_struct(ls); 922 - if (error < 0) 923 - goto out; 924 979 925 - retry_lookup: 926 - 927 - /* check if the rsb is in keep state under read lock - likely path */ 980 + /* check if the rsb is in active state under read lock - likely path */ 928 981 read_lock_bh(&ls->ls_rsbtbl_lock); 929 982 error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); 930 983 if (error) { ··· 927 990 goto do_new; 928 991 } 929 992 930 - if (rsb_flag(r, RSB_TOSS)) { 993 + if (rsb_flag(r, RSB_INACTIVE)) { 931 994 read_unlock_bh(&ls->ls_rsbtbl_lock); 932 - goto do_toss; 995 + goto do_inactive; 933 996 } 934 997 935 998 /* ··· 942 1005 goto out; 943 1006 944 1007 945 - do_toss: 1008 + do_inactive: 946 1009 write_lock_bh(&ls->ls_rsbtbl_lock); 947 1010 948 - /* retry lookup under write lock to see if its still in toss state 949 - * if not it's in keep state and we relookup - unlikely path. 950 - */ 951 - error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); 952 - if (!error) { 953 - if (!rsb_flag(r, RSB_TOSS)) { 1011 + /* See comment in find_rsb_dir. 
*/ 1012 + if (rsb_flag(r, RSB_HASHED)) { 1013 + if (!rsb_flag(r, RSB_INACTIVE)) { 954 1014 write_unlock_bh(&ls->ls_rsbtbl_lock); 955 - goto retry_lookup; 1015 + goto retry; 956 1016 } 957 1017 } else { 958 1018 write_unlock_bh(&ls->ls_rsbtbl_lock); ··· 959 1025 960 1026 /* 961 1027 * rsb found inactive. No other thread is using this rsb because 962 - * it's on the toss list, so we can look at or update 963 - * res_master_nodeid without lock_rsb. 1028 + * it's inactive, so we can look at or update res_master_nodeid 1029 + * without lock_rsb. 964 1030 */ 965 1031 966 1032 if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) { 967 1033 /* our rsb is not master, and another node has sent us a 968 1034 request; this should never happen */ 969 - log_error(ls, "find_rsb toss from_nodeid %d master %d dir %d", 1035 + log_error(ls, "find_rsb inactive from_nodeid %d master %d dir %d", 970 1036 from_nodeid, r->res_master_nodeid, dir_nodeid); 971 1037 dlm_print_rsb(r); 972 1038 write_unlock_bh(&ls->ls_rsbtbl_lock); ··· 978 1044 (dir_nodeid == our_nodeid)) { 979 1045 /* our rsb is not master, and we are dir; may as well fix it; 980 1046 this should never happen */ 981 - log_error(ls, "find_rsb toss our %d master %d dir %d", 1047 + log_error(ls, "find_rsb inactive our %d master %d dir %d", 982 1048 our_nodeid, r->res_master_nodeid, dir_nodeid); 983 1049 dlm_print_rsb(r); 984 1050 r->res_master_nodeid = our_nodeid; 985 1051 r->res_nodeid = 0; 986 1052 } 987 1053 988 - list_move(&r->res_rsbs_list, &ls->ls_keep); 989 - rsb_clear_flag(r, RSB_TOSS); 990 - /* rsb got out of toss state, it becomes alive again 991 - * and we reinit the reference counter that is only 992 - * valid for keep state rsbs 993 - */ 1054 + list_move(&r->res_slow_list, &ls->ls_slow_active); 1055 + rsb_clear_flag(r, RSB_INACTIVE); 994 1056 kref_init(&r->res_ref); 995 - rsb_delete_toss_timer(ls, r); 1057 + del_scan(ls, r); 996 1058 write_unlock_bh(&ls->ls_rsbtbl_lock); 997 1059 998 1060 goto out; 
··· 1000 1070 */ 1001 1071 1002 1072 error = get_rsb_struct(ls, name, len, &r); 1003 - if (error == -EAGAIN) { 1004 - goto retry; 1005 - } 1006 - if (error) 1073 + if (WARN_ON_ONCE(error)) 1007 1074 goto out; 1008 1075 1009 1076 r->res_hash = hash; ··· 1017 1090 */ 1018 1091 write_unlock_bh(&ls->ls_rsbtbl_lock); 1019 1092 dlm_free_rsb(r); 1020 - goto retry_lookup; 1093 + goto retry; 1021 1094 } else if (!error) { 1022 - list_add(&r->res_rsbs_list, &ls->ls_keep); 1095 + list_add(&r->res_slow_list, &ls->ls_slow_active); 1023 1096 } 1024 1097 write_unlock_bh(&ls->ls_rsbtbl_lock); 1025 1098 ··· 1028 1101 return error; 1029 1102 } 1030 1103 1104 + /* 1105 + * rsb rcu usage 1106 + * 1107 + * While rcu read lock is held, the rsb cannot be freed, 1108 + * which allows a lookup optimization. 1109 + * 1110 + * Two threads are accessing the same rsb concurrently, 1111 + * the first (A) is trying to use the rsb, the second (B) 1112 + * is trying to free the rsb. 1113 + * 1114 + * thread A thread B 1115 + * (trying to use rsb) (trying to free rsb) 1116 + * 1117 + * A1. rcu read lock 1118 + * A2. rsbtbl read lock 1119 + * A3. look up rsb in rsbtbl 1120 + * A4. rsbtbl read unlock 1121 + * B1. rsbtbl write lock 1122 + * B2. look up rsb in rsbtbl 1123 + * B3. remove rsb from rsbtbl 1124 + * B4. clear rsb HASHED flag 1125 + * B5. rsbtbl write unlock 1126 + * B6. begin freeing rsb using rcu... 1127 + * 1128 + * (rsb is inactive, so try to make it active again) 1129 + * A5. read rsb HASHED flag (safe because rsb is not freed yet) 1130 + * A6. the rsb HASHED flag is not set, which it means the rsb 1131 + * is being removed from rsbtbl and freed, so don't use it. 1132 + * A7. rcu read unlock 1133 + * 1134 + * B7. ...finish freeing rsb using rcu 1135 + * A8. create a new rsb 1136 + * 1137 + * Without the rcu optimization, steps A5-8 would need to do 1138 + * an extra rsbtbl lookup: 1139 + * A5. rsbtbl write lock 1140 + * A6. look up rsb in rsbtbl, not found 1141 + * A7. 
rsbtbl write unlock 1142 + * A8. create a new rsb 1143 + */ 1144 + 1031 1145 static int find_rsb(struct dlm_ls *ls, const void *name, int len, 1032 1146 int from_nodeid, unsigned int flags, 1033 1147 struct dlm_rsb **r_ret) 1034 1148 { 1035 1149 int dir_nodeid; 1036 1150 uint32_t hash; 1151 + int rv; 1037 1152 1038 1153 if (len > DLM_RESNAME_MAXLEN) 1039 1154 return -EINVAL; ··· 1083 1114 hash = jhash(name, len, 0); 1084 1115 dir_nodeid = dlm_hash2nodeid(ls, hash); 1085 1116 1117 + rcu_read_lock(); 1086 1118 if (dlm_no_directory(ls)) 1087 - return find_rsb_nodir(ls, name, len, hash, dir_nodeid, 1119 + rv = find_rsb_nodir(ls, name, len, hash, dir_nodeid, 1088 1120 from_nodeid, flags, r_ret); 1089 1121 else 1090 - return find_rsb_dir(ls, name, len, hash, dir_nodeid, 1122 + rv = find_rsb_dir(ls, name, len, hash, dir_nodeid, 1091 1123 from_nodeid, flags, r_ret); 1124 + rcu_read_unlock(); 1125 + return rv; 1092 1126 } 1093 1127 1094 1128 /* we have received a request and found that res_master_nodeid != our_nodeid, ··· 1138 1166 } 1139 1167 1140 1168 static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_nodeid, 1141 - int from_nodeid, bool toss_list, unsigned int flags, 1169 + int from_nodeid, bool is_inactive, unsigned int flags, 1142 1170 int *r_nodeid, int *result) 1143 1171 { 1144 1172 int fix_master = (flags & DLM_LU_RECOVER_MASTER); ··· 1162 1190 r->res_nodeid = from_nodeid; 1163 1191 rsb_set_flag(r, RSB_NEW_MASTER); 1164 1192 1165 - if (toss_list) { 1166 - /* I don't think we should ever find it on toss list. */ 1167 - log_error(ls, "%s fix_master on toss", __func__); 1193 + if (is_inactive) { 1194 + /* I don't think we should ever find it inactive. 
*/ 1195 + log_error(ls, "%s fix_master inactive", __func__); 1168 1196 dlm_dump_rsb(r); 1169 1197 } 1170 1198 } ··· 1204 1232 if (!from_master && !fix_master && 1205 1233 (r->res_master_nodeid == from_nodeid)) { 1206 1234 /* this can happen when the master sends remove, the dir node 1207 - * finds the rsb on the keep list and ignores the remove, 1235 + * finds the rsb on the active list and ignores the remove, 1208 1236 * and the former master sends a lookup 1209 1237 */ 1210 1238 ··· 1248 1276 * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0) 1249 1277 */ 1250 1278 1251 - int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, 1252 - int len, unsigned int flags, int *r_nodeid, int *result) 1279 + static int _dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, 1280 + int len, unsigned int flags, int *r_nodeid, int *result) 1253 1281 { 1254 1282 struct dlm_rsb *r = NULL; 1255 1283 uint32_t hash; ··· 1276 1304 } 1277 1305 1278 1306 retry: 1279 - error = pre_rsb_struct(ls); 1280 - if (error < 0) 1281 - return error; 1282 1307 1283 - retry_lookup: 1284 - 1285 - /* check if the rsb is in keep state under read lock - likely path */ 1308 + /* check if the rsb is active under read lock - likely path */ 1286 1309 read_lock_bh(&ls->ls_rsbtbl_lock); 1287 1310 error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); 1288 1311 if (!error) { 1289 - if (rsb_flag(r, RSB_TOSS)) { 1312 + if (rsb_flag(r, RSB_INACTIVE)) { 1290 1313 read_unlock_bh(&ls->ls_rsbtbl_lock); 1291 - goto do_toss; 1314 + goto do_inactive; 1292 1315 } 1293 1316 1294 1317 /* because the rsb is active, we need to lock_rsb before ··· 1307 1340 goto not_found; 1308 1341 } 1309 1342 1310 - do_toss: 1343 + do_inactive: 1311 1344 /* unlikely path - relookup under write */ 1312 1345 write_lock_bh(&ls->ls_rsbtbl_lock); 1313 1346 1314 - /* rsb_mod_timer() requires to held ls_rsbtbl_lock in write lock 1315 - * check if the rsb is still in toss state, if not 
relookup 1316 - */ 1317 1347 error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); 1318 1348 if (!error) { 1319 - if (!rsb_flag(r, RSB_TOSS)) { 1349 + if (!rsb_flag(r, RSB_INACTIVE)) { 1320 1350 write_unlock_bh(&ls->ls_rsbtbl_lock); 1321 1351 /* something as changed, very unlikely but 1322 1352 * try again 1323 1353 */ 1324 - goto retry_lookup; 1354 + goto retry; 1325 1355 } 1326 1356 } else { 1327 1357 write_unlock_bh(&ls->ls_rsbtbl_lock); 1328 1358 goto not_found; 1329 1359 } 1330 1360 1331 - /* because the rsb is inactive (on toss list), it's not refcounted 1332 - * and lock_rsb is not used, but is protected by the rsbtbl lock 1333 - */ 1361 + /* because the rsb is inactive, it's not refcounted and lock_rsb 1362 + is not used, but is protected by the rsbtbl lock */ 1334 1363 1335 1364 __dlm_master_lookup(ls, r, our_nodeid, from_nodeid, true, flags, 1336 1365 r_nodeid, result); 1337 1366 1338 - rsb_mod_timer(ls, r); 1339 - /* the rsb was inactive (on toss list) */ 1367 + /* A dir record rsb should never be on scan list. */ 1368 + /* Try to fix this with del_scan? 
*/ 1369 + WARN_ON(!list_empty(&r->res_scan_list)); 1370 + 1340 1371 write_unlock_bh(&ls->ls_rsbtbl_lock); 1341 1372 1342 1373 return 0; 1343 1374 1344 1375 not_found: 1345 1376 error = get_rsb_struct(ls, name, len, &r); 1346 - if (error == -EAGAIN) 1347 - goto retry; 1348 - if (error) 1377 + if (WARN_ON_ONCE(error)) 1349 1378 goto out; 1350 1379 1351 1380 r->res_hash = hash; 1352 1381 r->res_dir_nodeid = our_nodeid; 1353 1382 r->res_master_nodeid = from_nodeid; 1354 1383 r->res_nodeid = from_nodeid; 1355 - kref_init(&r->res_ref); 1356 - rsb_set_flag(r, RSB_TOSS); 1384 + rsb_set_flag(r, RSB_INACTIVE); 1357 1385 1358 1386 write_lock_bh(&ls->ls_rsbtbl_lock); 1359 1387 error = rsb_insert(r, &ls->ls_rsbtbl); ··· 1358 1396 */ 1359 1397 write_unlock_bh(&ls->ls_rsbtbl_lock); 1360 1398 dlm_free_rsb(r); 1361 - goto retry_lookup; 1399 + goto retry; 1362 1400 } else if (error) { 1363 1401 write_unlock_bh(&ls->ls_rsbtbl_lock); 1364 1402 /* should never happen */ ··· 1366 1404 goto retry; 1367 1405 } 1368 1406 1369 - list_add(&r->res_rsbs_list, &ls->ls_toss); 1370 - rsb_mod_timer(ls, r); 1407 + list_add(&r->res_slow_list, &ls->ls_slow_inactive); 1371 1408 write_unlock_bh(&ls->ls_rsbtbl_lock); 1372 1409 1373 1410 if (result) ··· 1376 1415 return error; 1377 1416 } 1378 1417 1418 + int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, 1419 + int len, unsigned int flags, int *r_nodeid, int *result) 1420 + { 1421 + int rv; 1422 + rcu_read_lock(); 1423 + rv = _dlm_master_lookup(ls, from_nodeid, name, len, flags, r_nodeid, result); 1424 + rcu_read_unlock(); 1425 + return rv; 1426 + } 1427 + 1379 1428 static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash) 1380 1429 { 1381 1430 struct dlm_rsb *r; 1382 1431 1383 1432 read_lock_bh(&ls->ls_rsbtbl_lock); 1384 - list_for_each_entry(r, &ls->ls_keep, res_rsbs_list) { 1433 + list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) { 1385 1434 if (r->res_hash == hash) 1386 1435 dlm_dump_rsb(r); 1387 1436 } ··· 
1413 1442 read_unlock_bh(&ls->ls_rsbtbl_lock); 1414 1443 } 1415 1444 1416 - static void toss_rsb(struct kref *kref) 1445 + static void deactivate_rsb(struct kref *kref) 1417 1446 { 1418 1447 struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref); 1419 1448 struct dlm_ls *ls = r->res_ls; 1449 + int our_nodeid = dlm_our_nodeid(); 1420 1450 1421 1451 DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r);); 1422 - rsb_set_flag(r, RSB_TOSS); 1423 - list_move(&r->res_rsbs_list, &ls->ls_toss); 1424 - rsb_mod_timer(ls, r); 1452 + rsb_set_flag(r, RSB_INACTIVE); 1453 + list_move(&r->res_slow_list, &ls->ls_slow_inactive); 1454 + 1455 + /* 1456 + * When the rsb becomes unused: 1457 + * - If it's not a dir record for a remote master rsb, 1458 + * then it is put on the scan list to be freed. 1459 + * - If it's a dir record for a remote master rsb, 1460 + * then it is kept in the inactive state until 1461 + * receive_remove() from the master node. 1462 + */ 1463 + if (!dlm_no_directory(ls) && 1464 + (r->res_master_nodeid != our_nodeid) && 1465 + (dlm_dir_nodeid(r) != our_nodeid)) 1466 + add_scan(ls, r); 1425 1467 1426 1468 if (r->res_lvbptr) { 1427 1469 dlm_free_lvb(r->res_lvbptr); ··· 1448 1464 { 1449 1465 int rv; 1450 1466 1451 - /* rsbs in toss state never get referenced */ 1452 - WARN_ON(rsb_flag(r, RSB_TOSS)); 1453 - rv = kref_put(&r->res_ref, toss_rsb); 1467 + /* inactive rsbs are not ref counted */ 1468 + WARN_ON(rsb_flag(r, RSB_INACTIVE)); 1469 + rv = kref_put(&r->res_ref, deactivate_rsb); 1454 1470 DLM_ASSERT(!rv, dlm_dump_rsb(r);); 1455 1471 } 1456 1472 1457 - void free_toss_rsb(struct dlm_rsb *r) 1473 + void free_inactive_rsb(struct dlm_rsb *r) 1458 1474 { 1459 - WARN_ON_ONCE(!rsb_flag(r, RSB_TOSS)); 1475 + WARN_ON_ONCE(!rsb_flag(r, RSB_INACTIVE)); 1460 1476 1461 1477 DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r);); 1462 1478 DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r);); 1463 1479 DLM_ASSERT(list_empty(&r->res_convertqueue), 
dlm_dump_rsb(r);); 1464 1480 DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r);); 1465 1481 DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r);); 1466 - DLM_ASSERT(list_empty(&r->res_toss_q_list), dlm_dump_rsb(r);); 1482 + DLM_ASSERT(list_empty(&r->res_scan_list), dlm_dump_rsb(r);); 1467 1483 DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r);); 1468 1484 DLM_ASSERT(list_empty(&r->res_masters_list), dlm_dump_rsb(r);); 1469 1485 ··· 1488 1504 } 1489 1505 1490 1506 static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret, 1491 - int start, int end) 1507 + unsigned long start, unsigned long end) 1492 1508 { 1509 + struct xa_limit limit; 1493 1510 struct dlm_lkb *lkb; 1494 1511 int rv; 1512 + 1513 + limit.max = end; 1514 + limit.min = start; 1495 1515 1496 1516 lkb = dlm_allocate_lkb(ls); 1497 1517 if (!lkb) ··· 1510 1522 INIT_LIST_HEAD(&lkb->lkb_ownqueue); 1511 1523 INIT_LIST_HEAD(&lkb->lkb_rsb_lookup); 1512 1524 1513 - write_lock_bh(&ls->ls_lkbidr_lock); 1514 - rv = idr_alloc(&ls->ls_lkbidr, lkb, start, end, GFP_NOWAIT); 1515 - if (rv >= 0) 1516 - lkb->lkb_id = rv; 1517 - write_unlock_bh(&ls->ls_lkbidr_lock); 1525 + write_lock_bh(&ls->ls_lkbxa_lock); 1526 + rv = xa_alloc(&ls->ls_lkbxa, &lkb->lkb_id, lkb, limit, GFP_ATOMIC); 1527 + write_unlock_bh(&ls->ls_lkbxa_lock); 1518 1528 1519 1529 if (rv < 0) { 1520 - log_error(ls, "create_lkb idr error %d", rv); 1530 + log_error(ls, "create_lkb xa error %d", rv); 1521 1531 dlm_free_lkb(lkb); 1522 1532 return rv; 1523 1533 } ··· 1526 1540 1527 1541 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret) 1528 1542 { 1529 - return _create_lkb(ls, lkb_ret, 1, 0); 1543 + return _create_lkb(ls, lkb_ret, 1, ULONG_MAX); 1530 1544 } 1531 1545 1532 1546 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret) 1533 1547 { 1534 1548 struct dlm_lkb *lkb; 1535 1549 1536 - read_lock_bh(&ls->ls_lkbidr_lock); 1537 - lkb = idr_find(&ls->ls_lkbidr, lkid); 1550 + 
read_lock_bh(&ls->ls_lkbxa_lock); 1551 + lkb = xa_load(&ls->ls_lkbxa, lkid); 1538 1552 if (lkb) 1539 1553 kref_get(&lkb->lkb_ref); 1540 - read_unlock_bh(&ls->ls_lkbidr_lock); 1554 + read_unlock_bh(&ls->ls_lkbxa_lock); 1541 1555 1542 1556 *lkb_ret = lkb; 1543 1557 return lkb ? 0 : -ENOENT; ··· 1562 1576 int rv; 1563 1577 1564 1578 rv = dlm_kref_put_write_lock_bh(&lkb->lkb_ref, kill_lkb, 1565 - &ls->ls_lkbidr_lock); 1579 + &ls->ls_lkbxa_lock); 1566 1580 if (rv) { 1567 - idr_remove(&ls->ls_lkbidr, lkid); 1568 - write_unlock_bh(&ls->ls_lkbidr_lock); 1581 + xa_erase(&ls->ls_lkbxa, lkid); 1582 + write_unlock_bh(&ls->ls_lkbxa_lock); 1569 1583 1570 1584 detach_lkb(lkb); 1571 1585 ··· 4309 4323 return; 4310 4324 } 4311 4325 4312 - /* Look for name in rsb toss state, if it's there, kill it. 4313 - * If it's in non toss state, it's being used, and we should ignore this 4326 + /* 4327 + * Look for inactive rsb, if it's there, free it. 4328 + * If the rsb is active, it's being used, and we should ignore this 4314 4329 * message. This is an expected race between the dir node sending a 4315 4330 * request to the master node at the same time as the master node sends 4316 4331 * a remove to the dir node. The resolution to that race is for the ··· 4334 4347 return; 4335 4348 } 4336 4349 4337 - if (!rsb_flag(r, RSB_TOSS)) { 4350 + if (!rsb_flag(r, RSB_INACTIVE)) { 4338 4351 if (r->res_master_nodeid != from_nodeid) { 4339 4352 /* should not happen */ 4340 - log_error(ls, "receive_remove keep from %d master %d", 4353 + log_error(ls, "receive_remove on active rsb from %d master %d", 4341 4354 from_nodeid, r->res_master_nodeid); 4342 4355 dlm_print_rsb(r); 4343 4356 write_unlock_bh(&ls->ls_rsbtbl_lock); 4344 4357 return; 4345 4358 } 4359 + 4360 + /* Ignore the remove message, see race comment above. 
*/ 4346 4361 4347 4362 log_debug(ls, "receive_remove from %d master %d first %x %s", 4348 4363 from_nodeid, r->res_master_nodeid, r->res_first_lkid, ··· 4354 4365 } 4355 4366 4356 4367 if (r->res_master_nodeid != from_nodeid) { 4357 - log_error(ls, "receive_remove toss from %d master %d", 4368 + log_error(ls, "receive_remove inactive from %d master %d", 4358 4369 from_nodeid, r->res_master_nodeid); 4359 4370 dlm_print_rsb(r); 4360 4371 write_unlock_bh(&ls->ls_rsbtbl_lock); 4361 4372 return; 4362 4373 } 4363 4374 4364 - list_del(&r->res_rsbs_list); 4375 + list_del(&r->res_slow_list); 4365 4376 rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node, 4366 4377 dlm_rhash_rsb_params); 4378 + rsb_clear_flag(r, RSB_HASHED); 4367 4379 write_unlock_bh(&ls->ls_rsbtbl_lock); 4368 4380 4369 - free_toss_rsb(r); 4381 + free_inactive_rsb(r); 4370 4382 } 4371 4383 4372 4384 static void receive_purge(struct dlm_ls *ls, const struct dlm_message *ms) ··· 5434 5444 struct dlm_rsb *r; 5435 5445 5436 5446 read_lock_bh(&ls->ls_rsbtbl_lock); 5437 - list_for_each_entry(r, &ls->ls_keep, res_rsbs_list) { 5447 + list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) { 5438 5448 if (!rsb_flag(r, RSB_RECOVER_GRANT)) 5439 5449 continue; 5440 5450 if (!is_master(r)) {
+3 -4
fs/dlm/lock.h
··· 11 11 #ifndef __LOCK_DOT_H__ 12 12 #define __LOCK_DOT_H__ 13 13 14 - void dlm_rsb_toss_timer(struct timer_list *timer); 15 14 void dlm_dump_rsb(struct dlm_rsb *r); 16 15 void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len); 17 16 void dlm_print_lkb(struct dlm_lkb *lkb); ··· 18 19 uint32_t saved_seq); 19 20 void dlm_receive_buffer(const union dlm_packet *p, int nodeid); 20 21 int dlm_modes_compat(int mode1, int mode2); 21 - void free_toss_rsb(struct dlm_rsb *r); 22 + void free_inactive_rsb(struct dlm_rsb *r); 22 23 void dlm_put_rsb(struct dlm_rsb *r); 23 24 void dlm_hold_rsb(struct dlm_rsb *r); 24 25 int dlm_put_lkb(struct dlm_lkb *lkb); 25 - void dlm_scan_rsbs(struct dlm_ls *ls); 26 26 int dlm_lock_recovery_try(struct dlm_ls *ls); 27 27 void dlm_lock_recovery(struct dlm_ls *ls); 28 28 void dlm_unlock_recovery(struct dlm_ls *ls); 29 - void dlm_timer_resume(struct dlm_ls *ls); 29 + void dlm_rsb_scan(struct timer_list *timer); 30 + void resume_scan_timer(struct dlm_ls *ls); 30 31 31 32 int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, 32 33 int len, unsigned int flags, int *r_nodeid, int *result);
+58 -73
fs/dlm/lockspace.c
··· 38 38 39 39 if (rc) 40 40 return rc; 41 - ls = dlm_find_lockspace_local(ls->ls_local_handle); 41 + ls = dlm_find_lockspace_local(ls); 42 42 if (!ls) 43 43 return -EINVAL; 44 44 ··· 265 265 266 266 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace) 267 267 { 268 - struct dlm_ls *ls; 268 + struct dlm_ls *ls = lockspace; 269 269 270 - spin_lock_bh(&lslist_lock); 271 - list_for_each_entry(ls, &lslist, ls_list) { 272 - if (ls->ls_local_handle == lockspace) { 273 - atomic_inc(&ls->ls_count); 274 - goto out; 275 - } 276 - } 277 - ls = NULL; 278 - out: 279 - spin_unlock_bh(&lslist_lock); 270 + atomic_inc(&ls->ls_count); 280 271 return ls; 281 272 } 282 273 ··· 401 410 atomic_set(&ls->ls_count, 0); 402 411 init_waitqueue_head(&ls->ls_count_wait); 403 412 ls->ls_flags = 0; 404 - ls->ls_scan_time = jiffies; 405 413 406 414 if (ops && dlm_config.ci_recover_callbacks) { 407 415 ls->ls_ops = ops; 408 416 ls->ls_ops_arg = ops_arg; 409 417 } 410 418 419 + if (flags & DLM_LSFL_SOFTIRQ) 420 + set_bit(LSFL_SOFTIRQ, &ls->ls_flags); 421 + 411 422 /* ls_exflags are forced to match among nodes, and we don't 412 423 * need to require all nodes to have some flags set 413 424 */ 414 - ls->ls_exflags = (flags & ~(DLM_LSFL_FS | DLM_LSFL_NEWEXCL)); 425 + ls->ls_exflags = (flags & ~(DLM_LSFL_FS | DLM_LSFL_NEWEXCL | 426 + DLM_LSFL_SOFTIRQ)); 415 427 416 - INIT_LIST_HEAD(&ls->ls_toss); 417 - INIT_LIST_HEAD(&ls->ls_keep); 428 + INIT_LIST_HEAD(&ls->ls_slow_inactive); 429 + INIT_LIST_HEAD(&ls->ls_slow_active); 418 430 rwlock_init(&ls->ls_rsbtbl_lock); 419 431 420 432 error = rhashtable_init(&ls->ls_rsbtbl, &dlm_rhash_rsb_params); 421 433 if (error) 422 434 goto out_lsfree; 423 435 424 - idr_init(&ls->ls_lkbidr); 425 - rwlock_init(&ls->ls_lkbidr_lock); 436 + xa_init_flags(&ls->ls_lkbxa, XA_FLAGS_ALLOC | XA_FLAGS_LOCK_BH); 437 + rwlock_init(&ls->ls_lkbxa_lock); 426 438 427 439 INIT_LIST_HEAD(&ls->ls_waiters); 428 440 spin_lock_init(&ls->ls_waiters_lock); 429 441 
INIT_LIST_HEAD(&ls->ls_orphans); 430 442 spin_lock_init(&ls->ls_orphans_lock); 431 - 432 - INIT_LIST_HEAD(&ls->ls_new_rsb); 433 - spin_lock_init(&ls->ls_new_rsb_spin); 434 443 435 444 INIT_LIST_HEAD(&ls->ls_nodes); 436 445 INIT_LIST_HEAD(&ls->ls_nodes_gone); ··· 475 484 ls->ls_recover_buf = kmalloc(DLM_MAX_SOCKET_BUFSIZE, GFP_NOFS); 476 485 if (!ls->ls_recover_buf) { 477 486 error = -ENOMEM; 478 - goto out_lkbidr; 487 + goto out_lkbxa; 479 488 } 480 489 481 490 ls->ls_slot = 0; ··· 485 494 486 495 INIT_LIST_HEAD(&ls->ls_recover_list); 487 496 spin_lock_init(&ls->ls_recover_list_lock); 488 - idr_init(&ls->ls_recover_idr); 489 - spin_lock_init(&ls->ls_recover_idr_lock); 497 + xa_init_flags(&ls->ls_recover_xa, XA_FLAGS_ALLOC | XA_FLAGS_LOCK_BH); 498 + spin_lock_init(&ls->ls_recover_xa_lock); 490 499 ls->ls_recover_list_count = 0; 491 - ls->ls_local_handle = ls; 492 500 init_waitqueue_head(&ls->ls_wait_general); 493 501 INIT_LIST_HEAD(&ls->ls_masters_list); 494 502 rwlock_init(&ls->ls_masters_lock); 495 503 INIT_LIST_HEAD(&ls->ls_dir_dump_list); 496 504 rwlock_init(&ls->ls_dir_dump_lock); 497 505 498 - INIT_LIST_HEAD(&ls->ls_toss_q); 499 - spin_lock_init(&ls->ls_toss_q_lock); 500 - timer_setup(&ls->ls_timer, dlm_rsb_toss_timer, 501 - TIMER_DEFERRABLE); 506 + INIT_LIST_HEAD(&ls->ls_scan_list); 507 + spin_lock_init(&ls->ls_scan_lock); 508 + timer_setup(&ls->ls_scan_timer, dlm_rsb_scan, TIMER_DEFERRABLE); 502 509 503 510 spin_lock_bh(&lslist_lock); 504 511 ls->ls_create_count = 1; 505 512 list_add(&ls->ls_list, &lslist); 506 513 spin_unlock_bh(&lslist_lock); 507 514 508 - if (flags & DLM_LSFL_FS) { 509 - error = dlm_callback_start(ls); 510 - if (error) { 511 - log_error(ls, "can't start dlm_callback %d", error); 512 - goto out_delist; 513 - } 515 + if (flags & DLM_LSFL_FS) 516 + set_bit(LSFL_FS, &ls->ls_flags); 517 + 518 + error = dlm_callback_start(ls); 519 + if (error) { 520 + log_error(ls, "can't start dlm_callback %d", error); 521 + goto out_delist; 514 522 } 515 523 
516 524 init_waitqueue_head(&ls->ls_recover_lock_wait); ··· 574 584 spin_lock_bh(&lslist_lock); 575 585 list_del(&ls->ls_list); 576 586 spin_unlock_bh(&lslist_lock); 577 - idr_destroy(&ls->ls_recover_idr); 587 + xa_destroy(&ls->ls_recover_xa); 578 588 kfree(ls->ls_recover_buf); 579 - out_lkbidr: 580 - idr_destroy(&ls->ls_lkbidr); 589 + out_lkbxa: 590 + xa_destroy(&ls->ls_lkbxa); 581 591 rhashtable_destroy(&ls->ls_rsbtbl); 582 592 out_lsfree: 583 593 if (do_unreg) ··· 633 643 void *ops_arg, int *ops_result, 634 644 dlm_lockspace_t **lockspace) 635 645 { 646 + if (flags & DLM_LSFL_SOFTIRQ) 647 + return -EINVAL; 648 + 636 649 return __dlm_new_lockspace(name, cluster, flags, lvblen, ops, 637 650 ops_arg, ops_result, lockspace); 638 651 } 639 652 640 - static int lkb_idr_is_local(int id, void *p, void *data) 653 + static int lkb_idr_free(struct dlm_lkb *lkb) 641 654 { 642 - struct dlm_lkb *lkb = p; 643 - 644 - return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV; 645 - } 646 - 647 - static int lkb_idr_is_any(int id, void *p, void *data) 648 - { 649 - return 1; 650 - } 651 - 652 - static int lkb_idr_free(int id, void *p, void *data) 653 - { 654 - struct dlm_lkb *lkb = p; 655 - 656 655 if (lkb->lkb_lvbptr && test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags)) 657 656 dlm_free_lvb(lkb->lkb_lvbptr); 658 657 ··· 649 670 return 0; 650 671 } 651 672 652 - /* NOTE: We check the lkbidr here rather than the resource table. 673 + /* NOTE: We check the lkbxa here rather than the resource table. 
653 674 This is because there may be LKBs queued as ASTs that have been unlinked 654 675 from their RSBs and are pending deletion once the AST has been delivered */ 655 676 656 677 static int lockspace_busy(struct dlm_ls *ls, int force) 657 678 { 658 - int rv; 679 + struct dlm_lkb *lkb; 680 + unsigned long id; 681 + int rv = 0; 659 682 660 - read_lock_bh(&ls->ls_lkbidr_lock); 683 + read_lock_bh(&ls->ls_lkbxa_lock); 661 684 if (force == 0) { 662 - rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls); 685 + xa_for_each(&ls->ls_lkbxa, id, lkb) { 686 + rv = 1; 687 + break; 688 + } 663 689 } else if (force == 1) { 664 - rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls); 690 + xa_for_each(&ls->ls_lkbxa, id, lkb) { 691 + if (lkb->lkb_nodeid == 0 && 692 + lkb->lkb_grmode != DLM_LOCK_IV) { 693 + rv = 1; 694 + break; 695 + } 696 + } 665 697 } else { 666 698 rv = 0; 667 699 } 668 - read_unlock_bh(&ls->ls_lkbidr_lock); 700 + read_unlock_bh(&ls->ls_lkbxa_lock); 669 701 return rv; 670 702 } 671 703 ··· 689 699 690 700 static int release_lockspace(struct dlm_ls *ls, int force) 691 701 { 692 - struct dlm_rsb *rsb; 702 + struct dlm_lkb *lkb; 703 + unsigned long id; 693 704 int busy, rv; 694 705 695 706 busy = lockspace_busy(ls, force); ··· 730 739 * time_shutdown_sync(), we don't care anymore 731 740 */ 732 741 clear_bit(LSFL_RUNNING, &ls->ls_flags); 733 - timer_shutdown_sync(&ls->ls_timer); 742 + timer_shutdown_sync(&ls->ls_scan_timer); 734 743 735 744 if (ls_count == 1) { 736 745 dlm_clear_members(ls); ··· 743 752 744 753 dlm_delete_debug_file(ls); 745 754 746 - idr_destroy(&ls->ls_recover_idr); 755 + xa_destroy(&ls->ls_recover_xa); 747 756 kfree(ls->ls_recover_buf); 748 757 749 758 /* 750 - * Free all lkb's in idr 759 + * Free all lkb's in xa 751 760 */ 752 - 753 - idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls); 754 - idr_destroy(&ls->ls_lkbidr); 761 + xa_for_each(&ls->ls_lkbxa, id, lkb) { 762 + lkb_idr_free(lkb); 763 + } 764 + xa_destroy(&ls->ls_lkbxa); 755 765 756 766 
/* 757 767 * Free all rsb's on rsbtbl 758 768 */ 759 769 rhashtable_free_and_destroy(&ls->ls_rsbtbl, rhash_free_rsb, NULL); 760 - 761 - while (!list_empty(&ls->ls_new_rsb)) { 762 - rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, 763 - res_hashchain); 764 - list_del(&rsb->res_hashchain); 765 - dlm_free_rsb(rsb); 766 - } 767 770 768 771 /* 769 772 * Free structures on any other lists
+1 -7
fs/dlm/lowcomms.c
··· 461 461 return false; 462 462 } 463 463 464 - int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len) 464 + int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr) 465 465 { 466 466 struct connection *con; 467 467 bool ret, idx; ··· 857 857 kfree(pentry->buf); 858 858 kfree(pentry); 859 859 } 860 - 861 - struct dlm_processed_nodes { 862 - int nodeid; 863 - 864 - struct list_head list; 865 - }; 866 860 867 861 static void process_dlm_messages(struct work_struct *work) 868 862 {
+1 -1
fs/dlm/lowcomms.h
··· 46 46 int dlm_lowcomms_resend_msg(struct dlm_msg *msg); 47 47 int dlm_lowcomms_connect_node(int nodeid); 48 48 int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark); 49 - int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len); 49 + int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr); 50 50 void dlm_midcomms_receive_done(int nodeid); 51 51 struct kmem_cache *dlm_lowcomms_writequeue_cache_create(void); 52 52 struct kmem_cache *dlm_lowcomms_msg_cache_create(void);
+1 -1
fs/dlm/member.c
··· 642 642 set_bit(LSFL_RECOVER_STOP, &ls->ls_flags); 643 643 new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags); 644 644 if (new) 645 - timer_delete_sync(&ls->ls_timer); 645 + timer_delete_sync(&ls->ls_scan_timer); 646 646 ls->ls_recover_seq++; 647 647 648 648 /* activate requestqueue and stop processing */
+9 -1
fs/dlm/memory.c
··· 72 72 73 73 void dlm_memory_exit(void) 74 74 { 75 + rcu_barrier(); 76 + 75 77 kmem_cache_destroy(writequeue_cache); 76 78 kmem_cache_destroy(mhandle_cache); 77 79 kmem_cache_destroy(msg_cache); ··· 103 101 return r; 104 102 } 105 103 106 - void dlm_free_rsb(struct dlm_rsb *r) 104 + static void __free_rsb_rcu(struct rcu_head *rcu) 107 105 { 106 + struct dlm_rsb *r = container_of(rcu, struct dlm_rsb, rcu); 108 107 if (r->res_lvbptr) 109 108 dlm_free_lvb(r->res_lvbptr); 110 109 kmem_cache_free(rsb_cache, r); 110 + } 111 + 112 + void dlm_free_rsb(struct dlm_rsb *r) 113 + { 114 + call_rcu(&r->rcu, __free_rsb_rcu); 111 115 } 112 116 113 117 struct dlm_lkb *dlm_allocate_lkb(struct dlm_ls *ls)
+2 -2
fs/dlm/midcomms.c
··· 334 334 return __find_node(nodeid, nodeid_hash(nodeid)); 335 335 } 336 336 337 - int dlm_midcomms_addr(int nodeid, struct sockaddr_storage *addr, int len) 337 + int dlm_midcomms_addr(int nodeid, struct sockaddr_storage *addr) 338 338 { 339 339 int ret, idx, r = nodeid_hash(nodeid); 340 340 struct midcomms_node *node; 341 341 342 - ret = dlm_lowcomms_addr(nodeid, addr, len); 342 + ret = dlm_lowcomms_addr(nodeid, addr); 343 343 if (ret) 344 344 return ret; 345 345
+1 -1
fs/dlm/midcomms.h
··· 19 19 struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len, char **ppc); 20 20 void dlm_midcomms_commit_mhandle(struct dlm_mhandle *mh, const void *name, 21 21 int namelen); 22 - int dlm_midcomms_addr(int nodeid, struct sockaddr_storage *addr, int len); 22 + int dlm_midcomms_addr(int nodeid, struct sockaddr_storage *addr); 23 23 void dlm_midcomms_version_wait(void); 24 24 int dlm_midcomms_close(int nodeid); 25 25 int dlm_midcomms_start(void);
+40 -38
fs/dlm/recover.c
··· 293 293 spin_unlock_bh(&ls->ls_recover_list_lock); 294 294 } 295 295 296 - static int recover_idr_empty(struct dlm_ls *ls) 296 + static int recover_xa_empty(struct dlm_ls *ls) 297 297 { 298 298 int empty = 1; 299 299 300 - spin_lock_bh(&ls->ls_recover_idr_lock); 300 + spin_lock_bh(&ls->ls_recover_xa_lock); 301 301 if (ls->ls_recover_list_count) 302 302 empty = 0; 303 - spin_unlock_bh(&ls->ls_recover_idr_lock); 303 + spin_unlock_bh(&ls->ls_recover_xa_lock); 304 304 305 305 return empty; 306 306 } 307 307 308 - static int recover_idr_add(struct dlm_rsb *r) 308 + static int recover_xa_add(struct dlm_rsb *r) 309 309 { 310 310 struct dlm_ls *ls = r->res_ls; 311 + struct xa_limit limit = { 312 + .min = 1, 313 + .max = UINT_MAX, 314 + }; 315 + uint32_t id; 311 316 int rv; 312 317 313 - spin_lock_bh(&ls->ls_recover_idr_lock); 318 + spin_lock_bh(&ls->ls_recover_xa_lock); 314 319 if (r->res_id) { 315 320 rv = -1; 316 321 goto out_unlock; 317 322 } 318 - rv = idr_alloc(&ls->ls_recover_idr, r, 1, 0, GFP_NOWAIT); 323 + rv = xa_alloc(&ls->ls_recover_xa, &id, r, limit, GFP_ATOMIC); 319 324 if (rv < 0) 320 325 goto out_unlock; 321 326 322 - r->res_id = rv; 327 + r->res_id = id; 323 328 ls->ls_recover_list_count++; 324 329 dlm_hold_rsb(r); 325 330 rv = 0; 326 331 out_unlock: 327 - spin_unlock_bh(&ls->ls_recover_idr_lock); 332 + spin_unlock_bh(&ls->ls_recover_xa_lock); 328 333 return rv; 329 334 } 330 335 331 - static void recover_idr_del(struct dlm_rsb *r) 336 + static void recover_xa_del(struct dlm_rsb *r) 332 337 { 333 338 struct dlm_ls *ls = r->res_ls; 334 339 335 - spin_lock_bh(&ls->ls_recover_idr_lock); 336 - idr_remove(&ls->ls_recover_idr, r->res_id); 340 + spin_lock_bh(&ls->ls_recover_xa_lock); 341 + xa_erase_bh(&ls->ls_recover_xa, r->res_id); 337 342 r->res_id = 0; 338 343 ls->ls_recover_list_count--; 339 - spin_unlock_bh(&ls->ls_recover_idr_lock); 344 + spin_unlock_bh(&ls->ls_recover_xa_lock); 340 345 341 346 dlm_put_rsb(r); 342 347 } 343 348 344 - static struct 
dlm_rsb *recover_idr_find(struct dlm_ls *ls, uint64_t id) 349 + static struct dlm_rsb *recover_xa_find(struct dlm_ls *ls, uint64_t id) 345 350 { 346 351 struct dlm_rsb *r; 347 352 348 - spin_lock_bh(&ls->ls_recover_idr_lock); 349 - r = idr_find(&ls->ls_recover_idr, (int)id); 350 - spin_unlock_bh(&ls->ls_recover_idr_lock); 353 + spin_lock_bh(&ls->ls_recover_xa_lock); 354 + r = xa_load(&ls->ls_recover_xa, (int)id); 355 + spin_unlock_bh(&ls->ls_recover_xa_lock); 351 356 return r; 352 357 } 353 358 354 - static void recover_idr_clear(struct dlm_ls *ls) 359 + static void recover_xa_clear(struct dlm_ls *ls) 355 360 { 356 361 struct dlm_rsb *r; 357 - int id; 362 + unsigned long id; 358 363 359 - spin_lock_bh(&ls->ls_recover_idr_lock); 364 + spin_lock_bh(&ls->ls_recover_xa_lock); 360 365 361 - idr_for_each_entry(&ls->ls_recover_idr, r, id) { 362 - idr_remove(&ls->ls_recover_idr, id); 366 + xa_for_each(&ls->ls_recover_xa, id, r) { 367 + xa_erase_bh(&ls->ls_recover_xa, id); 363 368 r->res_id = 0; 364 369 r->res_recover_locks_count = 0; 365 370 ls->ls_recover_list_count--; ··· 377 372 ls->ls_recover_list_count); 378 373 ls->ls_recover_list_count = 0; 379 374 } 380 - spin_unlock_bh(&ls->ls_recover_idr_lock); 375 + spin_unlock_bh(&ls->ls_recover_xa_lock); 381 376 } 382 377 383 378 ··· 475 470 set_new_master(r); 476 471 error = 0; 477 472 } else { 478 - recover_idr_add(r); 473 + recover_xa_add(r); 479 474 error = dlm_send_rcom_lookup(r, dir_nodeid, seq); 480 475 } 481 476 ··· 556 551 557 552 log_rinfo(ls, "dlm_recover_masters %u of %u", count, total); 558 553 559 - error = dlm_wait_function(ls, &recover_idr_empty); 554 + error = dlm_wait_function(ls, &recover_xa_empty); 560 555 out: 561 556 if (error) 562 - recover_idr_clear(ls); 557 + recover_xa_clear(ls); 563 558 return error; 564 559 } 565 560 ··· 568 563 struct dlm_rsb *r; 569 564 int ret_nodeid, new_master; 570 565 571 - r = recover_idr_find(ls, le64_to_cpu(rc->rc_id)); 566 + r = recover_xa_find(ls, le64_to_cpu(rc->rc_id)); 
572 567 if (!r) { 573 568 log_error(ls, "dlm_recover_master_reply no id %llx", 574 569 (unsigned long long)le64_to_cpu(rc->rc_id)); ··· 587 582 r->res_nodeid = new_master; 588 583 set_new_master(r); 589 584 unlock_rsb(r); 590 - recover_idr_del(r); 585 + recover_xa_del(r); 591 586 592 - if (recover_idr_empty(ls)) 587 + if (recover_xa_empty(ls)) 593 588 wake_up(&ls->ls_wait_general); 594 589 out: 595 590 return 0; ··· 882 877 log_rinfo(ls, "dlm_recover_rsbs %d done", count); 883 878 } 884 879 885 - /* Create a single list of all root rsb's to be used during recovery */ 886 - 887 - void dlm_clear_toss(struct dlm_ls *ls) 880 + void dlm_clear_inactive(struct dlm_ls *ls) 888 881 { 889 882 struct dlm_rsb *r, *safe; 890 883 unsigned int count = 0; 891 884 892 885 write_lock_bh(&ls->ls_rsbtbl_lock); 893 - list_for_each_entry_safe(r, safe, &ls->ls_toss, res_rsbs_list) { 894 - list_del(&r->res_rsbs_list); 886 + list_for_each_entry_safe(r, safe, &ls->ls_slow_inactive, res_slow_list) { 887 + list_del(&r->res_slow_list); 895 888 rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node, 896 889 dlm_rhash_rsb_params); 897 890 898 - /* remove it from the toss queue if its part of it */ 899 - if (!list_empty(&r->res_toss_q_list)) 900 - list_del_init(&r->res_toss_q_list); 891 + if (!list_empty(&r->res_scan_list)) 892 + list_del_init(&r->res_scan_list); 901 893 902 - free_toss_rsb(r); 894 + free_inactive_rsb(r); 903 895 count++; 904 896 } 905 897 write_unlock_bh(&ls->ls_rsbtbl_lock); 906 898 907 899 if (count) 908 - log_rinfo(ls, "dlm_clear_toss %u done", count); 900 + log_rinfo(ls, "dlm_clear_inactive %u done", count); 909 901 } 910 902
+1 -1
fs/dlm/recover.h
··· 25 25 int dlm_recover_locks(struct dlm_ls *ls, uint64_t seq, 26 26 const struct list_head *root_list); 27 27 void dlm_recovered_lock(struct dlm_rsb *r); 28 - void dlm_clear_toss(struct dlm_ls *ls); 28 + void dlm_clear_inactive(struct dlm_ls *ls); 29 29 void dlm_recover_rsbs(struct dlm_ls *ls, const struct list_head *root_list); 30 30 31 31 #endif /* __RECOVER_DOT_H__ */
+7 -7
fs/dlm/recoverd.c
··· 33 33 } 34 34 35 35 read_lock_bh(&ls->ls_rsbtbl_lock); 36 - list_for_each_entry(r, &ls->ls_keep, res_rsbs_list) { 36 + list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) { 37 37 if (r->res_nodeid) 38 38 continue; 39 39 ··· 63 63 struct dlm_rsb *r; 64 64 65 65 read_lock_bh(&ls->ls_rsbtbl_lock); 66 - list_for_each_entry(r, &ls->ls_keep, res_rsbs_list) { 66 + list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) { 67 67 list_add(&r->res_root_list, root_list); 68 68 dlm_hold_rsb(r); 69 69 } 70 70 71 - WARN_ON_ONCE(!list_empty(&ls->ls_toss)); 71 + WARN_ON_ONCE(!list_empty(&ls->ls_slow_inactive)); 72 72 read_unlock_bh(&ls->ls_rsbtbl_lock); 73 73 } 74 74 ··· 98 98 spin_lock_bh(&ls->ls_recover_lock); 99 99 if (ls->ls_recover_seq == seq) { 100 100 set_bit(LSFL_RUNNING, &ls->ls_flags); 101 - /* Schedule next timer if recovery put something on toss. 101 + /* Schedule next timer if recovery put something on inactive. 102 102 * 103 103 * The rsbs that was queued while recovery on toss hasn't 104 104 * started yet because LSFL_RUNNING was set everything 105 105 * else recovery hasn't started as well because ls_in_recovery 106 106 * is still hold. So we should not run into the case that 107 - * dlm_timer_resume() queues a timer that can occur in 107 + * resume_scan_timer() queues a timer that can occur in 108 108 * a no op. 109 109 */ 110 - dlm_timer_resume(ls); 110 + resume_scan_timer(ls); 111 111 /* unblocks processes waiting to enter the dlm */ 112 112 up_write(&ls->ls_in_recovery); 113 113 clear_bit(LSFL_RECOVER_LOCK, &ls->ls_flags); ··· 131 131 132 132 dlm_callback_suspend(ls); 133 133 134 - dlm_clear_toss(ls); 134 + dlm_clear_inactive(ls); 135 135 136 136 /* 137 137 * This list of root rsb's will be the basis of most of the recovery
+18 -24
fs/dlm/user.c
··· 182 182 struct dlm_user_args *ua; 183 183 struct dlm_user_proc *proc; 184 184 struct dlm_callback *cb; 185 - int rv; 185 + int rv, copy_lvb; 186 186 187 187 if (test_bit(DLM_DFL_ORPHAN_BIT, &lkb->lkb_dflags) || 188 188 test_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags)) ··· 213 213 214 214 spin_lock_bh(&proc->asts_spin); 215 215 216 - rv = dlm_queue_lkb_callback(lkb, flags, mode, status, sbflags, &cb); 217 - switch (rv) { 218 - case DLM_ENQUEUE_CALLBACK_NEED_SCHED: 219 - cb->ua = *ua; 220 - cb->lkb_lksb = &cb->ua.lksb; 221 - if (cb->copy_lvb) { 222 - memcpy(cb->lvbptr, ua->lksb.sb_lvbptr, 223 - DLM_USER_LVB_LEN); 224 - cb->lkb_lksb->sb_lvbptr = cb->lvbptr; 225 - } 216 + if (!dlm_may_skip_callback(lkb, flags, mode, status, sbflags, 217 + &copy_lvb)) { 218 + rv = dlm_get_cb(lkb, flags, mode, status, sbflags, &cb); 219 + if (!rv) { 220 + cb->copy_lvb = copy_lvb; 221 + cb->ua = *ua; 222 + cb->lkb_lksb = &cb->ua.lksb; 223 + if (copy_lvb) { 224 + memcpy(cb->lvbptr, ua->lksb.sb_lvbptr, 225 + DLM_USER_LVB_LEN); 226 + cb->lkb_lksb->sb_lvbptr = cb->lvbptr; 227 + } 226 228 227 - list_add_tail(&cb->list, &proc->asts); 228 - wake_up_interruptible(&proc->wait); 229 - break; 230 - case DLM_ENQUEUE_CALLBACK_SUCCESS: 231 - break; 232 - case DLM_ENQUEUE_CALLBACK_FAILURE: 233 - fallthrough; 234 - default: 235 - spin_unlock_bh(&proc->asts_spin); 236 - WARN_ON_ONCE(1); 237 - goto out; 229 + list_add_tail(&cb->list, &proc->asts); 230 + wake_up_interruptible(&proc->wait); 231 + } 238 232 } 239 233 spin_unlock_bh(&proc->asts_spin); 240 234 ··· 448 454 if (params->flags & DLM_USER_LSFLG_FORCEFREE) 449 455 force = 2; 450 456 451 - lockspace = ls->ls_local_handle; 457 + lockspace = ls; 452 458 dlm_put_lockspace(ls); 453 459 454 460 /* The final dlm_release_lockspace waits for references to go to ··· 651 657 return -ENOMEM; 652 658 } 653 659 654 - proc->lockspace = ls->ls_local_handle; 660 + proc->lockspace = ls; 655 661 INIT_LIST_HEAD(&proc->asts); 656 662 INIT_LIST_HEAD(&proc->locks); 657 663 
INIT_LIST_HEAD(&proc->unlocking);
+16 -1
include/linux/dlm.h
··· 35 35 int num_slots, int our_slot, uint32_t generation);
36 36 };
37 37
38 + /* only relevant for kernel lockspaces, will be removed in the future */
39 + #define DLM_LSFL_SOFTIRQ __DLM_LSFL_RESERVED0
40 +
38 41 /*
39 42 * dlm_new_lockspace
40 43 *
··· 58 55 * used to select the directory node. Must be the same on all nodes.
59 56 * DLM_LSFL_NEWEXCL
60 57 * dlm_new_lockspace() should return -EEXIST if the lockspace exists.
58 + * DLM_LSFL_SOFTIRQ
59 + * dlm request callbacks (ast, bast) are softirq safe. This flag should be
60 + * preferred by users and will become the default in the future. If set, the
61 + * strongest context for the ast/bast callbacks is softirq, as it avoids
62 + * an additional context switch.
61 63 *
62 64 * lvblen: length of lvb in bytes. Must be multiple of 8.
63 65 * dlm_new_lockspace() returns an error if this does not match
··· 129 121 * call.
130 122 *
131 123 * AST routines should not block (at least not for long), but may make
132 - * any locking calls they please.
124 + * any locking calls they please. If DLM_LSFL_SOFTIRQ for kernel
125 + * users of dlm_new_lockspace() is passed, the ast and bast callbacks
126 + * can be processed in softirq context. Also, some of the callback
127 + * contexts are in the same context as the DLM lock request API; users
128 + * must not hold locks while calling the dlm lock request API and then
129 + * try to acquire the same lock in the callback again, as this will end
130 + * in a lock recursion. For newer implementations, DLM_LSFL_SOFTIRQ
131 + * should be used.
133 132 */
134 133
135 134 int dlm_lock(dlm_lockspace_t *lockspace,
+2
include/uapi/linux/dlm.h
··· 71 71 /* DLM_LSFL_TIMEWARN is deprecated and reserved. DO NOT USE! */
72 72 #define DLM_LSFL_TIMEWARN 0x00000002
73 73 #define DLM_LSFL_NEWEXCL 0x00000008
74 + /* currently reserved due to in-kernel use */
75 + #define __DLM_LSFL_RESERVED0 0x00000010
74 76
75 77
76 78 #endif /* _UAPI__DLM_DOT_H__ */