Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

at v6.15-rc7 1447 lines 35 kB view raw
// SPDX-License-Identifier: GPL-2.0
/*
 * Basic worker thread pool for io_uring
 *
 * Copyright (C) 2019 Jens Axboe
 *
 */
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/errno.h>
#include <linux/sched/signal.h>
#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/rculist_nulls.h>
#include <linux/cpu.h>
#include <linux/cpuset.h>
#include <linux/task_work.h>
#include <linux/audit.h>
#include <linux/mmu_context.h>
#include <uapi/linux/io_uring.h>

#include "io-wq.h"
#include "slist.h"
#include "io_uring.h"

/* How long an idle worker sleeps before it considers exiting */
#define WORKER_IDLE_TIMEOUT	(5 * HZ)
/* Upper bound on thread creation retries, see io_should_retry_thread() */
#define WORKER_INIT_LIMIT	3

/* Bit numbers used in io_worker->flags */
enum {
	IO_WORKER_F_UP		= 0,	/* up and active */
	IO_WORKER_F_RUNNING	= 1,	/* account as running */
	IO_WORKER_F_FREE	= 2,	/* worker on free list */
};

/* Bit numbers used in io_wq->state */
enum {
	IO_WQ_BIT_EXIT		= 0,	/* wq exiting */
};

/* Bit numbers used in io_wq_acct->flags */
enum {
	IO_ACCT_STALLED_BIT	= 0,	/* stalled on hash */
};

/*
 * One for each thread in a wq pool
 */
struct io_worker {
	/* dropping the last reference completes ->ref_done */
	refcount_t ref;
	unsigned long flags;
	/* linkage on acct->free_list */
	struct hlist_nulls_node nulls_node;
	/* linkage on acct->all_list */
	struct list_head all_list;
	struct task_struct *task;
	struct io_wq *wq;
	struct io_wq_acct *acct;

	/* work currently assigned to this worker, written under ->lock */
	struct io_wq_work *cur_work;
	raw_spinlock_t lock;

	struct completion ref_done;

	/* bit 0 marks ->create_work as in use by a queued creation request */
	unsigned long create_state;
	struct callback_head create_work;
	int init_retries;

	union {
		/* used by kfree_rcu() when the worker exits */
		struct rcu_head rcu;
		/* used to retry thread creation after a delay */
		struct delayed_work work;
	};
};

#if BITS_PER_LONG == 64
#define IO_WQ_HASH_ORDER	6
#else
#define IO_WQ_HASH_ORDER	5
#endif

#define IO_WQ_NR_HASH_BUCKETS	(1u << IO_WQ_HASH_ORDER)

struct io_wq_acct {
	/**
	 * Protects access to the worker lists.
	 */
	raw_spinlock_t workers_lock;

	unsigned nr_workers;
	unsigned max_workers;
	atomic_t nr_running;

	/**
	 * The list of free workers.  Protected by #workers_lock
	 * (write) and RCU (read).
	 */
	struct hlist_nulls_head free_list;

	/**
	 * The list of all workers.  Protected by #workers_lock
	 * (write) and RCU (read).
	 */
	struct list_head all_list;

	/* taken around accesses to ->work_list */
	raw_spinlock_t lock;
	struct io_wq_work_list work_list;
	unsigned long flags;
};

/* Index into io_wq->acct[] */
enum {
	IO_WQ_ACCT_BOUND,
	IO_WQ_ACCT_UNBOUND,
	IO_WQ_ACCT_NR,
};

/*
 * Per io_wq state
 */
struct io_wq {
	unsigned long state;

	free_work_fn *free_work;
	io_wq_work_fn *do_work;

	struct io_wq_hash *hash;

	/* the final put completes ->worker_done */
	atomic_t worker_refs;
	struct completion worker_done;

	struct hlist_node cpuhp_node;

	struct task_struct *task;

	struct io_wq_acct acct[IO_WQ_ACCT_NR];

	struct wait_queue_entry wait;

	/* last queued work item per hash bucket, NULL if none is queued */
	struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];

	cpumask_var_t cpu_mask;
};

static enum cpuhp_state io_wq_online;

/* Match state and result counters shared by the cancelation helpers */
struct io_cb_cancel_data {
	work_cancel_fn *fn;
	void *data;
	int nr_running;
	int nr_pending;
	bool cancel_all;
};

static bool create_io_worker(struct io_wq *wq, struct io_wq_acct *acct);
static void io_wq_dec_running(struct io_worker *worker);
static bool io_acct_cancel_pending_work(struct io_wq *wq,
					struct io_wq_acct *acct,
					struct io_cb_cancel_data *match);
static void create_worker_cb(struct callback_head *cb);
static void io_wq_cancel_tw_create(struct io_wq *wq);

/*
 * Try to take a reference on @worker. Returns false if the refcount has
 * already dropped to zero.
 */
static bool io_worker_get(struct io_worker *worker)
{
	return refcount_inc_not_zero(&worker->ref);
}

/*
 * Drop a reference on @worker. The final put completes ->ref_done, which
 * io_worker_exit() waits on.
 */
static void io_worker_release(struct io_worker *worker)
{
	if (refcount_dec_and_test(&worker->ref))
		complete(&worker->ref_done);
}

/* Return the bound or unbound accounting struct of @wq */
static inline struct io_wq_acct *io_get_acct(struct io_wq *wq, bool bound)
{
	return &wq->acct[bound ? IO_WQ_ACCT_BOUND : IO_WQ_ACCT_UNBOUND];
}

/* Pick the accounting struct for work carrying @work_flags */
static inline struct io_wq_acct *io_work_get_acct(struct io_wq *wq,
						  unsigned int work_flags)
{
	return io_get_acct(wq, !(work_flags & IO_WQ_WORK_UNBOUND));
}

static inline struct io_wq_acct *io_wq_get_acct(struct io_worker *worker)
{
	return worker->acct;
}

/* Drop a wq->worker_refs reference, the final put completes ->worker_done */
static void io_worker_ref_put(struct io_wq *wq)
{
	if (atomic_dec_and_test(&wq->worker_refs))
		complete(&wq->worker_done);
}

/*
 * Returns true if the wq of the calling worker has IO_WQ_BIT_EXIT set. Warns
 * once and returns true if the caller is not an io-wq worker.
 */
