atproto relay implementation in zig zlay.waow.tech
at main 127 lines 6.1 kB view raw
//! HTTP route dispatch and static content.
//!
//! defines HttpContext (the shared context plumbed from main.zig) and the
//! top-level request router that delegates to xrpc and admin handler modules.

const std = @import("std");
const websocket = @import("websocket");
const broadcaster = @import("../broadcaster.zig");
const validator_mod = @import("../validator.zig");
const slurper_mod = @import("../slurper.zig");
const event_log_mod = @import("../event_log.zig");
const collection_index_mod = @import("../collection_index.zig");
const backfill_mod = @import("../backfill.zig");
const h = @import("http.zig");
const xrpc = @import("xrpc.zig");
const admin = @import("admin.zig");

/// context for HTTP fallback handlers (passed as opaque pointer through broadcaster)
///
/// every field is a borrowed pointer; nothing in this file allocates or frees them.
pub const HttpContext = struct {
    /// formatted into the `/_stats` response.
    stats: *broadcaster.Stats,
    /// probed by the readiness endpoints (`SELECT 1`) and handed to the
    /// sync.* and admin host handlers.
    persist: *event_log_mod.DiskPersist,
    /// handed to the requestCrawl handler.
    slurper: *slurper_mod.Slurper,
    /// handed to the listReposByCollection handler.
    collection_index: *collection_index_mod.CollectionIndex,
    /// handed to the admin backfill status/trigger handlers.
    backfiller: *backfill_mod.Backfiller,
    /// not read directly in this file; presumably used by the admin handlers
    /// that receive the whole context — confirm in admin.zig.
    bc: *broadcaster.Broadcaster,
    /// not read directly in this file; presumably used by the admin handlers
    /// that receive the whole context — confirm in admin.zig.
    validator: *validator_mod.Validator,
};

/// top-level HTTP request router — installed as bc.http_fallback
///
/// splits `url` at the first '?' into path and query, then dispatches on
/// `method`: GET goes to `handleGet`, POST goes to `handlePost`, anything
/// else is answered with 405.
///
/// `opaque_ctx` must point at an `HttpContext`. when it is null the request
/// is answered with a 500 and the condition is logged, so the client is never
/// left without a response.
pub fn handleHttpRequest(
    conn: *websocket.Conn,
    method: []const u8,
    url: []const u8,
    body: []const u8,
    headers: *const websocket.Handshake.KeyValue,
    opaque_ctx: ?*anyopaque,
) void {
    const raw_ctx = opaque_ctx orelse {
        // a missing context is a wiring error; surface it instead of
        // returning without writing anything to the connection.
        std.log.err("http fallback invoked without an HttpContext", .{});
        h.respondJson(conn, .internal_server_error, "{\"status\":\"error\",\"msg\":\"http context unavailable\"}");
        return;
    };
    const ctx: *HttpContext = @ptrCast(@alignCast(raw_ctx));

    // everything before the first '?' is the path; the remainder (without
    // the '?') is the raw query string, or "" when there is none.
    const qmark = std.mem.indexOfScalar(u8, url, '?');
    const path = url[0..(qmark orelse url.len)];
    const query = if (qmark) |q| url[q + 1 ..] else "";

    if (std.mem.eql(u8, method, "GET")) {
        handleGet(conn, path, query, headers, ctx);
    } else if (std.mem.eql(u8, method, "POST")) {
        handlePost(conn, path, query, body, headers, ctx);
    } else {
        h.respondText(conn, .method_not_allowed, "method not allowed");
    }
}

