-790
api/src/api/xrpc_dynamic.rs.bak
-790
api/src/api/xrpc_dynamic.rs.bak
···
1
-
use atproto_client::com::atproto::repo::{
2
-
CreateRecordRequest, CreateRecordResponse, DeleteRecordRequest, PutRecordRequest,
3
-
PutRecordResponse, create_record, delete_record, put_record,
4
-
};
5
-
use axum::{
6
-
extract::{Path, Query, State},
7
-
http::HeaderMap,
8
-
response::Json,
9
-
};
10
-
use chrono::Utc;
11
-
use serde::Deserialize;
12
-
13
-
use crate::AppState;
14
-
use crate::auth::{
15
-
extract_bearer_token, get_atproto_auth_for_user_cached, verify_oauth_token_cached,
16
-
};
17
-
use crate::errors::AppError;
18
-
use crate::models::{
19
-
IndexedRecord, Record, SliceRecordsOutput, SliceRecordsParams, SortField, WhereCondition,
20
-
};
21
-
use std::collections::HashMap;
22
-
23
-
24
-
#[derive(Deserialize)]
25
-
pub struct GetRecordParams {
26
-
pub uri: String,
27
-
pub slice: String,
28
-
}
29
-
30
-
// Dynamic XRPC handler that routes based on method name (for GET requests)
31
-
pub async fn dynamic_xrpc_handler(
32
-
Path(method): Path<String>,
33
-
State(state): State<AppState>,
34
-
Query(params): Query<serde_json::Value>,
35
-
) -> Result<Json<serde_json::Value>, AppError> {
36
-
// Parse the XRPC method (e.g., "social.grain.gallery.getRecords")
37
-
if method.ends_with(".getRecords") {
38
-
let collection = method.trim_end_matches(".getRecords").to_string();
39
-
dynamic_get_records_handler(collection, state, params).await
40
-
} else if method.ends_with(".countRecords") {
41
-
let collection = method.trim_end_matches(".countRecords").to_string();
42
-
dynamic_count_records_handler(collection, state, params).await
43
-
} else if method.ends_with(".getRecord") {
44
-
let collection = method.trim_end_matches(".getRecord").to_string();
45
-
dynamic_get_record_impl(collection, state, params).await
46
-
} else {
47
-
Err(AppError::NotFound("Unknown XRPC method".to_string()))
48
-
}
49
-
}
50
-
51
-
// Dynamic XRPC handler for POST requests (create, update, delete)
52
-
pub async fn dynamic_xrpc_post_handler(
53
-
Path(method): Path<String>,
54
-
State(state): State<AppState>,
55
-
headers: HeaderMap,
56
-
Json(body): Json<serde_json::Value>,
57
-
) -> Result<Json<serde_json::Value>, AppError> {
58
-
// Handle dynamic collection methods (e.g., social.grain.gallery.createRecord)
59
-
if method.ends_with(".getRecords") {
60
-
let collection = method.trim_end_matches(".getRecords").to_string();
61
-
dynamic_get_records_post_handler(collection, state, body).await
62
-
} else if method.ends_with(".countRecords") {
63
-
let collection = method.trim_end_matches(".countRecords").to_string();
64
-
dynamic_count_records_post_handler(collection, state, body).await
65
-
} else if method.ends_with(".createRecord") {
66
-
let collection = method.trim_end_matches(".createRecord").to_string();
67
-
dynamic_collection_create_impl(state, headers, body, collection).await
68
-
} else if method.ends_with(".updateRecord") {
69
-
let collection = method.trim_end_matches(".updateRecord").to_string();
70
-
dynamic_collection_update_impl(state, headers, body, collection).await
71
-
} else if method.ends_with(".deleteRecord") {
72
-
let collection = method.trim_end_matches(".deleteRecord").to_string();
73
-
dynamic_collection_delete_impl(state, headers, body, collection).await
74
-
} else {
75
-
Err(AppError::NotFound("Method not found".to_string()))
76
-
}
77
-
}
78
-
79
-
// Handler for get records using unified where clause approach
80
-
async fn dynamic_get_records_handler(
81
-
collection: String,
82
-
state: AppState,
83
-
params: serde_json::Value,
84
-
) -> Result<Json<serde_json::Value>, AppError> {
85
-
// Parse parameters into SliceRecordsParams format
86
-
let slice = params
87
-
.get("slice")
88
-
.and_then(|v| v.as_str())
89
-
.ok_or(AppError::BadRequest("Missing slice parameter".to_string()))?
90
-
.to_string();
91
-
92
-
let limit = params.get("limit").and_then(|v| {
93
-
if let Some(s) = v.as_str() {
94
-
s.parse::<i32>().ok()
95
-
} else {
96
-
v.as_i64().map(|i| i as i32)
97
-
}
98
-
});
99
-
100
-
let cursor = params
101
-
.get("cursor")
102
-
.and_then(|v| v.as_str())
103
-
.map(|s| s.to_string());
104
-
105
-
// Parse sortBy from params - convert legacy sort string to new array format if present
106
-
let sort_by = params.get("sort").and_then(|v| v.as_str()).map(|sort_str| {
107
-
// Convert legacy "field:direction" format to new array format
108
-
let mut sort_fields = Vec::new();
109
-
for sort_item in sort_str.split(',') {
110
-
let parts: Vec<&str> = sort_item.trim().split(':').collect();
111
-
if parts.len() == 2 {
112
-
sort_fields.push(SortField {
113
-
field: parts[0].trim().to_string(),
114
-
direction: parts[1].trim().to_string(),
115
-
});
116
-
} else if parts.len() == 1 && !parts[0].is_empty() {
117
-
// Default to ascending if no direction specified
118
-
sort_fields.push(SortField {
119
-
field: parts[0].trim().to_string(),
120
-
direction: "asc".to_string(),
121
-
});
122
-
}
123
-
}
124
-
sort_fields
125
-
});
126
-
127
-
// Parse where conditions from query params if present
128
-
let mut where_conditions = HashMap::new();
129
-
130
-
// Handle legacy author/authors params by converting to where clause
131
-
if let Some(author_str) = params.get("author").and_then(|v| v.as_str()) {
132
-
where_conditions.insert(
133
-
"did".to_string(),
134
-
WhereCondition {
135
-
eq: Some(serde_json::Value::String(author_str.to_string())),
136
-
in_values: None,
137
-
contains: None,
138
-
},
139
-
);
140
-
} else if let Some(authors_str) = params.get("authors").and_then(|v| v.as_str()) {
141
-
let authors: Vec<serde_json::Value> = authors_str
142
-
.split(',')
143
-
.map(|s| serde_json::Value::String(s.trim().to_string()))
144
-
.collect();
145
-
where_conditions.insert(
146
-
"did".to_string(),
147
-
WhereCondition {
148
-
eq: None,
149
-
in_values: Some(authors),
150
-
contains: None,
151
-
},
152
-
);
153
-
}
154
-
155
-
// Handle legacy query param by converting to where clause with contains
156
-
if let Some(query_str) = params.get("query").and_then(|v| v.as_str()) {
157
-
let field = params
158
-
.get("field")
159
-
.and_then(|v| v.as_str())
160
-
.unwrap_or("text"); // Default to text field for search
161
-
where_conditions.insert(
162
-
field.to_string(),
163
-
WhereCondition {
164
-
eq: None,
165
-
in_values: None,
166
-
contains: Some(query_str.to_string()),
167
-
},
168
-
);
169
-
}
170
-
171
-
// Add collection filter to where conditions
172
-
where_conditions.insert(
173
-
"collection".to_string(),
174
-
WhereCondition {
175
-
eq: Some(serde_json::Value::String(collection.clone())),
176
-
in_values: None,
177
-
contains: None,
178
-
},
179
-
);
180
-
181
-
let where_clause = Some(crate::models::WhereClause {
182
-
conditions: where_conditions,
183
-
or_conditions: None,
184
-
});
185
-
186
-
let records_params = SliceRecordsParams {
187
-
slice,
188
-
limit,
189
-
cursor,
190
-
where_clause,
191
-
sort_by: sort_by.clone(),
192
-
};
193
-
194
-
// First verify the collection belongs to this slice
195
-
let slice_collections = state
196
-
.database
197
-
.get_slice_collections_list(&records_params.slice)
198
-
.await
199
-
.map_err(|_| AppError::Internal("Database error".to_string()))?;
200
-
201
-
// Special handling: network.slices.lexicon is always allowed as it defines the schema
202
-
if collection != "network.slices.lexicon" && !slice_collections.contains(&collection) {
203
-
return Err(AppError::NotFound("Collection not found".to_string()));
204
-
}
205
-
206
-
// Use the unified database method
207
-
match state
208
-
.database
209
-
.get_slice_collections_records(
210
-
&records_params.slice,
211
-
records_params.limit,
212
-
records_params.cursor.as_deref(),
213
-
sort_by.as_ref(),
214
-
records_params.where_clause.as_ref(),
215
-
)
216
-
.await
217
-
{
218
-
Ok((records, cursor)) => {
219
-
// No need to filter - collection filter is in the SQL query now
220
-
221
-
let indexed_records: Vec<IndexedRecord> = records
222
-
.into_iter()
223
-
.map(|record| IndexedRecord {
224
-
uri: record.uri,
225
-
cid: record.cid,
226
-
did: record.did,
227
-
collection: record.collection,
228
-
value: record.json,
229
-
indexed_at: record.indexed_at.to_rfc3339(),
230
-
})
231
-
.collect();
232
-
233
-
let output = SliceRecordsOutput {
234
-
records: indexed_records,
235
-
cursor,
236
-
};
237
-
238
-
Ok(Json(
239
-
serde_json::to_value(output).map_err(|_| AppError::Internal("Serialization error".to_string()))?,
240
-
))
241
-
}
242
-
Err(_) => Err(AppError::Internal("Database error".to_string())),
243
-
}
244
-
}
245
-
246
-
// Implementation for get record
247
-
async fn dynamic_get_record_impl(
248
-
collection: String,
249
-
state: AppState,
250
-
params: serde_json::Value,
251
-
) -> Result<Json<serde_json::Value>, AppError> {
252
-
let get_params: GetRecordParams =
253
-
serde_json::from_value(params).map_err(|_| AppError::BadRequest("Invalid parameters".to_string()))?;
254
-
255
-
// First verify the collection belongs to this slice
256
-
let slice_collections = state
257
-
.database
258
-
.get_slice_collections_list(&get_params.slice)
259
-
.await
260
-
.map_err(|_| AppError::Internal("Database error".to_string()))?;
261
-
262
-
// Special handling: network.slices.lexicon is always allowed as it defines the schema
263
-
if collection != "network.slices.lexicon" && !slice_collections.contains(&collection) {
264
-
return Err(AppError::NotFound("Collection not found".to_string()));
265
-
}
266
-
267
-
// Use direct database query by URI for efficiency
268
-
match state.database.get_record(&get_params.uri).await {
269
-
Ok(Some(record)) => {
270
-
let json_value =
271
-
serde_json::to_value(record).map_err(|_| AppError::Internal("Serialization error".to_string()))?;
272
-
Ok(Json(json_value))
273
-
}
274
-
Ok(None) => Err(AppError::NotFound("Record not found".to_string())),
275
-
Err(_e) => Err(AppError::Internal("Database error".to_string())),
276
-
}
277
-
}
278
-
279
-
// Handler for get records via POST with JSON body
280
-
async fn dynamic_get_records_post_handler(
281
-
collection: String,
282
-
state: AppState,
283
-
body: serde_json::Value,
284
-
) -> Result<Json<serde_json::Value>, AppError> {
285
-
// Parse the JSON body into SliceRecordsParams
286
-
let mut records_params: SliceRecordsParams = serde_json::from_value(body).map_err(|_| AppError::BadRequest("Invalid request body".to_string()))?;
287
-
288
-
// First verify the collection belongs to this slice
289
-
let slice_collections = state
290
-
.database
291
-
.get_slice_collections_list(&records_params.slice)
292
-
.await
293
-
.map_err(|_| AppError::Internal("Database error".to_string()))?;
294
-
295
-
// Special handling: network.slices.lexicon is always allowed as it defines the schema
296
-
if collection != "network.slices.lexicon" && !slice_collections.contains(&collection) {
297
-
return Err(AppError::NotFound("Collection not found".to_string()));
298
-
}
299
-
300
-
// Add collection filter to where conditions
301
-
let mut where_clause = records_params
302
-
.where_clause
303
-
.unwrap_or(crate::models::WhereClause {
304
-
conditions: HashMap::new(),
305
-
or_conditions: None,
306
-
});
307
-
where_clause.conditions.insert(
308
-
"collection".to_string(),
309
-
WhereCondition {
310
-
eq: Some(serde_json::Value::String(collection.clone())),
311
-
in_values: None,
312
-
contains: None,
313
-
},
314
-
);
315
-
records_params.where_clause = Some(where_clause);
316
-
317
-
// Use the unified database method
318
-
match state
319
-
.database
320
-
.get_slice_collections_records(
321
-
&records_params.slice,
322
-
records_params.limit,
323
-
records_params.cursor.as_deref(),
324
-
records_params.sort_by.as_ref(),
325
-
records_params.where_clause.as_ref(),
326
-
)
327
-
.await
328
-
{
329
-
Ok((records, cursor)) => {
330
-
// No need to filter - collection filter is in the SQL query now
331
-
332
-
// Transform Record to IndexedRecord for the response
333
-
let indexed_records: Vec<IndexedRecord> = records
334
-
.into_iter()
335
-
.map(|record| IndexedRecord {
336
-
uri: record.uri,
337
-
cid: record.cid,
338
-
did: record.did,
339
-
collection: record.collection,
340
-
value: record.json,
341
-
indexed_at: record.indexed_at.to_rfc3339(),
342
-
})
343
-
.collect();
344
-
345
-
let output = SliceRecordsOutput {
346
-
records: indexed_records,
347
-
cursor,
348
-
};
349
-
350
-
Ok(Json(serde_json::to_value(output).map_err(|e| AppError::Internal(format!("Serialization error: {}", e)))?))
351
-
}
352
-
Err(e) => Err(AppError::Internal(format!("Database error: {}", e))),
353
-
}
354
-
}
355
-
356
-
// Dynamic count records handler for GET requests
357
-
async fn dynamic_count_records_handler(
358
-
collection: String,
359
-
state: AppState,
360
-
params: serde_json::Value,
361
-
) -> Result<Json<serde_json::Value>, AppError> {
362
-
// Convert query parameters to SliceRecordsParams
363
-
let mut records_params: SliceRecordsParams =
364
-
serde_json::from_value(params).map_err(|_| AppError::BadRequest("Invalid parameters".to_string()))?;
365
-
366
-
// First verify the collection belongs to this slice
367
-
let slice_collections = state
368
-
.database
369
-
.get_slice_collections_list(&records_params.slice)
370
-
.await
371
-
.map_err(|_| AppError::Internal("Database error".to_string()))?;
372
-
373
-
// Special handling: network.slices.lexicon is always allowed as it defines the schema
374
-
if collection != "network.slices.lexicon" && !slice_collections.contains(&collection) {
375
-
return Err(AppError::NotFound("Collection not found".to_string()));
376
-
}
377
-
378
-
// Add collection filter to where conditions
379
-
let mut where_clause = records_params
380
-
.where_clause
381
-
.unwrap_or(crate::models::WhereClause {
382
-
conditions: HashMap::new(),
383
-
or_conditions: None,
384
-
});
385
-
where_clause.conditions.insert(
386
-
"collection".to_string(),
387
-
WhereCondition {
388
-
eq: Some(collection.clone().into()),
389
-
contains: None,
390
-
in_values: None,
391
-
},
392
-
);
393
-
records_params.where_clause = Some(where_clause);
394
-
395
-
match state
396
-
.database
397
-
.count_slice_collections_records(
398
-
&records_params.slice,
399
-
records_params.where_clause.as_ref(),
400
-
)
401
-
.await
402
-
{
403
-
Ok(count) => Ok(Json(serde_json::json!({
404
-
"success": true,
405
-
"count": count,
406
-
"message": null
407
-
}))),
408
-
Err(_) => Ok(Json(serde_json::json!({
409
-
"success": false,
410
-
"count": 0,
411
-
"message": "Failed to count records"
412
-
}))),
413
-
}
414
-
}
415
-
416
-
// Dynamic count records handler for POST requests
417
-
async fn dynamic_count_records_post_handler(
418
-
collection: String,
419
-
state: AppState,
420
-
body: serde_json::Value,
421
-
) -> Result<Json<serde_json::Value>, AppError> {
422
-
// Parse the JSON body into SliceRecordsParams
423
-
let mut records_params: SliceRecordsParams = serde_json::from_value(body).map_err(|_| AppError::BadRequest("Invalid request body".to_string()))?;
424
-
425
-
// First verify the collection belongs to this slice
426
-
let slice_collections = state
427
-
.database
428
-
.get_slice_collections_list(&records_params.slice)
429
-
.await
430
-
.map_err(|_| AppError::Internal("Database error".to_string()))?;
431
-
432
-
// Special handling: network.slices.lexicon is always allowed as it defines the schema
433
-
if collection != "network.slices.lexicon" && !slice_collections.contains(&collection) {
434
-
return Err(AppError::NotFound("Collection not found".to_string()));
435
-
}
436
-
437
-
// Add collection filter to where conditions
438
-
let mut where_clause = records_params
439
-
.where_clause
440
-
.unwrap_or(crate::models::WhereClause {
441
-
conditions: HashMap::new(),
442
-
or_conditions: None,
443
-
});
444
-
where_clause.conditions.insert(
445
-
"collection".to_string(),
446
-
WhereCondition {
447
-
eq: Some(collection.clone().into()),
448
-
in_values: None,
449
-
contains: None,
450
-
},
451
-
);
452
-
records_params.where_clause = Some(where_clause);
453
-
454
-
match state
455
-
.database
456
-
.count_slice_collections_records(
457
-
&records_params.slice,
458
-
records_params.where_clause.as_ref(),
459
-
)
460
-
.await
461
-
{
462
-
Ok(count) => Ok(Json(serde_json::json!({
463
-
"success": true,
464
-
"count": count,
465
-
"message": null
466
-
}))),
467
-
Err(_) => Ok(Json(serde_json::json!({
468
-
"success": false,
469
-
"count": 0,
470
-
"message": "Failed to count records"
471
-
}))),
472
-
}
473
-
}
474
-
475
-
// Dynamic collection create (e.g., social.grain.gallery.createRecord)
476
-
async fn dynamic_collection_create_impl(
477
-
state: AppState,
478
-
headers: HeaderMap,
479
-
body: serde_json::Value,
480
-
collection: String,
481
-
) -> Result<Json<serde_json::Value>, AppError> {
482
-
// Extract and verify OAuth token
483
-
let token = extract_bearer_token(&headers).map_err(|_| AppError::AuthRequired("Missing bearer token".to_string()))?;
484
-
let user_info = verify_oauth_token_cached(
485
-
&token,
486
-
&state.config.auth_base_url,
487
-
Some(state.auth_cache.clone()),
488
-
)
489
-
.await
490
-
.map_err(|_| AppError::AuthRequired("Invalid token".to_string()))?;
491
-
492
-
// Get AT Protocol DPoP auth and PDS URL (with caching)
493
-
let (dpop_auth, pds_url) = get_atproto_auth_for_user_cached(
494
-
&token,
495
-
&state.config.auth_base_url,
496
-
Some(state.auth_cache.clone()),
497
-
)
498
-
.await
499
-
.map_err(|_| AppError::AuthRequired("Invalid token".to_string()))?;
500
-
501
-
// Extract the repo DID from user info
502
-
let repo = user_info.did.unwrap_or(user_info.sub);
503
-
504
-
// Create HTTP client
505
-
let http_client = reqwest::Client::new();
506
-
507
-
// Extract slice URI, rkey, and record value from structured body
508
-
let slice_uri = body
509
-
.get("slice")
510
-
.and_then(|v| v.as_str())
511
-
.ok_or_else(|| AppError::BadRequest("Missing parameter".to_string()))?
512
-
.to_string();
513
-
514
-
let record_key = body
515
-
.get("rkey")
516
-
.and_then(|v| v.as_str())
517
-
.filter(|s| !s.is_empty()) // Filter out empty strings
518
-
.map(|s| s.to_string());
519
-
520
-
let record_data = body
521
-
.get("record")
522
-
.ok_or_else(|| AppError::BadRequest("Missing parameter".to_string()))?
523
-
.clone();
524
-
525
-
// Validate the record against its lexicon
526
-
527
-
// For network.slices.lexicon collection, validate against the system slice
528
-
let validation_slice_uri = if collection == "network.slices.lexicon" {
529
-
&state.config.system_slice_uri
530
-
} else {
531
-
&slice_uri
532
-
};
533
-
534
-
// Get lexicons for validation
535
-
match state
536
-
.database
537
-
.get_lexicons_by_slice(validation_slice_uri)
538
-
.await
539
-
{
540
-
Ok(lexicons) if !lexicons.is_empty() => {
541
-
if let Err(e) =
542
-
slices_lexicon::validate_record(lexicons, &collection, record_data.clone())
543
-
{
544
-
return Err(AppError::BadRequest(format!("ValidationError: {}", e)));
545
-
}
546
-
}
547
-
_ => {
548
-
// If no lexicons found, continue without validation (backwards compatibility)
549
-
}
550
-
}
551
-
552
-
// Create record using AT Protocol functions with DPoP
553
-
554
-
let create_request = CreateRecordRequest {
555
-
repo: repo.clone(),
556
-
collection: collection.clone(),
557
-
record_key,
558
-
record: record_data.clone(),
559
-
swap_commit: None,
560
-
validate: false,
561
-
};
562
-
563
-
let result = create_record(
564
-
&http_client,
565
-
&atproto_client::client::Auth::DPoP(dpop_auth),
566
-
&pds_url,
567
-
create_request,
568
-
)
569
-
.await
570
-
.map_err(|_e| AppError::Internal("AT Protocol request failed".to_string()))?;
571
-
572
-
// Extract URI and CID from the response enum
573
-
let (uri, cid) = match result {
574
-
CreateRecordResponse::StrongRef { uri, cid, .. } => (uri, cid),
575
-
CreateRecordResponse::Error(_e) => {
576
-
return Err(AppError::Internal("AT Protocol response error".to_string()));
577
-
}
578
-
};
579
-
580
-
// Also store in local database for indexing
581
-
let record = Record {
582
-
uri: uri.clone(),
583
-
cid: cid.clone(),
584
-
did: repo,
585
-
collection,
586
-
json: record_data,
587
-
indexed_at: Utc::now(),
588
-
slice_uri: Some(slice_uri),
589
-
};
590
-
591
-
// Store in local database (ignore errors as AT Protocol operation succeeded)
592
-
let _insert_result = state.database.insert_record(&record).await;
593
-
594
-
Ok(Json(serde_json::json!({
595
-
"uri": uri,
596
-
"cid": cid,
597
-
})))
598
-
}
599
-
600
-
// Dynamic collection update (e.g., social.grain.gallery.updateRecord)
601
-
async fn dynamic_collection_update_impl(
602
-
state: AppState,
603
-
headers: HeaderMap,
604
-
body: serde_json::Value,
605
-
collection: String,
606
-
) -> Result<Json<serde_json::Value>, AppError> {
607
-
// Extract and verify OAuth token
608
-
let token = extract_bearer_token(&headers).map_err(|_| AppError::AuthRequired("Missing bearer token".to_string()))?;
609
-
let user_info = verify_oauth_token_cached(
610
-
&token,
611
-
&state.config.auth_base_url,
612
-
Some(state.auth_cache.clone()),
613
-
)
614
-
.await
615
-
.map_err(|_| AppError::AuthRequired("Invalid token".to_string()))?;
616
-
617
-
// Get AT Protocol DPoP auth and PDS URL (with caching)
618
-
let (dpop_auth, pds_url) = get_atproto_auth_for_user_cached(
619
-
&token,
620
-
&state.config.auth_base_url,
621
-
Some(state.auth_cache.clone()),
622
-
)
623
-
.await
624
-
.map_err(|_| AppError::AuthRequired("Invalid token".to_string()))?;
625
-
626
-
// Extract slice URI, rkey, and record value from structured body
627
-
let slice_uri = body
628
-
.get("slice")
629
-
.and_then(|v| v.as_str())
630
-
.ok_or_else(|| AppError::BadRequest("Missing parameter".to_string()))?
631
-
.to_string();
632
-
633
-
let rkey = body
634
-
.get("rkey")
635
-
.and_then(|v| v.as_str())
636
-
.ok_or_else(|| AppError::BadRequest("Missing parameter".to_string()))?
637
-
.to_string();
638
-
639
-
let record_data = body
640
-
.get("record")
641
-
.ok_or_else(|| AppError::BadRequest("Missing parameter".to_string()))?
642
-
.clone();
643
-
644
-
// Extract repo from user info
645
-
let repo = user_info.did.unwrap_or(user_info.sub);
646
-
647
-
// Validate the record against its lexicon
648
-
649
-
// For network.slices.lexicon collection, validate against the system slice
650
-
let validation_slice_uri = if collection == "network.slices.lexicon" {
651
-
&state.config.system_slice_uri
652
-
} else {
653
-
&slice_uri
654
-
};
655
-
656
-
// Get lexicons for validation
657
-
match state
658
-
.database
659
-
.get_lexicons_by_slice(validation_slice_uri)
660
-
.await
661
-
{
662
-
Ok(lexicons) if !lexicons.is_empty() => {
663
-
if let Err(e) =
664
-
slices_lexicon::validate_record(lexicons, &collection, record_data.clone())
665
-
{
666
-
return Err(AppError::BadRequest(format!("ValidationError: {}", e)));
667
-
}
668
-
}
669
-
_ => {
670
-
// If no lexicons found, continue without validation (backwards compatibility)
671
-
}
672
-
}
673
-
674
-
// Create HTTP client
675
-
let http_client = reqwest::Client::new();
676
-
677
-
// Update record using AT Protocol functions with DPoP
678
-
let put_request = PutRecordRequest {
679
-
repo: repo.clone(),
680
-
collection: collection.clone(),
681
-
record_key: rkey,
682
-
record: record_data.clone(),
683
-
swap_record: None,
684
-
swap_commit: None,
685
-
validate: false,
686
-
};
687
-
688
-
let result = put_record(
689
-
&http_client,
690
-
&atproto_client::client::Auth::DPoP(dpop_auth),
691
-
&pds_url,
692
-
put_request,
693
-
)
694
-
.await
695
-
.map_err(|_| AppError::Internal("AT Protocol request failed".to_string()))?;
696
-
697
-
// Extract URI and CID from the response enum
698
-
let (uri, cid) = match result {
699
-
PutRecordResponse::StrongRef { uri, cid, .. } => (uri, cid),
700
-
PutRecordResponse::Error(_) => {
701
-
return Err(AppError::Internal("AT Protocol response error".to_string()));
702
-
}
703
-
};
704
-
705
-
// Also update in local database for indexing
706
-
let record = Record {
707
-
uri: uri.clone(),
708
-
cid: cid.clone(),
709
-
did: repo,
710
-
collection,
711
-
json: record_data,
712
-
indexed_at: Utc::now(),
713
-
slice_uri: Some(slice_uri),
714
-
};
715
-
716
-
// Update in local database (ignore errors as AT Protocol operation succeeded)
717
-
let _ = state.database.update_record(&record).await;
718
-
719
-
Ok(Json(serde_json::json!({
720
-
"uri": uri,
721
-
"cid": cid,
722
-
})))
723
-
}
724
-
725
-
// Dynamic collection delete (e.g., social.grain.gallery.delete)
726
-
async fn dynamic_collection_delete_impl(
727
-
state: AppState,
728
-
headers: HeaderMap,
729
-
body: serde_json::Value,
730
-
collection: String,
731
-
) -> Result<Json<serde_json::Value>, AppError> {
732
-
// Extract and verify OAuth token
733
-
let token = extract_bearer_token(&headers).map_err(|_| AppError::AuthRequired("Missing bearer token".to_string()))?;
734
-
let user_info = verify_oauth_token_cached(
735
-
&token,
736
-
&state.config.auth_base_url,
737
-
Some(state.auth_cache.clone()),
738
-
)
739
-
.await
740
-
.map_err(|_| AppError::AuthRequired("Invalid token".to_string()))?;
741
-
742
-
// Get AT Protocol DPoP auth and PDS URL (with caching)
743
-
let (dpop_auth, pds_url) = get_atproto_auth_for_user_cached(
744
-
&token,
745
-
&state.config.auth_base_url,
746
-
Some(state.auth_cache.clone()),
747
-
)
748
-
.await
749
-
.map_err(|_| AppError::AuthRequired("Invalid token".to_string()))?;
750
-
751
-
// Extract repo and rkey from body
752
-
let repo = user_info.did.unwrap_or(user_info.sub);
753
-
let rkey = body["rkey"]
754
-
.as_str()
755
-
.ok_or_else(|| AppError::BadRequest("Missing parameter".to_string()))?
756
-
.to_string();
757
-
758
-
// Create HTTP client
759
-
let http_client = reqwest::Client::new();
760
-
761
-
// Delete record using AT Protocol functions with DPoP
762
-
let delete_request = DeleteRecordRequest {
763
-
repo: repo.clone(),
764
-
collection: collection.clone(),
765
-
record_key: rkey.clone(),
766
-
swap_record: None,
767
-
swap_commit: None,
768
-
};
769
-
770
-
delete_record(
771
-
&http_client,
772
-
&atproto_client::client::Auth::DPoP(dpop_auth),
773
-
&pds_url,
774
-
delete_request,
775
-
)
776
-
.await
777
-
.map_err(|_| AppError::Internal("AT Protocol request failed".to_string()))?;
778
-
779
-
// Also delete from local database (from all slices)
780
-
let uri = format!("at://{}/{}/{}", repo, collection, rkey);
781
-
782
-
// Handle cascade deletion before deleting the record
783
-
if let Err(e) = state.database.handle_cascade_deletion(&uri, &collection).await {
784
-
tracing::warn!("Cascade deletion failed for {}: {}", uri, e);
785
-
}
786
-
787
-
let _ = state.database.delete_record_by_uri(&uri, None).await;
788
-
789
-
Ok(Json(serde_json::json!({})))
790
-
}