Linux kernel mirror (for testing): git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
fs/io-wq.c at v5.12-rc5 (1135 lines, 27 kB)
// SPDX-License-Identifier: GPL-2.0
/*
 * Basic worker thread pool for io_uring
 *
 * Copyright (C) 2019 Jens Axboe
 *
 */
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/errno.h>
#include <linux/sched/signal.h>
#include <linux/mm.h>
#include <linux/sched/mm.h>
#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/rculist_nulls.h>
#include <linux/cpu.h>
#include <linux/tracehook.h>

#include "../kernel/sched/sched.h"
#include "io-wq.h"

#define WORKER_IDLE_TIMEOUT	(5 * HZ)

enum {
	IO_WORKER_F_UP		= 1,	/* up and active */
	IO_WORKER_F_RUNNING	= 2,	/* account as running */
	IO_WORKER_F_FREE	= 4,	/* worker on free list */
	IO_WORKER_F_FIXED	= 8,	/* static idle worker */
	IO_WORKER_F_BOUND	= 16,	/* is doing bounded work */
};

enum {
	IO_WQ_BIT_EXIT		= 0,	/* wq exiting */
};

enum {
	IO_WQE_FLAG_STALLED	= 1,	/* stalled on hash */
};

/*
 * One for each thread in a wqe pool
 */
struct io_worker {
	refcount_t ref;
	unsigned flags;
	struct hlist_nulls_node nulls_node;
	struct list_head all_list;
	struct task_struct *task;
	struct io_wqe *wqe;

	struct io_wq_work *cur_work;
	spinlock_t lock;

	struct completion ref_done;

	struct rcu_head rcu;
};

#if BITS_PER_LONG == 64
#define IO_WQ_HASH_ORDER	6
#else
#define IO_WQ_HASH_ORDER	5
#endif

#define IO_WQ_NR_HASH_BUCKETS	(1u << IO_WQ_HASH_ORDER)

struct io_wqe_acct {
	unsigned nr_workers;
	unsigned max_workers;
	atomic_t nr_running;
};

enum {
	IO_WQ_ACCT_BOUND,
	IO_WQ_ACCT_UNBOUND,
};

/*
 * Per-node worker thread pool
 */
struct io_wqe {
	struct {
		raw_spinlock_t lock;
		struct io_wq_work_list work_list;
		unsigned flags;
	} ____cacheline_aligned_in_smp;

	int node;
	struct io_wqe_acct acct[2];

	struct hlist_nulls_head free_list;
	struct list_head all_list;

	struct wait_queue_entry wait;

	struct io_wq *wq;
	struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];
};

/*
 * Per io_wq state
 */
struct io_wq {
	struct io_wqe **wqes;
	unsigned long state;

	free_work_fn *free_work;
	io_wq_work_fn *do_work;

	struct task_struct *manager;

	struct io_wq_hash *hash;

	refcount_t refs;
	struct completion exited;

	atomic_t worker_refs;
	struct completion worker_done;

	struct hlist_node cpuhp_node;

	pid_t task_pid;
};

static enum cpuhp_state io_wq_online;

struct io_cb_cancel_data {
	work_cancel_fn *fn;
	void *data;
	int nr_running;
	int nr_pending;
	bool cancel_all;
};

static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
				       struct io_cb_cancel_data *match);

static bool io_worker_get(struct io_worker *worker)
{
	return refcount_inc_not_zero(&worker->ref);
}

static void io_worker_release(struct io_worker *worker)
{
	if (refcount_dec_and_test(&worker->ref))
		complete(&worker->ref_done);
}

static inline struct io_wqe_acct *io_work_get_acct(struct io_wqe *wqe,
						   struct io_wq_work *work)
{
	if (work->flags & IO_WQ_WORK_UNBOUND)
		return &wqe->acct[IO_WQ_ACCT_UNBOUND];

	return &wqe->acct[IO_WQ_ACCT_BOUND];
}

static inline struct io_wqe_acct *io_wqe_get_acct(struct io_worker *worker)
{
	struct io_wqe *wqe = worker->wqe;

	if (worker->flags & IO_WORKER_F_BOUND)
		return &wqe->acct[IO_WQ_ACCT_BOUND];

	return &wqe->acct[IO_WQ_ACCT_UNBOUND];
}

static void io_worker_exit(struct io_worker *worker)
{
	struct io_wqe *wqe = worker->wqe;
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
	unsigned flags;

	if (refcount_dec_and_test(&worker->ref))
		complete(&worker->ref_done);
	wait_for_completion(&worker->ref_done);

	preempt_disable();
	current->flags &= ~PF_IO_WORKER;
	flags = worker->flags;
	worker->flags = 0;
	if (flags & IO_WORKER_F_RUNNING)
		atomic_dec(&acct->nr_running);
	worker->flags = 0;
	preempt_enable();

	raw_spin_lock_irq(&wqe->lock);
	if (flags & IO_WORKER_F_FREE)
		hlist_nulls_del_rcu(&worker->nulls_node);
	list_del_rcu(&worker->all_list);
	acct->nr_workers--;
	raw_spin_unlock_irq(&wqe->lock);

	kfree_rcu(worker, rcu);
	if (atomic_dec_and_test(&wqe->wq->worker_refs))
		complete(&wqe->wq->worker_done);
	do_exit(0);
}

static inline bool io_wqe_run_queue(struct io_wqe *wqe)
	__must_hold(wqe->lock)
{
	if (!wq_list_empty(&wqe->work_list) &&
	    !(wqe->flags & IO_WQE_FLAG_STALLED))
		return true;
	return false;
}

/*
 * Check head of free list for an available worker. If one isn't available,
 * caller must wake up the wq manager to create one.
 */
static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
	__must_hold(RCU)
{
	struct hlist_nulls_node *n;
	struct io_worker *worker;

	n = rcu_dereference(hlist_nulls_first_rcu(&wqe->free_list));
	if (is_a_nulls(n))
		return false;

	worker = hlist_nulls_entry(n, struct io_worker, nulls_node);
	if (io_worker_get(worker)) {
		wake_up_process(worker->task);
		io_worker_release(worker);
		return true;
	}

	return false;
}

/*
 * We need a worker. If we find a free one, we're good. If not, and we're
 * below the max number of workers, wake up the manager to create one.
 */
static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
{
	bool ret;

	/*
	 * Most likely an attempt to queue unbounded work on an io_wq that
	 * wasn't setup with any unbounded workers.
	 */
	WARN_ON_ONCE(!acct->max_workers);

	rcu_read_lock();
	ret = io_wqe_activate_free_worker(wqe);
	rcu_read_unlock();

	if (!ret && acct->nr_workers < acct->max_workers)
		wake_up_process(wqe->wq->manager);
}

static void io_wqe_inc_running(struct io_worker *worker)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);

	atomic_inc(&acct->nr_running);
}

static void io_wqe_dec_running(struct io_worker *worker)
	__must_hold(wqe->lock)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
	struct io_wqe *wqe = worker->wqe;

	if (atomic_dec_and_test(&acct->nr_running) && io_wqe_run_queue(wqe))
		io_wqe_wake_worker(wqe, acct);
}

/*
 * Worker will start processing some work. Move it to the busy list, if
 * it's currently on the freelist
 */
static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
			     struct io_wq_work *work)
	__must_hold(wqe->lock)
{
	bool worker_bound, work_bound;

	if (worker->flags & IO_WORKER_F_FREE) {
		worker->flags &= ~IO_WORKER_F_FREE;
		hlist_nulls_del_init_rcu(&worker->nulls_node);
	}

	/*
	 * If worker is moving from bound to unbound (or vice versa), then
	 * ensure we update the running accounting.
	 */
	worker_bound = (worker->flags & IO_WORKER_F_BOUND) != 0;
	work_bound = (work->flags & IO_WQ_WORK_UNBOUND) == 0;
	if (worker_bound != work_bound) {
		io_wqe_dec_running(worker);
		if (work_bound) {
			worker->flags |= IO_WORKER_F_BOUND;
			wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers--;
			wqe->acct[IO_WQ_ACCT_BOUND].nr_workers++;
		} else {
			worker->flags &= ~IO_WORKER_F_BOUND;
			wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers++;
			wqe->acct[IO_WQ_ACCT_BOUND].nr_workers--;
		}
		io_wqe_inc_running(worker);
	}
}

/*
 * No work, worker going to sleep. Move to freelist, and unuse mm if we
 * have one attached. Dropping the mm may potentially sleep, so we drop
 * the lock in that case and return success. Since the caller has to
 * retry the loop in that case (we changed task state), we don't regrab
 * the lock if we return success.
 */
static void __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker)
	__must_hold(wqe->lock)
{
	if (!(worker->flags & IO_WORKER_F_FREE)) {
		worker->flags |= IO_WORKER_F_FREE;
		hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	}
}

static inline unsigned int io_get_work_hash(struct io_wq_work *work)
{
	return work->flags >> IO_WQ_HASH_SHIFT;
}

static void io_wait_on_hash(struct io_wqe *wqe, unsigned int hash)
{
	struct io_wq *wq = wqe->wq;

	spin_lock(&wq->hash->wait.lock);
	if (list_empty(&wqe->wait.entry)) {
		__add_wait_queue(&wq->hash->wait, &wqe->wait);
		if (!test_bit(hash, &wq->hash->map)) {
			__set_current_state(TASK_RUNNING);
			list_del_init(&wqe->wait.entry);
		}
	}
	spin_unlock(&wq->hash->wait.lock);
}

static struct io_wq_work *io_get_next_work(struct io_wqe *wqe)
	__must_hold(wqe->lock)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work, *tail;
	unsigned int stall_hash = -1U;

	wq_list_for_each(node, prev, &wqe->work_list) {
		unsigned int hash;

		work = container_of(node, struct io_wq_work, list);

		/* not hashed, can run anytime */
		if (!io_wq_is_hashed(work)) {
			wq_list_del(&wqe->work_list, node, prev);
			return work;
		}

		hash = io_get_work_hash(work);
		/* all items with this hash lie in [work, tail] */
		tail = wqe->hash_tail[hash];

		/* hashed, can run if not already running */
		if (!test_and_set_bit(hash, &wqe->wq->hash->map)) {
			wqe->hash_tail[hash] = NULL;
			wq_list_cut(&wqe->work_list, &tail->list, prev);
			return work;
		}
		if (stall_hash == -1U)
			stall_hash = hash;
		/* fast forward to a next hash, for-each will fix up @prev */
		node = &tail->list;
	}

	if (stall_hash != -1U) {
		raw_spin_unlock(&wqe->lock);
		io_wait_on_hash(wqe, stall_hash);
		raw_spin_lock(&wqe->lock);
	}

	return NULL;
}

static bool io_flush_signals(void)
{
	if (unlikely(test_thread_flag(TIF_NOTIFY_SIGNAL))) {
		__set_current_state(TASK_RUNNING);
		tracehook_notify_signal();
		return true;
	}
	return false;
}

static void io_assign_current_work(struct io_worker *worker,
				   struct io_wq_work *work)
{
	if (work) {
		io_flush_signals();
		cond_resched();
	}

	spin_lock_irq(&worker->lock);
	worker->cur_work = work;
	spin_unlock_irq(&worker->lock);
}

static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);

static void io_worker_handle_work(struct io_worker *worker)
	__releases(wqe->lock)
{
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;

	do {
		struct io_wq_work *work;
get_next:
		/*
		 * If we got some work, mark us as busy. If we didn't, but
		 * the list isn't empty, it means we stalled on hashed work.
		 * Mark us stalled so we don't keep looking for work when we
		 * can't make progress, any work completion or insertion will
		 * clear the stalled flag.
		 */
		work = io_get_next_work(wqe);
		if (work)
			__io_worker_busy(wqe, worker, work);
		else if (!wq_list_empty(&wqe->work_list))
			wqe->flags |= IO_WQE_FLAG_STALLED;

		raw_spin_unlock_irq(&wqe->lock);
		if (!work)
			break;
		io_assign_current_work(worker, work);
		__set_current_state(TASK_RUNNING);

		/* handle a whole dependent link */
		do {
			struct io_wq_work *next_hashed, *linked;
			unsigned int hash = io_get_work_hash(work);

			next_hashed = wq_next_work(work);
			wq->do_work(work);
			io_assign_current_work(worker, NULL);

			linked = wq->free_work(work);
			work = next_hashed;
			if (!work && linked && !io_wq_is_hashed(linked)) {
				work = linked;
				linked = NULL;
			}
			io_assign_current_work(worker, work);
			if (linked)
				io_wqe_enqueue(wqe, linked);

			if (hash != -1U && !next_hashed) {
				clear_bit(hash, &wq->hash->map);
				if (wq_has_sleeper(&wq->hash->wait))
					wake_up(&wq->hash->wait);
				raw_spin_lock_irq(&wqe->lock);
				wqe->flags &= ~IO_WQE_FLAG_STALLED;
				/* skip unnecessary unlock-lock wqe->lock */
				if (!work)
					goto get_next;
				raw_spin_unlock_irq(&wqe->lock);
			}
		} while (work);

		raw_spin_lock_irq(&wqe->lock);
	} while (1);
}

static int io_wqe_worker(void *data)
{
	struct io_worker *worker = data;
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;
	char buf[TASK_COMM_LEN];

	worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
	io_wqe_inc_running(worker);

	sprintf(buf, "iou-wrk-%d", wq->task_pid);
	set_task_comm(current, buf);

	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		long ret;

		set_current_state(TASK_INTERRUPTIBLE);
loop:
		raw_spin_lock_irq(&wqe->lock);
		if (io_wqe_run_queue(wqe)) {
			io_worker_handle_work(worker);
			goto loop;
		}
		__io_worker_idle(wqe, worker);
		raw_spin_unlock_irq(&wqe->lock);
		if (io_flush_signals())
			continue;
		ret = schedule_timeout(WORKER_IDLE_TIMEOUT);
		if (signal_pending(current)) {
			struct ksignal ksig;

			if (!get_signal(&ksig))
				continue;
			break;
		}
		if (ret)
			continue;
		/* timed out, exit unless we're the fixed worker */
		if (test_bit(IO_WQ_BIT_EXIT, &wq->state) ||
		    !(worker->flags & IO_WORKER_F_FIXED))
			break;
	}

	if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		raw_spin_lock_irq(&wqe->lock);
		if (!wq_list_empty(&wqe->work_list))
			io_worker_handle_work(worker);
		else
			raw_spin_unlock_irq(&wqe->lock);
	}

	io_worker_exit(worker);
	return 0;
}

/*
 * Called when a worker is scheduled in. Mark us as currently running.
 */
void io_wq_worker_running(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->pf_io_worker;

	if (!worker)
		return;
	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (worker->flags & IO_WORKER_F_RUNNING)
		return;
	worker->flags |= IO_WORKER_F_RUNNING;
	io_wqe_inc_running(worker);
}

/*
 * Called when worker is going to sleep. If there are no workers currently
 * running and we have work pending, wake up a free one or have the manager
 * set one up.
 */
void io_wq_worker_sleeping(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->pf_io_worker;

	if (!worker)
		return;
	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (!(worker->flags & IO_WORKER_F_RUNNING))
		return;

	worker->flags &= ~IO_WORKER_F_RUNNING;

	raw_spin_lock_irq(&worker->wqe->lock);
	io_wqe_dec_running(worker);
	raw_spin_unlock_irq(&worker->wqe->lock);
}

static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
{
	struct io_wqe_acct *acct = &wqe->acct[index];
	struct io_worker *worker;
	struct task_struct *tsk;

	__set_current_state(TASK_RUNNING);

	worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
	if (!worker)
		return false;

	refcount_set(&worker->ref, 1);
	worker->nulls_node.pprev = NULL;
	worker->wqe = wqe;
	spin_lock_init(&worker->lock);
	init_completion(&worker->ref_done);

	atomic_inc(&wq->worker_refs);

	tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
	if (IS_ERR(tsk)) {
		if (atomic_dec_and_test(&wq->worker_refs))
			complete(&wq->worker_done);
		kfree(worker);
		return false;
	}

	tsk->pf_io_worker = worker;
	worker->task = tsk;
	set_cpus_allowed_ptr(tsk, cpumask_of_node(wqe->node));
	tsk->flags |= PF_NO_SETAFFINITY;

	raw_spin_lock_irq(&wqe->lock);
	hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	list_add_tail_rcu(&worker->all_list, &wqe->all_list);
	worker->flags |= IO_WORKER_F_FREE;
	if (index == IO_WQ_ACCT_BOUND)
		worker->flags |= IO_WORKER_F_BOUND;
	if (!acct->nr_workers && (worker->flags & IO_WORKER_F_BOUND))
		worker->flags |= IO_WORKER_F_FIXED;
	acct->nr_workers++;
	raw_spin_unlock_irq(&wqe->lock);
	wake_up_new_task(tsk);
	return true;
}

static inline bool io_wqe_need_worker(struct io_wqe *wqe, int index)
	__must_hold(wqe->lock)
{
	struct io_wqe_acct *acct = &wqe->acct[index];

	if (acct->nr_workers && test_bit(IO_WQ_BIT_EXIT, &wqe->wq->state))
		return false;
	/* if we have available workers or no work, no need */
	if (!hlist_nulls_empty(&wqe->free_list) || !io_wqe_run_queue(wqe))
		return false;
	return acct->nr_workers < acct->max_workers;
}

/*
 * Iterate the passed in list and call the specific function for each
 * worker that isn't exiting
 */
static bool io_wq_for_each_worker(struct io_wqe *wqe,
				  bool (*func)(struct io_worker *, void *),
				  void *data)
{
	struct io_worker *worker;
	bool ret = false;

	list_for_each_entry_rcu(worker, &wqe->all_list, all_list) {
		if (io_worker_get(worker)) {
			/* no task if node is/was offline */
			if (worker->task)
				ret = func(worker, data);
			io_worker_release(worker);
			if (ret)
				break;
		}
	}

	return ret;
}

static bool io_wq_worker_wake(struct io_worker *worker, void *data)
{
	set_notify_signal(worker->task);
	wake_up_process(worker->task);
	return false;
}

static void io_wq_check_workers(struct io_wq *wq)
{
	int node;

	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];
		bool fork_worker[2] = { false, false };

		if (!node_online(node))
			continue;

		raw_spin_lock_irq(&wqe->lock);
		if (io_wqe_need_worker(wqe, IO_WQ_ACCT_BOUND))
			fork_worker[IO_WQ_ACCT_BOUND] = true;
		if (io_wqe_need_worker(wqe, IO_WQ_ACCT_UNBOUND))
			fork_worker[IO_WQ_ACCT_UNBOUND] = true;
		raw_spin_unlock_irq(&wqe->lock);
		if (fork_worker[IO_WQ_ACCT_BOUND])
			create_io_worker(wq, wqe, IO_WQ_ACCT_BOUND);
		if (fork_worker[IO_WQ_ACCT_UNBOUND])
			create_io_worker(wq, wqe, IO_WQ_ACCT_UNBOUND);
	}
}

static bool io_wq_work_match_all(struct io_wq_work *work, void *data)
{
	return true;
}

static void io_wq_cancel_pending(struct io_wq *wq)
{
	struct io_cb_cancel_data match = {
		.fn = io_wq_work_match_all,
		.cancel_all = true,
	};
	int node;

	for_each_node(node)
		io_wqe_cancel_pending_work(wq->wqes[node], &match);
}

/*
 * Manager thread. Tasked with creating new workers, if we need them.
 */
static int io_wq_manager(void *data)
{
	struct io_wq *wq = data;
	char buf[TASK_COMM_LEN];
	int node;

	sprintf(buf, "iou-mgr-%d", wq->task_pid);
	set_task_comm(current, buf);

	do {
		set_current_state(TASK_INTERRUPTIBLE);
		io_wq_check_workers(wq);
		schedule_timeout(HZ);
		if (signal_pending(current)) {
			struct ksignal ksig;

			if (!get_signal(&ksig))
				continue;
			set_bit(IO_WQ_BIT_EXIT, &wq->state);
		}
	} while (!test_bit(IO_WQ_BIT_EXIT, &wq->state));

	io_wq_check_workers(wq);

	rcu_read_lock();
	for_each_node(node)
		io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL);
	rcu_read_unlock();

	if (atomic_dec_and_test(&wq->worker_refs))
		complete(&wq->worker_done);
	wait_for_completion(&wq->worker_done);

	spin_lock_irq(&wq->hash->wait.lock);
	for_each_node(node)
		list_del_init(&wq->wqes[node]->wait.entry);
	spin_unlock_irq(&wq->hash->wait.lock);

	io_wq_cancel_pending(wq);
	complete(&wq->exited);
	do_exit(0);
}

static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)
{
	struct io_wq *wq = wqe->wq;

	do {
		work->flags |= IO_WQ_WORK_CANCEL;
		wq->do_work(work);
		work = wq->free_work(work);
	} while (work);
}

static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
{
	unsigned int hash;
	struct io_wq_work *tail;

	if (!io_wq_is_hashed(work)) {
append:
		wq_list_add_tail(&work->list, &wqe->work_list);
		return;
	}

	hash = io_get_work_hash(work);
	tail = wqe->hash_tail[hash];
	wqe->hash_tail[hash] = work;
	if (!tail)
		goto append;

	wq_list_add_after(&work->list, &tail->list, &wqe->work_list);
}

static int io_wq_fork_manager(struct io_wq *wq)
{
	struct task_struct *tsk;

	if (wq->manager)
		return 0;

	WARN_ON_ONCE(test_bit(IO_WQ_BIT_EXIT, &wq->state));

	init_completion(&wq->worker_done);
	atomic_set(&wq->worker_refs, 1);
	tsk = create_io_thread(io_wq_manager, wq, NUMA_NO_NODE);
	if (!IS_ERR(tsk)) {
		wq->manager = get_task_struct(tsk);
		wake_up_new_task(tsk);
		return 0;
	}

	if (atomic_dec_and_test(&wq->worker_refs))
		complete(&wq->worker_done);

	return PTR_ERR(tsk);
}

static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
{
	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
	int work_flags;
	unsigned long flags;

	/* Can only happen if manager creation fails after exec */
	if (io_wq_fork_manager(wqe->wq) ||
	    test_bit(IO_WQ_BIT_EXIT, &wqe->wq->state)) {
		io_run_cancel(work, wqe);
		return;
	}

	work_flags = work->flags;
	raw_spin_lock_irqsave(&wqe->lock, flags);
	io_wqe_insert_work(wqe, work);
	wqe->flags &= ~IO_WQE_FLAG_STALLED;
	raw_spin_unlock_irqrestore(&wqe->lock, flags);

	if ((work_flags & IO_WQ_WORK_CONCURRENT) ||
	    !atomic_read(&acct->nr_running))
		io_wqe_wake_worker(wqe, acct);
}

void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
{
	struct io_wqe *wqe = wq->wqes[numa_node_id()];

	io_wqe_enqueue(wqe, work);
}

/*
 * Work items that hash to the same value will not be done in parallel.
 * Used to limit concurrent writes, generally hashed by inode.
 */
void io_wq_hash_work(struct io_wq_work *work, void *val)
{
	unsigned int bit;

	bit = hash_ptr(val, IO_WQ_HASH_ORDER);
	work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT));
}

static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
{
	struct io_cb_cancel_data *match = data;
	unsigned long flags;

	/*
	 * Hold the lock to avoid ->cur_work going out of scope, caller
	 * may dereference the passed in work.
	 */
	spin_lock_irqsave(&worker->lock, flags);
	if (worker->cur_work &&
	    match->fn(worker->cur_work, match->data)) {
		set_notify_signal(worker->task);
		match->nr_running++;
	}
	spin_unlock_irqrestore(&worker->lock, flags);

	return match->nr_running && !match->cancel_all;
}

static inline void io_wqe_remove_pending(struct io_wqe *wqe,
					 struct io_wq_work *work,
					 struct io_wq_work_node *prev)
{
	unsigned int hash = io_get_work_hash(work);
	struct io_wq_work *prev_work = NULL;

	if (io_wq_is_hashed(work) && work == wqe->hash_tail[hash]) {
		if (prev)
			prev_work = container_of(prev, struct io_wq_work, list);
		if (prev_work && io_get_work_hash(prev_work) == hash)
			wqe->hash_tail[hash] = prev_work;
		else
			wqe->hash_tail[hash] = NULL;
	}
	wq_list_del(&wqe->work_list, &work->list, prev);
}

static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
				       struct io_cb_cancel_data *match)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work;
	unsigned long flags;

retry:
	raw_spin_lock_irqsave(&wqe->lock, flags);
	wq_list_for_each(node, prev, &wqe->work_list) {
		work = container_of(node, struct io_wq_work, list);
		if (!match->fn(work, match->data))
			continue;
		io_wqe_remove_pending(wqe, work, prev);
		raw_spin_unlock_irqrestore(&wqe->lock, flags);
		io_run_cancel(work, wqe);
		match->nr_pending++;
		if (!match->cancel_all)
			return;

		/* not safe to continue after unlock */
		goto retry;
	}
	raw_spin_unlock_irqrestore(&wqe->lock, flags);
}

static void io_wqe_cancel_running_work(struct io_wqe *wqe,
				       struct io_cb_cancel_data *match)
{
	rcu_read_lock();
	io_wq_for_each_worker(wqe, io_wq_worker_cancel, match);
	rcu_read_unlock();
}

enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
				  void *data, bool cancel_all)
{
	struct io_cb_cancel_data match = {
		.fn = cancel,
		.data = data,
		.cancel_all = cancel_all,
	};
	int node;

	/*
	 * First check pending list, if we're lucky we can just remove it
	 * from there. CANCEL_OK means that the work is returned as-new,
	 * no completion will be posted for it.
	 */
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wqe_cancel_pending_work(wqe, &match);
		if (match.nr_pending && !match.cancel_all)
			return IO_WQ_CANCEL_OK;
	}

	/*
	 * Now check if a free (going busy) or busy worker has the work
	 * currently running. If we find it there, we'll return CANCEL_RUNNING
	 * as an indication that we attempt to signal cancellation. The
	 * completion will run normally in this case.
	 */
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wqe_cancel_running_work(wqe, &match);
		if (match.nr_running && !match.cancel_all)
			return IO_WQ_CANCEL_RUNNING;
	}

	if (match.nr_running)
		return IO_WQ_CANCEL_RUNNING;
	if (match.nr_pending)
		return IO_WQ_CANCEL_OK;
	return IO_WQ_CANCEL_NOTFOUND;
}

static int io_wqe_hash_wake(struct wait_queue_entry *wait, unsigned mode,
			    int sync, void *key)
{
	struct io_wqe *wqe = container_of(wait, struct io_wqe, wait);
	int ret;

	list_del_init(&wait->entry);

	rcu_read_lock();
	ret = io_wqe_activate_free_worker(wqe);
	rcu_read_unlock();

	if (!ret)
		wake_up_process(wqe->wq->manager);

	return 1;
}

struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
{
	int ret = -ENOMEM, node;
	struct io_wq *wq;

	if (WARN_ON_ONCE(!data->free_work || !data->do_work))
		return ERR_PTR(-EINVAL);

	wq = kzalloc(sizeof(*wq), GFP_KERNEL);
	if (!wq)
		return ERR_PTR(-ENOMEM);

	wq->wqes = kcalloc(nr_node_ids, sizeof(struct io_wqe *), GFP_KERNEL);
	if (!wq->wqes)
		goto err_wq;

	ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	if (ret)
		goto err_wqes;

	refcount_inc(&data->hash->refs);
	wq->hash = data->hash;
	wq->free_work = data->free_work;
	wq->do_work = data->do_work;

	ret = -ENOMEM;
	for_each_node(node) {
		struct io_wqe *wqe;
		int alloc_node = node;

		if (!node_online(alloc_node))
			alloc_node = NUMA_NO_NODE;
		wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, alloc_node);
		if (!wqe)
			goto err;
		wq->wqes[node] = wqe;
		wqe->node = alloc_node;
		wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
		atomic_set(&wqe->acct[IO_WQ_ACCT_BOUND].nr_running, 0);
		wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
				task_rlimit(current, RLIMIT_NPROC);
		atomic_set(&wqe->acct[IO_WQ_ACCT_UNBOUND].nr_running, 0);
		wqe->wait.func = io_wqe_hash_wake;
		INIT_LIST_HEAD(&wqe->wait.entry);
		wqe->wq = wq;
		raw_spin_lock_init(&wqe->lock);
		INIT_WQ_LIST(&wqe->work_list);
		INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
		INIT_LIST_HEAD(&wqe->all_list);
	}

	wq->task_pid = current->pid;
	init_completion(&wq->exited);
	refcount_set(&wq->refs, 1);

	ret = io_wq_fork_manager(wq);
	if (!ret)
		return wq;
err:
	io_wq_put_hash(data->hash);
	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	for_each_node(node)
		kfree(wq->wqes[node]);
err_wqes:
	kfree(wq->wqes);
err_wq:
	kfree(wq);
	return ERR_PTR(ret);
}

static void io_wq_destroy_manager(struct io_wq *wq)
{
	if (wq->manager) {
		wake_up_process(wq->manager);
		wait_for_completion(&wq->exited);
		put_task_struct(wq->manager);
		wq->manager = NULL;
	}
}

static void io_wq_destroy(struct io_wq *wq)
{
	int node;

	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);

	set_bit(IO_WQ_BIT_EXIT, &wq->state);
	io_wq_destroy_manager(wq);

	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];
		struct io_cb_cancel_data match = {
			.fn = io_wq_work_match_all,
			.cancel_all = true,
		};
		io_wqe_cancel_pending_work(wqe, &match);
		kfree(wqe);
	}
	io_wq_put_hash(wq->hash);
	kfree(wq->wqes);
	kfree(wq);
}

void io_wq_put(struct io_wq *wq)
{
	if (refcount_dec_and_test(&wq->refs))
		io_wq_destroy(wq);
}

void io_wq_put_and_exit(struct io_wq *wq)
{
	set_bit(IO_WQ_BIT_EXIT, &wq->state);
	io_wq_destroy_manager(wq);
	io_wq_put(wq);
}

static bool io_wq_worker_affinity(struct io_worker *worker, void *data)
{
	struct task_struct *task = worker->task;
	struct rq_flags rf;
	struct rq *rq;

	rq = task_rq_lock(task, &rf);
	do_set_cpus_allowed(task, cpumask_of_node(worker->wqe->node));
	task->flags |= PF_NO_SETAFFINITY;
	task_rq_unlock(rq, task, &rf);
	return false;
}

static int io_wq_cpu_online(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);
	int i;

	rcu_read_lock();
	for_each_node(i)
		io_wq_for_each_worker(wq->wqes[i], io_wq_worker_affinity, NULL);
	rcu_read_unlock();
	return 0;
}

static __init int io_wq_init(void)
{
	int ret;

	ret = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "io-wq/online",
					io_wq_cpu_online, NULL);
	if (ret < 0)
		return ret;
	io_wq_online = ret;
	return 0;
}
subsys_initcall(io_wq_init);
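
The exported entry points above (io_wq_create(), io_wq_enqueue(), io_wq_hash_work(), io_wq_cancel_cb() and io_wq_put_and_exit()) are the interface io_uring uses to drive this pool. The following is a minimal caller-side sketch, not code from the kernel tree: it assumes the declarations in io-wq.h (struct io_wq_data carrying hash/free_work/do_work, and a struct io_wq_work embedded in a caller-defined container), and all my_* names are hypothetical.

#include <linux/fs.h>		/* struct inode, for hashing by inode */
#include <linux/cpumask.h>	/* num_online_cpus() */
#include "io-wq.h"		/* io_wq API implemented by this file */

/* Hypothetical caller-side container embedding the generic work item. */
struct my_req {
	struct io_wq_work work;		/* layout assumed from io-wq.h */
	/* caller-private fields ... */
};

/* do_work callback: a pool worker calls this to execute one item. */
static void my_do_work(struct io_wq_work *work)
{
	struct my_req *req = container_of(work, struct my_req, work);

	if (work->flags & IO_WQ_WORK_CANCEL) {
		/* item was cancelled (see io_run_cancel()); complete @req with an error */
	} else {
		/* perform the potentially blocking operation for @req */
	}
}

/* free_work callback: release the item; return a dependent item or NULL. */
static struct io_wq_work *my_free_work(struct io_wq_work *work)
{
	/* drop references on the container; no dependent link in this sketch */
	return NULL;
}

/* Create a pool; @hash is the shared, refcounted hash/wait state from io-wq.h. */
static struct io_wq *my_create_pool(struct io_wq_hash *hash)
{
	struct io_wq_data data = {
		.hash = hash,
		.free_work = my_free_work,
		.do_work = my_do_work,
	};

	/*
	 * Example cap on bounded workers per node; unbounded workers are
	 * limited by RLIMIT_NPROC inside io_wq_create(). Returns ERR_PTR()
	 * on failure, so the caller should IS_ERR()-check the result.
	 */
	return io_wq_create(num_online_cpus(), &data);
}

/* Queue a write so that items hashed to the same inode never run in parallel. */
static void my_queue_write(struct io_wq *wq, struct my_req *req, struct inode *inode)
{
	io_wq_hash_work(&req->work, inode);
	io_wq_enqueue(wq, &req->work);
}

Teardown mirrors io_uring's usage: io_wq_put_and_exit(wq) sets IO_WQ_BIT_EXIT, tears down the manager and, via io_wq_destroy(), cancels any still-pending work using io_wq_work_match_all before freeing the per-node structures.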