Linux kernel mirror (for testing): git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
fs/io-wq.c at v5.17-rc4 (1422 lines, 34 kB)
// SPDX-License-Identifier: GPL-2.0
/*
 * Basic worker thread pool for io_uring
 *
 * Copyright (C) 2019 Jens Axboe
 *
 */
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/errno.h>
#include <linux/sched/signal.h>
#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/rculist_nulls.h>
#include <linux/cpu.h>
#include <linux/tracehook.h>
#include <linux/audit.h>
#include <uapi/linux/io_uring.h>

#include "io-wq.h"

#define WORKER_IDLE_TIMEOUT	(5 * HZ)

enum {
	IO_WORKER_F_UP		= 1,	/* up and active */
	IO_WORKER_F_RUNNING	= 2,	/* account as running */
	IO_WORKER_F_FREE	= 4,	/* worker on free list */
	IO_WORKER_F_BOUND	= 8,	/* is doing bounded work */
};

enum {
	IO_WQ_BIT_EXIT		= 0,	/* wq exiting */
};

enum {
	IO_ACCT_STALLED_BIT	= 0,	/* stalled on hash */
};

/*
 * One for each thread in a wqe pool
 */
struct io_worker {
	refcount_t ref;
	unsigned flags;
	struct hlist_nulls_node nulls_node;
	struct list_head all_list;
	struct task_struct *task;
	struct io_wqe *wqe;

	struct io_wq_work *cur_work;
	struct io_wq_work *next_work;
	raw_spinlock_t lock;

	struct completion ref_done;

	unsigned long create_state;
	struct callback_head create_work;
	int create_index;

	union {
		struct rcu_head rcu;
		struct work_struct work;
	};
};

#if BITS_PER_LONG == 64
#define IO_WQ_HASH_ORDER	6
#else
#define IO_WQ_HASH_ORDER	5
#endif

#define IO_WQ_NR_HASH_BUCKETS	(1u << IO_WQ_HASH_ORDER)

struct io_wqe_acct {
	unsigned nr_workers;
	unsigned max_workers;
	int index;
	atomic_t nr_running;
	struct io_wq_work_list work_list;
	unsigned long flags;
};

enum {
	IO_WQ_ACCT_BOUND,
	IO_WQ_ACCT_UNBOUND,
	IO_WQ_ACCT_NR,
};

/*
 * Per-node worker thread pool
 */
struct io_wqe {
	raw_spinlock_t lock;
	struct io_wqe_acct acct[2];

	int node;

	struct hlist_nulls_head free_list;
	struct list_head all_list;

	struct wait_queue_entry wait;

	struct io_wq *wq;
	struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];

	cpumask_var_t cpu_mask;
};

/*
 * Per io_wq state
 */
struct io_wq {
	unsigned long state;

	free_work_fn *free_work;
	io_wq_work_fn *do_work;

	struct io_wq_hash *hash;

	atomic_t worker_refs;
	struct completion worker_done;

	struct hlist_node cpuhp_node;

	struct task_struct *task;

	struct io_wqe *wqes[];
};

static enum cpuhp_state io_wq_online;

struct io_cb_cancel_data {
	work_cancel_fn *fn;
	void *data;
	int nr_running;
	int nr_pending;
	bool cancel_all;
};

static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index);
static void io_wqe_dec_running(struct io_worker *worker);
static bool io_acct_cancel_pending_work(struct io_wqe *wqe,
					struct io_wqe_acct *acct,
					struct io_cb_cancel_data *match);
static void create_worker_cb(struct callback_head *cb);
static void io_wq_cancel_tw_create(struct io_wq *wq);

static bool io_worker_get(struct io_worker *worker)
{
	return refcount_inc_not_zero(&worker->ref);
}

static void io_worker_release(struct io_worker *worker)
{
	if (refcount_dec_and_test(&worker->ref))
		complete(&worker->ref_done);
}

static inline struct io_wqe_acct *io_get_acct(struct io_wqe *wqe, bool bound)
{
	return &wqe->acct[bound ? IO_WQ_ACCT_BOUND : IO_WQ_ACCT_UNBOUND];
}

static inline struct io_wqe_acct *io_work_get_acct(struct io_wqe *wqe,
						   struct io_wq_work *work)
{
	return io_get_acct(wqe, !(work->flags & IO_WQ_WORK_UNBOUND));
}

static inline struct io_wqe_acct *io_wqe_get_acct(struct io_worker *worker)
{
	return io_get_acct(worker->wqe, worker->flags & IO_WORKER_F_BOUND);
}

static void io_worker_ref_put(struct io_wq *wq)
{
	if (atomic_dec_and_test(&wq->worker_refs))
		complete(&wq->worker_done);
}

static void io_worker_cancel_cb(struct io_worker *worker)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;

	atomic_dec(&acct->nr_running);
	raw_spin_lock(&worker->wqe->lock);
	acct->nr_workers--;
	raw_spin_unlock(&worker->wqe->lock);
	io_worker_ref_put(wq);
	clear_bit_unlock(0, &worker->create_state);
	io_worker_release(worker);
}

