prefect server in zig
1const std = @import("std");
2const zap = @import("zap");
3const mem = std.mem;
4const json = std.json;
5
6const db = @import("../db/sqlite.zig");
7const uuid_util = @import("../utilities/uuid.zig");
8const time_util = @import("../utilities/time.zig");
9const hashing = @import("../utilities/hashing.zig");
10
/// Send `body` as a JSON response with permissive CORS headers.
///
/// Best effort: a failure to set any header, or to send the body, is
/// ignored and the remaining steps still run.
fn sendJson(r: zap.Request, body: []const u8) void {
    // name/value pairs, applied in order
    const response_headers = [_][2][]const u8{
        .{ "content-type", "application/json" },
        .{ "access-control-allow-origin", "*" },
        .{ "access-control-allow-methods", "GET, POST, PATCH, DELETE, OPTIONS" },
        .{ "access-control-allow-headers", "content-type, x-prefect-api-version" },
    };

    for (response_headers) |header| {
        r.setHeader(header[0], header[1]) catch {};
    }

    r.sendBody(body) catch {};
}
18
/// Respond with `body` as JSON under the given HTTP `status`.
///
/// The status is set before delegating to `sendJson`, which ends by
/// sending the body.
fn sendJsonStatus(r: zap.Request, body: []const u8, status: zap.http.StatusCode) void {
    r.setStatus(status);
    sendJson(r, body);
}
23
/// Dispatch a request for the block schema routes:
///
///   POST /block_schemas/                     - create (idempotent)
///   POST /block_schemas/filter               - list
///   GET  /block_schemas/checksum/{checksum}  - read by checksum
///   GET  /block_schemas/{id}                 - read by id
///
/// A leading `/api` is accepted and stripped. Anything that matches no
/// route is answered with a 404 JSON body. A missing path is treated as
/// "/" and a missing method as "GET".
pub fn handle(r: zap.Request) !void {
    const collection = "/block_schemas";
    const collection_slash = "/block_schemas/";
    const checksum_prefix = "/block_schemas/checksum/";

    const raw_path = r.path orelse "/";
    const verb = r.method orelse "GET";

    // strip /api prefix if present
    const has_api_prefix = mem.startsWith(u8, raw_path, "/api/block_schemas");
    const path = if (has_api_prefix) raw_path["/api".len..] else raw_path;

    if (mem.eql(u8, verb, "POST")) {
        // the filter route is checked before create, as any path ending in /filter
        if (mem.endsWith(u8, path, "/filter")) return filter(r);

        const is_collection = mem.eql(u8, path, collection_slash) or mem.eql(u8, path, collection);
        if (is_collection) return create(r);
    } else if (mem.eql(u8, verb, "GET")) {
        // checksum lookups must be tested before the generic {id} route
        if (mem.startsWith(u8, path, checksum_prefix)) {
            return getByChecksum(r, path[checksum_prefix.len..]);
        }

        if (mem.startsWith(u8, path, collection_slash)) {
            const id = path[collection_slash.len..];
            // only segments of at least 32 bytes are treated as ids
            if (id.len >= 32) return getById(r, id);
        }
    }

    sendJsonStatus(r, "{\"detail\":\"not found\"}", .not_found);
}
69
/// Serialize a parsed JSON value back to text allocated from `alloc`.
/// Fails if the underlying allocating writer cannot grow.
fn stringifyValue(alloc: std.mem.Allocator, value: json.Value) ![]const u8 {
    var out: std.Io.Writer.Allocating = .init(alloc);
    var jw: json.Stringify = .{ .writer = &out.writer };
    try jw.write(value);
    return try out.toOwnedSlice();
}

