const std = @import("std");
const zap = @import("zap");
const mem = std.mem;
const json = std.json;
const db = @import("../db/sqlite.zig");
const uuid_util = @import("../utilities/uuid.zig");
const time_util = @import("../utilities/time.zig");
const json_util = @import("../utilities/json.zig");
const pool_helpers = @import("work_pools.zig");

/// POST handler: create a work queue inside the work pool named in `target`.
/// Responds 201 with the serialized queue on success; 400/404/409/500 otherwise.
pub fn create(r: zap.Request, target: []const u8) !void {
    // Per-request arena: every allocation below is released in one shot.
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const pool_name = pool_helpers.extractPoolName(target) orelse {
        json_util.sendStatus(r, "{\"detail\":\"work pool name required\"}", .bad_request);
        return;
    };
    const pool = db.work_pools.getByName(alloc, pool_name) catch null orelse {
        json_util.sendStatus(r, "{\"detail\":\"Work pool not found.\"}", .not_found);
        return;
    };
    const body = r.body orelse {
        json_util.sendStatus(r, "{\"detail\":\"failed to read body\"}", .bad_request);
        return;
    };
    const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch {
        json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request);
        return;
    };
    // FIX: a body that parses but is not a JSON object (e.g. `[]` or `5`) would
    // hit a union-tag panic on `.object` below. Reject it with a 400 instead.
    if (parsed.value != .object) {
        json_util.sendStatus(r, "{\"detail\":\"body must be a JSON object\"}", .bad_request);
        return;
    }
    const obj = parsed.value.object;

    // "name" is required and must be a JSON string.
    const name = switch (obj.get("name") orelse {
        json_util.sendStatus(r, "{\"detail\":\"name required\"}", .bad_request);
        return;
    }) {
        .string => |s| s,
        else => {
            json_util.sendStatus(r, "{\"detail\":\"name must be string\"}", .bad_request);
            return;
        },
    };

    // Reject duplicates within the pool.
    if (db.work_queues.getByPoolAndName(alloc, pool.id, name) catch null) |_| {
        json_util.sendStatus(r, "{\"detail\":\"Work queue already exists.\"}", .conflict);
        return;
    }

    // Optional fields with defaults. Priority falls back to the next free
    // priority slot in the pool (or 1 if that lookup fails).
    const description = pool_helpers.getOptionalString(obj.get("description")) orelse "";
    const is_paused = pool_helpers.getOptionalBool(obj.get("is_paused")) orelse false;
    const concurrency_limit = pool_helpers.getOptionalInt(obj.get("concurrency_limit"));
    const priority = pool_helpers.getOptionalInt(obj.get("priority")) orelse
        (db.work_queues.nextPriority(pool.id) catch 1);

    var id_buf: [36]u8 = undefined;
    const queue_id = uuid_util.generate(&id_buf);
    var ts_buf: [32]u8 = undefined;
    const now = time_util.timestamp(&ts_buf);

    // A queue created paused starts in `paused`; otherwise `not_ready` until polled.
    const status: db.work_queues.Status = if (is_paused) .paused else .not_ready;

    db.work_queues.insert(
        queue_id,
        name,
        description,
        is_paused,
        concurrency_limit,
        priority,
        pool.id,
        status,
        now,
    ) catch {
        json_util.sendStatus(r, "{\"detail\":\"insert failed\"}", .internal_server_error);
        return;
    };

    // Read the row back so the response reflects exactly what was stored.
    const queue = db.work_queues.getByPoolAndName(alloc, pool.id, name) catch null orelse {
        json_util.sendStatus(r, "{\"detail\":\"queue not found after insert\"}", .internal_server_error);
        return;
    };
    const resp = writeQueue(alloc, queue, pool_name) catch {
        json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
        return;
    };
    json_util.sendStatus(r, resp, .created);
}

