use std::sync::Arc; use crate::errors::Result; use async_trait::async_trait; use atproto_identity::model::Document; use atproto_identity::storage::DidDocumentStorage; use chrono::Utc; use sqlx::sqlite::SqlitePool; use super::{Award, AwardWithBadge, Badge, Identity, Storage}; /// SQLite storage implementation for badges and awards. #[derive(Debug, Clone)] pub struct SqliteStorage { pool: SqlitePool, } impl SqliteStorage { /// Create a new SQLite storage instance with the given connection pool. pub fn new(pool: SqlitePool) -> Self { Self { pool } } async fn migrate(&self) -> Result<()> { sqlx::query( r#" CREATE TABLE IF NOT EXISTS identities ( did TEXT PRIMARY KEY, handle TEXT NOT NULL, record JSON NOT NULL, created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ); "#, ) .execute(&self.pool) .await?; sqlx::query( r#" CREATE INDEX IF NOT EXISTS idx_identities_handle ON identities(handle); CREATE INDEX IF NOT EXISTS idx_identities_created_at ON identities(created_at); CREATE INDEX IF NOT EXISTS idx_identities_updated_at ON identities(updated_at); "#, ) .execute(&self.pool) .await?; sqlx::query( r#" CREATE TABLE IF NOT EXISTS awards ( aturi TEXT PRIMARY KEY, cid TEXT NOT NULL, did TEXT NOT NULL, badge TEXT NOT NULL, badge_cid TEXT NOT NULL, badge_name TEXT NOT NULL, validated_issuers JSON NOT NULL, created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, record JSON NOT NULL ); "#, ) .execute(&self.pool) .await?; sqlx::query( r#" CREATE INDEX IF NOT EXISTS idx_awards_did ON awards(did); CREATE INDEX IF NOT EXISTS idx_awards_badge ON awards(badge); CREATE INDEX IF NOT EXISTS idx_awards_badge_cid ON awards(badge_cid); CREATE INDEX IF NOT EXISTS idx_awards_created_at ON awards(created_at); CREATE INDEX IF NOT EXISTS idx_awards_updated_at ON awards(updated_at); "#, ) .execute(&self.pool) .await?; sqlx::query( r#" CREATE TABLE IF NOT EXISTS badges ( aturi TEXT NOT NULL, cid TEXT NOT NULL, name TEXT NOT NULL, image TEXT, created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, count INTEGER NOT NULL DEFAULT 0, record JSON NOT NULL DEFAULT '{}', PRIMARY KEY (aturi, cid) ); "#, ) .execute(&self.pool) .await?; // Add record column to existing badges table if it doesn't exist sqlx::query( r#" ALTER TABLE badges ADD COLUMN record JSON NOT NULL DEFAULT '{}'; "#, ) .execute(&self.pool) .await .ok(); // Ignore error if column already exists sqlx::query( r#" CREATE INDEX IF NOT EXISTS idx_badges_aturi ON badges(aturi); CREATE INDEX IF NOT EXISTS idx_badges_cid ON badges(cid); CREATE INDEX IF NOT EXISTS idx_badges_created_at ON badges(created_at); CREATE INDEX IF NOT EXISTS idx_badges_updated_at ON badges(updated_at); CREATE INDEX IF NOT EXISTS idx_badges_record ON badges(json_extract(record, '$.name')); "#, ) .execute(&self.pool) .await?; Ok(()) } async fn upsert_identity(&self, identity: &Identity) -> Result<()> { sqlx::query( r#" INSERT INTO identities (did, handle, record, created_at, updated_at) VALUES ($1, $2, $3, $4, $5) ON CONFLICT(did) DO UPDATE SET handle = EXCLUDED.handle, record = EXCLUDED.record, updated_at = EXCLUDED.updated_at "#, ) .bind(&identity.did) .bind(&identity.handle) .bind(&identity.record) .bind(identity.created_at) .bind(identity.updated_at) .execute(&self.pool) .await?; Ok(()) } async fn get_identity_by_did(&self, did: &str) -> Result> { let row = sqlx::query_as::<_, Identity>("SELECT * FROM identities WHERE did = $1") .bind(did) .fetch_optional(&self.pool) .await?; Ok(row) } async fn get_identity_by_handle(&self, handle: &str) -> Result> { let row = sqlx::query_as::<_, Identity>("SELECT * FROM identities WHERE handle = $1") .bind(handle) .fetch_optional(&self.pool) .await?; Ok(row) } async fn upsert_badge(&self, badge: &Badge) -> Result<()> { sqlx::query( r#" INSERT INTO badges (aturi, cid, name, image, created_at, updated_at, count, record) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT(aturi, cid) DO UPDATE SET name = EXCLUDED.name, image = EXCLUDED.image, updated_at = EXCLUDED.updated_at, record = EXCLUDED.record "#, ) .bind(&badge.aturi) .bind(&badge.cid) .bind(&badge.name) .bind(&badge.image) .bind(badge.created_at) .bind(badge.updated_at) .bind(badge.count) .bind(&badge.record) .execute(&self.pool) .await?; Ok(()) } async fn get_badge(&self, aturi: &str, cid: &str) -> Result> { let row = sqlx::query_as::<_, Badge>("SELECT * FROM badges WHERE aturi = $1 AND cid = $2") .bind(aturi) .bind(cid) .fetch_optional(&self.pool) .await?; Ok(row) } async fn increment_badge_count(&self, aturi: &str, cid: &str) -> Result<()> { sqlx::query( r#" UPDATE badges SET count = count + 1, updated_at = CURRENT_TIMESTAMP WHERE aturi = $1 AND cid = $2 "#, ) .bind(aturi) .bind(cid) .execute(&self.pool) .await?; Ok(()) } async fn decrement_badge_count(&self, aturi: &str, cid: &str) -> Result<()> { sqlx::query( r#" UPDATE badges SET count = GREATEST(0, count - 1), updated_at = CURRENT_TIMESTAMP WHERE aturi = $1 AND cid = $2 "#, ) .bind(aturi) .bind(cid) .execute(&self.pool) .await?; Ok(()) } async fn upsert_award(&self, award: &Award) -> Result { let existing = self.get_award(&award.aturi).await?; let is_new = existing.is_none(); sqlx::query( r#" INSERT INTO awards (aturi, cid, did, badge, badge_cid, badge_name, validated_issuers, created_at, updated_at, record) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) ON CONFLICT(aturi) DO UPDATE SET cid = EXCLUDED.cid, badge = EXCLUDED.badge, badge_cid = EXCLUDED.badge_cid, badge_name = EXCLUDED.badge_name, validated_issuers = EXCLUDED.validated_issuers, updated_at = EXCLUDED.updated_at, record = EXCLUDED.record "#, ) .bind(&award.aturi) .bind(&award.cid) .bind(&award.did) .bind(&award.badge) .bind(&award.badge_cid) .bind(&award.badge_name) .bind(&award.validated_issuers) .bind(award.created_at) .bind(award.updated_at) .bind(&award.record) .execute(&self.pool) .await?; Ok(is_new) } async fn get_award(&self, aturi: &str) -> Result> { let row = sqlx::query_as::<_, Award>("SELECT * FROM awards WHERE aturi = $1") .bind(aturi) .fetch_optional(&self.pool) .await?; Ok(row) } async fn delete_award(&self, aturi: &str) -> Result> { let award = self.get_award(aturi).await?; if award.is_some() { sqlx::query("DELETE FROM awards WHERE aturi = $1") .bind(aturi) .execute(&self.pool) .await?; } Ok(award) } async fn trim_awards_for_did(&self, did: &str, max_count: i64) -> Result<()> { sqlx::query( r#" DELETE FROM awards WHERE aturi IN ( SELECT aturi FROM awards WHERE did = $1 ORDER BY updated_at DESC LIMIT -1 OFFSET $2 ) "#, ) .bind(did) .bind(max_count) .execute(&self.pool) .await?; Ok(()) } async fn get_recent_awards(&self, limit: i64) -> Result> { let awards = sqlx::query_as::<_, Award>("SELECT * FROM awards ORDER BY updated_at DESC LIMIT $1") .bind(limit) .fetch_all(&self.pool) .await?; self.enrich_awards_with_details(awards).await } async fn get_awards_for_did(&self, did: &str, limit: i64) -> Result> { let awards = sqlx::query_as::<_, Award>( "SELECT * FROM awards WHERE did = $1 ORDER BY updated_at DESC LIMIT $2", ) .bind(did) .bind(limit) .fetch_all(&self.pool) .await?; self.enrich_awards_with_details(awards).await } async fn enrich_awards_with_details(&self, awards: Vec) -> Result> { let mut result = Vec::new(); for award in awards { let badge = self.get_badge(&award.badge, &award.badge_cid).await?; let identity = self.get_identity_by_did(&award.did).await?; let validated_issuers: Vec = serde_json::from_value(award.validated_issuers.clone()).unwrap_or_default(); let mut signer_identities = Vec::new(); for issuer in validated_issuers { if let Ok(Some(signer_identity)) = self.get_identity_by_did(&issuer).await { signer_identities.push(signer_identity); } } result.push(AwardWithBadge { award, badge, identity, signer_identities, }); } Ok(result) } } #[async_trait] impl Storage for SqliteStorage { async fn migrate(&self) -> Result<()> { self.migrate().await } async fn upsert_identity(&self, identity: &Identity) -> Result<()> { self.upsert_identity(identity).await } async fn get_identity_by_did(&self, did: &str) -> Result> { self.get_identity_by_did(did).await } async fn get_identity_by_handle(&self, handle: &str) -> Result> { self.get_identity_by_handle(handle).await } async fn upsert_badge(&self, badge: &Badge) -> Result<()> { self.upsert_badge(badge).await } async fn get_badge(&self, aturi: &str, cid: &str) -> Result> { self.get_badge(aturi, cid).await } async fn increment_badge_count(&self, aturi: &str, cid: &str) -> Result<()> { self.increment_badge_count(aturi, cid).await } async fn decrement_badge_count(&self, aturi: &str, cid: &str) -> Result<()> { self.decrement_badge_count(aturi, cid).await } async fn upsert_award(&self, award: &Award) -> Result { self.upsert_award(award).await } async fn get_award(&self, aturi: &str) -> Result> { self.get_award(aturi).await } async fn delete_award(&self, aturi: &str) -> Result> { self.delete_award(aturi).await } async fn trim_awards_for_did(&self, did: &str, max_count: i64) -> Result<()> { self.trim_awards_for_did(did, max_count).await } async fn get_recent_awards(&self, limit: i64) -> Result> { self.get_recent_awards(limit).await } async fn get_awards_for_did(&self, did: &str, limit: i64) -> Result> { self.get_awards_for_did(did, limit).await } } /// DID document storage implementation using SQLite. pub struct SqliteStorageDidDocumentStorage { storage: Arc, } impl SqliteStorageDidDocumentStorage { /// Create a new DID document storage instance backed by SQLite. pub fn new(storage: Arc) -> Self { Self { storage } } } #[async_trait] impl DidDocumentStorage for SqliteStorageDidDocumentStorage { async fn get_document_by_did(&self, did: &str) -> anyhow::Result> { if let Some(identity) = self .storage .get_identity_by_did(did) .await .map_err(anyhow::Error::new)? { let document: Document = serde_json::from_value(identity.record)?; Ok(Some(document)) } else { Ok(None) } } async fn store_document(&self, doc: Document) -> anyhow::Result<()> { let handle = doc .also_known_as .first() .and_then(|aka| aka.strip_prefix("at://")) .unwrap_or("unknown.handle") .to_string(); // Create a simple JSON representation of the document let record = serde_json::json!(doc); let identity = Identity { did: doc.id.clone(), handle, record, created_at: Utc::now(), updated_at: Utc::now(), }; self.storage .upsert_identity(&identity) .await .map_err(anyhow::Error::new) } async fn delete_document_by_did(&self, _did: &str) -> anyhow::Result<()> { Ok(()) } }