Alternative ATProto PDS implementation

prototype blockmap

Changed files
+220 -3
src
actor_store
+14
Cargo.lock
··· 754 754 "memmap2", 755 755 "metrics", 756 756 "metrics-exporter-prometheus", 757 + "multihash 0.19.3", 757 758 "rand 0.8.5", 758 759 "regex", 759 760 "reqwest", ··· 761 762 "rsky-repo", 762 763 "rsky-syntax", 763 764 "serde", 765 + "serde_bytes", 764 766 "serde_ipld_dagcbor", 767 + "serde_ipld_dagjson", 765 768 "serde_json", 766 769 "sha2", 767 770 "sqlx", ··· 4017 4020 "ipld-core", 4018 4021 "scopeguard", 4019 4022 "serde", 4023 + ] 4024 + 4025 + [[package]] 4026 + name = "serde_ipld_dagjson" 4027 + version = "0.2.0" 4028 + source = "registry+https://github.com/rust-lang/crates.io-index" 4029 + checksum = "3359b47ba7f4a306ef5984665e10539e212e97217afa489437d533208eecda36" 4030 + dependencies = [ 4031 + "ipld-core", 4032 + "serde", 4033 + "serde_json", 4020 4034 ] 4021 4035 4022 4036 [[package]]
+4 -3
Cargo.toml
··· 163 163 reqwest = { version = "0.12.12", features = ["json"] } 164 164 reqwest-middleware = { version = "0.4.0", features = ["json"] } 165 165 serde = { version = "1.0.218", features = ["derive"] } 166 - serde_ipld_dagcbor = { version = "0.6.2", default-features = false, features = [ 167 - "std", 168 - ] } 166 + serde_ipld_dagcbor = { version = "0.6.2", default-features = false, features = ["std"] } 169 167 serde_json = "1.0.139" 170 168 sha2 = "0.10.8" 171 169 sqlx = { version = "0.8.3", features = ["json", "runtime-tokio", "sqlite"] } ··· 186 184 regex = "1.11.1" 187 185 rsky-syntax = { git = "https://github.com/blacksky-algorithms/rsky.git" } 188 186 rsky-repo = { git = "https://github.com/blacksky-algorithms/rsky.git" } 187 + serde_bytes = "0.11.17" 188 + multihash = "0.19.3" 189 + serde_ipld_dagjson = "0.2.0"
+201
src/actor_store/block_map.rs
··· 1 + //! BlockMap impl 2 + //! Ref: https://github.com/blacksky-algorithms/rsky/blob/main/rsky-repo/src/block_map.rs 3 + //! blacksky-algorithms/rsky is licensed under the Apache License 2.0 4 + //! Reimplemented here because of conflicting dependencies between atrium and rsky 5 + 6 + use anyhow::Result; 7 + use atrium_repo::Cid; 8 + use ipld_core::codec::Codec; 9 + use serde::{Deserialize, Serialize}; 10 + use serde_ipld_dagcbor::codec::DagCborCodec; 11 + use std::collections::BTreeMap; 12 + use std::str::FromStr; 13 + 14 + /// Ref: https://github.com/blacksky-algorithms/rsky/blob/main/rsky-common/src/lib.rs#L57 15 + pub fn struct_to_cbor<T: Serialize>(obj: &T) -> Result<Vec<u8>> { 16 + Ok(serde_ipld_dagcbor::to_vec(obj)?) 17 + } 18 + 19 + /// Ref: https://github.com/blacksky-algorithms/rsky/blob/37954845d06aaafea2b914d9096a1657abfc8d75/rsky-common/src/ipld.rs 20 + pub fn cid_for_cbor<T: Serialize>(data: &T) -> Result<Cid> { 21 + // let bytes = struct_to_cbor(data)?; 22 + // let cid = Cid::new_v1( 23 + // u64::from(DagCborCodec), 24 + // Code::Sha2_256.digest(bytes.as_slice()), 25 + // ); 26 + // Ok(cid) 27 + todo!() 28 + } 29 + 30 + // pub fn sha256_to_cid<T: Codec>(hash: Vec<u8>, codec: T) -> Cid 31 + // where 32 + // u64: From<T>, 33 + // { 34 + // let cid = Cid::new_v1(u64::from(codec), Code::Sha2_256.digest(hash.as_slice())); 35 + // cid 36 + // } 37 + // todo!() 38 + 39 + pub fn sha256_raw_to_cid(hash: Vec<u8>) -> Cid { 40 + // sha256_to_cid(hash, RawCodec) 41 + todo!() 42 + } 43 + 44 + /// Ref: https://github.com/blacksky-algorithms/rsky/blob/main/rsky-repo/src/types.rs#L436 45 + pub type CarBlock = CidAndBytes; 46 + pub struct CidAndBytes { 47 + pub cid: Cid, 48 + pub bytes: Vec<u8>, 49 + } 50 + 51 + // Thinly wraps a Vec<u8> 52 + // The #[serde(transparent)] attribute ensures that during (de)serialization 53 + // this newtype is treated the same as the underlying Vec<u8>. 54 + #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] 55 + #[serde(transparent)] 56 + pub struct Bytes(#[serde(with = "serde_bytes")] pub Vec<u8>); 57 + 58 + #[derive(Debug, Clone, PartialEq, Deserialize, Serialize)] 59 + pub struct BlockMap { 60 + pub map: BTreeMap<String, Bytes>, 61 + } 62 + 63 + impl BlockMap { 64 + pub fn new() -> Self { 65 + BlockMap { 66 + map: BTreeMap::new(), 67 + } 68 + } 69 + 70 + pub fn add<T: Serialize>(&mut self, value: T) -> Result<Cid> { 71 + let cid = cid_for_cbor(&value)?; 72 + self.set( 73 + cid, 74 + struct_to_cbor(&value)?, //bytes 75 + ); 76 + Ok(cid) 77 + } 78 + 79 + pub fn set(&mut self, cid: Cid, bytes: Vec<u8>) -> () { 80 + self.map.insert(cid.to_string(), Bytes(bytes)); 81 + () 82 + } 83 + 84 + pub fn get(&self, cid: Cid) -> Option<&Vec<u8>> { 85 + self.map.get(&cid.to_string()).map(|bytes| &bytes.0) 86 + } 87 + pub fn delete(&mut self, cid: Cid) -> Result<()> { 88 + self.map.remove(&cid.to_string()); 89 + Ok(()) 90 + } 91 + 92 + pub fn get_many(&mut self, cids: Vec<Cid>) -> Result<BlocksAndMissing> { 93 + let mut missing: Vec<Cid> = Vec::new(); 94 + let mut blocks = BlockMap::new(); 95 + for cid in cids { 96 + let got = self.map.get(&cid.to_string()).map(|bytes| &bytes.0); 97 + if let Some(bytes) = got { 98 + blocks.set(cid, bytes.clone()); 99 + } else { 100 + missing.push(cid); 101 + } 102 + } 103 + Ok(BlocksAndMissing { blocks, missing }) 104 + } 105 + 106 + pub fn has(&self, cid: Cid) -> bool { 107 + self.map.contains_key(&cid.to_string()) 108 + } 109 + 110 + pub fn clear(&mut self) -> () { 111 + self.map.clear() 112 + } 113 + 114 + // Not really using. Issues with closures 115 + pub fn for_each(&self, cb: impl Fn(&Vec<u8>, Cid) -> ()) -> Result<()> { 116 + for (key, val) in self.map.iter() { 117 + cb(&val.0, Cid::from_str(&key)?); 118 + } 119 + Ok(()) 120 + } 121 + 122 + pub fn entries(&self) -> Result<Vec<CidAndBytes>> { 123 + let mut entries: Vec<CidAndBytes> = Vec::new(); 124 + for (cid, bytes) in self.map.iter() { 125 + entries.push(CidAndBytes { 126 + cid: Cid::from_str(cid)?, 127 + bytes: bytes.0.clone(), 128 + }); 129 + } 130 + Ok(entries) 131 + } 132 + 133 + pub fn cids(&self) -> Result<Vec<Cid>> { 134 + Ok(self.entries()?.into_iter().map(|e| e.cid).collect()) 135 + } 136 + 137 + pub fn add_map(&mut self, to_add: BlockMap) -> Result<()> { 138 + let results = for (cid, bytes) in to_add.map.iter() { 139 + self.set(Cid::from_str(cid)?, bytes.0.clone()); 140 + }; 141 + Ok(results) 142 + } 143 + 144 + pub fn size(&self) -> usize { 145 + self.map.len() 146 + } 147 + 148 + pub fn byte_size(&self) -> Result<usize> { 149 + let mut size = 0; 150 + for (_, bytes) in self.map.iter() { 151 + size += bytes.0.len(); 152 + } 153 + Ok(size) 154 + } 155 + 156 + pub fn equals(&self, other: BlockMap) -> Result<bool> { 157 + if self.size() != other.size() { 158 + return Ok(false); 159 + } 160 + for entry in self.entries()? { 161 + let other_bytes = other.get(entry.cid); 162 + if let Some(o) = other_bytes { 163 + if &entry.bytes != o { 164 + return Ok(false); 165 + } 166 + } else { 167 + return Ok(false); 168 + } 169 + } 170 + Ok(true) 171 + } 172 + } 173 + 174 + // Helper function for the iterator conversion. 175 + fn to_cid_and_bytes((cid_str, bytes): (String, Bytes)) -> CidAndBytes { 176 + // We assume the key is always valid; otherwise, you could handle the error. 177 + let cid = Cid::from_str(&cid_str).expect("BlockMap contains an invalid CID string"); 178 + CidAndBytes { 179 + cid, 180 + bytes: bytes.0, 181 + } 182 + } 183 + 184 + impl IntoIterator for BlockMap { 185 + type Item = CidAndBytes; 186 + // Using the iterator returned by BTreeMap's into_iter, then mapping with a function pointer. 187 + type IntoIter = std::iter::Map< 188 + std::collections::btree_map::IntoIter<String, Bytes>, 189 + fn((String, Bytes)) -> CidAndBytes, 190 + >; 191 + 192 + fn into_iter(self) -> Self::IntoIter { 193 + self.map.into_iter().map(to_cid_and_bytes) 194 + } 195 + } 196 + 197 + #[derive(Debug)] 198 + pub struct BlocksAndMissing { 199 + pub blocks: BlockMap, 200 + pub missing: Vec<Cid>, 201 + }
+1
src/actor_store/mod.rs
··· 1 1 //! Actor store implementation for ATProto PDS. 2 2 3 3 mod blob; 4 + mod block_map; 4 5 mod db; 5 6 mod preference; 6 7 mod record;