prefect server in zig
1const std = @import("std");
2const zap = @import("zap");
3const mem = std.mem;
4const json = std.json;
5
6const db = @import("../db/sqlite.zig");
7const uuid_util = @import("../utilities/uuid.zig");
8const time_util = @import("../utilities/time.zig");
9const json_util = @import("../utilities/json.zig");
10const pool_helpers = @import("work_pools.zig");
11
/// Creates a work queue inside the work pool named in `target`, using the
/// JSON object in the request body.
///
/// Responses:
/// - 400 when the pool name cannot be extracted from `target`, the body is
///   absent, the body is not valid JSON, the JSON root is not an object, or
///   `name` is missing or not a string.
/// - 404 when the pool lookup yields nothing (lookup errors are folded into
///   this case by the `catch null` below).
/// - 409 when a queue with that name is already found in the pool.
/// - 500 when the insert, the read-back, or serialization fails.
/// - 201 with the serialized queue on success.
///
/// All scratch memory comes from a per-request arena freed on return.
pub fn create(r: zap.Request, target: []const u8) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const pool_name = pool_helpers.extractPoolName(target) orelse {
        json_util.sendStatus(r, "{\"detail\":\"work pool name required\"}", .bad_request);
        return;
    };

    const pool = db.work_pools.getByName(alloc, pool_name) catch null orelse {
        json_util.sendStatus(r, "{\"detail\":\"Work pool not found.\"}", .not_found);
        return;
    };

    const body = r.body orelse {
        json_util.sendStatus(r, "{\"detail\":\"failed to read body\"}", .bad_request);
        return;
    };

    const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch {
        json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request);
        return;
    };

    // The body is untrusted: a syntactically valid document such as `[]` or
    // `42` is not an object, and reading `.object` from it would be an
    // illegal union field access. Reject it explicitly instead.
    const obj = switch (parsed.value) {
        .object => |o| o,
        else => {
            json_util.sendStatus(r, "{\"detail\":\"request body must be a JSON object\"}", .bad_request);
            return;
        },
    };

    const name_value = obj.get("name") orelse {
        json_util.sendStatus(r, "{\"detail\":\"name required\"}", .bad_request);
        return;
    };
    const name = switch (name_value) {
        .string => |s| s,
        else => {
            json_util.sendStatus(r, "{\"detail\":\"name must be string\"}", .bad_request);
            return;
        },
    };

    if (db.work_queues.getByPoolAndName(alloc, pool.id, name) catch null) |_| {
        json_util.sendStatus(r, "{\"detail\":\"Work queue already exists.\"}", .conflict);
        return;
    }

    const description = pool_helpers.getOptionalString(obj.get("description")) orelse "";
    const is_paused = pool_helpers.getOptionalBool(obj.get("is_paused")) orelse false;
    const concurrency_limit = pool_helpers.getOptionalInt(obj.get("concurrency_limit"));
    // When the client gives no priority, ask the database for the next one and
    // fall back to 1 if that query fails.
    const priority = pool_helpers.getOptionalInt(obj.get("priority")) orelse (db.work_queues.nextPriority(pool.id) catch 1);

    var id_buf: [36]u8 = undefined;
    const queue_id = uuid_util.generate(&id_buf);

    var ts_buf: [32]u8 = undefined;
    const now = time_util.timestamp(&ts_buf);

    const status: db.work_queues.Status = if (is_paused) .paused else .not_ready;

    db.work_queues.insert(
        queue_id,
        name,
        description,
        is_paused,
        concurrency_limit,
        priority,
        pool.id,
        status,
        now,
    ) catch {
        json_util.sendStatus(r, "{\"detail\":\"insert failed\"}", .internal_server_error);
        return;
    };

    // Read the row back so the response is built from what the lookup returns
    // rather than from the request values.
    const queue = db.work_queues.getByPoolAndName(alloc, pool.id, name) catch null orelse {
        json_util.sendStatus(r, "{\"detail\":\"queue not found after insert\"}", .internal_server_error);
        return;
    };

    const resp = writeQueue(alloc, queue, pool_name) catch {
        json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
        return;
    };
    json_util.sendStatus(r, resp, .created);
}
94
/// Returns the work queue addressed by the pool and queue names in `target`.
///
/// Sends 400 when either name cannot be extracted, 404 when the pool or the
/// queue lookup yields nothing (lookup errors count as "nothing"), 500 when
/// serialization fails, and otherwise the serialized queue via
/// `json_util.send`.
pub fn get(r: zap.Request, target: []const u8) !void {
    var scratch = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer scratch.deinit();
    const allocator = scratch.allocator();

    const pool_name = pool_helpers.extractPoolName(target) orelse {
        json_util.sendStatus(r, "{\"detail\":\"work pool name required\"}", .bad_request);
        return;
    };

    const queue_name = pool_helpers.extractQueueName(target) orelse {
        json_util.sendStatus(r, "{\"detail\":\"queue name required\"}", .bad_request);
        return;
    };

    const pool = db.work_pools.getByName(allocator, pool_name) catch null orelse {
        json_util.sendStatus(r, "{\"detail\":\"Work pool not found.\"}", .not_found);
        return;
    };

    // Guard-clause form: bail out first when the queue is absent.
    const found = db.work_queues.getByPoolAndName(allocator, pool.id, queue_name) catch null orelse {
        json_util.sendStatus(r, "{\"detail\":\"Work queue not found.\"}", .not_found);
        return;
    };

    const payload = writeQueue(allocator, found, pool_name) catch {
        json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
        return;
    };
    json_util.send(r, payload);
}
125
/// Updates the work queue addressed by the pool and queue names in `target`
/// with the fields supplied in the JSON request body.
///
/// Responses:
/// - 400 when either name cannot be extracted, the body is absent, the body
///   is not valid JSON, or the JSON root is not an object.
/// - 404 when the pool lookup yields nothing (lookup errors included) or the
///   update reports that no queue was changed.
/// - 500 when the update call fails.
/// - 204 with an empty body on success.
///
/// When `is_paused` is supplied, the status is also rewritten: `.paused` for
/// true, `.not_ready` for false. Otherwise no status change is passed on.
pub fn update(r: zap.Request, target: []const u8) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const pool_name = pool_helpers.extractPoolName(target) orelse {
        json_util.sendStatus(r, "{\"detail\":\"work pool name required\"}", .bad_request);
        return;
    };

    const queue_name = pool_helpers.extractQueueName(target) orelse {
        json_util.sendStatus(r, "{\"detail\":\"queue name required\"}", .bad_request);
        return;
    };

    const pool = db.work_pools.getByName(alloc, pool_name) catch null orelse {
        json_util.sendStatus(r, "{\"detail\":\"Work pool not found.\"}", .not_found);
        return;
    };

    const body = r.body orelse {
        json_util.sendStatus(r, "{\"detail\":\"failed to read body\"}", .bad_request);
        return;
    };

    const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch {
        json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request);
        return;
    };

    // The body is untrusted: valid JSON that is not an object (e.g. `[]`)
    // must be rejected rather than read through the `.object` union field.
    const obj = switch (parsed.value) {
        .object => |o| o,
        else => {
            json_util.sendStatus(r, "{\"detail\":\"request body must be a JSON object\"}", .bad_request);
            return;
        },
    };

    const new_name = pool_helpers.getOptionalString(obj.get("name"));
    const description = pool_helpers.getOptionalString(obj.get("description"));
    const is_paused = pool_helpers.getOptionalBool(obj.get("is_paused"));
    const concurrency_limit = pool_helpers.getOptionalInt(obj.get("concurrency_limit"));
    const priority = pool_helpers.getOptionalInt(obj.get("priority"));

    // Status only changes when the caller explicitly toggles `is_paused`.
    const new_status: ?db.work_queues.Status = if (is_paused) |paused|
        (if (paused) .paused else .not_ready)
    else
        null;

    var ts_buf: [32]u8 = undefined;
    const now = time_util.timestamp(&ts_buf);

    const did_update = db.work_queues.updateByPoolAndName(
        pool.id,
        queue_name,
        new_name,
        description,
        is_paused,
        concurrency_limit,
        priority,
        new_status,
        now,
    ) catch {
        json_util.sendStatus(r, "{\"detail\":\"update failed\"}", .internal_server_error);
        return;
    };

    if (!did_update) {
        json_util.sendStatus(r, "{\"detail\":\"Work queue not found.\"}", .not_found);
        return;
    }

    r.setStatus(.no_content);
    // Best effort: the status is already set and nothing more can be reported
    // to the client if sending the empty body fails.
    r.sendBody("") catch {};
}
195
/// Deletes the work queue addressed by the pool and queue names in `target`.
///
/// Responses:
/// - 400 when either name cannot be extracted, or when the queue is the
///   pool's default queue.
/// - 404 when the pool lookup yields nothing (lookup errors included) or the
///   delete reports that no queue was removed.
/// - 500 when the default-queue check cannot be performed or the delete
///   call fails.
/// - 204 with an empty body on success.
pub fn delete(r: zap.Request, target: []const u8) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const pool_name = pool_helpers.extractPoolName(target) orelse {
        json_util.sendStatus(r, "{\"detail\":\"work pool name required\"}", .bad_request);
        return;
    };

    const queue_name = pool_helpers.extractQueueName(target) orelse {
        json_util.sendStatus(r, "{\"detail\":\"queue name required\"}", .bad_request);
        return;
    };

    const pool = db.work_pools.getByName(alloc, pool_name) catch null orelse {
        json_util.sendStatus(r, "{\"detail\":\"Work pool not found.\"}", .not_found);
        return;
    };

    if (pool.default_queue_id) |default_id| {
        // This lookup guards the default queue. If it errors we must not fall
        // through to the delete, otherwise a failed lookup would silently
        // bypass the protection. Fail closed with a 500 instead.
        const existing: ?db.work_queues.WorkQueueRow = db.work_queues.getByPoolAndName(alloc, pool.id, queue_name) catch {
            json_util.sendStatus(r, "{\"detail\":\"database error\"}", .internal_server_error);
            return;
        };
        if (existing) |queue| {
            if (mem.eql(u8, queue.id, default_id)) {
                json_util.sendStatus(r, "{\"detail\":\"Cannot delete the default work queue.\"}", .bad_request);
                return;
            }
        }
    }

    const deleted = db.work_queues.deleteByPoolAndName(pool.id, queue_name) catch {
        json_util.sendStatus(r, "{\"detail\":\"delete failed\"}", .internal_server_error);
        return;
    };

    if (!deleted) {
        json_util.sendStatus(r, "{\"detail\":\"Work queue not found.\"}", .not_found);
        return;
    }

    r.setStatus(.no_content);
    // Best effort: nothing further can be reported if the empty body fails to send.
    r.sendBody("") catch {};
}
238
/// Lists the work queues of the pool named in `target` as a JSON array.
///
/// Paging is read from an optional JSON object body with integer `limit`
/// (default 200) and `offset` (default 0). The body is treated leniently: a
/// missing body, invalid JSON, a non-object root, non-integer values, and
/// integers that do not fit in `usize` (including negatives) all leave the
/// corresponding default in place.
///
/// Responses:
/// - 400 when the pool name cannot be extracted.
/// - 404 when the pool lookup yields nothing (lookup errors included).
/// - 500 when listing or serialization fails.
/// - otherwise the JSON array via `json_util.send`.
pub fn filter(r: zap.Request, target: []const u8) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const pool_name = pool_helpers.extractPoolName(target) orelse {
        json_util.sendStatus(r, "{\"detail\":\"work pool name required\"}", .bad_request);
        return;
    };

    const pool = db.work_pools.getByName(alloc, pool_name) catch null orelse {
        json_util.sendStatus(r, "{\"detail\":\"Work pool not found.\"}", .not_found);
        return;
    };

    var limit: usize = 200;
    var offset: usize = 0;

    if (r.body) |body| {
        if (json.parseFromSlice(json.Value, alloc, body, .{})) |parsed| {
            // Only an object root can carry paging fields. Anything else is
            // ignored rather than read through the wrong union field.
            switch (parsed.value) {
                .object => |obj| {
                    if (obj.get("limit")) |v| switch (v) {
                        // Checked cast: a negative or oversized value keeps the default
                        // instead of invoking illegal behaviour in @intCast.
                        .integer => |n| limit = std.math.cast(usize, n) orelse limit,
                        else => {},
                    };
                    if (obj.get("offset")) |v| switch (v) {
                        .integer => |n| offset = std.math.cast(usize, n) orelse offset,
                        else => {},
                    };
                },
                else => {},
            }
        } else |_| {}
    }

    const queues = db.work_queues.listByPool(alloc, pool.id, limit, offset) catch {
        json_util.sendStatus(r, "{\"detail\":\"database error\"}", .internal_server_error);
        return;
    };

    var output: std.io.Writer.Allocating = .init(alloc);
    var jw: json.Stringify = .{ .writer = &output.writer };

    jw.beginArray() catch {
        json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
        return;
    };

    for (queues) |queue| {
        // A failure part-way through an element leaves a half-written object
        // in the buffer, so skipping it would produce malformed JSON. Abort
        // the whole response instead.
        writeQueueObject(&jw, queue, pool_name) catch {
            json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
            return;
        };
    }

    jw.endArray() catch {
        json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
        return;
    };

    // Do not pretend the list is empty when the buffer cannot be handed over.
    const resp = output.toOwnedSlice() catch {
        json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
        return;
    };
    json_util.send(r, resp);
}
290
291// JSON serialization
292
/// Serializes one work queue row to a JSON object and returns the bytes.
///
/// The output buffer is created from `alloc`; the returned slice is what the
/// buffer hands over via `toOwnedSlice`. Any writer or allocation error is
/// propagated to the caller.
fn writeQueue(alloc: std.mem.Allocator, queue: db.work_queues.WorkQueueRow, pool_name: []const u8) ![]const u8 {
    var buffer: std.io.Writer.Allocating = .init(alloc);
    var stringify: json.Stringify = .{ .writer = &buffer.writer };

    try writeQueueObject(&stringify, queue, pool_name);

    const bytes = try buffer.toOwnedSlice();
    return bytes;
}
299
/// Emits `queue` as a single JSON object on `jw`.
///
/// Fields are written in this order: `id`, `created`, `updated`, `name`,
/// `description`, `is_paused`, `concurrency_limit`, `priority`,
/// `work_pool_id`, `work_pool_name` (taken from `pool_name`, not the row),
/// `last_polled`, `status`. Optional fields that are null are written as JSON
/// `null`. The first writer error is returned; the object is then left
/// unfinished on the stream.
fn writeQueueObject(jw: *json.Stringify, queue: db.work_queues.WorkQueueRow, pool_name: []const u8) !void {
    try jw.beginObject();

    try jw.objectField("id");
    try jw.write(queue.id);
    try jw.objectField("created");
    try jw.write(queue.created);
    try jw.objectField("updated");
    try jw.write(queue.updated);
    try jw.objectField("name");
    try jw.write(queue.name);
    try jw.objectField("description");
    try jw.write(queue.description);
    try jw.objectField("is_paused");
    try jw.write(queue.is_paused);

    // `write` unwraps an optional itself: the payload when set, `null` when not.
    try jw.objectField("concurrency_limit");
    try jw.write(queue.concurrency_limit);

    try jw.objectField("priority");
    try jw.write(queue.priority);
    try jw.objectField("work_pool_id");
    try jw.write(queue.work_pool_id);
    try jw.objectField("work_pool_name");
    try jw.write(pool_name);

    try jw.objectField("last_polled");
    try jw.write(queue.last_polled);

    try jw.objectField("status");
    const status_text = queue.status.toString();
    try jw.write(status_text);

    try jw.endObject();
}