// Source: branch `main` (963 lines, 30 kB).
1//! PDS synchronization for CRDT documents. 2//! 3//! Generic sync logic for AT Protocol edit records (root/diff/draft). 4//! Works with any client implementing the required traits. 5 6use std::collections::{BTreeMap, HashMap}; 7 8use jacquard::bytes::Bytes; 9use jacquard::cowstr::ToCowStr; 10use jacquard::prelude::*; 11use jacquard::smol_str::format_smolstr; 12use jacquard::types::blob::MimeType; 13use jacquard::types::ident::AtIdentifier; 14use jacquard::types::recordkey::RecordKey; 15use jacquard::types::string::{AtUri, Cid, Did, Nsid}; 16use jacquard::types::tid::Ticker; 17use jacquard::types::uri::Uri; 18use jacquard::url::Url; 19use jacquard::{CowStr, IntoStatic, to_data}; 20use loro::{ExportMode, LoroDoc}; 21use weaver_api::com_atproto::repo::create_record::CreateRecord; 22use weaver_api::com_atproto::repo::strong_ref::StrongRef; 23use weaver_api::sh_weaver::edit::diff::Diff; 24use weaver_api::sh_weaver::edit::draft::Draft; 25use weaver_api::sh_weaver::edit::root::Root; 26use weaver_api::sh_weaver::edit::{DocRef, DocRefValue, DraftRef, EntryRef}; 27use weaver_common::agent::WeaverExt; 28use weaver_common::constellation::{GetBacklinksQuery, RecordId}; 29 30use crate::CrdtError; 31use crate::document::CrdtDocument; 32 33const ROOT_NSID: &str = "sh.weaver.edit.root"; 34const DIFF_NSID: &str = "sh.weaver.edit.diff"; 35const DRAFT_NSID: &str = "sh.weaver.edit.draft"; 36const CONSTELLATION_URL: &str = "https://constellation.microcosm.blue"; 37 38/// Result of a sync operation. 39#[derive(Clone, Debug)] 40pub enum SyncResult { 41 /// Created a new root record (first sync). 42 CreatedRoot { 43 uri: AtUri<'static>, 44 cid: Cid<'static>, 45 }, 46 /// Created a new diff record. 47 CreatedDiff { 48 uri: AtUri<'static>, 49 cid: Cid<'static>, 50 }, 51 /// No changes to sync. 52 NoChanges, 53} 54 55/// Result of creating an edit root. 56pub struct CreateRootResult { 57 /// The root record URI. 58 pub root_uri: AtUri<'static>, 59 /// The root record CID. 
60 pub root_cid: Cid<'static>, 61 /// Draft stub StrongRef if this was a new draft. 62 pub draft_ref: Option<StrongRef<'static>>, 63} 64 65/// Build a DocRef for either a published entry or an unpublished draft. 66fn build_doc_ref( 67 did: &Did<'_>, 68 draft_key: &str, 69 entry_uri: Option<&AtUri<'_>>, 70 entry_cid: Option<&Cid<'_>>, 71) -> DocRef<'static> { 72 match (entry_uri, entry_cid) { 73 (Some(uri), Some(cid)) => DocRef { 74 value: DocRefValue::EntryRef(Box::new(EntryRef { 75 entry: StrongRef::new() 76 .uri(uri.clone().into_static()) 77 .cid(cid.clone().into_static()) 78 .build(), 79 extra_data: None, 80 })), 81 extra_data: None, 82 }, 83 _ => { 84 // Transform localStorage key to synthetic AT-URI 85 let rkey = extract_draft_rkey(draft_key); 86 let canonical_uri = format_smolstr!("at://{}/{}/{}", did, DRAFT_NSID, rkey); 87 88 DocRef { 89 value: DocRefValue::DraftRef(Box::new(DraftRef { 90 draft_key: canonical_uri.into(), 91 extra_data: None, 92 })), 93 extra_data: None, 94 } 95 } 96 } 97} 98 99/// Extract the rkey (TID) from a draft key. 100fn extract_draft_rkey(draft_key: &str) -> String { 101 if let Some(tid) = draft_key.strip_prefix("new:") { 102 tid.to_string() 103 } else if draft_key.starts_with("at://") { 104 draft_key.split('/').last().unwrap_or(draft_key).to_string() 105 } else { 106 draft_key.to_string() 107 } 108} 109 110/// Get current DID from session. 111async fn get_current_did<C>(client: &C) -> Result<Did<'static>, CrdtError> 112where 113 C: AgentSession, 114{ 115 client 116 .session_info() 117 .await 118 .map(|(did, _)| did) 119 .ok_or(CrdtError::NotAuthenticated) 120} 121 122/// Create the draft stub record on PDS. 
123async fn create_draft_stub<C>( 124 client: &C, 125 did: &Did<'_>, 126 rkey: &str, 127) -> Result<(AtUri<'static>, Cid<'static>), CrdtError> 128where 129 C: XrpcClient + AgentSession, 130{ 131 let draft = Draft::new() 132 .created_at(jacquard::types::datetime::Datetime::now()) 133 .build(); 134 135 let draft_data = 136 to_data(&draft).map_err(|e| CrdtError::Serialization(format!("draft: {}", e)))?; 137 138 let record_key = 139 RecordKey::any(rkey).map_err(|e| CrdtError::InvalidUri(format!("rkey: {}", e)))?; 140 141 let collection = 142 Nsid::new(DRAFT_NSID).map_err(|e| CrdtError::InvalidUri(format!("nsid: {}", e)))?; 143 144 let request = CreateRecord::new() 145 .repo(AtIdentifier::Did(did.clone().into_static())) 146 .collection(collection) 147 .rkey(record_key) 148 .record(draft_data) 149 .build(); 150 151 let response = client 152 .send(request) 153 .await 154 .map_err(|e| CrdtError::Xrpc(e.to_string()))?; 155 156 let output = response 157 .into_output() 158 .map_err(|e| CrdtError::Xrpc(e.to_string()))?; 159 160 Ok((output.uri.into_static(), output.cid.into_static())) 161} 162 163/// Create the edit root record for a document. 
164pub async fn create_edit_root<C, D>( 165 client: &C, 166 doc: &D, 167 draft_key: &str, 168 entry_uri: Option<&AtUri<'_>>, 169 entry_cid: Option<&Cid<'_>>, 170) -> Result<CreateRootResult, CrdtError> 171where 172 C: XrpcClient + IdentityResolver + AgentSession, 173 D: CrdtDocument, 174{ 175 let did = get_current_did(client).await?; 176 177 // For drafts, create the stub record first 178 let draft_ref: Option<StrongRef<'static>> = if entry_uri.is_none() { 179 let rkey = extract_draft_rkey(draft_key); 180 match create_draft_stub(client, &did, &rkey).await { 181 Ok((uri, cid)) => Some(StrongRef::new().uri(uri).cid(cid).build()), 182 Err(e) => { 183 let err_str = e.to_string(); 184 if err_str.contains("RecordAlreadyExists") || err_str.contains("already exists") { 185 // Draft exists, try to fetch it 186 let draft_uri_str = format!("at://{}/{}/{}", did, DRAFT_NSID, rkey); 187 if let Ok(draft_uri) = AtUri::new(&draft_uri_str) { 188 if let Ok(response) = client.get_record::<Draft>(&draft_uri).await { 189 if let Ok(output) = response.into_output() { 190 output.cid.map(|cid| { 191 StrongRef::new() 192 .uri(draft_uri.into_static()) 193 .cid(cid.into_static()) 194 .build() 195 }) 196 } else { 197 None 198 } 199 } else { 200 None 201 } 202 } else { 203 None 204 } 205 } else { 206 tracing::warn!("Failed to create draft stub: {}", e); 207 None 208 } 209 } 210 } 211 } else { 212 None 213 }; 214 215 // Export full snapshot 216 let snapshot = doc.export_snapshot(); 217 218 // Upload snapshot blob 219 let mime_type = MimeType::new_static("application/octet-stream"); 220 let blob_ref = client 221 .upload_blob(snapshot, mime_type) 222 .await 223 .map_err(|e| CrdtError::Xrpc(format!("upload blob: {}", e)))?; 224 225 // Build DocRef 226 let doc_ref = build_doc_ref(&did, draft_key, entry_uri, entry_cid); 227 228 // Build root record 229 let root = Root::new().doc(doc_ref).snapshot(blob_ref).build(); 230 231 let root_data = to_data(&root).map_err(|e| 
CrdtError::Serialization(format!("root: {}", e)))?; 232 233 // Generate TID for the root rkey 234 let root_tid = Ticker::new().next(None); 235 let rkey = RecordKey::any(root_tid.as_str()) 236 .map_err(|e| CrdtError::InvalidUri(format!("rkey: {}", e)))?; 237 238 let collection = 239 Nsid::new(ROOT_NSID).map_err(|e| CrdtError::InvalidUri(format!("nsid: {}", e)))?; 240 241 let request = CreateRecord::new() 242 .repo(AtIdentifier::Did(did)) 243 .collection(collection) 244 .rkey(rkey) 245 .record(root_data) 246 .build(); 247 248 let response = client 249 .send(request) 250 .await 251 .map_err(|e| CrdtError::Xrpc(e.to_string()))?; 252 253 let output = response 254 .into_output() 255 .map_err(|e| CrdtError::Xrpc(e.to_string()))?; 256 257 Ok(CreateRootResult { 258 root_uri: output.uri.into_static(), 259 root_cid: output.cid.into_static(), 260 draft_ref, 261 }) 262} 263 264/// Create a diff record with updates since the last sync. 265pub async fn create_diff<C, D>( 266 client: &C, 267 doc: &D, 268 root_uri: &AtUri<'_>, 269 root_cid: &Cid<'_>, 270 prev_diff: Option<(&AtUri<'_>, &Cid<'_>)>, 271 draft_key: &str, 272 entry_uri: Option<&AtUri<'_>>, 273 entry_cid: Option<&Cid<'_>>, 274) -> Result<Option<(AtUri<'static>, Cid<'static>)>, CrdtError> 275where 276 C: XrpcClient + IdentityResolver + AgentSession, 277 D: CrdtDocument, 278{ 279 // Export updates since last sync 280 let updates = match doc.export_updates_since_sync() { 281 Some(u) => u, 282 None => return Ok(None), 283 }; 284 285 let did = get_current_did(client).await?; 286 287 // Threshold for inline vs blob storage (8KB max for inline per lexicon) 288 const INLINE_THRESHOLD: usize = 8192; 289 290 let (blob_ref, inline_diff): (Option<jacquard::types::blob::BlobRef<'static>>, _) = 291 if updates.len() <= INLINE_THRESHOLD { 292 (None, Some(jacquard::bytes::Bytes::from(updates))) 293 } else { 294 let mime_type = MimeType::new_static("application/octet-stream"); 295 let blob = client 296 .upload_blob(updates, mime_type) 297 
.await 298 .map_err(|e| CrdtError::Xrpc(format!("upload diff: {}", e)))?; 299 (Some(blob.into()), None) 300 }; 301 302 // Build DocRef 303 let doc_ref = build_doc_ref(&did, draft_key, entry_uri, entry_cid); 304 305 // Build root reference 306 let root_ref = StrongRef::new() 307 .uri(root_uri.clone().into_static()) 308 .cid(root_cid.clone().into_static()) 309 .build(); 310 311 // Build prev reference 312 let prev_ref = prev_diff.map(|(uri, cid)| { 313 StrongRef::new() 314 .uri(uri.clone().into_static()) 315 .cid(cid.clone().into_static()) 316 .build() 317 }); 318 319 // Build diff record 320 let diff = Diff::new() 321 .doc(doc_ref) 322 .root(root_ref) 323 .maybe_snapshot(blob_ref) 324 .maybe_inline_diff(inline_diff) 325 .maybe_prev(prev_ref) 326 .build(); 327 328 let diff_data = to_data(&diff).map_err(|e| CrdtError::Serialization(format!("diff: {}", e)))?; 329 330 // Generate TID for the diff rkey 331 let diff_tid = Ticker::new().next(None); 332 let rkey = RecordKey::any(diff_tid.as_str()) 333 .map_err(|e| CrdtError::InvalidUri(format!("rkey: {}", e)))?; 334 335 let collection = 336 Nsid::new(DIFF_NSID).map_err(|e| CrdtError::InvalidUri(format!("nsid: {}", e)))?; 337 338 let request = CreateRecord::new() 339 .repo(AtIdentifier::Did(did)) 340 .collection(collection) 341 .rkey(rkey) 342 .record(diff_data) 343 .build(); 344 345 let response = client 346 .send(request) 347 .await 348 .map_err(|e| CrdtError::Xrpc(e.to_string()))?; 349 350 let output = response 351 .into_output() 352 .map_err(|e| CrdtError::Xrpc(e.to_string()))?; 353 354 Ok(Some((output.uri.into_static(), output.cid.into_static()))) 355} 356 357/// Sync the document to the PDS. 
358pub async fn sync_to_pds<C, D>( 359 client: &C, 360 doc: &mut D, 361 draft_key: &str, 362 entry_uri: Option<&AtUri<'_>>, 363 entry_cid: Option<&Cid<'_>>, 364) -> Result<SyncResult, CrdtError> 365where 366 C: XrpcClient + IdentityResolver + AgentSession, 367 D: CrdtDocument, 368{ 369 if !doc.has_unsynced_changes() { 370 return Ok(SyncResult::NoChanges); 371 } 372 373 if doc.edit_root().is_none() { 374 // First sync - create root 375 let result = create_edit_root(client, doc, draft_key, entry_uri, entry_cid).await?; 376 377 let root_ref = StrongRef::new() 378 .uri(result.root_uri.clone()) 379 .cid(result.root_cid.clone()) 380 .build(); 381 382 doc.set_edit_root(Some(root_ref)); 383 doc.set_last_diff(None); 384 doc.mark_synced(); 385 386 Ok(SyncResult::CreatedRoot { 387 uri: result.root_uri, 388 cid: result.root_cid, 389 }) 390 } else { 391 // Subsequent sync - create diff 392 let root = doc.edit_root().unwrap(); 393 let prev = doc.last_diff(); 394 395 let prev_refs = prev.as_ref().map(|p| (&p.uri, &p.cid)); 396 397 let result = create_diff( 398 client, doc, &root.uri, &root.cid, prev_refs, draft_key, entry_uri, entry_cid, 399 ) 400 .await?; 401 402 match result { 403 Some((uri, cid)) => { 404 let diff_ref = StrongRef::new().uri(uri.clone()).cid(cid.clone()).build(); 405 doc.set_last_diff(Some(diff_ref)); 406 doc.mark_synced(); 407 408 Ok(SyncResult::CreatedDiff { uri, cid }) 409 } 410 None => Ok(SyncResult::NoChanges), 411 } 412 } 413} 414 415/// Find all edit roots for an entry using weaver-index. 
416#[cfg(feature = "use-index")] 417pub async fn find_all_edit_roots<C>( 418 client: &C, 419 entry_uri: &AtUri<'_>, 420 _collaborator_dids: Vec<Did<'static>>, 421) -> Result<Vec<RecordId<'static>>, CrdtError> 422where 423 C: WeaverExt, 424{ 425 use jacquard::types::ident::AtIdentifier; 426 use jacquard::types::nsid::Nsid; 427 use weaver_api::sh_weaver::edit::get_edit_history::GetEditHistory; 428 429 let response = client 430 .send(GetEditHistory::new().resource(entry_uri.clone()).build()) 431 .await 432 .map_err(|e| CrdtError::Xrpc(format!("get edit history: {}", e)))?; 433 434 let output = response 435 .into_output() 436 .map_err(|e| CrdtError::Xrpc(format!("parse edit history: {}", e)))?; 437 438 let roots: Vec<RecordId<'static>> = output 439 .roots 440 .into_iter() 441 .filter_map(|entry| { 442 let uri = AtUri::new(entry.uri.as_ref()).ok()?; 443 let did = match uri.authority() { 444 AtIdentifier::Did(d) => d.clone().into_static(), 445 _ => return None, 446 }; 447 let rkey = uri.rkey()?.clone().into_static(); 448 Some(RecordId { 449 did, 450 collection: Nsid::raw(ROOT_NSID).into_static(), 451 rkey, 452 }) 453 }) 454 .collect(); 455 456 tracing::debug!("find_all_edit_roots (index): found {} roots", roots.len()); 457 458 Ok(roots) 459} 460 461/// Find all edit roots for an entry using Constellation backlinks. 
462#[cfg(not(feature = "use-index"))] 463pub async fn find_all_edit_roots<C>( 464 client: &C, 465 entry_uri: &AtUri<'_>, 466 collaborator_dids: Vec<Did<'static>>, 467) -> Result<Vec<RecordId<'static>>, CrdtError> 468where 469 C: XrpcClient, 470{ 471 let constellation_url = 472 Url::parse(CONSTELLATION_URL).map_err(|e| CrdtError::InvalidUri(e.to_string()))?; 473 474 let query = GetBacklinksQuery { 475 subject: Uri::At(entry_uri.clone().into_static()), 476 source: format_smolstr!("{}:doc.value.entry.uri", ROOT_NSID).into(), 477 cursor: None, 478 did: collaborator_dids, 479 limit: 100, 480 }; 481 482 let response = client 483 .xrpc(constellation_url) 484 .send(&query) 485 .await 486 .map_err(|e| CrdtError::Xrpc(e.to_string()))?; 487 488 let output = response 489 .into_output() 490 .map_err(|e| CrdtError::Xrpc(e.to_string()))?; 491 492 Ok(output 493 .records 494 .into_iter() 495 .map(|r| r.into_static()) 496 .collect()) 497} 498 499/// Find all diffs for a root record using Constellation backlinks. 
500pub async fn find_diffs_for_root<C>( 501 client: &C, 502 root_uri: &AtUri<'_>, 503) -> Result<Vec<RecordId<'static>>, CrdtError> 504where 505 C: XrpcClient, 506{ 507 let constellation_url = 508 Url::parse(CONSTELLATION_URL).map_err(|e| CrdtError::InvalidUri(e.to_string()))?; 509 510 let mut all_diffs = Vec::new(); 511 let mut cursor: Option<String> = None; 512 513 loop { 514 let query = GetBacklinksQuery { 515 subject: Uri::At(root_uri.clone().into_static()), 516 source: format_smolstr!("{}:root.uri", DIFF_NSID).into(), 517 cursor: cursor.map(Into::into), 518 did: vec![], 519 limit: 100, 520 }; 521 522 let response = client 523 .xrpc(constellation_url.clone()) 524 .send(&query) 525 .await 526 .map_err(|e| CrdtError::Xrpc(e.to_string()))?; 527 528 let output = response 529 .into_output() 530 .map_err(|e| CrdtError::Xrpc(e.to_string()))?; 531 532 all_diffs.extend(output.records.into_iter().map(|r| r.into_static())); 533 534 match output.cursor { 535 Some(c) => cursor = Some(c.to_string()), 536 None => break, 537 } 538 } 539 540 Ok(all_diffs) 541} 542 543// ============================================================================ 544// Loading functions 545// ============================================================================ 546 547/// Result of loading edit state from PDS. 548#[derive(Clone, Debug)] 549pub struct PdsEditState { 550 /// The root record reference. 551 pub root_ref: StrongRef<'static>, 552 /// The latest diff reference (if any diffs exist). 553 pub last_diff_ref: Option<StrongRef<'static>>, 554 /// The Loro snapshot bytes from the root. 555 pub root_snapshot: Bytes, 556 /// All diff update bytes in order (oldest first, by TID). 557 pub diff_updates: Vec<Bytes>, 558 /// Last seen diff URI per collaborator root (for incremental sync). 559 pub last_seen_diffs: HashMap<AtUri<'static>, AtUri<'static>>, 560 /// The DocRef from the root record. 
561 pub doc_ref: DocRef<'static>, 562} 563 564/// Find edit root for a draft using Constellation backlinks. 565pub async fn find_edit_root_for_draft<C>( 566 client: &C, 567 draft_uri: &AtUri<'_>, 568) -> Result<Option<RecordId<'static>>, CrdtError> 569where 570 C: XrpcClient, 571{ 572 let constellation_url = 573 Url::parse(CONSTELLATION_URL).map_err(|e| CrdtError::InvalidUri(e.to_string()))?; 574 575 let query = GetBacklinksQuery { 576 subject: Uri::At(draft_uri.clone().into_static()), 577 source: format_smolstr!("{}:doc.value.draft_key", ROOT_NSID).into(), 578 cursor: None, 579 did: vec![], 580 limit: 1, 581 }; 582 583 let response = client 584 .xrpc(constellation_url) 585 .send(&query) 586 .await 587 .map_err(|e| CrdtError::Xrpc(format!("constellation query: {}", e)))?; 588 589 let output = response 590 .into_output() 591 .map_err(|e| CrdtError::Xrpc(format!("parse constellation: {}", e)))?; 592 593 Ok(output.records.into_iter().next().map(|r| r.into_static())) 594} 595 596/// Build a canonical draft URI from draft key and DID. 597pub fn build_draft_uri(did: &Did<'_>, draft_key: &str) -> AtUri<'static> { 598 let rkey = extract_draft_rkey(draft_key); 599 let uri_str = format_smolstr!("at://{}/{}/{}", did, DRAFT_NSID, rkey); 600 AtUri::new(&uri_str).unwrap().into_static() 601} 602 603/// Load edit state from a root record ID. 604async fn load_edit_state_from_root_id<C>( 605 client: &C, 606 root_id: RecordId<'static>, 607 after_rkey: Option<&str>, 608) -> Result<Option<PdsEditState>, CrdtError> 609where 610 C: WeaverExt, 611{ 612 let root_uri = AtUri::new(&format_smolstr!( 613 "at://{}/{}/{}", 614 root_id.did, 615 ROOT_NSID, 616 root_id.rkey.as_ref() 617 )) 618 .map_err(|e| CrdtError::InvalidUri(format!("root URI: {}", e)))? 
619 .into_static(); 620 621 let root_response = client 622 .get_record::<Root>(&root_uri) 623 .await 624 .map_err(|e| CrdtError::Xrpc(format!("fetch root: {}", e)))?; 625 626 let root_output = root_response 627 .into_output() 628 .map_err(|e| CrdtError::Xrpc(format!("parse root: {}", e)))?; 629 630 let root_cid = root_output 631 .cid 632 .ok_or_else(|| CrdtError::Xrpc("root missing CID".into()))?; 633 634 let root_ref = StrongRef::new() 635 .uri(root_uri.clone()) 636 .cid(root_cid.into_static()) 637 .build(); 638 639 let doc_ref = root_output.value.doc.into_static(); 640 641 let root_snapshot = client 642 .fetch_blob(&root_id.did, root_output.value.snapshot.blob().cid()) 643 .await 644 .map_err(|e| CrdtError::Xrpc(format!("fetch snapshot blob: {}", e)))?; 645 646 let diff_ids = find_diffs_for_root(client, &root_uri).await?; 647 648 if diff_ids.is_empty() { 649 return Ok(Some(PdsEditState { 650 root_ref, 651 last_diff_ref: None, 652 root_snapshot, 653 diff_updates: vec![], 654 last_seen_diffs: HashMap::new(), 655 doc_ref, 656 })); 657 } 658 659 let mut diffs_by_rkey: BTreeMap< 660 CowStr<'static>, 661 (Diff<'static>, Cid<'static>, AtUri<'static>), 662 > = BTreeMap::new(); 663 664 for diff_id in &diff_ids { 665 let rkey_str: &str = diff_id.rkey.as_ref(); 666 667 if let Some(after) = after_rkey { 668 if rkey_str <= after { 669 continue; 670 } 671 } 672 673 let diff_uri = AtUri::new(&format_smolstr!( 674 "at://{}/{}/{}", 675 diff_id.did, 676 DIFF_NSID, 677 rkey_str 678 )) 679 .map_err(|e| CrdtError::InvalidUri(format!("diff URI: {}", e)))? 
680 .into_static(); 681 682 let diff_response = client 683 .get_record::<Diff>(&diff_uri) 684 .await 685 .map_err(|e| CrdtError::Xrpc(format!("fetch diff: {}", e)))?; 686 687 let diff_output = diff_response 688 .into_output() 689 .map_err(|e| CrdtError::Xrpc(format!("parse diff: {}", e)))?; 690 691 let diff_cid = diff_output 692 .cid 693 .ok_or_else(|| CrdtError::Xrpc("diff missing CID".into()))?; 694 695 diffs_by_rkey.insert( 696 rkey_str.to_cowstr().into_static(), 697 ( 698 diff_output.value.into_static(), 699 diff_cid.into_static(), 700 diff_uri, 701 ), 702 ); 703 } 704 705 let mut diff_updates = Vec::new(); 706 let mut last_diff_ref = None; 707 708 for (_rkey, (diff, cid, uri)) in &diffs_by_rkey { 709 let diff_bytes = if let Some(ref inline) = diff.inline_diff { 710 inline.clone() 711 } else if let Some(ref snapshot) = diff.snapshot { 712 client 713 .fetch_blob(&root_id.did, snapshot.blob().cid()) 714 .await 715 .map_err(|e| CrdtError::Xrpc(format!("fetch diff blob: {}", e)))? 716 } else { 717 tracing::warn!("Diff has neither inline_diff nor snapshot, skipping"); 718 continue; 719 }; 720 721 diff_updates.push(diff_bytes); 722 last_diff_ref = Some(StrongRef::new().uri(uri.clone()).cid(cid.clone()).build()); 723 } 724 725 Ok(Some(PdsEditState { 726 root_ref, 727 last_diff_ref, 728 root_snapshot, 729 diff_updates, 730 last_seen_diffs: HashMap::new(), 731 doc_ref, 732 })) 733} 734 735/// Load edit state from PDS for an entry (single root). 736pub async fn load_edit_state_from_entry<C>( 737 client: &C, 738 entry_uri: &AtUri<'_>, 739 collaborator_dids: Vec<Did<'static>>, 740) -> Result<Option<PdsEditState>, CrdtError> 741where 742 C: WeaverExt, 743{ 744 let root_id = match find_all_edit_roots(client, entry_uri, collaborator_dids) 745 .await? 746 .into_iter() 747 .next() 748 { 749 Some(id) => id, 750 None => return Ok(None), 751 }; 752 753 load_edit_state_from_root_id(client, root_id, None).await 754} 755 756/// Load edit state from PDS for a draft. 
757pub async fn load_edit_state_from_draft<C>( 758 client: &C, 759 draft_uri: &AtUri<'_>, 760) -> Result<Option<PdsEditState>, CrdtError> 761where 762 C: WeaverExt, 763{ 764 let root_id = match find_edit_root_for_draft(client, draft_uri).await? { 765 Some(id) => id, 766 None => return Ok(None), 767 }; 768 769 load_edit_state_from_root_id(client, root_id, None).await 770} 771 772/// Load and merge edit states from ALL collaborator repos. 773pub async fn load_all_edit_states<C>( 774 client: &C, 775 entry_uri: &AtUri<'_>, 776 collaborator_dids: Vec<Did<'static>>, 777 current_did: Option<&Did<'_>>, 778 last_seen_diffs: &HashMap<AtUri<'static>, AtUri<'static>>, 779) -> Result<Option<PdsEditState>, CrdtError> 780where 781 C: WeaverExt, 782{ 783 let all_roots = find_all_edit_roots(client, entry_uri, collaborator_dids).await?; 784 785 if all_roots.is_empty() { 786 return Ok(None); 787 } 788 789 let merged_doc = LoroDoc::new(); 790 let mut our_root_ref: Option<StrongRef<'static>> = None; 791 let mut our_last_diff_ref: Option<StrongRef<'static>> = None; 792 let mut merged_doc_ref: Option<DocRef<'static>> = None; 793 let mut updated_last_seen = last_seen_diffs.clone(); 794 795 for root_id in all_roots { 796 let root_did = root_id.did.clone(); 797 798 let root_uri = AtUri::new(&format_smolstr!( 799 "at://{}/{}/{}", 800 root_id.did, 801 ROOT_NSID, 802 root_id.rkey.as_ref() 803 )) 804 .ok() 805 .map(|u| u.into_static()); 806 807 let after_rkey = root_uri.as_ref().and_then(|uri| { 808 last_seen_diffs 809 .get(uri) 810 .and_then(|diff_uri| diff_uri.rkey().map(|rk| rk.0.to_string())) 811 }); 812 813 if let Some(pds_state) = 814 load_edit_state_from_root_id(client, root_id, after_rkey.as_deref()).await? 
815 { 816 if let Err(e) = merged_doc.import(&pds_state.root_snapshot) { 817 tracing::warn!("Failed to import root snapshot from {}: {:?}", root_did, e); 818 continue; 819 } 820 821 for diff in &pds_state.diff_updates { 822 if let Err(e) = merged_doc.import(diff) { 823 tracing::warn!("Failed to import diff from {}: {:?}", root_did, e); 824 } 825 } 826 827 if let (Some(uri), Some(last_diff)) = (&root_uri, &pds_state.last_diff_ref) { 828 updated_last_seen.insert(uri.clone(), last_diff.uri.clone().into_static()); 829 } 830 831 if merged_doc_ref.is_none() { 832 merged_doc_ref = Some(pds_state.doc_ref.clone()); 833 } 834 835 let is_our_root = current_did.is_some_and(|did| root_did == *did); 836 837 if is_our_root { 838 our_root_ref = Some(pds_state.root_ref); 839 our_last_diff_ref = pds_state.last_diff_ref; 840 } else if our_root_ref.is_none() { 841 our_root_ref = Some(pds_state.root_ref); 842 our_last_diff_ref = pds_state.last_diff_ref; 843 } 844 } 845 } 846 847 let merged_snapshot = merged_doc 848 .export(ExportMode::Snapshot) 849 .map_err(|e| CrdtError::Loro(format!("export merged: {}", e)))?; 850 851 Ok(our_root_ref.map(|root_ref| PdsEditState { 852 root_ref, 853 last_diff_ref: our_last_diff_ref, 854 root_snapshot: merged_snapshot.into(), 855 diff_updates: vec![], 856 last_seen_diffs: updated_last_seen, 857 doc_ref: merged_doc_ref.expect("Should have doc_ref if we have root"), 858 })) 859} 860 861/// Remote draft info from PDS. 862#[derive(Clone, Debug)] 863pub struct RemoteDraft { 864 /// The draft record URI. 865 pub uri: AtUri<'static>, 866 /// The rkey (TID) of the draft. 867 pub rkey: String, 868 /// When the draft was created. 869 pub created_at: String, 870} 871 872/// List all drafts for a user using weaver-index. 
873#[cfg(feature = "use-index")] 874pub async fn list_drafts<C>(client: &C, did: &Did<'_>) -> Result<Vec<RemoteDraft>, CrdtError> 875where 876 C: WeaverExt, 877{ 878 use jacquard::types::ident::AtIdentifier; 879 use weaver_api::sh_weaver::edit::list_drafts::ListDrafts; 880 881 let actor = AtIdentifier::Did(did.clone().into_static()); 882 let response = client 883 .send(ListDrafts::new().actor(actor).build()) 884 .await 885 .map_err(|e| CrdtError::Xrpc(format!("list drafts: {}", e)))?; 886 887 let output = response 888 .into_output() 889 .map_err(|e| CrdtError::Xrpc(format!("parse list drafts: {}", e)))?; 890 891 tracing::debug!("list_drafts (index): found {} drafts", output.drafts.len()); 892 893 let drafts = output 894 .drafts 895 .into_iter() 896 .filter_map(|draft| { 897 let uri = AtUri::new(draft.uri.as_ref()).ok()?.into_static(); 898 let rkey = uri.rkey()?.0.as_str().to_string(); 899 let created_at = draft.created_at.to_string(); 900 Some(RemoteDraft { 901 uri, 902 rkey, 903 created_at, 904 }) 905 }) 906 .collect(); 907 908 Ok(drafts) 909} 910 911/// List all drafts for a user (direct PDS query, no index). 
912#[cfg(not(feature = "use-index"))] 913pub async fn list_drafts<C>(client: &C, did: &Did<'_>) -> Result<Vec<RemoteDraft>, CrdtError> 914where 915 C: WeaverExt, 916{ 917 use weaver_api::com_atproto::repo::list_records::ListRecords; 918 919 let pds_url = client 920 .pds_for_did(did) 921 .await 922 .map_err(|e| CrdtError::Xrpc(format!("resolve DID: {}", e)))?; 923 924 let collection = 925 Nsid::new(DRAFT_NSID).map_err(|e| CrdtError::InvalidUri(format!("nsid: {}", e)))?; 926 927 let request = ListRecords::new() 928 .repo(did.clone()) 929 .collection(collection) 930 .limit(100) 931 .build(); 932 933 let response = client 934 .xrpc(pds_url) 935 .send(&request) 936 .await 937 .map_err(|e| CrdtError::Xrpc(format!("list records: {}", e)))?; 938 939 let output = response 940 .into_output() 941 .map_err(|e| CrdtError::Xrpc(format!("parse list records: {}", e)))?; 942 943 let mut drafts = Vec::new(); 944 for record in output.records { 945 let rkey = record 946 .uri 947 .rkey() 948 .map(|r| r.0.as_str().to_string()) 949 .unwrap_or_default(); 950 951 let created_at = jacquard::from_data::<Draft>(&record.value) 952 .map(|d| d.created_at.to_string()) 953 .unwrap_or_default(); 954 955 drafts.push(RemoteDraft { 956 uri: record.uri.into_static(), 957 rkey, 958 created_at, 959 }); 960 } 961 962 Ok(drafts) 963}