Forked from slices.network/slices.
A highly ambitious ATProtocol AppView service and its SDKs.
1mod actor_resolver;
2mod api;
3mod atproto_extensions;
4mod auth;
5mod cache;
6mod database;
7mod errors;
8mod graphql;
9mod jetstream;
10mod jetstream_cursor;
11mod jobs;
12mod logging;
13mod models;
14mod sync;
15mod xrpc;
16
17use axum::{
18 Router,
19 routing::{get, post},
20};
21use sqlx::PgPool;
22use std::env;
23use std::sync::Arc;
24use std::sync::atomic::AtomicBool;
25use tokio::sync::Mutex;
26use tower_http::{cors::CorsLayer, trace::TraceLayer};
27use tracing::info;
28
29use crate::database::Database;
30use crate::errors::AppError;
31use crate::jetstream::JetstreamConsumer;
32use crate::jetstream_cursor::PostgresCursorHandler;
33use crate::logging::{Logger, start_log_cleanup_task};
34
/// Runtime configuration assembled in `main` from environment variables,
/// with development fallbacks when a variable is unset.
#[derive(Clone)]
pub struct Config {
    /// Base URL of the auth service (`AUTH_BASE_URL`; defaults to `http://localhost:8081`).
    pub auth_base_url: String,
    /// ATProto relay endpoint (`RELAY_ENDPOINT`; defaults to `https://relay1.us-west.bsky.network`).
    pub relay_endpoint: String,
    /// AT-URI of the system slice record (`SYSTEM_SLICE_URI`).
    pub system_slice_uri: String,
    /// Cap on repos per sync (`DEFAULT_MAX_SYNC_REPOS`; defaults to 5000,
    /// also 5000 when the variable fails to parse as an integer).
    pub default_max_sync_repos: i32,
}
42
/// Shared application state handed to every axum handler via `Router::with_state`.
/// Cloning is cheap: fields are pools, `Arc`s, or small structs.
#[derive(Clone)]
pub struct AppState {
    // High-level database wrapper built over the pool in `main`.
    database: Database,
    // Raw sqlx Postgres pool (the same pool the background tasks use).
    database_pool: PgPool,
    // Environment-derived runtime configuration.
    config: Config,
    /// Connection-status flag; the Jetstream consumer task stores `true`/`false`
    /// here as it connects and disconnects.
    pub jetstream_connected: Arc<AtomicBool>,
    /// Auth token/session cache — Redis-backed when `REDIS_URL` is set,
    /// in-memory otherwise; both configured with a 300-second TTL in `main`.
    pub auth_cache: Arc<Mutex<cache::SliceCache>>,
}
51
52#[tokio::main]
53async fn main() -> Result<(), AppError> {
54 // Load environment variables from .env file
55 dotenvy::dotenv().ok();
56
57 // Initialize tracing
58 tracing_subscriber::fmt()
59 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
60 .init();
61
62 // Database connection
63 let database_url = env::var("DATABASE_URL")
64 .unwrap_or_else(|_| "postgresql://slice:slice@localhost:5432/slice".to_string());
65
66 // Configure database pool for high-concurrency workload
67 let pool = sqlx::postgres::PgPoolOptions::new()
68 .max_connections(20) // Increase from default of 10
69 .min_connections(5) // Keep some connections warm
70 .acquire_timeout(std::time::Duration::from_secs(30)) // Match request timeouts
71 .idle_timeout(std::time::Duration::from_secs(300)) // 5 minutes idle timeout
72 .max_lifetime(std::time::Duration::from_secs(1800)) // 30 minutes max lifetime
73 .connect(&database_url)
74 .await?;
75
76 // Run migrations if needed
77 sqlx::migrate!("./migrations").run(&pool).await?;
78
79 let database = Database::new(pool.clone());
80
81 let auth_base_url =
82 env::var("AUTH_BASE_URL").unwrap_or_else(|_| "http://localhost:8081".to_string());
83
84 let relay_endpoint = env::var("RELAY_ENDPOINT")
85 .unwrap_or_else(|_| "https://relay1.us-west.bsky.network".to_string());
86
87 let system_slice_uri = env::var("SYSTEM_SLICE_URI").unwrap_or_else(|_| {
88 "at://did:plc:bcgltzqazw5tb6k2g3ttenbj/network.slices.slice/3lymhd4jhrd2z".to_string()
89 });
90
91 let default_max_sync_repos = env::var("DEFAULT_MAX_SYNC_REPOS")
92 .unwrap_or_else(|_| "5000".to_string())
93 .parse::<i32>()
94 .unwrap_or(5000);
95
96 let config = Config {
97 auth_base_url,
98 relay_endpoint,
99 system_slice_uri,
100 default_max_sync_repos,
101 };
102
103 // Initialize global logger
104 Logger::init_global(pool.clone());
105
106 // Start log cleanup background task
107 start_log_cleanup_task(pool.clone());
108
109 // Detect process type from environment (supports both PROCESS_TYPE and FLY_PROCESS_GROUP)
110 let process_type = env::var("PROCESS_TYPE")
111 .or_else(|_| env::var("FLY_PROCESS_GROUP"))
112 .unwrap_or_else(|_| "all".to_string());
113
114 info!("Starting with process type: {}", process_type);
115
116 let is_all = process_type == "all";
117 let is_app = process_type == "app";
118 let is_worker = process_type == "worker";
119
120 // Start job queue runner (in worker or all processes)
121 if is_worker || is_all {
122 info!("Starting sync job queue runner for worker process");
123 let pool_for_runner = pool.clone();
124 tokio::spawn(async move {
125 tracing::info!("Starting job queue runner...");
126 match jobs::registry()
127 .runner(&pool_for_runner)
128 .set_concurrency(1, 2) // Keep 1-2 sync jobs running at a time (reduced for pool management)
129 .run()
130 .await
131 {
132 Ok(handle) => {
133 tracing::info!("Job runner started successfully, keeping handle alive...");
134 // CRITICAL: We must keep the handle alive for the runner to continue processing jobs
135 // The runner will stop if the handle is dropped
136 std::future::pending::<()>().await; // Keep the task alive indefinitely
137 drop(handle); // This line will never be reached
138 }
139 Err(e) => {
140 tracing::error!("Failed to start job runner: {}", e);
141 }
142 }
143 });
144 } else {
145 info!("Skipping sync job queue runner for app process");
146 }
147
148 // Create shared jetstream connection status
149 let jetstream_connected = Arc::new(AtomicBool::new(false));
150
151 // Start Jetstream consumer (in app or all processes)
152 if is_app || is_all {
153 info!("Starting Jetstream consumer for app process");
154 let database_for_jetstream = database.clone();
155 let pool_for_jetstream = pool.clone();
156 let jetstream_connected_clone = jetstream_connected.clone();
157 tokio::spawn(async move {
158 let jetstream_hostname = env::var("JETSTREAM_HOSTNAME").ok();
159 let redis_url = env::var("REDIS_URL").ok();
160 let cursor_write_interval = env::var("JETSTREAM_CURSOR_WRITE_INTERVAL_SECS")
161 .unwrap_or_else(|_| "5".to_string())
162 .parse::<u64>()
163 .unwrap_or(5);
164
165 // Reconnection rate limiting (5 retries per minute max)
166 const MAX_RECONNECTS_PER_MINUTE: u32 = 5;
167 const RECONNECT_WINDOW: tokio::time::Duration = tokio::time::Duration::from_secs(60);
168 let mut reconnect_count = 0u32;
169 let mut window_start = std::time::Instant::now();
170
171 let mut retry_delay = tokio::time::Duration::from_secs(5);
172 const MAX_RETRY_DELAY: tokio::time::Duration = tokio::time::Duration::from_secs(300);
173
174 // Configuration reloader setup (run once)
175 let mut config_reloader_started = false;
176
177 loop {
178 // Rate limiting: reset counter if window has passed
179 let now = std::time::Instant::now();
180 if now.duration_since(window_start) >= RECONNECT_WINDOW {
181 reconnect_count = 0;
182 window_start = now;
183 }
184
185 // Check rate limit
186 if reconnect_count >= MAX_RECONNECTS_PER_MINUTE {
187 let wait_time = RECONNECT_WINDOW - now.duration_since(window_start);
188 tracing::warn!(
189 "Rate limit exceeded: {} reconnects in last minute, waiting {:?}",
190 reconnect_count,
191 wait_time
192 );
193 tokio::time::sleep(wait_time).await;
194 continue;
195 }
196
197 reconnect_count += 1;
198
199 // Read cursor position from database
200 let initial_cursor =
201 PostgresCursorHandler::read_cursor(&pool_for_jetstream, "default").await;
202 if let Some(cursor) = initial_cursor {
203 tracing::info!("Resuming from cursor position: {}", cursor);
204 } else {
205 tracing::info!("No cursor found, starting from latest events");
206 }
207
208 // Create cursor handler
209 let cursor_handler = Arc::new(PostgresCursorHandler::new(
210 pool_for_jetstream.clone(),
211 "default".to_string(),
212 cursor_write_interval,
213 ));
214
215 // Create consumer with cursor support and Redis cache
216 let consumer_result = JetstreamConsumer::new(
217 database_for_jetstream.clone(),
218 jetstream_hostname.clone(),
219 Some(cursor_handler.clone()),
220 initial_cursor,
221 redis_url.clone(),
222 )
223 .await;
224
225 let consumer_arc = match consumer_result {
226 Ok(consumer) => {
227 let arc = Arc::new(consumer);
228
229 // Start configuration reloader only once
230 if !config_reloader_started {
231 JetstreamConsumer::start_configuration_reloader(arc.clone());
232 config_reloader_started = true;
233 }
234
235 arc
236 }
237 Err(e) => {
238 tracing::error!(
239 "Failed to create Jetstream consumer: {} - retry in {:?}",
240 e,
241 retry_delay
242 );
243 jetstream_connected_clone.store(false, std::sync::atomic::Ordering::Relaxed);
244 tokio::time::sleep(retry_delay).await;
245 retry_delay = std::cmp::min(retry_delay * 2, MAX_RETRY_DELAY);
246 continue;
247 }
248 };
249
250 // Reset retry delay on successful creation
251 retry_delay = tokio::time::Duration::from_secs(5);
252
253 tracing::info!("Starting Jetstream consumer with cursor support...");
254 jetstream_connected_clone.store(true, std::sync::atomic::Ordering::Relaxed);
255
256 // Start consuming with cancellation token
257 let cancellation_token = atproto_jetstream::CancellationToken::new();
258 match consumer_arc.start_consuming(cancellation_token).await {
259 Ok(_) => {
260 tracing::info!("Jetstream consumer shut down normally");
261 jetstream_connected_clone.store(false, std::sync::atomic::Ordering::Relaxed);
262 }
263 Err(e) => {
264 tracing::error!("Jetstream consumer failed: {} - will reconnect", e);
265 jetstream_connected_clone.store(false, std::sync::atomic::Ordering::Relaxed);
266 tokio::time::sleep(retry_delay).await;
267 retry_delay = std::cmp::min(retry_delay * 2, MAX_RETRY_DELAY);
268 }
269 }
270 }
271 });
272 } else {
273 info!("Skipping Jetstream consumer for worker process");
274 }
275
276 // Create auth cache for token/session caching (5 minute TTL)
277 let redis_url = env::var("REDIS_URL").ok();
278 let auth_cache_backend = if let Some(redis_url) = redis_url {
279 cache::CacheBackend::Redis {
280 url: redis_url,
281 ttl_seconds: Some(300),
282 }
283 } else {
284 cache::CacheBackend::InMemory {
285 ttl_seconds: Some(300),
286 }
287 };
288 let auth_cache = Arc::new(Mutex::new(
289 cache::CacheFactory::create_slice_cache(auth_cache_backend).await?,
290 ));
291
292 let state = AppState {
293 database: database.clone(),
294 database_pool: pool,
295 config,
296 jetstream_connected,
297 auth_cache,
298 };
299
300 // Build application with routes
301 let app = Router::new()
302 // Health check endpoint
303 .route(
304 "/",
305 get(|| async {
306 r#"
307███████╗██╗ ██╗ ██████╗███████╗███████╗
308██╔════╝██║ ██║██╔════╝██╔════╝██╔════╝
309███████╗██║ ██║██║ █████╗ ███████╗
310╚════██║██║ ██║██║ ██╔══╝ ╚════██║
311███████║███████╗██║╚██████╗███████╗███████║
312╚══════╝╚══════╝╚═╝ ╚═════╝╚══════╝╚══════╝
313 "#
314 }),
315 )
316 // XRPC endpoints
317 .route(
318 "/xrpc/com.atproto.repo.uploadBlob",
319 post(xrpc::com::atproto::repo::upload_blob::handler),
320 )
321 .route(
322 "/xrpc/network.slices.slice.startSync",
323 post(xrpc::network::slices::slice::start_sync::handler),
324 )
325 .route(
326 "/xrpc/network.slices.slice.syncUserCollections",
327 post(xrpc::network::slices::slice::sync_user_collections::handler),
328 )
329 .route(
330 "/xrpc/network.slices.slice.clearSliceRecords",
331 post(xrpc::network::slices::slice::clear_slice_records::handler),
332 )
333 .route(
334 "/xrpc/network.slices.slice.getJobStatus",
335 get(xrpc::network::slices::slice::get_job_status::handler),
336 )
337 .route(
338 "/xrpc/network.slices.slice.getJobHistory",
339 get(xrpc::network::slices::slice::get_job_history::handler),
340 )
341 .route(
342 "/xrpc/network.slices.slice.getJobLogs",
343 get(xrpc::network::slices::slice::get_job_logs::handler),
344 )
345 .route(
346 "/xrpc/network.slices.slice.getJetstreamLogs",
347 get(xrpc::network::slices::slice::get_jetstream_logs::handler),
348 )
349 .route(
350 "/xrpc/network.slices.slice.stats",
351 get(xrpc::network::slices::slice::stats::handler),
352 )
353 .route(
354 "/xrpc/network.slices.slice.getSparklines",
355 post(xrpc::network::slices::slice::get_sparklines::handler),
356 )
357 .route(
358 "/xrpc/network.slices.slice.getSliceRecords",
359 post(xrpc::network::slices::slice::get_slice_records::handler),
360 )
361 .route(
362 "/xrpc/network.slices.slice.openapi",
363 get(xrpc::network::slices::slice::openapi::handler),
364 )
365 .route(
366 "/xrpc/network.slices.slice.getJetstreamStatus",
367 get(xrpc::network::slices::slice::get_jetstream_status::handler),
368 )
369 .route(
370 "/xrpc/network.slices.slice.getActors",
371 post(xrpc::network::slices::slice::get_actors::handler),
372 )
373 .route(
374 "/xrpc/network.slices.slice.createOAuthClient",
375 post(xrpc::network::slices::slice::create_oauth_client::handler),
376 )
377 .route(
378 "/xrpc/network.slices.slice.getOAuthClients",
379 get(xrpc::network::slices::slice::get_oauth_clients::handler),
380 )
381 .route(
382 "/xrpc/network.slices.slice.updateOAuthClient",
383 post(xrpc::network::slices::slice::update_oauth_client::handler),
384 )
385 .route(
386 "/xrpc/network.slices.slice.deleteOAuthClient",
387 post(xrpc::network::slices::slice::delete_oauth_client::handler),
388 )
389 .route(
390 "/xrpc/network.slices.slice.getSyncSummary",
391 get(xrpc::network::slices::slice::get_sync_summary::handler),
392 )
393 // GraphQL endpoint
394 .route(
395 "/graphql",
396 get(graphql::graphql_playground).post(graphql::graphql_handler),
397 )
398 // Dynamic collection-specific XRPC endpoints (wildcard routes must come last)
399 .route(
400 "/xrpc/{*method}",
401 get(api::xrpc_dynamic::dynamic_xrpc_handler),
402 )
403 .route(
404 "/xrpc/{*method}",
405 post(api::xrpc_dynamic::dynamic_xrpc_post_handler),
406 )
407 .layer(TraceLayer::new_for_http())
408 .layer(CorsLayer::permissive())
409 .with_state(state);
410
411 // Start HTTP server (in app or all processes)
412 if is_app || is_all {
413 info!("Starting HTTP server for app process");
414 let port = env::var("PORT").unwrap_or_else(|_| "3000".to_string());
415 let addr = format!("0.0.0.0:{}", port);
416
417 let listener = tokio::net::TcpListener::bind(&addr).await?;
418 info!("Server running on http://{}", addr);
419
420 axum::serve(listener, app).await?;
421 } else {
422 info!("Worker process, no HTTP server started");
423 std::future::pending::<()>().await;
424 }
425
426 Ok(())
427}