Declarative relay deployment on Hetzner (relay.waow.tech) for the AT Protocol (atproto) network.
1#!/usr/bin/env -S PYTHONUNBUFFERED=1 uv run --script --quiet
2# /// script
3# requires-python = ">=3.12"
4# dependencies = []
5# ///
6"""
7backfill collectiondir by crawling PDS hosts in batches.
8
9reads a host list and sends batched requestCrawl calls, polling crawlStatus
10between batches to let active crawls drain before proceeding.
11
12usage:
13 ./scripts/backfill --token "$TOKEN" --hosts hosts.txt
14 ./scripts/backfill --token "$TOKEN" --hosts hosts.txt --batch-size 20 --pause 30
15 ./scripts/backfill --token "$TOKEN" --hosts hosts.txt --url http://localhost:2510
16"""
17
18import argparse
19import json
20import signal
21import sys
22import time
23import urllib.request
24import urllib.error
25
26
def request_crawl(url: str, token: str, hosts: list[str]) -> dict:
    """Submit a batch of PDS hosts to the requestCrawl admin endpoint.

    Sends POST {url}/admin/pds/requestCrawl with a JSON body of the form
    {"hosts": [...]} and returns the decoded JSON response.
    """
    payload = json.dumps({"hosts": hosts}).encode()
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }
    request = urllib.request.Request(
        f"{url}/admin/pds/requestCrawl",
        data=payload,
        headers=headers,
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=30) as response:
        return json.loads(response.read())
41
42
def crawl_status(url: str, token: str) -> dict:
    """Fetch active-crawl info via GET {url}/admin/crawlStatus."""
    request = urllib.request.Request(
        f"{url}/admin/crawlStatus",
        headers={"Authorization": f"Bearer {token}"},
    )
    with urllib.request.urlopen(request, timeout=30) as response:
        return json.loads(response.read())
51
52
def active_crawl_count(url: str, token: str, retries: int = 12) -> tuple[int, int]:
    """Return (active_crawl_count, total_repos_seen) from the crawlStatus endpoint.

    Retries transient connection failures with linear backoff (5s, 10s, ...,
    capped at 30s). HTTP error responses (e.g. 403) are not transient, so
    they are raised immediately rather than retried — consistent with how
    main() treats HTTPError from request_crawl.

    Args:
        url: collectiondir base url.
        token: admin bearer token.
        retries: max attempts before giving up; must be >= 1.

    Raises:
        ValueError: if retries < 1 (previously this silently returned None).
        urllib.error.HTTPError: on a non-2xx response from the server.
        ConnectionError/OSError/urllib.error.URLError: after all retries fail.
    """
    if retries < 1:
        raise ValueError("retries must be >= 1")
    for attempt in range(retries):
        try:
            status = crawl_status(url, token)
        except urllib.error.HTTPError:
            # auth/server errors won't resolve by retrying — surface at once
            raise
        except (ConnectionError, OSError, urllib.error.URLError):
            if attempt == retries - 1:
                raise
            delay = min(5 * (attempt + 1), 30)
            sys.stdout.write(f"\r connection error (attempt {attempt + 1}/{retries}), retrying in {delay}s...")
            sys.stdout.flush()
            time.sleep(delay)
        else:
            active = status.get("host_starts", {})
            seen = sum(h.get("seen", 0) for h in active.values())
            return len(active), seen
69
70
def wait_for_batch(url: str, token: str, baseline: int, poll_interval: float = 3.0, timeout: float = 0) -> int:
    """Block until the active crawl count drops back to `baseline`.

    Polls crawlStatus every `poll_interval` seconds. If `timeout` is positive
    and exceeded, gives up on the batch and moves on. Returns the peak repo
    count observed while this batch was draining.
    """
    started = time.time()
    best_seen = 0
    while True:
        active, seen = active_crawl_count(url, token)
        if seen > best_seen:
            best_seen = seen
        # drained back to baseline — batch is done
        if active <= baseline:
            sys.stdout.write(f"\r done — {best_seen} repos described" + " " * 20 + "\n")
            return best_seen
        # give up after `timeout` seconds (0 disables the timeout)
        if timeout > 0 and (time.time() - started) > timeout:
            sys.stdout.write(f"\r timeout — {active - baseline} crawls still active, {best_seen} repos described (skipping)" + " " * 20 + "\n")
            return best_seen
        sys.stdout.write(f"\r {active - baseline} crawls active, {seen} repos described")
        sys.stdout.flush()
        time.sleep(poll_interval)
