use anyhow::{Context, Result}; use futures_util::StreamExt; use serde::{Deserialize, Serialize}; use std::{collections::HashMap, time::Duration}; use tokio::sync::mpsc; use tokio_tungstenite::{ connect_async, tungstenite::{ client::IntoClientRequest, http::HeaderValue, Message, }, }; use url::Url; use crate::tui::Message as TuiMessage; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct JetstreamEvent { #[serde(rename = "kind")] pub kind: String, #[serde(rename = "time_us")] pub time_us: i64, pub did: String, pub commit: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct CommitData { pub rev: String, pub operation: String, pub collection: String, pub rkey: String, pub record: Option, pub cid: String, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct BlipRecord { #[serde(rename = "$type")] pub record_type: String, pub content: String, #[serde(rename = "createdAt")] pub created_at: String, } pub struct JetstreamClient { did_cache: HashMap, // DID -> handle cache own_did: Option, // User's own DID to filter out } impl JetstreamClient { pub fn new(own_did: Option) -> Self { Self { did_cache: HashMap::new(), own_did, } } pub async fn connect_and_listen(&mut self, message_tx: mpsc::UnboundedSender) -> Result<()> { // Try simple connection first, then with collection filter let urls = vec![ "wss://jetstream2.us-west.bsky.network/subscribe", "wss://jetstream2.us-west.bsky.network/subscribe?wantedCollections=stream.thought.blip" ]; for (i, jetstream_url) in urls.iter().enumerate() { // Send status to TUI instead of console let status_msg = crate::tui::Message::new( "system".to_string(), format!("Trying connection {} of {}", i + 1, urls.len()), false, ); let _ = message_tx.send(status_msg); loop { match self.try_connect_and_listen(&message_tx, jetstream_url).await { Ok(_) => { return Ok(()); } Err(_e) => { // If this is the last URL, retry it after a delay if i == urls.len() - 1 { let retry_msg = crate::tui::Message::new( "system".to_string(), "Connection failed, retrying in 5s...".to_string(), false, ); let _ = message_tx.send(retry_msg); tokio::time::sleep(Duration::from_secs(5)).await; } else { // Try the next URL break; } } } } } Ok(()) } async fn try_connect_and_listen( &mut self, message_tx: &mpsc::UnboundedSender, url_str: &str, ) -> Result<()> { // Parse URL and create request with headers let url = Url::parse(url_str)?; let mut request = url.into_client_request()?; // Add User-Agent header request.headers_mut().insert( "User-Agent", HeaderValue::from_static("think-cli/0.1.0") ); // Connect with timeout let connect_future = connect_async(request); let (ws_stream, _response) = tokio::time::timeout( Duration::from_secs(10), connect_future ).await .context("Connection timeout")? .context("Failed to connect to jetstream")?; // Send a connection success message to the TUI let success_msg = crate::tui::Message::new( "system".to_string(), "Connected to jetstream! Listening for blips...".to_string(), false, ); let _ = message_tx.send(success_msg); let (mut _write, mut read) = ws_stream.split(); while let Some(msg) = read.next().await { match msg { Ok(Message::Text(text)) => { // Silently ignore message handling errors let _ = self.handle_message(&text, message_tx).await; } Ok(Message::Close(_)) => { break; } Err(e) => { return Err(anyhow::anyhow!("WebSocket error: {}", e)); } _ => { // Ignore other message types (binary, ping, pong) } } } Ok(()) } async fn handle_message( &mut self, message: &str, message_tx: &mpsc::UnboundedSender, ) -> Result<()> { // First, check if it's even a commit event using basic JSON parsing let event_value: serde_json::Value = serde_json::from_str(message)?; // Only process commit events if event_value.get("kind").and_then(|k| k.as_str()) != Some("commit") { return Ok(()); } // Check if it has a commit with the right collection let commit = event_value.get("commit"); if let Some(commit_obj) = commit { if commit_obj.get("collection").and_then(|c| c.as_str()) != Some("stream.thought.blip") { return Ok(()); } // Skip delete operations if commit_obj.get("operation").and_then(|o| o.as_str()) == Some("delete") { return Ok(()); } } else { return Ok(()); } // Now try to parse as our structured event let event: JetstreamEvent = serde_json::from_str(message)?; let commit = event.commit.as_ref().unwrap(); // Safe because we checked above // Skip messages from our own DID if let Some(ref own_did) = self.own_did { if &event.did == own_did { return Ok(()); } } // Parse the blip record let record_data = commit.record.as_ref(); if record_data.is_none() { return Ok(()); } let blip_record: BlipRecord = match serde_json::from_value(record_data.unwrap().clone()) { Ok(record) => record, Err(_) => return Ok(()), // Silently skip unparseable records }; // Get or resolve the handle let handle = self.resolve_did(&event.did).await; // Create TUI message let tui_message = TuiMessage::new( handle, blip_record.content, false, // Not our own message ); // Send to TUI if let Err(_) = message_tx.send(tui_message) { // Channel closed, probably shutting down return Err(anyhow::anyhow!("Message channel closed")); } Ok(()) } async fn resolve_did(&mut self, did: &str) -> String { // Check cache first if let Some(handle) = self.did_cache.get(did) { return handle.clone(); } // Try to resolve the DID to a handle let handle = match self.fetch_handle_for_did(did).await { Ok(h) => h, Err(_) => { // Fallback to showing just the DID (truncated) if did.len() > 20 { format!("{}...", &did[..20]) } else { did.to_string() } } }; // Cache the result self.did_cache.insert(did.to_string(), handle.clone()); handle } async fn fetch_handle_for_did(&self, did: &str) -> Result { // Use the ATProto API to resolve DID to handle let client = reqwest::Client::new(); let url = format!("https://public.api.bsky.app/xrpc/com.atproto.identity.resolveHandle?did={}", did); #[derive(Deserialize)] struct ResolveResponse { handle: String, } // Try a simpler approach - resolve via profile let profile_url = format!("https://public.api.bsky.app/xrpc/app.bsky.actor.getProfile?actor={}", did); #[derive(Deserialize)] struct ProfileResponse { handle: String, } let response = client .get(&profile_url) .send() .await?; if response.status().is_success() { let profile: ProfileResponse = response.json().await?; Ok(profile.handle) } else { Err(anyhow::anyhow!("Failed to resolve DID to handle")) } } } pub async fn start_jetstream_listener(message_tx: mpsc::UnboundedSender, own_did: Option) -> Result<()> { let mut client = JetstreamClient::new(own_did); client.connect_and_listen(message_tx).await }