at main 570 lines 20 kB view raw
//! sh.weaver.actor.* endpoint handlers
//!
//! Contains the axum handlers for `getProfile`, `getActorNotebooks` and
//! `getActorEntries`, plus the helpers they share for actor resolution,
//! cursor parsing and author hydration.

use std::collections::{HashMap, HashSet};

use axum::{Json, extract::State};
use jacquard::IntoStatic;
use jacquard::cowstr::ToCowStr;
use jacquard::identity::resolver::IdentityResolver;
use jacquard::prelude::*;
use jacquard::types::ident::AtIdentifier;
use jacquard::types::string::{AtUri, Cid, Did, Handle, Uri};
use jacquard_axum::ExtractXrpc;
use jacquard_axum::service_auth::{ExtractOptionalServiceAuth, VerifiedServiceAuth};
use smol_str::SmolStr;
use weaver_api::sh_weaver::actor::{
    ProfileDataView, ProfileDataViewInner, ProfileView,
    get_actor_entries::{GetActorEntriesOutput, GetActorEntriesRequest},
    get_actor_notebooks::{GetActorNotebooksOutput, GetActorNotebooksRequest},
    get_profile::{GetProfileOutput, GetProfileRequest},
};
use weaver_api::sh_weaver::notebook::{AuthorListView, EntryView, NotebookView};

use crate::clickhouse::ProfileRow;
use crate::endpoints::repo::XrpcErrorResponse;
use crate::server::AppState;

/// Authenticated viewer context (if present).
///
/// The handlers in this module bind the value produced by
/// `ExtractOptionalServiceAuth` to this type; it is currently unused beyond
/// that binding.
pub type Viewer = Option<VerifiedServiceAuth<'static>>;

