at main 2653 lines 105 kB view raw
1use jacquard::types::ident::AtIdentifier; 2// Re-export view types for use elsewhere 3pub use weaver_api::sh_weaver::notebook::{ 4 AuthorListView, BookEntryRef, BookEntryView, EntryView, NotebookView, PermissionGrant, 5 PermissionsState, 6}; 7 8// Re-export jacquard for convenience 9use crate::constellation::{GetBacklinksQuery, RecordId}; 10use crate::error::WeaverError; 11#[allow(unused_imports)] 12use crate::{PublishResult, W_TICKER, normalize_title_path}; 13pub use jacquard; 14use jacquard::bytes::Bytes; 15#[allow(unused_imports)] 16use jacquard::client::{AgentError, AgentErrorKind, AgentSession, AgentSessionExt}; 17use jacquard::error::ClientError; 18use jacquard::prelude::*; 19use jacquard::smol_str::SmolStr; 20use jacquard::types::blob::{BlobRef, MimeType}; 21use jacquard::types::string::{AtUri, Datetime, Did, RecordKey, Rkey}; 22#[allow(unused_imports)] 23use jacquard::types::tid::Tid; 24use jacquard::types::uri::Uri; 25use jacquard::url::Url; 26use jacquard::{CowStr, IntoStatic}; 27use mime_sniffer::MimeTypeSniffer; 28#[allow(unused_imports)] 29use std::path::Path; 30use weaver_api::com_atproto::repo::strong_ref::StrongRef; 31use weaver_api::sh_weaver::notebook::entry; 32use weaver_api::sh_weaver::publish::blob::Blob as PublishedBlob; 33 34const CONSTELLATION_URL: &str = "https://constellation.microcosm.blue"; 35 36/// Strip trailing punctuation that URL parsers commonly eat 37/// (period, comma, semicolon, colon, exclamation, question mark) 38fn strip_trailing_punctuation(s: &str) -> &str { 39 s.trim_end_matches(['.', ',', ';', ':', '!', '?']) 40} 41 42/// Check if a search term matches a value, with fallback to stripped punctuation 43pub fn title_matches(value: &str, search: &str) -> bool { 44 // Exact match first 45 if value == search { 46 return true; 47 } 48 // Try with trailing punctuation stripped from search term 49 let stripped_search = strip_trailing_punctuation(search); 50 if stripped_search != search && value == stripped_search { 51 return true; 52 } 53 // Try with trailing punctuation stripped from value (for titles ending in punctuation) 54 let stripped_value = strip_trailing_punctuation(value); 55 if stripped_value != value && stripped_value == search { 56 return true; 57 } 58 false 59} 60 61/// Extension trait providing weaver-specific multi-step operations on Agent 62/// 63/// This trait extends jacquard's Agent with notebook-specific workflows that 64/// involve multiple atproto operations (uploading blobs, creating records, etc.) 65/// 66/// For single-step operations, use jacquard's built-in methods directly: 67/// - `agent.create_record()` - Create a single record 68/// - `agent.get_record()` - Get a single record 69/// - `agent.upload_blob()` - Upload a single blob 70/// 71/// This trait is for multi-step workflows that coordinate between multiple operations. 72pub trait WeaverExt: AgentSessionExt + XrpcExt + Send + Sync + Sized { 73 /// Publish a blob to the user's PDS 74 /// 75 /// Multi-step workflow: 76 /// 1. Upload blob to PDS 77 /// 2. Create blob record with CID 78 /// 79 /// Returns the AT-URI of the published blob 80 fn publish_blob<'a>( 81 &'a self, 82 blob: Bytes, 83 url_path: &'a str, 84 rkey: Option<RecordKey<Rkey<'a>>>, 85 ) -> impl Future<Output = Result<(StrongRef<'a>, PublishedBlob<'a>), WeaverError>> + 'a { 86 async move { 87 let mime_type = 88 MimeType::new_owned(blob.sniff_mime_type().unwrap_or("application/octet-stream")); 89 90 let blob = self.upload_blob(blob, mime_type.into_static()).await?; 91 let publish_record = PublishedBlob::new() 92 .path(url_path) 93 .upload(BlobRef::Blob(blob)) 94 .build(); 95 let record_key = match rkey { 96 Some(key) => key, 97 None => { 98 let tid = W_TICKER.lock().await.next(None); 99 RecordKey(Rkey::new_owned(tid.as_str())?) 100 } 101 }; 102 let record = self 103 .create_record(publish_record.clone(), Some(record_key)) 104 .await?; 105 let strong_ref = StrongRef::new().uri(record.uri).cid(record.cid).build(); 106 107 Ok((strong_ref, publish_record)) 108 } 109 } 110 111 fn confirm_record_ref<'a>( 112 &'a self, 113 uri: &'a AtUri<'a>, 114 ) -> impl Future<Output = Result<StrongRef<'static>, WeaverError>> + 'a { 115 async move { 116 let record = self.fetch_record_slingshot(uri).await?; 117 let cid = record.cid.ok_or_else(|| { 118 AgentError::from(ClientError::invalid_request("Record missing CID")) 119 })?; 120 Ok(StrongRef::new() 121 .uri(record.uri.into_static()) 122 .cid(cid.into_static()) 123 .build()) 124 } 125 } 126 127 /// Fetch a notebook by URI and return its entry list 128 /// 129 /// Returns Ok(Some((uri, entry_list))) if the notebook exists and can be parsed, 130 /// Ok(None) if the notebook doesn't exist, 131 /// Err if there's a network or parsing error. 132 fn get_notebook_by_uri( 133 &self, 134 uri: &str, 135 ) -> impl Future<Output = Result<Option<(AtUri<'static>, Vec<StrongRef<'static>>)>, WeaverError>> 136 where 137 Self: Sized, 138 { 139 async move { 140 use weaver_api::sh_weaver::notebook::book::Book; 141 142 let at_uri = AtUri::new(uri).map_err(|e| { 143 WeaverError::InvalidNotebook(format!("Invalid notebook URI: {}", e)) 144 })?; 145 146 let response = match self.get_record::<Book>(&at_uri).await { 147 Ok(r) => r, 148 Err(_) => return Ok(None), // Notebook doesn't exist 149 }; 150 151 let output = match response.into_output() { 152 Ok(o) => o, 153 Err(_) => return Ok(None), // Failed to parse 154 }; 155 156 let entries = output 157 .value 158 .entry_list 159 .iter() 160 .cloned() 161 .map(IntoStatic::into_static) 162 .collect(); 163 164 Ok(Some((at_uri.into_static(), entries))) 165 } 166 } 167 168 /// Find or create a notebook by title, returning its URI and entry list 169 /// 170 /// If the notebook doesn't exist, creates it with the given DID as author. 171 fn upsert_notebook( 172 &self, 173 title: &str, 174 author_did: &Did<'_>, 175 ) -> impl Future<Output = Result<(AtUri<'static>, Vec<StrongRef<'static>>), WeaverError>> 176 where 177 Self: Sized, 178 { 179 async move { 180 use jacquard::types::collection::Collection; 181 use jacquard::types::nsid::Nsid; 182 use weaver_api::com_atproto::repo::list_records::ListRecords; 183 use weaver_api::sh_weaver::notebook::book::Book; 184 185 // Find the PDS for this DID 186 let pds_url = self.pds_for_did(author_did).await.map_err(|e| { 187 AgentError::from(ClientError::from(e).with_context("Failed to resolve PDS for DID")) 188 })?; 189 190 // Search for existing notebook with this title (paginated) 191 let mut cursor: Option<CowStr<'static>> = None; 192 loop { 193 let resp = self 194 .xrpc(pds_url.clone()) 195 .send( 196 &ListRecords::new() 197 .repo(author_did.clone()) 198 .collection(Nsid::raw(Book::NSID)) 199 .limit(100) 200 .maybe_cursor(cursor.clone()) 201 .build(), 202 ) 203 .await 204 .map_err(|e| AgentError::from(ClientError::from(e)))?; 205 206 let list = match resp.parse() { 207 Ok(l) => l, 208 Err(_) => break, // Parse error, stop searching 209 }; 210 211 for record in list.records { 212 let notebook: Book = jacquard::from_data(&record.value).map_err(|_| { 213 AgentError::from(ClientError::invalid_request( 214 "Failed to parse notebook record", 215 )) 216 })?; 217 if let Some(book_title) = notebook.title 218 && book_title == title 219 { 220 let entries = notebook 221 .entry_list 222 .iter() 223 .cloned() 224 .map(IntoStatic::into_static) 225 .collect(); 226 return Ok((record.uri.into_static(), entries)); 227 } 228 } 229 230 match list.cursor { 231 Some(c) => cursor = Some(c.into_static()), 232 None => break, // No more pages 233 } 234 } 235 236 // Notebook doesn't exist, create it 237 use weaver_api::sh_weaver::actor::Author; 238 let path = normalize_title_path(title); 239 let author = Author::new().did(author_did.clone()).build(); 240 let book = Book::new() 241 .authors(vec![author]) 242 .entry_list(vec![]) 243 .maybe_title(Some(title.into())) 244 .maybe_path(Some(path.into())) 245 .maybe_created_at(Some(jacquard::types::string::Datetime::now())) 246 .build(); 247 248 let response = self.create_record(book, None).await?; 249 Ok((response.uri, Vec::new())) 250 } 251 } 252 253 /// Find or create an entry within a notebook (with pre-fetched notebook data) 254 /// 255 /// This variant accepts notebook URI and entry_refs directly to avoid redundant 256 /// notebook lookups when the caller has already fetched this data. 257 /// 258 /// Returns (entry_ref, notebook_uri, was_created) 259 fn upsert_entry_with_notebook( 260 &self, 261 notebook_uri: AtUri<'static>, 262 entry_refs: Vec<StrongRef<'static>>, 263 entry_title: &str, 264 entry: entry::Entry<'_>, 265 existing_rkey: Option<&str>, 266 ) -> impl Future<Output = Result<(StrongRef<'static>, AtUri<'static>, bool), WeaverError>> 267 where 268 Self: Sized, 269 { 270 async move { 271 // If we have an existing rkey, try to find and update that specific entry 272 if let Some(rkey) = existing_rkey { 273 // Check if this entry exists in the notebook by comparing rkeys 274 for entry_ref in &entry_refs { 275 let ref_rkey = entry_ref.uri.rkey().map(|r| r.0.as_str()); 276 if ref_rkey == Some(rkey) { 277 // Found it - update 278 let output = self 279 .update_record::<entry::Entry>(&entry_ref.uri, |e| { 280 e.content = entry.content.clone(); 281 e.title = entry.title.clone(); 282 e.path = entry.path.clone(); 283 e.embeds = entry.embeds.clone(); 284 e.tags = entry.tags.clone(); 285 }) 286 .await?; 287 let updated_ref = StrongRef::new() 288 .uri(output.uri.into_static()) 289 .cid(output.cid.into_static()) 290 .build(); 291 return Ok((updated_ref, notebook_uri, false)); 292 } 293 } 294 295 // Entry with this rkey not in notebook - create with specific rkey 296 let response = self 297 .create_record(entry, Some(RecordKey::any(rkey)?)) 298 .await?; 299 let new_ref = StrongRef::new() 300 .uri(response.uri.clone().into_static()) 301 .cid(response.cid.clone().into_static()) 302 .build(); 303 304 use weaver_api::sh_weaver::notebook::book::Book; 305 let notebook_entry_ref = StrongRef::new() 306 .uri(response.uri.into_static()) 307 .cid(response.cid.into_static()) 308 .build(); 309 310 self.update_record::<Book>(&notebook_uri, |book| { 311 book.entry_list.push(notebook_entry_ref); 312 }) 313 .await?; 314 315 return Ok((new_ref, notebook_uri, true)); 316 } 317 318 // No existing rkey - use title-based matching 319 320 // Fast path: if notebook is empty, skip search and create directly 321 if entry_refs.is_empty() { 322 let response = self.create_record(entry, None).await?; 323 let new_ref = StrongRef::new() 324 .uri(response.uri.clone().into_static()) 325 .cid(response.cid.clone().into_static()) 326 .build(); 327 328 use weaver_api::sh_weaver::notebook::book::Book; 329 let notebook_entry_ref = StrongRef::new() 330 .uri(response.uri.into_static()) 331 .cid(response.cid.into_static()) 332 .build(); 333 334 self.update_record::<Book>(&notebook_uri, |book| { 335 book.entry_list.push(notebook_entry_ref); 336 }) 337 .await?; 338 339 return Ok((new_ref, notebook_uri, true)); 340 } 341 342 // Check if entry with this title exists in the notebook 343 // O(n) network calls - unavoidable without title indexing 344 for entry_ref in &entry_refs { 345 let existing = self 346 .get_record::<entry::Entry>(&entry_ref.uri) 347 .await 348 .map_err(|e| AgentError::from(ClientError::from(e)))?; 349 if let Ok(existing_entry) = existing.parse() { 350 if existing_entry.value.title == entry_title { 351 // Update existing entry 352 let output = self 353 .update_record::<entry::Entry>(&entry_ref.uri, |e| { 354 e.content = entry.content.clone(); 355 e.embeds = entry.embeds.clone(); 356 e.tags = entry.tags.clone(); 357 e.updated_at = Some(Datetime::now()); 358 }) 359 .await?; 360 let updated_ref = StrongRef::new() 361 .uri(output.uri.into_static()) 362 .cid(output.cid.into_static()) 363 .build(); 364 return Ok((updated_ref, notebook_uri, false)); 365 } 366 } 367 } 368 369 // Entry doesn't exist, create it 370 let response = self.create_record(entry, None).await?; 371 let new_ref = StrongRef::new() 372 .uri(response.uri.clone().into_static()) 373 .cid(response.cid.clone().into_static()) 374 .build(); 375 376 // Add to notebook's entry_list 377 use weaver_api::sh_weaver::notebook::book::Book; 378 let notebook_entry_ref = StrongRef::new() 379 .uri(response.uri.into_static()) 380 .cid(response.cid.into_static()) 381 .build(); 382 383 self.update_record::<Book>(&notebook_uri, |book| { 384 book.entry_list.push(notebook_entry_ref); 385 }) 386 .await?; 387 388 Ok((new_ref, notebook_uri, true)) 389 } 390 } 391 392 /// Find or create an entry within a notebook 393 /// 394 /// Multi-step workflow: 395 /// 1. Find the notebook by title 396 /// 2. If existing_rkey is provided, match by rkey; otherwise match by title 397 /// 3. If found: update the entry with new content 398 /// 4. If not found: create new entry and append to notebook's entry_list 399 /// 400 /// The `existing_rkey` parameter allows updating an entry even if its title changed, 401 /// and enables pre-generating rkeys for path rewriting before publish. 402 /// 403 /// Returns (entry_ref, notebook_uri, was_created) 404 fn upsert_entry( 405 &self, 406 notebook_title: &str, 407 entry_title: &str, 408 entry: entry::Entry<'_>, 409 existing_rkey: Option<&str>, 410 ) -> impl Future<Output = Result<(StrongRef<'static>, AtUri<'static>, bool), WeaverError>> 411 where 412 Self: Sized, 413 { 414 async move { 415 // Get our own DID 416 let (did, _) = self.session_info().await.ok_or_else(|| { 417 AgentError::from(ClientError::invalid_request("No session info available")) 418 })?; 419 420 // Find or create notebook 421 let (notebook_uri, entry_refs) = self.upsert_notebook(notebook_title, &did).await?; 422 423 // Delegate to the variant with pre-fetched notebook data 424 self.upsert_entry_with_notebook( 425 notebook_uri, 426 entry_refs, 427 entry_title, 428 entry, 429 existing_rkey, 430 ) 431 .await 432 } 433 } 434 435 /// View functions - generic versions that work with any Agent 436 437 /// Fetch a notebook and construct NotebookView with author profiles 438 #[cfg(feature = "use-index")] 439 fn view_notebook( 440 &self, 441 uri: &AtUri<'_>, 442 ) -> impl Future<Output = Result<(NotebookView<'static>, Vec<StrongRef<'static>>), WeaverError>> 443 where 444 Self: Sized, 445 { 446 async move { 447 use weaver_api::sh_weaver::notebook::get_notebook::GetNotebook; 448 449 let resp = self 450 .send(GetNotebook::new().notebook(uri.clone()).build()) 451 .await 452 .map_err(|e| AgentError::from(ClientError::from(e)))?; 453 454 let output = resp.into_output().map_err(|e| { 455 AgentError::from(ClientError::invalid_request(format!( 456 "Failed to get notebook: {}", 457 e 458 ))) 459 })?; 460 461 Ok(( 462 output.notebook.into_static(), 463 output 464 .entries 465 .into_iter() 466 .map(IntoStatic::into_static) 467 .collect(), 468 )) 469 } 470 } 471 472 #[cfg(not(feature = "use-index"))] 473 fn view_notebook( 474 &self, 475 uri: &AtUri<'_>, 476 ) -> impl Future<Output = Result<(NotebookView<'static>, Vec<StrongRef<'static>>), WeaverError>> 477 where 478 Self: Sized, 479 { 480 async move { 481 use jacquard::to_data; 482 use weaver_api::sh_weaver::notebook::AuthorListView; 483 use weaver_api::sh_weaver::notebook::book::Book; 484 485 let notebook = self 486 .get_record::<Book>(uri) 487 .await 488 .map_err(|e| AgentError::from(e))? 489 .into_output() 490 .map_err(|_| { 491 AgentError::from(ClientError::invalid_request("Failed to parse Book record")) 492 })?; 493 494 let title = notebook.value.title.clone(); 495 let tags = notebook.value.tags.clone(); 496 let path = notebook.value.path.clone(); 497 498 let mut authors = Vec::new(); 499 use weaver_api::app_bsky::actor::{ 500 ProfileViewDetailed, get_profile::GetProfile, profile::Profile as BskyProfile, 501 }; 502 use weaver_api::sh_weaver::actor::{ 503 ProfileDataView, ProfileDataViewInner, ProfileView, 504 profile::Profile as WeaverProfile, 505 }; 506 507 for (index, author) in notebook.value.authors.iter().enumerate() { 508 let ident = AtIdentifier::Did(author.did.clone()); 509 let (profile_uri, profile_view) = self.hydrate_profile_view(&ident).await?; 510 authors.push( 511 AuthorListView::new() 512 .maybe_uri(profile_uri) 513 .record(profile_view) 514 .index(index as i64) 515 .build(), 516 ); 517 } 518 let entries = notebook 519 .value 520 .entry_list 521 .iter() 522 .cloned() 523 .map(IntoStatic::into_static) 524 .collect(); 525 526 // Fetch permissions for this notebook 527 let permissions = self.get_permissions_for_resource(uri).await?; 528 529 Ok(( 530 NotebookView::new() 531 .cid(notebook.cid.ok_or_else(|| { 532 AgentError::from(ClientError::invalid_request("Notebook missing CID")) 533 })?) 534 .uri(notebook.uri) 535 .indexed_at(jacquard::types::string::Datetime::now()) 536 .maybe_title(title) 537 .maybe_path(path) 538 .maybe_tags(tags) 539 .authors(authors) 540 .permissions(permissions) 541 .record(to_data(&notebook.value).map_err(|_| { 542 AgentError::from(ClientError::invalid_request( 543 "Failed to serialize notebook", 544 )) 545 })?) 546 .build(), 547 entries, 548 )) 549 } 550 } 551 552 /// Fetch an entry and construct EntryView 553 #[cfg(feature = "use-index")] 554 fn fetch_entry_view<'a>( 555 &self, 556 _notebook: &NotebookView<'a>, 557 entry_ref: &StrongRef<'_>, 558 ) -> impl Future<Output = Result<EntryView<'a>, WeaverError>> 559 where 560 Self: Sized, 561 { 562 async move { 563 use weaver_api::sh_weaver::notebook::get_entry::GetEntry; 564 565 let resp = self 566 .send(GetEntry::new().uri(entry_ref.uri.clone()).build()) 567 .await 568 .map_err(|e| AgentError::from(ClientError::from(e)))?; 569 570 let output = resp.into_output().map_err(|e| { 571 AgentError::from(ClientError::invalid_request(format!( 572 "Failed to get entry: {}", 573 e 574 ))) 575 })?; 576 577 Ok(output.value.into_static()) 578 } 579 } 580 581 /// Fetch an entry and construct EntryView 582 #[cfg(not(feature = "use-index"))] 583 fn fetch_entry_view<'a>( 584 &self, 585 notebook: &NotebookView<'a>, 586 entry_ref: &StrongRef<'_>, 587 ) -> impl Future<Output = Result<EntryView<'a>, WeaverError>> 588 where 589 Self: Sized, 590 { 591 async move { 592 use jacquard::to_data; 593 use weaver_api::sh_weaver::notebook::entry::Entry; 594 595 let entry_uri = Entry::uri(entry_ref.uri.clone()) 596 .map_err(|_| AgentError::from(ClientError::invalid_request("Invalid entry URI")))?; 597 598 // Get the rkey for version lookup 599 let rkey = entry_uri.rkey().ok_or_else(|| { 600 AgentError::from(ClientError::invalid_request("Entry URI missing rkey")) 601 })?; 602 603 // Fetch permissions for this entry (includes inherited notebook permissions) 604 let permissions = self.get_permissions_for_resource(&entry_uri).await?; 605 606 // Get all collaborators (owner + invited) 607 let owner_did = match entry_uri.authority() { 608 jacquard::types::ident::AtIdentifier::Did(d) => d.clone().into_static(), 609 jacquard::types::ident::AtIdentifier::Handle(h) => { 610 let (did, _) = self.pds_for_handle(h).await.map_err(|e| { 611 AgentError::from( 612 ClientError::from(e).with_context("Failed to resolve handle"), 613 ) 614 })?; 615 did.into_static() 616 } 617 }; 618 let collaborators = self 619 .find_collaborators_for_resource(&entry_uri) 620 .await 621 .unwrap_or_default(); 622 let all_dids: Vec<Did<'static>> = std::iter::once(owner_did) 623 .chain(collaborators.into_iter()) 624 .collect(); 625 626 // Find all versions across collaborators, get latest by updatedAt 627 let versions = self 628 .find_all_versions( 629 <Entry as jacquard::types::collection::Collection>::NSID, 630 rkey.0.as_str(), 631 &all_dids, 632 ) 633 .await 634 .unwrap_or_default(); 635 636 // Use latest version if found, otherwise fall back to original entry_ref 637 let (entry_data, final_uri, final_cid) = if let Some(latest) = versions.first() { 638 // Deserialize from the latest version's value 639 let entry: Entry = jacquard::from_data(&latest.value).map_err(|_| { 640 AgentError::from(ClientError::invalid_request( 641 "Failed to deserialize latest entry", 642 )) 643 })?; 644 (entry.into_static(), latest.uri.clone(), latest.cid.clone()) 645 } else { 646 // No versions found via find_all_versions, fetch directly 647 let entry = self.fetch_record(&entry_uri).await?; 648 let cid = entry.cid.ok_or_else(|| { 649 AgentError::from(ClientError::invalid_request("Entry missing CID")) 650 })?; 651 ( 652 entry.value.into_static(), 653 entry.uri.into_static(), 654 cid.into_static(), 655 ) 656 }; 657 658 let title = entry_data.title.clone(); 659 let path = entry_data.path.clone(); 660 let tags = entry_data.tags.clone(); 661 662 // Fetch contributors (evidence-based authors) for this entry 663 let contributor_dids = self.find_contributors_for_resource(&entry_uri).await?; 664 let mut authors = Vec::new(); 665 for (index, did) in contributor_dids.iter().enumerate() { 666 let ident = AtIdentifier::Did(did.clone()); 667 let (profile_uri, profile_view) = self.hydrate_profile_view(&ident).await?; 668 authors.push( 669 AuthorListView::new() 670 .maybe_uri(profile_uri) 671 .record(profile_view) 672 .index(index as i64) 673 .build(), 674 ); 675 } 676 677 Ok(EntryView::new() 678 .cid(final_cid) 679 .uri(final_uri) 680 .indexed_at(jacquard::types::string::Datetime::now()) 681 .record(to_data(&entry_data).map_err(|_| { 682 AgentError::from(ClientError::invalid_request("Failed to serialize entry")) 683 })?) 684 .maybe_tags(tags) 685 .title(title) 686 .path(path) 687 .authors(authors) 688 .permissions(permissions) 689 .build()) 690 } 691 } 692 693 /// Search for an entry by title within a notebook's entry list 694 /// 695 /// O(n) network calls - unavoidable without title indexing. 696 /// Breaks early on match to minimize unnecessary fetches. 697 fn entry_by_title<'a>( 698 &self, 699 notebook: &NotebookView<'a>, 700 entries: &[StrongRef<'_>], 701 title: &str, 702 ) -> impl Future<Output = Result<Option<(BookEntryView<'a>, entry::Entry<'a>)>, WeaverError>> 703 where 704 Self: Sized, 705 { 706 async move { 707 use weaver_api::sh_weaver::notebook::BookEntryRef; 708 use weaver_api::sh_weaver::notebook::entry::Entry; 709 710 for (index, entry_ref) in entries.iter().enumerate() { 711 let resp = self 712 .get_record::<Entry>(&entry_ref.uri) 713 .await 714 .map_err(|e| AgentError::from(e))?; 715 if let Ok(entry) = resp.parse() { 716 let path_matches = title_matches(entry.value.path.as_ref(), title); 717 let title_field_matches = title_matches(entry.value.title.as_ref(), title); 718 if path_matches || title_field_matches { 719 // Build BookEntryView with prev/next 720 let entry_view = self.fetch_entry_view(notebook, entry_ref).await?; 721 722 let prev_entry = if index > 0 { 723 let prev_entry_ref = &entries[index - 1]; 724 self.fetch_entry_view(notebook, prev_entry_ref).await.ok() 725 } else { 726 None 727 } 728 .map(|e| BookEntryRef::new().entry(e).build()); 729 730 let next_entry = if index < entries.len() - 1 { 731 let next_entry_ref = &entries[index + 1]; 732 self.fetch_entry_view(notebook, next_entry_ref).await.ok() 733 } else { 734 None 735 } 736 .map(|e| BookEntryRef::new().entry(e).build()); 737 738 let book_entry_view = BookEntryView::new() 739 .entry(entry_view) 740 .maybe_next(next_entry) 741 .maybe_prev(prev_entry) 742 .index(index as i64) 743 .build(); 744 745 return Ok(Some((book_entry_view, entry.value.into_static()))); 746 } 747 } 748 } 749 Ok(None) 750 } 751 } 752 753 /// Search for a notebook by title for a given DID or handle 754 #[cfg(feature = "use-index")] 755 fn notebook_by_title( 756 &self, 757 ident: &jacquard::types::ident::AtIdentifier<'_>, 758 title: &str, 759 ) -> impl Future< 760 Output = Result<Option<(NotebookView<'static>, Vec<BookEntryView<'static>>)>, WeaverError>, 761 > 762 where 763 Self: Sized, 764 { 765 async move { 766 use weaver_api::sh_weaver::notebook::resolve_notebook::ResolveNotebook; 767 768 let resp = self 769 .send( 770 ResolveNotebook::new() 771 .actor(ident.clone()) 772 .name(title) 773 .build(), 774 ) 775 .await 776 .map_err(|e| AgentError::from(ClientError::from(e)))?; 777 778 match resp.into_output() { 779 Ok(output) => Ok(Some(( 780 output.notebook.into_static(), 781 output.entries.into_static(), 782 ))), 783 Err(_) => Ok(None), 784 } 785 } 786 } 787 788 /// Search for a notebook by title for a given DID or handle 789 #[cfg(not(feature = "use-index"))] 790 fn notebook_by_title( 791 &self, 792 ident: &jacquard::types::ident::AtIdentifier<'_>, 793 title: &str, 794 ) -> impl Future< 795 Output = Result<Option<(NotebookView<'static>, Vec<BookEntryView<'static>>)>, WeaverError>, 796 > 797 where 798 Self: Sized, 799 { 800 async move { 801 use jacquard::types::collection::Collection; 802 use jacquard::types::nsid::Nsid; 803 use weaver_api::com_atproto::repo::list_records::ListRecords; 804 use weaver_api::sh_weaver::notebook::AuthorListView; 805 use weaver_api::sh_weaver::notebook::book::Book; 806 807 let (repo_did, pds_url) = match ident { 808 jacquard::types::ident::AtIdentifier::Did(did) => { 809 let pds = self.pds_for_did(did).await.map_err(|e| { 810 AgentError::from( 811 ClientError::from(e).with_context("Failed to resolve PDS for DID"), 812 ) 813 })?; 814 (did.clone(), pds) 815 } 816 jacquard::types::ident::AtIdentifier::Handle(handle) => { 817 self.pds_for_handle(handle).await.map_err(|e| { 818 AgentError::from( 819 ClientError::from(e).with_context("Failed to resolve handle"), 820 ) 821 })? 822 } 823 }; 824 825 // Search with pagination 826 let mut cursor: Option<CowStr<'static>> = None; 827 loop { 828 let resp = self 829 .xrpc(pds_url.clone()) 830 .send( 831 &ListRecords::new() 832 .repo(repo_did.clone()) 833 .collection(Nsid::raw(Book::NSID)) 834 .limit(100) 835 .maybe_cursor(cursor.clone()) 836 .build(), 837 ) 838 .await 839 .map_err(|e| AgentError::from(ClientError::from(e)))?; 840 841 let list = match resp.parse() { 842 Ok(l) => l, 843 Err(_) => break, 844 }; 845 846 for record in list.records { 847 let notebook: Book = jacquard::from_data(&record.value).map_err(|_| { 848 AgentError::from(ClientError::invalid_request( 849 "Failed to parse notebook record", 850 )) 851 })?; 852 853 // Match on path first, then title (with trailing punctuation tolerance) 854 let matched_title = if let Some(ref path) = notebook.path 855 && title_matches(path.as_ref(), title) 856 { 857 Some(path.clone()) 858 } else if let Some(ref book_title) = notebook.title 859 && title_matches(book_title.as_ref(), title) 860 { 861 Some(book_title.clone()) 862 } else { 863 None 864 }; 865 866 if let Some(matched) = matched_title { 867 let tags = notebook.tags.clone(); 868 let path = notebook.path.clone(); 869 870 let mut authors = Vec::new(); 871 for (index, author) in notebook.authors.iter().enumerate() { 872 let ident = AtIdentifier::Did(author.did.clone()); 873 let (profile_uri, profile_view) = 874 self.hydrate_profile_view(&ident).await?; 875 authors.push( 876 AuthorListView::new() 877 .maybe_uri(profile_uri) 878 .record(profile_view) 879 .index(index as i64) 880 .build(), 881 ); 882 } 883 884 // TODO: Fix this - entries building is broken because we need NotebookView 885 // to call view_entry but we're still building the NotebookView 886 let entries = Vec::new(); // Temporarily empty 887 888 // let mut entries = Vec::with_capacity(notebook.entry_list.len()); 889 // for (index, _) in notebook.entry_list.iter().enumerate() { 890 // let entry_view = self.view_entry(&notebook_view, &notebook.entry_list, index).await?; 891 // entries.push(entry_view); 892 // } 893 894 // Fetch permissions for this notebook 895 let permissions = self.get_permissions_for_resource(&record.uri).await?; 896 897 return Ok(Some(( 898 NotebookView::new() 899 .cid(record.cid) 900 .uri(record.uri) 901 .indexed_at(jacquard::types::string::Datetime::now()) 902 .title(matched) 903 .maybe_path(path) 904 .maybe_tags(tags) 905 .authors(authors) 906 .permissions(permissions) 907 .record(record.value.clone()) 908 .build() 909 .into_static(), 910 entries, 911 ))); 912 } 913 } 914 915 match list.cursor { 916 Some(c) => cursor = Some(c.into_static()), 917 None => break, 918 } 919 } 920 921 Ok(None) 922 } 923 } 924 925 /// Hydrate a profile view from either weaver or bsky profile 926 #[cfg(feature = "use-index")] 927 fn hydrate_profile_view( 928 &self, 929 ident: &AtIdentifier<'_>, 930 ) -> impl Future< 931 Output = Result< 932 ( 933 Option<AtUri<'static>>, 934 weaver_api::sh_weaver::actor::ProfileDataView<'static>, 935 ), 936 WeaverError, 937 >, 938 > { 939 async move { 940 use weaver_api::sh_weaver::actor::get_profile::GetProfile; 941 942 let resp = self 943 .send(GetProfile::new().actor(ident.clone()).build()) 944 .await 945 .map_err(|e| AgentError::from(ClientError::from(e)))?; 946 947 let output = resp.into_output().map_err(|e| { 948 AgentError::from(ClientError::invalid_request(format!( 949 "Failed to get profile: {}", 950 e 951 ))) 952 })?; 953 954 // URI is goofy in this signature, just return None for now 955 Ok((None, output.value.into_static())) 956 } 957 } 958 959 /// Hydrate a profile view from either weaver or bsky profile 960 #[cfg(not(feature = "use-index"))] 961 fn hydrate_profile_view( 962 &self, 963 ident: &AtIdentifier<'_>, 964 ) -> impl Future< 965 Output = Result< 966 ( 967 Option<AtUri<'static>>, 968 weaver_api::sh_weaver::actor::ProfileDataView<'static>, 969 ), 970 WeaverError, 971 >, 972 > { 973 async move { 974 use weaver_api::app_bsky::actor::{ 975 get_profile::GetProfile, profile::Profile as BskyProfile, 976 }; 977 use weaver_api::sh_weaver::actor::{ 978 ProfileDataView, ProfileDataViewInner, ProfileView, 979 profile::Profile as WeaverProfile, 980 }; 981 982 // Resolve identifier to DID and handle 983 let (did, handle) = match ident { 984 AtIdentifier::Did(d) => { 985 let handles = self.resolve_did_doc_owned(d).await?.handles(); 986 let h = handles.first().ok_or_else(|| { 987 AgentError::from(ClientError::invalid_request("couldn't resolve handle")) 988 })?; 989 (d.clone().into_static(), h.clone()) 990 } 991 AtIdentifier::Handle(h) => { 992 let d = self.resolve_handle(h).await?; 993 (d.into_static(), h.clone().into_static()) 994 } 995 }; 996 997 // Try weaver profile first 998 let weaver_uri = 999 WeaverProfile::uri(format!("at://{}/sh.weaver.actor.profile/self", did)).map_err( 1000 |_| { 1001 AgentError::from(ClientError::invalid_request("Invalid weaver profile URI")) 1002 }, 1003 )?; 1004 let weaver_future = async { 1005 if let Ok(weaver_record) = self.fetch_record(&weaver_uri).await { 1006 // Convert blobs to CDN URLs 1007 let avatar = weaver_record 1008 .value 1009 .avatar 1010 .as_ref() 1011 .map(|blob| { 1012 let cid = blob.blob().cid(); 1013 jacquard::types::string::Uri::new_owned(format!( 1014 "https://cdn.bsky.app/img/avatar/plain/{}/{}@jpeg", 1015 did, cid 1016 )) 1017 }) 1018 .transpose() 1019 .map_err(|_| { 1020 AgentError::from(ClientError::invalid_request("Invalid avatar URI")) 1021 })?; 1022 let banner = weaver_record 1023 .value 1024 .banner 1025 .as_ref() 1026 .map(|blob| { 1027 let cid = blob.blob().cid(); 1028 jacquard::types::string::Uri::new_owned(format!( 1029 "https://cdn.bsky.app/img/banner/plain/{}/{}@jpeg", 1030 did, cid 1031 )) 1032 }) 1033 .transpose() 1034 .map_err(|_| { 1035 AgentError::from(ClientError::invalid_request("Invalid banner URI")) 1036 })?; 1037 1038 let profile_view = ProfileView::new() 1039 .did(did.clone()) 1040 .handle(handle.clone()) 1041 .maybe_display_name(weaver_record.value.display_name.clone()) 1042 .maybe_description(weaver_record.value.description.clone()) 1043 .maybe_avatar(avatar) 1044 .maybe_banner(banner) 1045 .maybe_bluesky(weaver_record.value.bluesky) 1046 .maybe_tangled(weaver_record.value.tangled) 1047 .maybe_streamplace(weaver_record.value.streamplace) 1048 .maybe_location(weaver_record.value.location.clone()) 1049 .maybe_links(weaver_record.value.links.clone()) 1050 .maybe_pronouns(weaver_record.value.pronouns.clone()) 1051 .maybe_pinned(weaver_record.value.pinned.clone()) 1052 .indexed_at(jacquard::types::string::Datetime::now()) 1053 .maybe_created_at(weaver_record.value.created_at) 1054 .build(); 1055 1056 Ok(( 1057 Some(weaver_uri.as_uri().clone().into_static()), 1058 ProfileDataView::new() 1059 .inner(ProfileDataViewInner::ProfileView(Box::new(profile_view))) 1060 .build() 1061 .into_static(), 1062 )) 1063 } else { 1064 Err(WeaverError::Agent( 1065 ClientError::invalid_request("Invalid weaver profile URI").into(), 1066 )) 1067 } 1068 }; 1069 let bsky_appview_future = async { 1070 if let Ok(bsky_resp) = self 1071 .send(GetProfile::new().actor(did.clone()).build()) 1072 .await 1073 { 1074 if let Ok(output) = bsky_resp.into_output() { 1075 let bsky_uri = 1076 BskyProfile::uri(format!("at://{}/app.bsky.actor.profile/self", did)) 1077 .map_err(|_| { 1078 AgentError::from(ClientError::invalid_request( 1079 "Invalid bsky profile URI", 1080 )) 1081 })?; 1082 Ok(( 1083 Some(bsky_uri.as_uri().clone().into_static()), 1084 ProfileDataView::new() 1085 .inner(ProfileDataViewInner::ProfileViewDetailed(Box::new( 1086 output.value, 1087 ))) 1088 .build() 1089 .into_static(), 1090 )) 1091 } else { 1092 Err(WeaverError::Agent( 1093 ClientError::invalid_request("Invalid bsky profile URI").into(), 1094 )) 1095 } 1096 } else { 1097 Err(WeaverError::Agent( 1098 ClientError::invalid_request("Invalid bsky profile URI").into(), 1099 )) 1100 } 1101 }; 1102 1103 if let Ok((profile_uri, weaver_profileview)) = weaver_future.await { 1104 return Ok((profile_uri, weaver_profileview)); 1105 } else if let Ok((profile_uri, bsky_profileview)) = bsky_appview_future.await { 1106 return Ok((profile_uri, bsky_profileview)); 1107 } else { 1108 Err(WeaverError::Agent(AgentError::from( 1109 ClientError::invalid_request("couldn't fetch profile"), 1110 ))) 1111 } 1112 } 1113 } 1114 1115 /// View an entry at a specific index with prev/next navigation 1116 #[cfg(feature = "use-index")] 1117 fn view_entry<'a>( 1118 &self, 1119 notebook: &NotebookView<'a>, 1120 _entries: &[StrongRef<'_>], 1121 index: usize, 1122 ) -> impl Future<Output = Result<BookEntryView<'a>, WeaverError>> { 1123 async move { 1124 use weaver_api::sh_weaver::notebook::get_book_entry::GetBookEntry; 1125 1126 let resp = self 1127 .send( 1128 GetBookEntry::new() 1129 .notebook(notebook.uri.clone()) 1130 .index(index as i64) 1131 .build(), 1132 ) 1133 .await 1134 .map_err(|e| AgentError::from(ClientError::from(e)))?; 1135 1136 let output = resp.into_output().map_err(|e| { 1137 AgentError::from(ClientError::invalid_request(format!( 1138 "Failed to get book entry: {}", 1139 e 1140 ))) 1141 })?; 1142 1143 Ok(output.value.into_static()) 1144 } 1145 } 1146 1147 /// View an entry at a specific index with prev/next navigation 1148 #[cfg(not(feature = "use-index"))] 1149 fn view_entry<'a>( 1150 &self, 1151 notebook: &NotebookView<'a>, 1152 entries: &[StrongRef<'_>], 1153 index: usize, 1154 ) -> impl Future<Output = Result<BookEntryView<'a>, WeaverError>> { 1155 async move { 1156 use weaver_api::sh_weaver::notebook::BookEntryRef; 1157 1158 let entry_ref = entries.get(index).ok_or_else(|| { 1159 AgentError::from(ClientError::invalid_request("entry out of bounds")) 1160 })?; 1161 let entry = self.fetch_entry_view(notebook, entry_ref).await?; 1162 1163 let prev_entry = if index > 0 { 1164 let prev_entry_ref = &entries[index - 1]; 1165 self.fetch_entry_view(notebook, prev_entry_ref).await.ok() 1166 } else { 1167 None 1168 } 1169 .map(|e| BookEntryRef::new().entry(e).build()); 1170 1171 let next_entry = if index < entries.len() - 1 { 1172 let next_entry_ref = &entries[index + 1]; 1173 self.fetch_entry_view(notebook, next_entry_ref).await.ok() 1174 } else { 1175 None 1176 } 1177 .map(|e| BookEntryRef::new().entry(e).build()); 1178 1179 Ok(BookEntryView::new() 1180 .entry(entry) 1181 .maybe_next(next_entry) 1182 .maybe_prev(prev_entry) 1183 .index(index as i64) 1184 .build()) 1185 } 1186 } 1187 1188 /// View a page at a specific index with prev/next navigation 1189 fn view_page<'a>( 1190 &self, 1191 notebook: &NotebookView<'a>, 1192 pages: &[StrongRef<'_>], 1193 index: usize, 1194 ) -> impl Future<Output = Result<BookEntryView<'a>, WeaverError>> { 1195 async move { 1196 use weaver_api::sh_weaver::notebook::BookEntryRef; 1197 1198 let entry_ref = pages.get(index).ok_or_else(|| { 1199 AgentError::from(ClientError::invalid_request("entry out of bounds")) 1200 })?; 1201 let entry = self.fetch_page_view(notebook, entry_ref).await?; 1202 1203 let prev_entry = if index > 0 { 1204 let prev_entry_ref = &pages[index - 1]; 1205 self.fetch_page_view(notebook, prev_entry_ref).await.ok() 1206 } else { 1207 None 1208 } 1209 .map(|e| BookEntryRef::new().entry(e).build()); 1210 1211 let next_entry = if index < pages.len() - 1 { 1212 let next_entry_ref = &pages[index + 1]; 1213 self.fetch_page_view(notebook, next_entry_ref).await.ok() 1214 } else { 1215 None 1216 } 1217 .map(|e| BookEntryRef::new().entry(e).build()); 1218 1219 Ok(BookEntryView::new() 1220 .entry(entry) 1221 .maybe_next(next_entry) 1222 .maybe_prev(prev_entry) 1223 .index(index as i64) 1224 .build()) 1225 } 1226 } 1227 1228 /// Fetch a page view (like fetch_entry_view but for pages) 1229 fn fetch_page_view<'a>( 1230 &self, 1231 notebook: &NotebookView<'a>, 1232 entry_ref: &StrongRef<'_>, 1233 ) -> impl Future<Output = Result<EntryView<'a>, WeaverError>> 1234 where 1235 Self: Sized, 1236 { 1237 async move { 1238 use jacquard::to_data; 1239 use weaver_api::sh_weaver::notebook::page::Page; 1240 1241 let page_uri = Page::uri(entry_ref.uri.clone()) 1242 .map_err(|_| AgentError::from(ClientError::invalid_request("Invalid page URI")))?; 1243 1244 // Get the rkey for version lookup 1245 let rkey = page_uri.rkey().ok_or_else(|| { 1246 AgentError::from(ClientError::invalid_request("Page URI missing rkey")) 1247 })?; 1248 1249 // Fetch permissions for this page (includes inherited notebook permissions) 1250 let permissions = self.get_permissions_for_resource(&page_uri).await?; 1251 1252 // Get all collaborators (owner + invited) 1253 let owner_did = match page_uri.authority() { 1254 jacquard::types::ident::AtIdentifier::Did(d) => d.clone().into_static(), 1255 jacquard::types::ident::AtIdentifier::Handle(h) => { 1256 let (did, _) = self.pds_for_handle(h).await.map_err(|e| { 1257 AgentError::from( 1258 ClientError::from(e).with_context("Failed to resolve handle"), 1259 ) 1260 })?; 1261 did.into_static() 1262 } 1263 }; 1264 let collaborators = self 1265 .find_collaborators_for_resource(&page_uri) 1266 .await 1267 .unwrap_or_default(); 1268 let all_dids: Vec<Did<'static>> = std::iter::once(owner_did) 1269 .chain(collaborators.into_iter()) 1270 .collect(); 1271 1272 // Find all versions across collaborators, get latest by updatedAt 1273 let versions = self 1274 .find_all_versions( 1275 <Page as jacquard::types::collection::Collection>::NSID, 1276 rkey.0.as_str(), 1277 &all_dids, 1278 ) 1279 .await 1280 .unwrap_or_default(); 1281 1282 // Use latest version if found, otherwise fall back to direct fetch 1283 let (page_data, final_uri, final_cid) = if let Some(latest) = versions.first() { 1284 let page: Page = jacquard::from_data(&latest.value).map_err(|_| { 1285 AgentError::from(ClientError::invalid_request( 1286 "Failed to deserialize latest page", 1287 )) 1288 })?; 1289 (page.into_static(), latest.uri.clone(), latest.cid.clone()) 1290 } else { 1291 // No versions found, fetch directly from PDS 1292 let page = self.fetch_record(&page_uri).await?; 1293 let cid = page.cid.ok_or_else(|| { 1294 AgentError::from(ClientError::invalid_request("Page missing CID")) 1295 })?; 1296 ( 1297 page.value.into_static(), 1298 page.uri.into_static(), 1299 cid.into_static(), 1300 ) 1301 }; 1302 1303 let title = page_data.title.clone(); 1304 let tags = page_data.tags.clone(); 1305 1306 // Fetch contributors (evidence-based authors) for this page 1307 let contributor_dids = self.find_contributors_for_resource(&page_uri).await?; 1308 let mut authors = Vec::new(); 1309 for (index, did) in contributor_dids.iter().enumerate() { 1310 let (profile_uri, profile_view) = self 1311 .hydrate_profile_view(&AtIdentifier::Did(did.clone())) 1312 .await?; 1313 authors.push( 1314 AuthorListView::new() 1315 .maybe_uri(profile_uri) 1316 .record(profile_view) 1317 .index(index as i64) 1318 .build(), 1319 ); 1320 } 1321 1322 Ok(EntryView::new() 1323 .cid(final_cid) 1324 .uri(final_uri) 1325 .indexed_at(jacquard::types::string::Datetime::now()) 1326 .record(to_data(&page_data).map_err(|_| { 1327 AgentError::from(ClientError::invalid_request("Failed to serialize page")) 1328 })?) 1329 .maybe_tags(tags) 1330 .title(title) 1331 .authors(authors) 1332 .permissions(permissions) 1333 .build()) 1334 } 1335 } 1336 1337 /// Find the notebook that contains a given entry using constellation backlinks. 1338 /// 1339 /// Queries constellation for `sh.weaver.notebook.book` records that reference 1340 /// the given entry URI via the `.entryList[].uri` path. 1341 fn find_notebook_for_entry( 1342 &self, 1343 entry_uri: &AtUri<'_>, 1344 ) -> impl Future<Output = Result<Option<RecordId<'static>>, WeaverError>> 1345 where 1346 Self: Sized, 1347 { 1348 async move { 1349 let (_, first) = self.find_notebooks_for_entry(entry_uri).await?; 1350 Ok(first) 1351 } 1352 } 1353 1354 /// Find notebooks containing an entry, returning count and optionally the first one. 1355 /// 1356 /// Uses constellation backlinks to reverse lookup. Returns: 1357 /// - total count of notebooks containing this entry 1358 /// - The first notebook RecordId (if any exist) 1359 fn find_notebooks_for_entry( 1360 &self, 1361 entry_uri: &AtUri<'_>, 1362 ) -> impl Future<Output = Result<(u64, Option<RecordId<'static>>), WeaverError>> 1363 where 1364 Self: Sized, 1365 { 1366 async move { 1367 let constellation_url = Url::parse(CONSTELLATION_URL).map_err(|e| { 1368 AgentError::from(ClientError::invalid_request(format!( 1369 "Invalid constellation URL: {}", 1370 e 1371 ))) 1372 })?; 1373 1374 // Query with limit 2 - we only need to know if there's more than 1 1375 let query = GetBacklinksQuery { 1376 subject: Uri::At(entry_uri.clone().into_static()), 1377 source: "sh.weaver.notebook.book:.entryList[].uri".into(), 1378 cursor: None, 1379 did: vec![], 1380 limit: 2, 1381 }; 1382 1383 let response = self 1384 .xrpc(constellation_url) 1385 .send(&query) 1386 .await 1387 .map_err(|e| { 1388 AgentError::from(ClientError::invalid_request(format!( 1389 "Constellation query failed: {}", 1390 e 1391 ))) 1392 })?; 1393 1394 let output = response.into_output().map_err(|e| { 1395 AgentError::from(ClientError::invalid_request(format!( 1396 "Failed to parse constellation response: {}", 1397 e 1398 ))) 1399 })?; 1400 1401 Ok(( 1402 output.total, 1403 output.records.into_iter().next().map(|r| r.into_static()), 1404 )) 1405 } 1406 } 1407 1408 /// Fetch an entry directly by its rkey, returning the EntryView and raw Entry. 1409 /// 1410 /// This bypasses notebook context entirely - useful for standalone entries 1411 /// or when you have the rkey but not the notebook. 1412 #[cfg(feature = "use-index")] 1413 fn fetch_entry_by_rkey( 1414 &self, 1415 ident: &jacquard::types::ident::AtIdentifier<'_>, 1416 rkey: &str, 1417 ) -> impl Future<Output = Result<(EntryView<'static>, entry::Entry<'static>), WeaverError>> 1418 where 1419 Self: Sized, 1420 { 1421 async move { 1422 use jacquard::types::collection::Collection; 1423 use weaver_api::sh_weaver::notebook::get_entry::GetEntry; 1424 1425 // Build entry URI from ident + rkey 1426 let entry_uri_str = format!("at://{}/{}/{}", ident, entry::Entry::NSID, rkey); 1427 let entry_uri = AtUri::new(&entry_uri_str) 1428 .map_err(|_| AgentError::from(ClientError::invalid_request("Invalid entry URI")))? 1429 .into_static(); 1430 1431 let resp = self 1432 .send(GetEntry::new().uri(entry_uri).build()) 1433 .await 1434 .map_err(|e| AgentError::from(ClientError::from(e)))?; 1435 1436 let output = resp.into_output().map_err(|e| { 1437 AgentError::from(ClientError::invalid_request(format!( 1438 "Failed to get entry: {}", 1439 e 1440 ))) 1441 })?; 1442 1443 // Clone the record for deserialization so we can consume output.value 1444 let record_clone = output.value.record.clone(); 1445 1446 // Deserialize Entry from the cloned record 1447 let entry_value: entry::Entry = jacquard::from_data(&record_clone).map_err(|e| { 1448 AgentError::from(ClientError::invalid_request(format!( 1449 "Failed to deserialize entry record: {}", 1450 e 1451 ))) 1452 })?; 1453 1454 Ok((output.value.into_static(), entry_value.into_static())) 1455 } 1456 } 1457 1458 /// Fetch an entry directly by its rkey, returning the EntryView and raw Entry. 1459 /// 1460 /// This bypasses notebook context entirely - useful for standalone entries 1461 /// or when you have the rkey but not the notebook. 1462 #[cfg(not(feature = "use-index"))] 1463 fn fetch_entry_by_rkey( 1464 &self, 1465 ident: &jacquard::types::ident::AtIdentifier<'_>, 1466 rkey: &str, 1467 ) -> impl Future<Output = Result<(EntryView<'static>, entry::Entry<'static>), WeaverError>> 1468 where 1469 Self: Sized, 1470 { 1471 async move { 1472 use jacquard::to_data; 1473 use jacquard::types::collection::Collection; 1474 1475 // Resolve DID from ident 1476 let repo_did = match ident { 1477 jacquard::types::ident::AtIdentifier::Did(did) => did.clone(), 1478 jacquard::types::ident::AtIdentifier::Handle(handle) => { 1479 let (did, _pds) = self.pds_for_handle(handle).await.map_err(|e| { 1480 AgentError::from( 1481 ClientError::from(e).with_context("Failed to resolve handle"), 1482 ) 1483 })?; 1484 did 1485 } 1486 }; 1487 1488 // Build entry URI for contributor/permission queries 1489 let entry_uri_str = format!("at://{}/{}/{}", repo_did, entry::Entry::NSID, rkey); 1490 let entry_uri = AtUri::new(&entry_uri_str) 1491 .map_err(|_| AgentError::from(ClientError::invalid_request("Invalid entry URI")))? 1492 .into_static(); 1493 1494 // Get collaborators for version lookup 1495 let collaborators = self 1496 .find_collaborators_for_resource(&entry_uri) 1497 .await 1498 .unwrap_or_default(); 1499 let all_dids: Vec<Did<'static>> = std::iter::once(repo_did.clone().into_static()) 1500 .chain(collaborators.into_iter()) 1501 .collect(); 1502 1503 // Find all versions across collaborators, get latest by updatedAt 1504 let versions = self 1505 .find_all_versions(entry::Entry::NSID, rkey, &all_dids) 1506 .await 1507 .unwrap_or_default(); 1508 1509 // Use latest version if found, otherwise fetch directly from original ident 1510 let (entry_value, final_uri, final_cid) = if let Some(latest) = versions.first() { 1511 let entry: entry::Entry = jacquard::from_data(&latest.value).map_err(|e| { 1512 AgentError::from(ClientError::invalid_request(format!( 1513 "Failed to deserialize latest entry: {}", 1514 e 1515 ))) 1516 })?; 1517 (entry.into_static(), latest.uri.clone(), latest.cid.clone()) 1518 } else { 1519 // Fallback: fetch directly via slingshot 1520 let record = self.fetch_record_slingshot(&entry_uri).await?; 1521 1522 let entry: entry::Entry = jacquard::from_data(&record.value).map_err(|e| { 1523 AgentError::from(ClientError::invalid_request(format!( 1524 "Failed to deserialize entry: {}", 1525 e 1526 ))) 1527 })?; 1528 1529 let cid = record.cid.ok_or_else(|| { 1530 AgentError::from(ClientError::invalid_request("Entry missing CID")) 1531 })?; 1532 1533 ( 1534 entry.into_static(), 1535 record.uri.into_static(), 1536 cid.into_static(), 1537 ) 1538 }; 1539 1540 // Fetch contributors (evidence-based authors) 1541 let contributor_dids = self.find_contributors_for_resource(&entry_uri).await?; 1542 let mut authors = Vec::new(); 1543 for (index, did) in contributor_dids.iter().enumerate() { 1544 let ident = AtIdentifier::Did(did.clone()); 1545 let (profile_uri, profile_view) = self.hydrate_profile_view(&ident).await?; 1546 authors.push( 1547 AuthorListView::new() 1548 .maybe_uri(profile_uri) 1549 .record(profile_view) 1550 .index(index as i64) 1551 .build(), 1552 ); 1553 } 1554 1555 // Fetch permissions 1556 let permissions = self.get_permissions_for_resource(&entry_uri).await?; 1557 1558 let entry_view = EntryView::new() 1559 .cid(final_cid) 1560 .uri(final_uri) 1561 .indexed_at(jacquard::types::string::Datetime::now()) 1562 .record(to_data(&entry_value).map_err(|_| { 1563 AgentError::from(ClientError::invalid_request("Failed to serialize entry")) 1564 })?) 1565 .maybe_tags(entry_value.tags.clone()) 1566 .title(entry_value.title.clone()) 1567 .path(entry_value.path.clone()) 1568 .authors(authors) 1569 .permissions(permissions) 1570 .build() 1571 .into_static(); 1572 1573 Ok((entry_view, entry_value.into_static())) 1574 } 1575 } 1576 1577 /// Find an entry's index within a notebook by rkey. 1578 /// 1579 /// Scans the notebook's entry_list comparing rkeys extracted from URIs. 1580 /// When found, builds BookEntryView with prev/next navigation. 1581 fn entry_in_notebook_by_rkey<'a>( 1582 &self, 1583 notebook: &NotebookView<'a>, 1584 entries: &[StrongRef<'_>], 1585 rkey: &str, 1586 ) -> impl Future<Output = Result<Option<BookEntryView<'a>>, WeaverError>> { 1587 async move { 1588 use weaver_api::sh_weaver::notebook::BookEntryRef; 1589 1590 // Find the entry index by comparing rkeys 1591 let mut found_index = None; 1592 for (index, entry_ref) in entries.iter().enumerate() { 1593 // Extract rkey from the entry URI 1594 if let Ok(uri) = AtUri::new(entry_ref.uri.as_ref()) { 1595 if let Some(entry_rkey) = uri.rkey() { 1596 if entry_rkey.0.as_str() == rkey { 1597 found_index = Some(index); 1598 break; 1599 } 1600 } 1601 } 1602 } 1603 1604 let index = match found_index { 1605 Some(i) => i, 1606 None => return Ok(None), 1607 }; 1608 1609 // Build BookEntryView with prev/next navigation 1610 let entry_ref = &entries[index]; 1611 let entry = self.fetch_entry_view(notebook, entry_ref).await?; 1612 1613 let prev_entry = if index > 0 { 1614 let prev_entry_ref = &entries[index - 1]; 1615 self.fetch_entry_view(notebook, prev_entry_ref).await.ok() 1616 } else { 1617 None 1618 } 1619 .map(|e| BookEntryRef::new().entry(e).build()); 1620 1621 let next_entry = if index < entries.len() - 1 { 1622 let next_entry_ref = &entries[index + 1]; 1623 self.fetch_entry_view(notebook, next_entry_ref).await.ok() 1624 } else { 1625 None 1626 } 1627 .map(|e| BookEntryRef::new().entry(e).build()); 1628 1629 Ok(Some( 1630 BookEntryView::new() 1631 .entry(entry) 1632 .maybe_next(next_entry) 1633 .maybe_prev(prev_entry) 1634 .index(index as i64) 1635 .build(), 1636 )) 1637 } 1638 } 1639 1640 /// Find valid collaborators for a resource. 1641 /// 1642 /// Queries Constellation for invite/accept record pairs: 1643 /// 1. Find all invites targeting this resource URI 1644 /// 2. For each invite, check if there's a matching accept record 1645 /// 3. Return DIDs that have both invite AND accept 1646 fn find_collaborators_for_resource( 1647 &self, 1648 resource_uri: &AtUri<'_>, 1649 ) -> impl Future<Output = Result<Vec<Did<'static>>, WeaverError>> 1650 where 1651 Self: Sized, 1652 { 1653 async move { 1654 use weaver_api::sh_weaver::collab::invite::Invite; 1655 1656 const INVITE_NSID: &str = "sh.weaver.collab.invite"; 1657 const ACCEPT_NSID: &str = "sh.weaver.collab.accept"; 1658 1659 let constellation_url = Url::parse(CONSTELLATION_URL).map_err(|e| { 1660 AgentError::from(ClientError::invalid_request(format!( 1661 "Invalid constellation URL: {}", 1662 e 1663 ))) 1664 })?; 1665 1666 // Step 1: Find all invites for this resource 1667 let invite_query = GetBacklinksQuery { 1668 subject: Uri::At(resource_uri.clone().into_static()), 1669 source: format!("{}:resource.uri", INVITE_NSID).into(), 1670 cursor: None, 1671 did: vec![], 1672 limit: 100, 1673 }; 1674 1675 let response = self 1676 .xrpc(constellation_url.clone()) 1677 .send(&invite_query) 1678 .await 1679 .map_err(|e| { 1680 AgentError::from(ClientError::invalid_request(format!( 1681 "Constellation query failed: {}", 1682 e 1683 ))) 1684 })?; 1685 1686 let invite_output = response.into_output().map_err(|e| { 1687 AgentError::from(ClientError::invalid_request(format!( 1688 "Failed to parse constellation response: {}", 1689 e 1690 ))) 1691 })?; 1692 1693 let mut collaborators = Vec::new(); 1694 1695 // Step 2: For each invite, check for a matching accept 1696 for record_id in invite_output.records { 1697 let invite_uri_str = format!( 1698 "at://{}/{}/{}", 1699 record_id.did, 1700 INVITE_NSID, 1701 record_id.rkey.0.as_ref() 1702 ); 1703 let Ok(invite_uri) = AtUri::new(&invite_uri_str) else { 1704 continue; 1705 }; 1706 1707 // Fetch the invite to get the invitee DID 1708 let Ok(invite_resp) = self.get_record::<Invite>(&invite_uri).await else { 1709 continue; 1710 }; 1711 let Ok(invite_record) = invite_resp.into_output() else { 1712 continue; 1713 }; 1714 1715 let invitee_did = invite_record.value.invitee.clone().into_static(); 1716 1717 // Query for accept records referencing this invite 1718 let accept_query = GetBacklinksQuery { 1719 subject: Uri::At(invite_uri.into_static()), 1720 source: format!("{}:invite.uri", ACCEPT_NSID).into(), 1721 cursor: None, 1722 did: vec![invitee_did.clone()], 1723 limit: 1, 1724 }; 1725 1726 let Ok(accept_resp) = self 1727 .xrpc(constellation_url.clone()) 1728 .send(&accept_query) 1729 .await 1730 else { 1731 continue; 1732 }; 1733 1734 let Ok(accept_output) = accept_resp.into_output() else { 1735 continue; 1736 }; 1737 1738 if !accept_output.records.is_empty() { 1739 // Both parties in a valid invite+accept pair are authorized 1740 let inviter_did = record_id.did.clone().into_static(); 1741 collaborators.push(inviter_did); 1742 collaborators.push(invitee_did); 1743 } 1744 } 1745 1746 // Deduplicate (someone might appear in multiple pairs) 1747 collaborators.sort(); 1748 collaborators.dedup(); 1749 1750 Ok(collaborators) 1751 } 1752 } 1753 1754 /// Find all versions of a record across collaborator repositories. 1755 /// 1756 /// For each collaborator DID, attempts to fetch `at://{did}/{collection}/{rkey}`. 1757 /// Returns all found versions sorted by `updated_at` descending (latest first). 1758 fn find_all_versions<'a>( 1759 &'a self, 1760 collection: &'a str, 1761 rkey: &'a str, 1762 collaborators: &'a [Did<'_>], 1763 ) -> impl Future<Output = Result<Vec<CollaboratorVersion<'static>>, WeaverError>> + 'a 1764 where 1765 Self: Sized, 1766 { 1767 async move { 1768 use jacquard::Data; 1769 1770 let mut versions = Vec::new(); 1771 1772 for collab_did in collaborators { 1773 // Build URI for this collaborator's version 1774 let uri_str = format!("at://{}/{}/{}", collab_did, collection, rkey); 1775 let Ok(uri) = AtUri::new(&uri_str) else { 1776 continue; 1777 }; 1778 1779 // Fetch via slingshot (handles cross-PDS routing) 1780 let Ok(record) = self.fetch_record_slingshot(&uri).await else { 1781 continue; 1782 }; 1783 1784 let Some(cid) = record.cid else { 1785 continue; 1786 }; 1787 1788 let updated_at = record 1789 .value 1790 .query("...updatedAt") 1791 .first() 1792 .or_else(|| record.value.query("...createdAt").first()) 1793 .and_then(|v: &Data| v.as_str()) 1794 .and_then(|s| s.parse::<jacquard::types::string::Datetime>().ok()); 1795 1796 versions.push(CollaboratorVersion { 1797 did: collab_did.clone().into_static(), 1798 uri: record.uri.into_static(), 1799 cid: cid.into_static(), 1800 updated_at, 1801 value: record.value.into_static(), 1802 }); 1803 } 1804 1805 // Sort by updated_at descending (latest first) 1806 versions.sort_by(|a, b| match (&b.updated_at, &a.updated_at) { 1807 (Some(b_time), Some(a_time)) => b_time.as_ref().cmp(a_time.as_ref()), 1808 (Some(_), None) => std::cmp::Ordering::Less, 1809 (None, Some(_)) => std::cmp::Ordering::Greater, 1810 (None, None) => std::cmp::Ordering::Equal, 1811 }); 1812 1813 Ok(versions) 1814 } 1815 } 1816 1817 /// Check if a user can edit a resource based on collaboration records. 1818 /// 1819 /// Returns true if the user is the resource owner OR has valid invite+accept. 1820 fn can_user_edit_resource<'a>( 1821 &'a self, 1822 resource_uri: &'a AtUri<'_>, 1823 user_did: &'a Did<'_>, 1824 ) -> impl Future<Output = Result<bool, WeaverError>> + 'a 1825 where 1826 Self: Sized, 1827 { 1828 async move { 1829 // Check if user is the owner 1830 if let jacquard::types::ident::AtIdentifier::Did(owner_did) = resource_uri.authority() { 1831 if owner_did == user_did { 1832 return Ok(true); 1833 } 1834 } 1835 1836 // Check for valid collaboration 1837 let collaborators = self.find_collaborators_for_resource(resource_uri).await?; 1838 Ok(collaborators.iter().any(|c| c == user_did)) 1839 } 1840 } 1841 1842 /// Check if a user can edit an entry, considering notebook-level cascading. 1843 /// 1844 /// An entry is editable if user owns it, has entry-level collab, or has notebook-level collab. 1845 fn can_user_edit_entry<'a>( 1846 &'a self, 1847 entry_uri: &'a AtUri<'_>, 1848 user_did: &'a Did<'_>, 1849 ) -> impl Future<Output = Result<bool, WeaverError>> + 'a 1850 where 1851 Self: Sized, 1852 { 1853 async move { 1854 // Check entry-level access first 1855 if self.can_user_edit_resource(entry_uri, user_did).await? { 1856 return Ok(true); 1857 } 1858 1859 // Check notebook-level access (cascade) 1860 if let Some(notebook_id) = self.find_notebook_for_entry(entry_uri).await? { 1861 let notebook_uri_str = format!( 1862 "at://{}/{}/{}", 1863 notebook_id.did, 1864 notebook_id.collection, 1865 notebook_id.rkey.0.as_ref() 1866 ); 1867 if let Ok(notebook_uri) = AtUri::new(&notebook_uri_str) { 1868 if self.can_user_edit_resource(&notebook_uri, user_did).await? { 1869 return Ok(true); 1870 } 1871 } 1872 } 1873 1874 Ok(false) 1875 } 1876 } 1877 1878 /// Get the full permissions state for a resource. 1879 /// 1880 /// Returns PermissionsState with all editors: 1881 /// - Resource authority (source = resource URI, grantedAt = createdAt) 1882 /// - Invited collaborators (source = invite URI, grantedAt = accept createdAt) 1883 /// - For entries: inherited notebook-level collaborators 1884 fn get_permissions_for_resource( 1885 &self, 1886 resource_uri: &AtUri<'_>, 1887 ) -> impl Future<Output = Result<PermissionsState<'static>, WeaverError>> 1888 where 1889 Self: Sized, 1890 { 1891 async move { 1892 use weaver_api::sh_weaver::collab::accept::Accept; 1893 use weaver_api::sh_weaver::collab::invite::Invite; 1894 1895 const INVITE_NSID: &str = "sh.weaver.collab.invite"; 1896 const ACCEPT_NSID: &str = "sh.weaver.collab.accept"; 1897 1898 let constellation_url = Url::parse(CONSTELLATION_URL).map_err(|e| { 1899 AgentError::from(ClientError::invalid_request(format!( 1900 "Invalid constellation URL: {}", 1901 e 1902 ))) 1903 })?; 1904 1905 let mut editors = Vec::new(); 1906 1907 // 1. Resource authority - creating the resource is its own grant 1908 let authority_did = match resource_uri.authority() { 1909 jacquard::types::ident::AtIdentifier::Did(did) => did.clone().into_static(), 1910 jacquard::types::ident::AtIdentifier::Handle(handle) => { 1911 let (did, _) = self.pds_for_handle(handle).await.map_err(|e| { 1912 AgentError::from( 1913 ClientError::from(e).with_context("Failed to resolve handle"), 1914 ) 1915 })?; 1916 did.into_static() 1917 } 1918 }; 1919 1920 // Fetch the record to get createdAt (use untyped fetch to handle any collection) 1921 let record = self 1922 .fetch_record_slingshot(resource_uri) 1923 .await 1924 .map_err(|e| WeaverError::from(AgentError::from(e)))?; 1925 let authority_granted_at = record 1926 .value 1927 .query("createdAt") 1928 .first() 1929 .and_then(|v| v.as_str()) 1930 .and_then(|s| s.parse::<jacquard::types::string::Datetime>().ok()) 1931 .ok_or_else(|| { 1932 WeaverError::from(AgentError::from(ClientError::invalid_request( 1933 "Record missing createdAt", 1934 ))) 1935 })?; 1936 1937 editors.push( 1938 PermissionGrant::new() 1939 .did(authority_did.clone()) 1940 .scope("direct") 1941 .source(resource_uri.clone().into_static()) 1942 .granted_at(authority_granted_at) 1943 .build() 1944 .into_static(), 1945 ); 1946 1947 // 2. Find direct invites for this resource 1948 let invite_query = GetBacklinksQuery { 1949 subject: Uri::At(resource_uri.clone().into_static()), 1950 source: format!("{}:resource.uri", INVITE_NSID).into(), 1951 cursor: None, 1952 did: vec![], 1953 limit: 100, 1954 }; 1955 1956 let invite_response = self 1957 .xrpc(constellation_url.clone()) 1958 .send(&invite_query) 1959 .await 1960 .map_err(|e| { 1961 AgentError::from(ClientError::invalid_request(format!( 1962 "Constellation invite query failed: {}", 1963 e 1964 ))) 1965 })?; 1966 let invite_output = invite_response.into_output().map_err(|e| { 1967 AgentError::from(ClientError::invalid_request(format!( 1968 "Failed to parse Constellation response: {}", 1969 e 1970 ))) 1971 })?; 1972 1973 for record_id in invite_output.records { 1974 let invite_uri_str = format!( 1975 "at://{}/{}/{}", 1976 record_id.did, 1977 INVITE_NSID, 1978 record_id.rkey.0.as_ref() 1979 ); 1980 let invite_uri = AtUri::new(&invite_uri_str).map_err(|_| { 1981 AgentError::from(ClientError::invalid_request( 1982 "Invalid invite URI from Constellation", 1983 )) 1984 })?; 1985 1986 // Fetch invite to get invitee DID 1987 let invite_record = 1988 self.get_record::<Invite>(&invite_uri) 1989 .await 1990 .map_err(|e| WeaverError::from(AgentError::from(e)))? 1991 .into_output() 1992 .map_err(|e| { 1993 WeaverError::from(AgentError::from(ClientError::invalid_request( 1994 format!("Failed to parse invite record: {}", e), 1995 ))) 1996 })?; 1997 1998 let invitee_did = invite_record.value.invitee.clone().into_static(); 1999 2000 // Query for accept records referencing this invite 2001 let accept_query = GetBacklinksQuery { 2002 subject: Uri::At(invite_uri.clone().into_static()), 2003 source: format!("{}:invite.uri", ACCEPT_NSID).into(), 2004 cursor: None, 2005 did: vec![invitee_did.clone()], 2006 limit: 1, 2007 }; 2008 2009 let accept_response = self 2010 .xrpc(constellation_url.clone()) 2011 .send(&accept_query) 2012 .await 2013 .map_err(|e| { 2014 AgentError::from(ClientError::invalid_request(format!( 2015 "Constellation accept query failed: {}", 2016 e 2017 ))) 2018 })?; 2019 let accept_output = accept_response.into_output().map_err(|e| { 2020 AgentError::from(ClientError::invalid_request(format!( 2021 "Failed to parse Constellation accept response: {}", 2022 e 2023 ))) 2024 })?; 2025 2026 // No accept = pending invite, not an error - just skip 2027 let Some(accept_record_id) = accept_output.records.first() else { 2028 continue; 2029 }; 2030 2031 let accept_uri_str = format!( 2032 "at://{}/{}/{}", 2033 accept_record_id.did, 2034 ACCEPT_NSID, 2035 accept_record_id.rkey.0.as_ref() 2036 ); 2037 let accept_uri = AtUri::new(&accept_uri_str).map_err(|_| { 2038 AgentError::from(ClientError::invalid_request( 2039 "Invalid accept URI from Constellation", 2040 )) 2041 })?; 2042 let accept_record = 2043 self.get_record::<Accept>(&accept_uri) 2044 .await 2045 .map_err(|e| WeaverError::from(AgentError::from(e)))? 2046 .into_output() 2047 .map_err(|e| { 2048 WeaverError::from(AgentError::from(ClientError::invalid_request( 2049 format!("Failed to parse accept record: {}", e), 2050 ))) 2051 })?; 2052 2053 editors.push( 2054 PermissionGrant::new() 2055 .did(invitee_did) 2056 .scope("direct") 2057 .source(invite_uri.into_static()) 2058 .granted_at(accept_record.value.created_at) 2059 .build() 2060 .into_static(), 2061 ); 2062 } 2063 2064 // 3. For entries, check notebook-level invites (inherited) 2065 let is_entry = resource_uri 2066 .collection() 2067 .is_some_and(|c| c.as_ref() == "sh.weaver.notebook.entry"); 2068 2069 if is_entry { 2070 // Entry not in a notebook is fine - just no inherited permissions 2071 if let Some(notebook_id) = self.find_notebook_for_entry(resource_uri).await? { 2072 let notebook_uri_str = format!( 2073 "at://{}/{}/{}", 2074 notebook_id.did, 2075 notebook_id.collection, 2076 notebook_id.rkey.0.as_ref() 2077 ); 2078 let notebook_uri = AtUri::new(&notebook_uri_str).map_err(|_| { 2079 AgentError::from(ClientError::invalid_request( 2080 "Invalid notebook URI from Constellation", 2081 )) 2082 })?; 2083 2084 let notebook_invite_query = GetBacklinksQuery { 2085 subject: Uri::At(notebook_uri.clone().into_static()), 2086 source: format!("{}:resource.uri", INVITE_NSID).into(), 2087 cursor: None, 2088 did: vec![], 2089 limit: 100, 2090 }; 2091 2092 let notebook_invite_response = self 2093 .xrpc(constellation_url.clone()) 2094 .send(&notebook_invite_query) 2095 .await 2096 .map_err(|e| { 2097 AgentError::from(ClientError::invalid_request(format!( 2098 "Constellation notebook invite query failed: {}", 2099 e 2100 ))) 2101 })?; 2102 let notebook_invite_output = 2103 notebook_invite_response.into_output().map_err(|e| { 2104 AgentError::from(ClientError::invalid_request(format!( 2105 "Failed to parse Constellation response: {}", 2106 e 2107 ))) 2108 })?; 2109 2110 for record_id in notebook_invite_output.records { 2111 let invite_uri_str = format!( 2112 "at://{}/{}/{}", 2113 record_id.did, 2114 INVITE_NSID, 2115 record_id.rkey.0.as_ref() 2116 ); 2117 let invite_uri = AtUri::new(&invite_uri_str).map_err(|_| { 2118 AgentError::from(ClientError::invalid_request( 2119 "Invalid invite URI from Constellation", 2120 )) 2121 })?; 2122 2123 let invite_record = self 2124 .get_record::<Invite>(&invite_uri) 2125 .await 2126 .map_err(|e| WeaverError::from(AgentError::from(e)))? 2127 .into_output() 2128 .map_err(|e| { 2129 WeaverError::from(AgentError::from(ClientError::invalid_request( 2130 format!("Failed to parse invite record: {}", e), 2131 ))) 2132 })?; 2133 2134 let invitee_did = invite_record.value.invitee.clone().into_static(); 2135 2136 // Skip if already in direct grants (direct takes precedence) 2137 if editors.iter().any(|g| g.did == invitee_did) { 2138 continue; 2139 } 2140 2141 let accept_query = GetBacklinksQuery { 2142 subject: Uri::At(invite_uri.clone().into_static()), 2143 source: format!("{}:.invite.uri", ACCEPT_NSID).into(), 2144 cursor: None, 2145 did: vec![invitee_did.clone()], 2146 limit: 1, 2147 }; 2148 2149 let accept_response = self 2150 .xrpc(constellation_url.clone()) 2151 .send(&accept_query) 2152 .await 2153 .map_err(|e| { 2154 AgentError::from(ClientError::invalid_request(format!( 2155 "Constellation accept query failed: {}", 2156 e 2157 ))) 2158 })?; 2159 let accept_output = accept_response.into_output().map_err(|e| { 2160 AgentError::from(ClientError::invalid_request(format!( 2161 "Failed to parse Constellation accept response: {}", 2162 e 2163 ))) 2164 })?; 2165 2166 // No accept = pending invite, not an error - just skip 2167 let Some(accept_record_id) = accept_output.records.first() else { 2168 continue; 2169 }; 2170 2171 let accept_uri_str = format!( 2172 "at://{}/{}/{}", 2173 accept_record_id.did, 2174 ACCEPT_NSID, 2175 accept_record_id.rkey.0.as_ref() 2176 ); 2177 let accept_uri = AtUri::new(&accept_uri_str).map_err(|_| { 2178 AgentError::from(ClientError::invalid_request( 2179 "Invalid accept URI from Constellation", 2180 )) 2181 })?; 2182 let accept_record = self 2183 .get_record::<Accept>(&accept_uri) 2184 .await 2185 .map_err(|e| WeaverError::from(AgentError::from(e)))? 2186 .into_output() 2187 .map_err(|e| { 2188 WeaverError::from(AgentError::from(ClientError::invalid_request( 2189 format!("Failed to parse accept record: {}", e), 2190 ))) 2191 })?; 2192 2193 editors.push( 2194 PermissionGrant::new() 2195 .did(invitee_did) 2196 .scope("inherited") 2197 .source(invite_uri.into_static()) 2198 .granted_at(accept_record.value.created_at) 2199 .build() 2200 .into_static(), 2201 ); 2202 } 2203 } 2204 } 2205 2206 Ok(PermissionsState::new() 2207 .editors(editors) 2208 .build() 2209 .into_static()) 2210 } 2211 } 2212 2213 // ========================================================================= 2214 // Real-time Collaboration Session Management 2215 // ========================================================================= 2216 2217 /// Create a collaboration session record on the user's PDS. 2218 /// 2219 /// Called when joining a real-time editing session. The session record 2220 /// contains the iroh NodeId so other collaborators can discover and 2221 /// connect to this peer. 2222 /// 2223 /// Returns the AT-URI of the created session record. 2224 fn create_collab_session<'a>( 2225 &'a self, 2226 resource: &'a StrongRef<'a>, 2227 node_id: &'a str, 2228 relay_url: Option<&'a str>, 2229 ttl_minutes: Option<u32>, 2230 ) -> impl Future<Output = Result<AtUri<'static>, WeaverError>> + 'a { 2231 async move { 2232 use jacquard::types::string::Datetime; 2233 use weaver_api::sh_weaver::collab::session::Session; 2234 2235 // Clean up any expired sessions first 2236 let _ = self.cleanup_expired_sessions().await; 2237 2238 let now_chrono = chrono::Utc::now().fixed_offset(); 2239 let now = Datetime::new(now_chrono); 2240 let expires_at = ttl_minutes.map(|mins| { 2241 let expires = now_chrono + chrono::Duration::minutes(mins as i64); 2242 Datetime::new(expires) 2243 }); 2244 2245 let relay_uri = relay_url 2246 .map(|url| jacquard::types::string::Uri::new(url)) 2247 .transpose() 2248 .map_err(|_| AgentError::from(ClientError::invalid_request("Invalid relay URL")))?; 2249 2250 let session = Session::new() 2251 .resource(resource.clone()) 2252 .node_id(node_id) 2253 .created_at(now) 2254 .maybe_expires_at(expires_at) 2255 .maybe_relay_url(relay_uri) 2256 .build(); 2257 2258 let response = self.create_record(session, None).await?; 2259 Ok(response.uri.into_static()) 2260 } 2261 } 2262 2263 /// Delete a collaboration session record. 2264 /// 2265 /// Called when leaving a real-time editing session to clean up. 2266 fn delete_collab_session<'a>( 2267 &'a self, 2268 session_uri: &'a AtUri<'a>, 2269 ) -> impl Future<Output = Result<(), WeaverError>> + 'a { 2270 async move { 2271 use weaver_api::sh_weaver::collab::session::Session; 2272 2273 let rkey = session_uri.rkey().ok_or_else(|| { 2274 AgentError::from(ClientError::invalid_request("Session URI missing rkey")) 2275 })?; 2276 self.delete_record::<Session>(rkey.clone()).await?; 2277 Ok(()) 2278 } 2279 } 2280 2281 /// Refresh a collaboration session's TTL. 2282 /// 2283 /// Called periodically to indicate the session is still active. 2284 fn refresh_collab_session<'a>( 2285 &'a self, 2286 session_uri: &'a AtUri<'a>, 2287 ttl_minutes: u32, 2288 ) -> impl Future<Output = Result<(), WeaverError>> + 'a { 2289 async move { 2290 use jacquard::types::string::Datetime; 2291 use weaver_api::sh_weaver::collab::session::Session; 2292 2293 let now_chrono = chrono::Utc::now().fixed_offset(); 2294 let expires = now_chrono + chrono::Duration::minutes(ttl_minutes as i64); 2295 let expires_at = Datetime::new(expires); 2296 2297 self.update_record::<Session>(session_uri, |session| { 2298 session.expires_at = Some(expires_at); 2299 }) 2300 .await?; 2301 Ok(()) 2302 } 2303 } 2304 2305 /// Update the relay URL in an existing session record. 2306 /// 2307 /// Called when the relay connection changes during a session. 2308 fn update_collab_session_relay<'a>( 2309 &'a self, 2310 session_uri: &'a AtUri<'a>, 2311 relay_url: Option<&'a str>, 2312 ) -> impl Future<Output = Result<(), WeaverError>> + 'a { 2313 async move { 2314 use weaver_api::sh_weaver::collab::session::Session; 2315 2316 let relay_uri = relay_url 2317 .map(|url| jacquard::types::string::Uri::new(url)) 2318 .transpose() 2319 .map_err(|_| AgentError::from(ClientError::invalid_request("Invalid relay URL")))?; 2320 2321 self.update_record::<Session>(session_uri, |session| { 2322 session.relay_url = relay_uri.clone(); 2323 }) 2324 .await?; 2325 Ok(()) 2326 } 2327 } 2328 2329 /// Delete all expired session records for the current user. 2330 /// 2331 /// Called before creating a new session to clean up stale records. 2332 fn cleanup_expired_sessions<'a>(&'a self) -> impl Future<Output = Result<u32, WeaverError>> + 'a 2333 where 2334 Self: Sized, 2335 { 2336 async move { 2337 use jacquard::types::nsid::Nsid; 2338 use weaver_api::com_atproto::repo::list_records::ListRecords; 2339 use weaver_api::sh_weaver::collab::session::Session; 2340 2341 let (did, _) = self.session_info().await.ok_or_else(|| { 2342 AgentError::from(ClientError::invalid_request("No active session")) 2343 })?; 2344 let now = chrono::Utc::now(); 2345 let mut deleted = 0u32; 2346 2347 // List all our session records 2348 let collection = 2349 Nsid::new("sh.weaver.collab.session").map_err(WeaverError::AtprotoString)?; 2350 let request = ListRecords::new() 2351 .repo(did.clone()) 2352 .collection(collection) 2353 .limit(100) 2354 .build(); 2355 2356 let response = self.send(request).await.map_err(AgentError::from)?; 2357 let output = response.into_output().map_err(|e| { 2358 AgentError::from(ClientError::invalid_request(format!( 2359 "Failed to list sessions: {}", 2360 e 2361 ))) 2362 })?; 2363 2364 for record in output.records { 2365 if let Ok(session) = jacquard::from_data::<Session>(&record.value) { 2366 // Check if expired 2367 if let Some(ref expires_at) = session.expires_at { 2368 let expires_str = expires_at.as_str(); 2369 if let Ok(expires) = chrono::DateTime::parse_from_rfc3339(expires_str) { 2370 if expires.with_timezone(&chrono::Utc) < now { 2371 // Delete expired session 2372 if let Some(rkey) = record.uri.rkey() { 2373 if let Err(e) = 2374 self.delete_record::<Session>(rkey.clone()).await 2375 { 2376 tracing::warn!("Failed to delete expired session: {}", e); 2377 } else { 2378 deleted += 1; 2379 } 2380 } 2381 } 2382 } 2383 } 2384 } 2385 } 2386 2387 if deleted > 0 { 2388 tracing::info!("Cleaned up {} expired session records", deleted); 2389 } 2390 2391 Ok(deleted) 2392 } 2393 } 2394 2395 /// Find active collaboration sessions for a resource. 2396 /// 2397 /// Queries Constellation for session records referencing the given resource, 2398 /// then fetches each to extract peer connection info. 2399 /// 2400 /// Returns peers with unexpired sessions (or no expiry set). 2401 fn find_session_peers<'a>( 2402 &'a self, 2403 resource_uri: &'a AtUri<'a>, 2404 ) -> impl Future<Output = Result<Vec<SessionPeer<'static>>, WeaverError>> + 'a 2405 where 2406 Self: Sized, 2407 { 2408 async move { 2409 use jacquard::types::string::Datetime; 2410 use weaver_api::sh_weaver::collab::session::Session; 2411 2412 const SESSION_NSID: &str = "sh.weaver.collab.session"; 2413 2414 // Get authorized collaborators (owner is checked separately via URI authority) 2415 let collaborators: std::collections::HashSet<Did<'static>> = self 2416 .find_collaborators_for_resource(resource_uri) 2417 .await 2418 .unwrap_or_default() 2419 .into_iter() 2420 .collect(); 2421 2422 let constellation_url = Url::parse(CONSTELLATION_URL).map_err(|e| { 2423 AgentError::from(ClientError::invalid_request(format!( 2424 "Invalid constellation URL: {}", 2425 e 2426 ))) 2427 })?; 2428 2429 // Query for session records referencing this resource 2430 let query = GetBacklinksQuery { 2431 subject: Uri::At(resource_uri.clone().into_static()), 2432 source: format!("{}:resource.uri", SESSION_NSID).into(), 2433 cursor: None, 2434 did: vec![], 2435 limit: 100, 2436 }; 2437 2438 let response = self 2439 .xrpc(constellation_url) 2440 .send(&query) 2441 .await 2442 .map_err(|e| { 2443 AgentError::from(ClientError::invalid_request(format!( 2444 "Constellation query failed: {}", 2445 e 2446 ))) 2447 })?; 2448 2449 let output = response.into_output().map_err(|e| { 2450 AgentError::from(ClientError::invalid_request(format!( 2451 "Failed to parse constellation response: {}", 2452 e 2453 ))) 2454 })?; 2455 2456 let mut peers = Vec::new(); 2457 let now = Datetime::now(); 2458 2459 for record_id in output.records { 2460 let session_uri_str = format!( 2461 "at://{}/{}/{}", 2462 record_id.did, 2463 SESSION_NSID, 2464 record_id.rkey.0.as_ref() 2465 ); 2466 let Ok(session_uri) = AtUri::new(&session_uri_str) else { 2467 continue; 2468 }; 2469 2470 // Fetch the session record 2471 let Ok(session_resp) = self.get_record::<Session>(&session_uri).await else { 2472 continue; 2473 }; 2474 let Ok(session_record) = session_resp.into_output() else { 2475 continue; 2476 }; 2477 2478 // Check if session has expired (Datetime implements Ord) 2479 if let Some(ref expires_at) = session_record.value.expires_at { 2480 if *expires_at < now { 2481 continue; // Session expired 2482 } 2483 } 2484 2485 // Check if peer is authorized (has valid invite+accept pair) 2486 let peer_did = record_id.did.clone().into_static(); 2487 if !collaborators.contains(&peer_did) { 2488 tracing::debug!( 2489 peer = %peer_did, 2490 "Filtering out unauthorized session peer" 2491 ); 2492 continue; 2493 } 2494 2495 peers.push(SessionPeer { 2496 did: record_id.did.into_static(), 2497 node_id: session_record.value.node_id.as_ref().into(), 2498 relay_url: session_record.value.relay_url.map(|u| u.as_ref().into()), 2499 created_at: session_record.value.created_at, 2500 expires_at: session_record.value.expires_at, 2501 }); 2502 } 2503 2504 Ok(peers) 2505 } 2506 } 2507 2508 /// Find contributors (authors) for a resource based on evidence. 2509 /// 2510 /// Contributors are DIDs who have actually contributed to this resource: 2511 /// 1. Edit records (edit.root or edit.diff) referencing this resource 2512 /// 2. Published versions of the record in their repo (same rkey) 2513 /// 2514 /// This is separate from permissions - you can have edit permission without 2515 /// having contributed yet. 2516 fn find_contributors_for_resource( 2517 &self, 2518 resource_uri: &AtUri<'_>, 2519 ) -> impl Future<Output = Result<Vec<Did<'static>>, WeaverError>> 2520 where 2521 Self: Sized, 2522 { 2523 async move { 2524 const EDIT_ROOT_NSID: &str = "sh.weaver.edit.root"; 2525 2526 let constellation_url = Url::parse(CONSTELLATION_URL).map_err(|e| { 2527 AgentError::from(ClientError::invalid_request(format!( 2528 "Invalid constellation URL: {}", 2529 e 2530 ))) 2531 })?; 2532 2533 let mut contributors = std::collections::HashSet::new(); 2534 2535 // 1. Resource authority is always a contributor 2536 let authority_did = match resource_uri.authority() { 2537 jacquard::types::ident::AtIdentifier::Did(did) => did.clone().into_static(), 2538 jacquard::types::ident::AtIdentifier::Handle(handle) => { 2539 let (did, _) = self.pds_for_handle(handle).await.map_err(|e| { 2540 AgentError::from( 2541 ClientError::from(e).with_context("Failed to resolve handle"), 2542 ) 2543 })?; 2544 did.into_static() 2545 } 2546 }; 2547 contributors.insert(authority_did); 2548 2549 // 2. Find DIDs with edit records for this resource 2550 let edit_query = GetBacklinksQuery { 2551 subject: Uri::At(resource_uri.clone().into_static()), 2552 source: format!("{}:doc.value.entry.uri", EDIT_ROOT_NSID).into(), 2553 cursor: None, 2554 did: vec![], 2555 limit: 100, 2556 }; 2557 2558 if let Ok(response) = self.xrpc(constellation_url.clone()).send(&edit_query).await { 2559 if let Ok(edit_output) = response.into_output() { 2560 for record_id in edit_output.records { 2561 contributors.insert(record_id.did.into_static()); 2562 } 2563 } 2564 } 2565 2566 // 3. Find collaborators who have published versions (same rkey) 2567 let collaborators = self.find_collaborators_for_resource(resource_uri).await?; 2568 let rkey = resource_uri.rkey(); 2569 let collection = resource_uri.collection(); 2570 2571 if let (Some(rkey), Some(collection)) = (rkey, collection) { 2572 for collab_did in collaborators { 2573 // Try to fetch their version of the record via slingshot 2574 let collab_uri_str = format!( 2575 "at://{}/{}/{}", 2576 collab_did.as_ref(), 2577 collection, 2578 rkey.as_ref() 2579 ); 2580 if let Ok(collab_uri) = AtUri::new(&collab_uri_str) { 2581 // Check if record actually exists 2582 if self.fetch_record_slingshot(&collab_uri).await.is_ok() { 2583 contributors.insert(collab_did); 2584 } 2585 } 2586 } 2587 } 2588 2589 Ok(contributors.into_iter().collect()) 2590 } 2591 } 2592 2593 /// Fetch a blob from any PDS by DID and CID. 2594 /// 2595 /// Resolves the DID to find its PDS, then fetches the blob. 2596 fn fetch_blob<'a>( 2597 &'a self, 2598 did: &'a Did<'_>, 2599 cid: &'a jacquard::types::string::Cid<'_>, 2600 ) -> impl Future<Output = Result<Bytes, WeaverError>> + 'a { 2601 async move { 2602 use weaver_api::com_atproto::sync::get_blob::GetBlob; 2603 2604 let pds_url = self.pds_for_did(did).await.map_err(|e| { 2605 AgentError::from(ClientError::from(e).with_context("Failed to resolve PDS for DID")) 2606 })?; 2607 2608 let request = GetBlob::new().did(did.clone()).cid(cid.clone()).build(); 2609 2610 let response = self 2611 .xrpc(pds_url) 2612 .send(&request) 2613 .await 2614 .map_err(|e| AgentError::from(ClientError::from(e)))?; 2615 2616 let output = response.into_output().map_err(|e| AgentError::xrpc(e))?; 2617 2618 Ok(output.body) 2619 } 2620 } 2621} 2622 2623/// A version of a record from a collaborator's repository. 2624#[derive(Debug, Clone)] 2625pub struct CollaboratorVersion<'a> { 2626 /// The DID of the collaborator who owns this version. 2627 pub did: Did<'a>, 2628 /// The full URI of this version. 2629 pub uri: AtUri<'a>, 2630 /// CID of this version. 2631 pub cid: jacquard::types::string::Cid<'a>, 2632 /// When this version was last updated. 2633 pub updated_at: Option<jacquard::types::string::Datetime>, 2634 /// The raw record value. 2635 pub value: jacquard::Data<'a>, 2636} 2637 2638/// Information about a peer discovered from session records. 2639#[derive(Debug, Clone)] 2640pub struct SessionPeer<'a> { 2641 /// The peer's DID. 2642 pub did: Did<'a>, 2643 /// The peer's iroh NodeId (z-base32 encoded). 2644 pub node_id: SmolStr, 2645 /// Optional relay URL for browser clients. 2646 pub relay_url: Option<SmolStr>, 2647 /// When the session was created. 2648 pub created_at: jacquard::types::string::Datetime, 2649 /// When the session expires (if set). 2650 pub expires_at: Option<jacquard::types::string::Datetime>, 2651} 2652 2653impl<T: AgentSession + IdentityResolver + XrpcExt> WeaverExt for T {}