Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

at v2.6.26-rc1 654 lines 14 kB view raw
/*
 * Copyright (C) 2002 Sistina Software (UK) Limited.
 * Copyright (C) 2006 Red Hat GmbH
 *
 * This file is released under the GPL.
 *
 * Kcopyd provides a simple interface for copying an area of one
 * block-device to one or more other block-devices, with an asynchronous
 * completion notification.
 */

#include <linux/types.h>
#include <asm/atomic.h>
#include <linux/blkdev.h>
#include <linux/fs.h>
#include <linux/init.h>
#include <linux/list.h>
#include <linux/mempool.h>
#include <linux/module.h>
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/vmalloc.h>
#include <linux/workqueue.h>
#include <linux/mutex.h>
#include <linux/dm-kcopyd.h>

#include "dm.h"

/*-----------------------------------------------------------------
 * Each kcopyd client has its own little pool of preallocated
 * pages for kcopyd io.
 *---------------------------------------------------------------*/
struct dm_kcopyd_client {
	spinlock_t lock;		/* protects pages, nr_pages, nr_free_pages */
	struct page_list *pages;	/* singly-linked free list of pages */
	unsigned int nr_pages;		/* total pages owned by this client */
	unsigned int nr_free_pages;	/* pages currently on the free list */

	struct dm_io_client *io_client;

	/* dm_kcopyd_client_destroy() sleeps here until nr_jobs reaches 0 */
	wait_queue_head_t destroyq;
	atomic_t nr_jobs;

	/* jobs come from a mempool so submission never has to do io itself */
	mempool_t *job_pool;

	struct workqueue_struct *kcopyd_wq;
	struct work_struct kcopyd_work;

/*
 * We maintain three lists of jobs:
 *
 * i)   jobs waiting for pages
 * ii)  jobs that have pages, and are waiting for the io to be issued.
 * iii) jobs that have completed.
 *
 * All three of these are protected by job_lock.
 */
	spinlock_t job_lock;
	struct list_head complete_jobs;
	struct list_head io_jobs;
	struct list_head pages_jobs;
};

/* Kick the client's workqueue; do_work() drains the three job lists. */
static void wake(struct dm_kcopyd_client *kc)
{
	queue_work(kc->kcopyd_wq, &kc->kcopyd_work);
}

/*
 * Allocate one page_list node together with its backing page.
 * Returns NULL if either allocation fails (nothing leaks).
 */
static struct page_list *alloc_pl(void)
{
	struct page_list *pl;

	pl = kmalloc(sizeof(*pl), GFP_KERNEL);
	if (!pl)
		return NULL;

	pl->page = alloc_page(GFP_KERNEL);
	if (!pl->page) {
		kfree(pl);
		return NULL;
	}

	return pl;
}

/* Release one page_list node and its backing page. */
static void free_pl(struct page_list *pl)
{
	__free_page(pl->page);
	kfree(pl);
}

/*
 * Detach @nr pages from the head of the client's free list into *@pages.
 * Returns -ENOMEM (without blocking) if fewer than @nr pages are free;
 * the caller is expected to retry later (see run_pages_job()).
 */
static int kcopyd_get_pages(struct dm_kcopyd_client *kc,
			    unsigned int nr, struct page_list **pages)
{
	struct page_list *pl;

	spin_lock(&kc->lock);
	if (kc->nr_free_pages < nr) {
		spin_unlock(&kc->lock);
		return -ENOMEM;
	}

	kc->nr_free_pages -= nr;
	/* walk to the nr'th node; pl ends up on the last page handed out */
	for (*pages = pl = kc->pages; --nr; pl = pl->next)
		;

	/* split the chain: caller gets a NULL-terminated list of nr pages */
	kc->pages = pl->next;
	pl->next = NULL;

	spin_unlock(&kc->lock);

	return 0;
}

/* Return a whole chain of pages to the head of the client's free list. */
static void kcopyd_put_pages(struct dm_kcopyd_client *kc, struct page_list *pl)
{
	struct page_list *cursor;

	spin_lock(&kc->lock);
	/* count every node in the chain while finding its tail */
	for (cursor = pl; cursor->next; cursor = cursor->next)
		kc->nr_free_pages++;

	kc->nr_free_pages++;	/* ... and the tail node itself */
	cursor->next = kc->pages;
	kc->pages = pl;
	spin_unlock(&kc->lock);
}

