// SPDX-License-Identifier: GPL-2.0
/*
 * Task work handling for io_uring
 */
#include <linux/kernel.h>
#include <linux/errno.h>
#include <linux/sched/signal.h>
#include <linux/io_uring.h>
#include <linux/indirect_call_wrapper.h>

#include "io_uring.h"
#include "tctx.h"
#include "poll.h"
#include "rw.h"
#include "eventfd.h"
#include "wait.h"

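/*
 * Fallback for running task_work off a workqueue, used when the work
 * could not be queued to its owning task (e.g. because that task is
 * exiting). Runs the entries with the uring_lock held and with a ctx
 * reference to keep the ring alive.
 */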
void io_fallback_req_func(struct work_struct *work)
{
	struct io_ring_ctx *ctx = container_of(work, struct io_ring_ctx,
						fallback_work.work);
	struct llist_node *node = llist_del_all(&ctx->fallback_llist);
	struct io_kiocb *req, *tmp;
	struct io_tw_state ts = {};

	percpu_ref_get(&ctx->refs);
	mutex_lock(&ctx->uring_lock);
	ts.cancel = io_should_terminate_tw(ctx);
	llist_for_each_entry_safe(req, tmp, node, io_task_work.node)
		req->io_task_work.func((struct io_tw_req){req}, ts);
	io_submit_flush_completions(ctx);
	mutex_unlock(&ctx->uring_lock);
	percpu_ref_put(&ctx->refs);
}

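/*
 * Undo the per-ctx state taken in io_handle_tw_list(): flush pending
 * completions, then drop the uring_lock and the ctx reference.
 */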
static void ctx_flush_and_put(struct io_ring_ctx *ctx, io_tw_token_t tw)
{
	if (!ctx)
		return;
	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
		atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);

	io_submit_flush_completions(ctx);
	mutex_unlock(&ctx->uring_lock);
	percpu_ref_put(&ctx->refs);
}

/*
 * Run queued task_work, returning the number of entries processed in *count.
 * If more than max_entries are available, stop once that limit is reached
 * and return the rest of the list.
 */
struct llist_node *io_handle_tw_list(struct llist_node *node,
				     unsigned int *count,
				     unsigned int max_entries)
{
	struct io_ring_ctx *ctx = NULL;
	struct io_tw_state ts = { };

	do {
		struct llist_node *next = node->next;
		struct io_kiocb *req = container_of(node, struct io_kiocb,
						    io_task_work.node);

		if (req->ctx != ctx) {
			ctx_flush_and_put(ctx, ts);
			ctx = req->ctx;
			mutex_lock(&ctx->uring_lock);
			percpu_ref_get(&ctx->refs);
			ts.cancel = io_should_terminate_tw(ctx);
		}
		INDIRECT_CALL_2(req->io_task_work.func,
				io_poll_task_func, io_req_rw_complete,
				(struct io_tw_req){req}, ts);
		node = next;
		(*count)++;
		if (unlikely(need_resched())) {
			ctx_flush_and_put(ctx, ts);
			ctx = NULL;
			cond_resched();
		}
	} while (node && *count < max_entries);

	ctx_flush_and_put(ctx, ts);
	return node;
}

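/*
 * Punt a list of requests to their per-ctx fallback workqueue. A ctx
 * reference is held while queueing so the ring can't go away underneath
 * the work; with @sync set, wait for each ctx's fallback work to finish
 * before dropping that reference.
 */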
static __cold void __io_fallback_tw(struct llist_node *node, bool sync)
{
	struct io_ring_ctx *last_ctx = NULL;
	struct io_kiocb *req;

	while (node) {
		req = container_of(node, struct io_kiocb, io_task_work.node);
		node = node->next;
		if (last_ctx != req->ctx) {
			if (last_ctx) {
				if (sync)
					flush_delayed_work(&last_ctx->fallback_work);
				percpu_ref_put(&last_ctx->refs);
			}
			last_ctx = req->ctx;
			percpu_ref_get(&last_ctx->refs);
		}
		if (llist_add(&req->io_task_work.node, &last_ctx->fallback_llist))
			schedule_delayed_work(&last_ctx->fallback_work, 1);
	}

	if (last_ctx) {
		if (sync)
			flush_delayed_work(&last_ctx->fallback_work);
		percpu_ref_put(&last_ctx->refs);
	}
}

static void io_fallback_tw(struct io_uring_task *tctx, bool sync)
{
	struct llist_node *node = llist_del_all(&tctx->task_list);

	__io_fallback_tw(node, sync);
}

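/*
 * Splice off the task's pending task_work and run up to @max_entries of
 * it, with @count set to the number of entries processed. Returns the
 * leftover entries, if any, or NULL if the list was fully run.
 */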
struct llist_node *tctx_task_work_run(struct io_uring_task *tctx,
				      unsigned int max_entries,
				      unsigned int *count)
{
	struct llist_node *node;

	node = llist_del_all(&tctx->task_list);
	if (node) {
		node = llist_reverse_order(node);
		node = io_handle_tw_list(node, count, max_entries);
	}

	/* relaxed read is enough as only the task itself sets ->in_cancel */
	if (unlikely(atomic_read(&tctx->in_cancel)))
		io_uring_drop_tctx_refs(current);

	trace_io_uring_task_work_run(tctx, *count);
	return node;
}

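/* Callback for task_work_add(): run everything queued for this task */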
void tctx_task_work(struct callback_head *cb)
{
	struct io_uring_task *tctx;
	struct llist_node *ret;
	unsigned int count = 0;

	tctx = container_of(cb, struct io_uring_task, task_work);
	ret = tctx_task_work_run(tctx, UINT_MAX, &count);
	/* can't happen */
	WARN_ON_ONCE(ret);
}

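/*
 * Add task_work for an IORING_SETUP_DEFER_TASKRUN ring. Entries go onto
 * ctx->work_llist and are run by the submitter task, so all that's needed
 * here is a wakeup. req->nr_tw counts the queued entries, letting lazy
 * adds defer the wakeup until a waiter's cq_wait_nr threshold is met.
 */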
void io_req_local_work_add(struct io_kiocb *req, unsigned flags)
{
	struct io_ring_ctx *ctx = req->ctx;
	unsigned nr_wait, nr_tw, nr_tw_prev;
	struct llist_node *head;

	/* See comment above IO_CQ_WAKE_INIT */
	BUILD_BUG_ON(IO_CQ_WAKE_FORCE <= IORING_MAX_CQ_ENTRIES);

	/*
	 * We don't know how many requests there are in the link and whether
	 * they can even be queued lazily, so fall back to non-lazy.
	 */
	if (req->flags & IO_REQ_LINK_FLAGS)
		flags &= ~IOU_F_TWQ_LAZY_WAKE;

	guard(rcu)();

	head = READ_ONCE(ctx->work_llist.first);
	do {
		nr_tw_prev = 0;
		if (head) {
			struct io_kiocb *first_req = container_of(head,
							struct io_kiocb,
							io_task_work.node);
			/*
			 * Might be executed at any moment, rely on
			 * SLAB_TYPESAFE_BY_RCU to keep it alive.
			 */
			nr_tw_prev = READ_ONCE(first_req->nr_tw);
		}

		/*
		 * Theoretically, it can overflow, but that's fine as one of
		 * the previous adds should've tried to wake the task.
		 */
		nr_tw = nr_tw_prev + 1;
		if (!(flags & IOU_F_TWQ_LAZY_WAKE))
			nr_tw = IO_CQ_WAKE_FORCE;

		req->nr_tw = nr_tw;
		req->io_task_work.node.next = head;
	} while (!try_cmpxchg(&ctx->work_llist.first, &head,
			      &req->io_task_work.node));

	/*
	 * cmpxchg implies a full barrier, which pairs with the barrier
	 * in set_current_state() on the io_cqring_wait() side. It's used
	 * to ensure that either we see updated ->cq_wait_nr, or waiters
	 * going to sleep will observe the work added to the list, which
	 * is similar to the wait/wake task state sync.
	 */

	if (!head) {
		if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
			atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
		if (ctx->has_evfd)
			io_eventfd_signal(ctx, false);
	}

	nr_wait = atomic_read(&ctx->cq_wait_nr);
	/* not enough or no one is waiting */
	if (nr_tw < nr_wait)
		return;
	/* the previous add has already woken it up */
	if (nr_tw_prev >= nr_wait)
		return;
	wake_up_state(ctx->submitter_task, TASK_INTERRUPTIBLE);
}

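/*
 * Add task_work through the generic task_work infrastructure. SQPOLL
 * rings only need a notify signal, as the ring thread runs the work
 * itself; if task_work_add() fails, punt to the fallback workqueue.
 */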
void io_req_normal_work_add(struct io_kiocb *req)
{
	struct io_uring_task *tctx = req->tctx;
	struct io_ring_ctx *ctx = req->ctx;

	/* task_work already pending, we're done */
	if (!llist_add(&req->io_task_work.node, &tctx->task_list))
		return;

	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
		atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);

	/* SQPOLL doesn't need the task_work added, it'll run it itself */
	if (ctx->flags & IORING_SETUP_SQPOLL) {
		__set_notify_signal(tctx->task);
		return;
	}

	if (likely(!task_work_add(tctx->task, &tctx->task_work, ctx->notify_method)))
		return;

	io_fallback_tw(tctx, false);
}

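/*
 * Add task_work from outside the request's own task context; only valid
 * for DEFER_TASKRUN rings, where the submitter task runs the work.
 */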
void io_req_task_work_add_remote(struct io_kiocb *req, unsigned flags)
{
	if (WARN_ON_ONCE(!(req->ctx->flags & IORING_SETUP_DEFER_TASKRUN)))
		return;
	__io_req_task_work_add(req, flags);
}

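/*
 * Move pending DEFER_TASKRUN work over to the fallback workqueue, for
 * when the submitter task can no longer run it (e.g. at ring exit).
 */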
void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx)
{
	struct llist_node *node = llist_del_all(&ctx->work_llist);

	__io_fallback_tw(node, false);
	node = llist_del_all(&ctx->retry_llist);
	__io_fallback_tw(node, false);
}

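/*
 * Decide whether the local work loop should go around again: keep going
 * while work is pending and fewer than @min_events entries have run;
 * otherwise re-set IORING_SQ_TASKRUN to signal that work remains queued.
 */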
static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events,
				       int min_events)
{
	if (!io_local_work_pending(ctx))
		return false;
	if (events < min_events)
		return true;
	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
		atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
	return false;
}

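/* Run up to @events entries from @node, returning how many were run */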
static int __io_run_local_work_loop(struct llist_node **node,
				    io_tw_token_t tw,
				    int events)
{
	int ret = 0;

	while (*node) {
		struct llist_node *next = (*node)->next;
		struct io_kiocb *req = container_of(*node, struct io_kiocb,
						    io_task_work.node);
		INDIRECT_CALL_2(req->io_task_work.func,
				io_poll_task_func, io_req_rw_complete,
				(struct io_tw_req){req}, tw);
		*node = next;
		if (++ret >= events)
			break;
	}

	return ret;
}

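/*
 * Run DEFER_TASKRUN work for this ring. Must be called by the submitter
 * task with the uring_lock held. Loops until at least @min_events entries
 * have run or no work remains, capping each pass at @max_events; entries
 * that don't get to run in a pass are parked on ->retry_llist.
 */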
static int __io_run_local_work(struct io_ring_ctx *ctx, io_tw_token_t tw,
			       int min_events, int max_events)
{
	struct llist_node *node;
	unsigned int loops = 0;
	int ret = 0;

	if (WARN_ON_ONCE(ctx->submitter_task != current))
		return -EEXIST;
	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
		atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
again:
	tw.cancel = io_should_terminate_tw(ctx);
	min_events -= ret;
	ret = __io_run_local_work_loop(&ctx->retry_llist.first, tw, max_events);
	if (ctx->retry_llist.first)
		goto retry_done;

	/*
	 * llists are in reverse order, flip it back the right way before
	 * running the pending items.
	 */
	node = llist_reverse_order(llist_del_all(&ctx->work_llist));
	ret += __io_run_local_work_loop(&node, tw, max_events - ret);
	ctx->retry_llist.first = node;
	loops++;

	if (io_run_local_work_continue(ctx, ret, min_events))
		goto again;
retry_done:
	io_submit_flush_completions(ctx);
	if (io_run_local_work_continue(ctx, ret, min_events))
		goto again;

	trace_io_uring_local_work_run(ctx, ret, loops);
	return ret;
}

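/*
 * Flush local work with the uring_lock already held, running at least
 * @min_events entries if that much work is pending.
 */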
int io_run_local_work_locked(struct io_ring_ctx *ctx, int min_events)
{
	struct io_tw_state ts = {};

	if (!io_local_work_pending(ctx))
		return 0;
	return __io_run_local_work(ctx, ts, min_events,
				   max(IO_LOCAL_TW_DEFAULT_MAX, min_events));
}

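/* Like io_run_local_work_locked(), but acquires the uring_lock itself */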
int io_run_local_work(struct io_ring_ctx *ctx, int min_events, int max_events)
{
	struct io_tw_state ts = {};
	int ret;

	mutex_lock(&ctx->uring_lock);
	ret = __io_run_local_work(ctx, ts, min_events, max_events);
	mutex_unlock(&ctx->uring_lock);
	return ret;
}