+1
-1
Makefile
+1
-1
Makefile
+12
-6
spacedust/src/consumer.rs
+12
-6
spacedust/src/consumer.rs
···
1
-
use std::sync::Arc;
2
-
use tokio_util::sync::CancellationToken;
3
1
use crate::ClientMessage;
4
2
use crate::error::ConsumerError;
5
3
use crate::removable_delay_queue;
···
8
6
events::{CommitOp, Cursor, EventKind},
9
7
};
10
8
use links::collect_links;
9
+
use std::sync::Arc;
11
10
use tokio::sync::broadcast;
11
+
use tokio_util::sync::CancellationToken;
12
12
13
13
const MAX_LINKS_PER_EVENT: usize = 100;
14
14
···
61
61
};
62
62
63
63
// TODO: something a bit more robust
64
-
let at_uri = format!("at://{}/{}/{}", &*event.did, &*commit.collection, &*commit.rkey);
64
+
let at_uri = format!(
65
+
"at://{}/{}/{}",
66
+
&*event.did, &*commit.collection, &*commit.rkey
67
+
);
65
68
66
69
// TODO: keep a buffer and remove quick deletes to debounce notifs
67
70
// for now we just drop all deletes eek
68
71
if commit.operation == CommitOp::Delete {
69
-
d.remove_range((at_uri.clone(), 0)..=(at_uri.clone(), MAX_LINKS_PER_EVENT)).await;
72
+
d.remove_range((at_uri.clone(), 0)..=(at_uri.clone(), MAX_LINKS_PER_EVENT))
73
+
.await;
70
74
continue;
71
75
}
72
76
let Some(ref record) = commit.record else {
···
86
90
if i >= MAX_LINKS_PER_EVENT {
87
91
// todo: indicate if the link limit was reached (-> links omitted)
88
92
log::warn!("consumer: event has too many links, ignoring the rest");
89
-
metrics::counter!("consumer_dropped_links", "reason" => "too_many_links").increment(1);
93
+
metrics::counter!("consumer_dropped_links", "reason" => "too_many_links")
94
+
.increment(1);
90
95
break;
91
96
}
92
97
let client_message = match ClientMessage::new_link(link, &at_uri, commit) {
···
94
99
Err(e) => {
95
100
// TODO indicate to clients that a link has been dropped
96
101
log::warn!("consumer: failed to serialize link to json: {e:?}");
97
-
metrics::counter!("consumer_dropped_links", "reason" => "failed_to_serialize").increment(1);
102
+
metrics::counter!("consumer_dropped_links", "reason" => "failed_to_serialize")
103
+
.increment(1);
98
104
continue;
99
105
}
100
106
};
+2
-2
spacedust/src/delay.rs
+2
-2
spacedust/src/delay.rs
···
1
+
use crate::error::DelayError;
1
2
use crate::removable_delay_queue;
2
-
use tokio_util::sync::CancellationToken;
3
3
use tokio::sync::broadcast;
4
-
use crate::error::DelayError;
4
+
use tokio_util::sync::CancellationToken;
5
5
6
6
pub async fn to_broadcast<T>(
7
7
source: removable_delay_queue::Output<(String, usize), T>,
+19
-8
spacedust/src/lib.rs
+19
-8
spacedust/src/lib.rs
···
1
1
pub mod consumer;
2
2
pub mod delay;
3
3
pub mod error;
4
+
pub mod removable_delay_queue;
4
5
pub mod server;
5
6
pub mod subscriber;
6
-
pub mod removable_delay_queue;
7
7
8
-
use links::CollectedLink;
9
8
use jetstream::events::CommitEvent;
10
-
use tokio_tungstenite::tungstenite::Message;
9
+
use links::CollectedLink;
11
10
use serde::{Deserialize, Serialize};
12
11
use server::MultiSubscribeQuery;
12
+
use tokio_tungstenite::tungstenite::Message;
13
13
14
14
#[derive(Debug)]
15
15
pub struct FilterableProperties {
···
32
32
}
33
33
34
34
impl ClientMessage {
35
-
pub fn new_link(link: CollectedLink, at_uri: &str, commit: &CommitEvent) -> Result<Self, serde_json::Error> {
35
+
pub fn new_link(
36
+
link: CollectedLink,
37
+
at_uri: &str,
38
+
commit: &CommitEvent,
39
+
) -> Result<Self, serde_json::Error> {
36
40
let subject_did = link.target.did();
37
41
38
42
let subject = link.target.into_string();
···
61
65
62
66
let message = Message::Text(client_event_json.into());
63
67
64
-
let properties = FilterableProperties { subject, subject_did, source };
68
+
let properties = FilterableProperties {
69
+
subject,
70
+
subject_did,
71
+
source,
72
+
};
65
73
66
-
Ok(ClientMessage { message, properties })
74
+
Ok(ClientMessage {
75
+
message,
76
+
properties,
77
+
})
67
78
}
68
79
}
69
80
70
81
#[derive(Debug, Serialize)]
71
-
#[serde(rename_all="snake_case")]
82
+
#[serde(rename_all = "snake_case")]
72
83
pub struct ClientEvent {
73
-
kind: &'static str, // "link"
84
+
kind: &'static str, // "link"
74
85
origin: &'static str, // "live", "replay", "backfill"
75
86
link: ClientLinkEvent,
76
87
}
+11
-6
spacedust/src/main.rs
+11
-6
spacedust/src/main.rs
···
1
-
use spacedust::error::MainTaskError;
2
1
use spacedust::consumer;
3
-
use spacedust::server;
4
2
use spacedust::delay;
3
+
use spacedust::error::MainTaskError;
5
4
use spacedust::removable_delay_queue::removable_delay_queue;
5
+
use spacedust::server;
6
6
7
7
use clap::Parser;
8
8
use metrics_exporter_prometheus::PrometheusBuilder;
9
+
use std::time::Duration;
9
10
use tokio::sync::broadcast;
10
11
use tokio_util::sync::CancellationToken;
11
-
use std::time::Duration;
12
12
13
13
/// Aggregate links in the at-mosphere
14
14
#[derive(Parser, Debug, Clone)]
···
80
80
args.jetstream,
81
81
None,
82
82
args.jetstream_no_zstd,
83
-
consumer_shutdown
83
+
consumer_shutdown,
84
84
)
85
-
.await?;
85
+
.await?;
86
86
Ok(())
87
87
});
88
88
89
89
let delay_shutdown = shutdown.clone();
90
90
tasks.spawn(async move {
91
-
delay::to_broadcast(delay_queue_receiver, consumer_delayed_sender, delay_shutdown).await?;
91
+
delay::to_broadcast(
92
+
delay_queue_receiver,
93
+
consumer_delayed_sender,
94
+
delay_shutdown,
95
+
)
96
+
.await?;
92
97
Ok(())
93
98
});
94
99
+15
-11
spacedust/src/removable_delay_queue.rs
+15
-11
spacedust/src/removable_delay_queue.rs
···
1
-
use std::ops::RangeBounds;
2
1
use std::collections::{BTreeMap, VecDeque};
3
-
use std::time::{Duration, Instant};
4
-
use tokio::sync::Mutex;
2
+
use std::ops::RangeBounds;
5
3
use std::sync::Arc;
4
+
use std::time::{Duration, Instant};
6
5
use thiserror::Error;
6
+
use tokio::sync::Mutex;
7
7
8
8
#[derive(Debug, Error)]
9
9
pub enum EnqueueError<T> {
···
17
17
#[derive(Debug)]
18
18
struct Queue<K: Key, T> {
19
19
queue: VecDeque<(Instant, K)>,
20
-
items: BTreeMap<K, T>
20
+
items: BTreeMap<K, T>,
21
21
}
22
22
23
23
pub struct Input<K: Key, T> {
···
49
49
pub async fn remove_range(&self, range: impl RangeBounds<K>) {
50
50
let n = {
51
51
let mut q = self.q.lock().await;
52
-
let keys = q.items.range(range).map(|(k, _)| k).cloned().collect::<Vec<_>>();
52
+
let keys = q
53
+
.items
54
+
.range(range)
55
+
.map(|(k, _)| k)
56
+
.cloned()
57
+
.collect::<Vec<_>>();
53
58
for k in &keys {
54
59
q.items.remove(k);
55
60
}
···
94
99
} else {
95
100
let overshoot = now.saturating_duration_since(expected_release);
96
101
metrics::counter!("delay_queue_emit_total", "early" => "no").increment(1);
97
-
metrics::histogram!("delay_queue_emit_overshoot").record(overshoot.as_secs_f64());
102
+
metrics::histogram!("delay_queue_emit_overshoot")
103
+
.record(overshoot.as_secs_f64());
98
104
}
99
-
return Some(item)
105
+
return Some(item);
100
106
} else if Arc::strong_count(&self.q) == 1 {
101
107
return None;
102
108
}
103
109
// the queue is *empty*, so we need to wait at least as long as the current delay
104
110
tokio::time::sleep(self.delay).await;
105
111
metrics::counter!("delay_queue_entirely_empty_total").increment(1);
106
-
};
112
+
}
107
113
}
108
114
}
109
115
110
-
pub fn removable_delay_queue<K: Key, T>(
111
-
delay: Duration,
112
-
) -> (Input<K, T>, Output<K, T>) {
116
+
pub fn removable_delay_queue<K: Key, T>(delay: Duration) -> (Input<K, T>, Output<K, T>) {
113
117
let q: Arc<Mutex<Queue<K, T>>> = Arc::new(Mutex::new(Queue {
114
118
queue: VecDeque::new(),
115
119
items: BTreeMap::new(),
+19
-17
spacedust/src/server.rs
+19
-17
spacedust/src/server.rs
···
1
+
use crate::ClientMessage;
1
2
use crate::error::ServerError;
2
3
use crate::subscriber::Subscriber;
3
-
use metrics::{histogram, counter};
4
-
use std::sync::Arc;
5
-
use crate::ClientMessage;
4
+
use dropshot::{
5
+
ApiDescription, ApiEndpointBodyContentType, Body, ConfigDropshot, ConfigLogging,
6
+
ConfigLoggingLevel, ExtractorMetadata, HttpError, HttpResponse, Query, RequestContext,
7
+
ServerBuilder, ServerContext, SharedExtractor, WebsocketConnection, channel, endpoint,
8
+
};
6
9
use http::{
7
-
header::{ORIGIN, USER_AGENT},
8
10
Response, StatusCode,
9
-
};
10
-
use dropshot::{
11
-
Body,
12
-
ApiDescription, ConfigDropshot, ConfigLogging, ConfigLoggingLevel, Query, RequestContext,
13
-
ServerBuilder, WebsocketConnection, channel, endpoint, HttpResponse,
14
-
ApiEndpointBodyContentType, ExtractorMetadata, HttpError, ServerContext,
15
-
SharedExtractor,
11
+
header::{ORIGIN, USER_AGENT},
16
12
};
13
+
use metrics::{counter, histogram};
14
+
use std::sync::Arc;
17
15
16
+
use async_trait::async_trait;
18
17
use schemars::JsonSchema;
19
18
use serde::{Deserialize, Serialize};
19
+
use std::collections::HashSet;
20
20
use tokio::sync::broadcast;
21
21
use tokio::time::Instant;
22
22
use tokio_tungstenite::tungstenite::protocol::{Role, WebSocketConfig};
23
23
use tokio_util::sync::CancellationToken;
24
-
use async_trait::async_trait;
25
-
use std::collections::HashSet;
26
24
27
25
const INDEX_HTML: &str = include_str!("../static/index.html");
28
26
const FAVICON: &[u8] = include_bytes!("../static/favicon.ico");
···
30
28
pub async fn serve(
31
29
b: broadcast::Sender<Arc<ClientMessage>>,
32
30
d: broadcast::Sender<Arc<ClientMessage>>,
33
-
shutdown: CancellationToken
31
+
shutdown: CancellationToken,
34
32
) -> Result<(), ServerError> {
35
33
let config_logging = ConfigLogging::StderrTerminal {
36
34
level: ConfigLoggingLevel::Info,
···
65
63
);
66
64
67
65
let sub_shutdown = shutdown.clone();
68
-
let ctx = Context { spec, b, d, shutdown: sub_shutdown };
66
+
let ctx = Context {
67
+
spec,
68
+
b,
69
+
d,
70
+
shutdown: sub_shutdown,
71
+
};
69
72
70
73
let server = ServerBuilder::new(api, ctx, log)
71
74
.config(ConfigDropshot {
···
161
164
}
162
165
163
166
// TODO: cors for HttpError
164
-
165
167
166
168
/// Serve index page as html
167
169
#[endpoint {
···
316
318
upgraded.into_inner(),
317
319
Role::Server,
318
320
Some(WebSocketConfig::default().max_message_size(
319
-
Some(10 * 2_usize.pow(20)) // 10MiB, matching jetstream
321
+
Some(10 * 2_usize.pow(20)), // 10MiB, matching jetstream
320
322
)),
321
323
)
322
324
.await;
+23
-26
spacedust/src/subscriber.rs
+23
-26
spacedust/src/subscriber.rs
···
1
1
use crate::error::SubscriberUpdateError;
2
-
use std::sync::Arc;
3
-
use tokio::time::interval;
4
-
use std::time::Duration;
5
-
use futures::StreamExt;
2
+
use crate::server::MultiSubscribeQuery;
6
3
use crate::{ClientMessage, FilterableProperties, SubscriberSourcedMessage};
7
-
use crate::server::MultiSubscribeQuery;
4
+
use dropshot::WebsocketConnectionRaw;
8
5
use futures::SinkExt;
6
+
use futures::StreamExt;
9
7
use std::error::Error;
8
+
use std::sync::Arc;
9
+
use std::time::Duration;
10
10
use tokio::sync::broadcast::{self, error::RecvError};
11
+
use tokio::time::interval;
11
12
use tokio_tungstenite::{WebSocketStream, tungstenite::Message};
12
13
use tokio_util::sync::CancellationToken;
13
-
use dropshot::WebsocketConnectionRaw;
14
14
15
15
const PING_PERIOD: Duration = Duration::from_secs(30);
16
16
···
20
20
}
21
21
22
22
impl Subscriber {
23
-
pub fn new(
24
-
query: MultiSubscribeQuery,
25
-
shutdown: CancellationToken,
26
-
) -> Self {
23
+
pub fn new(query: MultiSubscribeQuery, shutdown: CancellationToken) -> Self {
27
24
Self { query, shutdown }
28
25
}
29
26
30
27
pub async fn start(
31
28
mut self,
32
29
ws: WebSocketStream<WebsocketConnectionRaw>,
33
-
mut receiver: broadcast::Receiver<Arc<ClientMessage>>
30
+
mut receiver: broadcast::Receiver<Arc<ClientMessage>>,
34
31
) -> Result<(), Box<dyn Error>> {
35
32
let mut ping_state = None;
36
33
let (mut ws_sender, mut ws_receiver) = ws.split();
···
83
80
// TODO: send client an explanation
84
81
self.shutdown.cancel();
85
82
}
83
+
log::trace!("subscriber updated with opts: {:?}", self.query);
86
84
},
87
85
Some(Ok(m)) => log::trace!("subscriber sent an unexpected message: {m:?}"),
88
86
Some(Err(e)) => {
···
122
120
Ok(())
123
121
}
124
122
125
-
fn filter(
126
-
&self,
127
-
properties: &FilterableProperties,
128
-
) -> bool {
123
+
fn filter(&self, properties: &FilterableProperties) -> bool {
129
124
let query = &self.query;
130
125
131
126
// subject + subject DIDs are logical OR
132
-
if !(
133
-
query.wanted_subjects.is_empty() && query.wanted_subject_dids.is_empty() ||
134
-
query.wanted_subjects.contains(&properties.subject) ||
135
-
properties.subject_did.as_ref().map(|did| query.wanted_subject_dids.contains(did)).unwrap_or(false)
136
-
) { // wowwww ^^ fix that
137
-
return false
127
+
if !(query.wanted_subjects.is_empty() && query.wanted_subject_dids.is_empty()
128
+
|| query.wanted_subjects.contains(&properties.subject)
129
+
|| properties
130
+
.subject_did
131
+
.as_ref()
132
+
.map(|did| query.wanted_subject_dids.contains(did))
133
+
.unwrap_or(false))
134
+
{
135
+
// wowwww ^^ fix that
136
+
return false;
138
137
}
139
138
140
139
// subjects together with sources are logical AND
141
140
if !(query.wanted_sources.is_empty() || query.wanted_sources.contains(&properties.source)) {
142
-
return false
141
+
return false;
143
142
}
144
143
145
144
true
146
145
}
147
146
}
148
147
149
-
150
-
151
148
impl MultiSubscribeQuery {
152
149
pub fn update_from_raw(&mut self, s: &str) -> Result<(), SubscriberUpdateError> {
153
-
let SubscriberSourcedMessage::OptionsUpdate(opts) = serde_json::from_str(s)
154
-
.map_err(SubscriberUpdateError::FailedToParseMessage)?;
150
+
let SubscriberSourcedMessage::OptionsUpdate(opts) =
151
+
serde_json::from_str(s).map_err(SubscriberUpdateError::FailedToParseMessage)?;
155
152
if opts.wanted_sources.len() > 1_000 {
156
153
return Err(SubscriberUpdateError::TooManySourcesWanted);
157
154
}
+1
-1
who-am-i/Cargo.toml
+1
-1
who-am-i/Cargo.toml
···
4
4
edition = "2024"
5
5
6
6
[dependencies]
7
-
atrium-api = { version = "0.25.4", default-features = false, features = ["tokio", "agent"] }
7
+
atrium-api = { version = "0.25.4", default-features = false }
8
8
atrium-identity = "0.1.5"
9
9
atrium-oauth = "0.1.3"
10
10
clap = { version = "4.5.40", features = ["derive"] }
+1
-1
who-am-i/src/lib.rs
+1
-1
who-am-i/src/lib.rs
+1
-1
who-am-i/src/main.rs
+1
-1
who-am-i/src/main.rs
+14
-17
who-am-i/src/oauth.rs
+14
-17
who-am-i/src/oauth.rs
···
1
+
use crate::HickoryDnsTxtResolver;
1
2
use atrium_identity::{
2
3
did::{CommonDidResolver, CommonDidResolverConfig, DEFAULT_PLC_DIRECTORY_URL},
3
4
handle::{AtprotoHandleResolver, AtprotoHandleResolverConfig},
4
5
};
5
6
use atrium_oauth::{
6
-
AuthorizeOptions,
7
+
AtprotoLocalhostClientMetadata, AuthorizeOptions, DefaultHttpClient, KnownScope, OAuthClient,
8
+
OAuthClientConfig, OAuthResolverConfig, Scope,
7
9
store::{session::MemorySessionStore, state::MemoryStateStore},
8
-
AtprotoLocalhostClientMetadata, DefaultHttpClient, KnownScope, OAuthClient, OAuthClientConfig,
9
-
OAuthResolverConfig, Scope,
10
10
};
11
11
use std::sync::Arc;
12
-
use crate::HickoryDnsTxtResolver;
13
12
14
13
pub type Client = OAuthClient<
15
14
MemoryStateStore,
···
23
22
let config = OAuthClientConfig {
24
23
client_metadata: AtprotoLocalhostClientMetadata {
25
24
redirect_uris: Some(vec![String::from("http://127.0.0.1:9997/authorized")]),
26
-
scopes: Some(vec![
27
-
Scope::Known(KnownScope::Atproto),
28
-
]),
25
+
scopes: Some(vec![Scope::Known(KnownScope::Atproto)]),
29
26
},
30
27
keys: None,
31
28
resolver: OAuthResolverConfig {
···
52
49
}
53
50
54
51
pub async fn authorize(client: &Client, handle: &str) -> String {
55
-
let Ok(url) = client.authorize(
56
-
handle,
57
-
AuthorizeOptions {
58
-
scopes: vec![
59
-
Scope::Known(KnownScope::Atproto),
60
-
],
61
-
..Default::default()
62
-
},
63
-
)
64
-
.await else {
52
+
let Ok(url) = client
53
+
.authorize(
54
+
handle,
55
+
AuthorizeOptions {
56
+
scopes: vec![Scope::Known(KnownScope::Atproto)],
57
+
..Default::default()
58
+
},
59
+
)
60
+
.await
61
+
else {
65
62
panic!("failed to authorize");
66
63
};
67
64
url
+16
-18
who-am-i/src/server.rs
+16
-18
who-am-i/src/server.rs
···
1
-
2
1
use atrium_api::agent::SessionManager;
3
-
use std::error::Error;
4
-
use metrics::{histogram, counter};
5
-
use std::sync::Arc;
2
+
use dropshot::{
3
+
ApiDescription, Body, ConfigDropshot, ConfigLogging, ConfigLoggingLevel, HttpError,
4
+
HttpResponse, HttpResponseSeeOther, Query, RequestContext, ServerBuilder, ServerContext,
5
+
endpoint, http_response_see_other,
6
+
};
6
7
use http::{
7
-
header::{ORIGIN, USER_AGENT},
8
8
Response, StatusCode,
9
+
header::{ORIGIN, USER_AGENT},
9
10
};
10
-
use dropshot::{
11
-
Body, HttpResponseSeeOther, http_response_see_other,
12
-
ApiDescription, ConfigDropshot, ConfigLogging, ConfigLoggingLevel, RequestContext,
13
-
ServerBuilder, endpoint, HttpResponse, HttpError, ServerContext, Query,
14
-
};
11
+
use metrics::{counter, histogram};
12
+
use std::error::Error;
13
+
use std::sync::Arc;
15
14
16
15
use atrium_oauth::CallbackParams;
17
16
use schemars::JsonSchema;
···
19
18
use tokio::time::Instant;
20
19
use tokio_util::sync::CancellationToken;
21
20
22
-
use crate::{Client, client, authorize};
21
+
use crate::{Client, authorize, client};
23
22
24
23
const INDEX_HTML: &str = include_str!("../static/index.html");
25
24
const FAVICON: &[u8] = include_bytes!("../static/favicon.ico");
26
25
27
-
pub async fn serve(
28
-
shutdown: CancellationToken
29
-
) -> Result<(), Box<dyn Error + Send + Sync>> {
26
+
pub async fn serve(shutdown: CancellationToken) -> Result<(), Box<dyn Error + Send + Sync>> {
30
27
let config_logging = ConfigLogging::StderrTerminal {
31
28
level: ConfigLoggingLevel::Info,
32
29
};
33
30
34
-
let log = config_logging
35
-
.to_logger("example-basic")?;
31
+
let log = config_logging.to_logger("example-basic")?;
36
32
37
33
let mut api = ApiDescription::new();
38
34
api.register(index).unwrap();
···
58
54
.json()?,
59
55
);
60
56
61
-
let ctx = Context { spec, client: client().into() };
57
+
let ctx = Context {
58
+
spec,
59
+
client: client().into(),
60
+
};
62
61
63
62
let server = ServerBuilder::new(api, ctx, log)
64
63
.config(ConfigDropshot {
···
152
151
}
153
152
154
153
// TODO: cors for HttpError
155
-
156
154
157
155
/// Serve index page as html
158
156
#[endpoint {