static bool io_task_worker_match(struct callback_head *cb, void *data)
{
	struct io_worker *worker;

	if (cb->func != create_worker_cb)
		return false;
	worker = container_of(cb, struct io_worker, create_work);
	return worker == data;
}

static void io_worker_exit(struct io_worker *worker)
{
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;

	while (1) {
		struct callback_head *cb = task_work_cancel_match(wq->task,
				io_task_worker_match, worker);

		if (!cb)
			break;
		io_worker_cancel_cb(worker);
	}

	io_worker_release(worker);
	wait_for_completion(&worker->ref_done);

	raw_spin_lock(&wqe->lock);
	if (worker->flags & IO_WORKER_F_FREE)
		hlist_nulls_del_rcu(&worker->nulls_node);
	list_del_rcu(&worker->all_list);
	preempt_disable();
	io_wqe_dec_running(worker);
	worker->flags = 0;
	current->flags &= ~PF_IO_WORKER;
	preempt_enable();
	raw_spin_unlock(&wqe->lock);

	kfree_rcu(worker, rcu);
	io_worker_ref_put(wqe->wq);
	do_exit(0);
}

static inline bool io_acct_run_queue(struct io_wqe_acct *acct)
{
	if (!wq_list_empty(&acct->work_list) &&
	    !test_bit(IO_ACCT_STALLED_BIT, &acct->flags))
		return true;
	return false;
}

/*
 * Check head of free list for an available worker. If one isn't available,
 * caller must create one.
 */
static bool io_wqe_activate_free_worker(struct io_wqe *wqe,
					struct io_wqe_acct *acct)
	__must_hold(RCU)
{
	struct hlist_nulls_node *n;
	struct io_worker *worker;

	/*
	 * Iterate free_list and see if we can find an idle worker to
	 * activate. If a given worker is on the free_list but in the process
	 * of exiting, keep trying.
	 */
	hlist_nulls_for_each_entry_rcu(worker, n, &wqe->free_list, nulls_node) {
		if (!io_worker_get(worker))
			continue;
		if (io_wqe_get_acct(worker) != acct) {
			io_worker_release(worker);
			continue;
		}
		if (wake_up_process(worker->task)) {
			io_worker_release(worker);
			return true;
		}
		io_worker_release(worker);
	}

	return false;
}

/*
 * We need a worker. If we find a free one, we're good. If not, and we're
 * below the max number of workers, create one.
 */
static bool io_wqe_create_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
{
	/*
	 * Most likely an attempt to queue unbounded work on an io_wq that
	 * wasn't setup with any unbounded workers.
	 */
	if (unlikely(!acct->max_workers))
		pr_warn_once("io-wq is not configured for unbound workers");

	raw_spin_lock(&wqe->lock);
	if (acct->nr_workers >= acct->max_workers) {
		raw_spin_unlock(&wqe->lock);
		return true;
	}
	acct->nr_workers++;
	raw_spin_unlock(&wqe->lock);
	atomic_inc(&acct->nr_running);
	atomic_inc(&wqe->wq->worker_refs);
	return create_io_worker(wqe->wq, wqe, acct->index);
}

static void io_wqe_inc_running(struct io_worker *worker)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);

	atomic_inc(&acct->nr_running);
}

static void create_worker_cb(struct callback_head *cb)
{
	struct io_worker *worker;
	struct io_wq *wq;
	struct io_wqe *wqe;
	struct io_wqe_acct *acct;
	bool do_create = false;

	worker = container_of(cb, struct io_worker, create_work);
	wqe = worker->wqe;
	wq = wqe->wq;
	acct = &wqe->acct[worker->create_index];
	raw_spin_lock(&wqe->lock);
	if (acct->nr_workers < acct->max_workers) {
		acct->nr_workers++;
		do_create = true;
	}
	raw_spin_unlock(&wqe->lock);
	if (do_create) {
		create_io_worker(wq, wqe, worker->create_index);
	} else {
		atomic_dec(&acct->nr_running);
		io_worker_ref_put(wq);
	}
	clear_bit_unlock(0, &worker->create_state);
	io_worker_release(worker);
}

