+13
-6
constellation/src/consumer/jetstream.rs
+13
-6
constellation/src/consumer/jetstream.rs
···
226
226
println!("jetstream closed the websocket cleanly.");
227
227
break;
228
228
}
229
-
r => eprintln!("jetstream: close result after error: {r:?}"),
229
+
Err(_) => {
230
+
counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "dirty close").increment(1);
231
+
println!("jetstream failed to close the websocket cleanly.");
232
+
break;
233
+
}
234
+
Ok(r) => {
235
+
eprintln!("jetstream: close result after error: {r:?}");
236
+
counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "read error")
237
+
.increment(1);
238
+
// if we didn't immediately get ConnectionClosed, we should keep polling read
239
+
// until we get it.
240
+
continue;
241
+
}
230
242
}
231
-
counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "read error")
232
-
.increment(1);
233
-
// if we didn't immediately get ConnectionClosed, we should keep polling read
234
-
// until we get it.
235
-
continue;
236
243
}
237
244
};
238
245
+4
-6
constellation/src/server/filters.rs
+4
-6
constellation/src/server/filters.rs
···
5
5
Ok({
6
6
if let Some(link) = parse_any_link(s) {
7
7
match link {
8
-
Link::AtUri(at_uri) => at_uri.strip_prefix("at://").map(|noproto| {
9
-
format!("https://atproto-browser-plus-links.vercel.app/at/{noproto}")
10
-
}),
11
-
Link::Did(did) => Some(format!(
12
-
"https://atproto-browser-plus-links.vercel.app/at/{did}"
13
-
)),
8
+
Link::AtUri(at_uri) => at_uri
9
+
.strip_prefix("at://")
10
+
.map(|noproto| format!("https://pdsls.dev/at://{noproto}")),
11
+
Link::Did(did) => Some(format!("https://pdsls.dev/at://{did}")),
14
12
Link::Uri(uri) => Some(uri),
15
13
}
16
14
} else {
+1
-1
constellation/templates/dids.html.j2
+1
-1
constellation/templates/dids.html.j2
···
27
27
{% for did in linking_dids %}
28
28
<pre style="display: block; margin: 1em 2em" class="code"><strong>DID</strong>: {{ did.0 }}
29
29
-> see <a href="/links/all?target={{ did.0|urlencode }}">links to this DID</a>
30
-
-> browse <a href="https://atproto-browser-plus-links.vercel.app/at/{{ did.0|urlencode }}">this DID record</a></pre>
30
+
-> browse <a href="https://pdsls.dev/at://{{ did.0|urlencode }}">this DID record</a></pre>
31
31
{% endfor %}
32
32
33
33
{% if let Some(c) = cursor %}
+4
-2
slingshot/src/firehose_cache.rs
+4
-2
slingshot/src/firehose_cache.rs
···
4
4
5
5
pub async fn firehose_cache(
6
6
cache_dir: impl AsRef<Path>,
7
+
memory_mb: usize,
8
+
disk_gb: usize,
7
9
) -> Result<HybridCache<String, CachedRecord>, String> {
8
10
let cache = HybridCacheBuilder::new()
9
11
.with_name("firehose")
10
-
.memory(64 * 2_usize.pow(20))
12
+
.memory(memory_mb * 2_usize.pow(20))
11
13
.with_weighter(|k: &String, v| k.len() + std::mem::size_of_val(v))
12
14
.storage(Engine::large())
13
15
.with_device_options(
14
16
DirectFsDeviceOptions::new(cache_dir)
15
-
.with_capacity(2_usize.pow(30)) // TODO: configurable (1GB to have something)
17
+
.with_capacity(disk_gb * 2_usize.pow(30))
16
18
.with_file_size(16 * 2_usize.pow(20)), // note: this does limit the max cached item size, warning jumbo records
17
19
)
18
20
.build()
+26
-5
slingshot/src/main.rs
+26
-5
slingshot/src/main.rs
···
25
25
/// where to keep disk caches
26
26
#[arg(long)]
27
27
cache_dir: PathBuf,
28
+
/// memory cache size in MB
29
+
#[arg(long, default_value_t = 64)]
30
+
cache_memory_mb: usize,
31
+
/// disk cache size in GB
32
+
#[arg(long, default_value_t = 1)]
33
+
cache_disk_gb: usize,
34
+
/// host for HTTP server (when not using --domain)
35
+
#[arg(long, default_value = "127.0.0.1")]
36
+
host: String,
37
+
/// port for HTTP server (when not using --domain)
38
+
#[arg(long, default_value_t = 3000)]
39
+
port: u16,
40
+
/// port for metrics/prometheus server
41
+
#[arg(long, default_value_t = 8765)]
42
+
metrics_port: u16,
28
43
/// the domain pointing to this server
29
44
///
30
45
/// if present:
···
62
77
63
78
let args = Args::parse();
64
79
65
-
if let Err(e) = install_metrics_server() {
80
+
if let Err(e) = install_metrics_server(args.metrics_port) {
66
81
log::error!("failed to install metrics server: {e:?}");
67
82
} else {
68
-
log::info!("metrics listening at http://0.0.0.0:8765");
83
+
log::info!("metrics listening at http://0.0.0.0:{}", args.metrics_port);
69
84
}
70
85
71
86
std::fs::create_dir_all(&args.cache_dir).map_err(|e| {
···
83
98
log::info!("cache dir ready at at {cache_dir:?}.");
84
99
85
100
log::info!("setting up firehose cache...");
86
-
let cache = firehose_cache(cache_dir.join("./firehose")).await?;
101
+
let cache = firehose_cache(
102
+
cache_dir.join("./firehose"),
103
+
args.cache_memory_mb,
104
+
args.cache_disk_gb,
105
+
)
106
+
.await?;
87
107
log::info!("firehose cache ready.");
88
108
89
109
let mut tasks: tokio::task::JoinSet<Result<(), MainTaskError>> = tokio::task::JoinSet::new();
···
112
132
args.domain,
113
133
args.acme_contact,
114
134
args.certs,
135
+
args.host,
136
+
args.port,
115
137
server_shutdown,
116
138
)
117
139
.await?;
···
172
194
Ok(())
173
195
}
174
196
175
-
fn install_metrics_server() -> Result<(), metrics_exporter_prometheus::BuildError> {
197
+
fn install_metrics_server(port: u16) -> Result<(), metrics_exporter_prometheus::BuildError> {
176
198
log::info!("installing metrics server...");
177
199
let host = [0, 0, 0, 0];
178
-
let port = 8765;
179
200
PrometheusBuilder::new()
180
201
.set_quantiles(&[0.5, 0.9, 0.99, 1.0])?
181
202
.set_bucket_duration(std::time::Duration::from_secs(300))?
+19
-12
slingshot/src/server.rs
+19
-12
slingshot/src/server.rs
···
437
437
Ok(did) => did,
438
438
Err(_) => {
439
439
let Ok(alleged_handle) = Handle::new(identifier) else {
440
-
return invalid("identifier was not a valid DID or handle");
440
+
return invalid("Identifier was not a valid DID or handle");
441
441
};
442
442
443
443
match self.identity.handle_to_did(alleged_handle.clone()).await {
···
453
453
Err(e) => {
454
454
log::debug!("failed to resolve handle: {e}");
455
455
// TODO: ServerError not BadRequest
456
-
return invalid("errored while trying to resolve handle to DID");
456
+
return invalid("Errored while trying to resolve handle to DID");
457
457
}
458
458
}
459
459
}
460
460
};
461
461
let Ok(partial_doc) = self.identity.did_to_partial_mini_doc(&did).await else {
462
-
return invalid("failed to get DID doc");
462
+
return invalid("Failed to get DID doc");
463
463
};
464
464
let Some(partial_doc) = partial_doc else {
465
-
return invalid("failed to find DID doc");
465
+
return invalid("Failed to find DID doc");
466
466
};
467
467
468
468
// ok so here's where we're at:
···
483
483
.handle_to_did(partial_doc.unverified_handle.clone())
484
484
.await
485
485
else {
486
-
return invalid("failed to get did doc's handle");
486
+
return invalid("Failed to get DID doc's handle");
487
487
};
488
488
let Some(handle_did) = handle_did else {
489
-
return invalid("failed to resolve did doc's handle");
489
+
return invalid("Failed to resolve DID doc's handle");
490
490
};
491
491
if handle_did == did {
492
492
partial_doc.unverified_handle.to_string()
···
516
516
let Ok(handle) = Handle::new(repo) else {
517
517
return GetRecordResponse::BadRequest(xrpc_error(
518
518
"InvalidRequest",
519
-
"repo was not a valid DID or handle",
519
+
"Repo was not a valid DID or handle",
520
520
));
521
521
};
522
522
match self.identity.handle_to_did(handle).await {
···
534
534
log::debug!("handle resolution failed: {e}");
535
535
return GetRecordResponse::ServerError(xrpc_error(
536
536
"ResolutionFailed",
537
-
"errored while trying to resolve handle to DID",
537
+
"Errored while trying to resolve handle to DID",
538
538
));
539
539
}
540
540
}
···
544
544
let Ok(collection) = Nsid::new(collection) else {
545
545
return GetRecordResponse::BadRequest(xrpc_error(
546
546
"InvalidRequest",
547
-
"invalid NSID for collection",
547
+
"Invalid NSID for collection",
548
548
));
549
549
};
550
550
551
551
let Ok(rkey) = RecordKey::new(rkey) else {
552
-
return GetRecordResponse::BadRequest(xrpc_error("InvalidRequest", "invalid rkey"));
552
+
return GetRecordResponse::BadRequest(xrpc_error("InvalidRequest", "Invalid rkey"));
553
553
};
554
554
555
555
let cid: Option<Cid> = if let Some(cid) = cid {
556
556
let Ok(cid) = Cid::from_str(&cid) else {
557
-
return GetRecordResponse::BadRequest(xrpc_error("InvalidRequest", "invalid CID"));
557
+
return GetRecordResponse::BadRequest(xrpc_error("InvalidRequest", "Invalid CID"));
558
558
};
559
559
Some(cid)
560
560
} else {
···
694
694
domain: Option<String>,
695
695
acme_contact: Option<String>,
696
696
certs: Option<PathBuf>,
697
+
host: String,
698
+
port: u16,
697
699
shutdown: CancellationToken,
698
700
) -> Result<(), ServerError> {
699
701
let repo = Arc::new(repo);
···
752
754
)
753
755
.await
754
756
} else {
755
-
run(TcpListener::bind("127.0.0.1:3000"), app, shutdown).await
757
+
run(
758
+
TcpListener::bind(format!("{host}:{port}")),
759
+
app,
760
+
shutdown,
761
+
)
762
+
.await
756
763
}
757
764
}
758
765