at main 448 lines 15 kB view raw
1//! Collaboration endpoint handlers 2 3use std::collections::HashMap; 4 5use axum::{Json, extract::State}; 6use jacquard::IntoStatic; 7use jacquard::cowstr::ToCowStr; 8use jacquard::types::datetime::Datetime; 9use jacquard::types::string::{AtUri, Cid, Did, Handle}; 10use jacquard_axum::ExtractXrpc; 11use jacquard_axum::service_auth::ExtractOptionalServiceAuth; 12 13use weaver_api::com_atproto::repo::strong_ref::StrongRef; 14use weaver_api::sh_weaver::actor::ProfileViewBasic; 15use weaver_api::sh_weaver::collab::get_collaboration_state::{ 16 GetCollaborationStateOutput, GetCollaborationStateRequest, 17}; 18use weaver_api::sh_weaver::collab::get_resource_participants::{ 19 GetResourceParticipantsOutput, GetResourceParticipantsRequest, 20}; 21use weaver_api::sh_weaver::collab::get_resource_sessions::{ 22 GetResourceSessionsOutput, GetResourceSessionsRequest, 23}; 24use weaver_api::sh_weaver::collab::{CollaborationStateView, ParticipantStateView, SessionView}; 25 26use crate::clickhouse::{CollaboratorRow, ProfileRow}; 27use crate::endpoints::actor::Viewer; 28use crate::endpoints::repo::XrpcErrorResponse; 29use crate::endpoints::{non_empty_str, resolve_uri}; 30use crate::server::AppState; 31 32/// Handle sh.weaver.collab.getResourceParticipants 33/// 34/// Returns owner and collaborators who can edit the resource. 35pub async fn get_resource_participants( 36 State(state): State<AppState>, 37 ExtractOptionalServiceAuth(viewer): ExtractOptionalServiceAuth, 38 ExtractXrpc(args): ExtractXrpc<GetResourceParticipantsRequest>, 39) -> Result<Json<GetResourceParticipantsOutput<'static>>, XrpcErrorResponse> { 40 let _viewer: Viewer = viewer; 41 let viewer_did: Option<&str> = _viewer.as_ref().map(|v| v.did().as_str()); 42 43 // Resolve URI and get canonical form 44 let resolved = resolve_uri(&state, &args.resource).await?; 45 46 // Get all permissions for the resource 47 let permissions = state 48 .clickhouse 49 .get_resource_permissions(&resolved.canonical_uri) 50 .await 51 .map_err(|e| { 52 tracing::error!("Failed to get resource permissions: {}", e); 53 XrpcErrorResponse::internal_error("Database query failed") 54 })?; 55 56 if permissions.is_empty() { 57 return Err(XrpcErrorResponse::not_found("Resource not found")); 58 } 59 60 // Collect all DIDs for profile hydration 61 let all_dids: Vec<&str> = permissions.iter().map(|p| p.grantee_did.as_str()).collect(); 62 63 // Batch fetch profiles 64 let profiles = state 65 .clickhouse 66 .get_profiles_batch(&all_dids) 67 .await 68 .map_err(|e| { 69 tracing::error!("Failed to batch fetch profiles: {}", e); 70 XrpcErrorResponse::internal_error("Database query failed") 71 })?; 72 73 let profile_map: HashMap<&str, &ProfileRow> = 74 profiles.iter().map(|p| (p.did.as_str(), p)).collect(); 75 76 // Find owner and build participants 77 let mut owner: Option<ProfileViewBasic<'static>> = None; 78 let mut participants: Vec<ProfileViewBasic<'static>> = Vec::new(); 79 80 for perm in &permissions { 81 let profile_view = if let Some(profile) = profile_map.get(perm.grantee_did.as_str()) { 82 profile_to_view_basic(profile)? 83 } else { 84 // No profile found - skip (shouldn't happen if permissions table is consistent) 85 continue; 86 }; 87 88 if perm.scope == "owner" { 89 owner = Some(profile_view); 90 } else { 91 participants.push(profile_view); 92 } 93 } 94 95 let owner = owner.ok_or_else(|| { 96 tracing::error!("Resource has no owner in permissions"); 97 XrpcErrorResponse::internal_error("Resource has no owner") 98 })?; 99 100 // Check if viewer can edit 101 let viewer_can_edit = viewer_did.map(|v| all_dids.contains(&v)); 102 103 Ok(Json( 104 GetResourceParticipantsOutput { 105 owner, 106 participants, 107 viewer_can_edit, 108 extra_data: None, 109 } 110 .into_static(), 111 )) 112} 113 114/// Convert ProfileRow to ProfileViewBasic directly 115pub fn profile_to_view_basic( 116 profile: &ProfileRow, 117) -> Result<ProfileViewBasic<'static>, XrpcErrorResponse> { 118 let did = Did::new_owned(profile.did.clone()) 119 .map_err(|_| XrpcErrorResponse::internal_error("Invalid DID in profile"))?; 120 121 let handle = Handle::new_owned(profile.handle.clone()) 122 .map_err(|_| XrpcErrorResponse::internal_error("Invalid handle in profile"))?; 123 124 Ok(ProfileViewBasic::new() 125 .did(did) 126 .handle(handle) 127 .maybe_display_name(non_empty_str(&profile.display_name)) 128 .build()) 129} 130 131/// Handle sh.weaver.collab.getCollaborationState 132/// 133/// Returns full collaboration state for a resource. 134pub async fn get_collaboration_state( 135 State(state): State<AppState>, 136 ExtractOptionalServiceAuth(viewer): ExtractOptionalServiceAuth, 137 ExtractXrpc(args): ExtractXrpc<GetCollaborationStateRequest>, 138) -> Result<Json<GetCollaborationStateOutput<'static>>, XrpcErrorResponse> { 139 let _viewer: Viewer = viewer; 140 141 // Resolve URI and get canonical form 142 let resolved = resolve_uri(&state, &args.resource).await?; 143 144 // Get permissions for the resource 145 let permissions = state 146 .clickhouse 147 .get_resource_permissions(&resolved.canonical_uri) 148 .await 149 .map_err(|e| { 150 tracing::error!("Failed to get resource permissions: {}", e); 151 XrpcErrorResponse::internal_error("Database query failed") 152 })?; 153 154 if permissions.is_empty() { 155 return Err(XrpcErrorResponse::not_found("Resource not found")); 156 } 157 158 // Get collaborators (invite+accept pairs) for additional data 159 let collaborators = state 160 .clickhouse 161 .get_collaborators(&resolved.canonical_uri) 162 .await 163 .map_err(|e| { 164 tracing::error!("Failed to get collaborators: {}", e); 165 XrpcErrorResponse::internal_error("Database query failed") 166 })?; 167 168 // Check for divergence 169 let has_divergence = state 170 .clickhouse 171 .has_divergence(&resolved.canonical_uri) 172 .await 173 .map_err(|e| { 174 tracing::error!("Failed to check divergence: {}", e); 175 XrpcErrorResponse::internal_error("Database query failed") 176 })?; 177 178 // Collect all DIDs for profile hydration 179 let all_dids: Vec<&str> = permissions.iter().map(|p| p.grantee_did.as_str()).collect(); 180 181 // Batch fetch profiles 182 let profiles = state 183 .clickhouse 184 .get_profiles_batch(&all_dids) 185 .await 186 .map_err(|e| { 187 tracing::error!("Failed to batch fetch profiles: {}", e); 188 XrpcErrorResponse::internal_error("Database query failed") 189 })?; 190 191 let profile_map: HashMap<&str, &ProfileRow> = 192 profiles.iter().map(|p| (p.did.as_str(), p)).collect(); 193 194 // Build collaborator lookup for invite/accept URIs 195 let collab_map: HashMap<&str, &CollaboratorRow> = collaborators 196 .iter() 197 .map(|c| (c.collaborator_did.as_str(), c)) 198 .collect(); 199 200 // Find owner and get resource CID 201 let owner_perm = permissions 202 .iter() 203 .find(|p| p.scope == "owner") 204 .ok_or_else(|| { 205 tracing::error!("Resource has no owner in permissions"); 206 XrpcErrorResponse::internal_error("Resource has no owner") 207 })?; 208 209 // Build resource StrongRef - look up the CID from the appropriate table 210 let resource_uri_parsed = AtUri::new(&resolved.canonical_uri) 211 .map_err(|_| XrpcErrorResponse::internal_error("Invalid resource URI"))? 212 .into_static(); 213 214 // Look up the resource CID from raw_records 215 let resource_cid = state 216 .clickhouse 217 .get_record_cid( 218 &owner_perm.resource_did, 219 &owner_perm.resource_collection, 220 &owner_perm.resource_rkey, 221 ) 222 .await 223 .map_err(|e| { 224 tracing::error!("Failed to get resource CID: {}", e); 225 XrpcErrorResponse::internal_error("Database query failed") 226 })? 227 .ok_or_else(|| XrpcErrorResponse::not_found("Resource not found in database"))?; 228 229 let resource = StrongRef::new() 230 .uri(resource_uri_parsed.clone()) 231 .cid( 232 Cid::new(resource_cid.as_bytes()) 233 .map_err(|_| XrpcErrorResponse::internal_error("Invalid resource CID"))? 234 .into_static(), 235 ) 236 .build(); 237 238 // Build participants 239 let mut participants: Vec<ParticipantStateView<'static>> = Vec::new(); 240 let mut first_collab_at: Option<chrono::DateTime<chrono::Utc>> = None; 241 242 for perm in &permissions { 243 let profile = profile_map 244 .get(perm.grantee_did.as_str()) 245 .ok_or_else(|| XrpcErrorResponse::internal_error("Missing profile for participant"))?; 246 let collab = collab_map.get(perm.grantee_did.as_str()); 247 248 // Track first collaborator time 249 if perm.scope != "owner" { 250 if let Some(c) = collab { 251 match first_collab_at { 252 None => first_collab_at = Some(c.accepted_at), 253 Some(t) if c.accepted_at < t => first_collab_at = Some(c.accepted_at), 254 _ => {} 255 } 256 } 257 } 258 259 let participant = build_participant_state(profile, collab, &perm.scope)?; 260 participants.push(participant); 261 } 262 263 // Determine status 264 let status = if collaborators.is_empty() { 265 "solo" 266 } else if has_divergence { 267 "diverged" 268 } else { 269 "synced" 270 }; 271 272 let collab_state = CollaborationStateView::new() 273 .resource(resource) 274 .status(status) 275 .participants(participants) 276 .maybe_canonical_uri(Some(resource_uri_parsed)) 277 .maybe_has_divergence(Some(has_divergence)) 278 .maybe_first_collaborator_added_at( 279 first_collab_at.map(|dt| Datetime::new(dt.fixed_offset())), 280 ) 281 .build(); 282 283 Ok(Json( 284 GetCollaborationStateOutput { 285 value: collab_state, 286 extra_data: None, 287 } 288 .into_static(), 289 )) 290} 291 292/// Build ParticipantStateView from available data 293fn build_participant_state( 294 profile: &ProfileRow, 295 collab: Option<&&CollaboratorRow>, 296 scope: &str, 297) -> Result<ParticipantStateView<'static>, XrpcErrorResponse> { 298 let user = profile_to_view_basic(profile)?; 299 300 let role = match scope { 301 "owner" => "owner", 302 "collaborator" => "collaborator", 303 _ => "unknown", 304 }; 305 306 let status = if collab.is_some() { 307 "active" 308 } else { 309 "pending" 310 }; 311 312 // Parse URIs if we have collab data 313 let (invite_uri, accept_uri) = if let Some(c) = collab { 314 let inv = AtUri::new(c.invite_uri.as_str()) 315 .map_err(|_| XrpcErrorResponse::internal_error("Invalid invite URI"))? 316 .into_static(); 317 let acc = AtUri::new(c.accept_uri.as_str()) 318 .map_err(|_| XrpcErrorResponse::internal_error("Invalid accept URI"))? 319 .into_static(); 320 (Some(inv), Some(acc)) 321 } else { 322 (None, None) 323 }; 324 325 Ok(ParticipantStateView::new() 326 .role(role) 327 .status(status) 328 .user(user) 329 .maybe_invite_uri(invite_uri) 330 .maybe_accept_uri(accept_uri) 331 .build()) 332} 333 334/// Handle sh.weaver.collab.getResourceSessions 335/// 336/// Returns active real-time collaboration sessions for a resource. 337pub async fn get_resource_sessions( 338 State(state): State<AppState>, 339 ExtractOptionalServiceAuth(viewer): ExtractOptionalServiceAuth, 340 ExtractXrpc(args): ExtractXrpc<GetResourceSessionsRequest>, 341) -> Result<Json<GetResourceSessionsOutput<'static>>, XrpcErrorResponse> { 342 let _viewer: Viewer = viewer; 343 344 // Resolve URI and get canonical form 345 let resolved = resolve_uri(&state, &args.resource).await?; 346 347 // Get active sessions 348 let session_rows = state 349 .clickhouse 350 .get_resource_sessions(&resolved.canonical_uri) 351 .await 352 .map_err(|e| { 353 tracing::error!("Failed to get resource sessions: {}", e); 354 XrpcErrorResponse::internal_error("Database query failed") 355 })?; 356 357 if session_rows.is_empty() { 358 return Ok(Json( 359 GetResourceSessionsOutput { 360 sessions: Vec::new(), 361 extra_data: None, 362 } 363 .into_static(), 364 )); 365 } 366 367 // Collect user DIDs for profile hydration 368 let user_dids: Vec<&str> = session_rows.iter().map(|s| s.did.as_str()).collect(); 369 370 // Batch fetch profiles 371 let profiles = state 372 .clickhouse 373 .get_profiles_batch(&user_dids) 374 .await 375 .map_err(|e| { 376 tracing::error!("Failed to batch fetch profiles: {}", e); 377 XrpcErrorResponse::internal_error("Database query failed") 378 })?; 379 380 let profile_map: HashMap<&str, &ProfileRow> = 381 profiles.iter().map(|p| (p.did.as_str(), p)).collect(); 382 383 // Build resource StrongRef once (same for all sessions) 384 let resource_cid = state 385 .clickhouse 386 .get_record_cid(&resolved.did, &resolved.collection, &resolved.rkey) 387 .await 388 .map_err(|e| { 389 tracing::error!("Failed to get resource CID: {}", e); 390 XrpcErrorResponse::internal_error("Database query failed") 391 })? 392 .ok_or_else(|| XrpcErrorResponse::not_found("Resource not found"))?; 393 394 let resource_ref = StrongRef::new() 395 .uri(args.resource.clone().into_static()) 396 .cid( 397 Cid::new(resource_cid.as_bytes()) 398 .map_err(|_| XrpcErrorResponse::internal_error("Invalid resource CID"))? 399 .into_static(), 400 ) 401 .build(); 402 403 // Build session views 404 let mut sessions = Vec::with_capacity(session_rows.len()); 405 for row in &session_rows { 406 let uri = AtUri::new(&format!( 407 "at://{}/sh.weaver.collab.session/{}", 408 row.did, row.rkey 409 )) 410 .map_err(|_| XrpcErrorResponse::internal_error("Invalid session URI"))? 411 .into_static(); 412 413 let user = profile_map 414 .get(row.did.as_str()) 415 .map(|p| profile_to_view_basic(p)) 416 .transpose()? 417 .ok_or_else(|| XrpcErrorResponse::internal_error("Missing user profile"))?; 418 419 let created_at = Datetime::new(row.created_at.fixed_offset()); 420 let expires_at = row.expires_at.map(|dt| Datetime::new(dt.fixed_offset())); 421 422 let relay_url = if row.relay_url.is_empty() { 423 None 424 } else { 425 jacquard::types::string::Uri::new_owned(row.relay_url.to_string()).ok() 426 }; 427 428 sessions.push( 429 SessionView::new() 430 .uri(uri) 431 .user(user) 432 .resource(resource_ref.clone()) 433 .node_id(row.node_id.to_cowstr().into_static()) 434 .created_at(created_at) 435 .maybe_relay_url(relay_url) 436 .maybe_expires_at(expires_at) 437 .build(), 438 ); 439 } 440 441 Ok(Json( 442 GetResourceSessionsOutput { 443 sessions, 444 extra_data: None, 445 } 446 .into_static(), 447 )) 448}