/// POST /block_schemas/ - create a block schema (idempotent).
///
/// The request body must be a JSON object with a string `block_type_id`.
/// Optional members: `fields` (defaults to `{}`), `capabilities`
/// (defaults to `[]`) and `version` (defaults to "non-versioned" when
/// missing or not a string).
///
/// Responses:
///   - 400 when the body is missing, is not valid JSON, is not an object,
///     or has no string `block_type_id`
///   - 200 with the stored row when a schema with the same checksum and
///     version already exists
///   - 201 with the new row after a successful insert
///   - 500 when serialization, checksum computation or the insert fails
fn create(r: zap.Request) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const body = r.body orelse {
        sendJsonStatus(r, "{\"detail\":\"request body required\"}", .bad_request);
        return;
    };

    const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch {
        sendJsonStatus(r, "{\"detail\":\"invalid json\"}", .bad_request);
        return;
    };

    // The body is untrusted: reading `.object` from a value with a different
    // active tag is illegal behavior, so reject non-objects explicitly.
    const obj = switch (parsed.value) {
        .object => |o| o,
        else => {
            sendJsonStatus(r, "{\"detail\":\"request body must be a json object\"}", .bad_request);
            return;
        },
    };

    // Same concern for `block_type_id`: only accept an actual string.
    const block_type_id: []const u8 = blk: {
        if (obj.get("block_type_id")) |v| {
            if (v == .string) break :blk v.string;
        }
        sendJsonStatus(r, "{\"detail\":\"block_type_id required\"}", .bad_request);
        return;
    };

    const version: []const u8 = blk: {
        if (obj.get("version")) |v| {
            if (v == .string) break :blk v.string;
        }
        break :blk "non-versioned";
    };

    // Re-serialize fields and capabilities. A failure here is reported rather
    // than replaced by a default, because the checksum is derived from
    // `fields` and a silent default would store and hash the wrong content.
    const fields: []const u8 = if (obj.get("fields")) |v|
        stringifyValue(alloc, v) catch {
            sendJsonStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
            return;
        }
    else
        "{}";

    const capabilities: []const u8 = if (obj.get("capabilities")) |v|
        stringifyValue(alloc, v) catch {
            sendJsonStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
            return;
        }
    else
        "[]";

    // compute checksum from fields (sha256, sorted keys to match python prefect)
    // python uses hash_objects(fields) which serializes as [[fields], {}] with sort_keys=True
    const checksum = hashing.hashJson(alloc, fields) catch {
        sendJsonStatus(r, "{\"detail\":\"checksum computation failed\"}", .internal_server_error);
        return;
    };

    // check if schema already exists (idempotent)
    // NOTE(review): a lookup error is treated like "not found" and falls
    // through to the insert below - confirm this is intended.
    if (db.block_schemas.getByChecksum(alloc, checksum, version) catch null) |existing| {
        try sendBlockSchemaResponse(r, alloc, existing, .ok);
        return;
    }

    var id_buf: [36]u8 = undefined;
    const id = uuid_util.generate(&id_buf);

    db.block_schemas.insert(id, checksum, fields, capabilities, version, block_type_id) catch {
        sendJsonStatus(r, "{\"detail\":\"insert failed\"}", .internal_server_error);
        return;
    };

    // The response row is assembled locally instead of being read back;
    // `created` and `updated` both use a timestamp taken after the insert.
    var ts_buf: [32]u8 = undefined;
    const now = time_util.timestamp(&ts_buf);

    const schema = db.block_schemas.BlockSchemaRow{
        .id = id,
        .created = now,
        .updated = now,
        .checksum = checksum,
        .fields = fields,
        .capabilities = capabilities,
        .version = version,
        .block_type_id = block_type_id,
    };

    try sendBlockSchemaResponse(r, alloc, schema, .created);
}
150
/// Return the value of the first `name=value` pair in an `&`-separated
/// query string, or null when `name` is absent. The value is returned
/// as-is (no percent-decoding).
fn findQueryParam(query: []const u8, name: []const u8) ?[]const u8 {
    var pairs = mem.splitScalar(u8, query, '&');
    while (pairs.next()) |pair| {
        const eq_idx = mem.indexOfScalar(u8, pair, '=') orelse continue;
        if (mem.eql(u8, pair[0..eq_idx], name)) return pair[eq_idx + 1 ..];
    }
    return null;
}