/*
 * These three functions resize the page pool.
 */
static void drop_pages(struct page_list *pl)
{
	struct page_list *next;

	while (pl) {
		next = pl->next;
		free_pl(pl);
		pl = next;
	}
}

/*
 * Grow the pool by @nr pages.  All-or-nothing: on allocation failure
 * every page allocated so far is dropped and -ENOMEM is returned.
 */
static int client_alloc_pages(struct dm_kcopyd_client *kc, unsigned int nr)
{
	unsigned int i;
	struct page_list *pl = NULL, *next;

	for (i = 0; i < nr; i++) {
		next = alloc_pl();
		if (!next) {
			if (pl)
				drop_pages(pl);
			return -ENOMEM;
		}
		next->next = pl;
		pl = next;
	}

	kcopyd_put_pages(kc, pl);
	kc->nr_pages += nr;
	return 0;
}

/*
 * Free the entire pool.  All pages must already be back on the free
 * list, i.e. no jobs may still be holding pages.
 */
static void client_free_pages(struct dm_kcopyd_client *kc)
{
	BUG_ON(kc->nr_free_pages != kc->nr_pages);
	drop_pages(kc->pages);
	kc->pages = NULL;
	kc->nr_free_pages = kc->nr_pages = 0;
}

/*-----------------------------------------------------------------
 * kcopyd_jobs need to be allocated by the *clients* of kcopyd,
 * for this reason we use a mempool to prevent the client from
 * ever having to do io (which could cause a deadlock).
 *---------------------------------------------------------------*/
struct kcopyd_job {
	struct dm_kcopyd_client *kc;
	struct list_head list;	/* links the job into one of the three lists */
	unsigned long flags;	/* DM_KCOPYD_* flags, e.g. IGNORE_ERROR */

	/*
	 * Error state of the job.
	 */
	int read_err;		/* non-zero if the source read failed */
	unsigned long write_err; /* bitmask of failed destinations */

	/*
	 * Either READ or WRITE
	 */
	int rw;
	struct dm_io_region source;

	/*
	 * The destinations for the transfer.
	 */
	unsigned int num_dests;
	struct dm_io_region dests[DM_KCOPYD_MAX_REGIONS];

	sector_t offset;	/* sector offset into the first page */
	unsigned int nr_pages;
	struct page_list *pages;

	/*
	 * Set this to ensure you are notified when the job has
	 * completed.  'context' is for callback to use.
	 */
	dm_kcopyd_notify_fn fn;
	void *context;

	/*
	 * These fields are only used if the job has been split
	 * into more manageable parts.
	 */
	struct mutex lock;	/* serialises progress updates in segment_complete() */
	atomic_t sub_jobs;	/* outstanding sub jobs of a split parent */
	sector_t progress;	/* sectors of the parent already dispatched */
};

/* FIXME: this should scale with the number of pages */
#define MIN_JOBS 512

static struct kmem_cache *_job_cache;

int __init dm_kcopyd_init(void)
{
	_job_cache = KMEM_CACHE(kcopyd_job, 0);
	if (!_job_cache)
		return -ENOMEM;

	return 0;
}

void dm_kcopyd_exit(void)
{
	kmem_cache_destroy(_job_cache);
	_job_cache = NULL;
}

/*
 * Functions to push and pop a job onto the head of a given job
 * list.
 */
static struct kcopyd_job *pop(struct list_head *jobs,
			      struct dm_kcopyd_client *kc)
{
	struct kcopyd_job *job = NULL;
	unsigned long flags;

	/* irqsave: these lists are also touched from io completion context */
	spin_lock_irqsave(&kc->job_lock, flags);

	if (!list_empty(jobs)) {
		job = list_entry(jobs->next, struct kcopyd_job, list);
		list_del(&job->list);
	}
	spin_unlock_irqrestore(&kc->job_lock, flags);

	return job;
}

