Linux kernel mirror (for testing): git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
io-wq.c at v5.6-rc3 (1217 lines, 29 kB)
// SPDX-License-Identifier: GPL-2.0
/*
 * Basic worker thread pool for io_uring
 *
 * Copyright (C) 2019 Jens Axboe
 *
 */
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/errno.h>
#include <linux/sched/signal.h>
#include <linux/mm.h>
#include <linux/mmu_context.h>
#include <linux/sched/mm.h>
#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/kthread.h>
#include <linux/rculist_nulls.h>
#include <linux/fs_struct.h>

#include "io-wq.h"

#define WORKER_IDLE_TIMEOUT	(5 * HZ)

enum {
	IO_WORKER_F_UP		= 1,	/* up and active */
	IO_WORKER_F_RUNNING	= 2,	/* account as running */
	IO_WORKER_F_FREE	= 4,	/* worker on free list */
	IO_WORKER_F_EXITING	= 8,	/* worker exiting */
	IO_WORKER_F_FIXED	= 16,	/* static idle worker */
	IO_WORKER_F_BOUND	= 32,	/* is doing bounded work */
};

enum {
	IO_WQ_BIT_EXIT		= 0,	/* wq exiting */
	IO_WQ_BIT_CANCEL	= 1,	/* cancel work on list */
	IO_WQ_BIT_ERROR		= 2,	/* error on setup */
};

enum {
	IO_WQE_FLAG_STALLED	= 1,	/* stalled on hash */
};

/*
 * One for each thread in a wqe pool
 */
struct io_worker {
	refcount_t ref;
	unsigned flags;
	struct hlist_nulls_node nulls_node;
	struct list_head all_list;
	struct task_struct *task;
	struct io_wqe *wqe;

	struct io_wq_work *cur_work;
	spinlock_t lock;

	struct rcu_head rcu;
	struct mm_struct *mm;
	const struct cred *cur_creds;
	const struct cred *saved_creds;
	struct files_struct *restore_files;
	struct fs_struct *restore_fs;
};

#if BITS_PER_LONG == 64
#define IO_WQ_HASH_ORDER	6
#else
#define IO_WQ_HASH_ORDER	5
#endif

struct io_wqe_acct {
	unsigned nr_workers;
	unsigned max_workers;
	atomic_t nr_running;
};

enum {
	IO_WQ_ACCT_BOUND,
	IO_WQ_ACCT_UNBOUND,
};

/*
 * Per-node worker thread pool
 */
struct io_wqe {
	struct {
		spinlock_t lock;
		struct io_wq_work_list work_list;
		unsigned long hash_map;
		unsigned flags;
	} ____cacheline_aligned_in_smp;

	int node;
	struct io_wqe_acct acct[2];

	struct hlist_nulls_head free_list;
	struct list_head all_list;

	struct io_wq *wq;
};

/*
 * Per io_wq state
 */
struct io_wq {
	struct io_wqe **wqes;
	unsigned long state;

	get_work_fn *get_work;
	put_work_fn *put_work;

	struct task_struct *manager;
	struct user_struct *user;
	refcount_t refs;
	struct completion done;

	refcount_t use_refs;
};

static bool io_worker_get(struct io_worker *worker)
{
	return refcount_inc_not_zero(&worker->ref);
}

static void io_worker_release(struct io_worker *worker)
{
	if (refcount_dec_and_test(&worker->ref))
		wake_up_process(worker->task);
}

/*
 * Note: drops the wqe->lock if returning true! The caller must re-acquire
 * the lock in that case. Some callers need to restart handling if this
 * happens, so we can't just re-acquire the lock on behalf of the caller.
 */
static bool __io_worker_unuse(struct io_wqe *wqe, struct io_worker *worker)
{
	bool dropped_lock = false;

	if (worker->saved_creds) {
		revert_creds(worker->saved_creds);
		worker->cur_creds = worker->saved_creds = NULL;
	}

	if (current->files != worker->restore_files) {
		__acquire(&wqe->lock);
		spin_unlock_irq(&wqe->lock);
		dropped_lock = true;

		task_lock(current);
		current->files = worker->restore_files;
		task_unlock(current);
	}

	if (current->fs != worker->restore_fs)
		current->fs = worker->restore_fs;

	/*
	 * If we have an active mm, we need to drop the wq lock before unusing
	 * it. If we do, return true and let the caller retry the idle loop.
	 */
	if (worker->mm) {
		if (!dropped_lock) {
			__acquire(&wqe->lock);
			spin_unlock_irq(&wqe->lock);
			dropped_lock = true;
		}
		__set_current_state(TASK_RUNNING);
		set_fs(KERNEL_DS);
		unuse_mm(worker->mm);
		mmput(worker->mm);
		worker->mm = NULL;
	}

	return dropped_lock;
}

