Linux kernel mirror (for testing): git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
at v5.4-rc3 (3942 lines, 96 kB)
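
The opening comment of the listing below documents the acquire/release ordering an application must follow when it reads completions and publishes submissions. As a minimal illustration of that protocol, here is a hedged userspace sketch of reaping one CQE. The struct layout and field names (cq_ring, khead, ktail, kring_mask, cqes) are assumptions modelled on how liburing maps the rings, not definitions taken from this file; only the ordering rules (acquire-load the tail the kernel writes, then release-store the new head) come from the comment.

/*
 * Hedged illustration only: reap one CQE from a mapped completion ring.
 * The cq_ring layout below is an assumption modelled on liburing's view
 * of the mmap'ed io_rings; the acquire/release pairing follows the
 * ordering rules described in the header comment of the file below.
 */
#include <linux/io_uring.h>		/* struct io_uring_cqe */

struct cq_ring {			/* hypothetical userspace view of the CQ mmap */
	unsigned *khead;		/* CQ head: the application writes this   */
	unsigned *ktail;		/* CQ tail: the kernel writes this         */
	unsigned *kring_mask;		/* constant, cq_ring_entries - 1           */
	struct io_uring_cqe *cqes;
};

/* Returns 0 and fills *out if a completion was available, -1 if the ring is empty. */
static int reap_one_cqe(struct cq_ring *cq, struct io_uring_cqe *out)
{
	unsigned head = *cq->khead;
	/* Pairs with the kernel's smp_store_release() of cq.tail. */
	unsigned tail = __atomic_load_n(cq->ktail, __ATOMIC_ACQUIRE);

	if (head == tail)
		return -1;

	*out = cq->cqes[head & *cq->kring_mask];

	/* Order the entry load before publishing the new head to the kernel. */
	__atomic_store_n(cq->khead, head + 1, __ATOMIC_RELEASE);
	return 0;
}

This is roughly the pattern that liburing wraps behind io_uring_peek_cqe() and io_uring_cq_advance().
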
// SPDX-License-Identifier: GPL-2.0
/*
 * Shared application/kernel submission and completion ring pairs, for
 * supporting fast/efficient IO.
 *
 * A note on the read/write ordering memory barriers that are matched between
 * the application and kernel side.
 *
 * After the application reads the CQ ring tail, it must use an
 * appropriate smp_rmb() to pair with the smp_wmb() the kernel uses
 * before writing the tail (using smp_load_acquire to read the tail will
 * do). It also needs a smp_mb() before updating CQ head (ordering the
 * entry load(s) with the head store), pairing with an implicit barrier
 * through a control-dependency in io_get_cqring (smp_store_release to
 * store head will do). Failure to do so could lead to reading invalid
 * CQ entries.
 *
 * Likewise, the application must use an appropriate smp_wmb() before
 * writing the SQ tail (ordering SQ entry stores with the tail store),
 * which pairs with smp_load_acquire in io_get_sqring (smp_store_release
 * to store the tail will do). And it needs a barrier ordering the SQ
 * head load before writing new SQ entries (smp_load_acquire to read
 * head will do).
 *
 * When using the SQ poll thread (IORING_SETUP_SQPOLL), the application
 * needs to check the SQ flags for IORING_SQ_NEED_WAKEUP *after*
 * updating the SQ tail; a full memory barrier smp_mb() is needed
 * between.
 *
 * Also see the examples in the liburing library:
 *
 *	git://git.kernel.dk/liburing
 *
 * io_uring also uses READ/WRITE_ONCE() for _any_ store or load that happens
 * from data shared between the kernel and application. This is done both
 * for ordering purposes and to ensure that once a value is loaded from
 * data that the application could potentially modify, it remains stable.
 *
 * Copyright (C) 2018-2019 Jens Axboe
 * Copyright (c) 2018-2019 Christoph Hellwig
 */
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/errno.h>
#include <linux/syscalls.h>
#include <linux/compat.h>
#include <linux/refcount.h>
#include <linux/uio.h>

#include <linux/sched/signal.h>
#include <linux/fs.h>
#include <linux/file.h>
#include <linux/fdtable.h>
#include <linux/mm.h>
#include <linux/mman.h>
#include <linux/mmu_context.h>
#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/workqueue.h>
#include <linux/kthread.h>
#include <linux/blkdev.h>
#include <linux/bvec.h>
#include <linux/net.h>
#include <net/sock.h>
#include <net/af_unix.h>
#include <net/scm.h>
#include <linux/anon_inodes.h>
#include <linux/sched/mm.h>
#include <linux/uaccess.h>
#include <linux/nospec.h>
#include <linux/sizes.h>
#include <linux/hugetlb.h>

#include <uapi/linux/io_uring.h>

#include "internal.h"

#define IORING_MAX_ENTRIES	32768
#define IORING_MAX_FIXED_FILES	1024

struct io_uring {
	u32 head ____cacheline_aligned_in_smp;
	u32 tail ____cacheline_aligned_in_smp;
};

/*
 * This data is shared with the application through the mmap at offsets
 * IORING_OFF_SQ_RING and IORING_OFF_CQ_RING.
 *
 * The offsets to the member fields are published through struct
 * io_sqring_offsets when calling io_uring_setup.
 */
struct io_rings {
	/*
	 * Head and tail offsets into the ring; the offsets need to be
	 * masked to get valid indices.
	 *
	 * The kernel controls head of the sq ring and the tail of the cq ring,
	 * and the application controls tail of the sq ring and the head of the
	 * cq ring.
	 */
	struct io_uring		sq, cq;
	/*
	 * Bitmasks to apply to head and tail offsets (constant, equals
	 * ring_entries - 1)
	 */
	u32			sq_ring_mask, cq_ring_mask;
	/* Ring sizes (constant, power of 2) */
	u32			sq_ring_entries, cq_ring_entries;
	/*
	 * Number of invalid entries dropped by the kernel due to
	 * invalid index stored in array
	 *
	 * Written by the kernel, shouldn't be modified by the
	 * application (i.e. get number of "new events" by comparing to
	 * cached value).
	 *
	 * After a new SQ head value was read by the application this
	 * counter includes all submissions that were dropped reaching
	 * the new SQ head (and possibly more).
	 */
	u32			sq_dropped;
	/*
	 * Runtime flags
	 *
	 * Written by the kernel, shouldn't be modified by the
	 * application.
	 *
	 * The application needs a full memory barrier before checking
	 * for IORING_SQ_NEED_WAKEUP after updating the sq tail.
	 */
	u32			sq_flags;
	/*
	 * Number of completion events lost because the queue was full;
	 * this should be avoided by the application by making sure
	 * there are not more requests pending than there is space in
	 * the completion queue.
	 *
	 * Written by the kernel, shouldn't be modified by the
	 * application (i.e. get number of "new events" by comparing to
	 * cached value).
	 *
	 * As completion events come in out of order this counter is not
	 * ordered with any other data.
	 */
	u32			cq_overflow;
	/*
	 * Ring buffer of completion events.
	 *
	 * The kernel writes completion events fresh every time they are
	 * produced, so the application is allowed to modify pending
	 * entries.
	 */
	struct io_uring_cqe	cqes[] ____cacheline_aligned_in_smp;
};

struct io_mapped_ubuf {
	u64		ubuf;
	size_t		len;
	struct bio_vec	*bvec;
	unsigned int	nr_bvecs;
};

struct async_list {
	spinlock_t		lock;
	atomic_t		cnt;
	struct list_head	list;

	struct file		*file;
	off_t			io_start;
	size_t			io_len;
};

struct io_ring_ctx {
	struct {
		struct percpu_ref	refs;
	} ____cacheline_aligned_in_smp;

	struct {
		unsigned int		flags;
		bool			compat;
		bool			account_mem;

		/*
		 * Ring buffer of indices into array of io_uring_sqe, which is
		 * mmapped by the application using the IORING_OFF_SQES offset.
		 *
		 * This indirection could e.g. be used to assign fixed
		 * io_uring_sqe entries to operations and only submit them to
		 * the queue when needed.
		 *
		 * The kernel modifies neither the indices array nor the entries
		 * array.
194 */ 195 u32 *sq_array; 196 unsigned cached_sq_head; 197 unsigned sq_entries; 198 unsigned sq_mask; 199 unsigned sq_thread_idle; 200 struct io_uring_sqe *sq_sqes; 201 202 struct list_head defer_list; 203 struct list_head timeout_list; 204 } ____cacheline_aligned_in_smp; 205 206 /* IO offload */ 207 struct workqueue_struct *sqo_wq[2]; 208 struct task_struct *sqo_thread; /* if using sq thread polling */ 209 struct mm_struct *sqo_mm; 210 wait_queue_head_t sqo_wait; 211 struct completion sqo_thread_started; 212 213 struct { 214 unsigned cached_cq_tail; 215 unsigned cq_entries; 216 unsigned cq_mask; 217 struct wait_queue_head cq_wait; 218 struct fasync_struct *cq_fasync; 219 struct eventfd_ctx *cq_ev_fd; 220 atomic_t cq_timeouts; 221 } ____cacheline_aligned_in_smp; 222 223 struct io_rings *rings; 224 225 /* 226 * If used, fixed file set. Writers must ensure that ->refs is dead, 227 * readers must ensure that ->refs is alive as long as the file* is 228 * used. Only updated through io_uring_register(2). 229 */ 230 struct file **user_files; 231 unsigned nr_user_files; 232 233 /* if used, fixed mapped user buffers */ 234 unsigned nr_user_bufs; 235 struct io_mapped_ubuf *user_bufs; 236 237 struct user_struct *user; 238 239 struct completion ctx_done; 240 241 struct { 242 struct mutex uring_lock; 243 wait_queue_head_t wait; 244 } ____cacheline_aligned_in_smp; 245 246 struct { 247 spinlock_t completion_lock; 248 bool poll_multi_file; 249 /* 250 * ->poll_list is protected by the ctx->uring_lock for 251 * io_uring instances that don't use IORING_SETUP_SQPOLL. 252 * For SQPOLL, only the single threaded io_sq_thread() will 253 * manipulate the list, hence no extra locking is needed there. 254 */ 255 struct list_head poll_list; 256 struct list_head cancel_list; 257 } ____cacheline_aligned_in_smp; 258 259 struct async_list pending_async[2]; 260 261#if defined(CONFIG_UNIX) 262 struct socket *ring_sock; 263#endif 264}; 265 266struct sqe_submit { 267 const struct io_uring_sqe *sqe; 268 unsigned short index; 269 u32 sequence; 270 bool has_user; 271 bool needs_lock; 272 bool needs_fixed_file; 273}; 274 275/* 276 * First field must be the file pointer in all the 277 * iocb unions! See also 'struct kiocb' in <linux/fs.h> 278 */ 279struct io_poll_iocb { 280 struct file *file; 281 struct wait_queue_head *head; 282 __poll_t events; 283 bool done; 284 bool canceled; 285 struct wait_queue_entry wait; 286}; 287 288struct io_timeout { 289 struct file *file; 290 struct hrtimer timer; 291}; 292 293/* 294 * NOTE! Each of the iocb union members has the file pointer 295 * as the first entry in their struct definition. So you can 296 * access the file pointer through any of the sub-structs, 297 * or directly as just 'ki_filp' in this struct. 
298 */ 299struct io_kiocb { 300 union { 301 struct file *file; 302 struct kiocb rw; 303 struct io_poll_iocb poll; 304 struct io_timeout timeout; 305 }; 306 307 struct sqe_submit submit; 308 309 struct io_ring_ctx *ctx; 310 struct list_head list; 311 struct list_head link_list; 312 unsigned int flags; 313 refcount_t refs; 314#define REQ_F_NOWAIT 1 /* must not punt to workers */ 315#define REQ_F_IOPOLL_COMPLETED 2 /* polled IO has completed */ 316#define REQ_F_FIXED_FILE 4 /* ctx owns file */ 317#define REQ_F_SEQ_PREV 8 /* sequential with previous */ 318#define REQ_F_IO_DRAIN 16 /* drain existing IO first */ 319#define REQ_F_IO_DRAINED 32 /* drain done */ 320#define REQ_F_LINK 64 /* linked sqes */ 321#define REQ_F_LINK_DONE 128 /* linked sqes done */ 322#define REQ_F_FAIL_LINK 256 /* fail rest of links */ 323#define REQ_F_SHADOW_DRAIN 512 /* link-drain shadow req */ 324#define REQ_F_TIMEOUT 1024 /* timeout request */ 325 u64 user_data; 326 u32 result; 327 u32 sequence; 328 329 struct work_struct work; 330}; 331 332#define IO_PLUG_THRESHOLD 2 333#define IO_IOPOLL_BATCH 8 334 335struct io_submit_state { 336 struct blk_plug plug; 337 338 /* 339 * io_kiocb alloc cache 340 */ 341 void *reqs[IO_IOPOLL_BATCH]; 342 unsigned int free_reqs; 343 unsigned int cur_req; 344 345 /* 346 * File reference cache 347 */ 348 struct file *file; 349 unsigned int fd; 350 unsigned int has_refs; 351 unsigned int used_refs; 352 unsigned int ios_left; 353}; 354 355static void io_sq_wq_submit_work(struct work_struct *work); 356static void io_cqring_fill_event(struct io_ring_ctx *ctx, u64 ki_user_data, 357 long res); 358static void __io_free_req(struct io_kiocb *req); 359 360static struct kmem_cache *req_cachep; 361 362static const struct file_operations io_uring_fops; 363 364struct sock *io_uring_get_socket(struct file *file) 365{ 366#if defined(CONFIG_UNIX) 367 if (file->f_op == &io_uring_fops) { 368 struct io_ring_ctx *ctx = file->private_data; 369 370 return ctx->ring_sock->sk; 371 } 372#endif 373 return NULL; 374} 375EXPORT_SYMBOL(io_uring_get_socket); 376 377static void io_ring_ctx_ref_free(struct percpu_ref *ref) 378{ 379 struct io_ring_ctx *ctx = container_of(ref, struct io_ring_ctx, refs); 380 381 complete(&ctx->ctx_done); 382} 383 384static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p) 385{ 386 struct io_ring_ctx *ctx; 387 int i; 388 389 ctx = kzalloc(sizeof(*ctx), GFP_KERNEL); 390 if (!ctx) 391 return NULL; 392 393 if (percpu_ref_init(&ctx->refs, io_ring_ctx_ref_free, 394 PERCPU_REF_ALLOW_REINIT, GFP_KERNEL)) { 395 kfree(ctx); 396 return NULL; 397 } 398 399 ctx->flags = p->flags; 400 init_waitqueue_head(&ctx->cq_wait); 401 init_completion(&ctx->ctx_done); 402 init_completion(&ctx->sqo_thread_started); 403 mutex_init(&ctx->uring_lock); 404 init_waitqueue_head(&ctx->wait); 405 for (i = 0; i < ARRAY_SIZE(ctx->pending_async); i++) { 406 spin_lock_init(&ctx->pending_async[i].lock); 407 INIT_LIST_HEAD(&ctx->pending_async[i].list); 408 atomic_set(&ctx->pending_async[i].cnt, 0); 409 } 410 spin_lock_init(&ctx->completion_lock); 411 INIT_LIST_HEAD(&ctx->poll_list); 412 INIT_LIST_HEAD(&ctx->cancel_list); 413 INIT_LIST_HEAD(&ctx->defer_list); 414 INIT_LIST_HEAD(&ctx->timeout_list); 415 return ctx; 416} 417 418static inline bool __io_sequence_defer(struct io_ring_ctx *ctx, 419 struct io_kiocb *req) 420{ 421 return req->sequence != ctx->cached_cq_tail + ctx->rings->sq_dropped; 422} 423 424static inline bool io_sequence_defer(struct io_ring_ctx *ctx, 425 struct io_kiocb *req) 426{ 427 if ((req->flags & 
(REQ_F_IO_DRAIN|REQ_F_IO_DRAINED)) != REQ_F_IO_DRAIN) 428 return false; 429 430 return __io_sequence_defer(ctx, req); 431} 432 433static struct io_kiocb *io_get_deferred_req(struct io_ring_ctx *ctx) 434{ 435 struct io_kiocb *req; 436 437 req = list_first_entry_or_null(&ctx->defer_list, struct io_kiocb, list); 438 if (req && !io_sequence_defer(ctx, req)) { 439 list_del_init(&req->list); 440 return req; 441 } 442 443 return NULL; 444} 445 446static struct io_kiocb *io_get_timeout_req(struct io_ring_ctx *ctx) 447{ 448 struct io_kiocb *req; 449 450 req = list_first_entry_or_null(&ctx->timeout_list, struct io_kiocb, list); 451 if (req && !__io_sequence_defer(ctx, req)) { 452 list_del_init(&req->list); 453 return req; 454 } 455 456 return NULL; 457} 458 459static void __io_commit_cqring(struct io_ring_ctx *ctx) 460{ 461 struct io_rings *rings = ctx->rings; 462 463 if (ctx->cached_cq_tail != READ_ONCE(rings->cq.tail)) { 464 /* order cqe stores with ring update */ 465 smp_store_release(&rings->cq.tail, ctx->cached_cq_tail); 466 467 if (wq_has_sleeper(&ctx->cq_wait)) { 468 wake_up_interruptible(&ctx->cq_wait); 469 kill_fasync(&ctx->cq_fasync, SIGIO, POLL_IN); 470 } 471 } 472} 473 474static inline void io_queue_async_work(struct io_ring_ctx *ctx, 475 struct io_kiocb *req) 476{ 477 int rw = 0; 478 479 if (req->submit.sqe) { 480 switch (req->submit.sqe->opcode) { 481 case IORING_OP_WRITEV: 482 case IORING_OP_WRITE_FIXED: 483 rw = !(req->rw.ki_flags & IOCB_DIRECT); 484 break; 485 } 486 } 487 488 queue_work(ctx->sqo_wq[rw], &req->work); 489} 490 491static void io_kill_timeout(struct io_kiocb *req) 492{ 493 int ret; 494 495 ret = hrtimer_try_to_cancel(&req->timeout.timer); 496 if (ret != -1) { 497 atomic_inc(&req->ctx->cq_timeouts); 498 list_del(&req->list); 499 io_cqring_fill_event(req->ctx, req->user_data, 0); 500 __io_free_req(req); 501 } 502} 503 504static void io_kill_timeouts(struct io_ring_ctx *ctx) 505{ 506 struct io_kiocb *req, *tmp; 507 508 spin_lock_irq(&ctx->completion_lock); 509 list_for_each_entry_safe(req, tmp, &ctx->timeout_list, list) 510 io_kill_timeout(req); 511 spin_unlock_irq(&ctx->completion_lock); 512} 513 514static void io_commit_cqring(struct io_ring_ctx *ctx) 515{ 516 struct io_kiocb *req; 517 518 while ((req = io_get_timeout_req(ctx)) != NULL) 519 io_kill_timeout(req); 520 521 __io_commit_cqring(ctx); 522 523 while ((req = io_get_deferred_req(ctx)) != NULL) { 524 if (req->flags & REQ_F_SHADOW_DRAIN) { 525 /* Just for drain, free it. */ 526 __io_free_req(req); 527 continue; 528 } 529 req->flags |= REQ_F_IO_DRAINED; 530 io_queue_async_work(ctx, req); 531 } 532} 533 534static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx) 535{ 536 struct io_rings *rings = ctx->rings; 537 unsigned tail; 538 539 tail = ctx->cached_cq_tail; 540 /* 541 * writes to the cq entry need to come after reading head; the 542 * control dependency is enough as we're using WRITE_ONCE to 543 * fill the cq entry 544 */ 545 if (tail - READ_ONCE(rings->cq.head) == rings->cq_ring_entries) 546 return NULL; 547 548 ctx->cached_cq_tail++; 549 return &rings->cqes[tail & ctx->cq_mask]; 550} 551 552static void io_cqring_fill_event(struct io_ring_ctx *ctx, u64 ki_user_data, 553 long res) 554{ 555 struct io_uring_cqe *cqe; 556 557 /* 558 * If we can't get a cq entry, userspace overflowed the 559 * submission (by quite a lot). Increment the overflow count in 560 * the ring. 
561 */ 562 cqe = io_get_cqring(ctx); 563 if (cqe) { 564 WRITE_ONCE(cqe->user_data, ki_user_data); 565 WRITE_ONCE(cqe->res, res); 566 WRITE_ONCE(cqe->flags, 0); 567 } else { 568 unsigned overflow = READ_ONCE(ctx->rings->cq_overflow); 569 570 WRITE_ONCE(ctx->rings->cq_overflow, overflow + 1); 571 } 572} 573 574static void io_cqring_ev_posted(struct io_ring_ctx *ctx) 575{ 576 if (waitqueue_active(&ctx->wait)) 577 wake_up(&ctx->wait); 578 if (waitqueue_active(&ctx->sqo_wait)) 579 wake_up(&ctx->sqo_wait); 580 if (ctx->cq_ev_fd) 581 eventfd_signal(ctx->cq_ev_fd, 1); 582} 583 584static void io_cqring_add_event(struct io_ring_ctx *ctx, u64 user_data, 585 long res) 586{ 587 unsigned long flags; 588 589 spin_lock_irqsave(&ctx->completion_lock, flags); 590 io_cqring_fill_event(ctx, user_data, res); 591 io_commit_cqring(ctx); 592 spin_unlock_irqrestore(&ctx->completion_lock, flags); 593 594 io_cqring_ev_posted(ctx); 595} 596 597static struct io_kiocb *io_get_req(struct io_ring_ctx *ctx, 598 struct io_submit_state *state) 599{ 600 gfp_t gfp = GFP_KERNEL | __GFP_NOWARN; 601 struct io_kiocb *req; 602 603 if (!percpu_ref_tryget(&ctx->refs)) 604 return NULL; 605 606 if (!state) { 607 req = kmem_cache_alloc(req_cachep, gfp); 608 if (unlikely(!req)) 609 goto out; 610 } else if (!state->free_reqs) { 611 size_t sz; 612 int ret; 613 614 sz = min_t(size_t, state->ios_left, ARRAY_SIZE(state->reqs)); 615 ret = kmem_cache_alloc_bulk(req_cachep, gfp, sz, state->reqs); 616 617 /* 618 * Bulk alloc is all-or-nothing. If we fail to get a batch, 619 * retry single alloc to be on the safe side. 620 */ 621 if (unlikely(ret <= 0)) { 622 state->reqs[0] = kmem_cache_alloc(req_cachep, gfp); 623 if (!state->reqs[0]) 624 goto out; 625 ret = 1; 626 } 627 state->free_reqs = ret - 1; 628 state->cur_req = 1; 629 req = state->reqs[0]; 630 } else { 631 req = state->reqs[state->cur_req]; 632 state->free_reqs--; 633 state->cur_req++; 634 } 635 636 req->file = NULL; 637 req->ctx = ctx; 638 req->flags = 0; 639 /* one is dropped after submission, the other at completion */ 640 refcount_set(&req->refs, 2); 641 req->result = 0; 642 return req; 643out: 644 percpu_ref_put(&ctx->refs); 645 return NULL; 646} 647 648static void io_free_req_many(struct io_ring_ctx *ctx, void **reqs, int *nr) 649{ 650 if (*nr) { 651 kmem_cache_free_bulk(req_cachep, *nr, reqs); 652 percpu_ref_put_many(&ctx->refs, *nr); 653 *nr = 0; 654 } 655} 656 657static void __io_free_req(struct io_kiocb *req) 658{ 659 if (req->file && !(req->flags & REQ_F_FIXED_FILE)) 660 fput(req->file); 661 percpu_ref_put(&req->ctx->refs); 662 kmem_cache_free(req_cachep, req); 663} 664 665static void io_req_link_next(struct io_kiocb *req) 666{ 667 struct io_kiocb *nxt; 668 669 /* 670 * The list should never be empty when we are called here. But could 671 * potentially happen if the chain is messed up, check to be on the 672 * safe side. 
673 */ 674 nxt = list_first_entry_or_null(&req->link_list, struct io_kiocb, list); 675 if (nxt) { 676 list_del(&nxt->list); 677 if (!list_empty(&req->link_list)) { 678 INIT_LIST_HEAD(&nxt->link_list); 679 list_splice(&req->link_list, &nxt->link_list); 680 nxt->flags |= REQ_F_LINK; 681 } 682 683 nxt->flags |= REQ_F_LINK_DONE; 684 INIT_WORK(&nxt->work, io_sq_wq_submit_work); 685 io_queue_async_work(req->ctx, nxt); 686 } 687} 688 689/* 690 * Called if REQ_F_LINK is set, and we fail the head request 691 */ 692static void io_fail_links(struct io_kiocb *req) 693{ 694 struct io_kiocb *link; 695 696 while (!list_empty(&req->link_list)) { 697 link = list_first_entry(&req->link_list, struct io_kiocb, list); 698 list_del(&link->list); 699 700 io_cqring_add_event(req->ctx, link->user_data, -ECANCELED); 701 __io_free_req(link); 702 } 703} 704 705static void io_free_req(struct io_kiocb *req) 706{ 707 /* 708 * If LINK is set, we have dependent requests in this chain. If we 709 * didn't fail this request, queue the first one up, moving any other 710 * dependencies to the next request. In case of failure, fail the rest 711 * of the chain. 712 */ 713 if (req->flags & REQ_F_LINK) { 714 if (req->flags & REQ_F_FAIL_LINK) 715 io_fail_links(req); 716 else 717 io_req_link_next(req); 718 } 719 720 __io_free_req(req); 721} 722 723static void io_put_req(struct io_kiocb *req) 724{ 725 if (refcount_dec_and_test(&req->refs)) 726 io_free_req(req); 727} 728 729static unsigned io_cqring_events(struct io_rings *rings) 730{ 731 /* See comment at the top of this file */ 732 smp_rmb(); 733 return READ_ONCE(rings->cq.tail) - READ_ONCE(rings->cq.head); 734} 735 736/* 737 * Find and free completed poll iocbs 738 */ 739static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events, 740 struct list_head *done) 741{ 742 void *reqs[IO_IOPOLL_BATCH]; 743 struct io_kiocb *req; 744 int to_free; 745 746 to_free = 0; 747 while (!list_empty(done)) { 748 req = list_first_entry(done, struct io_kiocb, list); 749 list_del(&req->list); 750 751 io_cqring_fill_event(ctx, req->user_data, req->result); 752 (*nr_events)++; 753 754 if (refcount_dec_and_test(&req->refs)) { 755 /* If we're not using fixed files, we have to pair the 756 * completion part with the file put. Use regular 757 * completions for those, only batch free for fixed 758 * file and non-linked commands. 759 */ 760 if ((req->flags & (REQ_F_FIXED_FILE|REQ_F_LINK)) == 761 REQ_F_FIXED_FILE) { 762 reqs[to_free++] = req; 763 if (to_free == ARRAY_SIZE(reqs)) 764 io_free_req_many(ctx, reqs, &to_free); 765 } else { 766 io_free_req(req); 767 } 768 } 769 } 770 771 io_commit_cqring(ctx); 772 io_free_req_many(ctx, reqs, &to_free); 773} 774 775static int io_do_iopoll(struct io_ring_ctx *ctx, unsigned int *nr_events, 776 long min) 777{ 778 struct io_kiocb *req, *tmp; 779 LIST_HEAD(done); 780 bool spin; 781 int ret; 782 783 /* 784 * Only spin for completions if we don't have multiple devices hanging 785 * off our complete list, and we're under the requested amount. 786 */ 787 spin = !ctx->poll_multi_file && *nr_events < min; 788 789 ret = 0; 790 list_for_each_entry_safe(req, tmp, &ctx->poll_list, list) { 791 struct kiocb *kiocb = &req->rw; 792 793 /* 794 * Move completed entries to our local list. If we find a 795 * request that requires polling, break out and complete 796 * the done list first, if we have entries there. 
		 */
		if (req->flags & REQ_F_IOPOLL_COMPLETED) {
			list_move_tail(&req->list, &done);
			continue;
		}
		if (!list_empty(&done))
			break;

		ret = kiocb->ki_filp->f_op->iopoll(kiocb, spin);
		if (ret < 0)
			break;

		if (ret && spin)
			spin = false;
		ret = 0;
	}

	if (!list_empty(&done))
		io_iopoll_complete(ctx, nr_events, &done);

	return ret;
}

/*
 * Poll for a minimum of 'min' events. Note that if min == 0 we consider that a
 * non-spinning poll check - we'll still enter the driver poll loop, but only
 * as a non-spinning completion check.
 */
static int io_iopoll_getevents(struct io_ring_ctx *ctx, unsigned int *nr_events,
				long min)
{
	while (!list_empty(&ctx->poll_list) && !need_resched()) {
		int ret;

		ret = io_do_iopoll(ctx, nr_events, min);
		if (ret < 0)
			return ret;
		if (!min || *nr_events >= min)
			return 0;
	}

	return 1;
}

/*
 * We can't just wait for polled events to come to us, we have to actively
 * find and complete them.
 */
static void io_iopoll_reap_events(struct io_ring_ctx *ctx)
{
	if (!(ctx->flags & IORING_SETUP_IOPOLL))
		return;

	mutex_lock(&ctx->uring_lock);
	while (!list_empty(&ctx->poll_list)) {
		unsigned int nr_events = 0;

		io_iopoll_getevents(ctx, &nr_events, 1);

		/*
		 * Ensure we allow local-to-the-cpu processing to take place,
		 * in this case we need to ensure that we reap all events.
		 */
		cond_resched();
	}
	mutex_unlock(&ctx->uring_lock);
}

static int io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events,
			   long min)
{
	int iters, ret = 0;

	/*
	 * We disallow the app entering submit/complete with polling, but we
	 * still need to lock the ring to prevent racing with polled issue
	 * that got punted to a workqueue.
	 */
	mutex_lock(&ctx->uring_lock);

	iters = 0;
	do {
		int tmin = 0;

		/*
		 * Don't enter poll loop if we already have events pending.
		 * If we do, we can potentially be spinning for commands that
		 * already triggered a CQE (eg in error).
		 */
		if (io_cqring_events(ctx->rings))
			break;

		/*
		 * If a submit got punted to a workqueue, we can have the
		 * application entering polling for a command before it gets
		 * issued. That app will hold the uring_lock for the duration
		 * of the poll right here, so we need to take a breather every
		 * now and then to ensure that the issue has a chance to add
		 * the poll to the issued list. Otherwise we can spin here
		 * forever, while the workqueue is stuck trying to acquire the
		 * very same mutex.
		 */
		if (!(++iters & 7)) {
			mutex_unlock(&ctx->uring_lock);
			mutex_lock(&ctx->uring_lock);
		}

		if (*nr_events < min)
			tmin = min - *nr_events;

		ret = io_iopoll_getevents(ctx, nr_events, tmin);
		if (ret <= 0)
			break;
		ret = 0;
	} while (min && !*nr_events && !need_resched());

	mutex_unlock(&ctx->uring_lock);
	return ret;
}

static void kiocb_end_write(struct kiocb *kiocb)
{
	if (kiocb->ki_flags & IOCB_WRITE) {
		struct inode *inode = file_inode(kiocb->ki_filp);

		/*
		 * Tell lockdep we inherited freeze protection from submission
		 * thread.
925 */ 926 if (S_ISREG(inode->i_mode)) 927 __sb_writers_acquired(inode->i_sb, SB_FREEZE_WRITE); 928 file_end_write(kiocb->ki_filp); 929 } 930} 931 932static void io_complete_rw(struct kiocb *kiocb, long res, long res2) 933{ 934 struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw); 935 936 kiocb_end_write(kiocb); 937 938 if ((req->flags & REQ_F_LINK) && res != req->result) 939 req->flags |= REQ_F_FAIL_LINK; 940 io_cqring_add_event(req->ctx, req->user_data, res); 941 io_put_req(req); 942} 943 944static void io_complete_rw_iopoll(struct kiocb *kiocb, long res, long res2) 945{ 946 struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw); 947 948 kiocb_end_write(kiocb); 949 950 if ((req->flags & REQ_F_LINK) && res != req->result) 951 req->flags |= REQ_F_FAIL_LINK; 952 req->result = res; 953 if (res != -EAGAIN) 954 req->flags |= REQ_F_IOPOLL_COMPLETED; 955} 956 957/* 958 * After the iocb has been issued, it's safe to be found on the poll list. 959 * Adding the kiocb to the list AFTER submission ensures that we don't 960 * find it from a io_iopoll_getevents() thread before the issuer is done 961 * accessing the kiocb cookie. 962 */ 963static void io_iopoll_req_issued(struct io_kiocb *req) 964{ 965 struct io_ring_ctx *ctx = req->ctx; 966 967 /* 968 * Track whether we have multiple files in our lists. This will impact 969 * how we do polling eventually, not spinning if we're on potentially 970 * different devices. 971 */ 972 if (list_empty(&ctx->poll_list)) { 973 ctx->poll_multi_file = false; 974 } else if (!ctx->poll_multi_file) { 975 struct io_kiocb *list_req; 976 977 list_req = list_first_entry(&ctx->poll_list, struct io_kiocb, 978 list); 979 if (list_req->rw.ki_filp != req->rw.ki_filp) 980 ctx->poll_multi_file = true; 981 } 982 983 /* 984 * For fast devices, IO may have already completed. If it has, add 985 * it to the front so we find it first. 986 */ 987 if (req->flags & REQ_F_IOPOLL_COMPLETED) 988 list_add(&req->list, &ctx->poll_list); 989 else 990 list_add_tail(&req->list, &ctx->poll_list); 991} 992 993static void io_file_put(struct io_submit_state *state) 994{ 995 if (state->file) { 996 int diff = state->has_refs - state->used_refs; 997 998 if (diff) 999 fput_many(state->file, diff); 1000 state->file = NULL; 1001 } 1002} 1003 1004/* 1005 * Get as many references to a file as we have IOs left in this submission, 1006 * assuming most submissions are for one file, or at least that each file 1007 * has more than one submission. 1008 */ 1009static struct file *io_file_get(struct io_submit_state *state, int fd) 1010{ 1011 if (!state) 1012 return fget(fd); 1013 1014 if (state->file) { 1015 if (state->fd == fd) { 1016 state->used_refs++; 1017 state->ios_left--; 1018 return state->file; 1019 } 1020 io_file_put(state); 1021 } 1022 state->file = fget_many(fd, state->ios_left); 1023 if (!state->file) 1024 return NULL; 1025 1026 state->fd = fd; 1027 state->has_refs = state->ios_left; 1028 state->used_refs = 1; 1029 state->ios_left--; 1030 return state->file; 1031} 1032 1033/* 1034 * If we tracked the file through the SCM inflight mechanism, we could support 1035 * any file. For now, just ensure that anything potentially problematic is done 1036 * inline. 
1037 */ 1038static bool io_file_supports_async(struct file *file) 1039{ 1040 umode_t mode = file_inode(file)->i_mode; 1041 1042 if (S_ISBLK(mode) || S_ISCHR(mode)) 1043 return true; 1044 if (S_ISREG(mode) && file->f_op != &io_uring_fops) 1045 return true; 1046 1047 return false; 1048} 1049 1050static int io_prep_rw(struct io_kiocb *req, const struct sqe_submit *s, 1051 bool force_nonblock) 1052{ 1053 const struct io_uring_sqe *sqe = s->sqe; 1054 struct io_ring_ctx *ctx = req->ctx; 1055 struct kiocb *kiocb = &req->rw; 1056 unsigned ioprio; 1057 int ret; 1058 1059 if (!req->file) 1060 return -EBADF; 1061 1062 if (force_nonblock && !io_file_supports_async(req->file)) 1063 force_nonblock = false; 1064 1065 kiocb->ki_pos = READ_ONCE(sqe->off); 1066 kiocb->ki_flags = iocb_flags(kiocb->ki_filp); 1067 kiocb->ki_hint = ki_hint_validate(file_write_hint(kiocb->ki_filp)); 1068 1069 ioprio = READ_ONCE(sqe->ioprio); 1070 if (ioprio) { 1071 ret = ioprio_check_cap(ioprio); 1072 if (ret) 1073 return ret; 1074 1075 kiocb->ki_ioprio = ioprio; 1076 } else 1077 kiocb->ki_ioprio = get_current_ioprio(); 1078 1079 ret = kiocb_set_rw_flags(kiocb, READ_ONCE(sqe->rw_flags)); 1080 if (unlikely(ret)) 1081 return ret; 1082 1083 /* don't allow async punt if RWF_NOWAIT was requested */ 1084 if (kiocb->ki_flags & IOCB_NOWAIT) 1085 req->flags |= REQ_F_NOWAIT; 1086 1087 if (force_nonblock) 1088 kiocb->ki_flags |= IOCB_NOWAIT; 1089 1090 if (ctx->flags & IORING_SETUP_IOPOLL) { 1091 if (!(kiocb->ki_flags & IOCB_DIRECT) || 1092 !kiocb->ki_filp->f_op->iopoll) 1093 return -EOPNOTSUPP; 1094 1095 kiocb->ki_flags |= IOCB_HIPRI; 1096 kiocb->ki_complete = io_complete_rw_iopoll; 1097 } else { 1098 if (kiocb->ki_flags & IOCB_HIPRI) 1099 return -EINVAL; 1100 kiocb->ki_complete = io_complete_rw; 1101 } 1102 return 0; 1103} 1104 1105static inline void io_rw_done(struct kiocb *kiocb, ssize_t ret) 1106{ 1107 switch (ret) { 1108 case -EIOCBQUEUED: 1109 break; 1110 case -ERESTARTSYS: 1111 case -ERESTARTNOINTR: 1112 case -ERESTARTNOHAND: 1113 case -ERESTART_RESTARTBLOCK: 1114 /* 1115 * We can't just restart the syscall, since previously 1116 * submitted sqes may already be in progress. Just fail this 1117 * IO with EINTR. 1118 */ 1119 ret = -EINTR; 1120 /* fall through */ 1121 default: 1122 kiocb->ki_complete(kiocb, ret, 0); 1123 } 1124} 1125 1126static int io_import_fixed(struct io_ring_ctx *ctx, int rw, 1127 const struct io_uring_sqe *sqe, 1128 struct iov_iter *iter) 1129{ 1130 size_t len = READ_ONCE(sqe->len); 1131 struct io_mapped_ubuf *imu; 1132 unsigned index, buf_index; 1133 size_t offset; 1134 u64 buf_addr; 1135 1136 /* attempt to use fixed buffers without having provided iovecs */ 1137 if (unlikely(!ctx->user_bufs)) 1138 return -EFAULT; 1139 1140 buf_index = READ_ONCE(sqe->buf_index); 1141 if (unlikely(buf_index >= ctx->nr_user_bufs)) 1142 return -EFAULT; 1143 1144 index = array_index_nospec(buf_index, ctx->nr_user_bufs); 1145 imu = &ctx->user_bufs[index]; 1146 buf_addr = READ_ONCE(sqe->addr); 1147 1148 /* overflow */ 1149 if (buf_addr + len < buf_addr) 1150 return -EFAULT; 1151 /* not inside the mapped region */ 1152 if (buf_addr < imu->ubuf || buf_addr + len > imu->ubuf + imu->len) 1153 return -EFAULT; 1154 1155 /* 1156 * May not be a start of buffer, set size appropriately 1157 * and advance us to the beginning. 
1158 */ 1159 offset = buf_addr - imu->ubuf; 1160 iov_iter_bvec(iter, rw, imu->bvec, imu->nr_bvecs, offset + len); 1161 1162 if (offset) { 1163 /* 1164 * Don't use iov_iter_advance() here, as it's really slow for 1165 * using the latter parts of a big fixed buffer - it iterates 1166 * over each segment manually. We can cheat a bit here, because 1167 * we know that: 1168 * 1169 * 1) it's a BVEC iter, we set it up 1170 * 2) all bvecs are PAGE_SIZE in size, except potentially the 1171 * first and last bvec 1172 * 1173 * So just find our index, and adjust the iterator afterwards. 1174 * If the offset is within the first bvec (or the whole first 1175 * bvec, just use iov_iter_advance(). This makes it easier 1176 * since we can just skip the first segment, which may not 1177 * be PAGE_SIZE aligned. 1178 */ 1179 const struct bio_vec *bvec = imu->bvec; 1180 1181 if (offset <= bvec->bv_len) { 1182 iov_iter_advance(iter, offset); 1183 } else { 1184 unsigned long seg_skip; 1185 1186 /* skip first vec */ 1187 offset -= bvec->bv_len; 1188 seg_skip = 1 + (offset >> PAGE_SHIFT); 1189 1190 iter->bvec = bvec + seg_skip; 1191 iter->nr_segs -= seg_skip; 1192 iter->count -= bvec->bv_len + offset; 1193 iter->iov_offset = offset & ~PAGE_MASK; 1194 } 1195 } 1196 1197 return 0; 1198} 1199 1200static ssize_t io_import_iovec(struct io_ring_ctx *ctx, int rw, 1201 const struct sqe_submit *s, struct iovec **iovec, 1202 struct iov_iter *iter) 1203{ 1204 const struct io_uring_sqe *sqe = s->sqe; 1205 void __user *buf = u64_to_user_ptr(READ_ONCE(sqe->addr)); 1206 size_t sqe_len = READ_ONCE(sqe->len); 1207 u8 opcode; 1208 1209 /* 1210 * We're reading ->opcode for the second time, but the first read 1211 * doesn't care whether it's _FIXED or not, so it doesn't matter 1212 * whether ->opcode changes concurrently. The first read does care 1213 * about whether it is a READ or a WRITE, so we don't trust this read 1214 * for that purpose and instead let the caller pass in the read/write 1215 * flag. 1216 */ 1217 opcode = READ_ONCE(sqe->opcode); 1218 if (opcode == IORING_OP_READ_FIXED || 1219 opcode == IORING_OP_WRITE_FIXED) { 1220 ssize_t ret = io_import_fixed(ctx, rw, sqe, iter); 1221 *iovec = NULL; 1222 return ret; 1223 } 1224 1225 if (!s->has_user) 1226 return -EFAULT; 1227 1228#ifdef CONFIG_COMPAT 1229 if (ctx->compat) 1230 return compat_import_iovec(rw, buf, sqe_len, UIO_FASTIOV, 1231 iovec, iter); 1232#endif 1233 1234 return import_iovec(rw, buf, sqe_len, UIO_FASTIOV, iovec, iter); 1235} 1236 1237static inline bool io_should_merge(struct async_list *al, struct kiocb *kiocb) 1238{ 1239 if (al->file == kiocb->ki_filp) { 1240 off_t start, end; 1241 1242 /* 1243 * Allow merging if we're anywhere in the range of the same 1244 * page. Generally this happens for sub-page reads or writes, 1245 * and it's beneficial to allow the first worker to bring the 1246 * page in and the piggy backed work can then work on the 1247 * cached page. 1248 */ 1249 start = al->io_start & PAGE_MASK; 1250 end = (al->io_start + al->io_len + PAGE_SIZE - 1) & PAGE_MASK; 1251 if (kiocb->ki_pos >= start && kiocb->ki_pos <= end) 1252 return true; 1253 } 1254 1255 al->file = NULL; 1256 return false; 1257} 1258 1259/* 1260 * Make a note of the last file/offset/direction we punted to async 1261 * context. We'll use this information to see if we can piggy back a 1262 * sequential request onto the previous one, if it's still hasn't been 1263 * completed by the async worker. 
1264 */ 1265static void io_async_list_note(int rw, struct io_kiocb *req, size_t len) 1266{ 1267 struct async_list *async_list = &req->ctx->pending_async[rw]; 1268 struct kiocb *kiocb = &req->rw; 1269 struct file *filp = kiocb->ki_filp; 1270 1271 if (io_should_merge(async_list, kiocb)) { 1272 unsigned long max_bytes; 1273 1274 /* Use 8x RA size as a decent limiter for both reads/writes */ 1275 max_bytes = filp->f_ra.ra_pages << (PAGE_SHIFT + 3); 1276 if (!max_bytes) 1277 max_bytes = VM_READAHEAD_PAGES << (PAGE_SHIFT + 3); 1278 1279 /* If max len are exceeded, reset the state */ 1280 if (async_list->io_len + len <= max_bytes) { 1281 req->flags |= REQ_F_SEQ_PREV; 1282 async_list->io_len += len; 1283 } else { 1284 async_list->file = NULL; 1285 } 1286 } 1287 1288 /* New file? Reset state. */ 1289 if (async_list->file != filp) { 1290 async_list->io_start = kiocb->ki_pos; 1291 async_list->io_len = len; 1292 async_list->file = filp; 1293 } 1294} 1295 1296/* 1297 * For files that don't have ->read_iter() and ->write_iter(), handle them 1298 * by looping over ->read() or ->write() manually. 1299 */ 1300static ssize_t loop_rw_iter(int rw, struct file *file, struct kiocb *kiocb, 1301 struct iov_iter *iter) 1302{ 1303 ssize_t ret = 0; 1304 1305 /* 1306 * Don't support polled IO through this interface, and we can't 1307 * support non-blocking either. For the latter, this just causes 1308 * the kiocb to be handled from an async context. 1309 */ 1310 if (kiocb->ki_flags & IOCB_HIPRI) 1311 return -EOPNOTSUPP; 1312 if (kiocb->ki_flags & IOCB_NOWAIT) 1313 return -EAGAIN; 1314 1315 while (iov_iter_count(iter)) { 1316 struct iovec iovec = iov_iter_iovec(iter); 1317 ssize_t nr; 1318 1319 if (rw == READ) { 1320 nr = file->f_op->read(file, iovec.iov_base, 1321 iovec.iov_len, &kiocb->ki_pos); 1322 } else { 1323 nr = file->f_op->write(file, iovec.iov_base, 1324 iovec.iov_len, &kiocb->ki_pos); 1325 } 1326 1327 if (nr < 0) { 1328 if (!ret) 1329 ret = nr; 1330 break; 1331 } 1332 ret += nr; 1333 if (nr != iovec.iov_len) 1334 break; 1335 iov_iter_advance(iter, nr); 1336 } 1337 1338 return ret; 1339} 1340 1341static int io_read(struct io_kiocb *req, const struct sqe_submit *s, 1342 bool force_nonblock) 1343{ 1344 struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs; 1345 struct kiocb *kiocb = &req->rw; 1346 struct iov_iter iter; 1347 struct file *file; 1348 size_t iov_count; 1349 ssize_t read_size, ret; 1350 1351 ret = io_prep_rw(req, s, force_nonblock); 1352 if (ret) 1353 return ret; 1354 file = kiocb->ki_filp; 1355 1356 if (unlikely(!(file->f_mode & FMODE_READ))) 1357 return -EBADF; 1358 1359 ret = io_import_iovec(req->ctx, READ, s, &iovec, &iter); 1360 if (ret < 0) 1361 return ret; 1362 1363 read_size = ret; 1364 if (req->flags & REQ_F_LINK) 1365 req->result = read_size; 1366 1367 iov_count = iov_iter_count(&iter); 1368 ret = rw_verify_area(READ, file, &kiocb->ki_pos, iov_count); 1369 if (!ret) { 1370 ssize_t ret2; 1371 1372 if (file->f_op->read_iter) 1373 ret2 = call_read_iter(file, kiocb, &iter); 1374 else 1375 ret2 = loop_rw_iter(READ, file, kiocb, &iter); 1376 1377 /* 1378 * In case of a short read, punt to async. This can happen 1379 * if we have data partially cached. Alternatively we can 1380 * return the short read, in which case the application will 1381 * need to issue another SQE and wait for it. That SQE will 1382 * need async punt anyway, so it's more efficient to do it 1383 * here. 
1384 */ 1385 if (force_nonblock && ret2 > 0 && ret2 < read_size) 1386 ret2 = -EAGAIN; 1387 /* Catch -EAGAIN return for forced non-blocking submission */ 1388 if (!force_nonblock || ret2 != -EAGAIN) { 1389 io_rw_done(kiocb, ret2); 1390 } else { 1391 /* 1392 * If ->needs_lock is true, we're already in async 1393 * context. 1394 */ 1395 if (!s->needs_lock) 1396 io_async_list_note(READ, req, iov_count); 1397 ret = -EAGAIN; 1398 } 1399 } 1400 kfree(iovec); 1401 return ret; 1402} 1403 1404static int io_write(struct io_kiocb *req, const struct sqe_submit *s, 1405 bool force_nonblock) 1406{ 1407 struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs; 1408 struct kiocb *kiocb = &req->rw; 1409 struct iov_iter iter; 1410 struct file *file; 1411 size_t iov_count; 1412 ssize_t ret; 1413 1414 ret = io_prep_rw(req, s, force_nonblock); 1415 if (ret) 1416 return ret; 1417 1418 file = kiocb->ki_filp; 1419 if (unlikely(!(file->f_mode & FMODE_WRITE))) 1420 return -EBADF; 1421 1422 ret = io_import_iovec(req->ctx, WRITE, s, &iovec, &iter); 1423 if (ret < 0) 1424 return ret; 1425 1426 if (req->flags & REQ_F_LINK) 1427 req->result = ret; 1428 1429 iov_count = iov_iter_count(&iter); 1430 1431 ret = -EAGAIN; 1432 if (force_nonblock && !(kiocb->ki_flags & IOCB_DIRECT)) { 1433 /* If ->needs_lock is true, we're already in async context. */ 1434 if (!s->needs_lock) 1435 io_async_list_note(WRITE, req, iov_count); 1436 goto out_free; 1437 } 1438 1439 ret = rw_verify_area(WRITE, file, &kiocb->ki_pos, iov_count); 1440 if (!ret) { 1441 ssize_t ret2; 1442 1443 /* 1444 * Open-code file_start_write here to grab freeze protection, 1445 * which will be released by another thread in 1446 * io_complete_rw(). Fool lockdep by telling it the lock got 1447 * released so that it doesn't complain about the held lock when 1448 * we return to userspace. 1449 */ 1450 if (S_ISREG(file_inode(file)->i_mode)) { 1451 __sb_start_write(file_inode(file)->i_sb, 1452 SB_FREEZE_WRITE, true); 1453 __sb_writers_release(file_inode(file)->i_sb, 1454 SB_FREEZE_WRITE); 1455 } 1456 kiocb->ki_flags |= IOCB_WRITE; 1457 1458 if (file->f_op->write_iter) 1459 ret2 = call_write_iter(file, kiocb, &iter); 1460 else 1461 ret2 = loop_rw_iter(WRITE, file, kiocb, &iter); 1462 if (!force_nonblock || ret2 != -EAGAIN) { 1463 io_rw_done(kiocb, ret2); 1464 } else { 1465 /* 1466 * If ->needs_lock is true, we're already in async 1467 * context. 1468 */ 1469 if (!s->needs_lock) 1470 io_async_list_note(WRITE, req, iov_count); 1471 ret = -EAGAIN; 1472 } 1473 } 1474out_free: 1475 kfree(iovec); 1476 return ret; 1477} 1478 1479/* 1480 * IORING_OP_NOP just posts a completion event, nothing else. 
1481 */ 1482static int io_nop(struct io_kiocb *req, u64 user_data) 1483{ 1484 struct io_ring_ctx *ctx = req->ctx; 1485 long err = 0; 1486 1487 if (unlikely(ctx->flags & IORING_SETUP_IOPOLL)) 1488 return -EINVAL; 1489 1490 io_cqring_add_event(ctx, user_data, err); 1491 io_put_req(req); 1492 return 0; 1493} 1494 1495static int io_prep_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe) 1496{ 1497 struct io_ring_ctx *ctx = req->ctx; 1498 1499 if (!req->file) 1500 return -EBADF; 1501 1502 if (unlikely(ctx->flags & IORING_SETUP_IOPOLL)) 1503 return -EINVAL; 1504 if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index)) 1505 return -EINVAL; 1506 1507 return 0; 1508} 1509 1510static int io_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe, 1511 bool force_nonblock) 1512{ 1513 loff_t sqe_off = READ_ONCE(sqe->off); 1514 loff_t sqe_len = READ_ONCE(sqe->len); 1515 loff_t end = sqe_off + sqe_len; 1516 unsigned fsync_flags; 1517 int ret; 1518 1519 fsync_flags = READ_ONCE(sqe->fsync_flags); 1520 if (unlikely(fsync_flags & ~IORING_FSYNC_DATASYNC)) 1521 return -EINVAL; 1522 1523 ret = io_prep_fsync(req, sqe); 1524 if (ret) 1525 return ret; 1526 1527 /* fsync always requires a blocking context */ 1528 if (force_nonblock) 1529 return -EAGAIN; 1530 1531 ret = vfs_fsync_range(req->rw.ki_filp, sqe_off, 1532 end > 0 ? end : LLONG_MAX, 1533 fsync_flags & IORING_FSYNC_DATASYNC); 1534 1535 if (ret < 0 && (req->flags & REQ_F_LINK)) 1536 req->flags |= REQ_F_FAIL_LINK; 1537 io_cqring_add_event(req->ctx, sqe->user_data, ret); 1538 io_put_req(req); 1539 return 0; 1540} 1541 1542static int io_prep_sfr(struct io_kiocb *req, const struct io_uring_sqe *sqe) 1543{ 1544 struct io_ring_ctx *ctx = req->ctx; 1545 int ret = 0; 1546 1547 if (!req->file) 1548 return -EBADF; 1549 1550 if (unlikely(ctx->flags & IORING_SETUP_IOPOLL)) 1551 return -EINVAL; 1552 if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index)) 1553 return -EINVAL; 1554 1555 return ret; 1556} 1557 1558static int io_sync_file_range(struct io_kiocb *req, 1559 const struct io_uring_sqe *sqe, 1560 bool force_nonblock) 1561{ 1562 loff_t sqe_off; 1563 loff_t sqe_len; 1564 unsigned flags; 1565 int ret; 1566 1567 ret = io_prep_sfr(req, sqe); 1568 if (ret) 1569 return ret; 1570 1571 /* sync_file_range always requires a blocking context */ 1572 if (force_nonblock) 1573 return -EAGAIN; 1574 1575 sqe_off = READ_ONCE(sqe->off); 1576 sqe_len = READ_ONCE(sqe->len); 1577 flags = READ_ONCE(sqe->sync_range_flags); 1578 1579 ret = sync_file_range(req->rw.ki_filp, sqe_off, sqe_len, flags); 1580 1581 if (ret < 0 && (req->flags & REQ_F_LINK)) 1582 req->flags |= REQ_F_FAIL_LINK; 1583 io_cqring_add_event(req->ctx, sqe->user_data, ret); 1584 io_put_req(req); 1585 return 0; 1586} 1587 1588#if defined(CONFIG_NET) 1589static int io_send_recvmsg(struct io_kiocb *req, const struct io_uring_sqe *sqe, 1590 bool force_nonblock, 1591 long (*fn)(struct socket *, struct user_msghdr __user *, 1592 unsigned int)) 1593{ 1594 struct socket *sock; 1595 int ret; 1596 1597 if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL)) 1598 return -EINVAL; 1599 1600 sock = sock_from_file(req->file, &ret); 1601 if (sock) { 1602 struct user_msghdr __user *msg; 1603 unsigned flags; 1604 1605 flags = READ_ONCE(sqe->msg_flags); 1606 if (flags & MSG_DONTWAIT) 1607 req->flags |= REQ_F_NOWAIT; 1608 else if (force_nonblock) 1609 flags |= MSG_DONTWAIT; 1610 1611 msg = (struct user_msghdr __user *) (unsigned long) 1612 READ_ONCE(sqe->addr); 1613 1614 ret = fn(sock, msg, flags); 1615 if (force_nonblock && ret 
== -EAGAIN) 1616 return ret; 1617 } 1618 1619 io_cqring_add_event(req->ctx, sqe->user_data, ret); 1620 io_put_req(req); 1621 return 0; 1622} 1623#endif 1624 1625static int io_sendmsg(struct io_kiocb *req, const struct io_uring_sqe *sqe, 1626 bool force_nonblock) 1627{ 1628#if defined(CONFIG_NET) 1629 return io_send_recvmsg(req, sqe, force_nonblock, __sys_sendmsg_sock); 1630#else 1631 return -EOPNOTSUPP; 1632#endif 1633} 1634 1635static int io_recvmsg(struct io_kiocb *req, const struct io_uring_sqe *sqe, 1636 bool force_nonblock) 1637{ 1638#if defined(CONFIG_NET) 1639 return io_send_recvmsg(req, sqe, force_nonblock, __sys_recvmsg_sock); 1640#else 1641 return -EOPNOTSUPP; 1642#endif 1643} 1644 1645static void io_poll_remove_one(struct io_kiocb *req) 1646{ 1647 struct io_poll_iocb *poll = &req->poll; 1648 1649 spin_lock(&poll->head->lock); 1650 WRITE_ONCE(poll->canceled, true); 1651 if (!list_empty(&poll->wait.entry)) { 1652 list_del_init(&poll->wait.entry); 1653 io_queue_async_work(req->ctx, req); 1654 } 1655 spin_unlock(&poll->head->lock); 1656 1657 list_del_init(&req->list); 1658} 1659 1660static void io_poll_remove_all(struct io_ring_ctx *ctx) 1661{ 1662 struct io_kiocb *req; 1663 1664 spin_lock_irq(&ctx->completion_lock); 1665 while (!list_empty(&ctx->cancel_list)) { 1666 req = list_first_entry(&ctx->cancel_list, struct io_kiocb,list); 1667 io_poll_remove_one(req); 1668 } 1669 spin_unlock_irq(&ctx->completion_lock); 1670} 1671 1672/* 1673 * Find a running poll command that matches one specified in sqe->addr, 1674 * and remove it if found. 1675 */ 1676static int io_poll_remove(struct io_kiocb *req, const struct io_uring_sqe *sqe) 1677{ 1678 struct io_ring_ctx *ctx = req->ctx; 1679 struct io_kiocb *poll_req, *next; 1680 int ret = -ENOENT; 1681 1682 if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL)) 1683 return -EINVAL; 1684 if (sqe->ioprio || sqe->off || sqe->len || sqe->buf_index || 1685 sqe->poll_events) 1686 return -EINVAL; 1687 1688 spin_lock_irq(&ctx->completion_lock); 1689 list_for_each_entry_safe(poll_req, next, &ctx->cancel_list, list) { 1690 if (READ_ONCE(sqe->addr) == poll_req->user_data) { 1691 io_poll_remove_one(poll_req); 1692 ret = 0; 1693 break; 1694 } 1695 } 1696 spin_unlock_irq(&ctx->completion_lock); 1697 1698 io_cqring_add_event(req->ctx, sqe->user_data, ret); 1699 io_put_req(req); 1700 return 0; 1701} 1702 1703static void io_poll_complete(struct io_ring_ctx *ctx, struct io_kiocb *req, 1704 __poll_t mask) 1705{ 1706 req->poll.done = true; 1707 io_cqring_fill_event(ctx, req->user_data, mangle_poll(mask)); 1708 io_commit_cqring(ctx); 1709} 1710 1711static void io_poll_complete_work(struct work_struct *work) 1712{ 1713 struct io_kiocb *req = container_of(work, struct io_kiocb, work); 1714 struct io_poll_iocb *poll = &req->poll; 1715 struct poll_table_struct pt = { ._key = poll->events }; 1716 struct io_ring_ctx *ctx = req->ctx; 1717 __poll_t mask = 0; 1718 1719 if (!READ_ONCE(poll->canceled)) 1720 mask = vfs_poll(poll->file, &pt) & poll->events; 1721 1722 /* 1723 * Note that ->ki_cancel callers also delete iocb from active_reqs after 1724 * calling ->ki_cancel. We need the ctx_lock roundtrip here to 1725 * synchronize with them. In the cancellation case the list_del_init 1726 * itself is not actually needed, but harmless so we keep it in to 1727 * avoid further branches in the fast path. 
1728 */ 1729 spin_lock_irq(&ctx->completion_lock); 1730 if (!mask && !READ_ONCE(poll->canceled)) { 1731 add_wait_queue(poll->head, &poll->wait); 1732 spin_unlock_irq(&ctx->completion_lock); 1733 return; 1734 } 1735 list_del_init(&req->list); 1736 io_poll_complete(ctx, req, mask); 1737 spin_unlock_irq(&ctx->completion_lock); 1738 1739 io_cqring_ev_posted(ctx); 1740 io_put_req(req); 1741} 1742 1743static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync, 1744 void *key) 1745{ 1746 struct io_poll_iocb *poll = container_of(wait, struct io_poll_iocb, 1747 wait); 1748 struct io_kiocb *req = container_of(poll, struct io_kiocb, poll); 1749 struct io_ring_ctx *ctx = req->ctx; 1750 __poll_t mask = key_to_poll(key); 1751 unsigned long flags; 1752 1753 /* for instances that support it check for an event match first: */ 1754 if (mask && !(mask & poll->events)) 1755 return 0; 1756 1757 list_del_init(&poll->wait.entry); 1758 1759 if (mask && spin_trylock_irqsave(&ctx->completion_lock, flags)) { 1760 list_del(&req->list); 1761 io_poll_complete(ctx, req, mask); 1762 spin_unlock_irqrestore(&ctx->completion_lock, flags); 1763 1764 io_cqring_ev_posted(ctx); 1765 io_put_req(req); 1766 } else { 1767 io_queue_async_work(ctx, req); 1768 } 1769 1770 return 1; 1771} 1772 1773struct io_poll_table { 1774 struct poll_table_struct pt; 1775 struct io_kiocb *req; 1776 int error; 1777}; 1778 1779static void io_poll_queue_proc(struct file *file, struct wait_queue_head *head, 1780 struct poll_table_struct *p) 1781{ 1782 struct io_poll_table *pt = container_of(p, struct io_poll_table, pt); 1783 1784 if (unlikely(pt->req->poll.head)) { 1785 pt->error = -EINVAL; 1786 return; 1787 } 1788 1789 pt->error = 0; 1790 pt->req->poll.head = head; 1791 add_wait_queue(head, &pt->req->poll.wait); 1792} 1793 1794static int io_poll_add(struct io_kiocb *req, const struct io_uring_sqe *sqe) 1795{ 1796 struct io_poll_iocb *poll = &req->poll; 1797 struct io_ring_ctx *ctx = req->ctx; 1798 struct io_poll_table ipt; 1799 bool cancel = false; 1800 __poll_t mask; 1801 u16 events; 1802 1803 if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL)) 1804 return -EINVAL; 1805 if (sqe->addr || sqe->ioprio || sqe->off || sqe->len || sqe->buf_index) 1806 return -EINVAL; 1807 if (!poll->file) 1808 return -EBADF; 1809 1810 req->submit.sqe = NULL; 1811 INIT_WORK(&req->work, io_poll_complete_work); 1812 events = READ_ONCE(sqe->poll_events); 1813 poll->events = demangle_poll(events) | EPOLLERR | EPOLLHUP; 1814 1815 poll->head = NULL; 1816 poll->done = false; 1817 poll->canceled = false; 1818 1819 ipt.pt._qproc = io_poll_queue_proc; 1820 ipt.pt._key = poll->events; 1821 ipt.req = req; 1822 ipt.error = -EINVAL; /* same as no support for IOCB_CMD_POLL */ 1823 1824 /* initialized the list so that we can do list_empty checks */ 1825 INIT_LIST_HEAD(&poll->wait.entry); 1826 init_waitqueue_func_entry(&poll->wait, io_poll_wake); 1827 1828 INIT_LIST_HEAD(&req->list); 1829 1830 mask = vfs_poll(poll->file, &ipt.pt) & poll->events; 1831 1832 spin_lock_irq(&ctx->completion_lock); 1833 if (likely(poll->head)) { 1834 spin_lock(&poll->head->lock); 1835 if (unlikely(list_empty(&poll->wait.entry))) { 1836 if (ipt.error) 1837 cancel = true; 1838 ipt.error = 0; 1839 mask = 0; 1840 } 1841 if (mask || ipt.error) 1842 list_del_init(&poll->wait.entry); 1843 else if (cancel) 1844 WRITE_ONCE(poll->canceled, true); 1845 else if (!poll->done) /* actually waiting for an event */ 1846 list_add_tail(&req->list, &ctx->cancel_list); 1847 spin_unlock(&poll->head->lock); 1848 } 
1849 if (mask) { /* no async, we'd stolen it */ 1850 ipt.error = 0; 1851 io_poll_complete(ctx, req, mask); 1852 } 1853 spin_unlock_irq(&ctx->completion_lock); 1854 1855 if (mask) { 1856 io_cqring_ev_posted(ctx); 1857 io_put_req(req); 1858 } 1859 return ipt.error; 1860} 1861 1862static enum hrtimer_restart io_timeout_fn(struct hrtimer *timer) 1863{ 1864 struct io_ring_ctx *ctx; 1865 struct io_kiocb *req; 1866 unsigned long flags; 1867 1868 req = container_of(timer, struct io_kiocb, timeout.timer); 1869 ctx = req->ctx; 1870 atomic_inc(&ctx->cq_timeouts); 1871 1872 spin_lock_irqsave(&ctx->completion_lock, flags); 1873 list_del(&req->list); 1874 1875 io_cqring_fill_event(ctx, req->user_data, -ETIME); 1876 io_commit_cqring(ctx); 1877 spin_unlock_irqrestore(&ctx->completion_lock, flags); 1878 1879 io_cqring_ev_posted(ctx); 1880 1881 io_put_req(req); 1882 return HRTIMER_NORESTART; 1883} 1884 1885static int io_timeout(struct io_kiocb *req, const struct io_uring_sqe *sqe) 1886{ 1887 unsigned count, req_dist, tail_index; 1888 struct io_ring_ctx *ctx = req->ctx; 1889 struct list_head *entry; 1890 struct timespec64 ts; 1891 1892 if (unlikely(ctx->flags & IORING_SETUP_IOPOLL)) 1893 return -EINVAL; 1894 if (sqe->flags || sqe->ioprio || sqe->buf_index || sqe->timeout_flags || 1895 sqe->len != 1) 1896 return -EINVAL; 1897 1898 if (get_timespec64(&ts, u64_to_user_ptr(sqe->addr))) 1899 return -EFAULT; 1900 1901 /* 1902 * sqe->off holds how many events that need to occur for this 1903 * timeout event to be satisfied. 1904 */ 1905 count = READ_ONCE(sqe->off); 1906 if (!count) 1907 count = 1; 1908 1909 req->sequence = ctx->cached_sq_head + count - 1; 1910 req->flags |= REQ_F_TIMEOUT; 1911 1912 /* 1913 * Insertion sort, ensuring the first entry in the list is always 1914 * the one we need first. 
1915 */ 1916 tail_index = ctx->cached_cq_tail - ctx->rings->sq_dropped; 1917 req_dist = req->sequence - tail_index; 1918 spin_lock_irq(&ctx->completion_lock); 1919 list_for_each_prev(entry, &ctx->timeout_list) { 1920 struct io_kiocb *nxt = list_entry(entry, struct io_kiocb, list); 1921 unsigned dist; 1922 1923 dist = nxt->sequence - tail_index; 1924 if (req_dist >= dist) 1925 break; 1926 } 1927 list_add(&req->list, entry); 1928 spin_unlock_irq(&ctx->completion_lock); 1929 1930 hrtimer_init(&req->timeout.timer, CLOCK_MONOTONIC, HRTIMER_MODE_REL); 1931 req->timeout.timer.function = io_timeout_fn; 1932 hrtimer_start(&req->timeout.timer, timespec64_to_ktime(ts), 1933 HRTIMER_MODE_REL); 1934 return 0; 1935} 1936 1937static int io_req_defer(struct io_ring_ctx *ctx, struct io_kiocb *req, 1938 const struct io_uring_sqe *sqe) 1939{ 1940 struct io_uring_sqe *sqe_copy; 1941 1942 if (!io_sequence_defer(ctx, req) && list_empty(&ctx->defer_list)) 1943 return 0; 1944 1945 sqe_copy = kmalloc(sizeof(*sqe_copy), GFP_KERNEL); 1946 if (!sqe_copy) 1947 return -EAGAIN; 1948 1949 spin_lock_irq(&ctx->completion_lock); 1950 if (!io_sequence_defer(ctx, req) && list_empty(&ctx->defer_list)) { 1951 spin_unlock_irq(&ctx->completion_lock); 1952 kfree(sqe_copy); 1953 return 0; 1954 } 1955 1956 memcpy(sqe_copy, sqe, sizeof(*sqe_copy)); 1957 req->submit.sqe = sqe_copy; 1958 1959 INIT_WORK(&req->work, io_sq_wq_submit_work); 1960 list_add_tail(&req->list, &ctx->defer_list); 1961 spin_unlock_irq(&ctx->completion_lock); 1962 return -EIOCBQUEUED; 1963} 1964 1965static int __io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req, 1966 const struct sqe_submit *s, bool force_nonblock) 1967{ 1968 int ret, opcode; 1969 1970 req->user_data = READ_ONCE(s->sqe->user_data); 1971 1972 if (unlikely(s->index >= ctx->sq_entries)) 1973 return -EINVAL; 1974 1975 opcode = READ_ONCE(s->sqe->opcode); 1976 switch (opcode) { 1977 case IORING_OP_NOP: 1978 ret = io_nop(req, req->user_data); 1979 break; 1980 case IORING_OP_READV: 1981 if (unlikely(s->sqe->buf_index)) 1982 return -EINVAL; 1983 ret = io_read(req, s, force_nonblock); 1984 break; 1985 case IORING_OP_WRITEV: 1986 if (unlikely(s->sqe->buf_index)) 1987 return -EINVAL; 1988 ret = io_write(req, s, force_nonblock); 1989 break; 1990 case IORING_OP_READ_FIXED: 1991 ret = io_read(req, s, force_nonblock); 1992 break; 1993 case IORING_OP_WRITE_FIXED: 1994 ret = io_write(req, s, force_nonblock); 1995 break; 1996 case IORING_OP_FSYNC: 1997 ret = io_fsync(req, s->sqe, force_nonblock); 1998 break; 1999 case IORING_OP_POLL_ADD: 2000 ret = io_poll_add(req, s->sqe); 2001 break; 2002 case IORING_OP_POLL_REMOVE: 2003 ret = io_poll_remove(req, s->sqe); 2004 break; 2005 case IORING_OP_SYNC_FILE_RANGE: 2006 ret = io_sync_file_range(req, s->sqe, force_nonblock); 2007 break; 2008 case IORING_OP_SENDMSG: 2009 ret = io_sendmsg(req, s->sqe, force_nonblock); 2010 break; 2011 case IORING_OP_RECVMSG: 2012 ret = io_recvmsg(req, s->sqe, force_nonblock); 2013 break; 2014 case IORING_OP_TIMEOUT: 2015 ret = io_timeout(req, s->sqe); 2016 break; 2017 default: 2018 ret = -EINVAL; 2019 break; 2020 } 2021 2022 if (ret) 2023 return ret; 2024 2025 if (ctx->flags & IORING_SETUP_IOPOLL) { 2026 if (req->result == -EAGAIN) 2027 return -EAGAIN; 2028 2029 /* workqueue context doesn't hold uring_lock, grab it now */ 2030 if (s->needs_lock) 2031 mutex_lock(&ctx->uring_lock); 2032 io_iopoll_req_issued(req); 2033 if (s->needs_lock) 2034 mutex_unlock(&ctx->uring_lock); 2035 } 2036 2037 return 0; 2038} 2039 2040static struct async_list 
*io_async_list_from_sqe(struct io_ring_ctx *ctx, 2041 const struct io_uring_sqe *sqe) 2042{ 2043 switch (sqe->opcode) { 2044 case IORING_OP_READV: 2045 case IORING_OP_READ_FIXED: 2046 return &ctx->pending_async[READ]; 2047 case IORING_OP_WRITEV: 2048 case IORING_OP_WRITE_FIXED: 2049 return &ctx->pending_async[WRITE]; 2050 default: 2051 return NULL; 2052 } 2053} 2054 2055static inline bool io_sqe_needs_user(const struct io_uring_sqe *sqe) 2056{ 2057 u8 opcode = READ_ONCE(sqe->opcode); 2058 2059 return !(opcode == IORING_OP_READ_FIXED || 2060 opcode == IORING_OP_WRITE_FIXED); 2061} 2062 2063static void io_sq_wq_submit_work(struct work_struct *work) 2064{ 2065 struct io_kiocb *req = container_of(work, struct io_kiocb, work); 2066 struct io_ring_ctx *ctx = req->ctx; 2067 struct mm_struct *cur_mm = NULL; 2068 struct async_list *async_list; 2069 LIST_HEAD(req_list); 2070 mm_segment_t old_fs; 2071 int ret; 2072 2073 async_list = io_async_list_from_sqe(ctx, req->submit.sqe); 2074restart: 2075 do { 2076 struct sqe_submit *s = &req->submit; 2077 const struct io_uring_sqe *sqe = s->sqe; 2078 unsigned int flags = req->flags; 2079 2080 /* Ensure we clear previously set non-block flag */ 2081 req->rw.ki_flags &= ~IOCB_NOWAIT; 2082 2083 ret = 0; 2084 if (io_sqe_needs_user(sqe) && !cur_mm) { 2085 if (!mmget_not_zero(ctx->sqo_mm)) { 2086 ret = -EFAULT; 2087 } else { 2088 cur_mm = ctx->sqo_mm; 2089 use_mm(cur_mm); 2090 old_fs = get_fs(); 2091 set_fs(USER_DS); 2092 } 2093 } 2094 2095 if (!ret) { 2096 s->has_user = cur_mm != NULL; 2097 s->needs_lock = true; 2098 do { 2099 ret = __io_submit_sqe(ctx, req, s, false); 2100 /* 2101 * We can get EAGAIN for polled IO even though 2102 * we're forcing a sync submission from here, 2103 * since we can't wait for request slots on the 2104 * block side. 2105 */ 2106 if (ret != -EAGAIN) 2107 break; 2108 cond_resched(); 2109 } while (1); 2110 } 2111 2112 /* drop submission reference */ 2113 io_put_req(req); 2114 2115 if (ret) { 2116 io_cqring_add_event(ctx, sqe->user_data, ret); 2117 io_put_req(req); 2118 } 2119 2120 /* async context always use a copy of the sqe */ 2121 kfree(sqe); 2122 2123 /* req from defer and link list needn't decrease async cnt */ 2124 if (flags & (REQ_F_IO_DRAINED | REQ_F_LINK_DONE)) 2125 goto out; 2126 2127 if (!async_list) 2128 break; 2129 if (!list_empty(&req_list)) { 2130 req = list_first_entry(&req_list, struct io_kiocb, 2131 list); 2132 list_del(&req->list); 2133 continue; 2134 } 2135 if (list_empty(&async_list->list)) 2136 break; 2137 2138 req = NULL; 2139 spin_lock(&async_list->lock); 2140 if (list_empty(&async_list->list)) { 2141 spin_unlock(&async_list->lock); 2142 break; 2143 } 2144 list_splice_init(&async_list->list, &req_list); 2145 spin_unlock(&async_list->lock); 2146 2147 req = list_first_entry(&req_list, struct io_kiocb, list); 2148 list_del(&req->list); 2149 } while (req); 2150 2151 /* 2152 * Rare case of racing with a submitter. If we find the count has 2153 * dropped to zero AND we have pending work items, then restart 2154 * the processing. This is a tiny race window. 
2155 */ 2156 if (async_list) { 2157 ret = atomic_dec_return(&async_list->cnt); 2158 while (!ret && !list_empty(&async_list->list)) { 2159 spin_lock(&async_list->lock); 2160 atomic_inc(&async_list->cnt); 2161 list_splice_init(&async_list->list, &req_list); 2162 spin_unlock(&async_list->lock); 2163 2164 if (!list_empty(&req_list)) { 2165 req = list_first_entry(&req_list, 2166 struct io_kiocb, list); 2167 list_del(&req->list); 2168 goto restart; 2169 } 2170 ret = atomic_dec_return(&async_list->cnt); 2171 } 2172 } 2173 2174out: 2175 if (cur_mm) { 2176 set_fs(old_fs); 2177 unuse_mm(cur_mm); 2178 mmput(cur_mm); 2179 } 2180} 2181 2182/* 2183 * See if we can piggy back onto previously submitted work, that is still 2184 * running. We currently only allow this if the new request is sequential 2185 * to the previous one we punted. 2186 */ 2187static bool io_add_to_prev_work(struct async_list *list, struct io_kiocb *req) 2188{ 2189 bool ret; 2190 2191 if (!list) 2192 return false; 2193 if (!(req->flags & REQ_F_SEQ_PREV)) 2194 return false; 2195 if (!atomic_read(&list->cnt)) 2196 return false; 2197 2198 ret = true; 2199 spin_lock(&list->lock); 2200 list_add_tail(&req->list, &list->list); 2201 /* 2202 * Ensure we see a simultaneous modification from io_sq_wq_submit_work() 2203 */ 2204 smp_mb(); 2205 if (!atomic_read(&list->cnt)) { 2206 list_del_init(&req->list); 2207 ret = false; 2208 } 2209 spin_unlock(&list->lock); 2210 return ret; 2211} 2212 2213static bool io_op_needs_file(const struct io_uring_sqe *sqe) 2214{ 2215 int op = READ_ONCE(sqe->opcode); 2216 2217 switch (op) { 2218 case IORING_OP_NOP: 2219 case IORING_OP_POLL_REMOVE: 2220 return false; 2221 default: 2222 return true; 2223 } 2224} 2225 2226static int io_req_set_file(struct io_ring_ctx *ctx, const struct sqe_submit *s, 2227 struct io_submit_state *state, struct io_kiocb *req) 2228{ 2229 unsigned flags; 2230 int fd; 2231 2232 flags = READ_ONCE(s->sqe->flags); 2233 fd = READ_ONCE(s->sqe->fd); 2234 2235 if (flags & IOSQE_IO_DRAIN) 2236 req->flags |= REQ_F_IO_DRAIN; 2237 /* 2238 * All io need record the previous position, if LINK vs DARIN, 2239 * it can be used to mark the position of the first IO in the 2240 * link list. 
2241 */ 2242 req->sequence = s->sequence; 2243 2244 if (!io_op_needs_file(s->sqe)) 2245 return 0; 2246 2247 if (flags & IOSQE_FIXED_FILE) { 2248 if (unlikely(!ctx->user_files || 2249 (unsigned) fd >= ctx->nr_user_files)) 2250 return -EBADF; 2251 req->file = ctx->user_files[fd]; 2252 req->flags |= REQ_F_FIXED_FILE; 2253 } else { 2254 if (s->needs_fixed_file) 2255 return -EBADF; 2256 req->file = io_file_get(state, fd); 2257 if (unlikely(!req->file)) 2258 return -EBADF; 2259 } 2260 2261 return 0; 2262} 2263 2264static int __io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req, 2265 struct sqe_submit *s, bool force_nonblock) 2266{ 2267 int ret; 2268 2269 ret = __io_submit_sqe(ctx, req, s, force_nonblock); 2270 if (ret == -EAGAIN && !(req->flags & REQ_F_NOWAIT)) { 2271 struct io_uring_sqe *sqe_copy; 2272 2273 sqe_copy = kmemdup(s->sqe, sizeof(*sqe_copy), GFP_KERNEL); 2274 if (sqe_copy) { 2275 struct async_list *list; 2276 2277 s->sqe = sqe_copy; 2278 memcpy(&req->submit, s, sizeof(*s)); 2279 list = io_async_list_from_sqe(ctx, s->sqe); 2280 if (!io_add_to_prev_work(list, req)) { 2281 if (list) 2282 atomic_inc(&list->cnt); 2283 INIT_WORK(&req->work, io_sq_wq_submit_work); 2284 io_queue_async_work(ctx, req); 2285 } 2286 2287 /* 2288 * Queued up for async execution, worker will release 2289 * submit reference when the iocb is actually submitted. 2290 */ 2291 return 0; 2292 } 2293 } 2294 2295 /* drop submission reference */ 2296 io_put_req(req); 2297 2298 /* and drop final reference, if we failed */ 2299 if (ret) { 2300 io_cqring_add_event(ctx, req->user_data, ret); 2301 if (req->flags & REQ_F_LINK) 2302 req->flags |= REQ_F_FAIL_LINK; 2303 io_put_req(req); 2304 } 2305 2306 return ret; 2307} 2308 2309static int io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req, 2310 struct sqe_submit *s, bool force_nonblock) 2311{ 2312 int ret; 2313 2314 ret = io_req_defer(ctx, req, s->sqe); 2315 if (ret) { 2316 if (ret != -EIOCBQUEUED) { 2317 io_free_req(req); 2318 io_cqring_add_event(ctx, s->sqe->user_data, ret); 2319 } 2320 return 0; 2321 } 2322 2323 return __io_queue_sqe(ctx, req, s, force_nonblock); 2324} 2325 2326static int io_queue_link_head(struct io_ring_ctx *ctx, struct io_kiocb *req, 2327 struct sqe_submit *s, struct io_kiocb *shadow, 2328 bool force_nonblock) 2329{ 2330 int ret; 2331 int need_submit = false; 2332 2333 if (!shadow) 2334 return io_queue_sqe(ctx, req, s, force_nonblock); 2335 2336 /* 2337 * Mark the first IO in link list as DRAIN, let all the following 2338 * IOs enter the defer list. all IO needs to be completed before link 2339 * list. 2340 */ 2341 req->flags |= REQ_F_IO_DRAIN; 2342 ret = io_req_defer(ctx, req, s->sqe); 2343 if (ret) { 2344 if (ret != -EIOCBQUEUED) { 2345 io_free_req(req); 2346 io_cqring_add_event(ctx, s->sqe->user_data, ret); 2347 return 0; 2348 } 2349 } else { 2350 /* 2351 * If ret == 0 means that all IOs in front of link io are 2352 * running done. let's queue link head. 
2353 */ 2354 need_submit = true; 2355 } 2356 2357 /* Insert shadow req to defer_list, blocking next IOs */ 2358 spin_lock_irq(&ctx->completion_lock); 2359 list_add_tail(&shadow->list, &ctx->defer_list); 2360 spin_unlock_irq(&ctx->completion_lock); 2361 2362 if (need_submit) 2363 return __io_queue_sqe(ctx, req, s, force_nonblock); 2364 2365 return 0; 2366} 2367 2368#define SQE_VALID_FLAGS (IOSQE_FIXED_FILE|IOSQE_IO_DRAIN|IOSQE_IO_LINK) 2369 2370static void io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s, 2371 struct io_submit_state *state, struct io_kiocb **link, 2372 bool force_nonblock) 2373{ 2374 struct io_uring_sqe *sqe_copy; 2375 struct io_kiocb *req; 2376 int ret; 2377 2378 /* enforce forwards compatibility on users */ 2379 if (unlikely(s->sqe->flags & ~SQE_VALID_FLAGS)) { 2380 ret = -EINVAL; 2381 goto err; 2382 } 2383 2384 req = io_get_req(ctx, state); 2385 if (unlikely(!req)) { 2386 ret = -EAGAIN; 2387 goto err; 2388 } 2389 2390 ret = io_req_set_file(ctx, s, state, req); 2391 if (unlikely(ret)) { 2392err_req: 2393 io_free_req(req); 2394err: 2395 io_cqring_add_event(ctx, s->sqe->user_data, ret); 2396 return; 2397 } 2398 2399 /* 2400 * If we already have a head request, queue this one for async 2401 * submittal once the head completes. If we don't have a head but 2402 * IOSQE_IO_LINK is set in the sqe, start a new head. This one will be 2403 * submitted sync once the chain is complete. If none of those 2404 * conditions are true (normal request), then just queue it. 2405 */ 2406 if (*link) { 2407 struct io_kiocb *prev = *link; 2408 2409 sqe_copy = kmemdup(s->sqe, sizeof(*sqe_copy), GFP_KERNEL); 2410 if (!sqe_copy) { 2411 ret = -EAGAIN; 2412 goto err_req; 2413 } 2414 2415 s->sqe = sqe_copy; 2416 memcpy(&req->submit, s, sizeof(*s)); 2417 list_add_tail(&req->list, &prev->link_list); 2418 } else if (s->sqe->flags & IOSQE_IO_LINK) { 2419 req->flags |= REQ_F_LINK; 2420 2421 memcpy(&req->submit, s, sizeof(*s)); 2422 INIT_LIST_HEAD(&req->link_list); 2423 *link = req; 2424 } else { 2425 io_queue_sqe(ctx, req, s, force_nonblock); 2426 } 2427} 2428 2429/* 2430 * Batched submission is done, ensure local IO is flushed out. 2431 */ 2432static void io_submit_state_end(struct io_submit_state *state) 2433{ 2434 blk_finish_plug(&state->plug); 2435 io_file_put(state); 2436 if (state->free_reqs) 2437 kmem_cache_free_bulk(req_cachep, state->free_reqs, 2438 &state->reqs[state->cur_req]); 2439} 2440 2441/* 2442 * Start submission side cache. 2443 */ 2444static void io_submit_state_start(struct io_submit_state *state, 2445 struct io_ring_ctx *ctx, unsigned max_ios) 2446{ 2447 blk_start_plug(&state->plug); 2448 state->free_reqs = 0; 2449 state->file = NULL; 2450 state->ios_left = max_ios; 2451} 2452 2453static void io_commit_sqring(struct io_ring_ctx *ctx) 2454{ 2455 struct io_rings *rings = ctx->rings; 2456 2457 if (ctx->cached_sq_head != READ_ONCE(rings->sq.head)) { 2458 /* 2459 * Ensure any loads from the SQEs are done at this point, 2460 * since once we write the new head, the application could 2461 * write new data to them. 2462 */ 2463 smp_store_release(&rings->sq.head, ctx->cached_sq_head); 2464 } 2465} 2466 2467/* 2468 * Fetch an sqe, if one is available. Note that s->sqe will point to memory 2469 * that is mapped by userspace. This means that care needs to be taken to 2470 * ensure that reads are stable, as we cannot rely on userspace always 2471 * being a good citizen. 
If members of the sqe are validated and then later 2472 * used, it's important that those reads are done through READ_ONCE() to 2473 * prevent a re-load down the line. 2474 */ 2475static bool io_get_sqring(struct io_ring_ctx *ctx, struct sqe_submit *s) 2476{ 2477 struct io_rings *rings = ctx->rings; 2478 u32 *sq_array = ctx->sq_array; 2479 unsigned head; 2480 2481 /* 2482 * The cached sq head (or cq tail) serves two purposes: 2483 * 2484 * 1) allows us to batch the cost of updating the user visible 2485 * head updates. 2486 * 2) allows the kernel side to track the head on its own, even 2487 * though the application is the one updating it. 2488 */ 2489 head = ctx->cached_sq_head; 2490 /* make sure SQ entry isn't read before tail */ 2491 if (head == smp_load_acquire(&rings->sq.tail)) 2492 return false; 2493 2494 head = READ_ONCE(sq_array[head & ctx->sq_mask]); 2495 if (head < ctx->sq_entries) { 2496 s->index = head; 2497 s->sqe = &ctx->sq_sqes[head]; 2498 s->sequence = ctx->cached_sq_head; 2499 ctx->cached_sq_head++; 2500 return true; 2501 } 2502 2503 /* drop invalid entries */ 2504 ctx->cached_sq_head++; 2505 rings->sq_dropped++; 2506 return false; 2507} 2508 2509static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes, 2510 unsigned int nr, bool has_user, bool mm_fault) 2511{ 2512 struct io_submit_state state, *statep = NULL; 2513 struct io_kiocb *link = NULL; 2514 struct io_kiocb *shadow_req = NULL; 2515 bool prev_was_link = false; 2516 int i, submitted = 0; 2517 2518 if (nr > IO_PLUG_THRESHOLD) { 2519 io_submit_state_start(&state, ctx, nr); 2520 statep = &state; 2521 } 2522 2523 for (i = 0; i < nr; i++) { 2524 /* 2525 * If previous wasn't linked and we have a linked command, 2526 * that's the end of the chain. Submit the previous link. 
2527 */ 2528 if (!prev_was_link && link) { 2529 io_queue_link_head(ctx, link, &link->submit, shadow_req, 2530 true); 2531 link = NULL; 2532 shadow_req = NULL; 2533 } 2534 prev_was_link = (sqes[i].sqe->flags & IOSQE_IO_LINK) != 0; 2535 2536 if (link && (sqes[i].sqe->flags & IOSQE_IO_DRAIN)) { 2537 if (!shadow_req) { 2538 shadow_req = io_get_req(ctx, NULL); 2539 if (unlikely(!shadow_req)) 2540 goto out; 2541 shadow_req->flags |= (REQ_F_IO_DRAIN | REQ_F_SHADOW_DRAIN); 2542 refcount_dec(&shadow_req->refs); 2543 } 2544 shadow_req->sequence = sqes[i].sequence; 2545 } 2546 2547out: 2548 if (unlikely(mm_fault)) { 2549 io_cqring_add_event(ctx, sqes[i].sqe->user_data, 2550 -EFAULT); 2551 } else { 2552 sqes[i].has_user = has_user; 2553 sqes[i].needs_lock = true; 2554 sqes[i].needs_fixed_file = true; 2555 io_submit_sqe(ctx, &sqes[i], statep, &link, true); 2556 submitted++; 2557 } 2558 } 2559 2560 if (link) 2561 io_queue_link_head(ctx, link, &link->submit, shadow_req, true); 2562 if (statep) 2563 io_submit_state_end(&state); 2564 2565 return submitted; 2566} 2567 2568static int io_sq_thread(void *data) 2569{ 2570 struct sqe_submit sqes[IO_IOPOLL_BATCH]; 2571 struct io_ring_ctx *ctx = data; 2572 struct mm_struct *cur_mm = NULL; 2573 mm_segment_t old_fs; 2574 DEFINE_WAIT(wait); 2575 unsigned inflight; 2576 unsigned long timeout; 2577 2578 complete(&ctx->sqo_thread_started); 2579 2580 old_fs = get_fs(); 2581 set_fs(USER_DS); 2582 2583 timeout = inflight = 0; 2584 while (!kthread_should_park()) { 2585 bool all_fixed, mm_fault = false; 2586 int i; 2587 2588 if (inflight) { 2589 unsigned nr_events = 0; 2590 2591 if (ctx->flags & IORING_SETUP_IOPOLL) { 2592 io_iopoll_check(ctx, &nr_events, 0); 2593 } else { 2594 /* 2595 * Normal IO, just pretend everything completed. 2596 * We don't have to poll completions for that. 2597 */ 2598 nr_events = inflight; 2599 } 2600 2601 inflight -= nr_events; 2602 if (!inflight) 2603 timeout = jiffies + ctx->sq_thread_idle; 2604 } 2605 2606 if (!io_get_sqring(ctx, &sqes[0])) { 2607 /* 2608 * We're polling. If we're within the defined idle 2609 * period, then let us spin without work before going 2610 * to sleep. 2611 */ 2612 if (inflight || !time_after(jiffies, timeout)) { 2613 cond_resched(); 2614 continue; 2615 } 2616 2617 /* 2618 * Drop cur_mm before scheduling, we can't hold it for 2619 * long periods (or over schedule()). Do this before 2620 * adding ourselves to the waitqueue, as the unuse/drop 2621 * may sleep. 
2622 */ 2623 if (cur_mm) { 2624 unuse_mm(cur_mm); 2625 mmput(cur_mm); 2626 cur_mm = NULL; 2627 } 2628 2629 prepare_to_wait(&ctx->sqo_wait, &wait, 2630 TASK_INTERRUPTIBLE); 2631 2632 /* Tell userspace we may need a wakeup call */ 2633 ctx->rings->sq_flags |= IORING_SQ_NEED_WAKEUP; 2634 /* make sure to read SQ tail after writing flags */ 2635 smp_mb(); 2636 2637 if (!io_get_sqring(ctx, &sqes[0])) { 2638 if (kthread_should_park()) { 2639 finish_wait(&ctx->sqo_wait, &wait); 2640 break; 2641 } 2642 if (signal_pending(current)) 2643 flush_signals(current); 2644 schedule(); 2645 finish_wait(&ctx->sqo_wait, &wait); 2646 2647 ctx->rings->sq_flags &= ~IORING_SQ_NEED_WAKEUP; 2648 continue; 2649 } 2650 finish_wait(&ctx->sqo_wait, &wait); 2651 2652 ctx->rings->sq_flags &= ~IORING_SQ_NEED_WAKEUP; 2653 } 2654 2655 i = 0; 2656 all_fixed = true; 2657 do { 2658 if (all_fixed && io_sqe_needs_user(sqes[i].sqe)) 2659 all_fixed = false; 2660 2661 i++; 2662 if (i == ARRAY_SIZE(sqes)) 2663 break; 2664 } while (io_get_sqring(ctx, &sqes[i])); 2665 2666 /* Unless all new commands are FIXED regions, grab mm */ 2667 if (!all_fixed && !cur_mm) { 2668 mm_fault = !mmget_not_zero(ctx->sqo_mm); 2669 if (!mm_fault) { 2670 use_mm(ctx->sqo_mm); 2671 cur_mm = ctx->sqo_mm; 2672 } 2673 } 2674 2675 inflight += io_submit_sqes(ctx, sqes, i, cur_mm != NULL, 2676 mm_fault); 2677 2678 /* Commit SQ ring head once we've consumed all SQEs */ 2679 io_commit_sqring(ctx); 2680 } 2681 2682 set_fs(old_fs); 2683 if (cur_mm) { 2684 unuse_mm(cur_mm); 2685 mmput(cur_mm); 2686 } 2687 2688 kthread_parkme(); 2689 2690 return 0; 2691} 2692 2693static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit, 2694 bool block_for_last) 2695{ 2696 struct io_submit_state state, *statep = NULL; 2697 struct io_kiocb *link = NULL; 2698 struct io_kiocb *shadow_req = NULL; 2699 bool prev_was_link = false; 2700 int i, submit = 0; 2701 2702 if (to_submit > IO_PLUG_THRESHOLD) { 2703 io_submit_state_start(&state, ctx, to_submit); 2704 statep = &state; 2705 } 2706 2707 for (i = 0; i < to_submit; i++) { 2708 bool force_nonblock = true; 2709 struct sqe_submit s; 2710 2711 if (!io_get_sqring(ctx, &s)) 2712 break; 2713 2714 /* 2715 * If previous wasn't linked and we have a linked command, 2716 * that's the end of the chain. Submit the previous link. 2717 */ 2718 if (!prev_was_link && link) { 2719 io_queue_link_head(ctx, link, &link->submit, shadow_req, 2720 force_nonblock); 2721 link = NULL; 2722 shadow_req = NULL; 2723 } 2724 prev_was_link = (s.sqe->flags & IOSQE_IO_LINK) != 0; 2725 2726 if (link && (s.sqe->flags & IOSQE_IO_DRAIN)) { 2727 if (!shadow_req) { 2728 shadow_req = io_get_req(ctx, NULL); 2729 if (unlikely(!shadow_req)) 2730 goto out; 2731 shadow_req->flags |= (REQ_F_IO_DRAIN | REQ_F_SHADOW_DRAIN); 2732 refcount_dec(&shadow_req->refs); 2733 } 2734 shadow_req->sequence = s.sequence; 2735 } 2736 2737out: 2738 s.has_user = true; 2739 s.needs_lock = false; 2740 s.needs_fixed_file = false; 2741 submit++; 2742 2743 /* 2744 * The caller will block for events after submit, submit the 2745 * last IO non-blocking. This is either the only IO it's 2746 * submitting, or it already submitted the previous ones. This 2747 * improves performance by avoiding an async punt that we don't 2748 * need to do. 
2749 */ 2750 if (block_for_last && submit == to_submit) 2751 force_nonblock = false; 2752 2753 io_submit_sqe(ctx, &s, statep, &link, force_nonblock); 2754 } 2755 io_commit_sqring(ctx); 2756 2757 if (link) 2758 io_queue_link_head(ctx, link, &link->submit, shadow_req, 2759 !block_for_last); 2760 if (statep) 2761 io_submit_state_end(statep); 2762 2763 return submit; 2764} 2765 2766struct io_wait_queue { 2767 struct wait_queue_entry wq; 2768 struct io_ring_ctx *ctx; 2769 unsigned to_wait; 2770 unsigned nr_timeouts; 2771}; 2772 2773static inline bool io_should_wake(struct io_wait_queue *iowq) 2774{ 2775 struct io_ring_ctx *ctx = iowq->ctx; 2776 2777 /* 2778 * Wake up if we have enough events, or if a timeout occured since we 2779 * started waiting. For timeouts, we always want to return to userspace, 2780 * regardless of event count. 2781 */ 2782 return io_cqring_events(ctx->rings) >= iowq->to_wait || 2783 atomic_read(&ctx->cq_timeouts) != iowq->nr_timeouts; 2784} 2785 2786static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode, 2787 int wake_flags, void *key) 2788{ 2789 struct io_wait_queue *iowq = container_of(curr, struct io_wait_queue, 2790 wq); 2791 2792 if (!io_should_wake(iowq)) 2793 return -1; 2794 2795 return autoremove_wake_function(curr, mode, wake_flags, key); 2796} 2797 2798/* 2799 * Wait until events become available, if we don't already have some. The 2800 * application must reap them itself, as they reside on the shared cq ring. 2801 */ 2802static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, 2803 const sigset_t __user *sig, size_t sigsz) 2804{ 2805 struct io_wait_queue iowq = { 2806 .wq = { 2807 .private = current, 2808 .func = io_wake_function, 2809 .entry = LIST_HEAD_INIT(iowq.wq.entry), 2810 }, 2811 .ctx = ctx, 2812 .to_wait = min_events, 2813 }; 2814 struct io_rings *rings = ctx->rings; 2815 int ret; 2816 2817 if (io_cqring_events(rings) >= min_events) 2818 return 0; 2819 2820 if (sig) { 2821#ifdef CONFIG_COMPAT 2822 if (in_compat_syscall()) 2823 ret = set_compat_user_sigmask((const compat_sigset_t __user *)sig, 2824 sigsz); 2825 else 2826#endif 2827 ret = set_user_sigmask(sig, sigsz); 2828 2829 if (ret) 2830 return ret; 2831 } 2832 2833 ret = 0; 2834 iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts); 2835 do { 2836 prepare_to_wait_exclusive(&ctx->wait, &iowq.wq, 2837 TASK_INTERRUPTIBLE); 2838 if (io_should_wake(&iowq)) 2839 break; 2840 schedule(); 2841 if (signal_pending(current)) { 2842 ret = -ERESTARTSYS; 2843 break; 2844 } 2845 } while (1); 2846 finish_wait(&ctx->wait, &iowq.wq); 2847 2848 restore_saved_sigmask_unless(ret == -ERESTARTSYS); 2849 if (ret == -ERESTARTSYS) 2850 ret = -EINTR; 2851 2852 return READ_ONCE(rings->cq.head) == READ_ONCE(rings->cq.tail) ? 
ret : 0; 2853} 2854 2855static void __io_sqe_files_unregister(struct io_ring_ctx *ctx) 2856{ 2857#if defined(CONFIG_UNIX) 2858 if (ctx->ring_sock) { 2859 struct sock *sock = ctx->ring_sock->sk; 2860 struct sk_buff *skb; 2861 2862 while ((skb = skb_dequeue(&sock->sk_receive_queue)) != NULL) 2863 kfree_skb(skb); 2864 } 2865#else 2866 int i; 2867 2868 for (i = 0; i < ctx->nr_user_files; i++) 2869 fput(ctx->user_files[i]); 2870#endif 2871} 2872 2873static int io_sqe_files_unregister(struct io_ring_ctx *ctx) 2874{ 2875 if (!ctx->user_files) 2876 return -ENXIO; 2877 2878 __io_sqe_files_unregister(ctx); 2879 kfree(ctx->user_files); 2880 ctx->user_files = NULL; 2881 ctx->nr_user_files = 0; 2882 return 0; 2883} 2884 2885static void io_sq_thread_stop(struct io_ring_ctx *ctx) 2886{ 2887 if (ctx->sqo_thread) { 2888 wait_for_completion(&ctx->sqo_thread_started); 2889 /* 2890 * The park is a bit of a work-around, without it we get 2891 * warning spews on shutdown with SQPOLL set and affinity 2892 * set to a single CPU. 2893 */ 2894 kthread_park(ctx->sqo_thread); 2895 kthread_stop(ctx->sqo_thread); 2896 ctx->sqo_thread = NULL; 2897 } 2898} 2899 2900static void io_finish_async(struct io_ring_ctx *ctx) 2901{ 2902 int i; 2903 2904 io_sq_thread_stop(ctx); 2905 2906 for (i = 0; i < ARRAY_SIZE(ctx->sqo_wq); i++) { 2907 if (ctx->sqo_wq[i]) { 2908 destroy_workqueue(ctx->sqo_wq[i]); 2909 ctx->sqo_wq[i] = NULL; 2910 } 2911 } 2912} 2913 2914#if defined(CONFIG_UNIX) 2915static void io_destruct_skb(struct sk_buff *skb) 2916{ 2917 struct io_ring_ctx *ctx = skb->sk->sk_user_data; 2918 int i; 2919 2920 for (i = 0; i < ARRAY_SIZE(ctx->sqo_wq); i++) 2921 if (ctx->sqo_wq[i]) 2922 flush_workqueue(ctx->sqo_wq[i]); 2923 2924 unix_destruct_scm(skb); 2925} 2926 2927/* 2928 * Ensure the UNIX gc is aware of our file set, so we are certain that 2929 * the io_uring can be safely unregistered on process exit, even if we have 2930 * loops in the file referencing. 2931 */ 2932static int __io_sqe_files_scm(struct io_ring_ctx *ctx, int nr, int offset) 2933{ 2934 struct sock *sk = ctx->ring_sock->sk; 2935 struct scm_fp_list *fpl; 2936 struct sk_buff *skb; 2937 int i; 2938 2939 if (!capable(CAP_SYS_RESOURCE) && !capable(CAP_SYS_ADMIN)) { 2940 unsigned long inflight = ctx->user->unix_inflight + nr; 2941 2942 if (inflight > task_rlimit(current, RLIMIT_NOFILE)) 2943 return -EMFILE; 2944 } 2945 2946 fpl = kzalloc(sizeof(*fpl), GFP_KERNEL); 2947 if (!fpl) 2948 return -ENOMEM; 2949 2950 skb = alloc_skb(0, GFP_KERNEL); 2951 if (!skb) { 2952 kfree(fpl); 2953 return -ENOMEM; 2954 } 2955 2956 skb->sk = sk; 2957 skb->destructor = io_destruct_skb; 2958 2959 fpl->user = get_uid(ctx->user); 2960 for (i = 0; i < nr; i++) { 2961 fpl->fp[i] = get_file(ctx->user_files[i + offset]); 2962 unix_inflight(fpl->user, fpl->fp[i]); 2963 } 2964 2965 fpl->max = fpl->count = nr; 2966 UNIXCB(skb).fp = fpl; 2967 refcount_add(skb->truesize, &sk->sk_wmem_alloc); 2968 skb_queue_head(&sk->sk_receive_queue, skb); 2969 2970 for (i = 0; i < nr; i++) 2971 fput(fpl->fp[i]); 2972 2973 return 0; 2974} 2975 2976/* 2977 * If UNIX sockets are enabled, fd passing can cause a reference cycle which 2978 * causes regular reference counting to break down. We rely on the UNIX 2979 * garbage collection to take care of this problem for us. 
2980 */ 2981static int io_sqe_files_scm(struct io_ring_ctx *ctx) 2982{ 2983 unsigned left, total; 2984 int ret = 0; 2985 2986 total = 0; 2987 left = ctx->nr_user_files; 2988 while (left) { 2989 unsigned this_files = min_t(unsigned, left, SCM_MAX_FD); 2990 2991 ret = __io_sqe_files_scm(ctx, this_files, total); 2992 if (ret) 2993 break; 2994 left -= this_files; 2995 total += this_files; 2996 } 2997 2998 if (!ret) 2999 return 0; 3000 3001 while (total < ctx->nr_user_files) { 3002 fput(ctx->user_files[total]); 3003 total++; 3004 } 3005 3006 return ret; 3007} 3008#else 3009static int io_sqe_files_scm(struct io_ring_ctx *ctx) 3010{ 3011 return 0; 3012} 3013#endif 3014 3015static int io_sqe_files_register(struct io_ring_ctx *ctx, void __user *arg, 3016 unsigned nr_args) 3017{ 3018 __s32 __user *fds = (__s32 __user *) arg; 3019 int fd, ret = 0; 3020 unsigned i; 3021 3022 if (ctx->user_files) 3023 return -EBUSY; 3024 if (!nr_args) 3025 return -EINVAL; 3026 if (nr_args > IORING_MAX_FIXED_FILES) 3027 return -EMFILE; 3028 3029 ctx->user_files = kcalloc(nr_args, sizeof(struct file *), GFP_KERNEL); 3030 if (!ctx->user_files) 3031 return -ENOMEM; 3032 3033 for (i = 0; i < nr_args; i++) { 3034 ret = -EFAULT; 3035 if (copy_from_user(&fd, &fds[i], sizeof(fd))) 3036 break; 3037 3038 ctx->user_files[i] = fget(fd); 3039 3040 ret = -EBADF; 3041 if (!ctx->user_files[i]) 3042 break; 3043 /* 3044 * Don't allow io_uring instances to be registered. If UNIX 3045 * isn't enabled, then this causes a reference cycle and this 3046 * instance can never get freed. If UNIX is enabled we'll 3047 * handle it just fine, but there's still no point in allowing 3048 * a ring fd as it doesn't support regular read/write anyway. 3049 */ 3050 if (ctx->user_files[i]->f_op == &io_uring_fops) { 3051 fput(ctx->user_files[i]); 3052 break; 3053 } 3054 ctx->nr_user_files++; 3055 ret = 0; 3056 } 3057 3058 if (ret) { 3059 for (i = 0; i < ctx->nr_user_files; i++) 3060 fput(ctx->user_files[i]); 3061 3062 kfree(ctx->user_files); 3063 ctx->user_files = NULL; 3064 ctx->nr_user_files = 0; 3065 return ret; 3066 } 3067 3068 ret = io_sqe_files_scm(ctx); 3069 if (ret) 3070 io_sqe_files_unregister(ctx); 3071 3072 return ret; 3073} 3074 3075static int io_sq_offload_start(struct io_ring_ctx *ctx, 3076 struct io_uring_params *p) 3077{ 3078 int ret; 3079 3080 init_waitqueue_head(&ctx->sqo_wait); 3081 mmgrab(current->mm); 3082 ctx->sqo_mm = current->mm; 3083 3084 if (ctx->flags & IORING_SETUP_SQPOLL) { 3085 ret = -EPERM; 3086 if (!capable(CAP_SYS_ADMIN)) 3087 goto err; 3088 3089 ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle); 3090 if (!ctx->sq_thread_idle) 3091 ctx->sq_thread_idle = HZ; 3092 3093 if (p->flags & IORING_SETUP_SQ_AFF) { 3094 int cpu = p->sq_thread_cpu; 3095 3096 ret = -EINVAL; 3097 if (cpu >= nr_cpu_ids) 3098 goto err; 3099 if (!cpu_online(cpu)) 3100 goto err; 3101 3102 ctx->sqo_thread = kthread_create_on_cpu(io_sq_thread, 3103 ctx, cpu, 3104 "io_uring-sq"); 3105 } else { 3106 ctx->sqo_thread = kthread_create(io_sq_thread, ctx, 3107 "io_uring-sq"); 3108 } 3109 if (IS_ERR(ctx->sqo_thread)) { 3110 ret = PTR_ERR(ctx->sqo_thread); 3111 ctx->sqo_thread = NULL; 3112 goto err; 3113 } 3114 wake_up_process(ctx->sqo_thread); 3115 } else if (p->flags & IORING_SETUP_SQ_AFF) { 3116 /* Can't have SQ_AFF without SQPOLL */ 3117 ret = -EINVAL; 3118 goto err; 3119 } 3120 3121 /* Do QD, or 2 * CPUS, whatever is smallest */ 3122 ctx->sqo_wq[0] = alloc_workqueue("io_ring-wq", 3123 WQ_UNBOUND | WQ_FREEZABLE, 3124 min(ctx->sq_entries - 1, 2 * 
num_online_cpus())); 3125 if (!ctx->sqo_wq[0]) { 3126 ret = -ENOMEM; 3127 goto err; 3128 } 3129 3130 /* 3131 * This is for buffered writes, where we want to limit the parallelism 3132 * due to file locking in file systems. As "normal" buffered writes 3133 * should parellelize on writeout quite nicely, limit us to having 2 3134 * pending. This avoids massive contention on the inode when doing 3135 * buffered async writes. 3136 */ 3137 ctx->sqo_wq[1] = alloc_workqueue("io_ring-write-wq", 3138 WQ_UNBOUND | WQ_FREEZABLE, 2); 3139 if (!ctx->sqo_wq[1]) { 3140 ret = -ENOMEM; 3141 goto err; 3142 } 3143 3144 return 0; 3145err: 3146 io_finish_async(ctx); 3147 mmdrop(ctx->sqo_mm); 3148 ctx->sqo_mm = NULL; 3149 return ret; 3150} 3151 3152static void io_unaccount_mem(struct user_struct *user, unsigned long nr_pages) 3153{ 3154 atomic_long_sub(nr_pages, &user->locked_vm); 3155} 3156 3157static int io_account_mem(struct user_struct *user, unsigned long nr_pages) 3158{ 3159 unsigned long page_limit, cur_pages, new_pages; 3160 3161 /* Don't allow more pages than we can safely lock */ 3162 page_limit = rlimit(RLIMIT_MEMLOCK) >> PAGE_SHIFT; 3163 3164 do { 3165 cur_pages = atomic_long_read(&user->locked_vm); 3166 new_pages = cur_pages + nr_pages; 3167 if (new_pages > page_limit) 3168 return -ENOMEM; 3169 } while (atomic_long_cmpxchg(&user->locked_vm, cur_pages, 3170 new_pages) != cur_pages); 3171 3172 return 0; 3173} 3174 3175static void io_mem_free(void *ptr) 3176{ 3177 struct page *page; 3178 3179 if (!ptr) 3180 return; 3181 3182 page = virt_to_head_page(ptr); 3183 if (put_page_testzero(page)) 3184 free_compound_page(page); 3185} 3186 3187static void *io_mem_alloc(size_t size) 3188{ 3189 gfp_t gfp_flags = GFP_KERNEL | __GFP_ZERO | __GFP_NOWARN | __GFP_COMP | 3190 __GFP_NORETRY; 3191 3192 return (void *) __get_free_pages(gfp_flags, get_order(size)); 3193} 3194 3195static unsigned long rings_size(unsigned sq_entries, unsigned cq_entries, 3196 size_t *sq_offset) 3197{ 3198 struct io_rings *rings; 3199 size_t off, sq_array_size; 3200 3201 off = struct_size(rings, cqes, cq_entries); 3202 if (off == SIZE_MAX) 3203 return SIZE_MAX; 3204 3205#ifdef CONFIG_SMP 3206 off = ALIGN(off, SMP_CACHE_BYTES); 3207 if (off == 0) 3208 return SIZE_MAX; 3209#endif 3210 3211 sq_array_size = array_size(sizeof(u32), sq_entries); 3212 if (sq_array_size == SIZE_MAX) 3213 return SIZE_MAX; 3214 3215 if (check_add_overflow(off, sq_array_size, &off)) 3216 return SIZE_MAX; 3217 3218 if (sq_offset) 3219 *sq_offset = off; 3220 3221 return off; 3222} 3223 3224static unsigned long ring_pages(unsigned sq_entries, unsigned cq_entries) 3225{ 3226 size_t pages; 3227 3228 pages = (size_t)1 << get_order( 3229 rings_size(sq_entries, cq_entries, NULL)); 3230 pages += (size_t)1 << get_order( 3231 array_size(sizeof(struct io_uring_sqe), sq_entries)); 3232 3233 return pages; 3234} 3235 3236static int io_sqe_buffer_unregister(struct io_ring_ctx *ctx) 3237{ 3238 int i, j; 3239 3240 if (!ctx->user_bufs) 3241 return -ENXIO; 3242 3243 for (i = 0; i < ctx->nr_user_bufs; i++) { 3244 struct io_mapped_ubuf *imu = &ctx->user_bufs[i]; 3245 3246 for (j = 0; j < imu->nr_bvecs; j++) 3247 put_user_page(imu->bvec[j].bv_page); 3248 3249 if (ctx->account_mem) 3250 io_unaccount_mem(ctx->user, imu->nr_bvecs); 3251 kvfree(imu->bvec); 3252 imu->nr_bvecs = 0; 3253 } 3254 3255 kfree(ctx->user_bufs); 3256 ctx->user_bufs = NULL; 3257 ctx->nr_user_bufs = 0; 3258 return 0; 3259} 3260 3261static int io_copy_iov(struct io_ring_ctx *ctx, struct iovec *dst, 3262 void __user *arg, 
unsigned index) 3263{ 3264 struct iovec __user *src; 3265 3266#ifdef CONFIG_COMPAT 3267 if (ctx->compat) { 3268 struct compat_iovec __user *ciovs; 3269 struct compat_iovec ciov; 3270 3271 ciovs = (struct compat_iovec __user *) arg; 3272 if (copy_from_user(&ciov, &ciovs[index], sizeof(ciov))) 3273 return -EFAULT; 3274 3275 dst->iov_base = (void __user *) (unsigned long) ciov.iov_base; 3276 dst->iov_len = ciov.iov_len; 3277 return 0; 3278 } 3279#endif 3280 src = (struct iovec __user *) arg; 3281 if (copy_from_user(dst, &src[index], sizeof(*dst))) 3282 return -EFAULT; 3283 return 0; 3284} 3285 3286static int io_sqe_buffer_register(struct io_ring_ctx *ctx, void __user *arg, 3287 unsigned nr_args) 3288{ 3289 struct vm_area_struct **vmas = NULL; 3290 struct page **pages = NULL; 3291 int i, j, got_pages = 0; 3292 int ret = -EINVAL; 3293 3294 if (ctx->user_bufs) 3295 return -EBUSY; 3296 if (!nr_args || nr_args > UIO_MAXIOV) 3297 return -EINVAL; 3298 3299 ctx->user_bufs = kcalloc(nr_args, sizeof(struct io_mapped_ubuf), 3300 GFP_KERNEL); 3301 if (!ctx->user_bufs) 3302 return -ENOMEM; 3303 3304 for (i = 0; i < nr_args; i++) { 3305 struct io_mapped_ubuf *imu = &ctx->user_bufs[i]; 3306 unsigned long off, start, end, ubuf; 3307 int pret, nr_pages; 3308 struct iovec iov; 3309 size_t size; 3310 3311 ret = io_copy_iov(ctx, &iov, arg, i); 3312 if (ret) 3313 goto err; 3314 3315 /* 3316 * Don't impose further limits on the size and buffer 3317 * constraints here, we'll -EINVAL later when IO is 3318 * submitted if they are wrong. 3319 */ 3320 ret = -EFAULT; 3321 if (!iov.iov_base || !iov.iov_len) 3322 goto err; 3323 3324 /* arbitrary limit, but we need something */ 3325 if (iov.iov_len > SZ_1G) 3326 goto err; 3327 3328 ubuf = (unsigned long) iov.iov_base; 3329 end = (ubuf + iov.iov_len + PAGE_SIZE - 1) >> PAGE_SHIFT; 3330 start = ubuf >> PAGE_SHIFT; 3331 nr_pages = end - start; 3332 3333 if (ctx->account_mem) { 3334 ret = io_account_mem(ctx->user, nr_pages); 3335 if (ret) 3336 goto err; 3337 } 3338 3339 ret = 0; 3340 if (!pages || nr_pages > got_pages) { 3341 kfree(vmas); 3342 kfree(pages); 3343 pages = kvmalloc_array(nr_pages, sizeof(struct page *), 3344 GFP_KERNEL); 3345 vmas = kvmalloc_array(nr_pages, 3346 sizeof(struct vm_area_struct *), 3347 GFP_KERNEL); 3348 if (!pages || !vmas) { 3349 ret = -ENOMEM; 3350 if (ctx->account_mem) 3351 io_unaccount_mem(ctx->user, nr_pages); 3352 goto err; 3353 } 3354 got_pages = nr_pages; 3355 } 3356 3357 imu->bvec = kvmalloc_array(nr_pages, sizeof(struct bio_vec), 3358 GFP_KERNEL); 3359 ret = -ENOMEM; 3360 if (!imu->bvec) { 3361 if (ctx->account_mem) 3362 io_unaccount_mem(ctx->user, nr_pages); 3363 goto err; 3364 } 3365 3366 ret = 0; 3367 down_read(&current->mm->mmap_sem); 3368 pret = get_user_pages(ubuf, nr_pages, 3369 FOLL_WRITE | FOLL_LONGTERM, 3370 pages, vmas); 3371 if (pret == nr_pages) { 3372 /* don't support file backed memory */ 3373 for (j = 0; j < nr_pages; j++) { 3374 struct vm_area_struct *vma = vmas[j]; 3375 3376 if (vma->vm_file && 3377 !is_file_hugepages(vma->vm_file)) { 3378 ret = -EOPNOTSUPP; 3379 break; 3380 } 3381 } 3382 } else { 3383 ret = pret < 0 ? 
pret : -EFAULT; 3384 } 3385 up_read(&current->mm->mmap_sem); 3386 if (ret) { 3387 /* 3388 * if we did partial map, or found file backed vmas, 3389 * release any pages we did get 3390 */ 3391 if (pret > 0) 3392 put_user_pages(pages, pret); 3393 if (ctx->account_mem) 3394 io_unaccount_mem(ctx->user, nr_pages); 3395 kvfree(imu->bvec); 3396 goto err; 3397 } 3398 3399 off = ubuf & ~PAGE_MASK; 3400 size = iov.iov_len; 3401 for (j = 0; j < nr_pages; j++) { 3402 size_t vec_len; 3403 3404 vec_len = min_t(size_t, size, PAGE_SIZE - off); 3405 imu->bvec[j].bv_page = pages[j]; 3406 imu->bvec[j].bv_len = vec_len; 3407 imu->bvec[j].bv_offset = off; 3408 off = 0; 3409 size -= vec_len; 3410 } 3411 /* store original address for later verification */ 3412 imu->ubuf = ubuf; 3413 imu->len = iov.iov_len; 3414 imu->nr_bvecs = nr_pages; 3415 3416 ctx->nr_user_bufs++; 3417 } 3418 kvfree(pages); 3419 kvfree(vmas); 3420 return 0; 3421err: 3422 kvfree(pages); 3423 kvfree(vmas); 3424 io_sqe_buffer_unregister(ctx); 3425 return ret; 3426} 3427 3428static int io_eventfd_register(struct io_ring_ctx *ctx, void __user *arg) 3429{ 3430 __s32 __user *fds = arg; 3431 int fd; 3432 3433 if (ctx->cq_ev_fd) 3434 return -EBUSY; 3435 3436 if (copy_from_user(&fd, fds, sizeof(*fds))) 3437 return -EFAULT; 3438 3439 ctx->cq_ev_fd = eventfd_ctx_fdget(fd); 3440 if (IS_ERR(ctx->cq_ev_fd)) { 3441 int ret = PTR_ERR(ctx->cq_ev_fd); 3442 ctx->cq_ev_fd = NULL; 3443 return ret; 3444 } 3445 3446 return 0; 3447} 3448 3449static int io_eventfd_unregister(struct io_ring_ctx *ctx) 3450{ 3451 if (ctx->cq_ev_fd) { 3452 eventfd_ctx_put(ctx->cq_ev_fd); 3453 ctx->cq_ev_fd = NULL; 3454 return 0; 3455 } 3456 3457 return -ENXIO; 3458} 3459 3460static void io_ring_ctx_free(struct io_ring_ctx *ctx) 3461{ 3462 io_finish_async(ctx); 3463 if (ctx->sqo_mm) 3464 mmdrop(ctx->sqo_mm); 3465 3466 io_iopoll_reap_events(ctx); 3467 io_sqe_buffer_unregister(ctx); 3468 io_sqe_files_unregister(ctx); 3469 io_eventfd_unregister(ctx); 3470 3471#if defined(CONFIG_UNIX) 3472 if (ctx->ring_sock) { 3473 ctx->ring_sock->file = NULL; /* so that iput() is called */ 3474 sock_release(ctx->ring_sock); 3475 } 3476#endif 3477 3478 io_mem_free(ctx->rings); 3479 io_mem_free(ctx->sq_sqes); 3480 3481 percpu_ref_exit(&ctx->refs); 3482 if (ctx->account_mem) 3483 io_unaccount_mem(ctx->user, 3484 ring_pages(ctx->sq_entries, ctx->cq_entries)); 3485 free_uid(ctx->user); 3486 kfree(ctx); 3487} 3488 3489static __poll_t io_uring_poll(struct file *file, poll_table *wait) 3490{ 3491 struct io_ring_ctx *ctx = file->private_data; 3492 __poll_t mask = 0; 3493 3494 poll_wait(file, &ctx->cq_wait, wait); 3495 /* 3496 * synchronizes with barrier from wq_has_sleeper call in 3497 * io_commit_cqring 3498 */ 3499 smp_rmb(); 3500 if (READ_ONCE(ctx->rings->sq.tail) - ctx->cached_sq_head != 3501 ctx->rings->sq_ring_entries) 3502 mask |= EPOLLOUT | EPOLLWRNORM; 3503 if (READ_ONCE(ctx->rings->cq.head) != ctx->cached_cq_tail) 3504 mask |= EPOLLIN | EPOLLRDNORM; 3505 3506 return mask; 3507} 3508 3509static int io_uring_fasync(int fd, struct file *file, int on) 3510{ 3511 struct io_ring_ctx *ctx = file->private_data; 3512 3513 return fasync_helper(fd, file, on, &ctx->cq_fasync); 3514} 3515 3516static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx) 3517{ 3518 mutex_lock(&ctx->uring_lock); 3519 percpu_ref_kill(&ctx->refs); 3520 mutex_unlock(&ctx->uring_lock); 3521 3522 io_kill_timeouts(ctx); 3523 io_poll_remove_all(ctx); 3524 io_iopoll_reap_events(ctx); 3525 wait_for_completion(&ctx->ctx_done); 3526 
io_ring_ctx_free(ctx); 3527} 3528 3529static int io_uring_release(struct inode *inode, struct file *file) 3530{ 3531 struct io_ring_ctx *ctx = file->private_data; 3532 3533 file->private_data = NULL; 3534 io_ring_ctx_wait_and_kill(ctx); 3535 return 0; 3536} 3537 3538static int io_uring_mmap(struct file *file, struct vm_area_struct *vma) 3539{ 3540 loff_t offset = (loff_t) vma->vm_pgoff << PAGE_SHIFT; 3541 unsigned long sz = vma->vm_end - vma->vm_start; 3542 struct io_ring_ctx *ctx = file->private_data; 3543 unsigned long pfn; 3544 struct page *page; 3545 void *ptr; 3546 3547 switch (offset) { 3548 case IORING_OFF_SQ_RING: 3549 case IORING_OFF_CQ_RING: 3550 ptr = ctx->rings; 3551 break; 3552 case IORING_OFF_SQES: 3553 ptr = ctx->sq_sqes; 3554 break; 3555 default: 3556 return -EINVAL; 3557 } 3558 3559 page = virt_to_head_page(ptr); 3560 if (sz > page_size(page)) 3561 return -EINVAL; 3562 3563 pfn = virt_to_phys(ptr) >> PAGE_SHIFT; 3564 return remap_pfn_range(vma, vma->vm_start, pfn, sz, vma->vm_page_prot); 3565} 3566 3567SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit, 3568 u32, min_complete, u32, flags, const sigset_t __user *, sig, 3569 size_t, sigsz) 3570{ 3571 struct io_ring_ctx *ctx; 3572 long ret = -EBADF; 3573 int submitted = 0; 3574 struct fd f; 3575 3576 if (flags & ~(IORING_ENTER_GETEVENTS | IORING_ENTER_SQ_WAKEUP)) 3577 return -EINVAL; 3578 3579 f = fdget(fd); 3580 if (!f.file) 3581 return -EBADF; 3582 3583 ret = -EOPNOTSUPP; 3584 if (f.file->f_op != &io_uring_fops) 3585 goto out_fput; 3586 3587 ret = -ENXIO; 3588 ctx = f.file->private_data; 3589 if (!percpu_ref_tryget(&ctx->refs)) 3590 goto out_fput; 3591 3592 /* 3593 * For SQ polling, the thread will do all submissions and completions. 3594 * Just return the requested submit count, and wake the thread if 3595 * we were asked to. 3596 */ 3597 ret = 0; 3598 if (ctx->flags & IORING_SETUP_SQPOLL) { 3599 if (flags & IORING_ENTER_SQ_WAKEUP) 3600 wake_up(&ctx->sqo_wait); 3601 submitted = to_submit; 3602 } else if (to_submit) { 3603 bool block_for_last = false; 3604 3605 to_submit = min(to_submit, ctx->sq_entries); 3606 3607 /* 3608 * Allow last submission to block in a series, IFF the caller 3609 * asked to wait for events and we don't currently have 3610 * enough. This potentially avoids an async punt. 3611 */ 3612 if (to_submit == min_complete && 3613 io_cqring_events(ctx->rings) < min_complete) 3614 block_for_last = true; 3615 3616 mutex_lock(&ctx->uring_lock); 3617 submitted = io_ring_submit(ctx, to_submit, block_for_last); 3618 mutex_unlock(&ctx->uring_lock); 3619 } 3620 if (flags & IORING_ENTER_GETEVENTS) { 3621 unsigned nr_events = 0; 3622 3623 min_complete = min(min_complete, ctx->cq_entries); 3624 3625 if (ctx->flags & IORING_SETUP_IOPOLL) { 3626 ret = io_iopoll_check(ctx, &nr_events, min_complete); 3627 } else { 3628 ret = io_cqring_wait(ctx, min_complete, sig, sigsz); 3629 } 3630 } 3631 3632 percpu_ref_put(&ctx->refs); 3633out_fput: 3634 fdput(f); 3635 return submitted ? 
submitted : ret; 3636} 3637 3638static const struct file_operations io_uring_fops = { 3639 .release = io_uring_release, 3640 .mmap = io_uring_mmap, 3641 .poll = io_uring_poll, 3642 .fasync = io_uring_fasync, 3643}; 3644 3645static int io_allocate_scq_urings(struct io_ring_ctx *ctx, 3646 struct io_uring_params *p) 3647{ 3648 struct io_rings *rings; 3649 size_t size, sq_array_offset; 3650 3651 size = rings_size(p->sq_entries, p->cq_entries, &sq_array_offset); 3652 if (size == SIZE_MAX) 3653 return -EOVERFLOW; 3654 3655 rings = io_mem_alloc(size); 3656 if (!rings) 3657 return -ENOMEM; 3658 3659 ctx->rings = rings; 3660 ctx->sq_array = (u32 *)((char *)rings + sq_array_offset); 3661 rings->sq_ring_mask = p->sq_entries - 1; 3662 rings->cq_ring_mask = p->cq_entries - 1; 3663 rings->sq_ring_entries = p->sq_entries; 3664 rings->cq_ring_entries = p->cq_entries; 3665 ctx->sq_mask = rings->sq_ring_mask; 3666 ctx->cq_mask = rings->cq_ring_mask; 3667 ctx->sq_entries = rings->sq_ring_entries; 3668 ctx->cq_entries = rings->cq_ring_entries; 3669 3670 size = array_size(sizeof(struct io_uring_sqe), p->sq_entries); 3671 if (size == SIZE_MAX) 3672 return -EOVERFLOW; 3673 3674 ctx->sq_sqes = io_mem_alloc(size); 3675 if (!ctx->sq_sqes) 3676 return -ENOMEM; 3677 3678 return 0; 3679} 3680 3681/* 3682 * Allocate an anonymous fd, this is what constitutes the application 3683 * visible backing of an io_uring instance. The application mmaps this 3684 * fd to gain access to the SQ/CQ ring details. If UNIX sockets are enabled, 3685 * we have to tie this fd to a socket for file garbage collection purposes. 3686 */ 3687static int io_uring_get_fd(struct io_ring_ctx *ctx) 3688{ 3689 struct file *file; 3690 int ret; 3691 3692#if defined(CONFIG_UNIX) 3693 ret = sock_create_kern(&init_net, PF_UNIX, SOCK_RAW, IPPROTO_IP, 3694 &ctx->ring_sock); 3695 if (ret) 3696 return ret; 3697#endif 3698 3699 ret = get_unused_fd_flags(O_RDWR | O_CLOEXEC); 3700 if (ret < 0) 3701 goto err; 3702 3703 file = anon_inode_getfile("[io_uring]", &io_uring_fops, ctx, 3704 O_RDWR | O_CLOEXEC); 3705 if (IS_ERR(file)) { 3706 put_unused_fd(ret); 3707 ret = PTR_ERR(file); 3708 goto err; 3709 } 3710 3711#if defined(CONFIG_UNIX) 3712 ctx->ring_sock->file = file; 3713 ctx->ring_sock->sk->sk_user_data = ctx; 3714#endif 3715 fd_install(ret, file); 3716 return ret; 3717err: 3718#if defined(CONFIG_UNIX) 3719 sock_release(ctx->ring_sock); 3720 ctx->ring_sock = NULL; 3721#endif 3722 return ret; 3723} 3724 3725static int io_uring_create(unsigned entries, struct io_uring_params *p) 3726{ 3727 struct user_struct *user = NULL; 3728 struct io_ring_ctx *ctx; 3729 bool account_mem; 3730 int ret; 3731 3732 if (!entries || entries > IORING_MAX_ENTRIES) 3733 return -EINVAL; 3734 3735 /* 3736 * Use twice as many entries for the CQ ring. It's possible for the 3737 * application to drive a higher depth than the size of the SQ ring, 3738 * since the sqes are only used at submission time. This allows for 3739 * some flexibility in overcommitting a bit. 
3740 */ 3741 p->sq_entries = roundup_pow_of_two(entries); 3742 p->cq_entries = 2 * p->sq_entries; 3743 3744 user = get_uid(current_user()); 3745 account_mem = !capable(CAP_IPC_LOCK); 3746 3747 if (account_mem) { 3748 ret = io_account_mem(user, 3749 ring_pages(p->sq_entries, p->cq_entries)); 3750 if (ret) { 3751 free_uid(user); 3752 return ret; 3753 } 3754 } 3755 3756 ctx = io_ring_ctx_alloc(p); 3757 if (!ctx) { 3758 if (account_mem) 3759 io_unaccount_mem(user, ring_pages(p->sq_entries, 3760 p->cq_entries)); 3761 free_uid(user); 3762 return -ENOMEM; 3763 } 3764 ctx->compat = in_compat_syscall(); 3765 ctx->account_mem = account_mem; 3766 ctx->user = user; 3767 3768 ret = io_allocate_scq_urings(ctx, p); 3769 if (ret) 3770 goto err; 3771 3772 ret = io_sq_offload_start(ctx, p); 3773 if (ret) 3774 goto err; 3775 3776 ret = io_uring_get_fd(ctx); 3777 if (ret < 0) 3778 goto err; 3779 3780 memset(&p->sq_off, 0, sizeof(p->sq_off)); 3781 p->sq_off.head = offsetof(struct io_rings, sq.head); 3782 p->sq_off.tail = offsetof(struct io_rings, sq.tail); 3783 p->sq_off.ring_mask = offsetof(struct io_rings, sq_ring_mask); 3784 p->sq_off.ring_entries = offsetof(struct io_rings, sq_ring_entries); 3785 p->sq_off.flags = offsetof(struct io_rings, sq_flags); 3786 p->sq_off.dropped = offsetof(struct io_rings, sq_dropped); 3787 p->sq_off.array = (char *)ctx->sq_array - (char *)ctx->rings; 3788 3789 memset(&p->cq_off, 0, sizeof(p->cq_off)); 3790 p->cq_off.head = offsetof(struct io_rings, cq.head); 3791 p->cq_off.tail = offsetof(struct io_rings, cq.tail); 3792 p->cq_off.ring_mask = offsetof(struct io_rings, cq_ring_mask); 3793 p->cq_off.ring_entries = offsetof(struct io_rings, cq_ring_entries); 3794 p->cq_off.overflow = offsetof(struct io_rings, cq_overflow); 3795 p->cq_off.cqes = offsetof(struct io_rings, cqes); 3796 3797 p->features = IORING_FEAT_SINGLE_MMAP; 3798 return ret; 3799err: 3800 io_ring_ctx_wait_and_kill(ctx); 3801 return ret; 3802} 3803 3804/* 3805 * Sets up an aio uring context, and returns the fd. Applications asks for a 3806 * ring size, we return the actual sq/cq ring sizes (among other things) in the 3807 * params structure passed in. 3808 */ 3809static long io_uring_setup(u32 entries, struct io_uring_params __user *params) 3810{ 3811 struct io_uring_params p; 3812 long ret; 3813 int i; 3814 3815 if (copy_from_user(&p, params, sizeof(p))) 3816 return -EFAULT; 3817 for (i = 0; i < ARRAY_SIZE(p.resv); i++) { 3818 if (p.resv[i]) 3819 return -EINVAL; 3820 } 3821 3822 if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL | 3823 IORING_SETUP_SQ_AFF)) 3824 return -EINVAL; 3825 3826 ret = io_uring_create(entries, &p); 3827 if (ret < 0) 3828 return ret; 3829 3830 if (copy_to_user(params, &p, sizeof(p))) 3831 return -EFAULT; 3832 3833 return ret; 3834} 3835 3836SYSCALL_DEFINE2(io_uring_setup, u32, entries, 3837 struct io_uring_params __user *, params) 3838{ 3839 return io_uring_setup(entries, params); 3840} 3841 3842static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode, 3843 void __user *arg, unsigned nr_args) 3844 __releases(ctx->uring_lock) 3845 __acquires(ctx->uring_lock) 3846{ 3847 int ret; 3848 3849 /* 3850 * We're inside the ring mutex, if the ref is already dying, then 3851 * someone else killed the ctx or is already going through 3852 * io_uring_register(). 3853 */ 3854 if (percpu_ref_is_dying(&ctx->refs)) 3855 return -ENXIO; 3856 3857 percpu_ref_kill(&ctx->refs); 3858 3859 /* 3860 * Drop uring mutex before waiting for references to exit. 
If another 3861 * thread is currently inside io_uring_enter() it might need to grab 3862 * the uring_lock to make progress. If we hold it here across the drain 3863 * wait, then we can deadlock. It's safe to drop the mutex here, since 3864 * no new references will come in after we've killed the percpu ref. 3865 */ 3866 mutex_unlock(&ctx->uring_lock); 3867 wait_for_completion(&ctx->ctx_done); 3868 mutex_lock(&ctx->uring_lock); 3869 3870 switch (opcode) { 3871 case IORING_REGISTER_BUFFERS: 3872 ret = io_sqe_buffer_register(ctx, arg, nr_args); 3873 break; 3874 case IORING_UNREGISTER_BUFFERS: 3875 ret = -EINVAL; 3876 if (arg || nr_args) 3877 break; 3878 ret = io_sqe_buffer_unregister(ctx); 3879 break; 3880 case IORING_REGISTER_FILES: 3881 ret = io_sqe_files_register(ctx, arg, nr_args); 3882 break; 3883 case IORING_UNREGISTER_FILES: 3884 ret = -EINVAL; 3885 if (arg || nr_args) 3886 break; 3887 ret = io_sqe_files_unregister(ctx); 3888 break; 3889 case IORING_REGISTER_EVENTFD: 3890 ret = -EINVAL; 3891 if (nr_args != 1) 3892 break; 3893 ret = io_eventfd_register(ctx, arg); 3894 break; 3895 case IORING_UNREGISTER_EVENTFD: 3896 ret = -EINVAL; 3897 if (arg || nr_args) 3898 break; 3899 ret = io_eventfd_unregister(ctx); 3900 break; 3901 default: 3902 ret = -EINVAL; 3903 break; 3904 } 3905 3906 /* bring the ctx back to life */ 3907 reinit_completion(&ctx->ctx_done); 3908 percpu_ref_reinit(&ctx->refs); 3909 return ret; 3910} 3911 3912SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode, 3913 void __user *, arg, unsigned int, nr_args) 3914{ 3915 struct io_ring_ctx *ctx; 3916 long ret = -EBADF; 3917 struct fd f; 3918 3919 f = fdget(fd); 3920 if (!f.file) 3921 return -EBADF; 3922 3923 ret = -EOPNOTSUPP; 3924 if (f.file->f_op != &io_uring_fops) 3925 goto out_fput; 3926 3927 ctx = f.file->private_data; 3928 3929 mutex_lock(&ctx->uring_lock); 3930 ret = __io_uring_register(ctx, opcode, arg, nr_args); 3931 mutex_unlock(&ctx->uring_lock); 3932out_fput: 3933 fdput(f); 3934 return ret; 3935} 3936 3937static int __init io_uring_init(void) 3938{ 3939 req_cachep = KMEM_CACHE(io_kiocb, SLAB_HWCACHE_ALIGN | SLAB_PANIC); 3940 return 0; 3941}; 3942__initcall(io_uring_init);
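
The syscalls defined above (io_uring_setup(), io_uring_mmap() via the fd's file_operations, and io_uring_enter()) can be driven without liburing. The sketch below is a minimal, hedged example of that path: it sets up a small ring, maps the SQ ring, CQ ring and SQE array at the IORING_OFF_* offsets handled in io_uring_mmap(), queues one IORING_OP_NOP, and reaps its completion from the shared CQ ring. It assumes a kernel with io_uring and system headers new enough to provide <linux/io_uring.h> and the __NR_io_uring_* syscall numbers; error handling for mmap() is elided, and the release store on the SQ tail is there because io_get_sqring() reads the tail with smp_load_acquire().

#define _GNU_SOURCE
#include <linux/io_uring.h>
#include <sys/syscall.h>
#include <sys/mman.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h>

int main(void)
{
	struct io_uring_params p;
	memset(&p, 0, sizeof(p));

	int fd = syscall(__NR_io_uring_setup, 8, &p);
	if (fd < 0)
		return 1;

	/* sizes follow from the offsets io_uring_create() fills in above */
	size_t sq_sz = p.sq_off.array + p.sq_entries * sizeof(unsigned);
	size_t cq_sz = p.cq_off.cqes + p.cq_entries * sizeof(struct io_uring_cqe);

	void *sq = mmap(NULL, sq_sz, PROT_READ | PROT_WRITE,
			MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING);
	void *cq = mmap(NULL, cq_sz, PROT_READ | PROT_WRITE,
			MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_CQ_RING);
	struct io_uring_sqe *sqes = mmap(NULL, p.sq_entries * sizeof(*sqes),
			PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
			fd, IORING_OFF_SQES);

	unsigned *sq_tail = (unsigned *)((char *)sq + p.sq_off.tail);
	unsigned *sq_mask = (unsigned *)((char *)sq + p.sq_off.ring_mask);
	unsigned *sq_array = (unsigned *)((char *)sq + p.sq_off.array);
	unsigned *cq_head = (unsigned *)((char *)cq + p.cq_off.head);
	unsigned *cq_tail = (unsigned *)((char *)cq + p.cq_off.tail);
	unsigned *cq_mask = (unsigned *)((char *)cq + p.cq_off.ring_mask);
	struct io_uring_cqe *cqes = (void *)((char *)cq + p.cq_off.cqes);

	/* queue one NOP */
	unsigned tail = *sq_tail, idx = tail & *sq_mask;
	memset(&sqes[idx], 0, sizeof(sqes[idx]));
	sqes[idx].opcode = IORING_OP_NOP;
	sqes[idx].user_data = 0x42;
	sq_array[idx] = idx;
	/* publish the new tail; io_get_sqring() loads it with acquire semantics */
	__atomic_store_n(sq_tail, tail + 1, __ATOMIC_RELEASE);

	syscall(__NR_io_uring_enter, fd, 1, 1, IORING_ENTER_GETEVENTS, NULL, 0);

	/* reap the completion; the application owns the CQ ring head */
	unsigned head = *cq_head;
	if (head != __atomic_load_n(cq_tail, __ATOMIC_ACQUIRE)) {
		struct io_uring_cqe *cqe = &cqes[head & *cq_mask];
		printf("user_data=%llu res=%d\n",
		       (unsigned long long)cqe->user_data, cqe->res);
		__atomic_store_n(cq_head, head + 1, __ATOMIC_RELEASE);
	}
	return 0;
}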
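
With IORING_SETUP_SQPOLL, io_sq_thread() above consumes new SQEs on its own and only needs an io_uring_enter(IORING_ENTER_SQ_WAKEUP) call once it has gone idle and set IORING_SQ_NEED_WAKEUP in the ring's sq_flags word. A hedged sketch of the userspace side follows; sq_flags is assumed to be the mapped word at sq_off.flags, the helper name is illustrative, and the fence is meant to pair with the smp_mb() io_sq_thread() issues between writing the flag and re-reading the SQ tail.

#define _GNU_SOURCE
#include <linux/io_uring.h>
#include <sys/syscall.h>
#include <unistd.h>

/* Call after advancing the SQ tail when the ring was created with SQPOLL. */
static void sq_ring_kick_if_needed(int ring_fd, const unsigned *sq_flags)
{
	/* order the tail update before the flags check */
	__atomic_thread_fence(__ATOMIC_SEQ_CST);

	if (__atomic_load_n(sq_flags, __ATOMIC_RELAXED) & IORING_SQ_NEED_WAKEUP)
		syscall(__NR_io_uring_enter, ring_fd, 0, 0,
			IORING_ENTER_SQ_WAKEUP, NULL, 0);
}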
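
io_sqe_buffer_register() above accepts an array of iovecs via IORING_REGISTER_BUFFERS, rejects zero-length entries and anything over 1 GiB, pins the pages long-term, and charges them against RLIMIT_MEMLOCK unless the caller has CAP_IPC_LOCK. A minimal sketch of registering one buffer is below; the function name is illustrative and the 4 KiB alignment is a convenience, not a kernel requirement.

#define _GNU_SOURCE
#include <linux/io_uring.h>
#include <sys/syscall.h>
#include <sys/uio.h>
#include <unistd.h>
#include <stdlib.h>

static int register_one_buffer(int ring_fd, void **buf, size_t len)
{
	struct iovec iov;

	if (posix_memalign(buf, 4096, len))
		return -1;
	iov.iov_base = *buf;
	iov.iov_len = len;

	/* buf_index 0 can then be used with READ_FIXED / WRITE_FIXED SQEs */
	return syscall(__NR_io_uring_register, ring_fd,
		       IORING_REGISTER_BUFFERS, &iov, 1);
}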
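
SQE chaining works the other way around from what one might expect: io_submit_sqe() above starts a link when it sees IOSQE_IO_LINK set, and the chain ends with the first SQE that does not carry the flag, so the last member is the one left unflagged. The sketch below fills two consecutive SQE slots (obtained as in the NOP example) as a write followed by a linked fsync; the function and parameter names are illustrative.

#define _GNU_SOURCE
#include <linux/io_uring.h>
#include <sys/uio.h>
#include <string.h>

static void prep_linked_write_fsync(struct io_uring_sqe *write_sqe,
				    struct io_uring_sqe *fsync_sqe,
				    int fd, const struct iovec *iov)
{
	memset(write_sqe, 0, sizeof(*write_sqe));
	write_sqe->opcode = IORING_OP_WRITEV;
	write_sqe->fd = fd;
	write_sqe->addr = (unsigned long)iov;
	write_sqe->len = 1;
	write_sqe->flags = IOSQE_IO_LINK;	/* chain continues to the next SQE */
	write_sqe->user_data = 1;

	memset(fsync_sqe, 0, sizeof(*fsync_sqe));
	fsync_sqe->opcode = IORING_OP_FSYNC;
	fsync_sqe->fd = fd;
	fsync_sqe->user_data = 2;		/* no IOSQE_IO_LINK: end of the chain */
}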