1use std::collections::HashMap;
2use std::sync::Arc;
3
4use crate::errors::Result;
5use async_trait::async_trait;
6use atproto_identity::model::Document;
7use atproto_identity::storage::DidDocumentStorage;
8use chrono::Utc;
9use sqlx::Row;
10use sqlx::sqlite::SqlitePool;
11
12use super::{Identity, IdentityStorage, Post, PostReference, PostStorage, Storage};
13
14/// SQLite storage implementation for blog posts and identities.
15#[derive(Debug, Clone)]
16pub struct SqliteStorage {
17 pool: SqlitePool,
18}
19
20impl SqliteStorage {
21 /// Create a new SQLite storage instance with the given connection pool.
22 pub fn new(pool: SqlitePool) -> Self {
23 Self { pool }
24 }
25
26 async fn migrate(&self) -> Result<()> {
27 // Create identities table
28 sqlx::query(
29 r#"
30 CREATE TABLE IF NOT EXISTS identities (
31 did TEXT PRIMARY KEY,
32 handle TEXT NOT NULL,
33 record JSON NOT NULL,
34 created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
35 updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
36 );
37 "#,
38 )
39 .execute(&self.pool)
40 .await?;
41
42 // Create indices for identities
43 sqlx::query(
44 r#"
45 CREATE INDEX IF NOT EXISTS idx_identities_handle ON identities(handle);
46 CREATE INDEX IF NOT EXISTS idx_identities_created_at ON identities(created_at);
47 CREATE INDEX IF NOT EXISTS idx_identities_updated_at ON identities(updated_at);
48 "#,
49 )
50 .execute(&self.pool)
51 .await?;
52
53 // Create posts table
54 sqlx::query(
55 r#"
56 CREATE TABLE IF NOT EXISTS posts (
57 aturi TEXT PRIMARY KEY,
58 cid TEXT NOT NULL,
59 title TEXT NOT NULL,
60 slug TEXT NOT NULL UNIQUE,
61 content TEXT NOT NULL,
62 record_key TEXT NOT NULL,
63 created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
64 updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
65 record JSON NOT NULL
66 );
67 "#,
68 )
69 .execute(&self.pool)
70 .await?;
71
72 // Create indices for posts
73 sqlx::query(
74 r#"
75 CREATE INDEX IF NOT EXISTS idx_posts_slug ON posts(slug);
76 CREATE INDEX IF NOT EXISTS idx_posts_record_key ON posts(record_key);
77 CREATE INDEX IF NOT EXISTS idx_posts_created_at ON posts(created_at);
78 CREATE INDEX IF NOT EXISTS idx_posts_updated_at ON posts(updated_at);
79 "#,
80 )
81 .execute(&self.pool)
82 .await?;
83
84 // Create post_references table
85 sqlx::query(
86 r#"
87 CREATE TABLE IF NOT EXISTS post_references (
88 aturi TEXT PRIMARY KEY,
89 cid TEXT NOT NULL,
90 did TEXT NOT NULL,
91 collection TEXT NOT NULL,
92 post_aturi TEXT NOT NULL,
93 discovered_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
94 record JSON NOT NULL
95 );
96 "#,
97 )
98 .execute(&self.pool)
99 .await?;
100
101 // Create indices for post_references
102 sqlx::query(
103 r#"
104 CREATE INDEX IF NOT EXISTS idx_post_references_did ON post_references(did);
105 CREATE INDEX IF NOT EXISTS idx_post_references_collection ON post_references(collection);
106 CREATE INDEX IF NOT EXISTS idx_post_references_post_aturi ON post_references(post_aturi);
107 CREATE INDEX IF NOT EXISTS idx_post_references_discovered_at ON post_references(discovered_at);
108 "#,
109 )
110 .execute(&self.pool)
111 .await?;
112
113 Ok(())
114 }
115}
116
117#[async_trait]
118impl PostStorage for SqliteStorage {
119 async fn upsert_post(&self, post: &Post) -> Result<()> {
120 sqlx::query(
121 r#"
122 INSERT INTO posts (aturi, cid, title, slug, content, record_key, created_at, updated_at, record)
123 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
124 ON CONFLICT(aturi) DO UPDATE SET
125 cid = EXCLUDED.cid,
126 title = EXCLUDED.title,
127 slug = EXCLUDED.slug,
128 content = EXCLUDED.content,
129 record_key = EXCLUDED.record_key,
130 updated_at = EXCLUDED.updated_at,
131 record = EXCLUDED.record
132 "#,
133 )
134 .bind(&post.aturi)
135 .bind(&post.cid)
136 .bind(&post.title)
137 .bind(&post.slug)
138 .bind(&post.content)
139 .bind(&post.record_key)
140 .bind(post.created_at)
141 .bind(post.updated_at)
142 .bind(&post.record)
143 .execute(&self.pool)
144 .await?;
145
146 Ok(())
147 }
148
149 async fn get_post(&self, aturi: &str) -> Result<Option<Post>> {
150 let row = sqlx::query_as::<_, Post>("SELECT * FROM posts WHERE aturi = $1")
151 .bind(aturi)
152 .fetch_optional(&self.pool)
153 .await?;
154
155 Ok(row)
156 }
157
158 async fn get_posts(&self) -> Result<Vec<Post>> {
159 let rows = sqlx::query_as::<_, Post>("SELECT * FROM posts ORDER BY created_at DESC")
160 .fetch_all(&self.pool)
161 .await?;
162
163 Ok(rows)
164 }
165
166 async fn delete_post(&self, aturi: &str) -> Result<Option<Post>> {
167 let post = self.get_post(aturi).await?;
168
169 if post.is_some() {
170 sqlx::query("DELETE FROM posts WHERE aturi = $1")
171 .bind(aturi)
172 .execute(&self.pool)
173 .await?;
174 }
175
176 Ok(post)
177 }
178
179 async fn upsert_post_reference(&self, post_reference: &PostReference) -> Result<bool> {
180 let existing = sqlx::query("SELECT 1 FROM post_references WHERE aturi = $1")
181 .bind(&post_reference.aturi)
182 .fetch_optional(&self.pool)
183 .await?;
184
185 let is_new = existing.is_none();
186
187 sqlx::query(
188 r#"
189 INSERT INTO post_references (aturi, cid, did, collection, post_aturi, discovered_at, record)
190 VALUES ($1, $2, $3, $4, $5, $6, $7)
191 ON CONFLICT(aturi) DO UPDATE SET
192 cid = EXCLUDED.cid,
193 did = EXCLUDED.did,
194 collection = EXCLUDED.collection,
195 post_aturi = EXCLUDED.post_aturi,
196 record = EXCLUDED.record
197 "#,
198 )
199 .bind(&post_reference.aturi)
200 .bind(&post_reference.cid)
201 .bind(&post_reference.did)
202 .bind(&post_reference.collection)
203 .bind(&post_reference.post_aturi)
204 .bind(post_reference.discovered_at)
205 .bind(&post_reference.record)
206 .execute(&self.pool)
207 .await?;
208
209 Ok(is_new)
210 }
211
212 async fn delete_post_reference(&self, aturi: &str) -> Result<()> {
213 sqlx::query("DELETE FROM post_references WHERE aturi = $1")
214 .bind(aturi)
215 .execute(&self.pool)
216 .await?;
217
218 Ok(())
219 }
220
221 async fn get_post_reference_count(&self, post_aturi: &str) -> Result<HashMap<String, i64>> {
222 let rows = sqlx::query(
223 r#"
224 SELECT collection, COUNT(*) as count
225 FROM post_references
226 WHERE post_aturi = $1
227 GROUP BY collection
228 "#,
229 )
230 .bind(post_aturi)
231 .fetch_all(&self.pool)
232 .await?;
233
234 let mut count_map = HashMap::new();
235 for row in rows {
236 let collection: String = row.get("collection");
237 let count: i64 = row.get("count");
238 count_map.insert(collection, count);
239 }
240
241 Ok(count_map)
242 }
243
244 async fn get_post_references_for_post(&self, post_aturi: &str) -> Result<Vec<PostReference>> {
245 let rows = sqlx::query_as::<_, PostReference>(
246 "SELECT * FROM post_references WHERE post_aturi = $1 ORDER BY discovered_at DESC",
247 )
248 .bind(post_aturi)
249 .fetch_all(&self.pool)
250 .await?;
251
252 Ok(rows)
253 }
254
255 async fn get_post_references_for_post_for_collection(
256 &self,
257 post_aturi: &str,
258 collection: &str,
259 ) -> Result<Vec<PostReference>> {
260 let rows = sqlx::query_as::<_, PostReference>(
261 "SELECT * FROM post_references WHERE post_aturi = $1 AND collection = $2 ORDER BY discovered_at DESC",
262 )
263 .bind(post_aturi)
264 .bind(collection)
265 .fetch_all(&self.pool)
266 .await?;
267
268 Ok(rows)
269 }
270}
271
272#[async_trait]
273impl IdentityStorage for SqliteStorage {
274 async fn upsert_identity(&self, identity: &Identity) -> Result<()> {
275 sqlx::query(
276 r#"
277 INSERT INTO identities (did, handle, record, created_at, updated_at)
278 VALUES ($1, $2, $3, $4, $5)
279 ON CONFLICT(did) DO UPDATE SET
280 handle = EXCLUDED.handle,
281 record = EXCLUDED.record,
282 updated_at = EXCLUDED.updated_at
283 "#,
284 )
285 .bind(&identity.did)
286 .bind(&identity.handle)
287 .bind(&identity.record)
288 .bind(identity.created_at)
289 .bind(identity.updated_at)
290 .execute(&self.pool)
291 .await?;
292
293 Ok(())
294 }
295
296 async fn get_identity_by_did(&self, did: &str) -> Result<Option<Identity>> {
297 let row = sqlx::query_as::<_, Identity>("SELECT * FROM identities WHERE did = $1")
298 .bind(did)
299 .fetch_optional(&self.pool)
300 .await?;
301
302 Ok(row)
303 }
304
305 async fn get_identity_by_handle(&self, handle: &str) -> Result<Option<Identity>> {
306 let row = sqlx::query_as::<_, Identity>("SELECT * FROM identities WHERE handle = $1")
307 .bind(handle)
308 .fetch_optional(&self.pool)
309 .await?;
310
311 Ok(row)
312 }
313
314 async fn delete_identity(&self, did: &str) -> Result<Option<Identity>> {
315 let identity = self.get_identity_by_did(did).await?;
316
317 if identity.is_some() {
318 sqlx::query("DELETE FROM identities WHERE did = $1")
319 .bind(did)
320 .execute(&self.pool)
321 .await?;
322 }
323
324 Ok(identity)
325 }
326}
327
328#[async_trait]
329impl Storage for SqliteStorage {
330 async fn migrate(&self) -> Result<()> {
331 self.migrate().await
332 }
333}
334
335/// DID document storage implementation using SQLite.
336pub struct SqliteStorageDidDocumentStorage {
337 storage: Arc<SqliteStorage>,
338}
339
340impl SqliteStorageDidDocumentStorage {
341 /// Create a new DID document storage instance backed by SQLite.
342 pub fn new(storage: Arc<SqliteStorage>) -> Self {
343 Self { storage }
344 }
345}
346
347#[async_trait]
348impl DidDocumentStorage for SqliteStorageDidDocumentStorage {
349 async fn get_document_by_did(&self, did: &str) -> anyhow::Result<Option<Document>> {
350 if let Some(identity) = self
351 .storage
352 .get_identity_by_did(did)
353 .await
354 .map_err(anyhow::Error::new)?
355 {
356 let document: Document = serde_json::from_value(identity.record)?;
357 Ok(Some(document))
358 } else {
359 Ok(None)
360 }
361 }
362
363 async fn store_document(&self, doc: Document) -> anyhow::Result<()> {
364 let handle = doc
365 .also_known_as
366 .first()
367 .and_then(|aka| aka.strip_prefix("at://"))
368 .unwrap_or("unknown.handle")
369 .to_string();
370
371 // Create a JSON representation of the document
372 let record = serde_json::json!(doc);
373
374 let identity = Identity {
375 did: doc.id.clone(),
376 handle,
377 record,
378 created_at: Utc::now(),
379 updated_at: Utc::now(),
380 };
381
382 self.storage
383 .upsert_identity(&identity)
384 .await
385 .map_err(anyhow::Error::new)
386 }
387
388 async fn delete_document_by_did(&self, did: &str) -> anyhow::Result<()> {
389 self.storage
390 .delete_identity(did)
391 .await
392 .map_err(anyhow::Error::new)?;
393 Ok(())
394 }
395}