static inline struct io_wqe_acct *io_work_get_acct(struct io_wqe *wqe,
						   struct io_wq_work *work)
{
	if (work->flags & IO_WQ_WORK_UNBOUND)
		return &wqe->acct[IO_WQ_ACCT_UNBOUND];

	return &wqe->acct[IO_WQ_ACCT_BOUND];
}

static inline struct io_wqe_acct *io_wqe_get_acct(struct io_wqe *wqe,
						  struct io_worker *worker)
{
	if (worker->flags & IO_WORKER_F_BOUND)
		return &wqe->acct[IO_WQ_ACCT_BOUND];

	return &wqe->acct[IO_WQ_ACCT_UNBOUND];
}

static void io_worker_exit(struct io_worker *worker)
{
	struct io_wqe *wqe = worker->wqe;
	struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker);
	unsigned nr_workers;

	/*
	 * If we're not at zero, someone else is holding a brief reference
	 * to the worker. Wait for that to go away.
	 */
	set_current_state(TASK_INTERRUPTIBLE);
	if (!refcount_dec_and_test(&worker->ref))
		schedule();
	__set_current_state(TASK_RUNNING);

	preempt_disable();
	current->flags &= ~PF_IO_WORKER;
	if (worker->flags & IO_WORKER_F_RUNNING)
		atomic_dec(&acct->nr_running);
	if (!(worker->flags & IO_WORKER_F_BOUND))
		atomic_dec(&wqe->wq->user->processes);
	worker->flags = 0;
	preempt_enable();

	spin_lock_irq(&wqe->lock);
	hlist_nulls_del_rcu(&worker->nulls_node);
	list_del_rcu(&worker->all_list);
	if (__io_worker_unuse(wqe, worker)) {
		__release(&wqe->lock);
		spin_lock_irq(&wqe->lock);
	}
	acct->nr_workers--;
	nr_workers = wqe->acct[IO_WQ_ACCT_BOUND].nr_workers +
			wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers;
	spin_unlock_irq(&wqe->lock);

	/* all workers gone, wq exit can proceed */
	if (!nr_workers && refcount_dec_and_test(&wqe->wq->refs))
		complete(&wqe->wq->done);

	kfree_rcu(worker, rcu);
}

static inline bool io_wqe_run_queue(struct io_wqe *wqe)
	__must_hold(wqe->lock)
{
	if (!wq_list_empty(&wqe->work_list) &&
	    !(wqe->flags & IO_WQE_FLAG_STALLED))
		return true;
	return false;
}

/*
 * Check head of free list for an available worker. If one isn't available,
 * caller must wake up the wq manager to create one.
 */
static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
	__must_hold(RCU)
{
	struct hlist_nulls_node *n;
	struct io_worker *worker;

	n = rcu_dereference(hlist_nulls_first_rcu(&wqe->free_list));
	if (is_a_nulls(n))
		return false;

	worker = hlist_nulls_entry(n, struct io_worker, nulls_node);
	if (io_worker_get(worker)) {
		wake_up_process(worker->task);
		io_worker_release(worker);
		return true;
	}

	return false;
}

/*
 * We need a worker. If we find a free one, we're good. If not, and we're
 * below the max number of workers, wake up the manager to create one.
 */
static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
{
	bool ret;

	/*
	 * Most likely an attempt to queue unbounded work on an io_wq that
	 * wasn't setup with any unbounded workers.
	 */
	WARN_ON_ONCE(!acct->max_workers);

	rcu_read_lock();
	ret = io_wqe_activate_free_worker(wqe);
	rcu_read_unlock();

	if (!ret && acct->nr_workers < acct->max_workers)
		wake_up_process(wqe->wq->manager);
}

static void io_wqe_inc_running(struct io_wqe *wqe, struct io_worker *worker)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker);

	atomic_inc(&acct->nr_running);
}

static void io_wqe_dec_running(struct io_wqe *wqe, struct io_worker *worker)
	__must_hold(wqe->lock)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker);

	if (atomic_dec_and_test(&acct->nr_running) && io_wqe_run_queue(wqe))
		io_wqe_wake_worker(wqe, acct);
}

