// personal activity index (bluesky, leaflet, substack)
// pai.desertthunder.dev — rss, bluesky
use pai_core::{Item, ListFilter, PaiError, Result, SourceKind, Storage};
use rusqlite::{params, Connection, OptionalExtension};
use std::path::Path;

/// Schema version recorded in (and expected from) the `schema_version` table.
///
/// `SqliteStorage::init_schema` rejects a database whose recorded version is
/// lower than this value, since no migrations are implemented yet.
const SCHEMA_VERSION: i32 = 1;

/// DDL applied every time a database is opened.
///
/// Every statement uses `IF NOT EXISTS`, so re-running it against an existing
/// database does not alter objects that are already present.
const INIT_SQL: &str = r#"
CREATE TABLE IF NOT EXISTS schema_version (
    version INTEGER PRIMARY KEY
);

CREATE TABLE IF NOT EXISTS items (
    id TEXT PRIMARY KEY,
    source_kind TEXT NOT NULL,
    source_id TEXT NOT NULL,
    author TEXT,
    title TEXT,
    summary TEXT,
    url TEXT NOT NULL,
    content_html TEXT,
    published_at TEXT NOT NULL,
    created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX IF NOT EXISTS idx_items_source_date
    ON items (source_kind, source_id, published_at DESC);
"#;

29/// SQLite implementation of the Storage trait
30///
31/// Manages persistent storage of items in a local SQLite database.
32/// Handles schema initialization and migrations automatically on first connection.
33pub struct SqliteStorage {
34 conn: Connection,
35}
36
37impl SqliteStorage {
38 /// Opens or creates a SQLite database at the given path
39 ///
40 /// Initializes the schema if the database is new or runs migrations if needed.
41 pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
42 let path_ref = path.as_ref();
43
44 if let Some(parent) = path_ref.parent() {
45 std::fs::create_dir_all(parent)
46 .map_err(|e| PaiError::Storage(format!("Failed to create database directory: {e}")))?;
47 }
48
49 let conn = Connection::open(path).map_err(|e| PaiError::Storage(format!("Failed to open database: {e}")))?;
50
51 let mut storage = Self { conn };
52 storage.init_schema()?;
53 Ok(storage)
54 }
55
56 /// Initializes the database schema
57 ///
58 /// Creates tables and indexes if they don't exist, and sets up version tracking.
59 fn init_schema(&mut self) -> Result<()> {
60 self.conn
61 .execute_batch(INIT_SQL)
62 .map_err(|e| PaiError::Storage(format!("Failed to initialize schema: {e}")))?;
63
64 let version: Option<i32> = self
65 .conn
66 .query_row("SELECT version FROM schema_version LIMIT 1", [], |row| row.get(0))
67 .optional()
68 .map_err(|e| PaiError::Storage(format!("Failed to check schema version: {e}")))?;
69
70 match version {
71 None => {
72 self.conn
73 .execute(
74 "INSERT INTO schema_version (version) VALUES (?1)",
75 params![SCHEMA_VERSION],
76 )
77 .map_err(|e| PaiError::Storage(format!("Failed to set schema version: {e}")))?;
78 }
79 Some(v) if v < SCHEMA_VERSION => {
80 return Err(PaiError::Storage(format!(
81 "Database migration needed: current={v}, required={SCHEMA_VERSION}"
82 )));
83 }
84 _ => {}
85 }
86
87 Ok(())
88 }
89
90 /// Gets basic statistics about stored items
91 pub fn get_stats(&self) -> Result<Vec<(String, usize)>> {
92 let mut stmt = self
93 .conn
94 .prepare("SELECT source_kind, COUNT(*) FROM items GROUP BY source_kind ORDER BY source_kind")
95 .map_err(|e| PaiError::Storage(format!("Failed to prepare stats query: {e}")))?;
96
97 let stats = stmt
98 .query_map([], |row| Ok((row.get::<_, String>(0)?, row.get::<_, usize>(1)?)))
99 .map_err(|e| PaiError::Storage(format!("Failed to query stats: {e}")))?
100 .collect::<std::result::Result<Vec<_>, _>>()
101 .map_err(|e| PaiError::Storage(format!("Failed to collect stats: {e}")))?;
102
103 Ok(stats)
104 }
105
106 /// Gets total item count
107 pub fn count_items(&self) -> Result<usize> {
108 self.conn
109 .query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0))
110 .map_err(|e| PaiError::Storage(format!("Failed to count items: {e}")))
111 }
112
113 /// Verifies schema integrity
114 ///
115 /// Checks that required tables and indexes exist.
116 pub fn verify_schema(&self) -> Result<()> {
117 let tables = vec!["schema_version", "items"];
118 for table in tables {
119 let exists: bool = self
120 .conn
121 .query_row(
122 "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name=?1",
123 params![table],
124 |row| {
125 let count: i32 = row.get(0)?;
126 Ok(count > 0)
127 },
128 )
129 .map_err(|e| PaiError::Storage(format!("Failed to verify table {table}: {e}")))?;
130
131 if !exists {
132 return Err(PaiError::Storage(format!("Missing table: {table}")));
133 }
134 }
135
136 Ok(())
137 }
138
139 /// Fetches a single item by ID, if it exists
140 pub fn get_item(&self, id: &str) -> Result<Option<Item>> {
141 let mut stmt = self
142 .conn
143 .prepare(
144 "SELECT id, source_kind, source_id, author, title, summary, url, content_html, published_at, created_at
145 FROM items WHERE id = ?1 LIMIT 1",
146 )
147 .map_err(|e| PaiError::Storage(format!("Failed to prepare get_item query: {e}")))?;
148
149 stmt.query_row([id], |row| {
150 let source_kind_str: String = row.get(1)?;
151 let source_kind = source_kind_str
152 .parse::<SourceKind>()
153 .map_err(|e| rusqlite::Error::FromSqlConversionFailure(1, rusqlite::types::Type::Text, Box::new(e)))?;
154
155 Ok(Item {
156 id: row.get(0)?,
157 source_kind,
158 source_id: row.get(2)?,
159 author: row.get(3)?,
160 title: row.get(4)?,
161 summary: row.get(5)?,
162 url: row.get(6)?,
163 content_html: row.get(7)?,
164 published_at: row.get(8)?,
165 created_at: row.get(9)?,
166 })
167 })
168 .optional()
169 .map_err(|e| PaiError::Storage(format!("Failed to fetch item by id: {e}")))
170 }
171}
172
173impl Storage for SqliteStorage {
174 fn insert_or_replace_item(&self, item: &Item) -> Result<()> {
175 self.conn
176 .execute(
177 "INSERT OR REPLACE INTO items
178 (id, source_kind, source_id, author, title, summary, url, content_html, published_at, created_at)
179 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
180 params![
181 item.id,
182 item.source_kind.to_string(),
183 item.source_id,
184 item.author,
185 item.title,
186 item.summary,
187 item.url,
188 item.content_html,
189 item.published_at,
190 item.created_at,
191 ],
192 )
193 .map_err(|e| PaiError::Storage(format!("Failed to insert item: {e}")))?;
194
195 Ok(())
196 }
197
198 fn list_items(&self, filter: &ListFilter) -> Result<Vec<Item>> {
199 let mut sql = String::from("SELECT id, source_kind, source_id, author, title, summary, url, content_html, published_at, created_at FROM items WHERE 1=1");
200 let mut conditions = Vec::new();
201
202 if filter.source_kind.is_some() {
203 sql.push_str(" AND source_kind = ?");
204 conditions.push(filter.source_kind.unwrap().to_string());
205 }
206
207 if let Some(ref source_id) = filter.source_id {
208 sql.push_str(" AND source_id = ?");
209 conditions.push(source_id.clone());
210 }
211
212 if let Some(ref since) = filter.since {
213 sql.push_str(" AND published_at >= ?");
214 conditions.push(since.clone());
215 }
216
217 if let Some(ref query) = filter.query {
218 sql.push_str(" AND (title LIKE ? OR summary LIKE ?)");
219 let pattern = format!("%{query}%");
220 conditions.push(pattern.clone());
221 conditions.push(pattern);
222 }
223
224 sql.push_str(" ORDER BY published_at DESC");
225
226 if let Some(limit) = filter.limit {
227 sql.push_str(&format!(" LIMIT {limit}"));
228 }
229
230 let mut stmt = self
231 .conn
232 .prepare(&sql)
233 .map_err(|e| PaiError::Storage(format!("Failed to prepare query: {e}")))?;
234
235 let params_refs: Vec<&dyn rusqlite::ToSql> = conditions.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
236
237 let items = stmt
238 .query_map(params_refs.as_slice(), |row| {
239 let source_kind_str: String = row.get(1)?;
240 let source_kind = source_kind_str.parse::<SourceKind>().map_err(|e| {
241 rusqlite::Error::FromSqlConversionFailure(1, rusqlite::types::Type::Text, Box::new(e))
242 })?;
243
244 Ok(Item {
245 id: row.get(0)?,
246 source_kind,
247 source_id: row.get(2)?,
248 author: row.get(3)?,
249 title: row.get(4)?,
250 summary: row.get(5)?,
251 url: row.get(6)?,
252 content_html: row.get(7)?,
253 published_at: row.get(8)?,
254 created_at: row.get(9)?,
255 })
256 })
257 .map_err(|e| PaiError::Storage(format!("Failed to query items: {e}")))?
258 .collect::<std::result::Result<Vec<_>, _>>()
259 .map_err(|e| PaiError::Storage(format!("Failed to collect items: {e}")))?;
260
261 Ok(items)
262 }
263}
264
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Utc;

    /// Opens a fresh in-memory database; each test gets an isolated store.
    fn create_test_storage() -> SqliteStorage {
        SqliteStorage::new(":memory:").expect("Failed to create in-memory database")
    }

    /// Builds an item timestamped "now", with one timestamp for both date fields.
    fn create_test_item(id: &str, source_kind: SourceKind, source_id: &str) -> Item {
        let now = Utc::now().to_rfc3339();
        Item {
            id: id.to_string(),
            source_kind,
            source_id: source_id.to_string(),
            author: Some("Test Author".to_string()),
            title: Some("Test Title".to_string()),
            summary: Some("Test summary".to_string()),
            url: format!("https://example.com/{id}"),
            content_html: Some("<p>Test content</p>".to_string()),
            published_at: now.clone(),
            created_at: now,
        }
    }

    /// Builds a Substack item with a fixed `published_at`, for date-sensitive tests.
    fn create_test_item_at(id: &str, published_at: &str) -> Item {
        Item {
            published_at: published_at.to_string(),
            ..create_test_item(id, SourceKind::Substack, "test.substack.com")
        }
    }

    #[test]
    fn new_database_initializes_schema() {
        let storage = create_test_storage();
        assert!(storage.verify_schema().is_ok());
    }

    #[test]
    fn insert_and_retrieve_item() {
        let storage = create_test_storage();
        let item = create_test_item("test-1", SourceKind::Substack, "test.substack.com");

        storage.insert_or_replace_item(&item).expect("Failed to insert item");

        let filter = ListFilter::default();
        let items = storage.list_items(&filter).expect("Failed to list items");

        assert_eq!(items.len(), 1);
        assert_eq!(items[0].id, "test-1");
        assert_eq!(items[0].source_kind, SourceKind::Substack);
    }

    #[test]
    fn insert_replaces_existing_item() {
        let storage = create_test_storage();
        let mut item = create_test_item("test-1", SourceKind::Substack, "test.substack.com");

        storage.insert_or_replace_item(&item).expect("Failed to insert item");

        item.title = Some("Updated Title".to_string());
        storage.insert_or_replace_item(&item).expect("Failed to replace item");

        let filter = ListFilter::default();
        let items = storage.list_items(&filter).expect("Failed to list items");

        assert_eq!(items.len(), 1);
        assert_eq!(items[0].title, Some("Updated Title".to_string()));
    }

    #[test]
    fn filter_by_source_kind() {
        let storage = create_test_storage();

        storage
            .insert_or_replace_item(&create_test_item("test-1", SourceKind::Substack, "test.substack.com"))
            .expect("Failed to insert");
        storage
            .insert_or_replace_item(&create_test_item("test-2", SourceKind::Bluesky, "test.bsky.social"))
            .expect("Failed to insert");

        let filter = ListFilter { source_kind: Some(SourceKind::Substack), ..Default::default() };
        let items = storage.list_items(&filter).expect("Failed to list items");

        assert_eq!(items.len(), 1);
        assert_eq!(items[0].source_kind, SourceKind::Substack);
    }

    #[test]
    fn filter_by_source_id() {
        let storage = create_test_storage();

        storage
            .insert_or_replace_item(&create_test_item("test-1", SourceKind::Leaflet, "source1.leaflet.pub"))
            .expect("Failed to insert");
        storage
            .insert_or_replace_item(&create_test_item("test-2", SourceKind::Leaflet, "source2.leaflet.pub"))
            .expect("Failed to insert");

        let filter = ListFilter { source_id: Some("source1.leaflet.pub".to_string()), ..Default::default() };
        let items = storage.list_items(&filter).expect("Failed to list items");

        assert_eq!(items.len(), 1);
        assert_eq!(items[0].source_id, "source1.leaflet.pub");
    }

    #[test]
    fn filter_by_since() {
        let storage = create_test_storage();

        storage
            .insert_or_replace_item(&create_test_item_at("old", "2024-01-01T00:00:00+00:00"))
            .expect("Failed to insert");
        storage
            .insert_or_replace_item(&create_test_item_at("new", "2024-06-01T00:00:00+00:00"))
            .expect("Failed to insert");

        let filter = ListFilter { since: Some("2024-03-01T00:00:00+00:00".to_string()), ..Default::default() };
        let items = storage.list_items(&filter).expect("Failed to list items");

        assert_eq!(items.len(), 1);
        assert_eq!(items[0].id, "new");
    }

    #[test]
    fn list_items_orders_newest_first() {
        let storage = create_test_storage();

        for (id, published_at) in [
            ("a", "2024-01-01T00:00:00+00:00"),
            ("b", "2024-03-01T00:00:00+00:00"),
            ("c", "2024-02-01T00:00:00+00:00"),
        ] {
            storage.insert_or_replace_item(&create_test_item_at(id, published_at)).expect("Failed to insert");
        }

        let items = storage.list_items(&ListFilter::default()).expect("Failed to list items");
        let ids: Vec<&str> = items.iter().map(|item| item.id.as_str()).collect();

        assert_eq!(ids, ["b", "c", "a"]);
    }

    #[test]
    fn filter_with_limit() {
        let storage = create_test_storage();

        for i in 0..5 {
            storage
                .insert_or_replace_item(&create_test_item(
                    &format!("test-{i}"),
                    SourceKind::Substack,
                    "test.substack.com",
                ))
                .expect("Failed to insert");
        }

        let filter = ListFilter { limit: Some(3), ..Default::default() };
        let items = storage.list_items(&filter).expect("Failed to list items");

        assert_eq!(items.len(), 3);
    }

    #[test]
    fn filter_by_query() {
        let storage = create_test_storage();

        let mut item1 = create_test_item("test-1", SourceKind::Substack, "test.substack.com");
        item1.title = Some("Rust Programming".to_string());
        storage.insert_or_replace_item(&item1).expect("Failed to insert");

        let mut item2 = create_test_item("test-2", SourceKind::Substack, "test.substack.com");
        item2.title = Some("Python Tutorial".to_string());
        storage.insert_or_replace_item(&item2).expect("Failed to insert");

        let filter = ListFilter { query: Some("Rust".to_string()), ..Default::default() };
        let items = storage.list_items(&filter).expect("Failed to list items");

        assert_eq!(items.len(), 1);
        assert_eq!(items[0].id, "test-1");
    }

    #[test]
    fn get_stats_returns_counts_by_source() {
        let storage = create_test_storage();

        storage
            .insert_or_replace_item(&create_test_item("test-1", SourceKind::Substack, "test.substack.com"))
            .expect("Failed to insert");
        storage
            .insert_or_replace_item(&create_test_item("test-2", SourceKind::Substack, "test.substack.com"))
            .expect("Failed to insert");
        storage
            .insert_or_replace_item(&create_test_item("test-3", SourceKind::Bluesky, "test.bsky.social"))
            .expect("Failed to insert");

        let stats = storage.get_stats().expect("Failed to get stats");

        assert_eq!(stats.len(), 2);
        assert!(stats.iter().any(|(k, v)| k == "bluesky" && *v == 1));
        assert!(stats.iter().any(|(k, v)| k == "substack" && *v == 2));
    }

    #[test]
    fn count_items_returns_total() {
        let storage = create_test_storage();

        for i in 0..3 {
            storage
                .insert_or_replace_item(&create_test_item(
                    &format!("test-{i}"),
                    SourceKind::Substack,
                    "test.substack.com",
                ))
                .expect("Failed to insert");
        }

        let count = storage.count_items().expect("Failed to count items");
        assert_eq!(count, 3);
    }

    #[test]
    fn get_item_returns_record() {
        let storage = create_test_storage();
        let item = create_test_item("test-1", SourceKind::Substack, "test.substack.com");
        storage.insert_or_replace_item(&item).expect("Failed to insert");

        let fetched = storage.get_item("test-1").expect("query failed").unwrap();
        assert_eq!(fetched.id, "test-1");
        assert_eq!(fetched.source_kind, SourceKind::Substack);
    }

    #[test]
    fn get_item_returns_none_for_missing() {
        let storage = create_test_storage();
        let result = storage.get_item("nope").expect("query failed");
        assert!(result.is_none());
    }
}