//! Database writer - coordinates all database operations
//!
//! This module decouples event processing from database writes.
//! Workers parse events and produce `ProcessedEvent` structures containing
//! all the data needed for database operations. A background database writer
//! task drains the queue and performs database writes asynchronously.
//!
//! Architecture: 30-worker pool (2 sources × 5 types × 3 workers)
//! - Sources: Tap live (high priority), Tap backfill (medium)
//! - Types: Actor, Like, Social, Post, Metadata
//! - Per-DID ordering maintained through advisory locks

pub mod bulk_processor;
pub mod bulk_types;
pub mod locking;
pub mod operations;
pub mod reference_extraction;
pub mod routing;
pub mod timestamp;
pub mod workers_tap;

// `acquire_did_locks` is used internally by workers and is deliberately not
// re-exported here.
pub use bulk_types::{BulkOperations, UnresolvedRecord};
pub use operations::{process_record_to_operations, DatabaseOperation, ResolvedActorIds};
pub use reference_extraction::{extract_references, RecordReferences};
pub use timestamp::{validate_record_timestamp_with_tid, validate_tid_timestamp};
pub use workers_tap::spawn_database_writer_tap;

/// Wrapper for events sent to the database writer.
///
/// The database writer accepts four kinds of events, one per variant:
/// - `Unresolved`: needs actor_id/post_id resolution (from Tap workers)
/// - `Resolved`: already resolved, ready for dispatch (from Fetch workers)
/// - `UnresolvedBulk`: bulk unresolved events from backfill (50+ records)
/// - `ResolvedBulk`: bulk resolved events ready for COPY operations
///
/// The two single-event variants are boxed; the bulk variants carry their
/// fields inline.
// NOTE(review): the module header lists only Tap sources, so the "Fetch
// workers" mention above may be stale - confirm.
#[derive(Debug)]
pub enum WriterEvent {
    /// Event that needs FK resolution before processing (handles both CreateUpdate and Delete)
    // NOTE(review): `Box` has no type argument in this view; presumably
    // `Box<UnresolvedEvent>` - confirm against the original file.
    Unresolved(Box),
    /// Event that's already resolved and ready for operation workers
    // NOTE(review): `Box` has no type argument in this view; presumably
    // `Box<ProcessedEvent>` - confirm against the original file.
    Resolved(Box),
    /// Bulk unresolved events from backfill (50+ records)
    UnresolvedBulk {
        // Presumably the actor DID, as in `UnresolvedEvent::repo` - TODO confirm.
        repo: String,
        // Presumably the already-resolved database id for `repo` - TODO confirm.
        actor_id: i32,
        // NOTE(review): `Vec` has no element type in this view; presumably
        // `Vec<UnresolvedRecord>` (re-exported above) - confirm.
        records: Vec,
        // Which queue the batch came from (see `EventSource`).
        source: EventSource,
    },
    /// Bulk resolved events ready for COPY operations
    ResolvedBulk {
        // Presumably the actor DID, as in `UnresolvedEvent::repo` - TODO confirm.
        repo: String,
        // Presumably the already-resolved database id for `repo` - TODO confirm.
        actor_id: i32,
        // The batched operations to write (type re-exported from `bulk_types`).
        operations: BulkOperations,
        // Which queue the batch came from (see `EventSource`).
        source: EventSource,
    },
}

/// Event source for priority queue routing.
///
/// A small `Copy` tag that is also `Eq + Hash`, so it can be passed by value
/// and used as a map key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum EventSource {
    /// High priority: Real-time Tap events (processed first)
    Tap,
    /// Medium priority: Tap historical backfill events
    TapBackfill,
}

/// Type of unresolved event (discriminates between creates/updates and deletes)
#[derive(Debug)]
pub enum UnresolvedEventType {
    /// Create or update operation with record data
    CreateUpdate {
        // NOTE(review): `Box` has no type argument in this view; the record
        // payload type cannot be determined from here - confirm.
        record: Box, // TODO: Investigate the necessity of this box
        // CID accompanying the record.
        cid: ipld_core::cid::Cid,
    },
    /// Delete operation
    Delete {
        // Collection the deleted record belonged to.
        collection: crate::relay::types::CollectionType,
    },
}

/// An unresolved event that needs actor_id (and potentially post_id) resolution
///
/// Tap workers stay database-free by producing these events.
/// Resolution workers in the database writer will:
/// 1. Resolve/create actor stub (get actor_id)
/// 2. For posts with reply/quote: resolve/create parent post stubs (get parent_post_id)
/// 3. Call process_record_to_operations with resolved IDs (for CreateUpdate)
/// 4. Create DeleteRecord operation with resolved actor_id (for Delete)
/// 5. Dispatch resulting ProcessedEvent to operation workers
#[derive(Debug)]
pub struct UnresolvedEvent {
    /// Actor DID (needs resolution to actor_id)
    pub repo: String,
    /// Event type (CreateUpdate or Delete)
    pub event_type: UnresolvedEventType,
    /// Full AT-URI (at://did/collection/rkey)
    pub at_uri: String,
    /// Record key
    pub rkey: String,
    /// Event source for priority routing
    pub source: EventSource,
    /// Cursor update for this event (cursor timestamp in microseconds)
    // NOTE(review): `Option` has no type argument in this view; the integer
    // type of the cursor cannot be determined from here - confirm.
    pub cursor: Option,
}

/// A fully processed event ready for database writing
///
/// Workers produce these after parsing and validation.
/// Contains all data needed for database operations - no further queries needed.
///
/// Note: Aggregate deltas are now calculated by the database writer based on
/// actual database operation results (rows affected), making stats idempotent.
#[derive(Debug)]
pub struct ProcessedEvent {
    /// Database operations to perform
    // NOTE(review): `Vec` has no element type in this view; presumably
    // `Vec<DatabaseOperation>` (re-exported above) - confirm.
    pub operations: Vec,
    /// Cursor update for this event (batched saves every 10s)
    /// Contains cursor timestamp in microseconds
    // NOTE(review): `Option` has no type argument in this view; the integer
    // type of the cursor cannot be determined from here - confirm.
    pub cursor: Option,
    /// Event source for priority queue routing
    pub source: EventSource,
}