Highly ambitious ATProtocol AppView service and SDKs
at main 3.0 kB view raw
1//! Analytics and time-series data queries. 2//! 3//! This module handles database operations for generating analytics data, 4//! including sparkline time-series data for record indexing activity. 5 6use super::client::Database; 7use crate::errors::DatabaseError; 8use crate::models::SparklinePoint; 9use std::collections::HashMap; 10 11impl Database { 12 /// Gets sparkline data for multiple slices in a single query batch. 13 /// 14 /// Generates time-bucketed counts of indexed records for visualization. 15 /// 16 /// # Arguments 17 /// * `slice_uris` - Array of slice URIs to get data for 18 /// * `interval` - Time bucket size: "minute", "hour", or "day" 19 /// * `duration_hours` - How many hours of history to include 20 /// 21 /// # Returns 22 /// HashMap mapping slice_uri -> array of (timestamp, count) data points 23 pub async fn get_batch_sparkline_data( 24 &self, 25 slice_uris: &[String], 26 interval: &str, 27 duration_hours: i32, 28 ) -> Result<HashMap<String, Vec<SparklinePoint>>, DatabaseError> { 29 use chrono::{Duration, Utc}; 30 let cutoff_time = Utc::now() - Duration::hours(duration_hours as i64); 31 32 let mut sparklines = HashMap::new(); 33 34 for slice_uri in slice_uris { 35 let interval_validated = match interval { 36 "minute" => "minute", 37 "day" => "day", 38 _ => "hour", 39 }; 40 41 // Generate a complete series of time buckets with zero-filled gaps 42 let query = format!( 43 r#" 44 WITH time_series AS ( 45 SELECT generate_series( 46 date_trunc('{}', $1::timestamptz), 47 date_trunc('{}', NOW()), 48 '1 {}'::interval 49 ) AS bucket 50 ) 51 SELECT 52 ts.bucket, 53 COALESCE(COUNT(r.cid), 0) as count 54 FROM time_series ts 55 LEFT JOIN record r ON date_trunc('{}', r.indexed_at) = ts.bucket 56 AND r.indexed_at >= $1 57 AND r.slice_uri = $2 58 GROUP BY ts.bucket 59 ORDER BY ts.bucket 60 "#, 61 interval_validated, 62 interval_validated, 63 interval_validated, 64 interval_validated 65 ); 66 67 let rows = 68 sqlx::query_as::<_, 
(Option<chrono::DateTime<chrono::Utc>>, Option<i64>)>(&query) 69 .bind(cutoff_time) 70 .bind(slice_uri) 71 .fetch_all(&self.pool) 72 .await?; 73 74 let data_points = rows 75 .into_iter() 76 .map(|(bucket, count)| SparklinePoint { 77 timestamp: bucket.unwrap().to_rfc3339(), 78 count: count.unwrap_or(0), 79 }) 80 .collect(); 81 82 sparklines.insert(slice_uri.clone(), data_points); 83 } 84 85 Ok(sparklines) 86 } 87}