static void io_worker_start(struct io_wqe *wqe, struct io_worker *worker)
{
	allow_kernel_signal(SIGINT);

	current->flags |= PF_IO_WORKER;

	worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
	worker->restore_files = current->files;
	worker->restore_fs = current->fs;
	io_wqe_inc_running(wqe, worker);
}

/*
 * Worker will start processing some work. Move it to the busy list, if
 * it's currently on the freelist
 */
static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
			     struct io_wq_work *work)
	__must_hold(wqe->lock)
{
	bool worker_bound, work_bound;

	if (worker->flags & IO_WORKER_F_FREE) {
		worker->flags &= ~IO_WORKER_F_FREE;
		hlist_nulls_del_init_rcu(&worker->nulls_node);
	}

	/*
	 * If worker is moving from bound to unbound (or vice versa), then
	 * ensure we update the running accounting.
	 */
	worker_bound = (worker->flags & IO_WORKER_F_BOUND) != 0;
	work_bound = (work->flags & IO_WQ_WORK_UNBOUND) == 0;
	if (worker_bound != work_bound) {
		io_wqe_dec_running(wqe, worker);
		if (work_bound) {
			worker->flags |= IO_WORKER_F_BOUND;
			wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers--;
			wqe->acct[IO_WQ_ACCT_BOUND].nr_workers++;
			atomic_dec(&wqe->wq->user->processes);
		} else {
			worker->flags &= ~IO_WORKER_F_BOUND;
			wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers++;
			wqe->acct[IO_WQ_ACCT_BOUND].nr_workers--;
			atomic_inc(&wqe->wq->user->processes);
		}
		io_wqe_inc_running(wqe, worker);
	}
}

/*
 * No work, worker going to sleep. Move to freelist, and unuse mm if we
 * have one attached. Dropping the mm may potentially sleep, so we drop
 * the lock in that case and return success. Since the caller has to
 * retry the loop in that case (we changed task state), we don't regrab
 * the lock if we return success.
 */
static bool __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker)
	__must_hold(wqe->lock)
{
	if (!(worker->flags & IO_WORKER_F_FREE)) {
		worker->flags |= IO_WORKER_F_FREE;
		hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	}

	return __io_worker_unuse(wqe, worker);
}

static struct io_wq_work *io_get_next_work(struct io_wqe *wqe, unsigned *hash)
	__must_hold(wqe->lock)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work;

	wq_list_for_each(node, prev, &wqe->work_list) {
		work = container_of(node, struct io_wq_work, list);

		/* not hashed, can run anytime */
		if (!(work->flags & IO_WQ_WORK_HASHED)) {
			wq_node_del(&wqe->work_list, node, prev);
			return work;
		}

		/* hashed, can run if not already running */
		*hash = work->flags >> IO_WQ_HASH_SHIFT;
		if (!(wqe->hash_map & BIT_ULL(*hash))) {
			wqe->hash_map |= BIT_ULL(*hash);
			wq_node_del(&wqe->work_list, node, prev);
			return work;
		}
	}

	return NULL;
}

static void io_wq_switch_mm(struct io_worker *worker, struct io_wq_work *work)
{
	if (worker->mm) {
		unuse_mm(worker->mm);
		mmput(worker->mm);
		worker->mm = NULL;
	}
	if (!work->mm) {
		set_fs(KERNEL_DS);
		return;
	}
	if (mmget_not_zero(work->mm)) {
		use_mm(work->mm);
		if (!worker->mm)
			set_fs(USER_DS);
		worker->mm = work->mm;
		/* hang on to this mm */
		work->mm = NULL;
		return;
	}

	/* failed grabbing mm, ensure work gets cancelled */
	work->flags |= IO_WQ_WORK_CANCEL;
}

static void io_wq_switch_creds(struct io_worker *worker,
			       struct io_wq_work *work)
{
	const struct cred *old_creds = override_creds(work->creds);

	worker->cur_creds = work->creds;
	if (worker->saved_creds)
		put_cred(old_creds); /* creds set by previous switch */
	else
		worker->saved_creds = old_creds;
}