static bool io_queue_worker_create(struct io_worker *worker,
				   struct io_wqe_acct *acct,
				   task_work_func_t func)
{
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;

	/* raced with exit, just ignore create call */
	if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
		goto fail;
	if (!io_worker_get(worker))
		goto fail;
	/*
	 * create_state manages ownership of create_work/index. We should
	 * only need one entry per worker, as the worker going to sleep
	 * will trigger the condition, and waking will clear it once it
	 * runs the task_work.
	 */
	if (test_bit(0, &worker->create_state) ||
	    test_and_set_bit_lock(0, &worker->create_state))
		goto fail_release;

	atomic_inc(&wq->worker_refs);
	init_task_work(&worker->create_work, func);
	worker->create_index = acct->index;
	if (!task_work_add(wq->task, &worker->create_work, TWA_SIGNAL)) {
		/*
		 * EXIT may have been set after checking it above, check after
		 * adding the task_work and remove any creation item if it is
		 * now set. wq exit does that too, but we can have added this
		 * work item after we canceled in io_wq_exit_workers().
		 */
		if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
			io_wq_cancel_tw_create(wq);
		io_worker_ref_put(wq);
		return true;
	}
	io_worker_ref_put(wq);
	clear_bit_unlock(0, &worker->create_state);
fail_release:
	io_worker_release(worker);
fail:
	atomic_dec(&acct->nr_running);
	io_worker_ref_put(wq);
	return false;
}

static void io_wqe_dec_running(struct io_worker *worker)
	__must_hold(wqe->lock)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
	struct io_wqe *wqe = worker->wqe;

	if (!(worker->flags & IO_WORKER_F_UP))
		return;

	if (atomic_dec_and_test(&acct->nr_running) && io_acct_run_queue(acct)) {
		atomic_inc(&acct->nr_running);
		atomic_inc(&wqe->wq->worker_refs);
		raw_spin_unlock(&wqe->lock);
		io_queue_worker_create(worker, acct, create_worker_cb);
		raw_spin_lock(&wqe->lock);
	}
}

/*
 * Worker will start processing some work. Move it to the busy list, if
 * it's currently on the freelist
 */
static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker)
	__must_hold(wqe->lock)
{
	if (worker->flags & IO_WORKER_F_FREE) {
		worker->flags &= ~IO_WORKER_F_FREE;
		hlist_nulls_del_init_rcu(&worker->nulls_node);
	}
}

/*
 * No work, worker going to sleep. Move to freelist, and unuse mm if we
 * have one attached. Dropping the mm may potentially sleep, so we drop
 * the lock in that case and return success. Since the caller has to
 * retry the loop in that case (we changed task state), we don't regrab
 * the lock if we return success.
 */
static void __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker)
	__must_hold(wqe->lock)
{
	if (!(worker->flags & IO_WORKER_F_FREE)) {
		worker->flags |= IO_WORKER_F_FREE;
		hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	}
}

static inline unsigned int io_get_work_hash(struct io_wq_work *work)
{
	return work->flags >> IO_WQ_HASH_SHIFT;
}

static bool io_wait_on_hash(struct io_wqe *wqe, unsigned int hash)
{
	struct io_wq *wq = wqe->wq;
	bool ret = false;

	spin_lock_irq(&wq->hash->wait.lock);
	if (list_empty(&wqe->wait.entry)) {
		__add_wait_queue(&wq->hash->wait, &wqe->wait);
		if (!test_bit(hash, &wq->hash->map)) {
			__set_current_state(TASK_RUNNING);
			list_del_init(&wqe->wait.entry);
			ret = true;
		}
	}
	spin_unlock_irq(&wq->hash->wait.lock);
	return ret;
}

static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct,
					   struct io_worker *worker)
	__must_hold(wqe->lock)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work, *tail;
	unsigned int stall_hash = -1U;
	struct io_wqe *wqe = worker->wqe;

	wq_list_for_each(node, prev, &acct->work_list) {
		unsigned int hash;

		work = container_of(node, struct io_wq_work, list);

		/* not hashed, can run anytime */
		if (!io_wq_is_hashed(work)) {
			wq_list_del(&acct->work_list, node, prev);
			return work;
		}

		hash = io_get_work_hash(work);
		/* all items with this hash lie in [work, tail] */
		tail = wqe->hash_tail[hash];

		/* hashed, can run if not already running */
		if (!test_and_set_bit(hash, &wqe->wq->hash->map)) {
			wqe->hash_tail[hash] = NULL;
			wq_list_cut(&acct->work_list, &tail->list, prev);
			return work;
		}
		if (stall_hash == -1U)
			stall_hash = hash;
		/* fast forward to a next hash, for-each will fix up @prev */
		node = &tail->list;
	}

	if (stall_hash != -1U) {
		bool unstalled;

		/*
		 * Set this before dropping the lock to avoid racing with new
		 * work being added and clearing the stalled bit.
		 */
		set_bit(IO_ACCT_STALLED_BIT, &acct->flags);
		raw_spin_unlock(&wqe->lock);
		unstalled = io_wait_on_hash(wqe, stall_hash);
		raw_spin_lock(&wqe->lock);
		if (unstalled) {
			clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
			if (wq_has_sleeper(&wqe->wq->hash->wait))
				wake_up(&wqe->wq->hash->wait);
		}
	}

	return NULL;
}

static bool io_flush_signals(void)
{
	if (unlikely(test_thread_flag(TIF_NOTIFY_SIGNAL))) {
		__set_current_state(TASK_RUNNING);
		tracehook_notify_signal();
		return true;
	}
	return false;
}

