A Rust CLI for publishing thought records. Designed to work with thought.stream.

adding sync for multiple terminal using global timestamps (copy of #1 because resubmitting did not work) #2

open opened by rubberducky.guru targeting main from rubberducky.guru/thought-stream-cli: main

Added sync functionality if you have several terminals open. Not much on it's own, but it also makes the timestamp of each messages use the created_at property from the blip record instead of just taking whatever the current time is before displaying it.

Labels

None yet.

Participants 1
AT URI
at://did:plc:rcuvobqkuogi6meqto3hf6jl/sh.tangled.repo.pull/3lynn3lgxtn22
+129 -82
Diff #0
+20 -15
src/client.rs
··· 1 1 use anyhow::{Context, Result}; 2 2 use chrono::Utc; 3 - use reqwest::{Client as HttpClient, header::{HeaderMap, HeaderValue, AUTHORIZATION}}; 3 + use reqwest::{ 4 + header::{HeaderMap, HeaderValue, AUTHORIZATION}, 5 + Client as HttpClient, 6 + }; 4 7 use serde::{Deserialize, Serialize}; 5 8 use serde_json::Value; 6 9 ··· 62 65 63 66 pub async fn login(&mut self, credentials: &Credentials) -> Result<()> { 64 67 let login_url = format!("{}/xrpc/com.atproto.server.createSession", self.base_url); 65 - 68 + 66 69 let request = LoginRequest { 67 70 identifier: credentials.username.clone(), 68 71 password: credentials.password.clone(), 69 72 }; 70 73 71 - let response = self.http_client 74 + let response = self 75 + .http_client 72 76 .post(&login_url) 73 77 .header("Content-Type", "application/json") 74 78 .json(&request) ··· 93 97 } 94 98 95 99 pub async fn publish_blip(&self, content: &str) -> Result<String> { 96 - let session = self.session.as_ref() 100 + let session = self 101 + .session 102 + .as_ref() 97 103 .context("Not authenticated. Please run 'thought login' first.")?; 98 104 105 + let timestamp = Utc::now().to_rfc3339().replace("+00:00", "Z"); 106 + 99 107 let record = BlipRecord { 100 108 record_type: "stream.thought.blip".to_string(), 101 109 content: content.to_string(), 102 - created_at: Utc::now().to_rfc3339().replace("+00:00", "Z"), 110 + created_at: timestamp.clone(), 103 111 }; 104 112 105 113 let request = CreateRecordRequest { 106 114 repo: session.did.clone(), 107 115 collection: "stream.thought.blip".to_string(), 108 - record: serde_json::to_value(&record) 109 - .context("Failed to serialize blip record")?, 116 + record: serde_json::to_value(&record).context("Failed to serialize blip record")?, 110 117 }; 111 118 112 119 let create_url = format!("{}/xrpc/com.atproto.repo.createRecord", self.base_url); 113 - 120 + 114 121 let mut headers = HeaderMap::new(); 115 122 headers.insert( 116 123 AUTHORIZATION, 117 124 HeaderValue::from_str(&format!("Bearer {}", session.access_jwt)) 118 125 .context("Invalid authorization header")?, 119 126 ); 120 - headers.insert( 121 - "Content-Type", 122 - HeaderValue::from_static("application/json"), 123 - ); 127 + headers.insert("Content-Type", HeaderValue::from_static("application/json")); 124 128 125 - let response = self.http_client 129 + let response = self 130 + .http_client 126 131 .post(&create_url) 127 132 .headers(headers) 128 133 .json(&request) ··· 141 146 .await 142 147 .context("Failed to parse create record response")?; 143 148 144 - Ok(create_response.uri) 149 + Ok(timestamp) 145 150 } 146 151 147 152 pub fn is_authenticated(&self) -> bool { ··· 151 156 pub fn get_user_did(&self) -> Option<String> { 152 157 self.session.as_ref().map(|s| s.did.clone()) 153 158 } 154 - } 159 + }
+60 -49
src/jetstream.rs
··· 1 1 use anyhow::{Context, Result}; 2 + use chrono::{DateTime, Utc}; 2 3 use futures_util::StreamExt; 3 4 use serde::{Deserialize, Serialize}; 4 5 use std::{collections::HashMap, time::Duration}; 5 6 use tokio::sync::mpsc; 6 7 use tokio_tungstenite::{ 7 8 connect_async, 8 - tungstenite::{ 9 - client::IntoClientRequest, 10 - http::HeaderValue, 11 - Message, 12 - }, 9 + tungstenite::{client::IntoClientRequest, http::HeaderValue, Message}, 13 10 }; 14 11 use url::Url; 15 12 ··· 46 43 47 44 pub struct JetstreamClient { 48 45 did_cache: HashMap<String, String>, // DID -> handle cache 49 - own_did: Option<String>, // User's own DID to filter out 46 + own_did: Option<String>, // User's own DID to filter out 50 47 } 51 48 52 49 impl JetstreamClient { ··· 57 54 } 58 55 } 59 56 60 - pub async fn connect_and_listen(&mut self, message_tx: mpsc::UnboundedSender<TuiMessage>) -> Result<()> { 57 + pub async fn connect_and_listen( 58 + &mut self, 59 + message_tx: mpsc::UnboundedSender<TuiMessage>, 60 + ) -> Result<()> { 61 61 // Try simple connection first, then with collection filter 62 62 let urls = vec![ 63 63 "wss://jetstream2.us-west.bsky.network/subscribe", 64 - "wss://jetstream2.us-west.bsky.network/subscribe?wantedCollections=stream.thought.blip" 64 + "wss://jetstream2.us-west.bsky.network/subscribe?wantedCollections=stream.thought.blip", 65 65 ]; 66 - 66 + 67 67 for (i, jetstream_url) in urls.iter().enumerate() { 68 68 // Send status to TUI instead of console 69 69 let status_msg = crate::tui::Message::new( 70 70 "system".to_string(), 71 71 format!("Trying connection {} of {}", i + 1, urls.len()), 72 72 false, 73 + None, 73 74 ); 74 75 let _ = message_tx.send(status_msg); 75 - 76 + 76 77 loop { 77 - match self.try_connect_and_listen(&message_tx, jetstream_url).await { 78 + match self 79 + .try_connect_and_listen(&message_tx, jetstream_url) 80 + .await 81 + { 78 82 Ok(_) => { 79 83 return Ok(()); 80 84 } ··· 85 89 "system".to_string(), 86 90 "Connection failed, retrying in 5s...".to_string(), 87 91 false, 92 + None, 88 93 ); 89 94 let _ = message_tx.send(retry_msg); 90 95 tokio::time::sleep(Duration::from_secs(5)).await; ··· 96 101 } 97 102 } 98 103 } 99 - 104 + 100 105 Ok(()) 101 106 } 102 107 ··· 108 113 // Parse URL and create request with headers 109 114 let url = Url::parse(url_str)?; 110 115 let mut request = url.into_client_request()?; 111 - 116 + 112 117 // Add User-Agent header 113 - request.headers_mut().insert( 114 - "User-Agent", 115 - HeaderValue::from_static("think-cli/0.1.0") 116 - ); 117 - 118 + request 119 + .headers_mut() 120 + .insert("User-Agent", HeaderValue::from_static("think-cli/0.1.0")); 121 + 118 122 // Connect with timeout 119 123 let connect_future = connect_async(request); 120 - let (ws_stream, _response) = tokio::time::timeout( 121 - Duration::from_secs(10), 122 - connect_future 123 - ).await 124 + let (ws_stream, _response) = tokio::time::timeout(Duration::from_secs(10), connect_future) 125 + .await 124 126 .context("Connection timeout")? 125 127 .context("Failed to connect to jetstream")?; 126 128 ··· 129 131 "system".to_string(), 130 132 "Connected to jetstream! Listening for blips...".to_string(), 131 133 false, 134 + None, 132 135 ); 133 136 let _ = message_tx.send(success_msg); 134 137 ··· 162 165 ) -> Result<()> { 163 166 // First, check if it's even a commit event using basic JSON parsing 164 167 let event_value: serde_json::Value = serde_json::from_str(message)?; 165 - 168 + 166 169 // Only process commit events 167 170 if event_value.get("kind").and_then(|k| k.as_str()) != Some("commit") { 168 171 return Ok(()); 169 172 } 170 - 173 + 171 174 // Check if it has a commit with the right collection 172 175 let commit = event_value.get("commit"); 173 176 if let Some(commit_obj) = commit { 174 - if commit_obj.get("collection").and_then(|c| c.as_str()) != Some("stream.thought.blip") { 177 + if commit_obj.get("collection").and_then(|c| c.as_str()) != Some("stream.thought.blip") 178 + { 175 179 return Ok(()); 176 180 } 177 - 181 + 178 182 // Skip delete operations 179 183 if commit_obj.get("operation").and_then(|o| o.as_str()) == Some("delete") { 180 184 return Ok(()); ··· 184 188 185 189 186 190 187 - 191 + let event: JetstreamEvent = serde_json::from_str(message)?; 188 192 let commit = event.commit.as_ref().unwrap(); // Safe because we checked above 189 193 190 - // Skip messages from our own DID 191 - if let Some(ref own_did) = self.own_did { 192 - if &event.did == own_did { 193 - return Ok(()); 194 - } 195 - } 196 - 197 194 // Parse the blip record 198 195 let record_data = commit.record.as_ref(); 196 + if record_data.is_none() { 199 197 200 198 201 199 202 200 203 201 204 - 205 - 202 + Err(_) => return Ok(()), // Silently skip unparseable records 206 203 }; 207 204 208 - // Get or resolve the handle 209 - let handle = self.resolve_did(&event.did).await; 205 + let is_own = self.own_did.as_ref().is_some_and(|own| own == &event.did); 206 + let handle = if is_own { 207 + "you".into() 208 + } else { 209 + // Get or resolve the handle 210 + self.resolve_did(&event.did).await 211 + }; 210 212 211 213 // Create TUI message 212 214 let tui_message = TuiMessage::new( 213 215 handle, 214 216 blip_record.content, 215 - false, // Not our own message 217 + is_own, 218 + DateTime::parse_from_rfc3339(&blip_record.created_at) 219 + .map(|dt| dt.with_timezone(&Utc)) 220 + .ok(), // Parse RFC3339 โ†’ UTC, None if invalid (so current timestamp instead) 216 221 ); 217 222 218 223 // Send to TUI ··· 252 257 async fn fetch_handle_for_did(&self, did: &str) -> Result<String> { 253 258 // Use the ATProto API to resolve DID to handle 254 259 let client = reqwest::Client::new(); 255 - let url = format!("https://public.api.bsky.app/xrpc/com.atproto.identity.resolveHandle?did={}", did); 256 - 260 + let url = format!( 261 + "https://public.api.bsky.app/xrpc/com.atproto.identity.resolveHandle?did={}", 262 + did 263 + ); 264 + 257 265 #[derive(Deserialize)] 258 266 struct ResolveResponse { 259 267 handle: String, 260 268 } 261 269 262 270 // Try a simpler approach - resolve via profile 263 - let profile_url = format!("https://public.api.bsky.app/xrpc/app.bsky.actor.getProfile?actor={}", did); 264 - 271 + let profile_url = format!( 272 + "https://public.api.bsky.app/xrpc/app.bsky.actor.getProfile?actor={}", 273 + did 274 + ); 275 + 265 276 #[derive(Deserialize)] 266 277 struct ProfileResponse { 267 278 handle: String, 268 279 } 269 280 270 - let response = client 271 - .get(&profile_url) 272 - .send() 273 - .await?; 281 + let response = client.get(&profile_url).send().await?; 274 282 275 283 if response.status().is_success() { 276 284 let profile: ProfileResponse = response.json().await?; ··· 281 289 } 282 290 } 283 291 284 - pub async fn start_jetstream_listener(message_tx: mpsc::UnboundedSender<TuiMessage>, own_did: Option<String>) -> Result<()> { 292 + pub async fn start_jetstream_listener( 293 + message_tx: mpsc::UnboundedSender<TuiMessage>, 294 + own_did: Option<String>, 295 + ) -> Result<()> { 285 296 let mut client = JetstreamClient::new(own_did); 286 297 client.connect_and_listen(message_tx).await 287 - } 298 + }
+49 -18
src/tui.rs
··· 30 30 } 31 31 32 32 impl Message { 33 - pub fn new(handle: String, content: String, is_own: bool) -> Self { 33 + pub fn new( 34 + handle: String, 35 + content: String, 36 + is_own: bool, 37 + timestamp: Option<DateTime<Utc>>, 38 + ) -> Self { 34 39 Self { 35 40 handle, 36 41 content, 37 - timestamp: Utc::now(), 42 + timestamp: timestamp.unwrap_or_else(Utc::now), 38 43 is_own, 39 44 } 40 45 } ··· 71 76 pub fn add_message(&mut self, message: Message) { 72 77 self.messages.push(message); 73 78 self.message_count += 1; 74 - 79 + 75 80 // Keep only last 1000 messages 76 81 if self.messages.len() > 1000 { 77 82 self.messages.remove(0); 78 83 } 79 - 84 + 80 85 // Auto-scroll to bottom unless user is scrolling up 81 86 if self.scroll_offset == 0 { 82 87 self.scroll_offset = 0; // Stay at bottom ··· 139 144 let vertical = Layout::default() 140 145 .direction(Direction::Vertical) 141 146 .constraints([ 142 - Constraint::Min(0), // Messages area 143 - Constraint::Length(3), // Status area 144 - Constraint::Length(3), // Input area 147 + Constraint::Min(0), // Messages area 148 + Constraint::Length(3), // Status area 149 + Constraint::Length(3), // Input area 145 150 ]) 146 151 .split(frame.area()); 147 152 148 153 // Render messages 149 154 let mut message_lines = Vec::new(); 150 - 155 + 151 156 // Convert messages to styled lines in reverse chronological order (newest first) 152 157 for msg in self.messages.iter().rev() { 153 158 let style = if msg.is_own { 154 - Style::default().fg(Color::Green).add_modifier(Modifier::BOLD) 159 + Style::default() 160 + .fg(Color::Green) 161 + .add_modifier(Modifier::BOLD) 155 162 } else { 156 163 Style::default().fg(Color::White) 157 164 }; 158 - 165 + 159 166 message_lines.push(Line::from(Span::styled(msg.format_display(), style))); 160 167 } 161 - 168 + 162 169 let messages_text = Text::from(message_lines); 163 170 let messages_paragraph = Paragraph::new(messages_text) 164 171 .block(Block::default().borders(Borders::ALL).title("Messages")) ··· 172 179 } else { 173 180 Style::default().fg(Color::Yellow) 174 181 }; 175 - 182 + 176 183 let status_paragraph = Paragraph::new(self.status.clone()) 177 184 .style(status_style) 178 185 .block(Block::default().borders(Borders::ALL).title("Status")); 179 186 frame.render_widget(status_paragraph, vertical[1]); 180 187 181 188 // Render input 182 - let input_paragraph = Paragraph::new(self.input.clone()) 183 - .block(Block::default().borders(Borders::ALL).title("Input (Esc to quit)")); 189 + let input_paragraph = Paragraph::new(self.input.clone()).block( 190 + Block::default() 191 + .borders(Borders::ALL) 192 + .title("Input (Esc to quit)"), 193 + ); 184 194 frame.render_widget(input_paragraph, vertical[2]); 185 195 } 186 196 } ··· 197 207 let mut terminal = Terminal::new(backend)?; 198 208 199 209 let mut app = TuiApp::new(); 200 - 210 + 201 211 // Add welcome message 202 212 app.add_message(Message::new( 203 213 "system".to_string(), 204 214 "Welcome to Think TUI! Connecting to jetstream...".to_string(), 205 215 false, 216 + None, 206 217 )); 207 218 208 219 let result = run_tui_loop(&mut terminal, &mut app, client, &mut message_rx).await; ··· 234 245 if let Event::Key(key) = event::read()? { 235 246 if key.kind == KeyEventKind::Press { 236 247 // Handle Ctrl+C 237 - if matches!(key.code, KeyCode::Char('c')) && key.modifiers.contains(crossterm::event::KeyModifiers::CONTROL) { 248 + if matches!(key.code, KeyCode::Char('c')) 249 + && key 250 + .modifiers 251 + .contains(crossterm::event::KeyModifiers::CONTROL) 252 + { 238 253 break; 239 254 } 240 255 ··· 242 257 if let Some(message) = app.handle_input(key.code) { 243 258 // Publish the message 244 259 match client.publish_blip(&message).await { 245 - Ok(_) => { 260 + Ok(t) => { 246 261 // Add our own message to the display 247 262 app.add_message(Message::new( 248 263 "you".to_string(), 249 264 message, 250 265 true, 266 + DateTime::parse_from_rfc3339(&t) 267 + .map(|dt| dt.with_timezone(&Utc)) 268 + .ok(), // Parse RFC3339 โ†’ UTC, None if invalid (so current timestamp instead) 251 269 )); 252 270 } 253 271 Err(e) => { ··· 256 274 "error".to_string(), 257 275 format!("Failed to publish: {}", e), 258 276 false, 277 + None, 259 278 )); 260 279 } 261 280 } ··· 266 285 267 286 // Check for new messages from jetstream 268 287 while let Ok(message) = message_rx.try_recv() { 288 + // find most recent is_own message and see if it's already there (you posted it) 289 + let duplicate = app 290 + .messages 291 + .iter() 292 + .rev() 293 + .find(|m| m.is_own) 294 + .is_some_and(|m| m.timestamp == message.timestamp); 295 + 296 + if duplicate { 297 + continue; 298 + } 299 + 269 300 app.add_message(message); 270 301 app.set_connection_status(true); 271 302 } ··· 277 308 } 278 309 279 310 Ok(()) 280 - } 311 + }

History

1 round 0 comments
sign up or login to add to the discussion
rubberducky.guru submitted #0
3 commits
expand
added multiple terminal sync using global timestamps
cleanup
removed use of unwrap and unused PartialEq
merge conflicts detected
expand
  • src/client.rs:1
  • src/tui.rs:242
  • src/tui.rs:294
  • src/tui.rs:21
expand 0 comments