Alternative ATProto PDS implementation

Experiment with porting the `actor_store` module from the reference TypeScript implementation.

+12
.sqlx/query-0c112e544ad7f92f3699ee79d2bd674ff72041c4dd6050c00a1fa3eac38553e6.json
··· 1 + { 2 + "db_name": "SQLite", 3 + "query": "UPDATE repo_root SET cid = ?, rev = ?, indexedAt = ? WHERE did = ?", 4 + "describe": { 5 + "columns": [], 6 + "parameters": { 7 + "Right": 4 8 + }, 9 + "nullable": [] 10 + }, 11 + "hash": "0c112e544ad7f92f3699ee79d2bd674ff72041c4dd6050c00a1fa3eac38553e6" 12 + }
+20
.sqlx/query-1d9681bf4929b3f6dcfbb55fd5e15fb2593615b6e796c7d63731b78b0c43738c.json
··· 1 + { 2 + "db_name": "SQLite", 3 + "query": "SELECT COUNT(*) FROM repo_block WHERE cid = ?", 4 + "describe": { 5 + "columns": [ 6 + { 7 + "name": "COUNT(*)", 8 + "ordinal": 0, 9 + "type_info": "Integer" 10 + } 11 + ], 12 + "parameters": { 13 + "Right": 1 14 + }, 15 + "nullable": [ 16 + false 17 + ] 18 + }, 19 + "hash": "1d9681bf4929b3f6dcfbb55fd5e15fb2593615b6e796c7d63731b78b0c43738c" 20 + }
-20
.sqlx/query-1db52857493a1e8a7004872eaff6e8fe5dec41579dd57d696008385b8d23788d.json
··· 1 - { 2 - "db_name": "SQLite", 3 - "query": "SELECT data FROM blocks WHERE cid = ?", 4 - "describe": { 5 - "columns": [ 6 - { 7 - "name": "data", 8 - "ordinal": 0, 9 - "type_info": "Blob" 10 - } 11 - ], 12 - "parameters": { 13 - "Right": 1 14 - }, 15 - "nullable": [ 16 - false 17 - ] 18 - }, 19 - "hash": "1db52857493a1e8a7004872eaff6e8fe5dec41579dd57d696008385b8d23788d" 20 - }
-12
.sqlx/query-2918ecf03675a789568c777904966911ca63e991dede42a2d7d87e174799ea46.json
··· 1 - { 2 - "db_name": "SQLite", 3 - "query": "INSERT INTO blocks (cid, data, multicodec, multihash) VALUES (?, ?, ?, ?)", 4 - "describe": { 5 - "columns": [], 6 - "parameters": { 7 - "Right": 4 8 - }, 9 - "nullable": [] 10 - }, 11 - "hash": "2918ecf03675a789568c777904966911ca63e991dede42a2d7d87e174799ea46" 12 - }
+20
.sqlx/query-3b4745208f268678a84401e522c3836e0632ca34a0f23bbae5297d076610f0ab.json
··· 1 + { 2 + "db_name": "SQLite", 3 + "query": "SELECT content FROM repo_block WHERE cid = ?", 4 + "describe": { 5 + "columns": [ 6 + { 7 + "name": "content", 8 + "ordinal": 0, 9 + "type_info": "Blob" 10 + } 11 + ], 12 + "parameters": { 13 + "Right": 1 14 + }, 15 + "nullable": [ 16 + false 17 + ] 18 + }, 19 + "hash": "3b4745208f268678a84401e522c3836e0632ca34a0f23bbae5297d076610f0ab" 20 + }
+2 -2
.sqlx/query-73fd3e30b7694c92cf9309751d186fe622fa7d99fdf56dde7e60c3696581116c.json .sqlx/query-1d91d3f785f8f3e86baa2af1e643364b242bb6e8dbc0d50b86602022c0000ed4.json
··· 1 1 { 2 2 "db_name": "SQLite", 3 - "query": "SELECT COUNT(*) FROM blocks WHERE cid = ?", 3 + "query": "SELECT COUNT(*) FROM repo_root WHERE did = ?", 4 4 "describe": { 5 5 "columns": [ 6 6 { ··· 16 16 false 17 17 ] 18 18 }, 19 - "hash": "73fd3e30b7694c92cf9309751d186fe622fa7d99fdf56dde7e60c3696581116c" 19 + "hash": "1d91d3f785f8f3e86baa2af1e643364b242bb6e8dbc0d50b86602022c0000ed4" 20 20 }
+12
.sqlx/query-aeeee4d0a717e2207e5e810db35b21728f6473795b025a25050cec30a4f5ba55.json
··· 1 + { 2 + "db_name": "SQLite", 3 + "query": "INSERT INTO repo_block (cid, repoRev, size, content) VALUES (?, ?, ?, ?)", 4 + "describe": { 5 + "columns": [], 6 + "parameters": { 7 + "Right": 4 8 + }, 9 + "nullable": [] 10 + }, 11 + "hash": "aeeee4d0a717e2207e5e810db35b21728f6473795b025a25050cec30a4f5ba55" 12 + }
+12
.sqlx/query-fcd6cb9fdec21590db0645925b4ceeb9e457bd52807f1e60b829bbf304106037.json
··· 1 + { 2 + "db_name": "SQLite", 3 + "query": "INSERT INTO repo_root (did, cid, rev, indexedAt) VALUES (?, ?, ?, ?)", 4 + "describe": { 5 + "columns": [], 6 + "parameters": { 7 + "Right": 4 8 + }, 9 + "nullable": [] 10 + }, 11 + "hash": "fcd6cb9fdec21590db0645925b4ceeb9e457bd52807f1e60b829bbf304106037" 12 + }
+3
Cargo.lock
··· 673 673 "figment", 674 674 "futures", 675 675 "http-cache-reqwest", 676 + "k256", 676 677 "memmap2", 677 678 "metrics", 678 679 "metrics-exporter-prometheus", ··· 2082 2083 "cfg-if", 2083 2084 "ecdsa", 2084 2085 "elliptic-curve", 2086 + "once_cell", 2085 2087 "sha2", 2088 + "signature", 2086 2089 ] 2087 2090 2088 2091 [[package]]
+2 -1
Cargo.toml
··· 96 96 # pattern_type_mismatch = "allow" 97 97 # Style Allows 98 98 implicit_return = "allow" 99 - # self_named_module_files = "allow" # Choose one of self_named_module_files or mod_module_files 99 + self_named_module_files = "allow" # Choose one of self_named_module_files or mod_module_files 100 100 mod_module_files = "allow" 101 101 else_if_without_else = "allow" 102 102 std_instead_of_alloc = "allow" ··· 179 179 uuid = { version = "1.14.0", features = ["v4"] } 180 180 urlencoding = "2.1.3" 181 181 async-trait = "0.1.88" 182 + k256 = "0.13.4"
-4
migrations/20250507035800_sqlite_blockstore.down.sql
··· 1 - DROP INDEX IF EXISTS idx_tree_nodes_repo; 2 - DROP INDEX IF EXISTS idx_blocks_cid; 3 - DROP TABLE IF EXISTS tree_nodes; 4 - DROP TABLE IF EXISTS blocks;
-20
migrations/20250507035800_sqlite_blockstore.up.sql
··· 1 - -- Store raw blocks with their CIDs 2 - CREATE TABLE IF NOT EXISTS blocks ( 3 - cid TEXT PRIMARY KEY NOT NULL, 4 - data BLOB NOT NULL, 5 - multicodec INTEGER NOT NULL, 6 - multihash INTEGER NOT NULL 7 - ); 8 - 9 - -- Store the repository tree structure 10 - CREATE TABLE IF NOT EXISTS tree_nodes ( 11 - repo_did TEXT NOT NULL, 12 - key TEXT NOT NULL, 13 - value_cid TEXT NOT NULL, 14 - PRIMARY KEY (repo_did, key), 15 - FOREIGN KEY (value_cid) REFERENCES blocks(cid) 16 - ); 17 - 18 - -- Create index for faster lookups 19 - CREATE INDEX IF NOT EXISTS idx_blocks_cid ON blocks(cid); 20 - CREATE INDEX IF NOT EXISTS idx_tree_nodes_repo ON tree_nodes(repo_did);
+16
migrations/20250508251242_actor_store.down.sql
-- Revert the actor-store schema (inverse of 20250508251242_actor_store.up.sql).

-- Drop indexes first
DROP INDEX IF EXISTS idx_backlink_link_to;
DROP INDEX IF EXISTS idx_blob_tempkey;
DROP INDEX IF EXISTS idx_record_repo_rev;
DROP INDEX IF EXISTS idx_record_collection;
DROP INDEX IF EXISTS idx_record_cid;
DROP INDEX IF EXISTS idx_repo_block_repo_rev;

-- Drop tables (no FOREIGN KEY constraints are declared in the up migration,
-- so this order is cosmetic — it simply mirrors the reverse of creation)
DROP TABLE IF EXISTS account_pref;
DROP TABLE IF EXISTS backlink;
DROP TABLE IF EXISTS record_blob;
DROP TABLE IF EXISTS blob;
DROP TABLE IF EXISTS record;
DROP TABLE IF EXISTS repo_block;
DROP TABLE IF EXISTS repo_root;
+70
migrations/20250508251242_actor_store.up.sql
-- Actor store schema matching TypeScript implementation
--
-- NOTE(review): column names here are camelCase (repoRev, indexedAt, …) to
-- match the cached .sqlx queries, but src/actor_store/db/migrations.rs creates
-- a snake_case variant of these same tables for per-actor databases. Confirm
-- which schema each connection actually uses — the duplication will drift.

-- Repository root information (one row per DID: current commit CID + rev)
CREATE TABLE IF NOT EXISTS repo_root (
    did TEXT PRIMARY KEY NOT NULL,
    cid TEXT NOT NULL,
    rev TEXT NOT NULL,
    indexedAt TEXT NOT NULL
);

-- Repository blocks (IPLD blocks), keyed by canonical CID string
CREATE TABLE IF NOT EXISTS repo_block (
    cid TEXT PRIMARY KEY NOT NULL,
    repoRev TEXT NOT NULL,
    size INTEGER NOT NULL,
    content BLOB NOT NULL
);

-- Record index (one row per AT-URI; takedownRef NULL means visible)
CREATE TABLE IF NOT EXISTS record (
    uri TEXT PRIMARY KEY NOT NULL,
    cid TEXT NOT NULL,
    collection TEXT NOT NULL,
    rkey TEXT NOT NULL,
    repoRev TEXT NOT NULL,
    indexedAt TEXT NOT NULL,
    takedownRef TEXT
);

-- Blob storage metadata (tempKey set while the upload is not yet referenced)
CREATE TABLE IF NOT EXISTS blob (
    cid TEXT PRIMARY KEY NOT NULL,
    mimeType TEXT NOT NULL,
    size INTEGER NOT NULL,
    tempKey TEXT,
    width INTEGER,
    height INTEGER,
    createdAt TEXT NOT NULL,
    takedownRef TEXT
);

-- Record-blob associations (many-to-many join table)
CREATE TABLE IF NOT EXISTS record_blob (
    blobCid TEXT NOT NULL,
    recordUri TEXT NOT NULL,
    PRIMARY KEY (blobCid, recordUri)
);

