Live video on the AT Protocol
at natb/analytics 254 lines 8.2 kB view raw
1use crate::db::clickhouse::{ClickHouseClient, EventRow}; 2use crate::ingest::{EventBuffer, validate_event}; 3use crate::proto::analytics_server::Analytics; 4use crate::proto::*; 5use crate::query::{get_realtime_stats, get_streamer_stats, get_viewer_history}; 6use chrono::{DateTime, TimeZone, Utc}; 7use std::str::FromStr; 8use std::sync::Arc; 9use tonic::{Request, Response, Status}; 10use tracing::{error, warn}; 11use uuid::Uuid; 12 13pub struct AnalyticsService { 14 clickhouse: ClickHouseClient, 15 buffer: Arc<EventBuffer>, 16} 17 18impl AnalyticsService { 19 pub fn new(clickhouse: ClickHouseClient, buffer: Arc<EventBuffer>) -> Self { 20 Self { clickhouse, buffer } 21 } 22} 23 24#[tonic::async_trait] 25impl Analytics for AnalyticsService { 26 async fn ingest_events( 27 &self, 28 request: Request<IngestEventsRequest>, 29 ) -> Result<Response<IngestEventsResponse>, Status> { 30 let req = request.into_inner(); 31 let mut accepted = 0u32; 32 let mut rejected = 0u32; 33 let mut errors = Vec::new(); 34 35 let mut valid_events = Vec::new(); 36 37 for event in req.events { 38 match validate_event(&event) { 39 Ok(_) => match convert_to_event_row(&event) { 40 Ok(row) => { 41 valid_events.push(row); 42 accepted += 1; 43 } 44 Err(e) => { 45 rejected += 1; 46 errors.push(format!( 47 "conversion error for event {}: {}", 48 event.event_id, e 49 )); 50 } 51 }, 52 Err(e) => { 53 rejected += 1; 54 errors.push(format!( 55 "validation error for event {}: {}", 56 event.event_id, e 57 )); 58 } 59 } 60 } 61 62 if !valid_events.is_empty() { 63 if let Err(e) = self.buffer.add_events(valid_events).await { 64 error!("failed to buffer events: {}", e); 65 return Err(Status::internal("failed to buffer events")); 66 } 67 } 68 69 Ok(Response::new(IngestEventsResponse { 70 accepted, 71 rejected, 72 errors, 73 })) 74 } 75 76 async fn get_streamer_stats( 77 &self, 78 request: Request<StreamerStatsRequest>, 79 ) -> Result<Response<StreamerStatsResponse>, Status> { 80 let req = request.into_inner(); 81 82 let (total_views, total_watch_time_ms, unique_viewers, daily_stats) = get_streamer_stats( 83 self.clickhouse.client(), 84 &req.streamer_did, 85 req.start_time_ms, 86 req.end_time_ms, 87 ) 88 .await 89 .map_err(|e| { 90 error!("failed to get streamer stats: {}", e); 91 Status::internal("query failed") 92 })?; 93 94 let daily_stats_proto: Vec<DailyStats> = daily_stats 95 .into_iter() 96 .map(|s| DailyStats { 97 date: s.date, 98 views: s.views, 99 watch_time_ms: s.watch_time_ms, 100 unique_viewers: s.unique_viewers, 101 }) 102 .collect(); 103 104 Ok(Response::new(StreamerStatsResponse { 105 streamer_did: req.streamer_did, 106 total_views, 107 total_watch_time_ms, 108 unique_viewers, 109 daily_stats: daily_stats_proto, 110 })) 111 } 112 113 async fn get_viewer_history( 114 &self, 115 request: Request<ViewerHistoryRequest>, 116 ) -> Result<Response<ViewerHistoryResponse>, Status> { 117 let req = request.into_inner(); 118 119 let sessions = get_viewer_history( 120 self.clickhouse.client(), 121 &req.did, 122 req.start_time_ms, 123 req.end_time_ms, 124 req.limit, 125 ) 126 .await 127 .map_err(|e| { 128 error!("failed to get viewer history: {}", e); 129 Status::internal("query failed") 130 })?; 131 132 let sessions_proto: Vec<WatchSession> = sessions 133 .into_iter() 134 .map(|s| WatchSession { 135 session_id: s.session_id, 136 streamer_did: s.streamer_did, 137 stream_id: s.stream_id, 138 start_time_ms: s.start_time_ms, 139 end_time_ms: s.end_time_ms, 140 duration_ms: s.duration_ms, 141 }) 142 .collect(); 143 144 Ok(Response::new(ViewerHistoryResponse { 145 sessions: sessions_proto, 146 })) 147 } 148 149 async fn get_realtime_stats( 150 &self, 151 request: Request<RealtimeStatsRequest>, 152 ) -> Result<Response<RealtimeStatsResponse>, Status> { 153 let req = request.into_inner(); 154 155 let stats = get_realtime_stats( 156 self.clickhouse.client(), 157 req.streamer_did.as_deref(), 158 req.window_minutes, 159 ) 160 .await 161 .map_err(|e| { 162 error!("failed to get realtime stats: {}", e); 163 Status::internal("query failed") 164 })?; 165 166 let streamers: Vec<StreamerRealtimeStats> = stats 167 .into_iter() 168 .map(|s| StreamerRealtimeStats { 169 streamer_did: s.streamer_did, 170 current_viewers: s.current_viewers, 171 total_watch_time_ms: s.total_watch_time_ms, 172 }) 173 .collect(); 174 175 Ok(Response::new(RealtimeStatsResponse { streamers })) 176 } 177 178 async fn delete_user_data( 179 &self, 180 request: Request<DeleteUserDataRequest>, 181 ) -> Result<Response<DeleteUserDataResponse>, Status> { 182 let req = request.into_inner(); 183 184 let request_id = crate::privacy::delete_user_data(&self.clickhouse, req.did) 185 .await 186 .map_err(|e| { 187 error!("failed to create deletion request: {}", e); 188 Status::internal("deletion request failed") 189 })?; 190 191 Ok(Response::new(DeleteUserDataResponse { 192 request_id: request_id.to_string(), 193 status: "pending".to_string(), 194 })) 195 } 196 197 async fn get_deletion_status( 198 &self, 199 request: Request<GetDeletionStatusRequest>, 200 ) -> Result<Response<DeletionStatusResponse>, Status> { 201 let req = request.into_inner(); 202 203 let request_id = Uuid::from_str(&req.request_id).map_err(|e| { 204 warn!("invalid request_id: {}", e); 205 Status::invalid_argument("invalid request_id") 206 })?; 207 208 let status = crate::privacy::get_deletion_status(&self.clickhouse, request_id) 209 .await 210 .map_err(|e| { 211 error!("failed to get deletion status: {}", e); 212 Status::internal("query failed") 213 })?; 214 215 match status { 216 Some(s) => Ok(Response::new(DeletionStatusResponse { 217 request_id: s.request_id.to_string(), 218 did: s.did, 219 status: s.status, 220 requested_at_ms: Some(s.requested_at.timestamp_millis()), 221 completed_at_ms: s.completed_at.map(|dt| dt.timestamp_millis()), 222 })), 223 None => Err(Status::not_found("deletion request not found")), 224 } 225 } 226} 227 228fn convert_to_event_row(event: &Event) -> Result<EventRow, String> { 229 let event_id = 230 Uuid::from_str(&event.event_id).map_err(|e| format!("invalid event_id: {}", e))?; 231 232 let timestamp_secs = event.timestamp_ms / 1000; 233 let timestamp_nanos = ((event.timestamp_ms % 1000) * 1_000_000) as u32; 234 235 let timestamp: DateTime<Utc> = Utc 236 .timestamp_opt(timestamp_secs, timestamp_nanos) 237 .single() 238 .ok_or_else(|| "invalid timestamp".to_string())?; 239 240 Ok(EventRow { 241 event_id, 242 event_type: event.event_type.clone(), 243 device_id: event.device_id.clone(), 244 did: event.did.clone(), 245 session_id: event.session_id.clone(), 246 timestamp, 247 streamer_did: event.streamer_did.clone(), 248 stream_id: event.stream_id.clone(), 249 properties: event.properties_json.clone(), 250 schema_version: event.schema_version as u16, 251 client_version: event.client_version.clone(), 252 platform: event.platform.clone(), 253 }) 254}