declarative relay deployment on hetzner relay-eval.waow.tech
atproto relay
at main 147 lines 4.4 kB view raw
1#!/usr/bin/env -S uv run --script --quiet 2# /// script 3# requires-python = ">=3.12" 4# dependencies = ["websockets"] 5# /// 6""" 7consume events from an atproto jetstream endpoint and print them. 8 9jetstream re-encodes the relay's CBOR firehose as plain JSON over websockets, 10so no atproto SDK is needed. 11 12usage: 13 ./scripts/jetstream 14 ./scripts/jetstream --duration 30 15 ./scripts/jetstream --url wss://jetstream1.us-east.bsky.network 16 ./scripts/jetstream --collection app.bsky.feed.like 17 ./scripts/jetstream --collection app.bsky.feed.post --collection app.bsky.feed.like 18""" 19 20import argparse 21import json 22import signal 23import time 24from collections import defaultdict 25from urllib.parse import urlencode 26 27import websockets.sync.client as ws 28 29 30def format_event(event: dict) -> str | None: 31 """format a jetstream event for display. returns None to skip.""" 32 kind = event.get("kind") 33 did = event.get("did", "?") 34 35 if kind == "commit": 36 commit = event.get("commit", {}) 37 collection = commit.get("collection", "") 38 operation = commit.get("operation", "") 39 record = commit.get("record", {}) 40 41 if operation == "delete": 42 return None 43 44 if collection == "app.bsky.feed.post": 45 text = record.get("text", "") 46 inline = text.replace("\n", " ")[:120] 47 return f"[{did}] {inline}" 48 elif collection == "app.bsky.feed.like": 49 uri = record.get("subject", {}).get("uri", "?") 50 return f"[{did}] liked {uri}" 51 elif collection == "app.bsky.graph.follow": 52 subject = record.get("subject", "?") 53 return f"[{did}] followed {subject}" 54 else: 55 return f"[{did}] {collection}#{operation}" 56 57 elif kind == "identity": 58 handle = event.get("identity", {}).get("handle", "?") 59 return f"[{did}] identity -> {handle}" 60 61 return None 62 63 64def main(): 65 parser = argparse.ArgumentParser(description="consume an atproto jetstream") 66 parser.add_argument( 67 "--url", 68 default="wss://jetstream.waow.tech", 69 help="jetstream base url (default: wss://jetstream.waow.tech)", 70 ) 71 parser.add_argument( 72 "--duration", 73 type=int, 74 default=10, 75 help="seconds to consume (default: 10, 0 = forever)", 76 ) 77 parser.add_argument( 78 "--collection", 79 action="append", 80 default=None, 81 help="filter by collection (repeatable). omit for all events.", 82 ) 83 args = parser.parse_args() 84 85 # build subscribe url with wantedCollections query params 86 params = {} 87 if args.collection: 88 params["wantedCollections"] = args.collection 89 query = urlencode(params, doseq=True) 90 url = f"{args.url.rstrip('/')}/subscribe" 91 if query: 92 url = f"{url}?{query}" 93 94 deadline = time.time() + args.duration if args.duration > 0 else float("inf") 95 counts: dict[str, int] = defaultdict(int) 96 total = 0 97 stopping = False 98 99 def stop(*_): 100 nonlocal stopping 101 stopping = True 102 103 signal.signal(signal.SIGINT, stop) 104 105 collections_desc = ", ".join(args.collection) if args.collection else "all" 106 duration_desc = f"{args.duration}s" if args.duration > 0 else "forever" 107 print(f"consuming from {args.url} for {duration_desc}...") 108 print(f"filtering: {collections_desc}") 109 print() 110 111 try: 112 with ws.connect(url) as conn: 113 while not stopping: 114 if time.time() >= deadline: 115 break 116 117 try: 118 raw = conn.recv(timeout=1.0) 119 except TimeoutError: 120 continue 121 122 event = json.loads(raw) 123 kind = event.get("kind", "unknown") 124 125 collection = "" 126 if kind == "commit": 127 collection = event.get("commit", {}).get("collection", "unknown") 128 counts[collection] += 1 129 else: 130 counts[kind] += 1 131 132 total += 1 133 line = format_event(event) 134 if line: 135 print(line) 136 137 except Exception as e: 138 print(f"\nerror: {e}") 139 140 print() 141 print(f"--- {total} events ---") 142 for col, count in sorted(counts.items()): 143 print(f" {col}: {count}") 144 145 146if __name__ == "__main__": 147 main()