tangled
alpha
login
or
join now
microcosm.blue
/
microcosm-rs
Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
60
fork
atom
overview
issues
8
pulls
4
pipelines
Compare changes
Choose any two refs to compare.
base:
ufos/fjall-weak-delete
spacedust-backfill
slingshot-update-foyer
slingshot-proxy-hydrate
proxy-blobby
pocket
order_query
np-config-cache
metrics
many-to-many-counts
major-compact
main
constellation/did-web
no tags found
compare:
ufos/fjall-weak-delete
spacedust-backfill
slingshot-update-foyer
slingshot-proxy-hydrate
proxy-blobby
pocket
order_query
np-config-cache
metrics
many-to-many-counts
major-compact
main
constellation/did-web
no tags found
go
+1479
-115
11 changed files
expand all
collapse all
unified
split
Cargo.lock
constellation
src
bin
main.rs
server
mod.rs
slingshot
Cargo.toml
src
error.rs
identity.rs
lib.rs
main.rs
proxy.rs
record.rs
server.rs
+3
-2
Cargo.lock
···
803
803
804
804
[[package]]
805
805
name = "bytes"
806
806
-
version = "1.11.1"
806
806
+
version = "1.10.1"
807
807
source = "registry+https://github.com/rust-lang/crates.io-index"
808
808
-
checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33"
808
808
+
checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a"
809
809
810
810
[[package]]
811
811
name = "byteview"
···
5965
5965
"form_urlencoded",
5966
5966
"idna",
5967
5967
"percent-encoding",
5968
5968
+
"serde",
5968
5969
]
5969
5970
5970
5971
[[package]]
+1
-7
constellation/src/bin/main.rs
···
45
45
#[arg(short, long)]
46
46
#[clap(value_enum, default_value_t = StorageBackend::Memory)]
47
47
backend: StorageBackend,
48
48
-
/// Serve a did:web document for this domain
49
49
-
#[arg(long)]
50
50
-
did_web_domain: Option<String>,
51
48
/// Initiate a database backup into this dir, if supported by the storage
52
49
#[arg(long)]
53
50
backup: Option<PathBuf>,
···
106
103
MemStorage::new(),
107
104
fixture,
108
105
None,
109
109
-
args.did_web_domain,
110
106
stream,
111
107
bind,
112
108
metrics_bind,
···
142
138
rocks,
143
139
fixture,
144
140
args.data,
145
145
-
args.did_web_domain,
146
141
stream,
147
142
bind,
148
143
metrics_bind,
···
164
159
mut storage: impl LinkStorage,
165
160
fixture: Option<PathBuf>,
166
161
data_dir: Option<PathBuf>,
167
167
-
did_web_domain: Option<String>,
168
162
stream: String,
169
163
bind: SocketAddr,
170
164
metrics_bind: SocketAddr,
···
217
211
if collect_metrics {
218
212
install_metrics_server(metrics_bind)?;
219
213
}
220
220
-
serve(readable, bind, did_web_domain, staying_alive).await
214
214
+
serve(readable, bind, staying_alive).await
221
215
})
222
216
.unwrap();
223
217
stay_alive.drop_guard();
+7
-32
constellation/src/server/mod.rs
···
3
3
extract::{Query, Request},
4
4
http::{self, header},
5
5
middleware::{self, Next},
6
6
-
response::{IntoResponse, Json, Response},
6
6
+
response::{IntoResponse, Response},
7
7
routing::get,
8
8
Router,
9
9
};
···
37
37
http::StatusCode::INTERNAL_SERVER_ERROR
38
38
}
39
39
40
40
-
pub async fn serve<S: LinkReader, A: ToSocketAddrs>(
41
41
-
store: S,
42
42
-
addr: A,
43
43
-
did_web_domain: Option<String>,
44
44
-
stay_alive: CancellationToken,
45
45
-
) -> anyhow::Result<()> {
46
46
-
let mut app = Router::new();
47
47
-
48
48
-
if let Some(d) = did_web_domain {
49
49
-
app = app.route(
50
50
-
"/.well-known/did.json",
51
51
-
get({
52
52
-
let domain = d.clone();
53
53
-
move || did_web(domain)
54
54
-
}),
55
55
-
)
56
56
-
}
57
57
-
58
58
-
let app = app
40
40
+
pub async fn serve<S, A>(store: S, addr: A, stay_alive: CancellationToken) -> anyhow::Result<()>
41
41
+
where
42
42
+
S: LinkReader,
43
43
+
A: ToSocketAddrs,
44
44
+
{
45
45
+
let app = Router::new()
59
46
.route("/robots.txt", get(robots))
60
47
.route(
61
48
"/",
···
217
204
User-agent: *
218
205
Disallow: /links
219
206
Disallow: /links/
220
220
-
Disallow: /xrpc/
221
207
"
222
222
-
}
223
223
-
224
224
-
async fn did_web(domain: String) -> impl IntoResponse {
225
225
-
Json(serde_json::json!({
226
226
-
"id": format!("did:web:{domain}"),
227
227
-
"service": [{
228
228
-
"id": "#constellation",
229
229
-
"type": "ConstellationGraphService",
230
230
-
"serviceEndpoint": format!("https://{domain}")
231
231
-
}]
232
232
-
}))
233
208
}
234
209
235
210
#[derive(Template, Serialize, Deserialize)]
+1
-1
slingshot/Cargo.toml
···
28
28
tokio = { version = "1.47.0", features = ["full"] }
29
29
tokio-util = "0.7.15"
30
30
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
31
31
-
url = "2.5.4"
31
31
+
url = { version = "2.5.4", features = ["serde"] }
+18
slingshot/src/error.rs
···
91
91
#[error("upstream non-atproto bad request")]
92
92
UpstreamBadBadNotGoodRequest(reqwest::Error),
93
93
}
94
94
+
95
95
+
#[derive(Debug, Error)]
96
96
+
pub enum ProxyError {
97
97
+
#[error("failed to parse path: {0}")]
98
98
+
PathParseError(String),
99
99
+
#[error(transparent)]
100
100
+
UrlParseError(#[from] url::ParseError),
101
101
+
#[error(transparent)]
102
102
+
ReqwestError(#[from] reqwest::Error),
103
103
+
#[error(transparent)]
104
104
+
InvalidHeader(#[from] reqwest::header::InvalidHeaderValue),
105
105
+
#[error(transparent)]
106
106
+
IdentityError(#[from] IdentityError),
107
107
+
#[error("upstream service could not be resolved")]
108
108
+
ServiceNotFound,
109
109
+
#[error("upstream service was found but no services matched")]
110
110
+
ServiceNotMatched,
111
111
+
}
+208
-16
slingshot/src/identity.rs
···
17
17
18
18
use crate::error::IdentityError;
19
19
use atrium_api::{
20
20
-
did_doc::DidDocument,
20
20
+
did_doc::{DidDocument, Service as DidDocServic},
21
21
types::string::{Did, Handle},
22
22
};
23
23
use atrium_common::resolver::Resolver;
···
41
41
pub enum IdentityKey {
42
42
Handle(Handle),
43
43
Did(Did),
44
44
+
ServiceDid(Did),
44
45
}
45
46
46
47
impl IdentityKey {
···
48
49
let s = match self {
49
50
IdentityKey::Handle(h) => h.as_str(),
50
51
IdentityKey::Did(d) => d.as_str(),
52
52
+
IdentityKey::ServiceDid(d) => d.as_str(),
51
53
};
52
54
std::mem::size_of::<Self>() + std::mem::size_of_val(s)
53
55
}
···
59
61
#[derive(Debug, Serialize, Deserialize)]
60
62
enum IdentityData {
61
63
NotFound,
62
62
-
Did(Did),
63
63
-
Doc(PartialMiniDoc),
64
64
+
Did(Did), // from handle
65
65
+
Doc(PartialMiniDoc), // from did
66
66
+
ServiceDoc(MiniServiceDoc), // from service did
64
67
}
65
68
66
69
impl IdentityVal {
···
71
74
IdentityData::Did(d) => std::mem::size_of_val(d.as_str()),
72
75
IdentityData::Doc(d) => {
73
76
std::mem::size_of_val(d.unverified_handle.as_str())
74
74
-
+ std::mem::size_of_val(d.pds.as_str())
75
75
-
+ std::mem::size_of_val(d.signing_key.as_str())
77
77
+
+ std::mem::size_of_val(&d.pds)
78
78
+
+ std::mem::size_of_val(&d.signing_key)
79
79
+
}
80
80
+
IdentityData::ServiceDoc(d) => {
81
81
+
let mut s = std::mem::size_of::<MiniServiceDoc>();
82
82
+
s += std::mem::size_of_val(&d.services);
83
83
+
for sv in &d.services {
84
84
+
s += std::mem::size_of_val(&sv.full_id);
85
85
+
s += std::mem::size_of_val(&sv.r#type);
86
86
+
s += std::mem::size_of_val(&sv.endpoint);
87
87
+
}
88
88
+
s
76
89
}
77
90
};
78
91
wrapping + inner
···
168
181
}
169
182
}
170
183
184
184
+
/// Simplified info from service DID docs
185
185
+
#[derive(Debug, Clone, Serialize, Deserialize)]
186
186
+
pub struct MiniServiceDoc {
187
187
+
services: Vec<MiniService>,
188
188
+
}
189
189
+
190
190
+
impl MiniServiceDoc {
191
191
+
pub fn get(&self, id_fragment: &str, service_type: Option<&str>) -> Option<&MiniService> {
192
192
+
self.services.iter().find(|ms| {
193
193
+
ms.full_id.ends_with(id_fragment)
194
194
+
&& service_type.map(|t| t == ms.r#type).unwrap_or(true)
195
195
+
})
196
196
+
}
197
197
+
}
198
198
+
199
199
+
/// The corresponding service info
200
200
+
#[derive(Debug, Clone, Serialize, Deserialize)]
201
201
+
pub struct MiniService {
202
202
+
/// The full id
203
203
+
///
204
204
+
/// for informational purposes only -- services are deduplicated by id fragment
205
205
+
full_id: String,
206
206
+
r#type: String,
207
207
+
/// HTTP endpoint for the actual service
208
208
+
pub endpoint: String,
209
209
+
}
210
210
+
211
211
+
impl TryFrom<DidDocument> for MiniServiceDoc {
212
212
+
type Error = String;
213
213
+
fn try_from(did_doc: DidDocument) -> Result<Self, Self::Error> {
214
214
+
let mut services = Vec::new();
215
215
+
let mut seen = HashSet::new();
216
216
+
217
217
+
for DidDocServic {
218
218
+
id,
219
219
+
r#type,
220
220
+
service_endpoint,
221
221
+
} in did_doc.service.unwrap_or(vec![])
222
222
+
{
223
223
+
let Some((_, id_fragment)) = id.rsplit_once('#') else {
224
224
+
continue;
225
225
+
};
226
226
+
if !seen.insert((id_fragment.to_string(), r#type.clone())) {
227
227
+
continue;
228
228
+
}
229
229
+
services.push(MiniService {
230
230
+
full_id: id,
231
231
+
r#type,
232
232
+
endpoint: service_endpoint,
233
233
+
});
234
234
+
}
235
235
+
236
236
+
Ok(Self { services })
237
237
+
}
238
238
+
}
239
239
+
171
240
/// multi-producer *single-consumer* queue structures (wrap in arc-mutex plz)
172
241
///
173
242
/// the hashset allows testing for presense of items in the queue.
···
296
365
let now = UtcDateTime::now();
297
366
let IdentityVal(last_fetch, data) = entry.value();
298
367
match data {
299
299
-
IdentityData::Doc(_) => {
300
300
-
log::error!("identity value mixup: got a doc from a handle key (should be a did)");
301
301
-
Err(IdentityError::IdentityValTypeMixup(handle.to_string()))
302
302
-
}
303
368
IdentityData::NotFound => {
304
369
if (now - *last_fetch) >= MIN_NOT_FOUND_TTL {
305
370
metrics::counter!("identity_handle_refresh_queued", "reason" => "ttl", "found" => "false").increment(1);
···
313
378
self.queue_refresh(key).await;
314
379
}
315
380
Ok(Some(did.clone()))
381
381
+
}
382
382
+
_ => {
383
383
+
log::error!("identity value mixup: got a doc from a handle key (should be a did)");
384
384
+
Err(IdentityError::IdentityValTypeMixup(handle.to_string()))
316
385
}
317
386
}
318
387
}
···
362
431
let now = UtcDateTime::now();
363
432
let IdentityVal(last_fetch, data) = entry.value();
364
433
match data {
365
365
-
IdentityData::Did(_) => {
366
366
-
log::error!("identity value mixup: got a did from a did key (should be a doc)");
367
367
-
Err(IdentityError::IdentityValTypeMixup(did.to_string()))
368
368
-
}
369
434
IdentityData::NotFound => {
370
435
if (now - *last_fetch) >= MIN_NOT_FOUND_TTL {
371
436
metrics::counter!("identity_did_refresh_queued", "reason" => "ttl", "found" => "false").increment(1);
···
373
438
}
374
439
Ok(None)
375
440
}
376
376
-
IdentityData::Doc(mini_did) => {
441
441
+
IdentityData::Doc(mini_doc) => {
377
442
if (now - *last_fetch) >= MIN_TTL {
378
443
metrics::counter!("identity_did_refresh_queued", "reason" => "ttl", "found" => "true").increment(1);
379
444
self.queue_refresh(key).await;
380
445
}
381
381
-
Ok(Some(mini_did.clone()))
446
446
+
Ok(Some(mini_doc.clone()))
447
447
+
}
448
448
+
_ => {
449
449
+
log::error!("identity value mixup: got a doc from a handle key (should be a did)");
450
450
+
Err(IdentityError::IdentityValTypeMixup(did.to_string()))
451
451
+
}
452
452
+
}
453
453
+
}
454
454
+
455
455
+
/// Fetch (and cache) a service mini doc from a did
456
456
+
pub async fn did_to_mini_service_doc(
457
457
+
&self,
458
458
+
did: &Did,
459
459
+
) -> Result<Option<MiniServiceDoc>, IdentityError> {
460
460
+
let key = IdentityKey::ServiceDid(did.clone());
461
461
+
metrics::counter!("slingshot_get_service_did_doc").increment(1);
462
462
+
let entry = self
463
463
+
.cache
464
464
+
.get_or_fetch(&key, {
465
465
+
let did = did.clone();
466
466
+
let resolver = self.did_resolver.clone();
467
467
+
|| async move {
468
468
+
let t0 = Instant::now();
469
469
+
let (res, success) = match resolver.resolve(&did).await {
470
470
+
Ok(did_doc) if did_doc.id != did.to_string() => (
471
471
+
// TODO: fix in atrium: should verify id is did
472
472
+
Err(IdentityError::BadDidDoc(
473
473
+
"did doc's id did not match did".to_string(),
474
474
+
)),
475
475
+
"false",
476
476
+
),
477
477
+
Ok(did_doc) => match did_doc.try_into() {
478
478
+
Ok(mini_service_doc) => (
479
479
+
Ok(IdentityVal(
480
480
+
UtcDateTime::now(),
481
481
+
IdentityData::ServiceDoc(mini_service_doc),
482
482
+
)),
483
483
+
"true",
484
484
+
),
485
485
+
Err(e) => (Err(IdentityError::BadDidDoc(e)), "false"),
486
486
+
},
487
487
+
Err(atrium_identity::Error::NotFound) => (
488
488
+
Ok(IdentityVal(UtcDateTime::now(), IdentityData::NotFound)),
489
489
+
"false",
490
490
+
),
491
491
+
Err(other) => (Err(IdentityError::ResolutionFailed(other)), "false"),
492
492
+
};
493
493
+
metrics::histogram!("slingshot_fetch_service_did_doc", "success" => success)
494
494
+
.record(t0.elapsed());
495
495
+
res
496
496
+
}
497
497
+
})
498
498
+
.await?;
499
499
+
500
500
+
let now = UtcDateTime::now();
501
501
+
let IdentityVal(last_fetch, data) = entry.value();
502
502
+
match data {
503
503
+
IdentityData::NotFound => {
504
504
+
if (now - *last_fetch) >= MIN_NOT_FOUND_TTL {
505
505
+
metrics::counter!("identity_service_did_refresh_queued", "reason" => "ttl", "found" => "false").increment(1);
506
506
+
self.queue_refresh(key).await;
507
507
+
}
508
508
+
Ok(None)
509
509
+
}
510
510
+
IdentityData::ServiceDoc(mini_service_doc) => {
511
511
+
if (now - *last_fetch) >= MIN_TTL {
512
512
+
metrics::counter!("identity_service_did_refresh_queued", "reason" => "ttl", "found" => "true").increment(1);
513
513
+
self.queue_refresh(key).await;
514
514
+
}
515
515
+
Ok(Some(mini_service_doc.clone()))
516
516
+
}
517
517
+
_ => {
518
518
+
log::error!(
519
519
+
"identity value mixup: got a doc from a different key type (should be a service did)"
520
520
+
);
521
521
+
Err(IdentityError::IdentityValTypeMixup(did.to_string()))
382
522
}
383
523
}
384
524
}
···
519
659
log::warn!(
520
660
"refreshed did doc failed: wrong did doc id. dropping refresh."
521
661
);
662
662
+
self.complete_refresh(&task_key).await?;
522
663
continue;
523
664
}
524
665
let mini_doc = match did_doc.try_into() {
···
526
667
Err(e) => {
527
668
metrics::counter!("identity_did_refresh", "success" => "false", "reason" => "bad doc").increment(1);
528
669
log::warn!(
529
529
-
"converting mini doc failed: {e:?}. dropping refresh."
670
670
+
"converting mini doc for {did:?} failed: {e:?}. dropping refresh."
530
671
);
672
672
+
self.complete_refresh(&task_key).await?;
531
673
continue;
532
674
}
533
675
};
···
554
696
}
555
697
556
698
self.complete_refresh(&task_key).await?; // failures are bugs, so break loop
699
699
+
}
700
700
+
IdentityKey::ServiceDid(ref did) => {
701
701
+
log::trace!("refreshing service did doc: {did:?}");
702
702
+
703
703
+
match self.did_resolver.resolve(did).await {
704
704
+
Ok(did_doc) => {
705
705
+
// TODO: fix in atrium: should verify id is did
706
706
+
if did_doc.id != did.to_string() {
707
707
+
metrics::counter!("identity_service_did_refresh", "success" => "false", "reason" => "wrong did").increment(1);
708
708
+
log::warn!(
709
709
+
"refreshed did doc failed: wrong did doc id. dropping refresh."
710
710
+
);
711
711
+
self.complete_refresh(&task_key).await?;
712
712
+
continue;
713
713
+
}
714
714
+
let mini_service_doc = match did_doc.try_into() {
715
715
+
Ok(md) => md,
716
716
+
Err(e) => {
717
717
+
metrics::counter!("identity_service_did_refresh", "success" => "false", "reason" => "bad doc").increment(1);
718
718
+
log::warn!(
719
719
+
"converting mini service doc failed: {e:?}. dropping refresh."
720
720
+
);
721
721
+
self.complete_refresh(&task_key).await?;
722
722
+
continue;
723
723
+
}
724
724
+
};
725
725
+
metrics::counter!("identity_service_did_refresh", "success" => "true")
726
726
+
.increment(1);
727
727
+
self.cache.insert(
728
728
+
task_key.clone(),
729
729
+
IdentityVal(
730
730
+
UtcDateTime::now(),
731
731
+
IdentityData::ServiceDoc(mini_service_doc),
732
732
+
),
733
733
+
);
734
734
+
}
735
735
+
Err(atrium_identity::Error::NotFound) => {
736
736
+
metrics::counter!("identity_service_did_refresh", "success" => "false", "reason" => "not found").increment(1);
737
737
+
self.cache.insert(
738
738
+
task_key.clone(),
739
739
+
IdentityVal(UtcDateTime::now(), IdentityData::NotFound),
740
740
+
);
741
741
+
}
742
742
+
Err(err) => {
743
743
+
metrics::counter!("identity_service_did_refresh", "success" => "false", "reason" => "other").increment(1);
744
744
+
log::warn!(
745
745
+
"failed to refresh did doc: {err:?}. leaving stale (should we eventually do something?)"
746
746
+
);
747
747
+
}
748
748
+
}
557
749
}
558
750
}
559
751
}
+2
slingshot/src/lib.rs
···
3
3
mod firehose_cache;
4
4
mod healthcheck;
5
5
mod identity;
6
6
+
mod proxy;
6
7
mod record;
7
8
mod server;
8
9
···
10
11
pub use firehose_cache::firehose_cache;
11
12
pub use healthcheck::healthcheck;
12
13
pub use identity::{Identity, IdentityKey};
14
14
+
pub use proxy::Proxy;
13
15
pub use record::{CachedRecord, ErrorResponseObject, Repo};
14
16
pub use server::serve;
+23
-8
slingshot/src/main.rs
···
1
1
-
// use foyer::HybridCache;
2
2
-
// use foyer::{Engine, DirectFsDeviceOptions, HybridCacheBuilder};
3
1
use metrics_exporter_prometheus::PrometheusBuilder;
4
2
use slingshot::{
5
5
-
Identity, Repo, consume, error::MainTaskError, firehose_cache, healthcheck, serve,
3
3
+
Identity, Proxy, Repo, consume, error::MainTaskError, firehose_cache, healthcheck, serve,
6
4
};
7
5
use std::net::SocketAddr;
8
6
use std::path::PathBuf;
9
7
10
8
use clap::Parser;
11
9
use tokio_util::sync::CancellationToken;
10
10
+
use url::Url;
12
11
13
12
/// Slingshot record edge cache
14
13
#[derive(Parser, Debug, Clone)]
···
48
47
#[arg(long, env = "SLINGSHOT_IDENTITY_CACHE_DISK_DB")]
49
48
#[clap(default_value_t = 1)]
50
49
identity_cache_disk_gb: usize,
50
50
+
/// the address of this server
51
51
+
///
52
52
+
/// used if --acme-domain is not set, defaulting to `--bind`
53
53
+
#[arg(long, conflicts_with("acme_domain"), env = "SLINGSHOT_PUBLIC_HOST")]
54
54
+
base_url: Option<Url>,
51
55
/// the domain pointing to this server
52
56
///
53
57
/// if present:
···
101
105
102
106
let args = Args::parse();
103
107
108
108
+
let base_url: Url = args
109
109
+
.base_url
110
110
+
.or_else(|| {
111
111
+
args.acme_domain
112
112
+
.as_ref()
113
113
+
.map(|d| Url::parse(&format!("https://{d}")).unwrap())
114
114
+
})
115
115
+
.unwrap_or_else(|| Url::parse(&format!("http://{}", args.bind)).unwrap());
116
116
+
104
117
if args.collect_metrics {
105
118
log::trace!("installing metrics server...");
106
119
if let Err(e) = install_metrics_server(args.bind_metrics) {
···
143
156
)
144
157
.await
145
158
.map_err(|e| format!("identity setup failed: {e:?}"))?;
146
146
-
147
147
-
log::info!("identity service ready.");
148
159
let identity_refresher = identity.clone();
149
160
let identity_shutdown = shutdown.clone();
150
161
tasks.spawn(async move {
151
162
identity_refresher.run_refresher(identity_shutdown).await?;
152
163
Ok(())
153
164
});
165
165
+
log::info!("identity service ready.");
154
166
155
167
let repo = Repo::new(identity.clone());
168
168
+
let proxy = Proxy::new(identity.clone());
156
169
157
170
let identity_for_server = identity.clone();
158
171
let server_shutdown = shutdown.clone();
···
163
176
server_cache_handle,
164
177
identity_for_server,
165
178
repo,
179
179
+
proxy,
180
180
+
base_url,
166
181
args.acme_domain,
167
182
args.acme_contact,
168
183
args.acme_cache_path,
···
235
250
) -> Result<(), metrics_exporter_prometheus::BuildError> {
236
251
log::info!("installing metrics server...");
237
252
PrometheusBuilder::new()
238
238
-
.set_quantiles(&[0.5, 0.9, 0.99, 1.0])?
239
239
-
.set_bucket_duration(std::time::Duration::from_secs(300))?
240
240
-
.set_bucket_count(std::num::NonZero::new(12).unwrap()) // count * duration = 60 mins. stuff doesn't happen that fast here.
253
253
+
.set_buckets(&[0.001, 0.006, 0.036, 0.216, 1.296, 7.776, 45.656])?
254
254
+
.set_bucket_duration(std::time::Duration::from_secs(15))?
255
255
+
.set_bucket_count(std::num::NonZero::new(4).unwrap()) // count * duration = bucket lifetime
241
256
.set_enable_unit_suffix(false) // this seemed buggy for constellation (sometimes wouldn't engage)
242
257
.with_http_listener(bind_metrics)
243
258
.install()?;
+601
slingshot/src/proxy.rs
···
1
1
+
use crate::{Identity, error::ProxyError, server::HydrationSource};
2
2
+
use atrium_api::types::string::{AtIdentifier, Cid, Did, Nsid, RecordKey};
3
3
+
use reqwest::Client;
4
4
+
use serde_json::{Map, Value};
5
5
+
use std::{collections::HashMap, time::Duration};
6
6
+
use url::Url;
7
7
+
8
8
+
pub enum ParamValue {
9
9
+
String(Vec<String>),
10
10
+
Int(Vec<i64>),
11
11
+
Bool(Vec<bool>),
12
12
+
}
13
13
+
pub struct Params(HashMap<String, ParamValue>);
14
14
+
15
15
+
impl TryFrom<Map<String, Value>> for Params {
16
16
+
type Error = (); // TODO
17
17
+
fn try_from(val: Map<String, Value>) -> Result<Self, Self::Error> {
18
18
+
let mut out = HashMap::new();
19
19
+
for (k, v) in val {
20
20
+
match v {
21
21
+
Value::String(s) => out.insert(k, ParamValue::String(vec![s])),
22
22
+
Value::Bool(b) => out.insert(k, ParamValue::Bool(vec![b])),
23
23
+
Value::Number(n) => {
24
24
+
let Some(i) = n.as_i64() else {
25
25
+
return Err(());
26
26
+
};
27
27
+
out.insert(k, ParamValue::Int(vec![i]))
28
28
+
}
29
29
+
Value::Array(a) => {
30
30
+
let Some(first) = a.first() else {
31
31
+
continue;
32
32
+
};
33
33
+
if first.is_string() {
34
34
+
let mut vals = Vec::with_capacity(a.len());
35
35
+
for v in a {
36
36
+
let Some(v) = v.as_str() else {
37
37
+
return Err(());
38
38
+
};
39
39
+
vals.push(v.to_string());
40
40
+
}
41
41
+
out.insert(k, ParamValue::String(vals));
42
42
+
} else if first.is_i64() {
43
43
+
let mut vals = Vec::with_capacity(a.len());
44
44
+
for v in a {
45
45
+
let Some(v) = v.as_i64() else {
46
46
+
return Err(());
47
47
+
};
48
48
+
vals.push(v);
49
49
+
}
50
50
+
out.insert(k, ParamValue::Int(vals));
51
51
+
} else if first.is_boolean() {
52
52
+
let mut vals = Vec::with_capacity(a.len());
53
53
+
for v in a {
54
54
+
let Some(v) = v.as_bool() else {
55
55
+
return Err(());
56
56
+
};
57
57
+
vals.push(v);
58
58
+
}
59
59
+
out.insert(k, ParamValue::Bool(vals));
60
60
+
}
61
61
+
todo!();
62
62
+
}
63
63
+
_ => return Err(()),
64
64
+
};
65
65
+
}
66
66
+
67
67
+
Ok(Self(out))
68
68
+
}
69
69
+
}
70
70
+
71
71
+
#[derive(Clone)]
72
72
+
pub struct Proxy {
73
73
+
identity: Identity,
74
74
+
client: Client,
75
75
+
}
76
76
+
77
77
+
impl Proxy {
78
78
+
pub fn new(identity: Identity) -> Self {
79
79
+
let client = Client::builder()
80
80
+
.user_agent(format!(
81
81
+
"microcosm slingshot v{} (contact: @bad-example.com)",
82
82
+
env!("CARGO_PKG_VERSION")
83
83
+
))
84
84
+
.no_proxy()
85
85
+
.timeout(Duration::from_secs(6))
86
86
+
.build()
87
87
+
.unwrap();
88
88
+
Self { client, identity }
89
89
+
}
90
90
+
91
91
+
pub async fn proxy(
92
92
+
&self,
93
93
+
service_did: &Did,
94
94
+
service_id: &str,
95
95
+
xrpc: &Nsid,
96
96
+
authorization: Option<&str>,
97
97
+
atproto_accept_labelers: Option<&str>,
98
98
+
params: Option<Map<String, Value>>,
99
99
+
) -> Result<Value, ProxyError> {
100
100
+
let mut upstream: Url = self
101
101
+
.identity
102
102
+
.did_to_mini_service_doc(service_did)
103
103
+
.await?
104
104
+
.ok_or(ProxyError::ServiceNotFound)?
105
105
+
.get(service_id, None)
106
106
+
.ok_or(ProxyError::ServiceNotMatched)?
107
107
+
.endpoint
108
108
+
.parse()?;
109
109
+
110
110
+
upstream.set_path(&format!("/xrpc/{}", xrpc.as_str()));
111
111
+
112
112
+
if let Some(params) = params {
113
113
+
let mut query = upstream.query_pairs_mut();
114
114
+
let Params(ps) = params.try_into().expect("valid params");
115
115
+
for (k, pvs) in ps {
116
116
+
match pvs {
117
117
+
ParamValue::String(s) => {
118
118
+
for s in s {
119
119
+
query.append_pair(&k, &s);
120
120
+
}
121
121
+
}
122
122
+
ParamValue::Int(i) => {
123
123
+
for i in i {
124
124
+
query.append_pair(&k, &i.to_string());
125
125
+
}
126
126
+
}
127
127
+
ParamValue::Bool(b) => {
128
128
+
for b in b {
129
129
+
query.append_pair(&k, &b.to_string());
130
130
+
}
131
131
+
}
132
132
+
}
133
133
+
}
134
134
+
}
135
135
+
136
136
+
// TODO i mean maybe we should look for headers also in our headers but not obviously
137
137
+
let mut headers = reqwest::header::HeaderMap::new();
138
138
+
// TODO: check the jwt aud against the upstream!!!
139
139
+
if let Some(auth) = authorization {
140
140
+
headers.insert("Authorization", auth.try_into()?);
141
141
+
}
142
142
+
if let Some(aal) = atproto_accept_labelers {
143
143
+
headers.insert("atproto-accept-labelers", aal.try_into()?);
144
144
+
}
145
145
+
146
146
+
let t0 = std::time::Instant::now();
147
147
+
let res = self
148
148
+
.client
149
149
+
.get(upstream)
150
150
+
.headers(headers)
151
151
+
.send()
152
152
+
.await
153
153
+
.and_then(|r| r.error_for_status());
154
154
+
155
155
+
if res.is_ok() {
156
156
+
metrics::histogram!("slingshot_proxy_upstream_request", "success" => "true")
157
157
+
.record(t0.elapsed());
158
158
+
} else {
159
159
+
metrics::histogram!("slingshot_proxy_upstream_request", "success" => "false")
160
160
+
.record(t0.elapsed());
161
161
+
}
162
162
+
163
163
+
Ok(res?.json().await?)
164
164
+
}
165
165
+
}
166
166
+
167
167
+
#[derive(Debug, PartialEq)]
168
168
+
pub enum PathPart {
169
169
+
Scalar(String),
170
170
+
Vector(String, Option<String>), // key, $type
171
171
+
}
172
172
+
173
173
+
pub fn parse_record_path(input: &str) -> Result<Vec<PathPart>, String> {
174
174
+
let mut out = Vec::new();
175
175
+
176
176
+
let mut key_acc = String::new();
177
177
+
let mut type_acc = String::new();
178
178
+
let mut in_bracket = false;
179
179
+
let mut chars = input.chars().enumerate();
180
180
+
while let Some((i, c)) = chars.next() {
181
181
+
match c {
182
182
+
'[' if in_bracket => return Err(format!("nested opening bracket not allowed, at {i}")),
183
183
+
'[' if key_acc.is_empty() => {
184
184
+
return Err(format!("missing key before opening bracket, at {i}"));
185
185
+
}
186
186
+
'[' => in_bracket = true,
187
187
+
']' if in_bracket => {
188
188
+
in_bracket = false;
189
189
+
let key = std::mem::take(&mut key_acc);
190
190
+
let r#type = std::mem::take(&mut type_acc);
191
191
+
let t = if r#type.is_empty() {
192
192
+
None
193
193
+
} else {
194
194
+
Some(r#type)
195
195
+
};
196
196
+
out.push(PathPart::Vector(key, t));
197
197
+
// peek ahead because we need a dot after array if there's more and i don't want to add more loop state
198
198
+
let Some((i, c)) = chars.next() else {
199
199
+
break;
200
200
+
};
201
201
+
if c != '.' {
202
202
+
return Err(format!(
203
203
+
"expected dot after close bracket, found {c:?} at {i}"
204
204
+
));
205
205
+
}
206
206
+
}
207
207
+
']' => return Err(format!("unexpected close bracket at {i}")),
208
208
+
'.' if in_bracket => type_acc.push(c),
209
209
+
'.' if key_acc.is_empty() => {
210
210
+
return Err(format!("missing key before next segment, at {i}"));
211
211
+
}
212
212
+
'.' => {
213
213
+
let key = std::mem::take(&mut key_acc);
214
214
+
assert!(type_acc.is_empty());
215
215
+
out.push(PathPart::Scalar(key));
216
216
+
}
217
217
+
_ if in_bracket => type_acc.push(c),
218
218
+
_ => key_acc.push(c),
219
219
+
}
220
220
+
}
221
221
+
if in_bracket {
222
222
+
return Err("unclosed bracket".into());
223
223
+
}
224
224
+
if !key_acc.is_empty() {
225
225
+
out.push(PathPart::Scalar(key_acc));
226
226
+
}
227
227
+
Ok(out)
228
228
+
}
229
229
+
230
230
+
#[derive(Debug, Clone, Copy, PartialEq)]
231
231
+
pub enum RefShape {
232
232
+
StrongRef,
233
233
+
AtUri,
234
234
+
AtUriParts,
235
235
+
Did,
236
236
+
Handle,
237
237
+
AtIdentifier,
238
238
+
}
239
239
+
240
240
+
impl TryFrom<&str> for RefShape {
241
241
+
type Error = String;
242
242
+
fn try_from(s: &str) -> Result<Self, Self::Error> {
243
243
+
match s {
244
244
+
"strong-ref" => Ok(Self::StrongRef),
245
245
+
"at-uri" => Ok(Self::AtUri),
246
246
+
"at-uri-parts" => Ok(Self::AtUriParts),
247
247
+
"did" => Ok(Self::Did),
248
248
+
"handle" => Ok(Self::Handle),
249
249
+
"at-identifier" => Ok(Self::AtIdentifier),
250
250
+
_ => Err(format!("unknown shape: {s}")),
251
251
+
}
252
252
+
}
253
253
+
}
254
254
+
255
255
+
#[derive(Debug, PartialEq)]
256
256
+
pub struct FullAtUriParts {
257
257
+
pub repo: AtIdentifier,
258
258
+
pub collection: Nsid,
259
259
+
pub rkey: RecordKey,
260
260
+
pub cid: Option<Cid>,
261
261
+
}
262
262
+
263
263
+
impl FullAtUriParts {
264
264
+
pub fn to_uri(&self) -> String {
265
265
+
let repo: String = self.repo.clone().into(); // no as_str for AtIdentifier atrium???
266
266
+
let collection = self.collection.as_str();
267
267
+
let rkey = self.rkey.as_str();
268
268
+
format!("at://{repo}/{collection}/{rkey}")
269
269
+
}
270
270
+
}
271
271
+
272
272
+
// TODO: move this to links
273
273
+
pub fn split_uri(uri: &str) -> Option<(AtIdentifier, Nsid, RecordKey)> {
274
274
+
let rest = uri.strip_prefix("at://")?;
275
275
+
let (repo, rest) = rest.split_once("/")?;
276
276
+
let repo = repo.parse().ok()?;
277
277
+
let (collection, rkey) = rest.split_once("/")?;
278
278
+
let collection = collection.parse().ok()?;
279
279
+
let rkey = rkey.split_once('#').map(|(k, _)| k).unwrap_or(rkey);
280
280
+
let rkey = rkey.split_once('?').map(|(k, _)| k).unwrap_or(rkey);
281
281
+
let rkey = rkey.parse().ok()?;
282
282
+
Some((repo, collection, rkey))
283
283
+
}
284
284
+
285
285
+
#[derive(Debug, PartialEq)]
286
286
+
pub enum MatchedRef {
287
287
+
AtUri(FullAtUriParts),
288
288
+
Identifier(AtIdentifier),
289
289
+
}
290
290
+
291
291
+
pub fn match_shape(shape: RefShape, val: &Value) -> Option<MatchedRef> {
292
292
+
// TODO: actually validate at-uri format
293
293
+
// TODO: actually validate everything else also
294
294
+
// TODO: should this function normalize identifiers to DIDs probably?
295
295
+
// or just return at-uri parts so the caller can resolve and reassemble
296
296
+
match shape {
297
297
+
RefShape::StrongRef => {
298
298
+
let o = val.as_object()?;
299
299
+
let uri = o.get("uri")?.as_str()?.to_string();
300
300
+
let cid = o.get("cid")?.as_str()?.parse().ok()?;
301
301
+
let (repo, collection, rkey) = split_uri(&uri)?;
302
302
+
Some(MatchedRef::AtUri(FullAtUriParts {
303
303
+
repo,
304
304
+
collection,
305
305
+
rkey,
306
306
+
cid: Some(cid),
307
307
+
}))
308
308
+
}
309
309
+
RefShape::AtUri => {
310
310
+
let uri = val.as_str()?.to_string();
311
311
+
let (repo, collection, rkey) = split_uri(&uri)?;
312
312
+
Some(MatchedRef::AtUri(FullAtUriParts {
313
313
+
repo,
314
314
+
collection,
315
315
+
rkey,
316
316
+
cid: None,
317
317
+
}))
318
318
+
}
319
319
+
RefShape::AtUriParts => {
320
320
+
let o = val.as_object()?;
321
321
+
let repo = o.get("repo").or(o.get("did"))?.as_str()?.parse().ok()?;
322
322
+
let collection = o.get("collection")?.as_str()?.parse().ok()?;
323
323
+
let rkey = o.get("rkey")?.as_str()?.parse().ok()?;
324
324
+
let cid = o
325
325
+
.get("cid")
326
326
+
.and_then(|v| v.as_str())
327
327
+
.and_then(|s| s.parse().ok());
328
328
+
Some(MatchedRef::AtUri(FullAtUriParts {
329
329
+
repo,
330
330
+
collection,
331
331
+
rkey,
332
332
+
cid,
333
333
+
}))
334
334
+
}
335
335
+
RefShape::Did => {
336
336
+
let did = val.as_str()?.parse().ok()?;
337
337
+
Some(MatchedRef::Identifier(AtIdentifier::Did(did)))
338
338
+
}
339
339
+
RefShape::Handle => {
340
340
+
let handle = val.as_str()?.parse().ok()?;
341
341
+
Some(MatchedRef::Identifier(AtIdentifier::Handle(handle)))
342
342
+
}
343
343
+
RefShape::AtIdentifier => {
344
344
+
let identifier = val.as_str()?.parse().ok()?;
345
345
+
Some(MatchedRef::Identifier(identifier))
346
346
+
}
347
347
+
}
348
348
+
}
349
349
+
350
350
+
// TODO: send back metadata about the matching
351
351
+
pub fn extract_links(
352
352
+
sources: Vec<HydrationSource>,
353
353
+
skeleton: &Value,
354
354
+
) -> Result<Vec<MatchedRef>, String> {
355
355
+
// collect early to catch errors from the client
356
356
+
// (TODO maybe the handler should do this and pass in the processed stuff probably definitely yeah)
357
357
+
let sources = sources
358
358
+
.into_iter()
359
359
+
.map(|HydrationSource { path, shape }| {
360
360
+
let path_parts = parse_record_path(&path)?;
361
361
+
let shape: RefShape = shape.as_str().try_into()?;
362
362
+
Ok((path_parts, shape))
363
363
+
})
364
364
+
.collect::<Result<Vec<_>, String>>()?;
365
365
+
366
366
+
// lazy first impl, just re-walk the skeleton as many times as needed
367
367
+
// not deduplicating for now
368
368
+
let mut out = Vec::new();
369
369
+
for (path_parts, shape) in sources {
370
370
+
for val in PathWalker::new(&path_parts, skeleton) {
371
371
+
if let Some(matched) = match_shape(shape, val) {
372
372
+
out.push(matched);
373
373
+
}
374
374
+
}
375
375
+
}
376
376
+
377
377
+
Ok(out)
378
378
+
}
379
379
+
380
380
+
struct PathWalker<'a> {
381
381
+
todo: Vec<(&'a [PathPart], &'a Value)>,
382
382
+
}
383
383
+
impl<'a> PathWalker<'a> {
384
384
+
fn new(path_parts: &'a [PathPart], skeleton: &'a Value) -> Self {
385
385
+
Self {
386
386
+
todo: vec![(path_parts, skeleton)],
387
387
+
}
388
388
+
}
389
389
+
}
390
390
+
impl<'a> Iterator for PathWalker<'a> {
391
391
+
type Item = &'a Value;
392
392
+
fn next(&mut self) -> Option<Self::Item> {
393
393
+
loop {
394
394
+
let (parts, val) = self.todo.pop()?;
395
395
+
let Some((part, rest)) = parts.split_first() else {
396
396
+
return Some(val);
397
397
+
};
398
398
+
let Some(o) = val.as_object() else {
399
399
+
continue;
400
400
+
};
401
401
+
match part {
402
402
+
PathPart::Scalar(k) => {
403
403
+
let Some(v) = o.get(k) else {
404
404
+
continue;
405
405
+
};
406
406
+
self.todo.push((rest, v));
407
407
+
}
408
408
+
PathPart::Vector(k, t) => {
409
409
+
let Some(a) = o.get(k).and_then(|v| v.as_array()) else {
410
410
+
continue;
411
411
+
};
412
412
+
for v in a.iter().rev().filter(|c| {
413
413
+
let Some(t) = t else { return true };
414
414
+
c.as_object()
415
415
+
.and_then(|o| o.get("$type"))
416
416
+
.and_then(|v| v.as_str())
417
417
+
.map(|s| s == t)
418
418
+
.unwrap_or(false)
419
419
+
}) {
420
420
+
self.todo.push((rest, v))
421
421
+
}
422
422
+
}
423
423
+
}
424
424
+
}
425
425
+
}
426
426
+
}
427
427
+
428
428
+
#[cfg(test)]
429
429
+
mod tests {
430
430
+
use super::*;
431
431
+
use serde_json::json;
432
432
+
433
433
+
static TEST_CID: &str = "bafyreidffwk5wvh5l76yy7zefiqrovv6yaaegb4wg4zaq35w7nt3quix5a";
434
434
+
435
435
+
#[test]
436
436
+
fn test_parse_record_path() -> Result<(), Box<dyn std::error::Error>> {
437
437
+
let cases = [
438
438
+
("", vec![]),
439
439
+
("subject", vec![PathPart::Scalar("subject".into())]),
440
440
+
("authorDid", vec![PathPart::Scalar("authorDid".into())]),
441
441
+
(
442
442
+
"subject.uri",
443
443
+
vec![
444
444
+
PathPart::Scalar("subject".into()),
445
445
+
PathPart::Scalar("uri".into()),
446
446
+
],
447
447
+
),
448
448
+
("members[]", vec![PathPart::Vector("members".into(), None)]),
449
449
+
(
450
450
+
"add[].key",
451
451
+
vec![
452
452
+
PathPart::Vector("add".into(), None),
453
453
+
PathPart::Scalar("key".into()),
454
454
+
],
455
455
+
),
456
456
+
("a[b]", vec![PathPart::Vector("a".into(), Some("b".into()))]),
457
457
+
(
458
458
+
"a[b.c]",
459
459
+
vec![PathPart::Vector("a".into(), Some("b.c".into()))],
460
460
+
),
461
461
+
(
462
462
+
"facets[app.bsky.richtext.facet].features[app.bsky.richtext.facet#mention].did",
463
463
+
vec![
464
464
+
PathPart::Vector("facets".into(), Some("app.bsky.richtext.facet".into())),
465
465
+
PathPart::Vector(
466
466
+
"features".into(),
467
467
+
Some("app.bsky.richtext.facet#mention".into()),
468
468
+
),
469
469
+
PathPart::Scalar("did".into()),
470
470
+
],
471
471
+
),
472
472
+
];
473
473
+
474
474
+
for (path, expected) in cases {
475
475
+
let parsed = parse_record_path(path)?;
476
476
+
assert_eq!(parsed, expected, "path: {path:?}");
477
477
+
}
478
478
+
479
479
+
Ok(())
480
480
+
}
481
481
+
482
482
+
#[test]
483
483
+
fn test_match_shape() {
484
484
+
let cases = [
485
485
+
("strong-ref", json!(""), None),
486
486
+
("strong-ref", json!({}), None),
487
487
+
("strong-ref", json!({ "uri": "abc" }), None),
488
488
+
("strong-ref", json!({ "cid": TEST_CID }), None),
489
489
+
(
490
490
+
"strong-ref",
491
491
+
json!({ "uri": "at://a.com/xx.yy.zz/1", "cid": TEST_CID }),
492
492
+
Some(MatchedRef::AtUri(FullAtUriParts {
493
493
+
repo: "a.com".parse().unwrap(),
494
494
+
collection: "xx.yy.zz".parse().unwrap(),
495
495
+
rkey: "1".parse().unwrap(),
496
496
+
cid: Some(TEST_CID.parse().unwrap()),
497
497
+
})),
498
498
+
),
499
499
+
("at-uri", json!({ "uri": "abc" }), None),
500
500
+
(
501
501
+
"at-uri",
502
502
+
json!({ "uri": "at://did:web:y.com/xx.yy.zz/1", "cid": TEST_CID }),
503
503
+
None,
504
504
+
),
505
505
+
(
506
506
+
"at-uri",
507
507
+
json!("at://did:web:y.com/xx.yy.zz/1"),
508
508
+
Some(MatchedRef::AtUri(FullAtUriParts {
509
509
+
repo: "did:web:y.com".parse().unwrap(),
510
510
+
collection: "xx.yy.zz".parse().unwrap(),
511
511
+
rkey: "1".parse().unwrap(),
512
512
+
cid: None,
513
513
+
})),
514
514
+
),
515
515
+
("at-uri-parts", json!("abc"), None),
516
516
+
("at-uri-parts", json!({}), None),
517
517
+
(
518
518
+
"at-uri-parts",
519
519
+
json!({"repo": "a.com", "collection": "xx.yy.zz", "rkey": "1", "cid": TEST_CID}),
520
520
+
Some(MatchedRef::AtUri(FullAtUriParts {
521
521
+
repo: "a.com".parse().unwrap(),
522
522
+
collection: "xx.yy.zz".parse().unwrap(),
523
523
+
rkey: "1".parse().unwrap(),
524
524
+
cid: Some(TEST_CID.parse().unwrap()),
525
525
+
})),
526
526
+
),
527
527
+
(
528
528
+
"at-uri-parts",
529
529
+
json!({"did": "a.com", "collection": "xx.yy.zz", "rkey": "1"}),
530
530
+
Some(MatchedRef::AtUri(FullAtUriParts {
531
531
+
repo: "a.com".parse().unwrap(),
532
532
+
collection: "xx.yy.zz".parse().unwrap(),
533
533
+
rkey: "1".parse().unwrap(),
534
534
+
cid: None,
535
535
+
})),
536
536
+
),
537
537
+
(
538
538
+
"at-uri-parts",
539
539
+
// 'repo' takes precedence over 'did'
540
540
+
json!({"did": "did:web:a.com", "repo": "z.com", "collection": "xx.yy.zz", "rkey": "1"}),
541
541
+
Some(MatchedRef::AtUri(FullAtUriParts {
542
542
+
repo: "z.com".parse().unwrap(),
543
543
+
collection: "xx.yy.zz".parse().unwrap(),
544
544
+
rkey: "1".parse().unwrap(),
545
545
+
cid: None,
546
546
+
})),
547
547
+
),
548
548
+
(
549
549
+
"at-uri-parts",
550
550
+
json!({"repo": "a.com", "collection": "xx.yy.zz", "rkey": "1", "cid": TEST_CID}),
551
551
+
Some(MatchedRef::AtUri(FullAtUriParts {
552
552
+
repo: "a.com".parse().unwrap(),
553
553
+
collection: "xx.yy.zz".parse().unwrap(),
554
554
+
rkey: "1".parse().unwrap(),
555
555
+
cid: Some(TEST_CID.parse().unwrap()),
556
556
+
})),
557
557
+
),
558
558
+
(
559
559
+
"at-uri-parts",
560
560
+
json!({"repo": "a.com", "collection": "xx.yy.zz", "rkey": "1", "cid": {}}),
561
561
+
Some(MatchedRef::AtUri(FullAtUriParts {
562
562
+
repo: "a.com".parse().unwrap(),
563
563
+
collection: "xx.yy.zz".parse().unwrap(),
564
564
+
rkey: "1".parse().unwrap(),
565
565
+
cid: None,
566
566
+
})),
567
567
+
),
568
568
+
("did", json!({}), None),
569
569
+
("did", json!(""), None),
570
570
+
("did", json!("bad-example.com"), None),
571
571
+
(
572
572
+
"did",
573
573
+
json!("did:plc:xyz"),
574
574
+
Some(MatchedRef::Identifier("did:plc:xyz".parse().unwrap())),
575
575
+
),
576
576
+
("handle", json!({}), None),
577
577
+
(
578
578
+
"handle",
579
579
+
json!("bad-example.com"),
580
580
+
Some(MatchedRef::Identifier("bad-example.com".parse().unwrap())),
581
581
+
),
582
582
+
("handle", json!("did:plc:xyz"), None),
583
583
+
("at-identifier", json!({}), None),
584
584
+
(
585
585
+
"at-identifier",
586
586
+
json!("bad-example.com"),
587
587
+
Some(MatchedRef::Identifier("bad-example.com".parse().unwrap())),
588
588
+
),
589
589
+
(
590
590
+
"at-identifier",
591
591
+
json!("did:plc:xyz"),
592
592
+
Some(MatchedRef::Identifier("did:plc:xyz".parse().unwrap())),
593
593
+
),
594
594
+
];
595
595
+
for (i, (shape, val, expected)) in cases.into_iter().enumerate() {
596
596
+
let s = shape.try_into().unwrap();
597
597
+
let matched = match_shape(s, &val);
598
598
+
assert_eq!(matched, expected, "{i}: shape: {shape:?}, val: {val:?}");
599
599
+
}
600
600
+
}
601
601
+
}
+2
-2
slingshot/src/record.rs
···
11
11
12
12
#[derive(Debug, Serialize, Deserialize)]
13
13
pub struct RawRecord {
14
14
-
cid: Cid,
15
15
-
record: String,
14
14
+
pub cid: Cid,
15
15
+
pub record: String,
16
16
}
17
17
18
18
// TODO: should be able to do typed CID
+613
-47
slingshot/src/server.rs
···
1
1
use crate::{
2
2
-
CachedRecord, ErrorResponseObject, Identity, Repo,
2
2
+
CachedRecord, ErrorResponseObject, Identity, Proxy, Repo,
3
3
error::{RecordError, ServerError},
4
4
+
proxy::{FullAtUriParts, MatchedRef, extract_links, split_uri},
5
5
+
record::RawRecord,
4
6
};
5
5
-
use atrium_api::types::string::{Cid, Did, Handle, Nsid, RecordKey};
7
7
+
use atrium_api::types::string::{AtIdentifier, Cid, Did, Handle, Nsid, RecordKey};
6
8
use foyer::HybridCache;
7
9
use links::at_uri::parse_at_uri as normalize_at_uri;
8
10
use serde::Serialize;
9
9
-
use std::path::PathBuf;
10
10
-
use std::str::FromStr;
11
11
-
use std::sync::Arc;
12
12
-
use std::time::Instant;
11
11
+
use std::{collections::HashMap, path::PathBuf, str::FromStr, sync::Arc, time::Instant};
12
12
+
use tokio::sync::mpsc;
13
13
use tokio_util::sync::CancellationToken;
14
14
15
15
use poem::{
···
24
24
};
25
25
use poem_openapi::{
26
26
ApiResponse, ContactObject, ExternalDocumentObject, Object, OpenApi, OpenApiService, Tags,
27
27
-
param::Query, payload::Json, types::Example,
27
27
+
Union, param::Query, payload::Json, types::Example,
28
28
};
29
29
30
30
fn example_handle() -> String {
···
33
33
fn example_did() -> String {
34
34
"did:plc:hdhoaan3xa3jiuq4fg4mefid".to_string()
35
35
}
36
36
+
fn example_service_did() -> String {
37
37
+
"did:web:constellation.microcosm.blue".to_string()
38
38
+
}
36
39
fn example_collection() -> String {
37
40
"app.bsky.feed.like".to_string()
38
41
}
39
42
fn example_rkey() -> String {
40
43
"3lv4ouczo2b2a".to_string()
44
44
+
}
45
45
+
fn example_id_fragment() -> String {
46
46
+
"#constellation".to_string()
41
47
}
42
48
fn example_uri() -> String {
43
49
format!(
···
54
60
"zQ3shpq1g134o7HGDb86CtQFxnHqzx5pZWknrVX2Waum3fF6j".to_string()
55
61
}
56
62
57
57
-
#[derive(Object)]
63
63
+
#[derive(Debug, Object)]
58
64
#[oai(example = true)]
59
65
struct XrpcErrorResponseObject {
60
66
/// Should correspond an error `name` in the lexicon errors array
···
85
91
}))
86
92
}
87
93
88
88
-
fn bad_request_handler_resolve_mini(err: poem::Error) -> ResolveMiniIDResponse {
89
89
-
ResolveMiniIDResponse::BadRequest(Json(XrpcErrorResponseObject {
94
94
+
fn bad_request_handler_resolve_mini(err: poem::Error) -> ResolveMiniDocResponse {
95
95
+
ResolveMiniDocResponse::BadRequest(Json(XrpcErrorResponseObject {
96
96
+
error: "InvalidRequest".to_string(),
97
97
+
message: format!("Bad request, here's some info that maybe should not be exposed: {err}"),
98
98
+
}))
99
99
+
}
100
100
+
101
101
+
fn bad_request_handler_resolve_service(err: poem::Error) -> ResolveServiceResponse {
102
102
+
ResolveServiceResponse::BadRequest(Json(XrpcErrorResponseObject {
103
103
+
error: "InvalidRequest".to_string(),
104
104
+
message: format!("Bad request, here's some info that maybe should not be exposed: {err}"),
105
105
+
}))
106
106
+
}
107
107
+
108
108
+
fn bad_request_handler_proxy_query(err: poem::Error) -> ProxyHydrateResponse {
109
109
+
ProxyHydrateResponse::BadRequest(Json(XrpcErrorResponseObject {
90
110
error: "InvalidRequest".to_string(),
91
111
message: format!("Bad request, here's some info that maybe should not be exposed: {err}"),
92
112
}))
···
181
201
182
202
#[derive(ApiResponse)]
183
203
#[oai(bad_request_handler = "bad_request_handler_resolve_mini")]
184
184
-
enum ResolveMiniIDResponse {
204
204
+
enum ResolveMiniDocResponse {
185
205
/// Identity resolved
186
206
#[oai(status = 200)]
187
207
Ok(Json<MiniDocResponseObject>),
···
192
212
193
213
#[derive(Object)]
194
214
#[oai(example = true)]
215
215
+
struct ServiceResponseObject {
216
216
+
/// The service endpoint URL, if found
217
217
+
endpoint: String,
218
218
+
}
219
219
+
impl Example for ServiceResponseObject {
220
220
+
fn example() -> Self {
221
221
+
Self {
222
222
+
endpoint: "https://example.com".to_string(),
223
223
+
}
224
224
+
}
225
225
+
}
226
226
+
227
227
+
#[derive(ApiResponse)]
228
228
+
#[oai(bad_request_handler = "bad_request_handler_resolve_service")]
229
229
+
enum ResolveServiceResponse {
230
230
+
/// Service resolved
231
231
+
#[oai(status = 200)]
232
232
+
Ok(Json<ServiceResponseObject>),
233
233
+
/// Bad request or service not resolved
234
234
+
#[oai(status = 400)]
235
235
+
BadRequest(XrpcError),
236
236
+
}
237
237
+
238
238
+
#[derive(Object)]
239
239
+
#[oai(rename_all = "camelCase")]
240
240
+
struct ProxyHydrationError {
241
241
+
/// Short description of why the hydration failed
242
242
+
reason: String,
243
243
+
/// Whether or not it's recommended to retry requesting this item
244
244
+
should_retry: bool,
245
245
+
/// URL to follow up at if retrying
246
246
+
follow_up: String,
247
247
+
}
248
248
+
249
249
+
#[derive(Object)]
250
250
+
#[oai(rename_all = "camelCase")]
251
251
+
struct ProxyHydrationPending {
252
252
+
/// URL you can request to finish hydrating this item
253
253
+
follow_up: String,
254
254
+
/// Why this item couldn't be hydrated: 'deadline' or 'limit'
255
255
+
///
256
256
+
/// - `deadline`: the item fetch didn't complete before the response was
257
257
+
/// due, but will continue on slingshot in the background -- `followUp`
258
258
+
/// requests are coalesced into the original item fetch to be available as
259
259
+
/// early as possible.
260
260
+
///
261
261
+
/// - `limit`: slingshot only attempts to hydrate the first 100 items found
262
262
+
/// in a proxied response, with the remaining marked `pending`. You can
263
263
+
/// request `followUp` to fetch them.
264
264
+
///
265
265
+
/// In the future, Slingshot may put pending links after `limit` into a low-
266
266
+
/// priority fetch queue, so that these items become available sooner on
267
267
+
/// follow-up request as well.
268
268
+
reason: String,
269
269
+
}
270
270
+
271
271
+
// todo: there's gotta be a supertrait that collects these?
272
272
+
use poem_openapi::types::{IsObjectType, ParseFromJSON, ToJSON, Type};
273
273
+
274
274
+
#[derive(Union)]
275
275
+
#[oai(discriminator_name = "status", rename_all = "camelCase")]
276
276
+
enum Hydration<T: Send + Sync + Type + ToJSON + ParseFromJSON + IsObjectType> {
277
277
+
Error(ProxyHydrationError),
278
278
+
Pending(ProxyHydrationPending),
279
279
+
Found(T),
280
280
+
}
281
281
+
282
282
+
#[derive(Object)]
283
283
+
#[oai(example = true)]
284
284
+
struct ProxyHydrateResponseObject {
285
285
+
/// The original upstream response content
286
286
+
output: serde_json::Value,
287
287
+
/// Any hydrated records
288
288
+
records: HashMap<String, Hydration<FoundRecordResponseObject>>,
289
289
+
/// Any hydrated identifiers
290
290
+
identifiers: HashMap<String, Hydration<MiniDocResponseObject>>,
291
291
+
}
292
292
+
impl Example for ProxyHydrateResponseObject {
293
293
+
fn example() -> Self {
294
294
+
Self {
295
295
+
output: serde_json::json!({}),
296
296
+
records: HashMap::from([(
297
297
+
"asdf".into(),
298
298
+
Hydration::Pending(ProxyHydrationPending {
299
299
+
follow_up: "/xrpc/com.atproto.repo.getRecord?...".to_string(),
300
300
+
reason: "deadline".to_string(),
301
301
+
}),
302
302
+
)]),
303
303
+
identifiers: HashMap::new(),
304
304
+
}
305
305
+
}
306
306
+
}
307
307
+
308
308
+
#[derive(ApiResponse)]
309
309
+
#[oai(bad_request_handler = "bad_request_handler_proxy_query")]
310
310
+
enum ProxyHydrateResponse {
311
311
+
#[oai(status = 200)]
312
312
+
Ok(Json<ProxyHydrateResponseObject>),
313
313
+
#[oai(status = 400)]
314
314
+
BadRequest(XrpcError),
315
315
+
}
316
316
+
317
317
+
#[derive(Object)]
318
318
+
pub struct HydrationSource {
319
319
+
/// Record Path syntax for locating fields
320
320
+
pub path: String,
321
321
+
/// What to expect at the path: 'strong-ref', 'at-uri', 'at-uri-parts', 'did', 'handle', or 'at-identifier'.
322
322
+
///
323
323
+
/// - `strong-ref`: object in the shape of `com.atproto.repo.strongRef` with `uri` and `cid` keys.
324
324
+
/// - `at-uri`: string, must have all segments present (identifier, collection, rkey)
325
325
+
/// - `at-uri-parts`: object with keys (`repo` or `did`), `collection`, `rkey`, and optional `cid`. Other keys may be present and will be ignored.
326
326
+
/// - `did`: string, `did` format
327
327
+
/// - `handle`: string, `handle` format
328
328
+
/// - `at-identifier`: string, `did` or `handle` format
329
329
+
pub shape: String,
330
330
+
}
331
331
+
332
332
+
#[derive(Object)]
333
333
+
#[oai(example = true)]
334
334
+
struct ProxyQueryPayload {
335
335
+
/// The NSID of the XRPC you wish to forward
336
336
+
xrpc: String,
337
337
+
/// The destination service the request will be forwarded to
338
338
+
atproto_proxy: String,
339
339
+
/// An optional auth token to pass on
340
340
+
///
341
341
+
/// the `aud` field must match the upstream atproto_proxy service
342
342
+
authorization: Option<String>,
343
343
+
/// An optional set of labelers to request be applied by the upstream
344
344
+
atproto_accept_labelers: Option<String>,
345
345
+
/// The `params` for the destination service XRPC endpoint
346
346
+
///
347
347
+
/// Currently this will be passed along unchecked, but a future version of
348
348
+
/// slingshot may attempt to do lexicon resolution to validate `params`
349
349
+
/// based on the upstream service
350
350
+
params: Option<serde_json::Value>,
351
351
+
/// Paths within the response to look for at-uris that can be hydrated
352
352
+
hydration_sources: Vec<HydrationSource>,
353
353
+
// todo: let clients pass a hydration deadline?
354
354
+
}
355
355
+
impl Example for ProxyQueryPayload {
356
356
+
fn example() -> Self {
357
357
+
Self {
358
358
+
xrpc: "app.bsky.feed.getFeedSkeleton".to_string(),
359
359
+
atproto_proxy: "did:web:blue.mackuba.eu#bsky_fg".to_string(),
360
360
+
authorization: None,
361
361
+
atproto_accept_labelers: None,
362
362
+
params: Some(serde_json::json!({
363
363
+
"feed": "at://did:plc:oio4hkxaop4ao4wz2pp3f4cr/app.bsky.feed.generator/atproto",
364
364
+
})),
365
365
+
hydration_sources: vec![HydrationSource {
366
366
+
path: "feed[].post".to_string(),
367
367
+
shape: "at-uri".to_string(),
368
368
+
}],
369
369
+
}
370
370
+
}
371
371
+
}
372
372
+
373
373
+
#[derive(Object)]
374
374
+
#[oai(example = true)]
195
375
struct FoundDidResponseObject {
196
376
/// the DID, bi-directionally verified if using Slingshot
197
377
did: String,
···
219
399
}
220
400
221
401
struct Xrpc {
402
402
+
base_url: url::Url,
222
403
cache: HybridCache<String, CachedRecord>,
223
404
identity: Identity,
405
405
+
proxy: Arc<Proxy>,
224
406
repo: Arc<Repo>,
225
407
}
226
408
···
286
468
/// only retains the most recent version of a record.
287
469
Query(cid): Query<Option<String>>,
288
470
) -> GetRecordResponse {
289
289
-
self.get_record_impl(repo, collection, rkey, cid).await
471
471
+
self.get_record_impl(&repo, &collection, &rkey, cid.as_deref())
472
472
+
.await
290
473
}
291
474
292
475
/// blue.microcosm.repo.getRecordByUri
···
356
539
return bad_at_uri();
357
540
};
358
541
359
359
-
// TODO: move this to links
360
360
-
let Some(rest) = normalized.strip_prefix("at://") else {
361
361
-
return bad_at_uri();
362
362
-
};
363
363
-
let Some((repo, rest)) = rest.split_once('/') else {
542
542
+
let Some((repo, collection, rkey)) = split_uri(&normalized) else {
364
543
return bad_at_uri();
365
544
};
366
366
-
let Some((collection, rest)) = rest.split_once('/') else {
367
367
-
return bad_at_uri();
368
368
-
};
369
369
-
let rkey = if let Some((rkey, _rest)) = rest.split_once('?') {
370
370
-
rkey
371
371
-
} else {
372
372
-
rest
373
373
-
};
374
545
375
546
self.get_record_impl(
376
376
-
repo.to_string(),
377
377
-
collection.to_string(),
378
378
-
rkey.to_string(),
379
379
-
cid,
547
547
+
Into::<String>::into(repo).as_str(),
548
548
+
collection.as_str(),
549
549
+
rkey.as_str(),
550
550
+
cid.as_deref(),
380
551
)
381
552
.await
382
553
}
···
456
627
/// Handle or DID to resolve
457
628
#[oai(example = "example_handle")]
458
629
Query(identifier): Query<String>,
459
459
-
) -> ResolveMiniIDResponse {
630
630
+
) -> ResolveMiniDocResponse {
460
631
self.resolve_mini_id(Query(identifier)).await
461
632
}
462
633
···
474
645
/// Handle or DID to resolve
475
646
#[oai(example = "example_handle")]
476
647
Query(identifier): Query<String>,
477
477
-
) -> ResolveMiniIDResponse {
648
648
+
) -> ResolveMiniDocResponse {
649
649
+
Self::resolve_mini_doc_impl(&identifier, self.identity.clone()).await
650
650
+
}
651
651
+
652
652
+
async fn resolve_mini_doc_impl(identifier: &str, identity: Identity) -> ResolveMiniDocResponse {
478
653
let invalid = |reason: &'static str| {
479
479
-
ResolveMiniIDResponse::BadRequest(xrpc_error("InvalidRequest", reason))
654
654
+
ResolveMiniDocResponse::BadRequest(xrpc_error("InvalidRequest", reason))
480
655
};
481
656
482
657
let mut unverified_handle = None;
483
483
-
let did = match Did::new(identifier.clone()) {
658
658
+
let did = match Did::new(identifier.to_string()) {
484
659
Ok(did) => did,
485
660
Err(_) => {
486
661
let Ok(alleged_handle) = Handle::new(identifier.to_lowercase()) else {
487
662
return invalid("Identifier was not a valid DID or handle");
488
663
};
489
664
490
490
-
match self.identity.handle_to_did(alleged_handle.clone()).await {
665
665
+
match identity.handle_to_did(alleged_handle.clone()).await {
491
666
Ok(res) => {
492
667
if let Some(did) = res {
493
668
// we did it joe
···
505
680
}
506
681
}
507
682
};
508
508
-
let Ok(partial_doc) = self.identity.did_to_partial_mini_doc(&did).await else {
683
683
+
let Ok(partial_doc) = identity.did_to_partial_mini_doc(&did).await else {
509
684
return invalid("Failed to get DID doc");
510
685
};
511
686
let Some(partial_doc) = partial_doc else {
···
525
700
"handle.invalid".to_string()
526
701
}
527
702
} else {
528
528
-
let Ok(handle_did) = self
529
529
-
.identity
703
703
+
let Ok(handle_did) = identity
530
704
.handle_to_did(partial_doc.unverified_handle.clone())
531
705
.await
532
706
else {
···
542
716
}
543
717
};
544
718
545
545
-
ResolveMiniIDResponse::Ok(Json(MiniDocResponseObject {
719
719
+
ResolveMiniDocResponse::Ok(Json(MiniDocResponseObject {
546
720
did: did.to_string(),
547
721
handle,
548
722
pds: partial_doc.pds,
···
550
724
}))
551
725
}
552
726
727
727
+
/// com.bad-example.identity.resolveService
728
728
+
///
729
729
+
/// resolve an atproto service did + id to its http endpoint
730
730
+
///
731
731
+
/// > [!important]
732
732
+
/// > this endpoint is experimental and may change
733
733
+
#[oai(
734
734
+
path = "/com.bad-example.identity.resolveService",
735
735
+
method = "get",
736
736
+
tag = "ApiTags::Custom"
737
737
+
)]
738
738
+
async fn resolve_service(
739
739
+
&self,
740
740
+
/// the service's did
741
741
+
#[oai(example = "example_service_did")]
742
742
+
Query(did): Query<String>,
743
743
+
/// id fragment, starting with '#'
744
744
+
///
745
745
+
/// must be url-encoded!
746
746
+
#[oai(example = "example_id_fragment")]
747
747
+
Query(id): Query<String>,
748
748
+
/// optionally, the exact service type to filter
749
749
+
///
750
750
+
/// resolving a pds requires matching the type as well as id. service
751
751
+
/// proxying ignores the type.
752
752
+
Query(r#type): Query<Option<String>>,
753
753
+
) -> ResolveServiceResponse {
754
754
+
let Ok(did) = Did::new(did) else {
755
755
+
return ResolveServiceResponse::BadRequest(xrpc_error(
756
756
+
"InvalidRequest",
757
757
+
"could not parse 'did' into a DID",
758
758
+
));
759
759
+
};
760
760
+
let identity = self.identity.clone();
761
761
+
Self::resolve_service_impl(&did, &id, r#type.as_deref(), identity).await
762
762
+
}
763
763
+
764
764
+
async fn resolve_service_impl(
765
765
+
did: &Did,
766
766
+
id_fragment: &str,
767
767
+
service_type: Option<&str>,
768
768
+
identity: Identity,
769
769
+
) -> ResolveServiceResponse {
770
770
+
let invalid = |reason: &'static str| {
771
771
+
ResolveServiceResponse::BadRequest(xrpc_error("InvalidRequest", reason))
772
772
+
};
773
773
+
let Ok(service_mini_doc) = identity.did_to_mini_service_doc(did).await else {
774
774
+
return invalid("Failed to get DID doc");
775
775
+
};
776
776
+
let Some(service_mini_doc) = service_mini_doc else {
777
777
+
return invalid("Failed to find DID doc");
778
778
+
};
779
779
+
780
780
+
let Some(matching) = service_mini_doc.get(id_fragment, service_type) else {
781
781
+
return invalid("failed to match identity (and maybe type)");
782
782
+
};
783
783
+
784
784
+
ResolveServiceResponse::Ok(Json(ServiceResponseObject {
785
785
+
endpoint: matching.endpoint.clone(),
786
786
+
}))
787
787
+
}
788
788
+
789
789
+
/// com.bad-example.proxy.hydrateQueryResponse
790
790
+
///
791
791
+
/// > [!important]
792
792
+
/// > Unstable! This endpoint is experimental and may change.
793
793
+
///
794
794
+
/// Fetch + include records referenced from an upstream xrpc query response
795
795
+
#[oai(
796
796
+
path = "/com.bad-example.proxy.hydrateQueryResponse",
797
797
+
method = "post",
798
798
+
tag = "ApiTags::Custom"
799
799
+
)]
800
800
+
async fn proxy_hydrate_query(
801
801
+
&self,
802
802
+
Json(payload): Json<ProxyQueryPayload>,
803
803
+
) -> ProxyHydrateResponse {
804
804
+
let params = if let Some(p) = payload.params {
805
805
+
let serde_json::Value::Object(map) = p else {
806
806
+
panic!("params have to be an object");
807
807
+
};
808
808
+
Some(map)
809
809
+
} else {
810
810
+
None
811
811
+
};
812
812
+
813
813
+
let Some((service_did, id_fragment)) = payload.atproto_proxy.rsplit_once("#") else {
814
814
+
return ProxyHydrateResponse::BadRequest(xrpc_error(
815
815
+
"BadParameter",
816
816
+
"atproto_proxy could not be understood",
817
817
+
));
818
818
+
};
819
819
+
820
820
+
let Ok(service_did) = service_did.parse() else {
821
821
+
return ProxyHydrateResponse::BadRequest(xrpc_error(
822
822
+
"BadParameter",
823
823
+
"atproto_proxy service did could not be parsed",
824
824
+
));
825
825
+
};
826
826
+
827
827
+
let Ok(xrpc) = payload.xrpc.parse() else {
828
828
+
return ProxyHydrateResponse::BadRequest(xrpc_error(
829
829
+
"BadParameter",
830
830
+
"invalid NSID for xrpc param",
831
831
+
));
832
832
+
};
833
833
+
834
834
+
match self
835
835
+
.proxy
836
836
+
.proxy(
837
837
+
&service_did,
838
838
+
&format!("#{id_fragment}"),
839
839
+
&xrpc,
840
840
+
payload.authorization.as_deref(),
841
841
+
payload.atproto_accept_labelers.as_deref(),
842
842
+
params,
843
843
+
)
844
844
+
.await
845
845
+
{
846
846
+
Ok(skeleton) => {
847
847
+
let links = match extract_links(payload.hydration_sources, &skeleton) {
848
848
+
Ok(l) => l,
849
849
+
Err(e) => {
850
850
+
log::warn!("problem extracting: {e:?}");
851
851
+
return ProxyHydrateResponse::BadRequest(xrpc_error(
852
852
+
"oop",
853
853
+
"sorry, error extracting",
854
854
+
));
855
855
+
}
856
856
+
};
857
857
+
let mut records = HashMap::new();
858
858
+
let mut identifiers = HashMap::new();
859
859
+
860
860
+
enum GetThing {
861
861
+
Record(String, Hydration<FoundRecordResponseObject>),
862
862
+
Identifier(String, Hydration<MiniDocResponseObject>),
863
863
+
}
864
864
+
865
865
+
let (tx, mut rx) = mpsc::channel(1);
866
866
+
867
867
+
let t0 = Instant::now();
868
868
+
869
869
+
for (i, link) in links.into_iter().enumerate() {
870
870
+
match link {
871
871
+
MatchedRef::AtUri(parts) => {
872
872
+
let non_canonical_url = parts.to_uri();
873
873
+
if records.contains_key(&non_canonical_url) {
874
874
+
log::warn!("skipping duplicate record without checking cid");
875
875
+
continue;
876
876
+
}
877
877
+
let mut follow_up = self.base_url.clone();
878
878
+
follow_up.set_path("/xrpc/com.atproto.repo.getRecord");
879
879
+
follow_up
880
880
+
.query_pairs_mut()
881
881
+
.append_pair("repo", &Into::<String>::into(parts.repo.clone()))
882
882
+
.append_pair("collection", parts.collection.as_str())
883
883
+
.append_pair("rkey", parts.rkey.as_str());
884
884
+
if let Some(ref cid) = parts.cid {
885
885
+
follow_up
886
886
+
.query_pairs_mut()
887
887
+
.append_pair("cid", &cid.as_ref().to_string());
888
888
+
}
889
889
+
890
890
+
if i >= 100 {
891
891
+
records.insert(
892
892
+
non_canonical_url.clone(),
893
893
+
Hydration::Pending(ProxyHydrationPending {
894
894
+
reason: "limit".to_string(),
895
895
+
follow_up: follow_up.to_string(),
896
896
+
}),
897
897
+
);
898
898
+
continue;
899
899
+
} else {
900
900
+
records.insert(
901
901
+
non_canonical_url.clone(),
902
902
+
Hydration::Pending(ProxyHydrationPending {
903
903
+
reason: "deadline".to_string(),
904
904
+
follow_up: follow_up.to_string(),
905
905
+
}),
906
906
+
);
907
907
+
}
908
908
+
909
909
+
let tx = tx.clone();
910
910
+
let identity = self.identity.clone();
911
911
+
let repo = self.repo.clone();
912
912
+
tokio::task::spawn(async move {
913
913
+
let FullAtUriParts {
914
914
+
repo: ident,
915
915
+
collection,
916
916
+
rkey,
917
917
+
cid,
918
918
+
} = parts;
919
919
+
let did = match ident {
920
920
+
AtIdentifier::Did(did) => did,
921
921
+
AtIdentifier::Handle(handle) => {
922
922
+
let Ok(Some(did)) = identity.handle_to_did(handle).await
923
923
+
else {
924
924
+
let res = Hydration::Error(ProxyHydrationError {
925
925
+
reason: "could not resolve handle".to_string(),
926
926
+
should_retry: true,
927
927
+
follow_up: follow_up.to_string(),
928
928
+
});
929
929
+
return if tx
930
930
+
.send(GetThing::Record(non_canonical_url, res))
931
931
+
.await
932
932
+
.is_ok()
933
933
+
{
934
934
+
metrics::counter!("slingshot_hydrated_one", "type" => "record", "ontime" => "true").increment(1);
935
935
+
} else {
936
936
+
metrics::counter!("slingshot_hydrated_one", "type" => "record", "ontime" => "false").increment(1);
937
937
+
};
938
938
+
};
939
939
+
did
940
940
+
}
941
941
+
};
942
942
+
943
943
+
let res =
944
944
+
match repo.get_record(&did, &collection, &rkey, &cid).await {
945
945
+
Ok(CachedRecord::Deleted) => {
946
946
+
Hydration::Error(ProxyHydrationError {
947
947
+
reason: "record deleted".to_string(),
948
948
+
should_retry: false,
949
949
+
follow_up: follow_up.to_string(),
950
950
+
})
951
951
+
}
952
952
+
Ok(CachedRecord::Found(RawRecord {
953
953
+
cid: found_cid,
954
954
+
record,
955
955
+
})) => {
956
956
+
if cid
957
957
+
.as_ref()
958
958
+
.map(|expected| *expected != found_cid)
959
959
+
.unwrap_or(false)
960
960
+
{
961
961
+
Hydration::Error(ProxyHydrationError {
962
962
+
reason: "not found".to_string(),
963
963
+
should_retry: false,
964
964
+
follow_up: follow_up.to_string(),
965
965
+
})
966
966
+
} else if let Ok(value) = serde_json::from_str(&record)
967
967
+
{
968
968
+
let canonical_uri = FullAtUriParts {
969
969
+
repo: AtIdentifier::Did(did),
970
970
+
collection,
971
971
+
rkey,
972
972
+
cid: None, // not used for .to_uri
973
973
+
}
974
974
+
.to_uri();
975
975
+
Hydration::Found(FoundRecordResponseObject {
976
976
+
cid: Some(found_cid.as_ref().to_string()),
977
977
+
uri: canonical_uri,
978
978
+
value,
979
979
+
})
980
980
+
} else {
981
981
+
Hydration::Error(ProxyHydrationError {
982
982
+
reason: "could not parse upstream response"
983
983
+
.to_string(),
984
984
+
should_retry: false,
985
985
+
follow_up: follow_up.to_string(),
986
986
+
})
987
987
+
}
988
988
+
}
989
989
+
Err(e) => {
990
990
+
log::warn!("finally oop {e:?}");
991
991
+
Hydration::Error(ProxyHydrationError {
992
992
+
reason: "failed to fetch record".to_string(),
993
993
+
should_retry: true, // TODO
994
994
+
follow_up: follow_up.to_string(),
995
995
+
})
996
996
+
}
997
997
+
};
998
998
+
if tx
999
999
+
.send(GetThing::Record(non_canonical_url, res))
1000
1000
+
.await
1001
1001
+
.is_ok()
1002
1002
+
{
1003
1003
+
metrics::counter!("slingshot_hydrated_one", "type" => "record", "ontime" => "true").increment(1);
1004
1004
+
} else {
1005
1005
+
metrics::counter!("slingshot_hydrated_one", "type" => "record", "ontime" => "false").increment(1);
1006
1006
+
}
1007
1007
+
});
1008
1008
+
}
1009
1009
+
MatchedRef::Identifier(id) => {
1010
1010
+
let identifier: String = id.clone().into();
1011
1011
+
if identifiers.contains_key(&identifier) {
1012
1012
+
continue;
1013
1013
+
}
1014
1014
+
1015
1015
+
let mut follow_up = self.base_url.clone();
1016
1016
+
follow_up.set_path("/xrpc/blue.microcosm.identity.resolveMiniDoc");
1017
1017
+
1018
1018
+
follow_up
1019
1019
+
.query_pairs_mut()
1020
1020
+
.append_pair("identifier", &identifier);
1021
1021
+
1022
1022
+
if i >= 100 {
1023
1023
+
identifiers.insert(
1024
1024
+
identifier.clone(),
1025
1025
+
Hydration::Pending(ProxyHydrationPending {
1026
1026
+
reason: "limit".to_string(),
1027
1027
+
follow_up: follow_up.to_string(),
1028
1028
+
}),
1029
1029
+
);
1030
1030
+
continue;
1031
1031
+
} else {
1032
1032
+
identifiers.insert(
1033
1033
+
identifier.clone(),
1034
1034
+
Hydration::Pending(ProxyHydrationPending {
1035
1035
+
reason: "deadline".to_string(),
1036
1036
+
follow_up: follow_up.to_string(),
1037
1037
+
}),
1038
1038
+
);
1039
1039
+
}
1040
1040
+
1041
1041
+
let tx = tx.clone();
1042
1042
+
let identity = self.identity.clone();
1043
1043
+
tokio::task::spawn(async move {
1044
1044
+
let res = match Self::resolve_mini_doc_impl(&identifier, identity)
1045
1045
+
.await
1046
1046
+
{
1047
1047
+
ResolveMiniDocResponse::Ok(Json(mini_doc)) => {
1048
1048
+
Hydration::Found(mini_doc)
1049
1049
+
}
1050
1050
+
ResolveMiniDocResponse::BadRequest(e) => {
1051
1051
+
log::warn!("minidoc fail: {:?}", e.0);
1052
1052
+
Hydration::Error(ProxyHydrationError {
1053
1053
+
reason: "failed to resolve mini doc".to_string(),
1054
1054
+
should_retry: false,
1055
1055
+
follow_up: follow_up.to_string(),
1056
1056
+
})
1057
1057
+
}
1058
1058
+
};
1059
1059
+
if tx.send(GetThing::Identifier(identifier, res)).await.is_ok() {
1060
1060
+
metrics::counter!("slingshot_hydrated_one", "type" => "identity", "ontime" => "true").increment(1);
1061
1061
+
} else {
1062
1062
+
metrics::counter!("slingshot_hydrated_one", "type" => "identity", "ontime" => "false").increment(1);
1063
1063
+
}
1064
1064
+
});
1065
1065
+
}
1066
1066
+
}
1067
1067
+
}
1068
1068
+
// so the channel can close when all are completed
1069
1069
+
// (we shoudl be doing a timeout...)
1070
1070
+
drop(tx);
1071
1071
+
1072
1072
+
let deadline = t0 + std::time::Duration::from_secs_f64(1.6);
1073
1073
+
let res = tokio::time::timeout_at(deadline.into(), async {
1074
1074
+
while let Some(hydration) = rx.recv().await {
1075
1075
+
match hydration {
1076
1076
+
GetThing::Record(uri, h) => {
1077
1077
+
if let Some(r) = records.get_mut(&uri) {
1078
1078
+
match (&r, &h) {
1079
1079
+
(_, Hydration::Found(_)) => *r = h, // always replace if found
1080
1080
+
(Hydration::Pending(_), _) => *r = h, // or if it was pending
1081
1081
+
_ => {} // else leave it
1082
1082
+
}
1083
1083
+
} else {
1084
1084
+
records.insert(uri, h);
1085
1085
+
}
1086
1086
+
}
1087
1087
+
GetThing::Identifier(identifier, md) => {
1088
1088
+
identifiers.insert(identifier.to_string(), md);
1089
1089
+
}
1090
1090
+
};
1091
1091
+
}
1092
1092
+
})
1093
1093
+
.await;
1094
1094
+
1095
1095
+
if res.is_ok() {
1096
1096
+
metrics::histogram!("slingshot_hydration_all_completed").record(t0.elapsed());
1097
1097
+
} else {
1098
1098
+
metrics::counter!("slingshot_hydration_cut_off").increment(1);
1099
1099
+
}
1100
1100
+
1101
1101
+
ProxyHydrateResponse::Ok(Json(ProxyHydrateResponseObject {
1102
1102
+
output: skeleton,
1103
1103
+
records,
1104
1104
+
identifiers,
1105
1105
+
}))
1106
1106
+
}
1107
1107
+
Err(e) => {
1108
1108
+
log::warn!("oh no: {e:?}");
1109
1109
+
ProxyHydrateResponse::BadRequest(xrpc_error("oop", "sorry"))
1110
1110
+
}
1111
1111
+
}
1112
1112
+
}
1113
1113
+
553
1114
async fn get_record_impl(
554
1115
&self,
555
555
-
repo: String,
556
556
-
collection: String,
557
557
-
rkey: String,
558
558
-
cid: Option<String>,
1116
1116
+
repo: &str,
1117
1117
+
collection: &str,
1118
1118
+
rkey: &str,
1119
1119
+
cid: Option<&str>,
559
1120
) -> GetRecordResponse {
560
560
-
let did = match Did::new(repo.clone()) {
1121
1121
+
let did = match Did::new(repo.to_string()) {
561
1122
Ok(did) => did,
562
1123
Err(_) => {
563
1124
let Ok(handle) = Handle::new(repo.to_lowercase()) else {
···
588
1149
}
589
1150
};
590
1151
591
591
-
let Ok(collection) = Nsid::new(collection) else {
1152
1152
+
let Ok(collection) = Nsid::new(collection.to_string()) else {
592
1153
return GetRecordResponse::BadRequest(xrpc_error(
593
1154
"InvalidRequest",
594
1155
"Invalid NSID for collection",
595
1156
));
596
1157
};
597
1158
598
598
-
let Ok(rkey) = RecordKey::new(rkey) else {
1159
1159
+
let Ok(rkey) = RecordKey::new(rkey.to_string()) else {
599
1160
return GetRecordResponse::BadRequest(xrpc_error("InvalidRequest", "Invalid rkey"));
600
1161
};
601
1162
602
1163
let cid: Option<Cid> = if let Some(cid) = cid {
603
603
-
let Ok(cid) = Cid::from_str(&cid) else {
1164
1164
+
let Ok(cid) = Cid::from_str(cid) else {
604
1165
return GetRecordResponse::BadRequest(xrpc_error("InvalidRequest", "Invalid CID"));
605
1166
};
606
1167
Some(cid)
···
748
1309
cache: HybridCache<String, CachedRecord>,
749
1310
identity: Identity,
750
1311
repo: Repo,
1312
1312
+
proxy: Proxy,
1313
1313
+
base_url: url::Url,
751
1314
acme_domain: Option<String>,
752
1315
acme_contact: Option<String>,
753
1316
acme_cache_path: Option<PathBuf>,
···
756
1319
bind: std::net::SocketAddr,
757
1320
) -> Result<(), ServerError> {
758
1321
let repo = Arc::new(repo);
1322
1322
+
let proxy = Arc::new(proxy);
759
1323
let api_service = OpenApiService::new(
760
1324
Xrpc {
1325
1325
+
base_url,
761
1326
cache,
762
1327
identity,
1328
1328
+
proxy,
763
1329
repo,
764
1330
},
765
1331
"Slingshot",
···
823
1389
.with(
824
1390
Cors::new()
825
1391
.allow_origin_regex("*")
826
826
-
.allow_methods([Method::GET])
1392
1392
+
.allow_methods([Method::GET, Method::POST])
827
1393
.allow_credentials(false),
828
1394
)
829
1395
.with(CatchPanic::new())