#!/usr/bin/env -S PYTHONUNBUFFERED=1 uv run --script --quiet
# /// script
# requires-python = ">=3.12"
# dependencies = []
# ///
"""
backfill collectiondir by crawling PDS hosts in batches.

reads a host list and sends batched requestCrawl calls, polling
crawlStatus between batches to let active crawls drain before
proceeding.

usage:
    ./scripts/backfill --token "$TOKEN" --hosts hosts.txt
    ./scripts/backfill --token "$TOKEN" --hosts hosts.txt --batch-size 20 --pause 30
    ./scripts/backfill --token "$TOKEN" --hosts hosts.txt --url http://localhost:2510
"""

import argparse
import json
import signal
import sys
import time
import urllib.error
import urllib.request


def request_crawl(url: str, token: str, hosts: list[str]) -> dict:
    """POST /admin/pds/requestCrawl with a batch of hosts.

    raises urllib.error.HTTPError / OSError on failure; retry is the
    caller's responsibility.
    """
    data = json.dumps({"hosts": hosts}).encode()
    req = urllib.request.Request(
        f"{url}/admin/pds/requestCrawl",
        data=data,
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=30) as resp:
        return json.loads(resp.read())


def crawl_status(url: str, token: str) -> dict:
    """GET /admin/crawlStatus — returns active crawl info."""
    req = urllib.request.Request(
        f"{url}/admin/crawlStatus",
        headers={"Authorization": f"Bearer {token}"},
    )
    with urllib.request.urlopen(req, timeout=30) as resp:
        return json.loads(resp.read())


def active_crawl_count(url: str, token: str, retries: int = 12) -> tuple[int, int]:
    """returns (active_count, total_repos_seen).

    retries on transient errors with linear backoff (5s, 10s, ... capped
    at 30s); re-raises the final error once attempts are exhausted.
    """
    for attempt in range(retries):
        try:
            status = crawl_status(url, token)
            active = status.get("host_starts", {})
            seen = sum(h.get("seen", 0) for h in active.values())
            return len(active), seen
        # ConnectionError and urllib.error.URLError are both OSError
        # subclasses, so one clause covers all the transient cases the
        # original tuple listed.
        except OSError:
            if attempt < retries - 1:
                delay = min(5 * (attempt + 1), 30)
                sys.stdout.write(f"\r connection error (attempt {attempt + 1}/{retries}), retrying in {delay}s...")
                sys.stdout.flush()
                time.sleep(delay)
            else:
                raise
    # unreachable for retries >= 1; guards against retries <= 0
    raise ValueError("retries must be >= 1")


def wait_for_batch(url: str, token: str, baseline: int, poll_interval: float = 3.0, timeout: float = 0) -> int:
    """poll until active crawl count returns to baseline or timeout.

    returns repos seen this batch — the peak "seen" value observed while
    polling, since the reported count drops as crawls finish. a timeout
    of 0 means wait forever.
    """
    peak_seen = 0
    batch_start = time.time()
    while True:
        count, seen = active_crawl_count(url, token)
        peak_seen = max(peak_seen, seen)
        if count <= baseline:
            # trailing spaces clear leftover characters from the \r status line
            sys.stdout.write(f"\r done — {peak_seen} repos described" + " " * 20 + "\n")
            return peak_seen
        if timeout > 0 and (time.time() - batch_start) > timeout:
            sys.stdout.write(f"\r timeout — {count - baseline} crawls still active, {peak_seen} repos described (skipping)" + " " * 20 + "\n")
            return peak_seen
        sys.stdout.write(f"\r {count - baseline} crawls active, {seen} repos described")
        sys.stdout.flush()
        time.sleep(poll_interval)