/// Handle sh.weaver.actor.getProfile
///
/// Returns a profile view with counts for the requested actor.
33pub async fn get_profile( 34 State(state): State<AppState>, 35 ExtractOptionalServiceAuth(viewer): ExtractOptionalServiceAuth, 36 ExtractXrpc(args): ExtractXrpc<GetProfileRequest>, 37) -> Result<Json<GetProfileOutput<'static>>, XrpcErrorResponse> { 38 // viewer contains Some(auth) if the request has valid service auth 39 // can be used later for viewer-specific state (e.g., "you follow this person") 40 let _viewer = viewer; 41 // Resolve identifier to DID 42 let did = resolve_actor(&state, &args.actor).await?; 43 let did_str = did.as_str(); 44 45 // Fetch profile with counts from ClickHouse 46 let profile_data = state 47 .clickhouse 48 .get_profile_with_counts(did_str) 49 .await 50 .map_err(|e| { 51 tracing::error!("Failed to get profile: {}", e); 52 XrpcErrorResponse::internal_error("Database query failed") 53 })?; 54 55 let Some(data) = profile_data else { 56 // get the bluesky profile 57 // TODO: either cache this or yell at tap to start tracking their account! 58 let profile_resp = state 59 .resolver 60 .send( 61 weaver_api::app_bsky::actor::get_profile::GetProfile::new() 62 .actor(did) 63 .build(), 64 ) 65 .await 66 .map_err(|e| XrpcErrorResponse::not_found(e.to_string()))?; 67 let bsky_profile = profile_resp 68 .into_output() 69 .map_err(|e| XrpcErrorResponse::not_found(e.to_string()))? 
70 .value; 71 let inner_profile = ProfileView::new() 72 .did(bsky_profile.did) 73 .handle(bsky_profile.handle) 74 .maybe_display_name(bsky_profile.display_name) 75 .maybe_description(bsky_profile.description) 76 .maybe_avatar(bsky_profile.avatar) 77 .maybe_banner(bsky_profile.banner) 78 .build(); 79 80 let inner = ProfileDataViewInner::ProfileView(Box::new(inner_profile)); 81 82 let output = ProfileDataView::new().inner(inner).build(); 83 84 return Ok(Json( 85 GetProfileOutput { 86 value: output, 87 extra_data: None, 88 } 89 .into_static(), 90 )); 91 }; 92 93 // Build the response 94 let profile = &data.profile; 95 96 // Determine handle - use from profile row, or resolve if empty 97 let handle_str = if profile.handle.is_empty() { 98 // Try to resolve DID -> handle 99 match state.clickhouse.resolve_did_to_handle(did_str).await { 100 Ok(Some(mapping)) => mapping.handle.to_string(), 101 _ => { 102 // Last resort: use a placeholder or try external resolver 103 // For now, use the DID as handle (not ideal but functional) 104 did_str.to_string() 105 } 106 } 107 } else { 108 profile.handle.to_string() 109 }; 110 111 let handle = Handle::new(&handle_str).map_err(|e| { 112 tracing::error!("Invalid handle in database: {}", e); 113 XrpcErrorResponse::internal_error("Invalid handle stored") 114 })?; 115 116 // Build avatar URL from CID if present 117 let avatar = if !profile.avatar_cid.is_empty() { 118 let url = format!( 119 "https://cdn.bsky.app/img/avatar/plain/{}/{}@jpeg", 120 profile.did, profile.avatar_cid 121 ); 122 Uri::new_owned(url).ok() 123 } else { 124 None 125 }; 126 127 // Build banner URL from CID if present 128 let banner = if !profile.banner_cid.is_empty() { 129 let url = format!( 130 "https://cdn.bsky.app/img/banner/plain/{}/{}@jpeg", 131 profile.did, profile.banner_cid 132 ); 133 Uri::new_owned(url).ok() 134 } else { 135 None 136 }; 137 138 // Build ProfileView (weaver native profile) 139 let inner_profile = ProfileView::new() 140 .did(did.clone()) 141 
.handle(handle) 142 .maybe_display_name(non_empty_str(&profile.display_name)) 143 .maybe_description(non_empty_str(&profile.description)) 144 .maybe_avatar(avatar) 145 .maybe_banner(banner) 146 .build(); 147 148 let inner = ProfileDataViewInner::ProfileView(Box::new(inner_profile)); 149 150 // Build ProfileDataView with counts 151 let counts = data.counts.as_ref(); 152 153 let output = ProfileDataView::new() 154 .inner(inner) 155 .maybe_follower_count(counts.map(|c| c.follower_count)) 156 .maybe_following_count(counts.map(|c| c.following_count)) 157 .maybe_notebook_count(counts.map(|c| c.notebook_count)) 158 .maybe_entry_count(counts.map(|c| c.entry_count)) 159 .build(); 160 161 Ok(Json( 162 GetProfileOutput { 163 value: output, 164 extra_data: None, 165 } 166 .into_static(), 167 )) 168} 169 170/// Resolve an AtIdentifier to a DID. 171/// 172/// For handles: tries handle_mappings first, falls back to external resolver. 173/// For DIDs: returns as-is. 174pub async fn resolve_actor<'a>( 175 state: &AppState, 176 actor: &AtIdentifier<'a>, 177) -> Result<Did<'static>, XrpcErrorResponse> { 178 match actor { 179 AtIdentifier::Did(did) => Ok(did.clone().into_static()), 180 AtIdentifier::Handle(handle) => { 181 let handle_str = handle.as_str(); 182 183 // Try handle_mappings first 184 match state.clickhouse.resolve_handle(handle_str).await { 185 Ok(Some(mapping)) => { 186 let did = Did::new(&mapping.did).map_err(|e| { 187 tracing::error!("Invalid DID in handle_mappings: {}", e); 188 XrpcErrorResponse::internal_error("Invalid DID stored") 189 })?; 190 return Ok(did.into_static()); 191 } 192 Ok(None) => { 193 tracing::debug!("Handle {} not in cache, trying resolver", handle_str); 194 } 195 Err(e) => { 196 tracing::warn!("Handle lookup failed, trying resolver: {}", e); 197 } 198 } 199 200 // Fall back to external resolver 201 let resolved = state.resolver.resolve_handle(handle).await.map_err(|e| { 202 tracing::warn!("Handle resolution failed for {}: {}", handle, e); 203 
XrpcErrorResponse::invalid_request(format!("Could not resolve handle: {}", handle)) 204 })?; 205 206 // Cache the result (fire-and-forget) 207 let clickhouse = state.clickhouse.clone(); 208 let handle_owned = handle_str.to_string(); 209 let did_owned = resolved.as_str().to_string(); 210 tokio::spawn(async move { 211 if let Err(e) = clickhouse 212 .cache_handle_resolution(&handle_owned, &did_owned) 213 .await 214 { 215 tracing::warn!("Failed to cache handle resolution: {}", e); 216 } 217 }); 218 219 Ok(resolved.into_static()) 220 } 221 } 222} 223 224// Re-export from parent for local use 225use super::non_empty_str; 226 227/// Parse cursor string to i64 timestamp millis 228fn parse_cursor(cursor: Option<&str>) -> Result<Option<i64>, XrpcErrorResponse> { 229 cursor 230 .map(|c| { 231 c.parse::<i64>() 232 .map_err(|_| XrpcErrorResponse::invalid_request("Invalid cursor format")) 233 }) 234 .transpose() 235} 236 237/// Handle sh.weaver.actor.getActorNotebooks 238/// 239/// Returns notebooks owned by the given actor. 
240pub async fn get_actor_notebooks( 241 State(state): State<AppState>, 242 ExtractOptionalServiceAuth(viewer): ExtractOptionalServiceAuth, 243 ExtractXrpc(args): ExtractXrpc<GetActorNotebooksRequest>, 244) -> Result<Json<GetActorNotebooksOutput<'static>>, XrpcErrorResponse> { 245 let _viewer: Viewer = viewer; 246 247 // Resolve actor to DID 248 let did = resolve_actor(&state, &args.actor).await?; 249 let did_str = did.as_str(); 250 251 // Fetch notebooks for this actor 252 let limit = args.limit.unwrap_or(50).clamp(1, 100) as u32; 253 let cursor = parse_cursor(args.cursor.as_deref())?; 254 255 let notebook_rows = state 256 .clickhouse 257 .list_actor_notebooks(did_str, limit + 1, cursor) 258 .await 259 .map_err(|e| { 260 tracing::error!("Failed to list actor notebooks: {}", e); 261 XrpcErrorResponse::internal_error("Database query failed") 262 })?; 263 264 // Check if there are more 265 let has_more = notebook_rows.len() > limit as usize; 266 let notebook_rows: Vec<_> = notebook_rows.into_iter().take(limit as usize).collect(); 267 268 // Collect author DIDs for hydration 269 let mut all_author_dids: HashSet<&str> = HashSet::new(); 270 for nb in &notebook_rows { 271 for did in &nb.author_dids { 272 all_author_dids.insert(did.as_str()); 273 } 274 } 275 276 // Batch fetch profiles 277 let author_dids_vec: Vec<&str> = all_author_dids.into_iter().collect(); 278 let profiles = state 279 .clickhouse 280 .get_profiles_batch(&author_dids_vec) 281 .await 282 .map_err(|e| { 283 tracing::error!("Failed to batch fetch profiles: {}", e); 284 XrpcErrorResponse::internal_error("Database query failed") 285 })?; 286 287 let profile_map: HashMap<&str, &ProfileRow> = 288 profiles.iter().map(|p| (p.did.as_str(), p)).collect(); 289 290 // Build NotebookViews 291 let mut notebooks: Vec<NotebookView<'static>> = Vec::with_capacity(notebook_rows.len()); 292 for nb_row in &notebook_rows { 293 let notebook_uri = AtUri::new(&nb_row.uri).map_err(|e| { 294 tracing::error!("Invalid notebook URI 
in db: {}", e); 295 XrpcErrorResponse::internal_error("Invalid URI stored") 296 })?; 297 298 let notebook_cid = Cid::new(nb_row.cid.as_bytes()).map_err(|e| { 299 tracing::error!("Invalid notebook CID in db: {}", e); 300 XrpcErrorResponse::internal_error("Invalid CID stored") 301 })?; 302 303 let authors = hydrate_authors(&nb_row.author_dids, &profile_map)?; 304 let record = parse_record_json(&nb_row.record)?; 305 306 let notebook = NotebookView::new() 307 .uri(notebook_uri.into_static()) 308 .cid(notebook_cid.into_static()) 309 .authors(authors) 310 .record(record) 311 .indexed_at(nb_row.indexed_at.fixed_offset()) 312 .maybe_title(non_empty_str(&nb_row.title)) 313 .maybe_path(non_empty_str(&nb_row.path)) 314 .build(); 315 316 notebooks.push(notebook); 317 } 318 319 // Build cursor for pagination (created_at millis) 320 let next_cursor = if has_more { 321 notebook_rows 322 .last() 323 .map(|nb| nb.created_at.timestamp_millis().to_cowstr().into_static()) 324 } else { 325 None 326 }; 327 328 Ok(Json( 329 GetActorNotebooksOutput { 330 notebooks, 331 cursor: next_cursor, 332 extra_data: None, 333 } 334 .into_static(), 335 )) 336} 337 338/// Handle sh.weaver.actor.getActorEntries 339/// 340/// Returns entries owned by the given actor. 
341pub async fn get_actor_entries( 342 State(state): State<AppState>, 343 ExtractOptionalServiceAuth(viewer): ExtractOptionalServiceAuth, 344 ExtractXrpc(args): ExtractXrpc<GetActorEntriesRequest>, 345) -> Result<Json<GetActorEntriesOutput<'static>>, XrpcErrorResponse> { 346 let _viewer: Viewer = viewer; 347 348 // Resolve actor to DID 349 let did = resolve_actor(&state, &args.actor).await?; 350 let did_str = did.as_str(); 351 352 // Fetch entries for this actor 353 let limit = args.limit.unwrap_or(50).clamp(1, 100) as u32; 354 let cursor = parse_cursor(args.cursor.as_deref())?; 355 356 let entry_rows = state 357 .clickhouse 358 .list_actor_entries(did_str, limit + 1, cursor) 359 .await 360 .map_err(|e| { 361 tracing::error!("Failed to list actor entries: {}", e); 362 XrpcErrorResponse::internal_error("Database query failed") 363 })?; 364 365 // Check if there are more 366 let has_more = entry_rows.len() > limit as usize; 367 let entry_rows: Vec<_> = entry_rows.into_iter().take(limit as usize).collect(); 368 369 // Batch fetch contributors for all entries 370 let entry_keys: Vec<(&str, &str)> = entry_rows 371 .iter() 372 .map(|e| (e.did.as_str(), e.rkey.as_str())) 373 .collect(); 374 let contributors_map = state 375 .clickhouse 376 .get_entry_contributors_batch(&entry_keys) 377 .await 378 .map_err(|e| { 379 tracing::error!("Failed to batch fetch contributors: {}", e); 380 XrpcErrorResponse::internal_error("Database query failed") 381 })?; 382 383 // Collect all contributor DIDs for profile hydration 384 let mut all_author_dids: HashSet<&str> = HashSet::new(); 385 for contributors in contributors_map.values() { 386 for did in contributors { 387 all_author_dids.insert(did.as_str()); 388 } 389 } 390 391 // Batch fetch profiles 392 let author_dids_vec: Vec<&str> = all_author_dids.into_iter().collect(); 393 let profiles = state 394 .clickhouse 395 .get_profiles_batch(&author_dids_vec) 396 .await 397 .map_err(|e| { 398 tracing::error!("Failed to batch fetch profiles: 
{}", e); 399 XrpcErrorResponse::internal_error("Database query failed") 400 })?; 401 402 let profile_map: HashMap<&str, &ProfileRow> = 403 profiles.iter().map(|p| (p.did.as_str(), p)).collect(); 404 405 // Build EntryViews 406 let mut entries: Vec<EntryView<'static>> = Vec::with_capacity(entry_rows.len()); 407 for entry_row in &entry_rows { 408 let entry_uri = AtUri::new(&entry_row.uri).map_err(|e| { 409 tracing::error!("Invalid entry URI in db: {}", e); 410 XrpcErrorResponse::internal_error("Invalid URI stored") 411 })?; 412 413 let entry_cid = Cid::new(entry_row.cid.as_bytes()).map_err(|e| { 414 tracing::error!("Invalid entry CID in db: {}", e); 415 XrpcErrorResponse::internal_error("Invalid CID stored") 416 })?; 417 418 // Get contributors for this entry 419 let entry_key = (entry_row.did.clone(), entry_row.rkey.clone()); 420 let contributors = contributors_map 421 .get(&entry_key) 422 .map(|v| v.as_slice()) 423 .unwrap_or(&[]); 424 425 let authors = hydrate_authors(contributors, &profile_map)?; 426 let record = parse_record_json(&entry_row.record)?; 427 428 let entry = EntryView::new() 429 .uri(entry_uri.into_static()) 430 .cid(entry_cid.into_static()) 431 .authors(authors) 432 .record(record) 433 .indexed_at(entry_row.indexed_at.fixed_offset()) 434 .maybe_title(non_empty_str(&entry_row.title)) 435 .maybe_path(non_empty_str(&entry_row.path)) 436 .build(); 437 438 entries.push(entry); 439 } 440 441 // Build cursor for pagination (created_at millis) 442 let next_cursor = if has_more { 443 entry_rows 444 .last() 445 .map(|e| e.created_at.timestamp_millis().to_cowstr().into_static()) 446 } else { 447 None 448 }; 449 450 Ok(Json( 451 GetActorEntriesOutput { 452 entries, 453 cursor: next_cursor, 454 extra_data: None, 455 } 456 .into_static(), 457 )) 458} 459 460/// Hydrate author list from DIDs using profile map 461fn hydrate_authors( 462 author_dids: &[SmolStr], 463 profile_map: &HashMap<&str, &ProfileRow>, 464) -> Result<Vec<AuthorListView<'static>>, 
XrpcErrorResponse> { 465 let mut authors = Vec::with_capacity(author_dids.len()); 466 467 for (idx, did_str) in author_dids.iter().enumerate() { 468 let profile_data = if let Some(profile) = profile_map.get(did_str.as_str()) { 469 profile_to_data_view(profile)? 470 } else { 471 // No profile found - create minimal view with just the DID 472 let did = Did::new(did_str).map_err(|e| { 473 tracing::error!("Invalid DID in author_dids: {}", e); 474 XrpcErrorResponse::internal_error("Invalid DID stored") 475 })?; 476 477 let inner_profile = ProfileView::new() 478 .did(did.into_static()) 479 .handle( 480 Handle::new(did_str) 481 .unwrap_or_else(|_| Handle::new("unknown.invalid").unwrap()), 482 ) 483 .build(); 484 485 ProfileDataView::new() 486 .inner(ProfileDataViewInner::ProfileView(Box::new(inner_profile))) 487 .build() 488 }; 489 490 let author_view = AuthorListView::new() 491 .index(idx as i64) 492 .record(profile_data.into_static()) 493 .build(); 494 495 authors.push(author_view); 496 } 497 498 Ok(authors) 499} 500 501/// Convert ProfileRow to ProfileDataView 502pub fn profile_to_data_view( 503 profile: &ProfileRow, 504) -> Result<ProfileDataView<'static>, XrpcErrorResponse> { 505 use jacquard::types::string::Uri; 506 507 let did = Did::new(&profile.did).map_err(|e| { 508 tracing::error!("Invalid DID in profile: {}", e); 509 XrpcErrorResponse::internal_error("Invalid DID stored") 510 })?; 511 512 let handle = if profile.handle.is_empty() { 513 Handle::new(&profile.did).unwrap_or_else(|_| Handle::new("unknown.invalid").unwrap()) 514 } else { 515 Handle::new(&profile.handle).map_err(|e| { 516 tracing::error!("Invalid handle in profile: {}", e); 517 XrpcErrorResponse::internal_error("Invalid handle stored") 518 })? 
519 }; 520 521 // Build avatar URL from CID if present 522 let avatar = if !profile.avatar_cid.is_empty() { 523 let url = format!( 524 "https://cdn.bsky.app/img/avatar/plain/{}/{}@jpeg", 525 profile.did, profile.avatar_cid 526 ); 527 Uri::new_owned(url).ok() 528 } else { 529 None 530 }; 531 532 // Build banner URL from CID if present 533 let banner = if !profile.banner_cid.is_empty() { 534 let url = format!( 535 "https://cdn.bsky.app/img/banner/plain/{}/{}@jpeg", 536 profile.did, profile.banner_cid 537 ); 538 Uri::new_owned(url).ok() 539 } else { 540 None 541 }; 542 543 let inner_profile = ProfileView::new() 544 .did(did.into_static()) 545 .handle(handle.into_static()) 546 .maybe_display_name(non_empty_str(&profile.display_name)) 547 .maybe_description(non_empty_str(&profile.description)) 548 .maybe_avatar(avatar) 549 .maybe_banner(banner) 550 .build(); 551 552 let profile_data = ProfileDataView::new() 553 .inner(ProfileDataViewInner::ProfileView(Box::new(inner_profile))) 554 .build(); 555 556 Ok(profile_data) 557} 558 559/// Parse record JSON string into owned Data 560fn parse_record_json( 561 json: &str, 562) -> Result<jacquard::types::value::Data<'static>, XrpcErrorResponse> { 563 use jacquard::types::value::Data; 564 565 let data: Data<'_> = serde_json::from_str(json).map_err(|e| { 566 tracing::error!("Failed to parse record JSON: {}", e); 567 XrpcErrorResponse::internal_error("Invalid record JSON stored") 568 })?; 569 Ok(data.into_static()) 570}