//! Batched logging system for high-throughput database log persistence.
//!
//! This module provides an async, batched logging system that:
//! - Queues log entries in memory using an unbounded channel
//! - Flushes to PostgreSQL in batches (every 5 seconds or 100 entries)
//! - Maintains a global singleton logger instance
//! - Supports different log types (sync jobs, Jetstream events, system logs)
//! - Automatically cleans up old logs (1 day for Jetstream, 7 days for jobs)
//!
//! The batching approach significantly reduces database load during high-throughput
//! scenarios like Jetstream event processing.

use chrono::Utc;
use serde_json::Value;
use sqlx::PgPool;
use std::sync::OnceLock;
use tokio::sync::mpsc;
use tokio::time::{Duration, interval};
use tracing::{error, info, warn};
use uuid::Uuid;

/// Log severity levels for structured logging.
#[derive(Debug, Clone)]
pub enum LogLevel {
    Info,
    Warn,
    Error,
}

impl LogLevel {
    /// Returns the string representation of the log level.
    pub fn as_str(&self) -> &'static str {
        match self {
            LogLevel::Info => "info",
            LogLevel::Warn => "warn",
            LogLevel::Error => "error",
        }
    }
}

/// Categories of log entries for filtering and organization.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub enum LogType {
    /// Background sync job logs (user-initiated collection sync)
    SyncJob,
    /// Real-time Jetstream event processing logs
    Jetstream,
    /// System-level operational logs
    System,
}

impl LogType {
    /// Returns the string representation of the log type.
    pub fn as_str(&self) -> &'static str {
        match self {
            LogType::SyncJob => "sync_job",
            LogType::Jetstream => "jetstream",
            LogType::System => "system",
        }
    }
}

/// Global singleton logger instance, initialized once at application startup.
static GLOBAL_LOGGER: OnceLock<Logger> = OnceLock::new();

/// Internal representation of a log entry pending database insertion.
///
/// These entries are queued in memory and flushed in batches to reduce
/// database round-trips and improve throughput.
#[derive(Debug, Clone)]
struct QueuedLogEntry {
    log_type: String,
    job_id: Option<Uuid>,
    user_did: Option<String>,
    slice_uri: Option<String>,
    level: String,
    message: String,
    metadata: Option<Value>,
    created_at: chrono::DateTime<chrono::Utc>,
}

/// Batched logger that queues log entries and flushes them periodically.
///
/// This logger uses an unbounded channel to queue log entries, which are then
/// flushed to the database by a background worker. The worker flushes when:
/// - 100 entries accumulate (batch size threshold)
/// - 5 seconds elapse (time-based threshold)
/// - The channel is closed (graceful shutdown)
///
/// Logs are also immediately written to stdout via the `tracing` crate for
/// real-time visibility during development and debugging.
#[derive(Clone)]
pub struct Logger {
    sender: mpsc::UnboundedSender<QueuedLogEntry>,
}

impl Logger {
    /// Creates a new batched logger and spawns the background worker task.
    ///
    /// The background worker runs for the lifetime of the application, processing
    /// the log queue and flushing to the database.
    ///
    /// # Arguments
    /// * `pool` - PostgreSQL connection pool for database writes
    pub fn new(pool: PgPool) -> Self {
        let (sender, receiver) = mpsc::unbounded_channel();

        // Spawn background worker that will run for the lifetime of the application
        tokio::spawn(Self::background_worker(receiver, pool));

        Self { sender }
    }

    /// Initializes the global logger singleton.
    ///
    /// This should be called once at application startup before any logging occurs.
    /// Subsequent calls will be ignored with a warning.
    ///
    /// # Arguments
    /// * `pool` - PostgreSQL connection pool for database writes
    ///
    /// # Example
    /// ```ignore
    /// Logger::init_global(pool.clone());
    /// let logger = Logger::global();
    /// logger.log_jetstream(LogLevel::Info, "Started", None);
    /// ```
    pub fn init_global(pool: PgPool) {
        let logger = Self::new(pool);
        if GLOBAL_LOGGER.set(logger).is_err() {
            warn!("Global logger was already initialized");
        }
    }

    /// Returns a reference to the global logger instance.
    ///
    /// # Panics
    /// Panics if called before `init_global()`. Ensure the logger is initialized
    /// during application startup.
    pub fn global() -> &'static Logger {
        GLOBAL_LOGGER
            .get()
            .expect("Global logger not initialized - call Logger::init_global() first")
    }

    /// Logs a sync job message, queuing it for batched database insertion.
    ///
    /// Sync job logs track the progress of background synchronization tasks where
    /// users fetch their collection data from their PDS.
    ///
    /// # Arguments
    /// * `job_id` - Unique identifier for the sync job
    /// * `user_did` - Decentralized identifier of the user being synced
    /// * `slice_uri` - AT-URI of the slice being synchronized
    /// * `level` - Log severity level
    /// * `message` - Human-readable log message
    /// * `metadata` - Optional structured metadata (JSON)
    ///
    /// # Behavior
    /// - Immediately writes to stdout via `tracing` for real-time visibility
    /// - Queues the entry for batch insertion to the database
    /// - Send failures are silently ignored (if channel is closed)
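    ///
    /// # Example
    /// A minimal sketch; the job id, DID, slice URI, and metadata values below
    /// are illustrative placeholders, not values from this codebase:
    /// ```ignore
    /// use serde_json::json;
    ///
    /// let job_id = Uuid::new_v4();
    /// Logger::global().log_sync_job(
    ///     job_id,
    ///     "did:plc:example123",
    ///     "at://did:plc:example123/app.example.slice/abc",
    ///     LogLevel::Info,
    ///     "Fetched 250 records",
    ///     Some(json!({ "records": 250 })),
    /// );
    /// ```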
    pub fn log_sync_job(
        &self,
        job_id: Uuid,
        user_did: &str,
        slice_uri: &str,
        level: LogLevel,
        message: &str,
        metadata: Option<Value>,
    ) {
        let entry = QueuedLogEntry {
            log_type: LogType::SyncJob.as_str().to_string(),
            job_id: Some(job_id),
            user_did: Some(user_did.to_string()),
            slice_uri: Some(slice_uri.to_string()),
            level: level.as_str().to_string(),
            message: message.to_string(),
            metadata,
            created_at: Utc::now(),
        };

        // Write to stdout immediately for real-time monitoring and debugging
        match level {
            LogLevel::Info => info!("[sync_job] {}", message),
            LogLevel::Warn => warn!("[sync_job] {}", message),
            LogLevel::Error => error!("[sync_job] {}", message),
        }

        // Queue for batch database insertion (ignore send errors if channel closed)
        let _ = self.sender.send(entry);
    }

    /// Logs a Jetstream message without slice context.
    ///
    /// This is a convenience wrapper around `log_jetstream_with_slice` for
    /// global Jetstream events (e.g., connection status, errors).
    ///
    /// # Arguments
    /// * `level` - Log severity level
    /// * `message` - Human-readable log message
    /// * `metadata` - Optional structured metadata (JSON)
    pub fn log_jetstream(&self, level: LogLevel, message: &str, metadata: Option<Value>) {
        self.log_jetstream_with_slice(level, message, metadata, None);
    }

    /// Logs a Jetstream message with optional slice context.
    ///
    /// Jetstream logs track real-time event processing from the AT Protocol firehose.
    /// Including `slice_uri` associates the log with a specific slice's event processing.
    ///
    /// # Arguments
    /// * `level` - Log severity level
    /// * `message` - Human-readable log message
    /// * `metadata` - Optional structured metadata (JSON)
    /// * `slice_uri` - Optional AT-URI to associate this log with a specific slice
    ///
    /// # Behavior
    /// - Immediately writes to stdout via `tracing` for real-time visibility
    /// - Queues the entry for batch insertion to the database
    /// - Send failures are silently ignored (if channel is closed)
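    ///
    /// # Example
    /// A minimal sketch; the collection name and slice URI are illustrative
    /// placeholders:
    /// ```ignore
    /// use serde_json::json;
    ///
    /// Logger::global().log_jetstream_with_slice(
    ///     LogLevel::Warn,
    ///     "Dropped event: unknown collection",
    ///     Some(json!({ "collection": "app.example.feed.post" })),
    ///     Some("at://did:plc:example123/app.example.slice/abc"),
    /// );
    /// ```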
    pub fn log_jetstream_with_slice(
        &self,
        level: LogLevel,
        message: &str,
        metadata: Option<Value>,
        slice_uri: Option<&str>,
    ) {
        let entry = QueuedLogEntry {
            log_type: LogType::Jetstream.as_str().to_string(),
            job_id: None,
            user_did: None,
            slice_uri: slice_uri.map(|s| s.to_string()),
            level: level.as_str().to_string(),
            message: message.to_string(),
            metadata,
            created_at: Utc::now(),
        };

        // Write to stdout immediately for real-time monitoring and debugging
        match level {
            LogLevel::Info => info!("[jetstream] {}", message),
            LogLevel::Warn => warn!("[jetstream] {}", message),
            LogLevel::Error => error!("[jetstream] {}", message),
        }

        // Queue for batch database insertion (ignore send errors if channel closed)
        let _ = self.sender.send(entry);
    }

    /// Background worker that processes the log queue and flushes to the database.
    ///
    /// This worker runs in a dedicated tokio task and flushes batches when:
    /// - 100 entries accumulate (to prevent unbounded memory growth)
    /// - 5 seconds elapse (to ensure timely persistence)
    /// - The channel closes (graceful shutdown)
    ///
    /// # Arguments
    /// * `receiver` - Channel receiver for queued log entries
    /// * `pool` - PostgreSQL connection pool for batch inserts
    async fn background_worker(
        mut receiver: mpsc::UnboundedReceiver<QueuedLogEntry>,
        pool: PgPool,
    ) {
        let mut batch = Vec::new();
        // Periodic flush to ensure logs are persisted even during low-volume periods
        let mut flush_interval = interval(Duration::from_secs(5));

        info!("Started batched logging background worker");

        loop {
            tokio::select! {
                // Receive log entries from the queue; `recv()` returns `None`
                // once the channel is closed and drained.
                maybe_entry = receiver.recv() => {
                    match maybe_entry {
                        Some(entry) => {
                            batch.push(entry);

                            // Flush when batch reaches size threshold to prevent memory buildup
                            if batch.len() >= 100 {
                                Self::flush_batch(&pool, &mut batch).await;
                            }
                        }
                        // Channel closed (shutdown): flush remaining logs and exit
                        // gracefully. Handled here rather than in a `select!` `else`
                        // arm, because the interval branch below is always enabled,
                        // so an `else` arm would never run.
                        None => {
                            if !batch.is_empty() {
                                Self::flush_batch(&pool, &mut batch).await;
                            }
                            break;
                        }
                    }
                }

                // Time-based flush to ensure logs are persisted within 5 seconds
                _ = flush_interval.tick() => {
                    if !batch.is_empty() {
                        Self::flush_batch(&pool, &mut batch).await;
                    }
                }
            }
        }

        info!("Batched logging background worker shut down");
    }

    /// Flushes a batch of log entries to the database using a bulk INSERT.
    ///
    /// This method dynamically constructs a multi-value INSERT statement to minimize
    /// database round-trips. Each log entry contributes 8 parameters (fields).
    ///
    /// # Arguments
    /// * `pool` - PostgreSQL connection pool
    /// * `batch` - Mutable vector of queued log entries (cleared after flush)
    ///
    /// # Performance
    /// - Warns if a batch takes >100ms to insert (potential database issue)
    /// - Logs successful flushes with timing information
    /// - On error, logs are lost but the system continues (fail-open)
    async fn flush_batch(pool: &PgPool, batch: &mut Vec<QueuedLogEntry>) {
        if batch.is_empty() {
            return;
        }

        let batch_size = batch.len();
        let start = std::time::Instant::now();

        // Build bulk INSERT query dynamically based on batch size
        let mut query = String::from(
            "INSERT INTO logs (log_type, job_id, user_did, slice_uri, level, message, metadata, created_at) VALUES ",
        );

        // Add placeholders for each record (8 parameters per entry)
        for i in 0..batch_size {
            if i > 0 {
                query.push_str(", ");
            }
            // Calculate base parameter index (8 fields per log entry, 1-indexed)
            let base = i * 8 + 1;
            query.push_str(&format!(
                "(${}, ${}, ${}, ${}, ${}, ${}, ${}, ${})",
                base,
                base + 1,
                base + 2,
                base + 3,
                base + 4,
                base + 5,
                base + 6,
                base + 7
            ));
        }

        // Bind all parameters in order (log_type, job_id, user_did, slice_uri, level, message, metadata, created_at)
        let mut sqlx_query = sqlx::query(&query);
        for entry in batch.iter() {
            sqlx_query = sqlx_query
                .bind(&entry.log_type)
                .bind(entry.job_id)
                .bind(&entry.user_did)
                .bind(&entry.slice_uri)
                .bind(&entry.level)
                .bind(&entry.message)
                .bind(&entry.metadata)
                .bind(entry.created_at);
        }

        // Execute the batch insert and handle errors gracefully
        match sqlx_query.execute(pool).await {
            Ok(_) => {
                let elapsed = start.elapsed();
                // Warn about slow inserts that may indicate database performance issues
                if elapsed.as_millis() > 100 {
                    warn!(
                        "Slow log batch insert: {} entries in {:?}",
                        batch_size, elapsed
                    );
                } else {
                    info!("Flushed {} log entries in {:?}", batch_size, elapsed);
                }
            }
            Err(e) => {
                error!("Failed to flush log batch of {} entries: {}", batch_size, e);
                // Fail-open: logs are lost but the system continues to prevent cascading failures
            }
        }

        batch.clear();
    }
}

/// Represents a log entry retrieved from the database.
///
/// This struct is used for query results and API responses. Field names are
/// converted to camelCase for JSON serialization.
#[derive(Debug, serde::Serialize, sqlx::FromRow)]
#[serde(rename_all = "camelCase")]
pub struct LogEntry {
    pub id: i64,
    pub created_at: chrono::DateTime<chrono::Utc>,
    pub log_type: String,
    pub job_id: Option<Uuid>,
    pub user_did: Option<String>,
    pub slice_uri: Option<String>,
    pub level: String,
    pub message: String,
    pub metadata: Option<serde_json::Value>,
}

/// Retrieves logs for a specific sync job, ordered chronologically.
///
/// # Arguments
/// * `pool` - PostgreSQL connection pool
/// * `job_id` - Unique identifier of the sync job
/// * `limit` - Optional maximum number of logs to return (default: 100)
///
/// # Returns
/// * `Ok(Vec<LogEntry>)` - List of log entries ordered by creation time (ASC)
/// * `Err(sqlx::Error)` - Database query error
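///
/// # Example
/// A minimal sketch, assuming an existing `PgPool` and a known `job_id`:
/// ```ignore
/// let logs = get_sync_job_logs(&pool, job_id, Some(50)).await?;
/// for log in &logs {
///     println!("[{}] {}", log.level, log.message);
/// }
/// ```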
pub async fn get_sync_job_logs(
    pool: &PgPool,
    job_id: Uuid,
    limit: Option<i64>,
) -> Result<Vec<LogEntry>, sqlx::Error> {
    let limit = limit.unwrap_or(100);

    let rows = sqlx::query_as!(
        LogEntry,
        r#"
        SELECT id, created_at, log_type, job_id, user_did, slice_uri, level, message, metadata
        FROM logs
        WHERE log_type = 'sync_job' AND job_id = $1
        ORDER BY created_at ASC
        LIMIT $2
        "#,
        job_id,
        limit
    )
    .fetch_all(pool)
    .await?;

    Ok(rows)
}

/// Retrieves Jetstream logs, optionally filtered by slice URI.
///
/// When a slice filter is provided, returns both slice-specific logs AND global
/// connection logs (where slice_uri is NULL). This ensures connection status logs
/// are visible when viewing slice-specific logs.
///
/// # Arguments
/// * `pool` - PostgreSQL connection pool
/// * `slice_filter` - Optional slice URI to filter logs
/// * `limit` - Optional maximum number of logs to return (default: 100)
///
/// # Returns
/// * `Ok(Vec<LogEntry>)` - List of log entries ordered by creation time (DESC)
/// * `Err(sqlx::Error)` - Database query error
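///
/// # Example
/// A minimal sketch; the slice URI is an illustrative placeholder:
/// ```ignore
/// // Slice-specific logs plus global connection logs:
/// let logs = get_jetstream_logs(
///     &pool,
///     Some("at://did:plc:example123/app.example.slice/abc"),
///     Some(200),
/// )
/// .await?;
///
/// // All Jetstream logs across all slices, most recent first:
/// let all_logs = get_jetstream_logs(&pool, None, None).await?;
/// ```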
pub async fn get_jetstream_logs(
    pool: &PgPool,
    slice_filter: Option<&str>,
    limit: Option<i64>,
) -> Result<Vec<LogEntry>, sqlx::Error> {
    let limit = limit.unwrap_or(100);

    let rows = if let Some(slice_uri) = slice_filter {
        tracing::info!("Querying jetstream logs with slice filter: {}", slice_uri);
        // Include both slice-specific logs and global connection logs for context
        let results = sqlx::query_as!(
            LogEntry,
            r#"
            SELECT id, created_at, log_type, job_id, user_did, slice_uri, level, message, metadata
            FROM logs
            WHERE log_type = 'jetstream'
              AND (slice_uri = $1 OR slice_uri IS NULL)
            ORDER BY created_at DESC
            LIMIT $2
            "#,
            slice_uri,
            limit
        )
        .fetch_all(pool)
        .await?;

        tracing::info!(
            "Found {} jetstream logs for slice {}",
            results.len(),
            slice_uri
        );
        results
    } else {
        // No filter provided, return all Jetstream logs across all slices
        sqlx::query_as!(
            LogEntry,
            r#"
            SELECT id, created_at, log_type, job_id, user_did, slice_uri, level, message, metadata
            FROM logs
            WHERE log_type = 'jetstream'
            ORDER BY created_at DESC
            LIMIT $1
            "#,
            limit
        )
        .fetch_all(pool)
        .await?
    };

    Ok(rows)
}

/// Retrieves all logs associated with a specific slice URI.
///
/// This includes both sync job logs and Jetstream logs for the slice.
///
/// # Arguments
/// * `pool` - PostgreSQL connection pool
/// * `slice_uri` - AT-URI of the slice
/// * `log_type_filter` - Optional log type filter ("sync_job", "jetstream", "system")
/// * `limit` - Optional maximum number of logs to return (default: 100)
///
/// # Returns
/// * `Ok(Vec<LogEntry>)` - List of log entries ordered by creation time (DESC)
/// * `Err(sqlx::Error)` - Database query error
#[allow(dead_code)]
pub async fn get_slice_logs(
    pool: &PgPool,
    slice_uri: &str,
    log_type_filter: Option<&str>,
    limit: Option<i64>,
) -> Result<Vec<LogEntry>, sqlx::Error> {
    let limit = limit.unwrap_or(100);

    let rows = if let Some(log_type) = log_type_filter {
        sqlx::query_as!(
            LogEntry,
            r#"
            SELECT id, created_at, log_type, job_id, user_did, slice_uri, level, message, metadata
            FROM logs
            WHERE slice_uri = $1 AND log_type = $2
            ORDER BY created_at DESC
            LIMIT $3
            "#,
            slice_uri,
            log_type,
            limit
        )
        .fetch_all(pool)
        .await?
    } else {
        sqlx::query_as!(
            LogEntry,
            r#"
            SELECT id, created_at, log_type, job_id, user_did, slice_uri, level, message, metadata
            FROM logs
            WHERE slice_uri = $1
            ORDER BY created_at DESC
            LIMIT $2
            "#,
            slice_uri,
            limit
        )
        .fetch_all(pool)
        .await?
    };

    Ok(rows)
}

/// Deletes old log entries to prevent unbounded database growth.
///
/// Retention policy:
/// - Jetstream logs: 1 day (high volume, primarily for real-time debugging)
/// - Sync job logs: 7 days (lower volume, useful for historical analysis)
/// - System logs: 7 days
///
/// # Arguments
/// * `pool` - PostgreSQL connection pool
///
/// # Returns
/// * `Ok(u64)` - Number of deleted log entries
/// * `Err(sqlx::Error)` - Database query error
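///
/// # Example
/// A minimal sketch for a one-off manual cleanup, assuming an existing `PgPool`:
/// ```ignore
/// let deleted = cleanup_old_logs(&pool).await?;
/// println!("Deleted {} expired log entries", deleted);
/// ```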
pub async fn cleanup_old_logs(pool: &PgPool) -> Result<u64, sqlx::Error> {
    let result = sqlx::query!(
        r#"
        DELETE FROM logs
        WHERE
            (log_type = 'jetstream' AND created_at < NOW() - INTERVAL '1 day')
            OR (log_type = 'sync_job' AND created_at < NOW() - INTERVAL '7 days')
            OR (log_type = 'system' AND created_at < NOW() - INTERVAL '7 days')
        "#,
    )
    .execute(pool)
    .await?;

    Ok(result.rows_affected())
}

/// Spawns a background task that periodically cleans up old logs.
///
/// The task runs every 6 hours for the lifetime of the application, deleting
/// logs according to the retention policy in `cleanup_old_logs`.
///
/// # Arguments
/// * `pool` - PostgreSQL connection pool (cloned into the spawned task)
pub fn start_log_cleanup_task(pool: PgPool) {
    tokio::spawn(async move {
        // Run cleanup every 6 hours (balances database load with timely cleanup)
        let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(6 * 3600));

        info!("Started log cleanup background task (runs every 6 hours)");

        loop {
            interval.tick().await;

            match cleanup_old_logs(&pool).await {
                Ok(deleted) => {
                    if deleted > 0 {
                        info!("Log cleanup: deleted {} old log entries", deleted);
                    } else {
                        info!("Log cleanup: no old logs to delete");
                    }
                }
                Err(e) => {
                    error!("Failed to clean up old logs: {}", e);
                }
            }
        }
    });
}
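
// A minimal end-to-end smoke test for the wiring above. This is a sketch: it
// assumes the standard sqlx `DATABASE_URL` environment variable points at a
// reachable PostgreSQL instance with the `logs` table, so it is `#[ignore]`d
// by default and must be run explicitly.
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    #[ignore]
    async fn batched_logger_flushes_to_database() {
        let pool = PgPool::connect(&std::env::var("DATABASE_URL").unwrap())
            .await
            .unwrap();

        Logger::init_global(pool.clone());
        Logger::global().log_jetstream(LogLevel::Info, "smoke test entry", None);

        // The worker flushes at least every 5 seconds; wait one full cycle.
        tokio::time::sleep(Duration::from_secs(6)).await;

        let logs = get_jetstream_logs(&pool, None, Some(10)).await.unwrap();
        assert!(logs.iter().any(|entry| entry.message == "smoke test entry"));
    }
}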