Linux kernel mirror (for testing): git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
fs/io-wq.c at v5.14-rc1 (1121 lines, 27 kB)

// SPDX-License-Identifier: GPL-2.0
/*
 * Basic worker thread pool for io_uring
 *
 * Copyright (C) 2019 Jens Axboe
 *
 */
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/errno.h>
#include <linux/sched/signal.h>
#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/rculist_nulls.h>
#include <linux/cpu.h>
#include <linux/tracehook.h>

#include "io-wq.h"

#define WORKER_IDLE_TIMEOUT	(5 * HZ)

enum {
	IO_WORKER_F_UP		= 1,	/* up and active */
	IO_WORKER_F_RUNNING	= 2,	/* account as running */
	IO_WORKER_F_FREE	= 4,	/* worker on free list */
	IO_WORKER_F_FIXED	= 8,	/* static idle worker */
	IO_WORKER_F_BOUND	= 16,	/* is doing bounded work */
};

enum {
	IO_WQ_BIT_EXIT		= 0,	/* wq exiting */
};

enum {
	IO_WQE_FLAG_STALLED	= 1,	/* stalled on hash */
};

/*
 * One for each thread in a wqe pool
 */
struct io_worker {
	refcount_t ref;
	unsigned flags;
	struct hlist_nulls_node nulls_node;
	struct list_head all_list;
	struct task_struct *task;
	struct io_wqe *wqe;

	struct io_wq_work *cur_work;
	spinlock_t lock;

	struct completion ref_done;

	struct rcu_head rcu;
};

#if BITS_PER_LONG == 64
#define IO_WQ_HASH_ORDER	6
#else
#define IO_WQ_HASH_ORDER	5
#endif

#define IO_WQ_NR_HASH_BUCKETS	(1u << IO_WQ_HASH_ORDER)

struct io_wqe_acct {
	unsigned nr_workers;
	unsigned max_workers;
	int index;
	atomic_t nr_running;
};

enum {
	IO_WQ_ACCT_BOUND,
	IO_WQ_ACCT_UNBOUND,
};

/*
 * Per-node worker thread pool
 */
struct io_wqe {
	struct {
		raw_spinlock_t lock;
		struct io_wq_work_list work_list;
		unsigned flags;
	} ____cacheline_aligned_in_smp;

	int node;
	struct io_wqe_acct acct[2];

	struct hlist_nulls_head free_list;
	struct list_head all_list;

	struct wait_queue_entry wait;

	struct io_wq *wq;
	struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];

	cpumask_var_t cpu_mask;
};

/*
 * Per io_wq state
 */
struct io_wq {
	unsigned long state;

	free_work_fn *free_work;
	io_wq_work_fn *do_work;

	struct io_wq_hash *hash;

	atomic_t worker_refs;
	struct completion worker_done;

	struct hlist_node cpuhp_node;

	struct task_struct *task;

	struct io_wqe *wqes[];
};

static enum cpuhp_state io_wq_online;

struct io_cb_cancel_data {
	work_cancel_fn *fn;
	void *data;
	int nr_running;
	int nr_pending;
	bool cancel_all;
};

static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index);

static bool io_worker_get(struct io_worker *worker)
{
	return refcount_inc_not_zero(&worker->ref);
}

static void io_worker_release(struct io_worker *worker)
{
	if (refcount_dec_and_test(&worker->ref))
		complete(&worker->ref_done);
}
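
/*
 * A worker's lifetime is governed by the refcount above: io_worker_get()
 * is the lookup-side "try to take a reference" (it fails once the count
 * has dropped to zero), and the final io_worker_release() fires
 * ->ref_done so that io_worker_exit() can wait for all outstanding
 * references to drain before freeing the worker via kfree_rcu().
 */
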
static inline struct io_wqe_acct *io_get_acct(struct io_wqe *wqe, bool bound)
{
	return &wqe->acct[bound ? IO_WQ_ACCT_BOUND : IO_WQ_ACCT_UNBOUND];
}

static inline struct io_wqe_acct *io_work_get_acct(struct io_wqe *wqe,
						   struct io_wq_work *work)
{
	return io_get_acct(wqe, !(work->flags & IO_WQ_WORK_UNBOUND));
}

static inline struct io_wqe_acct *io_wqe_get_acct(struct io_worker *worker)
{
	return io_get_acct(worker->wqe, worker->flags & IO_WORKER_F_BOUND);
}

static void io_worker_ref_put(struct io_wq *wq)
{
	if (atomic_dec_and_test(&wq->worker_refs))
		complete(&wq->worker_done);
}

static void io_worker_exit(struct io_worker *worker)
{
	struct io_wqe *wqe = worker->wqe;
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
	unsigned flags;

	if (refcount_dec_and_test(&worker->ref))
		complete(&worker->ref_done);
	wait_for_completion(&worker->ref_done);

	preempt_disable();
	current->flags &= ~PF_IO_WORKER;
	flags = worker->flags;
	worker->flags = 0;
	if (flags & IO_WORKER_F_RUNNING)
		atomic_dec(&acct->nr_running);
	worker->flags = 0;
	preempt_enable();

	raw_spin_lock_irq(&wqe->lock);
	if (flags & IO_WORKER_F_FREE)
		hlist_nulls_del_rcu(&worker->nulls_node);
	list_del_rcu(&worker->all_list);
	acct->nr_workers--;
	raw_spin_unlock_irq(&wqe->lock);

	kfree_rcu(worker, rcu);
	io_worker_ref_put(wqe->wq);
	do_exit(0);
}

static inline bool io_wqe_run_queue(struct io_wqe *wqe)
	__must_hold(wqe->lock)
{
	if (!wq_list_empty(&wqe->work_list) &&
	    !(wqe->flags & IO_WQE_FLAG_STALLED))
		return true;
	return false;
}

/*
 * Check head of free list for an available worker. If one isn't available,
 * caller must create one.
 */
static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
	__must_hold(RCU)
{
	struct hlist_nulls_node *n;
	struct io_worker *worker;

	n = rcu_dereference(hlist_nulls_first_rcu(&wqe->free_list));
	if (is_a_nulls(n))
		return false;

	worker = hlist_nulls_entry(n, struct io_worker, nulls_node);
	if (io_worker_get(worker)) {
		wake_up_process(worker->task);
		io_worker_release(worker);
		return true;
	}

	return false;
}
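
/*
 * Bound vs unbound accounting in a nutshell: each io_wqe keeps two
 * io_wqe_acct buckets, acct[IO_WQ_ACCT_BOUND] for work tied to this
 * context (capped at the "bounded" count passed to io_wq_create())
 * and acct[IO_WQ_ACCT_UNBOUND] for work that may block indefinitely
 * (capped at RLIMIT_NPROC, see io_wq_create() below). A work item
 * lands in the unbound bucket iff work->flags has IO_WQ_WORK_UNBOUND
 * set, per io_work_get_acct() above.
 */
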
/*
 * We need a worker. If we find a free one, we're good. If not, and we're
 * below the max number of workers, create one.
 */
static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
{
	bool ret;

	/*
	 * Most likely an attempt to queue unbounded work on an io_wq that
	 * wasn't setup with any unbounded workers.
	 */
	if (unlikely(!acct->max_workers))
		pr_warn_once("io-wq is not configured for unbound workers");

	rcu_read_lock();
	ret = io_wqe_activate_free_worker(wqe);
	rcu_read_unlock();

	if (!ret && acct->nr_workers < acct->max_workers) {
		atomic_inc(&acct->nr_running);
		atomic_inc(&wqe->wq->worker_refs);
		create_io_worker(wqe->wq, wqe, acct->index);
	}
}

static void io_wqe_inc_running(struct io_worker *worker)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);

	atomic_inc(&acct->nr_running);
}

struct create_worker_data {
	struct callback_head work;
	struct io_wqe *wqe;
	int index;
};

static void create_worker_cb(struct callback_head *cb)
{
	struct create_worker_data *cwd;
	struct io_wq *wq;

	cwd = container_of(cb, struct create_worker_data, work);
	wq = cwd->wqe->wq;
	create_io_worker(wq, cwd->wqe, cwd->index);
	kfree(cwd);
}

static void io_queue_worker_create(struct io_wqe *wqe, struct io_wqe_acct *acct)
{
	struct create_worker_data *cwd;
	struct io_wq *wq = wqe->wq;

	/* raced with exit, just ignore create call */
	if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
		goto fail;

	cwd = kmalloc(sizeof(*cwd), GFP_ATOMIC);
	if (cwd) {
		init_task_work(&cwd->work, create_worker_cb);
		cwd->wqe = wqe;
		cwd->index = acct->index;
		if (!task_work_add(wq->task, &cwd->work, TWA_SIGNAL))
			return;

		kfree(cwd);
	}
fail:
	atomic_dec(&acct->nr_running);
	io_worker_ref_put(wq);
}

static void io_wqe_dec_running(struct io_worker *worker)
	__must_hold(wqe->lock)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
	struct io_wqe *wqe = worker->wqe;

	if (!(worker->flags & IO_WORKER_F_UP))
		return;

	if (atomic_dec_and_test(&acct->nr_running) && io_wqe_run_queue(wqe)) {
		atomic_inc(&acct->nr_running);
		atomic_inc(&wqe->wq->worker_refs);
		io_queue_worker_create(wqe, acct);
	}
}

/*
 * Worker will start processing some work. Move it to the busy list, if
 * it's currently on the freelist
 */
static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
			     struct io_wq_work *work)
	__must_hold(wqe->lock)
{
	bool worker_bound, work_bound;

	BUILD_BUG_ON((IO_WQ_ACCT_UNBOUND ^ IO_WQ_ACCT_BOUND) != 1);

	if (worker->flags & IO_WORKER_F_FREE) {
		worker->flags &= ~IO_WORKER_F_FREE;
		hlist_nulls_del_init_rcu(&worker->nulls_node);
	}

	/*
	 * If worker is moving from bound to unbound (or vice versa), then
	 * ensure we update the running accounting.
	 */
	worker_bound = (worker->flags & IO_WORKER_F_BOUND) != 0;
	work_bound = (work->flags & IO_WQ_WORK_UNBOUND) == 0;
	if (worker_bound != work_bound) {
		int index = work_bound ? IO_WQ_ACCT_UNBOUND : IO_WQ_ACCT_BOUND;
		io_wqe_dec_running(worker);
		worker->flags ^= IO_WORKER_F_BOUND;
		wqe->acct[index].nr_workers--;
		wqe->acct[index ^ 1].nr_workers++;
		io_wqe_inc_running(worker);
	}
}
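
/*
 * Note on io_queue_worker_create() above: io_wqe_dec_running() runs in
 * the scheduler's going-to-sleep path with wqe->lock held, so it cannot
 * create a worker thread directly (hence also the GFP_ATOMIC
 * allocation). It instead bounces the creation to the io_wq owner task
 * via task_work_add(TWA_SIGNAL); create_worker_cb() then runs in that
 * task's context, where create_io_worker() is safe to call.
 */
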
/*
 * No work, worker going to sleep. Move to freelist, and unuse mm if we
 * have one attached. Dropping the mm may potentially sleep, so we drop
 * the lock in that case and return success. Since the caller has to
 * retry the loop in that case (we changed task state), we don't regrab
 * the lock if we return success.
 */
static void __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker)
	__must_hold(wqe->lock)
{
	if (!(worker->flags & IO_WORKER_F_FREE)) {
		worker->flags |= IO_WORKER_F_FREE;
		hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	}
}

static inline unsigned int io_get_work_hash(struct io_wq_work *work)
{
	return work->flags >> IO_WQ_HASH_SHIFT;
}

static void io_wait_on_hash(struct io_wqe *wqe, unsigned int hash)
{
	struct io_wq *wq = wqe->wq;

	spin_lock(&wq->hash->wait.lock);
	if (list_empty(&wqe->wait.entry)) {
		__add_wait_queue(&wq->hash->wait, &wqe->wait);
		if (!test_bit(hash, &wq->hash->map)) {
			__set_current_state(TASK_RUNNING);
			list_del_init(&wqe->wait.entry);
		}
	}
	spin_unlock(&wq->hash->wait.lock);
}

static struct io_wq_work *io_get_next_work(struct io_wqe *wqe)
	__must_hold(wqe->lock)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work, *tail;
	unsigned int stall_hash = -1U;

	wq_list_for_each(node, prev, &wqe->work_list) {
		unsigned int hash;

		work = container_of(node, struct io_wq_work, list);

		/* not hashed, can run anytime */
		if (!io_wq_is_hashed(work)) {
			wq_list_del(&wqe->work_list, node, prev);
			return work;
		}

		hash = io_get_work_hash(work);
		/* all items with this hash lie in [work, tail] */
		tail = wqe->hash_tail[hash];

		/* hashed, can run if not already running */
		if (!test_and_set_bit(hash, &wqe->wq->hash->map)) {
			wqe->hash_tail[hash] = NULL;
			wq_list_cut(&wqe->work_list, &tail->list, prev);
			return work;
		}
		if (stall_hash == -1U)
			stall_hash = hash;
		/* fast forward to a next hash, for-each will fix up @prev */
		node = &tail->list;
	}

	if (stall_hash != -1U) {
		raw_spin_unlock(&wqe->lock);
		io_wait_on_hash(wqe, stall_hash);
		raw_spin_lock(&wqe->lock);
	}

	return NULL;
}
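
/*
 * io_get_next_work() walks the pending list and dequeues the first
 * runnable item. Unhashed work is always runnable. Hashed work is
 * serialized per hash bucket via wq->hash->map: the first worker to
 * set the bucket bit takes the whole [work, tail] span for that hash
 * (spliced out with wq_list_cut()), while a worker that finds the bit
 * already set skips ahead to the next hash. If every candidate was
 * stalled on a busy hash, the worker parks itself on wq->hash->wait.
 */
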
static bool io_flush_signals(void)
{
	if (unlikely(test_thread_flag(TIF_NOTIFY_SIGNAL))) {
		__set_current_state(TASK_RUNNING);
		tracehook_notify_signal();
		return true;
	}
	return false;
}

static void io_assign_current_work(struct io_worker *worker,
				   struct io_wq_work *work)
{
	if (work) {
		io_flush_signals();
		cond_resched();
	}

	spin_lock_irq(&worker->lock);
	worker->cur_work = work;
	spin_unlock_irq(&worker->lock);
}

static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);

static void io_worker_handle_work(struct io_worker *worker)
	__releases(wqe->lock)
{
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;
	bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);

	do {
		struct io_wq_work *work;
get_next:
		/*
		 * If we got some work, mark us as busy. If we didn't, but
		 * the list isn't empty, it means we stalled on hashed work.
		 * Mark us stalled so we don't keep looking for work when we
		 * can't make progress, any work completion or insertion will
		 * clear the stalled flag.
		 */
		work = io_get_next_work(wqe);
		if (work)
			__io_worker_busy(wqe, worker, work);
		else if (!wq_list_empty(&wqe->work_list))
			wqe->flags |= IO_WQE_FLAG_STALLED;

		raw_spin_unlock_irq(&wqe->lock);
		if (!work)
			break;
		io_assign_current_work(worker, work);
		__set_current_state(TASK_RUNNING);

		/* handle a whole dependent link */
		do {
			struct io_wq_work *next_hashed, *linked;
			unsigned int hash = io_get_work_hash(work);

			next_hashed = wq_next_work(work);

			if (unlikely(do_kill) && (work->flags & IO_WQ_WORK_UNBOUND))
				work->flags |= IO_WQ_WORK_CANCEL;
			wq->do_work(work);
			io_assign_current_work(worker, NULL);

			linked = wq->free_work(work);
			work = next_hashed;
			if (!work && linked && !io_wq_is_hashed(linked)) {
				work = linked;
				linked = NULL;
			}
			io_assign_current_work(worker, work);
			if (linked)
				io_wqe_enqueue(wqe, linked);

			if (hash != -1U && !next_hashed) {
				clear_bit(hash, &wq->hash->map);
				if (wq_has_sleeper(&wq->hash->wait))
					wake_up(&wq->hash->wait);
				raw_spin_lock_irq(&wqe->lock);
				wqe->flags &= ~IO_WQE_FLAG_STALLED;
				/* skip unnecessary unlock-lock wqe->lock */
				if (!work)
					goto get_next;
				raw_spin_unlock_irq(&wqe->lock);
			}
		} while (work);

		raw_spin_lock_irq(&wqe->lock);
	} while (1);
}
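
/*
 * The inner loop above drains a whole dependent link in one go: after
 * wq->do_work() runs an item, wq->free_work() hands back the next
 * linked item (if any). Same-hash items found via wq_next_work() are
 * preferred so the hash bucket bit can be dropped as late as possible;
 * an unhashed linked item may be executed inline, while a linked item
 * that can't be run inline is re-enqueued through io_wqe_enqueue().
 */
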
static int io_wqe_worker(void *data)
{
	struct io_worker *worker = data;
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;
	char buf[TASK_COMM_LEN];

	worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);

	snprintf(buf, sizeof(buf), "iou-wrk-%d", wq->task->pid);
	set_task_comm(current, buf);

	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		long ret;

		set_current_state(TASK_INTERRUPTIBLE);
loop:
		raw_spin_lock_irq(&wqe->lock);
		if (io_wqe_run_queue(wqe)) {
			io_worker_handle_work(worker);
			goto loop;
		}
		__io_worker_idle(wqe, worker);
		raw_spin_unlock_irq(&wqe->lock);
		if (io_flush_signals())
			continue;
		ret = schedule_timeout(WORKER_IDLE_TIMEOUT);
		if (signal_pending(current)) {
			struct ksignal ksig;

			if (!get_signal(&ksig))
				continue;
			break;
		}
		if (ret)
			continue;
		/* timed out, exit unless we're the fixed worker */
		if (!(worker->flags & IO_WORKER_F_FIXED))
			break;
	}

	if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		raw_spin_lock_irq(&wqe->lock);
		io_worker_handle_work(worker);
	}

	io_worker_exit(worker);
	return 0;
}

/*
 * Called when a worker is scheduled in. Mark us as currently running.
 */
void io_wq_worker_running(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->pf_io_worker;

	if (!worker)
		return;
	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (worker->flags & IO_WORKER_F_RUNNING)
		return;
	worker->flags |= IO_WORKER_F_RUNNING;
	io_wqe_inc_running(worker);
}

/*
 * Called when worker is going to sleep. If there are no workers currently
 * running and we have work pending, wake up a free one or create a new one.
 */
void io_wq_worker_sleeping(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->pf_io_worker;

	if (!worker)
		return;
	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (!(worker->flags & IO_WORKER_F_RUNNING))
		return;

	worker->flags &= ~IO_WORKER_F_RUNNING;

	raw_spin_lock_irq(&worker->wqe->lock);
	io_wqe_dec_running(worker);
	raw_spin_unlock_irq(&worker->wqe->lock);
}

static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
{
	struct io_wqe_acct *acct = &wqe->acct[index];
	struct io_worker *worker;
	struct task_struct *tsk;

	__set_current_state(TASK_RUNNING);

	worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
	if (!worker)
		goto fail;

	refcount_set(&worker->ref, 1);
	worker->nulls_node.pprev = NULL;
	worker->wqe = wqe;
	spin_lock_init(&worker->lock);
	init_completion(&worker->ref_done);

	tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
	if (IS_ERR(tsk)) {
		kfree(worker);
fail:
		atomic_dec(&acct->nr_running);
		io_worker_ref_put(wq);
		return;
	}

	tsk->pf_io_worker = worker;
	worker->task = tsk;
	set_cpus_allowed_ptr(tsk, wqe->cpu_mask);
	tsk->flags |= PF_NO_SETAFFINITY;

	raw_spin_lock_irq(&wqe->lock);
	hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	list_add_tail_rcu(&worker->all_list, &wqe->all_list);
	worker->flags |= IO_WORKER_F_FREE;
	if (index == IO_WQ_ACCT_BOUND)
		worker->flags |= IO_WORKER_F_BOUND;
	if (!acct->nr_workers && (worker->flags & IO_WORKER_F_BOUND))
		worker->flags |= IO_WORKER_F_FIXED;
	acct->nr_workers++;
	raw_spin_unlock_irq(&wqe->lock);
	wake_up_new_task(tsk);
}
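
/*
 * io_wq_worker_running() and io_wq_worker_sleeping() are the scheduler
 * hooks for PF_IO_WORKER tasks: they keep acct->nr_running in sync with
 * how many workers can actually make progress, and via
 * io_wqe_dec_running() a replacement worker is spawned when the last
 * runnable worker blocks while work is still pending.
 */
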
/*
 * Iterate the passed in list and call the specific function for each
 * worker that isn't exiting
 */
static bool io_wq_for_each_worker(struct io_wqe *wqe,
				  bool (*func)(struct io_worker *, void *),
				  void *data)
{
	struct io_worker *worker;
	bool ret = false;

	list_for_each_entry_rcu(worker, &wqe->all_list, all_list) {
		if (io_worker_get(worker)) {
			/* no task if node is/was offline */
			if (worker->task)
				ret = func(worker, data);
			io_worker_release(worker);
			if (ret)
				break;
		}
	}

	return ret;
}

static bool io_wq_worker_wake(struct io_worker *worker, void *data)
{
	set_notify_signal(worker->task);
	wake_up_process(worker->task);
	return false;
}

static bool io_wq_work_match_all(struct io_wq_work *work, void *data)
{
	return true;
}

static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)
{
	struct io_wq *wq = wqe->wq;

	do {
		work->flags |= IO_WQ_WORK_CANCEL;
		wq->do_work(work);
		work = wq->free_work(work);
	} while (work);
}

static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
{
	unsigned int hash;
	struct io_wq_work *tail;

	if (!io_wq_is_hashed(work)) {
append:
		wq_list_add_tail(&work->list, &wqe->work_list);
		return;
	}

	hash = io_get_work_hash(work);
	tail = wqe->hash_tail[hash];
	wqe->hash_tail[hash] = work;
	if (!tail)
		goto append;

	wq_list_add_after(&work->list, &tail->list, &wqe->work_list);
}

static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
{
	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
	int work_flags;
	unsigned long flags;

	if (test_bit(IO_WQ_BIT_EXIT, &wqe->wq->state)) {
		io_run_cancel(work, wqe);
		return;
	}

	work_flags = work->flags;
	raw_spin_lock_irqsave(&wqe->lock, flags);
	io_wqe_insert_work(wqe, work);
	wqe->flags &= ~IO_WQE_FLAG_STALLED;
	raw_spin_unlock_irqrestore(&wqe->lock, flags);

	if ((work_flags & IO_WQ_WORK_CONCURRENT) ||
	    !atomic_read(&acct->nr_running))
		io_wqe_wake_worker(wqe, acct);
}

void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
{
	struct io_wqe *wqe = wq->wqes[numa_node_id()];

	io_wqe_enqueue(wqe, work);
}

/*
 * Work items that hash to the same value will not be done in parallel.
 * Used to limit concurrent writes, generally hashed by inode.
 */
void io_wq_hash_work(struct io_wq_work *work, void *val)
{
	unsigned int bit;

	bit = hash_ptr(val, IO_WQ_HASH_ORDER);
	work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT));
}
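
/*
 * Illustrative sketch (hypothetical caller, not part of this file):
 * serializing buffered writes to one file by hashing on the inode
 * before queueing, in the spirit of what io_uring does for writes:
 *
 *	struct io_wq_work *work = ...;	// embedded in the request
 *	io_wq_hash_work(work, file_inode(file));
 *	io_wq_enqueue(wq, work);
 *
 * Two works hashed from the same inode share a hash bucket, so io-wq
 * runs them one at a time in queue order; works with different hashes
 * still run concurrently.
 */
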
static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
{
	struct io_cb_cancel_data *match = data;
	unsigned long flags;

	/*
	 * Hold the lock to avoid ->cur_work going out of scope, caller
	 * may dereference the passed in work.
	 */
	spin_lock_irqsave(&worker->lock, flags);
	if (worker->cur_work &&
	    match->fn(worker->cur_work, match->data)) {
		set_notify_signal(worker->task);
		match->nr_running++;
	}
	spin_unlock_irqrestore(&worker->lock, flags);

	return match->nr_running && !match->cancel_all;
}

static inline void io_wqe_remove_pending(struct io_wqe *wqe,
					 struct io_wq_work *work,
					 struct io_wq_work_node *prev)
{
	unsigned int hash = io_get_work_hash(work);
	struct io_wq_work *prev_work = NULL;

	if (io_wq_is_hashed(work) && work == wqe->hash_tail[hash]) {
		if (prev)
			prev_work = container_of(prev, struct io_wq_work, list);
		if (prev_work && io_get_work_hash(prev_work) == hash)
			wqe->hash_tail[hash] = prev_work;
		else
			wqe->hash_tail[hash] = NULL;
	}
	wq_list_del(&wqe->work_list, &work->list, prev);
}

static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
				       struct io_cb_cancel_data *match)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work;
	unsigned long flags;

retry:
	raw_spin_lock_irqsave(&wqe->lock, flags);
	wq_list_for_each(node, prev, &wqe->work_list) {
		work = container_of(node, struct io_wq_work, list);
		if (!match->fn(work, match->data))
			continue;
		io_wqe_remove_pending(wqe, work, prev);
		raw_spin_unlock_irqrestore(&wqe->lock, flags);
		io_run_cancel(work, wqe);
		match->nr_pending++;
		if (!match->cancel_all)
			return;

		/* not safe to continue after unlock */
		goto retry;
	}
	raw_spin_unlock_irqrestore(&wqe->lock, flags);
}

static void io_wqe_cancel_running_work(struct io_wqe *wqe,
				       struct io_cb_cancel_data *match)
{
	rcu_read_lock();
	io_wq_for_each_worker(wqe, io_wq_worker_cancel, match);
	rcu_read_unlock();
}

enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
				  void *data, bool cancel_all)
{
	struct io_cb_cancel_data match = {
		.fn		= cancel,
		.data		= data,
		.cancel_all	= cancel_all,
	};
	int node;

	/*
	 * First check pending list, if we're lucky we can just remove it
	 * from there. CANCEL_OK means that the work is returned as-new,
	 * no completion will be posted for it.
	 */
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wqe_cancel_pending_work(wqe, &match);
		if (match.nr_pending && !match.cancel_all)
			return IO_WQ_CANCEL_OK;
	}

	/*
	 * Now check if a free (going busy) or busy worker has the work
	 * currently running. If we find it there, we'll return CANCEL_RUNNING
	 * as an indication that we attempt to signal cancellation. The
	 * completion will run normally in this case.
	 */
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wqe_cancel_running_work(wqe, &match);
		if (match.nr_running && !match.cancel_all)
			return IO_WQ_CANCEL_RUNNING;
	}

	if (match.nr_running)
		return IO_WQ_CANCEL_RUNNING;
	if (match.nr_pending)
		return IO_WQ_CANCEL_OK;
	return IO_WQ_CANCEL_NOTFOUND;
}

static int io_wqe_hash_wake(struct wait_queue_entry *wait, unsigned mode,
			    int sync, void *key)
{
	struct io_wqe *wqe = container_of(wait, struct io_wqe, wait);

	list_del_init(&wait->entry);

	rcu_read_lock();
	io_wqe_activate_free_worker(wqe);
	rcu_read_unlock();
	return 1;
}
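
/*
 * Illustrative sketch (hypothetical caller, not part of this file): a
 * work_cancel_fn only decides whether a given work item matches, e.g.
 * to cancel everything queued for one request context:
 *
 *	static bool match_ctx(struct io_wq_work *work, void *data)
 *	{
 *		return my_work_to_ctx(work) == data;	// hypothetical helper
 *	}
 *
 *	ret = io_wq_cancel_cb(wq, match_ctx, ctx, true);
 *
 * Pending matches are run with IO_WQ_WORK_CANCEL set (via
 * io_run_cancel()); running matches only get a notify-signal nudge and
 * complete normally.
 */
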
struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
{
	int ret, node;
	struct io_wq *wq;

	if (WARN_ON_ONCE(!data->free_work || !data->do_work))
		return ERR_PTR(-EINVAL);
	if (WARN_ON_ONCE(!bounded))
		return ERR_PTR(-EINVAL);

	wq = kzalloc(struct_size(wq, wqes, nr_node_ids), GFP_KERNEL);
	if (!wq)
		return ERR_PTR(-ENOMEM);
	ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	if (ret)
		goto err_wq;

	refcount_inc(&data->hash->refs);
	wq->hash = data->hash;
	wq->free_work = data->free_work;
	wq->do_work = data->do_work;

	ret = -ENOMEM;
	for_each_node(node) {
		struct io_wqe *wqe;
		int alloc_node = node;

		if (!node_online(alloc_node))
			alloc_node = NUMA_NO_NODE;
		wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, alloc_node);
		if (!wqe)
			goto err;
		if (!alloc_cpumask_var(&wqe->cpu_mask, GFP_KERNEL))
			goto err;
		cpumask_copy(wqe->cpu_mask, cpumask_of_node(node));
		wq->wqes[node] = wqe;
		wqe->node = alloc_node;
		wqe->acct[IO_WQ_ACCT_BOUND].index = IO_WQ_ACCT_BOUND;
		wqe->acct[IO_WQ_ACCT_UNBOUND].index = IO_WQ_ACCT_UNBOUND;
		wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
		atomic_set(&wqe->acct[IO_WQ_ACCT_BOUND].nr_running, 0);
		wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
					task_rlimit(current, RLIMIT_NPROC);
		atomic_set(&wqe->acct[IO_WQ_ACCT_UNBOUND].nr_running, 0);
		wqe->wait.func = io_wqe_hash_wake;
		INIT_LIST_HEAD(&wqe->wait.entry);
		wqe->wq = wq;
		raw_spin_lock_init(&wqe->lock);
		INIT_WQ_LIST(&wqe->work_list);
		INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
		INIT_LIST_HEAD(&wqe->all_list);
	}

	wq->task = get_task_struct(data->task);
	atomic_set(&wq->worker_refs, 1);
	init_completion(&wq->worker_done);
	return wq;
err:
	io_wq_put_hash(data->hash);
	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	for_each_node(node) {
		if (!wq->wqes[node])
			continue;
		free_cpumask_var(wq->wqes[node]->cpu_mask);
		kfree(wq->wqes[node]);
	}
err_wq:
	kfree(wq);
	return ERR_PTR(ret);
}

static bool io_task_work_match(struct callback_head *cb, void *data)
{
	struct create_worker_data *cwd;

	if (cb->func != create_worker_cb)
		return false;
	cwd = container_of(cb, struct create_worker_data, work);
	return cwd->wqe->wq == data;
}

void io_wq_exit_start(struct io_wq *wq)
{
	set_bit(IO_WQ_BIT_EXIT, &wq->state);
}

static void io_wq_exit_workers(struct io_wq *wq)
{
	struct callback_head *cb;
	int node;

	if (!wq->task)
		return;

	while ((cb = task_work_cancel_match(wq->task, io_task_work_match, wq)) != NULL) {
		struct create_worker_data *cwd;

		cwd = container_of(cb, struct create_worker_data, work);
		atomic_dec(&cwd->wqe->acct[cwd->index].nr_running);
		io_worker_ref_put(wq);
		kfree(cwd);
	}

	rcu_read_lock();
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wq_for_each_worker(wqe, io_wq_worker_wake, NULL);
	}
	rcu_read_unlock();
	io_worker_ref_put(wq);
	wait_for_completion(&wq->worker_done);

	for_each_node(node) {
		spin_lock_irq(&wq->hash->wait.lock);
		list_del_init(&wq->wqes[node]->wait.entry);
		spin_unlock_irq(&wq->hash->wait.lock);
	}
	put_task_struct(wq->task);
	wq->task = NULL;
}

static void io_wq_destroy(struct io_wq *wq)
{
	int node;

	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);

	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];
		struct io_cb_cancel_data match = {
			.fn		= io_wq_work_match_all,
			.cancel_all	= true,
		};
		io_wqe_cancel_pending_work(wqe, &match);
		free_cpumask_var(wqe->cpu_mask);
		kfree(wqe);
	}
	io_wq_put_hash(wq->hash);
	kfree(wq);
}

void io_wq_put_and_exit(struct io_wq *wq)
{
	WARN_ON_ONCE(!test_bit(IO_WQ_BIT_EXIT, &wq->state));

	io_wq_exit_workers(wq);
	io_wq_destroy(wq);
}

struct online_data {
	unsigned int cpu;
	bool online;
};

static bool io_wq_worker_affinity(struct io_worker *worker, void *data)
{
	struct online_data *od = data;

	if (od->online)
		cpumask_set_cpu(od->cpu, worker->wqe->cpu_mask);
	else
		cpumask_clear_cpu(od->cpu, worker->wqe->cpu_mask);
	return false;
}

static int __io_wq_cpu_online(struct io_wq *wq, unsigned int cpu, bool online)
{
	struct online_data od = {
		.cpu = cpu,
		.online = online
	};
	int i;

	rcu_read_lock();
	for_each_node(i)
		io_wq_for_each_worker(wq->wqes[i], io_wq_worker_affinity, &od);
	rcu_read_unlock();
	return 0;
}

static int io_wq_cpu_online(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);

	return __io_wq_cpu_online(wq, cpu, true);
}

static int io_wq_cpu_offline(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);

	return __io_wq_cpu_online(wq, cpu, false);
}

int io_wq_cpu_affinity(struct io_wq *wq, cpumask_var_t mask)
{
	int i;

	rcu_read_lock();
	for_each_node(i) {
		struct io_wqe *wqe = wq->wqes[i];

		if (mask)
			cpumask_copy(wqe->cpu_mask, mask);
		else
			cpumask_copy(wqe->cpu_mask, cpumask_of_node(i));
	}
	rcu_read_unlock();
	return 0;
}

static __init int io_wq_init(void)
{
	int ret;

	ret = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "io-wq/online",
					io_wq_cpu_online, io_wq_cpu_offline);
	if (ret < 0)
		return ret;
	io_wq_online = ret;
	return 0;
}
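
/*
 * Teardown ordering above: io_wq_exit_start() sets IO_WQ_BIT_EXIT so new
 * enqueues cancel immediately; io_wq_exit_workers() then flushes any
 * queued create_worker_cb task_work, wakes every worker, and drops the
 * initial worker_refs reference so wait_for_completion(&wq->worker_done)
 * returns once the last worker has exited. Only then does
 * io_wq_destroy() cancel leftover pending work and free the structures.
 */
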
subsys_initcall(io_wq_init);
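
/*
 * Illustrative usage sketch (hypothetical caller, not part of this
 * file). io_uring sets up an io_wq per task roughly along these lines:
 *
 *	struct io_wq_data data = {
 *		.hash		= hash,			// shared struct io_wq_hash
 *		.task		= current,
 *		.free_work	= my_free_work,		// hypothetical callbacks
 *		.do_work	= my_do_work,
 *	};
 *	struct io_wq *wq = io_wq_create(concurrency, &data);
 *
 *	io_wq_enqueue(wq, work);	// run work on a pool thread
 *	...
 *	io_wq_exit_start(wq);
 *	io_wq_put_and_exit(wq);
 */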