dm raid1: fix EIO after log failure

This patch adds the ability to requeue write I/O to
core device-mapper when there is a log device failure.

If a write to the log produces an error, the pending writes are
put on the "failures" list. Since the log is marked as failed,
they will stay on the failures list until a suspend happens.
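
Concretely, do_writes() (in the patch below) no longer errors these
writes: with ms->log_failure set, the batch of pending sync writes is
merged onto ms->failures under ms->lock and held for do_failures():

    /* Condensed from do_writes() in the patch below: with a failed
     * log, pending writes are parked on ms->failures rather than
     * being completed with -EIO. */
    if (unlikely(ms->log_failure)) {
            spin_lock_irq(&ms->lock);
            bio_list_merge(&ms->failures, &sync);
            spin_unlock_irq(&ms->lock);
    } else
            while ((bio = bio_list_pop(&sync)))
                    do_write(ms, bio);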

Suspends come in two phases: presuspend and postsuspend. We must
make sure that all the writes on the failures list are requeued
in the presuspend phase (a requirement of dm core). This means
that recovery must be complete (because writes may be delayed
behind it) and the failures list must be requeued before we
return from presuspend.
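
Requeueing to the core means completing each held bio with
DM_ENDIO_REQUEUE, which is only useful while a 'noflush' suspend is
in progress; otherwise the writes have to be errored. Condensed from
do_failures() in the patch below:

    if (dm_noflush_suspending(ms->ti)) {
            /* hand the held writes back to dm core for reissue */
            while ((bio = bio_list_pop(failures)))
                    bio_endio(bio, DM_ENDIO_REQUEUE);
            return;
    }

    if (atomic_read(&ms->suspend)) {
            /* suspending without 'noflush': errors are the only option */
            while ((bio = bio_list_pop(failures)))
                    bio_endio(bio, -EIO);
            return;
    }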

The mechanism to ensure recovery is complete (or stopped) was
already in place, but it needed to be moved from postsuspend to
presuspend. We rely on 'flush_workqueue' to ensure that the
mirror thread has finished and has therefore requeued all writes
on the failures list.
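
The resulting ordering in the new mirror_presuspend() (condensed from
the patch below) is: stop recovery and wait for it to drain, presuspend
the log, then flush the worker so everything on the failures list has
been requeued before we return to dm core:

    atomic_set(&ms->suspend, 1);    /* do_failures() must now requeue
                                       or error, never hold */

    rh_stop_recovery(&ms->rh);      /* stop generating recovery work */
    wait_event(_kmirrord_recovery_stopped,
               !atomic_read(&ms->rh.recovery_in_flight));

    if (log->type->presuspend && log->type->presuspend(log))
            DMWARN("log presuspend failed");

    flush_workqueue(ms->kmirrord_wq);   /* worker has requeued all held writes */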

Because we are using flush_workqueue, we must ensure that no
additional 'queue_work' calls will produce additional I/O
that we need to requeue (because once we return from
presuspend, we are unable to do anything about it). 'queue_work'
is called from the following functions:
- complete_resync_work = NA, recovery is stopped
- rh_dec (mirror_end_io) = NA, only calls 'queue_work' if it
is ready to recover the region
(recovery is stopped) or it needs
to clear the region in the log*
**this doesn't get called while
suspending**
- rh_recovery_end = NA, recovery is stopped
- rh_recovery_start = NA, recovery is stopped
- write_callback = 1) Writes w/o failures simply call
bio_endio -> mirror_end_io -> rh_dec
(see rh_dec above)
2) Writes with failures are put on
the failures list and queue_work is
called**
** write_callbacks don't happen
during suspend **
- do_failures = NA, 'queue_work' not called if suspending
- add_mirror (initialization) = NA, only done on mirror creation
- queue_bio = NA, 1) delayed I/O scheduled before flush_workqueue
is called. 2) No more I/Os are being issued.
3) Re-attempted READs can still be handled.
(Write completions are handled through rh_dec/
write_callback, mentioned above, and do not
use queue_bio.)

Signed-off-by: Jonathan Brassow <jbrassow@redhat.com>
Signed-off-by: Alasdair G Kergon <agk@redhat.com>


+90 -11
drivers/md/dm-raid1.c
···
 	region_t nr_regions;
 	int in_sync;
 	int log_failure;
+	atomic_t suspend;
 
 	atomic_t default_mirror;	/* Default mirror */
 
···
 	struct region_hash *rh = reg->rh;
 
 	rh->log->type->set_region_sync(rh->log, reg->key, success);
+
+	/*
+	 * Dispatch the bios before we call 'wake_up_all'.
+	 * This is important because if we are suspending,
+	 * we want to know that recovery is complete and
+	 * the work queue is flushed.  If we wake_up_all
+	 * before we dispatch_bios (queue bios and call wake()),
+	 * then we risk suspending before the work queue
+	 * has been properly flushed.
+	 */
 	dispatch_bios(rh->ms, &reg->delayed_bios);
 	if (atomic_dec_and_test(&rh->recovery_in_flight))
 		wake_up_all(&_kmirrord_recovery_stopped);
···
 	/*
 	 * Dispatch io.
 	 */
-	if (unlikely(ms->log_failure))
+	if (unlikely(ms->log_failure)) {
+		spin_lock_irq(&ms->lock);
+		bio_list_merge(&ms->failures, &sync);
+		spin_unlock_irq(&ms->lock);
+	} else
 		while ((bio = bio_list_pop(&sync)))
-			bio_endio(bio, -EIO);
-	else while ((bio = bio_list_pop(&sync)))
-		do_write(ms, bio);
+			do_write(ms, bio);
 
 	while ((bio = bio_list_pop(&recover)))
 		rh_delay(&ms->rh, bio);
···
 	if (!failures->head)
 		return;
 
-	while ((bio = bio_list_pop(failures)))
-		__bio_mark_nosync(ms, bio, bio->bi_size, 0);
+	if (!ms->log_failure) {
+		while ((bio = bio_list_pop(failures)))
+			__bio_mark_nosync(ms, bio, bio->bi_size, 0);
+		return;
+	}
+
+	/*
+	 * If the log has failed, unattempted writes are being
+	 * put on the failures list.  We can't issue those writes
+	 * until a log has been marked, so we must store them.
+	 *
+	 * If a 'noflush' suspend is in progress, we can requeue
+	 * the I/O's to the core.  This gives userspace a chance
+	 * to reconfigure the mirror, at which point the core
+	 * will reissue the writes.  If the 'noflush' flag is
+	 * not set, we have no choice but to return errors.
+	 *
+	 * Some writes on the failures list may have been
+	 * submitted before the log failure and represent a
+	 * failure to write to one of the devices.  It is ok
+	 * for us to treat them the same and requeue them
+	 * as well.
+	 */
+	if (dm_noflush_suspending(ms->ti)) {
+		while ((bio = bio_list_pop(failures)))
+			bio_endio(bio, DM_ENDIO_REQUEUE);
+		return;
+	}
+
+	if (atomic_read(&ms->suspend)) {
+		while ((bio = bio_list_pop(failures)))
+			bio_endio(bio, -EIO);
+		return;
+	}
+
+	spin_lock_irq(&ms->lock);
+	bio_list_merge(&ms->failures, failures);
+	spin_unlock_irq(&ms->lock);
+
+	wake(ms);
 }
 
 static void trigger_event(struct work_struct *work)
···
 	ms->nr_mirrors = nr_mirrors;
 	ms->nr_regions = dm_sector_div_up(ti->len, region_size);
 	ms->in_sync = 0;
+	ms->log_failure = 0;
+	atomic_set(&ms->suspend, 0);
 	atomic_set(&ms->default_mirror, DEFAULT_MIRROR);
 
 	ms->io_client = dm_io_client_create(DM_IO_PAGES);
···
 	return 0;
 }
 
-static void mirror_postsuspend(struct dm_target *ti)
+static void mirror_presuspend(struct dm_target *ti)
 {
 	struct mirror_set *ms = (struct mirror_set *) ti->private;
 	struct dirty_log *log = ms->rh.log;
 
+	atomic_set(&ms->suspend, 1);
+
+	/*
+	 * We must finish up all the work that we've
+	 * generated (i.e. recovery work).
+	 */
 	rh_stop_recovery(&ms->rh);
 
-	/* Wait for all I/O we generated to complete */
 	wait_event(_kmirrord_recovery_stopped,
 		   !atomic_read(&ms->rh.recovery_in_flight));
 
+	if (log->type->presuspend && log->type->presuspend(log))
+		/* FIXME: need better error handling */
+		DMWARN("log presuspend failed");
+
+	/*
+	 * Now that recovery is complete/stopped and the
+	 * delayed bios are queued, we need to wait for
+	 * the worker thread to complete.  This way,
+	 * we know that all of our I/O has been pushed.
+	 */
+	flush_workqueue(ms->kmirrord_wq);
+}
+
+static void mirror_postsuspend(struct dm_target *ti)
+{
+	struct mirror_set *ms = ti->private;
+	struct dirty_log *log = ms->rh.log;
+
 	if (log->type->postsuspend && log->type->postsuspend(log))
 		/* FIXME: need better error handling */
-		DMWARN("log suspend failed");
+		DMWARN("log postsuspend failed");
 }
 
 static void mirror_resume(struct dm_target *ti)
 {
-	struct mirror_set *ms = (struct mirror_set *) ti->private;
+	struct mirror_set *ms = ti->private;
 	struct dirty_log *log = ms->rh.log;
+
+	atomic_set(&ms->suspend, 0);
 	if (log->type->resume && log->type->resume(log))
 		/* FIXME: need better error handling */
 		DMWARN("log resume failed");
···
 		DMEMIT("%d", ms->nr_mirrors);
 		for (m = 0; m < ms->nr_mirrors; m++)
 			DMEMIT(" %s %llu", ms->mirror[m].dev->name,
-			       (unsigned long long)ms->mirror[m].offset);
+				(unsigned long long)ms->mirror[m].offset);
 
 		if (ms->features & DM_RAID1_HANDLE_ERRORS)
 			DMEMIT(" 1 handle_errors");
···
 	.dtr = mirror_dtr,
 	.map = mirror_map,
 	.end_io = mirror_end_io,
+	.presuspend = mirror_presuspend,
 	.postsuspend = mirror_postsuspend,
 	.resume = mirror_resume,
 	.status = mirror_status,