GET /xrpc/app.bsky.actor.searchActorsTypeahead
typeahead.waow.tech
1//! HTTP server for search and health endpoints
2//! Uses std.http.Server with thread pool
3
4const std = @import("std");
5const http = std.http;
6const mem = std.mem;
7const json = std.json;
8const io = std.Options.debug_io;
9const logfire = @import("logfire");
10const LocalDb = @import("db/LocalDb.zig");
11const search = @import("search.zig");
12const ingest = @import("ingest.zig");
13
14const log = std.log.scoped(.server);
15
16const HTTP_BUF_SIZE = 65536;
17
18pub fn handleConnection(stream: std.Io.net.Stream, local_db: *LocalDb) void {
19 defer stream.close(io);
20
21 var read_buffer: [HTTP_BUF_SIZE]u8 = undefined;
22 var write_buffer: [HTTP_BUF_SIZE]u8 = undefined;
23
24 var reader = stream.reader(io, &read_buffer);
25 var writer = stream.writer(io, &write_buffer);
26
27 var server = http.Server.init(&reader.interface, &writer.interface);
28
29 while (true) {
30 var request = server.receiveHead() catch |err| {
31 if (err != error.HttpConnectionClosing and err != error.EndOfStream) {
32 log.debug("http receive error: {}", .{err});
33 }
34 return;
35 };
36
37 handleRequest(&request, local_db) catch |err| {
38 log.err("request error: {}", .{err});
39 return;
40 };
41
42 if (!request.head.keep_alive) return;
43 }
44}
45
46fn handleRequest(request: *http.Server.Request, local_db: *LocalDb) !void {
47 const target = request.head.target;
48
49 if (request.head.method == .OPTIONS) {
50 try sendCors(request);
51 return;
52 }
53
54 const path = if (mem.indexOf(u8, target, "?")) |qi| target[0..qi] else target;
55
56 if (mem.eql(u8, path, "/search")) {
57 try handleSearch(request, target, local_db);
58 } else if (mem.eql(u8, path, "/health")) {
59 try handleHealth(request, local_db);
60 } else if (mem.eql(u8, path, "/debug/fts")) {
61 try handleDebugFts(request, target, local_db);
62 } else {
63 try sendJson(request, .not_found, "{\"error\":\"not found\"}");
64 }
65}
66
67fn handleSearch(request: *http.Server.Request, target: []const u8, local_db: *LocalDb) !void {
68 if (!local_db.isReady()) {
69 try sendJson(request, .service_unavailable, "{\"error\":\"not ready\"}");
70 return;
71 }
72
73 const raw_q = parseQueryParam(target, "q") orelse parseQueryParam(target, "term") orelse {
74 try sendJson(request, .bad_request, "{\"error\":\"missing q parameter\"}");
75 return;
76 };
77
78 var decode_buf: [512]u8 = undefined;
79 const q = percentDecode(raw_q, &decode_buf) orelse raw_q;
80
81 if (q.len == 0) {
82 try sendJson(request, .bad_request, "{\"error\":\"empty query\"}");
83 return;
84 }
85
86 const limit_str = parseQueryParam(target, "limit");
87 const limit: usize = if (limit_str) |s|
88 std.fmt.parseInt(usize, s, 10) catch 10
89 else
90 10;
91 const clamped_limit = @min(@max(limit, 1), 100);
92
93 var output: std.Io.Writer.Allocating = .init(std.heap.page_allocator);
94 defer output.deinit();
95
96 search.search(local_db, q, clamped_limit, &output.writer) catch {
97 try sendJson(request, .internal_server_error, "{\"error\":\"search failed\"}");
98 return;
99 };
100
101 const body = output.written();
102 try request.respond(body, .{
103 .extra_headers = &.{
104 .{ .name = "content-type", .value = "application/json" },
105 .{ .name = "access-control-allow-origin", .value = "*" },
106 .{ .name = "access-control-allow-methods", .value = "GET, OPTIONS" },
107 .{ .name = "access-control-allow-headers", .value = "content-type" },
108 },
109 });
110}
111
112fn handleHealth(request: *http.Server.Request, local_db: *LocalDb) !void {
113 const span = logfire.span("http.health", .{});
114 defer span.end();
115
116 const ready = local_db.isReady();
117 const actors = local_db.countActors();
118 const rss = ingest.getRssKB();
119
120 var buf: [256]u8 = undefined;
121 const body = std.fmt.bufPrint(&buf,
122 \\{{"status":"ok","ready":{s},"actors":{d},"rss_kb":{d}}}
123 , .{
124 if (ready) "true" else "false",
125 actors,
126 rss,
127 }) catch "{\"status\":\"ok\"}";
128
129 span.setAttribute("ready", if (ready) "true" else "false");
130 span.setAttribute("actors", @as(i64, @intCast(actors)));
131 span.setStatus(.ok, null);
132
133 try sendJson(request, .ok, body);
134}
135
136fn sendJson(request: *http.Server.Request, status: http.Status, body: []const u8) !void {
137 try request.respond(body, .{
138 .status = status,
139 .extra_headers = &.{
140 .{ .name = "content-type", .value = "application/json" },
141 .{ .name = "access-control-allow-origin", .value = "*" },
142 .{ .name = "access-control-allow-methods", .value = "GET, OPTIONS" },
143 .{ .name = "access-control-allow-headers", .value = "content-type" },
144 },
145 });
146}
147
148fn sendCors(request: *http.Server.Request) !void {
149 try request.respond("", .{
150 .status = .no_content,
151 .extra_headers = &.{
152 .{ .name = "access-control-allow-origin", .value = "*" },
153 .{ .name = "access-control-allow-methods", .value = "GET, OPTIONS" },
154 .{ .name = "access-control-allow-headers", .value = "content-type" },
155 },
156 });
157}
158
159fn parseQueryParam(target: []const u8, param: []const u8) ?[]const u8 {
160 const prefixes = [_][]const u8{ "?", "&" };
161 for (prefixes) |prefix| {
162 var search_buf: [64]u8 = undefined;
163 const search_str = std.fmt.bufPrint(&search_buf, "{s}{s}=", .{ prefix, param }) catch continue;
164 if (mem.indexOf(u8, target, search_str)) |idx| {
165 const rest = target[idx + search_str.len ..];
166 const end = mem.indexOf(u8, rest, "&") orelse rest.len;
167 const value = rest[0..end];
168 if (value.len == 0) return null;
169 return value;
170 }
171 }
172 return null;
173}
174
175/// Decode percent-encoded URL query value into caller-provided buffer.
176/// Handles %XX hex pairs and '+' as space. Thread-safe (no shared state).
177fn percentDecode(input: []const u8, buf: []u8) ?[]const u8 {
178 var out: usize = 0;
179 var i: usize = 0;
180 while (i < input.len) {
181 if (out >= buf.len) return null;
182 if (input[i] == '%' and i + 2 < input.len) {
183 const hi = hexVal(input[i + 1]) orelse {
184 buf[out] = input[i];
185 out += 1;
186 i += 1;
187 continue;
188 };
189 const lo = hexVal(input[i + 2]) orelse {
190 buf[out] = input[i];
191 out += 1;
192 i += 1;
193 continue;
194 };
195 buf[out] = (@as(u8, hi) << 4) | @as(u8, lo);
196 out += 1;
197 i += 3;
198 } else if (input[i] == '+') {
199 buf[out] = ' ';
200 out += 1;
201 i += 1;
202 } else {
203 buf[out] = input[i];
204 out += 1;
205 i += 1;
206 }
207 }
208 return buf[0..out];
209}
210
211fn hexVal(c: u8) ?u4 {
212 return switch (c) {
213 '0'...'9' => @intCast(c - '0'),
214 'a'...'f' => @intCast(c - 'a' + 10),
215 'A'...'F' => @intCast(c - 'A' + 10),
216 else => null,
217 };
218}
219
220/// Debug endpoint: compare actors vs actors_fts for a DID
221/// Usage: /debug/fts?did=did:plc:xxx
222fn handleDebugFts(request: *http.Server.Request, target: []const u8, local_db: *LocalDb) !void {
223 const did = parseQueryParam(target, "did") orelse {
224 try sendJson(request, .bad_request, "{\"error\":\"missing did parameter\"}");
225 return;
226 };
227
228 var decode_buf: [512]u8 = undefined;
229 const term = percentDecode(did, &decode_buf) orelse did;
230
231 var out: std.Io.Writer.Allocating = .init(std.heap.page_allocator);
232 defer out.deinit();
233 var jw: json.Stringify = .{ .writer = &out.writer };
234
235 try jw.beginObject();
236
237 // actors table row
238 try jw.objectField("actors");
239 try writeDebugRow(&jw, local_db, "SELECT did, handle, display_name FROM actors WHERE did = ?", term);
240
241 // FTS table row for same DID
242 try jw.objectField("fts");
243 try writeDebugRow(&jw, local_db, "SELECT did, handle, display_name FROM actors_fts WHERE did = ?", term);
244
245 try jw.endObject();
246
247 try sendJson(request, .ok, out.written());
248}
249
250fn writeDebugRow(jw: anytype, local_db: *LocalDb, comptime sql: []const u8, param: []const u8) !void {
251 var rows = local_db.query(sql, .{param}) catch {
252 try jw.write("error");
253 return;
254 };
255 defer rows.deinit();
256 if (rows.next()) |row| {
257 try jw.beginObject();
258 try jw.objectField("did");
259 try jw.write(row.text(0));
260 try jw.objectField("handle");
261 try jw.write(row.text(1));
262 try jw.objectField("display_name");
263 try jw.write(row.text(2));
264 try jw.endObject();
265 } else {
266 try jw.write(null);
267 }
268}