prefect server in zig
at main 383 lines 14 kB view raw
const std = @import("std");
const Allocator = std.mem.Allocator;

const backend = @import("backend.zig");
const log = @import("../logging.zig");

/// Readiness state of a deployment. Written to and read from the `status`
/// column as the strings produced by `toString`.
pub const Status = enum {
    not_ready,
    ready,

    /// Parses the stored representation. Only the exact string "READY" maps
    /// to `.ready`; every other input maps to `.not_ready`.
    pub fn fromString(s: []const u8) Status {
        if (std.mem.eql(u8, s, "READY")) return .ready;
        return .not_ready;
    }

    /// Returns the string stored in the `status` column for this value.
    pub fn toString(self: Status) []const u8 {
        return switch (self) {
            .not_ready => "NOT_READY",
            .ready => "READY",
        };
    }
};

/// One row of the `deployment` table.
///
/// Rows returned by the query functions in this file own all of their string
/// fields (each one is duplicated with the allocator passed to the query).
/// Release them with `deinit`, or allocate from an arena and drop the arena.
pub const DeploymentRow = struct {
    id: []const u8,
    created: []const u8,
    updated: []const u8,
    name: []const u8,
    flow_id: []const u8,
    version: ?[]const u8,
    description: ?[]const u8,
    paused: bool,
    status: Status,
    last_polled: ?[]const u8,
    parameters: []const u8,
    parameter_openapi_schema: ?[]const u8,
    enforce_parameter_schema: bool,
    tags: []const u8,
    labels: []const u8,
    path: ?[]const u8,
    entrypoint: ?[]const u8,
    job_variables: []const u8,
    pull_steps: ?[]const u8,
    work_pool_name: ?[]const u8,
    work_queue_name: ?[]const u8,
    work_queue_id: ?[]const u8,
    storage_document_id: ?[]const u8,
    infrastructure_document_id: ?[]const u8,
    concurrency_limit: ?i64,

    /// Frees every string field of this row. `alloc` must be the allocator
    /// the strings were allocated with. Zero-length slices are accepted
    /// (`Allocator.free` ignores them), so a partially populated row whose
    /// remaining fields are `""` or `null` can be released safely.
    pub fn deinit(self: DeploymentRow, alloc: Allocator) void {
        alloc.free(self.id);
        alloc.free(self.created);
        alloc.free(self.updated);
        alloc.free(self.name);
        alloc.free(self.flow_id);
        if (self.version) |text| alloc.free(text);
        if (self.description) |text| alloc.free(text);
        if (self.last_polled) |text| alloc.free(text);
        alloc.free(self.parameters);
        if (self.parameter_openapi_schema) |text| alloc.free(text);
        alloc.free(self.tags);
        alloc.free(self.labels);
        if (self.path) |text| alloc.free(text);
        if (self.entrypoint) |text| alloc.free(text);
        alloc.free(self.job_variables);
        if (self.pull_steps) |text| alloc.free(text);
        if (self.work_pool_name) |text| alloc.free(text);
        if (self.work_queue_name) |text| alloc.free(text);
        if (self.work_queue_id) |text| alloc.free(text);
        if (self.storage_document_id) |text| alloc.free(text);
        if (self.infrastructure_document_id) |text| alloc.free(text);
    }
};

/// Column indices of a result row. The order must match `select_cols`.
const Col = struct {
    const id: usize = 0;
    const created: usize = 1;
    const updated: usize = 2;
    const name: usize = 3;
    const flow_id: usize = 4;
    const version: usize = 5;
    const description: usize = 6;
    const paused: usize = 7;
    const status: usize = 8;
    const last_polled: usize = 9;
    const parameters: usize = 10;
    const parameter_openapi_schema: usize = 11;
    const enforce_parameter_schema: usize = 12;
    const tags: usize = 13;
    const labels: usize = 14;
    const path: usize = 15;
    const entrypoint: usize = 16;
    const job_variables: usize = 17;
    const pull_steps: usize = 18;
    const work_pool_name: usize = 19;
    const work_queue_name: usize = 20;
    const work_queue_id: usize = 21;
    const storage_document_id: usize = 22;
    const infrastructure_document_id: usize = 23;
    const concurrency_limit: usize = 24;
};

