A Rust CLI for publishing thought records. Designed to work with thought.stream.
at main 9.2 kB view raw
1use anyhow::{Context, Result}; 2use futures_util::StreamExt; 3use serde::{Deserialize, Serialize}; 4use std::{collections::HashMap, time::Duration}; 5use tokio::sync::mpsc; 6use tokio_tungstenite::{ 7 connect_async, 8 tungstenite::{ 9 client::IntoClientRequest, 10 http::HeaderValue, 11 Message, 12 }, 13}; 14use url::Url; 15 16use crate::tui::Message as TuiMessage; 17 18#[derive(Debug, Clone, Serialize, Deserialize)] 19pub struct JetstreamEvent { 20 #[serde(rename = "kind")] 21 pub kind: String, 22 #[serde(rename = "time_us")] 23 pub time_us: i64, 24 pub did: String, 25 pub commit: Option<CommitData>, 26} 27 28#[derive(Debug, Clone, Serialize, Deserialize)] 29pub struct CommitData { 30 pub rev: String, 31 pub operation: String, 32 pub collection: String, 33 pub rkey: String, 34 pub record: Option<serde_json::Value>, 35 pub cid: String, 36} 37 38#[derive(Debug, Clone, Serialize, Deserialize)] 39pub struct BlipRecord { 40 #[serde(rename = "$type")] 41 pub record_type: String, 42 pub content: String, 43 #[serde(rename = "createdAt")] 44 pub created_at: String, 45} 46 47pub struct JetstreamClient { 48 did_cache: HashMap<String, String>, // DID -> handle cache 49 own_did: Option<String>, // User's own DID to filter out 50} 51 52impl JetstreamClient { 53 pub fn new(own_did: Option<String>) -> Self { 54 Self { 55 did_cache: HashMap::new(), 56 own_did, 57 } 58 } 59 60 pub async fn connect_and_listen(&mut self, message_tx: mpsc::UnboundedSender<TuiMessage>) -> Result<()> { 61 // Try simple connection first, then with collection filter 62 let urls = vec![ 63 "wss://jetstream2.us-west.bsky.network/subscribe", 64 "wss://jetstream2.us-west.bsky.network/subscribe?wantedCollections=stream.thought.blip" 65 ]; 66 67 for (i, jetstream_url) in urls.iter().enumerate() { 68 // Send status to TUI instead of console 69 let status_msg = crate::tui::Message::new( 70 "system".to_string(), 71 format!("Trying connection {} of {}", i + 1, urls.len()), 72 false, 73 ); 74 let _ = message_tx.send(status_msg); 75 76 loop { 77 match self.try_connect_and_listen(&message_tx, jetstream_url).await { 78 Ok(_) => { 79 return Ok(()); 80 } 81 Err(_e) => { 82 // If this is the last URL, retry it after a delay 83 if i == urls.len() - 1 { 84 let retry_msg = crate::tui::Message::new( 85 "system".to_string(), 86 "Connection failed, retrying in 5s...".to_string(), 87 false, 88 ); 89 let _ = message_tx.send(retry_msg); 90 tokio::time::sleep(Duration::from_secs(5)).await; 91 } else { 92 // Try the next URL 93 break; 94 } 95 } 96 } 97 } 98 } 99 100 Ok(()) 101 } 102 103 async fn try_connect_and_listen( 104 &mut self, 105 message_tx: &mpsc::UnboundedSender<TuiMessage>, 106 url_str: &str, 107 ) -> Result<()> { 108 // Parse URL and create request with headers 109 let url = Url::parse(url_str)?; 110 let mut request = url.into_client_request()?; 111 112 // Add User-Agent header 113 request.headers_mut().insert( 114 "User-Agent", 115 HeaderValue::from_static("think-cli/0.1.0") 116 ); 117 118 // Connect with timeout 119 let connect_future = connect_async(request); 120 let (ws_stream, _response) = tokio::time::timeout( 121 Duration::from_secs(10), 122 connect_future 123 ).await 124 .context("Connection timeout")? 125 .context("Failed to connect to jetstream")?; 126 127 // Send a connection success message to the TUI 128 let success_msg = crate::tui::Message::new( 129 "system".to_string(), 130 "Connected to jetstream! Listening for blips...".to_string(), 131 false, 132 ); 133 let _ = message_tx.send(success_msg); 134 135 let (mut _write, mut read) = ws_stream.split(); 136 137 while let Some(msg) = read.next().await { 138 match msg { 139 Ok(Message::Text(text)) => { 140 // Silently ignore message handling errors 141 let _ = self.handle_message(&text, message_tx).await; 142 } 143 Ok(Message::Close(_)) => { 144 break; 145 } 146 Err(e) => { 147 return Err(anyhow::anyhow!("WebSocket error: {}", e)); 148 } 149 _ => { 150 // Ignore other message types (binary, ping, pong) 151 } 152 } 153 } 154 155 Ok(()) 156 } 157 158 async fn handle_message( 159 &mut self, 160 message: &str, 161 message_tx: &mpsc::UnboundedSender<TuiMessage>, 162 ) -> Result<()> { 163 // First, check if it's even a commit event using basic JSON parsing 164 let event_value: serde_json::Value = serde_json::from_str(message)?; 165 166 // Only process commit events 167 if event_value.get("kind").and_then(|k| k.as_str()) != Some("commit") { 168 return Ok(()); 169 } 170 171 // Check if it has a commit with the right collection 172 let commit = event_value.get("commit"); 173 if let Some(commit_obj) = commit { 174 if commit_obj.get("collection").and_then(|c| c.as_str()) != Some("stream.thought.blip") { 175 return Ok(()); 176 } 177 178 // Skip delete operations 179 if commit_obj.get("operation").and_then(|o| o.as_str()) == Some("delete") { 180 return Ok(()); 181 } 182 } else { 183 return Ok(()); 184 } 185 186 // Now try to parse as our structured event 187 let event: JetstreamEvent = serde_json::from_str(message)?; 188 let commit = event.commit.as_ref().unwrap(); // Safe because we checked above 189 190 // Skip messages from our own DID 191 if let Some(ref own_did) = self.own_did { 192 if &event.did == own_did { 193 return Ok(()); 194 } 195 } 196 197 // Parse the blip record 198 let record_data = commit.record.as_ref(); 199 if record_data.is_none() { 200 return Ok(()); 201 } 202 203 let blip_record: BlipRecord = match serde_json::from_value(record_data.unwrap().clone()) { 204 Ok(record) => record, 205 Err(_) => return Ok(()), // Silently skip unparseable records 206 }; 207 208 // Get or resolve the handle 209 let handle = self.resolve_did(&event.did).await; 210 211 // Create TUI message 212 let tui_message = TuiMessage::new( 213 handle, 214 blip_record.content, 215 false, // Not our own message 216 ); 217 218 // Send to TUI 219 if let Err(_) = message_tx.send(tui_message) { 220 // Channel closed, probably shutting down 221 return Err(anyhow::anyhow!("Message channel closed")); 222 } 223 224 Ok(()) 225 } 226 227 async fn resolve_did(&mut self, did: &str) -> String { 228 // Check cache first 229 if let Some(handle) = self.did_cache.get(did) { 230 return handle.clone(); 231 } 232 233 // Try to resolve the DID to a handle 234 let handle = match self.fetch_handle_for_did(did).await { 235 Ok(h) => h, 236 Err(_) => { 237 // Fallback to showing just the DID (truncated) 238 if did.len() > 20 { 239 format!("{}...", &did[..20]) 240 } else { 241 did.to_string() 242 } 243 } 244 }; 245 246 // Cache the result 247 self.did_cache.insert(did.to_string(), handle.clone()); 248 249 handle 250 } 251 252 async fn fetch_handle_for_did(&self, did: &str) -> Result<String> { 253 // Use the ATProto API to resolve DID to handle 254 let client = reqwest::Client::new(); 255 let url = format!("https://public.api.bsky.app/xrpc/com.atproto.identity.resolveHandle?did={}", did); 256 257 #[derive(Deserialize)] 258 struct ResolveResponse { 259 handle: String, 260 } 261 262 // Try a simpler approach - resolve via profile 263 let profile_url = format!("https://public.api.bsky.app/xrpc/app.bsky.actor.getProfile?actor={}", did); 264 265 #[derive(Deserialize)] 266 struct ProfileResponse { 267 handle: String, 268 } 269 270 let response = client 271 .get(&profile_url) 272 .send() 273 .await?; 274 275 if response.status().is_success() { 276 let profile: ProfileResponse = response.json().await?; 277 Ok(profile.handle) 278 } else { 279 Err(anyhow::anyhow!("Failed to resolve DID to handle")) 280 } 281 } 282} 283 284pub async fn start_jetstream_listener(message_tx: mpsc::UnboundedSender<TuiMessage>, own_did: Option<String>) -> Result<()> { 285 let mut client = JetstreamClient::new(own_did); 286 client.connect_and_listen(message_tx).await 287}