Linux kernel mirror (for testing) git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel os linux

aio: allocate kiocbs in batches

In testing aio on a fast storage device, I found that the context lock
takes up a fair amount of cpu time in the I/O submission path. The reason
is that we take it for every I/O submitted (see __aio_get_req). Since we
know how many I/Os are passed to io_submit, we can preallocate the kiocbs
in batches, reducing the number of times we take and release the lock.

In my testing, I was able to reduce the amount of time spent in
_raw_spin_lock_irq by 0.56% (average of 3 runs). The command I used to
test this was:

aio-stress -O -o 2 -o 3 -r 8 -d 128 -b 32 -i 32 -s 16384 <dev>

I also tested the patch with various numbers of events passed to
io_submit, and I ran the xfstests aio group of tests to ensure I didn't
break anything.

Signed-off-by: Jeff Moyer <jmoyer@redhat.com>
Cc: Daniel Ehrenberg <dehrenberg@google.com>
Signed-off-by: Andrew Morton <akpm@linux-foundation.org>
Signed-off-by: Linus Torvalds <torvalds@linux-foundation.org>

Authored by Jeff Moyer; committed by Linus Torvalds.
080d676d 2ca02df6

Total: +113 -34
fs/aio.c: +112 -34
··· 440 440 static struct kiocb *__aio_get_req(struct kioctx *ctx) 441 441 { 442 442 struct kiocb *req = NULL; 443 - struct aio_ring *ring; 444 - int okay = 0; 445 443 446 444 req = kmem_cache_alloc(kiocb_cachep, GFP_KERNEL); 447 445 if (unlikely(!req)) ··· 457 459 INIT_LIST_HEAD(&req->ki_run_list); 458 460 req->ki_eventfd = NULL; 459 461 460 - /* Check if the completion queue has enough free space to 461 - * accept an event from this io. 462 - */ 463 - spin_lock_irq(&ctx->ctx_lock); 464 - ring = kmap_atomic(ctx->ring_info.ring_pages[0], KM_USER0); 465 - if (ctx->reqs_active < aio_ring_avail(&ctx->ring_info, ring)) { 466 - list_add(&req->ki_list, &ctx->active_reqs); 467 - ctx->reqs_active++; 468 - okay = 1; 469 - } 470 - kunmap_atomic(ring, KM_USER0); 471 - spin_unlock_irq(&ctx->ctx_lock); 472 - 473 - if (!okay) { 474 - kmem_cache_free(kiocb_cachep, req); 475 - req = NULL; 476 - } 477 - 478 462 return req; 479 463 } 480 464 481 - static inline struct kiocb *aio_get_req(struct kioctx *ctx) 465 + /* 466 + * struct kiocb's are allocated in batches to reduce the number of 467 + * times the ctx lock is acquired and released. 468 + */ 469 + #define KIOCB_BATCH_SIZE 32L 470 + struct kiocb_batch { 471 + struct list_head head; 472 + long count; /* number of requests left to allocate */ 473 + }; 474 + 475 + static void kiocb_batch_init(struct kiocb_batch *batch, long total) 476 + { 477 + INIT_LIST_HEAD(&batch->head); 478 + batch->count = total; 479 + } 480 + 481 + static void kiocb_batch_free(struct kiocb_batch *batch) 482 + { 483 + struct kiocb *req, *n; 484 + 485 + list_for_each_entry_safe(req, n, &batch->head, ki_batch) { 486 + list_del(&req->ki_batch); 487 + kmem_cache_free(kiocb_cachep, req); 488 + } 489 + } 490 + 491 + /* 492 + * Allocate a batch of kiocbs. This avoids taking and dropping the 493 + * context lock a lot during setup. 
494 + */ 495 + static int kiocb_batch_refill(struct kioctx *ctx, struct kiocb_batch *batch) 496 + { 497 + unsigned short allocated, to_alloc; 498 + long avail; 499 + bool called_fput = false; 500 + struct kiocb *req, *n; 501 + struct aio_ring *ring; 502 + 503 + to_alloc = min(batch->count, KIOCB_BATCH_SIZE); 504 + for (allocated = 0; allocated < to_alloc; allocated++) { 505 + req = __aio_get_req(ctx); 506 + if (!req) 507 + /* allocation failed, go with what we've got */ 508 + break; 509 + list_add(&req->ki_batch, &batch->head); 510 + } 511 + 512 + if (allocated == 0) 513 + goto out; 514 + 515 + retry: 516 + spin_lock_irq(&ctx->ctx_lock); 517 + ring = kmap_atomic(ctx->ring_info.ring_pages[0]); 518 + 519 + avail = aio_ring_avail(&ctx->ring_info, ring) - ctx->reqs_active; 520 + BUG_ON(avail < 0); 521 + if (avail == 0 && !called_fput) { 522 + /* 523 + * Handle a potential starvation case. It is possible that 524 + * we hold the last reference on a struct file, causing us 525 + * to delay the final fput to non-irq context. In this case, 526 + * ctx->reqs_active is artificially high. Calling the fput 527 + * routine here may free up a slot in the event completion 528 + * ring, allowing this allocation to succeed. 529 + */ 530 + kunmap_atomic(ring); 531 + spin_unlock_irq(&ctx->ctx_lock); 532 + aio_fput_routine(NULL); 533 + called_fput = true; 534 + goto retry; 535 + } 536 + 537 + if (avail < allocated) { 538 + /* Trim back the number of requests. 
*/ 539 + list_for_each_entry_safe(req, n, &batch->head, ki_batch) { 540 + list_del(&req->ki_batch); 541 + kmem_cache_free(kiocb_cachep, req); 542 + if (--allocated <= avail) 543 + break; 544 + } 545 + } 546 + 547 + batch->count -= allocated; 548 + list_for_each_entry(req, &batch->head, ki_batch) { 549 + list_add(&req->ki_list, &ctx->active_reqs); 550 + ctx->reqs_active++; 551 + } 552 + 553 + kunmap_atomic(ring); 554 + spin_unlock_irq(&ctx->ctx_lock); 555 + 556 + out: 557 + return allocated; 558 + } 559 + 560 + static inline struct kiocb *aio_get_req(struct kioctx *ctx, 561 + struct kiocb_batch *batch) 482 562 { 483 563 struct kiocb *req; 484 - /* Handle a potential starvation case -- should be exceedingly rare as 485 - * requests will be stuck on fput_head only if the aio_fput_routine is 486 - * delayed and the requests were the last user of the struct file. 487 - */ 488 - req = __aio_get_req(ctx); 489 - if (unlikely(NULL == req)) { 490 - aio_fput_routine(NULL); 491 - req = __aio_get_req(ctx); 492 - } 564 + 565 + if (list_empty(&batch->head)) 566 + if (kiocb_batch_refill(ctx, batch) == 0) 567 + return NULL; 568 + req = list_first_entry(&batch->head, struct kiocb, ki_batch); 569 + list_del(&req->ki_batch); 493 570 return req; 494 571 } 495 572 ··· 1588 1515 } 1589 1516 1590 1517 static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb, 1591 - struct iocb *iocb, bool compat) 1518 + struct iocb *iocb, struct kiocb_batch *batch, 1519 + bool compat) 1592 1520 { 1593 1521 struct kiocb *req; 1594 1522 struct file *file; ··· 1615 1541 if (unlikely(!file)) 1616 1542 return -EBADF; 1617 1543 1618 - req = aio_get_req(ctx); /* returns with 2 references to req */ 1544 + req = aio_get_req(ctx, batch); /* returns with 2 references to req */ 1619 1545 if (unlikely(!req)) { 1620 1546 fput(file); 1621 1547 return -EAGAIN; ··· 1695 1621 { 1696 1622 struct kioctx *ctx; 1697 1623 long ret = 0; 1698 - int i; 1624 + int i = 0; 1699 1625 struct blk_plug plug; 1626 + 
struct kiocb_batch batch; 1700 1627 1701 1628 if (unlikely(nr < 0)) 1702 1629 return -EINVAL; ··· 1713 1638 pr_debug("EINVAL: io_submit: invalid context id\n"); 1714 1639 return -EINVAL; 1715 1640 } 1641 + 1642 + kiocb_batch_init(&batch, nr); 1716 1643 1717 1644 blk_start_plug(&plug); 1718 1645 ··· 1736 1659 break; 1737 1660 } 1738 1661 1739 - ret = io_submit_one(ctx, user_iocb, &tmp, compat); 1662 + ret = io_submit_one(ctx, user_iocb, &tmp, &batch, compat); 1740 1663 if (ret) 1741 1664 break; 1742 1665 } 1743 1666 blk_finish_plug(&plug); 1744 1667 1668 + kiocb_batch_free(&batch); 1745 1669 put_ioctx(ctx); 1746 1670 return i ? i : ret; 1747 1671 }
include/linux/aio.h: +1
··· 117 117 118 118 struct list_head ki_list; /* the aio core uses this 119 119 * for cancellation */ 120 + struct list_head ki_batch; /* batch allocation */ 120 121 121 122 /* 122 123 * If the aio_resfd field of the userspace iocb is not zero,