/// GET /block_schemas/checksum/{checksum} - read a block schema by checksum.
///
/// `checksum` is the path segment after `/block_schemas/checksum/`. An
/// optional `version` query parameter narrows the lookup. It is taken from
/// a query string embedded in `checksum` (everything after a `?`) when one
/// is present, otherwise from the request's separate query string
/// (`r.query`), which is where zap places it.
///
/// Responds 200 with the schema, or 404 when no schema is returned.
fn getByChecksum(r: zap.Request, checksum: []const u8) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    var actual_checksum = checksum;
    var version: ?[]const u8 = null;

    // Tolerate a raw target that still carries its query string.
    if (mem.indexOfScalar(u8, checksum, '?')) |idx| {
        actual_checksum = checksum[0..idx];
        version = findQueryParam(checksum[idx + 1 ..], "version");
    }

    // Normal case: the path and the query arrive as separate request fields.
    if (version == null) {
        if (r.query) |query| version = findQueryParam(query, "version");
    }

    // NOTE(review): a lookup error is reported as 404, same as "no such
    // schema" - confirm whether a 500 would be more appropriate.
    const schema = db.block_schemas.getByChecksum(alloc, actual_checksum, version) catch null orelse {
        sendJsonStatus(r, "{\"detail\":\"block schema not found\"}", .not_found);
        return;
    };

    try sendBlockSchemaResponse(r, alloc, schema, .ok);
}
176
/// GET /block_schemas/{id} - read a block schema by id.
///
/// Responds 200 with the schema when the lookup yields a row. A lookup
/// that yields nothing, or that fails, is answered with 404.
fn getById(r: zap.Request, id: []const u8) !void {
    var request_arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer request_arena.deinit();
    const alloc = request_arena.allocator();

    // errors are folded into "no row"
    const lookup = db.block_schemas.getById(alloc, id) catch null;

    if (lookup) |row| {
        try sendBlockSchemaResponse(r, alloc, row, .ok);
    } else {
        sendJsonStatus(r, "{\"detail\":\"block schema not found\"}", .not_found);
    }
}
189
/// POST /block_schemas/filter - list block schemas as a JSON array.
///
/// The request body is not read; the listing is capped at `max_results`
/// rows.
///
/// Responds 200 with the array, or 500 when the listing or the
/// serialization fails. Serialization is all-or-nothing: a schema that
/// fails halfway would leave a partial object in the output, so any write
/// error aborts the response instead of producing malformed JSON.
fn filter(r: zap.Request) !void {
    const max_results = 200;

    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const schemas = db.block_schemas.list(alloc, max_results) catch {
        sendJsonStatus(r, "{\"detail\":\"database error\"}", .internal_server_error);
        return;
    };

    var output: std.Io.Writer.Allocating = .init(alloc);
    var jw: json.Stringify = .{ .writer = &output.writer };

    // null means "something failed while building the array"
    const payload: ?[]const u8 = blk: {
        jw.beginArray() catch break :blk null;
        for (schemas) |s| {
            writeBlockSchema(&jw, s) catch break :blk null;
        }
        jw.endArray() catch break :blk null;
        break :blk output.toOwnedSlice() catch null;
    };

    if (payload) |body| {
        sendJson(r, body);
    } else {
        sendJsonStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
    }
}
214
/// Emit one object member whose value is serialized by the JSON writer.
fn writeMember(jw: *json.Stringify, name: []const u8, value: anytype) !void {
    try jw.objectField(name);
    try jw.write(value);
}

/// Emit one object member whose value is copied verbatim into the output.
/// `raw_json` is not escaped or checked, so it has to be JSON text already.
fn writeRawMember(jw: *json.Stringify, name: []const u8, raw_json: []const u8) !void {
    try jw.objectField(name);
    try jw.beginWriteRaw();
    try jw.writer.writeAll(raw_json);
    jw.endWriteRaw();
}

/// Write a block schema row as a JSON object.
///
/// Members are emitted in this order: id, created, updated, checksum,
/// fields, capabilities, version, block_type_id, block_type. `fields` and
/// `capabilities` are written raw; `block_type` is always null.
/// Any writer error is returned to the caller, possibly after part of the
/// object has already been written.
fn writeBlockSchema(jw: *json.Stringify, s: db.block_schemas.BlockSchemaRow) !void {
    try jw.beginObject();

    try writeMember(jw, "id", s.id);
    try writeMember(jw, "created", s.created);
    try writeMember(jw, "updated", s.updated);
    try writeMember(jw, "checksum", s.checksum);

    try writeRawMember(jw, "fields", s.fields);
    try writeRawMember(jw, "capabilities", s.capabilities);

    try writeMember(jw, "version", s.version);
    try writeMember(jw, "block_type_id", s.block_type_id);
    try writeMember(jw, "block_type", null);

    try jw.endObject();
}
251
/// Serialize `schema` to JSON and send it with the given `status`.
///
/// The JSON text is built in memory from `alloc`. If writing the schema
/// fails, a 500 "serialize error" body is sent instead. If the finished
/// buffer cannot be taken over, `{}` is sent with the requested status.
fn sendBlockSchemaResponse(r: zap.Request, alloc: std.mem.Allocator, schema: db.block_schemas.BlockSchemaRow, status: zap.http.StatusCode) !void {
    var buffer: std.Io.Writer.Allocating = .init(alloc);
    var stringify: json.Stringify = .{ .writer = &buffer.writer };

    writeBlockSchema(&stringify, schema) catch {
        sendJsonStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
        return;
    };

    const payload: []const u8 = buffer.toOwnedSlice() catch "{}";
    sendJsonStatus(r, payload, status);
}
262}