# endpoint under test: GET /xrpc/app.bsky.actor.searchActorsTypeahead
# production host: typeahead.waow.tech
1#!/usr/bin/env -S PYTHONUNBUFFERED=1 uv run --script --quiet
2# /// script
3# requires-python = ">=3.12"
4# dependencies = ["httpx"]
5# ///
6"""
7performance benchmark: typeahead vs bluesky searchActorsTypeahead.
8
9measures latency (cold + warm), coverage/overlap, field completeness,
10display-name search, and stress-tests our API under concurrent load.
11
12usage:
13 ./scripts/bench.py # full benchmark against prod
14 ./scripts/bench.py --url http://localhost:8787 # test local dev
15 ./scripts/bench.py --quick # 10 queries, 1 run
16 ./scripts/bench.py --no-stress # skip stress test
17 ./scripts/bench.py --queries nate boorkie # specific queries only
18 ./scripts/bench.py --runs 5 # more runs for confidence
19"""
20
21import argparse
22import asyncio
23import json
24import statistics
25import sys
26import time
27from dataclasses import dataclass, field, asdict
28from datetime import datetime, timezone
29from pathlib import Path
30from urllib.parse import quote
31
32import httpx
33
# default benchmark target (our deployment) and the official AppView baseline
OURS_DEFAULT = "https://typeahead.waow.tech"
BSKY = "https://public.api.bsky.app"
# XRPC lexicon path; the same path is served by both APIs
XRPC = "/xrpc/app.bsky.actor.searchActorsTypeahead"

# colors (ANSI escape codes for terminal tables)
BOLD = "\033[1m"
GREEN = "\033[32m"
RED = "\033[31m"
YELLOW = "\033[33m"
CYAN = "\033[36m"
RESET = "\033[0m"

# default query corpus: mixes prefix lengths, unicode, multi-word display
# names, and generic terms to exercise latency, caching, and coverage
FULL_CORPUS = [
    # short prefixes
    "a", "na", "sky", "bl", "j",
    # common names
    "nate", "sarah", "alex", "dan", "paul", "sam", "chris", "jordan", "mike", "anna",
    # display-name-only terms (no matching handle expected)
    "boorkie", "kohari",
    # specific handles
    "zzstoatzz", "pfrazee", "jay.bsky.team", "alice", "bob",
    # unicode
    "André", "naïve", "café",
    # multi-word display names
    "nate kohari", "paul frazee",
    # edge cases
    "nate.io", "the", "test", "bot", "news", "art", "dev", "music",
    # longer/rarer
    "photographer", "designer", "engineer", "journalist",
    # more diversity
    "tokyo", "berlin", "podcast", "crypto", "gaming",
]

# smaller 10-query corpus selected by --quick
QUICK_CORPUS = [
    "nate", "zzstoatzz", "paul", "boorkie", "sky",
    "sarah", "André", "nate kohari", "a", "dev",
]

# queries expected to match only via displayName, never via handle
DISPLAY_NAME_QUERIES = [
    "boorkie", "kohari", "nate kohari", "paul frazee",
]