static void io_worker_handle_work(struct io_worker *worker)
	__releases(wqe->lock)
{
	struct io_wq_work *work, *old_work = NULL, *put_work = NULL;
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;

	do {
		unsigned hash = -1U;

		/*
		 * If we got some work, mark us as busy. If we didn't, but
		 * the list isn't empty, it means we stalled on hashed work.
		 * Mark us stalled so we don't keep looking for work when we
		 * can't make progress, any work completion or insertion will
		 * clear the stalled flag.
		 */
		work = io_get_next_work(wqe, &hash);
		if (work)
			__io_worker_busy(wqe, worker, work);
		else if (!wq_list_empty(&wqe->work_list))
			wqe->flags |= IO_WQE_FLAG_STALLED;

		spin_unlock_irq(&wqe->lock);
		if (put_work && wq->put_work)
			wq->put_work(old_work);
		if (!work)
			break;
next:
		/* flush any pending signals before assigning new work */
		if (signal_pending(current))
			flush_signals(current);

		cond_resched();

		spin_lock_irq(&worker->lock);
		worker->cur_work = work;
		spin_unlock_irq(&worker->lock);

		if (work->flags & IO_WQ_WORK_CB)
			work->func(&work);

		if (work->files && current->files != work->files) {
			task_lock(current);
			current->files = work->files;
			task_unlock(current);
		}
		if (work->fs && current->fs != work->fs)
			current->fs = work->fs;
		if (work->mm != worker->mm)
			io_wq_switch_mm(worker, work);
		if (worker->cur_creds != work->creds)
			io_wq_switch_creds(worker, work);
		/*
		 * OK to set IO_WQ_WORK_CANCEL even for uncancellable work,
		 * the worker function will do the right thing.
		 */
		if (test_bit(IO_WQ_BIT_CANCEL, &wq->state))
			work->flags |= IO_WQ_WORK_CANCEL;
		if (worker->mm)
			work->flags |= IO_WQ_WORK_HAS_MM;

		if (wq->get_work && !(work->flags & IO_WQ_WORK_INTERNAL)) {
			put_work = work;
			wq->get_work(work);
		}

		old_work = work;
		work->func(&work);

		spin_lock_irq(&worker->lock);
		worker->cur_work = NULL;
		spin_unlock_irq(&worker->lock);

		spin_lock_irq(&wqe->lock);

		if (hash != -1U) {
			wqe->hash_map &= ~BIT_ULL(hash);
			wqe->flags &= ~IO_WQE_FLAG_STALLED;
		}
		if (work && work != old_work) {
			spin_unlock_irq(&wqe->lock);

			if (put_work && wq->put_work) {
				wq->put_work(put_work);
				put_work = NULL;
			}

			/* dependent work not hashed */
			hash = -1U;
			goto next;
		}
	} while (1);
}

static inline void io_worker_spin_for_work(struct io_wqe *wqe)
{
	int i = 0;

	while (++i < 1000) {
		if (io_wqe_run_queue(wqe))
			break;
		if (need_resched())
			break;
		cpu_relax();
	}
}

static int io_wqe_worker(void *data)
{
	struct io_worker *worker = data;
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;
	bool did_work;

	io_worker_start(wqe, worker);

	did_work = false;
	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		set_current_state(TASK_INTERRUPTIBLE);
loop:
		if (did_work)
			io_worker_spin_for_work(wqe);
		spin_lock_irq(&wqe->lock);
		if (io_wqe_run_queue(wqe)) {
			__set_current_state(TASK_RUNNING);
			io_worker_handle_work(worker);
			did_work = true;
			goto loop;
		}
		did_work = false;
		/* drops the lock on success, retry */
		if (__io_worker_idle(wqe, worker)) {
			__release(&wqe->lock);
			goto loop;
		}
		spin_unlock_irq(&wqe->lock);
		if (signal_pending(current))
			flush_signals(current);
		if (schedule_timeout(WORKER_IDLE_TIMEOUT))
			continue;
		/* timed out, exit unless we're the fixed worker */
		if (test_bit(IO_WQ_BIT_EXIT, &wq->state) ||
		    !(worker->flags & IO_WORKER_F_FIXED))
			break;
	}

	if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		spin_lock_irq(&wqe->lock);
		if (!wq_list_empty(&wqe->work_list))
			io_worker_handle_work(worker);
		else
			spin_unlock_irq(&wqe->lock);
	}

	io_worker_exit(worker);
	return 0;
}

/*
 * Called when a worker is scheduled in. Mark us as currently running.
 */
