//! Batched logging system for high-throughput database log persistence.
//!
//! This module provides an async, batched logging system that:
//! - Queues log entries in memory using an unbounded channel
//! - Flushes to PostgreSQL in batches (every 5 seconds or 100 entries)
//! - Maintains a global singleton logger instance
//! - Supports different log types (sync jobs, Jetstream events, system logs)
//! - Automatically cleans up old logs (1 day for Jetstream, 7 days for jobs)
//!
//! The batching approach significantly reduces database load during high-throughput
//! scenarios like Jetstream event processing.

use chrono::Utc;
use serde_json::Value;
use sqlx::PgPool;
use std::sync::OnceLock;
use tokio::sync::mpsc;
use tokio::time::{Duration, interval};
use tracing::{error, info, warn};
use uuid::Uuid;

/// Log severity levels for structured logging.
#[derive(Debug, Clone)]
pub enum LogLevel {
    Info,
    Warn,
    Error,
}

impl LogLevel {
    /// Returns the string representation of the log level.
    pub fn as_str(&self) -> &'static str {
        match self {
            LogLevel::Info => "info",
            LogLevel::Warn => "warn",
            LogLevel::Error => "error",
        }
    }
}

/// Categories of log entries for filtering and organization.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub enum LogType {
    /// Background sync job logs (user-initiated collection sync)
    SyncJob,
    /// Real-time Jetstream event processing logs
    Jetstream,
    /// System-level operational logs
    System,
}

impl LogType {
    /// Returns the string representation of the log type.
    pub fn as_str(&self) -> &'static str {
        match self {
            LogType::SyncJob => "sync_job",
            LogType::Jetstream => "jetstream",
            LogType::System => "system",
        }
    }
}

/// Global singleton logger instance, initialized once at application startup.
static GLOBAL_LOGGER: OnceLock<Logger> = OnceLock::new();

/// Internal representation of a log entry pending database insertion.
///
/// These entries are queued in memory and flushed in batches to reduce
/// database round-trips and improve throughput.
#[derive(Debug, Clone)]
struct QueuedLogEntry {
    log_type: String,
    job_id: Option<Uuid>,
    user_did: Option<String>,
    slice_uri: Option<String>,
    level: String,
    message: String,
    metadata: Option<Value>,
    created_at: chrono::DateTime<Utc>,
}

/// Batched logger that queues log entries and flushes them periodically.
///
/// This logger uses an unbounded channel to queue log entries, which are then
/// flushed to the database by a background worker. The worker flushes when:
/// - 100 entries accumulate (batch size threshold)
/// - 5 seconds elapse (time-based threshold)
/// - The channel is closed (graceful shutdown)
///
/// Logs are also immediately written to stdout via the `tracing` crate for
/// real-time visibility during development and debugging.
#[derive(Clone)]
pub struct Logger {
    sender: mpsc::UnboundedSender<QueuedLogEntry>,
}

impl Logger {
    /// Creates a new batched logger and spawns the background worker task.
    ///
    /// The background worker runs for the lifetime of the application, processing
    /// the log queue and flushing to the database.
    ///
    /// # Arguments
    /// * `pool` - PostgreSQL connection pool for database writes
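    ///
    /// # Example
    /// A minimal usage sketch (assuming an already-constructed `pool: PgPool`);
    /// most callers use `Logger::init_global` instead of holding their own instance:
    /// ```ignore
    /// let logger = Logger::new(pool.clone());
    /// logger.log_jetstream(LogLevel::Info, "Jetstream consumer started", None);
    /// ```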
    pub fn new(pool: PgPool) -> Self {
        let (sender, receiver) = mpsc::unbounded_channel();

        // Spawn background worker that will run for the lifetime of the application
        tokio::spawn(Self::background_worker(receiver, pool));

        Self { sender }
    }

    /// Initializes the global logger singleton.
    ///
    /// This should be called once at application startup before any logging occurs.
    /// Subsequent calls will be ignored with a warning.
    ///
    /// # Arguments
    /// * `pool` - PostgreSQL connection pool for database writes
    ///
    /// # Example
    /// ```ignore
    /// Logger::init_global(pool.clone());
    /// let logger = Logger::global();
    /// logger.log_jetstream(LogLevel::Info, "Started", None);
    /// ```
    pub fn init_global(pool: PgPool) {
        let logger = Self::new(pool);
        if GLOBAL_LOGGER.set(logger).is_err() {
            warn!("Global logger was already initialized");
        }
    }

    /// Returns a reference to the global logger instance.
    ///
    /// # Panics
    /// Panics if called before `init_global()`. Ensure the logger is initialized
    /// during application startup.
    pub fn global() -> &'static Logger {
        GLOBAL_LOGGER
            .get()
            .expect("Global logger not initialized - call Logger::init_global() first")
    }

    /// Logs a sync job message, queuing it for batched database insertion.
    ///
    /// Sync job logs track the progress of background synchronization tasks where
    /// users fetch their collection data from their PDS.
    ///
    /// # Arguments
    /// * `job_id` - Unique identifier for the sync job
    /// * `user_did` - Decentralized identifier of the user being synced
    /// * `slice_uri` - AT-URI of the slice being synchronized
    /// * `level` - Log severity level
    /// * `message` - Human-readable log message
    /// * `metadata` - Optional structured metadata (JSON)
    ///
    /// # Behavior
    /// - Immediately writes to stdout via `tracing` for real-time visibility
    /// - Queues the entry for batch insertion to the database
    /// - Send failures are silently ignored (if channel is closed)
    pub fn log_sync_job(
        &self,
        job_id: Uuid,
        user_did: &str,
        slice_uri: &str,
        level: LogLevel,
        message: &str,
        metadata: Option<Value>,
    ) {
        let entry = QueuedLogEntry {
            log_type: LogType::SyncJob.as_str().to_string(),
            job_id: Some(job_id),
            user_did: Some(user_did.to_string()),
            slice_uri: Some(slice_uri.to_string()),
            level: level.as_str().to_string(),
            message: message.to_string(),
            metadata,
            created_at: Utc::now(),
        };

        // Write to stdout immediately for real-time monitoring and debugging
        match level {
            LogLevel::Info => info!("[sync_job] {}", message),
            LogLevel::Warn => warn!("[sync_job] {}", message),
            LogLevel::Error => error!("[sync_job] {}", message),
        }

        // Queue for batch database insertion (ignore send errors if channel closed)
        let _ = self.sender.send(entry);
    }

    /// Logs a Jetstream message without slice context.
    ///
    /// This is a convenience wrapper around `log_jetstream_with_slice` for
    /// global Jetstream events (e.g., connection status, errors).
    ///
    /// # Arguments
    /// * `level` - Log severity level
    /// * `message` - Human-readable log message
    /// * `metadata` - Optional structured metadata (JSON)
    pub fn log_jetstream(&self, level: LogLevel, message: &str, metadata: Option<Value>) {
        self.log_jetstream_with_slice(level, message, metadata, None);
    }

    /// Logs a Jetstream message with optional slice context.
    ///
    /// Jetstream logs track real-time event processing from the AT Protocol firehose.
    /// Including `slice_uri` associates the log with a specific slice's event processing.
    ///
    /// # Arguments
    /// * `level` - Log severity level
    /// * `message` - Human-readable log message
    /// * `metadata` - Optional structured metadata (JSON)
    /// * `slice_uri` - Optional AT-URI to associate this log with a specific slice
    ///
    /// # Behavior
    /// - Immediately writes to stdout via `tracing` for real-time visibility
    /// - Queues the entry for batch insertion to the database
    /// - Send failures are silently ignored (if channel is closed)
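    ///
    /// # Example
    /// A minimal sketch (assuming the global logger has been initialized and
    /// `slice_uri: &str` holds the slice's AT-URI):
    /// ```ignore
    /// Logger::global().log_jetstream_with_slice(
    ///     LogLevel::Warn,
    ///     "Dropped malformed record",
    ///     None,
    ///     Some(slice_uri),
    /// );
    /// ```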
    pub fn log_jetstream_with_slice(
        &self,
        level: LogLevel,
        message: &str,
        metadata: Option<Value>,
        slice_uri: Option<&str>,
    ) {
        let entry = QueuedLogEntry {
            log_type: LogType::Jetstream.as_str().to_string(),
            job_id: None,
            user_did: None,
            slice_uri: slice_uri.map(|s| s.to_string()),
            level: level.as_str().to_string(),
            message: message.to_string(),
            metadata,
            created_at: Utc::now(),
        };

        // Write to stdout immediately for real-time monitoring and debugging
        match level {
            LogLevel::Info => info!("[jetstream] {}", message),
            LogLevel::Warn => warn!("[jetstream] {}", message),
            LogLevel::Error => error!("[jetstream] {}", message),
        }

        // Queue for batch database insertion (ignore send errors if channel closed)
        let _ = self.sender.send(entry);
    }

    /// Background worker that processes the log queue and flushes to the database.
    ///
    /// This worker runs in a dedicated tokio task and flushes batches when:
    /// - 100 entries accumulate (to prevent unbounded memory growth)
    /// - 5 seconds elapse (to ensure timely persistence)
    /// - The channel closes (graceful shutdown)
    ///
    /// # Arguments
    /// * `receiver` - Channel receiver for queued log entries
    /// * `pool` - PostgreSQL connection pool for batch inserts
    async fn background_worker(
        mut receiver: mpsc::UnboundedReceiver<QueuedLogEntry>,
        pool: PgPool,
    ) {
        let mut batch = Vec::new();
        // Periodic flush to ensure logs are persisted even during low-volume periods
        let mut flush_interval = interval(Duration::from_secs(5));

        info!("Started batched logging background worker");

        loop {
            tokio::select! {
                // Receive log entries from the queue
                entry = receiver.recv() => {
                    match entry {
                        Some(entry) => {
                            batch.push(entry);
                            // Flush when batch reaches size threshold to prevent memory buildup
                            if batch.len() >= 100 {
                                Self::flush_batch(&pool, &mut batch).await;
                            }
                        }
                        // Channel closed (shutdown): flush remaining logs and exit gracefully
                        None => {
                            if !batch.is_empty() {
                                Self::flush_batch(&pool, &mut batch).await;
                            }
                            break;
                        }
                    }
                }
                // Time-based flush to ensure logs are persisted within 5 seconds
                _ = flush_interval.tick() => {
                    if !batch.is_empty() {
                        Self::flush_batch(&pool, &mut batch).await;
                    }
                }
            }
        }

        info!("Batched logging background worker shut down");
    }

    /// Flushes a batch of log entries to the database using a bulk INSERT.
    ///
    /// This method dynamically constructs a multi-value INSERT statement to minimize
    /// database round-trips. Each log entry contributes 8 parameters (fields).
    ///
    /// # Arguments
    /// * `pool` - PostgreSQL connection pool
    /// * `batch` - Mutable vector of queued log entries (cleared after flush)
    ///
    /// # Performance
    /// - Warns if a batch takes >100ms to insert (potential database issue)
    /// - Logs successful flushes with timing information
    /// - On error, logs are lost but the system continues (fail-open)
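    ///
    /// # Example
    /// For illustration, a batch of two entries produces a statement of this shape
    /// (sketch; the real query is built dynamically for the actual batch size):
    /// ```text
    /// INSERT INTO logs (log_type, job_id, user_did, slice_uri, level, message, metadata, created_at)
    /// VALUES ($1, $2, $3, $4, $5, $6, $7, $8),
    ///        ($9, $10, $11, $12, $13, $14, $15, $16)
    /// ```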
    async fn flush_batch(pool: &PgPool, batch: &mut Vec<QueuedLogEntry>) {
        if batch.is_empty() {
            return;
        }

        let batch_size = batch.len();
        let start = std::time::Instant::now();

        // Build bulk INSERT query dynamically based on batch size
        let mut query = String::from(
            "INSERT INTO logs (log_type, job_id, user_did, slice_uri, level, message, metadata, created_at) VALUES ",
        );

        // Add placeholders for each record (8 parameters per entry)
        for i in 0..batch_size {
            if i > 0 {
                query.push_str(", ");
            }
            // Calculate base parameter index (8 fields per log entry, 1-indexed)
            let base = i * 8 + 1;
            query.push_str(&format!(
                "(${}, ${}, ${}, ${}, ${}, ${}, ${}, ${})",
                base,
                base + 1,
                base + 2,
                base + 3,
                base + 4,
                base + 5,
                base + 6,
                base + 7
            ));
        }

        // Bind all parameters in order (log_type, job_id, user_did, slice_uri, level, message, metadata, created_at)
        let mut sqlx_query = sqlx::query(&query);
        for entry in batch.iter() {
            sqlx_query = sqlx_query
                .bind(&entry.log_type)
                .bind(entry.job_id)
                .bind(&entry.user_did)
                .bind(&entry.slice_uri)
                .bind(&entry.level)
                .bind(&entry.message)
                .bind(&entry.metadata)
                .bind(entry.created_at);
        }

        // Execute the batch insert and handle errors gracefully
        match sqlx_query.execute(pool).await {
            Ok(_) => {
                let elapsed = start.elapsed();
                // Warn about slow inserts that may indicate database performance issues
                if elapsed.as_millis() > 100 {
                    warn!(
                        "Slow log batch insert: {} entries in {:?}",
                        batch_size, elapsed
                    );
                } else {
                    info!("Flushed {} log entries in {:?}", batch_size, elapsed);
                }
            }
            Err(e) => {
                error!("Failed to flush log batch of {} entries: {}", batch_size, e);
                // Fail-open: logs are lost but the system continues to prevent cascading failures
            }
        }

        batch.clear();
    }
}

/// Represents a log entry retrieved from the database.
///
/// This struct is used for query results and API responses. Field names are
/// converted to camelCase for JSON serialization.
#[derive(Debug, Clone, serde::Serialize, sqlx::FromRow)]
#[serde(rename_all = "camelCase")]
pub struct LogEntry {
    pub id: i64,
    pub created_at: chrono::DateTime<Utc>,
    pub log_type: String,
    pub job_id: Option<Uuid>,
    pub user_did: Option<String>,
    pub slice_uri: Option<String>,
    pub level: String,
    pub message: String,
    pub metadata: Option<Value>,
}

/// Retrieves logs for a specific sync job, ordered chronologically.
///
/// # Arguments
/// * `pool` - PostgreSQL connection pool
/// * `job_id` - Unique identifier of the sync job
/// * `limit` - Optional maximum number of logs to return (default: 100)
///
/// # Returns
/// * `Ok(Vec<LogEntry>)` - List of log entries ordered by creation time (ASC)
/// * `Err(sqlx::Error)` - Database query error
pub async fn get_sync_job_logs(
    pool: &PgPool,
    job_id: Uuid,
    limit: Option<i64>,
) -> Result<Vec<LogEntry>, sqlx::Error> {
    let limit = limit.unwrap_or(100);

    let rows = sqlx::query_as!(
        LogEntry,
        r#"
        SELECT id, created_at, log_type, job_id, user_did, slice_uri, level, message, metadata
        FROM logs
        WHERE log_type = 'sync_job' AND job_id = $1
        ORDER BY created_at ASC
        LIMIT $2
        "#,
        job_id,
        limit
    )
    .fetch_all(pool)
    .await?;

    Ok(rows)
}

/// Retrieves Jetstream logs, optionally filtered by slice URI.
///
/// When a slice filter is provided, returns both slice-specific logs AND global
/// connection logs (where slice_uri is NULL). This ensures connection status logs
/// are visible when viewing slice-specific logs.
///
/// # Arguments
/// * `pool` - PostgreSQL connection pool
/// * `slice_filter` - Optional slice URI to filter logs
/// * `limit` - Optional maximum number of logs to return (default: 100)
///
/// # Returns
/// * `Ok(Vec<LogEntry>)` - List of log entries ordered by creation time (DESC)
/// * `Err(sqlx::Error)` - Database query error
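///
/// # Example
/// A minimal sketch (assuming a `pool: PgPool` and a `slice_uri: &str` holding the
/// slice's AT-URI):
/// ```ignore
/// // Most recent 50 Jetstream logs for one slice, plus global connection logs.
/// let logs = get_jetstream_logs(&pool, Some(slice_uri), Some(50)).await?;
/// ```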
pub async fn get_jetstream_logs(
    pool: &PgPool,
    slice_filter: Option<&str>,
    limit: Option<i64>,
) -> Result<Vec<LogEntry>, sqlx::Error> {
    let limit = limit.unwrap_or(100);

    let rows = if let Some(slice_uri) = slice_filter {
        // Include both slice-specific logs and global connection logs for context
        sqlx::query_as!(
            LogEntry,
            r#"
            SELECT id, created_at, log_type, job_id, user_did, slice_uri, level, message, metadata
            FROM logs
            WHERE log_type = 'jetstream' AND (slice_uri = $1 OR slice_uri IS NULL)
            ORDER BY created_at DESC
            LIMIT $2
            "#,
            slice_uri,
            limit
        )
        .fetch_all(pool)
        .await?
    } else {
        // No filter provided, return all Jetstream logs across all slices
        sqlx::query_as!(
            LogEntry,
            r#"
            SELECT id, created_at, log_type, job_id, user_did, slice_uri, level, message, metadata
            FROM logs
            WHERE log_type = 'jetstream'
            ORDER BY created_at DESC
            LIMIT $1
            "#,
            limit
        )
        .fetch_all(pool)
        .await?
    };

    Ok(rows)
}

/// Retrieves all logs associated with a specific slice URI.
///
/// This includes both sync job logs and Jetstream logs for the slice.
///
/// # Arguments
/// * `pool` - PostgreSQL connection pool
/// * `slice_uri` - AT-URI of the slice
/// * `log_type_filter` - Optional log type filter ("sync_job", "jetstream", "system")
/// * `limit` - Optional maximum number of logs to return (default: 100)
///
/// # Returns
/// * `Ok(Vec<LogEntry>)` - List of log entries ordered by creation time (DESC)
/// * `Err(sqlx::Error)` - Database query error
#[allow(dead_code)]
pub async fn get_slice_logs(
    pool: &PgPool,
    slice_uri: &str,
    log_type_filter: Option<&str>,
    limit: Option<i64>,
) -> Result<Vec<LogEntry>, sqlx::Error> {
    let limit = limit.unwrap_or(100);

    let rows = if let Some(log_type) = log_type_filter {
        sqlx::query_as!(
            LogEntry,
            r#"
            SELECT id, created_at, log_type, job_id, user_did, slice_uri, level, message, metadata
            FROM logs
            WHERE slice_uri = $1 AND log_type = $2
            ORDER BY created_at DESC
            LIMIT $3
            "#,
            slice_uri,
            log_type,
            limit
        )
        .fetch_all(pool)
        .await?
    } else {
        sqlx::query_as!(
            LogEntry,
            r#"
            SELECT id, created_at, log_type, job_id, user_did, slice_uri, level, message, metadata
            FROM logs
            WHERE slice_uri = $1
            ORDER BY created_at DESC
            LIMIT $2
            "#,
            slice_uri,
            limit
        )
        .fetch_all(pool)
        .await?
    };

    Ok(rows)
}

/// Deletes old log entries to prevent unbounded database growth.
///
/// Retention policy:
/// - Jetstream logs: 1 day (high volume, primarily for real-time debugging)
/// - Sync job logs: 7 days (lower volume, useful for historical analysis)
/// - System logs: 7 days
///
/// # Arguments
/// * `pool` - PostgreSQL connection pool
///
/// # Returns
/// * `Ok(u64)` - Number of deleted log entries
/// * `Err(sqlx::Error)` - Database query error
pub async fn cleanup_old_logs(pool: &PgPool) -> Result<u64, sqlx::Error> {
    let result = sqlx::query!(
        r#"
        DELETE FROM logs
        WHERE (log_type = 'jetstream' AND created_at < NOW() - INTERVAL '1 day')
           OR (log_type = 'sync_job' AND created_at < NOW() - INTERVAL '7 days')
           OR (log_type = 'system' AND created_at < NOW() - INTERVAL '7 days')
        "#,
    )
    .execute(pool)
    .await?;

    Ok(result.rows_affected())
}

/// Spawns a background task that periodically cleans up old logs.
///
/// The task runs every 6 hours for the lifetime of the application, deleting
/// logs according to the retention policy in `cleanup_old_logs`.
///
/// # Arguments
/// * `pool` - PostgreSQL connection pool (cloned into the spawned task)
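///
/// # Example
/// A minimal startup sketch (assuming an already-constructed `pool: PgPool`),
/// typically wired up alongside `Logger::init_global`:
/// ```ignore
/// Logger::init_global(pool.clone());
/// start_log_cleanup_task(pool.clone());
/// ```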
pub fn start_log_cleanup_task(pool: PgPool) {
    tokio::spawn(async move {
        // Run cleanup every 6 hours (balances database load with timely cleanup)
        let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(6 * 3600));

        info!("Started log cleanup background task (runs every 6 hours)");

        loop {
            interval.tick().await;

            match cleanup_old_logs(&pool).await {
                Ok(deleted) => {
                    if deleted > 0 {
                        info!("Log cleanup: deleted {} old log entries", deleted);
                    } else {
                        info!("Log cleanup: no old logs to delete");
                    }
                }
                Err(e) => {
                    error!("Failed to cleanup old logs: {}", e);
                }
            }
        }
    });
}
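
// A minimal sanity-check sketch for the pure helpers in this module; the batching
// and database paths are not exercised here (they would need a Tokio runtime and a
// live PgPool).
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn level_and_type_strings_match_database_values() {
        assert_eq!(LogLevel::Info.as_str(), "info");
        assert_eq!(LogLevel::Warn.as_str(), "warn");
        assert_eq!(LogLevel::Error.as_str(), "error");

        assert_eq!(LogType::SyncJob.as_str(), "sync_job");
        assert_eq!(LogType::Jetstream.as_str(), "jetstream");
        assert_eq!(LogType::System.as_str(), "system");
    }
}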