/// Column list shared by every SELECT in this file. The order must match `Col`.
const select_cols =
    \\id, created, updated, name, flow_id, version, description, paused, status, last_polled,
    \\parameters, parameter_openapi_schema, enforce_parameter_schema, tags, labels,
    \\path, entrypoint, job_variables, pull_steps, work_pool_name, work_queue_name,
    \\work_queue_id, storage_document_id, infrastructure_document_id, concurrency_limit
;

/// Duplicates `maybe_text` with `alloc` when it is non-null; passes `null` through.
fn dupeOptional(alloc: Allocator, maybe_text: ?[]const u8) !?[]const u8 {
    return if (maybe_text) |text| try alloc.dupe(u8, text) else null;
}

/// Converts a `usize` to the `i64` bound as a SQL parameter, saturating at
/// `maxInt(i64)` instead of invoking illegal behavior on out-of-range values.
fn sqlInt(value: usize) i64 {
    return std.math.cast(i64, value) orelse std.math.maxInt(i64);
}

/// Copies the current result row `r` into an owned `DeploymentRow`.
///
/// `r` must provide `text`, `textOrNull`, `int` and `bigint` accessors taking
/// a column index. Every string is duplicated with `alloc`, so the returned
/// row does not reference `r` after this call. If any allocation fails, the
/// strings duplicated so far are freed before the error is returned.
fn rowFromResult(alloc: Allocator, r: anytype) !DeploymentRow {
    // Start with the non-allocating columns and empty placeholders so that the
    // errdefer below can release a partially populated row.
    var row: DeploymentRow = .{
        .id = "",
        .created = "",
        .updated = "",
        .name = "",
        .flow_id = "",
        .version = null,
        .description = null,
        .paused = r.int(Col.paused) != 0,
        .status = Status.fromString(r.text(Col.status)),
        .last_polled = null,
        .parameters = "",
        .parameter_openapi_schema = null,
        .enforce_parameter_schema = r.int(Col.enforce_parameter_schema) != 0,
        .tags = "",
        .labels = "",
        .path = null,
        .entrypoint = null,
        .job_variables = "",
        .pull_steps = null,
        .work_pool_name = null,
        .work_queue_name = null,
        .work_queue_id = null,
        .storage_document_id = null,
        .infrastructure_document_id = null,
        // NULL is detected through the text accessor; the value itself is read as an integer.
        .concurrency_limit = if (r.textOrNull(Col.concurrency_limit) != null)
            r.bigint(Col.concurrency_limit)
        else
            null,
    };
    // Evaluated on error exit with the row's state at that point.
    errdefer row.deinit(alloc);

    row.id = try alloc.dupe(u8, r.text(Col.id));
    row.created = try alloc.dupe(u8, r.text(Col.created));
    row.updated = try alloc.dupe(u8, r.text(Col.updated));
    row.name = try alloc.dupe(u8, r.text(Col.name));
    row.flow_id = try alloc.dupe(u8, r.text(Col.flow_id));
    row.version = try dupeOptional(alloc, r.textOrNull(Col.version));
    row.description = try dupeOptional(alloc, r.textOrNull(Col.description));
    row.last_polled = try dupeOptional(alloc, r.textOrNull(Col.last_polled));
    row.parameters = try alloc.dupe(u8, r.text(Col.parameters));
    row.parameter_openapi_schema = try dupeOptional(alloc, r.textOrNull(Col.parameter_openapi_schema));
    row.tags = try alloc.dupe(u8, r.text(Col.tags));
    row.labels = try alloc.dupe(u8, r.text(Col.labels));
    row.path = try dupeOptional(alloc, r.textOrNull(Col.path));
    row.entrypoint = try dupeOptional(alloc, r.textOrNull(Col.entrypoint));
    row.job_variables = try alloc.dupe(u8, r.text(Col.job_variables));
    row.pull_steps = try dupeOptional(alloc, r.textOrNull(Col.pull_steps));
    row.work_pool_name = try dupeOptional(alloc, r.textOrNull(Col.work_pool_name));
    row.work_queue_name = try dupeOptional(alloc, r.textOrNull(Col.work_queue_name));
    row.work_queue_id = try dupeOptional(alloc, r.textOrNull(Col.work_queue_id));
    row.storage_document_id = try dupeOptional(alloc, r.textOrNull(Col.storage_document_id));
    row.infrastructure_document_id = try dupeOptional(alloc, r.textOrNull(Col.infrastructure_document_id));

    return row;
}

