learn and share notes on atproto (wip)
malfestio.stormlightlabs.org/
readability
solid
axum
atproto
srs
//! PDS client for XRPC operations.
//!
//! Handles communication with a user's Personal Data Server.
4
5use crate::oauth::dpop::DpopKeypair;
6use malfestio_core::at_uri::AtUri;
7use serde::{Deserialize, Serialize};
8
9/// A client for interacting with a user's PDS.
10///
11/// Supports both DPoP-bound tokens (OAuth) and Bearer tokens (app passwords).
12pub struct PdsClient {
13 http_client: reqwest::Client,
14 pds_url: String,
15 access_token: String,
16 dpop_keypair: Option<DpopKeypair>,
17}
18
19/// Request body for putRecord XRPC.
20#[derive(Serialize)]
21#[serde(rename_all = "camelCase")]
22pub struct PutRecordRequest {
23 pub repo: String,
24 pub collection: String,
25 pub rkey: String,
26 pub record: serde_json::Value,
27 #[serde(skip_serializing_if = "Option::is_none")]
28 pub swap_record: Option<String>,
29 #[serde(skip_serializing_if = "Option::is_none")]
30 pub swap_commit: Option<String>,
31 #[serde(skip_serializing_if = "Option::is_none")]
32 pub validate: Option<bool>,
33}
34
35/// Response from putRecord XRPC.
36#[derive(Deserialize)]
37#[serde(rename_all = "camelCase")]
38pub struct PutRecordResponse {
39 pub uri: String,
40 pub cid: String,
41}
42
43/// Response from getRecord XRPC.
44#[derive(Deserialize)]
45#[serde(rename_all = "camelCase")]
46pub struct GetRecordResponse {
47 pub uri: String,
48 pub cid: String,
49 pub value: serde_json::Value,
50}
51
52/// Result of getting a record from PDS.
53#[derive(Debug, Clone)]
54pub struct GetRecordResult {
55 pub uri: String,
56 pub cid: String,
57 pub value: serde_json::Value,
58}
59
60/// Request body for deleteRecord XRPC.
61#[derive(Serialize)]
62#[serde(rename_all = "camelCase")]
63pub struct DeleteRecordRequest {
64 pub repo: String,
65 pub collection: String,
66 pub rkey: String,
67 #[serde(skip_serializing_if = "Option::is_none")]
68 pub swap_record: Option<String>,
69 #[serde(skip_serializing_if = "Option::is_none")]
70 pub swap_commit: Option<String>,
71}
72
73/// Response from uploadBlob XRPC.
74#[derive(Deserialize)]
75pub struct UploadBlobResponse {
76 pub blob: BlobRef,
77}
78
79/// A reference to an uploaded blob.
80#[derive(Clone, Serialize, Deserialize)]
81pub struct BlobRef {
82 #[serde(rename = "$type")]
83 pub blob_type: String,
84 #[serde(rename = "ref")]
85 pub cid: CidLink,
86 #[serde(rename = "mimeType")]
87 pub mime_type: String,
88 pub size: u64,
89}
90
91/// A CID link.
92#[derive(Clone, Serialize, Deserialize)]
93pub struct CidLink {
94 #[serde(rename = "$link")]
95 pub link: String,
96}
97
/// Error type for PDS operations.
///
/// Each variant carries a human-readable detail string (for HTTP failures this
/// is the response body). The type is comparable so callers and tests can use
/// `assert_eq!` / `==` on results.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PdsError {
    /// The request could not be sent, or a success response could not be decoded.
    NetworkError(String),
    /// The server rejected the credentials (HTTP 401).
    AuthError(String),
    /// The request or the returned data was invalid (HTTP 400, or an unparsable AT-URI).
    ValidationError(String),
    /// The requested resource does not exist (HTTP 404).
    NotFound(String),
    /// Any other non-success HTTP status.
    ServerError(String),
}