static void io_assign_current_work(struct io_worker *worker,
				   struct io_wq_work *work)
{
	if (work) {
		io_flush_signals();
		cond_resched();
	}

	raw_spin_lock(&worker->lock);
	worker->cur_work = work;
	worker->next_work = NULL;
	raw_spin_unlock(&worker->lock);
}

static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);

static void io_worker_handle_work(struct io_worker *worker)
	__releases(wqe->lock)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;
	bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);

	do {
		struct io_wq_work *work;

		/*
		 * If we got some work, mark us as busy. If we didn't, but
		 * the list isn't empty, it means we stalled on hashed work.
		 * Mark us stalled so we don't keep looking for work when we
		 * can't make progress, any work completion or insertion will
		 * clear the stalled flag.
		 */
		work = io_get_next_work(acct, worker);
		if (work) {
			__io_worker_busy(wqe, worker);

			/*
			 * Make sure cancelation can find this, even before
			 * it becomes the active work. That avoids a window
			 * where the work has been removed from our general
			 * work list, but isn't yet discoverable as the
			 * current work item for this worker.
			 */
			raw_spin_lock(&worker->lock);
			worker->next_work = work;
			raw_spin_unlock(&worker->lock);
		}
		raw_spin_unlock(&wqe->lock);
		if (!work)
			break;
		io_assign_current_work(worker, work);
		__set_current_state(TASK_RUNNING);

		/* handle a whole dependent link */
		do {
			struct io_wq_work *next_hashed, *linked;
			unsigned int hash = io_get_work_hash(work);

			next_hashed = wq_next_work(work);

			if (unlikely(do_kill) && (work->flags & IO_WQ_WORK_UNBOUND))
				work->flags |= IO_WQ_WORK_CANCEL;
			wq->do_work(work);
			io_assign_current_work(worker, NULL);

			linked = wq->free_work(work);
			work = next_hashed;
			if (!work && linked && !io_wq_is_hashed(linked)) {
				work = linked;
				linked = NULL;
			}
			io_assign_current_work(worker, work);
			if (linked)
				io_wqe_enqueue(wqe, linked);

			if (hash != -1U && !next_hashed) {
				/* serialize hash clear with wake_up() */
				spin_lock_irq(&wq->hash->wait.lock);
				clear_bit(hash, &wq->hash->map);
				clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
				spin_unlock_irq(&wq->hash->wait.lock);
				if (wq_has_sleeper(&wq->hash->wait))
					wake_up(&wq->hash->wait);
			}
		} while (work);

		raw_spin_lock(&wqe->lock);
	} while (1);
}

static int io_wqe_worker(void *data)
{
	struct io_worker *worker = data;
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;
	bool last_timeout = false;
	char buf[TASK_COMM_LEN];

	worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);

	snprintf(buf, sizeof(buf), "iou-wrk-%d", wq->task->pid);
	set_task_comm(current, buf);

	audit_alloc_kernel(current);

	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		long ret;

		set_current_state(TASK_INTERRUPTIBLE);
loop:
		raw_spin_lock(&wqe->lock);
		if (io_acct_run_queue(acct)) {
			io_worker_handle_work(worker);
			goto loop;
		}
		/* timed out, exit unless we're the last worker */
		if (last_timeout && acct->nr_workers > 1) {
			acct->nr_workers--;
			raw_spin_unlock(&wqe->lock);
			__set_current_state(TASK_RUNNING);
			break;
		}
		last_timeout = false;
		__io_worker_idle(wqe, worker);
		raw_spin_unlock(&wqe->lock);
		if (io_flush_signals())
			continue;
		ret = schedule_timeout(WORKER_IDLE_TIMEOUT);
		if (signal_pending(current)) {
			struct ksignal ksig;

			if (!get_signal(&ksig))
				continue;
			break;
		}
		last_timeout = !ret;
	}

	if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		raw_spin_lock(&wqe->lock);
		io_worker_handle_work(worker);
	}

	audit_free(current);
	io_worker_exit(worker);
	return 0;
}

/*
 * Called when a worker is scheduled in. Mark us as currently running.
 */
void io_wq_worker_running(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->worker_private;

	if (!worker)
		return;
	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (worker->flags & IO_WORKER_F_RUNNING)
		return;
	worker->flags |= IO_WORKER_F_RUNNING;
	io_wqe_inc_running(worker);
}

/*
 * Called when worker is going to sleep. If there are no workers currently
 * running and we have work pending, wake up a free one or create a new one.
 */
void io_wq_worker_sleeping(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->worker_private;

	if (!worker)
		return;
	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (!(worker->flags & IO_WORKER_F_RUNNING))
		return;

	worker->flags &= ~IO_WORKER_F_RUNNING;

	raw_spin_lock(&worker->wqe->lock);
	io_wqe_dec_running(worker);
	raw_spin_unlock(&worker->wqe->lock);
}