void io_wq_worker_running(struct task_struct *tsk)
{
	struct io_worker *worker = kthread_data(tsk);
	struct io_wqe *wqe = worker->wqe;

	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (worker->flags & IO_WORKER_F_RUNNING)
		return;
	worker->flags |= IO_WORKER_F_RUNNING;
	io_wqe_inc_running(wqe, worker);
}

/*
 * Called when worker is going to sleep. If there are no workers currently
 * running and we have work pending, wake up a free one or have the manager
 * set one up.
 */
void io_wq_worker_sleeping(struct task_struct *tsk)
{
	struct io_worker *worker = kthread_data(tsk);
	struct io_wqe *wqe = worker->wqe;

	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (!(worker->flags & IO_WORKER_F_RUNNING))
		return;

	worker->flags &= ~IO_WORKER_F_RUNNING;

	spin_lock_irq(&wqe->lock);
	io_wqe_dec_running(wqe, worker);
	spin_unlock_irq(&wqe->lock);
}

static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
{
	struct io_wqe_acct *acct = &wqe->acct[index];
	struct io_worker *worker;

	worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
	if (!worker)
		return false;

	refcount_set(&worker->ref, 1);
	worker->nulls_node.pprev = NULL;
	worker->wqe = wqe;
	spin_lock_init(&worker->lock);

	worker->task = kthread_create_on_node(io_wqe_worker, worker, wqe->node,
				"io_wqe_worker-%d/%d", index, wqe->node);
	if (IS_ERR(worker->task)) {
		kfree(worker);
		return false;
	}

	spin_lock_irq(&wqe->lock);
	hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	list_add_tail_rcu(&worker->all_list, &wqe->all_list);
	worker->flags |= IO_WORKER_F_FREE;
	if (index == IO_WQ_ACCT_BOUND)
		worker->flags |= IO_WORKER_F_BOUND;
	if (!acct->nr_workers && (worker->flags & IO_WORKER_F_BOUND))
		worker->flags |= IO_WORKER_F_FIXED;
	acct->nr_workers++;
	spin_unlock_irq(&wqe->lock);

	if (index == IO_WQ_ACCT_UNBOUND)
		atomic_inc(&wq->user->processes);

	wake_up_process(worker->task);
	return true;
}

static inline bool io_wqe_need_worker(struct io_wqe *wqe, int index)
	__must_hold(wqe->lock)
{
	struct io_wqe_acct *acct = &wqe->acct[index];

	/* if we have available workers or no work, no need */
	if (!hlist_nulls_empty(&wqe->free_list) || !io_wqe_run_queue(wqe))
		return false;
	return acct->nr_workers < acct->max_workers;
}

/*
 * Manager thread. Tasked with creating new workers, if we need them.
 */
static int io_wq_manager(void *data)
{
	struct io_wq *wq = data;
	int workers_to_create = num_possible_nodes();
	int node;

	/* create fixed workers */
	refcount_set(&wq->refs, workers_to_create);
	for_each_node(node) {
		if (!node_online(node))
			continue;
		if (!create_io_worker(wq, wq->wqes[node], IO_WQ_ACCT_BOUND))
			goto err;
		workers_to_create--;
	}

	while (workers_to_create--)
		refcount_dec(&wq->refs);

	complete(&wq->done);

	while (!kthread_should_stop()) {
		for_each_node(node) {
			struct io_wqe *wqe = wq->wqes[node];
			bool fork_worker[2] = { false, false };

			if (!node_online(node))
				continue;

			spin_lock_irq(&wqe->lock);
			if (io_wqe_need_worker(wqe, IO_WQ_ACCT_BOUND))
				fork_worker[IO_WQ_ACCT_BOUND] = true;
			if (io_wqe_need_worker(wqe, IO_WQ_ACCT_UNBOUND))
				fork_worker[IO_WQ_ACCT_UNBOUND] = true;
			spin_unlock_irq(&wqe->lock);
			if (fork_worker[IO_WQ_ACCT_BOUND])
				create_io_worker(wq, wqe, IO_WQ_ACCT_BOUND);
			if (fork_worker[IO_WQ_ACCT_UNBOUND])
				create_io_worker(wq, wqe, IO_WQ_ACCT_UNBOUND);
		}
		set_current_state(TASK_INTERRUPTIBLE);
		schedule_timeout(HZ);
	}

	return 0;
err:
	set_bit(IO_WQ_BIT_ERROR, &wq->state);
	set_bit(IO_WQ_BIT_EXIT, &wq->state);
	if (refcount_sub_and_test(workers_to_create, &wq->refs))
		complete(&wq->done);
	return 0;
}