-- Backlinks between records (uri's record links to linkTo at json path)
CREATE TABLE IF NOT EXISTS backlink (
    uri TEXT NOT NULL,
    path TEXT NOT NULL,
    linkTo TEXT NOT NULL,
    PRIMARY KEY (uri, path)
);

-- User preferences (name is the preference `$type`, valueJson the payload)
CREATE TABLE IF NOT EXISTS account_pref (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT NOT NULL,
    valueJson TEXT NOT NULL
);

-- Create indexes
CREATE INDEX IF NOT EXISTS idx_repo_block_repo_rev ON repo_block(repoRev, cid);
CREATE INDEX IF NOT EXISTS idx_record_cid ON record(cid);
CREATE INDEX IF NOT EXISTS idx_record_collection ON record(collection);
CREATE INDEX IF NOT EXISTS idx_record_repo_rev ON record(repoRev);
CREATE INDEX IF NOT EXISTS idx_blob_tempkey ON blob(tempKey);

CREATE INDEX IF NOT EXISTS idx_backlink_link_to ON backlink(path, linkTo);
+99
src/actor_store/blob.rs
··· 1 + // src/actor_store/blob.rs 2 + use anyhow::Result; 3 + use atrium_api::types::string::Cid; 4 + use sqlx::Row; 5 + 6 + use crate::actor_store::db::ActorDb; 7 + 8 + #[derive(Clone)] 9 + pub struct BlobReader { 10 + db: ActorDb, 11 + } 12 + 13 + impl BlobReader { 14 + pub fn new(db: ActorDb) -> Self { 15 + Self { db } 16 + } 17 + 18 + pub async fn get_blob_metadata(&self, cid: &Cid) -> Result<Option<BlobMetadata>> { 19 + let cid_str = format!("{:?}", cid); 20 + let row = sqlx::query( 21 + "SELECT mime_type, size, width, height 22 + FROM blob 23 + WHERE cid = ? AND takedown_ref IS NULL", 24 + ) 25 + .bind(&cid_str) 26 + .fetch_optional(&self.db.pool) 27 + .await?; 28 + 29 + match row { 30 + Some(row) => Ok(Some(BlobMetadata { 31 + size: row.get::<i64, _>("size") as u64, 32 + mime_type: row.get("mime_type"), 33 + })), 34 + None => Ok(None), 35 + } 36 + } 37 + 38 + pub async fn get_blobs_for_record(&self, record_uri: &str) -> Result<Vec<String>> { 39 + let rows = sqlx::query( 40 + "SELECT blob.cid 41 + FROM blob 42 + INNER JOIN record_blob ON record_blob.blob_cid = blob.cid 43 + WHERE record_uri = ?", 44 + ) 45 + .bind(record_uri) 46 + .fetch_all(&self.db.pool) 47 + .await?; 48 + 49 + Ok(rows.into_iter().map(|row| row.get("cid")).collect()) 50 + } 51 + } 52 + 53 + #[derive(Clone)] 54 + pub struct BlobTransactor { 55 + db: ActorDb, 56 + } 57 + 58 + impl BlobTransactor { 59 + pub fn new(db: ActorDb) -> Self { 60 + Self { db } 61 + } 62 + 63 + pub async fn associate_blob(&self, blob_cid: &str, record_uri: &str) -> Result<()> { 64 + sqlx::query( 65 + "INSERT INTO record_blob (blob_cid, record_uri) 66 + VALUES (?, ?) 
67 + ON CONFLICT DO NOTHING", 68 + ) 69 + .bind(blob_cid) 70 + .bind(record_uri) 71 + .execute(&self.db.pool) 72 + .await?; 73 + 74 + Ok(()) 75 + } 76 + 77 + pub async fn register_blob(&self, cid: &str, mime_type: &str, size: u64) -> Result<()> { 78 + let now = chrono::Utc::now().to_rfc3339(); 79 + 80 + sqlx::query( 81 + "INSERT INTO blob (cid, mime_type, size, created_at) 82 + VALUES (?, ?, ?, ?) 83 + ON CONFLICT DO NOTHING", 84 + ) 85 + .bind(cid) 86 + .bind(mime_type) 87 + .bind(size as i64) 88 + .bind(now) 89 + .execute(&self.db.pool) 90 + .await?; 91 + 92 + Ok(()) 93 + } 94 + } 95 + 96 + pub struct BlobMetadata { 97 + pub size: u64, 98 + pub mime_type: String, 99 + }
+75
src/actor_store/db/migrations.rs
··· 1 + // src/actor_store/db/migrations.rs 2 + use anyhow::{Context as _, Result}; 3 + use sqlx::{Pool, Sqlite}; 4 + 5 + pub async fn run(pool: &Pool<Sqlite>) -> Result<()> { 6 + sqlx::query( 7 + " 8 + CREATE TABLE IF NOT EXISTS repo_root ( 9 + did TEXT PRIMARY KEY NOT NULL, 10 + cid TEXT NOT NULL, 11 + rev TEXT NOT NULL, 12 + indexed_at TEXT NOT NULL 13 + ); 14 + 15 + CREATE TABLE IF NOT EXISTS repo_block ( 16 + cid TEXT PRIMARY KEY NOT NULL, 17 + repo_rev TEXT NOT NULL, 18 + size INTEGER NOT NULL, 19 + content BLOB NOT NULL 20 + ); 21 + 22 + CREATE TABLE IF NOT EXISTS record ( 23 + uri TEXT PRIMARY KEY NOT NULL, 24 + cid TEXT NOT NULL, 25 + collection TEXT NOT NULL, 26 + rkey TEXT NOT NULL, 27 + repo_rev TEXT NOT NULL, 28 + indexed_at TEXT NOT NULL, 29 + takedown_ref TEXT 30 + ); 31 + 32 + CREATE TABLE IF NOT EXISTS blob ( 33 + cid TEXT PRIMARY KEY NOT NULL, 34 + mime_type TEXT NOT NULL, 35 + size INTEGER NOT NULL, 36 + temp_key TEXT, 37 + width INTEGER, 38 + height INTEGER, 39 + created_at TEXT NOT NULL, 40 + takedown_ref TEXT 41 + ); 42 + 43 + CREATE TABLE IF NOT EXISTS record_blob ( 44 + blob_cid TEXT NOT NULL, 45 + record_uri TEXT NOT NULL, 46 + PRIMARY KEY (blob_cid, record_uri) 47 + ); 48 + 49 + CREATE TABLE IF NOT EXISTS backlink ( 50 + uri TEXT NOT NULL, 51 + path TEXT NOT NULL, 52 + link_to TEXT NOT NULL, 53 + PRIMARY KEY (uri, path) 54 + ); 55 + 56 + CREATE TABLE IF NOT EXISTS account_pref ( 57 + id INTEGER PRIMARY KEY AUTOINCREMENT, 58 + name TEXT NOT NULL, 59 + value_json TEXT NOT NULL 60 + ); 61 + 62 + CREATE INDEX IF NOT EXISTS idx_record_cid ON record(cid); 63 + CREATE INDEX IF NOT EXISTS idx_record_collection ON record(collection); 64 + CREATE INDEX IF NOT EXISTS idx_record_repo_rev ON record(repo_rev); 65 + CREATE INDEX IF NOT EXISTS idx_blob_temp_key ON blob(temp_key); 66 + CREATE INDEX IF NOT EXISTS idx_repo_block_repo_rev ON repo_block(repo_rev, cid); 67 + CREATE INDEX IF NOT EXISTS idx_backlink_link_to ON backlink(path, link_to); 68 + ", 69 
+ ) 70 + .execute(pool) 71 + .await 72 + .context("Failed to run migrations")?; 73 + 74 + Ok(()) 75 + }
+37
src/actor_store/db/mod.rs
··· 1 + // src/actor_store/db/mod.rs 2 + pub mod migrations; 3 + 4 + use anyhow::Result; 5 + use sqlx::{Pool, Sqlite, Transaction}; 6 + 7 + #[derive(Clone)] 8 + pub struct ActorDb { 9 + pub pool: Pool<Sqlite>, 10 + } 11 + 12 + impl ActorDb { 13 + pub fn new(pool: Pool<Sqlite>) -> Self { 14 + Self { pool } 15 + } 16 + 17 + pub async fn migrate(&self) -> Result<()> { 18 + migrations::run(&self.pool).await 19 + } 20 + 21 + pub async fn transaction<F, R>(&self, f: F) -> Result<R> 22 + where 23 + F: for<'txn> FnOnce(&'txn mut Transaction<'_, Sqlite>) -> Result<R> + Send, 24 + R: Send + 'static, 25 + { 26 + self.pool.begin().await?.commit().await?; 27 + let mut tx = self.pool.begin().await?; 28 + let result = f(&mut tx)?; 29 + tx.commit().await?; 30 + Ok(result) 31 + } 32 + 33 + pub fn assert_transaction(&self) { 34 + // In a real implementation, we might want to check if we're in a transaction, 35 + // but SQLite allows nested transactions so it's not strictly necessary 36 + } 37 + }
+194
src/actor_store/mod.rs
// src/actor_store/mod.rs
//! Per-actor storage: each DID gets its own SQLite database plus a signing
//! keypair stored on disk, mirroring the reference implementation's actor store.
pub mod blob;
pub mod db;
pub mod preference;
pub mod record;
pub mod repo;

use anyhow::{Context as _, Result};
use atrium_crypto::keypair::{Export as _, Keypair};
use k256::Secp256k1;
use sqlx::sqlite::SqliteConnectOptions;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;

use crate::config::AppConfig;
use blob::{BlobReader, BlobTransactor};
use db::ActorDb;
use preference::{PreferenceReader, PreferenceTransactor};
use record::{RecordReader, RecordTransactor};
use repo::{RepoReader, RepoTransactor};

/// Factory for per-DID stores. Holds only configuration, so it is cheap to
/// share across request handlers.
pub struct ActorStore {
    pub config: Arc<AppConfig>,
}

impl ActorStore {
    pub fn new(config: Arc<AppConfig>) -> Self {
        Self { config }
    }

    /// Whether a database file already exists for this DID.
    /// `try_exists` errors (e.g. permissions) are treated as "absent".
    pub async fn exists(&self, did: &str) -> Result<bool> {
        let path = self.get_db_path(did)?;
        Ok(tokio::fs::try_exists(&path).await.unwrap_or(false))
    }

    /// Load the actor's secp256k1 signing keypair from disk.
    pub async fn keypair(&self, did: &str) -> Result<Keypair<Secp256k1>> {
        let path = self.get_keypair_path(did)?;
        let key_data = tokio::fs::read(&path)
            .await
            .context("failed to read keypair")?;
        Ok(Keypair::<Secp256k1>::import(&key_data)?)
    }

    /// Open the actor's SQLite database, creating the file if missing.
    /// NOTE(review): a fresh pool is opened on every call (no caching) —
    /// confirm this is acceptable under load.
    async fn get_db(&self, did: &str) -> Result<ActorDb> {
        let path = self.get_db_path(did)?;
        let options = SqliteConnectOptions::from_str(&format!("sqlite:{}", path.display()))?
            .create_if_missing(true);
        let pool = sqlx::SqlitePool::connect_with(options).await?;

        Ok(ActorDb::new(pool))
    }

    /// Run a synchronous closure against a read-only view of the store.
    pub async fn read<F, T>(&self, did: &str, f: F) -> Result<T>
    where
        F: FnOnce(ActorStoreReader) -> Result<T>,
    {
        let db = self.get_db(did).await?;
        let keypair = Arc::new(self.keypair(did).await?);
        let reader = ActorStoreReader::new(did.to_string(), db, self.config.clone(), keypair);
        f(reader)
    }

    /// Run a synchronous closure with write access to the store.
    ///
    /// NOTE(review): the closure passed to `db.transaction` ignores the
    /// `Transaction` handle (`|_|`) and the transactor operates directly on
    /// the pool, so the enclosed writes are not actually atomic — confirm
    /// whether that is intended.
    pub async fn transact<F, T>(&self, did: &str, f: F) -> Result<T>
    where
        F: FnOnce(ActorStoreTransactor) -> Result<T> + Send + 'static,
        T: Send + 'static,
    {
        let db = self.get_db(did).await?;
        let keypair = Arc::new(self.keypair(did).await?);

        db.transaction(|_| {
            let transactor = ActorStoreTransactor::new(
                did.to_string(),
                db.clone(),
                self.config.clone(),
                keypair.clone(),
            );
            f(transactor)
        })
        .await
    }

