atproto blogging
1use std::collections::hash_map::DefaultHasher;
2use std::fs;
3use std::hash::{Hash, Hasher};
4use std::path::{Path, PathBuf};
5use std::sync::Mutex;
6use std::time::Instant;
7
8use dashmap::DashMap;
9use rusqlite::Connection;
10use rusqlite_migration::{M, Migrations};
11use smol_str::SmolStr;
12
13use crate::error::{IndexError, SqliteError};
14
15/// Key for shard routing - (collection, rkey) tuple
16#[derive(Debug, Clone, Hash, PartialEq, Eq)]
17pub struct ShardKey {
18 pub collection: SmolStr,
19 pub rkey: SmolStr,
20}
21
22impl ShardKey {
23 pub fn new(collection: impl Into<SmolStr>, rkey: impl Into<SmolStr>) -> Self {
24 Self {
25 collection: collection.into(),
26 rkey: rkey.into(),
27 }
28 }
29
30 fn hash_prefix(&self) -> String {
31 let mut hasher = DefaultHasher::new();
32 self.hash(&mut hasher);
33 let hash = hasher.finish();
34 format!("{:02x}", (hash & 0xFF) as u8)
35 }
36
37 /// Directory path: {base}/{hash(collection,rkey)[0..2]}/{rkey}/
38 fn dir_path(&self, base: &Path) -> PathBuf {
39 base.join(self.hash_prefix()).join(self.rkey.as_str())
40 }
41
42 pub fn collection(&self) -> &str {
43 &self.collection
44 }
45
46 pub fn rkey(&self) -> &str {
47 &self.rkey
48 }
49}
50
51/// A single SQLite shard for a resource
52pub struct SqliteShard {
53 conn: Mutex<Connection>,
54 path: PathBuf,
55 last_accessed: Mutex<Instant>,
56}
57
58impl SqliteShard {
59 const DB_FILENAME: &'static str = "store.sqlite";
60
61 fn open(dir: &Path) -> Result<Self, IndexError> {
62 fs::create_dir_all(dir).map_err(|e| SqliteError::Io {
63 path: dir.to_path_buf(),
64 source: e,
65 })?;
66
67 let db_path = dir.join(Self::DB_FILENAME);
68 let mut conn = Connection::open(&db_path).map_err(|e| SqliteError::Open {
69 path: db_path.clone(),
70 source: e,
71 })?;
72
73 // Enable WAL mode for better concurrency
74 conn.pragma_update(None, "journal_mode", "WAL")
75 .map_err(|e| SqliteError::Pragma {
76 pragma: "journal_mode",
77 source: e,
78 })?;
79
80 // Run migrations
81 // PERF: rusqlite_migration checks user_version pragma, which is fast when
82 // no migrations needed. If shard open becomes a bottleneck, consider adding
83 // a signal file (e.g., .schema_v{N}) to skip migration check entirely.
84 Self::migrations()
85 .to_latest(&mut conn)
86 .map_err(|e| SqliteError::Migration {
87 message: e.to_string(),
88 })?;
89
90 Ok(Self {
91 conn: Mutex::new(conn),
92 path: db_path,
93 last_accessed: Mutex::new(Instant::now()),
94 })
95 }
96
97 fn migrations() -> Migrations<'static> {
98 Migrations::new(vec![
99 M::up(include_str!("sqlite/migrations/001_edit_graph.sql")),
100 M::up(include_str!("sqlite/migrations/002_collaboration.sql")),
101 M::up(include_str!("sqlite/migrations/003_permissions.sql")),
102 ])
103 }
104
105 pub fn path(&self) -> &Path {
106 &self.path
107 }
108
109 pub fn touch(&self) {
110 if let Ok(mut last) = self.last_accessed.lock() {
111 *last = Instant::now();
112 }
113 }
114
115 pub fn last_accessed(&self) -> Instant {
116 self.last_accessed.lock().map(|t| *t).expect("poisoned")
117 }
118
119 /// Execute a read operation on the shard
120 pub fn read<F, T>(&self, f: F) -> Result<T, IndexError>
121 where
122 F: FnOnce(&Connection) -> Result<T, rusqlite::Error>,
123 {
124 self.touch();
125 let conn = self.conn.lock().map_err(|_| SqliteError::LockPoisoned)?;
126 f(&conn).map_err(|e| {
127 SqliteError::Query {
128 message: e.to_string(),
129 }
130 .into()
131 })
132 }
133
134 /// Execute a write operation on the shard
135 pub fn write<F, T>(&self, f: F) -> Result<T, IndexError>
136 where
137 F: FnOnce(&Connection) -> Result<T, rusqlite::Error>,
138 {
139 self.touch();
140 let conn = self.conn.lock().map_err(|_| SqliteError::LockPoisoned)?;
141 f(&conn).map_err(|e| {
142 SqliteError::Query {
143 message: e.to_string(),
144 }
145 .into()
146 })
147 }
148}
149
150/// Routes resources to their SQLite shards
151pub struct ShardRouter {
152 base_path: PathBuf,
153 shards: DashMap<ShardKey, std::sync::Arc<SqliteShard>>,
154}
155
156impl ShardRouter {
157 pub fn new(base_path: impl Into<PathBuf>) -> Self {
158 Self {
159 base_path: base_path.into(),
160 shards: DashMap::new(),
161 }
162 }
163
164 /// Get or create a shard for the given key
165 pub fn get_or_create(&self, key: &ShardKey) -> Result<std::sync::Arc<SqliteShard>, IndexError> {
166 // Fast path: already cached
167 if let Some(shard) = self.shards.get(key) {
168 shard.touch();
169 return Ok(shard.clone());
170 }
171
172 // Slow path: create new shard
173 let dir = key.dir_path(&self.base_path);
174 let shard = std::sync::Arc::new(SqliteShard::open(&dir)?);
175 self.shards.insert(key.clone(), shard.clone());
176
177 Ok(shard)
178 }
179
180 /// Get an existing shard without creating
181 pub fn get(&self, key: &ShardKey) -> Option<std::sync::Arc<SqliteShard>> {
182 self.shards.get(key).map(|s| {
183 s.touch();
184 s.clone()
185 })
186 }
187
188 /// Number of active shards
189 pub fn shard_count(&self) -> usize {
190 self.shards.len()
191 }
192
193 /// Iterate over shards that haven't been accessed since the given instant
194 pub fn idle_shards(&self, since: Instant) -> Vec<ShardKey> {
195 self.shards
196 .iter()
197 .filter(|entry| entry.value().last_accessed() < since)
198 .map(|entry| entry.key().clone())
199 .collect()
200 }
201
202 /// Remove a shard from the cache (for eviction)
203 pub fn evict(&self, key: &ShardKey) -> Option<std::sync::Arc<SqliteShard>> {
204 self.shards.remove(key).map(|(_, shard)| shard)
205 }
206}