Alternative ATProto PDS implementation

prototype actor_store

Changed files
+186 -34
src
actor_store
preference
record
repo
+164 -28
src/actor_store/mod.rs
··· 10 10 use std::sync::Arc; 11 11 12 12 use anyhow::{Context as _, Result}; 13 - use atrium_crypto::keypair::{Export as _, Secp256k1Keypair}; 13 + use atrium_crypto::keypair::Export as _; 14 14 use sqlx::SqlitePool; 15 15 16 + use crate::SigningKey; 16 17 use crate::config::RepoConfig; 17 18 18 19 /// Resources required by the actor store. 19 20 pub(crate) struct ActorStoreResources { 20 21 /// Configuration for the repo. 21 22 pub(crate) config: RepoConfig, 23 + /// Configuration for the blob store. 24 + pub(crate) blob_config: crate::config::BlobConfig, 22 25 /// Background task queue (we'll need to implement this later). 23 26 pub(crate) background_queue: Arc<()>, // TODO: Placeholder until we implement a proper queue 24 27 } ··· 46 49 /// Reader for actor data. 47 50 pub(crate) struct ActorStoreReader { 48 51 /// The DID of the actor. 49 - did: String, 52 + pub(crate) did: String, 50 53 /// The database connection. 51 - db: SqlitePool, 54 + pub(crate) db: SqlitePool, 52 55 /// The actor's keypair. 53 - keypair: Arc<Secp256k1Keypair>, 56 + keypair: Arc<SigningKey>, 54 57 /// Resources for the actor store. 55 - resources: Arc<ActorStoreResources>, 58 + pub(crate) resources: Arc<ActorStoreResources>, 59 + /// Repository reader 60 + pub(crate) repo: repo::RepoReader, 61 + /// Record reader 62 + pub(crate) record: record::RecordReader, 63 + /// Preference reader 64 + pub(crate) pref: preference::PreferenceReader, 56 65 } 57 66 58 67 /// Writer for actor data with transaction support. 59 68 pub(crate) struct ActorStoreWriter { 60 69 /// The DID of the actor. 61 - did: String, 70 + pub(crate) did: String, 62 71 /// The database connection. 63 - db: SqlitePool, 72 + pub(crate) db: SqlitePool, 64 73 /// The actor's keypair. 65 - keypair: Arc<Secp256k1Keypair>, 74 + keypair: Arc<SigningKey>, 66 75 /// Resources for the actor store. 67 - resources: Arc<ActorStoreResources>, 76 + pub(crate) resources: Arc<ActorStoreResources>, 77 + /// Repository access 78 + pub(crate) repo: repo::RepoTransactor, 79 + /// Record access 80 + pub(crate) record: record::RecordTransactor, 81 + /// Preference access 82 + pub(crate) pref: preference::PreferenceTransactor, 68 83 } 69 84 70 85 /// Transactor for actor data. 71 86 pub(crate) struct ActorStoreTransactor { 72 87 /// The DID of the actor. 73 - did: String, 88 + pub(crate) did: String, 74 89 /// The database connection. 75 - db: SqlitePool, 90 + pub(crate) db: SqlitePool, 76 91 /// The actor's keypair. 77 - keypair: Arc<Secp256k1Keypair>, 92 + keypair: Arc<SigningKey>, 78 93 /// Resources for the actor store. 79 - resources: Arc<ActorStoreResources>, 94 + pub(crate) resources: Arc<ActorStoreResources>, 95 + /// Repository access 96 + pub(crate) repo: repo::RepoTransactor, 97 + /// Record access 98 + pub(crate) record: record::RecordTransactor, 99 + /// Preference access 100 + pub(crate) pref: preference::PreferenceTransactor, 80 101 } 81 102 82 103 impl ActorStore { ··· 91 112 } 92 113 } 93 114 115 + /// Load an actor store based on a given DID. 116 + pub(crate) async fn load(did: &str, resources: ActorStoreResources) -> Result<Self> { 117 + let did_hash = sha256_hex(did).await?; 118 + let directory = resources.config.path.join(&did_hash[0..2]).join(did); 119 + let reserved_key_dir = directory.join("reserved_keys"); 120 + 121 + Ok(Self { 122 + directory, 123 + reserved_key_dir, 124 + resources, 125 + }) 126 + } 127 + 94 128 /// Get the location of a DID's data. 95 129 pub(crate) async fn get_location(&self, did: &str) -> Result<ActorLocation> { 96 130 let did_hash = sha256_hex(did).await?; ··· 112 146 } 113 147 114 148 /// Get the keypair for an actor. 115 - pub(crate) async fn keypair(&self, did: &str) -> Result<Arc<Secp256k1Keypair>> { 149 + pub(crate) async fn keypair(&self, did: &str) -> Result<Arc<SigningKey>> { 116 150 let location = self.get_location(did).await?; 117 151 let priv_key = tokio::fs::read(&location.key_location).await?; 118 - let keypair = Secp256k1Keypair::import(&priv_key)?; 152 + let keypair = SigningKey::import(&priv_key)?; 119 153 Ok(Arc::new(keypair)) 120 154 } 121 155 ··· 148 182 let db = self.open_db(did).await?; 149 183 let keypair = self.keypair(did).await?; 150 184 let resources = Arc::new(self.resources.clone()); 185 + let did_str = did.to_string(); 151 186 152 187 let reader = ActorStoreReader { 153 - did: did.to_string(), 188 + did: did_str.clone(), 189 + repo: repo::RepoReader::new( 190 + db.clone(), 191 + did_str.clone(), 192 + self.resources.blob_config.clone(), 193 + ), 194 + record: record::RecordReader::new(db.clone(), did_str.clone()), 195 + pref: preference::PreferenceReader::new(db.clone(), did_str), 154 196 db, 155 197 keypair, 156 198 resources, ··· 164 206 where 165 207 F: FnOnce(ActorStoreTransactor) -> Result<T>, 166 208 { 167 - let db = self.open_db(did).await?; 168 209 let keypair = self.keypair(did).await?; 210 + let db = self.open_db(did).await?; 169 211 let resources = Arc::new(self.resources.clone()); 212 + let did_str = did.to_string(); 170 213 171 214 let transactor = ActorStoreTransactor { 172 - did: did.to_string(), 215 + did: did_str.clone(), 216 + repo: repo::RepoTransactor::new( 217 + db.clone(), 218 + did_str.clone(), 219 + (*keypair).clone(), 220 + self.resources.blob_config.clone(), 221 + ), 222 + record: record::RecordTransactor::new( 223 + db.clone(), 224 + db.clone(), // Using db as placeholder for blobstore 225 + did_str.clone(), 226 + ), 227 + pref: preference::PreferenceTransactor::new(db.clone(), did_str), 173 228 db, 174 229 keypair, 175 230 resources, ··· 186 241 let db = self.open_db(did).await?; 187 242 let keypair = self.keypair(did).await?; 188 243 let resources = Arc::new(self.resources.clone()); 244 + let did_str = did.to_string(); 189 245 190 246 let writer = ActorStoreWriter { 191 - did: did.to_string(), 247 + did: did_str.clone(), 248 + repo: repo::RepoTransactor::new( 249 + db.clone(), 250 + did_str.clone(), 251 + (*keypair).clone(), 252 + self.resources.blob_config.clone(), 253 + ), 254 + record: record::RecordTransactor::new( 255 + db.clone(), 256 + db.clone(), // Using db as placeholder for blobstore 257 + did_str.clone(), 258 + ), 259 + pref: preference::PreferenceTransactor::new(db.clone(), did_str), 192 260 db, 193 261 keypair, 194 262 resources, ··· 198 266 } 199 267 200 268 /// Create a new actor repository. 201 - pub(crate) async fn create(&self, did: &str, keypair: Secp256k1Keypair) -> Result<()> { 269 + pub(crate) async fn create(&self, did: &str, keypair: SigningKey) -> Result<()> { 202 270 let location = self.get_location(did).await?; 203 271 204 272 // Create directory if it doesn't exist ··· 229 297 Ok(()) 230 298 } 231 299 232 - // TODO: To be implemented: destroy, reserve_keypair, etc. 300 + /// Destroy an actor's repository and associated data. 301 + pub(crate) async fn destroy(&self, did: &str) -> Result<()> { 302 + // TODO: Implement repository destruction 303 + // - Delete all blobs for the repository 304 + // - Remove the repository directory 305 + todo!("Implement repository destruction") 306 + } 307 + 308 + /// Reserve a keypair for a DID. 309 + pub(crate) async fn reserve_keypair(&self, did: Option<&str>) -> Result<String> { 310 + // TODO: Implement keypair reservation 311 + // - Generate a keypair if one doesn't exist 312 + // - Store the keypair in the reserved_key_dir 313 + // - Return the DID of the keypair 314 + todo!("Implement keypair reservation") 315 + } 316 + 317 + /// Get a reserved keypair. 318 + pub(crate) async fn get_reserved_keypair( 319 + &self, 320 + signing_key_or_did: &str, 321 + ) -> Result<Option<()>> { 322 + // TODO: Implement getting a reserved keypair 323 + // - Load the keypair from the reserved_key_dir 324 + todo!("Implement getting a reserved keypair") 325 + } 326 + 327 + /// Clear a reserved keypair. 328 + pub(crate) async fn clear_reserved_keypair( 329 + &self, 330 + key_did: &str, 331 + did: Option<&str>, 332 + ) -> Result<()> { 333 + // TODO: Implement clearing a reserved keypair 334 + // - Remove the keypair file from the reserved_key_dir 335 + todo!("Implement clearing a reserved keypair") 336 + } 337 + 338 + /// Store a PLC operation. 339 + pub(crate) async fn store_plc_op(&self, did: &str, op: &[u8]) -> Result<()> { 340 + // TODO: Implement storing a PLC operation 341 + // - Store the operation in the actor's directory 342 + todo!("Implement storing a PLC operation") 343 + } 344 + 345 + /// Get a stored PLC operation. 346 + pub(crate) async fn get_plc_op(&self, did: &str) -> Result<Vec<u8>> { 347 + // TODO: Implement getting a PLC operation 348 + // - Retrieve the operation from the actor's directory 349 + todo!("Implement getting a PLC operation") 350 + } 351 + 352 + /// Clear a stored PLC operation. 353 + pub(crate) async fn clear_plc_op(&self, did: &str) -> Result<()> { 354 + // TODO: Implement clearing a PLC operation 355 + // - Remove the operation file from the actor's directory 356 + todo!("Implement clearing a PLC operation") 357 + } 233 358 } 234 359 235 - // Helper function for SHA-256 hashing 236 - async fn sha256_hex(input: &str) -> Result<String> { 237 - use sha2::{Digest, Sha256}; 238 - let mut hasher = Sha256::new(); 239 - hasher.update(input.as_bytes()); 240 - let result = hasher.finalize(); 241 - Ok(hex::encode(result)) 360 + impl ActorStoreWriter { 361 + /// Transact with the writer. 362 + pub(crate) async fn transact<T, F>(&self, f: F) -> Result<T> 363 + where 364 + F: FnOnce(ActorStoreTransactor) -> Result<T>, 365 + { 366 + todo!("Implement transact method for ActorStoreWriter") 367 + } 242 368 } 243 369 244 370 impl Clone for ActorStoreResources { 245 371 fn clone(&self) -> Self { 246 372 Self { 247 373 config: self.config.clone(), 374 + blob_config: self.blob_config.clone(), 248 375 background_queue: self.background_queue.clone(), 249 376 } 250 377 } 251 378 } 379 + 380 + // Helper function for SHA-256 hashing 381 + async fn sha256_hex(input: &str) -> Result<String> { 382 + use sha2::{Digest, Sha256}; 383 + let mut hasher = Sha256::new(); 384 + hasher.update(input.as_bytes()); 385 + let result = hasher.finalize(); 386 + Ok(hex::encode(result)) 387 + }
+3
src/actor_store/preference/mod.rs
··· 2 2 3 3 mod reader; 4 4 mod transactor; 5 + 6 + pub(crate) use reader::PreferenceReader; 7 + pub(crate) use transactor::PreferenceTransactor;
+3 -3
src/actor_store/preference/transactor.rs
··· 5 5 use super::reader::{AccountPreference, PreferenceReader, pref_in_scope, pref_match_namespace}; 6 6 7 7 /// Transactor for preference operations. 8 - pub(super) struct PreferenceTransactor { 8 + pub(crate) struct PreferenceTransactor { 9 9 /// Preference reader. 10 10 pub reader: PreferenceReader, 11 11 } 12 12 13 13 impl PreferenceTransactor { 14 14 /// Create a new preference transactor. 15 - pub(super) fn new(db: SqlitePool, did: String) -> Self { 15 + pub(crate) fn new(db: SqlitePool, did: String) -> Self { 16 16 Self { 17 17 reader: PreferenceReader::new(db, did), 18 18 } 19 19 } 20 20 21 21 /// Put preferences for a namespace. 22 - pub(super) async fn put_preferences( 22 + pub(crate) async fn put_preferences( 23 23 &self, 24 24 values: Vec<AccountPreference>, 25 25 namespace: &str,
+3
src/actor_store/record/mod.rs
··· 2 2 3 3 mod reader; 4 4 mod transactor; 5 + 6 + pub(crate) use reader::RecordReader; 7 + pub(crate) use transactor::RecordTransactor;
+5
src/actor_store/repo/mod.rs
··· 4 4 mod sql_repo_reader; 5 5 mod sql_repo_transactor; 6 6 mod transactor; 7 + 8 + pub(crate) use reader::RepoReader; 9 + pub(crate) use sql_repo_reader::SqlRepoReader; 10 + pub(crate) use sql_repo_transactor::SqlRepoTransactor; 11 + pub(crate) use transactor::RepoTransactor;
-3
src/config.rs
··· 33 33 pub(crate) struct RepoConfig { 34 34 /// The path to the repository storage. 35 35 pub path: PathBuf, 36 - /// Use SQLite for repository storage instead of CAR files. 37 - #[serde(default)] 38 - pub use_sqlite: bool, 39 36 } 40 37 41 38 #[derive(Deserialize, Debug, Clone)]
+8
src/main.rs
··· 169 169 } 170 170 } 171 171 172 + impl SigningKey { 173 + /// Import from a private key. 174 + pub fn import(key: &[u8]) -> Result<Self> { 175 + let key = Secp256k1Keypair::import(key).context("failed to import signing key")?; 176 + Ok(Self(Arc::new(key))) 177 + } 178 + } 179 + 172 180 impl std::ops::Deref for RotationKey { 173 181 type Target = Secp256k1Keypair; 174 182