#!/usr/bin/env -S PYTHONUNBUFFERED=1 uv run --script --quiet # /// script # requires-python = ">=3.12" # dependencies = ["httpx"] # /// """ performance benchmark: typeahead vs bluesky searchActorsTypeahead. measures latency (cold + warm), coverage/overlap, field completeness, display-name search, and stress-tests our API under concurrent load. usage: ./scripts/bench.py # full benchmark against prod ./scripts/bench.py --url http://localhost:8787 # test local dev ./scripts/bench.py --quick # 10 queries, 1 run ./scripts/bench.py --no-stress # skip stress test ./scripts/bench.py --queries nate boorkie # specific queries only ./scripts/bench.py --runs 5 # more runs for confidence """ import argparse import asyncio import json import statistics import sys import time from dataclasses import dataclass, field, asdict from datetime import datetime, timezone from pathlib import Path from urllib.parse import quote import httpx OURS_DEFAULT = "https://typeahead.waow.tech" BSKY = "https://public.api.bsky.app" XRPC = "/xrpc/app.bsky.actor.searchActorsTypeahead" # colors BOLD = "\033[1m" GREEN = "\033[32m" RED = "\033[31m" YELLOW = "\033[33m" CYAN = "\033[36m" RESET = "\033[0m" FULL_CORPUS = [ # short prefixes "a", "na", "sky", "bl", "j", # common names "nate", "sarah", "alex", "dan", "paul", "sam", "chris", "jordan", "mike", "anna", # display-name-only terms (no matching handle expected) "boorkie", "kohari", # specific handles "zzstoatzz", "pfrazee", "jay.bsky.team", "alice", "bob", # unicode "André", "naïve", "café", # multi-word display names "nate kohari", "paul frazee", # edge cases "nate.io", "the", "test", "bot", "news", "art", "dev", "music", # longer/rarer "photographer", "designer", "engineer", "journalist", # more diversity "tokyo", "berlin", "podcast", "crypto", "gaming", ] QUICK_CORPUS = [ "nate", "zzstoatzz", "paul", "boorkie", "sky", "sarah", "André", "nate kohari", "a", "dev", ] DISPLAY_NAME_QUERIES = [ "boorkie", "kohari", "nate kohari", "paul frazee", ] 
FIELDS_TO_CHECK = ["displayName", "avatar", "createdAt", "associated"]


