Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux

Merge tag 'for-6.2/io_uring-next-2022-12-08' of git://git.kernel.dk/linux

Pull io_uring updates part two from Jens Axboe:

- Misc fixes (me, Lin)

- Series from Pavel extending the single-task exclusive ring mode,
  yielding nice improvements for the common case of having a single
  ring per thread (Pavel)

- Cleanup for MSG_RING, removing our IOPOLL hack (Pavel)

- Further poll cleanups and fixes (Pavel)

- Misc cleanups and fixes (Pavel)

* tag 'for-6.2/io_uring-next-2022-12-08' of git://git.kernel.dk/linux: (22 commits)
io_uring/msg_ring: flag target ring as having task_work, if needed
io_uring: skip spinlocking for ->task_complete
io_uring: do msg_ring in target task via tw
io_uring: extract a io_msg_install_complete helper
io_uring: get rid of double locking
io_uring: never run tw and fallback in parallel
io_uring: use tw for putting rsrc
io_uring: force multishot CQEs into task context
io_uring: complete all requests in task context
io_uring: don't check overflow flush failures
io_uring: skip overflow CQE posting for dying ring
io_uring: improve io_double_lock_ctx fail handling
io_uring: dont remove file from msg_ring reqs
io_uring: reshuffle issue_flags
io_uring: don't reinstall quiesce node for each tw
io_uring: improve rsrc quiesce refs checks
io_uring: don't raw spin unlock to match cq_lock
io_uring: combine poll tw handlers
io_uring: improve poll warning handling
io_uring: remove ctx variable in io_poll_check_events
...

