1//! Tap event processor
2//!
3//! Converts Tap events into database operations without performing any database access.
4//! This maintains the database-free architecture of the worker layer.
5
6use crate::database_writer::{UnresolvedEvent, UnresolvedEventType, UnresolvedRecord, WriterEvent, EventSource};
7use crate::relay::types::{CollectionType, RecordTypes};
8use crate::sources::tap::{TapEvent, RecordAction};
9use eyre::Result;
10use std::str::FromStr;
11
12/// Process a Tap event and convert it to database operations
13///
14/// Returns a vector of WriterEvents to be sent to the database writer.
15/// Most events produce a single WriterEvent, but some may produce multiple.
16pub async fn process_tap_event(event: TapEvent) -> Result<Vec<WriterEvent>> {
17 let mut writer_events = Vec::new();
18
19 if let Some(record) = event.record {
20 // Parse record action
21 let action = RecordAction::from_str(&record.action)
22 .ok_or_else(|| eyre::eyre!("Unknown record action: {}", record.action))?;
23
24 // Build AT URI
25 let at_uri = format!("at://{}/{}/{}", record.did, record.collection, record.rkey);
26
27 // Determine event type
28 let event_type = match action {
29 RecordAction::Delete => {
30 // Parse collection type
31 let collection = parse_collection_type(&record.collection)?;
32 UnresolvedEventType::Delete { collection }
33 }
34 RecordAction::Create | RecordAction::Update => {
35 // Parse and deserialize the record
36 if let Some(record_data) = record.record {
37 let record_type = parse_record(&record.collection, record_data)?;
38 let cid = if let Some(cid_str) = record.cid {
39 ipld_core::cid::Cid::from_str(&cid_str)?
40 } else {
41 return Err(eyre::eyre!("Missing CID for create/update operation"));
42 };
43
44 UnresolvedEventType::CreateUpdate {
45 record: Box::new(record_type),
46 cid,
47 }
48 } else {
49 return Err(eyre::eyre!("Missing record data for create/update operation"));
50 }
51 }
52 };
53
54 // Create unresolved event for the database writer
55 let unresolved = UnresolvedEvent {
56 repo: record.did.clone(),
57 event_type,
58 at_uri,
59 rkey: record.rkey.clone(),
60 source: if record.live { EventSource::Tap } else { EventSource::TapBackfill },
61 cursor: None, // Will be set by the relay indexer
62 };
63
64 // For backfill events, we could potentially batch them
65 // For now, treat them the same as live events but with different source
66 writer_events.push(WriterEvent::Unresolved(Box::new(unresolved)));
67 } else if let Some(identity) = event.identity {
68 // Handle identity events as resolved events (no FK resolution needed)
69 // These update actor handle and status
70 use crate::database_writer::DatabaseOperation;
71 use chrono::Utc;
72
73 let status = if identity.is_active {
74 Some(parakeet_db::types::ActorStatus::Active)
75 } else {
76 Some(parakeet_db::types::ActorStatus::Deactivated)
77 };
78
79 let operation = DatabaseOperation::UpsertActor {
80 did: identity.did.clone(),
81 status,
82 sync_state: parakeet_db::types::ActorSyncState::Synced,
83 handle: identity.handle.clone(),
84 account_created_at: None, // Not provided by Tap identity events
85 timestamp: Utc::now(),
86 };
87
88 let processed = crate::database_writer::ProcessedEvent {
89 operations: vec![operation],
90 cursor: None,
91 source: EventSource::Tap,
92 };
93
94 writer_events.push(WriterEvent::Resolved(Box::new(processed)));
95 }
96
97 Ok(writer_events)
98}
99
100/// Parse collection string into CollectionType enum
101fn parse_collection_type(collection: &str) -> Result<CollectionType> {
102 match collection {
103 "app.bsky.actor.profile" => Ok(CollectionType::BskyProfile),
104 "app.bsky.actor.status" => Ok(CollectionType::BskyStatus),
105 "app.bsky.feed.generator" => Ok(CollectionType::BskyFeedGen),
106 "app.bsky.feed.like" => Ok(CollectionType::BskyFeedLike),
107 "app.bsky.feed.post" => Ok(CollectionType::BskyFeedPost),
108 "app.bsky.feed.repost" => Ok(CollectionType::BskyFeedRepost),
109 "app.bsky.feed.threadgate" => Ok(CollectionType::BskyFeedThreadgate),
110 "app.bsky.graph.block" => Ok(CollectionType::BskyBlock),
111 "app.bsky.graph.follow" => Ok(CollectionType::BskyFollow),
112 "app.bsky.graph.list" => Ok(CollectionType::BskyList),
113 "app.bsky.graph.listblock" => Ok(CollectionType::BskyListBlock),
114 "app.bsky.graph.listitem" => Ok(CollectionType::BskyListItem),
115 "app.bsky.graph.starterpack" => Ok(CollectionType::BskyStarterPack),
116 "app.bsky.labeler.service" => Ok(CollectionType::BskyLabelerService),
117 _ => Ok(CollectionType::Unsupported),
118 }
119}
120
121/// Parse JSON record into RecordTypes enum
122fn parse_record(collection: &str, record_data: serde_json::Value) -> Result<RecordTypes> {
123 // Parse the record with the $type field
124 let mut record_with_type = record_data;
125
126 // Add the $type field if not present (for deserialization)
127 if !record_with_type.get("$type").is_some() {
128 if let Some(obj) = record_with_type.as_object_mut() {
129 obj.insert("$type".to_string(), serde_json::Value::String(collection.to_string()));
130 }
131 }
132
133 let record_type: RecordTypes = serde_json::from_str(&record_with_type.to_string())?;
134 Ok(record_type)
135}
136
137/// Process a batch of Tap events
138///
139/// This function batches backfill events by actor for bulk processing.
140/// Live events are processed immediately, while backfill events are collected
141/// into batches by actor and sent as UnresolvedBulk or ResolvedBulk events.
142pub async fn process_tap_events_batch(events: Vec<TapEvent>) -> Result<Vec<WriterEvent>> {
143 let mut writer_events = Vec::new();
144 let mut backfill_by_actor: std::collections::HashMap<String, Vec<UnresolvedRecord>> = std::collections::HashMap::new();
145
146 for event in events {
147 if let Some(record) = event.record {
148 // Check if this is a backfill event
149 if !record.live {
150 // This is a backfill event - batch it by actor
151 let actor_did = record.did.clone();
152
153 // Parse record action
154 let action = RecordAction::from_str(&record.action)
155 .ok_or_else(|| eyre::eyre!("Unknown record action: {}", record.action))?;
156
157 // Build AT URI
158 let at_uri = format!("at://{}/{}/{}", record.did, record.collection, record.rkey);
159
160 // For backfill, we only batch create/update operations
161 // Deletes are processed individually
162 match action {
163 RecordAction::Create | RecordAction::Update => {
164 if let Some(record_data) = record.record {
165 let record_type = parse_record(&record.collection, record_data)?;
166 let cid = if let Some(cid_str) = record.cid {
167 ipld_core::cid::Cid::from_str(&cid_str)?
168 } else {
169 return Err(eyre::eyre!("Missing CID for create/update operation"));
170 };
171
172 // Create unresolved record for batching
173 let unresolved_record = UnresolvedRecord {
174 at_uri: at_uri.clone(),
175 rkey: record.rkey.clone(),
176 cid,
177 record: Box::new(record_type),
178 };
179
180 backfill_by_actor.entry(actor_did)
181 .or_default()
182 .push(unresolved_record);
183 }
184 }
185 RecordAction::Delete => {
186 // Process deletes immediately (they're rare in backfill)
187 let collection = parse_collection_type(&record.collection)?;
188 let unresolved = UnresolvedEvent {
189 repo: record.did.clone(),
190 event_type: UnresolvedEventType::Delete { collection },
191 at_uri,
192 rkey: record.rkey.clone(),
193 source: EventSource::TapBackfill,
194 cursor: None,
195 };
196 writer_events.push(WriterEvent::Unresolved(Box::new(unresolved)));
197 }
198 }
199 } else {
200 // Live event - process immediately
201 let live_events = process_tap_event(TapEvent {
202 id: event.id,
203 event_type: event.event_type,
204 record: Some(record),
205 identity: None,
206 }).await?;
207 writer_events.extend(live_events);
208 }
209 } else if let Some(identity) = event.identity {
210 // Identity events are always processed immediately
211 let identity_events = process_tap_event(TapEvent {
212 id: event.id,
213 event_type: event.event_type,
214 record: None,
215 identity: Some(identity),
216 }).await?;
217 writer_events.extend(identity_events);
218 }
219 }
220
221 // Send batched backfill events as UnresolvedBulk
222 // Batch size threshold: 50+ records per actor
223 const BULK_THRESHOLD: usize = 50;
224
225 for (actor_did, records) in backfill_by_actor {
226 if records.len() >= BULK_THRESHOLD {
227 // Send as bulk event for efficient processing
228 // Note: We don't have actor_id yet, it will be resolved by the database writer
229 writer_events.push(WriterEvent::UnresolvedBulk {
230 repo: actor_did,
231 actor_id: 0, // Will be resolved by database writer
232 records,
233 source: EventSource::TapBackfill,
234 });
235 } else {
236 // Small batch - send as individual unresolved events
237 for unresolved_record in records {
238 let unresolved = UnresolvedEvent {
239 repo: actor_did.clone(),
240 event_type: UnresolvedEventType::CreateUpdate {
241 record: unresolved_record.record,
242 cid: unresolved_record.cid,
243 },
244 at_uri: unresolved_record.at_uri,
245 rkey: unresolved_record.rkey,
246 source: EventSource::TapBackfill,
247 cursor: None,
248 };
249 writer_events.push(WriterEvent::Unresolved(Box::new(unresolved)));
250 }
251 }
252 }
253
254 Ok(writer_events)
255}