atproto blogging
1//! Collaboration endpoint handlers
2
3use std::collections::HashMap;
4
5use axum::{Json, extract::State};
6use jacquard::IntoStatic;
7use jacquard::cowstr::ToCowStr;
8use jacquard::types::datetime::Datetime;
9use jacquard::types::string::{AtUri, Cid, Did, Handle};
10use jacquard_axum::ExtractXrpc;
11use jacquard_axum::service_auth::ExtractOptionalServiceAuth;
12
13use weaver_api::com_atproto::repo::strong_ref::StrongRef;
14use weaver_api::sh_weaver::actor::ProfileViewBasic;
15use weaver_api::sh_weaver::collab::get_collaboration_state::{
16 GetCollaborationStateOutput, GetCollaborationStateRequest,
17};
18use weaver_api::sh_weaver::collab::get_resource_participants::{
19 GetResourceParticipantsOutput, GetResourceParticipantsRequest,
20};
21use weaver_api::sh_weaver::collab::get_resource_sessions::{
22 GetResourceSessionsOutput, GetResourceSessionsRequest,
23};
24use weaver_api::sh_weaver::collab::{CollaborationStateView, ParticipantStateView, SessionView};
25
26use crate::clickhouse::{CollaboratorRow, ProfileRow};
27use crate::endpoints::actor::Viewer;
28use crate::endpoints::repo::XrpcErrorResponse;
29use crate::endpoints::{non_empty_str, resolve_uri};
30use crate::server::AppState;
31
32/// Handle sh.weaver.collab.getResourceParticipants
33///
34/// Returns owner and collaborators who can edit the resource.
35pub async fn get_resource_participants(
36 State(state): State<AppState>,
37 ExtractOptionalServiceAuth(viewer): ExtractOptionalServiceAuth,
38 ExtractXrpc(args): ExtractXrpc<GetResourceParticipantsRequest>,
39) -> Result<Json<GetResourceParticipantsOutput<'static>>, XrpcErrorResponse> {
40 let _viewer: Viewer = viewer;
41 let viewer_did: Option<&str> = _viewer.as_ref().map(|v| v.did().as_str());
42
43 // Resolve URI and get canonical form
44 let resolved = resolve_uri(&state, &args.resource).await?;
45
46 // Get all permissions for the resource
47 let permissions = state
48 .clickhouse
49 .get_resource_permissions(&resolved.canonical_uri)
50 .await
51 .map_err(|e| {
52 tracing::error!("Failed to get resource permissions: {}", e);
53 XrpcErrorResponse::internal_error("Database query failed")
54 })?;
55
56 if permissions.is_empty() {
57 return Err(XrpcErrorResponse::not_found("Resource not found"));
58 }
59
60 // Collect all DIDs for profile hydration
61 let all_dids: Vec<&str> = permissions.iter().map(|p| p.grantee_did.as_str()).collect();
62
63 // Batch fetch profiles
64 let profiles = state
65 .clickhouse
66 .get_profiles_batch(&all_dids)
67 .await
68 .map_err(|e| {
69 tracing::error!("Failed to batch fetch profiles: {}", e);
70 XrpcErrorResponse::internal_error("Database query failed")
71 })?;
72
73 let profile_map: HashMap<&str, &ProfileRow> =
74 profiles.iter().map(|p| (p.did.as_str(), p)).collect();
75
76 // Find owner and build participants
77 let mut owner: Option<ProfileViewBasic<'static>> = None;
78 let mut participants: Vec<ProfileViewBasic<'static>> = Vec::new();
79
80 for perm in &permissions {
81 let profile_view = if let Some(profile) = profile_map.get(perm.grantee_did.as_str()) {
82 profile_to_view_basic(profile)?
83 } else {
84 // No profile found - skip (shouldn't happen if permissions table is consistent)
85 continue;
86 };
87
88 if perm.scope == "owner" {
89 owner = Some(profile_view);
90 } else {
91 participants.push(profile_view);
92 }
93 }
94
95 let owner = owner.ok_or_else(|| {
96 tracing::error!("Resource has no owner in permissions");
97 XrpcErrorResponse::internal_error("Resource has no owner")
98 })?;
99
100 // Check if viewer can edit
101 let viewer_can_edit = viewer_did.map(|v| all_dids.contains(&v));
102
103 Ok(Json(
104 GetResourceParticipantsOutput {
105 owner,
106 participants,
107 viewer_can_edit,
108 extra_data: None,
109 }
110 .into_static(),
111 ))
112}
113
114/// Convert ProfileRow to ProfileViewBasic directly
115pub fn profile_to_view_basic(
116 profile: &ProfileRow,
117) -> Result<ProfileViewBasic<'static>, XrpcErrorResponse> {
118 let did = Did::new_owned(profile.did.clone())
119 .map_err(|_| XrpcErrorResponse::internal_error("Invalid DID in profile"))?;
120
121 let handle = Handle::new_owned(profile.handle.clone())
122 .map_err(|_| XrpcErrorResponse::internal_error("Invalid handle in profile"))?;
123
124 Ok(ProfileViewBasic::new()
125 .did(did)
126 .handle(handle)
127 .maybe_display_name(non_empty_str(&profile.display_name))
128 .build())
129}
130
131/// Handle sh.weaver.collab.getCollaborationState
132///
133/// Returns full collaboration state for a resource.
134pub async fn get_collaboration_state(
135 State(state): State<AppState>,
136 ExtractOptionalServiceAuth(viewer): ExtractOptionalServiceAuth,
137 ExtractXrpc(args): ExtractXrpc<GetCollaborationStateRequest>,
138) -> Result<Json<GetCollaborationStateOutput<'static>>, XrpcErrorResponse> {
139 let _viewer: Viewer = viewer;
140
141 // Resolve URI and get canonical form
142 let resolved = resolve_uri(&state, &args.resource).await?;
143
144 // Get permissions for the resource
145 let permissions = state
146 .clickhouse
147 .get_resource_permissions(&resolved.canonical_uri)
148 .await
149 .map_err(|e| {
150 tracing::error!("Failed to get resource permissions: {}", e);
151 XrpcErrorResponse::internal_error("Database query failed")
152 })?;
153
154 if permissions.is_empty() {
155 return Err(XrpcErrorResponse::not_found("Resource not found"));
156 }
157
158 // Get collaborators (invite+accept pairs) for additional data
159 let collaborators = state
160 .clickhouse
161 .get_collaborators(&resolved.canonical_uri)
162 .await
163 .map_err(|e| {
164 tracing::error!("Failed to get collaborators: {}", e);
165 XrpcErrorResponse::internal_error("Database query failed")
166 })?;
167
168 // Check for divergence
169 let has_divergence = state
170 .clickhouse
171 .has_divergence(&resolved.canonical_uri)
172 .await
173 .map_err(|e| {
174 tracing::error!("Failed to check divergence: {}", e);
175 XrpcErrorResponse::internal_error("Database query failed")
176 })?;
177
178 // Collect all DIDs for profile hydration
179 let all_dids: Vec<&str> = permissions.iter().map(|p| p.grantee_did.as_str()).collect();
180
181 // Batch fetch profiles
182 let profiles = state
183 .clickhouse
184 .get_profiles_batch(&all_dids)
185 .await
186 .map_err(|e| {
187 tracing::error!("Failed to batch fetch profiles: {}", e);
188 XrpcErrorResponse::internal_error("Database query failed")
189 })?;
190
191 let profile_map: HashMap<&str, &ProfileRow> =
192 profiles.iter().map(|p| (p.did.as_str(), p)).collect();
193
194 // Build collaborator lookup for invite/accept URIs
195 let collab_map: HashMap<&str, &CollaboratorRow> = collaborators
196 .iter()
197 .map(|c| (c.collaborator_did.as_str(), c))
198 .collect();
199
200 // Find owner and get resource CID
201 let owner_perm = permissions
202 .iter()
203 .find(|p| p.scope == "owner")
204 .ok_or_else(|| {
205 tracing::error!("Resource has no owner in permissions");
206 XrpcErrorResponse::internal_error("Resource has no owner")
207 })?;
208
209 // Build resource StrongRef - look up the CID from the appropriate table
210 let resource_uri_parsed = AtUri::new(&resolved.canonical_uri)
211 .map_err(|_| XrpcErrorResponse::internal_error("Invalid resource URI"))?
212 .into_static();
213
214 // Look up the resource CID from raw_records
215 let resource_cid = state
216 .clickhouse
217 .get_record_cid(
218 &owner_perm.resource_did,
219 &owner_perm.resource_collection,
220 &owner_perm.resource_rkey,
221 )
222 .await
223 .map_err(|e| {
224 tracing::error!("Failed to get resource CID: {}", e);
225 XrpcErrorResponse::internal_error("Database query failed")
226 })?
227 .ok_or_else(|| XrpcErrorResponse::not_found("Resource not found in database"))?;
228
229 let resource = StrongRef::new()
230 .uri(resource_uri_parsed.clone())
231 .cid(
232 Cid::new(resource_cid.as_bytes())
233 .map_err(|_| XrpcErrorResponse::internal_error("Invalid resource CID"))?
234 .into_static(),
235 )
236 .build();
237
238 // Build participants
239 let mut participants: Vec<ParticipantStateView<'static>> = Vec::new();
240 let mut first_collab_at: Option<chrono::DateTime<chrono::Utc>> = None;
241
242 for perm in &permissions {
243 let profile = profile_map
244 .get(perm.grantee_did.as_str())
245 .ok_or_else(|| XrpcErrorResponse::internal_error("Missing profile for participant"))?;
246 let collab = collab_map.get(perm.grantee_did.as_str());
247
248 // Track first collaborator time
249 if perm.scope != "owner" {
250 if let Some(c) = collab {
251 match first_collab_at {
252 None => first_collab_at = Some(c.accepted_at),
253 Some(t) if c.accepted_at < t => first_collab_at = Some(c.accepted_at),
254 _ => {}
255 }
256 }
257 }
258
259 let participant = build_participant_state(profile, collab, &perm.scope)?;
260 participants.push(participant);
261 }
262
263 // Determine status
264 let status = if collaborators.is_empty() {
265 "solo"
266 } else if has_divergence {
267 "diverged"
268 } else {
269 "synced"
270 };
271
272 let collab_state = CollaborationStateView::new()
273 .resource(resource)
274 .status(status)
275 .participants(participants)
276 .maybe_canonical_uri(Some(resource_uri_parsed))
277 .maybe_has_divergence(Some(has_divergence))
278 .maybe_first_collaborator_added_at(
279 first_collab_at.map(|dt| Datetime::new(dt.fixed_offset())),
280 )
281 .build();
282
283 Ok(Json(
284 GetCollaborationStateOutput {
285 value: collab_state,
286 extra_data: None,
287 }
288 .into_static(),
289 ))
290}
291
292/// Build ParticipantStateView from available data
293fn build_participant_state(
294 profile: &ProfileRow,
295 collab: Option<&&CollaboratorRow>,
296 scope: &str,
297) -> Result<ParticipantStateView<'static>, XrpcErrorResponse> {
298 let user = profile_to_view_basic(profile)?;
299
300 let role = match scope {
301 "owner" => "owner",
302 "collaborator" => "collaborator",
303 _ => "unknown",
304 };
305
306 let status = if collab.is_some() {
307 "active"
308 } else {
309 "pending"
310 };
311
312 // Parse URIs if we have collab data
313 let (invite_uri, accept_uri) = if let Some(c) = collab {
314 let inv = AtUri::new(c.invite_uri.as_str())
315 .map_err(|_| XrpcErrorResponse::internal_error("Invalid invite URI"))?
316 .into_static();
317 let acc = AtUri::new(c.accept_uri.as_str())
318 .map_err(|_| XrpcErrorResponse::internal_error("Invalid accept URI"))?
319 .into_static();
320 (Some(inv), Some(acc))
321 } else {
322 (None, None)
323 };
324
325 Ok(ParticipantStateView::new()
326 .role(role)
327 .status(status)
328 .user(user)
329 .maybe_invite_uri(invite_uri)
330 .maybe_accept_uri(accept_uri)
331 .build())
332}
333
334/// Handle sh.weaver.collab.getResourceSessions
335///
336/// Returns active real-time collaboration sessions for a resource.
337pub async fn get_resource_sessions(
338 State(state): State<AppState>,
339 ExtractOptionalServiceAuth(viewer): ExtractOptionalServiceAuth,
340 ExtractXrpc(args): ExtractXrpc<GetResourceSessionsRequest>,
341) -> Result<Json<GetResourceSessionsOutput<'static>>, XrpcErrorResponse> {
342 let _viewer: Viewer = viewer;
343
344 // Resolve URI and get canonical form
345 let resolved = resolve_uri(&state, &args.resource).await?;
346
347 // Get active sessions
348 let session_rows = state
349 .clickhouse
350 .get_resource_sessions(&resolved.canonical_uri)
351 .await
352 .map_err(|e| {
353 tracing::error!("Failed to get resource sessions: {}", e);
354 XrpcErrorResponse::internal_error("Database query failed")
355 })?;
356
357 if session_rows.is_empty() {
358 return Ok(Json(
359 GetResourceSessionsOutput {
360 sessions: Vec::new(),
361 extra_data: None,
362 }
363 .into_static(),
364 ));
365 }
366
367 // Collect user DIDs for profile hydration
368 let user_dids: Vec<&str> = session_rows.iter().map(|s| s.did.as_str()).collect();
369
370 // Batch fetch profiles
371 let profiles = state
372 .clickhouse
373 .get_profiles_batch(&user_dids)
374 .await
375 .map_err(|e| {
376 tracing::error!("Failed to batch fetch profiles: {}", e);
377 XrpcErrorResponse::internal_error("Database query failed")
378 })?;
379
380 let profile_map: HashMap<&str, &ProfileRow> =
381 profiles.iter().map(|p| (p.did.as_str(), p)).collect();
382
383 // Build resource StrongRef once (same for all sessions)
384 let resource_cid = state
385 .clickhouse
386 .get_record_cid(&resolved.did, &resolved.collection, &resolved.rkey)
387 .await
388 .map_err(|e| {
389 tracing::error!("Failed to get resource CID: {}", e);
390 XrpcErrorResponse::internal_error("Database query failed")
391 })?
392 .ok_or_else(|| XrpcErrorResponse::not_found("Resource not found"))?;
393
394 let resource_ref = StrongRef::new()
395 .uri(args.resource.clone().into_static())
396 .cid(
397 Cid::new(resource_cid.as_bytes())
398 .map_err(|_| XrpcErrorResponse::internal_error("Invalid resource CID"))?
399 .into_static(),
400 )
401 .build();
402
403 // Build session views
404 let mut sessions = Vec::with_capacity(session_rows.len());
405 for row in &session_rows {
406 let uri = AtUri::new(&format!(
407 "at://{}/sh.weaver.collab.session/{}",
408 row.did, row.rkey
409 ))
410 .map_err(|_| XrpcErrorResponse::internal_error("Invalid session URI"))?
411 .into_static();
412
413 let user = profile_map
414 .get(row.did.as_str())
415 .map(|p| profile_to_view_basic(p))
416 .transpose()?
417 .ok_or_else(|| XrpcErrorResponse::internal_error("Missing user profile"))?;
418
419 let created_at = Datetime::new(row.created_at.fixed_offset());
420 let expires_at = row.expires_at.map(|dt| Datetime::new(dt.fixed_offset()));
421
422 let relay_url = if row.relay_url.is_empty() {
423 None
424 } else {
425 jacquard::types::string::Uri::new_owned(row.relay_url.to_string()).ok()
426 };
427
428 sessions.push(
429 SessionView::new()
430 .uri(uri)
431 .user(user)
432 .resource(resource_ref.clone())
433 .node_id(row.node_id.to_cowstr().into_static())
434 .created_at(created_at)
435 .maybe_relay_url(relay_url)
436 .maybe_expires_at(expires_at)
437 .build(),
438 );
439 }
440
441 Ok(Json(
442 GetResourceSessionsOutput {
443 sessions,
444 extra_data: None,
445 }
446 .into_static(),
447 ))
448}