Prefect server in Zig — work-queue HTTP handlers (file: 348 lines, 11 kB, branch: main).
const std = @import("std");
const zap = @import("zap");
const mem = std.mem;
const json = std.json;

const db = @import("../db/sqlite.zig");
const uuid_util = @import("../utilities/uuid.zig");
const time_util = @import("../utilities/time.zig");
const json_util = @import("../utilities/json.zig");
const pool_helpers = @import("work_pools.zig");

/// POST /work_pools/{pool}/queues — create a work queue inside an existing pool.
///
/// Request body: JSON object with required `name` (string) and optional
/// `description`, `is_paused`, `concurrency_limit`, `priority`.
/// Responds 201 with the serialized queue, 400 on malformed input, 404 when
/// the pool does not exist, 409 when a queue of that name already exists.
/// All allocations come from a per-request arena, freed on return.
pub fn create(r: zap.Request, target: []const u8) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const pool_name = pool_helpers.extractPoolName(target) orelse {
        json_util.sendStatus(r, "{\"detail\":\"work pool name required\"}", .bad_request);
        return;
    };

    const pool = db.work_pools.getByName(alloc, pool_name) catch null orelse {
        json_util.sendStatus(r, "{\"detail\":\"Work pool not found.\"}", .not_found);
        return;
    };

    const body = r.body orelse {
        json_util.sendStatus(r, "{\"detail\":\"failed to read body\"}", .bad_request);
        return;
    };

    const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch {
        json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request);
        return;
    };

    // Guard the union tag: a body like `[1]` or `"x"` is valid JSON but not an
    // object, and the `.object` access below would otherwise be an invalid
    // union access (panic in safe builds, UB in ReleaseFast).
    if (parsed.value != .object) {
        json_util.sendStatus(r, "{\"detail\":\"body must be a json object\"}", .bad_request);
        return;
    }
    const obj = parsed.value.object;

    const name = switch (obj.get("name") orelse {
        json_util.sendStatus(r, "{\"detail\":\"name required\"}", .bad_request);
        return;
    }) {
        .string => |s| s,
        else => {
            json_util.sendStatus(r, "{\"detail\":\"name must be string\"}", .bad_request);
            return;
        },
    };

    if (db.work_queues.getByPoolAndName(alloc, pool.id, name) catch null) |_| {
        json_util.sendStatus(r, "{\"detail\":\"Work queue already exists.\"}", .conflict);
        return;
    }

    const description = pool_helpers.getOptionalString(obj.get("description")) orelse "";
    const is_paused = pool_helpers.getOptionalBool(obj.get("is_paused")) orelse false;
    const concurrency_limit = pool_helpers.getOptionalInt(obj.get("concurrency_limit"));
    // When the client omits priority, append the queue after the pool's
    // current lowest-priority queue (falling back to 1 on a DB error).
    const priority = pool_helpers.getOptionalInt(obj.get("priority")) orelse
        (db.work_queues.nextPriority(pool.id) catch 1);

    var id_buf: [36]u8 = undefined;
    const queue_id = uuid_util.generate(&id_buf);

    var ts_buf: [32]u8 = undefined;
    const now = time_util.timestamp(&ts_buf);

    const status: db.work_queues.Status = if (is_paused) .paused else .not_ready;

    db.work_queues.insert(
        queue_id,
        name,
        description,
        is_paused,
        concurrency_limit,
        priority,
        pool.id,
        status,
        now,
    ) catch {
        json_util.sendStatus(r, "{\"detail\":\"insert failed\"}", .internal_server_error);
        return;
    };

    // Re-read the row so the response reflects whatever the DB layer stored
    // (e.g. normalized timestamps) rather than the raw request values.
    const queue = db.work_queues.getByPoolAndName(alloc, pool.id, name) catch null orelse {
        json_util.sendStatus(r, "{\"detail\":\"queue not found after insert\"}", .internal_server_error);
        return;
    };

    const resp = writeQueue(alloc, queue, pool_name) catch {
        json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
        return;
    };
    json_util.sendStatus(r, resp, .created);
}