static void io_init_new_worker(struct io_wqe *wqe, struct io_worker *worker,
			       struct task_struct *tsk)
{
	tsk->worker_private = worker;
	worker->task = tsk;
	set_cpus_allowed_ptr(tsk, wqe->cpu_mask);
	tsk->flags |= PF_NO_SETAFFINITY;

	raw_spin_lock(&wqe->lock);
	hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	list_add_tail_rcu(&worker->all_list, &wqe->all_list);
	worker->flags |= IO_WORKER_F_FREE;
	raw_spin_unlock(&wqe->lock);
	wake_up_new_task(tsk);
}

static bool io_wq_work_match_all(struct io_wq_work *work, void *data)
{
	return true;
}

static inline bool io_should_retry_thread(long err)
{
	/*
	 * Prevent perpetual task_work retry, if the task (or its group) is
	 * exiting.
	 */
	if (fatal_signal_pending(current))
		return false;

	switch (err) {
	case -EAGAIN:
	case -ERESTARTSYS:
	case -ERESTARTNOINTR:
	case -ERESTARTNOHAND:
		return true;
	default:
		return false;
	}
}

static void create_worker_cont(struct callback_head *cb)
{
	struct io_worker *worker;
	struct task_struct *tsk;
	struct io_wqe *wqe;

	worker = container_of(cb, struct io_worker, create_work);
	clear_bit_unlock(0, &worker->create_state);
	wqe = worker->wqe;
	tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
	if (!IS_ERR(tsk)) {
		io_init_new_worker(wqe, worker, tsk);
		io_worker_release(worker);
		return;
	} else if (!io_should_retry_thread(PTR_ERR(tsk))) {
		struct io_wqe_acct *acct = io_wqe_get_acct(worker);

		atomic_dec(&acct->nr_running);
		raw_spin_lock(&wqe->lock);
		acct->nr_workers--;
		if (!acct->nr_workers) {
			struct io_cb_cancel_data match = {
				.fn		= io_wq_work_match_all,
				.cancel_all	= true,
			};

			while (io_acct_cancel_pending_work(wqe, acct, &match))
				raw_spin_lock(&wqe->lock);
		}
		raw_spin_unlock(&wqe->lock);
		io_worker_ref_put(wqe->wq);
		kfree(worker);
		return;
	}

	/* re-create attempts grab a new worker ref, drop the existing one */
	io_worker_release(worker);
	schedule_work(&worker->work);
}

static void io_workqueue_create(struct work_struct *work)
{
	struct io_worker *worker = container_of(work, struct io_worker, work);
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);

	if (!io_queue_worker_create(worker, acct, create_worker_cont))
		kfree(worker);
}

static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
{
	struct io_wqe_acct *acct = &wqe->acct[index];
	struct io_worker *worker;
	struct task_struct *tsk;

	__set_current_state(TASK_RUNNING);

	worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
	if (!worker) {
fail:
		atomic_dec(&acct->nr_running);
		raw_spin_lock(&wqe->lock);
		acct->nr_workers--;
		raw_spin_unlock(&wqe->lock);
		io_worker_ref_put(wq);
		return false;
	}

	refcount_set(&worker->ref, 1);
	worker->wqe = wqe;
	raw_spin_lock_init(&worker->lock);
	init_completion(&worker->ref_done);

	if (index == IO_WQ_ACCT_BOUND)
		worker->flags |= IO_WORKER_F_BOUND;

	tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
	if (!IS_ERR(tsk)) {
		io_init_new_worker(wqe, worker, tsk);
	} else if (!io_should_retry_thread(PTR_ERR(tsk))) {
		kfree(worker);
		goto fail;
	} else {
		INIT_WORK(&worker->work, io_workqueue_create);
		schedule_work(&worker->work);
	}

	return true;
}

/*
 * Iterate the passed in list and call the specific function for each
 * worker that isn't exiting
 */
static bool io_wq_for_each_worker(struct io_wqe *wqe,
				  bool (*func)(struct io_worker *, void *),
				  void *data)
{
	struct io_worker *worker;
	bool ret = false;

	list_for_each_entry_rcu(worker, &wqe->all_list, all_list) {
		if (io_worker_get(worker)) {
			/* no task if node is/was offline */
			if (worker->task)
				ret = func(worker, data);
			io_worker_release(worker);
			if (ret)
				break;
		}
	}

	return ret;
}

static bool io_wq_worker_wake(struct io_worker *worker, void *data)
{
	set_notify_signal(worker->task);
	wake_up_process(worker->task);
	return false;
}

static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)
{
	struct io_wq *wq = wqe->wq;

	do {
		work->flags |= IO_WQ_WORK_CANCEL;
		wq->do_work(work);
		work = wq->free_work(work);
	} while (work);
}

static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
{
	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
	unsigned int hash;
	struct io_wq_work *tail;

	if (!io_wq_is_hashed(work)) {
append:
		wq_list_add_tail(&work->list, &acct->work_list);
		return;
	}

	hash = io_get_work_hash(work);
	tail = wqe->hash_tail[hash];
	wqe->hash_tail[hash] = work;
	if (!tail)
		goto append;

	wq_list_add_after(&work->list, &tail->list, &acct->work_list);
}

