atproto blogging
1use jacquard::types::ident::AtIdentifier;
2// Re-export view types for use elsewhere
3pub use weaver_api::sh_weaver::notebook::{
4 AuthorListView, BookEntryRef, BookEntryView, EntryView, NotebookView, PermissionGrant,
5 PermissionsState,
6};
7
8// Re-export jacquard for convenience
9use crate::constellation::{GetBacklinksQuery, RecordId};
10use crate::error::WeaverError;
11#[allow(unused_imports)]
12use crate::{PublishResult, W_TICKER, normalize_title_path};
13pub use jacquard;
14use jacquard::bytes::Bytes;
15#[allow(unused_imports)]
16use jacquard::client::{AgentError, AgentErrorKind, AgentSession, AgentSessionExt};
17use jacquard::error::ClientError;
18use jacquard::prelude::*;
19use jacquard::smol_str::SmolStr;
20use jacquard::types::blob::{BlobRef, MimeType};
21use jacquard::types::string::{AtUri, Datetime, Did, RecordKey, Rkey};
22#[allow(unused_imports)]
23use jacquard::types::tid::Tid;
24use jacquard::types::uri::Uri;
25use jacquard::url::Url;
26use jacquard::{CowStr, IntoStatic};
27use mime_sniffer::MimeTypeSniffer;
28#[allow(unused_imports)]
29use std::path::Path;
30use weaver_api::com_atproto::repo::strong_ref::StrongRef;
31use weaver_api::sh_weaver::notebook::entry;
32use weaver_api::sh_weaver::publish::blob::Blob as PublishedBlob;
33
34const CONSTELLATION_URL: &str = "https://constellation.microcosm.blue";
35
36/// Strip trailing punctuation that URL parsers commonly eat
37/// (period, comma, semicolon, colon, exclamation, question mark)
38fn strip_trailing_punctuation(s: &str) -> &str {
39 s.trim_end_matches(['.', ',', ';', ':', '!', '?'])
40}
41
42/// Check if a search term matches a value, with fallback to stripped punctuation
43pub fn title_matches(value: &str, search: &str) -> bool {
44 // Exact match first
45 if value == search {
46 return true;
47 }
48 // Try with trailing punctuation stripped from search term
49 let stripped_search = strip_trailing_punctuation(search);
50 if stripped_search != search && value == stripped_search {
51 return true;
52 }
53 // Try with trailing punctuation stripped from value (for titles ending in punctuation)
54 let stripped_value = strip_trailing_punctuation(value);
55 if stripped_value != value && stripped_value == search {
56 return true;
57 }
58 false
59}
60
61/// Extension trait providing weaver-specific multi-step operations on Agent
62///
63/// This trait extends jacquard's Agent with notebook-specific workflows that
64/// involve multiple atproto operations (uploading blobs, creating records, etc.)
65///
66/// For single-step operations, use jacquard's built-in methods directly:
67/// - `agent.create_record()` - Create a single record
68/// - `agent.get_record()` - Get a single record
69/// - `agent.upload_blob()` - Upload a single blob
70///
71/// This trait is for multi-step workflows that coordinate between multiple operations.
72pub trait WeaverExt: AgentSessionExt + XrpcExt + Send + Sync + Sized {
73 /// Publish a blob to the user's PDS
74 ///
75 /// Multi-step workflow:
76 /// 1. Upload blob to PDS
77 /// 2. Create blob record with CID
78 ///
79 /// Returns the AT-URI of the published blob
80 fn publish_blob<'a>(
81 &'a self,
82 blob: Bytes,
83 url_path: &'a str,
84 rkey: Option<RecordKey<Rkey<'a>>>,
85 ) -> impl Future<Output = Result<(StrongRef<'a>, PublishedBlob<'a>), WeaverError>> + 'a {
86 async move {
87 let mime_type =
88 MimeType::new_owned(blob.sniff_mime_type().unwrap_or("application/octet-stream"));
89
90 let blob = self.upload_blob(blob, mime_type.into_static()).await?;
91 let publish_record = PublishedBlob::new()
92 .path(url_path)
93 .upload(BlobRef::Blob(blob))
94 .build();
95 let record_key = match rkey {
96 Some(key) => key,
97 None => {
98 let tid = W_TICKER.lock().await.next(None);
99 RecordKey(Rkey::new_owned(tid.as_str())?)
100 }
101 };
102 let record = self
103 .create_record(publish_record.clone(), Some(record_key))
104 .await?;
105 let strong_ref = StrongRef::new().uri(record.uri).cid(record.cid).build();
106
107 Ok((strong_ref, publish_record))
108 }
109 }
110
111 fn confirm_record_ref<'a>(
112 &'a self,
113 uri: &'a AtUri<'a>,
114 ) -> impl Future<Output = Result<StrongRef<'static>, WeaverError>> + 'a {
115 async move {
116 let record = self.fetch_record_slingshot(uri).await?;
117 let cid = record.cid.ok_or_else(|| {
118 AgentError::from(ClientError::invalid_request("Record missing CID"))
119 })?;
120 Ok(StrongRef::new()
121 .uri(record.uri.into_static())
122 .cid(cid.into_static())
123 .build())
124 }
125 }
126
127 /// Fetch a notebook by URI and return its entry list
128 ///
129 /// Returns Ok(Some((uri, entry_list))) if the notebook exists and can be parsed,
130 /// Ok(None) if the notebook doesn't exist,
131 /// Err if there's a network or parsing error.
132 fn get_notebook_by_uri(
133 &self,
134 uri: &str,
135 ) -> impl Future<Output = Result<Option<(AtUri<'static>, Vec<StrongRef<'static>>)>, WeaverError>>
136 where
137 Self: Sized,
138 {
139 async move {
140 use weaver_api::sh_weaver::notebook::book::Book;
141
142 let at_uri = AtUri::new(uri).map_err(|e| {
143 WeaverError::InvalidNotebook(format!("Invalid notebook URI: {}", e))
144 })?;
145
146 let response = match self.get_record::<Book>(&at_uri).await {
147 Ok(r) => r,
148 Err(_) => return Ok(None), // Notebook doesn't exist
149 };
150
151 let output = match response.into_output() {
152 Ok(o) => o,
153 Err(_) => return Ok(None), // Failed to parse
154 };
155
156 let entries = output
157 .value
158 .entry_list
159 .iter()
160 .cloned()
161 .map(IntoStatic::into_static)
162 .collect();
163
164 Ok(Some((at_uri.into_static(), entries)))
165 }
166 }
167
168 /// Find or create a notebook by title, returning its URI and entry list
169 ///
170 /// If the notebook doesn't exist, creates it with the given DID as author.
171 fn upsert_notebook(
172 &self,
173 title: &str,
174 author_did: &Did<'_>,
175 ) -> impl Future<Output = Result<(AtUri<'static>, Vec<StrongRef<'static>>), WeaverError>>
176 where
177 Self: Sized,
178 {
179 async move {
180 use jacquard::types::collection::Collection;
181 use jacquard::types::nsid::Nsid;
182 use weaver_api::com_atproto::repo::list_records::ListRecords;
183 use weaver_api::sh_weaver::notebook::book::Book;
184
185 // Find the PDS for this DID
186 let pds_url = self.pds_for_did(author_did).await.map_err(|e| {
187 AgentError::from(ClientError::from(e).with_context("Failed to resolve PDS for DID"))
188 })?;
189
190 // Search for existing notebook with this title (paginated)
191 let mut cursor: Option<CowStr<'static>> = None;
192 loop {
193 let resp = self
194 .xrpc(pds_url.clone())
195 .send(
196 &ListRecords::new()
197 .repo(author_did.clone())
198 .collection(Nsid::raw(Book::NSID))
199 .limit(100)
200 .maybe_cursor(cursor.clone())
201 .build(),
202 )
203 .await
204 .map_err(|e| AgentError::from(ClientError::from(e)))?;
205
206 let list = match resp.parse() {
207 Ok(l) => l,
208 Err(_) => break, // Parse error, stop searching
209 };
210
211 for record in list.records {
212 let notebook: Book = jacquard::from_data(&record.value).map_err(|_| {
213 AgentError::from(ClientError::invalid_request(
214 "Failed to parse notebook record",
215 ))
216 })?;
217 if let Some(book_title) = notebook.title
218 && book_title == title
219 {
220 let entries = notebook
221 .entry_list
222 .iter()
223 .cloned()
224 .map(IntoStatic::into_static)
225 .collect();
226 return Ok((record.uri.into_static(), entries));
227 }
228 }
229
230 match list.cursor {
231 Some(c) => cursor = Some(c.into_static()),
232 None => break, // No more pages
233 }
234 }
235
236 // Notebook doesn't exist, create it
237 use weaver_api::sh_weaver::actor::Author;
238 let path = normalize_title_path(title);
239 let author = Author::new().did(author_did.clone()).build();
240 let book = Book::new()
241 .authors(vec![author])
242 .entry_list(vec![])
243 .maybe_title(Some(title.into()))
244 .maybe_path(Some(path.into()))
245 .maybe_created_at(Some(jacquard::types::string::Datetime::now()))
246 .build();
247
248 let response = self.create_record(book, None).await?;
249 Ok((response.uri, Vec::new()))
250 }
251 }
252
253 /// Find or create an entry within a notebook (with pre-fetched notebook data)
254 ///
255 /// This variant accepts notebook URI and entry_refs directly to avoid redundant
256 /// notebook lookups when the caller has already fetched this data.
257 ///
258 /// Returns (entry_ref, notebook_uri, was_created)
259 fn upsert_entry_with_notebook(
260 &self,
261 notebook_uri: AtUri<'static>,
262 entry_refs: Vec<StrongRef<'static>>,
263 entry_title: &str,
264 entry: entry::Entry<'_>,
265 existing_rkey: Option<&str>,
266 ) -> impl Future<Output = Result<(StrongRef<'static>, AtUri<'static>, bool), WeaverError>>
267 where
268 Self: Sized,
269 {
270 async move {
271 // If we have an existing rkey, try to find and update that specific entry
272 if let Some(rkey) = existing_rkey {
273 // Check if this entry exists in the notebook by comparing rkeys
274 for entry_ref in &entry_refs {
275 let ref_rkey = entry_ref.uri.rkey().map(|r| r.0.as_str());
276 if ref_rkey == Some(rkey) {
277 // Found it - update
278 let output = self
279 .update_record::<entry::Entry>(&entry_ref.uri, |e| {
280 e.content = entry.content.clone();
281 e.title = entry.title.clone();
282 e.path = entry.path.clone();
283 e.embeds = entry.embeds.clone();
284 e.tags = entry.tags.clone();
285 })
286 .await?;
287 let updated_ref = StrongRef::new()
288 .uri(output.uri.into_static())
289 .cid(output.cid.into_static())
290 .build();
291 return Ok((updated_ref, notebook_uri, false));
292 }
293 }
294
295 // Entry with this rkey not in notebook - create with specific rkey
296 let response = self
297 .create_record(entry, Some(RecordKey::any(rkey)?))
298 .await?;
299 let new_ref = StrongRef::new()
300 .uri(response.uri.clone().into_static())
301 .cid(response.cid.clone().into_static())
302 .build();
303
304 use weaver_api::sh_weaver::notebook::book::Book;
305 let notebook_entry_ref = StrongRef::new()
306 .uri(response.uri.into_static())
307 .cid(response.cid.into_static())
308 .build();
309
310 self.update_record::<Book>(¬ebook_uri, |book| {
311 book.entry_list.push(notebook_entry_ref);
312 })
313 .await?;
314
315 return Ok((new_ref, notebook_uri, true));
316 }
317
318 // No existing rkey - use title-based matching
319
320 // Fast path: if notebook is empty, skip search and create directly
321 if entry_refs.is_empty() {
322 let response = self.create_record(entry, None).await?;
323 let new_ref = StrongRef::new()
324 .uri(response.uri.clone().into_static())
325 .cid(response.cid.clone().into_static())
326 .build();
327
328 use weaver_api::sh_weaver::notebook::book::Book;
329 let notebook_entry_ref = StrongRef::new()
330 .uri(response.uri.into_static())
331 .cid(response.cid.into_static())
332 .build();
333
334 self.update_record::<Book>(¬ebook_uri, |book| {
335 book.entry_list.push(notebook_entry_ref);
336 })
337 .await?;
338
339 return Ok((new_ref, notebook_uri, true));
340 }
341
342 // Check if entry with this title exists in the notebook
343 // O(n) network calls - unavoidable without title indexing
344 for entry_ref in &entry_refs {
345 let existing = self
346 .get_record::<entry::Entry>(&entry_ref.uri)
347 .await
348 .map_err(|e| AgentError::from(ClientError::from(e)))?;
349 if let Ok(existing_entry) = existing.parse() {
350 if existing_entry.value.title == entry_title {
351 // Update existing entry
352 let output = self
353 .update_record::<entry::Entry>(&entry_ref.uri, |e| {
354 e.content = entry.content.clone();
355 e.embeds = entry.embeds.clone();
356 e.tags = entry.tags.clone();
357 e.updated_at = Some(Datetime::now());
358 })
359 .await?;
360 let updated_ref = StrongRef::new()
361 .uri(output.uri.into_static())
362 .cid(output.cid.into_static())
363 .build();
364 return Ok((updated_ref, notebook_uri, false));
365 }
366 }
367 }
368
369 // Entry doesn't exist, create it
370 let response = self.create_record(entry, None).await?;
371 let new_ref = StrongRef::new()
372 .uri(response.uri.clone().into_static())
373 .cid(response.cid.clone().into_static())
374 .build();
375
376 // Add to notebook's entry_list
377 use weaver_api::sh_weaver::notebook::book::Book;
378 let notebook_entry_ref = StrongRef::new()
379 .uri(response.uri.into_static())
380 .cid(response.cid.into_static())
381 .build();
382
383 self.update_record::<Book>(¬ebook_uri, |book| {
384 book.entry_list.push(notebook_entry_ref);
385 })
386 .await?;
387
388 Ok((new_ref, notebook_uri, true))
389 }
390 }
391
392 /// Find or create an entry within a notebook
393 ///
394 /// Multi-step workflow:
395 /// 1. Find the notebook by title
396 /// 2. If existing_rkey is provided, match by rkey; otherwise match by title
397 /// 3. If found: update the entry with new content
398 /// 4. If not found: create new entry and append to notebook's entry_list
399 ///
400 /// The `existing_rkey` parameter allows updating an entry even if its title changed,
401 /// and enables pre-generating rkeys for path rewriting before publish.
402 ///
403 /// Returns (entry_ref, notebook_uri, was_created)
404 fn upsert_entry(
405 &self,
406 notebook_title: &str,
407 entry_title: &str,
408 entry: entry::Entry<'_>,
409 existing_rkey: Option<&str>,
410 ) -> impl Future<Output = Result<(StrongRef<'static>, AtUri<'static>, bool), WeaverError>>
411 where
412 Self: Sized,
413 {
414 async move {
415 // Get our own DID
416 let (did, _) = self.session_info().await.ok_or_else(|| {
417 AgentError::from(ClientError::invalid_request("No session info available"))
418 })?;
419
420 // Find or create notebook
421 let (notebook_uri, entry_refs) = self.upsert_notebook(notebook_title, &did).await?;
422
423 // Delegate to the variant with pre-fetched notebook data
424 self.upsert_entry_with_notebook(
425 notebook_uri,
426 entry_refs,
427 entry_title,
428 entry,
429 existing_rkey,
430 )
431 .await
432 }
433 }
434
435 /// View functions - generic versions that work with any Agent
436
437 /// Fetch a notebook and construct NotebookView with author profiles
438 #[cfg(feature = "use-index")]
439 fn view_notebook(
440 &self,
441 uri: &AtUri<'_>,
442 ) -> impl Future<Output = Result<(NotebookView<'static>, Vec<StrongRef<'static>>), WeaverError>>
443 where
444 Self: Sized,
445 {
446 async move {
447 use weaver_api::sh_weaver::notebook::get_notebook::GetNotebook;
448
449 let resp = self
450 .send(GetNotebook::new().notebook(uri.clone()).build())
451 .await
452 .map_err(|e| AgentError::from(ClientError::from(e)))?;
453
454 let output = resp.into_output().map_err(|e| {
455 AgentError::from(ClientError::invalid_request(format!(
456 "Failed to get notebook: {}",
457 e
458 )))
459 })?;
460
461 Ok((
462 output.notebook.into_static(),
463 output
464 .entries
465 .into_iter()
466 .map(IntoStatic::into_static)
467 .collect(),
468 ))
469 }
470 }
471
472 #[cfg(not(feature = "use-index"))]
473 fn view_notebook(
474 &self,
475 uri: &AtUri<'_>,
476 ) -> impl Future<Output = Result<(NotebookView<'static>, Vec<StrongRef<'static>>), WeaverError>>
477 where
478 Self: Sized,
479 {
480 async move {
481 use jacquard::to_data;
482 use weaver_api::sh_weaver::notebook::AuthorListView;
483 use weaver_api::sh_weaver::notebook::book::Book;
484
485 let notebook = self
486 .get_record::<Book>(uri)
487 .await
488 .map_err(|e| AgentError::from(e))?
489 .into_output()
490 .map_err(|_| {
491 AgentError::from(ClientError::invalid_request("Failed to parse Book record"))
492 })?;
493
494 let title = notebook.value.title.clone();
495 let tags = notebook.value.tags.clone();
496 let path = notebook.value.path.clone();
497
498 let mut authors = Vec::new();
499 use weaver_api::app_bsky::actor::{
500 ProfileViewDetailed, get_profile::GetProfile, profile::Profile as BskyProfile,
501 };
502 use weaver_api::sh_weaver::actor::{
503 ProfileDataView, ProfileDataViewInner, ProfileView,
504 profile::Profile as WeaverProfile,
505 };
506
507 for (index, author) in notebook.value.authors.iter().enumerate() {
508 let ident = AtIdentifier::Did(author.did.clone());
509 let (profile_uri, profile_view) = self.hydrate_profile_view(&ident).await?;
510 authors.push(
511 AuthorListView::new()
512 .maybe_uri(profile_uri)
513 .record(profile_view)
514 .index(index as i64)
515 .build(),
516 );
517 }
518 let entries = notebook
519 .value
520 .entry_list
521 .iter()
522 .cloned()
523 .map(IntoStatic::into_static)
524 .collect();
525
526 // Fetch permissions for this notebook
527 let permissions = self.get_permissions_for_resource(uri).await?;
528
529 Ok((
530 NotebookView::new()
531 .cid(notebook.cid.ok_or_else(|| {
532 AgentError::from(ClientError::invalid_request("Notebook missing CID"))
533 })?)
534 .uri(notebook.uri)
535 .indexed_at(jacquard::types::string::Datetime::now())
536 .maybe_title(title)
537 .maybe_path(path)
538 .maybe_tags(tags)
539 .authors(authors)
540 .permissions(permissions)
541 .record(to_data(¬ebook.value).map_err(|_| {
542 AgentError::from(ClientError::invalid_request(
543 "Failed to serialize notebook",
544 ))
545 })?)
546 .build(),
547 entries,
548 ))
549 }
550 }
551
552 /// Fetch an entry and construct EntryView
553 #[cfg(feature = "use-index")]
554 fn fetch_entry_view<'a>(
555 &self,
556 _notebook: &NotebookView<'a>,
557 entry_ref: &StrongRef<'_>,
558 ) -> impl Future<Output = Result<EntryView<'a>, WeaverError>>
559 where
560 Self: Sized,
561 {
562 async move {
563 use weaver_api::sh_weaver::notebook::get_entry::GetEntry;
564
565 let resp = self
566 .send(GetEntry::new().uri(entry_ref.uri.clone()).build())
567 .await
568 .map_err(|e| AgentError::from(ClientError::from(e)))?;
569
570 let output = resp.into_output().map_err(|e| {
571 AgentError::from(ClientError::invalid_request(format!(
572 "Failed to get entry: {}",
573 e
574 )))
575 })?;
576
577 Ok(output.value.into_static())
578 }
579 }
580
581 /// Fetch an entry and construct EntryView
582 #[cfg(not(feature = "use-index"))]
583 fn fetch_entry_view<'a>(
584 &self,
585 notebook: &NotebookView<'a>,
586 entry_ref: &StrongRef<'_>,
587 ) -> impl Future<Output = Result<EntryView<'a>, WeaverError>>
588 where
589 Self: Sized,
590 {
591 async move {
592 use jacquard::to_data;
593 use weaver_api::sh_weaver::notebook::entry::Entry;
594
595 let entry_uri = Entry::uri(entry_ref.uri.clone())
596 .map_err(|_| AgentError::from(ClientError::invalid_request("Invalid entry URI")))?;
597
598 // Get the rkey for version lookup
599 let rkey = entry_uri.rkey().ok_or_else(|| {
600 AgentError::from(ClientError::invalid_request("Entry URI missing rkey"))
601 })?;
602
603 // Fetch permissions for this entry (includes inherited notebook permissions)
604 let permissions = self.get_permissions_for_resource(&entry_uri).await?;
605
606 // Get all collaborators (owner + invited)
607 let owner_did = match entry_uri.authority() {
608 jacquard::types::ident::AtIdentifier::Did(d) => d.clone().into_static(),
609 jacquard::types::ident::AtIdentifier::Handle(h) => {
610 let (did, _) = self.pds_for_handle(h).await.map_err(|e| {
611 AgentError::from(
612 ClientError::from(e).with_context("Failed to resolve handle"),
613 )
614 })?;
615 did.into_static()
616 }
617 };
618 let collaborators = self
619 .find_collaborators_for_resource(&entry_uri)
620 .await
621 .unwrap_or_default();
622 let all_dids: Vec<Did<'static>> = std::iter::once(owner_did)
623 .chain(collaborators.into_iter())
624 .collect();
625
626 // Find all versions across collaborators, get latest by updatedAt
627 let versions = self
628 .find_all_versions(
629 <Entry as jacquard::types::collection::Collection>::NSID,
630 rkey.0.as_str(),
631 &all_dids,
632 )
633 .await
634 .unwrap_or_default();
635
636 // Use latest version if found, otherwise fall back to original entry_ref
637 let (entry_data, final_uri, final_cid) = if let Some(latest) = versions.first() {
638 // Deserialize from the latest version's value
639 let entry: Entry = jacquard::from_data(&latest.value).map_err(|_| {
640 AgentError::from(ClientError::invalid_request(
641 "Failed to deserialize latest entry",
642 ))
643 })?;
644 (entry.into_static(), latest.uri.clone(), latest.cid.clone())
645 } else {
646 // No versions found via find_all_versions, fetch directly
647 let entry = self.fetch_record(&entry_uri).await?;
648 let cid = entry.cid.ok_or_else(|| {
649 AgentError::from(ClientError::invalid_request("Entry missing CID"))
650 })?;
651 (
652 entry.value.into_static(),
653 entry.uri.into_static(),
654 cid.into_static(),
655 )
656 };
657
658 let title = entry_data.title.clone();
659 let path = entry_data.path.clone();
660 let tags = entry_data.tags.clone();
661
662 // Fetch contributors (evidence-based authors) for this entry
663 let contributor_dids = self.find_contributors_for_resource(&entry_uri).await?;
664 let mut authors = Vec::new();
665 for (index, did) in contributor_dids.iter().enumerate() {
666 let ident = AtIdentifier::Did(did.clone());
667 let (profile_uri, profile_view) = self.hydrate_profile_view(&ident).await?;
668 authors.push(
669 AuthorListView::new()
670 .maybe_uri(profile_uri)
671 .record(profile_view)
672 .index(index as i64)
673 .build(),
674 );
675 }
676
677 Ok(EntryView::new()
678 .cid(final_cid)
679 .uri(final_uri)
680 .indexed_at(jacquard::types::string::Datetime::now())
681 .record(to_data(&entry_data).map_err(|_| {
682 AgentError::from(ClientError::invalid_request("Failed to serialize entry"))
683 })?)
684 .maybe_tags(tags)
685 .title(title)
686 .path(path)
687 .authors(authors)
688 .permissions(permissions)
689 .build())
690 }
691 }
692
693 /// Search for an entry by title within a notebook's entry list
694 ///
695 /// O(n) network calls - unavoidable without title indexing.
696 /// Breaks early on match to minimize unnecessary fetches.
697 fn entry_by_title<'a>(
698 &self,
699 notebook: &NotebookView<'a>,
700 entries: &[StrongRef<'_>],
701 title: &str,
702 ) -> impl Future<Output = Result<Option<(BookEntryView<'a>, entry::Entry<'a>)>, WeaverError>>
703 where
704 Self: Sized,
705 {
706 async move {
707 use weaver_api::sh_weaver::notebook::BookEntryRef;
708 use weaver_api::sh_weaver::notebook::entry::Entry;
709
710 for (index, entry_ref) in entries.iter().enumerate() {
711 let resp = self
712 .get_record::<Entry>(&entry_ref.uri)
713 .await
714 .map_err(|e| AgentError::from(e))?;
715 if let Ok(entry) = resp.parse() {
716 let path_matches = title_matches(entry.value.path.as_ref(), title);
717 let title_field_matches = title_matches(entry.value.title.as_ref(), title);
718 if path_matches || title_field_matches {
719 // Build BookEntryView with prev/next
720 let entry_view = self.fetch_entry_view(notebook, entry_ref).await?;
721
722 let prev_entry = if index > 0 {
723 let prev_entry_ref = &entries[index - 1];
724 self.fetch_entry_view(notebook, prev_entry_ref).await.ok()
725 } else {
726 None
727 }
728 .map(|e| BookEntryRef::new().entry(e).build());
729
730 let next_entry = if index < entries.len() - 1 {
731 let next_entry_ref = &entries[index + 1];
732 self.fetch_entry_view(notebook, next_entry_ref).await.ok()
733 } else {
734 None
735 }
736 .map(|e| BookEntryRef::new().entry(e).build());
737
738 let book_entry_view = BookEntryView::new()
739 .entry(entry_view)
740 .maybe_next(next_entry)
741 .maybe_prev(prev_entry)
742 .index(index as i64)
743 .build();
744
745 return Ok(Some((book_entry_view, entry.value.into_static())));
746 }
747 }
748 }
749 Ok(None)
750 }
751 }
752
753 /// Search for a notebook by title for a given DID or handle
754 #[cfg(feature = "use-index")]
755 fn notebook_by_title(
756 &self,
757 ident: &jacquard::types::ident::AtIdentifier<'_>,
758 title: &str,
759 ) -> impl Future<
760 Output = Result<Option<(NotebookView<'static>, Vec<BookEntryView<'static>>)>, WeaverError>,
761 >
762 where
763 Self: Sized,
764 {
765 async move {
766 use weaver_api::sh_weaver::notebook::resolve_notebook::ResolveNotebook;
767
768 let resp = self
769 .send(
770 ResolveNotebook::new()
771 .actor(ident.clone())
772 .name(title)
773 .build(),
774 )
775 .await
776 .map_err(|e| AgentError::from(ClientError::from(e)))?;
777
778 match resp.into_output() {
779 Ok(output) => Ok(Some((
780 output.notebook.into_static(),
781 output.entries.into_static(),
782 ))),
783 Err(_) => Ok(None),
784 }
785 }
786 }
787
788 /// Search for a notebook by title for a given DID or handle
789 #[cfg(not(feature = "use-index"))]
790 fn notebook_by_title(
791 &self,
792 ident: &jacquard::types::ident::AtIdentifier<'_>,
793 title: &str,
794 ) -> impl Future<
795 Output = Result<Option<(NotebookView<'static>, Vec<BookEntryView<'static>>)>, WeaverError>,
796 >
797 where
798 Self: Sized,
799 {
800 async move {
801 use jacquard::types::collection::Collection;
802 use jacquard::types::nsid::Nsid;
803 use weaver_api::com_atproto::repo::list_records::ListRecords;
804 use weaver_api::sh_weaver::notebook::AuthorListView;
805 use weaver_api::sh_weaver::notebook::book::Book;
806
807 let (repo_did, pds_url) = match ident {
808 jacquard::types::ident::AtIdentifier::Did(did) => {
809 let pds = self.pds_for_did(did).await.map_err(|e| {
810 AgentError::from(
811 ClientError::from(e).with_context("Failed to resolve PDS for DID"),
812 )
813 })?;
814 (did.clone(), pds)
815 }
816 jacquard::types::ident::AtIdentifier::Handle(handle) => {
817 self.pds_for_handle(handle).await.map_err(|e| {
818 AgentError::from(
819 ClientError::from(e).with_context("Failed to resolve handle"),
820 )
821 })?
822 }
823 };
824
825 // Search with pagination
826 let mut cursor: Option<CowStr<'static>> = None;
827 loop {
828 let resp = self
829 .xrpc(pds_url.clone())
830 .send(
831 &ListRecords::new()
832 .repo(repo_did.clone())
833 .collection(Nsid::raw(Book::NSID))
834 .limit(100)
835 .maybe_cursor(cursor.clone())
836 .build(),
837 )
838 .await
839 .map_err(|e| AgentError::from(ClientError::from(e)))?;
840
841 let list = match resp.parse() {
842 Ok(l) => l,
843 Err(_) => break,
844 };
845
846 for record in list.records {
847 let notebook: Book = jacquard::from_data(&record.value).map_err(|_| {
848 AgentError::from(ClientError::invalid_request(
849 "Failed to parse notebook record",
850 ))
851 })?;
852
853 // Match on path first, then title (with trailing punctuation tolerance)
854 let matched_title = if let Some(ref path) = notebook.path
855 && title_matches(path.as_ref(), title)
856 {
857 Some(path.clone())
858 } else if let Some(ref book_title) = notebook.title
859 && title_matches(book_title.as_ref(), title)
860 {
861 Some(book_title.clone())
862 } else {
863 None
864 };
865
866 if let Some(matched) = matched_title {
867 let tags = notebook.tags.clone();
868 let path = notebook.path.clone();
869
870 let mut authors = Vec::new();
871 for (index, author) in notebook.authors.iter().enumerate() {
872 let ident = AtIdentifier::Did(author.did.clone());
873 let (profile_uri, profile_view) =
874 self.hydrate_profile_view(&ident).await?;
875 authors.push(
876 AuthorListView::new()
877 .maybe_uri(profile_uri)
878 .record(profile_view)
879 .index(index as i64)
880 .build(),
881 );
882 }
883
884 // TODO: Fix this - entries building is broken because we need NotebookView
885 // to call view_entry but we're still building the NotebookView
886 let entries = Vec::new(); // Temporarily empty
887
888 // let mut entries = Vec::with_capacity(notebook.entry_list.len());
889 // for (index, _) in notebook.entry_list.iter().enumerate() {
890 // let entry_view = self.view_entry(¬ebook_view, ¬ebook.entry_list, index).await?;
891 // entries.push(entry_view);
892 // }
893
894 // Fetch permissions for this notebook
895 let permissions = self.get_permissions_for_resource(&record.uri).await?;
896
897 return Ok(Some((
898 NotebookView::new()
899 .cid(record.cid)
900 .uri(record.uri)
901 .indexed_at(jacquard::types::string::Datetime::now())
902 .title(matched)
903 .maybe_path(path)
904 .maybe_tags(tags)
905 .authors(authors)
906 .permissions(permissions)
907 .record(record.value.clone())
908 .build()
909 .into_static(),
910 entries,
911 )));
912 }
913 }
914
915 match list.cursor {
916 Some(c) => cursor = Some(c.into_static()),
917 None => break,
918 }
919 }
920
921 Ok(None)
922 }
923 }
924
925 /// Hydrate a profile view from either weaver or bsky profile
926 #[cfg(feature = "use-index")]
927 fn hydrate_profile_view(
928 &self,
929 ident: &AtIdentifier<'_>,
930 ) -> impl Future<
931 Output = Result<
932 (
933 Option<AtUri<'static>>,
934 weaver_api::sh_weaver::actor::ProfileDataView<'static>,
935 ),
936 WeaverError,
937 >,
938 > {
939 async move {
940 use weaver_api::sh_weaver::actor::get_profile::GetProfile;
941
942 let resp = self
943 .send(GetProfile::new().actor(ident.clone()).build())
944 .await
945 .map_err(|e| AgentError::from(ClientError::from(e)))?;
946
947 let output = resp.into_output().map_err(|e| {
948 AgentError::from(ClientError::invalid_request(format!(
949 "Failed to get profile: {}",
950 e
951 )))
952 })?;
953
954 // URI is goofy in this signature, just return None for now
955 Ok((None, output.value.into_static()))
956 }
957 }
958
959 /// Hydrate a profile view from either weaver or bsky profile
960 #[cfg(not(feature = "use-index"))]
961 fn hydrate_profile_view(
962 &self,
963 ident: &AtIdentifier<'_>,
964 ) -> impl Future<
965 Output = Result<
966 (
967 Option<AtUri<'static>>,
968 weaver_api::sh_weaver::actor::ProfileDataView<'static>,
969 ),
970 WeaverError,
971 >,
972 > {
973 async move {
974 use weaver_api::app_bsky::actor::{
975 get_profile::GetProfile, profile::Profile as BskyProfile,
976 };
977 use weaver_api::sh_weaver::actor::{
978 ProfileDataView, ProfileDataViewInner, ProfileView,
979 profile::Profile as WeaverProfile,
980 };
981
982 // Resolve identifier to DID and handle
983 let (did, handle) = match ident {
984 AtIdentifier::Did(d) => {
985 let handles = self.resolve_did_doc_owned(d).await?.handles();
986 let h = handles.first().ok_or_else(|| {
987 AgentError::from(ClientError::invalid_request("couldn't resolve handle"))
988 })?;
989 (d.clone().into_static(), h.clone())
990 }
991 AtIdentifier::Handle(h) => {
992 let d = self.resolve_handle(h).await?;
993 (d.into_static(), h.clone().into_static())
994 }
995 };
996
997 // Try weaver profile first
998 let weaver_uri =
999 WeaverProfile::uri(format!("at://{}/sh.weaver.actor.profile/self", did)).map_err(
1000 |_| {
1001 AgentError::from(ClientError::invalid_request("Invalid weaver profile URI"))
1002 },
1003 )?;
1004 let weaver_future = async {
1005 if let Ok(weaver_record) = self.fetch_record(&weaver_uri).await {
1006 // Convert blobs to CDN URLs
1007 let avatar = weaver_record
1008 .value
1009 .avatar
1010 .as_ref()
1011 .map(|blob| {
1012 let cid = blob.blob().cid();
1013 jacquard::types::string::Uri::new_owned(format!(
1014 "https://cdn.bsky.app/img/avatar/plain/{}/{}@jpeg",
1015 did, cid
1016 ))
1017 })
1018 .transpose()
1019 .map_err(|_| {
1020 AgentError::from(ClientError::invalid_request("Invalid avatar URI"))
1021 })?;
1022 let banner = weaver_record
1023 .value
1024 .banner
1025 .as_ref()
1026 .map(|blob| {
1027 let cid = blob.blob().cid();
1028 jacquard::types::string::Uri::new_owned(format!(
1029 "https://cdn.bsky.app/img/banner/plain/{}/{}@jpeg",
1030 did, cid
1031 ))
1032 })
1033 .transpose()
1034 .map_err(|_| {
1035 AgentError::from(ClientError::invalid_request("Invalid banner URI"))
1036 })?;
1037
1038 let profile_view = ProfileView::new()
1039 .did(did.clone())
1040 .handle(handle.clone())
1041 .maybe_display_name(weaver_record.value.display_name.clone())
1042 .maybe_description(weaver_record.value.description.clone())
1043 .maybe_avatar(avatar)
1044 .maybe_banner(banner)
1045 .maybe_bluesky(weaver_record.value.bluesky)
1046 .maybe_tangled(weaver_record.value.tangled)
1047 .maybe_streamplace(weaver_record.value.streamplace)
1048 .maybe_location(weaver_record.value.location.clone())
1049 .maybe_links(weaver_record.value.links.clone())
1050 .maybe_pronouns(weaver_record.value.pronouns.clone())
1051 .maybe_pinned(weaver_record.value.pinned.clone())
1052 .indexed_at(jacquard::types::string::Datetime::now())
1053 .maybe_created_at(weaver_record.value.created_at)
1054 .build();
1055
1056 Ok((
1057 Some(weaver_uri.as_uri().clone().into_static()),
1058 ProfileDataView::new()
1059 .inner(ProfileDataViewInner::ProfileView(Box::new(profile_view)))
1060 .build()
1061 .into_static(),
1062 ))
1063 } else {
1064 Err(WeaverError::Agent(
1065 ClientError::invalid_request("Invalid weaver profile URI").into(),
1066 ))
1067 }
1068 };
1069 let bsky_appview_future = async {
1070 if let Ok(bsky_resp) = self
1071 .send(GetProfile::new().actor(did.clone()).build())
1072 .await
1073 {
1074 if let Ok(output) = bsky_resp.into_output() {
1075 let bsky_uri =
1076 BskyProfile::uri(format!("at://{}/app.bsky.actor.profile/self", did))
1077 .map_err(|_| {
1078 AgentError::from(ClientError::invalid_request(
1079 "Invalid bsky profile URI",
1080 ))
1081 })?;
1082 Ok((
1083 Some(bsky_uri.as_uri().clone().into_static()),
1084 ProfileDataView::new()
1085 .inner(ProfileDataViewInner::ProfileViewDetailed(Box::new(
1086 output.value,
1087 )))
1088 .build()
1089 .into_static(),
1090 ))
1091 } else {
1092 Err(WeaverError::Agent(
1093 ClientError::invalid_request("Invalid bsky profile URI").into(),
1094 ))
1095 }
1096 } else {
1097 Err(WeaverError::Agent(
1098 ClientError::invalid_request("Invalid bsky profile URI").into(),
1099 ))
1100 }
1101 };
1102
1103 if let Ok((profile_uri, weaver_profileview)) = weaver_future.await {
1104 return Ok((profile_uri, weaver_profileview));
1105 } else if let Ok((profile_uri, bsky_profileview)) = bsky_appview_future.await {
1106 return Ok((profile_uri, bsky_profileview));
1107 } else {
1108 Err(WeaverError::Agent(AgentError::from(
1109 ClientError::invalid_request("couldn't fetch profile"),
1110 )))
1111 }
1112 }
1113 }
1114
1115 /// View an entry at a specific index with prev/next navigation
1116 #[cfg(feature = "use-index")]
1117 fn view_entry<'a>(
1118 &self,
1119 notebook: &NotebookView<'a>,
1120 _entries: &[StrongRef<'_>],
1121 index: usize,
1122 ) -> impl Future<Output = Result<BookEntryView<'a>, WeaverError>> {
1123 async move {
1124 use weaver_api::sh_weaver::notebook::get_book_entry::GetBookEntry;
1125
1126 let resp = self
1127 .send(
1128 GetBookEntry::new()
1129 .notebook(notebook.uri.clone())
1130 .index(index as i64)
1131 .build(),
1132 )
1133 .await
1134 .map_err(|e| AgentError::from(ClientError::from(e)))?;
1135
1136 let output = resp.into_output().map_err(|e| {
1137 AgentError::from(ClientError::invalid_request(format!(
1138 "Failed to get book entry: {}",
1139 e
1140 )))
1141 })?;
1142
1143 Ok(output.value.into_static())
1144 }
1145 }
1146
1147 /// View an entry at a specific index with prev/next navigation
1148 #[cfg(not(feature = "use-index"))]
1149 fn view_entry<'a>(
1150 &self,
1151 notebook: &NotebookView<'a>,
1152 entries: &[StrongRef<'_>],
1153 index: usize,
1154 ) -> impl Future<Output = Result<BookEntryView<'a>, WeaverError>> {
1155 async move {
1156 use weaver_api::sh_weaver::notebook::BookEntryRef;
1157
1158 let entry_ref = entries.get(index).ok_or_else(|| {
1159 AgentError::from(ClientError::invalid_request("entry out of bounds"))
1160 })?;
1161 let entry = self.fetch_entry_view(notebook, entry_ref).await?;
1162
1163 let prev_entry = if index > 0 {
1164 let prev_entry_ref = &entries[index - 1];
1165 self.fetch_entry_view(notebook, prev_entry_ref).await.ok()
1166 } else {
1167 None
1168 }
1169 .map(|e| BookEntryRef::new().entry(e).build());
1170
1171 let next_entry = if index < entries.len() - 1 {
1172 let next_entry_ref = &entries[index + 1];
1173 self.fetch_entry_view(notebook, next_entry_ref).await.ok()
1174 } else {
1175 None
1176 }
1177 .map(|e| BookEntryRef::new().entry(e).build());
1178
1179 Ok(BookEntryView::new()
1180 .entry(entry)
1181 .maybe_next(next_entry)
1182 .maybe_prev(prev_entry)
1183 .index(index as i64)
1184 .build())
1185 }
1186 }
1187
1188 /// View a page at a specific index with prev/next navigation
1189 fn view_page<'a>(
1190 &self,
1191 notebook: &NotebookView<'a>,
1192 pages: &[StrongRef<'_>],
1193 index: usize,
1194 ) -> impl Future<Output = Result<BookEntryView<'a>, WeaverError>> {
1195 async move {
1196 use weaver_api::sh_weaver::notebook::BookEntryRef;
1197
1198 let entry_ref = pages.get(index).ok_or_else(|| {
1199 AgentError::from(ClientError::invalid_request("entry out of bounds"))
1200 })?;
1201 let entry = self.fetch_page_view(notebook, entry_ref).await?;
1202
1203 let prev_entry = if index > 0 {
1204 let prev_entry_ref = &pages[index - 1];
1205 self.fetch_page_view(notebook, prev_entry_ref).await.ok()
1206 } else {
1207 None
1208 }
1209 .map(|e| BookEntryRef::new().entry(e).build());
1210
1211 let next_entry = if index < pages.len() - 1 {
1212 let next_entry_ref = &pages[index + 1];
1213 self.fetch_page_view(notebook, next_entry_ref).await.ok()
1214 } else {
1215 None
1216 }
1217 .map(|e| BookEntryRef::new().entry(e).build());
1218
1219 Ok(BookEntryView::new()
1220 .entry(entry)
1221 .maybe_next(next_entry)
1222 .maybe_prev(prev_entry)
1223 .index(index as i64)
1224 .build())
1225 }
1226 }
1227
1228 /// Fetch a page view (like fetch_entry_view but for pages)
1229 fn fetch_page_view<'a>(
1230 &self,
1231 notebook: &NotebookView<'a>,
1232 entry_ref: &StrongRef<'_>,
1233 ) -> impl Future<Output = Result<EntryView<'a>, WeaverError>>
1234 where
1235 Self: Sized,
1236 {
1237 async move {
1238 use jacquard::to_data;
1239 use weaver_api::sh_weaver::notebook::page::Page;
1240
1241 let page_uri = Page::uri(entry_ref.uri.clone())
1242 .map_err(|_| AgentError::from(ClientError::invalid_request("Invalid page URI")))?;
1243
1244 // Get the rkey for version lookup
1245 let rkey = page_uri.rkey().ok_or_else(|| {
1246 AgentError::from(ClientError::invalid_request("Page URI missing rkey"))
1247 })?;
1248
1249 // Fetch permissions for this page (includes inherited notebook permissions)
1250 let permissions = self.get_permissions_for_resource(&page_uri).await?;
1251
1252 // Get all collaborators (owner + invited)
1253 let owner_did = match page_uri.authority() {
1254 jacquard::types::ident::AtIdentifier::Did(d) => d.clone().into_static(),
1255 jacquard::types::ident::AtIdentifier::Handle(h) => {
1256 let (did, _) = self.pds_for_handle(h).await.map_err(|e| {
1257 AgentError::from(
1258 ClientError::from(e).with_context("Failed to resolve handle"),
1259 )
1260 })?;
1261 did.into_static()
1262 }
1263 };
1264 let collaborators = self
1265 .find_collaborators_for_resource(&page_uri)
1266 .await
1267 .unwrap_or_default();
1268 let all_dids: Vec<Did<'static>> = std::iter::once(owner_did)
1269 .chain(collaborators.into_iter())
1270 .collect();
1271
1272 // Find all versions across collaborators, get latest by updatedAt
1273 let versions = self
1274 .find_all_versions(
1275 <Page as jacquard::types::collection::Collection>::NSID,
1276 rkey.0.as_str(),
1277 &all_dids,
1278 )
1279 .await
1280 .unwrap_or_default();
1281
1282 // Use latest version if found, otherwise fall back to direct fetch
1283 let (page_data, final_uri, final_cid) = if let Some(latest) = versions.first() {
1284 let page: Page = jacquard::from_data(&latest.value).map_err(|_| {
1285 AgentError::from(ClientError::invalid_request(
1286 "Failed to deserialize latest page",
1287 ))
1288 })?;
1289 (page.into_static(), latest.uri.clone(), latest.cid.clone())
1290 } else {
1291 // No versions found, fetch directly from PDS
1292 let page = self.fetch_record(&page_uri).await?;
1293 let cid = page.cid.ok_or_else(|| {
1294 AgentError::from(ClientError::invalid_request("Page missing CID"))
1295 })?;
1296 (
1297 page.value.into_static(),
1298 page.uri.into_static(),
1299 cid.into_static(),
1300 )
1301 };
1302
1303 let title = page_data.title.clone();
1304 let tags = page_data.tags.clone();
1305
1306 // Fetch contributors (evidence-based authors) for this page
1307 let contributor_dids = self.find_contributors_for_resource(&page_uri).await?;
1308 let mut authors = Vec::new();
1309 for (index, did) in contributor_dids.iter().enumerate() {
1310 let (profile_uri, profile_view) = self
1311 .hydrate_profile_view(&AtIdentifier::Did(did.clone()))
1312 .await?;
1313 authors.push(
1314 AuthorListView::new()
1315 .maybe_uri(profile_uri)
1316 .record(profile_view)
1317 .index(index as i64)
1318 .build(),
1319 );
1320 }
1321
1322 Ok(EntryView::new()
1323 .cid(final_cid)
1324 .uri(final_uri)
1325 .indexed_at(jacquard::types::string::Datetime::now())
1326 .record(to_data(&page_data).map_err(|_| {
1327 AgentError::from(ClientError::invalid_request("Failed to serialize page"))
1328 })?)
1329 .maybe_tags(tags)
1330 .title(title)
1331 .authors(authors)
1332 .permissions(permissions)
1333 .build())
1334 }
1335 }
1336
1337 /// Find the notebook that contains a given entry using constellation backlinks.
1338 ///
1339 /// Queries constellation for `sh.weaver.notebook.book` records that reference
1340 /// the given entry URI via the `.entryList[].uri` path.
1341 fn find_notebook_for_entry(
1342 &self,
1343 entry_uri: &AtUri<'_>,
1344 ) -> impl Future<Output = Result<Option<RecordId<'static>>, WeaverError>>
1345 where
1346 Self: Sized,
1347 {
1348 async move {
1349 let (_, first) = self.find_notebooks_for_entry(entry_uri).await?;
1350 Ok(first)
1351 }
1352 }
1353
1354 /// Find notebooks containing an entry, returning count and optionally the first one.
1355 ///
1356 /// Uses constellation backlinks to reverse lookup. Returns:
1357 /// - total count of notebooks containing this entry
1358 /// - The first notebook RecordId (if any exist)
1359 fn find_notebooks_for_entry(
1360 &self,
1361 entry_uri: &AtUri<'_>,
1362 ) -> impl Future<Output = Result<(u64, Option<RecordId<'static>>), WeaverError>>
1363 where
1364 Self: Sized,
1365 {
1366 async move {
1367 let constellation_url = Url::parse(CONSTELLATION_URL).map_err(|e| {
1368 AgentError::from(ClientError::invalid_request(format!(
1369 "Invalid constellation URL: {}",
1370 e
1371 )))
1372 })?;
1373
1374 // Query with limit 2 - we only need to know if there's more than 1
1375 let query = GetBacklinksQuery {
1376 subject: Uri::At(entry_uri.clone().into_static()),
1377 source: "sh.weaver.notebook.book:.entryList[].uri".into(),
1378 cursor: None,
1379 did: vec![],
1380 limit: 2,
1381 };
1382
1383 let response = self
1384 .xrpc(constellation_url)
1385 .send(&query)
1386 .await
1387 .map_err(|e| {
1388 AgentError::from(ClientError::invalid_request(format!(
1389 "Constellation query failed: {}",
1390 e
1391 )))
1392 })?;
1393
1394 let output = response.into_output().map_err(|e| {
1395 AgentError::from(ClientError::invalid_request(format!(
1396 "Failed to parse constellation response: {}",
1397 e
1398 )))
1399 })?;
1400
1401 Ok((
1402 output.total,
1403 output.records.into_iter().next().map(|r| r.into_static()),
1404 ))
1405 }
1406 }
1407
1408 /// Fetch an entry directly by its rkey, returning the EntryView and raw Entry.
1409 ///
1410 /// This bypasses notebook context entirely - useful for standalone entries
1411 /// or when you have the rkey but not the notebook.
1412 #[cfg(feature = "use-index")]
1413 fn fetch_entry_by_rkey(
1414 &self,
1415 ident: &jacquard::types::ident::AtIdentifier<'_>,
1416 rkey: &str,
1417 ) -> impl Future<Output = Result<(EntryView<'static>, entry::Entry<'static>), WeaverError>>
1418 where
1419 Self: Sized,
1420 {
1421 async move {
1422 use jacquard::types::collection::Collection;
1423 use weaver_api::sh_weaver::notebook::get_entry::GetEntry;
1424
1425 // Build entry URI from ident + rkey
1426 let entry_uri_str = format!("at://{}/{}/{}", ident, entry::Entry::NSID, rkey);
1427 let entry_uri = AtUri::new(&entry_uri_str)
1428 .map_err(|_| AgentError::from(ClientError::invalid_request("Invalid entry URI")))?
1429 .into_static();
1430
1431 let resp = self
1432 .send(GetEntry::new().uri(entry_uri).build())
1433 .await
1434 .map_err(|e| AgentError::from(ClientError::from(e)))?;
1435
1436 let output = resp.into_output().map_err(|e| {
1437 AgentError::from(ClientError::invalid_request(format!(
1438 "Failed to get entry: {}",
1439 e
1440 )))
1441 })?;
1442
1443 // Clone the record for deserialization so we can consume output.value
1444 let record_clone = output.value.record.clone();
1445
1446 // Deserialize Entry from the cloned record
1447 let entry_value: entry::Entry = jacquard::from_data(&record_clone).map_err(|e| {
1448 AgentError::from(ClientError::invalid_request(format!(
1449 "Failed to deserialize entry record: {}",
1450 e
1451 )))
1452 })?;
1453
1454 Ok((output.value.into_static(), entry_value.into_static()))
1455 }
1456 }
1457
1458 /// Fetch an entry directly by its rkey, returning the EntryView and raw Entry.
1459 ///
1460 /// This bypasses notebook context entirely - useful for standalone entries
1461 /// or when you have the rkey but not the notebook.
1462 #[cfg(not(feature = "use-index"))]
1463 fn fetch_entry_by_rkey(
1464 &self,
1465 ident: &jacquard::types::ident::AtIdentifier<'_>,
1466 rkey: &str,
1467 ) -> impl Future<Output = Result<(EntryView<'static>, entry::Entry<'static>), WeaverError>>
1468 where
1469 Self: Sized,
1470 {
1471 async move {
1472 use jacquard::to_data;
1473 use jacquard::types::collection::Collection;
1474
1475 // Resolve DID from ident
1476 let repo_did = match ident {
1477 jacquard::types::ident::AtIdentifier::Did(did) => did.clone(),
1478 jacquard::types::ident::AtIdentifier::Handle(handle) => {
1479 let (did, _pds) = self.pds_for_handle(handle).await.map_err(|e| {
1480 AgentError::from(
1481 ClientError::from(e).with_context("Failed to resolve handle"),
1482 )
1483 })?;
1484 did
1485 }
1486 };
1487
1488 // Build entry URI for contributor/permission queries
1489 let entry_uri_str = format!("at://{}/{}/{}", repo_did, entry::Entry::NSID, rkey);
1490 let entry_uri = AtUri::new(&entry_uri_str)
1491 .map_err(|_| AgentError::from(ClientError::invalid_request("Invalid entry URI")))?
1492 .into_static();
1493
1494 // Get collaborators for version lookup
1495 let collaborators = self
1496 .find_collaborators_for_resource(&entry_uri)
1497 .await
1498 .unwrap_or_default();
1499 let all_dids: Vec<Did<'static>> = std::iter::once(repo_did.clone().into_static())
1500 .chain(collaborators.into_iter())
1501 .collect();
1502
1503 // Find all versions across collaborators, get latest by updatedAt
1504 let versions = self
1505 .find_all_versions(entry::Entry::NSID, rkey, &all_dids)
1506 .await
1507 .unwrap_or_default();
1508
1509 // Use latest version if found, otherwise fetch directly from original ident
1510 let (entry_value, final_uri, final_cid) = if let Some(latest) = versions.first() {
1511 let entry: entry::Entry = jacquard::from_data(&latest.value).map_err(|e| {
1512 AgentError::from(ClientError::invalid_request(format!(
1513 "Failed to deserialize latest entry: {}",
1514 e
1515 )))
1516 })?;
1517 (entry.into_static(), latest.uri.clone(), latest.cid.clone())
1518 } else {
1519 // Fallback: fetch directly via slingshot
1520 let record = self.fetch_record_slingshot(&entry_uri).await?;
1521
1522 let entry: entry::Entry = jacquard::from_data(&record.value).map_err(|e| {
1523 AgentError::from(ClientError::invalid_request(format!(
1524 "Failed to deserialize entry: {}",
1525 e
1526 )))
1527 })?;
1528
1529 let cid = record.cid.ok_or_else(|| {
1530 AgentError::from(ClientError::invalid_request("Entry missing CID"))
1531 })?;
1532
1533 (
1534 entry.into_static(),
1535 record.uri.into_static(),
1536 cid.into_static(),
1537 )
1538 };
1539
1540 // Fetch contributors (evidence-based authors)
1541 let contributor_dids = self.find_contributors_for_resource(&entry_uri).await?;
1542 let mut authors = Vec::new();
1543 for (index, did) in contributor_dids.iter().enumerate() {
1544 let ident = AtIdentifier::Did(did.clone());
1545 let (profile_uri, profile_view) = self.hydrate_profile_view(&ident).await?;
1546 authors.push(
1547 AuthorListView::new()
1548 .maybe_uri(profile_uri)
1549 .record(profile_view)
1550 .index(index as i64)
1551 .build(),
1552 );
1553 }
1554
1555 // Fetch permissions
1556 let permissions = self.get_permissions_for_resource(&entry_uri).await?;
1557
1558 let entry_view = EntryView::new()
1559 .cid(final_cid)
1560 .uri(final_uri)
1561 .indexed_at(jacquard::types::string::Datetime::now())
1562 .record(to_data(&entry_value).map_err(|_| {
1563 AgentError::from(ClientError::invalid_request("Failed to serialize entry"))
1564 })?)
1565 .maybe_tags(entry_value.tags.clone())
1566 .title(entry_value.title.clone())
1567 .path(entry_value.path.clone())
1568 .authors(authors)
1569 .permissions(permissions)
1570 .build()
1571 .into_static();
1572
1573 Ok((entry_view, entry_value.into_static()))
1574 }
1575 }
1576
1577 /// Find an entry's index within a notebook by rkey.
1578 ///
1579 /// Scans the notebook's entry_list comparing rkeys extracted from URIs.
1580 /// When found, builds BookEntryView with prev/next navigation.
1581 fn entry_in_notebook_by_rkey<'a>(
1582 &self,
1583 notebook: &NotebookView<'a>,
1584 entries: &[StrongRef<'_>],
1585 rkey: &str,
1586 ) -> impl Future<Output = Result<Option<BookEntryView<'a>>, WeaverError>> {
1587 async move {
1588 use weaver_api::sh_weaver::notebook::BookEntryRef;
1589
1590 // Find the entry index by comparing rkeys
1591 let mut found_index = None;
1592 for (index, entry_ref) in entries.iter().enumerate() {
1593 // Extract rkey from the entry URI
1594 if let Ok(uri) = AtUri::new(entry_ref.uri.as_ref()) {
1595 if let Some(entry_rkey) = uri.rkey() {
1596 if entry_rkey.0.as_str() == rkey {
1597 found_index = Some(index);
1598 break;
1599 }
1600 }
1601 }
1602 }
1603
1604 let index = match found_index {
1605 Some(i) => i,
1606 None => return Ok(None),
1607 };
1608
1609 // Build BookEntryView with prev/next navigation
1610 let entry_ref = &entries[index];
1611 let entry = self.fetch_entry_view(notebook, entry_ref).await?;
1612
1613 let prev_entry = if index > 0 {
1614 let prev_entry_ref = &entries[index - 1];
1615 self.fetch_entry_view(notebook, prev_entry_ref).await.ok()
1616 } else {
1617 None
1618 }
1619 .map(|e| BookEntryRef::new().entry(e).build());
1620
1621 let next_entry = if index < entries.len() - 1 {
1622 let next_entry_ref = &entries[index + 1];
1623 self.fetch_entry_view(notebook, next_entry_ref).await.ok()
1624 } else {
1625 None
1626 }
1627 .map(|e| BookEntryRef::new().entry(e).build());
1628
1629 Ok(Some(
1630 BookEntryView::new()
1631 .entry(entry)
1632 .maybe_next(next_entry)
1633 .maybe_prev(prev_entry)
1634 .index(index as i64)
1635 .build(),
1636 ))
1637 }
1638 }
1639
1640 /// Find valid collaborators for a resource.
1641 ///
1642 /// Queries Constellation for invite/accept record pairs:
1643 /// 1. Find all invites targeting this resource URI
1644 /// 2. For each invite, check if there's a matching accept record
1645 /// 3. Return DIDs that have both invite AND accept
1646 fn find_collaborators_for_resource(
1647 &self,
1648 resource_uri: &AtUri<'_>,
1649 ) -> impl Future<Output = Result<Vec<Did<'static>>, WeaverError>>
1650 where
1651 Self: Sized,
1652 {
1653 async move {
1654 use weaver_api::sh_weaver::collab::invite::Invite;
1655
1656 const INVITE_NSID: &str = "sh.weaver.collab.invite";
1657 const ACCEPT_NSID: &str = "sh.weaver.collab.accept";
1658
1659 let constellation_url = Url::parse(CONSTELLATION_URL).map_err(|e| {
1660 AgentError::from(ClientError::invalid_request(format!(
1661 "Invalid constellation URL: {}",
1662 e
1663 )))
1664 })?;
1665
1666 // Step 1: Find all invites for this resource
1667 let invite_query = GetBacklinksQuery {
1668 subject: Uri::At(resource_uri.clone().into_static()),
1669 source: format!("{}:resource.uri", INVITE_NSID).into(),
1670 cursor: None,
1671 did: vec![],
1672 limit: 100,
1673 };
1674
1675 let response = self
1676 .xrpc(constellation_url.clone())
1677 .send(&invite_query)
1678 .await
1679 .map_err(|e| {
1680 AgentError::from(ClientError::invalid_request(format!(
1681 "Constellation query failed: {}",
1682 e
1683 )))
1684 })?;
1685
1686 let invite_output = response.into_output().map_err(|e| {
1687 AgentError::from(ClientError::invalid_request(format!(
1688 "Failed to parse constellation response: {}",
1689 e
1690 )))
1691 })?;
1692
1693 let mut collaborators = Vec::new();
1694
1695 // Step 2: For each invite, check for a matching accept
1696 for record_id in invite_output.records {
1697 let invite_uri_str = format!(
1698 "at://{}/{}/{}",
1699 record_id.did,
1700 INVITE_NSID,
1701 record_id.rkey.0.as_ref()
1702 );
1703 let Ok(invite_uri) = AtUri::new(&invite_uri_str) else {
1704 continue;
1705 };
1706
1707 // Fetch the invite to get the invitee DID
1708 let Ok(invite_resp) = self.get_record::<Invite>(&invite_uri).await else {
1709 continue;
1710 };
1711 let Ok(invite_record) = invite_resp.into_output() else {
1712 continue;
1713 };
1714
1715 let invitee_did = invite_record.value.invitee.clone().into_static();
1716
1717 // Query for accept records referencing this invite
1718 let accept_query = GetBacklinksQuery {
1719 subject: Uri::At(invite_uri.into_static()),
1720 source: format!("{}:invite.uri", ACCEPT_NSID).into(),
1721 cursor: None,
1722 did: vec![invitee_did.clone()],
1723 limit: 1,
1724 };
1725
1726 let Ok(accept_resp) = self
1727 .xrpc(constellation_url.clone())
1728 .send(&accept_query)
1729 .await
1730 else {
1731 continue;
1732 };
1733
1734 let Ok(accept_output) = accept_resp.into_output() else {
1735 continue;
1736 };
1737
1738 if !accept_output.records.is_empty() {
1739 // Both parties in a valid invite+accept pair are authorized
1740 let inviter_did = record_id.did.clone().into_static();
1741 collaborators.push(inviter_did);
1742 collaborators.push(invitee_did);
1743 }
1744 }
1745
1746 // Deduplicate (someone might appear in multiple pairs)
1747 collaborators.sort();
1748 collaborators.dedup();
1749
1750 Ok(collaborators)
1751 }
1752 }
1753
1754 /// Find all versions of a record across collaborator repositories.
1755 ///
1756 /// For each collaborator DID, attempts to fetch `at://{did}/{collection}/{rkey}`.
1757 /// Returns all found versions sorted by `updated_at` descending (latest first).
1758 fn find_all_versions<'a>(
1759 &'a self,
1760 collection: &'a str,
1761 rkey: &'a str,
1762 collaborators: &'a [Did<'_>],
1763 ) -> impl Future<Output = Result<Vec<CollaboratorVersion<'static>>, WeaverError>> + 'a
1764 where
1765 Self: Sized,
1766 {
1767 async move {
1768 use jacquard::Data;
1769
1770 let mut versions = Vec::new();
1771
1772 for collab_did in collaborators {
1773 // Build URI for this collaborator's version
1774 let uri_str = format!("at://{}/{}/{}", collab_did, collection, rkey);
1775 let Ok(uri) = AtUri::new(&uri_str) else {
1776 continue;
1777 };
1778
1779 // Fetch via slingshot (handles cross-PDS routing)
1780 let Ok(record) = self.fetch_record_slingshot(&uri).await else {
1781 continue;
1782 };
1783
1784 let Some(cid) = record.cid else {
1785 continue;
1786 };
1787
1788 let updated_at = record
1789 .value
1790 .query("...updatedAt")
1791 .first()
1792 .or_else(|| record.value.query("...createdAt").first())
1793 .and_then(|v: &Data| v.as_str())
1794 .and_then(|s| s.parse::<jacquard::types::string::Datetime>().ok());
1795
1796 versions.push(CollaboratorVersion {
1797 did: collab_did.clone().into_static(),
1798 uri: record.uri.into_static(),
1799 cid: cid.into_static(),
1800 updated_at,
1801 value: record.value.into_static(),
1802 });
1803 }
1804
1805 // Sort by updated_at descending (latest first)
1806 versions.sort_by(|a, b| match (&b.updated_at, &a.updated_at) {
1807 (Some(b_time), Some(a_time)) => b_time.as_ref().cmp(a_time.as_ref()),
1808 (Some(_), None) => std::cmp::Ordering::Less,
1809 (None, Some(_)) => std::cmp::Ordering::Greater,
1810 (None, None) => std::cmp::Ordering::Equal,
1811 });
1812
1813 Ok(versions)
1814 }
1815 }
1816
1817 /// Check if a user can edit a resource based on collaboration records.
1818 ///
1819 /// Returns true if the user is the resource owner OR has valid invite+accept.
1820 fn can_user_edit_resource<'a>(
1821 &'a self,
1822 resource_uri: &'a AtUri<'_>,
1823 user_did: &'a Did<'_>,
1824 ) -> impl Future<Output = Result<bool, WeaverError>> + 'a
1825 where
1826 Self: Sized,
1827 {
1828 async move {
1829 // Check if user is the owner
1830 if let jacquard::types::ident::AtIdentifier::Did(owner_did) = resource_uri.authority() {
1831 if owner_did == user_did {
1832 return Ok(true);
1833 }
1834 }
1835
1836 // Check for valid collaboration
1837 let collaborators = self.find_collaborators_for_resource(resource_uri).await?;
1838 Ok(collaborators.iter().any(|c| c == user_did))
1839 }
1840 }
1841
1842 /// Check if a user can edit an entry, considering notebook-level cascading.
1843 ///
1844 /// An entry is editable if user owns it, has entry-level collab, or has notebook-level collab.
1845 fn can_user_edit_entry<'a>(
1846 &'a self,
1847 entry_uri: &'a AtUri<'_>,
1848 user_did: &'a Did<'_>,
1849 ) -> impl Future<Output = Result<bool, WeaverError>> + 'a
1850 where
1851 Self: Sized,
1852 {
1853 async move {
1854 // Check entry-level access first
1855 if self.can_user_edit_resource(entry_uri, user_did).await? {
1856 return Ok(true);
1857 }
1858
1859 // Check notebook-level access (cascade)
1860 if let Some(notebook_id) = self.find_notebook_for_entry(entry_uri).await? {
1861 let notebook_uri_str = format!(
1862 "at://{}/{}/{}",
1863 notebook_id.did,
1864 notebook_id.collection,
1865 notebook_id.rkey.0.as_ref()
1866 );
1867 if let Ok(notebook_uri) = AtUri::new(¬ebook_uri_str) {
1868 if self.can_user_edit_resource(¬ebook_uri, user_did).await? {
1869 return Ok(true);
1870 }
1871 }
1872 }
1873
1874 Ok(false)
1875 }
1876 }
1877
1878 /// Get the full permissions state for a resource.
1879 ///
1880 /// Returns PermissionsState with all editors:
1881 /// - Resource authority (source = resource URI, grantedAt = createdAt)
1882 /// - Invited collaborators (source = invite URI, grantedAt = accept createdAt)
1883 /// - For entries: inherited notebook-level collaborators
1884 fn get_permissions_for_resource(
1885 &self,
1886 resource_uri: &AtUri<'_>,
1887 ) -> impl Future<Output = Result<PermissionsState<'static>, WeaverError>>
1888 where
1889 Self: Sized,
1890 {
1891 async move {
1892 use weaver_api::sh_weaver::collab::accept::Accept;
1893 use weaver_api::sh_weaver::collab::invite::Invite;
1894
1895 const INVITE_NSID: &str = "sh.weaver.collab.invite";
1896 const ACCEPT_NSID: &str = "sh.weaver.collab.accept";
1897
1898 let constellation_url = Url::parse(CONSTELLATION_URL).map_err(|e| {
1899 AgentError::from(ClientError::invalid_request(format!(
1900 "Invalid constellation URL: {}",
1901 e
1902 )))
1903 })?;
1904
1905 let mut editors = Vec::new();
1906
1907 // 1. Resource authority - creating the resource is its own grant
1908 let authority_did = match resource_uri.authority() {
1909 jacquard::types::ident::AtIdentifier::Did(did) => did.clone().into_static(),
1910 jacquard::types::ident::AtIdentifier::Handle(handle) => {
1911 let (did, _) = self.pds_for_handle(handle).await.map_err(|e| {
1912 AgentError::from(
1913 ClientError::from(e).with_context("Failed to resolve handle"),
1914 )
1915 })?;
1916 did.into_static()
1917 }
1918 };
1919
1920 // Fetch the record to get createdAt (use untyped fetch to handle any collection)
1921 let record = self
1922 .fetch_record_slingshot(resource_uri)
1923 .await
1924 .map_err(|e| WeaverError::from(AgentError::from(e)))?;
1925 let authority_granted_at = record
1926 .value
1927 .query("createdAt")
1928 .first()
1929 .and_then(|v| v.as_str())
1930 .and_then(|s| s.parse::<jacquard::types::string::Datetime>().ok())
1931 .ok_or_else(|| {
1932 WeaverError::from(AgentError::from(ClientError::invalid_request(
1933 "Record missing createdAt",
1934 )))
1935 })?;
1936
1937 editors.push(
1938 PermissionGrant::new()
1939 .did(authority_did.clone())
1940 .scope("direct")
1941 .source(resource_uri.clone().into_static())
1942 .granted_at(authority_granted_at)
1943 .build()
1944 .into_static(),
1945 );
1946
1947 // 2. Find direct invites for this resource
1948 let invite_query = GetBacklinksQuery {
1949 subject: Uri::At(resource_uri.clone().into_static()),
1950 source: format!("{}:resource.uri", INVITE_NSID).into(),
1951 cursor: None,
1952 did: vec![],
1953 limit: 100,
1954 };
1955
1956 let invite_response = self
1957 .xrpc(constellation_url.clone())
1958 .send(&invite_query)
1959 .await
1960 .map_err(|e| {
1961 AgentError::from(ClientError::invalid_request(format!(
1962 "Constellation invite query failed: {}",
1963 e
1964 )))
1965 })?;
1966 let invite_output = invite_response.into_output().map_err(|e| {
1967 AgentError::from(ClientError::invalid_request(format!(
1968 "Failed to parse Constellation response: {}",
1969 e
1970 )))
1971 })?;
1972
1973 for record_id in invite_output.records {
1974 let invite_uri_str = format!(
1975 "at://{}/{}/{}",
1976 record_id.did,
1977 INVITE_NSID,
1978 record_id.rkey.0.as_ref()
1979 );
1980 let invite_uri = AtUri::new(&invite_uri_str).map_err(|_| {
1981 AgentError::from(ClientError::invalid_request(
1982 "Invalid invite URI from Constellation",
1983 ))
1984 })?;
1985
1986 // Fetch invite to get invitee DID
1987 let invite_record =
1988 self.get_record::<Invite>(&invite_uri)
1989 .await
1990 .map_err(|e| WeaverError::from(AgentError::from(e)))?
1991 .into_output()
1992 .map_err(|e| {
1993 WeaverError::from(AgentError::from(ClientError::invalid_request(
1994 format!("Failed to parse invite record: {}", e),
1995 )))
1996 })?;
1997
1998 let invitee_did = invite_record.value.invitee.clone().into_static();
1999
2000 // Query for accept records referencing this invite
2001 let accept_query = GetBacklinksQuery {
2002 subject: Uri::At(invite_uri.clone().into_static()),
2003 source: format!("{}:invite.uri", ACCEPT_NSID).into(),
2004 cursor: None,
2005 did: vec![invitee_did.clone()],
2006 limit: 1,
2007 };
2008
2009 let accept_response = self
2010 .xrpc(constellation_url.clone())
2011 .send(&accept_query)
2012 .await
2013 .map_err(|e| {
2014 AgentError::from(ClientError::invalid_request(format!(
2015 "Constellation accept query failed: {}",
2016 e
2017 )))
2018 })?;
2019 let accept_output = accept_response.into_output().map_err(|e| {
2020 AgentError::from(ClientError::invalid_request(format!(
2021 "Failed to parse Constellation accept response: {}",
2022 e
2023 )))
2024 })?;
2025
2026 // No accept = pending invite, not an error - just skip
2027 let Some(accept_record_id) = accept_output.records.first() else {
2028 continue;
2029 };
2030
2031 let accept_uri_str = format!(
2032 "at://{}/{}/{}",
2033 accept_record_id.did,
2034 ACCEPT_NSID,
2035 accept_record_id.rkey.0.as_ref()
2036 );
2037 let accept_uri = AtUri::new(&accept_uri_str).map_err(|_| {
2038 AgentError::from(ClientError::invalid_request(
2039 "Invalid accept URI from Constellation",
2040 ))
2041 })?;
2042 let accept_record =
2043 self.get_record::<Accept>(&accept_uri)
2044 .await
2045 .map_err(|e| WeaverError::from(AgentError::from(e)))?
2046 .into_output()
2047 .map_err(|e| {
2048 WeaverError::from(AgentError::from(ClientError::invalid_request(
2049 format!("Failed to parse accept record: {}", e),
2050 )))
2051 })?;
2052
2053 editors.push(
2054 PermissionGrant::new()
2055 .did(invitee_did)
2056 .scope("direct")
2057 .source(invite_uri.into_static())
2058 .granted_at(accept_record.value.created_at)
2059 .build()
2060 .into_static(),
2061 );
2062 }
2063
2064 // 3. For entries, check notebook-level invites (inherited)
2065 let is_entry = resource_uri
2066 .collection()
2067 .is_some_and(|c| c.as_ref() == "sh.weaver.notebook.entry");
2068
2069 if is_entry {
2070 // Entry not in a notebook is fine - just no inherited permissions
2071 if let Some(notebook_id) = self.find_notebook_for_entry(resource_uri).await? {
2072 let notebook_uri_str = format!(
2073 "at://{}/{}/{}",
2074 notebook_id.did,
2075 notebook_id.collection,
2076 notebook_id.rkey.0.as_ref()
2077 );
2078 let notebook_uri = AtUri::new(¬ebook_uri_str).map_err(|_| {
2079 AgentError::from(ClientError::invalid_request(
2080 "Invalid notebook URI from Constellation",
2081 ))
2082 })?;
2083
2084 let notebook_invite_query = GetBacklinksQuery {
2085 subject: Uri::At(notebook_uri.clone().into_static()),
2086 source: format!("{}:resource.uri", INVITE_NSID).into(),
2087 cursor: None,
2088 did: vec![],
2089 limit: 100,
2090 };
2091
2092 let notebook_invite_response = self
2093 .xrpc(constellation_url.clone())
2094 .send(¬ebook_invite_query)
2095 .await
2096 .map_err(|e| {
2097 AgentError::from(ClientError::invalid_request(format!(
2098 "Constellation notebook invite query failed: {}",
2099 e
2100 )))
2101 })?;
2102 let notebook_invite_output =
2103 notebook_invite_response.into_output().map_err(|e| {
2104 AgentError::from(ClientError::invalid_request(format!(
2105 "Failed to parse Constellation response: {}",
2106 e
2107 )))
2108 })?;
2109
2110 for record_id in notebook_invite_output.records {
2111 let invite_uri_str = format!(
2112 "at://{}/{}/{}",
2113 record_id.did,
2114 INVITE_NSID,
2115 record_id.rkey.0.as_ref()
2116 );
2117 let invite_uri = AtUri::new(&invite_uri_str).map_err(|_| {
2118 AgentError::from(ClientError::invalid_request(
2119 "Invalid invite URI from Constellation",
2120 ))
2121 })?;
2122
2123 let invite_record = self
2124 .get_record::<Invite>(&invite_uri)
2125 .await
2126 .map_err(|e| WeaverError::from(AgentError::from(e)))?
2127 .into_output()
2128 .map_err(|e| {
2129 WeaverError::from(AgentError::from(ClientError::invalid_request(
2130 format!("Failed to parse invite record: {}", e),
2131 )))
2132 })?;
2133
2134 let invitee_did = invite_record.value.invitee.clone().into_static();
2135
2136 // Skip if already in direct grants (direct takes precedence)
2137 if editors.iter().any(|g| g.did == invitee_did) {
2138 continue;
2139 }
2140
2141 let accept_query = GetBacklinksQuery {
2142 subject: Uri::At(invite_uri.clone().into_static()),
2143 source: format!("{}:.invite.uri", ACCEPT_NSID).into(),
2144 cursor: None,
2145 did: vec![invitee_did.clone()],
2146 limit: 1,
2147 };
2148
2149 let accept_response = self
2150 .xrpc(constellation_url.clone())
2151 .send(&accept_query)
2152 .await
2153 .map_err(|e| {
2154 AgentError::from(ClientError::invalid_request(format!(
2155 "Constellation accept query failed: {}",
2156 e
2157 )))
2158 })?;
2159 let accept_output = accept_response.into_output().map_err(|e| {
2160 AgentError::from(ClientError::invalid_request(format!(
2161 "Failed to parse Constellation accept response: {}",
2162 e
2163 )))
2164 })?;
2165
2166 // No accept = pending invite, not an error - just skip
2167 let Some(accept_record_id) = accept_output.records.first() else {
2168 continue;
2169 };
2170
2171 let accept_uri_str = format!(
2172 "at://{}/{}/{}",
2173 accept_record_id.did,
2174 ACCEPT_NSID,
2175 accept_record_id.rkey.0.as_ref()
2176 );
2177 let accept_uri = AtUri::new(&accept_uri_str).map_err(|_| {
2178 AgentError::from(ClientError::invalid_request(
2179 "Invalid accept URI from Constellation",
2180 ))
2181 })?;
2182 let accept_record = self
2183 .get_record::<Accept>(&accept_uri)
2184 .await
2185 .map_err(|e| WeaverError::from(AgentError::from(e)))?
2186 .into_output()
2187 .map_err(|e| {
2188 WeaverError::from(AgentError::from(ClientError::invalid_request(
2189 format!("Failed to parse accept record: {}", e),
2190 )))
2191 })?;
2192
2193 editors.push(
2194 PermissionGrant::new()
2195 .did(invitee_did)
2196 .scope("inherited")
2197 .source(invite_uri.into_static())
2198 .granted_at(accept_record.value.created_at)
2199 .build()
2200 .into_static(),
2201 );
2202 }
2203 }
2204 }
2205
2206 Ok(PermissionsState::new()
2207 .editors(editors)
2208 .build()
2209 .into_static())
2210 }
2211 }
2212
2213 // =========================================================================
2214 // Real-time Collaboration Session Management
2215 // =========================================================================
2216
2217 /// Create a collaboration session record on the user's PDS.
2218 ///
2219 /// Called when joining a real-time editing session. The session record
2220 /// contains the iroh NodeId so other collaborators can discover and
2221 /// connect to this peer.
2222 ///
2223 /// Returns the AT-URI of the created session record.
2224 fn create_collab_session<'a>(
2225 &'a self,
2226 resource: &'a StrongRef<'a>,
2227 node_id: &'a str,
2228 relay_url: Option<&'a str>,
2229 ttl_minutes: Option<u32>,
2230 ) -> impl Future<Output = Result<AtUri<'static>, WeaverError>> + 'a {
2231 async move {
2232 use jacquard::types::string::Datetime;
2233 use weaver_api::sh_weaver::collab::session::Session;
2234
2235 // Clean up any expired sessions first
2236 let _ = self.cleanup_expired_sessions().await;
2237
2238 let now_chrono = chrono::Utc::now().fixed_offset();
2239 let now = Datetime::new(now_chrono);
2240 let expires_at = ttl_minutes.map(|mins| {
2241 let expires = now_chrono + chrono::Duration::minutes(mins as i64);
2242 Datetime::new(expires)
2243 });
2244
2245 let relay_uri = relay_url
2246 .map(|url| jacquard::types::string::Uri::new(url))
2247 .transpose()
2248 .map_err(|_| AgentError::from(ClientError::invalid_request("Invalid relay URL")))?;
2249
2250 let session = Session::new()
2251 .resource(resource.clone())
2252 .node_id(node_id)
2253 .created_at(now)
2254 .maybe_expires_at(expires_at)
2255 .maybe_relay_url(relay_uri)
2256 .build();
2257
2258 let response = self.create_record(session, None).await?;
2259 Ok(response.uri.into_static())
2260 }
2261 }
2262
2263 /// Delete a collaboration session record.
2264 ///
2265 /// Called when leaving a real-time editing session to clean up.
2266 fn delete_collab_session<'a>(
2267 &'a self,
2268 session_uri: &'a AtUri<'a>,
2269 ) -> impl Future<Output = Result<(), WeaverError>> + 'a {
2270 async move {
2271 use weaver_api::sh_weaver::collab::session::Session;
2272
2273 let rkey = session_uri.rkey().ok_or_else(|| {
2274 AgentError::from(ClientError::invalid_request("Session URI missing rkey"))
2275 })?;
2276 self.delete_record::<Session>(rkey.clone()).await?;
2277 Ok(())
2278 }
2279 }
2280
2281 /// Refresh a collaboration session's TTL.
2282 ///
2283 /// Called periodically to indicate the session is still active.
2284 fn refresh_collab_session<'a>(
2285 &'a self,
2286 session_uri: &'a AtUri<'a>,
2287 ttl_minutes: u32,
2288 ) -> impl Future<Output = Result<(), WeaverError>> + 'a {
2289 async move {
2290 use jacquard::types::string::Datetime;
2291 use weaver_api::sh_weaver::collab::session::Session;
2292
2293 let now_chrono = chrono::Utc::now().fixed_offset();
2294 let expires = now_chrono + chrono::Duration::minutes(ttl_minutes as i64);
2295 let expires_at = Datetime::new(expires);
2296
2297 self.update_record::<Session>(session_uri, |session| {
2298 session.expires_at = Some(expires_at);
2299 })
2300 .await?;
2301 Ok(())
2302 }
2303 }
2304
2305 /// Update the relay URL in an existing session record.
2306 ///
2307 /// Called when the relay connection changes during a session.
2308 fn update_collab_session_relay<'a>(
2309 &'a self,
2310 session_uri: &'a AtUri<'a>,
2311 relay_url: Option<&'a str>,
2312 ) -> impl Future<Output = Result<(), WeaverError>> + 'a {
2313 async move {
2314 use weaver_api::sh_weaver::collab::session::Session;
2315
2316 let relay_uri = relay_url
2317 .map(|url| jacquard::types::string::Uri::new(url))
2318 .transpose()
2319 .map_err(|_| AgentError::from(ClientError::invalid_request("Invalid relay URL")))?;
2320
2321 self.update_record::<Session>(session_uri, |session| {
2322 session.relay_url = relay_uri.clone();
2323 })
2324 .await?;
2325 Ok(())
2326 }
2327 }
2328
2329 /// Delete all expired session records for the current user.
2330 ///
2331 /// Called before creating a new session to clean up stale records.
2332 fn cleanup_expired_sessions<'a>(&'a self) -> impl Future<Output = Result<u32, WeaverError>> + 'a
2333 where
2334 Self: Sized,
2335 {
2336 async move {
2337 use jacquard::types::nsid::Nsid;
2338 use weaver_api::com_atproto::repo::list_records::ListRecords;
2339 use weaver_api::sh_weaver::collab::session::Session;
2340
2341 let (did, _) = self.session_info().await.ok_or_else(|| {
2342 AgentError::from(ClientError::invalid_request("No active session"))
2343 })?;
2344 let now = chrono::Utc::now();
2345 let mut deleted = 0u32;
2346
2347 // List all our session records
2348 let collection =
2349 Nsid::new("sh.weaver.collab.session").map_err(WeaverError::AtprotoString)?;
2350 let request = ListRecords::new()
2351 .repo(did.clone())
2352 .collection(collection)
2353 .limit(100)
2354 .build();
2355
2356 let response = self.send(request).await.map_err(AgentError::from)?;
2357 let output = response.into_output().map_err(|e| {
2358 AgentError::from(ClientError::invalid_request(format!(
2359 "Failed to list sessions: {}",
2360 e
2361 )))
2362 })?;
2363
2364 for record in output.records {
2365 if let Ok(session) = jacquard::from_data::<Session>(&record.value) {
2366 // Check if expired
2367 if let Some(ref expires_at) = session.expires_at {
2368 let expires_str = expires_at.as_str();
2369 if let Ok(expires) = chrono::DateTime::parse_from_rfc3339(expires_str) {
2370 if expires.with_timezone(&chrono::Utc) < now {
2371 // Delete expired session
2372 if let Some(rkey) = record.uri.rkey() {
2373 if let Err(e) =
2374 self.delete_record::<Session>(rkey.clone()).await
2375 {
2376 tracing::warn!("Failed to delete expired session: {}", e);
2377 } else {
2378 deleted += 1;
2379 }
2380 }
2381 }
2382 }
2383 }
2384 }
2385 }
2386
2387 if deleted > 0 {
2388 tracing::info!("Cleaned up {} expired session records", deleted);
2389 }
2390
2391 Ok(deleted)
2392 }
2393 }
2394
2395 /// Find active collaboration sessions for a resource.
2396 ///
2397 /// Queries Constellation for session records referencing the given resource,
2398 /// then fetches each to extract peer connection info.
2399 ///
2400 /// Returns peers with unexpired sessions (or no expiry set).
2401 fn find_session_peers<'a>(
2402 &'a self,
2403 resource_uri: &'a AtUri<'a>,
2404 ) -> impl Future<Output = Result<Vec<SessionPeer<'static>>, WeaverError>> + 'a
2405 where
2406 Self: Sized,
2407 {
2408 async move {
2409 use jacquard::types::string::Datetime;
2410 use weaver_api::sh_weaver::collab::session::Session;
2411
2412 const SESSION_NSID: &str = "sh.weaver.collab.session";
2413
2414 // Get authorized collaborators (owner is checked separately via URI authority)
2415 let collaborators: std::collections::HashSet<Did<'static>> = self
2416 .find_collaborators_for_resource(resource_uri)
2417 .await
2418 .unwrap_or_default()
2419 .into_iter()
2420 .collect();
2421
2422 let constellation_url = Url::parse(CONSTELLATION_URL).map_err(|e| {
2423 AgentError::from(ClientError::invalid_request(format!(
2424 "Invalid constellation URL: {}",
2425 e
2426 )))
2427 })?;
2428
2429 // Query for session records referencing this resource
2430 let query = GetBacklinksQuery {
2431 subject: Uri::At(resource_uri.clone().into_static()),
2432 source: format!("{}:resource.uri", SESSION_NSID).into(),
2433 cursor: None,
2434 did: vec![],
2435 limit: 100,
2436 };
2437
2438 let response = self
2439 .xrpc(constellation_url)
2440 .send(&query)
2441 .await
2442 .map_err(|e| {
2443 AgentError::from(ClientError::invalid_request(format!(
2444 "Constellation query failed: {}",
2445 e
2446 )))
2447 })?;
2448
2449 let output = response.into_output().map_err(|e| {
2450 AgentError::from(ClientError::invalid_request(format!(
2451 "Failed to parse constellation response: {}",
2452 e
2453 )))
2454 })?;
2455
2456 let mut peers = Vec::new();
2457 let now = Datetime::now();
2458
2459 for record_id in output.records {
2460 let session_uri_str = format!(
2461 "at://{}/{}/{}",
2462 record_id.did,
2463 SESSION_NSID,
2464 record_id.rkey.0.as_ref()
2465 );
2466 let Ok(session_uri) = AtUri::new(&session_uri_str) else {
2467 continue;
2468 };
2469
2470 // Fetch the session record
2471 let Ok(session_resp) = self.get_record::<Session>(&session_uri).await else {
2472 continue;
2473 };
2474 let Ok(session_record) = session_resp.into_output() else {
2475 continue;
2476 };
2477
2478 // Check if session has expired (Datetime implements Ord)
2479 if let Some(ref expires_at) = session_record.value.expires_at {
2480 if *expires_at < now {
2481 continue; // Session expired
2482 }
2483 }
2484
2485 // Check if peer is authorized (has valid invite+accept pair)
2486 let peer_did = record_id.did.clone().into_static();
2487 if !collaborators.contains(&peer_did) {
2488 tracing::debug!(
2489 peer = %peer_did,
2490 "Filtering out unauthorized session peer"
2491 );
2492 continue;
2493 }
2494
2495 peers.push(SessionPeer {
2496 did: record_id.did.into_static(),
2497 node_id: session_record.value.node_id.as_ref().into(),
2498 relay_url: session_record.value.relay_url.map(|u| u.as_ref().into()),
2499 created_at: session_record.value.created_at,
2500 expires_at: session_record.value.expires_at,
2501 });
2502 }
2503
2504 Ok(peers)
2505 }
2506 }
2507
2508 /// Find contributors (authors) for a resource based on evidence.
2509 ///
2510 /// Contributors are DIDs who have actually contributed to this resource:
2511 /// 1. Edit records (edit.root or edit.diff) referencing this resource
2512 /// 2. Published versions of the record in their repo (same rkey)
2513 ///
2514 /// This is separate from permissions - you can have edit permission without
2515 /// having contributed yet.
2516 fn find_contributors_for_resource(
2517 &self,
2518 resource_uri: &AtUri<'_>,
2519 ) -> impl Future<Output = Result<Vec<Did<'static>>, WeaverError>>
2520 where
2521 Self: Sized,
2522 {
2523 async move {
2524 const EDIT_ROOT_NSID: &str = "sh.weaver.edit.root";
2525
2526 let constellation_url = Url::parse(CONSTELLATION_URL).map_err(|e| {
2527 AgentError::from(ClientError::invalid_request(format!(
2528 "Invalid constellation URL: {}",
2529 e
2530 )))
2531 })?;
2532
2533 let mut contributors = std::collections::HashSet::new();
2534
2535 // 1. Resource authority is always a contributor
2536 let authority_did = match resource_uri.authority() {
2537 jacquard::types::ident::AtIdentifier::Did(did) => did.clone().into_static(),
2538 jacquard::types::ident::AtIdentifier::Handle(handle) => {
2539 let (did, _) = self.pds_for_handle(handle).await.map_err(|e| {
2540 AgentError::from(
2541 ClientError::from(e).with_context("Failed to resolve handle"),
2542 )
2543 })?;
2544 did.into_static()
2545 }
2546 };
2547 contributors.insert(authority_did);
2548
2549 // 2. Find DIDs with edit records for this resource
2550 let edit_query = GetBacklinksQuery {
2551 subject: Uri::At(resource_uri.clone().into_static()),
2552 source: format!("{}:doc.value.entry.uri", EDIT_ROOT_NSID).into(),
2553 cursor: None,
2554 did: vec![],
2555 limit: 100,
2556 };
2557
2558 if let Ok(response) = self.xrpc(constellation_url.clone()).send(&edit_query).await {
2559 if let Ok(edit_output) = response.into_output() {
2560 for record_id in edit_output.records {
2561 contributors.insert(record_id.did.into_static());
2562 }
2563 }
2564 }
2565
2566 // 3. Find collaborators who have published versions (same rkey)
2567 let collaborators = self.find_collaborators_for_resource(resource_uri).await?;
2568 let rkey = resource_uri.rkey();
2569 let collection = resource_uri.collection();
2570
2571 if let (Some(rkey), Some(collection)) = (rkey, collection) {
2572 for collab_did in collaborators {
2573 // Try to fetch their version of the record via slingshot
2574 let collab_uri_str = format!(
2575 "at://{}/{}/{}",
2576 collab_did.as_ref(),
2577 collection,
2578 rkey.as_ref()
2579 );
2580 if let Ok(collab_uri) = AtUri::new(&collab_uri_str) {
2581 // Check if record actually exists
2582 if self.fetch_record_slingshot(&collab_uri).await.is_ok() {
2583 contributors.insert(collab_did);
2584 }
2585 }
2586 }
2587 }
2588
2589 Ok(contributors.into_iter().collect())
2590 }
2591 }
2592
2593 /// Fetch a blob from any PDS by DID and CID.
2594 ///
2595 /// Resolves the DID to find its PDS, then fetches the blob.
2596 fn fetch_blob<'a>(
2597 &'a self,
2598 did: &'a Did<'_>,
2599 cid: &'a jacquard::types::string::Cid<'_>,
2600 ) -> impl Future<Output = Result<Bytes, WeaverError>> + 'a {
2601 async move {
2602 use weaver_api::com_atproto::sync::get_blob::GetBlob;
2603
2604 let pds_url = self.pds_for_did(did).await.map_err(|e| {
2605 AgentError::from(ClientError::from(e).with_context("Failed to resolve PDS for DID"))
2606 })?;
2607
2608 let request = GetBlob::new().did(did.clone()).cid(cid.clone()).build();
2609
2610 let response = self
2611 .xrpc(pds_url)
2612 .send(&request)
2613 .await
2614 .map_err(|e| AgentError::from(ClientError::from(e)))?;
2615
2616 let output = response.into_output().map_err(|e| AgentError::xrpc(e))?;
2617
2618 Ok(output.body)
2619 }
2620 }
2621}
2622
2623/// A version of a record from a collaborator's repository.
2624#[derive(Debug, Clone)]
2625pub struct CollaboratorVersion<'a> {
2626 /// The DID of the collaborator who owns this version.
2627 pub did: Did<'a>,
2628 /// The full URI of this version.
2629 pub uri: AtUri<'a>,
2630 /// CID of this version.
2631 pub cid: jacquard::types::string::Cid<'a>,
2632 /// When this version was last updated.
2633 pub updated_at: Option<jacquard::types::string::Datetime>,
2634 /// The raw record value.
2635 pub value: jacquard::Data<'a>,
2636}
2637
2638/// Information about a peer discovered from session records.
2639#[derive(Debug, Clone)]
2640pub struct SessionPeer<'a> {
2641 /// The peer's DID.
2642 pub did: Did<'a>,
2643 /// The peer's iroh NodeId (z-base32 encoded).
2644 pub node_id: SmolStr,
2645 /// Optional relay URL for browser clients.
2646 pub relay_url: Option<SmolStr>,
2647 /// When the session was created.
2648 pub created_at: jacquard::types::string::Datetime,
2649 /// When the session expires (if set).
2650 pub expires_at: Option<jacquard::types::string::Datetime>,
2651}
2652
2653impl<T: AgentSession + IdentityResolver + XrpcExt> WeaverExt for T {}