Highly ambitious ATProtocol AppView service and SDKs
//! Batched logging system for high-throughput database log persistence.
//!
//! This module provides an async, batched logging system that:
//! - Queues log entries in memory using an unbounded channel
//! - Flushes to PostgreSQL in batches (every 5 seconds or 100 entries)
//! - Maintains a global singleton logger instance
//! - Supports different log types (sync jobs, Jetstream events, system logs)
//! - Automatically cleans up old logs (1 day for Jetstream, 7 days for jobs)
//!
//! The batching approach significantly reduces database load during high-throughput
//! scenarios like Jetstream event processing.

use chrono::Utc;
use serde_json::Value;
use sqlx::PgPool;
use std::sync::OnceLock;
use tokio::sync::mpsc;
use tokio::time::{Duration, interval};
use tracing::{error, info, warn};
use uuid::Uuid;

/// Log severity levels for structured logging.
#[derive(Debug, Clone)]
pub enum LogLevel {
    Info,
    Warn,
    Error,
}

impl LogLevel {
    /// Returns the string representation of the log level.
    pub fn as_str(&self) -> &'static str {
        match self {
            LogLevel::Info => "info",
            LogLevel::Warn => "warn",
            LogLevel::Error => "error",
        }
    }
}

/// Categories of log entries for filtering and organization.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub enum LogType {
    /// Background sync job logs (user-initiated collection sync)
    SyncJob,
    /// Real-time Jetstream event processing logs
    Jetstream,
    /// System-level operational logs
    System,
}

impl LogType {
    /// Returns the string representation of the log type.
    pub fn as_str(&self) -> &'static str {
        match self {
            LogType::SyncJob => "sync_job",
            LogType::Jetstream => "jetstream",
            LogType::System => "system",
        }
    }
}

/// Global singleton logger instance, initialized once at application startup.
static GLOBAL_LOGGER: OnceLock<Logger> = OnceLock::new();

/// Internal representation of a log entry pending database insertion.
///
/// These entries are queued in memory and flushed in batches to reduce
/// database round-trips and improve throughput.
#[derive(Debug, Clone)]
struct QueuedLogEntry {
    log_type: String,
    job_id: Option<Uuid>,
    user_did: Option<String>,
    slice_uri: Option<String>,
    level: String,
    message: String,
    metadata: Option<Value>,
    created_at: chrono::DateTime<chrono::Utc>,
}

/// Batched logger that queues log entries and flushes them periodically.
///
/// This logger uses an unbounded channel to queue log entries, which are then
/// flushed to the database by a background worker. The worker flushes when:
/// - 100 entries accumulate (batch size threshold)
/// - 5 seconds elapse (time-based threshold)
/// - The channel is closed (graceful shutdown)
///
/// Logs are also immediately written to stdout via the `tracing` crate for
/// real-time visibility during development and debugging.
#[derive(Clone)]
pub struct Logger {
    sender: mpsc::UnboundedSender<QueuedLogEntry>,
}

impl Logger {
    /// Creates a new batched logger and spawns the background worker task.
    ///
    /// The background worker runs for the lifetime of the application, processing
    /// the log queue and flushing to the database.
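    ///
    /// # Example
    /// A minimal sketch of direct construction; it assumes `pool` is an
    /// already-connected `PgPool`. Most callers use `init_global` instead.
    /// ```ignore
    /// let logger = Logger::new(pool.clone());
    /// logger.log_jetstream(LogLevel::Info, "Worker started", None);
    /// ```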
    ///
    /// # Arguments
    /// * `pool` - PostgreSQL connection pool for database writes
    pub fn new(pool: PgPool) -> Self {
        let (sender, receiver) = mpsc::unbounded_channel();

        // Spawn background worker that will run for the lifetime of the application
        tokio::spawn(Self::background_worker(receiver, pool));

        Self { sender }
    }

    /// Initializes the global logger singleton.
    ///
    /// This should be called once at application startup before any logging occurs.
    /// Subsequent calls will be ignored with a warning.
    ///
    /// # Arguments
    /// * `pool` - PostgreSQL connection pool for database writes
    ///
    /// # Example
    /// ```ignore
    /// Logger::init_global(pool.clone());
    /// let logger = Logger::global();
    /// logger.log_jetstream(LogLevel::Info, "Started", None);
    /// ```
    pub fn init_global(pool: PgPool) {
        let logger = Self::new(pool);
        if GLOBAL_LOGGER.set(logger).is_err() {
            warn!("Global logger was already initialized");
        }
    }

    /// Returns a reference to the global logger instance.
    ///
    /// # Panics
    /// Panics if called before `init_global()`. Ensure the logger is initialized
    /// during application startup.
    pub fn global() -> &'static Logger {
        GLOBAL_LOGGER
            .get()
            .expect("Global logger not initialized - call Logger::init_global() first")
    }

    /// Logs a sync job message, queuing it for batched database insertion.
    ///
    /// Sync job logs track the progress of background synchronization tasks where
    /// users fetch their collection data from their PDS.
    ///
    /// # Arguments
    /// * `job_id` - Unique identifier for the sync job
    /// * `user_did` - Decentralized identifier of the user being synced
    /// * `slice_uri` - AT-URI of the slice being synchronized
    /// * `level` - Log severity level
    /// * `message` - Human-readable log message
    /// * `metadata` - Optional structured metadata (JSON)
    ///
    /// # Behavior
    /// - Immediately writes to stdout via `tracing` for real-time visibility
    /// - Queues the entry for batch insertion to the database
    /// - Send failures are silently ignored (if channel is closed)
    pub fn log_sync_job(
        &self,
        job_id: Uuid,
        user_did: &str,
        slice_uri: &str,
        level: LogLevel,
        message: &str,
        metadata: Option<Value>,
    ) {
        let entry = QueuedLogEntry {
            log_type: LogType::SyncJob.as_str().to_string(),
            job_id: Some(job_id),
            user_did: Some(user_did.to_string()),
            slice_uri: Some(slice_uri.to_string()),
            level: level.as_str().to_string(),
            message: message.to_string(),
            metadata,
            created_at: Utc::now(),
        };

        // Write to stdout immediately for real-time monitoring and debugging
        match level {
            LogLevel::Info => info!("[sync_job] {}", message),
            LogLevel::Warn => warn!("[sync_job] {}", message),
            LogLevel::Error => error!("[sync_job] {}", message),
        }

        // Queue for batch database insertion (ignore send errors if channel closed)
        let _ = self.sender.send(entry);
    }

    /// Logs a Jetstream message without slice context.
    ///
    /// This is a convenience wrapper around `log_jetstream_with_slice` for
    /// global Jetstream events (e.g., connection status, errors).
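    ///
    /// # Example
    /// A minimal sketch; it assumes the global logger has already been set up
    /// via `Logger::init_global`.
    /// ```ignore
    /// Logger::global().log_jetstream(LogLevel::Warn, "Jetstream connection lost, reconnecting", None);
    /// ```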
    ///
    /// # Arguments
    /// * `level` - Log severity level
    /// * `message` - Human-readable log message
    /// * `metadata` - Optional structured metadata (JSON)
    pub fn log_jetstream(&self, level: LogLevel, message: &str, metadata: Option<Value>) {
        self.log_jetstream_with_slice(level, message, metadata, None);
    }

    /// Logs a Jetstream message with optional slice context.
    ///
    /// Jetstream logs track real-time event processing from the AT Protocol firehose.
    /// Including `slice_uri` associates the log with a specific slice's event processing.
    ///
    /// # Arguments
    /// * `level` - Log severity level
    /// * `message` - Human-readable log message
    /// * `metadata` - Optional structured metadata (JSON)
    /// * `slice_uri` - Optional AT-URI to associate this log with a specific slice
    ///
    /// # Behavior
    /// - Immediately writes to stdout via `tracing` for real-time visibility
    /// - Queues the entry for batch insertion to the database
    /// - Send failures are silently ignored (if channel is closed)
    pub fn log_jetstream_with_slice(
        &self,
        level: LogLevel,
        message: &str,
        metadata: Option<Value>,
        slice_uri: Option<&str>,
    ) {
        let entry = QueuedLogEntry {
            log_type: LogType::Jetstream.as_str().to_string(),
            job_id: None,
            user_did: None,
            slice_uri: slice_uri.map(|s| s.to_string()),
            level: level.as_str().to_string(),
            message: message.to_string(),
            metadata,
            created_at: Utc::now(),
        };

        // Write to stdout immediately for real-time monitoring and debugging
        match level {
            LogLevel::Info => info!("[jetstream] {}", message),
            LogLevel::Warn => warn!("[jetstream] {}", message),
            LogLevel::Error => error!("[jetstream] {}", message),
        }

        // Queue for batch database insertion (ignore send errors if channel closed)
        let _ = self.sender.send(entry);
    }

    /// Background worker that processes the log queue and flushes to the database.
    ///
    /// This worker runs in a dedicated tokio task and flushes batches when:
    /// - 100 entries accumulate (to prevent unbounded memory growth)
    /// - 5 seconds elapse (to ensure timely persistence)
    /// - The channel closes (graceful shutdown)
    ///
    /// # Arguments
    /// * `receiver` - Channel receiver for queued log entries
    /// * `pool` - PostgreSQL connection pool for batch inserts
    async fn background_worker(
        mut receiver: mpsc::UnboundedReceiver<QueuedLogEntry>,
        pool: PgPool,
    ) {
        let mut batch = Vec::new();
        // Periodic flush to ensure logs are persisted even during low-volume periods
        let mut flush_interval = interval(Duration::from_secs(5));

        info!("Started batched logging background worker");

        loop {
            tokio::select! {
                // Receive log entries from the queue
                Some(entry) = receiver.recv() => {
                    batch.push(entry);

                    // Flush when batch reaches size threshold to prevent memory buildup
                    if batch.len() >= 100 {
                        Self::flush_batch(&pool, &mut batch).await;
                    }
                }

                // Time-based flush to ensure logs are persisted within 5 seconds
                _ = flush_interval.tick() => {
                    if !batch.is_empty() {
                        Self::flush_batch(&pool, &mut batch).await;
                    }
                }

                // Channel closed (shutdown), flush remaining logs and exit gracefully
                else => {
                    if !batch.is_empty() {
                        Self::flush_batch(&pool, &mut batch).await;
                    }
                    break;
                }
            }
        }

        info!("Batched logging background worker shut down");
    }

    /// Flushes a batch of log entries to the database using a bulk INSERT.
    ///
    /// This method dynamically constructs a multi-value INSERT statement to minimize
    /// database round-trips. Each log entry contributes 8 parameters (fields).
    ///
    /// # Arguments
    /// * `pool` - PostgreSQL connection pool
    /// * `batch` - Mutable vector of queued log entries (cleared after flush)
    ///
    /// # Performance
    /// - Warns if a batch takes >100ms to insert (potential database issue)
    /// - Logs successful flushes with timing information
    /// - On error, logs are lost but the system continues (fail-open)
    async fn flush_batch(pool: &PgPool, batch: &mut Vec<QueuedLogEntry>) {
        if batch.is_empty() {
            return;
        }

        let batch_size = batch.len();
        let start = std::time::Instant::now();

        // Build bulk INSERT query dynamically based on batch size
        let mut query = String::from(
            "INSERT INTO logs (log_type, job_id, user_did, slice_uri, level, message, metadata, created_at) VALUES ",
        );

        // Add placeholders for each record (8 parameters per entry)
        for i in 0..batch_size {
            if i > 0 {
                query.push_str(", ");
            }
            // Calculate base parameter index (8 fields per log entry, 1-indexed)
            let base = i * 8 + 1;
            query.push_str(&format!(
                "(${}, ${}, ${}, ${}, ${}, ${}, ${}, ${})",
                base,
                base + 1,
                base + 2,
                base + 3,
                base + 4,
                base + 5,
                base + 6,
                base + 7
            ));
        }

        // Bind all parameters in order (log_type, job_id, user_did, slice_uri, level, message, metadata, created_at)
        let mut sqlx_query = sqlx::query(&query);
        for entry in batch.iter() {
            sqlx_query = sqlx_query
                .bind(&entry.log_type)
                .bind(entry.job_id)
                .bind(&entry.user_did)
                .bind(&entry.slice_uri)
                .bind(&entry.level)
                .bind(&entry.message)
                .bind(&entry.metadata)
                .bind(entry.created_at);
        }

        // Execute the batch insert and handle errors gracefully
        match sqlx_query.execute(pool).await {
            Ok(_) => {
                let elapsed = start.elapsed();
                // Warn about slow inserts that may indicate database performance issues
                if elapsed.as_millis() > 100 {
                    warn!(
                        "Slow log batch insert: {} entries in {:?}",
                        batch_size, elapsed
                    );
                } else {
                    info!("Flushed {} log entries in {:?}", batch_size, elapsed);
                }
            }
            Err(e) => {
                error!("Failed to flush log batch of {} entries: {}", batch_size, e);
                // Fail-open: logs are lost but the system continues to prevent cascading failures
            }
        }

        batch.clear();
    }
}

/// Represents a log entry retrieved from the database.
///
/// This struct is used for query results and API responses. Field names are
/// converted to camelCase for JSON serialization.
#[derive(Debug, Clone, serde::Serialize, sqlx::FromRow)]
#[serde(rename_all = "camelCase")]
pub struct LogEntry {
    pub id: i64,
    pub created_at: chrono::DateTime<chrono::Utc>,
    pub log_type: String,
    pub job_id: Option<Uuid>,
    pub user_did: Option<String>,
    pub slice_uri: Option<String>,
    pub level: String,
    pub message: String,
    pub metadata: Option<serde_json::Value>,
}

/// Retrieves logs for a specific sync job, ordered chronologically.
///
/// # Arguments
/// * `pool` - PostgreSQL connection pool
/// * `job_id` - Unique identifier of the sync job
/// * `limit` - Optional maximum number of logs to return (default: 100)
///
/// # Returns
/// * `Ok(Vec<LogEntry>)` - List of log entries ordered by creation time (ASC)
/// * `Err(sqlx::Error)` - Database query error
pub async fn get_sync_job_logs(
    pool: &PgPool,
    job_id: Uuid,
    limit: Option<i64>,
) -> Result<Vec<LogEntry>, sqlx::Error> {
    let limit = limit.unwrap_or(100);

    let rows = sqlx::query_as!(
        LogEntry,
        r#"
        SELECT id, created_at, log_type, job_id, user_did, slice_uri, level, message, metadata
        FROM logs
        WHERE log_type = 'sync_job' AND job_id = $1
        ORDER BY created_at ASC
        LIMIT $2
        "#,
        job_id,
        limit
    )
    .fetch_all(pool)
    .await?;

    Ok(rows)
}

/// Retrieves Jetstream logs, optionally filtered by slice URI.
///
/// When a slice filter is provided, returns both slice-specific logs AND global
/// connection logs (where slice_uri is NULL). This ensures connection status logs
/// are visible when viewing slice-specific logs.
///
/// # Arguments
/// * `pool` - PostgreSQL connection pool
/// * `slice_filter` - Optional slice URI to filter logs
/// * `limit` - Optional maximum number of logs to return (default: 100)
///
/// # Returns
/// * `Ok(Vec<LogEntry>)` - List of log entries ordered by creation time (DESC)
/// * `Err(sqlx::Error)` - Database query error
pub async fn get_jetstream_logs(
    pool: &PgPool,
    slice_filter: Option<&str>,
    limit: Option<i64>,
) -> Result<Vec<LogEntry>, sqlx::Error> {
    let limit = limit.unwrap_or(100);

    let rows = if let Some(slice_uri) = slice_filter {
        // Include both slice-specific logs and global connection logs for context
        sqlx::query_as!(
            LogEntry,
            r#"
            SELECT id, created_at, log_type, job_id, user_did, slice_uri, level, message, metadata
            FROM logs
            WHERE log_type = 'jetstream'
              AND (slice_uri = $1 OR slice_uri IS NULL)
            ORDER BY created_at DESC
            LIMIT $2
            "#,
            slice_uri,
            limit
        )
        .fetch_all(pool)
        .await?
    } else {
        // No filter provided, return all Jetstream logs across all slices
        sqlx::query_as!(
            LogEntry,
            r#"
            SELECT id, created_at, log_type, job_id, user_did, slice_uri, level, message, metadata
            FROM logs
            WHERE log_type = 'jetstream'
            ORDER BY created_at DESC
            LIMIT $1
            "#,
            limit
        )
        .fetch_all(pool)
        .await?
    };

    Ok(rows)
}

/// Retrieves all logs associated with a specific slice URI.
///
/// This includes both sync job logs and Jetstream logs for the slice.
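///
/// # Example
/// A minimal sketch; the slice URI below is a hypothetical placeholder, not a
/// real record.
/// ```ignore
/// let logs = get_slice_logs(
///     &pool,
///     "at://did:plc:example/app.example.slice/123", // hypothetical slice URI
///     None,
///     Some(50),
/// )
/// .await?;
/// for entry in &logs {
///     println!("[{}] {}", entry.level, entry.message);
/// }
/// ```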
502/// 503/// # Arguments 504/// * `pool` - PostgreSQL connection pool 505/// * `slice_uri` - AT-URI of the slice 506/// * `log_type_filter` - Optional log type filter ("sync_job", "jetstream", "system") 507/// * `limit` - Optional maximum number of logs to return (default: 100) 508/// 509/// # Returns 510/// * `Ok(Vec<LogEntry>)` - List of log entries ordered by creation time (DESC) 511/// * `Err(sqlx::Error)` - Database query error 512#[allow(dead_code)] 513pub async fn get_slice_logs( 514 pool: &PgPool, 515 slice_uri: &str, 516 log_type_filter: Option<&str>, 517 limit: Option<i64>, 518) -> Result<Vec<LogEntry>, sqlx::Error> { 519 let limit = limit.unwrap_or(100); 520 521 let rows = if let Some(log_type) = log_type_filter { 522 sqlx::query_as!( 523 LogEntry, 524 r#" 525 SELECT id, created_at, log_type, job_id, user_did, slice_uri, level, message, metadata 526 FROM logs 527 WHERE slice_uri = $1 AND log_type = $2 528 ORDER BY created_at DESC 529 LIMIT $3 530 "#, 531 slice_uri, 532 log_type, 533 limit 534 ) 535 .fetch_all(pool) 536 .await? 537 } else { 538 sqlx::query_as!( 539 LogEntry, 540 r#" 541 SELECT id, created_at, log_type, job_id, user_did, slice_uri, level, message, metadata 542 FROM logs 543 WHERE slice_uri = $1 544 ORDER BY created_at DESC 545 LIMIT $2 546 "#, 547 slice_uri, 548 limit 549 ) 550 .fetch_all(pool) 551 .await? 552 }; 553 554 Ok(rows) 555} 556 557/// Deletes old log entries to prevent unbounded database growth. 558/// 559/// Retention policy: 560/// - Jetstream logs: 1 day (high volume, primarily for real-time debugging) 561/// - Sync job logs: 7 days (lower volume, useful for historical analysis) 562/// - System logs: 7 days 563/// 564/// # Arguments 565/// * `pool` - PostgreSQL connection pool 566/// 567/// # Returns 568/// * `Ok(u64)` - Number of deleted log entries 569/// * `Err(sqlx::Error)` - Database query error 570pub async fn cleanup_old_logs(pool: &PgPool) -> Result<u64, sqlx::Error> { 571 let result = sqlx::query!( 572 r#" 573 DELETE FROM logs 574 WHERE 575 (log_type = 'jetstream' AND created_at < NOW() - INTERVAL '1 day') 576 OR (log_type = 'sync_job' AND created_at < NOW() - INTERVAL '7 days') 577 OR (log_type = 'system' AND created_at < NOW() - INTERVAL '7 days') 578 "#, 579 ) 580 .execute(pool) 581 .await?; 582 583 Ok(result.rows_affected()) 584} 585 586/// Spawns a background task that periodically cleans up old logs. 587/// 588/// The task runs every 6 hours for the lifetime of the application, deleting 589/// logs according to the retention policy in `cleanup_old_logs`. 590/// 591/// # Arguments 592/// * `pool` - PostgreSQL connection pool (cloned into the spawned task) 593pub fn start_log_cleanup_task(pool: PgPool) { 594 tokio::spawn(async move { 595 // Run cleanup every 6 hours (balances database load with timely cleanup) 596 let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(6 * 3600)); 597 598 info!("Started log cleanup background task (runs every 6 hours)"); 599 600 loop { 601 interval.tick().await; 602 603 match cleanup_old_logs(&pool).await { 604 Ok(deleted) => { 605 if deleted > 0 { 606 info!("Log cleanup: deleted {} old log entries", deleted); 607 } else { 608 info!("Log cleanup: no old logs to delete"); 609 } 610 } 611 Err(e) => { 612 error!("Failed to cleanup old logs: {}", e); 613 } 614 } 615 } 616 }); 617}