Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux

io_uring: add support for pre-mapped user IO buffers

If we have fixed user buffers, we can map them into the kernel when we
setup the io_uring. That avoids the need to do get_user_pages() for
each and every IO.

To utilize this feature, the application must call io_uring_register()
after having setup an io_uring instance, passing in
IORING_REGISTER_BUFFERS as the opcode. The argument must be a pointer to
an iovec array, and the nr_args should contain how many iovecs the
application wishes to map.

If successful, these buffers are now mapped into the kernel, eligible
for IO. To use these fixed buffers, the application must use the
IORING_OP_READ_FIXED and IORING_OP_WRITE_FIXED opcodes, and then
set sqe->buf_index to the desired buffer index. sqe->addr..sqe->addr+sqe->len
must point to somewhere inside the indexed buffer.

The application may register buffers throughout the lifetime of the
io_uring instance. It can call io_uring_register() with
IORING_UNREGISTER_BUFFERS as the opcode to unregister the current set of
buffers, and then register a new set. The application need not
unregister buffers explicitly before shutting down the io_uring
instance.

It's perfectly valid to setup a larger buffer, and then sometimes only
use parts of it for an IO. As long as the range is within the originally
mapped region, it will work just fine.

For now, buffers must not be file backed. If file backed buffers are
passed in, the registration will fail with -1/EOPNOTSUPP. This
restriction may be relaxed in the future.

RLIMIT_MEMLOCK is used to check how much memory we can pin. A somewhat
arbitrary limit of 1G per buffer is also imposed.

Reviewed-by: Hannes Reinecke <hare@suse.com>
Signed-off-by: Jens Axboe <axboe@kernel.dk>

