Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux

Merge tag 'for-5.12/io_uring-2021-02-25' of git://git.kernel.dk/linux-block

Pull more io_uring updates from Jens Axboe:
"A collection of later fixes that we should get into this release:

- Series of submission cleanups (Pavel)

- A few fixes for issues from earlier this merge window (Pavel, me)

- IOPOLL resubmission fix

- task_work locking fix (Hao)"

* tag 'for-5.12/io_uring-2021-02-25' of git://git.kernel.dk/linux-block: (25 commits)
Revert "io_uring: wait potential ->release() on resurrect"
io_uring: fix locked_free_list caches_free()
io_uring: don't attempt IO reissue from the ring exit path
io_uring: clear request count when freeing caches
io_uring: run task_work on io_uring_register()
io_uring: fix leaving invalid req->flags
io_uring: wait potential ->release() on resurrect
io_uring: keep generic rsrc infra generic
io_uring: zero ref_node after killing it
io_uring: make the !CONFIG_NET helpers a bit more robust
io_uring: don't hold uring_lock when calling io_run_task_work*
io_uring: fail io-wq submission from a task_work
io_uring: don't take uring_lock during iowq cancel
io_uring: fail links more in io_submit_sqe()
io_uring: don't do async setup for links' heads
io_uring: do io_*_prep() early in io_submit_sqe()
io_uring: split sqe-prep and async setup
io_uring: don't submit link on error
io_uring: move req link into submit_state
io_uring: move io_init_req() into io_submit_sqe()
...

