declarative relay deployment on hetzner relay.waow.tech
atproto
at main 178 lines 7.1 kB view raw
#!/usr/bin/env -S PYTHONUNBUFFERED=1 uv run --script --quiet
# /// script
# requires-python = ">=3.12"
# dependencies = []
# ///
"""
backfill collectiondir by crawling PDS hosts in batches.

reads a host list and sends batched requestCrawl calls, polling crawlStatus
between batches to let active crawls drain before proceeding.

usage:
  ./scripts/backfill --token "$TOKEN" --hosts hosts.txt
  ./scripts/backfill --token "$TOKEN" --hosts hosts.txt --batch-size 20 --pause 30
  ./scripts/backfill --token "$TOKEN" --hosts hosts.txt --url http://localhost:2510
"""

import argparse
import json
import signal
import sys
import time
import urllib.request
import urllib.error

# http status codes meaning the admin token was rejected; retrying cannot help.
AUTH_ERROR_CODES = (401, 403)


class AuthRejected(Exception):
    """raised when collectiondir rejects the admin token (401/403)."""


def is_transient(exc: BaseException) -> bool:
    """return True if `exc` is a failure worth retrying.

    network-level failures (refused/reset connections, DNS errors, socket
    timeouts — all OSError subclasses) and 5xx responses are retried. 4xx
    responses are not: the same request would fail the same way. anything
    that is not an OSError (e.g. a JSON decode error) is not transient.
    """
    # HTTPError subclasses URLError, which subclasses OSError, so check it first.
    if isinstance(exc, urllib.error.HTTPError):
        return exc.code >= 500
    return isinstance(exc, OSError)


def retry_delay(attempt: int) -> int:
    """linear backoff for 0-based `attempt`: 5s, 10s, 15s, ... capped at 30s."""
    return min(5 * (attempt + 1), 30)


def split_batches(items: list[str], size: int) -> list[list[str]]:
    """split `items` into consecutive chunks of at most `size` elements.

    raises ValueError if size < 1.
    """
    if size < 1:
        raise ValueError(f"batch size must be at least 1, got {size}")
    return [items[i : i + size] for i in range(0, len(items), size)]


def request_crawl(url: str, token: str, hosts: list[str]) -> dict:
    """POST /admin/pds/requestCrawl with a batch of hosts.

    returns the decoded JSON response, or {} when the response body is empty.
    raises urllib.error.HTTPError on non-2xx responses and URLError/OSError
    on network failures.
    """
    data = json.dumps({"hosts": hosts}).encode()
    req = urllib.request.Request(
        f"{url}/admin/pds/requestCrawl",
        data=data,
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=30) as resp:
        body = resp.read()
    # the caller ignores the result, so an empty 2xx body must not crash the run
    return json.loads(body) if body.strip() else {}


def crawl_status(url: str, token: str) -> dict:
    """GET /admin/crawlStatus — returns active crawl info as decoded JSON."""
    req = urllib.request.Request(
        f"{url}/admin/crawlStatus",
        headers={"Authorization": f"Bearer {token}"},
    )
    with urllib.request.urlopen(req, timeout=30) as resp:
        return json.loads(resp.read())


def summarize_status(status: dict) -> tuple[int, int]:
    """reduce a crawlStatus response to (active_count, total_repos_seen).

    `host_starts` is read as a mapping with one entry per active crawl, each
    optionally carrying a "seen" counter. a missing or null `host_starts`
    counts as no active crawls.
    """
    active = status.get("host_starts") or {}
    seen = sum(h.get("seen", 0) for h in active.values())
    return len(active), seen


def active_crawl_count(url: str, token: str, retries: int = 12) -> tuple[int, int]:
    """returns (active_count, total_repos_seen).

    transient failures (network errors, 5xx) are retried with backoff, for up
    to `retries` attempts in total (always at least one). non-transient errors,
    such as a rejected token (4xx), are raised immediately.
    """
    attempts = max(1, retries)
    for attempt in range(attempts):
        try:
            return summarize_status(crawl_status(url, token))
        except OSError as e:
            if not is_transient(e) or attempt == attempts - 1:
                raise
            delay = retry_delay(attempt)
            sys.stdout.write(f"\r connection error (attempt {attempt + 1}/{attempts}), retrying in {delay}s...")
            sys.stdout.flush()
            time.sleep(delay)
    raise AssertionError("unreachable: loop always returns or raises")


def wait_for_batch(url: str, token: str, baseline: int, poll_interval: float = 3.0, timeout: float = 0) -> int:
    """poll until the active crawl count returns to `baseline` or `timeout` expires.

    args:
      baseline: active-crawl count that counts as "drained" (crawls already
        running before the backfill started).
      poll_interval: seconds between crawlStatus polls.
      timeout: give up after this many seconds; 0 means wait indefinitely.

    returns the highest repos-seen total observed while polling. that total is
    summed over every active crawl, so it may include pre-existing crawls.
    """
    # NOTE(review): this polls immediately after requestCrawl returns. if the
    # server registers crawls asynchronously, the first poll could see
    # count <= baseline and end the batch early — confirm against collectiondir.
    peak_seen = 0
    batch_start = time.monotonic()  # monotonic: immune to wall-clock jumps
    while True:
        count, seen = active_crawl_count(url, token)
        peak_seen = max(peak_seen, seen)
        if count <= baseline:
            sys.stdout.write(f"\r done — {peak_seen} repos described" + " " * 20 + "\n")
            return peak_seen
        if timeout > 0 and (time.monotonic() - batch_start) > timeout:
            sys.stdout.write(f"\r timeout — {count - baseline} crawls still active, {peak_seen} repos described (skipping)" + " " * 20 + "\n")
            return peak_seen
        sys.stdout.write(f"\r {count - baseline} crawls active, {seen} repos described")
        sys.stdout.flush()
        time.sleep(poll_interval)


