Linux kernel mirror (for testing): git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
io-wq.c at v5.10 (1249 lines, 31 kB)
// SPDX-License-Identifier: GPL-2.0
/*
 * Basic worker thread pool for io_uring
 *
 * Copyright (C) 2019 Jens Axboe
 *
 */
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/errno.h>
#include <linux/sched/signal.h>
#include <linux/mm.h>
#include <linux/sched/mm.h>
#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/kthread.h>
#include <linux/rculist_nulls.h>
#include <linux/fs_struct.h>
#include <linux/task_work.h>
#include <linux/blk-cgroup.h>
#include <linux/audit.h>
#include <linux/cpu.h>

#include "../kernel/sched/sched.h"
#include "io-wq.h"

#define WORKER_IDLE_TIMEOUT	(5 * HZ)

enum {
	IO_WORKER_F_UP		= 1,	/* up and active */
	IO_WORKER_F_RUNNING	= 2,	/* account as running */
	IO_WORKER_F_FREE	= 4,	/* worker on free list */
	IO_WORKER_F_FIXED	= 8,	/* static idle worker */
	IO_WORKER_F_BOUND	= 16,	/* is doing bounded work */
};

enum {
	IO_WQ_BIT_EXIT		= 0,	/* wq exiting */
	IO_WQ_BIT_CANCEL	= 1,	/* cancel work on list */
	IO_WQ_BIT_ERROR		= 2,	/* error on setup */
};

enum {
	IO_WQE_FLAG_STALLED	= 1,	/* stalled on hash */
};

/*
 * One for each thread in a wqe pool
 */
struct io_worker {
	refcount_t ref;
	unsigned flags;
	struct hlist_nulls_node nulls_node;
	struct list_head all_list;
	struct task_struct *task;
	struct io_wqe *wqe;

	struct io_wq_work *cur_work;
	spinlock_t lock;

	struct rcu_head rcu;
	struct mm_struct *mm;
#ifdef CONFIG_BLK_CGROUP
	struct cgroup_subsys_state *blkcg_css;
#endif
	const struct cred *cur_creds;
	const struct cred *saved_creds;
	struct files_struct *restore_files;
	struct nsproxy *restore_nsproxy;
	struct fs_struct *restore_fs;
};

#if BITS_PER_LONG == 64
#define IO_WQ_HASH_ORDER	6
#else
#define IO_WQ_HASH_ORDER	5
#endif

#define IO_WQ_NR_HASH_BUCKETS	(1u << IO_WQ_HASH_ORDER)

struct io_wqe_acct {
	unsigned nr_workers;
	unsigned max_workers;
	atomic_t nr_running;
};

enum {
	IO_WQ_ACCT_BOUND,
	IO_WQ_ACCT_UNBOUND,
};

/*
 * Per-node worker thread pool
 */
struct io_wqe {
	struct {
		raw_spinlock_t lock;
		struct io_wq_work_list work_list;
		unsigned long hash_map;
		unsigned flags;
	} ____cacheline_aligned_in_smp;

	int node;
	struct io_wqe_acct acct[2];

	struct hlist_nulls_head free_list;
	struct list_head all_list;

	struct io_wq *wq;
	struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];
};

/*
 * Per io_wq state
 */
struct io_wq {
	struct io_wqe **wqes;
	unsigned long state;

	free_work_fn *free_work;
	io_wq_work_fn *do_work;

	struct task_struct *manager;
	struct user_struct *user;
	refcount_t refs;
	struct completion done;

	struct hlist_node cpuhp_node;

	refcount_t use_refs;
};

static enum cpuhp_state io_wq_online;

static bool io_worker_get(struct io_worker *worker)
{
	return refcount_inc_not_zero(&worker->ref);
}

static void io_worker_release(struct io_worker *worker)
{
	if (refcount_dec_and_test(&worker->ref))
		wake_up_process(worker->task);
}

/*
 * Note: drops the wqe->lock if returning true! The caller must re-acquire
 * the lock in that case. Some callers need to restart handling if this
 * happens, so we can't just re-acquire the lock on behalf of the caller.
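 *
 * (Editorial note: both callers below cope with this in their own way;
 * io_worker_exit() re-acquires the lock and continues, while io_wqe_worker()
 * restarts its idle loop.)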
 */
static bool __io_worker_unuse(struct io_wqe *wqe, struct io_worker *worker)
{
	bool dropped_lock = false;

	if (worker->saved_creds) {
		revert_creds(worker->saved_creds);
		worker->cur_creds = worker->saved_creds = NULL;
	}

	if (current->files != worker->restore_files) {
		__acquire(&wqe->lock);
		raw_spin_unlock_irq(&wqe->lock);
		dropped_lock = true;

		task_lock(current);
		current->files = worker->restore_files;
		current->nsproxy = worker->restore_nsproxy;
		task_unlock(current);
	}

	if (current->fs != worker->restore_fs)
		current->fs = worker->restore_fs;

	/*
	 * If we have an active mm, we need to drop the wq lock before unusing
	 * it. If we do, return true and let the caller retry the idle loop.
	 */
	if (worker->mm) {
		if (!dropped_lock) {
			__acquire(&wqe->lock);
			raw_spin_unlock_irq(&wqe->lock);
			dropped_lock = true;
		}
		__set_current_state(TASK_RUNNING);
		kthread_unuse_mm(worker->mm);
		mmput(worker->mm);
		worker->mm = NULL;
	}

#ifdef CONFIG_BLK_CGROUP
	if (worker->blkcg_css) {
		kthread_associate_blkcg(NULL);
		worker->blkcg_css = NULL;
	}
#endif
	if (current->signal->rlim[RLIMIT_FSIZE].rlim_cur != RLIM_INFINITY)
		current->signal->rlim[RLIMIT_FSIZE].rlim_cur = RLIM_INFINITY;
	return dropped_lock;
}

static inline struct io_wqe_acct *io_work_get_acct(struct io_wqe *wqe,
						   struct io_wq_work *work)
{
	if (work->flags & IO_WQ_WORK_UNBOUND)
		return &wqe->acct[IO_WQ_ACCT_UNBOUND];

	return &wqe->acct[IO_WQ_ACCT_BOUND];
}

static inline struct io_wqe_acct *io_wqe_get_acct(struct io_wqe *wqe,
						  struct io_worker *worker)
{
	if (worker->flags & IO_WORKER_F_BOUND)
		return &wqe->acct[IO_WQ_ACCT_BOUND];

	return &wqe->acct[IO_WQ_ACCT_UNBOUND];
}

static void io_worker_exit(struct io_worker *worker)
{
	struct io_wqe *wqe = worker->wqe;
	struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker);

	/*
	 * If we're not at zero, someone else is holding a brief reference
	 * to the worker. Wait for that to go away.
	 */
	set_current_state(TASK_INTERRUPTIBLE);
	if (!refcount_dec_and_test(&worker->ref))
		schedule();
	__set_current_state(TASK_RUNNING);

	preempt_disable();
	current->flags &= ~PF_IO_WORKER;
	if (worker->flags & IO_WORKER_F_RUNNING)
		atomic_dec(&acct->nr_running);
	if (!(worker->flags & IO_WORKER_F_BOUND))
		atomic_dec(&wqe->wq->user->processes);
	worker->flags = 0;
	preempt_enable();

	raw_spin_lock_irq(&wqe->lock);
	hlist_nulls_del_rcu(&worker->nulls_node);
	list_del_rcu(&worker->all_list);
	if (__io_worker_unuse(wqe, worker)) {
		__release(&wqe->lock);
		raw_spin_lock_irq(&wqe->lock);
	}
	acct->nr_workers--;
	raw_spin_unlock_irq(&wqe->lock);

	kfree_rcu(worker, rcu);
	if (refcount_dec_and_test(&wqe->wq->refs))
		complete(&wqe->wq->done);
}

static inline bool io_wqe_run_queue(struct io_wqe *wqe)
	__must_hold(wqe->lock)
{
	if (!wq_list_empty(&wqe->work_list) &&
	    !(wqe->flags & IO_WQE_FLAG_STALLED))
		return true;
	return false;
}

/*
 * Check head of free list for an available worker. If one isn't available,
 * caller must wake up the wq manager to create one.
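 *
 * (Editorial note: only the head of the free_list is examined; the list is
 * RCU-protected, so this runs locklessly under rcu_read_lock() from
 * io_wqe_wake_worker() below.)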
 */
static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
	__must_hold(RCU)
{
	struct hlist_nulls_node *n;
	struct io_worker *worker;

	n = rcu_dereference(hlist_nulls_first_rcu(&wqe->free_list));
	if (is_a_nulls(n))
		return false;

	worker = hlist_nulls_entry(n, struct io_worker, nulls_node);
	if (io_worker_get(worker)) {
		wake_up_process(worker->task);
		io_worker_release(worker);
		return true;
	}

	return false;
}

/*
 * We need a worker. If we find a free one, we're good. If not, and we're
 * below the max number of workers, wake up the manager to create one.
 */
static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
{
	bool ret;

	/*
	 * Most likely an attempt to queue unbounded work on an io_wq that
	 * wasn't setup with any unbounded workers.
	 */
	WARN_ON_ONCE(!acct->max_workers);

	rcu_read_lock();
	ret = io_wqe_activate_free_worker(wqe);
	rcu_read_unlock();

	if (!ret && acct->nr_workers < acct->max_workers)
		wake_up_process(wqe->wq->manager);
}

static void io_wqe_inc_running(struct io_wqe *wqe, struct io_worker *worker)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker);

	atomic_inc(&acct->nr_running);
}

static void io_wqe_dec_running(struct io_wqe *wqe, struct io_worker *worker)
	__must_hold(wqe->lock)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker);

	if (atomic_dec_and_test(&acct->nr_running) && io_wqe_run_queue(wqe))
		io_wqe_wake_worker(wqe, acct);
}

static void io_worker_start(struct io_wqe *wqe, struct io_worker *worker)
{
	allow_kernel_signal(SIGINT);

	current->flags |= PF_IO_WORKER;

	worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
	worker->restore_files = current->files;
	worker->restore_nsproxy = current->nsproxy;
	worker->restore_fs = current->fs;
	io_wqe_inc_running(wqe, worker);
}

/*
 * Worker will start processing some work. Move it to the busy list, if
 * it's currently on the freelist
 */
static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
			     struct io_wq_work *work)
	__must_hold(wqe->lock)
{
	bool worker_bound, work_bound;

	if (worker->flags & IO_WORKER_F_FREE) {
		worker->flags &= ~IO_WORKER_F_FREE;
		hlist_nulls_del_init_rcu(&worker->nulls_node);
	}

	/*
	 * If worker is moving from bound to unbound (or vice versa), then
	 * ensure we update the running accounting.
	 */
	worker_bound = (worker->flags & IO_WORKER_F_BOUND) != 0;
	work_bound = (work->flags & IO_WQ_WORK_UNBOUND) == 0;
	if (worker_bound != work_bound) {
		io_wqe_dec_running(wqe, worker);
		if (work_bound) {
			worker->flags |= IO_WORKER_F_BOUND;
			wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers--;
			wqe->acct[IO_WQ_ACCT_BOUND].nr_workers++;
			atomic_dec(&wqe->wq->user->processes);
		} else {
			worker->flags &= ~IO_WORKER_F_BOUND;
			wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers++;
			wqe->acct[IO_WQ_ACCT_BOUND].nr_workers--;
			atomic_inc(&wqe->wq->user->processes);
		}
		io_wqe_inc_running(wqe, worker);
	}
}

/*
 * No work, worker going to sleep. Move to freelist, and unuse mm if we
 * have one attached. Dropping the mm may potentially sleep, so we drop
 * the lock in that case and return success.
 * Since the caller has to retry the loop in that case (we changed task
 * state), we don't regrab the lock if we return success.
 */
static bool __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker)
	__must_hold(wqe->lock)
{
	if (!(worker->flags & IO_WORKER_F_FREE)) {
		worker->flags |= IO_WORKER_F_FREE;
		hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	}

	return __io_worker_unuse(wqe, worker);
}

static inline unsigned int io_get_work_hash(struct io_wq_work *work)
{
	return work->flags >> IO_WQ_HASH_SHIFT;
}

static struct io_wq_work *io_get_next_work(struct io_wqe *wqe)
	__must_hold(wqe->lock)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work, *tail;
	unsigned int hash;

	wq_list_for_each(node, prev, &wqe->work_list) {
		work = container_of(node, struct io_wq_work, list);

		/* not hashed, can run anytime */
		if (!io_wq_is_hashed(work)) {
			wq_list_del(&wqe->work_list, node, prev);
			return work;
		}

		/* hashed, can run if not already running */
		hash = io_get_work_hash(work);
		if (!(wqe->hash_map & BIT(hash))) {
			wqe->hash_map |= BIT(hash);
			/* all items with this hash lie in [work, tail] */
			tail = wqe->hash_tail[hash];
			wqe->hash_tail[hash] = NULL;
			wq_list_cut(&wqe->work_list, &tail->list, prev);
			return work;
		}
	}

	return NULL;
}

static void io_wq_switch_mm(struct io_worker *worker, struct io_wq_work *work)
{
	if (worker->mm) {
		kthread_unuse_mm(worker->mm);
		mmput(worker->mm);
		worker->mm = NULL;
	}

	if (mmget_not_zero(work->identity->mm)) {
		kthread_use_mm(work->identity->mm);
		worker->mm = work->identity->mm;
		return;
	}

	/* failed grabbing mm, ensure work gets cancelled */
	work->flags |= IO_WQ_WORK_CANCEL;
}

static inline void io_wq_switch_blkcg(struct io_worker *worker,
				      struct io_wq_work *work)
{
#ifdef CONFIG_BLK_CGROUP
	if (!(work->flags & IO_WQ_WORK_BLKCG))
		return;
	if (work->identity->blkcg_css != worker->blkcg_css) {
		kthread_associate_blkcg(work->identity->blkcg_css);
		worker->blkcg_css = work->identity->blkcg_css;
	}
#endif
}

static void io_wq_switch_creds(struct io_worker *worker,
			       struct io_wq_work *work)
{
	const struct cred *old_creds = override_creds(work->identity->creds);

	worker->cur_creds = work->identity->creds;
	if (worker->saved_creds)
		put_cred(old_creds); /* creds set by previous switch */
	else
		worker->saved_creds = old_creds;
}

static void io_impersonate_work(struct io_worker *worker,
				struct io_wq_work *work)
{
	if ((work->flags & IO_WQ_WORK_FILES) &&
	    current->files != work->identity->files) {
		task_lock(current);
		current->files = work->identity->files;
		current->nsproxy = work->identity->nsproxy;
		task_unlock(current);
		if (!work->identity->files) {
			/* failed grabbing files, ensure work gets cancelled */
			work->flags |= IO_WQ_WORK_CANCEL;
		}
	}
	if ((work->flags & IO_WQ_WORK_FS) && current->fs != work->identity->fs)
		current->fs = work->identity->fs;
	if ((work->flags & IO_WQ_WORK_MM) && work->identity->mm != worker->mm)
		io_wq_switch_mm(worker, work);
	if ((work->flags & IO_WQ_WORK_CREDS) &&
	    worker->cur_creds != work->identity->creds)
		io_wq_switch_creds(worker, work);
	if (work->flags & IO_WQ_WORK_FSIZE)
		current->signal->rlim[RLIMIT_FSIZE].rlim_cur =
			work->identity->fsize;
	else if (current->signal->rlim[RLIMIT_FSIZE].rlim_cur != RLIM_INFINITY)
		current->signal->rlim[RLIMIT_FSIZE].rlim_cur = RLIM_INFINITY;
	io_wq_switch_blkcg(worker, work);
#ifdef CONFIG_AUDIT
	current->loginuid = work->identity->loginuid;
	current->sessionid = work->identity->sessionid;
#endif
}

static void io_assign_current_work(struct io_worker *worker,
				   struct io_wq_work *work)
{
	if (work) {
		/* flush pending signals before assigning new work */
		if (signal_pending(current))
			flush_signals(current);
		cond_resched();
	}

#ifdef CONFIG_AUDIT
	current->loginuid = KUIDT_INIT(AUDIT_UID_UNSET);
	current->sessionid = AUDIT_SID_UNSET;
#endif

	spin_lock_irq(&worker->lock);
	worker->cur_work = work;
	spin_unlock_irq(&worker->lock);
}

static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);

static void io_worker_handle_work(struct io_worker *worker)
	__releases(wqe->lock)
{
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;

	do {
		struct io_wq_work *work;
get_next:
		/*
		 * If we got some work, mark us as busy. If we didn't, but
		 * the list isn't empty, it means we stalled on hashed work.
		 * Mark us stalled so we don't keep looking for work when we
		 * can't make progress, any work completion or insertion will
		 * clear the stalled flag.
		 */
		work = io_get_next_work(wqe);
		if (work)
			__io_worker_busy(wqe, worker, work);
		else if (!wq_list_empty(&wqe->work_list))
			wqe->flags |= IO_WQE_FLAG_STALLED;

		raw_spin_unlock_irq(&wqe->lock);
		if (!work)
			break;
		io_assign_current_work(worker, work);

		/* handle a whole dependent link */
		do {
			struct io_wq_work *old_work, *next_hashed, *linked;
			unsigned int hash = io_get_work_hash(work);

			next_hashed = wq_next_work(work);
			io_impersonate_work(worker, work);
			/*
			 * OK to set IO_WQ_WORK_CANCEL even for uncancellable
			 * work, the worker function will do the right thing.
			 */
			if (test_bit(IO_WQ_BIT_CANCEL, &wq->state))
				work->flags |= IO_WQ_WORK_CANCEL;

			old_work = work;
			linked = wq->do_work(work);

			work = next_hashed;
			if (!work && linked && !io_wq_is_hashed(linked)) {
				work = linked;
				linked = NULL;
			}
			io_assign_current_work(worker, work);
			wq->free_work(old_work);

			if (linked)
				io_wqe_enqueue(wqe, linked);

			if (hash != -1U && !next_hashed) {
				raw_spin_lock_irq(&wqe->lock);
				wqe->hash_map &= ~BIT_ULL(hash);
				wqe->flags &= ~IO_WQE_FLAG_STALLED;
				/* skip unnecessary unlock-lock wqe->lock */
				if (!work)
					goto get_next;
				raw_spin_unlock_irq(&wqe->lock);
			}
		} while (work);

		raw_spin_lock_irq(&wqe->lock);
	} while (1);
}

static int io_wqe_worker(void *data)
{
	struct io_worker *worker = data;
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;

	io_worker_start(wqe, worker);

	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		set_current_state(TASK_INTERRUPTIBLE);
loop:
		raw_spin_lock_irq(&wqe->lock);
		if (io_wqe_run_queue(wqe)) {
			__set_current_state(TASK_RUNNING);
			io_worker_handle_work(worker);
			goto loop;
		}
		/* drops the lock on success, retry */
		if (__io_worker_idle(wqe, worker)) {
			__release(&wqe->lock);
			goto loop;
		}
		raw_spin_unlock_irq(&wqe->lock);
		if (signal_pending(current))
			flush_signals(current);
		if (schedule_timeout(WORKER_IDLE_TIMEOUT))
			continue;
		/* timed out, exit unless we're the fixed worker */
		if (test_bit(IO_WQ_BIT_EXIT, &wq->state) ||
		    !(worker->flags & IO_WORKER_F_FIXED))
			break;
	}

	if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		raw_spin_lock_irq(&wqe->lock);
		if (!wq_list_empty(&wqe->work_list))
			io_worker_handle_work(worker);
		else
			raw_spin_unlock_irq(&wqe->lock);
	}

	io_worker_exit(worker);
	return 0;
}

/*
 * Called when a worker is scheduled in. Mark us as currently running.
 */
void io_wq_worker_running(struct task_struct *tsk)
{
	struct io_worker *worker = kthread_data(tsk);
	struct io_wqe *wqe = worker->wqe;

	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (worker->flags & IO_WORKER_F_RUNNING)
		return;
	worker->flags |= IO_WORKER_F_RUNNING;
	io_wqe_inc_running(wqe, worker);
}

/*
 * Called when worker is going to sleep. If there are no workers currently
 * running and we have work pending, wake up a free one or have the manager
 * set one up.
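 *
 * (Editorial note: this mirrors io_wq_worker_running() above; together the
 * two scheduler hooks keep acct->nr_running in step with how many workers
 * are actually running.)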
 */
void io_wq_worker_sleeping(struct task_struct *tsk)
{
	struct io_worker *worker = kthread_data(tsk);
	struct io_wqe *wqe = worker->wqe;

	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (!(worker->flags & IO_WORKER_F_RUNNING))
		return;

	worker->flags &= ~IO_WORKER_F_RUNNING;

	raw_spin_lock_irq(&wqe->lock);
	io_wqe_dec_running(wqe, worker);
	raw_spin_unlock_irq(&wqe->lock);
}

static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
{
	struct io_wqe_acct *acct = &wqe->acct[index];
	struct io_worker *worker;

	worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
	if (!worker)
		return false;

	refcount_set(&worker->ref, 1);
	worker->nulls_node.pprev = NULL;
	worker->wqe = wqe;
	spin_lock_init(&worker->lock);

	worker->task = kthread_create_on_node(io_wqe_worker, worker, wqe->node,
				"io_wqe_worker-%d/%d", index, wqe->node);
	if (IS_ERR(worker->task)) {
		kfree(worker);
		return false;
	}
	kthread_bind_mask(worker->task, cpumask_of_node(wqe->node));

	raw_spin_lock_irq(&wqe->lock);
	hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	list_add_tail_rcu(&worker->all_list, &wqe->all_list);
	worker->flags |= IO_WORKER_F_FREE;
	if (index == IO_WQ_ACCT_BOUND)
		worker->flags |= IO_WORKER_F_BOUND;
	if (!acct->nr_workers && (worker->flags & IO_WORKER_F_BOUND))
		worker->flags |= IO_WORKER_F_FIXED;
	acct->nr_workers++;
	raw_spin_unlock_irq(&wqe->lock);

	if (index == IO_WQ_ACCT_UNBOUND)
		atomic_inc(&wq->user->processes);

	refcount_inc(&wq->refs);
	wake_up_process(worker->task);
	return true;
}

static inline bool io_wqe_need_worker(struct io_wqe *wqe, int index)
	__must_hold(wqe->lock)
{
	struct io_wqe_acct *acct = &wqe->acct[index];

	/* if we have available workers or no work, no need */
	if (!hlist_nulls_empty(&wqe->free_list) || !io_wqe_run_queue(wqe))
		return false;
	return acct->nr_workers < acct->max_workers;
}

static bool io_wqe_worker_send_sig(struct io_worker *worker, void *data)
{
	send_sig(SIGINT, worker->task, 1);
	return false;
}

/*
 * Iterate the passed in list and call the specific function for each
 * worker that isn't exiting
 */
static bool io_wq_for_each_worker(struct io_wqe *wqe,
				  bool (*func)(struct io_worker *, void *),
				  void *data)
{
	struct io_worker *worker;
	bool ret = false;

	list_for_each_entry_rcu(worker, &wqe->all_list, all_list) {
		if (io_worker_get(worker)) {
			/* no task if node is/was offline */
			if (worker->task)
				ret = func(worker, data);
			io_worker_release(worker);
			if (ret)
				break;
		}
	}

	return ret;
}

static bool io_wq_worker_wake(struct io_worker *worker, void *data)
{
	wake_up_process(worker->task);
	return false;
}

/*
 * Manager thread. Tasked with creating new workers, if we need them.
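 *
 * (Editorial note: the manager also anchors wq->refs; whichever of the
 * manager or the last exiting worker drops the final reference completes
 * wq->done, which __io_wq_destroy() waits on.)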
 */
static int io_wq_manager(void *data)
{
	struct io_wq *wq = data;
	int node;

	/* create fixed workers */
	refcount_set(&wq->refs, 1);
	for_each_node(node) {
		if (!node_online(node))
			continue;
		if (create_io_worker(wq, wq->wqes[node], IO_WQ_ACCT_BOUND))
			continue;
		set_bit(IO_WQ_BIT_ERROR, &wq->state);
		set_bit(IO_WQ_BIT_EXIT, &wq->state);
		goto out;
	}

	complete(&wq->done);

	while (!kthread_should_stop()) {
		if (current->task_works)
			task_work_run();

		for_each_node(node) {
			struct io_wqe *wqe = wq->wqes[node];
			bool fork_worker[2] = { false, false };

			if (!node_online(node))
				continue;

			raw_spin_lock_irq(&wqe->lock);
			if (io_wqe_need_worker(wqe, IO_WQ_ACCT_BOUND))
				fork_worker[IO_WQ_ACCT_BOUND] = true;
			if (io_wqe_need_worker(wqe, IO_WQ_ACCT_UNBOUND))
				fork_worker[IO_WQ_ACCT_UNBOUND] = true;
			raw_spin_unlock_irq(&wqe->lock);
			if (fork_worker[IO_WQ_ACCT_BOUND])
				create_io_worker(wq, wqe, IO_WQ_ACCT_BOUND);
			if (fork_worker[IO_WQ_ACCT_UNBOUND])
				create_io_worker(wq, wqe, IO_WQ_ACCT_UNBOUND);
		}
		set_current_state(TASK_INTERRUPTIBLE);
		schedule_timeout(HZ);
	}

	if (current->task_works)
		task_work_run();

out:
	if (refcount_dec_and_test(&wq->refs)) {
		complete(&wq->done);
		return 0;
	}
	/* if ERROR is set and we get here, we have workers to wake */
	if (test_bit(IO_WQ_BIT_ERROR, &wq->state)) {
		rcu_read_lock();
		for_each_node(node)
			io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL);
		rcu_read_unlock();
	}
	return 0;
}

static bool io_wq_can_queue(struct io_wqe *wqe, struct io_wqe_acct *acct,
			    struct io_wq_work *work)
{
	bool free_worker;

	if (!(work->flags & IO_WQ_WORK_UNBOUND))
		return true;
	if (atomic_read(&acct->nr_running))
		return true;

	rcu_read_lock();
	free_worker = !hlist_nulls_empty(&wqe->free_list);
	rcu_read_unlock();
	if (free_worker)
		return true;

	if (atomic_read(&wqe->wq->user->processes) >= acct->max_workers &&
	    !(capable(CAP_SYS_RESOURCE) || capable(CAP_SYS_ADMIN)))
		return false;

	return true;
}

static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)
{
	struct io_wq *wq = wqe->wq;

	do {
		struct io_wq_work *old_work = work;

		work->flags |= IO_WQ_WORK_CANCEL;
		work = wq->do_work(work);
		wq->free_work(old_work);
	} while (work);
}

static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
{
	unsigned int hash;
	struct io_wq_work *tail;

	if (!io_wq_is_hashed(work)) {
append:
		wq_list_add_tail(&work->list, &wqe->work_list);
		return;
	}

	hash = io_get_work_hash(work);
	tail = wqe->hash_tail[hash];
	wqe->hash_tail[hash] = work;
	if (!tail)
		goto append;

	wq_list_add_after(&work->list, &tail->list, &wqe->work_list);
}

static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
{
	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
	int work_flags;
	unsigned long flags;

	/*
	 * Do early check to see if we need a new unbound worker, and if we do,
	 * if we're allowed to do so. This isn't 100% accurate as there's a
	 * gap between this check and incrementing the value, but that's OK.
	 * It's close enough to not be an issue, fork() has the same delay.
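	 *
	 * (Editorial note: the check itself is io_wq_can_queue() above; bound
	 * work is always admitted, only unbound work can be refused and
	 * cancelled here.)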
	 */
	if (unlikely(!io_wq_can_queue(wqe, acct, work))) {
		io_run_cancel(work, wqe);
		return;
	}

	work_flags = work->flags;
	raw_spin_lock_irqsave(&wqe->lock, flags);
	io_wqe_insert_work(wqe, work);
	wqe->flags &= ~IO_WQE_FLAG_STALLED;
	raw_spin_unlock_irqrestore(&wqe->lock, flags);

	if ((work_flags & IO_WQ_WORK_CONCURRENT) ||
	    !atomic_read(&acct->nr_running))
		io_wqe_wake_worker(wqe, acct);
}

void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
{
	struct io_wqe *wqe = wq->wqes[numa_node_id()];

	io_wqe_enqueue(wqe, work);
}

/*
 * Work items that hash to the same value will not be done in parallel.
 * Used to limit concurrent writes, generally hashed by inode.
 */
void io_wq_hash_work(struct io_wq_work *work, void *val)
{
	unsigned int bit;

	bit = hash_ptr(val, IO_WQ_HASH_ORDER);
	work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT));
}

void io_wq_cancel_all(struct io_wq *wq)
{
	int node;

	set_bit(IO_WQ_BIT_CANCEL, &wq->state);

	rcu_read_lock();
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wq_for_each_worker(wqe, io_wqe_worker_send_sig, NULL);
	}
	rcu_read_unlock();
}

struct io_cb_cancel_data {
	work_cancel_fn *fn;
	void *data;
	int nr_running;
	int nr_pending;
	bool cancel_all;
};

static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
{
	struct io_cb_cancel_data *match = data;
	unsigned long flags;

	/*
	 * Hold the lock to avoid ->cur_work going out of scope, caller
	 * may dereference the passed in work.
	 */
	spin_lock_irqsave(&worker->lock, flags);
	if (worker->cur_work &&
	    !(worker->cur_work->flags & IO_WQ_WORK_NO_CANCEL) &&
	    match->fn(worker->cur_work, match->data)) {
		send_sig(SIGINT, worker->task, 1);
		match->nr_running++;
	}
	spin_unlock_irqrestore(&worker->lock, flags);

	return match->nr_running && !match->cancel_all;
}

static inline void io_wqe_remove_pending(struct io_wqe *wqe,
					 struct io_wq_work *work,
					 struct io_wq_work_node *prev)
{
	unsigned int hash = io_get_work_hash(work);
	struct io_wq_work *prev_work = NULL;

	if (io_wq_is_hashed(work) && work == wqe->hash_tail[hash]) {
		if (prev)
			prev_work = container_of(prev, struct io_wq_work, list);
		if (prev_work && io_get_work_hash(prev_work) == hash)
			wqe->hash_tail[hash] = prev_work;
		else
			wqe->hash_tail[hash] = NULL;
	}
	wq_list_del(&wqe->work_list, &work->list, prev);
}

static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
				       struct io_cb_cancel_data *match)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work;
	unsigned long flags;

retry:
	raw_spin_lock_irqsave(&wqe->lock, flags);
	wq_list_for_each(node, prev, &wqe->work_list) {
		work = container_of(node, struct io_wq_work, list);
		if (!match->fn(work, match->data))
			continue;
		io_wqe_remove_pending(wqe, work, prev);
		raw_spin_unlock_irqrestore(&wqe->lock, flags);
		io_run_cancel(work, wqe);
		match->nr_pending++;
		if (!match->cancel_all)
			return;

		/* not safe to continue after unlock */
		goto retry;
	}
	raw_spin_unlock_irqrestore(&wqe->lock, flags);
}

static void io_wqe_cancel_running_work(struct io_wqe *wqe,
				       struct io_cb_cancel_data *match)
{
	rcu_read_lock();
	io_wq_for_each_worker(wqe, io_wq_worker_cancel, match);
	rcu_read_unlock();
}

enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
				  void *data, bool cancel_all)
{
	struct io_cb_cancel_data match = {
		.fn		= cancel,
		.data		= data,
		.cancel_all	= cancel_all,
	};
	int node;

	/*
	 * First check pending list, if we're lucky we can just remove it
	 * from there. CANCEL_OK means that the work is returned as-new,
	 * no completion will be posted for it.
	 */
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wqe_cancel_pending_work(wqe, &match);
		if (match.nr_pending && !match.cancel_all)
			return IO_WQ_CANCEL_OK;
	}

	/*
	 * Now check if a free (going busy) or busy worker has the work
	 * currently running. If we find it there, we'll return CANCEL_RUNNING
	 * as an indication that we attempt to signal cancellation. The
	 * completion will run normally in this case.
	 */
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wqe_cancel_running_work(wqe, &match);
		if (match.nr_running && !match.cancel_all)
			return IO_WQ_CANCEL_RUNNING;
	}

	if (match.nr_running)
		return IO_WQ_CANCEL_RUNNING;
	if (match.nr_pending)
		return IO_WQ_CANCEL_OK;
	return IO_WQ_CANCEL_NOTFOUND;
}

static bool io_wq_io_cb_cancel_data(struct io_wq_work *work, void *data)
{
	return work == data;
}

enum io_wq_cancel io_wq_cancel_work(struct io_wq *wq, struct io_wq_work *cwork)
{
	return io_wq_cancel_cb(wq, io_wq_io_cb_cancel_data, (void *)cwork, false);
}

struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
{
	int ret = -ENOMEM, node;
	struct io_wq *wq;

	if (WARN_ON_ONCE(!data->free_work || !data->do_work))
		return ERR_PTR(-EINVAL);

	wq = kzalloc(sizeof(*wq), GFP_KERNEL);
	if (!wq)
		return ERR_PTR(-ENOMEM);

	wq->wqes = kcalloc(nr_node_ids, sizeof(struct io_wqe *), GFP_KERNEL);
	if (!wq->wqes)
		goto err_wq;

	ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	if (ret)
		goto err_wqes;

	wq->free_work = data->free_work;
	wq->do_work = data->do_work;

	/* caller must already hold a reference to this */
	wq->user = data->user;

	ret = -ENOMEM;
	for_each_node(node) {
		struct io_wqe *wqe;
		int alloc_node = node;

		if (!node_online(alloc_node))
			alloc_node = NUMA_NO_NODE;
		wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, alloc_node);
		if (!wqe)
			goto err;
		wq->wqes[node] = wqe;
		wqe->node = alloc_node;
		wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
		atomic_set(&wqe->acct[IO_WQ_ACCT_BOUND].nr_running, 0);
		if (wq->user) {
			wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
					task_rlimit(current, RLIMIT_NPROC);
		}
		atomic_set(&wqe->acct[IO_WQ_ACCT_UNBOUND].nr_running, 0);
		wqe->wq = wq;
		raw_spin_lock_init(&wqe->lock);
		INIT_WQ_LIST(&wqe->work_list);
		INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
		INIT_LIST_HEAD(&wqe->all_list);
	}

	init_completion(&wq->done);

	wq->manager = kthread_create(io_wq_manager, wq, "io_wq_manager");
	if (!IS_ERR(wq->manager)) {
		wake_up_process(wq->manager);
		wait_for_completion(&wq->done);
		if (test_bit(IO_WQ_BIT_ERROR, &wq->state)) {
			ret = -ENOMEM;
			goto err;
		}
		refcount_set(&wq->use_refs, 1);
		reinit_completion(&wq->done);
		return wq;
	}

	ret = PTR_ERR(wq->manager);
	complete(&wq->done);
err:
	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	for_each_node(node)
		kfree(wq->wqes[node]);
err_wqes:
	kfree(wq->wqes);
err_wq:
	kfree(wq);
	return ERR_PTR(ret);
}

bool io_wq_get(struct io_wq *wq, struct io_wq_data *data)
{
	if (data->free_work != wq->free_work || data->do_work != wq->do_work)
		return false;

	return refcount_inc_not_zero(&wq->use_refs);
}

static void __io_wq_destroy(struct io_wq *wq)
{
	int node;

	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);

	set_bit(IO_WQ_BIT_EXIT, &wq->state);
	if (wq->manager)
		kthread_stop(wq->manager);

	rcu_read_lock();
	for_each_node(node)
		io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL);
	rcu_read_unlock();

	wait_for_completion(&wq->done);

	for_each_node(node)
		kfree(wq->wqes[node]);
	kfree(wq->wqes);
	kfree(wq);
}

void io_wq_destroy(struct io_wq *wq)
{
	if (refcount_dec_and_test(&wq->use_refs))
		__io_wq_destroy(wq);
}

struct task_struct *io_wq_get_task(struct io_wq *wq)
{
	return wq->manager;
}

static bool io_wq_worker_affinity(struct io_worker *worker, void *data)
{
	struct task_struct *task = worker->task;
	struct rq_flags rf;
	struct rq *rq;

	rq = task_rq_lock(task, &rf);
	do_set_cpus_allowed(task, cpumask_of_node(worker->wqe->node));
	task->flags |= PF_NO_SETAFFINITY;
	task_rq_unlock(rq, task, &rf);
	return false;
}

static int io_wq_cpu_online(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);
	int i;

	rcu_read_lock();
	for_each_node(i)
		io_wq_for_each_worker(wq->wqes[i], io_wq_worker_affinity, NULL);
	rcu_read_unlock();
	return 0;
}

static __init int io_wq_init(void)
{
	int ret;

	ret = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "io-wq/online",
					io_wq_cpu_online, NULL);
	if (ret < 0)
		return ret;
	io_wq_online = ret;
	return 0;
}
subsys_initcall(io_wq_init);
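
Editorial appendix, not part of the original file: a minimal sketch of how a caller drives this pool, based only on the entry points visible above (io_wq_create(), io_wq_enqueue(), io_wq_hash_work(), io_wq_destroy()) and the io_wq_data members this file dereferences (user, free_work, do_work). The names struct my_job, my_do_work() and my_free_work() are hypothetical, invented for illustration; the callback signatures are inferred from how io_worker_handle_work() and io_run_cancel() invoke them. In-tree, io_uring supplies the real callbacks.

/*
 * Usage sketch (editorial): a hypothetical bound-only consumer of io-wq.
 */
#include <linux/kernel.h>
#include <linux/slab.h>
#include "io-wq.h"

struct my_job {
	struct io_wq_work work;	/* embedded, recovered via container_of() */
	int payload;
};

/* do_work callback: runs in worker context and may block. A non-NULL
 * return hands dependent (linked) work back to the pool. */
static struct io_wq_work *my_do_work(struct io_wq_work *work)
{
	struct my_job *job = container_of(work, struct my_job, work);

	if (work->flags & IO_WQ_WORK_CANCEL)
		return NULL;	/* cancelled before or while queued */
	pr_info("processing job %d\n", job->payload);
	return NULL;		/* no linked work */
}

/* free_work callback: releases the item once the pool is done with it. */
static void my_free_work(struct io_wq_work *work)
{
	kfree(container_of(work, struct my_job, work));
}

static int my_pool_demo(void)
{
	struct io_wq_data data = {
		.user		= NULL,		/* bound work only, no unbound limit */
		.free_work	= my_free_work,
		.do_work	= my_do_work,
	};
	struct io_wq *wq;
	struct my_job *job;

	wq = io_wq_create(4, &data);	/* up to 4 bound workers per node */
	if (IS_ERR(wq))
		return PTR_ERR(wq);

	job = kzalloc(sizeof(*job), GFP_KERNEL);
	if (job) {
		job->payload = 42;
		/* io_wq_hash_work(&job->work, inode) would serialize
		 * jobs that hash to the same bucket, e.g. per inode */
		io_wq_enqueue(wq, &job->work);
	}

	io_wq_destroy(wq);	/* workers drain remaining work on exit */
	return 0;
}

The do_work() return value matters: io_worker_handle_work() above treats a non-NULL return as linked work, running it directly on the same worker when it can and re-enqueueing it via io_wqe_enqueue() otherwise, which is how dependent requests proceed without a round trip through the queue.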