const std = @import("std"); const zap = @import("zap"); const json = std.json; const db = @import("../db/sqlite.zig"); const uuid_util = @import("../utilities/uuid.zig"); const time_util = @import("../utilities/time.zig"); const json_util = @import("../utilities/json.zig"); const pool_helpers = @import("work_pools.zig"); pub fn heartbeat(r: zap.Request, target: []const u8) !void { var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); defer arena.deinit(); const alloc = arena.allocator(); const pool_name = pool_helpers.extractPoolName(target) orelse { json_util.sendStatus(r, "{\"detail\":\"work pool name required\"}", .bad_request); return; }; const pool = db.work_pools.getByName(alloc, pool_name) catch null orelse { json_util.sendStatus(r, "{\"detail\":\"Work pool not found.\"}", .not_found); return; }; const body = r.body orelse { json_util.sendStatus(r, "{\"detail\":\"failed to read body\"}", .bad_request); return; }; const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch { json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request); return; }; const obj = parsed.value.object; const worker_name = switch (obj.get("name") orelse { json_util.sendStatus(r, "{\"detail\":\"name required\"}", .bad_request); return; }) { .string => |s| s, else => { json_util.sendStatus(r, "{\"detail\":\"name must be string\"}", .bad_request); return; }, }; const heartbeat_interval = pool_helpers.getOptionalInt(obj.get("heartbeat_interval_seconds")); var id_buf: [36]u8 = undefined; const worker_id = uuid_util.generate(&id_buf); var ts_buf: [32]u8 = undefined; const now = time_util.timestamp(&ts_buf); db.workers.upsertHeartbeat(worker_id, worker_name, pool.id, heartbeat_interval, now) catch { json_util.sendStatus(r, "{\"detail\":\"heartbeat failed\"}", .internal_server_error); return; }; // Update pool status to READY if it was NOT_READY if (pool.status == .not_ready) { _ = db.work_pools.updateStatus(pool_name, .ready, now) catch {}; } r.setStatus(.no_content); r.sendBody("") catch {}; } pub fn filter(r: zap.Request, target: []const u8) !void { var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); defer arena.deinit(); const alloc = arena.allocator(); const pool_name = pool_helpers.extractPoolName(target) orelse { json_util.sendStatus(r, "{\"detail\":\"work pool name required\"}", .bad_request); return; }; const pool = db.work_pools.getByName(alloc, pool_name) catch null orelse { json_util.sendStatus(r, "{\"detail\":\"Work pool not found.\"}", .not_found); return; }; var limit: usize = 200; var offset: usize = 0; if (r.body) |body| { if (json.parseFromSlice(json.Value, alloc, body, .{})) |parsed| { const obj = parsed.value.object; if (obj.get("limit")) |v| { if (v == .integer) limit = @intCast(v.integer); } if (obj.get("offset")) |v| { if (v == .integer) offset = @intCast(v.integer); } } else |_| {} } const workers_list = db.workers.listByPool(alloc, pool.id, limit, offset) catch { json_util.sendStatus(r, "{\"detail\":\"database error\"}", .internal_server_error); return; }; var output: std.io.Writer.Allocating = .init(alloc); var jw: json.Stringify = .{ .writer = &output.writer }; jw.beginArray() catch { json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error); return; }; for (workers_list) |worker| { writeWorkerObject(&jw, worker) catch continue; } jw.endArray() catch {}; json_util.send(r, output.toOwnedSlice() catch "[]"); } pub fn delete(r: zap.Request, target: []const u8) !void { var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); defer arena.deinit(); const alloc = arena.allocator(); const pool_name = pool_helpers.extractPoolName(target) orelse { json_util.sendStatus(r, "{\"detail\":\"work pool name required\"}", .bad_request); return; }; const worker_name = pool_helpers.extractWorkerName(target) orelse { json_util.sendStatus(r, "{\"detail\":\"worker name required\"}", .bad_request); return; }; const pool = db.work_pools.getByName(alloc, pool_name) catch null orelse { json_util.sendStatus(r, "{\"detail\":\"Work pool not found.\"}", .not_found); return; }; const deleted = db.workers.deleteByPoolAndName(pool.id, worker_name) catch { json_util.sendStatus(r, "{\"detail\":\"delete failed\"}", .internal_server_error); return; }; if (!deleted) { json_util.sendStatus(r, "{\"detail\":\"Worker not found.\"}", .not_found); return; } r.setStatus(.no_content); r.sendBody("") catch {}; } // JSON serialization fn writeWorkerObject(jw: *json.Stringify, worker: db.workers.WorkerRow) !void { try jw.beginObject(); try jw.objectField("id"); try jw.write(worker.id); try jw.objectField("created"); try jw.write(worker.created); try jw.objectField("updated"); try jw.write(worker.updated); try jw.objectField("name"); try jw.write(worker.name); try jw.objectField("work_pool_id"); try jw.write(worker.work_pool_id); try jw.objectField("last_heartbeat_time"); if (worker.last_heartbeat_time) |lh| { try jw.write(lh); } else { try jw.write(null); } try jw.objectField("heartbeat_interval_seconds"); if (worker.heartbeat_interval_seconds) |hi| { try jw.write(hi); } else { try jw.write(null); } try jw.objectField("status"); try jw.write(worker.status.toString()); try jw.endObject(); }