A Rust CLI for publishing thought records. Designed to work with thought.stream.
at main 8.2 kB view raw
1use anyhow::{Context, Result}; 2use chrono::Utc; 3use reqwest::{Client as HttpClient, header::{HeaderMap, HeaderValue, AUTHORIZATION}}; 4use serde::{Deserialize, Serialize}; 5use serde_json::Value; 6use std::time::Duration; 7 8use crate::credentials::Credentials; 9 10#[derive(Debug, Clone, Serialize, Deserialize)] 11pub struct Session { 12 #[serde(rename = "accessJwt")] 13 pub access_jwt: String, 14 #[serde(rename = "refreshJwt")] 15 pub refresh_jwt: String, 16 pub handle: String, 17 pub did: String, 18} 19 20#[derive(Debug, Clone)] 21pub struct AtProtoClient { 22 http_client: HttpClient, 23 base_url: String, 24 session: Option<Session>, 25} 26 27#[derive(Debug, Serialize)] 28struct LoginRequest { 29 identifier: String, 30 password: String, 31} 32 33#[derive(Debug, Serialize)] 34struct CreateRecordRequest { 35 repo: String, 36 collection: String, 37 record: Value, 38} 39 40#[derive(Debug, Serialize)] 41struct BlipRecord { 42 #[serde(rename = "$type")] 43 record_type: String, 44 content: String, 45 #[serde(rename = "createdAt")] 46 created_at: String, 47} 48 49#[derive(Debug, Deserialize)] 50struct CreateRecordResponse { 51 uri: String, 52 cid: String, 53} 54 55#[derive(Debug, Deserialize)] 56struct RefreshSessionResponse { 57 #[serde(rename = "accessJwt")] 58 access_jwt: String, 59 #[serde(rename = "refreshJwt")] 60 refresh_jwt: String, 61 handle: String, 62 did: String, 63} 64 65impl AtProtoClient { 66 pub fn new(pds_uri: &str) -> Self { 67 Self { 68 http_client: HttpClient::new(), 69 base_url: pds_uri.to_string(), 70 session: None, 71 } 72 } 73 74 pub async fn login(&mut self, credentials: &Credentials) -> Result<()> { 75 let login_url = format!("{}/xrpc/com.atproto.server.createSession", self.base_url); 76 77 let request = LoginRequest { 78 identifier: credentials.username.clone(), 79 password: credentials.password.clone(), 80 }; 81 82 let response = self.http_client 83 .post(&login_url) 84 .header("Content-Type", "application/json") 85 .json(&request) 86 .send() 87 .await 88 .context("Failed to send login request")?; 89 90 if !response.status().is_success() { 91 let status = response.status(); 92 let error_text = response.text().await.unwrap_or_default(); 93 anyhow::bail!("Login failed: {} - {}", status, error_text); 94 } 95 96 let session: Session = response 97 .json() 98 .await 99 .context("Failed to parse login response")?; 100 101 println!("Authenticated as: {}", session.handle); 102 self.session = Some(session); 103 Ok(()) 104 } 105 106 pub async fn publish_blip(&mut self, content: &str) -> Result<String> { 107 // Try the request, and if it fails with auth error, refresh and retry once 108 match self.try_publish_blip(content).await { 109 Ok(uri) => Ok(uri), 110 Err(e) => { 111 // Check if this is an authentication error by examining the error message 112 let error_msg = e.to_string(); 113 if error_msg.contains("401") || error_msg.contains("Unauthorized") || 114 error_msg.contains("ExpiredToken") || error_msg.contains("Token has expired") { 115 // Try to refresh the session 116 match self.refresh_session().await { 117 Ok(_) => { 118 // Retry the request with the new token 119 self.try_publish_blip(content).await 120 .context("Failed to publish blip after session refresh") 121 } 122 Err(refresh_err) => { 123 // Session refresh failed - clear any stored session 124 if let Ok(store) = crate::credentials::CredentialStore::new() { 125 let _ = store.clear_session(); 126 } 127 Err(anyhow::anyhow!( 128 "Authentication failed and session refresh failed: {}. Please run 'thought login' again.", 129 refresh_err 130 )) 131 } 132 } 133 } else { 134 Err(e) 135 } 136 } 137 } 138 } 139 140 async fn try_publish_blip(&self, content: &str) -> Result<String> { 141 let session = self.session.as_ref() 142 .context("Not authenticated. Please run 'thought login' first.")?; 143 144 let record = BlipRecord { 145 record_type: "stream.thought.blip".to_string(), 146 content: content.to_string(), 147 created_at: Utc::now().to_rfc3339().replace("+00:00", "Z"), 148 }; 149 150 let request = CreateRecordRequest { 151 repo: session.did.clone(), 152 collection: "stream.thought.blip".to_string(), 153 record: serde_json::to_value(&record) 154 .context("Failed to serialize blip record")?, 155 }; 156 157 let create_url = format!("{}/xrpc/com.atproto.repo.createRecord", self.base_url); 158 159 let mut headers = HeaderMap::new(); 160 headers.insert( 161 AUTHORIZATION, 162 HeaderValue::from_str(&format!("Bearer {}", session.access_jwt)) 163 .context("Invalid authorization header")?, 164 ); 165 headers.insert( 166 "Content-Type", 167 HeaderValue::from_static("application/json"), 168 ); 169 170 let response = self.http_client 171 .post(&create_url) 172 .headers(headers) 173 .json(&request) 174 .timeout(Duration::from_secs(30)) 175 .send() 176 .await 177 .context("Failed to send create record request (timeout or network error)")?; 178 179 if !response.status().is_success() { 180 let status = response.status(); 181 let error_text = response.text().await.unwrap_or_default(); 182 anyhow::bail!("Failed to publish blip: {} - {}", status, error_text); 183 } 184 185 let create_response: CreateRecordResponse = response 186 .json() 187 .await 188 .context("Failed to parse create record response")?; 189 190 Ok(create_response.uri) 191 } 192 193 pub fn is_authenticated(&self) -> bool { 194 self.session.is_some() 195 } 196 197 pub fn get_user_did(&self) -> Option<String> { 198 self.session.as_ref().map(|s| s.did.clone()) 199 } 200 201 pub async fn refresh_session(&mut self) -> Result<()> { 202 let session = self.session.as_ref() 203 .context("No session to refresh. Please login first.")?; 204 205 let refresh_url = format!("{}/xrpc/com.atproto.server.refreshSession", self.base_url); 206 207 let mut headers = HeaderMap::new(); 208 headers.insert( 209 AUTHORIZATION, 210 HeaderValue::from_str(&format!("Bearer {}", session.refresh_jwt)) 211 .context("Invalid refresh token for authorization header")?, 212 ); 213 214 let response = self.http_client 215 .post(&refresh_url) 216 .headers(headers) 217 .timeout(Duration::from_secs(30)) 218 .send() 219 .await 220 .context("Failed to send refresh session request (timeout or network error)")?; 221 222 if !response.status().is_success() { 223 let status = response.status(); 224 let error_text = response.text().await.unwrap_or_default(); 225 anyhow::bail!("Session refresh failed: {} - {}", status, error_text); 226 } 227 228 let refresh_response: RefreshSessionResponse = response 229 .json() 230 .await 231 .context("Failed to parse refresh session response")?; 232 233 let new_session = Session { 234 access_jwt: refresh_response.access_jwt, 235 refresh_jwt: refresh_response.refresh_jwt, 236 handle: refresh_response.handle, 237 did: refresh_response.did, 238 }; 239 240 self.session = Some(new_session); 241 Ok(()) 242 } 243 244 pub fn get_session(&self) -> Option<&Session> { 245 self.session.as_ref() 246 } 247 248 pub fn set_session(&mut self, session: Session) { 249 self.session = Some(session); 250 } 251}