#!/usr/bin/env -S PYTHONUNBUFFERED=1 uv run --script --quiet # /// script # requires-python = ">=3.12" # dependencies = [] # /// """ compare listReposByCollection results between two relay endpoints. fetches all DIDs for a collection from both endpoints and reports the difference — which DIDs are unique to each side. usage: ./scripts/collectiondir-diff --collection io.atcr.sailor.profile ./scripts/collectiondir-diff --collection app.bsky.feed.post --limit 10000 ./scripts/collectiondir-diff --collection io.atcr.sailor.profile \ --a https://relay.waow.tech --b https://relay1.us-east.bsky.network """ import argparse import json import sys import urllib.request import urllib.error def fetch_dids(base_url: str, collection: str, limit: int) -> set[str]: """paginate listReposByCollection and return all DIDs up to limit.""" endpoint = f"{base_url.rstrip('/')}/xrpc/com.atproto.sync.listReposByCollection" dids: set[str] = set() cursor = None page_size = min(limit, 2000) while len(dids) < limit: params = f"collection={collection}&limit={page_size}" if cursor: params += f"&cursor={urllib.request.quote(cursor)}" url = f"{endpoint}?{params}" try: req = urllib.request.Request(url) with urllib.request.urlopen(req, timeout=30) as resp: data = json.loads(resp.read()) except urllib.error.HTTPError as e: print(f" error fetching {base_url}: {e.code} {e.reason}", file=sys.stderr) break except Exception as e: print(f" error fetching {base_url}: {e}", file=sys.stderr) break repos = data.get("repos", []) if not repos: break for repo in repos: dids.add(repo["did"]) cursor = data.get("cursor") if not cursor: break sys.stdout.write(f"\r {base_url}: {len(dids)} DIDs fetched...") sys.stdout.flush() return dids def resolve_did(did: str) -> dict: """resolve a DID via plc.directory. returns handle + PDS host.""" try: url = f"https://plc.directory/{did}" with urllib.request.urlopen(url, timeout=10) as resp: data = json.loads(resp.read()) handle = "?" aka = data.get("alsoKnownAs", []) if aka: handle = aka[0].replace("at://", "") pds = "?" services = data.get("service", []) if services: pds = services[0].get("serviceEndpoint", "?") return {"handle": handle, "pds": pds} except Exception: return {"handle": "?", "pds": "?"} def main(): parser = argparse.ArgumentParser( description="compare listReposByCollection between two relay endpoints" ) parser.add_argument("--collection", required=True, help="collection NSID to compare") parser.add_argument( "--a", default="https://relay.waow.tech", help="first endpoint (default: https://relay.waow.tech)", ) parser.add_argument( "--b", default="https://bsky.network", help="second endpoint (default: https://bsky.network)", ) parser.add_argument( "--limit", type=int, default=100_000, help="max DIDs to fetch per endpoint (default: 100000)", ) parser.add_argument( "--resolve", action="store_true", help="resolve DIDs to handles via plc.directory (slower)", ) args = parser.parse_args() print(f"collection: {args.collection}") print(f" A: {args.a}") print(f" B: {args.b}") print(f" limit: {args.limit:,}") print() dids_a = fetch_dids(args.a, args.collection, args.limit) print(f"\r A: {len(dids_a):,} DIDs" + " " * 40) dids_b = fetch_dids(args.b, args.collection, args.limit) print(f"\r B: {len(dids_b):,} DIDs" + " " * 40) print() only_a = dids_a - dids_b only_b = dids_b - dids_a common = dids_a & dids_b print(f"common: {len(common):,}") print(f"only in A ({args.a}): {len(only_a):,}") print(f"only in B ({args.b}): {len(only_b):,}") def print_dids(label: str, dids: set[str]): if not dids: return print(f"\n{label}:") for did in sorted(dids): if args.resolve: info = resolve_did(did) print(f" {did} @{info['handle']} ({info['pds']})") else: print(f" {did}") print_dids(f"only in A", only_a) print_dids(f"only in B", only_b) if not only_a and not only_b: print("\nidentical.") if __name__ == "__main__": main()