Live video on the AT Protocol
1use crate::db::clickhouse::{ClickHouseClient, EventRow};
2use crate::ingest::{EventBuffer, validate_event};
3use crate::proto::analytics_server::Analytics;
4use crate::proto::*;
5use crate::query::{get_realtime_stats, get_streamer_stats, get_viewer_history};
6use chrono::{DateTime, TimeZone, Utc};
7use std::str::FromStr;
8use std::sync::Arc;
9use tonic::{Request, Response, Status};
10use tracing::{error, warn};
11use uuid::Uuid;
12
13pub struct AnalyticsService {
14 clickhouse: ClickHouseClient,
15 buffer: Arc<EventBuffer>,
16}
17
18impl AnalyticsService {
19 pub fn new(clickhouse: ClickHouseClient, buffer: Arc<EventBuffer>) -> Self {
20 Self { clickhouse, buffer }
21 }
22}
23
24#[tonic::async_trait]
25impl Analytics for AnalyticsService {
26 async fn ingest_events(
27 &self,
28 request: Request<IngestEventsRequest>,
29 ) -> Result<Response<IngestEventsResponse>, Status> {
30 let req = request.into_inner();
31 let mut accepted = 0u32;
32 let mut rejected = 0u32;
33 let mut errors = Vec::new();
34
35 let mut valid_events = Vec::new();
36
37 for event in req.events {
38 match validate_event(&event) {
39 Ok(_) => match convert_to_event_row(&event) {
40 Ok(row) => {
41 valid_events.push(row);
42 accepted += 1;
43 }
44 Err(e) => {
45 rejected += 1;
46 errors.push(format!(
47 "conversion error for event {}: {}",
48 event.event_id, e
49 ));
50 }
51 },
52 Err(e) => {
53 rejected += 1;
54 errors.push(format!(
55 "validation error for event {}: {}",
56 event.event_id, e
57 ));
58 }
59 }
60 }
61
62 if !valid_events.is_empty() {
63 if let Err(e) = self.buffer.add_events(valid_events).await {
64 error!("failed to buffer events: {}", e);
65 return Err(Status::internal("failed to buffer events"));
66 }
67 }
68
69 Ok(Response::new(IngestEventsResponse {
70 accepted,
71 rejected,
72 errors,
73 }))
74 }
75
76 async fn get_streamer_stats(
77 &self,
78 request: Request<StreamerStatsRequest>,
79 ) -> Result<Response<StreamerStatsResponse>, Status> {
80 let req = request.into_inner();
81
82 let (total_views, total_watch_time_ms, unique_viewers, daily_stats) = get_streamer_stats(
83 self.clickhouse.client(),
84 &req.streamer_did,
85 req.start_time_ms,
86 req.end_time_ms,
87 )
88 .await
89 .map_err(|e| {
90 error!("failed to get streamer stats: {}", e);
91 Status::internal("query failed")
92 })?;
93
94 let daily_stats_proto: Vec<DailyStats> = daily_stats
95 .into_iter()
96 .map(|s| DailyStats {
97 date: s.date,
98 views: s.views,
99 watch_time_ms: s.watch_time_ms,
100 unique_viewers: s.unique_viewers,
101 })
102 .collect();
103
104 Ok(Response::new(StreamerStatsResponse {
105 streamer_did: req.streamer_did,
106 total_views,
107 total_watch_time_ms,
108 unique_viewers,
109 daily_stats: daily_stats_proto,
110 }))
111 }
112
113 async fn get_viewer_history(
114 &self,
115 request: Request<ViewerHistoryRequest>,
116 ) -> Result<Response<ViewerHistoryResponse>, Status> {
117 let req = request.into_inner();
118
119 let sessions = get_viewer_history(
120 self.clickhouse.client(),
121 &req.did,
122 req.start_time_ms,
123 req.end_time_ms,
124 req.limit,
125 )
126 .await
127 .map_err(|e| {
128 error!("failed to get viewer history: {}", e);
129 Status::internal("query failed")
130 })?;
131
132 let sessions_proto: Vec<WatchSession> = sessions
133 .into_iter()
134 .map(|s| WatchSession {
135 session_id: s.session_id,
136 streamer_did: s.streamer_did,
137 stream_id: s.stream_id,
138 start_time_ms: s.start_time_ms,
139 end_time_ms: s.end_time_ms,
140 duration_ms: s.duration_ms,
141 })
142 .collect();
143
144 Ok(Response::new(ViewerHistoryResponse {
145 sessions: sessions_proto,
146 }))
147 }
148
149 async fn get_realtime_stats(
150 &self,
151 request: Request<RealtimeStatsRequest>,
152 ) -> Result<Response<RealtimeStatsResponse>, Status> {
153 let req = request.into_inner();
154
155 let stats = get_realtime_stats(
156 self.clickhouse.client(),
157 req.streamer_did.as_deref(),
158 req.window_minutes,
159 )
160 .await
161 .map_err(|e| {
162 error!("failed to get realtime stats: {}", e);
163 Status::internal("query failed")
164 })?;
165
166 let streamers: Vec<StreamerRealtimeStats> = stats
167 .into_iter()
168 .map(|s| StreamerRealtimeStats {
169 streamer_did: s.streamer_did,
170 current_viewers: s.current_viewers,
171 total_watch_time_ms: s.total_watch_time_ms,
172 })
173 .collect();
174
175 Ok(Response::new(RealtimeStatsResponse { streamers }))
176 }
177
178 async fn delete_user_data(
179 &self,
180 request: Request<DeleteUserDataRequest>,
181 ) -> Result<Response<DeleteUserDataResponse>, Status> {
182 let req = request.into_inner();
183
184 let request_id = crate::privacy::delete_user_data(&self.clickhouse, req.did)
185 .await
186 .map_err(|e| {
187 error!("failed to create deletion request: {}", e);
188 Status::internal("deletion request failed")
189 })?;
190
191 Ok(Response::new(DeleteUserDataResponse {
192 request_id: request_id.to_string(),
193 status: "pending".to_string(),
194 }))
195 }
196
197 async fn get_deletion_status(
198 &self,
199 request: Request<GetDeletionStatusRequest>,
200 ) -> Result<Response<DeletionStatusResponse>, Status> {
201 let req = request.into_inner();
202
203 let request_id = Uuid::from_str(&req.request_id).map_err(|e| {
204 warn!("invalid request_id: {}", e);
205 Status::invalid_argument("invalid request_id")
206 })?;
207
208 let status = crate::privacy::get_deletion_status(&self.clickhouse, request_id)
209 .await
210 .map_err(|e| {
211 error!("failed to get deletion status: {}", e);
212 Status::internal("query failed")
213 })?;
214
215 match status {
216 Some(s) => Ok(Response::new(DeletionStatusResponse {
217 request_id: s.request_id.to_string(),
218 did: s.did,
219 status: s.status,
220 requested_at_ms: Some(s.requested_at.timestamp_millis()),
221 completed_at_ms: s.completed_at.map(|dt| dt.timestamp_millis()),
222 })),
223 None => Err(Status::not_found("deletion request not found")),
224 }
225 }
226}
227
228fn convert_to_event_row(event: &Event) -> Result<EventRow, String> {
229 let event_id =
230 Uuid::from_str(&event.event_id).map_err(|e| format!("invalid event_id: {}", e))?;
231
232 let timestamp_secs = event.timestamp_ms / 1000;
233 let timestamp_nanos = ((event.timestamp_ms % 1000) * 1_000_000) as u32;
234
235 let timestamp: DateTime<Utc> = Utc
236 .timestamp_opt(timestamp_secs, timestamp_nanos)
237 .single()
238 .ok_or_else(|| "invalid timestamp".to_string())?;
239
240 Ok(EventRow {
241 event_id,
242 event_type: event.event_type.clone(),
243 device_id: event.device_id.clone(),
244 did: event.did.clone(),
245 session_id: event.session_id.clone(),
246 timestamp,
247 streamer_did: event.streamer_did.clone(),
248 stream_id: event.stream_id.clone(),
249 properties: event.properties_json.clone(),
250 schema_version: event.schema_version as u16,
251 client_version: event.client_version.clone(),
252 platform: event.platform.clone(),
253 })
254}