/// GET handler: fetch one work queue by pool name + queue name from `target`.
/// Responds 200 with the serialized queue, or 400/404/500.
pub fn get(r: zap.Request, target: []const u8) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const pool_name = pool_helpers.extractPoolName(target) orelse {
        json_util.sendStatus(r, "{\"detail\":\"work pool name required\"}", .bad_request);
        return;
    };
    const queue_name = pool_helpers.extractQueueName(target) orelse {
        json_util.sendStatus(r, "{\"detail\":\"queue name required\"}", .bad_request);
        return;
    };
    const pool = db.work_pools.getByName(alloc, pool_name) catch null orelse {
        json_util.sendStatus(r, "{\"detail\":\"Work pool not found.\"}", .not_found);
        return;
    };
    if (db.work_queues.getByPoolAndName(alloc, pool.id, queue_name) catch null) |queue| {
        const resp = writeQueue(alloc, queue, pool_name) catch {
            json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
            return;
        };
        json_util.send(r, resp);
    } else {
        json_util.sendStatus(r, "{\"detail\":\"Work queue not found.\"}", .not_found);
    }
}

/// PATCH handler: partially update a work queue; only fields present in the
/// body are changed. Responds 204 on success, or 400/404/500.
pub fn update(r: zap.Request, target: []const u8) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const pool_name = pool_helpers.extractPoolName(target) orelse {
        json_util.sendStatus(r, "{\"detail\":\"work pool name required\"}", .bad_request);
        return;
    };
    const queue_name = pool_helpers.extractQueueName(target) orelse {
        json_util.sendStatus(r, "{\"detail\":\"queue name required\"}", .bad_request);
        return;
    };
    const pool = db.work_pools.getByName(alloc, pool_name) catch null orelse {
        json_util.sendStatus(r, "{\"detail\":\"Work pool not found.\"}", .not_found);
        return;
    };
    const body = r.body orelse {
        json_util.sendStatus(r, "{\"detail\":\"failed to read body\"}", .bad_request);
        return;
    };
    const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch {
        json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request);
        return;
    };
    // FIX: same union-tag guard as in `create` — reject non-object bodies.
    if (parsed.value != .object) {
        json_util.sendStatus(r, "{\"detail\":\"body must be a JSON object\"}", .bad_request);
        return;
    }
    const obj = parsed.value.object;

    // All fields are optional; null means "leave unchanged".
    const new_name = pool_helpers.getOptionalString(obj.get("name"));
    const description = pool_helpers.getOptionalString(obj.get("description"));
    const is_paused = pool_helpers.getOptionalBool(obj.get("is_paused"));
    const concurrency_limit = pool_helpers.getOptionalInt(obj.get("concurrency_limit"));
    const priority = pool_helpers.getOptionalInt(obj.get("priority"));

    // Pausing/unpausing also transitions the status.
    var new_status: ?db.work_queues.Status = null;
    if (is_paused) |paused| {
        new_status = if (paused) .paused else .not_ready;
    }

    var ts_buf: [32]u8 = undefined;
    const now = time_util.timestamp(&ts_buf);

    const did_update = db.work_queues.updateByPoolAndName(
        pool.id,
        queue_name,
        new_name,
        description,
        is_paused,
        concurrency_limit,
        priority,
        new_status,
        now,
    ) catch {
        json_util.sendStatus(r, "{\"detail\":\"update failed\"}", .internal_server_error);
        return;
    };
    if (!did_update) {
        json_util.sendStatus(r, "{\"detail\":\"Work queue not found.\"}", .not_found);
        return;
    }
    r.setStatus(.no_content);
    r.sendBody("") catch {};
}

/// DELETE handler: remove a work queue, refusing to delete the pool's default
/// queue. Responds 204 on success, or 400/404/500.
pub fn delete(r: zap.Request, target: []const u8) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const pool_name = pool_helpers.extractPoolName(target) orelse {
        json_util.sendStatus(r, "{\"detail\":\"work pool name required\"}", .bad_request);
        return;
    };
    const queue_name = pool_helpers.extractQueueName(target) orelse {
        json_util.sendStatus(r, "{\"detail\":\"queue name required\"}", .bad_request);
        return;
    };
    const pool = db.work_pools.getByName(alloc, pool_name) catch null orelse {
        json_util.sendStatus(r, "{\"detail\":\"Work pool not found.\"}", .not_found);
        return;
    };

    // The pool's default queue must never be deleted.
    if (pool.default_queue_id) |default_id| {
        if (db.work_queues.getByPoolAndName(alloc, pool.id, queue_name) catch null) |queue| {
            if (mem.eql(u8, queue.id, default_id)) {
                json_util.sendStatus(r, "{\"detail\":\"Cannot delete the default work queue.\"}", .bad_request);
                return;
            }
        }
    }

    const deleted = db.work_queues.deleteByPoolAndName(pool.id, queue_name) catch {
        json_util.sendStatus(r, "{\"detail\":\"delete failed\"}", .internal_server_error);
        return;
    };
    if (!deleted) {
        json_util.sendStatus(r, "{\"detail\":\"Work queue not found.\"}", .not_found);
        return;
    }
    r.setStatus(.no_content);
    r.sendBody("") catch {};
}

