//! Prefect server in Zig — deployment table persistence.
1const std = @import("std");
2const Allocator = std.mem.Allocator;
3
4const backend = @import("backend.zig");
5const log = @import("../logging.zig");
6
/// Deployment readiness state, stored in the `status` column as text.
pub const Status = enum {
    not_ready,
    ready,

    /// Parse the database text form; any value other than "READY"
    /// (including unknown strings) maps to `.not_ready`.
    pub fn fromString(s: []const u8) Status {
        return if (std.mem.eql(u8, s, "READY")) .ready else .not_ready;
    }

    /// Exact text persisted to the database for this status.
    pub fn toString(self: Status) []const u8 {
        return switch (self) {
            .ready => "READY",
            .not_ready => "NOT_READY",
        };
    }
};
24
/// One fully-materialized row of the `deployment` table.
/// Every non-null slice is duplicated into the caller's allocator by
/// `rowFromResult`; the caller owns (and must free) all of them.
/// Optional fields are null when the corresponding column was SQL NULL.
pub const DeploymentRow = struct {
    id: []const u8,
    created: []const u8, // creation timestamp, stored as text
    updated: []const u8, // last-modified timestamp, stored as text
    name: []const u8,
    flow_id: []const u8, // owning flow's id
    version: ?[]const u8,
    description: ?[]const u8,
    paused: bool, // stored as a 0/1 integer column
    status: Status,
    last_polled: ?[]const u8, // set when workers poll (see markReadyByWorkQueues)
    parameters: []const u8, // serialized text; presumably JSON ("{}" default in InsertParams) — TODO confirm
    parameter_openapi_schema: ?[]const u8,
    enforce_parameter_schema: bool, // stored as a 0/1 integer column
    tags: []const u8, // serialized text; presumably JSON ("[]" default in InsertParams) — TODO confirm
    labels: []const u8,
    path: ?[]const u8,
    entrypoint: ?[]const u8,
    job_variables: []const u8,
    pull_steps: ?[]const u8,
    work_pool_name: ?[]const u8,
    work_queue_name: ?[]const u8,
    work_queue_id: ?[]const u8,
    storage_document_id: ?[]const u8,
    infrastructure_document_id: ?[]const u8,
    concurrency_limit: ?i64,
};
52
/// Zero-based column positions for result rows selected with `select_cols`.
/// NOTE(review): these indexes must stay in exact sync with the column order
/// in `select_cols`; change both together or `rowFromResult` reads the wrong
/// columns silently.
const Col = struct {
    const id: usize = 0;
    const created: usize = 1;
    const updated: usize = 2;
    const name: usize = 3;
    const flow_id: usize = 4;
    const version: usize = 5;
    const description: usize = 6;
    const paused: usize = 7;
    const status: usize = 8;
    const last_polled: usize = 9;
    const parameters: usize = 10;
    const parameter_openapi_schema: usize = 11;
    const enforce_parameter_schema: usize = 12;
    const tags: usize = 13;
    const labels: usize = 14;
    const path: usize = 15;
    const entrypoint: usize = 16;
    const job_variables: usize = 17;
    const pull_steps: usize = 18;
    const work_pool_name: usize = 19;
    const work_queue_name: usize = 20;
    const work_queue_id: usize = 21;
    const storage_document_id: usize = 22;
    const infrastructure_document_id: usize = 23;
    const concurrency_limit: usize = 24;
};
80
/// Column list shared by every SELECT in this module; its order defines the
/// indexes in `Col` — keep the two in sync.
const select_cols =
    \\id, created, updated, name, flow_id, version, description, paused, status, last_polled,
    \\parameters, parameter_openapi_schema, enforce_parameter_schema, tags, labels,
    \\path, entrypoint, job_variables, pull_steps, work_pool_name, work_queue_name,
    \\work_queue_id, storage_document_id, infrastructure_document_id, concurrency_limit
