"""atproto firehose benchmarks — Python (atproto SDK).

Decodes a corpus of real firehose frames using the atproto SDK's public API:
``Frame.from_bytes`` + ``parse_subscribe_repos_message`` + ``CAR.from_bytes``.

Under the hood, libipld (a Rust extension built with PyO3) handles CBOR/CAR
decoding, and the Python layer wraps the results into typed models.
``CAR.from_bytes()`` already decodes every block as DAG-CBOR (via libipld),
so ``car.blocks`` holds the decoded values directly.
"""

import statistics
import struct
import time
from dataclasses import dataclass
from pathlib import Path

from atproto import CAR, firehose_models, parse_subscribe_repos_message

WARMUP_PASSES = 2
MEASURED_PASSES = 5

FIXTURES_DIR = Path(__file__).parent.parent / "fixtures"

# Every integer in the corpus file (frame count, frame lengths) is a
# big-endian u32; precompile the format once.
_U32 = struct.Struct(">I")


@dataclass
class CorpusInfo:
    """A loaded frame corpus plus summary statistics over frame sizes."""

    frames: list[bytes]
    total_bytes: int  # sum of frame payload lengths (length prefixes excluded)
    min_frame: int  # smallest frame length in bytes; 0 for an empty corpus
    max_frame: int  # largest frame length in bytes; 0 for an empty corpus


@dataclass
class DecodeResult:
    """Outcome of decoding a single frame."""

    blocks: int  # number of CAR blocks extracted from the frame
    errors: int  # number of decode failures counted for the frame (0 or 1)


@dataclass
class PassResult:
    """Totals for one measured pass over the whole corpus."""

    frames: int
    blocks: int
    errors: int
    elapsed_ns: int  # wall-clock duration measured with time.perf_counter_ns()


def load_corpus(name: str) -> CorpusInfo:
    """Load a length-prefixed frame corpus from ``FIXTURES_DIR / name``.

    File layout: a big-endian u32 frame count, followed by that many records
    of (big-endian u32 length, ``length`` bytes of frame data).

    Raises:
        FileNotFoundError: the fixture file does not exist.
        ValueError: the file is shorter than its header or records claim.
    """
    path = FIXTURES_DIR / name
    if not path.exists():
        print(f"cannot open {path}")
        print("run `just capture` first to generate fixtures")
        raise FileNotFoundError(path)

    data = path.read_bytes()
    size = len(data)
    if size < _U32.size:
        raise ValueError("corpus file too small")

    (frame_count,) = _U32.unpack_from(data, 0)
    pos = _U32.size
    frames: list[bytes] = []
    for _ in range(frame_count):
        if pos + _U32.size > size:
            raise ValueError("truncated corpus")
        (frame_len,) = _U32.unpack_from(data, pos)
        pos += _U32.size
        if pos + frame_len > size:
            raise ValueError("truncated corpus")
        # Slicing copies, so each frame is a standalone ``bytes`` object.
        frames.append(data[pos : pos + frame_len])
        pos += frame_len

    sizes = [len(frame) for frame in frames]
    return CorpusInfo(
        frames=frames,
        total_bytes=sum(sizes),
        # default=0 lets an empty corpus load (and be reported as such)
        # instead of failing on a float("inf") running-minimum seed.
        min_frame=min(sizes, default=0),
        max_frame=max(sizes, default=0),
    )


def decode_full(frame_data: bytes) -> DecodeResult:
    """Full decode: frame parse + typed message + CAR block extraction.

    Never raises. A failure to parse the frame, or to decode its CAR payload,
    is counted as one error and decoding of that frame stops.
    """
    try:
        frame = firehose_models.Frame.from_bytes(frame_data)
        msg = parse_subscribe_repos_message(frame)
    except Exception:
        return DecodeResult(blocks=0, errors=1)

    # Not every subscribeRepos message type necessarily has a ``blocks``
    # field (presumably only commit/sync-style events do — confirm against
    # the SDK models). getattr keeps the "never raises" contract either way;
    # a bare ``msg.blocks`` here would escape with AttributeError.
    car_bytes = getattr(msg, "blocks", None)
    if not car_bytes:
        return DecodeResult(blocks=0, errors=0)

    try:
        car = CAR.from_bytes(car_bytes)
        # CAR.from_bytes() has already decoded every block as DAG-CBOR via
        # libipld, so counting them does no additional decode work.
        block_count = len(car.blocks)
    except Exception:
        return DecodeResult(blocks=0, errors=1)
    return DecodeResult(blocks=block_count, errors=0)


