A simple tool for incremental sync of atproto repo CAR files

initial commit

retr0.id c89780fe

+11
README.md
···
··· 1 + # carsync 2 + 3 + Python script to efficiently\* refresh an outdated copy of an atproto repo CAR file using `com.atproto.sync.getBlocks` 4 + 5 + \*Caveats: 6 + - Every missing block is fetched sequentially via `getBlocks` - there is no batching or concurrency. 7 + - The whole CAR file is read and re-written. 8 + 9 + The latter can be solved by storing the repo in SQLite (or maybe rocksdb) instead of a CAR file, and doing an MST diff rather than a full MST traversal (as is currently the case). Solving the former would probably require some galaxy-brain concurrent MST diff impl. 10 + 11 + Despite these limitations, it's still practical and fast even for large-ish repos.
+97
carsync.py
···
··· 1 + import sys 2 + from typing import BinaryIO 3 + import requests 4 + import time 5 + from cbrrr import CID, encode_dag_cbor, decode_dag_cbor 6 + from atmst.mst.node_store import NodeStore 7 + from atmst.mst.node_walker import NodeWalker 8 + from atmst.blockstore.car_file import ReadOnlyCARBlockStore, encode_varint 9 + from atmst.blockstore import BlockStore, OverlayBlockStore 10 + 11 + 12 + class XRPCBlockStore(BlockStore): 13 + def __init__(self, did: str, pds_url: str): 14 + self.did = did 15 + self.pds_url = pds_url 16 + self.session = requests.Session() 17 + self.blocks_read = 0 18 + 19 + def put_block(self, key: bytes, value: bytes) -> None: 20 + raise NotImplementedError("XRPCBlockStore is read-only") 21 + 22 + def get_block(self, key: bytes) -> bytes: 23 + res = self.session.get( 24 + self.pds_url + "/xrpc/com.atproto.sync.getBlocks", 25 + params={ 26 + "did": self.did, 27 + "cids": CID(key).encode() 28 + } 29 + ) 30 + res.raise_for_status() 31 + self.blocks_read += 1 32 + return res.content.partition(key)[2] # terrible hack to parse single-block CAR! 33 + 34 + def del_block(self, key: bytes) -> None: 35 + raise NotImplementedError("XRPCBlockStore is read-only") 36 + 37 + 38 + def write_block(file: BinaryIO, data: bytes) -> None: 39 + file.write(encode_varint(len(data))) 40 + file.write(data) 41 + 42 + 43 + def main(): 44 + if len(sys.argv) != 4: 45 + print("Usage: carsync <src_car> <dst_car> <pds_url>", file=sys.stderr) 46 + sys.exit(1) 47 + 48 + src_path, dst_path, pds_url = sys.argv[1:] 49 + 50 + s = requests.session() 51 + 52 + with open(src_path, "rb") as carfile: 53 + bs1 = ReadOnlyCARBlockStore(carfile) 54 + 55 + # figure out the DID by inspecting the source CAR 56 + commit = decode_dag_cbor(bs1.get_block(bytes(bs1.car_root))) 57 + assert(isinstance(commit, dict)) 58 + did = commit["did"] 59 + assert(isinstance(did, str)) 60 + 61 + # set up the XRPC-backed overlay block store 62 + bs2 = XRPCBlockStore(did, pds_url) 63 + bs = OverlayBlockStore(bs1, bs2) 64 + 65 + # find the *latest* commit 66 + r = s.get(pds_url + "/xrpc/com.atproto.sync.getLatestCommit", params={ 67 + "did": did 68 + }) 69 + r.raise_for_status() 70 + new_commit_cid = CID.decode(r.json()["cid"]) 71 + new_commit = decode_dag_cbor(bs.get_block(bytes(new_commit_cid))) 72 + assert(isinstance(new_commit, dict)) 73 + new_root = new_commit["data"] 74 + assert(isinstance(new_root, CID)) 75 + 76 + # for progress stats 77 + record_count = 0 78 + start_time = time.time() 79 + 80 + with open(dst_path, "wb") as carfile_out: 81 + new_header = encode_dag_cbor({ 82 + "version": 1, 83 + "roots": [new_commit_cid] 84 + }) 85 + write_block(carfile_out, new_header) 86 + write_block(carfile_out, bytes(new_commit_cid) + encode_dag_cbor(new_commit)) 87 + for node in NodeWalker(NodeStore(bs), new_root).iter_nodes(): 88 + write_block(carfile_out, bytes(node.cid) + node.serialised) 89 + for v in node.vals: 90 + write_block(carfile_out, bytes(v) + bs.get_block(bytes(v))) 91 + record_count += 1 92 + print(f"\rwritten {record_count} records. fetched {bs2.blocks_read} new blocks.", end="") 93 + 94 + print(f"\ndone in {time.time() - start_time:.1f} seconds") 95 + 96 + if __name__ == "__main__": 97 + main()
+17
pyproject.toml
···
··· 1 + [project] 2 + name = "carsync" 3 + version = "0.1.0" 4 + description = "A simple tool for incremental sync of atproto repo CAR files" 5 + requires-python = ">=3.8" 6 + dependencies = [ 7 + "cbrrr", 8 + "atmst", 9 + "requests", 10 + ] 11 + 12 + [project.scripts] 13 + carsync = "carsync:main" 14 + 15 + [build-system] 16 + requires = ["setuptools>=61.0"] 17 + build-backend = "setuptools.build_meta"