GET /xrpc/app.bsky.actor.searchActorsTypeahead typeahead.waow.tech
at main 599 lines 21 kB view raw
#!/usr/bin/env -S PYTHONUNBUFFERED=1 uv run --script --quiet
# /// script
# requires-python = ">=3.12"
# dependencies = ["httpx"]
# ///
"""
performance benchmark: typeahead vs bluesky searchActorsTypeahead.

measures latency (cold + warm), coverage/overlap, field completeness,
display-name search, and stress-tests our API under concurrent load.

usage:
    ./scripts/bench.py                              # full benchmark against prod
    ./scripts/bench.py --url http://localhost:8787  # test local dev
    ./scripts/bench.py --quick                      # 10 queries, 1 run
    ./scripts/bench.py --no-stress                  # skip stress test
    ./scripts/bench.py --queries nate boorkie       # specific queries only
    ./scripts/bench.py --runs 5                     # more runs for confidence
"""

import argparse
import asyncio
import json
import statistics
import sys
import time
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from pathlib import Path
from urllib.parse import quote

import httpx

# endpoints: our deployment under test, and the public bluesky AppView as baseline.
OURS_DEFAULT = "https://typeahead.waow.tech"
BSKY = "https://public.api.bsky.app"
XRPC = "/xrpc/app.bsky.actor.searchActorsTypeahead"

# colors (ANSI terminal escapes)
BOLD = "\033[1m"
GREEN = "\033[32m"
RED = "\033[31m"
YELLOW = "\033[33m"
CYAN = "\033[36m"
RESET = "\033[0m"

# full benchmark corpus: mixes prefix lengths, common names, display-name-only
# terms, exact handles, unicode, multi-word queries, and topical terms.
FULL_CORPUS = [
    # short prefixes
    "a", "na", "sky", "bl", "j",
    # common names
    "nate", "sarah", "alex", "dan", "paul", "sam", "chris", "jordan", "mike", "anna",
    # display-name-only terms (no matching handle expected)
    "boorkie", "kohari",
    # specific handles
    "zzstoatzz", "pfrazee", "jay.bsky.team", "alice", "bob",
    # unicode
    "André", "naïve", "café",
    # multi-word display names
    "nate kohari", "paul frazee",
    # edge cases
    "nate.io", "the", "test", "bot", "news", "art", "dev", "music",
    # longer/rarer
    "photographer", "designer", "engineer", "journalist",
    # more diversity
    "tokyo", "berlin", "podcast", "crypto", "gaming",
]

# 10-query subset used by --quick
QUICK_CORPUS = [
    "nate", "zzstoatzz", "paul", "boorkie", "sky",
    "sarah", "André", "nate kohari", "a", "dev",
]

# queries expected to match only display names, never handles
DISPLAY_NAME_QUERIES = [
    "boorkie", "kohari", "nate kohari", "paul frazee",
]

# actor-profile fields whose presence we count for the completeness comparison
FIELDS_TO_CHECK = ["displayName", "avatar", "createdAt", "associated"]


