this repo has no description
at main 194 lines 5.6 kB view raw
"""atproto firehose benchmarks — python (atproto SDK)

decodes a corpus of real firehose frames using the atproto SDK's public API:
Frame.from_bytes + parse_subscribe_repos_message + CAR.from_bytes.
under the hood: libipld (Rust extension via PyO3) for CBOR/CAR,
Python layer wraps results into typed models.

CAR.from_bytes() already decodes every block as DAG-CBOR (via libipld),
so iterating car.blocks gives us the decoded dicts directly.
"""

import statistics
import struct
import time
from dataclasses import dataclass
from pathlib import Path

from atproto import CAR, firehose_models, parse_subscribe_repos_message

WARMUP_PASSES = 2
MEASURED_PASSES = 5
FIXTURES_DIR = Path(__file__).parent.parent / "fixtures"


@dataclass
class CorpusInfo:
    """A parsed benchmark corpus: raw frames plus size statistics."""

    frames: list[bytes]  # raw frame payloads, in capture order
    total_bytes: int  # sum of all frame lengths
    min_frame: int  # smallest frame length (0 for an empty corpus)
    max_frame: int  # largest frame length (0 for an empty corpus)


@dataclass
class DecodeResult:
    """Outcome of decoding one frame: decoded-block count and failure count."""

    blocks: int
    errors: int


@dataclass
class PassResult:
    """Aggregated counters for one measured benchmark pass."""

    frames: int
    blocks: int
    errors: int
    elapsed_ns: int


def load_corpus(name: str) -> CorpusInfo:
    """Load a length-prefixed frame corpus from FIXTURES_DIR.

    File layout: a big-endian u32 frame count, then per frame a big-endian
    u32 length followed by that many payload bytes.

    Raises:
        FileNotFoundError: the fixture file is missing.
        ValueError: the file is too small or truncated mid-frame.
    """
    path = FIXTURES_DIR / name
    if not path.exists():
        print(f"cannot open {path}")
        print("run `just capture` first to generate fixtures")
        raise FileNotFoundError(path)

    data = path.read_bytes()
    if len(data) < 4:
        raise ValueError("corpus file too small")

    # unpack_from reads in place — no 4-byte slice copy per header.
    (frame_count,) = struct.unpack_from(">I", data, 0)
    frames: list[bytes] = []
    pos = 4

    for _ in range(frame_count):
        if pos + 4 > len(data):
            raise ValueError("truncated corpus")
        (frame_len,) = struct.unpack_from(">I", data, pos)
        pos += 4
        if pos + frame_len > len(data):
            raise ValueError("truncated corpus")
        frames.append(data[pos : pos + frame_len])
        pos += frame_len

    # Size stats computed after parsing. `default=0` keeps a zero-frame
    # corpus well-defined — the previous float("inf") sentinel made
    # int(min_frame) raise OverflowError when the corpus was empty.
    lengths = [len(f) for f in frames]
    return CorpusInfo(
        frames=frames,
        total_bytes=sum(lengths),
        min_frame=min(lengths, default=0),
        max_frame=max(lengths, default=0),
    )


def decode_full(frame_data: bytes) -> DecodeResult:
    """Full decode: frame parse + typed commit + CAR block extraction + per-block decode.

    Never raises — counts failures and continues.
    """
    try:
        frame = firehose_models.Frame.from_bytes(frame_data)
        msg = parse_subscribe_repos_message(frame)
    except Exception:
        # deliberate best-effort: any malformed frame counts as one error
        return DecodeResult(blocks=0, errors=1)

    blocks = 0
    errors = 0

    if msg.blocks:
        try:
            car = CAR.from_bytes(msg.blocks)
            # CAR.from_bytes() already decodes each block as DAG-CBOR via libipld.
            # iterate to count them — the decode work has already happened.
            blocks = len(car.blocks)
        except Exception:
            errors += 1

    return DecodeResult(blocks=blocks, errors=errors)


def bench_decode(corpus: CorpusInfo) -> None:
    """Full pipeline for each frame: the real API a consumer calls."""
    # warmup passes let caches / lazy imports settle before timing
    for _ in range(WARMUP_PASSES):
        for frame_data in corpus.frames:
            decode_full(frame_data)

    pass_results: list[PassResult] = []

    for _ in range(MEASURED_PASSES):
        pass_blocks = 0
        pass_errors = 0
        start = time.perf_counter_ns()
        for frame_data in corpus.frames:
            result = decode_full(frame_data)
            pass_blocks += result.blocks
            pass_errors += result.errors
        elapsed_ns = time.perf_counter_ns() - start
        pass_results.append(
            PassResult(
                frames=len(corpus.frames),
                blocks=pass_blocks,
                errors=pass_errors,
                elapsed_ns=elapsed_ns,
            )
        )

    report_result("decode", corpus, pass_results)


def report_result(
    name: str, corpus: CorpusInfo, pass_results: list[PassResult]
) -> None:
    """Print throughput (median frames/sec, MB/s) and variance for a benchmark."""
    fps_values = [r.frames / (r.elapsed_ns / 1_000_000_000) for r in pass_results]
    fps_values.sort()

    total_frames = sum(r.frames for r in pass_results)
    total_blocks = sum(r.blocks for r in pass_results)
    total_errors = sum(r.errors for r in pass_results)
    total_elapsed_s = sum(r.elapsed_ns for r in pass_results) / 1_000_000_000

    total_bytes = corpus.total_bytes * MEASURED_PASSES
    throughput_mb = total_bytes / (1024 * 1024) / total_elapsed_s
    blocks_per_frame = total_blocks / total_frames

    min_fps = fps_values[0]
    median_fps = statistics.median(fps_values)
    max_fps = fps_values[-1]

    print(
        f"{name:<14} {median_fps:>10.0f} frames/sec {throughput_mb:>8.1f} MB/s"
        f" blocks={total_blocks} ({blocks_per_frame:.2f}/frame) errors={total_errors}"
    )
    print(
        f"{'':14} variance: min={min_fps:.0f} median={median_fps:.0f} max={max_fps:.0f} frames/sec"
    )


def main() -> None:
    print("\n=== python benchmarks ===\n")

    try:
        corpus = load_corpus("firehose-frames.bin")
    except Exception as e:
        print(f"firehose-frames.bin: SKIP ({e})")
        return

    # a zero-frame corpus would IndexError on frames[0] below and divide by
    # zero in report_result — skip it explicitly instead.
    if not corpus.frames:
        print("firehose-frames.bin: SKIP (corpus is empty)")
        return

    print(f"corpus: {len(corpus.frames)} frames, {corpus.total_bytes} bytes total")
    print(f"  frame sizes: {corpus.min_frame}..{corpus.max_frame} bytes")
    print(f"  passes: {WARMUP_PASSES} warmup, {MEASURED_PASSES} measured")
    print()

    # verify first frame
    result = decode_full(corpus.frames[0])
    print(f"first frame: blocks={result.blocks} errors={result.errors}")
    print()

    bench_decode(corpus)

    print()


if __name__ == "__main__":
    main()