this repo has no description
1"""atproto firehose benchmarks — python (atproto SDK)
2
3decodes a corpus of real firehose frames using the atproto SDK's public API:
4Frame.from_bytes + parse_subscribe_repos_message + CAR.from_bytes.
5under the hood: libipld (Rust extension via PyO3) for CBOR/CAR,
6Python layer wraps results into typed models.
7
8CAR.from_bytes() already decodes every block as DAG-CBOR (via libipld),
9so iterating car.blocks gives us the decoded dicts directly.
10"""
11
12import statistics
13import struct
14import time
15from dataclasses import dataclass
16from pathlib import Path
17
18from atproto import CAR, firehose_models, parse_subscribe_repos_message
19
WARMUP_PASSES = 2  # un-timed passes run before measurement begins (see bench_decode)
MEASURED_PASSES = 5  # timed passes aggregated into the printed report
FIXTURES_DIR = Path(__file__).parent.parent / "fixtures"  # where `just capture` writes corpora
23
24
@dataclass
class CorpusInfo:
    """A loaded benchmark corpus: raw frame payloads plus size statistics."""

    frames: list[bytes]  # raw firehose frames, in file order
    total_bytes: int  # sum of all frame lengths (length-prefix headers excluded)
    min_frame: int  # smallest frame length seen, in bytes
    max_frame: int  # largest frame length seen, in bytes
31
32
@dataclass
class DecodeResult:
    """Outcome of decoding a single frame (see decode_full)."""

    blocks: int  # CAR blocks decoded from this frame (0 if none / not a commit)
    errors: int  # failures swallowed while decoding this frame
38
@dataclass
class PassResult:
    """Aggregated counters for one timed pass over the whole corpus."""

    frames: int  # frames processed during this pass
    blocks: int  # total blocks decoded across all frames in the pass
    errors: int  # total decode failures across all frames in the pass
    elapsed_ns: int  # wall-clock duration of the pass, in nanoseconds
46
def load_corpus(name: str, fixtures_dir: Path | None = None) -> CorpusInfo:
    """Load a length-prefixed frame corpus from ``fixtures_dir / name``.

    File layout: a big-endian u32 frame count, then for each frame a
    big-endian u32 byte length followed by that many raw frame bytes.

    Args:
        name: corpus file name, e.g. "firehose-frames.bin".
        fixtures_dir: directory containing the corpus; defaults to FIXTURES_DIR.

    Returns:
        CorpusInfo holding the raw frames and their size statistics.

    Raises:
        FileNotFoundError: if the corpus file does not exist.
        ValueError: if the file is too small or truncated mid-frame.
    """
    base = FIXTURES_DIR if fixtures_dir is None else fixtures_dir
    path = base / name
    if not path.exists():
        print(f"cannot open {path}")
        print("run `just capture` first to generate fixtures")
        raise FileNotFoundError(path)

    data = path.read_bytes()
    if len(data) < 4:
        raise ValueError("corpus file too small")

    (frame_count,) = struct.unpack(">I", data[0:4])
    frames: list[bytes] = []
    pos = 4
    total_bytes = 0
    min_frame = float("inf")
    max_frame = 0

    for _ in range(frame_count):
        if pos + 4 > len(data):
            raise ValueError("truncated corpus")
        (frame_len,) = struct.unpack(">I", data[pos : pos + 4])
        pos += 4
        if pos + frame_len > len(data):
            raise ValueError("truncated corpus")
        frames.append(data[pos : pos + frame_len])
        pos += frame_len
        total_bytes += frame_len
        min_frame = min(min_frame, frame_len)
        max_frame = max(max_frame, frame_len)

    # An empty corpus leaves min_frame at +inf, and int(float("inf")) raises
    # OverflowError — report 0 for the bound instead of crashing.
    return CorpusInfo(
        frames=frames,
        total_bytes=total_bytes,
        min_frame=int(min_frame) if frames else 0,
        max_frame=max_frame,
    )
