prefect server in zig
1const std = @import("std");
2const zap = @import("zap");
3const mem = std.mem;
4const json = std.json;
5
6const db = @import("../db/sqlite.zig");
7const uuid_util = @import("../utilities/uuid.zig");
8const time_util = @import("../utilities/time.zig");
9const json_util = @import("../utilities/json.zig");
10
11// POST /flows/ - create or get flow by name
12// POST /flows/filter - list flows
13// GET /flows/{id} - get flow by id
14// PATCH /flows/{id} - update flow
15// DELETE /flows/{id} - delete flow
16pub fn handle(r: zap.Request) !void {
17 const target = r.path orelse "/";
18 const method = r.method orelse "GET";
19
20 // POST /flows/filter - list
21 if (mem.eql(u8, method, "POST") and mem.endsWith(u8, target, "/filter")) {
22 try filter(r);
23 return;
24 }
25
26 // extract id from path for GET/PATCH/DELETE
27 const prefix = if (mem.startsWith(u8, target, "/api/flows/")) "/api/flows/" else "/flows/";
28 const has_id = target.len > prefix.len;
29 const flow_id = if (has_id) target[prefix.len..] else "";
30
31 if (mem.eql(u8, method, "POST") and (mem.eql(u8, target, "/flows/") or mem.eql(u8, target, "/api/flows/"))) {
32 try createFlow(r);
33 } else if (mem.eql(u8, method, "GET") and has_id) {
34 try getFlow(r, flow_id);
35 } else if (mem.eql(u8, method, "PATCH") and has_id) {
36 try patchFlow(r, flow_id);
37 } else if (mem.eql(u8, method, "DELETE") and has_id) {
38 try deleteFlow(r, flow_id);
39 } else if (mem.eql(u8, method, "GET") or mem.eql(u8, method, "PATCH") or mem.eql(u8, method, "DELETE")) {
40 json_util.sendStatus(r, "{\"detail\":\"flow id required\"}", .bad_request);
41 } else {
42 json_util.sendStatus(r, "{\"detail\":\"not implemented\"}", .not_implemented);
43 }
44}
45
46fn createFlow(r: zap.Request) !void {
47 var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
48 defer arena.deinit();
49 const alloc = arena.allocator();
50
51 const body = r.body orelse {
52 json_util.sendStatus(r, "{\"detail\":\"failed to read body\"}", .bad_request);
53 return;
54 };
55
56 const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch {
57 json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request);
58 return;
59 };
60
61 const name = parsed.value.object.get("name") orelse {
62 json_util.sendStatus(r, "{\"detail\":\"name required\"}", .bad_request);
63 return;
64 };
65 const name_str = name.string;
66
67 // try to get existing flow first
68 if (db.getFlowByName(alloc, name_str) catch null) |flow| {
69 const resp = writeFlow(alloc, flow) catch {
70 json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
71 return;
72 };
73 json_util.send(r, resp);
74 return;
75 }
76
77 // create new flow
78 var new_id_buf: [36]u8 = undefined;
79 const new_id = uuid_util.generate(&new_id_buf);
80
81 db.insertFlow(new_id, name_str) catch {
82 json_util.sendStatus(r, "{\"detail\":\"insert failed\"}", .internal_server_error);
83 return;
84 };
85
86 var ts_buf: [32]u8 = undefined;
87 const now = time_util.timestamp(&ts_buf);
88
89 const flow = db.FlowRow{
90 .id = new_id,
91 .created = now,
92 .updated = now,
93 .name = name_str,
94 .tags = "[]",
95 };
96
97 const resp = writeFlow(alloc, flow) catch {
98 json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
99 return;
100 };
101 json_util.sendStatus(r, resp, .created);
102}
103
104fn getFlow(r: zap.Request, flow_id: []const u8) !void {
105 var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
106 defer arena.deinit();
107 const alloc = arena.allocator();
108
109 if (db.getFlowById(alloc, flow_id) catch null) |flow| {
110 const resp = writeFlow(alloc, flow) catch {
111 json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
112 return;
113 };
114 json_util.send(r, resp);
115 } else {
116 json_util.sendStatus(r, "{\"detail\":\"flow not found\"}", .not_found);
117 }
118}
119
120fn patchFlow(r: zap.Request, flow_id: []const u8) !void {
121 var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
122 defer arena.deinit();
123 const alloc = arena.allocator();
124
125 const body = r.body orelse {
126 json_util.sendStatus(r, "{\"detail\":\"failed to read body\"}", .bad_request);
127 return;
128 };
129
130 const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch {
131 json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request);
132 return;
133 };
134
135 // extract tags if provided
136 var tags_json: ?[]const u8 = null;
137 if (parsed.value.object.get("tags")) |tags_val| {
138 var output: std.Io.Writer.Allocating = .init(alloc);
139 var jw: json.Stringify = .{ .writer = &output.writer };
140 jw.write(tags_val) catch {
141 json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
142 return;
143 };
144 tags_json = output.toOwnedSlice() catch null;
145 }
146
147 const updated = db.updateFlow(flow_id, tags_json) catch false;
148 if (!updated) {
149 json_util.sendStatus(r, "{\"detail\":\"flow not found\"}", .not_found);
150 return;
151 }
152
153 json_util.sendStatus(r, "", .no_content);
154}
155
156fn deleteFlow(r: zap.Request, flow_id: []const u8) !void {
157 const deleted = db.deleteFlow(flow_id) catch false;
158 if (!deleted) {
159 json_util.sendStatus(r, "{\"detail\":\"flow not found\"}", .not_found);
160 return;
161 }
162
163 json_util.sendStatus(r, "", .no_content);
164}
165
166fn filter(r: zap.Request) !void {
167 var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
168 defer arena.deinit();
169 const alloc = arena.allocator();
170
171 const flows = db.listFlows(alloc, 50) catch {
172 json_util.sendStatus(r, "{\"detail\":\"database error\"}", .internal_server_error);
173 return;
174 };
175
176 var output: std.Io.Writer.Allocating = .init(alloc);
177 var jw: json.Stringify = .{ .writer = &output.writer };
178
179 jw.beginArray() catch {
180 json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
181 return;
182 };
183
184 for (flows) |flow| {
185 writeFlowObject(&jw, flow) catch continue;
186 }
187
188 jw.endArray() catch {};
189
190 json_util.send(r, output.toOwnedSlice() catch "[]");
191}
192
193fn writeFlow(alloc: std.mem.Allocator, flow: db.FlowRow) ![]const u8 {
194 var output: std.Io.Writer.Allocating = .init(alloc);
195 var jw: json.Stringify = .{ .writer = &output.writer };
196 try writeFlowObject(&jw, flow);
197 return output.toOwnedSlice();
198}
199
200fn writeFlowObject(jw: *json.Stringify, flow: db.FlowRow) !void {
201 try jw.beginObject();
202
203 try jw.objectField("id");
204 try jw.write(flow.id);
205
206 try jw.objectField("created");
207 try jw.write(flow.created);
208
209 try jw.objectField("updated");
210 try jw.write(flow.updated);
211
212 try jw.objectField("name");
213 try jw.write(flow.name);
214
215 try jw.objectField("tags");
216 try jw.beginWriteRaw();
217 try jw.writer.writeAll(flow.tags);
218 jw.endWriteRaw();
219
220 try jw.endObject();
221}