Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux

GFS2: dlm based recovery coordination

This new method of managing recovery is an alternative to
the previous approach of using the userland gfs_controld.

- use dlm slot numbers to assign journal id's
- use dlm recovery callbacks to initiate journal recovery
- use a dlm lock to determine the first node to mount fs
- use a dlm lock to track journals that need recovery

Signed-off-by: David Teigland <teigland@redhat.com>
Signed-off-by: Steven Whitehouse <swhiteho@redhat.com>

authored by

David Teigland and committed by
Steven Whitehouse
e0c2a9aa e343a895

+1098 -42
+1 -1
fs/gfs2/glock.c
··· 1353 1353 spin_lock(&gl->gl_spin); 1354 1354 gl->gl_reply = ret; 1355 1355 1356 - if (unlikely(test_bit(DFL_BLOCK_LOCKS, &ls->ls_flags))) { 1356 + if (unlikely(test_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags))) { 1357 1357 if (gfs2_should_freeze(gl)) { 1358 1358 set_bit(GLF_FROZEN, &gl->gl_flags); 1359 1359 spin_unlock(&gl->gl_spin);
+5 -2
fs/gfs2/glock.h
··· 121 121 122 122 struct lm_lockops { 123 123 const char *lm_proto_name; 124 - int (*lm_mount) (struct gfs2_sbd *sdp, const char *fsname); 125 - void (*lm_unmount) (struct gfs2_sbd *sdp); 124 + int (*lm_mount) (struct gfs2_sbd *sdp, const char *table); 125 + void (*lm_first_done) (struct gfs2_sbd *sdp); 126 + void (*lm_recovery_result) (struct gfs2_sbd *sdp, unsigned int jid, 127 + unsigned int result); 128 + void (*lm_unmount) (struct gfs2_sbd *sdp); 126 129 void (*lm_withdraw) (struct gfs2_sbd *sdp); 127 130 void (*lm_put_lock) (struct gfs2_glock *gl); 128 131 int (*lm_lock) (struct gfs2_glock *gl, unsigned int req_state,
+54 -4
fs/gfs2/incore.h
··· 139 139 #define GDLM_STRNAME_BYTES 25 140 140 #define GDLM_LVB_SIZE 32 141 141 142 + /* 143 + * ls_recover_flags: 144 + * 145 + * DFL_BLOCK_LOCKS: dlm is in recovery and will grant locks that had been 146 + * held by failed nodes whose journals need recovery. Those locks should 147 + * only be used for journal recovery until the journal recovery is done. 148 + * This is set by the dlm recover_prep callback and cleared by the 149 + * gfs2_control thread when journal recovery is complete. To avoid 150 + * races between recover_prep setting and gfs2_control clearing, recover_spin 151 + * is held while changing this bit and reading/writing recover_block 152 + * and recover_start. 153 + * 154 + * DFL_NO_DLM_OPS: dlm lockspace ops/callbacks are not being used. 155 + * 156 + * DFL_FIRST_MOUNT: this node is the first to mount this fs and is doing 157 + * recovery of all journals before allowing other nodes to mount the fs. 158 + * This is cleared when FIRST_MOUNT_DONE is set. 159 + * 160 + * DFL_FIRST_MOUNT_DONE: this node was the first mounter, and has finished 161 + * recovery of all journals, and now allows other nodes to mount the fs. 162 + * 163 + * DFL_MOUNT_DONE: gdlm_mount has completed successfully and cleared 164 + * BLOCK_LOCKS for the first time. The gfs2_control thread should now 165 + * control clearing BLOCK_LOCKS for further recoveries. 166 + * 167 + * DFL_UNMOUNT: gdlm_unmount sets to keep sdp off gfs2_control_wq. 168 + * 169 + * DFL_DLM_RECOVERY: set while dlm is in recovery, between recover_prep() 170 + * and recover_done(), i.e. set while recover_block == recover_start. 171 + */ 172 + 142 173 enum { 143 174 DFL_BLOCK_LOCKS = 0, 175 + DFL_NO_DLM_OPS = 1, 176 + DFL_FIRST_MOUNT = 2, 177 + DFL_FIRST_MOUNT_DONE = 3, 178 + DFL_MOUNT_DONE = 4, 179 + DFL_UNMOUNT = 5, 180 + DFL_DLM_RECOVERY = 6, 144 181 }; 145 182 146 183 struct lm_lockname { ··· 536 499 struct lm_lockstruct { 537 500 int ls_jid; 538 501 unsigned int ls_first; 539 - unsigned int ls_first_done; 540 502 unsigned int ls_nodir; 541 503 const struct lm_lockops *ls_ops; 542 - unsigned long ls_flags; 543 504 dlm_lockspace_t *ls_dlm; 544 505 545 - int ls_recover_jid_done; 546 - int ls_recover_jid_status; 506 + int ls_recover_jid_done; /* These two are deprecated, */ 507 + int ls_recover_jid_status; /* used previously by gfs_controld */ 508 + 509 + struct dlm_lksb ls_mounted_lksb; /* mounted_lock */ 510 + struct dlm_lksb ls_control_lksb; /* control_lock */ 511 + char ls_control_lvb[GDLM_LVB_SIZE]; /* control_lock lvb */ 512 + struct completion ls_sync_wait; /* {control,mounted}_{lock,unlock} */ 513 + 514 + spinlock_t ls_recover_spin; /* protects following fields */ 515 + unsigned long ls_recover_flags; /* DFL_ */ 516 + uint32_t ls_recover_mount; /* gen in first recover_done cb */ 517 + uint32_t ls_recover_start; /* gen in last recover_done cb */ 518 + uint32_t ls_recover_block; /* copy recover_start in last recover_prep */ 519 + uint32_t ls_recover_size; /* size of recover_submit, recover_result */ 520 + uint32_t *ls_recover_submit; /* gen in last recover_slot cb per jid */ 521 + uint32_t *ls_recover_result; /* result of last jid recovery */ 547 522 }; 548 523 549 524 struct gfs2_sbd { ··· 593 544 wait_queue_head_t sd_glock_wait; 594 545 atomic_t sd_glock_disposal; 595 546 struct completion sd_locking_init; 547 + struct delayed_work sd_control_work; 596 548 597 549 /* Inode Stuff */ 598 550
+983 -10
fs/gfs2/lock_dlm.c
··· 1 1 /* 2 2 * Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. 3 - * Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved. 3 + * Copyright 2004-2011 Red Hat, Inc. 4 4 * 5 5 * This copyrighted material is made available to anyone wishing to use, 6 6 * modify, copy, or redistribute it subject to the terms and conditions ··· 11 11 #include <linux/dlm.h> 12 12 #include <linux/slab.h> 13 13 #include <linux/types.h> 14 + #include <linux/delay.h> 14 15 #include <linux/gfs2_ondisk.h> 15 16 16 17 #include "incore.h" 17 18 #include "glock.h" 18 19 #include "util.h" 20 + #include "sys.h" 19 21 22 + extern struct workqueue_struct *gfs2_control_wq; 20 23 21 24 static void gdlm_ast(void *arg) 22 25 { ··· 188 185 dlm_unlock(ls->ls_dlm, gl->gl_lksb.sb_lkid, DLM_LKF_CANCEL, NULL, gl); 189 186 } 190 187 191 - static int gdlm_mount(struct gfs2_sbd *sdp, const char *fsname) 188 + /* 189 + * dlm/gfs2 recovery coordination using dlm_recover callbacks 190 + * 191 + * 1. dlm_controld sees lockspace members change 192 + * 2. dlm_controld blocks dlm-kernel locking activity 193 + * 3. dlm_controld within dlm-kernel notifies gfs2 (recover_prep) 194 + * 4. dlm_controld starts and finishes its own user level recovery 195 + * 5. dlm_controld starts dlm-kernel dlm_recoverd to do kernel recovery 196 + * 6. dlm_recoverd notifies gfs2 of failed nodes (recover_slot) 197 + * 7. dlm_recoverd does its own lock recovery 198 + * 8. dlm_recoverd unblocks dlm-kernel locking activity 199 + * 9. dlm_recoverd notifies gfs2 when done (recover_done with new generation) 200 + * 10. gfs2_control updates control_lock lvb with new generation and jid bits 201 + * 11. gfs2_control enqueues journals for gfs2_recover to recover (maybe none) 202 + * 12. gfs2_recover dequeues and recovers journals of failed nodes 203 + * 13. gfs2_recover provides recovery results to gfs2_control (recovery_result) 204 + * 14. gfs2_control updates control_lock lvb jid bits for recovered journals 205 + * 15. gfs2_control unblocks normal locking when all journals are recovered 206 + * 207 + * - failures during recovery 208 + * 209 + * recover_prep() may set BLOCK_LOCKS (step 3) again before gfs2_control 210 + * clears BLOCK_LOCKS (step 15), e.g. another node fails while still 211 + * recovering for a prior failure. gfs2_control needs a way to detect 212 + * this so it can leave BLOCK_LOCKS set in step 15. This is managed using 213 + * the recover_block and recover_start values. 214 + * 215 + * recover_done() provides a new lockspace generation number each time it 216 + * is called (step 9). This generation number is saved as recover_start. 217 + * When recover_prep() is called, it sets BLOCK_LOCKS and sets 218 + * recover_block = recover_start. So, while recover_block is equal to 219 + * recover_start, BLOCK_LOCKS should remain set. (recover_spin must 220 + * be held around the BLOCK_LOCKS/recover_block/recover_start logic.) 221 + * 222 + * - more specific gfs2 steps in sequence above 223 + * 224 + * 3. recover_prep sets BLOCK_LOCKS and sets recover_block = recover_start 225 + * 6. recover_slot records any failed jids (maybe none) 226 + * 9. recover_done sets recover_start = new generation number 227 + * 10. gfs2_control sets control_lock lvb = new gen + bits for failed jids 228 + * 12. gfs2_recover does journal recoveries for failed jids identified above 229 + * 14. gfs2_control clears control_lock lvb bits for recovered jids 230 + * 15. gfs2_control checks if recover_block == recover_start (step 3 occured 231 + * again) then do nothing, otherwise if recover_start > recover_block 232 + * then clear BLOCK_LOCKS. 233 + * 234 + * - parallel recovery steps across all nodes 235 + * 236 + * All nodes attempt to update the control_lock lvb with the new generation 237 + * number and jid bits, but only the first to get the control_lock EX will 238 + * do so; others will see that it's already done (lvb already contains new 239 + * generation number.) 240 + * 241 + * . All nodes get the same recover_prep/recover_slot/recover_done callbacks 242 + * . All nodes attempt to set control_lock lvb gen + bits for the new gen 243 + * . One node gets control_lock first and writes the lvb, others see it's done 244 + * . All nodes attempt to recover jids for which they see control_lock bits set 245 + * . One node succeeds for a jid, and that one clears the jid bit in the lvb 246 + * . All nodes will eventually see all lvb bits clear and unblock locks 247 + * 248 + * - is there a problem with clearing an lvb bit that should be set 249 + * and missing a journal recovery? 250 + * 251 + * 1. jid fails 252 + * 2. lvb bit set for step 1 253 + * 3. jid recovered for step 1 254 + * 4. jid taken again (new mount) 255 + * 5. jid fails (for step 4) 256 + * 6. lvb bit set for step 5 (will already be set) 257 + * 7. lvb bit cleared for step 3 258 + * 259 + * This is not a problem because the failure in step 5 does not 260 + * require recovery, because the mount in step 4 could not have 261 + * progressed far enough to unblock locks and access the fs. The 262 + * control_mount() function waits for all recoveries to be complete 263 + * for the latest lockspace generation before ever unblocking locks 264 + * and returning. The mount in step 4 waits until the recovery in 265 + * step 1 is done. 266 + * 267 + * - special case of first mounter: first node to mount the fs 268 + * 269 + * The first node to mount a gfs2 fs needs to check all the journals 270 + * and recover any that need recovery before other nodes are allowed 271 + * to mount the fs. (Others may begin mounting, but they must wait 272 + * for the first mounter to be done before taking locks on the fs 273 + * or accessing the fs.) This has two parts: 274 + * 275 + * 1. The mounted_lock tells a node it's the first to mount the fs. 276 + * Each node holds the mounted_lock in PR while it's mounted. 277 + * Each node tries to acquire the mounted_lock in EX when it mounts. 278 + * If a node is granted the mounted_lock EX it means there are no 279 + * other mounted nodes (no PR locks exist), and it is the first mounter. 280 + * The mounted_lock is demoted to PR when first recovery is done, so 281 + * others will fail to get an EX lock, but will get a PR lock. 282 + * 283 + * 2. The control_lock blocks others in control_mount() while the first 284 + * mounter is doing first mount recovery of all journals. 285 + * A mounting node needs to acquire control_lock in EX mode before 286 + * it can proceed. The first mounter holds control_lock in EX while doing 287 + * the first mount recovery, blocking mounts from other nodes, then demotes 288 + * control_lock to NL when it's done (others_may_mount/first_done), 289 + * allowing other nodes to continue mounting. 290 + * 291 + * first mounter: 292 + * control_lock EX/NOQUEUE success 293 + * mounted_lock EX/NOQUEUE success (no other PR, so no other mounters) 294 + * set first=1 295 + * do first mounter recovery 296 + * mounted_lock EX->PR 297 + * control_lock EX->NL, write lvb generation 298 + * 299 + * other mounter: 300 + * control_lock EX/NOQUEUE success (if fail -EAGAIN, retry) 301 + * mounted_lock EX/NOQUEUE fail -EAGAIN (expected due to other mounters PR) 302 + * mounted_lock PR/NOQUEUE success 303 + * read lvb generation 304 + * control_lock EX->NL 305 + * set first=0 306 + * 307 + * - mount during recovery 308 + * 309 + * If a node mounts while others are doing recovery (not first mounter), 310 + * the mounting node will get its initial recover_done() callback without 311 + * having seen any previous failures/callbacks. 312 + * 313 + * It must wait for all recoveries preceding its mount to be finished 314 + * before it unblocks locks. It does this by repeating the "other mounter" 315 + * steps above until the lvb generation number is >= its mount generation 316 + * number (from initial recover_done) and all lvb bits are clear. 317 + * 318 + * - control_lock lvb format 319 + * 320 + * 4 bytes generation number: the latest dlm lockspace generation number 321 + * from recover_done callback. Indicates the jid bitmap has been updated 322 + * to reflect all slot failures through that generation. 323 + * 4 bytes unused. 324 + * GDLM_LVB_SIZE-8 bytes of jid bit map. If bit N is set, it indicates 325 + * that jid N needs recovery. 326 + */ 327 + 328 + #define JID_BITMAP_OFFSET 8 /* 4 byte generation number + 4 byte unused */ 329 + 330 + static void control_lvb_read(struct lm_lockstruct *ls, uint32_t *lvb_gen, 331 + char *lvb_bits) 332 + { 333 + uint32_t gen; 334 + memcpy(lvb_bits, ls->ls_control_lvb, GDLM_LVB_SIZE); 335 + memcpy(&gen, lvb_bits, sizeof(uint32_t)); 336 + *lvb_gen = le32_to_cpu(gen); 337 + } 338 + 339 + static void control_lvb_write(struct lm_lockstruct *ls, uint32_t lvb_gen, 340 + char *lvb_bits) 341 + { 342 + uint32_t gen; 343 + memcpy(ls->ls_control_lvb, lvb_bits, GDLM_LVB_SIZE); 344 + gen = cpu_to_le32(lvb_gen); 345 + memcpy(ls->ls_control_lvb, &gen, sizeof(uint32_t)); 346 + } 347 + 348 + static int all_jid_bits_clear(char *lvb) 349 + { 350 + int i; 351 + for (i = JID_BITMAP_OFFSET; i < GDLM_LVB_SIZE; i++) { 352 + if (lvb[i]) 353 + return 0; 354 + } 355 + return 1; 356 + } 357 + 358 + static void sync_wait_cb(void *arg) 359 + { 360 + struct lm_lockstruct *ls = arg; 361 + complete(&ls->ls_sync_wait); 362 + } 363 + 364 + static int sync_unlock(struct gfs2_sbd *sdp, struct dlm_lksb *lksb, char *name) 192 365 { 193 366 struct lm_lockstruct *ls = &sdp->sd_lockstruct; 194 367 int error; 195 368 196 - if (fsname == NULL) { 197 - fs_info(sdp, "no fsname found\n"); 198 - return -EINVAL; 369 + error = dlm_unlock(ls->ls_dlm, lksb->sb_lkid, 0, lksb, ls); 370 + if (error) { 371 + fs_err(sdp, "%s lkid %x error %d\n", 372 + name, lksb->sb_lkid, error); 373 + return error; 199 374 } 200 375 201 - error = dlm_new_lockspace(fsname, NULL, 202 - DLM_LSFL_FS | DLM_LSFL_NEWEXCL | 203 - (ls->ls_nodir ? DLM_LSFL_NODIR : 0), 204 - GDLM_LVB_SIZE, NULL, NULL, NULL, &ls->ls_dlm); 376 + wait_for_completion(&ls->ls_sync_wait); 377 + 378 + if (lksb->sb_status != -DLM_EUNLOCK) { 379 + fs_err(sdp, "%s lkid %x status %d\n", 380 + name, lksb->sb_lkid, lksb->sb_status); 381 + return -1; 382 + } 383 + return 0; 384 + } 385 + 386 + static int sync_lock(struct gfs2_sbd *sdp, int mode, uint32_t flags, 387 + unsigned int num, struct dlm_lksb *lksb, char *name) 388 + { 389 + struct lm_lockstruct *ls = &sdp->sd_lockstruct; 390 + char strname[GDLM_STRNAME_BYTES]; 391 + int error, status; 392 + 393 + memset(strname, 0, GDLM_STRNAME_BYTES); 394 + snprintf(strname, GDLM_STRNAME_BYTES, "%8x%16x", LM_TYPE_NONDISK, num); 395 + 396 + error = dlm_lock(ls->ls_dlm, mode, lksb, flags, 397 + strname, GDLM_STRNAME_BYTES - 1, 398 + 0, sync_wait_cb, ls, NULL); 399 + if (error) { 400 + fs_err(sdp, "%s lkid %x flags %x mode %d error %d\n", 401 + name, lksb->sb_lkid, flags, mode, error); 402 + return error; 403 + } 404 + 405 + wait_for_completion(&ls->ls_sync_wait); 406 + 407 + status = lksb->sb_status; 408 + 409 + if (status && status != -EAGAIN) { 410 + fs_err(sdp, "%s lkid %x flags %x mode %d status %d\n", 411 + name, lksb->sb_lkid, flags, mode, status); 412 + } 413 + 414 + return status; 415 + } 416 + 417 + static int mounted_unlock(struct gfs2_sbd *sdp) 418 + { 419 + struct lm_lockstruct *ls = &sdp->sd_lockstruct; 420 + return sync_unlock(sdp, &ls->ls_mounted_lksb, "mounted_lock"); 421 + } 422 + 423 + static int mounted_lock(struct gfs2_sbd *sdp, int mode, uint32_t flags) 424 + { 425 + struct lm_lockstruct *ls = &sdp->sd_lockstruct; 426 + return sync_lock(sdp, mode, flags, GFS2_MOUNTED_LOCK, 427 + &ls->ls_mounted_lksb, "mounted_lock"); 428 + } 429 + 430 + static int control_unlock(struct gfs2_sbd *sdp) 431 + { 432 + struct lm_lockstruct *ls = &sdp->sd_lockstruct; 433 + return sync_unlock(sdp, &ls->ls_control_lksb, "control_lock"); 434 + } 435 + 436 + static int control_lock(struct gfs2_sbd *sdp, int mode, uint32_t flags) 437 + { 438 + struct lm_lockstruct *ls = &sdp->sd_lockstruct; 439 + return sync_lock(sdp, mode, flags, GFS2_CONTROL_LOCK, 440 + &ls->ls_control_lksb, "control_lock"); 441 + } 442 + 443 + static void gfs2_control_func(struct work_struct *work) 444 + { 445 + struct gfs2_sbd *sdp = container_of(work, struct gfs2_sbd, sd_control_work.work); 446 + struct lm_lockstruct *ls = &sdp->sd_lockstruct; 447 + char lvb_bits[GDLM_LVB_SIZE]; 448 + uint32_t block_gen, start_gen, lvb_gen, flags; 449 + int recover_set = 0; 450 + int write_lvb = 0; 451 + int recover_size; 452 + int i, error; 453 + 454 + spin_lock(&ls->ls_recover_spin); 455 + /* 456 + * No MOUNT_DONE means we're still mounting; control_mount() 457 + * will set this flag, after which this thread will take over 458 + * all further clearing of BLOCK_LOCKS. 459 + * 460 + * FIRST_MOUNT means this node is doing first mounter recovery, 461 + * for which recovery control is handled by 462 + * control_mount()/control_first_done(), not this thread. 463 + */ 464 + if (!test_bit(DFL_MOUNT_DONE, &ls->ls_recover_flags) || 465 + test_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags)) { 466 + spin_unlock(&ls->ls_recover_spin); 467 + return; 468 + } 469 + block_gen = ls->ls_recover_block; 470 + start_gen = ls->ls_recover_start; 471 + spin_unlock(&ls->ls_recover_spin); 472 + 473 + /* 474 + * Equal block_gen and start_gen implies we are between 475 + * recover_prep and recover_done callbacks, which means 476 + * dlm recovery is in progress and dlm locking is blocked. 477 + * There's no point trying to do any work until recover_done. 478 + */ 479 + 480 + if (block_gen == start_gen) 481 + return; 482 + 483 + /* 484 + * Propagate recover_submit[] and recover_result[] to lvb: 485 + * dlm_recoverd adds to recover_submit[] jids needing recovery 486 + * gfs2_recover adds to recover_result[] journal recovery results 487 + * 488 + * set lvb bit for jids in recover_submit[] if the lvb has not 489 + * yet been updated for the generation of the failure 490 + * 491 + * clear lvb bit for jids in recover_result[] if the result of 492 + * the journal recovery is SUCCESS 493 + */ 494 + 495 + error = control_lock(sdp, DLM_LOCK_EX, DLM_LKF_CONVERT|DLM_LKF_VALBLK); 496 + if (error) { 497 + fs_err(sdp, "control lock EX error %d\n", error); 498 + return; 499 + } 500 + 501 + control_lvb_read(ls, &lvb_gen, lvb_bits); 502 + 503 + spin_lock(&ls->ls_recover_spin); 504 + if (block_gen != ls->ls_recover_block || 505 + start_gen != ls->ls_recover_start) { 506 + fs_info(sdp, "recover generation %u block1 %u %u\n", 507 + start_gen, block_gen, ls->ls_recover_block); 508 + spin_unlock(&ls->ls_recover_spin); 509 + control_lock(sdp, DLM_LOCK_NL, DLM_LKF_CONVERT); 510 + return; 511 + } 512 + 513 + recover_size = ls->ls_recover_size; 514 + 515 + if (lvb_gen <= start_gen) { 516 + /* 517 + * Clear lvb bits for jids we've successfully recovered. 518 + * Because all nodes attempt to recover failed journals, 519 + * a journal can be recovered multiple times successfully 520 + * in succession. Only the first will really do recovery, 521 + * the others find it clean, but still report a successful 522 + * recovery. So, another node may have already recovered 523 + * the jid and cleared the lvb bit for it. 524 + */ 525 + for (i = 0; i < recover_size; i++) { 526 + if (ls->ls_recover_result[i] != LM_RD_SUCCESS) 527 + continue; 528 + 529 + ls->ls_recover_result[i] = 0; 530 + 531 + if (!test_bit_le(i, lvb_bits + JID_BITMAP_OFFSET)) 532 + continue; 533 + 534 + __clear_bit_le(i, lvb_bits + JID_BITMAP_OFFSET); 535 + write_lvb = 1; 536 + } 537 + } 538 + 539 + if (lvb_gen == start_gen) { 540 + /* 541 + * Failed slots before start_gen are already set in lvb. 542 + */ 543 + for (i = 0; i < recover_size; i++) { 544 + if (!ls->ls_recover_submit[i]) 545 + continue; 546 + if (ls->ls_recover_submit[i] < lvb_gen) 547 + ls->ls_recover_submit[i] = 0; 548 + } 549 + } else if (lvb_gen < start_gen) { 550 + /* 551 + * Failed slots before start_gen are not yet set in lvb. 552 + */ 553 + for (i = 0; i < recover_size; i++) { 554 + if (!ls->ls_recover_submit[i]) 555 + continue; 556 + if (ls->ls_recover_submit[i] < start_gen) { 557 + ls->ls_recover_submit[i] = 0; 558 + __set_bit_le(i, lvb_bits + JID_BITMAP_OFFSET); 559 + } 560 + } 561 + /* even if there are no bits to set, we need to write the 562 + latest generation to the lvb */ 563 + write_lvb = 1; 564 + } else { 565 + /* 566 + * we should be getting a recover_done() for lvb_gen soon 567 + */ 568 + } 569 + spin_unlock(&ls->ls_recover_spin); 570 + 571 + if (write_lvb) { 572 + control_lvb_write(ls, start_gen, lvb_bits); 573 + flags = DLM_LKF_CONVERT | DLM_LKF_VALBLK; 574 + } else { 575 + flags = DLM_LKF_CONVERT; 576 + } 577 + 578 + error = control_lock(sdp, DLM_LOCK_NL, flags); 579 + if (error) { 580 + fs_err(sdp, "control lock NL error %d\n", error); 581 + return; 582 + } 583 + 584 + /* 585 + * Everyone will see jid bits set in the lvb, run gfs2_recover_set(), 586 + * and clear a jid bit in the lvb if the recovery is a success. 587 + * Eventually all journals will be recovered, all jid bits will 588 + * be cleared in the lvb, and everyone will clear BLOCK_LOCKS. 589 + */ 590 + 591 + for (i = 0; i < recover_size; i++) { 592 + if (test_bit_le(i, lvb_bits + JID_BITMAP_OFFSET)) { 593 + fs_info(sdp, "recover generation %u jid %d\n", 594 + start_gen, i); 595 + gfs2_recover_set(sdp, i); 596 + recover_set++; 597 + } 598 + } 599 + if (recover_set) 600 + return; 601 + 602 + /* 603 + * No more jid bits set in lvb, all recovery is done, unblock locks 604 + * (unless a new recover_prep callback has occured blocking locks 605 + * again while working above) 606 + */ 607 + 608 + spin_lock(&ls->ls_recover_spin); 609 + if (ls->ls_recover_block == block_gen && 610 + ls->ls_recover_start == start_gen) { 611 + clear_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags); 612 + spin_unlock(&ls->ls_recover_spin); 613 + fs_info(sdp, "recover generation %u done\n", start_gen); 614 + gfs2_glock_thaw(sdp); 615 + } else { 616 + fs_info(sdp, "recover generation %u block2 %u %u\n", 617 + start_gen, block_gen, ls->ls_recover_block); 618 + spin_unlock(&ls->ls_recover_spin); 619 + } 620 + } 621 + 622 + static int control_mount(struct gfs2_sbd *sdp) 623 + { 624 + struct lm_lockstruct *ls = &sdp->sd_lockstruct; 625 + char lvb_bits[GDLM_LVB_SIZE]; 626 + uint32_t start_gen, block_gen, mount_gen, lvb_gen; 627 + int mounted_mode; 628 + int retries = 0; 629 + int error; 630 + 631 + memset(&ls->ls_mounted_lksb, 0, sizeof(struct dlm_lksb)); 632 + memset(&ls->ls_control_lksb, 0, sizeof(struct dlm_lksb)); 633 + memset(&ls->ls_control_lvb, 0, GDLM_LVB_SIZE); 634 + ls->ls_control_lksb.sb_lvbptr = ls->ls_control_lvb; 635 + init_completion(&ls->ls_sync_wait); 636 + 637 + set_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags); 638 + 639 + error = control_lock(sdp, DLM_LOCK_NL, DLM_LKF_VALBLK); 640 + if (error) { 641 + fs_err(sdp, "control_mount control_lock NL error %d\n", error); 642 + return error; 643 + } 644 + 645 + error = mounted_lock(sdp, DLM_LOCK_NL, 0); 646 + if (error) { 647 + fs_err(sdp, "control_mount mounted_lock NL error %d\n", error); 648 + control_unlock(sdp); 649 + return error; 650 + } 651 + mounted_mode = DLM_LOCK_NL; 652 + 653 + restart: 654 + if (retries++ && signal_pending(current)) { 655 + error = -EINTR; 656 + goto fail; 657 + } 658 + 659 + /* 660 + * We always start with both locks in NL. control_lock is 661 + * demoted to NL below so we don't need to do it here. 662 + */ 663 + 664 + if (mounted_mode != DLM_LOCK_NL) { 665 + error = mounted_lock(sdp, DLM_LOCK_NL, DLM_LKF_CONVERT); 666 + if (error) 667 + goto fail; 668 + mounted_mode = DLM_LOCK_NL; 669 + } 670 + 671 + /* 672 + * Other nodes need to do some work in dlm recovery and gfs2_control 673 + * before the recover_done and control_lock will be ready for us below. 674 + * A delay here is not required but often avoids having to retry. 675 + */ 676 + 677 + msleep_interruptible(500); 678 + 679 + /* 680 + * Acquire control_lock in EX and mounted_lock in either EX or PR. 681 + * control_lock lvb keeps track of any pending journal recoveries. 682 + * mounted_lock indicates if any other nodes have the fs mounted. 683 + */ 684 + 685 + error = control_lock(sdp, DLM_LOCK_EX, DLM_LKF_CONVERT|DLM_LKF_NOQUEUE|DLM_LKF_VALBLK); 686 + if (error == -EAGAIN) { 687 + goto restart; 688 + } else if (error) { 689 + fs_err(sdp, "control_mount control_lock EX error %d\n", error); 690 + goto fail; 691 + } 692 + 693 + error = mounted_lock(sdp, DLM_LOCK_EX, DLM_LKF_CONVERT|DLM_LKF_NOQUEUE); 694 + if (!error) { 695 + mounted_mode = DLM_LOCK_EX; 696 + goto locks_done; 697 + } else if (error != -EAGAIN) { 698 + fs_err(sdp, "control_mount mounted_lock EX error %d\n", error); 699 + goto fail; 700 + } 701 + 702 + error = mounted_lock(sdp, DLM_LOCK_PR, DLM_LKF_CONVERT|DLM_LKF_NOQUEUE); 703 + if (!error) { 704 + mounted_mode = DLM_LOCK_PR; 705 + goto locks_done; 706 + } else { 707 + /* not even -EAGAIN should happen here */ 708 + fs_err(sdp, "control_mount mounted_lock PR error %d\n", error); 709 + goto fail; 710 + } 711 + 712 + locks_done: 713 + /* 714 + * If we got both locks above in EX, then we're the first mounter. 715 + * If not, then we need to wait for the control_lock lvb to be 716 + * updated by other mounted nodes to reflect our mount generation. 717 + * 718 + * In simple first mounter cases, first mounter will see zero lvb_gen, 719 + * but in cases where all existing nodes leave/fail before mounting 720 + * nodes finish control_mount, then all nodes will be mounting and 721 + * lvb_gen will be non-zero. 722 + */ 723 + 724 + control_lvb_read(ls, &lvb_gen, lvb_bits); 725 + 726 + if (lvb_gen == 0xFFFFFFFF) { 727 + /* special value to force mount attempts to fail */ 728 + fs_err(sdp, "control_mount control_lock disabled\n"); 729 + error = -EINVAL; 730 + goto fail; 731 + } 732 + 733 + if (mounted_mode == DLM_LOCK_EX) { 734 + /* first mounter, keep both EX while doing first recovery */ 735 + spin_lock(&ls->ls_recover_spin); 736 + clear_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags); 737 + set_bit(DFL_MOUNT_DONE, &ls->ls_recover_flags); 738 + set_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags); 739 + spin_unlock(&ls->ls_recover_spin); 740 + fs_info(sdp, "first mounter control generation %u\n", lvb_gen); 741 + return 0; 742 + } 743 + 744 + error = control_lock(sdp, DLM_LOCK_NL, DLM_LKF_CONVERT); 205 745 if (error) 206 - printk(KERN_ERR "dlm_new_lockspace error %d", error); 746 + goto fail; 747 + 748 + /* 749 + * We are not first mounter, now we need to wait for the control_lock 750 + * lvb generation to be >= the generation from our first recover_done 751 + * and all lvb bits to be clear (no pending journal recoveries.) 752 + */ 753 + 754 + if (!all_jid_bits_clear(lvb_bits)) { 755 + /* journals need recovery, wait until all are clear */ 756 + fs_info(sdp, "control_mount wait for journal recovery\n"); 757 + goto restart; 758 + } 759 + 760 + spin_lock(&ls->ls_recover_spin); 761 + block_gen = ls->ls_recover_block; 762 + start_gen = ls->ls_recover_start; 763 + mount_gen = ls->ls_recover_mount; 764 + 765 + if (lvb_gen < mount_gen) { 766 + /* wait for mounted nodes to update control_lock lvb to our 767 + generation, which might include new recovery bits set */ 768 + fs_info(sdp, "control_mount wait1 block %u start %u mount %u " 769 + "lvb %u flags %lx\n", block_gen, start_gen, mount_gen, 770 + lvb_gen, ls->ls_recover_flags); 771 + spin_unlock(&ls->ls_recover_spin); 772 + goto restart; 773 + } 774 + 775 + if (lvb_gen != start_gen) { 776 + /* wait for mounted nodes to update control_lock lvb to the 777 + latest recovery generation */ 778 + fs_info(sdp, "control_mount wait2 block %u start %u mount %u " 779 + "lvb %u flags %lx\n", block_gen, start_gen, mount_gen, 780 + lvb_gen, ls->ls_recover_flags); 781 + spin_unlock(&ls->ls_recover_spin); 782 + goto restart; 783 + } 784 + 785 + if (block_gen == start_gen) { 786 + /* dlm recovery in progress, wait for it to finish */ 787 + fs_info(sdp, "control_mount wait3 block %u start %u mount %u " 788 + "lvb %u flags %lx\n", block_gen, start_gen, mount_gen, 789 + lvb_gen, ls->ls_recover_flags); 790 + spin_unlock(&ls->ls_recover_spin); 791 + goto restart; 792 + } 793 + 794 + clear_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags); 795 + set_bit(DFL_MOUNT_DONE, &ls->ls_recover_flags); 796 + memset(ls->ls_recover_submit, 0, ls->ls_recover_size*sizeof(uint32_t)); 797 + memset(ls->ls_recover_result, 0, ls->ls_recover_size*sizeof(uint32_t)); 798 + spin_unlock(&ls->ls_recover_spin); 799 + return 0; 800 + 801 + fail: 802 + mounted_unlock(sdp); 803 + control_unlock(sdp); 804 + return error; 805 + } 806 + 807 + static int dlm_recovery_wait(void *word) 808 + { 809 + schedule(); 810 + return 0; 811 + } 812 + 813 + static int control_first_done(struct gfs2_sbd *sdp) 814 + { 815 + struct lm_lockstruct *ls = &sdp->sd_lockstruct; 816 + char lvb_bits[GDLM_LVB_SIZE]; 817 + uint32_t start_gen, block_gen; 818 + int error; 819 + 820 + restart: 821 + spin_lock(&ls->ls_recover_spin); 822 + start_gen = ls->ls_recover_start; 823 + block_gen = ls->ls_recover_block; 824 + 825 + if (test_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags) || 826 + !test_bit(DFL_MOUNT_DONE, &ls->ls_recover_flags) || 827 + !test_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags)) { 828 + /* sanity check, should not happen */ 829 + fs_err(sdp, "control_first_done start %u block %u flags %lx\n", 830 + start_gen, block_gen, ls->ls_recover_flags); 831 + spin_unlock(&ls->ls_recover_spin); 832 + control_unlock(sdp); 833 + return -1; 834 + } 835 + 836 + if (start_gen == block_gen) { 837 + /* 838 + * Wait for the end of a dlm recovery cycle to switch from 839 + * first mounter recovery. We can ignore any recover_slot 840 + * callbacks between the recover_prep and next recover_done 841 + * because we are still the first mounter and any failed nodes 842 + * have not fully mounted, so they don't need recovery. 843 + */ 844 + spin_unlock(&ls->ls_recover_spin); 845 + fs_info(sdp, "control_first_done wait gen %u\n", start_gen); 846 + 847 + wait_on_bit(&ls->ls_recover_flags, DFL_DLM_RECOVERY, 848 + dlm_recovery_wait, TASK_UNINTERRUPTIBLE); 849 + goto restart; 850 + } 851 + 852 + clear_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags); 853 + set_bit(DFL_FIRST_MOUNT_DONE, &ls->ls_recover_flags); 854 + memset(ls->ls_recover_submit, 0, ls->ls_recover_size*sizeof(uint32_t)); 855 + memset(ls->ls_recover_result, 0, ls->ls_recover_size*sizeof(uint32_t)); 856 + spin_unlock(&ls->ls_recover_spin); 857 + 858 + memset(lvb_bits, 0, sizeof(lvb_bits)); 859 + control_lvb_write(ls, start_gen, lvb_bits); 860 + 861 + error = mounted_lock(sdp, DLM_LOCK_PR, DLM_LKF_CONVERT); 862 + if (error) 863 + fs_err(sdp, "control_first_done mounted PR error %d\n", error); 864 + 865 + error = control_lock(sdp, DLM_LOCK_NL, DLM_LKF_CONVERT|DLM_LKF_VALBLK); 866 + if (error) 867 + fs_err(sdp, "control_first_done control NL error %d\n", error); 207 868 208 869 return error; 870 + } 871 + 872 + /* 873 + * Expand static jid arrays if necessary (by increments of RECOVER_SIZE_INC) 874 + * to accomodate the largest slot number. (NB dlm slot numbers start at 1, 875 + * gfs2 jids start at 0, so jid = slot - 1) 876 + */ 877 + 878 + #define RECOVER_SIZE_INC 16 879 + 880 + static int set_recover_size(struct gfs2_sbd *sdp, struct dlm_slot *slots, 881 + int num_slots) 882 + { 883 + struct lm_lockstruct *ls = &sdp->sd_lockstruct; 884 + uint32_t *submit = NULL; 885 + uint32_t *result = NULL; 886 + uint32_t old_size, new_size; 887 + int i, max_jid; 888 + 889 + max_jid = 0; 890 + for (i = 0; i < num_slots; i++) { 891 + if (max_jid < slots[i].slot - 1) 892 + max_jid = slots[i].slot - 1; 893 + } 894 + 895 + old_size = ls->ls_recover_size; 896 + 897 + if (old_size >= max_jid + 1) 898 + return 0; 899 + 900 + new_size = old_size + RECOVER_SIZE_INC; 901 + 902 + submit = kzalloc(new_size * sizeof(uint32_t), GFP_NOFS); 903 + result = kzalloc(new_size * sizeof(uint32_t), GFP_NOFS); 904 + if (!submit || !result) { 905 + kfree(submit); 906 + kfree(result); 907 + return -ENOMEM; 908 + } 909 + 910 + spin_lock(&ls->ls_recover_spin); 911 + memcpy(submit, ls->ls_recover_submit, old_size * sizeof(uint32_t)); 912 + memcpy(result, ls->ls_recover_result, old_size * sizeof(uint32_t)); 913 + kfree(ls->ls_recover_submit); 914 + kfree(ls->ls_recover_result); 915 + ls->ls_recover_submit = submit; 916 + ls->ls_recover_result = result; 917 + ls->ls_recover_size = new_size; 918 + spin_unlock(&ls->ls_recover_spin); 919 + return 0; 920 + } 921 + 922 + static void free_recover_size(struct lm_lockstruct *ls) 923 + { 924 + kfree(ls->ls_recover_submit); 925 + kfree(ls->ls_recover_result); 926 + ls->ls_recover_submit = NULL; 927 + ls->ls_recover_result = NULL; 928 + ls->ls_recover_size = 0; 929 + } 930 + 931 + /* dlm calls before it does lock recovery */ 932 + 933 + static void gdlm_recover_prep(void *arg) 934 + { 935 + struct gfs2_sbd *sdp = arg; 936 + struct lm_lockstruct *ls = &sdp->sd_lockstruct; 937 + 938 + spin_lock(&ls->ls_recover_spin); 939 + ls->ls_recover_block = ls->ls_recover_start; 940 + set_bit(DFL_DLM_RECOVERY, &ls->ls_recover_flags); 941 + 942 + if (!test_bit(DFL_MOUNT_DONE, &ls->ls_recover_flags) || 943 + test_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags)) { 944 + spin_unlock(&ls->ls_recover_spin); 945 + return; 946 + } 947 + set_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags); 948 + spin_unlock(&ls->ls_recover_spin); 949 + } 950 + 951 + /* dlm calls after recover_prep has been completed on all lockspace members; 952 + identifies slot/jid of failed member */ 953 + 954 + static void gdlm_recover_slot(void *arg, struct dlm_slot *slot) 955 + { 956 + struct gfs2_sbd *sdp = arg; 957 + struct lm_lockstruct *ls = &sdp->sd_lockstruct; 958 + int jid = slot->slot - 1; 959 + 960 + spin_lock(&ls->ls_recover_spin); 961 + if (ls->ls_recover_size < jid + 1) { 962 + fs_err(sdp, "recover_slot jid %d gen %u short size %d", 963 + jid, ls->ls_recover_block, ls->ls_recover_size); 964 + spin_unlock(&ls->ls_recover_spin); 965 + return; 966 + } 967 + 968 + if (ls->ls_recover_submit[jid]) { 969 + fs_info(sdp, "recover_slot jid %d gen %u prev %u", 970 + jid, ls->ls_recover_block, ls->ls_recover_submit[jid]); 971 + } 972 + ls->ls_recover_submit[jid] = ls->ls_recover_block; 973 + spin_unlock(&ls->ls_recover_spin); 974 + } 975 + 976 + /* dlm calls after recover_slot and after it completes lock recovery */ 977 + 978 + static void gdlm_recover_done(void *arg, struct dlm_slot *slots, int num_slots, 979 + int our_slot, uint32_t generation) 980 + { 981 + struct gfs2_sbd *sdp = arg; 982 + struct lm_lockstruct *ls = &sdp->sd_lockstruct; 983 + 984 + /* ensure the ls jid arrays are large enough */ 985 + set_recover_size(sdp, slots, num_slots); 986 + 987 + spin_lock(&ls->ls_recover_spin); 988 + ls->ls_recover_start = generation; 989 + 990 + if (!ls->ls_recover_mount) { 991 + ls->ls_recover_mount = generation; 992 + ls->ls_jid = our_slot - 1; 993 + } 994 + 995 + if (!test_bit(DFL_UNMOUNT, &ls->ls_recover_flags)) 996 + queue_delayed_work(gfs2_control_wq, &sdp->sd_control_work, 0); 997 + 998 + clear_bit(DFL_DLM_RECOVERY, &ls->ls_recover_flags); 999 + smp_mb__after_clear_bit(); 1000 + wake_up_bit(&ls->ls_recover_flags, DFL_DLM_RECOVERY); 1001 + spin_unlock(&ls->ls_recover_spin); 1002 + } 1003 + 1004 + /* gfs2_recover thread has a journal recovery result */ 1005 + 1006 + static void gdlm_recovery_result(struct gfs2_sbd *sdp, unsigned int jid, 1007 + unsigned int result) 1008 + { 1009 + struct lm_lockstruct *ls = &sdp->sd_lockstruct; 1010 + 1011 + if (test_bit(DFL_NO_DLM_OPS, &ls->ls_recover_flags)) 1012 + return; 1013 + 1014 + /* don't care about the recovery of own journal during mount */ 1015 + if (jid == ls->ls_jid) 1016 + return; 1017 + 1018 + spin_lock(&ls->ls_recover_spin); 1019 + if (test_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags)) { 1020 + spin_unlock(&ls->ls_recover_spin); 1021 + return; 1022 + } 1023 + if (ls->ls_recover_size < jid + 1) { 1024 + fs_err(sdp, "recovery_result jid %d short size %d", 1025 + jid, ls->ls_recover_size); 1026 + spin_unlock(&ls->ls_recover_spin); 1027 + return; 1028 + } 1029 + 1030 + fs_info(sdp, "recover jid %d result %s\n", jid, 1031 + result == LM_RD_GAVEUP ? "busy" : "success"); 1032 + 1033 + ls->ls_recover_result[jid] = result; 1034 + 1035 + /* GAVEUP means another node is recovering the journal; delay our 1036 + next attempt to recover it, to give the other node a chance to 1037 + finish before trying again */ 1038 + 1039 + if (!test_bit(DFL_UNMOUNT, &ls->ls_recover_flags)) 1040 + queue_delayed_work(gfs2_control_wq, &sdp->sd_control_work, 1041 + result == LM_RD_GAVEUP ? HZ : 0); 1042 + spin_unlock(&ls->ls_recover_spin); 1043 + } 1044 + 1045 + const struct dlm_lockspace_ops gdlm_lockspace_ops = { 1046 + .recover_prep = gdlm_recover_prep, 1047 + .recover_slot = gdlm_recover_slot, 1048 + .recover_done = gdlm_recover_done, 1049 + }; 1050 + 1051 + static int gdlm_mount(struct gfs2_sbd *sdp, const char *table) 1052 + { 1053 + struct lm_lockstruct *ls = &sdp->sd_lockstruct; 1054 + char cluster[GFS2_LOCKNAME_LEN]; 1055 + const char *fsname; 1056 + uint32_t flags; 1057 + int error, ops_result; 1058 + 1059 + /* 1060 + * initialize everything 1061 + */ 1062 + 1063 + INIT_DELAYED_WORK(&sdp->sd_control_work, gfs2_control_func); 1064 + spin_lock_init(&ls->ls_recover_spin); 1065 + ls->ls_recover_flags = 0; 1066 + ls->ls_recover_mount = 0; 1067 + ls->ls_recover_start = 0; 1068 + ls->ls_recover_block = 0; 1069 + ls->ls_recover_size = 0; 1070 + ls->ls_recover_submit = NULL; 1071 + ls->ls_recover_result = NULL; 1072 + 1073 + error = set_recover_size(sdp, NULL, 0); 1074 + if (error) 1075 + goto fail; 1076 + 1077 + /* 1078 + * prepare dlm_new_lockspace args 1079 + */ 1080 + 1081 + fsname = strchr(table, ':'); 1082 + if (!fsname) { 1083 + fs_info(sdp, "no fsname found\n"); 1084 + error = -EINVAL; 1085 + goto fail_free; 1086 + } 1087 + memset(cluster, 0, sizeof(cluster)); 1088 + memcpy(cluster, table, strlen(table) - strlen(fsname)); 1089 + fsname++; 1090 + 1091 + flags = DLM_LSFL_FS | DLM_LSFL_NEWEXCL; 1092 + if (ls->ls_nodir) 1093 + flags |= DLM_LSFL_NODIR; 1094 + 1095 + /* 1096 + * create/join lockspace 1097 + */ 1098 + 1099 + error = dlm_new_lockspace(fsname, cluster, flags, GDLM_LVB_SIZE, 1100 + &gdlm_lockspace_ops, sdp, &ops_result, 1101 + &ls->ls_dlm); 1102 + if (error) { 1103 + fs_err(sdp, "dlm_new_lockspace error %d\n", error); 1104 + goto fail_free; 1105 + } 1106 + 1107 + if (ops_result < 0) { 1108 + /* 1109 + * dlm does not support ops callbacks, 1110 + * old dlm_controld/gfs_controld are used, try without ops. 1111 + */ 1112 + fs_info(sdp, "dlm lockspace ops not used\n"); 1113 + free_recover_size(ls); 1114 + set_bit(DFL_NO_DLM_OPS, &ls->ls_recover_flags); 1115 + return 0; 1116 + } 1117 + 1118 + if (!test_bit(SDF_NOJOURNALID, &sdp->sd_flags)) { 1119 + fs_err(sdp, "dlm lockspace ops disallow jid preset\n"); 1120 + error = -EINVAL; 1121 + goto fail_release; 1122 + } 1123 + 1124 + /* 1125 + * control_mount() uses control_lock to determine first mounter, 1126 + * and for later mounts, waits for any recoveries to be cleared. 1127 + */ 1128 + 1129 + error = control_mount(sdp); 1130 + if (error) { 1131 + fs_err(sdp, "mount control error %d\n", error); 1132 + goto fail_release; 1133 + } 1134 + 1135 + ls->ls_first = !!test_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags); 1136 + clear_bit(SDF_NOJOURNALID, &sdp->sd_flags); 1137 + smp_mb__after_clear_bit(); 1138 + wake_up_bit(&sdp->sd_flags, SDF_NOJOURNALID); 1139 + return 0; 1140 + 1141 + fail_release: 1142 + dlm_release_lockspace(ls->ls_dlm, 2); 1143 + fail_free: 1144 + free_recover_size(ls); 1145 + fail: 1146 + return error; 1147 + } 1148 + 1149 + static void gdlm_first_done(struct gfs2_sbd *sdp) 1150 + { 1151 + struct lm_lockstruct *ls = &sdp->sd_lockstruct; 1152 + int error; 1153 + 1154 + if (test_bit(DFL_NO_DLM_OPS, &ls->ls_recover_flags)) 1155 + return; 1156 + 1157 + error = control_first_done(sdp); 1158 + if (error) 1159 + fs_err(sdp, "mount first_done error %d\n", error); 209 1160 } 210 1161 211 1162 static void gdlm_unmount(struct gfs2_sbd *sdp) 212 1163 { 213 1164 struct lm_lockstruct *ls = &sdp->sd_lockstruct; 214 1165 1166 + if (test_bit(DFL_NO_DLM_OPS, &ls->ls_recover_flags)) 1167 + goto release; 1168 + 1169 + /* wait for gfs2_control_wq to be done with this mount */ 1170 + 1171 + spin_lock(&ls->ls_recover_spin); 1172 + set_bit(DFL_UNMOUNT, &ls->ls_recover_flags); 1173 + spin_unlock(&ls->ls_recover_spin); 1174 + flush_delayed_work_sync(&sdp->sd_control_work); 1175 + 1176 + /* mounted_lock and control_lock will be purged in dlm recovery */ 1177 + release: 215 1178 if (ls->ls_dlm) { 216 1179 dlm_release_lockspace(ls->ls_dlm, 2); 217 1180 ls->ls_dlm = NULL; 218 1181 } 1182 + 1183 + free_recover_size(ls); 219 1184 } 220 1185 221 1186 static const match_table_t dlm_tokens = { ··· 1197 226 const struct lm_lockops gfs2_dlm_ops = { 1198 227 .lm_proto_name = "lock_dlm", 1199 228 .lm_mount = gdlm_mount, 229 + .lm_first_done = gdlm_first_done, 230 + .lm_recovery_result = gdlm_recovery_result, 1200 231 .lm_unmount = gdlm_unmount, 1201 232 .lm_put_lock = gdlm_put_lock, 1202 233 .lm_lock = gdlm_lock,
+10
fs/gfs2/main.c
··· 28 28 #include "recovery.h" 29 29 #include "dir.h" 30 30 31 + struct workqueue_struct *gfs2_control_wq; 32 + 31 33 static struct shrinker qd_shrinker = { 32 34 .shrink = gfs2_shrink_qd_memory, 33 35 .seeks = DEFAULT_SEEKS, ··· 148 146 if (!gfs_recovery_wq) 149 147 goto fail_wq; 150 148 149 + gfs2_control_wq = alloc_workqueue("gfs2_control", 150 + WQ_NON_REENTRANT | WQ_UNBOUND | WQ_FREEZABLE, 0); 151 + if (!gfs2_control_wq) 152 + goto fail_control; 153 + 151 154 gfs2_register_debugfs(); 152 155 153 156 printk("GFS2 installed\n"); 154 157 155 158 return 0; 156 159 160 + fail_control: 161 + destroy_workqueue(gfs_recovery_wq); 157 162 fail_wq: 158 163 unregister_filesystem(&gfs2meta_fs_type); 159 164 fail_unregister: ··· 204 195 unregister_filesystem(&gfs2_fs_type); 205 196 unregister_filesystem(&gfs2meta_fs_type); 206 197 destroy_workqueue(gfs_recovery_wq); 198 + destroy_workqueue(gfs2_control_wq); 207 199 208 200 rcu_barrier(); 209 201
+16 -13
fs/gfs2/ops_fstype.c
··· 562 562 { 563 563 char *message = "FIRSTMOUNT=Done"; 564 564 char *envp[] = { message, NULL }; 565 - struct lm_lockstruct *ls = &sdp->sd_lockstruct; 566 - ls->ls_first_done = 1; 565 + 566 + fs_info(sdp, "first mount done, others may mount\n"); 567 + 568 + if (sdp->sd_lockstruct.ls_ops->lm_first_done) 569 + sdp->sd_lockstruct.ls_ops->lm_first_done(sdp); 570 + 567 571 kobject_uevent_env(&sdp->sd_kobj, KOBJ_CHANGE, envp); 568 572 } 569 573 ··· 948 944 struct gfs2_args *args = &sdp->sd_args; 949 945 const char *proto = sdp->sd_proto_name; 950 946 const char *table = sdp->sd_table_name; 951 - const char *fsname; 952 947 char *o, *options; 953 948 int ret; 954 949 ··· 1007 1004 } 1008 1005 } 1009 1006 1010 - if (sdp->sd_args.ar_spectator) 1011 - snprintf(sdp->sd_fsname, GFS2_FSNAME_LEN, "%s.s", table); 1012 - else 1013 - snprintf(sdp->sd_fsname, GFS2_FSNAME_LEN, "%s.%u", table, 1014 - sdp->sd_lockstruct.ls_jid); 1015 - 1016 - fsname = strchr(table, ':'); 1017 - if (fsname) 1018 - fsname++; 1019 1007 if (lm->lm_mount == NULL) { 1020 1008 fs_info(sdp, "Now mounting FS...\n"); 1021 1009 complete_all(&sdp->sd_locking_init); 1022 1010 return 0; 1023 1011 } 1024 - ret = lm->lm_mount(sdp, fsname); 1012 + ret = lm->lm_mount(sdp, table); 1025 1013 if (ret == 0) 1026 1014 fs_info(sdp, "Joined cluster. Now mounting FS...\n"); 1027 1015 complete_all(&sdp->sd_locking_init); ··· 1118 1124 if (error) 1119 1125 goto fail; 1120 1126 1127 + snprintf(sdp->sd_fsname, GFS2_FSNAME_LEN, "%s", sdp->sd_table_name); 1128 + 1121 1129 gfs2_create_debugfs_file(sdp); 1122 1130 1123 1131 error = gfs2_sys_fs_add(sdp); ··· 1155 1159 sdp->sd_lockstruct.ls_jid = 0; 1156 1160 goto fail_sb; 1157 1161 } 1162 + 1163 + if (sdp->sd_args.ar_spectator) 1164 + snprintf(sdp->sd_fsname, GFS2_FSNAME_LEN, "%s.s", 1165 + sdp->sd_table_name); 1166 + else 1167 + snprintf(sdp->sd_fsname, GFS2_FSNAME_LEN, "%s.%u", 1168 + sdp->sd_table_name, sdp->sd_lockstruct.ls_jid); 1158 1169 1159 1170 error = init_inodes(sdp, DO); 1160 1171 if (error)
+4
fs/gfs2/recovery.c
··· 436 436 char env_status[20]; 437 437 char *envp[] = { env_jid, env_status, NULL }; 438 438 struct lm_lockstruct *ls = &sdp->sd_lockstruct; 439 + 439 440 ls->ls_recover_jid_done = jid; 440 441 ls->ls_recover_jid_status = message; 441 442 sprintf(env_jid, "JID=%d", jid); 442 443 sprintf(env_status, "RECOVERY=%s", 443 444 message == LM_RD_SUCCESS ? "Done" : "Failed"); 444 445 kobject_uevent_env(&sdp->sd_kobj, KOBJ_CHANGE, envp); 446 + 447 + if (sdp->sd_lockstruct.ls_ops->lm_recovery_result) 448 + sdp->sd_lockstruct.ls_ops->lm_recovery_result(sdp, jid, message); 445 449 } 446 450 447 451 void gfs2_recover_func(struct work_struct *work)
+21 -12
fs/gfs2/sys.c
··· 298 298 ssize_t ret; 299 299 int val = 0; 300 300 301 - if (test_bit(DFL_BLOCK_LOCKS, &ls->ls_flags)) 301 + if (test_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags)) 302 302 val = 1; 303 303 ret = sprintf(buf, "%d\n", val); 304 304 return ret; ··· 313 313 val = simple_strtol(buf, NULL, 0); 314 314 315 315 if (val == 1) 316 - set_bit(DFL_BLOCK_LOCKS, &ls->ls_flags); 316 + set_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags); 317 317 else if (val == 0) { 318 - clear_bit(DFL_BLOCK_LOCKS, &ls->ls_flags); 318 + clear_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags); 319 319 smp_mb__after_clear_bit(); 320 320 gfs2_glock_thaw(sdp); 321 321 } else { ··· 350 350 goto out; 351 351 if (sdp->sd_lockstruct.ls_ops->lm_mount == NULL) 352 352 goto out; 353 - sdp->sd_lockstruct.ls_first = first; 354 - rv = 0; 353 + sdp->sd_lockstruct.ls_first = first; 354 + rv = 0; 355 355 out: 356 356 spin_unlock(&sdp->sd_jindex_spin); 357 357 return rv ? rv : len; ··· 360 360 static ssize_t first_done_show(struct gfs2_sbd *sdp, char *buf) 361 361 { 362 362 struct lm_lockstruct *ls = &sdp->sd_lockstruct; 363 - return sprintf(buf, "%d\n", ls->ls_first_done); 363 + return sprintf(buf, "%d\n", !!test_bit(DFL_FIRST_MOUNT_DONE, &ls->ls_recover_flags)); 364 364 } 365 365 366 - static ssize_t recover_store(struct gfs2_sbd *sdp, const char *buf, size_t len) 366 + int gfs2_recover_set(struct gfs2_sbd *sdp, unsigned jid) 367 367 { 368 - unsigned jid; 369 368 struct gfs2_jdesc *jd; 370 369 int rv; 371 - 372 - rv = sscanf(buf, "%u", &jid); 373 - if (rv != 1) 374 - return -EINVAL; 375 370 376 371 rv = -ESHUTDOWN; 377 372 spin_lock(&sdp->sd_jindex_spin); ··· 384 389 } 385 390 out: 386 391 spin_unlock(&sdp->sd_jindex_spin); 392 + return rv; 393 + } 394 + 395 + static ssize_t recover_store(struct gfs2_sbd *sdp, const char *buf, size_t len) 396 + { 397 + unsigned jid; 398 + int rv; 399 + 400 + rv = sscanf(buf, "%u", &jid); 401 + if (rv != 1) 402 + return -EINVAL; 403 + 404 + rv = gfs2_recover_set(sdp, jid); 405 + 387 406 return rv ? rv : len; 388 407 } 389 408
+2
fs/gfs2/sys.h
··· 19 19 int gfs2_sys_init(void); 20 20 void gfs2_sys_uninit(void); 21 21 22 + int gfs2_recover_set(struct gfs2_sbd *sdp, unsigned jid); 23 + 22 24 #endif /* __SYS_DOT_H__ */ 23 25
+2
include/linux/gfs2_ondisk.h
··· 22 22 #define GFS2_LIVE_LOCK 1 23 23 #define GFS2_TRANS_LOCK 2 24 24 #define GFS2_RENAME_LOCK 3 25 + #define GFS2_CONTROL_LOCK 4 26 + #define GFS2_MOUNTED_LOCK 5 25 27 26 28 /* Format numbers for various metadata types */ 27 29