+370 -197
+7 -6
include/linux/io_uring.h
··· 9 9 enum io_uring_cmd_flags { 10 10 IO_URING_F_COMPLETE_DEFER = 1, 11 11 IO_URING_F_UNLOCKED = 2, 12 + /* the request is executed from poll, it should not be freed */ 13 + IO_URING_F_MULTISHOT = 4, 14 + /* executed by io-wq */ 15 + IO_URING_F_IOWQ = 8, 12 16 /* int's last bit, sign checks are usually faster than a bit test */ 13 17 IO_URING_F_NONBLOCK = INT_MIN, 14 18 15 19 /* ctx state flags, for URING_CMD */ 16 - IO_URING_F_SQE128 = 4, 17 - IO_URING_F_CQE32 = 8, 18 - IO_URING_F_IOPOLL = 16, 19 - 20 - /* the request is executed from poll, it should not be freed */ 21 - IO_URING_F_MULTISHOT = 32, 20 + IO_URING_F_SQE128 = (1 << 8), 21 + IO_URING_F_CQE32 = (1 << 9), 22 + IO_URING_F_IOPOLL = (1 << 10), 22 23 }; 23 24 24 25 struct io_uring_cmd {
+3
include/linux/io_uring_types.h
··· 208 208 unsigned int drain_disabled: 1; 209 209 unsigned int has_evfd: 1; 210 210 unsigned int syscall_iopoll: 1; 211 + /* all CQEs should be posted only by the submitter task */ 212 + unsigned int task_complete: 1; 211 213 } ____cacheline_aligned_in_smp; 212 214 213 215 /* submission data */ ··· 328 326 struct io_rsrc_data *buf_data; 329 327 330 328 struct delayed_work rsrc_put_work; 329 + struct callback_head rsrc_put_tw; 331 330 struct llist_head rsrc_put_llist; 332 331 struct list_head rsrc_ref_list; 333 332 spinlock_t rsrc_ref_lock;
+110 -57
io_uring/io_uring.c
··· 149 149 static void io_queue_sqe(struct io_kiocb *req); 150 150 static void io_move_task_work_from_local(struct io_ring_ctx *ctx); 151 151 static void __io_submit_flush_completions(struct io_ring_ctx *ctx); 152 + static __cold void io_fallback_tw(struct io_uring_task *tctx); 152 153 153 154 static struct kmem_cache *req_cachep; 154 155 ··· 327 326 spin_lock_init(&ctx->rsrc_ref_lock); 328 327 INIT_LIST_HEAD(&ctx->rsrc_ref_list); 329 328 INIT_DELAYED_WORK(&ctx->rsrc_put_work, io_rsrc_put_work); 329 + init_task_work(&ctx->rsrc_put_tw, io_rsrc_put_tw); 330 330 init_llist_head(&ctx->rsrc_put_llist); 331 331 init_llist_head(&ctx->work_llist); 332 332 INIT_LIST_HEAD(&ctx->tctx_list); ··· 584 582 io_eventfd_flush_signal(ctx); 585 583 } 586 584 585 + static inline void __io_cq_lock(struct io_ring_ctx *ctx) 586 + __acquires(ctx->completion_lock) 587 + { 588 + if (!ctx->task_complete) 589 + spin_lock(&ctx->completion_lock); 590 + } 591 + 592 + static inline void __io_cq_unlock(struct io_ring_ctx *ctx) 593 + { 594 + if (!ctx->task_complete) 595 + spin_unlock(&ctx->completion_lock); 596 + } 597 + 587 598 /* keep it inlined for io_submit_flush_completions() */ 588 - static inline void io_cq_unlock_post_inline(struct io_ring_ctx *ctx) 599 + static inline void __io_cq_unlock_post(struct io_ring_ctx *ctx) 589 600 __releases(ctx->completion_lock) 590 601 { 591 602 io_commit_cqring(ctx); 592 - spin_unlock(&ctx->completion_lock); 593 - 603 + __io_cq_unlock(ctx); 594 604 io_commit_cqring_flush(ctx); 595 605 io_cqring_wake(ctx); 596 606 } ··· 610 596 void io_cq_unlock_post(struct io_ring_ctx *ctx) 611 597 __releases(ctx->completion_lock) 612 598 { 613 - io_cq_unlock_post_inline(ctx); 599 + io_commit_cqring(ctx); 600 + spin_unlock(&ctx->completion_lock); 601 + io_commit_cqring_flush(ctx); 602 + io_cqring_wake(ctx); 614 603 } 615 604 616 605 /* Returns true if there are no backlogged entries after the flush */ 617 - static bool __io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool 
force) 606 + static void io_cqring_overflow_kill(struct io_ring_ctx *ctx) 618 607 { 619 - bool all_flushed; 608 + struct io_overflow_cqe *ocqe; 609 + LIST_HEAD(list); 610 + 611 + io_cq_lock(ctx); 612 + list_splice_init(&ctx->cq_overflow_list, &list); 613 + clear_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq); 614 + io_cq_unlock(ctx); 615 + 616 + while (!list_empty(&list)) { 617 + ocqe = list_first_entry(&list, struct io_overflow_cqe, list); 618 + list_del(&ocqe->list); 619 + kfree(ocqe); 620 + } 621 + } 622 + 623 + /* Returns true if there are no backlogged entries after the flush */ 624 + static void __io_cqring_overflow_flush(struct io_ring_ctx *ctx) 625 + { 620 626 size_t cqe_size = sizeof(struct io_uring_cqe); 621 627 622 - if (!force && __io_cqring_events(ctx) == ctx->cq_entries) 623 - return false; 628 + if (__io_cqring_events(ctx) == ctx->cq_entries) 629 + return; 624 630 625 631 if (ctx->flags & IORING_SETUP_CQE32) 626 632 cqe_size <<= 1; ··· 650 616 struct io_uring_cqe *cqe = io_get_cqe_overflow(ctx, true); 651 617 struct io_overflow_cqe *ocqe; 652 618 653 - if (!cqe && !force) 619 + if (!cqe) 654 620 break; 655 621 ocqe = list_first_entry(&ctx->cq_overflow_list, 656 622 struct io_overflow_cqe, list); 657 - if (cqe) 658 - memcpy(cqe, &ocqe->cqe, cqe_size); 659 - else 660 - io_account_cq_overflow(ctx); 661 - 623 + memcpy(cqe, &ocqe->cqe, cqe_size); 662 624 list_del(&ocqe->list); 663 625 kfree(ocqe); 664 626 } 665 627 666 - all_flushed = list_empty(&ctx->cq_overflow_list); 667 - if (all_flushed) { 628 + if (list_empty(&ctx->cq_overflow_list)) { 668 629 clear_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq); 669 630 atomic_andnot(IORING_SQ_CQ_OVERFLOW, &ctx->rings->sq_flags); 670 631 } 671 - 672 632 io_cq_unlock_post(ctx); 673 - return all_flushed; 674 633 } 675 634 676 - static bool io_cqring_overflow_flush(struct io_ring_ctx *ctx) 635 + static void io_cqring_overflow_flush(struct io_ring_ctx *ctx) 677 636 { 678 - bool ret = true; 679 - 680 637 if 
(test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq)) { 681 638 /* iopoll syncs against uring_lock, not completion_lock */ 682 639 if (ctx->flags & IORING_SETUP_IOPOLL) 683 640 mutex_lock(&ctx->uring_lock); 684 - ret = __io_cqring_overflow_flush(ctx, false); 641 + __io_cqring_overflow_flush(ctx); 685 642 if (ctx->flags & IORING_SETUP_IOPOLL) 686 643 mutex_unlock(&ctx->uring_lock); 687 644 } 688 - 689 - return ret; 690 645 } 691 646 692 647 void __io_put_task(struct task_struct *task, int nr) ··· 800 777 return &rings->cqes[off]; 801 778 } 802 779 803 - static bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags, 804 - bool allow_overflow) 780 + static bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data, s32 res, 781 + u32 cflags) 805 782 { 806 783 struct io_uring_cqe *cqe; 807 784 808 - lockdep_assert_held(&ctx->completion_lock); 785 + if (!ctx->task_complete) 786 + lockdep_assert_held(&ctx->completion_lock); 809 787 810 788 ctx->cq_extra++; 811 789 ··· 829 805 } 830 806 return true; 831 807 } 832 - 833 - if (allow_overflow) 834 - return io_cqring_event_overflow(ctx, user_data, res, cflags, 0, 0); 835 - 836 808 return false; 837 809 } 838 810 ··· 842 822 for (i = 0; i < state->cqes_count; i++) { 843 823 struct io_uring_cqe *cqe = &state->cqes[i]; 844 824 845 - io_fill_cqe_aux(ctx, cqe->user_data, cqe->res, cqe->flags, true); 825 + if (!io_fill_cqe_aux(ctx, cqe->user_data, cqe->res, cqe->flags)) { 826 + if (ctx->task_complete) { 827 + spin_lock(&ctx->completion_lock); 828 + io_cqring_event_overflow(ctx, cqe->user_data, 829 + cqe->res, cqe->flags, 0, 0); 830 + spin_unlock(&ctx->completion_lock); 831 + } else { 832 + io_cqring_event_overflow(ctx, cqe->user_data, 833 + cqe->res, cqe->flags, 0, 0); 834 + } 835 + } 846 836 } 847 837 state->cqes_count = 0; 848 838 } ··· 863 833 bool filled; 864 834 865 835 io_cq_lock(ctx); 866 - filled = io_fill_cqe_aux(ctx, user_data, res, cflags, allow_overflow); 836 + filled = io_fill_cqe_aux(ctx, 
user_data, res, cflags); 837 + if (!filled && allow_overflow) 838 + filled = io_cqring_event_overflow(ctx, user_data, res, cflags, 0, 0); 839 + 867 840 io_cq_unlock_post(ctx); 868 841 return filled; 869 842 } ··· 890 857 lockdep_assert_held(&ctx->uring_lock); 891 858 892 859 if (ctx->submit_state.cqes_count == length) { 893 - io_cq_lock(ctx); 860 + __io_cq_lock(ctx); 894 861 __io_flush_post_cqes(ctx); 895 862 /* no need to flush - flush is deferred */ 896 - spin_unlock(&ctx->completion_lock); 863 + __io_cq_unlock_post(ctx); 897 864 } 898 865 899 866 /* For defered completions this is not as strict as it is otherwise, ··· 948 915 949 916 void io_req_complete_post(struct io_kiocb *req, unsigned issue_flags) 950 917 { 951 - if (!(issue_flags & IO_URING_F_UNLOCKED) || 952 - !(req->ctx->flags & IORING_SETUP_IOPOLL)) { 918 + if (req->ctx->task_complete && (issue_flags & IO_URING_F_IOWQ)) { 919 + req->io_task_work.func = io_req_task_complete; 920 + io_req_task_work_add(req); 921 + } else if (!(issue_flags & IO_URING_F_UNLOCKED) || 922 + !(req->ctx->flags & IORING_SETUP_IOPOLL)) { 953 923 __io_req_complete_post(req); 954 924 } else { 955 925 struct io_ring_ctx *ctx = req->ctx; ··· 1175 1139 struct io_uring_task *tctx = container_of(cb, struct io_uring_task, 1176 1140 task_work); 1177 1141 struct llist_node fake = {}; 1178 - struct llist_node *node = io_llist_xchg(&tctx->task_list, &fake); 1142 + struct llist_node *node; 1179 1143 unsigned int loops = 1; 1180 - unsigned int count = handle_tw_list(node, &ctx, &uring_locked, NULL); 1144 + unsigned int count; 1181 1145 1146 + if (unlikely(current->flags & PF_EXITING)) { 1147 + io_fallback_tw(tctx); 1148 + return; 1149 + } 1150 + 1151 + node = io_llist_xchg(&tctx->task_list, &fake); 1152 + count = handle_tw_list(node, &ctx, &uring_locked, NULL); 1182 1153 node = io_llist_cmpxchg(&tctx->task_list, &fake, NULL); 1183 1154 while (node != &fake) { 1184 1155 loops++; ··· 1428 1385 struct io_wq_work_node *node, *prev; 1429 1386 
struct io_submit_state *state = &ctx->submit_state; 1430 1387 1431 - io_cq_lock(ctx); 1388 + __io_cq_lock(ctx); 1432 1389 /* must come first to preserve CQE ordering in failure cases */ 1433 1390 if (state->cqes_count) 1434 1391 __io_flush_post_cqes(ctx); ··· 1436 1393 struct io_kiocb *req = container_of(node, struct io_kiocb, 1437 1394 comp_list); 1438 1395 1439 - if (!(req->flags & REQ_F_CQE_SKIP)) 1440 - __io_fill_cqe_req(ctx, req); 1396 + if (!(req->flags & REQ_F_CQE_SKIP) && 1397 + unlikely(!__io_fill_cqe_req(ctx, req))) { 1398 + if (ctx->task_complete) { 1399 + spin_lock(&ctx->completion_lock); 1400 + io_req_cqe_overflow(req); 1401 + spin_unlock(&ctx->completion_lock); 1402 + } else { 1403 + io_req_cqe_overflow(req); 1404 + } 1405 + } 1441 1406 } 1442 - io_cq_unlock_post_inline(ctx); 1407 + __io_cq_unlock_post(ctx); 1443 1408 1444 1409 if (!wq_list_empty(&ctx->submit_state.compl_reqs)) { 1445 1410 io_free_batch_list(ctx, state->compl_reqs.first); ··· 1518 1467 check_cq = READ_ONCE(ctx->check_cq); 1519 1468 if (unlikely(check_cq)) { 1520 1469 if (check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT)) 1521 - __io_cqring_overflow_flush(ctx, false); 1470 + __io_cqring_overflow_flush(ctx); 1522 1471 /* 1523 1472 * Similarly do not spin if we have not informed the user of any 1524 1473 * dropped CQE. 
··· 1850 1799 return ret; 1851 1800 1852 1801 /* If the op doesn't have a file, we're not polling for it */ 1853 - if ((req->ctx->flags & IORING_SETUP_IOPOLL) && req->file) 1802 + if ((req->ctx->flags & IORING_SETUP_IOPOLL) && def->iopoll_queue) 1854 1803 io_iopoll_req_issued(req, issue_flags); 1855 1804 1856 1805 return 0; ··· 1859 1808 int io_poll_issue(struct io_kiocb *req, bool *locked) 1860 1809 { 1861 1810 io_tw_lock(req->ctx, locked); 1862 - if (unlikely(req->task->flags & PF_EXITING)) 1863 - return -EFAULT; 1864 1811 return io_issue_sqe(req, IO_URING_F_NONBLOCK|IO_URING_F_MULTISHOT| 1865 1812 IO_URING_F_COMPLETE_DEFER); 1866 1813 } ··· 1875 1826 { 1876 1827 struct io_kiocb *req = container_of(work, struct io_kiocb, work); 1877 1828 const struct io_op_def *def = &io_op_defs[req->opcode]; 1878 - unsigned int issue_flags = IO_URING_F_UNLOCKED; 1829 + unsigned int issue_flags = IO_URING_F_UNLOCKED | IO_URING_F_IOWQ; 1879 1830 bool needs_poll = false; 1880 1831 int ret = 0, err = -ECANCELED; 1881 1832 ··· 2531 2482 2532 2483 trace_io_uring_cqring_wait(ctx, min_events); 2533 2484 do { 2534 - /* if we can't even flush overflow, don't wait for more */ 2535 - if (!io_cqring_overflow_flush(ctx)) { 2536 - ret = -EBUSY; 2537 - break; 2538 - } 2485 + io_cqring_overflow_flush(ctx); 2539 2486 prepare_to_wait_exclusive(&ctx->cq_wait, &iowq.wq, 2540 2487 TASK_INTERRUPTIBLE); 2541 2488 ret = io_cqring_wait_schedule(ctx, &iowq, timeout); ··· 2682 2637 __io_sqe_buffers_unregister(ctx); 2683 2638 if (ctx->file_data) 2684 2639 __io_sqe_files_unregister(ctx); 2685 - if (ctx->rings) 2686 - __io_cqring_overflow_flush(ctx, true); 2640 + io_cqring_overflow_kill(ctx); 2687 2641 io_eventfd_unregister(ctx); 2688 2642 io_alloc_cache_free(&ctx->apoll_cache, io_apoll_cache_free); 2689 2643 io_alloc_cache_free(&ctx->netmsg_cache, io_netmsg_cache_free); ··· 2825 2781 * as nobody else will be looking for them. 
2826 2782 */ 2827 2783 do { 2784 + if (test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq)) { 2785 + mutex_lock(&ctx->uring_lock); 2786 + io_cqring_overflow_kill(ctx); 2787 + mutex_unlock(&ctx->uring_lock); 2788 + } 2789 + 2828 2790 if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) 2829 2791 io_move_task_work_from_local(ctx); 2830 2792 ··· 2896 2846 2897 2847 mutex_lock(&ctx->uring_lock); 2898 2848 percpu_ref_kill(&ctx->refs); 2899 - if (ctx->rings) 2900 - __io_cqring_overflow_flush(ctx, true); 2901 2849 xa_for_each(&ctx->personalities, index, creds) 2902 2850 io_unregister_personality(ctx, index); 2903 2851 if (ctx->rings) ··· 3536 3488 ctx = io_ring_ctx_alloc(p); 3537 3489 if (!ctx) 3538 3490 return -ENOMEM; 3491 + 3492 + if ((ctx->flags & IORING_SETUP_DEFER_TASKRUN) && 3493 + !(ctx->flags & IORING_SETUP_IOPOLL) && 3494 + !(ctx->flags & IORING_SETUP_SQPOLL)) 3495 + ctx->task_complete = true; 3539 3496 3540 3497 /* 3541 3498 * When SETUP_IOPOLL and SETUP_SQPOLL are both enabled, user
+14 -1
io_uring/io_uring.h
··· 93 93 spin_lock(&ctx->completion_lock); 94 94 } 95 95 96 + static inline void io_cq_unlock(struct io_ring_ctx *ctx) 97 + { 98 + spin_unlock(&ctx->completion_lock); 99 + } 100 + 96 101 void io_cq_unlock_post(struct io_ring_ctx *ctx); 97 102 98 103 static inline struct io_uring_cqe *io_get_cqe_overflow(struct io_ring_ctx *ctx, ··· 133 128 */ 134 129 cqe = io_get_cqe(ctx); 135 130 if (unlikely(!cqe)) 136 - return io_req_cqe_overflow(req); 131 + return false; 137 132 138 133 trace_io_uring_complete(req->ctx, req, req->cqe.user_data, 139 134 req->cqe.res, req->cqe.flags, ··· 154 149 WRITE_ONCE(cqe->big_cqe[1], extra2); 155 150 } 156 151 return true; 152 + } 153 + 154 + static inline bool io_fill_cqe_req(struct io_ring_ctx *ctx, 155 + struct io_kiocb *req) 156 + { 157 + if (likely(__io_fill_cqe_req(ctx, req))) 158 + return true; 159 + return io_req_cqe_overflow(req); 157 160 } 158 161 159 162 static inline void req_set_fail(struct io_kiocb *req)
+118 -48
io_uring/msg_ring.c
··· 15 15 16 16 struct io_msg { 17 17 struct file *file; 18 + struct file *src_file; 19 + struct callback_head tw; 18 20 u64 user_data; 19 21 u32 len; 20 22 u32 cmd; ··· 24 22 u32 dst_fd; 25 23 u32 flags; 26 24 }; 25 + 26 + void io_msg_ring_cleanup(struct io_kiocb *req) 27 + { 28 + struct io_msg *msg = io_kiocb_to_cmd(req, struct io_msg); 29 + 30 + if (WARN_ON_ONCE(!msg->src_file)) 31 + return; 32 + 33 + fput(msg->src_file); 34 + msg->src_file = NULL; 35 + } 36 + 37 + static void io_msg_tw_complete(struct callback_head *head) 38 + { 39 + struct io_msg *msg = container_of(head, struct io_msg, tw); 40 + struct io_kiocb *req = cmd_to_io_kiocb(msg); 41 + struct io_ring_ctx *target_ctx = req->file->private_data; 42 + int ret = 0; 43 + 44 + if (current->flags & PF_EXITING) 45 + ret = -EOWNERDEAD; 46 + else if (!io_post_aux_cqe(target_ctx, msg->user_data, msg->len, 0)) 47 + ret = -EOVERFLOW; 48 + 49 + if (ret < 0) 50 + req_set_fail(req); 51 + io_req_queue_tw_complete(req, ret); 52 + } 27 53 28 54 static int io_msg_ring_data(struct io_kiocb *req) 29 55 { ··· 61 31 if (msg->src_fd || msg->dst_fd || msg->flags) 62 32 return -EINVAL; 63 33 34 + if (target_ctx->task_complete && current != target_ctx->submitter_task) { 35 + init_task_work(&msg->tw, io_msg_tw_complete); 36 + if (task_work_add(target_ctx->submitter_task, &msg->tw, 37 + TWA_SIGNAL_NO_IPI)) 38 + return -EOWNERDEAD; 39 + 40 + atomic_or(IORING_SQ_TASKRUN, &target_ctx->rings->sq_flags); 41 + return IOU_ISSUE_SKIP_COMPLETE; 42 + } 43 + 64 44 if (io_post_aux_cqe(target_ctx, msg->user_data, msg->len, 0)) 65 45 return 0; 66 46 67 47 return -EOVERFLOW; 68 48 } 69 49 70 - static void io_double_unlock_ctx(struct io_ring_ctx *ctx, 71 - struct io_ring_ctx *octx, 50 + static void io_double_unlock_ctx(struct io_ring_ctx *octx, 72 51 unsigned int issue_flags) 73 52 { 74 - if (issue_flags & IO_URING_F_UNLOCKED) 75 - mutex_unlock(&ctx->uring_lock); 76 53 mutex_unlock(&octx->uring_lock); 77 54 } 78 55 79 - static int 
io_double_lock_ctx(struct io_ring_ctx *ctx, 80 - struct io_ring_ctx *octx, 56 + static int io_double_lock_ctx(struct io_ring_ctx *octx, 81 57 unsigned int issue_flags) 82 58 { 83 59 /* ··· 96 60 return -EAGAIN; 97 61 return 0; 98 62 } 99 - 100 - /* Always grab smallest value ctx first. We know ctx != octx. */ 101 - if (ctx < octx) { 102 - mutex_lock(&ctx->uring_lock); 103 - mutex_lock(&octx->uring_lock); 104 - } else { 105 - mutex_lock(&octx->uring_lock); 106 - mutex_lock(&ctx->uring_lock); 107 - } 108 - 63 + mutex_lock(&octx->uring_lock); 109 64 return 0; 110 65 } 111 66 112 - static int io_msg_send_fd(struct io_kiocb *req, unsigned int issue_flags) 67 + static struct file *io_msg_grab_file(struct io_kiocb *req, unsigned int issue_flags) 68 + { 69 + struct io_msg *msg = io_kiocb_to_cmd(req, struct io_msg); 70 + struct io_ring_ctx *ctx = req->ctx; 71 + struct file *file = NULL; 72 + unsigned long file_ptr; 73 + int idx = msg->src_fd; 74 + 75 + io_ring_submit_lock(ctx, issue_flags); 76 + if (likely(idx < ctx->nr_user_files)) { 77 + idx = array_index_nospec(idx, ctx->nr_user_files); 78 + file_ptr = io_fixed_file_slot(&ctx->file_table, idx)->file_ptr; 79 + file = (struct file *) (file_ptr & FFS_MASK); 80 + if (file) 81 + get_file(file); 82 + } 83 + io_ring_submit_unlock(ctx, issue_flags); 84 + return file; 85 + } 86 + 87 + static int io_msg_install_complete(struct io_kiocb *req, unsigned int issue_flags) 113 88 { 114 89 struct io_ring_ctx *target_ctx = req->file->private_data; 115 90 struct io_msg *msg = io_kiocb_to_cmd(req, struct io_msg); 116 - struct io_ring_ctx *ctx = req->ctx; 117 - unsigned long file_ptr; 118 - struct file *src_file; 91 + struct file *src_file = msg->src_file; 119 92 int ret; 120 93 121 - if (target_ctx == ctx) 122 - return -EINVAL; 123 - 124 - ret = io_double_lock_ctx(ctx, target_ctx, issue_flags); 125 - if (unlikely(ret)) 126 - return ret; 127 - 128 - ret = -EBADF; 129 - if (unlikely(msg->src_fd >= ctx->nr_user_files)) 130 - goto out_unlock; 
131 - 132 - msg->src_fd = array_index_nospec(msg->src_fd, ctx->nr_user_files); 133 - file_ptr = io_fixed_file_slot(&ctx->file_table, msg->src_fd)->file_ptr; 134 - if (!file_ptr) 135 - goto out_unlock; 136 - 137 - src_file = (struct file *) (file_ptr & FFS_MASK); 138 - get_file(src_file); 94 + if (unlikely(io_double_lock_ctx(target_ctx, issue_flags))) 95 + return -EAGAIN; 139 96 140 97 ret = __io_fixed_fd_install(target_ctx, src_file, msg->dst_fd); 141 - if (ret < 0) { 142 - fput(src_file); 98 + if (ret < 0) 143 99 goto out_unlock; 144 - } 100 + 101 + msg->src_file = NULL; 102 + req->flags &= ~REQ_F_NEED_CLEANUP; 145 103 146 104 if (msg->flags & IORING_MSG_RING_CQE_SKIP) 147 105 goto out_unlock; 148 - 149 106 /* 150 107 * If this fails, the target still received the file descriptor but 151 108 * wasn't notified of the fact. This means that if this request ··· 148 119 if (!io_post_aux_cqe(target_ctx, msg->user_data, msg->len, 0)) 149 120 ret = -EOVERFLOW; 150 121 out_unlock: 151 - io_double_unlock_ctx(ctx, target_ctx, issue_flags); 122 + io_double_unlock_ctx(target_ctx, issue_flags); 152 123 return ret; 124 + } 125 + 126 + static void io_msg_tw_fd_complete(struct callback_head *head) 127 + { 128 + struct io_msg *msg = container_of(head, struct io_msg, tw); 129 + struct io_kiocb *req = cmd_to_io_kiocb(msg); 130 + int ret = -EOWNERDEAD; 131 + 132 + if (!(current->flags & PF_EXITING)) 133 + ret = io_msg_install_complete(req, IO_URING_F_UNLOCKED); 134 + if (ret < 0) 135 + req_set_fail(req); 136 + io_req_queue_tw_complete(req, ret); 137 + } 138 + 139 + static int io_msg_send_fd(struct io_kiocb *req, unsigned int issue_flags) 140 + { 141 + struct io_ring_ctx *target_ctx = req->file->private_data; 142 + struct io_msg *msg = io_kiocb_to_cmd(req, struct io_msg); 143 + struct io_ring_ctx *ctx = req->ctx; 144 + struct file *src_file = msg->src_file; 145 + 146 + if (target_ctx == ctx) 147 + return -EINVAL; 148 + if (!src_file) { 149 + src_file = io_msg_grab_file(req, 
issue_flags); 150 + if (!src_file) 151 + return -EBADF; 152 + msg->src_file = src_file; 153 + req->flags |= REQ_F_NEED_CLEANUP; 154 + } 155 + 156 + if (target_ctx->task_complete && current != target_ctx->submitter_task) { 157 + init_task_work(&msg->tw, io_msg_tw_fd_complete); 158 + if (task_work_add(target_ctx->submitter_task, &msg->tw, 159 + TWA_SIGNAL)) 160 + return -EOWNERDEAD; 161 + 162 + return IOU_ISSUE_SKIP_COMPLETE; 163 + } 164 + return io_msg_install_complete(req, issue_flags); 153 165 } 154 166 155 167 int io_msg_ring_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) ··· 200 130 if (unlikely(sqe->buf_index || sqe->personality)) 201 131 return -EINVAL; 202 132 133 + msg->src_file = NULL; 203 134 msg->user_data = READ_ONCE(sqe->off); 204 135 msg->len = READ_ONCE(sqe->len); 205 136 msg->cmd = READ_ONCE(sqe->addr); ··· 235 164 } 236 165 237 166 done: 238 - if (ret < 0) 167 + if (ret < 0) { 168 + if (ret == -EAGAIN || ret == IOU_ISSUE_SKIP_COMPLETE) 169 + return ret; 239 170 req_set_fail(req); 171 + } 240 172 io_req_set_res(req, ret, 0); 241 - /* put file to avoid an attempt to IOPOLL the req */ 242 - if (!(req->flags & REQ_F_FIXED_FILE)) 243 - io_put_file(req->file); 244 - req->file = NULL; 245 173 return IOU_OK; 246 174 }
+1
io_uring/msg_ring.h
··· 2 2 3 3 int io_msg_ring_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe); 4 4 int io_msg_ring(struct io_kiocb *req, unsigned int issue_flags); 5 + void io_msg_ring_cleanup(struct io_kiocb *req);
+21
io_uring/net.c
··· 67 67 struct io_kiocb *notif; 68 68 }; 69 69 70 + static inline bool io_check_multishot(struct io_kiocb *req, 71 + unsigned int issue_flags) 72 + { 73 + /* 74 + * When ->locked_cq is set we only allow to post CQEs from the original 75 + * task context. Usual request completions will be handled in other 76 + * generic paths but multipoll may decide to post extra cqes. 77 + */ 78 + return !(issue_flags & IO_URING_F_IOWQ) || 79 + !(issue_flags & IO_URING_F_MULTISHOT) || 80 + !req->ctx->task_complete; 81 + } 82 + 70 83 int io_shutdown_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) 71 84 { 72 85 struct io_shutdown *shutdown = io_kiocb_to_cmd(req, struct io_shutdown); ··· 743 730 (sr->flags & IORING_RECVSEND_POLL_FIRST)) 744 731 return io_setup_async_msg(req, kmsg, issue_flags); 745 732 733 + if (!io_check_multishot(req, issue_flags)) 734 + return io_setup_async_msg(req, kmsg, issue_flags); 735 + 746 736 retry_multishot: 747 737 if (io_do_buffer_select(req)) { 748 738 void __user *buf; ··· 843 827 844 828 if (!(req->flags & REQ_F_POLLED) && 845 829 (sr->flags & IORING_RECVSEND_POLL_FIRST)) 830 + return -EAGAIN; 831 + 832 + if (!io_check_multishot(req, issue_flags)) 846 833 return -EAGAIN; 847 834 848 835 sock = sock_from_file(req->file); ··· 1299 1280 struct file *file; 1300 1281 int ret, fd; 1301 1282 1283 + if (!io_check_multishot(req, issue_flags)) 1284 + return -EAGAIN; 1302 1285 retry: 1303 1286 if (!fixed) { 1304 1287 fd = __get_unused_fd_flags(accept->flags, accept->nofile);
+8
io_uring/opdef.c
··· 63 63 .audit_skip = 1, 64 64 .ioprio = 1, 65 65 .iopoll = 1, 66 + .iopoll_queue = 1, 66 67 .async_size = sizeof(struct io_async_rw), 67 68 .name = "READV", 68 69 .prep = io_prep_rw, ··· 81 80 .audit_skip = 1, 82 81 .ioprio = 1, 83 82 .iopoll = 1, 83 + .iopoll_queue = 1, 84 84 .async_size = sizeof(struct io_async_rw), 85 85 .name = "WRITEV", 86 86 .prep = io_prep_rw, ··· 105 103 .audit_skip = 1, 106 104 .ioprio = 1, 107 105 .iopoll = 1, 106 + .iopoll_queue = 1, 108 107 .async_size = sizeof(struct io_async_rw), 109 108 .name = "READ_FIXED", 110 109 .prep = io_prep_rw, ··· 121 118 .audit_skip = 1, 122 119 .ioprio = 1, 123 120 .iopoll = 1, 121 + .iopoll_queue = 1, 124 122 .async_size = sizeof(struct io_async_rw), 125 123 .name = "WRITE_FIXED", 126 124 .prep = io_prep_rw, ··· 281 277 .audit_skip = 1, 282 278 .ioprio = 1, 283 279 .iopoll = 1, 280 + .iopoll_queue = 1, 284 281 .async_size = sizeof(struct io_async_rw), 285 282 .name = "READ", 286 283 .prep = io_prep_rw, ··· 297 292 .audit_skip = 1, 298 293 .ioprio = 1, 299 294 .iopoll = 1, 295 + .iopoll_queue = 1, 300 296 .async_size = sizeof(struct io_async_rw), 301 297 .name = "WRITE", 302 298 .prep = io_prep_rw, ··· 445 439 .name = "MSG_RING", 446 440 .prep = io_msg_ring_prep, 447 441 .issue = io_msg_ring, 442 + .cleanup = io_msg_ring_cleanup, 448 443 }, 449 444 [IORING_OP_FSETXATTR] = { 450 445 .needs_file = 1, ··· 488 481 .plug = 1, 489 482 .name = "URING_CMD", 490 483 .iopoll = 1, 484 + .iopoll_queue = 1, 491 485 .async_size = uring_cmd_pdu_size(1), 492 486 .prep = io_uring_cmd_prep, 493 487 .issue = io_uring_cmd,
+2
io_uring/opdef.h
··· 25 25 unsigned ioprio : 1; 26 26 /* supports iopoll */ 27 27 unsigned iopoll : 1; 28 + /* have to be put into the iopoll list */ 29 + unsigned iopoll_queue : 1; 28 30 /* opcode specific path will handle ->async_data allocation if needed */ 29 31 unsigned manual_alloc : 1; 30 32 /* size of async data needed, if any */
+44 -54
io_uring/poll.c
··· 237 237 */ 238 238 static int io_poll_check_events(struct io_kiocb *req, bool *locked) 239 239 { 240 - struct io_ring_ctx *ctx = req->ctx; 241 240 int v, ret; 242 241 243 242 /* req->task == current here, checking PF_EXITING is safe */ ··· 246 247 do { 247 248 v = atomic_read(&req->poll_refs); 248 249 249 - /* tw handler should be the owner, and so have some references */ 250 - if (WARN_ON_ONCE(!(v & IO_POLL_REF_MASK))) 251 - return IOU_POLL_DONE; 252 - if (v & IO_POLL_CANCEL_FLAG) 253 - return -ECANCELED; 254 - /* 255 - * cqe.res contains only events of the first wake up 256 - * and all others are be lost. Redo vfs_poll() to get 257 - * up to date state. 258 - */ 259 - if ((v & IO_POLL_REF_MASK) != 1) 260 - req->cqe.res = 0; 261 - if (v & IO_POLL_RETRY_FLAG) { 262 - req->cqe.res = 0; 250 + if (unlikely(v != 1)) { 251 + /* tw should be the owner and so have some refs */ 252 + if (WARN_ON_ONCE(!(v & IO_POLL_REF_MASK))) 253 + return IOU_POLL_NO_ACTION; 254 + if (v & IO_POLL_CANCEL_FLAG) 255 + return -ECANCELED; 263 256 /* 264 - * We won't find new events that came in between 265 - * vfs_poll and the ref put unless we clear the flag 266 - * in advance. 257 + * cqe.res contains only events of the first wake up 258 + * and all others are to be lost. Redo vfs_poll() to get 259 + * up to date state. 267 260 */ 268 - atomic_andnot(IO_POLL_RETRY_FLAG, &req->poll_refs); 269 - v &= ~IO_POLL_RETRY_FLAG; 261 + if ((v & IO_POLL_REF_MASK) != 1) 262 + req->cqe.res = 0; 263 + 264 + if (v & IO_POLL_RETRY_FLAG) { 265 + req->cqe.res = 0; 266 + /* 267 + * We won't find new events that came in between 268 + * vfs_poll and the ref put unless we clear the 269 + * flag in advance. 
270 + */ 271 + atomic_andnot(IO_POLL_RETRY_FLAG, &req->poll_refs); 272 + v &= ~IO_POLL_RETRY_FLAG; 273 + } 270 274 } 271 275 272 276 /* the mask was stashed in __io_poll_execute */ ··· 288 286 __poll_t mask = mangle_poll(req->cqe.res & 289 287 req->apoll_events); 290 288 291 - if (!io_aux_cqe(ctx, *locked, req->cqe.user_data, 289 + if (!io_aux_cqe(req->ctx, *locked, req->cqe.user_data, 292 290 mask, IORING_CQE_F_MORE, false)) { 293 291 io_req_set_res(req, mask, 0); 294 292 return IOU_POLL_REMOVE_POLL_USE_RES; ··· 321 319 ret = io_poll_check_events(req, locked); 322 320 if (ret == IOU_POLL_NO_ACTION) 323 321 return; 324 - 325 - if (ret == IOU_POLL_DONE) { 326 - struct io_poll *poll = io_kiocb_to_cmd(req, struct io_poll); 327 - req->cqe.res = mangle_poll(req->cqe.res & poll->events); 328 - } else if (ret != IOU_POLL_REMOVE_POLL_USE_RES) { 329 - req->cqe.res = ret; 330 - req_set_fail(req); 331 - } 332 - 333 322 io_poll_remove_entries(req); 334 323 io_poll_tw_hash_eject(req, locked); 335 324 336 - io_req_set_res(req, req->cqe.res, 0); 337 - io_req_task_complete(req, locked); 338 - } 325 + if (req->opcode == IORING_OP_POLL_ADD) { 326 + if (ret == IOU_POLL_DONE) { 327 + struct io_poll *poll; 339 328 340 - static void io_apoll_task_func(struct io_kiocb *req, bool *locked) 341 - { 342 - int ret; 329 + poll = io_kiocb_to_cmd(req, struct io_poll); 330 + req->cqe.res = mangle_poll(req->cqe.res & poll->events); 331 + } else if (ret != IOU_POLL_REMOVE_POLL_USE_RES) { 332 + req->cqe.res = ret; 333 + req_set_fail(req); 334 + } 343 335 344 - ret = io_poll_check_events(req, locked); 345 - if (ret == IOU_POLL_NO_ACTION) 346 - return; 347 - 348 - io_tw_lock(req->ctx, locked); 349 - io_poll_remove_entries(req); 350 - io_poll_tw_hash_eject(req, locked); 351 - 352 - if (ret == IOU_POLL_REMOVE_POLL_USE_RES) 336 + io_req_set_res(req, req->cqe.res, 0); 353 337 io_req_task_complete(req, locked); 354 - else if (ret == IOU_POLL_DONE) 355 - io_req_task_submit(req, locked); 356 - else 357 - 
io_req_defer_failed(req, ret); 338 + } else { 339 + io_tw_lock(req->ctx, locked); 340 + 341 + if (ret == IOU_POLL_REMOVE_POLL_USE_RES) 342 + io_req_task_complete(req, locked); 343 + else if (ret == IOU_POLL_DONE) 344 + io_req_task_submit(req, locked); 345 + else 346 + io_req_defer_failed(req, ret); 347 + } 358 348 } 359 349 360 350 static void __io_poll_execute(struct io_kiocb *req, int mask) 361 351 { 362 352 io_req_set_res(req, mask, 0); 363 - 364 - if (req->opcode == IORING_OP_POLL_ADD) 365 - req->io_task_work.func = io_poll_task_func; 366 - else 367 - req->io_task_work.func = io_apoll_task_func; 353 + req->io_task_work.func = io_poll_task_func; 368 354 369 355 trace_io_uring_task_add(req, mask); 370 356 io_req_task_work_add(req);
+41 -31
io_uring/rsrc.c
··· 204 204 } 205 205 } 206 206 207 + void io_rsrc_put_tw(struct callback_head *cb) 208 + { 209 + struct io_ring_ctx *ctx = container_of(cb, struct io_ring_ctx, 210 + rsrc_put_tw); 211 + 212 + io_rsrc_put_work(&ctx->rsrc_put_work.work); 213 + } 214 + 207 215 void io_wait_rsrc_data(struct io_rsrc_data *data) 208 216 { 209 217 if (data && !atomic_dec_and_test(&data->refs)) ··· 250 242 } 251 243 spin_unlock_irqrestore(&ctx->rsrc_ref_lock, flags); 252 244 253 - if (first_add) 254 - mod_delayed_work(system_wq, &ctx->rsrc_put_work, delay); 245 + if (!first_add) 246 + return; 247 + 248 + if (ctx->submitter_task) { 249 + if (!task_work_add(ctx->submitter_task, &ctx->rsrc_put_tw, 250 + ctx->notify_method)) 251 + return; 252 + } 253 + mod_delayed_work(system_wq, &ctx->rsrc_put_work, delay); 255 254 } 256 255 257 256 static struct io_rsrc_node *io_rsrc_node_alloc(void) ··· 324 309 /* As we may drop ->uring_lock, other task may have started quiesce */ 325 310 if (data->quiesce) 326 311 return -ENXIO; 312 + ret = io_rsrc_node_switch_start(ctx); 313 + if (ret) 314 + return ret; 315 + io_rsrc_node_switch(ctx, data); 316 + 317 + /* kill initial ref, already quiesced if zero */ 318 + if (atomic_dec_and_test(&data->refs)) 319 + return 0; 327 320 328 321 data->quiesce = true; 322 + mutex_unlock(&ctx->uring_lock); 329 323 do { 330 - ret = io_rsrc_node_switch_start(ctx); 331 - if (ret) 332 - break; 333 - io_rsrc_node_switch(ctx, data); 334 - 335 - /* kill initial ref, already quiesced if zero */ 336 - if (atomic_dec_and_test(&data->refs)) 337 - break; 338 - mutex_unlock(&ctx->uring_lock); 339 - 340 324 ret = io_run_task_work_sig(ctx); 341 - if (ret < 0) 342 - goto reinit; 325 + if (ret < 0) { 326 + atomic_inc(&data->refs); 327 + /* wait for all works potentially completing data->done */ 328 + flush_delayed_work(&ctx->rsrc_put_work); 329 + reinit_completion(&data->done); 330 + mutex_lock(&ctx->uring_lock); 331 + break; 332 + } 343 333 344 334 flush_delayed_work(&ctx->rsrc_put_work); 345 
335 ret = wait_for_completion_interruptible(&data->done); 346 336 if (!ret) { 347 337 mutex_lock(&ctx->uring_lock); 348 - if (atomic_read(&data->refs) > 0) { 349 - /* 350 - * it has been revived by another thread while 351 - * we were unlocked 352 - */ 353 - mutex_unlock(&ctx->uring_lock); 354 - } else { 338 + if (atomic_read(&data->refs) <= 0) 355 339 break; 356 - } 340 + /* 341 + * it has been revived by another thread while 342 + * we were unlocked 343 + */ 344 + mutex_unlock(&ctx->uring_lock); 357 345 } 358 - 359 - reinit: 360 - atomic_inc(&data->refs); 361 - /* wait for all works potentially completing data->done */ 362 - flush_delayed_work(&ctx->rsrc_put_work); 363 - reinit_completion(&data->done); 364 - 365 - mutex_lock(&ctx->uring_lock); 366 - } while (ret >= 0); 346 + } while (1); 367 347 data->quiesce = false; 368 348 369 349 return ret;
+1
io_uring/rsrc.h
··· 53 53 struct bio_vec bvec[]; 54 54 }; 55 55 56 + void io_rsrc_put_tw(struct callback_head *cb); 56 57 void io_rsrc_put_work(struct work_struct *work); 57 58 void io_rsrc_refs_refill(struct io_ring_ctx *ctx); 58 59 void io_wait_rsrc_data(struct io_rsrc_data *data);