+381 -15
+1
arch/x86/entry/syscalls/syscall_32.tbl
··· 400 400 386 i386 rseq sys_rseq __ia32_sys_rseq 401 401 425 i386 io_uring_setup sys_io_uring_setup __ia32_sys_io_uring_setup 402 402 426 i386 io_uring_enter sys_io_uring_enter __ia32_sys_io_uring_enter 403 + 427 i386 io_uring_register sys_io_uring_register __ia32_sys_io_uring_register
+1
arch/x86/entry/syscalls/syscall_64.tbl
··· 345 345 334 common rseq __x64_sys_rseq 346 346 425 common io_uring_setup __x64_sys_io_uring_setup 347 347 426 common io_uring_enter __x64_sys_io_uring_enter 348 + 427 common io_uring_register __x64_sys_io_uring_register 348 349 349 350 # 350 351 # x32-specific system call numbers start at 512 to avoid cache impact
+361 -13
fs/io_uring.c
··· 45 45 #include <linux/slab.h> 46 46 #include <linux/workqueue.h> 47 47 #include <linux/blkdev.h> 48 + #include <linux/bvec.h> 48 49 #include <linux/net.h> 49 50 #include <net/sock.h> 50 51 #include <net/af_unix.h> ··· 53 52 #include <linux/sched/mm.h> 54 53 #include <linux/uaccess.h> 55 54 #include <linux/nospec.h> 55 + #include <linux/sizes.h> 56 + #include <linux/hugetlb.h> 56 57 57 58 #include <uapi/linux/io_uring.h> 58 59 ··· 82 79 u32 ring_entries; 83 80 u32 overflow; 84 81 struct io_uring_cqe cqes[]; 82 + }; 83 + 84 + struct io_mapped_ubuf { 85 + u64 ubuf; 86 + size_t len; 87 + struct bio_vec *bvec; 88 + unsigned int nr_bvecs; 85 89 }; 86 90 87 91 struct io_ring_ctx { ··· 122 112 struct wait_queue_head cq_wait; 123 113 struct fasync_struct *cq_fasync; 124 114 } ____cacheline_aligned_in_smp; 115 + 116 + /* if used, fixed mapped user buffers */ 117 + unsigned nr_user_bufs; 118 + struct io_mapped_ubuf *user_bufs; 125 119 126 120 struct user_struct *user; 127 121 ··· 746 732 } 747 733 } 748 734 735 + static int io_import_fixed(struct io_ring_ctx *ctx, int rw, 736 + const struct io_uring_sqe *sqe, 737 + struct iov_iter *iter) 738 + { 739 + size_t len = READ_ONCE(sqe->len); 740 + struct io_mapped_ubuf *imu; 741 + unsigned index, buf_index; 742 + size_t offset; 743 + u64 buf_addr; 744 + 745 + /* attempt to use fixed buffers without having provided iovecs */ 746 + if (unlikely(!ctx->user_bufs)) 747 + return -EFAULT; 748 + 749 + buf_index = READ_ONCE(sqe->buf_index); 750 + if (unlikely(buf_index >= ctx->nr_user_bufs)) 751 + return -EFAULT; 752 + 753 + index = array_index_nospec(buf_index, ctx->nr_user_bufs); 754 + imu = &ctx->user_bufs[index]; 755 + buf_addr = READ_ONCE(sqe->addr); 756 + 757 + /* overflow */ 758 + if (buf_addr + len < buf_addr) 759 + return -EFAULT; 760 + /* not inside the mapped region */ 761 + if (buf_addr < imu->ubuf || buf_addr + len > imu->ubuf + imu->len) 762 + return -EFAULT; 763 + 764 + /* 765 + * May not be a start of buffer, set size 
appropriately 766 + * and advance us to the beginning. 767 + */ 768 + offset = buf_addr - imu->ubuf; 769 + iov_iter_bvec(iter, rw, imu->bvec, imu->nr_bvecs, offset + len); 770 + if (offset) 771 + iov_iter_advance(iter, offset); 772 + return 0; 773 + } 774 + 749 775 static int io_import_iovec(struct io_ring_ctx *ctx, int rw, 750 776 const struct sqe_submit *s, struct iovec **iovec, 751 777 struct iov_iter *iter) ··· 793 739 const struct io_uring_sqe *sqe = s->sqe; 794 740 void __user *buf = u64_to_user_ptr(READ_ONCE(sqe->addr)); 795 741 size_t sqe_len = READ_ONCE(sqe->len); 742 + u8 opcode; 743 + 744 + /* 745 + * We're reading ->opcode for the second time, but the first read 746 + * doesn't care whether it's _FIXED or not, so it doesn't matter 747 + * whether ->opcode changes concurrently. The first read does care 748 + * about whether it is a READ or a WRITE, so we don't trust this read 749 + * for that purpose and instead let the caller pass in the read/write 750 + * flag. 751 + */ 752 + opcode = READ_ONCE(sqe->opcode); 753 + if (opcode == IORING_OP_READ_FIXED || 754 + opcode == IORING_OP_WRITE_FIXED) { 755 + ssize_t ret = io_import_fixed(ctx, rw, sqe, iter); 756 + *iovec = NULL; 757 + return ret; 758 + } 796 759 797 760 if (!s->has_user) 798 761 return -EFAULT; ··· 957 886 958 887 if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL)) 959 888 return -EINVAL; 960 - if (unlikely(sqe->addr || sqe->ioprio)) 889 + if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index)) 961 890 return -EINVAL; 962 891 963 892 fd = READ_ONCE(sqe->fd); ··· 1016 945 ret = io_nop(req, req->user_data); 1017 946 break; 1018 947 case IORING_OP_READV: 948 + if (unlikely(s->sqe->buf_index)) 949 + return -EINVAL; 1019 950 ret = io_read(req, s, force_nonblock, state); 1020 951 break; 1021 952 case IORING_OP_WRITEV: 953 + if (unlikely(s->sqe->buf_index)) 954 + return -EINVAL; 955 + ret = io_write(req, s, force_nonblock, state); 956 + break; 957 + case IORING_OP_READ_FIXED: 958 + ret = io_read(req, 
s, force_nonblock, state); 959 + break; 960 + case IORING_OP_WRITE_FIXED: 1022 961 ret = io_write(req, s, force_nonblock, state); 1023 962 break; 1024 963 case IORING_OP_FSYNC: ··· 1057 976 return 0; 1058 977 } 1059 978 979 + static inline bool io_sqe_needs_user(const struct io_uring_sqe *sqe) 980 + { 981 + u8 opcode = READ_ONCE(sqe->opcode); 982 + 983 + return !(opcode == IORING_OP_READ_FIXED || 984 + opcode == IORING_OP_WRITE_FIXED); 985 + } 986 + 1060 987 static void io_sq_wq_submit_work(struct work_struct *work) 1061 988 { 1062 989 struct io_kiocb *req = container_of(work, struct io_kiocb, work); 1063 990 struct sqe_submit *s = &req->submit; 1064 991 const struct io_uring_sqe *sqe = s->sqe; 1065 992 struct io_ring_ctx *ctx = req->ctx; 1066 - mm_segment_t old_fs = get_fs(); 993 + mm_segment_t old_fs; 994 + bool needs_user; 1067 995 int ret; 1068 996 1069 997 /* Ensure we clear previously set forced non-block flag */ 1070 998 req->flags &= ~REQ_F_FORCE_NONBLOCK; 1071 999 req->rw.ki_flags &= ~IOCB_NOWAIT; 1072 1000 1073 - if (!mmget_not_zero(ctx->sqo_mm)) { 1074 - ret = -EFAULT; 1075 - goto err; 1076 - } 1077 - 1078 - use_mm(ctx->sqo_mm); 1079 - set_fs(USER_DS); 1080 - s->has_user = true; 1081 1001 s->needs_lock = true; 1002 + s->has_user = false; 1003 + 1004 + /* 1005 + * If we're doing IO to fixed buffers, we don't need to get/set 1006 + * user context 1007 + */ 1008 + needs_user = io_sqe_needs_user(s->sqe); 1009 + if (needs_user) { 1010 + if (!mmget_not_zero(ctx->sqo_mm)) { 1011 + ret = -EFAULT; 1012 + goto err; 1013 + } 1014 + use_mm(ctx->sqo_mm); 1015 + old_fs = get_fs(); 1016 + set_fs(USER_DS); 1017 + s->has_user = true; 1018 + } 1082 1019 1083 1020 do { 1084 1021 ret = __io_submit_sqe(ctx, req, s, false, NULL); ··· 1110 1011 cond_resched(); 1111 1012 } while (1); 1112 1013 1113 - set_fs(old_fs); 1114 - unuse_mm(ctx->sqo_mm); 1115 - mmput(ctx->sqo_mm); 1014 + if (needs_user) { 1015 + set_fs(old_fs); 1016 + unuse_mm(ctx->sqo_mm); 1017 + mmput(ctx->sqo_mm); 
1018 + } 1116 1019 err: 1117 1020 if (ret) { 1118 1021 io_cqring_add_event(ctx, sqe->user_data, ret, 0); ··· 1418 1317 return (bytes + PAGE_SIZE - 1) / PAGE_SIZE; 1419 1318 } 1420 1319 1320 + static int io_sqe_buffer_unregister(struct io_ring_ctx *ctx) 1321 + { 1322 + int i, j; 1323 + 1324 + if (!ctx->user_bufs) 1325 + return -ENXIO; 1326 + 1327 + for (i = 0; i < ctx->nr_user_bufs; i++) { 1328 + struct io_mapped_ubuf *imu = &ctx->user_bufs[i]; 1329 + 1330 + for (j = 0; j < imu->nr_bvecs; j++) 1331 + put_page(imu->bvec[j].bv_page); 1332 + 1333 + if (ctx->account_mem) 1334 + io_unaccount_mem(ctx->user, imu->nr_bvecs); 1335 + kfree(imu->bvec); 1336 + imu->nr_bvecs = 0; 1337 + } 1338 + 1339 + kfree(ctx->user_bufs); 1340 + ctx->user_bufs = NULL; 1341 + ctx->nr_user_bufs = 0; 1342 + return 0; 1343 + } 1344 + 1345 + static int io_copy_iov(struct io_ring_ctx *ctx, struct iovec *dst, 1346 + void __user *arg, unsigned index) 1347 + { 1348 + struct iovec __user *src; 1349 + 1350 + #ifdef CONFIG_COMPAT 1351 + if (ctx->compat) { 1352 + struct compat_iovec __user *ciovs; 1353 + struct compat_iovec ciov; 1354 + 1355 + ciovs = (struct compat_iovec __user *) arg; 1356 + if (copy_from_user(&ciov, &ciovs[index], sizeof(ciov))) 1357 + return -EFAULT; 1358 + 1359 + dst->iov_base = (void __user *) (unsigned long) ciov.iov_base; 1360 + dst->iov_len = ciov.iov_len; 1361 + return 0; 1362 + } 1363 + #endif 1364 + src = (struct iovec __user *) arg; 1365 + if (copy_from_user(dst, &src[index], sizeof(*dst))) 1366 + return -EFAULT; 1367 + return 0; 1368 + } 1369 + 1370 + static int io_sqe_buffer_register(struct io_ring_ctx *ctx, void __user *arg, 1371 + unsigned nr_args) 1372 + { 1373 + struct vm_area_struct **vmas = NULL; 1374 + struct page **pages = NULL; 1375 + int i, j, got_pages = 0; 1376 + int ret = -EINVAL; 1377 + 1378 + if (ctx->user_bufs) 1379 + return -EBUSY; 1380 + if (!nr_args || nr_args > UIO_MAXIOV) 1381 + return -EINVAL; 1382 + 1383 + ctx->user_bufs = kcalloc(nr_args, 
sizeof(struct io_mapped_ubuf), 1384 + GFP_KERNEL); 1385 + if (!ctx->user_bufs) 1386 + return -ENOMEM; 1387 + 1388 + for (i = 0; i < nr_args; i++) { 1389 + struct io_mapped_ubuf *imu = &ctx->user_bufs[i]; 1390 + unsigned long off, start, end, ubuf; 1391 + int pret, nr_pages; 1392 + struct iovec iov; 1393 + size_t size; 1394 + 1395 + ret = io_copy_iov(ctx, &iov, arg, i); 1396 + if (ret) 1397 + break; 1398 + 1399 + /* 1400 + * Don't impose further limits on the size and buffer 1401 + * constraints here, we'll -EINVAL later when IO is 1402 + * submitted if they are wrong. 1403 + */ 1404 + ret = -EFAULT; 1405 + if (!iov.iov_base || !iov.iov_len) 1406 + goto err; 1407 + 1408 + /* arbitrary limit, but we need something */ 1409 + if (iov.iov_len > SZ_1G) 1410 + goto err; 1411 + 1412 + ubuf = (unsigned long) iov.iov_base; 1413 + end = (ubuf + iov.iov_len + PAGE_SIZE - 1) >> PAGE_SHIFT; 1414 + start = ubuf >> PAGE_SHIFT; 1415 + nr_pages = end - start; 1416 + 1417 + if (ctx->account_mem) { 1418 + ret = io_account_mem(ctx->user, nr_pages); 1419 + if (ret) 1420 + goto err; 1421 + } 1422 + 1423 + ret = 0; 1424 + if (!pages || nr_pages > got_pages) { 1425 + kfree(vmas); 1426 + kfree(pages); 1427 + pages = kmalloc_array(nr_pages, sizeof(struct page *), 1428 + GFP_KERNEL); 1429 + vmas = kmalloc_array(nr_pages, 1430 + sizeof(struct vm_area_struct *), 1431 + GFP_KERNEL); 1432 + if (!pages || !vmas) { 1433 + ret = -ENOMEM; 1434 + if (ctx->account_mem) 1435 + io_unaccount_mem(ctx->user, nr_pages); 1436 + goto err; 1437 + } 1438 + got_pages = nr_pages; 1439 + } 1440 + 1441 + imu->bvec = kmalloc_array(nr_pages, sizeof(struct bio_vec), 1442 + GFP_KERNEL); 1443 + ret = -ENOMEM; 1444 + if (!imu->bvec) { 1445 + if (ctx->account_mem) 1446 + io_unaccount_mem(ctx->user, nr_pages); 1447 + goto err; 1448 + } 1449 + 1450 + ret = 0; 1451 + down_read(&current->mm->mmap_sem); 1452 + pret = get_user_pages_longterm(ubuf, nr_pages, FOLL_WRITE, 1453 + pages, vmas); 1454 + if (pret == nr_pages) { 1455 + 
/* don't support file backed memory */ 1456 + for (j = 0; j < nr_pages; j++) { 1457 + struct vm_area_struct *vma = vmas[j]; 1458 + 1459 + if (vma->vm_file && 1460 + !is_file_hugepages(vma->vm_file)) { 1461 + ret = -EOPNOTSUPP; 1462 + break; 1463 + } 1464 + } 1465 + } else { 1466 + ret = pret < 0 ? pret : -EFAULT; 1467 + } 1468 + up_read(&current->mm->mmap_sem); 1469 + if (ret) { 1470 + /* 1471 + * if we did partial map, or found file backed vmas, 1472 + * release any pages we did get 1473 + */ 1474 + if (pret > 0) { 1475 + for (j = 0; j < pret; j++) 1476 + put_page(pages[j]); 1477 + } 1478 + if (ctx->account_mem) 1479 + io_unaccount_mem(ctx->user, nr_pages); 1480 + goto err; 1481 + } 1482 + 1483 + off = ubuf & ~PAGE_MASK; 1484 + size = iov.iov_len; 1485 + for (j = 0; j < nr_pages; j++) { 1486 + size_t vec_len; 1487 + 1488 + vec_len = min_t(size_t, size, PAGE_SIZE - off); 1489 + imu->bvec[j].bv_page = pages[j]; 1490 + imu->bvec[j].bv_len = vec_len; 1491 + imu->bvec[j].bv_offset = off; 1492 + off = 0; 1493 + size -= vec_len; 1494 + } 1495 + /* store original address for later verification */ 1496 + imu->ubuf = ubuf; 1497 + imu->len = iov.iov_len; 1498 + imu->nr_bvecs = nr_pages; 1499 + 1500 + ctx->nr_user_bufs++; 1501 + } 1502 + kfree(pages); 1503 + kfree(vmas); 1504 + return 0; 1505 + err: 1506 + kfree(pages); 1507 + kfree(vmas); 1508 + io_sqe_buffer_unregister(ctx); 1509 + return ret; 1510 + } 1511 + 1421 1512 static void io_ring_ctx_free(struct io_ring_ctx *ctx) 1422 1513 { 1423 1514 if (ctx->sqo_wq) ··· 1618 1325 mmdrop(ctx->sqo_mm); 1619 1326 1620 1327 io_iopoll_reap_events(ctx); 1328 + io_sqe_buffer_unregister(ctx); 1621 1329 1622 1330 #if defined(CONFIG_UNIX) 1623 1331 if (ctx->ring_sock) ··· 1981 1687 struct io_uring_params __user *, params) 1982 1688 { 1983 1689 return io_uring_setup(entries, params); 1690 + } 1691 + 1692 + static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode, 1693 + void __user *arg, unsigned nr_args) 1694 + { 1695 + int 
ret; 1696 + 1697 + percpu_ref_kill(&ctx->refs); 1698 + wait_for_completion(&ctx->ctx_done); 1699 + 1700 + switch (opcode) { 1701 + case IORING_REGISTER_BUFFERS: 1702 + ret = io_sqe_buffer_register(ctx, arg, nr_args); 1703 + break; 1704 + case IORING_UNREGISTER_BUFFERS: 1705 + ret = -EINVAL; 1706 + if (arg || nr_args) 1707 + break; 1708 + ret = io_sqe_buffer_unregister(ctx); 1709 + break; 1710 + default: 1711 + ret = -EINVAL; 1712 + break; 1713 + } 1714 + 1715 + /* bring the ctx back to life */ 1716 + reinit_completion(&ctx->ctx_done); 1717 + percpu_ref_reinit(&ctx->refs); 1718 + return ret; 1719 + } 1720 + 1721 + SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode, 1722 + void __user *, arg, unsigned int, nr_args) 1723 + { 1724 + struct io_ring_ctx *ctx; 1725 + long ret = -EBADF; 1726 + struct fd f; 1727 + 1728 + f = fdget(fd); 1729 + if (!f.file) 1730 + return -EBADF; 1731 + 1732 + ret = -EOPNOTSUPP; 1733 + if (f.file->f_op != &io_uring_fops) 1734 + goto out_fput; 1735 + 1736 + ctx = f.file->private_data; 1737 + 1738 + mutex_lock(&ctx->uring_lock); 1739 + ret = __io_uring_register(ctx, opcode, arg, nr_args); 1740 + mutex_unlock(&ctx->uring_lock); 1741 + out_fput: 1742 + fdput(f); 1743 + return ret; 1984 1744 } 1985 1745 1986 1746 static int __init io_uring_init(void)
+2
include/linux/syscalls.h
··· 315 315 asmlinkage long sys_io_uring_enter(unsigned int fd, u32 to_submit, 316 316 u32 min_complete, u32 flags, 317 317 const sigset_t __user *sig, size_t sigsz); 318 + asmlinkage long sys_io_uring_register(unsigned int fd, unsigned int op, 319 + void __user *arg, unsigned int nr_args); 318 320 319 321 /* fs/xattr.c */ 320 322 asmlinkage long sys_setxattr(const char __user *path, const char __user *name,
+3 -1
include/uapi/asm-generic/unistd.h
··· 744 744 __SYSCALL(__NR_io_uring_setup, sys_io_uring_setup) 745 745 #define __NR_io_uring_enter 426 746 746 __SYSCALL(__NR_io_uring_enter, sys_io_uring_enter) 747 + #define __NR_io_uring_register 427 748 + __SYSCALL(__NR_io_uring_register, sys_io_uring_register) 747 749 748 750 #undef __NR_syscalls 749 - #define __NR_syscalls 427 751 + #define __NR_syscalls 428 750 752 751 753 /* 752 754 * 32 bit systems traditionally used different
+12 -1
include/uapi/linux/io_uring.h
··· 27 27 __u32 fsync_flags; 28 28 }; 29 29 __u64 user_data; /* data to be passed back at completion time */ 30 - __u64 __pad2[3]; 30 + union { 31 + __u16 buf_index; /* index into fixed buffers, if used */ 32 + __u64 __pad2[3]; 33 + }; 31 34 }; 32 35 33 36 /* ··· 42 39 #define IORING_OP_READV 1 43 40 #define IORING_OP_WRITEV 2 44 41 #define IORING_OP_FSYNC 3 42 + #define IORING_OP_READ_FIXED 4 43 + #define IORING_OP_WRITE_FIXED 5 45 44 46 45 /* 47 46 * sqe->fsync_flags ··· 107 102 struct io_sqring_offsets sq_off; 108 103 struct io_cqring_offsets cq_off; 109 104 }; 105 + 106 + /* 107 + * io_uring_register(2) opcodes and arguments 108 + */ 109 + #define IORING_REGISTER_BUFFERS 0 110 + #define IORING_UNREGISTER_BUFFERS 1 110 111 111 112 #endif
+1
kernel/sys_ni.c
··· 48 48 COND_SYSCALL_COMPAT(io_pgetevents); 49 49 COND_SYSCALL(io_uring_setup); 50 50 COND_SYSCALL(io_uring_enter); 51 + COND_SYSCALL(io_uring_register); 51 52 52 53 /* fs/xattr.c */ 53 54