Linux kernel mirror (for testing): git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
fs/io-wq.c at v5.11-rc7 (1211 lines, 30 kB)

// SPDX-License-Identifier: GPL-2.0
/*
 * Basic worker thread pool for io_uring
 *
 * Copyright (C) 2019 Jens Axboe
 *
 */
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/errno.h>
#include <linux/sched/signal.h>
#include <linux/mm.h>
#include <linux/sched/mm.h>
#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/kthread.h>
#include <linux/rculist_nulls.h>
#include <linux/fs_struct.h>
#include <linux/task_work.h>
#include <linux/blk-cgroup.h>
#include <linux/audit.h>
#include <linux/cpu.h>

#include "../kernel/sched/sched.h"
#include "io-wq.h"

#define WORKER_IDLE_TIMEOUT	(5 * HZ)

enum {
	IO_WORKER_F_UP		= 1,	/* up and active */
	IO_WORKER_F_RUNNING	= 2,	/* account as running */
	IO_WORKER_F_FREE	= 4,	/* worker on free list */
	IO_WORKER_F_FIXED	= 8,	/* static idle worker */
	IO_WORKER_F_BOUND	= 16,	/* is doing bounded work */
};

enum {
	IO_WQ_BIT_EXIT		= 0,	/* wq exiting */
	IO_WQ_BIT_ERROR		= 1,	/* error on setup */
};

enum {
	IO_WQE_FLAG_STALLED	= 1,	/* stalled on hash */
};

/*
 * One for each thread in a wqe pool
 */
struct io_worker {
	refcount_t ref;
	unsigned flags;
	struct hlist_nulls_node nulls_node;
	struct list_head all_list;
	struct task_struct *task;
	struct io_wqe *wqe;

	struct io_wq_work *cur_work;
	spinlock_t lock;

	struct rcu_head rcu;
	struct mm_struct *mm;
#ifdef CONFIG_BLK_CGROUP
	struct cgroup_subsys_state *blkcg_css;
#endif
	const struct cred *cur_creds;
	const struct cred *saved_creds;
	struct files_struct *restore_files;
	struct nsproxy *restore_nsproxy;
	struct fs_struct *restore_fs;
};

#if BITS_PER_LONG == 64
#define IO_WQ_HASH_ORDER	6
#else
#define IO_WQ_HASH_ORDER	5
#endif

#define IO_WQ_NR_HASH_BUCKETS	(1u << IO_WQ_HASH_ORDER)

struct io_wqe_acct {
	unsigned nr_workers;
	unsigned max_workers;
	atomic_t nr_running;
};

enum {
	IO_WQ_ACCT_BOUND,
	IO_WQ_ACCT_UNBOUND,
};

/*
 * Per-node worker thread pool
 */
struct io_wqe {
	struct {
		raw_spinlock_t lock;
		struct io_wq_work_list work_list;
		unsigned long hash_map;
		unsigned flags;
	} ____cacheline_aligned_in_smp;

	int node;
	struct io_wqe_acct acct[2];

	struct hlist_nulls_head free_list;
	struct list_head all_list;

	struct io_wq *wq;
	struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];
};

/*
 * Per io_wq state
 */
struct io_wq {
	struct io_wqe **wqes;
	unsigned long state;

	free_work_fn *free_work;
	io_wq_work_fn *do_work;

	struct task_struct *manager;
	struct user_struct *user;
	refcount_t refs;
	struct completion done;

	struct hlist_node cpuhp_node;

	refcount_t use_refs;
};

static enum cpuhp_state io_wq_online;

static bool io_worker_get(struct io_worker *worker)
{
	return refcount_inc_not_zero(&worker->ref);
}

static void io_worker_release(struct io_worker *worker)
{
	if (refcount_dec_and_test(&worker->ref))
		wake_up_process(worker->task);
}
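
/*
 * Illustrative sketch, not part of the original file: this ref pair is
 * the usual "temporary reference" pattern. A lookup path that finds a
 * worker via RCU pins it with io_worker_get() before touching
 * worker->task, then drops the pin; the wakeup in io_worker_release()
 * lets a worker parked in io_worker_exit() finish once the last
 * transient reference goes away. io_wqe_activate_free_worker() below is
 * the canonical user:
 *
 *	rcu_read_lock();
 *	if (io_worker_get(worker)) {
 *		wake_up_process(worker->task);
 *		io_worker_release(worker);
 *	}
 *	rcu_read_unlock();
 */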

/*
 * Note: drops the wqe->lock if returning true! The caller must re-acquire
 * the lock in that case. Some callers need to restart handling if this
 * happens, so we can't just re-acquire the lock on behalf of the caller.
 */
static bool __io_worker_unuse(struct io_wqe *wqe, struct io_worker *worker)
{
	bool dropped_lock = false;

	if (worker->saved_creds) {
		revert_creds(worker->saved_creds);
		worker->cur_creds = worker->saved_creds = NULL;
	}

	if (current->files != worker->restore_files) {
		__acquire(&wqe->lock);
		raw_spin_unlock_irq(&wqe->lock);
		dropped_lock = true;

		task_lock(current);
		current->files = worker->restore_files;
		current->nsproxy = worker->restore_nsproxy;
		task_unlock(current);
	}

	if (current->fs != worker->restore_fs)
		current->fs = worker->restore_fs;

	/*
	 * If we have an active mm, we need to drop the wq lock before unusing
	 * it. If we do, return true and let the caller retry the idle loop.
	 */
	if (worker->mm) {
		if (!dropped_lock) {
			__acquire(&wqe->lock);
			raw_spin_unlock_irq(&wqe->lock);
			dropped_lock = true;
		}
		__set_current_state(TASK_RUNNING);
		kthread_unuse_mm(worker->mm);
		mmput(worker->mm);
		worker->mm = NULL;
	}

#ifdef CONFIG_BLK_CGROUP
	if (worker->blkcg_css) {
		kthread_associate_blkcg(NULL);
		worker->blkcg_css = NULL;
	}
#endif
	if (current->signal->rlim[RLIMIT_FSIZE].rlim_cur != RLIM_INFINITY)
		current->signal->rlim[RLIMIT_FSIZE].rlim_cur = RLIM_INFINITY;
	return dropped_lock;
}

static inline struct io_wqe_acct *io_work_get_acct(struct io_wqe *wqe,
						   struct io_wq_work *work)
{
	if (work->flags & IO_WQ_WORK_UNBOUND)
		return &wqe->acct[IO_WQ_ACCT_UNBOUND];

	return &wqe->acct[IO_WQ_ACCT_BOUND];
}

static inline struct io_wqe_acct *io_wqe_get_acct(struct io_wqe *wqe,
						  struct io_worker *worker)
{
	if (worker->flags & IO_WORKER_F_BOUND)
		return &wqe->acct[IO_WQ_ACCT_BOUND];

	return &wqe->acct[IO_WQ_ACCT_UNBOUND];
}

static void io_worker_exit(struct io_worker *worker)
{
	struct io_wqe *wqe = worker->wqe;
	struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker);

	/*
	 * If we're not at zero, someone else is holding a brief reference
	 * to the worker. Wait for that to go away.
	 */
	set_current_state(TASK_INTERRUPTIBLE);
	if (!refcount_dec_and_test(&worker->ref))
		schedule();
	__set_current_state(TASK_RUNNING);

	preempt_disable();
	current->flags &= ~PF_IO_WORKER;
	if (worker->flags & IO_WORKER_F_RUNNING)
		atomic_dec(&acct->nr_running);
	if (!(worker->flags & IO_WORKER_F_BOUND))
		atomic_dec(&wqe->wq->user->processes);
	worker->flags = 0;
	preempt_enable();

	raw_spin_lock_irq(&wqe->lock);
	hlist_nulls_del_rcu(&worker->nulls_node);
	list_del_rcu(&worker->all_list);
	if (__io_worker_unuse(wqe, worker)) {
		__release(&wqe->lock);
		raw_spin_lock_irq(&wqe->lock);
	}
	acct->nr_workers--;
	raw_spin_unlock_irq(&wqe->lock);

	kfree_rcu(worker, rcu);
	if (refcount_dec_and_test(&wqe->wq->refs))
		complete(&wqe->wq->done);
}

static inline bool io_wqe_run_queue(struct io_wqe *wqe)
	__must_hold(wqe->lock)
{
	if (!wq_list_empty(&wqe->work_list) &&
	    !(wqe->flags & IO_WQE_FLAG_STALLED))
		return true;
	return false;
}
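
/*
 * Illustrative sketch, not part of the original file: the two acct slots
 * partition workers by workload type. Bounded work (execution time is
 * predictable, e.g. regular file I/O) is capped by the "bounded" argument
 * to io_wq_create(); unbounded work (may block indefinitely, e.g. network
 * I/O) is capped by RLIMIT_NPROC. Selecting a slot is just an index:
 *
 *	bool unbound = (work->flags & IO_WQ_WORK_UNBOUND) != 0;
 *	struct io_wqe_acct *acct =
 *		&wqe->acct[unbound ? IO_WQ_ACCT_UNBOUND : IO_WQ_ACCT_BOUND];
 */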

/*
 * Check head of free list for an available worker. If one isn't available,
 * caller must wake up the wq manager to create one.
 */
static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
	__must_hold(RCU)
{
	struct hlist_nulls_node *n;
	struct io_worker *worker;

	n = rcu_dereference(hlist_nulls_first_rcu(&wqe->free_list));
	if (is_a_nulls(n))
		return false;

	worker = hlist_nulls_entry(n, struct io_worker, nulls_node);
	if (io_worker_get(worker)) {
		wake_up_process(worker->task);
		io_worker_release(worker);
		return true;
	}

	return false;
}

/*
 * We need a worker. If we find a free one, we're good. If not, and we're
 * below the max number of workers, wake up the manager to create one.
 */
static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
{
	bool ret;

	/*
	 * Most likely an attempt to queue unbounded work on an io_wq that
	 * wasn't setup with any unbounded workers.
	 */
	WARN_ON_ONCE(!acct->max_workers);

	rcu_read_lock();
	ret = io_wqe_activate_free_worker(wqe);
	rcu_read_unlock();

	if (!ret && acct->nr_workers < acct->max_workers)
		wake_up_process(wqe->wq->manager);
}

static void io_wqe_inc_running(struct io_wqe *wqe, struct io_worker *worker)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker);

	atomic_inc(&acct->nr_running);
}

static void io_wqe_dec_running(struct io_wqe *wqe, struct io_worker *worker)
	__must_hold(wqe->lock)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker);

	if (atomic_dec_and_test(&acct->nr_running) && io_wqe_run_queue(wqe))
		io_wqe_wake_worker(wqe, acct);
}

static void io_worker_start(struct io_wqe *wqe, struct io_worker *worker)
{
	allow_kernel_signal(SIGINT);

	current->flags |= PF_IO_WORKER;

	worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
	worker->restore_files = current->files;
	worker->restore_nsproxy = current->nsproxy;
	worker->restore_fs = current->fs;
	io_wqe_inc_running(wqe, worker);
}

/*
 * Worker will start processing some work. Move it to the busy list, if
 * it's currently on the freelist
 */
static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
			     struct io_wq_work *work)
	__must_hold(wqe->lock)
{
	bool worker_bound, work_bound;

	if (worker->flags & IO_WORKER_F_FREE) {
		worker->flags &= ~IO_WORKER_F_FREE;
		hlist_nulls_del_init_rcu(&worker->nulls_node);
	}

	/*
	 * If worker is moving from bound to unbound (or vice versa), then
	 * ensure we update the running accounting.
	 */
	worker_bound = (worker->flags & IO_WORKER_F_BOUND) != 0;
	work_bound = (work->flags & IO_WQ_WORK_UNBOUND) == 0;
	if (worker_bound != work_bound) {
		io_wqe_dec_running(wqe, worker);
		if (work_bound) {
			worker->flags |= IO_WORKER_F_BOUND;
			wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers--;
			wqe->acct[IO_WQ_ACCT_BOUND].nr_workers++;
			atomic_dec(&wqe->wq->user->processes);
		} else {
			worker->flags &= ~IO_WORKER_F_BOUND;
			wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers++;
			wqe->acct[IO_WQ_ACCT_BOUND].nr_workers--;
			atomic_inc(&wqe->wq->user->processes);
		}
		io_wqe_inc_running(wqe, worker);
	}
}

/*
 * No work, worker going to sleep. Move to freelist, and unuse mm if we
 * have one attached. Dropping the mm may potentially sleep, so we drop
 * the lock in that case and return success. Since the caller has to
 * retry the loop in that case (we changed task state), we don't regrab
 * the lock if we return success.
 */
static bool __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker)
	__must_hold(wqe->lock)
{
	if (!(worker->flags & IO_WORKER_F_FREE)) {
		worker->flags |= IO_WORKER_F_FREE;
		hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	}

	return __io_worker_unuse(wqe, worker);
}

static inline unsigned int io_get_work_hash(struct io_wq_work *work)
{
	return work->flags >> IO_WQ_HASH_SHIFT;
}

static struct io_wq_work *io_get_next_work(struct io_wqe *wqe)
	__must_hold(wqe->lock)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work, *tail;
	unsigned int hash;

	wq_list_for_each(node, prev, &wqe->work_list) {
		work = container_of(node, struct io_wq_work, list);

		/* not hashed, can run anytime */
		if (!io_wq_is_hashed(work)) {
			wq_list_del(&wqe->work_list, node, prev);
			return work;
		}

		/* hashed, can run if not already running */
		hash = io_get_work_hash(work);
		if (!(wqe->hash_map & BIT(hash))) {
			wqe->hash_map |= BIT(hash);
			/* all items with this hash lie in [work, tail] */
			tail = wqe->hash_tail[hash];
			wqe->hash_tail[hash] = NULL;
			wq_list_cut(&wqe->work_list, &tail->list, prev);
			return work;
		}
	}

	return NULL;
}
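
/*
 * Illustrative sketch, not part of the original file: the hash rides in
 * the top bits of work->flags, so no extra field is needed. With
 * IO_WQ_HASH_ORDER == 6 there are 64 buckets, and wqe->hash_map keeps one
 * bit per bucket meaning "a worker is already running this hash":
 *
 *	work->flags |= IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT);
 *	hash = work->flags >> IO_WQ_HASH_SHIFT;	/* recover the bucket */
 *	busy = wqe->hash_map & BIT(hash);	/* serialized if set */
 *
 * See io_wq_hash_work() further down for the packing side.
 */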

static void io_wq_switch_mm(struct io_worker *worker, struct io_wq_work *work)
{
	if (worker->mm) {
		kthread_unuse_mm(worker->mm);
		mmput(worker->mm);
		worker->mm = NULL;
	}

	if (mmget_not_zero(work->identity->mm)) {
		kthread_use_mm(work->identity->mm);
		worker->mm = work->identity->mm;
		return;
	}

	/* failed grabbing mm, ensure work gets cancelled */
	work->flags |= IO_WQ_WORK_CANCEL;
}

static inline void io_wq_switch_blkcg(struct io_worker *worker,
				      struct io_wq_work *work)
{
#ifdef CONFIG_BLK_CGROUP
	if (!(work->flags & IO_WQ_WORK_BLKCG))
		return;
	if (work->identity->blkcg_css != worker->blkcg_css) {
		kthread_associate_blkcg(work->identity->blkcg_css);
		worker->blkcg_css = work->identity->blkcg_css;
	}
#endif
}

static void io_wq_switch_creds(struct io_worker *worker,
			       struct io_wq_work *work)
{
	const struct cred *old_creds = override_creds(work->identity->creds);

	worker->cur_creds = work->identity->creds;
	if (worker->saved_creds)
		put_cred(old_creds); /* creds set by previous switch */
	else
		worker->saved_creds = old_creds;
}

static void io_impersonate_work(struct io_worker *worker,
				struct io_wq_work *work)
{
	if ((work->flags & IO_WQ_WORK_FILES) &&
	    current->files != work->identity->files) {
		task_lock(current);
		current->files = work->identity->files;
		current->nsproxy = work->identity->nsproxy;
		task_unlock(current);
		if (!work->identity->files) {
			/* failed grabbing files, ensure work gets cancelled */
			work->flags |= IO_WQ_WORK_CANCEL;
		}
	}
	if ((work->flags & IO_WQ_WORK_FS) && current->fs != work->identity->fs)
		current->fs = work->identity->fs;
	if ((work->flags & IO_WQ_WORK_MM) && work->identity->mm != worker->mm)
		io_wq_switch_mm(worker, work);
	if ((work->flags & IO_WQ_WORK_CREDS) &&
	    worker->cur_creds != work->identity->creds)
		io_wq_switch_creds(worker, work);
	if (work->flags & IO_WQ_WORK_FSIZE)
		current->signal->rlim[RLIMIT_FSIZE].rlim_cur = work->identity->fsize;
	else if (current->signal->rlim[RLIMIT_FSIZE].rlim_cur != RLIM_INFINITY)
		current->signal->rlim[RLIMIT_FSIZE].rlim_cur = RLIM_INFINITY;
	io_wq_switch_blkcg(worker, work);
#ifdef CONFIG_AUDIT
	current->loginuid = work->identity->loginuid;
	current->sessionid = work->identity->sessionid;
#endif
}

static void io_assign_current_work(struct io_worker *worker,
				   struct io_wq_work *work)
{
	if (work) {
		/* flush pending signals before assigning new work */
		if (signal_pending(current))
			flush_signals(current);
		cond_resched();
	}

#ifdef CONFIG_AUDIT
	current->loginuid = KUIDT_INIT(AUDIT_UID_UNSET);
	current->sessionid = AUDIT_SID_UNSET;
#endif

	spin_lock_irq(&worker->lock);
	worker->cur_work = work;
	spin_unlock_irq(&worker->lock);
}
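
/*
 * Illustrative note, not part of the original file: observe the failure
 * convention above. If the worker cannot take over a piece of context
 * (mm, files), the work is not silently dropped; it is flagged
 * IO_WQ_WORK_CANCEL and still passed to wq->do_work(), which is expected
 * to see the flag and complete the request with an error, roughly:
 *
 *	if (!mmget_not_zero(work->identity->mm))
 *		work->flags |= IO_WQ_WORK_CANCEL;	/* fail, don't drop */
 */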

static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);

static void io_worker_handle_work(struct io_worker *worker)
	__releases(wqe->lock)
{
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;

	do {
		struct io_wq_work *work;
get_next:
		/*
		 * If we got some work, mark us as busy. If we didn't, but
		 * the list isn't empty, it means we stalled on hashed work.
		 * Mark us stalled so we don't keep looking for work when we
		 * can't make progress, any work completion or insertion will
		 * clear the stalled flag.
		 */
		work = io_get_next_work(wqe);
		if (work)
			__io_worker_busy(wqe, worker, work);
		else if (!wq_list_empty(&wqe->work_list))
			wqe->flags |= IO_WQE_FLAG_STALLED;

		raw_spin_unlock_irq(&wqe->lock);
		if (!work)
			break;
		io_assign_current_work(worker, work);

		/* handle a whole dependent link */
		do {
			struct io_wq_work *old_work, *next_hashed, *linked;
			unsigned int hash = io_get_work_hash(work);

			next_hashed = wq_next_work(work);
			io_impersonate_work(worker, work);

			old_work = work;
			linked = wq->do_work(work);

			work = next_hashed;
			if (!work && linked && !io_wq_is_hashed(linked)) {
				work = linked;
				linked = NULL;
			}
			io_assign_current_work(worker, work);
			wq->free_work(old_work);

			if (linked)
				io_wqe_enqueue(wqe, linked);

			if (hash != -1U && !next_hashed) {
				raw_spin_lock_irq(&wqe->lock);
				wqe->hash_map &= ~BIT_ULL(hash);
				wqe->flags &= ~IO_WQE_FLAG_STALLED;
				/* skip unnecessary unlock-lock wqe->lock */
				if (!work)
					goto get_next;
				raw_spin_unlock_irq(&wqe->lock);
			}
		} while (work);

		raw_spin_lock_irq(&wqe->lock);
	} while (1);
}

static int io_wqe_worker(void *data)
{
	struct io_worker *worker = data;
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;

	io_worker_start(wqe, worker);

	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		set_current_state(TASK_INTERRUPTIBLE);
loop:
		raw_spin_lock_irq(&wqe->lock);
		if (io_wqe_run_queue(wqe)) {
			__set_current_state(TASK_RUNNING);
			io_worker_handle_work(worker);
			goto loop;
		}
		/* drops the lock on success, retry */
		if (__io_worker_idle(wqe, worker)) {
			__release(&wqe->lock);
			goto loop;
		}
		raw_spin_unlock_irq(&wqe->lock);
		if (signal_pending(current))
			flush_signals(current);
		if (schedule_timeout(WORKER_IDLE_TIMEOUT))
			continue;
		/* timed out, exit unless we're the fixed worker */
		if (test_bit(IO_WQ_BIT_EXIT, &wq->state) ||
		    !(worker->flags & IO_WORKER_F_FIXED))
			break;
	}

	if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		raw_spin_lock_irq(&wqe->lock);
		if (!wq_list_empty(&wqe->work_list))
			io_worker_handle_work(worker);
		else
			raw_spin_unlock_irq(&wqe->lock);
	}

	io_worker_exit(worker);
	return 0;
}
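
/*
 * Illustrative note, not part of the original file: schedule_timeout()
 * returns the jiffies left on the timer, so in the idle loop above a
 * non-zero return means "woken early, probably for new work" and zero
 * means the full WORKER_IDLE_TIMEOUT (5 * HZ, about five seconds) passed
 * with nothing to do, at which point any worker except the fixed one
 * tears itself down:
 *
 *	if (schedule_timeout(WORKER_IDLE_TIMEOUT))
 *		continue;	/* woken before timeout, recheck queue */
 *	/* 0: idled the full period, exit unless IO_WORKER_F_FIXED */
 */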

/*
 * Called when a worker is scheduled in. Mark us as currently running.
 */
void io_wq_worker_running(struct task_struct *tsk)
{
	struct io_worker *worker = kthread_data(tsk);
	struct io_wqe *wqe = worker->wqe;

	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (worker->flags & IO_WORKER_F_RUNNING)
		return;
	worker->flags |= IO_WORKER_F_RUNNING;
	io_wqe_inc_running(wqe, worker);
}

/*
 * Called when worker is going to sleep. If there are no workers currently
 * running and we have work pending, wake up a free one or have the manager
 * set one up.
 */
void io_wq_worker_sleeping(struct task_struct *tsk)
{
	struct io_worker *worker = kthread_data(tsk);
	struct io_wqe *wqe = worker->wqe;

	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (!(worker->flags & IO_WORKER_F_RUNNING))
		return;

	worker->flags &= ~IO_WORKER_F_RUNNING;

	raw_spin_lock_irq(&wqe->lock);
	io_wqe_dec_running(wqe, worker);
	raw_spin_unlock_irq(&wqe->lock);
}
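
/*
 * Illustrative note, not part of the original file: the two hooks above
 * are not called from io-wq itself but from the core scheduler for tasks
 * flagged PF_IO_WORKER, roughly like this (a sketch of the caller's side;
 * see sched_submit_work()/sched_update_worker() in kernel/sched/core.c):
 *
 *	if (tsk->flags & PF_IO_WORKER)
 *		io_wq_worker_sleeping(tsk);	/* on schedule-out */
 *
 * That is how io-wq notices "all my runners just blocked" and, via
 * io_wqe_dec_running(), wakes or forks a replacement so queued work
 * keeps flowing.
 */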

static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
{
	struct io_wqe_acct *acct = &wqe->acct[index];
	struct io_worker *worker;

	worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
	if (!worker)
		return false;

	refcount_set(&worker->ref, 1);
	worker->nulls_node.pprev = NULL;
	worker->wqe = wqe;
	spin_lock_init(&worker->lock);

	worker->task = kthread_create_on_node(io_wqe_worker, worker, wqe->node,
				"io_wqe_worker-%d/%d", index, wqe->node);
	if (IS_ERR(worker->task)) {
		kfree(worker);
		return false;
	}
	kthread_bind_mask(worker->task, cpumask_of_node(wqe->node));

	raw_spin_lock_irq(&wqe->lock);
	hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	list_add_tail_rcu(&worker->all_list, &wqe->all_list);
	worker->flags |= IO_WORKER_F_FREE;
	if (index == IO_WQ_ACCT_BOUND)
		worker->flags |= IO_WORKER_F_BOUND;
	if (!acct->nr_workers && (worker->flags & IO_WORKER_F_BOUND))
		worker->flags |= IO_WORKER_F_FIXED;
	acct->nr_workers++;
	raw_spin_unlock_irq(&wqe->lock);

	if (index == IO_WQ_ACCT_UNBOUND)
		atomic_inc(&wq->user->processes);

	refcount_inc(&wq->refs);
	wake_up_process(worker->task);
	return true;
}

static inline bool io_wqe_need_worker(struct io_wqe *wqe, int index)
	__must_hold(wqe->lock)
{
	struct io_wqe_acct *acct = &wqe->acct[index];

	/* if we have available workers or no work, no need */
	if (!hlist_nulls_empty(&wqe->free_list) || !io_wqe_run_queue(wqe))
		return false;
	return acct->nr_workers < acct->max_workers;
}

/*
 * Iterate the passed in list and call the specific function for each
 * worker that isn't exiting
 */
static bool io_wq_for_each_worker(struct io_wqe *wqe,
				  bool (*func)(struct io_worker *, void *),
				  void *data)
{
	struct io_worker *worker;
	bool ret = false;

	list_for_each_entry_rcu(worker, &wqe->all_list, all_list) {
		if (io_worker_get(worker)) {
			/* no task if node is/was offline */
			if (worker->task)
				ret = func(worker, data);
			io_worker_release(worker);
			if (ret)
				break;
		}
	}

	return ret;
}

static bool io_wq_worker_wake(struct io_worker *worker, void *data)
{
	wake_up_process(worker->task);
	return false;
}

/*
 * Manager thread. Tasked with creating new workers, if we need them.
 */
static int io_wq_manager(void *data)
{
	struct io_wq *wq = data;
	int node;

	/* create fixed workers */
	refcount_set(&wq->refs, 1);
	for_each_node(node) {
		if (!node_online(node))
			continue;
		if (create_io_worker(wq, wq->wqes[node], IO_WQ_ACCT_BOUND))
			continue;
		set_bit(IO_WQ_BIT_ERROR, &wq->state);
		set_bit(IO_WQ_BIT_EXIT, &wq->state);
		goto out;
	}

	complete(&wq->done);

	while (!kthread_should_stop()) {
		if (current->task_works)
			task_work_run();

		for_each_node(node) {
			struct io_wqe *wqe = wq->wqes[node];
			bool fork_worker[2] = { false, false };

			if (!node_online(node))
				continue;

			raw_spin_lock_irq(&wqe->lock);
			if (io_wqe_need_worker(wqe, IO_WQ_ACCT_BOUND))
				fork_worker[IO_WQ_ACCT_BOUND] = true;
			if (io_wqe_need_worker(wqe, IO_WQ_ACCT_UNBOUND))
				fork_worker[IO_WQ_ACCT_UNBOUND] = true;
			raw_spin_unlock_irq(&wqe->lock);
			if (fork_worker[IO_WQ_ACCT_BOUND])
				create_io_worker(wq, wqe, IO_WQ_ACCT_BOUND);
			if (fork_worker[IO_WQ_ACCT_UNBOUND])
				create_io_worker(wq, wqe, IO_WQ_ACCT_UNBOUND);
		}
		set_current_state(TASK_INTERRUPTIBLE);
		schedule_timeout(HZ);
	}

	if (current->task_works)
		task_work_run();

out:
	if (refcount_dec_and_test(&wq->refs)) {
		complete(&wq->done);
		return 0;
	}
	/* if ERROR is set and we get here, we have workers to wake */
	if (test_bit(IO_WQ_BIT_ERROR, &wq->state)) {
		rcu_read_lock();
		for_each_node(node)
			io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL);
		rcu_read_unlock();
	}
	return 0;
}

static bool io_wq_can_queue(struct io_wqe *wqe, struct io_wqe_acct *acct,
			    struct io_wq_work *work)
{
	bool free_worker;

	if (!(work->flags & IO_WQ_WORK_UNBOUND))
		return true;
	if (atomic_read(&acct->nr_running))
		return true;

	rcu_read_lock();
	free_worker = !hlist_nulls_empty(&wqe->free_list);
	rcu_read_unlock();
	if (free_worker)
		return true;

	if (atomic_read(&wqe->wq->user->processes) >= acct->max_workers &&
	    !(capable(CAP_SYS_RESOURCE) || capable(CAP_SYS_ADMIN)))
		return false;

	return true;
}

static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)
{
	struct io_wq *wq = wqe->wq;

	do {
		struct io_wq_work *old_work = work;

		work->flags |= IO_WQ_WORK_CANCEL;
		work = wq->do_work(work);
		wq->free_work(old_work);
	} while (work);
}

static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
{
	unsigned int hash;
	struct io_wq_work *tail;

	if (!io_wq_is_hashed(work)) {
append:
		wq_list_add_tail(&work->list, &wqe->work_list);
		return;
	}

	hash = io_get_work_hash(work);
	tail = wqe->hash_tail[hash];
	wqe->hash_tail[hash] = work;
	if (!tail)
		goto append;

	wq_list_add_after(&work->list, &tail->list, &wqe->work_list);
}
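
/*
 * Illustrative sketch, not part of the original file: hash_tail[] keeps
 * all pending work with the same hash contiguous in the queue. A new
 * hashed item is linked right after the current tail of its bucket, so
 * io_get_next_work() can later cut the whole [head, tail] run with one
 * wq_list_cut() and hand the batch to a single worker:
 *
 *	queue:  A  B1  B2  C		(B1, B2 share hash h)
 *	enqueue B3, inserted after hash_tail[h] == B2:
 *	queue:  A  B1  B2  B3  C
 */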

static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
{
	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
	int work_flags;
	unsigned long flags;

	/*
	 * Do early check to see if we need a new unbound worker, and if we do,
	 * if we're allowed to do so. This isn't 100% accurate as there's a
	 * gap between this check and incrementing the value, but that's OK.
	 * It's close enough to not be an issue, fork() has the same delay.
	 */
	if (unlikely(!io_wq_can_queue(wqe, acct, work))) {
		io_run_cancel(work, wqe);
		return;
	}

	work_flags = work->flags;
	raw_spin_lock_irqsave(&wqe->lock, flags);
	io_wqe_insert_work(wqe, work);
	wqe->flags &= ~IO_WQE_FLAG_STALLED;
	raw_spin_unlock_irqrestore(&wqe->lock, flags);

	if ((work_flags & IO_WQ_WORK_CONCURRENT) ||
	    !atomic_read(&acct->nr_running))
		io_wqe_wake_worker(wqe, acct);
}

void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
{
	struct io_wqe *wqe = wq->wqes[numa_node_id()];

	io_wqe_enqueue(wqe, work);
}

/*
 * Work items that hash to the same value will not be done in parallel.
 * Used to limit concurrent writes, generally hashed by inode.
 */
void io_wq_hash_work(struct io_wq_work *work, void *val)
{
	unsigned int bit;

	bit = hash_ptr(val, IO_WQ_HASH_ORDER);
	work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT));
}
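
/*
 * Illustrative sketch, not part of the original file: a typical queueing
 * site serializes all writes to one file by hashing on the inode before
 * enqueueing. Both calls are the public API from io-wq.h; "req" is a
 * made-up request structure with an embedded io_wq_work:
 *
 *	io_wq_hash_work(&req->work, file_inode(req->file));
 *	io_wq_enqueue(wq, &req->work);
 */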

struct io_cb_cancel_data {
	work_cancel_fn *fn;
	void *data;
	int nr_running;
	int nr_pending;
	bool cancel_all;
};

static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
{
	struct io_cb_cancel_data *match = data;
	unsigned long flags;

	/*
	 * Hold the lock to avoid ->cur_work going out of scope, caller
	 * may dereference the passed in work.
	 */
	spin_lock_irqsave(&worker->lock, flags);
	if (worker->cur_work &&
	    !(worker->cur_work->flags & IO_WQ_WORK_NO_CANCEL) &&
	    match->fn(worker->cur_work, match->data)) {
		send_sig(SIGINT, worker->task, 1);
		match->nr_running++;
	}
	spin_unlock_irqrestore(&worker->lock, flags);

	return match->nr_running && !match->cancel_all;
}

static inline void io_wqe_remove_pending(struct io_wqe *wqe,
					 struct io_wq_work *work,
					 struct io_wq_work_node *prev)
{
	unsigned int hash = io_get_work_hash(work);
	struct io_wq_work *prev_work = NULL;

	if (io_wq_is_hashed(work) && work == wqe->hash_tail[hash]) {
		if (prev)
			prev_work = container_of(prev, struct io_wq_work, list);
		if (prev_work && io_get_work_hash(prev_work) == hash)
			wqe->hash_tail[hash] = prev_work;
		else
			wqe->hash_tail[hash] = NULL;
	}
	wq_list_del(&wqe->work_list, &work->list, prev);
}

static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
				       struct io_cb_cancel_data *match)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work;
	unsigned long flags;

retry:
	raw_spin_lock_irqsave(&wqe->lock, flags);
	wq_list_for_each(node, prev, &wqe->work_list) {
		work = container_of(node, struct io_wq_work, list);
		if (!match->fn(work, match->data))
			continue;
		io_wqe_remove_pending(wqe, work, prev);
		raw_spin_unlock_irqrestore(&wqe->lock, flags);
		io_run_cancel(work, wqe);
		match->nr_pending++;
		if (!match->cancel_all)
			return;

		/* not safe to continue after unlock */
		goto retry;
	}
	raw_spin_unlock_irqrestore(&wqe->lock, flags);
}

static void io_wqe_cancel_running_work(struct io_wqe *wqe,
				       struct io_cb_cancel_data *match)
{
	rcu_read_lock();
	io_wq_for_each_worker(wqe, io_wq_worker_cancel, match);
	rcu_read_unlock();
}

enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
				  void *data, bool cancel_all)
{
	struct io_cb_cancel_data match = {
		.fn		= cancel,
		.data		= data,
		.cancel_all	= cancel_all,
	};
	int node;

	/*
	 * First check pending list, if we're lucky we can just remove it
	 * from there. CANCEL_OK means that the work is returned as-new,
	 * no completion will be posted for it.
	 */
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wqe_cancel_pending_work(wqe, &match);
		if (match.nr_pending && !match.cancel_all)
			return IO_WQ_CANCEL_OK;
	}

	/*
	 * Now check if a free (going busy) or busy worker has the work
	 * currently running. If we find it there, we'll return CANCEL_RUNNING
	 * as an indication that we attempt to signal cancellation. The
	 * completion will run normally in this case.
	 */
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wqe_cancel_running_work(wqe, &match);
		if (match.nr_running && !match.cancel_all)
			return IO_WQ_CANCEL_RUNNING;
	}

	if (match.nr_running)
		return IO_WQ_CANCEL_RUNNING;
	if (match.nr_pending)
		return IO_WQ_CANCEL_OK;
	return IO_WQ_CANCEL_NOTFOUND;
}
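
/*
 * Illustrative sketch, not part of the original file: callers cancel by
 * predicate. "match_work" and "target" are made up for the example:
 *
 *	static bool match_work(struct io_wq_work *work, void *data)
 *	{
 *		return work == data;	/* cancel this exact item */
 *	}
 *
 *	enum io_wq_cancel ret = io_wq_cancel_cb(wq, match_work, target, false);
 *
 * IO_WQ_CANCEL_OK: removed while still pending, the work never ran and no
 * completion is posted. IO_WQ_CANCEL_RUNNING: a worker was signalled and
 * the completion still runs normally. IO_WQ_CANCEL_NOTFOUND: no match.
 */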

struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
{
	int ret = -ENOMEM, node;
	struct io_wq *wq;

	if (WARN_ON_ONCE(!data->free_work || !data->do_work))
		return ERR_PTR(-EINVAL);

	wq = kzalloc(sizeof(*wq), GFP_KERNEL);
	if (!wq)
		return ERR_PTR(-ENOMEM);

	wq->wqes = kcalloc(nr_node_ids, sizeof(struct io_wqe *), GFP_KERNEL);
	if (!wq->wqes)
		goto err_wq;

	ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	if (ret)
		goto err_wqes;

	wq->free_work = data->free_work;
	wq->do_work = data->do_work;

	/* caller must already hold a reference to this */
	wq->user = data->user;

	ret = -ENOMEM;
	for_each_node(node) {
		struct io_wqe *wqe;
		int alloc_node = node;

		if (!node_online(alloc_node))
			alloc_node = NUMA_NO_NODE;
		wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, alloc_node);
		if (!wqe)
			goto err;
		wq->wqes[node] = wqe;
		wqe->node = alloc_node;
		wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
		atomic_set(&wqe->acct[IO_WQ_ACCT_BOUND].nr_running, 0);
		if (wq->user) {
			wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
					task_rlimit(current, RLIMIT_NPROC);
		}
		atomic_set(&wqe->acct[IO_WQ_ACCT_UNBOUND].nr_running, 0);
		wqe->wq = wq;
		raw_spin_lock_init(&wqe->lock);
		INIT_WQ_LIST(&wqe->work_list);
		INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
		INIT_LIST_HEAD(&wqe->all_list);
	}

	init_completion(&wq->done);

	wq->manager = kthread_create(io_wq_manager, wq, "io_wq_manager");
	if (!IS_ERR(wq->manager)) {
		wake_up_process(wq->manager);
		wait_for_completion(&wq->done);
		if (test_bit(IO_WQ_BIT_ERROR, &wq->state)) {
			ret = -ENOMEM;
			goto err;
		}
		refcount_set(&wq->use_refs, 1);
		reinit_completion(&wq->done);
		return wq;
	}

	ret = PTR_ERR(wq->manager);
	complete(&wq->done);
err:
	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	for_each_node(node)
		kfree(wq->wqes[node]);
err_wqes:
	kfree(wq->wqes);
err_wq:
	kfree(wq);
	return ERR_PTR(ret);
}

bool io_wq_get(struct io_wq *wq, struct io_wq_data *data)
{
	if (data->free_work != wq->free_work || data->do_work != wq->do_work)
		return false;

	return refcount_inc_not_zero(&wq->use_refs);
}

static void __io_wq_destroy(struct io_wq *wq)
{
	int node;

	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);

	set_bit(IO_WQ_BIT_EXIT, &wq->state);
	if (wq->manager)
		kthread_stop(wq->manager);

	rcu_read_lock();
	for_each_node(node)
		io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL);
	rcu_read_unlock();

	wait_for_completion(&wq->done);

	for_each_node(node)
		kfree(wq->wqes[node]);
	kfree(wq->wqes);
	kfree(wq);
}

void io_wq_destroy(struct io_wq *wq)
{
	if (refcount_dec_and_test(&wq->use_refs))
		__io_wq_destroy(wq);
}

struct task_struct *io_wq_get_task(struct io_wq *wq)
{
	return wq->manager;
}

static bool io_wq_worker_affinity(struct io_worker *worker, void *data)
{
	struct task_struct *task = worker->task;
	struct rq_flags rf;
	struct rq *rq;

	rq = task_rq_lock(task, &rf);
	do_set_cpus_allowed(task, cpumask_of_node(worker->wqe->node));
	task->flags |= PF_NO_SETAFFINITY;
	task_rq_unlock(rq, task, &rf);
	return false;
}

static int io_wq_cpu_online(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);
	int i;

	rcu_read_lock();
	for_each_node(i)
		io_wq_for_each_worker(wq->wqes[i], io_wq_worker_affinity, NULL);
	rcu_read_unlock();
	return 0;
}

static __init int io_wq_init(void)
{
	int ret;

	ret = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "io-wq/online",
					io_wq_cpu_online, NULL);
	if (ret < 0)
		return ret;
	io_wq_online = ret;
	return 0;
}
subsys_initcall(io_wq_init);
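
/*
 * Illustrative sketch, not part of the original file: the minimal
 * lifecycle of an io_wq as a caller such as io_uring drives it.
 * "my_do_work" and "my_free_work" are hypothetical callbacks matching
 * io_wq_work_fn and free_work_fn from io-wq.h, and "req" is the same
 * made-up request structure as above:
 *
 *	struct io_wq_data data = {
 *		.user		= current_user(),
 *		.free_work	= my_free_work,
 *		.do_work	= my_do_work,
 *	};
 *	struct io_wq *wq = io_wq_create(4, &data);
 *
 *	if (!IS_ERR(wq)) {
 *		io_wq_enqueue(wq, &req->work);	/* runs on a pool worker */
 *		io_wq_destroy(wq);		/* waits for workers to exit */
 *	}
 *
 * The "4" caps bounded workers per node; the unbounded cap comes from
 * RLIMIT_NPROC, as set up in io_wq_create() above.
 */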