1//! Database writer - coordinates all database operations
2//!
//! This module decouples event processing from database writes.
//! Workers parse events and hand them to the database writer as `WriterEvent`s:
//! either `UnresolvedEvent`s that still need actor/post ID resolution, or
//! `ProcessedEvent`s that already contain all the data needed for database
//! operations. A background database writer task drains this event queue and
//! performs the database writes asynchronously.
7//!
8//! Architecture: 30-worker pool (2 sources × 5 types × 3 workers)
9//! - Sources: Tap live (high priority), Tap backfill (medium)
10//! - Types: Actor, Like, Social, Post, Metadata
11//! - Per-DID ordering maintained through advisory locks
12
13pub mod bulk_processor;
14pub mod bulk_types;
15pub mod locking;
16pub mod operations;
17pub mod reference_extraction;
18pub mod routing;
19pub mod timestamp;
20pub mod workers_tap;
21
22// acquire_did_locks is used internally by workers, not exposed
23pub use bulk_types::{BulkOperations, UnresolvedRecord};
24pub use operations::{process_record_to_operations, DatabaseOperation, ResolvedActorIds};
25pub use reference_extraction::{extract_references, RecordReferences};
26pub use timestamp::{validate_record_timestamp_with_tid, validate_tid_timestamp};
27pub use workers_tap::spawn_database_writer_tap;
28
29/// Wrapper for events sent to the database writer
30///
31/// The database writer accepts four types of events:
32/// - UnresolvedEvent: Needs actor_id/post_id resolution (from Tap workers)
33/// - ProcessedEvent: Already resolved, ready for dispatch (from Fetch workers)
34/// - UnresolvedBulk: Bulk unresolved events from backfill (50+ records)
35/// - ResolvedBulk: Bulk resolved events ready for COPY operations
36#[derive(Debug)]
37pub enum WriterEvent {
38 /// Event that needs FK resolution before processing (handles both CreateUpdate and Delete)
39 Unresolved(Box<UnresolvedEvent>),
40 /// Event that's already resolved and ready for operation workers
41 Resolved(Box<ProcessedEvent>),
42 /// Bulk unresolved events from backfill (50+ records)
43 UnresolvedBulk {
44 repo: String,
45 actor_id: i32,
46 records: Vec<UnresolvedRecord>,
47 source: EventSource,
48 },
49 /// Bulk resolved events ready for COPY operations
50 ResolvedBulk {
51 repo: String,
52 actor_id: i32,
53 operations: BulkOperations,
54 source: EventSource,
55 },
56}
57
/// Origin of an event; the writer uses it to pick a priority queue.
///
/// Live Tap traffic is drained ahead of backfill traffic.
#[derive(Debug, Clone, Copy)]
#[derive(PartialEq, Eq, Hash)]
pub enum EventSource {
    /// Real-time Tap events — high priority, processed first.
    Tap,
    /// Historical Tap backfill events — medium priority, handled after
    /// [`EventSource::Tap`].
    TapBackfill,
}
66
67/// Type of unresolved event (discriminates between creates/updates and deletes)
68#[derive(Debug)]
69pub enum UnresolvedEventType {
70 /// Create or update operation with record data
71 CreateUpdate {
72 record: Box<crate::relay::types::RecordTypes>, // TODO: Investigate the necessity of this box
73 cid: ipld_core::cid::Cid,
74 },
75 /// Delete operation
76 Delete {
77 collection: crate::relay::types::CollectionType,
78 },
79}
80
81/// An unresolved event that needs actor_id (and potentially post_id) resolution
82///
83/// Tap workers stay database-free by producing these events.
84/// Resolution workers in the database writer will:
85/// 1. Resolve/create actor stub (get actor_id)
86/// 2. For posts with reply/quote: resolve/create parent post stubs (get parent_post_id)
87/// 3. Call process_record_to_operations with resolved IDs (for CreateUpdate)
88/// 4. Create DeleteRecord operation with resolved actor_id (for Delete)
89/// 5. Dispatch resulting ProcessedEvent to operation workers
90#[derive(Debug)]
91pub struct UnresolvedEvent {
92 /// Actor DID (needs resolution to actor_id)
93 pub repo: String,
94
95 /// Event type (CreateUpdate or Delete)
96 pub event_type: UnresolvedEventType,
97
98 /// Full AT-URI (at://did/collection/rkey)
99 pub at_uri: String,
100
101 /// Record key
102 pub rkey: String,
103
104 /// Event source for priority routing
105 pub source: EventSource,
106
107 /// Cursor update for this event (cursor timestamp in microseconds)
108 pub cursor: Option<u64>,
109}
110
111/// A fully processed event ready for database writing
112///
113/// Workers produce these after parsing and validation.
114/// Contains all data needed for database operations - no further queries needed.
115///
116/// Note: Aggregate deltas are now calculated by the database writer based on
117/// actual database operation results (rows affected), making stats idempotent.
118#[derive(Debug)]
119pub struct ProcessedEvent {
120 /// Database operations to perform
121 pub operations: Vec<DatabaseOperation>,
122
123 /// Cursor update for this event (batched saves every 10s)
124 /// Contains cursor timestamp in microseconds
125 pub cursor: Option<u64>,
126
127 /// Event source for priority queue routing
128 pub source: EventSource,
129}