Our Personal Data Server from scratch! tranquil.farm
atproto pds rust postgresql fun oauth

feat(lexicon): dynamic value types and schema registry #45

merged opened by oyster.cafe targeting main from feat/real-lex-schema-validation
Labels

None yet.

assignee

None yet.

Participants 1
AT URI
at://did:plc:3fwecdnvtcscjnrx2p4n7alz/sh.tangled.repo.pull/3mgvbqsbibn22
+489
Diff #1
+278
crates/tranquil-lexicon/src/dynamic.rs
··· 1 + use crate::resolve::{ResolveError, resolve_lexicon}; 2 + use crate::schema::LexiconDoc; 3 + use parking_lot::RwLock; 4 + use std::collections::{HashMap, VecDeque}; 5 + use std::sync::Arc; 6 + use std::sync::atomic::{AtomicBool, Ordering}; 7 + use std::time::{Duration, Instant}; 8 + 9 + const NEGATIVE_CACHE_TTL: Duration = Duration::from_secs(24 * 60 * 60); 10 + const MAX_DYNAMIC_SCHEMAS: usize = 1024; 11 + 12 + struct NegativeEntry { 13 + expires_at: Instant, 14 + } 15 + 16 + struct SchemaStore { 17 + schemas: HashMap<String, Arc<LexiconDoc>>, 18 + insertion_order: VecDeque<String>, 19 + } 20 + 21 + pub struct DynamicRegistry { 22 + store: RwLock<SchemaStore>, 23 + negative_cache: RwLock<HashMap<String, NegativeEntry>>, 24 + network_disabled: AtomicBool, 25 + } 26 + 27 + impl DynamicRegistry { 28 + pub fn new() -> Self { 29 + let network_disabled = 30 + std::env::var("TRANQUIL_LEXICON_OFFLINE").is_ok_and(|v| v == "1" || v == "true"); 31 + Self { 32 + store: RwLock::new(SchemaStore { 33 + schemas: HashMap::new(), 34 + insertion_order: VecDeque::new(), 35 + }), 36 + negative_cache: RwLock::new(HashMap::new()), 37 + network_disabled: AtomicBool::new(network_disabled), 38 + } 39 + } 40 + 41 + #[allow(dead_code)] 42 + pub fn set_network_disabled(&self, disabled: bool) { 43 + self.network_disabled.store(disabled, Ordering::Relaxed); 44 + } 45 + 46 + pub fn get(&self, nsid: &str) -> Option<Arc<LexiconDoc>> { 47 + self.store.read().schemas.get(nsid).cloned() 48 + } 49 + 50 + pub fn is_negative_cached(&self, nsid: &str) -> bool { 51 + let cache = self.negative_cache.read(); 52 + cache 53 + .get(nsid) 54 + .is_some_and(|entry| entry.expires_at > Instant::now()) 55 + } 56 + 57 + fn insert_negative(&self, nsid: &str) { 58 + let mut cache = self.negative_cache.write(); 59 + if cache.len() > MAX_DYNAMIC_SCHEMAS { 60 + let now = Instant::now(); 61 + cache.retain(|_, entry| entry.expires_at > now); 62 + } 63 + cache.insert( 64 + nsid.to_string(), 65 + NegativeEntry { 66 + expires_at: Instant::now() + NEGATIVE_CACHE_TTL, 67 + }, 68 + ); 69 + } 70 + 71 + pub(crate) fn insert_schema(&self, doc: LexiconDoc) -> Arc<LexiconDoc> { 72 + let arc = Arc::new(doc); 73 + let nsid = arc.id.clone(); 74 + 75 + let mut store = self.store.write(); 76 + 77 + if store.schemas.len() >= MAX_DYNAMIC_SCHEMAS { 78 + tracing::warn!( 79 + count = store.schemas.len(), 80 + "dynamic schema registry at capacity, evicting oldest entries" 81 + ); 82 + let evict_count = store.schemas.len() / 4; 83 + (0..evict_count).for_each(|_| { 84 + if let Some(key) = store.insertion_order.pop_front() { 85 + store.schemas.remove(&key); 86 + } 87 + }); 88 + } 89 + 90 + if store 91 + .schemas 92 + .insert(nsid.clone(), Arc::clone(&arc)) 93 + .is_some() 94 + { 95 + store.insertion_order.retain(|k| k != &nsid); 96 + } 97 + store.insertion_order.push_back(nsid.clone()); 98 + 99 + self.negative_cache.write().remove(&arc.id); 100 + 101 + arc 102 + } 103 + 104 + pub async fn resolve_and_cache(&self, nsid: &str) -> Result<Arc<LexiconDoc>, ResolveError> { 105 + if let Some(doc) = self.get(nsid) { 106 + return Ok(doc); 107 + } 108 + 109 + if self.network_disabled.load(Ordering::Relaxed) { 110 + return Err(ResolveError::NetworkDisabled); 111 + } 112 + 113 + if self.is_negative_cached(nsid) { 114 + return Err(ResolveError::NegativelyCached { 115 + nsid: nsid.to_string(), 116 + ttl_secs: NEGATIVE_CACHE_TTL.as_secs(), 117 + }); 118 + } 119 + 120 + match resolve_lexicon(nsid).await { 121 + Ok(doc) => Ok(self.insert_schema(doc)), 122 + Err(e) => { 123 + tracing::debug!(nsid = nsid, error = %e, "caching negative resolution result"); 124 + self.insert_negative(nsid); 125 + Err(e) 126 + } 127 + } 128 + } 129 + 130 + pub fn schema_count(&self) -> usize { 131 + self.store.read().schemas.len() 132 + } 133 + } 134 + 135 + impl Default for DynamicRegistry { 136 + fn default() -> Self { 137 + Self::new() 138 + } 139 + } 140 + 141 + #[cfg(test)] 142 + mod tests { 143 + use super::*; 144 + 145 + #[test] 146 + fn test_negative_cache() { 147 + let registry = DynamicRegistry::new(); 148 + assert!(!registry.is_negative_cached("com.example.test")); 149 + 150 + registry.insert_negative("com.example.test"); 151 + assert!(registry.is_negative_cached("com.example.test")); 152 + } 153 + 154 + #[tokio::test] 155 + async fn test_negative_cache_returns_appropriate_error_variant() { 156 + let registry = DynamicRegistry::new(); 157 + registry.insert_negative("com.example.cached"); 158 + 159 + let err = registry 160 + .resolve_and_cache("com.example.cached") 161 + .await 162 + .unwrap_err(); 163 + 164 + assert!( 165 + !matches!(err, ResolveError::InvalidNsid(_)), 166 + "negative cache hit should not return InvalidNsid - the NSID is valid, it just failed resolution recently. got: {}", 167 + err 168 + ); 169 + } 170 + 171 + #[test] 172 + fn test_empty_lookup() { 173 + let registry = DynamicRegistry::new(); 174 + assert!(registry.get("com.example.nonexistent").is_none()); 175 + assert_eq!(registry.schema_count(), 0); 176 + } 177 + 178 + #[test] 179 + fn test_insert_and_retrieve() { 180 + let registry = DynamicRegistry::new(); 181 + let doc = LexiconDoc { 182 + lexicon: 1, 183 + id: "com.example.test".to_string(), 184 + defs: HashMap::new(), 185 + }; 186 + 187 + let arc = registry.insert_schema(doc); 188 + assert_eq!(arc.id, "com.example.test"); 189 + assert_eq!(registry.schema_count(), 1); 190 + 191 + let retrieved = registry.get("com.example.test"); 192 + assert!(retrieved.is_some()); 193 + assert_eq!(retrieved.unwrap().id, "com.example.test"); 194 + } 195 + 196 + #[test] 197 + fn test_negative_cache_cleared_on_insert() { 198 + let registry = DynamicRegistry::new(); 199 + 200 + registry.insert_negative("com.example.test"); 201 + assert!(registry.is_negative_cached("com.example.test")); 202 + 203 + let doc = LexiconDoc { 204 + lexicon: 1, 205 + id: "com.example.test".to_string(), 206 + defs: HashMap::new(), 207 + }; 208 + registry.insert_schema(doc); 209 + 210 + assert!(!registry.is_negative_cached("com.example.test")); 211 + } 212 + 213 + #[test] 214 + fn test_eviction_is_fifo() { 215 + let registry = DynamicRegistry::new(); 216 + 217 + (0..MAX_DYNAMIC_SCHEMAS).for_each(|i| { 218 + let doc = LexiconDoc { 219 + lexicon: 1, 220 + id: format!("com.example.schema{}", i), 221 + defs: HashMap::new(), 222 + }; 223 + registry.insert_schema(doc); 224 + }); 225 + assert_eq!(registry.schema_count(), MAX_DYNAMIC_SCHEMAS); 226 + 227 + let trigger = LexiconDoc { 228 + lexicon: 1, 229 + id: "com.example.trigger".to_string(), 230 + defs: HashMap::new(), 231 + }; 232 + registry.insert_schema(trigger); 233 + 234 + assert!( 235 + registry.get("com.example.schema0").is_none(), 236 + "oldest entry should be evicted" 237 + ); 238 + assert!( 239 + registry.get("com.example.trigger").is_some(), 240 + "newly inserted entry should exist" 241 + ); 242 + let evict_count = MAX_DYNAMIC_SCHEMAS / 4; 243 + assert!( 244 + registry 245 + .get(&format!("com.example.schema{}", evict_count)) 246 + .is_some(), 247 + "entry after eviction window should survive" 248 + ); 249 + } 250 + 251 + #[test] 252 + fn test_eviction_frees_memory() { 253 + let registry = DynamicRegistry::new(); 254 + let doc = LexiconDoc { 255 + lexicon: 1, 256 + id: "com.example.tracked".to_string(), 257 + defs: HashMap::new(), 258 + }; 259 + let arc = registry.insert_schema(doc); 260 + let weak = Arc::downgrade(&arc); 261 + drop(arc); 262 + 263 + assert!(weak.upgrade().is_some(), "registry still holds a reference"); 264 + 265 + (0..MAX_DYNAMIC_SCHEMAS).for_each(|i| { 266 + registry.insert_schema(LexiconDoc { 267 + lexicon: 1, 268 + id: format!("com.example.filler{}", i), 269 + defs: HashMap::new(), 270 + }); 271 + }); 272 + 273 + assert!( 274 + weak.upgrade().is_none(), 275 + "evicted Arc should be freed when no external references remain" 276 + ); 277 + } 278 + }
+211
crates/tranquil-lexicon/src/registry.rs
··· 1 + use crate::schema::{LexDef, LexObject, LexiconDoc, ParsedRef, parse_ref}; 2 + use std::collections::HashMap; 3 + use std::sync::{Arc, OnceLock}; 4 + 5 + static REGISTRY: OnceLock<LexiconRegistry> = OnceLock::new(); 6 + 7 + pub struct LexiconRegistry { 8 + schemas: HashMap<String, Arc<LexiconDoc>>, 9 + #[cfg(feature = "resolve")] 10 + dynamic: crate::dynamic::DynamicRegistry, 11 + } 12 + 13 + impl Default for LexiconRegistry { 14 + fn default() -> Self { 15 + Self::new() 16 + } 17 + } 18 + 19 + impl LexiconRegistry { 20 + pub fn global() -> &'static Self { 21 + REGISTRY.get_or_init(Self::new) 22 + } 23 + 24 + pub fn new() -> Self { 25 + Self { 26 + schemas: HashMap::new(), 27 + #[cfg(feature = "resolve")] 28 + dynamic: crate::dynamic::DynamicRegistry::new(), 29 + } 30 + } 31 + 32 + pub fn register(&mut self, doc: LexiconDoc) { 33 + let id = doc.id.clone(); 34 + self.schemas.insert(id, Arc::new(doc)); 35 + } 36 + 37 + #[cfg(feature = "resolve")] 38 + pub fn preload(&self, doc: LexiconDoc) { 39 + self.dynamic.insert_schema(doc); 40 + } 41 + 42 + pub fn get_doc(&self, nsid: &str) -> Option<Arc<LexiconDoc>> { 43 + self.schemas.get(nsid).cloned().or_else(|| { 44 + #[cfg(feature = "resolve")] 45 + { 46 + self.dynamic.get(nsid) 47 + } 48 + #[cfg(not(feature = "resolve"))] 49 + { 50 + None 51 + } 52 + }) 53 + } 54 + 55 + pub fn get_record_def(&self, nsid: &str) -> Option<Arc<LexiconDoc>> { 56 + let doc = self.get_doc(nsid)?; 57 + match doc.defs.get("main")? { 58 + LexDef::Record(_) => Some(doc), 59 + _ => None, 60 + } 61 + } 62 + 63 + pub fn resolve_ref(&self, reference: &str, context_nsid: &str) -> Option<ResolvedRef> { 64 + match parse_ref(reference) { 65 + ParsedRef::Local(local) => { 66 + let doc = self.get_doc(context_nsid)?; 67 + Self::def_to_resolved(&doc, local) 68 + } 69 + ParsedRef::Qualified { nsid, fragment } => { 70 + let doc = self.get_doc(nsid)?; 71 + Self::def_to_resolved(&doc, fragment) 72 + } 73 + ParsedRef::Bare(nsid) => { 74 + let doc = self.get_doc(nsid)?; 75 + Self::def_to_resolved(&doc, "main") 76 + } 77 + } 78 + } 79 + 80 + fn def_to_resolved(doc: &Arc<LexiconDoc>, def_name: &str) -> Option<ResolvedRef> { 81 + let def = doc.defs.get(def_name)?; 82 + match def { 83 + LexDef::Object(_) | LexDef::Record(_) | LexDef::Token {} | LexDef::StringDef(_) => { 84 + Some(ResolvedRef { 85 + doc: Arc::clone(doc), 86 + def_name: def_name.to_string(), 87 + }) 88 + } 89 + _ => None, 90 + } 91 + } 92 + 93 + pub fn has_schema(&self, nsid: &str) -> bool { 94 + self.get_doc(nsid).is_some() 95 + } 96 + 97 + pub fn schema_count(&self) -> usize { 98 + let embedded = self.schemas.len(); 99 + #[cfg(feature = "resolve")] 100 + { 101 + embedded + self.dynamic.schema_count() 102 + } 103 + #[cfg(not(feature = "resolve"))] 104 + { 105 + embedded 106 + } 107 + } 108 + 109 + #[cfg(feature = "resolve")] 110 + pub async fn resolve_dynamic( 111 + &self, 112 + nsid: &str, 113 + ) -> Result<Arc<LexiconDoc>, crate::resolve::ResolveError> { 114 + self.dynamic.resolve_and_cache(nsid).await 115 + } 116 + 117 + #[cfg(feature = "resolve")] 118 + pub fn is_negative_cached(&self, nsid: &str) -> bool { 119 + self.dynamic.is_negative_cached(nsid) 120 + } 121 + } 122 + 123 + pub struct ResolvedRef { 124 + doc: Arc<LexiconDoc>, 125 + def_name: String, 126 + } 127 + 128 + impl ResolvedRef { 129 + pub fn as_object(&self) -> Option<&LexObject> { 130 + match self.doc.defs.get(&self.def_name)? { 131 + LexDef::Object(obj) => Some(obj), 132 + LexDef::Record(rec) => Some(&rec.record), 133 + _ => None, 134 + } 135 + } 136 + 137 + pub fn is_token(&self) -> bool { 138 + self.doc 139 + .defs 140 + .get(&self.def_name) 141 + .is_some_and(|def| matches!(def, LexDef::Token {} | LexDef::StringDef(_))) 142 + } 143 + } 144 + 145 + #[cfg(test)] 146 + mod tests { 147 + use super::*; 148 + 149 + #[test] 150 + fn test_empty_registry() { 151 + let registry = LexiconRegistry::new(); 152 + assert_eq!(registry.schema_count(), 0); 153 + assert!(!registry.has_schema("app.bsky.feed.post")); 154 + } 155 + 156 + #[test] 157 + fn test_register_and_lookup() { 158 + let mut registry = LexiconRegistry::new(); 159 + let doc = LexiconDoc { 160 + lexicon: 1, 161 + id: "com.example.test".to_string(), 162 + defs: HashMap::new(), 163 + }; 164 + registry.register(doc); 165 + assert_eq!(registry.schema_count(), 1); 166 + assert!(registry.has_schema("com.example.test")); 167 + assert!(!registry.has_schema("com.example.other")); 168 + } 169 + 170 + #[test] 171 + fn test_get_record_def() { 172 + let registry = crate::test_schemas::test_registry(); 173 + let doc = registry.get_record_def("com.test.basic"); 174 + assert!(doc.is_some()); 175 + let doc = doc.unwrap(); 176 + match doc.defs.get("main").unwrap() { 177 + LexDef::Record(rec) => { 178 + assert!(rec.record.required.contains(&"text".to_string())); 179 + assert!(rec.record.required.contains(&"createdAt".to_string())); 180 + } 181 + _ => panic!("expected record def"), 182 + } 183 + } 184 + 185 + #[test] 186 + fn test_get_record_def_unknown() { 187 + let registry = LexiconRegistry::new(); 188 + assert!(registry.get_record_def("com.example.nonexistent").is_none()); 189 + } 190 + 191 + #[test] 192 + fn test_resolve_ref_cross_schema() { 193 + let registry = crate::test_schemas::test_registry(); 194 + let resolved = registry.resolve_ref("com.test.strongref", "com.test.withref"); 195 + assert!(resolved.is_some_and(|r| r.as_object().is_some())); 196 + } 197 + 198 + #[test] 199 + fn test_resolve_local_ref() { 200 + let registry = crate::test_schemas::test_registry(); 201 + let resolved = registry.resolve_ref("#replyRef", "com.test.withreply"); 202 + assert!(resolved.is_some_and(|r| r.as_object().is_some())); 203 + } 204 + 205 + #[test] 206 + fn test_has_schema() { 207 + let registry = crate::test_schemas::test_registry(); 208 + assert!(registry.has_schema("com.test.basic")); 209 + assert!(!registry.has_schema("com.example.nonexistent")); 210 + } 211 + }

History

2 rounds 0 comments
sign up or login to add to the discussion
1 commit
expand
feat(lexicon): dynamic value types and schema registry
expand 0 comments
pull request successfully merged
1 commit
expand
feat(lexicon): dynamic value types and schema registry
expand 0 comments