tangled
alpha
login
or
join now
ptr.pet
/
hydrant
61
fork
atom
A very fast AT Protocol indexer with flexible filtering, XRPC queries, a cursor-backed event stream, and more, built on fjall.
rust
fjall
at-protocol
atproto
indexer
61
fork
atom
overview
issues
6
pulls
pipelines
[ingest] log relay url
ptr.pet
2 weeks ago
df5b8367
1aa36eda
verified
This commit was signed with the committer's known signature.
ptr.pet
SSH Key Fingerprint:
SHA256:Abmvag+juovVufZTxyWY8KcVgrznxvBjQpJesv071Aw=
+80
-87
9 changed files
expand all
collapse all
unified
split
src
crawler
mod.rs
db
keys.rs
mod.rs
ingest
firehose.rs
mod.rs
worker.rs
main.rs
state.rs
util.rs
+3
-3
src/crawler/mod.rs
reviewed
···
3
3
use crate::db::{Db, keys, ser_repo_state};
4
4
use crate::state::AppState;
5
5
use crate::types::RepoState;
6
6
-
use crate::util::{ErrorForStatus, RetryOutcome, RetryWithBackoff, parse_retry_after, relay_id};
6
6
+
use crate::util::{ErrorForStatus, RetryOutcome, RetryWithBackoff, parse_retry_after};
7
7
use chrono::{DateTime, TimeDelta, Utc};
8
8
use futures::FutureExt;
9
9
use jacquard_api::com_atproto::repo::describe_repo::DescribeRepoOutput;
···
214
214
}
215
215
216
216
async fn get_cursor(&self, relay_host: &Url) -> Result<Cursor> {
217
217
-
let key = crawler_cursor_key(&relay_id(relay_host));
217
217
+
let key = crawler_cursor_key(relay_host);
218
218
let cursor_bytes = Db::get(self.state.db.cursors.clone(), &key).await?;
219
219
let cursor: Cursor = cursor_bytes
220
220
.as_deref()
···
603
603
}
604
604
batch.insert(
605
605
&db.cursors,
606
606
-
crawler_cursor_key(&relay_id(relay_host)),
606
606
+
crawler_cursor_key(relay_host),
607
607
rmp_serde::to_vec(&cursor)
608
608
.into_diagnostic()
609
609
.wrap_err("cant serialize cursor")?,
+5
-5
src/db/keys.rs
reviewed
···
2
2
use smol_str::SmolStr;
3
3
4
4
use crate::db::types::{DbRkey, DbTid, TrimmedDid};
5
5
-
use crate::util::RelayId;
5
5
+
use url::Url;
6
6
7
7
/// separator used for composite keys
8
8
pub const SEP: u8 = b'|';
···
163
163
TrimmedDid::try_from(&key[CRAWLER_RETRY_PREFIX.len()..])
164
164
}
165
165
166
166
-
pub fn crawler_cursor_key(relay_id: &RelayId) -> Vec<u8> {
166
166
+
pub fn crawler_cursor_key(relay: &Url) -> Vec<u8> {
167
167
let mut key = b"crawler_cursor|".to_vec();
168
168
-
key.extend_from_slice(relay_id);
168
168
+
key.extend_from_slice(relay.as_str().as_bytes());
169
169
key
170
170
}
171
171
172
172
-
pub fn firehose_cursor_key(relay_id: &RelayId) -> Vec<u8> {
172
172
+
pub fn firehose_cursor_key(relay: &Url) -> Vec<u8> {
173
173
let mut key = b"firehose_cursor|".to_vec();
174
174
-
key.extend_from_slice(relay_id);
174
174
+
key.extend_from_slice(relay.as_str().as_bytes());
175
175
key
176
176
}
+5
-5
src/db/mod.rs
reviewed
···
13
13
use std::sync::Arc;
14
14
use std::sync::atomic::{AtomicBool, AtomicU64};
15
15
16
16
-
use crate::util::RelayId;
16
16
+
use url::Url;
17
17
18
18
pub mod compaction;
19
19
pub mod filter;
···
517
517
}
518
518
}
519
519
520
520
-
pub fn set_firehose_cursor(db: &Db, relay_id: &RelayId, cursor: i64) -> Result<()> {
520
520
+
pub fn set_firehose_cursor(db: &Db, relay: &Url, cursor: i64) -> Result<()> {
521
521
db.cursors
522
522
-
.insert(keys::firehose_cursor_key(relay_id), cursor.to_be_bytes())
522
522
+
.insert(keys::firehose_cursor_key(relay), cursor.to_be_bytes())
523
523
.into_diagnostic()
524
524
}
525
525
526
526
-
pub async fn get_firehose_cursor(db: &Db, relay_id: &RelayId) -> Result<Option<i64>> {
527
527
-
let per_relay_key = keys::firehose_cursor_key(relay_id);
526
526
+
pub async fn get_firehose_cursor(db: &Db, relay: &Url) -> Result<Option<i64>> {
527
527
+
let per_relay_key = keys::firehose_cursor_key(relay);
528
528
if let Some(v) = Db::get(db.cursors.clone(), per_relay_key).await? {
529
529
return Ok(Some(i64::from_be_bytes(
530
530
v.as_ref()
+55
-55
src/ingest/firehose.rs
reviewed
···
1
1
-
use crate::db;
1
1
+
use crate::db::{self, deser_repo_state};
2
2
use crate::filter::{FilterHandle, FilterMode};
3
3
use crate::ingest::stream::{FirehoseStream, SubscribeReposMessage, decode_frame};
4
4
use crate::ingest::{BufferTx, IngestMessage};
5
5
use crate::state::AppState;
6
6
-
use crate::util::RelayId;
7
6
use jacquard_common::IntoStatic;
8
7
use jacquard_common::types::did::Did;
9
8
use miette::{IntoDiagnostic, Result};
10
9
use std::sync::Arc;
11
10
use std::time::Duration;
12
12
-
use tracing::{debug, error, info, trace};
11
11
+
use tracing::{Span, debug, error, info, trace};
13
12
use url::Url;
14
13
15
14
pub struct FirehoseIngestor {
16
15
state: Arc<AppState>,
17
16
buffer_tx: BufferTx,
18
17
relay_host: Url,
19
19
-
relay_id: RelayId,
20
18
filter: FilterHandle,
21
19
_verify_signatures: bool,
22
20
}
···
29
27
filter: FilterHandle,
30
28
verify_signatures: bool,
31
29
) -> Self {
32
32
-
let relay_id = crate::util::relay_id(&relay_host);
33
30
Self {
34
31
state,
35
32
buffer_tx,
36
33
relay_host,
37
37
-
relay_id,
38
34
filter,
39
35
_verify_signatures: verify_signatures,
40
36
}
41
37
}
42
38
39
39
+
#[tracing::instrument(skip(self), fields(relay = %self.relay_host))]
43
40
pub async fn run(self) -> Result<()> {
44
41
loop {
45
45
-
let start_cursor = db::get_firehose_cursor(&self.state.db, &self.relay_id).await?;
42
42
+
let start_cursor = db::get_firehose_cursor(&self.state.db, &self.relay_host).await?;
46
43
47
44
match start_cursor {
48
48
-
Some(c) => info!(relay = %self.relay_host, cursor = %c, "resuming from cursor"),
49
49
-
None => info!(relay = %self.relay_host, "no cursor found, live tailing"),
45
45
+
Some(c) => info!(cursor = %c, "resuming from cursor"),
46
46
+
None => info!("no cursor found, live tailing"),
50
47
}
51
48
52
52
-
let mut stream = match FirehoseStream::connect(self.relay_host.clone(), start_cursor)
53
53
-
.await
54
54
-
{
55
55
-
Ok(s) => s,
56
56
-
Err(e) => {
57
57
-
error!(relay = %self.relay_host, err = %e, "failed to connect to firehose, retrying in 5s");
58
58
-
tokio::time::sleep(Duration::from_secs(5)).await;
59
59
-
continue;
60
60
-
}
61
61
-
};
49
49
+
let mut stream =
50
50
+
match FirehoseStream::connect(self.relay_host.clone(), start_cursor).await {
51
51
+
Ok(s) => s,
52
52
+
Err(e) => {
53
53
+
error!(err = %e, "failed to connect to firehose, retrying in 5s");
54
54
+
tokio::time::sleep(Duration::from_secs(5)).await;
55
55
+
continue;
56
56
+
}
57
57
+
};
62
58
63
63
-
info!(relay = %self.relay_host, "firehose connected");
59
59
+
info!("firehose connected");
64
60
65
61
while let Some(bytes_res) = stream.next().await {
66
62
let bytes = match bytes_res {
67
63
Ok(b) => b,
68
64
Err(e) => {
69
69
-
error!(relay = %self.relay_host, err = %e, "firehose stream error");
65
65
+
error!(err = %e, "firehose stream error");
70
66
break;
71
67
}
72
68
};
73
69
match decode_frame(&bytes) {
74
70
Ok(msg) => self.handle_message(msg).await,
75
71
Err(e) => {
76
76
-
error!(relay = %self.relay_host, err = %e, "firehose stream error");
72
72
+
error!(err = %e, "firehose stream error");
77
73
break;
78
74
}
79
75
}
80
76
}
81
77
82
82
-
error!(relay = %self.relay_host, "firehose disconnected, reconnecting in 5s...");
78
78
+
error!("firehose disconnected, reconnecting in 5s...");
83
79
tokio::time::sleep(Duration::from_secs(5)).await;
84
80
}
85
81
}
···
105
101
trace!(did = %did, "forwarding message to ingest buffer");
106
102
107
103
if let Err(e) = self.buffer_tx.send(IngestMessage::Firehose {
108
108
-
relay_id: self.relay_id.clone(),
104
104
+
relay: self.relay_host.clone(),
109
105
msg: msg.into_static(),
110
106
}) {
111
107
error!(err = %e, "failed to send message to buffer processor");
···
114
110
115
111
async fn should_process(&self, did: &Did<'_>) -> Result<bool> {
116
112
let filter = self.filter.load();
113
113
+
let state = self.state.clone();
114
114
+
let did = did.clone().into_static();
115
115
+
let span = Span::current();
117
116
118
118
-
let excl_key = crate::db::filter::exclude_key(did.as_str())?;
119
119
-
if self
120
120
-
.state
121
121
-
.db
122
122
-
.filter
123
123
-
.contains_key(&excl_key)
124
124
-
.into_diagnostic()?
125
125
-
{
126
126
-
return Ok(false);
127
127
-
}
117
117
+
tokio::task::spawn_blocking(move || {
118
118
+
let _entered = span.entered();
119
119
+
let _entered = tracing::info_span!("should_process", repo = %did).entered();
128
120
129
129
-
match filter.mode {
130
130
-
FilterMode::Full => Ok(true),
131
131
-
FilterMode::Filter => {
132
132
-
let repo_key = crate::db::keys::repo_key(did);
133
133
-
if let Some(state_bytes) = self.state.db.repos.get(&repo_key).into_diagnostic()? {
134
134
-
let repo_state: crate::types::RepoState =
135
135
-
rmp_serde::from_slice(&state_bytes).into_diagnostic()?;
121
121
+
let excl_key = crate::db::filter::exclude_key(did.as_str())?;
122
122
+
if state.db.filter.contains_key(&excl_key).into_diagnostic()? {
123
123
+
return Ok(false);
124
124
+
}
125
125
+
126
126
+
match filter.mode {
127
127
+
FilterMode::Full => Ok(true),
128
128
+
FilterMode::Filter => {
129
129
+
let repo_key = crate::db::keys::repo_key(&did);
130
130
+
if let Some(bytes) = state.db.repos.get(&repo_key).into_diagnostic()? {
131
131
+
let repo_state = deser_repo_state(&bytes)?;
132
132
+
133
133
+
if repo_state.tracked {
134
134
+
trace!(did = %did, "tracked repo, processing");
135
135
+
return Ok(true);
136
136
+
} else {
137
137
+
debug!(did = %did, "known but explicitly untracked, skipping");
138
138
+
return Ok(false);
139
139
+
}
140
140
+
}
136
141
137
137
-
if repo_state.tracked {
138
138
-
trace!(did = %did, "tracked repo, processing");
139
139
-
return Ok(true);
142
142
+
if !filter.signals.is_empty() {
143
143
+
trace!(did = %did, "unknown — passing to worker for signal check");
144
144
+
Ok(true)
140
145
} else {
141
141
-
debug!(did = %did, "known but explicitly untracked, skipping");
142
142
-
return Ok(false);
146
146
+
trace!(did = %did, "unknown and no signals configured, skipping");
147
147
+
Ok(false)
143
148
}
144
149
}
145
145
-
146
146
-
if !filter.signals.is_empty() {
147
147
-
trace!(did = %did, "unknown — passing to worker for signal check");
148
148
-
Ok(true)
149
149
-
} else {
150
150
-
trace!(did = %did, "unknown and no signals configured, skipping");
151
151
-
Ok(false)
152
152
-
}
153
150
}
154
154
-
}
151
151
+
})
152
152
+
.await
153
153
+
.into_diagnostic()
154
154
+
.flatten()
155
155
}
156
156
}
+2
-2
src/ingest/mod.rs
reviewed
···
7
7
use jacquard_common::types::did::Did;
8
8
9
9
use crate::ingest::stream::SubscribeReposMessage;
10
10
-
use crate::util::RelayId;
10
10
+
use url::Url;
11
11
12
12
#[derive(Debug)]
13
13
pub enum IngestMessage {
14
14
Firehose {
15
15
-
relay_id: RelayId,
15
15
+
relay: Url,
16
16
msg: SubscribeReposMessage<'static>,
17
17
},
18
18
BackfillFinished(Did<'static>),
+6
-5
src/ingest/worker.rs
reviewed
···
236
236
}
237
237
}
238
238
}
239
239
-
IngestMessage::Firehose { relay_id, msg } => {
239
239
+
IngestMessage::Firehose { relay, msg } => {
240
240
+
let _span = tracing::info_span!("firehose", relay = %relay).entered();
240
241
let (did, seq) = match &msg {
241
242
SubscribeReposMessage::Commit(c) => (&c.repo, c.seq),
242
243
SubscribeReposMessage::Identity(i) => (&i.did, i.seq),
···
252
253
db::check_poisoned_report(r);
253
254
}
254
255
error!(did = %did, err = %e, "error in check_repo_state");
255
255
-
if let Some((_, cursor)) = state.relay_cursors.get(&relay_id) {
256
256
+
if let Some(cursor) = state.relay_cursors.get(&relay) {
256
257
cursor.store(seq, std::sync::atomic::Ordering::SeqCst);
257
258
}
258
259
continue;
···
306
307
did = %did, err = %e,
307
308
"failed to transition inactive repo to synced"
308
309
);
309
309
-
if let Some((_, cursor)) =
310
310
-
state.relay_cursors.get(&relay_id)
310
310
+
if let Some(cursor) =
311
311
+
state.relay_cursors.get(&relay)
311
312
{
312
313
cursor.store(
313
314
seq,
···
358
359
}
359
360
}
360
361
361
361
-
if let Some((_, cursor)) = state.relay_cursors.get(&relay_id) {
362
362
+
if let Some(cursor) = state.relay_cursors.get(&relay) {
362
363
cursor.store(seq, std::sync::atomic::Ordering::SeqCst);
363
364
}
364
365
}
+2
-2
src/main.rs
reviewed
···
177
177
std::thread::sleep(persist_interval);
178
178
179
179
// persist firehose cursors
180
180
-
for (relay_id, (relay, cursor)) in &state.relay_cursors {
180
180
+
for (relay, cursor) in &state.relay_cursors {
181
181
let seq = cursor.load(Ordering::SeqCst);
182
182
if seq > 0 {
183
183
-
if let Err(e) = db::set_firehose_cursor(&state.db, relay_id, seq) {
183
183
+
if let Err(e) = db::set_firehose_cursor(&state.db, relay, seq) {
184
184
error!(relay = %relay, err = %e, "failed to save cursor");
185
185
db::check_poisoned_report(&e);
186
186
}
+2
-3
src/state.rs
reviewed
···
10
10
db::Db,
11
11
filter::{FilterHandle, new_handle},
12
12
resolver::Resolver,
13
13
-
util::{RelayId, relay_id},
14
13
};
15
14
16
15
pub struct AppState {
17
16
pub db: Db,
18
17
pub resolver: Resolver,
19
18
pub filter: FilterHandle,
20
20
-
pub relay_cursors: HashMap<RelayId, (Url, AtomicI64)>,
19
19
+
pub relay_cursors: HashMap<Url, AtomicI64>,
21
20
pub backfill_notify: Notify,
22
21
}
23
22
···
31
30
let relay_cursors = config
32
31
.relays
33
32
.iter()
34
34
-
.map(|url| (relay_id(url), (url.clone(), AtomicI64::new(0))))
33
33
+
.map(|url| (url.clone(), AtomicI64::new(0)))
35
34
.collect();
36
35
37
36
Ok(Self {
-7
src/util.rs
reviewed
···
3
3
use rand::RngExt;
4
4
use reqwest::StatusCode;
5
5
use serde::{Deserialize, Deserializer, Serializer};
6
6
-
use url::Url;
7
7
-
8
8
-
pub type RelayId = Vec<u8>;
9
9
-
10
10
-
pub fn relay_id(url: &Url) -> RelayId {
11
11
-
url.as_str().as_bytes().to_vec()
12
12
-
}
13
6
14
7
/// outcome of [`RetryWithBackoff::retry`] when the operation does not succeed.
15
8
pub enum RetryOutcome<E> {