atproto blogging
1//! sh.weaver.actor.* endpoint handlers
2
3use std::collections::{HashMap, HashSet};
4
5use axum::{Json, extract::State};
6use jacquard::IntoStatic;
7use jacquard::cowstr::ToCowStr;
8use jacquard::identity::resolver::IdentityResolver;
9use jacquard::prelude::*;
10use jacquard::types::ident::AtIdentifier;
11use jacquard::types::string::{AtUri, Cid, Did, Handle, Uri};
12use jacquard_axum::ExtractXrpc;
13use jacquard_axum::service_auth::{ExtractOptionalServiceAuth, VerifiedServiceAuth};
14use smol_str::SmolStr;
15use weaver_api::sh_weaver::actor::{
16 ProfileDataView, ProfileDataViewInner, ProfileView,
17 get_actor_entries::{GetActorEntriesOutput, GetActorEntriesRequest},
18 get_actor_notebooks::{GetActorNotebooksOutput, GetActorNotebooksRequest},
19 get_profile::{GetProfileOutput, GetProfileRequest},
20};
21use weaver_api::sh_weaver::notebook::{AuthorListView, EntryView, NotebookView};
22
23use crate::clickhouse::ProfileRow;
24use crate::endpoints::repo::XrpcErrorResponse;
25use crate::server::AppState;
26
27/// Authenticated viewer context (if present)
28pub type Viewer = Option<VerifiedServiceAuth<'static>>;
29
30/// Handle sh.weaver.actor.getProfile
31///
32/// Returns a profile view with counts for the requested actor.
33pub async fn get_profile(
34 State(state): State<AppState>,
35 ExtractOptionalServiceAuth(viewer): ExtractOptionalServiceAuth,
36 ExtractXrpc(args): ExtractXrpc<GetProfileRequest>,
37) -> Result<Json<GetProfileOutput<'static>>, XrpcErrorResponse> {
38 // viewer contains Some(auth) if the request has valid service auth
39 // can be used later for viewer-specific state (e.g., "you follow this person")
40 let _viewer = viewer;
41 // Resolve identifier to DID
42 let did = resolve_actor(&state, &args.actor).await?;
43 let did_str = did.as_str();
44
45 // Fetch profile with counts from ClickHouse
46 let profile_data = state
47 .clickhouse
48 .get_profile_with_counts(did_str)
49 .await
50 .map_err(|e| {
51 tracing::error!("Failed to get profile: {}", e);
52 XrpcErrorResponse::internal_error("Database query failed")
53 })?;
54
55 let Some(data) = profile_data else {
56 // get the bluesky profile
57 // TODO: either cache this or yell at tap to start tracking their account!
58 let profile_resp = state
59 .resolver
60 .send(
61 weaver_api::app_bsky::actor::get_profile::GetProfile::new()
62 .actor(did)
63 .build(),
64 )
65 .await
66 .map_err(|e| XrpcErrorResponse::not_found(e.to_string()))?;
67 let bsky_profile = profile_resp
68 .into_output()
69 .map_err(|e| XrpcErrorResponse::not_found(e.to_string()))?
70 .value;
71 let inner_profile = ProfileView::new()
72 .did(bsky_profile.did)
73 .handle(bsky_profile.handle)
74 .maybe_display_name(bsky_profile.display_name)
75 .maybe_description(bsky_profile.description)
76 .maybe_avatar(bsky_profile.avatar)
77 .maybe_banner(bsky_profile.banner)
78 .build();
79
80 let inner = ProfileDataViewInner::ProfileView(Box::new(inner_profile));
81
82 let output = ProfileDataView::new().inner(inner).build();
83
84 return Ok(Json(
85 GetProfileOutput {
86 value: output,
87 extra_data: None,
88 }
89 .into_static(),
90 ));
91 };
92
93 // Build the response
94 let profile = &data.profile;
95
96 // Determine handle - use from profile row, or resolve if empty
97 let handle_str = if profile.handle.is_empty() {
98 // Try to resolve DID -> handle
99 match state.clickhouse.resolve_did_to_handle(did_str).await {
100 Ok(Some(mapping)) => mapping.handle.to_string(),
101 _ => {
102 // Last resort: use a placeholder or try external resolver
103 // For now, use the DID as handle (not ideal but functional)
104 did_str.to_string()
105 }
106 }
107 } else {
108 profile.handle.to_string()
109 };
110
111 let handle = Handle::new(&handle_str).map_err(|e| {
112 tracing::error!("Invalid handle in database: {}", e);
113 XrpcErrorResponse::internal_error("Invalid handle stored")
114 })?;
115
116 // Build avatar URL from CID if present
117 let avatar = if !profile.avatar_cid.is_empty() {
118 let url = format!(
119 "https://cdn.bsky.app/img/avatar/plain/{}/{}@jpeg",
120 profile.did, profile.avatar_cid
121 );
122 Uri::new_owned(url).ok()
123 } else {
124 None
125 };
126
127 // Build banner URL from CID if present
128 let banner = if !profile.banner_cid.is_empty() {
129 let url = format!(
130 "https://cdn.bsky.app/img/banner/plain/{}/{}@jpeg",
131 profile.did, profile.banner_cid
132 );
133 Uri::new_owned(url).ok()
134 } else {
135 None
136 };
137
138 // Build ProfileView (weaver native profile)
139 let inner_profile = ProfileView::new()
140 .did(did.clone())
141 .handle(handle)
142 .maybe_display_name(non_empty_str(&profile.display_name))
143 .maybe_description(non_empty_str(&profile.description))
144 .maybe_avatar(avatar)
145 .maybe_banner(banner)
146 .build();
147
148 let inner = ProfileDataViewInner::ProfileView(Box::new(inner_profile));
149
150 // Build ProfileDataView with counts
151 let counts = data.counts.as_ref();
152
153 let output = ProfileDataView::new()
154 .inner(inner)
155 .maybe_follower_count(counts.map(|c| c.follower_count))
156 .maybe_following_count(counts.map(|c| c.following_count))
157 .maybe_notebook_count(counts.map(|c| c.notebook_count))
158 .maybe_entry_count(counts.map(|c| c.entry_count))
159 .build();
160
161 Ok(Json(
162 GetProfileOutput {
163 value: output,
164 extra_data: None,
165 }
166 .into_static(),
167 ))
168}
169
170/// Resolve an AtIdentifier to a DID.
171///
172/// For handles: tries handle_mappings first, falls back to external resolver.
173/// For DIDs: returns as-is.
174pub async fn resolve_actor<'a>(
175 state: &AppState,
176 actor: &AtIdentifier<'a>,
177) -> Result<Did<'static>, XrpcErrorResponse> {
178 match actor {
179 AtIdentifier::Did(did) => Ok(did.clone().into_static()),
180 AtIdentifier::Handle(handle) => {
181 let handle_str = handle.as_str();
182
183 // Try handle_mappings first
184 match state.clickhouse.resolve_handle(handle_str).await {
185 Ok(Some(mapping)) => {
186 let did = Did::new(&mapping.did).map_err(|e| {
187 tracing::error!("Invalid DID in handle_mappings: {}", e);
188 XrpcErrorResponse::internal_error("Invalid DID stored")
189 })?;
190 return Ok(did.into_static());
191 }
192 Ok(None) => {
193 tracing::debug!("Handle {} not in cache, trying resolver", handle_str);
194 }
195 Err(e) => {
196 tracing::warn!("Handle lookup failed, trying resolver: {}", e);
197 }
198 }
199
200 // Fall back to external resolver
201 let resolved = state.resolver.resolve_handle(handle).await.map_err(|e| {
202 tracing::warn!("Handle resolution failed for {}: {}", handle, e);
203 XrpcErrorResponse::invalid_request(format!("Could not resolve handle: {}", handle))
204 })?;
205
206 // Cache the result (fire-and-forget)
207 let clickhouse = state.clickhouse.clone();
208 let handle_owned = handle_str.to_string();
209 let did_owned = resolved.as_str().to_string();
210 tokio::spawn(async move {
211 if let Err(e) = clickhouse
212 .cache_handle_resolution(&handle_owned, &did_owned)
213 .await
214 {
215 tracing::warn!("Failed to cache handle resolution: {}", e);
216 }
217 });
218
219 Ok(resolved.into_static())
220 }
221 }
222}
223
// Import the shared helper from the parent module for use in this file
225use super::non_empty_str;
226
227/// Parse cursor string to i64 timestamp millis
228fn parse_cursor(cursor: Option<&str>) -> Result<Option<i64>, XrpcErrorResponse> {
229 cursor
230 .map(|c| {
231 c.parse::<i64>()
232 .map_err(|_| XrpcErrorResponse::invalid_request("Invalid cursor format"))
233 })
234 .transpose()
235}
236
237/// Handle sh.weaver.actor.getActorNotebooks
238///
239/// Returns notebooks owned by the given actor.
240pub async fn get_actor_notebooks(
241 State(state): State<AppState>,
242 ExtractOptionalServiceAuth(viewer): ExtractOptionalServiceAuth,
243 ExtractXrpc(args): ExtractXrpc<GetActorNotebooksRequest>,
244) -> Result<Json<GetActorNotebooksOutput<'static>>, XrpcErrorResponse> {
245 let _viewer: Viewer = viewer;
246
247 // Resolve actor to DID
248 let did = resolve_actor(&state, &args.actor).await?;
249 let did_str = did.as_str();
250
251 // Fetch notebooks for this actor
252 let limit = args.limit.unwrap_or(50).clamp(1, 100) as u32;
253 let cursor = parse_cursor(args.cursor.as_deref())?;
254
255 let notebook_rows = state
256 .clickhouse
257 .list_actor_notebooks(did_str, limit + 1, cursor)
258 .await
259 .map_err(|e| {
260 tracing::error!("Failed to list actor notebooks: {}", e);
261 XrpcErrorResponse::internal_error("Database query failed")
262 })?;
263
264 // Check if there are more
265 let has_more = notebook_rows.len() > limit as usize;
266 let notebook_rows: Vec<_> = notebook_rows.into_iter().take(limit as usize).collect();
267
268 // Collect author DIDs for hydration
269 let mut all_author_dids: HashSet<&str> = HashSet::new();
270 for nb in ¬ebook_rows {
271 for did in &nb.author_dids {
272 all_author_dids.insert(did.as_str());
273 }
274 }
275
276 // Batch fetch profiles
277 let author_dids_vec: Vec<&str> = all_author_dids.into_iter().collect();
278 let profiles = state
279 .clickhouse
280 .get_profiles_batch(&author_dids_vec)
281 .await
282 .map_err(|e| {
283 tracing::error!("Failed to batch fetch profiles: {}", e);
284 XrpcErrorResponse::internal_error("Database query failed")
285 })?;
286
287 let profile_map: HashMap<&str, &ProfileRow> =
288 profiles.iter().map(|p| (p.did.as_str(), p)).collect();
289
290 // Build NotebookViews
291 let mut notebooks: Vec<NotebookView<'static>> = Vec::with_capacity(notebook_rows.len());
292 for nb_row in ¬ebook_rows {
293 let notebook_uri = AtUri::new(&nb_row.uri).map_err(|e| {
294 tracing::error!("Invalid notebook URI in db: {}", e);
295 XrpcErrorResponse::internal_error("Invalid URI stored")
296 })?;
297
298 let notebook_cid = Cid::new(nb_row.cid.as_bytes()).map_err(|e| {
299 tracing::error!("Invalid notebook CID in db: {}", e);
300 XrpcErrorResponse::internal_error("Invalid CID stored")
301 })?;
302
303 let authors = hydrate_authors(&nb_row.author_dids, &profile_map)?;
304 let record = parse_record_json(&nb_row.record)?;
305
306 let notebook = NotebookView::new()
307 .uri(notebook_uri.into_static())
308 .cid(notebook_cid.into_static())
309 .authors(authors)
310 .record(record)
311 .indexed_at(nb_row.indexed_at.fixed_offset())
312 .maybe_title(non_empty_str(&nb_row.title))
313 .maybe_path(non_empty_str(&nb_row.path))
314 .build();
315
316 notebooks.push(notebook);
317 }
318
319 // Build cursor for pagination (created_at millis)
320 let next_cursor = if has_more {
321 notebook_rows
322 .last()
323 .map(|nb| nb.created_at.timestamp_millis().to_cowstr().into_static())
324 } else {
325 None
326 };
327
328 Ok(Json(
329 GetActorNotebooksOutput {
330 notebooks,
331 cursor: next_cursor,
332 extra_data: None,
333 }
334 .into_static(),
335 ))
336}
337
338/// Handle sh.weaver.actor.getActorEntries
339///
340/// Returns entries owned by the given actor.
341pub async fn get_actor_entries(
342 State(state): State<AppState>,
343 ExtractOptionalServiceAuth(viewer): ExtractOptionalServiceAuth,
344 ExtractXrpc(args): ExtractXrpc<GetActorEntriesRequest>,
345) -> Result<Json<GetActorEntriesOutput<'static>>, XrpcErrorResponse> {
346 let _viewer: Viewer = viewer;
347
348 // Resolve actor to DID
349 let did = resolve_actor(&state, &args.actor).await?;
350 let did_str = did.as_str();
351
352 // Fetch entries for this actor
353 let limit = args.limit.unwrap_or(50).clamp(1, 100) as u32;
354 let cursor = parse_cursor(args.cursor.as_deref())?;
355
356 let entry_rows = state
357 .clickhouse
358 .list_actor_entries(did_str, limit + 1, cursor)
359 .await
360 .map_err(|e| {
361 tracing::error!("Failed to list actor entries: {}", e);
362 XrpcErrorResponse::internal_error("Database query failed")
363 })?;
364
365 // Check if there are more
366 let has_more = entry_rows.len() > limit as usize;
367 let entry_rows: Vec<_> = entry_rows.into_iter().take(limit as usize).collect();
368
369 // Batch fetch contributors for all entries
370 let entry_keys: Vec<(&str, &str)> = entry_rows
371 .iter()
372 .map(|e| (e.did.as_str(), e.rkey.as_str()))
373 .collect();
374 let contributors_map = state
375 .clickhouse
376 .get_entry_contributors_batch(&entry_keys)
377 .await
378 .map_err(|e| {
379 tracing::error!("Failed to batch fetch contributors: {}", e);
380 XrpcErrorResponse::internal_error("Database query failed")
381 })?;
382
383 // Collect all contributor DIDs for profile hydration
384 let mut all_author_dids: HashSet<&str> = HashSet::new();
385 for contributors in contributors_map.values() {
386 for did in contributors {
387 all_author_dids.insert(did.as_str());
388 }
389 }
390
391 // Batch fetch profiles
392 let author_dids_vec: Vec<&str> = all_author_dids.into_iter().collect();
393 let profiles = state
394 .clickhouse
395 .get_profiles_batch(&author_dids_vec)
396 .await
397 .map_err(|e| {
398 tracing::error!("Failed to batch fetch profiles: {}", e);
399 XrpcErrorResponse::internal_error("Database query failed")
400 })?;
401
402 let profile_map: HashMap<&str, &ProfileRow> =
403 profiles.iter().map(|p| (p.did.as_str(), p)).collect();
404
405 // Build EntryViews
406 let mut entries: Vec<EntryView<'static>> = Vec::with_capacity(entry_rows.len());
407 for entry_row in &entry_rows {
408 let entry_uri = AtUri::new(&entry_row.uri).map_err(|e| {
409 tracing::error!("Invalid entry URI in db: {}", e);
410 XrpcErrorResponse::internal_error("Invalid URI stored")
411 })?;
412
413 let entry_cid = Cid::new(entry_row.cid.as_bytes()).map_err(|e| {
414 tracing::error!("Invalid entry CID in db: {}", e);
415 XrpcErrorResponse::internal_error("Invalid CID stored")
416 })?;
417
418 // Get contributors for this entry
419 let entry_key = (entry_row.did.clone(), entry_row.rkey.clone());
420 let contributors = contributors_map
421 .get(&entry_key)
422 .map(|v| v.as_slice())
423 .unwrap_or(&[]);
424
425 let authors = hydrate_authors(contributors, &profile_map)?;
426 let record = parse_record_json(&entry_row.record)?;
427
428 let entry = EntryView::new()
429 .uri(entry_uri.into_static())
430 .cid(entry_cid.into_static())
431 .authors(authors)
432 .record(record)
433 .indexed_at(entry_row.indexed_at.fixed_offset())
434 .maybe_title(non_empty_str(&entry_row.title))
435 .maybe_path(non_empty_str(&entry_row.path))
436 .build();
437
438 entries.push(entry);
439 }
440
441 // Build cursor for pagination (created_at millis)
442 let next_cursor = if has_more {
443 entry_rows
444 .last()
445 .map(|e| e.created_at.timestamp_millis().to_cowstr().into_static())
446 } else {
447 None
448 };
449
450 Ok(Json(
451 GetActorEntriesOutput {
452 entries,
453 cursor: next_cursor,
454 extra_data: None,
455 }
456 .into_static(),
457 ))
458}
459
460/// Hydrate author list from DIDs using profile map
461fn hydrate_authors(
462 author_dids: &[SmolStr],
463 profile_map: &HashMap<&str, &ProfileRow>,
464) -> Result<Vec<AuthorListView<'static>>, XrpcErrorResponse> {
465 let mut authors = Vec::with_capacity(author_dids.len());
466
467 for (idx, did_str) in author_dids.iter().enumerate() {
468 let profile_data = if let Some(profile) = profile_map.get(did_str.as_str()) {
469 profile_to_data_view(profile)?
470 } else {
471 // No profile found - create minimal view with just the DID
472 let did = Did::new(did_str).map_err(|e| {
473 tracing::error!("Invalid DID in author_dids: {}", e);
474 XrpcErrorResponse::internal_error("Invalid DID stored")
475 })?;
476
477 let inner_profile = ProfileView::new()
478 .did(did.into_static())
479 .handle(
480 Handle::new(did_str)
481 .unwrap_or_else(|_| Handle::new("unknown.invalid").unwrap()),
482 )
483 .build();
484
485 ProfileDataView::new()
486 .inner(ProfileDataViewInner::ProfileView(Box::new(inner_profile)))
487 .build()
488 };
489
490 let author_view = AuthorListView::new()
491 .index(idx as i64)
492 .record(profile_data.into_static())
493 .build();
494
495 authors.push(author_view);
496 }
497
498 Ok(authors)
499}
500
501/// Convert ProfileRow to ProfileDataView
502pub fn profile_to_data_view(
503 profile: &ProfileRow,
504) -> Result<ProfileDataView<'static>, XrpcErrorResponse> {
505 use jacquard::types::string::Uri;
506
507 let did = Did::new(&profile.did).map_err(|e| {
508 tracing::error!("Invalid DID in profile: {}", e);
509 XrpcErrorResponse::internal_error("Invalid DID stored")
510 })?;
511
512 let handle = if profile.handle.is_empty() {
513 Handle::new(&profile.did).unwrap_or_else(|_| Handle::new("unknown.invalid").unwrap())
514 } else {
515 Handle::new(&profile.handle).map_err(|e| {
516 tracing::error!("Invalid handle in profile: {}", e);
517 XrpcErrorResponse::internal_error("Invalid handle stored")
518 })?
519 };
520
521 // Build avatar URL from CID if present
522 let avatar = if !profile.avatar_cid.is_empty() {
523 let url = format!(
524 "https://cdn.bsky.app/img/avatar/plain/{}/{}@jpeg",
525 profile.did, profile.avatar_cid
526 );
527 Uri::new_owned(url).ok()
528 } else {
529 None
530 };
531
532 // Build banner URL from CID if present
533 let banner = if !profile.banner_cid.is_empty() {
534 let url = format!(
535 "https://cdn.bsky.app/img/banner/plain/{}/{}@jpeg",
536 profile.did, profile.banner_cid
537 );
538 Uri::new_owned(url).ok()
539 } else {
540 None
541 };
542
543 let inner_profile = ProfileView::new()
544 .did(did.into_static())
545 .handle(handle.into_static())
546 .maybe_display_name(non_empty_str(&profile.display_name))
547 .maybe_description(non_empty_str(&profile.description))
548 .maybe_avatar(avatar)
549 .maybe_banner(banner)
550 .build();
551
552 let profile_data = ProfileDataView::new()
553 .inner(ProfileDataViewInner::ProfileView(Box::new(inner_profile)))
554 .build();
555
556 Ok(profile_data)
557}
558
559/// Parse record JSON string into owned Data
560fn parse_record_json(
561 json: &str,
562) -> Result<jacquard::types::value::Data<'static>, XrpcErrorResponse> {
563 use jacquard::types::value::Data;
564
565 let data: Data<'_> = serde_json::from_str(json).map_err(|e| {
566 tracing::error!("Failed to parse record JSON: {}", e);
567 XrpcErrorResponse::internal_error("Invalid record JSON stored")
568 })?;
569 Ok(data.into_static())
570}