Linux kernel mirror (for testing): git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kcopyd.c at v2.6.12-rc3 (687 lines, 14 kB)
/*
 * Copyright (C) 2002 Sistina Software (UK) Limited.
 *
 * This file is released under the GPL.
 *
 * Kcopyd provides a simple interface for copying an area of one
 * block-device to one or more other block-devices, with an asynchronous
 * completion notification.
 */

#include <asm/atomic.h>

#include <linux/blkdev.h>
#include <linux/config.h>
#include <linux/fs.h>
#include <linux/init.h>
#include <linux/list.h>
#include <linux/mempool.h>
#include <linux/module.h>
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/vmalloc.h>
#include <linux/workqueue.h>

#include "kcopyd.h"

static struct workqueue_struct *_kcopyd_wq;
static struct work_struct _kcopyd_work;

static inline void wake(void)
{
	queue_work(_kcopyd_wq, &_kcopyd_work);
}

/*-----------------------------------------------------------------
 * Each kcopyd client has its own little pool of preallocated
 * pages for kcopyd io.
 *---------------------------------------------------------------*/
struct kcopyd_client {
	struct list_head list;

	spinlock_t lock;
	struct page_list *pages;
	unsigned int nr_pages;
	unsigned int nr_free_pages;
};

static struct page_list *alloc_pl(void)
{
	struct page_list *pl;

	pl = kmalloc(sizeof(*pl), GFP_KERNEL);
	if (!pl)
		return NULL;

	pl->page = alloc_page(GFP_KERNEL);
	if (!pl->page) {
		kfree(pl);
		return NULL;
	}

	return pl;
}

static void free_pl(struct page_list *pl)
{
	__free_page(pl->page);
	kfree(pl);
}

static int kcopyd_get_pages(struct kcopyd_client *kc,
			    unsigned int nr, struct page_list **pages)
{
	struct page_list *pl;

	spin_lock(&kc->lock);
	if (kc->nr_free_pages < nr) {
		spin_unlock(&kc->lock);
		return -ENOMEM;
	}

	kc->nr_free_pages -= nr;

	/* Walk nr entries along the free list; *pages keeps the head. */
	for (*pages = pl = kc->pages; --nr; pl = pl->next)
		;

	/* Detach the first nr entries from the free list. */
	kc->pages = pl->next;
	pl->next = NULL;

	spin_unlock(&kc->lock);

	return 0;
}

static void kcopyd_put_pages(struct kcopyd_client *kc, struct page_list *pl)
{
	struct page_list *cursor;

	spin_lock(&kc->lock);

	/* Count the chain and find its tail... */
	for (cursor = pl; cursor->next; cursor = cursor->next)
		kc->nr_free_pages++;

	/* ...then count the tail itself and splice onto the free list. */
	kc->nr_free_pages++;
	cursor->next = kc->pages;
	kc->pages = pl;
	spin_unlock(&kc->lock);
}

/*
 * These three functions resize the page pool.
 */
static void drop_pages(struct page_list *pl)
{
	struct page_list *next;

	while (pl) {
		next = pl->next;
		free_pl(pl);
		pl = next;
	}
}

static int client_alloc_pages(struct kcopyd_client *kc, unsigned int nr)
{
	unsigned int i;
	struct page_list *pl = NULL, *next;

	for (i = 0; i < nr; i++) {
		next = alloc_pl();
		if (!next) {
			if (pl)
				drop_pages(pl);
			return -ENOMEM;
		}
		next->next = pl;
		pl = next;
	}

	kcopyd_put_pages(kc, pl);
	kc->nr_pages += nr;
	return 0;
}

static void client_free_pages(struct kcopyd_client *kc)
{
	BUG_ON(kc->nr_free_pages != kc->nr_pages);
	drop_pages(kc->pages);
	kc->pages = NULL;
	kc->nr_free_pages = kc->nr_pages = 0;
}
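/*
 * Illustrative sketch, not part of the original file: how a caller
 * might borrow pages from a client's pool and hand them back.  The
 * helper name and the count of 4 are invented for the example.
 */
#if 0	/* example only */
static int example_borrow_pages(struct kcopyd_client *kc)
{
	struct page_list *pl;
	int r;

	r = kcopyd_get_pages(kc, 4, &pl);
	if (r)
		return r;	/* -ENOMEM: fewer than 4 pages free */

	/* ... use the page chain for io ... */

	/* Return the whole chain to the free list. */
	kcopyd_put_pages(kc, pl);
	return 0;
}
#endif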
/*-----------------------------------------------------------------
 * kcopyd_jobs need to be allocated by the *clients* of kcopyd,
 * for this reason we use a mempool to prevent the client from
 * ever having to do io (which could cause a deadlock).
 *---------------------------------------------------------------*/
struct kcopyd_job {
	struct kcopyd_client *kc;
	struct list_head list;
	unsigned long flags;

	/*
	 * Error state of the job.
	 */
	int read_err;
	unsigned int write_err;

	/*
	 * Either READ or WRITE
	 */
	int rw;
	struct io_region source;

	/*
	 * The destinations for the transfer.
	 */
	unsigned int num_dests;
	struct io_region dests[KCOPYD_MAX_REGIONS];

	sector_t offset;
	unsigned int nr_pages;
	struct page_list *pages;

	/*
	 * Set this to ensure you are notified when the job has
	 * completed.  'context' is for callback to use.
	 */
	kcopyd_notify_fn fn;
	void *context;

	/*
	 * These fields are only used if the job has been split
	 * into more manageable parts.
	 */
	struct semaphore lock;
	atomic_t sub_jobs;
	sector_t progress;
};

/* FIXME: this should scale with the number of pages */
#define MIN_JOBS 512

static kmem_cache_t *_job_cache;
static mempool_t *_job_pool;

/*
 * We maintain three lists of jobs:
 *
 * i)   jobs waiting for pages
 * ii)  jobs that have pages, and are waiting for the io to be issued.
 * iii) jobs that have completed.
 *
 * All three of these are protected by job_lock.
 */
static DEFINE_SPINLOCK(_job_lock);

static LIST_HEAD(_complete_jobs);
static LIST_HEAD(_io_jobs);
static LIST_HEAD(_pages_jobs);

static int jobs_init(void)
{
	_job_cache = kmem_cache_create("kcopyd-jobs",
				       sizeof(struct kcopyd_job),
				       __alignof__(struct kcopyd_job),
				       0, NULL, NULL);
	if (!_job_cache)
		return -ENOMEM;

	_job_pool = mempool_create(MIN_JOBS, mempool_alloc_slab,
				   mempool_free_slab, _job_cache);
	if (!_job_pool) {
		kmem_cache_destroy(_job_cache);
		return -ENOMEM;
	}

	return 0;
}

static void jobs_exit(void)
{
	BUG_ON(!list_empty(&_complete_jobs));
	BUG_ON(!list_empty(&_io_jobs));
	BUG_ON(!list_empty(&_pages_jobs));

	mempool_destroy(_job_pool);
	kmem_cache_destroy(_job_cache);
	_job_pool = NULL;
	_job_cache = NULL;
}

/*
 * Functions to push a job onto the tail of a given job list, and
 * to pop one off the head.
 */
static inline struct kcopyd_job *pop(struct list_head *jobs)
{
	struct kcopyd_job *job = NULL;
	unsigned long flags;

	spin_lock_irqsave(&_job_lock, flags);

	if (!list_empty(jobs)) {
		job = list_entry(jobs->next, struct kcopyd_job, list);
		list_del(&job->list);
	}
	spin_unlock_irqrestore(&_job_lock, flags);

	return job;
}

static inline void push(struct list_head *jobs, struct kcopyd_job *job)
{
	unsigned long flags;

	spin_lock_irqsave(&_job_lock, flags);
	list_add_tail(&job->list, jobs);
	spin_unlock_irqrestore(&_job_lock, flags);
}
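/*
 * Illustrative sketch, not part of the original file: a minimal
 * kcopyd_notify_fn.  read_err is a boolean, write_err carries one
 * error bit per destination.  The callback name and the use of a
 * completion (assumes <linux/completion.h>) are invented for the
 * example.
 */
#if 0	/* example only */
static void example_copy_done(int read_err, unsigned int write_err,
			      void *context)
{
	struct completion *c = context;

	if (read_err || write_err)
		printk(KERN_ERR "kcopyd example: copy failed "
		       "(read_err=%d, write_err=0x%x)\n",
		       read_err, write_err);
	complete(c);
}
#endif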
/*
 * These three functions process 1 item from the corresponding
 * job list.
 *
 * They return:
 * < 0: error
 *   0: success
 * > 0: can't process yet.
 */
static int run_complete_job(struct kcopyd_job *job)
{
	void *context = job->context;
	int read_err = job->read_err;
	unsigned int write_err = job->write_err;
	kcopyd_notify_fn fn = job->fn;

	kcopyd_put_pages(job->kc, job->pages);
	mempool_free(job, _job_pool);
	fn(read_err, write_err, context);
	return 0;
}

static void complete_io(unsigned long error, void *context)
{
	struct kcopyd_job *job = (struct kcopyd_job *) context;

	if (error) {
		if (job->rw == WRITE)
			/* accumulate per-destination error bits */
			job->write_err |= error;
		else
			job->read_err = 1;

		if (!test_bit(KCOPYD_IGNORE_ERROR, &job->flags)) {
			push(&_complete_jobs, job);
			wake();
			return;
		}
	}

	if (job->rw == WRITE)
		push(&_complete_jobs, job);

	else {
		/* The read side is done; resubmit the job as a write. */
		job->rw = WRITE;
		push(&_io_jobs, job);
	}

	wake();
}

/*
 * Request io on the pages reserved for a particular job.
 */
static int run_io_job(struct kcopyd_job *job)
{
	int r;

	if (job->rw == READ)
		r = dm_io_async(1, &job->source, job->rw,
				job->pages,
				job->offset, complete_io, job);

	else
		r = dm_io_async(job->num_dests, job->dests, job->rw,
				job->pages,
				job->offset, complete_io, job);

	return r;
}

static int run_pages_job(struct kcopyd_job *job)
{
	int r;

	job->nr_pages = dm_div_up(job->dests[0].count + job->offset,
				  PAGE_SIZE >> 9);
	r = kcopyd_get_pages(job->kc, job->nr_pages, &job->pages);
	if (!r) {
		/* this job is ready for io */
		push(&_io_jobs, job);
		return 0;
	}

	if (r == -ENOMEM)
		/* can't complete now */
		return 1;

	return r;
}

/*
 * Run through a list for as long as possible.  Returns the count
 * of successful jobs.
 */
static int process_jobs(struct list_head *jobs, int (*fn) (struct kcopyd_job *))
{
	struct kcopyd_job *job;
	int r, count = 0;

	while ((job = pop(jobs))) {

		r = fn(job);

		if (r < 0) {
			/* error this rogue job */
			if (job->rw == WRITE)
				job->write_err = (unsigned int) -1;
			else
				job->read_err = 1;
			push(&_complete_jobs, job);
			break;
		}

		if (r > 0) {
			/*
			 * We couldn't service this job ATM, so
			 * push this job back onto the list.
			 */
			push(jobs, job);
			break;
		}

		count++;
	}

	return count;
}

/*
 * kcopyd does this every time it's woken up.
 */
static void do_work(void *ignored)
{
	/*
	 * The order that these are called is *very* important.
	 * complete jobs can free some pages for pages jobs.
	 * Pages jobs when successful will jump onto the io jobs
	 * list.  io jobs call wake when they complete and it all
	 * starts again.
	 */
	process_jobs(&_complete_jobs, run_complete_job);
	process_jobs(&_pages_jobs, run_pages_job);
	process_jobs(&_io_jobs, run_io_job);
}

/*
 * If we are copying a small region we just dispatch a single job
 * to do the copy, otherwise the io has to be split up into many
 * jobs.
 */
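/*
 * Worked example, not in the original file: with SUB_JOB_SIZE = 128
 * sectors (64 KiB) and SPLIT_COUNT = 8, defined below, a 1000-sector
 * copy is carved into ceil(1000 / 128) = 8 sub-jobs: seven of 128
 * sectors and a final one of 104.  split_job() primes up to eight
 * sub-jobs at once; each completion grabs the next chunk via
 * segment_complete() until job->progress reaches source.count.
 */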
static void dispatch_job(struct kcopyd_job *job)
{
	push(&_pages_jobs, job);
	wake();
}

#define SUB_JOB_SIZE 128
static void segment_complete(int read_err,
			     unsigned int write_err, void *context)
{
	/* FIXME: tidy this function */
	sector_t progress = 0;
	sector_t count = 0;
	struct kcopyd_job *job = (struct kcopyd_job *) context;

	down(&job->lock);

	/* update the error */
	if (read_err)
		job->read_err = 1;

	if (write_err)
		/* accumulate error bits, don't mask them */
		job->write_err |= write_err;

	/*
	 * Only dispatch more work if there hasn't been an error.
	 */
	if ((!job->read_err && !job->write_err) ||
	    test_bit(KCOPYD_IGNORE_ERROR, &job->flags)) {
		/* get the next chunk of work */
		progress = job->progress;
		count = job->source.count - progress;
		if (count) {
			if (count > SUB_JOB_SIZE)
				count = SUB_JOB_SIZE;

			job->progress += count;
		}
	}
	up(&job->lock);

	if (count) {
		int i;
		struct kcopyd_job *sub_job = mempool_alloc(_job_pool, GFP_NOIO);

		*sub_job = *job;
		sub_job->source.sector += progress;
		sub_job->source.count = count;

		for (i = 0; i < job->num_dests; i++) {
			sub_job->dests[i].sector += progress;
			sub_job->dests[i].count = count;
		}

		sub_job->fn = segment_complete;
		sub_job->context = job;
		dispatch_job(sub_job);

	} else if (atomic_dec_and_test(&job->sub_jobs)) {

		/*
		 * To avoid a race we must keep the job around
		 * until after the notify function has completed.
		 * Otherwise the client may try and stop the job
		 * after we've completed.
		 */
		job->fn(read_err, write_err, job->context);
		mempool_free(job, _job_pool);
	}
}

/*
 * Create some little jobs that will share the move between
 * them.
 */
#define SPLIT_COUNT 8
static void split_job(struct kcopyd_job *job)
{
	int i;

	atomic_set(&job->sub_jobs, SPLIT_COUNT);
	for (i = 0; i < SPLIT_COUNT; i++)
		segment_complete(0, 0u, job);
}

int kcopyd_copy(struct kcopyd_client *kc, struct io_region *from,
		unsigned int num_dests, struct io_region *dests,
		unsigned int flags, kcopyd_notify_fn fn, void *context)
{
	struct kcopyd_job *job;

	/*
	 * Allocate a new job.
	 */
	job = mempool_alloc(_job_pool, GFP_NOIO);

	/*
	 * set up for the read.
	 */
	job->kc = kc;
	job->flags = flags;
	job->read_err = 0;
	job->write_err = 0;
	job->rw = READ;

	job->source = *from;

	job->num_dests = num_dests;
	memcpy(&job->dests, dests, sizeof(*dests) * num_dests);

	job->offset = 0;
	job->nr_pages = 0;
	job->pages = NULL;

	job->fn = fn;
	job->context = context;

	if (job->source.count < SUB_JOB_SIZE)
		dispatch_job(job);

	else {
		init_MUTEX(&job->lock);
		job->progress = 0;
		split_job(job);
	}

	return 0;
}
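/*
 * Illustrative sketch, not part of the original file: submitting a
 * single-destination copy.  The helper name, the 1024-sector count
 * and the zero offsets are invented; 'done' would be a
 * kcopyd_notify_fn such as the example sketched earlier.
 */
#if 0	/* example only */
static int example_submit(struct kcopyd_client *kc,
			  struct block_device *src_bdev,
			  struct block_device *dst_bdev,
			  kcopyd_notify_fn done, void *context)
{
	struct io_region from, to;

	from.bdev = src_bdev;
	from.sector = 0;
	from.count = 1024;	/* 512 KiB in 512-byte sectors */

	to.bdev = dst_bdev;
	to.sector = 0;
	to.count = from.count;

	/* One source, one destination, no flags. */
	return kcopyd_copy(kc, &from, 1, &to, 0, done, context);
}
#endif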
/*
 * Cancels a kcopyd job, eg. someone might be deactivating a
 * mirror.
 */
int kcopyd_cancel(struct kcopyd_job *job, int block)
{
	/* FIXME: finish */
	return -1;
}

/*-----------------------------------------------------------------
 * Unit setup
 *---------------------------------------------------------------*/
static DECLARE_MUTEX(_client_lock);
static LIST_HEAD(_clients);

static void client_add(struct kcopyd_client *kc)
{
	down(&_client_lock);
	list_add(&kc->list, &_clients);
	up(&_client_lock);
}

static void client_del(struct kcopyd_client *kc)
{
	down(&_client_lock);
	list_del(&kc->list);
	up(&_client_lock);
}

static DECLARE_MUTEX(kcopyd_init_lock);
static int kcopyd_clients = 0;

static int kcopyd_init(void)
{
	int r;

	down(&kcopyd_init_lock);

	if (kcopyd_clients) {
		/* Already initialized. */
		kcopyd_clients++;
		up(&kcopyd_init_lock);
		return 0;
	}

	r = jobs_init();
	if (r) {
		up(&kcopyd_init_lock);
		return r;
	}

	_kcopyd_wq = create_singlethread_workqueue("kcopyd");
	if (!_kcopyd_wq) {
		jobs_exit();
		up(&kcopyd_init_lock);
		return -ENOMEM;
	}

	kcopyd_clients++;
	INIT_WORK(&_kcopyd_work, do_work, NULL);
	up(&kcopyd_init_lock);
	return 0;
}

static void kcopyd_exit(void)
{
	down(&kcopyd_init_lock);
	kcopyd_clients--;
	if (!kcopyd_clients) {
		jobs_exit();
		destroy_workqueue(_kcopyd_wq);
		_kcopyd_wq = NULL;
	}
	up(&kcopyd_init_lock);
}

int kcopyd_client_create(unsigned int nr_pages, struct kcopyd_client **result)
{
	int r = 0;
	struct kcopyd_client *kc;

	r = kcopyd_init();
	if (r)
		return r;

	kc = kmalloc(sizeof(*kc), GFP_KERNEL);
	if (!kc) {
		kcopyd_exit();
		return -ENOMEM;
	}

	spin_lock_init(&kc->lock);
	kc->pages = NULL;
	kc->nr_pages = kc->nr_free_pages = 0;
	r = client_alloc_pages(kc, nr_pages);
	if (r) {
		kfree(kc);
		kcopyd_exit();
		return r;
	}

	r = dm_io_get(nr_pages);
	if (r) {
		client_free_pages(kc);
		kfree(kc);
		kcopyd_exit();
		return r;
	}

	client_add(kc);
	*result = kc;
	return 0;
}

void kcopyd_client_destroy(struct kcopyd_client *kc)
{
	dm_io_put(kc->nr_pages);
	client_free_pages(kc);
	client_del(kc);
	kfree(kc);
	kcopyd_exit();
}

EXPORT_SYMBOL(kcopyd_client_create);
EXPORT_SYMBOL(kcopyd_client_destroy);
EXPORT_SYMBOL(kcopyd_copy);
EXPORT_SYMBOL(kcopyd_cancel);
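/*
 * Illustrative sketch, not part of the original file: the full client
 * lifecycle around the exported API.  The nr_pages value and helper
 * name are invented; example_copy_done is the hypothetical callback
 * sketched earlier, and the completion assumes <linux/completion.h>.
 */
#if 0	/* example only */
static int example_lifecycle(struct io_region *from, struct io_region *to)
{
	struct kcopyd_client *kc;
	DECLARE_COMPLETION(done);
	int r;

	/* Reserve 64 pages for this client's io. */
	r = kcopyd_client_create(64, &kc);
	if (r)
		return r;

	r = kcopyd_copy(kc, from, 1, to, 0, example_copy_done, &done);
	if (!r)
		wait_for_completion(&done);

	kcopyd_client_destroy(kc);
	return r;
}
#endif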