prefect server in zig
at main 262 lines 8.7 kB view raw
1const std = @import("std"); 2const zap = @import("zap"); 3const mem = std.mem; 4const json = std.json; 5 6const db = @import("../db/sqlite.zig"); 7const uuid_util = @import("../utilities/uuid.zig"); 8const time_util = @import("../utilities/time.zig"); 9const hashing = @import("../utilities/hashing.zig"); 10 11fn sendJson(r: zap.Request, body: []const u8) void { 12 r.setHeader("content-type", "application/json") catch {}; 13 r.setHeader("access-control-allow-origin", "*") catch {}; 14 r.setHeader("access-control-allow-methods", "GET, POST, PATCH, DELETE, OPTIONS") catch {}; 15 r.setHeader("access-control-allow-headers", "content-type, x-prefect-api-version") catch {}; 16 r.sendBody(body) catch {}; 17} 18 19fn sendJsonStatus(r: zap.Request, body: []const u8, status: zap.http.StatusCode) void { 20 r.setStatus(status); 21 sendJson(r, body); 22} 23 24// Routes: 25// POST /block_schemas/ - create (idempotent) 26// POST /block_schemas/filter - list 27// GET /block_schemas/checksum/{checksum} - read by checksum 28// GET /block_schemas/{id} - read by id 29pub fn handle(r: zap.Request) !void { 30 const target = r.path orelse "/"; 31 const method = r.method orelse "GET"; 32 33 // strip /api prefix if present 34 const path = if (mem.startsWith(u8, target, "/api/block_schemas")) 35 target[4..] 36 else 37 target; 38 39 // POST /block_schemas/filter 40 if (mem.eql(u8, method, "POST") and mem.endsWith(u8, path, "/filter")) { 41 try filter(r); 42 return; 43 } 44 45 // POST /block_schemas/ - create 46 if (mem.eql(u8, method, "POST") and (mem.eql(u8, path, "/block_schemas/") or mem.eql(u8, path, "/block_schemas"))) { 47 try create(r); 48 return; 49 } 50 51 // GET /block_schemas/checksum/{checksum} 52 if (mem.eql(u8, method, "GET") and mem.startsWith(u8, path, "/block_schemas/checksum/")) { 53 const checksum = path["/block_schemas/checksum/".len..]; 54 try getByChecksum(r, checksum); 55 return; 56 } 57 58 // GET /block_schemas/{id} 59 if (mem.eql(u8, method, "GET") and mem.startsWith(u8, path, "/block_schemas/")) { 60 const id = path["/block_schemas/".len..]; 61 if (id.len >= 32) { 62 try getById(r, id); 63 return; 64 } 65 } 66 67 sendJsonStatus(r, "{\"detail\":\"not found\"}", .not_found); 68} 69 70fn create(r: zap.Request) !void { 71 var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 72 defer arena.deinit(); 73 const alloc = arena.allocator(); 74 75 const body = r.body orelse { 76 sendJsonStatus(r, "{\"detail\":\"request body required\"}", .bad_request); 77 return; 78 }; 79 80 const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch { 81 sendJsonStatus(r, "{\"detail\":\"invalid json\"}", .bad_request); 82 return; 83 }; 84 85 const obj = parsed.value.object; 86 87 const block_type_id = if (obj.get("block_type_id")) |v| v.string else { 88 sendJsonStatus(r, "{\"detail\":\"block_type_id required\"}", .bad_request); 89 return; 90 }; 91 92 const version = if (obj.get("version")) |v| if (v == .string) v.string else "non-versioned" else "non-versioned"; 93 94 // serialize fields and capabilities using streaming writer 95 const fields = blk: { 96 if (obj.get("fields")) |v| { 97 var out: std.Io.Writer.Allocating = .init(alloc); 98 var jw: json.Stringify = .{ .writer = &out.writer }; 99 jw.write(v) catch break :blk "{}"; 100 break :blk out.toOwnedSlice() catch "{}"; 101 } else break :blk "{}"; 102 }; 103 104 const capabilities = blk: { 105 if (obj.get("capabilities")) |v| { 106 var out: std.Io.Writer.Allocating = .init(alloc); 107 var jw: json.Stringify = .{ .writer = &out.writer }; 108 jw.write(v) catch break :blk "[]"; 109 break :blk out.toOwnedSlice() catch "[]"; 110 } else break :blk "[]"; 111 }; 112 113 // compute checksum from fields (sha256, sorted keys to match python prefect) 114 // python uses hash_objects(fields) which serializes as [[fields], {}] with sort_keys=True 115 const checksum = hashing.hashJson(alloc, fields) catch { 116 sendJsonStatus(r, "{\"detail\":\"checksum computation failed\"}", .internal_server_error); 117 return; 118 }; 119 120 // check if schema already exists (idempotent) 121 if (db.block_schemas.getByChecksum(alloc, checksum, version) catch null) |existing| { 122 try sendBlockSchemaResponse(r, alloc, existing, .ok); 123 return; 124 } 125 126 var id_buf: [36]u8 = undefined; 127 const id = uuid_util.generate(&id_buf); 128 129 db.block_schemas.insert(id, checksum, fields, capabilities, version, block_type_id) catch { 130 sendJsonStatus(r, "{\"detail\":\"insert failed\"}", .internal_server_error); 131 return; 132 }; 133 134 var ts_buf: [32]u8 = undefined; 135 const now = time_util.timestamp(&ts_buf); 136 137 const schema = db.block_schemas.BlockSchemaRow{ 138 .id = id, 139 .created = now, 140 .updated = now, 141 .checksum = checksum, 142 .fields = fields, 143 .capabilities = capabilities, 144 .version = version, 145 .block_type_id = block_type_id, 146 }; 147 148 try sendBlockSchemaResponse(r, alloc, schema, .created); 149} 150 151fn getByChecksum(r: zap.Request, checksum: []const u8) !void { 152 var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 153 defer arena.deinit(); 154 const alloc = arena.allocator(); 155 156 // check for version query param (simplified - just look for ?version=) 157 var version: ?[]const u8 = null; 158 if (mem.indexOf(u8, checksum, "?version=")) |idx| { 159 version = checksum[idx + "?version=".len ..]; 160 const actual_checksum = checksum[0..idx]; 161 const schema = db.block_schemas.getByChecksum(alloc, actual_checksum, version) catch null orelse { 162 sendJsonStatus(r, "{\"detail\":\"block schema not found\"}", .not_found); 163 return; 164 }; 165 try sendBlockSchemaResponse(r, alloc, schema, .ok); 166 return; 167 } 168 169 const schema = db.block_schemas.getByChecksum(alloc, checksum, null) catch null orelse { 170 sendJsonStatus(r, "{\"detail\":\"block schema not found\"}", .not_found); 171 return; 172 }; 173 174 try sendBlockSchemaResponse(r, alloc, schema, .ok); 175} 176 177fn getById(r: zap.Request, id: []const u8) !void { 178 var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 179 defer arena.deinit(); 180 const alloc = arena.allocator(); 181 182 const schema = db.block_schemas.getById(alloc, id) catch null orelse { 183 sendJsonStatus(r, "{\"detail\":\"block schema not found\"}", .not_found); 184 return; 185 }; 186 187 try sendBlockSchemaResponse(r, alloc, schema, .ok); 188} 189 190fn filter(r: zap.Request) !void { 191 var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 192 defer arena.deinit(); 193 const alloc = arena.allocator(); 194 195 const schemas = db.block_schemas.list(alloc, 200) catch { 196 sendJsonStatus(r, "{\"detail\":\"database error\"}", .internal_server_error); 197 return; 198 }; 199 200 var output: std.Io.Writer.Allocating = .init(alloc); 201 var jw: json.Stringify = .{ .writer = &output.writer }; 202 203 jw.beginArray() catch { 204 sendJsonStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error); 205 return; 206 }; 207 for (schemas) |s| { 208 writeBlockSchema(&jw, s) catch continue; 209 } 210 jw.endArray() catch {}; 211 212 sendJson(r, output.toOwnedSlice() catch "[]"); 213} 214 215fn writeBlockSchema(jw: *json.Stringify, s: db.block_schemas.BlockSchemaRow) !void { 216 try jw.beginObject(); 217 218 try jw.objectField("id"); 219 try jw.write(s.id); 220 221 try jw.objectField("created"); 222 try jw.write(s.created); 223 224 try jw.objectField("updated"); 225 try jw.write(s.updated); 226 227 try jw.objectField("checksum"); 228 try jw.write(s.checksum); 229 230 try jw.objectField("fields"); 231 try jw.beginWriteRaw(); 232 try jw.writer.writeAll(s.fields); 233 jw.endWriteRaw(); 234 235 try jw.objectField("capabilities"); 236 try jw.beginWriteRaw(); 237 try jw.writer.writeAll(s.capabilities); 238 jw.endWriteRaw(); 239 240 try jw.objectField("version"); 241 try jw.write(s.version); 242 243 try jw.objectField("block_type_id"); 244 try jw.write(s.block_type_id); 245 246 try jw.objectField("block_type"); 247 try jw.write(null); 248 249 try jw.endObject(); 250} 251 252fn sendBlockSchemaResponse(r: zap.Request, alloc: std.mem.Allocator, schema: db.block_schemas.BlockSchemaRow, status: zap.http.StatusCode) !void { 253 var output: std.Io.Writer.Allocating = .init(alloc); 254 var jw: json.Stringify = .{ .writer = &output.writer }; 255 256 writeBlockSchema(&jw, schema) catch { 257 sendJsonStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error); 258 return; 259 }; 260 261 sendJsonStatus(r, output.toOwnedSlice() catch "{}", status); 262}