/// Drains `rows` (a pointer to a query result exposing `next()`) into an owned
/// slice of `DeploymentRow`. On failure every row collected so far is freed.
fn collectRows(alloc: Allocator, rows: anytype) ![]DeploymentRow {
    var results = std.ArrayListUnmanaged(DeploymentRow){};
    errdefer {
        for (results.items) |collected| collected.deinit(alloc);
        results.deinit(alloc);
    }

    while (rows.next()) |r| {
        const row = try rowFromResult(alloc, &r);
        // Covers the window in which the row is not yet owned by `results`.
        errdefer row.deinit(alloc);
        try results.append(alloc, row);
    }

    return results.toOwnedSlice(alloc);
}

/// Looks up a deployment by primary key.
///
/// Returns `null` when no row matches. A failing query is logged and also
/// reported as `null`; only errors from copying the row are returned.
/// The caller owns the strings of the returned row (see `DeploymentRow.deinit`).
pub fn getById(alloc: Allocator, id: []const u8) !?DeploymentRow {
    var r = backend.db.row(
        "SELECT " ++ select_cols ++ " FROM deployment WHERE id = ?",
        .{id},
    ) catch |err| {
        log.err("database", "get deployment error: {}", .{err});
        return null;
    };

    if (r) |*row| {
        defer row.deinit();
        return try rowFromResult(alloc, row);
    }
    return null;
}

/// Looks up a deployment by its `flow_id` and `name`.
///
/// Returns `null` when no row matches. A failing query is logged and also
/// reported as `null`; only errors from copying the row are returned.
/// The caller owns the strings of the returned row (see `DeploymentRow.deinit`).
pub fn getByFlowAndName(alloc: Allocator, flow_id: []const u8, name: []const u8) !?DeploymentRow {
    var r = backend.db.row(
        "SELECT " ++ select_cols ++ " FROM deployment WHERE flow_id = ? AND name = ?",
        .{ flow_id, name },
    ) catch |err| {
        log.err("database", "get deployment by flow and name error: {}", .{err});
        return null;
    };

    if (r) |*row| {
        defer row.deinit();
        return try rowFromResult(alloc, row);
    }
    return null;
}

/// Optional columns for `insert`; the defaults below are used for anything
/// the caller leaves out.
pub const InsertParams = struct {
    version: ?[]const u8 = null,
    description: ?[]const u8 = null,
    paused: bool = false,
    status: Status = .not_ready,
    parameters: []const u8 = "{}",
    parameter_openapi_schema: ?[]const u8 = null,
    enforce_parameter_schema: bool = true,
    tags: []const u8 = "[]",
    labels: []const u8 = "{}",
    path: ?[]const u8 = null,
    entrypoint: ?[]const u8 = null,
    job_variables: []const u8 = "{}",
    pull_steps: ?[]const u8 = null,
    work_pool_name: ?[]const u8 = null,
    work_queue_name: ?[]const u8 = null,
    work_queue_id: ?[]const u8 = null,
    storage_document_id: ?[]const u8 = null,
    infrastructure_document_id: ?[]const u8 = null,
    concurrency_limit: ?i64 = null,
};

/// Inserts a new deployment row. `created` is written to both the `created`
/// and `updated` columns; `last_polled` is not set. Database errors are
/// logged and returned.
pub fn insert(
    id: []const u8,
    name: []const u8,
    flow_id: []const u8,
    created: []const u8,
    params: InsertParams,
) !void {
    backend.db.exec(
        \\INSERT INTO deployment (id, created, updated, name, flow_id, version, description,
        \\    paused, status, parameters, parameter_openapi_schema, enforce_parameter_schema,
        \\    tags, labels, path, entrypoint, job_variables, pull_steps,
        \\    work_pool_name, work_queue_name, work_queue_id,
        \\    storage_document_id, infrastructure_document_id, concurrency_limit)
        \\VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    , .{
        id,
        created,
        created, // `updated` starts out equal to `created`
        name,
        flow_id,
        params.version,
        params.description,
        @as(i32, if (params.paused) 1 else 0),
        params.status.toString(),
        params.parameters,
        params.parameter_openapi_schema,
        @as(i32, if (params.enforce_parameter_schema) 1 else 0),
        params.tags,
        params.labels,
        params.path,
        params.entrypoint,
        params.job_variables,
        params.pull_steps,
        params.work_pool_name,
        params.work_queue_name,
        params.work_queue_id,
        params.storage_document_id,
        params.infrastructure_document_id,
        params.concurrency_limit,
    }) catch |err| {
        log.err("database", "insert deployment error: {}", .{err});
        return err;
    };
}