def _send_batch(url: str, token: str, batch: list[str]) -> bool:
    """send one requestCrawl batch with up to 6 attempts and backoff.

    returns False only when the admin token is rejected (HTTP 403), in
    which case the caller should abort the run. all other errors are
    logged and treated as a skipped batch (returns True).
    """
    for attempt in range(6):
        try:
            request_crawl(url, token, batch)
            return True
        # HTTPError must precede OSError: it is an OSError subclass
        except urllib.error.HTTPError as e:
            body = e.read().decode(errors="replace")
            print(f" error: {e.code} {e.reason} — {body}")
            if e.code == 403:
                print(" check that COLLECTIONS_ADMIN_TOKEN is set on the collectiondir pod")
                return False
            return True
        except OSError as e:
            if attempt < 5:
                delay = min(5 * (attempt + 1), 30)
                sys.stdout.write(f"\r request_crawl connection error (attempt {attempt + 1}/6), retrying in {delay}s...")
                sys.stdout.flush()
                time.sleep(delay)
            else:
                print(f" error sending batch after 6 attempts: {e}")
    return True


def main():
    parser = argparse.ArgumentParser(description="backfill collectiondir via batched PDS crawl")
    parser.add_argument("--token", required=True, help="admin bearer token")
    parser.add_argument("--hosts", required=True, help="file with one PDS hostname per line")
    parser.add_argument("--url", default="http://localhost:2510", help="collectiondir base url (default: http://localhost:2510)")
    parser.add_argument("--batch-size", type=int, default=10, help="hosts per batch (default: 10)")
    parser.add_argument("--pause", type=int, default=5, help="seconds to pause between batches (default: 5)")
    parser.add_argument("--batch-timeout", type=int, default=300, help="max seconds to wait per batch before skipping (default: 300, 0=no timeout)")
    parser.add_argument("--skip", type=int, default=0, help="skip the first N batches (for resuming)")
    args = parser.parse_args()

    with open(args.hosts) as f:
        all_hosts = [line.strip() for line in f if line.strip()]
    if not all_hosts:
        print("no hosts found in file")
        return

    batches = [all_hosts[i : i + args.batch_size] for i in range(0, len(all_hosts), args.batch_size)]

    stopping = False

    def stop(*_):
        # first ctrl-c requests a graceful stop after the current batch
        nonlocal stopping
        print("\n\nctrl-c: finishing current batch, then stopping...")
        stopping = True

    signal.signal(signal.SIGINT, stop)

    print(f"backfill: {len(all_hosts)} hosts in {len(batches)} batches of {args.batch_size}")
    print(f"target: {args.url}")

    # snapshot baseline before we start — accounts for leftover crawls
    baseline, baseline_repos = active_crawl_count(args.url, args.token)
    if baseline > 0:
        print(f"note: {baseline} crawls already active ({baseline_repos} repos), waiting for those too")
    print()

    start = time.time()
    total_repos = 0
    hosts_crawled = 0
    hosts_skipped = 0  # hosts resumed past via --skip; excluded from rate/ETA
    for i, batch in enumerate(batches):
        if stopping:
            break
        if i < args.skip:
            hosts_skipped += len(batch)
            continue

        elapsed = time.time() - start
        rate = hosts_crawled / elapsed if elapsed > 0 else 0
        # fix: subtract skipped hosts from the remaining count — otherwise
        # a --skip resume reports a wildly inflated ETA
        eta = (len(all_hosts) - hosts_skipped - hosts_crawled) / rate if rate > 0 else 0
        eta_str = f", ~{eta:.0f}s remaining" if hosts_crawled > 0 else ""
        print(f"batch {i + 1}/{len(batches)} — {', '.join(batch[:3])}{'...' if len(batch) > 3 else ''}")
        print(f" [{hosts_crawled}/{len(all_hosts)} hosts, {total_repos} repos, {elapsed:.0f}s elapsed{eta_str}]")

        if not _send_batch(args.url, args.token, batch):
            return  # admin token rejected — no point continuing

        repos = wait_for_batch(args.url, args.token, baseline, timeout=args.batch_timeout)
        total_repos += repos
        hosts_crawled += len(batch)

        if i < len(batches) - 1 and not stopping:
            time.sleep(args.pause)

    elapsed = time.time() - start
    print(f"\nbackfill complete: {hosts_crawled}/{len(all_hosts)} hosts, {total_repos} repos, {elapsed:.0f}s")


if __name__ == "__main__":
    main()