87
88
def main():
    """Entry point: parse args, batch the host list, and drive the backfill crawl."""
    parser = argparse.ArgumentParser(description="backfill collectiondir via batched PDS crawl")
    parser.add_argument("--token", required=True, help="admin bearer token")
    parser.add_argument("--hosts", required=True, help="file with one PDS hostname per line")
    parser.add_argument("--url", default="http://localhost:2510", help="collectiondir base url (default: http://localhost:2510)")
    parser.add_argument("--batch-size", type=int, default=10, help="hosts per batch (default: 10)")
    parser.add_argument("--pause", type=int, default=5, help="seconds to pause between batches (default: 5)")
    parser.add_argument("--batch-timeout", type=int, default=300, help="max seconds to wait per batch before skipping (default: 300, 0=no timeout)")
    parser.add_argument("--skip", type=int, default=0, help="skip the first N batches (for resuming)")
    args = parser.parse_args()

    # one hostname per line; blank lines are dropped
    with open(args.hosts) as f:
        all_hosts = [line.strip() for line in f if line.strip()]

    if not all_hosts:
        print("no hosts found in file")
        return

    # chunk the host list into fixed-size batches
    batches = [all_hosts[i : i + args.batch_size] for i in range(0, len(all_hosts), args.batch_size)]

    stopping = False

    # SIGINT flips a flag instead of killing the process, so the
    # in-flight batch is allowed to finish before we exit
    def stop(*_):
        nonlocal stopping
        print("\n\nctrl-c: finishing current batch, then stopping...")
        stopping = True

    signal.signal(signal.SIGINT, stop)

    print(f"backfill: {len(all_hosts)} hosts in {len(batches)} batches of {args.batch_size}")
    print(f"target: {args.url}")

    # snapshot baseline before we start — accounts for leftover crawls
    baseline, baseline_repos = active_crawl_count(args.url, args.token)
    if baseline > 0:
        print(f"note: {baseline} crawls already active ({baseline_repos} repos), waiting for those too")
    print()

    start = time.time()
    total_repos = 0
    hosts_crawled = 0

    for i, batch in enumerate(batches):
        if stopping:
            break

        # --skip lets a previous run be resumed without re-sending batches
        if i < args.skip:
            continue

        # progress header: throughput so far and a rough ETA
        # (NOTE(review): the ETA denominator counts skipped hosts too, so it
        # overestimates when --skip is used — confirm if that matters)
        elapsed = time.time() - start
        rate = hosts_crawled / elapsed if elapsed > 0 else 0
        eta = (len(all_hosts) - hosts_crawled) / rate if rate > 0 else 0
        eta_str = f", ~{eta:.0f}s remaining" if hosts_crawled > 0 else ""
        print(f"batch {i + 1}/{len(batches)} — {', '.join(batch[:3])}{'...' if len(batch) > 3 else ''}")
        print(f" [{hosts_crawled}/{len(all_hosts)} hosts, {total_repos} repos, {elapsed:.0f}s elapsed{eta_str}]")

        # send the batch, retrying transient connection errors up to 6 times
        for attempt in range(6):
            try:
                request_crawl(args.url, args.token, batch)
                break
            except urllib.error.HTTPError as e:
                body = e.read().decode(errors="replace")
                print(f" error: {e.code} {e.reason} — {body}")
                if e.code == 403:
                    # bad/missing admin token — no point continuing at all
                    print(" check that COLLECTIONS_ADMIN_TOKEN is set on the collectiondir pod")
                    return
                # other HTTP errors: give up on this batch but keep going
                break
            except (ConnectionError, OSError, urllib.error.URLError) as e:
                if attempt < 5:
                    delay = min(5 * (attempt + 1), 30)
                    sys.stdout.write(f"\r request_crawl connection error (attempt {attempt + 1}/6), retrying in {delay}s...")
                    sys.stdout.flush()
                    time.sleep(delay)
                else:
                    print(f" error sending batch after 6 attempts: {e}")
                    break

        # wait for this batch's crawls to drain back down to the baseline
        repos = wait_for_batch(args.url, args.token, baseline, timeout=args.batch_timeout)
        total_repos += repos
        hosts_crawled += len(batch)

        # brief pause between batches (skipped after the last one / on ctrl-c)
        if i < len(batches) - 1 and not stopping:
            time.sleep(args.pause)

    elapsed = time.time() - start
    print(f"\nbackfill complete: {hosts_crawled}/{len(all_hosts)} hosts, {total_repos} repos, {elapsed:.0f}s")
175
176
# script entry point
if __name__ == "__main__":
    main()