83 )
84
85
def decode_full(frame_data: bytes) -> DecodeResult:
    """Full decode: frame parse + typed commit + CAR block extraction + per-block decode.

    Never raises — counts failures and continues.

    Args:
        frame_data: one raw firehose frame as captured off the wire.

    Returns:
        DecodeResult with the number of blocks decoded and errors swallowed.
    """
    try:
        frame = firehose_models.Frame.from_bytes(frame_data)
        msg = parse_subscribe_repos_message(frame)
    except Exception:
        return DecodeResult(blocks=0, errors=1)

    blocks = 0
    errors = 0

    # Some message variants carry no `blocks` attribute at all; the bare
    # `msg.blocks` access sat outside the try and would raise AttributeError,
    # breaking the "never raises" contract. getattr keeps it total.
    car_bytes = getattr(msg, "blocks", None)
    if car_bytes:
        try:
            car = CAR.from_bytes(car_bytes)
            # CAR.from_bytes() already decodes each block as DAG-CBOR via libipld.
            # iterate to count them — the decode work has already happened.
            blocks = len(car.blocks)
        except Exception:
            errors += 1

    return DecodeResult(blocks=blocks, errors=errors)
109 return DecodeResult(blocks=blocks, errors=errors)
110
111
def bench_decode(corpus: CorpusInfo) -> None:
    """Full pipeline for each frame: the real API a consumer calls."""
    # Un-timed warmup passes first, then the measured ones.
    for _ in range(WARMUP_PASSES):
        for raw in corpus.frames:
            decode_full(raw)

    pass_results = [_timed_pass(corpus) for _ in range(MEASURED_PASSES)]
    report_result("decode", corpus, pass_results)


def _timed_pass(corpus: CorpusInfo) -> PassResult:
    """Decode every frame once under the clock and aggregate the counters."""
    block_total = 0
    error_total = 0
    started = time.perf_counter_ns()
    for raw in corpus.frames:
        outcome = decode_full(raw)
        block_total += outcome.blocks
        error_total += outcome.errors
    return PassResult(
        frames=len(corpus.frames),
        blocks=block_total,
        errors=error_total,
        elapsed_ns=time.perf_counter_ns() - started,
    )
139
140
141def report_result(
142 name: str, corpus: CorpusInfo, pass_results: list[PassResult]
143) -> None:
144 fps_values = [r.frames / (r.elapsed_ns / 1_000_000_000) for r in pass_results]
145 fps_values.sort()
146
147 total_frames = sum(r.frames for r in pass_results)
148 total_blocks = sum(r.blocks for r in pass_results)
149 total_errors = sum(r.errors for r in pass_results)
150 total_elapsed_s = sum(r.elapsed_ns for r in pass_results) / 1_000_000_000
151
152 total_bytes = corpus.total_bytes * MEASURED_PASSES
153 throughput_mb = total_bytes / (1024 * 1024) / total_elapsed_s
154 blocks_per_frame = total_blocks / total_frames
155
156 min_fps = fps_values[0]
157 median_fps = statistics.median(fps_values)
158 max_fps = fps_values[-1]
159
160 print(
161 f"{name:<14} {median_fps:>10.0f} frames/sec {throughput_mb:>8.1f} MB/s"
162 f" blocks={total_blocks} ({blocks_per_frame:.2f}/frame) errors={total_errors}"
163 )
164 print(
165 f"{'':14} variance: min={min_fps:.0f} median={median_fps:.0f} max={max_fps:.0f} frames/sec"
166 )
167
168
def main() -> None:
    """Entry point: load the corpus, sanity-check one frame, run the benchmark."""
    print("\n=== python benchmarks ===\n")

    try:
        corpus = load_corpus("firehose-frames.bin")
    except Exception as e:
        print(f"firehose-frames.bin: SKIP ({e})")
        return

    frame_total = len(corpus.frames)
    print(f"corpus: {frame_total} frames, {corpus.total_bytes} bytes total")
    print(f"  frame sizes: {corpus.min_frame}..{corpus.max_frame} bytes")
    print(f"  passes: {WARMUP_PASSES} warmup, {MEASURED_PASSES} measured")
    print()

    # Decode one frame up front so an obviously broken corpus is visible
    # before the timed passes start.
    probe = decode_full(corpus.frames[0])
    print(f"first frame: blocks={probe.blocks} errors={probe.errors}")
    print()

    bench_decode(corpus)

    print()
191
192
# Run the benchmark suite when executed as a script (not on import).
if __name__ == "__main__":
    main()