Linux kernel mirror (for testing): git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kcopyd.c at v2.6.24-rc1 (708 lines, 14 kB)
/*
 * Copyright (C) 2002 Sistina Software (UK) Limited.
 * Copyright (C) 2006 Red Hat GmbH
 *
 * This file is released under the GPL.
 *
 * Kcopyd provides a simple interface for copying an area of one
 * block-device to one or more other block-devices, with an asynchronous
 * completion notification.
 */

#include <asm/types.h>
#include <asm/atomic.h>

#include <linux/blkdev.h>
#include <linux/fs.h>
#include <linux/init.h>
#include <linux/list.h>
#include <linux/mempool.h>
#include <linux/module.h>
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/vmalloc.h>
#include <linux/workqueue.h>
#include <linux/mutex.h>

#include "kcopyd.h"

static struct workqueue_struct *_kcopyd_wq;
static struct work_struct _kcopyd_work;

static void wake(void)
{
        queue_work(_kcopyd_wq, &_kcopyd_work);
}

/*-----------------------------------------------------------------
 * Each kcopyd client has its own little pool of preallocated
 * pages for kcopyd io.
 *---------------------------------------------------------------*/
struct kcopyd_client {
        struct list_head list;

        spinlock_t lock;
        struct page_list *pages;
        unsigned int nr_pages;
        unsigned int nr_free_pages;

        struct dm_io_client *io_client;

        wait_queue_head_t destroyq;
        atomic_t nr_jobs;
};

static struct page_list *alloc_pl(void)
{
        struct page_list *pl;

        pl = kmalloc(sizeof(*pl), GFP_KERNEL);
        if (!pl)
                return NULL;

        pl->page = alloc_page(GFP_KERNEL);
        if (!pl->page) {
                kfree(pl);
                return NULL;
        }

        return pl;
}

static void free_pl(struct page_list *pl)
{
        __free_page(pl->page);
        kfree(pl);
}

static int kcopyd_get_pages(struct kcopyd_client *kc,
                            unsigned int nr, struct page_list **pages)
{
        struct page_list *pl;

        spin_lock(&kc->lock);
        if (kc->nr_free_pages < nr) {
                spin_unlock(&kc->lock);
                return -ENOMEM;
        }

        kc->nr_free_pages -= nr;
        for (*pages = pl = kc->pages; --nr; pl = pl->next)
                ;

        kc->pages = pl->next;
        pl->next = NULL;

        spin_unlock(&kc->lock);

        return 0;
}

static void kcopyd_put_pages(struct kcopyd_client *kc, struct page_list *pl)
{
        struct page_list *cursor;

        spin_lock(&kc->lock);
        for (cursor = pl; cursor->next; cursor = cursor->next)
                kc->nr_free_pages++;

        kc->nr_free_pages++;
        cursor->next = kc->pages;
        kc->pages = pl;
        spin_unlock(&kc->lock);
}

/*
 * These three functions resize the page pool.
 */
static void drop_pages(struct page_list *pl)
{
        struct page_list *next;

        while (pl) {
                next = pl->next;
                free_pl(pl);
                pl = next;
        }
}

static int client_alloc_pages(struct kcopyd_client *kc, unsigned int nr)
{
        unsigned int i;
        struct page_list *pl = NULL, *next;

        for (i = 0; i < nr; i++) {
                next = alloc_pl();
                if (!next) {
                        if (pl)
                                drop_pages(pl);
                        return -ENOMEM;
                }
                next->next = pl;
                pl = next;
        }

        kcopyd_put_pages(kc, pl);
        kc->nr_pages += nr;
        return 0;
}

static void client_free_pages(struct kcopyd_client *kc)
{
        BUG_ON(kc->nr_free_pages != kc->nr_pages);
        drop_pages(kc->pages);
        kc->pages = NULL;
        kc->nr_free_pages = kc->nr_pages = 0;
}

/*-----------------------------------------------------------------
 * kcopyd_jobs need to be allocated by the *clients* of kcopyd,
 * for this reason we use a mempool to prevent the client from
 * ever having to do io (which could cause a deadlock).
 *---------------------------------------------------------------*/
struct kcopyd_job {
        struct kcopyd_client *kc;
        struct list_head list;
        unsigned long flags;

        /*
         * Error state of the job.
         */
        int read_err;
        unsigned int write_err;

        /*
         * Either READ or WRITE
         */
        int rw;
        struct io_region source;

        /*
         * The destinations for the transfer.
         */
        unsigned int num_dests;
        struct io_region dests[KCOPYD_MAX_REGIONS];

        sector_t offset;
        unsigned int nr_pages;
        struct page_list *pages;

        /*
         * Set this to ensure you are notified when the job has
         * completed. 'context' is for callback to use.
         */
        kcopyd_notify_fn fn;
        void *context;

        /*
         * These fields are only used if the job has been split
         * into more manageable parts.
         */
        struct mutex lock;
        atomic_t sub_jobs;
        sector_t progress;
};

/* FIXME: this should scale with the number of pages */
#define MIN_JOBS 512

static struct kmem_cache *_job_cache;
static mempool_t *_job_pool;

/*
 * We maintain three lists of jobs:
 *
 * i)   jobs waiting for pages
 * ii)  jobs that have pages, and are waiting for the io to be issued.
 * iii) jobs that have completed.
 *
 * All three of these are protected by job_lock.
 */
static DEFINE_SPINLOCK(_job_lock);

static LIST_HEAD(_complete_jobs);
static LIST_HEAD(_io_jobs);
static LIST_HEAD(_pages_jobs);

static int jobs_init(void)
{
        _job_cache = KMEM_CACHE(kcopyd_job, 0);
        if (!_job_cache)
                return -ENOMEM;

        _job_pool = mempool_create_slab_pool(MIN_JOBS, _job_cache);
        if (!_job_pool) {
                kmem_cache_destroy(_job_cache);
                return -ENOMEM;
        }

        return 0;
}

static void jobs_exit(void)
{
        BUG_ON(!list_empty(&_complete_jobs));
        BUG_ON(!list_empty(&_io_jobs));
        BUG_ON(!list_empty(&_pages_jobs));

        mempool_destroy(_job_pool);
        kmem_cache_destroy(_job_cache);
        _job_pool = NULL;
        _job_cache = NULL;
}

/*
 * Functions to push and pop a job onto the head of a given job
 * list.
 */
static struct kcopyd_job *pop(struct list_head *jobs)
{
        struct kcopyd_job *job = NULL;
        unsigned long flags;

        spin_lock_irqsave(&_job_lock, flags);

        if (!list_empty(jobs)) {
                job = list_entry(jobs->next, struct kcopyd_job, list);
                list_del(&job->list);
        }
        spin_unlock_irqrestore(&_job_lock, flags);

        return job;
}

static void push(struct list_head *jobs, struct kcopyd_job *job)
{
        unsigned long flags;

        spin_lock_irqsave(&_job_lock, flags);
        list_add_tail(&job->list, jobs);
        spin_unlock_irqrestore(&_job_lock, flags);
}

/*
 * These three functions process 1 item from the corresponding
 * job list.
 *
 * They return:
 * < 0: error
 *   0: success
 * > 0: can't process yet.
 */
static int run_complete_job(struct kcopyd_job *job)
{
        void *context = job->context;
        int read_err = job->read_err;
        unsigned int write_err = job->write_err;
        kcopyd_notify_fn fn = job->fn;
        struct kcopyd_client *kc = job->kc;

        kcopyd_put_pages(kc, job->pages);
        mempool_free(job, _job_pool);
        fn(read_err, write_err, context);

        if (atomic_dec_and_test(&kc->nr_jobs))
                wake_up(&kc->destroyq);

        return 0;
}

static void complete_io(unsigned long error, void *context)
{
        struct kcopyd_job *job = (struct kcopyd_job *) context;

        if (error) {
                if (job->rw == WRITE)
                        job->write_err |= error;
                else
                        job->read_err = 1;

                if (!test_bit(KCOPYD_IGNORE_ERROR, &job->flags)) {
                        push(&_complete_jobs, job);
                        wake();
                        return;
                }
        }

        if (job->rw == WRITE)
                push(&_complete_jobs, job);

        else {
                job->rw = WRITE;
                push(&_io_jobs, job);
        }

        wake();
}

/*
 * Request io on as many buffer heads as we can currently get for
 * a particular job.
 */
static int run_io_job(struct kcopyd_job *job)
{
        int r;
        struct dm_io_request io_req = {
                .bi_rw = job->rw,
                .mem.type = DM_IO_PAGE_LIST,
                .mem.ptr.pl = job->pages,
                .mem.offset = job->offset,
                .notify.fn = complete_io,
                .notify.context = job,
                .client = job->kc->io_client,
        };

        if (job->rw == READ)
                r = dm_io(&io_req, 1, &job->source, NULL);
        else
                r = dm_io(&io_req, job->num_dests, job->dests, NULL);

        return r;
}

static int run_pages_job(struct kcopyd_job *job)
{
        int r;

        job->nr_pages = dm_div_up(job->dests[0].count + job->offset,
                                  PAGE_SIZE >> 9);
        r = kcopyd_get_pages(job->kc, job->nr_pages, &job->pages);
        if (!r) {
                /* this job is ready for io */
                push(&_io_jobs, job);
                return 0;
        }

        if (r == -ENOMEM)
                /* can't complete now */
                return 1;

        return r;
}

/*
 * Run through a list for as long as possible. Returns the count
 * of successful jobs.
 */
static int process_jobs(struct list_head *jobs, int (*fn) (struct kcopyd_job *))
{
        struct kcopyd_job *job;
        int r, count = 0;

        while ((job = pop(jobs))) {

                r = fn(job);

                if (r < 0) {
                        /* error this rogue job */
                        if (job->rw == WRITE)
                                job->write_err = (unsigned int) -1;
                        else
                                job->read_err = 1;
                        push(&_complete_jobs, job);
                        break;
                }

                if (r > 0) {
                        /*
                         * We couldn't service this job ATM, so
                         * push this job back onto the list.
                         */
                        push(jobs, job);
                        break;
                }

                count++;
        }

        return count;
}

/*
 * kcopyd does this every time it's woken up.
 */
static void do_work(struct work_struct *ignored)
{
        /*
         * The order that these are called is *very* important.
         * complete jobs can free some pages for pages jobs.
         * Pages jobs when successful will jump onto the io jobs
         * list.  io jobs call wake when they complete and it all
         * starts again.
         */
        process_jobs(&_complete_jobs, run_complete_job);
        process_jobs(&_pages_jobs, run_pages_job);
        process_jobs(&_io_jobs, run_io_job);
}

/*
 * If we are copying a small region we just dispatch a single job
 * to do the copy, otherwise the io has to be split up into many
 * jobs.
 */
static void dispatch_job(struct kcopyd_job *job)
{
        atomic_inc(&job->kc->nr_jobs);
        push(&_pages_jobs, job);
        wake();
}

#define SUB_JOB_SIZE 128
static void segment_complete(int read_err,
                             unsigned int write_err, void *context)
{
        /* FIXME: tidy this function */
        sector_t progress = 0;
        sector_t count = 0;
        struct kcopyd_job *job = (struct kcopyd_job *) context;

        mutex_lock(&job->lock);

        /* update the error */
        if (read_err)
                job->read_err = 1;

        if (write_err)
                job->write_err |= write_err;

        /*
         * Only dispatch more work if there hasn't been an error.
         */
        if ((!job->read_err && !job->write_err) ||
            test_bit(KCOPYD_IGNORE_ERROR, &job->flags)) {
                /* get the next chunk of work */
                progress = job->progress;
                count = job->source.count - progress;
                if (count) {
                        if (count > SUB_JOB_SIZE)
                                count = SUB_JOB_SIZE;

                        job->progress += count;
                }
        }
        mutex_unlock(&job->lock);

        if (count) {
                int i;
                struct kcopyd_job *sub_job = mempool_alloc(_job_pool, GFP_NOIO);

                *sub_job = *job;
                sub_job->source.sector += progress;
                sub_job->source.count = count;

                for (i = 0; i < job->num_dests; i++) {
                        sub_job->dests[i].sector += progress;
                        sub_job->dests[i].count = count;
                }

                sub_job->fn = segment_complete;
                sub_job->context = job;
                dispatch_job(sub_job);

        } else if (atomic_dec_and_test(&job->sub_jobs)) {

                /*
                 * To avoid a race we must keep the job around
                 * until after the notify function has completed.
                 * Otherwise the client may try and stop the job
                 * after we've completed.
                 */
                job->fn(read_err, write_err, job->context);
                mempool_free(job, _job_pool);
        }
}

/*
 * Create some little jobs that will do the move between
 * them.
 */
#define SPLIT_COUNT 8
static void split_job(struct kcopyd_job *job)
{
        int i;

        atomic_set(&job->sub_jobs, SPLIT_COUNT);
        for (i = 0; i < SPLIT_COUNT; i++)
                segment_complete(0, 0u, job);
}

int kcopyd_copy(struct kcopyd_client *kc, struct io_region *from,
                unsigned int num_dests, struct io_region *dests,
                unsigned int flags, kcopyd_notify_fn fn, void *context)
{
        struct kcopyd_job *job;

        /*
         * Allocate a new job.
         */
        job = mempool_alloc(_job_pool, GFP_NOIO);

        /*
         * set up for the read.
         */
        job->kc = kc;
        job->flags = flags;
        job->read_err = 0;
        job->write_err = 0;
        job->rw = READ;

        job->source = *from;

        job->num_dests = num_dests;
        memcpy(&job->dests, dests, sizeof(*dests) * num_dests);

        job->offset = 0;
        job->nr_pages = 0;
        job->pages = NULL;

        job->fn = fn;
        job->context = context;

        if (job->source.count < SUB_JOB_SIZE)
                dispatch_job(job);

        else {
                mutex_init(&job->lock);
                job->progress = 0;
                split_job(job);
        }

        return 0;
}

/*
 * Cancels a kcopyd job, eg. someone might be deactivating a
 * mirror.
 */
#if 0
int kcopyd_cancel(struct kcopyd_job *job, int block)
{
        /* FIXME: finish */
        return -1;
}
#endif /* 0 */

/*-----------------------------------------------------------------
 * Unit setup
 *---------------------------------------------------------------*/
static DEFINE_MUTEX(_client_lock);
static LIST_HEAD(_clients);

static void client_add(struct kcopyd_client *kc)
{
        mutex_lock(&_client_lock);
        list_add(&kc->list, &_clients);
        mutex_unlock(&_client_lock);
}

static void client_del(struct kcopyd_client *kc)
{
        mutex_lock(&_client_lock);
        list_del(&kc->list);
        mutex_unlock(&_client_lock);
}

static DEFINE_MUTEX(kcopyd_init_lock);
static int kcopyd_clients = 0;

static int kcopyd_init(void)
{
        int r;

        mutex_lock(&kcopyd_init_lock);

        if (kcopyd_clients) {
                /* Already initialized. */
                kcopyd_clients++;
                mutex_unlock(&kcopyd_init_lock);
                return 0;
        }

        r = jobs_init();
        if (r) {
                mutex_unlock(&kcopyd_init_lock);
                return r;
        }

        _kcopyd_wq = create_singlethread_workqueue("kcopyd");
        if (!_kcopyd_wq) {
                jobs_exit();
                mutex_unlock(&kcopyd_init_lock);
                return -ENOMEM;
        }

        kcopyd_clients++;
        INIT_WORK(&_kcopyd_work, do_work);
        mutex_unlock(&kcopyd_init_lock);
        return 0;
}

static void kcopyd_exit(void)
{
        mutex_lock(&kcopyd_init_lock);
        kcopyd_clients--;
        if (!kcopyd_clients) {
                jobs_exit();
                destroy_workqueue(_kcopyd_wq);
                _kcopyd_wq = NULL;
        }
        mutex_unlock(&kcopyd_init_lock);
}

int kcopyd_client_create(unsigned int nr_pages, struct kcopyd_client **result)
{
        int r = 0;
        struct kcopyd_client *kc;

        r = kcopyd_init();
        if (r)
                return r;

        kc = kmalloc(sizeof(*kc), GFP_KERNEL);
        if (!kc) {
                kcopyd_exit();
                return -ENOMEM;
        }

        spin_lock_init(&kc->lock);
        kc->pages = NULL;
        kc->nr_pages = kc->nr_free_pages = 0;
        r = client_alloc_pages(kc, nr_pages);
        if (r) {
                kfree(kc);
                kcopyd_exit();
                return r;
        }

        kc->io_client = dm_io_client_create(nr_pages);
        if (IS_ERR(kc->io_client)) {
                r = PTR_ERR(kc->io_client);
                client_free_pages(kc);
                kfree(kc);
                kcopyd_exit();
                return r;
        }

        init_waitqueue_head(&kc->destroyq);
        atomic_set(&kc->nr_jobs, 0);

        client_add(kc);
        *result = kc;
        return 0;
}

void kcopyd_client_destroy(struct kcopyd_client *kc)
{
        /* Wait for completion of all jobs submitted by this client. */
        wait_event(kc->destroyq, !atomic_read(&kc->nr_jobs));

        dm_io_client_destroy(kc->io_client);
        client_free_pages(kc);
        client_del(kc);
        kfree(kc);
        kcopyd_exit();
}

EXPORT_SYMBOL(kcopyd_client_create);
EXPORT_SYMBOL(kcopyd_client_destroy);
EXPORT_SYMBOL(kcopyd_copy);
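
The sketch below is not part of kcopyd.c; it is a minimal, illustrative example of how a caller might use the interface exported above (kcopyd_client_create, kcopyd_copy, kcopyd_client_destroy). The helper names (example_copy, example_copy_done, example_copy_region) and the pool size are invented for the example, and it assumes the struct io_region layout (bdev, sector, count) from dm-io.h of this era and the kcopyd_notify_fn signature implied by segment_complete above; treat it as a sketch under those assumptions rather than code from the kernel tree.

/*
 * Illustrative only -- not part of kcopyd.c.  Copy 'count' sectors from one
 * block device to another via kcopyd and wait for the asynchronous
 * completion callback before tearing the client down.
 */
#include <linux/blkdev.h>
#include <linux/completion.h>
#include <linux/errno.h>

#include "dm-io.h"      /* assumed location of struct io_region (bdev/sector/count) */
#include "kcopyd.h"

/* Hypothetical per-copy context used only by this example. */
struct example_copy {
        struct completion done;
        int read_err;
        unsigned int write_err;
};

/* Matches how this file invokes the callback: fn(read_err, write_err, context). */
static void example_copy_done(int read_err, unsigned int write_err, void *context)
{
        struct example_copy *ec = context;

        ec->read_err = read_err;
        ec->write_err = write_err;
        complete(&ec->done);
}

static int example_copy_region(struct block_device *src_bdev, sector_t src_sector,
                               struct block_device *dst_bdev, sector_t dst_sector,
                               sector_t count)
{
        struct kcopyd_client *kc;
        struct io_region from, to;
        struct example_copy ec;
        int r;

        /* Preallocate a small page pool for this client (64 is arbitrary here). */
        r = kcopyd_client_create(64, &kc);
        if (r)
                return r;

        from.bdev = src_bdev;
        from.sector = src_sector;
        from.count = count;

        to.bdev = dst_bdev;
        to.sector = dst_sector;
        to.count = count;

        init_completion(&ec.done);

        /* One destination, no flags; completion is reported via the callback. */
        r = kcopyd_copy(kc, &from, 1, &to, 0, example_copy_done, &ec);
        if (!r) {
                wait_for_completion(&ec.done);
                if (ec.read_err || ec.write_err)
                        r = -EIO;
        }

        /* Destroy waits for any outstanding jobs before freeing the pool. */
        kcopyd_client_destroy(kc);
        return r;
}

The callback runs from kcopyd's workqueue context (run_complete_job via do_work), so the example hands the result back to the submitting thread with a struct completion instead of doing further work inside the callback.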