Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux

io_uring: support for IO polling

Add support for a polled io_uring instance. When a read or write is
submitted to a polled io_uring, the application must poll for
completions on the CQ ring through io_uring_enter(2). Polled IO may not
generate IRQ completions, hence they need to be actively found by the
application itself.

To use polling, io_uring_setup() must be used with the
IORING_SETUP_IOPOLL flag being set. It is illegal to mix and match
polled and non-polled IO on an io_uring.

Reviewed-by: Hannes Reinecke <hare@suse.com>
Signed-off-by: Jens Axboe <axboe@kernel.dk>

+271 -9
+266 -9
fs/io_uring.c
··· 124 124 125 125 struct { 126 126 spinlock_t completion_lock; 127 + bool poll_multi_file; 128 + /* 129 + * ->poll_list is protected by the ctx->uring_lock for 130 + * io_uring instances that don't use IORING_SETUP_SQPOLL. 131 + * For SQPOLL, only the single threaded io_sq_thread() will 132 + * manipulate the list, hence no extra locking is needed there. 133 + */ 134 + struct list_head poll_list; 127 135 } ____cacheline_aligned_in_smp; 128 136 129 137 #if defined(CONFIG_UNIX) ··· 143 135 const struct io_uring_sqe *sqe; 144 136 unsigned short index; 145 137 bool has_user; 138 + bool needs_lock; 146 139 }; 147 140 148 141 struct io_kiocb { ··· 155 146 struct list_head list; 156 147 unsigned int flags; 157 148 #define REQ_F_FORCE_NONBLOCK 1 /* inline submission attempt */ 149 + #define REQ_F_IOPOLL_COMPLETED 2 /* polled IO has completed */ 158 150 u64 user_data; 151 + u64 error; 159 152 160 153 struct work_struct work; 161 154 }; 162 155 163 156 #define IO_PLUG_THRESHOLD 2 157 + #define IO_IOPOLL_BATCH 8 164 158 165 159 static struct kmem_cache *req_cachep; 166 160 ··· 208 196 mutex_init(&ctx->uring_lock); 209 197 init_waitqueue_head(&ctx->wait); 210 198 spin_lock_init(&ctx->completion_lock); 199 + INIT_LIST_HEAD(&ctx->poll_list); 211 200 return ctx; 212 201 } 213 202 ··· 310 297 return NULL; 311 298 } 312 299 300 + static void io_free_req_many(struct io_ring_ctx *ctx, void **reqs, int *nr) 301 + { 302 + if (*nr) { 303 + kmem_cache_free_bulk(req_cachep, *nr, reqs); 304 + io_ring_drop_ctx_refs(ctx, *nr); 305 + *nr = 0; 306 + } 307 + } 308 + 313 309 static void io_free_req(struct io_kiocb *req) 314 310 { 315 311 io_ring_drop_ctx_refs(req->ctx, 1); 316 312 kmem_cache_free(req_cachep, req); 313 + } 314 + 315 + /* 316 + * Find and free completed poll iocbs 317 + */ 318 + static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events, 319 + struct list_head *done) 320 + { 321 + void *reqs[IO_IOPOLL_BATCH]; 322 + struct io_kiocb *req; 323 + int to_free = 
0; 324 + 325 + while (!list_empty(done)) { 326 + req = list_first_entry(done, struct io_kiocb, list); 327 + list_del(&req->list); 328 + 329 + io_cqring_fill_event(ctx, req->user_data, req->error, 0); 330 + 331 + reqs[to_free++] = req; 332 + (*nr_events)++; 333 + 334 + fput(req->rw.ki_filp); 335 + if (to_free == ARRAY_SIZE(reqs)) 336 + io_free_req_many(ctx, reqs, &to_free); 337 + } 338 + io_commit_cqring(ctx); 339 + 340 + io_free_req_many(ctx, reqs, &to_free); 341 + } 342 + 343 + static int io_do_iopoll(struct io_ring_ctx *ctx, unsigned int *nr_events, 344 + long min) 345 + { 346 + struct io_kiocb *req, *tmp; 347 + LIST_HEAD(done); 348 + bool spin; 349 + int ret; 350 + 351 + /* 352 + * Only spin for completions if we don't have multiple devices hanging 353 + * off our complete list, and we're under the requested amount. 354 + */ 355 + spin = !ctx->poll_multi_file && *nr_events < min; 356 + 357 + ret = 0; 358 + list_for_each_entry_safe(req, tmp, &ctx->poll_list, list) { 359 + struct kiocb *kiocb = &req->rw; 360 + 361 + /* 362 + * Move completed entries to our local list. If we find a 363 + * request that requires polling, break out and complete 364 + * the done list first, if we have entries there. 365 + */ 366 + if (req->flags & REQ_F_IOPOLL_COMPLETED) { 367 + list_move_tail(&req->list, &done); 368 + continue; 369 + } 370 + if (!list_empty(&done)) 371 + break; 372 + 373 + ret = kiocb->ki_filp->f_op->iopoll(kiocb, spin); 374 + if (ret < 0) 375 + break; 376 + 377 + if (ret && spin) 378 + spin = false; 379 + ret = 0; 380 + } 381 + 382 + if (!list_empty(&done)) 383 + io_iopoll_complete(ctx, nr_events, &done); 384 + 385 + return ret; 386 + } 387 + 388 + /* 389 + * Poll for a minimum of 'min' events. Note that if min == 0 we consider that a 390 + * non-spinning poll check - we'll still enter the driver poll loop, but only 391 + * as a non-spinning completion check. 
392 + */ 393 + static int io_iopoll_getevents(struct io_ring_ctx *ctx, unsigned int *nr_events, 394 + long min) 395 + { 396 + while (!list_empty(&ctx->poll_list)) { 397 + int ret; 398 + 399 + ret = io_do_iopoll(ctx, nr_events, min); 400 + if (ret < 0) 401 + return ret; 402 + if (!min || *nr_events >= min) 403 + return 0; 404 + } 405 + 406 + return 1; 407 + } 408 + 409 + /* 410 + * We can't just wait for polled events to come to us, we have to actively 411 + * find and complete them. 412 + */ 413 + static void io_iopoll_reap_events(struct io_ring_ctx *ctx) 414 + { 415 + if (!(ctx->flags & IORING_SETUP_IOPOLL)) 416 + return; 417 + 418 + mutex_lock(&ctx->uring_lock); 419 + while (!list_empty(&ctx->poll_list)) { 420 + unsigned int nr_events = 0; 421 + 422 + io_iopoll_getevents(ctx, &nr_events, 1); 423 + } 424 + mutex_unlock(&ctx->uring_lock); 425 + } 426 + 427 + static int io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events, 428 + long min) 429 + { 430 + int ret = 0; 431 + 432 + do { 433 + int tmin = 0; 434 + 435 + if (*nr_events < min) 436 + tmin = min - *nr_events; 437 + 438 + ret = io_iopoll_getevents(ctx, nr_events, tmin); 439 + if (ret <= 0) 440 + break; 441 + ret = 0; 442 + } while (min && !*nr_events && !need_resched()); 443 + 444 + return ret; 317 445 } 318 446 319 447 static void kiocb_end_write(struct kiocb *kiocb) ··· 483 329 io_free_req(req); 484 330 } 485 331 332 + static void io_complete_rw_iopoll(struct kiocb *kiocb, long res, long res2) 333 + { 334 + struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw); 335 + 336 + kiocb_end_write(kiocb); 337 + 338 + req->error = res; 339 + if (res != -EAGAIN) 340 + req->flags |= REQ_F_IOPOLL_COMPLETED; 341 + } 342 + 343 + /* 344 + * After the iocb has been issued, it's safe to be found on the poll list. 345 + * Adding the kiocb to the list AFTER submission ensures that we don't 346 + * find it from a io_iopoll_getevents() thread before the issuer is done 347 + * accessing the kiocb cookie. 
348 + */ 349 + static void io_iopoll_req_issued(struct io_kiocb *req) 350 + { 351 + struct io_ring_ctx *ctx = req->ctx; 352 + 353 + /* 354 + * Track whether we have multiple files in our lists. This will impact 355 + * how we do polling eventually, not spinning if we're on potentially 356 + * different devices. 357 + */ 358 + if (list_empty(&ctx->poll_list)) { 359 + ctx->poll_multi_file = false; 360 + } else if (!ctx->poll_multi_file) { 361 + struct io_kiocb *list_req; 362 + 363 + list_req = list_first_entry(&ctx->poll_list, struct io_kiocb, 364 + list); 365 + if (list_req->rw.ki_filp != req->rw.ki_filp) 366 + ctx->poll_multi_file = true; 367 + } 368 + 369 + /* 370 + * For fast devices, IO may have already completed. If it has, add 371 + * it to the front so we find it first. 372 + */ 373 + if (req->flags & REQ_F_IOPOLL_COMPLETED) 374 + list_add(&req->list, &ctx->poll_list); 375 + else 376 + list_add_tail(&req->list, &ctx->poll_list); 377 + } 378 + 486 379 /* 487 380 * If we tracked the file through the SCM inflight mechanism, we could support 488 381 * any file. 
For now, just ensure that anything potentially problematic is done ··· 550 349 static int io_prep_rw(struct io_kiocb *req, const struct io_uring_sqe *sqe, 551 350 bool force_nonblock) 552 351 { 352 + struct io_ring_ctx *ctx = req->ctx; 553 353 struct kiocb *kiocb = &req->rw; 554 354 unsigned ioprio; 555 355 int fd, ret; ··· 586 384 kiocb->ki_flags |= IOCB_NOWAIT; 587 385 req->flags |= REQ_F_FORCE_NONBLOCK; 588 386 } 589 - if (kiocb->ki_flags & IOCB_HIPRI) { 590 - ret = -EINVAL; 591 - goto out_fput; 592 - } 387 + if (ctx->flags & IORING_SETUP_IOPOLL) { 388 + ret = -EOPNOTSUPP; 389 + if (!(kiocb->ki_flags & IOCB_DIRECT) || 390 + !kiocb->ki_filp->f_op->iopoll) 391 + goto out_fput; 593 392 594 - kiocb->ki_complete = io_complete_rw; 393 + req->error = 0; 394 + kiocb->ki_flags |= IOCB_HIPRI; 395 + kiocb->ki_complete = io_complete_rw_iopoll; 396 + } else { 397 + if (kiocb->ki_flags & IOCB_HIPRI) { 398 + ret = -EINVAL; 399 + goto out_fput; 400 + } 401 + kiocb->ki_complete = io_complete_rw; 402 + } 595 403 return 0; 596 404 out_fput: 597 405 fput(kiocb->ki_filp); ··· 755 543 struct io_ring_ctx *ctx = req->ctx; 756 544 long err = 0; 757 545 546 + if (unlikely(ctx->flags & IORING_SETUP_IOPOLL)) 547 + return -EINVAL; 548 + 758 549 /* 759 550 * Twilight zone - it's possible that someone issued an opcode that 760 551 * has a file attached, then got -EAGAIN on submission, and changed ··· 781 566 if (req->rw.ki_filp) 782 567 return 0; 783 568 569 + if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL)) 570 + return -EINVAL; 784 571 if (unlikely(sqe->addr || sqe->ioprio)) 785 572 return -EINVAL; 786 573 ··· 854 637 break; 855 638 } 856 639 857 - return ret; 640 + if (ret) 641 + return ret; 642 + 643 + if (ctx->flags & IORING_SETUP_IOPOLL) { 644 + if (req->error == -EAGAIN) 645 + return -EAGAIN; 646 + 647 + /* workqueue context doesn't hold uring_lock, grab it now */ 648 + if (s->needs_lock) 649 + mutex_lock(&ctx->uring_lock); 650 + io_iopoll_req_issued(req); 651 + if (s->needs_lock) 
652 + mutex_unlock(&ctx->uring_lock); 653 + } 654 + 655 + return 0; 858 656 } 859 657 860 658 static void io_sq_wq_submit_work(struct work_struct *work) ··· 893 661 use_mm(ctx->sqo_mm); 894 662 set_fs(USER_DS); 895 663 s->has_user = true; 664 + s->needs_lock = true; 896 665 897 - ret = __io_submit_sqe(ctx, req, s, false); 666 + do { 667 + ret = __io_submit_sqe(ctx, req, s, false); 668 + /* 669 + * We can get EAGAIN for polled IO even though we're forcing 670 + * a sync submission from here, since we can't wait for 671 + * request slots on the block side. 672 + */ 673 + if (ret != -EAGAIN) 674 + break; 675 + cond_resched(); 676 + } while (1); 898 677 899 678 set_fs(old_fs); 900 679 unuse_mm(ctx->sqo_mm); ··· 1042 799 break; 1043 800 1044 801 s.has_user = true; 802 + s.needs_lock = false; 803 + 1045 804 ret = io_submit_sqe(ctx, &s); 1046 805 if (ret) { 1047 806 io_drop_sqring(ctx); ··· 1192 947 destroy_workqueue(ctx->sqo_wq); 1193 948 if (ctx->sqo_mm) 1194 949 mmdrop(ctx->sqo_mm); 950 + 951 + io_iopoll_reap_events(ctx); 952 + 1195 953 #if defined(CONFIG_UNIX) 1196 954 if (ctx->ring_sock) 1197 955 sock_release(ctx->ring_sock); ··· 1241 993 percpu_ref_kill(&ctx->refs); 1242 994 mutex_unlock(&ctx->uring_lock); 1243 995 996 + io_iopoll_reap_events(ctx); 1244 997 wait_for_completion(&ctx->ctx_done); 1245 998 io_ring_ctx_free(ctx); 1246 999 } ··· 1323 1074 goto out_ctx; 1324 1075 } 1325 1076 if (flags & IORING_ENTER_GETEVENTS) { 1077 + unsigned nr_events = 0; 1078 + 1326 1079 min_complete = min(min_complete, ctx->cq_entries); 1327 1080 1328 1081 /* ··· 1336 1085 if (submitted < to_submit) 1337 1086 min_complete = min_t(unsigned, submitted, min_complete); 1338 1087 1339 - ret = io_cqring_wait(ctx, min_complete, sig, sigsz); 1088 + if (ctx->flags & IORING_SETUP_IOPOLL) { 1089 + mutex_lock(&ctx->uring_lock); 1090 + ret = io_iopoll_check(ctx, &nr_events, min_complete); 1091 + mutex_unlock(&ctx->uring_lock); 1092 + } else { 1093 + ret = io_cqring_wait(ctx, min_complete, sig, 
sigsz); 1094 + } 1340 1095 } 1341 1096 1342 1097 out_ctx: ··· 1539 1282 return -EINVAL; 1540 1283 } 1541 1284 1542 - if (p.flags) 1285 + if (p.flags & ~IORING_SETUP_IOPOLL) 1543 1286 return -EINVAL; 1544 1287 1545 1288 ret = io_uring_create(entries, &p);
+5
include/uapi/linux/io_uring.h
··· 30 30 __u64 __pad2[3]; 31 31 }; 32 32 33 + /* 34 + * io_uring_setup() flags 35 + */ 36 + #define IORING_SETUP_IOPOLL (1U << 0) /* io_context is polled */ 37 + 33 38 #define IORING_OP_NOP 0 34 39 #define IORING_OP_READV 1 35 40 #define IORING_OP_WRITEV 2