1import sys
2from typing import BinaryIO
3import requests
4import time
5from cbrrr import CID, encode_dag_cbor, decode_dag_cbor
6from atmst.mst.node_store import NodeStore
7from atmst.mst.node_walker import NodeWalker
8from atmst.blockstore.car_file import ReadOnlyCARBlockStore, encode_varint
9from atmst.blockstore import BlockStore, OverlayBlockStore
10
11
class XRPCBlockStore(BlockStore):
    """Read-only BlockStore that fetches individual blocks from a PDS over XRPC.

    Every ``get_block`` call issues one ``com.atproto.sync.getBlocks`` request
    for a single CID and extracts that block from the CAR-encoded response.
    """

    def __init__(self, did: str, pds_url: str, timeout: float = 60.0):
        """
        :param did: repo DID, sent as the ``did`` query parameter.
        :param pds_url: base URL of the PDS; ``/xrpc/...`` is appended to it.
        :param timeout: per-request timeout in seconds, passed to ``requests``.
        """
        self.did = did
        self.pds_url = pds_url
        self.timeout = timeout
        self.session = requests.Session()
        self.blocks_read = 0  # number of blocks successfully fetched from the PDS

    def put_block(self, key: bytes, value: bytes) -> None:
        raise NotImplementedError("XRPCBlockStore is read-only")

    def get_block(self, key: bytes) -> bytes:
        """Fetch the block whose binary CID is *key*.

        :raises requests.HTTPError: if the PDS responds with an error status.
        :raises ValueError: if the response body is not a well-formed CAR.
        :raises KeyError: if the response does not contain the requested block.
        """
        res = self.session.get(
            self.pds_url + "/xrpc/com.atproto.sync.getBlocks",
            params={
                "did": self.did,
                "cids": CID(key).encode()
            },
            timeout=self.timeout,
        )
        res.raise_for_status()
        value = self._find_car_block(res.content, key)
        self.blocks_read += 1
        return value

    def del_block(self, key: bytes) -> None:
        raise NotImplementedError("XRPCBlockStore is read-only")

    @staticmethod
    def _read_varint(buf: bytes, offset: int):
        """Decode an unsigned LEB128 varint from *buf* starting at *offset*.

        Returns ``(value, new_offset)``; raises ValueError if *buf* ends mid-varint.
        """
        value = 0
        shift = 0
        while True:
            if offset >= len(buf):
                raise ValueError("truncated varint in CAR data")
            byte = buf[offset]
            offset += 1
            value |= (byte & 0x7F) << shift
            if not byte & 0x80:
                return value, offset
            shift += 7

    @classmethod
    def _find_car_block(cls, car: bytes, key: bytes) -> bytes:
        """Return the data of the block with binary CID *key* from CAR bytes *car*.

        A CAR is a varint-length-prefixed header followed by entries, each of which
        is a varint length and then ``cid_bytes + block_data``. Walking this framing
        (rather than searching for *key* anywhere in the payload) avoids matching the
        CID where it appears in the header roots, and excludes trailing bytes.
        """
        header_len, offset = cls._read_varint(car, 0)
        offset += header_len  # skip the DAG-CBOR header; its roots are irrelevant here
        if offset > len(car):
            raise ValueError("truncated CAR header")
        while offset < len(car):
            entry_len, offset = cls._read_varint(car, offset)
            end = offset + entry_len
            if end > len(car):
                raise ValueError("truncated CAR block")
            # binary CIDs are self-delimiting, so a prefix match identifies the entry
            if car.startswith(key, offset, end):
                return car[offset + len(key):end]
            offset = end
        raise KeyError(key)
36
37
def write_block(file: BinaryIO, data: bytes) -> None:
    """Append one CAR section to *file*: a varint byte-length prefix, then *data*."""
    length_prefix = encode_varint(len(data))
    file.write(length_prefix)
    file.write(data)
41
42
def _decode_commit(raw: bytes) -> dict:
    """Decode a DAG-CBOR commit block, raising ValueError if it is not a map."""
    commit = decode_dag_cbor(raw)
    if not isinstance(commit, dict):
        raise ValueError("commit block does not decode to a DAG-CBOR map")
    return commit


