this repo has no description
1#include "uring_context.hpp"
2
3#include <iostream>
4
5enum class UringOpType
6{
7 Accept,
8 Read,
9 Write
10};
11
12struct Event
13{
14 UringOpType type;
15 void *data;
16};
17
18struct AcceptOperationData
19{
20 void *op_state_ptr;
21 void (*completion_handler)(void *op_state_ptr, int client_fd) noexcept;
22};
23
24struct ReadOperationData
25{
26 void *op_state_ptr;
27 void (*completion_handler)(void *op_state_ptr, ssize_t bytes_read) noexcept;
28};
29
30struct WriteOperationData
31{
32 void *op_state_ptr;
33 void (*completion_handler)(void *op_state_ptr, ssize_t bytes_written) noexcept;
34};
35
36UringContext::UringContext(unsigned entries)
37{
38 int ret = io_uring_queue_init(entries, &m_ring, 0);
39 if (ret < 0)
40 {
41 throw std::runtime_error("io_uring_queue_init failed");
42 }
43}
44
45UringContext::UringContext(UringContext &&other) noexcept : m_ring(other.m_ring)
46{
47 if (other.m_ring.ring_fd != -1)
48 {
49 io_uring_queue_exit(&m_ring);
50 m_ring = other.m_ring;
51 std::memset(&other.m_ring, 0, sizeof(other.m_ring));
52 other.m_ring.ring_fd = -1;
53 }
54}
55
56UringContext &UringContext::operator=(UringContext &&other) noexcept
57{
58 if (other.m_ring.ring_fd != -1 && this != &other)
59 {
60 io_uring_queue_exit(&m_ring);
61 m_ring = other.m_ring;
62 std::memset(&other.m_ring, 0, sizeof(other.m_ring));
63 other.m_ring.ring_fd = -1;
64 }
65 return *this;
66}
67
68void UringContext::run_once()
69{
70 struct io_uring_cqe *cqe;
71 int ret = io_uring_wait_cqe(&m_ring, &cqe);
72 if (ret < 0)
73 {
74 std::println(std::cerr, "io_uring_wait_cqe failed with error: {}", strerror(-ret));
75 return;
76 }
77
78 Event *event = static_cast<Event *>(io_uring_cqe_get_data(cqe));
79 if (event)
80 {
81 switch (event->type)
82 {
83 case UringOpType::Accept: {
84 auto *user_data = static_cast<AcceptOperationData *>(event->data);
85 assert(user_data != nullptr);
86 assert(user_data->completion_handler != nullptr);
87 user_data->completion_handler(user_data->op_state_ptr, cqe->res);
88 break;
89 }
90 case UringOpType::Read: {
91 auto *user_data = static_cast<ReadOperationData *>(event->data);
92 assert(user_data != nullptr);
93 assert(user_data->completion_handler != nullptr);
94 user_data->completion_handler(user_data->op_state_ptr, static_cast<ssize_t>(cqe->res));
95 break;
96 }
97 case UringOpType::Write: {
98 auto *user_data = static_cast<WriteOperationData *>(event->data);
99 assert(user_data != nullptr);
100 assert(user_data->completion_handler != nullptr);
101 user_data->completion_handler(user_data->op_state_ptr, static_cast<ssize_t>(cqe->res));
102 break;
103 }
104 default:
105 break;
106 }
107 }
108 io_uring_cqe_seen(&m_ring, cqe);
109}
110
111UringContext::~UringContext() noexcept
112{
113 io_uring_queue_exit(&m_ring);
114}
115
116struct io_uring *UringContext::ring() noexcept
117{
118 return &m_ring;
119}
120
121struct AcceptSender
122{
123 RawFileDescriptor m_server_fd;
124 UringContext &m_uring_ctx;
125
126 AcceptSender(RawFileDescriptor server_fd, UringContext &uring_ctx) : m_server_fd(server_fd), m_uring_ctx(uring_ctx)
127 {
128 }
129
130 using sender_concept = stdexec::sender_t;
131 using completion_signatures =
132 stdexec::completion_signatures<stdexec::set_value_t(int), // on success, return client fd
133 stdexec::set_error_t(std::exception_ptr), // on error, return exception
134 stdexec::set_stopped_t() // on stopped
135 >;
136
137 template <stdexec::receiver Receiver> struct operation_state
138 {
139 using operation_state_t = operation_state<Receiver>;
140
141 RawFileDescriptor m_server_fd;
142 UringContext &m_uring_ctx;
143 Receiver m_receiver;
144 AcceptOperationData m_completion_data{};
145 Event m_event{UringOpType::Accept, nullptr};
146
147 static void complete(void *type_erased_op_state_ptr, int client_fd) noexcept
148 {
149 auto *op_state = static_cast<operation_state_t *>(type_erased_op_state_ptr);
150 if (client_fd >= 0)
151 {
152 stdexec::set_value(std::move(op_state->m_receiver), client_fd);
153 }
154 else
155 {
156 stdexec::set_error(std::move(op_state->m_receiver),
157 std::make_exception_ptr(std::runtime_error("Accept failed")));
158 }
159 }
160
161 operation_state(RawFileDescriptor server_fd, UringContext &uring_ctx, Receiver &&receiver)
162 : m_server_fd(server_fd), m_uring_ctx(uring_ctx), m_receiver(std::move(receiver))
163 {
164 m_completion_data.op_state_ptr = this;
165 m_completion_data.completion_handler = &operation_state_t::complete;
166 m_event.type = UringOpType::Accept;
167 m_event.data = &m_completion_data;
168 }
169
170 void start() noexcept
171 {
172 // Enqueue accept operation using io_uring
173 io_uring_sqe *const sqe = io_uring_get_sqe(m_uring_ctx.ring()); // Get a submission queue entry
174 if (!sqe)
175 {
176 stdexec::set_error(std::move(m_receiver),
177 std::make_exception_ptr(std::runtime_error("Failed to get SQE")));
178 return;
179 }
180 io_uring_prep_accept(sqe, m_server_fd, nullptr, nullptr, 0);
181 // store Event* so run() can dispatch by type
182 io_uring_sqe_set_data(sqe, &this->m_event);
183 int ret = io_uring_submit(m_uring_ctx.ring());
184 if (ret < 0)
185 {
186 stdexec::set_error(std::move(m_receiver),
187 std::make_exception_ptr(std::runtime_error("io_uring_submit failed")));
188 return;
189 }
190 }
191 };
192
193 template <class Receiver> friend auto tag_invoke(stdexec::connect_t, AcceptSender self, Receiver r) noexcept
194 {
195 return
196 typename AcceptSender::template operation_state<Receiver>(self.m_server_fd, self.m_uring_ctx, std::move(r));
197 }
198};
199
200kev::task<int> UringContext::async_accept(RawFileDescriptor server_fd)
201{
202 co_return co_await AcceptSender(server_fd, *this);
203}
204
205struct ReadSender
206{
207 RawFileDescriptor m_fd;
208 UringContext &m_uring_ctx;
209 std::span<std::byte> m_buffer;
210
211 using sender_concept = stdexec::sender_t;
212 using completion_signatures =
213 stdexec::completion_signatures<stdexec::set_value_t(size_t), // on success, return bytes read
214 stdexec::set_error_t(std::exception_ptr), // on error, return exception
215 stdexec::set_stopped_t() // on stopped
216 >;
217
218 template <stdexec::receiver Receiver> struct operation_state
219 {
220 using operation_state_t = operation_state<Receiver>;
221
222 RawFileDescriptor m_fd;
223 UringContext &m_uring_ctx;
224 std::span<std::byte> m_buffer;
225 Receiver m_receiver;
226 Event m_event{UringOpType::Read, nullptr};
227 ReadOperationData m_completion_data;
228
229 static void complete(void *type_erased_op_state_ptr, ssize_t bytes_read) noexcept
230 {
231 auto *op_state = static_cast<operation_state_t *>(type_erased_op_state_ptr);
232 if (bytes_read >= 0)
233 {
234 stdexec::set_value(std::move(op_state->m_receiver), static_cast<size_t>(bytes_read));
235 }
236 else
237 {
238 stdexec::set_error(std::move(op_state->m_receiver),
239 std::make_exception_ptr(std::runtime_error("Read failed")));
240 }
241 }
242
243 operation_state(RawFileDescriptor fd, UringContext &uring_ctx, std::span<std::byte> buffer, Receiver &&receiver)
244 : m_fd(fd), m_uring_ctx(uring_ctx), m_buffer(buffer), m_receiver(std::move(receiver))
245 {
246 m_completion_data.op_state_ptr = this;
247 m_completion_data.completion_handler = &operation_state_t::complete;
248 m_event.type = UringOpType::Read;
249 m_event.data = &m_completion_data;
250 }
251
252 void start() noexcept
253 {
254 // Enqueue read operation using io_uring
255 io_uring_sqe *const sqe = io_uring_get_sqe(m_uring_ctx.ring());
256 if (!sqe)
257 {
258 stdexec::set_error(std::move(m_receiver),
259 std::make_exception_ptr(std::runtime_error("Failed to get SQE")));
260 return;
261 }
262 io_uring_prep_read(sqe, m_fd, m_buffer.data(), m_buffer.size(), 0);
263 // store Event* so run() can dispatch by type
264 io_uring_sqe_set_data(sqe, &this->m_event);
265 int ret = io_uring_submit(m_uring_ctx.ring());
266 if (ret < 0)
267 {
268 stdexec::set_error(std::move(m_receiver),
269 std::make_exception_ptr(std::runtime_error("io_uring_submit failed")));
270 return;
271 }
272 }
273 };
274
275 template <stdexec::receiver_of<completion_signatures> Receiver>
276 friend auto tag_invoke(stdexec::connect_t, ReadSender self, Receiver r) noexcept
277 {
278 return typename ReadSender::template operation_state<Receiver>(self.m_fd, self.m_uring_ctx, self.m_buffer,
279 std::move(r));
280 }
281};
282
283kev::task<size_t> UringContext::async_read(RawFileDescriptor fd, std::span<std::byte> buffer)
284{
285 co_return co_await ReadSender{std::move(fd), *this, buffer};
286}
287
288struct WriteSender
289{
290 RawFileDescriptor m_fd;
291 UringContext &m_uring_ctx;
292 std::span<const std::byte> m_buffer;
293
294 using sender_concept = stdexec::sender_t;
295 using completion_signatures =
296 stdexec::completion_signatures<stdexec::set_value_t(size_t), // on success, return bytes written
297 stdexec::set_error_t(std::exception_ptr), // on error, return exception
298 stdexec::set_stopped_t() // on stopped
299 >;
300
301 template <stdexec::receiver Receiver> struct operation_state
302 {
303 using operation_state_t = operation_state<Receiver>;
304
305 RawFileDescriptor m_fd;
306 UringContext &m_uring_ctx;
307 std::span<const std::byte> m_buffer;
308 Receiver m_receiver;
309 Event m_event{UringOpType::Write, nullptr};
310 WriteOperationData m_completion_data;
311
312 static void complete(void *type_erased_op_state_ptr, ssize_t bytes_written) noexcept
313 {
314 auto *op_state = static_cast<operation_state_t *>(type_erased_op_state_ptr);
315 if (bytes_written >= 0)
316 {
317 stdexec::set_value(std::move(op_state->m_receiver), static_cast<size_t>(bytes_written));
318 }
319 else
320 {
321 stdexec::set_error(std::move(op_state->m_receiver),
322 std::make_exception_ptr(std::runtime_error("Write failed")));
323 }
324 }
325
326 operation_state(RawFileDescriptor fd, UringContext &uring_ctx, std::span<const std::byte> buffer,
327 Receiver &&receiver)
328 : m_fd(fd), m_uring_ctx(uring_ctx), m_buffer(buffer), m_receiver(std::move(receiver))
329 {
330 m_completion_data.op_state_ptr = this;
331 m_completion_data.completion_handler = &operation_state_t::complete;
332 m_event.type = UringOpType::Write;
333 m_event.data = &m_completion_data;
334 }
335
336 void start() noexcept
337 {
338 // Enqueue write operation using io_uring
339 io_uring_sqe *const sqe = io_uring_get_sqe(m_uring_ctx.ring());
340 if (!sqe)
341 {
342 stdexec::set_error(std::move(m_receiver),
343 std::make_exception_ptr(std::runtime_error("Failed to get SQE")));
344 return;
345 }
346 io_uring_prep_write(sqe, m_fd, m_buffer.data(), m_buffer.size(), 0);
347 // store Event* so run() can dispatch by type
348 io_uring_sqe_set_data(sqe, &this->m_event);
349 int ret = io_uring_submit(m_uring_ctx.ring());
350 if (ret < 0)
351 {
352 stdexec::set_error(std::move(m_receiver),
353 std::make_exception_ptr(std::runtime_error("io_uring_submit failed")));
354 return;
355 }
356 }
357 };
358
359 template <stdexec::receiver_of<completion_signatures> Receiver>
360 friend auto tag_invoke(stdexec::connect_t, WriteSender self, Receiver r) noexcept
361 {
362 return typename WriteSender ::template operation_state<Receiver>(self.m_fd, self.m_uring_ctx, self.m_buffer,
363 std::move(r));
364 }
365};
366
367kev::task<size_t> UringContext::async_write(RawFileDescriptor fd, std::span<const std::byte> data)
368{
369 co_return co_await WriteSender{std::move(fd), *this, data};
370}
371
372kev::task<void> UringContext::async_write_all(RawFileDescriptor fd, std::span<const std::byte> data)
373{
374 using namespace stdexec;
375
376 size_t total_written = 0;
377 while (total_written < data.size())
378 {
379 size_t bytes_written = co_await async_write(fd, data.subspan(total_written, data.size() - total_written));
380 if (bytes_written == 0)
381 {
382 throw std::runtime_error("Connection closed while writing");
383 }
384 total_written += bytes_written;
385 }
386}