#include "uring_context.hpp" #include enum class UringOpType { Accept, Read, Write }; struct Event { UringOpType type; void *data; }; struct AcceptOperationData { void *op_state_ptr; void (*completion_handler)(void *op_state_ptr, int client_fd) noexcept; }; struct ReadOperationData { void *op_state_ptr; void (*completion_handler)(void *op_state_ptr, ssize_t bytes_read) noexcept; }; struct WriteOperationData { void *op_state_ptr; void (*completion_handler)(void *op_state_ptr, ssize_t bytes_written) noexcept; }; UringContext::UringContext(unsigned entries) { int ret = io_uring_queue_init(entries, &m_ring, 0); if (ret < 0) { throw std::runtime_error("io_uring_queue_init failed"); } } UringContext::UringContext(UringContext &&other) noexcept : m_ring(other.m_ring) { if (other.m_ring.ring_fd != -1) { io_uring_queue_exit(&m_ring); m_ring = other.m_ring; std::memset(&other.m_ring, 0, sizeof(other.m_ring)); other.m_ring.ring_fd = -1; } } UringContext &UringContext::operator=(UringContext &&other) noexcept { if (other.m_ring.ring_fd != -1 && this != &other) { io_uring_queue_exit(&m_ring); m_ring = other.m_ring; std::memset(&other.m_ring, 0, sizeof(other.m_ring)); other.m_ring.ring_fd = -1; } return *this; } void UringContext::run_once() { struct io_uring_cqe *cqe; int ret = io_uring_wait_cqe(&m_ring, &cqe); if (ret < 0) { std::println(std::cerr, "io_uring_wait_cqe failed with error: {}", strerror(-ret)); return; } Event *event = static_cast(io_uring_cqe_get_data(cqe)); if (event) { switch (event->type) { case UringOpType::Accept: { auto *user_data = static_cast(event->data); assert(user_data != nullptr); assert(user_data->completion_handler != nullptr); user_data->completion_handler(user_data->op_state_ptr, cqe->res); break; } case UringOpType::Read: { auto *user_data = static_cast(event->data); assert(user_data != nullptr); assert(user_data->completion_handler != nullptr); user_data->completion_handler(user_data->op_state_ptr, static_cast(cqe->res)); break; } case UringOpType::Write: { auto *user_data = static_cast(event->data); assert(user_data != nullptr); assert(user_data->completion_handler != nullptr); user_data->completion_handler(user_data->op_state_ptr, static_cast(cqe->res)); break; } default: break; } } io_uring_cqe_seen(&m_ring, cqe); } UringContext::~UringContext() noexcept { io_uring_queue_exit(&m_ring); } struct io_uring *UringContext::ring() noexcept { return &m_ring; } struct AcceptSender { RawFileDescriptor m_server_fd; UringContext &m_uring_ctx; AcceptSender(RawFileDescriptor server_fd, UringContext &uring_ctx) : m_server_fd(server_fd), m_uring_ctx(uring_ctx) { } using sender_concept = stdexec::sender_t; using completion_signatures = stdexec::completion_signatures; template struct operation_state { using operation_state_t = operation_state; RawFileDescriptor m_server_fd; UringContext &m_uring_ctx; Receiver m_receiver; AcceptOperationData m_completion_data{}; Event m_event{UringOpType::Accept, nullptr}; static void complete(void *type_erased_op_state_ptr, int client_fd) noexcept { auto *op_state = static_cast(type_erased_op_state_ptr); if (client_fd >= 0) { stdexec::set_value(std::move(op_state->m_receiver), client_fd); } else { stdexec::set_error(std::move(op_state->m_receiver), std::make_exception_ptr(std::runtime_error("Accept failed"))); } } operation_state(RawFileDescriptor server_fd, UringContext &uring_ctx, Receiver &&receiver) : m_server_fd(server_fd), m_uring_ctx(uring_ctx), m_receiver(std::move(receiver)) { m_completion_data.op_state_ptr = this; m_completion_data.completion_handler = &operation_state_t::complete; m_event.type = UringOpType::Accept; m_event.data = &m_completion_data; } void start() noexcept { // Enqueue accept operation using io_uring io_uring_sqe *const sqe = io_uring_get_sqe(m_uring_ctx.ring()); // Get a submission queue entry if (!sqe) { stdexec::set_error(std::move(m_receiver), std::make_exception_ptr(std::runtime_error("Failed to get SQE"))); return; } io_uring_prep_accept(sqe, m_server_fd, nullptr, nullptr, 0); // store Event* so run() can dispatch by type io_uring_sqe_set_data(sqe, &this->m_event); int ret = io_uring_submit(m_uring_ctx.ring()); if (ret < 0) { stdexec::set_error(std::move(m_receiver), std::make_exception_ptr(std::runtime_error("io_uring_submit failed"))); return; } } }; template friend auto tag_invoke(stdexec::connect_t, AcceptSender self, Receiver r) noexcept { return typename AcceptSender::template operation_state(self.m_server_fd, self.m_uring_ctx, std::move(r)); } }; kev::task UringContext::async_accept(RawFileDescriptor server_fd) { co_return co_await AcceptSender(server_fd, *this); } struct ReadSender { RawFileDescriptor m_fd; UringContext &m_uring_ctx; std::span m_buffer; using sender_concept = stdexec::sender_t; using completion_signatures = stdexec::completion_signatures; template struct operation_state { using operation_state_t = operation_state; RawFileDescriptor m_fd; UringContext &m_uring_ctx; std::span m_buffer; Receiver m_receiver; Event m_event{UringOpType::Read, nullptr}; ReadOperationData m_completion_data; static void complete(void *type_erased_op_state_ptr, ssize_t bytes_read) noexcept { auto *op_state = static_cast(type_erased_op_state_ptr); if (bytes_read >= 0) { stdexec::set_value(std::move(op_state->m_receiver), static_cast(bytes_read)); } else { stdexec::set_error(std::move(op_state->m_receiver), std::make_exception_ptr(std::runtime_error("Read failed"))); } } operation_state(RawFileDescriptor fd, UringContext &uring_ctx, std::span buffer, Receiver &&receiver) : m_fd(fd), m_uring_ctx(uring_ctx), m_buffer(buffer), m_receiver(std::move(receiver)) { m_completion_data.op_state_ptr = this; m_completion_data.completion_handler = &operation_state_t::complete; m_event.type = UringOpType::Read; m_event.data = &m_completion_data; } void start() noexcept { // Enqueue read operation using io_uring io_uring_sqe *const sqe = io_uring_get_sqe(m_uring_ctx.ring()); if (!sqe) { stdexec::set_error(std::move(m_receiver), std::make_exception_ptr(std::runtime_error("Failed to get SQE"))); return; } io_uring_prep_read(sqe, m_fd, m_buffer.data(), m_buffer.size(), 0); // store Event* so run() can dispatch by type io_uring_sqe_set_data(sqe, &this->m_event); int ret = io_uring_submit(m_uring_ctx.ring()); if (ret < 0) { stdexec::set_error(std::move(m_receiver), std::make_exception_ptr(std::runtime_error("io_uring_submit failed"))); return; } } }; template Receiver> friend auto tag_invoke(stdexec::connect_t, ReadSender self, Receiver r) noexcept { return typename ReadSender::template operation_state(self.m_fd, self.m_uring_ctx, self.m_buffer, std::move(r)); } }; kev::task UringContext::async_read(RawFileDescriptor fd, std::span buffer) { co_return co_await ReadSender{std::move(fd), *this, buffer}; } struct WriteSender { RawFileDescriptor m_fd; UringContext &m_uring_ctx; std::span m_buffer; using sender_concept = stdexec::sender_t; using completion_signatures = stdexec::completion_signatures; template struct operation_state { using operation_state_t = operation_state; RawFileDescriptor m_fd; UringContext &m_uring_ctx; std::span m_buffer; Receiver m_receiver; Event m_event{UringOpType::Write, nullptr}; WriteOperationData m_completion_data; static void complete(void *type_erased_op_state_ptr, ssize_t bytes_written) noexcept { auto *op_state = static_cast(type_erased_op_state_ptr); if (bytes_written >= 0) { stdexec::set_value(std::move(op_state->m_receiver), static_cast(bytes_written)); } else { stdexec::set_error(std::move(op_state->m_receiver), std::make_exception_ptr(std::runtime_error("Write failed"))); } } operation_state(RawFileDescriptor fd, UringContext &uring_ctx, std::span buffer, Receiver &&receiver) : m_fd(fd), m_uring_ctx(uring_ctx), m_buffer(buffer), m_receiver(std::move(receiver)) { m_completion_data.op_state_ptr = this; m_completion_data.completion_handler = &operation_state_t::complete; m_event.type = UringOpType::Write; m_event.data = &m_completion_data; } void start() noexcept { // Enqueue write operation using io_uring io_uring_sqe *const sqe = io_uring_get_sqe(m_uring_ctx.ring()); if (!sqe) { stdexec::set_error(std::move(m_receiver), std::make_exception_ptr(std::runtime_error("Failed to get SQE"))); return; } io_uring_prep_write(sqe, m_fd, m_buffer.data(), m_buffer.size(), 0); // store Event* so run() can dispatch by type io_uring_sqe_set_data(sqe, &this->m_event); int ret = io_uring_submit(m_uring_ctx.ring()); if (ret < 0) { stdexec::set_error(std::move(m_receiver), std::make_exception_ptr(std::runtime_error("io_uring_submit failed"))); return; } } }; template Receiver> friend auto tag_invoke(stdexec::connect_t, WriteSender self, Receiver r) noexcept { return typename WriteSender ::template operation_state(self.m_fd, self.m_uring_ctx, self.m_buffer, std::move(r)); } }; kev::task UringContext::async_write(RawFileDescriptor fd, std::span data) { co_return co_await WriteSender{std::move(fd), *this, data}; } kev::task UringContext::async_write_all(RawFileDescriptor fd, std::span data) { using namespace stdexec; size_t total_written = 0; while (total_written < data.size()) { size_t bytes_written = co_await async_write(fd, data.subspan(total_written, data.size() - total_written)); if (bytes_written == 0) { throw std::runtime_error("Connection closed while writing"); } total_written += bytes_written; } }