at main 1196 lines 43 kB view raw
1//! PDS synchronization for editor edit state. 2//! 3//! This module provides app-specific sync functionality built on top of 4//! `weaver_editor_crdt::sync`. It adds: 5//! - Fetcher-based API (wrapping the generic client) 6//! - Embed prefetching and blob caching 7//! - localStorage integration for document loading 8//! - Dioxus UI components for sync status 9 10use std::collections::HashMap; 11 12use super::document::{LoadedDocState, SignalEditorDocument}; 13use super::publish::load_entry_for_editing; 14use crate::fetch::Fetcher; 15use jacquard::IntoStatic; 16use jacquard::identity::resolver::IdentityResolver; 17use jacquard::prelude::*; 18use jacquard::smol_str::{SmolStr, ToSmolStr}; 19use jacquard::types::ident::AtIdentifier; 20use jacquard::types::string::{AtUri, Cid}; 21use loro::LoroDoc; 22use loro::ToJson; 23use weaver_api::com_atproto::repo::strong_ref::StrongRef; 24use weaver_api::sh_weaver::edit::draft::Draft; 25use weaver_api::sh_weaver::edit::{DocRef, DocRefValue}; 26use weaver_common::{WeaverError, WeaverExt}; 27 28// Re-export crdt sync types for convenience. 29pub use weaver_editor_crdt::{ 30 CreateRootResult, PdsEditState, RemoteDraft, SyncResult, build_draft_uri, list_drafts, 31 load_all_edit_states, load_edit_state_from_draft, load_edit_state_from_entry, 32}; 33 34/// Extract record embeds from a LoroDoc and pre-fetch their rendered content. 35/// 36/// Reads the embeds.records list from the document, extracts RecordEmbed entries, 37/// and fetches/renders each one to populate a ResolvedContent map. 38/// Also pre-warms the blob cache for images if `owner_ident` is provided. 39async fn prefetch_embeds_from_doc( 40 doc: &LoroDoc, 41 fetcher: &Fetcher, 42 owner_ident: Option<&str>, 43) -> weaver_common::ResolvedContent { 44 use weaver_api::sh_weaver::embed::images::Image; 45 use weaver_api::sh_weaver::embed::records::RecordEmbed; 46 47 let mut resolved = weaver_common::ResolvedContent::default(); 48 49 let embeds_map = doc.get_map("embeds"); 50 51 // Pre-warm blob cache for images 52 #[cfg(feature = "fullstack-server")] 53 if let Some(ident) = owner_ident { 54 if let Ok(images_container) = 55 embeds_map.get_or_create_container("images", loro::LoroList::new()) 56 { 57 for i in 0..images_container.len() { 58 let Some(value) = images_container.get(i) else { 59 continue; 60 }; 61 let Some(loro_value) = value.as_value() else { 62 continue; 63 }; 64 let json = loro_value.to_json_value(); 65 let Ok(image) = jacquard::from_json_value::<Image>(json) else { 66 continue; 67 }; 68 69 let cid = image.image.blob().cid(); 70 let name = image.name.as_ref().map(|n| n.as_ref()); 71 if let Err(e) = crate::data::cache_blob( 72 ident.into(), 73 cid.as_ref().into(), 74 name.map(|n| n.into()), 75 ) 76 .await 77 { 78 tracing::warn!("Failed to pre-warm blob cache for {}: {}", cid, e); 79 } 80 } 81 } 82 } 83 #[cfg(not(feature = "fullstack-server"))] 84 let _ = owner_ident; 85 86 // Strategy 1: Get embeds from Loro embeds map -> records list 87 88 if let Ok(records_container) = 89 embeds_map.get_or_create_container("records", loro::LoroList::new()) 90 { 91 for i in 0..records_container.len() { 92 let Some(value) = records_container.get(i) else { 93 continue; 94 }; 95 let Some(loro_value) = value.as_value() else { 96 continue; 97 }; 98 let json = loro_value.to_json_value(); 99 let Ok(record_embed) = jacquard::from_json_value::<RecordEmbed>(json) else { 100 continue; 101 }; 102 103 // name is the key used in markdown, fallback to record.uri 104 let key_uri = if let Some(ref name) = record_embed.name { 105 match AtUri::new(name.as_ref()) { 106 Ok(uri) => uri.into_static(), 107 Err(_) => continue, 108 } 109 } else { 110 record_embed.record.uri.clone().into_static() 111 }; 112 113 // Fetch and render 114 match weaver_renderer::atproto::fetch_and_render(&record_embed.record.uri, fetcher) 115 .await 116 { 117 Ok(html) => { 118 resolved.add_embed(key_uri, html, None); 119 } 120 Err(e) => { 121 tracing::warn!( 122 "Failed to pre-fetch embed {}: {}", 123 record_embed.record.uri, 124 e 125 ); 126 } 127 } 128 } 129 } 130 131 // Strategy 2: If no embeds found in Loro map, parse markdown text 132 if resolved.embed_content.is_empty() { 133 use weaver_common::{ExtractedRef, collect_refs_from_markdown}; 134 135 let text = doc.get_text("content"); 136 let markdown = text.to_string(); 137 138 if !markdown.is_empty() { 139 let refs = collect_refs_from_markdown(&markdown); 140 141 for extracted in refs { 142 if let ExtractedRef::AtEmbed { uri, .. } = extracted { 143 let key_uri = match AtUri::new(&uri) { 144 Ok(u) => u.into_static(), 145 Err(_) => continue, 146 }; 147 148 match weaver_renderer::atproto::fetch_and_render(&key_uri, fetcher).await { 149 Ok(html) => { 150 resolved.add_embed(key_uri, html, None); 151 } 152 Err(e) => { 153 tracing::warn!("Failed to pre-fetch embed {}: {}", uri, e); 154 } 155 } 156 } 157 } 158 } 159 } 160 161 resolved 162} 163 164/// Convert a DocRef to an entry_ref StrongRef. 165/// 166/// For EntryRef: returns the entry's StrongRef directly 167/// For DraftRef: parses the draft_key as AT-URI, fetches the draft record to get CID, builds StrongRef 168/// For NotebookRef: returns the notebook's StrongRef 169async fn doc_ref_to_entry_ref( 170 fetcher: &Fetcher, 171 doc_ref: &DocRef<'_>, 172) -> Option<StrongRef<'static>> { 173 match &doc_ref.value { 174 DocRefValue::EntryRef(entry_ref) => Some(entry_ref.entry.clone().into_static()), 175 DocRefValue::DraftRef(draft_ref) => { 176 // draft_key contains the canonical AT-URI: at://{did}/sh.weaver.edit.draft/{rkey} 177 let draft_uri = AtUri::new(&draft_ref.draft_key).ok()?.into_static(); 178 179 // Fetch the draft record to get its CID 180 match fetcher.client.get_record::<Draft>(&draft_uri).await { 181 Ok(response) => { 182 let output = response.into_output().ok()?; 183 let cid = output.cid?.into_static(); 184 Some(StrongRef::new().uri(draft_uri).cid(cid).build()) 185 } 186 Err(e) => { 187 tracing::warn!("Failed to fetch draft record for entry_ref: {}", e); 188 None 189 } 190 } 191 } 192 DocRefValue::NotebookRef(notebook_ref) => Some(notebook_ref.notebook.clone().into_static()), 193 DocRefValue::Unknown(_) => { 194 tracing::warn!("Unknown DocRefValue variant, cannot convert to entry_ref"); 195 None 196 } 197 } 198} 199 200/// List all drafts from PDS for the current user. 201/// 202/// Wraps the crdt crate's list_drafts function with Fetcher support. 203pub async fn list_drafts_from_pds(fetcher: &Fetcher) -> Result<Vec<RemoteDraft>, WeaverError> { 204 let did = fetcher 205 .current_did() 206 .await 207 .ok_or_else(|| WeaverError::InvalidNotebook("Not authenticated".into()))?; 208 209 list_drafts(fetcher.get_client().as_ref(), &did) 210 .await 211 .map_err(|e| WeaverError::InvalidNotebook(e.to_string())) 212} 213 214/// Create the edit root record for an entry. 215/// 216/// Wraps the crdt crate's create_edit_root with Fetcher support. 217pub async fn create_edit_root( 218 fetcher: &Fetcher, 219 doc: &SignalEditorDocument, 220 draft_key: &str, 221 entry_uri: Option<&AtUri<'_>>, 222 entry_cid: Option<&Cid<'_>>, 223) -> Result<CreateRootResult, WeaverError> { 224 weaver_editor_crdt::create_edit_root( 225 fetcher.get_client().as_ref(), 226 doc, 227 draft_key, 228 entry_uri, 229 entry_cid, 230 ) 231 .await 232 .map_err(|e| WeaverError::InvalidNotebook(e.to_string())) 233} 234 235/// Create a diff record with updates since the last sync. 236/// 237/// Wraps the crdt crate's create_diff with Fetcher support. 238pub async fn create_diff( 239 fetcher: &Fetcher, 240 doc: &SignalEditorDocument, 241 root_uri: &AtUri<'_>, 242 root_cid: &Cid<'_>, 243 prev_diff: Option<(&AtUri<'_>, &Cid<'_>)>, 244 draft_key: &str, 245 entry_uri: Option<&AtUri<'_>>, 246 entry_cid: Option<&Cid<'_>>, 247) -> Result<Option<(AtUri<'static>, Cid<'static>)>, WeaverError> { 248 weaver_editor_crdt::create_diff( 249 fetcher.get_client().as_ref(), 250 doc, 251 root_uri, 252 root_cid, 253 prev_diff, 254 draft_key, 255 entry_uri, 256 entry_cid, 257 ) 258 .await 259 .map_err(|e| WeaverError::InvalidNotebook(e.to_string())) 260} 261 262/// Sync the document to the PDS. 263/// 264/// If no edit root exists, creates one with a full snapshot. 265/// If a root exists, creates a diff with updates since last sync. 266/// Updates the document's sync state on success. 267pub async fn sync_to_pds( 268 fetcher: &Fetcher, 269 doc: &mut SignalEditorDocument, 270 draft_key: &str, 271) -> Result<SyncResult, WeaverError> { 272 let fn_start = crate::perf::now(); 273 274 // Check if we have changes to sync 275 if !doc.has_unsynced_changes() { 276 return Ok(SyncResult::NoChanges); 277 } 278 279 // Get entry info if published 280 let entry_ref = doc.entry_ref(); 281 282 if doc.edit_root().is_none() { 283 // First sync - create root 284 let create_start = crate::perf::now(); 285 let result = create_edit_root( 286 fetcher, 287 doc, 288 draft_key, 289 entry_ref.as_ref().map(|r| &r.uri), 290 entry_ref.as_ref().map(|r| &r.cid), 291 ) 292 .await?; 293 let create_ms = crate::perf::now() - create_start; 294 295 // Build StrongRef for the root 296 let root_ref = StrongRef::new() 297 .uri(result.root_uri.clone()) 298 .cid(result.root_cid.clone()) 299 .build(); 300 301 // Update document state 302 doc.set_edit_root(Some(root_ref)); 303 doc.set_last_diff(None); 304 doc.mark_synced(); 305 306 // For drafts: set entry_ref to the draft record (enables draft discovery/recovery) 307 if let Some(draft_ref) = result.draft_ref { 308 if doc.entry_ref().is_none() { 309 tracing::debug!("Setting entry_ref to draft: {}", draft_ref.uri); 310 doc.set_entry_ref(Some(draft_ref)); 311 } 312 } 313 314 let total_ms = crate::perf::now() - fn_start; 315 tracing::debug!(total_ms, create_ms, "sync_to_pds: created root"); 316 317 Ok(SyncResult::CreatedRoot { 318 uri: result.root_uri, 319 cid: result.root_cid, 320 }) 321 } else { 322 // Subsequent sync - create diff 323 let root_ref = doc.edit_root().unwrap(); 324 let prev_diff = doc.last_diff(); 325 326 let create_start = crate::perf::now(); 327 let result = create_diff( 328 fetcher, 329 doc, 330 &root_ref.uri, 331 &root_ref.cid, 332 prev_diff.as_ref().map(|d| (&d.uri, &d.cid)), 333 draft_key, 334 entry_ref.as_ref().map(|r| &r.uri), 335 entry_ref.as_ref().map(|r| &r.cid), 336 ) 337 .await?; 338 let create_ms = crate::perf::now() - create_start; 339 340 match result { 341 Some((diff_uri, diff_cid)) => { 342 // Build StrongRef for the diff 343 let diff_ref = StrongRef::new() 344 .uri(diff_uri.clone()) 345 .cid(diff_cid.clone()) 346 .build(); 347 348 doc.set_last_diff(Some(diff_ref)); 349 doc.mark_synced(); 350 351 let total_ms = crate::perf::now() - fn_start; 352 tracing::debug!(total_ms, create_ms, "sync_to_pds: created diff"); 353 354 Ok(SyncResult::CreatedDiff { 355 uri: diff_uri, 356 cid: diff_cid, 357 }) 358 } 359 None => { 360 let total_ms = crate::perf::now() - fn_start; 361 tracing::debug!(total_ms, create_ms, "sync_to_pds: no changes in diff"); 362 Ok(SyncResult::NoChanges) 363 } 364 } 365 } 366} 367 368/// Load edit state from the PDS for an entry. 369/// 370/// Wraps the crdt crate's load_edit_state_from_entry with Fetcher support. 371pub async fn load_edit_state_from_pds( 372 fetcher: &Fetcher, 373 entry_uri: &AtUri<'_>, 374) -> Result<Option<PdsEditState>, WeaverError> { 375 let client = fetcher.get_client(); 376 // Get collaborators for this resource. 377 let collaborators = client 378 .find_collaborators_for_resource(entry_uri) 379 .await 380 .unwrap_or_default(); 381 382 load_edit_state_from_entry(client.as_ref(), entry_uri, collaborators) 383 .await 384 .map_err(|e| WeaverError::InvalidNotebook(e.to_string())) 385} 386 387/// Load edit state from ALL collaborator repos for an entry, returning merged state. 388/// 389/// Wraps the crdt crate's load_all_edit_states with Fetcher support. 390pub async fn load_all_edit_states_from_pds( 391 fetcher: &Fetcher, 392 entry_uri: &AtUri<'_>, 393 last_seen_diffs: &HashMap<AtUri<'static>, AtUri<'static>>, 394) -> Result<Option<PdsEditState>, WeaverError> { 395 let client = fetcher.get_client(); 396 397 // Get collaborators for this resource. 398 let collaborators = client 399 .find_collaborators_for_resource(entry_uri) 400 .await 401 .unwrap_or_default(); 402 403 let current_did = fetcher.current_did().await; 404 405 load_all_edit_states( 406 client.as_ref(), 407 entry_uri, 408 collaborators, 409 current_did.as_ref(), 410 last_seen_diffs, 411 ) 412 .await 413 .map_err(|e| WeaverError::InvalidNotebook(e.to_string())) 414} 415 416/// Load document state by merging local storage and PDS state. 417/// 418/// Loads from localStorage and PDS (if available), then merges both using Loro's 419/// CRDT merge. The result is a pre-merged LoroDoc that can be converted to an 420/// SignalEditorDocument inside a reactive context using `use_hook`. 421/// 422/// For unpublished drafts, attempts to discover edit state via Constellation 423/// using the synthetic draft URI. 424pub async fn load_and_merge_document( 425 fetcher: &Fetcher, 426 draft_key: &str, 427 entry_uri: Option<&AtUri<'_>>, 428) -> Result<Option<LoadedDocState>, WeaverError> { 429 use super::storage::load_snapshot_from_storage; 430 431 // Load snapshot + entry_ref from localStorage 432 let local_data = load_snapshot_from_storage(draft_key); 433 434 // Load from PDS - for entries use multi-repo loading (all collaborators), 435 // for drafts use single-repo loading (draft sharing requires knowing the URI) 436 let pds_state = if let Some(uri) = entry_uri { 437 // Published entry: load from ALL collaborators (multi-repo CRDT merge) 438 let empty_last_seen = HashMap::new(); 439 load_all_edit_states_from_pds(fetcher, uri, &empty_last_seen).await? 440 } else if let Some(did) = fetcher.current_did().await { 441 // Unpublished draft: single-repo for now 442 // (draft sharing would require collaborator to know the draft URI) 443 let draft_uri = build_draft_uri(&did, draft_key); 444 load_edit_state_from_draft(fetcher.get_client().as_ref(), &draft_uri) 445 .await 446 .map_err(|e| WeaverError::InvalidNotebook(e.to_string()))? 447 } else { 448 // Not authenticated, can't query PDS 449 None 450 }; 451 452 // Extract owner identity from entry URI for blob cache warming 453 let owner_ident: Option<String> = entry_uri.map(|uri| match uri.authority() { 454 AtIdentifier::Did(d) => d.as_ref().to_string(), 455 AtIdentifier::Handle(h) => h.as_ref().to_string(), 456 }); 457 458 match (local_data, pds_state) { 459 (None, None) => Ok(None), 460 461 (Some(local), None) => { 462 // Only local state exists - build LoroDoc from snapshot 463 tracing::debug!("Loaded document from localStorage only"); 464 let doc = LoroDoc::new(); 465 if let Err(e) = doc.import(&local.snapshot) { 466 tracing::warn!("Failed to import local snapshot: {:?}", e); 467 } 468 469 let resolved_content = 470 prefetch_embeds_from_doc(&doc, fetcher, owner_ident.as_deref()).await; 471 472 Ok(Some(LoadedDocState { 473 doc, 474 entry_ref: local.entry_ref, // Restored from localStorage 475 edit_root: None, 476 last_diff: None, 477 synced_version: None, // Local-only, never synced to PDS 478 last_seen_diffs: std::collections::HashMap::new(), 479 resolved_content, 480 notebook_uri: local.notebook_uri, // Restored from localStorage 481 })) 482 } 483 484 (None, Some(pds)) => { 485 // Only PDS state exists - reconstruct from snapshot + diffs 486 tracing::debug!("Loaded document from PDS only"); 487 let doc = LoroDoc::new(); 488 489 // Import root snapshot 490 if let Err(e) = doc.import(&pds.root_snapshot) { 491 tracing::warn!("Failed to import PDS root snapshot: {:?}", e); 492 } 493 494 // Apply all diffs in order 495 for updates in &pds.diff_updates { 496 if let Err(e) = doc.import(updates) { 497 tracing::warn!("Failed to apply diff update: {:?}", e); 498 } 499 } 500 501 // Capture the version after loading all PDS state - this is our sync baseline 502 let synced_version = Some(doc.oplog_vv()); 503 504 // Reconstruct entry_ref from the DocRef stored in edit.root 505 let entry_ref = doc_ref_to_entry_ref(fetcher, &pds.doc_ref).await; 506 if entry_ref.is_some() { 507 tracing::debug!("Reconstructed entry_ref from PDS DocRef"); 508 } 509 510 let resolved_content = 511 prefetch_embeds_from_doc(&doc, fetcher, owner_ident.as_deref()).await; 512 513 Ok(Some(LoadedDocState { 514 doc, 515 entry_ref, 516 edit_root: Some(pds.root_ref), 517 last_diff: pds.last_diff_ref, 518 synced_version, // Just loaded from PDS, fully synced 519 last_seen_diffs: pds.last_seen_diffs, 520 resolved_content, 521 notebook_uri: None, // PDS-only, notebook context comes from target_notebook 522 })) 523 } 524 525 (Some(local), Some(pds)) => { 526 // Both exist - merge using CRDT 527 tracing::debug!("Merging document from localStorage and PDS"); 528 529 // First, reconstruct the PDS state to get its version vector 530 let pds_doc = LoroDoc::new(); 531 if let Err(e) = pds_doc.import(&pds.root_snapshot) { 532 tracing::warn!("Failed to import PDS root snapshot for VV: {:?}", e); 533 } 534 for updates in &pds.diff_updates { 535 if let Err(e) = pds_doc.import(updates) { 536 tracing::warn!("Failed to apply PDS diff for VV: {:?}", e); 537 } 538 } 539 let pds_version = pds_doc.oplog_vv(); 540 541 // Now create the merged doc 542 let doc = LoroDoc::new(); 543 544 // Import local snapshot first 545 if let Err(e) = doc.import(&local.snapshot) { 546 tracing::warn!("Failed to import local snapshot: {:?}", e); 547 } 548 549 // Import PDS root snapshot - Loro will merge 550 if let Err(e) = doc.import(&pds.root_snapshot) { 551 tracing::warn!("Failed to merge PDS root snapshot: {:?}", e); 552 } 553 554 // Import all diffs 555 for updates in &pds.diff_updates { 556 if let Err(e) = doc.import(updates) { 557 tracing::warn!("Failed to merge PDS diff: {:?}", e); 558 } 559 } 560 561 // Use the PDS version as our sync baseline - any local changes 562 // beyond this will be detected as unsynced 563 let resolved_content = 564 prefetch_embeds_from_doc(&doc, fetcher, owner_ident.as_deref()).await; 565 566 Ok(Some(LoadedDocState { 567 doc, 568 entry_ref: local.entry_ref, // Restored from localStorage 569 edit_root: Some(pds.root_ref), 570 last_diff: pds.last_diff_ref, 571 synced_version: Some(pds_version), 572 last_seen_diffs: pds.last_seen_diffs, 573 resolved_content, 574 notebook_uri: local.notebook_uri, // Restored from localStorage 575 })) 576 } 577 } 578} 579 580// ============================================================================ 581// Sync UI Components 582// ============================================================================ 583 584use crate::auth::AuthState; 585use dioxus::prelude::*; 586 587/// Sync status states for UI display. 588#[derive(Clone, Copy, PartialEq, Eq, Debug)] 589pub enum SyncState { 590 /// All local changes have been synced to PDS 591 Synced, 592 /// Currently syncing to PDS 593 Syncing, 594 /// Has local changes not yet synced 595 Unsynced, 596 /// Remote collaborator changes available 597 RemoteChanges, 598 /// Last sync failed 599 Error, 600 /// Not authenticated or sync disabled 601 Disabled, 602} 603 604/// Props for the SyncStatus component. 605#[derive(Props, Clone, PartialEq)] 606pub struct SyncStatusProps { 607 /// The editor document to sync 608 pub document: SignalEditorDocument, 609 /// Draft key for this document 610 pub draft_key: String, 611 /// Auto-sync interval in milliseconds (0 to disable, default disabled) 612 #[props(default = 0)] 613 pub auto_sync_interval_ms: u32, 614 /// Callback to refresh/reload document from collaborators 615 #[props(default)] 616 pub on_refresh: Option<EventHandler<()>>, 617 /// Whether this is a collaborative document (has collaborators) 618 #[props(default = false)] 619 pub is_collaborative: bool, 620} 621 622/// Sync status indicator with auto-sync functionality. 623/// 624/// Displays the current sync state and automatically syncs to PDS periodically. 625/// Initially shows "Start syncing" until user activates sync, then auto-syncs. 626#[component] 627pub fn SyncStatus(props: SyncStatusProps) -> Element { 628 let fetcher = use_context::<Fetcher>(); 629 let auth_state = use_context::<Signal<AuthState>>(); 630 631 let doc = props.document.clone(); 632 let draft_key = props.draft_key.clone(); 633 634 // Sync activated - true if sync has been started (either manually or doc already has edit_root) 635 // Once activated, auto-sync is enabled 636 let mut sync_activated = use_signal(|| { 637 // If document already has an edit_root, syncing is already active 638 props.document.edit_root().is_some() 639 }); 640 641 // Sync state management 642 let mut sync_state = use_signal(|| { 643 if props.document.has_unsynced_changes() { 644 SyncState::Unsynced 645 } else { 646 SyncState::Synced 647 } 648 }); 649 let mut last_error: Signal<Option<String>> = use_signal(|| None); 650 651 // Check if we're authenticated (drafts can sync via DraftRef even without entry) 652 let is_authenticated = auth_state.read().is_authenticated(); 653 654 // Auto-sync trigger signal - set to true to trigger a sync 655 let mut trigger_sync = use_signal(|| false); 656 657 // Auto-sync timer - only triggers after sync has been activated 658 { 659 let doc_for_check = doc.clone(); 660 661 // Use 30s interval for auto-sync once activated 662 dioxus_sdk::time::use_interval(std::time::Duration::from_secs(30), move |_| { 663 // Only auto-sync if activated 664 if !*sync_activated.peek() { 665 return; 666 } 667 // Only trigger if there are unsynced changes 668 if doc_for_check.has_unsynced_changes() { 669 trigger_sync.set(true); 670 } 671 }); 672 } 673 674 // Collaborator poll timer - checks for collaborator updates periodically 675 // For collaborative documents, poll every 60s 676 // - If user has been idle ≥30s: auto-trigger refresh 677 // - If user is actively editing: show RemoteChanges state 678 { 679 let is_collaborative = props.is_collaborative; 680 let on_refresh = props.on_refresh.clone(); 681 let doc_for_idle = doc.clone(); 682 683 dioxus_sdk::time::use_interval(std::time::Duration::from_secs(60), move |_| { 684 if !is_collaborative { 685 return; 686 } 687 688 let idle_threshold = std::time::Duration::from_secs(30); 689 690 // Check time since last edit 691 let is_idle = match doc_for_idle.last_edit() { 692 Some(edit_info) => edit_info.timestamp.elapsed() >= idle_threshold, 693 None => true, // No edits yet = idle 694 }; 695 696 if is_idle { 697 // User is idle - safe to auto-refresh 698 if let Some(ref handler) = on_refresh { 699 handler.call(()); 700 } 701 } else { 702 // User is actively editing - show remote changes indicator 703 sync_state.set(SyncState::RemoteChanges); 704 } 705 }); 706 } 707 708 // Update sync state when document changes 709 // Note: We use peek() to avoid creating a reactive dependency on sync_state 710 let doc_for_effect = doc.clone(); 711 use_effect(move || { 712 // Read content_changed to create reactive dependency on document changes 713 let _ = doc_for_effect.content_changed.read(); 714 715 // Use peek to avoid reactive loop 716 let current_state = *sync_state.peek(); 717 if current_state != SyncState::Syncing { 718 if doc_for_effect.has_unsynced_changes() && current_state != SyncState::Unsynced { 719 sync_state.set(SyncState::Unsynced); 720 } 721 } 722 }); 723 724 // Sync effect - watches trigger_sync and performs sync when triggered 725 let doc_for_sync = doc.clone(); 726 let draft_key_for_sync = draft_key.clone(); 727 let fetcher_for_sync = fetcher.clone(); 728 729 let doc_for_check = doc.clone(); 730 use_effect(move || { 731 // Read trigger to create reactive dependency 732 let should_sync = *trigger_sync.read(); 733 734 if !should_sync { 735 return; 736 } 737 738 // Reset trigger immediately 739 trigger_sync.set(false); 740 741 // Check if already syncing 742 if *sync_state.peek() == SyncState::Syncing { 743 return; 744 } 745 746 // Check if authenticated (drafts can sync too via DraftRef) 747 if !is_authenticated { 748 return; 749 } 750 751 // Check if there are actually changes to sync 752 if !doc_for_check.has_unsynced_changes() { 753 // Already synced, just update state 754 sync_state.set(SyncState::Synced); 755 return; 756 } 757 758 sync_state.set(SyncState::Syncing); 759 760 let mut doc = doc_for_sync.clone(); 761 let draft_key = draft_key_for_sync.clone(); 762 let fetcher = fetcher_for_sync.clone(); 763 764 // Spawn the async work 765 spawn(async move { 766 match sync_to_pds(&fetcher, &mut doc, &draft_key).await { 767 Ok(SyncResult::NoChanges) => { 768 // No changes to sync - already up to date 769 sync_state.set(SyncState::Synced); 770 last_error.set(None); 771 tracing::debug!("No changes to sync"); 772 } 773 Ok(_) => { 774 sync_state.set(SyncState::Synced); 775 last_error.set(None); 776 // Activate auto-sync after first successful sync 777 if !*sync_activated.peek() { 778 sync_activated.set(true); 779 tracing::debug!("Sync activated - auto-sync enabled"); 780 } 781 tracing::debug!("Sync completed successfully"); 782 } 783 Err(e) => { 784 sync_state.set(SyncState::Error); 785 last_error.set(Some(e.to_string())); 786 tracing::warn!("Sync failed: {}", e); 787 } 788 } 789 }); 790 }); 791 792 // Determine display state (drafts can sync too via DraftRef) 793 let is_activated = *sync_activated.read(); 794 let display_state = if !is_authenticated { 795 SyncState::Disabled 796 } else { 797 *sync_state.read() 798 }; 799 800 // Before activation: show "Start syncing" button 801 // After activation: show normal sync states 802 let (icon, label, class) = if !is_activated && is_authenticated { 803 ("", "Start syncing", "sync-status start-sync") 804 } else { 805 match display_state { 806 SyncState::Synced => ("", "Synced", "sync-status synced"), 807 SyncState::Syncing => ("", "Syncing...", "sync-status syncing"), 808 SyncState::Unsynced => ("", "Unsynced", "sync-status unsynced"), 809 SyncState::RemoteChanges => ("", "Updates", "sync-status remote-changes"), 810 SyncState::Error => ("", "Sync error", "sync-status error"), 811 SyncState::Disabled => ("", "Sync disabled", "sync-status disabled"), 812 } 813 }; 814 815 // Long-press detection for deactivating sync 816 let mut long_press_active = use_signal(|| false); 817 #[cfg(target_arch = "wasm32")] 818 let mut long_press_timeout: Signal<Option<gloo_timers::callback::Timeout>> = 819 use_signal(|| None); 820 821 let on_pointer_down = move |_: dioxus::events::PointerEvent| { 822 // Only allow deactivation if sync is currently activated 823 if !*sync_activated.peek() { 824 return; 825 } 826 827 long_press_active.set(true); 828 829 // Start 1 second timer for long press 830 #[cfg(target_arch = "wasm32")] 831 let timeout = gloo_timers::callback::Timeout::new(1000, move || { 832 if *long_press_active.peek() { 833 sync_activated.set(false); 834 long_press_active.set(false); 835 tracing::debug!("Sync deactivated via long press"); 836 } 837 }); 838 #[cfg(target_arch = "wasm32")] 839 long_press_timeout.set(Some(timeout)); 840 }; 841 842 let on_pointer_up = move |_: dioxus::events::PointerEvent| { 843 long_press_active.set(false); 844 // Cancel the timeout by dropping it 845 #[cfg(target_arch = "wasm32")] 846 long_press_timeout.set(None); 847 }; 848 849 let on_pointer_leave = move |_: dioxus::events::PointerEvent| { 850 long_press_active.set(false); 851 #[cfg(target_arch = "wasm32")] 852 long_press_timeout.set(None); 853 }; 854 855 // Combined sync handler - pulls remote changes first if needed, then pushes local 856 let doc_for_sync = doc.clone(); 857 let on_sync_click = { 858 let on_refresh = props.on_refresh.clone(); 859 let current_state = display_state; 860 move |_: dioxus::events::MouseEvent| { 861 // Don't trigger click if long press just fired 862 if !*sync_activated.peek() && *long_press_active.peek() { 863 return; 864 } 865 866 if *sync_state.peek() == SyncState::Syncing { 867 return; // Already syncing 868 } 869 // If there are remote changes, pull them first 870 if current_state == SyncState::RemoteChanges { 871 if let Some(ref handler) = on_refresh { 872 handler.call(()); 873 } 874 } 875 // Trigger sync if there are local changes 876 if doc_for_sync.has_unsynced_changes() { 877 trigger_sync.set(true); 878 } else if current_state != SyncState::RemoteChanges { 879 sync_state.set(SyncState::Synced); 880 } 881 } 882 }; 883 884 // Show tooltip hint about long-press when sync is active 885 let title = if is_activated { 886 if let Some(ref err) = *last_error.read() { 887 err.clone() 888 } else { 889 format!("{} (hold to stop syncing)", label) 890 } 891 } else { 892 label.to_string() 893 }; 894 895 rsx! { 896 div { 897 class: "{class}", 898 title: "{title}", 899 role: "status", 900 aria_live: "polite", 901 onclick: on_sync_click, 902 onpointerdown: on_pointer_down, 903 onpointerup: on_pointer_up, 904 onpointerleave: on_pointer_leave, 905 906 span { class: "sync-icon", "{icon}" } 907 span { class: "sync-label", "{label}" } 908 } 909 } 910} 911 912// === Editor state loading === 913 914/// Result of loading editor state. 915#[derive(Clone)] 916pub enum LoadEditorResult { 917 /// Document state loaded successfully. 918 Loaded(LoadedDocState), 919 /// Loading failed with error message. 920 Failed(String), 921} 922 923/// Load editor state from various sources. 924/// 925/// This function handles the complete loading flow: 926/// 1. Resolves notebook title to URI if provided 927/// 2. Tries to load and merge from localStorage + PDS edit state 928/// 3. Falls back to loading entry content if no edit state exists 929/// 4. Creates new document with initial content if nothing exists 930/// 931/// # Arguments 932/// - `fetcher`: The fetcher for API calls 933/// - `draft_key`: Unique key for localStorage (entry URI or "new:{tid}") 934/// - `entry_uri`: Optional AT-URI of existing entry to edit 935/// - `initial_content`: Optional initial markdown for new entries 936/// - `target_notebook`: Optional notebook title to resolve to URI 937pub async fn load_editor_state( 938 fetcher: &Fetcher, 939 draft_key: &str, 940 entry_uri: Option<&AtUri<'static>>, 941 initial_content: Option<&str>, 942 target_notebook: Option<&str>, 943) -> LoadEditorResult { 944 // Resolve target_notebook to a URI if provided. 945 let notebook_uri: Option<SmolStr> = if let Some(title) = target_notebook { 946 if let Some(did) = fetcher.current_did().await { 947 let ident = AtIdentifier::Did(did); 948 match fetcher.get_notebook(ident, title.into()).await { 949 Ok(Some(notebook_data)) => Some(notebook_data.0.uri.to_smolstr()), 950 Ok(None) | Err(_) => { 951 tracing::debug!("Could not resolve notebook '{}' to URI", title); 952 None 953 } 954 } 955 } else { 956 None 957 } 958 } else { 959 None 960 }; 961 962 match load_and_merge_document(fetcher, draft_key, entry_uri).await { 963 Ok(Some(mut state)) => { 964 tracing::debug!("Loaded merged document state"); 965 if state.notebook_uri.is_none() { 966 state.notebook_uri = notebook_uri; 967 } 968 return LoadEditorResult::Loaded(state); 969 } 970 Ok(None) => { 971 // No existing state - check if we need to load entry content. 972 if let Some(uri) = entry_uri { 973 // Check that this entry belongs to the current user. 974 if let Some(current_did) = fetcher.current_did().await { 975 let entry_authority = uri.authority(); 976 let is_own_entry = match entry_authority { 977 AtIdentifier::Did(did) => did == &current_did, 978 AtIdentifier::Handle(handle) => { 979 match fetcher.client.resolve_handle(handle).await { 980 Ok(resolved_did) => resolved_did == current_did, 981 Err(_) => false, 982 } 983 } 984 }; 985 if !is_own_entry { 986 tracing::warn!( 987 "Cannot edit entry belonging to another user: {}", 988 entry_authority 989 ); 990 return LoadEditorResult::Failed( 991 "You can only edit your own entries".to_string(), 992 ); 993 } 994 } 995 996 match load_entry_for_editing(fetcher, uri).await { 997 Ok(loaded) => { 998 return LoadEditorResult::Loaded( 999 create_state_from_entry(fetcher, &loaded, uri, notebook_uri).await, 1000 ); 1001 } 1002 Err(e) => { 1003 tracing::error!("Failed to load entry: {}", e); 1004 return LoadEditorResult::Failed(e.to_string()); 1005 } 1006 } 1007 } 1008 1009 // New document with initial content. 1010 let doc = LoroDoc::new(); 1011 if let Some(content) = initial_content { 1012 let text = doc.get_text("content"); 1013 text.insert(0, content).ok(); 1014 doc.commit(); 1015 } 1016 1017 LoadEditorResult::Loaded(LoadedDocState { 1018 doc, 1019 entry_ref: None, 1020 edit_root: None, 1021 last_diff: None, 1022 synced_version: None, 1023 last_seen_diffs: HashMap::new(), 1024 resolved_content: weaver_common::ResolvedContent::default(), 1025 notebook_uri, 1026 }) 1027 } 1028 Err(e) => { 1029 tracing::error!("Failed to load document state: {}", e); 1030 LoadEditorResult::Failed(e.to_string()) 1031 } 1032 } 1033} 1034 1035/// Create LoadedDocState from a loaded entry. 1036/// 1037/// Handles: 1038/// - Creating LoroDoc and populating with entry data 1039/// - Restoring embeds (images and records) 1040/// - Pre-warming blob cache (server feature) 1041/// - Pre-fetching embed content for initial render 1042async fn create_state_from_entry( 1043 fetcher: &Fetcher, 1044 loaded: &super::publish::LoadedEntry, 1045 uri: &AtUri<'_>, 1046 notebook_uri: Option<SmolStr>, 1047) -> LoadedDocState { 1048 let doc = LoroDoc::new(); 1049 let content = doc.get_text("content"); 1050 let title = doc.get_text("title"); 1051 let path = doc.get_text("path"); 1052 let tags = doc.get_list("tags"); 1053 1054 content.insert(0, loaded.entry.content.as_ref()).ok(); 1055 title.insert(0, loaded.entry.title.as_ref()).ok(); 1056 path.insert(0, loaded.entry.path.as_ref()).ok(); 1057 if let Some(ref entry_tags) = loaded.entry.tags { 1058 for tag in entry_tags { 1059 let tag_str: &str = tag.as_ref(); 1060 tags.push(tag_str).ok(); 1061 } 1062 } 1063 1064 // Restore existing embeds from the entry. 1065 if let Some(ref embeds) = loaded.entry.embeds { 1066 let embeds_map = doc.get_map("embeds"); 1067 1068 if let Some(ref images) = embeds.images { 1069 let images_list = embeds_map 1070 .get_or_create_container("images", loro::LoroList::new()) 1071 .expect("images list"); 1072 for image in &images.images { 1073 let json = serde_json::to_value(image).expect("Image serializes"); 1074 images_list.push(json).ok(); 1075 } 1076 } 1077 1078 if let Some(ref records) = embeds.records { 1079 let records_list = embeds_map 1080 .get_or_create_container("records", loro::LoroList::new()) 1081 .expect("records list"); 1082 for record in &records.records { 1083 let json = serde_json::to_value(record).expect("RecordEmbed serializes"); 1084 records_list.push(json).ok(); 1085 } 1086 } 1087 } 1088 1089 doc.commit(); 1090 1091 // Pre-warm blob cache for images. 1092 #[cfg(feature = "fullstack-server")] 1093 if let Some(ref embeds) = loaded.entry.embeds { 1094 if let Some(ref images) = embeds.images { 1095 let ident: &str = match uri.authority() { 1096 AtIdentifier::Did(d) => d.as_ref(), 1097 AtIdentifier::Handle(h) => h.as_ref(), 1098 }; 1099 for image in &images.images { 1100 let cid = image.image.blob().cid(); 1101 let name = image.name.as_ref().map(|n| n.as_ref()); 1102 if let Err(e) = crate::data::cache_blob( 1103 ident.into(), 1104 cid.as_ref().into(), 1105 name.map(|n| n.into()), 1106 ) 1107 .await 1108 { 1109 tracing::warn!("Failed to pre-warm blob cache for {}: {}", cid, e); 1110 } 1111 } 1112 } 1113 } 1114 1115 // Pre-fetch embeds for initial render. 1116 let resolved_content = prefetch_embeds_for_entry(fetcher, &doc, &loaded.entry.embeds).await; 1117 1118 LoadedDocState { 1119 doc, 1120 entry_ref: Some(loaded.entry_ref.clone()), 1121 edit_root: None, 1122 last_diff: None, 1123 synced_version: None, 1124 last_seen_diffs: HashMap::new(), 1125 resolved_content, 1126 notebook_uri, 1127 } 1128} 1129 1130/// Pre-fetch embed content for an entry being loaded. 1131async fn prefetch_embeds_for_entry( 1132 fetcher: &Fetcher, 1133 doc: &LoroDoc, 1134 embeds: &Option<weaver_api::sh_weaver::notebook::entry::EntryEmbeds<'_>>, 1135) -> weaver_common::ResolvedContent { 1136 use weaver_common::{ExtractedRef, collect_refs_from_markdown}; 1137 1138 let mut resolved_content = weaver_common::ResolvedContent::default(); 1139 1140 if let Some(embeds) = embeds { 1141 if let Some(ref records) = embeds.records { 1142 for record in &records.records { 1143 let key_uri = if let Some(ref name) = record.name { 1144 match AtUri::new(name.as_ref()) { 1145 Ok(uri) => uri.into_static(), 1146 Err(_) => continue, 1147 } 1148 } else { 1149 record.record.uri.clone().into_static() 1150 }; 1151 1152 match weaver_renderer::atproto::fetch_and_render(&record.record.uri, fetcher).await 1153 { 1154 Ok(html) => { 1155 resolved_content.add_embed(key_uri, html, None); 1156 } 1157 Err(e) => { 1158 tracing::warn!("Failed to pre-fetch embed {}: {}", record.record.uri, e); 1159 } 1160 } 1161 } 1162 } 1163 } 1164 1165 // Fall back to parsing markdown if no embeds in record. 1166 if resolved_content.embed_content.is_empty() { 1167 let text = doc.get_text("content"); 1168 let markdown = text.to_string(); 1169 1170 if !markdown.is_empty() { 1171 tracing::debug!("Falling back to markdown parsing for embeds"); 1172 let refs = collect_refs_from_markdown(&markdown); 1173 1174 for extracted in refs { 1175 if let ExtractedRef::AtEmbed { uri, .. } = extracted { 1176 let key_uri = match AtUri::new(&uri) { 1177 Ok(u) => u.into_static(), 1178 Err(_) => continue, 1179 }; 1180 1181 match weaver_renderer::atproto::fetch_and_render(&key_uri, fetcher).await { 1182 Ok(html) => { 1183 tracing::debug!("Pre-fetched embed from markdown: {}", uri); 1184 resolved_content.add_embed(key_uri, html, None); 1185 } 1186 Err(e) => { 1187 tracing::warn!("Failed to pre-fetch embed {}: {}", uri, e); 1188 } 1189 } 1190 } 1191 } 1192 } 1193 } 1194 1195 resolved_content 1196}