@dataclass
class LatencyStats:
    """latency samples for one query: ours cold (cache-busted), ours warm (cached), bsky baseline."""

    query: str
    ours_cold_ms: list[float] = field(default_factory=list)
    ours_warm_ms: list[float] = field(default_factory=list)
    bsky_ms: list[float] = field(default_factory=list)

    def _summarize(self, ms: list[float]) -> dict:
        """Nearest-rank summary stats (ms, 1 decimal) over the samples; {} when empty."""
        if not ms:
            return {}
        s = sorted(ms)
        return {
            "min": round(min(s), 1),
            "max": round(max(s), 1),
            "mean": round(statistics.mean(s), 1),
            "p50": round(s[len(s) // 2], 1),
            # with a single sample the p95 index degenerates; just report the max
            "p95": round(s[int(len(s) * 0.95)], 1) if len(s) >= 2 else round(max(s), 1),
        }

    def summarize(self, side: str) -> dict:
        """Summarize one side: 'ours_cold', 'ours_warm', anything else means bsky."""
        if side == "ours_cold":
            return self._summarize(self.ours_cold_ms)
        if side == "ours_warm":
            return self._summarize(self.ours_warm_ms)
        return self._summarize(self.bsky_ms)


@dataclass
class CoverageResult:
    """result-set comparison for one query (ours vs bsky, limit=10)."""

    query: str
    ours_actors: list[dict]
    bsky_actors: list[dict]
    ours_dids: list[str]
    bsky_dids: list[str]
    overlap: list[str]       # DIDs returned by both sides
    ours_extras: list[str]   # DIDs only we returned
    bsky_extras: list[str]   # DIDs only bsky returned
    rank_deltas: list[int]   # |our rank - bsky rank| per overlapping DID


@dataclass
class StressResult:
    """outcome counts + latencies for one concurrency level of the stress test."""

    concurrency: int
    total: int
    ok: int
    rate_limited: int
    errors: int
    latencies_ms: list[float] = field(default_factory=list)


def progress(label: str, done: int, total: int):
    """Overwrite the current terminal line with a `label: done/total` counter."""
    sys.stdout.write(f"\r {label}: {done}/{total}")
    sys.stdout.flush()


def clear_line():
    """Blank out the progress line so the next print starts clean."""
    sys.stdout.write("\r" + " " * 70 + "\r")


async def timed_fetch(
    client: httpx.AsyncClient, url: str, timeout: float = 30.0
) -> tuple[dict | None, float, int]:
    """fetch JSON, return (body, latency_ms, status_code).

    body is None on any non-200 response; status_code is 0 when the request
    itself failed (timeout, connection error, ...). latency is always measured.
    """
    t0 = time.monotonic()
    try:
        r = await client.get(url, timeout=timeout)
        ms = (time.monotonic() - t0) * 1000
        if r.status_code == 200:
            return r.json(), ms, r.status_code
        return None, ms, r.status_code
    except Exception:
        ms = (time.monotonic() - t0) * 1000
        return None, ms, 0


def search_url(base: str, q: str, limit: int = 10) -> str:
    """Build the typeahead search URL for `base` with a URL-encoded query."""
    return f"{base}{XRPC}?q={quote(q)}&limit={limit}"


async def run_latency(
    client: httpx.AsyncClient,
    ours_url: str,
    corpus: list[str],
    runs: int,
) -> list[LatencyStats]:
    """sequential latency: cold (cache-busted) + warm (cached) + bsky baseline."""
    results = []
    # cold: runs * 2 reqs/query (ours+bsky), warm: 2 reqs/query (ours+bsky)
    total = len(corpus) * (runs * 2 + 2)
    done = 0
    for q in corpus:
        stats = LatencyStats(query=q)
        # cold runs: vary limit to bust CF cache
        # NOTE(review): limits cycle 8/9/10, so with runs > 3 "cold" repeats
        # a previously-seen limit and may hit the cache — confirm if runs > 3 matters
        for run_i in range(runs):
            limit = 8 + (run_i % 3)
            _, ms, status = await timed_fetch(client, search_url(ours_url, q, limit))
            if status == 200:
                stats.ours_cold_ms.append(ms)
            done += 1
            progress("latency", done, total)
            await asyncio.sleep(1.05)  # stay under our per-second rate limit
            _, ms, status = await timed_fetch(client, search_url(BSKY, q, limit))
            if status == 200:
                stats.bsky_ms.append(ms)
            done += 1
            progress("latency", done, total)
            await asyncio.sleep(0.2)
        # warm run: repeat exact same request (limit=10, should hit CF cache)
        _, ms, status = await timed_fetch(client, search_url(ours_url, q, 10))
        done += 1
        progress("latency", done, total)
        await asyncio.sleep(1.05)
        # second hit — this one should be cached
        _, ms, status = await timed_fetch(client, search_url(ours_url, q, 10))
        if status == 200:
            stats.ours_warm_ms.append(ms)
        done += 1
        progress("latency", done, total)
        await asyncio.sleep(1.05)
        results.append(stats)
    clear_line()
    return results


async def run_coverage_and_fields(
    client: httpx.AsyncClient,
    ours_url: str,
    corpus: list[str],
) -> tuple[list[CoverageResult], dict]:
    """compare result sets at limit=10; also compute field completeness."""
    coverage_results = []
    # aggregate field counts
    ours_field_counts = {f: 0 for f in FIELDS_TO_CHECK}
    bsky_field_counts = {f: 0 for f in FIELDS_TO_CHECK}
    ours_actor_total = 0
    bsky_actor_total = 0
    for i, q in enumerate(corpus):
        progress("coverage+fields", i + 1, len(corpus))
        ours_data, _, _ = await timed_fetch(client, search_url(ours_url, q))
        await asyncio.sleep(1.05)
        bsky_data, _, _ = await timed_fetch(client, search_url(BSKY, q))
        await asyncio.sleep(0.2)
        ours_actors = (ours_data or {}).get("actors", [])
        bsky_actors = (bsky_data or {}).get("actors", [])
        ours_dids = [a["did"] for a in ours_actors]
        bsky_dids = [a["did"] for a in bsky_actors]
        ours_set = set(ours_dids)
        bsky_set = set(bsky_dids)
        overlap = list(ours_set & bsky_set)
        # rank delta: how far apart the same DID appears in the two result lists
        ours_pos = {d: i for i, d in enumerate(ours_dids)}
        bsky_pos = {d: i for i, d in enumerate(bsky_dids)}
        rank_deltas = [abs(ours_pos[d] - bsky_pos[d])
                       for d in overlap if d in ours_pos and d in bsky_pos]
        coverage_results.append(CoverageResult(
            query=q,
            ours_actors=ours_actors,
            bsky_actors=bsky_actors,
            ours_dids=ours_dids,
            bsky_dids=bsky_dids,
            overlap=overlap,
            ours_extras=list(ours_set - bsky_set),
            bsky_extras=list(bsky_set - ours_set),
            rank_deltas=rank_deltas,
        ))
        # field completeness: count truthy values per field on each side
        ours_actor_total += len(ours_actors)
        bsky_actor_total += len(bsky_actors)
        for f in FIELDS_TO_CHECK:
            ours_field_counts[f] += sum(1 for a in ours_actors if a.get(f))
            bsky_field_counts[f] += sum(1 for a in bsky_actors if a.get(f))
    clear_line()
    field_summary = {
        "ours_total": ours_actor_total,
        "bsky_total": bsky_actor_total,
        "ours": ours_field_counts,
        "bsky": bsky_field_counts,
    }
    return coverage_results, field_summary


async def run_display_name_check(
    client: httpx.AsyncClient,
    ours_url: str,
    queries: list[str],
) -> list[dict]:
    """verify display-name-only queries return results."""
    results = []
    for q in queries:
        ours_data, _, _ = await timed_fetch(client, search_url(ours_url, q))
        await asyncio.sleep(1.05)
        bsky_data, _, _ = await timed_fetch(client, search_url(BSKY, q))
        await asyncio.sleep(0.2)
        ours_actors = (ours_data or {}).get("actors", [])
        bsky_actors = (bsky_data or {}).get("actors", [])
        results.append({
            "query": q,
            "ours_count": len(ours_actors),
            "bsky_count": len(bsky_actors),
            "found": len(ours_actors) > 0,
            "ours_sample": [a.get("handle", a.get("did", "?")) for a in ours_actors[:3]],
            "bsky_sample": [a.get("handle", a.get("did", "?")) for a in bsky_actors[:3]],
        })
    return results


async def run_stress(
    client: httpx.AsyncClient,
    ours_url: str,
    corpus: list[str],
    levels: list[int],
) -> list[StressResult]:
    """concurrent request stress test (our API only)."""
    results = []
    for n in levels:
        sys.stdout.write(f"\r stress: concurrency={n}...")
        sys.stdout.flush()
        # tile the corpus out to n queries, varying limit to avoid cache hits
        queries = (corpus * ((n // len(corpus)) + 1))[:n]
        tasks = [
            timed_fetch(client, search_url(ours_url, q, limit=7 + (i % 4)))
            for i, q in enumerate(queries)
        ]
        responses = await asyncio.gather(*tasks)
        sr = StressResult(concurrency=n, total=n, ok=0, rate_limited=0, errors=0)
        for _body, ms, status in responses:
            sr.latencies_ms.append(ms)
            if status == 200:
                sr.ok += 1
            elif status == 429:
                sr.rate_limited += 1
            else:
                sr.errors += 1
        results.append(sr)
        await asyncio.sleep(5)  # let any rate-limit window reset between levels
    clear_line()
    return results


# ── printing ────────────────────────────────────────────────────────


def pct(n: int, total: int) -> str:
    """Format n/total as a whole percent; 'n/a' when total is 0."""
    return f"{n * 100 / total:.0f}%" if total else "n/a"


def fmt_ms(ms: float) -> str:
    """Human-readable latency: '843ms' below one second, '1.2s' above."""
    if ms >= 1000:
        return f"{ms / 1000:.1f}s"
    return f"{ms:.0f}ms"


def print_latency_table(stats_list: list[LatencyStats]):
    """Per-query cold latency table plus cold/warm aggregate summaries."""
    print(f"\n{BOLD}--- latency (cold, cache-busted) ---{RESET}")
    header = f" {'query':<20} {'ours':>10} {'bsky':>10} {'delta':>10} {'winner':>8}"
    print(header)
    print(f" {'─' * 20} {'─' * 10} {'─' * 10} {'─' * 10} {'─' * 8}")
    all_ours_cold = []
    all_bsky = []
    ours_wins = 0
    total_compared = 0
    for s in stats_list:
        oc = s.summarize("ours_cold")
        b = s.summarize("bsky")
        if not oc or not b:
            continue  # skip queries where one side had no successful samples
        all_ours_cold.extend(s.ours_cold_ms)
        all_bsky.extend(s.bsky_ms)
        op50, bp50 = oc["p50"], b["p50"]
        delta = op50 - bp50
        total_compared += 1
        if delta < 0:
            ours_wins += 1
            w_str = f"{GREEN}ours{RESET}"
        elif delta > 0:
            w_str = f"{RED}bsky{RESET}"
        else:
            w_str = "tie"
        # colored strings carry 9 invisible ANSI chars; widen the field for them
        # so colored and plain ("tie") winners land in the same column.
        w_pad = 17 if RESET in w_str else 8
        sign = "+" if delta > 0 else ("-" if delta < 0 else "")
        d_str = f"{sign}{fmt_ms(abs(delta))}"
        print(f" {s.query:<20} {fmt_ms(op50):>10} {fmt_ms(bp50):>10} {d_str:>10} {w_str:>{w_pad}}")
    if all_ours_cold and all_bsky:
        oc50 = sorted(all_ours_cold)[len(all_ours_cold) // 2]
        oc95 = sorted(all_ours_cold)[int(len(all_ours_cold) * 0.95)]
        b50 = sorted(all_bsky)[len(all_bsky) // 2]
        b95 = sorted(all_bsky)[int(len(all_bsky) * 0.95)]
        print()
        print(f" {BOLD}cold:{RESET} ours p50={fmt_ms(oc50)} p95={fmt_ms(oc95)} | bsky p50={fmt_ms(b50)} p95={fmt_ms(b95)}")
        print(f" ours faster on {ours_wins}/{total_compared} queries (cold)")
    # warm summary
    all_warm = []
    for s in stats_list:
        all_warm.extend(s.ours_warm_ms)
    if all_warm:
        w50 = sorted(all_warm)[len(all_warm) // 2]
        w95 = sorted(all_warm)[int(len(all_warm) * 0.95)] if len(all_warm) >= 2 else max(all_warm)
        print(f" {BOLD}warm:{RESET} ours p50={fmt_ms(w50)} p95={fmt_ms(w95)} (CF cache hit)")


def print_coverage_table(results: list[CoverageResult]):
    """Per-query overlap table plus aggregate overlap/miss statistics."""
    print(f"\n{BOLD}--- coverage ---{RESET}")
    print(f" {'query':<20} {'overlap':>10} {'ours':>6} {'bsky':>6} {'pct':>6} {'rank Δ':>8}")
    print(f" {'─' * 20} {'─' * 10} {'─' * 6} {'─' * 6} {'─' * 6} {'─' * 8}")
    total_overlap = 0
    total_bsky = 0
    complete_misses = 0
    we_have_more = 0
    for r in results:
        n_overlap = len(r.overlap)
        n_bsky = len(r.bsky_dids)
        n_ours = len(r.ours_dids)
        total_overlap += n_overlap
        total_bsky += n_bsky
        p = f"{n_overlap * 100 // n_bsky}%" if n_bsky else "n/a"
        avg_delta = f"{statistics.mean(r.rank_deltas):.1f}" if r.rank_deltas else "—"
        if n_ours == 0 and n_bsky > 0:
            complete_misses += 1
        if n_ours > n_bsky:
            we_have_more += 1
        print(f" {r.query:<20} {n_overlap:>3}/{n_bsky:<6} {n_ours:>6} {n_bsky:>6} {p:>6} {avg_delta:>8}")
    print()
    mean_pct = total_overlap * 100 / total_bsky if total_bsky else 0
    print(f" {BOLD}mean overlap:{RESET} {mean_pct:.0f}%")
    print(f" complete misses (ours=0, bsky>0): {complete_misses}/{len(results)}")
    print(f" queries where we have more results: {we_have_more}/{len(results)}")


def print_field_table(field_summary: dict):
    """Field completeness (% of actors with each field set), ours vs bsky."""
    print(f"\n{BOLD}--- field completeness ---{RESET}")
    ours_total = field_summary["ours_total"]
    bsky_total = field_summary["bsky_total"]
    print(f" {'field':<16} {'ours':>8} {'bsky':>8}")
    print(f" {'─' * 16} {'─' * 8} {'─' * 8}")
    for f in FIELDS_TO_CHECK:
        o = pct(field_summary["ours"][f], ours_total)
        b = pct(field_summary["bsky"][f], bsky_total)
        print(f" {f:<16} {o:>8} {b:>8}")
    print(f" {'─' * 16} {'─' * 8} {'─' * 8}")
    print(f" {'total actors':<16} {ours_total:>8} {bsky_total:>8}")


def print_display_name_table(results: list[dict]):
    """Display-name search results: found?, counts, and sample handles."""
    print(f"\n{BOLD}--- display name search ---{RESET}")
    print(f" {'query':<20} {'found?':>8} {'ours':>6} {'bsky':>6} samples")
    print(f" {'─' * 20} {'─' * 8} {'─' * 6} {'─' * 6} {'─' * 30}")
    for r in results:
        found = f"{GREEN}yes{RESET}" if r["found"] else f"{RED}no{RESET}"
        samples = ", ".join(r["ours_sample"][:3]) if r["ours_sample"] else "—"
        # :>17 = 8 visible columns + 9 invisible ANSI chars (found is always colored)
        print(f" {r['query']:<20} {found:>17} {r['ours_count']:>6} {r['bsky_count']:>6} {samples}")


def print_stress_table(results: list[StressResult]):
    """Stress-test outcomes per concurrency level."""
    print(f"\n{BOLD}--- stress test (ours only) ---{RESET}")
    print(f" {'concurrency':>12} {'ok':>6} {'429s':>6} {'5xx':>6} {'p50':>8} {'p95':>8}")
    print(f" {'─' * 12} {'─' * 6} {'─' * 6} {'─' * 6} {'─' * 8} {'─' * 8}")
    for r in results:
        lats = sorted(r.latencies_ms)
        p50 = fmt_ms(lats[len(lats) // 2]) if lats else "—"
        p95 = fmt_ms(lats[int(len(lats) * 0.95)]) if len(lats) >= 2 else p50
        print(f" {r.concurrency:>12} {r.ok:>6} {r.rate_limited:>6} {r.errors:>6} {p50:>8} {p95:>8}")


# ── JSON report ─────────────────────────────────────────────────────


def build_report(
    ours_url: str,
    corpus: list[str],
    runs: int,
    latency: list[LatencyStats],
    coverage: list[CoverageResult],
    field_summary: dict,
    display_name: list[dict],
    stress: list[StressResult],
) -> dict:
    """Assemble the machine-readable JSON report from all benchmark phases."""

    def latency_entry(s: LatencyStats) -> dict:
        return {
            "query": s.query,
            "ours_cold": s.summarize("ours_cold"),
            "ours_warm": s.summarize("ours_warm"),
            "bsky": s.summarize("bsky"),
        }

    def coverage_entry(r: CoverageResult) -> dict:
        return {
            "query": r.query,
            "ours_count": len(r.ours_dids),
            "bsky_count": len(r.bsky_dids),
            "overlap_count": len(r.overlap),
            "overlap_pct": round(len(r.overlap) * 100 / len(r.bsky_dids), 1) if r.bsky_dids else None,
            "ours_extras": len(r.ours_extras),
            "bsky_extras": len(r.bsky_extras),
            "avg_rank_delta": round(statistics.mean(r.rank_deltas), 2) if r.rank_deltas else None,
        }

    def stress_entry(r: StressResult) -> dict:
        lats = sorted(r.latencies_ms)
        return {
            "concurrency": r.concurrency,
            "ok": r.ok,
            "rate_limited": r.rate_limited,
            "errors": r.errors,
            "p50_ms": round(lats[len(lats) // 2], 1) if lats else None,
            "p95_ms": round(lats[int(len(lats) * 0.95)], 1) if len(lats) >= 2 else None,
        }

    return {
        "meta": {
            "target": ours_url,
            "baseline": BSKY,
            "corpus_size": len(corpus),
            "runs": runs,
            "date": datetime.now(timezone.utc).isoformat(),
        },
        "latency": [latency_entry(s) for s in latency],
        "coverage": [coverage_entry(r) for r in coverage],
        "field_completeness": field_summary,
        "display_name_search": display_name,
        "stress": [stress_entry(r) for r in stress],
    }


# ── main ────────────────────────────────────────────────────────────


async def main():
    """Parse args, run the four benchmark phases, print tables, write JSON report."""
    parser = argparse.ArgumentParser(description="typeahead performance benchmark")
    parser.add_argument("--url", default=OURS_DEFAULT, help=f"our API URL (default: {OURS_DEFAULT})")
    parser.add_argument("--quick", action="store_true", help="10 queries, 1 run")
    parser.add_argument("--no-stress", action="store_true", help="skip stress test")
    parser.add_argument("--queries", nargs="+", help="specific queries only")
    parser.add_argument("--runs", type=int, default=3, help="runs per query for latency (default: 3)")
    parser.add_argument("--output", default="scripts/bench-results.json", help="JSON report path")
    args = parser.parse_args()
    if args.quick:
        args.runs = 1  # --quick overrides --runs by design
    corpus = args.queries or (QUICK_CORPUS if args.quick else FULL_CORPUS)
    # only check display-name queries that are in the corpus; fall back to two defaults
    dn_queries = [q for q in DISPLAY_NAME_QUERIES if q in corpus] or DISPLAY_NAME_QUERIES[:2]
    print(f"\n{BOLD}=== typeahead benchmark ==={RESET}")
    print(f" target: {args.url}")
    print(f" baseline: {BSKY}")
    print(f" corpus: {len(corpus)} queries, {args.runs} run(s) each")
    print(f" date: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')}")
    async with httpx.AsyncClient(
        headers={"User-Agent": "typeahead-bench/1.0", "X-Client": "bench"},
        follow_redirects=True,
    ) as client:
        # 1. latency (cold + warm)
        print(f"\n{CYAN}[1/4] latency comparison (cold + warm){RESET}")
        latency = await run_latency(client, args.url, corpus, args.runs)
        print_latency_table(latency)
        # 2. coverage + field completeness (single pass)
        print(f"\n{CYAN}[2/4] coverage + field completeness{RESET}")
        coverage, field_summary = await run_coverage_and_fields(client, args.url, corpus)
        print_coverage_table(coverage)
        print_field_table(field_summary)
        # 3. display name search
        print(f"\n{CYAN}[3/4] display name search{RESET}")
        display_name = await run_display_name_check(client, args.url, dn_queries)
        print_display_name_table(display_name)
        # 4. stress test
        stress = []
        if args.no_stress:
            print(f"\n{CYAN}[4/4] stress test{RESET}")
            print(f" {YELLOW}skipped{RESET}")
        else:
            print(f"\n{CYAN}[4/4] stress test (ours only){RESET}")
            stress = await run_stress(client, args.url, corpus, [5, 10, 20])
            print_stress_table(stress)
        # write report
        report = build_report(args.url, corpus, args.runs, latency, coverage,
                              field_summary, display_name, stress)
        out_path = Path(args.output)
        out_path.parent.mkdir(parents=True, exist_ok=True)
        out_path.write_text(json.dumps(report, indent=2) + "\n")
        print(f"\n full report: {out_path}")
        print()


if __name__ == "__main__":
    asyncio.run(main())