atproto blogging
1//! Edit endpoint handlers
2
3use std::collections::HashMap;
4
5use axum::{Json, extract::State};
6use jacquard::IntoStatic;
7use jacquard::cowstr::ToCowStr;
8use jacquard::types::datetime::Datetime;
9use jacquard::types::string::{AtUri, Cid};
10use jacquard_axum::ExtractXrpc;
11use jacquard_axum::service_auth::ExtractOptionalServiceAuth;
12
13use weaver_api::com_atproto::repo::strong_ref::StrongRef;
14use weaver_api::sh_weaver::edit::get_contributors::{
15 GetContributorsOutput, GetContributorsRequest,
16};
17use weaver_api::sh_weaver::edit::get_edit_history::{GetEditHistoryOutput, GetEditHistoryRequest};
18use weaver_api::sh_weaver::edit::list_drafts::{DraftView, ListDraftsOutput, ListDraftsRequest};
19use weaver_api::sh_weaver::edit::{EditHistoryEntry, EditHistoryEntryType};
20
21use crate::clickhouse::{EditNodeRow, ProfileRow};
22use crate::endpoints::actor::{Viewer, resolve_actor};
23use crate::endpoints::collab::profile_to_view_basic;
24use crate::endpoints::repo::XrpcErrorResponse;
25use crate::endpoints::resolve_uri;
26use crate::server::AppState;
27
28/// Handle sh.weaver.edit.getEditHistory
29///
30/// Returns edit history (roots and diffs) for a resource.
31pub async fn get_edit_history(
32 State(state): State<AppState>,
33 ExtractOptionalServiceAuth(viewer): ExtractOptionalServiceAuth,
34 ExtractXrpc(args): ExtractXrpc<GetEditHistoryRequest>,
35) -> Result<Json<GetEditHistoryOutput<'static>>, XrpcErrorResponse> {
36 let _viewer: Viewer = viewer;
37
38 let limit = args.limit.unwrap_or(50).min(100).max(1);
39
40 // Resolve URI and get canonical form
41 let resolved = resolve_uri(&state, &args.resource).await?;
42
43 // Parse cursor as millisecond timestamp
44 let cursor = args
45 .cursor
46 .as_deref()
47 .map(|c| c.parse::<i64>())
48 .transpose()
49 .map_err(|_| XrpcErrorResponse::invalid_request("Invalid cursor format"))?;
50
51 let after_rkey = args.after_rkey.as_deref();
52
53 // Fetch edit nodes
54 let nodes = state
55 .clickhouse
56 .get_edit_history(
57 &resolved.did,
58 &resolved.collection,
59 &resolved.rkey,
60 cursor,
61 after_rkey,
62 limit + 1,
63 )
64 .await
65 .map_err(|e| {
66 tracing::error!("Failed to get edit history: {}", e);
67 XrpcErrorResponse::internal_error("Database query failed")
68 })?;
69
70 // Check if there are more results
71 let has_more = nodes.len() > limit as usize;
72 let nodes: Vec<_> = nodes.into_iter().take(limit as usize).collect();
73
74 // Collect unique author DIDs
75 let author_dids: Vec<&str> = nodes.iter().map(|n| n.did.as_str()).collect();
76 let unique_dids: Vec<&str> = author_dids
77 .iter()
78 .copied()
79 .collect::<std::collections::HashSet<_>>()
80 .into_iter()
81 .collect();
82
83 // Batch fetch profiles
84 let profiles = state
85 .clickhouse
86 .get_profiles_batch(&unique_dids)
87 .await
88 .map_err(|e| {
89 tracing::error!("Failed to batch fetch profiles: {}", e);
90 XrpcErrorResponse::internal_error("Database query failed")
91 })?;
92
93 let profile_map: HashMap<&str, &ProfileRow> =
94 profiles.iter().map(|p| (p.did.as_str(), p)).collect();
95
96 // Separate roots and diffs, building EditHistoryEntry for each
97 let mut roots = Vec::new();
98 let mut diffs = Vec::new();
99
100 for node in &nodes {
101 let entry = node_to_history_entry(node, &profile_map)?;
102
103 if node.node_type == "root" {
104 roots.push(entry);
105 } else {
106 diffs.push(entry);
107 }
108 }
109
110 // Build cursor from last node's created_at
111 let next_cursor = if has_more {
112 nodes
113 .last()
114 .map(|n| n.created_at.timestamp_millis().to_cowstr().into_static())
115 } else {
116 None
117 };
118
119 Ok(Json(
120 GetEditHistoryOutput {
121 roots,
122 diffs,
123 cursor: next_cursor,
124 extra_data: None,
125 }
126 .into_static(),
127 ))
128}
129
130/// Convert EditNodeRow to EditHistoryEntry
131fn node_to_history_entry(
132 node: &EditNodeRow,
133 profile_map: &HashMap<&str, &ProfileRow>,
134) -> Result<EditHistoryEntry<'static>, XrpcErrorResponse> {
135 let author = profile_map
136 .get(node.did.as_str())
137 .map(|p| profile_to_view_basic(p))
138 .transpose()?
139 .ok_or_else(|| XrpcErrorResponse::internal_error("Author profile not found"))?;
140
141 // Build URI
142 let uri = AtUri::new(&format!(
143 "at://{}/{}/{}",
144 node.did, node.collection, node.rkey
145 ))
146 .map_err(|_| XrpcErrorResponse::internal_error("Invalid AT URI"))?
147 .into_static();
148
149 let cid = Cid::new(node.cid.as_bytes())
150 .map_err(|_| XrpcErrorResponse::internal_error("Invalid CID"))?
151 .into_static();
152
153 // Build optional StrongRefs for diffs
154 let root_ref = if !node.root_cid.is_empty() {
155 let root_uri = AtUri::new(&format!(
156 "at://{}/sh.weaver.edit.root/{}",
157 node.root_did, node.root_rkey
158 ))
159 .map_err(|_| XrpcErrorResponse::internal_error("Invalid root URI"))?
160 .into_static();
161
162 let root_cid = Cid::new(node.root_cid.as_bytes())
163 .map_err(|_| XrpcErrorResponse::internal_error("Invalid root CID"))?
164 .into_static();
165
166 Some(StrongRef::new().uri(root_uri).cid(root_cid).build())
167 } else {
168 None
169 };
170
171 let prev_ref = if !node.prev_cid.is_empty() {
172 let prev_uri = AtUri::new(&format!(
173 "at://{}/sh.weaver.edit.diff/{}",
174 node.prev_did, node.prev_rkey
175 ))
176 .map_err(|_| XrpcErrorResponse::internal_error("Invalid prev URI"))?
177 .into_static();
178
179 let prev_cid = Cid::new(node.prev_cid.as_bytes())
180 .map_err(|_| XrpcErrorResponse::internal_error("Invalid prev CID"))?
181 .into_static();
182
183 Some(StrongRef::new().uri(prev_uri).cid(prev_cid).build())
184 } else {
185 None
186 };
187
188 let created_at = Datetime::new(node.created_at.fixed_offset());
189
190 Ok(EditHistoryEntry::new()
191 .uri(uri)
192 .cid(cid)
193 .author(author)
194 .created_at(created_at)
195 .r#type(EditHistoryEntryType::from(node.node_type.as_str()).into_static())
196 .maybe_has_inline_diff(Some(node.has_inline_diff == 1))
197 .maybe_prev_ref(prev_ref)
198 .maybe_root_ref(root_ref)
199 .build())
200}
201
202/// Handle sh.weaver.edit.getContributors
203///
204/// Returns evidence-based contributors for a resource (entry or notebook).
205pub async fn get_contributors(
206 State(state): State<AppState>,
207 ExtractOptionalServiceAuth(viewer): ExtractOptionalServiceAuth,
208 ExtractXrpc(args): ExtractXrpc<GetContributorsRequest>,
209) -> Result<Json<GetContributorsOutput<'static>>, XrpcErrorResponse> {
210 let _viewer: Viewer = viewer;
211
212 let include_cascaded = args.include_cascaded.unwrap_or(true);
213
214 // Resolve URI and get canonical form
215 let resolved = resolve_uri(&state, &args.resource).await?;
216
217 // Get contributors based on resource type
218 let contributor_dids = match resolved.collection.as_str() {
219 "sh.weaver.notebook.entry" => {
220 if include_cascaded {
221 state
222 .clickhouse
223 .get_entry_contributors(&resolved.did, &resolved.rkey)
224 .await
225 } else {
226 state
227 .clickhouse
228 .get_entry_contributors_direct(&resolved.did, &resolved.rkey)
229 .await
230 }
231 }
232 "sh.weaver.notebook.book" => {
233 state
234 .clickhouse
235 .get_notebook_contributors(&resolved.did, &resolved.rkey)
236 .await
237 }
238 _ => {
239 return Err(XrpcErrorResponse::invalid_request(
240 "Resource must be an entry or notebook",
241 ));
242 }
243 }
244 .map_err(|e| {
245 tracing::error!("Failed to get contributors: {}", e);
246 XrpcErrorResponse::internal_error("Database query failed")
247 })?;
248
249 if contributor_dids.is_empty() {
250 return Ok(Json(
251 GetContributorsOutput {
252 contributors: Vec::new(),
253 extra_data: None,
254 }
255 .into_static(),
256 ));
257 }
258
259 // Batch fetch profiles
260 let did_refs: Vec<&str> = contributor_dids.iter().map(|s| s.as_str()).collect();
261 let profiles = state
262 .clickhouse
263 .get_profiles_batch(&did_refs)
264 .await
265 .map_err(|e| {
266 tracing::error!("Failed to batch fetch profiles: {}", e);
267 XrpcErrorResponse::internal_error("Database query failed")
268 })?;
269
270 let profile_map: HashMap<&str, &ProfileRow> =
271 profiles.iter().map(|p| (p.did.as_str(), p)).collect();
272
273 // Build contributor list
274 let mut contributors = Vec::with_capacity(contributor_dids.len());
275 for did in &contributor_dids {
276 if let Some(profile) = profile_map.get(did.as_str()) {
277 contributors.push(profile_to_view_basic(profile)?);
278 }
279 }
280
281 Ok(Json(
282 GetContributorsOutput {
283 contributors,
284 extra_data: None,
285 }
286 .into_static(),
287 ))
288}
289
290/// Handle sh.weaver.edit.listDrafts
291///
292/// Returns draft records for an actor. Requires authentication.
293/// Only returns drafts if viewer is the actor or has collab permission.
294pub async fn list_drafts(
295 State(state): State<AppState>,
296 ExtractOptionalServiceAuth(viewer): ExtractOptionalServiceAuth,
297 ExtractXrpc(args): ExtractXrpc<ListDraftsRequest>,
298) -> Result<Json<ListDraftsOutput<'static>>, XrpcErrorResponse> {
299 // Require authentication
300 let viewer =
301 viewer.ok_or_else(|| XrpcErrorResponse::auth_required("Authentication required"))?;
302 let viewer_did = viewer.did();
303
304 let limit = args.limit.unwrap_or(50).min(100).max(1);
305
306 // Parse cursor as millisecond timestamp
307 let cursor = args
308 .cursor
309 .as_deref()
310 .map(|c| c.parse::<i64>())
311 .transpose()
312 .map_err(|_| XrpcErrorResponse::invalid_request("Invalid cursor format"))?;
313
314 // Resolve actor to DID
315 let actor_did = resolve_actor(&state, &args.actor).await?;
316
317 // Permission check: viewer must be the actor (owner access)
318 // TODO: Add collab grant check for draft sharing
319 if viewer_did.as_str() != actor_did.as_str() {
320 return Err(XrpcErrorResponse::forbidden(
321 "Cannot view another user's drafts",
322 ));
323 }
324
325 // Fetch drafts
326 let draft_rows = state
327 .clickhouse
328 .list_drafts(&actor_did, cursor, limit + 1)
329 .await
330 .map_err(|e| {
331 tracing::error!("Failed to list drafts: {}", e);
332 XrpcErrorResponse::internal_error("Database query failed")
333 })?;
334
335 // Check if there are more results
336 let has_more = draft_rows.len() > limit as usize;
337 let draft_rows: Vec<_> = draft_rows.into_iter().take(limit as usize).collect();
338
339 // Build draft views
340 let mut drafts = Vec::with_capacity(draft_rows.len());
341 for row in &draft_rows {
342 let uri = AtUri::new(&format!(
343 "at://{}/sh.weaver.edit.draft/{}",
344 row.did, row.rkey
345 ))
346 .map_err(|_| XrpcErrorResponse::internal_error("Invalid AT URI"))?
347 .into_static();
348
349 let cid = Cid::new(row.cid.as_bytes())
350 .map_err(|_| XrpcErrorResponse::internal_error("Invalid CID"))?
351 .into_static();
352
353 let created_at = Datetime::new(row.created_at.fixed_offset());
354
355 // Build optional edit root reference
356 let edit_root = if !row.root_cid.is_empty() {
357 let root_uri = AtUri::new(&format!(
358 "at://{}/sh.weaver.edit.root/{}",
359 row.root_did, row.root_rkey
360 ))
361 .map_err(|_| XrpcErrorResponse::internal_error("Invalid root URI"))?
362 .into_static();
363
364 let root_cid = Cid::new(row.root_cid.as_bytes())
365 .map_err(|_| XrpcErrorResponse::internal_error("Invalid root CID"))?
366 .into_static();
367
368 Some(StrongRef::new().uri(root_uri).cid(root_cid).build())
369 } else {
370 None
371 };
372
373 let last_edit_at = row.last_edit_at.map(|dt| Datetime::new(dt.fixed_offset()));
374
375 // Include title if available
376 let title = if row.title.is_empty() {
377 None
378 } else {
379 Some(row.title.to_cowstr().into_static())
380 };
381
382 drafts.push(
383 DraftView::new()
384 .uri(uri)
385 .cid(cid)
386 .created_at(created_at)
387 .maybe_edit_root(edit_root)
388 .maybe_last_edit_at(last_edit_at)
389 .maybe_title(title)
390 .build(),
391 );
392 }
393
394 // Build cursor from last draft's created_at
395 let next_cursor = if has_more {
396 draft_rows
397 .last()
398 .map(|d| d.created_at.timestamp_millis().to_cowstr().into_static())
399 } else {
400 None
401 };
402
403 Ok(Json(
404 ListDraftsOutput {
405 drafts,
406 cursor: next_cursor,
407 extra_data: None,
408 }
409 .into_static(),
410 ))
411}