    /// Provision a new actor: directories, on-disk keypair, and migrated DB.
    /// NOTE(review): the exported private key is written to disk unencrypted.
    pub async fn create(&self, did: &str, keypair: &Keypair<Secp256k1>) -> Result<()> {
        // Create directory structure
        let db_path = self.get_db_path(did)?;
        if let Some(parent) = db_path.parent() {
            tokio::fs::create_dir_all(parent).await?;
        }

        // Export keypair
        let key_path = self.get_keypair_path(did)?;
        let key_data = keypair.export();
        tokio::fs::write(key_path, key_data).await?;

        // Create and initialize DB
        let db = self.get_db(did).await?;
        db.migrate().await?;

        Ok(())
    }

    /// Path of the actor's database file. Only `did:plc:` DIDs are supported;
    /// the variable is the raw PLC suffix, not a hash, despite the name.
    fn get_db_path(&self, did: &str) -> Result<PathBuf> {
        let did_hash = did
            .strip_prefix("did:plc:")
            .context("DID must be a PLC identifier")?;
        Ok(self.config.repo.path.join(format!("{}.db", did_hash)))
    }

    /// Path of the actor's keypair file, alongside the database file.
    fn get_keypair_path(&self, did: &str) -> Result<PathBuf> {
        let did_hash = did
            .strip_prefix("did:plc:")
            .context("DID must be a PLC identifier")?;

        Ok(self.config.repo.path.join(format!("{}.key", did_hash)))
    }
}

/// Read-only view over one actor's store; hands out sub-readers.
pub struct ActorStoreReader {
    pub did: String,
    db: ActorDb,
    config: Arc<AppConfig>,
    keypair: Arc<Keypair<Secp256k1>>,
}

impl ActorStoreReader {
    fn new(
        did: String,
        db: ActorDb,
        config: Arc<AppConfig>,
        keypair: Arc<Keypair<Secp256k1>>,
    ) -> Self {
        Self {
            did,
            db,
            config,
            keypair,
        }
    }

    pub fn blob(&self) -> BlobReader {
        BlobReader::new(self.db.clone())
    }

    pub fn record(&self) -> RecordReader {
        RecordReader::new(self.db.clone())
    }

    pub fn repo(&self) -> RepoReader {
        RepoReader::new(self.db.clone(), self.did.clone())
    }

    pub fn pref(&self) -> PreferenceReader {
        PreferenceReader::new(self.db.clone())
    }
}

/// Write view over one actor's store; hands out sub-transactors.
pub struct ActorStoreTransactor {
    pub did: String,
    db: ActorDb,
    config: Arc<AppConfig>,
    keypair: Arc<Keypair<Secp256k1>>,
}

impl ActorStoreTransactor {
    fn new(
        did: String,
        db: ActorDb,
        config: Arc<AppConfig>,
        keypair: Arc<Keypair<Secp256k1>>,
    ) -> Self {
        Self {
            did,
            db,
            config,
            keypair,
        }
    }

    pub fn blob(&self) -> BlobTransactor {
        BlobTransactor::new(self.db.clone())
    }

    pub fn record(&self) -> RecordTransactor {
        RecordTransactor::new(self.db.clone())
    }

    pub fn repo(&self) -> RepoTransactor {
        RepoTransactor::new(self.db.clone(), self.keypair.clone(), self.did.clone())
    }