/// POST handler: list a pool's work queues as a JSON array. The body may
/// optionally carry integer "limit" (default 200) and "offset" (default 0);
/// a malformed body is ignored (best-effort) and defaults are used.
pub fn filter(r: zap.Request, target: []const u8) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const pool_name = pool_helpers.extractPoolName(target) orelse {
        json_util.sendStatus(r, "{\"detail\":\"work pool name required\"}", .bad_request);
        return;
    };
    const pool = db.work_pools.getByName(alloc, pool_name) catch null orelse {
        json_util.sendStatus(r, "{\"detail\":\"Work pool not found.\"}", .not_found);
        return;
    };

    var limit: usize = 200;
    var offset: usize = 0;
    if (r.body) |body| {
        // Pagination parsing is deliberately best-effort: parse failures and
        // non-object bodies fall through to the defaults.
        if (json.parseFromSlice(json.Value, alloc, body, .{})) |parsed| {
            if (parsed.value == .object) {
                const obj = parsed.value.object;
                // FIX: require non-negative integers — a negative value would
                // panic in the @intCast to usize.
                if (obj.get("limit")) |v| {
                    if (v == .integer and v.integer >= 0) limit = @intCast(v.integer);
                }
                if (obj.get("offset")) |v| {
                    if (v == .integer and v.integer >= 0) offset = @intCast(v.integer);
                }
            }
        } else |_| {}
    }

    const queues = db.work_queues.listByPool(alloc, pool.id, limit, offset) catch {
        json_util.sendStatus(r, "{\"detail\":\"database error\"}", .internal_server_error);
        return;
    };

    var output: std.io.Writer.Allocating = .init(alloc);
    var jw: json.Stringify = .{ .writer = &output.writer };
    jw.beginArray() catch {
        json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
        return;
    };
    for (queues) |queue| {
        // Skip any row that fails to serialize rather than aborting the list.
        writeQueueObject(&jw, queue, pool_name) catch continue;
    }
    jw.endArray() catch {};
    json_util.send(r, output.toOwnedSlice() catch "[]");
}

// JSON serialization

/// Serializes a single queue row to a JSON string. Caller owns the returned
/// slice (allocated from `alloc`; in practice the per-request arena).
fn writeQueue(alloc: std.mem.Allocator, queue: db.work_queues.WorkQueueRow, pool_name: []const u8) ![]const u8 {
    var output: std.io.Writer.Allocating = .init(alloc);
    var jw: json.Stringify = .{ .writer = &output.writer };
    try writeQueueObject(&jw, queue, pool_name);
    return output.toOwnedSlice();
}

/// Writes one queue row as a JSON object into an in-progress stringify stream.
/// `pool_name` is included so clients don't need a second lookup.
fn writeQueueObject(jw: *json.Stringify, queue: db.work_queues.WorkQueueRow, pool_name: []const u8) !void {
    try jw.beginObject();
    try jw.objectField("id");
    try jw.write(queue.id);
    try jw.objectField("created");
    try jw.write(queue.created);
    try jw.objectField("updated");
    try jw.write(queue.updated);
    try jw.objectField("name");
    try jw.write(queue.name);
    try jw.objectField("description");
    try jw.write(queue.description);
    try jw.objectField("is_paused");
    try jw.write(queue.is_paused);
    try jw.objectField("concurrency_limit");
    if (queue.concurrency_limit) |c| {
        try jw.write(c);
    } else {
        try jw.write(null);
    }
    try jw.objectField("priority");
    try jw.write(queue.priority);
    try jw.objectField("work_pool_id");
    try jw.write(queue.work_pool_id);
    try jw.objectField("work_pool_name");
    try jw.write(pool_name);
    try jw.objectField("last_polled");
    if (queue.last_polled) |lp| {
        try jw.write(lp);
    } else {
        try jw.write(null);
    }
    try jw.objectField("status");
    try jw.write(queue.status.toString());
    try jw.endObject();
}