at main 1236 lines 43 kB view raw
1//! sh.weaver.notebook.* endpoint handlers 2 3use std::collections::{HashMap, HashSet}; 4 5use axum::{Json, extract::State}; 6use jacquard::IntoStatic; 7use jacquard::cowstr::ToCowStr; 8use jacquard::types::string::{AtUri, Cid, Did, Handle, Uri}; 9use jacquard::types::value::Data; 10use jacquard_axum::ExtractXrpc; 11use jacquard_axum::service_auth::ExtractOptionalServiceAuth; 12use smol_str::SmolStr; 13use weaver_api::com_atproto::repo::strong_ref::StrongRef; 14use weaver_api::sh_weaver::actor::{ProfileDataView, ProfileDataViewInner, ProfileView}; 15use weaver_api::sh_weaver::notebook::{ 16 AuthorListView, BookEntryRef, BookEntryView, EntryView, FeedEntryView, NotebookView, 17 get_book_entry::{GetBookEntryOutput, GetBookEntryRequest}, 18 get_entry::{GetEntryOutput, GetEntryRequest}, 19 get_entry_feed::{GetEntryFeedOutput, GetEntryFeedRequest}, 20 get_entry_notebooks::{GetEntryNotebooksOutput, GetEntryNotebooksRequest, NotebookRef}, 21 get_notebook::{GetNotebookOutput, GetNotebookRequest}, 22 get_notebook_feed::{GetNotebookFeedOutput, GetNotebookFeedRequest}, 23 resolve_entry::{ResolveEntryOutput, ResolveEntryRequest}, 24 resolve_global_notebook::{ResolveGlobalNotebookOutput, ResolveGlobalNotebookRequest}, 25 resolve_notebook::{ResolveNotebookOutput, ResolveNotebookRequest}, 26}; 27 28use crate::clickhouse::{EntryRow, ProfileRow}; 29use crate::endpoints::actor::{Viewer, resolve_actor}; 30use crate::endpoints::repo::XrpcErrorResponse; 31use crate::server::AppState; 32 33/// Handle sh.weaver.notebook.resolveNotebook 34/// 35/// Resolves a notebook by actor + path/title, returns notebook with entries. 36pub async fn resolve_notebook( 37 State(state): State<AppState>, 38 ExtractOptionalServiceAuth(viewer): ExtractOptionalServiceAuth, 39 ExtractXrpc(args): ExtractXrpc<ResolveNotebookRequest>, 40) -> Result<Json<ResolveNotebookOutput<'static>>, XrpcErrorResponse> { 41 // viewer can be used later for viewer state (bookmarks, read status, etc.) 42 let _viewer: Viewer = viewer; 43 44 // Resolve actor to DID 45 let did = resolve_actor(&state, &args.actor).await?; 46 let did_str = did.as_str(); 47 let name = args.name.as_ref(); 48 49 let limit = args.entry_limit.unwrap_or(50).clamp(1, 100) as u32; 50 let cursor: Option<u32> = args.entry_cursor.as_deref().and_then(|c| c.parse().ok()); 51 52 // Fetch notebook first to get its rkey 53 let notebook_row = state 54 .clickhouse 55 .resolve_notebook(did_str, name) 56 .await 57 .map_err(|e| { 58 tracing::error!("Failed to resolve notebook: {}", e); 59 XrpcErrorResponse::internal_error("Database query failed") 60 })? 61 .ok_or_else(|| XrpcErrorResponse::not_found("Notebook not found"))?; 62 63 // Now fetch entries using notebook's rkey 64 let entry_rows = state 65 .clickhouse 66 .list_notebook_entries(did_str, &notebook_row.rkey, limit + 1, cursor) 67 .await 68 .map_err(|e| { 69 tracing::error!("Failed to list entries: {}", e); 70 XrpcErrorResponse::internal_error("Database query failed") 71 })?; 72 73 // Fetch notebook contributors (evidence-based) 74 let notebook_contributors = state 75 .clickhouse 76 .get_notebook_contributors(did_str, &notebook_row.rkey) 77 .await 78 .map_err(|e| { 79 tracing::error!("Failed to get notebook contributors: {}", e); 80 XrpcErrorResponse::internal_error("Database query failed") 81 })?; 82 83 // Check if there are more entries 84 let has_more = entry_rows.len() > limit as usize; 85 let entry_rows: Vec<_> = entry_rows.into_iter().take(limit as usize).collect(); 86 87 // Collect all unique author DIDs for batch hydration 88 // Start with evidence-based notebook contributors 89 let mut all_author_dids: HashSet<&str> = 90 notebook_contributors.iter().map(|s| s.as_str()).collect(); 91 // Also include author_dids from the record (explicit declarations) 92 for did in &notebook_row.author_dids { 93 all_author_dids.insert(did.as_str()); 94 } 95 for entry in &entry_rows { 96 for did in &entry.author_dids { 97 all_author_dids.insert(did.as_str()); 98 } 99 } 100 101 // Batch fetch profiles 102 let author_dids_vec: Vec<&str> = all_author_dids.into_iter().collect(); 103 let profiles = state 104 .clickhouse 105 .get_profiles_batch(&author_dids_vec) 106 .await 107 .map_err(|e| { 108 tracing::error!("Failed to batch fetch profiles: {}", e); 109 XrpcErrorResponse::internal_error("Database query failed") 110 })?; 111 112 // Build lookup map 113 let profile_map: HashMap<&str, &ProfileRow> = 114 profiles.iter().map(|p| (p.did.as_str(), p)).collect(); 115 116 // Build NotebookView 117 let notebook_uri = AtUri::new(&notebook_row.uri).map_err(|e| { 118 tracing::error!("Invalid notebook URI in db: {}", e); 119 XrpcErrorResponse::internal_error("Invalid URI stored") 120 })?; 121 122 let notebook_cid = Cid::new(notebook_row.cid.as_bytes()).map_err(|e| { 123 tracing::error!("Invalid notebook CID in db: {}", e); 124 XrpcErrorResponse::internal_error("Invalid CID stored") 125 })?; 126 127 // Hydrate notebook authors (evidence-based contributors) 128 let authors = hydrate_authors(&notebook_contributors, &profile_map)?; 129 130 // Parse record JSON 131 let record = parse_record_json(&notebook_row.record)?; 132 133 let notebook = NotebookView::new() 134 .uri(notebook_uri.into_static()) 135 .cid(notebook_cid.into_static()) 136 .authors(authors) 137 .record(record) 138 .indexed_at(notebook_row.indexed_at.fixed_offset()) 139 .maybe_title(non_empty_cowstr(&notebook_row.title)) 140 .maybe_path(non_empty_cowstr(&notebook_row.path)) 141 .build(); 142 143 // Build entry views (first pass: create EntryViews) 144 let mut entry_views: Vec<EntryView<'static>> = Vec::with_capacity(entry_rows.len()); 145 for entry_row in entry_rows.iter() { 146 let entry_uri = AtUri::new(&entry_row.uri).map_err(|e| { 147 tracing::error!("Invalid entry URI in db: {}", e); 148 XrpcErrorResponse::internal_error("Invalid URI stored") 149 })?; 150 151 let entry_cid = Cid::new(entry_row.cid.as_bytes()).map_err(|e| { 152 tracing::error!("Invalid entry CID in db: {}", e); 153 XrpcErrorResponse::internal_error("Invalid CID stored") 154 })?; 155 156 let entry_contributors = state 157 .clickhouse 158 .get_entry_contributors(did_str, &entry_row.rkey) 159 .await 160 .map_err(|e| { 161 tracing::error!("Failed to get entry contributors: {}", e); 162 XrpcErrorResponse::internal_error("Database query failed") 163 })?; 164 165 let mut all_author_dids: HashSet<SmolStr> = entry_contributors.iter().cloned().collect(); 166 // Also include author_dids from the record (explicit declarations) 167 for did in &entry_row.author_dids { 168 all_author_dids.insert(did.clone()); 169 } 170 171 let author_dids_vec: Vec<SmolStr> = all_author_dids.into_iter().collect(); 172 173 // Hydrate entry authors 174 let entry_authors = hydrate_authors(&author_dids_vec, &profile_map)?; 175 176 // Parse record JSON 177 let entry_record = parse_record_json(&entry_row.record)?; 178 179 let entry_view = EntryView::new() 180 .uri(entry_uri.into_static()) 181 .cid(entry_cid.into_static()) 182 .authors(entry_authors) 183 .record(entry_record) 184 .indexed_at(entry_row.indexed_at.fixed_offset()) 185 .maybe_title(non_empty_cowstr(&entry_row.title)) 186 .maybe_path(non_empty_cowstr(&entry_row.path)) 187 .build(); 188 189 entry_views.push(entry_view); 190 } 191 192 // Build BookEntryViews with prev/next navigation 193 let mut entries: Vec<BookEntryView<'static>> = Vec::with_capacity(entry_views.len()); 194 for (idx, entry_view) in entry_views.iter().enumerate() { 195 let prev = (idx > 0).then(|| { 196 BookEntryRef::new() 197 .entry(entry_views[idx - 1].clone()) 198 .build() 199 }); 200 let next = entry_views 201 .get(idx + 1) 202 .map(|e| BookEntryRef::new().entry(e.clone()).build()); 203 204 entries.push( 205 BookEntryView::new() 206 .entry(entry_view.clone()) 207 .index(idx as i64) 208 .maybe_prev(prev) 209 .maybe_next(next) 210 .build(), 211 ); 212 } 213 214 // Build cursor for pagination (position-based) 215 let next_cursor = if has_more { 216 // Position = cursor offset + number of entries returned 217 let last_position = cursor.unwrap_or(0) + entry_rows.len() as u32; 218 Some(last_position.to_string().into()) 219 } else { 220 None 221 }; 222 223 Ok(Json( 224 ResolveNotebookOutput { 225 notebook, 226 entries, 227 entry_cursor: next_cursor, 228 extra_data: None, 229 } 230 .into_static(), 231 )) 232} 233 234/// Handle sh.weaver.notebook.getNotebook 235/// 236/// Gets a notebook by AT URI, returns notebook view with entry refs. 237pub async fn get_notebook( 238 State(state): State<AppState>, 239 ExtractOptionalServiceAuth(viewer): ExtractOptionalServiceAuth, 240 ExtractXrpc(args): ExtractXrpc<GetNotebookRequest>, 241) -> Result<Json<GetNotebookOutput<'static>>, XrpcErrorResponse> { 242 let _viewer: Viewer = viewer; 243 244 // Parse the AT URI to extract authority and rkey 245 let uri = &args.notebook; 246 let authority = uri.authority(); 247 let rkey = uri 248 .rkey() 249 .ok_or_else(|| XrpcErrorResponse::invalid_request("URI must include rkey"))?; 250 let rkey_str = rkey.as_ref(); 251 252 // Resolve authority to DID (could be handle or DID) 253 let did = resolve_actor(&state, authority).await?; 254 let did_str = did.as_str(); 255 256 // Fetch notebook by DID + rkey 257 let notebook_row = state 258 .clickhouse 259 .get_notebook(did_str, rkey_str) 260 .await 261 .map_err(|e| { 262 tracing::error!("Failed to get notebook: {}", e); 263 XrpcErrorResponse::internal_error("Database query failed") 264 })? 265 .ok_or_else(|| XrpcErrorResponse::not_found("Notebook not found"))?; 266 267 // Fetch notebook contributors 268 let notebook_contributors = state 269 .clickhouse 270 .get_notebook_contributors(did_str, rkey_str) 271 .await 272 .map_err(|e| { 273 tracing::error!("Failed to get notebook contributors: {}", e); 274 XrpcErrorResponse::internal_error("Database query failed") 275 })?; 276 277 // Collect all author DIDs for batch hydration 278 let mut all_author_dids: HashSet<&str> = 279 notebook_contributors.iter().map(|s| s.as_str()).collect(); 280 for did in &notebook_row.author_dids { 281 all_author_dids.insert(did.as_str()); 282 } 283 284 // Batch fetch profiles 285 let author_dids_vec: Vec<&str> = all_author_dids.into_iter().collect(); 286 let profiles = state 287 .clickhouse 288 .get_profiles_batch(&author_dids_vec) 289 .await 290 .map_err(|e| { 291 tracing::error!("Failed to batch fetch profiles: {}", e); 292 XrpcErrorResponse::internal_error("Database query failed") 293 })?; 294 295 let profile_map: HashMap<&str, &ProfileRow> = 296 profiles.iter().map(|p| (p.did.as_str(), p)).collect(); 297 298 // Build NotebookView 299 let notebook_uri = AtUri::new(&notebook_row.uri).map_err(|e| { 300 tracing::error!("Invalid notebook URI in db: {}", e); 301 XrpcErrorResponse::internal_error("Invalid URI stored") 302 })?; 303 304 let notebook_cid = Cid::new(notebook_row.cid.as_bytes()).map_err(|e| { 305 tracing::error!("Invalid notebook CID in db: {}", e); 306 XrpcErrorResponse::internal_error("Invalid CID stored") 307 })?; 308 309 let authors = hydrate_authors(&notebook_contributors, &profile_map)?; 310 let record = parse_record_json(&notebook_row.record)?; 311 312 let notebook = NotebookView::new() 313 .uri(notebook_uri.into_static()) 314 .cid(notebook_cid.into_static()) 315 .authors(authors) 316 .record(record.clone()) 317 .indexed_at(notebook_row.indexed_at.fixed_offset()) 318 .maybe_title(non_empty_cowstr(&notebook_row.title)) 319 .maybe_path(non_empty_cowstr(&notebook_row.path)) 320 .build(); 321 322 // Deserialize Book from record to get entry_list 323 let book: weaver_api::sh_weaver::notebook::book::Book = 324 jacquard::from_data(&record).map_err(|e| { 325 tracing::error!("Failed to deserialize Book record: {}", e); 326 XrpcErrorResponse::internal_error("Invalid Book record") 327 })?; 328 329 let entries: Vec<StrongRef<'static>> = book 330 .entry_list 331 .into_iter() 332 .map(|r| r.into_static()) 333 .collect(); 334 335 Ok(Json( 336 GetNotebookOutput { 337 notebook, 338 entries, 339 extra_data: None, 340 } 341 .into_static(), 342 )) 343} 344 345/// Handle sh.weaver.notebook.getEntry 346/// 347/// Gets an entry by AT URI. 348pub async fn get_entry( 349 State(state): State<AppState>, 350 ExtractOptionalServiceAuth(viewer): ExtractOptionalServiceAuth, 351 ExtractXrpc(args): ExtractXrpc<GetEntryRequest>, 352) -> Result<Json<GetEntryOutput<'static>>, XrpcErrorResponse> { 353 let _viewer: Viewer = viewer; 354 355 // Parse the AT URI to extract authority and rkey 356 let uri = &args.uri; 357 let authority = uri.authority(); 358 let rkey = uri 359 .rkey() 360 .ok_or_else(|| XrpcErrorResponse::invalid_request("URI must include rkey"))?; 361 let rkey_str = rkey.as_ref(); 362 363 // Resolve authority to DID (could be handle or DID) 364 let did = resolve_actor(&state, authority).await?; 365 let did_str = did.as_str(); 366 367 // Fetch entry and contributors in parallel 368 let contributors_result = state 369 .clickhouse 370 .get_entry_contributors(did_str, rkey_str) 371 .await 372 .map_err(|e| { 373 tracing::error!("Failed to get contributors: {}", e); 374 XrpcErrorResponse::internal_error("Database query failed") 375 })?; 376 // Merge contributors with author_dids from record (dedupe) 377 let mut all_author_dids: HashSet<&str> = 378 contributors_result.iter().map(|s| s.as_str()).collect(); 379 380 let entry_result = state 381 .clickhouse 382 .get_entry( 383 rkey_str, 384 &all_author_dids.iter().cloned().collect::<Vec<_>>(), 385 ) 386 .await 387 .map_err(|e| { 388 tracing::error!("Failed to get entry: {}", e); 389 XrpcErrorResponse::internal_error("Database query failed") 390 })?; 391 let entry_row = entry_result.ok_or_else(|| XrpcErrorResponse::not_found("Entry not found"))?; 392 393 for did in &entry_row.author_dids { 394 all_author_dids.insert(did.as_str()); 395 } 396 397 // Fetch profiles for all authors 398 let author_dids_vec: Vec<&str> = all_author_dids.into_iter().collect(); 399 let profiles = state 400 .clickhouse 401 .get_profiles_batch(&author_dids_vec) 402 .await 403 .map_err(|e| { 404 tracing::error!("Failed to fetch profiles: {}", e); 405 XrpcErrorResponse::internal_error("Database query failed") 406 })?; 407 408 let profile_map: HashMap<&str, &ProfileRow> = 409 profiles.iter().map(|p| (p.did.as_str(), p)).collect(); 410 411 // Build EntryView - use contributors as the author list (evidence-based) 412 let entry_view = build_entry_view_with_authors(&entry_row, &contributors_result, &profile_map)?; 413 414 Ok(Json( 415 GetEntryOutput { 416 value: entry_view, 417 extra_data: None, 418 } 419 .into_static(), 420 )) 421} 422 423/// Handle sh.weaver.notebook.resolveEntry 424/// 425/// Resolves an entry by actor + notebook name + entry name. 426pub async fn resolve_entry( 427 State(state): State<AppState>, 428 ExtractOptionalServiceAuth(viewer): ExtractOptionalServiceAuth, 429 ExtractXrpc(args): ExtractXrpc<ResolveEntryRequest>, 430) -> Result<Json<ResolveEntryOutput<'static>>, XrpcErrorResponse> { 431 let _viewer: Viewer = viewer; 432 433 // Resolve actor to DID 434 let did = resolve_actor(&state, &args.actor).await?; 435 let did_str = did.as_str(); 436 437 // Resolve notebook and entry in parallel - both just need the DID 438 let notebook_name = args.notebook.as_ref(); 439 let entry_name = args.entry.as_ref(); 440 441 let (notebook_result, entry_result) = tokio::try_join!( 442 async { 443 state 444 .clickhouse 445 .resolve_notebook(did_str, notebook_name) 446 .await 447 .map_err(|e| { 448 tracing::error!("Failed to resolve notebook: {}", e); 449 XrpcErrorResponse::internal_error("Database query failed") 450 }) 451 }, 452 // TODO: fix this, as we do need the entries to know for sure which, in case of collisions 453 async { 454 state 455 .clickhouse 456 .resolve_entry(did_str, entry_name) 457 .await 458 .map_err(|e| { 459 tracing::error!("Failed to resolve entry: {}", e); 460 XrpcErrorResponse::internal_error("Database query failed") 461 }) 462 } 463 )?; 464 465 let _notebook_row = 466 notebook_result.ok_or_else(|| XrpcErrorResponse::not_found("Notebook not found"))?; 467 let entry_row = entry_result.ok_or_else(|| XrpcErrorResponse::not_found("Entry not found"))?; 468 469 // Fetch contributors and notebooks in parallel (need entry rkey, so must wait for entry resolution) 470 let (contributors, notebooks) = tokio::try_join!( 471 async { 472 state 473 .clickhouse 474 .get_entry_contributors(did_str, &entry_row.rkey) 475 .await 476 .map_err(|e| { 477 tracing::error!("Failed to get contributors: {}", e); 478 XrpcErrorResponse::internal_error("Database query failed") 479 }) 480 }, 481 async { 482 state 483 .clickhouse 484 .get_notebooks_for_entry(did_str, &entry_row.rkey) 485 .await 486 .map_err(|e| { 487 tracing::error!("Failed to get notebooks for entry: {}", e); 488 XrpcErrorResponse::internal_error("Database query failed") 489 }) 490 } 491 )?; 492 493 // Merge contributors with author_dids from record (dedupe) 494 let mut all_author_dids: HashSet<&str> = contributors.iter().map(|s| s.as_str()).collect(); 495 for did in &entry_row.author_dids { 496 all_author_dids.insert(did.as_str()); 497 } 498 499 // Fetch profiles for all authors 500 let author_dids_vec: Vec<&str> = all_author_dids.into_iter().collect(); 501 let profiles = state 502 .clickhouse 503 .get_profiles_batch(&author_dids_vec) 504 .await 505 .map_err(|e| { 506 tracing::error!("Failed to fetch profiles: {}", e); 507 XrpcErrorResponse::internal_error("Database query failed") 508 })?; 509 510 let profile_map: HashMap<&str, &ProfileRow> = 511 profiles.iter().map(|p| (p.did.as_str(), p)).collect(); 512 513 // Build EntryView - use contributors as the author list (evidence-based) 514 let entry_view = build_entry_view_with_authors(&entry_row, &contributors, &profile_map)?; 515 516 // Parse the record for the output 517 let record = parse_record_json(&entry_row.record)?; 518 519 // Actual count of notebooks containing this entry 520 let notebook_count = notebooks.len() as i64; 521 522 Ok(Json( 523 ResolveEntryOutput { 524 entry: entry_view, 525 notebook_count, 526 notebooks: None, 527 record, 528 extra_data: None, 529 } 530 .into_static(), 531 )) 532} 533 534/// Build an EntryView from an EntryRow with explicit author list (evidence-based contributors) 535pub fn build_entry_view_with_authors( 536 entry_row: &crate::clickhouse::EntryRow, 537 author_dids: &[SmolStr], 538 profile_map: &HashMap<&str, &ProfileRow>, 539) -> Result<EntryView<'static>, XrpcErrorResponse> { 540 let entry_uri = AtUri::new(&entry_row.uri).map_err(|e| { 541 tracing::error!("Invalid entry URI in db: {}", e); 542 XrpcErrorResponse::internal_error("Invalid URI stored") 543 })?; 544 545 let entry_cid = Cid::new(entry_row.cid.as_bytes()).map_err(|e| { 546 tracing::error!("Invalid entry CID in db: {}", e); 547 XrpcErrorResponse::internal_error("Invalid CID stored") 548 })?; 549 550 let authors = hydrate_authors(author_dids, profile_map)?; 551 let record = parse_record_json(&entry_row.record)?; 552 553 let entry_view = EntryView::new() 554 .uri(entry_uri.into_static()) 555 .cid(entry_cid.into_static()) 556 .authors(authors) 557 .record(record) 558 .indexed_at(entry_row.indexed_at.fixed_offset()) 559 .maybe_title(non_empty_cowstr(&entry_row.title)) 560 .maybe_path(non_empty_cowstr(&entry_row.path)) 561 .build(); 562 563 Ok(entry_view) 564} 565 566/// Convert SmolStr to Option<CowStr> if non-empty 567pub fn non_empty_cowstr(s: &smol_str::SmolStr) -> Option<jacquard::CowStr<'static>> { 568 if s.is_empty() { 569 None 570 } else { 571 Some(s.to_cowstr().into_static()) 572 } 573} 574 575/// Parse record JSON string into owned Data 576pub fn parse_record_json(json: &str) -> Result<Data<'static>, XrpcErrorResponse> { 577 let data: Data<'_> = serde_json::from_str(json).map_err(|e| { 578 tracing::error!("Failed to parse record JSON: {}", e); 579 XrpcErrorResponse::internal_error("Invalid record JSON stored") 580 })?; 581 Ok(data.into_static()) 582} 583 584/// Hydrate author list from DIDs using profile map 585pub fn hydrate_authors( 586 author_dids: &[SmolStr], 587 profile_map: &HashMap<&str, &ProfileRow>, 588) -> Result<Vec<AuthorListView<'static>>, XrpcErrorResponse> { 589 let mut authors = Vec::with_capacity(author_dids.len()); 590 591 for (idx, did_str) in author_dids.iter().enumerate() { 592 let profile_data = if let Some(profile) = profile_map.get(did_str.as_str()) { 593 profile_to_data_view(profile)? 594 } else { 595 // No profile found - create minimal view with just the DID 596 let did = Did::new(did_str).map_err(|e| { 597 tracing::error!("Invalid DID in author_dids: {}", e); 598 XrpcErrorResponse::internal_error("Invalid DID stored") 599 })?; 600 601 let inner_profile = ProfileView::new() 602 .did(did.into_static()) 603 .handle( 604 Handle::new(did_str) 605 .unwrap_or_else(|_| Handle::new("unknown.invalid").unwrap()), 606 ) 607 .build(); 608 609 ProfileDataView::new() 610 .inner(ProfileDataViewInner::ProfileView(Box::new(inner_profile))) 611 .build() 612 }; 613 614 let author_view = AuthorListView::new() 615 .index(idx as i64) 616 .record(profile_data.into_static()) 617 .build(); 618 619 authors.push(author_view); 620 } 621 622 Ok(authors) 623} 624 625/// Convert ProfileRow to ProfileDataView 626pub fn profile_to_data_view( 627 profile: &ProfileRow, 628) -> Result<ProfileDataView<'static>, XrpcErrorResponse> { 629 let did = Did::new(&profile.did).map_err(|e| { 630 tracing::error!("Invalid DID in profile: {}", e); 631 XrpcErrorResponse::internal_error("Invalid DID stored") 632 })?; 633 634 let handle = if profile.handle.is_empty() { 635 // Use DID as fallback handle (not ideal but functional) 636 Handle::new(&profile.did).unwrap_or_else(|_| Handle::new("unknown.invalid").unwrap()) 637 } else { 638 Handle::new(&profile.handle).map_err(|e| { 639 tracing::error!("Invalid handle in profile: {}", e); 640 XrpcErrorResponse::internal_error("Invalid handle stored") 641 })? 642 }; 643 644 // Build avatar URL from CID if present 645 let avatar = if !profile.avatar_cid.is_empty() { 646 let url = format!( 647 "https://cdn.bsky.app/img/avatar/plain/{}/{}@jpeg", 648 profile.did, profile.avatar_cid 649 ); 650 Uri::new_owned(url).ok() 651 } else { 652 None 653 }; 654 655 // Build banner URL from CID if present 656 let banner = if !profile.banner_cid.is_empty() { 657 let url = format!( 658 "https://cdn.bsky.app/img/banner/plain/{}/{}@jpeg", 659 profile.did, profile.banner_cid 660 ); 661 Uri::new_owned(url).ok() 662 } else { 663 None 664 }; 665 666 let inner_profile = ProfileView::new() 667 .did(did.into_static()) 668 .handle(handle.into_static()) 669 .maybe_display_name(non_empty_cowstr(&profile.display_name)) 670 .maybe_description(non_empty_cowstr(&profile.description)) 671 .maybe_avatar(avatar) 672 .maybe_banner(banner) 673 .build(); 674 675 let profile_data = ProfileDataView::new() 676 .inner(ProfileDataViewInner::ProfileView(Box::new(inner_profile))) 677 .build(); 678 679 Ok(profile_data) 680} 681 682/// Parse cursor string to i64 timestamp millis 683pub fn parse_cursor(cursor: Option<&str>) -> Result<Option<i64>, XrpcErrorResponse> { 684 cursor 685 .map(|c| { 686 c.parse::<i64>() 687 .map_err(|_| XrpcErrorResponse::invalid_request("Invalid cursor format")) 688 }) 689 .transpose() 690} 691 692/// Handle sh.weaver.notebook.getNotebookFeed 693/// 694/// Returns a global feed of notebooks. 695pub async fn get_notebook_feed( 696 State(state): State<AppState>, 697 ExtractOptionalServiceAuth(viewer): ExtractOptionalServiceAuth, 698 ExtractXrpc(args): ExtractXrpc<GetNotebookFeedRequest>, 699) -> Result<Json<GetNotebookFeedOutput<'static>>, XrpcErrorResponse> { 700 let _viewer: Viewer = viewer; 701 702 let limit = args.limit.unwrap_or(50).clamp(1, 100) as u32; 703 let cursor = parse_cursor(args.cursor.as_deref())?; 704 let algorithm = args.algorithm.as_deref().unwrap_or("chronological"); 705 706 // Convert tags to &[&str] if present 707 let tags_vec: Vec<&str> = args 708 .tags 709 .as_ref() 710 .map(|t| t.iter().map(|s| s.as_ref()).collect()) 711 .unwrap_or_default(); 712 let tags = if tags_vec.is_empty() { 713 None 714 } else { 715 Some(tags_vec.as_slice()) 716 }; 717 718 let notebook_rows = state 719 .clickhouse 720 .get_notebook_feed(algorithm, tags, limit + 1, cursor) 721 .await 722 .map_err(|e| { 723 tracing::error!("Failed to get notebook feed: {}", e); 724 XrpcErrorResponse::internal_error("Database query failed") 725 })?; 726 727 // Check if there are more 728 let has_more = notebook_rows.len() > limit as usize; 729 let notebook_rows: Vec<_> = notebook_rows.into_iter().take(limit as usize).collect(); 730 731 // Collect author DIDs for hydration 732 let mut all_author_dids: HashSet<&str> = HashSet::new(); 733 for nb in &notebook_rows { 734 for did in &nb.author_dids { 735 all_author_dids.insert(did.as_str()); 736 } 737 } 738 739 // Batch fetch profiles 740 let author_dids_vec: Vec<&str> = all_author_dids.into_iter().collect(); 741 let profiles = state 742 .clickhouse 743 .get_profiles_batch(&author_dids_vec) 744 .await 745 .map_err(|e| { 746 tracing::error!("Failed to batch fetch profiles: {}", e); 747 XrpcErrorResponse::internal_error("Database query failed") 748 })?; 749 750 let profile_map: HashMap<&str, &ProfileRow> = 751 profiles.iter().map(|p| (p.did.as_str(), p)).collect(); 752 753 // Build NotebookViews 754 let mut notebooks: Vec<NotebookView<'static>> = Vec::with_capacity(notebook_rows.len()); 755 for nb_row in &notebook_rows { 756 let notebook_uri = AtUri::new(&nb_row.uri).map_err(|e| { 757 tracing::error!("Invalid notebook URI in db: {}", e); 758 XrpcErrorResponse::internal_error("Invalid URI stored") 759 })?; 760 761 let notebook_cid = Cid::new(nb_row.cid.as_bytes()).map_err(|e| { 762 tracing::error!("Invalid notebook CID in db: {}", e); 763 XrpcErrorResponse::internal_error("Invalid CID stored") 764 })?; 765 766 let authors = hydrate_authors(&nb_row.author_dids, &profile_map)?; 767 let record = parse_record_json(&nb_row.record)?; 768 769 let notebook = NotebookView::new() 770 .uri(notebook_uri.into_static()) 771 .cid(notebook_cid.into_static()) 772 .authors(authors) 773 .record(record) 774 .indexed_at(nb_row.indexed_at.fixed_offset()) 775 .maybe_title(non_empty_cowstr(&nb_row.title)) 776 .maybe_path(non_empty_cowstr(&nb_row.path)) 777 .build(); 778 779 notebooks.push(notebook); 780 } 781 782 // Build cursor for pagination (created_at millis) 783 let next_cursor = if has_more { 784 notebook_rows 785 .last() 786 .map(|nb| nb.created_at.timestamp_millis().to_cowstr().into_static()) 787 } else { 788 None 789 }; 790 791 Ok(Json( 792 GetNotebookFeedOutput { 793 notebooks, 794 cursor: next_cursor, 795 extra_data: None, 796 } 797 .into_static(), 798 )) 799} 800 801/// Handle sh.weaver.notebook.getEntryFeed 802/// 803/// Returns a global feed of entries. 804pub async fn get_entry_feed( 805 State(state): State<AppState>, 806 ExtractOptionalServiceAuth(viewer): ExtractOptionalServiceAuth, 807 ExtractXrpc(args): ExtractXrpc<GetEntryFeedRequest>, 808) -> Result<Json<GetEntryFeedOutput<'static>>, XrpcErrorResponse> { 809 let _viewer: Viewer = viewer; 810 811 let limit = args.limit.unwrap_or(50).clamp(1, 100) as u32; 812 let cursor = parse_cursor(args.cursor.as_deref())?; 813 let algorithm = args.algorithm.as_deref().unwrap_or("chronological"); 814 815 // Convert tags to &[&str] if present 816 let tags_vec: Vec<&str> = args 817 .tags 818 .as_ref() 819 .map(|t| t.iter().map(|s| s.as_ref()).collect()) 820 .unwrap_or_default(); 821 let tags = if tags_vec.is_empty() { 822 None 823 } else { 824 Some(tags_vec.as_slice()) 825 }; 826 827 let entry_rows = state 828 .clickhouse 829 .get_entry_feed(algorithm, tags, limit + 1, cursor) 830 .await 831 .map_err(|e| { 832 tracing::error!("Failed to get entry feed: {}", e); 833 XrpcErrorResponse::internal_error("Database query failed") 834 })?; 835 836 // Check if there are more 837 let has_more = entry_rows.len() > limit as usize; 838 let entry_rows: Vec<_> = entry_rows.into_iter().take(limit as usize).collect(); 839 840 // Batch fetch contributors for all entries 841 let entry_keys: Vec<(&str, &str)> = entry_rows 842 .iter() 843 .map(|e| (e.did.as_str(), e.rkey.as_str())) 844 .collect(); 845 let contributors_map = state 846 .clickhouse 847 .get_entry_contributors_batch(&entry_keys) 848 .await 849 .map_err(|e| { 850 tracing::error!("Failed to batch fetch contributors: {}", e); 851 XrpcErrorResponse::internal_error("Database query failed") 852 })?; 853 854 // Collect all contributor DIDs for profile hydration 855 let mut all_author_dids: HashSet<&str> = HashSet::new(); 856 for contributors in contributors_map.values() { 857 for did in contributors { 858 all_author_dids.insert(did.as_str()); 859 } 860 } 861 862 // Batch fetch profiles 863 let author_dids_vec: Vec<&str> = all_author_dids.into_iter().collect(); 864 let profiles = state 865 .clickhouse 866 .get_profiles_batch(&author_dids_vec) 867 .await 868 .map_err(|e| { 869 tracing::error!("Failed to batch fetch profiles: {}", e); 870 XrpcErrorResponse::internal_error("Database query failed") 871 })?; 872 873 let profile_map: HashMap<&str, &ProfileRow> = 874 profiles.iter().map(|p| (p.did.as_str(), p)).collect(); 875 876 // Build FeedEntryViews 877 let mut feed: Vec<FeedEntryView<'static>> = Vec::with_capacity(entry_rows.len()); 878 for entry_row in &entry_rows { 879 // Get contributors for this entry 880 let entry_key = (entry_row.did.clone(), entry_row.rkey.clone()); 881 let contributors = contributors_map 882 .get(&entry_key) 883 .map(|v| v.as_slice()) 884 .unwrap_or(&[]); 885 886 let entry_view = build_entry_view_with_authors(entry_row, contributors, &profile_map)?; 887 888 let feed_entry = FeedEntryView::new().entry(entry_view).build(); 889 890 feed.push(feed_entry); 891 } 892 893 // Build cursor for pagination (created_at millis) 894 let next_cursor = if has_more { 895 entry_rows 896 .last() 897 .map(|e| e.created_at.timestamp_millis().to_cowstr().into_static()) 898 } else { 899 None 900 }; 901 902 Ok(Json( 903 GetEntryFeedOutput { 904 feed, 905 cursor: next_cursor, 906 extra_data: None, 907 } 908 .into_static(), 909 )) 910} 911 912/// Handle sh.weaver.notebook.getBookEntry 913/// 914/// Returns an entry at a specific index within a notebook, with prev/next navigation. 915pub async fn get_book_entry( 916 State(state): State<AppState>, 917 ExtractOptionalServiceAuth(viewer): ExtractOptionalServiceAuth, 918 ExtractXrpc(args): ExtractXrpc<GetBookEntryRequest>, 919) -> Result<Json<GetBookEntryOutput<'static>>, XrpcErrorResponse> { 920 let _viewer: Viewer = viewer; 921 922 // Parse the notebook URI 923 let notebook_uri = &args.notebook; 924 let authority = notebook_uri.authority(); 925 let notebook_rkey = notebook_uri 926 .rkey() 927 .ok_or_else(|| XrpcErrorResponse::invalid_request("Notebook URI must include rkey"))?; 928 929 // Resolve authority to DID 930 let notebook_did = resolve_actor(&state, authority).await?; 931 let notebook_did_str = notebook_did.as_str(); 932 let notebook_rkey_str = notebook_rkey.as_ref(); 933 934 let index = args.index.unwrap_or(0).max(0) as u32; 935 936 // Fetch entry at index with prev/next 937 let result = state 938 .clickhouse 939 .get_book_entry_at_index(notebook_did_str, notebook_rkey_str, index) 940 .await 941 .map_err(|e| { 942 tracing::error!("Failed to get book entry: {}", e); 943 XrpcErrorResponse::internal_error("Database query failed") 944 })?; 945 946 let (current_row, prev_row, next_row) = 947 result.ok_or_else(|| XrpcErrorResponse::not_found("Entry not found at index"))?; 948 949 // Collect all author DIDs for hydration 950 let mut all_author_dids: HashSet<&str> = HashSet::new(); 951 for did in &current_row.author_dids { 952 all_author_dids.insert(did.as_str()); 953 } 954 if let Some(ref prev) = prev_row { 955 for did in &prev.author_dids { 956 all_author_dids.insert(did.as_str()); 957 } 958 } 959 if let Some(ref next) = next_row { 960 for did in &next.author_dids { 961 all_author_dids.insert(did.as_str()); 962 } 963 } 964 965 // Batch fetch profiles 966 let author_dids_vec: Vec<&str> = all_author_dids.into_iter().collect(); 967 let profiles = state 968 .clickhouse 969 .get_profiles_batch(&author_dids_vec) 970 .await 971 .map_err(|e| { 972 tracing::error!("Failed to fetch profiles: {}", e); 973 XrpcErrorResponse::internal_error("Database query failed") 974 })?; 975 976 let profile_map: HashMap<&str, &ProfileRow> = 977 profiles.iter().map(|p| (p.did.as_str(), p)).collect(); 978 979 // Build the current entry view 980 let entry_view = build_entry_view(&current_row, &profile_map)?; 981 982 // Build prev/next refs if present 983 let prev_ref = if let Some(ref prev) = prev_row { 984 let prev_view = build_entry_view(prev, &profile_map)?; 985 Some(BookEntryRef::new().entry(prev_view).build()) 986 } else { 987 None 988 }; 989 990 let next_ref = if let Some(ref next) = next_row { 991 let next_view = build_entry_view(next, &profile_map)?; 992 Some(BookEntryRef::new().entry(next_view).build()) 993 } else { 994 None 995 }; 996 997 let book_entry = BookEntryView::new() 998 .entry(entry_view) 999 .index(index as i64) 1000 .maybe_prev(prev_ref) 1001 .maybe_next(next_ref) 1002 .build(); 1003 1004 Ok(Json( 1005 GetBookEntryOutput { 1006 value: book_entry, 1007 extra_data: None, 1008 } 1009 .into_static(), 1010 )) 1011} 1012 1013/// Build an EntryView from an EntryRow 1014pub fn build_entry_view( 1015 entry_row: &EntryRow, 1016 profile_map: &HashMap<&str, &ProfileRow>, 1017) -> Result<EntryView<'static>, XrpcErrorResponse> { 1018 let entry_uri = AtUri::new(&entry_row.uri).map_err(|e| { 1019 tracing::error!("Invalid entry URI in db: {}", e); 1020 XrpcErrorResponse::internal_error("Invalid URI stored") 1021 })?; 1022 1023 let entry_cid = Cid::new(entry_row.cid.as_bytes()).map_err(|e| { 1024 tracing::error!("Invalid entry CID in db: {}", e); 1025 XrpcErrorResponse::internal_error("Invalid CID stored") 1026 })?; 1027 1028 let authors = hydrate_authors(&entry_row.author_dids, profile_map)?; 1029 let record = parse_record_json(&entry_row.record)?; 1030 1031 let entry_view = EntryView::new() 1032 .uri(entry_uri.into_static()) 1033 .cid(entry_cid.into_static()) 1034 .authors(authors) 1035 .record(record) 1036 .indexed_at(entry_row.indexed_at.fixed_offset()) 1037 .maybe_title(non_empty_cowstr(&entry_row.title)) 1038 .maybe_path(non_empty_cowstr(&entry_row.path)) 1039 .build(); 1040 1041 Ok(entry_view) 1042} 1043 1044/// Handle sh.weaver.notebook.getEntryNotebooks 1045/// 1046/// Returns notebooks that contain a given entry. 1047pub async fn get_entry_notebooks( 1048 State(state): State<AppState>, 1049 ExtractOptionalServiceAuth(viewer): ExtractOptionalServiceAuth, 1050 ExtractXrpc(args): ExtractXrpc<GetEntryNotebooksRequest>, 1051) -> Result<Json<GetEntryNotebooksOutput<'static>>, XrpcErrorResponse> { 1052 let _viewer: Viewer = viewer; 1053 1054 // Parse the entry URI 1055 let entry_uri = &args.entry; 1056 let authority = entry_uri.authority(); 1057 let entry_rkey = entry_uri 1058 .rkey() 1059 .ok_or_else(|| XrpcErrorResponse::invalid_request("Entry URI must include rkey"))?; 1060 1061 // Resolve authority to DID 1062 let entry_did = resolve_actor(&state, authority).await?; 1063 let entry_did_str = entry_did.as_str(); 1064 let entry_rkey_str = entry_rkey.as_ref(); 1065 1066 // Get notebooks containing this entry 1067 let notebook_refs = state 1068 .clickhouse 1069 .get_notebooks_for_entry(entry_did_str, entry_rkey_str) 1070 .await 1071 .map_err(|e| { 1072 tracing::error!("Failed to get notebooks for entry: {}", e); 1073 XrpcErrorResponse::internal_error("Database query failed") 1074 })?; 1075 1076 if notebook_refs.is_empty() { 1077 return Ok(Json( 1078 GetEntryNotebooksOutput { 1079 notebooks: Vec::new(), 1080 extra_data: None, 1081 } 1082 .into_static(), 1083 )); 1084 } 1085 1086 // Fetch notebook details and owner profiles 1087 let mut notebooks = Vec::with_capacity(notebook_refs.len()); 1088 let mut owner_dids: HashSet<&str> = HashSet::new(); 1089 1090 // First pass: collect owner DIDs 1091 for (notebook_did, _notebook_rkey) in &notebook_refs { 1092 owner_dids.insert(notebook_did.as_str()); 1093 } 1094 1095 // Batch fetch profiles 1096 let owner_dids_vec: Vec<&str> = owner_dids.into_iter().collect(); 1097 let profiles = state 1098 .clickhouse 1099 .get_profiles_batch(&owner_dids_vec) 1100 .await 1101 .map_err(|e| { 1102 tracing::error!("Failed to batch fetch profiles: {}", e); 1103 XrpcErrorResponse::internal_error("Database query failed") 1104 })?; 1105 1106 let profile_map: HashMap<&str, &ProfileRow> = 1107 profiles.iter().map(|p| (p.did.as_str(), p)).collect(); 1108 1109 // Fetch each notebook's details 1110 for (notebook_did, notebook_rkey) in &notebook_refs { 1111 let notebook_row = state 1112 .clickhouse 1113 .get_notebook(notebook_did.as_str(), notebook_rkey.as_str()) 1114 .await 1115 .map_err(|e| { 1116 tracing::error!("Failed to get notebook: {}", e); 1117 XrpcErrorResponse::internal_error("Database query failed") 1118 })?; 1119 1120 if let Some(nb) = notebook_row { 1121 let uri = AtUri::new(&nb.uri) 1122 .map_err(|_| XrpcErrorResponse::internal_error("Invalid notebook URI"))? 1123 .into_static(); 1124 1125 let cid = Cid::new(nb.cid.as_bytes()) 1126 .map_err(|_| XrpcErrorResponse::internal_error("Invalid notebook CID"))? 1127 .into_static(); 1128 1129 // Get owner profile 1130 let owner = profile_map 1131 .get(notebook_did.as_str()) 1132 .map(|p| crate::endpoints::collab::profile_to_view_basic(p)) 1133 .transpose()?; 1134 1135 notebooks.push( 1136 NotebookRef::new() 1137 .uri(uri) 1138 .cid(cid) 1139 .maybe_title(non_empty_cowstr(&nb.title)) 1140 .maybe_owner(owner) 1141 .build(), 1142 ); 1143 } 1144 } 1145 1146 Ok(Json( 1147 GetEntryNotebooksOutput { 1148 notebooks, 1149 extra_data: None, 1150 } 1151 .into_static(), 1152 )) 1153} 1154 1155/// Handle sh.weaver.notebook.resolveGlobalNotebook 1156/// 1157/// Resolves a notebook by global path for subdomain routing. 1158pub async fn resolve_global_notebook( 1159 State(state): State<AppState>, 1160 ExtractXrpc(args): ExtractXrpc<ResolveGlobalNotebookRequest>, 1161) -> Result<Json<ResolveGlobalNotebookOutput<'static>>, XrpcErrorResponse> { 1162 let path = args.path.as_ref(); 1163 1164 let notebook_row = state 1165 .clickhouse 1166 .resolve_notebook_by_global_path(path) 1167 .await 1168 .map_err(|e| { 1169 tracing::error!("Failed to resolve global notebook: {}", e); 1170 XrpcErrorResponse::internal_error("Database query failed") 1171 })? 1172 .ok_or_else(|| XrpcErrorResponse::not_found("Notebook not found"))?; 1173 1174 // Fetch contributors for author hydration 1175 let notebook_contributors = state 1176 .clickhouse 1177 .get_notebook_contributors(&notebook_row.did, &notebook_row.rkey) 1178 .await 1179 .map_err(|e| { 1180 tracing::error!("Failed to get notebook contributors: {}", e); 1181 XrpcErrorResponse::internal_error("Database query failed") 1182 })?; 1183 1184 // Collect author DIDs 1185 let mut all_author_dids: HashSet<&str> = 1186 notebook_contributors.iter().map(|s| s.as_str()).collect(); 1187 for did in &notebook_row.author_dids { 1188 all_author_dids.insert(did.as_str()); 1189 } 1190 1191 // Batch fetch profiles 1192 let author_dids_vec: Vec<&str> = all_author_dids.into_iter().collect(); 1193 let profiles = state 1194 .clickhouse 1195 .get_profiles_batch(&author_dids_vec) 1196 .await 1197 .map_err(|e| { 1198 tracing::error!("Failed to batch fetch profiles: {}", e); 1199 XrpcErrorResponse::internal_error("Database query failed") 1200 })?; 1201 1202 let profile_map: HashMap<&str, &ProfileRow> = 1203 profiles.iter().map(|p| (p.did.as_str(), p)).collect(); 1204 1205 // Build NotebookView 1206 let notebook_uri = AtUri::new(&notebook_row.uri).map_err(|e| { 1207 tracing::error!("Invalid notebook URI in db: {}", e); 1208 XrpcErrorResponse::internal_error("Invalid URI stored") 1209 })?; 1210 1211 let notebook_cid = Cid::new(notebook_row.cid.as_bytes()).map_err(|e| { 1212 tracing::error!("Invalid notebook CID in db: {}", e); 1213 XrpcErrorResponse::internal_error("Invalid CID stored") 1214 })?; 1215 1216 let authors = hydrate_authors(&notebook_contributors, &profile_map)?; 1217 let record = parse_record_json(&notebook_row.record)?; 1218 1219 let notebook = NotebookView::new() 1220 .uri(notebook_uri.into_static()) 1221 .cid(notebook_cid.into_static()) 1222 .authors(authors) 1223 .record(record) 1224 .indexed_at(notebook_row.indexed_at.fixed_offset()) 1225 .maybe_title(non_empty_cowstr(&notebook_row.title)) 1226 .maybe_path(non_empty_cowstr(&notebook_row.path)) 1227 .build(); 1228 1229 Ok(Json( 1230 ResolveGlobalNotebookOutput { 1231 notebook, 1232 extra_data: None, 1233 } 1234 .into_static(), 1235 )) 1236}