// SPDX-License-Identifier: GPL-2.0
/*
 * Shared application/kernel submission and completion ring pairs, for
 * supporting fast/efficient IO.
 *
 * A note on the read/write ordering memory barriers that are matched between
 * the application and kernel side.
 *
 * After the application reads the CQ ring tail, it must use an
 * appropriate smp_rmb() to pair with the smp_wmb() the kernel uses
 * before writing the tail (using smp_load_acquire to read the tail will
 * do). It also needs a smp_mb() before updating CQ head (ordering the
 * entry load(s) with the head store), pairing with an implicit barrier
 * through a control-dependency in io_get_cqring (smp_store_release to
 * store head will do). Failure to do so could lead to reading invalid
 * CQ entries.
 *
 * Likewise, the application must use an appropriate smp_wmb() before
 * writing the SQ tail (ordering SQ entry stores with the tail store),
 * which pairs with smp_load_acquire in io_get_sqring (smp_store_release
 * to store the tail will do). And it needs a barrier ordering the SQ
 * head load before writing new SQ entries (smp_load_acquire to read
 * head will do).
 *
 * When using the SQ poll thread (IORING_SETUP_SQPOLL), the application
 * needs to check the SQ flags for IORING_SQ_NEED_WAKEUP *after*
 * updating the SQ tail; a full memory barrier smp_mb() is needed
 * between.
 *
 * Also see the examples in the liburing library:
 *
 *	git://git.kernel.dk/liburing
 *
 * io_uring also uses READ/WRITE_ONCE() for _any_ store or load that happens
 * from data shared between the kernel and application. This is done both
 * for ordering purposes and to ensure that once a value is loaded from
 * data that the application could potentially modify, it remains stable.
 *
 * Copyright (C) 2018-2019 Jens Axboe
 * Copyright (c) 2018-2019 Christoph Hellwig
 */
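
/*
 * Editor's illustrative sketch (not part of the kernel build): roughly how an
 * application might honour the ordering rules above from userspace, using C11
 * atomics as stand-ins for smp_load_acquire()/smp_store_release(). The struct
 * and field names (example_cq, example_sq, khead, ktail, kflags, ...) are
 * invented for this sketch; real applications normally use the liburing
 * helpers, and the mapped pointers are assumed to come from the
 * io_uring_setup()/mmap() offsets.
 */
#if 0
#include <stdatomic.h>
#include <linux/io_uring.h>

struct example_cq {
	_Atomic unsigned *khead;	/* we are the only writer */
	_Atomic unsigned *ktail;	/* written by the kernel */
	unsigned ring_mask;
	struct io_uring_cqe *cqes;
};

/* Reap a single CQE; returns 0 if the ring is currently empty. */
static int example_reap_cqe(struct example_cq *cq, struct io_uring_cqe *out)
{
	unsigned head = atomic_load_explicit(cq->khead, memory_order_relaxed);
	/* acquire pairs with the kernel's release store of the CQ tail */
	unsigned tail = atomic_load_explicit(cq->ktail, memory_order_acquire);

	if (head == tail)
		return 0;

	*out = cq->cqes[head & cq->ring_mask];
	/* release orders the CQE read above before publishing the new head */
	atomic_store_explicit(cq->khead, head + 1, memory_order_release);
	return 1;
}

struct example_sq {
	_Atomic unsigned *ktail;	/* we are the only writer */
	_Atomic unsigned *kflags;	/* IORING_SQ_NEED_WAKEUP lives here */
	unsigned ring_mask;
	unsigned *array;		/* indices into the mapped sqe array */
};

/*
 * Publish one already-filled SQE index; the return value tells the caller
 * whether an IORING_SETUP_SQPOLL ring needs an io_uring_enter() wakeup.
 */
static int example_submit_sqe(struct example_sq *sq, unsigned sqe_index)
{
	unsigned tail = atomic_load_explicit(sq->ktail, memory_order_relaxed);

	sq->array[tail & sq->ring_mask] = sqe_index;
	/*
	 * release orders the sqe/array stores before the tail store, pairing
	 * with the kernel's acquire load of the tail in io_get_sqring()
	 */
	atomic_store_explicit(sq->ktail, tail + 1, memory_order_release);

	/* full barrier before checking the wakeup flag, as described above */
	atomic_thread_fence(memory_order_seq_cst);
	return atomic_load_explicit(sq->kflags, memory_order_relaxed) &
	       IORING_SQ_NEED_WAKEUP;
}
#endif
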
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/errno.h>
#include <linux/syscalls.h>
#include <linux/compat.h>
#include <linux/refcount.h>
#include <linux/uio.h>

#include <linux/sched/signal.h>
#include <linux/fs.h>
#include <linux/file.h>
#include <linux/fdtable.h>
#include <linux/mm.h>
#include <linux/mman.h>
#include <linux/mmu_context.h>
#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/workqueue.h>
#include <linux/kthread.h>
#include <linux/blkdev.h>
#include <linux/bvec.h>
#include <linux/net.h>
#include <net/sock.h>
#include <net/af_unix.h>
#include <net/scm.h>
#include <linux/anon_inodes.h>
#include <linux/sched/mm.h>
#include <linux/uaccess.h>
#include <linux/nospec.h>
#include <linux/sizes.h>
#include <linux/hugetlb.h>

#include <uapi/linux/io_uring.h>

#include "internal.h"

#define IORING_MAX_ENTRIES	4096
#define IORING_MAX_FIXED_FILES	1024

struct io_uring {
	u32 head ____cacheline_aligned_in_smp;
	u32 tail ____cacheline_aligned_in_smp;
};

/*
 * This data is shared with the application through the mmap at offset
 * IORING_OFF_SQ_RING.
 *
 * The offsets to the member fields are published through struct
 * io_sqring_offsets when calling io_uring_setup.
 */
struct io_sq_ring {
	/*
	 * Head and tail offsets into the ring; the offsets need to be
	 * masked to get valid indices.
	 *
	 * The kernel controls head and the application controls tail.
	 */
	struct io_uring		r;
	/*
	 * Bitmask to apply to head and tail offsets (constant, equals
	 * ring_entries - 1)
	 */
	u32			ring_mask;
	/* Ring size (constant, power of 2) */
	u32			ring_entries;
	/*
	 * Number of invalid entries dropped by the kernel due to
	 * invalid index stored in array
	 *
	 * Written by the kernel, shouldn't be modified by the
	 * application (i.e. get number of "new events" by comparing to
	 * cached value).
	 *
	 * After a new SQ head value was read by the application this
	 * counter includes all submissions that were dropped reaching
	 * the new SQ head (and possibly more).
	 */
	u32			dropped;
	/*
	 * Runtime flags
	 *
	 * Written by the kernel, shouldn't be modified by the
	 * application.
	 *
	 * The application needs a full memory barrier before checking
	 * for IORING_SQ_NEED_WAKEUP after updating the sq tail.
	 */
	u32			flags;
	/*
	 * Ring buffer of indices into array of io_uring_sqe, which is
	 * mmapped by the application using the IORING_OFF_SQES offset.
	 *
	 * This indirection could e.g. be used to assign fixed
	 * io_uring_sqe entries to operations and only submit them to
	 * the queue when needed.
	 *
	 * The kernel modifies neither the indices array nor the entries
	 * array.
	 */
	u32			array[];
};

/*
 * This data is shared with the application through the mmap at offset
 * IORING_OFF_CQ_RING.
 *
 * The offsets to the member fields are published through struct
 * io_cqring_offsets when calling io_uring_setup.
 */
struct io_cq_ring {
	/*
	 * Head and tail offsets into the ring; the offsets need to be
	 * masked to get valid indices.
	 *
	 * The application controls head and the kernel tail.
	 */
	struct io_uring		r;
	/*
	 * Bitmask to apply to head and tail offsets (constant, equals
	 * ring_entries - 1)
	 */
	u32			ring_mask;
	/* Ring size (constant, power of 2) */
	u32			ring_entries;
	/*
	 * Number of completion events lost because the queue was full;
	 * this should be avoided by the application by making sure
	 * there are not more requests pending than there is space in
	 * the completion queue.
	 *
	 * Written by the kernel, shouldn't be modified by the
	 * application (i.e. get number of "new events" by comparing to
	 * cached value).
	 *
	 * As completion events come in out of order this counter is not
	 * ordered with any other data.
	 */
	u32			overflow;
	/*
	 * Ring buffer of completion events.
	 *
	 * The kernel writes completion events fresh every time they are
	 * produced, so the application is allowed to modify pending
	 * entries.
	 */
	struct io_uring_cqe	cqes[];
};

struct io_mapped_ubuf {
	u64		ubuf;
	size_t		len;
	struct bio_vec	*bvec;
	unsigned int	nr_bvecs;
};

struct async_list {
	spinlock_t		lock;
	atomic_t		cnt;
	struct list_head	list;

	struct file		*file;
	off_t			io_end;
	size_t			io_len;
};

struct io_ring_ctx {
	struct {
		struct percpu_ref	refs;
	} ____cacheline_aligned_in_smp;

	struct {
		unsigned int		flags;
		bool			compat;
		bool			account_mem;

		/* SQ ring */
		struct io_sq_ring	*sq_ring;
		unsigned		cached_sq_head;
		unsigned		sq_entries;
		unsigned		sq_mask;
		unsigned		sq_thread_idle;
		struct io_uring_sqe	*sq_sqes;

		struct list_head	defer_list;
	} ____cacheline_aligned_in_smp;

	/* IO offload */
	struct workqueue_struct	*sqo_wq;
	struct task_struct	*sqo_thread;	/* if using sq thread polling */
	struct mm_struct	*sqo_mm;
	wait_queue_head_t	sqo_wait;
	struct completion	sqo_thread_started;

	struct {
		/* CQ ring */
		struct io_cq_ring	*cq_ring;
		unsigned		cached_cq_tail;
		unsigned		cq_entries;
		unsigned		cq_mask;
		struct wait_queue_head	cq_wait;
		struct fasync_struct	*cq_fasync;
		struct eventfd_ctx	*cq_ev_fd;
	} ____cacheline_aligned_in_smp;

	/*
	 * If used, fixed file set. Writers must ensure that ->refs is dead,
	 * readers must ensure that ->refs is alive as long as the file* is
	 * used. Only updated through io_uring_register(2).
	 */
	struct file		**user_files;
	unsigned		nr_user_files;

	/* if used, fixed mapped user buffers */
	unsigned		nr_user_bufs;
	struct io_mapped_ubuf	*user_bufs;

	struct user_struct	*user;

	struct completion	ctx_done;

	struct {
		struct mutex		uring_lock;
		wait_queue_head_t	wait;
	} ____cacheline_aligned_in_smp;

	struct {
		spinlock_t		completion_lock;
		bool			poll_multi_file;
		/*
		 * ->poll_list is protected by the ctx->uring_lock for
		 * io_uring instances that don't use IORING_SETUP_SQPOLL.
		 * For SQPOLL, only the single threaded io_sq_thread() will
		 * manipulate the list, hence no extra locking is needed there.
		 */
		struct list_head	poll_list;
		struct list_head	cancel_list;
	} ____cacheline_aligned_in_smp;

	struct async_list	pending_async[2];

#if defined(CONFIG_UNIX)
	struct socket		*ring_sock;
#endif
};

struct sqe_submit {
	const struct io_uring_sqe	*sqe;
	unsigned short			index;
	bool				has_user;
	bool				needs_lock;
	bool				needs_fixed_file;
};

/*
 * First field must be the file pointer in all the
 * iocb unions! See also 'struct kiocb' in <linux/fs.h>
 */
struct io_poll_iocb {
	struct file			*file;
	struct wait_queue_head		*head;
	__poll_t			events;
	bool				done;
	bool				canceled;
	struct wait_queue_entry		wait;
};

/*
 * NOTE! Each of the iocb union members has the file pointer
 * as the first entry in their struct definition. So you can
 * access the file pointer through any of the sub-structs,
 * or directly as just 'ki_filp' in this struct.
 */
struct io_kiocb {
	union {
		struct file		*file;
		struct kiocb		rw;
		struct io_poll_iocb	poll;
	};

	struct sqe_submit	submit;

	struct io_ring_ctx	*ctx;
	struct list_head	list;
	struct list_head	link_list;
	unsigned int		flags;
	refcount_t		refs;
#define REQ_F_NOWAIT		1	/* must not punt to workers */
#define REQ_F_IOPOLL_COMPLETED	2	/* polled IO has completed */
#define REQ_F_FIXED_FILE	4	/* ctx owns file */
#define REQ_F_SEQ_PREV		8	/* sequential with previous */
#define REQ_F_IO_DRAIN		16	/* drain existing IO first */
#define REQ_F_IO_DRAINED	32	/* drain done */
#define REQ_F_LINK		64	/* linked sqes */
#define REQ_F_LINK_DONE		128	/* linked sqes done */
#define REQ_F_FAIL_LINK		256	/* fail rest of links */
	u64			user_data;
	u32			result;
	u32			sequence;

	struct work_struct	work;
};

#define IO_PLUG_THRESHOLD	2
#define IO_IOPOLL_BATCH		8

struct io_submit_state {
	struct blk_plug		plug;

	/*
	 * io_kiocb alloc cache
	 */
	void			*reqs[IO_IOPOLL_BATCH];
	unsigned int		free_reqs;
	unsigned int		cur_req;

	/*
	 * File reference cache
	 */
	struct file		*file;
	unsigned int		fd;
	unsigned int		has_refs;
	unsigned int		used_refs;
	unsigned int		ios_left;
};

static void io_sq_wq_submit_work(struct work_struct *work);

static struct kmem_cache *req_cachep;

static const struct file_operations io_uring_fops;

struct sock *io_uring_get_socket(struct file *file)
{
#if defined(CONFIG_UNIX)
	if (file->f_op == &io_uring_fops) {
		struct io_ring_ctx *ctx = file->private_data;

		return ctx->ring_sock->sk;
	}
#endif
	return NULL;
}
EXPORT_SYMBOL(io_uring_get_socket);

static void io_ring_ctx_ref_free(struct percpu_ref *ref)
{
	struct io_ring_ctx *ctx = container_of(ref, struct io_ring_ctx, refs);

	complete(&ctx->ctx_done);
}

static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
{
	struct io_ring_ctx *ctx;
	int i;

	ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);
	if (!ctx)
		return NULL;

	if (percpu_ref_init(&ctx->refs, io_ring_ctx_ref_free,
			    PERCPU_REF_ALLOW_REINIT, GFP_KERNEL)) {
		kfree(ctx);
		return NULL;
	}

	ctx->flags = p->flags;
	init_waitqueue_head(&ctx->cq_wait);
	init_completion(&ctx->ctx_done);
	init_completion(&ctx->sqo_thread_started);
	mutex_init(&ctx->uring_lock);
	init_waitqueue_head(&ctx->wait);
	for (i = 0; i < ARRAY_SIZE(ctx->pending_async); i++) {
		spin_lock_init(&ctx->pending_async[i].lock);
		INIT_LIST_HEAD(&ctx->pending_async[i].list);
		atomic_set(&ctx->pending_async[i].cnt, 0);
	}
	spin_lock_init(&ctx->completion_lock);
	INIT_LIST_HEAD(&ctx->poll_list);
	INIT_LIST_HEAD(&ctx->cancel_list);
	INIT_LIST_HEAD(&ctx->defer_list);
	return ctx;
}

static inline bool io_sequence_defer(struct io_ring_ctx *ctx,
				     struct io_kiocb *req)
{
	if ((req->flags & (REQ_F_IO_DRAIN|REQ_F_IO_DRAINED)) != REQ_F_IO_DRAIN)
		return false;

	return req->sequence != ctx->cached_cq_tail + ctx->sq_ring->dropped;
}

static struct io_kiocb *io_get_deferred_req(struct io_ring_ctx *ctx)
{
	struct io_kiocb *req;

	if (list_empty(&ctx->defer_list))
		return NULL;

	req = list_first_entry(&ctx->defer_list, struct io_kiocb, list);
	if (!io_sequence_defer(ctx, req)) {
		list_del_init(&req->list);
		return req;
	}

	return NULL;
}

static void __io_commit_cqring(struct io_ring_ctx *ctx)
{
	struct io_cq_ring *ring = ctx->cq_ring;

	if (ctx->cached_cq_tail != READ_ONCE(ring->r.tail)) {
		/* order cqe stores with ring update */
		smp_store_release(&ring->r.tail, ctx->cached_cq_tail);

		if (wq_has_sleeper(&ctx->cq_wait)) {
			wake_up_interruptible(&ctx->cq_wait);
			kill_fasync(&ctx->cq_fasync, SIGIO, POLL_IN);
		}
	}
}

static void io_commit_cqring(struct io_ring_ctx *ctx)
{
	struct io_kiocb *req;

	__io_commit_cqring(ctx);

	while ((req = io_get_deferred_req(ctx)) != NULL) {
		req->flags |= REQ_F_IO_DRAINED;
		queue_work(ctx->sqo_wq, &req->work);
	}
}

static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx)
{
	struct io_cq_ring *ring = ctx->cq_ring;
	unsigned tail;

	tail = ctx->cached_cq_tail;
	/*
	 * writes to the cq entry need to come after reading head; the
	 * control dependency is enough as we're using WRITE_ONCE to
	 * fill the cq entry
	 */
	if (tail - READ_ONCE(ring->r.head) == ring->ring_entries)
		return NULL;

	ctx->cached_cq_tail++;
	return &ring->cqes[tail & ctx->cq_mask];
}

static void io_cqring_fill_event(struct io_ring_ctx *ctx, u64 ki_user_data,
				 long res)
{
	struct io_uring_cqe *cqe;

	/*
	 * If we can't get a cq entry, userspace overflowed the
	 * submission (by quite a lot). Increment the overflow count in
	 * the ring.
	 */
	cqe = io_get_cqring(ctx);
	if (cqe) {
		WRITE_ONCE(cqe->user_data, ki_user_data);
		WRITE_ONCE(cqe->res, res);
		WRITE_ONCE(cqe->flags, 0);
	} else {
		unsigned overflow = READ_ONCE(ctx->cq_ring->overflow);

		WRITE_ONCE(ctx->cq_ring->overflow, overflow + 1);
	}
}

static void io_cqring_ev_posted(struct io_ring_ctx *ctx)
{
	if (waitqueue_active(&ctx->wait))
		wake_up(&ctx->wait);
	if (waitqueue_active(&ctx->sqo_wait))
		wake_up(&ctx->sqo_wait);
	if (ctx->cq_ev_fd)
		eventfd_signal(ctx->cq_ev_fd, 1);
}

static void io_cqring_add_event(struct io_ring_ctx *ctx, u64 user_data,
				long res)
{
	unsigned long flags;

	spin_lock_irqsave(&ctx->completion_lock, flags);
	io_cqring_fill_event(ctx, user_data, res);
	io_commit_cqring(ctx);
	spin_unlock_irqrestore(&ctx->completion_lock, flags);

	io_cqring_ev_posted(ctx);
}

static void io_ring_drop_ctx_refs(struct io_ring_ctx *ctx, unsigned refs)
{
	percpu_ref_put_many(&ctx->refs, refs);

	if (waitqueue_active(&ctx->wait))
		wake_up(&ctx->wait);
}

static struct io_kiocb *io_get_req(struct io_ring_ctx *ctx,
				   struct io_submit_state *state)
{
	gfp_t gfp = GFP_KERNEL | __GFP_NOWARN;
	struct io_kiocb *req;

	if (!percpu_ref_tryget(&ctx->refs))
		return NULL;

	if (!state) {
		req = kmem_cache_alloc(req_cachep, gfp);
		if (unlikely(!req))
			goto out;
	} else if (!state->free_reqs) {
		size_t sz;
		int ret;

		sz = min_t(size_t, state->ios_left, ARRAY_SIZE(state->reqs));
		ret = kmem_cache_alloc_bulk(req_cachep, gfp, sz, state->reqs);

		/*
		 * Bulk alloc is all-or-nothing. If we fail to get a batch,
		 * retry single alloc to be on the safe side.
		 */
		if (unlikely(ret <= 0)) {
			state->reqs[0] = kmem_cache_alloc(req_cachep, gfp);
			if (!state->reqs[0])
				goto out;
			ret = 1;
		}
		state->free_reqs = ret - 1;
		state->cur_req = 1;
		req = state->reqs[0];
	} else {
		req = state->reqs[state->cur_req];
		state->free_reqs--;
		state->cur_req++;
	}

	req->file = NULL;
	req->ctx = ctx;
	req->flags = 0;
	/* one is dropped after submission, the other at completion */
	refcount_set(&req->refs, 2);
	req->result = 0;
	return req;
out:
	io_ring_drop_ctx_refs(ctx, 1);
	return NULL;
}

static void io_free_req_many(struct io_ring_ctx *ctx, void **reqs, int *nr)
{
	if (*nr) {
		kmem_cache_free_bulk(req_cachep, *nr, reqs);
		io_ring_drop_ctx_refs(ctx, *nr);
		*nr = 0;
	}
}

static void __io_free_req(struct io_kiocb *req)
{
	if (req->file && !(req->flags & REQ_F_FIXED_FILE))
		fput(req->file);
	io_ring_drop_ctx_refs(req->ctx, 1);
	kmem_cache_free(req_cachep, req);
}

static void io_req_link_next(struct io_kiocb *req)
{
	struct io_kiocb *nxt;

	/*
	 * The list should never be empty when we are called here. But could
	 * potentially happen if the chain is messed up, check to be on the
	 * safe side.
	 */
	nxt = list_first_entry_or_null(&req->link_list, struct io_kiocb, list);
	if (nxt) {
		list_del(&nxt->list);
		if (!list_empty(&req->link_list)) {
			INIT_LIST_HEAD(&nxt->link_list);
			list_splice(&req->link_list, &nxt->link_list);
			nxt->flags |= REQ_F_LINK;
		}

		nxt->flags |= REQ_F_LINK_DONE;
		INIT_WORK(&nxt->work, io_sq_wq_submit_work);
		queue_work(req->ctx->sqo_wq, &nxt->work);
	}
}

/*
 * Called if REQ_F_LINK is set, and we fail the head request
 */
static void io_fail_links(struct io_kiocb *req)
{
	struct io_kiocb *link;

	while (!list_empty(&req->link_list)) {
		link = list_first_entry(&req->link_list, struct io_kiocb, list);
		list_del(&link->list);

		io_cqring_add_event(req->ctx, link->user_data, -ECANCELED);
		__io_free_req(link);
	}
}

static void io_free_req(struct io_kiocb *req)
{
	/*
	 * If LINK is set, we have dependent requests in this chain. If we
	 * didn't fail this request, queue the first one up, moving any other
	 * dependencies to the next request. In case of failure, fail the rest
	 * of the chain.
	 */
	if (req->flags & REQ_F_LINK) {
		if (req->flags & REQ_F_FAIL_LINK)
			io_fail_links(req);
		else
			io_req_link_next(req);
	}

	__io_free_req(req);
}

static void io_put_req(struct io_kiocb *req)
{
	if (refcount_dec_and_test(&req->refs))
		io_free_req(req);
}

static unsigned io_cqring_events(struct io_cq_ring *ring)
{
	/* See comment at the top of this file */
	smp_rmb();
	return READ_ONCE(ring->r.tail) - READ_ONCE(ring->r.head);
}

/*
 * Find and free completed poll iocbs
 */
static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events,
			       struct list_head *done)
{
	void *reqs[IO_IOPOLL_BATCH];
	struct io_kiocb *req;
	int to_free;

	to_free = 0;
	while (!list_empty(done)) {
		req = list_first_entry(done, struct io_kiocb, list);
		list_del(&req->list);

		io_cqring_fill_event(ctx, req->user_data, req->result);
		(*nr_events)++;

		if (refcount_dec_and_test(&req->refs)) {
			/* If we're not using fixed files, we have to pair the
			 * completion part with the file put. Use regular
			 * completions for those, only batch free for fixed
			 * file and non-linked commands.
			 */
			if ((req->flags & (REQ_F_FIXED_FILE|REQ_F_LINK)) ==
			    REQ_F_FIXED_FILE) {
				reqs[to_free++] = req;
				if (to_free == ARRAY_SIZE(reqs))
					io_free_req_many(ctx, reqs, &to_free);
			} else {
				io_free_req(req);
			}
		}
	}

	io_commit_cqring(ctx);
	io_free_req_many(ctx, reqs, &to_free);
}

static int io_do_iopoll(struct io_ring_ctx *ctx, unsigned int *nr_events,
			long min)
{
	struct io_kiocb *req, *tmp;
	LIST_HEAD(done);
	bool spin;
	int ret;

	/*
	 * Only spin for completions if we don't have multiple devices hanging
	 * off our complete list, and we're under the requested amount.
	 */
	spin = !ctx->poll_multi_file && *nr_events < min;

	ret = 0;
	list_for_each_entry_safe(req, tmp, &ctx->poll_list, list) {
		struct kiocb *kiocb = &req->rw;

		/*
		 * Move completed entries to our local list. If we find a
		 * request that requires polling, break out and complete
		 * the done list first, if we have entries there.
		 */
		if (req->flags & REQ_F_IOPOLL_COMPLETED) {
			list_move_tail(&req->list, &done);
			continue;
		}
		if (!list_empty(&done))
			break;

		ret = kiocb->ki_filp->f_op->iopoll(kiocb, spin);
		if (ret < 0)
			break;

		if (ret && spin)
			spin = false;
		ret = 0;
	}

	if (!list_empty(&done))
		io_iopoll_complete(ctx, nr_events, &done);

	return ret;
}

/*
 * Poll for a minimum of 'min' events. Note that if min == 0 we consider that a
 * non-spinning poll check - we'll still enter the driver poll loop, but only
 * as a non-spinning completion check.
 */
static int io_iopoll_getevents(struct io_ring_ctx *ctx, unsigned int *nr_events,
			       long min)
{
	while (!list_empty(&ctx->poll_list) && !need_resched()) {
		int ret;

		ret = io_do_iopoll(ctx, nr_events, min);
		if (ret < 0)
			return ret;
		if (!min || *nr_events >= min)
			return 0;
	}

	return 1;
}

/*
 * We can't just wait for polled events to come to us, we have to actively
 * find and complete them.
 */
static void io_iopoll_reap_events(struct io_ring_ctx *ctx)
{
	if (!(ctx->flags & IORING_SETUP_IOPOLL))
		return;

	mutex_lock(&ctx->uring_lock);
	while (!list_empty(&ctx->poll_list)) {
		unsigned int nr_events = 0;

		io_iopoll_getevents(ctx, &nr_events, 1);

		/*
		 * Ensure we allow local-to-the-cpu processing to take place,
		 * in this case we need to ensure that we reap all events.
		 */
		cond_resched();
	}
	mutex_unlock(&ctx->uring_lock);
}

static int io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events,
			   long min)
{
	int iters, ret = 0;

	/*
	 * We disallow the app entering submit/complete with polling, but we
	 * still need to lock the ring to prevent racing with polled issue
	 * that got punted to a workqueue.
	 */
	mutex_lock(&ctx->uring_lock);

	iters = 0;
	do {
		int tmin = 0;

		/*
		 * Don't enter poll loop if we already have events pending.
		 * If we do, we can potentially be spinning for commands that
		 * already triggered a CQE (eg in error).
		 */
		if (io_cqring_events(ctx->cq_ring))
			break;

		/*
		 * If a submit got punted to a workqueue, we can have the
		 * application entering polling for a command before it gets
		 * issued. That app will hold the uring_lock for the duration
		 * of the poll right here, so we need to take a breather every
		 * now and then to ensure that the issue has a chance to add
		 * the poll to the issued list. Otherwise we can spin here
		 * forever, while the workqueue is stuck trying to acquire the
		 * very same mutex.
		 */
		if (!(++iters & 7)) {
			mutex_unlock(&ctx->uring_lock);
			mutex_lock(&ctx->uring_lock);
		}

		if (*nr_events < min)
			tmin = min - *nr_events;

		ret = io_iopoll_getevents(ctx, nr_events, tmin);
		if (ret <= 0)
			break;
		ret = 0;
	} while (min && !*nr_events && !need_resched());

	mutex_unlock(&ctx->uring_lock);
	return ret;
}

static void kiocb_end_write(struct kiocb *kiocb)
{
	if (kiocb->ki_flags & IOCB_WRITE) {
		struct inode *inode = file_inode(kiocb->ki_filp);

		/*
		 * Tell lockdep we inherited freeze protection from submission
		 * thread.
		 */
		if (S_ISREG(inode->i_mode))
			__sb_writers_acquired(inode->i_sb, SB_FREEZE_WRITE);
		file_end_write(kiocb->ki_filp);
	}
}

static void io_complete_rw(struct kiocb *kiocb, long res, long res2)
{
	struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw);

	kiocb_end_write(kiocb);

	if ((req->flags & REQ_F_LINK) && res != req->result)
		req->flags |= REQ_F_FAIL_LINK;
	io_cqring_add_event(req->ctx, req->user_data, res);
	io_put_req(req);
}

static void io_complete_rw_iopoll(struct kiocb *kiocb, long res, long res2)
{
	struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw);

	kiocb_end_write(kiocb);

	if ((req->flags & REQ_F_LINK) && res != req->result)
		req->flags |= REQ_F_FAIL_LINK;
	req->result = res;
	if (res != -EAGAIN)
		req->flags |= REQ_F_IOPOLL_COMPLETED;
}

/*
 * After the iocb has been issued, it's safe to be found on the poll list.
 * Adding the kiocb to the list AFTER submission ensures that we don't
 * find it from a io_iopoll_getevents() thread before the issuer is done
 * accessing the kiocb cookie.
 */
static void io_iopoll_req_issued(struct io_kiocb *req)
{
	struct io_ring_ctx *ctx = req->ctx;

	/*
	 * Track whether we have multiple files in our lists. This will impact
	 * how we do polling eventually, not spinning if we're on potentially
	 * different devices.
	 */
	if (list_empty(&ctx->poll_list)) {
		ctx->poll_multi_file = false;
	} else if (!ctx->poll_multi_file) {
		struct io_kiocb *list_req;

		list_req = list_first_entry(&ctx->poll_list, struct io_kiocb,
						list);
		if (list_req->rw.ki_filp != req->rw.ki_filp)
			ctx->poll_multi_file = true;
	}

	/*
	 * For fast devices, IO may have already completed. If it has, add
	 * it to the front so we find it first.
	 */
	if (req->flags & REQ_F_IOPOLL_COMPLETED)
		list_add(&req->list, &ctx->poll_list);
	else
		list_add_tail(&req->list, &ctx->poll_list);
}

static void io_file_put(struct io_submit_state *state)
{
	if (state->file) {
		int diff = state->has_refs - state->used_refs;

		if (diff)
			fput_many(state->file, diff);
		state->file = NULL;
	}
}

/*
 * Get as many references to a file as we have IOs left in this submission,
 * assuming most submissions are for one file, or at least that each file
 * has more than one submission.
 */
static struct file *io_file_get(struct io_submit_state *state, int fd)
{
	if (!state)
		return fget(fd);

	if (state->file) {
		if (state->fd == fd) {
			state->used_refs++;
			state->ios_left--;
			return state->file;
		}
		io_file_put(state);
	}
	state->file = fget_many(fd, state->ios_left);
	if (!state->file)
		return NULL;

	state->fd = fd;
	state->has_refs = state->ios_left;
	state->used_refs = 1;
	state->ios_left--;
	return state->file;
}

/*
 * If we tracked the file through the SCM inflight mechanism, we could support
 * any file. For now, just ensure that anything potentially problematic is done
 * inline.
 */
static bool io_file_supports_async(struct file *file)
{
	umode_t mode = file_inode(file)->i_mode;

	if (S_ISBLK(mode) || S_ISCHR(mode))
		return true;
	if (S_ISREG(mode) && file->f_op != &io_uring_fops)
		return true;

	return false;
}

static int io_prep_rw(struct io_kiocb *req, const struct sqe_submit *s,
		      bool force_nonblock)
{
	const struct io_uring_sqe *sqe = s->sqe;
	struct io_ring_ctx *ctx = req->ctx;
	struct kiocb *kiocb = &req->rw;
	unsigned ioprio;
	int ret;

	if (!req->file)
		return -EBADF;

	if (force_nonblock && !io_file_supports_async(req->file))
		force_nonblock = false;

	kiocb->ki_pos = READ_ONCE(sqe->off);
	kiocb->ki_flags = iocb_flags(kiocb->ki_filp);
	kiocb->ki_hint = ki_hint_validate(file_write_hint(kiocb->ki_filp));

	ioprio = READ_ONCE(sqe->ioprio);
	if (ioprio) {
		ret = ioprio_check_cap(ioprio);
		if (ret)
			return ret;

		kiocb->ki_ioprio = ioprio;
	} else
		kiocb->ki_ioprio = get_current_ioprio();

	ret = kiocb_set_rw_flags(kiocb, READ_ONCE(sqe->rw_flags));
	if (unlikely(ret))
		return ret;

	/* don't allow async punt if RWF_NOWAIT was requested */
	if (kiocb->ki_flags & IOCB_NOWAIT)
		req->flags |= REQ_F_NOWAIT;

	if (force_nonblock)
		kiocb->ki_flags |= IOCB_NOWAIT;

	if (ctx->flags & IORING_SETUP_IOPOLL) {
		if (!(kiocb->ki_flags & IOCB_DIRECT) ||
		    !kiocb->ki_filp->f_op->iopoll)
			return -EOPNOTSUPP;

		kiocb->ki_flags |= IOCB_HIPRI;
		kiocb->ki_complete = io_complete_rw_iopoll;
	} else {
		if (kiocb->ki_flags & IOCB_HIPRI)
			return -EINVAL;
		kiocb->ki_complete = io_complete_rw;
	}
	return 0;
}

static inline void io_rw_done(struct kiocb *kiocb, ssize_t ret)
{
	switch (ret) {
	case -EIOCBQUEUED:
		break;
	case -ERESTARTSYS:
	case -ERESTARTNOINTR:
	case -ERESTARTNOHAND:
	case -ERESTART_RESTARTBLOCK:
		/*
		 * We can't just restart the syscall, since previously
		 * submitted sqes may already be in progress. Just fail this
		 * IO with EINTR.
		 */
		ret = -EINTR;
		/* fall through */
	default:
		kiocb->ki_complete(kiocb, ret, 0);
	}
}

static int io_import_fixed(struct io_ring_ctx *ctx, int rw,
			   const struct io_uring_sqe *sqe,
			   struct iov_iter *iter)
{
	size_t len = READ_ONCE(sqe->len);
	struct io_mapped_ubuf *imu;
	unsigned index, buf_index;
	size_t offset;
	u64 buf_addr;

	/* attempt to use fixed buffers without having provided iovecs */
	if (unlikely(!ctx->user_bufs))
		return -EFAULT;

	buf_index = READ_ONCE(sqe->buf_index);
	if (unlikely(buf_index >= ctx->nr_user_bufs))
		return -EFAULT;

	index = array_index_nospec(buf_index, ctx->nr_user_bufs);
	imu = &ctx->user_bufs[index];
	buf_addr = READ_ONCE(sqe->addr);

	/* overflow */
	if (buf_addr + len < buf_addr)
		return -EFAULT;
	/* not inside the mapped region */
	if (buf_addr < imu->ubuf || buf_addr + len > imu->ubuf + imu->len)
		return -EFAULT;

	/*
	 * May not be a start of buffer, set size appropriately
	 * and advance us to the beginning.
	 */
	offset = buf_addr - imu->ubuf;
	iov_iter_bvec(iter, rw, imu->bvec, imu->nr_bvecs, offset + len);

	if (offset) {
		/*
		 * Don't use iov_iter_advance() here, as it's really slow for
		 * using the latter parts of a big fixed buffer - it iterates
		 * over each segment manually. We can cheat a bit here, because
		 * we know that:
		 *
		 * 1) it's a BVEC iter, we set it up
		 * 2) all bvecs are PAGE_SIZE in size, except potentially the
		 *    first and last bvec
		 *
		 * So just find our index, and adjust the iterator afterwards.
		 * If the offset is within the first bvec (or the whole first
		 * bvec), just use iov_iter_advance(). This makes it easier
		 * since we can just skip the first segment, which may not
		 * be PAGE_SIZE aligned.
		 */
		const struct bio_vec *bvec = imu->bvec;

		if (offset <= bvec->bv_len) {
			iov_iter_advance(iter, offset);
		} else {
			unsigned long seg_skip;

			/* skip first vec */
			offset -= bvec->bv_len;
			seg_skip = 1 + (offset >> PAGE_SHIFT);

			iter->bvec = bvec + seg_skip;
			iter->nr_segs -= seg_skip;
			iter->count -= bvec->bv_len + offset;
			iter->iov_offset = offset & ~PAGE_MASK;
		}
	}

	return 0;
}

static ssize_t io_import_iovec(struct io_ring_ctx *ctx, int rw,
			       const struct sqe_submit *s, struct iovec **iovec,
			       struct iov_iter *iter)
{
	const struct io_uring_sqe *sqe = s->sqe;
	void __user *buf = u64_to_user_ptr(READ_ONCE(sqe->addr));
	size_t sqe_len = READ_ONCE(sqe->len);
	u8 opcode;

	/*
	 * We're reading ->opcode for the second time, but the first read
	 * doesn't care whether it's _FIXED or not, so it doesn't matter
	 * whether ->opcode changes concurrently. The first read does care
	 * about whether it is a READ or a WRITE, so we don't trust this read
	 * for that purpose and instead let the caller pass in the read/write
	 * flag.
	 */
	opcode = READ_ONCE(sqe->opcode);
	if (opcode == IORING_OP_READ_FIXED ||
	    opcode == IORING_OP_WRITE_FIXED) {
		ssize_t ret = io_import_fixed(ctx, rw, sqe, iter);
		*iovec = NULL;
		return ret;
	}

	if (!s->has_user)
		return -EFAULT;

#ifdef CONFIG_COMPAT
	if (ctx->compat)
		return compat_import_iovec(rw, buf, sqe_len, UIO_FASTIOV,
						iovec, iter);
#endif

	return import_iovec(rw, buf, sqe_len, UIO_FASTIOV, iovec, iter);
}

/*
 * Make a note of the last file/offset/direction we punted to async
 * context. We'll use this information to see if we can piggy back a
 * sequential request onto the previous one, if it still hasn't been
 * completed by the async worker.
 */
static void io_async_list_note(int rw, struct io_kiocb *req, size_t len)
{
	struct async_list *async_list = &req->ctx->pending_async[rw];
	struct kiocb *kiocb = &req->rw;
	struct file *filp = kiocb->ki_filp;
	off_t io_end = kiocb->ki_pos + len;

	if (filp == async_list->file && kiocb->ki_pos == async_list->io_end) {
		unsigned long max_bytes;

		/* Use 8x RA size as a decent limiter for both reads/writes */
		max_bytes = filp->f_ra.ra_pages << (PAGE_SHIFT + 3);
		if (!max_bytes)
			max_bytes = VM_READAHEAD_PAGES << (PAGE_SHIFT + 3);

		/* If max len is exceeded, reset the state */
		if (async_list->io_len + len <= max_bytes) {
			req->flags |= REQ_F_SEQ_PREV;
			async_list->io_len += len;
		} else {
			io_end = 0;
			async_list->io_len = 0;
		}
	}

	/* New file? Reset state. */
	if (async_list->file != filp) {
		async_list->io_len = 0;
		async_list->file = filp;
	}
	async_list->io_end = io_end;
}

static int io_read(struct io_kiocb *req, const struct sqe_submit *s,
		   bool force_nonblock)
{
	struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
	struct kiocb *kiocb = &req->rw;
	struct iov_iter iter;
	struct file *file;
	size_t iov_count;
	ssize_t read_size, ret;

	ret = io_prep_rw(req, s, force_nonblock);
	if (ret)
		return ret;
	file = kiocb->ki_filp;

	if (unlikely(!(file->f_mode & FMODE_READ)))
		return -EBADF;
	if (unlikely(!file->f_op->read_iter))
		return -EINVAL;

	ret = io_import_iovec(req->ctx, READ, s, &iovec, &iter);
	if (ret < 0)
		return ret;

	read_size = ret;
	if (req->flags & REQ_F_LINK)
		req->result = read_size;

	iov_count = iov_iter_count(&iter);
	ret = rw_verify_area(READ, file, &kiocb->ki_pos, iov_count);
	if (!ret) {
		ssize_t ret2;

		ret2 = call_read_iter(file, kiocb, &iter);
		/*
		 * In case of a short read, punt to async. This can happen
		 * if we have data partially cached. Alternatively we can
		 * return the short read, in which case the application will
		 * need to issue another SQE and wait for it. That SQE will
		 * need async punt anyway, so it's more efficient to do it
		 * here.
		 */
		if (force_nonblock && ret2 > 0 && ret2 < read_size)
			ret2 = -EAGAIN;
		/* Catch -EAGAIN return for forced non-blocking submission */
		if (!force_nonblock || ret2 != -EAGAIN) {
			io_rw_done(kiocb, ret2);
		} else {
			/*
			 * If ->needs_lock is true, we're already in async
			 * context.
			 */
			if (!s->needs_lock)
				io_async_list_note(READ, req, iov_count);
			ret = -EAGAIN;
		}
	}
	kfree(iovec);
	return ret;
}

static int io_write(struct io_kiocb *req, const struct sqe_submit *s,
		    bool force_nonblock)
{
	struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
	struct kiocb *kiocb = &req->rw;
	struct iov_iter iter;
	struct file *file;
	size_t iov_count;
	ssize_t ret;

	ret = io_prep_rw(req, s, force_nonblock);
	if (ret)
		return ret;

	file = kiocb->ki_filp;
	if (unlikely(!(file->f_mode & FMODE_WRITE)))
		return -EBADF;
	if (unlikely(!file->f_op->write_iter))
		return -EINVAL;

	ret = io_import_iovec(req->ctx, WRITE, s, &iovec, &iter);
	if (ret < 0)
		return ret;

	if (req->flags & REQ_F_LINK)
		req->result = ret;

	iov_count = iov_iter_count(&iter);

	ret = -EAGAIN;
	if (force_nonblock && !(kiocb->ki_flags & IOCB_DIRECT)) {
		/* If ->needs_lock is true, we're already in async context. */
		if (!s->needs_lock)
			io_async_list_note(WRITE, req, iov_count);
		goto out_free;
	}

	ret = rw_verify_area(WRITE, file, &kiocb->ki_pos, iov_count);
	if (!ret) {
		ssize_t ret2;

		/*
		 * Open-code file_start_write here to grab freeze protection,
		 * which will be released by another thread in
		 * io_complete_rw(). Fool lockdep by telling it the lock got
		 * released so that it doesn't complain about the held lock when
		 * we return to userspace.
		 */
		if (S_ISREG(file_inode(file)->i_mode)) {
			__sb_start_write(file_inode(file)->i_sb,
						SB_FREEZE_WRITE, true);
			__sb_writers_release(file_inode(file)->i_sb,
						SB_FREEZE_WRITE);
		}
		kiocb->ki_flags |= IOCB_WRITE;

		ret2 = call_write_iter(file, kiocb, &iter);
		if (!force_nonblock || ret2 != -EAGAIN) {
			io_rw_done(kiocb, ret2);
		} else {
			/*
			 * If ->needs_lock is true, we're already in async
			 * context.
			 */
			if (!s->needs_lock)
				io_async_list_note(WRITE, req, iov_count);
			ret = -EAGAIN;
		}
	}
out_free:
	kfree(iovec);
	return ret;
}

/*
 * IORING_OP_NOP just posts a completion event, nothing else.
 */
static int io_nop(struct io_kiocb *req, u64 user_data)
{
	struct io_ring_ctx *ctx = req->ctx;
	long err = 0;

	if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
		return -EINVAL;

	io_cqring_add_event(ctx, user_data, err);
	io_put_req(req);
	return 0;
}

static int io_prep_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe)
{
	struct io_ring_ctx *ctx = req->ctx;

	if (!req->file)
		return -EBADF;

	if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
		return -EINVAL;
	if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index))
		return -EINVAL;

	return 0;
}

static int io_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe,
		    bool force_nonblock)
{
	loff_t sqe_off = READ_ONCE(sqe->off);
	loff_t sqe_len = READ_ONCE(sqe->len);
	loff_t end = sqe_off + sqe_len;
	unsigned fsync_flags;
	int ret;

	fsync_flags = READ_ONCE(sqe->fsync_flags);
	if (unlikely(fsync_flags & ~IORING_FSYNC_DATASYNC))
		return -EINVAL;

	ret = io_prep_fsync(req, sqe);
	if (ret)
		return ret;

	/* fsync always requires a blocking context */
	if (force_nonblock)
		return -EAGAIN;

	ret = vfs_fsync_range(req->rw.ki_filp, sqe_off,
				end > 0 ? end : LLONG_MAX,
				fsync_flags & IORING_FSYNC_DATASYNC);

	if (ret < 0 && (req->flags & REQ_F_LINK))
		req->flags |= REQ_F_FAIL_LINK;
	io_cqring_add_event(req->ctx, sqe->user_data, ret);
	io_put_req(req);
	return 0;
}

static int io_prep_sfr(struct io_kiocb *req, const struct io_uring_sqe *sqe)
{
	struct io_ring_ctx *ctx = req->ctx;
	int ret = 0;

	if (!req->file)
		return -EBADF;

	if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
		return -EINVAL;
	if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index))
		return -EINVAL;

	return ret;
}

static int io_sync_file_range(struct io_kiocb *req,
			      const struct io_uring_sqe *sqe,
			      bool force_nonblock)
{
	loff_t sqe_off;
	loff_t sqe_len;
	unsigned flags;
	int ret;

	ret = io_prep_sfr(req, sqe);
	if (ret)
		return ret;

	/* sync_file_range always requires a blocking context */
	if (force_nonblock)
		return -EAGAIN;

	sqe_off = READ_ONCE(sqe->off);
	sqe_len = READ_ONCE(sqe->len);
	flags = READ_ONCE(sqe->sync_range_flags);

	ret = sync_file_range(req->rw.ki_filp, sqe_off, sqe_len, flags);

	if (ret < 0 && (req->flags & REQ_F_LINK))
		req->flags |= REQ_F_FAIL_LINK;
	io_cqring_add_event(req->ctx, sqe->user_data, ret);
	io_put_req(req);
	return 0;
}

#if defined(CONFIG_NET)
static int io_send_recvmsg(struct io_kiocb *req, const struct io_uring_sqe *sqe,
			   bool force_nonblock,
			   long (*fn)(struct socket *, struct user_msghdr __user *,
				      unsigned int))
{
	struct socket *sock;
	int ret;

	if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
		return -EINVAL;

	sock = sock_from_file(req->file, &ret);
	if (sock) {
		struct user_msghdr __user *msg;
		unsigned flags;

		flags = READ_ONCE(sqe->msg_flags);
		if (flags & MSG_DONTWAIT)
			req->flags |= REQ_F_NOWAIT;
		else if (force_nonblock)
			flags |= MSG_DONTWAIT;

		msg = (struct user_msghdr __user *) (unsigned long)
			READ_ONCE(sqe->addr);

		ret = fn(sock, msg, flags);
		if (force_nonblock && ret == -EAGAIN)
			return ret;
	}

	io_cqring_add_event(req->ctx, sqe->user_data, ret);
	io_put_req(req);
	return 0;
}
#endif

static int io_sendmsg(struct io_kiocb *req, const struct io_uring_sqe *sqe,
		      bool force_nonblock)
{
#if defined(CONFIG_NET)
	return io_send_recvmsg(req, sqe, force_nonblock, __sys_sendmsg_sock);
#else
	return -EOPNOTSUPP;
#endif
}

static int io_recvmsg(struct io_kiocb *req, const struct io_uring_sqe *sqe,
		      bool force_nonblock)
{
#if defined(CONFIG_NET)
	return io_send_recvmsg(req, sqe, force_nonblock, __sys_recvmsg_sock);
#else
	return -EOPNOTSUPP;
#endif
}

static void io_poll_remove_one(struct io_kiocb *req)
{
	struct io_poll_iocb *poll = &req->poll;

	spin_lock(&poll->head->lock);
	WRITE_ONCE(poll->canceled, true);
	if (!list_empty(&poll->wait.entry)) {
		list_del_init(&poll->wait.entry);
		queue_work(req->ctx->sqo_wq, &req->work);
	}
	spin_unlock(&poll->head->lock);

	list_del_init(&req->list);
}

static void io_poll_remove_all(struct io_ring_ctx *ctx)
{
	struct io_kiocb *req;

	spin_lock_irq(&ctx->completion_lock);
	while (!list_empty(&ctx->cancel_list)) {
		req = list_first_entry(&ctx->cancel_list, struct io_kiocb, list);
		io_poll_remove_one(req);
	}
	spin_unlock_irq(&ctx->completion_lock);
}

/*
 * Find a running poll command that matches one specified in sqe->addr,
 * and remove it if found.
 */
static int io_poll_remove(struct io_kiocb *req, const struct io_uring_sqe *sqe)
{
	struct io_ring_ctx *ctx = req->ctx;
	struct io_kiocb *poll_req, *next;
	int ret = -ENOENT;

	if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
		return -EINVAL;
	if (sqe->ioprio || sqe->off || sqe->len || sqe->buf_index ||
	    sqe->poll_events)
		return -EINVAL;

	spin_lock_irq(&ctx->completion_lock);
	list_for_each_entry_safe(poll_req, next, &ctx->cancel_list, list) {
		if (READ_ONCE(sqe->addr) == poll_req->user_data) {
			io_poll_remove_one(poll_req);
			ret = 0;
			break;
		}
	}
	spin_unlock_irq(&ctx->completion_lock);

	io_cqring_add_event(req->ctx, sqe->user_data, ret);
	io_put_req(req);
	return 0;
}

static void io_poll_complete(struct io_ring_ctx *ctx, struct io_kiocb *req,
			     __poll_t mask)
{
	req->poll.done = true;
	io_cqring_fill_event(ctx, req->user_data, mangle_poll(mask));
	io_commit_cqring(ctx);
}

static void io_poll_complete_work(struct work_struct *work)
{
	struct io_kiocb *req = container_of(work, struct io_kiocb, work);
	struct io_poll_iocb *poll = &req->poll;
	struct poll_table_struct pt = { ._key = poll->events };
	struct io_ring_ctx *ctx = req->ctx;
	__poll_t mask = 0;

	if (!READ_ONCE(poll->canceled))
		mask = vfs_poll(poll->file, &pt) & poll->events;

	/*
	 * Note that ->ki_cancel callers also delete iocb from active_reqs after
	 * calling ->ki_cancel. We need the ctx_lock roundtrip here to
	 * synchronize with them. In the cancellation case the list_del_init
	 * itself is not actually needed, but harmless so we keep it in to
	 * avoid further branches in the fast path.
	 */
	spin_lock_irq(&ctx->completion_lock);
	if (!mask && !READ_ONCE(poll->canceled)) {
		add_wait_queue(poll->head, &poll->wait);
		spin_unlock_irq(&ctx->completion_lock);
		return;
	}
	list_del_init(&req->list);
	io_poll_complete(ctx, req, mask);
	spin_unlock_irq(&ctx->completion_lock);

	io_cqring_ev_posted(ctx);
	io_put_req(req);
}

static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
			void *key)
{
	struct io_poll_iocb *poll = container_of(wait, struct io_poll_iocb,
							wait);
	struct io_kiocb *req = container_of(poll, struct io_kiocb, poll);
	struct io_ring_ctx *ctx = req->ctx;
	__poll_t mask = key_to_poll(key);
	unsigned long flags;

	/* for instances that support it check for an event match first: */
	if (mask && !(mask & poll->events))
		return 0;

	list_del_init(&poll->wait.entry);

	if (mask && spin_trylock_irqsave(&ctx->completion_lock, flags)) {
		list_del(&req->list);
		io_poll_complete(ctx, req, mask);
		spin_unlock_irqrestore(&ctx->completion_lock, flags);

		io_cqring_ev_posted(ctx);
		io_put_req(req);
	} else {
		queue_work(ctx->sqo_wq, &req->work);
	}

	return 1;
}

struct io_poll_table {
	struct poll_table_struct pt;
	struct io_kiocb *req;
	int error;
};

static void io_poll_queue_proc(struct file *file, struct wait_queue_head *head,
			       struct poll_table_struct *p)
{
	struct io_poll_table *pt = container_of(p, struct io_poll_table, pt);

	if (unlikely(pt->req->poll.head)) {
		pt->error = -EINVAL;
		return;
	}

	pt->error = 0;
	pt->req->poll.head = head;
	add_wait_queue(head, &pt->req->poll.wait);
}

static int io_poll_add(struct io_kiocb *req, const struct io_uring_sqe *sqe)
{
	struct io_poll_iocb *poll = &req->poll;
	struct io_ring_ctx *ctx = req->ctx;
	struct io_poll_table ipt;
	bool cancel = false;
	__poll_t mask;
	u16 events;

	if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
		return -EINVAL;
	if (sqe->addr || sqe->ioprio || sqe->off || sqe->len || sqe->buf_index)
		return -EINVAL;
	if (!poll->file)
		return -EBADF;

	INIT_WORK(&req->work, io_poll_complete_work);
	events = READ_ONCE(sqe->poll_events);
	poll->events = demangle_poll(events) | EPOLLERR | EPOLLHUP;

	poll->head = NULL;
	poll->done = false;
	poll->canceled = false;

	ipt.pt._qproc = io_poll_queue_proc;
	ipt.pt._key = poll->events;
	ipt.req = req;
	ipt.error = -EINVAL; /* same as no support for IOCB_CMD_POLL */

	/* initialize the list so that we can do list_empty checks */
	INIT_LIST_HEAD(&poll->wait.entry);
	init_waitqueue_func_entry(&poll->wait, io_poll_wake);

	INIT_LIST_HEAD(&req->list);

	mask = vfs_poll(poll->file, &ipt.pt) & poll->events;

	spin_lock_irq(&ctx->completion_lock);
	if (likely(poll->head)) {
		spin_lock(&poll->head->lock);
		if (unlikely(list_empty(&poll->wait.entry))) {
			if (ipt.error)
				cancel = true;
			ipt.error = 0;
			mask = 0;
		}
		if (mask || ipt.error)
			list_del_init(&poll->wait.entry);
		else if (cancel)
			WRITE_ONCE(poll->canceled, true);
		else if (!poll->done) /* actually waiting for an event */
			list_add_tail(&req->list, &ctx->cancel_list);
		spin_unlock(&poll->head->lock);
	}
	if (mask) { /* no async, we'd stolen it */
		ipt.error = 0;
		io_poll_complete(ctx, req, mask);
	}
	spin_unlock_irq(&ctx->completion_lock);

	if (mask) {
		io_cqring_ev_posted(ctx);
		io_put_req(req);
	}
	return ipt.error;
}

static int io_req_defer(struct io_ring_ctx *ctx, struct io_kiocb *req,
			const struct io_uring_sqe *sqe)
{
	struct io_uring_sqe *sqe_copy;

	if (!io_sequence_defer(ctx, req) && list_empty(&ctx->defer_list))
		return 0;

	sqe_copy = kmalloc(sizeof(*sqe_copy), GFP_KERNEL);
	if (!sqe_copy)
		return -EAGAIN;

	spin_lock_irq(&ctx->completion_lock);
	if (!io_sequence_defer(ctx, req) && list_empty(&ctx->defer_list)) {
		spin_unlock_irq(&ctx->completion_lock);
		kfree(sqe_copy);
		return 0;
	}

	memcpy(sqe_copy, sqe, sizeof(*sqe_copy));
	req->submit.sqe = sqe_copy;

	INIT_WORK(&req->work, io_sq_wq_submit_work);
	list_add_tail(&req->list, &ctx->defer_list);
	spin_unlock_irq(&ctx->completion_lock);
	return -EIOCBQUEUED;
}

static int __io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
			   const struct sqe_submit *s, bool force_nonblock)
{
	int ret, opcode;

	req->user_data = READ_ONCE(s->sqe->user_data);

	if (unlikely(s->index >= ctx->sq_entries))
		return -EINVAL;

	opcode = READ_ONCE(s->sqe->opcode);
	switch (opcode) {
	case IORING_OP_NOP:
		ret = io_nop(req, req->user_data);
		break;
	case IORING_OP_READV:
		if (unlikely(s->sqe->buf_index))
			return -EINVAL;
		ret = io_read(req, s, force_nonblock);
		break;
	case IORING_OP_WRITEV:
		if (unlikely(s->sqe->buf_index))
			return -EINVAL;
		ret = io_write(req, s, force_nonblock);
		break;
	case IORING_OP_READ_FIXED:
		ret = io_read(req, s, force_nonblock);
		break;
	case IORING_OP_WRITE_FIXED:
		ret = io_write(req, s, force_nonblock);
		break;
	case IORING_OP_FSYNC:
		ret = io_fsync(req, s->sqe, force_nonblock);
		break;
	case IORING_OP_POLL_ADD:
		ret = io_poll_add(req, s->sqe);
		break;
	case IORING_OP_POLL_REMOVE:
		ret = io_poll_remove(req, s->sqe);
		break;
	case IORING_OP_SYNC_FILE_RANGE:
		ret = io_sync_file_range(req, s->sqe, force_nonblock);
		break;
	case IORING_OP_SENDMSG:
		ret = io_sendmsg(req, s->sqe, force_nonblock);
		break;
	case IORING_OP_RECVMSG:
		ret = io_recvmsg(req, s->sqe, force_nonblock);
		break;
	default:
		ret = -EINVAL;
		break;
	}

	if (ret)
		return ret;

	if (ctx->flags & IORING_SETUP_IOPOLL) {
		if (req->result == -EAGAIN)
			return -EAGAIN;

		/* workqueue context doesn't hold uring_lock, grab it now */
		if (s->needs_lock)
			mutex_lock(&ctx->uring_lock);
		io_iopoll_req_issued(req);
		if (s->needs_lock)
			mutex_unlock(&ctx->uring_lock);
	}

	return 0;
}

static struct async_list *io_async_list_from_sqe(struct io_ring_ctx *ctx,
						 const struct io_uring_sqe *sqe)
{
	switch (sqe->opcode) {
	case IORING_OP_READV:
	case IORING_OP_READ_FIXED:
		return &ctx->pending_async[READ];
	case IORING_OP_WRITEV:
	case IORING_OP_WRITE_FIXED:
		return &ctx->pending_async[WRITE];
	default:
		return NULL;
	}
}

static inline bool io_sqe_needs_user(const struct io_uring_sqe *sqe)
{
	u8 opcode = READ_ONCE(sqe->opcode);

	return !(opcode == IORING_OP_READ_FIXED ||
		 opcode == IORING_OP_WRITE_FIXED);
}

static void io_sq_wq_submit_work(struct work_struct *work)
{
	struct io_kiocb *req = container_of(work, struct io_kiocb, work);
	struct io_ring_ctx *ctx = req->ctx;
	struct mm_struct *cur_mm = NULL;
	struct async_list *async_list;
	LIST_HEAD(req_list);
	mm_segment_t old_fs;
	int ret;

	async_list = io_async_list_from_sqe(ctx, req->submit.sqe);
restart:
	do {
		struct sqe_submit *s = &req->submit;
		const struct io_uring_sqe *sqe = s->sqe;
		unsigned int flags = req->flags;

		/* Ensure we clear previously set non-block flag */
		req->rw.ki_flags &= ~IOCB_NOWAIT;

		ret = 0;
		if (io_sqe_needs_user(sqe) && !cur_mm) {
			if (!mmget_not_zero(ctx->sqo_mm)) {
				ret = -EFAULT;
			} else {
				cur_mm = ctx->sqo_mm;
				use_mm(cur_mm);
				old_fs = get_fs();
				set_fs(USER_DS);
			}
		}

		if (!ret) {
			s->has_user = cur_mm != NULL;
			s->needs_lock = true;
			do {
				ret = __io_submit_sqe(ctx, req, s, false);
				/*
				 * We can get EAGAIN for polled IO even though
				 * we're forcing a sync submission from here,
				 * since we can't wait for request slots on the
				 * block side.
				 */
				if (ret != -EAGAIN)
					break;
				cond_resched();
			} while (1);
		}

		/* drop submission reference */
		io_put_req(req);

		if (ret) {
			io_cqring_add_event(ctx, sqe->user_data, ret);
			io_put_req(req);
		}

		/* async context always use a copy of the sqe */
		kfree(sqe);

		/* req from defer and link list needn't decrease async cnt */
		if (flags & (REQ_F_IO_DRAINED | REQ_F_LINK_DONE))
			goto out;

		if (!async_list)
			break;
		if (!list_empty(&req_list)) {
			req = list_first_entry(&req_list, struct io_kiocb,
						list);
			list_del(&req->list);
			continue;
		}
		if (list_empty(&async_list->list))
			break;

		req = NULL;
		spin_lock(&async_list->lock);
		if (list_empty(&async_list->list)) {
			spin_unlock(&async_list->lock);
			break;
		}
		list_splice_init(&async_list->list, &req_list);
		spin_unlock(&async_list->lock);

		req = list_first_entry(&req_list, struct io_kiocb, list);
		list_del(&req->list);
	} while (req);

	/*
	 * Rare case of racing with a submitter. If we find the count has
	 * dropped to zero AND we have pending work items, then restart
	 * the processing. This is a tiny race window.
	 */
	if (async_list) {
		ret = atomic_dec_return(&async_list->cnt);
		while (!ret && !list_empty(&async_list->list)) {
			spin_lock(&async_list->lock);
			atomic_inc(&async_list->cnt);
			list_splice_init(&async_list->list, &req_list);
			spin_unlock(&async_list->lock);

			if (!list_empty(&req_list)) {
				req = list_first_entry(&req_list,
							struct io_kiocb, list);
				list_del(&req->list);
				goto restart;
			}
			ret = atomic_dec_return(&async_list->cnt);
		}
	}

out:
	if (cur_mm) {
		set_fs(old_fs);
		unuse_mm(cur_mm);
		mmput(cur_mm);
	}
}

/*
 * See if we can piggy back onto previously submitted work, that is still
 * running. We currently only allow this if the new request is sequential
 * to the previous one we punted.
 */
static bool io_add_to_prev_work(struct async_list *list, struct io_kiocb *req)
{
	bool ret = false;

	if (!list)
		return false;
	if (!(req->flags & REQ_F_SEQ_PREV))
		return false;
	if (!atomic_read(&list->cnt))
		return false;

	ret = true;
	spin_lock(&list->lock);
	list_add_tail(&req->list, &list->list);
	/*
	 * Ensure we see a simultaneous modification from io_sq_wq_submit_work()
	 */
	smp_mb();
	if (!atomic_read(&list->cnt)) {
		list_del_init(&req->list);
		ret = false;
	}
	spin_unlock(&list->lock);
	return ret;
}

static bool io_op_needs_file(const struct io_uring_sqe *sqe)
{
	int op = READ_ONCE(sqe->opcode);

	switch (op) {
	case IORING_OP_NOP:
	case IORING_OP_POLL_REMOVE:
		return false;
	default:
		return true;
	}
}

static int io_req_set_file(struct io_ring_ctx *ctx, const struct sqe_submit *s,
			   struct io_submit_state *state, struct io_kiocb *req)
{
	unsigned flags;
	int fd;

	flags = READ_ONCE(s->sqe->flags);
	fd = READ_ONCE(s->sqe->fd);

	if (flags & IOSQE_IO_DRAIN) {
		req->flags |= REQ_F_IO_DRAIN;
		req->sequence = ctx->cached_sq_head - 1;
	}

	if (!io_op_needs_file(s->sqe))
		return 0;

	if (flags & IOSQE_FIXED_FILE) {
		if (unlikely(!ctx->user_files ||
		    (unsigned) fd >= ctx->nr_user_files))
			return -EBADF;
		req->file = ctx->user_files[fd];
		req->flags |= REQ_F_FIXED_FILE;
	} else {
		if (s->needs_fixed_file)
			return -EBADF;
		req->file = io_file_get(state, fd);
		if (unlikely(!req->file))
			return -EBADF;
	}

	return 0;
}

static int io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
			struct sqe_submit *s)
{
	int ret;

	ret = io_req_defer(ctx, req, s->sqe);
	if (ret) {
		if (ret != -EIOCBQUEUED) {
			io_free_req(req);
			io_cqring_add_event(ctx, s->sqe->user_data, ret);
		}
		return 0;
	}

	ret = __io_submit_sqe(ctx, req, s, true);
	if (ret == -EAGAIN && !(req->flags & REQ_F_NOWAIT)) {
		struct io_uring_sqe *sqe_copy;

		sqe_copy = kmalloc(sizeof(*sqe_copy), GFP_KERNEL);
		if (sqe_copy) {
			struct async_list *list;

			memcpy(sqe_copy, s->sqe, sizeof(*sqe_copy));
			s->sqe = sqe_copy;

			memcpy(&req->submit, s, sizeof(*s));
			list = io_async_list_from_sqe(ctx, s->sqe);
			if (!io_add_to_prev_work(list, req)) {
				if (list)
					atomic_inc(&list->cnt);
				INIT_WORK(&req->work, io_sq_wq_submit_work);
				queue_work(ctx->sqo_wq, &req->work);
			}

			/*
			 * Queued up for async execution, worker will release
			 * submit reference when the iocb is actually submitted.
2103 */ 2104 return 0; 2105 } 2106 } 2107 2108 /* drop submission reference */ 2109 io_put_req(req); 2110 2111 /* and drop final reference, if we failed */ 2112 if (ret) { 2113 io_cqring_add_event(ctx, req->user_data, ret); 2114 if (req->flags & REQ_F_LINK) 2115 req->flags |= REQ_F_FAIL_LINK; 2116 io_put_req(req); 2117 } 2118 2119 return ret; 2120} 2121 2122#define SQE_VALID_FLAGS (IOSQE_FIXED_FILE|IOSQE_IO_DRAIN|IOSQE_IO_LINK) 2123 2124static void io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s, 2125 struct io_submit_state *state, struct io_kiocb **link) 2126{ 2127 struct io_uring_sqe *sqe_copy; 2128 struct io_kiocb *req; 2129 int ret; 2130 2131 /* enforce forwards compatibility on users */ 2132 if (unlikely(s->sqe->flags & ~SQE_VALID_FLAGS)) { 2133 ret = -EINVAL; 2134 goto err; 2135 } 2136 2137 req = io_get_req(ctx, state); 2138 if (unlikely(!req)) { 2139 ret = -EAGAIN; 2140 goto err; 2141 } 2142 2143 ret = io_req_set_file(ctx, s, state, req); 2144 if (unlikely(ret)) { 2145err_req: 2146 io_free_req(req); 2147err: 2148 io_cqring_add_event(ctx, s->sqe->user_data, ret); 2149 return; 2150 } 2151 2152 /* 2153 * If we already have a head request, queue this one for async 2154 * submittal once the head completes. If we don't have a head but 2155 * IOSQE_IO_LINK is set in the sqe, start a new head. This one will be 2156 * submitted sync once the chain is complete. If none of those 2157 * conditions are true (normal request), then just queue it. 2158 */ 2159 if (*link) { 2160 struct io_kiocb *prev = *link; 2161 2162 sqe_copy = kmemdup(s->sqe, sizeof(*sqe_copy), GFP_KERNEL); 2163 if (!sqe_copy) { 2164 ret = -EAGAIN; 2165 goto err_req; 2166 } 2167 2168 s->sqe = sqe_copy; 2169 memcpy(&req->submit, s, sizeof(*s)); 2170 list_add_tail(&req->list, &prev->link_list); 2171 } else if (s->sqe->flags & IOSQE_IO_LINK) { 2172 req->flags |= REQ_F_LINK; 2173 2174 memcpy(&req->submit, s, sizeof(*s)); 2175 INIT_LIST_HEAD(&req->link_list); 2176 *link = req; 2177 } else { 2178 io_queue_sqe(ctx, req, s); 2179 } 2180} 2181 2182/* 2183 * Batched submission is done, ensure local IO is flushed out. 2184 */ 2185static void io_submit_state_end(struct io_submit_state *state) 2186{ 2187 blk_finish_plug(&state->plug); 2188 io_file_put(state); 2189 if (state->free_reqs) 2190 kmem_cache_free_bulk(req_cachep, state->free_reqs, 2191 &state->reqs[state->cur_req]); 2192} 2193 2194/* 2195 * Start submission side cache. 2196 */ 2197static void io_submit_state_start(struct io_submit_state *state, 2198 struct io_ring_ctx *ctx, unsigned max_ios) 2199{ 2200 blk_start_plug(&state->plug); 2201 state->free_reqs = 0; 2202 state->file = NULL; 2203 state->ios_left = max_ios; 2204} 2205 2206static void io_commit_sqring(struct io_ring_ctx *ctx) 2207{ 2208 struct io_sq_ring *ring = ctx->sq_ring; 2209 2210 if (ctx->cached_sq_head != READ_ONCE(ring->r.head)) { 2211 /* 2212 * Ensure any loads from the SQEs are done at this point, 2213 * since once we write the new head, the application could 2214 * write new data to them. 2215 */ 2216 smp_store_release(&ring->r.head, ctx->cached_sq_head); 2217 } 2218} 2219 2220/* 2221 * Fetch an sqe, if one is available. Note that s->sqe will point to memory 2222 * that is mapped by userspace. This means that care needs to be taken to 2223 * ensure that reads are stable, as we cannot rely on userspace always 2224 * being a good citizen. If members of the sqe are validated and then later 2225 * used, it's important that those reads are done through READ_ONCE() to 2226 * prevent a re-load down the line. 
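 *
 * As a rough illustration of the double-fetch hazard this guards against
 * (field names here are just an example):
 *
 *	idx = READ_ONCE(sqe->buf_index);
 *	if (idx >= ctx->nr_user_bufs)
 *		return -EFAULT;
 *	imu = &ctx->user_bufs[idx];
 *
 * Without the READ_ONCE() the compiler could reload sqe->buf_index after
 * the bounds check, and userspace could have changed it in between.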
2227 */ 2228static bool io_get_sqring(struct io_ring_ctx *ctx, struct sqe_submit *s) 2229{ 2230 struct io_sq_ring *ring = ctx->sq_ring; 2231 unsigned head; 2232 2233 /* 2234 * The cached sq head (or cq tail) serves two purposes: 2235 * 2236 * 1) allows us to batch the cost of updating the user visible 2237 * head updates. 2238 * 2) allows the kernel side to track the head on its own, even 2239 * though the application is the one updating it. 2240 */ 2241 head = ctx->cached_sq_head; 2242 /* make sure SQ entry isn't read before tail */ 2243 if (head == smp_load_acquire(&ring->r.tail)) 2244 return false; 2245 2246 head = READ_ONCE(ring->array[head & ctx->sq_mask]); 2247 if (head < ctx->sq_entries) { 2248 s->index = head; 2249 s->sqe = &ctx->sq_sqes[head]; 2250 ctx->cached_sq_head++; 2251 return true; 2252 } 2253 2254 /* drop invalid entries */ 2255 ctx->cached_sq_head++; 2256 ring->dropped++; 2257 return false; 2258} 2259 2260static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes, 2261 unsigned int nr, bool has_user, bool mm_fault) 2262{ 2263 struct io_submit_state state, *statep = NULL; 2264 struct io_kiocb *link = NULL; 2265 bool prev_was_link = false; 2266 int i, submitted = 0; 2267 2268 if (nr > IO_PLUG_THRESHOLD) { 2269 io_submit_state_start(&state, ctx, nr); 2270 statep = &state; 2271 } 2272 2273 for (i = 0; i < nr; i++) { 2274 /* 2275 * If previous wasn't linked and we have a linked command, 2276 * that's the end of the chain. Submit the previous link. 2277 */ 2278 if (!prev_was_link && link) { 2279 io_queue_sqe(ctx, link, &link->submit); 2280 link = NULL; 2281 } 2282 prev_was_link = (sqes[i].sqe->flags & IOSQE_IO_LINK) != 0; 2283 2284 if (unlikely(mm_fault)) { 2285 io_cqring_add_event(ctx, sqes[i].sqe->user_data, 2286 -EFAULT); 2287 } else { 2288 sqes[i].has_user = has_user; 2289 sqes[i].needs_lock = true; 2290 sqes[i].needs_fixed_file = true; 2291 io_submit_sqe(ctx, &sqes[i], statep, &link); 2292 submitted++; 2293 } 2294 } 2295 2296 if (link) 2297 io_queue_sqe(ctx, link, &link->submit); 2298 if (statep) 2299 io_submit_state_end(&state); 2300 2301 return submitted; 2302} 2303 2304static int io_sq_thread(void *data) 2305{ 2306 struct sqe_submit sqes[IO_IOPOLL_BATCH]; 2307 struct io_ring_ctx *ctx = data; 2308 struct mm_struct *cur_mm = NULL; 2309 mm_segment_t old_fs; 2310 DEFINE_WAIT(wait); 2311 unsigned inflight; 2312 unsigned long timeout; 2313 2314 complete(&ctx->sqo_thread_started); 2315 2316 old_fs = get_fs(); 2317 set_fs(USER_DS); 2318 2319 timeout = inflight = 0; 2320 while (!kthread_should_park()) { 2321 bool all_fixed, mm_fault = false; 2322 int i; 2323 2324 if (inflight) { 2325 unsigned nr_events = 0; 2326 2327 if (ctx->flags & IORING_SETUP_IOPOLL) { 2328 io_iopoll_check(ctx, &nr_events, 0); 2329 } else { 2330 /* 2331 * Normal IO, just pretend everything completed. 2332 * We don't have to poll completions for that. 2333 */ 2334 nr_events = inflight; 2335 } 2336 2337 inflight -= nr_events; 2338 if (!inflight) 2339 timeout = jiffies + ctx->sq_thread_idle; 2340 } 2341 2342 if (!io_get_sqring(ctx, &sqes[0])) { 2343 /* 2344 * We're polling. If we're within the defined idle 2345 * period, then let us spin without work before going 2346 * to sleep. 2347 */ 2348 if (inflight || !time_after(jiffies, timeout)) { 2349 cpu_relax(); 2350 continue; 2351 } 2352 2353 /* 2354 * Drop cur_mm before scheduling, we can't hold it for 2355 * long periods (or over schedule()). Do this before 2356 * adding ourselves to the waitqueue, as the unuse/drop 2357 * may sleep. 
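 *
 * (mmput() of the last mm reference can itself block, and blocking after
 * prepare_to_wait() has set TASK_INTERRUPTIBLE would risk a
 * might-sleep-while-!TASK_RUNNING debug splat.)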
2358 */ 2359 if (cur_mm) { 2360 unuse_mm(cur_mm); 2361 mmput(cur_mm); 2362 cur_mm = NULL; 2363 } 2364 2365 prepare_to_wait(&ctx->sqo_wait, &wait, 2366 TASK_INTERRUPTIBLE); 2367 2368 /* Tell userspace we may need a wakeup call */ 2369 ctx->sq_ring->flags |= IORING_SQ_NEED_WAKEUP; 2370 /* make sure to read SQ tail after writing flags */ 2371 smp_mb(); 2372 2373 if (!io_get_sqring(ctx, &sqes[0])) { 2374 if (kthread_should_park()) { 2375 finish_wait(&ctx->sqo_wait, &wait); 2376 break; 2377 } 2378 if (signal_pending(current)) 2379 flush_signals(current); 2380 schedule(); 2381 finish_wait(&ctx->sqo_wait, &wait); 2382 2383 ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP; 2384 continue; 2385 } 2386 finish_wait(&ctx->sqo_wait, &wait); 2387 2388 ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP; 2389 } 2390 2391 i = 0; 2392 all_fixed = true; 2393 do { 2394 if (all_fixed && io_sqe_needs_user(sqes[i].sqe)) 2395 all_fixed = false; 2396 2397 i++; 2398 if (i == ARRAY_SIZE(sqes)) 2399 break; 2400 } while (io_get_sqring(ctx, &sqes[i])); 2401 2402 /* Unless all new commands are FIXED regions, grab mm */ 2403 if (!all_fixed && !cur_mm) { 2404 mm_fault = !mmget_not_zero(ctx->sqo_mm); 2405 if (!mm_fault) { 2406 use_mm(ctx->sqo_mm); 2407 cur_mm = ctx->sqo_mm; 2408 } 2409 } 2410 2411 inflight += io_submit_sqes(ctx, sqes, i, cur_mm != NULL, 2412 mm_fault); 2413 2414 /* Commit SQ ring head once we've consumed all SQEs */ 2415 io_commit_sqring(ctx); 2416 } 2417 2418 set_fs(old_fs); 2419 if (cur_mm) { 2420 unuse_mm(cur_mm); 2421 mmput(cur_mm); 2422 } 2423 2424 kthread_parkme(); 2425 2426 return 0; 2427} 2428 2429static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit) 2430{ 2431 struct io_submit_state state, *statep = NULL; 2432 struct io_kiocb *link = NULL; 2433 bool prev_was_link = false; 2434 int i, submit = 0; 2435 2436 if (to_submit > IO_PLUG_THRESHOLD) { 2437 io_submit_state_start(&state, ctx, to_submit); 2438 statep = &state; 2439 } 2440 2441 for (i = 0; i < to_submit; i++) { 2442 struct sqe_submit s; 2443 2444 if (!io_get_sqring(ctx, &s)) 2445 break; 2446 2447 /* 2448 * If previous wasn't linked and we have a linked command, 2449 * that's the end of the chain. Submit the previous link. 2450 */ 2451 if (!prev_was_link && link) { 2452 io_queue_sqe(ctx, link, &link->submit); 2453 link = NULL; 2454 } 2455 prev_was_link = (s.sqe->flags & IOSQE_IO_LINK) != 0; 2456 2457 s.has_user = true; 2458 s.needs_lock = false; 2459 s.needs_fixed_file = false; 2460 submit++; 2461 io_submit_sqe(ctx, &s, statep, &link); 2462 } 2463 io_commit_sqring(ctx); 2464 2465 if (link) 2466 io_queue_sqe(ctx, link, &link->submit); 2467 if (statep) 2468 io_submit_state_end(statep); 2469 2470 return submit; 2471} 2472 2473/* 2474 * Wait until events become available, if we don't already have some. The 2475 * application must reap them itself, as they reside on the shared cq ring. 
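 *
 * A note on the return value: 0 is returned as soon as at least one
 * completion is visible on the ring, even if the wait itself was
 * interrupted; otherwise the error from the wait path (e.g. -EINTR) is
 * returned.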
2476 */ 2477static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, 2478 const sigset_t __user *sig, size_t sigsz) 2479{ 2480 struct io_cq_ring *ring = ctx->cq_ring; 2481 int ret; 2482 2483 if (io_cqring_events(ring) >= min_events) 2484 return 0; 2485 2486 if (sig) { 2487#ifdef CONFIG_COMPAT 2488 if (in_compat_syscall()) 2489 ret = set_compat_user_sigmask((const compat_sigset_t __user *)sig, 2490 sigsz); 2491 else 2492#endif 2493 ret = set_user_sigmask(sig, sigsz); 2494 2495 if (ret) 2496 return ret; 2497 } 2498 2499 ret = wait_event_interruptible(ctx->wait, io_cqring_events(ring) >= min_events); 2500 restore_saved_sigmask_unless(ret == -ERESTARTSYS); 2501 if (ret == -ERESTARTSYS) 2502 ret = -EINTR; 2503 2504 return READ_ONCE(ring->r.head) == READ_ONCE(ring->r.tail) ? ret : 0; 2505} 2506 2507static void __io_sqe_files_unregister(struct io_ring_ctx *ctx) 2508{ 2509#if defined(CONFIG_UNIX) 2510 if (ctx->ring_sock) { 2511 struct sock *sock = ctx->ring_sock->sk; 2512 struct sk_buff *skb; 2513 2514 while ((skb = skb_dequeue(&sock->sk_receive_queue)) != NULL) 2515 kfree_skb(skb); 2516 } 2517#else 2518 int i; 2519 2520 for (i = 0; i < ctx->nr_user_files; i++) 2521 fput(ctx->user_files[i]); 2522#endif 2523} 2524 2525static int io_sqe_files_unregister(struct io_ring_ctx *ctx) 2526{ 2527 if (!ctx->user_files) 2528 return -ENXIO; 2529 2530 __io_sqe_files_unregister(ctx); 2531 kfree(ctx->user_files); 2532 ctx->user_files = NULL; 2533 ctx->nr_user_files = 0; 2534 return 0; 2535} 2536 2537static void io_sq_thread_stop(struct io_ring_ctx *ctx) 2538{ 2539 if (ctx->sqo_thread) { 2540 wait_for_completion(&ctx->sqo_thread_started); 2541 /* 2542 * The park is a bit of a work-around, without it we get 2543 * warning spews on shutdown with SQPOLL set and affinity 2544 * set to a single CPU. 2545 */ 2546 kthread_park(ctx->sqo_thread); 2547 kthread_stop(ctx->sqo_thread); 2548 ctx->sqo_thread = NULL; 2549 } 2550} 2551 2552static void io_finish_async(struct io_ring_ctx *ctx) 2553{ 2554 io_sq_thread_stop(ctx); 2555 2556 if (ctx->sqo_wq) { 2557 destroy_workqueue(ctx->sqo_wq); 2558 ctx->sqo_wq = NULL; 2559 } 2560} 2561 2562#if defined(CONFIG_UNIX) 2563static void io_destruct_skb(struct sk_buff *skb) 2564{ 2565 struct io_ring_ctx *ctx = skb->sk->sk_user_data; 2566 2567 io_finish_async(ctx); 2568 unix_destruct_scm(skb); 2569} 2570 2571/* 2572 * Ensure the UNIX gc is aware of our file set, so we are certain that 2573 * the io_uring can be safely unregistered on process exit, even if we have 2574 * loops in the file referencing. 
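 *
 * Mechanically this works by attaching the registered struct files to an
 * skb with an scm_fp_list and parking that skb on the ring socket's own
 * receive queue, so the files are accounted exactly like in-flight
 * SCM_RIGHTS descriptors that the UNIX gc already knows how to walk.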
2575 */ 2576static int __io_sqe_files_scm(struct io_ring_ctx *ctx, int nr, int offset) 2577{ 2578 struct sock *sk = ctx->ring_sock->sk; 2579 struct scm_fp_list *fpl; 2580 struct sk_buff *skb; 2581 int i; 2582 2583 if (!capable(CAP_SYS_RESOURCE) && !capable(CAP_SYS_ADMIN)) { 2584 unsigned long inflight = ctx->user->unix_inflight + nr; 2585 2586 if (inflight > task_rlimit(current, RLIMIT_NOFILE)) 2587 return -EMFILE; 2588 } 2589 2590 fpl = kzalloc(sizeof(*fpl), GFP_KERNEL); 2591 if (!fpl) 2592 return -ENOMEM; 2593 2594 skb = alloc_skb(0, GFP_KERNEL); 2595 if (!skb) { 2596 kfree(fpl); 2597 return -ENOMEM; 2598 } 2599 2600 skb->sk = sk; 2601 skb->destructor = io_destruct_skb; 2602 2603 fpl->user = get_uid(ctx->user); 2604 for (i = 0; i < nr; i++) { 2605 fpl->fp[i] = get_file(ctx->user_files[i + offset]); 2606 unix_inflight(fpl->user, fpl->fp[i]); 2607 } 2608 2609 fpl->max = fpl->count = nr; 2610 UNIXCB(skb).fp = fpl; 2611 refcount_add(skb->truesize, &sk->sk_wmem_alloc); 2612 skb_queue_head(&sk->sk_receive_queue, skb); 2613 2614 for (i = 0; i < nr; i++) 2615 fput(fpl->fp[i]); 2616 2617 return 0; 2618} 2619 2620/* 2621 * If UNIX sockets are enabled, fd passing can cause a reference cycle which 2622 * causes regular reference counting to break down. We rely on the UNIX 2623 * garbage collection to take care of this problem for us. 2624 */ 2625static int io_sqe_files_scm(struct io_ring_ctx *ctx) 2626{ 2627 unsigned left, total; 2628 int ret = 0; 2629 2630 total = 0; 2631 left = ctx->nr_user_files; 2632 while (left) { 2633 unsigned this_files = min_t(unsigned, left, SCM_MAX_FD); 2634 2635 ret = __io_sqe_files_scm(ctx, this_files, total); 2636 if (ret) 2637 break; 2638 left -= this_files; 2639 total += this_files; 2640 } 2641 2642 if (!ret) 2643 return 0; 2644 2645 while (total < ctx->nr_user_files) { 2646 fput(ctx->user_files[total]); 2647 total++; 2648 } 2649 2650 return ret; 2651} 2652#else 2653static int io_sqe_files_scm(struct io_ring_ctx *ctx) 2654{ 2655 return 0; 2656} 2657#endif 2658 2659static int io_sqe_files_register(struct io_ring_ctx *ctx, void __user *arg, 2660 unsigned nr_args) 2661{ 2662 __s32 __user *fds = (__s32 __user *) arg; 2663 int fd, ret = 0; 2664 unsigned i; 2665 2666 if (ctx->user_files) 2667 return -EBUSY; 2668 if (!nr_args) 2669 return -EINVAL; 2670 if (nr_args > IORING_MAX_FIXED_FILES) 2671 return -EMFILE; 2672 2673 ctx->user_files = kcalloc(nr_args, sizeof(struct file *), GFP_KERNEL); 2674 if (!ctx->user_files) 2675 return -ENOMEM; 2676 2677 for (i = 0; i < nr_args; i++) { 2678 ret = -EFAULT; 2679 if (copy_from_user(&fd, &fds[i], sizeof(fd))) 2680 break; 2681 2682 ctx->user_files[i] = fget(fd); 2683 2684 ret = -EBADF; 2685 if (!ctx->user_files[i]) 2686 break; 2687 /* 2688 * Don't allow io_uring instances to be registered. If UNIX 2689 * isn't enabled, then this causes a reference cycle and this 2690 * instance can never get freed. If UNIX is enabled we'll 2691 * handle it just fine, but there's still no point in allowing 2692 * a ring fd as it doesn't support regular read/write anyway. 
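 *
 * If such an fd is found, it is dropped right here and the error path
 * below unwinds the files collected so far (nr_user_files was not bumped
 * for the rejected entry, so it is not put twice).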
2693 */ 2694 if (ctx->user_files[i]->f_op == &io_uring_fops) { 2695 fput(ctx->user_files[i]); 2696 break; 2697 } 2698 ctx->nr_user_files++; 2699 ret = 0; 2700 } 2701 2702 if (ret) { 2703 for (i = 0; i < ctx->nr_user_files; i++) 2704 fput(ctx->user_files[i]); 2705 2706 kfree(ctx->user_files); 2707 ctx->user_files = NULL; 2708 ctx->nr_user_files = 0; 2709 return ret; 2710 } 2711 2712 ret = io_sqe_files_scm(ctx); 2713 if (ret) 2714 io_sqe_files_unregister(ctx); 2715 2716 return ret; 2717} 2718 2719static int io_sq_offload_start(struct io_ring_ctx *ctx, 2720 struct io_uring_params *p) 2721{ 2722 int ret; 2723 2724 init_waitqueue_head(&ctx->sqo_wait); 2725 mmgrab(current->mm); 2726 ctx->sqo_mm = current->mm; 2727 2728 if (ctx->flags & IORING_SETUP_SQPOLL) { 2729 ret = -EPERM; 2730 if (!capable(CAP_SYS_ADMIN)) 2731 goto err; 2732 2733 ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle); 2734 if (!ctx->sq_thread_idle) 2735 ctx->sq_thread_idle = HZ; 2736 2737 if (p->flags & IORING_SETUP_SQ_AFF) { 2738 int cpu = p->sq_thread_cpu; 2739 2740 ret = -EINVAL; 2741 if (cpu >= nr_cpu_ids) 2742 goto err; 2743 if (!cpu_online(cpu)) 2744 goto err; 2745 2746 ctx->sqo_thread = kthread_create_on_cpu(io_sq_thread, 2747 ctx, cpu, 2748 "io_uring-sq"); 2749 } else { 2750 ctx->sqo_thread = kthread_create(io_sq_thread, ctx, 2751 "io_uring-sq"); 2752 } 2753 if (IS_ERR(ctx->sqo_thread)) { 2754 ret = PTR_ERR(ctx->sqo_thread); 2755 ctx->sqo_thread = NULL; 2756 goto err; 2757 } 2758 wake_up_process(ctx->sqo_thread); 2759 } else if (p->flags & IORING_SETUP_SQ_AFF) { 2760 /* Can't have SQ_AFF without SQPOLL */ 2761 ret = -EINVAL; 2762 goto err; 2763 } 2764 2765 /* Do QD, or 2 * CPUS, whatever is smallest */ 2766 ctx->sqo_wq = alloc_workqueue("io_ring-wq", WQ_UNBOUND | WQ_FREEZABLE, 2767 min(ctx->sq_entries - 1, 2 * num_online_cpus())); 2768 if (!ctx->sqo_wq) { 2769 ret = -ENOMEM; 2770 goto err; 2771 } 2772 2773 return 0; 2774err: 2775 io_sq_thread_stop(ctx); 2776 mmdrop(ctx->sqo_mm); 2777 ctx->sqo_mm = NULL; 2778 return ret; 2779} 2780 2781static void io_unaccount_mem(struct user_struct *user, unsigned long nr_pages) 2782{ 2783 atomic_long_sub(nr_pages, &user->locked_vm); 2784} 2785 2786static int io_account_mem(struct user_struct *user, unsigned long nr_pages) 2787{ 2788 unsigned long page_limit, cur_pages, new_pages; 2789 2790 /* Don't allow more pages than we can safely lock */ 2791 page_limit = rlimit(RLIMIT_MEMLOCK) >> PAGE_SHIFT; 2792 2793 do { 2794 cur_pages = atomic_long_read(&user->locked_vm); 2795 new_pages = cur_pages + nr_pages; 2796 if (new_pages > page_limit) 2797 return -ENOMEM; 2798 } while (atomic_long_cmpxchg(&user->locked_vm, cur_pages, 2799 new_pages) != cur_pages); 2800 2801 return 0; 2802} 2803 2804static void io_mem_free(void *ptr) 2805{ 2806 struct page *page; 2807 2808 if (!ptr) 2809 return; 2810 2811 page = virt_to_head_page(ptr); 2812 if (put_page_testzero(page)) 2813 free_compound_page(page); 2814} 2815 2816static void *io_mem_alloc(size_t size) 2817{ 2818 gfp_t gfp_flags = GFP_KERNEL | __GFP_ZERO | __GFP_NOWARN | __GFP_COMP | 2819 __GFP_NORETRY; 2820 2821 return (void *) __get_free_pages(gfp_flags, get_order(size)); 2822} 2823 2824static unsigned long ring_pages(unsigned sq_entries, unsigned cq_entries) 2825{ 2826 struct io_sq_ring *sq_ring; 2827 struct io_cq_ring *cq_ring; 2828 size_t bytes; 2829 2830 bytes = struct_size(sq_ring, array, sq_entries); 2831 bytes += array_size(sizeof(struct io_uring_sqe), sq_entries); 2832 bytes += struct_size(cq_ring, cqes, cq_entries); 2833 2834 return 
(bytes + PAGE_SIZE - 1) / PAGE_SIZE; 2835} 2836 2837static int io_sqe_buffer_unregister(struct io_ring_ctx *ctx) 2838{ 2839 int i, j; 2840 2841 if (!ctx->user_bufs) 2842 return -ENXIO; 2843 2844 for (i = 0; i < ctx->nr_user_bufs; i++) { 2845 struct io_mapped_ubuf *imu = &ctx->user_bufs[i]; 2846 2847 for (j = 0; j < imu->nr_bvecs; j++) 2848 put_page(imu->bvec[j].bv_page); 2849 2850 if (ctx->account_mem) 2851 io_unaccount_mem(ctx->user, imu->nr_bvecs); 2852 kvfree(imu->bvec); 2853 imu->nr_bvecs = 0; 2854 } 2855 2856 kfree(ctx->user_bufs); 2857 ctx->user_bufs = NULL; 2858 ctx->nr_user_bufs = 0; 2859 return 0; 2860} 2861 2862static int io_copy_iov(struct io_ring_ctx *ctx, struct iovec *dst, 2863 void __user *arg, unsigned index) 2864{ 2865 struct iovec __user *src; 2866 2867#ifdef CONFIG_COMPAT 2868 if (ctx->compat) { 2869 struct compat_iovec __user *ciovs; 2870 struct compat_iovec ciov; 2871 2872 ciovs = (struct compat_iovec __user *) arg; 2873 if (copy_from_user(&ciov, &ciovs[index], sizeof(ciov))) 2874 return -EFAULT; 2875 2876 dst->iov_base = (void __user *) (unsigned long) ciov.iov_base; 2877 dst->iov_len = ciov.iov_len; 2878 return 0; 2879 } 2880#endif 2881 src = (struct iovec __user *) arg; 2882 if (copy_from_user(dst, &src[index], sizeof(*dst))) 2883 return -EFAULT; 2884 return 0; 2885} 2886 2887static int io_sqe_buffer_register(struct io_ring_ctx *ctx, void __user *arg, 2888 unsigned nr_args) 2889{ 2890 struct vm_area_struct **vmas = NULL; 2891 struct page **pages = NULL; 2892 int i, j, got_pages = 0; 2893 int ret = -EINVAL; 2894 2895 if (ctx->user_bufs) 2896 return -EBUSY; 2897 if (!nr_args || nr_args > UIO_MAXIOV) 2898 return -EINVAL; 2899 2900 ctx->user_bufs = kcalloc(nr_args, sizeof(struct io_mapped_ubuf), 2901 GFP_KERNEL); 2902 if (!ctx->user_bufs) 2903 return -ENOMEM; 2904 2905 for (i = 0; i < nr_args; i++) { 2906 struct io_mapped_ubuf *imu = &ctx->user_bufs[i]; 2907 unsigned long off, start, end, ubuf; 2908 int pret, nr_pages; 2909 struct iovec iov; 2910 size_t size; 2911 2912 ret = io_copy_iov(ctx, &iov, arg, i); 2913 if (ret) 2914 goto err; 2915 2916 /* 2917 * Don't impose further limits on the size and buffer 2918 * constraints here, we'll -EINVAL later when IO is 2919 * submitted if they are wrong. 
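 *
 * What is still enforced at registration time (see below): a non-NULL
 * base and non-zero length, an arbitrary 1GB per-buffer cap,
 * RLIMIT_MEMLOCK accounting if the caller lacks CAP_IPC_LOCK, and that
 * the pinned pages are not file backed (hugetlb pages being the
 * exception).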
2920 */ 2921 ret = -EFAULT; 2922 if (!iov.iov_base || !iov.iov_len) 2923 goto err; 2924 2925 /* arbitrary limit, but we need something */ 2926 if (iov.iov_len > SZ_1G) 2927 goto err; 2928 2929 ubuf = (unsigned long) iov.iov_base; 2930 end = (ubuf + iov.iov_len + PAGE_SIZE - 1) >> PAGE_SHIFT; 2931 start = ubuf >> PAGE_SHIFT; 2932 nr_pages = end - start; 2933 2934 if (ctx->account_mem) { 2935 ret = io_account_mem(ctx->user, nr_pages); 2936 if (ret) 2937 goto err; 2938 } 2939 2940 ret = 0; 2941 if (!pages || nr_pages > got_pages) { 2942 kfree(vmas); 2943 kfree(pages); 2944 pages = kvmalloc_array(nr_pages, sizeof(struct page *), 2945 GFP_KERNEL); 2946 vmas = kvmalloc_array(nr_pages, 2947 sizeof(struct vm_area_struct *), 2948 GFP_KERNEL); 2949 if (!pages || !vmas) { 2950 ret = -ENOMEM; 2951 if (ctx->account_mem) 2952 io_unaccount_mem(ctx->user, nr_pages); 2953 goto err; 2954 } 2955 got_pages = nr_pages; 2956 } 2957 2958 imu->bvec = kvmalloc_array(nr_pages, sizeof(struct bio_vec), 2959 GFP_KERNEL); 2960 ret = -ENOMEM; 2961 if (!imu->bvec) { 2962 if (ctx->account_mem) 2963 io_unaccount_mem(ctx->user, nr_pages); 2964 goto err; 2965 } 2966 2967 ret = 0; 2968 down_read(&current->mm->mmap_sem); 2969 pret = get_user_pages(ubuf, nr_pages, 2970 FOLL_WRITE | FOLL_LONGTERM, 2971 pages, vmas); 2972 if (pret == nr_pages) { 2973 /* don't support file backed memory */ 2974 for (j = 0; j < nr_pages; j++) { 2975 struct vm_area_struct *vma = vmas[j]; 2976 2977 if (vma->vm_file && 2978 !is_file_hugepages(vma->vm_file)) { 2979 ret = -EOPNOTSUPP; 2980 break; 2981 } 2982 } 2983 } else { 2984 ret = pret < 0 ? pret : -EFAULT; 2985 } 2986 up_read(&current->mm->mmap_sem); 2987 if (ret) { 2988 /* 2989 * if we did partial map, or found file backed vmas, 2990 * release any pages we did get 2991 */ 2992 if (pret > 0) { 2993 for (j = 0; j < pret; j++) 2994 put_page(pages[j]); 2995 } 2996 if (ctx->account_mem) 2997 io_unaccount_mem(ctx->user, nr_pages); 2998 kvfree(imu->bvec); 2999 goto err; 3000 } 3001 3002 off = ubuf & ~PAGE_MASK; 3003 size = iov.iov_len; 3004 for (j = 0; j < nr_pages; j++) { 3005 size_t vec_len; 3006 3007 vec_len = min_t(size_t, size, PAGE_SIZE - off); 3008 imu->bvec[j].bv_page = pages[j]; 3009 imu->bvec[j].bv_len = vec_len; 3010 imu->bvec[j].bv_offset = off; 3011 off = 0; 3012 size -= vec_len; 3013 } 3014 /* store original address for later verification */ 3015 imu->ubuf = ubuf; 3016 imu->len = iov.iov_len; 3017 imu->nr_bvecs = nr_pages; 3018 3019 ctx->nr_user_bufs++; 3020 } 3021 kvfree(pages); 3022 kvfree(vmas); 3023 return 0; 3024err: 3025 kvfree(pages); 3026 kvfree(vmas); 3027 io_sqe_buffer_unregister(ctx); 3028 return ret; 3029} 3030 3031static int io_eventfd_register(struct io_ring_ctx *ctx, void __user *arg) 3032{ 3033 __s32 __user *fds = arg; 3034 int fd; 3035 3036 if (ctx->cq_ev_fd) 3037 return -EBUSY; 3038 3039 if (copy_from_user(&fd, fds, sizeof(*fds))) 3040 return -EFAULT; 3041 3042 ctx->cq_ev_fd = eventfd_ctx_fdget(fd); 3043 if (IS_ERR(ctx->cq_ev_fd)) { 3044 int ret = PTR_ERR(ctx->cq_ev_fd); 3045 ctx->cq_ev_fd = NULL; 3046 return ret; 3047 } 3048 3049 return 0; 3050} 3051 3052static int io_eventfd_unregister(struct io_ring_ctx *ctx) 3053{ 3054 if (ctx->cq_ev_fd) { 3055 eventfd_ctx_put(ctx->cq_ev_fd); 3056 ctx->cq_ev_fd = NULL; 3057 return 0; 3058 } 3059 3060 return -ENXIO; 3061} 3062 3063static void io_ring_ctx_free(struct io_ring_ctx *ctx) 3064{ 3065 io_finish_async(ctx); 3066 if (ctx->sqo_mm) 3067 mmdrop(ctx->sqo_mm); 3068 3069 io_iopoll_reap_events(ctx); 3070 io_sqe_buffer_unregister(ctx); 
3071 io_sqe_files_unregister(ctx); 3072 io_eventfd_unregister(ctx); 3073 3074#if defined(CONFIG_UNIX) 3075 if (ctx->ring_sock) { 3076 ctx->ring_sock->file = NULL; /* so that iput() is called */ 3077 sock_release(ctx->ring_sock); 3078 } 3079#endif 3080 3081 io_mem_free(ctx->sq_ring); 3082 io_mem_free(ctx->sq_sqes); 3083 io_mem_free(ctx->cq_ring); 3084 3085 percpu_ref_exit(&ctx->refs); 3086 if (ctx->account_mem) 3087 io_unaccount_mem(ctx->user, 3088 ring_pages(ctx->sq_entries, ctx->cq_entries)); 3089 free_uid(ctx->user); 3090 kfree(ctx); 3091} 3092 3093static __poll_t io_uring_poll(struct file *file, poll_table *wait) 3094{ 3095 struct io_ring_ctx *ctx = file->private_data; 3096 __poll_t mask = 0; 3097 3098 poll_wait(file, &ctx->cq_wait, wait); 3099 /* 3100 * synchronizes with barrier from wq_has_sleeper call in 3101 * io_commit_cqring 3102 */ 3103 smp_rmb(); 3104 if (READ_ONCE(ctx->sq_ring->r.tail) - ctx->cached_sq_head != 3105 ctx->sq_ring->ring_entries) 3106 mask |= EPOLLOUT | EPOLLWRNORM; 3107 if (READ_ONCE(ctx->cq_ring->r.head) != ctx->cached_cq_tail) 3108 mask |= EPOLLIN | EPOLLRDNORM; 3109 3110 return mask; 3111} 3112 3113static int io_uring_fasync(int fd, struct file *file, int on) 3114{ 3115 struct io_ring_ctx *ctx = file->private_data; 3116 3117 return fasync_helper(fd, file, on, &ctx->cq_fasync); 3118} 3119 3120static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx) 3121{ 3122 mutex_lock(&ctx->uring_lock); 3123 percpu_ref_kill(&ctx->refs); 3124 mutex_unlock(&ctx->uring_lock); 3125 3126 io_poll_remove_all(ctx); 3127 io_iopoll_reap_events(ctx); 3128 wait_for_completion(&ctx->ctx_done); 3129 io_ring_ctx_free(ctx); 3130} 3131 3132static int io_uring_release(struct inode *inode, struct file *file) 3133{ 3134 struct io_ring_ctx *ctx = file->private_data; 3135 3136 file->private_data = NULL; 3137 io_ring_ctx_wait_and_kill(ctx); 3138 return 0; 3139} 3140 3141static int io_uring_mmap(struct file *file, struct vm_area_struct *vma) 3142{ 3143 loff_t offset = (loff_t) vma->vm_pgoff << PAGE_SHIFT; 3144 unsigned long sz = vma->vm_end - vma->vm_start; 3145 struct io_ring_ctx *ctx = file->private_data; 3146 unsigned long pfn; 3147 struct page *page; 3148 void *ptr; 3149 3150 switch (offset) { 3151 case IORING_OFF_SQ_RING: 3152 ptr = ctx->sq_ring; 3153 break; 3154 case IORING_OFF_SQES: 3155 ptr = ctx->sq_sqes; 3156 break; 3157 case IORING_OFF_CQ_RING: 3158 ptr = ctx->cq_ring; 3159 break; 3160 default: 3161 return -EINVAL; 3162 } 3163 3164 page = virt_to_head_page(ptr); 3165 if (sz > (PAGE_SIZE << compound_order(page))) 3166 return -EINVAL; 3167 3168 pfn = virt_to_phys(ptr) >> PAGE_SHIFT; 3169 return remap_pfn_range(vma, vma->vm_start, pfn, sz, vma->vm_page_prot); 3170} 3171 3172SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit, 3173 u32, min_complete, u32, flags, const sigset_t __user *, sig, 3174 size_t, sigsz) 3175{ 3176 struct io_ring_ctx *ctx; 3177 long ret = -EBADF; 3178 int submitted = 0; 3179 struct fd f; 3180 3181 if (flags & ~(IORING_ENTER_GETEVENTS | IORING_ENTER_SQ_WAKEUP)) 3182 return -EINVAL; 3183 3184 f = fdget(fd); 3185 if (!f.file) 3186 return -EBADF; 3187 3188 ret = -EOPNOTSUPP; 3189 if (f.file->f_op != &io_uring_fops) 3190 goto out_fput; 3191 3192 ret = -ENXIO; 3193 ctx = f.file->private_data; 3194 if (!percpu_ref_tryget(&ctx->refs)) 3195 goto out_fput; 3196 3197 /* 3198 * For SQ polling, the thread will do all submissions and completions. 3199 * Just return the requested submit count, and wake the thread if 3200 * we were asked to. 
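 *
 * A minimal userspace sketch of driving an SQPOLL ring (pseudo-code; the
 * ordering requirements around reading the flags field are documented at
 * the top of this file and omitted here):
 *
 *	if (*sq_flags & IORING_SQ_NEED_WAKEUP)
 *		syscall(__NR_io_uring_enter, ring_fd, to_submit, 0,
 *			IORING_ENTER_SQ_WAKEUP, NULL, 0);
 *
 * where sq_flags is a pointer the application derived from the mmap'ed SQ
 * ring and its published flags offset.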
3201 */ 3202 if (ctx->flags & IORING_SETUP_SQPOLL) { 3203 if (flags & IORING_ENTER_SQ_WAKEUP) 3204 wake_up(&ctx->sqo_wait); 3205 submitted = to_submit; 3206 goto out_ctx; 3207 } 3208 3209 ret = 0; 3210 if (to_submit) { 3211 to_submit = min(to_submit, ctx->sq_entries); 3212 3213 mutex_lock(&ctx->uring_lock); 3214 submitted = io_ring_submit(ctx, to_submit); 3215 mutex_unlock(&ctx->uring_lock); 3216 } 3217 if (flags & IORING_ENTER_GETEVENTS) { 3218 unsigned nr_events = 0; 3219 3220 min_complete = min(min_complete, ctx->cq_entries); 3221 3222 if (ctx->flags & IORING_SETUP_IOPOLL) { 3223 ret = io_iopoll_check(ctx, &nr_events, min_complete); 3224 } else { 3225 ret = io_cqring_wait(ctx, min_complete, sig, sigsz); 3226 } 3227 } 3228 3229out_ctx: 3230 io_ring_drop_ctx_refs(ctx, 1); 3231out_fput: 3232 fdput(f); 3233 return submitted ? submitted : ret; 3234} 3235 3236static const struct file_operations io_uring_fops = { 3237 .release = io_uring_release, 3238 .mmap = io_uring_mmap, 3239 .poll = io_uring_poll, 3240 .fasync = io_uring_fasync, 3241}; 3242 3243static int io_allocate_scq_urings(struct io_ring_ctx *ctx, 3244 struct io_uring_params *p) 3245{ 3246 struct io_sq_ring *sq_ring; 3247 struct io_cq_ring *cq_ring; 3248 size_t size; 3249 3250 sq_ring = io_mem_alloc(struct_size(sq_ring, array, p->sq_entries)); 3251 if (!sq_ring) 3252 return -ENOMEM; 3253 3254 ctx->sq_ring = sq_ring; 3255 sq_ring->ring_mask = p->sq_entries - 1; 3256 sq_ring->ring_entries = p->sq_entries; 3257 ctx->sq_mask = sq_ring->ring_mask; 3258 ctx->sq_entries = sq_ring->ring_entries; 3259 3260 size = array_size(sizeof(struct io_uring_sqe), p->sq_entries); 3261 if (size == SIZE_MAX) 3262 return -EOVERFLOW; 3263 3264 ctx->sq_sqes = io_mem_alloc(size); 3265 if (!ctx->sq_sqes) 3266 return -ENOMEM; 3267 3268 cq_ring = io_mem_alloc(struct_size(cq_ring, cqes, p->cq_entries)); 3269 if (!cq_ring) 3270 return -ENOMEM; 3271 3272 ctx->cq_ring = cq_ring; 3273 cq_ring->ring_mask = p->cq_entries - 1; 3274 cq_ring->ring_entries = p->cq_entries; 3275 ctx->cq_mask = cq_ring->ring_mask; 3276 ctx->cq_entries = cq_ring->ring_entries; 3277 return 0; 3278} 3279 3280/* 3281 * Allocate an anonymous fd, this is what constitutes the application 3282 * visible backing of an io_uring instance. The application mmaps this 3283 * fd to gain access to the SQ/CQ ring details. If UNIX sockets are enabled, 3284 * we have to tie this fd to a socket for file garbage collection purposes. 
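 *
 * Roughly, from the application side (sizes computed from the offsets and
 * ring sizes published in io_uring_params; names are placeholders):
 *
 *	sq_ptr = mmap(NULL, sq_ring_bytes, PROT_READ | PROT_WRITE,
 *		      MAP_SHARED | MAP_POPULATE, ring_fd, IORING_OFF_SQ_RING);
 *	sqes   = mmap(NULL, sqes_bytes, PROT_READ | PROT_WRITE,
 *		      MAP_SHARED | MAP_POPULATE, ring_fd, IORING_OFF_SQES);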
3285 */ 3286static int io_uring_get_fd(struct io_ring_ctx *ctx) 3287{ 3288 struct file *file; 3289 int ret; 3290 3291#if defined(CONFIG_UNIX) 3292 ret = sock_create_kern(&init_net, PF_UNIX, SOCK_RAW, IPPROTO_IP, 3293 &ctx->ring_sock); 3294 if (ret) 3295 return ret; 3296#endif 3297 3298 ret = get_unused_fd_flags(O_RDWR | O_CLOEXEC); 3299 if (ret < 0) 3300 goto err; 3301 3302 file = anon_inode_getfile("[io_uring]", &io_uring_fops, ctx, 3303 O_RDWR | O_CLOEXEC); 3304 if (IS_ERR(file)) { 3305 put_unused_fd(ret); 3306 ret = PTR_ERR(file); 3307 goto err; 3308 } 3309 3310#if defined(CONFIG_UNIX) 3311 ctx->ring_sock->file = file; 3312 ctx->ring_sock->sk->sk_user_data = ctx; 3313#endif 3314 fd_install(ret, file); 3315 return ret; 3316err: 3317#if defined(CONFIG_UNIX) 3318 sock_release(ctx->ring_sock); 3319 ctx->ring_sock = NULL; 3320#endif 3321 return ret; 3322} 3323 3324static int io_uring_create(unsigned entries, struct io_uring_params *p) 3325{ 3326 struct user_struct *user = NULL; 3327 struct io_ring_ctx *ctx; 3328 bool account_mem; 3329 int ret; 3330 3331 if (!entries || entries > IORING_MAX_ENTRIES) 3332 return -EINVAL; 3333 3334 /* 3335 * Use twice as many entries for the CQ ring. It's possible for the 3336 * application to drive a higher depth than the size of the SQ ring, 3337 * since the sqes are only used at submission time. This allows for 3338 * some flexibility in overcommitting a bit. 3339 */ 3340 p->sq_entries = roundup_pow_of_two(entries); 3341 p->cq_entries = 2 * p->sq_entries; 3342 3343 user = get_uid(current_user()); 3344 account_mem = !capable(CAP_IPC_LOCK); 3345 3346 if (account_mem) { 3347 ret = io_account_mem(user, 3348 ring_pages(p->sq_entries, p->cq_entries)); 3349 if (ret) { 3350 free_uid(user); 3351 return ret; 3352 } 3353 } 3354 3355 ctx = io_ring_ctx_alloc(p); 3356 if (!ctx) { 3357 if (account_mem) 3358 io_unaccount_mem(user, ring_pages(p->sq_entries, 3359 p->cq_entries)); 3360 free_uid(user); 3361 return -ENOMEM; 3362 } 3363 ctx->compat = in_compat_syscall(); 3364 ctx->account_mem = account_mem; 3365 ctx->user = user; 3366 3367 ret = io_allocate_scq_urings(ctx, p); 3368 if (ret) 3369 goto err; 3370 3371 ret = io_sq_offload_start(ctx, p); 3372 if (ret) 3373 goto err; 3374 3375 ret = io_uring_get_fd(ctx); 3376 if (ret < 0) 3377 goto err; 3378 3379 memset(&p->sq_off, 0, sizeof(p->sq_off)); 3380 p->sq_off.head = offsetof(struct io_sq_ring, r.head); 3381 p->sq_off.tail = offsetof(struct io_sq_ring, r.tail); 3382 p->sq_off.ring_mask = offsetof(struct io_sq_ring, ring_mask); 3383 p->sq_off.ring_entries = offsetof(struct io_sq_ring, ring_entries); 3384 p->sq_off.flags = offsetof(struct io_sq_ring, flags); 3385 p->sq_off.dropped = offsetof(struct io_sq_ring, dropped); 3386 p->sq_off.array = offsetof(struct io_sq_ring, array); 3387 3388 memset(&p->cq_off, 0, sizeof(p->cq_off)); 3389 p->cq_off.head = offsetof(struct io_cq_ring, r.head); 3390 p->cq_off.tail = offsetof(struct io_cq_ring, r.tail); 3391 p->cq_off.ring_mask = offsetof(struct io_cq_ring, ring_mask); 3392 p->cq_off.ring_entries = offsetof(struct io_cq_ring, ring_entries); 3393 p->cq_off.overflow = offsetof(struct io_cq_ring, overflow); 3394 p->cq_off.cqes = offsetof(struct io_cq_ring, cqes); 3395 return ret; 3396err: 3397 io_ring_ctx_wait_and_kill(ctx); 3398 return ret; 3399} 3400 3401/* 3402 * Sets up an aio uring context, and returns the fd. Applications asks for a 3403 * ring size, we return the actual sq/cq ring sizes (among other things) in the 3404 * params structure passed in. 
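 *
 * Note that the requested size is rounded up to a power of two and the CQ
 * ring is sized at twice the SQ ring (see io_uring_create() above). A
 * minimal invocation sketch:
 *
 *	struct io_uring_params p = { 0 };
 *	int ring_fd = syscall(__NR_io_uring_setup, 128, &p);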
3405 */ 3406static long io_uring_setup(u32 entries, struct io_uring_params __user *params) 3407{ 3408 struct io_uring_params p; 3409 long ret; 3410 int i; 3411 3412 if (copy_from_user(&p, params, sizeof(p))) 3413 return -EFAULT; 3414 for (i = 0; i < ARRAY_SIZE(p.resv); i++) { 3415 if (p.resv[i]) 3416 return -EINVAL; 3417 } 3418 3419 if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL | 3420 IORING_SETUP_SQ_AFF)) 3421 return -EINVAL; 3422 3423 ret = io_uring_create(entries, &p); 3424 if (ret < 0) 3425 return ret; 3426 3427 if (copy_to_user(params, &p, sizeof(p))) 3428 return -EFAULT; 3429 3430 return ret; 3431} 3432 3433SYSCALL_DEFINE2(io_uring_setup, u32, entries, 3434 struct io_uring_params __user *, params) 3435{ 3436 return io_uring_setup(entries, params); 3437} 3438 3439static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode, 3440 void __user *arg, unsigned nr_args) 3441 __releases(ctx->uring_lock) 3442 __acquires(ctx->uring_lock) 3443{ 3444 int ret; 3445 3446 /* 3447 * We're inside the ring mutex, if the ref is already dying, then 3448 * someone else killed the ctx or is already going through 3449 * io_uring_register(). 3450 */ 3451 if (percpu_ref_is_dying(&ctx->refs)) 3452 return -ENXIO; 3453 3454 percpu_ref_kill(&ctx->refs); 3455 3456 /* 3457 * Drop uring mutex before waiting for references to exit. If another 3458 * thread is currently inside io_uring_enter() it might need to grab 3459 * the uring_lock to make progress. If we hold it here across the drain 3460 * wait, then we can deadlock. It's safe to drop the mutex here, since 3461 * no new references will come in after we've killed the percpu ref. 3462 */ 3463 mutex_unlock(&ctx->uring_lock); 3464 wait_for_completion(&ctx->ctx_done); 3465 mutex_lock(&ctx->uring_lock); 3466 3467 switch (opcode) { 3468 case IORING_REGISTER_BUFFERS: 3469 ret = io_sqe_buffer_register(ctx, arg, nr_args); 3470 break; 3471 case IORING_UNREGISTER_BUFFERS: 3472 ret = -EINVAL; 3473 if (arg || nr_args) 3474 break; 3475 ret = io_sqe_buffer_unregister(ctx); 3476 break; 3477 case IORING_REGISTER_FILES: 3478 ret = io_sqe_files_register(ctx, arg, nr_args); 3479 break; 3480 case IORING_UNREGISTER_FILES: 3481 ret = -EINVAL; 3482 if (arg || nr_args) 3483 break; 3484 ret = io_sqe_files_unregister(ctx); 3485 break; 3486 case IORING_REGISTER_EVENTFD: 3487 ret = -EINVAL; 3488 if (nr_args != 1) 3489 break; 3490 ret = io_eventfd_register(ctx, arg); 3491 break; 3492 case IORING_UNREGISTER_EVENTFD: 3493 ret = -EINVAL; 3494 if (arg || nr_args) 3495 break; 3496 ret = io_eventfd_unregister(ctx); 3497 break; 3498 default: 3499 ret = -EINVAL; 3500 break; 3501 } 3502 3503 /* bring the ctx back to life */ 3504 reinit_completion(&ctx->ctx_done); 3505 percpu_ref_reinit(&ctx->refs); 3506 return ret; 3507} 3508 3509SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode, 3510 void __user *, arg, unsigned int, nr_args) 3511{ 3512 struct io_ring_ctx *ctx; 3513 long ret = -EBADF; 3514 struct fd f; 3515 3516 f = fdget(fd); 3517 if (!f.file) 3518 return -EBADF; 3519 3520 ret = -EOPNOTSUPP; 3521 if (f.file->f_op != &io_uring_fops) 3522 goto out_fput; 3523 3524 ctx = f.file->private_data; 3525 3526 mutex_lock(&ctx->uring_lock); 3527 ret = __io_uring_register(ctx, opcode, arg, nr_args); 3528 mutex_unlock(&ctx->uring_lock); 3529out_fput: 3530 fdput(f); 3531 return ret; 3532} 3533 3534static int __init io_uring_init(void) 3535{ 3536 req_cachep = KMEM_CACHE(io_kiocb, SLAB_HWCACHE_ALIGN | SLAB_PANIC); 3537 return 0; 3538}; 3539__initcall(io_uring_init);
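
/*
 * A rough, non-authoritative userspace sketch of pushing one NOP through
 * the raw system calls defined above. Error handling, the ring mmaps and
 * the memory-ordering rules documented at the top of this file are all
 * omitted; ring_fd, sq_tail, sq_mask, sq_array and sqes stand for pointers
 * and values the application derived from io_uring_setup() and mmap():
 *
 *	unsigned tail = *sq_tail, idx = tail & *sq_mask;
 *	struct io_uring_sqe *sqe = &sqes[idx];
 *
 *	memset(sqe, 0, sizeof(*sqe));
 *	sqe->opcode = IORING_OP_NOP;
 *	sqe->user_data = 0x42;
 *	sq_array[idx] = idx;
 *	*sq_tail = tail + 1;
 *
 *	syscall(__NR_io_uring_enter, ring_fd, 1, 1,
 *		IORING_ENTER_GETEVENTS, NULL, 0);
 */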