at v5.2 3246 lines 78 kB view raw
1// SPDX-License-Identifier: GPL-2.0 2/* 3 * Shared application/kernel submission and completion ring pairs, for 4 * supporting fast/efficient IO. 5 * 6 * A note on the read/write ordering memory barriers that are matched between 7 * the application and kernel side. 8 * 9 * After the application reads the CQ ring tail, it must use an 10 * appropriate smp_rmb() to pair with the smp_wmb() the kernel uses 11 * before writing the tail (using smp_load_acquire to read the tail will 12 * do). It also needs a smp_mb() before updating CQ head (ordering the 13 * entry load(s) with the head store), pairing with an implicit barrier 14 * through a control-dependency in io_get_cqring (smp_store_release to 15 * store head will do). Failure to do so could lead to reading invalid 16 * CQ entries. 17 * 18 * Likewise, the application must use an appropriate smp_wmb() before 19 * writing the SQ tail (ordering SQ entry stores with the tail store), 20 * which pairs with smp_load_acquire in io_get_sqring (smp_store_release 21 * to store the tail will do). And it needs a barrier ordering the SQ 22 * head load before writing new SQ entries (smp_load_acquire to read 23 * head will do). 24 * 25 * When using the SQ poll thread (IORING_SETUP_SQPOLL), the application 26 * needs to check the SQ flags for IORING_SQ_NEED_WAKEUP *after* 27 * updating the SQ tail; a full memory barrier smp_mb() is needed 28 * between. 29 * 30 * Also see the examples in the liburing library: 31 * 32 * git://git.kernel.dk/liburing 33 * 34 * io_uring also uses READ/WRITE_ONCE() for _any_ store or load that happens 35 * from data shared between the kernel and application. This is done both 36 * for ordering purposes, but also to ensure that once a value is loaded from 37 * data that the application could potentially modify, it remains stable. 38 * 39 * Copyright (C) 2018-2019 Jens Axboe 40 * Copyright (c) 2018-2019 Christoph Hellwig 41 */ 42#include <linux/kernel.h> 43#include <linux/init.h> 44#include <linux/errno.h> 45#include <linux/syscalls.h> 46#include <linux/compat.h> 47#include <linux/refcount.h> 48#include <linux/uio.h> 49 50#include <linux/sched/signal.h> 51#include <linux/fs.h> 52#include <linux/file.h> 53#include <linux/fdtable.h> 54#include <linux/mm.h> 55#include <linux/mman.h> 56#include <linux/mmu_context.h> 57#include <linux/percpu.h> 58#include <linux/slab.h> 59#include <linux/workqueue.h> 60#include <linux/kthread.h> 61#include <linux/blkdev.h> 62#include <linux/bvec.h> 63#include <linux/net.h> 64#include <net/sock.h> 65#include <net/af_unix.h> 66#include <net/scm.h> 67#include <linux/anon_inodes.h> 68#include <linux/sched/mm.h> 69#include <linux/uaccess.h> 70#include <linux/nospec.h> 71#include <linux/sizes.h> 72#include <linux/hugetlb.h> 73 74#include <uapi/linux/io_uring.h> 75 76#include "internal.h" 77 78#define IORING_MAX_ENTRIES 4096 79#define IORING_MAX_FIXED_FILES 1024 80 81struct io_uring { 82 u32 head ____cacheline_aligned_in_smp; 83 u32 tail ____cacheline_aligned_in_smp; 84}; 85 86/* 87 * This data is shared with the application through the mmap at offset 88 * IORING_OFF_SQ_RING. 89 * 90 * The offsets to the member fields are published through struct 91 * io_sqring_offsets when calling io_uring_setup. 92 */ 93struct io_sq_ring { 94 /* 95 * Head and tail offsets into the ring; the offsets need to be 96 * masked to get valid indices. 97 * 98 * The kernel controls head and the application controls tail. 99 */ 100 struct io_uring r; 101 /* 102 * Bitmask to apply to head and tail offsets (constant, equals 103 * ring_entries - 1) 104 */ 105 u32 ring_mask; 106 /* Ring size (constant, power of 2) */ 107 u32 ring_entries; 108 /* 109 * Number of invalid entries dropped by the kernel due to 110 * invalid index stored in array 111 * 112 * Written by the kernel, shouldn't be modified by the 113 * application (i.e. get number of "new events" by comparing to 114 * cached value). 115 * 116 * After a new SQ head value was read by the application this 117 * counter includes all submissions that were dropped reaching 118 * the new SQ head (and possibly more). 119 */ 120 u32 dropped; 121 /* 122 * Runtime flags 123 * 124 * Written by the kernel, shouldn't be modified by the 125 * application. 126 * 127 * The application needs a full memory barrier before checking 128 * for IORING_SQ_NEED_WAKEUP after updating the sq tail. 129 */ 130 u32 flags; 131 /* 132 * Ring buffer of indices into array of io_uring_sqe, which is 133 * mmapped by the application using the IORING_OFF_SQES offset. 134 * 135 * This indirection could e.g. be used to assign fixed 136 * io_uring_sqe entries to operations and only submit them to 137 * the queue when needed. 138 * 139 * The kernel modifies neither the indices array nor the entries 140 * array. 141 */ 142 u32 array[]; 143}; 144 145/* 146 * This data is shared with the application through the mmap at offset 147 * IORING_OFF_CQ_RING. 148 * 149 * The offsets to the member fields are published through struct 150 * io_cqring_offsets when calling io_uring_setup. 151 */ 152struct io_cq_ring { 153 /* 154 * Head and tail offsets into the ring; the offsets need to be 155 * masked to get valid indices. 156 * 157 * The application controls head and the kernel tail. 158 */ 159 struct io_uring r; 160 /* 161 * Bitmask to apply to head and tail offsets (constant, equals 162 * ring_entries - 1) 163 */ 164 u32 ring_mask; 165 /* Ring size (constant, power of 2) */ 166 u32 ring_entries; 167 /* 168 * Number of completion events lost because the queue was full; 169 * this should be avoided by the application by making sure 170 * there are not more requests pending thatn there is space in 171 * the completion queue. 172 * 173 * Written by the kernel, shouldn't be modified by the 174 * application (i.e. get number of "new events" by comparing to 175 * cached value). 176 * 177 * As completion events come in out of order this counter is not 178 * ordered with any other data. 179 */ 180 u32 overflow; 181 /* 182 * Ring buffer of completion events. 183 * 184 * The kernel writes completion events fresh every time they are 185 * produced, so the application is allowed to modify pending 186 * entries. 187 */ 188 struct io_uring_cqe cqes[]; 189}; 190 191struct io_mapped_ubuf { 192 u64 ubuf; 193 size_t len; 194 struct bio_vec *bvec; 195 unsigned int nr_bvecs; 196}; 197 198struct async_list { 199 spinlock_t lock; 200 atomic_t cnt; 201 struct list_head list; 202 203 struct file *file; 204 off_t io_end; 205 size_t io_pages; 206}; 207 208struct io_ring_ctx { 209 struct { 210 struct percpu_ref refs; 211 } ____cacheline_aligned_in_smp; 212 213 struct { 214 unsigned int flags; 215 bool compat; 216 bool account_mem; 217 218 /* SQ ring */ 219 struct io_sq_ring *sq_ring; 220 unsigned cached_sq_head; 221 unsigned sq_entries; 222 unsigned sq_mask; 223 unsigned sq_thread_idle; 224 struct io_uring_sqe *sq_sqes; 225 226 struct list_head defer_list; 227 } ____cacheline_aligned_in_smp; 228 229 /* IO offload */ 230 struct workqueue_struct *sqo_wq; 231 struct task_struct *sqo_thread; /* if using sq thread polling */ 232 struct mm_struct *sqo_mm; 233 wait_queue_head_t sqo_wait; 234 235 struct { 236 /* CQ ring */ 237 struct io_cq_ring *cq_ring; 238 unsigned cached_cq_tail; 239 unsigned cq_entries; 240 unsigned cq_mask; 241 struct wait_queue_head cq_wait; 242 struct fasync_struct *cq_fasync; 243 struct eventfd_ctx *cq_ev_fd; 244 } ____cacheline_aligned_in_smp; 245 246 /* 247 * If used, fixed file set. Writers must ensure that ->refs is dead, 248 * readers must ensure that ->refs is alive as long as the file* is 249 * used. Only updated through io_uring_register(2). 250 */ 251 struct file **user_files; 252 unsigned nr_user_files; 253 254 /* if used, fixed mapped user buffers */ 255 unsigned nr_user_bufs; 256 struct io_mapped_ubuf *user_bufs; 257 258 struct user_struct *user; 259 260 struct completion ctx_done; 261 262 struct { 263 struct mutex uring_lock; 264 wait_queue_head_t wait; 265 } ____cacheline_aligned_in_smp; 266 267 struct { 268 spinlock_t completion_lock; 269 bool poll_multi_file; 270 /* 271 * ->poll_list is protected by the ctx->uring_lock for 272 * io_uring instances that don't use IORING_SETUP_SQPOLL. 273 * For SQPOLL, only the single threaded io_sq_thread() will 274 * manipulate the list, hence no extra locking is needed there. 275 */ 276 struct list_head poll_list; 277 struct list_head cancel_list; 278 } ____cacheline_aligned_in_smp; 279 280 struct async_list pending_async[2]; 281 282#if defined(CONFIG_UNIX) 283 struct socket *ring_sock; 284#endif 285}; 286 287struct sqe_submit { 288 const struct io_uring_sqe *sqe; 289 unsigned short index; 290 bool has_user; 291 bool needs_lock; 292 bool needs_fixed_file; 293}; 294 295/* 296 * First field must be the file pointer in all the 297 * iocb unions! See also 'struct kiocb' in <linux/fs.h> 298 */ 299struct io_poll_iocb { 300 struct file *file; 301 struct wait_queue_head *head; 302 __poll_t events; 303 bool done; 304 bool canceled; 305 struct wait_queue_entry wait; 306}; 307 308/* 309 * NOTE! Each of the iocb union members has the file pointer 310 * as the first entry in their struct definition. So you can 311 * access the file pointer through any of the sub-structs, 312 * or directly as just 'ki_filp' in this struct. 313 */ 314struct io_kiocb { 315 union { 316 struct file *file; 317 struct kiocb rw; 318 struct io_poll_iocb poll; 319 }; 320 321 struct sqe_submit submit; 322 323 struct io_ring_ctx *ctx; 324 struct list_head list; 325 unsigned int flags; 326 refcount_t refs; 327#define REQ_F_NOWAIT 1 /* must not punt to workers */ 328#define REQ_F_IOPOLL_COMPLETED 2 /* polled IO has completed */ 329#define REQ_F_FIXED_FILE 4 /* ctx owns file */ 330#define REQ_F_SEQ_PREV 8 /* sequential with previous */ 331#define REQ_F_IO_DRAIN 16 /* drain existing IO first */ 332#define REQ_F_IO_DRAINED 32 /* drain done */ 333 u64 user_data; 334 u32 error; /* iopoll result from callback */ 335 u32 sequence; 336 337 struct work_struct work; 338}; 339 340#define IO_PLUG_THRESHOLD 2 341#define IO_IOPOLL_BATCH 8 342 343struct io_submit_state { 344 struct blk_plug plug; 345 346 /* 347 * io_kiocb alloc cache 348 */ 349 void *reqs[IO_IOPOLL_BATCH]; 350 unsigned int free_reqs; 351 unsigned int cur_req; 352 353 /* 354 * File reference cache 355 */ 356 struct file *file; 357 unsigned int fd; 358 unsigned int has_refs; 359 unsigned int used_refs; 360 unsigned int ios_left; 361}; 362 363static void io_sq_wq_submit_work(struct work_struct *work); 364 365static struct kmem_cache *req_cachep; 366 367static const struct file_operations io_uring_fops; 368 369struct sock *io_uring_get_socket(struct file *file) 370{ 371#if defined(CONFIG_UNIX) 372 if (file->f_op == &io_uring_fops) { 373 struct io_ring_ctx *ctx = file->private_data; 374 375 return ctx->ring_sock->sk; 376 } 377#endif 378 return NULL; 379} 380EXPORT_SYMBOL(io_uring_get_socket); 381 382static void io_ring_ctx_ref_free(struct percpu_ref *ref) 383{ 384 struct io_ring_ctx *ctx = container_of(ref, struct io_ring_ctx, refs); 385 386 complete(&ctx->ctx_done); 387} 388 389static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p) 390{ 391 struct io_ring_ctx *ctx; 392 int i; 393 394 ctx = kzalloc(sizeof(*ctx), GFP_KERNEL); 395 if (!ctx) 396 return NULL; 397 398 if (percpu_ref_init(&ctx->refs, io_ring_ctx_ref_free, 0, GFP_KERNEL)) { 399 kfree(ctx); 400 return NULL; 401 } 402 403 ctx->flags = p->flags; 404 init_waitqueue_head(&ctx->cq_wait); 405 init_completion(&ctx->ctx_done); 406 mutex_init(&ctx->uring_lock); 407 init_waitqueue_head(&ctx->wait); 408 for (i = 0; i < ARRAY_SIZE(ctx->pending_async); i++) { 409 spin_lock_init(&ctx->pending_async[i].lock); 410 INIT_LIST_HEAD(&ctx->pending_async[i].list); 411 atomic_set(&ctx->pending_async[i].cnt, 0); 412 } 413 spin_lock_init(&ctx->completion_lock); 414 INIT_LIST_HEAD(&ctx->poll_list); 415 INIT_LIST_HEAD(&ctx->cancel_list); 416 INIT_LIST_HEAD(&ctx->defer_list); 417 return ctx; 418} 419 420static inline bool io_sequence_defer(struct io_ring_ctx *ctx, 421 struct io_kiocb *req) 422{ 423 if ((req->flags & (REQ_F_IO_DRAIN|REQ_F_IO_DRAINED)) != REQ_F_IO_DRAIN) 424 return false; 425 426 return req->sequence > ctx->cached_cq_tail + ctx->sq_ring->dropped; 427} 428 429static struct io_kiocb *io_get_deferred_req(struct io_ring_ctx *ctx) 430{ 431 struct io_kiocb *req; 432 433 if (list_empty(&ctx->defer_list)) 434 return NULL; 435 436 req = list_first_entry(&ctx->defer_list, struct io_kiocb, list); 437 if (!io_sequence_defer(ctx, req)) { 438 list_del_init(&req->list); 439 return req; 440 } 441 442 return NULL; 443} 444 445static void __io_commit_cqring(struct io_ring_ctx *ctx) 446{ 447 struct io_cq_ring *ring = ctx->cq_ring; 448 449 if (ctx->cached_cq_tail != READ_ONCE(ring->r.tail)) { 450 /* order cqe stores with ring update */ 451 smp_store_release(&ring->r.tail, ctx->cached_cq_tail); 452 453 if (wq_has_sleeper(&ctx->cq_wait)) { 454 wake_up_interruptible(&ctx->cq_wait); 455 kill_fasync(&ctx->cq_fasync, SIGIO, POLL_IN); 456 } 457 } 458} 459 460static void io_commit_cqring(struct io_ring_ctx *ctx) 461{ 462 struct io_kiocb *req; 463 464 __io_commit_cqring(ctx); 465 466 while ((req = io_get_deferred_req(ctx)) != NULL) { 467 req->flags |= REQ_F_IO_DRAINED; 468 queue_work(ctx->sqo_wq, &req->work); 469 } 470} 471 472static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx) 473{ 474 struct io_cq_ring *ring = ctx->cq_ring; 475 unsigned tail; 476 477 tail = ctx->cached_cq_tail; 478 /* 479 * writes to the cq entry need to come after reading head; the 480 * control dependency is enough as we're using WRITE_ONCE to 481 * fill the cq entry 482 */ 483 if (tail - READ_ONCE(ring->r.head) == ring->ring_entries) 484 return NULL; 485 486 ctx->cached_cq_tail++; 487 return &ring->cqes[tail & ctx->cq_mask]; 488} 489 490static void io_cqring_fill_event(struct io_ring_ctx *ctx, u64 ki_user_data, 491 long res) 492{ 493 struct io_uring_cqe *cqe; 494 495 /* 496 * If we can't get a cq entry, userspace overflowed the 497 * submission (by quite a lot). Increment the overflow count in 498 * the ring. 499 */ 500 cqe = io_get_cqring(ctx); 501 if (cqe) { 502 WRITE_ONCE(cqe->user_data, ki_user_data); 503 WRITE_ONCE(cqe->res, res); 504 WRITE_ONCE(cqe->flags, 0); 505 } else { 506 unsigned overflow = READ_ONCE(ctx->cq_ring->overflow); 507 508 WRITE_ONCE(ctx->cq_ring->overflow, overflow + 1); 509 } 510} 511 512static void io_cqring_ev_posted(struct io_ring_ctx *ctx) 513{ 514 if (waitqueue_active(&ctx->wait)) 515 wake_up(&ctx->wait); 516 if (waitqueue_active(&ctx->sqo_wait)) 517 wake_up(&ctx->sqo_wait); 518 if (ctx->cq_ev_fd) 519 eventfd_signal(ctx->cq_ev_fd, 1); 520} 521 522static void io_cqring_add_event(struct io_ring_ctx *ctx, u64 user_data, 523 long res) 524{ 525 unsigned long flags; 526 527 spin_lock_irqsave(&ctx->completion_lock, flags); 528 io_cqring_fill_event(ctx, user_data, res); 529 io_commit_cqring(ctx); 530 spin_unlock_irqrestore(&ctx->completion_lock, flags); 531 532 io_cqring_ev_posted(ctx); 533} 534 535static void io_ring_drop_ctx_refs(struct io_ring_ctx *ctx, unsigned refs) 536{ 537 percpu_ref_put_many(&ctx->refs, refs); 538 539 if (waitqueue_active(&ctx->wait)) 540 wake_up(&ctx->wait); 541} 542 543static struct io_kiocb *io_get_req(struct io_ring_ctx *ctx, 544 struct io_submit_state *state) 545{ 546 gfp_t gfp = GFP_KERNEL | __GFP_NOWARN; 547 struct io_kiocb *req; 548 549 if (!percpu_ref_tryget(&ctx->refs)) 550 return NULL; 551 552 if (!state) { 553 req = kmem_cache_alloc(req_cachep, gfp); 554 if (unlikely(!req)) 555 goto out; 556 } else if (!state->free_reqs) { 557 size_t sz; 558 int ret; 559 560 sz = min_t(size_t, state->ios_left, ARRAY_SIZE(state->reqs)); 561 ret = kmem_cache_alloc_bulk(req_cachep, gfp, sz, state->reqs); 562 563 /* 564 * Bulk alloc is all-or-nothing. If we fail to get a batch, 565 * retry single alloc to be on the safe side. 566 */ 567 if (unlikely(ret <= 0)) { 568 state->reqs[0] = kmem_cache_alloc(req_cachep, gfp); 569 if (!state->reqs[0]) 570 goto out; 571 ret = 1; 572 } 573 state->free_reqs = ret - 1; 574 state->cur_req = 1; 575 req = state->reqs[0]; 576 } else { 577 req = state->reqs[state->cur_req]; 578 state->free_reqs--; 579 state->cur_req++; 580 } 581 582 req->file = NULL; 583 req->ctx = ctx; 584 req->flags = 0; 585 /* one is dropped after submission, the other at completion */ 586 refcount_set(&req->refs, 2); 587 return req; 588out: 589 io_ring_drop_ctx_refs(ctx, 1); 590 return NULL; 591} 592 593static void io_free_req_many(struct io_ring_ctx *ctx, void **reqs, int *nr) 594{ 595 if (*nr) { 596 kmem_cache_free_bulk(req_cachep, *nr, reqs); 597 io_ring_drop_ctx_refs(ctx, *nr); 598 *nr = 0; 599 } 600} 601 602static void io_free_req(struct io_kiocb *req) 603{ 604 if (req->file && !(req->flags & REQ_F_FIXED_FILE)) 605 fput(req->file); 606 io_ring_drop_ctx_refs(req->ctx, 1); 607 kmem_cache_free(req_cachep, req); 608} 609 610static void io_put_req(struct io_kiocb *req) 611{ 612 if (refcount_dec_and_test(&req->refs)) 613 io_free_req(req); 614} 615 616/* 617 * Find and free completed poll iocbs 618 */ 619static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events, 620 struct list_head *done) 621{ 622 void *reqs[IO_IOPOLL_BATCH]; 623 struct io_kiocb *req; 624 int to_free; 625 626 to_free = 0; 627 while (!list_empty(done)) { 628 req = list_first_entry(done, struct io_kiocb, list); 629 list_del(&req->list); 630 631 io_cqring_fill_event(ctx, req->user_data, req->error); 632 (*nr_events)++; 633 634 if (refcount_dec_and_test(&req->refs)) { 635 /* If we're not using fixed files, we have to pair the 636 * completion part with the file put. Use regular 637 * completions for those, only batch free for fixed 638 * file. 639 */ 640 if (req->flags & REQ_F_FIXED_FILE) { 641 reqs[to_free++] = req; 642 if (to_free == ARRAY_SIZE(reqs)) 643 io_free_req_many(ctx, reqs, &to_free); 644 } else { 645 io_free_req(req); 646 } 647 } 648 } 649 650 io_commit_cqring(ctx); 651 io_free_req_many(ctx, reqs, &to_free); 652} 653 654static int io_do_iopoll(struct io_ring_ctx *ctx, unsigned int *nr_events, 655 long min) 656{ 657 struct io_kiocb *req, *tmp; 658 LIST_HEAD(done); 659 bool spin; 660 int ret; 661 662 /* 663 * Only spin for completions if we don't have multiple devices hanging 664 * off our complete list, and we're under the requested amount. 665 */ 666 spin = !ctx->poll_multi_file && *nr_events < min; 667 668 ret = 0; 669 list_for_each_entry_safe(req, tmp, &ctx->poll_list, list) { 670 struct kiocb *kiocb = &req->rw; 671 672 /* 673 * Move completed entries to our local list. If we find a 674 * request that requires polling, break out and complete 675 * the done list first, if we have entries there. 676 */ 677 if (req->flags & REQ_F_IOPOLL_COMPLETED) { 678 list_move_tail(&req->list, &done); 679 continue; 680 } 681 if (!list_empty(&done)) 682 break; 683 684 ret = kiocb->ki_filp->f_op->iopoll(kiocb, spin); 685 if (ret < 0) 686 break; 687 688 if (ret && spin) 689 spin = false; 690 ret = 0; 691 } 692 693 if (!list_empty(&done)) 694 io_iopoll_complete(ctx, nr_events, &done); 695 696 return ret; 697} 698 699/* 700 * Poll for a mininum of 'min' events. Note that if min == 0 we consider that a 701 * non-spinning poll check - we'll still enter the driver poll loop, but only 702 * as a non-spinning completion check. 703 */ 704static int io_iopoll_getevents(struct io_ring_ctx *ctx, unsigned int *nr_events, 705 long min) 706{ 707 while (!list_empty(&ctx->poll_list)) { 708 int ret; 709 710 ret = io_do_iopoll(ctx, nr_events, min); 711 if (ret < 0) 712 return ret; 713 if (!min || *nr_events >= min) 714 return 0; 715 } 716 717 return 1; 718} 719 720/* 721 * We can't just wait for polled events to come to us, we have to actively 722 * find and complete them. 723 */ 724static void io_iopoll_reap_events(struct io_ring_ctx *ctx) 725{ 726 if (!(ctx->flags & IORING_SETUP_IOPOLL)) 727 return; 728 729 mutex_lock(&ctx->uring_lock); 730 while (!list_empty(&ctx->poll_list)) { 731 unsigned int nr_events = 0; 732 733 io_iopoll_getevents(ctx, &nr_events, 1); 734 } 735 mutex_unlock(&ctx->uring_lock); 736} 737 738static int io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events, 739 long min) 740{ 741 int ret = 0; 742 743 do { 744 int tmin = 0; 745 746 if (*nr_events < min) 747 tmin = min - *nr_events; 748 749 ret = io_iopoll_getevents(ctx, nr_events, tmin); 750 if (ret <= 0) 751 break; 752 ret = 0; 753 } while (min && !*nr_events && !need_resched()); 754 755 return ret; 756} 757 758static void kiocb_end_write(struct kiocb *kiocb) 759{ 760 if (kiocb->ki_flags & IOCB_WRITE) { 761 struct inode *inode = file_inode(kiocb->ki_filp); 762 763 /* 764 * Tell lockdep we inherited freeze protection from submission 765 * thread. 766 */ 767 if (S_ISREG(inode->i_mode)) 768 __sb_writers_acquired(inode->i_sb, SB_FREEZE_WRITE); 769 file_end_write(kiocb->ki_filp); 770 } 771} 772 773static void io_complete_rw(struct kiocb *kiocb, long res, long res2) 774{ 775 struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw); 776 777 kiocb_end_write(kiocb); 778 779 io_cqring_add_event(req->ctx, req->user_data, res); 780 io_put_req(req); 781} 782 783static void io_complete_rw_iopoll(struct kiocb *kiocb, long res, long res2) 784{ 785 struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw); 786 787 kiocb_end_write(kiocb); 788 789 req->error = res; 790 if (res != -EAGAIN) 791 req->flags |= REQ_F_IOPOLL_COMPLETED; 792} 793 794/* 795 * After the iocb has been issued, it's safe to be found on the poll list. 796 * Adding the kiocb to the list AFTER submission ensures that we don't 797 * find it from a io_iopoll_getevents() thread before the issuer is done 798 * accessing the kiocb cookie. 799 */ 800static void io_iopoll_req_issued(struct io_kiocb *req) 801{ 802 struct io_ring_ctx *ctx = req->ctx; 803 804 /* 805 * Track whether we have multiple files in our lists. This will impact 806 * how we do polling eventually, not spinning if we're on potentially 807 * different devices. 808 */ 809 if (list_empty(&ctx->poll_list)) { 810 ctx->poll_multi_file = false; 811 } else if (!ctx->poll_multi_file) { 812 struct io_kiocb *list_req; 813 814 list_req = list_first_entry(&ctx->poll_list, struct io_kiocb, 815 list); 816 if (list_req->rw.ki_filp != req->rw.ki_filp) 817 ctx->poll_multi_file = true; 818 } 819 820 /* 821 * For fast devices, IO may have already completed. If it has, add 822 * it to the front so we find it first. 823 */ 824 if (req->flags & REQ_F_IOPOLL_COMPLETED) 825 list_add(&req->list, &ctx->poll_list); 826 else 827 list_add_tail(&req->list, &ctx->poll_list); 828} 829 830static void io_file_put(struct io_submit_state *state) 831{ 832 if (state->file) { 833 int diff = state->has_refs - state->used_refs; 834 835 if (diff) 836 fput_many(state->file, diff); 837 state->file = NULL; 838 } 839} 840 841/* 842 * Get as many references to a file as we have IOs left in this submission, 843 * assuming most submissions are for one file, or at least that each file 844 * has more than one submission. 845 */ 846static struct file *io_file_get(struct io_submit_state *state, int fd) 847{ 848 if (!state) 849 return fget(fd); 850 851 if (state->file) { 852 if (state->fd == fd) { 853 state->used_refs++; 854 state->ios_left--; 855 return state->file; 856 } 857 io_file_put(state); 858 } 859 state->file = fget_many(fd, state->ios_left); 860 if (!state->file) 861 return NULL; 862 863 state->fd = fd; 864 state->has_refs = state->ios_left; 865 state->used_refs = 1; 866 state->ios_left--; 867 return state->file; 868} 869 870/* 871 * If we tracked the file through the SCM inflight mechanism, we could support 872 * any file. For now, just ensure that anything potentially problematic is done 873 * inline. 874 */ 875static bool io_file_supports_async(struct file *file) 876{ 877 umode_t mode = file_inode(file)->i_mode; 878 879 if (S_ISBLK(mode) || S_ISCHR(mode)) 880 return true; 881 if (S_ISREG(mode) && file->f_op != &io_uring_fops) 882 return true; 883 884 return false; 885} 886 887static int io_prep_rw(struct io_kiocb *req, const struct sqe_submit *s, 888 bool force_nonblock) 889{ 890 const struct io_uring_sqe *sqe = s->sqe; 891 struct io_ring_ctx *ctx = req->ctx; 892 struct kiocb *kiocb = &req->rw; 893 unsigned ioprio; 894 int ret; 895 896 if (!req->file) 897 return -EBADF; 898 899 if (force_nonblock && !io_file_supports_async(req->file)) 900 force_nonblock = false; 901 902 kiocb->ki_pos = READ_ONCE(sqe->off); 903 kiocb->ki_flags = iocb_flags(kiocb->ki_filp); 904 kiocb->ki_hint = ki_hint_validate(file_write_hint(kiocb->ki_filp)); 905 906 ioprio = READ_ONCE(sqe->ioprio); 907 if (ioprio) { 908 ret = ioprio_check_cap(ioprio); 909 if (ret) 910 return ret; 911 912 kiocb->ki_ioprio = ioprio; 913 } else 914 kiocb->ki_ioprio = get_current_ioprio(); 915 916 ret = kiocb_set_rw_flags(kiocb, READ_ONCE(sqe->rw_flags)); 917 if (unlikely(ret)) 918 return ret; 919 920 /* don't allow async punt if RWF_NOWAIT was requested */ 921 if (kiocb->ki_flags & IOCB_NOWAIT) 922 req->flags |= REQ_F_NOWAIT; 923 924 if (force_nonblock) 925 kiocb->ki_flags |= IOCB_NOWAIT; 926 927 if (ctx->flags & IORING_SETUP_IOPOLL) { 928 if (!(kiocb->ki_flags & IOCB_DIRECT) || 929 !kiocb->ki_filp->f_op->iopoll) 930 return -EOPNOTSUPP; 931 932 req->error = 0; 933 kiocb->ki_flags |= IOCB_HIPRI; 934 kiocb->ki_complete = io_complete_rw_iopoll; 935 } else { 936 if (kiocb->ki_flags & IOCB_HIPRI) 937 return -EINVAL; 938 kiocb->ki_complete = io_complete_rw; 939 } 940 return 0; 941} 942 943static inline void io_rw_done(struct kiocb *kiocb, ssize_t ret) 944{ 945 switch (ret) { 946 case -EIOCBQUEUED: 947 break; 948 case -ERESTARTSYS: 949 case -ERESTARTNOINTR: 950 case -ERESTARTNOHAND: 951 case -ERESTART_RESTARTBLOCK: 952 /* 953 * We can't just restart the syscall, since previously 954 * submitted sqes may already be in progress. Just fail this 955 * IO with EINTR. 956 */ 957 ret = -EINTR; 958 /* fall through */ 959 default: 960 kiocb->ki_complete(kiocb, ret, 0); 961 } 962} 963 964static int io_import_fixed(struct io_ring_ctx *ctx, int rw, 965 const struct io_uring_sqe *sqe, 966 struct iov_iter *iter) 967{ 968 size_t len = READ_ONCE(sqe->len); 969 struct io_mapped_ubuf *imu; 970 unsigned index, buf_index; 971 size_t offset; 972 u64 buf_addr; 973 974 /* attempt to use fixed buffers without having provided iovecs */ 975 if (unlikely(!ctx->user_bufs)) 976 return -EFAULT; 977 978 buf_index = READ_ONCE(sqe->buf_index); 979 if (unlikely(buf_index >= ctx->nr_user_bufs)) 980 return -EFAULT; 981 982 index = array_index_nospec(buf_index, ctx->nr_user_bufs); 983 imu = &ctx->user_bufs[index]; 984 buf_addr = READ_ONCE(sqe->addr); 985 986 /* overflow */ 987 if (buf_addr + len < buf_addr) 988 return -EFAULT; 989 /* not inside the mapped region */ 990 if (buf_addr < imu->ubuf || buf_addr + len > imu->ubuf + imu->len) 991 return -EFAULT; 992 993 /* 994 * May not be a start of buffer, set size appropriately 995 * and advance us to the beginning. 996 */ 997 offset = buf_addr - imu->ubuf; 998 iov_iter_bvec(iter, rw, imu->bvec, imu->nr_bvecs, offset + len); 999 if (offset) 1000 iov_iter_advance(iter, offset); 1001 1002 /* don't drop a reference to these pages */ 1003 iter->type |= ITER_BVEC_FLAG_NO_REF; 1004 return 0; 1005} 1006 1007static int io_import_iovec(struct io_ring_ctx *ctx, int rw, 1008 const struct sqe_submit *s, struct iovec **iovec, 1009 struct iov_iter *iter) 1010{ 1011 const struct io_uring_sqe *sqe = s->sqe; 1012 void __user *buf = u64_to_user_ptr(READ_ONCE(sqe->addr)); 1013 size_t sqe_len = READ_ONCE(sqe->len); 1014 u8 opcode; 1015 1016 /* 1017 * We're reading ->opcode for the second time, but the first read 1018 * doesn't care whether it's _FIXED or not, so it doesn't matter 1019 * whether ->opcode changes concurrently. The first read does care 1020 * about whether it is a READ or a WRITE, so we don't trust this read 1021 * for that purpose and instead let the caller pass in the read/write 1022 * flag. 1023 */ 1024 opcode = READ_ONCE(sqe->opcode); 1025 if (opcode == IORING_OP_READ_FIXED || 1026 opcode == IORING_OP_WRITE_FIXED) { 1027 int ret = io_import_fixed(ctx, rw, sqe, iter); 1028 *iovec = NULL; 1029 return ret; 1030 } 1031 1032 if (!s->has_user) 1033 return -EFAULT; 1034 1035#ifdef CONFIG_COMPAT 1036 if (ctx->compat) 1037 return compat_import_iovec(rw, buf, sqe_len, UIO_FASTIOV, 1038 iovec, iter); 1039#endif 1040 1041 return import_iovec(rw, buf, sqe_len, UIO_FASTIOV, iovec, iter); 1042} 1043 1044/* 1045 * Make a note of the last file/offset/direction we punted to async 1046 * context. We'll use this information to see if we can piggy back a 1047 * sequential request onto the previous one, if it's still hasn't been 1048 * completed by the async worker. 1049 */ 1050static void io_async_list_note(int rw, struct io_kiocb *req, size_t len) 1051{ 1052 struct async_list *async_list = &req->ctx->pending_async[rw]; 1053 struct kiocb *kiocb = &req->rw; 1054 struct file *filp = kiocb->ki_filp; 1055 off_t io_end = kiocb->ki_pos + len; 1056 1057 if (filp == async_list->file && kiocb->ki_pos == async_list->io_end) { 1058 unsigned long max_pages; 1059 1060 /* Use 8x RA size as a decent limiter for both reads/writes */ 1061 max_pages = filp->f_ra.ra_pages; 1062 if (!max_pages) 1063 max_pages = VM_READAHEAD_PAGES; 1064 max_pages *= 8; 1065 1066 /* If max pages are exceeded, reset the state */ 1067 len >>= PAGE_SHIFT; 1068 if (async_list->io_pages + len <= max_pages) { 1069 req->flags |= REQ_F_SEQ_PREV; 1070 async_list->io_pages += len; 1071 } else { 1072 io_end = 0; 1073 async_list->io_pages = 0; 1074 } 1075 } 1076 1077 /* New file? Reset state. */ 1078 if (async_list->file != filp) { 1079 async_list->io_pages = 0; 1080 async_list->file = filp; 1081 } 1082 async_list->io_end = io_end; 1083} 1084 1085static int io_read(struct io_kiocb *req, const struct sqe_submit *s, 1086 bool force_nonblock) 1087{ 1088 struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs; 1089 struct kiocb *kiocb = &req->rw; 1090 struct iov_iter iter; 1091 struct file *file; 1092 size_t iov_count; 1093 int ret; 1094 1095 ret = io_prep_rw(req, s, force_nonblock); 1096 if (ret) 1097 return ret; 1098 file = kiocb->ki_filp; 1099 1100 if (unlikely(!(file->f_mode & FMODE_READ))) 1101 return -EBADF; 1102 if (unlikely(!file->f_op->read_iter)) 1103 return -EINVAL; 1104 1105 ret = io_import_iovec(req->ctx, READ, s, &iovec, &iter); 1106 if (ret) 1107 return ret; 1108 1109 iov_count = iov_iter_count(&iter); 1110 ret = rw_verify_area(READ, file, &kiocb->ki_pos, iov_count); 1111 if (!ret) { 1112 ssize_t ret2; 1113 1114 /* Catch -EAGAIN return for forced non-blocking submission */ 1115 ret2 = call_read_iter(file, kiocb, &iter); 1116 if (!force_nonblock || ret2 != -EAGAIN) { 1117 io_rw_done(kiocb, ret2); 1118 } else { 1119 /* 1120 * If ->needs_lock is true, we're already in async 1121 * context. 1122 */ 1123 if (!s->needs_lock) 1124 io_async_list_note(READ, req, iov_count); 1125 ret = -EAGAIN; 1126 } 1127 } 1128 kfree(iovec); 1129 return ret; 1130} 1131 1132static int io_write(struct io_kiocb *req, const struct sqe_submit *s, 1133 bool force_nonblock) 1134{ 1135 struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs; 1136 struct kiocb *kiocb = &req->rw; 1137 struct iov_iter iter; 1138 struct file *file; 1139 size_t iov_count; 1140 int ret; 1141 1142 ret = io_prep_rw(req, s, force_nonblock); 1143 if (ret) 1144 return ret; 1145 1146 file = kiocb->ki_filp; 1147 if (unlikely(!(file->f_mode & FMODE_WRITE))) 1148 return -EBADF; 1149 if (unlikely(!file->f_op->write_iter)) 1150 return -EINVAL; 1151 1152 ret = io_import_iovec(req->ctx, WRITE, s, &iovec, &iter); 1153 if (ret) 1154 return ret; 1155 1156 iov_count = iov_iter_count(&iter); 1157 1158 ret = -EAGAIN; 1159 if (force_nonblock && !(kiocb->ki_flags & IOCB_DIRECT)) { 1160 /* If ->needs_lock is true, we're already in async context. */ 1161 if (!s->needs_lock) 1162 io_async_list_note(WRITE, req, iov_count); 1163 goto out_free; 1164 } 1165 1166 ret = rw_verify_area(WRITE, file, &kiocb->ki_pos, iov_count); 1167 if (!ret) { 1168 ssize_t ret2; 1169 1170 /* 1171 * Open-code file_start_write here to grab freeze protection, 1172 * which will be released by another thread in 1173 * io_complete_rw(). Fool lockdep by telling it the lock got 1174 * released so that it doesn't complain about the held lock when 1175 * we return to userspace. 1176 */ 1177 if (S_ISREG(file_inode(file)->i_mode)) { 1178 __sb_start_write(file_inode(file)->i_sb, 1179 SB_FREEZE_WRITE, true); 1180 __sb_writers_release(file_inode(file)->i_sb, 1181 SB_FREEZE_WRITE); 1182 } 1183 kiocb->ki_flags |= IOCB_WRITE; 1184 1185 ret2 = call_write_iter(file, kiocb, &iter); 1186 if (!force_nonblock || ret2 != -EAGAIN) { 1187 io_rw_done(kiocb, ret2); 1188 } else { 1189 /* 1190 * If ->needs_lock is true, we're already in async 1191 * context. 1192 */ 1193 if (!s->needs_lock) 1194 io_async_list_note(WRITE, req, iov_count); 1195 ret = -EAGAIN; 1196 } 1197 } 1198out_free: 1199 kfree(iovec); 1200 return ret; 1201} 1202 1203/* 1204 * IORING_OP_NOP just posts a completion event, nothing else. 1205 */ 1206static int io_nop(struct io_kiocb *req, u64 user_data) 1207{ 1208 struct io_ring_ctx *ctx = req->ctx; 1209 long err = 0; 1210 1211 if (unlikely(ctx->flags & IORING_SETUP_IOPOLL)) 1212 return -EINVAL; 1213 1214 io_cqring_add_event(ctx, user_data, err); 1215 io_put_req(req); 1216 return 0; 1217} 1218 1219static int io_prep_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe) 1220{ 1221 struct io_ring_ctx *ctx = req->ctx; 1222 1223 if (!req->file) 1224 return -EBADF; 1225 1226 if (unlikely(ctx->flags & IORING_SETUP_IOPOLL)) 1227 return -EINVAL; 1228 if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index)) 1229 return -EINVAL; 1230 1231 return 0; 1232} 1233 1234static int io_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe, 1235 bool force_nonblock) 1236{ 1237 loff_t sqe_off = READ_ONCE(sqe->off); 1238 loff_t sqe_len = READ_ONCE(sqe->len); 1239 loff_t end = sqe_off + sqe_len; 1240 unsigned fsync_flags; 1241 int ret; 1242 1243 fsync_flags = READ_ONCE(sqe->fsync_flags); 1244 if (unlikely(fsync_flags & ~IORING_FSYNC_DATASYNC)) 1245 return -EINVAL; 1246 1247 ret = io_prep_fsync(req, sqe); 1248 if (ret) 1249 return ret; 1250 1251 /* fsync always requires a blocking context */ 1252 if (force_nonblock) 1253 return -EAGAIN; 1254 1255 ret = vfs_fsync_range(req->rw.ki_filp, sqe_off, 1256 end > 0 ? end : LLONG_MAX, 1257 fsync_flags & IORING_FSYNC_DATASYNC); 1258 1259 io_cqring_add_event(req->ctx, sqe->user_data, ret); 1260 io_put_req(req); 1261 return 0; 1262} 1263 1264static int io_prep_sfr(struct io_kiocb *req, const struct io_uring_sqe *sqe) 1265{ 1266 struct io_ring_ctx *ctx = req->ctx; 1267 int ret = 0; 1268 1269 if (!req->file) 1270 return -EBADF; 1271 1272 if (unlikely(ctx->flags & IORING_SETUP_IOPOLL)) 1273 return -EINVAL; 1274 if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index)) 1275 return -EINVAL; 1276 1277 return ret; 1278} 1279 1280static int io_sync_file_range(struct io_kiocb *req, 1281 const struct io_uring_sqe *sqe, 1282 bool force_nonblock) 1283{ 1284 loff_t sqe_off; 1285 loff_t sqe_len; 1286 unsigned flags; 1287 int ret; 1288 1289 ret = io_prep_sfr(req, sqe); 1290 if (ret) 1291 return ret; 1292 1293 /* sync_file_range always requires a blocking context */ 1294 if (force_nonblock) 1295 return -EAGAIN; 1296 1297 sqe_off = READ_ONCE(sqe->off); 1298 sqe_len = READ_ONCE(sqe->len); 1299 flags = READ_ONCE(sqe->sync_range_flags); 1300 1301 ret = sync_file_range(req->rw.ki_filp, sqe_off, sqe_len, flags); 1302 1303 io_cqring_add_event(req->ctx, sqe->user_data, ret); 1304 io_put_req(req); 1305 return 0; 1306} 1307 1308static void io_poll_remove_one(struct io_kiocb *req) 1309{ 1310 struct io_poll_iocb *poll = &req->poll; 1311 1312 spin_lock(&poll->head->lock); 1313 WRITE_ONCE(poll->canceled, true); 1314 if (!list_empty(&poll->wait.entry)) { 1315 list_del_init(&poll->wait.entry); 1316 queue_work(req->ctx->sqo_wq, &req->work); 1317 } 1318 spin_unlock(&poll->head->lock); 1319 1320 list_del_init(&req->list); 1321} 1322 1323static void io_poll_remove_all(struct io_ring_ctx *ctx) 1324{ 1325 struct io_kiocb *req; 1326 1327 spin_lock_irq(&ctx->completion_lock); 1328 while (!list_empty(&ctx->cancel_list)) { 1329 req = list_first_entry(&ctx->cancel_list, struct io_kiocb,list); 1330 io_poll_remove_one(req); 1331 } 1332 spin_unlock_irq(&ctx->completion_lock); 1333} 1334 1335/* 1336 * Find a running poll command that matches one specified in sqe->addr, 1337 * and remove it if found. 1338 */ 1339static int io_poll_remove(struct io_kiocb *req, const struct io_uring_sqe *sqe) 1340{ 1341 struct io_ring_ctx *ctx = req->ctx; 1342 struct io_kiocb *poll_req, *next; 1343 int ret = -ENOENT; 1344 1345 if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL)) 1346 return -EINVAL; 1347 if (sqe->ioprio || sqe->off || sqe->len || sqe->buf_index || 1348 sqe->poll_events) 1349 return -EINVAL; 1350 1351 spin_lock_irq(&ctx->completion_lock); 1352 list_for_each_entry_safe(poll_req, next, &ctx->cancel_list, list) { 1353 if (READ_ONCE(sqe->addr) == poll_req->user_data) { 1354 io_poll_remove_one(poll_req); 1355 ret = 0; 1356 break; 1357 } 1358 } 1359 spin_unlock_irq(&ctx->completion_lock); 1360 1361 io_cqring_add_event(req->ctx, sqe->user_data, ret); 1362 io_put_req(req); 1363 return 0; 1364} 1365 1366static void io_poll_complete(struct io_ring_ctx *ctx, struct io_kiocb *req, 1367 __poll_t mask) 1368{ 1369 req->poll.done = true; 1370 io_cqring_fill_event(ctx, req->user_data, mangle_poll(mask)); 1371 io_commit_cqring(ctx); 1372} 1373 1374static void io_poll_complete_work(struct work_struct *work) 1375{ 1376 struct io_kiocb *req = container_of(work, struct io_kiocb, work); 1377 struct io_poll_iocb *poll = &req->poll; 1378 struct poll_table_struct pt = { ._key = poll->events }; 1379 struct io_ring_ctx *ctx = req->ctx; 1380 __poll_t mask = 0; 1381 1382 if (!READ_ONCE(poll->canceled)) 1383 mask = vfs_poll(poll->file, &pt) & poll->events; 1384 1385 /* 1386 * Note that ->ki_cancel callers also delete iocb from active_reqs after 1387 * calling ->ki_cancel. We need the ctx_lock roundtrip here to 1388 * synchronize with them. In the cancellation case the list_del_init 1389 * itself is not actually needed, but harmless so we keep it in to 1390 * avoid further branches in the fast path. 1391 */ 1392 spin_lock_irq(&ctx->completion_lock); 1393 if (!mask && !READ_ONCE(poll->canceled)) { 1394 add_wait_queue(poll->head, &poll->wait); 1395 spin_unlock_irq(&ctx->completion_lock); 1396 return; 1397 } 1398 list_del_init(&req->list); 1399 io_poll_complete(ctx, req, mask); 1400 spin_unlock_irq(&ctx->completion_lock); 1401 1402 io_cqring_ev_posted(ctx); 1403 io_put_req(req); 1404} 1405 1406static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync, 1407 void *key) 1408{ 1409 struct io_poll_iocb *poll = container_of(wait, struct io_poll_iocb, 1410 wait); 1411 struct io_kiocb *req = container_of(poll, struct io_kiocb, poll); 1412 struct io_ring_ctx *ctx = req->ctx; 1413 __poll_t mask = key_to_poll(key); 1414 unsigned long flags; 1415 1416 /* for instances that support it check for an event match first: */ 1417 if (mask && !(mask & poll->events)) 1418 return 0; 1419 1420 list_del_init(&poll->wait.entry); 1421 1422 if (mask && spin_trylock_irqsave(&ctx->completion_lock, flags)) { 1423 list_del(&req->list); 1424 io_poll_complete(ctx, req, mask); 1425 spin_unlock_irqrestore(&ctx->completion_lock, flags); 1426 1427 io_cqring_ev_posted(ctx); 1428 io_put_req(req); 1429 } else { 1430 queue_work(ctx->sqo_wq, &req->work); 1431 } 1432 1433 return 1; 1434} 1435 1436struct io_poll_table { 1437 struct poll_table_struct pt; 1438 struct io_kiocb *req; 1439 int error; 1440}; 1441 1442static void io_poll_queue_proc(struct file *file, struct wait_queue_head *head, 1443 struct poll_table_struct *p) 1444{ 1445 struct io_poll_table *pt = container_of(p, struct io_poll_table, pt); 1446 1447 if (unlikely(pt->req->poll.head)) { 1448 pt->error = -EINVAL; 1449 return; 1450 } 1451 1452 pt->error = 0; 1453 pt->req->poll.head = head; 1454 add_wait_queue(head, &pt->req->poll.wait); 1455} 1456 1457static int io_poll_add(struct io_kiocb *req, const struct io_uring_sqe *sqe) 1458{ 1459 struct io_poll_iocb *poll = &req->poll; 1460 struct io_ring_ctx *ctx = req->ctx; 1461 struct io_poll_table ipt; 1462 bool cancel = false; 1463 __poll_t mask; 1464 u16 events; 1465 1466 if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL)) 1467 return -EINVAL; 1468 if (sqe->addr || sqe->ioprio || sqe->off || sqe->len || sqe->buf_index) 1469 return -EINVAL; 1470 if (!poll->file) 1471 return -EBADF; 1472 1473 INIT_WORK(&req->work, io_poll_complete_work); 1474 events = READ_ONCE(sqe->poll_events); 1475 poll->events = demangle_poll(events) | EPOLLERR | EPOLLHUP; 1476 1477 poll->head = NULL; 1478 poll->done = false; 1479 poll->canceled = false; 1480 1481 ipt.pt._qproc = io_poll_queue_proc; 1482 ipt.pt._key = poll->events; 1483 ipt.req = req; 1484 ipt.error = -EINVAL; /* same as no support for IOCB_CMD_POLL */ 1485 1486 /* initialized the list so that we can do list_empty checks */ 1487 INIT_LIST_HEAD(&poll->wait.entry); 1488 init_waitqueue_func_entry(&poll->wait, io_poll_wake); 1489 1490 mask = vfs_poll(poll->file, &ipt.pt) & poll->events; 1491 1492 spin_lock_irq(&ctx->completion_lock); 1493 if (likely(poll->head)) { 1494 spin_lock(&poll->head->lock); 1495 if (unlikely(list_empty(&poll->wait.entry))) { 1496 if (ipt.error) 1497 cancel = true; 1498 ipt.error = 0; 1499 mask = 0; 1500 } 1501 if (mask || ipt.error) 1502 list_del_init(&poll->wait.entry); 1503 else if (cancel) 1504 WRITE_ONCE(poll->canceled, true); 1505 else if (!poll->done) /* actually waiting for an event */ 1506 list_add_tail(&req->list, &ctx->cancel_list); 1507 spin_unlock(&poll->head->lock); 1508 } 1509 if (mask) { /* no async, we'd stolen it */ 1510 ipt.error = 0; 1511 io_poll_complete(ctx, req, mask); 1512 } 1513 spin_unlock_irq(&ctx->completion_lock); 1514 1515 if (mask) { 1516 io_cqring_ev_posted(ctx); 1517 io_put_req(req); 1518 } 1519 return ipt.error; 1520} 1521 1522static int io_req_defer(struct io_ring_ctx *ctx, struct io_kiocb *req, 1523 const struct io_uring_sqe *sqe) 1524{ 1525 struct io_uring_sqe *sqe_copy; 1526 1527 if (!io_sequence_defer(ctx, req) && list_empty(&ctx->defer_list)) 1528 return 0; 1529 1530 sqe_copy = kmalloc(sizeof(*sqe_copy), GFP_KERNEL); 1531 if (!sqe_copy) 1532 return -EAGAIN; 1533 1534 spin_lock_irq(&ctx->completion_lock); 1535 if (!io_sequence_defer(ctx, req) && list_empty(&ctx->defer_list)) { 1536 spin_unlock_irq(&ctx->completion_lock); 1537 kfree(sqe_copy); 1538 return 0; 1539 } 1540 1541 memcpy(sqe_copy, sqe, sizeof(*sqe_copy)); 1542 req->submit.sqe = sqe_copy; 1543 1544 INIT_WORK(&req->work, io_sq_wq_submit_work); 1545 list_add_tail(&req->list, &ctx->defer_list); 1546 spin_unlock_irq(&ctx->completion_lock); 1547 return -EIOCBQUEUED; 1548} 1549 1550static int __io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req, 1551 const struct sqe_submit *s, bool force_nonblock) 1552{ 1553 int ret, opcode; 1554 1555 if (unlikely(s->index >= ctx->sq_entries)) 1556 return -EINVAL; 1557 req->user_data = READ_ONCE(s->sqe->user_data); 1558 1559 opcode = READ_ONCE(s->sqe->opcode); 1560 switch (opcode) { 1561 case IORING_OP_NOP: 1562 ret = io_nop(req, req->user_data); 1563 break; 1564 case IORING_OP_READV: 1565 if (unlikely(s->sqe->buf_index)) 1566 return -EINVAL; 1567 ret = io_read(req, s, force_nonblock); 1568 break; 1569 case IORING_OP_WRITEV: 1570 if (unlikely(s->sqe->buf_index)) 1571 return -EINVAL; 1572 ret = io_write(req, s, force_nonblock); 1573 break; 1574 case IORING_OP_READ_FIXED: 1575 ret = io_read(req, s, force_nonblock); 1576 break; 1577 case IORING_OP_WRITE_FIXED: 1578 ret = io_write(req, s, force_nonblock); 1579 break; 1580 case IORING_OP_FSYNC: 1581 ret = io_fsync(req, s->sqe, force_nonblock); 1582 break; 1583 case IORING_OP_POLL_ADD: 1584 ret = io_poll_add(req, s->sqe); 1585 break; 1586 case IORING_OP_POLL_REMOVE: 1587 ret = io_poll_remove(req, s->sqe); 1588 break; 1589 case IORING_OP_SYNC_FILE_RANGE: 1590 ret = io_sync_file_range(req, s->sqe, force_nonblock); 1591 break; 1592 default: 1593 ret = -EINVAL; 1594 break; 1595 } 1596 1597 if (ret) 1598 return ret; 1599 1600 if (ctx->flags & IORING_SETUP_IOPOLL) { 1601 if (req->error == -EAGAIN) 1602 return -EAGAIN; 1603 1604 /* workqueue context doesn't hold uring_lock, grab it now */ 1605 if (s->needs_lock) 1606 mutex_lock(&ctx->uring_lock); 1607 io_iopoll_req_issued(req); 1608 if (s->needs_lock) 1609 mutex_unlock(&ctx->uring_lock); 1610 } 1611 1612 return 0; 1613} 1614 1615static struct async_list *io_async_list_from_sqe(struct io_ring_ctx *ctx, 1616 const struct io_uring_sqe *sqe) 1617{ 1618 switch (sqe->opcode) { 1619 case IORING_OP_READV: 1620 case IORING_OP_READ_FIXED: 1621 return &ctx->pending_async[READ]; 1622 case IORING_OP_WRITEV: 1623 case IORING_OP_WRITE_FIXED: 1624 return &ctx->pending_async[WRITE]; 1625 default: 1626 return NULL; 1627 } 1628} 1629 1630static inline bool io_sqe_needs_user(const struct io_uring_sqe *sqe) 1631{ 1632 u8 opcode = READ_ONCE(sqe->opcode); 1633 1634 return !(opcode == IORING_OP_READ_FIXED || 1635 opcode == IORING_OP_WRITE_FIXED); 1636} 1637 1638static void io_sq_wq_submit_work(struct work_struct *work) 1639{ 1640 struct io_kiocb *req = container_of(work, struct io_kiocb, work); 1641 struct io_ring_ctx *ctx = req->ctx; 1642 struct mm_struct *cur_mm = NULL; 1643 struct async_list *async_list; 1644 LIST_HEAD(req_list); 1645 mm_segment_t old_fs; 1646 int ret; 1647 1648 async_list = io_async_list_from_sqe(ctx, req->submit.sqe); 1649restart: 1650 do { 1651 struct sqe_submit *s = &req->submit; 1652 const struct io_uring_sqe *sqe = s->sqe; 1653 1654 /* Ensure we clear previously set non-block flag */ 1655 req->rw.ki_flags &= ~IOCB_NOWAIT; 1656 1657 ret = 0; 1658 if (io_sqe_needs_user(sqe) && !cur_mm) { 1659 if (!mmget_not_zero(ctx->sqo_mm)) { 1660 ret = -EFAULT; 1661 } else { 1662 cur_mm = ctx->sqo_mm; 1663 use_mm(cur_mm); 1664 old_fs = get_fs(); 1665 set_fs(USER_DS); 1666 } 1667 } 1668 1669 if (!ret) { 1670 s->has_user = cur_mm != NULL; 1671 s->needs_lock = true; 1672 do { 1673 ret = __io_submit_sqe(ctx, req, s, false); 1674 /* 1675 * We can get EAGAIN for polled IO even though 1676 * we're forcing a sync submission from here, 1677 * since we can't wait for request slots on the 1678 * block side. 1679 */ 1680 if (ret != -EAGAIN) 1681 break; 1682 cond_resched(); 1683 } while (1); 1684 } 1685 1686 /* drop submission reference */ 1687 io_put_req(req); 1688 1689 if (ret) { 1690 io_cqring_add_event(ctx, sqe->user_data, ret); 1691 io_put_req(req); 1692 } 1693 1694 /* async context always use a copy of the sqe */ 1695 kfree(sqe); 1696 1697 if (!async_list) 1698 break; 1699 if (!list_empty(&req_list)) { 1700 req = list_first_entry(&req_list, struct io_kiocb, 1701 list); 1702 list_del(&req->list); 1703 continue; 1704 } 1705 if (list_empty(&async_list->list)) 1706 break; 1707 1708 req = NULL; 1709 spin_lock(&async_list->lock); 1710 if (list_empty(&async_list->list)) { 1711 spin_unlock(&async_list->lock); 1712 break; 1713 } 1714 list_splice_init(&async_list->list, &req_list); 1715 spin_unlock(&async_list->lock); 1716 1717 req = list_first_entry(&req_list, struct io_kiocb, list); 1718 list_del(&req->list); 1719 } while (req); 1720 1721 /* 1722 * Rare case of racing with a submitter. If we find the count has 1723 * dropped to zero AND we have pending work items, then restart 1724 * the processing. This is a tiny race window. 1725 */ 1726 if (async_list) { 1727 ret = atomic_dec_return(&async_list->cnt); 1728 while (!ret && !list_empty(&async_list->list)) { 1729 spin_lock(&async_list->lock); 1730 atomic_inc(&async_list->cnt); 1731 list_splice_init(&async_list->list, &req_list); 1732 spin_unlock(&async_list->lock); 1733 1734 if (!list_empty(&req_list)) { 1735 req = list_first_entry(&req_list, 1736 struct io_kiocb, list); 1737 list_del(&req->list); 1738 goto restart; 1739 } 1740 ret = atomic_dec_return(&async_list->cnt); 1741 } 1742 } 1743 1744 if (cur_mm) { 1745 set_fs(old_fs); 1746 unuse_mm(cur_mm); 1747 mmput(cur_mm); 1748 } 1749} 1750 1751/* 1752 * See if we can piggy back onto previously submitted work, that is still 1753 * running. We currently only allow this if the new request is sequential 1754 * to the previous one we punted. 1755 */ 1756static bool io_add_to_prev_work(struct async_list *list, struct io_kiocb *req) 1757{ 1758 bool ret = false; 1759 1760 if (!list) 1761 return false; 1762 if (!(req->flags & REQ_F_SEQ_PREV)) 1763 return false; 1764 if (!atomic_read(&list->cnt)) 1765 return false; 1766 1767 ret = true; 1768 spin_lock(&list->lock); 1769 list_add_tail(&req->list, &list->list); 1770 if (!atomic_read(&list->cnt)) { 1771 list_del_init(&req->list); 1772 ret = false; 1773 } 1774 spin_unlock(&list->lock); 1775 return ret; 1776} 1777 1778static bool io_op_needs_file(const struct io_uring_sqe *sqe) 1779{ 1780 int op = READ_ONCE(sqe->opcode); 1781 1782 switch (op) { 1783 case IORING_OP_NOP: 1784 case IORING_OP_POLL_REMOVE: 1785 return false; 1786 default: 1787 return true; 1788 } 1789} 1790 1791static int io_req_set_file(struct io_ring_ctx *ctx, const struct sqe_submit *s, 1792 struct io_submit_state *state, struct io_kiocb *req) 1793{ 1794 unsigned flags; 1795 int fd; 1796 1797 flags = READ_ONCE(s->sqe->flags); 1798 fd = READ_ONCE(s->sqe->fd); 1799 1800 if (flags & IOSQE_IO_DRAIN) { 1801 req->flags |= REQ_F_IO_DRAIN; 1802 req->sequence = ctx->cached_sq_head - 1; 1803 } 1804 1805 if (!io_op_needs_file(s->sqe)) 1806 return 0; 1807 1808 if (flags & IOSQE_FIXED_FILE) { 1809 if (unlikely(!ctx->user_files || 1810 (unsigned) fd >= ctx->nr_user_files)) 1811 return -EBADF; 1812 req->file = ctx->user_files[fd]; 1813 req->flags |= REQ_F_FIXED_FILE; 1814 } else { 1815 if (s->needs_fixed_file) 1816 return -EBADF; 1817 req->file = io_file_get(state, fd); 1818 if (unlikely(!req->file)) 1819 return -EBADF; 1820 } 1821 1822 return 0; 1823} 1824 1825static int io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s, 1826 struct io_submit_state *state) 1827{ 1828 struct io_kiocb *req; 1829 int ret; 1830 1831 /* enforce forwards compatibility on users */ 1832 if (unlikely(s->sqe->flags & ~(IOSQE_FIXED_FILE | IOSQE_IO_DRAIN))) 1833 return -EINVAL; 1834 1835 req = io_get_req(ctx, state); 1836 if (unlikely(!req)) 1837 return -EAGAIN; 1838 1839 ret = io_req_set_file(ctx, s, state, req); 1840 if (unlikely(ret)) 1841 goto out; 1842 1843 ret = io_req_defer(ctx, req, s->sqe); 1844 if (ret) { 1845 if (ret == -EIOCBQUEUED) 1846 ret = 0; 1847 return ret; 1848 } 1849 1850 ret = __io_submit_sqe(ctx, req, s, true); 1851 if (ret == -EAGAIN && !(req->flags & REQ_F_NOWAIT)) { 1852 struct io_uring_sqe *sqe_copy; 1853 1854 sqe_copy = kmalloc(sizeof(*sqe_copy), GFP_KERNEL); 1855 if (sqe_copy) { 1856 struct async_list *list; 1857 1858 memcpy(sqe_copy, s->sqe, sizeof(*sqe_copy)); 1859 s->sqe = sqe_copy; 1860 1861 memcpy(&req->submit, s, sizeof(*s)); 1862 list = io_async_list_from_sqe(ctx, s->sqe); 1863 if (!io_add_to_prev_work(list, req)) { 1864 if (list) 1865 atomic_inc(&list->cnt); 1866 INIT_WORK(&req->work, io_sq_wq_submit_work); 1867 queue_work(ctx->sqo_wq, &req->work); 1868 } 1869 1870 /* 1871 * Queued up for async execution, worker will release 1872 * submit reference when the iocb is actually 1873 * submitted. 1874 */ 1875 return 0; 1876 } 1877 } 1878 1879out: 1880 /* drop submission reference */ 1881 io_put_req(req); 1882 1883 /* and drop final reference, if we failed */ 1884 if (ret) 1885 io_put_req(req); 1886 1887 return ret; 1888} 1889 1890/* 1891 * Batched submission is done, ensure local IO is flushed out. 1892 */ 1893static void io_submit_state_end(struct io_submit_state *state) 1894{ 1895 blk_finish_plug(&state->plug); 1896 io_file_put(state); 1897 if (state->free_reqs) 1898 kmem_cache_free_bulk(req_cachep, state->free_reqs, 1899 &state->reqs[state->cur_req]); 1900} 1901 1902/* 1903 * Start submission side cache. 1904 */ 1905static void io_submit_state_start(struct io_submit_state *state, 1906 struct io_ring_ctx *ctx, unsigned max_ios) 1907{ 1908 blk_start_plug(&state->plug); 1909 state->free_reqs = 0; 1910 state->file = NULL; 1911 state->ios_left = max_ios; 1912} 1913 1914static void io_commit_sqring(struct io_ring_ctx *ctx) 1915{ 1916 struct io_sq_ring *ring = ctx->sq_ring; 1917 1918 if (ctx->cached_sq_head != READ_ONCE(ring->r.head)) { 1919 /* 1920 * Ensure any loads from the SQEs are done at this point, 1921 * since once we write the new head, the application could 1922 * write new data to them. 1923 */ 1924 smp_store_release(&ring->r.head, ctx->cached_sq_head); 1925 } 1926} 1927 1928/* 1929 * Fetch an sqe, if one is available. Note that s->sqe will point to memory 1930 * that is mapped by userspace. This means that care needs to be taken to 1931 * ensure that reads are stable, as we cannot rely on userspace always 1932 * being a good citizen. If members of the sqe are validated and then later 1933 * used, it's important that those reads are done through READ_ONCE() to 1934 * prevent a re-load down the line. 1935 */ 1936static bool io_get_sqring(struct io_ring_ctx *ctx, struct sqe_submit *s) 1937{ 1938 struct io_sq_ring *ring = ctx->sq_ring; 1939 unsigned head; 1940 1941 /* 1942 * The cached sq head (or cq tail) serves two purposes: 1943 * 1944 * 1) allows us to batch the cost of updating the user visible 1945 * head updates. 1946 * 2) allows the kernel side to track the head on its own, even 1947 * though the application is the one updating it. 1948 */ 1949 head = ctx->cached_sq_head; 1950 /* make sure SQ entry isn't read before tail */ 1951 if (head == smp_load_acquire(&ring->r.tail)) 1952 return false; 1953 1954 head = READ_ONCE(ring->array[head & ctx->sq_mask]); 1955 if (head < ctx->sq_entries) { 1956 s->index = head; 1957 s->sqe = &ctx->sq_sqes[head]; 1958 ctx->cached_sq_head++; 1959 return true; 1960 } 1961 1962 /* drop invalid entries */ 1963 ctx->cached_sq_head++; 1964 ring->dropped++; 1965 return false; 1966} 1967 1968static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes, 1969 unsigned int nr, bool has_user, bool mm_fault) 1970{ 1971 struct io_submit_state state, *statep = NULL; 1972 int ret, i, submitted = 0; 1973 1974 if (nr > IO_PLUG_THRESHOLD) { 1975 io_submit_state_start(&state, ctx, nr); 1976 statep = &state; 1977 } 1978 1979 for (i = 0; i < nr; i++) { 1980 if (unlikely(mm_fault)) { 1981 ret = -EFAULT; 1982 } else { 1983 sqes[i].has_user = has_user; 1984 sqes[i].needs_lock = true; 1985 sqes[i].needs_fixed_file = true; 1986 ret = io_submit_sqe(ctx, &sqes[i], statep); 1987 } 1988 if (!ret) { 1989 submitted++; 1990 continue; 1991 } 1992 1993 io_cqring_add_event(ctx, sqes[i].sqe->user_data, ret); 1994 } 1995 1996 if (statep) 1997 io_submit_state_end(&state); 1998 1999 return submitted; 2000} 2001 2002static int io_sq_thread(void *data) 2003{ 2004 struct sqe_submit sqes[IO_IOPOLL_BATCH]; 2005 struct io_ring_ctx *ctx = data; 2006 struct mm_struct *cur_mm = NULL; 2007 mm_segment_t old_fs; 2008 DEFINE_WAIT(wait); 2009 unsigned inflight; 2010 unsigned long timeout; 2011 2012 old_fs = get_fs(); 2013 set_fs(USER_DS); 2014 2015 timeout = inflight = 0; 2016 while (!kthread_should_park()) { 2017 bool all_fixed, mm_fault = false; 2018 int i; 2019 2020 if (inflight) { 2021 unsigned nr_events = 0; 2022 2023 if (ctx->flags & IORING_SETUP_IOPOLL) { 2024 /* 2025 * We disallow the app entering submit/complete 2026 * with polling, but we still need to lock the 2027 * ring to prevent racing with polled issue 2028 * that got punted to a workqueue. 2029 */ 2030 mutex_lock(&ctx->uring_lock); 2031 io_iopoll_check(ctx, &nr_events, 0); 2032 mutex_unlock(&ctx->uring_lock); 2033 } else { 2034 /* 2035 * Normal IO, just pretend everything completed. 2036 * We don't have to poll completions for that. 2037 */ 2038 nr_events = inflight; 2039 } 2040 2041 inflight -= nr_events; 2042 if (!inflight) 2043 timeout = jiffies + ctx->sq_thread_idle; 2044 } 2045 2046 if (!io_get_sqring(ctx, &sqes[0])) { 2047 /* 2048 * We're polling. If we're within the defined idle 2049 * period, then let us spin without work before going 2050 * to sleep. 2051 */ 2052 if (inflight || !time_after(jiffies, timeout)) { 2053 cpu_relax(); 2054 continue; 2055 } 2056 2057 /* 2058 * Drop cur_mm before scheduling, we can't hold it for 2059 * long periods (or over schedule()). Do this before 2060 * adding ourselves to the waitqueue, as the unuse/drop 2061 * may sleep. 2062 */ 2063 if (cur_mm) { 2064 unuse_mm(cur_mm); 2065 mmput(cur_mm); 2066 cur_mm = NULL; 2067 } 2068 2069 prepare_to_wait(&ctx->sqo_wait, &wait, 2070 TASK_INTERRUPTIBLE); 2071 2072 /* Tell userspace we may need a wakeup call */ 2073 ctx->sq_ring->flags |= IORING_SQ_NEED_WAKEUP; 2074 /* make sure to read SQ tail after writing flags */ 2075 smp_mb(); 2076 2077 if (!io_get_sqring(ctx, &sqes[0])) { 2078 if (kthread_should_park()) { 2079 finish_wait(&ctx->sqo_wait, &wait); 2080 break; 2081 } 2082 if (signal_pending(current)) 2083 flush_signals(current); 2084 schedule(); 2085 finish_wait(&ctx->sqo_wait, &wait); 2086 2087 ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP; 2088 continue; 2089 } 2090 finish_wait(&ctx->sqo_wait, &wait); 2091 2092 ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP; 2093 } 2094 2095 i = 0; 2096 all_fixed = true; 2097 do { 2098 if (all_fixed && io_sqe_needs_user(sqes[i].sqe)) 2099 all_fixed = false; 2100 2101 i++; 2102 if (i == ARRAY_SIZE(sqes)) 2103 break; 2104 } while (io_get_sqring(ctx, &sqes[i])); 2105 2106 /* Unless all new commands are FIXED regions, grab mm */ 2107 if (!all_fixed && !cur_mm) { 2108 mm_fault = !mmget_not_zero(ctx->sqo_mm); 2109 if (!mm_fault) { 2110 use_mm(ctx->sqo_mm); 2111 cur_mm = ctx->sqo_mm; 2112 } 2113 } 2114 2115 inflight += io_submit_sqes(ctx, sqes, i, cur_mm != NULL, 2116 mm_fault); 2117 2118 /* Commit SQ ring head once we've consumed all SQEs */ 2119 io_commit_sqring(ctx); 2120 } 2121 2122 set_fs(old_fs); 2123 if (cur_mm) { 2124 unuse_mm(cur_mm); 2125 mmput(cur_mm); 2126 } 2127 2128 kthread_parkme(); 2129 2130 return 0; 2131} 2132 2133static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit) 2134{ 2135 struct io_submit_state state, *statep = NULL; 2136 int i, submit = 0; 2137 2138 if (to_submit > IO_PLUG_THRESHOLD) { 2139 io_submit_state_start(&state, ctx, to_submit); 2140 statep = &state; 2141 } 2142 2143 for (i = 0; i < to_submit; i++) { 2144 struct sqe_submit s; 2145 int ret; 2146 2147 if (!io_get_sqring(ctx, &s)) 2148 break; 2149 2150 s.has_user = true; 2151 s.needs_lock = false; 2152 s.needs_fixed_file = false; 2153 submit++; 2154 2155 ret = io_submit_sqe(ctx, &s, statep); 2156 if (ret) 2157 io_cqring_add_event(ctx, s.sqe->user_data, ret); 2158 } 2159 io_commit_sqring(ctx); 2160 2161 if (statep) 2162 io_submit_state_end(statep); 2163 2164 return submit; 2165} 2166 2167static unsigned io_cqring_events(struct io_cq_ring *ring) 2168{ 2169 /* See comment at the top of this file */ 2170 smp_rmb(); 2171 return READ_ONCE(ring->r.tail) - READ_ONCE(ring->r.head); 2172} 2173 2174/* 2175 * Wait until events become available, if we don't already have some. The 2176 * application must reap them itself, as they reside on the shared cq ring. 2177 */ 2178static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, 2179 const sigset_t __user *sig, size_t sigsz) 2180{ 2181 struct io_cq_ring *ring = ctx->cq_ring; 2182 sigset_t ksigmask, sigsaved; 2183 int ret; 2184 2185 if (io_cqring_events(ring) >= min_events) 2186 return 0; 2187 2188 if (sig) { 2189#ifdef CONFIG_COMPAT 2190 if (in_compat_syscall()) 2191 ret = set_compat_user_sigmask((const compat_sigset_t __user *)sig, 2192 &ksigmask, &sigsaved, sigsz); 2193 else 2194#endif 2195 ret = set_user_sigmask(sig, &ksigmask, 2196 &sigsaved, sigsz); 2197 2198 if (ret) 2199 return ret; 2200 } 2201 2202 ret = wait_event_interruptible(ctx->wait, io_cqring_events(ring) >= min_events); 2203 2204 if (sig) 2205 restore_user_sigmask(sig, &sigsaved, ret == -ERESTARTSYS); 2206 2207 if (ret == -ERESTARTSYS) 2208 ret = -EINTR; 2209 2210 return READ_ONCE(ring->r.head) == READ_ONCE(ring->r.tail) ? ret : 0; 2211} 2212 2213static void __io_sqe_files_unregister(struct io_ring_ctx *ctx) 2214{ 2215#if defined(CONFIG_UNIX) 2216 if (ctx->ring_sock) { 2217 struct sock *sock = ctx->ring_sock->sk; 2218 struct sk_buff *skb; 2219 2220 while ((skb = skb_dequeue(&sock->sk_receive_queue)) != NULL) 2221 kfree_skb(skb); 2222 } 2223#else 2224 int i; 2225 2226 for (i = 0; i < ctx->nr_user_files; i++) 2227 fput(ctx->user_files[i]); 2228#endif 2229} 2230 2231static int io_sqe_files_unregister(struct io_ring_ctx *ctx) 2232{ 2233 if (!ctx->user_files) 2234 return -ENXIO; 2235 2236 __io_sqe_files_unregister(ctx); 2237 kfree(ctx->user_files); 2238 ctx->user_files = NULL; 2239 ctx->nr_user_files = 0; 2240 return 0; 2241} 2242 2243static void io_sq_thread_stop(struct io_ring_ctx *ctx) 2244{ 2245 if (ctx->sqo_thread) { 2246 /* 2247 * The park is a bit of a work-around, without it we get 2248 * warning spews on shutdown with SQPOLL set and affinity 2249 * set to a single CPU. 2250 */ 2251 kthread_park(ctx->sqo_thread); 2252 kthread_stop(ctx->sqo_thread); 2253 ctx->sqo_thread = NULL; 2254 } 2255} 2256 2257static void io_finish_async(struct io_ring_ctx *ctx) 2258{ 2259 io_sq_thread_stop(ctx); 2260 2261 if (ctx->sqo_wq) { 2262 destroy_workqueue(ctx->sqo_wq); 2263 ctx->sqo_wq = NULL; 2264 } 2265} 2266 2267#if defined(CONFIG_UNIX) 2268static void io_destruct_skb(struct sk_buff *skb) 2269{ 2270 struct io_ring_ctx *ctx = skb->sk->sk_user_data; 2271 2272 io_finish_async(ctx); 2273 unix_destruct_scm(skb); 2274} 2275 2276/* 2277 * Ensure the UNIX gc is aware of our file set, so we are certain that 2278 * the io_uring can be safely unregistered on process exit, even if we have 2279 * loops in the file referencing. 2280 */ 2281static int __io_sqe_files_scm(struct io_ring_ctx *ctx, int nr, int offset) 2282{ 2283 struct sock *sk = ctx->ring_sock->sk; 2284 struct scm_fp_list *fpl; 2285 struct sk_buff *skb; 2286 int i; 2287 2288 if (!capable(CAP_SYS_RESOURCE) && !capable(CAP_SYS_ADMIN)) { 2289 unsigned long inflight = ctx->user->unix_inflight + nr; 2290 2291 if (inflight > task_rlimit(current, RLIMIT_NOFILE)) 2292 return -EMFILE; 2293 } 2294 2295 fpl = kzalloc(sizeof(*fpl), GFP_KERNEL); 2296 if (!fpl) 2297 return -ENOMEM; 2298 2299 skb = alloc_skb(0, GFP_KERNEL); 2300 if (!skb) { 2301 kfree(fpl); 2302 return -ENOMEM; 2303 } 2304 2305 skb->sk = sk; 2306 skb->destructor = io_destruct_skb; 2307 2308 fpl->user = get_uid(ctx->user); 2309 for (i = 0; i < nr; i++) { 2310 fpl->fp[i] = get_file(ctx->user_files[i + offset]); 2311 unix_inflight(fpl->user, fpl->fp[i]); 2312 } 2313 2314 fpl->max = fpl->count = nr; 2315 UNIXCB(skb).fp = fpl; 2316 refcount_add(skb->truesize, &sk->sk_wmem_alloc); 2317 skb_queue_head(&sk->sk_receive_queue, skb); 2318 2319 for (i = 0; i < nr; i++) 2320 fput(fpl->fp[i]); 2321 2322 return 0; 2323} 2324 2325/* 2326 * If UNIX sockets are enabled, fd passing can cause a reference cycle which 2327 * causes regular reference counting to break down. We rely on the UNIX 2328 * garbage collection to take care of this problem for us. 2329 */ 2330static int io_sqe_files_scm(struct io_ring_ctx *ctx) 2331{ 2332 unsigned left, total; 2333 int ret = 0; 2334 2335 total = 0; 2336 left = ctx->nr_user_files; 2337 while (left) { 2338 unsigned this_files = min_t(unsigned, left, SCM_MAX_FD); 2339 2340 ret = __io_sqe_files_scm(ctx, this_files, total); 2341 if (ret) 2342 break; 2343 left -= this_files; 2344 total += this_files; 2345 } 2346 2347 if (!ret) 2348 return 0; 2349 2350 while (total < ctx->nr_user_files) { 2351 fput(ctx->user_files[total]); 2352 total++; 2353 } 2354 2355 return ret; 2356} 2357#else 2358static int io_sqe_files_scm(struct io_ring_ctx *ctx) 2359{ 2360 return 0; 2361} 2362#endif 2363 2364static int io_sqe_files_register(struct io_ring_ctx *ctx, void __user *arg, 2365 unsigned nr_args) 2366{ 2367 __s32 __user *fds = (__s32 __user *) arg; 2368 int fd, ret = 0; 2369 unsigned i; 2370 2371 if (ctx->user_files) 2372 return -EBUSY; 2373 if (!nr_args) 2374 return -EINVAL; 2375 if (nr_args > IORING_MAX_FIXED_FILES) 2376 return -EMFILE; 2377 2378 ctx->user_files = kcalloc(nr_args, sizeof(struct file *), GFP_KERNEL); 2379 if (!ctx->user_files) 2380 return -ENOMEM; 2381 2382 for (i = 0; i < nr_args; i++) { 2383 ret = -EFAULT; 2384 if (copy_from_user(&fd, &fds[i], sizeof(fd))) 2385 break; 2386 2387 ctx->user_files[i] = fget(fd); 2388 2389 ret = -EBADF; 2390 if (!ctx->user_files[i]) 2391 break; 2392 /* 2393 * Don't allow io_uring instances to be registered. If UNIX 2394 * isn't enabled, then this causes a reference cycle and this 2395 * instance can never get freed. If UNIX is enabled we'll 2396 * handle it just fine, but there's still no point in allowing 2397 * a ring fd as it doesn't support regular read/write anyway. 2398 */ 2399 if (ctx->user_files[i]->f_op == &io_uring_fops) { 2400 fput(ctx->user_files[i]); 2401 break; 2402 } 2403 ctx->nr_user_files++; 2404 ret = 0; 2405 } 2406 2407 if (ret) { 2408 for (i = 0; i < ctx->nr_user_files; i++) 2409 fput(ctx->user_files[i]); 2410 2411 kfree(ctx->user_files); 2412 ctx->user_files = NULL; 2413 ctx->nr_user_files = 0; 2414 return ret; 2415 } 2416 2417 ret = io_sqe_files_scm(ctx); 2418 if (ret) 2419 io_sqe_files_unregister(ctx); 2420 2421 return ret; 2422} 2423 2424static int io_sq_offload_start(struct io_ring_ctx *ctx, 2425 struct io_uring_params *p) 2426{ 2427 int ret; 2428 2429 init_waitqueue_head(&ctx->sqo_wait); 2430 mmgrab(current->mm); 2431 ctx->sqo_mm = current->mm; 2432 2433 if (ctx->flags & IORING_SETUP_SQPOLL) { 2434 ret = -EPERM; 2435 if (!capable(CAP_SYS_ADMIN)) 2436 goto err; 2437 2438 ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle); 2439 if (!ctx->sq_thread_idle) 2440 ctx->sq_thread_idle = HZ; 2441 2442 if (p->flags & IORING_SETUP_SQ_AFF) { 2443 int cpu = p->sq_thread_cpu; 2444 2445 ret = -EINVAL; 2446 if (cpu >= nr_cpu_ids) 2447 goto err; 2448 if (!cpu_online(cpu)) 2449 goto err; 2450 2451 ctx->sqo_thread = kthread_create_on_cpu(io_sq_thread, 2452 ctx, cpu, 2453 "io_uring-sq"); 2454 } else { 2455 ctx->sqo_thread = kthread_create(io_sq_thread, ctx, 2456 "io_uring-sq"); 2457 } 2458 if (IS_ERR(ctx->sqo_thread)) { 2459 ret = PTR_ERR(ctx->sqo_thread); 2460 ctx->sqo_thread = NULL; 2461 goto err; 2462 } 2463 wake_up_process(ctx->sqo_thread); 2464 } else if (p->flags & IORING_SETUP_SQ_AFF) { 2465 /* Can't have SQ_AFF without SQPOLL */ 2466 ret = -EINVAL; 2467 goto err; 2468 } 2469 2470 /* Do QD, or 2 * CPUS, whatever is smallest */ 2471 ctx->sqo_wq = alloc_workqueue("io_ring-wq", WQ_UNBOUND | WQ_FREEZABLE, 2472 min(ctx->sq_entries - 1, 2 * num_online_cpus())); 2473 if (!ctx->sqo_wq) { 2474 ret = -ENOMEM; 2475 goto err; 2476 } 2477 2478 return 0; 2479err: 2480 io_sq_thread_stop(ctx); 2481 mmdrop(ctx->sqo_mm); 2482 ctx->sqo_mm = NULL; 2483 return ret; 2484} 2485 2486static void io_unaccount_mem(struct user_struct *user, unsigned long nr_pages) 2487{ 2488 atomic_long_sub(nr_pages, &user->locked_vm); 2489} 2490 2491static int io_account_mem(struct user_struct *user, unsigned long nr_pages) 2492{ 2493 unsigned long page_limit, cur_pages, new_pages; 2494 2495 /* Don't allow more pages than we can safely lock */ 2496 page_limit = rlimit(RLIMIT_MEMLOCK) >> PAGE_SHIFT; 2497 2498 do { 2499 cur_pages = atomic_long_read(&user->locked_vm); 2500 new_pages = cur_pages + nr_pages; 2501 if (new_pages > page_limit) 2502 return -ENOMEM; 2503 } while (atomic_long_cmpxchg(&user->locked_vm, cur_pages, 2504 new_pages) != cur_pages); 2505 2506 return 0; 2507} 2508 2509static void io_mem_free(void *ptr) 2510{ 2511 struct page *page; 2512 2513 if (!ptr) 2514 return; 2515 2516 page = virt_to_head_page(ptr); 2517 if (put_page_testzero(page)) 2518 free_compound_page(page); 2519} 2520 2521static void *io_mem_alloc(size_t size) 2522{ 2523 gfp_t gfp_flags = GFP_KERNEL | __GFP_ZERO | __GFP_NOWARN | __GFP_COMP | 2524 __GFP_NORETRY; 2525 2526 return (void *) __get_free_pages(gfp_flags, get_order(size)); 2527} 2528 2529static unsigned long ring_pages(unsigned sq_entries, unsigned cq_entries) 2530{ 2531 struct io_sq_ring *sq_ring; 2532 struct io_cq_ring *cq_ring; 2533 size_t bytes; 2534 2535 bytes = struct_size(sq_ring, array, sq_entries); 2536 bytes += array_size(sizeof(struct io_uring_sqe), sq_entries); 2537 bytes += struct_size(cq_ring, cqes, cq_entries); 2538 2539 return (bytes + PAGE_SIZE - 1) / PAGE_SIZE; 2540} 2541 2542static int io_sqe_buffer_unregister(struct io_ring_ctx *ctx) 2543{ 2544 int i, j; 2545 2546 if (!ctx->user_bufs) 2547 return -ENXIO; 2548 2549 for (i = 0; i < ctx->nr_user_bufs; i++) { 2550 struct io_mapped_ubuf *imu = &ctx->user_bufs[i]; 2551 2552 for (j = 0; j < imu->nr_bvecs; j++) 2553 put_page(imu->bvec[j].bv_page); 2554 2555 if (ctx->account_mem) 2556 io_unaccount_mem(ctx->user, imu->nr_bvecs); 2557 kvfree(imu->bvec); 2558 imu->nr_bvecs = 0; 2559 } 2560 2561 kfree(ctx->user_bufs); 2562 ctx->user_bufs = NULL; 2563 ctx->nr_user_bufs = 0; 2564 return 0; 2565} 2566 2567static int io_copy_iov(struct io_ring_ctx *ctx, struct iovec *dst, 2568 void __user *arg, unsigned index) 2569{ 2570 struct iovec __user *src; 2571 2572#ifdef CONFIG_COMPAT 2573 if (ctx->compat) { 2574 struct compat_iovec __user *ciovs; 2575 struct compat_iovec ciov; 2576 2577 ciovs = (struct compat_iovec __user *) arg; 2578 if (copy_from_user(&ciov, &ciovs[index], sizeof(ciov))) 2579 return -EFAULT; 2580 2581 dst->iov_base = (void __user *) (unsigned long) ciov.iov_base; 2582 dst->iov_len = ciov.iov_len; 2583 return 0; 2584 } 2585#endif 2586 src = (struct iovec __user *) arg; 2587 if (copy_from_user(dst, &src[index], sizeof(*dst))) 2588 return -EFAULT; 2589 return 0; 2590} 2591 2592static int io_sqe_buffer_register(struct io_ring_ctx *ctx, void __user *arg, 2593 unsigned nr_args) 2594{ 2595 struct vm_area_struct **vmas = NULL; 2596 struct page **pages = NULL; 2597 int i, j, got_pages = 0; 2598 int ret = -EINVAL; 2599 2600 if (ctx->user_bufs) 2601 return -EBUSY; 2602 if (!nr_args || nr_args > UIO_MAXIOV) 2603 return -EINVAL; 2604 2605 ctx->user_bufs = kcalloc(nr_args, sizeof(struct io_mapped_ubuf), 2606 GFP_KERNEL); 2607 if (!ctx->user_bufs) 2608 return -ENOMEM; 2609 2610 for (i = 0; i < nr_args; i++) { 2611 struct io_mapped_ubuf *imu = &ctx->user_bufs[i]; 2612 unsigned long off, start, end, ubuf; 2613 int pret, nr_pages; 2614 struct iovec iov; 2615 size_t size; 2616 2617 ret = io_copy_iov(ctx, &iov, arg, i); 2618 if (ret) 2619 goto err; 2620 2621 /* 2622 * Don't impose further limits on the size and buffer 2623 * constraints here, we'll -EINVAL later when IO is 2624 * submitted if they are wrong. 2625 */ 2626 ret = -EFAULT; 2627 if (!iov.iov_base || !iov.iov_len) 2628 goto err; 2629 2630 /* arbitrary limit, but we need something */ 2631 if (iov.iov_len > SZ_1G) 2632 goto err; 2633 2634 ubuf = (unsigned long) iov.iov_base; 2635 end = (ubuf + iov.iov_len + PAGE_SIZE - 1) >> PAGE_SHIFT; 2636 start = ubuf >> PAGE_SHIFT; 2637 nr_pages = end - start; 2638 2639 if (ctx->account_mem) { 2640 ret = io_account_mem(ctx->user, nr_pages); 2641 if (ret) 2642 goto err; 2643 } 2644 2645 ret = 0; 2646 if (!pages || nr_pages > got_pages) { 2647 kfree(vmas); 2648 kfree(pages); 2649 pages = kvmalloc_array(nr_pages, sizeof(struct page *), 2650 GFP_KERNEL); 2651 vmas = kvmalloc_array(nr_pages, 2652 sizeof(struct vm_area_struct *), 2653 GFP_KERNEL); 2654 if (!pages || !vmas) { 2655 ret = -ENOMEM; 2656 if (ctx->account_mem) 2657 io_unaccount_mem(ctx->user, nr_pages); 2658 goto err; 2659 } 2660 got_pages = nr_pages; 2661 } 2662 2663 imu->bvec = kvmalloc_array(nr_pages, sizeof(struct bio_vec), 2664 GFP_KERNEL); 2665 ret = -ENOMEM; 2666 if (!imu->bvec) { 2667 if (ctx->account_mem) 2668 io_unaccount_mem(ctx->user, nr_pages); 2669 goto err; 2670 } 2671 2672 ret = 0; 2673 down_read(&current->mm->mmap_sem); 2674 pret = get_user_pages(ubuf, nr_pages, 2675 FOLL_WRITE | FOLL_LONGTERM, 2676 pages, vmas); 2677 if (pret == nr_pages) { 2678 /* don't support file backed memory */ 2679 for (j = 0; j < nr_pages; j++) { 2680 struct vm_area_struct *vma = vmas[j]; 2681 2682 if (vma->vm_file && 2683 !is_file_hugepages(vma->vm_file)) { 2684 ret = -EOPNOTSUPP; 2685 break; 2686 } 2687 } 2688 } else { 2689 ret = pret < 0 ? pret : -EFAULT; 2690 } 2691 up_read(&current->mm->mmap_sem); 2692 if (ret) { 2693 /* 2694 * if we did partial map, or found file backed vmas, 2695 * release any pages we did get 2696 */ 2697 if (pret > 0) { 2698 for (j = 0; j < pret; j++) 2699 put_page(pages[j]); 2700 } 2701 if (ctx->account_mem) 2702 io_unaccount_mem(ctx->user, nr_pages); 2703 kvfree(imu->bvec); 2704 goto err; 2705 } 2706 2707 off = ubuf & ~PAGE_MASK; 2708 size = iov.iov_len; 2709 for (j = 0; j < nr_pages; j++) { 2710 size_t vec_len; 2711 2712 vec_len = min_t(size_t, size, PAGE_SIZE - off); 2713 imu->bvec[j].bv_page = pages[j]; 2714 imu->bvec[j].bv_len = vec_len; 2715 imu->bvec[j].bv_offset = off; 2716 off = 0; 2717 size -= vec_len; 2718 } 2719 /* store original address for later verification */ 2720 imu->ubuf = ubuf; 2721 imu->len = iov.iov_len; 2722 imu->nr_bvecs = nr_pages; 2723 2724 ctx->nr_user_bufs++; 2725 } 2726 kvfree(pages); 2727 kvfree(vmas); 2728 return 0; 2729err: 2730 kvfree(pages); 2731 kvfree(vmas); 2732 io_sqe_buffer_unregister(ctx); 2733 return ret; 2734} 2735 2736static int io_eventfd_register(struct io_ring_ctx *ctx, void __user *arg) 2737{ 2738 __s32 __user *fds = arg; 2739 int fd; 2740 2741 if (ctx->cq_ev_fd) 2742 return -EBUSY; 2743 2744 if (copy_from_user(&fd, fds, sizeof(*fds))) 2745 return -EFAULT; 2746 2747 ctx->cq_ev_fd = eventfd_ctx_fdget(fd); 2748 if (IS_ERR(ctx->cq_ev_fd)) { 2749 int ret = PTR_ERR(ctx->cq_ev_fd); 2750 ctx->cq_ev_fd = NULL; 2751 return ret; 2752 } 2753 2754 return 0; 2755} 2756 2757static int io_eventfd_unregister(struct io_ring_ctx *ctx) 2758{ 2759 if (ctx->cq_ev_fd) { 2760 eventfd_ctx_put(ctx->cq_ev_fd); 2761 ctx->cq_ev_fd = NULL; 2762 return 0; 2763 } 2764 2765 return -ENXIO; 2766} 2767 2768static void io_ring_ctx_free(struct io_ring_ctx *ctx) 2769{ 2770 io_finish_async(ctx); 2771 if (ctx->sqo_mm) 2772 mmdrop(ctx->sqo_mm); 2773 2774 io_iopoll_reap_events(ctx); 2775 io_sqe_buffer_unregister(ctx); 2776 io_sqe_files_unregister(ctx); 2777 io_eventfd_unregister(ctx); 2778 2779#if defined(CONFIG_UNIX) 2780 if (ctx->ring_sock) { 2781 ctx->ring_sock->file = NULL; /* so that iput() is called */ 2782 sock_release(ctx->ring_sock); 2783 } 2784#endif 2785 2786 io_mem_free(ctx->sq_ring); 2787 io_mem_free(ctx->sq_sqes); 2788 io_mem_free(ctx->cq_ring); 2789 2790 percpu_ref_exit(&ctx->refs); 2791 if (ctx->account_mem) 2792 io_unaccount_mem(ctx->user, 2793 ring_pages(ctx->sq_entries, ctx->cq_entries)); 2794 free_uid(ctx->user); 2795 kfree(ctx); 2796} 2797 2798static __poll_t io_uring_poll(struct file *file, poll_table *wait) 2799{ 2800 struct io_ring_ctx *ctx = file->private_data; 2801 __poll_t mask = 0; 2802 2803 poll_wait(file, &ctx->cq_wait, wait); 2804 /* 2805 * synchronizes with barrier from wq_has_sleeper call in 2806 * io_commit_cqring 2807 */ 2808 smp_rmb(); 2809 if (READ_ONCE(ctx->sq_ring->r.tail) - ctx->cached_sq_head != 2810 ctx->sq_ring->ring_entries) 2811 mask |= EPOLLOUT | EPOLLWRNORM; 2812 if (READ_ONCE(ctx->cq_ring->r.head) != ctx->cached_cq_tail) 2813 mask |= EPOLLIN | EPOLLRDNORM; 2814 2815 return mask; 2816} 2817 2818static int io_uring_fasync(int fd, struct file *file, int on) 2819{ 2820 struct io_ring_ctx *ctx = file->private_data; 2821 2822 return fasync_helper(fd, file, on, &ctx->cq_fasync); 2823} 2824 2825static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx) 2826{ 2827 mutex_lock(&ctx->uring_lock); 2828 percpu_ref_kill(&ctx->refs); 2829 mutex_unlock(&ctx->uring_lock); 2830 2831 io_poll_remove_all(ctx); 2832 io_iopoll_reap_events(ctx); 2833 wait_for_completion(&ctx->ctx_done); 2834 io_ring_ctx_free(ctx); 2835} 2836 2837static int io_uring_release(struct inode *inode, struct file *file) 2838{ 2839 struct io_ring_ctx *ctx = file->private_data; 2840 2841 file->private_data = NULL; 2842 io_ring_ctx_wait_and_kill(ctx); 2843 return 0; 2844} 2845 2846static int io_uring_mmap(struct file *file, struct vm_area_struct *vma) 2847{ 2848 loff_t offset = (loff_t) vma->vm_pgoff << PAGE_SHIFT; 2849 unsigned long sz = vma->vm_end - vma->vm_start; 2850 struct io_ring_ctx *ctx = file->private_data; 2851 unsigned long pfn; 2852 struct page *page; 2853 void *ptr; 2854 2855 switch (offset) { 2856 case IORING_OFF_SQ_RING: 2857 ptr = ctx->sq_ring; 2858 break; 2859 case IORING_OFF_SQES: 2860 ptr = ctx->sq_sqes; 2861 break; 2862 case IORING_OFF_CQ_RING: 2863 ptr = ctx->cq_ring; 2864 break; 2865 default: 2866 return -EINVAL; 2867 } 2868 2869 page = virt_to_head_page(ptr); 2870 if (sz > (PAGE_SIZE << compound_order(page))) 2871 return -EINVAL; 2872 2873 pfn = virt_to_phys(ptr) >> PAGE_SHIFT; 2874 return remap_pfn_range(vma, vma->vm_start, pfn, sz, vma->vm_page_prot); 2875} 2876 2877SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit, 2878 u32, min_complete, u32, flags, const sigset_t __user *, sig, 2879 size_t, sigsz) 2880{ 2881 struct io_ring_ctx *ctx; 2882 long ret = -EBADF; 2883 int submitted = 0; 2884 struct fd f; 2885 2886 if (flags & ~(IORING_ENTER_GETEVENTS | IORING_ENTER_SQ_WAKEUP)) 2887 return -EINVAL; 2888 2889 f = fdget(fd); 2890 if (!f.file) 2891 return -EBADF; 2892 2893 ret = -EOPNOTSUPP; 2894 if (f.file->f_op != &io_uring_fops) 2895 goto out_fput; 2896 2897 ret = -ENXIO; 2898 ctx = f.file->private_data; 2899 if (!percpu_ref_tryget(&ctx->refs)) 2900 goto out_fput; 2901 2902 /* 2903 * For SQ polling, the thread will do all submissions and completions. 2904 * Just return the requested submit count, and wake the thread if 2905 * we were asked to. 2906 */ 2907 if (ctx->flags & IORING_SETUP_SQPOLL) { 2908 if (flags & IORING_ENTER_SQ_WAKEUP) 2909 wake_up(&ctx->sqo_wait); 2910 submitted = to_submit; 2911 goto out_ctx; 2912 } 2913 2914 ret = 0; 2915 if (to_submit) { 2916 to_submit = min(to_submit, ctx->sq_entries); 2917 2918 mutex_lock(&ctx->uring_lock); 2919 submitted = io_ring_submit(ctx, to_submit); 2920 mutex_unlock(&ctx->uring_lock); 2921 } 2922 if (flags & IORING_ENTER_GETEVENTS) { 2923 unsigned nr_events = 0; 2924 2925 min_complete = min(min_complete, ctx->cq_entries); 2926 2927 if (ctx->flags & IORING_SETUP_IOPOLL) { 2928 mutex_lock(&ctx->uring_lock); 2929 ret = io_iopoll_check(ctx, &nr_events, min_complete); 2930 mutex_unlock(&ctx->uring_lock); 2931 } else { 2932 ret = io_cqring_wait(ctx, min_complete, sig, sigsz); 2933 } 2934 } 2935 2936out_ctx: 2937 io_ring_drop_ctx_refs(ctx, 1); 2938out_fput: 2939 fdput(f); 2940 return submitted ? submitted : ret; 2941} 2942 2943static const struct file_operations io_uring_fops = { 2944 .release = io_uring_release, 2945 .mmap = io_uring_mmap, 2946 .poll = io_uring_poll, 2947 .fasync = io_uring_fasync, 2948}; 2949 2950static int io_allocate_scq_urings(struct io_ring_ctx *ctx, 2951 struct io_uring_params *p) 2952{ 2953 struct io_sq_ring *sq_ring; 2954 struct io_cq_ring *cq_ring; 2955 size_t size; 2956 2957 sq_ring = io_mem_alloc(struct_size(sq_ring, array, p->sq_entries)); 2958 if (!sq_ring) 2959 return -ENOMEM; 2960 2961 ctx->sq_ring = sq_ring; 2962 sq_ring->ring_mask = p->sq_entries - 1; 2963 sq_ring->ring_entries = p->sq_entries; 2964 ctx->sq_mask = sq_ring->ring_mask; 2965 ctx->sq_entries = sq_ring->ring_entries; 2966 2967 size = array_size(sizeof(struct io_uring_sqe), p->sq_entries); 2968 if (size == SIZE_MAX) 2969 return -EOVERFLOW; 2970 2971 ctx->sq_sqes = io_mem_alloc(size); 2972 if (!ctx->sq_sqes) 2973 return -ENOMEM; 2974 2975 cq_ring = io_mem_alloc(struct_size(cq_ring, cqes, p->cq_entries)); 2976 if (!cq_ring) 2977 return -ENOMEM; 2978 2979 ctx->cq_ring = cq_ring; 2980 cq_ring->ring_mask = p->cq_entries - 1; 2981 cq_ring->ring_entries = p->cq_entries; 2982 ctx->cq_mask = cq_ring->ring_mask; 2983 ctx->cq_entries = cq_ring->ring_entries; 2984 return 0; 2985} 2986 2987/* 2988 * Allocate an anonymous fd, this is what constitutes the application 2989 * visible backing of an io_uring instance. The application mmaps this 2990 * fd to gain access to the SQ/CQ ring details. If UNIX sockets are enabled, 2991 * we have to tie this fd to a socket for file garbage collection purposes. 2992 */ 2993static int io_uring_get_fd(struct io_ring_ctx *ctx) 2994{ 2995 struct file *file; 2996 int ret; 2997 2998#if defined(CONFIG_UNIX) 2999 ret = sock_create_kern(&init_net, PF_UNIX, SOCK_RAW, IPPROTO_IP, 3000 &ctx->ring_sock); 3001 if (ret) 3002 return ret; 3003#endif 3004 3005 ret = get_unused_fd_flags(O_RDWR | O_CLOEXEC); 3006 if (ret < 0) 3007 goto err; 3008 3009 file = anon_inode_getfile("[io_uring]", &io_uring_fops, ctx, 3010 O_RDWR | O_CLOEXEC); 3011 if (IS_ERR(file)) { 3012 put_unused_fd(ret); 3013 ret = PTR_ERR(file); 3014 goto err; 3015 } 3016 3017#if defined(CONFIG_UNIX) 3018 ctx->ring_sock->file = file; 3019 ctx->ring_sock->sk->sk_user_data = ctx; 3020#endif 3021 fd_install(ret, file); 3022 return ret; 3023err: 3024#if defined(CONFIG_UNIX) 3025 sock_release(ctx->ring_sock); 3026 ctx->ring_sock = NULL; 3027#endif 3028 return ret; 3029} 3030 3031static int io_uring_create(unsigned entries, struct io_uring_params *p) 3032{ 3033 struct user_struct *user = NULL; 3034 struct io_ring_ctx *ctx; 3035 bool account_mem; 3036 int ret; 3037 3038 if (!entries || entries > IORING_MAX_ENTRIES) 3039 return -EINVAL; 3040 3041 /* 3042 * Use twice as many entries for the CQ ring. It's possible for the 3043 * application to drive a higher depth than the size of the SQ ring, 3044 * since the sqes are only used at submission time. This allows for 3045 * some flexibility in overcommitting a bit. 3046 */ 3047 p->sq_entries = roundup_pow_of_two(entries); 3048 p->cq_entries = 2 * p->sq_entries; 3049 3050 user = get_uid(current_user()); 3051 account_mem = !capable(CAP_IPC_LOCK); 3052 3053 if (account_mem) { 3054 ret = io_account_mem(user, 3055 ring_pages(p->sq_entries, p->cq_entries)); 3056 if (ret) { 3057 free_uid(user); 3058 return ret; 3059 } 3060 } 3061 3062 ctx = io_ring_ctx_alloc(p); 3063 if (!ctx) { 3064 if (account_mem) 3065 io_unaccount_mem(user, ring_pages(p->sq_entries, 3066 p->cq_entries)); 3067 free_uid(user); 3068 return -ENOMEM; 3069 } 3070 ctx->compat = in_compat_syscall(); 3071 ctx->account_mem = account_mem; 3072 ctx->user = user; 3073 3074 ret = io_allocate_scq_urings(ctx, p); 3075 if (ret) 3076 goto err; 3077 3078 ret = io_sq_offload_start(ctx, p); 3079 if (ret) 3080 goto err; 3081 3082 ret = io_uring_get_fd(ctx); 3083 if (ret < 0) 3084 goto err; 3085 3086 memset(&p->sq_off, 0, sizeof(p->sq_off)); 3087 p->sq_off.head = offsetof(struct io_sq_ring, r.head); 3088 p->sq_off.tail = offsetof(struct io_sq_ring, r.tail); 3089 p->sq_off.ring_mask = offsetof(struct io_sq_ring, ring_mask); 3090 p->sq_off.ring_entries = offsetof(struct io_sq_ring, ring_entries); 3091 p->sq_off.flags = offsetof(struct io_sq_ring, flags); 3092 p->sq_off.dropped = offsetof(struct io_sq_ring, dropped); 3093 p->sq_off.array = offsetof(struct io_sq_ring, array); 3094 3095 memset(&p->cq_off, 0, sizeof(p->cq_off)); 3096 p->cq_off.head = offsetof(struct io_cq_ring, r.head); 3097 p->cq_off.tail = offsetof(struct io_cq_ring, r.tail); 3098 p->cq_off.ring_mask = offsetof(struct io_cq_ring, ring_mask); 3099 p->cq_off.ring_entries = offsetof(struct io_cq_ring, ring_entries); 3100 p->cq_off.overflow = offsetof(struct io_cq_ring, overflow); 3101 p->cq_off.cqes = offsetof(struct io_cq_ring, cqes); 3102 return ret; 3103err: 3104 io_ring_ctx_wait_and_kill(ctx); 3105 return ret; 3106} 3107 3108/* 3109 * Sets up an aio uring context, and returns the fd. Applications asks for a 3110 * ring size, we return the actual sq/cq ring sizes (among other things) in the 3111 * params structure passed in. 3112 */ 3113static long io_uring_setup(u32 entries, struct io_uring_params __user *params) 3114{ 3115 struct io_uring_params p; 3116 long ret; 3117 int i; 3118 3119 if (copy_from_user(&p, params, sizeof(p))) 3120 return -EFAULT; 3121 for (i = 0; i < ARRAY_SIZE(p.resv); i++) { 3122 if (p.resv[i]) 3123 return -EINVAL; 3124 } 3125 3126 if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL | 3127 IORING_SETUP_SQ_AFF)) 3128 return -EINVAL; 3129 3130 ret = io_uring_create(entries, &p); 3131 if (ret < 0) 3132 return ret; 3133 3134 if (copy_to_user(params, &p, sizeof(p))) 3135 return -EFAULT; 3136 3137 return ret; 3138} 3139 3140SYSCALL_DEFINE2(io_uring_setup, u32, entries, 3141 struct io_uring_params __user *, params) 3142{ 3143 return io_uring_setup(entries, params); 3144} 3145 3146static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode, 3147 void __user *arg, unsigned nr_args) 3148 __releases(ctx->uring_lock) 3149 __acquires(ctx->uring_lock) 3150{ 3151 int ret; 3152 3153 /* 3154 * We're inside the ring mutex, if the ref is already dying, then 3155 * someone else killed the ctx or is already going through 3156 * io_uring_register(). 3157 */ 3158 if (percpu_ref_is_dying(&ctx->refs)) 3159 return -ENXIO; 3160 3161 percpu_ref_kill(&ctx->refs); 3162 3163 /* 3164 * Drop uring mutex before waiting for references to exit. If another 3165 * thread is currently inside io_uring_enter() it might need to grab 3166 * the uring_lock to make progress. If we hold it here across the drain 3167 * wait, then we can deadlock. It's safe to drop the mutex here, since 3168 * no new references will come in after we've killed the percpu ref. 3169 */ 3170 mutex_unlock(&ctx->uring_lock); 3171 wait_for_completion(&ctx->ctx_done); 3172 mutex_lock(&ctx->uring_lock); 3173 3174 switch (opcode) { 3175 case IORING_REGISTER_BUFFERS: 3176 ret = io_sqe_buffer_register(ctx, arg, nr_args); 3177 break; 3178 case IORING_UNREGISTER_BUFFERS: 3179 ret = -EINVAL; 3180 if (arg || nr_args) 3181 break; 3182 ret = io_sqe_buffer_unregister(ctx); 3183 break; 3184 case IORING_REGISTER_FILES: 3185 ret = io_sqe_files_register(ctx, arg, nr_args); 3186 break; 3187 case IORING_UNREGISTER_FILES: 3188 ret = -EINVAL; 3189 if (arg || nr_args) 3190 break; 3191 ret = io_sqe_files_unregister(ctx); 3192 break; 3193 case IORING_REGISTER_EVENTFD: 3194 ret = -EINVAL; 3195 if (nr_args != 1) 3196 break; 3197 ret = io_eventfd_register(ctx, arg); 3198 break; 3199 case IORING_UNREGISTER_EVENTFD: 3200 ret = -EINVAL; 3201 if (arg || nr_args) 3202 break; 3203 ret = io_eventfd_unregister(ctx); 3204 break; 3205 default: 3206 ret = -EINVAL; 3207 break; 3208 } 3209 3210 /* bring the ctx back to life */ 3211 reinit_completion(&ctx->ctx_done); 3212 percpu_ref_reinit(&ctx->refs); 3213 return ret; 3214} 3215 3216SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode, 3217 void __user *, arg, unsigned int, nr_args) 3218{ 3219 struct io_ring_ctx *ctx; 3220 long ret = -EBADF; 3221 struct fd f; 3222 3223 f = fdget(fd); 3224 if (!f.file) 3225 return -EBADF; 3226 3227 ret = -EOPNOTSUPP; 3228 if (f.file->f_op != &io_uring_fops) 3229 goto out_fput; 3230 3231 ctx = f.file->private_data; 3232 3233 mutex_lock(&ctx->uring_lock); 3234 ret = __io_uring_register(ctx, opcode, arg, nr_args); 3235 mutex_unlock(&ctx->uring_lock); 3236out_fput: 3237 fdput(f); 3238 return ret; 3239} 3240 3241static int __init io_uring_init(void) 3242{ 3243 req_cachep = KMEM_CACHE(io_kiocb, SLAB_HWCACHE_ALIGN | SLAB_PANIC); 3244 return 0; 3245}; 3246__initcall(io_uring_init);