bool io_wq_worker_stopped(void)
{
	struct io_worker *worker = current->worker_private;

	if (WARN_ON_ONCE(!io_wq_current_is_worker()))
		return true;

	return test_bit(IO_WQ_BIT_EXIT, &worker->wq->state);
}

/*
 * Undo the accounting of a creation request whose task_work was canceled
 * before it ran: drops the nr_running and nr_workers counts, the wq
 * reference, the create_state ownership bit and the worker reference.
 */
static void io_worker_cancel_cb(struct io_worker *worker)
{
	struct io_wq_acct *acct = io_wq_get_acct(worker);
	struct io_wq *wq = worker->wq;

	atomic_dec(&acct->nr_running);
	raw_spin_lock(&acct->workers_lock);
	acct->nr_workers--;
	raw_spin_unlock(&acct->workers_lock);
	io_worker_ref_put(wq);
	clear_bit_unlock(0, &worker->create_state);
	io_worker_release(worker);
}

/* task_work match: a create_worker_cb item embedded in worker @data */
static bool io_task_worker_match(struct callback_head *cb, void *data)
{
	struct io_worker *worker;

	if (cb->func != create_worker_cb)
		return false;
	worker = container_of(cb, struct io_worker, create_work);
	return worker == data;
}

/*
 * Tear down the calling worker: cancel creation task_work queued on its
 * behalf, wait for all other references to go away, unlink it from the
 * worker lists and exit the thread. Does not return.
 */
static void io_worker_exit(struct io_worker *worker)
{
	struct io_wq *wq = worker->wq;
	struct io_wq_acct *acct = io_wq_get_acct(worker);

	while (1) {
		struct callback_head *cb = task_work_cancel_match(wq->task,
						io_task_worker_match, worker);

		if (!cb)
			break;
		io_worker_cancel_cb(worker);
	}

	/* drop our own reference, then wait for the remaining holders */
	io_worker_release(worker);
	wait_for_completion(&worker->ref_done);

	raw_spin_lock(&acct->workers_lock);
	if (test_bit(IO_WORKER_F_FREE, &worker->flags))
		hlist_nulls_del_rcu(&worker->nulls_node);
	list_del_rcu(&worker->all_list);
	raw_spin_unlock(&acct->workers_lock);
	io_wq_dec_running(worker);
	/*
	 * this worker is a goner, clear ->worker_private to avoid any
	 * inc/dec running calls that could happen as part of exit from
	 * touching 'worker'.
	 */
	current->worker_private = NULL;

	kfree_rcu(worker, rcu);
	io_worker_ref_put(wq);
	do_exit(0);
}

/* True if @acct is not stalled on hashed work and has work queued */
static inline bool __io_acct_run_queue(struct io_wq_acct *acct)
{
	return !test_bit(IO_ACCT_STALLED_BIT, &acct->flags) &&
		!wq_list_empty(&acct->work_list);
}

/*
 * If there's work to do, returns true with acct->lock acquired. If not,
 * returns false with no lock held.
 */
static inline bool io_acct_run_queue(struct io_wq_acct *acct)
	__acquires(&acct->lock)
{
	raw_spin_lock(&acct->lock);
	if (__io_acct_run_queue(acct))
		return true;

	raw_spin_unlock(&acct->lock);
	return false;
}

/*
 * Check head of free list for an available worker. If one isn't available,
 * caller must create one.
 */
static bool io_acct_activate_free_worker(struct io_wq_acct *acct)
	__must_hold(RCU)
{
	struct hlist_nulls_node *n;
	struct io_worker *worker;

	/*
	 * Iterate free_list and see if we can find an idle worker to
	 * activate. If a given worker is on the free_list but in the process
	 * of exiting, keep trying.
	 */
	hlist_nulls_for_each_entry_rcu(worker, n, &acct->free_list, nulls_node) {
		if (!io_worker_get(worker))
			continue;
		/*
		 * If the worker is already running, it's either already
		 * starting work or finishing work. In either case, if it does
		 * go to sleep, we'll kick off a new task for this work anyway.
		 */
		wake_up_process(worker->task);
		io_worker_release(worker);
		return true;
	}

	return false;
}

/*
 * We need a worker. If we find a free one, we're good. If not, and we're
 * below the max number of workers, create one.
 *
 * Returns true without creating anything if the acct is already at
 * max_workers; otherwise returns the result of create_io_worker().
 */
static bool io_wq_create_worker(struct io_wq *wq, struct io_wq_acct *acct)
{
	/*
	 * Most likely an attempt to queue unbounded work on an io_wq that
	 * wasn't setup with any unbounded workers.
	 */
	if (unlikely(!acct->max_workers))
		pr_warn_once("io-wq is not configured for unbound workers");

	raw_spin_lock(&acct->workers_lock);
	if (acct->nr_workers >= acct->max_workers) {
		raw_spin_unlock(&acct->workers_lock);
		return true;
	}
	acct->nr_workers++;
	raw_spin_unlock(&acct->workers_lock);
	/* account the new worker as running and pin the wq for it */
	atomic_inc(&acct->nr_running);
	atomic_inc(&wq->worker_refs);
	return create_io_worker(wq, acct);
}

/* Count @worker as running in its acct */
static void io_wq_inc_running(struct io_worker *worker)
{
	struct io_wq_acct *acct = io_wq_get_acct(worker);

	atomic_inc(&acct->nr_running);
}

/*
 * task_work callback queued by io_wq_dec_running(). Creates a new worker if
 * the acct is still below max_workers, otherwise drops the nr_running count
 * and wq reference taken when the request was queued. In both cases releases
 * the create_state bit and the worker reference held for the callback.
 */
static void create_worker_cb(struct callback_head *cb)
{
	struct io_worker *worker;
	struct io_wq *wq;

	struct io_wq_acct *acct;
	bool do_create = false;

	worker = container_of(cb, struct io_worker, create_work);
	wq = worker->wq;
	acct = worker->acct;
	raw_spin_lock(&acct->workers_lock);

	if (acct->nr_workers < acct->max_workers) {
		acct->nr_workers++;
		do_create = true;
	}
	raw_spin_unlock(&acct->workers_lock);
	if (do_create) {
		create_io_worker(wq, acct);
	} else {
		atomic_dec(&acct->nr_running);
		io_worker_ref_put(wq);
	}
	clear_bit_unlock(0, &worker->create_state);
	io_worker_release(worker);
}

