this repo has no description
at main 395 lines 12 kB view raw
1use std::collections::HashMap; 2use std::sync::Arc; 3 4use crate::errors::Result; 5use async_trait::async_trait; 6use atproto_identity::model::Document; 7use atproto_identity::storage::DidDocumentStorage; 8use chrono::Utc; 9use sqlx::Row; 10use sqlx::sqlite::SqlitePool; 11 12use super::{Identity, IdentityStorage, Post, PostReference, PostStorage, Storage}; 13 14/// SQLite storage implementation for blog posts and identities. 15#[derive(Debug, Clone)] 16pub struct SqliteStorage { 17 pool: SqlitePool, 18} 19 20impl SqliteStorage { 21 /// Create a new SQLite storage instance with the given connection pool. 22 pub fn new(pool: SqlitePool) -> Self { 23 Self { pool } 24 } 25 26 async fn migrate(&self) -> Result<()> { 27 // Create identities table 28 sqlx::query( 29 r#" 30 CREATE TABLE IF NOT EXISTS identities ( 31 did TEXT PRIMARY KEY, 32 handle TEXT NOT NULL, 33 record JSON NOT NULL, 34 created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, 35 updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP 36 ); 37 "#, 38 ) 39 .execute(&self.pool) 40 .await?; 41 42 // Create indices for identities 43 sqlx::query( 44 r#" 45 CREATE INDEX IF NOT EXISTS idx_identities_handle ON identities(handle); 46 CREATE INDEX IF NOT EXISTS idx_identities_created_at ON identities(created_at); 47 CREATE INDEX IF NOT EXISTS idx_identities_updated_at ON identities(updated_at); 48 "#, 49 ) 50 .execute(&self.pool) 51 .await?; 52 53 // Create posts table 54 sqlx::query( 55 r#" 56 CREATE TABLE IF NOT EXISTS posts ( 57 aturi TEXT PRIMARY KEY, 58 cid TEXT NOT NULL, 59 title TEXT NOT NULL, 60 slug TEXT NOT NULL UNIQUE, 61 content TEXT NOT NULL, 62 record_key TEXT NOT NULL, 63 created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, 64 updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, 65 record JSON NOT NULL 66 ); 67 "#, 68 ) 69 .execute(&self.pool) 70 .await?; 71 72 // Create indices for posts 73 sqlx::query( 74 r#" 75 CREATE INDEX IF NOT EXISTS idx_posts_slug ON posts(slug); 76 CREATE INDEX IF NOT EXISTS idx_posts_record_key ON posts(record_key); 77 CREATE INDEX IF NOT EXISTS idx_posts_created_at ON posts(created_at); 78 CREATE INDEX IF NOT EXISTS idx_posts_updated_at ON posts(updated_at); 79 "#, 80 ) 81 .execute(&self.pool) 82 .await?; 83 84 // Create post_references table 85 sqlx::query( 86 r#" 87 CREATE TABLE IF NOT EXISTS post_references ( 88 aturi TEXT PRIMARY KEY, 89 cid TEXT NOT NULL, 90 did TEXT NOT NULL, 91 collection TEXT NOT NULL, 92 post_aturi TEXT NOT NULL, 93 discovered_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, 94 record JSON NOT NULL 95 ); 96 "#, 97 ) 98 .execute(&self.pool) 99 .await?; 100 101 // Create indices for post_references 102 sqlx::query( 103 r#" 104 CREATE INDEX IF NOT EXISTS idx_post_references_did ON post_references(did); 105 CREATE INDEX IF NOT EXISTS idx_post_references_collection ON post_references(collection); 106 CREATE INDEX IF NOT EXISTS idx_post_references_post_aturi ON post_references(post_aturi); 107 CREATE INDEX IF NOT EXISTS idx_post_references_discovered_at ON post_references(discovered_at); 108 "#, 109 ) 110 .execute(&self.pool) 111 .await?; 112 113 Ok(()) 114 } 115} 116 117#[async_trait] 118impl PostStorage for SqliteStorage { 119 async fn upsert_post(&self, post: &Post) -> Result<()> { 120 sqlx::query( 121 r#" 122 INSERT INTO posts (aturi, cid, title, slug, content, record_key, created_at, updated_at, record) 123 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) 124 ON CONFLICT(aturi) DO UPDATE SET 125 cid = EXCLUDED.cid, 126 title = EXCLUDED.title, 127 slug = EXCLUDED.slug, 128 content = EXCLUDED.content, 129 record_key = EXCLUDED.record_key, 130 updated_at = EXCLUDED.updated_at, 131 record = EXCLUDED.record 132 "#, 133 ) 134 .bind(&post.aturi) 135 .bind(&post.cid) 136 .bind(&post.title) 137 .bind(&post.slug) 138 .bind(&post.content) 139 .bind(&post.record_key) 140 .bind(post.created_at) 141 .bind(post.updated_at) 142 .bind(&post.record) 143 .execute(&self.pool) 144 .await?; 145 146 Ok(()) 147 } 148 149 async fn get_post(&self, aturi: &str) -> Result<Option<Post>> { 150 let row = sqlx::query_as::<_, Post>("SELECT * FROM posts WHERE aturi = $1") 151 .bind(aturi) 152 .fetch_optional(&self.pool) 153 .await?; 154 155 Ok(row) 156 } 157 158 async fn get_posts(&self) -> Result<Vec<Post>> { 159 let rows = sqlx::query_as::<_, Post>("SELECT * FROM posts ORDER BY created_at DESC") 160 .fetch_all(&self.pool) 161 .await?; 162 163 Ok(rows) 164 } 165 166 async fn delete_post(&self, aturi: &str) -> Result<Option<Post>> { 167 let post = self.get_post(aturi).await?; 168 169 if post.is_some() { 170 sqlx::query("DELETE FROM posts WHERE aturi = $1") 171 .bind(aturi) 172 .execute(&self.pool) 173 .await?; 174 } 175 176 Ok(post) 177 } 178 179 async fn upsert_post_reference(&self, post_reference: &PostReference) -> Result<bool> { 180 let existing = sqlx::query("SELECT 1 FROM post_references WHERE aturi = $1") 181 .bind(&post_reference.aturi) 182 .fetch_optional(&self.pool) 183 .await?; 184 185 let is_new = existing.is_none(); 186 187 sqlx::query( 188 r#" 189 INSERT INTO post_references (aturi, cid, did, collection, post_aturi, discovered_at, record) 190 VALUES ($1, $2, $3, $4, $5, $6, $7) 191 ON CONFLICT(aturi) DO UPDATE SET 192 cid = EXCLUDED.cid, 193 did = EXCLUDED.did, 194 collection = EXCLUDED.collection, 195 post_aturi = EXCLUDED.post_aturi, 196 record = EXCLUDED.record 197 "#, 198 ) 199 .bind(&post_reference.aturi) 200 .bind(&post_reference.cid) 201 .bind(&post_reference.did) 202 .bind(&post_reference.collection) 203 .bind(&post_reference.post_aturi) 204 .bind(post_reference.discovered_at) 205 .bind(&post_reference.record) 206 .execute(&self.pool) 207 .await?; 208 209 Ok(is_new) 210 } 211 212 async fn delete_post_reference(&self, aturi: &str) -> Result<()> { 213 sqlx::query("DELETE FROM post_references WHERE aturi = $1") 214 .bind(aturi) 215 .execute(&self.pool) 216 .await?; 217 218 Ok(()) 219 } 220 221 async fn get_post_reference_count(&self, post_aturi: &str) -> Result<HashMap<String, i64>> { 222 let rows = sqlx::query( 223 r#" 224 SELECT collection, COUNT(*) as count 225 FROM post_references 226 WHERE post_aturi = $1 227 GROUP BY collection 228 "#, 229 ) 230 .bind(post_aturi) 231 .fetch_all(&self.pool) 232 .await?; 233 234 let mut count_map = HashMap::new(); 235 for row in rows { 236 let collection: String = row.get("collection"); 237 let count: i64 = row.get("count"); 238 count_map.insert(collection, count); 239 } 240 241 Ok(count_map) 242 } 243 244 async fn get_post_references_for_post(&self, post_aturi: &str) -> Result<Vec<PostReference>> { 245 let rows = sqlx::query_as::<_, PostReference>( 246 "SELECT * FROM post_references WHERE post_aturi = $1 ORDER BY discovered_at DESC", 247 ) 248 .bind(post_aturi) 249 .fetch_all(&self.pool) 250 .await?; 251 252 Ok(rows) 253 } 254 255 async fn get_post_references_for_post_for_collection( 256 &self, 257 post_aturi: &str, 258 collection: &str, 259 ) -> Result<Vec<PostReference>> { 260 let rows = sqlx::query_as::<_, PostReference>( 261 "SELECT * FROM post_references WHERE post_aturi = $1 AND collection = $2 ORDER BY discovered_at DESC", 262 ) 263 .bind(post_aturi) 264 .bind(collection) 265 .fetch_all(&self.pool) 266 .await?; 267 268 Ok(rows) 269 } 270} 271 272#[async_trait] 273impl IdentityStorage for SqliteStorage { 274 async fn upsert_identity(&self, identity: &Identity) -> Result<()> { 275 sqlx::query( 276 r#" 277 INSERT INTO identities (did, handle, record, created_at, updated_at) 278 VALUES ($1, $2, $3, $4, $5) 279 ON CONFLICT(did) DO UPDATE SET 280 handle = EXCLUDED.handle, 281 record = EXCLUDED.record, 282 updated_at = EXCLUDED.updated_at 283 "#, 284 ) 285 .bind(&identity.did) 286 .bind(&identity.handle) 287 .bind(&identity.record) 288 .bind(identity.created_at) 289 .bind(identity.updated_at) 290 .execute(&self.pool) 291 .await?; 292 293 Ok(()) 294 } 295 296 async fn get_identity_by_did(&self, did: &str) -> Result<Option<Identity>> { 297 let row = sqlx::query_as::<_, Identity>("SELECT * FROM identities WHERE did = $1") 298 .bind(did) 299 .fetch_optional(&self.pool) 300 .await?; 301 302 Ok(row) 303 } 304 305 async fn get_identity_by_handle(&self, handle: &str) -> Result<Option<Identity>> { 306 let row = sqlx::query_as::<_, Identity>("SELECT * FROM identities WHERE handle = $1") 307 .bind(handle) 308 .fetch_optional(&self.pool) 309 .await?; 310 311 Ok(row) 312 } 313 314 async fn delete_identity(&self, did: &str) -> Result<Option<Identity>> { 315 let identity = self.get_identity_by_did(did).await?; 316 317 if identity.is_some() { 318 sqlx::query("DELETE FROM identities WHERE did = $1") 319 .bind(did) 320 .execute(&self.pool) 321 .await?; 322 } 323 324 Ok(identity) 325 } 326} 327 328#[async_trait] 329impl Storage for SqliteStorage { 330 async fn migrate(&self) -> Result<()> { 331 self.migrate().await 332 } 333} 334 335/// DID document storage implementation using SQLite. 336pub struct SqliteStorageDidDocumentStorage { 337 storage: Arc<SqliteStorage>, 338} 339 340impl SqliteStorageDidDocumentStorage { 341 /// Create a new DID document storage instance backed by SQLite. 342 pub fn new(storage: Arc<SqliteStorage>) -> Self { 343 Self { storage } 344 } 345} 346 347#[async_trait] 348impl DidDocumentStorage for SqliteStorageDidDocumentStorage { 349 async fn get_document_by_did(&self, did: &str) -> anyhow::Result<Option<Document>> { 350 if let Some(identity) = self 351 .storage 352 .get_identity_by_did(did) 353 .await 354 .map_err(anyhow::Error::new)? 355 { 356 let document: Document = serde_json::from_value(identity.record)?; 357 Ok(Some(document)) 358 } else { 359 Ok(None) 360 } 361 } 362 363 async fn store_document(&self, doc: Document) -> anyhow::Result<()> { 364 let handle = doc 365 .also_known_as 366 .first() 367 .and_then(|aka| aka.strip_prefix("at://")) 368 .unwrap_or("unknown.handle") 369 .to_string(); 370 371 // Create a JSON representation of the document 372 let record = serde_json::json!(doc); 373 374 let identity = Identity { 375 did: doc.id.clone(), 376 handle, 377 record, 378 created_at: Utc::now(), 379 updated_at: Utc::now(), 380 }; 381 382 self.storage 383 .upsert_identity(&identity) 384 .await 385 .map_err(anyhow::Error::new) 386 } 387 388 async fn delete_document_by_did(&self, did: &str) -> anyhow::Result<()> { 389 self.storage 390 .delete_identity(did) 391 .await 392 .map_err(anyhow::Error::new)?; 393 Ok(()) 394 } 395}