static void push(struct list_head *jobs, struct kcopyd_job *job)
{
	unsigned long flags;
	struct dm_kcopyd_client *kc = job->kc;

	spin_lock_irqsave(&kc->job_lock, flags);
	list_add_tail(&job->list, jobs);
	spin_unlock_irqrestore(&kc->job_lock, flags);
}

/*
 * These three functions process 1 item from the corresponding
 * job list.
 *
 * They return:
 * < 0: error
 *   0: success
 * > 0: can't process yet.
 */
static int run_complete_job(struct kcopyd_job *job)
{
	/*
	 * Copy everything the callback needs out of the job first:
	 * the job is returned to the mempool *before* fn() runs, so
	 * it must not be touched afterwards.
	 */
	void *context = job->context;
	int read_err = job->read_err;
	unsigned long write_err = job->write_err;
	dm_kcopyd_notify_fn fn = job->fn;
	struct dm_kcopyd_client *kc = job->kc;

	kcopyd_put_pages(kc, job->pages);
	mempool_free(job, kc->job_pool);
	fn(read_err, write_err, context);

	/* last job gone: release anyone blocked in client_destroy */
	if (atomic_dec_and_test(&kc->nr_jobs))
		wake_up(&kc->destroyq);

	return 0;
}

/*
 * dm_io completion callback.  A job whose READ finished successfully is
 * flipped to WRITE and requeued; a finished WRITE (or an error that is
 * not ignored) moves the job to the complete list.
 */
static void complete_io(unsigned long error, void *context)
{
	struct kcopyd_job *job = (struct kcopyd_job *) context;
	struct dm_kcopyd_client *kc = job->kc;

	if (error) {
		if (job->rw == WRITE)
			job->write_err |= error;
		else
			job->read_err = 1;

		if (!test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
			push(&kc->complete_jobs, job);
			wake(kc);
			return;
		}
	}

	if (job->rw == WRITE)
		push(&kc->complete_jobs, job);

	else {
		/* read done: reuse the same job (and pages) for the writes */
		job->rw = WRITE;
		push(&kc->io_jobs, job);
	}

	wake(kc);
}

/*
 * Request io on as many buffer heads as we can currently get for
 * a particular job.
 */
static int run_io_job(struct kcopyd_job *job)
{
	int r;
	struct dm_io_request io_req = {
		.bi_rw = job->rw | (1 << BIO_RW_SYNC),
		.mem.type = DM_IO_PAGE_LIST,
		.mem.ptr.pl = job->pages,
		.mem.offset = job->offset,
		.notify.fn = complete_io,	/* async: complete_io requeues the job */
		.notify.context = job,
		.client = job->kc->io_client,
	};

	if (job->rw == READ)
		r = dm_io(&io_req, 1, &job->source, NULL);
	else
		r = dm_io(&io_req, job->num_dests, job->dests, NULL);

	return r;
}

static int run_pages_job(struct kcopyd_job *job)
{
	int r;

	/*
	 * Pages needed to cover offset+count sectors; PAGE_SIZE >> 9 is
	 * the page size in 512-byte sectors (the usual kernel sector unit).
	 */
	job->nr_pages = dm_div_up(job->dests[0].count + job->offset,
				  PAGE_SIZE >> 9);
	r = kcopyd_get_pages(job->kc, job->nr_pages, &job->pages);
	if (!r) {
		/* this job is ready for io */
		push(&job->kc->io_jobs, job);
		return 0;
	}

	if (r == -ENOMEM)
		/* can't complete now */
		return 1;

	return r;
}

/*
 * Run through a list for as long as possible.  Returns the count
 * of successful jobs.
 */
static int process_jobs(struct list_head *jobs, struct dm_kcopyd_client *kc,
			int (*fn) (struct kcopyd_job *))
{
	struct kcopyd_job *job;
	int r, count = 0;

	while ((job = pop(jobs, kc))) {

		r = fn(job);

		if (r < 0) {
			/* error this rogue job */
			if (job->rw == WRITE)
				job->write_err = (unsigned long) -1L;
			else
				job->read_err = 1;
			push(&kc->complete_jobs, job);
			break;
		}

		if (r > 0) {
			/*
			 * We couldn't service this job ATM, so
			 * push this job back onto the list.
			 */
			push(jobs, job);
			break;
		}

		count++;
	}

	return count;
}