def bench_decode(corpus: CorpusInfo) -> None:
    """Time the full decode pipeline over the corpus and print the results.

    Runs WARMUP_PASSES untimed passes, then MEASURED_PASSES timed passes, each
    pushing every frame through the real API a consumer calls.
    """
    frames = corpus.frames

    for _ in range(WARMUP_PASSES):
        for frame_data in frames:
            decode_full(frame_data)

    pass_results: list[PassResult] = []
    for _ in range(MEASURED_PASSES):
        pass_blocks = 0
        pass_errors = 0
        start = time.perf_counter_ns()
        for frame_data in frames:
            result = decode_full(frame_data)
            pass_blocks += result.blocks
            pass_errors += result.errors
        elapsed_ns = time.perf_counter_ns() - start
        pass_results.append(
            PassResult(
                frames=len(frames),
                blocks=pass_blocks,
                errors=pass_errors,
                elapsed_ns=elapsed_ns,
            )
        )

    report_result("decode", corpus, pass_results)


def report_result(
    name: str, corpus: CorpusInfo, pass_results: list[PassResult]
) -> None:
    """Print throughput and per-pass variance for a set of measured passes.

    The headline frames/sec is the median over passes. MB/s is computed from
    the aggregate bytes and aggregate time of all passes.
    """
    if not pass_results:
        print(f"{name:<14} no measured passes")
        return

    # Clamp elapsed time to 1 ns so a pass too fast for the clock cannot
    # divide by zero.
    fps_values = sorted(
        r.frames / (max(r.elapsed_ns, 1) / 1_000_000_000) for r in pass_results
    )
    total_frames = sum(r.frames for r in pass_results)
    total_blocks = sum(r.blocks for r in pass_results)
    total_errors = sum(r.errors for r in pass_results)
    total_elapsed_ns = max(sum(r.elapsed_ns for r in pass_results), 1)
    total_elapsed_s = total_elapsed_ns / 1_000_000_000
    # Scale by the number of passes actually reported, not the global
    # MEASURED_PASSES constant, so bytes and elapsed time cover the same work.
    total_bytes = corpus.total_bytes * len(pass_results)
    throughput_mb = total_bytes / (1024 * 1024) / total_elapsed_s
    blocks_per_frame = total_blocks / total_frames if total_frames else 0.0

    min_fps = fps_values[0]
    median_fps = statistics.median(fps_values)
    max_fps = fps_values[-1]

    print(
        f"{name:<14} {median_fps:>10.0f} frames/sec {throughput_mb:>8.1f} MB/s"
        f" blocks={total_blocks} ({blocks_per_frame:.2f}/frame) errors={total_errors}"
    )
    print(
        f"{'':14} variance: min={min_fps:.0f} median={median_fps:.0f} max={max_fps:.0f} frames/sec"
    )


def main() -> None:
    """Load the firehose corpus, sanity-check its first frame, then benchmark."""
    print("\n=== python benchmarks ===\n")

    try:
        corpus = load_corpus("firehose-frames.bin")
    except Exception as e:
        print(f"firehose-frames.bin: SKIP ({e})")
        return

    # Nothing to verify or time in an empty corpus; frames[0] below would fail.
    if not corpus.frames:
        print("firehose-frames.bin: SKIP (corpus contains no frames)")
        return

    print(f"corpus: {len(corpus.frames)} frames, {corpus.total_bytes} bytes total")
    print(f"  frame sizes: {corpus.min_frame}..{corpus.max_frame} bytes")
    print(f"  passes: {WARMUP_PASSES} warmup, {MEASURED_PASSES} measured")
    print()

    # Verify the first frame decodes before spending time on the benchmark.
    result = decode_full(corpus.frames[0])
    print(f"first frame: blocks={result.blocks} errors={result.errors}")
    print()

    bench_decode(corpus)
    print()


if __name__ == "__main__":
    main()