//! HTTP route dispatch and static content. //! //! defines HttpContext (the shared context plumbed from main.zig) and the //! top-level request router that delegates to xrpc and admin handler modules. const std = @import("std"); const websocket = @import("websocket"); const broadcaster = @import("../broadcaster.zig"); const validator_mod = @import("../validator.zig"); const slurper_mod = @import("../slurper.zig"); const event_log_mod = @import("../event_log.zig"); const collection_index_mod = @import("../collection_index.zig"); const backfill_mod = @import("../backfill.zig"); const h = @import("http.zig"); const xrpc = @import("xrpc.zig"); const admin = @import("admin.zig"); /// context for HTTP fallback handlers (passed as opaque pointer through broadcaster) pub const HttpContext = struct { stats: *broadcaster.Stats, persist: *event_log_mod.DiskPersist, slurper: *slurper_mod.Slurper, collection_index: *collection_index_mod.CollectionIndex, backfiller: *backfill_mod.Backfiller, bc: *broadcaster.Broadcaster, validator: *validator_mod.Validator, }; /// top-level HTTP request router — installed as bc.http_fallback pub fn handleHttpRequest( conn: *websocket.Conn, method: []const u8, url: []const u8, body: []const u8, headers: *const websocket.Handshake.KeyValue, opaque_ctx: ?*anyopaque, ) void { const ctx: *HttpContext = @ptrCast(@alignCast(opaque_ctx orelse return)); const qmark = std.mem.indexOfScalar(u8, url, '?'); const path = url[0..(qmark orelse url.len)]; const query = if (qmark) |q| url[q + 1 ..] else ""; if (std.mem.eql(u8, method, "GET")) { handleGet(conn, path, query, headers, ctx); } else if (std.mem.eql(u8, method, "POST")) { handlePost(conn, path, query, body, headers, ctx); } else { h.respondText(conn, .method_not_allowed, "method not allowed"); } } fn handleGet(conn: *websocket.Conn, path: []const u8, query: []const u8, headers: *const websocket.Handshake.KeyValue, ctx: *HttpContext) void { if (std.mem.eql(u8, path, "/_healthz")) { // trivial liveness — process is alive, constant-time, no dependencies h.respondJson(conn, .ok, "{\"status\":\"ok\"}"); } else if (std.mem.eql(u8, path, "/_readyz") or std.mem.eql(u8, path, "/_health") or std.mem.eql(u8, path, "/xrpc/_health")) { // readiness — checks DB dependency _ = ctx.persist.db.exec("SELECT 1", .{}) catch { h.respondJson(conn, .internal_server_error, "{\"status\":\"error\",\"msg\":\"database unavailable\"}"); return; }; h.respondJson(conn, .ok, "{\"status\":\"ok\"}"); } else if (std.mem.eql(u8, path, "/_stats")) { var stats_buf: [4096]u8 = undefined; const body = broadcaster.formatStatsResponse(ctx.stats, &stats_buf); h.respondJson(conn, .ok, body); } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.listRepos")) { xrpc.handleListRepos(conn, query, ctx.persist); } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.getRepoStatus")) { xrpc.handleGetRepoStatus(conn, query, ctx.persist); } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.getLatestCommit")) { xrpc.handleGetLatestCommit(conn, query, ctx.persist); } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.listReposByCollection")) { xrpc.handleListReposByCollection(conn, query, ctx.collection_index); } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.listHosts")) { xrpc.handleListHosts(conn, query, ctx.persist); } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.getHostStatus")) { xrpc.handleGetHostStatus(conn, query, ctx.persist); } else if (std.mem.eql(u8, path, "/admin/hosts")) { admin.handleAdminListHosts(conn, headers, ctx); } else if (std.mem.eql(u8, path, "/admin/backfill-collections")) { admin.handleAdminBackfillStatus(conn, headers, ctx.backfiller); } else if (std.mem.eql(u8, path, "/")) { h.respondText(conn, .ok, \\ _ \\ ___| | __ _ _ _ \\|_ / |/ _` | | | | \\ / /| | (_| | |_| | \\/___|_|\__,_|\__, | \\ |___/ \\ \\This is an atproto [https://atproto.com] relay instance, \\running the zlay codebase [https://tangled.org/zzstoatzz.io/zlay] \\ \\The firehose WebSocket path is at: /xrpc/com.atproto.sync.subscribeRepos \\ ); } else if (std.mem.eql(u8, path, "/favicon.svg") or std.mem.eql(u8, path, "/favicon.ico")) { h.httpRespond(conn, .ok, "image/svg+xml", \\ \\ \\Z \\ ); } else { h.respondText(conn, .not_found, "not found"); } } fn handlePost(conn: *websocket.Conn, path: []const u8, query: []const u8, body: []const u8, headers: *const websocket.Handshake.KeyValue, ctx: *HttpContext) void { if (std.mem.eql(u8, path, "/admin/repo/ban")) { admin.handleBan(conn, body, headers, ctx); } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.requestCrawl")) { xrpc.handleRequestCrawl(conn, body, ctx.slurper); } else if (std.mem.eql(u8, path, "/admin/hosts/block")) { admin.handleAdminBlockHost(conn, body, headers, ctx.persist); } else if (std.mem.eql(u8, path, "/admin/hosts/unblock")) { admin.handleAdminUnblockHost(conn, body, headers, ctx.persist); } else if (std.mem.eql(u8, path, "/admin/hosts/changeLimits")) { admin.handleAdminChangeLimits(conn, body, headers, ctx); } else if (std.mem.eql(u8, path, "/admin/backfill-collections")) { admin.handleAdminBackfillTrigger(conn, query, headers, ctx.backfiller); } else { h.respondText(conn, .not_found, "not found"); } }