/// GET /work_pools/{pool}/queues/{queue} — fetch a single work queue.
///
/// Responds 200 with the serialized queue, 400 when path segments are
/// missing, 404 when the pool or the queue does not exist.
pub fn get(r: zap.Request, target: []const u8) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const pool_name = pool_helpers.extractPoolName(target) orelse {
        json_util.sendStatus(r, "{\"detail\":\"work pool name required\"}", .bad_request);
        return;
    };

    const queue_name = pool_helpers.extractQueueName(target) orelse {
        json_util.sendStatus(r, "{\"detail\":\"queue name required\"}", .bad_request);
        return;
    };

    const pool = db.work_pools.getByName(alloc, pool_name) catch null orelse {
        json_util.sendStatus(r, "{\"detail\":\"Work pool not found.\"}", .not_found);
        return;
    };

    if (db.work_queues.getByPoolAndName(alloc, pool.id, queue_name) catch null) |queue| {
        const resp = writeQueue(alloc, queue, pool_name) catch {
            json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
            return;
        };
        json_util.send(r, resp);
    } else {
        json_util.sendStatus(r, "{\"detail\":\"Work queue not found.\"}", .not_found);
    }
}

/// PATCH /work_pools/{pool}/queues/{queue} — partial update of a work queue.
///
/// Every body field is optional; only supplied fields are written. Setting
/// `is_paused` also flips the queue status (paused <-> not_ready).
/// Responds 204 on success, 400 on malformed input, 404 when the pool or
/// queue does not exist.
pub fn update(r: zap.Request, target: []const u8) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const pool_name = pool_helpers.extractPoolName(target) orelse {
        json_util.sendStatus(r, "{\"detail\":\"work pool name required\"}", .bad_request);
        return;
    };

    const queue_name = pool_helpers.extractQueueName(target) orelse {
        json_util.sendStatus(r, "{\"detail\":\"queue name required\"}", .bad_request);
        return;
    };

    const pool = db.work_pools.getByName(alloc, pool_name) catch null orelse {
        json_util.sendStatus(r, "{\"detail\":\"Work pool not found.\"}", .not_found);
        return;
    };

    const body = r.body orelse {
        json_util.sendStatus(r, "{\"detail\":\"failed to read body\"}", .bad_request);
        return;
    };

    const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch {
        json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request);
        return;
    };

    // Same union-tag guard as `create`: reject non-object JSON bodies
    // instead of panicking on the `.object` access.
    if (parsed.value != .object) {
        json_util.sendStatus(r, "{\"detail\":\"body must be a json object\"}", .bad_request);
        return;
    }
    const obj = parsed.value.object;

    const new_name = pool_helpers.getOptionalString(obj.get("name"));
    const description = pool_helpers.getOptionalString(obj.get("description"));
    const is_paused = pool_helpers.getOptionalBool(obj.get("is_paused"));
    const concurrency_limit = pool_helpers.getOptionalInt(obj.get("concurrency_limit"));
    const priority = pool_helpers.getOptionalInt(obj.get("priority"));

    // Pausing/unpausing drives the status column; left null when the client
    // did not send `is_paused` so the DB layer keeps the current status.
    var new_status: ?db.work_queues.Status = null;
    if (is_paused) |paused| {
        new_status = if (paused) .paused else .not_ready;
    }

    var ts_buf: [32]u8 = undefined;
    const now = time_util.timestamp(&ts_buf);

    const did_update = db.work_queues.updateByPoolAndName(
        pool.id,
        queue_name,
        new_name,
        description,
        is_paused,
        concurrency_limit,
        priority,
        new_status,
        now,
    ) catch {
        json_util.sendStatus(r, "{\"detail\":\"update failed\"}", .internal_server_error);
        return;
    };

    if (!did_update) {
        json_util.sendStatus(r, "{\"detail\":\"Work queue not found.\"}", .not_found);
        return;
    }

    r.setStatus(.no_content);
    r.sendBody("") catch {};
}

/// DELETE /work_pools/{pool}/queues/{queue} — delete a work queue.
///
/// The pool's default queue is protected and cannot be deleted (400).
/// Responds 204 on success, 404 when the pool or queue does not exist.
pub fn delete(r: zap.Request, target: []const u8) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const pool_name = pool_helpers.extractPoolName(target) orelse {
        json_util.sendStatus(r, "{\"detail\":\"work pool name required\"}", .bad_request);
        return;
    };

    const queue_name = pool_helpers.extractQueueName(target) orelse {
        json_util.sendStatus(r, "{\"detail\":\"queue name required\"}", .bad_request);
        return;
    };

    const pool = db.work_pools.getByName(alloc, pool_name) catch null orelse {
        json_util.sendStatus(r, "{\"detail\":\"Work pool not found.\"}", .not_found);
        return;
    };

    // Refuse to delete the queue the pool designates as its default.
    if (pool.default_queue_id) |default_id| {
        if (db.work_queues.getByPoolAndName(alloc, pool.id, queue_name) catch null) |queue| {
            if (mem.eql(u8, queue.id, default_id)) {
                json_util.sendStatus(r, "{\"detail\":\"Cannot delete the default work queue.\"}", .bad_request);
                return;
            }
        }
    }

    const deleted = db.work_queues.deleteByPoolAndName(pool.id, queue_name) catch {
        json_util.sendStatus(r, "{\"detail\":\"delete failed\"}", .internal_server_error);
        return;
    };

    if (!deleted) {
        json_util.sendStatus(r, "{\"detail\":\"Work queue not found.\"}", .not_found);
        return;
    }

    r.setStatus(.no_content);
    r.sendBody("") catch {};
}

