atproto blogging
1//! PDS synchronization for CRDT documents.
2//!
3//! Generic sync logic for AT Protocol edit records (root/diff/draft).
4//! Works with any client implementing the required traits.
5
6use std::collections::{BTreeMap, HashMap};
7
8use jacquard::bytes::Bytes;
9use jacquard::cowstr::ToCowStr;
10use jacquard::prelude::*;
11use jacquard::smol_str::format_smolstr;
12use jacquard::types::blob::MimeType;
13use jacquard::types::ident::AtIdentifier;
14use jacquard::types::recordkey::RecordKey;
15use jacquard::types::string::{AtUri, Cid, Did, Nsid};
16use jacquard::types::tid::Ticker;
17use jacquard::types::uri::Uri;
18use jacquard::url::Url;
19use jacquard::{CowStr, IntoStatic, to_data};
20use loro::{ExportMode, LoroDoc};
21use weaver_api::com_atproto::repo::create_record::CreateRecord;
22use weaver_api::com_atproto::repo::strong_ref::StrongRef;
23use weaver_api::sh_weaver::edit::diff::Diff;
24use weaver_api::sh_weaver::edit::draft::Draft;
25use weaver_api::sh_weaver::edit::root::Root;
26use weaver_api::sh_weaver::edit::{DocRef, DocRefValue, DraftRef, EntryRef};
27use weaver_common::agent::WeaverExt;
28use weaver_common::constellation::{GetBacklinksQuery, RecordId};
29
30use crate::CrdtError;
31use crate::document::CrdtDocument;
32
33const ROOT_NSID: &str = "sh.weaver.edit.root";
34const DIFF_NSID: &str = "sh.weaver.edit.diff";
35const DRAFT_NSID: &str = "sh.weaver.edit.draft";
36const CONSTELLATION_URL: &str = "https://constellation.microcosm.blue";
37
38/// Result of a sync operation.
39#[derive(Clone, Debug)]
40pub enum SyncResult {
41 /// Created a new root record (first sync).
42 CreatedRoot {
43 uri: AtUri<'static>,
44 cid: Cid<'static>,
45 },
46 /// Created a new diff record.
47 CreatedDiff {
48 uri: AtUri<'static>,
49 cid: Cid<'static>,
50 },
51 /// No changes to sync.
52 NoChanges,
53}
54
55/// Result of creating an edit root.
56pub struct CreateRootResult {
57 /// The root record URI.
58 pub root_uri: AtUri<'static>,
59 /// The root record CID.
60 pub root_cid: Cid<'static>,
61 /// Draft stub StrongRef if this was a new draft.
62 pub draft_ref: Option<StrongRef<'static>>,
63}
64
65/// Build a DocRef for either a published entry or an unpublished draft.
66fn build_doc_ref(
67 did: &Did<'_>,
68 draft_key: &str,
69 entry_uri: Option<&AtUri<'_>>,
70 entry_cid: Option<&Cid<'_>>,
71) -> DocRef<'static> {
72 match (entry_uri, entry_cid) {
73 (Some(uri), Some(cid)) => DocRef {
74 value: DocRefValue::EntryRef(Box::new(EntryRef {
75 entry: StrongRef::new()
76 .uri(uri.clone().into_static())
77 .cid(cid.clone().into_static())
78 .build(),
79 extra_data: None,
80 })),
81 extra_data: None,
82 },
83 _ => {
84 // Transform localStorage key to synthetic AT-URI
85 let rkey = extract_draft_rkey(draft_key);
86 let canonical_uri = format_smolstr!("at://{}/{}/{}", did, DRAFT_NSID, rkey);
87
88 DocRef {
89 value: DocRefValue::DraftRef(Box::new(DraftRef {
90 draft_key: canonical_uri.into(),
91 extra_data: None,
92 })),
93 extra_data: None,
94 }
95 }
96 }
97}
98
99/// Extract the rkey (TID) from a draft key.
100fn extract_draft_rkey(draft_key: &str) -> String {
101 if let Some(tid) = draft_key.strip_prefix("new:") {
102 tid.to_string()
103 } else if draft_key.starts_with("at://") {
104 draft_key.split('/').last().unwrap_or(draft_key).to_string()
105 } else {
106 draft_key.to_string()
107 }
108}
109
110/// Get current DID from session.
111async fn get_current_did<C>(client: &C) -> Result<Did<'static>, CrdtError>
112where
113 C: AgentSession,
114{
115 client
116 .session_info()
117 .await
118 .map(|(did, _)| did)
119 .ok_or(CrdtError::NotAuthenticated)
120}
121
122/// Create the draft stub record on PDS.
123async fn create_draft_stub<C>(
124 client: &C,
125 did: &Did<'_>,
126 rkey: &str,
127) -> Result<(AtUri<'static>, Cid<'static>), CrdtError>
128where
129 C: XrpcClient + AgentSession,
130{
131 let draft = Draft::new()
132 .created_at(jacquard::types::datetime::Datetime::now())
133 .build();
134
135 let draft_data =
136 to_data(&draft).map_err(|e| CrdtError::Serialization(format!("draft: {}", e)))?;
137
138 let record_key =
139 RecordKey::any(rkey).map_err(|e| CrdtError::InvalidUri(format!("rkey: {}", e)))?;
140
141 let collection =
142 Nsid::new(DRAFT_NSID).map_err(|e| CrdtError::InvalidUri(format!("nsid: {}", e)))?;
143
144 let request = CreateRecord::new()
145 .repo(AtIdentifier::Did(did.clone().into_static()))
146 .collection(collection)
147 .rkey(record_key)
148 .record(draft_data)
149 .build();
150
151 let response = client
152 .send(request)
153 .await
154 .map_err(|e| CrdtError::Xrpc(e.to_string()))?;
155
156 let output = response
157 .into_output()
158 .map_err(|e| CrdtError::Xrpc(e.to_string()))?;
159
160 Ok((output.uri.into_static(), output.cid.into_static()))
161}
162
163/// Create the edit root record for a document.
164pub async fn create_edit_root<C, D>(
165 client: &C,
166 doc: &D,
167 draft_key: &str,
168 entry_uri: Option<&AtUri<'_>>,
169 entry_cid: Option<&Cid<'_>>,
170) -> Result<CreateRootResult, CrdtError>
171where
172 C: XrpcClient + IdentityResolver + AgentSession,
173 D: CrdtDocument,
174{
175 let did = get_current_did(client).await?;
176
177 // For drafts, create the stub record first
178 let draft_ref: Option<StrongRef<'static>> = if entry_uri.is_none() {
179 let rkey = extract_draft_rkey(draft_key);
180 match create_draft_stub(client, &did, &rkey).await {
181 Ok((uri, cid)) => Some(StrongRef::new().uri(uri).cid(cid).build()),
182 Err(e) => {
183 let err_str = e.to_string();
184 if err_str.contains("RecordAlreadyExists") || err_str.contains("already exists") {
185 // Draft exists, try to fetch it
186 let draft_uri_str = format!("at://{}/{}/{}", did, DRAFT_NSID, rkey);
187 if let Ok(draft_uri) = AtUri::new(&draft_uri_str) {
188 if let Ok(response) = client.get_record::<Draft>(&draft_uri).await {
189 if let Ok(output) = response.into_output() {
190 output.cid.map(|cid| {
191 StrongRef::new()
192 .uri(draft_uri.into_static())
193 .cid(cid.into_static())
194 .build()
195 })
196 } else {
197 None
198 }
199 } else {
200 None
201 }
202 } else {
203 None
204 }
205 } else {
206 tracing::warn!("Failed to create draft stub: {}", e);
207 None
208 }
209 }
210 }
211 } else {
212 None
213 };
214
215 // Export full snapshot
216 let snapshot = doc.export_snapshot();
217
218 // Upload snapshot blob
219 let mime_type = MimeType::new_static("application/octet-stream");
220 let blob_ref = client
221 .upload_blob(snapshot, mime_type)
222 .await
223 .map_err(|e| CrdtError::Xrpc(format!("upload blob: {}", e)))?;
224
225 // Build DocRef
226 let doc_ref = build_doc_ref(&did, draft_key, entry_uri, entry_cid);
227
228 // Build root record
229 let root = Root::new().doc(doc_ref).snapshot(blob_ref).build();
230
231 let root_data = to_data(&root).map_err(|e| CrdtError::Serialization(format!("root: {}", e)))?;
232
233 // Generate TID for the root rkey
234 let root_tid = Ticker::new().next(None);
235 let rkey = RecordKey::any(root_tid.as_str())
236 .map_err(|e| CrdtError::InvalidUri(format!("rkey: {}", e)))?;
237
238 let collection =
239 Nsid::new(ROOT_NSID).map_err(|e| CrdtError::InvalidUri(format!("nsid: {}", e)))?;
240
241 let request = CreateRecord::new()
242 .repo(AtIdentifier::Did(did))
243 .collection(collection)
244 .rkey(rkey)
245 .record(root_data)
246 .build();
247
248 let response = client
249 .send(request)
250 .await
251 .map_err(|e| CrdtError::Xrpc(e.to_string()))?;
252
253 let output = response
254 .into_output()
255 .map_err(|e| CrdtError::Xrpc(e.to_string()))?;
256
257 Ok(CreateRootResult {
258 root_uri: output.uri.into_static(),
259 root_cid: output.cid.into_static(),
260 draft_ref,
261 })
262}
263
264/// Create a diff record with updates since the last sync.
265pub async fn create_diff<C, D>(
266 client: &C,
267 doc: &D,
268 root_uri: &AtUri<'_>,
269 root_cid: &Cid<'_>,
270 prev_diff: Option<(&AtUri<'_>, &Cid<'_>)>,
271 draft_key: &str,
272 entry_uri: Option<&AtUri<'_>>,
273 entry_cid: Option<&Cid<'_>>,
274) -> Result<Option<(AtUri<'static>, Cid<'static>)>, CrdtError>
275where
276 C: XrpcClient + IdentityResolver + AgentSession,
277 D: CrdtDocument,
278{
279 // Export updates since last sync
280 let updates = match doc.export_updates_since_sync() {
281 Some(u) => u,
282 None => return Ok(None),
283 };
284
285 let did = get_current_did(client).await?;
286
287 // Threshold for inline vs blob storage (8KB max for inline per lexicon)
288 const INLINE_THRESHOLD: usize = 8192;
289
290 let (blob_ref, inline_diff): (Option<jacquard::types::blob::BlobRef<'static>>, _) =
291 if updates.len() <= INLINE_THRESHOLD {
292 (None, Some(jacquard::bytes::Bytes::from(updates)))
293 } else {
294 let mime_type = MimeType::new_static("application/octet-stream");
295 let blob = client
296 .upload_blob(updates, mime_type)
297 .await
298 .map_err(|e| CrdtError::Xrpc(format!("upload diff: {}", e)))?;
299 (Some(blob.into()), None)
300 };
301
302 // Build DocRef
303 let doc_ref = build_doc_ref(&did, draft_key, entry_uri, entry_cid);
304
305 // Build root reference
306 let root_ref = StrongRef::new()
307 .uri(root_uri.clone().into_static())
308 .cid(root_cid.clone().into_static())
309 .build();
310
311 // Build prev reference
312 let prev_ref = prev_diff.map(|(uri, cid)| {
313 StrongRef::new()
314 .uri(uri.clone().into_static())
315 .cid(cid.clone().into_static())
316 .build()
317 });
318
319 // Build diff record
320 let diff = Diff::new()
321 .doc(doc_ref)
322 .root(root_ref)
323 .maybe_snapshot(blob_ref)
324 .maybe_inline_diff(inline_diff)
325 .maybe_prev(prev_ref)
326 .build();
327
328 let diff_data = to_data(&diff).map_err(|e| CrdtError::Serialization(format!("diff: {}", e)))?;
329
330 // Generate TID for the diff rkey
331 let diff_tid = Ticker::new().next(None);
332 let rkey = RecordKey::any(diff_tid.as_str())
333 .map_err(|e| CrdtError::InvalidUri(format!("rkey: {}", e)))?;
334
335 let collection =
336 Nsid::new(DIFF_NSID).map_err(|e| CrdtError::InvalidUri(format!("nsid: {}", e)))?;
337
338 let request = CreateRecord::new()
339 .repo(AtIdentifier::Did(did))
340 .collection(collection)
341 .rkey(rkey)
342 .record(diff_data)
343 .build();
344
345 let response = client
346 .send(request)
347 .await
348 .map_err(|e| CrdtError::Xrpc(e.to_string()))?;
349
350 let output = response
351 .into_output()
352 .map_err(|e| CrdtError::Xrpc(e.to_string()))?;
353
354 Ok(Some((output.uri.into_static(), output.cid.into_static())))
355}
356
357/// Sync the document to the PDS.
358pub async fn sync_to_pds<C, D>(
359 client: &C,
360 doc: &mut D,
361 draft_key: &str,
362 entry_uri: Option<&AtUri<'_>>,
363 entry_cid: Option<&Cid<'_>>,
364) -> Result<SyncResult, CrdtError>
365where
366 C: XrpcClient + IdentityResolver + AgentSession,
367 D: CrdtDocument,
368{
369 if !doc.has_unsynced_changes() {
370 return Ok(SyncResult::NoChanges);
371 }
372
373 if doc.edit_root().is_none() {
374 // First sync - create root
375 let result = create_edit_root(client, doc, draft_key, entry_uri, entry_cid).await?;
376
377 let root_ref = StrongRef::new()
378 .uri(result.root_uri.clone())
379 .cid(result.root_cid.clone())
380 .build();
381
382 doc.set_edit_root(Some(root_ref));
383 doc.set_last_diff(None);
384 doc.mark_synced();
385
386 Ok(SyncResult::CreatedRoot {
387 uri: result.root_uri,
388 cid: result.root_cid,
389 })
390 } else {
391 // Subsequent sync - create diff
392 let root = doc.edit_root().unwrap();
393 let prev = doc.last_diff();
394
395 let prev_refs = prev.as_ref().map(|p| (&p.uri, &p.cid));
396
397 let result = create_diff(
398 client, doc, &root.uri, &root.cid, prev_refs, draft_key, entry_uri, entry_cid,
399 )
400 .await?;
401
402 match result {
403 Some((uri, cid)) => {
404 let diff_ref = StrongRef::new().uri(uri.clone()).cid(cid.clone()).build();
405 doc.set_last_diff(Some(diff_ref));
406 doc.mark_synced();
407
408 Ok(SyncResult::CreatedDiff { uri, cid })
409 }
410 None => Ok(SyncResult::NoChanges),
411 }
412 }
413}
414
415/// Find all edit roots for an entry using weaver-index.
416#[cfg(feature = "use-index")]
417pub async fn find_all_edit_roots<C>(
418 client: &C,
419 entry_uri: &AtUri<'_>,
420 _collaborator_dids: Vec<Did<'static>>,
421) -> Result<Vec<RecordId<'static>>, CrdtError>
422where
423 C: WeaverExt,
424{
425 use jacquard::types::ident::AtIdentifier;
426 use jacquard::types::nsid::Nsid;
427 use weaver_api::sh_weaver::edit::get_edit_history::GetEditHistory;
428
429 let response = client
430 .send(GetEditHistory::new().resource(entry_uri.clone()).build())
431 .await
432 .map_err(|e| CrdtError::Xrpc(format!("get edit history: {}", e)))?;
433
434 let output = response
435 .into_output()
436 .map_err(|e| CrdtError::Xrpc(format!("parse edit history: {}", e)))?;
437
438 let roots: Vec<RecordId<'static>> = output
439 .roots
440 .into_iter()
441 .filter_map(|entry| {
442 let uri = AtUri::new(entry.uri.as_ref()).ok()?;
443 let did = match uri.authority() {
444 AtIdentifier::Did(d) => d.clone().into_static(),
445 _ => return None,
446 };
447 let rkey = uri.rkey()?.clone().into_static();
448 Some(RecordId {
449 did,
450 collection: Nsid::raw(ROOT_NSID).into_static(),
451 rkey,
452 })
453 })
454 .collect();
455
456 tracing::debug!("find_all_edit_roots (index): found {} roots", roots.len());
457
458 Ok(roots)
459}
460
461/// Find all edit roots for an entry using Constellation backlinks.
462#[cfg(not(feature = "use-index"))]
463pub async fn find_all_edit_roots<C>(
464 client: &C,
465 entry_uri: &AtUri<'_>,
466 collaborator_dids: Vec<Did<'static>>,
467) -> Result<Vec<RecordId<'static>>, CrdtError>
468where
469 C: XrpcClient,
470{
471 let constellation_url =
472 Url::parse(CONSTELLATION_URL).map_err(|e| CrdtError::InvalidUri(e.to_string()))?;
473
474 let query = GetBacklinksQuery {
475 subject: Uri::At(entry_uri.clone().into_static()),
476 source: format_smolstr!("{}:doc.value.entry.uri", ROOT_NSID).into(),
477 cursor: None,
478 did: collaborator_dids,
479 limit: 100,
480 };
481
482 let response = client
483 .xrpc(constellation_url)
484 .send(&query)
485 .await
486 .map_err(|e| CrdtError::Xrpc(e.to_string()))?;
487
488 let output = response
489 .into_output()
490 .map_err(|e| CrdtError::Xrpc(e.to_string()))?;
491
492 Ok(output
493 .records
494 .into_iter()
495 .map(|r| r.into_static())
496 .collect())
497}
498
499/// Find all diffs for a root record using Constellation backlinks.
500pub async fn find_diffs_for_root<C>(
501 client: &C,
502 root_uri: &AtUri<'_>,
503) -> Result<Vec<RecordId<'static>>, CrdtError>
504where
505 C: XrpcClient,
506{
507 let constellation_url =
508 Url::parse(CONSTELLATION_URL).map_err(|e| CrdtError::InvalidUri(e.to_string()))?;
509
510 let mut all_diffs = Vec::new();
511 let mut cursor: Option<String> = None;
512
513 loop {
514 let query = GetBacklinksQuery {
515 subject: Uri::At(root_uri.clone().into_static()),
516 source: format_smolstr!("{}:root.uri", DIFF_NSID).into(),
517 cursor: cursor.map(Into::into),
518 did: vec![],
519 limit: 100,
520 };
521
522 let response = client
523 .xrpc(constellation_url.clone())
524 .send(&query)
525 .await
526 .map_err(|e| CrdtError::Xrpc(e.to_string()))?;
527
528 let output = response
529 .into_output()
530 .map_err(|e| CrdtError::Xrpc(e.to_string()))?;
531
532 all_diffs.extend(output.records.into_iter().map(|r| r.into_static()));
533
534 match output.cursor {
535 Some(c) => cursor = Some(c.to_string()),
536 None => break,
537 }
538 }
539
540 Ok(all_diffs)
541}
542
543// ============================================================================
544// Loading functions
545// ============================================================================
546
547/// Result of loading edit state from PDS.
548#[derive(Clone, Debug)]
549pub struct PdsEditState {
550 /// The root record reference.
551 pub root_ref: StrongRef<'static>,
552 /// The latest diff reference (if any diffs exist).
553 pub last_diff_ref: Option<StrongRef<'static>>,
554 /// The Loro snapshot bytes from the root.
555 pub root_snapshot: Bytes,
556 /// All diff update bytes in order (oldest first, by TID).
557 pub diff_updates: Vec<Bytes>,
558 /// Last seen diff URI per collaborator root (for incremental sync).
559 pub last_seen_diffs: HashMap<AtUri<'static>, AtUri<'static>>,
560 /// The DocRef from the root record.
561 pub doc_ref: DocRef<'static>,
562}
563
564/// Find edit root for a draft using Constellation backlinks.
565pub async fn find_edit_root_for_draft<C>(
566 client: &C,
567 draft_uri: &AtUri<'_>,
568) -> Result<Option<RecordId<'static>>, CrdtError>
569where
570 C: XrpcClient,
571{
572 let constellation_url =
573 Url::parse(CONSTELLATION_URL).map_err(|e| CrdtError::InvalidUri(e.to_string()))?;
574
575 let query = GetBacklinksQuery {
576 subject: Uri::At(draft_uri.clone().into_static()),
577 source: format_smolstr!("{}:doc.value.draft_key", ROOT_NSID).into(),
578 cursor: None,
579 did: vec![],
580 limit: 1,
581 };
582
583 let response = client
584 .xrpc(constellation_url)
585 .send(&query)
586 .await
587 .map_err(|e| CrdtError::Xrpc(format!("constellation query: {}", e)))?;
588
589 let output = response
590 .into_output()
591 .map_err(|e| CrdtError::Xrpc(format!("parse constellation: {}", e)))?;
592
593 Ok(output.records.into_iter().next().map(|r| r.into_static()))
594}
595
596/// Build a canonical draft URI from draft key and DID.
597pub fn build_draft_uri(did: &Did<'_>, draft_key: &str) -> AtUri<'static> {
598 let rkey = extract_draft_rkey(draft_key);
599 let uri_str = format_smolstr!("at://{}/{}/{}", did, DRAFT_NSID, rkey);
600 AtUri::new(&uri_str).unwrap().into_static()
601}
602
603/// Load edit state from a root record ID.
604async fn load_edit_state_from_root_id<C>(
605 client: &C,
606 root_id: RecordId<'static>,
607 after_rkey: Option<&str>,
608) -> Result<Option<PdsEditState>, CrdtError>
609where
610 C: WeaverExt,
611{
612 let root_uri = AtUri::new(&format_smolstr!(
613 "at://{}/{}/{}",
614 root_id.did,
615 ROOT_NSID,
616 root_id.rkey.as_ref()
617 ))
618 .map_err(|e| CrdtError::InvalidUri(format!("root URI: {}", e)))?
619 .into_static();
620
621 let root_response = client
622 .get_record::<Root>(&root_uri)
623 .await
624 .map_err(|e| CrdtError::Xrpc(format!("fetch root: {}", e)))?;
625
626 let root_output = root_response
627 .into_output()
628 .map_err(|e| CrdtError::Xrpc(format!("parse root: {}", e)))?;
629
630 let root_cid = root_output
631 .cid
632 .ok_or_else(|| CrdtError::Xrpc("root missing CID".into()))?;
633
634 let root_ref = StrongRef::new()
635 .uri(root_uri.clone())
636 .cid(root_cid.into_static())
637 .build();
638
639 let doc_ref = root_output.value.doc.into_static();
640
641 let root_snapshot = client
642 .fetch_blob(&root_id.did, root_output.value.snapshot.blob().cid())
643 .await
644 .map_err(|e| CrdtError::Xrpc(format!("fetch snapshot blob: {}", e)))?;
645
646 let diff_ids = find_diffs_for_root(client, &root_uri).await?;
647
648 if diff_ids.is_empty() {
649 return Ok(Some(PdsEditState {
650 root_ref,
651 last_diff_ref: None,
652 root_snapshot,
653 diff_updates: vec![],
654 last_seen_diffs: HashMap::new(),
655 doc_ref,
656 }));
657 }
658
659 let mut diffs_by_rkey: BTreeMap<
660 CowStr<'static>,
661 (Diff<'static>, Cid<'static>, AtUri<'static>),
662 > = BTreeMap::new();
663
664 for diff_id in &diff_ids {
665 let rkey_str: &str = diff_id.rkey.as_ref();
666
667 if let Some(after) = after_rkey {
668 if rkey_str <= after {
669 continue;
670 }
671 }
672
673 let diff_uri = AtUri::new(&format_smolstr!(
674 "at://{}/{}/{}",
675 diff_id.did,
676 DIFF_NSID,
677 rkey_str
678 ))
679 .map_err(|e| CrdtError::InvalidUri(format!("diff URI: {}", e)))?
680 .into_static();
681
682 let diff_response = client
683 .get_record::<Diff>(&diff_uri)
684 .await
685 .map_err(|e| CrdtError::Xrpc(format!("fetch diff: {}", e)))?;
686
687 let diff_output = diff_response
688 .into_output()
689 .map_err(|e| CrdtError::Xrpc(format!("parse diff: {}", e)))?;
690
691 let diff_cid = diff_output
692 .cid
693 .ok_or_else(|| CrdtError::Xrpc("diff missing CID".into()))?;
694
695 diffs_by_rkey.insert(
696 rkey_str.to_cowstr().into_static(),
697 (
698 diff_output.value.into_static(),
699 diff_cid.into_static(),
700 diff_uri,
701 ),
702 );
703 }
704
705 let mut diff_updates = Vec::new();
706 let mut last_diff_ref = None;
707
708 for (_rkey, (diff, cid, uri)) in &diffs_by_rkey {
709 let diff_bytes = if let Some(ref inline) = diff.inline_diff {
710 inline.clone()
711 } else if let Some(ref snapshot) = diff.snapshot {
712 client
713 .fetch_blob(&root_id.did, snapshot.blob().cid())
714 .await
715 .map_err(|e| CrdtError::Xrpc(format!("fetch diff blob: {}", e)))?
716 } else {
717 tracing::warn!("Diff has neither inline_diff nor snapshot, skipping");
718 continue;
719 };
720
721 diff_updates.push(diff_bytes);
722 last_diff_ref = Some(StrongRef::new().uri(uri.clone()).cid(cid.clone()).build());
723 }
724
725 Ok(Some(PdsEditState {
726 root_ref,
727 last_diff_ref,
728 root_snapshot,
729 diff_updates,
730 last_seen_diffs: HashMap::new(),
731 doc_ref,
732 }))
733}
734
735/// Load edit state from PDS for an entry (single root).
736pub async fn load_edit_state_from_entry<C>(
737 client: &C,
738 entry_uri: &AtUri<'_>,
739 collaborator_dids: Vec<Did<'static>>,
740) -> Result<Option<PdsEditState>, CrdtError>
741where
742 C: WeaverExt,
743{
744 let root_id = match find_all_edit_roots(client, entry_uri, collaborator_dids)
745 .await?
746 .into_iter()
747 .next()
748 {
749 Some(id) => id,
750 None => return Ok(None),
751 };
752
753 load_edit_state_from_root_id(client, root_id, None).await
754}
755
756/// Load edit state from PDS for a draft.
757pub async fn load_edit_state_from_draft<C>(
758 client: &C,
759 draft_uri: &AtUri<'_>,
760) -> Result<Option<PdsEditState>, CrdtError>
761where
762 C: WeaverExt,
763{
764 let root_id = match find_edit_root_for_draft(client, draft_uri).await? {
765 Some(id) => id,
766 None => return Ok(None),
767 };
768
769 load_edit_state_from_root_id(client, root_id, None).await
770}
771
772/// Load and merge edit states from ALL collaborator repos.
773pub async fn load_all_edit_states<C>(
774 client: &C,
775 entry_uri: &AtUri<'_>,
776 collaborator_dids: Vec<Did<'static>>,
777 current_did: Option<&Did<'_>>,
778 last_seen_diffs: &HashMap<AtUri<'static>, AtUri<'static>>,
779) -> Result<Option<PdsEditState>, CrdtError>
780where
781 C: WeaverExt,
782{
783 let all_roots = find_all_edit_roots(client, entry_uri, collaborator_dids).await?;
784
785 if all_roots.is_empty() {
786 return Ok(None);
787 }
788
789 let merged_doc = LoroDoc::new();
790 let mut our_root_ref: Option<StrongRef<'static>> = None;
791 let mut our_last_diff_ref: Option<StrongRef<'static>> = None;
792 let mut merged_doc_ref: Option<DocRef<'static>> = None;
793 let mut updated_last_seen = last_seen_diffs.clone();
794
795 for root_id in all_roots {
796 let root_did = root_id.did.clone();
797
798 let root_uri = AtUri::new(&format_smolstr!(
799 "at://{}/{}/{}",
800 root_id.did,
801 ROOT_NSID,
802 root_id.rkey.as_ref()
803 ))
804 .ok()
805 .map(|u| u.into_static());
806
807 let after_rkey = root_uri.as_ref().and_then(|uri| {
808 last_seen_diffs
809 .get(uri)
810 .and_then(|diff_uri| diff_uri.rkey().map(|rk| rk.0.to_string()))
811 });
812
813 if let Some(pds_state) =
814 load_edit_state_from_root_id(client, root_id, after_rkey.as_deref()).await?
815 {
816 if let Err(e) = merged_doc.import(&pds_state.root_snapshot) {
817 tracing::warn!("Failed to import root snapshot from {}: {:?}", root_did, e);
818 continue;
819 }
820
821 for diff in &pds_state.diff_updates {
822 if let Err(e) = merged_doc.import(diff) {
823 tracing::warn!("Failed to import diff from {}: {:?}", root_did, e);
824 }
825 }
826
827 if let (Some(uri), Some(last_diff)) = (&root_uri, &pds_state.last_diff_ref) {
828 updated_last_seen.insert(uri.clone(), last_diff.uri.clone().into_static());
829 }
830
831 if merged_doc_ref.is_none() {
832 merged_doc_ref = Some(pds_state.doc_ref.clone());
833 }
834
835 let is_our_root = current_did.is_some_and(|did| root_did == *did);
836
837 if is_our_root {
838 our_root_ref = Some(pds_state.root_ref);
839 our_last_diff_ref = pds_state.last_diff_ref;
840 } else if our_root_ref.is_none() {
841 our_root_ref = Some(pds_state.root_ref);
842 our_last_diff_ref = pds_state.last_diff_ref;
843 }
844 }
845 }
846
847 let merged_snapshot = merged_doc
848 .export(ExportMode::Snapshot)
849 .map_err(|e| CrdtError::Loro(format!("export merged: {}", e)))?;
850
851 Ok(our_root_ref.map(|root_ref| PdsEditState {
852 root_ref,
853 last_diff_ref: our_last_diff_ref,
854 root_snapshot: merged_snapshot.into(),
855 diff_updates: vec![],
856 last_seen_diffs: updated_last_seen,
857 doc_ref: merged_doc_ref.expect("Should have doc_ref if we have root"),
858 }))
859}
860
861/// Remote draft info from PDS.
862#[derive(Clone, Debug)]
863pub struct RemoteDraft {
864 /// The draft record URI.
865 pub uri: AtUri<'static>,
866 /// The rkey (TID) of the draft.
867 pub rkey: String,
868 /// When the draft was created.
869 pub created_at: String,
870}
871
872/// List all drafts for a user using weaver-index.
873#[cfg(feature = "use-index")]
874pub async fn list_drafts<C>(client: &C, did: &Did<'_>) -> Result<Vec<RemoteDraft>, CrdtError>
875where
876 C: WeaverExt,
877{
878 use jacquard::types::ident::AtIdentifier;
879 use weaver_api::sh_weaver::edit::list_drafts::ListDrafts;
880
881 let actor = AtIdentifier::Did(did.clone().into_static());
882 let response = client
883 .send(ListDrafts::new().actor(actor).build())
884 .await
885 .map_err(|e| CrdtError::Xrpc(format!("list drafts: {}", e)))?;
886
887 let output = response
888 .into_output()
889 .map_err(|e| CrdtError::Xrpc(format!("parse list drafts: {}", e)))?;
890
891 tracing::debug!("list_drafts (index): found {} drafts", output.drafts.len());
892
893 let drafts = output
894 .drafts
895 .into_iter()
896 .filter_map(|draft| {
897 let uri = AtUri::new(draft.uri.as_ref()).ok()?.into_static();
898 let rkey = uri.rkey()?.0.as_str().to_string();
899 let created_at = draft.created_at.to_string();
900 Some(RemoteDraft {
901 uri,
902 rkey,
903 created_at,
904 })
905 })
906 .collect();
907
908 Ok(drafts)
909}
910
911/// List all drafts for a user (direct PDS query, no index).
912#[cfg(not(feature = "use-index"))]
913pub async fn list_drafts<C>(client: &C, did: &Did<'_>) -> Result<Vec<RemoteDraft>, CrdtError>
914where
915 C: WeaverExt,
916{
917 use weaver_api::com_atproto::repo::list_records::ListRecords;
918
919 let pds_url = client
920 .pds_for_did(did)
921 .await
922 .map_err(|e| CrdtError::Xrpc(format!("resolve DID: {}", e)))?;
923
924 let collection =
925 Nsid::new(DRAFT_NSID).map_err(|e| CrdtError::InvalidUri(format!("nsid: {}", e)))?;
926
927 let request = ListRecords::new()
928 .repo(did.clone())
929 .collection(collection)
930 .limit(100)
931 .build();
932
933 let response = client
934 .xrpc(pds_url)
935 .send(&request)
936 .await
937 .map_err(|e| CrdtError::Xrpc(format!("list records: {}", e)))?;
938
939 let output = response
940 .into_output()
941 .map_err(|e| CrdtError::Xrpc(format!("parse list records: {}", e)))?;
942
943 let mut drafts = Vec::new();
944 for record in output.records {
945 let rkey = record
946 .uri
947 .rkey()
948 .map(|r| r.0.as_str().to_string())
949 .unwrap_or_default();
950
951 let created_at = jacquard::from_data::<Draft>(&record.value)
952 .map(|d| d.created_at.to_string())
953 .unwrap_or_default();
954
955 drafts.push(RemoteDraft {
956 uri: record.uri.into_static(),
957 rkey,
958 created_at,
959 });
960 }
961
962 Ok(drafts)
963}