declarative relay deployment on hetzner
relay-eval.waow.tech
atproto
relay
1#!/usr/bin/env -S uv run --script --quiet
2# /// script
3# requires-python = ">=3.12"
4# dependencies = ["websockets"]
5# ///
6"""
7consume events from an atproto jetstream endpoint and print them.
8
9jetstream re-encodes the relay's CBOR firehose as plain JSON over websockets,
10so no atproto SDK is needed.
11
12usage:
13 ./scripts/jetstream
14 ./scripts/jetstream --duration 30
15 ./scripts/jetstream --url wss://jetstream1.us-east.bsky.network
16 ./scripts/jetstream --collection app.bsky.feed.like
17 ./scripts/jetstream --collection app.bsky.feed.post --collection app.bsky.feed.like
18"""
19
20import argparse
21import json
22import signal
23import time
24from collections import defaultdict
25from urllib.parse import urlencode
26
27import websockets.sync.client as ws
28
29
30def format_event(event: dict) -> str | None:
31 """format a jetstream event for display. returns None to skip."""
32 kind = event.get("kind")
33 did = event.get("did", "?")
34
35 if kind == "commit":
36 commit = event.get("commit", {})
37 collection = commit.get("collection", "")
38 operation = commit.get("operation", "")
39 record = commit.get("record", {})
40
41 if operation == "delete":
42 return None
43
44 if collection == "app.bsky.feed.post":
45 text = record.get("text", "")
46 inline = text.replace("\n", " ")[:120]
47 return f"[{did}] {inline}"
48 elif collection == "app.bsky.feed.like":
49 uri = record.get("subject", {}).get("uri", "?")
50 return f"[{did}] liked {uri}"
51 elif collection == "app.bsky.graph.follow":
52 subject = record.get("subject", "?")
53 return f"[{did}] followed {subject}"
54 else:
55 return f"[{did}] {collection}#{operation}"
56
57 elif kind == "identity":
58 handle = event.get("identity", {}).get("handle", "?")
59 return f"[{did}] identity -> {handle}"
60
61 return None
62
63
def _parse_args() -> argparse.Namespace:
    """Parse the command-line options understood by ``main``."""
    parser = argparse.ArgumentParser(description="consume an atproto jetstream")
    parser.add_argument(
        "--url",
        default="wss://jetstream.waow.tech",
        help="jetstream base url (default: wss://jetstream.waow.tech)",
    )
    parser.add_argument(
        "--duration",
        type=int,
        default=10,
        help="seconds to consume (default: 10, 0 = forever)",
    )
    parser.add_argument(
        "--collection",
        action="append",
        default=None,
        help="filter by collection (repeatable). omit for all events.",
    )
    return parser.parse_args()


def _subscribe_url(base: str, collections: list[str] | None) -> str:
    """Build the ``/subscribe`` url for ``base``.

    One ``wantedCollections`` query parameter is added per requested
    collection. No query string is added when ``collections`` is empty/None.
    """
    url = f"{base.rstrip('/')}/subscribe"
    if collections:
        query = urlencode({"wantedCollections": collections}, doseq=True)
        url = f"{url}?{query}"
    return url


def _event_bucket(event: dict) -> str:
    """Summary key for an event: its collection for commits, else its kind."""
    kind = event.get("kind", "unknown")
    if kind == "commit":
        return event.get("commit", {}).get("collection", "unknown")
    return kind


def main() -> int:
    """Stream events from a jetstream endpoint, print them, then print totals.

    Options (parsed by ``_parse_args``):
      --url         jetstream base url; ``/subscribe`` is appended.
      --duration    seconds to consume; any value <= 0 means forever.
      --collection  repeatable ``wantedCollections`` filter.

    Ctrl-C (SIGINT) stops consumption gracefully. The loop notices within
    about a second and the per-bucket summary is still printed. The SIGINT
    handler that was active before the call is restored before returning.

    Returns:
        Process exit status: ``0`` on a clean run, ``1`` if connecting or
        handling events raised (the error is reported on stderr).
    """
    import sys  # only needed for stderr reporting

    args = _parse_args()
    url = _subscribe_url(args.url, args.collection)

    # monotonic clock so wall-clock jumps (NTP, manual changes) can't
    # stretch or cut short the requested duration
    deadline = (
        time.monotonic() + args.duration if args.duration > 0 else float("inf")
    )
    counts: dict[str, int] = defaultdict(int)
    total = 0
    stopping = False
    status = 0

    def stop(*_):
        nonlocal stopping
        stopping = True

    previous_handler = signal.signal(signal.SIGINT, stop)

    collections_desc = ", ".join(args.collection) if args.collection else "all"
    duration_desc = f"{args.duration}s" if args.duration > 0 else "forever"
    print(f"consuming from {args.url} for {duration_desc}...")
    print(f"filtering: {collections_desc}")
    print()

    try:
        with ws.connect(url) as conn:
            while not stopping:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    break
                # poll at most once a second so SIGINT is noticed promptly,
                # but never block past the deadline
                try:
                    raw = conn.recv(timeout=min(1.0, remaining))
                except TimeoutError:
                    continue

                event = json.loads(raw)
                counts[_event_bucket(event)] += 1
                total += 1
                line = format_event(event)
                if line:
                    print(line)
    except Exception as e:
        # top-level boundary of the script: report the failure but still
        # print the summary of what was received before it
        print(f"\nerror: {e}", file=sys.stderr)
        status = 1
    finally:
        # signal.signal returns None if the old handler wasn't set from Python
        if previous_handler is not None:
            signal.signal(signal.SIGINT, previous_handler)

    print()
    print(f"--- {total} events ---")
    for col, count in sorted(counts.items()):
        print(f" {col}: {count}")
    return status
144
145
if __name__ == "__main__":
    # propagate main()'s return value as the process exit status
    # (None -> 0, so this is also correct for a main() that returns nothing)
    raise SystemExit(main())