/*
 * kcopyd does this every time it's woken up.
 */
static void do_work(struct work_struct *work)
{
	struct dm_kcopyd_client *kc = container_of(work,
					struct dm_kcopyd_client, kcopyd_work);

	/*
	 * The order that these are called is *very* important.
	 * complete jobs can free some pages for pages jobs.
	 * Pages jobs when successful will jump onto the io jobs
	 * list.  io jobs call wake when they complete and it all
	 * starts again.
	 */
	process_jobs(&kc->complete_jobs, kc, run_complete_job);
	process_jobs(&kc->pages_jobs, kc, run_pages_job);
	process_jobs(&kc->io_jobs, kc, run_io_job);
}

/*
 * If we are copying a small region we just dispatch a single job
 * to do the copy, otherwise the io has to be split up into many
 * jobs.
 */
static void dispatch_job(struct kcopyd_job *job)
{
	struct dm_kcopyd_client *kc = job->kc;
	atomic_inc(&kc->nr_jobs);
	/* every job starts on the pages list; run_pages_job() moves it on */
	push(&kc->pages_jobs, job);
	wake(kc);
}

/* granularity (in sectors) of the sub jobs a large copy is split into */
#define SUB_JOB_SIZE 128

/*
 * Completion callback for a sub job of a split copy.  While the parent
 * has sectors left, each completing sub job recycles itself to carve
 * off the next SUB_JOB_SIZE chunk; the last sub job to finish runs the
 * parent's notify fn and frees the parent.
 */
static void segment_complete(int read_err, unsigned long write_err,
			     void *context)
{
	/* FIXME: tidy this function */
	sector_t progress = 0;
	sector_t count = 0;
	struct kcopyd_job *job = (struct kcopyd_job *) context;

	mutex_lock(&job->lock);

	/* update the error */
	if (read_err)
		job->read_err = 1;

	if (write_err)
		job->write_err |= write_err;

	/*
	 * Only dispatch more work if there hasn't been an error.
	 */
	if ((!job->read_err && !job->write_err) ||
	    test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
		/* get the next chunk of work */
		progress = job->progress;
		count = job->source.count - progress;
		if (count) {
			if (count > SUB_JOB_SIZE)
				count = SUB_JOB_SIZE;

			job->progress += count;
		}
	}
	mutex_unlock(&job->lock);

	if (count) {
		int i;
		/* mempool + GFP_NOIO: this allocation blocks but cannot fail */
		struct kcopyd_job *sub_job = mempool_alloc(job->kc->job_pool,
							   GFP_NOIO);

		/* clone the parent, then narrow the regions to this chunk */
		*sub_job = *job;
		sub_job->source.sector += progress;
		sub_job->source.count = count;

		for (i = 0; i < job->num_dests; i++) {
			sub_job->dests[i].sector += progress;
			sub_job->dests[i].count = count;
		}

		sub_job->fn = segment_complete;
		sub_job->context = job;
		dispatch_job(sub_job);

	} else if (atomic_dec_and_test(&job->sub_jobs)) {

		/*
		 * To avoid a race we must keep the job around
		 * until after the notify function has completed.
		 * Otherwise the client may try and stop the job
		 * after we've completed.
		 */
		job->fn(read_err, write_err, job->context);
		mempool_free(job, job->kc->job_pool);
	}
}

/*
 * Create some little jobs that will do the move between
 * them.
 */
#define SPLIT_COUNT 8
static void split_job(struct kcopyd_job *job)
{
	int i;

	/* seed SPLIT_COUNT in-flight sub jobs by faking their completion */
	atomic_set(&job->sub_jobs, SPLIT_COUNT);
	for (i = 0; i < SPLIT_COUNT; i++)
		segment_complete(0, 0u, job);
}

/*
 * Exported entry point: asynchronously copy @from to the @num_dests
 * regions in @dests, calling @fn(read_err, write_err, @context) when
 * the whole copy has finished.
 */