/// Fields that `updateById` may change. A `null` field means "leave the
/// column as it is"; because the statement uses COALESCE, a column cannot be
/// reset to NULL through this struct.
pub const UpdateParams = struct {
    version: ?[]const u8 = null,
    description: ?[]const u8 = null,
    paused: ?bool = null,
    parameters: ?[]const u8 = null,
    parameter_openapi_schema: ?[]const u8 = null,
    enforce_parameter_schema: ?bool = null,
    tags: ?[]const u8 = null,
    labels: ?[]const u8 = null,
    path: ?[]const u8 = null,
    entrypoint: ?[]const u8 = null,
    job_variables: ?[]const u8 = null,
    pull_steps: ?[]const u8 = null,
    work_pool_name: ?[]const u8 = null,
    work_queue_name: ?[]const u8 = null,
    work_queue_id: ?[]const u8 = null,
    concurrency_limit: ?i64 = null,
};

/// Applies the non-null fields of `params` to the deployment with the given
/// `id` and always sets `updated`. Returns true when at least one row was
/// affected. Database errors are logged and returned.
pub fn updateById(id: []const u8, updated: []const u8, params: UpdateParams) !bool {
    const affected = backend.db.execWithRowCount(
        \\UPDATE deployment SET
        \\    version = COALESCE(?, version),
        \\    description = COALESCE(?, description),
        \\    paused = COALESCE(?, paused),
        \\    parameters = COALESCE(?, parameters),
        \\    parameter_openapi_schema = COALESCE(?, parameter_openapi_schema),
        \\    enforce_parameter_schema = COALESCE(?, enforce_parameter_schema),
        \\    tags = COALESCE(?, tags),
        \\    labels = COALESCE(?, labels),
        \\    path = COALESCE(?, path),
        \\    entrypoint = COALESCE(?, entrypoint),
        \\    job_variables = COALESCE(?, job_variables),
        \\    pull_steps = COALESCE(?, pull_steps),
        \\    work_pool_name = COALESCE(?, work_pool_name),
        \\    work_queue_name = COALESCE(?, work_queue_name),
        \\    work_queue_id = COALESCE(?, work_queue_id),
        \\    concurrency_limit = COALESCE(?, concurrency_limit),
        \\    updated = ?
        \\WHERE id = ?
    , .{
        params.version,
        params.description,
        // Booleans are bound as nullable integers so that null still means "keep".
        if (params.paused) |p| @as(?i32, if (p) 1 else 0) else null,
        params.parameters,
        params.parameter_openapi_schema,
        if (params.enforce_parameter_schema) |e| @as(?i32, if (e) 1 else 0) else null,
        params.tags,
        params.labels,
        params.path,
        params.entrypoint,
        params.job_variables,
        params.pull_steps,
        params.work_pool_name,
        params.work_queue_name,
        params.work_queue_id,
        params.concurrency_limit,
        updated,
        id,
    }) catch |err| {
        log.err("database", "update deployment error: {}", .{err});
        return err;
    };
    return affected > 0;
}

/// Sets `status` and `updated` for one deployment. Returns true when at least
/// one row was affected. Database errors are logged and returned.
pub fn updateStatus(id: []const u8, status: Status, updated: []const u8) !bool {
    const affected = backend.db.execWithRowCount(
        "UPDATE deployment SET status = ?, updated = ? WHERE id = ?",
        .{ status.toString(), updated, id },
    ) catch |err| {
        log.err("database", "update deployment status error: {}", .{err});
        return err;
    };
    return affected > 0;
}

