//! Highly ambitious ATProtocol AppView service and SDKs.
1mod actor_resolver;
2mod api;
3mod atproto_extensions;
4mod auth;
5mod cache;
6mod database;
7mod errors;
8mod graphql;
9mod jetstream;
10mod jetstream_cursor;
11mod jobs;
12mod logging;
13mod models;
14mod sync;
15mod xrpc;
16
17use axum::{
18 Router,
19 routing::{get, post},
20};
21use sqlx::PgPool;
22use std::env;
23use std::sync::Arc;
24use std::sync::atomic::AtomicBool;
25use tokio::sync::Mutex;
26use tower_http::{cors::CorsLayer, trace::TraceLayer};
27use tracing::info;
28
29use crate::database::Database;
30use crate::errors::AppError;
31use crate::jetstream::JetstreamConsumer;
32use crate::jetstream_cursor::PostgresCursorHandler;
33use crate::logging::{Logger, start_log_cleanup_task};
34
/// Runtime configuration assembled in `main` from environment variables;
/// every field has a development default (see the `env::var` fallbacks there).
#[derive(Clone)]
pub struct Config {
    /// Base URL of the auth service (env: `AUTH_BASE_URL`).
    pub auth_base_url: String,
    /// ATProto relay endpoint (env: `RELAY_ENDPOINT`).
    pub relay_endpoint: String,
    /// AT-URI of the system slice record (env: `SYSTEM_SLICE_URI`).
    pub system_slice_uri: String,
    /// Default cap on repos per sync (env: `DEFAULT_MAX_SYNC_REPOS`, default 5000).
    pub default_max_sync_repos: i32,
}
42
/// Shared application state handed to every axum handler via `Router::with_state`.
#[derive(Clone)]
pub struct AppState {
    // High-level database wrapper (constructed from the same pool as below).
    database: Database,
    // Raw sqlx pool; the same pool that backs `database`.
    database_pool: PgPool,
    // Environment-derived configuration; see `Config`.
    config: Config,
    /// Connectivity flag flipped by the Jetstream consumer task
    /// (`true` while consuming, `false` after a failure/disconnect).
    pub jetstream_connected: Arc<AtomicBool>,
    /// Token/session cache with 5-minute TTL; Redis-backed when `REDIS_URL`
    /// is set, otherwise in-memory.
    pub auth_cache: Arc<Mutex<cache::SliceCache>>,
}
51
/// Process entry point.
///
/// Boot sequence:
///   1. Load `.env`, initialize tracing, connect to Postgres and run migrations.
///   2. Build `Config` from environment variables (all with defaults).
///   3. Start global logger and background cleanup/pubsub tasks.
///   4. Start process-type-gated services — job-queue runner (worker/all) and
///      Jetstream consumer (app/all) — based on `PROCESS_TYPE` or
///      `FLY_PROCESS_GROUP` (default "all").
///   5. Build the axum router and serve HTTP (app/all); a pure worker process
///      parks forever instead of serving.
///
/// NOTE(review): an unrecognized process type (anything other than "app",
/// "worker", or "all") starts none of the gated services and parks forever
/// in the final `else` branch — confirm that is intended.
#[tokio::main]
async fn main() -> Result<(), AppError> {
    // Load environment variables from .env file
    dotenvy::dotenv().ok();

    // Initialize tracing
    tracing_subscriber::fmt()
        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
        .init();

    // Database connection
    let database_url = env::var("DATABASE_URL")
        .unwrap_or_else(|_| "postgresql://slice:slice@localhost:5432/slice".to_string());

    // Configure database pool for high-concurrency workload
    let pool = sqlx::postgres::PgPoolOptions::new()
        .max_connections(20) // Increase from default of 10
        .min_connections(5) // Keep some connections warm
        .acquire_timeout(std::time::Duration::from_secs(30)) // Match request timeouts
        .idle_timeout(std::time::Duration::from_secs(300)) // 5 minutes idle timeout
        .max_lifetime(std::time::Duration::from_secs(1800)) // 30 minutes max lifetime
        .connect(&database_url)
        .await?;

    // Run migrations if needed
    sqlx::migrate!("./migrations").run(&pool).await?;

    let database = Database::new(pool.clone());

    // --- Configuration from environment; every value has a default ---
    let auth_base_url =
        env::var("AUTH_BASE_URL").unwrap_or_else(|_| "http://localhost:8081".to_string());

    let relay_endpoint = env::var("RELAY_ENDPOINT")
        .unwrap_or_else(|_| "https://relay1.us-west.bsky.network".to_string());

    let system_slice_uri = env::var("SYSTEM_SLICE_URI").unwrap_or_else(|_| {
        "at://did:plc:bcgltzqazw5tb6k2g3ttenbj/network.slices.slice/3lymhd4jhrd2z".to_string()
    });

    // Falls back to 5000 both when unset and when the value fails to parse.
    let default_max_sync_repos = env::var("DEFAULT_MAX_SYNC_REPOS")
        .unwrap_or_else(|_| "5000".to_string())
        .parse::<i32>()
        .unwrap_or(5000);

    let config = Config {
        auth_base_url,
        relay_endpoint,
        system_slice_uri,
        default_max_sync_repos,
    };

    // Initialize global logger
    Logger::init_global(pool.clone());

    // Start log cleanup background task
    start_log_cleanup_task(pool.clone());

    // Start GraphQL PubSub cleanup task
    graphql::pubsub::start_cleanup_task();

    // Initialize Redis pub/sub for cross-process sync job updates
    let redis_url = env::var("REDIS_URL").ok();
    graphql::schema_ext::sync::initialize_redis_pubsub(redis_url);

    // Detect process type from environment (supports both PROCESS_TYPE and FLY_PROCESS_GROUP)
    let process_type = env::var("PROCESS_TYPE")
        .or_else(|_| env::var("FLY_PROCESS_GROUP"))
        .unwrap_or_else(|_| "all".to_string());

    info!("Starting with process type: {}", process_type);

    let is_all = process_type == "all";
    let is_app = process_type == "app";
    let is_worker = process_type == "worker";

    // Start job queue runner (in worker or all processes)
    if is_worker || is_all {
        info!("Starting sync job queue runner for worker process");
        let pool_for_runner = pool.clone();
        tokio::spawn(async move {
            tracing::info!("Starting job queue runner...");
            match jobs::registry()
                .runner(&pool_for_runner)
                .set_concurrency(1, 2) // Keep 1-2 sync jobs running at a time (reduced for pool management)
                .run()
                .await
            {
                Ok(handle) => {
                    tracing::info!("Job runner started successfully, keeping handle alive...");
                    // CRITICAL: We must keep the handle alive for the runner to continue processing jobs
                    // The runner will stop if the handle is dropped
                    // NOTE(review): `pending()` never resolves, so the `drop` below is
                    // intentionally unreachable dead code that only documents ownership.
                    std::future::pending::<()>().await; // Keep the task alive indefinitely
                    drop(handle); // This line will never be reached
                }
                Err(e) => {
                    tracing::error!("Failed to start job runner: {}", e);
                }
            }
        });
    } else {
        info!("Skipping sync job queue runner for app process");
    }

    // Create shared jetstream connection status
    let jetstream_connected = Arc::new(AtomicBool::new(false));

    // Start Jetstream consumer (in app or all processes)
    if is_app || is_all {
        info!("Starting Jetstream consumer for app process");
        let database_for_jetstream = database.clone();
        let pool_for_jetstream = pool.clone();
        let jetstream_connected_clone = jetstream_connected.clone();
        // Supervisor task: (re)creates the consumer in a loop with exponential
        // backoff, capped by a per-minute reconnect budget.
        tokio::spawn(async move {
            let jetstream_hostname = env::var("JETSTREAM_HOSTNAME").ok();
            let redis_url = env::var("REDIS_URL").ok();
            // How often (seconds) the cursor handler persists its position.
            let cursor_write_interval = env::var("JETSTREAM_CURSOR_WRITE_INTERVAL_SECS")
                .unwrap_or_else(|_| "5".to_string())
                .parse::<u64>()
                .unwrap_or(5);

            // Reconnection rate limiting (5 retries per minute max)
            const MAX_RECONNECTS_PER_MINUTE: u32 = 5;
            const RECONNECT_WINDOW: tokio::time::Duration = tokio::time::Duration::from_secs(60);
            let mut reconnect_count = 0u32;
            let mut window_start = std::time::Instant::now();

            // Exponential backoff: doubles on each failure up to MAX_RETRY_DELAY.
            let mut retry_delay = tokio::time::Duration::from_secs(5);
            const MAX_RETRY_DELAY: tokio::time::Duration = tokio::time::Duration::from_secs(300);

            // Configuration reloader setup (run once)
            let mut config_reloader_started = false;

            loop {
                // Rate limiting: reset counter if window has passed
                let now = std::time::Instant::now();
                if now.duration_since(window_start) >= RECONNECT_WINDOW {
                    reconnect_count = 0;
                    window_start = now;
                    retry_delay = tokio::time::Duration::from_secs(5); // Reset delay after window passes
                }

                // Check rate limit
                if reconnect_count >= MAX_RECONNECTS_PER_MINUTE {
                    // Safe subtraction: the window-reset branch above guarantees
                    // duration_since(window_start) < RECONNECT_WINDOW here.
                    let wait_time = RECONNECT_WINDOW - now.duration_since(window_start);
                    tracing::warn!(
                        "Rate limit exceeded: {} reconnects in last minute, waiting {:?}",
                        reconnect_count,
                        wait_time
                    );
                    tokio::time::sleep(wait_time).await;
                    continue;
                }

                reconnect_count += 1;
                tracing::info!("Jetstream connection attempt #{} (retry delay: {:?})", reconnect_count, retry_delay);

                // Read cursor position from database
                let initial_cursor =
                    PostgresCursorHandler::read_cursor(&pool_for_jetstream, "default").await;
                if let Some(cursor) = initial_cursor {
                    tracing::info!("Resuming from cursor position: {}", cursor);
                } else {
                    tracing::info!("No cursor found, starting from latest events");
                }

                // Create cursor handler
                let cursor_handler = Arc::new(PostgresCursorHandler::new(
                    pool_for_jetstream.clone(),
                    "default".to_string(),
                    cursor_write_interval,
                ));

                // Create consumer with cursor support and Redis cache
                let consumer_result = JetstreamConsumer::new(
                    database_for_jetstream.clone(),
                    jetstream_hostname.clone(),
                    Some(cursor_handler.clone()),
                    initial_cursor,
                    redis_url.clone(),
                )
                .await;

                let consumer_arc = match consumer_result {
                    Ok(consumer) => {
                        let arc = Arc::new(consumer);

                        // Start configuration reloader only once
                        // (guarded by the flag so reconnects don't spawn duplicates).
                        if !config_reloader_started {
                            JetstreamConsumer::start_configuration_reloader(arc.clone());
                            config_reloader_started = true;
                        }

                        arc
                    }
                    Err(e) => {
                        tracing::error!(
                            "Failed to create Jetstream consumer: {} - retry in {:?}",
                            e,
                            retry_delay
                        );
                        jetstream_connected_clone
                            .store(false, std::sync::atomic::Ordering::Relaxed);
                        tokio::time::sleep(retry_delay).await;
                        retry_delay = std::cmp::min(retry_delay * 2, MAX_RETRY_DELAY);
                        continue;
                    }
                };

                // Reset retry delay on successful creation
                retry_delay = tokio::time::Duration::from_secs(5);

                tracing::info!("Starting Jetstream consumer with cursor support...");
                jetstream_connected_clone.store(true, std::sync::atomic::Ordering::Relaxed);

                // Start consuming with cancellation token
                // NOTE(review): a fresh token is created per attempt and never
                // cancelled here — shutdown is driven by the consumer returning.
                let cancellation_token = atproto_jetstream::CancellationToken::new();
                match consumer_arc.start_consuming(cancellation_token).await {
                    Ok(_) => {
                        tracing::info!("Jetstream consumer shut down normally - reconnecting in {:?}", retry_delay);
                        jetstream_connected_clone
                            .store(false, std::sync::atomic::Ordering::Relaxed);
                        tokio::time::sleep(retry_delay).await;
                        retry_delay = std::cmp::min(retry_delay * 2, MAX_RETRY_DELAY);
                    }
                    Err(e) => {
                        tracing::error!("Jetstream consumer failed: {} - reconnecting in {:?}", e, retry_delay);
                        jetstream_connected_clone
                            .store(false, std::sync::atomic::Ordering::Relaxed);
                        tokio::time::sleep(retry_delay).await;
                        retry_delay = std::cmp::min(retry_delay * 2, MAX_RETRY_DELAY);
                    }
                }
            }
        });
    } else {
        info!("Skipping Jetstream consumer for worker process");
    }

    // Create auth cache for token/session caching (5 minute TTL)
    let redis_url = env::var("REDIS_URL").ok();
    let auth_cache_backend = if let Some(redis_url) = redis_url {
        cache::CacheBackend::Redis {
            url: redis_url,
            ttl_seconds: Some(300),
        }
    } else {
        cache::CacheBackend::InMemory {
            ttl_seconds: Some(300),
        }
    };
    let auth_cache = Arc::new(Mutex::new(
        cache::CacheFactory::create_slice_cache(auth_cache_backend).await?,
    ));

    let state = AppState {
        database: database.clone(),
        database_pool: pool,
        config,
        jetstream_connected,
        auth_cache,
    };

    // Build application with routes
    let app = Router::new()
        // Health check endpoint
        .route(
            "/",
            get(|| async {
                r#"
███████╗██╗ ██╗ ██████╗███████╗███████╗
██╔════╝██║ ██║██╔════╝██╔════╝██╔════╝
███████╗██║ ██║██║ █████╗ ███████╗
╚════██║██║ ██║██║ ██╔══╝ ╚════██║
███████║███████╗██║╚██████╗███████╗███████║
╚══════╝╚══════╝╚═╝ ╚═════╝╚══════╝╚══════╝
            "#
            }),
        )
        // XRPC endpoints
        .route(
            "/xrpc/com.atproto.repo.uploadBlob",
            post(xrpc::com::atproto::repo::upload_blob::handler),
        )
        .route(
            "/xrpc/network.slices.slice.startSync",
            post(xrpc::network::slices::slice::start_sync::handler),
        )
        .route(
            "/xrpc/network.slices.slice.syncUserCollections",
            post(xrpc::network::slices::slice::sync_user_collections::handler),
        )
        .route(
            "/xrpc/network.slices.slice.clearSliceRecords",
            post(xrpc::network::slices::slice::clear_slice_records::handler),
        )
        .route(
            "/xrpc/network.slices.slice.getJobStatus",
            get(xrpc::network::slices::slice::get_job_status::handler),
        )
        .route(
            "/xrpc/network.slices.slice.getJobHistory",
            get(xrpc::network::slices::slice::get_job_history::handler),
        )
        .route(
            "/xrpc/network.slices.slice.getJobLogs",
            get(xrpc::network::slices::slice::get_job_logs::handler),
        )
        .route(
            "/xrpc/network.slices.slice.getJetstreamLogs",
            get(xrpc::network::slices::slice::get_jetstream_logs::handler),
        )
        .route(
            "/xrpc/network.slices.slice.stats",
            get(xrpc::network::slices::slice::stats::handler),
        )
        .route(
            "/xrpc/network.slices.slice.getSparklines",
            post(xrpc::network::slices::slice::get_sparklines::handler),
        )
        .route(
            "/xrpc/network.slices.slice.getSliceRecords",
            post(xrpc::network::slices::slice::get_slice_records::handler),
        )
        .route(
            "/xrpc/network.slices.slice.openapi",
            get(xrpc::network::slices::slice::openapi::handler),
        )
        .route(
            "/xrpc/network.slices.slice.getJetstreamStatus",
            get(xrpc::network::slices::slice::get_jetstream_status::handler),
        )
        .route(
            "/xrpc/network.slices.slice.getActors",
            post(xrpc::network::slices::slice::get_actors::handler),
        )
        .route(
            "/xrpc/network.slices.slice.createOAuthClient",
            post(xrpc::network::slices::slice::create_oauth_client::handler),
        )
        .route(
            "/xrpc/network.slices.slice.getOAuthClients",
            get(xrpc::network::slices::slice::get_oauth_clients::handler),
        )
        .route(
            "/xrpc/network.slices.slice.updateOAuthClient",
            post(xrpc::network::slices::slice::update_oauth_client::handler),
        )
        .route(
            "/xrpc/network.slices.slice.deleteOAuthClient",
            post(xrpc::network::slices::slice::delete_oauth_client::handler),
        )
        .route(
            "/xrpc/network.slices.slice.getSyncSummary",
            get(xrpc::network::slices::slice::get_sync_summary::handler),
        )
        // GraphQL endpoints
        .route(
            "/graphql",
            get(graphql::graphql_playground).post(graphql::graphql_handler),
        )
        .route("/graphql/ws", get(graphql::graphql_subscription_handler))
        // Dynamic collection-specific XRPC endpoints (wildcard routes must come last)
        .route(
            "/xrpc/{*method}",
            get(api::xrpc_dynamic::dynamic_xrpc_handler),
        )
        .route(
            "/xrpc/{*method}",
            post(api::xrpc_dynamic::dynamic_xrpc_post_handler),
        )
        .layer(TraceLayer::new_for_http())
        .layer(CorsLayer::permissive())
        .with_state(state);

    // Start HTTP server (in app or all processes)
    if is_app || is_all {
        info!("Starting HTTP server for app process");
        let port = env::var("PORT").unwrap_or_else(|_| "3000".to_string());
        let addr = format!("0.0.0.0:{}", port);

        let listener = tokio::net::TcpListener::bind(&addr).await?;
        info!("Server running on http://{}", addr);

        axum::serve(listener, app).await?;
    } else {
        info!("Worker process, no HTTP server started");
        // Park forever so the spawned background tasks keep running.
        std::future::pending::<()>().await;
    }

    Ok(())
}