int dm_kcopyd_copy(struct dm_kcopyd_client *kc, struct dm_io_region *from,
		   unsigned int num_dests, struct dm_io_region *dests,
		   unsigned int flags, dm_kcopyd_notify_fn fn, void *context)
{
	struct kcopyd_job *job;

	/*
	 * Allocate a new job.
	 */
	job = mempool_alloc(kc->job_pool, GFP_NOIO);

	/*
	 * set up for the read.
	 */
	job->kc = kc;
	job->flags = flags;
	job->read_err = 0;
	job->write_err = 0;
	job->rw = READ;

	job->source = *from;

	job->num_dests = num_dests;
	memcpy(&job->dests, dests, sizeof(*dests) * num_dests);

	job->offset = 0;
	job->nr_pages = 0;
	job->pages = NULL;

	job->fn = fn;
	job->context = context;

	/* small copies go straight through; large ones get split up */
	if (job->source.count < SUB_JOB_SIZE)
		dispatch_job(job);

	else {
		mutex_init(&job->lock);
		job->progress = 0;
		split_job(job);
	}

	return 0;
}
EXPORT_SYMBOL(dm_kcopyd_copy);

/*
 * Cancels a kcopyd job, eg. someone might be deactivating a
 * mirror.
 */
#if 0
int kcopyd_cancel(struct kcopyd_job *job, int block)
{
	/* FIXME: finish */
	return -1;
}
#endif  /*  0  */

/*-----------------------------------------------------------------
 * Client setup
 *---------------------------------------------------------------*/
int dm_kcopyd_client_create(unsigned int nr_pages,
			    struct dm_kcopyd_client **result)
{
	int r = -ENOMEM;
	struct dm_kcopyd_client *kc;

	kc = kmalloc(sizeof(*kc), GFP_KERNEL);
	if (!kc)
		return -ENOMEM;

	spin_lock_init(&kc->lock);
	spin_lock_init(&kc->job_lock);
	INIT_LIST_HEAD(&kc->complete_jobs);
	INIT_LIST_HEAD(&kc->io_jobs);
	INIT_LIST_HEAD(&kc->pages_jobs);

	kc->job_pool = mempool_create_slab_pool(MIN_JOBS, _job_cache);
	if (!kc->job_pool)
		goto bad_slab;

	INIT_WORK(&kc->kcopyd_work, do_work);
	kc->kcopyd_wq = create_singlethread_workqueue("kcopyd");
	if (!kc->kcopyd_wq)
		goto bad_workqueue;

	kc->pages = NULL;
	kc->nr_pages = kc->nr_free_pages = 0;
	r = client_alloc_pages(kc, nr_pages);
	if (r)
		goto bad_client_pages;

	kc->io_client = dm_io_client_create(nr_pages);
	if (IS_ERR(kc->io_client)) {
		r = PTR_ERR(kc->io_client);
		goto bad_io_client;
	}

	init_waitqueue_head(&kc->destroyq);
	atomic_set(&kc->nr_jobs, 0);

	*result = kc;
	return 0;

	/* unwind in reverse order of construction */
bad_io_client:
	client_free_pages(kc);
bad_client_pages:
	destroy_workqueue(kc->kcopyd_wq);
bad_workqueue:
	mempool_destroy(kc->job_pool);
bad_slab:
	kfree(kc);

	return r;
}
EXPORT_SYMBOL(dm_kcopyd_client_create);

void dm_kcopyd_client_destroy(struct dm_kcopyd_client *kc)
{
	/* Wait for completion of all jobs submitted by this client. */
	wait_event(kc->destroyq, !atomic_read(&kc->nr_jobs));

	/* with nr_jobs at zero all three lists must already be drained */
	BUG_ON(!list_empty(&kc->complete_jobs));
	BUG_ON(!list_empty(&kc->io_jobs));
	BUG_ON(!list_empty(&kc->pages_jobs));
	destroy_workqueue(kc->kcopyd_wq);
	dm_io_client_destroy(kc->io_client);
	client_free_pages(kc);
	mempool_destroy(kc->job_pool);
	kfree(kc);
}
EXPORT_SYMBOL(dm_kcopyd_client_destroy);