+9
-9
Cargo.lock
+9
-9
Cargo.lock
···
992
993
[[package]]
994
name = "clap"
995
-
version = "4.5.47"
996
source = "registry+https://github.com/rust-lang/crates.io-index"
997
-
checksum = "7eac00902d9d136acd712710d71823fb8ac8004ca445a89e73a41d45aa712931"
998
dependencies = [
999
"clap_builder",
1000
"clap_derive",
···
1002
1003
[[package]]
1004
name = "clap_builder"
1005
-
version = "4.5.47"
1006
source = "registry+https://github.com/rust-lang/crates.io-index"
1007
-
checksum = "2ad9bbf750e73b5884fb8a211a9424a1906c1e156724260fdae972f31d70e1d6"
1008
dependencies = [
1009
"anstream",
1010
"anstyle",
···
1375
checksum = "18e4fdb82bd54a12e42fb58a800dcae6b9e13982238ce2296dc3570b92148e1f"
1376
dependencies = [
1377
"data-encoding",
1378
-
"syn 1.0.109",
1379
]
1380
1381
[[package]]
···
3045
checksum = "fc2f4eb4bc735547cfed7c0a4922cbd04a4655978c09b54f1f7b228750664c34"
3046
dependencies = [
3047
"cfg-if",
3048
-
"windows-targets 0.48.5",
3049
]
3050
3051
[[package]]
···
4539
4540
[[package]]
4541
name = "reqwest"
4542
-
version = "0.12.22"
4543
source = "registry+https://github.com/rust-lang/crates.io-index"
4544
-
checksum = "cbc931937e6ca3a06e3b6c0aa7841849b160a90351d6ab467a8b9b9959767531"
4545
dependencies = [
4546
"async-compression",
4547
"base64 0.22.1",
···
6440
source = "registry+https://github.com/rust-lang/crates.io-index"
6441
checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb"
6442
dependencies = [
6443
-
"windows-sys 0.48.0",
6444
]
6445
6446
[[package]]
···
992
993
[[package]]
994
name = "clap"
995
+
version = "4.5.48"
996
source = "registry+https://github.com/rust-lang/crates.io-index"
997
+
checksum = "e2134bb3ea021b78629caa971416385309e0131b351b25e01dc16fb54e1b5fae"
998
dependencies = [
999
"clap_builder",
1000
"clap_derive",
···
1002
1003
[[package]]
1004
name = "clap_builder"
1005
+
version = "4.5.48"
1006
source = "registry+https://github.com/rust-lang/crates.io-index"
1007
+
checksum = "c2ba64afa3c0a6df7fa517765e31314e983f51dda798ffba27b988194fb65dc9"
1008
dependencies = [
1009
"anstream",
1010
"anstyle",
···
1375
checksum = "18e4fdb82bd54a12e42fb58a800dcae6b9e13982238ce2296dc3570b92148e1f"
1376
dependencies = [
1377
"data-encoding",
1378
+
"syn 2.0.106",
1379
]
1380
1381
[[package]]
···
3045
checksum = "fc2f4eb4bc735547cfed7c0a4922cbd04a4655978c09b54f1f7b228750664c34"
3046
dependencies = [
3047
"cfg-if",
3048
+
"windows-targets 0.52.6",
3049
]
3050
3051
[[package]]
···
4539
4540
[[package]]
4541
name = "reqwest"
4542
+
version = "0.12.23"
4543
source = "registry+https://github.com/rust-lang/crates.io-index"
4544
+
checksum = "d429f34c8092b2d42c7c93cec323bb4adeb7c67698f70839adec842ec10c7ceb"
4545
dependencies = [
4546
"async-compression",
4547
"base64 0.22.1",
···
6440
source = "registry+https://github.com/rust-lang/crates.io-index"
6441
checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb"
6442
dependencies = [
6443
+
"windows-sys 0.59.0",
6444
]
6445
6446
[[package]]
+57
-14
constellation/src/bin/main.rs
+57
-14
constellation/src/bin/main.rs
···
1
use anyhow::{bail, Result};
2
use clap::{Parser, ValueEnum};
3
use metrics_exporter_prometheus::PrometheusBuilder;
4
use std::num::NonZero;
5
use std::path::PathBuf;
6
use std::sync::{atomic::AtomicU32, Arc};
···
21
#[derive(Parser, Debug)]
22
#[command(version, about, long_about = None)]
23
struct Args {
24
-
#[arg(short, long)]
25
/// Jetstream server to connect to (exclusive with --fixture). Provide either a wss:// URL, or a shorhand value:
26
/// 'us-east-1', 'us-east-2', 'us-west-1', or 'us-west-2'
27
#[arg(short, long)]
···
46
/// Saved jsonl from jetstream to use instead of a live subscription
47
#[arg(short, long)]
48
fixture: Option<PathBuf>,
49
}
50
51
#[derive(Debug, Clone, ValueEnum)]
···
78
let stream = jetstream_url(&args.jetstream);
79
println!("using jetstream server {stream:?}...",);
80
81
let stay_alive = CancellationToken::new();
82
83
match args.backend {
84
-
StorageBackend::Memory => run(MemStorage::new(), fixture, None, stream, stay_alive),
85
#[cfg(feature = "rocks")]
86
StorageBackend::Rocks => {
87
let storage_dir = args.data.clone().unwrap_or("rocks.test".into());
···
96
rocks.start_backup(backup_dir, auto_backup, stay_alive.clone())?;
97
}
98
println!("rocks ready.");
99
-
run(rocks, fixture, args.data, stream, stay_alive)
100
}
101
}
102
}
···
106
fixture: Option<PathBuf>,
107
data_dir: Option<PathBuf>,
108
stream: String,
109
stay_alive: CancellationToken,
110
) -> Result<()> {
111
ctrlc::set_handler({
···
150
.build()
151
.expect("axum startup")
152
.block_on(async {
153
-
install_metrics_server()?;
154
-
serve(readable, "0.0.0.0:6789", staying_alive).await
155
})
156
.unwrap();
157
stay_alive.drop_guard();
···
184
185
'monitor: loop {
186
match readable.get_stats() {
187
-
Ok(StorageStats { dids, targetables, linking_records }) => {
188
metrics::gauge!("storage.stats.dids").set(dids as f64);
189
metrics::gauge!("storage.stats.targetables").set(targetables as f64);
190
metrics::gauge!("storage.stats.linking_records").set(linking_records as f64);
···
218
Ok(())
219
}
220
221
-
fn install_metrics_server() -> Result<()> {
222
println!("installing metrics server...");
223
-
let host = [0, 0, 0, 0];
224
-
let port = 8765;
225
PrometheusBuilder::new()
226
.set_quantiles(&[0.5, 0.9, 0.99, 1.0])?
227
.set_bucket_duration(time::Duration::from_secs(30))?
228
.set_bucket_count(NonZero::new(10).unwrap()) // count * duration = 5 mins. stuff doesn't happen that fast here.
229
.set_enable_unit_suffix(true)
230
-
.with_http_listener((host, port))
231
.install()?;
232
-
println!(
233
-
"metrics server installed! listening on http://{}.{}.{}.{}:{port}",
234
-
host[0], host[1], host[2], host[3]
235
-
);
236
Ok(())
237
}
238
···
1
use anyhow::{bail, Result};
2
use clap::{Parser, ValueEnum};
3
use metrics_exporter_prometheus::PrometheusBuilder;
4
+
use std::net::SocketAddr;
5
use std::num::NonZero;
6
use std::path::PathBuf;
7
use std::sync::{atomic::AtomicU32, Arc};
···
22
#[derive(Parser, Debug)]
23
#[command(version, about, long_about = None)]
24
struct Args {
25
+
/// constellation server's listen address
26
+
#[arg(long)]
27
+
#[clap(default_value = "0.0.0.0:6789")]
28
+
bind: SocketAddr,
29
+
/// metrics server's listen address
30
+
#[arg(long)]
31
+
#[clap(default_value = "0.0.0.0:8765")]
32
+
bind_metrics: SocketAddr,
33
/// Jetstream server to connect to (exclusive with --fixture). Provide either a wss:// URL, or a shorhand value:
34
/// 'us-east-1', 'us-east-2', 'us-west-1', or 'us-west-2'
35
#[arg(short, long)]
···
54
/// Saved jsonl from jetstream to use instead of a live subscription
55
#[arg(short, long)]
56
fixture: Option<PathBuf>,
57
+
/// run a scan across the target id table and write all key -> ids to id -> keys
58
+
#[arg(long, action)]
59
+
repair_target_ids: bool,
60
}
61
62
#[derive(Debug, Clone, ValueEnum)]
···
89
let stream = jetstream_url(&args.jetstream);
90
println!("using jetstream server {stream:?}...",);
91
92
+
let bind = args.bind;
93
+
let metrics_bind = args.bind_metrics;
94
+
95
let stay_alive = CancellationToken::new();
96
97
match args.backend {
98
+
StorageBackend::Memory => run(
99
+
MemStorage::new(),
100
+
fixture,
101
+
None,
102
+
stream,
103
+
bind,
104
+
metrics_bind,
105
+
stay_alive,
106
+
),
107
#[cfg(feature = "rocks")]
108
StorageBackend::Rocks => {
109
let storage_dir = args.data.clone().unwrap_or("rocks.test".into());
···
118
rocks.start_backup(backup_dir, auto_backup, stay_alive.clone())?;
119
}
120
println!("rocks ready.");
121
+
std::thread::scope(|s| {
122
+
if args.repair_target_ids {
123
+
let rocks = rocks.clone();
124
+
let stay_alive = stay_alive.clone();
125
+
s.spawn(move || {
126
+
let rep = rocks.run_repair(time::Duration::from_millis(0), stay_alive);
127
+
eprintln!("repair finished: {rep:?}");
128
+
rep
129
+
});
130
+
}
131
+
s.spawn(|| {
132
+
let r = run(
133
+
rocks,
134
+
fixture,
135
+
args.data,
136
+
stream,
137
+
bind,
138
+
metrics_bind,
139
+
stay_alive,
140
+
);
141
+
eprintln!("run finished: {r:?}");
142
+
r
143
+
});
144
+
});
145
+
Ok(())
146
}
147
}
148
}
···
152
fixture: Option<PathBuf>,
153
data_dir: Option<PathBuf>,
154
stream: String,
155
+
bind: SocketAddr,
156
+
metrics_bind: SocketAddr,
157
stay_alive: CancellationToken,
158
) -> Result<()> {
159
ctrlc::set_handler({
···
198
.build()
199
.expect("axum startup")
200
.block_on(async {
201
+
install_metrics_server(metrics_bind)?;
202
+
serve(readable, bind, staying_alive).await
203
})
204
.unwrap();
205
stay_alive.drop_guard();
···
232
233
'monitor: loop {
234
match readable.get_stats() {
235
+
Ok(StorageStats { dids, targetables, linking_records, .. }) => {
236
metrics::gauge!("storage.stats.dids").set(dids as f64);
237
metrics::gauge!("storage.stats.targetables").set(targetables as f64);
238
metrics::gauge!("storage.stats.linking_records").set(linking_records as f64);
···
266
Ok(())
267
}
268
269
+
fn install_metrics_server(metrics_bind: SocketAddr) -> Result<()> {
270
println!("installing metrics server...");
271
PrometheusBuilder::new()
272
.set_quantiles(&[0.5, 0.9, 0.99, 1.0])?
273
.set_bucket_duration(time::Duration::from_secs(30))?
274
.set_bucket_count(NonZero::new(10).unwrap()) // count * duration = 5 mins. stuff doesn't happen that fast here.
275
.set_enable_unit_suffix(true)
276
+
.with_http_listener(metrics_bind)
277
.install()?;
278
+
println!("metrics server installed! listening at {metrics_bind:?}");
279
Ok(())
280
}
281
+4
constellation/src/server/filters.rs
+4
constellation/src/server/filters.rs
+289
-18
constellation/src/server/mod.rs
+289
-18
constellation/src/server/mod.rs
···
14
use std::collections::{HashMap, HashSet};
15
use std::time::{Duration, UNIX_EPOCH};
16
use tokio::net::{TcpListener, ToSocketAddrs};
17
-
use tokio::task::block_in_place;
18
use tokio_util::sync::CancellationToken;
19
20
use crate::storage::{LinkReader, StorageStats};
···
28
const DEFAULT_CURSOR_LIMIT: u64 = 16;
29
const DEFAULT_CURSOR_LIMIT_MAX: u64 = 100;
30
31
-
const INDEX_BEGAN_AT_TS: u64 = 1738083600; // TODO: not this
32
33
pub async fn serve<S, A>(store: S, addr: A, stay_alive: CancellationToken) -> anyhow::Result<()>
34
where
···
41
"/",
42
get({
43
let store = store.clone();
44
-
move |accept| async { block_in_place(|| hello(accept, store)) }
45
}),
46
)
47
.route(
48
"/links/count",
49
get({
50
let store = store.clone();
51
-
move |accept, query| async { block_in_place(|| count_links(accept, query, store)) }
52
}),
53
)
54
.route(
···
56
get({
57
let store = store.clone();
58
move |accept, query| async {
59
-
block_in_place(|| count_distinct_dids(accept, query, store))
60
}
61
}),
62
)
···
64
"/links",
65
get({
66
let store = store.clone();
67
-
move |accept, query| async { block_in_place(|| get_links(accept, query, store)) }
68
}),
69
)
70
.route(
···
72
get({
73
let store = store.clone();
74
move |accept, query| async {
75
-
block_in_place(|| get_distinct_dids(accept, query, store))
76
}
77
}),
78
)
···
82
get({
83
let store = store.clone();
84
move |accept, query| async {
85
-
block_in_place(|| count_all_links(accept, query, store))
86
}
87
}),
88
)
···
91
get({
92
let store = store.clone();
93
move |accept, query| async {
94
-
block_in_place(|| explore_links(accept, query, store))
95
}
96
}),
97
)
···
150
#[template(path = "hello.html.j2")]
151
struct HelloReponse {
152
help: &'static str,
153
-
days_indexed: u64,
154
stats: StorageStats,
155
}
156
fn hello(
···
160
let stats = store
161
.get_stats()
162
.map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?;
163
-
let days_indexed = (UNIX_EPOCH + Duration::from_secs(INDEX_BEGAN_AT_TS))
164
-
.elapsed()
165
.map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?
166
-
.as_secs()
167
-
/ 86400;
168
Ok(acceptable(accept, HelloReponse {
169
help: "open this URL in a web browser (or request with Accept: text/html) for information about this API.",
170
days_indexed,
···
173
}
174
175
#[derive(Clone, Deserialize)]
176
struct GetLinksCountQuery {
177
target: String,
178
collection: String,
···
233
}
234
235
#[derive(Clone, Deserialize)]
236
struct GetLinkItemsQuery {
237
target: String,
238
collection: String,
···
251
///
252
/// deprecated: use `did`, which can be repeated multiple times
253
from_dids: Option<String>, // comma separated: gross
254
-
#[serde(default = "get_default_limit")]
255
limit: u64,
256
// TODO: allow reverse (er, forward) order as well
257
-
}
258
-
fn get_default_limit() -> u64 {
259
-
DEFAULT_CURSOR_LIMIT
260
}
261
#[derive(Template, Serialize)]
262
#[template(path = "links.html.j2")]
···
475
OpaqueApiCursor(bincode::DefaultOptions::new().serialize(&item).unwrap())
476
}
477
}
···
14
use std::collections::{HashMap, HashSet};
15
use std::time::{Duration, UNIX_EPOCH};
16
use tokio::net::{TcpListener, ToSocketAddrs};
17
+
use tokio::task::spawn_blocking;
18
use tokio_util::sync::CancellationToken;
19
20
use crate::storage::{LinkReader, StorageStats};
···
28
const DEFAULT_CURSOR_LIMIT: u64 = 16;
29
const DEFAULT_CURSOR_LIMIT_MAX: u64 = 100;
30
31
+
fn get_default_cursor_limit() -> u64 {
32
+
DEFAULT_CURSOR_LIMIT
33
+
}
34
+
35
+
fn to500(e: tokio::task::JoinError) -> http::StatusCode {
36
+
eprintln!("handler error: {e}");
37
+
http::StatusCode::INTERNAL_SERVER_ERROR
38
+
}
39
40
pub async fn serve<S, A>(store: S, addr: A, stay_alive: CancellationToken) -> anyhow::Result<()>
41
where
···
48
"/",
49
get({
50
let store = store.clone();
51
+
move |accept| async {
52
+
spawn_blocking(|| hello(accept, store))
53
+
.await
54
+
.map_err(to500)?
55
+
}
56
+
}),
57
+
)
58
+
.route(
59
+
"/xrpc/blue.microcosm.links.getManyToManyCounts",
60
+
get({
61
+
let store = store.clone();
62
+
move |accept, query| async {
63
+
spawn_blocking(|| get_many_to_many_counts(accept, query, store))
64
+
.await
65
+
.map_err(to500)?
66
+
}
67
}),
68
)
69
.route(
70
"/links/count",
71
get({
72
let store = store.clone();
73
+
move |accept, query| async {
74
+
spawn_blocking(|| count_links(accept, query, store))
75
+
.await
76
+
.map_err(to500)?
77
+
}
78
}),
79
)
80
.route(
···
82
get({
83
let store = store.clone();
84
move |accept, query| async {
85
+
spawn_blocking(|| count_distinct_dids(accept, query, store))
86
+
.await
87
+
.map_err(to500)?
88
+
}
89
+
}),
90
+
)
91
+
.route(
92
+
"/xrpc/blue.microcosm.links.getBacklinks",
93
+
get({
94
+
let store = store.clone();
95
+
move |accept, query| async {
96
+
spawn_blocking(|| get_backlinks(accept, query, store))
97
+
.await
98
+
.map_err(to500)?
99
}
100
}),
101
)
···
103
"/links",
104
get({
105
let store = store.clone();
106
+
move |accept, query| async {
107
+
spawn_blocking(|| get_links(accept, query, store))
108
+
.await
109
+
.map_err(to500)?
110
+
}
111
}),
112
)
113
.route(
···
115
get({
116
let store = store.clone();
117
move |accept, query| async {
118
+
spawn_blocking(|| get_distinct_dids(accept, query, store))
119
+
.await
120
+
.map_err(to500)?
121
}
122
}),
123
)
···
127
get({
128
let store = store.clone();
129
move |accept, query| async {
130
+
spawn_blocking(|| count_all_links(accept, query, store))
131
+
.await
132
+
.map_err(to500)?
133
}
134
}),
135
)
···
138
get({
139
let store = store.clone();
140
move |accept, query| async {
141
+
spawn_blocking(|| explore_links(accept, query, store))
142
+
.await
143
+
.map_err(to500)?
144
}
145
}),
146
)
···
199
#[template(path = "hello.html.j2")]
200
struct HelloReponse {
201
help: &'static str,
202
+
days_indexed: Option<u64>,
203
stats: StorageStats,
204
}
205
fn hello(
···
209
let stats = store
210
.get_stats()
211
.map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?;
212
+
let days_indexed = stats
213
+
.started_at
214
+
.map(|c| (UNIX_EPOCH + Duration::from_micros(c)).elapsed())
215
+
.transpose()
216
.map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?
217
+
.map(|d| d.as_secs() / 86_400);
218
Ok(acceptable(accept, HelloReponse {
219
help: "open this URL in a web browser (or request with Accept: text/html) for information about this API.",
220
days_indexed,
···
223
}
224
225
#[derive(Clone, Deserialize)]
226
+
#[serde(rename_all = "camelCase")]
227
+
struct GetManyToManyCountsQuery {
228
+
subject: String,
229
+
source: String,
230
+
/// path to the secondary link in the linking record
231
+
path_to_other: String,
232
+
/// filter to linking records (join of the m2m) by these DIDs
233
+
#[serde(default)]
234
+
did: Vec<String>,
235
+
/// filter to specific secondary records
236
+
#[serde(default)]
237
+
other_subject: Vec<String>,
238
+
cursor: Option<OpaqueApiCursor>,
239
+
/// Set the max number of links to return per page of results
240
+
#[serde(default = "get_default_cursor_limit")]
241
+
limit: u64,
242
+
}
243
+
#[derive(Serialize)]
244
+
struct OtherSubjectCount {
245
+
subject: String,
246
+
total: u64,
247
+
distinct: u64,
248
+
}
249
+
#[derive(Template, Serialize)]
250
+
#[template(path = "get-many-to-many-counts.html.j2")]
251
+
struct GetManyToManyCountsResponse {
252
+
counts_by_other_subject: Vec<OtherSubjectCount>,
253
+
cursor: Option<OpaqueApiCursor>,
254
+
#[serde(skip_serializing)]
255
+
query: GetManyToManyCountsQuery,
256
+
}
257
+
fn get_many_to_many_counts(
258
+
accept: ExtractAccept,
259
+
query: axum_extra::extract::Query<GetManyToManyCountsQuery>,
260
+
store: impl LinkReader,
261
+
) -> Result<impl IntoResponse, http::StatusCode> {
262
+
let cursor_key = query
263
+
.cursor
264
+
.clone()
265
+
.map(|oc| ApiKeyedCursor::try_from(oc).map_err(|_| http::StatusCode::BAD_REQUEST))
266
+
.transpose()?
267
+
.map(|c| c.next);
268
+
269
+
let limit = query.limit;
270
+
if limit > DEFAULT_CURSOR_LIMIT_MAX {
271
+
return Err(http::StatusCode::BAD_REQUEST);
272
+
}
273
+
274
+
let filter_dids: HashSet<Did> = HashSet::from_iter(
275
+
query
276
+
.did
277
+
.iter()
278
+
.map(|d| d.trim())
279
+
.filter(|d| !d.is_empty())
280
+
.map(|d| Did(d.to_string())),
281
+
);
282
+
283
+
let filter_other_subjects: HashSet<String> = HashSet::from_iter(
284
+
query
285
+
.other_subject
286
+
.iter()
287
+
.map(|s| s.trim().to_string())
288
+
.filter(|s| !s.is_empty()),
289
+
);
290
+
291
+
let Some((collection, path)) = query.source.split_once(':') else {
292
+
return Err(http::StatusCode::BAD_REQUEST);
293
+
};
294
+
let path = format!(".{path}");
295
+
296
+
let path_to_other = format!(".{}", query.path_to_other);
297
+
298
+
let paged = store
299
+
.get_many_to_many_counts(
300
+
&query.subject,
301
+
collection,
302
+
&path,
303
+
&path_to_other,
304
+
limit,
305
+
cursor_key,
306
+
&filter_dids,
307
+
&filter_other_subjects,
308
+
)
309
+
.map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?;
310
+
311
+
let cursor = paged.next.map(|next| ApiKeyedCursor { next }.into());
312
+
313
+
let items = paged
314
+
.items
315
+
.into_iter()
316
+
.map(|(subject, total, distinct)| OtherSubjectCount {
317
+
subject,
318
+
total,
319
+
distinct,
320
+
})
321
+
.collect();
322
+
323
+
Ok(acceptable(
324
+
accept,
325
+
GetManyToManyCountsResponse {
326
+
counts_by_other_subject: items,
327
+
cursor,
328
+
query: (*query).clone(),
329
+
},
330
+
))
331
+
}
332
+
333
+
#[derive(Clone, Deserialize)]
334
struct GetLinksCountQuery {
335
target: String,
336
collection: String,
···
391
}
392
393
#[derive(Clone, Deserialize)]
394
+
struct GetBacklinksQuery {
395
+
/// The link target
396
+
///
397
+
/// can be an AT-URI, plain DID, or regular URI
398
+
subject: String,
399
+
/// Filter links only from this link source
400
+
///
401
+
/// eg.: `app.bsky.feed.like:subject.uri`
402
+
source: String,
403
+
cursor: Option<OpaqueApiCursor>,
404
+
/// Filter links only from these DIDs
405
+
///
406
+
/// include multiple times to filter by multiple source DIDs
407
+
#[serde(default)]
408
+
did: Vec<String>,
409
+
/// Set the max number of links to return per page of results
410
+
#[serde(default = "get_default_cursor_limit")]
411
+
limit: u64,
412
+
// TODO: allow reverse (er, forward) order as well
413
+
}
414
+
#[derive(Template, Serialize)]
415
+
#[template(path = "get-backlinks.html.j2")]
416
+
struct GetBacklinksResponse {
417
+
total: u64,
418
+
records: Vec<RecordId>,
419
+
cursor: Option<OpaqueApiCursor>,
420
+
#[serde(skip_serializing)]
421
+
query: GetBacklinksQuery,
422
+
#[serde(skip_serializing)]
423
+
collection: String,
424
+
#[serde(skip_serializing)]
425
+
path: String,
426
+
}
427
+
fn get_backlinks(
428
+
accept: ExtractAccept,
429
+
query: axum_extra::extract::Query<GetBacklinksQuery>, // supports multiple param occurrences
430
+
store: impl LinkReader,
431
+
) -> Result<impl IntoResponse, http::StatusCode> {
432
+
let until = query
433
+
.cursor
434
+
.clone()
435
+
.map(|oc| ApiCursor::try_from(oc).map_err(|_| http::StatusCode::BAD_REQUEST))
436
+
.transpose()?
437
+
.map(|c| c.next);
438
+
439
+
let limit = query.limit;
440
+
if limit > DEFAULT_CURSOR_LIMIT_MAX {
441
+
return Err(http::StatusCode::BAD_REQUEST);
442
+
}
443
+
444
+
let filter_dids: HashSet<Did> = HashSet::from_iter(
445
+
query
446
+
.did
447
+
.iter()
448
+
.map(|d| d.trim())
449
+
.filter(|d| !d.is_empty())
450
+
.map(|d| Did(d.to_string())),
451
+
);
452
+
453
+
let Some((collection, path)) = query.source.split_once(':') else {
454
+
return Err(http::StatusCode::BAD_REQUEST);
455
+
};
456
+
let path = format!(".{path}");
457
+
458
+
let paged = store
459
+
.get_links(
460
+
&query.subject,
461
+
collection,
462
+
&path,
463
+
limit,
464
+
until,
465
+
&filter_dids,
466
+
)
467
+
.map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?;
468
+
469
+
let cursor = paged.next.map(|next| {
470
+
ApiCursor {
471
+
version: paged.version,
472
+
next,
473
+
}
474
+
.into()
475
+
});
476
+
477
+
Ok(acceptable(
478
+
accept,
479
+
GetBacklinksResponse {
480
+
total: paged.total,
481
+
records: paged.items,
482
+
cursor,
483
+
query: (*query).clone(),
484
+
collection: collection.to_string(),
485
+
path,
486
+
},
487
+
))
488
+
}
489
+
490
+
#[derive(Clone, Deserialize)]
491
struct GetLinkItemsQuery {
492
target: String,
493
collection: String,
···
506
///
507
/// deprecated: use `did`, which can be repeated multiple times
508
from_dids: Option<String>, // comma separated: gross
509
+
#[serde(default = "get_default_cursor_limit")]
510
limit: u64,
511
// TODO: allow reverse (er, forward) order as well
512
}
513
#[derive(Template, Serialize)]
514
#[template(path = "links.html.j2")]
···
727
OpaqueApiCursor(bincode::DefaultOptions::new().serialize(&item).unwrap())
728
}
729
}
730
+
731
+
#[derive(Serialize, Deserialize)] // for bincode
732
+
struct ApiKeyedCursor {
733
+
next: String, // the key
734
+
}
735
+
736
+
impl TryFrom<OpaqueApiCursor> for ApiKeyedCursor {
737
+
type Error = bincode::Error;
738
+
739
+
fn try_from(item: OpaqueApiCursor) -> Result<Self, Self::Error> {
740
+
bincode::DefaultOptions::new().deserialize(&item.0)
741
+
}
742
+
}
743
+
744
+
impl From<ApiKeyedCursor> for OpaqueApiCursor {
745
+
fn from(item: ApiKeyedCursor) -> Self {
746
+
OpaqueApiCursor(bincode::DefaultOptions::new().serialize(&item).unwrap())
747
+
}
748
+
}
+78
-1
constellation/src/storage/mem_store.rs
+78
-1
constellation/src/storage/mem_store.rs
···
1
-
use super::{LinkReader, LinkStorage, PagedAppendingCollection, StorageStats};
2
use crate::{ActionableEvent, CountsByCount, Did, RecordId};
3
use anyhow::Result;
4
use links::CollectedLink;
···
132
}
133
134
impl LinkReader for MemStorage {
135
fn get_count(&self, target: &str, collection: &str, path: &str) -> Result<u64> {
136
let data = self.0.lock().unwrap();
137
let Some(paths) = data.targets.get(&Target::new(target)) else {
···
353
dids,
354
targetables,
355
linking_records,
356
})
357
}
358
}
···
1
+
use super::{
2
+
LinkReader, LinkStorage, PagedAppendingCollection, PagedOrderedCollection, StorageStats,
3
+
};
4
use crate::{ActionableEvent, CountsByCount, Did, RecordId};
5
use anyhow::Result;
6
use links::CollectedLink;
···
134
}
135
136
impl LinkReader for MemStorage {
137
+
fn get_many_to_many_counts(
138
+
&self,
139
+
target: &str,
140
+
collection: &str,
141
+
path: &str,
142
+
path_to_other: &str,
143
+
limit: u64,
144
+
after: Option<String>,
145
+
filter_dids: &HashSet<Did>,
146
+
filter_to_targets: &HashSet<String>,
147
+
) -> Result<PagedOrderedCollection<(String, u64, u64), String>> {
148
+
let data = self.0.lock().unwrap();
149
+
let Some(paths) = data.targets.get(&Target::new(target)) else {
150
+
return Ok(PagedOrderedCollection::default());
151
+
};
152
+
let Some(linkers) = paths.get(&Source::new(collection, path)) else {
153
+
return Ok(PagedOrderedCollection::default());
154
+
};
155
+
156
+
let path_to_other = RecordPath::new(path_to_other);
157
+
let filter_to_targets: HashSet<Target> =
158
+
HashSet::from_iter(filter_to_targets.iter().map(|s| Target::new(s)));
159
+
160
+
let mut grouped_counts: HashMap<Target, (u64, HashSet<Did>)> = HashMap::new();
161
+
for (did, rkey) in linkers.iter().flatten().cloned() {
162
+
if !filter_dids.is_empty() && !filter_dids.contains(&did) {
163
+
continue;
164
+
}
165
+
if let Some(fwd_target) = data
166
+
.links
167
+
.get(&did)
168
+
.unwrap_or(&HashMap::new())
169
+
.get(&RepoId {
170
+
collection: collection.to_string(),
171
+
rkey,
172
+
})
173
+
.unwrap_or(&Vec::new())
174
+
.iter()
175
+
.filter_map(|(path, target)| {
176
+
if *path == path_to_other
177
+
&& (filter_to_targets.is_empty() || filter_to_targets.contains(target))
178
+
{
179
+
Some(target)
180
+
} else {
181
+
None
182
+
}
183
+
})
184
+
.take(1)
185
+
.next()
186
+
{
187
+
let e = grouped_counts.entry(fwd_target.clone()).or_default();
188
+
e.0 += 1;
189
+
e.1.insert(did.clone());
190
+
}
191
+
}
192
+
let mut items: Vec<(String, u64, u64)> = grouped_counts
193
+
.iter()
194
+
.map(|(k, (n, u))| (k.0.clone(), *n, u.len() as u64))
195
+
.collect();
196
+
items.sort();
197
+
items = items
198
+
.into_iter()
199
+
.skip_while(|(t, _, _)| after.as_ref().map(|a| t <= a).unwrap_or(false))
200
+
.take(limit as usize)
201
+
.collect();
202
+
let next = if items.len() as u64 >= limit {
203
+
items.last().map(|(t, _, _)| t.clone())
204
+
} else {
205
+
None
206
+
};
207
+
Ok(PagedOrderedCollection { items, next })
208
+
}
209
+
210
fn get_count(&self, target: &str, collection: &str, path: &str) -> Result<u64> {
211
let data = self.0.lock().unwrap();
212
let Some(paths) = data.targets.get(&Target::new(target)) else {
···
428
dids,
429
targetables,
430
linking_records,
431
+
started_at: None,
432
+
other_data: Default::default(),
433
})
434
}
435
}
+225
constellation/src/storage/mod.rs
+225
constellation/src/storage/mod.rs
···
19
pub total: u64,
20
}
21
22
#[derive(Debug, Deserialize, Serialize, PartialEq)]
23
pub struct StorageStats {
24
/// estimate of how many accounts we've seen create links. the _subjects_ of any links are not represented here.
···
33
/// records with multiple links are single-counted.
34
/// for LSM stores, deleted links don't decrement this, and updated records with any links will likely increment it.
35
pub linking_records: u64,
36
}
37
38
pub trait LinkStorage: Send + Sync {
···
48
}
49
50
pub trait LinkReader: Clone + Send + Sync + 'static {
51
fn get_count(&self, target: &str, collection: &str, path: &str) -> Result<u64>;
52
53
fn get_distinct_did_count(&self, target: &str, collection: &str, path: &str) -> Result<u64>;
···
1326
counts
1327
});
1328
assert_stats(storage.get_stats()?, 1..=1, 2..=2, 1..=1);
1329
});
1330
}
···
19
pub total: u64,
20
}
21
22
+
/// A paged collection whose keys are sorted instead of indexed
23
+
///
24
+
/// this has weaker guarantees than PagedAppendingCollection: it might
25
+
/// return a totally consistent snapshot. but it should avoid duplicates
26
+
/// and each page should at least be internally consistent.
27
+
#[derive(Debug, PartialEq, Default)]
28
+
pub struct PagedOrderedCollection<T, K: Ord> {
29
+
pub items: Vec<T>,
30
+
pub next: Option<K>,
31
+
}
32
+
33
#[derive(Debug, Deserialize, Serialize, PartialEq)]
34
pub struct StorageStats {
35
/// estimate of how many accounts we've seen create links. the _subjects_ of any links are not represented here.
···
44
/// records with multiple links are single-counted.
45
/// for LSM stores, deleted links don't decrement this, and updated records with any links will likely increment it.
46
pub linking_records: u64,
47
+
48
+
/// first jetstream cursor when this instance first started
49
+
pub started_at: Option<u64>,
50
+
51
+
/// anything else we want to throw in
52
+
pub other_data: HashMap<String, u64>,
53
}
54
55
pub trait LinkStorage: Send + Sync {
···
65
}
66
67
pub trait LinkReader: Clone + Send + Sync + 'static {
68
+
#[allow(clippy::too_many_arguments)]
69
+
fn get_many_to_many_counts(
70
+
&self,
71
+
target: &str,
72
+
collection: &str,
73
+
path: &str,
74
+
path_to_other: &str,
75
+
limit: u64,
76
+
after: Option<String>,
77
+
filter_dids: &HashSet<Did>,
78
+
filter_to_targets: &HashSet<String>,
79
+
) -> Result<PagedOrderedCollection<(String, u64, u64), String>>;
80
+
81
fn get_count(&self, target: &str, collection: &str, path: &str) -> Result<u64>;
82
83
fn get_distinct_did_count(&self, target: &str, collection: &str, path: &str) -> Result<u64>;
···
1356
counts
1357
});
1358
assert_stats(storage.get_stats()?, 1..=1, 2..=2, 1..=1);
1359
+
});
1360
+
1361
+
//////// many-to-many /////////
1362
+
1363
+
test_each_storage!(get_m2m_counts_empty, |storage| {
1364
+
assert_eq!(
1365
+
storage.get_many_to_many_counts(
1366
+
"a.com",
1367
+
"a.b.c",
1368
+
".d.e",
1369
+
".f.g",
1370
+
10,
1371
+
None,
1372
+
&HashSet::new(),
1373
+
&HashSet::new(),
1374
+
)?,
1375
+
PagedOrderedCollection {
1376
+
items: vec![],
1377
+
next: None,
1378
+
}
1379
+
);
1380
+
});
1381
+
1382
+
test_each_storage!(get_m2m_counts_single, |storage| {
1383
+
storage.push(
1384
+
&ActionableEvent::CreateLinks {
1385
+
record_id: RecordId {
1386
+
did: "did:plc:asdf".into(),
1387
+
collection: "app.t.c".into(),
1388
+
rkey: "asdf".into(),
1389
+
},
1390
+
links: vec![
1391
+
CollectedLink {
1392
+
target: Link::Uri("a.com".into()),
1393
+
path: ".abc.uri".into(),
1394
+
},
1395
+
CollectedLink {
1396
+
target: Link::Uri("b.com".into()),
1397
+
path: ".def.uri".into(),
1398
+
},
1399
+
CollectedLink {
1400
+
target: Link::Uri("b.com".into()),
1401
+
path: ".ghi.uri".into(),
1402
+
},
1403
+
],
1404
+
},
1405
+
0,
1406
+
)?;
1407
+
assert_eq!(
1408
+
storage.get_many_to_many_counts(
1409
+
"a.com",
1410
+
"app.t.c",
1411
+
".abc.uri",
1412
+
".def.uri",
1413
+
10,
1414
+
None,
1415
+
&HashSet::new(),
1416
+
&HashSet::new(),
1417
+
)?,
1418
+
PagedOrderedCollection {
1419
+
items: vec![("b.com".to_string(), 1, 1)],
1420
+
next: None,
1421
+
}
1422
+
);
1423
+
});
1424
+
1425
+
test_each_storage!(get_m2m_counts_filters, |storage| {
1426
+
storage.push(
1427
+
&ActionableEvent::CreateLinks {
1428
+
record_id: RecordId {
1429
+
did: "did:plc:asdf".into(),
1430
+
collection: "app.t.c".into(),
1431
+
rkey: "asdf".into(),
1432
+
},
1433
+
links: vec![
1434
+
CollectedLink {
1435
+
target: Link::Uri("a.com".into()),
1436
+
path: ".abc.uri".into(),
1437
+
},
1438
+
CollectedLink {
1439
+
target: Link::Uri("b.com".into()),
1440
+
path: ".def.uri".into(),
1441
+
},
1442
+
],
1443
+
},
1444
+
0,
1445
+
)?;
1446
+
storage.push(
1447
+
&ActionableEvent::CreateLinks {
1448
+
record_id: RecordId {
1449
+
did: "did:plc:asdfasdf".into(),
1450
+
collection: "app.t.c".into(),
1451
+
rkey: "asdf".into(),
1452
+
},
1453
+
links: vec![
1454
+
CollectedLink {
1455
+
target: Link::Uri("a.com".into()),
1456
+
path: ".abc.uri".into(),
1457
+
},
1458
+
CollectedLink {
1459
+
target: Link::Uri("b.com".into()),
1460
+
path: ".def.uri".into(),
1461
+
},
1462
+
],
1463
+
},
1464
+
1,
1465
+
)?;
1466
+
storage.push(
1467
+
&ActionableEvent::CreateLinks {
1468
+
record_id: RecordId {
1469
+
did: "did:plc:fdsa".into(),
1470
+
collection: "app.t.c".into(),
1471
+
rkey: "asdf".into(),
1472
+
},
1473
+
links: vec![
1474
+
CollectedLink {
1475
+
target: Link::Uri("a.com".into()),
1476
+
path: ".abc.uri".into(),
1477
+
},
1478
+
CollectedLink {
1479
+
target: Link::Uri("c.com".into()),
1480
+
path: ".def.uri".into(),
1481
+
},
1482
+
],
1483
+
},
1484
+
2,
1485
+
)?;
1486
+
storage.push(
1487
+
&ActionableEvent::CreateLinks {
1488
+
record_id: RecordId {
1489
+
did: "did:plc:fdsa".into(),
1490
+
collection: "app.t.c".into(),
1491
+
rkey: "asdf2".into(),
1492
+
},
1493
+
links: vec![
1494
+
CollectedLink {
1495
+
target: Link::Uri("a.com".into()),
1496
+
path: ".abc.uri".into(),
1497
+
},
1498
+
CollectedLink {
1499
+
target: Link::Uri("c.com".into()),
1500
+
path: ".def.uri".into(),
1501
+
},
1502
+
],
1503
+
},
1504
+
3,
1505
+
)?;
1506
+
assert_eq!(
1507
+
storage.get_many_to_many_counts(
1508
+
"a.com",
1509
+
"app.t.c",
1510
+
".abc.uri",
1511
+
".def.uri",
1512
+
10,
1513
+
None,
1514
+
&HashSet::new(),
1515
+
&HashSet::new(),
1516
+
)?,
1517
+
PagedOrderedCollection {
1518
+
items: vec![("b.com".to_string(), 2, 2), ("c.com".to_string(), 2, 1),],
1519
+
next: None,
1520
+
}
1521
+
);
1522
+
assert_eq!(
1523
+
storage.get_many_to_many_counts(
1524
+
"a.com",
1525
+
"app.t.c",
1526
+
".abc.uri",
1527
+
".def.uri",
1528
+
10,
1529
+
None,
1530
+
&HashSet::from_iter([Did("did:plc:fdsa".to_string())]),
1531
+
&HashSet::new(),
1532
+
)?,
1533
+
PagedOrderedCollection {
1534
+
items: vec![("c.com".to_string(), 2, 1),],
1535
+
next: None,
1536
+
}
1537
+
);
1538
+
assert_eq!(
1539
+
storage.get_many_to_many_counts(
1540
+
"a.com",
1541
+
"app.t.c",
1542
+
".abc.uri",
1543
+
".def.uri",
1544
+
10,
1545
+
None,
1546
+
&HashSet::new(),
1547
+
&HashSet::from_iter(["b.com".to_string()]),
1548
+
)?,
1549
+
PagedOrderedCollection {
1550
+
items: vec![("b.com".to_string(), 2, 2),],
1551
+
next: None,
1552
+
}
1553
+
);
1554
});
1555
}
+342
-40
constellation/src/storage/rocks_store.rs
+342
-40
constellation/src/storage/rocks_store.rs
···
1
-
use super::{ActionableEvent, LinkReader, LinkStorage, PagedAppendingCollection, StorageStats};
2
use crate::{CountsByCount, Did, RecordId};
3
use anyhow::{bail, Result};
4
use bincode::Options as BincodeOptions;
···
11
MultiThreaded, Options, PrefixRange, ReadOptions, WriteBatch,
12
};
13
use serde::{Deserialize, Serialize};
14
-
use std::collections::{HashMap, HashSet};
15
use std::io::Read;
16
use std::marker::PhantomData;
17
use std::path::{Path, PathBuf};
···
20
Arc,
21
};
22
use std::thread;
23
-
use std::time::{Duration, Instant};
24
use tokio_util::sync::CancellationToken;
25
26
static DID_IDS_CF: &str = "did_ids";
···
29
static LINK_TARGETS_CF: &str = "link_targets";
30
31
static JETSTREAM_CURSOR_KEY: &str = "jetstream_cursor";
32
33
// todo: actually understand and set these options probably better
34
fn rocks_opts_base() -> Options {
···
56
#[derive(Debug, Clone)]
57
pub struct RocksStorage {
58
pub db: Arc<DBWithThreadMode<MultiThreaded>>, // TODO: mov seqs here (concat merge op will be fun)
59
-
did_id_table: IdTable<Did, DidIdValue, true>,
60
-
target_id_table: IdTable<TargetKey, TargetId, false>,
61
is_writer: bool,
62
backup_task: Arc<Option<thread::JoinHandle<Result<()>>>>,
63
}
···
85
fn cf_descriptor(&self) -> ColumnFamilyDescriptor {
86
ColumnFamilyDescriptor::new(&self.name, rocks_opts_base())
87
}
88
-
fn init<const WITH_REVERSE: bool>(
89
-
self,
90
-
db: &DBWithThreadMode<MultiThreaded>,
91
-
) -> Result<IdTable<Orig, IdVal, WITH_REVERSE>> {
92
if db.cf_handle(&self.name).is_none() {
93
bail!("failed to get cf handle from db -- was the db open with our .cf_descriptor()?");
94
}
···
119
}
120
}
121
#[derive(Debug, Clone)]
122
-
struct IdTable<Orig, IdVal: IdTableValue, const WITH_REVERSE: bool>
123
where
124
Orig: KeyFromRocks,
125
for<'a> &'a Orig: AsRocksKey,
···
127
base: IdTableBase<Orig, IdVal>,
128
priv_id_seq: u64,
129
}
130
-
impl<Orig: Clone, IdVal: IdTableValue, const WITH_REVERSE: bool> IdTable<Orig, IdVal, WITH_REVERSE>
131
where
132
Orig: KeyFromRocks,
133
for<'v> &'v IdVal: AsRocksValue,
···
139
_key_marker: PhantomData,
140
_val_marker: PhantomData,
141
name: name.into(),
142
-
id_seq: Arc::new(AtomicU64::new(0)), // zero is "uninint", first seq num will be 1
143
}
144
}
145
fn get_id_val(
···
178
id_value
179
}))
180
}
181
fn estimate_count(&self) -> u64 {
182
self.base.id_seq.load(Ordering::SeqCst) - 1 // -1 because seq zero is reserved
183
}
184
-
}
185
-
impl<Orig: Clone, IdVal: IdTableValue> IdTable<Orig, IdVal, true>
186
-
where
187
-
Orig: KeyFromRocks,
188
-
for<'v> &'v IdVal: AsRocksValue,
189
-
for<'k> &'k Orig: AsRocksKey,
190
-
{
191
fn get_or_create_id_val(
192
&mut self,
193
db: &DBWithThreadMode<MultiThreaded>,
···
215
}
216
}
217
}
218
-
impl<Orig: Clone, IdVal: IdTableValue> IdTable<Orig, IdVal, false>
219
-
where
220
-
Orig: KeyFromRocks,
221
-
for<'v> &'v IdVal: AsRocksValue,
222
-
for<'k> &'k Orig: AsRocksKey,
223
-
{
224
-
fn get_or_create_id_val(
225
-
&mut self,
226
-
db: &DBWithThreadMode<MultiThreaded>,
227
-
batch: &mut WriteBatch,
228
-
orig: &Orig,
229
-
) -> Result<IdVal> {
230
-
let cf = db.cf_handle(&self.base.name).unwrap();
231
-
self.__get_or_create_id_val(&cf, db, batch, orig)
232
-
}
233
-
}
234
235
impl IdTableValue for DidIdValue {
236
fn new(v: u64) -> Self {
···
249
}
250
}
251
252
impl RocksStorage {
253
pub fn new(path: impl AsRef<Path>) -> Result<Self> {
254
Self::describe_metrics();
255
-
RocksStorage::open_readmode(path, false)
256
}
257
258
pub fn open_readonly(path: impl AsRef<Path>) -> Result<Self> {
···
260
}
261
262
fn open_readmode(path: impl AsRef<Path>, readonly: bool) -> Result<Self> {
263
-
let did_id_table = IdTable::<_, _, true>::setup(DID_IDS_CF);
264
-
let target_id_table = IdTable::<_, _, false>::setup(TARGET_IDS_CF);
265
266
let cfs = vec![
267
// id reference tables
268
did_id_table.cf_descriptor(),
···
296
is_writer: !readonly,
297
backup_task: None.into(),
298
})
299
}
300
301
pub fn start_backup(
···
826
}
827
828
impl LinkReader for RocksStorage {
829
fn get_count(&self, target: &str, collection: &str, path: &str) -> Result<u64> {
830
let target_key = TargetKey(
831
Target(target.to_string()),
···
1042
.map(|s| s.parse::<u64>())
1043
.transpose()?
1044
.unwrap_or(0);
1045
Ok(StorageStats {
1046
dids,
1047
targetables,
1048
linking_records,
1049
})
1050
}
1051
}
···
1071
impl AsRocksValue for &TargetId {}
1072
impl KeyFromRocks for TargetKey {}
1073
impl ValueFromRocks for TargetId {}
1074
1075
// target_links table
1076
impl AsRocksKey for &TargetId {}
···
1142
}
1143
1144
// target ids
1145
-
#[derive(Debug, Clone, Serialize, Deserialize)]
1146
struct TargetId(u64); // key
1147
1148
-
#[derive(Debug, Clone, Serialize, Deserialize)]
1149
pub struct Target(pub String); // the actual target/uri
1150
1151
// targets (uris, dids, etc.): the reverse index
···
1
+
use super::{
2
+
ActionableEvent, LinkReader, LinkStorage, PagedAppendingCollection, PagedOrderedCollection,
3
+
StorageStats,
4
+
};
5
use crate::{CountsByCount, Did, RecordId};
6
use anyhow::{bail, Result};
7
use bincode::Options as BincodeOptions;
···
14
MultiThreaded, Options, PrefixRange, ReadOptions, WriteBatch,
15
};
16
use serde::{Deserialize, Serialize};
17
+
use std::collections::{BTreeMap, HashMap, HashSet};
18
use std::io::Read;
19
use std::marker::PhantomData;
20
use std::path::{Path, PathBuf};
···
23
Arc,
24
};
25
use std::thread;
26
+
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
27
use tokio_util::sync::CancellationToken;
28
29
static DID_IDS_CF: &str = "did_ids";
···
32
static LINK_TARGETS_CF: &str = "link_targets";
33
34
static JETSTREAM_CURSOR_KEY: &str = "jetstream_cursor";
35
+
static STARTED_AT_KEY: &str = "jetstream_first_cursor";
36
+
// add reverse mappings for targets if this db was running before that was a thing
37
+
static TARGET_ID_REPAIR_STATE_KEY: &str = "target_id_table_repair_state";
38
+
39
+
static COZY_FIRST_CURSOR: u64 = 1_738_083_600_000_000; // constellation.microcosm.blue started
40
+
41
+
#[derive(Debug, Clone, Serialize, Deserialize)]
42
+
struct TargetIdRepairState {
43
+
/// start time for repair, microseconds timestamp
44
+
current_us_started_at: u64,
45
+
/// id table's latest id when repair started
46
+
id_when_started: u64,
47
+
/// id table id
48
+
latest_repaired_i: u64,
49
+
}
50
+
impl AsRocksValue for TargetIdRepairState {}
51
+
impl ValueFromRocks for TargetIdRepairState {}
52
53
// todo: actually understand and set these options probably better
54
fn rocks_opts_base() -> Options {
···
76
#[derive(Debug, Clone)]
77
pub struct RocksStorage {
78
pub db: Arc<DBWithThreadMode<MultiThreaded>>, // TODO: mov seqs here (concat merge op will be fun)
79
+
did_id_table: IdTable<Did, DidIdValue>,
80
+
target_id_table: IdTable<TargetKey, TargetId>,
81
is_writer: bool,
82
backup_task: Arc<Option<thread::JoinHandle<Result<()>>>>,
83
}
···
105
fn cf_descriptor(&self) -> ColumnFamilyDescriptor {
106
ColumnFamilyDescriptor::new(&self.name, rocks_opts_base())
107
}
108
+
fn init(self, db: &DBWithThreadMode<MultiThreaded>) -> Result<IdTable<Orig, IdVal>> {
109
if db.cf_handle(&self.name).is_none() {
110
bail!("failed to get cf handle from db -- was the db open with our .cf_descriptor()?");
111
}
···
136
}
137
}
138
#[derive(Debug, Clone)]
139
+
struct IdTable<Orig, IdVal: IdTableValue>
140
where
141
Orig: KeyFromRocks,
142
for<'a> &'a Orig: AsRocksKey,
···
144
base: IdTableBase<Orig, IdVal>,
145
priv_id_seq: u64,
146
}
147
+
impl<Orig: Clone, IdVal: IdTableValue> IdTable<Orig, IdVal>
148
where
149
Orig: KeyFromRocks,
150
for<'v> &'v IdVal: AsRocksValue,
···
156
_key_marker: PhantomData,
157
_val_marker: PhantomData,
158
name: name.into(),
159
+
id_seq: Arc::new(AtomicU64::new(0)), // zero is "uninit", first seq num will be 1
160
}
161
}
162
fn get_id_val(
···
195
id_value
196
}))
197
}
198
+
199
fn estimate_count(&self) -> u64 {
200
self.base.id_seq.load(Ordering::SeqCst) - 1 // -1 because seq zero is reserved
201
}
202
+
203
fn get_or_create_id_val(
204
&mut self,
205
db: &DBWithThreadMode<MultiThreaded>,
···
227
}
228
}
229
}
230
231
impl IdTableValue for DidIdValue {
232
fn new(v: u64) -> Self {
···
245
}
246
}
247
248
+
fn now() -> u64 {
249
+
SystemTime::now()
250
+
.duration_since(UNIX_EPOCH)
251
+
.unwrap()
252
+
.as_micros() as u64
253
+
}
254
+
255
impl RocksStorage {
256
pub fn new(path: impl AsRef<Path>) -> Result<Self> {
257
Self::describe_metrics();
258
+
let me = RocksStorage::open_readmode(path, false)?;
259
+
me.global_init()?;
260
+
Ok(me)
261
}
262
263
pub fn open_readonly(path: impl AsRef<Path>) -> Result<Self> {
···
265
}
266
267
fn open_readmode(path: impl AsRef<Path>, readonly: bool) -> Result<Self> {
268
+
let did_id_table = IdTable::setup(DID_IDS_CF);
269
+
let target_id_table = IdTable::setup(TARGET_IDS_CF);
270
271
+
// note: global stuff like jetstream cursor goes in the default cf
272
+
// these are bonus extra cfs
273
let cfs = vec![
274
// id reference tables
275
did_id_table.cf_descriptor(),
···
303
is_writer: !readonly,
304
backup_task: None.into(),
305
})
306
+
}
307
+
308
+
fn global_init(&self) -> Result<()> {
309
+
let first_run = self.db.get(JETSTREAM_CURSOR_KEY)?.is_some();
310
+
if first_run {
311
+
self.db.put(STARTED_AT_KEY, _rv(now()))?;
312
+
313
+
// hack / temporary: if we're a new db, put in a completed repair
314
+
// state so we don't run repairs (repairs are for old-code dbs)
315
+
let completed = TargetIdRepairState {
316
+
id_when_started: 0,
317
+
current_us_started_at: 0,
318
+
latest_repaired_i: 0,
319
+
};
320
+
self.db.put(TARGET_ID_REPAIR_STATE_KEY, _rv(completed))?;
321
+
}
322
+
Ok(())
323
+
}
324
+
325
+
pub fn run_repair(&self, breather: Duration, stay_alive: CancellationToken) -> Result<bool> {
326
+
let mut state = match self
327
+
.db
328
+
.get(TARGET_ID_REPAIR_STATE_KEY)?
329
+
.map(|s| _vr(&s))
330
+
.transpose()?
331
+
{
332
+
Some(s) => s,
333
+
None => TargetIdRepairState {
334
+
id_when_started: self.did_id_table.priv_id_seq,
335
+
current_us_started_at: now(),
336
+
latest_repaired_i: 0,
337
+
},
338
+
};
339
+
340
+
eprintln!("initial repair state: {state:?}");
341
+
342
+
let cf = self.db.cf_handle(TARGET_IDS_CF).unwrap();
343
+
344
+
let mut iter = self.db.raw_iterator_cf(&cf);
345
+
iter.seek_to_first();
346
+
347
+
eprintln!("repair iterator sent to first key");
348
+
349
+
// skip ahead if we're done some, or take a single first step
350
+
for _ in 0..state.latest_repaired_i {
351
+
iter.next();
352
+
}
353
+
354
+
eprintln!(
355
+
"repair iterator skipped to {}th key",
356
+
state.latest_repaired_i
357
+
);
358
+
359
+
let mut maybe_done = false;
360
+
361
+
let mut write_fast = rocksdb::WriteOptions::default();
362
+
write_fast.set_sync(false);
363
+
write_fast.disable_wal(true);
364
+
365
+
while !stay_alive.is_cancelled() && !maybe_done {
366
+
// let mut batch = WriteBatch::default();
367
+
368
+
let mut any_written = false;
369
+
370
+
for _ in 0..1000 {
371
+
if state.latest_repaired_i % 1_000_000 == 0 {
372
+
eprintln!("target iter at {}", state.latest_repaired_i);
373
+
}
374
+
state.latest_repaired_i += 1;
375
+
376
+
if !iter.valid() {
377
+
eprintln!("invalid iter, are we done repairing?");
378
+
maybe_done = true;
379
+
break;
380
+
};
381
+
382
+
// eprintln!("iterator seems to be valid! getting the key...");
383
+
let raw_key = iter.key().unwrap();
384
+
if raw_key.len() == 8 {
385
+
// eprintln!("found an 8-byte key, skipping it since it's probably an id...");
386
+
iter.next();
387
+
continue;
388
+
}
389
+
let target: TargetKey = _kr::<TargetKey>(raw_key)?;
390
+
let target_id: TargetId = _vr(iter.value().unwrap())?;
391
+
392
+
self.db
393
+
.put_cf_opt(&cf, target_id.id().to_be_bytes(), _rv(&target), &write_fast)?;
394
+
any_written = true;
395
+
iter.next();
396
+
}
397
+
398
+
if any_written {
399
+
self.db
400
+
.put(TARGET_ID_REPAIR_STATE_KEY, _rv(state.clone()))?;
401
+
std::thread::sleep(breather);
402
+
}
403
+
}
404
+
405
+
eprintln!("repair iterator done.");
406
+
407
+
Ok(false)
408
}
409
410
pub fn start_backup(
···
935
}
936
937
impl LinkReader for RocksStorage {
938
+
fn get_many_to_many_counts(
939
+
&self,
940
+
target: &str,
941
+
collection: &str,
942
+
path: &str,
943
+
path_to_other: &str,
944
+
limit: u64,
945
+
after: Option<String>,
946
+
filter_dids: &HashSet<Did>,
947
+
filter_to_targets: &HashSet<String>,
948
+
) -> Result<PagedOrderedCollection<(String, u64, u64), String>> {
949
+
let collection = Collection(collection.to_string());
950
+
let path = RPath(path.to_string());
951
+
952
+
let target_key = TargetKey(Target(target.to_string()), collection.clone(), path.clone());
953
+
954
+
// unfortunately the cursor is a, uh, stringified number.
955
+
// this was easier for the memstore (plain target, not target id), and
956
+
// making it generic is a bit awful.
957
+
// so... parse the number out of a string here :(
958
+
// TODO: this should bubble up to a BAD_REQUEST response
959
+
let after = after.map(|s| s.parse::<u64>().map(TargetId)).transpose()?;
960
+
961
+
let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? else {
962
+
eprintln!("nothin doin for this target, {target_key:?}");
963
+
return Ok(Default::default());
964
+
};
965
+
966
+
let filter_did_ids: HashMap<DidId, bool> = filter_dids
967
+
.iter()
968
+
.filter_map(|did| self.did_id_table.get_id_val(&self.db, did).transpose())
969
+
.collect::<Result<Vec<DidIdValue>>>()?
970
+
.into_iter()
971
+
.map(|DidIdValue(id, active)| (id, active))
972
+
.collect();
973
+
974
+
// stored targets are keyed by triples of (target, collection, path).
975
+
// target filtering only consideres the target itself, so we actually
976
+
// need to do a prefix iteration of all target ids for this target and
977
+
// keep them all.
978
+
// i *think* the number of keys at a target prefix should usually be
979
+
// pretty small, so this is hopefully fine. but if it turns out to be
980
+
// large, we can push this filtering back into the main links loop and
981
+
// do forward db queries per backlink to get the raw target back out.
982
+
let mut filter_to_target_ids: HashSet<TargetId> = HashSet::new();
983
+
for t in filter_to_targets {
984
+
for (_, target_id) in self.iter_targets_for_target(&Target(t.to_string())) {
985
+
filter_to_target_ids.insert(target_id);
986
+
}
987
+
}
988
+
989
+
let linkers = self.get_target_linkers(&target_id)?;
990
+
991
+
let mut grouped_counts: BTreeMap<TargetId, (u64, HashSet<DidId>)> = BTreeMap::new();
992
+
993
+
for (did_id, rkey) in linkers.0 {
994
+
if did_id.is_empty() {
995
+
continue;
996
+
}
997
+
998
+
if !filter_did_ids.is_empty() && filter_did_ids.get(&did_id) != Some(&true) {
999
+
continue;
1000
+
}
1001
+
1002
+
let record_link_key = RecordLinkKey(did_id, collection.clone(), rkey);
1003
+
let Some(targets) = self.get_record_link_targets(&record_link_key)? else {
1004
+
continue;
1005
+
};
1006
+
1007
+
let Some(fwd_target) = targets
1008
+
.0
1009
+
.into_iter()
1010
+
.filter_map(|RecordLinkTarget(rpath, target_id)| {
1011
+
if rpath.0 == path_to_other
1012
+
&& (filter_to_target_ids.is_empty()
1013
+
|| filter_to_target_ids.contains(&target_id))
1014
+
{
1015
+
Some(target_id)
1016
+
} else {
1017
+
None
1018
+
}
1019
+
})
1020
+
.take(1)
1021
+
.next()
1022
+
else {
1023
+
eprintln!("no forward match");
1024
+
continue;
1025
+
};
1026
+
1027
+
// small relief: we page over target ids, so we can already bail
1028
+
// reprocessing previous pages here
1029
+
if after.as_ref().map(|a| fwd_target <= *a).unwrap_or(false) {
1030
+
continue;
1031
+
}
1032
+
1033
+
// aand we can skip target ids that must be on future pages
1034
+
// (this check continues after the did-lookup, which we have to do)
1035
+
let page_is_full = grouped_counts.len() as u64 >= limit;
1036
+
if page_is_full {
1037
+
let current_max = grouped_counts.keys().next_back().unwrap(); // limit should be non-zero bleh
1038
+
if fwd_target > *current_max {
1039
+
continue;
1040
+
}
1041
+
}
1042
+
1043
+
// bit painful: 2-step lookup to make sure this did is active
1044
+
let Some(did) = self.did_id_table.get_val_from_id(&self.db, did_id.0)? else {
1045
+
eprintln!("failed to look up did from did_id {did_id:?}");
1046
+
continue;
1047
+
};
1048
+
let Some(DidIdValue(_, active)) = self.did_id_table.get_id_val(&self.db, &did)? else {
1049
+
eprintln!("failed to look up did_value from did_id {did_id:?}: {did:?}: data consistency bug?");
1050
+
continue;
1051
+
};
1052
+
if !active {
1053
+
continue;
1054
+
}
1055
+
1056
+
// page-management, continued
1057
+
// if we have a full page, and we're inserting a *new* key less than
1058
+
// the current max, then we can evict the current max
1059
+
let mut should_evict = false;
1060
+
let entry = grouped_counts.entry(fwd_target.clone()).or_insert_with(|| {
1061
+
// this is a *new* key, so kick the max if we're full
1062
+
should_evict = page_is_full;
1063
+
Default::default()
1064
+
});
1065
+
entry.0 += 1;
1066
+
entry.1.insert(did_id);
1067
+
1068
+
if should_evict {
1069
+
grouped_counts.pop_last();
1070
+
}
1071
+
}
1072
+
1073
+
let mut items: Vec<(String, u64, u64)> = Vec::with_capacity(grouped_counts.len());
1074
+
for (target_id, (n, dids)) in &grouped_counts {
1075
+
let Some(target) = self
1076
+
.target_id_table
1077
+
.get_val_from_id(&self.db, target_id.0)?
1078
+
else {
1079
+
eprintln!("failed to look up target from target_id {target_id:?}");
1080
+
continue;
1081
+
};
1082
+
items.push((target.0 .0, *n, dids.len() as u64));
1083
+
}
1084
+
1085
+
let next = if grouped_counts.len() as u64 >= limit {
1086
+
// yeah.... it's a number saved as a string......sorry
1087
+
grouped_counts
1088
+
.keys()
1089
+
.next_back()
1090
+
.map(|k| format!("{}", k.0))
1091
+
} else {
1092
+
None
1093
+
};
1094
+
1095
+
Ok(PagedOrderedCollection { items, next })
1096
+
}
1097
+
1098
fn get_count(&self, target: &str, collection: &str, path: &str) -> Result<u64> {
1099
let target_key = TargetKey(
1100
Target(target.to_string()),
···
1311
.map(|s| s.parse::<u64>())
1312
.transpose()?
1313
.unwrap_or(0);
1314
+
let started_at = self
1315
+
.db
1316
+
.get(STARTED_AT_KEY)?
1317
+
.map(|c| _vr(&c))
1318
+
.transpose()?
1319
+
.unwrap_or(COZY_FIRST_CURSOR);
1320
+
1321
+
let other_data = self
1322
+
.db
1323
+
.get(TARGET_ID_REPAIR_STATE_KEY)?
1324
+
.map(|s| _vr(&s))
1325
+
.transpose()?
1326
+
.map(
1327
+
|TargetIdRepairState {
1328
+
current_us_started_at,
1329
+
id_when_started,
1330
+
latest_repaired_i,
1331
+
}| {
1332
+
HashMap::from([
1333
+
("current_us_started_at".to_string(), current_us_started_at),
1334
+
("id_when_started".to_string(), id_when_started),
1335
+
("latest_repaired_i".to_string(), latest_repaired_i),
1336
+
])
1337
+
},
1338
+
)
1339
+
.unwrap_or(HashMap::default());
1340
+
1341
Ok(StorageStats {
1342
dids,
1343
targetables,
1344
linking_records,
1345
+
started_at: Some(started_at),
1346
+
other_data,
1347
})
1348
}
1349
}
···
1369
impl AsRocksValue for &TargetId {}
1370
impl KeyFromRocks for TargetKey {}
1371
impl ValueFromRocks for TargetId {}
1372
+
1373
+
// temp?
1374
+
impl KeyFromRocks for TargetId {}
1375
+
impl AsRocksValue for &TargetKey {}
1376
1377
// target_links table
1378
impl AsRocksKey for &TargetId {}
···
1444
}
1445
1446
// target ids
1447
+
#[derive(Debug, Clone, Serialize, Deserialize, PartialOrd, Ord, PartialEq, Eq, Hash)]
1448
struct TargetId(u64); // key
1449
1450
+
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
1451
pub struct Target(pub String); // the actual target/uri
1452
1453
// targets (uris, dids, etc.): the reverse index
+54
constellation/templates/get-backlinks.html.j2
+54
constellation/templates/get-backlinks.html.j2
···
···
1
+
{% extends "base.html.j2" %}
2
+
{% import "try-it-macros.html.j2" as try_it %}
3
+
4
+
{% block title %}Backlinks{% endblock %}
5
+
{% block description %}All {{ query.source }} records with links to {{ query.subject }}{% endblock %}
6
+
7
+
{% block content %}
8
+
9
+
{% call try_it::get_backlinks(query.subject, query.source, query.did, query.limit) %}
10
+
11
+
<h2>
12
+
Links to <code>{{ query.subject }}</code>
13
+
{% if let Some(browseable_uri) = query.subject|to_browseable %}
14
+
<small style="font-weight: normal; font-size: 1rem"><a href="{{ browseable_uri }}">browse record</a></small>
15
+
{% endif %}
16
+
</h2>
17
+
18
+
<p><strong>{{ total|human_number }} links</strong> from <code>{{ query.source }}</code>.</p>
19
+
20
+
<ul>
21
+
<li>See distinct linking DIDs at <code>/links/distinct-dids</code>: <a href="/links/distinct-dids?target={{ query.subject|urlencode }}&collection={{ collection|urlencode }}&path={{ path|urlencode }}">/links/distinct-dids?target={{ query.subject }}&collection={{ collection }}&path={{ path }}</a></li>
22
+
<li>See all links to this target at <code>/links/all</code>: <a href="/links/all?target={{ query.subject|urlencode }}">/links/all?target={{ query.subject }}</a></li>
23
+
</ul>
24
+
25
+
<h3>Links, most recent first:</h3>
26
+
27
+
{% for record in records %}
28
+
<pre style="display: block; margin: 1em 2em" class="code"><strong>DID</strong>: {{ record.did().0 }} (<a href="/links/all?target={{ record.did().0|urlencode }}">DID links</a>)
29
+
<strong>Collection</strong>: {{ record.collection }}
30
+
<strong>RKey</strong>: {{ record.rkey }}
31
+
-> <a href="https://pdsls.dev/at://{{ record.did().0 }}/{{ record.collection }}/{{ record.rkey }}">browse record</a></pre>
32
+
{% endfor %}
33
+
34
+
{% if let Some(c) = cursor %}
35
+
<form method="get" action="/xrpc/blue.microcosm.links.getBacklinks">
36
+
<input type="hidden" name="subject" value="{{ query.subject }}" />
37
+
<input type="hidden" name="source" value="{{ query.source }}" />
38
+
<input type="hidden" name="limit" value="{{ query.limit }}" />
39
+
{% for did in query.did %}
40
+
<input type="hidden" name="did" value="{{ did }}" />
41
+
{% endfor %}
42
+
<input type="hidden" name="cursor" value={{ c|json|safe }} />
43
+
<button type="submit">next page…</button>
44
+
</form>
45
+
{% else %}
46
+
<button disabled><em>end of results</em></button>
47
+
{% endif %}
48
+
49
+
<details>
50
+
<summary>Raw JSON response</summary>
51
+
<pre class="code">{{ self|tojson }}</pre>
52
+
</details>
53
+
54
+
{% endblock %}
+67
constellation/templates/get-many-to-many-counts.html.j2
+67
constellation/templates/get-many-to-many-counts.html.j2
···
···
1
+
{% extends "base.html.j2" %}
2
+
{% import "try-it-macros.html.j2" as try_it %}
3
+
4
+
{% block title %}Many to Many counts{% endblock %}
5
+
{% block description %}Counts of many-to-many {{ query.source }} join records with links to {{ query.subject }} and a secondary target at {{ query.path_to_other }}{% endblock %}
6
+
7
+
{% block content %}
8
+
9
+
{% call try_it::get_many_to_many_counts(
10
+
query.subject,
11
+
query.source,
12
+
query.path_to_other,
13
+
query.did,
14
+
query.other_subject,
15
+
query.limit,
16
+
) %}
17
+
18
+
<h2>
19
+
Many-to-many links to <code>{{ query.subject }}</code> joining through <code>{{ query.path_to_other }}</code>
20
+
{% if let Some(browseable_uri) = query.subject|to_browseable %}
21
+
<small style="font-weight: normal; font-size: 1rem"><a href="{{ browseable_uri }}">browse record</a></small>
22
+
{% endif %}
23
+
</h2>
24
+
25
+
<p><strong>{% if cursor.is_some() || query.cursor.is_some() %}more than {% endif %}{{ counts_by_other_subject.len()|to_u64|human_number }} joins</strong> <code>{{ query.source }}โ{{ query.path_to_other }}</code></p>
26
+
27
+
<ul>
28
+
<li>See direct backlinks at <code>/xrpc/blue.microcosm.links.getBacklinks</code>: <a href="/xrpc/blue.microcosm.links.getBacklinks?subject={{ query.subject|urlencode }}&source={{ query.source|urlencode }}">/xrpc/blue.microcosm.links.getBacklinks?subject={{ query.subject }}&source={{ query.source }}</a></li>
29
+
<li>See all links to this target at <code>/links/all</code>: <a href="/links/all?target={{ query.subject|urlencode }}">/links/all?target={{ query.subject }}</a></li>
30
+
</ul>
31
+
32
+
<h3>Counts by other subject:</h3>
33
+
34
+
{% for counts in counts_by_other_subject %}
35
+
<pre style="display: block; margin: 1em 2em" class="code"><strong>Joined subject</strong>: {{ counts.subject }}
36
+
<strong>Joining records</strong>: {{ counts.total }}
37
+
<strong>Unique joiner ids</strong>: {{ counts.distinct }}
38
+
-> {% if let Some(browseable_uri) = counts.subject|to_browseable -%}
39
+
<a href="{{ browseable_uri }}">browse record</a>
40
+
{%- endif %}</pre>
41
+
{% endfor %}
42
+
43
+
{% if let Some(c) = cursor %}
44
+
<form method="get" action="/xrpc/blue.microcosm.links.getManyToManyCounts">
45
+
<input type="hidden" name="subject" value="{{ query.subject }}" />
46
+
<input type="hidden" name="source" value="{{ query.source }}" />
47
+
<input type="hidden" name="pathToOther" value="{{ query.path_to_other }}" />
48
+
{% for did in query.did %}
49
+
<input type="hidden" name="did" value="{{ did }}" />
50
+
{% endfor %}
51
+
{% for otherSubject in query.other_subject %}
52
+
<input type="hidden" name="otherSubject" value="{{ otherSubject }}" />
53
+
{% endfor %}
54
+
<input type="hidden" name="limit" value="{{ query.limit }}" />
55
+
<input type="hidden" name="cursor" value={{ c|json|safe }} />
56
+
<button type="submit">next page…</button>
57
+
</form>
58
+
{% else %}
59
+
<button disabled><em>end of results</em></button>
60
+
{% endif %}
61
+
62
+
<details>
63
+
<summary>Raw JSON response</summary>
64
+
<pre class="code">{{ self|tojson }}</pre>
65
+
</details>
66
+
67
+
{% endblock %}
+57
-2
constellation/templates/hello.html.j2
+57
-2
constellation/templates/hello.html.j2
···
19
<p>It works by recursively walking <em>all</em> records coming through the firehose, searching for anything that looks like a link. Links are indexed by the target they point at, the collection the record came from, and the JSON path to the link in that record.</p>
20
21
<p>
22
-
This server has indexed <span class="stat">{{ stats.linking_records|human_number }}</span> links between <span class="stat">{{ stats.targetables|human_number }}</span> targets and sources from <span class="stat">{{ stats.dids|human_number }}</span> identities over <span class="stat">{{ days_indexed|human_number }}</span> days.<br/>
23
<small>(indexing new records in real time, backfill coming soon!)</small>
24
</p>
25
26
-
<p>But feel free to use it! If you want to be nice, put your project name and bsky username (or email) in your user-agent header for api requests.</p>
27
28
29
<h2>API Endpoints</h2>
30
31
<h3 class="route"><code>GET /links</code></h3>
32
33
<p>A list of records linking to a target.</p>
34
35
<h4>Query parameters:</h4>
36
···
19
<p>It works by recursively walking <em>all</em> records coming through the firehose, searching for anything that looks like a link. Links are indexed by the target they point at, the collection the record came from, and the JSON path to the link in that record.</p>
20
21
<p>
22
+
This server has indexed <span class="stat">{{ stats.linking_records|human_number }}</span> links between <span class="stat">{{ stats.targetables|human_number }}</span> targets and sources from <span class="stat">{{ stats.dids|human_number }}</span> identities over <span class="stat">
23
+
{%- if let Some(days) = days_indexed %}
24
+
{{ days|human_number }}
25
+
{% else %}
26
+
???
27
+
{% endif -%}
28
+
</span> days.<br/>
29
<small>(indexing new records in real time, backfill coming soon!)</small>
30
</p>
31
32
+
{# {% for k, v in stats.other_data.iter() %}
33
+
<p><strong>{{ k }}</strong>: {{ v }}</p>
34
+
{% endfor %} #}
35
+
36
+
<p>You're welcome to use this public instance! Please do not build the torment nexus. If you want to be nice, put your project name and bsky username (or email) in your user-agent header for api requests.</p>
37
38
39
<h2>API Endpoints</h2>
40
41
+
<h3 class="route"><code>GET /xrpc/blue.microcosm.links.getBacklinks</code></h3>
42
+
43
+
<p>A list of records linking to any record, identity, or uri.</p>
44
+
45
+
<h4>Query parameters:</h4>
46
+
47
+
<ul>
48
+
<li><p><code>subject</code>: required, must url-encode. Example: <code>at://did:plc:vc7f4oafdgxsihk4cry2xpze/app.bsky.feed.post/3lgwdn7vd722r</code></p></li>
49
+
<li><p><code>source</code>: required. Example: <code>app.bsky.feed.like:subject.uri</code></p></li>
50
+
<li><p><code>did</code>: optional, filter links to those from specific users. Include multiple times to filter by multiple users. Example: <code>did=did:plc:vc7f4oafdgxsihk4cry2xpze&did=did:plc:vc7f4oafdgxsihk4cry2xpze</code></p></li>
51
+
<li><p><code>limit</code>: optional. Default: <code>16</code>. Maximum: <code>100</code></p></li>
52
+
</ul>
53
+
54
+
<p style="margin-bottom: 0"><strong>Try it:</strong></p>
55
+
{% call try_it::get_backlinks("at://did:plc:a4pqq234yw7fqbddawjo7y35/app.bsky.feed.post/3m237ilwc372e", "app.bsky.feed.like:subject.uri", [""], 16) %}
56
+
57
+
58
+
<h3 class="route"><code>GET /xrpc/blue.microcosm.links.getManyToManyCounts</code></h3>
59
+
60
+
<p>TODO: description</p>
61
+
62
+
<h4>Query parameters:</h4>
63
+
64
+
<ul>
65
+
<li><p><code>subject</code>: required, must url-encode. Example: <code>at://did:plc:vc7f4oafdgxsihk4cry2xpze/app.bsky.feed.post/3lgwdn7vd722r</code></p></li>
66
+
<li><p><code>source</code>: required. Example: <code>app.bsky.feed.like:subject.uri</code></p></li>
67
+
<li><p><code>pathToOther</code>: required. Path to the secondary link in the many-to-many record. Example: <code>otherThing.uri</code></p></li>
68
+
<li><p><code>did</code>: optional, filter links to those from specific users. Include multiple times to filter by multiple users. Example: <code>did=did:plc:vc7f4oafdgxsihk4cry2xpze&did=did:plc:vc7f4oafdgxsihk4cry2xpze</code></p></li>
69
+
<li><p><code>otherSubject</code>: optional, filter secondary links to specific subjects. Include multiple times to filter by multiple users. Example: <code>at://did:plc:vc7f4oafdgxsihk4cry2xpze/app.bsky.feed.post/3lgwdn7vd722r</code></p></li>
70
+
<li><p><code>limit</code>: optional. Default: <code>16</code>. Maximum: <code>100</code></p></li>
71
+
</ul>
72
+
73
+
<p style="margin-bottom: 0"><strong>Try it:</strong></p>
74
+
{% call try_it::get_many_to_many_counts(
75
+
"at://did:plc:wshs7t2adsemcrrd4snkeqli/sh.tangled.label.definition/good-first-issue",
76
+
"sh.tangled.label.op:add[].key",
77
+
"subject",
78
+
[""],
79
+
[""],
80
+
25,
81
+
) %}
82
+
83
+
84
<h3 class="route"><code>GET /links</code></h3>
85
86
<p>A list of records linking to a target.</p>
87
+
88
+
<p>[DEPRECATED]: use <code>GET /xrpc/blue.microcosm.links.getBacklinks</code>. New apps should avoid it, but this endpoint <strong>will</strong> remain supported for the forseeable future.</p>
89
90
<h4>Query parameters:</h4>
91
+1
-1
constellation/templates/links.html.j2
+1
-1
constellation/templates/links.html.j2
···
28
<pre style="display: block; margin: 1em 2em" class="code"><strong>DID</strong>: {{ record.did().0 }} (<a href="/links/all?target={{ record.did().0|urlencode }}">DID links</a>)
29
<strong>Collection</strong>: {{ record.collection }}
30
<strong>RKey</strong>: {{ record.rkey }}
31
-
-> <a href="https://atproto-browser-plus-links.vercel.app/at/{{ record.did().0|urlencode }}/{{ record.collection }}/{{ record.rkey }}">browse record</a></pre>
32
{% endfor %}
33
34
{% if let Some(c) = cursor %}
···
28
<pre style="display: block; margin: 1em 2em" class="code"><strong>DID</strong>: {{ record.did().0 }} (<a href="/links/all?target={{ record.did().0|urlencode }}">DID links</a>)
29
<strong>Collection</strong>: {{ record.collection }}
30
<strong>RKey</strong>: {{ record.rkey }}
31
+
-> <a href="https://pdsls.dev/at://{{ record.did().0 }}/{{ record.collection }}/{{ record.rkey }}">browse record</a></pre>
32
{% endfor %}
33
34
{% if let Some(c) = cursor %}
+68
-1
constellation/templates/try-it-macros.html.j2
+68
-1
constellation/templates/try-it-macros.html.j2
···
1
+
{% macro get_backlinks(subject, source, dids, limit) %}
2
+
<form method="get" action="/xrpc/blue.microcosm.links.getBacklinks">
3
+
<pre class="code"><strong>GET</strong> /xrpc/blue.microcosm.links.getBacklinks
4
+
?subject= <input type="text" name="subject" value="{{ subject }}" placeholder="at-uri, did, uri..." />
5
+
&source= <input type="text" name="source" value="{{ source }}" placeholder="app.bsky.feed.like:subject.uri" />
6
+
{%- for did in dids %}{% if !did.is_empty() %}
7
+
&did= <input type="text" name="did" value="{{ did }}" placeholder="did:plc:..." />{% endif %}{% endfor %}
8
+
<span id="did-placeholder"></span> <button id="add-did">+ did filter</button>
9
+
&limit= <input type="number" name="limit" value="{{ limit }}" max="100" placeholder="100" /> <button type="submit">get links</button></pre>
10
+
</form>
11
+
<script>
12
+
const addDidButton = document.getElementById('add-did');
13
+
const didPlaceholder = document.getElementById('did-placeholder');
14
+
addDidButton.addEventListener('click', e => {
15
+
e.preventDefault();
16
+
const i = document.createElement('input');
17
+
i.placeholder = 'did:plc:...';
18
+
i.name = "did"
19
+
const p = addDidButton.parentNode;
20
+
p.insertBefore(document.createTextNode('&did= '), didPlaceholder);
21
+
p.insertBefore(i, didPlaceholder);
22
+
p.insertBefore(document.createTextNode('\n '), didPlaceholder);
23
+
});
24
+
</script>
25
+
{% endmacro %}
26
+
27
+
{% macro get_many_to_many_counts(subject, source, pathToOther, dids, otherSubjects, limit) %}
28
+
<form method="get" action="/xrpc/blue.microcosm.links.getManyToManyCounts">
29
+
<pre class="code"><strong>GET</strong> /xrpc/blue.microcosm.links.getManyToManyCounts
30
+
?subject= <input type="text" name="subject" value="{{ subject }}" placeholder="at-uri, did, uri..." />
31
+
&source= <input type="text" name="source" value="{{ source }}" placeholder="app.bsky.feed.like:subject.uri" />
32
+
&pathToOther= <input type="text" name="pathToOther" value="{{ pathToOther }}" placeholder="otherThing.uri" />
33
+
{%- for did in dids %}{% if !did.is_empty() %}
34
+
&did= <input type="text" name="did" value="{{ did }}" placeholder="did:plc:..." />{% endif %}{% endfor %}
35
+
<span id="m2m-subject-placeholder"></span> <button id="m2m-add-subject">+ other subject filter</button>
36
+
{%- for otherSubject in otherSubjects %}{% if !otherSubject.is_empty() %}
37
+
&otherSubject= <input type="text" name="did" value="{{ otherSubject }}" placeholder="at-uri, did, uri..." />{% endif %}{% endfor %}
38
+
<span id="m2m-did-placeholder"></span> <button id="m2m-add-did">+ did filter</button>
39
+
&limit= <input type="number" name="limit" value="{{ limit }}" max="100" placeholder="100" /> <button type="submit">get links</button></pre>
40
+
</form>
41
+
<script>
42
+
const m2mAddDidButton = document.getElementById('m2m-add-did');
43
+
const m2mDidPlaceholder = document.getElementById('m2m-did-placeholder');
44
+
m2mAddDidButton.addEventListener('click', e => {
45
+
e.preventDefault();
46
+
const i = document.createElement('input');
47
+
i.placeholder = 'did:plc:...';
48
+
i.name = "did"
49
+
const p = m2mAddDidButton.parentNode;
50
+
p.insertBefore(document.createTextNode('&did= '), m2mDidPlaceholder);
51
+
p.insertBefore(i, m2mDidPlaceholder);
52
+
p.insertBefore(document.createTextNode('\n '), m2mDidPlaceholder);
53
+
});
54
+
const m2mAddSubjectButton = document.getElementById('m2m-add-subject');
55
+
const m2mSubjectPlaceholder = document.getElementById('m2m-subject-placeholder');
56
+
m2mAddSubjectButton.addEventListener('click', e => {
57
+
e.preventDefault();
58
+
const i = document.createElement('input');
59
+
i.placeholder = 'at-uri, did, uri...';
60
+
i.name = "otherSubject"
61
+
const p = m2mAddSubjectButton.parentNode;
62
+
p.insertBefore(document.createTextNode('&otherSubject= '), m2mSubjectPlaceholder);
63
+
p.insertBefore(i, m2mSubjectPlaceholder);
64
+
p.insertBefore(document.createTextNode('\n '), m2mSubjectPlaceholder);
65
+
});
66
+
</script>
67
+
{% endmacro %}
68
+
69
{% macro links(target, collection, path, dids, limit) %}
70
<form method="get" action="/links">
71
<pre class="code"><strong>GET</strong> /links
···
92
});
93
</script>
94
{% endmacro %}
95
96
{% macro dids(target, collection, path) %}
97
<form method="get" action="/links/distinct-dids">