def send_batch(url: str, token: str, batch: list[str], attempts: int = 6) -> bool:
    """send one requestCrawl batch, retrying transient failures.

    returns True if the server accepted the batch, and False if it was
    rejected (non-auth 4xx) or still failing after `attempts` tries.
    raises AuthRejected on 401/403, since every later batch would fail too.
    """
    for attempt in range(attempts):
        last = attempt == attempts - 1
        try:
            request_crawl(url, token, batch)
            return True
        except urllib.error.HTTPError as e:
            body = e.read().decode(errors="replace")
            print(f" error: {e.code} {e.reason} — {body}")
            if e.code in AUTH_ERROR_CODES:
                raise AuthRejected(e.code) from e
            if not is_transient(e) or last:
                return False
        except OSError as e:
            if last:
                print(f" error sending batch after {attempts} attempts: {e}")
                return False
        delay = retry_delay(attempt)
        sys.stdout.write(f"\r request_crawl error (attempt {attempt + 1}/{attempts}), retrying in {delay}s...")
        sys.stdout.flush()
        time.sleep(delay)
    return False


def main():
    """parse arguments, then send host batches and wait for each to drain."""
    parser = argparse.ArgumentParser(description="backfill collectiondir via batched PDS crawl")
    parser.add_argument("--token", required=True, help="admin bearer token")
    parser.add_argument("--hosts", required=True, help="file with one PDS hostname per line")
    parser.add_argument("--url", default="http://localhost:2510", help="collectiondir base url (default: http://localhost:2510)")
    parser.add_argument("--batch-size", type=int, default=10, help="hosts per batch (default: 10)")
    parser.add_argument("--pause", type=int, default=5, help="seconds to pause between batches (default: 5)")
    parser.add_argument("--batch-timeout", type=int, default=300, help="max seconds to wait per batch before skipping (default: 300, 0=no timeout)")
    parser.add_argument("--skip", type=int, default=0, help="skip the first N batches (for resuming)")
    args = parser.parse_args()

    # reject values that would otherwise crash mid-run (range step 0, negative sleep)
    for flag, value, minimum in (
        ("--batch-size", args.batch_size, 1),
        ("--pause", args.pause, 0),
        ("--batch-timeout", args.batch_timeout, 0),
        ("--skip", args.skip, 0),
    ):
        if value < minimum:
            parser.error(f"{flag} must be >= {minimum}")

    with open(args.hosts, encoding="utf-8") as f:
        all_hosts = [line.strip() for line in f if line.strip()]

    if not all_hosts:
        print("no hosts found in file")
        return

    batches = split_batches(all_hosts, args.batch_size)

    stopping = False

    def stop(*_):
        nonlocal stopping
        print("\n\nctrl-c: finishing current batch, then stopping (ctrl-c again to abort)...")
        stopping = True
        # restore the default handler so a second ctrl-c raises KeyboardInterrupt
        # instead of being swallowed (a batch can block forever with --batch-timeout 0)
        signal.signal(signal.SIGINT, signal.default_int_handler)

    signal.signal(signal.SIGINT, stop)

    print(f"backfill: {len(all_hosts)} hosts in {len(batches)} batches of {args.batch_size}")
    print(f"target: {args.url}")

    # snapshot baseline before we start — accounts for leftover crawls
    baseline, baseline_repos = active_crawl_count(args.url, args.token)
    if baseline > 0:
        # wait_for_batch treats count <= baseline as drained, so these are not waited on
        print(f"note: {baseline} crawls already active ({baseline_repos} repos); batches are measured relative to these")
    print()

    start = time.monotonic()
    total_repos = 0
    hosts_crawled = 0  # hosts in batches the server accepted
    hosts_attempted = 0  # hosts in every processed batch, accepted or not (drives the ETA)
    # skipped batches are never crawled, so they don't belong in the ETA
    hosts_to_crawl = sum(len(b) for b in batches[args.skip :])
    failed_batches: list[int] = []

    for i, batch in enumerate(batches):
        if stopping:
            break

        if i < args.skip:
            continue

        elapsed = time.monotonic() - start
        rate = hosts_attempted / elapsed if elapsed > 0 else 0
        eta = (hosts_to_crawl - hosts_attempted) / rate if rate > 0 else 0
        eta_str = f", ~{eta:.0f}s remaining" if hosts_attempted > 0 else ""
        print(f"batch {i + 1}/{len(batches)} — {', '.join(batch[:3])}{'...' if len(batch) > 3 else ''}")
        print(f" [{hosts_crawled}/{len(all_hosts)} hosts, {total_repos} repos, {elapsed:.0f}s elapsed{eta_str}]")

        try:
            sent = send_batch(args.url, args.token, batch)
        except AuthRejected:
            print(" check that COLLECTIONS_ADMIN_TOKEN is set on the collectiondir pod")
            sys.exit(1)

        hosts_attempted += len(batch)
        if sent:
            total_repos += wait_for_batch(args.url, args.token, baseline, timeout=args.batch_timeout)
            hosts_crawled += len(batch)
        else:
            # nothing was queued, so there is nothing to wait for; report it at the end
            failed_batches.append(i + 1)

        if i < len(batches) - 1 and not stopping:
            time.sleep(args.pause)

    elapsed = time.monotonic() - start
    verb = "stopped" if stopping and hosts_attempted < hosts_to_crawl else "complete"
    print(f"\nbackfill {verb}: {hosts_crawled}/{len(all_hosts)} hosts, {total_repos} repos, {elapsed:.0f}s")
    if failed_batches:
        print(f"failed batches (not crawled): {', '.join(map(str, failed_batches))}")


if __name__ == "__main__":
    main()