···83pub fn CollabCoordinator(props: CollabCoordinatorProps) -> Element {
84 #[cfg(target_arch = "wasm32")]
85 {
86- use super::worker::{WorkerInput, WorkerOutput};
87 use crate::collab_context::CollabDebugState;
88 use crate::fetch::Fetcher;
89 use futures_util::stream::SplitSink;
···92 use gloo_worker::reactor::ReactorBridge;
93 use jacquard::IntoStatic;
94 use weaver_common::WeaverExt;
95-
96-    use super::worker::EditorReactor;
97
98    let fetcher = use_context::<Fetcher>();
99
···83pub fn CollabCoordinator(props: CollabCoordinatorProps) -> Element {
84 #[cfg(target_arch = "wasm32")]
85 {
86    use crate::collab_context::CollabDebugState;
87 use crate::fetch::Fetcher;
88 use futures_util::stream::SplitSink;
···91 use gloo_worker::reactor::ReactorBridge;
92 use jacquard::IntoStatic;
93 use weaver_common::WeaverExt;
94+ use weaver_editor_crdt::{EditorReactor, WorkerInput, WorkerOutput};
95
96    let fetcher = use_context::<Fetcher>();
97
···1-//! Web Worker for offloading expensive editor operations.
2//!
3//! This worker maintains a shadow copy of the Loro document and handles
4//! CPU-intensive operations like snapshot export and base64 encoding
5//! off the main thread.
6//!
7-//! When the `collab-worker` feature is enabled, also handles iroh P2P
8//! networking for real-time collaboration.
9
10#[cfg(all(target_family = "wasm", target_os = "unknown"))]
···138 use gloo_worker::reactor::{ReactorScope, reactor};
139 use weaver_common::transport::CollaboratorInfo;
140
141-    #[cfg(feature = "collab-worker")]
142 use jacquard::smol_str::ToSmolStr;
143- #[cfg(feature = "collab-worker")]
144 use std::sync::Arc;
145- #[cfg(feature = "collab-worker")]
146 use weaver_common::transport::{
147 CollabMessage, CollabNode, CollabSession, PresenceTracker, SessionEvent, TopicId,
148 parse_node_id,
149 };
150
151    /// Internal event from gossip handler task to main reactor loop.
152- #[cfg(feature = "collab-worker")]
153 enum CollabEvent {
154 RemoteUpdates { data: Vec<u8> },
155 PresenceChanged(PresenceSnapshot),
···162 let mut doc: Option<loro::LoroDoc> = None;
163 let mut draft_key = SmolStr::default();
164
165-    // Collab state (only used when collab-worker feature enabled)
166- #[cfg(feature = "collab-worker")]
167 let mut collab_node: Option<Arc<CollabNode>> = None;
168- #[cfg(feature = "collab-worker")]
169 let mut collab_session: Option<Arc<CollabSession>> = None;
170- #[cfg(feature = "collab-worker")]
171 let mut collab_event_rx: Option<tokio::sync::mpsc::UnboundedReceiver<CollabEvent>> = None;
172- #[cfg(feature = "collab-worker")]
173 const OUR_COLOR: u32 = 0x4ECDC4FF;
174
175    // Helper enum for racing coordinator messages vs collab events
176- #[cfg(feature = "collab-worker")]
177 enum RaceResult {
178 CoordinatorMsg(Option<WorkerInput>),
179 CollabEvent(Option<CollabEvent>),
···181
182    loop {
183 // Race between coordinator messages and collab events
184- #[cfg(feature = "collab-worker")]
185 let race_result = if let Some(ref mut event_rx) = collab_event_rx {
186 use n0_future::FutureExt;
187 let coord_fut = async { RaceResult::CoordinatorMsg(scope.next().await) };
···191 RaceResult::CoordinatorMsg(scope.next().await)
192 };
193
194-        #[cfg(feature = "collab-worker")]
195 match race_result {
196 RaceResult::CollabEvent(Some(event)) => {
197 match event {
···281 continue;
282 };
283
284-                        let export_start = crate::perf::now();
285 let snapshot_bytes = match doc.export(loro::ExportMode::Snapshot) {
286 Ok(bytes) => bytes,
287 Err(e) => {
···298 continue;
299 }
300 };
301- let export_ms = crate::perf::now() - export_start;
302
303-                        let encode_start = crate::perf::now();
304 let b64_snapshot = BASE64.encode(&snapshot_bytes);
305- let encode_ms = crate::perf::now() - encode_start;
306
307                        let content = doc.get_text("content").to_string();
308 let title: SmolStr = doc.get_text("title").to_string().into();
···327 }
328
329                    // ============================================================
330- // Collab handlers - full impl when collab-worker feature enabled
331 // ============================================================
332- #[cfg(feature = "collab-worker")]
333 WorkerInput::StartCollab {
334 topic,
335 bootstrap_peers,
···388 "Failed to send CollabJoined to coordinator: {e}"
389 );
390 }
391-
392-                        // NOTE: Don't broadcast Join here - wait for BroadcastJoin message
393- // after peers have been added via AddPeers
394
395                        // Create channel for events from spawned task
396 let (event_tx, event_rx) =
···461 selection,
462 ..
463 } => {
464- // Note: cursor updates require the collaborator to exist
465- // (added via Join message)
466 let exists = presence.contains(&from);
467 tracing::debug!(%from, position, ?selection, exists, "Received Cursor message");
468 presence.update_cursor(
···485 }
486 SessionEvent::PeerJoined(peer) => {
487 tracing::info!(%peer, "PeerJoined - notifying coordinator");
488- // Notify coordinator so it can send BroadcastJoin
489- // Don't add to presence yet - wait for their Join message
490 if event_tx
491 .send(CollabEvent::PeerConnected)
492 .is_err()
···531 }
532 }
533
534-                    #[cfg(feature = "collab-worker")]
535 WorkerInput::BroadcastUpdate { data } => {
536 if let Some(ref session) = collab_session {
537 let msg = CollabMessage::LoroUpdate {
···544 }
545 }
546
547-                    #[cfg(feature = "collab-worker")]
548 WorkerInput::BroadcastCursor {
549 position,
550 selection,
···572 }
573 }
574
575-                    #[cfg(feature = "collab-worker")]
576 WorkerInput::AddPeers { peers } => {
577 tracing::info!(count = peers.len(), "Worker: received AddPeers");
578 if let Some(ref session) = collab_session {
579 let peer_ids: Vec<_> = peers
580- .iter()
581- .filter_map(|s| {
582- match parse_node_id(s) {
583- Ok(id) => Some(id),
584- Err(e) => {
585- tracing::warn!(node_id = %s, error = %e, "Failed to parse node_id");
586- None
587- }
588- }
589- })
590- .collect();
591 tracing::info!(
592 parsed_count = peer_ids.len(),
593 "Worker: joining peers"
···600 }
601 }
602
603-                    #[cfg(feature = "collab-worker")]
604 WorkerInput::BroadcastJoin { did, display_name } => {
605 if let Some(ref session) = collab_session {
606 let join_msg = CollabMessage::Join { did, display_name };
···610 }
611 }
612
613-                    #[cfg(feature = "collab-worker")]
614 WorkerInput::StopCollab => {
615 collab_session = None;
616 collab_node = None;
···619 tracing::error!("Failed to send CollabStopped to coordinator: {e}");
620 }
621 }
622                } // end match msg
623 } // end RaceResult::CoordinatorMsg(Some(msg))
624 } // end match race_result
625
626-        // Non-collab-worker: simple message loop
627- #[cfg(not(feature = "collab-worker"))]
628 {
629 let Some(msg) = scope.next().await else { break };
630 tracing::debug!(?msg, "Worker: received message");
···679 }
680 continue;
681 };
682- let export_start = crate::perf::now();
683 let snapshot_bytes = match doc.export(loro::ExportMode::Snapshot) {
684 Ok(bytes) => bytes,
685 Err(e) => {
···696 continue;
697 }
698 };
699- let export_ms = crate::perf::now() - export_start;
700- let encode_start = crate::perf::now();
701 let b64_snapshot = BASE64.encode(&snapshot_bytes);
702- let encode_ms = crate::perf::now() - encode_start;
703 let content = doc.get_text("content").to_string();
704 let title: SmolStr = doc.get_text("title").to_string().into();
705 if let Err(e) = scope
···720 tracing::error!("Failed to send Snapshot to coordinator: {e}");
721 }
722 }
723- // Collab stubs for non-collab-worker build
724 WorkerInput::StartCollab { .. } => {
725 if let Err(e) = scope
726 .send(WorkerOutput::Error {
···746 }
747
748    /// Convert PresenceTracker to serializable PresenceSnapshot.
749- #[cfg(feature = "collab-worker")]
750 fn presence_to_snapshot(tracker: &PresenceTracker) -> PresenceSnapshot {
751        let collaborators = tracker
752 .collaborators()
753 .map(|c| CollaboratorInfo {
···1+//! Web Worker reactor for offloading expensive editor operations.
2//!
3//! This worker maintains a shadow copy of the Loro document and handles
4//! CPU-intensive operations like snapshot export and base64 encoding
5//! off the main thread.
6//!
7+//! When the `collab` feature is enabled, also handles iroh P2P
8//! networking for real-time collaboration.
9
10#[cfg(all(target_family = "wasm", target_os = "unknown"))]
···138 use gloo_worker::reactor::{ReactorScope, reactor};
139 use weaver_common::transport::CollaboratorInfo;
140
141+    #[cfg(feature = "collab")]
142 use jacquard::smol_str::ToSmolStr;
143+ #[cfg(feature = "collab")]
144 use std::sync::Arc;
145+ #[cfg(feature = "collab")]
146 use weaver_common::transport::{
147 CollabMessage, CollabNode, CollabSession, PresenceTracker, SessionEvent, TopicId,
148 parse_node_id,
149 };
150
151    /// Internal event from gossip handler task to main reactor loop.
152+ #[cfg(feature = "collab")]
153 enum CollabEvent {
154 RemoteUpdates { data: Vec<u8> },
155 PresenceChanged(PresenceSnapshot),
···162 let mut doc: Option<loro::LoroDoc> = None;
163 let mut draft_key = SmolStr::default();
164
165+    // Collab state (only used when collab feature enabled)
166+ #[cfg(feature = "collab")]
167 let mut collab_node: Option<Arc<CollabNode>> = None;
168+ #[cfg(feature = "collab")]
169 let mut collab_session: Option<Arc<CollabSession>> = None;
170+ #[cfg(feature = "collab")]
171 let mut collab_event_rx: Option<tokio::sync::mpsc::UnboundedReceiver<CollabEvent>> = None;
172+ #[cfg(feature = "collab")]
173 const OUR_COLOR: u32 = 0x4ECDC4FF;
174
175    // Helper enum for racing coordinator messages vs collab events
176+ #[cfg(feature = "collab")]
177 enum RaceResult {
178 CoordinatorMsg(Option<WorkerInput>),
179 CollabEvent(Option<CollabEvent>),
···181
182    loop {
183 // Race between coordinator messages and collab events
184+ #[cfg(feature = "collab")]
185 let race_result = if let Some(ref mut event_rx) = collab_event_rx {
186 use n0_future::FutureExt;
187 let coord_fut = async { RaceResult::CoordinatorMsg(scope.next().await) };
···191 RaceResult::CoordinatorMsg(scope.next().await)
192 };
193
194+        #[cfg(feature = "collab")]
195 match race_result {
196 RaceResult::CollabEvent(Some(event)) => {
197 match event {
···281 continue;
282 };
283
284+                        let export_start = weaver_common::perf::now();
285 let snapshot_bytes = match doc.export(loro::ExportMode::Snapshot) {
286 Ok(bytes) => bytes,
287 Err(e) => {
···298 continue;
299 }
300 };
301+ let export_ms = weaver_common::perf::now() - export_start;
302
303+                        let encode_start = weaver_common::perf::now();
304 let b64_snapshot = BASE64.encode(&snapshot_bytes);
305+ let encode_ms = weaver_common::perf::now() - encode_start;
306
307                        let content = doc.get_text("content").to_string();
308 let title: SmolStr = doc.get_text("title").to_string().into();
···327 }
328
329                    // ============================================================
330+ // Collab handlers - full impl when collab feature enabled
331 // ============================================================
332+ #[cfg(feature = "collab")]
333 WorkerInput::StartCollab {
334 topic,
335 bootstrap_peers,
···388 "Failed to send CollabJoined to coordinator: {e}"
389 );
390 }
391
392                        // Create channel for events from spawned task
393 let (event_tx, event_rx) =
···458 selection,
459 ..
460 } => {
461                                    let exists = presence.contains(&from);
462 tracing::debug!(%from, position, ?selection, exists, "Received Cursor message");
463 presence.update_cursor(
···480 }
481 SessionEvent::PeerJoined(peer) => {
482 tracing::info!(%peer, "PeerJoined - notifying coordinator");
483                                    if event_tx
484 .send(CollabEvent::PeerConnected)
485 .is_err()
···524 }
525 }
526
527+                    #[cfg(feature = "collab")]
528 WorkerInput::BroadcastUpdate { data } => {
529 if let Some(ref session) = collab_session {
530 let msg = CollabMessage::LoroUpdate {
···537 }
538 }
539
540+                    #[cfg(feature = "collab")]
541 WorkerInput::BroadcastCursor {
542 position,
543 selection,
···565 }
566 }
567
568+                    #[cfg(feature = "collab")]
569 WorkerInput::AddPeers { peers } => {
570 tracing::info!(count = peers.len(), "Worker: received AddPeers");
571 if let Some(ref session) = collab_session {
572 let peer_ids: Vec<_> = peers
573+ .iter()
574+ .filter_map(|s| {
575+ match parse_node_id(s) {
576+ Ok(id) => Some(id),
577+ Err(e) => {
578+ tracing::warn!(node_id = %s, error = %e, "Failed to parse node_id");
579+ None
580+ }
581+ }
582+ })
583+ .collect();
584 tracing::info!(
585 parsed_count = peer_ids.len(),
586 "Worker: joining peers"
···593 }
594 }
595
596+                    #[cfg(feature = "collab")]
597 WorkerInput::BroadcastJoin { did, display_name } => {
598 if let Some(ref session) = collab_session {
599 let join_msg = CollabMessage::Join { did, display_name };
···603 }
604 }
605
606+                    #[cfg(feature = "collab")]
607 WorkerInput::StopCollab => {
608 collab_session = None;
609 collab_node = None;
···612 tracing::error!("Failed to send CollabStopped to coordinator: {e}");
613 }
614 }
615+
616+                    // Stubs for the collab message variants when the collab feature is not enabled
617+ #[cfg(not(feature = "collab"))]
618+ WorkerInput::StartCollab { .. } => {
619+ if let Err(e) = scope
620+ .send(WorkerOutput::Error {
621+ message: "Collab not enabled".into(),
622+ })
623+ .await
624+ {
625+ tracing::error!("Failed to send Error to coordinator: {e}");
626+ }
627+ }
628+ #[cfg(not(feature = "collab"))]
629+ WorkerInput::BroadcastUpdate { .. } => {}
630+ #[cfg(not(feature = "collab"))]
631+ WorkerInput::AddPeers { .. } => {}
632+ #[cfg(not(feature = "collab"))]
633+ WorkerInput::BroadcastJoin { .. } => {}
634+ #[cfg(not(feature = "collab"))]
635+ WorkerInput::BroadcastCursor { .. } => {}
636+ #[cfg(not(feature = "collab"))]
637+ WorkerInput::StopCollab => {
638+ if let Err(e) = scope.send(WorkerOutput::CollabStopped).await {
639+ tracing::error!("Failed to send CollabStopped to coordinator: {e}");
640+ }
641+ }
642 } // end match msg
643 } // end RaceResult::CoordinatorMsg(Some(msg))
644 } // end match race_result
645
646+        // Non-collab: simple message loop
647+ #[cfg(not(feature = "collab"))]
648 {
649 let Some(msg) = scope.next().await else { break };
650 tracing::debug!(?msg, "Worker: received message");
···699 }
700 continue;
701 };
702+ let export_start = weaver_common::perf::now();
703 let snapshot_bytes = match doc.export(loro::ExportMode::Snapshot) {
704 Ok(bytes) => bytes,
705 Err(e) => {
···716 continue;
717 }
718 };
719+ let export_ms = weaver_common::perf::now() - export_start;
720+ let encode_start = weaver_common::perf::now();
721 let b64_snapshot = BASE64.encode(&snapshot_bytes);
722+ let encode_ms = weaver_common::perf::now() - encode_start;
723 let content = doc.get_text("content").to_string();
724 let title: SmolStr = doc.get_text("title").to_string().into();
725 if let Err(e) = scope
···740 tracing::error!("Failed to send Snapshot to coordinator: {e}");
741 }
742 }
743+ // Collab stubs for non-collab build
744 WorkerInput::StartCollab { .. } => {
745 if let Err(e) = scope
746 .send(WorkerOutput::Error {
···766 }
767
768    /// Convert PresenceTracker to serializable PresenceSnapshot.
769+ #[cfg(feature = "collab")]
770 fn presence_to_snapshot(tracker: &PresenceTracker) -> PresenceSnapshot {
771+ use jacquard::smol_str::ToSmolStr;
772 let collaborators = tracker
773 .collaborators()
774 .map(|c| CollaboratorInfo {
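The coordinator side that drives this reactor is mostly elided above, but the imports shown earlier (ReactorBridge, SplitSink, and the re-exported WorkerInput/WorkerOutput) imply its shape. The sketch below is illustrative only and not part of the diff: it assumes a ReactorBridge<EditorReactor> has already been spawned elsewhere, and that the bridge is the paired Sink/Stream that the SplitSink import suggests.

#[cfg(target_arch = "wasm32")]
async fn stop_collab(bridge: gloo_worker::reactor::ReactorBridge<weaver_editor_crdt::EditorReactor>) {
    use futures_util::{SinkExt, StreamExt};
    use weaver_editor_crdt::{WorkerInput, WorkerOutput};

    // Split the bridge into a sink of coordinator -> worker inputs and a stream
    // of worker -> coordinator outputs.
    let (mut to_worker, mut from_worker) = bridge.split();

    // Ask the worker to tear down its collab session.
    if to_worker.send(WorkerInput::StopCollab).await.is_err() {
        tracing::error!("worker bridge closed before StopCollab could be sent");
        return;
    }

    // Wait for the acknowledgement (or an error) the reactor sends back.
    while let Some(output) = from_worker.next().await {
        match output {
            WorkerOutput::CollabStopped => break,
            WorkerOutput::Error { message } => {
                tracing::warn!(%message, "worker reported an error");
                break;
            }
            _ => {} // snapshots, presence updates, etc. are ignored in this sketch
        }
    }
}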
crates/weaver-common/src/agent.rs  (+29)
···2589 Ok(contributors.into_iter().collect())
2590 }
2591 }
2592}
2593
2594/// A version of a record from a collaborator's repository.
···2589 Ok(contributors.into_iter().collect())
2590 }
2591 }
2592+
2593+    /// Fetch a blob from any PDS by DID and CID.
2594+ ///
2595+ /// Resolves the DID to find its PDS, then fetches the blob.
2596+ fn fetch_blob<'a>(
2597+ &'a self,
2598+ did: &'a Did<'_>,
2599+ cid: &'a jacquard::types::string::Cid<'_>,
2600+ ) -> impl Future<Output = Result<Bytes, WeaverError>> + 'a {
2601+ async move {
2602+ use weaver_api::com_atproto::sync::get_blob::GetBlob;
2603+
2604+            let pds_url = self.pds_for_did(did).await.map_err(|e| {
2605+ AgentError::from(ClientError::from(e).with_context("Failed to resolve PDS for DID"))
2606+ })?;
2607+
2608+            let request = GetBlob::new().did(did.clone()).cid(cid.clone()).build();
2609+2610+ let response = self
2611+ .xrpc(pds_url)
2612+ .send(&request)
2613+ .await
2614+ .map_err(|e| AgentError::from(ClientError::from(e)))?;
2615+
2616+            let output = response.into_output().map_err(|e| AgentError::xrpc(e))?;
2617+
2618+            Ok(output.body)
2619+ }
2620+ }
2621}
2622
2623/// A version of a record from a collaborator's repository.
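A usage sketch for the new helper, not taken from the diff: it assumes fetch_blob lands on the WeaverExt extension trait (the bound under which the sync module later in this PR calls it), and the did/cid values are whatever the caller already holds.

use jacquard::bytes::Bytes;
use jacquard::types::string::{Cid, Did};
use weaver_common::agent::WeaverExt;

// Hypothetical helper: fetch a collaborator's snapshot blob, logging on failure.
async fn load_snapshot_blob<C: WeaverExt>(
    client: &C,
    did: &Did<'_>,
    cid: &Cid<'_>,
) -> Option<Bytes> {
    match client.fetch_blob(did, cid).await {
        Ok(bytes) => Some(bytes),
        Err(e) => {
            tracing::warn!("fetch_blob failed: {}", e);
            None
        }
    }
}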
···1+//! Entry point for the editor web worker.
2+//!
3+//! This binary is compiled separately and loaded by the main app
4+//! to handle CPU-intensive editor operations off the main thread.
5+
6+#[cfg(all(target_family = "wasm", target_os = "unknown"))]
7+fn main() {
8+ console_error_panic_hook::set_once();
9+ use tracing::Level;
10+ use tracing::subscriber::set_global_default;
11+ use tracing_subscriber::Registry;
12+ use tracing_subscriber::filter::EnvFilter;
13+ use tracing_subscriber::layer::SubscriberExt;
14+
15+    let console_level = if cfg!(debug_assertions) {
16+ Level::DEBUG
17+ } else {
18+ Level::DEBUG
19+ };
20+
21+    let wasm_layer = tracing_wasm::WASMLayer::new(
22+ tracing_wasm::WASMLayerConfigBuilder::new()
23+ .set_max_level(console_level)
24+ .build(),
25+ );
26+
27+    // Filter out noisy crates
28+ let filter = EnvFilter::new(
29+ "debug,loro_internal=warn,jacquard_identity=info,jacquard_common=info,iroh=info",
30+ );
31+
32+    let reg = Registry::default().with(filter).with(wasm_layer);
33+
34+    let _ = set_global_default(reg);
35+
36+    use gloo_worker::Registrable;
37+ use weaver_editor_crdt::EditorReactor;
38+
39+    EditorReactor::registrar().register();
40+}
41+
42+#[cfg(not(all(target_family = "wasm", target_os = "unknown")))]
43+fn main() {
44+ eprintln!("This binary is only meant to run as a WASM web worker");
45+}
···1+//! PDS synchronization for CRDT documents.
2+//!
3+//! Generic sync logic for AT Protocol edit records (root/diff/draft).
4+//! Works with any client implementing the required traits.
5+6+use std::collections::{BTreeMap, HashMap};
7+8+use jacquard::bytes::Bytes;
9+use jacquard::cowstr::ToCowStr;
10+use jacquard::prelude::*;
11+use jacquard::smol_str::format_smolstr;
12+use jacquard::types::blob::MimeType;
13+use jacquard::types::ident::AtIdentifier;
14+use jacquard::types::recordkey::RecordKey;
15+use jacquard::types::string::{AtUri, Cid, Did, Nsid};
16+use jacquard::types::tid::Ticker;
17+use jacquard::types::uri::Uri;
18+use jacquard::url::Url;
19+use jacquard::{CowStr, IntoStatic, to_data};
20+use loro::{ExportMode, LoroDoc};
21+use weaver_api::com_atproto::repo::create_record::CreateRecord;
22+use weaver_api::com_atproto::repo::strong_ref::StrongRef;
23+use weaver_api::sh_weaver::edit::diff::Diff;
24+use weaver_api::sh_weaver::edit::draft::Draft;
25+use weaver_api::sh_weaver::edit::root::Root;
26+use weaver_api::sh_weaver::edit::{DocRef, DocRefValue, DraftRef, EntryRef};
27+use weaver_common::agent::WeaverExt;
28+use weaver_common::constellation::{GetBacklinksQuery, RecordId};
29+30+use crate::CrdtError;
31+use crate::document::CrdtDocument;
32+33+const ROOT_NSID: &str = "sh.weaver.edit.root";
34+const DIFF_NSID: &str = "sh.weaver.edit.diff";
35+const DRAFT_NSID: &str = "sh.weaver.edit.draft";
36+const CONSTELLATION_URL: &str = "https://constellation.microcosm.blue";
37+38+/// Result of a sync operation.
39+#[derive(Clone, Debug)]
40+pub enum SyncResult {
41+ /// Created a new root record (first sync).
42+ CreatedRoot {
43+ uri: AtUri<'static>,
44+ cid: Cid<'static>,
45+ },
46+ /// Created a new diff record.
47+ CreatedDiff {
48+ uri: AtUri<'static>,
49+ cid: Cid<'static>,
50+ },
51+ /// No changes to sync.
52+ NoChanges,
53+}
54+55+/// Result of creating an edit root.
56+pub struct CreateRootResult {
57+ /// The root record URI.
58+ pub root_uri: AtUri<'static>,
59+ /// The root record CID.
60+ pub root_cid: Cid<'static>,
61+ /// Draft stub StrongRef if this was a new draft.
62+ pub draft_ref: Option<StrongRef<'static>>,
63+}
64+65+/// Build a DocRef for either a published entry or an unpublished draft.
66+fn build_doc_ref(
67+ did: &Did<'_>,
68+ draft_key: &str,
69+ entry_uri: Option<&AtUri<'_>>,
70+ entry_cid: Option<&Cid<'_>>,
71+) -> DocRef<'static> {
72+ match (entry_uri, entry_cid) {
73+ (Some(uri), Some(cid)) => DocRef {
74+ value: DocRefValue::EntryRef(Box::new(EntryRef {
75+ entry: StrongRef::new()
76+ .uri(uri.clone().into_static())
77+ .cid(cid.clone().into_static())
78+ .build(),
79+ extra_data: None,
80+ })),
81+ extra_data: None,
82+ },
83+ _ => {
84+ // Transform localStorage key to synthetic AT-URI
85+ let rkey = extract_draft_rkey(draft_key);
86+ let canonical_uri = format_smolstr!("at://{}/{}/{}", did, DRAFT_NSID, rkey);
87+88+ DocRef {
89+ value: DocRefValue::DraftRef(Box::new(DraftRef {
90+ draft_key: canonical_uri.into(),
91+ extra_data: None,
92+ })),
93+ extra_data: None,
94+ }
95+ }
96+ }
97+}
98+99+/// Extract the rkey (TID) from a draft key.
100+fn extract_draft_rkey(draft_key: &str) -> String {
101+ if let Some(tid) = draft_key.strip_prefix("new:") {
102+ tid.to_string()
103+ } else if draft_key.starts_with("at://") {
104+ draft_key.split('/').last().unwrap_or(draft_key).to_string()
105+ } else {
106+ draft_key.to_string()
107+ }
108+}
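// Illustrative mapping only, not part of this file, using a hypothetical TID:
//   "new:3lexampletid22"                                        -> "3lexampletid22"
//   "at://did:plc:example/sh.weaver.edit.draft/3lexampletid22"  -> "3lexampletid22"
//   "3lexampletid22"                                            -> "3lexampletid22"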
109+110+/// Get current DID from session.
111+async fn get_current_did<C>(client: &C) -> Result<Did<'static>, CrdtError>
112+where
113+ C: AgentSession,
114+{
115+ client
116+ .session_info()
117+ .await
118+ .map(|(did, _)| did)
119+ .ok_or(CrdtError::NotAuthenticated)
120+}
121+122+/// Create the draft stub record on PDS.
123+async fn create_draft_stub<C>(
124+ client: &C,
125+ did: &Did<'_>,
126+ rkey: &str,
127+) -> Result<(AtUri<'static>, Cid<'static>), CrdtError>
128+where
129+ C: XrpcClient + AgentSession,
130+{
131+ let draft = Draft::new()
132+ .created_at(jacquard::types::datetime::Datetime::now())
133+ .build();
134+135+ let draft_data =
136+ to_data(&draft).map_err(|e| CrdtError::Serialization(format!("draft: {}", e)))?;
137+138+ let record_key =
139+ RecordKey::any(rkey).map_err(|e| CrdtError::InvalidUri(format!("rkey: {}", e)))?;
140+141+ let collection =
142+ Nsid::new(DRAFT_NSID).map_err(|e| CrdtError::InvalidUri(format!("nsid: {}", e)))?;
143+144+ let request = CreateRecord::new()
145+ .repo(AtIdentifier::Did(did.clone().into_static()))
146+ .collection(collection)
147+ .rkey(record_key)
148+ .record(draft_data)
149+ .build();
150+151+ let response = client
152+ .send(request)
153+ .await
154+ .map_err(|e| CrdtError::Xrpc(e.to_string()))?;
155+156+ let output = response
157+ .into_output()
158+ .map_err(|e| CrdtError::Xrpc(e.to_string()))?;
159+160+ Ok((output.uri.into_static(), output.cid.into_static()))
161+}
162+163+/// Create the edit root record for a document.
164+pub async fn create_edit_root<C, D>(
165+ client: &C,
166+ doc: &D,
167+ draft_key: &str,
168+ entry_uri: Option<&AtUri<'_>>,
169+ entry_cid: Option<&Cid<'_>>,
170+) -> Result<CreateRootResult, CrdtError>
171+where
172+ C: XrpcClient + IdentityResolver + AgentSession,
173+ D: CrdtDocument,
174+{
175+ let did = get_current_did(client).await?;
176+177+ // For drafts, create the stub record first
178+ let draft_ref: Option<StrongRef<'static>> = if entry_uri.is_none() {
179+ let rkey = extract_draft_rkey(draft_key);
180+ match create_draft_stub(client, &did, &rkey).await {
181+ Ok((uri, cid)) => Some(StrongRef::new().uri(uri).cid(cid).build()),
182+ Err(e) => {
183+ let err_str = e.to_string();
184+ if err_str.contains("RecordAlreadyExists") || err_str.contains("already exists") {
185+ // Draft exists, try to fetch it
186+ let draft_uri_str = format!("at://{}/{}/{}", did, DRAFT_NSID, rkey);
187+ if let Ok(draft_uri) = AtUri::new(&draft_uri_str) {
188+ if let Ok(response) = client.get_record::<Draft>(&draft_uri).await {
189+ if let Ok(output) = response.into_output() {
190+ output.cid.map(|cid| {
191+ StrongRef::new()
192+ .uri(draft_uri.into_static())
193+ .cid(cid.into_static())
194+ .build()
195+ })
196+ } else {
197+ None
198+ }
199+ } else {
200+ None
201+ }
202+ } else {
203+ None
204+ }
205+ } else {
206+ tracing::warn!("Failed to create draft stub: {}", e);
207+ None
208+ }
209+ }
210+ }
211+ } else {
212+ None
213+ };
214+215+ // Export full snapshot
216+ let snapshot = doc.export_snapshot();
217+218+ // Upload snapshot blob
219+ let mime_type = MimeType::new_static("application/octet-stream");
220+ let blob_ref = client
221+ .upload_blob(snapshot, mime_type)
222+ .await
223+ .map_err(|e| CrdtError::Xrpc(format!("upload blob: {}", e)))?;
224+225+ // Build DocRef
226+ let doc_ref = build_doc_ref(&did, draft_key, entry_uri, entry_cid);
227+228+ // Build root record
229+ let root = Root::new().doc(doc_ref).snapshot(blob_ref).build();
230+231+ let root_data = to_data(&root).map_err(|e| CrdtError::Serialization(format!("root: {}", e)))?;
232+233+ // Generate TID for the root rkey
234+ let root_tid = Ticker::new().next(None);
235+ let rkey = RecordKey::any(root_tid.as_str())
236+ .map_err(|e| CrdtError::InvalidUri(format!("rkey: {}", e)))?;
237+238+ let collection =
239+ Nsid::new(ROOT_NSID).map_err(|e| CrdtError::InvalidUri(format!("nsid: {}", e)))?;
240+241+ let request = CreateRecord::new()
242+ .repo(AtIdentifier::Did(did))
243+ .collection(collection)
244+ .rkey(rkey)
245+ .record(root_data)
246+ .build();
247+248+ let response = client
249+ .send(request)
250+ .await
251+ .map_err(|e| CrdtError::Xrpc(e.to_string()))?;
252+253+ let output = response
254+ .into_output()
255+ .map_err(|e| CrdtError::Xrpc(e.to_string()))?;
256+257+ Ok(CreateRootResult {
258+ root_uri: output.uri.into_static(),
259+ root_cid: output.cid.into_static(),
260+ draft_ref,
261+ })
262+}
263+264+/// Create a diff record with updates since the last sync.
265+pub async fn create_diff<C, D>(
266+ client: &C,
267+ doc: &D,
268+ root_uri: &AtUri<'_>,
269+ root_cid: &Cid<'_>,
270+ prev_diff: Option<(&AtUri<'_>, &Cid<'_>)>,
271+ draft_key: &str,
272+ entry_uri: Option<&AtUri<'_>>,
273+ entry_cid: Option<&Cid<'_>>,
274+) -> Result<Option<(AtUri<'static>, Cid<'static>)>, CrdtError>
275+where
276+ C: XrpcClient + IdentityResolver + AgentSession,
277+ D: CrdtDocument,
278+{
279+ // Export updates since last sync
280+ let updates = match doc.export_updates_since_sync() {
281+ Some(u) => u,
282+ None => return Ok(None),
283+ };
284+285+ let did = get_current_did(client).await?;
286+287+ // Threshold for inline vs blob storage (8KB max for inline per lexicon)
288+ const INLINE_THRESHOLD: usize = 8192;
289+290+ let (blob_ref, inline_diff): (Option<jacquard::types::blob::BlobRef<'static>>, _) =
291+ if updates.len() <= INLINE_THRESHOLD {
292+ (None, Some(jacquard::bytes::Bytes::from(updates)))
293+ } else {
294+ let mime_type = MimeType::new_static("application/octet-stream");
295+ let blob = client
296+ .upload_blob(updates, mime_type)
297+ .await
298+ .map_err(|e| CrdtError::Xrpc(format!("upload diff: {}", e)))?;
299+ (Some(blob.into()), None)
300+ };
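// Illustrative only, not part of this file: with the 8 KiB threshold above, a
// 4 KiB update is carried in the diff record's inline_diff field, while a 20 KiB
// update is uploaded as a blob and referenced from the record's snapshot field.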
301+302+ // Build DocRef
303+ let doc_ref = build_doc_ref(&did, draft_key, entry_uri, entry_cid);
304+305+ // Build root reference
306+ let root_ref = StrongRef::new()
307+ .uri(root_uri.clone().into_static())
308+ .cid(root_cid.clone().into_static())
309+ .build();
310+311+ // Build prev reference
312+ let prev_ref = prev_diff.map(|(uri, cid)| {
313+ StrongRef::new()
314+ .uri(uri.clone().into_static())
315+ .cid(cid.clone().into_static())
316+ .build()
317+ });
318+319+ // Build diff record
320+ let diff = Diff::new()
321+ .doc(doc_ref)
322+ .root(root_ref)
323+ .maybe_snapshot(blob_ref)
324+ .maybe_inline_diff(inline_diff)
325+ .maybe_prev(prev_ref)
326+ .build();
327+328+ let diff_data = to_data(&diff).map_err(|e| CrdtError::Serialization(format!("diff: {}", e)))?;
329+330+ // Generate TID for the diff rkey
331+ let diff_tid = Ticker::new().next(None);
332+ let rkey = RecordKey::any(diff_tid.as_str())
333+ .map_err(|e| CrdtError::InvalidUri(format!("rkey: {}", e)))?;
334+335+ let collection =
336+ Nsid::new(DIFF_NSID).map_err(|e| CrdtError::InvalidUri(format!("nsid: {}", e)))?;
337+338+ let request = CreateRecord::new()
339+ .repo(AtIdentifier::Did(did))
340+ .collection(collection)
341+ .rkey(rkey)
342+ .record(diff_data)
343+ .build();
344+345+ let response = client
346+ .send(request)
347+ .await
348+ .map_err(|e| CrdtError::Xrpc(e.to_string()))?;
349+350+ let output = response
351+ .into_output()
352+ .map_err(|e| CrdtError::Xrpc(e.to_string()))?;
353+354+ Ok(Some((output.uri.into_static(), output.cid.into_static())))
355+}
356+357+/// Sync the document to the PDS.
358+pub async fn sync_to_pds<C, D>(
359+ client: &C,
360+ doc: &mut D,
361+ draft_key: &str,
362+ entry_uri: Option<&AtUri<'_>>,
363+ entry_cid: Option<&Cid<'_>>,
364+) -> Result<SyncResult, CrdtError>
365+where
366+ C: XrpcClient + IdentityResolver + AgentSession,
367+ D: CrdtDocument,
368+{
369+ if !doc.has_unsynced_changes() {
370+ return Ok(SyncResult::NoChanges);
371+ }
372+373+ if doc.edit_root().is_none() {
374+ // First sync - create root
375+ let result = create_edit_root(client, doc, draft_key, entry_uri, entry_cid).await?;
376+377+ let root_ref = StrongRef::new()
378+ .uri(result.root_uri.clone())
379+ .cid(result.root_cid.clone())
380+ .build();
381+382+ doc.set_edit_root(Some(root_ref));
383+ doc.set_last_diff(None);
384+ doc.mark_synced();
385+386+ Ok(SyncResult::CreatedRoot {
387+ uri: result.root_uri,
388+ cid: result.root_cid,
389+ })
390+ } else {
391+ // Subsequent sync - create diff
392+ let root = doc.edit_root().unwrap();
393+ let prev = doc.last_diff();
394+395+ let prev_refs = prev.as_ref().map(|p| (&p.uri, &p.cid));
396+397+ let result = create_diff(
398+ client, doc, &root.uri, &root.cid, prev_refs, draft_key, entry_uri, entry_cid,
399+ )
400+ .await?;
401+402+ match result {
403+ Some((uri, cid)) => {
404+ let diff_ref = StrongRef::new().uri(uri.clone()).cid(cid.clone()).build();
405+ doc.set_last_diff(Some(diff_ref));
406+ doc.mark_synced();
407+408+ Ok(SyncResult::CreatedDiff { uri, cid })
409+ }
410+ None => Ok(SyncResult::NoChanges),
411+ }
412+ }
413+}
414+415+/// Find all edit roots for an entry using weaver-index.
416+#[cfg(feature = "use-index")]
417+pub async fn find_all_edit_roots<C>(
418+ client: &C,
419+ entry_uri: &AtUri<'_>,
420+ _collaborator_dids: Vec<Did<'static>>,
421+) -> Result<Vec<RecordId<'static>>, CrdtError>
422+where
423+ C: WeaverExt,
424+{
425+ use jacquard::types::ident::AtIdentifier;
426+ use jacquard::types::nsid::Nsid;
427+ use weaver_api::sh_weaver::edit::get_edit_history::GetEditHistory;
428+429+ let response = client
430+ .send(GetEditHistory::new().resource(entry_uri.clone()).build())
431+ .await
432+ .map_err(|e| CrdtError::Xrpc(format!("get edit history: {}", e)))?;
433+434+ let output = response
435+ .into_output()
436+ .map_err(|e| CrdtError::Xrpc(format!("parse edit history: {}", e)))?;
437+438+ let roots: Vec<RecordId<'static>> = output
439+ .roots
440+ .into_iter()
441+ .filter_map(|entry| {
442+ let uri = AtUri::new(entry.uri.as_ref()).ok()?;
443+ let did = match uri.authority() {
444+ AtIdentifier::Did(d) => d.clone().into_static(),
445+ _ => return None,
446+ };
447+ let rkey = uri.rkey()?.clone().into_static();
448+ Some(RecordId {
449+ did,
450+ collection: Nsid::raw(ROOT_NSID).into_static(),
451+ rkey,
452+ })
453+ })
454+ .collect();
455+456+ tracing::debug!("find_all_edit_roots (index): found {} roots", roots.len());
457+458+ Ok(roots)
459+}
460+461+/// Find all edit roots for an entry using Constellation backlinks.
462+#[cfg(not(feature = "use-index"))]
463+pub async fn find_all_edit_roots<C>(
464+ client: &C,
465+ entry_uri: &AtUri<'_>,
466+ collaborator_dids: Vec<Did<'static>>,
467+) -> Result<Vec<RecordId<'static>>, CrdtError>
468+where
469+ C: XrpcClient,
470+{
471+ let constellation_url =
472+ Url::parse(CONSTELLATION_URL).map_err(|e| CrdtError::InvalidUri(e.to_string()))?;
473+474+ let query = GetBacklinksQuery {
475+ subject: Uri::At(entry_uri.clone().into_static()),
476+ source: format_smolstr!("{}:doc.value.entry.uri", ROOT_NSID).into(),
477+ cursor: None,
478+ did: collaborator_dids,
479+ limit: 100,
480+ };
481+482+ let response = client
483+ .xrpc(constellation_url)
484+ .send(&query)
485+ .await
486+ .map_err(|e| CrdtError::Xrpc(e.to_string()))?;
487+488+ let output = response
489+ .into_output()
490+ .map_err(|e| CrdtError::Xrpc(e.to_string()))?;
491+492+ Ok(output
493+ .records
494+ .into_iter()
495+ .map(|r| r.into_static())
496+ .collect())
497+}
498+499+/// Find all diffs for a root record using Constellation backlinks.
500+pub async fn find_diffs_for_root<C>(
501+ client: &C,
502+ root_uri: &AtUri<'_>,
503+) -> Result<Vec<RecordId<'static>>, CrdtError>
504+where
505+ C: XrpcClient,
506+{
507+ let constellation_url =
508+ Url::parse(CONSTELLATION_URL).map_err(|e| CrdtError::InvalidUri(e.to_string()))?;
509+510+ let mut all_diffs = Vec::new();
511+ let mut cursor: Option<String> = None;
512+513+ loop {
514+ let query = GetBacklinksQuery {
515+ subject: Uri::At(root_uri.clone().into_static()),
516+ source: format_smolstr!("{}:root.uri", DIFF_NSID).into(),
517+ cursor: cursor.map(Into::into),
518+ did: vec![],
519+ limit: 100,
520+ };
521+522+ let response = client
523+ .xrpc(constellation_url.clone())
524+ .send(&query)
525+ .await
526+ .map_err(|e| CrdtError::Xrpc(e.to_string()))?;
527+528+ let output = response
529+ .into_output()
530+ .map_err(|e| CrdtError::Xrpc(e.to_string()))?;
531+532+ all_diffs.extend(output.records.into_iter().map(|r| r.into_static()));
533+534+ match output.cursor {
535+ Some(c) => cursor = Some(c.to_string()),
536+ None => break,
537+ }
538+ }
539+540+ Ok(all_diffs)
541+}
542+543+// ============================================================================
544+// Loading functions
545+// ============================================================================
546+547+/// Result of loading edit state from PDS.
548+#[derive(Clone, Debug)]
549+pub struct PdsEditState {
550+ /// The root record reference.
551+ pub root_ref: StrongRef<'static>,
552+ /// The latest diff reference (if any diffs exist).
553+ pub last_diff_ref: Option<StrongRef<'static>>,
554+ /// The Loro snapshot bytes from the root.
555+ pub root_snapshot: Bytes,
556+ /// All diff update bytes in order (oldest first, by TID).
557+ pub diff_updates: Vec<Bytes>,
558+ /// Last seen diff URI per collaborator root (for incremental sync).
559+ pub last_seen_diffs: HashMap<AtUri<'static>, AtUri<'static>>,
560+ /// The DocRef from the root record.
561+ pub doc_ref: DocRef<'static>,
562+}
563+564+/// Find edit root for a draft using Constellation backlinks.
565+pub async fn find_edit_root_for_draft<C>(
566+ client: &C,
567+ draft_uri: &AtUri<'_>,
568+) -> Result<Option<RecordId<'static>>, CrdtError>
569+where
570+ C: XrpcClient,
571+{
572+ let constellation_url =
573+ Url::parse(CONSTELLATION_URL).map_err(|e| CrdtError::InvalidUri(e.to_string()))?;
574+575+ let query = GetBacklinksQuery {
576+ subject: Uri::At(draft_uri.clone().into_static()),
577+ source: format_smolstr!("{}:doc.value.draft_key", ROOT_NSID).into(),
578+ cursor: None,
579+ did: vec![],
580+ limit: 1,
581+ };
582+583+ let response = client
584+ .xrpc(constellation_url)
585+ .send(&query)
586+ .await
587+ .map_err(|e| CrdtError::Xrpc(format!("constellation query: {}", e)))?;
588+589+ let output = response
590+ .into_output()
591+ .map_err(|e| CrdtError::Xrpc(format!("parse constellation: {}", e)))?;
592+593+ Ok(output.records.into_iter().next().map(|r| r.into_static()))
594+}
595+596+/// Build a canonical draft URI from draft key and DID.
597+pub fn build_draft_uri(did: &Did<'_>, draft_key: &str) -> AtUri<'static> {
598+ let rkey = extract_draft_rkey(draft_key);
599+ let uri_str = format_smolstr!("at://{}/{}/{}", did, DRAFT_NSID, rkey);
600+ AtUri::new(&uri_str).unwrap().into_static()
601+}
602+603+/// Load edit state from a root record ID.
604+async fn load_edit_state_from_root_id<C>(
605+ client: &C,
606+ root_id: RecordId<'static>,
607+ after_rkey: Option<&str>,
608+) -> Result<Option<PdsEditState>, CrdtError>
609+where
610+ C: WeaverExt,
611+{
612+ let root_uri = AtUri::new(&format_smolstr!(
613+ "at://{}/{}/{}",
614+ root_id.did,
615+ ROOT_NSID,
616+ root_id.rkey.as_ref()
617+ ))
618+ .map_err(|e| CrdtError::InvalidUri(format!("root URI: {}", e)))?
619+ .into_static();
620+621+ let root_response = client
622+ .get_record::<Root>(&root_uri)
623+ .await
624+ .map_err(|e| CrdtError::Xrpc(format!("fetch root: {}", e)))?;
625+626+ let root_output = root_response
627+ .into_output()
628+ .map_err(|e| CrdtError::Xrpc(format!("parse root: {}", e)))?;
629+630+ let root_cid = root_output
631+ .cid
632+ .ok_or_else(|| CrdtError::Xrpc("root missing CID".into()))?;
633+634+ let root_ref = StrongRef::new()
635+ .uri(root_uri.clone())
636+ .cid(root_cid.into_static())
637+ .build();
638+639+ let doc_ref = root_output.value.doc.into_static();
640+641+ let root_snapshot = client
642+ .fetch_blob(&root_id.did, root_output.value.snapshot.blob().cid())
643+ .await
644+ .map_err(|e| CrdtError::Xrpc(format!("fetch snapshot blob: {}", e)))?;
645+646+ let diff_ids = find_diffs_for_root(client, &root_uri).await?;
647+648+ if diff_ids.is_empty() {
649+ return Ok(Some(PdsEditState {
650+ root_ref,
651+ last_diff_ref: None,
652+ root_snapshot,
653+ diff_updates: vec![],
654+ last_seen_diffs: HashMap::new(),
655+ doc_ref,
656+ }));
657+ }
658+659+ let mut diffs_by_rkey: BTreeMap<
660+ CowStr<'static>,
661+ (Diff<'static>, Cid<'static>, AtUri<'static>),
662+ > = BTreeMap::new();
663+664+ for diff_id in &diff_ids {
665+ let rkey_str: &str = diff_id.rkey.as_ref();
666+667+ if let Some(after) = after_rkey {
668+ if rkey_str <= after {
669+ continue;
670+ }
671+ }
672+673+ let diff_uri = AtUri::new(&format_smolstr!(
674+ "at://{}/{}/{}",
675+ diff_id.did,
676+ DIFF_NSID,
677+ rkey_str
678+ ))
679+ .map_err(|e| CrdtError::InvalidUri(format!("diff URI: {}", e)))?
680+ .into_static();
681+682+ let diff_response = client
683+ .get_record::<Diff>(&diff_uri)
684+ .await
685+ .map_err(|e| CrdtError::Xrpc(format!("fetch diff: {}", e)))?;
686+687+ let diff_output = diff_response
688+ .into_output()
689+ .map_err(|e| CrdtError::Xrpc(format!("parse diff: {}", e)))?;
690+691+ let diff_cid = diff_output
692+ .cid
693+ .ok_or_else(|| CrdtError::Xrpc("diff missing CID".into()))?;
694+695+ diffs_by_rkey.insert(
696+ rkey_str.to_cowstr().into_static(),
697+ (
698+ diff_output.value.into_static(),
699+ diff_cid.into_static(),
700+ diff_uri,
701+ ),
702+ );
703+ }
704+705+ let mut diff_updates = Vec::new();
706+ let mut last_diff_ref = None;
707+708+ for (_rkey, (diff, cid, uri)) in &diffs_by_rkey {
709+ let diff_bytes = if let Some(ref inline) = diff.inline_diff {
710+ inline.clone()
711+ } else if let Some(ref snapshot) = diff.snapshot {
712+ client
713+ .fetch_blob(&root_id.did, snapshot.blob().cid())
714+ .await
715+ .map_err(|e| CrdtError::Xrpc(format!("fetch diff blob: {}", e)))?
716+ } else {
717+ tracing::warn!("Diff has neither inline_diff nor snapshot, skipping");
718+ continue;
719+ };
720+721+ diff_updates.push(diff_bytes);
722+ last_diff_ref = Some(StrongRef::new().uri(uri.clone()).cid(cid.clone()).build());
723+ }
724+725+ Ok(Some(PdsEditState {
726+ root_ref,
727+ last_diff_ref,
728+ root_snapshot,
729+ diff_updates,
730+ last_seen_diffs: HashMap::new(),
731+ doc_ref,
732+ }))
733+}
734+735+/// Load edit state from PDS for an entry (single root).
736+pub async fn load_edit_state_from_entry<C>(
737+ client: &C,
738+ entry_uri: &AtUri<'_>,
739+ collaborator_dids: Vec<Did<'static>>,
740+) -> Result<Option<PdsEditState>, CrdtError>
741+where
742+ C: WeaverExt,
743+{
744+ let root_id = match find_all_edit_roots(client, entry_uri, collaborator_dids)
745+ .await?
746+ .into_iter()
747+ .next()
748+ {
749+ Some(id) => id,
750+ None => return Ok(None),
751+ };
752+753+ load_edit_state_from_root_id(client, root_id, None).await
754+}
755+756+/// Load edit state from PDS for a draft.
757+pub async fn load_edit_state_from_draft<C>(
758+ client: &C,
759+ draft_uri: &AtUri<'_>,
760+) -> Result<Option<PdsEditState>, CrdtError>
761+where
762+ C: WeaverExt,
763+{
764+ let root_id = match find_edit_root_for_draft(client, draft_uri).await? {
765+ Some(id) => id,
766+ None => return Ok(None),
767+ };
768+769+ load_edit_state_from_root_id(client, root_id, None).await
770+}
771+772+/// Load and merge edit states from ALL collaborator repos.
773+pub async fn load_all_edit_states<C>(
774+ client: &C,
775+ entry_uri: &AtUri<'_>,
776+ collaborator_dids: Vec<Did<'static>>,
777+ current_did: Option<&Did<'_>>,
778+ last_seen_diffs: &HashMap<AtUri<'static>, AtUri<'static>>,
779+) -> Result<Option<PdsEditState>, CrdtError>
780+where
781+ C: WeaverExt,
782+{
783+ let all_roots = find_all_edit_roots(client, entry_uri, collaborator_dids).await?;
784+785+ if all_roots.is_empty() {
786+ return Ok(None);
787+ }
788+789+ let merged_doc = LoroDoc::new();
790+ let mut our_root_ref: Option<StrongRef<'static>> = None;
791+ let mut our_last_diff_ref: Option<StrongRef<'static>> = None;
792+ let mut merged_doc_ref: Option<DocRef<'static>> = None;
793+ let mut updated_last_seen = last_seen_diffs.clone();
794+795+ for root_id in all_roots {
796+ let root_did = root_id.did.clone();
797+798+ let root_uri = AtUri::new(&format_smolstr!(
799+ "at://{}/{}/{}",
800+ root_id.did,
801+ ROOT_NSID,
802+ root_id.rkey.as_ref()
803+ ))
804+ .ok()
805+ .map(|u| u.into_static());
806+807+ let after_rkey = root_uri.as_ref().and_then(|uri| {
808+ last_seen_diffs
809+ .get(uri)
810+ .and_then(|diff_uri| diff_uri.rkey().map(|rk| rk.0.to_string()))
811+ });
812+813+ if let Some(pds_state) =
814+ load_edit_state_from_root_id(client, root_id, after_rkey.as_deref()).await?
815+ {
816+ if let Err(e) = merged_doc.import(&pds_state.root_snapshot) {
817+ tracing::warn!("Failed to import root snapshot from {}: {:?}", root_did, e);
818+ continue;
819+ }
820+821+ for diff in &pds_state.diff_updates {
822+ if let Err(e) = merged_doc.import(diff) {
823+ tracing::warn!("Failed to import diff from {}: {:?}", root_did, e);
824+ }
825+ }
826+827+ if let (Some(uri), Some(last_diff)) = (&root_uri, &pds_state.last_diff_ref) {
828+ updated_last_seen.insert(uri.clone(), last_diff.uri.clone().into_static());
829+ }
830+831+ if merged_doc_ref.is_none() {
832+ merged_doc_ref = Some(pds_state.doc_ref.clone());
833+ }
834+835+ let is_our_root = current_did.is_some_and(|did| root_did == *did);
836+837+ if is_our_root {
838+ our_root_ref = Some(pds_state.root_ref);
839+ our_last_diff_ref = pds_state.last_diff_ref;
840+ } else if our_root_ref.is_none() {
841+ our_root_ref = Some(pds_state.root_ref);
842+ our_last_diff_ref = pds_state.last_diff_ref;
843+ }
844+ }
845+ }
846+847+ let merged_snapshot = merged_doc
848+ .export(ExportMode::Snapshot)
849+ .map_err(|e| CrdtError::Loro(format!("export merged: {}", e)))?;
850+851+ Ok(our_root_ref.map(|root_ref| PdsEditState {
852+ root_ref,
853+ last_diff_ref: our_last_diff_ref,
854+ root_snapshot: merged_snapshot.into(),
855+ diff_updates: vec![],
856+ last_seen_diffs: updated_last_seen,
857+ doc_ref: merged_doc_ref.expect("Should have doc_ref if we have root"),
858+ }))
859+}
860+861+/// Remote draft info from PDS.
862+#[derive(Clone, Debug)]
863+pub struct RemoteDraft {
864+ /// The draft record URI.
865+ pub uri: AtUri<'static>,
866+ /// The rkey (TID) of the draft.
867+ pub rkey: String,
868+ /// When the draft was created.
869+ pub created_at: String,
870+}
871+872+/// List all drafts for a user using weaver-index.
873+#[cfg(feature = "use-index")]
874+pub async fn list_drafts<C>(client: &C, did: &Did<'_>) -> Result<Vec<RemoteDraft>, CrdtError>
875+where
876+ C: WeaverExt,
877+{
878+ use jacquard::types::ident::AtIdentifier;
879+ use weaver_api::sh_weaver::edit::list_drafts::ListDrafts;
880+881+ let actor = AtIdentifier::Did(did.clone().into_static());
882+ let response = client
883+ .send(ListDrafts::new().actor(actor).build())
884+ .await
885+ .map_err(|e| CrdtError::Xrpc(format!("list drafts: {}", e)))?;
886+887+ let output = response
888+ .into_output()
889+ .map_err(|e| CrdtError::Xrpc(format!("parse list drafts: {}", e)))?;
890+891+ tracing::debug!("list_drafts (index): found {} drafts", output.drafts.len());
892+893+ let drafts = output
894+ .drafts
895+ .into_iter()
896+ .filter_map(|draft| {
897+ let uri = AtUri::new(draft.uri.as_ref()).ok()?.into_static();
898+ let rkey = uri.rkey()?.0.as_str().to_string();
899+ let created_at = draft.created_at.to_string();
900+ Some(RemoteDraft {
901+ uri,
902+ rkey,
903+ created_at,
904+ })
905+ })
906+ .collect();
907+908+ Ok(drafts)
909+}
910+911+/// List all drafts for a user (direct PDS query, no index).
912+#[cfg(not(feature = "use-index"))]
913+pub async fn list_drafts<C>(client: &C, did: &Did<'_>) -> Result<Vec<RemoteDraft>, CrdtError>
914+where
915+ C: WeaverExt,
916+{
917+ use weaver_api::com_atproto::repo::list_records::ListRecords;
918+919+ let pds_url = client
920+ .pds_for_did(did)
921+ .await
922+ .map_err(|e| CrdtError::Xrpc(format!("resolve DID: {}", e)))?;
923+924+ let collection =
925+ Nsid::new(DRAFT_NSID).map_err(|e| CrdtError::InvalidUri(format!("nsid: {}", e)))?;
926+927+ let request = ListRecords::new()
928+ .repo(did.clone())
929+ .collection(collection)
930+ .limit(100)
931+ .build();
932+933+ let response = client
934+ .xrpc(pds_url)
935+ .send(&request)
936+ .await
937+ .map_err(|e| CrdtError::Xrpc(format!("list records: {}", e)))?;
938+939+ let output = response
940+ .into_output()
941+ .map_err(|e| CrdtError::Xrpc(format!("parse list records: {}", e)))?;
942+943+ let mut drafts = Vec::new();
944+ for record in output.records {
945+ let rkey = record
946+ .uri
947+ .rkey()
948+ .map(|r| r.0.as_str().to_string())
949+ .unwrap_or_default();
950+951+ let created_at = jacquard::from_data::<Draft>(&record.value)
952+ .map(|d| d.created_at.to_string())
953+ .unwrap_or_default();
954+955+ drafts.push(RemoteDraft {
956+ uri: record.uri.into_static(),
957+ rkey,
958+ created_at,
959+ });
960+ }
961+962+ Ok(drafts)
963+}
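A minimal sketch of driving the sync entry point for an unpublished draft. It is not part of the diff: the module paths weaver_editor_crdt::sync and weaver_editor_crdt::document and the crate-root re-export of CrdtError are assumptions about how the crate exposes these items, and client/doc stand for any types satisfying the bounds above.

use jacquard::prelude::*; // XrpcClient, IdentityResolver, AgentSession, as used above
use weaver_editor_crdt::CrdtError; // assumed re-export at the crate root
use weaver_editor_crdt::document::CrdtDocument; // assumed public module path
use weaver_editor_crdt::sync::{SyncResult, sync_to_pds}; // assumed public module path

async fn save_draft<C, D>(client: &C, doc: &mut D, draft_key: &str) -> Result<(), CrdtError>
where
    C: XrpcClient + IdentityResolver + AgentSession,
    D: CrdtDocument,
{
    // No entry URI/CID yet, so build_doc_ref will record a DraftRef for this key.
    match sync_to_pds(client, doc, draft_key, None, None).await? {
        SyncResult::CreatedRoot { uri, .. } => tracing::info!(?uri, "created edit root"),
        SyncResult::CreatedDiff { uri, .. } => tracing::info!(?uri, "created diff record"),
        SyncResult::NoChanges => tracing::debug!("nothing to sync"),
    }
    Ok(())
}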
crates/weaver-editor-crdt/src/worker/mod.rs  (+11)
···1+//! Worker implementation for off-main-thread CRDT operations.
2+//!
3+//! Currently WASM-specific using gloo-worker, but the core state machine
4+//! could be abstracted to work with any async channel pair.
5+
6+mod reactor;
7+
8+pub use reactor::{WorkerInput, WorkerOutput};
9+
10+#[cfg(all(target_family = "wasm", target_os = "unknown"))]
11+pub use reactor::EditorReactor;