Added sync functionality if you have several terminals open. Not much on it's own, but it also makes the timestamp of each messages use the created_at property from the blip record instead of just taking whatever the current time is before displaying it.
+20
-15
src/client.rs
+20
-15
src/client.rs
···
1
1
use anyhow::{Context, Result};
2
2
use chrono::Utc;
3
-
use reqwest::{Client as HttpClient, header::{HeaderMap, HeaderValue, AUTHORIZATION}};
3
+
use reqwest::{
4
+
header::{HeaderMap, HeaderValue, AUTHORIZATION},
5
+
Client as HttpClient,
6
+
};
4
7
use serde::{Deserialize, Serialize};
5
8
use serde_json::Value;
6
9
···
62
65
63
66
pub async fn login(&mut self, credentials: &Credentials) -> Result<()> {
64
67
let login_url = format!("{}/xrpc/com.atproto.server.createSession", self.base_url);
65
-
68
+
66
69
let request = LoginRequest {
67
70
identifier: credentials.username.clone(),
68
71
password: credentials.password.clone(),
69
72
};
70
73
71
-
let response = self.http_client
74
+
let response = self
75
+
.http_client
72
76
.post(&login_url)
73
77
.header("Content-Type", "application/json")
74
78
.json(&request)
···
93
97
}
94
98
95
99
pub async fn publish_blip(&self, content: &str) -> Result<String> {
96
-
let session = self.session.as_ref()
100
+
let session = self
101
+
.session
102
+
.as_ref()
97
103
.context("Not authenticated. Please run 'thought login' first.")?;
98
104
105
+
let timestamp = Utc::now().to_rfc3339().replace("+00:00", "Z");
106
+
99
107
let record = BlipRecord {
100
108
record_type: "stream.thought.blip".to_string(),
101
109
content: content.to_string(),
102
-
created_at: Utc::now().to_rfc3339().replace("+00:00", "Z"),
110
+
created_at: timestamp.clone(),
103
111
};
104
112
105
113
let request = CreateRecordRequest {
106
114
repo: session.did.clone(),
107
115
collection: "stream.thought.blip".to_string(),
108
-
record: serde_json::to_value(&record)
109
-
.context("Failed to serialize blip record")?,
116
+
record: serde_json::to_value(&record).context("Failed to serialize blip record")?,
110
117
};
111
118
112
119
let create_url = format!("{}/xrpc/com.atproto.repo.createRecord", self.base_url);
113
-
120
+
114
121
let mut headers = HeaderMap::new();
115
122
headers.insert(
116
123
AUTHORIZATION,
117
124
HeaderValue::from_str(&format!("Bearer {}", session.access_jwt))
118
125
.context("Invalid authorization header")?,
119
126
);
120
-
headers.insert(
121
-
"Content-Type",
122
-
HeaderValue::from_static("application/json"),
123
-
);
127
+
headers.insert("Content-Type", HeaderValue::from_static("application/json"));
124
128
125
-
let response = self.http_client
129
+
let response = self
130
+
.http_client
126
131
.post(&create_url)
127
132
.headers(headers)
128
133
.json(&request)
···
141
146
.await
142
147
.context("Failed to parse create record response")?;
143
148
144
-
Ok(create_response.uri)
149
+
Ok(timestamp)
145
150
}
146
151
147
152
pub fn is_authenticated(&self) -> bool {
···
151
156
pub fn get_user_did(&self) -> Option<String> {
152
157
self.session.as_ref().map(|s| s.did.clone())
153
158
}
154
-
}
159
+
}
+60
-49
src/jetstream.rs
+60
-49
src/jetstream.rs
···
1
1
use anyhow::{Context, Result};
2
+
use chrono::{DateTime, Utc};
2
3
use futures_util::StreamExt;
3
4
use serde::{Deserialize, Serialize};
4
5
use std::{collections::HashMap, time::Duration};
5
6
use tokio::sync::mpsc;
6
7
use tokio_tungstenite::{
7
8
connect_async,
8
-
tungstenite::{
9
-
client::IntoClientRequest,
10
-
http::HeaderValue,
11
-
Message,
12
-
},
9
+
tungstenite::{client::IntoClientRequest, http::HeaderValue, Message},
13
10
};
14
11
use url::Url;
15
12
···
46
43
47
44
pub struct JetstreamClient {
48
45
did_cache: HashMap<String, String>, // DID -> handle cache
49
-
own_did: Option<String>, // User's own DID to filter out
46
+
own_did: Option<String>, // User's own DID to filter out
50
47
}
51
48
52
49
impl JetstreamClient {
···
57
54
}
58
55
}
59
56
60
-
pub async fn connect_and_listen(&mut self, message_tx: mpsc::UnboundedSender<TuiMessage>) -> Result<()> {
57
+
pub async fn connect_and_listen(
58
+
&mut self,
59
+
message_tx: mpsc::UnboundedSender<TuiMessage>,
60
+
) -> Result<()> {
61
61
// Try simple connection first, then with collection filter
62
62
let urls = vec![
63
63
"wss://jetstream2.us-west.bsky.network/subscribe",
64
-
"wss://jetstream2.us-west.bsky.network/subscribe?wantedCollections=stream.thought.blip"
64
+
"wss://jetstream2.us-west.bsky.network/subscribe?wantedCollections=stream.thought.blip",
65
65
];
66
-
66
+
67
67
for (i, jetstream_url) in urls.iter().enumerate() {
68
68
// Send status to TUI instead of console
69
69
let status_msg = crate::tui::Message::new(
70
70
"system".to_string(),
71
71
format!("Trying connection {} of {}", i + 1, urls.len()),
72
72
false,
73
+
None,
73
74
);
74
75
let _ = message_tx.send(status_msg);
75
-
76
+
76
77
loop {
77
-
match self.try_connect_and_listen(&message_tx, jetstream_url).await {
78
+
match self
79
+
.try_connect_and_listen(&message_tx, jetstream_url)
80
+
.await
81
+
{
78
82
Ok(_) => {
79
83
return Ok(());
80
84
}
···
85
89
"system".to_string(),
86
90
"Connection failed, retrying in 5s...".to_string(),
87
91
false,
92
+
None,
88
93
);
89
94
let _ = message_tx.send(retry_msg);
90
95
tokio::time::sleep(Duration::from_secs(5)).await;
···
96
101
}
97
102
}
98
103
}
99
-
104
+
100
105
Ok(())
101
106
}
102
107
···
108
113
// Parse URL and create request with headers
109
114
let url = Url::parse(url_str)?;
110
115
let mut request = url.into_client_request()?;
111
-
116
+
112
117
// Add User-Agent header
113
-
request.headers_mut().insert(
114
-
"User-Agent",
115
-
HeaderValue::from_static("think-cli/0.1.0")
116
-
);
117
-
118
+
request
119
+
.headers_mut()
120
+
.insert("User-Agent", HeaderValue::from_static("think-cli/0.1.0"));
121
+
118
122
// Connect with timeout
119
123
let connect_future = connect_async(request);
120
-
let (ws_stream, _response) = tokio::time::timeout(
121
-
Duration::from_secs(10),
122
-
connect_future
123
-
).await
124
+
let (ws_stream, _response) = tokio::time::timeout(Duration::from_secs(10), connect_future)
125
+
.await
124
126
.context("Connection timeout")?
125
127
.context("Failed to connect to jetstream")?;
126
128
···
129
131
"system".to_string(),
130
132
"Connected to jetstream! Listening for blips...".to_string(),
131
133
false,
134
+
None,
132
135
);
133
136
let _ = message_tx.send(success_msg);
134
137
···
162
165
) -> Result<()> {
163
166
// First, check if it's even a commit event using basic JSON parsing
164
167
let event_value: serde_json::Value = serde_json::from_str(message)?;
165
-
168
+
166
169
// Only process commit events
167
170
if event_value.get("kind").and_then(|k| k.as_str()) != Some("commit") {
168
171
return Ok(());
169
172
}
170
-
173
+
171
174
// Check if it has a commit with the right collection
172
175
let commit = event_value.get("commit");
173
176
if let Some(commit_obj) = commit {
174
-
if commit_obj.get("collection").and_then(|c| c.as_str()) != Some("stream.thought.blip") {
177
+
if commit_obj.get("collection").and_then(|c| c.as_str()) != Some("stream.thought.blip")
178
+
{
175
179
return Ok(());
176
180
}
177
-
181
+
178
182
// Skip delete operations
179
183
if commit_obj.get("operation").and_then(|o| o.as_str()) == Some("delete") {
180
184
return Ok(());
···
184
188
185
189
186
190
187
-
191
+
let event: JetstreamEvent = serde_json::from_str(message)?;
188
192
let commit = event.commit.as_ref().unwrap(); // Safe because we checked above
189
193
190
-
// Skip messages from our own DID
191
-
if let Some(ref own_did) = self.own_did {
192
-
if &event.did == own_did {
193
-
return Ok(());
194
-
}
195
-
}
196
-
197
194
// Parse the blip record
198
195
let record_data = commit.record.as_ref();
196
+
if record_data.is_none() {
199
197
200
198
201
199
202
200
203
201
204
-
205
-
202
+
Err(_) => return Ok(()), // Silently skip unparseable records
206
203
};
207
204
208
-
// Get or resolve the handle
209
-
let handle = self.resolve_did(&event.did).await;
205
+
let is_own = self.own_did.as_ref().is_some_and(|own| own == &event.did);
206
+
let handle = if is_own {
207
+
"you".into()
208
+
} else {
209
+
// Get or resolve the handle
210
+
self.resolve_did(&event.did).await
211
+
};
210
212
211
213
// Create TUI message
212
214
let tui_message = TuiMessage::new(
213
215
handle,
214
216
blip_record.content,
215
-
false, // Not our own message
217
+
is_own,
218
+
DateTime::parse_from_rfc3339(&blip_record.created_at)
219
+
.map(|dt| dt.with_timezone(&Utc))
220
+
.ok(), // Parse RFC3339 โ UTC, None if invalid (so current timestamp instead)
216
221
);
217
222
218
223
// Send to TUI
···
252
257
async fn fetch_handle_for_did(&self, did: &str) -> Result<String> {
253
258
// Use the ATProto API to resolve DID to handle
254
259
let client = reqwest::Client::new();
255
-
let url = format!("https://public.api.bsky.app/xrpc/com.atproto.identity.resolveHandle?did={}", did);
256
-
260
+
let url = format!(
261
+
"https://public.api.bsky.app/xrpc/com.atproto.identity.resolveHandle?did={}",
262
+
did
263
+
);
264
+
257
265
#[derive(Deserialize)]
258
266
struct ResolveResponse {
259
267
handle: String,
260
268
}
261
269
262
270
// Try a simpler approach - resolve via profile
263
-
let profile_url = format!("https://public.api.bsky.app/xrpc/app.bsky.actor.getProfile?actor={}", did);
264
-
271
+
let profile_url = format!(
272
+
"https://public.api.bsky.app/xrpc/app.bsky.actor.getProfile?actor={}",
273
+
did
274
+
);
275
+
265
276
#[derive(Deserialize)]
266
277
struct ProfileResponse {
267
278
handle: String,
268
279
}
269
280
270
-
let response = client
271
-
.get(&profile_url)
272
-
.send()
273
-
.await?;
281
+
let response = client.get(&profile_url).send().await?;
274
282
275
283
if response.status().is_success() {
276
284
let profile: ProfileResponse = response.json().await?;
···
281
289
}
282
290
}
283
291
284
-
pub async fn start_jetstream_listener(message_tx: mpsc::UnboundedSender<TuiMessage>, own_did: Option<String>) -> Result<()> {
292
+
pub async fn start_jetstream_listener(
293
+
message_tx: mpsc::UnboundedSender<TuiMessage>,
294
+
own_did: Option<String>,
295
+
) -> Result<()> {
285
296
let mut client = JetstreamClient::new(own_did);
286
297
client.connect_and_listen(message_tx).await
287
-
}
298
+
}
+49
-18
src/tui.rs
+49
-18
src/tui.rs
···
30
30
}
31
31
32
32
impl Message {
33
-
pub fn new(handle: String, content: String, is_own: bool) -> Self {
33
+
pub fn new(
34
+
handle: String,
35
+
content: String,
36
+
is_own: bool,
37
+
timestamp: Option<DateTime<Utc>>,
38
+
) -> Self {
34
39
Self {
35
40
handle,
36
41
content,
37
-
timestamp: Utc::now(),
42
+
timestamp: timestamp.unwrap_or_else(Utc::now),
38
43
is_own,
39
44
}
40
45
}
···
71
76
pub fn add_message(&mut self, message: Message) {
72
77
self.messages.push(message);
73
78
self.message_count += 1;
74
-
79
+
75
80
// Keep only last 1000 messages
76
81
if self.messages.len() > 1000 {
77
82
self.messages.remove(0);
78
83
}
79
-
84
+
80
85
// Auto-scroll to bottom unless user is scrolling up
81
86
if self.scroll_offset == 0 {
82
87
self.scroll_offset = 0; // Stay at bottom
···
139
144
let vertical = Layout::default()
140
145
.direction(Direction::Vertical)
141
146
.constraints([
142
-
Constraint::Min(0), // Messages area
143
-
Constraint::Length(3), // Status area
144
-
Constraint::Length(3), // Input area
147
+
Constraint::Min(0), // Messages area
148
+
Constraint::Length(3), // Status area
149
+
Constraint::Length(3), // Input area
145
150
])
146
151
.split(frame.area());
147
152
148
153
// Render messages
149
154
let mut message_lines = Vec::new();
150
-
155
+
151
156
// Convert messages to styled lines in reverse chronological order (newest first)
152
157
for msg in self.messages.iter().rev() {
153
158
let style = if msg.is_own {
154
-
Style::default().fg(Color::Green).add_modifier(Modifier::BOLD)
159
+
Style::default()
160
+
.fg(Color::Green)
161
+
.add_modifier(Modifier::BOLD)
155
162
} else {
156
163
Style::default().fg(Color::White)
157
164
};
158
-
165
+
159
166
message_lines.push(Line::from(Span::styled(msg.format_display(), style)));
160
167
}
161
-
168
+
162
169
let messages_text = Text::from(message_lines);
163
170
let messages_paragraph = Paragraph::new(messages_text)
164
171
.block(Block::default().borders(Borders::ALL).title("Messages"))
···
172
179
} else {
173
180
Style::default().fg(Color::Yellow)
174
181
};
175
-
182
+
176
183
let status_paragraph = Paragraph::new(self.status.clone())
177
184
.style(status_style)
178
185
.block(Block::default().borders(Borders::ALL).title("Status"));
179
186
frame.render_widget(status_paragraph, vertical[1]);
180
187
181
188
// Render input
182
-
let input_paragraph = Paragraph::new(self.input.clone())
183
-
.block(Block::default().borders(Borders::ALL).title("Input (Esc to quit)"));
189
+
let input_paragraph = Paragraph::new(self.input.clone()).block(
190
+
Block::default()
191
+
.borders(Borders::ALL)
192
+
.title("Input (Esc to quit)"),
193
+
);
184
194
frame.render_widget(input_paragraph, vertical[2]);
185
195
}
186
196
}
···
197
207
let mut terminal = Terminal::new(backend)?;
198
208
199
209
let mut app = TuiApp::new();
200
-
210
+
201
211
// Add welcome message
202
212
app.add_message(Message::new(
203
213
"system".to_string(),
204
214
"Welcome to Think TUI! Connecting to jetstream...".to_string(),
205
215
false,
216
+
None,
206
217
));
207
218
208
219
let result = run_tui_loop(&mut terminal, &mut app, client, &mut message_rx).await;
···
234
245
if let Event::Key(key) = event::read()? {
235
246
if key.kind == KeyEventKind::Press {
236
247
// Handle Ctrl+C
237
-
if matches!(key.code, KeyCode::Char('c')) && key.modifiers.contains(crossterm::event::KeyModifiers::CONTROL) {
248
+
if matches!(key.code, KeyCode::Char('c'))
249
+
&& key
250
+
.modifiers
251
+
.contains(crossterm::event::KeyModifiers::CONTROL)
252
+
{
238
253
break;
239
254
}
240
255
···
242
257
if let Some(message) = app.handle_input(key.code) {
243
258
// Publish the message
244
259
match client.publish_blip(&message).await {
245
-
Ok(_) => {
260
+
Ok(t) => {
246
261
// Add our own message to the display
247
262
app.add_message(Message::new(
248
263
"you".to_string(),
249
264
message,
250
265
true,
266
+
DateTime::parse_from_rfc3339(&t)
267
+
.map(|dt| dt.with_timezone(&Utc))
268
+
.ok(), // Parse RFC3339 โ UTC, None if invalid (so current timestamp instead)
251
269
));
252
270
}
253
271
Err(e) => {
···
256
274
"error".to_string(),
257
275
format!("Failed to publish: {}", e),
258
276
false,
277
+
None,
259
278
));
260
279
}
261
280
}
···
266
285
267
286
// Check for new messages from jetstream
268
287
while let Ok(message) = message_rx.try_recv() {
288
+
// find most recent is_own message and see if it's already there (you posted it)
289
+
let duplicate = app
290
+
.messages
291
+
.iter()
292
+
.rev()
293
+
.find(|m| m.is_own)
294
+
.is_some_and(|m| m.timestamp == message.timestamp);
295
+
296
+
if duplicate {
297
+
continue;
298
+
}
299
+
269
300
app.add_message(message);
270
301
app.set_connection_status(true);
271
302
}
···
277
308
}
278
309
279
310
Ok(())
280
-
}
311
+
}
History
1 round
0 comments
rubberducky.guru
submitted
#0
3 commits
expand
collapse
added multiple terminal sync using global timestamps
cleanup
removed use of unwrap and unused PartialEq
merge conflicts detected
expand
collapse
expand
collapse
- src/client.rs:1
- src/tui.rs:242
- src/tui.rs:294
- src/tui.rs:21