GET /xrpc/app.bsky.actor.searchActorsTypeahead typeahead.waow.tech
at main 268 lines 8.8 kB view raw
//! HTTP server for search and health endpoints
//! Uses std.http.Server with thread pool

const std = @import("std");
const http = std.http;
const mem = std.mem;
const json = std.json;
const io = std.Options.debug_io;
const logfire = @import("logfire");
const LocalDb = @import("db/LocalDb.zig");
const search = @import("search.zig");
const ingest = @import("ingest.zig");

const log = std.log.scoped(.server);

/// Size in bytes of each per-connection read and write buffer.
const HTTP_BUF_SIZE = 65536;

/// CORS headers attached to every response produced by this module.
const cors_headers = [_]http.Header{
    .{ .name = "access-control-allow-origin", .value = "*" },
    .{ .name = "access-control-allow-methods", .value = "GET, OPTIONS" },
    .{ .name = "access-control-allow-headers", .value = "content-type" },
};

/// Headers for JSON responses: the content type followed by the CORS set.
const json_headers = [_]http.Header{
    .{ .name = "content-type", .value = "application/json" },
} ++ cors_headers;

/// Serve HTTP requests on `stream` until the peer disconnects, a request
/// fails, or a request does not ask for keep-alive. The stream is always
/// closed before returning.
pub fn handleConnection(stream: std.Io.net.Stream, local_db: *LocalDb) void {
    defer stream.close(io);

    var read_buffer: [HTTP_BUF_SIZE]u8 = undefined;
    var write_buffer: [HTTP_BUF_SIZE]u8 = undefined;

    var reader = stream.reader(io, &read_buffer);
    var writer = stream.writer(io, &write_buffer);

    var server = http.Server.init(&reader.interface, &writer.interface);

    while (true) {
        var request = server.receiveHead() catch |err| {
            // A closing connection or end of stream is the normal way for a
            // connection to finish, so only other errors are logged.
            if (err != error.HttpConnectionClosing and err != error.EndOfStream) {
                log.debug("http receive error: {}", .{err});
            }
            return;
        };

        handleRequest(&request, local_db) catch |err| {
            log.err("request error: {}", .{err});
            return;
        };

        if (!request.head.keep_alive) return;
    }
}

/// Route one request by path. OPTIONS requests get the CORS preflight
/// response regardless of path; unknown paths get a 404 JSON error.
fn handleRequest(request: *http.Server.Request, local_db: *LocalDb) !void {
    const target = request.head.target;

    if (request.head.method == .OPTIONS) {
        try sendCors(request);
        return;
    }

    // Route on the path only; handlers receive the full target so they can
    // read their own query parameters.
    const path = if (mem.indexOfScalar(u8, target, '?')) |qi| target[0..qi] else target;

    if (mem.eql(u8, path, "/search")) {
        try handleSearch(request, target, local_db);
    } else if (mem.eql(u8, path, "/health")) {
        try handleHealth(request, local_db);
    } else if (mem.eql(u8, path, "/debug/fts")) {
        try handleDebugFts(request, target, local_db);
    } else {
        try sendJson(request, .not_found, "{\"error\":\"not found\"}");
    }
}

/// Handle `/search`. The query is read from `q`, falling back to `term`;
/// `limit` defaults to 10 and is clamped to 1..100. Responds 503 while the
/// database is not ready, 400 for a missing or empty query, and 500 when
/// the search itself fails.
fn handleSearch(request: *http.Server.Request, target: []const u8, local_db: *LocalDb) !void {
    if (!local_db.isReady()) {
        try sendJson(request, .service_unavailable, "{\"error\":\"not ready\"}");
        return;
    }

    const raw_q = parseQueryParam(target, "q") orelse parseQueryParam(target, "term") orelse {
        try sendJson(request, .bad_request, "{\"error\":\"missing q parameter\"}");
        return;
    };

    // If the decoded query does not fit in the buffer, the raw
    // (still-encoded) text is searched instead.
    var decode_buf: [512]u8 = undefined;
    const q = percentDecode(raw_q, &decode_buf) orelse raw_q;

    if (q.len == 0) {
        try sendJson(request, .bad_request, "{\"error\":\"empty query\"}");
        return;
    }

    const limit: usize = if (parseQueryParam(target, "limit")) |s|
        std.fmt.parseInt(usize, s, 10) catch 10
    else
        10;
    const clamped_limit = @min(@max(limit, 1), 100);

    var output: std.Io.Writer.Allocating = .init(std.heap.page_allocator);
    defer output.deinit();

    search.search(local_db, q, clamped_limit, &output.writer) catch |err| {
        // Log the cause; the client only sees a generic 500.
        log.err("search failed: {}", .{err});
        try sendJson(request, .internal_server_error, "{\"error\":\"search failed\"}");
        return;
    };

    try sendJson(request, .ok, output.written());
}

/// Handle `/health`: report readiness, the actor count and the value of
/// `ingest.getRssKB()` as JSON, and record the request in a logfire span.
fn handleHealth(request: *http.Server.Request, local_db: *LocalDb) !void {
    const span = logfire.span("http.health", .{});
    defer span.end();

    const ready = local_db.isReady();
    const actors = local_db.countActors();
    const rss = ingest.getRssKB();
    const ready_str = if (ready) "true" else "false";

    var buf: [256]u8 = undefined;
    // Fall back to a minimal body if the formatted one does not fit.
    const body = std.fmt.bufPrint(&buf,
        \\{{"status":"ok","ready":{s},"actors":{d},"rss_kb":{d}}}
    , .{
        ready_str,
        actors,
        rss,
    }) catch "{\"status\":\"ok\"}";

    span.setAttribute("ready", ready_str);
    span.setAttribute("actors", @as(i64, @intCast(actors)));
    span.setStatus(.ok, null);

    try sendJson(request, .ok, body);
}

