+11
-11
Cargo.lock
+11
-11
Cargo.lock
···
992
992
993
993
[[package]]
994
994
name = "clap"
995
-
version = "4.5.47"
995
+
version = "4.5.48"
996
996
source = "registry+https://github.com/rust-lang/crates.io-index"
997
-
checksum = "7eac00902d9d136acd712710d71823fb8ac8004ca445a89e73a41d45aa712931"
997
+
checksum = "e2134bb3ea021b78629caa971416385309e0131b351b25e01dc16fb54e1b5fae"
998
998
dependencies = [
999
999
"clap_builder",
1000
1000
"clap_derive",
···
1002
1002
1003
1003
[[package]]
1004
1004
name = "clap_builder"
1005
-
version = "4.5.47"
1005
+
version = "4.5.48"
1006
1006
source = "registry+https://github.com/rust-lang/crates.io-index"
1007
-
checksum = "2ad9bbf750e73b5884fb8a211a9424a1906c1e156724260fdae972f31d70e1d6"
1007
+
checksum = "c2ba64afa3c0a6df7fa517765e31314e983f51dda798ffba27b988194fb65dc9"
1008
1008
dependencies = [
1009
1009
"anstream",
1010
1010
"anstyle",
···
1375
1375
checksum = "18e4fdb82bd54a12e42fb58a800dcae6b9e13982238ce2296dc3570b92148e1f"
1376
1376
dependencies = [
1377
1377
"data-encoding",
1378
-
"syn 1.0.109",
1378
+
"syn 2.0.106",
1379
1379
]
1380
1380
1381
1381
[[package]]
···
1796
1796
[[package]]
1797
1797
name = "fjall"
1798
1798
version = "2.11.2"
1799
-
source = "git+https://github.com/fjall-rs/fjall.git#42d811f7c8cc9004407d520d37d2a1d8d246c03d"
1799
+
source = "git+https://github.com/fjall-rs/fjall.git?rev=fb229572bb7d1d6966a596994dc1708e47ec57d8#fb229572bb7d1d6966a596994dc1708e47ec57d8"
1800
1800
dependencies = [
1801
1801
"byteorder",
1802
1802
"byteview",
···
3045
3045
checksum = "fc2f4eb4bc735547cfed7c0a4922cbd04a4655978c09b54f1f7b228750664c34"
3046
3046
dependencies = [
3047
3047
"cfg-if",
3048
-
"windows-targets 0.48.5",
3048
+
"windows-targets 0.52.6",
3049
3049
]
3050
3050
3051
3051
[[package]]
···
4539
4539
4540
4540
[[package]]
4541
4541
name = "reqwest"
4542
-
version = "0.12.22"
4542
+
version = "0.12.23"
4543
4543
source = "registry+https://github.com/rust-lang/crates.io-index"
4544
-
checksum = "cbc931937e6ca3a06e3b6c0aa7841849b160a90351d6ab467a8b9b9959767531"
4544
+
checksum = "d429f34c8092b2d42c7c93cec323bb4adeb7c67698f70839adec842ec10c7ceb"
4545
4545
dependencies = [
4546
4546
"async-compression",
4547
4547
"base64 0.22.1",
···
6049
6049
"clap",
6050
6050
"dropshot",
6051
6051
"env_logger",
6052
-
"fjall 2.11.2 (git+https://github.com/fjall-rs/fjall.git)",
6052
+
"fjall 2.11.2 (git+https://github.com/fjall-rs/fjall.git?rev=fb229572bb7d1d6966a596994dc1708e47ec57d8)",
6053
6053
"getrandom 0.3.3",
6054
6054
"http",
6055
6055
"jetstream",
···
6440
6440
source = "registry+https://github.com/rust-lang/crates.io-index"
6441
6441
checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb"
6442
6442
dependencies = [
6443
-
"windows-sys 0.48.0",
6443
+
"windows-sys 0.59.0",
6444
6444
]
6445
6445
6446
6446
[[package]]
+48
-13
constellation/src/bin/main.rs
+48
-13
constellation/src/bin/main.rs
···
26
26
#[arg(long)]
27
27
#[clap(default_value = "0.0.0.0:6789")]
28
28
bind: SocketAddr,
29
+
/// enable the metrics server and the storage-stats monitor thread (off unless this flag is passed)
30
+
#[arg(long)]
31
+
#[clap(default_value_t = false)]
32
+
collect_metrics: bool,
29
33
/// metrics server's listen address
30
34
#[arg(long)]
31
35
#[clap(default_value = "0.0.0.0:8765")]
···
54
58
/// Saved jsonl from jetstream to use instead of a live subscription
55
59
#[arg(short, long)]
56
60
fixture: Option<PathBuf>,
61
+
/// run a scan across the target id table and write all key -> ids to id -> keys
62
+
#[arg(long, action)]
63
+
repair_target_ids: bool,
57
64
}
58
65
59
66
#[derive(Debug, Clone, ValueEnum)]
···
89
96
let bind = args.bind;
90
97
let metrics_bind = args.bind_metrics;
91
98
99
+
let collect_metrics = args.collect_metrics;
92
100
let stay_alive = CancellationToken::new();
93
101
94
102
match args.backend {
···
99
107
stream,
100
108
bind,
101
109
metrics_bind,
110
+
collect_metrics,
102
111
stay_alive,
103
112
),
104
113
#[cfg(feature = "rocks")]
···
115
124
rocks.start_backup(backup_dir, auto_backup, stay_alive.clone())?;
116
125
}
117
126
println!("rocks ready.");
118
-
run(
119
-
rocks,
120
-
fixture,
121
-
args.data,
122
-
stream,
123
-
bind,
124
-
metrics_bind,
125
-
stay_alive,
126
-
)
127
+
std::thread::scope(|s| {
128
+
if args.repair_target_ids {
129
+
let rocks = rocks.clone();
130
+
let stay_alive = stay_alive.clone();
131
+
s.spawn(move || {
132
+
let rep = rocks.run_repair(time::Duration::from_millis(0), stay_alive);
133
+
eprintln!("repair finished: {rep:?}");
134
+
rep
135
+
});
136
+
}
137
+
s.spawn(|| {
138
+
let r = run(
139
+
rocks,
140
+
fixture,
141
+
args.data,
142
+
stream,
143
+
bind,
144
+
metrics_bind,
145
+
collect_metrics,
146
+
stay_alive,
147
+
);
148
+
eprintln!("run finished: {r:?}");
149
+
r
150
+
});
151
+
});
152
+
Ok(())
127
153
}
128
154
}
129
155
}
130
156
157
+
#[allow(clippy::too_many_lines)]
158
+
#[allow(clippy::too_many_arguments)]
131
159
fn run(
132
160
mut storage: impl LinkStorage,
133
161
fixture: Option<PathBuf>,
···
135
163
stream: String,
136
164
bind: SocketAddr,
137
165
metrics_bind: SocketAddr,
166
+
collect_metrics: bool,
138
167
stay_alive: CancellationToken,
139
168
) -> Result<()> {
140
169
ctrlc::set_handler({
···
179
208
.build()
180
209
.expect("axum startup")
181
210
.block_on(async {
182
-
install_metrics_server(metrics_bind)?;
211
+
// Install metrics server only if requested
212
+
if collect_metrics {
213
+
install_metrics_server(metrics_bind)?;
214
+
}
183
215
serve(readable, bind, staying_alive).await
184
216
})
185
217
.unwrap();
···
187
219
}
188
220
});
189
221
190
-
s.spawn(move || { // monitor thread
222
+
// only spawn monitoring thread if the metrics server is running
223
+
if collect_metrics {
224
+
s.spawn(move || { // monitor thread
191
225
let stay_alive = stay_alive.clone();
192
226
let check_alive = stay_alive.clone();
193
227
···
213
247
214
248
'monitor: loop {
215
249
match readable.get_stats() {
216
-
Ok(StorageStats { dids, targetables, linking_records }) => {
250
+
Ok(StorageStats { dids, targetables, linking_records, .. }) => {
217
251
metrics::gauge!("storage.stats.dids").set(dids as f64);
218
252
metrics::gauge!("storage.stats.targetables").set(targetables as f64);
219
253
metrics::gauge!("storage.stats.linking_records").set(linking_records as f64);
···
239
273
}
240
274
}
241
275
stay_alive.drop_guard();
242
-
});
276
+
});
277
+
}
243
278
});
244
279
245
280
println!("byeeee");
+13
-6
constellation/src/consumer/jetstream.rs
+13
-6
constellation/src/consumer/jetstream.rs
···
226
226
println!("jetstream closed the websocket cleanly.");
227
227
break;
228
228
}
229
-
r => eprintln!("jetstream: close result after error: {r:?}"),
229
+
Err(_) => {
230
+
counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "dirty close").increment(1);
231
+
println!("jetstream failed to close the websocket cleanly.");
232
+
break;
233
+
}
234
+
Ok(r) => {
235
+
eprintln!("jetstream: close result after error: {r:?}");
236
+
counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "read error")
237
+
.increment(1);
238
+
// if we didn't immediately get ConnectionClosed, we should keep polling read
239
+
// until we get it.
240
+
continue;
241
+
}
230
242
}
231
-
counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "read error")
232
-
.increment(1);
233
-
// if we didn't immediately get ConnectionClosed, we should keep polling read
234
-
// until we get it.
235
-
continue;
236
243
}
237
244
};
238
245
+8
-6
constellation/src/server/filters.rs
+8
-6
constellation/src/server/filters.rs
···
5
5
Ok({
6
6
if let Some(link) = parse_any_link(s) {
7
7
match link {
8
-
Link::AtUri(at_uri) => at_uri.strip_prefix("at://").map(|noproto| {
9
-
format!("https://atproto-browser-plus-links.vercel.app/at/{noproto}")
10
-
}),
11
-
Link::Did(did) => Some(format!(
12
-
"https://atproto-browser-plus-links.vercel.app/at/{did}"
13
-
)),
8
+
Link::AtUri(at_uri) => at_uri
9
+
.strip_prefix("at://")
10
+
.map(|noproto| format!("https://pdsls.dev/at://{noproto}")),
11
+
Link::Did(did) => Some(format!("https://pdsls.dev/at://{did}")),
14
12
Link::Uri(uri) => Some(uri),
15
13
}
16
14
} else {
···
22
20
pub fn human_number(n: &u64) -> askama::Result<String> {
23
21
Ok(n.to_formatted_string(&Locale::en))
24
22
}
23
+
24
+
pub fn to_u64(n: usize) -> askama::Result<u64> {
25
+
Ok(n as u64)
26
+
}
+291
-20
constellation/src/server/mod.rs
+291
-20
constellation/src/server/mod.rs
···
14
14
use std::collections::{HashMap, HashSet};
15
15
use std::time::{Duration, UNIX_EPOCH};
16
16
use tokio::net::{TcpListener, ToSocketAddrs};
17
-
use tokio::task::block_in_place;
17
+
use tokio::task::spawn_blocking;
18
18
use tokio_util::sync::CancellationToken;
19
19
20
20
use crate::storage::{LinkReader, StorageStats};
···
25
25
26
26
use acceptable::{acceptable, ExtractAccept};
27
27
28
-
const DEFAULT_CURSOR_LIMIT: u64 = 16;
29
-
const DEFAULT_CURSOR_LIMIT_MAX: u64 = 100;
28
+
const DEFAULT_CURSOR_LIMIT: u64 = 100;
29
+
const DEFAULT_CURSOR_LIMIT_MAX: u64 = 1000;
30
30
31
-
const INDEX_BEGAN_AT_TS: u64 = 1738083600; // TODO: not this
31
+
fn get_default_cursor_limit() -> u64 {
32
+
DEFAULT_CURSOR_LIMIT
33
+
}
34
+
35
+
fn to500(e: tokio::task::JoinError) -> http::StatusCode {
36
+
eprintln!("handler error: {e}");
37
+
http::StatusCode::INTERNAL_SERVER_ERROR
38
+
}
32
39
33
40
pub async fn serve<S, A>(store: S, addr: A, stay_alive: CancellationToken) -> anyhow::Result<()>
34
41
where
···
41
48
"/",
42
49
get({
43
50
let store = store.clone();
44
-
move |accept| async { block_in_place(|| hello(accept, store)) }
51
+
move |accept| async {
52
+
spawn_blocking(|| hello(accept, store))
53
+
.await
54
+
.map_err(to500)?
55
+
}
56
+
}),
57
+
)
58
+
.route(
59
+
"/xrpc/blue.microcosm.links.getManyToManyCounts",
60
+
get({
61
+
let store = store.clone();
62
+
move |accept, query| async {
63
+
spawn_blocking(|| get_many_to_many_counts(accept, query, store))
64
+
.await
65
+
.map_err(to500)?
66
+
}
45
67
}),
46
68
)
47
69
.route(
48
70
"/links/count",
49
71
get({
50
72
let store = store.clone();
51
-
move |accept, query| async { block_in_place(|| count_links(accept, query, store)) }
73
+
move |accept, query| async {
74
+
spawn_blocking(|| count_links(accept, query, store))
75
+
.await
76
+
.map_err(to500)?
77
+
}
52
78
}),
53
79
)
54
80
.route(
···
56
82
get({
57
83
let store = store.clone();
58
84
move |accept, query| async {
59
-
block_in_place(|| count_distinct_dids(accept, query, store))
85
+
spawn_blocking(|| count_distinct_dids(accept, query, store))
86
+
.await
87
+
.map_err(to500)?
88
+
}
89
+
}),
90
+
)
91
+
.route(
92
+
"/xrpc/blue.microcosm.links.getBacklinks",
93
+
get({
94
+
let store = store.clone();
95
+
move |accept, query| async {
96
+
spawn_blocking(|| get_backlinks(accept, query, store))
97
+
.await
98
+
.map_err(to500)?
60
99
}
61
100
}),
62
101
)
···
64
103
"/links",
65
104
get({
66
105
let store = store.clone();
67
-
move |accept, query| async { block_in_place(|| get_links(accept, query, store)) }
106
+
move |accept, query| async {
107
+
spawn_blocking(|| get_links(accept, query, store))
108
+
.await
109
+
.map_err(to500)?
110
+
}
68
111
}),
69
112
)
70
113
.route(
···
72
115
get({
73
116
let store = store.clone();
74
117
move |accept, query| async {
75
-
block_in_place(|| get_distinct_dids(accept, query, store))
118
+
spawn_blocking(|| get_distinct_dids(accept, query, store))
119
+
.await
120
+
.map_err(to500)?
76
121
}
77
122
}),
78
123
)
···
82
127
get({
83
128
let store = store.clone();
84
129
move |accept, query| async {
85
-
block_in_place(|| count_all_links(accept, query, store))
130
+
spawn_blocking(|| count_all_links(accept, query, store))
131
+
.await
132
+
.map_err(to500)?
86
133
}
87
134
}),
88
135
)
···
91
138
get({
92
139
let store = store.clone();
93
140
move |accept, query| async {
94
-
block_in_place(|| explore_links(accept, query, store))
141
+
spawn_blocking(|| explore_links(accept, query, store))
142
+
.await
143
+
.map_err(to500)?
95
144
}
96
145
}),
97
146
)
···
150
199
#[template(path = "hello.html.j2")]
151
200
struct HelloReponse {
152
201
help: &'static str,
153
-
days_indexed: u64,
202
+
days_indexed: Option<u64>,
154
203
stats: StorageStats,
155
204
}
156
205
fn hello(
···
160
209
let stats = store
161
210
.get_stats()
162
211
.map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?;
163
-
let days_indexed = (UNIX_EPOCH + Duration::from_secs(INDEX_BEGAN_AT_TS))
164
-
.elapsed()
212
+
let days_indexed = stats
213
+
.started_at
214
+
.map(|c| (UNIX_EPOCH + Duration::from_micros(c)).elapsed())
215
+
.transpose()
165
216
.map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?
166
-
.as_secs()
167
-
/ 86400;
217
+
.map(|d| d.as_secs() / 86_400);
168
218
Ok(acceptable(accept, HelloReponse {
169
219
help: "open this URL in a web browser (or request with Accept: text/html) for information about this API.",
170
220
days_indexed,
···
173
223
}
174
224
175
225
#[derive(Clone, Deserialize)]
226
+
#[serde(rename_all = "camelCase")]
227
+
struct GetManyToManyCountsQuery {
228
+
subject: String,
229
+
source: String,
230
+
/// path to the secondary link in the linking record
231
+
path_to_other: String,
232
+
/// filter to linking records (join of the m2m) by these DIDs
233
+
#[serde(default)]
234
+
did: Vec<String>,
235
+
/// filter to specific secondary records
236
+
#[serde(default)]
237
+
other_subject: Vec<String>,
238
+
cursor: Option<OpaqueApiCursor>,
239
+
/// Set the max number of links to return per page of results
240
+
#[serde(default = "get_default_cursor_limit")]
241
+
limit: u64,
242
+
}
243
+
#[derive(Serialize)]
244
+
struct OtherSubjectCount {
245
+
subject: String,
246
+
total: u64,
247
+
distinct: u64,
248
+
}
249
+
#[derive(Template, Serialize)]
250
+
#[template(path = "get-many-to-many-counts.html.j2")]
251
+
struct GetManyToManyCountsResponse {
252
+
counts_by_other_subject: Vec<OtherSubjectCount>,
253
+
cursor: Option<OpaqueApiCursor>,
254
+
#[serde(skip_serializing)]
255
+
query: GetManyToManyCountsQuery,
256
+
}
257
+
fn get_many_to_many_counts(
258
+
accept: ExtractAccept,
259
+
query: axum_extra::extract::Query<GetManyToManyCountsQuery>,
260
+
store: impl LinkReader,
261
+
) -> Result<impl IntoResponse, http::StatusCode> {
262
+
let cursor_key = query
263
+
.cursor
264
+
.clone()
265
+
.map(|oc| ApiKeyedCursor::try_from(oc).map_err(|_| http::StatusCode::BAD_REQUEST))
266
+
.transpose()?
267
+
.map(|c| c.next);
268
+
269
+
let limit = query.limit;
270
+
if limit > DEFAULT_CURSOR_LIMIT_MAX {
271
+
return Err(http::StatusCode::BAD_REQUEST);
272
+
}
273
+
274
+
let filter_dids: HashSet<Did> = HashSet::from_iter(
275
+
query
276
+
.did
277
+
.iter()
278
+
.map(|d| d.trim())
279
+
.filter(|d| !d.is_empty())
280
+
.map(|d| Did(d.to_string())),
281
+
);
282
+
283
+
let filter_other_subjects: HashSet<String> = HashSet::from_iter(
284
+
query
285
+
.other_subject
286
+
.iter()
287
+
.map(|s| s.trim().to_string())
288
+
.filter(|s| !s.is_empty()),
289
+
);
290
+
291
+
let Some((collection, path)) = query.source.split_once(':') else {
292
+
return Err(http::StatusCode::BAD_REQUEST);
293
+
};
294
+
let path = format!(".{path}");
295
+
296
+
let path_to_other = format!(".{}", query.path_to_other);
297
+
298
+
let paged = store
299
+
.get_many_to_many_counts(
300
+
&query.subject,
301
+
collection,
302
+
&path,
303
+
&path_to_other,
304
+
limit,
305
+
cursor_key,
306
+
&filter_dids,
307
+
&filter_other_subjects,
308
+
)
309
+
.map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?;
310
+
311
+
let cursor = paged.next.map(|next| ApiKeyedCursor { next }.into());
312
+
313
+
let items = paged
314
+
.items
315
+
.into_iter()
316
+
.map(|(subject, total, distinct)| OtherSubjectCount {
317
+
subject,
318
+
total,
319
+
distinct,
320
+
})
321
+
.collect();
322
+
323
+
Ok(acceptable(
324
+
accept,
325
+
GetManyToManyCountsResponse {
326
+
counts_by_other_subject: items,
327
+
cursor,
328
+
query: (*query).clone(),
329
+
},
330
+
))
331
+
}
332
+
333
+
#[derive(Clone, Deserialize)]
176
334
struct GetLinksCountQuery {
177
335
target: String,
178
336
collection: String,
···
233
391
}
234
392
235
393
#[derive(Clone, Deserialize)]
394
+
struct GetBacklinksQuery {
395
+
/// The link target
396
+
///
397
+
/// can be an AT-URI, plain DID, or regular URI
398
+
subject: String,
399
+
/// Filter links only from this link source
400
+
///
401
+
/// eg.: `app.bsky.feed.like:subject.uri`
402
+
source: String,
403
+
cursor: Option<OpaqueApiCursor>,
404
+
/// Filter links only from these DIDs
405
+
///
406
+
/// include multiple times to filter by multiple source DIDs
407
+
#[serde(default)]
408
+
did: Vec<String>,
409
+
/// Set the max number of links to return per page of results
410
+
#[serde(default = "get_default_cursor_limit")]
411
+
limit: u64,
412
+
// TODO: allow reverse (er, forward) order as well
413
+
}
414
+
#[derive(Template, Serialize)]
415
+
#[template(path = "get-backlinks.html.j2")]
416
+
struct GetBacklinksResponse {
417
+
total: u64,
418
+
records: Vec<RecordId>,
419
+
cursor: Option<OpaqueApiCursor>,
420
+
#[serde(skip_serializing)]
421
+
query: GetBacklinksQuery,
422
+
#[serde(skip_serializing)]
423
+
collection: String,
424
+
#[serde(skip_serializing)]
425
+
path: String,
426
+
}
427
+
fn get_backlinks(
428
+
accept: ExtractAccept,
429
+
query: axum_extra::extract::Query<GetBacklinksQuery>, // supports multiple param occurrences
430
+
store: impl LinkReader,
431
+
) -> Result<impl IntoResponse, http::StatusCode> {
432
+
let until = query
433
+
.cursor
434
+
.clone()
435
+
.map(|oc| ApiCursor::try_from(oc).map_err(|_| http::StatusCode::BAD_REQUEST))
436
+
.transpose()?
437
+
.map(|c| c.next);
438
+
439
+
let limit = query.limit;
440
+
if limit > DEFAULT_CURSOR_LIMIT_MAX {
441
+
return Err(http::StatusCode::BAD_REQUEST);
442
+
}
443
+
444
+
let filter_dids: HashSet<Did> = HashSet::from_iter(
445
+
query
446
+
.did
447
+
.iter()
448
+
.map(|d| d.trim())
449
+
.filter(|d| !d.is_empty())
450
+
.map(|d| Did(d.to_string())),
451
+
);
452
+
453
+
let Some((collection, path)) = query.source.split_once(':') else {
454
+
return Err(http::StatusCode::BAD_REQUEST);
455
+
};
456
+
let path = format!(".{path}");
457
+
458
+
let paged = store
459
+
.get_links(
460
+
&query.subject,
461
+
collection,
462
+
&path,
463
+
limit,
464
+
until,
465
+
&filter_dids,
466
+
)
467
+
.map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?;
468
+
469
+
let cursor = paged.next.map(|next| {
470
+
ApiCursor {
471
+
version: paged.version,
472
+
next,
473
+
}
474
+
.into()
475
+
});
476
+
477
+
Ok(acceptable(
478
+
accept,
479
+
GetBacklinksResponse {
480
+
total: paged.total,
481
+
records: paged.items,
482
+
cursor,
483
+
query: (*query).clone(),
484
+
collection: collection.to_string(),
485
+
path,
486
+
},
487
+
))
488
+
}
489
+
490
+
#[derive(Clone, Deserialize)]
236
491
struct GetLinkItemsQuery {
237
492
target: String,
238
493
collection: String,
···
251
506
///
252
507
/// deprecated: use `did`, which can be repeated multiple times
253
508
from_dids: Option<String>, // comma separated: gross
254
-
#[serde(default = "get_default_limit")]
509
+
#[serde(default = "get_default_cursor_limit")]
255
510
limit: u64,
256
511
// TODO: allow reverse (er, forward) order as well
257
-
}
258
-
fn get_default_limit() -> u64 {
259
-
DEFAULT_CURSOR_LIMIT
260
512
}
261
513
#[derive(Template, Serialize)]
262
514
#[template(path = "links.html.j2")]
···
475
727
OpaqueApiCursor(bincode::DefaultOptions::new().serialize(&item).unwrap())
476
728
}
477
729
}
730
+
731
+
#[derive(Serialize, Deserialize)] // for bincode
732
+
struct ApiKeyedCursor {
733
+
next: String, // the key
734
+
}
735
+
736
+
impl TryFrom<OpaqueApiCursor> for ApiKeyedCursor {
737
+
type Error = bincode::Error;
738
+
739
+
fn try_from(item: OpaqueApiCursor) -> Result<Self, Self::Error> {
740
+
bincode::DefaultOptions::new().deserialize(&item.0)
741
+
}
742
+
}
743
+
744
+
impl From<ApiKeyedCursor> for OpaqueApiCursor {
745
+
fn from(item: ApiKeyedCursor) -> Self {
746
+
OpaqueApiCursor(bincode::DefaultOptions::new().serialize(&item).unwrap())
747
+
}
748
+
}
+78
-1
constellation/src/storage/mem_store.rs
+78
-1
constellation/src/storage/mem_store.rs
···
1
-
use super::{LinkReader, LinkStorage, PagedAppendingCollection, StorageStats};
1
+
use super::{
2
+
LinkReader, LinkStorage, PagedAppendingCollection, PagedOrderedCollection, StorageStats,
3
+
};
2
4
use crate::{ActionableEvent, CountsByCount, Did, RecordId};
3
5
use anyhow::Result;
4
6
use links::CollectedLink;
···
132
134
}
133
135
134
136
impl LinkReader for MemStorage {
137
+
fn get_many_to_many_counts(
138
+
&self,
139
+
target: &str,
140
+
collection: &str,
141
+
path: &str,
142
+
path_to_other: &str,
143
+
limit: u64,
144
+
after: Option<String>,
145
+
filter_dids: &HashSet<Did>,
146
+
filter_to_targets: &HashSet<String>,
147
+
) -> Result<PagedOrderedCollection<(String, u64, u64), String>> {
148
+
let data = self.0.lock().unwrap();
149
+
let Some(paths) = data.targets.get(&Target::new(target)) else {
150
+
return Ok(PagedOrderedCollection::default());
151
+
};
152
+
let Some(linkers) = paths.get(&Source::new(collection, path)) else {
153
+
return Ok(PagedOrderedCollection::default());
154
+
};
155
+
156
+
let path_to_other = RecordPath::new(path_to_other);
157
+
let filter_to_targets: HashSet<Target> =
158
+
HashSet::from_iter(filter_to_targets.iter().map(|s| Target::new(s)));
159
+
160
+
let mut grouped_counts: HashMap<Target, (u64, HashSet<Did>)> = HashMap::new();
161
+
for (did, rkey) in linkers.iter().flatten().cloned() {
162
+
if !filter_dids.is_empty() && !filter_dids.contains(&did) {
163
+
continue;
164
+
}
165
+
if let Some(fwd_target) = data
166
+
.links
167
+
.get(&did)
168
+
.unwrap_or(&HashMap::new())
169
+
.get(&RepoId {
170
+
collection: collection.to_string(),
171
+
rkey,
172
+
})
173
+
.unwrap_or(&Vec::new())
174
+
.iter()
175
+
.filter_map(|(path, target)| {
176
+
if *path == path_to_other
177
+
&& (filter_to_targets.is_empty() || filter_to_targets.contains(target))
178
+
{
179
+
Some(target)
180
+
} else {
181
+
None
182
+
}
183
+
})
184
+
.take(1)
185
+
.next()
186
+
{
187
+
let e = grouped_counts.entry(fwd_target.clone()).or_default();
188
+
e.0 += 1;
189
+
e.1.insert(did.clone());
190
+
}
191
+
}
192
+
let mut items: Vec<(String, u64, u64)> = grouped_counts
193
+
.iter()
194
+
.map(|(k, (n, u))| (k.0.clone(), *n, u.len() as u64))
195
+
.collect();
196
+
items.sort();
197
+
items = items
198
+
.into_iter()
199
+
.skip_while(|(t, _, _)| after.as_ref().map(|a| t <= a).unwrap_or(false))
200
+
.take(limit as usize)
201
+
.collect();
202
+
let next = if items.len() as u64 >= limit {
203
+
items.last().map(|(t, _, _)| t.clone())
204
+
} else {
205
+
None
206
+
};
207
+
Ok(PagedOrderedCollection { items, next })
208
+
}
209
+
135
210
fn get_count(&self, target: &str, collection: &str, path: &str) -> Result<u64> {
136
211
let data = self.0.lock().unwrap();
137
212
let Some(paths) = data.targets.get(&Target::new(target)) else {
···
353
428
dids,
354
429
targetables,
355
430
linking_records,
431
+
started_at: None,
432
+
other_data: Default::default(),
356
433
})
357
434
}
358
435
}
+225
constellation/src/storage/mod.rs
+225
constellation/src/storage/mod.rs
···
19
19
pub total: u64,
20
20
}
21
21
22
+
/// A paged collection whose keys are sorted instead of indexed
///
/// this has weaker guarantees than PagedAppendingCollection: it might
/// not return a totally consistent snapshot across pages. but it should
/// avoid duplicates and each page should at least be internally consistent.
#[derive(Debug, PartialEq, Default)]
pub struct PagedOrderedCollection<T, K: Ord> {
    /// the items of this page
    pub items: Vec<T>,
    /// key to continue paging from, or `None` when no further page is indicated
    pub next: Option<K>,
}
32
+
22
33
#[derive(Debug, Deserialize, Serialize, PartialEq)]
23
34
pub struct StorageStats {
24
35
/// estimate of how many accounts we've seen create links. the _subjects_ of any links are not represented here.
···
33
44
/// records with multiple links are single-counted.
34
45
/// for LSM stores, deleted links don't decrement this, and updated records with any links will likely increment it.
35
46
pub linking_records: u64,
47
+
48
+
/// first jetstream cursor when this instance first started
49
+
pub started_at: Option<u64>,
50
+
51
+
/// anything else we want to throw in
52
+
pub other_data: HashMap<String, u64>,
36
53
}
37
54
38
55
pub trait LinkStorage: Send + Sync {
···
48
65
}
49
66
50
67
pub trait LinkReader: Clone + Send + Sync + 'static {
68
+
#[allow(clippy::too_many_arguments)]
69
+
fn get_many_to_many_counts(
70
+
&self,
71
+
target: &str,
72
+
collection: &str,
73
+
path: &str,
74
+
path_to_other: &str,
75
+
limit: u64,
76
+
after: Option<String>,
77
+
filter_dids: &HashSet<Did>,
78
+
filter_to_targets: &HashSet<String>,
79
+
) -> Result<PagedOrderedCollection<(String, u64, u64), String>>;
80
+
51
81
fn get_count(&self, target: &str, collection: &str, path: &str) -> Result<u64>;
52
82
53
83
fn get_distinct_did_count(&self, target: &str, collection: &str, path: &str) -> Result<u64>;
···
1326
1356
counts
1327
1357
});
1328
1358
assert_stats(storage.get_stats()?, 1..=1, 2..=2, 1..=1);
1359
+
});
1360
+
1361
+
//////// many-to-many /////////
1362
+
1363
+
test_each_storage!(get_m2m_counts_empty, |storage| {
    // nothing was pushed, so the lookup must yield an empty page without a cursor
    let page = storage.get_many_to_many_counts(
        "a.com",
        "a.b.c",
        ".d.e",
        ".f.g",
        10,
        None,
        &HashSet::new(),
        &HashSet::new(),
    )?;
    assert_eq!(
        page,
        PagedOrderedCollection {
            items: vec![],
            next: None,
        }
    );
});
1381
+
1382
+
test_each_storage!(get_m2m_counts_single, |storage| {
    // one record: links to a.com at .abc.uri, and to b.com at both .def.uri and .ghi.uri
    let links = vec![
        CollectedLink {
            target: Link::Uri("a.com".into()),
            path: ".abc.uri".into(),
        },
        CollectedLink {
            target: Link::Uri("b.com".into()),
            path: ".def.uri".into(),
        },
        CollectedLink {
            target: Link::Uri("b.com".into()),
            path: ".ghi.uri".into(),
        },
    ];
    let record_id = RecordId {
        did: "did:plc:asdf".into(),
        collection: "app.t.c".into(),
        rkey: "asdf".into(),
    };
    storage.push(&ActionableEvent::CreateLinks { record_id, links }, 0)?;

    // only the link at .def.uri is counted for the secondary target
    let page = storage.get_many_to_many_counts(
        "a.com",
        "app.t.c",
        ".abc.uri",
        ".def.uri",
        10,
        None,
        &HashSet::new(),
        &HashSet::new(),
    )?;
    assert_eq!(
        page,
        PagedOrderedCollection {
            items: vec![("b.com".to_string(), 1, 1)],
            next: None,
        }
    );
});
1424
+
1425
+
test_each_storage!(get_m2m_counts_filters, |storage| {
    // (did, rkey, secondary target at .def.uri, cursor); every record also
    // links to a.com at .abc.uri
    let fixtures = [
        ("did:plc:asdf", "asdf", "b.com", 0),
        ("did:plc:asdfasdf", "asdf", "b.com", 1),
        ("did:plc:fdsa", "asdf", "c.com", 2),
        ("did:plc:fdsa", "asdf2", "c.com", 3),
    ];
    for (did, rkey, other, cursor) in fixtures {
        storage.push(
            &ActionableEvent::CreateLinks {
                record_id: RecordId {
                    did: did.into(),
                    collection: "app.t.c".into(),
                    rkey: rkey.into(),
                },
                links: vec![
                    CollectedLink {
                        target: Link::Uri("a.com".into()),
                        path: ".abc.uri".into(),
                    },
                    CollectedLink {
                        target: Link::Uri(other.into()),
                        path: ".def.uri".into(),
                    },
                ],
            },
            cursor,
        )?;
    }

    // no filters: both secondary targets, c.com linked twice by a single DID
    let unfiltered = storage.get_many_to_many_counts(
        "a.com",
        "app.t.c",
        ".abc.uri",
        ".def.uri",
        10,
        None,
        &HashSet::new(),
        &HashSet::new(),
    )?;
    assert_eq!(
        unfiltered,
        PagedOrderedCollection {
            items: vec![("b.com".to_string(), 2, 2), ("c.com".to_string(), 2, 1),],
            next: None,
        }
    );

    // DID filter: only records from did:plc:fdsa remain
    let by_did = storage.get_many_to_many_counts(
        "a.com",
        "app.t.c",
        ".abc.uri",
        ".def.uri",
        10,
        None,
        &HashSet::from_iter([Did("did:plc:fdsa".to_string())]),
        &HashSet::new(),
    )?;
    assert_eq!(
        by_did,
        PagedOrderedCollection {
            items: vec![("c.com".to_string(), 2, 1),],
            next: None,
        }
    );

    // secondary-target filter: only b.com remains
    let by_other_subject = storage.get_many_to_many_counts(
        "a.com",
        "app.t.c",
        ".abc.uri",
        ".def.uri",
        10,
        None,
        &HashSet::new(),
        &HashSet::from_iter(["b.com".to_string()]),
    )?;
    assert_eq!(
        by_other_subject,
        PagedOrderedCollection {
            items: vec![("b.com".to_string(), 2, 2),],
            next: None,
        }
    );
});
1330
1555
}
+342
-40
constellation/src/storage/rocks_store.rs
+342
-40
constellation/src/storage/rocks_store.rs
···
1
-
use super::{ActionableEvent, LinkReader, LinkStorage, PagedAppendingCollection, StorageStats};
1
+
use super::{
2
+
ActionableEvent, LinkReader, LinkStorage, PagedAppendingCollection, PagedOrderedCollection,
3
+
StorageStats,
4
+
};
2
5
use crate::{CountsByCount, Did, RecordId};
3
6
use anyhow::{bail, Result};
4
7
use bincode::Options as BincodeOptions;
···
11
14
MultiThreaded, Options, PrefixRange, ReadOptions, WriteBatch,
12
15
};
13
16
use serde::{Deserialize, Serialize};
14
-
use std::collections::{HashMap, HashSet};
17
+
use std::collections::{BTreeMap, HashMap, HashSet};
15
18
use std::io::Read;
16
19
use std::marker::PhantomData;
17
20
use std::path::{Path, PathBuf};
···
20
23
Arc,
21
24
};
22
25
use std::thread;
23
-
use std::time::{Duration, Instant};
26
+
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
24
27
use tokio_util::sync::CancellationToken;
25
28
26
29
static DID_IDS_CF: &str = "did_ids";
···
29
32
static LINK_TARGETS_CF: &str = "link_targets";
30
33
31
34
static JETSTREAM_CURSOR_KEY: &str = "jetstream_cursor";
35
+
static STARTED_AT_KEY: &str = "jetstream_first_cursor";
36
+
// add reverse mappings for targets if this db was running before that was a thing
37
+
static TARGET_ID_REPAIR_STATE_KEY: &str = "target_id_table_repair_state";
38
+
39
+
static COZY_FIRST_CURSOR: u64 = 1_738_083_600_000_000; // constellation.microcosm.blue started
40
+
41
+
#[derive(Debug, Clone, Serialize, Deserialize)]
42
+
struct TargetIdRepairState {
43
+
/// start time for repair, microseconds timestamp
44
+
current_us_started_at: u64,
45
+
/// id table's latest id when repair started
46
+
id_when_started: u64,
47
+
/// id table id
48
+
latest_repaired_i: u64,
49
+
}
50
+
impl AsRocksValue for TargetIdRepairState {}
51
+
impl ValueFromRocks for TargetIdRepairState {}
32
52
33
53
// todo: actually understand and set these options probably better
34
54
fn rocks_opts_base() -> Options {
···
56
76
#[derive(Debug, Clone)]
57
77
pub struct RocksStorage {
58
78
pub db: Arc<DBWithThreadMode<MultiThreaded>>, // TODO: mov seqs here (concat merge op will be fun)
59
-
did_id_table: IdTable<Did, DidIdValue, true>,
60
-
target_id_table: IdTable<TargetKey, TargetId, false>,
79
+
did_id_table: IdTable<Did, DidIdValue>,
80
+
target_id_table: IdTable<TargetKey, TargetId>,
61
81
is_writer: bool,
62
82
backup_task: Arc<Option<thread::JoinHandle<Result<()>>>>,
63
83
}
···
85
105
fn cf_descriptor(&self) -> ColumnFamilyDescriptor {
86
106
ColumnFamilyDescriptor::new(&self.name, rocks_opts_base())
87
107
}
88
-
fn init<const WITH_REVERSE: bool>(
89
-
self,
90
-
db: &DBWithThreadMode<MultiThreaded>,
91
-
) -> Result<IdTable<Orig, IdVal, WITH_REVERSE>> {
108
+
fn init(self, db: &DBWithThreadMode<MultiThreaded>) -> Result<IdTable<Orig, IdVal>> {
92
109
if db.cf_handle(&self.name).is_none() {
93
110
bail!("failed to get cf handle from db -- was the db open with our .cf_descriptor()?");
94
111
}
···
119
136
}
120
137
}
121
138
#[derive(Debug, Clone)]
122
-
struct IdTable<Orig, IdVal: IdTableValue, const WITH_REVERSE: bool>
139
+
struct IdTable<Orig, IdVal: IdTableValue>
123
140
where
124
141
Orig: KeyFromRocks,
125
142
for<'a> &'a Orig: AsRocksKey,
···
127
144
base: IdTableBase<Orig, IdVal>,
128
145
priv_id_seq: u64,
129
146
}
130
-
impl<Orig: Clone, IdVal: IdTableValue, const WITH_REVERSE: bool> IdTable<Orig, IdVal, WITH_REVERSE>
147
+
impl<Orig: Clone, IdVal: IdTableValue> IdTable<Orig, IdVal>
131
148
where
132
149
Orig: KeyFromRocks,
133
150
for<'v> &'v IdVal: AsRocksValue,
···
139
156
_key_marker: PhantomData,
140
157
_val_marker: PhantomData,
141
158
name: name.into(),
142
-
id_seq: Arc::new(AtomicU64::new(0)), // zero is "uninint", first seq num will be 1
159
+
id_seq: Arc::new(AtomicU64::new(0)), // zero is "uninit", first seq num will be 1
143
160
}
144
161
}
145
162
fn get_id_val(
···
178
195
id_value
179
196
}))
180
197
}
198
+
181
199
fn estimate_count(&self) -> u64 {
182
200
self.base.id_seq.load(Ordering::SeqCst) - 1 // -1 because seq zero is reserved
183
201
}
184
-
}
185
-
impl<Orig: Clone, IdVal: IdTableValue> IdTable<Orig, IdVal, true>
186
-
where
187
-
Orig: KeyFromRocks,
188
-
for<'v> &'v IdVal: AsRocksValue,
189
-
for<'k> &'k Orig: AsRocksKey,
190
-
{
202
+
191
203
fn get_or_create_id_val(
192
204
&mut self,
193
205
db: &DBWithThreadMode<MultiThreaded>,
···
215
227
}
216
228
}
217
229
}
218
-
impl<Orig: Clone, IdVal: IdTableValue> IdTable<Orig, IdVal, false>
219
-
where
220
-
Orig: KeyFromRocks,
221
-
for<'v> &'v IdVal: AsRocksValue,
222
-
for<'k> &'k Orig: AsRocksKey,
223
-
{
224
-
fn get_or_create_id_val(
225
-
&mut self,
226
-
db: &DBWithThreadMode<MultiThreaded>,
227
-
batch: &mut WriteBatch,
228
-
orig: &Orig,
229
-
) -> Result<IdVal> {
230
-
let cf = db.cf_handle(&self.base.name).unwrap();
231
-
self.__get_or_create_id_val(&cf, db, batch, orig)
232
-
}
233
-
}
234
230
235
231
impl IdTableValue for DidIdValue {
236
232
fn new(v: u64) -> Self {
···
249
245
}
250
246
}
251
247
248
+
/// Current wall-clock time as microseconds since the unix epoch.
///
/// Panics if the system clock reports a time before the epoch.
fn now() -> u64 {
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
    since_epoch.as_micros() as u64
}
254
+
252
255
impl RocksStorage {
253
256
pub fn new(path: impl AsRef<Path>) -> Result<Self> {
254
257
Self::describe_metrics();
255
-
RocksStorage::open_readmode(path, false)
258
+
let me = RocksStorage::open_readmode(path, false)?;
259
+
me.global_init()?;
260
+
Ok(me)
256
261
}
257
262
258
263
pub fn open_readonly(path: impl AsRef<Path>) -> Result<Self> {
···
260
265
}
261
266
262
267
fn open_readmode(path: impl AsRef<Path>, readonly: bool) -> Result<Self> {
263
-
let did_id_table = IdTable::<_, _, true>::setup(DID_IDS_CF);
264
-
let target_id_table = IdTable::<_, _, false>::setup(TARGET_IDS_CF);
268
+
let did_id_table = IdTable::setup(DID_IDS_CF);
269
+
let target_id_table = IdTable::setup(TARGET_IDS_CF);
265
270
271
+
// note: global stuff like jetstream cursor goes in the default cf
272
+
// these are bonus extra cfs
266
273
let cfs = vec![
267
274
// id reference tables
268
275
did_id_table.cf_descriptor(),
···
296
303
is_writer: !readonly,
297
304
backup_task: None.into(),
298
305
})
306
+
}
307
+
308
+
fn global_init(&self) -> Result<()> {
309
+
let first_run = self.db.get(JETSTREAM_CURSOR_KEY)?.is_some();
310
+
if first_run {
311
+
self.db.put(STARTED_AT_KEY, _rv(now()))?;
312
+
313
+
// hack / temporary: if we're a new db, put in a completed repair
314
+
// state so we don't run repairs (repairs are for old-code dbs)
315
+
let completed = TargetIdRepairState {
316
+
id_when_started: 0,
317
+
current_us_started_at: 0,
318
+
latest_repaired_i: 0,
319
+
};
320
+
self.db.put(TARGET_ID_REPAIR_STATE_KEY, _rv(completed))?;
321
+
}
322
+
Ok(())
323
+
}
324
+
325
+
pub fn run_repair(&self, breather: Duration, stay_alive: CancellationToken) -> Result<bool> {
326
+
let mut state = match self
327
+
.db
328
+
.get(TARGET_ID_REPAIR_STATE_KEY)?
329
+
.map(|s| _vr(&s))
330
+
.transpose()?
331
+
{
332
+
Some(s) => s,
333
+
None => TargetIdRepairState {
334
+
id_when_started: self.did_id_table.priv_id_seq,
335
+
current_us_started_at: now(),
336
+
latest_repaired_i: 0,
337
+
},
338
+
};
339
+
340
+
eprintln!("initial repair state: {state:?}");
341
+
342
+
let cf = self.db.cf_handle(TARGET_IDS_CF).unwrap();
343
+
344
+
let mut iter = self.db.raw_iterator_cf(&cf);
345
+
iter.seek_to_first();
346
+
347
+
eprintln!("repair iterator sent to first key");
348
+
349
+
// skip ahead if we're done some, or take a single first step
350
+
for _ in 0..state.latest_repaired_i {
351
+
iter.next();
352
+
}
353
+
354
+
eprintln!(
355
+
"repair iterator skipped to {}th key",
356
+
state.latest_repaired_i
357
+
);
358
+
359
+
let mut maybe_done = false;
360
+
361
+
let mut write_fast = rocksdb::WriteOptions::default();
362
+
write_fast.set_sync(false);
363
+
write_fast.disable_wal(true);
364
+
365
+
while !stay_alive.is_cancelled() && !maybe_done {
366
+
// let mut batch = WriteBatch::default();
367
+
368
+
let mut any_written = false;
369
+
370
+
for _ in 0..1000 {
371
+
if state.latest_repaired_i % 1_000_000 == 0 {
372
+
eprintln!("target iter at {}", state.latest_repaired_i);
373
+
}
374
+
state.latest_repaired_i += 1;
375
+
376
+
if !iter.valid() {
377
+
eprintln!("invalid iter, are we done repairing?");
378
+
maybe_done = true;
379
+
break;
380
+
};
381
+
382
+
// eprintln!("iterator seems to be valid! getting the key...");
383
+
let raw_key = iter.key().unwrap();
384
+
if raw_key.len() == 8 {
385
+
// eprintln!("found an 8-byte key, skipping it since it's probably an id...");
386
+
iter.next();
387
+
continue;
388
+
}
389
+
let target: TargetKey = _kr::<TargetKey>(raw_key)?;
390
+
let target_id: TargetId = _vr(iter.value().unwrap())?;
391
+
392
+
self.db
393
+
.put_cf_opt(&cf, target_id.id().to_be_bytes(), _rv(&target), &write_fast)?;
394
+
any_written = true;
395
+
iter.next();
396
+
}
397
+
398
+
if any_written {
399
+
self.db
400
+
.put(TARGET_ID_REPAIR_STATE_KEY, _rv(state.clone()))?;
401
+
std::thread::sleep(breather);
402
+
}
403
+
}
404
+
405
+
eprintln!("repair iterator done.");
406
+
407
+
Ok(false)
299
408
}
300
409
301
410
pub fn start_backup(
···
826
935
}
827
936
828
937
impl LinkReader for RocksStorage {
938
+
fn get_many_to_many_counts(
939
+
&self,
940
+
target: &str,
941
+
collection: &str,
942
+
path: &str,
943
+
path_to_other: &str,
944
+
limit: u64,
945
+
after: Option<String>,
946
+
filter_dids: &HashSet<Did>,
947
+
filter_to_targets: &HashSet<String>,
948
+
) -> Result<PagedOrderedCollection<(String, u64, u64), String>> {
949
+
let collection = Collection(collection.to_string());
950
+
let path = RPath(path.to_string());
951
+
952
+
let target_key = TargetKey(Target(target.to_string()), collection.clone(), path.clone());
953
+
954
+
// unfortunately the cursor is a, uh, stringified number.
955
+
// this was easier for the memstore (plain target, not target id), and
956
+
// making it generic is a bit awful.
957
+
// so... parse the number out of a string here :(
958
+
// TODO: this should bubble up to a BAD_REQUEST response
959
+
let after = after.map(|s| s.parse::<u64>().map(TargetId)).transpose()?;
960
+
961
+
let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? else {
962
+
eprintln!("nothin doin for this target, {target_key:?}");
963
+
return Ok(Default::default());
964
+
};
965
+
966
+
let filter_did_ids: HashMap<DidId, bool> = filter_dids
967
+
.iter()
968
+
.filter_map(|did| self.did_id_table.get_id_val(&self.db, did).transpose())
969
+
.collect::<Result<Vec<DidIdValue>>>()?
970
+
.into_iter()
971
+
.map(|DidIdValue(id, active)| (id, active))
972
+
.collect();
973
+
974
+
// stored targets are keyed by triples of (target, collection, path).
975
+
// target filtering only consideres the target itself, so we actually
976
+
// need to do a prefix iteration of all target ids for this target and
977
+
// keep them all.
978
+
// i *think* the number of keys at a target prefix should usually be
979
+
// pretty small, so this is hopefully fine. but if it turns out to be
980
+
// large, we can push this filtering back into the main links loop and
981
+
// do forward db queries per backlink to get the raw target back out.
982
+
let mut filter_to_target_ids: HashSet<TargetId> = HashSet::new();
983
+
for t in filter_to_targets {
984
+
for (_, target_id) in self.iter_targets_for_target(&Target(t.to_string())) {
985
+
filter_to_target_ids.insert(target_id);
986
+
}
987
+
}
988
+
989
+
let linkers = self.get_target_linkers(&target_id)?;
990
+
991
+
let mut grouped_counts: BTreeMap<TargetId, (u64, HashSet<DidId>)> = BTreeMap::new();
992
+
993
+
for (did_id, rkey) in linkers.0 {
994
+
if did_id.is_empty() {
995
+
continue;
996
+
}
997
+
998
+
if !filter_did_ids.is_empty() && filter_did_ids.get(&did_id) != Some(&true) {
999
+
continue;
1000
+
}
1001
+
1002
+
let record_link_key = RecordLinkKey(did_id, collection.clone(), rkey);
1003
+
let Some(targets) = self.get_record_link_targets(&record_link_key)? else {
1004
+
continue;
1005
+
};
1006
+
1007
+
let Some(fwd_target) = targets
1008
+
.0
1009
+
.into_iter()
1010
+
.filter_map(|RecordLinkTarget(rpath, target_id)| {
1011
+
if rpath.0 == path_to_other
1012
+
&& (filter_to_target_ids.is_empty()
1013
+
|| filter_to_target_ids.contains(&target_id))
1014
+
{
1015
+
Some(target_id)
1016
+
} else {
1017
+
None
1018
+
}
1019
+
})
1020
+
.take(1)
1021
+
.next()
1022
+
else {
1023
+
eprintln!("no forward match");
1024
+
continue;
1025
+
};
1026
+
1027
+
// small relief: we page over target ids, so we can already bail
1028
+
// reprocessing previous pages here
1029
+
if after.as_ref().map(|a| fwd_target <= *a).unwrap_or(false) {
1030
+
continue;
1031
+
}
1032
+
1033
+
// aand we can skip target ids that must be on future pages
1034
+
// (this check continues after the did-lookup, which we have to do)
1035
+
let page_is_full = grouped_counts.len() as u64 >= limit;
1036
+
if page_is_full {
1037
+
let current_max = grouped_counts.keys().next_back().unwrap(); // limit should be non-zero bleh
1038
+
if fwd_target > *current_max {
1039
+
continue;
1040
+
}
1041
+
}
1042
+
1043
+
// bit painful: 2-step lookup to make sure this did is active
1044
+
let Some(did) = self.did_id_table.get_val_from_id(&self.db, did_id.0)? else {
1045
+
eprintln!("failed to look up did from did_id {did_id:?}");
1046
+
continue;
1047
+
};
1048
+
let Some(DidIdValue(_, active)) = self.did_id_table.get_id_val(&self.db, &did)? else {
1049
+
eprintln!("failed to look up did_value from did_id {did_id:?}: {did:?}: data consistency bug?");
1050
+
continue;
1051
+
};
1052
+
if !active {
1053
+
continue;
1054
+
}
1055
+
1056
+
// page-management, continued
1057
+
// if we have a full page, and we're inserting a *new* key less than
1058
+
// the current max, then we can evict the current max
1059
+
let mut should_evict = false;
1060
+
let entry = grouped_counts.entry(fwd_target.clone()).or_insert_with(|| {
1061
+
// this is a *new* key, so kick the max if we're full
1062
+
should_evict = page_is_full;
1063
+
Default::default()
1064
+
});
1065
+
entry.0 += 1;
1066
+
entry.1.insert(did_id);
1067
+
1068
+
if should_evict {
1069
+
grouped_counts.pop_last();
1070
+
}
1071
+
}
1072
+
1073
+
let mut items: Vec<(String, u64, u64)> = Vec::with_capacity(grouped_counts.len());
1074
+
for (target_id, (n, dids)) in &grouped_counts {
1075
+
let Some(target) = self
1076
+
.target_id_table
1077
+
.get_val_from_id(&self.db, target_id.0)?
1078
+
else {
1079
+
eprintln!("failed to look up target from target_id {target_id:?}");
1080
+
continue;
1081
+
};
1082
+
items.push((target.0 .0, *n, dids.len() as u64));
1083
+
}
1084
+
1085
+
let next = if grouped_counts.len() as u64 >= limit {
1086
+
// yeah.... it's a number saved as a string......sorry
1087
+
grouped_counts
1088
+
.keys()
1089
+
.next_back()
1090
+
.map(|k| format!("{}", k.0))
1091
+
} else {
1092
+
None
1093
+
};
1094
+
1095
+
Ok(PagedOrderedCollection { items, next })
1096
+
}
1097
+
829
1098
fn get_count(&self, target: &str, collection: &str, path: &str) -> Result<u64> {
830
1099
let target_key = TargetKey(
831
1100
Target(target.to_string()),
···
1042
1311
.map(|s| s.parse::<u64>())
1043
1312
.transpose()?
1044
1313
.unwrap_or(0);
1314
+
let started_at = self
1315
+
.db
1316
+
.get(STARTED_AT_KEY)?
1317
+
.map(|c| _vr(&c))
1318
+
.transpose()?
1319
+
.unwrap_or(COZY_FIRST_CURSOR);
1320
+
1321
+
let other_data = self
1322
+
.db
1323
+
.get(TARGET_ID_REPAIR_STATE_KEY)?
1324
+
.map(|s| _vr(&s))
1325
+
.transpose()?
1326
+
.map(
1327
+
|TargetIdRepairState {
1328
+
current_us_started_at,
1329
+
id_when_started,
1330
+
latest_repaired_i,
1331
+
}| {
1332
+
HashMap::from([
1333
+
("current_us_started_at".to_string(), current_us_started_at),
1334
+
("id_when_started".to_string(), id_when_started),
1335
+
("latest_repaired_i".to_string(), latest_repaired_i),
1336
+
])
1337
+
},
1338
+
)
1339
+
.unwrap_or(HashMap::default());
1340
+
1045
1341
Ok(StorageStats {
1046
1342
dids,
1047
1343
targetables,
1048
1344
linking_records,
1345
+
started_at: Some(started_at),
1346
+
other_data,
1049
1347
})
1050
1348
}
1051
1349
}
···
1071
1369
impl AsRocksValue for &TargetId {}
1072
1370
impl KeyFromRocks for TargetKey {}
1073
1371
impl ValueFromRocks for TargetId {}
1372
+
1373
+
// temp?
1374
+
impl KeyFromRocks for TargetId {}
1375
+
impl AsRocksValue for &TargetKey {}
1074
1376
1075
1377
// target_links table
1076
1378
impl AsRocksKey for &TargetId {}
···
1142
1444
}
1143
1445
1144
1446
// target ids
1145
-
#[derive(Debug, Clone, Serialize, Deserialize)]
1447
+
#[derive(Debug, Clone, Serialize, Deserialize, PartialOrd, Ord, PartialEq, Eq, Hash)]
1146
1448
struct TargetId(u64); // key
1147
1449
1148
-
#[derive(Debug, Clone, Serialize, Deserialize)]
1450
+
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
1149
1451
pub struct Target(pub String); // the actual target/uri
1150
1452
1151
1453
// targets (uris, dids, etc.): the reverse index
+1
-1
constellation/templates/dids.html.j2
+1
-1
constellation/templates/dids.html.j2
···
27
27
{% for did in linking_dids %}
28
28
<pre style="display: block; margin: 1em 2em" class="code"><strong>DID</strong>: {{ did.0 }}
29
29
-> see <a href="/links/all?target={{ did.0|urlencode }}">links to this DID</a>
30
-
-> browse <a href="https://atproto-browser-plus-links.vercel.app/at/{{ did.0|urlencode }}">this DID record</a></pre>
30
+
-> browse <a href="https://pdsls.dev/at://{{ did.0 }}">this DID record</a></pre>
31
31
{% endfor %}
32
32
33
33
{% if let Some(c) = cursor %}
+54
constellation/templates/get-backlinks.html.j2
+54
constellation/templates/get-backlinks.html.j2
···
1
+
{% extends "base.html.j2" %}
2
+
{% import "try-it-macros.html.j2" as try_it %}
3
+
4
+
{% block title %}Backlinks{% endblock %}
5
+
{% block description %}All {{ query.source }} records with links to {{ query.subject }}{% endblock %}
6
+
7
+
{% block content %}
8
+
9
+
{% call try_it::get_backlinks(query.subject, query.source, query.did, query.limit) %}
10
+
11
+
<h2>
12
+
Links to <code>{{ query.subject }}</code>
13
+
{% if let Some(browseable_uri) = query.subject|to_browseable %}
14
+
<small style="font-weight: normal; font-size: 1rem"><a href="{{ browseable_uri }}">browse record</a></small>
15
+
{% endif %}
16
+
</h2>
17
+
18
+
<p><strong>{{ total|human_number }} links</strong> from <code>{{ query.source }}</code>.</p>
19
+
20
+
<ul>
21
+
<li>See distinct linking DIDs at <code>/links/distinct-dids</code>: <a href="/links/distinct-dids?target={{ query.subject|urlencode }}&collection={{ collection|urlencode }}&path={{ path|urlencode }}">/links/distinct-dids?target={{ query.subject }}&collection={{ collection }}&path={{ path }}</a></li>
22
+
<li>See all links to this target at <code>/links/all</code>: <a href="/links/all?target={{ query.subject|urlencode }}">/links/all?target={{ query.subject }}</a></li>
23
+
</ul>
24
+
25
+
<h3>Links, most recent first:</h3>
26
+
27
+
{% for record in records %}
28
+
<pre style="display: block; margin: 1em 2em" class="code"><strong>DID</strong>: {{ record.did().0 }} (<a href="/links/all?target={{ record.did().0|urlencode }}">DID links</a>)
29
+
<strong>Collection</strong>: {{ record.collection }}
30
+
<strong>RKey</strong>: {{ record.rkey }}
31
+
-> <a href="https://pdsls.dev/at://{{ record.did().0 }}/{{ record.collection }}/{{ record.rkey }}">browse record</a></pre>
32
+
{% endfor %}
33
+
34
+
{% if let Some(c) = cursor %}
35
+
<form method="get" action="/xrpc/blue.microcosm.links.getBacklinks">
36
+
<input type="hidden" name="subject" value="{{ query.subject }}" />
37
+
<input type="hidden" name="source" value="{{ query.source }}" />
38
+
<input type="hidden" name="limit" value="{{ query.limit }}" />
39
+
{% for did in query.did %}
40
+
<input type="hidden" name="did" value="{{ did }}" />
41
+
{% endfor %}
42
+
<input type="hidden" name="cursor" value={{ c|json|safe }} />
43
+
<button type="submit">next page…</button>
44
+
</form>
45
+
{% else %}
46
+
<button disabled><em>end of results</em></button>
47
+
{% endif %}
48
+
49
+
<details>
50
+
<summary>Raw JSON response</summary>
51
+
<pre class="code">{{ self|tojson }}</pre>
52
+
</details>
53
+
54
+
{% endblock %}
+67
constellation/templates/get-many-to-many-counts.html.j2
+67
constellation/templates/get-many-to-many-counts.html.j2
···
1
+
{% extends "base.html.j2" %}
2
+
{% import "try-it-macros.html.j2" as try_it %}
3
+
4
+
{% block title %}Many to Many counts{% endblock %}
5
+
{% block description %}Counts of many-to-many {{ query.source }} join records with links to {{ query.subject }} and a secondary target at {{ query.path_to_other }}{% endblock %}
6
+
7
+
{% block content %}
8
+
9
+
{% call try_it::get_many_to_many_counts(
10
+
query.subject,
11
+
query.source,
12
+
query.path_to_other,
13
+
query.did,
14
+
query.other_subject,
15
+
query.limit,
16
+
) %}
17
+
18
+
<h2>
19
+
Many-to-many links to <code>{{ query.subject }}</code> joining through <code>{{ query.path_to_other }}</code>
20
+
{% if let Some(browseable_uri) = query.subject|to_browseable %}
21
+
<small style="font-weight: normal; font-size: 1rem"><a href="{{ browseable_uri }}">browse record</a></small>
22
+
{% endif %}
23
+
</h2>
24
+
25
+
<p><strong>{% if cursor.is_some() || query.cursor.is_some() %}more than {% endif %}{{ counts_by_other_subject.len()|to_u64|human_number }} joins</strong> <code>{{ query.source }}→{{ query.path_to_other }}</code></p>
26
+
27
+
<ul>
28
+
<li>See direct backlinks at <code>/xrpc/blue.microcosm.links.getBacklinks</code>: <a href="/xrpc/blue.microcosm.links.getBacklinks?subject={{ query.subject|urlencode }}&source={{ query.source|urlencode }}">/xrpc/blue.microcosm.links.getBacklinks?subject={{ query.subject }}&source={{ query.source }}</a></li>
29
+
<li>See all links to this target at <code>/links/all</code>: <a href="/links/all?target={{ query.subject|urlencode }}">/links/all?target={{ query.subject }}</a></li>
30
+
</ul>
31
+
32
+
<h3>Counts by other subject:</h3>
33
+
34
+
{% for counts in counts_by_other_subject %}
35
+
<pre style="display: block; margin: 1em 2em" class="code"><strong>Joined subject</strong>: {{ counts.subject }}
36
+
<strong>Joining records</strong>: {{ counts.total }}
37
+
<strong>Unique joiner ids</strong>: {{ counts.distinct }}
38
+
-> {% if let Some(browseable_uri) = counts.subject|to_browseable -%}
39
+
<a href="{{ browseable_uri }}">browse record</a>
40
+
{%- endif %}</pre>
41
+
{% endfor %}
42
+
43
+
{% if let Some(c) = cursor %}
44
+
<form method="get" action="/xrpc/blue.microcosm.links.getManyToManyCounts">
45
+
<input type="hidden" name="subject" value="{{ query.subject }}" />
46
+
<input type="hidden" name="source" value="{{ query.source }}" />
47
+
<input type="hidden" name="pathToOther" value="{{ query.path_to_other }}" />
48
+
{% for did in query.did %}
49
+
<input type="hidden" name="did" value="{{ did }}" />
50
+
{% endfor %}
51
+
{% for otherSubject in query.other_subject %}
52
+
<input type="hidden" name="otherSubject" value="{{ otherSubject }}" />
53
+
{% endfor %}
54
+
<input type="hidden" name="limit" value="{{ query.limit }}" />
55
+
<input type="hidden" name="cursor" value={{ c|json|safe }} />
56
+
<button type="submit">next page…</button>
57
+
</form>
58
+
{% else %}
59
+
<button disabled><em>end of results</em></button>
60
+
{% endif %}
61
+
62
+
<details>
63
+
<summary>Raw JSON response</summary>
64
+
<pre class="code">{{ self|tojson }}</pre>
65
+
</details>
66
+
67
+
{% endblock %}
+57
-2
constellation/templates/hello.html.j2
+57
-2
constellation/templates/hello.html.j2
···
19
19
<p>It works by recursively walking <em>all</em> records coming through the firehose, searching for anything that looks like a link. Links are indexed by the target they point at, the collection the record came from, and the JSON path to the link in that record.</p>
20
20
21
21
<p>
22
-
This server has indexed <span class="stat">{{ stats.linking_records|human_number }}</span> links between <span class="stat">{{ stats.targetables|human_number }}</span> targets and sources from <span class="stat">{{ stats.dids|human_number }}</span> identities over <span class="stat">{{ days_indexed|human_number }}</span> days.<br/>
22
+
This server has indexed <span class="stat">{{ stats.linking_records|human_number }}</span> links between <span class="stat">{{ stats.targetables|human_number }}</span> targets and sources from <span class="stat">{{ stats.dids|human_number }}</span> identities over <span class="stat">
23
+
{%- if let Some(days) = days_indexed %}
24
+
{{ days|human_number }}
25
+
{% else %}
26
+
???
27
+
{% endif -%}
28
+
</span> days.<br/>
23
29
<small>(indexing new records in real time, backfill coming soon!)</small>
24
30
</p>
25
31
26
-
<p>But feel free to use it! If you want to be nice, put your project name and bsky username (or email) in your user-agent header for api requests.</p>
32
+
{# {% for k, v in stats.other_data.iter() %}
33
+
<p><strong>{{ k }}</strong>: {{ v }}</p>
34
+
{% endfor %} #}
35
+
36
+
<p>You're welcome to use this public instance! Please do not build the torment nexus. If you want to be nice, put your project name and bsky username (or email) in your user-agent header for api requests.</p>
27
37
28
38
29
39
<h2>API Endpoints</h2>
30
40
41
+
<h3 class="route"><code>GET /xrpc/blue.microcosm.links.getBacklinks</code></h3>
42
+
43
+
<p>A list of records linking to any record, identity, or uri.</p>
44
+
45
+
<h4>Query parameters:</h4>
46
+
47
+
<ul>
48
+
<li><p><code>subject</code>: required, must url-encode. Example: <code>at://did:plc:vc7f4oafdgxsihk4cry2xpze/app.bsky.feed.post/3lgwdn7vd722r</code></p></li>
49
+
<li><p><code>source</code>: required. Example: <code>app.bsky.feed.like:subject.uri</code></p></li>
50
+
<li><p><code>did</code>: optional, filter links to those from specific users. Include multiple times to filter by multiple users. Example: <code>did=did:plc:vc7f4oafdgxsihk4cry2xpze&did=did:plc:vc7f4oafdgxsihk4cry2xpze</code></p></li>
51
+
<li><p><code>limit</code>: optional. Default: <code>16</code>. Maximum: <code>100</code></p></li>
52
+
</ul>
53
+
54
+
<p style="margin-bottom: 0"><strong>Try it:</strong></p>
55
+
{% call try_it::get_backlinks("at://did:plc:a4pqq234yw7fqbddawjo7y35/app.bsky.feed.post/3m237ilwc372e", "app.bsky.feed.like:subject.uri", [""], 16) %}
56
+
57
+
58
+
<h3 class="route"><code>GET /xrpc/blue.microcosm.links.getManyToManyCounts</code></h3>
59
+
60
+
<p>TODO: description</p>
61
+
62
+
<h4>Query parameters:</h4>
63
+
64
+
<ul>
65
+
<li><p><code>subject</code>: required, must url-encode. Example: <code>at://did:plc:vc7f4oafdgxsihk4cry2xpze/app.bsky.feed.post/3lgwdn7vd722r</code></p></li>
66
+
<li><p><code>source</code>: required. Example: <code>app.bsky.feed.like:subject.uri</code></p></li>
67
+
<li><p><code>pathToOther</code>: required. Path to the secondary link in the many-to-many record. Example: <code>otherThing.uri</code></p></li>
68
+
<li><p><code>did</code>: optional, filter links to those from specific users. Include multiple times to filter by multiple users. Example: <code>did=did:plc:vc7f4oafdgxsihk4cry2xpze&did=did:plc:vc7f4oafdgxsihk4cry2xpze</code></p></li>
69
+
<li><p><code>otherSubject</code>: optional, filter secondary links to specific subjects. Include multiple times to filter by multiple subjects. Example: <code>at://did:plc:vc7f4oafdgxsihk4cry2xpze/app.bsky.feed.post/3lgwdn7vd722r</code></p></li>
70
+
<li><p><code>limit</code>: optional. Default: <code>16</code>. Maximum: <code>100</code></p></li>
71
+
</ul>
72
+
73
+
<p style="margin-bottom: 0"><strong>Try it:</strong></p>
74
+
{% call try_it::get_many_to_many_counts(
75
+
"at://did:plc:wshs7t2adsemcrrd4snkeqli/sh.tangled.label.definition/good-first-issue",
76
+
"sh.tangled.label.op:add[].key",
77
+
"subject",
78
+
[""],
79
+
[""],
80
+
25,
81
+
) %}
82
+
83
+
31
84
<h3 class="route"><code>GET /links</code></h3>
32
85
33
86
<p>A list of records linking to a target.</p>
87
+
88
+
<p>[DEPRECATED]: use <code>GET /xrpc/blue.microcosm.links.getBacklinks</code>. New apps should avoid it, but this endpoint <strong>will</strong> remain supported for the foreseeable future.</p>
34
89
35
90
<h4>Query parameters:</h4>
36
91
+1
-1
constellation/templates/links.html.j2
+1
-1
constellation/templates/links.html.j2
···
28
28
<pre style="display: block; margin: 1em 2em" class="code"><strong>DID</strong>: {{ record.did().0 }} (<a href="/links/all?target={{ record.did().0|urlencode }}">DID links</a>)
29
29
<strong>Collection</strong>: {{ record.collection }}
30
30
<strong>RKey</strong>: {{ record.rkey }}
31
-
-> <a href="https://atproto-browser-plus-links.vercel.app/at/{{ record.did().0|urlencode }}/{{ record.collection }}/{{ record.rkey }}">browse record</a></pre>
31
+
-> <a href="https://pdsls.dev/at://{{ record.did().0 }}/{{ record.collection }}/{{ record.rkey }}">browse record</a></pre>
32
32
{% endfor %}
33
33
34
34
{% if let Some(c) = cursor %}
+68
-1
constellation/templates/try-it-macros.html.j2
+68
-1
constellation/templates/try-it-macros.html.j2
···
1
+
{% macro get_backlinks(subject, source, dids, limit) %}
2
+
<form method="get" action="/xrpc/blue.microcosm.links.getBacklinks">
3
+
<pre class="code"><strong>GET</strong> /xrpc/blue.microcosm.links.getBacklinks
4
+
?subject= <input type="text" name="subject" value="{{ subject }}" placeholder="at-uri, did, uri..." />
5
+
&source= <input type="text" name="source" value="{{ source }}" placeholder="app.bsky.feed.like:subject.uri" />
6
+
{%- for did in dids %}{% if !did.is_empty() %}
7
+
&did= <input type="text" name="did" value="{{ did }}" placeholder="did:plc:..." />{% endif %}{% endfor %}
8
+
<span id="did-placeholder"></span> <button id="add-did">+ did filter</button>
9
+
&limit= <input type="number" name="limit" value="{{ limit }}" max="100" placeholder="100" /> <button type="submit">get links</button></pre>
10
+
</form>
11
+
<script>
12
+
const addDidButton = document.getElementById('add-did');
13
+
const didPlaceholder = document.getElementById('did-placeholder');
14
+
addDidButton.addEventListener('click', e => {
15
+
e.preventDefault();
16
+
const i = document.createElement('input');
17
+
i.placeholder = 'did:plc:...';
18
+
i.name = "did"
19
+
const p = addDidButton.parentNode;
20
+
p.insertBefore(document.createTextNode('&did= '), didPlaceholder);
21
+
p.insertBefore(i, didPlaceholder);
22
+
p.insertBefore(document.createTextNode('\n '), didPlaceholder);
23
+
});
24
+
</script>
25
+
{% endmacro %}
26
+
27
+
{% macro get_many_to_many_counts(subject, source, pathToOther, dids, otherSubjects, limit) %}
28
+
<form method="get" action="/xrpc/blue.microcosm.links.getManyToManyCounts">
29
+
<pre class="code"><strong>GET</strong> /xrpc/blue.microcosm.links.getManyToManyCounts
30
+
?subject= <input type="text" name="subject" value="{{ subject }}" placeholder="at-uri, did, uri..." />
31
+
&source= <input type="text" name="source" value="{{ source }}" placeholder="app.bsky.feed.like:subject.uri" />
32
+
&pathToOther= <input type="text" name="pathToOther" value="{{ pathToOther }}" placeholder="otherThing.uri" />
33
+
{%- for did in dids %}{% if !did.is_empty() %}
34
+
&did= <input type="text" name="did" value="{{ did }}" placeholder="did:plc:..." />{% endif %}{% endfor %}
35
+
<span id="m2m-subject-placeholder"></span> <button id="m2m-add-subject">+ other subject filter</button>
36
+
{%- for otherSubject in otherSubjects %}{% if !otherSubject.is_empty() %}
37
+
&otherSubject= <input type="text" name="otherSubject" value="{{ otherSubject }}" placeholder="at-uri, did, uri..." />{% endif %}{% endfor %}
38
+
<span id="m2m-did-placeholder"></span> <button id="m2m-add-did">+ did filter</button>
39
+
&limit= <input type="number" name="limit" value="{{ limit }}" max="100" placeholder="100" /> <button type="submit">get links</button></pre>
40
+
</form>
41
+
<script>
42
+
const m2mAddDidButton = document.getElementById('m2m-add-did');
43
+
const m2mDidPlaceholder = document.getElementById('m2m-did-placeholder');
44
+
m2mAddDidButton.addEventListener('click', e => {
45
+
e.preventDefault();
46
+
const i = document.createElement('input');
47
+
i.placeholder = 'did:plc:...';
48
+
i.name = "did"
49
+
const p = m2mAddDidButton.parentNode;
50
+
p.insertBefore(document.createTextNode('&did= '), m2mDidPlaceholder);
51
+
p.insertBefore(i, m2mDidPlaceholder);
52
+
p.insertBefore(document.createTextNode('\n '), m2mDidPlaceholder);
53
+
});
54
+
const m2mAddSubjectButton = document.getElementById('m2m-add-subject');
55
+
const m2mSubjectPlaceholder = document.getElementById('m2m-subject-placeholder');
56
+
m2mAddSubjectButton.addEventListener('click', e => {
57
+
e.preventDefault();
58
+
const i = document.createElement('input');
59
+
i.placeholder = 'at-uri, did, uri...';
60
+
i.name = "otherSubject"
61
+
const p = m2mAddSubjectButton.parentNode;
62
+
p.insertBefore(document.createTextNode('&otherSubject= '), m2mSubjectPlaceholder);
63
+
p.insertBefore(i, m2mSubjectPlaceholder);
64
+
p.insertBefore(document.createTextNode('\n '), m2mSubjectPlaceholder);
65
+
});
66
+
</script>
67
+
{% endmacro %}
68
+
1
69
{% macro links(target, collection, path, dids, limit) %}
2
70
<form method="get" action="/links">
3
71
<pre class="code"><strong>GET</strong> /links
···
24
92
});
25
93
</script>
26
94
{% endmacro %}
27
-
28
95
29
96
{% macro dids(target, collection, path) %}
30
97
<form method="get" action="/links/distinct-dids">
+95
lexicons/blue.microcosm/links/getBacklinks.json
+95
lexicons/blue.microcosm/links/getBacklinks.json
···
1
+
{
2
+
"lexicon": 1,
3
+
"id": "blue.microcosm.links.getBacklinks",
4
+
"defs": {
5
+
"main": {
6
+
"type": "query",
7
+
"description": "a list of records linking to any record, identity, or uri",
8
+
"parameters": {
9
+
"type": "params",
10
+
"required": [
11
+
"subject",
12
+
"source"
13
+
],
14
+
"properties": {
15
+
"subject": {
16
+
"type": "string",
17
+
"format": "uri",
18
+
"description": "the target being linked to (at-uri, did, or uri)"
19
+
},
20
+
"source": {
21
+
"type": "string",
22
+
"description": "collection and path specification (e.g., 'app.bsky.feed.like:subject.uri')"
23
+
},
24
+
"did": {
25
+
"type": "array",
26
+
"description": "filter links to those from specific users",
27
+
"items": {
28
+
"type": "string",
29
+
"format": "did"
30
+
}
31
+
},
32
+
"limit": {
33
+
"type": "integer",
34
+
"minimum": 1,
35
+
"maximum": 100,
36
+
"default": 16,
37
+
"description": "number of results to return"
38
+
}
39
+
}
40
+
},
41
+
"output": {
42
+
"encoding": "application/json",
43
+
"schema": {
44
+
"type": "object",
45
+
"required": [
46
+
"total",
47
+
"records"
48
+
],
49
+
"properties": {
50
+
"total": {
51
+
"type": "integer",
52
+
"description": "total number of matching links"
53
+
},
54
+
"records": {
55
+
"type": "array",
56
+
"items": {
57
+
"type": "ref",
58
+
"ref": "#linkRecord"
59
+
}
60
+
},
61
+
"cursor": {
62
+
"type": "string",
63
+
"description": "pagination cursor"
64
+
}
65
+
}
66
+
}
67
+
}
68
+
},
69
+
"linkRecord": {
70
+
"type": "object",
71
+
"required": [
72
+
"did",
73
+
"collection",
74
+
"rkey"
75
+
],
76
+
"properties": {
77
+
"did": {
78
+
"type": "string",
79
+
"format": "did",
80
+
"description": "the DID of the linking record's repository"
81
+
},
82
+
"collection": {
83
+
"type": "string",
84
+
"format": "nsid",
85
+
"description": "the collection of the linking record"
86
+
},
87
+
"rkey": {
88
+
"type": "string",
89
+
"format": "record-key",
90
+
"description": "the record key of the linking record"
91
+
}
92
+
}
93
+
}
94
+
}
95
+
}
+99
lexicons/blue.microcosm/links/getManyToManyCounts.json
+99
lexicons/blue.microcosm/links/getManyToManyCounts.json
···
1
+
{
2
+
"lexicon": 1,
3
+
"id": "blue.microcosm.links.getManyToManyCounts",
4
+
"defs": {
5
+
"main": {
6
+
"type": "query",
7
+
"description": "count many-to-many relationships with secondary link paths",
8
+
"parameters": {
9
+
"type": "params",
10
+
"required": [
11
+
"subject",
12
+
"source",
13
+
"pathToOther"
14
+
],
15
+
"properties": {
16
+
"subject": {
17
+
"type": "string",
18
+
"format": "uri",
19
+
"description": "the primary target being linked to (at-uri, did, or uri)"
20
+
},
21
+
"source": {
22
+
"type": "string",
23
+
"description": "collection and path specification for the primary link"
24
+
},
25
+
"pathToOther": {
26
+
"type": "string",
27
+
"description": "path to the secondary link in the many-to-many record (e.g., 'otherThing.uri')"
28
+
},
29
+
"did": {
30
+
"type": "array",
31
+
"description": "filter links to those from specific users",
32
+
"items": {
33
+
"type": "string",
34
+
"format": "did"
35
+
}
36
+
},
37
+
"otherSubject": {
38
+
"type": "array",
39
+
"description": "filter secondary links to specific subjects",
40
+
"items": {
41
+
"type": "string"
42
+
}
43
+
},
44
+
"limit": {
45
+
"type": "integer",
46
+
"minimum": 1,
47
+
"maximum": 100,
48
+
"default": 16,
49
+
"description": "number of results to return"
50
+
}
51
+
}
52
+
},
53
+
"output": {
54
+
"encoding": "application/json",
55
+
"schema": {
56
+
"type": "object",
57
+
"required": [
58
+
"counts_by_other_subject"
59
+
],
60
+
"properties": {
61
+
"counts_by_other_subject": {
62
+
"type": "array",
63
+
"items": {
64
+
"type": "ref",
65
+
"ref": "#countBySubject"
66
+
}
67
+
},
68
+
"cursor": {
69
+
"type": "string",
70
+
"description": "pagination cursor"
71
+
}
72
+
}
73
+
}
74
+
}
75
+
},
76
+
"countBySubject": {
77
+
"type": "object",
78
+
"required": [
79
+
"subject",
80
+
"total",
81
+
"distinct"
82
+
],
83
+
"properties": {
84
+
"subject": {
85
+
"type": "string",
86
+
"description": "the secondary subject being counted"
87
+
},
88
+
"total": {
89
+
"type": "integer",
90
+
"description": "total number of links to this subject"
91
+
},
92
+
"distinct": {
93
+
"type": "integer",
94
+
"description": "number of distinct DIDs linking to this subject"
95
+
}
96
+
}
97
+
}
98
+
}
99
+
}
+56
lexicons/com.bad-example/identity/resolveMiniDoc.json
+56
lexicons/com.bad-example/identity/resolveMiniDoc.json
···
1
+
{
2
+
"lexicon": 1,
3
+
"id": "com.bad-example.identity.resolveMiniDoc",
4
+
"defs": {
5
+
"main": {
6
+
"type": "query",
7
+
"description": "like com.atproto.identity.resolveIdentity but instead of the full didDoc it returns an atproto-relevant subset",
8
+
"parameters": {
9
+
"type": "params",
10
+
"required": [
11
+
"identifier"
12
+
],
13
+
"properties": {
14
+
"identifier": {
15
+
"type": "string",
16
+
"format": "at-identifier",
17
+
"description": "handle or DID to resolve"
18
+
}
19
+
}
20
+
},
21
+
"output": {
22
+
"encoding": "application/json",
23
+
"schema": {
24
+
"type": "object",
25
+
"required": [
26
+
"did",
27
+
"handle",
28
+
"pds",
29
+
"signing_key"
30
+
],
31
+
"properties": {
32
+
"did": {
33
+
"type": "string",
34
+
"format": "did",
35
+
"description": "DID, bi-directionally verified if a handle was provided in the query"
36
+
},
37
+
"handle": {
38
+
"type": "string",
39
+
"format": "handle",
40
+
"description": "the validated handle of the account or 'handle.invalid' if the handle did not bi-directionally match the DID document"
41
+
},
42
+
"pds": {
43
+
"type": "string",
44
+
"format": "uri",
45
+
"description": "the identity's PDS URL"
46
+
},
47
+
"signing_key": {
48
+
"type": "string",
49
+
"description": "the atproto signing key publicKeyMultibase"
50
+
}
51
+
}
52
+
}
53
+
}
54
+
}
55
+
}
56
+
}
+54
lexicons/com.bad-example/repo/getUriRecord.json
+54
lexicons/com.bad-example/repo/getUriRecord.json
···
1
+
{
2
+
"lexicon": 1,
3
+
"id": "com.bad-example.repo.getUriRecord",
4
+
"defs": {
5
+
"main": {
6
+
"type": "query",
7
+
"description": "ergonomic complement to com.atproto.repo.getRecord which accepts an at-uri instead of individual repo/collection/rkey params",
8
+
"parameters": {
9
+
"type": "params",
10
+
"required": [
11
+
"at_uri"
12
+
],
13
+
"properties": {
14
+
"at_uri": {
15
+
"type": "string",
16
+
"format": "at-uri",
17
+
"description": "the at-uri of the record (identifier can be a DID or handle)"
18
+
},
19
+
"cid": {
20
+
"type": "string",
21
+
"format": "cid",
22
+
"description": "optional CID of the version of the record. if not specified, return the most recent version. if specified and a newer version exists, returns 404."
23
+
}
24
+
}
25
+
},
26
+
"output": {
27
+
"encoding": "application/json",
28
+
"schema": {
29
+
"type": "object",
30
+
"required": [
31
+
"uri",
32
+
"value"
33
+
],
34
+
"properties": {
35
+
"uri": {
36
+
"type": "string",
37
+
"format": "at-uri",
38
+
"description": "at-uri for this record"
39
+
},
40
+
"cid": {
41
+
"type": "string",
42
+
"format": "cid",
43
+
"description": "CID for this exact version of the record"
44
+
},
45
+
"value": {
46
+
"type": "unknown",
47
+
"description": "the record itself"
48
+
}
49
+
}
50
+
}
51
+
}
52
+
}
53
+
}
54
+
}
+11
-11
slingshot/src/server.rs
+11
-11
slingshot/src/server.rs
···
437
437
Ok(did) => did,
438
438
Err(_) => {
439
439
let Ok(alleged_handle) = Handle::new(identifier) else {
440
-
return invalid("identifier was not a valid DID or handle");
440
+
return invalid("Identifier was not a valid DID or handle");
441
441
};
442
442
443
443
match self.identity.handle_to_did(alleged_handle.clone()).await {
···
453
453
Err(e) => {
454
454
log::debug!("failed to resolve handle: {e}");
455
455
// TODO: ServerError not BadRequest
456
-
return invalid("errored while trying to resolve handle to DID");
456
+
return invalid("Errored while trying to resolve handle to DID");
457
457
}
458
458
}
459
459
}
460
460
};
461
461
let Ok(partial_doc) = self.identity.did_to_partial_mini_doc(&did).await else {
462
-
return invalid("failed to get DID doc");
462
+
return invalid("Failed to get DID doc");
463
463
};
464
464
let Some(partial_doc) = partial_doc else {
465
-
return invalid("failed to find DID doc");
465
+
return invalid("Failed to find DID doc");
466
466
};
467
467
468
468
// ok so here's where we're at:
···
483
483
.handle_to_did(partial_doc.unverified_handle.clone())
484
484
.await
485
485
else {
486
-
return invalid("failed to get did doc's handle");
486
+
return invalid("Failed to get DID doc's handle");
487
487
};
488
488
let Some(handle_did) = handle_did else {
489
-
return invalid("failed to resolve did doc's handle");
489
+
return invalid("Failed to resolve DID doc's handle");
490
490
};
491
491
if handle_did == did {
492
492
partial_doc.unverified_handle.to_string()
···
516
516
let Ok(handle) = Handle::new(repo) else {
517
517
return GetRecordResponse::BadRequest(xrpc_error(
518
518
"InvalidRequest",
519
-
"repo was not a valid DID or handle",
519
+
"Repo was not a valid DID or handle",
520
520
));
521
521
};
522
522
match self.identity.handle_to_did(handle).await {
···
534
534
log::debug!("handle resolution failed: {e}");
535
535
return GetRecordResponse::ServerError(xrpc_error(
536
536
"ResolutionFailed",
537
-
"errored while trying to resolve handle to DID",
537
+
"Errored while trying to resolve handle to DID",
538
538
));
539
539
}
540
540
}
···
544
544
let Ok(collection) = Nsid::new(collection) else {
545
545
return GetRecordResponse::BadRequest(xrpc_error(
546
546
"InvalidRequest",
547
-
"invalid NSID for collection",
547
+
"Invalid NSID for collection",
548
548
));
549
549
};
550
550
551
551
let Ok(rkey) = RecordKey::new(rkey) else {
552
-
return GetRecordResponse::BadRequest(xrpc_error("InvalidRequest", "invalid rkey"));
552
+
return GetRecordResponse::BadRequest(xrpc_error("InvalidRequest", "Invalid rkey"));
553
553
};
554
554
555
555
let cid: Option<Cid> = if let Some(cid) = cid {
556
556
let Ok(cid) = Cid::from_str(&cid) else {
557
-
return GetRecordResponse::BadRequest(xrpc_error("InvalidRequest", "invalid CID"));
557
+
return GetRecordResponse::BadRequest(xrpc_error("InvalidRequest", "Invalid CID"));
558
558
};
559
559
Some(cid)
560
560
} else {
+2
spacedust/src/error.rs
+2
spacedust/src/error.rs
···
30
30
TooManySourcesWanted,
31
31
#[error("more wantedSubjectDids were requested than allowed (max 10,000)")]
32
32
TooManyDidsWanted,
33
+
#[error("more wantedSubjectPrefixes were requested than allowed (max 100)")]
34
+
TooManySubjectPrefixesWanted,
33
35
#[error("more wantedSubjects were requested than allowed (max 50,000)")]
34
36
TooManySubjectsWanted,
35
37
}
+11
-2
spacedust/src/server.rs
+11
-2
spacedust/src/server.rs
···
227
227
#[serde(default)]
228
228
pub wanted_subjects: HashSet<String>,
229
229
#[serde(default)]
230
+
pub wanted_subject_prefixes: HashSet<String>,
231
+
#[serde(default)]
230
232
pub wanted_subject_dids: HashSet<String>,
231
233
#[serde(default)]
232
234
pub wanted_sources: HashSet<String>,
···
241
243
///
242
244
/// The at-uri must be url-encoded
243
245
///
244
-
/// Pass this parameter multiple times to specify multiple collections, like
246
+
/// Pass this parameter multiple times to specify multiple subjects, like
245
247
/// `wantedSubjects=[...]&wantedSubjects=[...]`
246
248
pub wanted_subjects: String,
249
+
/// One or more at-uri, URI, or DID prefixes to receive links about
250
+
///
251
+
/// The uri must be url-encoded
252
+
///
253
+
/// Pass this parameter multiple times to specify multiple prefixes, like
254
+
/// `wantedSubjectPrefixes=[...]&wantedSubjectPrefixes=[...]`
255
+
pub wanted_subject_prefixes: String,
247
256
/// One or more DIDs to receive links about
248
257
///
249
-
/// Pass this parameter multiple times to specify multiple collections
258
+
/// Pass this parameter multiple times to specify multiple subjects
250
259
pub wanted_subject_dids: String,
251
260
/// One or more link sources to receive links about
252
261
///
+10
-1
spacedust/src/subscriber.rs
+10
-1
spacedust/src/subscriber.rs
···
124
124
let query = &self.query;
125
125
126
126
// subjects, subject prefixes, and subject DIDs are logical OR
127
-
if !(query.wanted_subjects.is_empty() && query.wanted_subject_dids.is_empty()
127
+
if !(query.wanted_subjects.is_empty()
128
+
&& query.wanted_subject_prefixes.is_empty()
129
+
&& query.wanted_subject_dids.is_empty()
128
130
|| query.wanted_subjects.contains(&properties.subject)
131
+
|| query
132
+
.wanted_subject_prefixes
133
+
.iter()
134
+
.any(|p| properties.subject.starts_with(p))
129
135
|| properties
130
136
.subject_did
131
137
.as_ref()
···
154
160
}
155
161
if opts.wanted_subject_dids.len() > 10_000 {
156
162
return Err(SubscriberUpdateError::TooManyDidsWanted);
163
+
}
164
+
if opts.wanted_subject_prefixes.len() > 100 {
165
+
return Err(SubscriberUpdateError::TooManySubjectPrefixesWanted);
157
166
}
158
167
if opts.wanted_subjects.len() > 50_000 {
159
168
return Err(SubscriberUpdateError::TooManySubjectsWanted);
+1
-1
ufos/Cargo.toml
+1
-1
ufos/Cargo.toml
···
13
13
clap = { version = "4.5.31", features = ["derive"] }
14
14
dropshot = "0.16.0"
15
15
env_logger = "0.11.7"
16
-
fjall = { git = "https://github.com/fjall-rs/fjall.git", features = ["lz4"] }
16
+
fjall = { git = "https://github.com/fjall-rs/fjall.git", rev = "fb229572bb7d1d6966a596994dc1708e47ec57d8", features = ["lz4"] }
17
17
getrandom = "0.3.3"
18
18
http = "1.3.1"
19
19
jetstream = { path = "../jetstream", features = ["metrics"] }