learn and share notes on atproto (wip) malfestio.stormlightlabs.org/
readability solid axum atproto srs
at main 414 lines 14 kB view raw
1//! PDS client for XRPC operations. 2//! 3//! Handles communication with a user's Personal Data Server. 4 5use crate::oauth::dpop::DpopKeypair; 6use malfestio_core::at_uri::AtUri; 7use serde::{Deserialize, Serialize}; 8 9/// A client for interacting with a user's PDS. 10/// 11/// Supports both DPoP-bound tokens (OAuth) and Bearer tokens (app passwords). 12pub struct PdsClient { 13 http_client: reqwest::Client, 14 pds_url: String, 15 access_token: String, 16 dpop_keypair: Option<DpopKeypair>, 17} 18 19/// Request body for putRecord XRPC. 20#[derive(Serialize)] 21#[serde(rename_all = "camelCase")] 22pub struct PutRecordRequest { 23 pub repo: String, 24 pub collection: String, 25 pub rkey: String, 26 pub record: serde_json::Value, 27 #[serde(skip_serializing_if = "Option::is_none")] 28 pub swap_record: Option<String>, 29 #[serde(skip_serializing_if = "Option::is_none")] 30 pub swap_commit: Option<String>, 31 #[serde(skip_serializing_if = "Option::is_none")] 32 pub validate: Option<bool>, 33} 34 35/// Response from putRecord XRPC. 36#[derive(Deserialize)] 37#[serde(rename_all = "camelCase")] 38pub struct PutRecordResponse { 39 pub uri: String, 40 pub cid: String, 41} 42 43/// Response from getRecord XRPC. 44#[derive(Deserialize)] 45#[serde(rename_all = "camelCase")] 46pub struct GetRecordResponse { 47 pub uri: String, 48 pub cid: String, 49 pub value: serde_json::Value, 50} 51 52/// Result of getting a record from PDS. 53#[derive(Debug, Clone)] 54pub struct GetRecordResult { 55 pub uri: String, 56 pub cid: String, 57 pub value: serde_json::Value, 58} 59 60/// Request body for deleteRecord XRPC. 
61#[derive(Serialize)] 62#[serde(rename_all = "camelCase")] 63pub struct DeleteRecordRequest { 64 pub repo: String, 65 pub collection: String, 66 pub rkey: String, 67 #[serde(skip_serializing_if = "Option::is_none")] 68 pub swap_record: Option<String>, 69 #[serde(skip_serializing_if = "Option::is_none")] 70 pub swap_commit: Option<String>, 71} 72 73/// Response from uploadBlob XRPC. 74#[derive(Deserialize)] 75pub struct UploadBlobResponse { 76 pub blob: BlobRef, 77} 78 79/// A reference to an uploaded blob. 80#[derive(Clone, Serialize, Deserialize)] 81pub struct BlobRef { 82 #[serde(rename = "$type")] 83 pub blob_type: String, 84 #[serde(rename = "ref")] 85 pub cid: CidLink, 86 #[serde(rename = "mimeType")] 87 pub mime_type: String, 88 pub size: u64, 89} 90 91/// A CID link. 92#[derive(Clone, Serialize, Deserialize)] 93pub struct CidLink { 94 #[serde(rename = "$link")] 95 pub link: String, 96} 97 98/// Error type for PDS operations. 99#[derive(Debug, Clone)] 100pub enum PdsError { 101 NetworkError(String), 102 AuthError(String), 103 ValidationError(String), 104 NotFound(String), 105 ServerError(String), 106} 107 108impl std::fmt::Display for PdsError { 109 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 110 match self { 111 PdsError::NetworkError(e) => write!(f, "Network error: {}", e), 112 PdsError::AuthError(e) => write!(f, "Authentication error: {}", e), 113 PdsError::ValidationError(e) => write!(f, "Validation error: {}", e), 114 PdsError::NotFound(e) => write!(f, "Not found: {}", e), 115 PdsError::ServerError(e) => write!(f, "Server error: {}", e), 116 } 117 } 118} 119 120impl std::error::Error for PdsError {} 121 122impl PdsClient { 123 /// Create a new PDS client with DPoP support (OAuth tokens). 124 /// 125 /// Uses DPoP proof-of-possession for enhanced security. 
126 pub fn new_with_dpop(pds_url: String, access_token: String, dpop_keypair: DpopKeypair) -> Self { 127 Self { http_client: reqwest::Client::new(), pds_url, access_token, dpop_keypair: Some(dpop_keypair) } 128 } 129 130 /// Create a new PDS client with Bearer authentication (app password tokens). 131 /// 132 /// Uses standard Bearer token authentication without DPoP. 133 pub fn new_bearer(pds_url: String, access_token: String) -> Self { 134 Self { http_client: reqwest::Client::new(), pds_url, access_token, dpop_keypair: None } 135 } 136 137 /// Create a new PDS client (deprecated - use new_with_dpop or new_bearer). 138 #[deprecated(since = "0.1.0", note = "Use new_with_dpop or new_bearer instead")] 139 pub fn new(pds_url: String, access_token: String, dpop_keypair: DpopKeypair) -> Self { 140 Self::new_with_dpop(pds_url, access_token, dpop_keypair) 141 } 142 143 /// Create or update a record in the repository. 144 pub async fn put_record( 145 &self, did: &str, collection: &str, rkey: &str, record: serde_json::Value, 146 ) -> Result<AtUri, PdsError> { 147 let url = format!("{}/xrpc/com.atproto.repo.putRecord", self.pds_url); 148 149 let request = PutRecordRequest { 150 repo: did.to_string(), 151 collection: collection.to_string(), 152 rkey: rkey.to_string(), 153 record, 154 swap_record: None, 155 swap_commit: None, 156 validate: Some(false), 157 }; 158 159 let mut request_builder = self.http_client.post(&url); 160 161 if let Some(ref dpop_keypair) = self.dpop_keypair { 162 let dpop_proof = dpop_keypair.generate_proof("POST", &url, Some(&self.access_token)); 163 request_builder = request_builder 164 .header("Authorization", format!("DPoP {}", self.access_token)) 165 .header("DPoP", dpop_proof); 166 } else { 167 request_builder = request_builder.header("Authorization", format!("Bearer {}", self.access_token)); 168 } 169 170 let response = request_builder 171 .json(&request) 172 .send() 173 .await 174 .map_err(|e| PdsError::NetworkError(e.to_string()))?; 175 176 
self.handle_response(response).await 177 } 178 179 /// Get a record from the repository. 180 pub async fn get_record(&self, did: &str, collection: &str, rkey: &str) -> Result<GetRecordResult, PdsError> { 181 let url = format!( 182 "{}/xrpc/com.atproto.repo.getRecord?repo={}&collection={}&rkey={}", 183 self.pds_url, did, collection, rkey 184 ); 185 186 let mut request_builder = self.http_client.get(&url); 187 188 if let Some(ref dpop_keypair) = self.dpop_keypair { 189 let dpop_proof = dpop_keypair.generate_proof("GET", &url, Some(&self.access_token)); 190 request_builder = request_builder 191 .header("Authorization", format!("DPoP {}", self.access_token)) 192 .header("DPoP", dpop_proof); 193 } else { 194 request_builder = request_builder.header("Authorization", format!("Bearer {}", self.access_token)); 195 } 196 197 let response = request_builder 198 .send() 199 .await 200 .map_err(|e| PdsError::NetworkError(e.to_string()))?; 201 202 if response.status().is_success() { 203 let get_response: GetRecordResponse = response 204 .json() 205 .await 206 .map_err(|e| PdsError::NetworkError(format!("Failed to parse response: {}", e)))?; 207 Ok(GetRecordResult { uri: get_response.uri, cid: get_response.cid, value: get_response.value }) 208 } else { 209 let status = response.status(); 210 let body = response.text().await.unwrap_or_default(); 211 Err(self.map_error_status(status, body)) 212 } 213 } 214 215 /// Delete a record from the repository. 
216 pub async fn delete_record(&self, did: &str, collection: &str, rkey: &str) -> Result<(), PdsError> { 217 let url = format!("{}/xrpc/com.atproto.repo.deleteRecord", self.pds_url); 218 219 let request = DeleteRecordRequest { 220 repo: did.to_string(), 221 collection: collection.to_string(), 222 rkey: rkey.to_string(), 223 swap_record: None, 224 swap_commit: None, 225 }; 226 227 let mut request_builder = self.http_client.post(&url); 228 229 if let Some(ref dpop_keypair) = self.dpop_keypair { 230 let dpop_proof = dpop_keypair.generate_proof("POST", &url, Some(&self.access_token)); 231 request_builder = request_builder 232 .header("Authorization", format!("DPoP {}", self.access_token)) 233 .header("DPoP", dpop_proof); 234 } else { 235 request_builder = request_builder.header("Authorization", format!("Bearer {}", self.access_token)); 236 } 237 238 let response = request_builder 239 .json(&request) 240 .send() 241 .await 242 .map_err(|e| PdsError::NetworkError(e.to_string()))?; 243 244 if response.status().is_success() { 245 Ok(()) 246 } else { 247 let status = response.status(); 248 let body = response.text().await.unwrap_or_default(); 249 Err(self.map_error_status(status, body)) 250 } 251 } 252 253 /// Upload a blob (media attachment) to the repository. 
254 pub async fn upload_blob(&self, data: Vec<u8>, mime_type: &str) -> Result<BlobRef, PdsError> { 255 let url = format!("{}/xrpc/com.atproto.repo.uploadBlob", self.pds_url); 256 257 let mut request_builder = self.http_client.post(&url); 258 259 if let Some(ref dpop_keypair) = self.dpop_keypair { 260 let dpop_proof = dpop_keypair.generate_proof("POST", &url, Some(&self.access_token)); 261 request_builder = request_builder 262 .header("Authorization", format!("DPoP {}", self.access_token)) 263 .header("DPoP", dpop_proof); 264 } else { 265 request_builder = request_builder.header("Authorization", format!("Bearer {}", self.access_token)); 266 } 267 268 let response = request_builder 269 .header("Content-Type", mime_type) 270 .body(data) 271 .send() 272 .await 273 .map_err(|e| PdsError::NetworkError(e.to_string()))?; 274 275 if !response.status().is_success() { 276 let status = response.status(); 277 let body = response.text().await.unwrap_or_default(); 278 return Err(self.map_error_status(status, body)); 279 } 280 281 let upload_response: UploadBlobResponse = response 282 .json() 283 .await 284 .map_err(|e| PdsError::NetworkError(e.to_string()))?; 285 286 Ok(upload_response.blob) 287 } 288 289 /// Handle response and parse AT-URI from success. 290 async fn handle_response(&self, response: reqwest::Response) -> Result<AtUri, PdsError> { 291 if !response.status().is_success() { 292 let status = response.status(); 293 let body = response.text().await.unwrap_or_default(); 294 return Err(self.map_error_status(status, body)); 295 } 296 297 let put_response: PutRecordResponse = response 298 .json() 299 .await 300 .map_err(|e| PdsError::NetworkError(e.to_string()))?; 301 302 AtUri::parse(&put_response.uri).map_err(|e| PdsError::ValidationError(e.to_string())) 303 } 304 305 /// Map HTTP status to PdsError. 
306 fn map_error_status(&self, status: reqwest::StatusCode, body: String) -> PdsError { 307 match status.as_u16() { 308 401 => PdsError::AuthError(body), 309 400 => PdsError::ValidationError(body), 310 404 => PdsError::NotFound(body), 311 _ => PdsError::ServerError(format!("{}: {}", status, body)), 312 } 313 } 314} 315 316#[cfg(test)] 317mod tests { 318 use super::*; 319 320 #[test] 321 fn test_put_record_request_serialization() { 322 let request = PutRecordRequest { 323 repo: "did:plc:abc123".to_string(), 324 collection: "org.stormlightlabs.malfestio.deck".to_string(), 325 rkey: "3k5abc123".to_string(), 326 record: serde_json::json!({ 327 "title": "Test Deck", 328 "createdAt": "2024-01-01T00:00:00Z" 329 }), 330 swap_record: None, 331 swap_commit: None, 332 validate: Some(true), 333 }; 334 335 let json = serde_json::to_string(&request).unwrap(); 336 assert!(json.contains("\"repo\":\"did:plc:abc123\"")); 337 assert!(json.contains("\"collection\":\"org.stormlightlabs.malfestio.deck\"")); 338 assert!(json.contains("\"rkey\":\"3k5abc123\"")); 339 assert!(json.contains("\"validate\":true")); 340 } 341 342 #[test] 343 fn test_delete_record_request_serialization() { 344 let request = DeleteRecordRequest { 345 repo: "did:plc:abc123".to_string(), 346 collection: "org.stormlightlabs.malfestio.deck".to_string(), 347 rkey: "3k5abc123".to_string(), 348 swap_record: None, 349 swap_commit: None, 350 }; 351 352 let json = serde_json::to_string(&request).unwrap(); 353 assert!(json.contains("\"repo\":\"did:plc:abc123\"")); 354 assert!(!json.contains("swapRecord")); 355 } 356 357 #[test] 358 fn test_blob_ref_serialization() { 359 let blob_ref = BlobRef { 360 blob_type: "blob".to_string(), 361 cid: CidLink { link: "bafyreiabc123".to_string() }, 362 mime_type: "image/jpeg".to_string(), 363 size: 12345, 364 }; 365 366 let json = serde_json::to_string(&blob_ref).unwrap(); 367 assert!(json.contains("\"$type\":\"blob\"")); 368 assert!(json.contains("\"$link\":\"bafyreiabc123\"")); 369 
assert!(json.contains("\"mimeType\":\"image/jpeg\"")); 370 } 371 372 #[test] 373 fn test_pds_error_display() { 374 let err = PdsError::AuthError("Invalid token".to_string()); 375 assert!(err.to_string().contains("Invalid token")); 376 377 let err = PdsError::NetworkError("Connection refused".to_string()); 378 assert!(err.to_string().contains("Connection refused")); 379 } 380 381 #[test] 382 fn test_pds_client_new_with_dpop() { 383 use crate::oauth::dpop::DpopKeypair; 384 385 let keypair = DpopKeypair::generate(); 386 let client = PdsClient::new_with_dpop("https://bsky.social".to_string(), "test_token".to_string(), keypair); 387 388 assert_eq!(client.pds_url, "https://bsky.social"); 389 assert_eq!(client.access_token, "test_token"); 390 assert!(client.dpop_keypair.is_some()); 391 } 392 393 #[test] 394 fn test_pds_client_new_bearer() { 395 let client = PdsClient::new_bearer("https://bsky.social".to_string(), "test_token".to_string()); 396 397 assert_eq!(client.pds_url, "https://bsky.social"); 398 assert_eq!(client.access_token, "test_token"); 399 assert!(client.dpop_keypair.is_none()); 400 } 401 402 #[test] 403 #[allow(deprecated)] 404 fn test_pds_client_new_deprecated() { 405 use crate::oauth::dpop::DpopKeypair; 406 407 let keypair = DpopKeypair::generate(); 408 let client = PdsClient::new("https://bsky.social".to_string(), "test_token".to_string(), keypair); 409 410 assert_eq!(client.pds_url, "https://bsky.social"); 411 assert_eq!(client.access_token, "test_token"); 412 assert!(client.dpop_keypair.is_some()); 413 } 414}