+489
Diff
round #1
+278
crates/tranquil-lexicon/src/dynamic.rs
+278
crates/tranquil-lexicon/src/dynamic.rs
···
1
+
use crate::resolve::{ResolveError, resolve_lexicon};
2
+
use crate::schema::LexiconDoc;
3
+
use parking_lot::RwLock;
4
+
use std::collections::{HashMap, VecDeque};
5
+
use std::sync::Arc;
6
+
use std::sync::atomic::{AtomicBool, Ordering};
7
+
use std::time::{Duration, Instant};
8
+
9
+
const NEGATIVE_CACHE_TTL: Duration = Duration::from_secs(24 * 60 * 60);
10
+
const MAX_DYNAMIC_SCHEMAS: usize = 1024;
11
+
12
+
struct NegativeEntry {
13
+
expires_at: Instant,
14
+
}
15
+
16
+
struct SchemaStore {
17
+
schemas: HashMap<String, Arc<LexiconDoc>>,
18
+
insertion_order: VecDeque<String>,
19
+
}
20
+
21
+
pub struct DynamicRegistry {
22
+
store: RwLock<SchemaStore>,
23
+
negative_cache: RwLock<HashMap<String, NegativeEntry>>,
24
+
network_disabled: AtomicBool,
25
+
}
26
+
27
+
impl DynamicRegistry {
28
+
pub fn new() -> Self {
29
+
let network_disabled =
30
+
std::env::var("TRANQUIL_LEXICON_OFFLINE").is_ok_and(|v| v == "1" || v == "true");
31
+
Self {
32
+
store: RwLock::new(SchemaStore {
33
+
schemas: HashMap::new(),
34
+
insertion_order: VecDeque::new(),
35
+
}),
36
+
negative_cache: RwLock::new(HashMap::new()),
37
+
network_disabled: AtomicBool::new(network_disabled),
38
+
}
39
+
}
40
+
41
+
#[allow(dead_code)]
42
+
pub fn set_network_disabled(&self, disabled: bool) {
43
+
self.network_disabled.store(disabled, Ordering::Relaxed);
44
+
}
45
+
46
+
pub fn get(&self, nsid: &str) -> Option<Arc<LexiconDoc>> {
47
+
self.store.read().schemas.get(nsid).cloned()
48
+
}
49
+
50
+
pub fn is_negative_cached(&self, nsid: &str) -> bool {
51
+
let cache = self.negative_cache.read();
52
+
cache
53
+
.get(nsid)
54
+
.is_some_and(|entry| entry.expires_at > Instant::now())
55
+
}
56
+
57
+
fn insert_negative(&self, nsid: &str) {
58
+
let mut cache = self.negative_cache.write();
59
+
if cache.len() > MAX_DYNAMIC_SCHEMAS {
60
+
let now = Instant::now();
61
+
cache.retain(|_, entry| entry.expires_at > now);
62
+
}
63
+
cache.insert(
64
+
nsid.to_string(),
65
+
NegativeEntry {
66
+
expires_at: Instant::now() + NEGATIVE_CACHE_TTL,
67
+
},
68
+
);
69
+
}
70
+
71
+
pub(crate) fn insert_schema(&self, doc: LexiconDoc) -> Arc<LexiconDoc> {
72
+
let arc = Arc::new(doc);
73
+
let nsid = arc.id.clone();
74
+
75
+
let mut store = self.store.write();
76
+
77
+
if store.schemas.len() >= MAX_DYNAMIC_SCHEMAS {
78
+
tracing::warn!(
79
+
count = store.schemas.len(),
80
+
"dynamic schema registry at capacity, evicting oldest entries"
81
+
);
82
+
let evict_count = store.schemas.len() / 4;
83
+
(0..evict_count).for_each(|_| {
84
+
if let Some(key) = store.insertion_order.pop_front() {
85
+
store.schemas.remove(&key);
86
+
}
87
+
});
88
+
}
89
+
90
+
if store
91
+
.schemas
92
+
.insert(nsid.clone(), Arc::clone(&arc))
93
+
.is_some()
94
+
{
95
+
store.insertion_order.retain(|k| k != &nsid);
96
+
}
97
+
store.insertion_order.push_back(nsid.clone());
98
+
99
+
self.negative_cache.write().remove(&arc.id);
100
+
101
+
arc
102
+
}
103
+
104
+
pub async fn resolve_and_cache(&self, nsid: &str) -> Result<Arc<LexiconDoc>, ResolveError> {
105
+
if let Some(doc) = self.get(nsid) {
106
+
return Ok(doc);
107
+
}
108
+
109
+
if self.network_disabled.load(Ordering::Relaxed) {
110
+
return Err(ResolveError::NetworkDisabled);
111
+
}
112
+
113
+
if self.is_negative_cached(nsid) {
114
+
return Err(ResolveError::NegativelyCached {
115
+
nsid: nsid.to_string(),
116
+
ttl_secs: NEGATIVE_CACHE_TTL.as_secs(),
117
+
});
118
+
}
119
+
120
+
match resolve_lexicon(nsid).await {
121
+
Ok(doc) => Ok(self.insert_schema(doc)),
122
+
Err(e) => {
123
+
tracing::debug!(nsid = nsid, error = %e, "caching negative resolution result");
124
+
self.insert_negative(nsid);
125
+
Err(e)
126
+
}
127
+
}
128
+
}
129
+
130
+
pub fn schema_count(&self) -> usize {
131
+
self.store.read().schemas.len()
132
+
}
133
+
}
134
+
135
+
impl Default for DynamicRegistry {
136
+
fn default() -> Self {
137
+
Self::new()
138
+
}
139
+
}
140
+
141
+
#[cfg(test)]
142
+
mod tests {
143
+
use super::*;
144
+
145
+
#[test]
146
+
fn test_negative_cache() {
147
+
let registry = DynamicRegistry::new();
148
+
assert!(!registry.is_negative_cached("com.example.test"));
149
+
150
+
registry.insert_negative("com.example.test");
151
+
assert!(registry.is_negative_cached("com.example.test"));
152
+
}
153
+
154
+
#[tokio::test]
155
+
async fn test_negative_cache_returns_appropriate_error_variant() {
156
+
let registry = DynamicRegistry::new();
157
+
registry.insert_negative("com.example.cached");
158
+
159
+
let err = registry
160
+
.resolve_and_cache("com.example.cached")
161
+
.await
162
+
.unwrap_err();
163
+
164
+
assert!(
165
+
!matches!(err, ResolveError::InvalidNsid(_)),
166
+
"negative cache hit should not return InvalidNsid - the NSID is valid, it just failed resolution recently. got: {}",
167
+
err
168
+
);
169
+
}
170
+
171
+
#[test]
172
+
fn test_empty_lookup() {
173
+
let registry = DynamicRegistry::new();
174
+
assert!(registry.get("com.example.nonexistent").is_none());
175
+
assert_eq!(registry.schema_count(), 0);
176
+
}
177
+
178
+
#[test]
179
+
fn test_insert_and_retrieve() {
180
+
let registry = DynamicRegistry::new();
181
+
let doc = LexiconDoc {
182
+
lexicon: 1,
183
+
id: "com.example.test".to_string(),
184
+
defs: HashMap::new(),
185
+
};
186
+
187
+
let arc = registry.insert_schema(doc);
188
+
assert_eq!(arc.id, "com.example.test");
189
+
assert_eq!(registry.schema_count(), 1);
190
+
191
+
let retrieved = registry.get("com.example.test");
192
+
assert!(retrieved.is_some());
193
+
assert_eq!(retrieved.unwrap().id, "com.example.test");
194
+
}
195
+
196
+
#[test]
197
+
fn test_negative_cache_cleared_on_insert() {
198
+
let registry = DynamicRegistry::new();
199
+
200
+
registry.insert_negative("com.example.test");
201
+
assert!(registry.is_negative_cached("com.example.test"));
202
+
203
+
let doc = LexiconDoc {
204
+
lexicon: 1,
205
+
id: "com.example.test".to_string(),
206
+
defs: HashMap::new(),
207
+
};
208
+
registry.insert_schema(doc);
209
+
210
+
assert!(!registry.is_negative_cached("com.example.test"));
211
+
}
212
+
213
+
#[test]
214
+
fn test_eviction_is_fifo() {
215
+
let registry = DynamicRegistry::new();
216
+
217
+
(0..MAX_DYNAMIC_SCHEMAS).for_each(|i| {
218
+
let doc = LexiconDoc {
219
+
lexicon: 1,
220
+
id: format!("com.example.schema{}", i),
221
+
defs: HashMap::new(),
222
+
};
223
+
registry.insert_schema(doc);
224
+
});
225
+
assert_eq!(registry.schema_count(), MAX_DYNAMIC_SCHEMAS);
226
+
227
+
let trigger = LexiconDoc {
228
+
lexicon: 1,
229
+
id: "com.example.trigger".to_string(),
230
+
defs: HashMap::new(),
231
+
};
232
+
registry.insert_schema(trigger);
233
+
234
+
assert!(
235
+
registry.get("com.example.schema0").is_none(),
236
+
"oldest entry should be evicted"
237
+
);
238
+
assert!(
239
+
registry.get("com.example.trigger").is_some(),
240
+
"newly inserted entry should exist"
241
+
);
242
+
let evict_count = MAX_DYNAMIC_SCHEMAS / 4;
243
+
assert!(
244
+
registry
245
+
.get(&format!("com.example.schema{}", evict_count))
246
+
.is_some(),
247
+
"entry after eviction window should survive"
248
+
);
249
+
}
250
+
251
+
#[test]
252
+
fn test_eviction_frees_memory() {
253
+
let registry = DynamicRegistry::new();
254
+
let doc = LexiconDoc {
255
+
lexicon: 1,
256
+
id: "com.example.tracked".to_string(),
257
+
defs: HashMap::new(),
258
+
};
259
+
let arc = registry.insert_schema(doc);
260
+
let weak = Arc::downgrade(&arc);
261
+
drop(arc);
262
+
263
+
assert!(weak.upgrade().is_some(), "registry still holds a reference");
264
+
265
+
(0..MAX_DYNAMIC_SCHEMAS).for_each(|i| {
266
+
registry.insert_schema(LexiconDoc {
267
+
lexicon: 1,
268
+
id: format!("com.example.filler{}", i),
269
+
defs: HashMap::new(),
270
+
});
271
+
});
272
+
273
+
assert!(
274
+
weak.upgrade().is_none(),
275
+
"evicted Arc should be freed when no external references remain"
276
+
);
277
+
}
278
+
}
+211
crates/tranquil-lexicon/src/registry.rs
+211
crates/tranquil-lexicon/src/registry.rs
···
1
+
use crate::schema::{LexDef, LexObject, LexiconDoc, ParsedRef, parse_ref};
2
+
use std::collections::HashMap;
3
+
use std::sync::{Arc, OnceLock};
4
+
5
+
static REGISTRY: OnceLock<LexiconRegistry> = OnceLock::new();
6
+
7
+
pub struct LexiconRegistry {
8
+
schemas: HashMap<String, Arc<LexiconDoc>>,
9
+
#[cfg(feature = "resolve")]
10
+
dynamic: crate::dynamic::DynamicRegistry,
11
+
}
12
+
13
+
impl Default for LexiconRegistry {
14
+
fn default() -> Self {
15
+
Self::new()
16
+
}
17
+
}
18
+
19
+
impl LexiconRegistry {
20
+
pub fn global() -> &'static Self {
21
+
REGISTRY.get_or_init(Self::new)
22
+
}
23
+
24
+
pub fn new() -> Self {
25
+
Self {
26
+
schemas: HashMap::new(),
27
+
#[cfg(feature = "resolve")]
28
+
dynamic: crate::dynamic::DynamicRegistry::new(),
29
+
}
30
+
}
31
+
32
+
pub fn register(&mut self, doc: LexiconDoc) {
33
+
let id = doc.id.clone();
34
+
self.schemas.insert(id, Arc::new(doc));
35
+
}
36
+
37
+
#[cfg(feature = "resolve")]
38
+
pub fn preload(&self, doc: LexiconDoc) {
39
+
self.dynamic.insert_schema(doc);
40
+
}
41
+
42
+
pub fn get_doc(&self, nsid: &str) -> Option<Arc<LexiconDoc>> {
43
+
self.schemas.get(nsid).cloned().or_else(|| {
44
+
#[cfg(feature = "resolve")]
45
+
{
46
+
self.dynamic.get(nsid)
47
+
}
48
+
#[cfg(not(feature = "resolve"))]
49
+
{
50
+
None
51
+
}
52
+
})
53
+
}
54
+
55
+
pub fn get_record_def(&self, nsid: &str) -> Option<Arc<LexiconDoc>> {
56
+
let doc = self.get_doc(nsid)?;
57
+
match doc.defs.get("main")? {
58
+
LexDef::Record(_) => Some(doc),
59
+
_ => None,
60
+
}
61
+
}
62
+
63
+
pub fn resolve_ref(&self, reference: &str, context_nsid: &str) -> Option<ResolvedRef> {
64
+
match parse_ref(reference) {
65
+
ParsedRef::Local(local) => {
66
+
let doc = self.get_doc(context_nsid)?;
67
+
Self::def_to_resolved(&doc, local)
68
+
}
69
+
ParsedRef::Qualified { nsid, fragment } => {
70
+
let doc = self.get_doc(nsid)?;
71
+
Self::def_to_resolved(&doc, fragment)
72
+
}
73
+
ParsedRef::Bare(nsid) => {
74
+
let doc = self.get_doc(nsid)?;
75
+
Self::def_to_resolved(&doc, "main")
76
+
}
77
+
}
78
+
}
79
+
80
+
fn def_to_resolved(doc: &Arc<LexiconDoc>, def_name: &str) -> Option<ResolvedRef> {
81
+
let def = doc.defs.get(def_name)?;
82
+
match def {
83
+
LexDef::Object(_) | LexDef::Record(_) | LexDef::Token {} | LexDef::StringDef(_) => {
84
+
Some(ResolvedRef {
85
+
doc: Arc::clone(doc),
86
+
def_name: def_name.to_string(),
87
+
})
88
+
}
89
+
_ => None,
90
+
}
91
+
}
92
+
93
+
pub fn has_schema(&self, nsid: &str) -> bool {
94
+
self.get_doc(nsid).is_some()
95
+
}
96
+
97
+
pub fn schema_count(&self) -> usize {
98
+
let embedded = self.schemas.len();
99
+
#[cfg(feature = "resolve")]
100
+
{
101
+
embedded + self.dynamic.schema_count()
102
+
}
103
+
#[cfg(not(feature = "resolve"))]
104
+
{
105
+
embedded
106
+
}
107
+
}
108
+
109
+
#[cfg(feature = "resolve")]
110
+
pub async fn resolve_dynamic(
111
+
&self,
112
+
nsid: &str,
113
+
) -> Result<Arc<LexiconDoc>, crate::resolve::ResolveError> {
114
+
self.dynamic.resolve_and_cache(nsid).await
115
+
}
116
+
117
+
#[cfg(feature = "resolve")]
118
+
pub fn is_negative_cached(&self, nsid: &str) -> bool {
119
+
self.dynamic.is_negative_cached(nsid)
120
+
}
121
+
}
122
+
123
+
pub struct ResolvedRef {
124
+
doc: Arc<LexiconDoc>,
125
+
def_name: String,
126
+
}
127
+
128
+
impl ResolvedRef {
129
+
pub fn as_object(&self) -> Option<&LexObject> {
130
+
match self.doc.defs.get(&self.def_name)? {
131
+
LexDef::Object(obj) => Some(obj),
132
+
LexDef::Record(rec) => Some(&rec.record),
133
+
_ => None,
134
+
}
135
+
}
136
+
137
+
pub fn is_token(&self) -> bool {
138
+
self.doc
139
+
.defs
140
+
.get(&self.def_name)
141
+
.is_some_and(|def| matches!(def, LexDef::Token {} | LexDef::StringDef(_)))
142
+
}
143
+
}
144
+
145
+
#[cfg(test)]
146
+
mod tests {
147
+
use super::*;
148
+
149
+
#[test]
150
+
fn test_empty_registry() {
151
+
let registry = LexiconRegistry::new();
152
+
assert_eq!(registry.schema_count(), 0);
153
+
assert!(!registry.has_schema("app.bsky.feed.post"));
154
+
}
155
+
156
+
#[test]
157
+
fn test_register_and_lookup() {
158
+
let mut registry = LexiconRegistry::new();
159
+
let doc = LexiconDoc {
160
+
lexicon: 1,
161
+
id: "com.example.test".to_string(),
162
+
defs: HashMap::new(),
163
+
};
164
+
registry.register(doc);
165
+
assert_eq!(registry.schema_count(), 1);
166
+
assert!(registry.has_schema("com.example.test"));
167
+
assert!(!registry.has_schema("com.example.other"));
168
+
}
169
+
170
+
#[test]
171
+
fn test_get_record_def() {
172
+
let registry = crate::test_schemas::test_registry();
173
+
let doc = registry.get_record_def("com.test.basic");
174
+
assert!(doc.is_some());
175
+
let doc = doc.unwrap();
176
+
match doc.defs.get("main").unwrap() {
177
+
LexDef::Record(rec) => {
178
+
assert!(rec.record.required.contains(&"text".to_string()));
179
+
assert!(rec.record.required.contains(&"createdAt".to_string()));
180
+
}
181
+
_ => panic!("expected record def"),
182
+
}
183
+
}
184
+
185
+
#[test]
186
+
fn test_get_record_def_unknown() {
187
+
let registry = LexiconRegistry::new();
188
+
assert!(registry.get_record_def("com.example.nonexistent").is_none());
189
+
}
190
+
191
+
#[test]
192
+
fn test_resolve_ref_cross_schema() {
193
+
let registry = crate::test_schemas::test_registry();
194
+
let resolved = registry.resolve_ref("com.test.strongref", "com.test.withref");
195
+
assert!(resolved.is_some_and(|r| r.as_object().is_some()));
196
+
}
197
+
198
+
#[test]
199
+
fn test_resolve_local_ref() {
200
+
let registry = crate::test_schemas::test_registry();
201
+
let resolved = registry.resolve_ref("#replyRef", "com.test.withreply");
202
+
assert!(resolved.is_some_and(|r| r.as_object().is_some()));
203
+
}
204
+
205
+
#[test]
206
+
fn test_has_schema() {
207
+
let registry = crate::test_schemas::test_registry();
208
+
assert!(registry.has_schema("com.test.basic"));
209
+
assert!(!registry.has_schema("com.example.nonexistent"));
210
+
}
211
+
}
History
2 rounds
0 comments
oyster.cafe
submitted
#1
1 commit
expand
collapse
feat(lexicon): dynamic value types and schema registry
expand 0 comments
pull request successfully merged
oyster.cafe
submitted
#0
1 commit
expand
collapse
feat(lexicon): dynamic value types and schema registry