/// POST /work_pools/{pool}/queues/filter — list a pool's queues as a JSON array.
///
/// Optional body fields `limit` (default 200) and `offset` (default 0) page
/// the result; a missing or unparseable body falls back to the defaults.
/// Responds 200 with the array, 400/404 on path errors.
pub fn filter(r: zap.Request, target: []const u8) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const pool_name = pool_helpers.extractPoolName(target) orelse {
        json_util.sendStatus(r, "{\"detail\":\"work pool name required\"}", .bad_request);
        return;
    };

    const pool = db.work_pools.getByName(alloc, pool_name) catch null orelse {
        json_util.sendStatus(r, "{\"detail\":\"Work pool not found.\"}", .not_found);
        return;
    };

    var limit: usize = 200;
    var offset: usize = 0;

    if (r.body) |body| {
        if (json.parseFromSlice(json.Value, alloc, body, .{})) |parsed| {
            // Tag-check before `.object`, and reject negative values before
            // the usize cast: a client-sent `-1` would otherwise make
            // `@intCast` illegal (safety panic / UB in ReleaseFast).
            if (parsed.value == .object) {
                const obj = parsed.value.object;
                if (obj.get("limit")) |v| {
                    if (v == .integer and v.integer >= 0) limit = @intCast(v.integer);
                }
                if (obj.get("offset")) |v| {
                    if (v == .integer and v.integer >= 0) offset = @intCast(v.integer);
                }
            }
        } else |_| {}
    }

    const queues = db.work_queues.listByPool(alloc, pool.id, limit, offset) catch {
        json_util.sendStatus(r, "{\"detail\":\"database error\"}", .internal_server_error);
        return;
    };

    var output: std.io.Writer.Allocating = .init(alloc);
    var jw: json.Stringify = .{ .writer = &output.writer };

    jw.beginArray() catch {
        json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
        return;
    };

    for (queues) |queue| {
        // NOTE(review): with an allocating writer the only failure here is
        // OOM; skipping a half-written object could still leave malformed
        // JSON in that case — confirm acceptable for this endpoint.
        writeQueueObject(&jw, queue, pool_name) catch continue;
    }

    jw.endArray() catch {};

    json_util.send(r, output.toOwnedSlice() catch "[]");
}

// JSON serialization

/// Serialize one queue row to a JSON object string.
/// Caller owns the returned slice (allocated from `alloc`).
fn writeQueue(alloc: std.mem.Allocator, queue: db.work_queues.WorkQueueRow, pool_name: []const u8) ![]const u8 {
    var output: std.io.Writer.Allocating = .init(alloc);
    var jw: json.Stringify = .{ .writer = &output.writer };
    try writeQueueObject(&jw, queue, pool_name);
    return output.toOwnedSlice();
}

/// Emit one queue row as a JSON object on an open `json.Stringify` stream.
/// `pool_name` is included verbatim as `work_pool_name` (the row itself only
/// stores the pool id). Nullable columns serialize as JSON null.
fn writeQueueObject(jw: *json.Stringify, queue: db.work_queues.WorkQueueRow, pool_name: []const u8) !void {
    try jw.beginObject();

    try jw.objectField("id");
    try jw.write(queue.id);

    try jw.objectField("created");
    try jw.write(queue.created);

    try jw.objectField("updated");
    try jw.write(queue.updated);

    try jw.objectField("name");
    try jw.write(queue.name);

    try jw.objectField("description");
    try jw.write(queue.description);

    try jw.objectField("is_paused");
    try jw.write(queue.is_paused);

    try jw.objectField("concurrency_limit");
    if (queue.concurrency_limit) |c| {
        try jw.write(c);
    } else {
        try jw.write(null);
    }

    try jw.objectField("priority");
    try jw.write(queue.priority);

    try jw.objectField("work_pool_id");
    try jw.write(queue.work_pool_id);

    try jw.objectField("work_pool_name");
    try jw.write(pool_name);

    try jw.objectField("last_polled");
    if (queue.last_polled) |lp| {
        try jw.write(lp);
    } else {
        try jw.write(null);
    }

    try jw.objectField("status");
    try jw.write(queue.status.toString());

    try jw.endObject();
}