impl std::fmt::Display for PdsError {
    /// Formats the error as `<category>: <detail>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (category, detail) = match self {
            PdsError::NetworkError(detail) => ("Network error", detail),
            PdsError::AuthError(detail) => ("Authentication error", detail),
            PdsError::ValidationError(detail) => ("Validation error", detail),
            PdsError::NotFound(detail) => ("Not found", detail),
            PdsError::ServerError(detail) => ("Server error", detail),
        };
        write!(f, "{}: {}", category, detail)
    }
}

impl std::error::Error for PdsError {}
121
122impl PdsClient {
123 /// Create a new PDS client with DPoP support (OAuth tokens).
124 ///
125 /// Uses DPoP proof-of-possession for enhanced security.
126 pub fn new_with_dpop(pds_url: String, access_token: String, dpop_keypair: DpopKeypair) -> Self {
127 Self { http_client: reqwest::Client::new(), pds_url, access_token, dpop_keypair: Some(dpop_keypair) }
128 }
129
130 /// Create a new PDS client with Bearer authentication (app password tokens).
131 ///
132 /// Uses standard Bearer token authentication without DPoP.
133 pub fn new_bearer(pds_url: String, access_token: String) -> Self {
134 Self { http_client: reqwest::Client::new(), pds_url, access_token, dpop_keypair: None }
135 }
136
137 /// Create a new PDS client (deprecated - use new_with_dpop or new_bearer).
138 #[deprecated(since = "0.1.0", note = "Use new_with_dpop or new_bearer instead")]
139 pub fn new(pds_url: String, access_token: String, dpop_keypair: DpopKeypair) -> Self {
140 Self::new_with_dpop(pds_url, access_token, dpop_keypair)
141 }
142
143 /// Create or update a record in the repository.
144 pub async fn put_record(
145 &self, did: &str, collection: &str, rkey: &str, record: serde_json::Value,
146 ) -> Result<AtUri, PdsError> {
147 let url = format!("{}/xrpc/com.atproto.repo.putRecord", self.pds_url);
148
149 let request = PutRecordRequest {
150 repo: did.to_string(),
151 collection: collection.to_string(),
152 rkey: rkey.to_string(),
153 record,
154 swap_record: None,
155 swap_commit: None,
156 validate: Some(false),
157 };
158
159 let mut request_builder = self.http_client.post(&url);
160
161 if let Some(ref dpop_keypair) = self.dpop_keypair {
162 let dpop_proof = dpop_keypair.generate_proof("POST", &url, Some(&self.access_token));
163 request_builder = request_builder
164 .header("Authorization", format!("DPoP {}", self.access_token))
165 .header("DPoP", dpop_proof);
166 } else {
167 request_builder = request_builder.header("Authorization", format!("Bearer {}", self.access_token));
168 }
169
170 let response = request_builder
171 .json(&request)
172 .send()
173 .await
174 .map_err(|e| PdsError::NetworkError(e.to_string()))?;
175
176 self.handle_response(response).await
177 }
178
179 /// Get a record from the repository.
180 pub async fn get_record(&self, did: &str, collection: &str, rkey: &str) -> Result<GetRecordResult, PdsError> {
181 let url = format!(
182 "{}/xrpc/com.atproto.repo.getRecord?repo={}&collection={}&rkey={}",
183 self.pds_url, did, collection, rkey
184 );
185
186 let mut request_builder = self.http_client.get(&url);
187
188 if let Some(ref dpop_keypair) = self.dpop_keypair {
189 let dpop_proof = dpop_keypair.generate_proof("GET", &url, Some(&self.access_token));
190 request_builder = request_builder
191 .header("Authorization", format!("DPoP {}", self.access_token))
192 .header("DPoP", dpop_proof);
193 } else {
194 request_builder = request_builder.header("Authorization", format!("Bearer {}", self.access_token));
195 }
196
197 let response = request_builder
198 .send()
199 .await
200 .map_err(|e| PdsError::NetworkError(e.to_string()))?;
201
202 if response.status().is_success() {
203 let get_response: GetRecordResponse = response
204 .json()
205 .await
206 .map_err(|e| PdsError::NetworkError(format!("Failed to parse response: {}", e)))?;
207 Ok(GetRecordResult { uri: get_response.uri, cid: get_response.cid, value: get_response.value })
208 } else {
209 let status = response.status();
210 let body = response.text().await.unwrap_or_default();
211 Err(self.map_error_status(status, body))
212 }
213 }
214
215 /// Delete a record from the repository.
216 pub async fn delete_record(&self, did: &str, collection: &str, rkey: &str) -> Result<(), PdsError> {
217 let url = format!("{}/xrpc/com.atproto.repo.deleteRecord", self.pds_url);
218
219 let request = DeleteRecordRequest {
220 repo: did.to_string(),
221 collection: collection.to_string(),
222 rkey: rkey.to_string(),
223 swap_record: None,
224 swap_commit: None,
225 };
226
227 let mut request_builder = self.http_client.post(&url);
228
229 if let Some(ref dpop_keypair) = self.dpop_keypair {
230 let dpop_proof = dpop_keypair.generate_proof("POST", &url, Some(&self.access_token));
231 request_builder = request_builder
232 .header("Authorization", format!("DPoP {}", self.access_token))
233 .header("DPoP", dpop_proof);
234 } else {
235 request_builder = request_builder.header("Authorization", format!("Bearer {}", self.access_token));
236 }
237
238 let response = request_builder
239 .json(&request)
240 .send()
241 .await
242 .map_err(|e| PdsError::NetworkError(e.to_string()))?;
243
244 if response.status().is_success() {
245 Ok(())
246 } else {
247 let status = response.status();
248 let body = response.text().await.unwrap_or_default();
249 Err(self.map_error_status(status, body))
250 }
251 }
252
253 /// Upload a blob (media attachment) to the repository.
254 pub async fn upload_blob(&self, data: Vec<u8>, mime_type: &str) -> Result<BlobRef, PdsError> {
255 let url = format!("{}/xrpc/com.atproto.repo.uploadBlob", self.pds_url);
256
257 let mut request_builder = self.http_client.post(&url);
258
259 if let Some(ref dpop_keypair) = self.dpop_keypair {
260 let dpop_proof = dpop_keypair.generate_proof("POST", &url, Some(&self.access_token));
261 request_builder = request_builder
262 .header("Authorization", format!("DPoP {}", self.access_token))
263 .header("DPoP", dpop_proof);
264 } else {
265 request_builder = request_builder.header("Authorization", format!("Bearer {}", self.access_token));
266 }
267
268 let response = request_builder
269 .header("Content-Type", mime_type)
270 .body(data)
271 .send()
272 .await
273 .map_err(|e| PdsError::NetworkError(e.to_string()))?;
274
275 if !response.status().is_success() {
276 let status = response.status();
277 let body = response.text().await.unwrap_or_default();
278 return Err(self.map_error_status(status, body));
279 }
280
281 let upload_response: UploadBlobResponse = response
282 .json()
283 .await
284 .map_err(|e| PdsError::NetworkError(e.to_string()))?;
285
286 Ok(upload_response.blob)
287 }
288
289 /// Handle response and parse AT-URI from success.
290 async fn handle_response(&self, response: reqwest::Response) -> Result<AtUri, PdsError> {
291 if !response.status().is_success() {
292 let status = response.status();
293 let body = response.text().await.unwrap_or_default();
294 return Err(self.map_error_status(status, body));
295 }
296
297 let put_response: PutRecordResponse = response
298 .json()
299 .await
300 .map_err(|e| PdsError::NetworkError(e.to_string()))?;
301
302 AtUri::parse(&put_response.uri).map_err(|e| PdsError::ValidationError(e.to_string()))
303 }
304
305 /// Map HTTP status to PdsError.
306 fn map_error_status(&self, status: reqwest::StatusCode, body: String) -> PdsError {
307 match status.as_u16() {
308 401 => PdsError::AuthError(body),
309 400 => PdsError::ValidationError(body),
310 404 => PdsError::NotFound(body),
311 _ => PdsError::ServerError(format!("{}: {}", status, body)),
312 }
313 }
314}
315
#[cfg(test)]
mod tests {
    use super::*;