def _fetch_latest_commit_cid(session: requests.Session, pds_url: str, did: str, timeout: float) -> CID:
    """Return the CID of *did*'s latest commit via com.atproto.sync.getLatestCommit."""
    res = session.get(
        pds_url + "/xrpc/com.atproto.sync.getLatestCommit",
        params={"did": did},
        timeout=timeout,
    )
    res.raise_for_status()
    return CID.decode(res.json()["cid"])


def _print_progress(record_count: int, blocks_fetched: int) -> None:
    """Redraw the single-line progress indicator.

    Flushed explicitly: the line has no trailing newline, so a line-buffered
    stdout would otherwise not show it until the buffer fills.
    """
    print(f"\rwritten {record_count} records. fetched {blocks_fetched} new blocks.", end="", flush=True)


def main():
    """Write a CAR of a repo's latest commit to <dst_car>, reusing blocks from <src_car>.

    Usage: carsync <src_car> <dst_car> <pds_url>

    The repo DID is read from the source CAR's root commit. The latest commit CID
    is requested from the PDS, and the commit, every MST node reachable from its
    ``data`` root, and every record block are written to the destination CAR.
    Blocks are looked up through an OverlayBlockStore of the source CAR and an
    XRPCBlockStore backed by the PDS.

    :raises ValueError: if a commit block is malformed.
    :raises requests.HTTPError: if a PDS request fails.
    """
    http_timeout = 60.0  # seconds, for the getLatestCommit request
    progress_interval = 0.1  # minimum seconds between progress-line redraws

    if len(sys.argv) != 4:
        print("Usage: carsync <src_car> <dst_car> <pds_url>", file=sys.stderr)
        sys.exit(1)

    src_path, dst_path, pds_url = sys.argv[1:]
    # tolerate a trailing slash so the URL joins below don't produce "//xrpc"
    pds_url = pds_url.rstrip("/")

    with open(src_path, "rb") as carfile:
        bs1 = ReadOnlyCARBlockStore(carfile)

        # figure out the DID by inspecting the source CAR
        commit = _decode_commit(bs1.get_block(bytes(bs1.car_root)))
        did = commit.get("did")
        if not isinstance(did, str):
            raise ValueError("source commit has no string 'did' field")

        # set up the XRPC-backed overlay block store
        bs2 = XRPCBlockStore(did, pds_url)
        bs = OverlayBlockStore(bs1, bs2)

        # find the *latest* commit; reuse bs2's HTTP session since it targets the same host
        new_commit_cid = _fetch_latest_commit_cid(bs2.session, pds_url, did, http_timeout)
        new_commit_bytes = bs.get_block(bytes(new_commit_cid))
        new_commit = _decode_commit(new_commit_bytes)
        new_root = new_commit.get("data")
        if not isinstance(new_root, CID):
            raise ValueError("latest commit has no CID 'data' field")

        # for progress stats
        record_count = 0
        seen_records = set()  # record CIDs already written; identical records are stored once
        start_time = time.monotonic()
        last_report = float("-inf")

        with open(dst_path, "wb") as carfile_out:
            new_header = encode_dag_cbor({
                "version": 1,
                "roots": [new_commit_cid]
            })
            write_block(carfile_out, new_header)
            # write the commit's original bytes rather than re-encoding the decoded object
            write_block(carfile_out, bytes(new_commit_cid) + new_commit_bytes)
            for node in NodeWalker(NodeStore(bs), new_root).iter_nodes():
                write_block(carfile_out, bytes(node.cid) + node.serialised)
                for v in node.vals:
                    record_cid = bytes(v)
                    if record_cid not in seen_records:
                        seen_records.add(record_cid)
                        write_block(carfile_out, record_cid + bs.get_block(record_cid))
                    record_count += 1
                    now = time.monotonic()
                    if now - last_report >= progress_interval:
                        _print_progress(record_count, bs2.blocks_read)
                        last_report = now

        # final redraw so the last line reflects the complete totals
        _print_progress(record_count, bs2.blocks_read)
        print(f"\ndone in {time.monotonic() - start_time:.1f} seconds")
95
if __name__ == "__main__":
    # Run the carsync CLI only when executed as a script, not when imported.
    main()