Linux kernel mirror (for testing) · git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
at v5.2-rc5 · 3246 lines · 78 kB
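The header comment at the top of the source below spells out the acquire/release pairing the application must follow when it consumes CQ entries and publishes SQ entries. As a rough userspace-side sketch of that contract (not kernel code, and simplified relative to liburing), the loop below uses the GCC/Clang __atomic builtins in place of the kernel's smp_load_acquire()/smp_store_release(); the app_cq and app_cqe types, their field names, and drain_cq() are hypothetical stand-ins for the ring fields the application obtains by mmap()ing the IORING_OFF_CQ_RING region.

/*
 * Illustrative sketch only: consume CQ entries with the ordering the
 * file's header comment requires. Types and names are hypothetical;
 * real applications should use liburing.
 */
#include <stdint.h>

struct app_cqe {                /* mirrors struct io_uring_cqe */
	uint64_t user_data;
	int32_t  res;
	uint32_t flags;
};

struct app_cq {                 /* pointers into the mmapped CQ ring */
	uint32_t *head;         /* written by the application */
	uint32_t *tail;         /* written by the kernel */
	uint32_t  ring_mask;    /* ring_entries - 1 */
	struct app_cqe *cqes;
};

/* Drain whatever completions are currently visible; returns the count. */
static unsigned drain_cq(struct app_cq *cq,
			 void (*handle)(const struct app_cqe *cqe))
{
	/*
	 * Acquire-load of the kernel-written tail pairs with the kernel's
	 * release-store when it publishes new CQEs, so the CQE contents
	 * read below are guaranteed to be the freshly written values.
	 */
	uint32_t tail = __atomic_load_n(cq->tail, __ATOMIC_ACQUIRE);
	uint32_t head = *cq->head;      /* only the application writes head */
	unsigned seen = 0;

	while (head != tail) {
		handle(&cq->cqes[head & cq->ring_mask]);
		head++;
		seen++;
	}

	/*
	 * Release-store of head orders the CQE loads above before the head
	 * update, so the kernel never overwrites a slot still being read.
	 */
	__atomic_store_n(cq->head, head, __ATOMIC_RELEASE);
	return seen;
}

The submission side is symmetric: the application fills SQEs and array[] slots first, then release-stores the new SQ tail, pairing with the smp_load_acquire() in io_get_sqring(); with IORING_SETUP_SQPOLL it must additionally issue a full barrier before checking IORING_SQ_NEED_WAKEUP, as the comment below notes.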
1// SPDX-License-Identifier: GPL-2.0 2/* 3 * Shared application/kernel submission and completion ring pairs, for 4 * supporting fast/efficient IO. 5 * 6 * A note on the read/write ordering memory barriers that are matched between 7 * the application and kernel side. 8 * 9 * After the application reads the CQ ring tail, it must use an 10 * appropriate smp_rmb() to pair with the smp_wmb() the kernel uses 11 * before writing the tail (using smp_load_acquire to read the tail will 12 * do). It also needs a smp_mb() before updating CQ head (ordering the 13 * entry load(s) with the head store), pairing with an implicit barrier 14 * through a control-dependency in io_get_cqring (smp_store_release to 15 * store head will do). Failure to do so could lead to reading invalid 16 * CQ entries. 17 * 18 * Likewise, the application must use an appropriate smp_wmb() before 19 * writing the SQ tail (ordering SQ entry stores with the tail store), 20 * which pairs with smp_load_acquire in io_get_sqring (smp_store_release 21 * to store the tail will do). And it needs a barrier ordering the SQ 22 * head load before writing new SQ entries (smp_load_acquire to read 23 * head will do). 24 * 25 * When using the SQ poll thread (IORING_SETUP_SQPOLL), the application 26 * needs to check the SQ flags for IORING_SQ_NEED_WAKEUP *after* 27 * updating the SQ tail; a full memory barrier smp_mb() is needed 28 * between. 29 * 30 * Also see the examples in the liburing library: 31 * 32 * git://git.kernel.dk/liburing 33 * 34 * io_uring also uses READ/WRITE_ONCE() for _any_ store or load that happens 35 * from data shared between the kernel and application. This is done both 36 * for ordering purposes, but also to ensure that once a value is loaded from 37 * data that the application could potentially modify, it remains stable. 38 * 39 * Copyright (C) 2018-2019 Jens Axboe 40 * Copyright (c) 2018-2019 Christoph Hellwig 41 */ 42#include <linux/kernel.h> 43#include <linux/init.h> 44#include <linux/errno.h> 45#include <linux/syscalls.h> 46#include <linux/compat.h> 47#include <linux/refcount.h> 48#include <linux/uio.h> 49 50#include <linux/sched/signal.h> 51#include <linux/fs.h> 52#include <linux/file.h> 53#include <linux/fdtable.h> 54#include <linux/mm.h> 55#include <linux/mman.h> 56#include <linux/mmu_context.h> 57#include <linux/percpu.h> 58#include <linux/slab.h> 59#include <linux/workqueue.h> 60#include <linux/kthread.h> 61#include <linux/blkdev.h> 62#include <linux/bvec.h> 63#include <linux/net.h> 64#include <net/sock.h> 65#include <net/af_unix.h> 66#include <net/scm.h> 67#include <linux/anon_inodes.h> 68#include <linux/sched/mm.h> 69#include <linux/uaccess.h> 70#include <linux/nospec.h> 71#include <linux/sizes.h> 72#include <linux/hugetlb.h> 73 74#include <uapi/linux/io_uring.h> 75 76#include "internal.h" 77 78#define IORING_MAX_ENTRIES 4096 79#define IORING_MAX_FIXED_FILES 1024 80 81struct io_uring { 82 u32 head ____cacheline_aligned_in_smp; 83 u32 tail ____cacheline_aligned_in_smp; 84}; 85 86/* 87 * This data is shared with the application through the mmap at offset 88 * IORING_OFF_SQ_RING. 89 * 90 * The offsets to the member fields are published through struct 91 * io_sqring_offsets when calling io_uring_setup. 92 */ 93struct io_sq_ring { 94 /* 95 * Head and tail offsets into the ring; the offsets need to be 96 * masked to get valid indices. 97 * 98 * The kernel controls head and the application controls tail. 
99 */ 100 struct io_uring r; 101 /* 102 * Bitmask to apply to head and tail offsets (constant, equals 103 * ring_entries - 1) 104 */ 105 u32 ring_mask; 106 /* Ring size (constant, power of 2) */ 107 u32 ring_entries; 108 /* 109 * Number of invalid entries dropped by the kernel due to 110 * invalid index stored in array 111 * 112 * Written by the kernel, shouldn't be modified by the 113 * application (i.e. get number of "new events" by comparing to 114 * cached value). 115 * 116 * After a new SQ head value was read by the application this 117 * counter includes all submissions that were dropped reaching 118 * the new SQ head (and possibly more). 119 */ 120 u32 dropped; 121 /* 122 * Runtime flags 123 * 124 * Written by the kernel, shouldn't be modified by the 125 * application. 126 * 127 * The application needs a full memory barrier before checking 128 * for IORING_SQ_NEED_WAKEUP after updating the sq tail. 129 */ 130 u32 flags; 131 /* 132 * Ring buffer of indices into array of io_uring_sqe, which is 133 * mmapped by the application using the IORING_OFF_SQES offset. 134 * 135 * This indirection could e.g. be used to assign fixed 136 * io_uring_sqe entries to operations and only submit them to 137 * the queue when needed. 138 * 139 * The kernel modifies neither the indices array nor the entries 140 * array. 141 */ 142 u32 array[]; 143}; 144 145/* 146 * This data is shared with the application through the mmap at offset 147 * IORING_OFF_CQ_RING. 148 * 149 * The offsets to the member fields are published through struct 150 * io_cqring_offsets when calling io_uring_setup. 151 */ 152struct io_cq_ring { 153 /* 154 * Head and tail offsets into the ring; the offsets need to be 155 * masked to get valid indices. 156 * 157 * The application controls head and the kernel tail. 158 */ 159 struct io_uring r; 160 /* 161 * Bitmask to apply to head and tail offsets (constant, equals 162 * ring_entries - 1) 163 */ 164 u32 ring_mask; 165 /* Ring size (constant, power of 2) */ 166 u32 ring_entries; 167 /* 168 * Number of completion events lost because the queue was full; 169 * this should be avoided by the application by making sure 170 * there are not more requests pending thatn there is space in 171 * the completion queue. 172 * 173 * Written by the kernel, shouldn't be modified by the 174 * application (i.e. get number of "new events" by comparing to 175 * cached value). 176 * 177 * As completion events come in out of order this counter is not 178 * ordered with any other data. 179 */ 180 u32 overflow; 181 /* 182 * Ring buffer of completion events. 183 * 184 * The kernel writes completion events fresh every time they are 185 * produced, so the application is allowed to modify pending 186 * entries. 
187 */ 188 struct io_uring_cqe cqes[]; 189}; 190 191struct io_mapped_ubuf { 192 u64 ubuf; 193 size_t len; 194 struct bio_vec *bvec; 195 unsigned int nr_bvecs; 196}; 197 198struct async_list { 199 spinlock_t lock; 200 atomic_t cnt; 201 struct list_head list; 202 203 struct file *file; 204 off_t io_end; 205 size_t io_pages; 206}; 207 208struct io_ring_ctx { 209 struct { 210 struct percpu_ref refs; 211 } ____cacheline_aligned_in_smp; 212 213 struct { 214 unsigned int flags; 215 bool compat; 216 bool account_mem; 217 218 /* SQ ring */ 219 struct io_sq_ring *sq_ring; 220 unsigned cached_sq_head; 221 unsigned sq_entries; 222 unsigned sq_mask; 223 unsigned sq_thread_idle; 224 struct io_uring_sqe *sq_sqes; 225 226 struct list_head defer_list; 227 } ____cacheline_aligned_in_smp; 228 229 /* IO offload */ 230 struct workqueue_struct *sqo_wq; 231 struct task_struct *sqo_thread; /* if using sq thread polling */ 232 struct mm_struct *sqo_mm; 233 wait_queue_head_t sqo_wait; 234 235 struct { 236 /* CQ ring */ 237 struct io_cq_ring *cq_ring; 238 unsigned cached_cq_tail; 239 unsigned cq_entries; 240 unsigned cq_mask; 241 struct wait_queue_head cq_wait; 242 struct fasync_struct *cq_fasync; 243 struct eventfd_ctx *cq_ev_fd; 244 } ____cacheline_aligned_in_smp; 245 246 /* 247 * If used, fixed file set. Writers must ensure that ->refs is dead, 248 * readers must ensure that ->refs is alive as long as the file* is 249 * used. Only updated through io_uring_register(2). 250 */ 251 struct file **user_files; 252 unsigned nr_user_files; 253 254 /* if used, fixed mapped user buffers */ 255 unsigned nr_user_bufs; 256 struct io_mapped_ubuf *user_bufs; 257 258 struct user_struct *user; 259 260 struct completion ctx_done; 261 262 struct { 263 struct mutex uring_lock; 264 wait_queue_head_t wait; 265 } ____cacheline_aligned_in_smp; 266 267 struct { 268 spinlock_t completion_lock; 269 bool poll_multi_file; 270 /* 271 * ->poll_list is protected by the ctx->uring_lock for 272 * io_uring instances that don't use IORING_SETUP_SQPOLL. 273 * For SQPOLL, only the single threaded io_sq_thread() will 274 * manipulate the list, hence no extra locking is needed there. 275 */ 276 struct list_head poll_list; 277 struct list_head cancel_list; 278 } ____cacheline_aligned_in_smp; 279 280 struct async_list pending_async[2]; 281 282#if defined(CONFIG_UNIX) 283 struct socket *ring_sock; 284#endif 285}; 286 287struct sqe_submit { 288 const struct io_uring_sqe *sqe; 289 unsigned short index; 290 bool has_user; 291 bool needs_lock; 292 bool needs_fixed_file; 293}; 294 295/* 296 * First field must be the file pointer in all the 297 * iocb unions! See also 'struct kiocb' in <linux/fs.h> 298 */ 299struct io_poll_iocb { 300 struct file *file; 301 struct wait_queue_head *head; 302 __poll_t events; 303 bool done; 304 bool canceled; 305 struct wait_queue_entry wait; 306}; 307 308/* 309 * NOTE! Each of the iocb union members has the file pointer 310 * as the first entry in their struct definition. So you can 311 * access the file pointer through any of the sub-structs, 312 * or directly as just 'ki_filp' in this struct. 
313 */ 314struct io_kiocb { 315 union { 316 struct file *file; 317 struct kiocb rw; 318 struct io_poll_iocb poll; 319 }; 320 321 struct sqe_submit submit; 322 323 struct io_ring_ctx *ctx; 324 struct list_head list; 325 unsigned int flags; 326 refcount_t refs; 327#define REQ_F_NOWAIT 1 /* must not punt to workers */ 328#define REQ_F_IOPOLL_COMPLETED 2 /* polled IO has completed */ 329#define REQ_F_FIXED_FILE 4 /* ctx owns file */ 330#define REQ_F_SEQ_PREV 8 /* sequential with previous */ 331#define REQ_F_IO_DRAIN 16 /* drain existing IO first */ 332#define REQ_F_IO_DRAINED 32 /* drain done */ 333 u64 user_data; 334 u32 error; /* iopoll result from callback */ 335 u32 sequence; 336 337 struct work_struct work; 338}; 339 340#define IO_PLUG_THRESHOLD 2 341#define IO_IOPOLL_BATCH 8 342 343struct io_submit_state { 344 struct blk_plug plug; 345 346 /* 347 * io_kiocb alloc cache 348 */ 349 void *reqs[IO_IOPOLL_BATCH]; 350 unsigned int free_reqs; 351 unsigned int cur_req; 352 353 /* 354 * File reference cache 355 */ 356 struct file *file; 357 unsigned int fd; 358 unsigned int has_refs; 359 unsigned int used_refs; 360 unsigned int ios_left; 361}; 362 363static void io_sq_wq_submit_work(struct work_struct *work); 364 365static struct kmem_cache *req_cachep; 366 367static const struct file_operations io_uring_fops; 368 369struct sock *io_uring_get_socket(struct file *file) 370{ 371#if defined(CONFIG_UNIX) 372 if (file->f_op == &io_uring_fops) { 373 struct io_ring_ctx *ctx = file->private_data; 374 375 return ctx->ring_sock->sk; 376 } 377#endif 378 return NULL; 379} 380EXPORT_SYMBOL(io_uring_get_socket); 381 382static void io_ring_ctx_ref_free(struct percpu_ref *ref) 383{ 384 struct io_ring_ctx *ctx = container_of(ref, struct io_ring_ctx, refs); 385 386 complete(&ctx->ctx_done); 387} 388 389static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p) 390{ 391 struct io_ring_ctx *ctx; 392 int i; 393 394 ctx = kzalloc(sizeof(*ctx), GFP_KERNEL); 395 if (!ctx) 396 return NULL; 397 398 if (percpu_ref_init(&ctx->refs, io_ring_ctx_ref_free, 0, GFP_KERNEL)) { 399 kfree(ctx); 400 return NULL; 401 } 402 403 ctx->flags = p->flags; 404 init_waitqueue_head(&ctx->cq_wait); 405 init_completion(&ctx->ctx_done); 406 mutex_init(&ctx->uring_lock); 407 init_waitqueue_head(&ctx->wait); 408 for (i = 0; i < ARRAY_SIZE(ctx->pending_async); i++) { 409 spin_lock_init(&ctx->pending_async[i].lock); 410 INIT_LIST_HEAD(&ctx->pending_async[i].list); 411 atomic_set(&ctx->pending_async[i].cnt, 0); 412 } 413 spin_lock_init(&ctx->completion_lock); 414 INIT_LIST_HEAD(&ctx->poll_list); 415 INIT_LIST_HEAD(&ctx->cancel_list); 416 INIT_LIST_HEAD(&ctx->defer_list); 417 return ctx; 418} 419 420static inline bool io_sequence_defer(struct io_ring_ctx *ctx, 421 struct io_kiocb *req) 422{ 423 if ((req->flags & (REQ_F_IO_DRAIN|REQ_F_IO_DRAINED)) != REQ_F_IO_DRAIN) 424 return false; 425 426 return req->sequence > ctx->cached_cq_tail + ctx->sq_ring->dropped; 427} 428 429static struct io_kiocb *io_get_deferred_req(struct io_ring_ctx *ctx) 430{ 431 struct io_kiocb *req; 432 433 if (list_empty(&ctx->defer_list)) 434 return NULL; 435 436 req = list_first_entry(&ctx->defer_list, struct io_kiocb, list); 437 if (!io_sequence_defer(ctx, req)) { 438 list_del_init(&req->list); 439 return req; 440 } 441 442 return NULL; 443} 444 445static void __io_commit_cqring(struct io_ring_ctx *ctx) 446{ 447 struct io_cq_ring *ring = ctx->cq_ring; 448 449 if (ctx->cached_cq_tail != READ_ONCE(ring->r.tail)) { 450 /* order cqe stores with ring update */ 451 
smp_store_release(&ring->r.tail, ctx->cached_cq_tail); 452 453 if (wq_has_sleeper(&ctx->cq_wait)) { 454 wake_up_interruptible(&ctx->cq_wait); 455 kill_fasync(&ctx->cq_fasync, SIGIO, POLL_IN); 456 } 457 } 458} 459 460static void io_commit_cqring(struct io_ring_ctx *ctx) 461{ 462 struct io_kiocb *req; 463 464 __io_commit_cqring(ctx); 465 466 while ((req = io_get_deferred_req(ctx)) != NULL) { 467 req->flags |= REQ_F_IO_DRAINED; 468 queue_work(ctx->sqo_wq, &req->work); 469 } 470} 471 472static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx) 473{ 474 struct io_cq_ring *ring = ctx->cq_ring; 475 unsigned tail; 476 477 tail = ctx->cached_cq_tail; 478 /* 479 * writes to the cq entry need to come after reading head; the 480 * control dependency is enough as we're using WRITE_ONCE to 481 * fill the cq entry 482 */ 483 if (tail - READ_ONCE(ring->r.head) == ring->ring_entries) 484 return NULL; 485 486 ctx->cached_cq_tail++; 487 return &ring->cqes[tail & ctx->cq_mask]; 488} 489 490static void io_cqring_fill_event(struct io_ring_ctx *ctx, u64 ki_user_data, 491 long res) 492{ 493 struct io_uring_cqe *cqe; 494 495 /* 496 * If we can't get a cq entry, userspace overflowed the 497 * submission (by quite a lot). Increment the overflow count in 498 * the ring. 499 */ 500 cqe = io_get_cqring(ctx); 501 if (cqe) { 502 WRITE_ONCE(cqe->user_data, ki_user_data); 503 WRITE_ONCE(cqe->res, res); 504 WRITE_ONCE(cqe->flags, 0); 505 } else { 506 unsigned overflow = READ_ONCE(ctx->cq_ring->overflow); 507 508 WRITE_ONCE(ctx->cq_ring->overflow, overflow + 1); 509 } 510} 511 512static void io_cqring_ev_posted(struct io_ring_ctx *ctx) 513{ 514 if (waitqueue_active(&ctx->wait)) 515 wake_up(&ctx->wait); 516 if (waitqueue_active(&ctx->sqo_wait)) 517 wake_up(&ctx->sqo_wait); 518 if (ctx->cq_ev_fd) 519 eventfd_signal(ctx->cq_ev_fd, 1); 520} 521 522static void io_cqring_add_event(struct io_ring_ctx *ctx, u64 user_data, 523 long res) 524{ 525 unsigned long flags; 526 527 spin_lock_irqsave(&ctx->completion_lock, flags); 528 io_cqring_fill_event(ctx, user_data, res); 529 io_commit_cqring(ctx); 530 spin_unlock_irqrestore(&ctx->completion_lock, flags); 531 532 io_cqring_ev_posted(ctx); 533} 534 535static void io_ring_drop_ctx_refs(struct io_ring_ctx *ctx, unsigned refs) 536{ 537 percpu_ref_put_many(&ctx->refs, refs); 538 539 if (waitqueue_active(&ctx->wait)) 540 wake_up(&ctx->wait); 541} 542 543static struct io_kiocb *io_get_req(struct io_ring_ctx *ctx, 544 struct io_submit_state *state) 545{ 546 gfp_t gfp = GFP_KERNEL | __GFP_NOWARN; 547 struct io_kiocb *req; 548 549 if (!percpu_ref_tryget(&ctx->refs)) 550 return NULL; 551 552 if (!state) { 553 req = kmem_cache_alloc(req_cachep, gfp); 554 if (unlikely(!req)) 555 goto out; 556 } else if (!state->free_reqs) { 557 size_t sz; 558 int ret; 559 560 sz = min_t(size_t, state->ios_left, ARRAY_SIZE(state->reqs)); 561 ret = kmem_cache_alloc_bulk(req_cachep, gfp, sz, state->reqs); 562 563 /* 564 * Bulk alloc is all-or-nothing. If we fail to get a batch, 565 * retry single alloc to be on the safe side. 
566 */ 567 if (unlikely(ret <= 0)) { 568 state->reqs[0] = kmem_cache_alloc(req_cachep, gfp); 569 if (!state->reqs[0]) 570 goto out; 571 ret = 1; 572 } 573 state->free_reqs = ret - 1; 574 state->cur_req = 1; 575 req = state->reqs[0]; 576 } else { 577 req = state->reqs[state->cur_req]; 578 state->free_reqs--; 579 state->cur_req++; 580 } 581 582 req->ctx = ctx; 583 req->flags = 0; 584 /* one is dropped after submission, the other at completion */ 585 refcount_set(&req->refs, 2); 586 return req; 587out: 588 io_ring_drop_ctx_refs(ctx, 1); 589 return NULL; 590} 591 592static void io_free_req_many(struct io_ring_ctx *ctx, void **reqs, int *nr) 593{ 594 if (*nr) { 595 kmem_cache_free_bulk(req_cachep, *nr, reqs); 596 io_ring_drop_ctx_refs(ctx, *nr); 597 *nr = 0; 598 } 599} 600 601static void io_free_req(struct io_kiocb *req) 602{ 603 if (req->file && !(req->flags & REQ_F_FIXED_FILE)) 604 fput(req->file); 605 io_ring_drop_ctx_refs(req->ctx, 1); 606 kmem_cache_free(req_cachep, req); 607} 608 609static void io_put_req(struct io_kiocb *req) 610{ 611 if (refcount_dec_and_test(&req->refs)) 612 io_free_req(req); 613} 614 615/* 616 * Find and free completed poll iocbs 617 */ 618static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events, 619 struct list_head *done) 620{ 621 void *reqs[IO_IOPOLL_BATCH]; 622 struct io_kiocb *req; 623 int to_free; 624 625 to_free = 0; 626 while (!list_empty(done)) { 627 req = list_first_entry(done, struct io_kiocb, list); 628 list_del(&req->list); 629 630 io_cqring_fill_event(ctx, req->user_data, req->error); 631 (*nr_events)++; 632 633 if (refcount_dec_and_test(&req->refs)) { 634 /* If we're not using fixed files, we have to pair the 635 * completion part with the file put. Use regular 636 * completions for those, only batch free for fixed 637 * file. 638 */ 639 if (req->flags & REQ_F_FIXED_FILE) { 640 reqs[to_free++] = req; 641 if (to_free == ARRAY_SIZE(reqs)) 642 io_free_req_many(ctx, reqs, &to_free); 643 } else { 644 io_free_req(req); 645 } 646 } 647 } 648 649 io_commit_cqring(ctx); 650 io_free_req_many(ctx, reqs, &to_free); 651} 652 653static int io_do_iopoll(struct io_ring_ctx *ctx, unsigned int *nr_events, 654 long min) 655{ 656 struct io_kiocb *req, *tmp; 657 LIST_HEAD(done); 658 bool spin; 659 int ret; 660 661 /* 662 * Only spin for completions if we don't have multiple devices hanging 663 * off our complete list, and we're under the requested amount. 664 */ 665 spin = !ctx->poll_multi_file && *nr_events < min; 666 667 ret = 0; 668 list_for_each_entry_safe(req, tmp, &ctx->poll_list, list) { 669 struct kiocb *kiocb = &req->rw; 670 671 /* 672 * Move completed entries to our local list. If we find a 673 * request that requires polling, break out and complete 674 * the done list first, if we have entries there. 675 */ 676 if (req->flags & REQ_F_IOPOLL_COMPLETED) { 677 list_move_tail(&req->list, &done); 678 continue; 679 } 680 if (!list_empty(&done)) 681 break; 682 683 ret = kiocb->ki_filp->f_op->iopoll(kiocb, spin); 684 if (ret < 0) 685 break; 686 687 if (ret && spin) 688 spin = false; 689 ret = 0; 690 } 691 692 if (!list_empty(&done)) 693 io_iopoll_complete(ctx, nr_events, &done); 694 695 return ret; 696} 697 698/* 699 * Poll for a mininum of 'min' events. Note that if min == 0 we consider that a 700 * non-spinning poll check - we'll still enter the driver poll loop, but only 701 * as a non-spinning completion check. 
702 */ 703static int io_iopoll_getevents(struct io_ring_ctx *ctx, unsigned int *nr_events, 704 long min) 705{ 706 while (!list_empty(&ctx->poll_list)) { 707 int ret; 708 709 ret = io_do_iopoll(ctx, nr_events, min); 710 if (ret < 0) 711 return ret; 712 if (!min || *nr_events >= min) 713 return 0; 714 } 715 716 return 1; 717} 718 719/* 720 * We can't just wait for polled events to come to us, we have to actively 721 * find and complete them. 722 */ 723static void io_iopoll_reap_events(struct io_ring_ctx *ctx) 724{ 725 if (!(ctx->flags & IORING_SETUP_IOPOLL)) 726 return; 727 728 mutex_lock(&ctx->uring_lock); 729 while (!list_empty(&ctx->poll_list)) { 730 unsigned int nr_events = 0; 731 732 io_iopoll_getevents(ctx, &nr_events, 1); 733 } 734 mutex_unlock(&ctx->uring_lock); 735} 736 737static int io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events, 738 long min) 739{ 740 int ret = 0; 741 742 do { 743 int tmin = 0; 744 745 if (*nr_events < min) 746 tmin = min - *nr_events; 747 748 ret = io_iopoll_getevents(ctx, nr_events, tmin); 749 if (ret <= 0) 750 break; 751 ret = 0; 752 } while (min && !*nr_events && !need_resched()); 753 754 return ret; 755} 756 757static void kiocb_end_write(struct kiocb *kiocb) 758{ 759 if (kiocb->ki_flags & IOCB_WRITE) { 760 struct inode *inode = file_inode(kiocb->ki_filp); 761 762 /* 763 * Tell lockdep we inherited freeze protection from submission 764 * thread. 765 */ 766 if (S_ISREG(inode->i_mode)) 767 __sb_writers_acquired(inode->i_sb, SB_FREEZE_WRITE); 768 file_end_write(kiocb->ki_filp); 769 } 770} 771 772static void io_complete_rw(struct kiocb *kiocb, long res, long res2) 773{ 774 struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw); 775 776 kiocb_end_write(kiocb); 777 778 io_cqring_add_event(req->ctx, req->user_data, res); 779 io_put_req(req); 780} 781 782static void io_complete_rw_iopoll(struct kiocb *kiocb, long res, long res2) 783{ 784 struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw); 785 786 kiocb_end_write(kiocb); 787 788 req->error = res; 789 if (res != -EAGAIN) 790 req->flags |= REQ_F_IOPOLL_COMPLETED; 791} 792 793/* 794 * After the iocb has been issued, it's safe to be found on the poll list. 795 * Adding the kiocb to the list AFTER submission ensures that we don't 796 * find it from a io_iopoll_getevents() thread before the issuer is done 797 * accessing the kiocb cookie. 798 */ 799static void io_iopoll_req_issued(struct io_kiocb *req) 800{ 801 struct io_ring_ctx *ctx = req->ctx; 802 803 /* 804 * Track whether we have multiple files in our lists. This will impact 805 * how we do polling eventually, not spinning if we're on potentially 806 * different devices. 807 */ 808 if (list_empty(&ctx->poll_list)) { 809 ctx->poll_multi_file = false; 810 } else if (!ctx->poll_multi_file) { 811 struct io_kiocb *list_req; 812 813 list_req = list_first_entry(&ctx->poll_list, struct io_kiocb, 814 list); 815 if (list_req->rw.ki_filp != req->rw.ki_filp) 816 ctx->poll_multi_file = true; 817 } 818 819 /* 820 * For fast devices, IO may have already completed. If it has, add 821 * it to the front so we find it first. 
822 */ 823 if (req->flags & REQ_F_IOPOLL_COMPLETED) 824 list_add(&req->list, &ctx->poll_list); 825 else 826 list_add_tail(&req->list, &ctx->poll_list); 827} 828 829static void io_file_put(struct io_submit_state *state) 830{ 831 if (state->file) { 832 int diff = state->has_refs - state->used_refs; 833 834 if (diff) 835 fput_many(state->file, diff); 836 state->file = NULL; 837 } 838} 839 840/* 841 * Get as many references to a file as we have IOs left in this submission, 842 * assuming most submissions are for one file, or at least that each file 843 * has more than one submission. 844 */ 845static struct file *io_file_get(struct io_submit_state *state, int fd) 846{ 847 if (!state) 848 return fget(fd); 849 850 if (state->file) { 851 if (state->fd == fd) { 852 state->used_refs++; 853 state->ios_left--; 854 return state->file; 855 } 856 io_file_put(state); 857 } 858 state->file = fget_many(fd, state->ios_left); 859 if (!state->file) 860 return NULL; 861 862 state->fd = fd; 863 state->has_refs = state->ios_left; 864 state->used_refs = 1; 865 state->ios_left--; 866 return state->file; 867} 868 869/* 870 * If we tracked the file through the SCM inflight mechanism, we could support 871 * any file. For now, just ensure that anything potentially problematic is done 872 * inline. 873 */ 874static bool io_file_supports_async(struct file *file) 875{ 876 umode_t mode = file_inode(file)->i_mode; 877 878 if (S_ISBLK(mode) || S_ISCHR(mode)) 879 return true; 880 if (S_ISREG(mode) && file->f_op != &io_uring_fops) 881 return true; 882 883 return false; 884} 885 886static int io_prep_rw(struct io_kiocb *req, const struct sqe_submit *s, 887 bool force_nonblock) 888{ 889 const struct io_uring_sqe *sqe = s->sqe; 890 struct io_ring_ctx *ctx = req->ctx; 891 struct kiocb *kiocb = &req->rw; 892 unsigned ioprio; 893 int ret; 894 895 if (!req->file) 896 return -EBADF; 897 898 if (force_nonblock && !io_file_supports_async(req->file)) 899 force_nonblock = false; 900 901 kiocb->ki_pos = READ_ONCE(sqe->off); 902 kiocb->ki_flags = iocb_flags(kiocb->ki_filp); 903 kiocb->ki_hint = ki_hint_validate(file_write_hint(kiocb->ki_filp)); 904 905 ioprio = READ_ONCE(sqe->ioprio); 906 if (ioprio) { 907 ret = ioprio_check_cap(ioprio); 908 if (ret) 909 return ret; 910 911 kiocb->ki_ioprio = ioprio; 912 } else 913 kiocb->ki_ioprio = get_current_ioprio(); 914 915 ret = kiocb_set_rw_flags(kiocb, READ_ONCE(sqe->rw_flags)); 916 if (unlikely(ret)) 917 return ret; 918 919 /* don't allow async punt if RWF_NOWAIT was requested */ 920 if (kiocb->ki_flags & IOCB_NOWAIT) 921 req->flags |= REQ_F_NOWAIT; 922 923 if (force_nonblock) 924 kiocb->ki_flags |= IOCB_NOWAIT; 925 926 if (ctx->flags & IORING_SETUP_IOPOLL) { 927 if (!(kiocb->ki_flags & IOCB_DIRECT) || 928 !kiocb->ki_filp->f_op->iopoll) 929 return -EOPNOTSUPP; 930 931 req->error = 0; 932 kiocb->ki_flags |= IOCB_HIPRI; 933 kiocb->ki_complete = io_complete_rw_iopoll; 934 } else { 935 if (kiocb->ki_flags & IOCB_HIPRI) 936 return -EINVAL; 937 kiocb->ki_complete = io_complete_rw; 938 } 939 return 0; 940} 941 942static inline void io_rw_done(struct kiocb *kiocb, ssize_t ret) 943{ 944 switch (ret) { 945 case -EIOCBQUEUED: 946 break; 947 case -ERESTARTSYS: 948 case -ERESTARTNOINTR: 949 case -ERESTARTNOHAND: 950 case -ERESTART_RESTARTBLOCK: 951 /* 952 * We can't just restart the syscall, since previously 953 * submitted sqes may already be in progress. Just fail this 954 * IO with EINTR. 
955 */ 956 ret = -EINTR; 957 /* fall through */ 958 default: 959 kiocb->ki_complete(kiocb, ret, 0); 960 } 961} 962 963static int io_import_fixed(struct io_ring_ctx *ctx, int rw, 964 const struct io_uring_sqe *sqe, 965 struct iov_iter *iter) 966{ 967 size_t len = READ_ONCE(sqe->len); 968 struct io_mapped_ubuf *imu; 969 unsigned index, buf_index; 970 size_t offset; 971 u64 buf_addr; 972 973 /* attempt to use fixed buffers without having provided iovecs */ 974 if (unlikely(!ctx->user_bufs)) 975 return -EFAULT; 976 977 buf_index = READ_ONCE(sqe->buf_index); 978 if (unlikely(buf_index >= ctx->nr_user_bufs)) 979 return -EFAULT; 980 981 index = array_index_nospec(buf_index, ctx->nr_user_bufs); 982 imu = &ctx->user_bufs[index]; 983 buf_addr = READ_ONCE(sqe->addr); 984 985 /* overflow */ 986 if (buf_addr + len < buf_addr) 987 return -EFAULT; 988 /* not inside the mapped region */ 989 if (buf_addr < imu->ubuf || buf_addr + len > imu->ubuf + imu->len) 990 return -EFAULT; 991 992 /* 993 * May not be a start of buffer, set size appropriately 994 * and advance us to the beginning. 995 */ 996 offset = buf_addr - imu->ubuf; 997 iov_iter_bvec(iter, rw, imu->bvec, imu->nr_bvecs, offset + len); 998 if (offset) 999 iov_iter_advance(iter, offset); 1000 1001 /* don't drop a reference to these pages */ 1002 iter->type |= ITER_BVEC_FLAG_NO_REF; 1003 return 0; 1004} 1005 1006static int io_import_iovec(struct io_ring_ctx *ctx, int rw, 1007 const struct sqe_submit *s, struct iovec **iovec, 1008 struct iov_iter *iter) 1009{ 1010 const struct io_uring_sqe *sqe = s->sqe; 1011 void __user *buf = u64_to_user_ptr(READ_ONCE(sqe->addr)); 1012 size_t sqe_len = READ_ONCE(sqe->len); 1013 u8 opcode; 1014 1015 /* 1016 * We're reading ->opcode for the second time, but the first read 1017 * doesn't care whether it's _FIXED or not, so it doesn't matter 1018 * whether ->opcode changes concurrently. The first read does care 1019 * about whether it is a READ or a WRITE, so we don't trust this read 1020 * for that purpose and instead let the caller pass in the read/write 1021 * flag. 1022 */ 1023 opcode = READ_ONCE(sqe->opcode); 1024 if (opcode == IORING_OP_READ_FIXED || 1025 opcode == IORING_OP_WRITE_FIXED) { 1026 int ret = io_import_fixed(ctx, rw, sqe, iter); 1027 *iovec = NULL; 1028 return ret; 1029 } 1030 1031 if (!s->has_user) 1032 return -EFAULT; 1033 1034#ifdef CONFIG_COMPAT 1035 if (ctx->compat) 1036 return compat_import_iovec(rw, buf, sqe_len, UIO_FASTIOV, 1037 iovec, iter); 1038#endif 1039 1040 return import_iovec(rw, buf, sqe_len, UIO_FASTIOV, iovec, iter); 1041} 1042 1043/* 1044 * Make a note of the last file/offset/direction we punted to async 1045 * context. We'll use this information to see if we can piggy back a 1046 * sequential request onto the previous one, if it's still hasn't been 1047 * completed by the async worker. 
1048 */ 1049static void io_async_list_note(int rw, struct io_kiocb *req, size_t len) 1050{ 1051 struct async_list *async_list = &req->ctx->pending_async[rw]; 1052 struct kiocb *kiocb = &req->rw; 1053 struct file *filp = kiocb->ki_filp; 1054 off_t io_end = kiocb->ki_pos + len; 1055 1056 if (filp == async_list->file && kiocb->ki_pos == async_list->io_end) { 1057 unsigned long max_pages; 1058 1059 /* Use 8x RA size as a decent limiter for both reads/writes */ 1060 max_pages = filp->f_ra.ra_pages; 1061 if (!max_pages) 1062 max_pages = VM_READAHEAD_PAGES; 1063 max_pages *= 8; 1064 1065 /* If max pages are exceeded, reset the state */ 1066 len >>= PAGE_SHIFT; 1067 if (async_list->io_pages + len <= max_pages) { 1068 req->flags |= REQ_F_SEQ_PREV; 1069 async_list->io_pages += len; 1070 } else { 1071 io_end = 0; 1072 async_list->io_pages = 0; 1073 } 1074 } 1075 1076 /* New file? Reset state. */ 1077 if (async_list->file != filp) { 1078 async_list->io_pages = 0; 1079 async_list->file = filp; 1080 } 1081 async_list->io_end = io_end; 1082} 1083 1084static int io_read(struct io_kiocb *req, const struct sqe_submit *s, 1085 bool force_nonblock) 1086{ 1087 struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs; 1088 struct kiocb *kiocb = &req->rw; 1089 struct iov_iter iter; 1090 struct file *file; 1091 size_t iov_count; 1092 int ret; 1093 1094 ret = io_prep_rw(req, s, force_nonblock); 1095 if (ret) 1096 return ret; 1097 file = kiocb->ki_filp; 1098 1099 if (unlikely(!(file->f_mode & FMODE_READ))) 1100 return -EBADF; 1101 if (unlikely(!file->f_op->read_iter)) 1102 return -EINVAL; 1103 1104 ret = io_import_iovec(req->ctx, READ, s, &iovec, &iter); 1105 if (ret) 1106 return ret; 1107 1108 iov_count = iov_iter_count(&iter); 1109 ret = rw_verify_area(READ, file, &kiocb->ki_pos, iov_count); 1110 if (!ret) { 1111 ssize_t ret2; 1112 1113 /* Catch -EAGAIN return for forced non-blocking submission */ 1114 ret2 = call_read_iter(file, kiocb, &iter); 1115 if (!force_nonblock || ret2 != -EAGAIN) { 1116 io_rw_done(kiocb, ret2); 1117 } else { 1118 /* 1119 * If ->needs_lock is true, we're already in async 1120 * context. 1121 */ 1122 if (!s->needs_lock) 1123 io_async_list_note(READ, req, iov_count); 1124 ret = -EAGAIN; 1125 } 1126 } 1127 kfree(iovec); 1128 return ret; 1129} 1130 1131static int io_write(struct io_kiocb *req, const struct sqe_submit *s, 1132 bool force_nonblock) 1133{ 1134 struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs; 1135 struct kiocb *kiocb = &req->rw; 1136 struct iov_iter iter; 1137 struct file *file; 1138 size_t iov_count; 1139 int ret; 1140 1141 ret = io_prep_rw(req, s, force_nonblock); 1142 if (ret) 1143 return ret; 1144 1145 file = kiocb->ki_filp; 1146 if (unlikely(!(file->f_mode & FMODE_WRITE))) 1147 return -EBADF; 1148 if (unlikely(!file->f_op->write_iter)) 1149 return -EINVAL; 1150 1151 ret = io_import_iovec(req->ctx, WRITE, s, &iovec, &iter); 1152 if (ret) 1153 return ret; 1154 1155 iov_count = iov_iter_count(&iter); 1156 1157 ret = -EAGAIN; 1158 if (force_nonblock && !(kiocb->ki_flags & IOCB_DIRECT)) { 1159 /* If ->needs_lock is true, we're already in async context. */ 1160 if (!s->needs_lock) 1161 io_async_list_note(WRITE, req, iov_count); 1162 goto out_free; 1163 } 1164 1165 ret = rw_verify_area(WRITE, file, &kiocb->ki_pos, iov_count); 1166 if (!ret) { 1167 ssize_t ret2; 1168 1169 /* 1170 * Open-code file_start_write here to grab freeze protection, 1171 * which will be released by another thread in 1172 * io_complete_rw(). 
Fool lockdep by telling it the lock got 1173 * released so that it doesn't complain about the held lock when 1174 * we return to userspace. 1175 */ 1176 if (S_ISREG(file_inode(file)->i_mode)) { 1177 __sb_start_write(file_inode(file)->i_sb, 1178 SB_FREEZE_WRITE, true); 1179 __sb_writers_release(file_inode(file)->i_sb, 1180 SB_FREEZE_WRITE); 1181 } 1182 kiocb->ki_flags |= IOCB_WRITE; 1183 1184 ret2 = call_write_iter(file, kiocb, &iter); 1185 if (!force_nonblock || ret2 != -EAGAIN) { 1186 io_rw_done(kiocb, ret2); 1187 } else { 1188 /* 1189 * If ->needs_lock is true, we're already in async 1190 * context. 1191 */ 1192 if (!s->needs_lock) 1193 io_async_list_note(WRITE, req, iov_count); 1194 ret = -EAGAIN; 1195 } 1196 } 1197out_free: 1198 kfree(iovec); 1199 return ret; 1200} 1201 1202/* 1203 * IORING_OP_NOP just posts a completion event, nothing else. 1204 */ 1205static int io_nop(struct io_kiocb *req, u64 user_data) 1206{ 1207 struct io_ring_ctx *ctx = req->ctx; 1208 long err = 0; 1209 1210 if (unlikely(ctx->flags & IORING_SETUP_IOPOLL)) 1211 return -EINVAL; 1212 1213 io_cqring_add_event(ctx, user_data, err); 1214 io_put_req(req); 1215 return 0; 1216} 1217 1218static int io_prep_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe) 1219{ 1220 struct io_ring_ctx *ctx = req->ctx; 1221 1222 if (!req->file) 1223 return -EBADF; 1224 1225 if (unlikely(ctx->flags & IORING_SETUP_IOPOLL)) 1226 return -EINVAL; 1227 if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index)) 1228 return -EINVAL; 1229 1230 return 0; 1231} 1232 1233static int io_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe, 1234 bool force_nonblock) 1235{ 1236 loff_t sqe_off = READ_ONCE(sqe->off); 1237 loff_t sqe_len = READ_ONCE(sqe->len); 1238 loff_t end = sqe_off + sqe_len; 1239 unsigned fsync_flags; 1240 int ret; 1241 1242 fsync_flags = READ_ONCE(sqe->fsync_flags); 1243 if (unlikely(fsync_flags & ~IORING_FSYNC_DATASYNC)) 1244 return -EINVAL; 1245 1246 ret = io_prep_fsync(req, sqe); 1247 if (ret) 1248 return ret; 1249 1250 /* fsync always requires a blocking context */ 1251 if (force_nonblock) 1252 return -EAGAIN; 1253 1254 ret = vfs_fsync_range(req->rw.ki_filp, sqe_off, 1255 end > 0 ? 
end : LLONG_MAX, 1256 fsync_flags & IORING_FSYNC_DATASYNC); 1257 1258 io_cqring_add_event(req->ctx, sqe->user_data, ret); 1259 io_put_req(req); 1260 return 0; 1261} 1262 1263static int io_prep_sfr(struct io_kiocb *req, const struct io_uring_sqe *sqe) 1264{ 1265 struct io_ring_ctx *ctx = req->ctx; 1266 int ret = 0; 1267 1268 if (!req->file) 1269 return -EBADF; 1270 1271 if (unlikely(ctx->flags & IORING_SETUP_IOPOLL)) 1272 return -EINVAL; 1273 if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index)) 1274 return -EINVAL; 1275 1276 return ret; 1277} 1278 1279static int io_sync_file_range(struct io_kiocb *req, 1280 const struct io_uring_sqe *sqe, 1281 bool force_nonblock) 1282{ 1283 loff_t sqe_off; 1284 loff_t sqe_len; 1285 unsigned flags; 1286 int ret; 1287 1288 ret = io_prep_sfr(req, sqe); 1289 if (ret) 1290 return ret; 1291 1292 /* sync_file_range always requires a blocking context */ 1293 if (force_nonblock) 1294 return -EAGAIN; 1295 1296 sqe_off = READ_ONCE(sqe->off); 1297 sqe_len = READ_ONCE(sqe->len); 1298 flags = READ_ONCE(sqe->sync_range_flags); 1299 1300 ret = sync_file_range(req->rw.ki_filp, sqe_off, sqe_len, flags); 1301 1302 io_cqring_add_event(req->ctx, sqe->user_data, ret); 1303 io_put_req(req); 1304 return 0; 1305} 1306 1307static void io_poll_remove_one(struct io_kiocb *req) 1308{ 1309 struct io_poll_iocb *poll = &req->poll; 1310 1311 spin_lock(&poll->head->lock); 1312 WRITE_ONCE(poll->canceled, true); 1313 if (!list_empty(&poll->wait.entry)) { 1314 list_del_init(&poll->wait.entry); 1315 queue_work(req->ctx->sqo_wq, &req->work); 1316 } 1317 spin_unlock(&poll->head->lock); 1318 1319 list_del_init(&req->list); 1320} 1321 1322static void io_poll_remove_all(struct io_ring_ctx *ctx) 1323{ 1324 struct io_kiocb *req; 1325 1326 spin_lock_irq(&ctx->completion_lock); 1327 while (!list_empty(&ctx->cancel_list)) { 1328 req = list_first_entry(&ctx->cancel_list, struct io_kiocb,list); 1329 io_poll_remove_one(req); 1330 } 1331 spin_unlock_irq(&ctx->completion_lock); 1332} 1333 1334/* 1335 * Find a running poll command that matches one specified in sqe->addr, 1336 * and remove it if found. 
1337 */ 1338static int io_poll_remove(struct io_kiocb *req, const struct io_uring_sqe *sqe) 1339{ 1340 struct io_ring_ctx *ctx = req->ctx; 1341 struct io_kiocb *poll_req, *next; 1342 int ret = -ENOENT; 1343 1344 if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL)) 1345 return -EINVAL; 1346 if (sqe->ioprio || sqe->off || sqe->len || sqe->buf_index || 1347 sqe->poll_events) 1348 return -EINVAL; 1349 1350 spin_lock_irq(&ctx->completion_lock); 1351 list_for_each_entry_safe(poll_req, next, &ctx->cancel_list, list) { 1352 if (READ_ONCE(sqe->addr) == poll_req->user_data) { 1353 io_poll_remove_one(poll_req); 1354 ret = 0; 1355 break; 1356 } 1357 } 1358 spin_unlock_irq(&ctx->completion_lock); 1359 1360 io_cqring_add_event(req->ctx, sqe->user_data, ret); 1361 io_put_req(req); 1362 return 0; 1363} 1364 1365static void io_poll_complete(struct io_ring_ctx *ctx, struct io_kiocb *req, 1366 __poll_t mask) 1367{ 1368 req->poll.done = true; 1369 io_cqring_fill_event(ctx, req->user_data, mangle_poll(mask)); 1370 io_commit_cqring(ctx); 1371} 1372 1373static void io_poll_complete_work(struct work_struct *work) 1374{ 1375 struct io_kiocb *req = container_of(work, struct io_kiocb, work); 1376 struct io_poll_iocb *poll = &req->poll; 1377 struct poll_table_struct pt = { ._key = poll->events }; 1378 struct io_ring_ctx *ctx = req->ctx; 1379 __poll_t mask = 0; 1380 1381 if (!READ_ONCE(poll->canceled)) 1382 mask = vfs_poll(poll->file, &pt) & poll->events; 1383 1384 /* 1385 * Note that ->ki_cancel callers also delete iocb from active_reqs after 1386 * calling ->ki_cancel. We need the ctx_lock roundtrip here to 1387 * synchronize with them. In the cancellation case the list_del_init 1388 * itself is not actually needed, but harmless so we keep it in to 1389 * avoid further branches in the fast path. 
1390 */ 1391 spin_lock_irq(&ctx->completion_lock); 1392 if (!mask && !READ_ONCE(poll->canceled)) { 1393 add_wait_queue(poll->head, &poll->wait); 1394 spin_unlock_irq(&ctx->completion_lock); 1395 return; 1396 } 1397 list_del_init(&req->list); 1398 io_poll_complete(ctx, req, mask); 1399 spin_unlock_irq(&ctx->completion_lock); 1400 1401 io_cqring_ev_posted(ctx); 1402 io_put_req(req); 1403} 1404 1405static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync, 1406 void *key) 1407{ 1408 struct io_poll_iocb *poll = container_of(wait, struct io_poll_iocb, 1409 wait); 1410 struct io_kiocb *req = container_of(poll, struct io_kiocb, poll); 1411 struct io_ring_ctx *ctx = req->ctx; 1412 __poll_t mask = key_to_poll(key); 1413 unsigned long flags; 1414 1415 /* for instances that support it check for an event match first: */ 1416 if (mask && !(mask & poll->events)) 1417 return 0; 1418 1419 list_del_init(&poll->wait.entry); 1420 1421 if (mask && spin_trylock_irqsave(&ctx->completion_lock, flags)) { 1422 list_del(&req->list); 1423 io_poll_complete(ctx, req, mask); 1424 spin_unlock_irqrestore(&ctx->completion_lock, flags); 1425 1426 io_cqring_ev_posted(ctx); 1427 io_put_req(req); 1428 } else { 1429 queue_work(ctx->sqo_wq, &req->work); 1430 } 1431 1432 return 1; 1433} 1434 1435struct io_poll_table { 1436 struct poll_table_struct pt; 1437 struct io_kiocb *req; 1438 int error; 1439}; 1440 1441static void io_poll_queue_proc(struct file *file, struct wait_queue_head *head, 1442 struct poll_table_struct *p) 1443{ 1444 struct io_poll_table *pt = container_of(p, struct io_poll_table, pt); 1445 1446 if (unlikely(pt->req->poll.head)) { 1447 pt->error = -EINVAL; 1448 return; 1449 } 1450 1451 pt->error = 0; 1452 pt->req->poll.head = head; 1453 add_wait_queue(head, &pt->req->poll.wait); 1454} 1455 1456static int io_poll_add(struct io_kiocb *req, const struct io_uring_sqe *sqe) 1457{ 1458 struct io_poll_iocb *poll = &req->poll; 1459 struct io_ring_ctx *ctx = req->ctx; 1460 struct io_poll_table ipt; 1461 bool cancel = false; 1462 __poll_t mask; 1463 u16 events; 1464 1465 if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL)) 1466 return -EINVAL; 1467 if (sqe->addr || sqe->ioprio || sqe->off || sqe->len || sqe->buf_index) 1468 return -EINVAL; 1469 if (!poll->file) 1470 return -EBADF; 1471 1472 INIT_WORK(&req->work, io_poll_complete_work); 1473 events = READ_ONCE(sqe->poll_events); 1474 poll->events = demangle_poll(events) | EPOLLERR | EPOLLHUP; 1475 1476 poll->head = NULL; 1477 poll->done = false; 1478 poll->canceled = false; 1479 1480 ipt.pt._qproc = io_poll_queue_proc; 1481 ipt.pt._key = poll->events; 1482 ipt.req = req; 1483 ipt.error = -EINVAL; /* same as no support for IOCB_CMD_POLL */ 1484 1485 /* initialized the list so that we can do list_empty checks */ 1486 INIT_LIST_HEAD(&poll->wait.entry); 1487 init_waitqueue_func_entry(&poll->wait, io_poll_wake); 1488 1489 mask = vfs_poll(poll->file, &ipt.pt) & poll->events; 1490 1491 spin_lock_irq(&ctx->completion_lock); 1492 if (likely(poll->head)) { 1493 spin_lock(&poll->head->lock); 1494 if (unlikely(list_empty(&poll->wait.entry))) { 1495 if (ipt.error) 1496 cancel = true; 1497 ipt.error = 0; 1498 mask = 0; 1499 } 1500 if (mask || ipt.error) 1501 list_del_init(&poll->wait.entry); 1502 else if (cancel) 1503 WRITE_ONCE(poll->canceled, true); 1504 else if (!poll->done) /* actually waiting for an event */ 1505 list_add_tail(&req->list, &ctx->cancel_list); 1506 spin_unlock(&poll->head->lock); 1507 } 1508 if (mask) { /* no async, we'd stolen it */ 1509 ipt.error 
= 0; 1510 io_poll_complete(ctx, req, mask); 1511 } 1512 spin_unlock_irq(&ctx->completion_lock); 1513 1514 if (mask) { 1515 io_cqring_ev_posted(ctx); 1516 io_put_req(req); 1517 } 1518 return ipt.error; 1519} 1520 1521static int io_req_defer(struct io_ring_ctx *ctx, struct io_kiocb *req, 1522 const struct io_uring_sqe *sqe) 1523{ 1524 struct io_uring_sqe *sqe_copy; 1525 1526 if (!io_sequence_defer(ctx, req) && list_empty(&ctx->defer_list)) 1527 return 0; 1528 1529 sqe_copy = kmalloc(sizeof(*sqe_copy), GFP_KERNEL); 1530 if (!sqe_copy) 1531 return -EAGAIN; 1532 1533 spin_lock_irq(&ctx->completion_lock); 1534 if (!io_sequence_defer(ctx, req) && list_empty(&ctx->defer_list)) { 1535 spin_unlock_irq(&ctx->completion_lock); 1536 kfree(sqe_copy); 1537 return 0; 1538 } 1539 1540 memcpy(sqe_copy, sqe, sizeof(*sqe_copy)); 1541 req->submit.sqe = sqe_copy; 1542 1543 INIT_WORK(&req->work, io_sq_wq_submit_work); 1544 list_add_tail(&req->list, &ctx->defer_list); 1545 spin_unlock_irq(&ctx->completion_lock); 1546 return -EIOCBQUEUED; 1547} 1548 1549static int __io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req, 1550 const struct sqe_submit *s, bool force_nonblock) 1551{ 1552 int ret, opcode; 1553 1554 if (unlikely(s->index >= ctx->sq_entries)) 1555 return -EINVAL; 1556 req->user_data = READ_ONCE(s->sqe->user_data); 1557 1558 opcode = READ_ONCE(s->sqe->opcode); 1559 switch (opcode) { 1560 case IORING_OP_NOP: 1561 ret = io_nop(req, req->user_data); 1562 break; 1563 case IORING_OP_READV: 1564 if (unlikely(s->sqe->buf_index)) 1565 return -EINVAL; 1566 ret = io_read(req, s, force_nonblock); 1567 break; 1568 case IORING_OP_WRITEV: 1569 if (unlikely(s->sqe->buf_index)) 1570 return -EINVAL; 1571 ret = io_write(req, s, force_nonblock); 1572 break; 1573 case IORING_OP_READ_FIXED: 1574 ret = io_read(req, s, force_nonblock); 1575 break; 1576 case IORING_OP_WRITE_FIXED: 1577 ret = io_write(req, s, force_nonblock); 1578 break; 1579 case IORING_OP_FSYNC: 1580 ret = io_fsync(req, s->sqe, force_nonblock); 1581 break; 1582 case IORING_OP_POLL_ADD: 1583 ret = io_poll_add(req, s->sqe); 1584 break; 1585 case IORING_OP_POLL_REMOVE: 1586 ret = io_poll_remove(req, s->sqe); 1587 break; 1588 case IORING_OP_SYNC_FILE_RANGE: 1589 ret = io_sync_file_range(req, s->sqe, force_nonblock); 1590 break; 1591 default: 1592 ret = -EINVAL; 1593 break; 1594 } 1595 1596 if (ret) 1597 return ret; 1598 1599 if (ctx->flags & IORING_SETUP_IOPOLL) { 1600 if (req->error == -EAGAIN) 1601 return -EAGAIN; 1602 1603 /* workqueue context doesn't hold uring_lock, grab it now */ 1604 if (s->needs_lock) 1605 mutex_lock(&ctx->uring_lock); 1606 io_iopoll_req_issued(req); 1607 if (s->needs_lock) 1608 mutex_unlock(&ctx->uring_lock); 1609 } 1610 1611 return 0; 1612} 1613 1614static struct async_list *io_async_list_from_sqe(struct io_ring_ctx *ctx, 1615 const struct io_uring_sqe *sqe) 1616{ 1617 switch (sqe->opcode) { 1618 case IORING_OP_READV: 1619 case IORING_OP_READ_FIXED: 1620 return &ctx->pending_async[READ]; 1621 case IORING_OP_WRITEV: 1622 case IORING_OP_WRITE_FIXED: 1623 return &ctx->pending_async[WRITE]; 1624 default: 1625 return NULL; 1626 } 1627} 1628 1629static inline bool io_sqe_needs_user(const struct io_uring_sqe *sqe) 1630{ 1631 u8 opcode = READ_ONCE(sqe->opcode); 1632 1633 return !(opcode == IORING_OP_READ_FIXED || 1634 opcode == IORING_OP_WRITE_FIXED); 1635} 1636 1637static void io_sq_wq_submit_work(struct work_struct *work) 1638{ 1639 struct io_kiocb *req = container_of(work, struct io_kiocb, work); 1640 struct io_ring_ctx *ctx = req->ctx; 
1641 struct mm_struct *cur_mm = NULL; 1642 struct async_list *async_list; 1643 LIST_HEAD(req_list); 1644 mm_segment_t old_fs; 1645 int ret; 1646 1647 async_list = io_async_list_from_sqe(ctx, req->submit.sqe); 1648restart: 1649 do { 1650 struct sqe_submit *s = &req->submit; 1651 const struct io_uring_sqe *sqe = s->sqe; 1652 1653 /* Ensure we clear previously set non-block flag */ 1654 req->rw.ki_flags &= ~IOCB_NOWAIT; 1655 1656 ret = 0; 1657 if (io_sqe_needs_user(sqe) && !cur_mm) { 1658 if (!mmget_not_zero(ctx->sqo_mm)) { 1659 ret = -EFAULT; 1660 } else { 1661 cur_mm = ctx->sqo_mm; 1662 use_mm(cur_mm); 1663 old_fs = get_fs(); 1664 set_fs(USER_DS); 1665 } 1666 } 1667 1668 if (!ret) { 1669 s->has_user = cur_mm != NULL; 1670 s->needs_lock = true; 1671 do { 1672 ret = __io_submit_sqe(ctx, req, s, false); 1673 /* 1674 * We can get EAGAIN for polled IO even though 1675 * we're forcing a sync submission from here, 1676 * since we can't wait for request slots on the 1677 * block side. 1678 */ 1679 if (ret != -EAGAIN) 1680 break; 1681 cond_resched(); 1682 } while (1); 1683 } 1684 1685 /* drop submission reference */ 1686 io_put_req(req); 1687 1688 if (ret) { 1689 io_cqring_add_event(ctx, sqe->user_data, ret); 1690 io_put_req(req); 1691 } 1692 1693 /* async context always use a copy of the sqe */ 1694 kfree(sqe); 1695 1696 if (!async_list) 1697 break; 1698 if (!list_empty(&req_list)) { 1699 req = list_first_entry(&req_list, struct io_kiocb, 1700 list); 1701 list_del(&req->list); 1702 continue; 1703 } 1704 if (list_empty(&async_list->list)) 1705 break; 1706 1707 req = NULL; 1708 spin_lock(&async_list->lock); 1709 if (list_empty(&async_list->list)) { 1710 spin_unlock(&async_list->lock); 1711 break; 1712 } 1713 list_splice_init(&async_list->list, &req_list); 1714 spin_unlock(&async_list->lock); 1715 1716 req = list_first_entry(&req_list, struct io_kiocb, list); 1717 list_del(&req->list); 1718 } while (req); 1719 1720 /* 1721 * Rare case of racing with a submitter. If we find the count has 1722 * dropped to zero AND we have pending work items, then restart 1723 * the processing. This is a tiny race window. 1724 */ 1725 if (async_list) { 1726 ret = atomic_dec_return(&async_list->cnt); 1727 while (!ret && !list_empty(&async_list->list)) { 1728 spin_lock(&async_list->lock); 1729 atomic_inc(&async_list->cnt); 1730 list_splice_init(&async_list->list, &req_list); 1731 spin_unlock(&async_list->lock); 1732 1733 if (!list_empty(&req_list)) { 1734 req = list_first_entry(&req_list, 1735 struct io_kiocb, list); 1736 list_del(&req->list); 1737 goto restart; 1738 } 1739 ret = atomic_dec_return(&async_list->cnt); 1740 } 1741 } 1742 1743 if (cur_mm) { 1744 set_fs(old_fs); 1745 unuse_mm(cur_mm); 1746 mmput(cur_mm); 1747 } 1748} 1749 1750/* 1751 * See if we can piggy back onto previously submitted work, that is still 1752 * running. We currently only allow this if the new request is sequential 1753 * to the previous one we punted. 
1754 */ 1755static bool io_add_to_prev_work(struct async_list *list, struct io_kiocb *req) 1756{ 1757 bool ret = false; 1758 1759 if (!list) 1760 return false; 1761 if (!(req->flags & REQ_F_SEQ_PREV)) 1762 return false; 1763 if (!atomic_read(&list->cnt)) 1764 return false; 1765 1766 ret = true; 1767 spin_lock(&list->lock); 1768 list_add_tail(&req->list, &list->list); 1769 if (!atomic_read(&list->cnt)) { 1770 list_del_init(&req->list); 1771 ret = false; 1772 } 1773 spin_unlock(&list->lock); 1774 return ret; 1775} 1776 1777static bool io_op_needs_file(const struct io_uring_sqe *sqe) 1778{ 1779 int op = READ_ONCE(sqe->opcode); 1780 1781 switch (op) { 1782 case IORING_OP_NOP: 1783 case IORING_OP_POLL_REMOVE: 1784 return false; 1785 default: 1786 return true; 1787 } 1788} 1789 1790static int io_req_set_file(struct io_ring_ctx *ctx, const struct sqe_submit *s, 1791 struct io_submit_state *state, struct io_kiocb *req) 1792{ 1793 unsigned flags; 1794 int fd; 1795 1796 flags = READ_ONCE(s->sqe->flags); 1797 fd = READ_ONCE(s->sqe->fd); 1798 1799 if (flags & IOSQE_IO_DRAIN) { 1800 req->flags |= REQ_F_IO_DRAIN; 1801 req->sequence = ctx->cached_sq_head - 1; 1802 } 1803 1804 if (!io_op_needs_file(s->sqe)) { 1805 req->file = NULL; 1806 return 0; 1807 } 1808 1809 if (flags & IOSQE_FIXED_FILE) { 1810 if (unlikely(!ctx->user_files || 1811 (unsigned) fd >= ctx->nr_user_files)) 1812 return -EBADF; 1813 req->file = ctx->user_files[fd]; 1814 req->flags |= REQ_F_FIXED_FILE; 1815 } else { 1816 if (s->needs_fixed_file) 1817 return -EBADF; 1818 req->file = io_file_get(state, fd); 1819 if (unlikely(!req->file)) 1820 return -EBADF; 1821 } 1822 1823 return 0; 1824} 1825 1826static int io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s, 1827 struct io_submit_state *state) 1828{ 1829 struct io_kiocb *req; 1830 int ret; 1831 1832 /* enforce forwards compatibility on users */ 1833 if (unlikely(s->sqe->flags & ~(IOSQE_FIXED_FILE | IOSQE_IO_DRAIN))) 1834 return -EINVAL; 1835 1836 req = io_get_req(ctx, state); 1837 if (unlikely(!req)) 1838 return -EAGAIN; 1839 1840 ret = io_req_set_file(ctx, s, state, req); 1841 if (unlikely(ret)) 1842 goto out; 1843 1844 ret = io_req_defer(ctx, req, s->sqe); 1845 if (ret) { 1846 if (ret == -EIOCBQUEUED) 1847 ret = 0; 1848 return ret; 1849 } 1850 1851 ret = __io_submit_sqe(ctx, req, s, true); 1852 if (ret == -EAGAIN && !(req->flags & REQ_F_NOWAIT)) { 1853 struct io_uring_sqe *sqe_copy; 1854 1855 sqe_copy = kmalloc(sizeof(*sqe_copy), GFP_KERNEL); 1856 if (sqe_copy) { 1857 struct async_list *list; 1858 1859 memcpy(sqe_copy, s->sqe, sizeof(*sqe_copy)); 1860 s->sqe = sqe_copy; 1861 1862 memcpy(&req->submit, s, sizeof(*s)); 1863 list = io_async_list_from_sqe(ctx, s->sqe); 1864 if (!io_add_to_prev_work(list, req)) { 1865 if (list) 1866 atomic_inc(&list->cnt); 1867 INIT_WORK(&req->work, io_sq_wq_submit_work); 1868 queue_work(ctx->sqo_wq, &req->work); 1869 } 1870 1871 /* 1872 * Queued up for async execution, worker will release 1873 * submit reference when the iocb is actually 1874 * submitted. 1875 */ 1876 return 0; 1877 } 1878 } 1879 1880out: 1881 /* drop submission reference */ 1882 io_put_req(req); 1883 1884 /* and drop final reference, if we failed */ 1885 if (ret) 1886 io_put_req(req); 1887 1888 return ret; 1889} 1890 1891/* 1892 * Batched submission is done, ensure local IO is flushed out. 
1893 */ 1894static void io_submit_state_end(struct io_submit_state *state) 1895{ 1896 blk_finish_plug(&state->plug); 1897 io_file_put(state); 1898 if (state->free_reqs) 1899 kmem_cache_free_bulk(req_cachep, state->free_reqs, 1900 &state->reqs[state->cur_req]); 1901} 1902 1903/* 1904 * Start submission side cache. 1905 */ 1906static void io_submit_state_start(struct io_submit_state *state, 1907 struct io_ring_ctx *ctx, unsigned max_ios) 1908{ 1909 blk_start_plug(&state->plug); 1910 state->free_reqs = 0; 1911 state->file = NULL; 1912 state->ios_left = max_ios; 1913} 1914 1915static void io_commit_sqring(struct io_ring_ctx *ctx) 1916{ 1917 struct io_sq_ring *ring = ctx->sq_ring; 1918 1919 if (ctx->cached_sq_head != READ_ONCE(ring->r.head)) { 1920 /* 1921 * Ensure any loads from the SQEs are done at this point, 1922 * since once we write the new head, the application could 1923 * write new data to them. 1924 */ 1925 smp_store_release(&ring->r.head, ctx->cached_sq_head); 1926 } 1927} 1928 1929/* 1930 * Fetch an sqe, if one is available. Note that s->sqe will point to memory 1931 * that is mapped by userspace. This means that care needs to be taken to 1932 * ensure that reads are stable, as we cannot rely on userspace always 1933 * being a good citizen. If members of the sqe are validated and then later 1934 * used, it's important that those reads are done through READ_ONCE() to 1935 * prevent a re-load down the line. 1936 */ 1937static bool io_get_sqring(struct io_ring_ctx *ctx, struct sqe_submit *s) 1938{ 1939 struct io_sq_ring *ring = ctx->sq_ring; 1940 unsigned head; 1941 1942 /* 1943 * The cached sq head (or cq tail) serves two purposes: 1944 * 1945 * 1) allows us to batch the cost of updating the user visible 1946 * head updates. 1947 * 2) allows the kernel side to track the head on its own, even 1948 * though the application is the one updating it. 
1949 */ 1950 head = ctx->cached_sq_head; 1951 /* make sure SQ entry isn't read before tail */ 1952 if (head == smp_load_acquire(&ring->r.tail)) 1953 return false; 1954 1955 head = READ_ONCE(ring->array[head & ctx->sq_mask]); 1956 if (head < ctx->sq_entries) { 1957 s->index = head; 1958 s->sqe = &ctx->sq_sqes[head]; 1959 ctx->cached_sq_head++; 1960 return true; 1961 } 1962 1963 /* drop invalid entries */ 1964 ctx->cached_sq_head++; 1965 ring->dropped++; 1966 return false; 1967} 1968 1969static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes, 1970 unsigned int nr, bool has_user, bool mm_fault) 1971{ 1972 struct io_submit_state state, *statep = NULL; 1973 int ret, i, submitted = 0; 1974 1975 if (nr > IO_PLUG_THRESHOLD) { 1976 io_submit_state_start(&state, ctx, nr); 1977 statep = &state; 1978 } 1979 1980 for (i = 0; i < nr; i++) { 1981 if (unlikely(mm_fault)) { 1982 ret = -EFAULT; 1983 } else { 1984 sqes[i].has_user = has_user; 1985 sqes[i].needs_lock = true; 1986 sqes[i].needs_fixed_file = true; 1987 ret = io_submit_sqe(ctx, &sqes[i], statep); 1988 } 1989 if (!ret) { 1990 submitted++; 1991 continue; 1992 } 1993 1994 io_cqring_add_event(ctx, sqes[i].sqe->user_data, ret); 1995 } 1996 1997 if (statep) 1998 io_submit_state_end(&state); 1999 2000 return submitted; 2001} 2002 2003static int io_sq_thread(void *data) 2004{ 2005 struct sqe_submit sqes[IO_IOPOLL_BATCH]; 2006 struct io_ring_ctx *ctx = data; 2007 struct mm_struct *cur_mm = NULL; 2008 mm_segment_t old_fs; 2009 DEFINE_WAIT(wait); 2010 unsigned inflight; 2011 unsigned long timeout; 2012 2013 old_fs = get_fs(); 2014 set_fs(USER_DS); 2015 2016 timeout = inflight = 0; 2017 while (!kthread_should_park()) { 2018 bool all_fixed, mm_fault = false; 2019 int i; 2020 2021 if (inflight) { 2022 unsigned nr_events = 0; 2023 2024 if (ctx->flags & IORING_SETUP_IOPOLL) { 2025 /* 2026 * We disallow the app entering submit/complete 2027 * with polling, but we still need to lock the 2028 * ring to prevent racing with polled issue 2029 * that got punted to a workqueue. 2030 */ 2031 mutex_lock(&ctx->uring_lock); 2032 io_iopoll_check(ctx, &nr_events, 0); 2033 mutex_unlock(&ctx->uring_lock); 2034 } else { 2035 /* 2036 * Normal IO, just pretend everything completed. 2037 * We don't have to poll completions for that. 2038 */ 2039 nr_events = inflight; 2040 } 2041 2042 inflight -= nr_events; 2043 if (!inflight) 2044 timeout = jiffies + ctx->sq_thread_idle; 2045 } 2046 2047 if (!io_get_sqring(ctx, &sqes[0])) { 2048 /* 2049 * We're polling. If we're within the defined idle 2050 * period, then let us spin without work before going 2051 * to sleep. 2052 */ 2053 if (inflight || !time_after(jiffies, timeout)) { 2054 cpu_relax(); 2055 continue; 2056 } 2057 2058 /* 2059 * Drop cur_mm before scheduling, we can't hold it for 2060 * long periods (or over schedule()). Do this before 2061 * adding ourselves to the waitqueue, as the unuse/drop 2062 * may sleep. 
2063 */ 2064 if (cur_mm) { 2065 unuse_mm(cur_mm); 2066 mmput(cur_mm); 2067 cur_mm = NULL; 2068 } 2069 2070 prepare_to_wait(&ctx->sqo_wait, &wait, 2071 TASK_INTERRUPTIBLE); 2072 2073 /* Tell userspace we may need a wakeup call */ 2074 ctx->sq_ring->flags |= IORING_SQ_NEED_WAKEUP; 2075 /* make sure to read SQ tail after writing flags */ 2076 smp_mb(); 2077 2078 if (!io_get_sqring(ctx, &sqes[0])) { 2079 if (kthread_should_park()) { 2080 finish_wait(&ctx->sqo_wait, &wait); 2081 break; 2082 } 2083 if (signal_pending(current)) 2084 flush_signals(current); 2085 schedule(); 2086 finish_wait(&ctx->sqo_wait, &wait); 2087 2088 ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP; 2089 continue; 2090 } 2091 finish_wait(&ctx->sqo_wait, &wait); 2092 2093 ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP; 2094 } 2095 2096 i = 0; 2097 all_fixed = true; 2098 do { 2099 if (all_fixed && io_sqe_needs_user(sqes[i].sqe)) 2100 all_fixed = false; 2101 2102 i++; 2103 if (i == ARRAY_SIZE(sqes)) 2104 break; 2105 } while (io_get_sqring(ctx, &sqes[i])); 2106 2107 /* Unless all new commands are FIXED regions, grab mm */ 2108 if (!all_fixed && !cur_mm) { 2109 mm_fault = !mmget_not_zero(ctx->sqo_mm); 2110 if (!mm_fault) { 2111 use_mm(ctx->sqo_mm); 2112 cur_mm = ctx->sqo_mm; 2113 } 2114 } 2115 2116 inflight += io_submit_sqes(ctx, sqes, i, cur_mm != NULL, 2117 mm_fault); 2118 2119 /* Commit SQ ring head once we've consumed all SQEs */ 2120 io_commit_sqring(ctx); 2121 } 2122 2123 set_fs(old_fs); 2124 if (cur_mm) { 2125 unuse_mm(cur_mm); 2126 mmput(cur_mm); 2127 } 2128 2129 kthread_parkme(); 2130 2131 return 0; 2132} 2133 2134static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit) 2135{ 2136 struct io_submit_state state, *statep = NULL; 2137 int i, submit = 0; 2138 2139 if (to_submit > IO_PLUG_THRESHOLD) { 2140 io_submit_state_start(&state, ctx, to_submit); 2141 statep = &state; 2142 } 2143 2144 for (i = 0; i < to_submit; i++) { 2145 struct sqe_submit s; 2146 int ret; 2147 2148 if (!io_get_sqring(ctx, &s)) 2149 break; 2150 2151 s.has_user = true; 2152 s.needs_lock = false; 2153 s.needs_fixed_file = false; 2154 submit++; 2155 2156 ret = io_submit_sqe(ctx, &s, statep); 2157 if (ret) 2158 io_cqring_add_event(ctx, s.sqe->user_data, ret); 2159 } 2160 io_commit_sqring(ctx); 2161 2162 if (statep) 2163 io_submit_state_end(statep); 2164 2165 return submit; 2166} 2167 2168static unsigned io_cqring_events(struct io_cq_ring *ring) 2169{ 2170 /* See comment at the top of this file */ 2171 smp_rmb(); 2172 return READ_ONCE(ring->r.tail) - READ_ONCE(ring->r.head); 2173} 2174 2175/* 2176 * Wait until events become available, if we don't already have some. The 2177 * application must reap them itself, as they reside on the shared cq ring. 
/*
 * Wait until events become available, if we don't already have some. The
 * application must reap them itself, as they reside on the shared cq ring.
 */
static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
			  const sigset_t __user *sig, size_t sigsz)
{
	struct io_cq_ring *ring = ctx->cq_ring;
	sigset_t ksigmask, sigsaved;
	int ret;

	if (io_cqring_events(ring) >= min_events)
		return 0;

	if (sig) {
#ifdef CONFIG_COMPAT
		if (in_compat_syscall())
			ret = set_compat_user_sigmask((const compat_sigset_t __user *)sig,
						      &ksigmask, &sigsaved, sigsz);
		else
#endif
			ret = set_user_sigmask(sig, &ksigmask,
					       &sigsaved, sigsz);

		if (ret)
			return ret;
	}

	ret = wait_event_interruptible(ctx->wait, io_cqring_events(ring) >= min_events);
	if (ret == -ERESTARTSYS)
		ret = -EINTR;

	if (sig)
		restore_user_sigmask(sig, &sigsaved);

	return READ_ONCE(ring->r.head) == READ_ONCE(ring->r.tail) ? ret : 0;
}

static void __io_sqe_files_unregister(struct io_ring_ctx *ctx)
{
#if defined(CONFIG_UNIX)
	if (ctx->ring_sock) {
		struct sock *sock = ctx->ring_sock->sk;
		struct sk_buff *skb;

		while ((skb = skb_dequeue(&sock->sk_receive_queue)) != NULL)
			kfree_skb(skb);
	}
#else
	int i;

	for (i = 0; i < ctx->nr_user_files; i++)
		fput(ctx->user_files[i]);
#endif
}

static int io_sqe_files_unregister(struct io_ring_ctx *ctx)
{
	if (!ctx->user_files)
		return -ENXIO;

	__io_sqe_files_unregister(ctx);
	kfree(ctx->user_files);
	ctx->user_files = NULL;
	ctx->nr_user_files = 0;
	return 0;
}

static void io_sq_thread_stop(struct io_ring_ctx *ctx)
{
	if (ctx->sqo_thread) {
		/*
		 * The park is a bit of a work-around, without it we get
		 * warning spews on shutdown with SQPOLL set and affinity
		 * set to a single CPU.
		 */
		kthread_park(ctx->sqo_thread);
		kthread_stop(ctx->sqo_thread);
		ctx->sqo_thread = NULL;
	}
}

static void io_finish_async(struct io_ring_ctx *ctx)
{
	io_sq_thread_stop(ctx);

	if (ctx->sqo_wq) {
		destroy_workqueue(ctx->sqo_wq);
		ctx->sqo_wq = NULL;
	}
}

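/*
 * Illustrative userspace sketch, not kernel code: io_cqring_wait() above
 * services the IORING_ENTER_GETEVENTS side of io_uring_enter(2), including
 * the optional temporary signal mask (analogous to pselect(2)/ppoll(2)).
 * Assumes a libc new enough to define __NR_io_uring_enter; there is no libc
 * wrapper for the syscall, and the sigsz argument follows the liburing
 * convention of passing _NSIG / 8.
 */
#if 0
#include <signal.h>
#include <unistd.h>
#include <sys/syscall.h>
#include <linux/io_uring.h>

static int app_wait_for_cqes(int ring_fd, unsigned min_complete)
{
	sigset_t mask;

	sigfillset(&mask);
	sigdelset(&mask, SIGINT);	/* allow SIGINT to interrupt the wait */

	/* submit nothing, just wait until min_complete CQEs are available */
	return syscall(__NR_io_uring_enter, ring_fd, 0, min_complete,
		       IORING_ENTER_GETEVENTS, &mask, _NSIG / 8);
}
#endif
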
#if defined(CONFIG_UNIX)
static void io_destruct_skb(struct sk_buff *skb)
{
	struct io_ring_ctx *ctx = skb->sk->sk_user_data;

	io_finish_async(ctx);
	unix_destruct_scm(skb);
}

/*
 * Ensure the UNIX gc is aware of our file set, so we are certain that
 * the io_uring can be safely unregistered on process exit, even if we have
 * loops in the file referencing.
 */
static int __io_sqe_files_scm(struct io_ring_ctx *ctx, int nr, int offset)
{
	struct sock *sk = ctx->ring_sock->sk;
	struct scm_fp_list *fpl;
	struct sk_buff *skb;
	int i;

	if (!capable(CAP_SYS_RESOURCE) && !capable(CAP_SYS_ADMIN)) {
		unsigned long inflight = ctx->user->unix_inflight + nr;

		if (inflight > task_rlimit(current, RLIMIT_NOFILE))
			return -EMFILE;
	}

	fpl = kzalloc(sizeof(*fpl), GFP_KERNEL);
	if (!fpl)
		return -ENOMEM;

	skb = alloc_skb(0, GFP_KERNEL);
	if (!skb) {
		kfree(fpl);
		return -ENOMEM;
	}

	skb->sk = sk;
	skb->destructor = io_destruct_skb;

	fpl->user = get_uid(ctx->user);
	for (i = 0; i < nr; i++) {
		fpl->fp[i] = get_file(ctx->user_files[i + offset]);
		unix_inflight(fpl->user, fpl->fp[i]);
	}

	fpl->max = fpl->count = nr;
	UNIXCB(skb).fp = fpl;
	refcount_add(skb->truesize, &sk->sk_wmem_alloc);
	skb_queue_head(&sk->sk_receive_queue, skb);

	for (i = 0; i < nr; i++)
		fput(fpl->fp[i]);

	return 0;
}

/*
 * If UNIX sockets are enabled, fd passing can cause a reference cycle which
 * causes regular reference counting to break down. We rely on the UNIX
 * garbage collection to take care of this problem for us.
 */
static int io_sqe_files_scm(struct io_ring_ctx *ctx)
{
	unsigned left, total;
	int ret = 0;

	total = 0;
	left = ctx->nr_user_files;
	while (left) {
		unsigned this_files = min_t(unsigned, left, SCM_MAX_FD);

		ret = __io_sqe_files_scm(ctx, this_files, total);
		if (ret)
			break;
		left -= this_files;
		total += this_files;
	}

	if (!ret)
		return 0;

	while (total < ctx->nr_user_files) {
		fput(ctx->user_files[total]);
		total++;
	}

	return ret;
}
#else
static int io_sqe_files_scm(struct io_ring_ctx *ctx)
{
	return 0;
}
#endif

static int io_sqe_files_register(struct io_ring_ctx *ctx, void __user *arg,
				 unsigned nr_args)
{
	__s32 __user *fds = (__s32 __user *) arg;
	int fd, ret = 0;
	unsigned i;

	if (ctx->user_files)
		return -EBUSY;
	if (!nr_args)
		return -EINVAL;
	if (nr_args > IORING_MAX_FIXED_FILES)
		return -EMFILE;

	ctx->user_files = kcalloc(nr_args, sizeof(struct file *), GFP_KERNEL);
	if (!ctx->user_files)
		return -ENOMEM;

	for (i = 0; i < nr_args; i++) {
		ret = -EFAULT;
		if (copy_from_user(&fd, &fds[i], sizeof(fd)))
			break;

		ctx->user_files[i] = fget(fd);

		ret = -EBADF;
		if (!ctx->user_files[i])
			break;
		/*
		 * Don't allow io_uring instances to be registered. If UNIX
		 * isn't enabled, then this causes a reference cycle and this
		 * instance can never get freed. If UNIX is enabled we'll
		 * handle it just fine, but there's still no point in allowing
		 * a ring fd as it doesn't support regular read/write anyway.
		 */
		if (ctx->user_files[i]->f_op == &io_uring_fops) {
			fput(ctx->user_files[i]);
			break;
		}
		ctx->nr_user_files++;
		ret = 0;
	}

	if (ret) {
		for (i = 0; i < ctx->nr_user_files; i++)
			fput(ctx->user_files[i]);

		kfree(ctx->user_files);
		ctx->user_files = NULL;
		ctx->nr_user_files = 0;
		return ret;
	}

	ret = io_sqe_files_scm(ctx);
	if (ret)
		io_sqe_files_unregister(ctx);

	return ret;
}

static int io_sq_offload_start(struct io_ring_ctx *ctx,
			       struct io_uring_params *p)
{
	int ret;

	init_waitqueue_head(&ctx->sqo_wait);
	mmgrab(current->mm);
	ctx->sqo_mm = current->mm;

	if (ctx->flags & IORING_SETUP_SQPOLL) {
		ret = -EPERM;
		if (!capable(CAP_SYS_ADMIN))
			goto err;

		ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle);
		if (!ctx->sq_thread_idle)
			ctx->sq_thread_idle = HZ;

		if (p->flags & IORING_SETUP_SQ_AFF) {
			int cpu = p->sq_thread_cpu;

			ret = -EINVAL;
			if (cpu >= nr_cpu_ids)
				goto err;
			if (!cpu_online(cpu))
				goto err;

			ctx->sqo_thread = kthread_create_on_cpu(io_sq_thread,
							ctx, cpu,
							"io_uring-sq");
		} else {
			ctx->sqo_thread = kthread_create(io_sq_thread, ctx,
							"io_uring-sq");
		}
		if (IS_ERR(ctx->sqo_thread)) {
			ret = PTR_ERR(ctx->sqo_thread);
			ctx->sqo_thread = NULL;
			goto err;
		}
		wake_up_process(ctx->sqo_thread);
	} else if (p->flags & IORING_SETUP_SQ_AFF) {
		/* Can't have SQ_AFF without SQPOLL */
		ret = -EINVAL;
		goto err;
	}

	/* Do QD, or 2 * CPUS, whichever is smaller */
	ctx->sqo_wq = alloc_workqueue("io_ring-wq", WQ_UNBOUND | WQ_FREEZABLE,
			min(ctx->sq_entries - 1, 2 * num_online_cpus()));
	if (!ctx->sqo_wq) {
		ret = -ENOMEM;
		goto err;
	}

	return 0;
err:
	io_sq_thread_stop(ctx);
	mmdrop(ctx->sqo_mm);
	ctx->sqo_mm = NULL;
	return ret;
}

static void io_unaccount_mem(struct user_struct *user, unsigned long nr_pages)
{
	atomic_long_sub(nr_pages, &user->locked_vm);
}

static int io_account_mem(struct user_struct *user, unsigned long nr_pages)
{
	unsigned long page_limit, cur_pages, new_pages;

	/* Don't allow more pages than we can safely lock */
	page_limit = rlimit(RLIMIT_MEMLOCK) >> PAGE_SHIFT;

	do {
		cur_pages = atomic_long_read(&user->locked_vm);
		new_pages = cur_pages + nr_pages;
		if (new_pages > page_limit)
			return -ENOMEM;
	} while (atomic_long_cmpxchg(&user->locked_vm, cur_pages,
					new_pages) != cur_pages);

	return 0;
}

static void io_mem_free(void *ptr)
{
	struct page *page;

	if (!ptr)
		return;

	page = virt_to_head_page(ptr);
	if (put_page_testzero(page))
		free_compound_page(page);
}

static void *io_mem_alloc(size_t size)
{
	gfp_t gfp_flags = GFP_KERNEL | __GFP_ZERO | __GFP_NOWARN | __GFP_COMP |
				__GFP_NORETRY;

	return (void *) __get_free_pages(gfp_flags, get_order(size));
}

static unsigned long ring_pages(unsigned sq_entries, unsigned cq_entries)
{
	struct io_sq_ring *sq_ring;
	struct io_cq_ring *cq_ring;
	size_t bytes;

	bytes = struct_size(sq_ring, array, sq_entries);
	bytes += array_size(sizeof(struct io_uring_sqe), sq_entries);
	bytes += struct_size(cq_ring, cqes, cq_entries);

	return (bytes + PAGE_SIZE - 1) / PAGE_SIZE;
}

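/*
 * Illustrative userspace sketch, not kernel code: io_sqe_files_register()
 * above implements the IORING_REGISTER_FILES opcode of io_uring_register(2).
 * Assumes __NR_io_uring_register is available; error handling is elided.
 */
#if 0
#include <unistd.h>
#include <sys/syscall.h>
#include <linux/io_uring.h>

static int app_register_files(int ring_fd, const int *fds, unsigned nr_fds)
{
	/*
	 * After this succeeds, an SQE can reference fds[i] by index i when
	 * IOSQE_FIXED_FILE is set, avoiding per-I/O fget()/fput() overhead.
	 */
	return syscall(__NR_io_uring_register, ring_fd, IORING_REGISTER_FILES,
		       fds, nr_fds);
}
#endif
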
static int io_sqe_buffer_unregister(struct io_ring_ctx *ctx)
{
	int i, j;

	if (!ctx->user_bufs)
		return -ENXIO;

	for (i = 0; i < ctx->nr_user_bufs; i++) {
		struct io_mapped_ubuf *imu = &ctx->user_bufs[i];

		for (j = 0; j < imu->nr_bvecs; j++)
			put_page(imu->bvec[j].bv_page);

		if (ctx->account_mem)
			io_unaccount_mem(ctx->user, imu->nr_bvecs);
		kvfree(imu->bvec);
		imu->nr_bvecs = 0;
	}

	kfree(ctx->user_bufs);
	ctx->user_bufs = NULL;
	ctx->nr_user_bufs = 0;
	return 0;
}

static int io_copy_iov(struct io_ring_ctx *ctx, struct iovec *dst,
		       void __user *arg, unsigned index)
{
	struct iovec __user *src;

#ifdef CONFIG_COMPAT
	if (ctx->compat) {
		struct compat_iovec __user *ciovs;
		struct compat_iovec ciov;

		ciovs = (struct compat_iovec __user *) arg;
		if (copy_from_user(&ciov, &ciovs[index], sizeof(ciov)))
			return -EFAULT;

		dst->iov_base = (void __user *) (unsigned long) ciov.iov_base;
		dst->iov_len = ciov.iov_len;
		return 0;
	}
#endif
	src = (struct iovec __user *) arg;
	if (copy_from_user(dst, &src[index], sizeof(*dst)))
		return -EFAULT;
	return 0;
}

static int io_sqe_buffer_register(struct io_ring_ctx *ctx, void __user *arg,
				  unsigned nr_args)
{
	struct vm_area_struct **vmas = NULL;
	struct page **pages = NULL;
	int i, j, got_pages = 0;
	int ret = -EINVAL;

	if (ctx->user_bufs)
		return -EBUSY;
	if (!nr_args || nr_args > UIO_MAXIOV)
		return -EINVAL;

	ctx->user_bufs = kcalloc(nr_args, sizeof(struct io_mapped_ubuf),
					GFP_KERNEL);
	if (!ctx->user_bufs)
		return -ENOMEM;

	for (i = 0; i < nr_args; i++) {
		struct io_mapped_ubuf *imu = &ctx->user_bufs[i];
		unsigned long off, start, end, ubuf;
		int pret, nr_pages;
		struct iovec iov;
		size_t size;

		ret = io_copy_iov(ctx, &iov, arg, i);
		if (ret)
			goto err;

		/*
		 * Don't impose further limits on the size and buffer
		 * constraints here, we'll -EINVAL later when IO is
		 * submitted if they are wrong.
		 */
		ret = -EFAULT;
		if (!iov.iov_base || !iov.iov_len)
			goto err;

		/* arbitrary limit, but we need something */
		if (iov.iov_len > SZ_1G)
			goto err;

		ubuf = (unsigned long) iov.iov_base;
		end = (ubuf + iov.iov_len + PAGE_SIZE - 1) >> PAGE_SHIFT;
		start = ubuf >> PAGE_SHIFT;
		nr_pages = end - start;

		if (ctx->account_mem) {
			ret = io_account_mem(ctx->user, nr_pages);
			if (ret)
				goto err;
		}

		ret = 0;
		if (!pages || nr_pages > got_pages) {
			kfree(vmas);
			kfree(pages);
			pages = kvmalloc_array(nr_pages, sizeof(struct page *),
						GFP_KERNEL);
			vmas = kvmalloc_array(nr_pages,
					sizeof(struct vm_area_struct *),
					GFP_KERNEL);
			if (!pages || !vmas) {
				ret = -ENOMEM;
				if (ctx->account_mem)
					io_unaccount_mem(ctx->user, nr_pages);
				goto err;
			}
			got_pages = nr_pages;
		}

		imu->bvec = kvmalloc_array(nr_pages, sizeof(struct bio_vec),
						GFP_KERNEL);
		ret = -ENOMEM;
		if (!imu->bvec) {
			if (ctx->account_mem)
				io_unaccount_mem(ctx->user, nr_pages);
			goto err;
		}

		ret = 0;
		down_read(&current->mm->mmap_sem);
		pret = get_user_pages(ubuf, nr_pages,
				      FOLL_WRITE | FOLL_LONGTERM,
				      pages, vmas);
		if (pret == nr_pages) {
			/* don't support file backed memory */
			for (j = 0; j < nr_pages; j++) {
				struct vm_area_struct *vma = vmas[j];

				if (vma->vm_file &&
				    !is_file_hugepages(vma->vm_file)) {
					ret = -EOPNOTSUPP;
					break;
				}
			}
		} else {
			ret = pret < 0 ? pret : -EFAULT;
		}
		up_read(&current->mm->mmap_sem);
		if (ret) {
			/*
			 * if we did partial map, or found file backed vmas,
			 * release any pages we did get
			 */
			if (pret > 0) {
				for (j = 0; j < pret; j++)
					put_page(pages[j]);
			}
			if (ctx->account_mem)
				io_unaccount_mem(ctx->user, nr_pages);
			kvfree(imu->bvec);
			goto err;
		}

		off = ubuf & ~PAGE_MASK;
		size = iov.iov_len;
		for (j = 0; j < nr_pages; j++) {
			size_t vec_len;

			vec_len = min_t(size_t, size, PAGE_SIZE - off);
			imu->bvec[j].bv_page = pages[j];
			imu->bvec[j].bv_len = vec_len;
			imu->bvec[j].bv_offset = off;
			off = 0;
			size -= vec_len;
		}
		/* store original address for later verification */
		imu->ubuf = ubuf;
		imu->len = iov.iov_len;
		imu->nr_bvecs = nr_pages;

		ctx->nr_user_bufs++;
	}
	kvfree(pages);
	kvfree(vmas);
	return 0;
err:
	kvfree(pages);
	kvfree(vmas);
	io_sqe_buffer_unregister(ctx);
	return ret;
}

static int io_eventfd_register(struct io_ring_ctx *ctx, void __user *arg)
{
	__s32 __user *fds = arg;
	int fd;

	if (ctx->cq_ev_fd)
		return -EBUSY;

	if (copy_from_user(&fd, fds, sizeof(*fds)))
		return -EFAULT;

	ctx->cq_ev_fd = eventfd_ctx_fdget(fd);
	if (IS_ERR(ctx->cq_ev_fd)) {
		int ret = PTR_ERR(ctx->cq_ev_fd);
		ctx->cq_ev_fd = NULL;
		return ret;
	}

	return 0;
}

static int io_eventfd_unregister(struct io_ring_ctx *ctx)
{
	if (ctx->cq_ev_fd) {
		eventfd_ctx_put(ctx->cq_ev_fd);
		ctx->cq_ev_fd = NULL;
		return 0;
	}

	return -ENXIO;
}

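/*
 * Illustrative userspace sketch, not kernel code: io_sqe_buffer_register()
 * above pins the user buffers described by an array of iovecs so that the
 * IORING_OP_READ_FIXED/IORING_OP_WRITE_FIXED opcodes can reference them by
 * index. Assumes __NR_io_uring_register is available; the pinned memory is
 * accounted against RLIMIT_MEMLOCK unless the caller has CAP_IPC_LOCK.
 */
#if 0
#include <stdlib.h>
#include <unistd.h>
#include <sys/uio.h>
#include <sys/syscall.h>
#include <linux/io_uring.h>

static int app_register_one_buffer(int ring_fd, size_t len)
{
	struct iovec iov;

	/* page-aligned anonymous memory keeps the pinned page count minimal */
	if (posix_memalign(&iov.iov_base, 4096, len))
		return -1;
	iov.iov_len = len;

	/* SQEs can now use buf_index 0 with the *_FIXED opcodes */
	return syscall(__NR_io_uring_register, ring_fd,
		       IORING_REGISTER_BUFFERS, &iov, 1);
}
#endif
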
static void io_ring_ctx_free(struct io_ring_ctx *ctx)
{
	io_finish_async(ctx);
	if (ctx->sqo_mm)
		mmdrop(ctx->sqo_mm);

	io_iopoll_reap_events(ctx);
	io_sqe_buffer_unregister(ctx);
	io_sqe_files_unregister(ctx);
	io_eventfd_unregister(ctx);

#if defined(CONFIG_UNIX)
	if (ctx->ring_sock) {
		ctx->ring_sock->file = NULL; /* so that iput() is called */
		sock_release(ctx->ring_sock);
	}
#endif

	io_mem_free(ctx->sq_ring);
	io_mem_free(ctx->sq_sqes);
	io_mem_free(ctx->cq_ring);

	percpu_ref_exit(&ctx->refs);
	if (ctx->account_mem)
		io_unaccount_mem(ctx->user,
				ring_pages(ctx->sq_entries, ctx->cq_entries));
	free_uid(ctx->user);
	kfree(ctx);
}

static __poll_t io_uring_poll(struct file *file, poll_table *wait)
{
	struct io_ring_ctx *ctx = file->private_data;
	__poll_t mask = 0;

	poll_wait(file, &ctx->cq_wait, wait);
	/*
	 * synchronizes with barrier from wq_has_sleeper call in
	 * io_commit_cqring
	 */
	smp_rmb();
	if (READ_ONCE(ctx->sq_ring->r.tail) - ctx->cached_sq_head !=
	    ctx->sq_ring->ring_entries)
		mask |= EPOLLOUT | EPOLLWRNORM;
	if (READ_ONCE(ctx->cq_ring->r.head) != ctx->cached_cq_tail)
		mask |= EPOLLIN | EPOLLRDNORM;

	return mask;
}

static int io_uring_fasync(int fd, struct file *file, int on)
{
	struct io_ring_ctx *ctx = file->private_data;

	return fasync_helper(fd, file, on, &ctx->cq_fasync);
}

static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
{
	mutex_lock(&ctx->uring_lock);
	percpu_ref_kill(&ctx->refs);
	mutex_unlock(&ctx->uring_lock);

	io_poll_remove_all(ctx);
	io_iopoll_reap_events(ctx);
	wait_for_completion(&ctx->ctx_done);
	io_ring_ctx_free(ctx);
}

static int io_uring_release(struct inode *inode, struct file *file)
{
	struct io_ring_ctx *ctx = file->private_data;

	file->private_data = NULL;
	io_ring_ctx_wait_and_kill(ctx);
	return 0;
}

static int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
{
	loff_t offset = (loff_t) vma->vm_pgoff << PAGE_SHIFT;
	unsigned long sz = vma->vm_end - vma->vm_start;
	struct io_ring_ctx *ctx = file->private_data;
	unsigned long pfn;
	struct page *page;
	void *ptr;

	switch (offset) {
	case IORING_OFF_SQ_RING:
		ptr = ctx->sq_ring;
		break;
	case IORING_OFF_SQES:
		ptr = ctx->sq_sqes;
		break;
	case IORING_OFF_CQ_RING:
		ptr = ctx->cq_ring;
		break;
	default:
		return -EINVAL;
	}

	page = virt_to_head_page(ptr);
	if (sz > (PAGE_SIZE << compound_order(page)))
		return -EINVAL;

	pfn = virt_to_phys(ptr) >> PAGE_SHIFT;
	return remap_pfn_range(vma, vma->vm_start, pfn, sz, vma->vm_page_prot);
}

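/*
 * Illustrative userspace sketch, not kernel code: io_uring_mmap() above only
 * accepts the three fixed offsets, so the application maps each region with
 * the corresponding IORING_OFF_* value and then locates the ring fields via
 * the offsets returned by io_uring_setup(2). "p" is assumed to be the struct
 * io_uring_params filled in by that call; only the SQ side is shown and
 * error handling is abbreviated.
 */
#if 0
#include <stddef.h>
#include <sys/mman.h>
#include <linux/io_uring.h>

static void *app_map_rings(int ring_fd, const struct io_uring_params *p,
			   unsigned **sq_tail, struct io_uring_sqe **sqes)
{
	size_t sq_sz = p->sq_off.array + p->sq_entries * sizeof(unsigned);
	void *sq_ptr;

	sq_ptr = mmap(NULL, sq_sz, PROT_READ | PROT_WRITE,
		      MAP_SHARED | MAP_POPULATE, ring_fd, IORING_OFF_SQ_RING);
	if (sq_ptr == MAP_FAILED)
		return NULL;

	*sq_tail = (unsigned *)((char *)sq_ptr + p->sq_off.tail);

	*sqes = mmap(NULL, p->sq_entries * sizeof(struct io_uring_sqe),
		     PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
		     ring_fd, IORING_OFF_SQES);

	/* the CQ ring is mapped the same way with IORING_OFF_CQ_RING */
	return sq_ptr;
}
#endif
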
SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
		u32, min_complete, u32, flags, const sigset_t __user *, sig,
		size_t, sigsz)
{
	struct io_ring_ctx *ctx;
	long ret = -EBADF;
	int submitted = 0;
	struct fd f;

	if (flags & ~(IORING_ENTER_GETEVENTS | IORING_ENTER_SQ_WAKEUP))
		return -EINVAL;

	f = fdget(fd);
	if (!f.file)
		return -EBADF;

	ret = -EOPNOTSUPP;
	if (f.file->f_op != &io_uring_fops)
		goto out_fput;

	ret = -ENXIO;
	ctx = f.file->private_data;
	if (!percpu_ref_tryget(&ctx->refs))
		goto out_fput;

	/*
	 * For SQ polling, the thread will do all submissions and completions.
	 * Just return the requested submit count, and wake the thread if
	 * we were asked to.
	 */
	if (ctx->flags & IORING_SETUP_SQPOLL) {
		if (flags & IORING_ENTER_SQ_WAKEUP)
			wake_up(&ctx->sqo_wait);
		submitted = to_submit;
		goto out_ctx;
	}

	ret = 0;
	if (to_submit) {
		to_submit = min(to_submit, ctx->sq_entries);

		mutex_lock(&ctx->uring_lock);
		submitted = io_ring_submit(ctx, to_submit);
		mutex_unlock(&ctx->uring_lock);
	}
	if (flags & IORING_ENTER_GETEVENTS) {
		unsigned nr_events = 0;

		min_complete = min(min_complete, ctx->cq_entries);

		if (ctx->flags & IORING_SETUP_IOPOLL) {
			mutex_lock(&ctx->uring_lock);
			ret = io_iopoll_check(ctx, &nr_events, min_complete);
			mutex_unlock(&ctx->uring_lock);
		} else {
			ret = io_cqring_wait(ctx, min_complete, sig, sigsz);
		}
	}

out_ctx:
	io_ring_drop_ctx_refs(ctx, 1);
out_fput:
	fdput(f);
	return submitted ? submitted : ret;
}

static const struct file_operations io_uring_fops = {
	.release	= io_uring_release,
	.mmap		= io_uring_mmap,
	.poll		= io_uring_poll,
	.fasync		= io_uring_fasync,
};

static int io_allocate_scq_urings(struct io_ring_ctx *ctx,
				  struct io_uring_params *p)
{
	struct io_sq_ring *sq_ring;
	struct io_cq_ring *cq_ring;
	size_t size;

	sq_ring = io_mem_alloc(struct_size(sq_ring, array, p->sq_entries));
	if (!sq_ring)
		return -ENOMEM;

	ctx->sq_ring = sq_ring;
	sq_ring->ring_mask = p->sq_entries - 1;
	sq_ring->ring_entries = p->sq_entries;
	ctx->sq_mask = sq_ring->ring_mask;
	ctx->sq_entries = sq_ring->ring_entries;

	size = array_size(sizeof(struct io_uring_sqe), p->sq_entries);
	if (size == SIZE_MAX)
		return -EOVERFLOW;

	ctx->sq_sqes = io_mem_alloc(size);
	if (!ctx->sq_sqes)
		return -ENOMEM;

	cq_ring = io_mem_alloc(struct_size(cq_ring, cqes, p->cq_entries));
	if (!cq_ring)
		return -ENOMEM;

	ctx->cq_ring = cq_ring;
	cq_ring->ring_mask = p->cq_entries - 1;
	cq_ring->ring_entries = p->cq_entries;
	ctx->cq_mask = cq_ring->ring_mask;
	ctx->cq_entries = cq_ring->ring_entries;
	return 0;
}

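/*
 * Illustrative userspace sketch, not kernel code: with IORING_SETUP_SQPOLL,
 * io_uring_enter(2) above does not submit on the caller's behalf; the
 * application only needs to call it with IORING_ENTER_SQ_WAKEUP once the
 * kernel thread has gone idle and set IORING_SQ_NEED_WAKEUP in the SQ ring
 * flags. "sq_flags" is assumed to point at the flags word in the mmap'ed SQ
 * ring, and the SQ tail is assumed to have been updated already.
 */
#if 0
#include <unistd.h>
#include <sys/syscall.h>
#include <linux/io_uring.h>

static void app_sqpoll_kick(int ring_fd, const unsigned *sq_flags)
{
	/*
	 * Full barrier so the earlier tail store is visible before the flags
	 * load; this pairs with the smp_mb() in io_sq_thread().
	 */
	__atomic_thread_fence(__ATOMIC_SEQ_CST);

	if (__atomic_load_n(sq_flags, __ATOMIC_RELAXED) & IORING_SQ_NEED_WAKEUP)
		syscall(__NR_io_uring_enter, ring_fd, 0, 0,
			IORING_ENTER_SQ_WAKEUP, NULL, 0);
}
#endif
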
/*
 * Allocate an anonymous fd; this is what constitutes the application
 * visible backing of an io_uring instance. The application mmaps this
 * fd to gain access to the SQ/CQ ring details. If UNIX sockets are enabled,
 * we have to tie this fd to a socket for file garbage collection purposes.
 */
static int io_uring_get_fd(struct io_ring_ctx *ctx)
{
	struct file *file;
	int ret;

#if defined(CONFIG_UNIX)
	ret = sock_create_kern(&init_net, PF_UNIX, SOCK_RAW, IPPROTO_IP,
				&ctx->ring_sock);
	if (ret)
		return ret;
#endif

	ret = get_unused_fd_flags(O_RDWR | O_CLOEXEC);
	if (ret < 0)
		goto err;

	file = anon_inode_getfile("[io_uring]", &io_uring_fops, ctx,
					O_RDWR | O_CLOEXEC);
	if (IS_ERR(file)) {
		put_unused_fd(ret);
		ret = PTR_ERR(file);
		goto err;
	}

#if defined(CONFIG_UNIX)
	ctx->ring_sock->file = file;
	ctx->ring_sock->sk->sk_user_data = ctx;
#endif
	fd_install(ret, file);
	return ret;
err:
#if defined(CONFIG_UNIX)
	sock_release(ctx->ring_sock);
	ctx->ring_sock = NULL;
#endif
	return ret;
}

static int io_uring_create(unsigned entries, struct io_uring_params *p)
{
	struct user_struct *user = NULL;
	struct io_ring_ctx *ctx;
	bool account_mem;
	int ret;

	if (!entries || entries > IORING_MAX_ENTRIES)
		return -EINVAL;

	/*
	 * Use twice as many entries for the CQ ring. It's possible for the
	 * application to drive a higher depth than the size of the SQ ring,
	 * since the sqes are only used at submission time. This allows some
	 * flexibility in overcommitting.
	 */
	p->sq_entries = roundup_pow_of_two(entries);
	p->cq_entries = 2 * p->sq_entries;

	user = get_uid(current_user());
	account_mem = !capable(CAP_IPC_LOCK);

	if (account_mem) {
		ret = io_account_mem(user,
				ring_pages(p->sq_entries, p->cq_entries));
		if (ret) {
			free_uid(user);
			return ret;
		}
	}

	ctx = io_ring_ctx_alloc(p);
	if (!ctx) {
		if (account_mem)
			io_unaccount_mem(user, ring_pages(p->sq_entries,
								p->cq_entries));
		free_uid(user);
		return -ENOMEM;
	}
	ctx->compat = in_compat_syscall();
	ctx->account_mem = account_mem;
	ctx->user = user;

	ret = io_allocate_scq_urings(ctx, p);
	if (ret)
		goto err;

	ret = io_sq_offload_start(ctx, p);
	if (ret)
		goto err;

	ret = io_uring_get_fd(ctx);
	if (ret < 0)
		goto err;

	memset(&p->sq_off, 0, sizeof(p->sq_off));
	p->sq_off.head = offsetof(struct io_sq_ring, r.head);
	p->sq_off.tail = offsetof(struct io_sq_ring, r.tail);
	p->sq_off.ring_mask = offsetof(struct io_sq_ring, ring_mask);
	p->sq_off.ring_entries = offsetof(struct io_sq_ring, ring_entries);
	p->sq_off.flags = offsetof(struct io_sq_ring, flags);
	p->sq_off.dropped = offsetof(struct io_sq_ring, dropped);
	p->sq_off.array = offsetof(struct io_sq_ring, array);

	memset(&p->cq_off, 0, sizeof(p->cq_off));
	p->cq_off.head = offsetof(struct io_cq_ring, r.head);
	p->cq_off.tail = offsetof(struct io_cq_ring, r.tail);
	p->cq_off.ring_mask = offsetof(struct io_cq_ring, ring_mask);
	p->cq_off.ring_entries = offsetof(struct io_cq_ring, ring_entries);
	p->cq_off.overflow = offsetof(struct io_cq_ring, overflow);
	p->cq_off.cqes = offsetof(struct io_cq_ring, cqes);
	return ret;
err:
	io_ring_ctx_wait_and_kill(ctx);
	return ret;
}

/*
 * Sets up an aio uring context, and returns the fd. The application asks for
 * a ring size; we return the actual sq/cq ring sizes (among other things) in
 * the params structure passed in.
 */
static long io_uring_setup(u32 entries, struct io_uring_params __user *params)
{
	struct io_uring_params p;
	long ret;
	int i;

	if (copy_from_user(&p, params, sizeof(p)))
		return -EFAULT;
	for (i = 0; i < ARRAY_SIZE(p.resv); i++) {
		if (p.resv[i])
			return -EINVAL;
	}

	if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL |
			IORING_SETUP_SQ_AFF))
		return -EINVAL;

	ret = io_uring_create(entries, &p);
	if (ret < 0)
		return ret;

	if (copy_to_user(params, &p, sizeof(p)))
		return -EFAULT;

	return ret;
}

SYSCALL_DEFINE2(io_uring_setup, u32, entries,
		struct io_uring_params __user *, params)
{
	return io_uring_setup(entries, params);
}

static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
			       void __user *arg, unsigned nr_args)
	__releases(ctx->uring_lock)
	__acquires(ctx->uring_lock)
{
	int ret;

	/*
	 * We're inside the ring mutex, if the ref is already dying, then
	 * someone else killed the ctx or is already going through
	 * io_uring_register().
	 */
	if (percpu_ref_is_dying(&ctx->refs))
		return -ENXIO;

	percpu_ref_kill(&ctx->refs);

	/*
	 * Drop uring mutex before waiting for references to exit. If another
	 * thread is currently inside io_uring_enter() it might need to grab
	 * the uring_lock to make progress. If we hold it here across the drain
	 * wait, then we can deadlock. It's safe to drop the mutex here, since
	 * no new references will come in after we've killed the percpu ref.
	 */
	mutex_unlock(&ctx->uring_lock);
	wait_for_completion(&ctx->ctx_done);
	mutex_lock(&ctx->uring_lock);

	switch (opcode) {
	case IORING_REGISTER_BUFFERS:
		ret = io_sqe_buffer_register(ctx, arg, nr_args);
		break;
	case IORING_UNREGISTER_BUFFERS:
		ret = -EINVAL;
		if (arg || nr_args)
			break;
		ret = io_sqe_buffer_unregister(ctx);
		break;
	case IORING_REGISTER_FILES:
		ret = io_sqe_files_register(ctx, arg, nr_args);
		break;
	case IORING_UNREGISTER_FILES:
		ret = -EINVAL;
		if (arg || nr_args)
			break;
		ret = io_sqe_files_unregister(ctx);
		break;
	case IORING_REGISTER_EVENTFD:
		ret = -EINVAL;
		if (nr_args != 1)
			break;
		ret = io_eventfd_register(ctx, arg);
		break;
	case IORING_UNREGISTER_EVENTFD:
		ret = -EINVAL;
		if (arg || nr_args)
			break;
		ret = io_eventfd_unregister(ctx);
		break;
	default:
		ret = -EINVAL;
		break;
	}

	/* bring the ctx back to life */
	reinit_completion(&ctx->ctx_done);
	percpu_ref_reinit(&ctx->refs);
	return ret;
}

SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode,
		void __user *, arg, unsigned int, nr_args)
{
	struct io_ring_ctx *ctx;
	long ret = -EBADF;
	struct fd f;

	f = fdget(fd);
	if (!f.file)
		return -EBADF;

	ret = -EOPNOTSUPP;
	if (f.file->f_op != &io_uring_fops)
		goto out_fput;

	ctx = f.file->private_data;

	mutex_lock(&ctx->uring_lock);
	ret = __io_uring_register(ctx, opcode, arg, nr_args);
	mutex_unlock(&ctx->uring_lock);
out_fput:
	fdput(f);
	return ret;
}

static int __init io_uring_init(void)
{
	req_cachep = KMEM_CACHE(io_kiocb, SLAB_HWCACHE_ALIGN | SLAB_PANIC);
	return 0;
};
__initcall(io_uring_init);
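
/*
 * Illustrative userspace sketch, not kernel code: a minimal call to
 * io_uring_setup(2). The entries value is rounded up to a power of two and
 * the CQ ring gets twice as many entries, as io_uring_create() above
 * documents. Assumes __NR_io_uring_setup is available; error handling is
 * elided.
 */
#if 0
#include <string.h>
#include <unistd.h>
#include <sys/syscall.h>
#include <linux/io_uring.h>

static int app_create_ring(unsigned entries, struct io_uring_params *p)
{
	int ring_fd;

	memset(p, 0, sizeof(*p));	/* reserved fields must be zero */
	ring_fd = syscall(__NR_io_uring_setup, entries, p);

	/*
	 * On success, p->sq_entries and p->cq_entries hold the real ring
	 * sizes (e.g. entries == 100 yields 128 and 256), and p->sq_off /
	 * p->cq_off give the field offsets used for the subsequent mmaps.
	 */
	return ring_fd;
}
#endif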