A Rust CLI for publishing thought records. Designed to work with thought.stream.
1use anyhow::{Context, Result};
2use futures_util::StreamExt;
3use serde::{Deserialize, Serialize};
4use std::{collections::HashMap, time::Duration};
5use tokio::sync::mpsc;
6use tokio_tungstenite::{
7 connect_async,
8 tungstenite::{
9 client::IntoClientRequest,
10 http::HeaderValue,
11 Message,
12 },
13};
14use url::Url;
15
16use crate::tui::Message as TuiMessage;
17
18#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct JetstreamEvent {
20 #[serde(rename = "kind")]
21 pub kind: String,
22 #[serde(rename = "time_us")]
23 pub time_us: i64,
24 pub did: String,
25 pub commit: Option<CommitData>,
26}
27
28#[derive(Debug, Clone, Serialize, Deserialize)]
29pub struct CommitData {
30 pub rev: String,
31 pub operation: String,
32 pub collection: String,
33 pub rkey: String,
34 pub record: Option<serde_json::Value>,
35 pub cid: String,
36}
37
38#[derive(Debug, Clone, Serialize, Deserialize)]
39pub struct BlipRecord {
40 #[serde(rename = "$type")]
41 pub record_type: String,
42 pub content: String,
43 #[serde(rename = "createdAt")]
44 pub created_at: String,
45}
46
47pub struct JetstreamClient {
48 did_cache: HashMap<String, String>, // DID -> handle cache
49 own_did: Option<String>, // User's own DID to filter out
50}
51
52impl JetstreamClient {
53 pub fn new(own_did: Option<String>) -> Self {
54 Self {
55 did_cache: HashMap::new(),
56 own_did,
57 }
58 }
59
60 pub async fn connect_and_listen(&mut self, message_tx: mpsc::UnboundedSender<TuiMessage>) -> Result<()> {
61 // Try simple connection first, then with collection filter
62 let urls = vec![
63 "wss://jetstream2.us-west.bsky.network/subscribe",
64 "wss://jetstream2.us-west.bsky.network/subscribe?wantedCollections=stream.thought.blip"
65 ];
66
67 for (i, jetstream_url) in urls.iter().enumerate() {
68 // Send status to TUI instead of console
69 let status_msg = crate::tui::Message::new(
70 "system".to_string(),
71 format!("Trying connection {} of {}", i + 1, urls.len()),
72 false,
73 );
74 let _ = message_tx.send(status_msg);
75
76 loop {
77 match self.try_connect_and_listen(&message_tx, jetstream_url).await {
78 Ok(_) => {
79 return Ok(());
80 }
81 Err(_e) => {
82 // If this is the last URL, retry it after a delay
83 if i == urls.len() - 1 {
84 let retry_msg = crate::tui::Message::new(
85 "system".to_string(),
86 "Connection failed, retrying in 5s...".to_string(),
87 false,
88 );
89 let _ = message_tx.send(retry_msg);
90 tokio::time::sleep(Duration::from_secs(5)).await;
91 } else {
92 // Try the next URL
93 break;
94 }
95 }
96 }
97 }
98 }
99
100 Ok(())
101 }
102
103 async fn try_connect_and_listen(
104 &mut self,
105 message_tx: &mpsc::UnboundedSender<TuiMessage>,
106 url_str: &str,
107 ) -> Result<()> {
108 // Parse URL and create request with headers
109 let url = Url::parse(url_str)?;
110 let mut request = url.into_client_request()?;
111
112 // Add User-Agent header
113 request.headers_mut().insert(
114 "User-Agent",
115 HeaderValue::from_static("think-cli/0.1.0")
116 );
117
118 // Connect with timeout
119 let connect_future = connect_async(request);
120 let (ws_stream, _response) = tokio::time::timeout(
121 Duration::from_secs(10),
122 connect_future
123 ).await
124 .context("Connection timeout")?
125 .context("Failed to connect to jetstream")?;
126
127 // Send a connection success message to the TUI
128 let success_msg = crate::tui::Message::new(
129 "system".to_string(),
130 "Connected to jetstream! Listening for blips...".to_string(),
131 false,
132 );
133 let _ = message_tx.send(success_msg);
134
135 let (mut _write, mut read) = ws_stream.split();
136
137 while let Some(msg) = read.next().await {
138 match msg {
139 Ok(Message::Text(text)) => {
140 // Silently ignore message handling errors
141 let _ = self.handle_message(&text, message_tx).await;
142 }
143 Ok(Message::Close(_)) => {
144 break;
145 }
146 Err(e) => {
147 return Err(anyhow::anyhow!("WebSocket error: {}", e));
148 }
149 _ => {
150 // Ignore other message types (binary, ping, pong)
151 }
152 }
153 }
154
155 Ok(())
156 }
157
158 async fn handle_message(
159 &mut self,
160 message: &str,
161 message_tx: &mpsc::UnboundedSender<TuiMessage>,
162 ) -> Result<()> {
163 // First, check if it's even a commit event using basic JSON parsing
164 let event_value: serde_json::Value = serde_json::from_str(message)?;
165
166 // Only process commit events
167 if event_value.get("kind").and_then(|k| k.as_str()) != Some("commit") {
168 return Ok(());
169 }
170
171 // Check if it has a commit with the right collection
172 let commit = event_value.get("commit");
173 if let Some(commit_obj) = commit {
174 if commit_obj.get("collection").and_then(|c| c.as_str()) != Some("stream.thought.blip") {
175 return Ok(());
176 }
177
178 // Skip delete operations
179 if commit_obj.get("operation").and_then(|o| o.as_str()) == Some("delete") {
180 return Ok(());
181 }
182 } else {
183 return Ok(());
184 }
185
186 // Now try to parse as our structured event
187 let event: JetstreamEvent = serde_json::from_str(message)?;
188 let commit = event.commit.as_ref().unwrap(); // Safe because we checked above
189
190 // Skip messages from our own DID
191 if let Some(ref own_did) = self.own_did {
192 if &event.did == own_did {
193 return Ok(());
194 }
195 }
196
197 // Parse the blip record
198 let record_data = commit.record.as_ref();
199 if record_data.is_none() {
200 return Ok(());
201 }
202
203 let blip_record: BlipRecord = match serde_json::from_value(record_data.unwrap().clone()) {
204 Ok(record) => record,
205 Err(_) => return Ok(()), // Silently skip unparseable records
206 };
207
208 // Get or resolve the handle
209 let handle = self.resolve_did(&event.did).await;
210
211 // Create TUI message
212 let tui_message = TuiMessage::new(
213 handle,
214 blip_record.content,
215 false, // Not our own message
216 );
217
218 // Send to TUI
219 if let Err(_) = message_tx.send(tui_message) {
220 // Channel closed, probably shutting down
221 return Err(anyhow::anyhow!("Message channel closed"));
222 }
223
224 Ok(())
225 }
226
227 async fn resolve_did(&mut self, did: &str) -> String {
228 // Check cache first
229 if let Some(handle) = self.did_cache.get(did) {
230 return handle.clone();
231 }
232
233 // Try to resolve the DID to a handle
234 let handle = match self.fetch_handle_for_did(did).await {
235 Ok(h) => h,
236 Err(_) => {
237 // Fallback to showing just the DID (truncated)
238 if did.len() > 20 {
239 format!("{}...", &did[..20])
240 } else {
241 did.to_string()
242 }
243 }
244 };
245
246 // Cache the result
247 self.did_cache.insert(did.to_string(), handle.clone());
248
249 handle
250 }
251
252 async fn fetch_handle_for_did(&self, did: &str) -> Result<String> {
253 // Use the ATProto API to resolve DID to handle
254 let client = reqwest::Client::new();
255 let url = format!("https://public.api.bsky.app/xrpc/com.atproto.identity.resolveHandle?did={}", did);
256
257 #[derive(Deserialize)]
258 struct ResolveResponse {
259 handle: String,
260 }
261
262 // Try a simpler approach - resolve via profile
263 let profile_url = format!("https://public.api.bsky.app/xrpc/app.bsky.actor.getProfile?actor={}", did);
264
265 #[derive(Deserialize)]
266 struct ProfileResponse {
267 handle: String,
268 }
269
270 let response = client
271 .get(&profile_url)
272 .send()
273 .await?;
274
275 if response.status().is_success() {
276 let profile: ProfileResponse = response.json().await?;
277 Ok(profile.handle)
278 } else {
279 Err(anyhow::anyhow!("Failed to resolve DID to handle"))
280 }
281 }
282}
283
284pub async fn start_jetstream_listener(message_tx: mpsc::UnboundedSender<TuiMessage>, own_did: Option<String>) -> Result<()> {
285 let mut client = JetstreamClient::new(own_did);
286 client.connect_and_listen(message_tx).await
287}