+358 -352
+358 -352
fs/io_uring.c
··· 104 104 #define IORING_MAX_RESTRICTIONS (IORING_RESTRICTION_LAST + \ 105 105 IORING_REGISTER_LAST + IORING_OP_LAST) 106 106 107 + #define SQE_VALID_FLAGS (IOSQE_FIXED_FILE|IOSQE_IO_DRAIN|IOSQE_IO_LINK| \ 108 + IOSQE_IO_HARDLINK | IOSQE_ASYNC | \ 109 + IOSQE_BUFFER_SELECT) 110 + 107 111 struct io_uring { 108 112 u32 head ____cacheline_aligned_in_smp; 109 113 u32 tail ____cacheline_aligned_in_smp; ··· 236 232 struct fixed_rsrc_ref_node *node; 237 233 struct percpu_ref refs; 238 234 struct completion done; 235 + bool quiesce; 239 236 }; 240 237 241 238 struct io_buffer { ··· 284 279 struct list_head locked_free_list; 285 280 }; 286 281 282 + struct io_submit_link { 283 + struct io_kiocb *head; 284 + struct io_kiocb *last; 285 + }; 286 + 287 287 struct io_submit_state { 288 288 struct blk_plug plug; 289 + struct io_submit_link link; 289 290 290 291 /* 291 292 * io_kiocb alloc cache ··· 1039 1028 static void destroy_fixed_rsrc_ref_node(struct fixed_rsrc_ref_node *ref_node); 1040 1029 static struct fixed_rsrc_ref_node *alloc_fixed_rsrc_ref_node( 1041 1030 struct io_ring_ctx *ctx); 1042 - static void init_fixed_file_ref_node(struct io_ring_ctx *ctx, 1043 - struct fixed_rsrc_ref_node *ref_node); 1031 + static void io_ring_file_put(struct io_ring_ctx *ctx, struct io_rsrc_put *prsrc); 1044 1032 1045 1033 static bool io_rw_reissue(struct io_kiocb *req); 1046 1034 static void io_cqring_fill_event(struct io_kiocb *req, long res); ··· 2339 2329 struct io_kiocb *req = container_of(cb, struct io_kiocb, task_work); 2340 2330 struct io_ring_ctx *ctx = req->ctx; 2341 2331 2342 - __io_req_task_cancel(req, -ECANCELED); 2332 + mutex_lock(&ctx->uring_lock); 2333 + __io_req_task_cancel(req, req->result); 2334 + mutex_unlock(&ctx->uring_lock); 2343 2335 percpu_ref_put(&ctx->refs); 2344 2336 } 2345 2337 ··· 2376 2364 req->task_work.func = io_req_task_submit; 2377 2365 ret = io_req_task_work_add(req); 2378 2366 if (unlikely(ret)) { 2367 + req->result = -ECANCELED; 2379 2368 percpu_ref_get(&req->ctx->refs); 2380 2369 io_req_task_work_add_fallback(req, io_req_task_cancel); 2381 2370 } 2371 + } 2372 + 2373 + static void io_req_task_queue_fail(struct io_kiocb *req, int ret) 2374 + { 2375 + percpu_ref_get(&req->ctx->refs); 2376 + req->result = ret; 2377 + req->task_work.func = io_req_task_cancel; 2378 + 2379 + if (unlikely(io_req_task_work_add(req))) 2380 + io_req_task_work_add_fallback(req, io_req_task_cancel); 2382 2381 } 2383 2382 2384 2383 static inline void io_queue_next(struct io_kiocb *req) ··· 2828 2805 if (!S_ISBLK(mode) && !S_ISREG(mode)) 2829 2806 return false; 2830 2807 if ((req->flags & REQ_F_NOWAIT) || io_wq_current_is_worker()) 2808 + return false; 2809 + /* 2810 + * If ref is dying, we might be running poll reap from the exit work. 2811 + * Don't attempt to reissue from that path, just let it fail with 2812 + * -EAGAIN. 2813 + */ 2814 + if (percpu_ref_is_dying(&req->ctx->refs)) 2831 2815 return false; 2832 2816 2833 2817 lockdep_assert_held(&req->ctx->uring_lock); ··· 3497 3467 3498 3468 static int io_read_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) 3499 3469 { 3500 - ssize_t ret; 3501 - 3502 - ret = io_prep_rw(req, sqe); 3503 - if (ret) 3504 - return ret; 3505 - 3506 3470 if (unlikely(!(req->file->f_mode & FMODE_READ))) 3507 3471 return -EBADF; 3508 - 3509 - /* either don't need iovec imported or already have it */ 3510 - if (!req->async_data) 3511 - return 0; 3512 - return io_rw_prep_async(req, READ); 3472 + return io_prep_rw(req, sqe); 3513 3473 } 3514 3474 3515 3475 /* ··· 3627 3607 ret = io_iter_do_read(req, iter); 3628 3608 3629 3609 if (ret == -EIOCBQUEUED) { 3630 - /* it's faster to check here then delegate to kfree */ 3631 - if (iovec) 3632 - kfree(iovec); 3633 - return 0; 3610 + goto out_free; 3634 3611 } else if (ret == -EAGAIN) { 3635 3612 /* IOPOLL retry should happen for io-wq threads */ 3636 3613 if (!force_nonblock && !(req->ctx->flags & IORING_SETUP_IOPOLL)) ··· 3648 3631 if (ret2) 3649 3632 return ret2; 3650 3633 3634 + iovec = NULL; 3651 3635 rw = req->async_data; 3652 3636 /* now use our persistent iterator, if we aren't already */ 3653 3637 iter = &rw->iter; ··· 3675 3657 } while (ret > 0 && ret < io_size); 3676 3658 done: 3677 3659 kiocb_done(kiocb, ret, issue_flags); 3660 + out_free: 3661 + /* it's faster to check here then delegate to kfree */ 3662 + if (iovec) 3663 + kfree(iovec); 3678 3664 return 0; 3679 3665 } 3680 3666 3681 3667 static int io_write_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) 3682 3668 { 3683 - ssize_t ret; 3684 - 3685 - ret = io_prep_rw(req, sqe); 3686 - if (ret) 3687 - return ret; 3688 - 3689 3669 if (unlikely(!(req->file->f_mode & FMODE_WRITE))) 3690 3670 return -EBADF; 3691 - 3692 - /* either don't need iovec imported or already have it */ 3693 - if (!req->async_data) 3694 - return 0; 3695 - return io_rw_prep_async(req, WRITE); 3671 + return io_prep_rw(req, sqe); 3696 3672 } 3697 3673 3698 3674 static int io_write(struct io_kiocb *req, unsigned int issue_flags) ··· 4023 4011 return 0; 4024 4012 } 4025 4013 4026 - static int io_prep_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe) 4014 + static int io_fsync_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) 4027 4015 { 4028 4016 struct io_ring_ctx *ctx = req->ctx; 4029 4017 ··· 4610 4598 return 0; 4611 4599 } 4612 4600 4613 - static int io_prep_sfr(struct io_kiocb *req, const struct io_uring_sqe *sqe) 4601 + static int io_sfr_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) 4614 4602 { 4615 4603 struct io_ring_ctx *ctx = req->ctx; 4616 - 4617 - if (!req->file) 4618 - return -EBADF; 4619 4604 4620 4605 if (unlikely(ctx->flags & IORING_SETUP_IOPOLL)) 4621 4606 return -EINVAL; ··· 4673 4664 req->sr_msg.msg_flags, &iomsg->free_iov); 4674 4665 } 4675 4666 4667 + static int io_sendmsg_prep_async(struct io_kiocb *req) 4668 + { 4669 + int ret; 4670 + 4671 + if (!io_op_defs[req->opcode].needs_async_data) 4672 + return 0; 4673 + ret = io_sendmsg_copy_hdr(req, req->async_data); 4674 + if (!ret) 4675 + req->flags |= REQ_F_NEED_CLEANUP; 4676 + return ret; 4677 + } 4678 + 4676 4679 static int io_sendmsg_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) 4677 4680 { 4678 - struct io_async_msghdr *async_msg = req->async_data; 4679 4681 struct io_sr_msg *sr = &req->sr_msg; 4680 - int ret; 4681 4682 4682 4683 if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL)) 4683 4684 return -EINVAL; ··· 4700 4681 if (req->ctx->compat) 4701 4682 sr->msg_flags |= MSG_CMSG_COMPAT; 4702 4683 #endif 4703 - 4704 - if (!async_msg || !io_op_defs[req->opcode].needs_async_data) 4705 - return 0; 4706 - ret = io_sendmsg_copy_hdr(req, async_msg); 4707 - if (!ret) 4708 - req->flags |= REQ_F_NEED_CLEANUP; 4709 - return ret; 4684 + return 0; 4710 4685 } 4711 4686 4712 4687 static int io_sendmsg(struct io_kiocb *req, unsigned int issue_flags) ··· 4894 4881 return io_put_kbuf(req, req->sr_msg.kbuf); 4895 4882 } 4896 4883 4897 - static int io_recvmsg_prep(struct io_kiocb *req, 4898 - const struct io_uring_sqe *sqe) 4884 + static int io_recvmsg_prep_async(struct io_kiocb *req) 4899 4885 { 4900 - struct io_async_msghdr *async_msg = req->async_data; 4901 - struct io_sr_msg *sr = &req->sr_msg; 4902 4886 int ret; 4887 + 4888 + if (!io_op_defs[req->opcode].needs_async_data) 4889 + return 0; 4890 + ret = io_recvmsg_copy_hdr(req, req->async_data); 4891 + if (!ret) 4892 + req->flags |= REQ_F_NEED_CLEANUP; 4893 + return ret; 4894 + } 4895 + 4896 + static int io_recvmsg_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) 4897 + { 4898 + struct io_sr_msg *sr = &req->sr_msg; 4903 4899 4904 4900 if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL)) 4905 4901 return -EINVAL; ··· 4922 4900 if (req->ctx->compat) 4923 4901 sr->msg_flags |= MSG_CMSG_COMPAT; 4924 4902 #endif 4925 - 4926 - if (!async_msg || !io_op_defs[req->opcode].needs_async_data) 4927 - return 0; 4928 - ret = io_recvmsg_copy_hdr(req, async_msg); 4929 - if (!ret) 4930 - req->flags |= REQ_F_NEED_CLEANUP; 4931 - return ret; 4903 + return 0; 4932 4904 } 4933 4905 4934 4906 static int io_recvmsg(struct io_kiocb *req, unsigned int issue_flags) ··· 5075 5059 return 0; 5076 5060 } 5077 5061 5062 + static int io_connect_prep_async(struct io_kiocb *req) 5063 + { 5064 + struct io_async_connect *io = req->async_data; 5065 + struct io_connect *conn = &req->connect; 5066 + 5067 + return move_addr_to_kernel(conn->addr, conn->addr_len, &io->address); 5068 + } 5069 + 5078 5070 static int io_connect_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) 5079 5071 { 5080 5072 struct io_connect *conn = &req->connect; 5081 - struct io_async_connect *io = req->async_data; 5082 5073 5083 5074 if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL)) 5084 5075 return -EINVAL; ··· 5094 5071 5095 5072 conn->addr = u64_to_user_ptr(READ_ONCE(sqe->addr)); 5096 5073 conn->addr_len = READ_ONCE(sqe->addr2); 5097 - 5098 - if (!io) 5099 - return 0; 5100 - 5101 - return move_addr_to_kernel(conn->addr, conn->addr_len, 5102 - &io->address); 5074 + return 0; 5103 5075 } 5104 5076 5105 5077 static int io_connect(struct io_kiocb *req, unsigned int issue_flags) ··· 5139 5121 return 0; 5140 5122 } 5141 5123 #else /* !CONFIG_NET */ 5142 - static int io_sendmsg_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) 5143 - { 5144 - return -EOPNOTSUPP; 5124 + #define IO_NETOP_FN(op) \ 5125 + static int io_##op(struct io_kiocb *req, unsigned int issue_flags) \ 5126 + { \ 5127 + return -EOPNOTSUPP; \ 5145 5128 } 5146 5129 5147 - static int io_sendmsg(struct io_kiocb *req, unsigned int issue_flags) 5148 - { 5149 - return -EOPNOTSUPP; 5130 + #define IO_NETOP_PREP(op) \ 5131 + IO_NETOP_FN(op) \ 5132 + static int io_##op##_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) \ 5133 + { \ 5134 + return -EOPNOTSUPP; \ 5135 + } \ 5136 + 5137 + #define IO_NETOP_PREP_ASYNC(op) \ 5138 + IO_NETOP_PREP(op) \ 5139 + static int io_##op##_prep_async(struct io_kiocb *req) \ 5140 + { \ 5141 + return -EOPNOTSUPP; \ 5150 5142 } 5151 5143 5152 - static int io_send(struct io_kiocb *req, unsigned int issue_flags) 5153 - { 5154 - return -EOPNOTSUPP; 5155 - } 5156 - 5157 - static int io_recvmsg_prep(struct io_kiocb *req, 5158 - const struct io_uring_sqe *sqe) 5159 - { 5160 - return -EOPNOTSUPP; 5161 - } 5162 - 5163 - static int io_recvmsg(struct io_kiocb *req, unsigned int issue_flags) 5164 - { 5165 - return -EOPNOTSUPP; 5166 - } 5167 - 5168 - static int io_recv(struct io_kiocb *req, unsigned int issue_flags) 5169 - { 5170 - return -EOPNOTSUPP; 5171 - } 5172 - 5173 - static int io_accept_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) 5174 - { 5175 - return -EOPNOTSUPP; 5176 - } 5177 - 5178 - static int io_accept(struct io_kiocb *req, unsigned int issue_flags) 5179 - { 5180 - return -EOPNOTSUPP; 5181 - } 5182 - 5183 - static int io_connect_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) 5184 - { 5185 - return -EOPNOTSUPP; 5186 - } 5187 - 5188 - static int io_connect(struct io_kiocb *req, unsigned int issue_flags) 5189 - { 5190 - return -EOPNOTSUPP; 5191 - } 5144 + IO_NETOP_PREP_ASYNC(sendmsg); 5145 + IO_NETOP_PREP_ASYNC(recvmsg); 5146 + IO_NETOP_PREP_ASYNC(connect); 5147 + IO_NETOP_PREP(accept); 5148 + IO_NETOP_FN(send); 5149 + IO_NETOP_FN(recv); 5192 5150 #endif /* CONFIG_NET */ 5193 5151 5194 5152 struct io_poll_table { ··· 6078 6084 case IORING_OP_POLL_REMOVE: 6079 6085 return io_poll_remove_prep(req, sqe); 6080 6086 case IORING_OP_FSYNC: 6081 - return io_prep_fsync(req, sqe); 6087 + return io_fsync_prep(req, sqe); 6082 6088 case IORING_OP_SYNC_FILE_RANGE: 6083 - return io_prep_sfr(req, sqe); 6089 + return io_sfr_prep(req, sqe); 6084 6090 case IORING_OP_SENDMSG: 6085 6091 case IORING_OP_SEND: 6086 6092 return io_sendmsg_prep(req, sqe); ··· 6138 6144 return-EINVAL; 6139 6145 } 6140 6146 6141 - static int io_req_defer_prep(struct io_kiocb *req, 6142 - const struct io_uring_sqe *sqe) 6147 + static int io_req_prep_async(struct io_kiocb *req) 6143 6148 { 6144 - if (!sqe) 6149 + switch (req->opcode) { 6150 + case IORING_OP_READV: 6151 + case IORING_OP_READ_FIXED: 6152 + case IORING_OP_READ: 6153 + return io_rw_prep_async(req, READ); 6154 + case IORING_OP_WRITEV: 6155 + case IORING_OP_WRITE_FIXED: 6156 + case IORING_OP_WRITE: 6157 + return io_rw_prep_async(req, WRITE); 6158 + case IORING_OP_SENDMSG: 6159 + case IORING_OP_SEND: 6160 + return io_sendmsg_prep_async(req); 6161 + case IORING_OP_RECVMSG: 6162 + case IORING_OP_RECV: 6163 + return io_recvmsg_prep_async(req); 6164 + case IORING_OP_CONNECT: 6165 + return io_connect_prep_async(req); 6166 + } 6167 + return 0; 6168 + } 6169 + 6170 + static int io_req_defer_prep(struct io_kiocb *req) 6171 + { 6172 + if (!io_op_defs[req->opcode].needs_async_data) 6145 6173 return 0; 6146 - if (io_alloc_async_data(req)) 6174 + /* some opcodes init it during the inital prep */ 6175 + if (req->async_data) 6176 + return 0; 6177 + if (__io_alloc_async_data(req)) 6147 6178 return -EAGAIN; 6148 - return io_req_prep(req, sqe); 6179 + return io_req_prep_async(req); 6149 6180 } 6150 6181 6151 6182 static u32 io_get_sequence(struct io_kiocb *req) ··· 6186 6167 return total_submitted - nr_reqs; 6187 6168 } 6188 6169 6189 - static int io_req_defer(struct io_kiocb *req, const struct io_uring_sqe *sqe) 6170 + static int io_req_defer(struct io_kiocb *req) 6190 6171 { 6191 6172 struct io_ring_ctx *ctx = req->ctx; 6192 6173 struct io_defer_entry *de; ··· 6203 6184 if (!req_need_defer(req, seq) && list_empty_careful(&ctx->defer_list)) 6204 6185 return 0; 6205 6186 6206 - if (!req->async_data) { 6207 - ret = io_req_defer_prep(req, sqe); 6208 - if (ret) 6209 - return ret; 6210 - } 6187 + ret = io_req_defer_prep(req); 6188 + if (ret) 6189 + return ret; 6211 6190 io_prep_async_link(req); 6212 6191 de = kmalloc(sizeof(*de), GFP_KERNEL); 6213 6192 if (!de) ··· 6444 6427 } while (1); 6445 6428 } 6446 6429 6430 + /* avoid locking problems by failing it from a clean context */ 6447 6431 if (ret) { 6448 - struct io_ring_ctx *lock_ctx = NULL; 6449 - 6450 - if (req->ctx->flags & IORING_SETUP_IOPOLL) 6451 - lock_ctx = req->ctx; 6452 - 6453 - /* 6454 - * io_iopoll_complete() does not hold completion_lock to 6455 - * complete polled io, so here for polled io, we can not call 6456 - * io_req_complete() directly, otherwise there maybe concurrent 6457 - * access to cqring, defer_list, etc, which is not safe. Given 6458 - * that io_iopoll_complete() is always called under uring_lock, 6459 - * so here for polled io, we also get uring_lock to complete 6460 - * it. 6461 - */ 6462 - if (lock_ctx) 6463 - mutex_lock(&lock_ctx->uring_lock); 6464 - 6465 - req_set_fail_links(req); 6466 - io_req_complete(req, ret); 6467 - 6468 - if (lock_ctx) 6469 - mutex_unlock(&lock_ctx->uring_lock); 6432 + /* io-wq is going to take one down */ 6433 + refcount_inc(&req->refs); 6434 + io_req_task_queue_fail(req, ret); 6470 6435 } 6471 6436 } 6472 6437 ··· 6606 6607 io_queue_linked_timeout(linked_timeout); 6607 6608 } 6608 6609 6609 - static void io_queue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe) 6610 + static void io_queue_sqe(struct io_kiocb *req) 6610 6611 { 6611 6612 int ret; 6612 6613 6613 - ret = io_req_defer(req, sqe); 6614 + ret = io_req_defer(req); 6614 6615 if (ret) { 6615 6616 if (ret != -EIOCBQUEUED) { 6616 6617 fail_req: ··· 6619 6620 io_req_complete(req, ret); 6620 6621 } 6621 6622 } else if (req->flags & REQ_F_FORCE_ASYNC) { 6622 - if (!req->async_data) { 6623 - ret = io_req_defer_prep(req, sqe); 6624 - if (unlikely(ret)) 6625 - goto fail_req; 6626 - } 6623 + ret = io_req_defer_prep(req); 6624 + if (unlikely(ret)) 6625 + goto fail_req; 6627 6626 io_queue_async_work(req); 6628 6627 } else { 6629 - if (sqe) { 6630 - ret = io_req_prep(req, sqe); 6631 - if (unlikely(ret)) 6632 - goto fail_req; 6633 - } 6634 6628 __io_queue_sqe(req); 6635 6629 } 6636 - } 6637 - 6638 - static inline void io_queue_link_head(struct io_kiocb *req) 6639 - { 6640 - if (unlikely(req->flags & REQ_F_FAIL_LINK)) { 6641 - io_put_req(req); 6642 - io_req_complete(req, -ECANCELED); 6643 - } else 6644 - io_queue_sqe(req, NULL); 6645 - } 6646 - 6647 - struct io_submit_link { 6648 - struct io_kiocb *head; 6649 - struct io_kiocb *last; 6650 - }; 6651 - 6652 - static int io_submit_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe, 6653 - struct io_submit_link *link) 6654 - { 6655 - struct io_ring_ctx *ctx = req->ctx; 6656 - int ret; 6657 - 6658 - /* 6659 - * If we already have a head request, queue this one for async 6660 - * submittal once the head completes. If we don't have a head but 6661 - * IOSQE_IO_LINK is set in the sqe, start a new head. This one will be 6662 - * submitted sync once the chain is complete. If none of those 6663 - * conditions are true (normal request), then just queue it. 6664 - */ 6665 - if (link->head) { 6666 - struct io_kiocb *head = link->head; 6667 - 6668 - /* 6669 - * Taking sequential execution of a link, draining both sides 6670 - * of the link also fullfils IOSQE_IO_DRAIN semantics for all 6671 - * requests in the link. So, it drains the head and the 6672 - * next after the link request. The last one is done via 6673 - * drain_next flag to persist the effect across calls. 6674 - */ 6675 - if (req->flags & REQ_F_IO_DRAIN) { 6676 - head->flags |= REQ_F_IO_DRAIN; 6677 - ctx->drain_next = 1; 6678 - } 6679 - ret = io_req_defer_prep(req, sqe); 6680 - if (unlikely(ret)) { 6681 - /* fail even hard links since we don't submit */ 6682 - head->flags |= REQ_F_FAIL_LINK; 6683 - return ret; 6684 - } 6685 - trace_io_uring_link(ctx, req, head); 6686 - link->last->link = req; 6687 - link->last = req; 6688 - 6689 - /* last request of a link, enqueue the link */ 6690 - if (!(req->flags & (REQ_F_LINK | REQ_F_HARDLINK))) { 6691 - io_queue_link_head(head); 6692 - link->head = NULL; 6693 - } 6694 - } else { 6695 - if (unlikely(ctx->drain_next)) { 6696 - req->flags |= REQ_F_IO_DRAIN; 6697 - ctx->drain_next = 0; 6698 - } 6699 - if (req->flags & (REQ_F_LINK | REQ_F_HARDLINK)) { 6700 - ret = io_req_defer_prep(req, sqe); 6701 - if (unlikely(ret)) 6702 - req->flags |= REQ_F_FAIL_LINK; 6703 - link->head = req; 6704 - link->last = req; 6705 - } else { 6706 - io_queue_sqe(req, sqe); 6707 - } 6708 - } 6709 - 6710 - return 0; 6711 - } 6712 - 6713 - /* 6714 - * Batched submission is done, ensure local IO is flushed out. 6715 - */ 6716 - static void io_submit_state_end(struct io_submit_state *state, 6717 - struct io_ring_ctx *ctx) 6718 - { 6719 - if (state->comp.nr) 6720 - io_submit_flush_completions(&state->comp, ctx); 6721 - if (state->plug_started) 6722 - blk_finish_plug(&state->plug); 6723 - io_state_file_put(state); 6724 - } 6725 - 6726 - /* 6727 - * Start submission side cache. 6728 - */ 6729 - static void io_submit_state_start(struct io_submit_state *state, 6730 - unsigned int max_ios) 6731 - { 6732 - state->plug_started = false; 6733 - state->ios_left = max_ios; 6734 - } 6735 - 6736 - static void io_commit_sqring(struct io_ring_ctx *ctx) 6737 - { 6738 - struct io_rings *rings = ctx->rings; 6739 - 6740 - /* 6741 - * Ensure any loads from the SQEs are done at this point, 6742 - * since once we write the new head, the application could 6743 - * write new data to them. 6744 - */ 6745 - smp_store_release(&rings->sq.head, ctx->cached_sq_head); 6746 - } 6747 - 6748 - /* 6749 - * Fetch an sqe, if one is available. Note that sqe_ptr will point to memory 6750 - * that is mapped by userspace. This means that care needs to be taken to 6751 - * ensure that reads are stable, as we cannot rely on userspace always 6752 - * being a good citizen. If members of the sqe are validated and then later 6753 - * used, it's important that those reads are done through READ_ONCE() to 6754 - * prevent a re-load down the line. 6755 - */ 6756 - static const struct io_uring_sqe *io_get_sqe(struct io_ring_ctx *ctx) 6757 - { 6758 - u32 *sq_array = ctx->sq_array; 6759 - unsigned head; 6760 - 6761 - /* 6762 - * The cached sq head (or cq tail) serves two purposes: 6763 - * 6764 - * 1) allows us to batch the cost of updating the user visible 6765 - * head updates. 6766 - * 2) allows the kernel side to track the head on its own, even 6767 - * though the application is the one updating it. 6768 - */ 6769 - head = READ_ONCE(sq_array[ctx->cached_sq_head++ & ctx->sq_mask]); 6770 - if (likely(head < ctx->sq_entries)) 6771 - return &ctx->sq_sqes[head]; 6772 - 6773 - /* drop invalid entries */ 6774 - ctx->cached_sq_dropped++; 6775 - WRITE_ONCE(ctx->rings->sq_dropped, ctx->cached_sq_dropped); 6776 - return NULL; 6777 6630 } 6778 6631 6779 6632 /* ··· 6654 6803 return true; 6655 6804 } 6656 6805 6657 - #define SQE_VALID_FLAGS (IOSQE_FIXED_FILE|IOSQE_IO_DRAIN|IOSQE_IO_LINK| \ 6658 - IOSQE_IO_HARDLINK | IOSQE_ASYNC | \ 6659 - IOSQE_BUFFER_SELECT) 6660 - 6661 6806 static int io_init_req(struct io_ring_ctx *ctx, struct io_kiocb *req, 6662 6807 const struct io_uring_sqe *sqe) 6663 6808 { ··· 6676 6829 req->result = 0; 6677 6830 6678 6831 /* enforce forwards compatibility on users */ 6679 - if (unlikely(sqe_flags & ~SQE_VALID_FLAGS)) 6832 + if (unlikely(sqe_flags & ~SQE_VALID_FLAGS)) { 6833 + req->flags = 0; 6680 6834 return -EINVAL; 6835 + } 6681 6836 6682 6837 if (unlikely(req->opcode >= IORING_OP_LAST)) 6683 6838 return -EINVAL; ··· 6733 6884 return ret; 6734 6885 } 6735 6886 6887 + static int io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req, 6888 + const struct io_uring_sqe *sqe) 6889 + { 6890 + struct io_submit_link *link = &ctx->submit_state.link; 6891 + int ret; 6892 + 6893 + ret = io_init_req(ctx, req, sqe); 6894 + if (unlikely(ret)) { 6895 + fail_req: 6896 + io_put_req(req); 6897 + io_req_complete(req, ret); 6898 + if (link->head) { 6899 + /* fail even hard links since we don't submit */ 6900 + link->head->flags |= REQ_F_FAIL_LINK; 6901 + io_put_req(link->head); 6902 + io_req_complete(link->head, -ECANCELED); 6903 + link->head = NULL; 6904 + } 6905 + return ret; 6906 + } 6907 + ret = io_req_prep(req, sqe); 6908 + if (unlikely(ret)) 6909 + goto fail_req; 6910 + 6911 + /* don't need @sqe from now on */ 6912 + trace_io_uring_submit_sqe(ctx, req->opcode, req->user_data, 6913 + true, ctx->flags & IORING_SETUP_SQPOLL); 6914 + 6915 + /* 6916 + * If we already have a head request, queue this one for async 6917 + * submittal once the head completes. If we don't have a head but 6918 + * IOSQE_IO_LINK is set in the sqe, start a new head. This one will be 6919 + * submitted sync once the chain is complete. If none of those 6920 + * conditions are true (normal request), then just queue it. 6921 + */ 6922 + if (link->head) { 6923 + struct io_kiocb *head = link->head; 6924 + 6925 + /* 6926 + * Taking sequential execution of a link, draining both sides 6927 + * of the link also fullfils IOSQE_IO_DRAIN semantics for all 6928 + * requests in the link. So, it drains the head and the 6929 + * next after the link request. The last one is done via 6930 + * drain_next flag to persist the effect across calls. 6931 + */ 6932 + if (req->flags & REQ_F_IO_DRAIN) { 6933 + head->flags |= REQ_F_IO_DRAIN; 6934 + ctx->drain_next = 1; 6935 + } 6936 + ret = io_req_defer_prep(req); 6937 + if (unlikely(ret)) 6938 + goto fail_req; 6939 + trace_io_uring_link(ctx, req, head); 6940 + link->last->link = req; 6941 + link->last = req; 6942 + 6943 + /* last request of a link, enqueue the link */ 6944 + if (!(req->flags & (REQ_F_LINK | REQ_F_HARDLINK))) { 6945 + io_queue_sqe(head); 6946 + link->head = NULL; 6947 + } 6948 + } else { 6949 + if (unlikely(ctx->drain_next)) { 6950 + req->flags |= REQ_F_IO_DRAIN; 6951 + ctx->drain_next = 0; 6952 + } 6953 + if (req->flags & (REQ_F_LINK | REQ_F_HARDLINK)) { 6954 + link->head = req; 6955 + link->last = req; 6956 + } else { 6957 + io_queue_sqe(req); 6958 + } 6959 + } 6960 + 6961 + return 0; 6962 + } 6963 + 6964 + /* 6965 + * Batched submission is done, ensure local IO is flushed out. 6966 + */ 6967 + static void io_submit_state_end(struct io_submit_state *state, 6968 + struct io_ring_ctx *ctx) 6969 + { 6970 + if (state->link.head) 6971 + io_queue_sqe(state->link.head); 6972 + if (state->comp.nr) 6973 + io_submit_flush_completions(&state->comp, ctx); 6974 + if (state->plug_started) 6975 + blk_finish_plug(&state->plug); 6976 + io_state_file_put(state); 6977 + } 6978 + 6979 + /* 6980 + * Start submission side cache. 6981 + */ 6982 + static void io_submit_state_start(struct io_submit_state *state, 6983 + unsigned int max_ios) 6984 + { 6985 + state->plug_started = false; 6986 + state->ios_left = max_ios; 6987 + /* set only head, no need to init link_last in advance */ 6988 + state->link.head = NULL; 6989 + } 6990 + 6991 + static void io_commit_sqring(struct io_ring_ctx *ctx) 6992 + { 6993 + struct io_rings *rings = ctx->rings; 6994 + 6995 + /* 6996 + * Ensure any loads from the SQEs are done at this point, 6997 + * since once we write the new head, the application could 6998 + * write new data to them. 6999 + */ 7000 + smp_store_release(&rings->sq.head, ctx->cached_sq_head); 7001 + } 7002 + 7003 + /* 7004 + * Fetch an sqe, if one is available. Note that sqe_ptr will point to memory 7005 + * that is mapped by userspace. This means that care needs to be taken to 7006 + * ensure that reads are stable, as we cannot rely on userspace always 7007 + * being a good citizen. If members of the sqe are validated and then later 7008 + * used, it's important that those reads are done through READ_ONCE() to 7009 + * prevent a re-load down the line. 7010 + */ 7011 + static const struct io_uring_sqe *io_get_sqe(struct io_ring_ctx *ctx) 7012 + { 7013 + u32 *sq_array = ctx->sq_array; 7014 + unsigned head; 7015 + 7016 + /* 7017 + * The cached sq head (or cq tail) serves two purposes: 7018 + * 7019 + * 1) allows us to batch the cost of updating the user visible 7020 + * head updates. 7021 + * 2) allows the kernel side to track the head on its own, even 7022 + * though the application is the one updating it. 7023 + */ 7024 + head = READ_ONCE(sq_array[ctx->cached_sq_head++ & ctx->sq_mask]); 7025 + if (likely(head < ctx->sq_entries)) 7026 + return &ctx->sq_sqes[head]; 7027 + 7028 + /* drop invalid entries */ 7029 + ctx->cached_sq_dropped++; 7030 + WRITE_ONCE(ctx->rings->sq_dropped, ctx->cached_sq_dropped); 7031 + return NULL; 7032 + } 7033 + 6736 7034 static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr) 6737 7035 { 6738 - struct io_submit_link link; 6739 - int i, submitted = 0; 7036 + int submitted = 0; 6740 7037 6741 7038 /* if we have a backlog and couldn't flush it all, return BUSY */ 6742 7039 if (test_bit(0, &ctx->sq_check_overflow)) { ··· 6898 6903 6899 6904 percpu_counter_add(&current->io_uring->inflight, nr); 6900 6905 refcount_add(nr, &current->usage); 6901 - 6902 6906 io_submit_state_start(&ctx->submit_state, nr); 6903 - link.head = NULL; 6904 6907 6905 - for (i = 0; i < nr; i++) { 6908 + while (submitted < nr) { 6906 6909 const struct io_uring_sqe *sqe; 6907 6910 struct io_kiocb *req; 6908 - int err; 6909 6911 6910 6912 req = io_alloc_req(ctx); 6911 6913 if (unlikely(!req)) { ··· 6917 6925 } 6918 6926 /* will complete beyond this point, count as submitted */ 6919 6927 submitted++; 6920 - 6921 - err = io_init_req(ctx, req, sqe); 6922 - if (unlikely(err)) { 6923 - fail_req: 6924 - io_put_req(req); 6925 - io_req_complete(req, err); 6928 + if (io_submit_sqe(ctx, req, sqe)) 6926 6929 break; 6927 - } 6928 - 6929 - trace_io_uring_submit_sqe(ctx, req->opcode, req->user_data, 6930 - true, ctx->flags & IORING_SETUP_SQPOLL); 6931 - err = io_submit_sqe(req, sqe, &link); 6932 - if (err) 6933 - goto fail_req; 6934 6930 } 6935 6931 6936 6932 if (unlikely(submitted != nr)) { ··· 6930 6950 percpu_counter_sub(&tctx->inflight, unused); 6931 6951 put_task_struct_many(current, unused); 6932 6952 } 6933 - if (link.head) 6934 - io_queue_link_head(link.head); 6935 - io_submit_state_end(&ctx->submit_state, ctx); 6936 6953 6954 + io_submit_state_end(&ctx->submit_state, ctx); 6937 6955 /* Commit SQ ring head once we've consumed and submitted all SQEs */ 6938 6956 io_commit_sqring(ctx); 6939 6957 ··· 7306 7328 percpu_ref_get(&rsrc_data->refs); 7307 7329 } 7308 7330 7309 - static int io_rsrc_ref_quiesce(struct fixed_rsrc_data *data, 7310 - struct io_ring_ctx *ctx, 7311 - struct fixed_rsrc_ref_node *backup_node) 7331 + static void io_sqe_rsrc_kill_node(struct io_ring_ctx *ctx, struct fixed_rsrc_data *data) 7312 7332 { 7313 - struct fixed_rsrc_ref_node *ref_node; 7314 - int ret; 7333 + struct fixed_rsrc_ref_node *ref_node = NULL; 7315 7334 7316 7335 io_rsrc_ref_lock(ctx); 7317 7336 ref_node = data->node; 7337 + data->node = NULL; 7318 7338 io_rsrc_ref_unlock(ctx); 7319 7339 if (ref_node) 7320 7340 percpu_ref_kill(&ref_node->refs); 7341 + } 7321 7342 7322 - percpu_ref_kill(&data->refs); 7343 + static int io_rsrc_ref_quiesce(struct fixed_rsrc_data *data, 7344 + struct io_ring_ctx *ctx, 7345 + void (*rsrc_put)(struct io_ring_ctx *ctx, 7346 + struct io_rsrc_put *prsrc)) 7347 + { 7348 + struct fixed_rsrc_ref_node *backup_node; 7349 + int ret; 7323 7350 7324 - /* wait for all refs nodes to complete */ 7325 - flush_delayed_work(&ctx->rsrc_put_work); 7351 + if (data->quiesce) 7352 + return -ENXIO; 7353 + 7354 + data->quiesce = true; 7326 7355 do { 7356 + ret = -ENOMEM; 7357 + backup_node = alloc_fixed_rsrc_ref_node(ctx); 7358 + if (!backup_node) 7359 + break; 7360 + backup_node->rsrc_data = data; 7361 + backup_node->rsrc_put = rsrc_put; 7362 + 7363 + io_sqe_rsrc_kill_node(ctx, data); 7364 + percpu_ref_kill(&data->refs); 7365 + flush_delayed_work(&ctx->rsrc_put_work); 7366 + 7327 7367 ret = wait_for_completion_interruptible(&data->done); 7328 7368 if (!ret) 7329 7369 break; 7330 - ret = io_run_task_work_sig(); 7331 - if (ret < 0) { 7332 - percpu_ref_resurrect(&data->refs); 7333 - reinit_completion(&data->done); 7334 - io_sqe_rsrc_set_node(ctx, data, backup_node); 7335 - return ret; 7336 - } 7337 - } while (1); 7338 7370 7339 - destroy_fixed_rsrc_ref_node(backup_node); 7340 - return 0; 7371 + percpu_ref_resurrect(&data->refs); 7372 + io_sqe_rsrc_set_node(ctx, data, backup_node); 7373 + backup_node = NULL; 7374 + reinit_completion(&data->done); 7375 + mutex_unlock(&ctx->uring_lock); 7376 + ret = io_run_task_work_sig(); 7377 + mutex_lock(&ctx->uring_lock); 7378 + } while (ret >= 0); 7379 + data->quiesce = false; 7380 + 7381 + if (backup_node) 7382 + destroy_fixed_rsrc_ref_node(backup_node); 7383 + return ret; 7341 7384 } 7342 7385 7343 7386 static struct fixed_rsrc_data *alloc_fixed_rsrc_data(struct io_ring_ctx *ctx) ··· 7389 7390 static int io_sqe_files_unregister(struct io_ring_ctx *ctx) 7390 7391 { 7391 7392 struct fixed_rsrc_data *data = ctx->file_data; 7392 - struct fixed_rsrc_ref_node *backup_node; 7393 7393 unsigned nr_tables, i; 7394 7394 int ret; 7395 7395 7396 - if (!data) 7396 + /* 7397 + * percpu_ref_is_dying() is to stop parallel files unregister 7398 + * Since we possibly drop uring lock later in this function to 7399 + * run task work. 7400 + */ 7401 + if (!data || percpu_ref_is_dying(&data->refs)) 7397 7402 return -ENXIO; 7398 - backup_node = alloc_fixed_rsrc_ref_node(ctx); 7399 - if (!backup_node) 7400 - return -ENOMEM; 7401 - init_fixed_file_ref_node(ctx, backup_node); 7402 - 7403 - ret = io_rsrc_ref_quiesce(data, ctx, backup_node); 7403 + ret = io_rsrc_ref_quiesce(data, ctx, io_ring_file_put); 7404 7404 if (ret) 7405 7405 return ret; 7406 7406 ··· 8700 8702 static void io_req_caches_free(struct io_ring_ctx *ctx, struct task_struct *tsk) 8701 8703 { 8702 8704 struct io_submit_state *submit_state = &ctx->submit_state; 8705 + struct io_comp_state *cs = &ctx->submit_state.comp; 8703 8706 8704 8707 mutex_lock(&ctx->uring_lock); 8705 8708 8706 - if (submit_state->free_reqs) 8709 + if (submit_state->free_reqs) { 8707 8710 kmem_cache_free_bulk(req_cachep, submit_state->free_reqs, 8708 8711 submit_state->reqs); 8709 - 8710 - io_req_cache_free(&submit_state->comp.free_list, NULL); 8712 + submit_state->free_reqs = 0; 8713 + } 8711 8714 8712 8715 spin_lock_irq(&ctx->completion_lock); 8713 - io_req_cache_free(&submit_state->comp.locked_free_list, NULL); 8716 + list_splice_init(&cs->locked_free_list, &cs->free_list); 8717 + cs->locked_free_nr = 0; 8714 8718 spin_unlock_irq(&ctx->completion_lock); 8719 + 8720 + io_req_cache_free(&cs->free_list, NULL); 8715 8721 8716 8722 mutex_unlock(&ctx->uring_lock); 8717 8723 } ··· 8745 8743 css_put(ctx->sqo_blkcg_css); 8746 8744 #endif 8747 8745 8746 + mutex_lock(&ctx->uring_lock); 8748 8747 io_sqe_files_unregister(ctx); 8748 + mutex_unlock(&ctx->uring_lock); 8749 8749 io_eventfd_unregister(ctx); 8750 8750 io_destroy_buffers(ctx); 8751 8751 idr_destroy(&ctx->personality_idr); ··· 10199 10195 goto out_fput; 10200 10196 10201 10197 ctx = f.file->private_data; 10198 + 10199 + io_run_task_work(); 10202 10200 10203 10201 mutex_lock(&ctx->uring_lock); 10204 10202 ret = __io_uring_register(ctx, opcode, arg, nr_args);