interactive intro to open social

feat: add app-agnostic record links to firehose toasts

Add clickable "view record" links to firehose toast notifications that open raw record JSON from user's PDS. Also fix particle animation direction to flow from apps to PDS (correctly showing data writes).

Changes:
- Add /api/record endpoint to fetch individual records
- Refactor firehose to use DID-specific connections via manager
- Add toast link element with hover styling
- Fetch record details for richer toast messages
- Reverse particle flow direction (app → PDS)
- Add "Your PDS" label to identity circle

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

Changed files
+278 -65
src
static
+72 -18
src/firehose.rs
··· 27 27 /// Broadcaster for firehose events 28 28 pub type FirehoseBroadcaster = Arc<broadcast::Sender<FirehoseEvent>>; 29 29 30 + /// Manager for DID-specific firehose connections 31 + pub type FirehoseManager = Arc<Mutex<HashMap<String, FirehoseBroadcaster>>>; 32 + 30 33 /// A generic ingester that broadcasts all events 31 34 struct BroadcastIngester { 32 35 broadcaster: FirehoseBroadcaster, ··· 59 62 action: action.to_string(), 60 63 collection: commit.collection.clone(), 61 64 rkey: commit.rkey.clone(), 62 - namespace, 65 + namespace: namespace.clone(), 63 66 }; 64 67 68 + info!( 69 + "Received event: {} {} {} (namespace: {})", 70 + action, message.did, commit.collection, namespace 71 + ); 72 + 65 73 // Broadcast the event (ignore if no receivers) 66 - let _ = self.broadcaster.send(firehose_event); 74 + match self.broadcaster.send(firehose_event) { 75 + Ok(receivers) => { 76 + info!("Broadcast to {} receivers", receivers); 77 + } 78 + Err(_) => { 79 + // No receivers, that's ok 80 + } 81 + } 67 82 68 83 Ok(()) 69 84 } 70 85 } 71 86 72 - /// Start the Jetstream ingester that broadcasts events to all listeners 73 - pub async fn start_firehose_broadcaster() -> FirehoseBroadcaster { 87 + /// Create a new FirehoseManager 88 + pub fn create_firehose_manager() -> FirehoseManager { 89 + Arc::new(Mutex::new(HashMap::new())) 90 + } 91 + 92 + /// Get or create a firehose broadcaster for a specific DID 93 + pub async fn get_or_create_broadcaster( 94 + manager: &FirehoseManager, 95 + did: String, 96 + ) -> FirehoseBroadcaster { 97 + // Check if we already have a broadcaster for this DID 98 + { 99 + let broadcasters = manager.lock().unwrap(); 100 + if let Some(broadcaster) = broadcasters.get(&did) { 101 + info!("Reusing existing firehose connection for DID: {}", did); 102 + return broadcaster.clone(); 103 + } 104 + } 105 + 106 + info!("Creating new firehose connection for DID: {}", did); 107 + 74 108 // Create a broadcast channel with a buffer of 100 events 75 109 let (tx, _rx) = broadcast::channel::<FirehoseEvent>(100); 76 110 let broadcaster = Arc::new(tx); 77 111 112 + // Store in manager 113 + { 114 + let mut broadcasters = manager.lock().unwrap(); 115 + broadcasters.insert(did.clone(), broadcaster.clone()); 116 + } 117 + 118 + // Clone for the spawn 78 119 let broadcaster_clone = broadcaster.clone(); 120 + let did_clone = did.clone(); 79 121 80 122 tokio::spawn(async move { 81 123 loop { 82 - info!("Starting Jetstream connection..."); 124 + info!("Starting Jetstream connection for DID: {}...", did_clone); 83 125 84 - // Configure Jetstream to receive all events (no collection filter) 85 - let opts = JetstreamOptions::builder().build(); 126 + // Configure Jetstream to receive events ONLY for this DID 127 + let opts = JetstreamOptions::builder() 128 + .wanted_dids(vec![did_clone.clone()]) 129 + .build(); 86 130 let jetstream = JetstreamConnection::new(opts); 87 131 88 - // Create ingesters - we use a wildcard to capture all collections 89 132 let mut ingesters: HashMap<String, Box<dyn LexiconIngestor + Send + Sync>> = 90 133 HashMap::new(); 91 134 92 - // Use "*" as a catch-all for all collections 93 - ingesters.insert( 94 - "*".to_string(), 95 - Box::new(BroadcastIngester { 96 - broadcaster: broadcaster_clone.clone(), 97 - }), 98 - ); 135 + // Register ingesters for common Bluesky collections 136 + let collections = vec![ 137 + "app.bsky.feed.post", 138 + "app.bsky.feed.like", 139 + "app.bsky.feed.repost", 140 + "app.bsky.graph.follow", 141 + "app.bsky.actor.profile", 142 + ]; 143 + 144 + for collection in collections { 145 + ingesters.insert( 146 + collection.to_string(), 147 + Box::new(BroadcastIngester { 148 + broadcaster: broadcaster_clone.clone(), 149 + }), 150 + ); 151 + } 99 152 100 153 // Get channels 101 154 let msg_rx = jetstream.get_msg_rx(); ··· 105 158 let cursor: Arc<Mutex<Option<u64>>> = Arc::new(Mutex::new(None)); 106 159 let c_cursor = cursor.clone(); 107 160 108 - // Spawn task to process messages 161 + // Spawn task to process messages using proper handler 109 162 tokio::spawn(async move { 163 + info!("Starting message processing loop for DID-filtered connection"); 110 164 while let Ok(message) = msg_rx.recv_async().await { 111 165 if let Err(e) = rocketman::handler::handle_message( 112 166 message, ··· 125 179 let failed = { 126 180 let connect_result = jetstream.connect(cursor).await; 127 181 if let Err(e) = connect_result { 128 - error!("Jetstream connection failed: {}", e); 182 + error!("Jetstream connection failed for DID {}: {}", did_clone, e); 129 183 true 130 184 } else { 131 185 false ··· 137 191 continue; 138 192 } 139 193 140 - info!("Jetstream connection dropped, reconnecting in 5 seconds..."); 194 + info!("Jetstream connection dropped for DID: {}, reconnecting in 5 seconds...", did_clone); 141 195 tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; 142 196 } 143 197 });
+4 -3
src/main.rs
··· 14 14 15 15 let client = oauth::create_oauth_client(); 16 16 17 - // Start the firehose broadcaster 18 - let firehose_broadcaster = firehose::start_firehose_broadcaster().await; 17 + // Create the firehose manager (connections created lazily per-DID) 18 + let firehose_manager = firehose::create_firehose_manager(); 19 19 20 20 println!("starting server at http://localhost:8080"); 21 21 ··· 35 35 .build(), 36 36 ) 37 37 .app_data(web::Data::new(client.clone())) 38 - .app_data(web::Data::new(firehose_broadcaster.clone())) 38 + .app_data(web::Data::new(firehose_manager.clone())) 39 39 .service(routes::index) 40 40 .service(routes::login) 41 41 .service(routes::callback) ··· 46 46 .service(routes::init) 47 47 .service(routes::get_avatar) 48 48 .service(routes::validate_url) 49 + .service(routes::get_record) 49 50 .service(routes::firehose_watch) 50 51 .service(routes::favicon) 51 52 .service(Files::new("/static", "./static"))
+49 -8
src/routes.rs
··· 3 3 use atrium_oauth::{AuthorizeOptions, CallbackParams, KnownScope, Scope}; 4 4 use serde::Deserialize; 5 5 6 - use crate::firehose::FirehoseBroadcaster; 6 + use crate::firehose::FirehoseManager; 7 7 use crate::mst; 8 8 use crate::oauth::OAuthClientType; 9 9 use crate::templates; ··· 393 393 } 394 394 395 395 #[derive(Deserialize)] 396 + pub struct RecordQuery { 397 + pds: String, 398 + did: String, 399 + collection: String, 400 + rkey: String, 401 + } 402 + 403 + #[get("/api/record")] 404 + pub async fn get_record(query: web::Query<RecordQuery>) -> HttpResponse { 405 + let record_url = format!( 406 + "{}/xrpc/com.atproto.repo.getRecord?repo={}&collection={}&rkey={}", 407 + query.pds, query.did, query.collection, query.rkey 408 + ); 409 + 410 + match reqwest::get(&record_url).await { 411 + Ok(response) => { 412 + if !response.status().is_success() { 413 + return HttpResponse::Ok().json(serde_json::json!({ 414 + "error": "record not found" 415 + })); 416 + } 417 + 418 + match response.json::<serde_json::Value>().await { 419 + Ok(data) => HttpResponse::Ok().json(data), 420 + Err(e) => HttpResponse::InternalServerError().json(serde_json::json!({ 421 + "error": format!("failed to parse record: {}", e) 422 + })), 423 + } 424 + } 425 + Err(e) => HttpResponse::InternalServerError().json(serde_json::json!({ 426 + "error": format!("failed to fetch record: {}", e) 427 + })), 428 + } 429 + } 430 + 431 + #[derive(Deserialize)] 396 432 pub struct FirehoseQuery { 397 433 did: String, 398 434 } ··· 400 436 #[get("/api/firehose/watch")] 401 437 pub async fn firehose_watch( 402 438 query: web::Query<FirehoseQuery>, 403 - broadcaster: web::Data<FirehoseBroadcaster>, 439 + manager: web::Data<FirehoseManager>, 404 440 ) -> HttpResponse { 405 441 let did = query.did.clone(); 442 + 443 + // Get or create a broadcaster for this DID 444 + let broadcaster = crate::firehose::get_or_create_broadcaster(&manager, did.clone()).await; 406 445 let mut rx = broadcaster.subscribe(); 407 446 447 + log::info!("SSE connection established for DID: {}", did); 448 + 408 449 let stream = async_stream::stream! { 409 450 // Send initial connection message 410 451 yield Ok::<_, actix_web::Error>( 411 452 web::Bytes::from(format!("data: {{\"type\":\"connected\"}}\n\n")) 412 453 ); 413 454 414 - // Stream firehose events filtered by DID 455 + log::info!("Sent initial connection message to client"); 456 + 457 + // Stream firehose events (already filtered by DID at Jetstream level) 415 458 while let Ok(event) = rx.recv().await { 416 - // Only send events for this DID 417 - if event.did == did { 418 - let json = serde_json::to_string(&event).unwrap_or_default(); 419 - yield Ok(web::Bytes::from(format!("data: {}\n\n", json))); 420 - } 459 + log::info!("Sending event to client: {} {} {}", event.action, event.did, event.collection); 460 + let json = serde_json::to_string(&event).unwrap_or_default(); 461 + yield Ok(web::Bytes::from(format!("data: {}\n\n", json))); 421 462 } 422 463 }; 423 464
+28
src/templates.rs
··· 469 469 letter-spacing: 0.05em; 470 470 }} 471 471 472 + .identity-pds-label {{ 473 + position: absolute; 474 + bottom: clamp(-1.5rem, -3vmin, -2rem); 475 + font-size: clamp(0.55rem, 1.1vmin, 0.65rem); 476 + color: var(--text-light); 477 + letter-spacing: 0.05em; 478 + font-weight: 500; 479 + }} 480 + 472 481 .identity-avatar {{ 473 482 width: clamp(30px, 6vmin, 45px); 474 483 height: clamp(30px, 6vmin, 45px); ··· 1249 1258 .firehose-toast.visible {{ 1250 1259 opacity: 1; 1251 1260 transform: translateY(0); 1261 + pointer-events: auto; 1252 1262 }} 1253 1263 1254 1264 .firehose-toast-action {{ ··· 1262 1272 margin-top: 0.25rem; 1263 1273 }} 1264 1274 1275 + .firehose-toast-link {{ 1276 + display: inline-block; 1277 + color: var(--text-light); 1278 + font-size: 0.6rem; 1279 + margin-top: 0.5rem; 1280 + text-decoration: none; 1281 + border-bottom: 1px solid transparent; 1282 + transition: all 0.2s ease; 1283 + pointer-events: auto; 1284 + }} 1285 + 1286 + .firehose-toast-link:hover {{ 1287 + color: var(--text); 1288 + border-bottom-color: var(--text); 1289 + }} 1290 + 1265 1291 @media (max-width: 768px) {{ 1266 1292 .watch-live-btn {{ 1267 1293 right: clamp(1rem, 2vmin, 1.5rem); ··· 1288 1314 <div class="firehose-toast" id="firehoseToast"> 1289 1315 <div class="firehose-toast-action"></div> 1290 1316 <div class="firehose-toast-collection"></div> 1317 + <a class='firehose-toast-link' id='firehoseToastLink' href='#' target='_blank' rel='noopener noreferrer'>view record</a> 1291 1318 </div> 1292 1319 1293 1320 <div class="overlay" id="overlay"></div> ··· 1310 1337 <div class="identity-label">@</div> 1311 1338 <div class="identity-value" id="handle">loading...</div> 1312 1339 <div class="identity-hint">tap for details</div> 1340 + <div class="identity-pds-label">Your PDS</div> 1313 1341 </div> 1314 1342 <div id="field" class="loading">loading...</div> 1315 1343 </div>
+125 -36
static/app.js
··· 189 189 let html = ` 190 190 <button class="detail-close" id="detailClose">×</button> 191 191 <h3>${namespace}</h3> 192 - <div class="subtitle">records stored in your pds:</div> 192 + <div class="subtitle">records stored in your <a href="https://atproto.com/guides/self-hosting" target="_blank" rel="noopener noreferrer" style="color: var(--text); text-decoration: underline;">PDS</a>:</div> 193 193 `; 194 194 195 195 if (collections && collections.length > 0) { ··· 831 831 if (alive) { 832 832 particle.draw(firehoseCtx); 833 833 } else { 834 - // Particle reached destination - pulse the app circle 835 - pulseAppCircle(particle.metadata.namespace); 834 + // Particle reached destination - pulse the identity/PDS 835 + pulseIdentity(); 836 836 } 837 837 return alive; 838 838 }); ··· 842 842 } 843 843 } 844 844 845 - function pulseAppCircle(namespace) { 846 - const appCircle = document.querySelector(`[data-namespace="${namespace}"]`); 847 - if (appCircle) { 848 - appCircle.style.transition = 'all 0.3s ease'; 849 - appCircle.style.transform = 'scale(1.2)'; 850 - appCircle.style.boxShadow = '0 0 20px rgba(255, 255, 255, 0.5)'; 851 - 845 + function pulseIdentity() { 846 + const identity = document.querySelector('.identity'); 847 + if (identity) { 848 + identity.style.transition = 'all 0.3s ease'; 849 + identity.style.transform = 'scale(1.15)'; 850 + identity.style.boxShadow = '0 0 25px rgba(255, 255, 255, 0.6)'; 851 + 852 852 setTimeout(() => { 853 - appCircle.style.transform = ''; 854 - appCircle.style.boxShadow = ''; 853 + identity.style.transform = ''; 854 + identity.style.boxShadow = ''; 855 855 }, 300); 856 856 } 857 857 } 858 858 859 - function showFirehoseToast(action, collection) { 860 - const toast = document.getElementById('firehoseToast'); 861 - const actionEl = toast.querySelector('.firehose-toast-action'); 862 - const collectionEl = toast.querySelector('.firehose-toast-collection'); 859 + async function fetchRecordDetails(pds, did, collection, rkey) { 860 + try { 861 + const response = await fetch( 862 + `/api/record?pds=${encodeURIComponent(pds)}&did=${encodeURIComponent(did)}&collection=${encodeURIComponent(collection)}&rkey=${encodeURIComponent(rkey)}` 863 + ); 864 + const data = await response.json(); 865 + if (data.error) return null; 866 + return data.value; 867 + } catch (e) { 868 + console.error('Error fetching record:', e); 869 + return null; 870 + } 871 + } 863 872 873 + function formatToastMessage(action, collection, record) { 864 874 const actionText = { 865 875 'create': 'created', 866 876 'update': 'updated', 867 877 'delete': 'deleted' 868 878 }[action] || action; 869 879 870 - actionEl.textContent = `${actionText} record`; 871 - collectionEl.textContent = collection; 880 + // If we don't have record details, fall back to basic message 881 + if (!record) { 882 + return { 883 + action: `${actionText} record`, 884 + details: collection 885 + }; 886 + } 887 + 888 + // Format based on collection type 889 + if (collection === 'app.bsky.feed.post') { 890 + const text = record.text || ''; 891 + const preview = text.length > 50 ? text.substring(0, 50) + '...' : text; 892 + return { 893 + action: `${actionText} post`, 894 + details: preview || 'no text' 895 + }; 896 + } else if (collection === 'app.bsky.feed.like') { 897 + return { 898 + action: `${actionText} like`, 899 + details: '' 900 + }; 901 + } else if (collection === 'app.bsky.feed.repost') { 902 + return { 903 + action: `${actionText} repost`, 904 + details: '' 905 + }; 906 + } else if (collection === 'app.bsky.graph.follow') { 907 + return { 908 + action: `${actionText} follow`, 909 + details: '' 910 + }; 911 + } else if (collection === 'app.bsky.actor.profile') { 912 + const displayName = record.displayName || ''; 913 + return { 914 + action: `${actionText} profile`, 915 + details: displayName || 'updated profile' 916 + }; 917 + } 918 + 919 + // Default for unknown collections 920 + return { 921 + action: `${actionText} record`, 922 + details: collection 923 + }; 924 + } 925 + 926 + async function showFirehoseToast(event) { 927 + const toast = document.getElementById('firehoseToast'); 928 + const actionEl = toast.querySelector('.firehose-toast-action'); 929 + const collectionEl = toast.querySelector('.firehose-toast-collection'); 930 + const linkEl = document.getElementById('firehoseToastLink'); 931 + 932 + // Build PDS link for the record 933 + if (globalPds && event.did && event.collection && event.rkey) { 934 + const recordUrl = `${globalPds}/xrpc/com.atproto.repo.getRecord?repo=${encodeURIComponent(event.did)}&collection=${encodeURIComponent(event.collection)}&rkey=${encodeURIComponent(event.rkey)}`; 935 + linkEl.href = recordUrl; 936 + } 937 + 938 + // Fetch record details if available (skip for deletes) 939 + let record = null; 940 + if (event.action !== 'delete' && event.rkey && globalPds) { 941 + record = await fetchRecordDetails(globalPds, event.did, event.collection, event.rkey); 942 + } 943 + 944 + const formatted = formatToastMessage(event.action, event.collection, record); 945 + 946 + actionEl.textContent = formatted.action; 947 + collectionEl.textContent = formatted.details; 872 948 873 949 toast.classList.add('visible'); 874 950 setTimeout(() => { 875 951 toast.classList.remove('visible'); 876 - }, 3000); 952 + }, 4000); // Slightly longer to read details 877 953 } 878 954 879 955 function getParticleColor(action) { ··· 886 962 } 887 963 888 964 function createFirehoseParticle(event) { 889 - // Get identity circle position 890 - const identity = document.querySelector('.identity'); 891 - if (!identity) return; 892 - 893 - const identityRect = identity.getBoundingClientRect(); 894 - const startX = identityRect.left + identityRect.width / 2; 895 - const startY = identityRect.top + identityRect.height / 2; 896 - 897 - // Get target app circle position 965 + // Get source app circle position (where the action happened) 898 966 const appCircle = document.querySelector(`[data-namespace="${event.namespace}"]`); 899 967 if (!appCircle) return; 900 968 901 969 const appRect = appCircle.getBoundingClientRect(); 902 - const endX = appRect.left + appRect.width / 2; 903 - const endY = appRect.top + appRect.height / 2; 970 + const startX = appRect.left + appRect.width / 2; 971 + const startY = appRect.top + appRect.height / 2; 904 972 905 - // Create particle 973 + // Get target identity/PDS position (where data is written) 974 + const identity = document.querySelector('.identity'); 975 + if (!identity) return; 976 + 977 + const identityRect = identity.getBoundingClientRect(); 978 + const endX = identityRect.left + identityRect.width / 2; 979 + const endY = identityRect.top + identityRect.height / 2; 980 + 981 + // Create particle (flows from app TO PDS) 906 982 const particle = new FirehoseParticle( 907 983 startX, startY, 908 984 endX, endY, ··· 918 994 } 919 995 920 996 function connectFirehose() { 921 - if (!did || firehoseEventSource) return; 997 + console.log('[Firehose] connectFirehose called, did =', did, 'existing connection?', !!firehoseEventSource); 998 + if (!did || firehoseEventSource) { 999 + console.warn('[Firehose] Exiting early - did:', did, 'firehoseEventSource:', firehoseEventSource); 1000 + return; 1001 + } 922 1002 923 1003 const url = `/api/firehose/watch?did=${encodeURIComponent(did)}`; 924 - console.log('Connecting to firehose:', url); 1004 + console.log('[Firehose] Connecting to:', url); 925 1005 926 1006 firehoseEventSource = new EventSource(url); 927 1007 ··· 948 1028 949 1029 // Create particle animation 950 1030 createFirehoseParticle(data); 951 - 1031 + 952 1032 // Show toast notification 953 - showFirehoseToast(data.action, data.collection); 1033 + showFirehoseToast(data); 954 1034 } catch (error) { 955 1035 console.error('Error processing firehose message:', error); 956 1036 } ··· 995 1075 996 1076 // Toggle watch live 997 1077 document.addEventListener('DOMContentLoaded', () => { 1078 + console.log('[Firehose] DOMContentLoaded fired, setting up watch button'); 998 1079 const watchBtn = document.getElementById('watchLiveBtn'); 999 - if (!watchBtn) return; 1080 + if (!watchBtn) { 1081 + console.error('[Firehose] Watch button not found!'); 1082 + return; 1083 + } 1000 1084 1085 + console.log('[Firehose] Watch button found, attaching click handler'); 1001 1086 const watchLabel = watchBtn.querySelector('.watch-label'); 1002 1087 1003 1088 watchBtn.addEventListener('click', () => { 1089 + console.log('[Firehose] Watch button clicked! isWatchingLive was:', isWatchingLive); 1004 1090 isWatchingLive = !isWatchingLive; 1091 + console.log('[Firehose] isWatchingLive now:', isWatchingLive); 1005 1092 1006 1093 if (isWatchingLive) { 1007 1094 // Start watching 1095 + console.log('[Firehose] Starting watch mode'); 1008 1096 watchLabel.textContent = 'connecting...'; 1009 1097 initFirehoseCanvas(); 1010 1098 connectFirehose(); 1011 1099 animateFirehoseParticles(); 1012 1100 } else { 1013 1101 // Stop watching 1102 + console.log('[Firehose] Stopping watch mode'); 1014 1103 watchLabel.textContent = 'watch live'; 1015 1104 watchBtn.classList.remove('active'); 1016 1105 disconnectFirehose();