@dataclass
class LatencyStats:
    """per-query latency samples: ours cold (cache-busted), ours warm (cached), bsky."""
    query: str
    ours_cold_ms: list[float] = field(default_factory=list)
    ours_warm_ms: list[float] = field(default_factory=list)
    bsky_ms: list[float] = field(default_factory=list)

    def _summarize(self, ms: list[float]) -> dict:
        """summarize one sample list as min/max/mean/p50/p95 (ms, 1 decimal).

        empty input yields {}. percentile indices use the simple
        sorted[int(n * frac)] convention, with a max() fallback for p95
        when there are fewer than 2 samples.
        """
        if not ms:
            return {}
        s = sorted(ms)
        return {
            "min": round(min(s), 1),
            "max": round(max(s), 1),
            "mean": round(statistics.mean(s), 1),
            "p50": round(s[len(s) // 2], 1),
            "p95": round(s[int(len(s) * 0.95)], 1) if len(s) >= 2 else round(max(s), 1),
        }

    def summarize(self, side: str) -> dict:
        """summarize one side by name: "ours_cold", "ours_warm", or anything else for bsky."""
        if side == "ours_cold":
            return self._summarize(self.ours_cold_ms)
        if side == "ours_warm":
            return self._summarize(self.ours_warm_ms)
        return self._summarize(self.bsky_ms)


@dataclass
class CoverageResult:
    """result-set comparison for one query at limit=10."""
    query: str
    ours_actors: list[dict]    # raw actor objects from our API
    bsky_actors: list[dict]    # raw actor objects from bsky
    ours_dids: list[str]       # DIDs in our result order
    bsky_dids: list[str]       # DIDs in bsky result order
    overlap: list[str]         # DIDs present in both result sets
    ours_extras: list[str]     # DIDs only we returned
    bsky_extras: list[str]     # DIDs only bsky returned
    rank_deltas: list[int]     # |our rank - bsky rank| for each overlapping DID


@dataclass
class StressResult:
    """outcome of one concurrency level of the stress test."""
    concurrency: int
    total: int
    ok: int
    rate_limited: int
    errors: int
    latencies_ms: list[float] = field(default_factory=list)


def progress(label: str, done: int, total: int):
    """overwrite the current terminal line with a "label: done/total" counter."""
    sys.stdout.write(f"\r {label}: {done}/{total}")
    sys.stdout.flush()


def clear_line():
    """blank out the in-place progress line (70 cols) and return the cursor."""
    sys.stdout.write("\r" + " " * 70 + "\r")


async def timed_fetch(
    client: httpx.AsyncClient, url: str, timeout: float = 30.0
) -> tuple[dict | None, float, int]:
    """fetch JSON, return (body, latency_ms, status_code).

    body is None for any non-200 response. any exception (timeout, connect
    error, bad JSON) is swallowed and reported as status_code 0 so callers
    can keep iterating; latency still reflects time spent before the failure.
    """
    t0 = time.monotonic()
    try:
        r = await client.get(url, timeout=timeout)
        ms = (time.monotonic() - t0) * 1000
        if r.status_code == 200:
            return r.json(), ms, r.status_code
        return None, ms, r.status_code
    except Exception:
        ms = (time.monotonic() - t0) * 1000
        return None, ms, 0


def search_url(base: str, q: str, limit: int = 10) -> str:
    """build the searchActorsTypeahead URL for `base` with a percent-encoded query."""
    return f"{base}{XRPC}?q={quote(q)}&limit={limit}"


async def run_latency(
    client: httpx.AsyncClient,
    ours_url: str,
    corpus: list[str],
    runs: int,
) -> list[LatencyStats]:
    """sequential latency: cold (cache-busted) + warm (cached) + bsky baseline.

    only 200 responses contribute samples, but every request advances the
    progress counter. the ~1.05s sleeps after hitting our API presumably keep
    us under its rate limit — TODO confirm against the server config.
    """
    results = []
    # request budget: cold = runs * 2 per query (ours+bsky);
    # warm = 2 per query (one priming hit + one measured cached hit)
    total = len(corpus) * (runs * 2 + 2)
    done = 0

    for q in corpus:
        stats = LatencyStats(query=q)

        # cold runs: vary limit to bust CF cache
        for run_i in range(runs):
            limit = 8 + (run_i % 3)

            _, ms, status = await timed_fetch(client, search_url(ours_url, q, limit))
            if status == 200:
                stats.ours_cold_ms.append(ms)
            done += 1
            progress("latency", done, total)
            await asyncio.sleep(1.05)

            _, ms, status = await timed_fetch(client, search_url(BSKY, q, limit))
            if status == 200:
                stats.bsky_ms.append(ms)
            done += 1
            progress("latency", done, total)
            await asyncio.sleep(0.2)

        # warm run: repeat exact same request (limit=10, should hit CF cache)
        # the first hit only primes the cache; its timing is discarded.
        _, ms, status = await timed_fetch(client, search_url(ours_url, q, 10))
        done += 1
        progress("latency", done, total)
        await asyncio.sleep(1.05)
        # second hit — this one should be cached
        _, ms, status = await timed_fetch(client, search_url(ours_url, q, 10))
        if status == 200:
            stats.ours_warm_ms.append(ms)
        done += 1
        progress("latency", done, total)
        await asyncio.sleep(1.05)

        results.append(stats)

    clear_line()
    return results


async def run_coverage_and_fields(
    client: httpx.AsyncClient,
    ours_url: str,
    corpus: list[str],
) -> tuple[list[CoverageResult], dict]:
    """compare result sets at limit=10; also compute field completeness.

    returns (per-query CoverageResults, aggregate field-count summary dict with
    keys "ours_total", "bsky_total", "ours", "bsky"). failed fetches are
    treated as empty result sets.
    """
    coverage_results = []

    # aggregate field counts
    ours_field_counts = {f: 0 for f in FIELDS_TO_CHECK}
    bsky_field_counts = {f: 0 for f in FIELDS_TO_CHECK}
    ours_actor_total = 0
    bsky_actor_total = 0

    for i, q in enumerate(corpus):
        progress("coverage+fields", i + 1, len(corpus))

        ours_data, _, _ = await timed_fetch(client, search_url(ours_url, q))
        await asyncio.sleep(1.05)
        bsky_data, _, _ = await timed_fetch(client, search_url(BSKY, q))
        await asyncio.sleep(0.2)

        ours_actors = (ours_data or {}).get("actors", [])
        bsky_actors = (bsky_data or {}).get("actors", [])

        ours_dids = [a["did"] for a in ours_actors]
        bsky_dids = [a["did"] for a in bsky_actors]

        ours_set = set(ours_dids)
        bsky_set = set(bsky_dids)
        overlap = list(ours_set & bsky_set)

        # rank delta: how far apart a shared DID sits in the two orderings
        # (the membership guard is redundant — overlap is a subset of both —
        # but harmless)
        ours_pos = {d: i for i, d in enumerate(ours_dids)}
        bsky_pos = {d: i for i, d in enumerate(bsky_dids)}
        rank_deltas = [abs(ours_pos[d] - bsky_pos[d]) for d in overlap if d in ours_pos and d in bsky_pos]

        coverage_results.append(CoverageResult(
            query=q,
            ours_actors=ours_actors,
            bsky_actors=bsky_actors,
            ours_dids=ours_dids,
            bsky_dids=bsky_dids,
            overlap=overlap,
            ours_extras=list(ours_set - bsky_set),
            bsky_extras=list(bsky_set - ours_set),
            rank_deltas=rank_deltas,
        ))

        # field completeness: count truthy occurrences of each checked field
        ours_actor_total += len(ours_actors)
        bsky_actor_total += len(bsky_actors)
        for f in FIELDS_TO_CHECK:
            ours_field_counts[f] += sum(1 for a in ours_actors if a.get(f))
            bsky_field_counts[f] += sum(1 for a in bsky_actors if a.get(f))

    clear_line()

    field_summary = {
        "ours_total": ours_actor_total,
        "bsky_total": bsky_actor_total,
        "ours": ours_field_counts,
        "bsky": bsky_field_counts,
    }
    return coverage_results, field_summary

async def run_display_name_check(
    client: httpx.AsyncClient,
    ours_url: str,
    queries: list[str],
) -> list[dict]:
    """verify display-name-only queries return results.

    for each query, fetch both APIs and record counts, a found flag for our
    side, and up to three sample handles from each result set.
    """
    def first_handles(actors: list[dict]) -> list[str]:
        # prefer handle, fall back to did, then a placeholder
        return [a.get("handle", a.get("did", "?")) for a in actors[:3]]

    report: list[dict] = []
    for query in queries:
        our_body, _, _ = await timed_fetch(client, search_url(ours_url, query))
        await asyncio.sleep(1.05)
        their_body, _, _ = await timed_fetch(client, search_url(BSKY, query))
        await asyncio.sleep(0.2)

        our_hits = (our_body or {}).get("actors", [])
        their_hits = (their_body or {}).get("actors", [])
        report.append({
            "query": query,
            "ours_count": len(our_hits),
            "bsky_count": len(their_hits),
            "found": len(our_hits) > 0,
            "ours_sample": first_handles(our_hits),
            "bsky_sample": first_handles(their_hits),
        })

    return report


async def run_stress(
    client: httpx.AsyncClient,
    ours_url: str,
    corpus: list[str],
    levels: list[int],
) -> list[StressResult]:
    """concurrent request stress test (our API only).

    fires `level` simultaneous requests per concurrency level, tallying 200s,
    429s, and everything else, then pauses 5s before the next level.
    """
    outcomes: list[StressResult] = []

    for level in levels:
        sys.stdout.write(f"\r stress: concurrency={level}...")
        sys.stdout.flush()

        # cycle the corpus until we have exactly `level` queries,
        # varying limit slightly so requests aren't identical
        batch = (corpus * ((level // len(corpus)) + 1))[:level]
        jobs = [
            timed_fetch(client, search_url(ours_url, query, limit=7 + (idx % 4)))
            for idx, query in enumerate(batch)
        ]
        replies = await asyncio.gather(*jobs)

        tally = StressResult(concurrency=level, total=level, ok=0, rate_limited=0, errors=0)
        for _body, elapsed, code in replies:
            tally.latencies_ms.append(elapsed)
            if code == 200:
                tally.ok += 1
            elif code == 429:
                tally.rate_limited += 1
            else:
                tally.errors += 1

        outcomes.append(tally)
        await asyncio.sleep(5)

    clear_line()
    return outcomes


# ── printing ────────────────────────────────────────────────────────

def pct(n: int, total: int) -> str:
    """format n out of total as a whole-number percentage; "n/a" when total is 0."""
    if not total:
        return "n/a"
    return f"{n * 100 / total:.0f}%"

def fmt_ms(ms: float) -> str:
    """format milliseconds for display: "123ms" below one second, else "1.2s"."""
    if ms >= 1000:
        return f"{ms / 1000:.1f}s"
    return f"{ms:.0f}ms"


def print_latency_table(stats_list: list[LatencyStats]):
    """print per-query cold p50 comparison plus aggregate cold/warm p50/p95.

    queries where either side collected no successful samples are skipped.
    """
    print(f"\n{BOLD}--- latency (cold, cache-busted) ---{RESET}")
    header = f" {'query':<20} {'ours':>10} {'bsky':>10} {'delta':>10} {'winner':>8}"
    print(header)
    # fix: these underline rows used '' * N (the empty string repeated),
    # which printed blank lines; '─' makes the rule visible
    print(f" {'─' * 20} {'─' * 10} {'─' * 10} {'─' * 10} {'─' * 8}")

    all_ours_cold: list[float] = []
    all_bsky: list[float] = []
    ours_wins = 0
    total_compared = 0

    for s in stats_list:
        oc = s.summarize("ours_cold")
        b = s.summarize("bsky")
        if not oc or not b:
            continue

        all_ours_cold.extend(s.ours_cold_ms)
        all_bsky.extend(s.bsky_ms)

        op50, bp50 = oc["p50"], b["p50"]
        delta = op50 - bp50
        total_compared += 1
        if delta < 0:
            ours_wins += 1
            w_str = f"{GREEN}ours{RESET}"
        elif delta > 0:
            w_str = f"{RED}bsky{RESET}"
        else:
            w_str = "tie"

        # signed delta: "+12ms" when we are slower, "-12ms" when faster, "0ms" on tie
        sign = "+" if delta > 0 else "-" if delta < 0 else ""
        d_str = f"{sign}{fmt_ms(abs(delta))}"
        # winner column padded to 17 = 8 visible + 9 invisible ANSI chars
        print(f" {s.query:<20} {fmt_ms(op50):>10} {fmt_ms(bp50):>10} {d_str:>10} {w_str:>17}")

    if all_ours_cold and all_bsky:
        # sort once per sample list (was sorted twice each)
        ours_sorted = sorted(all_ours_cold)
        bsky_sorted = sorted(all_bsky)
        oc50 = ours_sorted[len(ours_sorted) // 2]
        oc95 = ours_sorted[int(len(ours_sorted) * 0.95)]
        b50 = bsky_sorted[len(bsky_sorted) // 2]
        b95 = bsky_sorted[int(len(bsky_sorted) * 0.95)]
        print()
        print(f" {BOLD}cold:{RESET} ours p50={fmt_ms(oc50)} p95={fmt_ms(oc95)} | bsky p50={fmt_ms(b50)} p95={fmt_ms(b95)}")
        print(f" ours faster on {ours_wins}/{total_compared} queries (cold)")

    # warm summary: second hit on an identical URL, expected CF cache hit
    all_warm = [ms for s in stats_list for ms in s.ours_warm_ms]
    if all_warm:
        warm_sorted = sorted(all_warm)
        w50 = warm_sorted[len(warm_sorted) // 2]
        w95 = warm_sorted[int(len(warm_sorted) * 0.95)] if len(warm_sorted) >= 2 else max(warm_sorted)
        print(f" {BOLD}warm:{RESET} ours p50={fmt_ms(w50)} p95={fmt_ms(w95)} (CF cache hit)")


def print_coverage_table(results: list[CoverageResult]):
    """print per-query overlap with the bsky result set and aggregate coverage stats."""
    print(f"\n{BOLD}--- coverage ---{RESET}")
    print(f" {'query':<20} {'overlap':>10} {'ours':>6} {'bsky':>6} {'pct':>6} {'rank Δ':>8}")
    print(f" {'─' * 20} {'─' * 10} {'─' * 6} {'─' * 6} {'─' * 6} {'─' * 8}")

    total_overlap = 0
    total_bsky = 0
    complete_misses = 0
    we_have_more = 0

    for r in results:
        n_overlap = len(r.overlap)
        n_bsky = len(r.bsky_dids)
        n_ours = len(r.ours_dids)
        total_overlap += n_overlap
        total_bsky += n_bsky

        p = f"{n_overlap * 100 // n_bsky}%" if n_bsky else "n/a"
        avg_delta = f"{statistics.mean(r.rank_deltas):.1f}" if r.rank_deltas else ""

        if n_ours == 0 and n_bsky > 0:
            complete_misses += 1
        if n_ours > n_bsky:
            we_have_more += 1

        print(f" {r.query:<20} {n_overlap:>3}/{n_bsky:<6} {n_ours:>6} {n_bsky:>6} {p:>6} {avg_delta:>8}")

    print()
    mean_pct = total_overlap * 100 / total_bsky if total_bsky else 0
    print(f" {BOLD}mean overlap:{RESET} {mean_pct:.0f}%")
    print(f" complete misses (ours=0, bsky>0): {complete_misses}/{len(results)}")
    print(f" queries where we have more results: {we_have_more}/{len(results)}")


def print_field_table(field_summary: dict):
    """print per-field completeness percentages for ours vs bsky."""
    print(f"\n{BOLD}--- field completeness ---{RESET}")
    ours_total = field_summary["ours_total"]
    bsky_total = field_summary["bsky_total"]

    print(f" {'field':<16} {'ours':>8} {'bsky':>8}")
    print(f" {'─' * 16} {'─' * 8} {'─' * 8}")
    for f in FIELDS_TO_CHECK:
        o = pct(field_summary["ours"][f], ours_total)
        b = pct(field_summary["bsky"][f], bsky_total)
        print(f" {f:<16} {o:>8} {b:>8}")
    print(f" {'─' * 16} {'─' * 8} {'─' * 8}")
    print(f" {'total actors':<16} {ours_total:>8} {bsky_total:>8}")


def print_display_name_table(results: list[dict]):
    """print found/count/sample rows for the display-name-only queries."""
    print(f"\n{BOLD}--- display name search ---{RESET}")
    print(f" {'query':<20} {'found?':>8} {'ours':>6} {'bsky':>6} samples")
    print(f" {'─' * 20} {'─' * 8} {'─' * 6} {'─' * 6} {'─' * 30}")
    for r in results:
        found = f"{GREEN}yes{RESET}" if r["found"] else f"{RED}no{RESET}"
        samples = ", ".join(r["ours_sample"][:3]) if r["ours_sample"] else ""
        # found column padded to 17 = 8 visible + 9 invisible ANSI chars
        print(f" {r['query']:<20} {found:>17} {r['ours_count']:>6} {r['bsky_count']:>6} {samples}")


def print_stress_table(results: list[StressResult]):
    """print ok/429/error counts and latency percentiles per concurrency level."""
    print(f"\n{BOLD}--- stress test (ours only) ---{RESET}")
    print(f" {'concurrency':>12} {'ok':>6} {'429s':>6} {'5xx':>6} {'p50':>8} {'p95':>8}")
    print(f" {'─' * 12} {'─' * 6} {'─' * 6} {'─' * 6} {'─' * 8} {'─' * 8}")
    for r in results:
        lats = sorted(r.latencies_ms)
        p50 = fmt_ms(lats[len(lats) // 2]) if lats else ""
        p95 = fmt_ms(lats[int(len(lats) * 0.95)]) if len(lats) >= 2 else p50
        print(f" {r.concurrency:>12} {r.ok:>6} {r.rate_limited:>6} {r.errors:>6} {p50:>8} {p95:>8}")


# ── JSON report ─────────────────────────────────────────────────────

def build_report(
    ours_url: str,
    corpus: list[str],
    runs: int,
    latency: list[LatencyStats],
    coverage: list[CoverageResult],
    field_summary: dict,
    display_name: list[dict],
    stress: list[StressResult],
) -> dict:
    """assemble the machine-readable JSON report from all benchmark phases."""

    def latency_entry(s: LatencyStats) -> dict:
        # per-query min/max/mean/p50/p95 for each side
        return {
            "query": s.query,
            "ours_cold": s.summarize("ours_cold"),
            "ours_warm": s.summarize("ours_warm"),
            "bsky": s.summarize("bsky"),
        }

    def coverage_entry(r: CoverageResult) -> dict:
        # counts only; raw actor payloads are deliberately not serialized
        return {
            "query": r.query,
            "ours_count": len(r.ours_dids),
            "bsky_count": len(r.bsky_dids),
            "overlap_count": len(r.overlap),
            "overlap_pct": round(len(r.overlap) * 100 / len(r.bsky_dids), 1) if r.bsky_dids else None,
            "ours_extras": len(r.ours_extras),
            "bsky_extras": len(r.bsky_extras),
            "avg_rank_delta": round(statistics.mean(r.rank_deltas), 2) if r.rank_deltas else None,
        }

    def stress_entry(r: StressResult) -> dict:
        lats = sorted(r.latencies_ms)
        return {
            "concurrency": r.concurrency,
            "ok": r.ok,
            "rate_limited": r.rate_limited,
            "errors": r.errors,
            "p50_ms": round(lats[len(lats) // 2], 1) if lats else None,
            "p95_ms": round(lats[int(len(lats) * 0.95)], 1) if len(lats) >= 2 else None,
        }

    return {
        "meta": {
            "target": ours_url,
            "baseline": BSKY,
            "corpus_size": len(corpus),
            "runs": runs,
            "date": datetime.now(timezone.utc).isoformat(),
        },
        "latency": [latency_entry(s) for s in latency],
        "coverage": [coverage_entry(r) for r in coverage],
        "field_completeness": field_summary,
        "display_name_search": display_name,
        "stress": [stress_entry(r) for r in stress],
    }


# ── main ────────────────────────────────────────────────────────────

async def main():
    """parse args, run the four benchmark phases, print tables, write the JSON report."""
    parser = argparse.ArgumentParser(description="typeahead performance benchmark")
    parser.add_argument("--url", default=OURS_DEFAULT, help=f"our API URL (default: {OURS_DEFAULT})")
    parser.add_argument("--quick", action="store_true", help="10 queries, 1 run")
    parser.add_argument("--no-stress", action="store_true", help="skip stress test")
    parser.add_argument("--queries", nargs="+", help="specific queries only")
    parser.add_argument("--runs", type=int, default=3, help="runs per query for latency (default: 3)")
    parser.add_argument("--output", default="scripts/bench-results.json", help="JSON report path")
    args = parser.parse_args()

    if args.quick:
        args.runs = 1

    # explicit --queries wins; otherwise quick or full corpus
    corpus = args.queries or (QUICK_CORPUS if args.quick else FULL_CORPUS)
    # only check display-name queries present in the corpus; fall back to two defaults
    dn_queries = [q for q in DISPLAY_NAME_QUERIES if q in corpus] or DISPLAY_NAME_QUERIES[:2]

    print(f"\n{BOLD}=== typeahead benchmark ==={RESET}")
    print(f" target: {args.url}")
    print(f" baseline: {BSKY}")
    print(f" corpus: {len(corpus)} queries, {args.runs} run(s) each")
    print(f" date: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')}")

    async with httpx.AsyncClient(
        headers={"User-Agent": "typeahead-bench/1.0", "X-Client": "bench"},
        follow_redirects=True,
    ) as client:
        # 1. latency (cold + warm)
        print(f"\n{CYAN}[1/4] latency comparison (cold + warm){RESET}")
        latency = await run_latency(client, args.url, corpus, args.runs)
        print_latency_table(latency)

        # 2. coverage + field completeness (single pass)
        print(f"\n{CYAN}[2/4] coverage + field completeness{RESET}")
        coverage, field_summary = await run_coverage_and_fields(client, args.url, corpus)
        print_coverage_table(coverage)
        print_field_table(field_summary)

        # 3. display name search
        print(f"\n{CYAN}[3/4] display name search{RESET}")
        display_name = await run_display_name_check(client, args.url, dn_queries)
        print_display_name_table(display_name)

        # 4. stress test
        stress = []
        if args.no_stress:
            print(f"\n{CYAN}[4/4] stress test{RESET}")
            print(f" {YELLOW}skipped{RESET}")
        else:
            print(f"\n{CYAN}[4/4] stress test (ours only){RESET}")
            stress = await run_stress(client, args.url, corpus, [5, 10, 20])
            print_stress_table(stress)

    # write report
    report = build_report(args.url, corpus, args.runs, latency, coverage, field_summary, display_name, stress)
    out_path = Path(args.output)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(json.dumps(report, indent=2) + "\n")
    print(f"\n full report: {out_path}")
    print()


if __name__ == "__main__":
    asyncio.run(main())