static bool io_wq_can_queue(struct io_wqe *wqe, struct io_wqe_acct *acct,
			    struct io_wq_work *work)
{
	bool free_worker;

	if (!(work->flags & IO_WQ_WORK_UNBOUND))
		return true;
	if (atomic_read(&acct->nr_running))
		return true;

	rcu_read_lock();
	free_worker = !hlist_nulls_empty(&wqe->free_list);
	rcu_read_unlock();
	if (free_worker)
		return true;

	if (atomic_read(&wqe->wq->user->processes) >= acct->max_workers &&
	    !(capable(CAP_SYS_RESOURCE) || capable(CAP_SYS_ADMIN)))
		return false;

	return true;
}

static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
{
	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
	int work_flags;
	unsigned long flags;

	/*
	 * Do early check to see if we need a new unbound worker, and if we do,
	 * if we're allowed to do so. This isn't 100% accurate as there's a
	 * gap between this check and incrementing the value, but that's OK.
	 * It's close enough to not be an issue, fork() has the same delay.
	 */
	if (unlikely(!io_wq_can_queue(wqe, acct, work))) {
		work->flags |= IO_WQ_WORK_CANCEL;
		work->func(&work);
		return;
	}

	work_flags = work->flags;
	spin_lock_irqsave(&wqe->lock, flags);
	wq_list_add_tail(&work->list, &wqe->work_list);
	wqe->flags &= ~IO_WQE_FLAG_STALLED;
	spin_unlock_irqrestore(&wqe->lock, flags);

	if ((work_flags & IO_WQ_WORK_CONCURRENT) ||
	    !atomic_read(&acct->nr_running))
		io_wqe_wake_worker(wqe, acct);
}

void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
{
	struct io_wqe *wqe = wq->wqes[numa_node_id()];

	io_wqe_enqueue(wqe, work);
}

/*
 * Enqueue work, hashed by some key. Work items that hash to the same value
 * will not be done in parallel. Used to limit concurrent writes, generally
 * hashed by inode.
 */
void io_wq_enqueue_hashed(struct io_wq *wq, struct io_wq_work *work, void *val)
{
	struct io_wqe *wqe = wq->wqes[numa_node_id()];
	unsigned bit;

	bit = hash_ptr(val, IO_WQ_HASH_ORDER);
	work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT));
	io_wqe_enqueue(wqe, work);
}

static bool io_wqe_worker_send_sig(struct io_worker *worker, void *data)
{
	send_sig(SIGINT, worker->task, 1);
	return false;
}

/*
 * Iterate the passed in list and call the specific function for each
 * worker that isn't exiting
 */
static bool io_wq_for_each_worker(struct io_wqe *wqe,
				  bool (*func)(struct io_worker *, void *),
				  void *data)
{
	struct io_worker *worker;
	bool ret = false;

	list_for_each_entry_rcu(worker, &wqe->all_list, all_list) {
		if (io_worker_get(worker)) {
			/* no task if node is/was offline */
			if (worker->task)
				ret = func(worker, data);
			io_worker_release(worker);
			if (ret)
				break;
		}
	}

	return ret;
}

void io_wq_cancel_all(struct io_wq *wq)
{
	int node;

	set_bit(IO_WQ_BIT_CANCEL, &wq->state);

	rcu_read_lock();
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wq_for_each_worker(wqe, io_wqe_worker_send_sig, NULL);
	}
	rcu_read_unlock();
}

struct io_cb_cancel_data {
	struct io_wqe *wqe;
	work_cancel_fn *cancel;
	void *caller_data;
};

static bool io_work_cancel(struct io_worker *worker, void *cancel_data)
{
	struct io_cb_cancel_data *data = cancel_data;
	unsigned long flags;
	bool ret = false;

	/*
	 * Hold the lock to avoid ->cur_work going out of scope, caller
	 * may dereference the passed in work.
	 */
	spin_lock_irqsave(&worker->lock, flags);
	if (worker->cur_work &&
	    !(worker->cur_work->flags & IO_WQ_WORK_NO_CANCEL) &&
	    data->cancel(worker->cur_work, data->caller_data)) {
		send_sig(SIGINT, worker->task, 1);
		ret = true;
	}
	spin_unlock_irqrestore(&worker->lock, flags);

	return ret;
}

