consumer/src/firehose/mod.rs (+15)
···
                 FirehoseEvent::Label(event)
             }
+            "#sync" => {
+                counter!("firehose_events.total", "event" => "sync").increment(1);
+                let event: AtpSyncEvent =
+                    serde_ipld_dagcbor::from_reader(&mut reader)?;
+
+                // increment the seq
+                if self.seq < event.seq {
+                    self.seq = event.seq;
+                } else {
+                    tracing::error!("Event sequence was not greater than previous seq, exiting. {} <= {}", event.seq, self.seq);
+                    return Ok(FirehoseOutput::Close);
+                }
+
+                FirehoseEvent::Sync(event)
+            }
             _ => {
                 tracing::warn!("unknown event type {ty}");
                 return Ok(FirehoseOutput::Continue);
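
Not part of the diff above: a minimal sketch of the sequence guard the new `"#sync"` arm applies, with the consumer's state reduced to a hypothetical `Cursor` struct. Anything less than or equal to the stored seq is rejected, which is what drives the `FirehoseOutput::Close` return in the handler.

```rust
// Hypothetical reduction of the consumer's seq bookkeeping; the real handler
// lives in the firehose consumer and closes the connection on regression.
struct Cursor {
    seq: u64,
}

impl Cursor {
    /// Accept `incoming` only if it is strictly greater than the stored seq.
    /// Returns false when the stream repeats itself or goes backwards.
    fn advance(&mut self, incoming: u64) -> bool {
        if self.seq < incoming {
            self.seq = incoming;
            true
        } else {
            false
        }
    }
}

fn main() {
    let mut cursor = Cursor { seq: 128 };
    assert!(cursor.advance(129));   // normal forward progress
    assert!(!cursor.advance(129));  // duplicate seq is rejected
    assert!(!cursor.advance(100));  // regression is rejected
    println!("cursor at {}", cursor.seq);
}
```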
consumer/src/firehose/types.rs (+23)
···
     Account(AtpAccountEvent),
     Commit(AtpCommitEvent),
     Label(AtpLabelEvent),
+    Sync(AtpSyncEvent),
 }
 
 #[derive(Debug, Deserialize)]
···
     Suspended,
     Deleted,
     Deactivated,
+    Throttled,
+    Desynchronized,
 }
 
 impl AtpAccountStatus {
···
             AtpAccountStatus::Suspended => "suspended",
             AtpAccountStatus::Deleted => "deleted",
             AtpAccountStatus::Deactivated => "deactivated",
+            AtpAccountStatus::Throttled => "throttled",
+            AtpAccountStatus::Desynchronized => "desynchronized",
         }
     }
 }
···
             AtpAccountStatus::Suspended => parakeet_db::types::ActorStatus::Suspended,
             AtpAccountStatus::Deleted => parakeet_db::types::ActorStatus::Deleted,
             AtpAccountStatus::Deactivated => parakeet_db::types::ActorStatus::Deactivated,
+            AtpAccountStatus::Throttled | AtpAccountStatus::Desynchronized => {
+                parakeet_db::types::ActorStatus::Active
+            }
         }
     }
 }
···
     pub since: Option<String>,
     pub commit: Cid,
     #[serde(rename = "tooBig")]
+    #[deprecated]
     pub too_big: bool,
     #[serde(default)]
     pub blocks: ByteBuf,
     #[serde(default)]
     pub ops: Vec<CommitOp>,
     #[serde(default)]
+    #[deprecated]
     pub blobs: Vec<Cid>,
+    #[serde(rename = "prevData")]
+    pub prev_data: Option<Cid>,
 }
 
 #[derive(Debug, Deserialize)]
 pub struct CommitOp {
     pub action: String,
     pub cid: Option<Cid>,
+    pub prev: Option<Cid>,
     pub path: String,
 }
 
···
     pub seq: u64,
     pub labels: Vec<AtpLabel>,
 }
+
+#[derive(Debug, Deserialize)]
+pub struct AtpSyncEvent {
+    pub seq: u64,
+    pub did: String,
+    pub time: DateTime<Utc>,
+    pub rev: String,
+    #[serde(default)]
+    pub blocks: ByteBuf,
+}
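
Also not part of the diff: a sketch of what the `#[serde(default)]` on `blocks` buys when a `#sync` frame omits the field. The `SyncEventSketch` and `WireFrame` types below are hypothetical stand-ins (no `chrono`, fewer fields), and the round-trip assumes `serde_ipld_dagcbor::to_vec` / `from_slice` as exposed by the crate.

```rust
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;

// Simplified stand-in for AtpSyncEvent: the point is the defaulted `blocks`.
#[derive(Debug, Deserialize)]
struct SyncEventSketch {
    seq: u64,
    did: String,
    rev: String,
    #[serde(default)]
    blocks: ByteBuf,
}

// Hypothetical wire shape that omits `blocks` entirely.
#[derive(Serialize)]
struct WireFrame<'a> {
    seq: u64,
    did: &'a str,
    rev: &'a str,
}

fn main() {
    let frame = WireFrame { seq: 42, did: "did:plc:example", rev: "3kabc" };
    let bytes = serde_ipld_dagcbor::to_vec(&frame).expect("encode");
    let event: SyncEventSketch = serde_ipld_dagcbor::from_slice(&bytes).expect("decode");
    // Missing field decodes to an empty buffer instead of failing the frame.
    assert!(event.blocks.is_empty());
    println!("decoded seq {} for {}", event.seq, event.did);
}
```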
consumer/src/indexer/mod.rs (+26 -2)
···
 use crate::config::HistoryMode;
 use crate::db;
 use crate::firehose::{
-    AtpAccountEvent, AtpCommitEvent, AtpIdentityEvent, CommitOp, FirehoseConsumer, FirehoseEvent,
-    FirehoseOutput,
+    AtpAccountEvent, AtpCommitEvent, AtpIdentityEvent, AtpSyncEvent, CommitOp, FirehoseConsumer,
+    FirehoseEvent, FirehoseOutput,
 };
 use crate::indexer::types::{
     AggregateDeltaStore, BackfillItem, BackfillItemInner, CollectionType, RecordTypes,
···
             FirehoseEvent::Commit(commit) => {
                 index_commit(&mut state, &mut conn, &mut rc, commit).await
             }
+            FirehoseEvent::Sync(sync) => {
+                process_sync(&state, &mut conn, &mut rc, sync).await
+            }
             FirehoseEvent::Label(_) => unreachable!(),
         };
 
···
             FirehoseEvent::Identity(identity) => self.hasher.hash_one(&identity.did) % threads,
             FirehoseEvent::Account(account) => self.hasher.hash_one(&account.did) % threads,
             FirehoseEvent::Commit(commit) => self.hasher.hash_one(&commit.repo) % threads,
+            FirehoseEvent::Sync(sync) => self.hasher.hash_one(&sync.did) % threads,
             FirehoseEvent::Label(_) => {
                 // We handle all labels through direct connections to labelers
                 tracing::warn!("got #labels from the relay");
···
             tracing::error!("Error sending event: {e}");
         }
     }
+}
+
+#[instrument(skip_all, fields(seq = sync.seq, repo = sync.did))]
+async fn process_sync(
+    state: &RelayIndexerState,
+    conn: &mut Object,
+    rc: &mut MultiplexedConnection,
+    sync: AtpSyncEvent,
+) -> eyre::Result<()> {
+    let Some((sync_state, Some(current_rev))) = db::actor_get_repo_status(conn, &sync.did).await? else {
+        return Ok(());
+    };
+
+    // don't care if we're not synced. also no point if !do_backfill bc we might not have a worker
+    if sync_state == ActorSyncState::Synced && state.do_backfill && sync.rev > current_rev {
+        tracing::debug!("triggering backfill due to #sync");
+        rc.rpush::<_, _, i32>("backfill_queue", sync.did).await?;
+    }
+
+    Ok(())
 }
 
 #[instrument(skip_all, fields(seq = identity.seq, repo = identity.did))]
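
A note on the ordering assumption behind `sync.rev > current_rev`: repo `rev` values are TIDs, which are designed to sort lexicographically in timestamp order, so a plain string comparison is enough to spot a newer revision. The sketch below is illustrative only; the rev values are hypothetical.

```rust
// Lexicographic comparison of TID-style rev strings stands in for "is newer",
// assuming both values are well-formed, equal-length TIDs.
fn needs_backfill(current_rev: &str, incoming_rev: &str) -> bool {
    incoming_rev > current_rev
}

fn main() {
    let stored = "3jzfcijpj2z2a";   // hypothetical older rev
    let incoming = "3kao3zygplc2a"; // hypothetical newer rev
    assert!(needs_backfill(stored, incoming));
    assert!(!needs_backfill(incoming, stored));
    println!("backfill needed: {}", needs_backfill(stored, incoming));
}
```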