A simple tool for incremental sync of atproto repo CAR files
at main 2.9 kB view raw
"""Incremental sync of atproto repo CAR files.

Reads an existing repo CAR file, asks the PDS for the latest commit of the
same repo, and writes a fresh CAR file for that commit. Blocks are looked up
through an overlay of the source CAR and an XRPC-backed store, so only blocks
the local file cannot supply need to be downloaded.
"""

import sys
from typing import BinaryIO
import requests
import time
from cbrrr import CID, encode_dag_cbor, decode_dag_cbor
from atmst.mst.node_store import NodeStore
from atmst.mst.node_walker import NodeWalker
from atmst.blockstore.car_file import ReadOnlyCARBlockStore, encode_varint
from atmst.blockstore import BlockStore, OverlayBlockStore


# Seconds to wait on each HTTP request before giving up; without a timeout
# `requests` will block indefinitely on an unresponsive server.
DEFAULT_TIMEOUT = 30.0


def _extract_car_block(car: bytes, key: bytes) -> bytes:
    """Return the payload of the block identified by *key* in CARv1 bytes.

    The data is walked as a sequence of varint-length-prefixed sections. The
    first section is the CAR header and is skipped; every following section
    is ``<cid bytes><block data>``.

    Raises:
        KeyError: no block section starts with *key*.
        ValueError: the data ends in the middle of a varint or a section.
    """
    pos = 0
    total = len(car)
    in_header = True
    while pos < total:
        # Decode the unsigned LEB128 length prefix of the next section.
        length = 0
        shift = 0
        while True:
            if pos >= total:
                raise ValueError("truncated varint in CAR data")
            byte = car[pos]
            pos += 1
            length |= (byte & 0x7F) << shift
            if byte < 0x80:
                break
            shift += 7
        end = pos + length
        if end > total:
            raise ValueError("truncated section in CAR data")
        if in_header:
            # The header may list CIDs as roots; never match against it.
            in_header = False
        elif car.startswith(key, pos, end):
            return car[pos + len(key):end]
        pos = end
    raise KeyError(key)


class XRPCBlockStore(BlockStore):
    """Read-only block store that fetches blocks from a PDS over XRPC.

    Each lookup issues one ``com.atproto.sync.getBlocks`` request for a
    single CID. ``blocks_read`` counts the successful HTTP fetches.
    """

    def __init__(self, did: str, pds_url: str, timeout: float = DEFAULT_TIMEOUT):
        """Create a store for repo *did* hosted at *pds_url*.

        Args:
            did: DID of the repo whose blocks are requested.
            pds_url: Base URL of the PDS, without the ``/xrpc/...`` path.
            timeout: Per-request timeout in seconds.
        """
        self.did = did
        self.pds_url = pds_url
        self.timeout = timeout
        self.session = requests.Session()
        self.blocks_read = 0

    def put_block(self, key: bytes, value: bytes) -> None:
        """Always raises: this store is read-only."""
        raise NotImplementedError("XRPCBlockStore is read-only")

    def get_block(self, key: bytes) -> bytes:
        """Fetch the block whose CID bytes are *key* and return its data.

        Raises:
            requests.HTTPError: the server answered with an error status.
            KeyError: the returned CAR does not contain the requested block.
            ValueError: the returned CAR data is truncated.
        """
        res = self.session.get(
            self.pds_url + "/xrpc/com.atproto.sync.getBlocks",
            params={
                "did": self.did,
                "cids": CID(key).encode(),
            },
            timeout=self.timeout,
        )
        res.raise_for_status()
        self.blocks_read += 1
        # Parse the CAR properly instead of splitting on the CID bytes, which
        # silently yields b"" when the block is absent and can mis-split if
        # the CID also appears in the CAR header.
        return _extract_car_block(res.content, key)

    def del_block(self, key: bytes) -> None:
        """Always raises: this store is read-only."""
        raise NotImplementedError("XRPCBlockStore is read-only")


def write_block(file: BinaryIO, data: bytes) -> None:
    """Write *data* to *file* preceded by its varint-encoded length."""
    file.write(encode_varint(len(data)))
    file.write(data)


def main():
    """Command-line entry point: ``carsync <src_car> <dst_car> <pds_url>``.

    Exits with status 1 and a usage message when the argument count is wrong.

    Raises:
        ValueError: the source or latest commit is not shaped as expected.
        requests.HTTPError: the PDS answered a request with an error status.
    """
    if len(sys.argv) != 4:
        print("Usage: carsync <src_car> <dst_car> <pds_url>", file=sys.stderr)
        sys.exit(1)

    src_path, dst_path, pds_url = sys.argv[1:]

    # The source file stays open for the whole run because the CAR-backed
    # store is queried while walking the new tree.
    with open(src_path, "rb") as carfile:
        bs1 = ReadOnlyCARBlockStore(carfile)

        # figure out the DID by inspecting the source CAR
        commit = decode_dag_cbor(bs1.get_block(bytes(bs1.car_root)))
        if not isinstance(commit, dict):
            raise ValueError("source CAR root block is not a commit object")
        did = commit["did"]
        if not isinstance(did, str):
            raise ValueError("source commit has a non-string 'did' field")

        # set up the XRPC-backed overlay block store
        # NOTE(review): presumably the overlay consults bs1 first and only
        # falls back to bs2 for blocks the source CAR lacks - confirm.
        bs2 = XRPCBlockStore(did, pds_url)
        bs = OverlayBlockStore(bs1, bs2)

        # Reuse the store's session for the one extra request and make sure
        # its connections are released when we are done.
        with bs2.session as s:
            # find the *latest* commit
            r = s.get(
                pds_url + "/xrpc/com.atproto.sync.getLatestCommit",
                params={"did": did},
                timeout=bs2.timeout,
            )
            r.raise_for_status()
            new_commit_cid = CID.decode(r.json()["cid"])
            new_commit_bytes = bs.get_block(bytes(new_commit_cid))
            new_commit = decode_dag_cbor(new_commit_bytes)
            if not isinstance(new_commit, dict):
                raise ValueError("latest commit block is not a commit object")
            new_root = new_commit["data"]
            if not isinstance(new_root, CID):
                raise ValueError("latest commit 'data' field is not a CID")

            # for progress stats
            record_count = 0
            start_time = time.monotonic()

            with open(dst_path, "wb") as carfile_out:
                new_header = encode_dag_cbor({
                    "version": 1,
                    "roots": [new_commit_cid],
                })
                write_block(carfile_out, new_header)
                # Write the commit bytes exactly as fetched so they always
                # match the CID, rather than re-encoding the decoded object.
                write_block(carfile_out, bytes(new_commit_cid) + new_commit_bytes)
                for node in NodeWalker(NodeStore(bs), new_root).iter_nodes():
                    write_block(carfile_out, bytes(node.cid) + node.serialised)
                    for v in node.vals:
                        write_block(carfile_out, bytes(v) + bs.get_block(bytes(v)))
                        record_count += 1
                    print(f"\rwritten {record_count} records. fetched {bs2.blocks_read} new blocks.", end="")

            print(f"\ndone in {time.monotonic() - start_time:.1f} seconds")


if __name__ == "__main__":
    main()