Alternative ATProto PDS implementation

prototype actor_store and db

+2
Cargo.lock
··· 1300 1300 "constcat", 1301 1301 "diesel", 1302 1302 "diesel_migrations", 1303 + "dotenvy", 1303 1304 "figment", 1304 1305 "futures", 1305 1306 "hex", ··· 1316 1317 "regex", 1317 1318 "reqwest 0.12.15", 1318 1319 "reqwest-middleware", 1320 + "rocket_sync_db_pools", 1319 1321 "rsky-common", 1320 1322 "rsky-lexicon", 1321 1323 "rsky-pds",
+4
Cargo.toml
··· 251 251 async-trait = "0.1.88" 252 252 lazy_static = "1.5.0" 253 253 secp256k1 = "0.28.2" 254 + dotenvy = "0.15.7" 255 + [dependencies.rocket_sync_db_pools] 256 + version = "=0.1.0" 257 + features = ["diesel_sqlite_pool"]
+6 -4
src/actor_store/actor_store.rs
··· 25 25 use std::{env, fmt}; 26 26 use tokio::sync::RwLock; 27 27 28 - use super::ActorDb; 28 + use crate::db::DbConn; 29 + 29 30 use super::blob::BlobReader; 30 31 use super::preference::PreferenceReader; 31 32 use super::record::RecordReader; ··· 66 67 // Combination of RepoReader/Transactor, BlobReader/Transactor, SqlRepoReader/Transactor 67 68 impl ActorStore { 68 69 /// Concrete reader of an individual repo (hence BlobStoreSql which takes `did` param) 69 - pub fn new(did: String, blobstore: BlobStoreSql, db: ActorDb) -> Self { 70 + pub fn new(did: String, blobstore: BlobStoreSql, db: DbConn) -> Self { 71 + let db = Arc::new(db); 70 72 ActorStore { 71 73 storage: Arc::new(RwLock::new(SqlRepoReader::new( 72 74 did.clone(), ··· 428 430 pub async fn destroy(&mut self) -> Result<()> { 429 431 let did: String = self.did.clone(); 430 432 let storage_guard = self.storage.read().await; 431 - let db: ActorDb = storage_guard.db.clone(); 433 + let db: Arc<DbConn> = storage_guard.db.clone(); 432 434 use rsky_pds::schema::pds::blob::dsl as BlobSchema; 433 435 434 436 let blob_rows: Vec<String> = db ··· 462 464 } 463 465 let did: String = self.did.clone(); 464 466 let storage_guard = self.storage.read().await; 465 - let db: ActorDb = storage_guard.db.clone(); 467 + let db: Arc<DbConn> = storage_guard.db.clone(); 466 468 use rsky_pds::schema::pds::record::dsl as RecordSchema; 467 469 468 470 let cid_strs: Vec<String> = cids.into_iter().map(|c| c.to_string()).collect();
+6 -5
src/actor_store/blob.rs
··· 4 4 //! 5 5 //! Modified for SQLite backend 6 6 7 - use anyhow::{Result, bail}; 7 + use std::sync::Arc; 8 + 9 + use anyhow::{Error, Result, bail}; 8 10 use cidv10::Cid; 9 11 use diesel::dsl::{count_distinct, exists, not}; 10 - use diesel::result::Error; 11 12 use diesel::sql_types::{Integer, Nullable, Text}; 12 13 use diesel::*; 13 14 use futures::stream::{self, StreamExt}; ··· 30 31 use rsky_repo::types::{PreparedBlobRef, PreparedWrite}; 31 32 use sha2::Digest; 32 33 33 - use super::ActorDb; 34 34 use super::sql_blob::BlobStoreSql; 35 + use crate::db::DbConn; 35 36 36 37 pub struct BlobReader { 37 38 pub blobstore: BlobStoreSql, 38 39 pub did: String, 39 - pub db: ActorDb, 40 + pub db: Arc<DbConn>, 40 41 } 41 42 42 43 // Basically handles getting blob records from db 43 44 impl BlobReader { 44 - pub fn new(blobstore: BlobStoreSql, db: ActorDb) -> Self { 45 + pub fn new(blobstore: BlobStoreSql, db: Arc<DbConn>) -> Self { 45 46 // BlobReader { 46 47 // did: blobstore.bucket.clone(), 47 48 // blobstore,
-52
src/actor_store/db.rs
··· 1 - //! Database schema and connection management for the actor store. 2 - 3 - use crate::db::DatabaseConnection; 4 - use anyhow::{Context as _, Result}; 5 - 6 - /// Type alias for the actor database. 7 - pub(crate) type ActorDb = DatabaseConnection; 8 - 9 - /// Gets a database connection for the actor store. 10 - /// 11 - /// # Arguments 12 - /// 13 - /// * `location` - The file path or URI for the SQLite database. 14 - /// * `disable_wal_auto_checkpoint` - Whether to disable the WAL auto-checkpoint. 15 - /// 16 - /// # Returns 17 - /// 18 - /// A `Result` containing the `ActorDb` instance or an error. 19 - pub async fn get_db(location: &str, disable_wal_auto_checkpoint: bool) -> Result<ActorDb> { 20 - let pragmas = if disable_wal_auto_checkpoint { 21 - Some( 22 - &[ 23 - ("wal_autocheckpoint", "0"), 24 - ("journal_mode", "WAL"), 25 - ("synchronous", "NORMAL"), 26 - ("foreign_keys", "ON"), 27 - ][..], 28 - ) 29 - } else { 30 - Some( 31 - &[ 32 - ("journal_mode", "WAL"), 33 - ("synchronous", "NORMAL"), 34 - ("foreign_keys", "ON"), 35 - ][..], 36 - ) 37 - }; 38 - 39 - let db = DatabaseConnection::new(location, pragmas) 40 - .await 41 - .context("Failed to initialize the actor database")?; 42 - 43 - // Ensure WAL mode is properly set up 44 - db.ensure_wal().await?; 45 - 46 - // Run migrations 47 - // TODO: make sure the migrations are populated? 48 - db.run_migrations() 49 - .context("Failed to run migrations on the actor database")?; 50 - 51 - Ok(db) 52 - }
-3
src/actor_store/mod.rs
··· 2 2 3 3 mod actor_store; 4 4 mod blob; 5 - mod db; 6 5 mod preference; 7 6 mod record; 8 7 mod sql_blob; 9 8 mod sql_repo; 10 - 11 - pub(crate) use db::ActorDb;
+5 -3
src/actor_store/preference.rs
··· 4 4 //! 5 5 //! Modified for SQLite backend 6 6 7 + use std::sync::Arc; 8 + 7 9 use anyhow::{Result, bail}; 8 10 use diesel::*; 9 11 use rsky_lexicon::app::bsky::actor::RefPreferences; ··· 12 14 use rsky_pds::auth_verifier::AuthScope; 13 15 use rsky_pds::models::AccountPref; 14 16 15 - use super::ActorDb; 17 + use crate::db::DbConn; 16 18 17 19 pub struct PreferenceReader { 18 20 pub did: String, 19 - pub db: ActorDb, 21 + pub db: Arc<DbConn>, 20 22 } 21 23 22 24 impl PreferenceReader { 23 - pub fn new(did: String, db: ActorDb) -> Self { 25 + pub fn new(did: String, db: Arc<DbConn>) -> Self { 24 26 PreferenceReader { did, db } 25 27 } 26 28
+5 -4
src/actor_store/record.rs
··· 17 17 use rsky_syntax::aturi::AtUri; 18 18 use std::env; 19 19 use std::str::FromStr; 20 + use std::sync::Arc; 20 21 21 - use crate::actor_store::db::ActorDb; 22 + use crate::db::DbConn; 22 23 23 24 /// Combined handler for record operations with both read and write capabilities. 24 25 pub(crate) struct RecordReader { 25 26 /// Database connection. 26 - pub db: ActorDb, 27 + pub db: Arc<DbConn>, 27 28 /// DID of the actor. 28 29 pub did: String, 29 30 } 30 31 31 32 impl RecordReader { 32 33 /// Create a new record handler. 33 - pub(crate) fn new(did: String, db: ActorDb) -> Self { 34 + pub(crate) fn new(did: String, db: Arc<DbConn>) -> Self { 34 35 Self { did, db } 35 36 } 36 37 ··· 292 293 let record_backlinks = get_backlinks(uri, record)?; 293 294 let conflicts: Vec<Vec<Record>> = stream::iter(record_backlinks) 294 295 .then(|backlink| async move { 295 - Ok::<Vec<Record>, Error>( 296 + Ok::<Vec<Record>, anyhow::Error>( 296 297 self.get_record_backlinks( 297 298 uri.get_collection(), 298 299 backlink.path,
+3 -3
src/actor_store/sql_blob.rs
··· 1 - use std::{path::PathBuf, str::FromStr as _}; 1 + use std::{path::PathBuf, str::FromStr as _, sync::Arc}; 2 2 3 3 use anyhow::Result; 4 4 use cidv10::Cid; 5 5 use rsky_common::get_random_str; 6 6 7 - use crate::db::DatabaseConnection; 7 + use crate::db::DbConn; 8 8 9 9 /// Type for stream of blob data 10 10 pub type BlobStream = Box<dyn std::io::Read + Send>; ··· 12 12 /// Placeholder implementation for blob store 13 13 #[derive(Clone)] 14 14 pub(crate) struct BlobStoreSql { 15 - client: DatabaseConnection, 15 + client: Arc<DbConn>, 16 16 path: PathBuf, 17 17 } 18 18
+16 -16
src/actor_store/sql_repo.rs
··· 1 - //! Based on https://github.com/blacksky-algorithms/rsky/blob/main/rsky-pds/src/actor_store/repo/sql_repo.rs 1 + //! Based on https://github.com/blacksky-algorithms/rsky/blob/main/rsky-pds/src/actor_store/record/mod.rs 2 2 //! blacksky-algorithms/rsky is licensed under the Apache License 2.0 3 3 //! 4 4 //! Modified for SQLite backend ··· 25 25 use std::sync::Arc; 26 26 use tokio::sync::RwLock; 27 27 28 - use super::ActorDb; 28 + use crate::db::DbConn; 29 29 30 30 #[derive(Clone, Debug)] 31 31 pub struct SqlRepoReader { 32 32 pub cache: Arc<RwLock<BlockMap>>, 33 - pub db: ActorDb, 33 + pub db: Arc<DbConn>, 34 34 pub root: Option<Cid>, 35 35 pub rev: Option<String>, 36 36 pub now: String, ··· 43 43 cid: &'a Cid, 44 44 ) -> Pin<Box<dyn Future<Output = Result<Option<Vec<u8>>>> + Send + Sync + 'a>> { 45 45 let did: String = self.did.clone(); 46 - let db: ActorDb = self.db.clone(); 46 + let db: Arc<DbConn> = self.db.clone(); 47 47 let cid = cid.clone(); 48 48 49 49 Box::pin(async move { ··· 94 94 cids: Vec<Cid>, 95 95 ) -> Pin<Box<dyn Future<Output = Result<BlocksAndMissing>> + Send + Sync + 'a>> { 96 96 let did: String = self.did.clone(); 97 - let db: ActorDb = self.db.clone(); 97 + let db: Arc<DbConn> = self.db.clone(); 98 98 99 99 Box::pin(async move { 100 100 use rsky_pds::schema::pds::repo_block::dsl as RepoBlockSchema; ··· 189 189 rev: String, 190 190 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + Sync + 'a>> { 191 191 let did: String = self.did.clone(); 192 - let db: ActorDb = self.db.clone(); 192 + let db: Arc<DbConn> = self.db.clone(); 193 193 let bytes_cloned = bytes.clone(); 194 194 Box::pin(async move { 195 195 use rsky_pds::schema::pds::repo_block::dsl as RepoBlockSchema; ··· 220 220 rev: String, 221 221 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + Sync + 'a>> { 222 222 let did: String = self.did.clone(); 223 - let db: ActorDb = self.db.clone(); 223 + let db: Arc<DbConn> = self.db.clone(); 224 224 225 225 Box::pin(async move { 226 
226 use rsky_pds::schema::pds::repo_block::dsl as RepoBlockSchema; ··· 270 270 is_create: Option<bool>, 271 271 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + Sync + 'a>> { 272 272 let did: String = self.did.clone(); 273 - let db: ActorDb = self.db.clone(); 273 + let db: Arc<DbConn> = self.db.clone(); 274 274 let now: String = self.now.clone(); 275 275 276 276 Box::pin(async move { ··· 323 323 324 324 // Basically handles getting ipld blocks from db 325 325 impl SqlRepoReader { 326 - pub fn new(did: String, now: Option<String>, db: ActorDb) -> Self { 326 + pub fn new(did: String, now: Option<String>, db: Arc<DbConn>) -> Self { 327 327 let now = now.unwrap_or_else(rsky_common::now); 328 328 SqlRepoReader { 329 329 cache: Arc::new(RwLock::new(BlockMap::new())), ··· 370 370 cursor: &Option<CidAndRev>, 371 371 ) -> Result<Vec<RepoBlock>> { 372 372 let did: String = self.did.clone(); 373 - let db: ActorDb = self.db.clone(); 373 + let db: Arc<DbConn> = self.db.clone(); 374 374 let since = since.clone(); 375 375 let cursor = cursor.clone(); 376 376 use rsky_pds::schema::pds::repo_block::dsl as RepoBlockSchema; ··· 408 408 409 409 pub async fn count_blocks(&self) -> Result<i64> { 410 410 let did: String = self.did.clone(); 411 - let db: ActorDb = self.db.clone(); 411 + let db: Arc<DbConn> = self.db.clone(); 412 412 use rsky_pds::schema::pds::repo_block::dsl as RepoBlockSchema; 413 413 414 414 let res = db ··· 428 428 /// Proactively cache all blocks from a particular commit (to prevent multiple roundtrips) 429 429 pub async fn cache_rev(&mut self, rev: String) -> Result<()> { 430 430 let did: String = self.did.clone(); 431 - let db: ActorDb = self.db.clone(); 431 + let db: Arc<DbConn> = self.db.clone(); 432 432 use rsky_pds::schema::pds::repo_block::dsl as RepoBlockSchema; 433 433 434 - let res: Vec<(String, Vec<u8>)> = db 434 + let result: Vec<(String, Vec<u8>)> = db 435 435 .run(move |conn| { 436 436 RepoBlockSchema::repo_block 437 437 
.filter(RepoBlockSchema::did.eq(did)) ··· 441 441 .get_results::<(String, Vec<u8>)>(conn) 442 442 }) 443 443 .await?; 444 - for row in res { 444 + for row in result { 445 445 let mut cache_guard = self.cache.write().await; 446 446 cache_guard.set(Cid::from_str(&row.0)?, row.1) 447 447 } ··· 453 453 return Ok(()); 454 454 } 455 455 let did: String = self.did.clone(); 456 - let db: ActorDb = self.db.clone(); 456 + let db: Arc<DbConn> = self.db.clone(); 457 457 use rsky_pds::schema::pds::repo_block::dsl as RepoBlockSchema; 458 458 459 459 let cid_strings: Vec<String> = cids.into_iter().map(|c| c.to_string()).collect(); ··· 469 469 470 470 pub async fn get_root_detailed(&self) -> Result<CidAndRev> { 471 471 let did: String = self.did.clone(); 472 - let db: ActorDb = self.db.clone(); 472 + let db: Arc<DbConn> = self.db.clone(); 473 473 use rsky_pds::schema::pds::repo_root::dsl as RepoRootSchema; 474 474 475 475 let res = db
+19 -188
src/db/mod.rs
··· 1 - use anyhow::{Context, Result}; 2 - use diesel::connection::SimpleConnection; 3 - use diesel::r2d2::{ConnectionManager, Pool, PooledConnection}; 4 - use diesel::sqlite::Sqlite; 5 - use diesel::*; 6 - use diesel_migrations::{EmbeddedMigrations, MigrationHarness, embed_migrations}; 1 + use anyhow::Result; 2 + use diesel::prelude::*; 3 + use dotenvy::dotenv; 4 + use rocket_sync_db_pools::database; 7 5 use std::env; 8 - use std::path::Path; 9 - use std::time::Duration; 6 + use std::fmt::{Debug, Formatter}; 10 7 11 - pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!("migrations"); 12 - pub type SqlitePool = Pool<ConnectionManager<SqliteConnection>>; 13 - pub type SqlitePooledConnection = PooledConnection<ConnectionManager<SqliteConnection>>; 8 + #[database("sqlite_db")] 9 + pub struct DbConn(SqliteConnection); 14 10 15 - /// Database type for all queries 16 - pub type DbType = Sqlite; 11 + impl Debug for DbConn { 12 + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { 13 + f.write_str("DbConn") 14 + } 15 + } 17 16 18 17 #[tracing::instrument(skip_all)] 19 - pub async fn establish_connection_for_sequencer() -> Result<DatabaseConnection> { 18 + pub fn establish_connection_for_sequencer() -> Result<SqliteConnection> { 19 + dotenv().ok(); 20 + tracing::debug!("Establishing database connection for Sequencer"); 20 21 let database_url = env::var("BLUEPDS_DB").unwrap_or("sqlite://data/sqlite.db".into()); 21 - let pragmas = Some( 22 - &[ 23 - ("journal_mode", "WAL"), 24 - ("synchronous", "NORMAL"), 25 - ("foreign_keys", "ON"), 26 - ][..], 27 - ); 28 - let db = DatabaseConnection::new(&database_url, pragmas) 29 - .await 30 - .context("Failed to initialize the actor database")?; 31 - db.ensure_wal().await?; 32 - db.run_migrations() 33 - .context("Failed to run migrations on the actor database")?; 22 + let db = SqliteConnection::establish(&database_url).map_err(|error| { 23 + let context = format!("Error connecting to {database_url:?}"); 24 + 
anyhow::Error::new(error).context(context) 25 + })?; 34 26 Ok(db) 35 27 } 36 - 37 - /// Database connection wrapper 38 - #[derive(Clone, Debug)] 39 - pub struct DatabaseConnection { 40 - pub pool: SqlitePool, 41 - } 42 - 43 - impl DatabaseConnection { 44 - /// Create a new database connection with optional pragmas 45 - pub async fn new(path: &str, pragmas: Option<&[(&str, &str)]>) -> Result<Self> { 46 - // Create the database directory if it doesn't exist 47 - if let Some(parent) = Path::new(path).parent() { 48 - if !parent.exists() { 49 - tokio::fs::create_dir_all(parent) 50 - .await 51 - .context(format!("Failed to create directory: {:?}", parent))?; 52 - } 53 - } 54 - 55 - // Sanitize the path for connection string 56 - let database_url = format!("sqlite:{}", path); 57 - 58 - // Create a connection manager 59 - let manager = ConnectionManager::<SqliteConnection>::new(database_url); 60 - 61 - // Create the connection pool with SQLite-specific configurations 62 - let pool = Pool::builder() 63 - .max_size(10) 64 - .connection_timeout(Duration::from_secs(30)) 65 - .test_on_check_out(true) 66 - .build(manager) 67 - .context("Failed to create connection pool")?; 68 - 69 - // Initialize the database with pragmas 70 - if let Some(pragmas) = pragmas { 71 - let conn = &mut pool.get().context("Failed to get connection from pool")?; 72 - 73 - // Apply all pragmas 74 - for (pragma, value) in pragmas { 75 - let sql = format!("PRAGMA {} = {}", pragma, value); 76 - conn.batch_execute(&sql) 77 - .context(format!("Failed to set pragma {}", pragma))?; 78 - } 79 - } 80 - 81 - let db = DatabaseConnection { pool }; 82 - Ok(db) 83 - } 84 - 85 - /// Run migrations on the database 86 - pub fn run_migrations(&self) -> Result<()> { 87 - let mut conn = self 88 - .pool 89 - .get() 90 - .context("Failed to get connection for migrations")?; 91 - 92 - conn.run_pending_migrations(MIGRATIONS) 93 - .map_err(|e| anyhow::anyhow!("Failed to run migrations: {}", e))?; 94 - 95 - Ok(()) 96 - } 97 - 98 
- /// Ensure WAL mode is enabled 99 - pub async fn ensure_wal(&self) -> Result<()> { 100 - let conn = &mut self.pool.get().context("Failed to get connection")?; 101 - conn.batch_execute("PRAGMA journal_mode = WAL;")?; 102 - conn.batch_execute("PRAGMA synchronous = NORMAL;")?; 103 - conn.batch_execute("PRAGMA foreign_keys = ON;")?; 104 - Ok(()) 105 - } 106 - 107 - /// Execute a database operation with retries for busy errors 108 - pub async fn run<F, T>(&self, operation: F) -> Result<T> 109 - where 110 - F: FnOnce(&mut SqliteConnection) -> QueryResult<T> + Send, 111 - T: Send + 'static, 112 - { 113 - let mut retries = 0; 114 - let max_retries = 5; 115 - let mut last_error = None; 116 - 117 - while retries < max_retries { 118 - let mut conn = self.pool.get().context("Failed to get connection")?; 119 - match operation(&mut conn) { 120 - Ok(result) => return Ok(result), 121 - // TODO: Busy error handling 122 - // Err(diesel::result::Error::DatabaseError( 123 - // diesel::result::DatabaseErrorKind::DatabaseIsLocked, 124 - // _, 125 - // )) => { 126 - // retries += 1; 127 - // let backoff_ms = 10 * (1 << retries); // Exponential backoff 128 - // last_error = Some(diesel::result::Error::DatabaseError( 129 - // diesel::result::DatabaseErrorKind::DatabaseIsLocked, 130 - // Box::new("Database is locked".to_string()), 131 - // )); 132 - // tokio::time::sleep(Duration::from_millis(backoff_ms)).await; 133 - // } 134 - Err(e) => return Err(e.into()), 135 - } 136 - } 137 - 138 - Err(anyhow::anyhow!( 139 - "Max retries exceeded: {}", 140 - last_error.unwrap_or_else(|| result::Error::RollbackTransaction) 141 - )) 142 - } 143 - 144 - /// Check if currently in a transaction 145 - pub fn assert_transaction(&self) -> Result<()> { 146 - // SQLite doesn't have a straightforward way to check transaction state 147 - // We'll implement a simplified version that just returns Ok for now 148 - Ok(()) 149 - } 150 - 151 - /// Run a transaction with retry logic for busy database errors 152 - pub 
async fn transaction<T, F>(&self, f: F) -> Result<T> 153 - where 154 - F: FnOnce(&mut SqliteConnection) -> Result<T> + Send, 155 - T: Send + 'static, 156 - { 157 - self.run(|conn| { 158 - conn.transaction(|tx| f(tx).map_err(|e| result::Error::RollbackTransaction)) 159 - }) 160 - .await 161 - } 162 - 163 - /// Run a transaction with no retry logic 164 - pub async fn transaction_no_retry<T, F>(&self, f: F) -> Result<T> 165 - where 166 - F: FnOnce(&mut SqliteConnection) -> std::result::Result<T, result::Error> + Send, 167 - T: Send + 'static, 168 - { 169 - let mut conn = self 170 - .pool 171 - .get() 172 - .context("Failed to get connection for transaction")?; 173 - 174 - conn.transaction(|tx| f(tx)) 175 - .map_err(|e| anyhow::anyhow!("Transaction error: {:?}", e)) 176 - } 177 - } 178 - 179 - /// Create a connection pool for SQLite 180 - pub async fn create_sqlite_pool(database_url: &str) -> Result<SqlitePool> { 181 - let manager = ConnectionManager::<SqliteConnection>::new(database_url); 182 - let pool = Pool::builder() 183 - .max_size(10) 184 - .connection_timeout(Duration::from_secs(30)) 185 - .test_on_check_out(true) 186 - .build(manager) 187 - .context("Failed to create connection pool")?; 188 - 189 - // Apply recommended SQLite settings 190 - let conn = &mut pool.get()?; 191 - conn.batch_execute( 192 - "PRAGMA journal_mode = WAL; PRAGMA synchronous = NORMAL; PRAGMA foreign_keys = ON;", 193 - )?; 194 - 195 - Ok(pool) 196 - }