/*
 * Queue @func as task_work on wq->task, using the callback_head embedded in
 * @worker. Returns true if the task_work was added. On failure the
 * nr_running count and one wq reference are dropped before returning false.
 */
static bool io_queue_worker_create(struct io_worker *worker,
				   struct io_wq_acct *acct,
				   task_work_func_t func)
{
	struct io_wq *wq = worker->wq;

	/* raced with exit, just ignore create call */
	if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
		goto fail;
	if (!io_worker_get(worker))
		goto fail;
	/*
	 * create_state manages ownership of create_work/index. We should
	 * only need one entry per worker, as the worker going to sleep
	 * will trigger the condition, and waking will clear it once it
	 * runs the task_work.
	 */
	if (test_bit(0, &worker->create_state) ||
	    test_and_set_bit_lock(0, &worker->create_state))
		goto fail_release;

	/* temporary wq reference, dropped again on both paths below */
	atomic_inc(&wq->worker_refs);
	init_task_work(&worker->create_work, func);
	if (!task_work_add(wq->task, &worker->create_work, TWA_SIGNAL)) {
		/*
		 * EXIT may have been set after checking it above, check after
		 * adding the task_work and remove any creation item if it is
		 * now set. wq exit does that too, but we can have added this
		 * work item after we canceled in io_wq_exit_workers().
		 */
		if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
			io_wq_cancel_tw_create(wq);
		io_worker_ref_put(wq);
		return true;
	}
	io_worker_ref_put(wq);
	clear_bit_unlock(0, &worker->create_state);
fail_release:
	io_worker_release(worker);
fail:
	atomic_dec(&acct->nr_running);
	io_worker_ref_put(wq);
	return false;
}

/*
 * Drop @worker from the running count. If that was the last running worker
 * of the acct and there is runnable work queued, request creation of a new
 * worker through task_work.
 */
static void io_wq_dec_running(struct io_worker *worker)
{
	struct io_wq_acct *acct = io_wq_get_acct(worker);
	struct io_wq *wq = worker->wq;

	if (!test_bit(IO_WORKER_F_UP, &worker->flags))
		return;

	if (!atomic_dec_and_test(&acct->nr_running))
		return;
	if (!io_acct_run_queue(acct))
		return;

	/* io_acct_run_queue() returned with acct->lock held */
	raw_spin_unlock(&acct->lock);
	atomic_inc(&acct->nr_running);
	atomic_inc(&wq->worker_refs);
	io_queue_worker_create(worker, acct, create_worker_cb);
}

/*
 * Worker will start processing some work.
 * Move it to the busy list, if
 * it's currently on the freelist
 */
static void __io_worker_busy(struct io_wq_acct *acct, struct io_worker *worker)
{
	if (test_bit(IO_WORKER_F_FREE, &worker->flags)) {
		clear_bit(IO_WORKER_F_FREE, &worker->flags);
		raw_spin_lock(&acct->workers_lock);
		hlist_nulls_del_init_rcu(&worker->nulls_node);
		raw_spin_unlock(&acct->workers_lock);
	}
}

/*
 * No work, worker going to sleep. Move to freelist.
 */
static void __io_worker_idle(struct io_wq_acct *acct, struct io_worker *worker)
	__must_hold(acct->workers_lock)
{
	if (!test_bit(IO_WORKER_F_FREE, &worker->flags)) {
		set_bit(IO_WORKER_F_FREE, &worker->flags);
		hlist_nulls_add_head_rcu(&worker->nulls_node, &acct->free_list);
	}
}

/* Extract the hash bucket stored in the upper bits of @work_flags */
static inline unsigned int __io_get_work_hash(unsigned int work_flags)
{
	return work_flags >> IO_WQ_HASH_SHIFT;
}

static inline unsigned int io_get_work_hash(struct io_wq_work *work)
{
	return __io_get_work_hash(atomic_read(&work->flags));
}

/*
 * Add this wq to the shared hash waitqueue unless it is already queued
 * there. Returns true if @hash turned out to be free after queueing, in
 * which case the wait entry is removed again and the caller may retry
 * right away.
 */
static bool io_wait_on_hash(struct io_wq *wq, unsigned int hash)
{
	bool ret = false;

	spin_lock_irq(&wq->hash->wait.lock);
	if (list_empty(&wq->wait.entry)) {
		__add_wait_queue(&wq->hash->wait, &wq->wait);
		if (!test_bit(hash, &wq->hash->map)) {
			__set_current_state(TASK_RUNNING);
			list_del_init(&wq->wait.entry);
			ret = true;
		}
	}
	spin_unlock_irq(&wq->hash->wait.lock);
	return ret;
}

/*
 * Pick the next runnable work item off acct->work_list. Unhashed work is
 * returned directly. For hashed work, the whole run of items sharing the
 * hash is cut from the list if the hash bit could be claimed. Returns NULL
 * if nothing is runnable; if that was due to busy hashes, the acct is marked
 * stalled. acct->lock is dropped and retaken around io_wait_on_hash().
 */
static struct io_wq_work *io_get_next_work(struct io_wq_acct *acct,
					   struct io_wq *wq)
	__must_hold(acct->lock)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work, *tail;
	unsigned int stall_hash = -1U;

	wq_list_for_each(node, prev, &acct->work_list) {
		unsigned int work_flags;
		unsigned int hash;

		work = container_of(node, struct io_wq_work, list);

		/* not hashed, can run anytime */
		work_flags = atomic_read(&work->flags);
		if (!__io_wq_is_hashed(work_flags)) {
			wq_list_del(&acct->work_list, node, prev);
			return work;
		}

		hash = __io_get_work_hash(work_flags);
		/* all items with this hash lie in [work, tail] */
		tail = wq->hash_tail[hash];

		/* hashed, can run if not already running */
		if (!test_and_set_bit(hash, &wq->hash->map)) {
			wq->hash_tail[hash] = NULL;
			wq_list_cut(&acct->work_list, &tail->list, prev);
			return work;
		}
		/* remember only the first hash we could not claim */
		if (stall_hash == -1U)
			stall_hash = hash;
		/* fast forward to a next hash, for-each will fix up @prev */
		node = &tail->list;
	}

	if (stall_hash != -1U) {
		bool unstalled;

		/*
		 * Set this before dropping the lock to avoid racing with new
		 * work being added and clearing the stalled bit.
		 */
		set_bit(IO_ACCT_STALLED_BIT, &acct->flags);
		raw_spin_unlock(&acct->lock);
		unstalled = io_wait_on_hash(wq, stall_hash);
		raw_spin_lock(&acct->lock);
		if (unstalled) {
			clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
			if (wq_has_sleeper(&wq->hash->wait))
				wake_up(&wq->hash->wait);
		}
	}

	return NULL;
}