static bool io_wq_work_match_item(struct io_wq_work *work, void *data)
{
	return work == data;
}

static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
{
	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
	unsigned work_flags = work->flags;
	bool do_create;

	/*
	 * If io-wq is exiting for this task, or if the request has explicitly
	 * been marked as one that should not get executed, cancel it here.
	 */
	if (test_bit(IO_WQ_BIT_EXIT, &wqe->wq->state) ||
	    (work->flags & IO_WQ_WORK_CANCEL)) {
		io_run_cancel(work, wqe);
		return;
	}

	raw_spin_lock(&wqe->lock);
	io_wqe_insert_work(wqe, work);
	clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);

	rcu_read_lock();
	do_create = !io_wqe_activate_free_worker(wqe, acct);
	rcu_read_unlock();

	raw_spin_unlock(&wqe->lock);

	if (do_create && ((work_flags & IO_WQ_WORK_CONCURRENT) ||
	    !atomic_read(&acct->nr_running))) {
		bool did_create;

		did_create = io_wqe_create_worker(wqe, acct);
		if (likely(did_create))
			return;

		raw_spin_lock(&wqe->lock);
		/* fatal condition, failed to create the first worker */
		if (!acct->nr_workers) {
			struct io_cb_cancel_data match = {
				.fn		= io_wq_work_match_item,
				.data		= work,
				.cancel_all	= false,
			};

			if (io_acct_cancel_pending_work(wqe, acct, &match))
				raw_spin_lock(&wqe->lock);
		}
		raw_spin_unlock(&wqe->lock);
	}
}

void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
{
	struct io_wqe *wqe = wq->wqes[numa_node_id()];

	io_wqe_enqueue(wqe, work);
}

/*
 * Work items that hash to the same value will not be done in parallel.
 * Used to limit concurrent writes, generally hashed by inode.
 */
void io_wq_hash_work(struct io_wq_work *work, void *val)
{
	unsigned int bit;

	bit = hash_ptr(val, IO_WQ_HASH_ORDER);
	work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT));
}

static bool __io_wq_worker_cancel(struct io_worker *worker,
				  struct io_cb_cancel_data *match,
				  struct io_wq_work *work)
{
	if (work && match->fn(work, match->data)) {
		work->flags |= IO_WQ_WORK_CANCEL;
		set_notify_signal(worker->task);
		return true;
	}

	return false;
}

static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
{
	struct io_cb_cancel_data *match = data;

	/*
	 * Hold the lock to avoid ->cur_work going out of scope, caller
	 * may dereference the passed in work.
	 */
	raw_spin_lock(&worker->lock);
	if (__io_wq_worker_cancel(worker, match, worker->cur_work) ||
	    __io_wq_worker_cancel(worker, match, worker->next_work))
		match->nr_running++;
	raw_spin_unlock(&worker->lock);

	return match->nr_running && !match->cancel_all;
}

static inline void io_wqe_remove_pending(struct io_wqe *wqe,
					 struct io_wq_work *work,
					 struct io_wq_work_node *prev)
{
	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
	unsigned int hash = io_get_work_hash(work);
	struct io_wq_work *prev_work = NULL;

	if (io_wq_is_hashed(work) && work == wqe->hash_tail[hash]) {
		if (prev)
			prev_work = container_of(prev, struct io_wq_work, list);
		if (prev_work && io_get_work_hash(prev_work) == hash)
			wqe->hash_tail[hash] = prev_work;
		else
			wqe->hash_tail[hash] = NULL;
	}
	wq_list_del(&acct->work_list, &work->list, prev);
}

static bool io_acct_cancel_pending_work(struct io_wqe *wqe,
					struct io_wqe_acct *acct,
					struct io_cb_cancel_data *match)
	__releases(wqe->lock)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work;

	wq_list_for_each(node, prev, &acct->work_list) {
		work = container_of(node, struct io_wq_work, list);
		if (!match->fn(work, match->data))
			continue;
		io_wqe_remove_pending(wqe, work, prev);
		raw_spin_unlock(&wqe->lock);
		io_run_cancel(work, wqe);
		match->nr_pending++;
		/* not safe to continue after unlock */
		return true;
	}

	return false;
}

static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
				       struct io_cb_cancel_data *match)
{
	int i;
retry:
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wqe_acct *acct = io_get_acct(wqe, i == 0);

		if (io_acct_cancel_pending_work(wqe, acct, match)) {
			raw_spin_lock(&wqe->lock);
			if (match->cancel_all)
				goto retry;
			break;
		}
	}
}

static void io_wqe_cancel_running_work(struct io_wqe *wqe,
				       struct io_cb_cancel_data *match)
{
	rcu_read_lock();
	io_wq_for_each_worker(wqe, io_wq_worker_cancel, match);
	rcu_read_unlock();
}

enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
				  void *data, bool cancel_all)
{
	struct io_cb_cancel_data match = {
		.fn		= cancel,
		.data		= data,
		.cancel_all	= cancel_all,
	};
	int node;

	/*
	 * First check pending list, if we're lucky we can just remove it
	 * from there. CANCEL_OK means that the work is returned as-new,
	 * no completion will be posted for it.
	 *
	 * Then check if a free (going busy) or busy worker has the work
	 * currently running. If we find it there, we'll return CANCEL_RUNNING
	 * as an indication that we attempt to signal cancellation. The
	 * completion will run normally in this case.
	 *
	 * Do both of these while holding the wqe->lock, to ensure that
	 * we'll find a work item regardless of state.
	 */
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		raw_spin_lock(&wqe->lock);
		io_wqe_cancel_pending_work(wqe, &match);
		if (match.nr_pending && !match.cancel_all) {
			raw_spin_unlock(&wqe->lock);
			return IO_WQ_CANCEL_OK;
		}

		io_wqe_cancel_running_work(wqe, &match);
		raw_spin_unlock(&wqe->lock);
		if (match.nr_running && !match.cancel_all)
			return IO_WQ_CANCEL_RUNNING;
	}

	if (match.nr_running)
		return IO_WQ_CANCEL_RUNNING;
	if (match.nr_pending)
		return IO_WQ_CANCEL_OK;
	return IO_WQ_CANCEL_NOTFOUND;
}

static int io_wqe_hash_wake(struct wait_queue_entry *wait, unsigned mode,
			    int sync, void *key)
{
	struct io_wqe *wqe = container_of(wait, struct io_wqe, wait);
	int i;

	list_del_init(&wait->entry);

	rcu_read_lock();
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wqe_acct *acct = &wqe->acct[i];

		if (test_and_clear_bit(IO_ACCT_STALLED_BIT, &acct->flags))
			io_wqe_activate_free_worker(wqe, acct);
	}
	rcu_read_unlock();
	return 1;
}

struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
{
	int ret, node, i;
	struct io_wq *wq;

	if (WARN_ON_ONCE(!data->free_work || !data->do_work))
		return ERR_PTR(-EINVAL);
	if (WARN_ON_ONCE(!bounded))
		return ERR_PTR(-EINVAL);

	wq = kzalloc(struct_size(wq, wqes, nr_node_ids), GFP_KERNEL);
	if (!wq)
		return ERR_PTR(-ENOMEM);
	ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	if (ret)
		goto err_wq;

	refcount_inc(&data->hash->refs);
	wq->hash = data->hash;
	wq->free_work = data->free_work;
	wq->do_work = data->do_work;

	ret = -ENOMEM;
	for_each_node(node) {
		struct io_wqe *wqe;
		int alloc_node = node;

		if (!node_online(alloc_node))
			alloc_node = NUMA_NO_NODE;
		wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, alloc_node);
		if (!wqe)
			goto err;
		if (!alloc_cpumask_var(&wqe->cpu_mask, GFP_KERNEL))
			goto err;
		cpumask_copy(wqe->cpu_mask, cpumask_of_node(node));
		wq->wqes[node] = wqe;
		wqe->node = alloc_node;
		wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
		wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
					task_rlimit(current, RLIMIT_NPROC);
		INIT_LIST_HEAD(&wqe->wait.entry);
		wqe->wait.func = io_wqe_hash_wake;
		for (i = 0; i < IO_WQ_ACCT_NR; i++) {
			struct io_wqe_acct *acct = &wqe->acct[i];

			acct->index = i;
			atomic_set(&acct->nr_running, 0);
			INIT_WQ_LIST(&acct->work_list);
		}
		wqe->wq = wq;
		raw_spin_lock_init(&wqe->lock);
		INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
		INIT_LIST_HEAD(&wqe->all_list);
	}

	wq->task = get_task_struct(data->task);
	atomic_set(&wq->worker_refs, 1);
	init_completion(&wq->worker_done);
	return wq;
err:
	io_wq_put_hash(data->hash);
	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	for_each_node(node) {
		if (!wq->wqes[node])
			continue;
		free_cpumask_var(wq->wqes[node]->cpu_mask);
		kfree(wq->wqes[node]);
	}
err_wq:
	kfree(wq);
	return ERR_PTR(ret);
}

static bool io_task_work_match(struct callback_head *cb, void *data)
{
	struct io_worker *worker;

	if (cb->func != create_worker_cb && cb->func != create_worker_cont)
		return false;
	worker = container_of(cb, struct io_worker, create_work);
	return worker->wqe->wq == data;
}

void io_wq_exit_start(struct io_wq *wq)
{
	set_bit(IO_WQ_BIT_EXIT, &wq->state);
}

