use std::collections::HashMap; use std::sync::Arc; use crate::errors::Result; use async_trait::async_trait; use atproto_identity::model::Document; use atproto_identity::storage::DidDocumentStorage; use chrono::Utc; use sqlx::Row; use sqlx::sqlite::SqlitePool; use super::{Identity, IdentityStorage, Post, PostReference, PostStorage, Storage}; /// SQLite storage implementation for blog posts and identities. #[derive(Debug, Clone)] pub struct SqliteStorage { pool: SqlitePool, } impl SqliteStorage { /// Create a new SQLite storage instance with the given connection pool. pub fn new(pool: SqlitePool) -> Self { Self { pool } } async fn migrate(&self) -> Result<()> { // Create identities table sqlx::query( r#" CREATE TABLE IF NOT EXISTS identities ( did TEXT PRIMARY KEY, handle TEXT NOT NULL, record JSON NOT NULL, created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ); "#, ) .execute(&self.pool) .await?; // Create indices for identities sqlx::query( r#" CREATE INDEX IF NOT EXISTS idx_identities_handle ON identities(handle); CREATE INDEX IF NOT EXISTS idx_identities_created_at ON identities(created_at); CREATE INDEX IF NOT EXISTS idx_identities_updated_at ON identities(updated_at); "#, ) .execute(&self.pool) .await?; // Create posts table sqlx::query( r#" CREATE TABLE IF NOT EXISTS posts ( aturi TEXT PRIMARY KEY, cid TEXT NOT NULL, title TEXT NOT NULL, slug TEXT NOT NULL UNIQUE, content TEXT NOT NULL, record_key TEXT NOT NULL, created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, record JSON NOT NULL ); "#, ) .execute(&self.pool) .await?; // Create indices for posts sqlx::query( r#" CREATE INDEX IF NOT EXISTS idx_posts_slug ON posts(slug); CREATE INDEX IF NOT EXISTS idx_posts_record_key ON posts(record_key); CREATE INDEX IF NOT EXISTS idx_posts_created_at ON posts(created_at); CREATE INDEX IF NOT EXISTS idx_posts_updated_at ON posts(updated_at); "#, ) .execute(&self.pool) .await?; // Create post_references table sqlx::query( r#" CREATE TABLE IF NOT EXISTS post_references ( aturi TEXT PRIMARY KEY, cid TEXT NOT NULL, did TEXT NOT NULL, collection TEXT NOT NULL, post_aturi TEXT NOT NULL, discovered_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, record JSON NOT NULL ); "#, ) .execute(&self.pool) .await?; // Create indices for post_references sqlx::query( r#" CREATE INDEX IF NOT EXISTS idx_post_references_did ON post_references(did); CREATE INDEX IF NOT EXISTS idx_post_references_collection ON post_references(collection); CREATE INDEX IF NOT EXISTS idx_post_references_post_aturi ON post_references(post_aturi); CREATE INDEX IF NOT EXISTS idx_post_references_discovered_at ON post_references(discovered_at); "#, ) .execute(&self.pool) .await?; Ok(()) } } #[async_trait] impl PostStorage for SqliteStorage { async fn upsert_post(&self, post: &Post) -> Result<()> { sqlx::query( r#" INSERT INTO posts (aturi, cid, title, slug, content, record_key, created_at, updated_at, record) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT(aturi) DO UPDATE SET cid = EXCLUDED.cid, title = EXCLUDED.title, slug = EXCLUDED.slug, content = EXCLUDED.content, record_key = EXCLUDED.record_key, updated_at = EXCLUDED.updated_at, record = EXCLUDED.record "#, ) .bind(&post.aturi) .bind(&post.cid) .bind(&post.title) .bind(&post.slug) .bind(&post.content) .bind(&post.record_key) .bind(post.created_at) .bind(post.updated_at) .bind(&post.record) .execute(&self.pool) .await?; Ok(()) } async fn get_post(&self, aturi: &str) -> Result> { let row = sqlx::query_as::<_, Post>("SELECT * FROM posts WHERE aturi = $1") .bind(aturi) .fetch_optional(&self.pool) .await?; Ok(row) } async fn get_posts(&self) -> Result> { let rows = sqlx::query_as::<_, Post>("SELECT * FROM posts ORDER BY created_at DESC") .fetch_all(&self.pool) .await?; Ok(rows) } async fn delete_post(&self, aturi: &str) -> Result> { let post = self.get_post(aturi).await?; if post.is_some() { sqlx::query("DELETE FROM posts WHERE aturi = $1") .bind(aturi) .execute(&self.pool) .await?; } Ok(post) } async fn upsert_post_reference(&self, post_reference: &PostReference) -> Result { let existing = sqlx::query("SELECT 1 FROM post_references WHERE aturi = $1") .bind(&post_reference.aturi) .fetch_optional(&self.pool) .await?; let is_new = existing.is_none(); sqlx::query( r#" INSERT INTO post_references (aturi, cid, did, collection, post_aturi, discovered_at, record) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT(aturi) DO UPDATE SET cid = EXCLUDED.cid, did = EXCLUDED.did, collection = EXCLUDED.collection, post_aturi = EXCLUDED.post_aturi, record = EXCLUDED.record "#, ) .bind(&post_reference.aturi) .bind(&post_reference.cid) .bind(&post_reference.did) .bind(&post_reference.collection) .bind(&post_reference.post_aturi) .bind(post_reference.discovered_at) .bind(&post_reference.record) .execute(&self.pool) .await?; Ok(is_new) } async fn delete_post_reference(&self, aturi: &str) -> Result<()> { sqlx::query("DELETE FROM post_references WHERE aturi = $1") .bind(aturi) .execute(&self.pool) .await?; Ok(()) } async fn get_post_reference_count(&self, post_aturi: &str) -> Result> { let rows = sqlx::query( r#" SELECT collection, COUNT(*) as count FROM post_references WHERE post_aturi = $1 GROUP BY collection "#, ) .bind(post_aturi) .fetch_all(&self.pool) .await?; let mut count_map = HashMap::new(); for row in rows { let collection: String = row.get("collection"); let count: i64 = row.get("count"); count_map.insert(collection, count); } Ok(count_map) } async fn get_post_references_for_post(&self, post_aturi: &str) -> Result> { let rows = sqlx::query_as::<_, PostReference>( "SELECT * FROM post_references WHERE post_aturi = $1 ORDER BY discovered_at DESC", ) .bind(post_aturi) .fetch_all(&self.pool) .await?; Ok(rows) } async fn get_post_references_for_post_for_collection( &self, post_aturi: &str, collection: &str, ) -> Result> { let rows = sqlx::query_as::<_, PostReference>( "SELECT * FROM post_references WHERE post_aturi = $1 AND collection = $2 ORDER BY discovered_at DESC", ) .bind(post_aturi) .bind(collection) .fetch_all(&self.pool) .await?; Ok(rows) } } #[async_trait] impl IdentityStorage for SqliteStorage { async fn upsert_identity(&self, identity: &Identity) -> Result<()> { sqlx::query( r#" INSERT INTO identities (did, handle, record, created_at, updated_at) VALUES ($1, $2, $3, $4, $5) ON CONFLICT(did) DO UPDATE SET handle = EXCLUDED.handle, record = EXCLUDED.record, updated_at = EXCLUDED.updated_at "#, ) .bind(&identity.did) .bind(&identity.handle) .bind(&identity.record) .bind(identity.created_at) .bind(identity.updated_at) .execute(&self.pool) .await?; Ok(()) } async fn get_identity_by_did(&self, did: &str) -> Result> { let row = sqlx::query_as::<_, Identity>("SELECT * FROM identities WHERE did = $1") .bind(did) .fetch_optional(&self.pool) .await?; Ok(row) } async fn get_identity_by_handle(&self, handle: &str) -> Result> { let row = sqlx::query_as::<_, Identity>("SELECT * FROM identities WHERE handle = $1") .bind(handle) .fetch_optional(&self.pool) .await?; Ok(row) } async fn delete_identity(&self, did: &str) -> Result> { let identity = self.get_identity_by_did(did).await?; if identity.is_some() { sqlx::query("DELETE FROM identities WHERE did = $1") .bind(did) .execute(&self.pool) .await?; } Ok(identity) } } #[async_trait] impl Storage for SqliteStorage { async fn migrate(&self) -> Result<()> { self.migrate().await } } /// DID document storage implementation using SQLite. pub struct SqliteStorageDidDocumentStorage { storage: Arc, } impl SqliteStorageDidDocumentStorage { /// Create a new DID document storage instance backed by SQLite. pub fn new(storage: Arc) -> Self { Self { storage } } } #[async_trait] impl DidDocumentStorage for SqliteStorageDidDocumentStorage { async fn get_document_by_did(&self, did: &str) -> anyhow::Result> { if let Some(identity) = self .storage .get_identity_by_did(did) .await .map_err(anyhow::Error::new)? { let document: Document = serde_json::from_value(identity.record)?; Ok(Some(document)) } else { Ok(None) } } async fn store_document(&self, doc: Document) -> anyhow::Result<()> { let handle = doc .also_known_as .first() .and_then(|aka| aka.strip_prefix("at://")) .unwrap_or("unknown.handle") .to_string(); // Create a JSON representation of the document let record = serde_json::json!(doc); let identity = Identity { did: doc.id.clone(), handle, record, created_at: Utc::now(), updated_at: Utc::now(), }; self.storage .upsert_identity(&identity) .await .map_err(anyhow::Error::new) } async fn delete_document_by_did(&self, did: &str) -> anyhow::Result<()> { self.storage .delete_identity(did) .await .map_err(anyhow::Error::new)?; Ok(()) } }