;
87
/// Duplicate an optional column value into caller-owned memory.
fn dupeOpt(alloc: Allocator, value: ?[]const u8) !?[]const u8 {
    return if (value) |v| try alloc.dupe(u8, v) else null;
}

/// Build a caller-owned `DeploymentRow` from a positioned result row.
/// `r` must expose `text`, `textOrNull`, `int` and `bigint` accessors taking
/// a column index (see `Col`). Every slice in the result is duplicated with
/// `alloc`; the caller owns all of them.
/// NOTE(review): if an allocation fails part-way through, slices duped so
/// far are leaked — acceptable only if callers pass an arena; confirm.
fn rowFromResult(alloc: Allocator, r: anytype) !DeploymentRow {
    return DeploymentRow{
        .id = try alloc.dupe(u8, r.text(Col.id)),
        .created = try alloc.dupe(u8, r.text(Col.created)),
        .updated = try alloc.dupe(u8, r.text(Col.updated)),
        .name = try alloc.dupe(u8, r.text(Col.name)),
        .flow_id = try alloc.dupe(u8, r.text(Col.flow_id)),
        .version = try dupeOpt(alloc, r.textOrNull(Col.version)),
        .description = try dupeOpt(alloc, r.textOrNull(Col.description)),
        .paused = r.int(Col.paused) != 0, // 0/1 integer column
        .status = Status.fromString(r.text(Col.status)),
        .last_polled = try dupeOpt(alloc, r.textOrNull(Col.last_polled)),
        .parameters = try alloc.dupe(u8, r.text(Col.parameters)),
        .parameter_openapi_schema = try dupeOpt(alloc, r.textOrNull(Col.parameter_openapi_schema)),
        .enforce_parameter_schema = r.int(Col.enforce_parameter_schema) != 0, // 0/1 integer column
        .tags = try alloc.dupe(u8, r.text(Col.tags)),
        .labels = try alloc.dupe(u8, r.text(Col.labels)),
        .path = try dupeOpt(alloc, r.textOrNull(Col.path)),
        .entrypoint = try dupeOpt(alloc, r.textOrNull(Col.entrypoint)),
        .job_variables = try alloc.dupe(u8, r.text(Col.job_variables)),
        .pull_steps = try dupeOpt(alloc, r.textOrNull(Col.pull_steps)),
        .work_pool_name = try dupeOpt(alloc, r.textOrNull(Col.work_pool_name)),
        .work_queue_name = try dupeOpt(alloc, r.textOrNull(Col.work_queue_name)),
        .work_queue_id = try dupeOpt(alloc, r.textOrNull(Col.work_queue_id)),
        .storage_document_id = try dupeOpt(alloc, r.textOrNull(Col.storage_document_id)),
        .infrastructure_document_id = try dupeOpt(alloc, r.textOrNull(Col.infrastructure_document_id)),
        // Probe for SQL NULL via textOrNull before reading the integer,
        // mirroring the original's null check.
        .concurrency_limit = if (r.textOrNull(Col.concurrency_limit) != null)
            r.bigint(Col.concurrency_limit)
        else
            null,
    };
}
118
/// Fetch a single deployment by primary key.
/// Returns null when no row matches. Database errors are logged and
/// propagated (the return type already carries an error union) instead of
/// being masked as "not found", so callers can distinguish a missing row
/// from a failing database.
pub fn getById(alloc: Allocator, id: []const u8) !?DeploymentRow {
    var r = backend.db.row(
        "SELECT " ++ select_cols ++ " FROM deployment WHERE id = ?",
        .{id},
    ) catch |err| {
        log.err("database", "get deployment by id error: {}", .{err});
        return err;
    };

    if (r) |*row| {
        defer row.deinit();
        return try rowFromResult(alloc, row);
    }
    return null;
}
131
/// Fetch a deployment by its (flow_id, name) pair.
/// Returns null when no row matches. Database errors are logged and
/// propagated rather than silently converted to "not found", matching the
/// error handling of the other queries in this module.
pub fn getByFlowAndName(alloc: Allocator, flow_id: []const u8, name: []const u8) !?DeploymentRow {
    var r = backend.db.row(
        "SELECT " ++ select_cols ++ " FROM deployment WHERE flow_id = ? AND name = ?",
        .{ flow_id, name },
    ) catch |err| {
        log.err("database", "get deployment by flow and name error: {}", .{err});
        return err;
    };

    if (r) |*row| {
        defer row.deinit();
        return try rowFromResult(alloc, row);
    }
    return null;
}
144
/// Optional / defaulted columns for `insert`. The required identity fields
/// (id, name, flow_id, created) are passed to `insert` directly.
pub const InsertParams = struct {
    version: ?[]const u8 = null,
    description: ?[]const u8 = null,
    paused: bool = false,
    status: Status = .not_ready,
    parameters: []const u8 = "{}", // serialized text; presumably JSON — TODO confirm
    parameter_openapi_schema: ?[]const u8 = null,
    enforce_parameter_schema: bool = true,
    tags: []const u8 = "[]", // serialized text; presumably JSON — TODO confirm
    labels: []const u8 = "{}",
    path: ?[]const u8 = null,
    entrypoint: ?[]const u8 = null,
    job_variables: []const u8 = "{}",
    pull_steps: ?[]const u8 = null,
    work_pool_name: ?[]const u8 = null,
    work_queue_name: ?[]const u8 = null,
    work_queue_id: ?[]const u8 = null,
    storage_document_id: ?[]const u8 = null,
    infrastructure_document_id: ?[]const u8 = null,
    concurrency_limit: ?i64 = null,
};
166
/// Insert a new deployment row.
/// `created` is written to both the `created` and `updated` columns.
/// Errors are logged and propagated to the caller.
pub fn insert(
    id: []const u8,
    name: []const u8,
    flow_id: []const u8,
    created: []const u8,
    params: InsertParams,
) !void {
    // Booleans are persisted as 0/1 integers.
    const paused_flag: i32 = if (params.paused) 1 else 0;
    const enforce_flag: i32 = if (params.enforce_parameter_schema) 1 else 0;

    backend.db.exec(
        \\INSERT INTO deployment (id, created, updated, name, flow_id, version, description,
        \\ paused, status, parameters, parameter_openapi_schema, enforce_parameter_schema,
        \\ tags, labels, path, entrypoint, job_variables, pull_steps,
        \\ work_pool_name, work_queue_name, work_queue_id,
        \\ storage_document_id, infrastructure_document_id, concurrency_limit)
        \\VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    , .{
        id,
        created,
        created, // updated starts equal to created
        name,
        flow_id,
        params.version,
        params.description,
        paused_flag,
        params.status.toString(),
        params.parameters,
        params.parameter_openapi_schema,
        enforce_flag,
        params.tags,
        params.labels,
        params.path,
        params.entrypoint,
        params.job_variables,
        params.pull_steps,
        params.work_pool_name,
        params.work_queue_name,
        params.work_queue_id,
        params.storage_document_id,
        params.infrastructure_document_id,
        params.concurrency_limit,
    }) catch |err| {
        log.err("database", "insert deployment error: {}", .{err});
        return err;
    };
}
211
/// Partial-update payload for `updateById`. A null field means "leave that
/// column unchanged" (implemented with SQL COALESCE in the UPDATE), which
/// also means this struct cannot set a nullable column back to NULL.
pub const UpdateParams = struct {
    version: ?[]const u8 = null,
    description: ?[]const u8 = null,
    paused: ?bool = null,
    parameters: ?[]const u8 = null,
    parameter_openapi_schema: ?[]const u8 = null,
    enforce_parameter_schema: ?bool = null,
    tags: ?[]const u8 = null,
    labels: ?[]const u8 = null,
    path: ?[]const u8 = null,
    entrypoint: ?[]const u8 = null,
    job_variables: ?[]const u8 = null,
    pull_steps: ?[]const u8 = null,
    work_pool_name: ?[]const u8 = null,
    work_queue_name: ?[]const u8 = null,
    work_queue_id: ?[]const u8 = null,
    concurrency_limit: ?i64 = null,
};
230
/// Apply a partial update to one deployment; null params keep the current
/// column value (COALESCE). Also stamps the `updated` column.
/// Returns true when a row with the given id existed.
pub fn updateById(id: []const u8, updated: []const u8, params: UpdateParams) !bool {
    // Optional booleans become optional 0/1 integers for the DB binding.
    const paused_int: ?i32 = if (params.paused) |p| (if (p) 1 else 0) else null;
    const enforce_int: ?i32 = if (params.enforce_parameter_schema) |e| (if (e) 1 else 0) else null;

    const row_count = backend.db.execWithRowCount(
        \\UPDATE deployment SET
        \\ version = COALESCE(?, version),
        \\ description = COALESCE(?, description),
        \\ paused = COALESCE(?, paused),
        \\ parameters = COALESCE(?, parameters),
        \\ parameter_openapi_schema = COALESCE(?, parameter_openapi_schema),
        \\ enforce_parameter_schema = COALESCE(?, enforce_parameter_schema),
        \\ tags = COALESCE(?, tags),
        \\ labels = COALESCE(?, labels),
        \\ path = COALESCE(?, path),
        \\ entrypoint = COALESCE(?, entrypoint),
        \\ job_variables = COALESCE(?, job_variables),
        \\ pull_steps = COALESCE(?, pull_steps),
        \\ work_pool_name = COALESCE(?, work_pool_name),
        \\ work_queue_name = COALESCE(?, work_queue_name),
        \\ work_queue_id = COALESCE(?, work_queue_id),
        \\ concurrency_limit = COALESCE(?, concurrency_limit),
        \\ updated = ?
        \\WHERE id = ?
    , .{
        params.version,
        params.description,
        paused_int,
        params.parameters,
        params.parameter_openapi_schema,
        enforce_int,
        params.tags,
        params.labels,
        params.path,
        params.entrypoint,
        params.job_variables,
        params.pull_steps,
        params.work_pool_name,
        params.work_queue_name,
        params.work_queue_id,
        params.concurrency_limit,
        updated,
        id,
    }) catch |err| {
        log.err("database", "update deployment error: {}", .{err});
        return err;
    };
    return row_count > 0;
}
277
/// Overwrite a deployment's status and bump its `updated` timestamp.
/// Returns true when a row with the given id existed.
pub fn updateStatus(id: []const u8, status: Status, updated: []const u8) !bool {
    const sql = "UPDATE deployment SET status = ?, updated = ? WHERE id = ?";
    const row_count = backend.db.execWithRowCount(sql, .{ status.toString(), updated, id }) catch |err| {
        log.err("database", "update deployment status error: {}", .{err});
        return err;
    };
    return row_count > 0;
}
288
/// Record a worker poll: write `last_polled` and set `updated` to the same
/// timestamp. Returns true when a row with the given id existed.
pub fn updateLastPolled(id: []const u8, last_polled: []const u8) !bool {
    const sql = "UPDATE deployment SET last_polled = ?, updated = ? WHERE id = ?";
    const row_count = backend.db.execWithRowCount(sql, .{ last_polled, last_polled, id }) catch |err| {
        log.err("database", "update deployment last_polled error: {}", .{err});
        return err;
    };
    return row_count > 0;
}
299
/// Pause or resume a deployment (paused stored as a 0/1 integer) and bump
/// its `updated` timestamp. Returns true when a row with the given id existed.
pub fn updatePaused(id: []const u8, paused: bool, updated: []const u8) !bool {
    const paused_int: i64 = if (paused) 1 else 0;
    const row_count = backend.db.execWithRowCount(
        "UPDATE deployment SET paused = ?, updated = ? WHERE id = ?",
        .{ paused_int, updated, id },
    ) catch |err| {
        log.err("database", "update deployment paused error: {}", .{err});
        return err;
    };
    return row_count > 0;
}
310
/// Delete a deployment by primary key.
/// Returns true when a row was actually removed.
pub fn deleteById(id: []const u8) !bool {
    const sql = "DELETE FROM deployment WHERE id = ?";
    const row_count = backend.db.execWithRowCount(sql, .{id}) catch |err| {
        log.err("database", "delete deployment error: {}", .{err});
        return err;
    };
    return row_count > 0;
}
321
/// Page through all deployments, newest first.
/// Caller owns the returned slice and every string inside each row.
pub fn list(alloc: Allocator, limit: usize, offset: usize) ![]DeploymentRow {
    var out: std.ArrayListUnmanaged(DeploymentRow) = .{};
    errdefer out.deinit(alloc);

    var rows = backend.db.query(
        "SELECT " ++ select_cols ++ " FROM deployment ORDER BY created DESC LIMIT ? OFFSET ?",
        .{ @as(i64, @intCast(limit)), @as(i64, @intCast(offset)) },
    ) catch |err| {
        log.err("database", "list deployments error: {}", .{err});
        return err;
    };
    defer rows.deinit();

    while (rows.next()) |r| {
        const row = try rowFromResult(alloc, &r);
        try out.append(alloc, row);
    }

    return out.toOwnedSlice(alloc);
}
341
/// Page through the deployments belonging to one flow, newest first.
/// Caller owns the returned slice and every string inside each row.
pub fn listByFlowId(alloc: Allocator, flow_id: []const u8, limit: usize, offset: usize) ![]DeploymentRow {
    var out: std.ArrayListUnmanaged(DeploymentRow) = .{};
    errdefer out.deinit(alloc);

    var rows = backend.db.query(
        "SELECT " ++ select_cols ++ " FROM deployment WHERE flow_id = ? ORDER BY created DESC LIMIT ? OFFSET ?",
        .{ flow_id, @as(i64, @intCast(limit)), @as(i64, @intCast(offset)) },
    ) catch |err| {
        log.err("database", "list deployments by flow error: {}", .{err});
        return err;
    };
    defer rows.deinit();

    while (rows.next()) |r| {
        const row = try rowFromResult(alloc, &r);
        try out.append(alloc, row);
    }

    return out.toOwnedSlice(alloc);
}
361
/// Total number of deployment rows.
/// Database errors are logged and propagated (the `!usize` return already
/// allows it) instead of being silently reported as an empty table, so
/// callers can tell "no deployments" apart from "query failed".
pub fn count() !usize {
    var r = backend.db.row("SELECT COUNT(*) FROM deployment", .{}) catch |err| {
        log.err("database", "count deployments error: {}", .{err});
        return err;
    };
    if (r) |*row| {
        defer row.deinit();
        return @intCast(row.bigint(0));
    }
    // COUNT(*) always yields one row; reaching here means the backend
    // returned nothing — treat as zero rather than inventing an error.
    return 0;
}
370
/// Flip NOT_READY deployments to READY for each listed work queue and stamp
/// their `last_polled`/`updated` timestamps. Called when workers poll for
/// scheduled runs. Per-queue failures are logged and skipped so one bad
/// queue does not block the others (deliberate best-effort).
pub fn markReadyByWorkQueues(work_queue_ids: []const []const u8, updated: []const u8) !void {
    for (work_queue_ids) |wq_id| {
        backend.db.exec(
            "UPDATE deployment SET status = 'READY', last_polled = ?, updated = ? WHERE work_queue_id = ? AND status = 'NOT_READY'",
            .{ updated, updated, wq_id },
        ) catch |err| {
            log.err("database", "mark deployments ready error: {}", .{err});
            continue;
        };
    }
}