static enum io_wq_cancel io_wqe_cancel_cb_work(struct io_wqe *wqe,
					       work_cancel_fn *cancel,
					       void *cancel_data)
{
	struct io_cb_cancel_data data = {
		.wqe = wqe,
		.cancel = cancel,
		.caller_data = cancel_data,
	};
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work;
	unsigned long flags;
	bool found = false;

	spin_lock_irqsave(&wqe->lock, flags);
	wq_list_for_each(node, prev, &wqe->work_list) {
		work = container_of(node, struct io_wq_work, list);

		if (cancel(work, cancel_data)) {
			wq_node_del(&wqe->work_list, node, prev);
			found = true;
			break;
		}
	}
	spin_unlock_irqrestore(&wqe->lock, flags);

	if (found) {
		work->flags |= IO_WQ_WORK_CANCEL;
		work->func(&work);
		return IO_WQ_CANCEL_OK;
	}

	rcu_read_lock();
	found = io_wq_for_each_worker(wqe, io_work_cancel, &data);
	rcu_read_unlock();
	return found ? IO_WQ_CANCEL_RUNNING : IO_WQ_CANCEL_NOTFOUND;
}

enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
				  void *data)
{
	enum io_wq_cancel ret = IO_WQ_CANCEL_NOTFOUND;
	int node;

	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		ret = io_wqe_cancel_cb_work(wqe, cancel, data);
		if (ret != IO_WQ_CANCEL_NOTFOUND)
			break;
	}

	return ret;
}

struct work_match {
	bool (*fn)(struct io_wq_work *, void *data);
	void *data;
};

static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
{
	struct work_match *match = data;
	unsigned long flags;
	bool ret = false;

	spin_lock_irqsave(&worker->lock, flags);
	if (match->fn(worker->cur_work, match->data) &&
	    !(worker->cur_work->flags & IO_WQ_WORK_NO_CANCEL)) {
		send_sig(SIGINT, worker->task, 1);
		ret = true;
	}
	spin_unlock_irqrestore(&worker->lock, flags);

	return ret;
}

static enum io_wq_cancel io_wqe_cancel_work(struct io_wqe *wqe,
					    struct work_match *match)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work;
	unsigned long flags;
	bool found = false;

	/*
	 * First check pending list, if we're lucky we can just remove it
	 * from there. CANCEL_OK means that the work is returned as-new,
	 * no completion will be posted for it.
	 */
	spin_lock_irqsave(&wqe->lock, flags);
	wq_list_for_each(node, prev, &wqe->work_list) {
		work = container_of(node, struct io_wq_work, list);

		if (match->fn(work, match->data)) {
			wq_node_del(&wqe->work_list, node, prev);
			found = true;
			break;
		}
	}
	spin_unlock_irqrestore(&wqe->lock, flags);

	if (found) {
		work->flags |= IO_WQ_WORK_CANCEL;
		work->func(&work);
		return IO_WQ_CANCEL_OK;
	}

	/*
	 * Now check if a free (going busy) or busy worker has the work
	 * currently running. If we find it there, we'll return CANCEL_RUNNING
	 * as an indication that we attempt to signal cancellation. The
	 * completion will run normally in this case.
	 */
	rcu_read_lock();
	found = io_wq_for_each_worker(wqe, io_wq_worker_cancel, match);
	rcu_read_unlock();
	return found ? IO_WQ_CANCEL_RUNNING : IO_WQ_CANCEL_NOTFOUND;
}

static bool io_wq_work_match(struct io_wq_work *work, void *data)
{
	return work == data;
}

enum io_wq_cancel io_wq_cancel_work(struct io_wq *wq, struct io_wq_work *cwork)
{
	struct work_match match = {
		.fn	= io_wq_work_match,
		.data	= cwork
	};
	enum io_wq_cancel ret = IO_WQ_CANCEL_NOTFOUND;
	int node;

	cwork->flags |= IO_WQ_WORK_CANCEL;

	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		ret = io_wqe_cancel_work(wqe, &match);
		if (ret != IO_WQ_CANCEL_NOTFOUND)
			break;
	}

	return ret;
}

static bool io_wq_pid_match(struct io_wq_work *work, void *data)
{
	pid_t pid = (pid_t) (unsigned long) data;

	if (work)
		return work->task_pid == pid;
	return false;
}

