//! admin endpoint handlers for relay management. //! //! all handlers require Bearer token auth against RELAY_ADMIN_PASSWORD. //! includes host blocking/unblocking, account bans, and backfill control. const std = @import("std"); const h = @import("http.zig"); const router = @import("router.zig"); const websocket = @import("websocket"); const broadcaster = @import("../broadcaster.zig"); const event_log_mod = @import("../event_log.zig"); const backfill_mod = @import("../backfill.zig"); const cleaner_mod = @import("../cleaner.zig"); const resync_mod = @import("../resync.zig"); const log = std.log.scoped(.relay); const HttpContext = router.HttpContext; /// check admin auth via headers, send error response if not authorized. returns true if authorized. pub fn checkAdmin(conn: *h.Conn, headers: ?*const websocket.Handshake.KeyValue) bool { const admin_pw = std.posix.getenv("RELAY_ADMIN_PASSWORD") orelse { h.respondJson(conn, .forbidden, "{\"error\":\"admin endpoint not configured\"}"); return false; }; const kv = headers orelse { h.respondJson(conn, .unauthorized, "{\"error\":\"missing authorization header\"}"); return false; }; // handshake parser lowercases all header names const auth_value = kv.get("authorization") orelse { h.respondJson(conn, .unauthorized, "{\"error\":\"missing authorization header\"}"); return false; }; const bearer_prefix = "Bearer "; if (!std.mem.startsWith(u8, auth_value, bearer_prefix)) { h.respondJson(conn, .unauthorized, "{\"error\":\"invalid authorization scheme\"}"); return false; } const token = auth_value[bearer_prefix.len..]; if (!std.mem.eql(u8, token, admin_pw)) { h.respondJson(conn, .unauthorized, "{\"error\":\"invalid token\"}"); return false; } return true; } pub fn handleBan(conn: *h.Conn, body: []const u8, headers: *const websocket.Handshake.KeyValue, ctx: *HttpContext) void { if (!checkAdmin(conn, headers)) return; const parsed = std.json.parseFromSlice(struct { did: []const u8 }, ctx.persist.allocator, body, .{ .ignore_unknown_fields = true }) catch { h.respondJson(conn, .bad_request, "{\"error\":\"invalid JSON, expected {\\\"did\\\":\\\"...\\\"}\"}"); return; }; defer parsed.deinit(); const did = parsed.value.did; // resolve DID → UID and take down const uid = ctx.persist.uidForDid(did) catch { h.respondJson(conn, .internal_server_error, "{\"error\":\"failed to resolve DID\"}"); return; }; ctx.persist.takeDownUser(uid) catch { h.respondJson(conn, .internal_server_error, "{\"error\":\"takedown failed\"}"); return; }; // remove from collection index so banned accounts don't appear in listReposByCollection ctx.collection_index.removeAll(did) catch |err| { log.debug("collection removeAll after ban failed: {s}", .{@errorName(err)}); }; // emit #account event so downstream consumers see the takedown if (buildAccountFrame(ctx.persist.allocator, did)) |frame_bytes| { if (ctx.persist.persist(.account, uid, frame_bytes)) |relay_seq| { ctx.bc.stats.relay_seq.store(relay_seq, .release); const broadcast_data = broadcaster.resequenceFrame(ctx.persist.allocator, frame_bytes, relay_seq) orelse frame_bytes; ctx.bc.broadcast(relay_seq, broadcast_data); log.info("admin: emitted #account takedown event for {s} (seq={d})", .{ did, relay_seq }); } else |err| { log.warn("admin: failed to persist #account takedown event: {s}", .{@errorName(err)}); } } log.info("admin: banned {s} (uid={d})", .{ did, uid }); h.respondJson(conn, .ok, "{\"success\":true}"); } pub fn handleAdminListHosts(conn: *h.Conn, headers: *const websocket.Handshake.KeyValue, ctx: *HttpContext) void { if (!checkAdmin(conn, headers)) return; const persist = ctx.persist; const hosts = persist.listAllHosts(persist.allocator) catch { h.respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"query failed\"}"); return; }; defer { for (hosts) |host| { persist.allocator.free(host.hostname); persist.allocator.free(host.status); } persist.allocator.free(hosts); } var list: std.ArrayListUnmanaged(u8) = .{}; defer list.deinit(persist.allocator); const w = list.writer(persist.allocator); w.writeAll("{\"hosts\":[") catch return; for (hosts, 0..) |host, i| { if (i > 0) w.writeByte(',') catch return; if (host.account_limit) |limit| { std.fmt.format(w, "{{\"id\":{d},\"hostname\":\"{s}\",\"status\":\"{s}\",\"last_seq\":{d},\"failed_attempts\":{d},\"account_limit\":{d}}}", .{ host.id, host.hostname, host.status, host.last_seq, host.failed_attempts, limit, }) catch return; } else { std.fmt.format(w, "{{\"id\":{d},\"hostname\":\"{s}\",\"status\":\"{s}\",\"last_seq\":{d},\"failed_attempts\":{d},\"account_limit\":null}}", .{ host.id, host.hostname, host.status, host.last_seq, host.failed_attempts, }) catch return; } } std.fmt.format(w, "],\"active_workers\":{d}}}", .{ctx.slurper.workerCount()}) catch return; h.respondJson(conn, .ok, list.items); } pub fn handleAdminBlockHost(conn: *h.Conn, body: []const u8, headers: *const websocket.Handshake.KeyValue, persist: *event_log_mod.DiskPersist) void { if (!checkAdmin(conn, headers)) return; const parsed = std.json.parseFromSlice(struct { hostname: []const u8 }, persist.allocator, body, .{ .ignore_unknown_fields = true }) catch { h.respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid JSON\"}"); return; }; defer parsed.deinit(); const host_info = persist.getOrCreateHost(parsed.value.hostname) catch { h.respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"host lookup failed\"}"); return; }; persist.updateHostStatus(host_info.id, "blocked") catch { h.respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"status update failed\"}"); return; }; log.info("admin: blocked host {s} (id={d})", .{ parsed.value.hostname, host_info.id }); h.respondJson(conn, .ok, "{\"success\":true}"); } pub fn handleAdminUnblockHost(conn: *h.Conn, body: []const u8, headers: *const websocket.Handshake.KeyValue, persist: *event_log_mod.DiskPersist) void { if (!checkAdmin(conn, headers)) return; const parsed = std.json.parseFromSlice(struct { hostname: []const u8 }, persist.allocator, body, .{ .ignore_unknown_fields = true }) catch { h.respondJson(conn, .bad_request, "{\"error\":\"BadRequest\",\"message\":\"invalid JSON\"}"); return; }; defer parsed.deinit(); const host_info = persist.getOrCreateHost(parsed.value.hostname) catch { h.respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"host lookup failed\"}"); return; }; persist.updateHostStatus(host_info.id, "active") catch { h.respondJson(conn, .internal_server_error, "{\"error\":\"DatabaseError\",\"message\":\"status update failed\"}"); return; }; persist.resetHostFailures(host_info.id) catch {}; log.info("admin: unblocked host {s} (id={d})", .{ parsed.value.hostname, host_info.id }); h.respondJson(conn, .ok, "{\"success\":true}"); } /// set or clear the account_limit override for a host. /// POST {"host": "...", "account_limit": 100000} — set override /// POST {"host": "...", "account_limit": null} — clear override (revert to COUNT(*)) pub fn handleAdminChangeLimits(conn: *h.Conn, body: []const u8, headers: *const websocket.Handshake.KeyValue, ctx: *HttpContext) void { if (!checkAdmin(conn, headers)) return; const parsed = std.json.parseFromSlice( struct { host: []const u8, account_limit: ?u64 }, ctx.persist.allocator, body, .{ .ignore_unknown_fields = true }, ) catch { h.respondJson(conn, .bad_request, "{\"error\":\"invalid JSON, expected {\\\"host\\\":\\\"...\\\",\\\"account_limit\\\":...}\"}"); return; }; defer parsed.deinit(); const host_id = ctx.persist.getHostIdForHostname(parsed.value.host) catch { h.respondJson(conn, .internal_server_error, "{\"error\":\"database error\"}"); return; } orelse { h.respondJson(conn, .not_found, "{\"error\":\"host not found\"}"); return; }; ctx.persist.setHostAccountLimit(host_id, parsed.value.account_limit) catch { h.respondJson(conn, .internal_server_error, "{\"error\":\"failed to update limit\"}"); return; }; // update running subscriber's rate limits immediately const effective = if (parsed.value.account_limit) |l| l else ctx.persist.getHostAccountCount(host_id); ctx.slurper.updateHostLimits(host_id, effective); if (parsed.value.account_limit) |limit| { log.info("admin: set account_limit for {s} (id={d}): {d}", .{ parsed.value.host, host_id, limit }); } else { log.info("admin: cleared account_limit for {s} (id={d}), reverted to COUNT(*)", .{ parsed.value.host, host_id }); } h.respondJson(conn, .ok, "{\"success\":true}"); } pub fn handleAdminBackfillTrigger(conn: *h.Conn, query: []const u8, headers: *const websocket.Handshake.KeyValue, backfiller: *backfill_mod.Backfiller) void { if (!checkAdmin(conn, headers)) return; const source = h.queryParam(query, "source") orelse "bsky.network"; backfiller.start(source) catch |err| { switch (err) { error.AlreadyRunning => { h.respondJson(conn, .conflict, "{\"error\":\"backfill already in progress\"}"); }, else => { h.respondJson(conn, .internal_server_error, "{\"error\":\"failed to start backfill\"}"); }, } return; }; var buf: [256]u8 = undefined; const resp_body = std.fmt.bufPrint(&buf, "{{\"status\":\"started\",\"source\":\"{s}\"}}", .{source}) catch { h.respondJson(conn, .ok, "{\"status\":\"started\"}"); return; }; h.respondJson(conn, .ok, resp_body); } pub fn handleAdminBackfillStatus(conn: *h.Conn, headers: *const websocket.Handshake.KeyValue, backfiller: *backfill_mod.Backfiller) void { if (!checkAdmin(conn, headers)) return; const body = backfiller.getStatus(backfiller.allocator) catch { h.respondJson(conn, .internal_server_error, "{\"error\":\"failed to query backfill status\"}"); return; }; defer backfiller.allocator.free(body); h.respondJson(conn, .ok, body); } pub fn handleCleanupTrigger(conn: *h.Conn, headers: *const websocket.Handshake.KeyValue, cleaner: *cleaner_mod.Cleaner) void { if (!checkAdmin(conn, headers)) return; cleaner.start() catch |err| { switch (err) { error.AlreadyRunning => { h.respondJson(conn, .conflict, "{\"error\":\"cleanup already in progress\"}"); }, else => { h.respondJson(conn, .internal_server_error, "{\"error\":\"failed to start cleanup\"}"); }, } return; }; h.respondJson(conn, .ok, "{\"status\":\"started\"}"); } pub fn handleCleanupStatus(conn: *h.Conn, headers: *const websocket.Handshake.KeyValue, cleaner: *cleaner_mod.Cleaner) void { if (!checkAdmin(conn, headers)) return; const status = cleaner.getStatus(); var buf: [256]u8 = undefined; const body = std.fmt.bufPrint(&buf, "{{\"running\":{},\"scanned\":{d},\"removed\":{d}}}", .{ status.running, status.scanned, status.removed, }) catch { h.respondJson(conn, .internal_server_error, "{\"error\":\"format failed\"}"); return; }; h.respondJson(conn, .ok, body); } pub fn handleResyncStatus(conn: *h.Conn, headers: *const websocket.Handshake.KeyValue, resyncer: *resync_mod.Resyncer) void { if (!checkAdmin(conn, headers)) return; var buf: [256]u8 = undefined; const body = std.fmt.bufPrint(&buf, "{{\"processed\":{d},\"failed\":{d},\"dropped\":{d},\"queue_depth\":{d}}}", .{ resyncer.processed.load(.monotonic), resyncer.failed.load(.monotonic), resyncer.dropped.load(.monotonic), resyncer.queueDepth(), }) catch { h.respondJson(conn, .internal_server_error, "{\"error\":\"format failed\"}"); return; }; h.respondJson(conn, .ok, body); } pub fn handleResyncTrigger(conn: *h.Conn, body: []const u8, headers: *const websocket.Handshake.KeyValue, resyncer: *resync_mod.Resyncer) void { if (!checkAdmin(conn, headers)) return; const parsed = std.json.parseFromSlice( struct { did: []const u8, hostname: []const u8 }, std.heap.c_allocator, body, .{ .ignore_unknown_fields = true }, ) catch { h.respondJson(conn, .bad_request, "{\"error\":\"invalid JSON, expected {\\\"did\\\":\\\"...\\\",\\\"hostname\\\":\\\"...\\\"}\"}"); return; }; defer parsed.deinit(); resyncer.enqueue(parsed.value.did, parsed.value.hostname); h.respondJson(conn, .ok, "{\"status\":\"enqueued\"}"); } // --- protocol helpers (used only by handleBan) --- /// build a CBOR #account frame for a takedown event. /// header: {op: 1, t: "#account"}, payload: {seq: 0, did: "...", time: "...", active: false, status: "takendown"} fn buildAccountFrame(allocator: std.mem.Allocator, did: []const u8) ?[]const u8 { const zat = @import("zat"); const cbor = zat.cbor; const header: cbor.Value = .{ .map = &.{ .{ .key = "op", .value = .{ .unsigned = 1 } }, .{ .key = "t", .value = .{ .text = "#account" } }, } }; var time_buf: [24]u8 = undefined; const time_str = formatTimestamp(&time_buf); const payload: cbor.Value = .{ .map = &.{ .{ .key = "seq", .value = .{ .unsigned = 0 } }, .{ .key = "did", .value = .{ .text = did } }, .{ .key = "time", .value = .{ .text = time_str } }, .{ .key = "active", .value = .{ .boolean = false } }, .{ .key = "status", .value = .{ .text = "takendown" } }, } }; const header_bytes = cbor.encodeAlloc(allocator, header) catch return null; const payload_bytes = cbor.encodeAlloc(allocator, payload) catch { allocator.free(header_bytes); return null; }; var frame = allocator.alloc(u8, header_bytes.len + payload_bytes.len) catch { allocator.free(header_bytes); allocator.free(payload_bytes); return null; }; @memcpy(frame[0..header_bytes.len], header_bytes); @memcpy(frame[header_bytes.len..], payload_bytes); allocator.free(header_bytes); allocator.free(payload_bytes); return frame; } /// format current UTC time as ISO 8601 (YYYY-MM-DDTHH:MM:SSZ) fn formatTimestamp(buf: *[24]u8) []const u8 { const ts: u64 = @intCast(std.time.timestamp()); const es = std.time.epoch.EpochSeconds{ .secs = ts }; const day = es.getEpochDay(); const yd = day.calculateYearDay(); const md = yd.calculateMonthDay(); const ds = es.getDaySeconds(); return std.fmt.bufPrint(buf, "{d:0>4}-{d:0>2}-{d:0>2}T{d:0>2}:{d:0>2}:{d:0>2}Z", .{ yd.year, @as(u32, @intFromEnum(md.month)) + 1, @as(u32, md.day_index) + 1, ds.getHoursIntoDay(), ds.getMinutesIntoHour(), ds.getSecondsIntoMinute(), }) catch "1970-01-01T00:00:00Z"; }