A Rust CLI for publishing thought records. Designed to work with thought.stream.
1use anyhow::{Context, Result};
2use chrono::Utc;
3use reqwest::{Client as HttpClient, header::{HeaderMap, HeaderValue, AUTHORIZATION}};
4use serde::{Deserialize, Serialize};
5use serde_json::Value;
6use std::time::Duration;
7
8use crate::credentials::Credentials;
9
10#[derive(Debug, Clone, Serialize, Deserialize)]
11pub struct Session {
12 #[serde(rename = "accessJwt")]
13 pub access_jwt: String,
14 #[serde(rename = "refreshJwt")]
15 pub refresh_jwt: String,
16 pub handle: String,
17 pub did: String,
18}
19
20#[derive(Debug, Clone)]
21pub struct AtProtoClient {
22 http_client: HttpClient,
23 base_url: String,
24 session: Option<Session>,
25}
26
27#[derive(Debug, Serialize)]
28struct LoginRequest {
29 identifier: String,
30 password: String,
31}
32
33#[derive(Debug, Serialize)]
34struct CreateRecordRequest {
35 repo: String,
36 collection: String,
37 record: Value,
38}
39
40#[derive(Debug, Serialize)]
41struct BlipRecord {
42 #[serde(rename = "$type")]
43 record_type: String,
44 content: String,
45 #[serde(rename = "createdAt")]
46 created_at: String,
47}
48
49#[derive(Debug, Deserialize)]
50struct CreateRecordResponse {
51 uri: String,
52 cid: String,
53}
54
55#[derive(Debug, Deserialize)]
56struct RefreshSessionResponse {
57 #[serde(rename = "accessJwt")]
58 access_jwt: String,
59 #[serde(rename = "refreshJwt")]
60 refresh_jwt: String,
61 handle: String,
62 did: String,
63}
64
65impl AtProtoClient {
66 pub fn new(pds_uri: &str) -> Self {
67 Self {
68 http_client: HttpClient::new(),
69 base_url: pds_uri.to_string(),
70 session: None,
71 }
72 }
73
74 pub async fn login(&mut self, credentials: &Credentials) -> Result<()> {
75 let login_url = format!("{}/xrpc/com.atproto.server.createSession", self.base_url);
76
77 let request = LoginRequest {
78 identifier: credentials.username.clone(),
79 password: credentials.password.clone(),
80 };
81
82 let response = self.http_client
83 .post(&login_url)
84 .header("Content-Type", "application/json")
85 .json(&request)
86 .send()
87 .await
88 .context("Failed to send login request")?;
89
90 if !response.status().is_success() {
91 let status = response.status();
92 let error_text = response.text().await.unwrap_or_default();
93 anyhow::bail!("Login failed: {} - {}", status, error_text);
94 }
95
96 let session: Session = response
97 .json()
98 .await
99 .context("Failed to parse login response")?;
100
101 println!("Authenticated as: {}", session.handle);
102 self.session = Some(session);
103 Ok(())
104 }
105
106 pub async fn publish_blip(&mut self, content: &str) -> Result<String> {
107 // Try the request, and if it fails with auth error, refresh and retry once
108 match self.try_publish_blip(content).await {
109 Ok(uri) => Ok(uri),
110 Err(e) => {
111 // Check if this is an authentication error by examining the error message
112 let error_msg = e.to_string();
113 if error_msg.contains("401") || error_msg.contains("Unauthorized") ||
114 error_msg.contains("ExpiredToken") || error_msg.contains("Token has expired") {
115 // Try to refresh the session
116 match self.refresh_session().await {
117 Ok(_) => {
118 // Retry the request with the new token
119 self.try_publish_blip(content).await
120 .context("Failed to publish blip after session refresh")
121 }
122 Err(refresh_err) => {
123 // Session refresh failed - clear any stored session
124 if let Ok(store) = crate::credentials::CredentialStore::new() {
125 let _ = store.clear_session();
126 }
127 Err(anyhow::anyhow!(
128 "Authentication failed and session refresh failed: {}. Please run 'thought login' again.",
129 refresh_err
130 ))
131 }
132 }
133 } else {
134 Err(e)
135 }
136 }
137 }
138 }
139
140 async fn try_publish_blip(&self, content: &str) -> Result<String> {
141 let session = self.session.as_ref()
142 .context("Not authenticated. Please run 'thought login' first.")?;
143
144 let record = BlipRecord {
145 record_type: "stream.thought.blip".to_string(),
146 content: content.to_string(),
147 created_at: Utc::now().to_rfc3339().replace("+00:00", "Z"),
148 };
149
150 let request = CreateRecordRequest {
151 repo: session.did.clone(),
152 collection: "stream.thought.blip".to_string(),
153 record: serde_json::to_value(&record)
154 .context("Failed to serialize blip record")?,
155 };
156
157 let create_url = format!("{}/xrpc/com.atproto.repo.createRecord", self.base_url);
158
159 let mut headers = HeaderMap::new();
160 headers.insert(
161 AUTHORIZATION,
162 HeaderValue::from_str(&format!("Bearer {}", session.access_jwt))
163 .context("Invalid authorization header")?,
164 );
165 headers.insert(
166 "Content-Type",
167 HeaderValue::from_static("application/json"),
168 );
169
170 let response = self.http_client
171 .post(&create_url)
172 .headers(headers)
173 .json(&request)
174 .timeout(Duration::from_secs(30))
175 .send()
176 .await
177 .context("Failed to send create record request (timeout or network error)")?;
178
179 if !response.status().is_success() {
180 let status = response.status();
181 let error_text = response.text().await.unwrap_or_default();
182 anyhow::bail!("Failed to publish blip: {} - {}", status, error_text);
183 }
184
185 let create_response: CreateRecordResponse = response
186 .json()
187 .await
188 .context("Failed to parse create record response")?;
189
190 Ok(create_response.uri)
191 }
192
193 pub fn is_authenticated(&self) -> bool {
194 self.session.is_some()
195 }
196
197 pub fn get_user_did(&self) -> Option<String> {
198 self.session.as_ref().map(|s| s.did.clone())
199 }
200
201 pub async fn refresh_session(&mut self) -> Result<()> {
202 let session = self.session.as_ref()
203 .context("No session to refresh. Please login first.")?;
204
205 let refresh_url = format!("{}/xrpc/com.atproto.server.refreshSession", self.base_url);
206
207 let mut headers = HeaderMap::new();
208 headers.insert(
209 AUTHORIZATION,
210 HeaderValue::from_str(&format!("Bearer {}", session.refresh_jwt))
211 .context("Invalid refresh token for authorization header")?,
212 );
213
214 let response = self.http_client
215 .post(&refresh_url)
216 .headers(headers)
217 .timeout(Duration::from_secs(30))
218 .send()
219 .await
220 .context("Failed to send refresh session request (timeout or network error)")?;
221
222 if !response.status().is_success() {
223 let status = response.status();
224 let error_text = response.text().await.unwrap_or_default();
225 anyhow::bail!("Session refresh failed: {} - {}", status, error_text);
226 }
227
228 let refresh_response: RefreshSessionResponse = response
229 .json()
230 .await
231 .context("Failed to parse refresh session response")?;
232
233 let new_session = Session {
234 access_jwt: refresh_response.access_jwt,
235 refresh_jwt: refresh_response.refresh_jwt,
236 handle: refresh_response.handle,
237 did: refresh_response.did,
238 };
239
240 self.session = Some(new_session);
241 Ok(())
242 }
243
244 pub fn get_session(&self) -> Option<&Session> {
245 self.session.as_ref()
246 }
247
248 pub fn set_session(&mut self, session: Session) {
249 self.session = Some(session);
250 }
251}