at main 411 lines 13 kB view raw
1//! Edit endpoint handlers 2 3use std::collections::HashMap; 4 5use axum::{Json, extract::State}; 6use jacquard::IntoStatic; 7use jacquard::cowstr::ToCowStr; 8use jacquard::types::datetime::Datetime; 9use jacquard::types::string::{AtUri, Cid}; 10use jacquard_axum::ExtractXrpc; 11use jacquard_axum::service_auth::ExtractOptionalServiceAuth; 12 13use weaver_api::com_atproto::repo::strong_ref::StrongRef; 14use weaver_api::sh_weaver::edit::get_contributors::{ 15 GetContributorsOutput, GetContributorsRequest, 16}; 17use weaver_api::sh_weaver::edit::get_edit_history::{GetEditHistoryOutput, GetEditHistoryRequest}; 18use weaver_api::sh_weaver::edit::list_drafts::{DraftView, ListDraftsOutput, ListDraftsRequest}; 19use weaver_api::sh_weaver::edit::{EditHistoryEntry, EditHistoryEntryType}; 20 21use crate::clickhouse::{EditNodeRow, ProfileRow}; 22use crate::endpoints::actor::{Viewer, resolve_actor}; 23use crate::endpoints::collab::profile_to_view_basic; 24use crate::endpoints::repo::XrpcErrorResponse; 25use crate::endpoints::resolve_uri; 26use crate::server::AppState; 27 28/// Handle sh.weaver.edit.getEditHistory 29/// 30/// Returns edit history (roots and diffs) for a resource. 31pub async fn get_edit_history( 32 State(state): State<AppState>, 33 ExtractOptionalServiceAuth(viewer): ExtractOptionalServiceAuth, 34 ExtractXrpc(args): ExtractXrpc<GetEditHistoryRequest>, 35) -> Result<Json<GetEditHistoryOutput<'static>>, XrpcErrorResponse> { 36 let _viewer: Viewer = viewer; 37 38 let limit = args.limit.unwrap_or(50).min(100).max(1); 39 40 // Resolve URI and get canonical form 41 let resolved = resolve_uri(&state, &args.resource).await?; 42 43 // Parse cursor as millisecond timestamp 44 let cursor = args 45 .cursor 46 .as_deref() 47 .map(|c| c.parse::<i64>()) 48 .transpose() 49 .map_err(|_| XrpcErrorResponse::invalid_request("Invalid cursor format"))?; 50 51 let after_rkey = args.after_rkey.as_deref(); 52 53 // Fetch edit nodes 54 let nodes = state 55 .clickhouse 56 .get_edit_history( 57 &resolved.did, 58 &resolved.collection, 59 &resolved.rkey, 60 cursor, 61 after_rkey, 62 limit + 1, 63 ) 64 .await 65 .map_err(|e| { 66 tracing::error!("Failed to get edit history: {}", e); 67 XrpcErrorResponse::internal_error("Database query failed") 68 })?; 69 70 // Check if there are more results 71 let has_more = nodes.len() > limit as usize; 72 let nodes: Vec<_> = nodes.into_iter().take(limit as usize).collect(); 73 74 // Collect unique author DIDs 75 let author_dids: Vec<&str> = nodes.iter().map(|n| n.did.as_str()).collect(); 76 let unique_dids: Vec<&str> = author_dids 77 .iter() 78 .copied() 79 .collect::<std::collections::HashSet<_>>() 80 .into_iter() 81 .collect(); 82 83 // Batch fetch profiles 84 let profiles = state 85 .clickhouse 86 .get_profiles_batch(&unique_dids) 87 .await 88 .map_err(|e| { 89 tracing::error!("Failed to batch fetch profiles: {}", e); 90 XrpcErrorResponse::internal_error("Database query failed") 91 })?; 92 93 let profile_map: HashMap<&str, &ProfileRow> = 94 profiles.iter().map(|p| (p.did.as_str(), p)).collect(); 95 96 // Separate roots and diffs, building EditHistoryEntry for each 97 let mut roots = Vec::new(); 98 let mut diffs = Vec::new(); 99 100 for node in &nodes { 101 let entry = node_to_history_entry(node, &profile_map)?; 102 103 if node.node_type == "root" { 104 roots.push(entry); 105 } else { 106 diffs.push(entry); 107 } 108 } 109 110 // Build cursor from last node's created_at 111 let next_cursor = if has_more { 112 nodes 113 .last() 114 .map(|n| n.created_at.timestamp_millis().to_cowstr().into_static()) 115 } else { 116 None 117 }; 118 119 Ok(Json( 120 GetEditHistoryOutput { 121 roots, 122 diffs, 123 cursor: next_cursor, 124 extra_data: None, 125 } 126 .into_static(), 127 )) 128} 129 130/// Convert EditNodeRow to EditHistoryEntry 131fn node_to_history_entry( 132 node: &EditNodeRow, 133 profile_map: &HashMap<&str, &ProfileRow>, 134) -> Result<EditHistoryEntry<'static>, XrpcErrorResponse> { 135 let author = profile_map 136 .get(node.did.as_str()) 137 .map(|p| profile_to_view_basic(p)) 138 .transpose()? 139 .ok_or_else(|| XrpcErrorResponse::internal_error("Author profile not found"))?; 140 141 // Build URI 142 let uri = AtUri::new(&format!( 143 "at://{}/{}/{}", 144 node.did, node.collection, node.rkey 145 )) 146 .map_err(|_| XrpcErrorResponse::internal_error("Invalid AT URI"))? 147 .into_static(); 148 149 let cid = Cid::new(node.cid.as_bytes()) 150 .map_err(|_| XrpcErrorResponse::internal_error("Invalid CID"))? 151 .into_static(); 152 153 // Build optional StrongRefs for diffs 154 let root_ref = if !node.root_cid.is_empty() { 155 let root_uri = AtUri::new(&format!( 156 "at://{}/sh.weaver.edit.root/{}", 157 node.root_did, node.root_rkey 158 )) 159 .map_err(|_| XrpcErrorResponse::internal_error("Invalid root URI"))? 160 .into_static(); 161 162 let root_cid = Cid::new(node.root_cid.as_bytes()) 163 .map_err(|_| XrpcErrorResponse::internal_error("Invalid root CID"))? 164 .into_static(); 165 166 Some(StrongRef::new().uri(root_uri).cid(root_cid).build()) 167 } else { 168 None 169 }; 170 171 let prev_ref = if !node.prev_cid.is_empty() { 172 let prev_uri = AtUri::new(&format!( 173 "at://{}/sh.weaver.edit.diff/{}", 174 node.prev_did, node.prev_rkey 175 )) 176 .map_err(|_| XrpcErrorResponse::internal_error("Invalid prev URI"))? 177 .into_static(); 178 179 let prev_cid = Cid::new(node.prev_cid.as_bytes()) 180 .map_err(|_| XrpcErrorResponse::internal_error("Invalid prev CID"))? 181 .into_static(); 182 183 Some(StrongRef::new().uri(prev_uri).cid(prev_cid).build()) 184 } else { 185 None 186 }; 187 188 let created_at = Datetime::new(node.created_at.fixed_offset()); 189 190 Ok(EditHistoryEntry::new() 191 .uri(uri) 192 .cid(cid) 193 .author(author) 194 .created_at(created_at) 195 .r#type(EditHistoryEntryType::from(node.node_type.as_str()).into_static()) 196 .maybe_has_inline_diff(Some(node.has_inline_diff == 1)) 197 .maybe_prev_ref(prev_ref) 198 .maybe_root_ref(root_ref) 199 .build()) 200} 201 202/// Handle sh.weaver.edit.getContributors 203/// 204/// Returns evidence-based contributors for a resource (entry or notebook). 205pub async fn get_contributors( 206 State(state): State<AppState>, 207 ExtractOptionalServiceAuth(viewer): ExtractOptionalServiceAuth, 208 ExtractXrpc(args): ExtractXrpc<GetContributorsRequest>, 209) -> Result<Json<GetContributorsOutput<'static>>, XrpcErrorResponse> { 210 let _viewer: Viewer = viewer; 211 212 let include_cascaded = args.include_cascaded.unwrap_or(true); 213 214 // Resolve URI and get canonical form 215 let resolved = resolve_uri(&state, &args.resource).await?; 216 217 // Get contributors based on resource type 218 let contributor_dids = match resolved.collection.as_str() { 219 "sh.weaver.notebook.entry" => { 220 if include_cascaded { 221 state 222 .clickhouse 223 .get_entry_contributors(&resolved.did, &resolved.rkey) 224 .await 225 } else { 226 state 227 .clickhouse 228 .get_entry_contributors_direct(&resolved.did, &resolved.rkey) 229 .await 230 } 231 } 232 "sh.weaver.notebook.book" => { 233 state 234 .clickhouse 235 .get_notebook_contributors(&resolved.did, &resolved.rkey) 236 .await 237 } 238 _ => { 239 return Err(XrpcErrorResponse::invalid_request( 240 "Resource must be an entry or notebook", 241 )); 242 } 243 } 244 .map_err(|e| { 245 tracing::error!("Failed to get contributors: {}", e); 246 XrpcErrorResponse::internal_error("Database query failed") 247 })?; 248 249 if contributor_dids.is_empty() { 250 return Ok(Json( 251 GetContributorsOutput { 252 contributors: Vec::new(), 253 extra_data: None, 254 } 255 .into_static(), 256 )); 257 } 258 259 // Batch fetch profiles 260 let did_refs: Vec<&str> = contributor_dids.iter().map(|s| s.as_str()).collect(); 261 let profiles = state 262 .clickhouse 263 .get_profiles_batch(&did_refs) 264 .await 265 .map_err(|e| { 266 tracing::error!("Failed to batch fetch profiles: {}", e); 267 XrpcErrorResponse::internal_error("Database query failed") 268 })?; 269 270 let profile_map: HashMap<&str, &ProfileRow> = 271 profiles.iter().map(|p| (p.did.as_str(), p)).collect(); 272 273 // Build contributor list 274 let mut contributors = Vec::with_capacity(contributor_dids.len()); 275 for did in &contributor_dids { 276 if let Some(profile) = profile_map.get(did.as_str()) { 277 contributors.push(profile_to_view_basic(profile)?); 278 } 279 } 280 281 Ok(Json( 282 GetContributorsOutput { 283 contributors, 284 extra_data: None, 285 } 286 .into_static(), 287 )) 288} 289 290/// Handle sh.weaver.edit.listDrafts 291/// 292/// Returns draft records for an actor. Requires authentication. 293/// Only returns drafts if viewer is the actor or has collab permission. 294pub async fn list_drafts( 295 State(state): State<AppState>, 296 ExtractOptionalServiceAuth(viewer): ExtractOptionalServiceAuth, 297 ExtractXrpc(args): ExtractXrpc<ListDraftsRequest>, 298) -> Result<Json<ListDraftsOutput<'static>>, XrpcErrorResponse> { 299 // Require authentication 300 let viewer = 301 viewer.ok_or_else(|| XrpcErrorResponse::auth_required("Authentication required"))?; 302 let viewer_did = viewer.did(); 303 304 let limit = args.limit.unwrap_or(50).min(100).max(1); 305 306 // Parse cursor as millisecond timestamp 307 let cursor = args 308 .cursor 309 .as_deref() 310 .map(|c| c.parse::<i64>()) 311 .transpose() 312 .map_err(|_| XrpcErrorResponse::invalid_request("Invalid cursor format"))?; 313 314 // Resolve actor to DID 315 let actor_did = resolve_actor(&state, &args.actor).await?; 316 317 // Permission check: viewer must be the actor (owner access) 318 // TODO: Add collab grant check for draft sharing 319 if viewer_did.as_str() != actor_did.as_str() { 320 return Err(XrpcErrorResponse::forbidden( 321 "Cannot view another user's drafts", 322 )); 323 } 324 325 // Fetch drafts 326 let draft_rows = state 327 .clickhouse 328 .list_drafts(&actor_did, cursor, limit + 1) 329 .await 330 .map_err(|e| { 331 tracing::error!("Failed to list drafts: {}", e); 332 XrpcErrorResponse::internal_error("Database query failed") 333 })?; 334 335 // Check if there are more results 336 let has_more = draft_rows.len() > limit as usize; 337 let draft_rows: Vec<_> = draft_rows.into_iter().take(limit as usize).collect(); 338 339 // Build draft views 340 let mut drafts = Vec::with_capacity(draft_rows.len()); 341 for row in &draft_rows { 342 let uri = AtUri::new(&format!( 343 "at://{}/sh.weaver.edit.draft/{}", 344 row.did, row.rkey 345 )) 346 .map_err(|_| XrpcErrorResponse::internal_error("Invalid AT URI"))? 347 .into_static(); 348 349 let cid = Cid::new(row.cid.as_bytes()) 350 .map_err(|_| XrpcErrorResponse::internal_error("Invalid CID"))? 351 .into_static(); 352 353 let created_at = Datetime::new(row.created_at.fixed_offset()); 354 355 // Build optional edit root reference 356 let edit_root = if !row.root_cid.is_empty() { 357 let root_uri = AtUri::new(&format!( 358 "at://{}/sh.weaver.edit.root/{}", 359 row.root_did, row.root_rkey 360 )) 361 .map_err(|_| XrpcErrorResponse::internal_error("Invalid root URI"))? 362 .into_static(); 363 364 let root_cid = Cid::new(row.root_cid.as_bytes()) 365 .map_err(|_| XrpcErrorResponse::internal_error("Invalid root CID"))? 366 .into_static(); 367 368 Some(StrongRef::new().uri(root_uri).cid(root_cid).build()) 369 } else { 370 None 371 }; 372 373 let last_edit_at = row.last_edit_at.map(|dt| Datetime::new(dt.fixed_offset())); 374 375 // Include title if available 376 let title = if row.title.is_empty() { 377 None 378 } else { 379 Some(row.title.to_cowstr().into_static()) 380 }; 381 382 drafts.push( 383 DraftView::new() 384 .uri(uri) 385 .cid(cid) 386 .created_at(created_at) 387 .maybe_edit_root(edit_root) 388 .maybe_last_edit_at(last_edit_at) 389 .maybe_title(title) 390 .build(), 391 ); 392 } 393 394 // Build cursor from last draft's created_at 395 let next_cursor = if has_more { 396 draft_rows 397 .last() 398 .map(|d| d.created_at.timestamp_millis().to_cowstr().into_static()) 399 } else { 400 None 401 }; 402 403 Ok(Json( 404 ListDraftsOutput { 405 drafts, 406 cursor: next_cursor, 407 extra_data: None, 408 } 409 .into_static(), 410 )) 411}