Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux

dlm: add node slots and generation

Slot numbers are assigned to nodes when they join the lockspace.
The slot number chosen is the minimum unused value starting at 1.
Once a node is assigned a slot, that slot number will not change
while the node remains a lockspace member. If the node leaves
and rejoins it can be assigned a new slot number.

A new generation number is also added to a lockspace. It is
set and incremented during each recovery along with the slot
collection/assignment.

The slot numbers will be passed to gfs2, which will use them as
journal IDs.

Signed-off-by: David Teigland <teigland@redhat.com>

+480 -29
+46 -2
fs/dlm/dlm_internal.h
··· 117 117 struct list_head list; 118 118 int nodeid; 119 119 int weight; 120 + int slot; 121 + int slot_prev; 122 + uint32_t generation; 123 + }; 124 + 125 + /* 126 + * low nodeid saves array of these in ls_slots 127 + */ 128 + 129 + struct dlm_slot { 130 + int nodeid; 131 + int slot; 120 132 }; 121 133 122 134 /* ··· 349 337 /* dlm_header is first element of all structs sent between nodes */ 350 338 351 339 #define DLM_HEADER_MAJOR 0x00030000 352 - #define DLM_HEADER_MINOR 0x00000000 340 + #define DLM_HEADER_MINOR 0x00000001 341 + 342 + #define DLM_HEADER_SLOTS 0x00000001 353 343 354 344 #define DLM_MSG 1 355 345 #define DLM_RCOM 2 ··· 439 425 struct dlm_rcom rcom; 440 426 }; 441 427 428 + #define DLM_RSF_NEED_SLOTS 0x00000001 429 + 430 + /* RCOM_STATUS data */ 431 + struct rcom_status { 432 + __le32 rs_flags; 433 + __le32 rs_unused1; 434 + __le64 rs_unused2; 435 + }; 436 + 437 + /* RCOM_STATUS_REPLY data */ 442 438 struct rcom_config { 443 439 __le32 rf_lvblen; 444 440 __le32 rf_lsflags; 445 - __le64 rf_unused; 441 + 442 + /* DLM_HEADER_SLOTS adds: */ 443 + __le32 rf_flags; 444 + __le16 rf_our_slot; 445 + __le16 rf_num_slots; 446 + __le32 rf_generation; 447 + __le32 rf_unused1; 448 + __le64 rf_unused2; 449 + }; 450 + 451 + struct rcom_slot { 452 + __le32 ro_nodeid; 453 + __le16 ro_slot; 454 + __le16 ro_unused1; 455 + __le64 ro_unused2; 446 456 }; 447 457 448 458 struct rcom_lock { ··· 493 455 struct list_head ls_list; /* list of lockspaces */ 494 456 dlm_lockspace_t *ls_local_handle; 495 457 uint32_t ls_global_id; /* global unique lockspace ID */ 458 + uint32_t ls_generation; 496 459 uint32_t ls_exflags; 497 460 int ls_lvblen; 498 461 int ls_count; /* refcount of processes in ··· 531 492 int ls_low_nodeid; 532 493 int ls_total_weight; 533 494 int *ls_node_array; 495 + 496 + int ls_slot; 497 + int ls_num_slots; 498 + int ls_slots_size; 499 + struct dlm_slot *ls_slots; 534 500 535 501 struct dlm_rsb ls_stub_rsb; /* for returning errors */ 536 502 struct dlm_lkb 
ls_stub_lkb; /* for returning errors */
+5
fs/dlm/lockspace.c
··· 525 525 if (!ls->ls_recover_buf) 526 526 goto out_dirfree; 527 527 528 + ls->ls_slot = 0; 529 + ls->ls_num_slots = 0; 530 + ls->ls_slots_size = 0; 531 + ls->ls_slots = NULL; 532 + 528 533 INIT_LIST_HEAD(&ls->ls_recover_list); 529 534 spin_lock_init(&ls->ls_recover_list_lock); 530 535 ls->ls_recover_list_count = 0;
+283 -1
fs/dlm/member.c
··· 19 19 #include "config.h" 20 20 #include "lowcomms.h" 21 21 22 + int dlm_slots_version(struct dlm_header *h) 23 + { 24 + if ((h->h_version & 0x0000FFFF) < DLM_HEADER_SLOTS) 25 + return 0; 26 + return 1; 27 + } 28 + 29 + void dlm_slot_save(struct dlm_ls *ls, struct dlm_rcom *rc, 30 + struct dlm_member *memb) 31 + { 32 + struct rcom_config *rf = (struct rcom_config *)rc->rc_buf; 33 + 34 + if (!dlm_slots_version(&rc->rc_header)) 35 + return; 36 + 37 + memb->slot = le16_to_cpu(rf->rf_our_slot); 38 + memb->generation = le32_to_cpu(rf->rf_generation); 39 + } 40 + 41 + void dlm_slots_copy_out(struct dlm_ls *ls, struct dlm_rcom *rc) 42 + { 43 + struct dlm_slot *slot; 44 + struct rcom_slot *ro; 45 + int i; 46 + 47 + ro = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config)); 48 + 49 + /* ls_slots array is sparse, but not rcom_slots */ 50 + 51 + for (i = 0; i < ls->ls_slots_size; i++) { 52 + slot = &ls->ls_slots[i]; 53 + if (!slot->nodeid) 54 + continue; 55 + ro->ro_nodeid = cpu_to_le32(slot->nodeid); 56 + ro->ro_slot = cpu_to_le16(slot->slot); 57 + ro++; 58 + } 59 + } 60 + 61 + #define SLOT_DEBUG_LINE 128 62 + 63 + static void log_debug_slots(struct dlm_ls *ls, uint32_t gen, int num_slots, 64 + struct rcom_slot *ro0, struct dlm_slot *array, 65 + int array_size) 66 + { 67 + char line[SLOT_DEBUG_LINE]; 68 + int len = SLOT_DEBUG_LINE - 1; 69 + int pos = 0; 70 + int ret, i; 71 + 72 + if (!dlm_config.ci_log_debug) 73 + return; 74 + 75 + memset(line, 0, sizeof(line)); 76 + 77 + if (array) { 78 + for (i = 0; i < array_size; i++) { 79 + if (!array[i].nodeid) 80 + continue; 81 + 82 + ret = snprintf(line + pos, len - pos, " %d:%d", 83 + array[i].slot, array[i].nodeid); 84 + if (ret >= len - pos) 85 + break; 86 + pos += ret; 87 + } 88 + } else if (ro0) { 89 + for (i = 0; i < num_slots; i++) { 90 + ret = snprintf(line + pos, len - pos, " %d:%d", 91 + ro0[i].ro_slot, ro0[i].ro_nodeid); 92 + if (ret >= len - pos) 93 + break; 94 + pos += ret; 95 + } 96 + } 97 + 98 + 
log_debug(ls, "generation %u slots %d%s", gen, num_slots, line); 99 + } 100 + 101 + int dlm_slots_copy_in(struct dlm_ls *ls) 102 + { 103 + struct dlm_member *memb; 104 + struct dlm_rcom *rc = ls->ls_recover_buf; 105 + struct rcom_config *rf = (struct rcom_config *)rc->rc_buf; 106 + struct rcom_slot *ro0, *ro; 107 + int our_nodeid = dlm_our_nodeid(); 108 + int i, num_slots; 109 + uint32_t gen; 110 + 111 + if (!dlm_slots_version(&rc->rc_header)) 112 + return -1; 113 + 114 + gen = le32_to_cpu(rf->rf_generation); 115 + if (gen <= ls->ls_generation) { 116 + log_error(ls, "dlm_slots_copy_in gen %u old %u", 117 + gen, ls->ls_generation); 118 + } 119 + ls->ls_generation = gen; 120 + 121 + num_slots = le16_to_cpu(rf->rf_num_slots); 122 + if (!num_slots) 123 + return -1; 124 + 125 + ro0 = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config)); 126 + 127 + for (i = 0, ro = ro0; i < num_slots; i++, ro++) { 128 + ro->ro_nodeid = le32_to_cpu(ro->ro_nodeid); 129 + ro->ro_slot = le16_to_cpu(ro->ro_slot); 130 + } 131 + 132 + log_debug_slots(ls, gen, num_slots, ro0, NULL, 0); 133 + 134 + list_for_each_entry(memb, &ls->ls_nodes, list) { 135 + for (i = 0, ro = ro0; i < num_slots; i++, ro++) { 136 + if (ro->ro_nodeid != memb->nodeid) 137 + continue; 138 + memb->slot = ro->ro_slot; 139 + memb->slot_prev = memb->slot; 140 + break; 141 + } 142 + 143 + if (memb->nodeid == our_nodeid) { 144 + if (ls->ls_slot && ls->ls_slot != memb->slot) { 145 + log_error(ls, "dlm_slots_copy_in our slot " 146 + "changed %d %d", ls->ls_slot, 147 + memb->slot); 148 + return -1; 149 + } 150 + 151 + if (!ls->ls_slot) 152 + ls->ls_slot = memb->slot; 153 + } 154 + 155 + if (!memb->slot) { 156 + log_error(ls, "dlm_slots_copy_in nodeid %d no slot", 157 + memb->nodeid); 158 + return -1; 159 + } 160 + } 161 + 162 + return 0; 163 + } 164 + 165 + /* for any nodes that do not support slots, we will not have set memb->slot 166 + in wait_status_all(), so memb->slot will remain -1, and we will not 167 + assign slots 
or set ls_num_slots here */ 168 + 169 + int dlm_slots_assign(struct dlm_ls *ls, int *num_slots, int *slots_size, 170 + struct dlm_slot **slots_out, uint32_t *gen_out) 171 + { 172 + struct dlm_member *memb; 173 + struct dlm_slot *array; 174 + int our_nodeid = dlm_our_nodeid(); 175 + int array_size, max_slots, i; 176 + int need = 0; 177 + int max = 0; 178 + int num = 0; 179 + uint32_t gen = 0; 180 + 181 + /* our own memb struct will have slot -1 gen 0 */ 182 + 183 + list_for_each_entry(memb, &ls->ls_nodes, list) { 184 + if (memb->nodeid == our_nodeid) { 185 + memb->slot = ls->ls_slot; 186 + memb->generation = ls->ls_generation; 187 + break; 188 + } 189 + } 190 + 191 + list_for_each_entry(memb, &ls->ls_nodes, list) { 192 + if (memb->generation > gen) 193 + gen = memb->generation; 194 + 195 + /* node doesn't support slots */ 196 + 197 + if (memb->slot == -1) 198 + return -1; 199 + 200 + /* node needs a slot assigned */ 201 + 202 + if (!memb->slot) 203 + need++; 204 + 205 + /* node has a slot assigned */ 206 + 207 + num++; 208 + 209 + if (!max || max < memb->slot) 210 + max = memb->slot; 211 + 212 + /* sanity check, once slot is assigned it shouldn't change */ 213 + 214 + if (memb->slot_prev && memb->slot && memb->slot_prev != memb->slot) { 215 + log_error(ls, "nodeid %d slot changed %d %d", 216 + memb->nodeid, memb->slot_prev, memb->slot); 217 + return -1; 218 + } 219 + memb->slot_prev = memb->slot; 220 + } 221 + 222 + array_size = max + need; 223 + 224 + array = kzalloc(array_size * sizeof(struct dlm_slot), GFP_NOFS); 225 + if (!array) 226 + return -ENOMEM; 227 + 228 + num = 0; 229 + 230 + /* fill in slots (offsets) that are used */ 231 + 232 + list_for_each_entry(memb, &ls->ls_nodes, list) { 233 + if (!memb->slot) 234 + continue; 235 + 236 + if (memb->slot > array_size) { 237 + log_error(ls, "invalid slot number %d", memb->slot); 238 + kfree(array); 239 + return -1; 240 + } 241 + 242 + array[memb->slot - 1].nodeid = memb->nodeid; 243 + array[memb->slot - 1].slot = 
memb->slot; 244 + num++; 245 + } 246 + 247 + /* assign new slots from unused offsets */ 248 + 249 + list_for_each_entry(memb, &ls->ls_nodes, list) { 250 + if (memb->slot) 251 + continue; 252 + 253 + for (i = 0; i < array_size; i++) { 254 + if (array[i].nodeid) 255 + continue; 256 + 257 + memb->slot = i + 1; 258 + memb->slot_prev = memb->slot; 259 + array[i].nodeid = memb->nodeid; 260 + array[i].slot = memb->slot; 261 + num++; 262 + 263 + if (!ls->ls_slot && memb->nodeid == our_nodeid) 264 + ls->ls_slot = memb->slot; 265 + break; 266 + } 267 + 268 + if (!memb->slot) { 269 + log_error(ls, "no free slot found"); 270 + kfree(array); 271 + return -1; 272 + } 273 + } 274 + 275 + gen++; 276 + 277 + log_debug_slots(ls, gen, num, NULL, array, array_size); 278 + 279 + max_slots = (dlm_config.ci_buffer_size - sizeof(struct dlm_rcom) - 280 + sizeof(struct rcom_config)) / sizeof(struct rcom_slot); 281 + 282 + if (num > max_slots) { 283 + log_error(ls, "num_slots %d exceeds max_slots %d", 284 + num, max_slots); 285 + kfree(array); 286 + return -1; 287 + } 288 + 289 + *gen_out = gen; 290 + *slots_out = array; 291 + *slots_size = array_size; 292 + *num_slots = num; 293 + return 0; 294 + } 295 + 22 296 static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new) 23 297 { 24 298 struct dlm_member *memb = NULL; ··· 450 176 error = dlm_recovery_stopped(ls); 451 177 if (error) 452 178 break; 453 - error = dlm_rcom_status(ls, memb->nodeid); 179 + error = dlm_rcom_status(ls, memb->nodeid, 0); 454 180 if (error) 455 181 break; 456 182 } ··· 596 322 */ 597 323 598 324 dlm_recoverd_suspend(ls); 325 + 326 + spin_lock(&ls->ls_recover_lock); 327 + kfree(ls->ls_slots); 328 + ls->ls_slots = NULL; 329 + ls->ls_num_slots = 0; 330 + ls->ls_slots_size = 0; 599 331 ls->ls_recover_status = 0; 332 + spin_unlock(&ls->ls_recover_lock); 333 + 600 334 dlm_recoverd_resume(ls); 601 335 602 336 if (!ls->ls_recover_begin)
+7
fs/dlm/member.h
··· 20 20 int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv,int *neg_out); 21 21 int dlm_is_removed(struct dlm_ls *ls, int nodeid); 22 22 int dlm_is_member(struct dlm_ls *ls, int nodeid); 23 + int dlm_slots_version(struct dlm_header *h); 24 + void dlm_slot_save(struct dlm_ls *ls, struct dlm_rcom *rc, 25 + struct dlm_member *memb); 26 + void dlm_slots_copy_out(struct dlm_ls *ls, struct dlm_rcom *rc); 27 + int dlm_slots_copy_in(struct dlm_ls *ls); 28 + int dlm_slots_assign(struct dlm_ls *ls, int *num_slots, int *slots_size, 29 + struct dlm_slot **slots_out, uint32_t *gen_out); 23 30 24 31 #endif /* __MEMBER_DOT_H__ */ 25 32
+82 -17
fs/dlm/rcom.c
··· 23 23 #include "memory.h" 24 24 #include "lock.h" 25 25 #include "util.h" 26 + #include "member.h" 26 27 27 28 28 29 static int rcom_response(struct dlm_ls *ls) ··· 73 72 dlm_lowcomms_commit_buffer(mh); 74 73 } 75 74 75 + static void set_rcom_status(struct dlm_ls *ls, struct rcom_status *rs, 76 + uint32_t flags) 77 + { 78 + rs->rs_flags = cpu_to_le32(flags); 79 + } 80 + 76 81 /* When replying to a status request, a node also sends back its 77 82 configuration values. The requesting node then checks that the remote 78 83 node is configured the same way as itself. */ 79 84 80 - static void make_config(struct dlm_ls *ls, struct rcom_config *rf) 85 + static void set_rcom_config(struct dlm_ls *ls, struct rcom_config *rf, 86 + uint32_t num_slots) 81 87 { 82 88 rf->rf_lvblen = cpu_to_le32(ls->ls_lvblen); 83 89 rf->rf_lsflags = cpu_to_le32(ls->ls_exflags); 90 + 91 + rf->rf_our_slot = cpu_to_le16(ls->ls_slot); 92 + rf->rf_num_slots = cpu_to_le16(num_slots); 93 + rf->rf_generation = cpu_to_le32(ls->ls_generation); 84 94 } 85 95 86 - static int check_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid) 96 + static int check_rcom_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid) 87 97 { 88 98 struct rcom_config *rf = (struct rcom_config *) rc->rc_buf; 89 - size_t conf_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_config); 90 99 91 100 if ((rc->rc_header.h_version & 0xFFFF0000) != DLM_HEADER_MAJOR) { 92 101 log_error(ls, "version mismatch: %x nodeid %d: %x", 93 102 DLM_HEADER_MAJOR | DLM_HEADER_MINOR, nodeid, 94 103 rc->rc_header.h_version); 95 - return -EPROTO; 96 - } 97 - 98 - if (rc->rc_header.h_length < conf_size) { 99 - log_error(ls, "config too short: %d nodeid %d", 100 - rc->rc_header.h_length, nodeid); 101 104 return -EPROTO; 102 105 } 103 106 ··· 132 127 spin_unlock(&ls->ls_rcom_spin); 133 128 } 134 129 135 - int dlm_rcom_status(struct dlm_ls *ls, int nodeid) 130 + /* 131 + * low nodeid gathers one slot value at a time from each node. 
132 + * it sets need_slots=0, and saves rf_our_slot returned from each 133 + * rcom_config. 134 + * 135 + * other nodes gather all slot values at once from the low nodeid. 136 + * they set need_slots=1, and ignore the rf_our_slot returned from each 137 + * rcom_config. they use the rf_num_slots returned from the low 138 + * node's rcom_config. 139 + */ 140 + 141 + int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags) 136 142 { 137 143 struct dlm_rcom *rc; 138 144 struct dlm_mhandle *mh; ··· 157 141 goto out; 158 142 } 159 143 160 - error = create_rcom(ls, nodeid, DLM_RCOM_STATUS, 0, &rc, &mh); 144 + error = create_rcom(ls, nodeid, DLM_RCOM_STATUS, 145 + sizeof(struct rcom_status), &rc, &mh); 161 146 if (error) 162 147 goto out; 148 + 149 + set_rcom_status(ls, (struct rcom_status *)rc->rc_buf, status_flags); 163 150 164 151 allow_sync_reply(ls, &rc->rc_id); 165 152 memset(ls->ls_recover_buf, 0, dlm_config.ci_buffer_size); ··· 180 161 /* we pretend the remote lockspace exists with 0 status */ 181 162 log_debug(ls, "remote node %d not ready", nodeid); 182 163 rc->rc_result = 0; 183 - } else 184 - error = check_config(ls, rc, nodeid); 164 + error = 0; 165 + } else { 166 + error = check_rcom_config(ls, rc, nodeid); 167 + } 168 + 185 169 /* the caller looks at rc_result for the remote recovery status */ 186 170 out: 187 171 return error; ··· 194 172 { 195 173 struct dlm_rcom *rc; 196 174 struct dlm_mhandle *mh; 197 - int error, nodeid = rc_in->rc_header.h_nodeid; 175 + struct rcom_status *rs; 176 + uint32_t status; 177 + int nodeid = rc_in->rc_header.h_nodeid; 178 + int len = sizeof(struct rcom_config); 179 + int num_slots = 0; 180 + int error; 198 181 182 + if (!dlm_slots_version(&rc_in->rc_header)) { 183 + status = dlm_recover_status(ls); 184 + goto do_create; 185 + } 186 + 187 + rs = (struct rcom_status *)rc_in->rc_buf; 188 + 189 + if (!(rs->rs_flags & DLM_RSF_NEED_SLOTS)) { 190 + status = dlm_recover_status(ls); 191 + goto do_create; 192 + } 193 + 
194 + spin_lock(&ls->ls_recover_lock); 195 + status = ls->ls_recover_status; 196 + num_slots = ls->ls_num_slots; 197 + spin_unlock(&ls->ls_recover_lock); 198 + len += num_slots * sizeof(struct rcom_slot); 199 + 200 + do_create: 199 201 error = create_rcom(ls, nodeid, DLM_RCOM_STATUS_REPLY, 200 - sizeof(struct rcom_config), &rc, &mh); 202 + len, &rc, &mh); 201 203 if (error) 202 204 return; 205 + 203 206 rc->rc_id = rc_in->rc_id; 204 207 rc->rc_seq_reply = rc_in->rc_seq; 205 - rc->rc_result = dlm_recover_status(ls); 206 - make_config(ls, (struct rcom_config *) rc->rc_buf); 208 + rc->rc_result = status; 207 209 210 + set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, num_slots); 211 + 212 + if (!num_slots) 213 + goto do_send; 214 + 215 + spin_lock(&ls->ls_recover_lock); 216 + if (ls->ls_num_slots != num_slots) { 217 + spin_unlock(&ls->ls_recover_lock); 218 + log_debug(ls, "receive_rcom_status num_slots %d to %d", 219 + num_slots, ls->ls_num_slots); 220 + rc->rc_result = 0; 221 + set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, 0); 222 + goto do_send; 223 + } 224 + 225 + dlm_slots_copy_out(ls, rc); 226 + spin_unlock(&ls->ls_recover_lock); 227 + 228 + do_send: 208 229 send_rcom(ls, mh, rc); 209 230 } 210 231
+1 -1
fs/dlm/rcom.h
··· 14 14 #ifndef __RCOM_DOT_H__ 15 15 #define __RCOM_DOT_H__ 16 16 17 - int dlm_rcom_status(struct dlm_ls *ls, int nodeid); 17 + int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags); 18 18 int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name,int last_len); 19 19 int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid); 20 20 int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
+56 -8
fs/dlm/recover.c
··· 85 85 return status; 86 86 } 87 87 88 + static void _set_recover_status(struct dlm_ls *ls, uint32_t status) 89 + { 90 + ls->ls_recover_status |= status; 91 + } 92 + 88 93 void dlm_set_recover_status(struct dlm_ls *ls, uint32_t status) 89 94 { 90 95 spin_lock(&ls->ls_recover_lock); 91 - ls->ls_recover_status |= status; 96 + _set_recover_status(ls, status); 92 97 spin_unlock(&ls->ls_recover_lock); 93 98 } 94 99 95 - static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status) 100 + static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status, 101 + int save_slots) 96 102 { 97 103 struct dlm_rcom *rc = ls->ls_recover_buf; 98 104 struct dlm_member *memb; ··· 112 106 goto out; 113 107 } 114 108 115 - error = dlm_rcom_status(ls, memb->nodeid); 109 + error = dlm_rcom_status(ls, memb->nodeid, 0); 116 110 if (error) 117 111 goto out; 112 + 113 + if (save_slots) 114 + dlm_slot_save(ls, rc, memb); 118 115 119 116 if (rc->rc_result & wait_status) 120 117 break; ··· 130 121 return error; 131 122 } 132 123 133 - static int wait_status_low(struct dlm_ls *ls, uint32_t wait_status) 124 + static int wait_status_low(struct dlm_ls *ls, uint32_t wait_status, 125 + uint32_t status_flags) 134 126 { 135 127 struct dlm_rcom *rc = ls->ls_recover_buf; 136 128 int error = 0, delay = 0, nodeid = ls->ls_low_nodeid; ··· 142 132 goto out; 143 133 } 144 134 145 - error = dlm_rcom_status(ls, nodeid); 135 + error = dlm_rcom_status(ls, nodeid, status_flags); 146 136 if (error) 147 137 break; 148 138 ··· 162 152 int error; 163 153 164 154 if (ls->ls_low_nodeid == dlm_our_nodeid()) { 165 - error = wait_status_all(ls, status); 155 + error = wait_status_all(ls, status, 0); 166 156 if (!error) 167 157 dlm_set_recover_status(ls, status_all); 168 158 } else 169 - error = wait_status_low(ls, status_all); 159 + error = wait_status_low(ls, status_all, 0); 170 160 171 161 return error; 172 162 } 173 163 174 164 int dlm_recover_members_wait(struct dlm_ls *ls) 175 165 { 176 - return 
wait_status(ls, DLM_RS_NODES); 166 + struct dlm_member *memb; 167 + struct dlm_slot *slots; 168 + int num_slots, slots_size; 169 + int error, rv; 170 + uint32_t gen; 171 + 172 + list_for_each_entry(memb, &ls->ls_nodes, list) { 173 + memb->slot = -1; 174 + memb->generation = 0; 175 + } 176 + 177 + if (ls->ls_low_nodeid == dlm_our_nodeid()) { 178 + error = wait_status_all(ls, DLM_RS_NODES, 1); 179 + if (error) 180 + goto out; 181 + 182 + /* slots array is sparse, slots_size may be > num_slots */ 183 + 184 + rv = dlm_slots_assign(ls, &num_slots, &slots_size, &slots, &gen); 185 + if (!rv) { 186 + spin_lock(&ls->ls_recover_lock); 187 + _set_recover_status(ls, DLM_RS_NODES_ALL); 188 + ls->ls_num_slots = num_slots; 189 + ls->ls_slots_size = slots_size; 190 + ls->ls_slots = slots; 191 + ls->ls_generation = gen; 192 + spin_unlock(&ls->ls_recover_lock); 193 + } else { 194 + dlm_set_recover_status(ls, DLM_RS_NODES_ALL); 195 + } 196 + } else { 197 + error = wait_status_low(ls, DLM_RS_NODES_ALL, DLM_RSF_NEED_SLOTS); 198 + if (error) 199 + goto out; 200 + 201 + dlm_slots_copy_in(ls); 202 + } 203 + out: 204 + return error; 177 205 } 178 206 179 207 int dlm_recover_directory_wait(struct dlm_ls *ls)