//! Tap event processor //! //! Converts Tap events into database operations without performing any database access. //! This maintains the database-free architecture of the worker layer. use crate::database_writer::{UnresolvedEvent, UnresolvedEventType, UnresolvedRecord, WriterEvent, EventSource}; use crate::relay::types::{CollectionType, RecordTypes}; use crate::sources::tap::{TapEvent, RecordAction}; use eyre::Result; use std::str::FromStr; /// Process a Tap event and convert it to database operations /// /// Returns a vector of WriterEvents to be sent to the database writer. /// Most events produce a single WriterEvent, but some may produce multiple. pub async fn process_tap_event(event: TapEvent) -> Result> { let mut writer_events = Vec::new(); if let Some(record) = event.record { // Parse record action let action = RecordAction::from_str(&record.action) .ok_or_else(|| eyre::eyre!("Unknown record action: {}", record.action))?; // Build AT URI let at_uri = format!("at://{}/{}/{}", record.did, record.collection, record.rkey); // Determine event type let event_type = match action { RecordAction::Delete => { // Parse collection type let collection = parse_collection_type(&record.collection)?; UnresolvedEventType::Delete { collection } } RecordAction::Create | RecordAction::Update => { // Parse and deserialize the record if let Some(record_data) = record.record { let record_type = parse_record(&record.collection, record_data)?; let cid = if let Some(cid_str) = record.cid { ipld_core::cid::Cid::from_str(&cid_str)? } else { return Err(eyre::eyre!("Missing CID for create/update operation")); }; UnresolvedEventType::CreateUpdate { record: Box::new(record_type), cid, } } else { return Err(eyre::eyre!("Missing record data for create/update operation")); } } }; // Create unresolved event for the database writer let unresolved = UnresolvedEvent { repo: record.did.clone(), event_type, at_uri, rkey: record.rkey.clone(), source: if record.live { EventSource::Tap } else { EventSource::TapBackfill }, cursor: None, // Will be set by the relay indexer }; // For backfill events, we could potentially batch them // For now, treat them the same as live events but with different source writer_events.push(WriterEvent::Unresolved(Box::new(unresolved))); } else if let Some(identity) = event.identity { // Handle identity events as resolved events (no FK resolution needed) // These update actor handle and status use crate::database_writer::DatabaseOperation; use chrono::Utc; let status = if identity.is_active { Some(parakeet_db::types::ActorStatus::Active) } else { Some(parakeet_db::types::ActorStatus::Deactivated) }; let operation = DatabaseOperation::UpsertActor { did: identity.did.clone(), status, sync_state: parakeet_db::types::ActorSyncState::Synced, handle: identity.handle.clone(), account_created_at: None, // Not provided by Tap identity events timestamp: Utc::now(), }; let processed = crate::database_writer::ProcessedEvent { operations: vec![operation], cursor: None, source: EventSource::Tap, }; writer_events.push(WriterEvent::Resolved(Box::new(processed))); } Ok(writer_events) } /// Parse collection string into CollectionType enum fn parse_collection_type(collection: &str) -> Result { match collection { "app.bsky.actor.profile" => Ok(CollectionType::BskyProfile), "app.bsky.actor.status" => Ok(CollectionType::BskyStatus), "app.bsky.feed.generator" => Ok(CollectionType::BskyFeedGen), "app.bsky.feed.like" => Ok(CollectionType::BskyFeedLike), "app.bsky.feed.post" => Ok(CollectionType::BskyFeedPost), "app.bsky.feed.repost" => Ok(CollectionType::BskyFeedRepost), "app.bsky.feed.threadgate" => Ok(CollectionType::BskyFeedThreadgate), "app.bsky.graph.block" => Ok(CollectionType::BskyBlock), "app.bsky.graph.follow" => Ok(CollectionType::BskyFollow), "app.bsky.graph.list" => Ok(CollectionType::BskyList), "app.bsky.graph.listblock" => Ok(CollectionType::BskyListBlock), "app.bsky.graph.listitem" => Ok(CollectionType::BskyListItem), "app.bsky.graph.starterpack" => Ok(CollectionType::BskyStarterPack), "app.bsky.labeler.service" => Ok(CollectionType::BskyLabelerService), _ => Ok(CollectionType::Unsupported), } } /// Parse JSON record into RecordTypes enum fn parse_record(collection: &str, record_data: serde_json::Value) -> Result { // Parse the record with the $type field let mut record_with_type = record_data; // Add the $type field if not present (for deserialization) if !record_with_type.get("$type").is_some() { if let Some(obj) = record_with_type.as_object_mut() { obj.insert("$type".to_string(), serde_json::Value::String(collection.to_string())); } } let record_type: RecordTypes = serde_json::from_str(&record_with_type.to_string())?; Ok(record_type) } /// Process a batch of Tap events /// /// This function batches backfill events by actor for bulk processing. /// Live events are processed immediately, while backfill events are collected /// into batches by actor and sent as UnresolvedBulk or ResolvedBulk events. pub async fn process_tap_events_batch(events: Vec) -> Result> { let mut writer_events = Vec::new(); let mut backfill_by_actor: std::collections::HashMap> = std::collections::HashMap::new(); for event in events { if let Some(record) = event.record { // Check if this is a backfill event if !record.live { // This is a backfill event - batch it by actor let actor_did = record.did.clone(); // Parse record action let action = RecordAction::from_str(&record.action) .ok_or_else(|| eyre::eyre!("Unknown record action: {}", record.action))?; // Build AT URI let at_uri = format!("at://{}/{}/{}", record.did, record.collection, record.rkey); // For backfill, we only batch create/update operations // Deletes are processed individually match action { RecordAction::Create | RecordAction::Update => { if let Some(record_data) = record.record { let record_type = parse_record(&record.collection, record_data)?; let cid = if let Some(cid_str) = record.cid { ipld_core::cid::Cid::from_str(&cid_str)? } else { return Err(eyre::eyre!("Missing CID for create/update operation")); }; // Create unresolved record for batching let unresolved_record = UnresolvedRecord { at_uri: at_uri.clone(), rkey: record.rkey.clone(), cid, record: Box::new(record_type), }; backfill_by_actor.entry(actor_did) .or_default() .push(unresolved_record); } } RecordAction::Delete => { // Process deletes immediately (they're rare in backfill) let collection = parse_collection_type(&record.collection)?; let unresolved = UnresolvedEvent { repo: record.did.clone(), event_type: UnresolvedEventType::Delete { collection }, at_uri, rkey: record.rkey.clone(), source: EventSource::TapBackfill, cursor: None, }; writer_events.push(WriterEvent::Unresolved(Box::new(unresolved))); } } } else { // Live event - process immediately let live_events = process_tap_event(TapEvent { id: event.id, event_type: event.event_type, record: Some(record), identity: None, }).await?; writer_events.extend(live_events); } } else if let Some(identity) = event.identity { // Identity events are always processed immediately let identity_events = process_tap_event(TapEvent { id: event.id, event_type: event.event_type, record: None, identity: Some(identity), }).await?; writer_events.extend(identity_events); } } // Send batched backfill events as UnresolvedBulk // Batch size threshold: 50+ records per actor const BULK_THRESHOLD: usize = 50; for (actor_did, records) in backfill_by_actor { if records.len() >= BULK_THRESHOLD { // Send as bulk event for efficient processing // Note: We don't have actor_id yet, it will be resolved by the database writer writer_events.push(WriterEvent::UnresolvedBulk { repo: actor_did, actor_id: 0, // Will be resolved by database writer records, source: EventSource::TapBackfill, }); } else { // Small batch - send as individual unresolved events for unresolved_record in records { let unresolved = UnresolvedEvent { repo: actor_did.clone(), event_type: UnresolvedEventType::CreateUpdate { record: unresolved_record.record, cid: unresolved_record.cid, }, at_uri: unresolved_record.at_uri, rkey: unresolved_record.rkey, source: EventSource::TapBackfill, cursor: None, }; writer_events.push(WriterEvent::Unresolved(Box::new(unresolved))); } } } Ok(writer_events) }