    pub fn pref(&self) -> PreferenceTransactor {
        PreferenceTransactor::new(self.db.clone())
    }
}
+101
src/actor_store/preference.rs
··· 1 + // src/actor_store/preference.rs 2 + use anyhow::Result; 3 + use serde_json::Value; 4 + use sqlx::Row; 5 + 6 + use crate::actor_store::db::ActorDb; 7 + 8 + #[derive(Clone)] 9 + pub struct PreferenceReader { 10 + db: ActorDb, 11 + } 12 + 13 + impl PreferenceReader { 14 + pub fn new(db: ActorDb) -> Self { 15 + Self { db } 16 + } 17 + 18 + pub async fn get_preferences(&self, namespace: Option<&str>) -> Result<Vec<Value>> { 19 + let rows = match namespace { 20 + Some(ns) => { 21 + let pattern = format!("{}%", ns); 22 + sqlx::query( 23 + "SELECT name, value_json FROM account_pref 24 + WHERE name = ? OR name LIKE ?", 25 + ) 26 + .bind(ns) 27 + .bind(&pattern) 28 + .fetch_all(&self.db.pool) 29 + .await? 30 + } 31 + None => { 32 + sqlx::query("SELECT name, value_json FROM account_pref") 33 + .fetch_all(&self.db.pool) 34 + .await? 35 + } 36 + }; 37 + 38 + let mut prefs = Vec::with_capacity(rows.len()); 39 + for row in rows { 40 + let name: String = row.get("name"); 41 + let value_json: String = row.get("value_json"); 42 + if let Ok(value) = serde_json::from_str(&value_json) { 43 + let mut obj: Value = value; 44 + if let Value::Object(ref mut map) = obj { 45 + map.insert("$type".to_string(), Value::String(name)); 46 + } 47 + prefs.push(obj); 48 + } 49 + } 50 + 51 + Ok(prefs) 52 + } 53 + } 54 + 55 + #[derive(Clone)] 56 + pub struct PreferenceTransactor { 57 + db: ActorDb, 58 + } 59 + 60 + impl PreferenceTransactor { 61 + pub fn new(db: ActorDb) -> Self { 62 + Self { db } 63 + } 64 + 65 + pub async fn put_preferences(&self, values: Vec<Value>, namespace: &str) -> Result<()> { 66 + self.db.assert_transaction(); 67 + 68 + // Delete existing preferences in namespace 69 + let pattern = format!("{}%", namespace); 70 + sqlx::query("DELETE FROM account_pref WHERE name = ? 
OR name LIKE ?") 71 + .bind(namespace) 72 + .bind(&pattern) 73 + .execute(&self.db.pool) 74 + .await?; 75 + 76 + // Insert new preferences 77 + for value in values { 78 + if let Some(type_val) = value.get("$type") { 79 + if let Some(name) = type_val.as_str() { 80 + // Check namespace 81 + if name == namespace || name.starts_with(&format!("{}.", namespace)) { 82 + let mut pref_value = value.clone(); 83 + if let Value::Object(ref mut map) = pref_value { 84 + map.remove("$type"); 85 + } 86 + 87 + let value_json = serde_json::to_string(&pref_value)?; 88 + 89 + sqlx::query("INSERT INTO account_pref (name, value_json) VALUES (?, ?)") 90 + .bind(name) 91 + .bind(&value_json) 92 + .execute(&self.db.pool) 93 + .await?; 94 + } 95 + } 96 + } 97 + } 98 + 99 + Ok(()) 100 + } 101 + }
+113
src/actor_store/record.rs
··· 1 + // src/actor_store/record.rs 2 + use anyhow::Result; 3 + use sqlx::Row; 4 + 5 + use crate::actor_store::db::ActorDb; 6 + 7 + #[derive(Clone)] 8 + pub struct RecordReader { 9 + db: ActorDb, 10 + } 11 + 12 + impl RecordReader { 13 + pub fn new(db: ActorDb) -> Self { 14 + Self { db } 15 + } 16 + 17 + pub async fn get_record(&self, uri: &str) -> Result<Option<RecordData>> { 18 + let row = sqlx::query( 19 + "SELECT record.*, repo_block.content 20 + FROM record 21 + INNER JOIN repo_block ON repo_block.cid = record.cid 22 + WHERE record.uri = ? AND record.takedown_ref IS NULL", 23 + ) 24 + .bind(uri) 25 + .fetch_optional(&self.db.pool) 26 + .await?; 27 + 28 + match row { 29 + Some(row) => Ok(Some(RecordData { 30 + uri: row.get("uri"), 31 + cid: row.get("cid"), 32 + collection: row.get("collection"), 33 + rkey: row.get("rkey"), 34 + content: row.get("content"), 35 + })), 36 + None => Ok(None), 37 + } 38 + } 39 + 40 + pub async fn list_collections(&self) -> Result<Vec<String>> { 41 + let rows = sqlx::query("SELECT DISTINCT collection FROM record") 42 + .fetch_all(&self.db.pool) 43 + .await?; 44 + 45 + Ok(rows.into_iter().map(|row| row.get("collection")).collect()) 46 + } 47 + } 48 + 49 + #[derive(Clone)] 50 + pub struct RecordTransactor { 51 + db: ActorDb, 52 + } 53 + 54 + impl RecordTransactor { 55 + pub fn new(db: ActorDb) -> Self { 56 + Self { db } 57 + } 58 + 59 + pub async fn index_record( 60 + &self, 61 + uri: &str, 62 + cid: &str, 63 + collection: &str, 64 + rkey: &str, 65 + repo_rev: &str, 66 + ) -> Result<()> { 67 + let now = chrono::Utc::now().to_rfc3339(); 68 + 69 + sqlx::query( 70 + "INSERT INTO record (uri, cid, collection, rkey, repo_rev, indexed_at) 71 + VALUES (?, ?, ?, ?, ?, ?) 
72 + ON CONFLICT (uri) DO UPDATE SET 73 + cid = ?, 74 + repo_rev = ?, 75 + indexed_at = ?", 76 + ) 77 + .bind(uri) 78 + .bind(cid) 79 + .bind(collection) 80 + .bind(rkey) 81 + .bind(repo_rev) 82 + .bind(&now) 83 + .bind(cid) 84 + .bind(repo_rev) 85 + .bind(&now) 86 + .execute(&self.db.pool) 87 + .await?; 88 + 89 + Ok(()) 90 + } 91 + 92 + pub async fn delete_record(&self, uri: &str) -> Result<()> { 93 + sqlx::query("DELETE FROM record WHERE uri = ?") 94 + .bind(uri) 95 + .execute(&self.db.pool) 96 + .await?; 97 + 98 + sqlx::query("DELETE FROM backlink WHERE uri = ?") 99 + .bind(uri) 100 + .execute(&self.db.pool) 101 + .await?; 102 + 103 + Ok(()) 104 + } 105 + } 106 + 107 + pub struct RecordData { 108 + pub uri: String, 109 + pub cid: String, 110 + pub collection: String, 111 + pub rkey: String, 112 + pub content: Vec<u8>, 113 + }
+104
src/actor_store/repo.rs
··· 1 + // src/actor_store/repo.rs 2 + use anyhow::Result; 3 + use atrium_crypto::keypair::Keypair; 4 + use k256::Secp256k1; 5 + use sqlx::Row; 6 + use std::sync::Arc; 7 + 8 + use crate::actor_store::db::ActorDb; 9 + 10 + #[derive(Clone)] 11 + pub struct RepoReader { 12 + db: ActorDb, 13 + did: String, 14 + } 15 + 16 + impl RepoReader { 17 + pub fn new(db: ActorDb, did: String) -> Self { 18 + Self { db, did } 19 + } 20 + 21 + pub async fn get_root(&self) -> Result<Option<RepoRoot>> { 22 + let row = sqlx::query("SELECT cid, rev, indexed_at FROM repo_root WHERE did = ?") 23 + .bind(&self.did) 24 + .fetch_optional(&self.db.pool) 25 + .await?; 26 + 27 + match row { 28 + Some(row) => Ok(Some(RepoRoot { 29 + cid: row.get("cid"), 30 + rev: row.get("rev"), 31 + indexed_at: row.get("indexed_at"), 32 + })), 33 + None => Ok(None), 34 + } 35 + } 36 + 37 + pub async fn get_block(&self, cid: &str) -> Result<Option<Vec<u8>>> { 38 + let row = sqlx::query("SELECT content FROM repo_block WHERE cid = ?") 39 + .bind(cid) 40 + .fetch_optional(&self.db.pool) 41 + .await?; 42 + 43 + Ok(row.map(|r| r.get("content"))) 44 + } 45 + } 46 + 47 + #[derive(Clone)] 48 + pub struct RepoTransactor { 49 + db: ActorDb, 50 + keypair: Arc<Keypair<Secp256k1>>, 51 + did: String, 52 + } 53 + 54 + impl RepoTransactor { 55 + pub fn new(db: ActorDb, keypair: Arc<Keypair<Secp256k1>>, did: String) -> Self { 56 + Self { db, keypair, did } 57 + } 58 + 59 + pub async fn update_root(&self, cid: &str, rev: &str) -> Result<()> { 60 + let now = chrono::Utc::now().to_rfc3339(); 61 + 62 + sqlx::query( 63 + "INSERT INTO repo_root (did, cid, rev, indexed_at) 64 + VALUES (?, ?, ?, ?) 
65 + ON CONFLICT (did) DO UPDATE SET 66 + cid = ?, 67 + rev = ?, 68 + indexed_at = ?", 69 + ) 70 + .bind(&self.did) 71 + .bind(cid) 72 + .bind(rev) 73 + .bind(&now) 74 + .bind(cid) 75 + .bind(rev) 76 + .bind(&now) 77 + .execute(&self.db.pool) 78 + .await?; 79 + 80 + Ok(()) 81 + } 82 + 83 + pub async fn put_block(&self, cid: &str, content: &[u8], repo_rev: &str) -> Result<()> { 84 + sqlx::query( 85 + "INSERT INTO repo_block (cid, repo_rev, size, content) 86 + VALUES (?, ?, ?, ?) 87 + ON CONFLICT (cid) DO NOTHING", 88 + ) 89 + .bind(cid) 90 + .bind(repo_rev) 91 + .bind(content.len() as i64) 92 + .bind(content) 93 + .execute(&self.db.pool) 94 + .await?; 95 + 96 + Ok(()) 97 + } 98 + } 99 + 100 + pub struct RepoRoot { 101 + pub cid: String, 102 + pub rev: String, 103 + pub indexed_at: String, 104 + }
+368 -761
src/endpoints/repo.rs
··· 1 1 //! PDS repository endpoints /xrpc/com.atproto.repo.*) 2 - use std::{collections::HashSet, str::FromStr as _}; 2 + use std::{collections::HashSet, str::FromStr as _, sync::Arc}; 3 3 4 4 use anyhow::{Context as _, anyhow}; 5 5 use atrium_api::{ ··· 9 9 string::{AtIdentifier, Nsid, Tid}, 10 10 }, 11 11 }; 12 - use atrium_repo::{Cid, blockstore::CarStore}; 12 + use atrium_repo::Cid; 13 13 use axum::{ 14 14 Json, Router, 15 15 body::Body, 16 - extract::{Query, Request, State}, 16 + extract::{Query, State, multipart::Field}, 17 17 http::{self, StatusCode}, 18 18 routing::{get, post}, 19 19 }; 20 20 use constcat::concat; 21 21 use futures::TryStreamExt as _; 22 + use k256::Secp256k1; 22 23 use metrics::counter; 23 24 use serde::Deserialize; 24 25 use sha2::{Digest as _, Sha256}; 25 26 use tokio::io::AsyncWriteExt as _; 26 27 27 28 use crate::{ 28 - AppState, Db, Error, Result, SigningKey, 29 + AppState, Db, Error, Result, 30 + actor_store::{ActorStore, ActorStoreTransactor}, 29 31 auth::AuthenticatedUser, 30 32 config::AppConfig, 31 33 error::ErrorMessage, 32 34 firehose::{self, FirehoseProducer, RepoOp}, 33 35 metrics::{REPO_COMMITS, REPO_OP_CREATE, REPO_OP_DELETE, REPO_OP_UPDATE}, 34 - storage, 35 36 }; 36 37 37 - /// IPLD CID raw binary 38 - const IPLD_RAW: u64 = 0x55; 39 - /// SHA2-256 mulithash 40 - const IPLD_MH_SHA2_256: u64 = 0x12; 41 - 42 - /// Used in [`scan_blobs`] to identify a blob. 43 - #[derive(Deserialize, Debug, Clone)] 44 - struct BlobRef { 45 - /// `BlobRef` link. Include `$` when serializing to JSON, since `$` isn't allowed in struct names. 46 - #[serde(rename = "$link")] 47 - link: String, 48 - } 49 - 38 + /// Parameters for [`list_records`]. 50 39 #[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)] 51 40 #[serde(rename_all = "camelCase")] 52 - /// Parameters for [`list_records`]. 53 41 pub(super) struct ListRecordsParameters { 54 - ///The NSID of the record type. 
55 42 pub collection: Nsid, 56 - /// The cursor to start from. 57 - #[serde(skip_serializing_if = "core::option::Option::is_none")] 58 43 pub cursor: Option<String>, 59 - ///The number of records to return. 60 - #[serde(skip_serializing_if = "core::option::Option::is_none")] 61 44 pub limit: Option<String>, 62 - ///The handle or DID of the repo. 63 45 pub repo: AtIdentifier, 64 - ///Flag to reverse the order of the returned records. 65 - #[serde(skip_serializing_if = "core::option::Option::is_none")] 66 46 pub reverse: Option<bool>, 67 - ///DEPRECATED: The highest sort-ordered rkey to stop at (exclusive) 68 - #[serde(skip_serializing_if = "core::option::Option::is_none")] 69 47 pub rkey_end: Option<String>, 70 - ///DEPRECATED: The lowest sort-ordered rkey to start from (exclusive) 71 - #[serde(skip_serializing_if = "core::option::Option::is_none")] 72 48 pub rkey_start: Option<String>, 73 49 } 74 50 75 - async fn swap_commit( 76 - db: impl sqlx::Executor<'_, Database = sqlx::Sqlite>, 77 - cid: Cid, 78 - rev: Tid, 79 - old_cid: Option<Cid>, 80 - did_str: &str, 81 - ) -> anyhow::Result<bool> { 82 - let cid_str = cid.to_string(); 83 - let rev_str = rev.to_string(); 84 - 85 - if let Some(swap) = old_cid { 86 - let swap_str = swap.to_string(); 87 - 88 - let result = sqlx::query!( 89 - r#"UPDATE accounts SET root = ?, rev = ? WHERE did = ? AND root = ?"#, 90 - cid_str, 91 - rev_str, 92 - did_str, 93 - swap_str, 94 - ) 95 - .execute(db) 96 - .await 97 - .context("failed to update root")?; 98 - 99 - // If the swap failed, indicate as such. 100 - Ok(result.rows_affected() != 0) 101 - } else { 102 - _ = sqlx::query!( 103 - r#"UPDATE accounts SET root = ?, rev = ? WHERE did = ?"#, 104 - cid_str, 105 - rev_str, 106 - did_str, 107 - ) 108 - .execute(db) 109 - .await 110 - .context("failed to update root")?; 111 - 112 - Ok(true) 113 - } 114 - } 115 - 116 - /// Resolves DID to DID document. Does not bi-directionally verify handle. 
117 - /// - GET /xrpc/com.atproto.repo.resolveDid 118 - /// ### Query Parameters 119 - /// - `did`: DID to resolve. 120 - /// ### Responses 121 - /// - 200 OK: {`did_doc`: `did_doc`} 122 - /// - 400 Bad Request: {error:[`InvalidRequest`, `ExpiredToken`, `InvalidToken`, `DidNotFound`, `DidDeactivated`]} 51 + /// Resolves DID from an identifier 123 52 async fn resolve_did( 124 53 db: &Db, 125 54 identifier: &AtIdentifier, ··· 129 58 )> { 130 59 let (handle, did) = match *identifier { 131 60 AtIdentifier::Handle(ref handle) => { 132 - let handle_as_str = &handle.as_str(); 133 - ( 134 - &handle.to_owned(), 135 - &atrium_api::types::string::Did::new( 136 - sqlx::query_scalar!( 137 - r#"SELECT did FROM handles WHERE handle = ?"#, 138 - handle_as_str 139 - ) 61 + let handle_str = handle.as_str(); 62 + let did_str = 63 + sqlx::query_scalar!(r#"SELECT did FROM handles WHERE handle = ?"#, handle_str) 140 64 .fetch_one(db) 141 65 .await 142 - .context("failed to query did")?, 143 - ) 144 - .expect("should be valid DID"), 66 + .context("failed to query did")?; 67 + 68 + ( 69 + handle.clone(), 70 + atrium_api::types::string::Did::new(did_str).expect("should be valid DID format"), 145 71 ) 146 72 } 147 73 AtIdentifier::Did(ref did) => { 148 - let did_as_str = &did.as_str(); 74 + let did_str = did.as_str(); 75 + let handle_str = 76 + sqlx::query_scalar!(r#"SELECT handle FROM handles WHERE did = ?"#, did_str) 77 + .fetch_one(db) 78 + .await 79 + .context("failed to query handle")?; 80 + 149 81 ( 150 - &atrium_api::types::string::Handle::new( 151 - sqlx::query_scalar!(r#"SELECT handle FROM handles WHERE did = ?"#, did_as_str) 152 - .fetch_one(db) 153 - .await 154 - .context("failed to query did")?, 155 - ) 156 - .expect("should be valid handle"), 157 - &did.to_owned(), 82 + atrium_api::types::string::Handle::new(handle_str).expect("should be valid handle"), 83 + did.clone(), 158 84 ) 159 85 } 160 86 }; 161 87 162 - Ok((did.to_owned(), handle.to_owned())) 88 + Ok((did, handle)) 163 
89 } 164 90 165 - /// Used in [`apply_writes`] to scan for blobs in the JSON object and return their CIDs. 166 - fn scan_blobs(unknown: &Unknown) -> anyhow::Result<Vec<Cid>> { 167 - // { "$type": "blob", "ref": { "$link": "bafyrei..." } } 91 + /// Extract blobs from a record 92 + fn extract_blobs(record: &Unknown) -> anyhow::Result<Vec<Cid>> { 93 + let val = serde_json::Value::try_from_unknown(record.clone())?; 94 + let mut cids = Vec::new(); 95 + let mut stack = vec![val]; 168 96 169 - let mut cids = Vec::new(); 170 - let mut stack = vec![ 171 - serde_json::Value::try_from_unknown(unknown.clone()) 172 - .context("failed to convert unknown into json")?, 173 - ]; 174 97 while let Some(value) = stack.pop() { 175 98 match value { 176 - serde_json::Value::Bool(_) 177 - | serde_json::Value::Null 178 - | serde_json::Value::Number(_) 179 - | serde_json::Value::String(_) => (), 180 - serde_json::Value::Array(values) => stack.extend(values.into_iter()), 181 99 serde_json::Value::Object(map) => { 182 - if let (Some(blob_type), Some(blob_ref)) = (map.get("$type"), map.get("ref")) { 183 - if blob_type == &serde_json::Value::String("blob".to_owned()) { 184 - if let Ok(rf) = serde_json::from_value::<BlobRef>(blob_ref.clone()) { 185 - cids.push(Cid::from_str(&rf.link).context("failed to convert cid")?); 100 + if let (Some(typ), Some(blob_ref)) = (map.get("$type"), map.get("ref")) { 101 + if typ == "blob" { 102 + if let Some(link) = blob_ref.get("$link").and_then(|v| v.as_str()) { 103 + cids.push(Cid::from_str(link)?); 186 104 } 187 105 } 188 106 } 189 - 190 107 stack.extend(map.values().cloned()); 191 108 } 109 + serde_json::Value::Array(arr) => { 110 + stack.extend(arr); 111 + } 112 + _ => {} 192 113 } 193 114 } 194 115 195 116 Ok(cids) 196 117 } 197 118 198 - #[test] 199 - fn test_scan_blobs() { 200 - use std::str::FromStr as _; 201 - 202 - let json = serde_json::json!({ 203 - "test": "a", 204 - "blob": { 205 - "$type": "blob", 206 - "ref": { 207 - "$link": 
"bafkreifzxf2wa6dyakzbdaxkz2wkvfrv3hiuafhxewbn5wahcw6eh3hzji" 208 - } 209 - } 210 - }); 211 - 212 - let blob = scan_blobs(&json.try_into_unknown().expect("should be valid JSON")) 213 - .expect("should be able to scan blobs"); 214 - assert_eq!( 215 - blob, 216 - vec![ 217 - Cid::from_str("bafkreifzxf2wa6dyakzbdaxkz2wkvfrv3hiuafhxewbn5wahcw6eh3hzji") 218 - .expect("should be valid CID") 219 - ] 220 - ); 221 - } 222 - 223 - #[expect(clippy::too_many_lines)] 224 - /// Apply a batch transaction of repository creates, updates, and deletes. Requires auth, implemented by PDS. 225 - /// - POST /xrpc/com.atproto.repo.applyWrites 226 - /// ### Request Body 227 - /// - `repo`: `at-identifier` // The handle or DID of the repo (aka, current account). 228 - /// - `validate`: `boolean` // Can be set to 'false' to skip Lexicon schema validation of record data across all operations, 'true' to require it, or leave unset to validate only for known Lexicons. 229 - /// - `writes`: `object[]` // One of: 230 - /// - - com.atproto.repo.applyWrites.create 231 - /// - - com.atproto.repo.applyWrites.update 232 - /// - - com.atproto.repo.applyWrites.delete 233 - /// - `swap_commit`: `cid` // If provided, the entire operation will fail if the current repo commit CID does not match this value. Used to prevent conflicting repo mutations. 119 + /// Apply writes to a repository 234 120 async fn apply_writes( 235 121 user: AuthenticatedUser, 236 - State(skey): State<SigningKey>, 237 122 State(config): State<AppConfig>, 238 123 State(db): State<Db>, 239 124 State(fhp): State<FirehoseProducer>, ··· 241 126 ) -> Result<Json<repo::apply_writes::Output>> { 242 127 use atrium_api::com::atproto::repo::apply_writes::{self, InputWritesItem, OutputResultsItem}; 243 128 244 - // TODO: `input.validate` 245 - 246 - let (target_did, _) = resolve_did(&db, &input.repo) 247 - .await 248 - .context("failed to resolve did")?; 249 - 250 - // Ensure that we are updating the correct repository. 
129 + // Resolve target DID 130 + let (target_did, _) = resolve_did(&db, &input.repo).await?; 251 131 if target_did.as_str() != user.did() { 252 132 return Err(Error::with_status( 253 133 StatusCode::BAD_REQUEST, ··· 255 135 )); 256 136 } 257 137 258 - let mut repo = storage::open_repo_db(&config.repo, &db, user.did()) 259 - .await 260 - .context("failed to open user repo")?; 261 - let orig_cid = repo.root(); 262 - let orig_rev = repo.commit().rev(); 138 + let actor_store = ActorStore::new(Arc::new(config)); 139 + 140 + // Convert input writes to internal format 141 + let result = actor_store 142 + .transact(user.did(), |txn| { 143 + let mut results = Vec::new(); 144 + let mut blob_info = Vec::new(); 145 + let mut ops = Vec::new(); 263 146 264 - let mut blobs = vec![]; 265 - let mut res = vec![]; 266 - let mut ops = vec![]; 267 - let mut keys = vec![]; 268 - for write in &input.writes { 269 - let (builder, key) = match *write { 270 - InputWritesItem::Create(ref object) => { 271 - let now = Tid::now(LimitedU32::MIN); 272 - let key = format!( 273 - "{}/{}", 274 - &object.collection.as_str(), 275 - &object.rkey.as_deref().unwrap_or_else(|| now.as_str()) 276 - ); 277 - let uri = format!("at://{}/{}", user.did(), key); 147 + for write in &input.writes { 148 + match write { 149 + InputWritesItem::Create(create) => { 150 + let collection = create.collection.as_str(); 151 + let rkey = create 152 + .rkey 153 + .as_deref() 154 + .unwrap_or_else(|| Tid::now(LimitedU32::MIN).as_str()); 155 + let uri = format!("at://{}/{}/{}", user.did(), collection, rkey); 278 156 279 - let (builder, cid) = repo 280 - .add_raw(&key, &object.value) 281 - .await 282 - .context("failed to add record")?; 157 + // Process blobs in record 158 + for blob_cid in extract_blobs(&create.value)? 
{ 159 + blob_info.push((blob_cid, uri.clone())); 160 + } 283 161 284 - if let Ok(new_blobs) = scan_blobs(&object.value) { 285 - blobs.extend( 286 - new_blobs 287 - .into_iter() 288 - .map(|blob_cid| (key.clone(), blob_cid)), 289 - ); 290 - } 162 + // Create the record 163 + let cid = txn.repo().create_record(collection, rkey, &create.value)?; 291 164 292 - ops.push(RepoOp::Create { 293 - cid, 294 - path: key.clone(), 295 - }); 165 + ops.push(RepoOp::Create { 166 + cid, 167 + path: format!("{}/{}", collection, rkey), 168 + }); 296 169 297 - res.push(OutputResultsItem::CreateResult(Box::new( 298 - apply_writes::CreateResultData { 299 - cid: atrium_api::types::string::Cid::new(cid), 300 - uri, 301 - validation_status: None, 170 + results.push(OutputResultsItem::CreateResult(Box::new( 171 + apply_writes::CreateResultData { 172 + cid: atrium_api::types::string::Cid::new(cid), 173 + uri, 174 + validation_status: None, 175 + } 176 + .into(), 177 + ))); 302 178 } 303 - .into(), 304 - ))); 179 + InputWritesItem::Update(update) => { 180 + let collection = update.collection.as_str(); 181 + let rkey = update.rkey.as_str(); 182 + let uri = format!("at://{}/{}/{}", user.did(), collection, rkey); 183 + let record_path = format!("{}/{}", collection, rkey); 305 184 306 - (builder, key) 307 - } 308 - InputWritesItem::Update(ref object) => { 309 - let builder: atrium_repo::repo::CommitBuilder<_>; 310 - let key = format!("{}/{}", &object.collection.as_str(), &object.rkey.as_str()); 311 - let uri = format!("at://{}/{}", user.did(), key); 312 - 313 - let prev = repo 314 - .tree() 315 - .get(&key) 316 - .await 317 - .context("failed to search MST")?; 318 - if prev.is_none() { 319 - let (create_builder, cid) = repo 320 - .add_raw(&key, &object.value) 321 - .await 322 - .context("failed to add record")?; 323 - if let Ok(new_blobs) = scan_blobs(&object.value) { 324 - blobs.extend( 325 - new_blobs 326 - .into_iter() 327 - .map(|blod_cid| (key.clone(), blod_cid)), 328 - ); 329 - } 330 - 
ops.push(RepoOp::Create { 331 - cid, 332 - path: key.clone(), 333 - }); 334 - res.push(OutputResultsItem::CreateResult(Box::new( 335 - apply_writes::CreateResultData { 336 - cid: atrium_api::types::string::Cid::new(cid), 337 - uri, 338 - validation_status: None, 185 + // Process blobs in record 186 + for blob_cid in extract_blobs(&update.value)? { 187 + blob_info.push((blob_cid, uri.clone())); 339 188 } 340 - .into(), 341 - ))); 342 - builder = create_builder; 343 - } else { 344 - let prev = prev.context("should be able to find previous record")?; 345 - let (update_builder, cid) = repo 346 - .update_raw(&key, &object.value) 347 - .await 348 - .context("failed to add record")?; 349 - if let Ok(new_blobs) = scan_blobs(&object.value) { 350 - blobs.extend( 351 - new_blobs 352 - .into_iter() 353 - .map(|blod_cid| (key.clone(), blod_cid)), 354 - ); 355 - } 356 - ops.push(RepoOp::Update { 357 - cid, 358 - path: key.clone(), 359 - prev, 360 - }); 361 - res.push(OutputResultsItem::UpdateResult(Box::new( 362 - apply_writes::UpdateResultData { 363 - cid: atrium_api::types::string::Cid::new(cid), 364 - uri, 365 - validation_status: None, 366 - } 367 - .into(), 368 - ))); 369 - builder = update_builder; 370 - } 371 - (builder, key) 372 - } 373 - InputWritesItem::Delete(ref object) => { 374 - let key = format!("{}/{}", &object.collection.as_str(), &object.rkey.as_str()); 375 189 376 - let prev = repo 377 - .tree() 378 - .get(&key) 379 - .await 380 - .context("failed to search MST")? 
381 - .context("previous record does not exist")?; 190 + // Get current record 191 + let record = txn.record().get_record(&record_path)?; 382 192 383 - ops.push(RepoOp::Delete { 384 - path: key.clone(), 385 - prev, 386 - }); 193 + if let Some(existing) = record { 194 + // Update existing record 195 + let prev = Cid::from_str(&existing.cid)?; 196 + let cid = txn.repo().update_record(collection, rkey, &update.value)?; 387 197 388 - res.push(OutputResultsItem::DeleteResult(Box::new( 389 - apply_writes::DeleteResultData {}.into(), 390 - ))); 391 - 392 - let builder = repo 393 - .delete_raw(&key) 394 - .await 395 - .context("failed to add record")?; 396 - 397 - (builder, key) 398 - } 399 - }; 198 + ops.push(RepoOp::Update { 199 + cid, 200 + path: record_path, 201 + prev, 202 + }); 400 203 401 - let sig = skey 402 - .sign(&builder.bytes()) 403 - .context("failed to sign commit")?; 204 + results.push(OutputResultsItem::UpdateResult(Box::new( 205 + apply_writes::UpdateResultData { 206 + cid: atrium_api::types::string::Cid::new(cid), 207 + uri, 208 + validation_status: None, 209 + } 210 + .into(), 211 + ))); 212 + } else { 213 + // Create record if doesn't exist 214 + let cid = txn.repo().create_record(collection, rkey, &update.value)?; 404 215 405 - _ = builder 406 - .finalize(sig) 407 - .await 408 - .context("failed to write signed commit")?; 216 + ops.push(RepoOp::Create { 217 + cid, 218 + path: record_path, 219 + }); 409 220 410 - keys.push(key); 411 - } 221 + results.push(OutputResultsItem::CreateResult(Box::new( 222 + apply_writes::CreateResultData { 223 + cid: atrium_api::types::string::Cid::new(cid), 224 + uri, 225 + validation_status: None, 226 + } 227 + .into(), 228 + ))); 229 + } 230 + } 231 + InputWritesItem::Delete(delete) => { 232 + let collection = delete.collection.as_str(); 233 + let rkey = delete.rkey.as_str(); 234 + let record_path = format!("{}/{}", collection, rkey); 412 235 413 - // Construct a firehose record. 
414 - let mut mem = Vec::new(); 415 - let mut store = CarStore::create_with_roots(std::io::Cursor::new(&mut mem), [repo.root()]) 416 - .await 417 - .context("failed to create temp store")?; 236 + // Get current record 237 + let record = txn.record().get_record(&record_path)?; 418 238 419 - // Extract the records out of the user's repository. 420 - for key in keys { 421 - repo.extract_raw_into(&key, &mut store) 422 - .await 423 - .context("failed to extract key")?; 424 - } 239 + if let Some(existing) = record { 240 + let prev = Cid::from_str(&existing.cid)?; 425 241 426 - let mut tx = db.begin().await.context("failed to begin transaction")?; 242 + // Delete record 243 + txn.repo().delete_record(collection, rkey)?; 427 244 428 - if !swap_commit( 429 - &mut *tx, 430 - repo.root(), 431 - repo.commit().rev(), 432 - input.swap_commit.as_ref().map(|cid| *cid.as_ref()), 433 - &user.did(), 434 - ) 435 - .await 436 - .context("failed to swap commit")? 437 - { 438 - // This should always succeed. 439 - let old = input 440 - .swap_commit 441 - .clone() 442 - .context("swap_commit should always be Some")?; 245 + ops.push(RepoOp::Delete { 246 + path: record_path, 247 + prev, 248 + }); 443 249 444 - // The swap failed. Return the old commit and do not update the repository. 445 - return Ok(Json( 446 - apply_writes::OutputData { 447 - results: None, 448 - commit: Some( 449 - CommitMetaData { 450 - cid: old, 451 - rev: orig_rev, 250 + results.push(OutputResultsItem::DeleteResult(Box::new( 251 + apply_writes::DeleteResultData {}.into(), 252 + ))); 253 + } 452 254 } 453 - .into(), 454 - ), 255 + } 455 256 } 456 - .into(), 457 - )); 458 - } 459 257 460 - let did_str = user.did(); 461 - 462 - // For updates and removals, unlink the old/deleted record from the blob_ref table. 463 - for op in &ops { 464 - match op { 465 - &RepoOp::Update { ref path, .. } | &RepoOp::Delete { ref path, .. 
} => { 466 - // FIXME: This may cause issues if a user deletes more than one record referencing the same blob. 467 - _ = &sqlx::query!( 468 - r#"UPDATE blob_ref SET record = NULL WHERE did = ? AND record = ?"#, 469 - did_str, 470 - path 471 - ) 472 - .execute(&mut *tx) 473 - .await 474 - .context("failed to remove blob_ref")?; 258 + // Process blob associations 259 + for (blob_cid, record_uri) in &blob_info { 260 + txn.blob() 261 + .register_blob(blob_cid.to_string(), "application/octet-stream", 0)?; 262 + txn.blob() 263 + .associate_blob(&blob_cid.to_string(), record_uri)?; 475 264 } 476 - &RepoOp::Create { .. } => {} 477 - } 478 - } 479 265 480 - for &mut (ref key, cid) in &mut blobs { 481 - let cid_str = cid.to_string(); 266 + // Get updated repo root 267 + let repo_root = txn.repo().get_root()?; 482 268 483 - // Handle the case where a new record references an existing blob. 484 - if sqlx::query!( 485 - r#"UPDATE blob_ref SET record = ? WHERE cid = ? AND did = ? AND record IS NULL"#, 486 - key, 487 - cid_str, 488 - did_str, 489 - ) 490 - .execute(&mut *tx) 491 - .await 492 - .context("failed to update blob_ref")? 493 - .rows_affected() 494 - == 0 495 - { 496 - _ = sqlx::query!( 497 - r#"INSERT INTO blob_ref (record, cid, did) VALUES (?, ?, ?)"#, 498 - key, 499 - cid_str, 500 - did_str, 501 - ) 502 - .execute(&mut *tx) 503 - .await 504 - .context("failed to update blob_ref")?; 505 - } 506 - } 269 + // Update metrics 270 + counter!(REPO_COMMITS).increment(1); 271 + for op in &ops { 272 + match op { 273 + RepoOp::Create { .. } => counter!(REPO_OP_CREATE).increment(1), 274 + RepoOp::Update { .. } => counter!(REPO_OP_UPDATE).increment(1), 275 + RepoOp::Delete { .. } => counter!(REPO_OP_DELETE).increment(1), 276 + } 277 + } 507 278 508 - tx.commit() 509 - .await 510 - .context("failed to commit blob ref to database")?; 279 + // Send ops to firehose 280 + // (In real impl, we'd construct a firehose event here) 511 281 512 - // Update counters. 
513 - counter!(REPO_COMMITS).increment(1); 514 - for op in &ops { 515 - match *op { 516 - RepoOp::Create { .. } => counter!(REPO_OP_CREATE).increment(1), 517 - RepoOp::Update { .. } => counter!(REPO_OP_UPDATE).increment(1), 518 - RepoOp::Delete { .. } => counter!(REPO_OP_DELETE).increment(1), 519 - } 520 - } 521 - 522 - // We've committed the transaction to the database, and the commit is now stored in the user's 523 - // canonical repository. 524 - // We can now broadcast this on the firehose. 525 - fhp.commit(firehose::Commit { 526 - car: mem, 527 - ops, 528 - cid: repo.root(), 529 - rev: repo.commit().rev().to_string(), 530 - did: atrium_api::types::string::Did::new(user.did()).expect("should be valid DID"), 531 - pcid: Some(orig_cid), 532 - blobs: blobs.into_iter().map(|(_, cid)| cid).collect::<Vec<_>>(), 533 - }) 534 - .await; 282 + Ok(apply_writes::OutputData { 283 + results: Some(results), 284 + commit: Some( 285 + CommitMetaData { 286 + cid: atrium_api::types::string::Cid::new(Cid::from_str(&repo_root.cid)?), 287 + rev: Tid::from_str(&repo_root.rev)?, 288 + } 289 + .into(), 290 + ), 291 + }) 292 + }) 293 + .await?; 535 294 536 - Ok(Json( 537 - apply_writes::OutputData { 538 - results: Some(res), 539 - commit: Some( 540 - CommitMetaData { 541 - cid: atrium_api::types::string::Cid::new(repo.root()), 542 - rev: repo.commit().rev(), 543 - } 544 - .into(), 545 - ), 546 - } 547 - .into(), 548 - )) 295 + Ok(Json(result.into())) 549 296 } 550 297 551 - /// Create a single new repository record. Requires auth, implemented by PDS. 552 - /// - POST /xrpc/com.atproto.repo.createRecord 553 - /// ### Request Body 554 - /// - `repo`: `at-identifier` // The handle or DID of the repo (aka, current account). 555 - /// - `collection`: `nsid` // The NSID of the record collection. 556 - /// - `rkey`: `string` // The record key. <= 512 characters. 
557 - /// - `validate`: `boolean` // Can be set to 'false' to skip Lexicon schema validation of record data, 'true' to require it, or leave unset to validate only for known Lexicons. 558 - /// - `record` 559 - /// - `swap_commit`: `cid` // Compare and swap with the previous commit by CID. 560 - /// ### Responses 561 - /// - 200 OK: {`cid`: `cid`, `uri`: `at-uri`, `commit`: {`cid`: `cid`, `rev`: `tid`}, `validation_status`: [`valid`, `unknown`]} 562 - /// - 400 Bad Request: {error:[`InvalidRequest`, `ExpiredToken`, `InvalidToken`, `InvalidSwap`]} 563 - /// - 401 Unauthorized 298 + /// Create a single new repository record 564 299 async fn create_record( 565 300 user: AuthenticatedUser, 566 - State(skey): State<SigningKey>, 567 - State(config): State<AppConfig>, 568 - State(db): State<Db>, 569 - State(fhp): State<FirehoseProducer>, 301 + state: State<AppConfig>, 302 + db_state: State<Db>, 303 + fhp_state: State<FirehoseProducer>, 570 304 Json(input): Json<repo::create_record::Input>, 571 305 ) -> Result<Json<repo::create_record::Output>> { 572 - let write_result = apply_writes( 573 - user, 574 - State(skey), 575 - State(config), 576 - State(db), 577 - State(fhp), 578 - Json( 579 - repo::apply_writes::InputData { 580 - repo: input.repo.clone(), 581 - validate: input.validate, 582 - swap_commit: input.swap_commit.clone(), 583 - writes: vec![repo::apply_writes::InputWritesItem::Create(Box::new( 584 - repo::apply_writes::CreateData { 585 - collection: input.collection.clone(), 586 - rkey: input.rkey.clone(), 587 - value: input.record.clone(), 588 - } 589 - .into(), 590 - ))], 306 + // Convert the create_record operation to apply_writes 307 + let apply_writes_input = repo::apply_writes::InputData { 308 + repo: input.repo.clone(), 309 + validate: input.validate, 310 + swap_commit: input.swap_commit.clone(), 311 + writes: vec![repo::apply_writes::InputWritesItem::Create(Box::new( 312 + repo::apply_writes::CreateData { 313 + collection: input.collection.clone(), 314 + rkey: 
input.rkey.clone(), 315 + value: input.record.clone(), 591 316 } 592 317 .into(), 593 - ), 594 - ) 595 - .await 596 - .context("failed to apply writes")?; 318 + ))], 319 + } 320 + .into(); 321 + 322 + let write_result = 323 + apply_writes(user, state, db_state, fhp_state, Json(apply_writes_input)).await?; 597 324 598 - let create_result = if let repo::apply_writes::OutputResultsItem::CreateResult(create_result) = 599 - write_result 600 - .results 601 - .clone() 602 - .and_then(|result| result.first().cloned()) 603 - .context("unexpected output from apply_writes")? 325 + // Extract the first result 326 + let create_result = match write_result 327 + .results 328 + .and_then(|results| results.first().cloned()) 604 329 { 605 - Some(create_result) 606 - } else { 607 - None 608 - } 609 - .context("unexpected result from apply_writes")?; 330 + Some(repo::apply_writes::OutputResultsItem::CreateResult(res)) => res, 331 + _ => { 332 + return Err(Error::with_status( 333 + StatusCode::INTERNAL_SERVER_ERROR, 334 + anyhow!("unexpected result type from apply_writes"), 335 + )); 336 + } 337 + }; 610 338 611 339 Ok(Json( 612 340 repo::create_record::OutputData { ··· 619 347 )) 620 348 } 621 349 622 - /// Write a repository record, creating or updating it as needed. Requires auth, implemented by PDS. 623 - /// - POST /xrpc/com.atproto.repo.putRecord 624 - /// ### Request Body 625 - /// - `repo`: `at-identifier` // The handle or DID of the repo (aka, current account). 626 - /// - `collection`: `nsid` // The NSID of the record collection. 627 - /// - `rkey`: `string` // The record key. <= 512 characters. 628 - /// - `validate`: `boolean` // Can be set to 'false' to skip Lexicon schema validation of record data, 'true' to require it, or leave unset to validate only for known Lexicons. 629 - /// - `record` 630 - /// - `swap_record`: `boolean` // Compare and swap with the previous record by CID. 
WARNING: nullable and optional field; may cause problems with golang implementation 631 - /// - `swap_commit`: `cid` // Compare and swap with the previous commit by CID. 632 - /// ### Responses 633 - /// - 200 OK: {"uri": "string","cid": "string","commit": {"cid": "string","rev": "string"},"validationStatus": "valid | unknown"} 634 - /// - 400 Bad Request: {error:"`InvalidRequest` | `ExpiredToken` | `InvalidToken` | `InvalidSwap`"} 635 - /// - 401 Unauthorized 350 + /// Update or create a repository record 636 351 async fn put_record( 637 352 user: AuthenticatedUser, 638 - State(skey): State<SigningKey>, 639 - State(config): State<AppConfig>, 640 - State(db): State<Db>, 641 - State(fhp): State<FirehoseProducer>, 353 + state: State<AppConfig>, 354 + db_state: State<Db>, 355 + fhp_state: State<FirehoseProducer>, 642 356 Json(input): Json<repo::put_record::Input>, 643 357 ) -> Result<Json<repo::put_record::Output>> { 644 - // TODO: `input.swap_record` 645 - // FIXME: "put" implies that we will create the record if it does not exist. 646 - // We currently only update existing records and/or throw an error if one doesn't exist. 
647 - let input = (*input).clone(); 648 - let input = repo::apply_writes::InputData { 358 + // Convert put_record operation to apply_writes 359 + let apply_writes_input = repo::apply_writes::InputData { 649 360 repo: input.repo, 650 361 validate: input.validate, 651 362 swap_commit: input.swap_commit, ··· 660 371 } 661 372 .into(); 662 373 663 - let write_result = apply_writes( 664 - user, 665 - State(skey), 666 - State(config), 667 - State(db), 668 - State(fhp), 669 - Json(input), 670 - ) 671 - .await 672 - .context("failed to apply writes")?; 374 + let write_result = 375 + apply_writes(user, state, db_state, fhp_state, Json(apply_writes_input)).await?; 673 376 674 - let update_result = write_result 675 - .results 676 - .clone() 677 - .and_then(|result| result.first().cloned()) 678 - .context("unexpected output from apply_writes")?; 679 - let (cid, uri) = match update_result { 680 - repo::apply_writes::OutputResultsItem::CreateResult(create_result) => ( 681 - Some(create_result.cid.clone()), 682 - Some(create_result.uri.clone()), 683 - ), 684 - repo::apply_writes::OutputResultsItem::UpdateResult(update_result) => ( 685 - Some(update_result.cid.clone()), 686 - Some(update_result.uri.clone()), 687 - ), 688 - repo::apply_writes::OutputResultsItem::DeleteResult(_) => (None, None), 377 + // Extract result with appropriate handling for create/update 378 + let (cid, uri) = match write_result.results.and_then(|r| r.first().cloned()) { 379 + Some(repo::apply_writes::OutputResultsItem::CreateResult(create)) => { 380 + (create.cid, create.uri) 381 + } 382 + Some(repo::apply_writes::OutputResultsItem::UpdateResult(update)) => { 383 + (update.cid, update.uri) 384 + } 385 + _ => { 386 + return Err(Error::with_status( 387 + StatusCode::INTERNAL_SERVER_ERROR, 388 + anyhow!("unexpected result type from apply_writes"), 389 + )); 390 + } 689 391 }; 392 + 690 393 Ok(Json( 691 394 repo::put_record::OutputData { 692 - cid: cid.context("missing cid")?, 693 - commit: 
write_result.commit.clone(), 694 - uri: uri.context("missing uri")?, 395 + cid, 396 + commit: write_result.commit, 397 + uri, 695 398 validation_status: Some("unknown".to_owned()), 696 399 } 697 400 .into(), 698 401 )) 699 402 } 700 403 701 - /// Delete a repository record, or ensure it doesn't exist. Requires auth, implemented by PDS. 702 - /// - POST /xrpc/com.atproto.repo.deleteRecord 703 - /// ### Request Body 704 - /// - `repo`: `at-identifier` // The handle or DID of the repo (aka, current account). 705 - /// - `collection`: `nsid` // The NSID of the record collection. 706 - /// - `rkey`: `string` // The record key. <= 512 characters. 707 - /// - `swap_record`: `boolean` // Compare and swap with the previous record by CID. 708 - /// - `swap_commit`: `cid` // Compare and swap with the previous commit by CID. 709 - /// ### Responses 710 - /// - 200 OK: {"commit": {"cid": "string","rev": "string"}} 711 - /// - 400 Bad Request: {error:[`InvalidRequest`, `ExpiredToken`, `InvalidToken`, `InvalidSwap`]} 712 - /// - 401 Unauthorized 404 + /// Delete a repository record 713 405 async fn delete_record( 714 406 user: AuthenticatedUser, 715 - State(skey): State<SigningKey>, 716 - State(config): State<AppConfig>, 717 - State(db): State<Db>, 718 - State(fhp): State<FirehoseProducer>, 407 + state: State<AppConfig>, 408 + db_state: State<Db>, 409 + fhp_state: State<FirehoseProducer>, 719 410 Json(input): Json<repo::delete_record::Input>, 720 411 ) -> Result<Json<repo::delete_record::Output>> { 721 - // TODO: `input.swap_record` 412 + // Convert delete_record operation to apply_writes 413 + let apply_writes_input = repo::apply_writes::InputData { 414 + repo: input.repo, 415 + validate: None, 416 + swap_commit: input.swap_commit, 417 + writes: vec![repo::apply_writes::InputWritesItem::Delete(Box::new( 418 + repo::apply_writes::DeleteData { 419 + collection: input.collection, 420 + rkey: input.rkey, 421 + } 422 + .into(), 423 + ))], 424 + } 425 + .into(); 426 + 427 + let 
write_result = 428 + apply_writes(user, state, db_state, fhp_state, Json(apply_writes_input)).await?; 722 429 723 430 Ok(Json( 724 431 repo::delete_record::OutputData { 725 - commit: apply_writes( 726 - user, 727 - State(skey), 728 - State(config), 729 - State(db), 730 - State(fhp), 731 - Json( 732 - repo::apply_writes::InputData { 733 - repo: input.repo.clone(), 734 - swap_commit: input.swap_commit.clone(), 735 - validate: None, 736 - writes: vec![repo::apply_writes::InputWritesItem::Delete(Box::new( 737 - repo::apply_writes::DeleteData { 738 - collection: input.collection.clone(), 739 - rkey: input.rkey.clone(), 740 - } 741 - .into(), 742 - ))], 743 - } 744 - .into(), 745 - ), 746 - ) 747 - .await 748 - .context("failed to apply writes")? 749 - .commit 750 - .clone(), 432 + commit: write_result.commit, 751 433 } 752 434 .into(), 753 435 )) 754 436 } 755 437 756 - /// Get information about an account and repository, including the list of collections. Does not require auth. 757 - /// - GET /xrpc/com.atproto.repo.describeRepo 758 - /// ### Query Parameters 759 - /// - `repo`: `at-identifier` // The handle or DID of the repo. 760 - /// ### Responses 761 - /// - 200 OK: {"handle": "string","did": "string","didDoc": {},"collections": [string],"handleIsCorrect": true} \ 762 - /// handeIsCorrect - boolean - Indicates if handle is currently valid (resolves bi-directionally) 763 - /// - 400 Bad Request: {error:[`InvalidRequest`, `ExpiredToken`, `InvalidToken`]} 764 - /// - 401 Unauthorized 438 + /// Get information about an account and repository 765 439 async fn describe_repo( 766 440 State(config): State<AppConfig>, 767 441 State(db): State<Db>, 768 442 Query(input): Query<repo::describe_repo::ParametersData>, 769 443 ) -> Result<Json<repo::describe_repo::Output>> { 770 - // Lookup the DID by the provided handle. 
771 - let (did, handle) = resolve_did(&db, &input.repo) 772 - .await 773 - .context("failed to resolve handle")?; 444 + // Resolve the DID 445 + let (did, handle) = resolve_did(&db, &input.repo).await?; 774 446 775 - let mut repo = storage::open_repo_db(&config.repo, &db, did.as_str()) 776 - .await 777 - .context("failed to open user repo")?; 778 - 779 - let mut collections = HashSet::new(); 780 - 781 - let mut tree = repo.tree(); 782 - let mut it = Box::pin(tree.keys()); 783 - while let Some(key) = it.try_next().await.context("failed to iterate repo keys")? { 784 - if let Some((collection, _rkey)) = key.split_once('/') { 785 - _ = collections.insert(collection.to_owned()); 786 - } 787 - } 447 + // Get collections from actor store 448 + let actor_store = ActorStore::new(Arc::new(config)); 449 + let collections = actor_store 450 + .read(did.as_str(), |reader| reader.record().list_collections()) 451 + .await?; 788 452 789 453 Ok(Json( 790 454 repo::describe_repo::OutputData { 791 455 collections: collections 792 456 .into_iter() 793 - .map(|nsid| Nsid::new(nsid).expect("should be valid NSID")) 794 - .collect::<Vec<_>>(), 795 - did: did.clone(), 796 - did_doc: Unknown::Null, // TODO: Fetch the DID document from the PLC directory 797 - handle: handle.clone(), 798 - handle_is_correct: true, // TODO 457 + .map(|c| Nsid::new(c).expect("valid NSID")) 458 + .collect(), 459 + did, 460 + did_doc: Unknown::Null, // TODO: fetch from PLC directory 461 + handle, 462 + handle_is_correct: true, // TODO: validate handle resolution 799 463 } 800 464 .into(), 801 465 )) 802 466 } 803 467 804 - /// Get a single record from a repository. Does not require auth. 805 - /// - GET /xrpc/com.atproto.repo.getRecord 806 - /// ### Query Parameters 807 - /// - `repo`: `at-identifier` // The handle or DID of the repo. 808 - /// - `collection`: `nsid` // The NSID of the record collection. 809 - /// - `rkey`: `string` // The record key. <= 512 characters. 
810 - /// - `cid`: `cid` // The CID of the version of the record. If not specified, then return the most recent version. 811 - /// ### Responses 812 - /// - 200 OK: {"uri": "string","cid": "string","value": {}} 813 - /// - 400 Bad Request: {error:[`InvalidRequest`, `ExpiredToken`, `InvalidToken`, `RecordNotFound`]} 814 - /// - 401 Unauthorized 468 + /// Get a single record from a repository 815 469 async fn get_record( 816 470 State(config): State<AppConfig>, 817 471 State(db): State<Db>, ··· 819 473 ) -> Result<Json<repo::get_record::Output>> { 820 474 if input.cid.is_some() { 821 475 return Err(Error::unimplemented(anyhow!( 822 - "looking up old records is unsupported" 476 + "fetching specific CID versions not supported" 823 477 ))); 824 478 } 825 479 826 - // Lookup the DID by the provided handle. 827 - let (did, _handle) = resolve_did(&db, &input.repo) 828 - .await 829 - .context("failed to resolve handle")?; 830 - 831 - let mut repo = storage::open_repo_db(&config.repo, &db, did.as_str()) 832 - .await 833 - .context("failed to open user repo")?; 480 + // Resolve the DID 481 + let (did, _) = resolve_did(&db, &input.repo).await?; 834 482 483 + // Construct record path and URI 835 484 let key = format!("{}/{}", input.collection.as_str(), input.rkey.as_str()); 836 - let uri = format!("at://{}/{}", did.as_str(), &key); 485 + let uri = format!("at://{}/{}", did.as_str(), key); 837 486 838 - let cid = repo 839 - .tree() 840 - .get(&key) 841 - .await 842 - .context("failed to find record")?; 487 + // Get record from actor store 488 + let actor_store = ActorStore::new(Arc::new(config)); 489 + let record_opt = actor_store 490 + .read(did.as_str(), |reader| reader.record().get_record(&key)) 491 + .await?; 843 492 844 - let record: Option<serde_json::Value> = 845 - repo.get_raw(&key).await.context("failed to read record")?; 493 + match record_opt { 494 + Some(record) => { 495 + // Parse record content 496 + let value: serde_json::Value = 
serde_json::from_slice(&record.content)?; 497 + let cid = Cid::from_str(&record.cid)?; 846 498 847 - record.map_or_else( 848 - || { 849 - Err(Error::with_message( 850 - StatusCode::BAD_REQUEST, 851 - anyhow!("could not find the requested record at {}", uri), 852 - ErrorMessage::new("RecordNotFound", format!("Could not locate record: {uri}")), 853 - )) 854 - }, 855 - |record_value| { 856 499 Ok(Json( 857 500 repo::get_record::OutputData { 858 - cid: cid.map(atrium_api::types::string::Cid::new), 859 - uri: uri.clone(), 860 - value: record_value 861 - .try_into_unknown() 862 - .context("should be valid JSON")?, 501 + cid: Some(atrium_api::types::string::Cid::new(cid)), 502 + uri, 503 + value: value.try_into_unknown()?, 863 504 } 864 505 .into(), 865 506 )) 866 - }, 867 - ) 507 + } 508 + None => Err(Error::with_message( 509 + StatusCode::BAD_REQUEST, 510 + anyhow!("record not found: {}", uri), 511 + ErrorMessage::new("RecordNotFound", format!("Could not locate record: {uri}")), 512 + )), 513 + } 868 514 } 869 515 870 - /// List a range of records in a repository, matching a specific collection. Does not require auth. 871 - /// - GET /xrpc/com.atproto.repo.listRecords 872 - /// ### Query Parameters 873 - /// - `repo`: `at-identifier` // The handle or DID of the repo. 874 - /// - `collection`: `nsid` // The NSID of the record type. 875 - /// - `limit`: `integer` // The maximum number of records to return. Default 50, >=1 and <=100. 876 - /// - `cursor`: `string` 877 - /// - `reverse`: `boolean` // Flag to reverse the order of the returned records. 
878 - /// ### Responses 879 - /// - 200 OK: {"cursor": "string","records": [{"uri": "string","cid": "string","value": {}}]} 880 - /// - 400 Bad Request: {error:[`InvalidRequest`, `ExpiredToken`, `InvalidToken`]} 881 - /// - 401 Unauthorized 516 + /// List records from a repository collection 882 517 async fn list_records( 883 518 State(config): State<AppConfig>, 884 519 State(db): State<Db>, 885 520 Query(input): Query<Object<ListRecordsParameters>>, 886 521 ) -> Result<Json<repo::list_records::Output>> { 887 - // TODO: `input.reverse` 888 - 889 - // Lookup the DID by the provided handle. 890 - let (did, _handle) = resolve_did(&db, &input.repo) 891 - .await 892 - .context("failed to resolve handle")?; 522 + // Resolve the DID 523 + let (did, _) = resolve_did(&db, &input.repo).await?; 893 524 894 - let mut repo = storage::open_repo_db(&config.repo, &db, did.as_str()) 895 - .await 896 - .context("failed to open user repo")?; 525 + // Parse limit parameter 526 + let limit = input 527 + .limit 528 + .as_deref() 529 + .and_then(|l| l.parse::<u32>().ok()) 530 + .unwrap_or(50) 531 + .min(100); 897 532 898 - let mut keys = Vec::new(); 899 - let mut tree = repo.tree(); 900 - 901 - let mut entry = input.collection.to_string(); 902 - entry.push('/'); 903 - let mut iterator = Box::pin(tree.entries_prefixed(entry.as_str())); 904 - while let Some((key, cid)) = iterator 905 - .try_next() 906 - .await 907 - .context("failed to iterate keys")? 908 - { 909 - keys.push((key, cid)); 910 - } 911 - 912 - drop(iterator); 533 + // Get records from actor store 534 + let actor_store = ActorStore::new(Arc::new(config)); 535 + let records = actor_store 536 + .read(did.as_str(), |reader| { 537 + reader.record().list_collection_records( 538 + input.collection.as_str(), 539 + limit, 540 + input.cursor.as_deref(), 541 + input.reverse.unwrap_or(false), 542 + ) 543 + }) 544 + .await?; 913 545 914 - // TODO: Calculate the view on `keys` using `cursor` and `limit`. 
546 + // Format records for response 547 + let mut result_records = Vec::new(); 548 + let mut last_cursor = None; 915 549 916 - let mut records = Vec::new(); 917 - for &(ref key, cid) in &keys { 918 - let value: serde_json::Value = repo 919 - .get_raw(key) 920 - .await 921 - .context("failed to get record")? 922 - .context("record not found")?; 550 + for record in &records { 551 + // Parse record 552 + let value: serde_json::Value = serde_json::from_slice(&record.content)?; 553 + let cid = Cid::from_str(&record.cid)?; 554 + last_cursor = Some(record.rkey.clone()); 923 555 924 - records.push( 556 + result_records.push( 925 557 repo::list_records::RecordData { 926 558 cid: atrium_api::types::string::Cid::new(cid), 927 - uri: format!("at://{}/{}", did.as_str(), key), 928 - value: value.try_into_unknown().context("should be valid JSON")?, 559 + uri: format!( 560 + "at://{}/{}/{}", 561 + did.as_str(), 562 + input.collection.as_str(), 563 + record.rkey 564 + ), 565 + value: value.try_into_unknown()?, 929 566 } 930 567 .into(), 931 568 ); 932 569 } 933 570 934 - #[expect(clippy::pattern_type_mismatch)] 935 571 Ok(Json( 936 572 repo::list_records::OutputData { 937 - cursor: keys.last().map(|(_, cid)| cid.to_string()), 938 - records, 573 + cursor: last_cursor, 574 + records: result_records, 939 575 } 940 576 .into(), 941 577 )) 942 578 } 943 579 944 - /// Upload a new blob, to be referenced from a repository record. \ 945 - /// The blob will be deleted if it is not referenced within a time window (eg, minutes). \ 946 - /// Blob restrictions (mimetype, size, etc) are enforced when the reference is created. \ 947 - /// Requires auth, implemented by PDS. 
948 - /// - POST /xrpc/com.atproto.repo.uploadBlob 949 - /// ### Request Body 950 - /// ### Responses 951 - /// - 200 OK: {"blob": "binary"} 952 - /// - 400 Bad Request: {error:[`InvalidRequest`, `ExpiredToken`, `InvalidToken`]} 953 - /// - 401 Unauthorized 580 + /// Upload a blob 954 581 async fn upload_blob( 955 582 user: AuthenticatedUser, 956 583 State(config): State<AppConfig>, 957 - State(db): State<Db>, 958 584 request: Request<Body>, 959 585 ) -> Result<Json<repo::upload_blob::Output>> { 586 + // Get content-length and content-type 960 587 let length = request 961 588 .headers() 962 589 .get(http::header::CONTENT_LENGTH) 963 590 .context("no content length provided")? 964 - .to_str() 965 - .map_err(anyhow::Error::from) 966 - .and_then(|content_length| content_length.parse::<u64>().map_err(anyhow::Error::from)) 967 - .context("invalid content-length header")?; 591 + .to_str()? 592 + .parse::<u64>()?; 593 + 968 594 let mime = request 969 595 .headers() 970 596 .get(http::header::CONTENT_TYPE) 971 597 .context("no content-type provided")? 972 - .to_str() 973 - .context("invalid content-type provided")? 598 + .to_str()? 974 599 .to_owned(); 975 600 601 + // Check size limit 976 602 if length > config.blob.limit { 977 603 return Err(Error::with_status( 978 604 StatusCode::PAYLOAD_TOO_LARGE, ··· 980 606 )); 981 607 } 982 608 983 - // FIXME: Need to make this more robust. This will fail under load. 
609 + // Create temp file 984 610 let filename = config 985 611 .blob 986 612 .path 987 613 .join(format!("temp-{}.blob", chrono::Utc::now().timestamp())); 988 - let mut file = tokio::fs::File::create(&filename) 989 - .await 990 - .context("failed to create temporary file")?; 614 + let mut file = tokio::fs::File::create(&filename).await?; 991 615 992 - let mut len = 0_usize; 616 + // Process upload 617 + let mut len = 0; 993 618 let mut sha = Sha256::new(); 994 619 let mut stream = request.into_body().into_data_stream(); 995 - while let Some(bytes) = stream.try_next().await.context("failed to receive file")? { 996 - len = len.checked_add(bytes.len()).context("size overflow")?; 997 620 998 - // Deal with any sneaky end-users trying to bypass size limitations. 999 - let len_u64: u64 = len.try_into().context("failed to convert `len`")?; 1000 - if len_u64 > config.blob.limit { 1001 - drop(file); 1002 - tokio::fs::remove_file(&filename) 1003 - .await 1004 - .context("failed to remove temp file")?; 621 + while let Some(bytes) = stream.try_next().await? 
{ 622 + len += bytes.len(); 1005 623 624 + if len as u64 > config.blob.limit { 625 + drop(file); 626 + tokio::fs::remove_file(&filename).await?; 1006 627 return Err(Error::with_status( 1007 628 StatusCode::PAYLOAD_TOO_LARGE, 1008 - anyhow!("size above limit and content-length header was wrong"), 629 + anyhow!("actual size exceeds limit"), 1009 630 )); 1010 631 } 1011 632 1012 633 sha.update(&bytes); 1013 - 1014 - file.write_all(&bytes) 1015 - .await 1016 - .context("failed to write blob")?; 634 + file.write_all(&bytes).await?; 1017 635 } 1018 636 637 + // Finalize blob 1019 638 drop(file); 1020 639 let hash = sha.finalize(); 1021 640 1022 641 let cid = Cid::new_v1( 1023 - IPLD_RAW, 1024 - atrium_repo::Multihash::wrap(IPLD_MH_SHA2_256, hash.as_slice()) 1025 - .context("should be valid hash")?, 642 + 0x55, // IPLD RAW 643 + atrium_repo::Multihash::wrap(0x12, hash.as_slice())?, // SHA2-256 1026 644 ); 1027 645 1028 646 let cid_str = cid.to_string(); 1029 647 1030 - tokio::fs::rename(&filename, config.blob.path.join(format!("{cid_str}.blob"))) 1031 - .await 1032 - .context("failed to finalize blob")?; 1033 - 1034 - let did_str = user.did(); 1035 - 1036 - _ = sqlx::query!( 1037 - r#"INSERT INTO blob_ref (cid, did, record) VALUES (?, ?, NULL)"#, 1038 - cid_str, 1039 - did_str 648 + // Move to permanent location 649 + tokio::fs::rename( 650 + &filename, 651 + config.blob.path.join(format!("{}.blob", cid_str)), 1040 652 ) 1041 - .execute(&db) 1042 - .await 1043 - .context("failed to insert blob into database")?; 653 + .await?; 654 + 655 + // Register blob in actor store 656 + let actor_store = ActorStore::new(Arc::new(config)); 657 + actor_store 658 + .transact(user.did(), |txn| { 659 + txn.blob() 660 + .register_blob(cid_str.clone(), mime.clone(), len as u64) 661 + }) 662 + .await?; 1044 663 1045 664 Ok(Json( 1046 665 repo::upload_blob::OutputData { ··· 1060 679 Err(Error::unimplemented(anyhow!("not implemented"))) 1061 680 } 1062 681 1063 - /// These endpoints are part 
of the atproto PDS repository management APIs. \ 1064 - /// Requests usually require authentication (unlike the com.atproto.sync.* endpoints), and are made directly to the user's own PDS instance. 1065 - /// ### Routes 1066 - /// - AP /xrpc/com.atproto.repo.applyWrites -> [`apply_writes`] 1067 - /// - AP /xrpc/com.atproto.repo.createRecord -> [`create_record`] 1068 - /// - AP /xrpc/com.atproto.repo.putRecord -> [`put_record`] 1069 - /// - AP /xrpc/com.atproto.repo.deleteRecord -> [`delete_record`] 1070 - /// - AP /xrpc/com.atproto.repo.uploadBlob -> [`upload_blob`] 1071 - /// - UG /xrpc/com.atproto.repo.describeRepo -> [`describe_repo`] 1072 - /// - UG /xrpc/com.atproto.repo.getRecord -> [`get_record`] 1073 - /// - UG /xrpc/com.atproto.repo.listRecords -> [`list_records`] 1074 - /// - [ ] xx /xrpc/com.atproto.repo.importRepo 1075 - // - [ ] xx /xrpc/com.atproto.repo.listMissingBlobs 682 + /// Register repo endpoints 1076 683 pub(super) fn routes() -> Router<AppState> { 1077 684 Router::new() 1078 685 .route(concat!("/", repo::apply_writes::NSID), post(apply_writes))
+1
src/main.rs
··· 1 1 //! PDS implementation. 2 + mod actor_store; 2 3 mod auth; 3 4 mod config; 4 5 mod did;