// at main — 206 lines, 6.0 kB (view raw)
1use std::collections::hash_map::DefaultHasher; 2use std::fs; 3use std::hash::{Hash, Hasher}; 4use std::path::{Path, PathBuf}; 5use std::sync::Mutex; 6use std::time::Instant; 7 8use dashmap::DashMap; 9use rusqlite::Connection; 10use rusqlite_migration::{M, Migrations}; 11use smol_str::SmolStr; 12 13use crate::error::{IndexError, SqliteError}; 14 15/// Key for shard routing - (collection, rkey) tuple 16#[derive(Debug, Clone, Hash, PartialEq, Eq)] 17pub struct ShardKey { 18 pub collection: SmolStr, 19 pub rkey: SmolStr, 20} 21 22impl ShardKey { 23 pub fn new(collection: impl Into<SmolStr>, rkey: impl Into<SmolStr>) -> Self { 24 Self { 25 collection: collection.into(), 26 rkey: rkey.into(), 27 } 28 } 29 30 fn hash_prefix(&self) -> String { 31 let mut hasher = DefaultHasher::new(); 32 self.hash(&mut hasher); 33 let hash = hasher.finish(); 34 format!("{:02x}", (hash & 0xFF) as u8) 35 } 36 37 /// Directory path: {base}/{hash(collection,rkey)[0..2]}/{rkey}/ 38 fn dir_path(&self, base: &Path) -> PathBuf { 39 base.join(self.hash_prefix()).join(self.rkey.as_str()) 40 } 41 42 pub fn collection(&self) -> &str { 43 &self.collection 44 } 45 46 pub fn rkey(&self) -> &str { 47 &self.rkey 48 } 49} 50 51/// A single SQLite shard for a resource 52pub struct SqliteShard { 53 conn: Mutex<Connection>, 54 path: PathBuf, 55 last_accessed: Mutex<Instant>, 56} 57 58impl SqliteShard { 59 const DB_FILENAME: &'static str = "store.sqlite"; 60 61 fn open(dir: &Path) -> Result<Self, IndexError> { 62 fs::create_dir_all(dir).map_err(|e| SqliteError::Io { 63 path: dir.to_path_buf(), 64 source: e, 65 })?; 66 67 let db_path = dir.join(Self::DB_FILENAME); 68 let mut conn = Connection::open(&db_path).map_err(|e| SqliteError::Open { 69 path: db_path.clone(), 70 source: e, 71 })?; 72 73 // Enable WAL mode for better concurrency 74 conn.pragma_update(None, "journal_mode", "WAL") 75 .map_err(|e| SqliteError::Pragma { 76 pragma: "journal_mode", 77 source: e, 78 })?; 79 80 // Run migrations 81 // PERF: 
rusqlite_migration checks user_version pragma, which is fast when 82 // no migrations needed. If shard open becomes a bottleneck, consider adding 83 // a signal file (e.g., .schema_v{N}) to skip migration check entirely. 84 Self::migrations() 85 .to_latest(&mut conn) 86 .map_err(|e| SqliteError::Migration { 87 message: e.to_string(), 88 })?; 89 90 Ok(Self { 91 conn: Mutex::new(conn), 92 path: db_path, 93 last_accessed: Mutex::new(Instant::now()), 94 }) 95 } 96 97 fn migrations() -> Migrations<'static> { 98 Migrations::new(vec![ 99 M::up(include_str!("sqlite/migrations/001_edit_graph.sql")), 100 M::up(include_str!("sqlite/migrations/002_collaboration.sql")), 101 M::up(include_str!("sqlite/migrations/003_permissions.sql")), 102 ]) 103 } 104 105 pub fn path(&self) -> &Path { 106 &self.path 107 } 108 109 pub fn touch(&self) { 110 if let Ok(mut last) = self.last_accessed.lock() { 111 *last = Instant::now(); 112 } 113 } 114 115 pub fn last_accessed(&self) -> Instant { 116 self.last_accessed.lock().map(|t| *t).expect("poisoned") 117 } 118 119 /// Execute a read operation on the shard 120 pub fn read<F, T>(&self, f: F) -> Result<T, IndexError> 121 where 122 F: FnOnce(&Connection) -> Result<T, rusqlite::Error>, 123 { 124 self.touch(); 125 let conn = self.conn.lock().map_err(|_| SqliteError::LockPoisoned)?; 126 f(&conn).map_err(|e| { 127 SqliteError::Query { 128 message: e.to_string(), 129 } 130 .into() 131 }) 132 } 133 134 /// Execute a write operation on the shard 135 pub fn write<F, T>(&self, f: F) -> Result<T, IndexError> 136 where 137 F: FnOnce(&Connection) -> Result<T, rusqlite::Error>, 138 { 139 self.touch(); 140 let conn = self.conn.lock().map_err(|_| SqliteError::LockPoisoned)?; 141 f(&conn).map_err(|e| { 142 SqliteError::Query { 143 message: e.to_string(), 144 } 145 .into() 146 }) 147 } 148} 149 150/// Routes resources to their SQLite shards 151pub struct ShardRouter { 152 base_path: PathBuf, 153 shards: DashMap<ShardKey, std::sync::Arc<SqliteShard>>, 154} 155 
156impl ShardRouter { 157 pub fn new(base_path: impl Into<PathBuf>) -> Self { 158 Self { 159 base_path: base_path.into(), 160 shards: DashMap::new(), 161 } 162 } 163 164 /// Get or create a shard for the given key 165 pub fn get_or_create(&self, key: &ShardKey) -> Result<std::sync::Arc<SqliteShard>, IndexError> { 166 // Fast path: already cached 167 if let Some(shard) = self.shards.get(key) { 168 shard.touch(); 169 return Ok(shard.clone()); 170 } 171 172 // Slow path: create new shard 173 let dir = key.dir_path(&self.base_path); 174 let shard = std::sync::Arc::new(SqliteShard::open(&dir)?); 175 self.shards.insert(key.clone(), shard.clone()); 176 177 Ok(shard) 178 } 179 180 /// Get an existing shard without creating 181 pub fn get(&self, key: &ShardKey) -> Option<std::sync::Arc<SqliteShard>> { 182 self.shards.get(key).map(|s| { 183 s.touch(); 184 s.clone() 185 }) 186 } 187 188 /// Number of active shards 189 pub fn shard_count(&self) -> usize { 190 self.shards.len() 191 } 192 193 /// Iterate over shards that haven't been accessed since the given instant 194 pub fn idle_shards(&self, since: Instant) -> Vec<ShardKey> { 195 self.shards 196 .iter() 197 .filter(|entry| entry.value().last_accessed() < since) 198 .map(|entry| entry.key().clone()) 199 .collect() 200 } 201 202 /// Remove a shard from the cache (for eviction) 203 pub fn evict(&self, key: &ShardKey) -> Option<std::sync::Arc<SqliteShard>> { 204 self.shards.remove(key).map(|(_, shard)| shard) 205 } 206}