The smokesignal.events web application

feature: tap rollout to replace jetstream

+1 -1
.gitignore
··· 44 44 /keys.json 45 45 /.vscode/launch.json 46 46 .*.env 47 - .env 47 + .envtmp/
+11 -17
CLAUDE.md
··· 4 4 5 5 ## Project Overview 6 6 7 - Smokesignal is a Rust-based event and RSVP management application built for the AT Protocol ecosystem. It provides decentralized identity management, OAuth authentication, event coordination, real-time event streaming, search capabilities, webhook notifications, and email notifications. The application features RSVP acceptance workflows, private event content with conditional display, profile caching, and content storage with S3 support. It supports both standard AT Protocol OAuth (PDS) and AIP (AT Protocol Improvement Proposal) OAuth flows, with backend selection determined by runtime configuration. 7 + Smokesignal is a Rust-based event and RSVP management application built for the AT Protocol ecosystem. It provides decentralized identity management, OAuth authentication, event coordination, real-time event streaming, search capabilities, and email notifications. The application features RSVP acceptance workflows, private event content with conditional display, profile caching, and content storage with S3 support. It supports both standard AT Protocol OAuth (PDS) and AIP (AT Protocol Improvement Proposal) OAuth flows, with backend selection determined by runtime configuration. 8 8 9 9 ## Tech Stack 10 10 ··· 23 23 - **Cryptography**: P-256 ECDSA with elliptic-curve and p256 crates 24 24 - **Task Management**: Tokio with task tracking and graceful shutdown 25 25 - **DNS Resolution**: Hickory resolver with DoH/DoT support 26 - - **Real-time**: Jetstream consumer for AT Protocol firehose 27 - - **Webhooks**: Event notification system with background processing 26 + - **Real-time**: TAP consumer for AT Protocol events with backfill support 28 27 - **Email**: Lettre for SMTP-based email notifications 29 28 - **Image Processing**: Image resizing and optimization for avatars and content 30 29 - **Rate Limiting**: Redis-backed throttling for API endpoints ··· 105 104 │ ├── denylist.rs # Handle blocking 106 105 │ ├── content.rs # Content storage abstraction (filesystem/S3) 107 106 │ ├── private_event_content.rs # Private event content 108 - │ ├── webhook.rs # Webhook storage operations 109 107 │ ├── notification.rs # Email notification preferences 110 108 │ ├── distributed_lock.rs # Redis-based distributed locking 111 109 │ └── atproto.rs # AT Protocol specific storage 112 110 ├── config.rs # Environment configuration 113 111 ├── key_provider.rs # JWT key management 114 112 ├── i18n.rs # Internationalization 115 - ├── consumer.rs # Jetstream event consumer 113 + ├── tap_processor.rs # TAP event consumer and processor 116 114 ├── processor.rs # Content fetcher for events 117 115 ├── service.rs # Service document and DID management 118 - ├── webhooks.rs # Webhook event constants 119 116 ├── emailer.rs # Email sending functionality 120 117 ├── email_templates.rs # Email template rendering 121 118 ├── email_confirmation.rs # Email confirmation tokens ··· 127 124 ├── facets.rs # AT Protocol facets parsing 128 125 ├── task_identity_refresh.rs # Identity refresh task 129 126 ├── task_oauth_requests_cleanup.rs # OAuth cleanup task 130 - ├── task_search_indexer.rs # OpenSearch indexing 131 - └── task_webhooks.rs # Webhook processing 127 + └── task_search_indexer.rs # OpenSearch indexing 132 128 ``` 133 129 134 130 ### Key Patterns ··· 137 133 - **Dependency Injection**: Services passed through application context 138 134 - **Template-driven UI**: Server-side rendering with optional reloading 139 135 - **Background Tasks**: Multiple async workers for maintenance and processing 140 - - **Event-driven**: Jetstream consumer for real-time AT Protocol events 136 + - **Event-driven**: TAP consumer for real-time AT Protocol events with backfill support 141 137 142 138 ### Database Schema 143 139 - `identity_profiles` - User identity and preferences (including discovery settings) ··· 153 149 - `private_event_content` - Conditional event content based on display criteria 154 150 - `denylist` - Handle blocking for moderation 155 151 - `did_documents` - Cached DID documents for performance 156 - - `identity_webhooks` - User-configured webhook endpoints 157 152 158 153 ## Configuration Requirements 159 154 ··· 182 177 - `RUST_LOG` - Logging configuration (default: info) 183 178 - `PORT` - Server port (default: 3000) 184 179 - `BIND_ADDR` - Bind address (default: 0.0.0.0) 185 - - `ENABLE_JETSTREAM` - Enable Jetstream consumer (default: true) 186 - - `ENABLE_WEBHOOKS` - Enable webhook functionality 187 - - `ENABLE_TASK_WEBHOOKS` - Enable webhook processing task 180 + - `TAP_HOSTNAME` - TAP service hostname (default: localhost:2480) 181 + - `TAP_PASSWORD` - TAP admin password for authentication (optional) 182 + - `ENABLE_TAP` - Enable TAP consumer (default: true) 188 183 - `ENABLE_OPENSEARCH` - Enable OpenSearch integration 189 184 - `ENABLE_TASK_OPENSEARCH` - Enable search indexing task 190 185 - `OPENSEARCH_ENDPOINT` - OpenSearch server endpoint ··· 209 204 - Internationalization support with Fluent 210 205 - LRU caching for OAuth requests and DID documents 211 206 - Graceful shutdown with task tracking and cancellation tokens 212 - - Real-time event ingestion via Jetstream 207 + - Real-time event ingestion via TAP with backfill support 213 208 - Full-text search with OpenSearch 214 - - Webhook notifications for events and RSVPs 215 209 - Email notifications with HTML templates and confirmation workflow 216 210 - RSVP acceptance/validation workflow with tickets and records 217 211 - Private event content with conditional display criteria ··· 264 258 - Profiles (events.smokesignal.profile) 265 259 - Acceptance records (events.smokesignal.calendar.acceptance) 266 260 - Location data (community.lexicon.location) 267 - - Jetstream consumer for real-time event ingestion from AT Protocol firehose 261 + - TAP consumer for real-time event ingestion from AT Protocol with backfill 268 262 - XRPC server endpoints for search and service operations 269 263 - Service identity with did:web support for federated interactions 270 264 - AT Protocol attestation support for verified claims ··· 275 269 - `atproto-oauth-aip` - AIP OAuth implementation 276 270 - `atproto-client` - AT Protocol client functionality 277 271 - `atproto-record` - Record management 278 - - `atproto-jetstream` - Firehose event streaming 272 + - `atproto-tap` - TAP event streaming with backfill support 279 273 - `atproto-xrpcs` - XRPC server implementation 280 274 - `atproto-attestation` - Attestation verification 281 275
+3 -1
Cargo.toml
··· 27 27 atproto-client = { git = "https://tangled.org/@smokesignal.events/atproto-identity-rs" } 28 28 atproto-attestation = { git = "https://tangled.org/@smokesignal.events/atproto-identity-rs" } 29 29 atproto-identity = { git = "https://tangled.org/@smokesignal.events/atproto-identity-rs", features = ["lru", "zeroize", "hickory-dns"] } 30 - atproto-jetstream = { git = "https://tangled.org/@smokesignal.events/atproto-identity-rs" } 30 + atproto-tap = { git = "https://tangled.org/@smokesignal.events/atproto-identity-rs" } 31 31 atproto-oauth = { git = "https://tangled.org/@smokesignal.events/atproto-identity-rs", features = ["lru", "zeroize"] } 32 32 atproto-oauth-aip = { git = "https://tangled.org/@smokesignal.events/atproto-identity-rs" } 33 33 atproto-oauth-axum = { git = "https://tangled.org/@smokesignal.events/atproto-identity-rs", features = ["zeroize"] } ··· 83 83 sqlx = { version = "0.8", default-features = false, features = ["derive", "macros", "migrate", "json", "runtime-tokio", "postgres", "chrono", "tls-rustls-ring-native-roots"] } 84 84 thiserror = "2.0" 85 85 tokio = { version = "1.41", features = ["bytes", "macros", "net", "rt", "rt-multi-thread", "signal", "sync"] } 86 + tokio-stream = "0.1" 86 87 tokio-util = { version = "0.7", features = ["net", "rt", "tracing"] } 87 88 tower-http = { version = "0.6", features = ["cors", "fs", "timeout", "trace", "tracing"] } 88 89 tracing = { version = "0.1", features = ["async-await", "log", "valuable"] } ··· 92 93 url = "2.5" 93 94 urlencoding = "2.1" 94 95 base32 = "0.5.1" 96 + futures = { version = "0.3.31", default-features = false, features = ["alloc"] } 95 97 96 98 [profile.release] 97 99 opt-level = 3
+2
migrations/20251225133302_drop_identity_webhooks.sql
··· 1 + -- Drop identity_webhooks table (webhook feature removed) 2 + DROP TABLE IF EXISTS identity_webhooks;
+61 -246
src/bin/smokesignal.rs
··· 3 3 use atproto_identity::resolve::{ 4 4 HickoryDnsResolver, InnerIdentityResolver, SharedIdentityResolver, 5 5 }; 6 - use atproto_jetstream::{CancellationToken, Consumer as JetstreamConsumer, ConsumerTaskConfig}; 7 6 use atproto_oauth_axum::state::OAuthClientConfig; 8 - use smokesignal::consumer::Consumer; 9 7 use smokesignal::processor::ContentFetcher; 8 + use smokesignal::tap_processor::TapProcessor; 9 + use smokesignal::task_search_indexer::SearchIndexer; 10 + use tokio_util::sync::CancellationToken; 10 11 use smokesignal::service::{ServiceDID, ServiceKey, build_service_document}; 11 12 use smokesignal::storage::content::{CachedContentStorage, ContentStorage, FilesystemStorage}; 12 13 use smokesignal::{ ··· 19 20 storage::{ 20 21 atproto::{PostgresDidDocumentStorage, PostgresOAuthRequestStorage}, 21 22 cache::create_cache_pool, 22 - distributed_lock::DistributedLock, 23 23 }, 24 24 }; 25 25 ··· 33 33 use std::{collections::HashMap, env, str::FromStr, sync::Arc}; 34 34 use tokio::net::TcpListener; 35 35 use tokio::signal; 36 - use tokio::sync::mpsc; 37 36 use tokio_util::task::TaskTracker; 38 37 use tracing_subscriber::prelude::*; 39 38 use unic_langid::LanguageIdentifier; ··· 167 166 plc_hostname: config.plc_hostname.clone(), 168 167 }))); 169 168 170 - // Create webhook channel if webhooks are enabled 171 - let webhook_sender = if config.enable_webhooks && config.enable_task_webhooks { 172 - let (sender, receiver) = mpsc::channel(100); 173 - Some((sender, receiver)) 174 - } else { 175 - None 176 - }; 177 - 178 169 let content_storage: Arc<dyn ContentStorage> = if config.content_storage.starts_with("s3://") { 179 170 #[cfg(feature = "s3")] 180 171 { ··· 273 264 supported_languages, 274 265 locales, 275 266 content_storage.clone(), 276 - webhook_sender.as_ref().map(|(sender, _)| sender.clone()), 277 267 emailer, 278 268 service_did, 279 269 service_document, ··· 316 306 }); 317 307 } 318 308 319 - if config.enable_jetstream { 320 - // Try to acquire distributed lock for Jetstream consumer 321 - tracing::info!("Attempting to acquire Jetstream consumer lock..."); 322 - match DistributedLock::new(cache_pool.clone()).await { 323 - Ok(mut distributed_lock) => { 324 - // Try to acquire the lock with retry for up to 60 seconds 325 - // This is longer than the lock TTL (30s) to ensure any stale locks 326 - // from crashed instances will expire before we give up 327 - let acquired = match distributed_lock 328 - .acquire_with_retry(std::time::Duration::from_secs(60)) 329 - .await 330 - { 331 - Ok(acquired) => acquired, 332 - Err(e) => { 333 - tracing::error!("Failed to acquire distributed lock: {}", e); 334 - false 335 - } 336 - }; 309 + if config.enable_tap { 310 + // Create resolvers for signature verification 311 + let record_resolver = Arc::new( 312 + smokesignal::record_resolver::StorageBackedRecordResolver::new( 313 + http_client.clone(), 314 + identity_resolver.clone(), 315 + pool.clone(), 316 + ), 317 + ); 318 + let key_resolver = IdentityDocumentKeyResolver::new(identity_resolver.clone()); 337 319 338 - if !acquired { 339 - tracing::warn!( 340 - "Could not acquire Jetstream consumer lock after 60 seconds - another instance may be running" 341 - ); 342 - tracing::info!("This instance will not consume Jetstream events"); 343 - tracing::info!("If no other instance is running, the lock will expire in up to 30 seconds"); 344 - } else { 345 - tracing::debug!( 346 - "Successfully acquired Jetstream consumer lock - starting event consumption" 347 - ); 348 - 349 - // Start lock renewal task 350 - let lock_renewal_token = token.clone(); 351 - let lock_renewal_handle = 352 - distributed_lock.start_renewal_task(lock_renewal_token); 353 - 354 - // Track the renewal task 355 - tracker.spawn(lock_renewal_handle); 356 - 357 - // Now proceed with normal Jetstream consumer setup 358 - let consumer = Consumer {}; 359 - let (smoke_signal_event_handler, event_receiver) = 360 - consumer.create_handler("content_fetcher"); 361 - 362 - // Create a second handler for the search indexer if enabled 363 - let (search_indexer_handler, search_indexer_receiver) = 364 - if config.enable_opensearch && config.enable_task_opensearch { 365 - let (handler, receiver) = consumer.create_handler("search_indexer"); 366 - (Some(handler), Some(receiver)) 367 - } else { 368 - (None, None) 369 - }; 320 + // Create content fetcher 321 + let content_fetcher = ContentFetcher::new( 322 + pool.clone(), 323 + content_storage.clone(), 324 + identity_resolver.clone(), 325 + document_storage.clone(), 326 + http_client.clone(), 327 + record_resolver, 328 + key_resolver, 329 + ); 370 330 371 - // Create resolvers for signature verification 372 - let record_resolver = Arc::new( 373 - smokesignal::record_resolver::StorageBackedRecordResolver::new( 374 - http_client.clone(), 375 - identity_resolver.clone(), 376 - pool.clone(), 377 - ), 378 - ); 379 - let key_resolver = IdentityDocumentKeyResolver::new(identity_resolver.clone()); 331 + // Create search indexer if enabled 332 + let search_indexer = if config.enable_opensearch && config.enable_task_opensearch { 333 + let opensearch_endpoint = config.opensearch_endpoint.as_ref().expect( 334 + "OPENSEARCH_ENDPOINT is required when search indexing is enabled", 335 + ); 380 336 381 - let content_fetcher = ContentFetcher::new( 382 - pool.clone(), 383 - content_storage.clone(), 384 - identity_resolver.clone(), 385 - document_storage.clone(), 386 - http_client.clone(), 387 - record_resolver, 388 - key_resolver, 389 - ); 337 + match SearchIndexer::new( 338 + opensearch_endpoint, 339 + identity_resolver.clone(), 340 + document_storage.clone(), 341 + ) 342 + .await 343 + { 344 + Ok(indexer) => Some(indexer), 345 + Err(err) => { 346 + tracing::error!("Failed to create search indexer: {}", err); 347 + None 348 + } 349 + } 350 + } else { 351 + None 352 + }; 390 353 391 - let inner_token = token.clone(); 392 - tracker.spawn(async move { 393 - tokio::select! { 394 - result = content_fetcher.start_processing(event_receiver) => { 395 - if let Err(err) = result { 396 - tracing::error!(error = ?err, "failed processing stuff"); 397 - } 398 - } 399 - () = inner_token.cancelled() => { 400 - tracing::info!("content processing stopped"); 401 - } 402 - } 403 - }); 354 + // Create and spawn the unified TAP processor 355 + // No distributed lock needed - TAP manages the outbox with acks and retries, 356 + // so multiple instances can safely consume in parallel 357 + let tap_processor = TapProcessor::new( 358 + &config.tap_hostname, 359 + config.tap_password.clone(), 360 + &config.user_agent, 361 + content_fetcher, 362 + search_indexer, 363 + token.clone(), 364 + ); 404 365 405 - let inner_token = token.clone(); 406 - let inner_config = config.clone(); 407 - tracker.spawn(async move { 408 - let mut disconnect_times = Vec::new(); 409 - let disconnect_window = std::time::Duration::from_secs(60); // 1 minute window 410 - let max_disconnects_per_minute = 1; 411 - let reconnect_delay = std::time::Duration::from_secs(5); 412 - 413 - loop { 414 - // Create new consumer for each connection attempt 415 - let jetstream_config = ConsumerTaskConfig { 416 - user_agent: inner_config.user_agent.clone(), 417 - compression: false, 418 - zstd_dictionary_location: String::new(), 419 - jetstream_hostname: inner_config.jetstream_hostname.clone(), 420 - collections: vec![ 421 - "community.lexicon.calendar.rsvp".to_string(), 422 - "community.lexicon.calendar.event".to_string(), 423 - "events.smokesignal.profile".to_string(), 424 - "events.smokesignal.calendar.acceptance".to_string() 425 - ], 426 - dids: vec![], 427 - max_message_size_bytes: Some(20 * 1024 * 1024), // 10MB 428 - cursor: None, 429 - require_hello: true, 430 - }; 431 - 432 - let jetstream_consumer = JetstreamConsumer::new(jetstream_config); 433 - 434 - if let Err(err) = jetstream_consumer.register_handler(smoke_signal_event_handler.clone()).await { 435 - tracing::error!("Failed to register handler: {}", err); 436 - inner_token.cancel(); 437 - break; 438 - } 439 - 440 - // Register search indexer handler if enabled 441 - if let Some(search_handler) = &search_indexer_handler 442 - && let Err(err) = jetstream_consumer.register_handler(search_handler.clone()).await { 443 - tracing::error!("Failed to register search indexer handler: {}", err); 444 - inner_token.cancel(); 445 - break; 446 - } 447 - 448 - tokio::select! { 449 - result = jetstream_consumer.run_background(inner_token.clone()) => { 450 - if let Err(err) = result { 451 - let now = std::time::Instant::now(); 452 - disconnect_times.push(now); 453 - 454 - // Remove disconnect times older than the window 455 - disconnect_times.retain(|&t| now.duration_since(t) <= disconnect_window); 456 - 457 - if disconnect_times.len() > max_disconnects_per_minute { 458 - tracing::error!( 459 - "Jetstream disconnect rate exceeded: {} disconnects in 1 minute, exiting", 460 - disconnect_times.len() 461 - ); 462 - inner_token.cancel(); 463 - break; 464 - } 465 - 466 - tracing::error!("Jetstream disconnected: {}, reconnecting in {:?}", err, reconnect_delay); 467 - 468 - // Wait before reconnecting 469 - tokio::select! { 470 - () = tokio::time::sleep(reconnect_delay) => {}, 471 - () = inner_token.cancelled() => { 472 - tracing::info!("Jetstream consumer cancelled during reconnect delay"); 473 - break; 474 - } 475 - } 476 - 477 - // Continue the loop to reconnect 478 - continue; 479 - } 480 - } 481 - () = inner_token.cancelled() => { 482 - tracing::info!("Jetstream consumer cancelled"); 483 - break; 484 - } 485 - } 486 - 487 - // If we reach here, the consumer exited without error (unlikely) 488 - tracing::info!("Jetstream consumer exited normally"); 489 - break; 490 - 491 - } 492 - }); 493 - 494 - // Spawn search indexer task if enabled 495 - if let Some(receiver) = search_indexer_receiver { 496 - use smokesignal::task_search_indexer::SearchIndexer; 497 - 498 - let opensearch_endpoint = config.opensearch_endpoint.as_ref().expect( 499 - "OPENSEARCH_ENDPOINT is required when search indexing is enabled", 500 - ); 501 - 502 - match SearchIndexer::new( 503 - opensearch_endpoint, 504 - identity_resolver.clone(), 505 - document_storage.clone(), 506 - receiver, 507 - token.clone(), 508 - ) 509 - .await 510 - { 511 - Ok(search_indexer) => { 512 - let inner_token = token.clone(); 513 - tracker.spawn(async move { 514 - if let Err(err) = search_indexer.run().await { 515 - tracing::error!("Search indexer task failed: {}", err); 516 - } 517 - inner_token.cancel(); 518 - }); 519 - } 520 - Err(err) => { 521 - tracing::error!("Failed to create search indexer: {}", err); 522 - token.cancel(); 523 - } 524 - } 525 - } 526 - } 366 + let inner_token = token.clone(); 367 + tracker.spawn(async move { 368 + if let Err(err) = tap_processor.run().await { 369 + tracing::error!(error = ?err, "TAP processor failed"); 527 370 } 528 - Err(e) => { 529 - tracing::error!("Failed to create distributed lock: {}", e); 530 - tracing::warn!("Jetstream consumer disabled - could not create lock"); 531 - // Continue without Jetstream if we can't create the lock 532 - // This allows the application to still function without event consumption 533 - } 534 - } 371 + inner_token.cancel(); 372 + }); 535 373 } 536 374 537 375 // Spawn OAuth requests cleanup task if enabled ··· 546 384 tracker.spawn(async move { 547 385 if let Err(err) = cleanup_task.run().await { 548 386 tracing::error!("OAuth requests cleanup task failed: {}", err); 549 - } 550 - inner_token.cancel(); 551 - }); 552 - } 553 - 554 - // Spawn webhook processor task if enabled 555 - if let Some((_, receiver)) = webhook_sender { 556 - use axum::extract::FromRef; 557 - use smokesignal::task_webhooks::WebhookProcessor; 558 - 559 - let webhook_processor = WebhookProcessor::new( 560 - pool.clone(), 561 - document_storage.clone(), 562 - ServiceDID::from_ref(&web_context), 563 - ServiceKey::from_ref(&web_context), 564 - receiver, 565 - token.clone(), 566 - ); 567 - 568 - let inner_token = token.clone(); 569 - tracker.spawn(async move { 570 - if let Err(err) = webhook_processor.run().await { 571 - tracing::error!("Webhook processor task failed: {}", err); 572 387 } 573 388 inner_token.cancel(); 574 389 });
+14 -17
src/config.rs
··· 53 53 pub user_agent: String, 54 54 pub database_url: String, 55 55 pub plc_hostname: String, 56 - pub jetstream_hostname: String, 56 + pub tap_hostname: String, 57 + pub tap_password: Option<String>, 57 58 pub redis_url: String, 58 59 pub admin_dids: AdminDIDs, 59 60 pub dns_nameservers: DnsNameservers, 60 61 pub oauth_backend: OAuthBackendConfig, 61 62 pub enable_task_oauth_requests_cleanup: bool, 62 63 pub enable_task_identity_refresh: bool, 63 - pub enable_jetstream: bool, 64 + pub enable_tap: bool, 64 65 pub content_storage: String, 65 - pub enable_webhooks: bool, 66 - pub enable_task_webhooks: bool, 67 66 pub service_key: ServiceKey, 68 67 pub enable_opensearch: bool, 69 68 pub enable_task_opensearch: bool, ··· 97 96 98 97 let plc_hostname = default_env("PLC_HOSTNAME", "plc.directory"); 99 98 100 - let jetstream_hostname = 101 - default_env("JETSTREAM_HOSTNAME", "jetstream2.us-east.bsky.network"); 99 + let tap_hostname = default_env("TAP_HOSTNAME", "localhost:2480"); 100 + let tap_password_str = optional_env("TAP_PASSWORD"); 101 + let tap_password = if tap_password_str.is_empty() { 102 + None 103 + } else { 104 + Some(tap_password_str) 105 + }; 102 106 103 107 let database_url = default_env("DATABASE_URL", "sqlite://development.db"); 104 108 ··· 139 143 .parse::<bool>() 140 144 .unwrap_or(true); 141 145 142 - let enable_jetstream = default_env("ENABLE_JETSTREAM", "true") 143 - .parse::<bool>() 144 - .unwrap_or(true); 146 + let enable_tap = parse_bool_env("ENABLE_TAP", true); 145 147 146 148 let content_storage = require_env("CONTENT_STORAGE")?; 147 - 148 - // Parse webhook enablement flags 149 - let enable_webhooks = parse_bool_env("ENABLE_WEBHOOKS", false); 150 - let enable_task_webhooks = parse_bool_env("ENABLE_TASK_WEBHOOKS", false); 151 149 152 150 let service_key: ServiceKey = require_env("SERVICE_KEY")?.try_into()?; 153 151 ··· 199 197 certificate_bundles, 200 198 user_agent, 201 199 plc_hostname, 202 - jetstream_hostname, 200 + tap_hostname, 201 + tap_password, 203 202 database_url, 204 203 http_cookie_key, 205 204 redis_url, ··· 208 207 oauth_backend, 209 208 enable_task_oauth_requests_cleanup, 210 209 enable_task_identity_refresh, 211 - enable_jetstream, 210 + enable_tap, 212 211 content_storage, 213 - enable_webhooks, 214 - enable_task_webhooks, 215 212 service_key, 216 213 enable_opensearch, 217 214 enable_task_opensearch,
-107
src/consumer.rs
··· 1 - use anyhow::Result; 2 - use async_trait::async_trait; 3 - use atproto_jetstream::{EventHandler, JetstreamEvent}; 4 - use std::sync::Arc; 5 - use tokio::sync::mpsc; 6 - 7 - use crate::atproto::lexicon::{acceptance::NSID as ACCEPTANCE_NSID, profile::NSID as PROFILE_NSID}; 8 - use atproto_record::lexicon::community::lexicon::{ 9 - calendar::event::NSID as EVENT_NSID, calendar::rsvp::NSID as RSVP_NSID, 10 - }; 11 - 12 - pub type SmokeSignalEventReceiver = mpsc::UnboundedReceiver<SmokeSignalEvent>; 13 - 14 - #[derive(Debug, Clone)] 15 - pub enum SmokeSignalEvent { 16 - Commit { 17 - did: String, 18 - collection: String, 19 - rkey: String, 20 - cid: String, 21 - record: serde_json::Value, 22 - }, 23 - Delete { 24 - did: String, 25 - collection: String, 26 - rkey: String, 27 - }, 28 - } 29 - 30 - pub struct SmokeSignalEventHandler { 31 - event_sender: mpsc::UnboundedSender<SmokeSignalEvent>, 32 - handler_id: String, 33 - } 34 - 35 - impl SmokeSignalEventHandler { 36 - fn new(event_sender: mpsc::UnboundedSender<SmokeSignalEvent>, handler_id: String) -> Self { 37 - Self { 38 - event_sender, 39 - handler_id, 40 - } 41 - } 42 - } 43 - 44 - #[async_trait] 45 - impl EventHandler for SmokeSignalEventHandler { 46 - async fn handle_event(&self, event: JetstreamEvent) -> Result<()> { 47 - let incoming_event = match event { 48 - JetstreamEvent::Commit { did, commit, .. } => { 49 - if commit.collection != RSVP_NSID 50 - && commit.collection != EVENT_NSID 51 - && commit.collection != PROFILE_NSID 52 - && commit.collection != ACCEPTANCE_NSID 53 - { 54 - return Ok(()); 55 - } 56 - 57 - SmokeSignalEvent::Commit { 58 - did, 59 - collection: commit.collection, 60 - rkey: commit.rkey, 61 - cid: commit.cid, 62 - record: commit.record, 63 - } 64 - } 65 - JetstreamEvent::Delete { did, commit, .. } => { 66 - if commit.collection != RSVP_NSID 67 - && commit.collection != EVENT_NSID 68 - && commit.collection != PROFILE_NSID 69 - && commit.collection != ACCEPTANCE_NSID 70 - { 71 - return Ok(()); 72 - } 73 - SmokeSignalEvent::Delete { 74 - did, 75 - collection: commit.collection, 76 - rkey: commit.rkey, 77 - } 78 - } 79 - JetstreamEvent::Identity { .. } | JetstreamEvent::Account { .. } => { 80 - return Ok(()); 81 - } 82 - }; 83 - 84 - if let Err(err) = self.event_sender.send(incoming_event) { 85 - tracing::error!(error = ?err, "error sending smokesignal event to processing"); 86 - } 87 - 88 - Ok(()) 89 - } 90 - 91 - fn handler_id(&self) -> String { 92 - self.handler_id.clone() 93 - } 94 - } 95 - 96 - pub struct Consumer {} 97 - 98 - impl Consumer { 99 - pub fn create_handler( 100 - &self, 101 - handler_id: &str, 102 - ) -> (Arc<SmokeSignalEventHandler>, SmokeSignalEventReceiver) { 103 - let (sender, receiver) = mpsc::unbounded_channel(); 104 - let handler = Arc::new(SmokeSignalEventHandler::new(sender, handler_id.to_string())); 105 - (handler, receiver) 106 - } 107 - }
-5
src/http/context.rs
··· 14 14 use minijinja::context as template_context; 15 15 use std::convert::Infallible; 16 16 use std::{ops::Deref, sync::Arc}; 17 - use tokio::sync::mpsc; 18 17 use unic_langid::LanguageIdentifier; 19 18 20 19 #[cfg(all(feature = "reload", not(feature = "embed")))] ··· 34 33 use crate::mcp::session::McpSessionManager; 35 34 use crate::service::{ServiceDID, ServiceDocument, ServiceKey}; 36 35 use crate::storage::content::ContentStorage; 37 - use crate::task_webhooks::TaskWork; 38 36 use crate::{ 39 37 config::Config, 40 38 http::middleware_auth::Auth, ··· 62 60 pub(crate) oauth_storage: Arc<dyn OAuthRequestStorage>, 63 61 pub(crate) document_storage: Arc<dyn DidDocumentStorage>, 64 62 pub(crate) content_storage: Arc<dyn ContentStorage>, 65 - pub(crate) webhook_sender: Option<mpsc::Sender<TaskWork>>, 66 63 pub(crate) emailer: Option<Arc<dyn Emailer>>, 67 64 pub(crate) service_did: ServiceDID, 68 65 pub(crate) service_document: ServiceDocument, ··· 98 95 supported_languages: Vec<LanguageIdentifier>, 99 96 locales: Locales, 100 97 content_storage: Arc<dyn ContentStorage>, 101 - webhook_sender: Option<mpsc::Sender<TaskWork>>, 102 98 emailer: Option<Arc<dyn Emailer>>, 103 99 service_did: ServiceDID, 104 100 service_document: ServiceDocument, ··· 122 118 oauth_storage, 123 119 document_storage, 124 120 content_storage, 125 - webhook_sender, 126 121 emailer, 127 122 service_did, 128 123 service_document,
-58
src/http/handle_create_event.rs
··· 33 33 use crate::http::utils::url_from_aturi; 34 34 use crate::select_template; 35 35 use crate::storage::event::{EventInsertParams, event_insert_with_metadata}; 36 - use crate::storage::webhook::webhook_list_enabled_by_did; 37 - use crate::task_webhooks::TaskWork; 38 36 use atproto_client::com::atproto::repo::{ 39 37 CreateRecordRequest, CreateRecordResponse, create_record, 40 38 }; ··· 468 466 } 469 467 } 470 468 471 - // Send webhooks if enabled 472 - if web_context.config.enable_webhooks 473 - && let Some(webhook_sender) = &web_context.webhook_sender 474 - { 475 - // Get all enabled webhooks for the user 476 - if let Ok(webhooks) = 477 - webhook_list_enabled_by_did(&web_context.pool, &current_handle.did).await 478 - { 479 - // Prepare context with email if shared 480 - let context = json!({}); 481 - 482 - let record_json = json!({ 483 - "uri": &create_record_response.uri, 484 - "cit": &create_record_response.cid, 485 - }); 486 - 487 - // Send webhook for each enabled webhook 488 - for webhook in webhooks { 489 - let _ = webhook_sender 490 - .send(TaskWork::EventCreated { 491 - identity: current_handle.did.clone(), 492 - service: webhook.service, 493 - record: record_json.clone(), 494 - context: context.clone(), 495 - }) 496 - .await; 497 - } 498 - } 499 - } 500 - 501 469 let event_url = url_from_aturi( 502 470 &web_context.config.external_base, 503 471 &create_record_response.uri, ··· 828 796 ); 829 797 // Don't fail the request - the event was created successfully 830 798 // The header can be fetched later via jetstream or admin import 831 - } 832 - } 833 - 834 - // Send webhooks if enabled 835 - if web_context.config.enable_webhooks 836 - && let Some(webhook_sender) = &web_context.webhook_sender 837 - { 838 - if let Ok(webhooks) = 839 - webhook_list_enabled_by_did(&web_context.pool, &current_handle.did).await 840 - { 841 - let context = json!({}); 842 - let record_json = json!({ 843 - "uri": &create_record_response.uri, 844 - "cid": &create_record_response.cid, 845 - }); 846 - 847 - for webhook in webhooks { 848 - let _ = webhook_sender 849 - .send(TaskWork::EventCreated { 850 - identity: current_handle.did.clone(), 851 - service: webhook.service, 852 - record: record_json.clone(), 853 - context: context.clone(), 854 - }) 855 - .await; 856 - } 857 799 } 858 800 } 859 801
-38
src/http/handle_create_rsvp.rs
··· 28 28 }, 29 29 select_template, 30 30 storage::event::{RsvpInsertParams, rsvp_get_by_event_and_did, rsvp_insert_with_metadata}, 31 - storage::webhook::webhook_list_enabled_by_did, 32 - task_webhooks::TaskWork, 33 31 }; 34 32 use atproto_client::com::atproto::repo::{PutRecordRequest, PutRecordResponse, put_record}; 35 33 use atproto_record::lexicon::community::lexicon::calendar::rsvp::{NSID, Rsvp, RsvpStatus}; ··· 447 445 } 448 446 } 449 447 } 450 - } 451 - } 452 - } 453 - 454 - // Send webhooks if enabled 455 - if web_context.config.enable_webhooks 456 - && let Some(webhook_sender) = &web_context.webhook_sender 457 - { 458 - let webhook_identity = 459 - ATURI::from_str(build_rsvp_form.subject_aturi.as_ref().unwrap()) 460 - .map(|value| value.authority) 461 - .unwrap_or_default(); 462 - 463 - // Get all enabled webhooks for the user 464 - if let Ok(webhooks) = 465 - webhook_list_enabled_by_did(&web_context.pool, &webhook_identity).await 466 - { 467 - // Prepare context (empty - email sharing removed) 468 - let context = serde_json::json!({}); 469 - 470 - // Convert the RSVP record to JSON 471 - let record_json = serde_json::json!({ 472 - "uri": &create_record_result.uri, 473 - "cid": &create_record_result.cid, 474 - }); 475 - 476 - // Send webhook for each enabled webhook 477 - for webhook in webhooks { 478 - let _ = webhook_sender 479 - .send(TaskWork::RSVPCreated { 480 - identity: current_handle.did.clone(), 481 - service: webhook.service, 482 - record: record_json.clone(), 483 - context: context.clone(), 484 - }) 485 - .await; 486 448 } 487 449 } 488 450 }
-290
src/http/handle_settings.rs
··· 32 32 notification_get, notification_reset_confirmation, notification_set_preference, 33 33 }, 34 34 profile::{profile_get_by_did, profile_insert}, 35 - webhook::{webhook_delete, webhook_list_by_did, webhook_toggle_enabled, webhook_upsert}, 36 35 }, 37 - task_webhooks::TaskWork, 38 - webhooks::SMOKE_SIGNAL_AUTOMATION_SERVICE, 39 36 }; 40 37 41 38 #[derive(Deserialize, Clone, Debug)] ··· 51 48 #[derive(Deserialize, Clone, Debug)] 52 49 pub(crate) struct EmailForm { 53 50 email: Option<String>, 54 - } 55 - 56 - #[derive(Deserialize, Clone, Debug)] 57 - pub(crate) struct WebhookForm { 58 - service: String, 59 51 } 60 52 61 53 #[derive(Deserialize, Clone, Debug)] ··· 106 98 .map(|lang| lang.to_string()) 107 99 .collect::<Vec<String>>(); 108 100 109 - // Get webhooks if enabled 110 - let webhooks = if web_context.config.enable_webhooks { 111 - webhook_list_by_did(&web_context.pool, &current_handle.did).await? 112 - } else { 113 - vec![] 114 - }; 115 - 116 101 // Get profile data if it exists 117 102 let profile_record = profile_get_by_did(&web_context.pool, &current_handle.did).await?; 118 103 let (profile, profile_display_name, profile_description, profile_host) = if let Some(prof_rec) = ··· 171 156 template_context! { 172 157 timezones => timezones, 173 158 languages => supported_languages, 174 - webhooks => webhooks, 175 - webhooks_enabled => web_context.config.enable_webhooks, 176 159 profile, 177 160 profile_display_name, 178 161 profile_description, ··· 399 382 current_handle, 400 383 email_updated => true, 401 384 ..default_context 402 - }, 403 - ), 404 - ) 405 - .into_response()) 406 - } 407 - 408 - pub(crate) async fn handle_add_webhook( 409 - State(web_context): State<WebContext>, 410 - identity_resolver: State<Arc<dyn IdentityResolver>>, 411 - Language(language): Language, 412 - Cached(auth): Cached<Auth>, 413 - Form(webhook_form): Form<WebhookForm>, 414 - ) -> Result<impl IntoResponse, WebError> { 415 - // Check if webhooks are enabled 416 - if !web_context.config.enable_webhooks { 417 - return Ok((StatusCode::NOT_FOUND, "Not found").into_response()); 418 - } 419 - 420 - let current_handle = auth.require_flat()?; 421 - 422 - let default_context = template_context! { 423 - current_handle => current_handle.clone(), 424 - language => language.to_string(), 425 - }; 426 - 427 - let error_template = select_template!(false, true, language); 428 - 429 - // Validate service is not empty 430 - if webhook_form.service.is_empty() { 431 - return contextual_error!( 432 - web_context, 433 - language, 434 - error_template, 435 - default_context, 436 - "error-smokesignal-settings-3 Service cannot be empty" 437 - ); 438 - } 439 - 440 - // Check if service contains required the suffix 441 - if !webhook_form 442 - .service 443 - .ends_with(SMOKE_SIGNAL_AUTOMATION_SERVICE) 444 - { 445 - return contextual_error!( 446 - web_context, 447 - language, 448 - error_template, 449 - default_context, 450 - "error-smokesignal-settings-4 Only SmokeSignalAutomation services are supported" 451 - ); 452 - } 453 - 454 - // Extract DID by removing the suffix 455 - let service_did = webhook_form 456 - .service 457 - .strip_suffix(SMOKE_SIGNAL_AUTOMATION_SERVICE) 458 - .unwrap(); 459 - 460 - // Resolve the service DID using the identity resolver 461 - let document = match identity_resolver.resolve(service_did).await { 462 - Ok(doc) => doc, 463 - Err(err) => { 464 - tracing::error!(?err, "Failed to resolve service DID: {}", service_did); 465 - return contextual_error!( 466 - web_context, 467 - language, 468 - error_template, 469 - default_context, 470 - format!("error-smokesignal-webhook-1 DID resolution failed: {}", err) 471 - ); 472 - } 473 - }; 474 - 475 - // Store the resolved document 476 - if let Err(err) = web_context 477 - .document_storage 478 - .store_document(document.clone()) 479 - .await 480 - { 481 - tracing::error!(?err, "Failed to store DID document for: {}", service_did); 482 - return contextual_error!( 483 - web_context, 484 - language, 485 - error_template, 486 - default_context, 487 - format!( 488 - "error-smokesignal-webhook-2 Document storage failed: {}", 489 - err 490 - ) 491 - ); 492 - } 493 - 494 - if let Err(err) = webhook_upsert( 495 - &web_context.pool, 496 - &current_handle.did, 497 - &webhook_form.service, 498 - ) 499 - .await 500 - { 501 - tracing::error!(?err, "error inserting webhook?"); 502 - return contextual_error!(web_context, language, error_template, default_context, err); 503 - } 504 - tracing::info!("webhook added?"); 505 - 506 - // Trigger HTMX refresh 507 - match handle_list_webhooks(State(web_context), Language(language), Cached(auth)).await { 508 - Ok(response) => Ok(response.into_response()), 509 - Err(err) => Err(err), 510 - } 511 - } 512 - 513 - pub(crate) async fn handle_toggle_webhook( 514 - State(web_context): State<WebContext>, 515 - Language(language): Language, 516 - Cached(auth): Cached<Auth>, 517 - Form(webhook_form): Form<WebhookForm>, 518 - ) -> Result<impl IntoResponse, WebError> { 519 - // Check if webhooks are enabled 520 - if !web_context.config.enable_webhooks { 521 - return Ok((StatusCode::NOT_FOUND, "Not found").into_response()); 522 - } 523 - 524 - let current_handle = auth.require_flat()?; 525 - 526 - let default_context = template_context! { 527 - current_handle => current_handle.clone(), 528 - language => language.to_string(), 529 - }; 530 - 531 - let error_template = select_template!(false, true, language); 532 - 533 - // Toggle webhook in database7 534 - if let Err(err) = webhook_toggle_enabled( 535 - &web_context.pool, 536 - &current_handle.did, 537 - &webhook_form.service, 538 - ) 539 - .await 540 - { 541 - return contextual_error!(web_context, language, error_template, default_context, err); 542 - } 543 - 544 - // Trigger HTMX refresh 545 - match handle_list_webhooks(State(web_context), Language(language), Cached(auth)).await { 546 - Ok(response) => Ok(response.into_response()), 547 - Err(err) => Err(err), 548 - } 549 - } 550 - 551 - pub(crate) async fn handle_test_webhook( 552 - State(web_context): State<WebContext>, 553 - Language(language): Language, 554 - Cached(auth): Cached<Auth>, 555 - Form(webhook_form): Form<WebhookForm>, 556 - ) -> Result<impl IntoResponse, WebError> { 557 - // Check if webhooks are enabled 558 - if !web_context.config.enable_webhooks { 559 - return Ok((StatusCode::NOT_FOUND, "Not found").into_response()); 560 - } 561 - 562 - let current_handle = auth.require_flat()?; 563 - 564 - let default_context = template_context! { 565 - current_handle => current_handle.clone(), 566 - language => language.to_string(), 567 - }; 568 - 569 - let error_template = select_template!(false, true, language); 570 - 571 - // Send test webhook 572 - if let Some(webhook_sender) = &web_context.webhook_sender { 573 - if let Err(err) = webhook_sender 574 - .send(TaskWork::Test { 575 - identity: current_handle.did.clone(), 576 - service: webhook_form.service.clone(), 577 - }) 578 - .await 579 - { 580 - return contextual_error!( 581 - web_context, 582 - language, 583 - error_template, 584 - default_context, 585 - format!("Failed to send webhook: {}", err) 586 - ); 587 - } 588 - } else { 589 - return contextual_error!( 590 - web_context, 591 - language, 592 - error_template, 593 - default_context, 594 - "Webhook processing is not enabled" 595 - ); 596 - } 597 - 598 - // Trigger HTMX refresh 599 - match handle_list_webhooks(State(web_context), Language(language), Cached(auth)).await { 600 - Ok(response) => Ok(response.into_response()), 601 - Err(err) => Err(err), 602 - } 603 - } 604 - 605 - pub(crate) async fn handle_remove_webhook( 606 - State(web_context): State<WebContext>, 607 - Language(language): Language, 608 - Cached(auth): Cached<Auth>, 609 - Form(webhook_form): Form<WebhookForm>, 610 - ) -> Result<impl IntoResponse, WebError> { 611 - // Check if webhooks are enabled 612 - if !web_context.config.enable_webhooks { 613 - return Ok((StatusCode::NOT_FOUND, "Not found").into_response()); 614 - } 615 - 616 - let current_handle = auth.require_flat()?; 617 - 618 - let default_context = template_context! { 619 - current_handle => current_handle.clone(), 620 - language => language.to_string(), 621 - }; 622 - 623 - let error_template = select_template!(false, true, language); 624 - 625 - // Remove webhook from database 626 - if let Err(err) = webhook_delete( 627 - &web_context.pool, 628 - &current_handle.did, 629 - &webhook_form.service, 630 - ) 631 - .await 632 - { 633 - return contextual_error!(web_context, language, error_template, default_context, err); 634 - } 635 - 636 - // Trigger HTMX refresh 637 - match handle_list_webhooks(State(web_context), Language(language), Cached(auth)).await { 638 - Ok(response) => Ok(response.into_response()), 639 - Err(err) => Err(err), 640 - } 641 - } 642 - 643 - pub(crate) async fn handle_list_webhooks( 644 - State(web_context): State<WebContext>, 645 - Language(language): Language, 646 - Cached(auth): Cached<Auth>, 647 - ) -> Result<impl IntoResponse, WebError> { 648 - // Check if webhooks are enabled 649 - if !web_context.config.enable_webhooks { 650 - return Ok((StatusCode::NOT_FOUND, "Not found").into_response()); 651 - } 652 - 653 - let current_handle = auth.require("/settings")?; 654 - 655 - let render_template = format!( 656 - "{}/settings.webhooks.html", 657 - language.to_string().to_lowercase() 658 - ); 659 - 660 - // Get webhooks 661 - let webhooks = webhook_list_by_did(&web_context.pool, &current_handle.did) 662 - .await 663 - .unwrap_or_default(); 664 - 665 - Ok(( 666 - StatusCode::OK, 667 - RenderHtml( 668 - &render_template, 669 - web_context.engine.clone(), 670 - template_context! { 671 - current_handle => current_handle.clone(), 672 - language => language.to_string(), 673 - webhooks => webhooks, 674 - webhooks_enabled => web_context.config.enable_webhooks, 675 385 }, 676 386 ), 677 387 )
+3 -8
src/http/server.rs
··· 82 82 handle_search::handle_search, 83 83 handle_set_language::handle_set_language, 84 84 handle_settings::{ 85 - handle_add_webhook, handle_email_update, handle_language_update, handle_list_webhooks, 85 + handle_email_update, handle_language_update, 86 86 handle_notification_email_update, handle_notification_preferences_update, 87 - handle_profile_update, handle_remove_webhook, handle_settings, handle_test_webhook, 88 - handle_timezone_update, handle_toggle_webhook, 87 + handle_profile_update, handle_settings, 88 + handle_timezone_update, 89 89 }, 90 90 handle_unaccept_rsvp::handle_unaccept_rsvp, 91 91 handle_unsubscribe::handle_unsubscribe, ··· 266 266 post(upload_profile_banner).layer(DefaultBodyLimit::max(5 * 1024 * 1024)), // 5MB limit 267 267 ) 268 268 .route("/settings/banner/delete", post(delete_profile_banner)) 269 - .route("/settings/webhooks", get(handle_list_webhooks)) 270 - .route("/settings/webhooks/add", post(handle_add_webhook)) 271 - .route("/settings/webhooks/toggle", post(handle_toggle_webhook)) 272 - .route("/settings/webhooks/test", post(handle_test_webhook)) 273 - .route("/settings/webhooks/remove", post(handle_remove_webhook)) 274 269 .route( 275 270 "/settings/notifications/email", 276 271 post(handle_notification_email_update),
+1 -4
src/lib.rs
··· 2 2 pub mod atproto; 3 3 pub mod config; 4 4 pub mod config_errors; 5 - pub mod consumer; 6 5 pub mod email_confirmation; 7 6 pub mod email_errors; 8 7 pub mod email_templates; ··· 27 26 pub mod service; 28 27 pub mod stats; 29 28 pub mod storage; 29 + pub mod tap_processor; 30 30 pub mod task_identity_refresh; 31 31 pub mod task_oauth_requests_cleanup; 32 32 pub mod task_search_indexer; 33 33 pub mod task_search_indexer_errors; 34 - pub mod task_webhooks; 35 - pub mod task_webhooks_errors; 36 34 pub mod throttle; 37 35 pub mod throttle_redis; 38 36 pub mod unsubscribe_token; 39 37 pub mod unsubscribe_token_errors; 40 - pub mod webhooks;
+107 -89
src/processor.rs
··· 5 5 use atproto_identity::model::Document; 6 6 use atproto_identity::resolve::IdentityResolver; 7 7 use atproto_identity::traits::DidDocumentStorage; 8 + use futures::future::join_all; 8 9 use image::GenericImageView; 9 10 use image::ImageFormat; 10 11 use serde_json::Value; ··· 13 14 use crate::atproto::lexicon::acceptance::NSID as AcceptanceNSID; 14 15 use crate::atproto::lexicon::acceptance::TypedAcceptance; 15 16 use crate::atproto::lexicon::profile::{NSID as ProfileNSID, Profile}; 16 - use crate::consumer::SmokeSignalEvent; 17 - use crate::consumer::SmokeSignalEventReceiver; 18 17 use crate::processor_errors::ProcessorError; 19 18 use crate::storage::StoragePool; 20 19 use crate::storage::acceptance::{ ··· 82 81 Ok(document) 83 82 } 84 83 85 - pub async fn start_processing( 84 + /// Handle a commit event (create or update). 85 + /// 86 + /// The `live` flag indicates whether this is a live event (true) or backfill (false). 87 + /// Takes ownership of `record` to avoid cloning during deserialization. 88 + pub async fn handle_commit( 86 89 &self, 87 - mut event_receiver: SmokeSignalEventReceiver, 90 + did: &str, 91 + collection: &str, 92 + rkey: &str, 93 + cid: &str, 94 + record: Value, 95 + live: bool, 88 96 ) -> Result<()> { 89 - tracing::info!("content fetcher started"); 90 - 91 - while let Some(event) = event_receiver.recv().await { 92 - match &event { 93 - SmokeSignalEvent::Commit { 94 - did, 95 - collection, 96 - cid, 97 - record, 98 - rkey, 99 - .. 100 - } => { 101 - let result = match collection.as_str() { 102 - "community.lexicon.calendar.event" => { 103 - self.handle_event_commit(did, rkey, cid, record).await 104 - } 105 - "community.lexicon.calendar.rsvp" => { 106 - self.handle_rsvp_commit(did, rkey, cid, record).await 107 - } 108 - "events.smokesignal.profile" => { 109 - self.handle_profile_commit(did, rkey, cid, record).await 110 - } 111 - "events.smokesignal.calendar.acceptance" => { 112 - self.handle_acceptance_commit(did, rkey, cid, record).await 113 - } 114 - _ => Ok(()), 115 - }; 116 - if let Err(e) = result { 117 - tracing::error!(error = ?e, "error handling commit"); 118 - } 119 - } 120 - SmokeSignalEvent::Delete { 121 - did, 122 - collection, 123 - rkey, 124 - .. 125 - } => { 126 - let result = match collection.as_str() { 127 - "community.lexicon.calendar.event" => { 128 - self.handle_event_delete(did, rkey).await 129 - } 130 - "community.lexicon.calendar.rsvp" => { 131 - self.handle_rsvp_delete(did, rkey).await 132 - } 133 - "events.smokesignal.profile" => self.handle_profile_delete(did, rkey).await, 134 - "events.smokesignal.calendar.acceptance" => { 135 - self.handle_acceptance_delete(did, rkey).await 136 - } 137 - _ => Ok(()), 138 - }; 139 - if let Err(e) = result { 140 - tracing::error!(error = ?e, "error handling delete"); 141 - } 142 - } 97 + match collection { 98 + "community.lexicon.calendar.event" => { 99 + self.handle_event_commit(did, rkey, cid, record, live) 100 + .await 101 + } 102 + "community.lexicon.calendar.rsvp" => { 103 + self.handle_rsvp_commit(did, rkey, cid, record, live).await 104 + } 105 + "events.smokesignal.profile" => { 106 + self.handle_profile_commit(did, rkey, cid, record).await 107 + } 108 + "events.smokesignal.calendar.acceptance" => { 109 + self.handle_acceptance_commit(did, rkey, cid, record).await 143 110 } 111 + _ => Ok(()), 144 112 } 113 + } 145 114 146 - tracing::info!("content fetcher finished"); 147 - Ok(()) 115 + /// Handle a delete event. 116 + /// 117 + /// The `live` flag indicates whether this is a live event (true) or backfill (false). 118 + pub async fn handle_delete( 119 + &self, 120 + did: &str, 121 + collection: &str, 122 + rkey: &str, 123 + live: bool, 124 + ) -> Result<()> { 125 + match collection { 126 + "community.lexicon.calendar.event" => { 127 + self.handle_event_delete(did, rkey, live).await 128 + } 129 + "community.lexicon.calendar.rsvp" => self.handle_rsvp_delete(did, rkey, live).await, 130 + "events.smokesignal.profile" => self.handle_profile_delete(did, rkey).await, 131 + "events.smokesignal.calendar.acceptance" => { 132 + self.handle_acceptance_delete(did, rkey).await 133 + } 134 + _ => Ok(()), 135 + } 148 136 } 149 137 150 138 async fn handle_event_commit( ··· 152 140 did: &str, 153 141 rkey: &str, 154 142 cid: &str, 155 - record: &Value, 143 + record: Value, 144 + _live: bool, 156 145 ) -> Result<()> { 157 146 tracing::info!("Processing event: {} for {}", rkey, did); 158 147 159 148 let aturi = format!("at://{did}/{LexiconCommunityEventNSID}/{rkey}"); 160 149 161 - let event_record: Event = serde_json::from_value(record.clone())?; 150 + let event_record: Event = serde_json::from_value(record)?; 162 151 163 152 let document = self.ensure_identity_stored(did).await?; 164 153 let pds_endpoints = document.pds_endpoints(); ··· 184 173 185 174 let all_media = event_record.media; 186 175 187 - for media in &all_media { 188 - if let Err(err) = self.download_media(pds_endpoint, did, media).await { 189 - tracing::error!(error = ?err, "failed processing image"); 176 + // Download all media items in parallel 177 + if !all_media.is_empty() { 178 + let download_futures: Vec<_> = all_media 179 + .iter() 180 + .map(|media| self.download_media(pds_endpoint, did, media)) 181 + .collect(); 182 + 183 + let results = join_all(download_futures).await; 184 + 185 + for result in results { 186 + if let Err(err) = result { 187 + tracing::error!(error = ?err, "failed processing image"); 188 + } 190 189 } 191 190 } 192 191 ··· 198 197 did: &str, 199 198 rkey: &str, 200 199 cid: &str, 201 - record: &Value, 200 + record: Value, 201 + _live: bool, 202 202 ) -> Result<()> { 203 203 tracing::info!("Processing rsvp: {} for {}", rkey, did); 204 204 205 205 let aturi = format!("at://{did}/{LexiconCommunityRSVPNSID}/{rkey}"); 206 206 207 - let rsvp_record: Rsvp = serde_json::from_value(record.clone())?; 207 + let rsvp_record: Rsvp = serde_json::from_value(record)?; 208 208 209 209 let event_aturi = rsvp_record.subject.uri.clone(); 210 210 let event_cid = rsvp_record.subject.cid.clone(); ··· 272 272 Ok(()) 273 273 } 274 274 275 - async fn handle_event_delete(&self, did: &str, rkey: &str) -> Result<()> { 275 + async fn handle_event_delete(&self, did: &str, rkey: &str, _live: bool) -> Result<()> { 276 276 let aturi = format!("at://{did}/{LexiconCommunityEventNSID}/{rkey}"); 277 277 278 278 event_delete(&self.pool, &aturi).await?; ··· 280 280 Ok(()) 281 281 } 282 282 283 - async fn handle_rsvp_delete(&self, did: &str, rkey: &str) -> Result<()> { 283 + async fn handle_rsvp_delete(&self, did: &str, rkey: &str, _live: bool) -> Result<()> { 284 284 let aturi = format!("at://{did}/{LexiconCommunityEventNSID}/{rkey}"); 285 285 286 286 rsvp_delete(&self.pool, &aturi).await?; ··· 293 293 did: &str, 294 294 rkey: &str, 295 295 cid: &str, 296 - record: &Value, 296 + record: Value, 297 297 ) -> Result<()> { 298 298 tracing::info!("Processing profile: {} for {}", rkey, did); 299 299 ··· 305 305 return Ok(()); 306 306 } 307 307 308 - let profile_record: Profile = serde_json::from_value(record.clone())?; 308 + let profile_record: Profile = serde_json::from_value(record)?; 309 309 310 310 // Get the identity to resolve the handle for display_name fallback and PDS endpoint 311 311 let document = self.ensure_identity_stored(did).await?; ··· 325 325 326 326 profile_insert(&self.pool, &aturi, cid, did, display_name, &profile_record).await?; 327 327 328 - // Download avatar and banner blobs if present 328 + // Download avatar and banner blobs if present (in parallel) 329 329 let pds_endpoints = document.pds_endpoints(); 330 330 if let Some(pds_endpoint) = pds_endpoints.first() { 331 - // Download avatar if present 332 - if let Some(ref avatar) = profile_record.avatar 333 - && let Err(e) = self.download_avatar(pds_endpoint, did, avatar).await 334 - { 331 + // Create futures for avatar and banner downloads 332 + let avatar_future = async { 333 + if let Some(ref avatar) = profile_record.avatar { 334 + self.download_avatar(pds_endpoint, did, avatar).await 335 + } else { 336 + Ok(()) 337 + } 338 + }; 339 + 340 + let banner_future = async { 341 + if let Some(ref banner) = profile_record.banner { 342 + self.download_banner(pds_endpoint, did, banner).await 343 + } else { 344 + Ok(()) 345 + } 346 + }; 347 + 348 + // Download both concurrently 349 + let (avatar_result, banner_result) = tokio::join!(avatar_future, banner_future); 350 + 351 + if let Err(e) = avatar_result { 335 352 tracing::warn!( 336 353 error = ?e, 337 354 did = %did, ··· 339 356 ); 340 357 } 341 358 342 - // Download banner if present 343 - if let Some(ref banner) = profile_record.banner 344 - && let Err(e) = self.download_banner(pds_endpoint, did, banner).await 345 - { 359 + if let Err(e) = banner_result { 346 360 tracing::warn!( 347 361 error = ?e, 348 362 did = %did, ··· 367 381 did: &str, 368 382 rkey: &str, 369 383 cid: &str, 370 - record: &Value, 384 + record: Value, 371 385 ) -> Result<()> { 372 386 tracing::info!("Processing acceptance: {} for {}", rkey, did); 373 387 374 388 let aturi = format!("at://{did}/{AcceptanceNSID}/{rkey}"); 375 389 376 390 // Deserialize and validate the acceptance record 377 - let acceptance_record: TypedAcceptance = serde_json::from_value(record.clone())?; 391 + let acceptance_record: TypedAcceptance = serde_json::from_value(record)?; 378 392 tracing::info!(?acceptance_record, "acceptance_record"); 379 393 380 394 // Validate the acceptance record ··· 383 397 return Ok(()); 384 398 } 385 399 386 - // Store the acceptance record 387 - acceptance_record_upsert(&self.pool, &aturi, cid, did, record).await?; 400 + // Store the acceptance record (use deserialized record to avoid clone) 401 + acceptance_record_upsert(&self.pool, &aturi, cid, did, &acceptance_record).await?; 388 402 389 403 tracing::info!("Acceptance stored: {}", aturi); 390 404 Ok(()) ··· 470 484 img 471 485 }; 472 486 473 - // Convert to PNG 474 - let mut png_buffer = std::io::Cursor::new(Vec::new()); 487 + // Convert to PNG with pre-allocated buffer 488 + // 400x400 PNG typically compresses to ~100-300KB, pre-allocate 256KB 489 + let mut png_buffer = std::io::Cursor::new(Vec::with_capacity(256 * 1024)); 475 490 resized.write_to(&mut png_buffer, ImageFormat::Png)?; 476 491 let png_bytes = png_buffer.into_inner(); 477 492 ··· 558 573 img 559 574 }; 560 575 561 - // Convert to PNG 562 - let mut png_buffer = std::io::Cursor::new(Vec::new()); 576 + // Convert to PNG with pre-allocated buffer 577 + // 1600x900 PNG typically compresses to ~500KB-1.5MB, pre-allocate 1MB 578 + let mut png_buffer = std::io::Cursor::new(Vec::with_capacity(1024 * 1024)); 563 579 resized.write_to(&mut png_buffer, ImageFormat::Png)?; 564 580 let png_bytes = png_buffer.into_inner(); 565 581 ··· 670 686 img 671 687 }; 672 688 673 - let mut png_buffer = std::io::Cursor::new(Vec::new()); 689 + // Convert to PNG with pre-allocated buffer 690 + // 1600x900 PNG typically compresses to ~500KB-1.5MB, pre-allocate 1MB 691 + let mut png_buffer = std::io::Cursor::new(Vec::with_capacity(1024 * 1024)); 674 692 final_image.write_to(&mut png_buffer, ImageFormat::Png)?; 675 693 let png_bytes = png_buffer.into_inner(); 676 694
-1
src/storage/mod.rs
··· 12 12 pub mod private_event_content; 13 13 pub mod profile; 14 14 pub mod types; 15 - pub mod webhook; 16 15 pub use types::*;
-211
src/storage/webhook.rs
··· 1 - use chrono::Utc; 2 - 3 - use crate::storage::{StoragePool, errors::StorageError}; 4 - 5 - pub(crate) mod model { 6 - use chrono::{DateTime, Utc}; 7 - use serde::{Deserialize, Serialize}; 8 - use sqlx::FromRow; 9 - 10 - #[derive(Clone, FromRow, Deserialize, Serialize, Debug)] 11 - pub struct IdentityWebhook { 12 - pub did: String, 13 - pub service: String, 14 - pub created_at: DateTime<Utc>, 15 - pub enabled: bool, 16 - pub errored_at: Option<DateTime<Utc>>, 17 - pub error: Option<String>, 18 - } 19 - } 20 - 21 - pub use model::IdentityWebhook; 22 - 23 - pub async fn webhook_upsert( 24 - pool: &StoragePool, 25 - did: &str, 26 - service: &str, 27 - ) -> Result<(), StorageError> { 28 - let mut tx = pool 29 - .begin() 30 - .await 31 - .map_err(StorageError::CannotBeginDatabaseTransaction)?; 32 - 33 - let now = Utc::now(); 34 - 35 - sqlx::query( 36 - r#" 37 - INSERT INTO identity_webhooks (did, service, created_at, enabled, errored_at, error) 38 - VALUES ($1, $2, $3, true, NULL, NULL) 39 - ON CONFLICT (did, service) DO UPDATE SET 40 - enabled = true, 41 - errored_at = NULL, 42 - error = NULL 43 - "#, 44 - ) 45 - .bind(did) 46 - .bind(service) 47 - .bind(now) 48 - .execute(tx.as_mut()) 49 - .await 50 - .map_err(StorageError::UnableToExecuteQuery)?; 51 - 52 - tx.commit() 53 - .await 54 - .map_err(StorageError::CannotCommitDatabaseTransaction)?; 55 - 56 - Ok(()) 57 - } 58 - 59 - pub async fn webhook_toggle_enabled( 60 - pool: &StoragePool, 61 - did: &str, 62 - service: &str, 63 - ) -> Result<(), StorageError> { 64 - let mut tx = pool 65 - .begin() 66 - .await 67 - .map_err(StorageError::CannotBeginDatabaseTransaction)?; 68 - 69 - sqlx::query( 70 - r#" 71 - UPDATE identity_webhooks 72 - SET enabled = NOT enabled 73 - WHERE did = $1 AND service = $2 74 - "#, 75 - ) 76 - .bind(did) 77 - .bind(service) 78 - .execute(tx.as_mut()) 79 - .await 80 - .map_err(StorageError::UnableToExecuteQuery)?; 81 - 82 - tx.commit() 83 - .await 84 - .map_err(StorageError::CannotCommitDatabaseTransaction)?; 85 - 86 - Ok(()) 87 - } 88 - 89 - pub async fn webhook_list_by_did( 90 - pool: &StoragePool, 91 - did: &str, 92 - ) -> Result<Vec<IdentityWebhook>, StorageError> { 93 - let mut tx = pool 94 - .begin() 95 - .await 96 - .map_err(StorageError::CannotBeginDatabaseTransaction)?; 97 - 98 - let webhooks = sqlx::query_as::<_, model::IdentityWebhook>( 99 - r#" 100 - SELECT did, service, created_at, enabled, errored_at, error 101 - FROM identity_webhooks 102 - WHERE did = $1 103 - ORDER BY created_at DESC 104 - "#, 105 - ) 106 - .bind(did) 107 - .fetch_all(tx.as_mut()) 108 - .await 109 - .map_err(StorageError::UnableToExecuteQuery)?; 110 - 111 - tx.commit() 112 - .await 113 - .map_err(StorageError::CannotCommitDatabaseTransaction)?; 114 - 115 - Ok(webhooks) 116 - } 117 - 118 - pub async fn webhook_delete( 119 - pool: &StoragePool, 120 - did: &str, 121 - service: &str, 122 - ) -> Result<(), StorageError> { 123 - let mut tx = pool 124 - .begin() 125 - .await 126 - .map_err(StorageError::CannotBeginDatabaseTransaction)?; 127 - 128 - sqlx::query( 129 - r#" 130 - DELETE FROM identity_webhooks 131 - WHERE did = $1 AND service = $2 132 - "#, 133 - ) 134 - .bind(did) 135 - .bind(service) 136 - .execute(tx.as_mut()) 137 - .await 138 - .map_err(StorageError::UnableToExecuteQuery)?; 139 - 140 - tx.commit() 141 - .await 142 - .map_err(StorageError::CannotCommitDatabaseTransaction)?; 143 - 144 - Ok(()) 145 - } 146 - 147 - pub async fn webhook_failed( 148 - pool: &StoragePool, 149 - did: &str, 150 - service: &str, 151 - error: &str, 152 - ) -> Result<(), StorageError> { 153 - let mut tx = pool 154 - .begin() 155 - .await 156 - .map_err(StorageError::CannotBeginDatabaseTransaction)?; 157 - 158 - let now = Utc::now(); 159 - 160 - sqlx::query( 161 - r#" 162 - UPDATE identity_webhooks 163 - SET enabled = false, 164 - errored_at = $4, 165 - error = $3 166 - WHERE did = $1 AND service = $2 167 - "#, 168 - ) 169 - .bind(did) 170 - .bind(service) 171 - .bind(error) 172 - .bind(now) 173 - .execute(tx.as_mut()) 174 - .await 175 - .map_err(StorageError::UnableToExecuteQuery)?; 176 - 177 - tx.commit() 178 - .await 179 - .map_err(StorageError::CannotCommitDatabaseTransaction)?; 180 - 181 - Ok(()) 182 - } 183 - 184 - pub async fn webhook_list_enabled_by_did( 185 - pool: &StoragePool, 186 - did: &str, 187 - ) -> Result<Vec<IdentityWebhook>, StorageError> { 188 - let mut tx = pool 189 - .begin() 190 - .await 191 - .map_err(StorageError::CannotBeginDatabaseTransaction)?; 192 - 193 - let webhooks = sqlx::query_as::<_, model::IdentityWebhook>( 194 - r#" 195 - SELECT did, service, created_at, enabled, errored_at, error 196 - FROM identity_webhooks 197 - WHERE did = $1 AND enabled = true 198 - ORDER BY created_at DESC 199 - "#, 200 - ) 201 - .bind(did) 202 - .fetch_all(tx.as_mut()) 203 - .await 204 - .map_err(StorageError::UnableToExecuteQuery)?; 205 - 206 - tx.commit() 207 - .await 208 - .map_err(StorageError::CannotCommitDatabaseTransaction)?; 209 - 210 - Ok(webhooks) 211 - }
+212
src/tap_processor.rs
··· 1 + //! TAP event processor for AT Protocol events. 2 + //! 3 + //! This module provides a unified TAP stream consumer and event processor, 4 + //! eliminating the need for channel-based message passing. 5 + 6 + use anyhow::Result; 7 + use atproto_tap::{connect, RecordAction, TapConfig, TapEvent}; 8 + use std::sync::Arc; 9 + use std::time::Duration; 10 + use tokio_stream::StreamExt; 11 + use tokio_util::sync::CancellationToken; 12 + 13 + use crate::processor::ContentFetcher; 14 + use crate::task_search_indexer::SearchIndexer; 15 + 16 + /// Unified TAP stream consumer and event processor. 17 + /// 18 + /// Combines TAP event consumption with direct processing, eliminating 19 + /// channel overhead. Events are processed inline as they arrive from 20 + /// the TAP stream. 21 + pub struct TapProcessor { 22 + config: TapConfig, 23 + content_fetcher: ContentFetcher, 24 + search_indexer: Option<SearchIndexer>, 25 + cancel_token: CancellationToken, 26 + } 27 + 28 + impl TapProcessor { 29 + /// Create a new TAP processor. 30 + /// 31 + /// # Arguments 32 + /// 33 + /// * `hostname` - TAP service hostname (e.g., "localhost:2480") 34 + /// * `password` - Optional admin password for authentication 35 + /// * `user_agent` - User-Agent header for WebSocket connections 36 + /// * `content_fetcher` - Processor for event/RSVP/profile content 37 + /// * `search_indexer` - Optional search indexer for OpenSearch 38 + /// * `cancel_token` - Token for graceful shutdown 39 + pub fn new( 40 + hostname: &str, 41 + password: Option<String>, 42 + user_agent: &str, 43 + content_fetcher: ContentFetcher, 44 + search_indexer: Option<SearchIndexer>, 45 + cancel_token: CancellationToken, 46 + ) -> Self { 47 + let mut config_builder = TapConfig::builder() 48 + .hostname(hostname) 49 + .send_acks(true) 50 + .user_agent(user_agent) 51 + .initial_reconnect_delay(Duration::from_secs(1)) 52 + .max_reconnect_delay(Duration::from_secs(30)); 53 + 54 + if let Some(password) = password { 55 + config_builder = config_builder.admin_password(password); 56 + } 57 + 58 + Self { 59 + config: config_builder.build(), 60 + content_fetcher, 61 + search_indexer, 62 + cancel_token, 63 + } 64 + } 65 + 66 + /// Run the TAP processor, consuming events until cancelled. 67 + pub async fn run(self) -> Result<()> { 68 + tracing::info!( 69 + "TAP processor starting, connecting to {}", 70 + self.config.hostname 71 + ); 72 + let mut stream = connect(self.config.clone()); 73 + 74 + loop { 75 + tokio::select! { 76 + () = self.cancel_token.cancelled() => { 77 + tracing::info!("TAP processor cancelled, closing stream"); 78 + stream.close().await; 79 + break; 80 + } 81 + Some(result) = stream.next() => { 82 + match result { 83 + Ok(event) => { 84 + self.process_event(&event).await; 85 + } 86 + Err(err) => { 87 + if err.is_fatal() { 88 + tracing::error!(?err, "Fatal TAP error, exiting processor"); 89 + break; 90 + } 91 + tracing::warn!(?err, "TAP stream error (will retry)"); 92 + } 93 + } 94 + } 95 + } 96 + } 97 + 98 + tracing::info!("TAP processor stopped"); 99 + Ok(()) 100 + } 101 + 102 + async fn process_event(&self, event: &Arc<TapEvent>) { 103 + match event.as_ref() { 104 + TapEvent::Record { record, .. } => { 105 + let collection = record.collection.as_ref(); 106 + 107 + // Skip irrelevant collections (TAP should filter, but double-check) 108 + if !Self::is_relevant_collection(collection) { 109 + return; 110 + } 111 + 112 + let did = record.did.as_ref(); 113 + let rkey = record.rkey.as_ref(); 114 + let live = record.live; 115 + 116 + if !live { 117 + tracing::debug!( 118 + "Processing backfill event: {} {} {}", 119 + collection, 120 + did, 121 + rkey 122 + ); 123 + } 124 + 125 + match record.action { 126 + RecordAction::Create | RecordAction::Update => { 127 + let cid = record.cid.as_ref().map(|c| c.as_ref()).unwrap_or(""); 128 + let record_value = record 129 + .record 130 + .as_ref() 131 + .cloned() 132 + .unwrap_or(serde_json::Value::Null); 133 + 134 + // Process content fetcher and search indexer in parallel 135 + // Clone record_value for search indexer so both can take ownership 136 + let indexer_record = if self.search_indexer.is_some() { 137 + Some(record_value.clone()) 138 + } else { 139 + None 140 + }; 141 + 142 + let content_future = self 143 + .content_fetcher 144 + .handle_commit(did, collection, rkey, cid, record_value, live); 145 + 146 + let index_future = async { 147 + if let (Some(indexer), Some(record)) = 148 + (&self.search_indexer, indexer_record) 149 + { 150 + indexer.index_commit(did, collection, rkey, record).await 151 + } else { 152 + Ok(()) 153 + } 154 + }; 155 + 156 + let (content_result, index_result) = 157 + tokio::join!(content_future, index_future); 158 + 159 + if let Err(e) = content_result { 160 + tracing::error!(?e, "Error processing commit"); 161 + } 162 + if let Err(e) = index_result { 163 + tracing::error!(?e, "Error indexing commit"); 164 + } 165 + } 166 + RecordAction::Delete => { 167 + // Process content fetcher and search indexer delete in parallel 168 + let content_future = 169 + self.content_fetcher 170 + .handle_delete(did, collection, rkey, live); 171 + 172 + let index_future = async { 173 + if let Some(ref indexer) = self.search_indexer { 174 + indexer.delete_record(did, collection, rkey).await 175 + } else { 176 + Ok(()) 177 + } 178 + }; 179 + 180 + let (content_result, index_result) = 181 + tokio::join!(content_future, index_future); 182 + 183 + if let Err(e) = content_result { 184 + tracing::error!(?e, "Error processing delete"); 185 + } 186 + if let Err(e) = index_result { 187 + tracing::error!(?e, "Error deleting from index"); 188 + } 189 + } 190 + } 191 + } 192 + TapEvent::Identity { identity, .. } => { 193 + // Identity events ignored (handle updates can be added in future iteration) 194 + tracing::trace!( 195 + did = %identity.did, 196 + handle = %identity.handle, 197 + "Identity event ignored" 198 + ); 199 + } 200 + } 201 + } 202 + 203 + fn is_relevant_collection(collection: &str) -> bool { 204 + matches!( 205 + collection, 206 + "community.lexicon.calendar.rsvp" 207 + | "community.lexicon.calendar.event" 208 + | "events.smokesignal.profile" 209 + | "events.smokesignal.calendar.acceptance" 210 + ) 211 + } 212 + }
+34 -58
src/task_search_indexer.rs
··· 10 10 use serde::Deserialize; 11 11 use serde_json::{Value, json}; 12 12 use std::sync::Arc; 13 - use tokio_util::sync::CancellationToken; 14 13 15 14 use crate::atproto::lexicon::profile::{Profile, NSID as PROFILE_NSID}; 16 15 use crate::atproto::utils::get_profile_hashtags; 17 - use crate::consumer::{SmokeSignalEvent, SmokeSignalEventReceiver}; 18 16 use crate::task_search_indexer_errors::SearchIndexerError; 19 17 20 18 /// A lightweight event struct for search indexing that excludes problematic fields like locations. ··· 64 62 client: Arc<OpenSearch>, 65 63 identity_resolver: Arc<dyn IdentityResolver>, 66 64 document_storage: Arc<dyn DidDocumentStorage + Send + Sync>, 67 - receiver: SmokeSignalEventReceiver, 68 - cancel_token: CancellationToken, 69 65 } 70 66 71 67 impl SearchIndexer { 68 + /// Create a new SearchIndexer. 69 + /// 70 + /// # Arguments 71 + /// 72 + /// * `endpoint` - OpenSearch endpoint URL 73 + /// * `identity_resolver` - Resolver for DID identities 74 + /// * `document_storage` - Storage for DID documents 72 75 pub async fn new( 73 76 endpoint: &str, 74 77 identity_resolver: Arc<dyn IdentityResolver>, 75 78 document_storage: Arc<dyn DidDocumentStorage + Send + Sync>, 76 - receiver: SmokeSignalEventReceiver, 77 - cancel_token: CancellationToken, 78 79 ) -> Result<Self> { 79 80 let transport = Transport::single_node(endpoint)?; 80 81 let client = Arc::new(OpenSearch::new(transport)); ··· 83 84 client, 84 85 identity_resolver, 85 86 document_storage, 86 - receiver, 87 - cancel_token, 88 87 }; 89 88 90 89 indexer.ensure_index().await?; ··· 192 191 Ok(()) 193 192 } 194 193 195 - pub async fn run(mut self) -> Result<()> { 196 - tracing::info!("Search indexer task started"); 197 - 198 - loop { 199 - tokio::select! { 200 - _ = self.cancel_token.cancelled() => { 201 - tracing::info!("Search indexer task shutting down"); 202 - break; 203 - } 204 - Some(event) = self.receiver.recv() => { 205 - if let Err(err) = self.process_event(event).await { 206 - tracing::error!(?err, "Error processing event for search index"); 207 - } 208 - } 209 - } 194 + /// Index a commit event (create or update). 195 + /// 196 + /// Dispatches to the appropriate indexer based on collection type. 197 + /// Takes ownership of `record` to avoid cloning during deserialization. 198 + pub async fn index_commit( 199 + &self, 200 + did: &str, 201 + collection: &str, 202 + rkey: &str, 203 + record: Value, 204 + ) -> Result<()> { 205 + match collection { 206 + "community.lexicon.calendar.event" => self.index_event(did, rkey, record).await, 207 + c if c == PROFILE_NSID => self.index_profile(did, rkey, record).await, 208 + _ => Ok(()), 210 209 } 211 - 212 - Ok(()) 213 210 } 214 211 215 - async fn process_event(&self, event: SmokeSignalEvent) -> Result<()> { 216 - match event { 217 - SmokeSignalEvent::Commit { 218 - did, 219 - collection, 220 - rkey, 221 - record, 222 - .. 223 - } => { 224 - if collection == "community.lexicon.calendar.event" { 225 - self.index_event(&did, &rkey, &record).await?; 226 - } else if collection == PROFILE_NSID { 227 - self.index_profile(&did, &rkey, &record).await?; 228 - } 229 - } 230 - SmokeSignalEvent::Delete { 231 - did, 232 - collection, 233 - rkey, 234 - .. 235 - } => { 236 - if collection == "community.lexicon.calendar.event" { 237 - self.delete_event(&did, &rkey).await?; 238 - } else if collection == PROFILE_NSID { 239 - self.delete_profile(&did, &rkey).await?; 240 - } 241 - } 212 + /// Delete a record from the search index. 213 + /// 214 + /// Dispatches to the appropriate delete method based on collection type. 215 + pub async fn delete_record(&self, did: &str, collection: &str, rkey: &str) -> Result<()> { 216 + match collection { 217 + "community.lexicon.calendar.event" => self.delete_event(did, rkey).await, 218 + c if c == PROFILE_NSID => self.delete_profile(did, rkey).await, 219 + _ => Ok(()), 242 220 } 243 - 244 - Ok(()) 245 221 } 246 222 247 - async fn index_event(&self, did: &str, rkey: &str, record: &Value) -> Result<()> { 223 + async fn index_event(&self, did: &str, rkey: &str, record: Value) -> Result<()> { 248 224 // Use IndexableEvent which excludes problematic fields like locations 249 - let event: IndexableEvent = serde_json::from_value(record.clone())?; 225 + let event: IndexableEvent = serde_json::from_value(record)?; 250 226 251 227 let document = self.ensure_identity_stored(did).await?; 252 228 let handle = document.handles().unwrap_or("invalid.handle"); ··· 317 293 Ok(()) 318 294 } 319 295 320 - async fn index_profile(&self, did: &str, rkey: &str, record: &Value) -> Result<()> { 321 - let profile: Profile = serde_json::from_value(record.clone())?; 296 + async fn index_profile(&self, did: &str, rkey: &str, record: Value) -> Result<()> { 297 + let profile: Profile = serde_json::from_value(record)?; 322 298 323 299 let document = self.ensure_identity_stored(did).await?; 324 300 let handle = document.handles().unwrap_or("invalid.handle");
-269
src/task_webhooks.rs
··· 1 - use anyhow::Result; 2 - use atproto_identity::traits::DidDocumentStorage; 3 - use atproto_oauth::jwt::{Claims, Header, mint}; 4 - use chrono::Utc; 5 - use rand::distr::{Alphanumeric, SampleString}; 6 - use serde::{Deserialize, Serialize}; 7 - use serde_json::json; 8 - use sqlx::PgPool; 9 - use std::sync::Arc; 10 - use std::time::Duration; 11 - use tokio::sync::mpsc; 12 - use tokio_util::sync::CancellationToken; 13 - 14 - use crate::{ 15 - service::{ServiceDID, ServiceKey}, 16 - storage::webhook::webhook_failed, 17 - task_webhooks_errors::WebhookError, 18 - webhooks::{ 19 - EVENT_CREATED_EVENT, RSVP_CREATED_EVENT, SMOKE_SIGNAL_AUTOMATION_SERVICE, TEST_EVENT, 20 - }, 21 - }; 22 - 23 - #[derive(Debug, Clone)] 24 - pub enum TaskWork { 25 - Test { 26 - identity: String, 27 - service: String, 28 - }, 29 - RSVPCreated { 30 - identity: String, 31 - service: String, 32 - record: serde_json::Value, 33 - context: serde_json::Value, 34 - }, 35 - EventCreated { 36 - identity: String, 37 - service: String, 38 - record: serde_json::Value, 39 - context: serde_json::Value, 40 - }, 41 - } 42 - 43 - #[derive(Debug, Serialize, Deserialize)] 44 - struct WebhookPayload { 45 - record: serde_json::Value, 46 - context: serde_json::Value, 47 - event: String, 48 - } 49 - 50 - pub struct WebhookProcessor { 51 - pool: PgPool, 52 - document_storage: Arc<dyn DidDocumentStorage>, 53 - service_did: ServiceDID, 54 - service_key: ServiceKey, 55 - receiver: mpsc::Receiver<TaskWork>, 56 - cancel_token: CancellationToken, 57 - } 58 - 59 - impl WebhookProcessor { 60 - pub fn new( 61 - pool: PgPool, 62 - document_storage: Arc<dyn DidDocumentStorage>, 63 - service_did: ServiceDID, 64 - service_key: ServiceKey, 65 - receiver: mpsc::Receiver<TaskWork>, 66 - cancel_token: CancellationToken, 67 - ) -> Self { 68 - Self { 69 - pool, 70 - document_storage, 71 - service_did, 72 - service_key, 73 - receiver, 74 - cancel_token, 75 - } 76 - } 77 - 78 - pub async fn run(mut self) -> Result<()> { 79 - tracing::info!("Webhook processor task started"); 80 - 81 - loop { 82 - tokio::select! { 83 - _ = self.cancel_token.cancelled() => { 84 - tracing::info!("Webhook processor task shutting down"); 85 - break; 86 - } 87 - Some(work) = self.receiver.recv() => { 88 - if let Err(e) = self.process_work(work).await { 89 - tracing::error!("Error processing webhook work: {:?}", e); 90 - } 91 - } 92 - } 93 - } 94 - 95 - Ok(()) 96 - } 97 - 98 - async fn process_work(&self, work: TaskWork) -> Result<()> { 99 - match work { 100 - TaskWork::Test { identity, service } => { 101 - self.send_webhook( 102 - &identity, 103 - &service, 104 - WebhookPayload { 105 - record: json!({}), 106 - context: json!({}), 107 - event: TEST_EVENT.to_string(), 108 - }, 109 - ) 110 - .await 111 - } 112 - TaskWork::RSVPCreated { 113 - identity, 114 - service, 115 - record, 116 - context, 117 - } => { 118 - self.send_webhook( 119 - &identity, 120 - &service, 121 - WebhookPayload { 122 - record, 123 - context, 124 - event: RSVP_CREATED_EVENT.to_string(), 125 - }, 126 - ) 127 - .await 128 - } 129 - TaskWork::EventCreated { 130 - identity, 131 - service, 132 - record, 133 - context, 134 - } => { 135 - self.send_webhook( 136 - &identity, 137 - &service, 138 - WebhookPayload { 139 - record, 140 - context, 141 - event: EVENT_CREATED_EVENT.to_string(), 142 - }, 143 - ) 144 - .await 145 - } 146 - } 147 - } 148 - 149 - async fn send_webhook( 150 - &self, 151 - identity: &str, 152 - service: &str, 153 - payload: WebhookPayload, 154 - ) -> Result<()> { 155 - // Remove the suffix from service 156 - if !service.ends_with(SMOKE_SIGNAL_AUTOMATION_SERVICE) { 157 - return Err(WebhookError::InvalidServiceSuffix { 158 - suffix: SMOKE_SIGNAL_AUTOMATION_SERVICE.to_string(), 159 - } 160 - .into()); 161 - } 162 - 163 - let service_did = service 164 - .strip_suffix(SMOKE_SIGNAL_AUTOMATION_SERVICE) 165 - .unwrap(); 166 - 167 - // Get the DID document from document storage 168 - let document = self 169 - .document_storage 170 - .get_document_by_did(service_did) 171 - .await 172 - .map_err(|e| WebhookError::DidDocumentRetrievalFailed { 173 - did: service_did.to_string(), 174 - error: e.to_string(), 175 - })? 176 - .ok_or_else(|| WebhookError::DidDocumentNotFound { 177 - did: service_did.to_string(), 178 - })?; 179 - 180 - // Extract the service endpoint 181 - let automation_service = document 182 - .service 183 - .iter() 184 - .find(|service| service.id.ends_with(SMOKE_SIGNAL_AUTOMATION_SERVICE)) 185 - .ok_or(WebhookError::ServiceNotFoundInDidDocument)?; 186 - 187 - // Get the service endpoint - it should be a string URL 188 - let endpoint_url = &automation_service.service_endpoint; 189 - 190 - // Construct the webhook URL 191 - let webhook_url = format!( 192 - "{}/xrpc/events.smokesignal.automation.InvokeWebhook", 193 - endpoint_url 194 - ); 195 - 196 - // Create signed JWT 197 - let header: Header = self.service_key.0.clone().try_into()?; 198 - 199 - let now = Utc::now(); 200 - let jti = Alphanumeric.sample_string(&mut rand::rng(), 30); 201 - 202 - // Create base claims 203 - let mut claims = Claims::default(); 204 - claims.jose.issuer = Some(self.service_did.0.clone()); 205 - claims.jose.audience = Some(service_did.to_string()); 206 - claims.jose.issued_at = Some(now.timestamp() as u64); 207 - claims.jose.expiration = Some((now.timestamp() as u64) + 60); 208 - claims.jose.json_web_token_id = Some(jti); 209 - claims.private.insert( 210 - "lxm".to_string(), 211 - serde_json::Value::String("events.smokesignal.automation.InvokeWebhook".to_string()), 212 - ); 213 - 214 - let token = mint(&self.service_key.0, &header, &claims).map_err(|e| { 215 - WebhookError::JwtCreationFailed { 216 - error: e.to_string(), 217 - } 218 - })?; 219 - 220 - // Prepare headers with JWT authorization 221 - let mut headers = reqwest::header::HeaderMap::new(); 222 - headers.insert("authorization", format!("Bearer {}", token).parse()?); 223 - headers.insert("content-type", "application/json".parse()?); 224 - 225 - // Send the webhook 226 - let client = reqwest::Client::new(); 227 - let response = client 228 - .post(&webhook_url) 229 - .headers(headers) 230 - .json(&payload) 231 - .timeout(Duration::from_secs(30)) 232 - .send() 233 - .await; 234 - 235 - match response { 236 - Ok(resp) if resp.status().is_success() => { 237 - tracing::info!("Webhook sent successfully to {} for {}", service, identity); 238 - Ok(()) 239 - } 240 - Ok(resp) => { 241 - let status = resp.status(); 242 - let error_text = resp 243 - .text() 244 - .await 245 - .unwrap_or_else(|_| "Unknown error".to_string()); 246 - let error_msg = format!("HTTP {}: {}", status, error_text); 247 - 248 - tracing::error!("Webhook failed: {}", error_msg); 249 - 250 - // Update database to mark webhook as failed 251 - webhook_failed(&self.pool, identity, service, &error_msg).await?; 252 - 253 - Err(WebhookError::WebhookRequestFailed { error: error_msg }.into()) 254 - } 255 - Err(e) => { 256 - let error_msg = format!("Request failed: {}", e); 257 - tracing::error!("Webhook request error: {}", error_msg); 258 - 259 - // Update database to mark webhook as failed 260 - webhook_failed(&self.pool, identity, service, &error_msg).await?; 261 - 262 - Err(WebhookError::WebhookTransportFailed { 263 - error: e.to_string(), 264 - } 265 - .into()) 266 - } 267 - } 268 - } 269 - }
-57
src/task_webhooks_errors.rs
··· 1 - use thiserror::Error; 2 - 3 - /// Represents errors that can occur during webhook processing. 4 - /// 5 - /// These errors typically occur when processing webhook notifications, 6 - /// including service validation, DID resolution, and HTTP request failures. 7 - #[derive(Debug, Error)] 8 - pub(crate) enum WebhookError { 9 - /// Error when a service doesn't end with the required suffix. 10 - /// 11 - /// This error occurs when validating a webhook service URL that doesn't 12 - /// end with the expected automation service suffix. 13 - #[error("error-smokesignal-webhook-1 Service must end with {suffix}")] 14 - InvalidServiceSuffix { suffix: String }, 15 - 16 - /// Error when failing to get a DID document. 17 - /// 18 - /// This error occurs when attempting to retrieve a DID document 19 - /// for service validation fails. 20 - #[error("error-smokesignal-webhook-2 Failed to get DID document for {did}: {error}")] 21 - DidDocumentRetrievalFailed { did: String, error: String }, 22 - 23 - /// Error when a DID document is not found. 24 - /// 25 - /// This error occurs when a required DID document cannot be found 26 - /// in the system. 27 - #[error("error-smokesignal-webhook-3 DID document not found for {did}")] 28 - DidDocumentNotFound { did: String }, 29 - 30 - /// Error when a service is not found in a DID document. 31 - /// 32 - /// This error occurs when a DID document doesn't contain the expected 33 - /// automation service entry. 34 - #[error("error-smokesignal-webhook-4 service not found in DID document")] 35 - ServiceNotFoundInDidDocument, 36 - 37 - /// Error when JWT creation fails. 38 - /// 39 - /// This error occurs when attempting to create a JWT token for 40 - /// webhook authentication fails. 41 - #[error("error-smokesignal-webhook-5 Failed to create JWT: {error}")] 42 - JwtCreationFailed { error: String }, 43 - 44 - /// Error when a webhook request fails with an error response. 45 - /// 46 - /// This error occurs when the webhook endpoint returns an error 47 - /// status code or the request fails. 48 - #[error("error-smokesignal-webhook-6 Webhook failed: {error}")] 49 - WebhookRequestFailed { error: String }, 50 - 51 - /// Error when a webhook request fails due to network or other issues. 52 - /// 53 - /// This error occurs when the HTTP request to the webhook endpoint 54 - /// fails due to network issues or other transport problems. 55 - #[error("error-smokesignal-webhook-7 Webhook request failed: {error}")] 56 - WebhookTransportFailed { error: String }, 57 - }
-5
src/webhooks.rs
··· 1 - pub(crate) const SMOKE_SIGNAL_AUTOMATION_SERVICE: &str = "#SmokeSignalAutomation"; 2 - 3 - pub(crate) const RSVP_CREATED_EVENT: &str = "rsvp.created"; 4 - pub(crate) const EVENT_CREATED_EVENT: &str = "event.created"; 5 - pub(crate) const TEST_EVENT: &str = "test";
-35
templates/en-us/settings.common.html
··· 60 60 </div> 61 61 </div> 62 62 </div> 63 - 64 - {% if webhooks_enabled %} 65 - <hr> 66 - 67 - <div class="columns"> 68 - <div class="column"> 69 - <h2 class="subtitle">Webhooks</h2> 70 - <p class="mb-4">Configure webhook endpoints to receive notifications when events occur.</p> 71 - 72 - <div id="webhooks-list"> 73 - {% include "en-us/settings.webhooks.html" %} 74 - </div> 75 - 76 - <div class="box mt-4"> 77 - <h3 class="subtitle is-5">Add New Webhook</h3> 78 - <form hx-post="/settings/webhooks/add" hx-target="#webhooks-list" hx-swap="innerHTML"> 79 - <div class="field has-addons"> 80 - <div class="control is-expanded"> 81 - <input class="input" type="text" name="service" placeholder="webhook-service-name" required> 82 - </div> 83 - <div class="control"> 84 - <button class="button is-primary" type="submit"> 85 - <span class="icon"> 86 - <i class="fas fa-plus"></i> 87 - </span> 88 - <span>Add Webhook</span> 89 - </button> 90 - </div> 91 - </div> 92 - <p class="help">Enter the service name for your webhook endpoint (e.g., "my-automation-service")</p> 93 - </form> 94 - </div> 95 - </div> 96 - </div> 97 - {% endif %} 98 63 </div> 99 64 </div> 100 65 </div>
-76
templates/en-us/settings.webhooks.html
··· 1 - {% if webhooks %} 2 - <div class="table-container"> 3 - <table class="table is-fullwidth is-striped"> 4 - <thead> 5 - <tr> 6 - <th>Service</th> 7 - <th>Status</th> 8 - <th>Created</th> 9 - <th>Last Error</th> 10 - <th>Actions</th> 11 - </tr> 12 - </thead> 13 - <tbody> 14 - {% for webhook in webhooks %} 15 - <tr> 16 - <td>{{ webhook.service }}</td> 17 - <td> 18 - {% if webhook.enabled %} 19 - <span class="tag is-success">Enabled</span> 20 - {% else %} 21 - <span class="tag is-danger">Disabled</span> 22 - {% endif %} 23 - </td> 24 - <td>{{ webhook.created_at }}</td> 25 - <td> 26 - {% if webhook.error %} 27 - <span class="has-text-danger" title="{{ webhook.error }}"> 28 - {{ webhook.errored_at }} 29 - </span> 30 - {% else %} 31 - <span class="has-text-grey">-</span> 32 - {% endif %} 33 - </td> 34 - <td> 35 - <div class="buttons are-small"> 36 - <form hx-post="/settings/webhooks/toggle" hx-target="#webhooks-list" hx-swap="innerHTML" style="display: inline;"> 37 - <input type="hidden" name="service" value="{{ webhook.service }}"> 38 - <button class="button is-small {% if webhook.enabled %}is-warning{% else %}is-success{% endif %}" type="submit"> 39 - <span class="icon"> 40 - <i class="fas {% if webhook.enabled %}fa-pause{% else %}fa-play{% endif %}"></i> 41 - </span> 42 - <span>{% if webhook.enabled %}Disable{% else %}Enable{% endif %}</span> 43 - </button> 44 - </form> 45 - 46 - <form hx-post="/settings/webhooks/test" hx-target="#webhooks-list" hx-swap="innerHTML" style="display: inline;"> 47 - <input type="hidden" name="service" value="{{ webhook.service }}"> 48 - <button class="button is-small is-info" type="submit" {% if not webhook.enabled %}disabled{% endif %}> 49 - <span class="icon"> 50 - <i class="fas fa-vial"></i> 51 - </span> 52 - <span>Test</span> 53 - </button> 54 - </form> 55 - 56 - <form hx-post="/settings/webhooks/remove" hx-target="#webhooks-list" hx-swap="innerHTML" hx-confirm="Are you sure you want to remove this webhook?" style="display: inline;"> 57 - <input type="hidden" name="service" value="{{ webhook.service }}"> 58 - <button class="button is-small is-danger" type="submit"> 59 - <span class="icon"> 60 - <i class="fas fa-trash"></i> 61 - </span> 62 - <span>Remove</span> 63 - </button> 64 - </form> 65 - </div> 66 - </td> 67 - </tr> 68 - {% endfor %} 69 - </tbody> 70 - </table> 71 - </div> 72 - {% else %} 73 - <div class="notification is-info is-light"> 74 - <p>No webhooks configured yet. Add your first webhook using the form below.</p> 75 - </div> 76 - {% endif %}