Rust AppView - highly experimental!
at experiments 129 lines 4.7 kB view raw
1//! Database writer - coordinates all database operations 2//! 3//! This module decouples event processing from database writes. 4//! Workers parse events and produce `ProcessedEvent` structures containing 5//! all the data needed for database operations. A background database writer 6//! task drains the queue and performs database writes asynchronously. 7//! 8//! Architecture: 30-worker pool (2 sources × 5 types × 3 workers) 9//! - Sources: Tap live (high priority), Tap backfill (medium) 10//! - Types: Actor, Like, Social, Post, Metadata 11//! - Per-DID ordering maintained through advisory locks 12 13pub mod bulk_processor; 14pub mod bulk_types; 15pub mod locking; 16pub mod operations; 17pub mod reference_extraction; 18pub mod routing; 19pub mod timestamp; 20pub mod workers_tap; 21 22// acquire_did_locks is used internally by workers, not exposed 23pub use bulk_types::{BulkOperations, UnresolvedRecord}; 24pub use operations::{process_record_to_operations, DatabaseOperation, ResolvedActorIds}; 25pub use reference_extraction::{extract_references, RecordReferences}; 26pub use timestamp::{validate_record_timestamp_with_tid, validate_tid_timestamp}; 27pub use workers_tap::spawn_database_writer_tap; 28 29/// Wrapper for events sent to the database writer 30/// 31/// The database writer accepts four types of events: 32/// - UnresolvedEvent: Needs actor_id/post_id resolution (from Tap workers) 33/// - ProcessedEvent: Already resolved, ready for dispatch (from Fetch workers) 34/// - UnresolvedBulk: Bulk unresolved events from backfill (50+ records) 35/// - ResolvedBulk: Bulk resolved events ready for COPY operations 36#[derive(Debug)] 37pub enum WriterEvent { 38 /// Event that needs FK resolution before processing (handles both CreateUpdate and Delete) 39 Unresolved(Box<UnresolvedEvent>), 40 /// Event that's already resolved and ready for operation workers 41 Resolved(Box<ProcessedEvent>), 42 /// Bulk unresolved events from backfill (50+ records) 43 UnresolvedBulk { 44 repo: String, 45 actor_id: i32, 46 records: Vec<UnresolvedRecord>, 47 source: EventSource, 48 }, 49 /// Bulk resolved events ready for COPY operations 50 ResolvedBulk { 51 repo: String, 52 actor_id: i32, 53 operations: BulkOperations, 54 source: EventSource, 55 }, 56} 57 58/// Event source for priority queue routing 59#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] 60pub enum EventSource { 61 /// High priority: Real-time Tap events (processed first) 62 Tap, 63 /// Medium priority: Tap historical backfill events 64 TapBackfill, 65} 66 67/// Type of unresolved event (discriminates between creates/updates and deletes) 68#[derive(Debug)] 69pub enum UnresolvedEventType { 70 /// Create or update operation with record data 71 CreateUpdate { 72 record: Box<crate::relay::types::RecordTypes>, // TODO: Investigate the necessity of this box 73 cid: ipld_core::cid::Cid, 74 }, 75 /// Delete operation 76 Delete { 77 collection: crate::relay::types::CollectionType, 78 }, 79} 80 81/// An unresolved event that needs actor_id (and potentially post_id) resolution 82/// 83/// Tap workers stay database-free by producing these events. 84/// Resolution workers in the database writer will: 85/// 1. Resolve/create actor stub (get actor_id) 86/// 2. For posts with reply/quote: resolve/create parent post stubs (get parent_post_id) 87/// 3. Call process_record_to_operations with resolved IDs (for CreateUpdate) 88/// 4. Create DeleteRecord operation with resolved actor_id (for Delete) 89/// 5. Dispatch resulting ProcessedEvent to operation workers 90#[derive(Debug)] 91pub struct UnresolvedEvent { 92 /// Actor DID (needs resolution to actor_id) 93 pub repo: String, 94 95 /// Event type (CreateUpdate or Delete) 96 pub event_type: UnresolvedEventType, 97 98 /// Full AT-URI (at://did/collection/rkey) 99 pub at_uri: String, 100 101 /// Record key 102 pub rkey: String, 103 104 /// Event source for priority routing 105 pub source: EventSource, 106 107 /// Cursor update for this event (cursor timestamp in microseconds) 108 pub cursor: Option<u64>, 109} 110 111/// A fully processed event ready for database writing 112/// 113/// Workers produce these after parsing and validation. 114/// Contains all data needed for database operations - no further queries needed. 115/// 116/// Note: Aggregate deltas are now calculated by the database writer based on 117/// actual database operation results (rows affected), making stats idempotent. 118#[derive(Debug)] 119pub struct ProcessedEvent { 120 /// Database operations to perform 121 pub operations: Vec<DatabaseOperation>, 122 123 /// Cursor update for this event (batched saves every 10s) 124 /// Contains cursor timestamp in microseconds 125 pub cursor: Option<u64>, 126 127 /// Event source for priority queue routing 128 pub source: EventSource, 129}