/// Respond with `body` as `application/json` using `status`, including the
/// CORS headers.
fn sendJson(request: *http.Server.Request, status: http.Status, body: []const u8) !void {
    try request.respond(body, .{
        .status = status,
        .extra_headers = &json_headers,
    });
}

/// Respond to a CORS preflight request: 204 with the CORS headers and an
/// empty body.
fn sendCors(request: *http.Server.Request) !void {
    try request.respond("", .{
        .status = .no_content,
        .extra_headers = &cors_headers,
    });
}

/// Return the raw (still percent-encoded) value of the first query
/// parameter named exactly `param` in `target`, or null when the target has
/// no query string, the parameter is absent, or its value is empty.
///
/// Only the text after the first '?' is examined, split on '&', so a
/// literal '?' inside another parameter's value or in the path cannot be
/// mistaken for the start of `param`. The returned slice points into
/// `target`.
fn parseQueryParam(target: []const u8, param: []const u8) ?[]const u8 {
    const query_start = mem.indexOfScalar(u8, target, '?') orelse return null;

    var pairs = mem.splitScalar(u8, target[query_start + 1 ..], '&');
    while (pairs.next()) |pair| {
        // Pairs without '=' carry no value and are skipped.
        const eq = mem.indexOfScalar(u8, pair, '=') orelse continue;
        if (!mem.eql(u8, pair[0..eq], param)) continue;

        const value = pair[eq + 1 ..];
        // An empty first match counts as missing.
        return if (value.len == 0) null else value;
    }
    return null;
}

/// Decode percent-encoded URL query value into caller-provided buffer.
/// Handles %XX hex pairs and '+' as space. A '%' that is not followed by
/// two hex digits is copied through unchanged. Returns null when the
/// decoded output does not fit in `buf`; otherwise returns the written
/// prefix of `buf`. Uses no shared state.
fn percentDecode(input: []const u8, buf: []u8) ?[]const u8 {
    var out: usize = 0;
    var i: usize = 0;
    while (i < input.len) {
        if (out >= buf.len) return null;

        const c = input[i];
        // Default: copy one byte, mapping '+' to a space.
        var byte: u8 = if (c == '+') ' ' else c;
        var step: usize = 1;

        if (c == '%' and i + 2 < input.len) {
            if (hexVal(input[i + 1])) |hi| {
                if (hexVal(input[i + 2])) |lo| {
                    byte = (@as(u8, hi) << 4) | @as(u8, lo);
                    step = 3;
                }
            }
        }

        buf[out] = byte;
        out += 1;
        i += step;
    }
    return buf[0..out];
}

/// Return the numeric value of an ASCII hex digit (either case), or null if
/// `c` is not a hex digit.
fn hexVal(c: u8) ?u4 {
    return switch (c) {
        '0'...'9' => @intCast(c - '0'),
        'a'...'f' => @intCast(c - 'a' + 10),
        'A'...'F' => @intCast(c - 'A' + 10),
        else => null,
    };
}

/// Debug endpoint: compare actors vs actors_fts for a DID
/// Usage: /debug/fts?did=did:plc:xxx
///
/// Responds with `{"actors": <row>, "fts": <row>}`, where each row is an
/// object, null when no row matched, or the string "error" when the query
/// failed. Responds 400 when `did` is missing.
fn handleDebugFts(request: *http.Server.Request, target: []const u8, local_db: *LocalDb) !void {
    const did = parseQueryParam(target, "did") orelse {
        try sendJson(request, .bad_request, "{\"error\":\"missing did parameter\"}");
        return;
    };

    // If the decoded DID does not fit in the buffer, the raw value is used.
    var decode_buf: [512]u8 = undefined;
    const term = percentDecode(did, &decode_buf) orelse did;

    var out: std.Io.Writer.Allocating = .init(std.heap.page_allocator);
    defer out.deinit();
    var jw: json.Stringify = .{ .writer = &out.writer };

    try jw.beginObject();

    // actors table row
    try jw.objectField("actors");
    try writeDebugRow(&jw, local_db, "SELECT did, handle, display_name FROM actors WHERE did = ?", term);

    // FTS table row for same DID
    try jw.objectField("fts");
    try writeDebugRow(&jw, local_db, "SELECT did, handle, display_name FROM actors_fts WHERE did = ?", term);

    try jw.endObject();

    try sendJson(request, .ok, out.written());
}

/// Run `sql` with `param` as its single bound parameter and write the first
/// row to `jw` as `{"did", "handle", "display_name"}`. Writes null when
/// there is no row, and the string "error" when the query fails.
fn writeDebugRow(jw: anytype, local_db: *LocalDb, comptime sql: []const u8, param: []const u8) !void {
    var rows = local_db.query(sql, .{param}) catch |err| {
        // Record the cause; the response only carries a generic marker.
        log.warn("debug query failed: {}", .{err});
        try jw.write("error");
        return;
    };
    defer rows.deinit();

    const row = rows.next() orelse {
        try jw.write(null);
        return;
    };

    try jw.beginObject();
    try jw.objectField("did");
    try jw.write(row.text(0));
    try jw.objectField("handle");
    try jw.write(row.text(1));
    try jw.objectField("display_name");
    try jw.write(row.text(2));
    try jw.endObject();
}