//! Batched logging system for high-throughput database log persistence.
//!
//! This module provides an async, batched logging system that:
//! - Queues log entries in memory using an unbounded channel
//! - Flushes to PostgreSQL in batches (every 5 seconds or 100 entries)
//! - Maintains a global singleton logger instance
//! - Supports different log types (sync jobs, Jetstream events, system logs)
//! - Automatically cleans up old logs (1 day for Jetstream, 7 days for jobs)
//!
//! The batching approach significantly reduces database load during high-throughput
//! scenarios like Jetstream event processing.
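//!
//! A typical startup wiring (a minimal sketch; assumes `pool` is an
//! already-connected `sqlx::PgPool`):
//!
//! ```ignore
//! Logger::init_global(pool.clone());
//! start_log_cleanup_task(pool.clone());
//! Logger::global().log_jetstream(LogLevel::Info, "Jetstream consumer starting", None);
//! ```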

use chrono::Utc;
use serde_json::Value;
use sqlx::PgPool;
use std::sync::OnceLock;
use tokio::sync::mpsc;
use tokio::time::{Duration, interval};
use tracing::{error, info, warn};
use uuid::Uuid;

/// Log severity levels for structured logging.
#[derive(Debug, Clone)]
pub enum LogLevel {
    Info,
    Warn,
    Error,
}

impl LogLevel {
    /// Returns the string representation of the log level.
    pub fn as_str(&self) -> &'static str {
        match self {
            LogLevel::Info => "info",
            LogLevel::Warn => "warn",
            LogLevel::Error => "error",
        }
    }
}

/// Categories of log entries for filtering and organization.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub enum LogType {
    /// Background sync job logs (user-initiated collection sync)
    SyncJob,
    /// Real-time Jetstream event processing logs
    Jetstream,
    /// System-level operational logs
    System,
}

impl LogType {
    /// Returns the string representation of the log type.
    pub fn as_str(&self) -> &'static str {
        match self {
            LogType::SyncJob => "sync_job",
            LogType::Jetstream => "jetstream",
            LogType::System => "system",
        }
    }
}

/// Global singleton logger instance, initialized once at application startup.
static GLOBAL_LOGGER: OnceLock<Logger> = OnceLock::new();

/// Internal representation of a log entry pending database insertion.
///
/// These entries are queued in memory and flushed in batches to reduce
/// database round-trips and improve throughput.
#[derive(Debug, Clone)]
struct QueuedLogEntry {
    log_type: String,
    job_id: Option<Uuid>,
    user_did: Option<String>,
    slice_uri: Option<String>,
    level: String,
    message: String,
    metadata: Option<Value>,
    created_at: chrono::DateTime<chrono::Utc>,
}

/// Batched logger that queues log entries and flushes them periodically.
///
/// This logger uses an unbounded channel to queue log entries, which are then
/// flushed to the database by a background worker. The worker flushes when:
/// - 100 entries accumulate (batch size threshold)
/// - 5 seconds elapse (time-based threshold)
/// - The channel is closed (graceful shutdown)
///
/// Logs are also immediately written to stdout via the `tracing` crate for
/// real-time visibility during development and debugging.
#[derive(Clone)]
pub struct Logger {
    sender: mpsc::UnboundedSender<QueuedLogEntry>,
}

impl Logger {
    /// Creates a new batched logger and spawns the background worker task.
    ///
    /// The background worker runs for the lifetime of the application, processing
    /// the log queue and flushing to the database.
    ///
    /// # Arguments
    /// * `pool` - PostgreSQL connection pool for database writes
    pub fn new(pool: PgPool) -> Self {
        let (sender, receiver) = mpsc::unbounded_channel();

        // Spawn background worker that will run for the lifetime of the application
        tokio::spawn(Self::background_worker(receiver, pool));

        Self { sender }
    }

    /// Initializes the global logger singleton.
    ///
    /// This should be called once at application startup before any logging occurs.
    /// Subsequent calls will be ignored with a warning.
    ///
    /// # Arguments
    /// * `pool` - PostgreSQL connection pool for database writes
    ///
    /// # Example
    /// ```ignore
    /// Logger::init_global(pool.clone());
    /// let logger = Logger::global();
    /// logger.log_jetstream(LogLevel::Info, "Started", None);
    /// ```
    pub fn init_global(pool: PgPool) {
        let logger = Self::new(pool);
        if GLOBAL_LOGGER.set(logger).is_err() {
            warn!("Global logger was already initialized");
        }
    }

    /// Returns a reference to the global logger instance.
    ///
    /// # Panics
    /// Panics if called before `init_global()`. Ensure the logger is initialized
    /// during application startup.
    pub fn global() -> &'static Logger {
        GLOBAL_LOGGER
            .get()
            .expect("Global logger not initialized - call Logger::init_global() first")
    }

    /// Logs a sync job message, queuing it for batched database insertion.
    ///
    /// Sync job logs track the progress of background synchronization tasks that
    /// fetch a user's collection data from their PDS.
    ///
    /// # Arguments
    /// * `job_id` - Unique identifier for the sync job
    /// * `user_did` - Decentralized identifier of the user being synced
    /// * `slice_uri` - AT-URI of the slice being synchronized
    /// * `level` - Log severity level
    /// * `message` - Human-readable log message
    /// * `metadata` - Optional structured metadata (JSON)
    ///
    /// # Behavior
    /// - Immediately writes to stdout via `tracing` for real-time visibility
    /// - Queues the entry for batch insertion to the database
    /// - Send failures (possible only if the channel has closed) are silently ignored
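    ///
    /// # Example
    /// ```ignore
    /// // Sketch with illustrative values; the DID and slice URI are placeholders.
    /// Logger::global().log_sync_job(
    ///     job_id,
    ///     "did:plc:example",
    ///     "at://did:plc:example/app.example.slice/3kabc",
    ///     LogLevel::Info,
    ///     "Fetched 250 records",
    ///     Some(serde_json::json!({ "records": 250 })),
    /// );
    /// ```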
    pub fn log_sync_job(
        &self,
        job_id: Uuid,
        user_did: &str,
        slice_uri: &str,
        level: LogLevel,
        message: &str,
        metadata: Option<Value>,
    ) {
        let entry = QueuedLogEntry {
            log_type: LogType::SyncJob.as_str().to_string(),
            job_id: Some(job_id),
            user_did: Some(user_did.to_string()),
            slice_uri: Some(slice_uri.to_string()),
            level: level.as_str().to_string(),
            message: message.to_string(),
            metadata,
            created_at: Utc::now(),
        };

        // Write to stdout immediately for real-time monitoring and debugging
        match level {
            LogLevel::Info => info!("[sync_job] {}", message),
            LogLevel::Warn => warn!("[sync_job] {}", message),
            LogLevel::Error => error!("[sync_job] {}", message),
        }

        // Queue for batch database insertion (ignore send errors if channel closed)
        let _ = self.sender.send(entry);
    }

    /// Logs a Jetstream message without slice context.
    ///
    /// This is a convenience wrapper around `log_jetstream_with_slice` for
    /// global Jetstream events (e.g., connection status, errors).
    ///
    /// # Arguments
    /// * `level` - Log severity level
    /// * `message` - Human-readable log message
    /// * `metadata` - Optional structured metadata (JSON)
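    ///
    /// # Example
    /// ```ignore
    /// // Illustrative connection-status event; the message text is a placeholder.
    /// Logger::global().log_jetstream(
    ///     LogLevel::Warn,
    ///     "Jetstream connection lost, reconnecting",
    ///     None,
    /// );
    /// ```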
    pub fn log_jetstream(&self, level: LogLevel, message: &str, metadata: Option<Value>) {
        self.log_jetstream_with_slice(level, message, metadata, None);
    }

    /// Logs a Jetstream message with optional slice context.
    ///
    /// Jetstream logs track real-time event processing from the AT Protocol firehose.
    /// Including `slice_uri` associates the log with a specific slice's event processing.
    ///
    /// # Arguments
    /// * `level` - Log severity level
    /// * `message` - Human-readable log message
    /// * `metadata` - Optional structured metadata (JSON)
    /// * `slice_uri` - Optional AT-URI to associate this log with a specific slice
    ///
    /// # Behavior
    /// - Immediately writes to stdout via `tracing` for real-time visibility
    /// - Queues the entry for batch insertion to the database
    /// - Send failures (possible only if the channel has closed) are silently ignored
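    ///
    /// # Example
    /// ```ignore
    /// // Sketch: associate an event with a slice (URI and metadata are placeholders).
    /// Logger::global().log_jetstream_with_slice(
    ///     LogLevel::Info,
    ///     "Indexed record",
    ///     Some(serde_json::json!({ "collection": "app.example.record" })),
    ///     Some("at://did:plc:example/app.example.slice/3kabc"),
    /// );
    /// ```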
    pub fn log_jetstream_with_slice(
        &self,
        level: LogLevel,
        message: &str,
        metadata: Option<Value>,
        slice_uri: Option<&str>,
    ) {
        let entry = QueuedLogEntry {
            log_type: LogType::Jetstream.as_str().to_string(),
            job_id: None,
            user_did: None,
            slice_uri: slice_uri.map(|s| s.to_string()),
            level: level.as_str().to_string(),
            message: message.to_string(),
            metadata,
            created_at: Utc::now(),
        };

        // Write to stdout immediately for real-time monitoring and debugging
        match level {
            LogLevel::Info => info!("[jetstream] {}", message),
            LogLevel::Warn => warn!("[jetstream] {}", message),
            LogLevel::Error => error!("[jetstream] {}", message),
        }

        // Queue for batch database insertion (ignore send errors if channel closed)
        let _ = self.sender.send(entry);
    }

    /// Background worker that processes the log queue and flushes to the database.
    ///
    /// This worker runs in a dedicated tokio task and flushes batches when:
    /// - 100 entries accumulate (to prevent unbounded memory growth)
    /// - 5 seconds elapse (to ensure timely persistence)
    /// - The channel closes (graceful shutdown)
    ///
    /// # Arguments
    /// * `receiver` - Channel receiver for queued log entries
    /// * `pool` - PostgreSQL connection pool for batch inserts
    async fn background_worker(
        mut receiver: mpsc::UnboundedReceiver<QueuedLogEntry>,
        pool: PgPool,
    ) {
        let mut batch = Vec::new();
        // Periodic flush to ensure logs are persisted even during low-volume periods
        let mut flush_interval = interval(Duration::from_secs(5));

        info!("Started batched logging background worker");

        loop {
            tokio::select! {
                // Receive log entries from the queue. `recv()` yields `None`
                // once all senders are dropped; matching on the Option directly
                // (rather than using a `Some(..)` pattern with a select-level
                // `else` arm) is required to detect shutdown, because the
                // interval branch below never disables, so an `else` arm would
                // never run.
                maybe_entry = receiver.recv() => {
                    match maybe_entry {
                        Some(entry) => {
                            batch.push(entry);

                            // Flush when batch reaches size threshold to prevent memory buildup
                            if batch.len() >= 100 {
                                Self::flush_batch(&pool, &mut batch).await;
                            }
                        }
                        // Channel closed (shutdown): flush remaining logs and exit gracefully
                        None => {
                            if !batch.is_empty() {
                                Self::flush_batch(&pool, &mut batch).await;
                            }
                            break;
                        }
                    }
                }

                // Time-based flush to ensure logs are persisted within 5 seconds
                _ = flush_interval.tick() => {
                    if !batch.is_empty() {
                        Self::flush_batch(&pool, &mut batch).await;
                    }
                }
            }
        }

        info!("Batched logging background worker shut down");
    }

    /// Flushes a batch of log entries to the database using a bulk INSERT.
    ///
    /// This method dynamically constructs a multi-value INSERT statement to minimize
    /// database round-trips. Each log entry contributes 8 parameters (fields).
    ///
    /// # Arguments
    /// * `pool` - PostgreSQL connection pool
    /// * `batch` - Mutable vector of queued log entries (cleared after flush)
    ///
    /// # Performance
    /// - Warns if a batch takes >100ms to insert (potential database issue)
    /// - Logs successful flushes with timing information
    /// - On error, logs are lost but the system continues (fail-open)
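    ///
    /// For a batch of two entries, the generated statement has this shape
    /// (shown for illustration):
    ///
    /// ```text
    /// INSERT INTO logs (log_type, job_id, user_did, slice_uri, level, message, metadata, created_at)
    /// VALUES ($1, $2, $3, $4, $5, $6, $7, $8), ($9, $10, $11, $12, $13, $14, $15, $16)
    /// ```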
    async fn flush_batch(pool: &PgPool, batch: &mut Vec<QueuedLogEntry>) {
        if batch.is_empty() {
            return;
        }

        let batch_size = batch.len();
        let start = std::time::Instant::now();

        // Build bulk INSERT query dynamically based on batch size.
        // At the 100-entry threshold this is 800 bind parameters, well below
        // PostgreSQL's 65535 per-statement limit.
        let mut query = String::from(
            "INSERT INTO logs (log_type, job_id, user_did, slice_uri, level, message, metadata, created_at) VALUES ",
        );

        // Add placeholders for each record (8 parameters per entry)
        for i in 0..batch_size {
            if i > 0 {
                query.push_str(", ");
            }
            // Calculate base parameter index (8 fields per log entry, 1-indexed)
            let base = i * 8 + 1;
            query.push_str(&format!(
                "(${}, ${}, ${}, ${}, ${}, ${}, ${}, ${})",
                base,
                base + 1,
                base + 2,
                base + 3,
                base + 4,
                base + 5,
                base + 6,
                base + 7
            ));
        }

        // Bind all parameters in order (log_type, job_id, user_did, slice_uri, level, message, metadata, created_at)
        let mut sqlx_query = sqlx::query(&query);
        for entry in batch.iter() {
            sqlx_query = sqlx_query
                .bind(&entry.log_type)
                .bind(entry.job_id)
                .bind(&entry.user_did)
                .bind(&entry.slice_uri)
                .bind(&entry.level)
                .bind(&entry.message)
                .bind(&entry.metadata)
                .bind(entry.created_at);
        }

        // Execute the batch insert and handle errors gracefully
        match sqlx_query.execute(pool).await {
            Ok(_) => {
                let elapsed = start.elapsed();
                // Warn about slow inserts that may indicate database performance issues
                if elapsed.as_millis() > 100 {
                    warn!(
                        "Slow log batch insert: {} entries in {:?}",
                        batch_size, elapsed
                    );
                } else {
                    info!("Flushed {} log entries in {:?}", batch_size, elapsed);
                }
            }
            Err(e) => {
                error!("Failed to flush log batch of {} entries: {}", batch_size, e);
                // Fail-open: logs are lost but the system continues to prevent cascading failures
            }
        }

        batch.clear();
    }
}

/// Represents a log entry retrieved from the database.
///
/// This struct is used for query results and API responses. Field names are
/// converted to camelCase for JSON serialization.
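///
/// For example, a row serializes roughly as (values are placeholders):
///
/// ```text
/// {"id": 1, "createdAt": "2024-01-01T00:00:00Z", "logType": "jetstream",
///  "jobId": null, "userDid": null, "sliceUri": null,
///  "level": "info", "message": "...", "metadata": null}
/// ```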
#[derive(Debug, serde::Serialize, sqlx::FromRow)]
#[serde(rename_all = "camelCase")]
pub struct LogEntry {
    pub id: i64,
    pub created_at: chrono::DateTime<chrono::Utc>,
    pub log_type: String,
    pub job_id: Option<Uuid>,
    pub user_did: Option<String>,
    pub slice_uri: Option<String>,
    pub level: String,
    pub message: String,
    pub metadata: Option<serde_json::Value>,
}

/// Retrieves logs for a specific sync job, ordered chronologically.
///
/// # Arguments
/// * `pool` - PostgreSQL connection pool
/// * `job_id` - Unique identifier of the sync job
/// * `limit` - Optional maximum number of logs to return (default: 100)
///
/// # Returns
/// * `Ok(Vec<LogEntry>)` - List of log entries ordered by creation time (ASC)
/// * `Err(sqlx::Error)` - Database query error
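///
/// # Example
/// ```ignore
/// // Sketch: print a job's log lines in order (`job_id` is illustrative).
/// let logs = get_sync_job_logs(&pool, job_id, Some(50)).await?;
/// for entry in &logs {
///     println!("{} [{}] {}", entry.created_at, entry.level, entry.message);
/// }
/// ```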
pub async fn get_sync_job_logs(
    pool: &PgPool,
    job_id: Uuid,
    limit: Option<i64>,
) -> Result<Vec<LogEntry>, sqlx::Error> {
    let limit = limit.unwrap_or(100);

    let rows = sqlx::query_as!(
        LogEntry,
        r#"
        SELECT id, created_at, log_type, job_id, user_did, slice_uri, level, message, metadata
        FROM logs
        WHERE log_type = 'sync_job' AND job_id = $1
        ORDER BY created_at ASC
        LIMIT $2
        "#,
        job_id,
        limit
    )
    .fetch_all(pool)
    .await?;

    Ok(rows)
}

/// Retrieves Jetstream logs, optionally filtered by slice URI.
///
/// When a slice filter is provided, returns both slice-specific logs AND global
/// connection logs (where slice_uri is NULL). This ensures connection status logs
/// are visible when viewing slice-specific logs.
///
/// # Arguments
/// * `pool` - PostgreSQL connection pool
/// * `slice_filter` - Optional slice URI to filter logs
/// * `limit` - Optional maximum number of logs to return (default: 100)
///
/// # Returns
/// * `Ok(Vec<LogEntry>)` - List of log entries ordered by creation time (DESC)
/// * `Err(sqlx::Error)` - Database query error
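///
/// # Example
/// ```ignore
/// // Sketch: returns slice-specific logs plus global (NULL slice_uri) entries.
/// // The slice URI is a placeholder.
/// let logs = get_jetstream_logs(
///     &pool,
///     Some("at://did:plc:example/app.example.slice/3kabc"),
///     None,
/// ).await?;
/// ```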
pub async fn get_jetstream_logs(
    pool: &PgPool,
    slice_filter: Option<&str>,
    limit: Option<i64>,
) -> Result<Vec<LogEntry>, sqlx::Error> {
    let limit = limit.unwrap_or(100);

    let rows = if let Some(slice_uri) = slice_filter {
        tracing::info!("Querying jetstream logs with slice filter: {}", slice_uri);
        // Include both slice-specific logs and global connection logs for context
        let results = sqlx::query_as!(
            LogEntry,
            r#"
            SELECT id, created_at, log_type, job_id, user_did, slice_uri, level, message, metadata
            FROM logs
            WHERE log_type = 'jetstream'
                AND (slice_uri = $1 OR slice_uri IS NULL)
            ORDER BY created_at DESC
            LIMIT $2
            "#,
            slice_uri,
            limit
        )
        .fetch_all(pool)
        .await?;

        tracing::info!(
            "Found {} jetstream logs for slice {}",
            results.len(),
            slice_uri
        );
        results
    } else {
        // No filter provided, return all Jetstream logs across all slices
        sqlx::query_as!(
            LogEntry,
            r#"
            SELECT id, created_at, log_type, job_id, user_did, slice_uri, level, message, metadata
            FROM logs
            WHERE log_type = 'jetstream'
            ORDER BY created_at DESC
            LIMIT $1
            "#,
            limit
        )
        .fetch_all(pool)
        .await?
    };

    Ok(rows)
}

/// Retrieves all logs associated with a specific slice URI.
///
/// This includes both sync job logs and Jetstream logs for the slice.
///
/// # Arguments
/// * `pool` - PostgreSQL connection pool
/// * `slice_uri` - AT-URI of the slice
/// * `log_type_filter` - Optional log type filter ("sync_job", "jetstream", "system")
/// * `limit` - Optional maximum number of logs to return (default: 100)
///
/// # Returns
/// * `Ok(Vec<LogEntry>)` - List of log entries ordered by creation time (DESC)
/// * `Err(sqlx::Error)` - Database query error
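///
/// # Example
/// ```ignore
/// // Sketch: only Jetstream logs for one slice (the URI is a placeholder).
/// let logs = get_slice_logs(
///     &pool,
///     "at://did:plc:example/app.example.slice/3kabc",
///     Some("jetstream"),
///     Some(200),
/// ).await?;
/// ```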
#[allow(dead_code)]
pub async fn get_slice_logs(
    pool: &PgPool,
    slice_uri: &str,
    log_type_filter: Option<&str>,
    limit: Option<i64>,
) -> Result<Vec<LogEntry>, sqlx::Error> {
    let limit = limit.unwrap_or(100);

    let rows = if let Some(log_type) = log_type_filter {
        sqlx::query_as!(
            LogEntry,
            r#"
            SELECT id, created_at, log_type, job_id, user_did, slice_uri, level, message, metadata
            FROM logs
            WHERE slice_uri = $1 AND log_type = $2
            ORDER BY created_at DESC
            LIMIT $3
            "#,
            slice_uri,
            log_type,
            limit
        )
        .fetch_all(pool)
        .await?
    } else {
        sqlx::query_as!(
            LogEntry,
            r#"
            SELECT id, created_at, log_type, job_id, user_did, slice_uri, level, message, metadata
            FROM logs
            WHERE slice_uri = $1
            ORDER BY created_at DESC
            LIMIT $2
            "#,
            slice_uri,
            limit
        )
        .fetch_all(pool)
        .await?
    };

    Ok(rows)
}

/// Deletes old log entries to prevent unbounded database growth.
///
/// Retention policy:
/// - Jetstream logs: 1 day (high volume, primarily for real-time debugging)
/// - Sync job logs: 7 days (lower volume, useful for historical analysis)
/// - System logs: 7 days
///
/// # Arguments
/// * `pool` - PostgreSQL connection pool
///
/// # Returns
/// * `Ok(u64)` - Number of deleted log entries
/// * `Err(sqlx::Error)` - Database query error
pub async fn cleanup_old_logs(pool: &PgPool) -> Result<u64, sqlx::Error> {
    let result = sqlx::query!(
        r#"
        DELETE FROM logs
        WHERE
            (log_type = 'jetstream' AND created_at < NOW() - INTERVAL '1 day')
            OR (log_type = 'sync_job' AND created_at < NOW() - INTERVAL '7 days')
            OR (log_type = 'system' AND created_at < NOW() - INTERVAL '7 days')
        "#,
    )
    .execute(pool)
    .await?;

    Ok(result.rows_affected())
}

/// Spawns a background task that periodically cleans up old logs.
///
/// The task runs every 6 hours for the lifetime of the application, deleting
/// logs according to the retention policy in `cleanup_old_logs`.
///
/// # Arguments
/// * `pool` - PostgreSQL connection pool (cloned into the spawned task)
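///
/// # Example
/// ```ignore
/// // Spawn once during startup, after the pool is created.
/// start_log_cleanup_task(pool.clone());
/// ```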
pub fn start_log_cleanup_task(pool: PgPool) {
    tokio::spawn(async move {
        // Run cleanup every 6 hours (balances database load with timely cleanup).
        // Note: a tokio interval's first tick completes immediately, so cleanup
        // also runs once at startup.
        let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(6 * 3600));

        info!("Started log cleanup background task (runs every 6 hours)");

        loop {
            interval.tick().await;

            match cleanup_old_logs(&pool).await {
                Ok(deleted) => {
                    if deleted > 0 {
                        info!("Log cleanup: deleted {} old log entries", deleted);
                    } else {
                        info!("Log cleanup: no old logs to delete");
                    }
                }
                Err(e) => {
                    error!("Failed to cleanup old logs: {}", e);
                }
            }
        }
    });
}