    /// Base URL shared by the constructor tests.
    const PDS_URL: &str = "https://bsky.social";
    /// Access token shared by the constructor tests.
    const TOKEN: &str = "test_token";

    /// Assert that every expected fragment occurs in the serialized JSON.
    fn assert_contains_all(json: &str, fragments: &[&str]) {
        for fragment in fragments {
            assert!(json.contains(fragment));
        }
    }

    /// Assert that a client was built with the shared URL and token.
    fn assert_url_and_token(client: &PdsClient) {
        assert_eq!(client.pds_url, "https://bsky.social");
        assert_eq!(client.access_token, "test_token");
    }

    #[test]
    fn test_put_record_request_serialization() {
        let record = serde_json::json!({
            "title": "Test Deck",
            "createdAt": "2024-01-01T00:00:00Z"
        });
        let request = PutRecordRequest {
            repo: "did:plc:abc123".to_string(),
            collection: "org.stormlightlabs.malfestio.deck".to_string(),
            rkey: "3k5abc123".to_string(),
            record,
            swap_record: None,
            swap_commit: None,
            validate: Some(true),
        };

        let json = serde_json::to_string(&request).unwrap();
        assert_contains_all(
            &json,
            &[
                "\"repo\":\"did:plc:abc123\"",
                "\"collection\":\"org.stormlightlabs.malfestio.deck\"",
                "\"rkey\":\"3k5abc123\"",
                "\"validate\":true",
            ],
        );
    }

