atproto blogging
1//! sh.weaver.notebook.* endpoint handlers
2
3use std::collections::{HashMap, HashSet};
4
5use axum::{Json, extract::State};
6use jacquard::IntoStatic;
7use jacquard::cowstr::ToCowStr;
8use jacquard::types::string::{AtUri, Cid, Did, Handle, Uri};
9use jacquard::types::value::Data;
10use jacquard_axum::ExtractXrpc;
11use jacquard_axum::service_auth::ExtractOptionalServiceAuth;
12use smol_str::SmolStr;
13use weaver_api::com_atproto::repo::strong_ref::StrongRef;
14use weaver_api::sh_weaver::actor::{ProfileDataView, ProfileDataViewInner, ProfileView};
15use weaver_api::sh_weaver::notebook::{
16 AuthorListView, BookEntryRef, BookEntryView, EntryView, FeedEntryView, NotebookView,
17 get_book_entry::{GetBookEntryOutput, GetBookEntryRequest},
18 get_entry::{GetEntryOutput, GetEntryRequest},
19 get_entry_feed::{GetEntryFeedOutput, GetEntryFeedRequest},
20 get_entry_notebooks::{GetEntryNotebooksOutput, GetEntryNotebooksRequest, NotebookRef},
21 get_notebook::{GetNotebookOutput, GetNotebookRequest},
22 get_notebook_feed::{GetNotebookFeedOutput, GetNotebookFeedRequest},
23 resolve_entry::{ResolveEntryOutput, ResolveEntryRequest},
24 resolve_global_notebook::{ResolveGlobalNotebookOutput, ResolveGlobalNotebookRequest},
25 resolve_notebook::{ResolveNotebookOutput, ResolveNotebookRequest},
26};
27
28use crate::clickhouse::{EntryRow, ProfileRow};
29use crate::endpoints::actor::{Viewer, resolve_actor};
30use crate::endpoints::repo::XrpcErrorResponse;
31use crate::server::AppState;
32
33/// Handle sh.weaver.notebook.resolveNotebook
34///
35/// Resolves a notebook by actor + path/title, returns notebook with entries.
36pub async fn resolve_notebook(
37 State(state): State<AppState>,
38 ExtractOptionalServiceAuth(viewer): ExtractOptionalServiceAuth,
39 ExtractXrpc(args): ExtractXrpc<ResolveNotebookRequest>,
40) -> Result<Json<ResolveNotebookOutput<'static>>, XrpcErrorResponse> {
41 // viewer can be used later for viewer state (bookmarks, read status, etc.)
42 let _viewer: Viewer = viewer;
43
44 // Resolve actor to DID
45 let did = resolve_actor(&state, &args.actor).await?;
46 let did_str = did.as_str();
47 let name = args.name.as_ref();
48
49 let limit = args.entry_limit.unwrap_or(50).clamp(1, 100) as u32;
50 let cursor: Option<u32> = args.entry_cursor.as_deref().and_then(|c| c.parse().ok());
51
52 // Fetch notebook first to get its rkey
53 let notebook_row = state
54 .clickhouse
55 .resolve_notebook(did_str, name)
56 .await
57 .map_err(|e| {
58 tracing::error!("Failed to resolve notebook: {}", e);
59 XrpcErrorResponse::internal_error("Database query failed")
60 })?
61 .ok_or_else(|| XrpcErrorResponse::not_found("Notebook not found"))?;
62
63 // Now fetch entries using notebook's rkey
64 let entry_rows = state
65 .clickhouse
66 .list_notebook_entries(did_str, ¬ebook_row.rkey, limit + 1, cursor)
67 .await
68 .map_err(|e| {
69 tracing::error!("Failed to list entries: {}", e);
70 XrpcErrorResponse::internal_error("Database query failed")
71 })?;
72
73 // Fetch notebook contributors (evidence-based)
74 let notebook_contributors = state
75 .clickhouse
76 .get_notebook_contributors(did_str, ¬ebook_row.rkey)
77 .await
78 .map_err(|e| {
79 tracing::error!("Failed to get notebook contributors: {}", e);
80 XrpcErrorResponse::internal_error("Database query failed")
81 })?;
82
83 // Check if there are more entries
84 let has_more = entry_rows.len() > limit as usize;
85 let entry_rows: Vec<_> = entry_rows.into_iter().take(limit as usize).collect();
86
87 // Collect all unique author DIDs for batch hydration
88 // Start with evidence-based notebook contributors
89 let mut all_author_dids: HashSet<&str> =
90 notebook_contributors.iter().map(|s| s.as_str()).collect();
91 // Also include author_dids from the record (explicit declarations)
92 for did in ¬ebook_row.author_dids {
93 all_author_dids.insert(did.as_str());
94 }
95 for entry in &entry_rows {
96 for did in &entry.author_dids {
97 all_author_dids.insert(did.as_str());
98 }
99 }
100
101 // Batch fetch profiles
102 let author_dids_vec: Vec<&str> = all_author_dids.into_iter().collect();
103 let profiles = state
104 .clickhouse
105 .get_profiles_batch(&author_dids_vec)
106 .await
107 .map_err(|e| {
108 tracing::error!("Failed to batch fetch profiles: {}", e);
109 XrpcErrorResponse::internal_error("Database query failed")
110 })?;
111
112 // Build lookup map
113 let profile_map: HashMap<&str, &ProfileRow> =
114 profiles.iter().map(|p| (p.did.as_str(), p)).collect();
115
116 // Build NotebookView
117 let notebook_uri = AtUri::new(¬ebook_row.uri).map_err(|e| {
118 tracing::error!("Invalid notebook URI in db: {}", e);
119 XrpcErrorResponse::internal_error("Invalid URI stored")
120 })?;
121
122 let notebook_cid = Cid::new(notebook_row.cid.as_bytes()).map_err(|e| {
123 tracing::error!("Invalid notebook CID in db: {}", e);
124 XrpcErrorResponse::internal_error("Invalid CID stored")
125 })?;
126
127 // Hydrate notebook authors (evidence-based contributors)
128 let authors = hydrate_authors(¬ebook_contributors, &profile_map)?;
129
130 // Parse record JSON
131 let record = parse_record_json(¬ebook_row.record)?;
132
133 let notebook = NotebookView::new()
134 .uri(notebook_uri.into_static())
135 .cid(notebook_cid.into_static())
136 .authors(authors)
137 .record(record)
138 .indexed_at(notebook_row.indexed_at.fixed_offset())
139 .maybe_title(non_empty_cowstr(¬ebook_row.title))
140 .maybe_path(non_empty_cowstr(¬ebook_row.path))
141 .build();
142
143 // Build entry views (first pass: create EntryViews)
144 let mut entry_views: Vec<EntryView<'static>> = Vec::with_capacity(entry_rows.len());
145 for entry_row in entry_rows.iter() {
146 let entry_uri = AtUri::new(&entry_row.uri).map_err(|e| {
147 tracing::error!("Invalid entry URI in db: {}", e);
148 XrpcErrorResponse::internal_error("Invalid URI stored")
149 })?;
150
151 let entry_cid = Cid::new(entry_row.cid.as_bytes()).map_err(|e| {
152 tracing::error!("Invalid entry CID in db: {}", e);
153 XrpcErrorResponse::internal_error("Invalid CID stored")
154 })?;
155
156 let entry_contributors = state
157 .clickhouse
158 .get_entry_contributors(did_str, &entry_row.rkey)
159 .await
160 .map_err(|e| {
161 tracing::error!("Failed to get entry contributors: {}", e);
162 XrpcErrorResponse::internal_error("Database query failed")
163 })?;
164
165 let mut all_author_dids: HashSet<SmolStr> = entry_contributors.iter().cloned().collect();
166 // Also include author_dids from the record (explicit declarations)
167 for did in &entry_row.author_dids {
168 all_author_dids.insert(did.clone());
169 }
170
171 let author_dids_vec: Vec<SmolStr> = all_author_dids.into_iter().collect();
172
173 // Hydrate entry authors
174 let entry_authors = hydrate_authors(&author_dids_vec, &profile_map)?;
175
176 // Parse record JSON
177 let entry_record = parse_record_json(&entry_row.record)?;
178
179 let entry_view = EntryView::new()
180 .uri(entry_uri.into_static())
181 .cid(entry_cid.into_static())
182 .authors(entry_authors)
183 .record(entry_record)
184 .indexed_at(entry_row.indexed_at.fixed_offset())
185 .maybe_title(non_empty_cowstr(&entry_row.title))
186 .maybe_path(non_empty_cowstr(&entry_row.path))
187 .build();
188
189 entry_views.push(entry_view);
190 }
191
192 // Build BookEntryViews with prev/next navigation
193 let mut entries: Vec<BookEntryView<'static>> = Vec::with_capacity(entry_views.len());
194 for (idx, entry_view) in entry_views.iter().enumerate() {
195 let prev = (idx > 0).then(|| {
196 BookEntryRef::new()
197 .entry(entry_views[idx - 1].clone())
198 .build()
199 });
200 let next = entry_views
201 .get(idx + 1)
202 .map(|e| BookEntryRef::new().entry(e.clone()).build());
203
204 entries.push(
205 BookEntryView::new()
206 .entry(entry_view.clone())
207 .index(idx as i64)
208 .maybe_prev(prev)
209 .maybe_next(next)
210 .build(),
211 );
212 }
213
214 // Build cursor for pagination (position-based)
215 let next_cursor = if has_more {
216 // Position = cursor offset + number of entries returned
217 let last_position = cursor.unwrap_or(0) + entry_rows.len() as u32;
218 Some(last_position.to_string().into())
219 } else {
220 None
221 };
222
223 Ok(Json(
224 ResolveNotebookOutput {
225 notebook,
226 entries,
227 entry_cursor: next_cursor,
228 extra_data: None,
229 }
230 .into_static(),
231 ))
232}
233
234/// Handle sh.weaver.notebook.getNotebook
235///
236/// Gets a notebook by AT URI, returns notebook view with entry refs.
237pub async fn get_notebook(
238 State(state): State<AppState>,
239 ExtractOptionalServiceAuth(viewer): ExtractOptionalServiceAuth,
240 ExtractXrpc(args): ExtractXrpc<GetNotebookRequest>,
241) -> Result<Json<GetNotebookOutput<'static>>, XrpcErrorResponse> {
242 let _viewer: Viewer = viewer;
243
244 // Parse the AT URI to extract authority and rkey
245 let uri = &args.notebook;
246 let authority = uri.authority();
247 let rkey = uri
248 .rkey()
249 .ok_or_else(|| XrpcErrorResponse::invalid_request("URI must include rkey"))?;
250 let rkey_str = rkey.as_ref();
251
252 // Resolve authority to DID (could be handle or DID)
253 let did = resolve_actor(&state, authority).await?;
254 let did_str = did.as_str();
255
256 // Fetch notebook by DID + rkey
257 let notebook_row = state
258 .clickhouse
259 .get_notebook(did_str, rkey_str)
260 .await
261 .map_err(|e| {
262 tracing::error!("Failed to get notebook: {}", e);
263 XrpcErrorResponse::internal_error("Database query failed")
264 })?
265 .ok_or_else(|| XrpcErrorResponse::not_found("Notebook not found"))?;
266
267 // Fetch notebook contributors
268 let notebook_contributors = state
269 .clickhouse
270 .get_notebook_contributors(did_str, rkey_str)
271 .await
272 .map_err(|e| {
273 tracing::error!("Failed to get notebook contributors: {}", e);
274 XrpcErrorResponse::internal_error("Database query failed")
275 })?;
276
277 // Collect all author DIDs for batch hydration
278 let mut all_author_dids: HashSet<&str> =
279 notebook_contributors.iter().map(|s| s.as_str()).collect();
280 for did in ¬ebook_row.author_dids {
281 all_author_dids.insert(did.as_str());
282 }
283
284 // Batch fetch profiles
285 let author_dids_vec: Vec<&str> = all_author_dids.into_iter().collect();
286 let profiles = state
287 .clickhouse
288 .get_profiles_batch(&author_dids_vec)
289 .await
290 .map_err(|e| {
291 tracing::error!("Failed to batch fetch profiles: {}", e);
292 XrpcErrorResponse::internal_error("Database query failed")
293 })?;
294
295 let profile_map: HashMap<&str, &ProfileRow> =
296 profiles.iter().map(|p| (p.did.as_str(), p)).collect();
297
298 // Build NotebookView
299 let notebook_uri = AtUri::new(¬ebook_row.uri).map_err(|e| {
300 tracing::error!("Invalid notebook URI in db: {}", e);
301 XrpcErrorResponse::internal_error("Invalid URI stored")
302 })?;
303
304 let notebook_cid = Cid::new(notebook_row.cid.as_bytes()).map_err(|e| {
305 tracing::error!("Invalid notebook CID in db: {}", e);
306 XrpcErrorResponse::internal_error("Invalid CID stored")
307 })?;
308
309 let authors = hydrate_authors(¬ebook_contributors, &profile_map)?;
310 let record = parse_record_json(¬ebook_row.record)?;
311
312 let notebook = NotebookView::new()
313 .uri(notebook_uri.into_static())
314 .cid(notebook_cid.into_static())
315 .authors(authors)
316 .record(record.clone())
317 .indexed_at(notebook_row.indexed_at.fixed_offset())
318 .maybe_title(non_empty_cowstr(¬ebook_row.title))
319 .maybe_path(non_empty_cowstr(¬ebook_row.path))
320 .build();
321
322 // Deserialize Book from record to get entry_list
323 let book: weaver_api::sh_weaver::notebook::book::Book =
324 jacquard::from_data(&record).map_err(|e| {
325 tracing::error!("Failed to deserialize Book record: {}", e);
326 XrpcErrorResponse::internal_error("Invalid Book record")
327 })?;
328
329 let entries: Vec<StrongRef<'static>> = book
330 .entry_list
331 .into_iter()
332 .map(|r| r.into_static())
333 .collect();
334
335 Ok(Json(
336 GetNotebookOutput {
337 notebook,
338 entries,
339 extra_data: None,
340 }
341 .into_static(),
342 ))
343}
344
345/// Handle sh.weaver.notebook.getEntry
346///
347/// Gets an entry by AT URI.
348pub async fn get_entry(
349 State(state): State<AppState>,
350 ExtractOptionalServiceAuth(viewer): ExtractOptionalServiceAuth,
351 ExtractXrpc(args): ExtractXrpc<GetEntryRequest>,
352) -> Result<Json<GetEntryOutput<'static>>, XrpcErrorResponse> {
353 let _viewer: Viewer = viewer;
354
355 // Parse the AT URI to extract authority and rkey
356 let uri = &args.uri;
357 let authority = uri.authority();
358 let rkey = uri
359 .rkey()
360 .ok_or_else(|| XrpcErrorResponse::invalid_request("URI must include rkey"))?;
361 let rkey_str = rkey.as_ref();
362
363 // Resolve authority to DID (could be handle or DID)
364 let did = resolve_actor(&state, authority).await?;
365 let did_str = did.as_str();
366
367 // Fetch entry and contributors in parallel
368 let contributors_result = state
369 .clickhouse
370 .get_entry_contributors(did_str, rkey_str)
371 .await
372 .map_err(|e| {
373 tracing::error!("Failed to get contributors: {}", e);
374 XrpcErrorResponse::internal_error("Database query failed")
375 })?;
376 // Merge contributors with author_dids from record (dedupe)
377 let mut all_author_dids: HashSet<&str> =
378 contributors_result.iter().map(|s| s.as_str()).collect();
379
380 let entry_result = state
381 .clickhouse
382 .get_entry(
383 rkey_str,
384 &all_author_dids.iter().cloned().collect::<Vec<_>>(),
385 )
386 .await
387 .map_err(|e| {
388 tracing::error!("Failed to get entry: {}", e);
389 XrpcErrorResponse::internal_error("Database query failed")
390 })?;
391 let entry_row = entry_result.ok_or_else(|| XrpcErrorResponse::not_found("Entry not found"))?;
392
393 for did in &entry_row.author_dids {
394 all_author_dids.insert(did.as_str());
395 }
396
397 // Fetch profiles for all authors
398 let author_dids_vec: Vec<&str> = all_author_dids.into_iter().collect();
399 let profiles = state
400 .clickhouse
401 .get_profiles_batch(&author_dids_vec)
402 .await
403 .map_err(|e| {
404 tracing::error!("Failed to fetch profiles: {}", e);
405 XrpcErrorResponse::internal_error("Database query failed")
406 })?;
407
408 let profile_map: HashMap<&str, &ProfileRow> =
409 profiles.iter().map(|p| (p.did.as_str(), p)).collect();
410
411 // Build EntryView - use contributors as the author list (evidence-based)
412 let entry_view = build_entry_view_with_authors(&entry_row, &contributors_result, &profile_map)?;
413
414 Ok(Json(
415 GetEntryOutput {
416 value: entry_view,
417 extra_data: None,
418 }
419 .into_static(),
420 ))
421}
422
423/// Handle sh.weaver.notebook.resolveEntry
424///
425/// Resolves an entry by actor + notebook name + entry name.
426pub async fn resolve_entry(
427 State(state): State<AppState>,
428 ExtractOptionalServiceAuth(viewer): ExtractOptionalServiceAuth,
429 ExtractXrpc(args): ExtractXrpc<ResolveEntryRequest>,
430) -> Result<Json<ResolveEntryOutput<'static>>, XrpcErrorResponse> {
431 let _viewer: Viewer = viewer;
432
433 // Resolve actor to DID
434 let did = resolve_actor(&state, &args.actor).await?;
435 let did_str = did.as_str();
436
437 // Resolve notebook and entry in parallel - both just need the DID
438 let notebook_name = args.notebook.as_ref();
439 let entry_name = args.entry.as_ref();
440
441 let (notebook_result, entry_result) = tokio::try_join!(
442 async {
443 state
444 .clickhouse
445 .resolve_notebook(did_str, notebook_name)
446 .await
447 .map_err(|e| {
448 tracing::error!("Failed to resolve notebook: {}", e);
449 XrpcErrorResponse::internal_error("Database query failed")
450 })
451 },
452 // TODO: fix this, as we do need the entries to know for sure which, in case of collisions
453 async {
454 state
455 .clickhouse
456 .resolve_entry(did_str, entry_name)
457 .await
458 .map_err(|e| {
459 tracing::error!("Failed to resolve entry: {}", e);
460 XrpcErrorResponse::internal_error("Database query failed")
461 })
462 }
463 )?;
464
465 let _notebook_row =
466 notebook_result.ok_or_else(|| XrpcErrorResponse::not_found("Notebook not found"))?;
467 let entry_row = entry_result.ok_or_else(|| XrpcErrorResponse::not_found("Entry not found"))?;
468
469 // Fetch contributors and notebooks in parallel (need entry rkey, so must wait for entry resolution)
470 let (contributors, notebooks) = tokio::try_join!(
471 async {
472 state
473 .clickhouse
474 .get_entry_contributors(did_str, &entry_row.rkey)
475 .await
476 .map_err(|e| {
477 tracing::error!("Failed to get contributors: {}", e);
478 XrpcErrorResponse::internal_error("Database query failed")
479 })
480 },
481 async {
482 state
483 .clickhouse
484 .get_notebooks_for_entry(did_str, &entry_row.rkey)
485 .await
486 .map_err(|e| {
487 tracing::error!("Failed to get notebooks for entry: {}", e);
488 XrpcErrorResponse::internal_error("Database query failed")
489 })
490 }
491 )?;
492
493 // Merge contributors with author_dids from record (dedupe)
494 let mut all_author_dids: HashSet<&str> = contributors.iter().map(|s| s.as_str()).collect();
495 for did in &entry_row.author_dids {
496 all_author_dids.insert(did.as_str());
497 }
498
499 // Fetch profiles for all authors
500 let author_dids_vec: Vec<&str> = all_author_dids.into_iter().collect();
501 let profiles = state
502 .clickhouse
503 .get_profiles_batch(&author_dids_vec)
504 .await
505 .map_err(|e| {
506 tracing::error!("Failed to fetch profiles: {}", e);
507 XrpcErrorResponse::internal_error("Database query failed")
508 })?;
509
510 let profile_map: HashMap<&str, &ProfileRow> =
511 profiles.iter().map(|p| (p.did.as_str(), p)).collect();
512
513 // Build EntryView - use contributors as the author list (evidence-based)
514 let entry_view = build_entry_view_with_authors(&entry_row, &contributors, &profile_map)?;
515
516 // Parse the record for the output
517 let record = parse_record_json(&entry_row.record)?;
518
519 // Actual count of notebooks containing this entry
520 let notebook_count = notebooks.len() as i64;
521
522 Ok(Json(
523 ResolveEntryOutput {
524 entry: entry_view,
525 notebook_count,
526 notebooks: None,
527 record,
528 extra_data: None,
529 }
530 .into_static(),
531 ))
532}
533
534/// Build an EntryView from an EntryRow with explicit author list (evidence-based contributors)
535pub fn build_entry_view_with_authors(
536 entry_row: &crate::clickhouse::EntryRow,
537 author_dids: &[SmolStr],
538 profile_map: &HashMap<&str, &ProfileRow>,
539) -> Result<EntryView<'static>, XrpcErrorResponse> {
540 let entry_uri = AtUri::new(&entry_row.uri).map_err(|e| {
541 tracing::error!("Invalid entry URI in db: {}", e);
542 XrpcErrorResponse::internal_error("Invalid URI stored")
543 })?;
544
545 let entry_cid = Cid::new(entry_row.cid.as_bytes()).map_err(|e| {
546 tracing::error!("Invalid entry CID in db: {}", e);
547 XrpcErrorResponse::internal_error("Invalid CID stored")
548 })?;
549
550 let authors = hydrate_authors(author_dids, profile_map)?;
551 let record = parse_record_json(&entry_row.record)?;
552
553 let entry_view = EntryView::new()
554 .uri(entry_uri.into_static())
555 .cid(entry_cid.into_static())
556 .authors(authors)
557 .record(record)
558 .indexed_at(entry_row.indexed_at.fixed_offset())
559 .maybe_title(non_empty_cowstr(&entry_row.title))
560 .maybe_path(non_empty_cowstr(&entry_row.path))
561 .build();
562
563 Ok(entry_view)
564}
565
566/// Convert SmolStr to Option<CowStr> if non-empty
567pub fn non_empty_cowstr(s: &smol_str::SmolStr) -> Option<jacquard::CowStr<'static>> {
568 if s.is_empty() {
569 None
570 } else {
571 Some(s.to_cowstr().into_static())
572 }
573}
574
575/// Parse record JSON string into owned Data
576pub fn parse_record_json(json: &str) -> Result<Data<'static>, XrpcErrorResponse> {
577 let data: Data<'_> = serde_json::from_str(json).map_err(|e| {
578 tracing::error!("Failed to parse record JSON: {}", e);
579 XrpcErrorResponse::internal_error("Invalid record JSON stored")
580 })?;
581 Ok(data.into_static())
582}
583
584/// Hydrate author list from DIDs using profile map
585pub fn hydrate_authors(
586 author_dids: &[SmolStr],
587 profile_map: &HashMap<&str, &ProfileRow>,
588) -> Result<Vec<AuthorListView<'static>>, XrpcErrorResponse> {
589 let mut authors = Vec::with_capacity(author_dids.len());
590
591 for (idx, did_str) in author_dids.iter().enumerate() {
592 let profile_data = if let Some(profile) = profile_map.get(did_str.as_str()) {
593 profile_to_data_view(profile)?
594 } else {
595 // No profile found - create minimal view with just the DID
596 let did = Did::new(did_str).map_err(|e| {
597 tracing::error!("Invalid DID in author_dids: {}", e);
598 XrpcErrorResponse::internal_error("Invalid DID stored")
599 })?;
600
601 let inner_profile = ProfileView::new()
602 .did(did.into_static())
603 .handle(
604 Handle::new(did_str)
605 .unwrap_or_else(|_| Handle::new("unknown.invalid").unwrap()),
606 )
607 .build();
608
609 ProfileDataView::new()
610 .inner(ProfileDataViewInner::ProfileView(Box::new(inner_profile)))
611 .build()
612 };
613
614 let author_view = AuthorListView::new()
615 .index(idx as i64)
616 .record(profile_data.into_static())
617 .build();
618
619 authors.push(author_view);
620 }
621
622 Ok(authors)
623}
624
625/// Convert ProfileRow to ProfileDataView
626pub fn profile_to_data_view(
627 profile: &ProfileRow,
628) -> Result<ProfileDataView<'static>, XrpcErrorResponse> {
629 let did = Did::new(&profile.did).map_err(|e| {
630 tracing::error!("Invalid DID in profile: {}", e);
631 XrpcErrorResponse::internal_error("Invalid DID stored")
632 })?;
633
634 let handle = if profile.handle.is_empty() {
635 // Use DID as fallback handle (not ideal but functional)
636 Handle::new(&profile.did).unwrap_or_else(|_| Handle::new("unknown.invalid").unwrap())
637 } else {
638 Handle::new(&profile.handle).map_err(|e| {
639 tracing::error!("Invalid handle in profile: {}", e);
640 XrpcErrorResponse::internal_error("Invalid handle stored")
641 })?
642 };
643
644 // Build avatar URL from CID if present
645 let avatar = if !profile.avatar_cid.is_empty() {
646 let url = format!(
647 "https://cdn.bsky.app/img/avatar/plain/{}/{}@jpeg",
648 profile.did, profile.avatar_cid
649 );
650 Uri::new_owned(url).ok()
651 } else {
652 None
653 };
654
655 // Build banner URL from CID if present
656 let banner = if !profile.banner_cid.is_empty() {
657 let url = format!(
658 "https://cdn.bsky.app/img/banner/plain/{}/{}@jpeg",
659 profile.did, profile.banner_cid
660 );
661 Uri::new_owned(url).ok()
662 } else {
663 None
664 };
665
666 let inner_profile = ProfileView::new()
667 .did(did.into_static())
668 .handle(handle.into_static())
669 .maybe_display_name(non_empty_cowstr(&profile.display_name))
670 .maybe_description(non_empty_cowstr(&profile.description))
671 .maybe_avatar(avatar)
672 .maybe_banner(banner)
673 .build();
674
675 let profile_data = ProfileDataView::new()
676 .inner(ProfileDataViewInner::ProfileView(Box::new(inner_profile)))
677 .build();
678
679 Ok(profile_data)
680}
681
682/// Parse cursor string to i64 timestamp millis
683pub fn parse_cursor(cursor: Option<&str>) -> Result<Option<i64>, XrpcErrorResponse> {
684 cursor
685 .map(|c| {
686 c.parse::<i64>()
687 .map_err(|_| XrpcErrorResponse::invalid_request("Invalid cursor format"))
688 })
689 .transpose()
690}
691
692/// Handle sh.weaver.notebook.getNotebookFeed
693///
694/// Returns a global feed of notebooks.
695pub async fn get_notebook_feed(
696 State(state): State<AppState>,
697 ExtractOptionalServiceAuth(viewer): ExtractOptionalServiceAuth,
698 ExtractXrpc(args): ExtractXrpc<GetNotebookFeedRequest>,
699) -> Result<Json<GetNotebookFeedOutput<'static>>, XrpcErrorResponse> {
700 let _viewer: Viewer = viewer;
701
702 let limit = args.limit.unwrap_or(50).clamp(1, 100) as u32;
703 let cursor = parse_cursor(args.cursor.as_deref())?;
704 let algorithm = args.algorithm.as_deref().unwrap_or("chronological");
705
706 // Convert tags to &[&str] if present
707 let tags_vec: Vec<&str> = args
708 .tags
709 .as_ref()
710 .map(|t| t.iter().map(|s| s.as_ref()).collect())
711 .unwrap_or_default();
712 let tags = if tags_vec.is_empty() {
713 None
714 } else {
715 Some(tags_vec.as_slice())
716 };
717
718 let notebook_rows = state
719 .clickhouse
720 .get_notebook_feed(algorithm, tags, limit + 1, cursor)
721 .await
722 .map_err(|e| {
723 tracing::error!("Failed to get notebook feed: {}", e);
724 XrpcErrorResponse::internal_error("Database query failed")
725 })?;
726
727 // Check if there are more
728 let has_more = notebook_rows.len() > limit as usize;
729 let notebook_rows: Vec<_> = notebook_rows.into_iter().take(limit as usize).collect();
730
731 // Collect author DIDs for hydration
732 let mut all_author_dids: HashSet<&str> = HashSet::new();
733 for nb in ¬ebook_rows {
734 for did in &nb.author_dids {
735 all_author_dids.insert(did.as_str());
736 }
737 }
738
739 // Batch fetch profiles
740 let author_dids_vec: Vec<&str> = all_author_dids.into_iter().collect();
741 let profiles = state
742 .clickhouse
743 .get_profiles_batch(&author_dids_vec)
744 .await
745 .map_err(|e| {
746 tracing::error!("Failed to batch fetch profiles: {}", e);
747 XrpcErrorResponse::internal_error("Database query failed")
748 })?;
749
750 let profile_map: HashMap<&str, &ProfileRow> =
751 profiles.iter().map(|p| (p.did.as_str(), p)).collect();
752
753 // Build NotebookViews
754 let mut notebooks: Vec<NotebookView<'static>> = Vec::with_capacity(notebook_rows.len());
755 for nb_row in ¬ebook_rows {
756 let notebook_uri = AtUri::new(&nb_row.uri).map_err(|e| {
757 tracing::error!("Invalid notebook URI in db: {}", e);
758 XrpcErrorResponse::internal_error("Invalid URI stored")
759 })?;
760
761 let notebook_cid = Cid::new(nb_row.cid.as_bytes()).map_err(|e| {
762 tracing::error!("Invalid notebook CID in db: {}", e);
763 XrpcErrorResponse::internal_error("Invalid CID stored")
764 })?;
765
766 let authors = hydrate_authors(&nb_row.author_dids, &profile_map)?;
767 let record = parse_record_json(&nb_row.record)?;
768
769 let notebook = NotebookView::new()
770 .uri(notebook_uri.into_static())
771 .cid(notebook_cid.into_static())
772 .authors(authors)
773 .record(record)
774 .indexed_at(nb_row.indexed_at.fixed_offset())
775 .maybe_title(non_empty_cowstr(&nb_row.title))
776 .maybe_path(non_empty_cowstr(&nb_row.path))
777 .build();
778
779 notebooks.push(notebook);
780 }
781
782 // Build cursor for pagination (created_at millis)
783 let next_cursor = if has_more {
784 notebook_rows
785 .last()
786 .map(|nb| nb.created_at.timestamp_millis().to_cowstr().into_static())
787 } else {
788 None
789 };
790
791 Ok(Json(
792 GetNotebookFeedOutput {
793 notebooks,
794 cursor: next_cursor,
795 extra_data: None,
796 }
797 .into_static(),
798 ))
799}
800
801/// Handle sh.weaver.notebook.getEntryFeed
802///
803/// Returns a global feed of entries.
804pub async fn get_entry_feed(
805 State(state): State<AppState>,
806 ExtractOptionalServiceAuth(viewer): ExtractOptionalServiceAuth,
807 ExtractXrpc(args): ExtractXrpc<GetEntryFeedRequest>,
808) -> Result<Json<GetEntryFeedOutput<'static>>, XrpcErrorResponse> {
809 let _viewer: Viewer = viewer;
810
811 let limit = args.limit.unwrap_or(50).clamp(1, 100) as u32;
812 let cursor = parse_cursor(args.cursor.as_deref())?;
813 let algorithm = args.algorithm.as_deref().unwrap_or("chronological");
814
815 // Convert tags to &[&str] if present
816 let tags_vec: Vec<&str> = args
817 .tags
818 .as_ref()
819 .map(|t| t.iter().map(|s| s.as_ref()).collect())
820 .unwrap_or_default();
821 let tags = if tags_vec.is_empty() {
822 None
823 } else {
824 Some(tags_vec.as_slice())
825 };
826
827 let entry_rows = state
828 .clickhouse
829 .get_entry_feed(algorithm, tags, limit + 1, cursor)
830 .await
831 .map_err(|e| {
832 tracing::error!("Failed to get entry feed: {}", e);
833 XrpcErrorResponse::internal_error("Database query failed")
834 })?;
835
836 // Check if there are more
837 let has_more = entry_rows.len() > limit as usize;
838 let entry_rows: Vec<_> = entry_rows.into_iter().take(limit as usize).collect();
839
840 // Batch fetch contributors for all entries
841 let entry_keys: Vec<(&str, &str)> = entry_rows
842 .iter()
843 .map(|e| (e.did.as_str(), e.rkey.as_str()))
844 .collect();
845 let contributors_map = state
846 .clickhouse
847 .get_entry_contributors_batch(&entry_keys)
848 .await
849 .map_err(|e| {
850 tracing::error!("Failed to batch fetch contributors: {}", e);
851 XrpcErrorResponse::internal_error("Database query failed")
852 })?;
853
854 // Collect all contributor DIDs for profile hydration
855 let mut all_author_dids: HashSet<&str> = HashSet::new();
856 for contributors in contributors_map.values() {
857 for did in contributors {
858 all_author_dids.insert(did.as_str());
859 }
860 }
861
862 // Batch fetch profiles
863 let author_dids_vec: Vec<&str> = all_author_dids.into_iter().collect();
864 let profiles = state
865 .clickhouse
866 .get_profiles_batch(&author_dids_vec)
867 .await
868 .map_err(|e| {
869 tracing::error!("Failed to batch fetch profiles: {}", e);
870 XrpcErrorResponse::internal_error("Database query failed")
871 })?;
872
873 let profile_map: HashMap<&str, &ProfileRow> =
874 profiles.iter().map(|p| (p.did.as_str(), p)).collect();
875
876 // Build FeedEntryViews
877 let mut feed: Vec<FeedEntryView<'static>> = Vec::with_capacity(entry_rows.len());
878 for entry_row in &entry_rows {
879 // Get contributors for this entry
880 let entry_key = (entry_row.did.clone(), entry_row.rkey.clone());
881 let contributors = contributors_map
882 .get(&entry_key)
883 .map(|v| v.as_slice())
884 .unwrap_or(&[]);
885
886 let entry_view = build_entry_view_with_authors(entry_row, contributors, &profile_map)?;
887
888 let feed_entry = FeedEntryView::new().entry(entry_view).build();
889
890 feed.push(feed_entry);
891 }
892
893 // Build cursor for pagination (created_at millis)
894 let next_cursor = if has_more {
895 entry_rows
896 .last()
897 .map(|e| e.created_at.timestamp_millis().to_cowstr().into_static())
898 } else {
899 None
900 };
901
902 Ok(Json(
903 GetEntryFeedOutput {
904 feed,
905 cursor: next_cursor,
906 extra_data: None,
907 }
908 .into_static(),
909 ))
910}
911
912/// Handle sh.weaver.notebook.getBookEntry
913///
914/// Returns an entry at a specific index within a notebook, with prev/next navigation.
915pub async fn get_book_entry(
916 State(state): State<AppState>,
917 ExtractOptionalServiceAuth(viewer): ExtractOptionalServiceAuth,
918 ExtractXrpc(args): ExtractXrpc<GetBookEntryRequest>,
919) -> Result<Json<GetBookEntryOutput<'static>>, XrpcErrorResponse> {
920 let _viewer: Viewer = viewer;
921
922 // Parse the notebook URI
923 let notebook_uri = &args.notebook;
924 let authority = notebook_uri.authority();
925 let notebook_rkey = notebook_uri
926 .rkey()
927 .ok_or_else(|| XrpcErrorResponse::invalid_request("Notebook URI must include rkey"))?;
928
929 // Resolve authority to DID
930 let notebook_did = resolve_actor(&state, authority).await?;
931 let notebook_did_str = notebook_did.as_str();
932 let notebook_rkey_str = notebook_rkey.as_ref();
933
934 let index = args.index.unwrap_or(0).max(0) as u32;
935
936 // Fetch entry at index with prev/next
937 let result = state
938 .clickhouse
939 .get_book_entry_at_index(notebook_did_str, notebook_rkey_str, index)
940 .await
941 .map_err(|e| {
942 tracing::error!("Failed to get book entry: {}", e);
943 XrpcErrorResponse::internal_error("Database query failed")
944 })?;
945
946 let (current_row, prev_row, next_row) =
947 result.ok_or_else(|| XrpcErrorResponse::not_found("Entry not found at index"))?;
948
949 // Collect all author DIDs for hydration
950 let mut all_author_dids: HashSet<&str> = HashSet::new();
951 for did in ¤t_row.author_dids {
952 all_author_dids.insert(did.as_str());
953 }
954 if let Some(ref prev) = prev_row {
955 for did in &prev.author_dids {
956 all_author_dids.insert(did.as_str());
957 }
958 }
959 if let Some(ref next) = next_row {
960 for did in &next.author_dids {
961 all_author_dids.insert(did.as_str());
962 }
963 }
964
965 // Batch fetch profiles
966 let author_dids_vec: Vec<&str> = all_author_dids.into_iter().collect();
967 let profiles = state
968 .clickhouse
969 .get_profiles_batch(&author_dids_vec)
970 .await
971 .map_err(|e| {
972 tracing::error!("Failed to fetch profiles: {}", e);
973 XrpcErrorResponse::internal_error("Database query failed")
974 })?;
975
976 let profile_map: HashMap<&str, &ProfileRow> =
977 profiles.iter().map(|p| (p.did.as_str(), p)).collect();
978
979 // Build the current entry view
980 let entry_view = build_entry_view(¤t_row, &profile_map)?;
981
982 // Build prev/next refs if present
983 let prev_ref = if let Some(ref prev) = prev_row {
984 let prev_view = build_entry_view(prev, &profile_map)?;
985 Some(BookEntryRef::new().entry(prev_view).build())
986 } else {
987 None
988 };
989
990 let next_ref = if let Some(ref next) = next_row {
991 let next_view = build_entry_view(next, &profile_map)?;
992 Some(BookEntryRef::new().entry(next_view).build())
993 } else {
994 None
995 };
996
997 let book_entry = BookEntryView::new()
998 .entry(entry_view)
999 .index(index as i64)
1000 .maybe_prev(prev_ref)
1001 .maybe_next(next_ref)
1002 .build();
1003
1004 Ok(Json(
1005 GetBookEntryOutput {
1006 value: book_entry,
1007 extra_data: None,
1008 }
1009 .into_static(),
1010 ))
1011}
1012
1013/// Build an EntryView from an EntryRow
1014pub fn build_entry_view(
1015 entry_row: &EntryRow,
1016 profile_map: &HashMap<&str, &ProfileRow>,
1017) -> Result<EntryView<'static>, XrpcErrorResponse> {
1018 let entry_uri = AtUri::new(&entry_row.uri).map_err(|e| {
1019 tracing::error!("Invalid entry URI in db: {}", e);
1020 XrpcErrorResponse::internal_error("Invalid URI stored")
1021 })?;
1022
1023 let entry_cid = Cid::new(entry_row.cid.as_bytes()).map_err(|e| {
1024 tracing::error!("Invalid entry CID in db: {}", e);
1025 XrpcErrorResponse::internal_error("Invalid CID stored")
1026 })?;
1027
1028 let authors = hydrate_authors(&entry_row.author_dids, profile_map)?;
1029 let record = parse_record_json(&entry_row.record)?;
1030
1031 let entry_view = EntryView::new()
1032 .uri(entry_uri.into_static())
1033 .cid(entry_cid.into_static())
1034 .authors(authors)
1035 .record(record)
1036 .indexed_at(entry_row.indexed_at.fixed_offset())
1037 .maybe_title(non_empty_cowstr(&entry_row.title))
1038 .maybe_path(non_empty_cowstr(&entry_row.path))
1039 .build();
1040
1041 Ok(entry_view)
1042}
1043
1044/// Handle sh.weaver.notebook.getEntryNotebooks
1045///
1046/// Returns notebooks that contain a given entry.
1047pub async fn get_entry_notebooks(
1048 State(state): State<AppState>,
1049 ExtractOptionalServiceAuth(viewer): ExtractOptionalServiceAuth,
1050 ExtractXrpc(args): ExtractXrpc<GetEntryNotebooksRequest>,
1051) -> Result<Json<GetEntryNotebooksOutput<'static>>, XrpcErrorResponse> {
1052 let _viewer: Viewer = viewer;
1053
1054 // Parse the entry URI
1055 let entry_uri = &args.entry;
1056 let authority = entry_uri.authority();
1057 let entry_rkey = entry_uri
1058 .rkey()
1059 .ok_or_else(|| XrpcErrorResponse::invalid_request("Entry URI must include rkey"))?;
1060
1061 // Resolve authority to DID
1062 let entry_did = resolve_actor(&state, authority).await?;
1063 let entry_did_str = entry_did.as_str();
1064 let entry_rkey_str = entry_rkey.as_ref();
1065
1066 // Get notebooks containing this entry
1067 let notebook_refs = state
1068 .clickhouse
1069 .get_notebooks_for_entry(entry_did_str, entry_rkey_str)
1070 .await
1071 .map_err(|e| {
1072 tracing::error!("Failed to get notebooks for entry: {}", e);
1073 XrpcErrorResponse::internal_error("Database query failed")
1074 })?;
1075
1076 if notebook_refs.is_empty() {
1077 return Ok(Json(
1078 GetEntryNotebooksOutput {
1079 notebooks: Vec::new(),
1080 extra_data: None,
1081 }
1082 .into_static(),
1083 ));
1084 }
1085
1086 // Fetch notebook details and owner profiles
1087 let mut notebooks = Vec::with_capacity(notebook_refs.len());
1088 let mut owner_dids: HashSet<&str> = HashSet::new();
1089
1090 // First pass: collect owner DIDs
1091 for (notebook_did, _notebook_rkey) in ¬ebook_refs {
1092 owner_dids.insert(notebook_did.as_str());
1093 }
1094
1095 // Batch fetch profiles
1096 let owner_dids_vec: Vec<&str> = owner_dids.into_iter().collect();
1097 let profiles = state
1098 .clickhouse
1099 .get_profiles_batch(&owner_dids_vec)
1100 .await
1101 .map_err(|e| {
1102 tracing::error!("Failed to batch fetch profiles: {}", e);
1103 XrpcErrorResponse::internal_error("Database query failed")
1104 })?;
1105
1106 let profile_map: HashMap<&str, &ProfileRow> =
1107 profiles.iter().map(|p| (p.did.as_str(), p)).collect();
1108
1109 // Fetch each notebook's details
1110 for (notebook_did, notebook_rkey) in ¬ebook_refs {
1111 let notebook_row = state
1112 .clickhouse
1113 .get_notebook(notebook_did.as_str(), notebook_rkey.as_str())
1114 .await
1115 .map_err(|e| {
1116 tracing::error!("Failed to get notebook: {}", e);
1117 XrpcErrorResponse::internal_error("Database query failed")
1118 })?;
1119
1120 if let Some(nb) = notebook_row {
1121 let uri = AtUri::new(&nb.uri)
1122 .map_err(|_| XrpcErrorResponse::internal_error("Invalid notebook URI"))?
1123 .into_static();
1124
1125 let cid = Cid::new(nb.cid.as_bytes())
1126 .map_err(|_| XrpcErrorResponse::internal_error("Invalid notebook CID"))?
1127 .into_static();
1128
1129 // Get owner profile
1130 let owner = profile_map
1131 .get(notebook_did.as_str())
1132 .map(|p| crate::endpoints::collab::profile_to_view_basic(p))
1133 .transpose()?;
1134
1135 notebooks.push(
1136 NotebookRef::new()
1137 .uri(uri)
1138 .cid(cid)
1139 .maybe_title(non_empty_cowstr(&nb.title))
1140 .maybe_owner(owner)
1141 .build(),
1142 );
1143 }
1144 }
1145
1146 Ok(Json(
1147 GetEntryNotebooksOutput {
1148 notebooks,
1149 extra_data: None,
1150 }
1151 .into_static(),
1152 ))
1153}
1154
1155/// Handle sh.weaver.notebook.resolveGlobalNotebook
1156///
1157/// Resolves a notebook by global path for subdomain routing.
1158pub async fn resolve_global_notebook(
1159 State(state): State<AppState>,
1160 ExtractXrpc(args): ExtractXrpc<ResolveGlobalNotebookRequest>,
1161) -> Result<Json<ResolveGlobalNotebookOutput<'static>>, XrpcErrorResponse> {
1162 let path = args.path.as_ref();
1163
1164 let notebook_row = state
1165 .clickhouse
1166 .resolve_notebook_by_global_path(path)
1167 .await
1168 .map_err(|e| {
1169 tracing::error!("Failed to resolve global notebook: {}", e);
1170 XrpcErrorResponse::internal_error("Database query failed")
1171 })?
1172 .ok_or_else(|| XrpcErrorResponse::not_found("Notebook not found"))?;
1173
1174 // Fetch contributors for author hydration
1175 let notebook_contributors = state
1176 .clickhouse
1177 .get_notebook_contributors(¬ebook_row.did, ¬ebook_row.rkey)
1178 .await
1179 .map_err(|e| {
1180 tracing::error!("Failed to get notebook contributors: {}", e);
1181 XrpcErrorResponse::internal_error("Database query failed")
1182 })?;
1183
1184 // Collect author DIDs
1185 let mut all_author_dids: HashSet<&str> =
1186 notebook_contributors.iter().map(|s| s.as_str()).collect();
1187 for did in ¬ebook_row.author_dids {
1188 all_author_dids.insert(did.as_str());
1189 }
1190
1191 // Batch fetch profiles
1192 let author_dids_vec: Vec<&str> = all_author_dids.into_iter().collect();
1193 let profiles = state
1194 .clickhouse
1195 .get_profiles_batch(&author_dids_vec)
1196 .await
1197 .map_err(|e| {
1198 tracing::error!("Failed to batch fetch profiles: {}", e);
1199 XrpcErrorResponse::internal_error("Database query failed")
1200 })?;
1201
1202 let profile_map: HashMap<&str, &ProfileRow> =
1203 profiles.iter().map(|p| (p.did.as_str(), p)).collect();
1204
1205 // Build NotebookView
1206 let notebook_uri = AtUri::new(¬ebook_row.uri).map_err(|e| {
1207 tracing::error!("Invalid notebook URI in db: {}", e);
1208 XrpcErrorResponse::internal_error("Invalid URI stored")
1209 })?;
1210
1211 let notebook_cid = Cid::new(notebook_row.cid.as_bytes()).map_err(|e| {
1212 tracing::error!("Invalid notebook CID in db: {}", e);
1213 XrpcErrorResponse::internal_error("Invalid CID stored")
1214 })?;
1215
1216 let authors = hydrate_authors(¬ebook_contributors, &profile_map)?;
1217 let record = parse_record_json(¬ebook_row.record)?;
1218
1219 let notebook = NotebookView::new()
1220 .uri(notebook_uri.into_static())
1221 .cid(notebook_cid.into_static())
1222 .authors(authors)
1223 .record(record)
1224 .indexed_at(notebook_row.indexed_at.fixed_offset())
1225 .maybe_title(non_empty_cowstr(¬ebook_row.title))
1226 .maybe_path(non_empty_cowstr(¬ebook_row.path))
1227 .build();
1228
1229 Ok(Json(
1230 ResolveGlobalNotebookOutput {
1231 notebook,
1232 extra_data: None,
1233 }
1234 .into_static(),
1235 ))
1236}