/// Sets both `last_polled` and `updated` to `last_polled` for one deployment.
/// Returns true when at least one row was affected. Database errors are
/// logged and returned.
pub fn updateLastPolled(id: []const u8, last_polled: []const u8) !bool {
    const affected = backend.db.execWithRowCount(
        "UPDATE deployment SET last_polled = ?, updated = ? WHERE id = ?",
        .{ last_polled, last_polled, id },
    ) catch |err| {
        log.err("database", "update deployment last_polled error: {}", .{err});
        return err;
    };
    return affected > 0;
}

/// Sets `paused` and `updated` for one deployment. Returns true when at least
/// one row was affected. Database errors are logged and returned.
pub fn updatePaused(id: []const u8, paused: bool, updated: []const u8) !bool {
    const affected = backend.db.execWithRowCount(
        "UPDATE deployment SET paused = ?, updated = ? WHERE id = ?",
        .{ @as(i64, if (paused) 1 else 0), updated, id },
    ) catch |err| {
        log.err("database", "update deployment paused error: {}", .{err});
        return err;
    };
    return affected > 0;
}

/// Deletes one deployment. Returns true when at least one row was removed.
/// Database errors are logged and returned.
pub fn deleteById(id: []const u8) !bool {
    const affected = backend.db.execWithRowCount(
        "DELETE FROM deployment WHERE id = ?",
        .{id},
    ) catch |err| {
        log.err("database", "delete deployment error: {}", .{err});
        return err;
    };
    return affected > 0;
}

/// Returns up to `limit` deployments, skipping `offset`, ordered by `created`
/// descending. The caller owns the returned slice and the strings of every
/// row in it. Database errors are logged and returned.
pub fn list(alloc: Allocator, limit: usize, offset: usize) ![]DeploymentRow {
    var rows = backend.db.query(
        "SELECT " ++ select_cols ++ " FROM deployment ORDER BY created DESC LIMIT ? OFFSET ?",
        .{ sqlInt(limit), sqlInt(offset) },
    ) catch |err| {
        log.err("database", "list deployments error: {}", .{err});
        return err;
    };
    defer rows.deinit();

    return collectRows(alloc, &rows);
}

/// Like `list`, restricted to deployments whose `flow_id` equals the argument.
/// The caller owns the returned slice and the strings of every row in it.
pub fn listByFlowId(alloc: Allocator, flow_id: []const u8, limit: usize, offset: usize) ![]DeploymentRow {
    var rows = backend.db.query(
        "SELECT " ++ select_cols ++ " FROM deployment WHERE flow_id = ? ORDER BY created DESC LIMIT ? OFFSET ?",
        .{ flow_id, sqlInt(limit), sqlInt(offset) },
    ) catch |err| {
        log.err("database", "list deployments by flow error: {}", .{err});
        return err;
    };
    defer rows.deinit();

    return collectRows(alloc, &rows);
}

/// Returns the number of rows in the `deployment` table. A failing query is
/// logged and reported as 0, as is a query that yields no row.
pub fn count() !usize {
    var r = backend.db.row("SELECT COUNT(*) FROM deployment", .{}) catch |err| {
        log.err("database", "count deployments error: {}", .{err});
        return 0;
    };
    if (r) |*row| {
        defer row.deinit();
        return @intCast(row.bigint(0));
    }
    return 0;
}

/// Mark deployments as READY for the given work queue IDs.
/// Called when workers poll for scheduled runs.
///
/// Only rows whose status is currently 'NOT_READY' are touched; for those,
/// `last_polled` and `updated` are both set to `updated`. The operation is
/// best-effort: a failure for one queue is logged and the remaining queues
/// are still processed, so no database error is returned to the caller.
pub fn markReadyByWorkQueues(work_queue_ids: []const []const u8, updated: []const u8) !void {
    for (work_queue_ids) |queue_id| {
        backend.db.exec(
            "UPDATE deployment SET status = 'READY', last_polled = ?, updated = ? WHERE work_queue_id = ? AND status = 'NOT_READY'",
            .{ updated, updated, queue_id },
        ) catch |err| {
            log.err("database", "mark deployments ready error: {}", .{err});
            continue;
        };
    }
}