+29
-10
constellation/src/bin/main.rs
+29
-10
constellation/src/bin/main.rs
···
54
/// Saved jsonl from jetstream to use instead of a live subscription
55
#[arg(short, long)]
56
fixture: Option<PathBuf>,
57
}
58
59
#[derive(Debug, Clone, ValueEnum)]
···
115
rocks.start_backup(backup_dir, auto_backup, stay_alive.clone())?;
116
}
117
println!("rocks ready.");
118
-
run(
119
-
rocks,
120
-
fixture,
121
-
args.data,
122
-
stream,
123
-
bind,
124
-
metrics_bind,
125
-
stay_alive,
126
-
)
127
}
128
}
129
}
···
213
214
'monitor: loop {
215
match readable.get_stats() {
216
-
Ok(StorageStats { dids, targetables, linking_records }) => {
217
metrics::gauge!("storage.stats.dids").set(dids as f64);
218
metrics::gauge!("storage.stats.targetables").set(targetables as f64);
219
metrics::gauge!("storage.stats.linking_records").set(linking_records as f64);
···
54
/// Saved jsonl from jetstream to use instead of a live subscription
55
#[arg(short, long)]
56
fixture: Option<PathBuf>,
57
+
/// run a scan across the target id table and write all key -> ids to id -> keys
58
+
#[arg(long, action)]
59
+
repair_target_ids: bool,
60
}
61
62
#[derive(Debug, Clone, ValueEnum)]
···
118
rocks.start_backup(backup_dir, auto_backup, stay_alive.clone())?;
119
}
120
println!("rocks ready.");
121
+
std::thread::scope(|s| {
122
+
if args.repair_target_ids {
123
+
let rocks = rocks.clone();
124
+
let stay_alive = stay_alive.clone();
125
+
s.spawn(move || {
126
+
let rep = rocks.run_repair(time::Duration::from_millis(1), stay_alive);
127
+
eprintln!("repair finished: {rep:?}");
128
+
rep
129
+
});
130
+
}
131
+
s.spawn(|| {
132
+
let r = run(
133
+
rocks,
134
+
fixture,
135
+
args.data,
136
+
stream,
137
+
bind,
138
+
metrics_bind,
139
+
stay_alive,
140
+
);
141
+
eprintln!("run finished: {r:?}");
142
+
r
143
+
});
144
+
});
145
+
Ok(())
146
}
147
}
148
}
···
232
233
'monitor: loop {
234
match readable.get_stats() {
235
+
Ok(StorageStats { dids, targetables, linking_records, .. }) => {
236
metrics::gauge!("storage.stats.dids").set(dids as f64);
237
metrics::gauge!("storage.stats.targetables").set(targetables as f64);
238
metrics::gauge!("storage.stats.linking_records").set(linking_records as f64);
+6
-7
constellation/src/server/mod.rs
+6
-7
constellation/src/server/mod.rs
···
32
DEFAULT_CURSOR_LIMIT
33
}
34
35
-
const INDEX_BEGAN_AT_TS: u64 = 1738083600; // TODO: not this
36
-
37
fn to500(e: tokio::task::JoinError) -> http::StatusCode {
38
eprintln!("handler error: {e}");
39
http::StatusCode::INTERNAL_SERVER_ERROR
···
201
#[template(path = "hello.html.j2")]
202
struct HelloReponse {
203
help: &'static str,
204
-
days_indexed: u64,
205
stats: StorageStats,
206
}
207
fn hello(
···
211
let stats = store
212
.get_stats()
213
.map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?;
214
-
let days_indexed = (UNIX_EPOCH + Duration::from_secs(INDEX_BEGAN_AT_TS))
215
-
.elapsed()
216
.map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?
217
-
.as_secs()
218
-
/ 86400;
219
Ok(acceptable(accept, HelloReponse {
220
help: "open this URL in a web browser (or request with Accept: text/html) for information about this API.",
221
days_indexed,
···
32
DEFAULT_CURSOR_LIMIT
33
}
34
35
fn to500(e: tokio::task::JoinError) -> http::StatusCode {
36
eprintln!("handler error: {e}");
37
http::StatusCode::INTERNAL_SERVER_ERROR
···
199
#[template(path = "hello.html.j2")]
200
struct HelloReponse {
201
help: &'static str,
202
+
days_indexed: Option<u64>,
203
stats: StorageStats,
204
}
205
fn hello(
···
209
let stats = store
210
.get_stats()
211
.map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?;
212
+
let days_indexed = stats
213
+
.started_at
214
+
.map(|c| (UNIX_EPOCH + Duration::from_micros(c)).elapsed())
215
+
.transpose()
216
.map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?
217
+
.map(|d| d.as_secs() / 86_400);
218
Ok(acceptable(accept, HelloReponse {
219
help: "open this URL in a web browser (or request with Accept: text/html) for information about this API.",
220
days_indexed,
+2
constellation/src/storage/mem_store.rs
+2
constellation/src/storage/mem_store.rs
+6
constellation/src/storage/mod.rs
+6
constellation/src/storage/mod.rs
···
44
/// records with multiple links are single-counted.
45
/// for LSM stores, deleted links don't decrement this, and updated records with any links will likely increment it.
46
pub linking_records: u64,
47
+
48
+
/// first jetstream cursor when this instance first started
49
+
pub started_at: Option<u64>,
50
+
51
+
/// anything else we want to throw in
52
+
pub other_data: HashMap<String, u64>,
53
}
54
55
pub trait LinkStorage: Send + Sync {
+162
-3
constellation/src/storage/rocks_store.rs
+162
-3
constellation/src/storage/rocks_store.rs
···
23
Arc,
24
};
25
use std::thread;
26
-
use std::time::{Duration, Instant};
27
use tokio_util::sync::CancellationToken;
28
29
static DID_IDS_CF: &str = "did_ids";
···
32
static LINK_TARGETS_CF: &str = "link_targets";
33
34
static JETSTREAM_CURSOR_KEY: &str = "jetstream_cursor";
35
36
// todo: actually understand and set these options probably better
37
fn rocks_opts_base() -> Options {
···
139
_key_marker: PhantomData,
140
_val_marker: PhantomData,
141
name: name.into(),
142
-
id_seq: Arc::new(AtomicU64::new(0)), // zero is "uninint", first seq num will be 1
143
}
144
}
145
fn get_id_val(
···
228
}
229
}
230
231
impl RocksStorage {
232
pub fn new(path: impl AsRef<Path>) -> Result<Self> {
233
Self::describe_metrics();
234
-
RocksStorage::open_readmode(path, false)
235
}
236
237
pub fn open_readonly(path: impl AsRef<Path>) -> Result<Self> {
···
242
let did_id_table = IdTable::setup(DID_IDS_CF);
243
let target_id_table = IdTable::setup(TARGET_IDS_CF);
244
245
let cfs = vec![
246
// id reference tables
247
did_id_table.cf_descriptor(),
···
275
is_writer: !readonly,
276
backup_task: None.into(),
277
})
278
}
279
280
pub fn start_backup(
···
1179
.map(|s| s.parse::<u64>())
1180
.transpose()?
1181
.unwrap_or(0);
1182
Ok(StorageStats {
1183
dids,
1184
targetables,
1185
linking_records,
1186
})
1187
}
1188
}
···
1208
impl AsRocksValue for &TargetId {}
1209
impl KeyFromRocks for TargetKey {}
1210
impl ValueFromRocks for TargetId {}
1211
1212
// target_links table
1213
impl AsRocksKey for &TargetId {}
···
23
Arc,
24
};
25
use std::thread;
26
+
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
27
use tokio_util::sync::CancellationToken;
28
29
static DID_IDS_CF: &str = "did_ids";
···
32
static LINK_TARGETS_CF: &str = "link_targets";
33
34
static JETSTREAM_CURSOR_KEY: &str = "jetstream_cursor";
35
+
static STARTED_AT_KEY: &str = "jetstream_first_cursor";
36
+
// add reverse mappings for targets if this db was running before that was a thing
37
+
static TARGET_ID_REPAIR_STATE_KEY: &str = "target_id_table_repair_state";
38
+
39
+
static COZY_FIRST_CURSOR: u64 = 1_738_083_600_000_000; // constellation.microcosm.blue started
40
+
41
+
/// Persisted progress marker for the one-off "repair" pass that back-fills
/// reverse (id -> key) entries in the target id table (see `run_repair`).
///
/// Stored under `TARGET_ID_REPAIR_STATE_KEY` in the default column family.
/// NOTE(review): values round-trip through `_rv`/`_vr` (serde), so field
/// order is presumably significant to the stored encoding -- confirm the
/// codec before reordering fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct TargetIdRepairState {
    /// start time for repair, microseconds timestamp
    current_us_started_at: u64,
    /// id table's latest id when repair started
    id_when_started: u64,
    /// how many id-table entries the repair pass has already walked past;
    /// used to fast-forward the iterator when resuming
    latest_repaired_i: u64,
}
impl AsRocksValue for TargetIdRepairState {}
impl ValueFromRocks for TargetIdRepairState {}
52
53
// todo: actually understand and set these options probably better
54
fn rocks_opts_base() -> Options {
···
156
_key_marker: PhantomData,
157
_val_marker: PhantomData,
158
name: name.into(),
159
+
id_seq: Arc::new(AtomicU64::new(0)), // zero is "uninit", first seq num will be 1
160
}
161
}
162
fn get_id_val(
···
245
}
246
}
247
248
+
/// Current wall-clock time as microseconds since the unix epoch.
///
/// Panics if the system clock reports a time earlier than the epoch.
/// The `u128 -> u64` narrowing is safe for any realistic timestamp.
fn now() -> u64 {
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
    since_epoch.as_micros() as u64
}
254
+
255
impl RocksStorage {
256
/// Open (or create) a writable store at `path` and run one-time global
/// setup (metrics registration + `global_init`).
pub fn new(path: impl AsRef<Path>) -> Result<Self> {
    Self::describe_metrics();
    let storage = Self::open_readmode(path, false)?;
    storage.global_init()?;
    Ok(storage)
}
262
263
pub fn open_readonly(path: impl AsRef<Path>) -> Result<Self> {
···
268
let did_id_table = IdTable::setup(DID_IDS_CF);
269
let target_id_table = IdTable::setup(TARGET_IDS_CF);
270
271
+
// note: global stuff like jetstream cursor goes in the default cf
272
+
// these are bonus extra cfs
273
let cfs = vec![
274
// id reference tables
275
did_id_table.cf_descriptor(),
···
303
is_writer: !readonly,
304
backup_task: None.into(),
305
})
306
+
}
307
+
308
+
fn global_init(&self) -> Result<()> {
309
+
let first_run = self.db.get(JETSTREAM_CURSOR_KEY)?.is_some();
310
+
if first_run {
311
+
self.db.put(STARTED_AT_KEY, _rv(now()))?;
312
+
313
+
// hack / temporary: if we're a new db, put in a completed repair
314
+
// state so we don't run repairs (repairs are for old-code dbs)
315
+
let completed = TargetIdRepairState {
316
+
id_when_started: 0,
317
+
current_us_started_at: 0,
318
+
latest_repaired_i: 0,
319
+
};
320
+
self.db.put(TARGET_ID_REPAIR_STATE_KEY, _rv(completed))?;
321
+
}
322
+
Ok(())
323
+
}
324
+
325
+
/// One-off maintenance pass: walk the target id table and write the reverse
/// mapping (numeric id, big-endian bytes -> TargetKey) for every forward
/// (TargetKey -> TargetId) entry. Needed for dbs created before reverse
/// mappings existed.
///
/// Progress is checkpointed to TARGET_ID_REPAIR_STATE_KEY after every batch
/// so the pass can resume after a restart. `breather` is slept after each
/// batch write to throttle the background load; `stay_alive` cancellation
/// stops the loop between batches.
///
/// NOTE(review): currently always returns Ok(false), including when the
/// iterator is exhausted -- presumably "false = not known complete";
/// confirm intended return semantics.
pub fn run_repair(&self, breather: Duration, stay_alive: CancellationToken) -> Result<bool> {
    // Resume from the persisted checkpoint, or start a fresh state.
    // NOTE(review): this snapshots the *did* id table's sequence while
    // repairing the *target* id table -- looks like it should be
    // target_id_table; confirm. (id_when_started is only recorded, never
    // used by the loop below.)
    let mut state = match self
        .db
        .get(TARGET_ID_REPAIR_STATE_KEY)?
        .map(|s| _vr(&s))
        .transpose()?
    {
        Some(s) => s,
        None => TargetIdRepairState {
            id_when_started: self.did_id_table.priv_id_seq,
            current_us_started_at: now(),
            latest_repaired_i: 0,
        },
    };

    eprintln!("initial repair state: {state:?}");

    let cf = self.db.cf_handle(TARGET_IDS_CF).unwrap();

    let mut iter = self.db.raw_iterator_cf(&cf);
    iter.seek_to_first();

    eprintln!("repair iterator sent to first key");

    // skip ahead if we're done some, or take a single first step
    for _ in 0..state.latest_repaired_i {
        iter.next();
    }

    eprintln!(
        "repair iterator skipped to {}th key",
        state.latest_repaired_i
    );

    let mut maybe_done = false;

    // Outer loop: one checkpointed batch per iteration.
    while !stay_alive.is_cancelled() && !maybe_done {
        // let mut batch = WriteBatch::default();

        let mut any_written = false;

        // Inner loop: process up to 1000 entries per checkpoint.
        for _ in 0..1000 {
            // Coarse progress logging roughly every millionth entry.
            if state.latest_repaired_i % 1_000_000 == 0 {
                eprintln!("target iter at {}", state.latest_repaired_i);
            }
            // Counted up-front so the resume skip above matches the number
            // of iter.next() calls taken, whether the entry is repaired or
            // skipped.
            state.latest_repaired_i += 1;

            if !iter.valid() {
                eprintln!("invalid iter, are we done repairing?");
                maybe_done = true;
                break;
            };

            // eprintln!("iterator seems to be valid! getting the key...");
            let raw_key = iter.key().unwrap();
            // Reverse entries written below use an 8-byte big-endian id as
            // their key, so 8-byte keys are (presumably) already-repaired
            // reverse mappings -- skip them rather than decode as TargetKey.
            if raw_key.len() == 8 {
                // eprintln!("found an 8-byte key, skipping it since it's probably an id...");
                iter.next();
                continue;
            }
            let target: TargetKey = _kr::<TargetKey>(raw_key)?;
            let target_id: TargetId = _vr(iter.value().unwrap())?;

            // Write the reverse mapping: id (big-endian bytes) -> key.
            self.db
                .put_cf(&cf, target_id.id().to_be_bytes(), _rv(&target))?;
            any_written = true;
            iter.next();
        }

        // Checkpoint progress and throttle only when this batch actually
        // wrote something.
        if any_written {
            self.db
                .put(TARGET_ID_REPAIR_STATE_KEY, _rv(state.clone()))?;
            std::thread::sleep(breather);
        }
    }

    eprintln!("repair iterator done.");

    Ok(false)
}
405
406
pub fn start_backup(
···
1305
.map(|s| s.parse::<u64>())
1306
.transpose()?
1307
.unwrap_or(0);
1308
+
let started_at = self
1309
+
.db
1310
+
.get(STARTED_AT_KEY)?
1311
+
.map(|c| _vr(&c))
1312
+
.transpose()?
1313
+
.unwrap_or(COZY_FIRST_CURSOR);
1314
+
1315
+
let other_data = self
1316
+
.db
1317
+
.get(TARGET_ID_REPAIR_STATE_KEY)?
1318
+
.map(|s| _vr(&s))
1319
+
.transpose()?
1320
+
.map(
1321
+
|TargetIdRepairState {
1322
+
current_us_started_at,
1323
+
id_when_started,
1324
+
latest_repaired_i,
1325
+
}| {
1326
+
HashMap::from([
1327
+
("current_us_started_at".to_string(), current_us_started_at),
1328
+
("id_when_started".to_string(), id_when_started),
1329
+
("latest_repaired_i".to_string(), latest_repaired_i),
1330
+
])
1331
+
},
1332
+
)
1333
+
.unwrap_or(HashMap::default());
1334
+
1335
Ok(StorageStats {
1336
dids,
1337
targetables,
1338
linking_records,
1339
+
started_at: Some(started_at),
1340
+
other_data,
1341
})
1342
}
1343
}
···
1363
impl AsRocksValue for &TargetId {}
1364
impl KeyFromRocks for TargetKey {}
1365
impl ValueFromRocks for TargetId {}
1366
+
1367
+
// temp?
1368
+
impl KeyFromRocks for TargetId {}
1369
+
impl AsRocksValue for &TargetKey {}
1370
1371
// target_links table
1372
impl AsRocksKey for &TargetId {}
+12
-2
constellation/templates/hello.html.j2
+12
-2
constellation/templates/hello.html.j2
···
19
<p>It works by recursively walking <em>all</em> records coming through the firehose, searching for anything that looks like a link. Links are indexed by the target they point at, the collection the record came from, and the JSON path to the link in that record.</p>
20
21
<p>
22
-
This server has indexed <span class="stat">{{ stats.linking_records|human_number }}</span> links between <span class="stat">{{ stats.targetables|human_number }}</span> targets and sources from <span class="stat">{{ stats.dids|human_number }}</span> identities over <span class="stat">{{ days_indexed|human_number }}</span> days.<br/>
23
<small>(indexing new records in real time, backfill coming soon!)</small>
24
</p>
25
26
-
<p>But feel free to use it! If you want to be nice, put your project name and bsky username (or email) in your user-agent header for api requests.</p>
27
28
29
<h2>API Endpoints</h2>
···
19
<p>It works by recursively walking <em>all</em> records coming through the firehose, searching for anything that looks like a link. Links are indexed by the target they point at, the collection the record came from, and the JSON path to the link in that record.</p>
20
21
<p>
22
+
This server has indexed <span class="stat">{{ stats.linking_records|human_number }}</span> links between <span class="stat">{{ stats.targetables|human_number }}</span> targets and sources from <span class="stat">{{ stats.dids|human_number }}</span> identities over <span class="stat">
23
+
{%- if let Some(days) = days_indexed %}
24
+
{{ days|human_number }}
25
+
{% else %}
26
+
???
27
+
{% endif -%}
28
+
</span> days.<br/>
29
<small>(indexing new records in real time, backfill coming soon!)</small>
30
</p>
31
32
+
{# {% for k, v in stats.other_data.iter() %}
33
+
<p><strong>{{ k }}</strong>: {{ v }}</p>
34
+
{% endfor %} #}
35
+
36
+
<p>You're welcome to use this public instance! Please do not build the torment nexus. If you want to be nice, put your project name and bsky username (or email) in your user-agent header for api requests.</p>
37
38
39
<h2>API Endpoints</h2>