+15
links/src/lib.rs
+15
links/src/lib.rs
···
42
42
None
43
43
}
44
44
}
45
+
pub fn did(&self) -> Option<String> {
46
+
let did = match self {
47
+
Link::AtUri(s) => {
48
+
let rest = s.strip_prefix("at://")?; // todo: this might be safe to unwrap?
49
+
if let Some((did, _)) = rest.split_once("/") {
50
+
did
51
+
} else {
52
+
rest
53
+
}
54
+
}
55
+
Link::Uri(_) => return None,
56
+
Link::Did(did) => did,
57
+
};
58
+
Some(did.to_string())
59
+
}
45
60
}
46
61
47
62
#[derive(Debug, PartialEq)]
+29
-23
spacedust/src/consumer.rs
+29
-23
spacedust/src/consumer.rs
···
1
+
use std::sync::Arc;
1
2
use tokio_util::sync::CancellationToken;
2
-
use crate::LinkEvent;
3
+
use crate::ClientMessage;
3
4
use crate::error::ConsumerError;
4
5
use crate::removable_delay_queue;
5
6
use jetstream::{
···
12
13
const MAX_LINKS_PER_EVENT: usize = 100;
13
14
14
15
pub async fn consume(
15
-
b: broadcast::Sender<LinkEvent>,
16
-
d: removable_delay_queue::Input<(String, usize), LinkEvent>,
16
+
b: broadcast::Sender<Arc<ClientMessage>>,
17
+
d: removable_delay_queue::Input<(String, usize), Arc<ClientMessage>>,
17
18
jetstream_endpoint: String,
18
19
cursor: Option<Cursor>,
19
20
no_zstd: bool,
···
21
22
) -> Result<(), ConsumerError> {
22
23
let endpoint = DefaultJetstreamEndpoints::endpoint_or_shortcut(&jetstream_endpoint);
23
24
if endpoint == jetstream_endpoint {
24
-
log::info!("connecting to jetstream at {endpoint}");
25
+
log::info!("consumer: connecting jetstream at {endpoint}");
25
26
} else {
26
-
log::info!("connecting to jetstream at {jetstream_endpoint} => {endpoint}");
27
+
log::info!("consumer: connecting jetstream at {jetstream_endpoint} => {endpoint}");
27
28
}
28
29
let config: JetstreamConfig = JetstreamConfig {
29
30
endpoint,
···
40
41
.connect_cursor(cursor)
41
42
.await?;
42
43
43
-
log::info!("receiving jetstream messages..");
44
+
log::info!("consumer: receiving messages..");
44
45
loop {
45
46
if shutdown.is_cancelled() {
46
-
log::info!("exiting consumer for shutdown");
47
+
log::info!("consumer: exiting for shutdown");
47
48
return Ok(());
48
49
}
49
50
let Some(event) = receiver.recv().await else {
50
-
log::error!("could not receive jetstream event, bailing");
51
+
log::error!("consumer: could not receive event, bailing");
51
52
break;
52
53
};
53
54
54
55
if event.kind != EventKind::Commit {
55
56
continue;
56
57
}
57
-
let Some(commit) = event.commit else {
58
-
log::warn!("jetstream commit event missing commit data, ignoring");
58
+
let Some(ref commit) = event.commit else {
59
+
log::warn!("consumer: commit event missing commit data, ignoring");
59
60
continue;
60
61
};
61
62
63
+
// TODO: something a bit more robust
62
64
let at_uri = format!("at://{}/{}/{}", &*event.did, &*commit.collection, &*commit.rkey);
63
65
64
66
// TODO: keep a buffer and remove quick deletes to debounce notifs
···
67
69
d.remove_range((at_uri.clone(), 0)..=(at_uri.clone(), MAX_LINKS_PER_EVENT)).await;
68
70
continue;
69
71
}
70
-
let Some(record) = commit.record else {
71
-
log::warn!("jetstream commit update/delete missing record, ignoring");
72
+
let Some(ref record) = commit.record else {
73
+
log::warn!("consumer: commit update/delete missing record, ignoring");
72
74
continue;
73
75
};
74
76
75
77
let jv = match record.get().parse() {
76
78
Ok(v) => v,
77
79
Err(e) => {
78
-
log::warn!("jetstream record failed to parse, ignoring: {e}");
80
+
log::warn!("consumer: record failed to parse, ignoring: {e}");
79
81
continue;
80
82
}
81
83
};
82
84
83
-
// todo: indicate if the link limit was reached (-> links omitted)
84
85
for (i, link) in collect_links(&jv).into_iter().enumerate() {
85
86
if i >= MAX_LINKS_PER_EVENT {
86
-
log::warn!("jetstream event has too many links, ignoring the rest");
87
+
// todo: indicate if the link limit was reached (-> links omitted)
88
+
log::warn!("consumer: event has too many links, ignoring the rest");
89
+
metrics::counter!("consumer_dropped_links", "reason" => "too_many_links").increment(1);
87
90
break;
88
91
}
89
-
let link_ev = LinkEvent {
90
-
collection: commit.collection.to_string(),
91
-
path: link.path,
92
-
origin: at_uri.clone(),
93
-
rev: commit.rev.to_string(),
94
-
target: link.target.into_string(),
92
+
let client_message = match ClientMessage::new_link(link, &at_uri, commit) {
93
+
Ok(m) => m,
94
+
Err(e) => {
95
+
// TODO indicate to clients that a link has been dropped
96
+
log::warn!("consumer: failed to serialize link to json: {e:?}");
97
+
metrics::counter!("consumer_dropped_links", "reason" => "failed_to_serialize").increment(1);
98
+
continue;
99
+
}
95
100
};
96
-
let _ = b.send(link_ev.clone()); // only errors if no subscribers are connected, which is just fine.
97
-
d.enqueue((at_uri.clone(), i), link_ev)
101
+
let message = Arc::new(client_message);
102
+
let _ = b.send(message.clone()); // only errors if no subscribers are connected, which is just fine.
103
+
d.enqueue((at_uri.clone(), i), message)
98
104
.await
99
105
.map_err(|_| ConsumerError::DelayQueueOutputDropped)?;
100
106
}
+3
-4
spacedust/src/delay.rs
+3
-4
spacedust/src/delay.rs
···
1
1
use crate::removable_delay_queue;
2
-
use crate::LinkEvent;
3
2
use tokio_util::sync::CancellationToken;
4
3
use tokio::sync::broadcast;
5
4
use crate::error::DelayError;
6
5
7
-
pub async fn to_broadcast(
8
-
source: removable_delay_queue::Output<(String, usize), LinkEvent>,
9
-
dest: broadcast::Sender<LinkEvent>,
6
+
pub async fn to_broadcast<T>(
7
+
source: removable_delay_queue::Output<(String, usize), T>,
8
+
dest: broadcast::Sender<T>,
10
9
shutdown: CancellationToken,
11
10
) -> Result<(), DelayError> {
12
11
loop {
+61
-26
spacedust/src/lib.rs
+61
-26
spacedust/src/lib.rs
···
5
5
pub mod subscriber;
6
6
pub mod removable_delay_queue;
7
7
8
+
use links::CollectedLink;
9
+
use jetstream::events::CommitEvent;
10
+
use tokio_tungstenite::tungstenite::Message;
8
11
use serde::Serialize;
9
12
10
-
#[derive(Debug, Clone)]
11
-
pub struct LinkEvent {
12
-
collection: String,
13
-
path: String,
14
-
origin: String,
15
-
target: String,
16
-
rev: String,
13
+
#[derive(Debug)]
14
+
pub struct FilterableProperties {
15
+
/// Full unmodified DID, at-uri, or url
16
+
pub subject: String,
17
+
/// User/identity DID.
18
+
///
19
+
/// Will match both bare-DIDs and DIDs extracted from at-uris.
20
+
/// `None` for any URL.
21
+
pub subject_did: Option<String>,
22
+
/// Link source -- collection NSID joined with `:` to the record property path.
23
+
pub source: String,
24
+
}
25
+
26
+
/// A serialized message with filterable properties attached
27
+
#[derive(Debug)]
28
+
pub struct ClientMessage {
29
+
pub message: Message, // always Message::Text
30
+
pub properties: FilterableProperties,
31
+
}
32
+
33
+
impl ClientMessage {
34
+
pub fn new_link(link: CollectedLink, at_uri: &str, commit: &CommitEvent) -> Result<Self, serde_json::Error> {
35
+
let subject_did = link.target.did();
36
+
37
+
let subject = link.target.into_string();
38
+
39
+
let undotted = link.path.strip_prefix('.').unwrap_or_else(|| {
40
+
eprintln!("link path did not have expected '.' prefix: {}", link.path);
41
+
""
42
+
});
43
+
let source = format!("{}:{undotted}", &*commit.collection);
44
+
45
+
let client_link_event = ClientLinkEvent {
46
+
operation: "create",
47
+
source: source.clone(),
48
+
source_record: at_uri.to_string(),
49
+
source_rev: commit.rev.to_string(),
50
+
subject: subject.clone(),
51
+
};
52
+
53
+
let client_event = ClientEvent {
54
+
kind: "link",
55
+
origin: "live", // TODO: indicate when we're locally replaying jetstream on reconnect?? maybe not.
56
+
link: client_link_event,
57
+
};
58
+
59
+
let client_event_json = serde_json::to_string(&client_event)?;
60
+
61
+
let message = Message::Text(client_event_json.into());
62
+
63
+
let properties = FilterableProperties { subject, subject_did, source };
64
+
65
+
Ok(ClientMessage { message, properties })
66
+
}
17
67
}
18
68
19
69
#[derive(Debug, Serialize)]
20
70
#[serde(rename_all="snake_case")]
21
71
pub struct ClientEvent {
22
-
kind: String, // "link"
23
-
origin: String, // "live", "replay", "backfill"
72
+
kind: &'static str, // "link"
73
+
origin: &'static str, // "live", "replay", "backfill"
24
74
link: ClientLinkEvent,
25
75
}
26
76
27
77
#[derive(Debug, Serialize)]
28
78
struct ClientLinkEvent {
29
-
operation: String, // "create", "delete" (prob no update, though maybe for rev?)
79
+
operation: &'static str, // "create", "delete" (prob no update, though maybe for rev?)
30
80
source: String,
31
81
source_record: String,
32
82
source_rev: String,
33
83
subject: String,
34
84
// TODO: include the record too? would save clients a level of hydration
35
-
}
36
-
37
-
impl From<LinkEvent> for ClientLinkEvent {
38
-
fn from(link: LinkEvent) -> Self {
39
-
let undotted = link.path.strip_prefix('.').unwrap_or_else(|| {
40
-
eprintln!("link path did not have expected '.' prefix: {}", link.path);
41
-
""
42
-
});
43
-
Self {
44
-
operation: "create".to_string(),
45
-
source: format!("{}:{undotted}", link.collection),
46
-
source_record: link.origin,
47
-
source_rev: link.rev,
48
-
subject: link.target,
49
-
}
50
-
}
85
+
// ^^ no, not for now. until we backfill + support broader deletes at *least*.
51
86
}
+5
-5
spacedust/src/server.rs
+5
-5
spacedust/src/server.rs
···
2
2
use crate::subscriber::Subscriber;
3
3
use metrics::{histogram, counter};
4
4
use std::sync::Arc;
5
-
use crate::LinkEvent;
5
+
use crate::ClientMessage;
6
6
use http::{
7
7
header::{ORIGIN, USER_AGENT},
8
8
Response, StatusCode,
···
28
28
const FAVICON: &[u8] = include_bytes!("../static/favicon.ico");
29
29
30
30
pub async fn serve(
31
-
b: broadcast::Sender<LinkEvent>,
32
-
d: broadcast::Sender<LinkEvent>,
31
+
b: broadcast::Sender<Arc<ClientMessage>>,
32
+
d: broadcast::Sender<Arc<ClientMessage>>,
33
33
shutdown: CancellationToken
34
34
) -> Result<(), ServerError> {
35
35
let config_logging = ConfigLogging::StderrTerminal {
···
90
90
#[derive(Debug, Clone)]
91
91
struct Context {
92
92
pub spec: Arc<serde_json::Value>,
93
-
pub b: broadcast::Sender<LinkEvent>,
94
-
pub d: broadcast::Sender<LinkEvent>,
93
+
pub b: broadcast::Sender<Arc<ClientMessage>>,
94
+
pub d: broadcast::Sender<Arc<ClientMessage>>,
95
95
pub shutdown: CancellationToken,
96
96
}
97
97
+16
-40
spacedust/src/subscriber.rs
+16
-40
spacedust/src/subscriber.rs
···
1
+
use std::sync::Arc;
1
2
use tokio::time::interval;
2
3
use std::time::Duration;
3
4
use futures::StreamExt;
4
-
use crate::ClientEvent;
5
-
use crate::LinkEvent;
5
+
use crate::{ClientMessage, FilterableProperties};
6
6
use crate::server::MultiSubscribeQuery;
7
7
use futures::SinkExt;
8
8
use std::error::Error;
···
29
29
pub async fn start(
30
30
self,
31
31
ws: WebSocketStream<WebsocketConnectionRaw>,
32
-
mut receiver: broadcast::Receiver<LinkEvent>
32
+
mut receiver: broadcast::Receiver<Arc<ClientMessage>>
33
33
) -> Result<(), Box<dyn Error>> {
34
34
let mut ping_state = None;
35
35
let (mut ws_sender, mut ws_receiver) = ws.split();
···
44
44
loop {
45
45
tokio::select! {
46
46
l = receiver.recv() => match l {
47
-
Ok(link) => if let Some(message) = self.filter(link) {
48
-
if let Err(e) = ws_sender.send(message).await {
47
+
Ok(link) => if self.filter(&link.properties) {
48
+
if let Err(e) = ws_sender.send(link.message.clone()).await {
49
49
log::warn!("failed to send link, dropping subscriber: {e:?}");
50
50
break;
51
51
}
···
116
116
117
117
fn filter(
118
118
&self,
119
-
link: LinkEvent,
120
-
// mut sender: impl Sink<Message> + Unpin
121
-
) -> Option<Message> {
119
+
properties: &FilterableProperties,
120
+
) -> bool {
122
121
let query = &self.query;
123
122
124
123
// subject + subject DIDs are logical OR
125
-
let target_did = if link.target.starts_with("did:") {
126
-
link.target.clone()
127
-
} else {
128
-
let rest = link.target.strip_prefix("at://")?;
129
-
if let Some((did, _)) = rest.split_once("/") {
130
-
did
131
-
} else {
132
-
rest
133
-
}.to_string()
134
-
};
135
-
if !(query.wanted_subjects.contains(&link.target) || query.wanted_subject_dids.contains(&target_did) || query.wanted_subjects.is_empty() && query.wanted_subject_dids.is_empty()) {
136
-
// wowwww ^^ fix that
137
-
return None
124
+
if !(
125
+
query.wanted_subjects.is_empty() && query.wanted_subject_dids.is_empty() ||
126
+
query.wanted_subjects.contains(&properties.subject) ||
127
+
properties.subject_did.as_ref().map(|did| query.wanted_subject_dids.contains(did)).unwrap_or(false)
128
+
) { // wowwww ^^ fix that
129
+
return false
138
130
}
139
131
140
132
// subjects together with sources are logical AND
141
-
142
-
if !query.wanted_sources.is_empty() {
143
-
let undotted = link.path.strip_prefix('.').unwrap_or_else(|| {
144
-
eprintln!("link path did not have expected '.' prefix: {}", link.path);
145
-
""
146
-
});
147
-
let source = format!("{}:{undotted}", link.collection);
148
-
if !query.wanted_sources.contains(&source) {
149
-
return None
150
-
}
133
+
if !(query.wanted_sources.is_empty() || query.wanted_sources.contains(&properties.source)) {
134
+
return false
151
135
}
152
136
153
-
let ev = ClientEvent {
154
-
kind: "link".to_string(),
155
-
origin: "live".to_string(),
156
-
link: link.into(),
157
-
};
158
-
159
-
let json = serde_json::to_string(&ev).unwrap();
160
-
161
-
Some(Message::Text(json.into()))
137
+
true
162
138
}
163
139
}