enum io_wq_cancel io_wq_cancel_pid(struct io_wq *wq, pid_t pid)
{
	struct work_match match = {
		.fn	= io_wq_pid_match,
		.data	= (void *) (unsigned long) pid
	};
	enum io_wq_cancel ret = IO_WQ_CANCEL_NOTFOUND;
	int node;

	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		ret = io_wqe_cancel_work(wqe, &match);
		if (ret != IO_WQ_CANCEL_NOTFOUND)
			break;
	}

	return ret;
}

struct io_wq_flush_data {
	struct io_wq_work work;
	struct completion done;
};

static void io_wq_flush_func(struct io_wq_work **workptr)
{
	struct io_wq_work *work = *workptr;
	struct io_wq_flush_data *data;

	data = container_of(work, struct io_wq_flush_data, work);
	complete(&data->done);
}

/*
 * Doesn't wait for previously queued work to finish. When this completes,
 * it just means that previously queued work was started.
 */
void io_wq_flush(struct io_wq *wq)
{
	struct io_wq_flush_data data;
	int node;

	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		if (!node_online(node))
			continue;
		init_completion(&data.done);
		INIT_IO_WORK(&data.work, io_wq_flush_func);
		data.work.flags |= IO_WQ_WORK_INTERNAL;
		io_wqe_enqueue(wqe, &data.work);
		wait_for_completion(&data.done);
	}
}

struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
{
	int ret = -ENOMEM, node;
	struct io_wq *wq;

	wq = kzalloc(sizeof(*wq), GFP_KERNEL);
	if (!wq)
		return ERR_PTR(-ENOMEM);

	wq->wqes = kcalloc(nr_node_ids, sizeof(struct io_wqe *), GFP_KERNEL);
	if (!wq->wqes) {
		kfree(wq);
		return ERR_PTR(-ENOMEM);
	}

	wq->get_work = data->get_work;
	wq->put_work = data->put_work;

	/* caller must already hold a reference to this */
	wq->user = data->user;

	for_each_node(node) {
		struct io_wqe *wqe;
		int alloc_node = node;

		if (!node_online(alloc_node))
			alloc_node = NUMA_NO_NODE;
		wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, alloc_node);
		if (!wqe)
			goto err;
		wq->wqes[node] = wqe;
		wqe->node = alloc_node;
		wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
		atomic_set(&wqe->acct[IO_WQ_ACCT_BOUND].nr_running, 0);
		if (wq->user) {
			wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
					task_rlimit(current, RLIMIT_NPROC);
		}
		atomic_set(&wqe->acct[IO_WQ_ACCT_UNBOUND].nr_running, 0);
		wqe->wq = wq;
		spin_lock_init(&wqe->lock);
		INIT_WQ_LIST(&wqe->work_list);
		INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
		INIT_LIST_HEAD(&wqe->all_list);
	}

	init_completion(&wq->done);

	wq->manager = kthread_create(io_wq_manager, wq, "io_wq_manager");
	if (!IS_ERR(wq->manager)) {
		wake_up_process(wq->manager);
		wait_for_completion(&wq->done);
		if (test_bit(IO_WQ_BIT_ERROR, &wq->state)) {
			ret = -ENOMEM;
			goto err;
		}
		refcount_set(&wq->use_refs, 1);
		reinit_completion(&wq->done);
		return wq;
	}

	ret = PTR_ERR(wq->manager);
	complete(&wq->done);
err:
	for_each_node(node)
		kfree(wq->wqes[node]);
	kfree(wq->wqes);
	kfree(wq);
	return ERR_PTR(ret);
}

bool io_wq_get(struct io_wq *wq, struct io_wq_data *data)
{
	if (data->get_work != wq->get_work || data->put_work != wq->put_work)
		return false;

	return refcount_inc_not_zero(&wq->use_refs);
}

static bool io_wq_worker_wake(struct io_worker *worker, void *data)
{
	wake_up_process(worker->task);
	return false;
}

static void __io_wq_destroy(struct io_wq *wq)
{
	int node;

	set_bit(IO_WQ_BIT_EXIT, &wq->state);
	if (wq->manager)
		kthread_stop(wq->manager);

	rcu_read_lock();
	for_each_node(node)
		io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL);
	rcu_read_unlock();

	wait_for_completion(&wq->done);

	for_each_node(node)
		kfree(wq->wqes[node]);
	kfree(wq->wqes);
	kfree(wq);
}

void io_wq_destroy(struct io_wq *wq)
{
	if (refcount_dec_and_test(&wq->use_refs))
		__io_wq_destroy(wq);
}