Rust AppView - highly experimental!
1//! Tap event processor 2//! 3//! Converts Tap events into database operations without performing any database access. 4//! This maintains the database-free architecture of the worker layer. 5 6use crate::database_writer::{UnresolvedEvent, UnresolvedEventType, UnresolvedRecord, WriterEvent, EventSource}; 7use crate::relay::types::{CollectionType, RecordTypes}; 8use crate::sources::tap::{TapEvent, RecordAction}; 9use eyre::Result; 10use std::str::FromStr; 11 12/// Process a Tap event and convert it to database operations 13/// 14/// Returns a vector of WriterEvents to be sent to the database writer. 15/// Most events produce a single WriterEvent, but some may produce multiple. 16pub async fn process_tap_event(event: TapEvent) -> Result<Vec<WriterEvent>> { 17 let mut writer_events = Vec::new(); 18 19 if let Some(record) = event.record { 20 // Parse record action 21 let action = RecordAction::from_str(&record.action) 22 .ok_or_else(|| eyre::eyre!("Unknown record action: {}", record.action))?; 23 24 // Build AT URI 25 let at_uri = format!("at://{}/{}/{}", record.did, record.collection, record.rkey); 26 27 // Determine event type 28 let event_type = match action { 29 RecordAction::Delete => { 30 // Parse collection type 31 let collection = parse_collection_type(&record.collection)?; 32 UnresolvedEventType::Delete { collection } 33 } 34 RecordAction::Create | RecordAction::Update => { 35 // Parse and deserialize the record 36 if let Some(record_data) = record.record { 37 let record_type = parse_record(&record.collection, record_data)?; 38 let cid = if let Some(cid_str) = record.cid { 39 ipld_core::cid::Cid::from_str(&cid_str)? 40 } else { 41 return Err(eyre::eyre!("Missing CID for create/update operation")); 42 }; 43 44 UnresolvedEventType::CreateUpdate { 45 record: Box::new(record_type), 46 cid, 47 } 48 } else { 49 return Err(eyre::eyre!("Missing record data for create/update operation")); 50 } 51 } 52 }; 53 54 // Create unresolved event for the database writer 55 let unresolved = UnresolvedEvent { 56 repo: record.did.clone(), 57 event_type, 58 at_uri, 59 rkey: record.rkey.clone(), 60 source: if record.live { EventSource::Tap } else { EventSource::TapBackfill }, 61 cursor: None, // Will be set by the relay indexer 62 }; 63 64 // For backfill events, we could potentially batch them 65 // For now, treat them the same as live events but with different source 66 writer_events.push(WriterEvent::Unresolved(Box::new(unresolved))); 67 } else if let Some(identity) = event.identity { 68 // Handle identity events as resolved events (no FK resolution needed) 69 // These update actor handle and status 70 use crate::database_writer::DatabaseOperation; 71 use chrono::Utc; 72 73 let status = if identity.is_active { 74 Some(parakeet_db::types::ActorStatus::Active) 75 } else { 76 Some(parakeet_db::types::ActorStatus::Deactivated) 77 }; 78 79 let operation = DatabaseOperation::UpsertActor { 80 did: identity.did.clone(), 81 status, 82 sync_state: parakeet_db::types::ActorSyncState::Synced, 83 handle: identity.handle.clone(), 84 account_created_at: None, // Not provided by Tap identity events 85 timestamp: Utc::now(), 86 }; 87 88 let processed = crate::database_writer::ProcessedEvent { 89 operations: vec![operation], 90 cursor: None, 91 source: EventSource::Tap, 92 }; 93 94 writer_events.push(WriterEvent::Resolved(Box::new(processed))); 95 } 96 97 Ok(writer_events) 98} 99 100/// Parse collection string into CollectionType enum 101fn parse_collection_type(collection: &str) -> Result<CollectionType> { 102 match collection { 103 "app.bsky.actor.profile" => Ok(CollectionType::BskyProfile), 104 "app.bsky.actor.status" => Ok(CollectionType::BskyStatus), 105 "app.bsky.feed.generator" => Ok(CollectionType::BskyFeedGen), 106 "app.bsky.feed.like" => Ok(CollectionType::BskyFeedLike), 107 "app.bsky.feed.post" => Ok(CollectionType::BskyFeedPost), 108 "app.bsky.feed.repost" => Ok(CollectionType::BskyFeedRepost), 109 "app.bsky.feed.threadgate" => Ok(CollectionType::BskyFeedThreadgate), 110 "app.bsky.graph.block" => Ok(CollectionType::BskyBlock), 111 "app.bsky.graph.follow" => Ok(CollectionType::BskyFollow), 112 "app.bsky.graph.list" => Ok(CollectionType::BskyList), 113 "app.bsky.graph.listblock" => Ok(CollectionType::BskyListBlock), 114 "app.bsky.graph.listitem" => Ok(CollectionType::BskyListItem), 115 "app.bsky.graph.starterpack" => Ok(CollectionType::BskyStarterPack), 116 "app.bsky.labeler.service" => Ok(CollectionType::BskyLabelerService), 117 _ => Ok(CollectionType::Unsupported), 118 } 119} 120 121/// Parse JSON record into RecordTypes enum 122fn parse_record(collection: &str, record_data: serde_json::Value) -> Result<RecordTypes> { 123 // Parse the record with the $type field 124 let mut record_with_type = record_data; 125 126 // Add the $type field if not present (for deserialization) 127 if !record_with_type.get("$type").is_some() { 128 if let Some(obj) = record_with_type.as_object_mut() { 129 obj.insert("$type".to_string(), serde_json::Value::String(collection.to_string())); 130 } 131 } 132 133 let record_type: RecordTypes = serde_json::from_str(&record_with_type.to_string())?; 134 Ok(record_type) 135} 136 137/// Process a batch of Tap events 138/// 139/// This function batches backfill events by actor for bulk processing. 140/// Live events are processed immediately, while backfill events are collected 141/// into batches by actor and sent as UnresolvedBulk or ResolvedBulk events. 142pub async fn process_tap_events_batch(events: Vec<TapEvent>) -> Result<Vec<WriterEvent>> { 143 let mut writer_events = Vec::new(); 144 let mut backfill_by_actor: std::collections::HashMap<String, Vec<UnresolvedRecord>> = std::collections::HashMap::new(); 145 146 for event in events { 147 if let Some(record) = event.record { 148 // Check if this is a backfill event 149 if !record.live { 150 // This is a backfill event - batch it by actor 151 let actor_did = record.did.clone(); 152 153 // Parse record action 154 let action = RecordAction::from_str(&record.action) 155 .ok_or_else(|| eyre::eyre!("Unknown record action: {}", record.action))?; 156 157 // Build AT URI 158 let at_uri = format!("at://{}/{}/{}", record.did, record.collection, record.rkey); 159 160 // For backfill, we only batch create/update operations 161 // Deletes are processed individually 162 match action { 163 RecordAction::Create | RecordAction::Update => { 164 if let Some(record_data) = record.record { 165 let record_type = parse_record(&record.collection, record_data)?; 166 let cid = if let Some(cid_str) = record.cid { 167 ipld_core::cid::Cid::from_str(&cid_str)? 168 } else { 169 return Err(eyre::eyre!("Missing CID for create/update operation")); 170 }; 171 172 // Create unresolved record for batching 173 let unresolved_record = UnresolvedRecord { 174 at_uri: at_uri.clone(), 175 rkey: record.rkey.clone(), 176 cid, 177 record: Box::new(record_type), 178 }; 179 180 backfill_by_actor.entry(actor_did) 181 .or_default() 182 .push(unresolved_record); 183 } 184 } 185 RecordAction::Delete => { 186 // Process deletes immediately (they're rare in backfill) 187 let collection = parse_collection_type(&record.collection)?; 188 let unresolved = UnresolvedEvent { 189 repo: record.did.clone(), 190 event_type: UnresolvedEventType::Delete { collection }, 191 at_uri, 192 rkey: record.rkey.clone(), 193 source: EventSource::TapBackfill, 194 cursor: None, 195 }; 196 writer_events.push(WriterEvent::Unresolved(Box::new(unresolved))); 197 } 198 } 199 } else { 200 // Live event - process immediately 201 let live_events = process_tap_event(TapEvent { 202 id: event.id, 203 event_type: event.event_type, 204 record: Some(record), 205 identity: None, 206 }).await?; 207 writer_events.extend(live_events); 208 } 209 } else if let Some(identity) = event.identity { 210 // Identity events are always processed immediately 211 let identity_events = process_tap_event(TapEvent { 212 id: event.id, 213 event_type: event.event_type, 214 record: None, 215 identity: Some(identity), 216 }).await?; 217 writer_events.extend(identity_events); 218 } 219 } 220 221 // Send batched backfill events as UnresolvedBulk 222 // Batch size threshold: 50+ records per actor 223 const BULK_THRESHOLD: usize = 50; 224 225 for (actor_did, records) in backfill_by_actor { 226 if records.len() >= BULK_THRESHOLD { 227 // Send as bulk event for efficient processing 228 // Note: We don't have actor_id yet, it will be resolved by the database writer 229 writer_events.push(WriterEvent::UnresolvedBulk { 230 repo: actor_did, 231 actor_id: 0, // Will be resolved by database writer 232 records, 233 source: EventSource::TapBackfill, 234 }); 235 } else { 236 // Small batch - send as individual unresolved events 237 for unresolved_record in records { 238 let unresolved = UnresolvedEvent { 239 repo: actor_did.clone(), 240 event_type: UnresolvedEventType::CreateUpdate { 241 record: unresolved_record.record, 242 cid: unresolved_record.cid, 243 }, 244 at_uri: unresolved_record.at_uri, 245 rkey: unresolved_record.rkey, 246 source: EventSource::TapBackfill, 247 cursor: None, 248 }; 249 writer_events.push(WriterEvent::Unresolved(Box::new(unresolved))); 250 } 251 } 252 } 253 254 Ok(writer_events) 255}