    #[test]
    fn test_delete_record_request_serialization() {
        let request = DeleteRecordRequest {
            repo: "did:plc:abc123".to_string(),
            collection: "org.stormlightlabs.malfestio.deck".to_string(),
            rkey: "3k5abc123".to_string(),
            swap_record: None,
            swap_commit: None,
        };

        let json = serde_json::to_string(&request).unwrap();
        assert_contains_all(&json, &["\"repo\":\"did:plc:abc123\""]);
        // `None` swap guards must be dropped from the payload entirely.
        assert!(!json.contains("swapRecord"));
    }

    #[test]
    fn test_blob_ref_serialization() {
        let link = CidLink { link: "bafyreiabc123".to_string() };
        let blob_ref = BlobRef {
            blob_type: "blob".to_string(),
            cid: link,
            mime_type: "image/jpeg".to_string(),
            size: 12345,
        };

        let json = serde_json::to_string(&blob_ref).unwrap();
        assert_contains_all(
            &json,
            &["\"$type\":\"blob\"", "\"$link\":\"bafyreiabc123\"", "\"mimeType\":\"image/jpeg\""],
        );
    }

    #[test]
    fn test_pds_error_display() {
        let auth = PdsError::AuthError("Invalid token".to_string());
        assert!(auth.to_string().contains("Invalid token"));

        let network = PdsError::NetworkError("Connection refused".to_string());
        assert!(network.to_string().contains("Connection refused"));
    }

    #[test]
    fn test_pds_client_new_with_dpop() {
        use crate::oauth::dpop::DpopKeypair;

        let client = PdsClient::new_with_dpop(PDS_URL.to_string(), TOKEN.to_string(), DpopKeypair::generate());

        assert_url_and_token(&client);
        assert!(client.dpop_keypair.is_some());
    }

    #[test]
    fn test_pds_client_new_bearer() {
        let client = PdsClient::new_bearer(PDS_URL.to_string(), TOKEN.to_string());

        assert_url_and_token(&client);
        assert!(client.dpop_keypair.is_none());
    }

    #[test]
    #[allow(deprecated)]
    fn test_pds_client_new_deprecated() {
        use crate::oauth::dpop::DpopKeypair;

        let client = PdsClient::new(PDS_URL.to_string(), TOKEN.to_string(), DpopKeypair::generate());

        assert_url_and_token(&client);
        assert!(client.dpop_keypair.is_some());
    }
}