collab endpoints, etc.

Orual 641d199e ac534ffe

+1532 -44
-10
crates/weaver-app/.env-dev
··· 1 - WEAVER_APP_ENV="dev" 2 - WEAVER_APP_HOST="http://localhost" 3 - WEAVER_APP_DOMAIN="" 4 - WEAVER_PORT=8080 5 - WEAVER_APP_SCOPES="atproto transition:generic" 6 - WEAVER_CLIENT_NAME="Weaver" 7 - 8 - WEAVER_LOGO_URI="" 9 - WEAVER_TOS_URI="" 10 - WEAVER_PRIVACY_POLICY_URI=""
+13
crates/weaver-app/.env-prod
··· 1 + WEAVER_APP_ENV="prod" 2 + WEAVER_APP_HOST="https://alpha.weaver.sh" 3 + WEAVER_APP_DOMAIN="https://alpha.weaver.sh" 4 + WEAVER_PORT=8080 5 + WEAVER_APP_SCOPES="atproto transition:generic" 6 + WEAVER_CLIENT_NAME="Weaver" 7 + 8 + WEAVER_LOGO_URI="https://alpha.weaver.sh/favicon.ico" 9 + WEAVER_TOS_URI="https://alpha.weaver.sh/tos" 10 + WEAVER_PRIVACY_POLICY_URI="https://alpha.weaver.sh/privacy" 11 + WEAVER_OWNER_DID="did:plc:yfvwmnlztr4dwkb7hwz55r2g" 12 + WEAVER_INDEXER_URL="https://index.weaver.sh" 13 + WEAVER_INDEXER_DID="did:web:index.weaver.sh"
+3 -2
crates/weaver-app/Cargo.toml
··· 23 23 required-features = ["web"] 24 24 25 25 [features] 26 - default = ["web", "fullstack-server", "no-app-index"] 26 + default = ["web", "fullstack-server", "use-index"] 27 27 # Fullstack mode with SSR and server functions 28 28 fullstack-server = ["dioxus/fullstack"] 29 29 wasm-split = ["dioxus/wasm-split"] 30 - no-app-index = [] 30 + # Use weaver-index for reads (proxied when authenticated) 31 + use-index = ["weaver-common/use-index"] 31 32 32 33 web = ["dioxus/web", "dioxus-primitives/web"] 33 34 desktop = ["dioxus/desktop"]
+13
crates/weaver-app/build.rs
··· 11 11 dotenv().ok(); 12 12 f.write_all(b"// This file is automatically generated by build.rs\n\n") 13 13 .unwrap(); 14 + 15 + // Track which keys we've written 16 + let mut written_keys = std::collections::HashSet::new(); 17 + 14 18 for (key, value) in env::vars() { 15 19 if key.starts_with("WEAVER_") { 16 20 let line = format!( ··· 18 22 key, 19 23 value.replace("\"", "\\\"") 20 24 ); 25 + f.write_all(line.as_bytes()).unwrap(); 26 + written_keys.insert(key); 27 + } 28 + } 29 + 30 + // Ensure index-related constants are always defined (even if empty) 31 + for key in ["WEAVER_INDEXER_URL", "WEAVER_INDEXER_DID"] { 32 + if !written_keys.contains(key) { 33 + let line = format!("#[allow(unused)]\npub const {}: &'static str = \"\";\n", key); 21 34 f.write_all(line.as_bytes()).unwrap(); 22 35 } 23 36 }
+10 -6
crates/weaver-app/src/env.rs
··· 1 1 // This file is automatically generated by build.rs 2 2 3 3 #[allow(unused)] 4 - pub const WEAVER_APP_ENV: &'static str = "prod"; 4 + pub const WEAVER_APP_ENV: &'static str = "dev"; 5 5 #[allow(unused)] 6 - pub const WEAVER_APP_HOST: &'static str = "https://alpha.weaver.sh"; 6 + pub const WEAVER_APP_HOST: &'static str = "http://localhost"; 7 7 #[allow(unused)] 8 - pub const WEAVER_APP_DOMAIN: &'static str = "https://alpha.weaver.sh"; 8 + pub const WEAVER_APP_DOMAIN: &'static str = ""; 9 9 #[allow(unused)] 10 10 pub const WEAVER_PORT: &'static str = "8080"; 11 11 #[allow(unused)] ··· 13 13 #[allow(unused)] 14 14 pub const WEAVER_CLIENT_NAME: &'static str = "Weaver"; 15 15 #[allow(unused)] 16 - pub const WEAVER_LOGO_URI: &'static str = "https://alpha.weaver.sh/favicon.ico"; 16 + pub const WEAVER_LOGO_URI: &'static str = ""; 17 17 #[allow(unused)] 18 - pub const WEAVER_TOS_URI: &'static str = "https://alpha.weaver.sh/tos"; 18 + pub const WEAVER_TOS_URI: &'static str = ""; 19 19 #[allow(unused)] 20 - pub const WEAVER_PRIVACY_POLICY_URI: &'static str = "https://alpha.weaver.sh/privacy"; 20 + pub const WEAVER_PRIVACY_POLICY_URI: &'static str = ""; 21 + #[allow(unused)] 22 + pub const WEAVER_INDEXER_URL: &'static str = "http://localhost:3000"; 23 + #[allow(unused)] 24 + pub const WEAVER_INDEXER_DID: &'static str = "did:web:index.weaver.sh"; 21 25 #[allow(unused)] 22 26 pub const WEAVER_OWNER_DID: &'static str = "did:plc:yfvwmnlztr4dwkb7hwz55r2g";
+252 -9
crates/weaver-app/src/fetch.rs
··· 24 24 use jacquard::types::string::Nsid; 25 25 use jacquard::xrpc::XrpcResponse; 26 26 use jacquard::xrpc::*; 27 - use jacquard::{smol_str::{SmolStr, format_smolstr}, types::ident::AtIdentifier}; 27 + use jacquard::{ 28 + smol_str::{SmolStr, format_smolstr}, 29 + types::ident::AtIdentifier, 30 + }; 28 31 use serde::{Deserialize, Serialize}; 29 32 use std::future::Future; 30 33 use std::{sync::Arc, time::Duration}; ··· 112 115 if let Some(session) = guard.clone() { 113 116 session.base_uri().await 114 117 } else { 118 + // When unauthenticated, use index if configured 119 + #[cfg(feature = "use-index")] 120 + if !crate::env::WEAVER_INDEXER_URL.is_empty() { 121 + return CowStr::from(crate::env::WEAVER_INDEXER_URL); 122 + } 115 123 self.oauth_client.base_uri().await 116 124 } 117 125 } ··· 392 400 &self, 393 401 session: OAuthSession<JacquardResolver, crate::auth::AuthStore>, 394 402 ) { 403 + let agent = Arc::new(Agent::new(session)); 404 + 405 + // When use-index is enabled, set the atproto_proxy header for service proxying 406 + #[cfg(feature = "use-index")] 407 + if !crate::env::WEAVER_INDEXER_DID.is_empty() { 408 + let proxy_value = format!("{}#atproto_index", crate::env::WEAVER_INDEXER_DID); 409 + let mut opts = agent.opts().await; 410 + opts.atproto_proxy = Some(CowStr::from(proxy_value)); 411 + agent.set_opts(opts).await; 412 + } 413 + 395 414 let mut session_slot = self.client.session.write().await; 396 - *session_slot = Some(Arc::new(Agent::new(session))); 415 + *session_slot = Some(agent); 397 416 } 398 417 399 418 pub async fn downgrade_to_unauthenticated(&self) { ··· 508 527 } 509 528 } 510 529 530 + #[cfg(feature = "use-index")] 531 + pub async fn fetch_notebooks_from_ufos( 532 + &self, 533 + ) -> Result<Vec<Arc<(NotebookView<'static>, Vec<StrongRef<'static>>)>>> { 534 + use weaver_api::sh_weaver::notebook::book::Book; 535 + use weaver_api::sh_weaver::notebook::get_notebook_feed::GetNotebookFeed; 536 + 537 + let client = self.get_client(); 538 + 539 + let resp = client 540 + .send(GetNotebookFeed::new().limit(100).build()) 541 + .await 542 + .map_err(|e| dioxus::CapturedError::from_display(e))?; 543 + 544 + let output = resp 545 + .into_output() 546 + .map_err(|e| dioxus::CapturedError::from_display(e))?; 547 + 548 + let mut notebooks = Vec::new(); 549 + 550 + for notebook in output.notebooks { 551 + // Extract entry_list from the record 552 + let book: Book = jacquard::from_data(&notebook.record) 553 + .map_err(|e| dioxus::CapturedError::from_display(e))?; 554 + let book = book.into_static(); 555 + 556 + let entries: Vec<StrongRef<'static>> = book 557 + .entry_list 558 + .into_iter() 559 + .map(IntoStatic::into_static) 560 + .collect(); 561 + 562 + let ident = notebook.uri.authority().clone().into_static(); 563 + let title = notebook 564 + .title 565 + .as_ref() 566 + .map(|t| SmolStr::new(t.as_ref())) 567 + .unwrap_or_else(|| SmolStr::new("Untitled")); 568 + 569 + let result = Arc::new((notebook.into_static(), entries)); 570 + #[cfg(feature = "server")] 571 + { 572 + cache_impl::insert(&self.notebook_key_cache, title.clone(), ident.clone()); 573 + cache_impl::insert(&self.book_cache, (ident.clone(), title), result.clone()); 574 + if let Some(path) = result.0.path.as_ref() { 575 + let path: SmolStr = path.as_ref().into(); 576 + cache_impl::insert(&self.notebook_key_cache, path.clone(), ident.clone()); 577 + cache_impl::insert(&self.book_cache, (ident, path), result.clone()); 578 + } 579 + } 580 + notebooks.push(result); 581 + } 582 + 583 + Ok(notebooks) 584 + } 585 + 586 + #[cfg(not(feature = "use-index"))] 511 587 pub async fn fetch_notebooks_from_ufos( 512 588 &self, 513 589 ) -> Result<Vec<Arc<(NotebookView<'static>, Vec<StrongRef<'static>>)>>> { ··· 530 606 // Construct URI 531 607 let uri_str = format_smolstr!( 532 608 "at://{}/{}/{}", 533 - ufos_record.did, ufos_record.collection, ufos_record.rkey 609 + ufos_record.did, 610 + ufos_record.collection, 611 + ufos_record.rkey 534 612 ); 535 - let uri = AtUri::new_owned(uri_str) 536 - .map_err(|e| dioxus::CapturedError::from_display(format_smolstr!("Invalid URI: {}", e).as_str()))?; 613 + let uri = AtUri::new_owned(uri_str).map_err(|e| { 614 + dioxus::CapturedError::from_display(format_smolstr!("Invalid URI: {}", e).as_str()) 615 + })?; 537 616 match client.view_notebook(&uri).await { 538 617 Ok((notebook, entries)) => { 539 618 let ident = uri.authority().clone().into_static(); ··· 573 652 Ok(notebooks) 574 653 } 575 654 655 + /// Fetch entries from index feed (reverse chronological) 656 + #[cfg(feature = "use-index")] 657 + pub async fn fetch_entries_from_ufos( 658 + &self, 659 + ) -> Result<Vec<Arc<(EntryView<'static>, Entry<'static>, u64)>>> { 660 + use jacquard::IntoStatic; 661 + use weaver_api::sh_weaver::notebook::entry::Entry; 662 + use weaver_api::sh_weaver::notebook::get_entry_feed::GetEntryFeed; 663 + 664 + let client = self.get_client(); 665 + 666 + let resp = client 667 + .send(GetEntryFeed::new().limit(100).build()) 668 + .await 669 + .map_err(|e| dioxus::CapturedError::from_display(e))?; 670 + 671 + let output = resp 672 + .into_output() 673 + .map_err(|e| dioxus::CapturedError::from_display(e))?; 674 + 675 + let mut entries = Vec::new(); 676 + 677 + for feed_entry in output.feed { 678 + let entry_view = feed_entry.entry; 679 + // indexed_at is ISO datetime, parse to get millisecond timestamp 680 + let timestamp = chrono::DateTime::parse_from_rfc3339(entry_view.indexed_at.as_str()) 681 + .map(|dt| dt.timestamp_millis() as u64) 682 + .unwrap_or(0); 683 + 684 + let entry: Entry = jacquard::from_data(&entry_view.record) 685 + .map_err(|e| dioxus::CapturedError::from_display(e))?; 686 + let entry = entry.into_static(); 687 + 688 + entries.push(Arc::new((entry_view.into_static(), entry, timestamp))); 689 + } 690 + 691 + Ok(entries) 692 + } 693 + 576 694 /// Fetch entries from UFOS discovery service (reverse chronological) 695 + #[cfg(not(feature = "use-index"))] 577 696 pub async fn fetch_entries_from_ufos( 578 697 &self, 579 698 ) -> Result<Vec<Arc<(EntryView<'static>, Entry<'static>, u64)>>> { ··· 630 749 Ok(entries) 631 750 } 632 751 752 + #[cfg(feature = "use-index")] 753 + pub async fn fetch_notebooks_for_did( 754 + &self, 755 + ident: &AtIdentifier<'_>, 756 + ) -> Result<Vec<Arc<(NotebookView<'static>, Vec<StrongRef<'static>>)>>> { 757 + use weaver_api::sh_weaver::actor::get_actor_notebooks::GetActorNotebooks; 758 + use weaver_api::sh_weaver::notebook::book::Book; 759 + 760 + let client = self.get_client(); 761 + 762 + let resp = client 763 + .send( 764 + GetActorNotebooks::new() 765 + .actor(ident.clone()) 766 + .limit(100) 767 + .build(), 768 + ) 769 + .await 770 + .map_err(|e| dioxus::CapturedError::from_display(e))?; 771 + 772 + let output = resp 773 + .into_output() 774 + .map_err(|e| dioxus::CapturedError::from_display(e))?; 775 + 776 + let mut notebooks = Vec::new(); 777 + 778 + for notebook in output.notebooks { 779 + // Extract entry_list from the record 780 + let book: Book = jacquard::from_data(&notebook.record) 781 + .map_err(|e| dioxus::CapturedError::from_display(e))?; 782 + let book = book.into_static(); 783 + 784 + let entries: Vec<StrongRef<'static>> = book 785 + .entry_list 786 + .into_iter() 787 + .map(IntoStatic::into_static) 788 + .collect(); 789 + 790 + let ident_static = notebook.uri.authority().clone().into_static(); 791 + let title = notebook 792 + .title 793 + .as_ref() 794 + .map(|t| SmolStr::new(t.as_ref())) 795 + .unwrap_or_else(|| SmolStr::new("Untitled")); 796 + 797 + let result = Arc::new((notebook.into_static(), entries)); 798 + #[cfg(feature = "server")] 799 + { 800 + cache_impl::insert( 801 + &self.notebook_key_cache, 802 + title.clone(), 803 + ident_static.clone(), 804 + ); 805 + cache_impl::insert( 806 + &self.book_cache, 807 + (ident_static.clone(), title), 808 + result.clone(), 809 + ); 810 + if let Some(path) = result.0.path.as_ref() { 811 + let path: SmolStr = path.as_ref().into(); 812 + cache_impl::insert( 813 + &self.notebook_key_cache, 814 + path.clone(), 815 + ident_static.clone(), 816 + ); 817 + cache_impl::insert(&self.book_cache, (ident_static, path), result.clone()); 818 + } 819 + } 820 + notebooks.push(result); 821 + } 822 + 823 + Ok(notebooks) 824 + } 825 + 826 + #[cfg(not(feature = "use-index"))] 633 827 pub async fn fetch_notebooks_for_did( 634 828 &self, 635 829 ident: &AtIdentifier<'_>, ··· 728 922 notebooks.push(result); 729 923 } 730 924 Err(e) => { 731 - tracing::warn!("fetch_notebooks_for_did: view_notebook failed for {}: {}", record.uri, e); 925 + tracing::warn!( 926 + "fetch_notebooks_for_did: view_notebook failed for {}: {}", 927 + record.uri, 928 + e 929 + ); 732 930 continue; 733 931 } 734 932 } ··· 738 936 } 739 937 740 938 /// Fetch all entries for a DID (for profile timeline) 939 + #[cfg(feature = "use-index")] 940 + pub async fn fetch_entries_for_did( 941 + &self, 942 + ident: &AtIdentifier<'_>, 943 + ) -> Result<Vec<Arc<(EntryView<'static>, Entry<'static>)>>> { 944 + use weaver_api::sh_weaver::actor::get_actor_entries::GetActorEntries; 945 + 946 + let client = self.get_client(); 947 + 948 + let resp = client 949 + .send( 950 + GetActorEntries::new() 951 + .actor(ident.clone()) 952 + .limit(100) 953 + .build(), 954 + ) 955 + .await 956 + .map_err(|e| dioxus::CapturedError::from_display(e))?; 957 + 958 + let output = resp 959 + .into_output() 960 + .map_err(|e| dioxus::CapturedError::from_display(e))?; 961 + 962 + let mut entries = Vec::new(); 963 + 964 + for entry_view in output.entries { 965 + // Deserialize Entry from the record field 966 + let entry: Entry = jacquard::from_data(&entry_view.record) 967 + .map_err(|e| dioxus::CapturedError::from_display(e))?; 968 + let entry = entry.into_static(); 969 + 970 + entries.push(Arc::new((entry_view.into_static(), entry))); 971 + } 972 + 973 + Ok(entries) 974 + } 975 + 976 + /// Fetch all entries for a DID (for profile timeline) 977 + #[cfg(not(feature = "use-index"))] 741 978 pub async fn fetch_entries_for_did( 742 979 &self, 743 980 ident: &AtIdentifier<'_>, ··· 922 1159 // Try to find notebook context via constellation 923 1160 let entry_uri = entry_view.uri.clone(); 924 1161 let at_uri = AtUri::new(entry_uri.as_ref()).map_err(|e| { 925 - dioxus::CapturedError::from_display(format_smolstr!("Invalid entry URI: {}", e).as_str()) 1162 + dioxus::CapturedError::from_display( 1163 + format_smolstr!("Invalid entry URI: {}", e).as_str(), 1164 + ) 926 1165 })?; 927 1166 928 1167 let (total, first_notebook) = client ··· 941 1180 notebook_id.rkey.0.as_str() 942 1181 ); 943 1182 let notebook_uri = AtUri::new_owned(notebook_uri_str).map_err(|e| { 944 - dioxus::CapturedError::from_display(format_smolstr!("Invalid notebook URI: {}", e).as_str()) 1183 + dioxus::CapturedError::from_display( 1184 + format_smolstr!("Invalid notebook URI: {}", e).as_str(), 1185 + ) 945 1186 })?; 946 1187 947 1188 // Fetch notebook and find entry position ··· 1032 1273 // Check if entry is in multiple notebooks - if so, clear prev/next 1033 1274 let entry_uri = book_entry_view.entry.uri.clone(); 1034 1275 let at_uri = AtUri::new(entry_uri.as_ref()).map_err(|e| { 1035 - dioxus::CapturedError::from_display(format_smolstr!("Invalid entry URI: {}", e).as_str()) 1276 + dioxus::CapturedError::from_display( 1277 + format_smolstr!("Invalid entry URI: {}", e).as_str(), 1278 + ) 1036 1279 })?; 1037 1280 1038 1281 let (total, _) = client
+1
crates/weaver-common/Cargo.toml
··· 9 9 default = ["dev"] 10 10 dev = [] 11 11 native = ["jacquard/dns"] 12 + use-index = [] 12 13 iroh = ["dep:iroh", "dep:iroh-gossip", "dep:iroh-tickets"] 13 14 telemetry = ["dep:metrics", "dep:metrics-exporter-prometheus", "dep:tracing-subscriber", "dep:tracing-loki"] 14 15
+202
crates/weaver-common/src/agent.rs
··· 359 359 /// View functions - generic versions that work with any Agent 360 360 361 361 /// Fetch a notebook and construct NotebookView with author profiles 362 + #[cfg(feature = "use-index")] 363 + fn view_notebook( 364 + &self, 365 + uri: &AtUri<'_>, 366 + ) -> impl Future<Output = Result<(NotebookView<'static>, Vec<StrongRef<'static>>), WeaverError>> 367 + where 368 + Self: Sized, 369 + { 370 + async move { 371 + use weaver_api::sh_weaver::notebook::get_notebook::GetNotebook; 372 + 373 + let resp = self 374 + .send(GetNotebook::new().notebook(uri.clone()).build()) 375 + .await 376 + .map_err(|e| AgentError::from(ClientError::from(e)))?; 377 + 378 + let output = resp.into_output().map_err(|e| { 379 + AgentError::from(ClientError::invalid_request(format!( 380 + "Failed to get notebook: {}", 381 + e 382 + ))) 383 + })?; 384 + 385 + Ok(( 386 + output.notebook.into_static(), 387 + output 388 + .entries 389 + .into_iter() 390 + .map(IntoStatic::into_static) 391 + .collect(), 392 + )) 393 + } 394 + } 395 + 396 + #[cfg(not(feature = "use-index"))] 362 397 fn view_notebook( 363 398 &self, 364 399 uri: &AtUri<'_>, ··· 608 643 } 609 644 610 645 /// Search for a notebook by title for a given DID or handle 646 + #[cfg(feature = "use-index")] 647 + fn notebook_by_title( 648 + &self, 649 + ident: &jacquard::types::ident::AtIdentifier<'_>, 650 + title: &str, 651 + ) -> impl Future< 652 + Output = Result<Option<(NotebookView<'static>, Vec<StrongRef<'static>>)>, WeaverError>, 653 + > 654 + where 655 + Self: Sized, 656 + { 657 + async move { 658 + use weaver_api::sh_weaver::notebook::resolve_notebook::ResolveNotebook; 659 + 660 + let resp = self 661 + .send( 662 + ResolveNotebook::new() 663 + .actor(ident.clone()) 664 + .name(title) 665 + .build(), 666 + ) 667 + .await 668 + .map_err(|e| AgentError::from(ClientError::from(e)))?; 669 + 670 + match resp.into_output() { 671 + Ok(output) => { 672 + // Extract StrongRefs from the BookEntryViews for compatibility 673 + let entries: Vec<StrongRef<'static>> = output 674 + .entries 675 + .iter() 676 + .map(|bev| { 677 + StrongRef::new() 678 + .uri(bev.entry.uri.clone()) 679 + .cid(bev.entry.cid.clone()) 680 + .build() 681 + .into_static() 682 + }) 683 + .collect(); 684 + 685 + Ok(Some((output.notebook.into_static(), entries))) 686 + } 687 + Err(_) => Ok(None), 688 + } 689 + } 690 + } 691 + 692 + /// Search for a notebook by title for a given DID or handle 693 + #[cfg(not(feature = "use-index"))] 611 694 fn notebook_by_title( 612 695 &self, 613 696 ident: &jacquard::types::ident::AtIdentifier<'_>, ··· 740 823 } 741 824 742 825 /// Hydrate a profile view from either weaver or bsky profile 826 + #[cfg(feature = "use-index")] 827 + fn hydrate_profile_view( 828 + &self, 829 + did: &Did<'_>, 830 + ) -> impl Future< 831 + Output = Result< 832 + ( 833 + Option<AtUri<'static>>, 834 + weaver_api::sh_weaver::actor::ProfileDataView<'static>, 835 + ), 836 + WeaverError, 837 + >, 838 + > { 839 + async move { 840 + use weaver_api::sh_weaver::actor::get_profile::GetProfile; 841 + 842 + let resp = self 843 + .send(GetProfile::new().actor(did.clone()).build()) 844 + .await 845 + .map_err(|e| AgentError::from(ClientError::from(e)))?; 846 + 847 + let output = resp.into_output().map_err(|e| { 848 + AgentError::from(ClientError::invalid_request(format!( 849 + "Failed to get profile: {}", 850 + e 851 + ))) 852 + })?; 853 + 854 + // URI is goofy in this signature, just return None for now 855 + Ok((None, output.value.into_static())) 856 + } 857 + } 858 + 859 + /// Hydrate a profile view from either weaver or bsky profile 860 + #[cfg(not(feature = "use-index"))] 743 861 fn hydrate_profile_view( 744 862 &self, 745 863 did: &Did<'_>, ··· 885 1003 } 886 1004 887 1005 /// View an entry at a specific index with prev/next navigation 1006 + #[cfg(feature = "use-index")] 1007 + fn view_entry<'a>( 1008 + &self, 1009 + notebook: &NotebookView<'a>, 1010 + _entries: &[StrongRef<'_>], 1011 + index: usize, 1012 + ) -> impl Future<Output = Result<BookEntryView<'a>, WeaverError>> { 1013 + async move { 1014 + use weaver_api::sh_weaver::notebook::get_book_entry::GetBookEntry; 1015 + 1016 + let resp = self 1017 + .send( 1018 + GetBookEntry::new() 1019 + .notebook(notebook.uri.clone()) 1020 + .index(index as i64) 1021 + .build(), 1022 + ) 1023 + .await 1024 + .map_err(|e| AgentError::from(ClientError::from(e)))?; 1025 + 1026 + let output = resp.into_output().map_err(|e| { 1027 + AgentError::from(ClientError::invalid_request(format!( 1028 + "Failed to get book entry: {}", 1029 + e 1030 + ))) 1031 + })?; 1032 + 1033 + Ok(output.value.into_static()) 1034 + } 1035 + } 1036 + 1037 + /// View an entry at a specific index with prev/next navigation 1038 + #[cfg(not(feature = "use-index"))] 888 1039 fn view_entry<'a>( 889 1040 &self, 890 1041 notebook: &NotebookView<'a>, ··· 1146 1297 /// 1147 1298 /// This bypasses notebook context entirely - useful for standalone entries 1148 1299 /// or when you have the rkey but not the notebook. 1300 + #[cfg(feature = "use-index")] 1301 + fn fetch_entry_by_rkey( 1302 + &self, 1303 + ident: &jacquard::types::ident::AtIdentifier<'_>, 1304 + rkey: &str, 1305 + ) -> impl Future<Output = Result<(EntryView<'static>, entry::Entry<'static>), WeaverError>> 1306 + where 1307 + Self: Sized, 1308 + { 1309 + async move { 1310 + use jacquard::types::collection::Collection; 1311 + use weaver_api::sh_weaver::notebook::get_entry::GetEntry; 1312 + 1313 + // Build entry URI from ident + rkey 1314 + let entry_uri_str = format!("at://{}/{}/{}", ident, entry::Entry::NSID, rkey); 1315 + let entry_uri = AtUri::new(&entry_uri_str) 1316 + .map_err(|_| AgentError::from(ClientError::invalid_request("Invalid entry URI")))? 1317 + .into_static(); 1318 + 1319 + let resp = self 1320 + .send(GetEntry::new().uri(entry_uri).build()) 1321 + .await 1322 + .map_err(|e| AgentError::from(ClientError::from(e)))?; 1323 + 1324 + let output = resp.into_output().map_err(|e| { 1325 + AgentError::from(ClientError::invalid_request(format!( 1326 + "Failed to get entry: {}", 1327 + e 1328 + ))) 1329 + })?; 1330 + 1331 + // Clone the record for deserialization so we can consume output.value 1332 + let record_clone = output.value.record.clone(); 1333 + 1334 + // Deserialize Entry from the cloned record 1335 + let entry_value: entry::Entry = jacquard::from_data(&record_clone).map_err(|e| { 1336 + AgentError::from(ClientError::invalid_request(format!( 1337 + "Failed to deserialize entry record: {}", 1338 + e 1339 + ))) 1340 + })?; 1341 + 1342 + Ok((output.value.into_static(), entry_value.into_static())) 1343 + } 1344 + } 1345 + 1346 + /// Fetch an entry directly by its rkey, returning the EntryView and raw Entry. 1347 + /// 1348 + /// This bypasses notebook context entirely - useful for standalone entries 1349 + /// or when you have the rkey but not the notebook. 1350 + #[cfg(not(feature = "use-index"))] 1149 1351 fn fetch_entry_by_rkey( 1150 1352 &self, 1151 1353 ident: &jacquard::types::ident::AtIdentifier<'_>,
+4 -2
crates/weaver-index/migrations/clickhouse/025_edit_nodes.sql
··· 22 22 resource_rkey String DEFAULT '', 23 23 resource_collection LowCardinality(String) DEFAULT '', 24 24 25 - -- For diffs: reference to root 25 + -- For diffs: reference to root (StrongRef) 26 26 root_did String DEFAULT '', 27 27 root_rkey String DEFAULT '', 28 + root_cid String DEFAULT '', 28 29 29 - -- For diffs: reference to previous node 30 + -- For diffs: reference to previous node (StrongRef) 30 31 prev_did String DEFAULT '', 31 32 prev_rkey String DEFAULT '', 33 + prev_cid String DEFAULT '', 32 34 33 35 -- Whether this has inline diff data vs blob snapshot 34 36 has_inline_diff UInt8 DEFAULT 0,
+2
crates/weaver-index/migrations/clickhouse/026_edit_roots_mv.sql
··· 47 47 -- Roots don't have root/prev refs 48 48 '' as root_did, 49 49 '' as root_rkey, 50 + '' as root_cid, 50 51 '' as prev_did, 51 52 '' as prev_rkey, 53 + '' as prev_cid, 52 54 53 55 -- Roots always have snapshot 54 56 0 as has_inline_diff,
+6 -2
crates/weaver-index/migrations/clickhouse/027_edit_diffs_mv.sql
··· 44 44 '' 45 45 ) as resource_collection, 46 46 47 - -- Root reference 47 + -- Root reference (StrongRef: uri + cid) 48 48 splitByChar('/', replaceOne(toString(record.root.uri), 'at://', ''))[1] as root_did, 49 49 splitByChar('/', replaceOne(toString(record.root.uri), 'at://', ''))[3] as root_rkey, 50 + toString(record.root.cid) as root_cid, 50 51 51 - -- Prev reference (optional) 52 + -- Prev reference (optional StrongRef) 52 53 if(toString(record.prev.uri) != '', 53 54 splitByChar('/', replaceOne(toString(record.prev.uri), 'at://', ''))[1], 54 55 '') as prev_did, 55 56 if(toString(record.prev.uri) != '', 56 57 splitByChar('/', replaceOne(toString(record.prev.uri), 'at://', ''))[3], 57 58 '') as prev_rkey, 59 + if(toString(record.prev.uri) != '', 60 + toString(record.prev.cid), 61 + '') as prev_cid, 58 62 59 63 -- Check for inline diff vs snapshot 60 64 if(length(toString(record.inlineDiff)) > 0, 1, 0) as has_inline_diff,
+2 -1
crates/weaver-index/src/clickhouse.rs
··· 7 7 pub use client::{Client, TableSize}; 8 8 pub use migrations::{DbObject, MigrationResult, Migrator, ObjectType}; 9 9 pub use queries::{ 10 - EntryRow, HandleMappingRow, NotebookRow, ProfileCountsRow, ProfileRow, ProfileWithCounts, 10 + CollaboratorRow, EditHeadRow, EditNodeRow, EntryRow, HandleMappingRow, NotebookRow, 11 + ProfileCountsRow, ProfileRow, ProfileWithCounts, 11 12 }; 12 13 pub use resilient_inserter::{InserterConfig, ResilientRecordInserter}; 13 14 pub use schema::{
+6
crates/weaver-index/src/clickhouse/queries.rs
··· 2 2 //! 3 3 //! These modules add query methods to the ClickHouse Client via impl blocks. 4 4 5 + mod collab; 6 + mod collab_state; 5 7 mod contributors; 8 + mod edit; 6 9 mod identity; 7 10 mod notebooks; 8 11 mod profiles; 9 12 13 + pub use collab::PermissionRow; 14 + pub use collab_state::{CollaboratorRow, EditHeadRow}; 15 + pub use edit::EditNodeRow; 10 16 pub use identity::HandleMappingRow; 11 17 pub use notebooks::{EntryRow, NotebookRow}; 12 18 pub use profiles::{ProfileCountsRow, ProfileRow, ProfileWithCounts};
+93
crates/weaver-index/src/clickhouse/queries/collab.rs
··· 1 + //! Collaboration and permission queries 2 + 3 + use clickhouse::Row; 4 + use serde::Deserialize; 5 + use smol_str::SmolStr; 6 + 7 + use crate::clickhouse::Client; 8 + use crate::error::{ClickHouseError, IndexError}; 9 + 10 + /// Permission row from the permissions materialized view 11 + #[derive(Debug, Clone, Row, Deserialize)] 12 + pub struct PermissionRow { 13 + pub resource_did: SmolStr, 14 + pub resource_collection: SmolStr, 15 + pub resource_rkey: SmolStr, 16 + pub resource_uri: SmolStr, 17 + pub grantee_did: SmolStr, 18 + pub scope: SmolStr, 19 + #[serde(with = "clickhouse::serde::chrono::datetime64::millis")] 20 + pub granted_at: chrono::DateTime<chrono::Utc>, 21 + } 22 + 23 + impl Client { 24 + /// Get all permissions for a resource by URI. 25 + /// 26 + /// Returns owner and all collaborators who have accepted invites. 27 + pub async fn get_resource_permissions( 28 + &self, 29 + resource_uri: &str, 30 + ) -> Result<Vec<PermissionRow>, IndexError> { 31 + let query = r#" 32 + SELECT 33 + resource_did, 34 + resource_collection, 35 + resource_rkey, 36 + resource_uri, 37 + grantee_did, 38 + scope, 39 + granted_at 40 + FROM permissions FINAL 41 + WHERE resource_uri = ? 42 + ORDER BY scope DESC, granted_at ASC 43 + "#; 44 + 45 + let rows = self 46 + .inner() 47 + .query(query) 48 + .bind(resource_uri) 49 + .fetch_all::<PermissionRow>() 50 + .await 51 + .map_err(|e| ClickHouseError::Query { 52 + message: "failed to get resource permissions".into(), 53 + source: e, 54 + })?; 55 + 56 + Ok(rows) 57 + } 58 + 59 + /// Check if a DID can edit a resource. 60 + /// 61 + /// Returns true if the DID is owner or collaborator. 62 + pub async fn can_edit_resource( 63 + &self, 64 + resource_uri: &str, 65 + did: &str, 66 + ) -> Result<bool, IndexError> { 67 + let query = r#" 68 + SELECT count(*) as cnt 69 + FROM permissions FINAL 70 + WHERE resource_uri = ? 71 + AND grantee_did = ? 72 + "#; 73 + 74 + #[derive(Row, Deserialize)] 75 + struct CountRow { 76 + cnt: u64, 77 + } 78 + 79 + let row = self 80 + .inner() 81 + .query(query) 82 + .bind(resource_uri) 83 + .bind(did) 84 + .fetch_one::<CountRow>() 85 + .await 86 + .map_err(|e| ClickHouseError::Query { 87 + message: "failed to check edit permission".into(), 88 + source: e, 89 + })?; 90 + 91 + Ok(row.cnt > 0) 92 + } 93 + }
+156
crates/weaver-index/src/clickhouse/queries/collab_state.rs
··· 1 + //! Collaboration state queries 2 + 3 + use clickhouse::Row; 4 + use serde::Deserialize; 5 + use smol_str::SmolStr; 6 + 7 + use crate::clickhouse::Client; 8 + use crate::error::{ClickHouseError, IndexError}; 9 + 10 + /// Collaborator row from the collaborators MV 11 + #[derive(Debug, Clone, Row, Deserialize)] 12 + pub struct CollaboratorRow { 13 + pub resource_uri: SmolStr, 14 + pub collaborator_did: SmolStr, 15 + pub inviter_did: SmolStr, 16 + pub invite_uri: SmolStr, 17 + pub accept_uri: SmolStr, 18 + pub scope: SmolStr, 19 + #[serde(with = "clickhouse::serde::chrono::datetime64::millis")] 20 + pub invited_at: chrono::DateTime<chrono::Utc>, 21 + #[serde(with = "clickhouse::serde::chrono::datetime64::millis")] 22 + pub accepted_at: chrono::DateTime<chrono::Utc>, 23 + } 24 + 25 + /// Edit head row from the edit_heads MV 26 + #[derive(Debug, Clone, Row, Deserialize)] 27 + pub struct EditHeadRow { 28 + pub resource_uri: SmolStr, 29 + pub head_did: SmolStr, 30 + pub head_rkey: SmolStr, 31 + pub head_cid: SmolStr, 32 + pub head_uri: SmolStr, 33 + pub head_type: SmolStr, 34 + pub root_did: SmolStr, 35 + pub root_rkey: SmolStr, 36 + pub root_cid: SmolStr, 37 + pub root_uri: SmolStr, 38 + #[serde(with = "clickhouse::serde::chrono::datetime64::millis")] 39 + pub head_created_at: chrono::DateTime<chrono::Utc>, 40 + } 41 + 42 + impl Client { 43 + /// Get collaborators for a resource (matched invite+accept pairs). 44 + pub async fn get_collaborators( 45 + &self, 46 + resource_uri: &str, 47 + ) -> Result<Vec<CollaboratorRow>, IndexError> { 48 + let query = r#" 49 + SELECT 50 + resource_uri, 51 + collaborator_did, 52 + inviter_did, 53 + invite_uri, 54 + accept_uri, 55 + scope, 56 + invited_at, 57 + accepted_at 58 + FROM collaborators FINAL 59 + WHERE resource_uri = ? 60 + ORDER BY accepted_at ASC 61 + "#; 62 + 63 + let rows = self 64 + .inner() 65 + .query(query) 66 + .bind(resource_uri) 67 + .fetch_all::<CollaboratorRow>() 68 + .await 69 + .map_err(|e| ClickHouseError::Query { 70 + message: "failed to get collaborators".into(), 71 + source: e, 72 + })?; 73 + 74 + Ok(rows) 75 + } 76 + 77 + /// Get edit heads for a resource. 78 + /// 79 + /// Multiple heads means divergent branches. 80 + pub async fn get_edit_heads(&self, resource_uri: &str) -> Result<Vec<EditHeadRow>, IndexError> { 81 + let query = r#" 82 + SELECT 83 + resource_uri, 84 + head_did, 85 + head_rkey, 86 + head_cid, 87 + head_uri, 88 + head_type, 89 + root_did, 90 + root_rkey, 91 + root_cid, 92 + root_uri, 93 + head_created_at 94 + FROM edit_heads FINAL 95 + WHERE resource_uri = ? 96 + ORDER BY head_created_at DESC 97 + "#; 98 + 99 + let rows = self 100 + .inner() 101 + .query(query) 102 + .bind(resource_uri) 103 + .fetch_all::<EditHeadRow>() 104 + .await 105 + .map_err(|e| ClickHouseError::Query { 106 + message: "failed to get edit heads".into(), 107 + source: e, 108 + })?; 109 + 110 + Ok(rows) 111 + } 112 + 113 + /// Check if resource has divergent branches (more than one head). 114 + pub async fn has_divergence(&self, resource_uri: &str) -> Result<bool, IndexError> { 115 + let heads = self.get_edit_heads(resource_uri).await?; 116 + Ok(heads.len() > 1) 117 + } 118 + 119 + /// Get CID for a record from raw_records. 120 + pub async fn get_record_cid( 121 + &self, 122 + did: &str, 123 + collection: &str, 124 + rkey: &str, 125 + ) -> Result<Option<SmolStr>, IndexError> { 126 + #[derive(Row, Deserialize)] 127 + struct CidRow { 128 + cid: SmolStr, 129 + } 130 + 131 + let query = r#" 132 + SELECT cid 133 + FROM raw_records FINAL 134 + WHERE did = ? 135 + AND collection = ? 136 + AND rkey = ? 137 + AND operation != 'delete' 138 + LIMIT 1 139 + "#; 140 + 141 + let row = self 142 + .inner() 143 + .query(query) 144 + .bind(did) 145 + .bind(collection) 146 + .bind(rkey) 147 + .fetch_optional::<CidRow>() 148 + .await 149 + .map_err(|e| ClickHouseError::Query { 150 + message: "failed to get record cid".into(), 151 + source: e, 152 + })?; 153 + 154 + Ok(row.map(|r| r.cid)) 155 + } 156 + }
+107
crates/weaver-index/src/clickhouse/queries/edit.rs
··· 1 + //! Edit history queries 2 + 3 + use clickhouse::Row; 4 + use serde::Deserialize; 5 + use smol_str::SmolStr; 6 + 7 + use crate::clickhouse::Client; 8 + use crate::error::{ClickHouseError, IndexError}; 9 + 10 + /// Edit node row from the edit_nodes table 11 + #[derive(Debug, Clone, Row, Deserialize)] 12 + pub struct EditNodeRow { 13 + pub did: SmolStr, 14 + pub rkey: SmolStr, 15 + pub cid: SmolStr, 16 + pub collection: SmolStr, 17 + pub node_type: SmolStr, 18 + pub root_did: SmolStr, 19 + pub root_rkey: SmolStr, 20 + pub root_cid: SmolStr, 21 + pub prev_did: SmolStr, 22 + pub prev_rkey: SmolStr, 23 + pub prev_cid: SmolStr, 24 + pub has_inline_diff: u8, 25 + pub has_snapshot: u8, 26 + #[serde(with = "clickhouse::serde::chrono::datetime64::millis")] 27 + pub created_at: chrono::DateTime<chrono::Utc>, 28 + } 29 + 30 + impl Client { 31 + /// Get edit history for a resource. 32 + /// 33 + /// Returns roots and diffs separately, ordered by created_at. 34 + /// The resource_uri should be an at:// URI for an entry or notebook. 35 + pub async fn get_edit_history( 36 + &self, 37 + resource_uri: &str, 38 + cursor: Option<i64>, 39 + after_rkey: Option<&str>, 40 + limit: i64, 41 + ) -> Result<Vec<EditNodeRow>, IndexError> { 42 + // Parse resource URI to extract did/collection/rkey 43 + let parts: Vec<&str> = resource_uri 44 + .strip_prefix("at://") 45 + .unwrap_or(resource_uri) 46 + .split('/') 47 + .collect(); 48 + 49 + if parts.len() < 3 { 50 + return Ok(Vec::new()); 51 + } 52 + 53 + let resource_did = parts[0]; 54 + let resource_collection = parts[1]; 55 + let resource_rkey = parts[2]; 56 + 57 + let query = r#" 58 + SELECT 59 + did, 60 + rkey, 61 + cid, 62 + collection, 63 + node_type, 64 + root_did, 65 + root_rkey, 66 + root_cid, 67 + prev_did, 68 + prev_rkey, 69 + prev_cid, 70 + has_inline_diff, 71 + has_snapshot, 72 + created_at 73 + FROM edit_nodes FINAL 74 + WHERE resource_did = ? 75 + AND resource_collection = ? 76 + AND resource_rkey = ? 77 + AND deleted_at = toDateTime64(0, 3) 78 + AND (? = 0 OR toUnixTimestamp64Milli(created_at) < ?) 79 + AND (? = '' OR rkey > ?) 80 + ORDER BY created_at DESC 81 + LIMIT ? 82 + "#; 83 + 84 + let cursor_val = cursor.unwrap_or(0); 85 + let after_rkey_val = after_rkey.unwrap_or(""); 86 + 87 + let rows = self 88 + .inner() 89 + .query(query) 90 + .bind(resource_did) 91 + .bind(resource_collection) 92 + .bind(resource_rkey) 93 + .bind(cursor_val) 94 + .bind(cursor_val) 95 + .bind(after_rkey_val) 96 + .bind(after_rkey_val) 97 + .bind(limit) 98 + .fetch_all::<EditNodeRow>() 99 + .await 100 + .map_err(|e| ClickHouseError::Query { 101 + message: "failed to get edit history".into(), 102 + source: e, 103 + })?; 104 + 105 + Ok(rows) 106 + } 107 + }
+3 -9
crates/weaver-index/src/endpoints/actor.rs
··· 163 163 } 164 164 } 165 165 166 - /// Convert SmolStr to Option<CowStr> if non-empty 167 - fn non_empty_str(s: &smol_str::SmolStr) -> Option<jacquard::CowStr<'static>> { 168 - if s.is_empty() { 169 - None 170 - } else { 171 - Some(s.to_cowstr().into_static()) 172 - } 173 - } 166 + // Re-export from parent for local use 167 + use super::non_empty_str; 174 168 175 169 /// Parse cursor string to i64 timestamp millis 176 170 fn parse_cursor(cursor: Option<&str>) -> Result<Option<i64>, XrpcErrorResponse> { ··· 426 420 } 427 421 428 422 /// Convert ProfileRow to ProfileDataView 429 - fn profile_to_data_view( 423 + pub fn profile_to_data_view( 430 424 profile: &ProfileRow, 431 425 ) -> Result<ProfileDataView<'static>, XrpcErrorResponse> { 432 426 use jacquard::types::string::Uri;
+327
crates/weaver-index/src/endpoints/collab.rs
··· 1 + //! Collaboration endpoint handlers 2 + 3 + use std::collections::HashMap; 4 + 5 + use axum::{Json, extract::State}; 6 + use jacquard::IntoStatic; 7 + use jacquard::cowstr::ToCowStr; 8 + use jacquard::types::datetime::Datetime; 9 + use jacquard::types::string::{AtUri, Cid, Did, Handle}; 10 + use jacquard_axum::ExtractXrpc; 11 + use jacquard_axum::service_auth::ExtractOptionalServiceAuth; 12 + 13 + use weaver_api::com_atproto::repo::strong_ref::StrongRef; 14 + use weaver_api::sh_weaver::actor::ProfileViewBasic; 15 + use weaver_api::sh_weaver::collab::get_collaboration_state::{ 16 + GetCollaborationStateOutput, GetCollaborationStateRequest, 17 + }; 18 + use weaver_api::sh_weaver::collab::get_resource_participants::{ 19 + GetResourceParticipantsOutput, GetResourceParticipantsRequest, 20 + }; 21 + use weaver_api::sh_weaver::collab::{CollaborationStateView, ParticipantStateView}; 22 + 23 + use crate::clickhouse::{CollaboratorRow, ProfileRow}; 24 + use crate::endpoints::actor::Viewer; 25 + use crate::endpoints::non_empty_str; 26 + use crate::endpoints::repo::XrpcErrorResponse; 27 + use crate::server::AppState; 28 + 29 + /// Handle sh.weaver.collab.getResourceParticipants 30 + /// 31 + /// Returns owner and collaborators who can edit the resource. 32 + pub async fn get_resource_participants( 33 + State(state): State<AppState>, 34 + ExtractOptionalServiceAuth(viewer): ExtractOptionalServiceAuth, 35 + ExtractXrpc(args): ExtractXrpc<GetResourceParticipantsRequest>, 36 + ) -> Result<Json<GetResourceParticipantsOutput<'static>>, XrpcErrorResponse> { 37 + let _viewer: Viewer = viewer; 38 + let viewer_did: Option<&str> = _viewer.as_ref().map(|v| v.did().as_str()); 39 + 40 + let resource_uri = args.resource.as_str(); 41 + 42 + // Get all permissions for the resource 43 + let permissions = state 44 + .clickhouse 45 + .get_resource_permissions(resource_uri) 46 + .await 47 + .map_err(|e| { 48 + tracing::error!("Failed to get resource permissions: {}", e); 49 + XrpcErrorResponse::internal_error("Database query failed") 50 + })?; 51 + 52 + if permissions.is_empty() { 53 + return Err(XrpcErrorResponse::not_found("Resource not found")); 54 + } 55 + 56 + // Collect all DIDs for profile hydration 57 + let all_dids: Vec<&str> = permissions.iter().map(|p| p.grantee_did.as_str()).collect(); 58 + 59 + // Batch fetch profiles 60 + let profiles = state 61 + .clickhouse 62 + .get_profiles_batch(&all_dids) 63 + .await 64 + .map_err(|e| { 65 + tracing::error!("Failed to batch fetch profiles: {}", e); 66 + XrpcErrorResponse::internal_error("Database query failed") 67 + })?; 68 + 69 + let profile_map: HashMap<&str, &ProfileRow> = 70 + profiles.iter().map(|p| (p.did.as_str(), p)).collect(); 71 + 72 + // Find owner and build participants 73 + let mut owner: Option<ProfileViewBasic<'static>> = None; 74 + let mut participants: Vec<ProfileViewBasic<'static>> = Vec::new(); 75 + 76 + for perm in &permissions { 77 + let profile_view = if let Some(profile) = profile_map.get(perm.grantee_did.as_str()) { 78 + profile_to_view_basic(profile)? 79 + } else { 80 + // No profile found - skip (shouldn't happen if permissions table is consistent) 81 + continue; 82 + }; 83 + 84 + if perm.scope == "owner" { 85 + owner = Some(profile_view); 86 + } else { 87 + participants.push(profile_view); 88 + } 89 + } 90 + 91 + let owner = owner.ok_or_else(|| { 92 + tracing::error!("Resource has no owner in permissions"); 93 + XrpcErrorResponse::internal_error("Resource has no owner") 94 + })?; 95 + 96 + // Check if viewer can edit 97 + let viewer_can_edit = viewer_did.map(|v| all_dids.contains(&v)); 98 + 99 + Ok(Json( 100 + GetResourceParticipantsOutput { 101 + owner, 102 + participants, 103 + viewer_can_edit, 104 + extra_data: None, 105 + } 106 + .into_static(), 107 + )) 108 + } 109 + 110 + /// Convert ProfileRow to ProfileViewBasic directly 111 + pub fn profile_to_view_basic( 112 + profile: &ProfileRow, 113 + ) -> Result<ProfileViewBasic<'static>, XrpcErrorResponse> { 114 + let did = Did::new_owned(profile.did.clone()) 115 + .map_err(|_| XrpcErrorResponse::internal_error("Invalid DID in profile"))?; 116 + 117 + let handle = Handle::new_owned(profile.handle.clone()) 118 + .map_err(|_| XrpcErrorResponse::internal_error("Invalid handle in profile"))?; 119 + 120 + Ok(ProfileViewBasic::new() 121 + .did(did) 122 + .handle(handle) 123 + .maybe_display_name(non_empty_str(&profile.display_name)) 124 + .build()) 125 + } 126 + 127 + /// Handle sh.weaver.collab.getCollaborationState 128 + /// 129 + /// Returns full collaboration state for a resource. 130 + pub async fn get_collaboration_state( 131 + State(state): State<AppState>, 132 + ExtractOptionalServiceAuth(viewer): ExtractOptionalServiceAuth, 133 + ExtractXrpc(args): ExtractXrpc<GetCollaborationStateRequest>, 134 + ) -> Result<Json<GetCollaborationStateOutput<'static>>, XrpcErrorResponse> { 135 + let _viewer: Viewer = viewer; 136 + 137 + let resource_uri = args.resource.as_str(); 138 + 139 + // Get permissions for the resource 140 + let permissions = state 141 + .clickhouse 142 + .get_resource_permissions(resource_uri) 143 + .await 144 + .map_err(|e| { 145 + tracing::error!("Failed to get resource permissions: {}", e); 146 + XrpcErrorResponse::internal_error("Database query failed") 147 + })?; 148 + 149 + if permissions.is_empty() { 150 + return Err(XrpcErrorResponse::not_found("Resource not found")); 151 + } 152 + 153 + // Get collaborators (invite+accept pairs) for additional data 154 + let collaborators = state 155 + .clickhouse 156 + .get_collaborators(resource_uri) 157 + .await 158 + .map_err(|e| { 159 + tracing::error!("Failed to get collaborators: {}", e); 160 + XrpcErrorResponse::internal_error("Database query failed") 161 + })?; 162 + 163 + // Check for divergence 164 + let has_divergence = state 165 + .clickhouse 166 + .has_divergence(resource_uri) 167 + .await 168 + .map_err(|e| { 169 + tracing::error!("Failed to check divergence: {}", e); 170 + XrpcErrorResponse::internal_error("Database query failed") 171 + })?; 172 + 173 + // Collect all DIDs for profile hydration 174 + let all_dids: Vec<&str> = permissions.iter().map(|p| p.grantee_did.as_str()).collect(); 175 + 176 + // Batch fetch profiles 177 + let profiles = state 178 + .clickhouse 179 + .get_profiles_batch(&all_dids) 180 + .await 181 + .map_err(|e| { 182 + tracing::error!("Failed to batch fetch profiles: {}", e); 183 + XrpcErrorResponse::internal_error("Database query failed") 184 + })?; 185 + 186 + let profile_map: HashMap<&str, &ProfileRow> = 187 + profiles.iter().map(|p| (p.did.as_str(), p)).collect(); 188 + 189 + // Build collaborator lookup for invite/accept URIs 190 + let collab_map: HashMap<&str, &CollaboratorRow> = collaborators 191 + .iter() 192 + .map(|c| (c.collaborator_did.as_str(), c)) 193 + .collect(); 194 + 195 + // Find owner and get resource CID 196 + let owner_perm = permissions 197 + .iter() 198 + .find(|p| p.scope == "owner") 199 + .ok_or_else(|| { 200 + tracing::error!("Resource has no owner in permissions"); 201 + XrpcErrorResponse::internal_error("Resource has no owner") 202 + })?; 203 + 204 + // Build resource StrongRef - look up the CID from the appropriate table 205 + let resource_uri_parsed = AtUri::new(resource_uri) 206 + .map_err(|_| XrpcErrorResponse::internal_error("Invalid resource URI"))? 207 + .into_static(); 208 + 209 + // Look up the resource CID from raw_records 210 + let resource_cid = state 211 + .clickhouse 212 + .get_record_cid( 213 + &owner_perm.resource_did, 214 + &owner_perm.resource_collection, 215 + &owner_perm.resource_rkey, 216 + ) 217 + .await 218 + .map_err(|e| { 219 + tracing::error!("Failed to get resource CID: {}", e); 220 + XrpcErrorResponse::internal_error("Database query failed") 221 + })? 222 + .ok_or_else(|| XrpcErrorResponse::not_found("Resource not found in database"))?; 223 + 224 + let resource = StrongRef::new() 225 + .uri(resource_uri_parsed.clone()) 226 + .cid( 227 + Cid::new(resource_cid.as_bytes()) 228 + .map_err(|_| XrpcErrorResponse::internal_error("Invalid resource CID"))? 229 + .into_static(), 230 + ) 231 + .build(); 232 + 233 + // Build participants 234 + let mut participants: Vec<ParticipantStateView<'static>> = Vec::new(); 235 + let mut first_collab_at: Option<chrono::DateTime<chrono::Utc>> = None; 236 + 237 + for perm in &permissions { 238 + let profile = profile_map 239 + .get(perm.grantee_did.as_str()) 240 + .ok_or_else(|| XrpcErrorResponse::internal_error("Missing profile for participant"))?; 241 + let collab = collab_map.get(perm.grantee_did.as_str()); 242 + 243 + // Track first collaborator time 244 + if perm.scope != "owner" { 245 + if let Some(c) = collab { 246 + match first_collab_at { 247 + None => first_collab_at = Some(c.accepted_at), 248 + Some(t) if c.accepted_at < t => first_collab_at = Some(c.accepted_at), 249 + _ => {} 250 + } 251 + } 252 + } 253 + 254 + let participant = build_participant_state(profile, collab, &perm.scope)?; 255 + participants.push(participant); 256 + } 257 + 258 + // Determine status 259 + let status = if collaborators.is_empty() { 260 + "solo" 261 + } else if has_divergence { 262 + "diverged" 263 + } else { 264 + "synced" 265 + }; 266 + 267 + let collab_state = CollaborationStateView::new() 268 + .resource(resource) 269 + .status(status) 270 + .participants(participants) 271 + .maybe_canonical_uri(Some(resource_uri_parsed)) 272 + .maybe_has_divergence(Some(has_divergence)) 273 + .maybe_first_collaborator_added_at( 274 + first_collab_at.map(|dt| Datetime::new(dt.fixed_offset())), 275 + ) 276 + .build(); 277 + 278 + Ok(Json( 279 + GetCollaborationStateOutput { 280 + value: collab_state, 281 + extra_data: None, 282 + } 283 + .into_static(), 284 + )) 285 + } 286 + 287 + /// Build ParticipantStateView from available data 288 + fn build_participant_state( 289 + profile: &ProfileRow, 290 + collab: Option<&&CollaboratorRow>, 291 + scope: &str, 292 + ) -> Result<ParticipantStateView<'static>, XrpcErrorResponse> { 293 + let user = profile_to_view_basic(profile)?; 294 + 295 + let role = match scope { 296 + "owner" => "owner", 297 + "collaborator" => "collaborator", 298 + _ => "unknown", 299 + }; 300 + 301 + let status = if collab.is_some() { 302 + "active" 303 + } else { 304 + "pending" 305 + }; 306 + 307 + // Parse URIs if we have collab data 308 + let (invite_uri, accept_uri) = if let Some(c) = collab { 309 + let inv = AtUri::new(c.invite_uri.as_str()) 310 + .map_err(|_| XrpcErrorResponse::internal_error("Invalid invite URI"))? 311 + .into_static(); 312 + let acc = AtUri::new(c.accept_uri.as_str()) 313 + .map_err(|_| XrpcErrorResponse::internal_error("Invalid accept URI"))? 314 + .into_static(); 315 + (Some(inv), Some(acc)) 316 + } else { 317 + (None, None) 318 + }; 319 + 320 + Ok(ParticipantStateView::new() 321 + .role(role) 322 + .status(status) 323 + .user(user) 324 + .maybe_invite_uri(invite_uri) 325 + .maybe_accept_uri(accept_uri) 326 + .build()) 327 + }
+186
crates/weaver-index/src/endpoints/edit.rs
··· 1 + //! Edit endpoint handlers 2 + 3 + use std::collections::HashMap; 4 + 5 + use axum::{Json, extract::State}; 6 + use jacquard::IntoStatic; 7 + use jacquard::cowstr::ToCowStr; 8 + use jacquard::types::datetime::Datetime; 9 + use jacquard::types::string::{AtUri, Cid}; 10 + use jacquard_axum::ExtractXrpc; 11 + use jacquard_axum::service_auth::ExtractOptionalServiceAuth; 12 + 13 + use weaver_api::com_atproto::repo::strong_ref::StrongRef; 14 + use weaver_api::sh_weaver::edit::EditHistoryEntry; 15 + use weaver_api::sh_weaver::edit::get_edit_history::{GetEditHistoryOutput, GetEditHistoryRequest}; 16 + 17 + use crate::clickhouse::{EditNodeRow, ProfileRow}; 18 + use crate::endpoints::actor::Viewer; 19 + use crate::endpoints::collab::profile_to_view_basic; 20 + use crate::endpoints::repo::XrpcErrorResponse; 21 + use crate::server::AppState; 22 + 23 + /// Handle sh.weaver.edit.getEditHistory 24 + /// 25 + /// Returns edit history (roots and diffs) for a resource. 26 + pub async fn get_edit_history( 27 + State(state): State<AppState>, 28 + ExtractOptionalServiceAuth(viewer): ExtractOptionalServiceAuth, 29 + ExtractXrpc(args): ExtractXrpc<GetEditHistoryRequest>, 30 + ) -> Result<Json<GetEditHistoryOutput<'static>>, XrpcErrorResponse> { 31 + let _viewer: Viewer = viewer; 32 + 33 + let resource_uri = args.resource.as_str(); 34 + let limit = args.limit.unwrap_or(50).min(100).max(1); 35 + 36 + // Parse cursor as millisecond timestamp 37 + let cursor = args 38 + .cursor 39 + .as_deref() 40 + .map(|c| c.parse::<i64>()) 41 + .transpose() 42 + .map_err(|_| XrpcErrorResponse::invalid_request("Invalid cursor format"))?; 43 + 44 + let after_rkey = args.after_rkey.as_deref(); 45 + 46 + // Fetch edit nodes 47 + let nodes = state 48 + .clickhouse 49 + .get_edit_history(resource_uri, cursor, after_rkey, limit + 1) 50 + .await 51 + .map_err(|e| { 52 + tracing::error!("Failed to get edit history: {}", e); 53 + XrpcErrorResponse::internal_error("Database query failed") 54 + })?; 55 + 56 + // Check if there are more results 57 + let has_more = nodes.len() > limit as usize; 58 + let nodes: Vec<_> = nodes.into_iter().take(limit as usize).collect(); 59 + 60 + // Collect unique author DIDs 61 + let author_dids: Vec<&str> = nodes.iter().map(|n| n.did.as_str()).collect(); 62 + let unique_dids: Vec<&str> = author_dids 63 + .iter() 64 + .copied() 65 + .collect::<std::collections::HashSet<_>>() 66 + .into_iter() 67 + .collect(); 68 + 69 + // Batch fetch profiles 70 + let profiles = state 71 + .clickhouse 72 + .get_profiles_batch(&unique_dids) 73 + .await 74 + .map_err(|e| { 75 + tracing::error!("Failed to batch fetch profiles: {}", e); 76 + XrpcErrorResponse::internal_error("Database query failed") 77 + })?; 78 + 79 + let profile_map: HashMap<&str, &ProfileRow> = 80 + profiles.iter().map(|p| (p.did.as_str(), p)).collect(); 81 + 82 + // Separate roots and diffs, building EditHistoryEntry for each 83 + let mut roots = Vec::new(); 84 + let mut diffs = Vec::new(); 85 + 86 + for node in &nodes { 87 + let entry = node_to_history_entry(node, &profile_map)?; 88 + 89 + if node.node_type == "root" { 90 + roots.push(entry); 91 + } else { 92 + diffs.push(entry); 93 + } 94 + } 95 + 96 + // Build cursor from last node's created_at 97 + let next_cursor = if has_more { 98 + nodes 99 + .last() 100 + .map(|n| n.created_at.timestamp_millis().to_cowstr().into_static()) 101 + } else { 102 + None 103 + }; 104 + 105 + Ok(Json( 106 + GetEditHistoryOutput { 107 + roots, 108 + diffs, 109 + cursor: next_cursor, 110 + extra_data: None, 111 + } 112 + .into_static(), 113 + )) 114 + } 115 + 116 + /// Convert EditNodeRow to EditHistoryEntry 117 + fn node_to_history_entry( 118 + node: &EditNodeRow, 119 + profile_map: &HashMap<&str, &ProfileRow>, 120 + ) -> Result<EditHistoryEntry<'static>, XrpcErrorResponse> { 121 + let author = profile_map 122 + .get(node.did.as_str()) 123 + .map(|p| profile_to_view_basic(p)) 124 + .transpose()? 125 + .ok_or_else(|| XrpcErrorResponse::internal_error("Author profile not found"))?; 126 + 127 + // Build URI 128 + let uri = AtUri::new(&format!( 129 + "at://{}/{}/{}", 130 + node.did, node.collection, node.rkey 131 + )) 132 + .map_err(|_| XrpcErrorResponse::internal_error("Invalid AT URI"))? 133 + .into_static(); 134 + 135 + let cid = Cid::new(node.cid.as_bytes()) 136 + .map_err(|_| XrpcErrorResponse::internal_error("Invalid CID"))? 137 + .into_static(); 138 + 139 + // Build optional StrongRefs for diffs 140 + let root_ref = if !node.root_cid.is_empty() { 141 + let root_uri = AtUri::new(&format!( 142 + "at://{}/sh.weaver.edit.root/{}", 143 + node.root_did, node.root_rkey 144 + )) 145 + .map_err(|_| XrpcErrorResponse::internal_error("Invalid root URI"))? 146 + .into_static(); 147 + 148 + let root_cid = Cid::new(node.root_cid.as_bytes()) 149 + .map_err(|_| XrpcErrorResponse::internal_error("Invalid root CID"))? 150 + .into_static(); 151 + 152 + Some(StrongRef::new().uri(root_uri).cid(root_cid).build()) 153 + } else { 154 + None 155 + }; 156 + 157 + let prev_ref = if !node.prev_cid.is_empty() { 158 + let prev_uri = AtUri::new(&format!( 159 + "at://{}/sh.weaver.edit.diff/{}", 160 + node.prev_did, node.prev_rkey 161 + )) 162 + .map_err(|_| XrpcErrorResponse::internal_error("Invalid prev URI"))? 163 + .into_static(); 164 + 165 + let prev_cid = Cid::new(node.prev_cid.as_bytes()) 166 + .map_err(|_| XrpcErrorResponse::internal_error("Invalid prev CID"))? 167 + .into_static(); 168 + 169 + Some(StrongRef::new().uri(prev_uri).cid(prev_cid).build()) 170 + } else { 171 + None 172 + }; 173 + 174 + let created_at = Datetime::new(node.created_at.fixed_offset()); 175 + 176 + Ok(EditHistoryEntry::new() 177 + .uri(uri) 178 + .cid(cid) 179 + .author(author) 180 + .created_at(created_at) 181 + .r#type(node.node_type.clone()) 182 + .maybe_has_inline_diff(Some(node.has_inline_diff == 1)) 183 + .maybe_prev_ref(prev_ref) 184 + .maybe_root_ref(root_ref) 185 + .build()) 186 + }
+16
crates/weaver-index/src/endpoints/mod.rs
··· 1 1 //! XRPC endpoint handlers for the appview. 2 2 3 + use jacquard::CowStr; 4 + use jacquard::IntoStatic; 5 + use jacquard::cowstr::ToCowStr; 6 + use smol_str::SmolStr; 7 + 3 8 pub mod actor; 9 + pub mod collab; 10 + pub mod edit; 4 11 pub mod notebook; 5 12 pub mod repo; 13 + 14 + /// Convert SmolStr to Option<CowStr> if non-empty 15 + pub fn non_empty_str(s: &SmolStr) -> Option<CowStr<'static>> { 16 + if s.is_empty() { 17 + None 18 + } else { 19 + Some(s.to_cowstr().into_static()) 20 + } 21 + }
+113
crates/weaver-index/src/endpoints/notebook.rs
··· 10 10 use jacquard_axum::ExtractXrpc; 11 11 use jacquard_axum::service_auth::ExtractOptionalServiceAuth; 12 12 use smol_str::SmolStr; 13 + use weaver_api::com_atproto::repo::strong_ref::StrongRef; 13 14 use weaver_api::sh_weaver::actor::{ProfileDataView, ProfileDataViewInner, ProfileView}; 14 15 use weaver_api::sh_weaver::notebook::{ 15 16 AuthorListView, BookEntryRef, BookEntryView, EntryView, FeedEntryView, NotebookView, 16 17 get_book_entry::{GetBookEntryOutput, GetBookEntryRequest}, 17 18 get_entry::{GetEntryOutput, GetEntryRequest}, 18 19 get_entry_feed::{GetEntryFeedOutput, GetEntryFeedRequest}, 20 + get_notebook::{GetNotebookOutput, GetNotebookRequest}, 19 21 get_notebook_feed::{GetNotebookFeedOutput, GetNotebookFeedRequest}, 20 22 resolve_entry::{ResolveEntryOutput, ResolveEntryRequest}, 21 23 resolve_notebook::{ResolveNotebookOutput, ResolveNotebookRequest}, ··· 192 194 notebook, 193 195 entries, 194 196 entry_cursor: next_cursor, 197 + extra_data: None, 198 + } 199 + .into_static(), 200 + )) 201 + } 202 + 203 + /// Handle sh.weaver.notebook.getNotebook 204 + /// 205 + /// Gets a notebook by AT URI, returns notebook view with entry refs. 206 + pub async fn get_notebook( 207 + State(state): State<AppState>, 208 + ExtractOptionalServiceAuth(viewer): ExtractOptionalServiceAuth, 209 + ExtractXrpc(args): ExtractXrpc<GetNotebookRequest>, 210 + ) -> Result<Json<GetNotebookOutput<'static>>, XrpcErrorResponse> { 211 + let _viewer: Viewer = viewer; 212 + 213 + // Parse the AT URI to extract authority and rkey 214 + let uri = &args.notebook; 215 + let authority = uri.authority(); 216 + let rkey = uri 217 + .rkey() 218 + .ok_or_else(|| XrpcErrorResponse::invalid_request("URI must include rkey"))?; 219 + let rkey_str = rkey.as_ref(); 220 + 221 + // Resolve authority to DID (could be handle or DID) 222 + let did = resolve_actor(&state, authority).await?; 223 + let did_str = did.as_str(); 224 + 225 + // Fetch notebook by DID + rkey 226 + let notebook_row = state 227 + .clickhouse 228 + .get_notebook(did_str, rkey_str) 229 + .await 230 + .map_err(|e| { 231 + tracing::error!("Failed to get notebook: {}", e); 232 + XrpcErrorResponse::internal_error("Database query failed") 233 + })? 234 + .ok_or_else(|| XrpcErrorResponse::not_found("Notebook not found"))?; 235 + 236 + // Fetch notebook contributors 237 + let notebook_contributors = state 238 + .clickhouse 239 + .get_notebook_contributors(did_str, rkey_str) 240 + .await 241 + .map_err(|e| { 242 + tracing::error!("Failed to get notebook contributors: {}", e); 243 + XrpcErrorResponse::internal_error("Database query failed") 244 + })?; 245 + 246 + // Collect all author DIDs for batch hydration 247 + let mut all_author_dids: HashSet<&str> = 248 + notebook_contributors.iter().map(|s| s.as_str()).collect(); 249 + for did in &notebook_row.author_dids { 250 + all_author_dids.insert(did.as_str()); 251 + } 252 + 253 + // Batch fetch profiles 254 + let author_dids_vec: Vec<&str> = all_author_dids.into_iter().collect(); 255 + let profiles = state 256 + .clickhouse 257 + .get_profiles_batch(&author_dids_vec) 258 + .await 259 + .map_err(|e| { 260 + tracing::error!("Failed to batch fetch profiles: {}", e); 261 + XrpcErrorResponse::internal_error("Database query failed") 262 + })?; 263 + 264 + let profile_map: HashMap<&str, &ProfileRow> = 265 + profiles.iter().map(|p| (p.did.as_str(), p)).collect(); 266 + 267 + // Build NotebookView 268 + let notebook_uri = AtUri::new(&notebook_row.uri).map_err(|e| { 269 + tracing::error!("Invalid notebook URI in db: {}", e); 270 + XrpcErrorResponse::internal_error("Invalid URI stored") 271 + })?; 272 + 273 + let notebook_cid = Cid::new(notebook_row.cid.as_bytes()).map_err(|e| { 274 + tracing::error!("Invalid notebook CID in db: {}", e); 275 + XrpcErrorResponse::internal_error("Invalid CID stored") 276 + })?; 277 + 278 + let authors = hydrate_authors(&notebook_contributors, &profile_map)?; 279 + let record = parse_record_json(&notebook_row.record)?; 280 + 281 + let notebook = NotebookView::new() 282 + .uri(notebook_uri.into_static()) 283 + .cid(notebook_cid.into_static()) 284 + .authors(authors) 285 + .record(record.clone()) 286 + .indexed_at(notebook_row.indexed_at.fixed_offset()) 287 + .maybe_title(non_empty_cowstr(&notebook_row.title)) 288 + .maybe_path(non_empty_cowstr(&notebook_row.path)) 289 + .build(); 290 + 291 + // Deserialize Book from record to get entry_list 292 + let book: weaver_api::sh_weaver::notebook::book::Book = 293 + jacquard::from_data(&record).map_err(|e| { 294 + tracing::error!("Failed to deserialize Book record: {}", e); 295 + XrpcErrorResponse::internal_error("Invalid Book record") 296 + })?; 297 + 298 + let entries: Vec<StrongRef<'static>> = book 299 + .entry_list 300 + .into_iter() 301 + .map(|r| r.into_static()) 302 + .collect(); 303 + 304 + Ok(Json( 305 + GetNotebookOutput { 306 + notebook, 307 + entries, 195 308 extra_data: None, 196 309 } 197 310 .into_static(),
+17 -3
crates/weaver-index/src/server.rs
··· 19 19 get_actor_entries::GetActorEntriesRequest, get_actor_notebooks::GetActorNotebooksRequest, 20 20 get_profile::GetProfileRequest, 21 21 }; 22 + use weaver_api::sh_weaver::collab::get_collaboration_state::GetCollaborationStateRequest; 23 + use weaver_api::sh_weaver::collab::get_resource_participants::GetResourceParticipantsRequest; 24 + use weaver_api::sh_weaver::edit::get_edit_history::GetEditHistoryRequest; 22 25 use weaver_api::sh_weaver::notebook::{ 23 26 get_book_entry::GetBookEntryRequest, get_entry::GetEntryRequest, 24 - get_entry_feed::GetEntryFeedRequest, get_notebook_feed::GetNotebookFeedRequest, 25 - resolve_entry::ResolveEntryRequest, resolve_notebook::ResolveNotebookRequest, 27 + get_entry_feed::GetEntryFeedRequest, get_notebook::GetNotebookRequest, 28 + get_notebook_feed::GetNotebookFeedRequest, resolve_entry::ResolveEntryRequest, 29 + resolve_notebook::ResolveNotebookRequest, 26 30 }; 27 31 28 32 use crate::clickhouse::Client; 29 33 use crate::config::ShardConfig; 30 - use crate::endpoints::{actor, notebook, repo}; 34 + use crate::endpoints::{actor, collab, edit, notebook, repo}; 31 35 use crate::error::{IndexError, ServerError}; 32 36 use crate::sqlite::ShardRouter; 33 37 ··· 94 98 .merge(ResolveNotebookRequest::into_router( 95 99 notebook::resolve_notebook, 96 100 )) 101 + .merge(GetNotebookRequest::into_router(notebook::get_notebook)) 97 102 .merge(GetEntryRequest::into_router(notebook::get_entry)) 98 103 .merge(ResolveEntryRequest::into_router(notebook::resolve_entry)) 99 104 .merge(GetNotebookFeedRequest::into_router( ··· 101 106 )) 102 107 .merge(GetEntryFeedRequest::into_router(notebook::get_entry_feed)) 103 108 .merge(GetBookEntryRequest::into_router(notebook::get_book_entry)) 109 + // sh.weaver.collab.* endpoints 110 + .merge(GetResourceParticipantsRequest::into_router( 111 + collab::get_resource_participants, 112 + )) 113 + .merge(GetCollaborationStateRequest::into_router( 114 + collab::get_collaboration_state, 115 + )) 116 + // sh.weaver.edit.* endpoints 117 + .merge(GetEditHistoryRequest::into_router(edit::get_edit_history)) 104 118 .layer(TraceLayer::new_for_http()) 105 119 .with_state(state) 106 120 .merge(did_web_router(did_doc))