atproto relay implementation in zig
zlay.waow.tech
1//! HTTP route dispatch and static content.
2//!
3//! defines HttpContext (the shared context plumbed from main.zig) and the
4//! top-level request router that delegates to xrpc and admin handler modules.
5
6const std = @import("std");
7const websocket = @import("websocket");
8const broadcaster = @import("../broadcaster.zig");
9const validator_mod = @import("../validator.zig");
10const slurper_mod = @import("../slurper.zig");
11const event_log_mod = @import("../event_log.zig");
12const collection_index_mod = @import("../collection_index.zig");
13const backfill_mod = @import("../backfill.zig");
14const h = @import("http.zig");
15const xrpc = @import("xrpc.zig");
16const admin = @import("admin.zig");
17
/// shared state handed to the HTTP fallback handlers.
///
/// the broadcaster carries this as an opaque pointer; `handleHttpRequest`
/// casts it back to `*HttpContext` before routing.
pub const HttpContext = struct {
    /// formatted into the `/_stats` response body.
    stats: *broadcaster.Stats,
    /// its `db` handle backs the readiness probe; also handed to the sync
    /// xrpc handlers and the admin host block/unblock handlers.
    persist: *event_log_mod.DiskPersist,
    /// handed to the `requestCrawl` handler.
    slurper: *slurper_mod.Slurper,
    /// handed to the `listReposByCollection` handler.
    collection_index: *collection_index_mod.CollectionIndex,
    /// handed to the admin backfill status/trigger handlers.
    backfiller: *backfill_mod.Backfiller,
    /// not read by the routing code in this file; available to handlers
    /// that receive the whole context.
    bc: *broadcaster.Broadcaster,
    /// not read by the routing code in this file; available to handlers
    /// that receive the whole context.
    validator: *validator_mod.Validator,
};
28
/// top-level HTTP request router — installed as bc.http_fallback.
///
/// splits `url` at the first '?' into a path and a query string (the query
/// is empty when there is no '?'), then dispatches on `method`:
/// "GET" goes to `handleGet`, "POST" goes to `handlePost`, and any other
/// method is answered with `.method_not_allowed`.
///
/// `opaque_ctx` is expected to point at an `HttpContext`. when it is null
/// the request cannot be routed; it is answered with
/// `.internal_server_error` so the client receives a response rather than
/// the request being dropped without one.
pub fn handleHttpRequest(
    conn: *websocket.Conn,
    method: []const u8,
    url: []const u8,
    body: []const u8,
    headers: *const websocket.Handshake.KeyValue,
    opaque_ctx: ?*anyopaque,
) void {
    const raw_ctx = opaque_ctx orelse {
        // no context was installed: every route needs it, so fail loudly
        // towards the client instead of returning without writing anything.
        h.respondText(conn, .internal_server_error, "http context unavailable");
        return;
    };
    const ctx: *HttpContext = @ptrCast(@alignCast(raw_ctx));

    // everything before the first '?' is the path, everything after it the query
    const qmark = std.mem.indexOfScalar(u8, url, '?');
    const path = if (qmark) |q| url[0..q] else url;
    const query = if (qmark) |q| url[q + 1 ..] else "";

    if (std.mem.eql(u8, method, "GET")) {
        handleGet(conn, path, query, headers, ctx);
    } else if (std.mem.eql(u8, method, "POST")) {
        handlePost(conn, path, query, body, headers, ctx);
    } else {
        h.respondText(conn, .method_not_allowed, "method not allowed");
    }
}
52
/// routes a GET request by exact path match.
///
/// handled paths: `/_healthz` (liveness), `/_readyz` plus the `/_health` and
/// `/xrpc/_health` aliases (readiness, runs `SELECT 1` through
/// `ctx.persist.db`), `/_stats`, the `com.atproto.sync.*` query endpoints,
/// the admin host list and backfill status, the `/` banner, and the favicon
/// (`/favicon.ico` is served the same SVG as `/favicon.svg`).
/// any other path is answered with `.not_found`.
fn handleGet(conn: *websocket.Conn, path: []const u8, query: []const u8, headers: *const websocket.Handshake.KeyValue, ctx: *HttpContext) void {
    const eql = std.mem.eql;

    // trivial liveness — process is alive, constant-time, no dependencies
    if (eql(u8, path, "/_healthz")) {
        h.respondJson(conn, .ok, "{\"status\":\"ok\"}");
        return;
    }

    // readiness — checks DB dependency
    const wants_readiness = eql(u8, path, "/_readyz") or
        eql(u8, path, "/_health") or
        eql(u8, path, "/xrpc/_health");
    if (wants_readiness) {
        _ = ctx.persist.db.exec("SELECT 1", .{}) catch {
            h.respondJson(conn, .internal_server_error, "{\"status\":\"error\",\"msg\":\"database unavailable\"}");
            return;
        };
        h.respondJson(conn, .ok, "{\"status\":\"ok\"}");
        return;
    }

    if (eql(u8, path, "/_stats")) {
        // stack scratch space that the stats formatter writes into
        var scratch: [4096]u8 = undefined;
        const payload = broadcaster.formatStatsResponse(ctx.stats, &scratch);
        h.respondJson(conn, .ok, payload);
        return;
    }

    // com.atproto.sync query endpoints
    if (eql(u8, path, "/xrpc/com.atproto.sync.listRepos")) {
        xrpc.handleListRepos(conn, query, ctx.persist);
        return;
    }
    if (eql(u8, path, "/xrpc/com.atproto.sync.getRepoStatus")) {
        xrpc.handleGetRepoStatus(conn, query, ctx.persist);
        return;
    }
    if (eql(u8, path, "/xrpc/com.atproto.sync.getLatestCommit")) {
        xrpc.handleGetLatestCommit(conn, query, ctx.persist);
        return;
    }
    if (eql(u8, path, "/xrpc/com.atproto.sync.listReposByCollection")) {
        xrpc.handleListReposByCollection(conn, query, ctx.collection_index);
        return;
    }
    if (eql(u8, path, "/xrpc/com.atproto.sync.listHosts")) {
        xrpc.handleListHosts(conn, query, ctx.persist);
        return;
    }
    if (eql(u8, path, "/xrpc/com.atproto.sync.getHostStatus")) {
        xrpc.handleGetHostStatus(conn, query, ctx.persist);
        return;
    }

    // admin endpoints — the request headers are forwarded to the handlers
    if (eql(u8, path, "/admin/hosts")) {
        admin.handleAdminListHosts(conn, headers, ctx);
        return;
    }
    if (eql(u8, path, "/admin/backfill-collections")) {
        admin.handleAdminBackfillStatus(conn, headers, ctx.backfiller);
        return;
    }

    // static content
    if (eql(u8, path, "/")) {
        const banner =
            \\ _
            \\ ___| | __ _ _ _
            \\|_ / |/ _` | | | |
            \\ / /| | (_| | |_| |
            \\/___|_|\__,_|\__, |
            \\ |___/
            \\
            \\This is an atproto [https://atproto.com] relay instance,
            \\running the zlay codebase [https://tangled.org/zzstoatzz.io/zlay]
            \\
            \\The firehose WebSocket path is at: /xrpc/com.atproto.sync.subscribeRepos
            \\
        ;
        h.respondText(conn, .ok, banner);
        return;
    }
    if (eql(u8, path, "/favicon.svg") or eql(u8, path, "/favicon.ico")) {
        const favicon_svg =
            \\<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 32 32">
            \\<rect width="32" height="32" rx="6" fill="#1a1a2e"/>
            \\<text x="16" y="24" font-family="monospace" font-size="22" font-weight="bold" fill="#e94560" text-anchor="middle">Z</text>
            \\</svg>
        ;
        h.httpRespond(conn, .ok, "image/svg+xml", favicon_svg);
        return;
    }

    h.respondText(conn, .not_found, "not found");
}
110
/// routes a POST request by exact path match.
///
/// handled paths: `/admin/repo/ban`, `/xrpc/com.atproto.sync.requestCrawl`,
/// `/admin/hosts/block`, `/admin/hosts/unblock`, `/admin/hosts/changeLimits`
/// and `/admin/backfill-collections`. the backfill trigger is the only route
/// that receives `query` instead of `body`. any other path is answered with
/// `.not_found`.
fn handlePost(conn: *websocket.Conn, path: []const u8, query: []const u8, body: []const u8, headers: *const websocket.Handshake.KeyValue, ctx: *HttpContext) void {
    const eql = std.mem.eql;

    if (eql(u8, path, "/admin/repo/ban")) {
        admin.handleBan(conn, body, headers, ctx);
        return;
    }
    if (eql(u8, path, "/xrpc/com.atproto.sync.requestCrawl")) {
        xrpc.handleRequestCrawl(conn, body, ctx.slurper);
        return;
    }
    if (eql(u8, path, "/admin/hosts/block")) {
        admin.handleAdminBlockHost(conn, body, headers, ctx.persist);
        return;
    }
    if (eql(u8, path, "/admin/hosts/unblock")) {
        admin.handleAdminUnblockHost(conn, body, headers, ctx.persist);
        return;
    }
    if (eql(u8, path, "/admin/hosts/changeLimits")) {
        admin.handleAdminChangeLimits(conn, body, headers, ctx);
        return;
    }
    if (eql(u8, path, "/admin/backfill-collections")) {
        // parameters for the trigger come from the query string, not the body
        admin.handleAdminBackfillTrigger(conn, query, headers, ctx.backfiller);
        return;
    }

    h.respondText(conn, .not_found, "not found");
}