/// route a GET request by exact path match.
///
/// covers health/readiness probes, `/_stats`, the com.atproto.sync.* query
/// endpoints, read-only admin endpoints, the landing page and the favicon.
/// unknown paths get a 404.
fn handleGet(conn: *websocket.Conn, path: []const u8, query: []const u8, headers: *const websocket.Handshake.KeyValue, ctx: *HttpContext) void {
    if (std.mem.eql(u8, path, "/_healthz")) {
        // trivial liveness — process is alive, constant-time, no dependencies
        h.respondJson(conn, .ok, "{\"status\":\"ok\"}");
    } else if (std.mem.eql(u8, path, "/_readyz") or std.mem.eql(u8, path, "/_health") or std.mem.eql(u8, path, "/xrpc/_health")) {
        // readiness — checks DB dependency
        _ = ctx.persist.db.exec("SELECT 1", .{}) catch {
            h.respondJson(conn, .internal_server_error, "{\"status\":\"error\",\"msg\":\"database unavailable\"}");
            return;
        };
        h.respondJson(conn, .ok, "{\"status\":\"ok\"}");
    } else if (std.mem.eql(u8, path, "/_stats")) {
        // stack scratch space for the formatted stats; the returned slice is
        // consumed before this scope ends.
        // NOTE(review): behavior when the stats exceed 4096 bytes depends on
        // formatStatsResponse — confirm.
        var stats_buf: [4096]u8 = undefined;
        const body = broadcaster.formatStatsResponse(ctx.stats, &stats_buf);
        h.respondJson(conn, .ok, body);
    } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.listRepos")) {
        xrpc.handleListRepos(conn, query, ctx.persist);
    } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.getRepoStatus")) {
        xrpc.handleGetRepoStatus(conn, query, ctx.persist);
    } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.getLatestCommit")) {
        xrpc.handleGetLatestCommit(conn, query, ctx.persist);
    } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.listReposByCollection")) {
        xrpc.handleListReposByCollection(conn, query, ctx.collection_index);
    } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.listHosts")) {
        xrpc.handleListHosts(conn, query, ctx.persist);
    } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.getHostStatus")) {
        xrpc.handleGetHostStatus(conn, query, ctx.persist);
    } else if (std.mem.eql(u8, path, "/admin/hosts")) {
        admin.handleAdminListHosts(conn, headers, ctx);
    } else if (std.mem.eql(u8, path, "/admin/backfill-collections")) {
        admin.handleAdminBackfillStatus(conn, headers, ctx.backfiller);
    } else if (std.mem.eql(u8, path, "/")) {
        // plain-text landing page
        h.respondText(conn, .ok,
            \\     _
            \\ ___| | __ _ _   _
            \\|_  / |/ _` | | | |
            \\ / /| | (_| | |_| |
            \\/___|_|\__,_|\__, |
            \\             |___/
            \\
            \\This is an atproto [https://atproto.com] relay instance,
            \\running the zlay codebase [https://tangled.org/zzstoatzz.io/zlay]
            \\
            \\The firehose WebSocket path is at: /xrpc/com.atproto.sync.subscribeRepos
            \\
        );
    } else if (std.mem.eql(u8, path, "/favicon.svg") or std.mem.eql(u8, path, "/favicon.ico")) {
        // both favicon paths serve the same inline SVG
        h.httpRespond(conn, .ok, "image/svg+xml",
            \\<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 32 32">
            \\<rect width="32" height="32" rx="6" fill="#1a1a2e"/>
            \\<text x="16" y="24" font-family="monospace" font-size="22" font-weight="bold" fill="#e94560" text-anchor="middle">Z</text>
            \\</svg>
        );
    } else {
        h.respondText(conn, .not_found, "not found");
    }
}

/// route a POST request by exact path match.
///
/// covers requestCrawl and the mutating admin endpoints. unknown paths get
/// a 404. `query` is only forwarded to the backfill trigger; the other
/// handlers receive the request `body`.
fn handlePost(conn: *websocket.Conn, path: []const u8, query: []const u8, body: []const u8, headers: *const websocket.Handshake.KeyValue, ctx: *HttpContext) void {
    if (std.mem.eql(u8, path, "/admin/repo/ban")) {
        admin.handleBan(conn, body, headers, ctx);
    } else if (std.mem.eql(u8, path, "/xrpc/com.atproto.sync.requestCrawl")) {
        xrpc.handleRequestCrawl(conn, body, ctx.slurper);
    } else if (std.mem.eql(u8, path, "/admin/hosts/block")) {
        admin.handleAdminBlockHost(conn, body, headers, ctx.persist);
    } else if (std.mem.eql(u8, path, "/admin/hosts/unblock")) {
        admin.handleAdminUnblockHost(conn, body, headers, ctx.persist);
    } else if (std.mem.eql(u8, path, "/admin/hosts/changeLimits")) {
        admin.handleAdminChangeLimits(conn, body, headers, ctx);
    } else if (std.mem.eql(u8, path, "/admin/backfill-collections")) {
        admin.handleAdminBackfillTrigger(conn, query, headers, ctx.backfiller);
    } else {
        h.respondText(conn, .not_found, "not found");
    }
}