// Personal Activity Index (Bluesky, Leaflet, Substack) — pai.desertthunder.dev
// Topics: rss, bluesky
// Snapshot: branch `main`, 458 lines, 16 kB
1use pai_core::{Item, ListFilter, PaiError, Result, SourceKind, Storage}; 2use rusqlite::{params, Connection, OptionalExtension}; 3use std::path::Path; 4 5const SCHEMA_VERSION: i32 = 1; 6 7const INIT_SQL: &str = r#" 8CREATE TABLE IF NOT EXISTS schema_version ( 9 version INTEGER PRIMARY KEY 10); 11 12CREATE TABLE IF NOT EXISTS items ( 13 id TEXT PRIMARY KEY, 14 source_kind TEXT NOT NULL, 15 source_id TEXT NOT NULL, 16 author TEXT, 17 title TEXT, 18 summary TEXT, 19 url TEXT NOT NULL, 20 content_html TEXT, 21 published_at TEXT NOT NULL, 22 created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP 23); 24 25CREATE INDEX IF NOT EXISTS idx_items_source_date 26 ON items (source_kind, source_id, published_at DESC); 27"#; 28 29/// SQLite implementation of the Storage trait 30/// 31/// Manages persistent storage of items in a local SQLite database. 32/// Handles schema initialization and migrations automatically on first connection. 33pub struct SqliteStorage { 34 conn: Connection, 35} 36 37impl SqliteStorage { 38 /// Opens or creates a SQLite database at the given path 39 /// 40 /// Initializes the schema if the database is new or runs migrations if needed. 41 pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> { 42 let path_ref = path.as_ref(); 43 44 if let Some(parent) = path_ref.parent() { 45 std::fs::create_dir_all(parent) 46 .map_err(|e| PaiError::Storage(format!("Failed to create database directory: {e}")))?; 47 } 48 49 let conn = Connection::open(path).map_err(|e| PaiError::Storage(format!("Failed to open database: {e}")))?; 50 51 let mut storage = Self { conn }; 52 storage.init_schema()?; 53 Ok(storage) 54 } 55 56 /// Initializes the database schema 57 /// 58 /// Creates tables and indexes if they don't exist, and sets up version tracking. 
59 fn init_schema(&mut self) -> Result<()> { 60 self.conn 61 .execute_batch(INIT_SQL) 62 .map_err(|e| PaiError::Storage(format!("Failed to initialize schema: {e}")))?; 63 64 let version: Option<i32> = self 65 .conn 66 .query_row("SELECT version FROM schema_version LIMIT 1", [], |row| row.get(0)) 67 .optional() 68 .map_err(|e| PaiError::Storage(format!("Failed to check schema version: {e}")))?; 69 70 match version { 71 None => { 72 self.conn 73 .execute( 74 "INSERT INTO schema_version (version) VALUES (?1)", 75 params![SCHEMA_VERSION], 76 ) 77 .map_err(|e| PaiError::Storage(format!("Failed to set schema version: {e}")))?; 78 } 79 Some(v) if v < SCHEMA_VERSION => { 80 return Err(PaiError::Storage(format!( 81 "Database migration needed: current={v}, required={SCHEMA_VERSION}" 82 ))); 83 } 84 _ => {} 85 } 86 87 Ok(()) 88 } 89 90 /// Gets basic statistics about stored items 91 pub fn get_stats(&self) -> Result<Vec<(String, usize)>> { 92 let mut stmt = self 93 .conn 94 .prepare("SELECT source_kind, COUNT(*) FROM items GROUP BY source_kind ORDER BY source_kind") 95 .map_err(|e| PaiError::Storage(format!("Failed to prepare stats query: {e}")))?; 96 97 let stats = stmt 98 .query_map([], |row| Ok((row.get::<_, String>(0)?, row.get::<_, usize>(1)?))) 99 .map_err(|e| PaiError::Storage(format!("Failed to query stats: {e}")))? 100 .collect::<std::result::Result<Vec<_>, _>>() 101 .map_err(|e| PaiError::Storage(format!("Failed to collect stats: {e}")))?; 102 103 Ok(stats) 104 } 105 106 /// Gets total item count 107 pub fn count_items(&self) -> Result<usize> { 108 self.conn 109 .query_row("SELECT COUNT(*) FROM items", [], |row| row.get(0)) 110 .map_err(|e| PaiError::Storage(format!("Failed to count items: {e}"))) 111 } 112 113 /// Verifies schema integrity 114 /// 115 /// Checks that required tables and indexes exist. 
116 pub fn verify_schema(&self) -> Result<()> { 117 let tables = vec!["schema_version", "items"]; 118 for table in tables { 119 let exists: bool = self 120 .conn 121 .query_row( 122 "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name=?1", 123 params![table], 124 |row| { 125 let count: i32 = row.get(0)?; 126 Ok(count > 0) 127 }, 128 ) 129 .map_err(|e| PaiError::Storage(format!("Failed to verify table {table}: {e}")))?; 130 131 if !exists { 132 return Err(PaiError::Storage(format!("Missing table: {table}"))); 133 } 134 } 135 136 Ok(()) 137 } 138 139 /// Fetches a single item by ID, if it exists 140 pub fn get_item(&self, id: &str) -> Result<Option<Item>> { 141 let mut stmt = self 142 .conn 143 .prepare( 144 "SELECT id, source_kind, source_id, author, title, summary, url, content_html, published_at, created_at 145 FROM items WHERE id = ?1 LIMIT 1", 146 ) 147 .map_err(|e| PaiError::Storage(format!("Failed to prepare get_item query: {e}")))?; 148 149 stmt.query_row([id], |row| { 150 let source_kind_str: String = row.get(1)?; 151 let source_kind = source_kind_str 152 .parse::<SourceKind>() 153 .map_err(|e| rusqlite::Error::FromSqlConversionFailure(1, rusqlite::types::Type::Text, Box::new(e)))?; 154 155 Ok(Item { 156 id: row.get(0)?, 157 source_kind, 158 source_id: row.get(2)?, 159 author: row.get(3)?, 160 title: row.get(4)?, 161 summary: row.get(5)?, 162 url: row.get(6)?, 163 content_html: row.get(7)?, 164 published_at: row.get(8)?, 165 created_at: row.get(9)?, 166 }) 167 }) 168 .optional() 169 .map_err(|e| PaiError::Storage(format!("Failed to fetch item by id: {e}"))) 170 } 171} 172 173impl Storage for SqliteStorage { 174 fn insert_or_replace_item(&self, item: &Item) -> Result<()> { 175 self.conn 176 .execute( 177 "INSERT OR REPLACE INTO items 178 (id, source_kind, source_id, author, title, summary, url, content_html, published_at, created_at) 179 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)", 180 params![ 181 item.id, 182 item.source_kind.to_string(), 
183 item.source_id, 184 item.author, 185 item.title, 186 item.summary, 187 item.url, 188 item.content_html, 189 item.published_at, 190 item.created_at, 191 ], 192 ) 193 .map_err(|e| PaiError::Storage(format!("Failed to insert item: {e}")))?; 194 195 Ok(()) 196 } 197 198 fn list_items(&self, filter: &ListFilter) -> Result<Vec<Item>> { 199 let mut sql = String::from("SELECT id, source_kind, source_id, author, title, summary, url, content_html, published_at, created_at FROM items WHERE 1=1"); 200 let mut conditions = Vec::new(); 201 202 if filter.source_kind.is_some() { 203 sql.push_str(" AND source_kind = ?"); 204 conditions.push(filter.source_kind.unwrap().to_string()); 205 } 206 207 if let Some(ref source_id) = filter.source_id { 208 sql.push_str(" AND source_id = ?"); 209 conditions.push(source_id.clone()); 210 } 211 212 if let Some(ref since) = filter.since { 213 sql.push_str(" AND published_at >= ?"); 214 conditions.push(since.clone()); 215 } 216 217 if let Some(ref query) = filter.query { 218 sql.push_str(" AND (title LIKE ? 
OR summary LIKE ?)"); 219 let pattern = format!("%{query}%"); 220 conditions.push(pattern.clone()); 221 conditions.push(pattern); 222 } 223 224 sql.push_str(" ORDER BY published_at DESC"); 225 226 if let Some(limit) = filter.limit { 227 sql.push_str(&format!(" LIMIT {limit}")); 228 } 229 230 let mut stmt = self 231 .conn 232 .prepare(&sql) 233 .map_err(|e| PaiError::Storage(format!("Failed to prepare query: {e}")))?; 234 235 let params_refs: Vec<&dyn rusqlite::ToSql> = conditions.iter().map(|s| s as &dyn rusqlite::ToSql).collect(); 236 237 let items = stmt 238 .query_map(params_refs.as_slice(), |row| { 239 let source_kind_str: String = row.get(1)?; 240 let source_kind = source_kind_str.parse::<SourceKind>().map_err(|e| { 241 rusqlite::Error::FromSqlConversionFailure(1, rusqlite::types::Type::Text, Box::new(e)) 242 })?; 243 244 Ok(Item { 245 id: row.get(0)?, 246 source_kind, 247 source_id: row.get(2)?, 248 author: row.get(3)?, 249 title: row.get(4)?, 250 summary: row.get(5)?, 251 url: row.get(6)?, 252 content_html: row.get(7)?, 253 published_at: row.get(8)?, 254 created_at: row.get(9)?, 255 }) 256 }) 257 .map_err(|e| PaiError::Storage(format!("Failed to query items: {e}")))? 
258 .collect::<std::result::Result<Vec<_>, _>>() 259 .map_err(|e| PaiError::Storage(format!("Failed to collect items: {e}")))?; 260 261 Ok(items) 262 } 263} 264 265#[cfg(test)] 266mod tests { 267 use super::*; 268 use chrono::Utc; 269 270 fn create_test_storage() -> SqliteStorage { 271 SqliteStorage::new(":memory:").expect("Failed to create in-memory database") 272 } 273 274 fn create_test_item(id: &str, source_kind: SourceKind, source_id: &str) -> Item { 275 Item { 276 id: id.to_string(), 277 source_kind, 278 source_id: source_id.to_string(), 279 author: Some("Test Author".to_string()), 280 title: Some("Test Title".to_string()), 281 summary: Some("Test summary".to_string()), 282 url: format!("https://example.com/{id}"), 283 content_html: Some("<p>Test content</p>".to_string()), 284 published_at: Utc::now().to_rfc3339(), 285 created_at: Utc::now().to_rfc3339(), 286 } 287 } 288 289 #[test] 290 fn new_database_initializes_schema() { 291 let storage = create_test_storage(); 292 assert!(storage.verify_schema().is_ok()); 293 } 294 295 #[test] 296 fn insert_and_retrieve_item() { 297 let storage = create_test_storage(); 298 let item = create_test_item("test-1", SourceKind::Substack, "test.substack.com"); 299 300 storage.insert_or_replace_item(&item).expect("Failed to insert item"); 301 302 let filter = ListFilter::default(); 303 let items = storage.list_items(&filter).expect("Failed to list items"); 304 305 assert_eq!(items.len(), 1); 306 assert_eq!(items[0].id, "test-1"); 307 assert_eq!(items[0].source_kind, SourceKind::Substack); 308 } 309 310 #[test] 311 fn insert_replaces_existing_item() { 312 let storage = create_test_storage(); 313 let mut item = create_test_item("test-1", SourceKind::Substack, "test.substack.com"); 314 315 storage.insert_or_replace_item(&item).expect("Failed to insert item"); 316 317 item.title = Some("Updated Title".to_string()); 318 storage.insert_or_replace_item(&item).expect("Failed to replace item"); 319 320 let filter = ListFilter::default(); 
321 let items = storage.list_items(&filter).expect("Failed to list items"); 322 323 assert_eq!(items.len(), 1); 324 assert_eq!(items[0].title, Some("Updated Title".to_string())); 325 } 326 327 #[test] 328 fn filter_by_source_kind() { 329 let storage = create_test_storage(); 330 331 storage 332 .insert_or_replace_item(&create_test_item("test-1", SourceKind::Substack, "test.substack.com")) 333 .expect("Failed to insert"); 334 storage 335 .insert_or_replace_item(&create_test_item("test-2", SourceKind::Bluesky, "test.bsky.social")) 336 .expect("Failed to insert"); 337 338 let filter = ListFilter { source_kind: Some(SourceKind::Substack), ..Default::default() }; 339 let items = storage.list_items(&filter).expect("Failed to list items"); 340 341 assert_eq!(items.len(), 1); 342 assert_eq!(items[0].source_kind, SourceKind::Substack); 343 } 344 345 #[test] 346 fn filter_by_source_id() { 347 let storage = create_test_storage(); 348 349 storage 350 .insert_or_replace_item(&create_test_item("test-1", SourceKind::Leaflet, "source1.leaflet.pub")) 351 .expect("Failed to insert"); 352 storage 353 .insert_or_replace_item(&create_test_item("test-2", SourceKind::Leaflet, "source2.leaflet.pub")) 354 .expect("Failed to insert"); 355 356 let filter = ListFilter { source_id: Some("source1.leaflet.pub".to_string()), ..Default::default() }; 357 let items = storage.list_items(&filter).expect("Failed to list items"); 358 359 assert_eq!(items.len(), 1); 360 assert_eq!(items[0].source_id, "source1.leaflet.pub"); 361 } 362 363 #[test] 364 fn filter_with_limit() { 365 let storage = create_test_storage(); 366 367 for i in 0..5 { 368 storage 369 .insert_or_replace_item(&create_test_item( 370 &format!("test-{i}"), 371 SourceKind::Substack, 372 "test.substack.com", 373 )) 374 .expect("Failed to insert"); 375 } 376 377 let filter = ListFilter { limit: Some(3), ..Default::default() }; 378 let items = storage.list_items(&filter).expect("Failed to list items"); 379 380 assert_eq!(items.len(), 3); 381 } 
382 383 #[test] 384 fn filter_by_query() { 385 let storage = create_test_storage(); 386 387 let mut item1 = create_test_item("test-1", SourceKind::Substack, "test.substack.com"); 388 item1.title = Some("Rust Programming".to_string()); 389 storage.insert_or_replace_item(&item1).expect("Failed to insert"); 390 391 let mut item2 = create_test_item("test-2", SourceKind::Substack, "test.substack.com"); 392 item2.title = Some("Python Tutorial".to_string()); 393 storage.insert_or_replace_item(&item2).expect("Failed to insert"); 394 395 let filter = ListFilter { query: Some("Rust".to_string()), ..Default::default() }; 396 let items = storage.list_items(&filter).expect("Failed to list items"); 397 398 assert_eq!(items.len(), 1); 399 assert_eq!(items[0].id, "test-1"); 400 } 401 402 #[test] 403 fn get_stats_returns_counts_by_source() { 404 let storage = create_test_storage(); 405 406 storage 407 .insert_or_replace_item(&create_test_item("test-1", SourceKind::Substack, "test.substack.com")) 408 .expect("Failed to insert"); 409 storage 410 .insert_or_replace_item(&create_test_item("test-2", SourceKind::Substack, "test.substack.com")) 411 .expect("Failed to insert"); 412 storage 413 .insert_or_replace_item(&create_test_item("test-3", SourceKind::Bluesky, "test.bsky.social")) 414 .expect("Failed to insert"); 415 416 let stats = storage.get_stats().expect("Failed to get stats"); 417 418 assert_eq!(stats.len(), 2); 419 assert!(stats.iter().any(|(k, v)| k == "bluesky" && *v == 1)); 420 assert!(stats.iter().any(|(k, v)| k == "substack" && *v == 2)); 421 } 422 423 #[test] 424 fn count_items_returns_total() { 425 let storage = create_test_storage(); 426 427 for i in 0..3 { 428 storage 429 .insert_or_replace_item(&create_test_item( 430 &format!("test-{i}"), 431 SourceKind::Substack, 432 "test.substack.com", 433 )) 434 .expect("Failed to insert"); 435 } 436 437 let count = storage.count_items().expect("Failed to count items"); 438 assert_eq!(count, 3); 439 } 440 441 #[test] 442 fn 
get_item_returns_record() { 443 let storage = create_test_storage(); 444 let item = create_test_item("test-1", SourceKind::Substack, "test.substack.com"); 445 storage.insert_or_replace_item(&item).expect("Failed to insert"); 446 447 let fetched = storage.get_item("test-1").expect("query failed").unwrap(); 448 assert_eq!(fetched.id, "test-1"); 449 assert_eq!(fetched.source_kind, SourceKind::Substack); 450 } 451 452 #[test] 453 fn get_item_returns_none_for_missing() { 454 let storage = create_test_storage(); 455 let result = storage.get_item("nope").expect("query failed"); 456 assert!(result.is_none()); 457 } 458}