this repo has no description
at main 386 lines 14 kB view raw
1#include "uring_context.hpp" 2 3#include <iostream> 4 5enum class UringOpType 6{ 7 Accept, 8 Read, 9 Write 10}; 11 12struct Event 13{ 14 UringOpType type; 15 void *data; 16}; 17 18struct AcceptOperationData 19{ 20 void *op_state_ptr; 21 void (*completion_handler)(void *op_state_ptr, int client_fd) noexcept; 22}; 23 24struct ReadOperationData 25{ 26 void *op_state_ptr; 27 void (*completion_handler)(void *op_state_ptr, ssize_t bytes_read) noexcept; 28}; 29 30struct WriteOperationData 31{ 32 void *op_state_ptr; 33 void (*completion_handler)(void *op_state_ptr, ssize_t bytes_written) noexcept; 34}; 35 36UringContext::UringContext(unsigned entries) 37{ 38 int ret = io_uring_queue_init(entries, &m_ring, 0); 39 if (ret < 0) 40 { 41 throw std::runtime_error("io_uring_queue_init failed"); 42 } 43} 44 45UringContext::UringContext(UringContext &&other) noexcept : m_ring(other.m_ring) 46{ 47 if (other.m_ring.ring_fd != -1) 48 { 49 io_uring_queue_exit(&m_ring); 50 m_ring = other.m_ring; 51 std::memset(&other.m_ring, 0, sizeof(other.m_ring)); 52 other.m_ring.ring_fd = -1; 53 } 54} 55 56UringContext &UringContext::operator=(UringContext &&other) noexcept 57{ 58 if (other.m_ring.ring_fd != -1 && this != &other) 59 { 60 io_uring_queue_exit(&m_ring); 61 m_ring = other.m_ring; 62 std::memset(&other.m_ring, 0, sizeof(other.m_ring)); 63 other.m_ring.ring_fd = -1; 64 } 65 return *this; 66} 67 68void UringContext::run_once() 69{ 70 struct io_uring_cqe *cqe; 71 int ret = io_uring_wait_cqe(&m_ring, &cqe); 72 if (ret < 0) 73 { 74 std::println(std::cerr, "io_uring_wait_cqe failed with error: {}", strerror(-ret)); 75 return; 76 } 77 78 Event *event = static_cast<Event *>(io_uring_cqe_get_data(cqe)); 79 if (event) 80 { 81 switch (event->type) 82 { 83 case UringOpType::Accept: { 84 auto *user_data = static_cast<AcceptOperationData *>(event->data); 85 assert(user_data != nullptr); 86 assert(user_data->completion_handler != nullptr); 87 user_data->completion_handler(user_data->op_state_ptr, cqe->res); 88 break; 89 } 90 case UringOpType::Read: { 91 auto *user_data = static_cast<ReadOperationData *>(event->data); 92 assert(user_data != nullptr); 93 assert(user_data->completion_handler != nullptr); 94 user_data->completion_handler(user_data->op_state_ptr, static_cast<ssize_t>(cqe->res)); 95 break; 96 } 97 case UringOpType::Write: { 98 auto *user_data = static_cast<WriteOperationData *>(event->data); 99 assert(user_data != nullptr); 100 assert(user_data->completion_handler != nullptr); 101 user_data->completion_handler(user_data->op_state_ptr, static_cast<ssize_t>(cqe->res)); 102 break; 103 } 104 default: 105 break; 106 } 107 } 108 io_uring_cqe_seen(&m_ring, cqe); 109} 110 111UringContext::~UringContext() noexcept 112{ 113 io_uring_queue_exit(&m_ring); 114} 115 116struct io_uring *UringContext::ring() noexcept 117{ 118 return &m_ring; 119} 120 121struct AcceptSender 122{ 123 RawFileDescriptor m_server_fd; 124 UringContext &m_uring_ctx; 125 126 AcceptSender(RawFileDescriptor server_fd, UringContext &uring_ctx) : m_server_fd(server_fd), m_uring_ctx(uring_ctx) 127 { 128 } 129 130 using sender_concept = stdexec::sender_t; 131 using completion_signatures = 132 stdexec::completion_signatures<stdexec::set_value_t(int), // on success, return client fd 133 stdexec::set_error_t(std::exception_ptr), // on error, return exception 134 stdexec::set_stopped_t() // on stopped 135 >; 136 137 template <stdexec::receiver Receiver> struct operation_state 138 { 139 using operation_state_t = operation_state<Receiver>; 140 141 RawFileDescriptor m_server_fd; 142 UringContext &m_uring_ctx; 143 Receiver m_receiver; 144 AcceptOperationData m_completion_data{}; 145 Event m_event{UringOpType::Accept, nullptr}; 146 147 static void complete(void *type_erased_op_state_ptr, int client_fd) noexcept 148 { 149 auto *op_state = static_cast<operation_state_t *>(type_erased_op_state_ptr); 150 if (client_fd >= 0) 151 { 152 stdexec::set_value(std::move(op_state->m_receiver), client_fd); 153 } 154 else 155 { 156 stdexec::set_error(std::move(op_state->m_receiver), 157 std::make_exception_ptr(std::runtime_error("Accept failed"))); 158 } 159 } 160 161 operation_state(RawFileDescriptor server_fd, UringContext &uring_ctx, Receiver &&receiver) 162 : m_server_fd(server_fd), m_uring_ctx(uring_ctx), m_receiver(std::move(receiver)) 163 { 164 m_completion_data.op_state_ptr = this; 165 m_completion_data.completion_handler = &operation_state_t::complete; 166 m_event.type = UringOpType::Accept; 167 m_event.data = &m_completion_data; 168 } 169 170 void start() noexcept 171 { 172 // Enqueue accept operation using io_uring 173 io_uring_sqe *const sqe = io_uring_get_sqe(m_uring_ctx.ring()); // Get a submission queue entry 174 if (!sqe) 175 { 176 stdexec::set_error(std::move(m_receiver), 177 std::make_exception_ptr(std::runtime_error("Failed to get SQE"))); 178 return; 179 } 180 io_uring_prep_accept(sqe, m_server_fd, nullptr, nullptr, 0); 181 // store Event* so run() can dispatch by type 182 io_uring_sqe_set_data(sqe, &this->m_event); 183 int ret = io_uring_submit(m_uring_ctx.ring()); 184 if (ret < 0) 185 { 186 stdexec::set_error(std::move(m_receiver), 187 std::make_exception_ptr(std::runtime_error("io_uring_submit failed"))); 188 return; 189 } 190 } 191 }; 192 193 template <class Receiver> friend auto tag_invoke(stdexec::connect_t, AcceptSender self, Receiver r) noexcept 194 { 195 return 196 typename AcceptSender::template operation_state<Receiver>(self.m_server_fd, self.m_uring_ctx, std::move(r)); 197 } 198}; 199 200kev::task<int> UringContext::async_accept(RawFileDescriptor server_fd) 201{ 202 co_return co_await AcceptSender(server_fd, *this); 203} 204 205struct ReadSender 206{ 207 RawFileDescriptor m_fd; 208 UringContext &m_uring_ctx; 209 std::span<std::byte> m_buffer; 210 211 using sender_concept = stdexec::sender_t; 212 using completion_signatures = 213 stdexec::completion_signatures<stdexec::set_value_t(size_t), // on success, return bytes read 214 stdexec::set_error_t(std::exception_ptr), // on error, return exception 215 stdexec::set_stopped_t() // on stopped 216 >; 217 218 template <stdexec::receiver Receiver> struct operation_state 219 { 220 using operation_state_t = operation_state<Receiver>; 221 222 RawFileDescriptor m_fd; 223 UringContext &m_uring_ctx; 224 std::span<std::byte> m_buffer; 225 Receiver m_receiver; 226 Event m_event{UringOpType::Read, nullptr}; 227 ReadOperationData m_completion_data; 228 229 static void complete(void *type_erased_op_state_ptr, ssize_t bytes_read) noexcept 230 { 231 auto *op_state = static_cast<operation_state_t *>(type_erased_op_state_ptr); 232 if (bytes_read >= 0) 233 { 234 stdexec::set_value(std::move(op_state->m_receiver), static_cast<size_t>(bytes_read)); 235 } 236 else 237 { 238 stdexec::set_error(std::move(op_state->m_receiver), 239 std::make_exception_ptr(std::runtime_error("Read failed"))); 240 } 241 } 242 243 operation_state(RawFileDescriptor fd, UringContext &uring_ctx, std::span<std::byte> buffer, Receiver &&receiver) 244 : m_fd(fd), m_uring_ctx(uring_ctx), m_buffer(buffer), m_receiver(std::move(receiver)) 245 { 246 m_completion_data.op_state_ptr = this; 247 m_completion_data.completion_handler = &operation_state_t::complete; 248 m_event.type = UringOpType::Read; 249 m_event.data = &m_completion_data; 250 } 251 252 void start() noexcept 253 { 254 // Enqueue read operation using io_uring 255 io_uring_sqe *const sqe = io_uring_get_sqe(m_uring_ctx.ring()); 256 if (!sqe) 257 { 258 stdexec::set_error(std::move(m_receiver), 259 std::make_exception_ptr(std::runtime_error("Failed to get SQE"))); 260 return; 261 } 262 io_uring_prep_read(sqe, m_fd, m_buffer.data(), m_buffer.size(), 0); 263 // store Event* so run() can dispatch by type 264 io_uring_sqe_set_data(sqe, &this->m_event); 265 int ret = io_uring_submit(m_uring_ctx.ring()); 266 if (ret < 0) 267 { 268 stdexec::set_error(std::move(m_receiver), 269 std::make_exception_ptr(std::runtime_error("io_uring_submit failed"))); 270 return; 271 } 272 } 273 }; 274 275 template <stdexec::receiver_of<completion_signatures> Receiver> 276 friend auto tag_invoke(stdexec::connect_t, ReadSender self, Receiver r) noexcept 277 { 278 return typename ReadSender::template operation_state<Receiver>(self.m_fd, self.m_uring_ctx, self.m_buffer, 279 std::move(r)); 280 } 281}; 282 283kev::task<size_t> UringContext::async_read(RawFileDescriptor fd, std::span<std::byte> buffer) 284{ 285 co_return co_await ReadSender{std::move(fd), *this, buffer}; 286} 287 288struct WriteSender 289{ 290 RawFileDescriptor m_fd; 291 UringContext &m_uring_ctx; 292 std::span<const std::byte> m_buffer; 293 294 using sender_concept = stdexec::sender_t; 295 using completion_signatures = 296 stdexec::completion_signatures<stdexec::set_value_t(size_t), // on success, return bytes written 297 stdexec::set_error_t(std::exception_ptr), // on error, return exception 298 stdexec::set_stopped_t() // on stopped 299 >; 300 301 template <stdexec::receiver Receiver> struct operation_state 302 { 303 using operation_state_t = operation_state<Receiver>; 304 305 RawFileDescriptor m_fd; 306 UringContext &m_uring_ctx; 307 std::span<const std::byte> m_buffer; 308 Receiver m_receiver; 309 Event m_event{UringOpType::Write, nullptr}; 310 WriteOperationData m_completion_data; 311 312 static void complete(void *type_erased_op_state_ptr, ssize_t bytes_written) noexcept 313 { 314 auto *op_state = static_cast<operation_state_t *>(type_erased_op_state_ptr); 315 if (bytes_written >= 0) 316 { 317 stdexec::set_value(std::move(op_state->m_receiver), static_cast<size_t>(bytes_written)); 318 } 319 else 320 { 321 stdexec::set_error(std::move(op_state->m_receiver), 322 std::make_exception_ptr(std::runtime_error("Write failed"))); 323 } 324 } 325 326 operation_state(RawFileDescriptor fd, UringContext &uring_ctx, std::span<const std::byte> buffer, 327 Receiver &&receiver) 328 : m_fd(fd), m_uring_ctx(uring_ctx), m_buffer(buffer), m_receiver(std::move(receiver)) 329 { 330 m_completion_data.op_state_ptr = this; 331 m_completion_data.completion_handler = &operation_state_t::complete; 332 m_event.type = UringOpType::Write; 333 m_event.data = &m_completion_data; 334 } 335 336 void start() noexcept 337 { 338 // Enqueue write operation using io_uring 339 io_uring_sqe *const sqe = io_uring_get_sqe(m_uring_ctx.ring()); 340 if (!sqe) 341 { 342 stdexec::set_error(std::move(m_receiver), 343 std::make_exception_ptr(std::runtime_error("Failed to get SQE"))); 344 return; 345 } 346 io_uring_prep_write(sqe, m_fd, m_buffer.data(), m_buffer.size(), 0); 347 // store Event* so run() can dispatch by type 348 io_uring_sqe_set_data(sqe, &this->m_event); 349 int ret = io_uring_submit(m_uring_ctx.ring()); 350 if (ret < 0) 351 { 352 stdexec::set_error(std::move(m_receiver), 353 std::make_exception_ptr(std::runtime_error("io_uring_submit failed"))); 354 return; 355 } 356 } 357 }; 358 359 template <stdexec::receiver_of<completion_signatures> Receiver> 360 friend auto tag_invoke(stdexec::connect_t, WriteSender self, Receiver r) noexcept 361 { 362 return typename WriteSender ::template operation_state<Receiver>(self.m_fd, self.m_uring_ctx, self.m_buffer, 363 std::move(r)); 364 } 365}; 366 367kev::task<size_t> UringContext::async_write(RawFileDescriptor fd, std::span<const std::byte> data) 368{ 369 co_return co_await WriteSender{std::move(fd), *this, data}; 370} 371 372kev::task<void> UringContext::async_write_all(RawFileDescriptor fd, std::span<const std::byte> data) 373{ 374 using namespace stdexec; 375 376 size_t total_written = 0; 377 while (total_written < data.size()) 378 { 379 size_t bytes_written = co_await async_write(fd, data.subspan(total_written, data.size() - total_written)); 380 if (bytes_written == 0) 381 { 382 throw std::runtime_error("Connection closed while writing"); 383 } 384 total_written += bytes_written; 385 } 386}