/*
 * Publish @work (may be NULL) as the worker's current work under
 * worker->lock. When assigning real work, pending task_work is run and the
 * worker may reschedule first.
 */
static void io_assign_current_work(struct io_worker *worker,
				   struct io_wq_work *work)
{
	if (work) {
		io_run_task_work();
		cond_resched();
	}

	raw_spin_lock(&worker->lock);
	worker->cur_work = work;
	raw_spin_unlock(&worker->lock);
}

/*
 * Called with acct->lock held, drops it before returning
 */
static void io_worker_handle_work(struct io_wq_acct *acct,
				  struct io_worker *worker)
	__releases(&acct->lock)
{
	struct io_wq *wq = worker->wq;
	/* sampled once on entry, not re-read while looping */
	bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);

	do {
		struct io_wq_work *work;

		/*
		 * If we got some work, mark us as busy. If we didn't, but
		 * the list isn't empty, it means we stalled on hashed work.
		 * Mark us stalled so we don't keep looking for work when we
		 * can't make progress, any work completion or insertion will
		 * clear the stalled flag.
		 */
		work = io_get_next_work(acct, wq);
		if (work) {
			/*
			 * Make sure cancelation can find this, even before
			 * it becomes the active work. That avoids a window
			 * where the work has been removed from our general
			 * work list, but isn't yet discoverable as the
			 * current work item for this worker.
			 */
			raw_spin_lock(&worker->lock);
			worker->cur_work = work;
			raw_spin_unlock(&worker->lock);
		}

		raw_spin_unlock(&acct->lock);

		if (!work)
			break;

		__io_worker_busy(acct, worker);

		io_assign_current_work(worker, work);
		__set_current_state(TASK_RUNNING);

		/* handle a whole dependent link */
		do {
			struct io_wq_work *next_hashed, *linked;
			unsigned int work_flags = atomic_read(&work->flags);
			unsigned int hash = __io_wq_is_hashed(work_flags)
				? __io_get_work_hash(work_flags)
				: -1U;

			next_hashed = wq_next_work(work);

			/* wq is exiting: mark unbound work as canceled */
			if (do_kill &&
			    (work_flags & IO_WQ_WORK_UNBOUND))
				atomic_or(IO_WQ_WORK_CANCEL, &work->flags);
			wq->do_work(work);
			io_assign_current_work(worker, NULL);

			linked = wq->free_work(work);
			work = next_hashed;
			/* run unhashed linked work directly, otherwise requeue it */
			if (!work && linked && !io_wq_is_hashed(linked)) {
				work = linked;
				linked = NULL;
			}
			io_assign_current_work(worker, work);
			if (linked)
				io_wq_enqueue(wq, linked);

			if (hash != -1U && !next_hashed) {
				/* serialize hash clear with wake_up() */
				spin_lock_irq(&wq->hash->wait.lock);
				clear_bit(hash, &wq->hash->map);
				clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
				spin_unlock_irq(&wq->hash->wait.lock);
				if (wq_has_sleeper(&wq->hash->wait))
					wake_up(&wq->hash->wait);
			}
		} while (work);

		/* unlocked peek; retake acct->lock only if more work is queued */
		if (!__io_acct_run_queue(acct))
			break;
		raw_spin_lock(&acct->lock);
	} while (1);
}

/*
 * Thread function of an io-wq worker. Processes queued work until the wq is
 * marked as exiting, the worker idles out, or get_signal() reports a signal
 * to act on, then tears the worker down via io_worker_exit().
 */
static int io_wq_worker(void *data)
{
	struct io_worker *worker = data;
	struct io_wq_acct *acct = io_wq_get_acct(worker);
	struct io_wq *wq = worker->wq;
	bool exit_mask = false, last_timeout = false;
	char buf[TASK_COMM_LEN] = {};

	set_mask_bits(&worker->flags, 0,
			BIT(IO_WORKER_F_UP) | BIT(IO_WORKER_F_RUNNING));

	snprintf(buf, sizeof(buf), "iou-wrk-%d", wq->task->pid);
	set_task_comm(current, buf);

	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		long ret;

		set_current_state(TASK_INTERRUPTIBLE);

		/*
		 * If we have work to do, io_acct_run_queue() returns with
		 * the acct->lock held. If not, it will drop it.
		 */
		while (io_acct_run_queue(acct))
			io_worker_handle_work(acct, worker);

		raw_spin_lock(&acct->workers_lock);
		/*
		 * Last sleep timed out. Exit if we're not the last worker,
		 * or if someone modified our affinity.
		 */
		if (last_timeout && (exit_mask || acct->nr_workers > 1)) {
			acct->nr_workers--;
			raw_spin_unlock(&acct->workers_lock);
			__set_current_state(TASK_RUNNING);
			break;
		}
		last_timeout = false;
		__io_worker_idle(acct, worker);
		raw_spin_unlock(&acct->workers_lock);
		if (io_run_task_work())
			continue;
		ret = schedule_timeout(WORKER_IDLE_TIMEOUT);
		if (signal_pending(current)) {
			struct ksignal ksig;

			if (!get_signal(&ksig))
				continue;
			break;
		}
		/* a zero return means the full idle timeout elapsed */
		if (!ret) {
			last_timeout = true;
			exit_mask = !cpumask_test_cpu(raw_smp_processor_id(),
							wq->cpu_mask);
		}
	}

	/* on wq exit, make one more pass over work that is still queued */
	if (test_bit(IO_WQ_BIT_EXIT, &wq->state) && io_acct_run_queue(acct))
		io_worker_handle_work(acct, worker);

	io_worker_exit(worker);
	return 0;
}

/*
 * Called when a worker is scheduled in. Mark us as currently running.
 */
void io_wq_worker_running(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->worker_private;

	if (!worker)
		return;
	if (!test_bit(IO_WORKER_F_UP, &worker->flags))
		return;
	if (test_bit(IO_WORKER_F_RUNNING, &worker->flags))
		return;
	set_bit(IO_WORKER_F_RUNNING, &worker->flags);
	io_wq_inc_running(worker);
}

