Highly ambitious ATProtocol AppView service and sdks

run sync in separate worker process, update fly.toml to support that

Changed files
+203 -134
api
+3
api/.env.example
··· 4 4 # Server configuration 5 5 PORT=3000 6 6 7 + # Process type: all (everything), app (HTTP + Jetstream), or worker (sync jobs only) 8 + # PROCESS_TYPE=all 9 + 7 10 # Authentication service base URL 8 11 AUTH_BASE_URL=http://localhost:8081 9 12
+36
api/CLAUDE.md
··· 33 33 SYSTEM_SLICE_URI=at://did:plc:bcgltzqazw5tb6k2g3ttenbj/network.slices.slice/3lymhd4jhrd2z 34 34 AUTH_BASE_URL=http://localhost:8081 35 35 RELAY_ENDPOINT=https://relay1.us-west.bsky.network 36 + PROCESS_TYPE=all # Optional: all (default), app, worker 36 37 ``` 38 + 39 + ### Process Types 40 + 41 + The application supports running different components in separate processes for better resource isolation and scaling: 42 + 43 + - `all` (default): Everything (HTTP API + Jetstream + sync workers) 44 + - `app`: HTTP API server + Jetstream real-time indexing 45 + - `worker`: Background sync job processing only 46 + 47 + Set via `PROCESS_TYPE` environment variable (or `FLY_PROCESS_GROUP` on Fly.io). 37 48 38 49 ## Common Development Commands 39 50 ··· 174 185 - **Auth System**: Uses dedicated auth cache for OAuth and AT Protocol session caching 175 186 - **Actor Resolution**: Caches DID resolution results to avoid repeated lookups 176 187 - **Automatic Fallback**: Redis failures automatically fall back to in-memory caching without errors 188 + 189 + ## Deployment 190 + 191 + ### Fly.io (Multi-Process Architecture) 192 + 193 + The application is configured to run different components in separate process groups on Fly.io: 194 + 195 + ```toml 196 + [processes] 197 + app = "app" # HTTP API + Jetstream 198 + worker = "worker" # Sync job processing 199 + ``` 200 + 201 + **Scale processes independently:** 202 + ```bash 203 + # Scale app instances (HTTP + Jetstream) 204 + fly scale count app=2 205 + 206 + # Scale sync workers (for heavy backfills) 207 + fly scale count worker=5 208 + 209 + # Different VM sizes per workload 210 + fly scale vm shared-cpu-1x --process-group app 211 + fly scale vm shared-cpu-2x --process-group worker 212 + ```
+5 -1
api/fly.toml
··· 15 15 AUTH_BASE_URL="https://slices-aip.fly.dev" 16 16 SYSTEM_SLICE_URI="at://did:plc:bcgltzqazw5tb6k2g3ttenbj/network.slices.slice/3lymhd4jhrd2z" 17 17 18 + [processes] 19 + app = "slices" 20 + worker = "slices" 21 + 18 22 [http_service] 19 23 internal_port = 8080 20 24 force_https = true 21 25 auto_stop_machines = 'stop' 22 26 auto_start_machines = true 23 27 min_machines_running = 1 24 - processes = ['app'] 28 + processes = ['app'] # Only app process handles HTTP 25 29 26 30 [[vm]] 27 31 memory = '1gb'
+2 -5
api/src/jetstream.rs
··· 803 803 804 804 /// Load slice configurations 805 805 pub async fn load_slice_configurations(&self) -> Result<(), JetstreamError> { 806 - info!("Jetstream consumer now uses on-demand loading with caching"); 806 + info!("Loading slice configurations..."); 807 807 808 808 // Get all slices and update cached list 809 809 let slices = self.database.get_all_slices().await?; 810 810 *self.slices_list.write().await = slices.clone(); 811 - info!( 812 - "Found {} total slices in database - data will be loaded on-demand", 813 - slices.len() 814 - ); 811 + info!("Found {} total slices in database", slices.len()); 815 812 816 813 Ok(()) 817 814 }
+157 -128
api/src/main.rs
··· 98 98 // Start log cleanup background task 99 99 start_log_cleanup_task(pool.clone()); 100 100 101 - // Start job queue runner 102 - let pool_for_runner = pool.clone(); 103 - tokio::spawn(async move { 104 - tracing::info!("Starting job queue runner..."); 105 - match jobs::registry() 106 - .runner(&pool_for_runner) 107 - .set_concurrency(1, 2) // Keep 1-2 sync jobs running at a time (reduced for pool management) 108 - .run() 109 - .await 110 - { 111 - Ok(handle) => { 112 - tracing::info!("Job runner started successfully, keeping handle alive..."); 113 - // CRITICAL: We must keep the handle alive for the runner to continue processing jobs 114 - // The runner will stop if the handle is dropped 115 - std::future::pending::<()>().await; // Keep the task alive indefinitely 116 - drop(handle); // This line will never be reached 117 - } 118 - Err(e) => { 119 - tracing::error!("Failed to start job runner: {}", e); 101 + // Detect process type from environment (supports both PROCESS_TYPE and FLY_PROCESS_GROUP) 102 + let process_type = env::var("PROCESS_TYPE") 103 + .or_else(|_| env::var("FLY_PROCESS_GROUP")) 104 + .unwrap_or_else(|_| "all".to_string()); 105 + 106 + info!("Starting with process type: {}", process_type); 107 + 108 + let is_all = process_type == "all"; 109 + let is_app = process_type == "app"; 110 + let is_worker = process_type == "worker"; 111 + 112 + // Start job queue runner (in worker or all processes) 113 + if is_worker || is_all { 114 + info!("Starting sync job queue runner for worker process"); 115 + let pool_for_runner = pool.clone(); 116 + tokio::spawn(async move { 117 + tracing::info!("Starting job queue runner..."); 118 + match jobs::registry() 119 + .runner(&pool_for_runner) 120 + .set_concurrency(1, 2) // Keep 1-2 sync jobs running at a time (reduced for pool management) 121 + .run() 122 + .await 123 + { 124 + Ok(handle) => { 125 + tracing::info!("Job runner started successfully, keeping handle alive..."); 126 + // CRITICAL: We must keep the handle alive for the runner to continue processing jobs 127 + // The runner will stop if the handle is dropped 128 + std::future::pending::<()>().await; // Keep the task alive indefinitely 129 + drop(handle); // This line will never be reached 130 + } 131 + Err(e) => { 132 + tracing::error!("Failed to start job runner: {}", e); 133 + } 120 134 } 121 - } 122 - }); 135 + }); 136 + } else { 137 + info!("Skipping sync job queue runner for app process"); 138 + } 123 139 124 140 // Create shared jetstream connection status 125 141 let jetstream_connected = Arc::new(AtomicBool::new(false)); 126 142 127 - // Start Jetstream consumer with cursor persistence and improved reconnection logic 128 - let database_for_jetstream = database.clone(); 129 - let pool_for_jetstream = pool.clone(); 130 - let jetstream_connected_clone = jetstream_connected.clone(); 131 - tokio::spawn(async move { 132 - let jetstream_hostname = env::var("JETSTREAM_HOSTNAME").ok(); 133 - let redis_url = env::var("REDIS_URL").ok(); 134 - let cursor_write_interval = env::var("JETSTREAM_CURSOR_WRITE_INTERVAL_SECS") 135 - .unwrap_or_else(|_| "5".to_string()) 136 - .parse::<u64>() 137 - .unwrap_or(5); 143 + // Start Jetstream consumer (in app or all processes) 144 + if is_app || is_all { 145 + info!("Starting Jetstream consumer for app process"); 146 + let database_for_jetstream = database.clone(); 147 + let pool_for_jetstream = pool.clone(); 148 + let jetstream_connected_clone = jetstream_connected.clone(); 149 + tokio::spawn(async move { 150 + let jetstream_hostname = env::var("JETSTREAM_HOSTNAME").ok(); 151 + let redis_url = env::var("REDIS_URL").ok(); 152 + let cursor_write_interval = env::var("JETSTREAM_CURSOR_WRITE_INTERVAL_SECS") 153 + .unwrap_or_else(|_| "5".to_string()) 154 + .parse::<u64>() 155 + .unwrap_or(5); 156 + 157 + // Reconnection rate limiting (5 retries per minute max) 158 + const MAX_RECONNECTS_PER_MINUTE: u32 = 5; 159 + const RECONNECT_WINDOW: tokio::time::Duration = tokio::time::Duration::from_secs(60); 160 + let mut reconnect_count = 0u32; 161 + let mut window_start = std::time::Instant::now(); 138 162 139 - // Reconnection rate limiting (5 retries per minute max) 140 - const MAX_RECONNECTS_PER_MINUTE: u32 = 5; 141 - const RECONNECT_WINDOW: tokio::time::Duration = tokio::time::Duration::from_secs(60); 142 - let mut reconnect_count = 0u32; 143 - let mut window_start = std::time::Instant::now(); 163 + let mut retry_delay = tokio::time::Duration::from_secs(5); 164 + const MAX_RETRY_DELAY: tokio::time::Duration = tokio::time::Duration::from_secs(300); 144 165 145 - let mut retry_delay = tokio::time::Duration::from_secs(5); 146 - const MAX_RETRY_DELAY: tokio::time::Duration = tokio::time::Duration::from_secs(300); 166 + // Configuration reloader setup (run once) 167 + let mut config_reloader_started = false; 147 168 148 - // Configuration reloader setup (run once) 149 - let mut config_reloader_started = false; 169 + loop { 170 + // Rate limiting: reset counter if window has passed 171 + let now = std::time::Instant::now(); 172 + if now.duration_since(window_start) >= RECONNECT_WINDOW { 173 + reconnect_count = 0; 174 + window_start = now; 175 + } 150 176 151 - loop { 152 - // Rate limiting: reset counter if window has passed 153 - let now = std::time::Instant::now(); 154 - if now.duration_since(window_start) >= RECONNECT_WINDOW { 155 - reconnect_count = 0; 156 - window_start = now; 157 - } 177 + // Check rate limit 178 + if reconnect_count >= MAX_RECONNECTS_PER_MINUTE { 179 + let wait_time = RECONNECT_WINDOW - now.duration_since(window_start); 180 + tracing::warn!( 181 + "Rate limit exceeded: {} reconnects in last minute, waiting {:?}", 182 + reconnect_count, 183 + wait_time 184 + ); 185 + tokio::time::sleep(wait_time).await; 186 + continue; 187 + } 158 188 159 - // Check rate limit 160 - if reconnect_count >= MAX_RECONNECTS_PER_MINUTE { 161 - let wait_time = RECONNECT_WINDOW - now.duration_since(window_start); 162 - tracing::warn!( 163 - "Rate limit exceeded: {} reconnects in last minute, waiting {:?}", 164 - reconnect_count, 165 - wait_time 166 - ); 167 - tokio::time::sleep(wait_time).await; 168 - continue; 169 - } 189 + reconnect_count += 1; 170 190 171 - reconnect_count += 1; 191 + // Read cursor position from database 192 + let initial_cursor = 193 + PostgresCursorHandler::read_cursor(&pool_for_jetstream, "default").await; 194 + if let Some(cursor) = initial_cursor { 195 + tracing::info!("Resuming from cursor position: {}", cursor); 196 + } else { 197 + tracing::info!("No cursor found, starting from latest events"); 198 + } 172 199 173 - // Read cursor position from database 174 - let initial_cursor = 175 - PostgresCursorHandler::read_cursor(&pool_for_jetstream, "default").await; 176 - if let Some(cursor) = initial_cursor { 177 - tracing::info!("Resuming from cursor position: {}", cursor); 178 - } else { 179 - tracing::info!("No cursor found, starting from latest events"); 180 - } 200 + // Create cursor handler 201 + let cursor_handler = Arc::new(PostgresCursorHandler::new( 202 + pool_for_jetstream.clone(), 203 + "default".to_string(), 204 + cursor_write_interval, 205 + )); 181 206 182 - // Create cursor handler 183 - let cursor_handler = Arc::new(PostgresCursorHandler::new( 184 - pool_for_jetstream.clone(), 185 - "default".to_string(), 186 - cursor_write_interval, 187 - )); 207 + // Create consumer with cursor support and Redis cache 208 + let consumer_result = JetstreamConsumer::new( 209 + database_for_jetstream.clone(), 210 + jetstream_hostname.clone(), 211 + Some(cursor_handler.clone()), 212 + initial_cursor, 213 + redis_url.clone(), 214 + ) 215 + .await; 188 216 189 - // Create consumer with cursor support and Redis cache 190 - let consumer_result = JetstreamConsumer::new( 191 - database_for_jetstream.clone(), 192 - jetstream_hostname.clone(), 193 - Some(cursor_handler.clone()), 194 - initial_cursor, 195 - redis_url.clone(), 196 - ) 197 - .await; 217 + let consumer_arc = match consumer_result { 218 + Ok(consumer) => { 219 + let arc = Arc::new(consumer); 198 220 199 - let consumer_arc = match consumer_result { 200 - Ok(consumer) => { 201 - let arc = Arc::new(consumer); 221 + // Start configuration reloader only once 222 + if !config_reloader_started { 223 + JetstreamConsumer::start_configuration_reloader(arc.clone()); 224 + config_reloader_started = true; 225 + } 202 226 203 - // Start configuration reloader only once 204 - if !config_reloader_started { 205 - JetstreamConsumer::start_configuration_reloader(arc.clone()); 206 - config_reloader_started = true; 227 + arc 207 228 } 208 - 209 - arc 210 - } 211 - Err(e) => { 212 - tracing::error!( 213 - "Failed to create Jetstream consumer: {} - retry in {:?}", 214 - e, 215 - retry_delay 216 - ); 217 - jetstream_connected_clone.store(false, std::sync::atomic::Ordering::Relaxed); 218 - tokio::time::sleep(retry_delay).await; 219 - retry_delay = std::cmp::min(retry_delay * 2, MAX_RETRY_DELAY); 220 - continue; 221 - } 222 - }; 229 + Err(e) => { 230 + tracing::error!( 231 + "Failed to create Jetstream consumer: {} - retry in {:?}", 232 + e, 233 + retry_delay 234 + ); 235 + jetstream_connected_clone.store(false, std::sync::atomic::Ordering::Relaxed); 236 + tokio::time::sleep(retry_delay).await; 237 + retry_delay = std::cmp::min(retry_delay * 2, MAX_RETRY_DELAY); 238 + continue; 239 + } 240 + }; 223 241 224 - // Reset retry delay on successful creation 225 - retry_delay = tokio::time::Duration::from_secs(5); 242 + // Reset retry delay on successful creation 243 + retry_delay = tokio::time::Duration::from_secs(5); 226 244 227 - tracing::info!("Starting Jetstream consumer with cursor support..."); 228 - jetstream_connected_clone.store(true, std::sync::atomic::Ordering::Relaxed); 245 + tracing::info!("Starting Jetstream consumer with cursor support..."); 246 + jetstream_connected_clone.store(true, std::sync::atomic::Ordering::Relaxed); 229 247 230 - // Start consuming with cancellation token 231 - let cancellation_token = atproto_jetstream::CancellationToken::new(); 232 - match consumer_arc.start_consuming(cancellation_token).await { 233 - Ok(_) => { 234 - tracing::info!("Jetstream consumer shut down normally"); 235 - jetstream_connected_clone.store(false, std::sync::atomic::Ordering::Relaxed); 236 - } 237 - Err(e) => { 238 - tracing::error!("Jetstream consumer failed: {} - will reconnect", e); 239 - jetstream_connected_clone.store(false, std::sync::atomic::Ordering::Relaxed); 240 - tokio::time::sleep(retry_delay).await; 241 - retry_delay = std::cmp::min(retry_delay * 2, MAX_RETRY_DELAY); 248 + // Start consuming with cancellation token 249 + let cancellation_token = atproto_jetstream::CancellationToken::new(); 250 + match consumer_arc.start_consuming(cancellation_token).await { 251 + Ok(_) => { 252 + tracing::info!("Jetstream consumer shut down normally"); 253 + jetstream_connected_clone.store(false, std::sync::atomic::Ordering::Relaxed); 254 + } 255 + Err(e) => { 256 + tracing::error!("Jetstream consumer failed: {} - will reconnect", e); 257 + jetstream_connected_clone.store(false, std::sync::atomic::Ordering::Relaxed); 258 + tokio::time::sleep(retry_delay).await; 259 + retry_delay = std::cmp::min(retry_delay * 2, MAX_RETRY_DELAY); 260 + } 242 261 } 243 262 } 244 - } 245 - }); 263 + }); 264 + } else { 265 + info!("Skipping Jetstream consumer for worker process"); 266 + } 246 267 247 268 // Create auth cache for token/session caching (5 minute TTL) 248 269 let redis_url = env::var("REDIS_URL").ok(); ··· 366 387 .layer(CorsLayer::permissive()) 367 388 .with_state(state); 368 389 369 - let port = env::var("PORT").unwrap_or_else(|_| "3000".to_string()); 370 - let addr = format!("0.0.0.0:{}", port); 390 + // Start HTTP server (in app or all processes) 391 + if is_app || is_all { 392 + info!("Starting HTTP server for app process"); 393 + let port = env::var("PORT").unwrap_or_else(|_| "3000".to_string()); 394 + let addr = format!("0.0.0.0:{}", port); 371 395 372 - let listener = tokio::net::TcpListener::bind(&addr).await?; 373 - info!("Server running on http://{}", addr); 396 + let listener = tokio::net::TcpListener::bind(&addr).await?; 397 + info!("Server running on http://{}", addr); 374 398 375 - axum::serve(listener, app).await?; 399 + axum::serve(listener, app).await?; 400 + } else { 401 + info!("Worker process, no HTTP server started"); 402 + std::future::pending::<()>().await; 403 + } 404 + 376 405 Ok(()) 377 406 }