Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux

dlm: use a new list for recovery of master rsb names

Add a new "masters_list" for master rsb structs, with a new
rwlock. The new list is created and used during the recovery
process to send the master rsb names to new nodes. With this
change, the current "root_list" can be used without locking.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
Signed-off-by: David Teigland <teigland@redhat.com>

authored by

Alexander Aring and committed by
David Teigland
aff46e0f 29e345f3

+79 -14
+8 -14
fs/dlm/dir.c
··· 216 216 if (!rv) 217 217 return r; 218 218 219 - down_read(&ls->ls_root_sem); 220 - list_for_each_entry(r, &ls->ls_root_list, res_root_list) { 219 + list_for_each_entry(r, &ls->ls_masters_list, res_masters_list) { 221 220 if (len == r->res_length && !memcmp(name, r->res_name, len)) { 222 - up_read(&ls->ls_root_sem); 223 221 log_debug(ls, "find_rsb_root revert to root_list %s", 224 222 r->res_name); 225 223 return r; 226 224 } 227 225 } 228 - up_read(&ls->ls_root_sem); 229 226 return NULL; 230 227 } 231 228 ··· 238 241 int offset = 0, dir_nodeid; 239 242 __be16 be_namelen; 240 243 241 - down_read(&ls->ls_root_sem); 244 + read_lock(&ls->ls_masters_lock); 242 245 243 246 if (inlen > 1) { 244 247 r = find_rsb_root(ls, inbuf, inlen); ··· 247 250 nodeid, inlen, inlen, inbuf); 248 251 goto out; 249 252 } 250 - list = r->res_root_list.next; 253 + list = r->res_masters_list.next; 251 254 } else { 252 - list = ls->ls_root_list.next; 255 + list = ls->ls_masters_list.next; 253 256 } 254 257 255 - for (offset = 0; list != &ls->ls_root_list; list = list->next) { 256 - r = list_entry(list, struct dlm_rsb, res_root_list); 257 - if (r->res_nodeid) 258 - continue; 259 - 258 + for (offset = 0; list != &ls->ls_masters_list; list = list->next) { 259 + r = list_entry(list, struct dlm_rsb, res_masters_list); 260 260 dir_nodeid = dlm_dir_nodeid(r); 261 261 if (dir_nodeid != nodeid) 262 262 continue; ··· 288 294 * terminating record. 289 295 */ 290 296 291 - if ((list == &ls->ls_root_list) && 297 + if ((list == &ls->ls_masters_list) && 292 298 (offset + sizeof(uint16_t) <= outlen)) { 293 299 be_namelen = cpu_to_be16(0xFFFF); 294 300 memcpy(outbuf + offset, &be_namelen, sizeof(__be16)); ··· 296 302 ls->ls_recover_dir_sent_msg++; 297 303 } 298 304 out: 299 - up_read(&ls->ls_root_sem); 305 + read_unlock(&ls->ls_masters_lock); 300 306 } 301 307
+3
fs/dlm/dlm_internal.h
··· 342 342 struct list_head res_waitqueue; 343 343 344 344 struct list_head res_root_list; /* used for recovery */ 345 + struct list_head res_masters_list; /* used for recovery */ 345 346 struct list_head res_recover_list; /* used for recovery */ 346 347 int res_recover_locks_count; 347 348 ··· 676 675 677 676 struct list_head ls_root_list; /* root resources */ 678 677 struct rw_semaphore ls_root_sem; /* protect root_list */ 678 + struct list_head ls_masters_list; /* root resources */ 679 + rwlock_t ls_masters_lock; /* protect root_list */ 679 680 680 681 const struct dlm_lockspace_ops *ls_ops; 681 682 void *ls_ops_arg;
+2
fs/dlm/lock.c
··· 423 423 INIT_LIST_HEAD(&r->res_waitqueue); 424 424 INIT_LIST_HEAD(&r->res_root_list); 425 425 INIT_LIST_HEAD(&r->res_recover_list); 426 + INIT_LIST_HEAD(&r->res_masters_list); 426 427 427 428 *r_ret = r; 428 429 return 0; ··· 1169 1168 DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r);); 1170 1169 DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r);); 1171 1170 DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r);); 1171 + DLM_ASSERT(list_empty(&r->res_masters_list), dlm_dump_rsb(r);); 1172 1172 } 1173 1173 1174 1174 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
+2
fs/dlm/lockspace.c
··· 582 582 init_waitqueue_head(&ls->ls_wait_general); 583 583 INIT_LIST_HEAD(&ls->ls_root_list); 584 584 init_rwsem(&ls->ls_root_sem); 585 + INIT_LIST_HEAD(&ls->ls_masters_list); 586 + rwlock_init(&ls->ls_masters_lock); 585 587 586 588 spin_lock(&lslist_lock); 587 589 ls->ls_create_count = 1;
+64
fs/dlm/recoverd.c
··· 20 20 #include "requestqueue.h" 21 21 #include "recoverd.h" 22 22 23 + static int dlm_create_masters_list(struct dlm_ls *ls) 24 + { 25 + struct rb_node *n; 26 + struct dlm_rsb *r; 27 + int i, error = 0; 28 + 29 + write_lock(&ls->ls_masters_lock); 30 + if (!list_empty(&ls->ls_masters_list)) { 31 + log_error(ls, "root list not empty"); 32 + error = -EINVAL; 33 + goto out; 34 + } 35 + 36 + for (i = 0; i < ls->ls_rsbtbl_size; i++) { 37 + spin_lock_bh(&ls->ls_rsbtbl[i].lock); 38 + for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) { 39 + r = rb_entry(n, struct dlm_rsb, res_hashnode); 40 + if (r->res_nodeid) 41 + continue; 42 + 43 + list_add(&r->res_masters_list, &ls->ls_masters_list); 44 + dlm_hold_rsb(r); 45 + } 46 + spin_unlock_bh(&ls->ls_rsbtbl[i].lock); 47 + } 48 + out: 49 + write_unlock(&ls->ls_masters_lock); 50 + return error; 51 + } 52 + 53 + static void dlm_release_masters_list(struct dlm_ls *ls) 54 + { 55 + struct dlm_rsb *r, *safe; 56 + 57 + write_lock(&ls->ls_masters_lock); 58 + list_for_each_entry_safe(r, safe, &ls->ls_masters_list, res_masters_list) { 59 + list_del_init(&r->res_masters_list); 60 + dlm_put_rsb(r); 61 + } 62 + write_unlock(&ls->ls_masters_lock); 63 + } 64 + 23 65 static void dlm_create_root_list(struct dlm_ls *ls) 24 66 { 25 67 struct rb_node *n; ··· 165 123 166 124 dlm_recover_dir_nodeid(ls); 167 125 126 + /* Create a snapshot of all active rsbs were we are the master of. 127 + * During the barrier between dlm_recover_members_wait() and 128 + * dlm_recover_directory() other nodes can dump their necessary 129 + * directory dlm_rsb (r->res_dir_nodeid == nodeid) in rcom 130 + * communication dlm_copy_master_names() handling. 131 + * 132 + * TODO We should create a per lockspace list that contains rsbs 133 + * that we are the master of. Instead of creating this list while 134 + * recovery we keep track of those rsbs while locking handling and 135 + * recovery can use it when necessary. 136 + */ 137 + error = dlm_create_masters_list(ls); 138 + if (error) { 139 + log_rinfo(ls, "dlm_create_masters_list error %d", error); 140 + goto fail; 141 + } 142 + 168 143 ls->ls_recover_dir_sent_res = 0; 169 144 ls->ls_recover_dir_sent_msg = 0; 170 145 ls->ls_recover_locks_in = 0; ··· 191 132 error = dlm_recover_members_wait(ls, rv->seq); 192 133 if (error) { 193 134 log_rinfo(ls, "dlm_recover_members_wait error %d", error); 135 + dlm_release_masters_list(ls); 194 136 goto fail; 195 137 } 196 138 ··· 205 145 error = dlm_recover_directory(ls, rv->seq); 206 146 if (error) { 207 147 log_rinfo(ls, "dlm_recover_directory error %d", error); 148 + dlm_release_masters_list(ls); 208 149 goto fail; 209 150 } 210 151 ··· 214 153 error = dlm_recover_directory_wait(ls, rv->seq); 215 154 if (error) { 216 155 log_rinfo(ls, "dlm_recover_directory_wait error %d", error); 156 + dlm_release_masters_list(ls); 217 157 goto fail; 218 158 } 159 + 160 + dlm_release_masters_list(ls); 219 161 220 162 log_rinfo(ls, "dlm_recover_directory %u out %u messages", 221 163 ls->ls_recover_dir_sent_res, ls->ls_recover_dir_sent_msg);