# actor fields whose presence we count for the completeness comparison
FIELDS_TO_CHECK = ["displayName", "avatar", "createdAt", "associated"]
77
78
@dataclass
class LatencyStats:
    """Per-query latency samples: ours cold (cache-busted), ours warm, bsky."""

    query: str
    ours_cold_ms: list[float] = field(default_factory=list)
    ours_warm_ms: list[float] = field(default_factory=list)
    bsky_ms: list[float] = field(default_factory=list)

    def _summarize(self, ms: list[float]) -> dict:
        """Summary stats (min/max/mean/p50/p95, 1 decimal place); {} if no samples."""
        if not ms:
            return {}
        ordered = sorted(ms)
        n = len(ordered)
        # with a single sample the 95th percentile degenerates to the max
        p95 = ordered[int(n * 0.95)] if n >= 2 else ordered[-1]
        raw = {
            "min": ordered[0],
            "max": ordered[-1],
            "mean": statistics.mean(ordered),
            "p50": ordered[n // 2],
            "p95": p95,
        }
        return {key: round(value, 1) for key, value in raw.items()}

    def summarize(self, side: str) -> dict:
        """Summarize one side: "ours_cold", "ours_warm", anything else → bsky."""
        samples = {
            "ours_cold": self.ours_cold_ms,
            "ours_warm": self.ours_warm_ms,
        }.get(side, self.bsky_ms)
        return self._summarize(samples)
104
105
@dataclass
class CoverageResult:
    """Result-set comparison for one query: our API vs the bsky baseline."""

    # the query string this comparison was run for
    query: str
    # raw actor objects returned by each side
    ours_actors: list[dict]
    bsky_actors: list[dict]
    # DIDs in result order (positions drive the rank-delta computation)
    ours_dids: list[str]
    bsky_dids: list[str]
    # DIDs present on both sides
    overlap: list[str]
    # DIDs only we returned / only bsky returned
    ours_extras: list[str]
    bsky_extras: list[str]
    # per-shared-DID absolute position difference between the two orderings
    rank_deltas: list[int]
117
118
@dataclass
class StressResult:
    """Outcome of one concurrency level of the stress test (our API only)."""

    # number of simultaneous requests issued
    concurrency: int
    # total requests sent at this level (equals concurrency)
    total: int
    # HTTP 200 responses
    ok: int
    # HTTP 429 responses
    rate_limited: int
    # everything else, including transport failures recorded as status 0
    errors: int
    # latency of every request at this level, in milliseconds
    latencies_ms: list[float] = field(default_factory=list)
127
128
def progress(label: str, done: int, total: int):
    """Render an in-place progress counter on the current terminal line."""
    print(f"\r  {label}: {done}/{total}", end="", flush=True)
132
133
def clear_line():
    """Blank the current terminal line by overwriting it with spaces."""
    blank = " " * 70
    sys.stdout.write(f"\r{blank}\r")
136
137
async def timed_fetch(
    client: httpx.AsyncClient, url: str, timeout: float = 30.0
) -> tuple[dict | None, float, int]:
    """fetch JSON, return (body, latency_ms, status_code).

    body is None for non-200 responses; status_code is 0 when the request
    (or JSON decoding) raises, so callers never need try/except.
    """
    started = time.monotonic()
    try:
        response = await client.get(url, timeout=timeout)
        elapsed_ms = (time.monotonic() - started) * 1000
        if response.status_code != 200:
            return None, elapsed_ms, response.status_code
        # .json() stays inside the try: a 200 with a bad body counts as a failure
        return response.json(), elapsed_ms, response.status_code
    except Exception:
        return None, (time.monotonic() - started) * 1000, 0
152
153
def search_url(base: str, q: str, limit: int = 10) -> str:
    """Build a searchActorsTypeahead URL against *base* with a URL-escaped query."""
    params = f"q={quote(q)}&limit={limit}"
    return f"{base}{XRPC}?{params}"
156
157
async def run_latency(
    client: httpx.AsyncClient,
    ours_url: str,
    corpus: list[str],
    runs: int,
) -> list[LatencyStats]:
    """sequential latency: cold (cache-busted) + warm (cached) + bsky baseline."""
    rows: list[LatencyStats] = []
    # per query: `runs` cold pairs (ours + bsky) plus one warm pair (prime + hit)
    expected = len(corpus) * (runs * 2 + 2)
    completed = 0

    def tick():
        nonlocal completed
        completed += 1
        progress("latency", completed, expected)

    for query in corpus:
        row = LatencyStats(query=query)

        # cold runs: vary limit to bust CF cache
        for attempt in range(runs):
            limit = 8 + (attempt % 3)

            _, ms, status = await timed_fetch(client, search_url(ours_url, query, limit))
            if status == 200:
                row.ours_cold_ms.append(ms)
            tick()
            await asyncio.sleep(1.05)

            _, ms, status = await timed_fetch(client, search_url(BSKY, query, limit))
            if status == 200:
                row.bsky_ms.append(ms)
            tick()
            await asyncio.sleep(0.2)

        # warm: first request at the fixed limit=10 primes the CF cache...
        _, ms, status = await timed_fetch(client, search_url(ours_url, query, 10))
        tick()
        await asyncio.sleep(1.05)
        # ...and the second identical request should be served from cache
        _, ms, status = await timed_fetch(client, search_url(ours_url, query, 10))
        if status == 200:
            row.ours_warm_ms.append(ms)
        tick()
        await asyncio.sleep(1.05)

        rows.append(row)

    clear_line()
    return rows
208
209
async def run_coverage_and_fields(
    client: httpx.AsyncClient,
    ours_url: str,
    corpus: list[str],
) -> tuple[list[CoverageResult], dict]:
    """compare result sets at limit=10; also compute field completeness."""
    comparisons: list[CoverageResult] = []

    # field-presence counters aggregated over every actor we see
    ours_fields = dict.fromkeys(FIELDS_TO_CHECK, 0)
    bsky_fields = dict.fromkeys(FIELDS_TO_CHECK, 0)
    ours_seen = 0
    bsky_seen = 0

    for idx, query in enumerate(corpus, start=1):
        progress("coverage+fields", idx, len(corpus))

        ours_body, _, _ = await timed_fetch(client, search_url(ours_url, query))
        await asyncio.sleep(1.05)
        bsky_body, _, _ = await timed_fetch(client, search_url(BSKY, query))
        await asyncio.sleep(0.2)

        ours_actors = (ours_body or {}).get("actors", [])
        bsky_actors = (bsky_body or {}).get("actors", [])

        ours_dids = [actor["did"] for actor in ours_actors]
        bsky_dids = [actor["did"] for actor in bsky_actors]
        ours_set, bsky_set = set(ours_dids), set(bsky_dids)
        shared = list(ours_set & bsky_set)

        # rank delta: how far each shared DID moved between the two orderings
        ours_rank = {did: pos for pos, did in enumerate(ours_dids)}
        bsky_rank = {did: pos for pos, did in enumerate(bsky_dids)}
        deltas = [
            abs(ours_rank[did] - bsky_rank[did])
            for did in shared
            if did in ours_rank and did in bsky_rank
        ]

        comparisons.append(CoverageResult(
            query=query,
            ours_actors=ours_actors,
            bsky_actors=bsky_actors,
            ours_dids=ours_dids,
            bsky_dids=bsky_dids,
            overlap=shared,
            ours_extras=list(ours_set - bsky_set),
            bsky_extras=list(bsky_set - ours_set),
            rank_deltas=deltas,
        ))

        # field completeness
        ours_seen += len(ours_actors)
        bsky_seen += len(bsky_actors)
        for name in FIELDS_TO_CHECK:
            ours_fields[name] += sum(1 for actor in ours_actors if actor.get(name))
            bsky_fields[name] += sum(1 for actor in bsky_actors if actor.get(name))

    clear_line()

    return comparisons, {
        "ours_total": ours_seen,
        "bsky_total": bsky_seen,
        "ours": ours_fields,
        "bsky": bsky_fields,
    }
274
275
async def run_display_name_check(
    client: httpx.AsyncClient,
    ours_url: str,
    queries: list[str],
) -> list[dict]:
    """verify display-name-only queries return results."""

    def sample(actors: list[dict]) -> list[str]:
        # first three handles, falling back to DID, then "?"
        return [a.get("handle", a.get("did", "?")) for a in actors[:3]]

    rows = []
    for query in queries:
        ours_body, _, _ = await timed_fetch(client, search_url(ours_url, query))
        await asyncio.sleep(1.05)
        bsky_body, _, _ = await timed_fetch(client, search_url(BSKY, query))
        await asyncio.sleep(0.2)

        ours_actors = (ours_body or {}).get("actors", [])
        bsky_actors = (bsky_body or {}).get("actors", [])
        rows.append({
            "query": query,
            "ours_count": len(ours_actors),
            "bsky_count": len(bsky_actors),
            "found": bool(ours_actors),
            "ours_sample": sample(ours_actors),
            "bsky_sample": sample(bsky_actors),
        })

    return rows
301
302
async def run_stress(
    client: httpx.AsyncClient,
    ours_url: str,
    corpus: list[str],
    levels: list[int],
) -> list[StressResult]:
    """concurrent request stress test (our API only)."""
    outcomes: list[StressResult] = []

    for level in levels:
        sys.stdout.write(f"\r  stress: concurrency={level}...")
        sys.stdout.flush()

        # repeat the corpus until we have exactly `level` queries
        batch = (corpus * ((level // len(corpus)) + 1))[:level]
        in_flight = [
            timed_fetch(client, search_url(ours_url, query, limit=7 + (pos % 4)))
            for pos, query in enumerate(batch)
        ]
        replies = await asyncio.gather(*in_flight)

        outcome = StressResult(concurrency=level, total=level, ok=0, rate_limited=0, errors=0)
        for _, ms, status in replies:
            outcome.latencies_ms.append(ms)
            if status == 200:
                outcome.ok += 1
            elif status == 429:
                outcome.rate_limited += 1
            else:
                outcome.errors += 1

        outcomes.append(outcome)
        # cool-down so one burst doesn't rate-limit the next level
        await asyncio.sleep(5)

    clear_line()
    return outcomes
339
340
341# ── printing ────────────────────────────────────────────────────────
342
def pct(n: int, total: int) -> str:
    """Integer percentage like "42%", or "n/a" when total is zero."""
    if not total:
        return "n/a"
    return f"{n * 100 / total:.0f}%"
345
346
def fmt_ms(ms: float) -> str:
    """Human-friendly duration: whole milliseconds below 1s, else seconds."""
    return f"{ms / 1000:.1f}s" if ms >= 1000 else f"{ms:.0f}ms"
351
352
def print_latency_table(stats_list: list[LatencyStats]):
    """Print per-query cold latency (ours vs bsky) and cold/warm aggregates.

    Queries lacking successful samples on either side are skipped.
    Fix: the "tie" winner cell previously had no ANSI codes but was padded
    with the same width (17) chosen to absorb the 9 invisible escape chars in
    the colored "ours"/"bsky" variants, so tie rows were over-padded by 9
    columns and the winner column misaligned.
    """
    print(f"\n{BOLD}--- latency (cold, cache-busted) ---{RESET}")
    header = f"  {'query':<20} {'ours':>10} {'bsky':>10} {'delta':>10} {'winner':>8}"
    print(header)
    print(f"  {'─' * 20} {'─' * 10} {'─' * 10} {'─' * 10} {'─' * 8}")

    all_ours_cold: list[float] = []
    all_bsky: list[float] = []
    ours_wins = 0
    total_compared = 0

    for s in stats_list:
        oc = s.summarize("ours_cold")
        b = s.summarize("bsky")
        if not oc or not b:
            continue  # no successful samples on one side; nothing to compare

        all_ours_cold.extend(s.ours_cold_ms)
        all_bsky.extend(s.bsky_ms)

        op50, bp50 = oc["p50"], b["p50"]
        delta = op50 - bp50
        total_compared += 1
        if delta < 0:
            ours_wins += 1
            w_str = f"{GREEN}ours{RESET}"
        elif delta > 0:
            w_str = f"{RED}bsky{RESET}"
        else:
            # wrap in color codes so this cell carries the same 9 invisible
            # ANSI chars as the ours/bsky variants and stays aligned below
            w_str = f"{YELLOW}tie{RESET}"

        sign = "+" if delta > 0 else ("-" if delta < 0 else "")
        d_str = f"{sign}{fmt_ms(abs(delta))}"
        # winner width 17 = 8 visible columns + 9 invisible ANSI characters
        print(f"  {s.query:<20} {fmt_ms(op50):>10} {fmt_ms(bp50):>10} {d_str:>10} {w_str:>17}")

    if all_ours_cold and all_bsky:
        oc50 = sorted(all_ours_cold)[len(all_ours_cold) // 2]
        oc95 = sorted(all_ours_cold)[int(len(all_ours_cold) * 0.95)]
        b50 = sorted(all_bsky)[len(all_bsky) // 2]
        b95 = sorted(all_bsky)[int(len(all_bsky) * 0.95)]
        print()
        print(f"  {BOLD}cold:{RESET} ours p50={fmt_ms(oc50)} p95={fmt_ms(oc95)} | bsky p50={fmt_ms(b50)} p95={fmt_ms(b95)}")
        print(f"  ours faster on {ours_wins}/{total_compared} queries (cold)")

    # warm summary (second hit on the same URL, expected CF cache hits)
    all_warm: list[float] = []
    for s in stats_list:
        all_warm.extend(s.ours_warm_ms)
    if all_warm:
        w50 = sorted(all_warm)[len(all_warm) // 2]
        w95 = sorted(all_warm)[int(len(all_warm) * 0.95)] if len(all_warm) >= 2 else max(all_warm)
        print(f"  {BOLD}warm:{RESET} ours p50={fmt_ms(w50)} p95={fmt_ms(w95)} (CF cache hit)")
404
405
def print_coverage_table(results: list[CoverageResult]):
    """Print per-query overlap between our results and bsky's, plus totals."""
    print(f"\n{BOLD}--- coverage ---{RESET}")
    print(f"  {'query':<20} {'overlap':>10} {'ours':>6} {'bsky':>6} {'pct':>6} {'rank Δ':>8}")
    print(f"  {'─' * 20} {'─' * 10} {'─' * 6} {'─' * 6} {'─' * 6} {'─' * 8}")

    total_overlap = 0
    total_bsky = 0
    complete_misses = 0
    we_have_more = 0

    for row in results:
        n_overlap = len(row.overlap)
        n_ours = len(row.ours_dids)
        n_bsky = len(row.bsky_dids)
        total_overlap += n_overlap
        total_bsky += n_bsky

        overlap_pct = f"{n_overlap * 100 // n_bsky}%" if n_bsky else "n/a"
        avg_delta = f"{statistics.mean(row.rank_deltas):.1f}" if row.rank_deltas else "—"

        # a "complete miss" is a query where bsky found actors and we found none
        complete_misses += 1 if (n_ours == 0 and n_bsky > 0) else 0
        we_have_more += 1 if n_ours > n_bsky else 0

        print(f"  {row.query:<20} {n_overlap:>3}/{n_bsky:<6} {n_ours:>6} {n_bsky:>6} {overlap_pct:>6} {avg_delta:>8}")

    print()
    mean_pct = total_overlap * 100 / total_bsky if total_bsky else 0
    print(f"  {BOLD}mean overlap:{RESET} {mean_pct:.0f}%")
    print(f"  complete misses (ours=0, bsky>0): {complete_misses}/{len(results)}")
    print(f"  queries where we have more results: {we_have_more}/{len(results)}")
438
439
def print_field_table(field_summary: dict):
    """Print how often each checked field is populated, ours vs bsky."""
    print(f"\n{BOLD}--- field completeness ---{RESET}")
    ours_total = field_summary["ours_total"]
    bsky_total = field_summary["bsky_total"]
    rule = f"  {'─' * 16} {'─' * 8} {'─' * 8}"

    print(f"  {'field':<16} {'ours':>8} {'bsky':>8}")
    print(rule)
    for name in FIELDS_TO_CHECK:
        ours_pct = pct(field_summary["ours"][name], ours_total)
        bsky_pct = pct(field_summary["bsky"][name], bsky_total)
        print(f"  {name:<16} {ours_pct:>8} {bsky_pct:>8}")
    print(rule)
    print(f"  {'total actors':<16} {ours_total:>8} {bsky_total:>8}")
453
454
def print_display_name_table(results: list[dict]):
    """Print whether each display-name-only query returned anything, with samples."""
    print(f"\n{BOLD}--- display name search ---{RESET}")
    print(f"  {'query':<20} {'found?':>8} {'ours':>6} {'bsky':>6} samples")
    print(f"  {'─' * 20} {'─' * 8} {'─' * 6} {'─' * 6} {'─' * 30}")
    for row in results:
        verdict = f"{GREEN}yes{RESET}" if row["found"] else f"{RED}no{RESET}"
        sample_str = ", ".join(row["ours_sample"][:3]) if row["ours_sample"] else "—"
        # width 17 = 8 visible columns + 9 invisible ANSI escape characters
        print(f"  {row['query']:<20} {verdict:>17} {row['ours_count']:>6} {row['bsky_count']:>6} {sample_str}")
463
464
def print_stress_table(results: list[StressResult]):
    """Print ok/429/error counts and latency percentiles per concurrency level."""
    print(f"\n{BOLD}--- stress test (ours only) ---{RESET}")
    print(f"  {'concurrency':>12} {'ok':>6} {'429s':>6} {'5xx':>6} {'p50':>8} {'p95':>8}")
    print(f"  {'─' * 12} {'─' * 6} {'─' * 6} {'─' * 6} {'─' * 8} {'─' * 8}")
    for row in results:
        ordered = sorted(row.latencies_ms)
        p50 = fmt_ms(ordered[len(ordered) // 2]) if ordered else "—"
        p95 = fmt_ms(ordered[int(len(ordered) * 0.95)]) if len(ordered) >= 2 else p50
        print(f"  {row.concurrency:>12} {row.ok:>6} {row.rate_limited:>6} {row.errors:>6} {p50:>8} {p95:>8}")
474
475
476# ── JSON report ─────────────────────────────────────────────────────
477
def build_report(
    ours_url: str,
    corpus: list[str],
    runs: int,
    latency: list[LatencyStats],
    coverage: list[CoverageResult],
    field_summary: dict,
    display_name: list[dict],
    stress: list[StressResult],
) -> dict:
    """Assemble the JSON-serializable benchmark report from all section results."""

    def latency_entry(s: LatencyStats) -> dict:
        return {
            "query": s.query,
            "ours_cold": s.summarize("ours_cold"),
            "ours_warm": s.summarize("ours_warm"),
            "bsky": s.summarize("bsky"),
        }

    def coverage_entry(r: CoverageResult) -> dict:
        bsky_n = len(r.bsky_dids)
        return {
            "query": r.query,
            "ours_count": len(r.ours_dids),
            "bsky_count": bsky_n,
            "overlap_count": len(r.overlap),
            "overlap_pct": round(len(r.overlap) * 100 / bsky_n, 1) if bsky_n else None,
            "ours_extras": len(r.ours_extras),
            "bsky_extras": len(r.bsky_extras),
            "avg_rank_delta": round(statistics.mean(r.rank_deltas), 2) if r.rank_deltas else None,
        }

    def stress_entry(r: StressResult) -> dict:
        ordered = sorted(r.latencies_ms)
        return {
            "concurrency": r.concurrency,
            "ok": r.ok,
            "rate_limited": r.rate_limited,
            "errors": r.errors,
            "p50_ms": round(ordered[len(ordered) // 2], 1) if ordered else None,
            "p95_ms": round(ordered[int(len(ordered) * 0.95)], 1) if len(ordered) >= 2 else None,
        }

    meta = {
        "target": ours_url,
        "baseline": BSKY,
        "corpus_size": len(corpus),
        "runs": runs,
        "date": datetime.now(timezone.utc).isoformat(),
    }
    return {
        "meta": meta,
        "latency": [latency_entry(s) for s in latency],
        "coverage": [coverage_entry(r) for r in coverage],
        "field_completeness": field_summary,
        "display_name_search": display_name,
        "stress": [stress_entry(r) for r in stress],
    }
533
534
535# ── main ────────────────────────────────────────────────────────────
536
async def main() -> None:
    """Parse CLI flags, run all benchmark sections, print tables, write JSON report."""
    parser = argparse.ArgumentParser(description="typeahead performance benchmark")
    parser.add_argument("--url", default=OURS_DEFAULT, help=f"our API URL (default: {OURS_DEFAULT})")
    parser.add_argument("--quick", action="store_true", help="10 queries, 1 run")
    parser.add_argument("--no-stress", action="store_true", help="skip stress test")
    parser.add_argument("--queries", nargs="+", help="specific queries only")
    parser.add_argument("--runs", type=int, default=3, help="runs per query for latency (default: 3)")
    parser.add_argument("--output", default="scripts/bench-results.json", help="JSON report path")
    args = parser.parse_args()

    # --quick forces a single latency run per query (overrides any --runs value)
    if args.quick:
        args.runs = 1

    # explicit --queries wins; otherwise pick the quick or full corpus
    corpus = args.queries or (QUICK_CORPUS if args.quick else FULL_CORPUS)
    # display-name section only checks queries present in the corpus,
    # falling back to the first two canonical ones when none are
    dn_queries = [q for q in DISPLAY_NAME_QUERIES if q in corpus] or DISPLAY_NAME_QUERIES[:2]

    print(f"\n{BOLD}=== typeahead benchmark ==={RESET}")
    print(f"  target: {args.url}")
    print(f"  baseline: {BSKY}")
    print(f"  corpus: {len(corpus)} queries, {args.runs} run(s) each")
    print(f"  date: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')}")

    # one shared HTTP client for every benchmark section
    async with httpx.AsyncClient(
        headers={"User-Agent": "typeahead-bench/1.0", "X-Client": "bench"},
        follow_redirects=True,
    ) as client:
        # 1. latency (cold + warm)
        print(f"\n{CYAN}[1/4] latency comparison (cold + warm){RESET}")
        latency = await run_latency(client, args.url, corpus, args.runs)
        print_latency_table(latency)

        # 2. coverage + field completeness (single pass)
        print(f"\n{CYAN}[2/4] coverage + field completeness{RESET}")
        coverage, field_summary = await run_coverage_and_fields(client, args.url, corpus)
        print_coverage_table(coverage)
        print_field_table(field_summary)

        # 3. display name search
        print(f"\n{CYAN}[3/4] display name search{RESET}")
        display_name = await run_display_name_check(client, args.url, dn_queries)
        print_display_name_table(display_name)

        # 4. stress test (stress stays [] when skipped; build_report handles that)
        stress = []
        if args.no_stress:
            print(f"\n{CYAN}[4/4] stress test{RESET}")
            print(f"  {YELLOW}skipped{RESET}")
        else:
            print(f"\n{CYAN}[4/4] stress test (ours only){RESET}")
            stress = await run_stress(client, args.url, corpus, [5, 10, 20])
            print_stress_table(stress)

        # write report (directory is created if missing)
        report = build_report(args.url, corpus, args.runs, latency, coverage, field_summary, display_name, stress)
        out_path = Path(args.output)
        out_path.parent.mkdir(parents=True, exist_ok=True)
        out_path.write_text(json.dumps(report, indent=2) + "\n")
        print(f"\n  full report: {out_path}")
        print()
596
597
# script entry point: the whole benchmark runs as one asyncio program
if __name__ == "__main__":
    asyncio.run(main())