// SPDX-License-Identifier: GPL-2.0
/*
 * Basic worker thread pool for io_uring
 *
 * Copyright (C) 2019 Jens Axboe
 *
 */
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/errno.h>
#include <linux/sched/signal.h>
#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/rculist_nulls.h>
#include <linux/cpu.h>
#include <linux/cpuset.h>
#include <linux/task_work.h>
#include <linux/audit.h>
#include <linux/mmu_context.h>
#include <uapi/linux/io_uring.h>

#include "io-wq.h"
#include "slist.h"
#include "io_uring.h"

#define WORKER_IDLE_TIMEOUT	(5 * HZ)
#define WORKER_INIT_LIMIT	3

enum {
	IO_WORKER_F_UP		= 0,	/* up and active */
	IO_WORKER_F_RUNNING	= 1,	/* account as running */
	IO_WORKER_F_FREE	= 2,	/* worker on free list */
};

enum {
	IO_WQ_BIT_EXIT		= 0,	/* wq exiting */
};

enum {
	IO_ACCT_STALLED_BIT	= 0,	/* stalled on hash */
};

/*
 * One for each thread in a wq pool
 */
struct io_worker {
	refcount_t ref;
	unsigned long flags;
	struct hlist_nulls_node nulls_node;
	struct list_head all_list;
	struct task_struct *task;
	struct io_wq *wq;
	struct io_wq_acct *acct;

	struct io_wq_work *cur_work;
	raw_spinlock_t lock;

	struct completion ref_done;

	unsigned long create_state;
	struct callback_head create_work;
	int init_retries;

	union {
		struct rcu_head rcu;
		struct delayed_work work;
	};
};

#if BITS_PER_LONG == 64
#define IO_WQ_HASH_ORDER	6
#else
#define IO_WQ_HASH_ORDER	5
#endif

#define IO_WQ_NR_HASH_BUCKETS	(1u << IO_WQ_HASH_ORDER)

struct io_wq_acct {
	/**
	 * Protects access to the worker lists.
	 */
	raw_spinlock_t workers_lock;

	unsigned nr_workers;
	unsigned max_workers;
	atomic_t nr_running;

	/**
	 * The list of free workers. Protected by #workers_lock
	 * (write) and RCU (read).
	 */
	struct hlist_nulls_head free_list;

	/**
	 * The list of all workers. Protected by #workers_lock
	 * (write) and RCU (read).
	 */
	struct list_head all_list;

	raw_spinlock_t lock;
	struct io_wq_work_list work_list;
	unsigned long flags;
};

enum {
	IO_WQ_ACCT_BOUND,
	IO_WQ_ACCT_UNBOUND,
	IO_WQ_ACCT_NR,
};

/*
 * Per io_wq state
 */
struct io_wq {
	unsigned long state;

	struct io_wq_hash *hash;

	atomic_t worker_refs;
	struct completion worker_done;

	struct hlist_node cpuhp_node;

	struct task_struct *task;

	struct io_wq_acct acct[IO_WQ_ACCT_NR];

	struct wait_queue_entry wait;

	struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];

	cpumask_var_t cpu_mask;
};

static enum cpuhp_state io_wq_online;

struct io_cb_cancel_data {
	work_cancel_fn *fn;
	void *data;
	int nr_running;
	int nr_pending;
	bool cancel_all;
};

static bool create_io_worker(struct io_wq *wq, struct io_wq_acct *acct);
static void io_wq_dec_running(struct io_worker *worker);
static bool io_acct_cancel_pending_work(struct io_wq *wq,
					struct io_wq_acct *acct,
					struct io_cb_cancel_data *match);
static void create_worker_cb(struct callback_head *cb);
static void io_wq_cancel_tw_create(struct io_wq *wq);

static inline unsigned int __io_get_work_hash(unsigned int work_flags)
{
	return work_flags >> IO_WQ_HASH_SHIFT;
}

static inline unsigned int io_get_work_hash(struct io_wq_work *work)
{
	return __io_get_work_hash(atomic_read(&work->flags));
}

static bool io_worker_get(struct io_worker *worker)
{
	return refcount_inc_not_zero(&worker->ref);
}

static void io_worker_release(struct io_worker *worker)
{
	if (refcount_dec_and_test(&worker->ref))
		complete(&worker->ref_done);
}

static inline struct io_wq_acct *io_get_acct(struct io_wq *wq, bool bound)
{
	return &wq->acct[bound ? IO_WQ_ACCT_BOUND : IO_WQ_ACCT_UNBOUND];
}

static inline struct io_wq_acct *io_work_get_acct(struct io_wq *wq,
						  unsigned int work_flags)
{
	return io_get_acct(wq, !(work_flags & IO_WQ_WORK_UNBOUND));
}

static inline struct io_wq_acct *io_wq_get_acct(struct io_worker *worker)
{
	return worker->acct;
}

static void io_worker_ref_put(struct io_wq *wq)
{
	if (atomic_dec_and_test(&wq->worker_refs))
		complete(&wq->worker_done);
}

bool io_wq_worker_stopped(void)
{
	struct io_worker *worker = current->worker_private;

	if (WARN_ON_ONCE(!io_wq_current_is_worker()))
		return true;

	return test_bit(IO_WQ_BIT_EXIT, &worker->wq->state);
}

static void io_worker_cancel_cb(struct io_worker *worker)
{
	struct io_wq_acct *acct = io_wq_get_acct(worker);
	struct io_wq *wq = worker->wq;

	atomic_dec(&acct->nr_running);
	raw_spin_lock(&acct->workers_lock);
	acct->nr_workers--;
	raw_spin_unlock(&acct->workers_lock);
	io_worker_ref_put(wq);
	clear_bit_unlock(0, &worker->create_state);
	io_worker_release(worker);
}

static bool io_task_worker_match(struct callback_head *cb, void *data)
{
	struct io_worker *worker;

	if (cb->func != create_worker_cb)
		return false;
	worker = container_of(cb, struct io_worker, create_work);
	return worker == data;
}

static void io_worker_exit(struct io_worker *worker)
{
	struct io_wq *wq = worker->wq;
	struct io_wq_acct *acct = io_wq_get_acct(worker);

	while (1) {
		struct callback_head *cb = task_work_cancel_match(wq->task,
						io_task_worker_match, worker);

		if (!cb)
			break;
		io_worker_cancel_cb(worker);
	}

	io_worker_release(worker);
	wait_for_completion(&worker->ref_done);

	raw_spin_lock(&acct->workers_lock);
	if (test_bit(IO_WORKER_F_FREE, &worker->flags))
		hlist_nulls_del_rcu(&worker->nulls_node);
	list_del_rcu(&worker->all_list);
	raw_spin_unlock(&acct->workers_lock);
	io_wq_dec_running(worker);
	/*
	 * this worker is a goner, clear ->worker_private to avoid any
	 * inc/dec running calls that could happen as part of exit from
	 * touching 'worker'.
	 */
	current->worker_private = NULL;

	kfree_rcu(worker, rcu);
	io_worker_ref_put(wq);
	do_exit(0);
}

static inline bool __io_acct_run_queue(struct io_wq_acct *acct)
{
	return !test_bit(IO_ACCT_STALLED_BIT, &acct->flags) &&
		!wq_list_empty(&acct->work_list);
}

/*
 * If there's work to do, returns true with acct->lock acquired. If not,
 * returns false with no lock held.
 */
static inline bool io_acct_run_queue(struct io_wq_acct *acct)
	__acquires(&acct->lock)
{
	raw_spin_lock(&acct->lock);
	if (__io_acct_run_queue(acct))
		return true;

	raw_spin_unlock(&acct->lock);
	return false;
}

/*
 * Check head of free list for an available worker. If one isn't available,
 * caller must create one.
 */
static bool io_acct_activate_free_worker(struct io_wq_acct *acct)
	__must_hold(RCU)
{
	struct hlist_nulls_node *n;
	struct io_worker *worker;

	/*
	 * Iterate free_list and see if we can find an idle worker to
	 * activate. If a given worker is on the free_list but in the process
	 * of exiting, keep trying.
	 */
	hlist_nulls_for_each_entry_rcu(worker, n, &acct->free_list, nulls_node) {
		if (!io_worker_get(worker))
			continue;
		/*
		 * If the worker is already running, it's either already
		 * starting work or finishing work. In either case, if it does
		 * go to sleep, we'll kick off a new task for this work anyway.
		 */
		wake_up_process(worker->task);
		io_worker_release(worker);
		return true;
	}

	return false;
}

/*
 * We need a worker. If we find a free one, we're good. If not, and we're
 * below the max number of workers, create one.
 */
static bool io_wq_create_worker(struct io_wq *wq, struct io_wq_acct *acct)
{
	/*
	 * Most likely an attempt to queue unbounded work on an io_wq that
	 * wasn't setup with any unbounded workers.
	 */
	if (unlikely(!acct->max_workers))
		pr_warn_once("io-wq is not configured for unbound workers");

	raw_spin_lock(&acct->workers_lock);
	if (acct->nr_workers >= acct->max_workers) {
		raw_spin_unlock(&acct->workers_lock);
		return true;
	}
	acct->nr_workers++;
	raw_spin_unlock(&acct->workers_lock);
	atomic_inc(&acct->nr_running);
	atomic_inc(&wq->worker_refs);
	return create_io_worker(wq, acct);
}

static void io_wq_inc_running(struct io_worker *worker)
{
	struct io_wq_acct *acct = io_wq_get_acct(worker);

	atomic_inc(&acct->nr_running);
}

static void create_worker_cb(struct callback_head *cb)
{
	struct io_worker *worker;
	struct io_wq *wq;

	struct io_wq_acct *acct;
	bool activated_free_worker, do_create = false;

	worker = container_of(cb, struct io_worker, create_work);
	wq = worker->wq;
	acct = worker->acct;

	rcu_read_lock();
	activated_free_worker = io_acct_activate_free_worker(acct);
	rcu_read_unlock();
	if (activated_free_worker)
		goto no_need_create;

	raw_spin_lock(&acct->workers_lock);

	if (acct->nr_workers < acct->max_workers) {
		acct->nr_workers++;
		do_create = true;
	}
	raw_spin_unlock(&acct->workers_lock);
	if (do_create) {
		create_io_worker(wq, acct);
	} else {
no_need_create:
		atomic_dec(&acct->nr_running);
		io_worker_ref_put(wq);
	}
	clear_bit_unlock(0, &worker->create_state);
	io_worker_release(worker);
}

static bool io_queue_worker_create(struct io_worker *worker,
				   struct io_wq_acct *acct,
				   task_work_func_t func)
{
	struct io_wq *wq = worker->wq;

	/* raced with exit, just ignore create call */
	if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
		goto fail;
	if (!io_worker_get(worker))
		goto fail;
	/*
	 * create_state manages ownership of create_work/index. We should
	 * only need one entry per worker, as the worker going to sleep
	 * will trigger the condition, and waking will clear it once it
	 * runs the task_work.
	 */
	if (test_bit(0, &worker->create_state) ||
	    test_and_set_bit_lock(0, &worker->create_state))
		goto fail_release;

	atomic_inc(&wq->worker_refs);
	init_task_work(&worker->create_work, func);
	if (!task_work_add(wq->task, &worker->create_work, TWA_SIGNAL)) {
		/*
		 * EXIT may have been set after checking it above, check after
		 * adding the task_work and remove any creation item if it is
		 * now set. wq exit does that too, but we can have added this
		 * work item after we canceled in io_wq_exit_workers().
		 */
		if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
			io_wq_cancel_tw_create(wq);
		io_worker_ref_put(wq);
		return true;
	}
	io_worker_ref_put(wq);
	clear_bit_unlock(0, &worker->create_state);
fail_release:
	io_worker_release(worker);
fail:
	atomic_dec(&acct->nr_running);
	io_worker_ref_put(wq);
	return false;
}

/* Defer if current and next work are both hashed to the same chain */
static bool io_wq_hash_defer(struct io_wq_work *work, struct io_wq_acct *acct)
{
	unsigned int hash, work_flags;
	struct io_wq_work *next;

	lockdep_assert_held(&acct->lock);

	work_flags = atomic_read(&work->flags);
	if (!__io_wq_is_hashed(work_flags))
		return false;

	/* should not happen, io_acct_run_queue() said we had work */
	if (wq_list_empty(&acct->work_list))
		return true;

	hash = __io_get_work_hash(work_flags);
	next = container_of(acct->work_list.first, struct io_wq_work, list);
	work_flags = atomic_read(&next->flags);
	if (!__io_wq_is_hashed(work_flags))
		return false;
	return hash == __io_get_work_hash(work_flags);
}

static void io_wq_dec_running(struct io_worker *worker)
{
	struct io_wq_acct *acct = io_wq_get_acct(worker);
	struct io_wq *wq = worker->wq;

	if (!test_bit(IO_WORKER_F_UP, &worker->flags))
		return;

	if (!atomic_dec_and_test(&acct->nr_running))
		return;
	if (!worker->cur_work)
		return;
	if (!io_acct_run_queue(acct))
		return;
	if (io_wq_hash_defer(worker->cur_work, acct)) {
		raw_spin_unlock(&acct->lock);
		return;
	}

	raw_spin_unlock(&acct->lock);
	atomic_inc(&acct->nr_running);
	atomic_inc(&wq->worker_refs);
	io_queue_worker_create(worker, acct, create_worker_cb);
}

/*
 * Worker will start processing some work. Move it to the busy list, if
 * it's currently on the freelist
 */
static void __io_worker_busy(struct io_wq_acct *acct, struct io_worker *worker)
{
	if (test_bit(IO_WORKER_F_FREE, &worker->flags)) {
		clear_bit(IO_WORKER_F_FREE, &worker->flags);
		raw_spin_lock(&acct->workers_lock);
		hlist_nulls_del_init_rcu(&worker->nulls_node);
		raw_spin_unlock(&acct->workers_lock);
	}
}

/*
 * No work, worker going to sleep. Move to freelist.
 */
static void __io_worker_idle(struct io_wq_acct *acct, struct io_worker *worker)
	__must_hold(acct->workers_lock)
{
	if (!test_bit(IO_WORKER_F_FREE, &worker->flags)) {
		set_bit(IO_WORKER_F_FREE, &worker->flags);
		hlist_nulls_add_head_rcu(&worker->nulls_node, &acct->free_list);
	}
}

static bool io_wait_on_hash(struct io_wq *wq, unsigned int hash)
{
	bool ret = false;

	spin_lock_irq(&wq->hash->wait.lock);
	if (list_empty(&wq->wait.entry)) {
		__add_wait_queue(&wq->hash->wait, &wq->wait);
		if (!test_bit(hash, &wq->hash->map)) {
			__set_current_state(TASK_RUNNING);
			list_del_init(&wq->wait.entry);
			ret = true;
		}
	}
	spin_unlock_irq(&wq->hash->wait.lock);
	return ret;
}

static struct io_wq_work *io_get_next_work(struct io_wq_acct *acct,
					   struct io_wq *wq)
	__must_hold(acct->lock)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work, *tail;
	unsigned int stall_hash = -1U;

	wq_list_for_each(node, prev, &acct->work_list) {
		unsigned int work_flags;
		unsigned int hash;

		work = container_of(node, struct io_wq_work, list);

		/* not hashed, can run anytime */
		work_flags = atomic_read(&work->flags);
		if (!__io_wq_is_hashed(work_flags)) {
			wq_list_del(&acct->work_list, node, prev);
			return work;
		}

		hash = __io_get_work_hash(work_flags);
		/* all items with this hash lie in [work, tail] */
		tail = wq->hash_tail[hash];

		/* hashed, can run if not already running */
		if (!test_and_set_bit(hash, &wq->hash->map)) {
			wq->hash_tail[hash] = NULL;
			wq_list_cut(&acct->work_list, &tail->list, prev);
			return work;
		}
		if (stall_hash == -1U)
			stall_hash = hash;
		/* fast forward to a next hash, for-each will fix up @prev */
		node = &tail->list;
	}

	if (stall_hash != -1U) {
		bool unstalled;

		/*
		 * Set this before dropping the lock to avoid racing with new
		 * work being added and clearing the stalled bit.
		 */
		set_bit(IO_ACCT_STALLED_BIT, &acct->flags);
		raw_spin_unlock(&acct->lock);
		unstalled = io_wait_on_hash(wq, stall_hash);
		raw_spin_lock(&acct->lock);
		if (unstalled) {
			clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
			if (wq_has_sleeper(&wq->hash->wait))
				wake_up(&wq->hash->wait);
		}
	}

	return NULL;
}

static void io_assign_current_work(struct io_worker *worker,
				   struct io_wq_work *work)
{
	if (work) {
		io_run_task_work();
		cond_resched();
	}

	raw_spin_lock(&worker->lock);
	worker->cur_work = work;
	raw_spin_unlock(&worker->lock);
}

/*
 * Called with acct->lock held, drops it before returning
 */
static void io_worker_handle_work(struct io_wq_acct *acct,
				  struct io_worker *worker)
	__releases(&acct->lock)
{
	struct io_wq *wq = worker->wq;
	bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);

	do {
		struct io_wq_work *work;

		/*
		 * If we got some work, mark us as busy. If we didn't, but
		 * the list isn't empty, it means we stalled on hashed work.
		 * Mark us stalled so we don't keep looking for work when we
		 * can't make progress, any work completion or insertion will
		 * clear the stalled flag.
		 */
		work = io_get_next_work(acct, wq);
		if (work) {
			/*
			 * Make sure cancelation can find this, even before
			 * it becomes the active work.
			 * That avoids a window
			 * where the work has been removed from our general
			 * work list, but isn't yet discoverable as the
			 * current work item for this worker.
			 */
			raw_spin_lock(&worker->lock);
			worker->cur_work = work;
			raw_spin_unlock(&worker->lock);
		}

		raw_spin_unlock(&acct->lock);

		if (!work)
			break;

		__io_worker_busy(acct, worker);

		io_assign_current_work(worker, work);
		__set_current_state(TASK_RUNNING);

		/* handle a whole dependent link */
		do {
			struct io_wq_work *next_hashed, *linked;
			unsigned int work_flags = atomic_read(&work->flags);
			unsigned int hash = __io_wq_is_hashed(work_flags)
				? __io_get_work_hash(work_flags)
				: -1U;

			next_hashed = wq_next_work(work);

			if (do_kill &&
			    (work_flags & IO_WQ_WORK_UNBOUND))
				atomic_or(IO_WQ_WORK_CANCEL, &work->flags);
			io_wq_submit_work(work);
			io_assign_current_work(worker, NULL);

			linked = io_wq_free_work(work);
			work = next_hashed;
			if (!work && linked && !io_wq_is_hashed(linked)) {
				work = linked;
				linked = NULL;
			}
			io_assign_current_work(worker, work);
			if (linked)
				io_wq_enqueue(wq, linked);

			if (hash != -1U && !next_hashed) {
				/* serialize hash clear with wake_up() */
				spin_lock_irq(&wq->hash->wait.lock);
				clear_bit(hash, &wq->hash->map);
				clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
				spin_unlock_irq(&wq->hash->wait.lock);
				if (wq_has_sleeper(&wq->hash->wait))
					wake_up(&wq->hash->wait);
			}
		} while (work);

		if (!__io_acct_run_queue(acct))
			break;
		raw_spin_lock(&acct->lock);
	} while (1);
}

static int io_wq_worker(void *data)
{
	struct io_worker *worker = data;
	struct io_wq_acct *acct = io_wq_get_acct(worker);
	struct io_wq *wq = worker->wq;
	bool exit_mask = false, last_timeout = false;
	char buf[TASK_COMM_LEN] = {};

	set_mask_bits(&worker->flags, 0,
		      BIT(IO_WORKER_F_UP) | BIT(IO_WORKER_F_RUNNING));

	snprintf(buf, sizeof(buf), "iou-wrk-%d", wq->task->pid);
	set_task_comm(current, buf);

	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		long ret;

		set_current_state(TASK_INTERRUPTIBLE);

		/*
		 * If we have work to do, io_acct_run_queue() returns with
		 * the acct->lock held. If not, it will drop it.
		 */
		while (io_acct_run_queue(acct))
			io_worker_handle_work(acct, worker);

		raw_spin_lock(&acct->workers_lock);
		/*
		 * Last sleep timed out. Exit if we're not the last worker,
		 * or if someone modified our affinity.
		 */
		if (last_timeout && (exit_mask || acct->nr_workers > 1)) {
			acct->nr_workers--;
			raw_spin_unlock(&acct->workers_lock);
			__set_current_state(TASK_RUNNING);
			break;
		}
		last_timeout = false;
		__io_worker_idle(acct, worker);
		raw_spin_unlock(&acct->workers_lock);
		if (io_run_task_work())
			continue;
		ret = schedule_timeout(WORKER_IDLE_TIMEOUT);
		if (signal_pending(current)) {
			struct ksignal ksig;

			if (!get_signal(&ksig))
				continue;
			break;
		}
		if (!ret) {
			last_timeout = true;
			exit_mask = !cpumask_test_cpu(raw_smp_processor_id(),
							wq->cpu_mask);
		}
	}

	if (test_bit(IO_WQ_BIT_EXIT, &wq->state) && io_acct_run_queue(acct))
		io_worker_handle_work(acct, worker);

	io_worker_exit(worker);
	return 0;
}

/*
 * Called when a worker is scheduled in. Mark us as currently running.
 */
void io_wq_worker_running(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->worker_private;

	if (!worker)
		return;
	if (!test_bit(IO_WORKER_F_UP, &worker->flags))
		return;
	if (test_bit(IO_WORKER_F_RUNNING, &worker->flags))
		return;
	set_bit(IO_WORKER_F_RUNNING, &worker->flags);
	io_wq_inc_running(worker);
}

/*
 * Called when worker is going to sleep. If there are no workers currently
 * running and we have work pending, wake up a free one or create a new one.
 */
void io_wq_worker_sleeping(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->worker_private;

	if (!worker)
		return;
	if (!test_bit(IO_WORKER_F_UP, &worker->flags))
		return;
	if (!test_bit(IO_WORKER_F_RUNNING, &worker->flags))
		return;

	clear_bit(IO_WORKER_F_RUNNING, &worker->flags);
	io_wq_dec_running(worker);
}

static void io_init_new_worker(struct io_wq *wq, struct io_wq_acct *acct, struct io_worker *worker,
			       struct task_struct *tsk)
{
	tsk->worker_private = worker;
	worker->task = tsk;
	set_cpus_allowed_ptr(tsk, wq->cpu_mask);

	raw_spin_lock(&acct->workers_lock);
	hlist_nulls_add_head_rcu(&worker->nulls_node, &acct->free_list);
	list_add_tail_rcu(&worker->all_list, &acct->all_list);
	set_bit(IO_WORKER_F_FREE, &worker->flags);
	raw_spin_unlock(&acct->workers_lock);
	wake_up_new_task(tsk);
}

static bool io_wq_work_match_all(struct io_wq_work *work, void *data)
{
	return true;
}

static inline bool io_should_retry_thread(struct io_worker *worker, long err)
{
	/*
	 * Prevent perpetual task_work retry, if the task (or its group) is
	 * exiting.
	 */
	if (fatal_signal_pending(current))
		return false;

	worker->init_retries++;
	switch (err) {
	case -EAGAIN:
		return worker->init_retries <= WORKER_INIT_LIMIT;
	/* Analogous to a fork() syscall, always retry on a restartable error */
	case -ERESTARTSYS:
	case -ERESTARTNOINTR:
	case -ERESTARTNOHAND:
		return true;
	default:
		return false;
	}
}

static void queue_create_worker_retry(struct io_worker *worker)
{
	/*
	 * We only bother retrying because there's a chance that the
	 * failure to create a worker is due to some temporary condition
	 * in the forking task (e.g. outstanding signal); give the task
	 * some time to clear that condition.
	 */
	schedule_delayed_work(&worker->work,
			      msecs_to_jiffies(worker->init_retries * 5));
}

static void create_worker_cont(struct callback_head *cb)
{
	struct io_worker *worker;
	struct task_struct *tsk;
	struct io_wq *wq;
	struct io_wq_acct *acct;

	worker = container_of(cb, struct io_worker, create_work);
	clear_bit_unlock(0, &worker->create_state);
	wq = worker->wq;
	acct = io_wq_get_acct(worker);
	tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE);
	if (!IS_ERR(tsk)) {
		io_init_new_worker(wq, acct, worker, tsk);
		io_worker_release(worker);
		return;
	} else if (!io_should_retry_thread(worker, PTR_ERR(tsk))) {
		atomic_dec(&acct->nr_running);
		raw_spin_lock(&acct->workers_lock);
		acct->nr_workers--;
		if (!acct->nr_workers) {
			struct io_cb_cancel_data match = {
				.fn		= io_wq_work_match_all,
				.cancel_all	= true,
			};

			raw_spin_unlock(&acct->workers_lock);
			while (io_acct_cancel_pending_work(wq, acct, &match))
				;
		} else {
			raw_spin_unlock(&acct->workers_lock);
		}
		io_worker_ref_put(wq);
		kfree(worker);
		return;
	}

	/* re-create attempts grab a new worker ref, drop the existing one */
	io_worker_release(worker);
	queue_create_worker_retry(worker);
}

static void io_workqueue_create(struct work_struct *work)
{
	struct io_worker *worker = container_of(work, struct io_worker,
						work.work);
	struct io_wq_acct *acct = io_wq_get_acct(worker);

	if (!io_queue_worker_create(worker, acct, create_worker_cont))
		kfree(worker);
}

static bool create_io_worker(struct io_wq *wq, struct io_wq_acct *acct)
{
	struct io_worker *worker;
	struct task_struct *tsk;

	__set_current_state(TASK_RUNNING);

	worker = kzalloc(sizeof(*worker), GFP_KERNEL);
	if (!worker) {
fail:
		atomic_dec(&acct->nr_running);
		raw_spin_lock(&acct->workers_lock);
		acct->nr_workers--;
		raw_spin_unlock(&acct->workers_lock);
		io_worker_ref_put(wq);
		return false;
	}

	refcount_set(&worker->ref, 1);
	worker->wq = wq;
	worker->acct = acct;
	raw_spin_lock_init(&worker->lock);
	init_completion(&worker->ref_done);

	tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE);
	if (!IS_ERR(tsk)) {
		io_init_new_worker(wq, acct, worker, tsk);
	} else if (!io_should_retry_thread(worker, PTR_ERR(tsk))) {
		kfree(worker);
		goto fail;
	} else {
		INIT_DELAYED_WORK(&worker->work, io_workqueue_create);
		queue_create_worker_retry(worker);
	}

	return true;
}

/*
 * Iterate the passed in list and call the specific function for each
 * worker that isn't exiting
 */
static bool io_acct_for_each_worker(struct io_wq_acct *acct,
				    bool (*func)(struct io_worker *, void *),
				    void *data)
{
	struct io_worker *worker;
	bool ret = false;

	list_for_each_entry_rcu(worker, &acct->all_list, all_list) {
		if (io_worker_get(worker)) {
			/* no task if node is/was offline */
			if (worker->task)
				ret = func(worker, data);
			io_worker_release(worker);
			if (ret)
				break;
		}
	}

	return ret;
}

static void io_wq_for_each_worker(struct io_wq *wq,
				  bool (*func)(struct io_worker *, void *),
				  void *data)
{
	for (int i = 0; i < IO_WQ_ACCT_NR; i++)
		if (io_acct_for_each_worker(&wq->acct[i], func, data))
			break;
}

static bool io_wq_worker_wake(struct io_worker *worker, void *data)
{
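	/*
	 * Flag a (fake) signal notification so the worker breaks out of
	 * schedule_timeout() in io_wq_worker(), then wake the task.
	 */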
	__set_notify_signal(worker->task);
	wake_up_process(worker->task);
	return false;
}

static void io_run_cancel(struct io_wq_work *work, struct io_wq *wq)
{
	do {
		atomic_or(IO_WQ_WORK_CANCEL, &work->flags);
		io_wq_submit_work(work);
		work = io_wq_free_work(work);
	} while (work);
}

static void io_wq_insert_work(struct io_wq *wq, struct io_wq_acct *acct,
			      struct io_wq_work *work, unsigned int work_flags)
{
	unsigned int hash;
	struct io_wq_work *tail;

	if (!__io_wq_is_hashed(work_flags)) {
append:
		wq_list_add_tail(&work->list, &acct->work_list);
		return;
	}

	hash = __io_get_work_hash(work_flags);
	tail = wq->hash_tail[hash];
	wq->hash_tail[hash] = work;
	if (!tail)
		goto append;

	wq_list_add_after(&work->list, &tail->list, &acct->work_list);
}

static bool io_wq_work_match_item(struct io_wq_work *work, void *data)
{
	return work == data;
}

void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
{
	unsigned int work_flags = atomic_read(&work->flags);
	struct io_wq_acct *acct = io_work_get_acct(wq, work_flags);
	struct io_cb_cancel_data match = {
		.fn		= io_wq_work_match_item,
		.data		= work,
		.cancel_all	= false,
	};
	bool do_create;

	/*
	 * If io-wq is exiting for this task, or if the request has explicitly
	 * been marked as one that should not get executed, cancel it here.
	 */
	if (test_bit(IO_WQ_BIT_EXIT, &wq->state) ||
	    (work_flags & IO_WQ_WORK_CANCEL)) {
		io_run_cancel(work, wq);
		return;
	}

	raw_spin_lock(&acct->lock);
	io_wq_insert_work(wq, acct, work, work_flags);
	clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
	raw_spin_unlock(&acct->lock);

	rcu_read_lock();
	do_create = !io_acct_activate_free_worker(acct);
	rcu_read_unlock();

	if (do_create && ((work_flags & IO_WQ_WORK_CONCURRENT) ||
	    !atomic_read(&acct->nr_running))) {
		bool did_create;

		did_create = io_wq_create_worker(wq, acct);
		if (likely(did_create))
			return;

		raw_spin_lock(&acct->workers_lock);
		if (acct->nr_workers) {
			raw_spin_unlock(&acct->workers_lock);
			return;
		}
		raw_spin_unlock(&acct->workers_lock);

		/* fatal condition, failed to create the first worker */
		io_acct_cancel_pending_work(wq, acct, &match);
	}
}

/*
 * Work items that hash to the same value will not be done in parallel.
 * Used to limit concurrent writes, generally hashed by inode.
 */
void io_wq_hash_work(struct io_wq_work *work, void *val)
{
	unsigned int bit;

	bit = hash_ptr(val, IO_WQ_HASH_ORDER);
	atomic_or(IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT), &work->flags);
}

static bool __io_wq_worker_cancel(struct io_worker *worker,
				  struct io_cb_cancel_data *match,
				  struct io_wq_work *work)
{
	if (work && match->fn(work, match->data)) {
		atomic_or(IO_WQ_WORK_CANCEL, &work->flags);
		__set_notify_signal(worker->task);
		return true;
	}

	return false;
}

static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
{
	struct io_cb_cancel_data *match = data;

	/*
	 * Hold the lock to avoid ->cur_work going out of scope, caller
	 * may dereference the passed in work.
	 */
	raw_spin_lock(&worker->lock);
	if (__io_wq_worker_cancel(worker, match, worker->cur_work))
		match->nr_running++;
	raw_spin_unlock(&worker->lock);

	return match->nr_running && !match->cancel_all;
}

static inline void io_wq_remove_pending(struct io_wq *wq,
					struct io_wq_acct *acct,
					struct io_wq_work *work,
					struct io_wq_work_node *prev)
{
	unsigned int hash = io_get_work_hash(work);
	struct io_wq_work *prev_work = NULL;

	if (io_wq_is_hashed(work) && work == wq->hash_tail[hash]) {
		if (prev)
			prev_work = container_of(prev, struct io_wq_work, list);
		if (prev_work && io_get_work_hash(prev_work) == hash)
			wq->hash_tail[hash] = prev_work;
		else
			wq->hash_tail[hash] = NULL;
	}
	wq_list_del(&acct->work_list, &work->list, prev);
}

static bool io_acct_cancel_pending_work(struct io_wq *wq,
					struct io_wq_acct *acct,
					struct io_cb_cancel_data *match)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work;

	raw_spin_lock(&acct->lock);
	wq_list_for_each(node, prev, &acct->work_list) {
		work = container_of(node, struct io_wq_work, list);
		if (!match->fn(work, match->data))
			continue;
		io_wq_remove_pending(wq, acct, work, prev);
		raw_spin_unlock(&acct->lock);
		io_run_cancel(work, wq);
		match->nr_pending++;
		/* not safe to continue after unlock */
		return true;
	}
	raw_spin_unlock(&acct->lock);

	return false;
}

static void io_wq_cancel_pending_work(struct io_wq *wq,
				      struct io_cb_cancel_data *match)
{
	int i;
retry:
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wq_acct *acct = io_get_acct(wq, i == 0);

		if (io_acct_cancel_pending_work(wq, acct, match)) {
			if (match->cancel_all)
				goto retry;
			break;
		}
	}
}

static void io_acct_cancel_running_work(struct io_wq_acct *acct,
					struct io_cb_cancel_data *match)
{
	raw_spin_lock(&acct->workers_lock);
	io_acct_for_each_worker(acct, io_wq_worker_cancel, match);
	raw_spin_unlock(&acct->workers_lock);
}

static void io_wq_cancel_running_work(struct io_wq *wq,
				      struct io_cb_cancel_data *match)
{
	rcu_read_lock();

	for (int i = 0; i < IO_WQ_ACCT_NR; i++)
		io_acct_cancel_running_work(&wq->acct[i], match);

	rcu_read_unlock();
}

enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
				  void *data, bool cancel_all)
{
	struct io_cb_cancel_data match = {
		.fn		= cancel,
		.data		= data,
		.cancel_all	= cancel_all,
	};

	/*
	 * First check pending list, if we're lucky we can just remove it
	 * from there. CANCEL_OK means that the work is returned as-new,
	 * no completion will be posted for it.
	 *
	 * Then check if a free (going busy) or busy worker has the work
	 * currently running. If we find it there, we'll return CANCEL_RUNNING
	 * as an indication that we attempt to signal cancellation. The
	 * completion will run normally in this case.
	 *
	 * Do both of these while holding the acct->workers_lock, to ensure that
	 * we'll find a work item regardless of state.
	 */
	io_wq_cancel_pending_work(wq, &match);
	if (match.nr_pending && !match.cancel_all)
		return IO_WQ_CANCEL_OK;

	io_wq_cancel_running_work(wq, &match);
	if (match.nr_running && !match.cancel_all)
		return IO_WQ_CANCEL_RUNNING;

	if (match.nr_running)
		return IO_WQ_CANCEL_RUNNING;
	if (match.nr_pending)
		return IO_WQ_CANCEL_OK;
	return IO_WQ_CANCEL_NOTFOUND;
}

static int io_wq_hash_wake(struct wait_queue_entry *wait, unsigned mode,
			   int sync, void *key)
{
	struct io_wq *wq = container_of(wait, struct io_wq, wait);
	int i;

	list_del_init(&wait->entry);

	rcu_read_lock();
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wq_acct *acct = &wq->acct[i];

		if (test_and_clear_bit(IO_ACCT_STALLED_BIT, &acct->flags))
			io_acct_activate_free_worker(acct);
	}
	rcu_read_unlock();
	return 1;
}

struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
{
	int ret, i;
	struct io_wq *wq;

	if (WARN_ON_ONCE(!bounded))
		return ERR_PTR(-EINVAL);

	wq = kzalloc(sizeof(struct io_wq), GFP_KERNEL);
	if (!wq)
		return ERR_PTR(-ENOMEM);

	refcount_inc(&data->hash->refs);
	wq->hash = data->hash;

	ret = -ENOMEM;

	if (!alloc_cpumask_var(&wq->cpu_mask, GFP_KERNEL))
		goto err;
	cpuset_cpus_allowed(data->task, wq->cpu_mask);
	wq->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
	wq->acct[IO_WQ_ACCT_UNBOUND].max_workers =
				task_rlimit(current, RLIMIT_NPROC);
	INIT_LIST_HEAD(&wq->wait.entry);
	wq->wait.func = io_wq_hash_wake;
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wq_acct *acct = &wq->acct[i];

		atomic_set(&acct->nr_running, 0);

		raw_spin_lock_init(&acct->workers_lock);
		INIT_HLIST_NULLS_HEAD(&acct->free_list, 0);
		INIT_LIST_HEAD(&acct->all_list);

		INIT_WQ_LIST(&acct->work_list);
		raw_spin_lock_init(&acct->lock);
	}

	wq->task = get_task_struct(data->task);
	atomic_set(&wq->worker_refs, 1);
	init_completion(&wq->worker_done);
	ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	if (ret) {
		put_task_struct(wq->task);
		goto err;
	}

	return wq;
err:
	io_wq_put_hash(data->hash);
	free_cpumask_var(wq->cpu_mask);
	kfree(wq);
	return ERR_PTR(ret);
}

static bool io_task_work_match(struct callback_head *cb, void *data)
{
	struct io_worker *worker;

	if (cb->func != create_worker_cb && cb->func != create_worker_cont)
		return false;
	worker = container_of(cb, struct io_worker, create_work);
	return worker->wq == data;
}

void io_wq_exit_start(struct io_wq *wq)
{
	set_bit(IO_WQ_BIT_EXIT, &wq->state);
}

static void io_wq_cancel_tw_create(struct io_wq *wq)
{
	struct callback_head *cb;

	while ((cb = task_work_cancel_match(wq->task, io_task_work_match, wq)) != NULL) {
		struct io_worker *worker;

		worker = container_of(cb, struct io_worker, create_work);
		io_worker_cancel_cb(worker);
		/*
		 * Only the worker continuation helper has worker allocated and
		 * hence needs freeing.
		 */
		if (cb->func == create_worker_cont)
			kfree(worker);
	}
}

static void io_wq_exit_workers(struct io_wq *wq)
{
	if (!wq->task)
		return;

	io_wq_cancel_tw_create(wq);

	rcu_read_lock();
	io_wq_for_each_worker(wq, io_wq_worker_wake, NULL);
	rcu_read_unlock();
	io_worker_ref_put(wq);
	wait_for_completion(&wq->worker_done);

	spin_lock_irq(&wq->hash->wait.lock);
	list_del_init(&wq->wait.entry);
	spin_unlock_irq(&wq->hash->wait.lock);

	put_task_struct(wq->task);
	wq->task = NULL;
}

static void io_wq_destroy(struct io_wq *wq)
{
	struct io_cb_cancel_data match = {
		.fn		= io_wq_work_match_all,
		.cancel_all	= true,
	};

	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	io_wq_cancel_pending_work(wq, &match);
	free_cpumask_var(wq->cpu_mask);
	io_wq_put_hash(wq->hash);
	kfree(wq);
}

void io_wq_put_and_exit(struct io_wq *wq)
{
	WARN_ON_ONCE(!test_bit(IO_WQ_BIT_EXIT, &wq->state));

	io_wq_exit_workers(wq);
	io_wq_destroy(wq);
}

struct online_data {
	unsigned int cpu;
	bool online;
};

static bool io_wq_worker_affinity(struct io_worker *worker, void *data)
{
	struct online_data *od = data;

	if (od->online)
		cpumask_set_cpu(od->cpu, worker->wq->cpu_mask);
	else
		cpumask_clear_cpu(od->cpu, worker->wq->cpu_mask);
	return false;
}

static int __io_wq_cpu_online(struct io_wq *wq, unsigned int cpu, bool online)
{
	struct online_data od = {
		.cpu = cpu,
		.online = online
	};

	rcu_read_lock();
	io_wq_for_each_worker(wq, io_wq_worker_affinity, &od);
	rcu_read_unlock();
	return 0;
}

static int io_wq_cpu_online(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);

	return __io_wq_cpu_online(wq, cpu, true);
}

static int io_wq_cpu_offline(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);

	return __io_wq_cpu_online(wq, cpu, false);
}

int io_wq_cpu_affinity(struct io_uring_task *tctx, cpumask_var_t mask)
{
	cpumask_var_t allowed_mask;
	int ret = 0;

	if (!tctx || !tctx->io_wq)
		return -EINVAL;

	if (!alloc_cpumask_var(&allowed_mask, GFP_KERNEL))
		return -ENOMEM;

	rcu_read_lock();
	cpuset_cpus_allowed(tctx->io_wq->task, allowed_mask);
	if (mask) {
		if (cpumask_subset(mask, allowed_mask))
			cpumask_copy(tctx->io_wq->cpu_mask, mask);
		else
			ret = -EINVAL;
	} else {
		cpumask_copy(tctx->io_wq->cpu_mask, allowed_mask);
	}
	rcu_read_unlock();

	free_cpumask_var(allowed_mask);
	return ret;
}

/*
 * Set max number of unbounded workers, returns old value. If new_count is 0,
 * then just return the old value.
 */
int io_wq_max_workers(struct io_wq *wq, int *new_count)
{
	struct io_wq_acct *acct;
	int prev[IO_WQ_ACCT_NR];
	int i;

	BUILD_BUG_ON((int) IO_WQ_ACCT_BOUND   != (int) IO_WQ_BOUND);
	BUILD_BUG_ON((int) IO_WQ_ACCT_UNBOUND != (int) IO_WQ_UNBOUND);
	BUILD_BUG_ON((int) IO_WQ_ACCT_NR      != 2);

	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		if (new_count[i] > task_rlimit(current, RLIMIT_NPROC))
			new_count[i] = task_rlimit(current, RLIMIT_NPROC);
	}

	for (i = 0; i < IO_WQ_ACCT_NR; i++)
		prev[i] = 0;

	rcu_read_lock();

	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		acct = &wq->acct[i];
		raw_spin_lock(&acct->workers_lock);
		prev[i] = max_t(int, acct->max_workers, prev[i]);
		if (new_count[i])
			acct->max_workers = new_count[i];
		raw_spin_unlock(&acct->workers_lock);
	}
	rcu_read_unlock();

	for (i = 0; i < IO_WQ_ACCT_NR; i++)
		new_count[i] = prev[i];

	return 0;
}

static __init int io_wq_init(void)
{
	int ret;

	ret = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "io-wq/online",
					io_wq_cpu_online, io_wq_cpu_offline);
	if (ret < 0)
		return ret;
	io_wq_online = ret;
	return 0;
}
subsys_initcall(io_wq_init);