Alternative ATProto PDS implementation

prototype actor_store

Changed files
+132 -97
src
actor_store
+100 -6
src/actor_store/db/migrations.rs
··· 1 1 //! Database migrations for the actor store. 2 - use anyhow::Result; 2 + use anyhow::{Context as _, Result}; 3 + use sqlx::SqlitePool; 3 4 4 5 use crate::actor_store::db::ActorDb; 5 6 ··· 17 18 pub(crate) fn new(db: ActorDb) -> Self { 18 19 Self { 19 20 db, 20 - migrations: vec![init_migration], 21 + migrations: vec![_001_init], 21 22 } 22 23 } 23 24 24 25 /// Run all migrations 25 26 pub(crate) async fn migrate_to_latest(&self) -> Result<()> { 26 - // We could track which migrations have been run 27 - // For simplicity, we just run them all for now 27 + let past_migrations = sqlx::query!("SELECT name FROM migration") 28 + .fetch_all(&self.db.db) 29 + .await?; 30 + let mut past_migration_names = past_migrations 31 + .iter() 32 + .map(|m| m.name.clone()) 33 + .collect::<Vec<_>>(); 34 + past_migration_names.sort(); 28 35 for migration in &self.migrations { 29 - migration(&self.db)?; 36 + let name = format!("{:p}", migration); 37 + if !past_migration_names.contains(&name) { 38 + migration(&self.db)?; 39 + sqlx::query("INSERT INTO migration (name, appliedAt) VALUES (?, ?)") 40 + .bind(name) 41 + .bind(chrono::Utc::now().to_rfc3339()) 42 + .execute(&self.db.db) 43 + .await?; 44 + } 30 45 } 31 46 Ok(()) 32 47 } ··· 40 55 } 41 56 42 57 /// Initial migration to create tables 43 - fn init_migration(db: &ActorDb) -> Result<()> { 58 + fn _001_init(db: &ActorDb) -> Result<()> { 44 59 tokio::task::block_in_place(|| { 45 60 tokio::runtime::Handle::current() 46 61 .block_on(async { crate::actor_store::db::create_tables(&db.db).await }) 47 62 }) 48 63 } 64 + 65 + /// Create the initial database tables 66 + pub(crate) async fn create_tables(db: &SqlitePool) -> Result<()> { 67 + sqlx::query( 68 + " 69 + CREATE TABLE IF NOT EXISTS repo_root ( 70 + did TEXT PRIMARY KEY NOT NULL, 71 + cid TEXT NOT NULL, 72 + rev TEXT NOT NULL, 73 + indexedAt TEXT NOT NULL 74 + ); 75 + 76 + CREATE TABLE IF NOT EXISTS repo_block ( 77 + cid TEXT PRIMARY KEY NOT NULL, 78 + repoRev TEXT NOT NULL, 79 + size INTEGER NOT NULL, 80 + content BLOB NOT NULL 81 + ); 82 + 83 + CREATE TABLE IF NOT EXISTS record ( 84 + uri TEXT PRIMARY KEY NOT NULL, 85 + cid TEXT NOT NULL, 86 + collection TEXT NOT NULL, 87 + rkey TEXT NOT NULL, 88 + repoRev TEXT NOT NULL, 89 + indexedAt TEXT NOT NULL, 90 + takedownRef TEXT 91 + ); 92 + 93 + CREATE TABLE IF NOT EXISTS blob ( 94 + cid TEXT PRIMARY KEY NOT NULL, 95 + mimeType TEXT NOT NULL, 96 + size INTEGER NOT NULL, 97 + tempKey TEXT, 98 + width INTEGER, 99 + height INTEGER, 100 + createdAt TEXT NOT NULL, 101 + takedownRef TEXT 102 + ); 103 + 104 + CREATE TABLE IF NOT EXISTS record_blob ( 105 + blobCid TEXT NOT NULL, 106 + recordUri TEXT NOT NULL, 107 + PRIMARY KEY (blobCid, recordUri) 108 + ); 109 + 110 + CREATE TABLE IF NOT EXISTS backlink ( 111 + uri TEXT NOT NULL, 112 + path TEXT NOT NULL, 113 + linkTo TEXT NOT NULL, 114 + PRIMARY KEY (uri, path) 115 + ); 116 + 117 + CREATE TABLE IF NOT EXISTS account_pref ( 118 + id INTEGER PRIMARY KEY AUTOINCREMENT, 119 + name TEXT NOT NULL, 120 + valueJson TEXT NOT NULL 121 + ); 122 + 123 + CREATE TABLE IF NOT EXISTS migration ( 124 + id INTEGER PRIMARY KEY AUTOINCREMENT, 125 + name TEXT NOT NULL, 126 + appliedAt TEXT NOT NULL 127 + ); 128 + 129 + CREATE INDEX IF NOT EXISTS idx_repo_block_repo_rev ON repo_block(repoRev, cid); 130 + CREATE INDEX IF NOT EXISTS idx_record_cid ON record(cid); 131 + CREATE INDEX IF NOT EXISTS idx_record_collection ON record(collection); 132 + CREATE INDEX IF NOT EXISTS idx_record_repo_rev ON record(repoRev); 133 + CREATE INDEX IF NOT EXISTS idx_blob_tempkey ON blob(tempKey); 134 + CREATE INDEX IF NOT EXISTS idx_backlink_link_to ON backlink(path, linkTo); 135 + ", 136 + ) 137 + .execute(db) 138 + .await 139 + .context("failed to create tables")?; 140 + 141 + Ok(()) 142 + }
-75
src/actor_store/db/mod.rs
··· 139 139 Ok(ActorDb::new(pool)) 140 140 } 141 141 142 - /// Create the initial database tables 143 - pub(crate) async fn create_tables(db: &SqlitePool) -> Result<()> { 144 - sqlx::query( 145 - " 146 - CREATE TABLE IF NOT EXISTS repo_root ( 147 - did TEXT PRIMARY KEY NOT NULL, 148 - cid TEXT NOT NULL, 149 - rev TEXT NOT NULL, 150 - indexedAt TEXT NOT NULL 151 - ); 152 - 153 - CREATE TABLE IF NOT EXISTS repo_block ( 154 - cid TEXT PRIMARY KEY NOT NULL, 155 - repoRev TEXT NOT NULL, 156 - size INTEGER NOT NULL, 157 - content BLOB NOT NULL 158 - ); 159 - 160 - CREATE TABLE IF NOT EXISTS record ( 161 - uri TEXT PRIMARY KEY NOT NULL, 162 - cid TEXT NOT NULL, 163 - collection TEXT NOT NULL, 164 - rkey TEXT NOT NULL, 165 - repoRev TEXT NOT NULL, 166 - indexedAt TEXT NOT NULL, 167 - takedownRef TEXT 168 - ); 169 - 170 - CREATE TABLE IF NOT EXISTS blob ( 171 - cid TEXT PRIMARY KEY NOT NULL, 172 - mimeType TEXT NOT NULL, 173 - size INTEGER NOT NULL, 174 - tempKey TEXT, 175 - width INTEGER, 176 - height INTEGER, 177 - createdAt TEXT NOT NULL, 178 - takedownRef TEXT 179 - ); 180 - 181 - CREATE TABLE IF NOT EXISTS record_blob ( 182 - blobCid TEXT NOT NULL, 183 - recordUri TEXT NOT NULL, 184 - PRIMARY KEY (blobCid, recordUri) 185 - ); 186 - 187 - CREATE TABLE IF NOT EXISTS backlink ( 188 - uri TEXT NOT NULL, 189 - path TEXT NOT NULL, 190 - linkTo TEXT NOT NULL, 191 - PRIMARY KEY (uri, path) 192 - ); 193 - 194 - CREATE TABLE IF NOT EXISTS account_pref ( 195 - id INTEGER PRIMARY KEY AUTOINCREMENT, 196 - name TEXT NOT NULL, 197 - valueJson TEXT NOT NULL 198 - ); 199 - 200 - CREATE INDEX IF NOT EXISTS idx_repo_block_repo_rev ON repo_block(repoRev, cid); 201 - CREATE INDEX IF NOT EXISTS idx_record_cid ON record(cid); 202 - CREATE INDEX IF NOT EXISTS idx_record_collection ON record(collection); 203 - CREATE INDEX IF NOT EXISTS idx_record_repo_rev ON record(repoRev); 204 - CREATE INDEX IF NOT EXISTS idx_blob_tempkey ON blob(tempKey); 205 - CREATE INDEX IF NOT EXISTS idx_backlink_link_to ON backlink(path, linkTo); 206 - ", 207 - ) 208 - .execute(db) 209 - .await 210 - .context("failed to create tables")?; 211 - 212 - Ok(()) 213 - } 214 - 215 142 /// Get a migrator for the database. 216 143 pub(crate) fn get_migrator(db: ActorDb) -> migrations::Migrator { 217 144 migrations::Migrator::new(db) ··· 234 161 format!("{} IS NULL", field_ref.into()) 235 162 } 236 163 } 237 - 238 - // Re-export commonly used types
+14 -7
src/actor_store/db/schema.rs
··· 1 1 //! Database schema definitions for the actor store. 2 2 3 3 /// Repository root information 4 - #[derive(Debug, Clone)] 4 + #[derive(Debug, Clone, sqlx::FromRow)] 5 + #[sqlx(rename_all = "camelCase")] 5 6 pub(crate) struct RepoRoot { 6 7 pub(crate) did: String, 7 8 pub(crate) cid: String, ··· 12 13 pub(crate) const REPO_ROOT_TABLE: &str = "repo_root"; 13 14 14 15 /// Repository block (IPLD block) 15 - #[derive(Debug, Clone)] 16 + #[derive(Debug, Clone, sqlx::FromRow)] 17 + #[sqlx(rename_all = "camelCase")] 16 18 pub(crate) struct RepoBlock { 17 19 pub(crate) cid: String, 18 20 pub(crate) repo_rev: String, ··· 23 25 pub(crate) const REPO_BLOCK_TABLE: &str = "repo_block"; 24 26 25 27 /// Record information 26 - #[derive(Debug, Clone)] 28 + #[derive(Debug, Clone, sqlx::FromRow)] 29 + #[sqlx(rename_all = "camelCase")] 27 30 pub(crate) struct Record { 28 31 pub(crate) uri: String, 29 32 pub(crate) cid: String, ··· 38 41 pub(crate) const RECORD_TABLE: &str = "record"; 39 42 40 43 /// Blob information 41 - #[derive(Debug, Clone)] 44 + #[derive(Debug, Clone, sqlx::FromRow)] 45 + #[sqlx(rename_all = "camelCase")] 42 46 pub(crate) struct Blob { 43 47 pub(crate) cid: String, 44 48 pub(crate) mime_type: String, ··· 54 58 pub(crate) const BLOB_TABLE: &str = "blob"; 55 59 56 60 /// Record-blob association 57 - #[derive(Debug, Clone)] 61 + #[derive(Debug, Clone, sqlx::FromRow)] 62 + #[sqlx(rename_all = "camelCase")] 58 63 pub(crate) struct RecordBlob { 59 64 pub(crate) blob_cid: String, 60 65 pub(crate) record_uri: String, ··· 63 68 pub(crate) const RECORD_BLOB_TABLE: &str = "record_blob"; 64 69 65 70 /// Backlink between records 66 - #[derive(Debug, Clone)] 71 + #[derive(Debug, Clone, sqlx::FromRow)] 72 + #[sqlx(rename_all = "camelCase")] 67 73 pub(crate) struct Backlink { 68 74 pub(crate) uri: String, 69 75 pub(crate) path: String, ··· 73 79 pub(crate) const BACKLINK_TABLE: &str = "backlink"; 74 80 75 81 /// Account preference 76 - #[derive(Debug, Clone)] 82 + #[derive(Debug, Clone, sqlx::FromRow)] 83 + #[sqlx(rename_all = "camelCase")] 77 84 pub(crate) struct AccountPref { 78 85 pub(crate) id: i64, 79 86 pub(crate) name: String,
+18 -9
src/actor_store/preference/reader.rs
··· 7 7 pub(crate) struct PreferenceReader { 8 8 /// Database connection. 9 9 pub db: SqlitePool, 10 - /// DID of the repository owner. 11 - pub did: String, 12 10 } 13 11 14 12 /// User preference with type information. 15 - #[derive(Debug, Clone)] 13 + #[derive(Debug, Clone, serde::Deserialize)] 16 14 pub(crate) struct AccountPreference { 17 15 /// Type of the preference. 18 16 pub r#type: String, ··· 22 20 23 21 impl PreferenceReader { 24 22 /// Create a new preference reader. 25 - pub(crate) fn new(db: SqlitePool, did: String) -> Self { 26 - Self { db, did } 23 + pub(crate) fn new(db: SqlitePool) -> Self { 24 + Self { db } 27 25 } 28 26 29 27 /// Get preferences for a namespace. 30 28 pub(crate) async fn get_preferences( 31 29 &self, 32 - namespace: &str, 30 + namespace: Option<&str>, 33 31 scope: &str, 34 32 ) -> Result<Vec<AccountPreference>> { 35 - // TODO: Implement preference reader 36 - // For now, just return an empty list 37 - Ok(Vec::new()) 33 + let prefs_res = sqlx::query!("SELECT * FROM account_pref ORDER BY id") 34 + .fetch_all(&self.db) 35 + .await?; 36 + 37 + let prefs = prefs_res 38 + .into_iter() 39 + .filter(|pref| { 40 + namespace.map_or(true, |ns| pref_match_namespace(ns, &pref.name)) 41 + && pref_in_scope(scope, &pref.name) 42 + }) 43 + .map(|pref| serde_json::from_str(&pref.valueJson).unwrap()) 44 + .collect(); 45 + 46 + Ok(prefs) 38 47 } 39 48 } 40 49