fs/io-wq.c (basic worker thread pool for io_uring), from the Linux kernel mirror at git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git, tag v5.6-rc7 (1170 lines, 28 kB).
// SPDX-License-Identifier: GPL-2.0
/*
 * Basic worker thread pool for io_uring
 *
 * Copyright (C) 2019 Jens Axboe
 *
 */
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/errno.h>
#include <linux/sched/signal.h>
#include <linux/mm.h>
#include <linux/mmu_context.h>
#include <linux/sched/mm.h>
#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/kthread.h>
#include <linux/rculist_nulls.h>
#include <linux/fs_struct.h>

#include "io-wq.h"

#define WORKER_IDLE_TIMEOUT	(5 * HZ)

enum {
	IO_WORKER_F_UP		= 1,	/* up and active */
	IO_WORKER_F_RUNNING	= 2,	/* account as running */
	IO_WORKER_F_FREE	= 4,	/* worker on free list */
	IO_WORKER_F_EXITING	= 8,	/* worker exiting */
	IO_WORKER_F_FIXED	= 16,	/* static idle worker */
	IO_WORKER_F_BOUND	= 32,	/* is doing bounded work */
};

enum {
	IO_WQ_BIT_EXIT		= 0,	/* wq exiting */
	IO_WQ_BIT_CANCEL	= 1,	/* cancel work on list */
	IO_WQ_BIT_ERROR		= 2,	/* error on setup */
};

enum {
	IO_WQE_FLAG_STALLED	= 1,	/* stalled on hash */
};

/*
 * One for each thread in a wqe pool
 */
struct io_worker {
	refcount_t ref;
	unsigned flags;
	struct hlist_nulls_node nulls_node;
	struct list_head all_list;
	struct task_struct *task;
	struct io_wqe *wqe;

	struct io_wq_work *cur_work;
	spinlock_t lock;

	struct rcu_head rcu;
	struct mm_struct *mm;
	const struct cred *cur_creds;
	const struct cred *saved_creds;
	struct files_struct *restore_files;
	struct fs_struct *restore_fs;
};

#if BITS_PER_LONG == 64
#define IO_WQ_HASH_ORDER	6
#else
#define IO_WQ_HASH_ORDER	5
#endif

struct io_wqe_acct {
	unsigned nr_workers;
	unsigned max_workers;
	atomic_t nr_running;
};

enum {
	IO_WQ_ACCT_BOUND,
	IO_WQ_ACCT_UNBOUND,
};

/*
 * Per-node worker thread pool
 */
struct io_wqe {
	struct {
		spinlock_t lock;
		struct io_wq_work_list work_list;
		unsigned long hash_map;
		unsigned flags;
	} ____cacheline_aligned_in_smp;

	int node;
	struct io_wqe_acct acct[2];

	struct hlist_nulls_head free_list;
	struct list_head all_list;

	struct io_wq *wq;
};

/*
 * Per io_wq state
 */
struct io_wq {
	struct io_wqe **wqes;
	unsigned long state;

	get_work_fn *get_work;
	put_work_fn *put_work;

	struct task_struct *manager;
	struct user_struct *user;
	refcount_t refs;
	struct completion done;

	refcount_t use_refs;
};

static bool io_worker_get(struct io_worker *worker)
{
	return refcount_inc_not_zero(&worker->ref);
}

static void io_worker_release(struct io_worker *worker)
{
	if (refcount_dec_and_test(&worker->ref))
		wake_up_process(worker->task);
}

/*
 * Note: drops the wqe->lock if returning true! The caller must re-acquire
 * the lock in that case. Some callers need to restart handling if this
 * happens, so we can't just re-acquire the lock on behalf of the caller.
 */
static bool __io_worker_unuse(struct io_wqe *wqe, struct io_worker *worker)
{
	bool dropped_lock = false;

	if (worker->saved_creds) {
		revert_creds(worker->saved_creds);
		worker->cur_creds = worker->saved_creds = NULL;
	}

	if (current->files != worker->restore_files) {
		__acquire(&wqe->lock);
		spin_unlock_irq(&wqe->lock);
		dropped_lock = true;

		task_lock(current);
		current->files = worker->restore_files;
		task_unlock(current);
	}

	if (current->fs != worker->restore_fs)
		current->fs = worker->restore_fs;

	/*
	 * If we have an active mm, we need to drop the wq lock before unusing
	 * it. If we do, return true and let the caller retry the idle loop.
	 */
	if (worker->mm) {
		if (!dropped_lock) {
			__acquire(&wqe->lock);
			spin_unlock_irq(&wqe->lock);
			dropped_lock = true;
		}
		__set_current_state(TASK_RUNNING);
		set_fs(KERNEL_DS);
		unuse_mm(worker->mm);
		mmput(worker->mm);
		worker->mm = NULL;
	}

	return dropped_lock;
}

static inline struct io_wqe_acct *io_work_get_acct(struct io_wqe *wqe,
						   struct io_wq_work *work)
{
	if (work->flags & IO_WQ_WORK_UNBOUND)
		return &wqe->acct[IO_WQ_ACCT_UNBOUND];

	return &wqe->acct[IO_WQ_ACCT_BOUND];
}

static inline struct io_wqe_acct *io_wqe_get_acct(struct io_wqe *wqe,
						  struct io_worker *worker)
{
	if (worker->flags & IO_WORKER_F_BOUND)
		return &wqe->acct[IO_WQ_ACCT_BOUND];

	return &wqe->acct[IO_WQ_ACCT_UNBOUND];
}

static void io_worker_exit(struct io_worker *worker)
{
	struct io_wqe *wqe = worker->wqe;
	struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker);
	unsigned nr_workers;

	/*
	 * If we're not at zero, someone else is holding a brief reference
	 * to the worker. Wait for that to go away.
	 */
	set_current_state(TASK_INTERRUPTIBLE);
	if (!refcount_dec_and_test(&worker->ref))
		schedule();
	__set_current_state(TASK_RUNNING);

	preempt_disable();
	current->flags &= ~PF_IO_WORKER;
	if (worker->flags & IO_WORKER_F_RUNNING)
		atomic_dec(&acct->nr_running);
	if (!(worker->flags & IO_WORKER_F_BOUND))
		atomic_dec(&wqe->wq->user->processes);
	worker->flags = 0;
	preempt_enable();

	spin_lock_irq(&wqe->lock);
	hlist_nulls_del_rcu(&worker->nulls_node);
	list_del_rcu(&worker->all_list);
	if (__io_worker_unuse(wqe, worker)) {
		__release(&wqe->lock);
		spin_lock_irq(&wqe->lock);
	}
	acct->nr_workers--;
	nr_workers = wqe->acct[IO_WQ_ACCT_BOUND].nr_workers +
			wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers;
	spin_unlock_irq(&wqe->lock);

	/* all workers gone, wq exit can proceed */
	if (!nr_workers && refcount_dec_and_test(&wqe->wq->refs))
		complete(&wqe->wq->done);

	kfree_rcu(worker, rcu);
}

static inline bool io_wqe_run_queue(struct io_wqe *wqe)
	__must_hold(wqe->lock)
{
	if (!wq_list_empty(&wqe->work_list) &&
	    !(wqe->flags & IO_WQE_FLAG_STALLED))
		return true;
	return false;
}

/*
 * Check head of free list for an available worker. If one isn't available,
 * caller must wake up the wq manager to create one.
 */
static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
	__must_hold(RCU)
{
	struct hlist_nulls_node *n;
	struct io_worker *worker;

	n = rcu_dereference(hlist_nulls_first_rcu(&wqe->free_list));
	if (is_a_nulls(n))
		return false;

	worker = hlist_nulls_entry(n, struct io_worker, nulls_node);
	if (io_worker_get(worker)) {
		wake_up_process(worker->task);
		io_worker_release(worker);
		return true;
	}

	return false;
}

/*
 * We need a worker. If we find a free one, we're good. If not, and we're
 * below the max number of workers, wake up the manager to create one.
 */
static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
{
	bool ret;

	/*
	 * Most likely an attempt to queue unbounded work on an io_wq that
	 * wasn't setup with any unbounded workers.
	 */
	WARN_ON_ONCE(!acct->max_workers);

	rcu_read_lock();
	ret = io_wqe_activate_free_worker(wqe);
	rcu_read_unlock();

	if (!ret && acct->nr_workers < acct->max_workers)
		wake_up_process(wqe->wq->manager);
}

static void io_wqe_inc_running(struct io_wqe *wqe, struct io_worker *worker)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker);

	atomic_inc(&acct->nr_running);
}

static void io_wqe_dec_running(struct io_wqe *wqe, struct io_worker *worker)
	__must_hold(wqe->lock)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker);

	if (atomic_dec_and_test(&acct->nr_running) && io_wqe_run_queue(wqe))
		io_wqe_wake_worker(wqe, acct);
}

static void io_worker_start(struct io_wqe *wqe, struct io_worker *worker)
{
	allow_kernel_signal(SIGINT);

	current->flags |= PF_IO_WORKER;

	worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
	worker->restore_files = current->files;
	worker->restore_fs = current->fs;
	io_wqe_inc_running(wqe, worker);
}

/*
 * Worker will start processing some work. Move it to the busy list, if
 * it's currently on the freelist
 */
static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
			     struct io_wq_work *work)
	__must_hold(wqe->lock)
{
	bool worker_bound, work_bound;

	if (worker->flags & IO_WORKER_F_FREE) {
		worker->flags &= ~IO_WORKER_F_FREE;
		hlist_nulls_del_init_rcu(&worker->nulls_node);
	}

	/*
	 * If worker is moving from bound to unbound (or vice versa), then
	 * ensure we update the running accounting.
	 */
	worker_bound = (worker->flags & IO_WORKER_F_BOUND) != 0;
	work_bound = (work->flags & IO_WQ_WORK_UNBOUND) == 0;
	if (worker_bound != work_bound) {
		io_wqe_dec_running(wqe, worker);
		if (work_bound) {
			worker->flags |= IO_WORKER_F_BOUND;
			wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers--;
			wqe->acct[IO_WQ_ACCT_BOUND].nr_workers++;
			atomic_dec(&wqe->wq->user->processes);
		} else {
			worker->flags &= ~IO_WORKER_F_BOUND;
			wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers++;
			wqe->acct[IO_WQ_ACCT_BOUND].nr_workers--;
			atomic_inc(&wqe->wq->user->processes);
		}
		io_wqe_inc_running(wqe, worker);
	}
}

/*
 * No work, worker going to sleep. Move to freelist, and unuse mm if we
 * have one attached. Dropping the mm may potentially sleep, so we drop
 * the lock in that case and return success. Since the caller has to
 * retry the loop in that case (we changed task state), we don't regrab
 * the lock if we return success.
 */
static bool __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker)
	__must_hold(wqe->lock)
{
	if (!(worker->flags & IO_WORKER_F_FREE)) {
		worker->flags |= IO_WORKER_F_FREE;
		hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	}

	return __io_worker_unuse(wqe, worker);
}

static struct io_wq_work *io_get_next_work(struct io_wqe *wqe, unsigned *hash)
	__must_hold(wqe->lock)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work;

	wq_list_for_each(node, prev, &wqe->work_list) {
		work = container_of(node, struct io_wq_work, list);

		/* not hashed, can run anytime */
		if (!(work->flags & IO_WQ_WORK_HASHED)) {
			wq_node_del(&wqe->work_list, node, prev);
			return work;
		}

		/* hashed, can run if not already running */
		*hash = work->flags >> IO_WQ_HASH_SHIFT;
		if (!(wqe->hash_map & BIT_ULL(*hash))) {
			wqe->hash_map |= BIT_ULL(*hash);
			wq_node_del(&wqe->work_list, node, prev);
			return work;
		}
	}

	return NULL;
}

static void io_wq_switch_mm(struct io_worker *worker, struct io_wq_work *work)
{
	if (worker->mm) {
		unuse_mm(worker->mm);
		mmput(worker->mm);
		worker->mm = NULL;
	}
	if (!work->mm) {
		set_fs(KERNEL_DS);
		return;
	}
	if (mmget_not_zero(work->mm)) {
		use_mm(work->mm);
		if (!worker->mm)
			set_fs(USER_DS);
		worker->mm = work->mm;
		/* hang on to this mm */
		work->mm = NULL;
		return;
	}

	/* failed grabbing mm, ensure work gets cancelled */
	work->flags |= IO_WQ_WORK_CANCEL;
}

static void io_wq_switch_creds(struct io_worker *worker,
			       struct io_wq_work *work)
{
	const struct cred *old_creds = override_creds(work->creds);

	worker->cur_creds = work->creds;
	if (worker->saved_creds)
		put_cred(old_creds); /* creds set by previous switch */
	else
		worker->saved_creds = old_creds;
}

static void io_worker_handle_work(struct io_worker *worker)
	__releases(wqe->lock)
{
	struct io_wq_work *work, *old_work = NULL, *put_work = NULL;
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;

	do {
		unsigned hash = -1U;

		/*
		 * If we got some work, mark us as busy. If we didn't, but
		 * the list isn't empty, it means we stalled on hashed work.
		 * Mark us stalled so we don't keep looking for work when we
		 * can't make progress, any work completion or insertion will
		 * clear the stalled flag.
		 */
		work = io_get_next_work(wqe, &hash);
		if (work)
			__io_worker_busy(wqe, worker, work);
		else if (!wq_list_empty(&wqe->work_list))
			wqe->flags |= IO_WQE_FLAG_STALLED;

		spin_unlock_irq(&wqe->lock);
		if (put_work && wq->put_work)
			wq->put_work(old_work);
		if (!work)
			break;
next:
		/* flush any pending signals before assigning new work */
		if (signal_pending(current))
			flush_signals(current);

		cond_resched();

		spin_lock_irq(&worker->lock);
		worker->cur_work = work;
		spin_unlock_irq(&worker->lock);

		if (work->flags & IO_WQ_WORK_CB)
			work->func(&work);

		if (work->files && current->files != work->files) {
			task_lock(current);
			current->files = work->files;
			task_unlock(current);
		}
		if (work->fs && current->fs != work->fs)
			current->fs = work->fs;
		if (work->mm != worker->mm)
			io_wq_switch_mm(worker, work);
		if (worker->cur_creds != work->creds)
			io_wq_switch_creds(worker, work);
		/*
		 * OK to set IO_WQ_WORK_CANCEL even for uncancellable work,
		 * the worker function will do the right thing.
		 */
		if (test_bit(IO_WQ_BIT_CANCEL, &wq->state))
			work->flags |= IO_WQ_WORK_CANCEL;
		if (worker->mm)
			work->flags |= IO_WQ_WORK_HAS_MM;

		if (wq->get_work) {
			put_work = work;
			wq->get_work(work);
		}

		old_work = work;
		work->func(&work);

		spin_lock_irq(&worker->lock);
		worker->cur_work = NULL;
		spin_unlock_irq(&worker->lock);

		spin_lock_irq(&wqe->lock);

		if (hash != -1U) {
			wqe->hash_map &= ~BIT_ULL(hash);
			wqe->flags &= ~IO_WQE_FLAG_STALLED;
		}
		if (work && work != old_work) {
			spin_unlock_irq(&wqe->lock);

			if (put_work && wq->put_work) {
				wq->put_work(put_work);
				put_work = NULL;
			}

			/* dependent work not hashed */
			hash = -1U;
			goto next;
		}
	} while (1);
}

static int io_wqe_worker(void *data)
{
	struct io_worker *worker = data;
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;

	io_worker_start(wqe, worker);

	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		set_current_state(TASK_INTERRUPTIBLE);
loop:
		spin_lock_irq(&wqe->lock);
		if (io_wqe_run_queue(wqe)) {
			__set_current_state(TASK_RUNNING);
			io_worker_handle_work(worker);
			goto loop;
		}
		/* drops the lock on success, retry */
		if (__io_worker_idle(wqe, worker)) {
			__release(&wqe->lock);
			goto loop;
		}
		spin_unlock_irq(&wqe->lock);
		if (signal_pending(current))
			flush_signals(current);
		if (schedule_timeout(WORKER_IDLE_TIMEOUT))
			continue;
		/* timed out, exit unless we're the fixed worker */
		if (test_bit(IO_WQ_BIT_EXIT, &wq->state) ||
		    !(worker->flags & IO_WORKER_F_FIXED))
			break;
	}

	if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		spin_lock_irq(&wqe->lock);
		if (!wq_list_empty(&wqe->work_list))
			io_worker_handle_work(worker);
		else
			spin_unlock_irq(&wqe->lock);
	}

	io_worker_exit(worker);
	return 0;
}

/*
 * Called when a worker is scheduled in. Mark us as currently running.
 */
void io_wq_worker_running(struct task_struct *tsk)
{
	struct io_worker *worker = kthread_data(tsk);
	struct io_wqe *wqe = worker->wqe;

	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (worker->flags & IO_WORKER_F_RUNNING)
		return;
	worker->flags |= IO_WORKER_F_RUNNING;
	io_wqe_inc_running(wqe, worker);
}

/*
 * Called when worker is going to sleep. If there are no workers currently
 * running and we have work pending, wake up a free one or have the manager
 * set one up.
 */
void io_wq_worker_sleeping(struct task_struct *tsk)
{
	struct io_worker *worker = kthread_data(tsk);
	struct io_wqe *wqe = worker->wqe;

	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (!(worker->flags & IO_WORKER_F_RUNNING))
		return;

	worker->flags &= ~IO_WORKER_F_RUNNING;

	spin_lock_irq(&wqe->lock);
	io_wqe_dec_running(wqe, worker);
	spin_unlock_irq(&wqe->lock);
}

static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
{
	struct io_wqe_acct *acct = &wqe->acct[index];
	struct io_worker *worker;

	worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
	if (!worker)
		return false;

	refcount_set(&worker->ref, 1);
	worker->nulls_node.pprev = NULL;
	worker->wqe = wqe;
	spin_lock_init(&worker->lock);

	worker->task = kthread_create_on_node(io_wqe_worker, worker, wqe->node,
				"io_wqe_worker-%d/%d", index, wqe->node);
	if (IS_ERR(worker->task)) {
		kfree(worker);
		return false;
	}

	spin_lock_irq(&wqe->lock);
	hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	list_add_tail_rcu(&worker->all_list, &wqe->all_list);
	worker->flags |= IO_WORKER_F_FREE;
	if (index == IO_WQ_ACCT_BOUND)
		worker->flags |= IO_WORKER_F_BOUND;
	if (!acct->nr_workers && (worker->flags & IO_WORKER_F_BOUND))
		worker->flags |= IO_WORKER_F_FIXED;
	acct->nr_workers++;
	spin_unlock_irq(&wqe->lock);

	if (index == IO_WQ_ACCT_UNBOUND)
		atomic_inc(&wq->user->processes);

	wake_up_process(worker->task);
	return true;
}

static inline bool io_wqe_need_worker(struct io_wqe *wqe, int index)
	__must_hold(wqe->lock)
{
	struct io_wqe_acct *acct = &wqe->acct[index];

	/* if we have available workers or no work, no need */
	if (!hlist_nulls_empty(&wqe->free_list) || !io_wqe_run_queue(wqe))
		return false;
	return acct->nr_workers < acct->max_workers;
}

/*
 * Manager thread. Tasked with creating new workers, if we need them.
 */
static int io_wq_manager(void *data)
{
	struct io_wq *wq = data;
	int workers_to_create = num_possible_nodes();
	int node;

	/* create fixed workers */
	refcount_set(&wq->refs, workers_to_create);
	for_each_node(node) {
		if (!node_online(node))
			continue;
		if (!create_io_worker(wq, wq->wqes[node], IO_WQ_ACCT_BOUND))
			goto err;
		workers_to_create--;
	}

	while (workers_to_create--)
		refcount_dec(&wq->refs);

	complete(&wq->done);

	while (!kthread_should_stop()) {
		for_each_node(node) {
			struct io_wqe *wqe = wq->wqes[node];
			bool fork_worker[2] = { false, false };

			if (!node_online(node))
				continue;

			spin_lock_irq(&wqe->lock);
			if (io_wqe_need_worker(wqe, IO_WQ_ACCT_BOUND))
				fork_worker[IO_WQ_ACCT_BOUND] = true;
			if (io_wqe_need_worker(wqe, IO_WQ_ACCT_UNBOUND))
				fork_worker[IO_WQ_ACCT_UNBOUND] = true;
			spin_unlock_irq(&wqe->lock);
			if (fork_worker[IO_WQ_ACCT_BOUND])
				create_io_worker(wq, wqe, IO_WQ_ACCT_BOUND);
			if (fork_worker[IO_WQ_ACCT_UNBOUND])
				create_io_worker(wq, wqe, IO_WQ_ACCT_UNBOUND);
		}
		set_current_state(TASK_INTERRUPTIBLE);
		schedule_timeout(HZ);
	}

	return 0;
err:
	set_bit(IO_WQ_BIT_ERROR, &wq->state);
	set_bit(IO_WQ_BIT_EXIT, &wq->state);
	if (refcount_sub_and_test(workers_to_create, &wq->refs))
		complete(&wq->done);
	return 0;
}

static bool io_wq_can_queue(struct io_wqe *wqe, struct io_wqe_acct *acct,
			    struct io_wq_work *work)
{
	bool free_worker;

	if (!(work->flags & IO_WQ_WORK_UNBOUND))
		return true;
	if (atomic_read(&acct->nr_running))
		return true;

	rcu_read_lock();
	free_worker = !hlist_nulls_empty(&wqe->free_list);
	rcu_read_unlock();
	if (free_worker)
		return true;

	if (atomic_read(&wqe->wq->user->processes) >= acct->max_workers &&
	    !(capable(CAP_SYS_RESOURCE) || capable(CAP_SYS_ADMIN)))
		return false;

	return true;
}

static void io_run_cancel(struct io_wq_work *work)
{
	do {
		struct io_wq_work *old_work = work;

		work->flags |= IO_WQ_WORK_CANCEL;
		work->func(&work);
		work = (work == old_work) ? NULL : work;
	} while (work);
}

static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
{
	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
	int work_flags;
	unsigned long flags;

	/*
	 * Do early check to see if we need a new unbound worker, and if we do,
	 * if we're allowed to do so. This isn't 100% accurate as there's a
	 * gap between this check and incrementing the value, but that's OK.
	 * It's close enough to not be an issue, fork() has the same delay.
	 */
	if (unlikely(!io_wq_can_queue(wqe, acct, work))) {
		io_run_cancel(work);
		return;
	}

	work_flags = work->flags;
	spin_lock_irqsave(&wqe->lock, flags);
	wq_list_add_tail(&work->list, &wqe->work_list);
	wqe->flags &= ~IO_WQE_FLAG_STALLED;
	spin_unlock_irqrestore(&wqe->lock, flags);

	if ((work_flags & IO_WQ_WORK_CONCURRENT) ||
	    !atomic_read(&acct->nr_running))
		io_wqe_wake_worker(wqe, acct);
}

void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
{
	struct io_wqe *wqe = wq->wqes[numa_node_id()];

	io_wqe_enqueue(wqe, work);
}

/*
 * Enqueue work, hashed by some key. Work items that hash to the same value
 * will not be done in parallel. Used to limit concurrent writes, generally
 * hashed by inode.
 */
void io_wq_enqueue_hashed(struct io_wq *wq, struct io_wq_work *work, void *val)
{
	struct io_wqe *wqe = wq->wqes[numa_node_id()];
	unsigned bit;

	bit = hash_ptr(val, IO_WQ_HASH_ORDER);
	work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT));
	io_wqe_enqueue(wqe, work);
}

static bool io_wqe_worker_send_sig(struct io_worker *worker, void *data)
{
	send_sig(SIGINT, worker->task, 1);
	return false;
}

/*
 * Iterate the passed in list and call the specific function for each
 * worker that isn't exiting
 */
static bool io_wq_for_each_worker(struct io_wqe *wqe,
				  bool (*func)(struct io_worker *, void *),
				  void *data)
{
	struct io_worker *worker;
	bool ret = false;

	list_for_each_entry_rcu(worker, &wqe->all_list, all_list) {
		if (io_worker_get(worker)) {
			/* no task if node is/was offline */
			if (worker->task)
				ret = func(worker, data);
			io_worker_release(worker);
			if (ret)
				break;
		}
	}

	return ret;
}

void io_wq_cancel_all(struct io_wq *wq)
{
	int node;

	set_bit(IO_WQ_BIT_CANCEL, &wq->state);

	rcu_read_lock();
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wq_for_each_worker(wqe, io_wqe_worker_send_sig, NULL);
	}
	rcu_read_unlock();
}

struct io_cb_cancel_data {
	struct io_wqe *wqe;
	work_cancel_fn *cancel;
	void *caller_data;
};

static bool io_work_cancel(struct io_worker *worker, void *cancel_data)
{
	struct io_cb_cancel_data *data = cancel_data;
	unsigned long flags;
	bool ret = false;

	/*
	 * Hold the lock to avoid ->cur_work going out of scope, caller
	 * may dereference the passed in work.
	 */
	spin_lock_irqsave(&worker->lock, flags);
	if (worker->cur_work &&
	    !(worker->cur_work->flags & IO_WQ_WORK_NO_CANCEL) &&
	    data->cancel(worker->cur_work, data->caller_data)) {
		send_sig(SIGINT, worker->task, 1);
		ret = true;
	}
	spin_unlock_irqrestore(&worker->lock, flags);

	return ret;
}

static enum io_wq_cancel io_wqe_cancel_cb_work(struct io_wqe *wqe,
					       work_cancel_fn *cancel,
					       void *cancel_data)
{
	struct io_cb_cancel_data data = {
		.wqe = wqe,
		.cancel = cancel,
		.caller_data = cancel_data,
	};
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work;
	unsigned long flags;
	bool found = false;

	spin_lock_irqsave(&wqe->lock, flags);
	wq_list_for_each(node, prev, &wqe->work_list) {
		work = container_of(node, struct io_wq_work, list);

		if (cancel(work, cancel_data)) {
			wq_node_del(&wqe->work_list, node, prev);
			found = true;
			break;
		}
	}
	spin_unlock_irqrestore(&wqe->lock, flags);

	if (found) {
		io_run_cancel(work);
		return IO_WQ_CANCEL_OK;
	}

	rcu_read_lock();
	found = io_wq_for_each_worker(wqe, io_work_cancel, &data);
	rcu_read_unlock();
	return found ? IO_WQ_CANCEL_RUNNING : IO_WQ_CANCEL_NOTFOUND;
}

enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
				  void *data)
{
	enum io_wq_cancel ret = IO_WQ_CANCEL_NOTFOUND;
	int node;

	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		ret = io_wqe_cancel_cb_work(wqe, cancel, data);
		if (ret != IO_WQ_CANCEL_NOTFOUND)
			break;
	}

	return ret;
}

struct work_match {
	bool (*fn)(struct io_wq_work *, void *data);
	void *data;
};

static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
{
	struct work_match *match = data;
	unsigned long flags;
	bool ret = false;

	spin_lock_irqsave(&worker->lock, flags);
	if (match->fn(worker->cur_work, match->data) &&
	    !(worker->cur_work->flags & IO_WQ_WORK_NO_CANCEL)) {
		send_sig(SIGINT, worker->task, 1);
		ret = true;
	}
	spin_unlock_irqrestore(&worker->lock, flags);

	return ret;
}

static enum io_wq_cancel io_wqe_cancel_work(struct io_wqe *wqe,
					    struct work_match *match)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work;
	unsigned long flags;
	bool found = false;

	/*
	 * First check pending list, if we're lucky we can just remove it
	 * from there. CANCEL_OK means that the work is returned as-new,
	 * no completion will be posted for it.
	 */
	spin_lock_irqsave(&wqe->lock, flags);
	wq_list_for_each(node, prev, &wqe->work_list) {
		work = container_of(node, struct io_wq_work, list);

		if (match->fn(work, match->data)) {
			wq_node_del(&wqe->work_list, node, prev);
			found = true;
			break;
		}
	}
	spin_unlock_irqrestore(&wqe->lock, flags);

	if (found) {
		io_run_cancel(work);
		return IO_WQ_CANCEL_OK;
	}

	/*
	 * Now check if a free (going busy) or busy worker has the work
	 * currently running. If we find it there, we'll return CANCEL_RUNNING
	 * as an indication that we attempt to signal cancellation. The
	 * completion will run normally in this case.
	 */
	rcu_read_lock();
	found = io_wq_for_each_worker(wqe, io_wq_worker_cancel, match);
	rcu_read_unlock();
	return found ? IO_WQ_CANCEL_RUNNING : IO_WQ_CANCEL_NOTFOUND;
}

static bool io_wq_work_match(struct io_wq_work *work, void *data)
{
	return work == data;
}

enum io_wq_cancel io_wq_cancel_work(struct io_wq *wq, struct io_wq_work *cwork)
{
	struct work_match match = {
		.fn	= io_wq_work_match,
		.data	= cwork
	};
	enum io_wq_cancel ret = IO_WQ_CANCEL_NOTFOUND;
	int node;

	cwork->flags |= IO_WQ_WORK_CANCEL;

	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		ret = io_wqe_cancel_work(wqe, &match);
		if (ret != IO_WQ_CANCEL_NOTFOUND)
			break;
	}

	return ret;
}

static bool io_wq_pid_match(struct io_wq_work *work, void *data)
{
	pid_t pid = (pid_t) (unsigned long) data;

	if (work)
		return work->task_pid == pid;
	return false;
}

enum io_wq_cancel io_wq_cancel_pid(struct io_wq *wq, pid_t pid)
{
	struct work_match match = {
		.fn	= io_wq_pid_match,
		.data	= (void *) (unsigned long) pid
	};
	enum io_wq_cancel ret = IO_WQ_CANCEL_NOTFOUND;
	int node;

	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		ret = io_wqe_cancel_work(wqe, &match);
		if (ret != IO_WQ_CANCEL_NOTFOUND)
			break;
	}

	return ret;
}

struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
{
	int ret = -ENOMEM, node;
	struct io_wq *wq;

	wq = kzalloc(sizeof(*wq), GFP_KERNEL);
	if (!wq)
		return ERR_PTR(-ENOMEM);

	wq->wqes = kcalloc(nr_node_ids, sizeof(struct io_wqe *), GFP_KERNEL);
	if (!wq->wqes) {
		kfree(wq);
		return ERR_PTR(-ENOMEM);
	}

	wq->get_work = data->get_work;
	wq->put_work = data->put_work;

	/* caller must already hold a reference to this */
	wq->user = data->user;

	for_each_node(node) {
		struct io_wqe *wqe;
		int alloc_node = node;

		if (!node_online(alloc_node))
			alloc_node = NUMA_NO_NODE;
		wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, alloc_node);
		if (!wqe)
			goto err;
		wq->wqes[node] = wqe;
		wqe->node = alloc_node;
		wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
		atomic_set(&wqe->acct[IO_WQ_ACCT_BOUND].nr_running, 0);
		if (wq->user) {
			wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
					task_rlimit(current, RLIMIT_NPROC);
		}
		atomic_set(&wqe->acct[IO_WQ_ACCT_UNBOUND].nr_running, 0);
		wqe->wq = wq;
		spin_lock_init(&wqe->lock);
		INIT_WQ_LIST(&wqe->work_list);
		INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
		INIT_LIST_HEAD(&wqe->all_list);
	}

	init_completion(&wq->done);

	wq->manager = kthread_create(io_wq_manager, wq, "io_wq_manager");
	if (!IS_ERR(wq->manager)) {
		wake_up_process(wq->manager);
		wait_for_completion(&wq->done);
		if (test_bit(IO_WQ_BIT_ERROR, &wq->state)) {
			ret = -ENOMEM;
			goto err;
		}
		refcount_set(&wq->use_refs, 1);
		reinit_completion(&wq->done);
		return wq;
	}

	ret = PTR_ERR(wq->manager);
	complete(&wq->done);
err:
	for_each_node(node)
		kfree(wq->wqes[node]);
	kfree(wq->wqes);
	kfree(wq);
	return ERR_PTR(ret);
}

bool io_wq_get(struct io_wq *wq, struct io_wq_data *data)
{
	if (data->get_work != wq->get_work || data->put_work != wq->put_work)
		return false;

	return refcount_inc_not_zero(&wq->use_refs);
}

static bool io_wq_worker_wake(struct io_worker *worker, void *data)
{
	wake_up_process(worker->task);
	return false;
}

static void __io_wq_destroy(struct io_wq *wq)
{
	int node;

	set_bit(IO_WQ_BIT_EXIT, &wq->state);
	if (wq->manager)
		kthread_stop(wq->manager);

	rcu_read_lock();
	for_each_node(node)
		io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL);
	rcu_read_unlock();

	wait_for_completion(&wq->done);

	for_each_node(node)
		kfree(wq->wqes[node]);
	kfree(wq->wqes);
	kfree(wq);
}

void io_wq_destroy(struct io_wq *wq)
{
	if (refcount_dec_and_test(&wq->use_refs))
		__io_wq_destroy(wq);
}
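
For context, the exported entry points above (io_wq_create(), io_wq_enqueue(), io_wq_enqueue_hashed(), io_wq_cancel_work(), io_wq_destroy()) are consumed from inside the kernel, primarily by io_uring. The following is a minimal, hypothetical caller sketch, not code from the kernel tree: the my_work_fn(), my_wq_setup() and my_queue_write() names are invented for illustration, and the only struct layout assumptions are those visible in this file and io-wq.h (a func callback invoked as work->func(&work), and the user/get_work/put_work members of struct io_wq_data).

/*
 * Hypothetical io-wq consumer sketch (illustrative only, not kernel code).
 */
#include <linux/err.h>

#include "io-wq.h"

/* Worker callback: runs in an io-wq worker thread and may block. */
static void my_work_fn(struct io_wq_work **workptr)
{
	struct io_wq_work *work = *workptr;

	if (work->flags & IO_WQ_WORK_CANCEL) {
		/* pool requested cancellation: complete with an error here */
		return;
	}
	/* perform the potentially blocking operation here */
}

/* Create a pool with up to 4 bounded workers per NUMA node. */
static struct io_wq *my_wq_setup(struct user_struct *user)
{
	struct io_wq_data data = {
		.user		= user,	/* caller must already hold a reference */
		.get_work	= NULL,	/* optional per-work get/put hooks */
		.put_work	= NULL,
	};

	/* returns ERR_PTR() on failure, so check with IS_ERR() */
	return io_wq_create(4, &data);
}

/* Queue a write; items hashed on the same key are not run in parallel. */
static void my_queue_write(struct io_wq *wq, struct io_wq_work *work,
			   void *key /* typically the inode */)
{
	work->func = my_work_fn;
	io_wq_enqueue_hashed(wq, work, key);
}

The hashed enqueue path matters for the design above: io_get_next_work() skips work whose hash bit is already set in wqe->hash_map, so two writes hashed to the same key (e.g. the same inode) never run on two workers at once, while unhashed work can be picked up by any free worker.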