Added sync functionality if you have several terminals open. Not much on it's own, but it also makes the timestamp of each messages use the created_at property from the blip record instead of just taking whatever the current time is before displaying it.
+20
-15
src/client.rs
+20
-15
src/client.rs
···
1
1
use anyhow::{Context, Result};
2
2
use chrono::Utc;
3
-
use reqwest::{Client as HttpClient, header::{HeaderMap, HeaderValue, AUTHORIZATION}};
3
+
use reqwest::{
4
+
header::{HeaderMap, HeaderValue, AUTHORIZATION},
5
+
Client as HttpClient,
6
+
};
4
7
use serde::{Deserialize, Serialize};
5
8
use serde_json::Value;
6
9
···
62
65
63
66
pub async fn login(&mut self, credentials: &Credentials) -> Result<()> {
64
67
let login_url = format!("{}/xrpc/com.atproto.server.createSession", self.base_url);
65
-
68
+
66
69
let request = LoginRequest {
67
70
identifier: credentials.username.clone(),
68
71
password: credentials.password.clone(),
69
72
};
70
73
71
-
let response = self.http_client
74
+
let response = self
75
+
.http_client
72
76
.post(&login_url)
73
77
.header("Content-Type", "application/json")
74
78
.json(&request)
···
93
97
}
94
98
95
99
pub async fn publish_blip(&self, content: &str) -> Result<String> {
96
-
let session = self.session.as_ref()
100
+
let session = self
101
+
.session
102
+
.as_ref()
97
103
.context("Not authenticated. Please run 'thought login' first.")?;
98
104
105
+
let timestamp = Utc::now().to_rfc3339().replace("+00:00", "Z");
106
+
99
107
let record = BlipRecord {
100
108
record_type: "stream.thought.blip".to_string(),
101
109
content: content.to_string(),
102
-
created_at: Utc::now().to_rfc3339().replace("+00:00", "Z"),
110
+
created_at: timestamp.clone(),
103
111
};
104
112
105
113
let request = CreateRecordRequest {
106
114
repo: session.did.clone(),
107
115
collection: "stream.thought.blip".to_string(),
108
-
record: serde_json::to_value(&record)
109
-
.context("Failed to serialize blip record")?,
116
+
record: serde_json::to_value(&record).context("Failed to serialize blip record")?,
110
117
};
111
118
112
119
let create_url = format!("{}/xrpc/com.atproto.repo.createRecord", self.base_url);
113
-
120
+
114
121
let mut headers = HeaderMap::new();
115
122
headers.insert(
116
123
AUTHORIZATION,
117
124
HeaderValue::from_str(&format!("Bearer {}", session.access_jwt))
118
125
.context("Invalid authorization header")?,
119
126
);
120
-
headers.insert(
121
-
"Content-Type",
122
-
HeaderValue::from_static("application/json"),
123
-
);
127
+
headers.insert("Content-Type", HeaderValue::from_static("application/json"));
124
128
125
-
let response = self.http_client
129
+
let response = self
130
+
.http_client
126
131
.post(&create_url)
127
132
.headers(headers)
128
133
.json(&request)
···
141
146
.await
142
147
.context("Failed to parse create record response")?;
143
148
144
-
Ok(create_response.uri)
149
+
Ok(timestamp)
145
150
}
146
151
147
152
pub fn is_authenticated(&self) -> bool {
···
151
156
pub fn get_user_did(&self) -> Option<String> {
152
157
self.session.as_ref().map(|s| s.did.clone())
153
158
}
154
-
}
159
+
}
+60
-49
src/jetstream.rs
+60
-49
src/jetstream.rs
···
1
1
use anyhow::{Context, Result};
2
+
use chrono::{DateTime, Utc};
2
3
use futures_util::StreamExt;
3
4
use serde::{Deserialize, Serialize};
4
5
use std::{collections::HashMap, time::Duration};
5
6
use tokio::sync::mpsc;
6
7
use tokio_tungstenite::{
7
8
connect_async,
8
-
tungstenite::{
9
-
client::IntoClientRequest,
10
-
http::HeaderValue,
11
-
Message,
12
-
},
9
+
tungstenite::{client::IntoClientRequest, http::HeaderValue, Message},
13
10
};
14
11
use url::Url;
15
12
···
46
43
47
44
pub struct JetstreamClient {
48
45
did_cache: HashMap<String, String>, // DID -> handle cache
49
-
own_did: Option<String>, // User's own DID to filter out
46
+
own_did: Option<String>, // User's own DID to filter out
50
47
}
51
48
52
49
impl JetstreamClient {
···
57
54
}
58
55
}
59
56
60
-
pub async fn connect_and_listen(&mut self, message_tx: mpsc::UnboundedSender<TuiMessage>) -> Result<()> {
57
+
pub async fn connect_and_listen(
58
+
&mut self,
59
+
message_tx: mpsc::UnboundedSender<TuiMessage>,
60
+
) -> Result<()> {
61
61
// Try simple connection first, then with collection filter
62
62
let urls = vec![
63
63
"wss://jetstream2.us-west.bsky.network/subscribe",
64
-
"wss://jetstream2.us-west.bsky.network/subscribe?wantedCollections=stream.thought.blip"
64
+
"wss://jetstream2.us-west.bsky.network/subscribe?wantedCollections=stream.thought.blip",
65
65
];
66
-
66
+
67
67
for (i, jetstream_url) in urls.iter().enumerate() {
68
68
// Send status to TUI instead of console
69
69
let status_msg = crate::tui::Message::new(
70
70
"system".to_string(),
71
71
format!("Trying connection {} of {}", i + 1, urls.len()),
72
72
false,
73
+
None,
73
74
);
74
75
let _ = message_tx.send(status_msg);
75
-
76
+
76
77
loop {
77
-
match self.try_connect_and_listen(&message_tx, jetstream_url).await {
78
+
match self
79
+
.try_connect_and_listen(&message_tx, jetstream_url)
80
+
.await
81
+
{
78
82
Ok(_) => {
79
83
return Ok(());
80
84
}
···
85
89
"system".to_string(),
86
90
"Connection failed, retrying in 5s...".to_string(),
87
91
false,
92
+
None,
88
93
);
89
94
let _ = message_tx.send(retry_msg);
90
95
tokio::time::sleep(Duration::from_secs(5)).await;
···
96
101
}
97
102
}
98
103
}
99
-
104
+
100
105
Ok(())
101
106
}
102
107
···
108
113
// Parse URL and create request with headers
109
114
let url = Url::parse(url_str)?;
110
115
let mut request = url.into_client_request()?;
111
-
116
+
112
117
// Add User-Agent header
113
-
request.headers_mut().insert(
114
-
"User-Agent",
115
-
HeaderValue::from_static("think-cli/0.1.0")
116
-
);
117
-
118
+
request
119
+
.headers_mut()
120
+
.insert("User-Agent", HeaderValue::from_static("think-cli/0.1.0"));
121
+
118
122
// Connect with timeout
119
123
let connect_future = connect_async(request);
120
-
let (ws_stream, _response) = tokio::time::timeout(
121
-
Duration::from_secs(10),
122
-
connect_future
123
-
).await
124
+
let (ws_stream, _response) = tokio::time::timeout(Duration::from_secs(10), connect_future)
125
+
.await
124
126
.context("Connection timeout")?
125
127
.context("Failed to connect to jetstream")?;
126
128
···
129
131
"system".to_string(),
130
132
"Connected to jetstream! Listening for blips...".to_string(),
131
133
false,
134
+
None,
132
135
);
133
136
let _ = message_tx.send(success_msg);
134
137
···
162
165
) -> Result<()> {
163
166
// First, check if it's even a commit event using basic JSON parsing
164
167
let event_value: serde_json::Value = serde_json::from_str(message)?;
165
-
168
+
166
169
// Only process commit events
167
170
if event_value.get("kind").and_then(|k| k.as_str()) != Some("commit") {
168
171
return Ok(());
169
172
}
170
-
173
+
171
174
// Check if it has a commit with the right collection
172
175
let commit = event_value.get("commit");
173
176
if let Some(commit_obj) = commit {
174
-
if commit_obj.get("collection").and_then(|c| c.as_str()) != Some("stream.thought.blip") {
177
+
if commit_obj.get("collection").and_then(|c| c.as_str()) != Some("stream.thought.blip")
178
+
{
175
179
return Ok(());
176
180
}
177
-
181
+
178
182
// Skip delete operations
179
183
if commit_obj.get("operation").and_then(|o| o.as_str()) == Some("delete") {
180
184
return Ok(());
···
184
188
185
189
186
190
187
-
191
+
let event: JetstreamEvent = serde_json::from_str(message)?;
188
192
let commit = event.commit.as_ref().unwrap(); // Safe because we checked above
189
193
190
-
// Skip messages from our own DID
191
-
if let Some(ref own_did) = self.own_did {
192
-
if &event.did == own_did {
193
-
return Ok(());
194
-
}
195
-
}
196
-
197
194
// Parse the blip record
198
195
let record_data = commit.record.as_ref();
196
+
if record_data.is_none() {
199
197
200
198
201
199
202
200
203
201
204
-
205
-
202
+
Err(_) => return Ok(()), // Silently skip unparseable records
206
203
};
207
204
208
-
// Get or resolve the handle
209
-
let handle = self.resolve_did(&event.did).await;
205
+
let is_own = self.own_did.as_ref().is_some_and(|own| own == &event.did);
206
+
let handle = if is_own {
207
+
"you".into()
208
+
} else {
209
+
// Get or resolve the handle
210
+
self.resolve_did(&event.did).await
211
+
};
210
212
211
213
// Create TUI message
212
214
let tui_message = TuiMessage::new(
213
215
handle,
214
216
blip_record.content,
215
-
false, // Not our own message
217
+
is_own,
218
+
DateTime::parse_from_rfc3339(&blip_record.created_at)
219
+
.map(|dt| dt.with_timezone(&Utc))
220
+
.ok(), // Parse RFC3339 โ UTC, None if invalid (so current timestamp instead)
216
221
);
217
222
218
223
// Send to TUI
···
252
257
async fn fetch_handle_for_did(&self, did: &str) -> Result<String> {
253
258
// Use the ATProto API to resolve DID to handle
254
259
let client = reqwest::Client::new();
255
-
let url = format!("https://public.api.bsky.app/xrpc/com.atproto.identity.resolveHandle?did={}", did);
256
-
260
+
let url = format!(
261
+
"https://public.api.bsky.app/xrpc/com.atproto.identity.resolveHandle?did={}",
262
+
did
263
+
);
264
+
257
265
#[derive(Deserialize)]
258
266
struct ResolveResponse {
259
267
handle: String,
260
268
}
261
269
262
270
// Try a simpler approach - resolve via profile
263
-
let profile_url = format!("https://public.api.bsky.app/xrpc/app.bsky.actor.getProfile?actor={}", did);
264
-
271
+
let profile_url = format!(
272
+
"https://public.api.bsky.app/xrpc/app.bsky.actor.getProfile?actor={}",
273
+
did
274
+
);
275
+
265
276
#[derive(Deserialize)]
266
277
struct ProfileResponse {
267
278
handle: String,
268
279
}
269
280
270
-
let response = client
271
-
.get(&profile_url)
272
-
.send()
273
-
.await?;
281
+
let response = client.get(&profile_url).send().await?;
274
282
275
283
if response.status().is_success() {
276
284
let profile: ProfileResponse = response.json().await?;
···
281
289
}
282
290
}
283
291
284
-
pub async fn start_jetstream_listener(message_tx: mpsc::UnboundedSender<TuiMessage>, own_did: Option<String>) -> Result<()> {
292
+
pub async fn start_jetstream_listener(
293
+
message_tx: mpsc::UnboundedSender<TuiMessage>,
294
+
own_did: Option<String>,
295
+
) -> Result<()> {
285
296
let mut client = JetstreamClient::new(own_did);
286
297
client.connect_and_listen(message_tx).await
287
-
}
298
+
}
+52
-19
src/tui.rs
+52
-19
src/tui.rs
···
21
21
22
22
use crate::client::AtProtoClient;
23
23
24
-
#[derive(Debug, Clone)]
24
+
#[derive(Debug, Clone, PartialEq)]
25
25
pub struct Message {
26
26
pub handle: String,
27
27
pub content: String,
···
30
30
}
31
31
32
32
impl Message {
33
-
pub fn new(handle: String, content: String, is_own: bool) -> Self {
33
+
pub fn new(
34
+
handle: String,
35
+
content: String,
36
+
is_own: bool,
37
+
timestamp: Option<DateTime<Utc>>,
38
+
) -> Self {
34
39
Self {
35
40
handle,
36
41
content,
37
-
timestamp: Utc::now(),
42
+
timestamp: timestamp.unwrap_or_else(Utc::now),
38
43
is_own,
39
44
}
40
45
}
···
71
76
pub fn add_message(&mut self, message: Message) {
72
77
self.messages.push(message);
73
78
self.message_count += 1;
74
-
79
+
75
80
// Keep only last 1000 messages
76
81
if self.messages.len() > 1000 {
77
82
self.messages.remove(0);
78
83
}
79
-
84
+
80
85
// Auto-scroll to bottom unless user is scrolling up
81
86
if self.scroll_offset == 0 {
82
87
self.scroll_offset = 0; // Stay at bottom
···
139
144
let vertical = Layout::default()
140
145
.direction(Direction::Vertical)
141
146
.constraints([
142
-
Constraint::Min(0), // Messages area
143
-
Constraint::Length(3), // Status area
144
-
Constraint::Length(3), // Input area
147
+
Constraint::Min(0), // Messages area
148
+
Constraint::Length(3), // Status area
149
+
Constraint::Length(3), // Input area
145
150
])
146
151
.split(frame.area());
147
152
148
153
// Render messages
149
154
let mut message_lines = Vec::new();
150
-
155
+
151
156
// Convert messages to styled lines in reverse chronological order (newest first)
152
157
for msg in self.messages.iter().rev() {
153
158
let style = if msg.is_own {
154
-
Style::default().fg(Color::Green).add_modifier(Modifier::BOLD)
159
+
Style::default()
160
+
.fg(Color::Green)
161
+
.add_modifier(Modifier::BOLD)
155
162
} else {
156
163
Style::default().fg(Color::White)
157
164
};
158
-
165
+
159
166
message_lines.push(Line::from(Span::styled(msg.format_display(), style)));
160
167
}
161
-
168
+
162
169
let messages_text = Text::from(message_lines);
163
170
let messages_paragraph = Paragraph::new(messages_text)
164
171
.block(Block::default().borders(Borders::ALL).title("Messages"))
···
172
179
} else {
173
180
Style::default().fg(Color::Yellow)
174
181
};
175
-
182
+
176
183
let status_paragraph = Paragraph::new(self.status.clone())
177
184
.style(status_style)
178
185
.block(Block::default().borders(Borders::ALL).title("Status"));
179
186
frame.render_widget(status_paragraph, vertical[1]);
180
187
181
188
// Render input
182
-
let input_paragraph = Paragraph::new(self.input.clone())
183
-
.block(Block::default().borders(Borders::ALL).title("Input (Esc to quit)"));
189
+
let input_paragraph = Paragraph::new(self.input.clone()).block(
190
+
Block::default()
191
+
.borders(Borders::ALL)
192
+
.title("Input (Esc to quit)"),
193
+
);
184
194
frame.render_widget(input_paragraph, vertical[2]);
185
195
}
186
196
}
···
197
207
let mut terminal = Terminal::new(backend)?;
198
208
199
209
let mut app = TuiApp::new();
200
-
210
+
201
211
// Add welcome message
202
212
app.add_message(Message::new(
203
213
"system".to_string(),
204
214
"Welcome to Think TUI! Connecting to jetstream...".to_string(),
205
215
false,
216
+
None,
206
217
));
207
218
208
219
let result = run_tui_loop(&mut terminal, &mut app, client, &mut message_rx).await;
···
234
245
if let Event::Key(key) = event::read()? {
235
246
if key.kind == KeyEventKind::Press {
236
247
// Handle Ctrl+C
237
-
if matches!(key.code, KeyCode::Char('c')) && key.modifiers.contains(crossterm::event::KeyModifiers::CONTROL) {
248
+
if matches!(key.code, KeyCode::Char('c'))
249
+
&& key
250
+
.modifiers
251
+
.contains(crossterm::event::KeyModifiers::CONTROL)
252
+
{
238
253
break;
239
254
}
240
255
···
242
257
if let Some(message) = app.handle_input(key.code) {
243
258
// Publish the message
244
259
match client.publish_blip(&message).await {
245
-
Ok(_) => {
260
+
Ok(t) => {
246
261
// Add our own message to the display
247
262
app.add_message(Message::new(
248
263
"you".to_string(),
249
264
message,
250
265
true,
266
+
Some(
267
+
DateTime::parse_from_rfc3339(&t)
268
+
.unwrap()
269
+
.with_timezone(&Utc),
270
+
),
251
271
));
252
272
}
253
273
Err(e) => {
···
256
276
"error".to_string(),
257
277
format!("Failed to publish: {}", e),
258
278
false,
279
+
None,
259
280
));
260
281
}
261
282
}
···
266
287
267
288
// Check for new messages from jetstream
268
289
while let Ok(message) = message_rx.try_recv() {
290
+
// find most recent is_own message and see if it's already there (you posted it)
291
+
let duplicate = app
292
+
.messages
293
+
.iter()
294
+
.rev()
295
+
.find(|m| m.is_own)
296
+
.is_some_and(|m| m.timestamp == message.timestamp);
297
+
298
+
if duplicate {
299
+
continue;
300
+
}
301
+
269
302
app.add_message(message);
270
303
app.set_connection_status(true);
271
304
}
···
277
310
}
278
311
279
312
Ok(())
280
-
}
313
+
}
History
1 round
5 comments
rubberducky.guru
submitted
#0
expand 5 comments
-#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone)] pub struct Message { pub handle: String, pub content: String,
@@ -263,11 +263,9 @@ async fn run_tui_loop( "you".to_string(), message, true,
-
Some( -
DateTime::parse_from_rfc3339(&t) -
.unwrap() -
.with_timezone(&Utc), -
),
-
DateTime::parse_from_rfc3339(&t) -
.map(|dt| dt.with_timezone(&Utc)) -
.ok(), // Parse RFC3339 โ UTC, None if invalid (so current timestamp instead) ));
Can't push those changes right now but I'll fix it this weekend
oops markdown formatting destroyed it and I can't edit/delete a comment, welp ยฏ_(ใ)_/ยฏ
Actually it was pretty quick to setup another ssh key. I have updated the PR with the changes
Unable to resubmit, I'll just create a new PR
closed without merging
Zed added made a bunch of other formatting changes and
cargo clippydid not help. Let me know if you want me to change stuff like adding arustfmt.tomlso that it formats the same way for everyone.