/*
 * Called when worker is going to sleep. If there are no workers currently
 * running and we have work pending, wake up a free one or create a new one.
729 */ 730void io_wq_worker_sleeping(struct task_struct *tsk) 731{ 732 struct io_worker *worker = tsk->worker_private; 733 734 if (!worker) 735 return; 736 if (!test_bit(IO_WORKER_F_UP, &worker->flags)) 737 return; 738 if (!test_bit(IO_WORKER_F_RUNNING, &worker->flags)) 739 return; 740 741 clear_bit(IO_WORKER_F_RUNNING, &worker->flags); 742 io_wq_dec_running(worker); 743} 744 745static void io_init_new_worker(struct io_wq *wq, struct io_wq_acct *acct, struct io_worker *worker, 746 struct task_struct *tsk) 747{ 748 tsk->worker_private = worker; 749 worker->task = tsk; 750 set_cpus_allowed_ptr(tsk, wq->cpu_mask); 751 752 raw_spin_lock(&acct->workers_lock); 753 hlist_nulls_add_head_rcu(&worker->nulls_node, &acct->free_list); 754 list_add_tail_rcu(&worker->all_list, &acct->all_list); 755 set_bit(IO_WORKER_F_FREE, &worker->flags); 756 raw_spin_unlock(&acct->workers_lock); 757 wake_up_new_task(tsk); 758} 759 760static bool io_wq_work_match_all(struct io_wq_work *work, void *data) 761{ 762 return true; 763} 764 765static inline bool io_should_retry_thread(struct io_worker *worker, long err) 766{ 767 /* 768 * Prevent perpetual task_work retry, if the task (or its group) is 769 * exiting. 770 */ 771 if (fatal_signal_pending(current)) 772 return false; 773 if (worker->init_retries++ >= WORKER_INIT_LIMIT) 774 return false; 775 776 switch (err) { 777 case -EAGAIN: 778 case -ERESTARTSYS: 779 case -ERESTARTNOINTR: 780 case -ERESTARTNOHAND: 781 return true; 782 default: 783 return false; 784 } 785} 786 787static void queue_create_worker_retry(struct io_worker *worker) 788{ 789 /* 790 * We only bother retrying because there's a chance that the 791 * failure to create a worker is due to some temporary condition 792 * in the forking task (e.g. outstanding signal); give the task 793 * some time to clear that condition. 
794 */ 795 schedule_delayed_work(&worker->work, 796 msecs_to_jiffies(worker->init_retries * 5)); 797} 798 799static void create_worker_cont(struct callback_head *cb) 800{ 801 struct io_worker *worker; 802 struct task_struct *tsk; 803 struct io_wq *wq; 804 struct io_wq_acct *acct; 805 806 worker = container_of(cb, struct io_worker, create_work); 807 clear_bit_unlock(0, &worker->create_state); 808 wq = worker->wq; 809 acct = io_wq_get_acct(worker); 810 tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE); 811 if (!IS_ERR(tsk)) { 812 io_init_new_worker(wq, acct, worker, tsk); 813 io_worker_release(worker); 814 return; 815 } else if (!io_should_retry_thread(worker, PTR_ERR(tsk))) { 816 atomic_dec(&acct->nr_running); 817 raw_spin_lock(&acct->workers_lock); 818 acct->nr_workers--; 819 if (!acct->nr_workers) { 820 struct io_cb_cancel_data match = { 821 .fn = io_wq_work_match_all, 822 .cancel_all = true, 823 }; 824 825 raw_spin_unlock(&acct->workers_lock); 826 while (io_acct_cancel_pending_work(wq, acct, &match)) 827 ; 828 } else { 829 raw_spin_unlock(&acct->workers_lock); 830 } 831 io_worker_ref_put(wq); 832 kfree(worker); 833 return; 834 } 835 836 /* re-create attempts grab a new worker ref, drop the existing one */ 837 io_worker_release(worker); 838 queue_create_worker_retry(worker); 839} 840 841static void io_workqueue_create(struct work_struct *work) 842{ 843 struct io_worker *worker = container_of(work, struct io_worker, 844 work.work); 845 struct io_wq_acct *acct = io_wq_get_acct(worker); 846 847 if (!io_queue_worker_create(worker, acct, create_worker_cont)) 848 kfree(worker); 849} 850 851static bool create_io_worker(struct io_wq *wq, struct io_wq_acct *acct) 852{ 853 struct io_worker *worker; 854 struct task_struct *tsk; 855 856 __set_current_state(TASK_RUNNING); 857 858 worker = kzalloc(sizeof(*worker), GFP_KERNEL); 859 if (!worker) { 860fail: 861 atomic_dec(&acct->nr_running); 862 raw_spin_lock(&acct->workers_lock); 863 acct->nr_workers--; 864 
raw_spin_unlock(&acct->workers_lock); 865 io_worker_ref_put(wq); 866 return false; 867 } 868 869 refcount_set(&worker->ref, 1); 870 worker->wq = wq; 871 worker->acct = acct; 872 raw_spin_lock_init(&worker->lock); 873 init_completion(&worker->ref_done); 874 875 tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE); 876 if (!IS_ERR(tsk)) { 877 io_init_new_worker(wq, acct, worker, tsk); 878 } else if (!io_should_retry_thread(worker, PTR_ERR(tsk))) { 879 kfree(worker); 880 goto fail; 881 } else { 882 INIT_DELAYED_WORK(&worker->work, io_workqueue_create); 883 queue_create_worker_retry(worker); 884 } 885 886 return true; 887} 888 889/* 890 * Iterate the passed in list and call the specific function for each 891 * worker that isn't exiting 892 */ 893static bool io_acct_for_each_worker(struct io_wq_acct *acct, 894 bool (*func)(struct io_worker *, void *), 895 void *data) 896{ 897 struct io_worker *worker; 898 bool ret = false; 899 900 list_for_each_entry_rcu(worker, &acct->all_list, all_list) { 901 if (io_worker_get(worker)) { 902 /* no task if node is/was offline */ 903 if (worker->task) 904 ret = func(worker, data); 905 io_worker_release(worker); 906 if (ret) 907 break; 908 } 909 } 910 911 return ret; 912} 913 914static bool io_wq_for_each_worker(struct io_wq *wq, 915 bool (*func)(struct io_worker *, void *), 916 void *data) 917{ 918 for (int i = 0; i < IO_WQ_ACCT_NR; i++) { 919 if (!io_acct_for_each_worker(&wq->acct[i], func, data)) 920 return false; 921 } 922 923 return true; 924} 925 926static bool io_wq_worker_wake(struct io_worker *worker, void *data) 927{ 928 __set_notify_signal(worker->task); 929 wake_up_process(worker->task); 930 return false; 931} 932 933static void io_run_cancel(struct io_wq_work *work, struct io_wq *wq) 934{ 935 do { 936 atomic_or(IO_WQ_WORK_CANCEL, &work->flags); 937 wq->do_work(work); 938 work = wq->free_work(work); 939 } while (work); 940} 941 942static void io_wq_insert_work(struct io_wq *wq, struct io_wq_acct *acct, 943 struct 
io_wq_work *work, unsigned int work_flags) 944{ 945 unsigned int hash; 946 struct io_wq_work *tail; 947 948 if (!__io_wq_is_hashed(work_flags)) { 949append: 950 wq_list_add_tail(&work->list, &acct->work_list); 951 return; 952 } 953 954 hash = __io_get_work_hash(work_flags); 955 tail = wq->hash_tail[hash]; 956 wq->hash_tail[hash] = work; 957 if (!tail) 958 goto append; 959 960 wq_list_add_after(&work->list, &tail->list, &acct->work_list); 961} 962 963static bool io_wq_work_match_item(struct io_wq_work *work, void *data) 964{ 965 return work == data; 966} 967 968void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work) 969{ 970 unsigned int work_flags = atomic_read(&work->flags); 971 struct io_wq_acct *acct = io_work_get_acct(wq, work_flags); 972 struct io_cb_cancel_data match = { 973 .fn = io_wq_work_match_item, 974 .data = work, 975 .cancel_all = false, 976 }; 977 bool do_create; 978 979 /* 980 * If io-wq is exiting for this task, or if the request has explicitly 981 * been marked as one that should not get executed, cancel it here. 
982 */ 983 if (test_bit(IO_WQ_BIT_EXIT, &wq->state) || 984 (work_flags & IO_WQ_WORK_CANCEL)) { 985 io_run_cancel(work, wq); 986 return; 987 } 988 989 raw_spin_lock(&acct->lock); 990 io_wq_insert_work(wq, acct, work, work_flags); 991 clear_bit(IO_ACCT_STALLED_BIT, &acct->flags); 992 raw_spin_unlock(&acct->lock); 993 994 rcu_read_lock(); 995 do_create = !io_acct_activate_free_worker(acct); 996 rcu_read_unlock(); 997 998 if (do_create && ((work_flags & IO_WQ_WORK_CONCURRENT) || 999 !atomic_read(&acct->nr_running))) { 1000 bool did_create; 1001 1002 did_create = io_wq_create_worker(wq, acct); 1003 if (likely(did_create)) 1004 return; 1005 1006 raw_spin_lock(&acct->workers_lock); 1007 if (acct->nr_workers) { 1008 raw_spin_unlock(&acct->workers_lock); 1009 return; 1010 } 1011 raw_spin_unlock(&acct->workers_lock); 1012 1013 /* fatal condition, failed to create the first worker */ 1014 io_acct_cancel_pending_work(wq, acct, &match); 1015 } 1016} 1017 1018/* 1019 * Work items that hash to the same value will not be done in parallel. 1020 * Used to limit concurrent writes, generally hashed by inode. 1021 */ 1022void io_wq_hash_work(struct io_wq_work *work, void *val) 1023{ 1024 unsigned int bit; 1025 1026 bit = hash_ptr(val, IO_WQ_HASH_ORDER); 1027 atomic_or(IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT), &work->flags); 1028} 1029 1030static bool __io_wq_worker_cancel(struct io_worker *worker, 1031 struct io_cb_cancel_data *match, 1032 struct io_wq_work *work) 1033{ 1034 if (work && match->fn(work, match->data)) { 1035 atomic_or(IO_WQ_WORK_CANCEL, &work->flags); 1036 __set_notify_signal(worker->task); 1037 return true; 1038 } 1039 1040 return false; 1041} 1042 1043static bool io_wq_worker_cancel(struct io_worker *worker, void *data) 1044{ 1045 struct io_cb_cancel_data *match = data; 1046 1047 /* 1048 * Hold the lock to avoid ->cur_work going out of scope, caller 1049 * may dereference the passed in work. 
1050 */ 1051 raw_spin_lock(&worker->lock); 1052 if (__io_wq_worker_cancel(worker, match, worker->cur_work)) 1053 match->nr_running++; 1054 raw_spin_unlock(&worker->lock); 1055 1056 return match->nr_running && !match->cancel_all; 1057} 1058 1059static inline void io_wq_remove_pending(struct io_wq *wq, 1060 struct io_wq_acct *acct, 1061 struct io_wq_work *work, 1062 struct io_wq_work_node *prev) 1063{ 1064 unsigned int hash = io_get_work_hash(work); 1065 struct io_wq_work *prev_work = NULL; 1066 1067 if (io_wq_is_hashed(work) && work == wq->hash_tail[hash]) { 1068 if (prev) 1069 prev_work = container_of(prev, struct io_wq_work, list); 1070 if (prev_work && io_get_work_hash(prev_work) == hash) 1071 wq->hash_tail[hash] = prev_work; 1072 else 1073 wq->hash_tail[hash] = NULL; 1074 } 1075 wq_list_del(&acct->work_list, &work->list, prev); 1076} 1077 1078static bool io_acct_cancel_pending_work(struct io_wq *wq, 1079 struct io_wq_acct *acct, 1080 struct io_cb_cancel_data *match) 1081{ 1082 struct io_wq_work_node *node, *prev; 1083 struct io_wq_work *work; 1084 1085 raw_spin_lock(&acct->lock); 1086 wq_list_for_each(node, prev, &acct->work_list) { 1087 work = container_of(node, struct io_wq_work, list); 1088 if (!match->fn(work, match->data)) 1089 continue; 1090 io_wq_remove_pending(wq, acct, work, prev); 1091 raw_spin_unlock(&acct->lock); 1092 io_run_cancel(work, wq); 1093 match->nr_pending++; 1094 /* not safe to continue after unlock */ 1095 return true; 1096 } 1097 raw_spin_unlock(&acct->lock); 1098 1099 return false; 1100} 1101 1102static void io_wq_cancel_pending_work(struct io_wq *wq, 1103 struct io_cb_cancel_data *match) 1104{ 1105 int i; 1106retry: 1107 for (i = 0; i < IO_WQ_ACCT_NR; i++) { 1108 struct io_wq_acct *acct = io_get_acct(wq, i == 0); 1109 1110 if (io_acct_cancel_pending_work(wq, acct, match)) { 1111 if (match->cancel_all) 1112 goto retry; 1113 break; 1114 } 1115 } 1116} 1117 1118static void io_acct_cancel_running_work(struct io_wq_acct *acct, 1119 struct 
io_cb_cancel_data *match) 1120{ 1121 raw_spin_lock(&acct->workers_lock); 1122 io_acct_for_each_worker(acct, io_wq_worker_cancel, match); 1123 raw_spin_unlock(&acct->workers_lock); 1124} 1125 1126static void io_wq_cancel_running_work(struct io_wq *wq, 1127 struct io_cb_cancel_data *match) 1128{ 1129 rcu_read_lock(); 1130 1131 for (int i = 0; i < IO_WQ_ACCT_NR; i++) 1132 io_acct_cancel_running_work(&wq->acct[i], match); 1133 1134 rcu_read_unlock(); 1135} 1136 1137enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel, 1138 void *data, bool cancel_all) 1139{ 1140 struct io_cb_cancel_data match = { 1141 .fn = cancel, 1142 .data = data, 1143 .cancel_all = cancel_all, 1144 }; 1145 1146 /* 1147 * First check pending list, if we're lucky we can just remove it 1148 * from there. CANCEL_OK means that the work is returned as-new, 1149 * no completion will be posted for it. 1150 * 1151 * Then check if a free (going busy) or busy worker has the work 1152 * currently running. If we find it there, we'll return CANCEL_RUNNING 1153 * as an indication that we attempt to signal cancellation. The 1154 * completion will run normally in this case. 1155 * 1156 * Do both of these while holding the acct->workers_lock, to ensure that 1157 * we'll find a work item regardless of state. 
1158 */ 1159 io_wq_cancel_pending_work(wq, &match); 1160 if (match.nr_pending && !match.cancel_all) 1161 return IO_WQ_CANCEL_OK; 1162 1163 io_wq_cancel_running_work(wq, &match); 1164 if (match.nr_running && !match.cancel_all) 1165 return IO_WQ_CANCEL_RUNNING; 1166 1167 if (match.nr_running) 1168 return IO_WQ_CANCEL_RUNNING; 1169 if (match.nr_pending) 1170 return IO_WQ_CANCEL_OK; 1171 return IO_WQ_CANCEL_NOTFOUND; 1172} 1173 1174static int io_wq_hash_wake(struct wait_queue_entry *wait, unsigned mode, 1175 int sync, void *key) 1176{ 1177 struct io_wq *wq = container_of(wait, struct io_wq, wait); 1178 int i; 1179 1180 list_del_init(&wait->entry); 1181 1182 rcu_read_lock(); 1183 for (i = 0; i < IO_WQ_ACCT_NR; i++) { 1184 struct io_wq_acct *acct = &wq->acct[i]; 1185 1186 if (test_and_clear_bit(IO_ACCT_STALLED_BIT, &acct->flags)) 1187 io_acct_activate_free_worker(acct); 1188 } 1189 rcu_read_unlock(); 1190 return 1; 1191} 1192 1193struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) 1194{ 1195 int ret, i; 1196 struct io_wq *wq; 1197 1198 if (WARN_ON_ONCE(!data->free_work || !data->do_work)) 1199 return ERR_PTR(-EINVAL); 1200 if (WARN_ON_ONCE(!bounded)) 1201 return ERR_PTR(-EINVAL); 1202 1203 wq = kzalloc(sizeof(struct io_wq), GFP_KERNEL); 1204 if (!wq) 1205 return ERR_PTR(-ENOMEM); 1206 1207 refcount_inc(&data->hash->refs); 1208 wq->hash = data->hash; 1209 wq->free_work = data->free_work; 1210 wq->do_work = data->do_work; 1211 1212 ret = -ENOMEM; 1213 1214 if (!alloc_cpumask_var(&wq->cpu_mask, GFP_KERNEL)) 1215 goto err; 1216 cpuset_cpus_allowed(data->task, wq->cpu_mask); 1217 wq->acct[IO_WQ_ACCT_BOUND].max_workers = bounded; 1218 wq->acct[IO_WQ_ACCT_UNBOUND].max_workers = 1219 task_rlimit(current, RLIMIT_NPROC); 1220 INIT_LIST_HEAD(&wq->wait.entry); 1221 wq->wait.func = io_wq_hash_wake; 1222 for (i = 0; i < IO_WQ_ACCT_NR; i++) { 1223 struct io_wq_acct *acct = &wq->acct[i]; 1224 1225 atomic_set(&acct->nr_running, 0); 1226 1227 
raw_spin_lock_init(&acct->workers_lock); 1228 INIT_HLIST_NULLS_HEAD(&acct->free_list, 0); 1229 INIT_LIST_HEAD(&acct->all_list); 1230 1231 INIT_WQ_LIST(&acct->work_list); 1232 raw_spin_lock_init(&acct->lock); 1233 } 1234 1235 wq->task = get_task_struct(data->task); 1236 atomic_set(&wq->worker_refs, 1); 1237 init_completion(&wq->worker_done); 1238 ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node); 1239 if (ret) 1240 goto err; 1241 1242 return wq; 1243err: 1244 io_wq_put_hash(data->hash); 1245 free_cpumask_var(wq->cpu_mask); 1246 kfree(wq); 1247 return ERR_PTR(ret); 1248} 1249 1250static bool io_task_work_match(struct callback_head *cb, void *data) 1251{ 1252 struct io_worker *worker; 1253 1254 if (cb->func != create_worker_cb && cb->func != create_worker_cont) 1255 return false; 1256 worker = container_of(cb, struct io_worker, create_work); 1257 return worker->wq == data; 1258} 1259 1260void io_wq_exit_start(struct io_wq *wq) 1261{ 1262 set_bit(IO_WQ_BIT_EXIT, &wq->state); 1263} 1264 1265static void io_wq_cancel_tw_create(struct io_wq *wq) 1266{ 1267 struct callback_head *cb; 1268 1269 while ((cb = task_work_cancel_match(wq->task, io_task_work_match, wq)) != NULL) { 1270 struct io_worker *worker; 1271 1272 worker = container_of(cb, struct io_worker, create_work); 1273 io_worker_cancel_cb(worker); 1274 /* 1275 * Only the worker continuation helper has worker allocated and 1276 * hence needs freeing. 
1277 */ 1278 if (cb->func == create_worker_cont) 1279 kfree(worker); 1280 } 1281} 1282 1283static void io_wq_exit_workers(struct io_wq *wq) 1284{ 1285 if (!wq->task) 1286 return; 1287 1288 io_wq_cancel_tw_create(wq); 1289 1290 rcu_read_lock(); 1291 io_wq_for_each_worker(wq, io_wq_worker_wake, NULL); 1292 rcu_read_unlock(); 1293 io_worker_ref_put(wq); 1294 wait_for_completion(&wq->worker_done); 1295 1296 spin_lock_irq(&wq->hash->wait.lock); 1297 list_del_init(&wq->wait.entry); 1298 spin_unlock_irq(&wq->hash->wait.lock); 1299 1300 put_task_struct(wq->task); 1301 wq->task = NULL; 1302} 1303 1304static void io_wq_destroy(struct io_wq *wq) 1305{ 1306 struct io_cb_cancel_data match = { 1307 .fn = io_wq_work_match_all, 1308 .cancel_all = true, 1309 }; 1310 1311 cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node); 1312 io_wq_cancel_pending_work(wq, &match); 1313 free_cpumask_var(wq->cpu_mask); 1314 io_wq_put_hash(wq->hash); 1315 kfree(wq); 1316} 1317 1318void io_wq_put_and_exit(struct io_wq *wq) 1319{ 1320 WARN_ON_ONCE(!test_bit(IO_WQ_BIT_EXIT, &wq->state)); 1321 1322 io_wq_exit_workers(wq); 1323 io_wq_destroy(wq); 1324} 1325 1326struct online_data { 1327 unsigned int cpu; 1328 bool online; 1329}; 1330 1331static bool io_wq_worker_affinity(struct io_worker *worker, void *data) 1332{ 1333 struct online_data *od = data; 1334 1335 if (od->online) 1336 cpumask_set_cpu(od->cpu, worker->wq->cpu_mask); 1337 else 1338 cpumask_clear_cpu(od->cpu, worker->wq->cpu_mask); 1339 return false; 1340} 1341 1342static int __io_wq_cpu_online(struct io_wq *wq, unsigned int cpu, bool online) 1343{ 1344 struct online_data od = { 1345 .cpu = cpu, 1346 .online = online 1347 }; 1348 1349 rcu_read_lock(); 1350 io_wq_for_each_worker(wq, io_wq_worker_affinity, &od); 1351 rcu_read_unlock(); 1352 return 0; 1353} 1354 1355static int io_wq_cpu_online(unsigned int cpu, struct hlist_node *node) 1356{ 1357 struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node); 1358 1359 return 
__io_wq_cpu_online(wq, cpu, true); 1360} 1361 1362static int io_wq_cpu_offline(unsigned int cpu, struct hlist_node *node) 1363{ 1364 struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node); 1365 1366 return __io_wq_cpu_online(wq, cpu, false); 1367} 1368 1369int io_wq_cpu_affinity(struct io_uring_task *tctx, cpumask_var_t mask) 1370{ 1371 cpumask_var_t allowed_mask; 1372 int ret = 0; 1373 1374 if (!tctx || !tctx->io_wq) 1375 return -EINVAL; 1376 1377 if (!alloc_cpumask_var(&allowed_mask, GFP_KERNEL)) 1378 return -ENOMEM; 1379 1380 rcu_read_lock(); 1381 cpuset_cpus_allowed(tctx->io_wq->task, allowed_mask); 1382 if (mask) { 1383 if (cpumask_subset(mask, allowed_mask)) 1384 cpumask_copy(tctx->io_wq->cpu_mask, mask); 1385 else 1386 ret = -EINVAL; 1387 } else { 1388 cpumask_copy(tctx->io_wq->cpu_mask, allowed_mask); 1389 } 1390 rcu_read_unlock(); 1391 1392 free_cpumask_var(allowed_mask); 1393 return ret; 1394} 1395 1396/* 1397 * Set max number of unbounded workers, returns old value. If new_count is 0, 1398 * then just return the old value. 
1399 */ 1400int io_wq_max_workers(struct io_wq *wq, int *new_count) 1401{ 1402 struct io_wq_acct *acct; 1403 int prev[IO_WQ_ACCT_NR]; 1404 int i; 1405 1406 BUILD_BUG_ON((int) IO_WQ_ACCT_BOUND != (int) IO_WQ_BOUND); 1407 BUILD_BUG_ON((int) IO_WQ_ACCT_UNBOUND != (int) IO_WQ_UNBOUND); 1408 BUILD_BUG_ON((int) IO_WQ_ACCT_NR != 2); 1409 1410 for (i = 0; i < IO_WQ_ACCT_NR; i++) { 1411 if (new_count[i] > task_rlimit(current, RLIMIT_NPROC)) 1412 new_count[i] = task_rlimit(current, RLIMIT_NPROC); 1413 } 1414 1415 for (i = 0; i < IO_WQ_ACCT_NR; i++) 1416 prev[i] = 0; 1417 1418 rcu_read_lock(); 1419 1420 for (i = 0; i < IO_WQ_ACCT_NR; i++) { 1421 acct = &wq->acct[i]; 1422 raw_spin_lock(&acct->workers_lock); 1423 prev[i] = max_t(int, acct->max_workers, prev[i]); 1424 if (new_count[i]) 1425 acct->max_workers = new_count[i]; 1426 raw_spin_unlock(&acct->workers_lock); 1427 } 1428 rcu_read_unlock(); 1429 1430 for (i = 0; i < IO_WQ_ACCT_NR; i++) 1431 new_count[i] = prev[i]; 1432 1433 return 0; 1434} 1435 1436static __init int io_wq_init(void) 1437{ 1438 int ret; 1439 1440 ret = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "io-wq/online", 1441 io_wq_cpu_online, io_wq_cpu_offline); 1442 if (ret < 0) 1443 return ret; 1444 io_wq_online = ret; 1445 return 0; 1446} 1447subsys_initcall(io_wq_init);