static void io_wq_cancel_tw_create(struct io_wq *wq)
{
	struct callback_head *cb;

	while ((cb = task_work_cancel_match(wq->task, io_task_work_match, wq)) != NULL) {
		struct io_worker *worker;

		worker = container_of(cb, struct io_worker, create_work);
		io_worker_cancel_cb(worker);
	}
}

static void io_wq_exit_workers(struct io_wq *wq)
{
	int node;

	if (!wq->task)
		return;

	io_wq_cancel_tw_create(wq);

	rcu_read_lock();
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wq_for_each_worker(wqe, io_wq_worker_wake, NULL);
	}
	rcu_read_unlock();
	io_worker_ref_put(wq);
	wait_for_completion(&wq->worker_done);

	for_each_node(node) {
		spin_lock_irq(&wq->hash->wait.lock);
		list_del_init(&wq->wqes[node]->wait.entry);
		spin_unlock_irq(&wq->hash->wait.lock);
	}
	put_task_struct(wq->task);
	wq->task = NULL;
}

static void io_wq_destroy(struct io_wq *wq)
{
	int node;

	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);

	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];
		struct io_cb_cancel_data match = {
			.fn		= io_wq_work_match_all,
			.cancel_all	= true,
		};
		raw_spin_lock(&wqe->lock);
		io_wqe_cancel_pending_work(wqe, &match);
		raw_spin_unlock(&wqe->lock);
		free_cpumask_var(wqe->cpu_mask);
		kfree(wqe);
	}
	io_wq_put_hash(wq->hash);
	kfree(wq);
}

void io_wq_put_and_exit(struct io_wq *wq)
{
	WARN_ON_ONCE(!test_bit(IO_WQ_BIT_EXIT, &wq->state));

	io_wq_exit_workers(wq);
	io_wq_destroy(wq);
}

struct online_data {
	unsigned int cpu;
	bool online;
};

static bool io_wq_worker_affinity(struct io_worker *worker, void *data)
{
	struct online_data *od = data;

	if (od->online)
		cpumask_set_cpu(od->cpu, worker->wqe->cpu_mask);
	else
		cpumask_clear_cpu(od->cpu, worker->wqe->cpu_mask);
	return false;
}

static int __io_wq_cpu_online(struct io_wq *wq, unsigned int cpu, bool online)
{
	struct online_data od = {
		.cpu = cpu,
		.online = online
	};
	int i;

	rcu_read_lock();
	for_each_node(i)
		io_wq_for_each_worker(wq->wqes[i], io_wq_worker_affinity, &od);
	rcu_read_unlock();
	return 0;
}

static int io_wq_cpu_online(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);

	return __io_wq_cpu_online(wq, cpu, true);
}

static int io_wq_cpu_offline(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);

	return __io_wq_cpu_online(wq, cpu, false);
}

int io_wq_cpu_affinity(struct io_wq *wq, cpumask_var_t mask)
{
	int i;

	rcu_read_lock();
	for_each_node(i) {
		struct io_wqe *wqe = wq->wqes[i];

		if (mask)
			cpumask_copy(wqe->cpu_mask, mask);
		else
			cpumask_copy(wqe->cpu_mask, cpumask_of_node(i));
	}
	rcu_read_unlock();
	return 0;
}

/*
 * Set max number of unbounded workers, returns old value. If new_count is 0,
 * then just return the old value.
 */
int io_wq_max_workers(struct io_wq *wq, int *new_count)
{
	int prev[IO_WQ_ACCT_NR];
	bool first_node = true;
	int i, node;

	BUILD_BUG_ON((int) IO_WQ_ACCT_BOUND   != (int) IO_WQ_BOUND);
	BUILD_BUG_ON((int) IO_WQ_ACCT_UNBOUND != (int) IO_WQ_UNBOUND);
	BUILD_BUG_ON((int) IO_WQ_ACCT_NR      != 2);

	for (i = 0; i < 2; i++) {
		if (new_count[i] > task_rlimit(current, RLIMIT_NPROC))
			new_count[i] = task_rlimit(current, RLIMIT_NPROC);
	}

	for (i = 0; i < IO_WQ_ACCT_NR; i++)
		prev[i] = 0;

	rcu_read_lock();
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];
		struct io_wqe_acct *acct;

		raw_spin_lock(&wqe->lock);
		for (i = 0; i < IO_WQ_ACCT_NR; i++) {
			acct = &wqe->acct[i];
			if (first_node)
				prev[i] = max_t(int, acct->max_workers, prev[i]);
			if (new_count[i])
				acct->max_workers = new_count[i];
		}
		raw_spin_unlock(&wqe->lock);
		first_node = false;
	}
	rcu_read_unlock();

	for (i = 0; i < IO_WQ_ACCT_NR; i++)
		new_count[i] = prev[i];

	return 0;
}

static __init int io_wq_init(void)
{
	int ret;

	ret = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "io-wq/online",
					io_wq_cpu_online, io_wq_cpu_offline);
	if (ret < 0)
		return ret;
	io_wq_online = ret;
	return 0;
}
subsys_initcall(io_wq_init);