api/migrations/012_pg_trgm_extension.sql (+2)
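The migration body itself is collapsed in this view. As a hedged sketch (the exact contents are an assumption), a two-line pg_trgm migration usually does nothing more than enable the extension that provides the `%` trigram-similarity operator the new `fuzzy` filter relies on:

```sql
-- Assumed contents of 012_pg_trgm_extension.sql; the real file is not shown in this diff.
CREATE EXTENSION IF NOT EXISTS pg_trgm;
```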
api/src/api/xrpc_dynamic.rs (+60 -43)

···
 };
 use std::collections::HashMap;
 
-
 #[derive(Deserialize)]
 pub struct GetRecordParams {
     pub uri: String,
···
         where_conditions.insert(
             "did".to_string(),
             WhereCondition {
+                eq: Some(serde_json::Value::String(author_str.to_string())),
+                in_values: None,
+                contains: None,
+                fuzzy: None,
                 gt: None,
                 gte: None,
                 lt: None,
                 lte: None,
-                eq: Some(serde_json::Value::String(author_str.to_string())),
-                in_values: None,
-                contains: None,
             },
         );
     } else if let Some(authors_str) = params.get("authors").and_then(|v| v.as_str()) {
···
         where_conditions.insert(
             "did".to_string(),
             WhereCondition {
+                eq: None,
+                in_values: Some(authors),
+                contains: None,
+                fuzzy: None,
                 gt: None,
                 gte: None,
                 lt: None,
                 lte: None,
-                eq: None,
-                in_values: Some(authors),
-                contains: None,
             },
         );
     }
···
         where_conditions.insert(
             field.to_string(),
             WhereCondition {
+                eq: None,
+                in_values: None,
+                contains: Some(query_str.to_string()),
+                fuzzy: None,
                 gt: None,
                 gte: None,
                 lt: None,
                 lte: None,
-                eq: None,
-                in_values: None,
-                contains: Some(query_str.to_string()),
             },
         );
     }
···
         where_conditions.insert(
             "collection".to_string(),
             WhereCondition {
-                gt: None,
-                gte: None,
-                lt: None,
-                lte: None,
                 eq: Some(serde_json::Value::String(collection.clone())),
                 in_values: None,
                 contains: None,
+                fuzzy: None,
+                gt: None,
+                gte: None,
+                lt: None,
+                lte: None,
             },
         );
 
···
                 cursor,
             };
 
-            Ok(Json(
-                serde_json::to_value(output).map_err(|_| AppError::Internal("Serialization error".to_string()))?,
-            ))
+            Ok(Json(serde_json::to_value(output).map_err(|_| {
+                AppError::Internal("Serialization error".to_string())
+            })?))
         }
         Err(_) => Err(AppError::Internal("Database error".to_string())),
     }
···
     state: AppState,
     params: serde_json::Value,
 ) -> Result<Json<serde_json::Value>, AppError> {
-    let get_params: GetRecordParams =
-        serde_json::from_value(params).map_err(|_| AppError::BadRequest("Invalid parameters".to_string()))?;
+    let get_params: GetRecordParams = serde_json::from_value(params)
+        .map_err(|_| AppError::BadRequest("Invalid parameters".to_string()))?;
 
     // First verify the collection belongs to this slice
     let slice_collections = state
···
     // Use direct database query by URI for efficiency
     match state.database.get_record(&get_params.uri).await {
         Ok(Some(record)) => {
-            let json_value =
-                serde_json::to_value(record).map_err(|_| AppError::Internal("Serialization error".to_string()))?;
+            let json_value = serde_json::to_value(record)
+                .map_err(|_| AppError::Internal("Serialization error".to_string()))?;
             Ok(Json(json_value))
         }
         Ok(None) => Err(AppError::NotFound("Record not found".to_string())),
···
     body: serde_json::Value,
 ) -> Result<Json<serde_json::Value>, AppError> {
     // Parse the JSON body into SliceRecordsParams
-    let mut records_params: SliceRecordsParams = serde_json::from_value(body).map_err(|_| AppError::BadRequest("Invalid request body".to_string()))?;
+    let mut records_params: SliceRecordsParams = serde_json::from_value(body)
+        .map_err(|_| AppError::BadRequest("Invalid request body".to_string()))?;
 
     // First verify the collection belongs to this slice
     let slice_collections = state
···
         where_clause.conditions.insert(
             "collection".to_string(),
             WhereCondition {
-                gt: None,
-                gte: None,
-                lt: None,
-                lte: None,
                 eq: Some(serde_json::Value::String(collection.clone())),
                 in_values: None,
                 contains: None,
+                fuzzy: None,
+                gt: None,
+                gte: None,
+                lt: None,
+                lte: None,
             },
         );
         records_params.where_clause = Some(where_clause);
···
                 cursor,
             };
 
-            Ok(Json(serde_json::to_value(output).map_err(|e| AppError::Internal(format!("Serialization error: {}", e)))?))
+            Ok(Json(serde_json::to_value(output).map_err(|e| {
+                AppError::Internal(format!("Serialization error: {}", e))
+            })?))
         }
         Err(e) => Err(AppError::Internal(format!("Database error: {}", e))),
     }
···
     params: serde_json::Value,
 ) -> Result<Json<serde_json::Value>, AppError> {
     // Convert query parameters to SliceRecordsParams
-    let mut records_params: SliceRecordsParams =
-        serde_json::from_value(params).map_err(|_| AppError::BadRequest("Invalid parameters".to_string()))?;
+    let mut records_params: SliceRecordsParams = serde_json::from_value(params)
+        .map_err(|_| AppError::BadRequest("Invalid parameters".to_string()))?;
 
     // First verify the collection belongs to this slice
     let slice_collections = state
···
         where_clause.conditions.insert(
             "collection".to_string(),
             WhereCondition {
-                gt: None,
-                gte: None,
-                lt: None,
-                lte: None,
                 eq: Some(collection.clone().into()),
+                in_values: None,
                 contains: None,
-                in_values: None,
+                fuzzy: None,
+                gt: None,
+                gte: None,
+                lt: None,
+                lte: None,
             },
         );
         records_params.where_clause = Some(where_clause);
···
     body: serde_json::Value,
 ) -> Result<Json<serde_json::Value>, AppError> {
     // Parse the JSON body into SliceRecordsParams
-    let mut records_params: SliceRecordsParams = serde_json::from_value(body).map_err(|_| AppError::BadRequest("Invalid request body".to_string()))?;
+    let mut records_params: SliceRecordsParams = serde_json::from_value(body)
+        .map_err(|_| AppError::BadRequest("Invalid request body".to_string()))?;
 
     // First verify the collection belongs to this slice
     let slice_collections = state
···
         where_clause.conditions.insert(
             "collection".to_string(),
             WhereCondition {
-                gt: None,
-                gte: None,
-                lt: None,
-                lte: None,
                 eq: Some(collection.clone().into()),
                 in_values: None,
                 contains: None,
+                fuzzy: None,
+                gt: None,
+                gte: None,
+                lt: None,
+                lte: None,
             },
         );
         records_params.where_clause = Some(where_clause);
···
     collection: String,
 ) -> Result<Json<serde_json::Value>, AppError> {
     // Extract and verify OAuth token
-    let token = extract_bearer_token(&headers).map_err(|_| AppError::AuthRequired("Missing bearer token".to_string()))?;
+    let token = extract_bearer_token(&headers)
+        .map_err(|_| AppError::AuthRequired("Missing bearer token".to_string()))?;
     let user_info = verify_oauth_token_cached(
         &token,
         &state.config.auth_base_url,
···
     collection: String,
 ) -> Result<Json<serde_json::Value>, AppError> {
     // Extract and verify OAuth token
-    let token = extract_bearer_token(&headers).map_err(|_| AppError::AuthRequired("Missing bearer token".to_string()))?;
+    let token = extract_bearer_token(&headers)
+        .map_err(|_| AppError::AuthRequired("Missing bearer token".to_string()))?;
     let user_info = verify_oauth_token_cached(
         &token,
         &state.config.auth_base_url,
···
     collection: String,
 ) -> Result<Json<serde_json::Value>, AppError> {
     // Extract and verify OAuth token
-    let token = extract_bearer_token(&headers).map_err(|_| AppError::AuthRequired("Missing bearer token".to_string()))?;
+    let token = extract_bearer_token(&headers)
+        .map_err(|_| AppError::AuthRequired("Missing bearer token".to_string()))?;
     let user_info = verify_oauth_token_cached(
         &token,
         &state.config.auth_base_url,
···
     let uri = format!("at://{}/{}/{}", repo, collection, rkey);
 
     // Handle cascade deletion before deleting the record
-    if let Err(e) = state.database.handle_cascade_deletion(&uri, &collection).await {
+    if let Err(e) = state
+        .database
+        .handle_cascade_deletion(&uri, &collection)
+        .await
+    {
        tracing::warn!("Cascade deletion failed for {}: {}", uri, e);
     }
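One pattern worth noting across this file: every `WhereCondition` literal spells out all eight fields, and the new `fuzzy` member forced edits at each site. As a hedged aside, not part of the diff: if the struct derived `Default` (an assumption; the real definition in api/src/database/types.rs may have reasons not to), each site could name only the field it cares about:

```rust
// Sketch under the assumption that WhereCondition could derive Default.
#[derive(Default)]
pub struct WhereCondition {
    pub eq: Option<serde_json::Value>,
    pub in_values: Option<Vec<serde_json::Value>>,
    pub contains: Option<String>,
    pub fuzzy: Option<String>,
    pub gt: Option<serde_json::Value>,
    pub gte: Option<serde_json::Value>,
    pub lt: Option<serde_json::Value>,
    pub lte: Option<serde_json::Value>,
}

// Each insertion site then shrinks to the one meaningful field,
// and future field additions stop rippling through every caller.
fn collection_filter(collection: &str) -> WhereCondition {
    WhereCondition {
        eq: Some(serde_json::Value::String(collection.to_string())),
        ..Default::default()
    }
}
```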
api/src/database/actors.rs (+5 -11)

···
     let mut param_count = 2;
 
     // Build WHERE conditions for actors (handle table columns properly)
-    let (and_conditions, or_conditions) = build_actor_where_conditions(where_clause, &mut param_count);
+    let (and_conditions, or_conditions) =
+        build_actor_where_conditions(where_clause, &mut param_count);
     where_clauses.extend(and_conditions);
 
     if !or_conditions.is_empty() {
···
         ORDER BY did ASC
         LIMIT ${}
         "#,
-        where_sql,
-        param_count
+        where_sql, param_count
     );
 
     let mut sqlx_query = sqlx::query_as::<_, Actor>(&query);
···
     Ok((records, cursor))
 }
 
-
 /// Gets all actors across all slices.
 ///
 /// # Returns
···
 ///
 /// # Returns
 /// Number of actors deleted
-pub async fn delete_all_actors_for_slice(
-    &self,
-    slice_uri: &str,
-) -> Result<u64, DatabaseError> {
+pub async fn delete_all_actors_for_slice(&self, slice_uri: &str) -> Result<u64, DatabaseError> {
     let result = sqlx::query!(
         r#"
         DELETE FROM actor
···
         return Ok(Vec::new());
     }
 
-    let placeholders: Vec<String> = (1..=handles.len())
-        .map(|i| format!("${}", i))
-        .collect();
+    let placeholders: Vec<String> = (1..=handles.len()).map(|i| format!("${}", i)).collect();
     let query_sql = format!(
         "SELECT DISTINCT did FROM actor WHERE handle = ANY(ARRAY[{}]) AND slice_uri = ${}",
         placeholders.join(", "),
api/src/database/cursor.rs (+57 -36)

···
 ///
 /// # Returns
 /// Result containing DecodedCursor or error if decoding fails
-pub fn decode_cursor(cursor: &str, sort_by: Option<&Vec<SortField>>) -> Result<DecodedCursor, String> {
-    let decoded_bytes = general_purpose::URL_SAFE_NO_PAD.decode(cursor)
+pub fn decode_cursor(
+    cursor: &str,
+    sort_by: Option<&Vec<SortField>>,
+) -> Result<DecodedCursor, String> {
+    let decoded_bytes = general_purpose::URL_SAFE_NO_PAD
+        .decode(cursor)
         .map_err(|e| format!("Failed to decode base64: {}", e))?;
-    let decoded_str = String::from_utf8(decoded_bytes)
-        .map_err(|e| format!("Invalid UTF-8 in cursor: {}", e))?;
+    let decoded_str =
+        String::from_utf8(decoded_bytes).map_err(|e| format!("Invalid UTF-8 in cursor: {}", e))?;
 
     let parts: Vec<&str> = decoded_str.split('|').collect();
 
···
         .map(|s| s.to_string())
         .collect();
 
-    Ok(DecodedCursor {
-        field_values,
-        cid,
-    })
+    Ok(DecodedCursor { field_values, cid })
 }
 
 /// Builds cursor-based WHERE conditions for proper multi-field pagination.
···
     for (j, sort_field) in sort_fields.iter().enumerate().take(i) {
         let field = &sort_field.field;
         let cursor_value = &decoded_cursor.field_values[j];
-        let is_datetime = field_types.and_then(|types| types.get(j).copied()).unwrap_or(false);
+        let is_datetime = field_types
+            .and_then(|types| types.get(j).copied())
+            .unwrap_or(false);
 
         let field_ref = build_field_reference(field, is_datetime);
         let param_cast = if is_datetime { "::timestamp" } else { "" };
···
     let field = &sort_fields[i].field;
     let cursor_value = &decoded_cursor.field_values[i];
     let direction = &sort_fields[i].direction;
-    let is_datetime = field_types.and_then(|types| types.get(i).copied()).unwrap_or(false);
+    let is_datetime = field_types
+        .and_then(|types| types.get(i).copied())
+        .unwrap_or(false);
 
-    let comparison_op = if direction.to_lowercase() == "desc" { "<" } else { ">" };
+    let comparison_op = if direction.to_lowercase() == "desc" {
+        "<"
+    } else {
+        ">"
+    };
     let field_ref = build_field_reference(field, is_datetime);
     let param_cast = if is_datetime { "::timestamp" } else { "" };
 
-    clause_parts.push(format!("{} {} ${}{}", field_ref, comparison_op, param_count, param_cast));
+    clause_parts.push(format!(
+        "{} {} ${}{}",
+        field_ref, comparison_op, param_count, param_cast
+    ));
     *param_count += 1;
     bind_values.push(cursor_value.clone());
 
···
     let mut final_clause_parts = Vec::new();
     for (j, field) in sort_fields.iter().enumerate() {
         let cursor_value = &decoded_cursor.field_values[j];
-        let is_datetime = field_types.and_then(|types| types.get(j).copied()).unwrap_or(false);
+        let is_datetime = field_types
+            .and_then(|types| types.get(j).copied())
+            .unwrap_or(false);
 
         let field_ref = build_field_reference(&field.field, is_datetime);
         let param_cast = if is_datetime { "::timestamp" } else { "" };
···
 
     // CID comparison uses the direction of the last sort field
     let last_direction = &sort_fields[sort_fields.len() - 1].direction;
-    let cid_comparison_op = if last_direction.to_lowercase() == "desc" { "<" } else { ">" };
+    let cid_comparison_op = if last_direction.to_lowercase() == "desc" {
+        "<"
+    } else {
+        ">"
+    };
 
     final_clause_parts.push(format!("cid {} ${}", cid_comparison_op, param_count));
     *param_count += 1;
···
     #[test]
     fn test_generate_cursor_with_sort() {
         let record = create_test_record();
-        let sort_by = vec![
-            SortField {
-                field: "text".to_string(),
-                direction: "desc".to_string(),
-            },
-        ];
+        let sort_by = vec![SortField {
+            field: "text".to_string(),
+            direction: "desc".to_string(),
+        }];
 
         let cursor = generate_cursor_from_record(&record, Some(&sort_by));
         let decoded = general_purpose::URL_SAFE_NO_PAD.decode(&cursor).unwrap();
···
 
     #[test]
     fn test_decode_cursor_single_field() {
-        let sort_by = vec![
-            SortField {
-                field: "createdAt".to_string(),
-                direction: "desc".to_string(),
-            },
-        ];
+        let sort_by = vec![SortField {
+            field: "createdAt".to_string(),
+            direction: "desc".to_string(),
+        }];
 
         let cursor_content = "2025-01-15T12:00:00Z|bafytest123";
         let cursor = general_purpose::URL_SAFE_NO_PAD.encode(cursor_content);
···
 
         let decoded = decode_cursor(&cursor, Some(&sort_by)).unwrap();
 
-        assert_eq!(decoded.field_values, vec!["Hello world", "2025-01-15T12:00:00Z"]);
+        assert_eq!(
+            decoded.field_values,
+            vec!["Hello world", "2025-01-15T12:00:00Z"]
+        );
         assert_eq!(decoded.cid, "bafytest123");
     }
 
     #[test]
     fn test_decode_cursor_invalid_format() {
-        let sort_by = vec![
-            SortField {
-                field: "text".to_string(),
-                direction: "desc".to_string(),
-            },
-        ];
+        let sort_by = vec![SortField {
+            field: "text".to_string(),
+            direction: "desc".to_string(),
+        }];
 
         let cursor_content = "bafytest123";
         let cursor = general_purpose::URL_SAFE_NO_PAD.encode(cursor_content);
···
         assert_eq!(extract_field_value(&record, "uri"), record.uri);
         assert_eq!(extract_field_value(&record, "cid"), record.cid);
         assert_eq!(extract_field_value(&record, "did"), record.did);
-        assert_eq!(extract_field_value(&record, "collection"), record.collection);
+        assert_eq!(
+            extract_field_value(&record, "collection"),
+            record.collection
+        );
     }
 
     #[test]
···
         let record = create_test_record();
 
         assert_eq!(extract_field_value(&record, "text"), "Hello world");
-        assert_eq!(extract_field_value(&record, "createdAt"), "2025-01-15T12:00:00Z");
+        assert_eq!(
+            extract_field_value(&record, "createdAt"),
+            "2025-01-15T12:00:00Z"
+        );
     }
 
     #[test]
···
         assert_eq!(extract_field_value(&record, "nonexistent"), "NULL");
         assert_eq!(extract_field_value(&record, "nested.nonexistent"), "NULL");
     }
-}
+}
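The tests above pin down the cursor wire format: the sort-field values and the record CID, pipe-delimited and base64url-encoded without padding. A self-contained round-trip of that format, with values taken from the tests:

```rust
use base64::Engine;
use base64::engine::general_purpose;

fn main() {
    // Encode: sort-field value and CID joined by '|', base64url without padding.
    let cursor = general_purpose::URL_SAFE_NO_PAD.encode("2025-01-15T12:00:00Z|bafytest123");

    // Decode: reverse the steps, just as decode_cursor does.
    let bytes = general_purpose::URL_SAFE_NO_PAD.decode(&cursor).unwrap();
    let decoded = String::from_utf8(bytes).unwrap();
    let parts: Vec<&str> = decoded.split('|').collect();

    assert_eq!(parts, ["2025-01-15T12:00:00Z", "bafytest123"]);
}
```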
api/src/database/query_builder.rs (+46 -26)

···
 use super::types::{SortField, WhereClause, WhereCondition};
 use crate::models::Record;
 
-
 /// Builds an ORDER BY clause with optional datetime field information.
 ///
 /// # Arguments
···
 {
     let field_ref = build_field_reference(field, is_datetime);
 
-    if matches!(field.as_str(), "indexed_at" | "uri" | "cid" | "did" | "collection") {
+    if matches!(
+        field.as_str(),
+        "indexed_at" | "uri" | "cid" | "did" | "collection"
+    ) {
         order_clauses.push(format!("{field_ref} {direction}"));
     } else {
         order_clauses.push(format!("{field_ref} {direction} NULLS LAST"));
···
 
 /// Builds a single SQL condition clause for a field.
 ///
-/// Supports equality (eq), array membership (in_values), and pattern matching (contains)
+/// Supports equality (eq), array membership (in_values), pattern matching (contains),
+/// fuzzy matching (fuzzy), and comparison operators (gt, gte, lt, lte)
 /// for both table columns and JSON fields with nested paths.
 ///
 /// # Arguments
 /// * `field` - Field name (table column or JSON path)
-/// * `condition` - The condition to apply (eq, in_values, or contains)
+/// * `condition` - The condition to apply
 /// * `param_count` - Mutable counter for parameter numbering
 ///
 /// # Returns
···
     };
     *param_count += 1;
     clause
+} else if let Some(_fuzzy_value) = &condition.fuzzy {
+    let clause = match field {
+        "did" | "collection" | "uri" | "cid" => {
+            format!("{} % ${}", field, param_count)
+        }
+        "json" => {
+            format!("json::text % ${}", param_count)
+        }
+        _ => {
+            let json_path = build_json_path(field);
+            format!("({}) % ${}", json_path, param_count)
+        }
+    };
+    *param_count += 1;
+    clause
 } else if let Some(_gt_value) = &condition.gt {
     let clause = match field {
         "indexed_at" => {
···
 
 /// Binds parameters for a single condition to a sqlx query.
 ///
-/// Handles eq (single value), in_values (array), and contains (pattern) conditions.
+/// Handles eq (single value), in_values (array), contains (pattern), fuzzy (similarity), and comparison conditions.
 fn bind_single_condition<'q>(
     mut query_builder: sqlx::query::QueryAs<
         'q,
···
 
     if let Some(contains_value) = &condition.contains {
         query_builder = query_builder.bind(contains_value);
+    }
+
+    if let Some(fuzzy_value) = &condition.fuzzy {
+        query_builder = query_builder.bind(fuzzy_value);
     }
 
     if let Some(gt_value) = &condition.gt {
···
 /// # Returns
 /// Query builder with all parameters bound
 pub fn bind_where_parameters_scalar<'q, T>(
-    mut query_builder: sqlx::query::QueryScalar<
-        'q,
-        sqlx::Postgres,
-        T,
-        sqlx::postgres::PgArguments,
-    >,
+    mut query_builder: sqlx::query::QueryScalar<'q, sqlx::Postgres, T, sqlx::postgres::PgArguments>,
     where_clause: Option<&'q WhereClause>,
 ) -> sqlx::query::QueryScalar<'q, sqlx::Postgres, T, sqlx::postgres::PgArguments>
 where
···
 
 /// Recursively binds parameters from a WhereClause to a scalar query.
 fn bind_clause_recursive_scalar<'q, T>(
-    mut query_builder: sqlx::query::QueryScalar<
-        'q,
-        sqlx::Postgres,
-        T,
-        sqlx::postgres::PgArguments,
-    >,
+    mut query_builder: sqlx::query::QueryScalar<'q, sqlx::Postgres, T, sqlx::postgres::PgArguments>,
     clause: &'q WhereClause,
 ) -> sqlx::query::QueryScalar<'q, sqlx::Postgres, T, sqlx::postgres::PgArguments>
 where
···
 
 /// Binds parameters for a single condition to a sqlx scalar query.
 fn bind_single_condition_scalar<'q, T>(
-    mut query_builder: sqlx::query::QueryScalar<
-        'q,
-        sqlx::Postgres,
-        T,
-        sqlx::postgres::PgArguments,
-    >,
+    mut query_builder: sqlx::query::QueryScalar<'q, sqlx::Postgres, T, sqlx::postgres::PgArguments>,
     condition: &'q WhereCondition,
 ) -> sqlx::query::QueryScalar<'q, sqlx::Postgres, T, sqlx::postgres::PgArguments>
 where
···
         query_builder = query_builder.bind(contains_value);
     }
 
+    if let Some(fuzzy_value) = &condition.fuzzy {
+        query_builder = query_builder.bind(fuzzy_value);
+    }
+
     if let Some(gt_value) = &condition.gt {
         if let Some(str_val) = gt_value.as_str() {
             query_builder = query_builder.bind(str_val);
···
     }];
 
     let result = build_order_by_clause_with_field_info(Some(&sort_by), None);
-    assert_eq!(result, "json->>'createdAt' DESC NULLS LAST, indexed_at DESC");
+    assert_eq!(
+        result,
+        "json->>'createdAt' DESC NULLS LAST, indexed_at DESC"
+    );
 }
 
 #[test]
···
     }];
 
     let result = build_order_by_clause_with_field_info(Some(&sort_by), Some(true));
-    assert_eq!(result, "(json->>'createdAt')::timestamp DESC NULLS LAST, indexed_at DESC");
+    assert_eq!(
+        result,
+        "(json->>'createdAt')::timestamp DESC NULLS LAST, indexed_at DESC"
+    );
 }
 
 #[test]
···
     }];
 
     let result = build_order_by_clause_with_field_info(Some(&sort_by), None);
-    assert_eq!(result, "json->'author'->>'name' ASC NULLS LAST, indexed_at DESC");
+    assert_eq!(
+        result,
+        "json->'author'->>'name' ASC NULLS LAST, indexed_at DESC"
+    );
 }
 
 #[test]
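The new `fuzzy` branch leans on pg_trgm's `%` operator, which compares trigram sets and matches when similarity exceeds the `pg_trgm.similarity_threshold` setting (0.3 by default). Illustratively, the clause shapes it emits, plus the operator in isolation (field names invented):

```sql
-- Table column:          did % $1
-- Whole JSON document:   json::text % $1
-- Nested JSON field:     (json->>'title') % $1    -- path built by build_json_path
-- The operator itself tolerates typos:
SELECT 'recipe' % 'recipie';   -- => true (similarity 0.5 > default threshold 0.3)
```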
api/src/database/records.rs (+89 -28)

···
 
 use super::client::Database;
 use super::cursor::{build_cursor_where_condition, decode_cursor, generate_cursor_from_record};
-use super::query_builder::{bind_where_parameters, bind_where_parameters_scalar, build_order_by_clause_with_field_info, build_where_conditions};
+use super::query_builder::{
+    bind_where_parameters, bind_where_parameters_scalar, build_order_by_clause_with_field_info,
+    build_where_conditions,
+};
 use super::types::{SortField, WhereClause};
 use crate::errors::DatabaseError;
 use crate::models::{IndexedRecord, Record};
···
         Ok(lexicons) => {
             let types: Vec<bool> = sort_fields
                 .iter()
-                .map(|field| is_field_datetime(&lexicons, collection_name, &field.field))
+                .map(|field| {
+                    is_field_datetime(&lexicons, collection_name, &field.field)
+                })
                 .collect();
             Some(types)
         }
···
     };
 
     // Get first field type for ORDER BY (for backward compatibility)
-    let primary_field_is_datetime = field_types.as_ref().and_then(|types| types.first().copied());
+    let primary_field_is_datetime = field_types
+        .as_ref()
+        .and_then(|types| types.first().copied());
 
     // Build ORDER BY clause with datetime field information
     let order_by = build_order_by_clause_with_field_info(sort_by, primary_field_is_datetime);
···
         Ok(decoded_cursor) => {
             // Use the datetime field information we already computed
             let field_type_slice = field_types.as_deref();
-            let (cursor_where, bind_values) =
-                build_cursor_where_condition(&decoded_cursor, sort_by, &mut param_count, field_type_slice);
+            let (cursor_where, bind_values) = build_cursor_where_condition(
+                &decoded_cursor,
+                sort_by,
+                &mut param_count,
+                field_type_slice,
+            );
             where_clauses.push(cursor_where);
             cursor_bind_values = bind_values;
         }
···
         where_clauses.push(or_clause);
     }
 
-    let where_sql = where_clauses.into_iter().filter(|clause| !clause.is_empty()).collect::<Vec<_>>().join(" AND ");
+    let where_sql = where_clauses
+        .into_iter()
+        .filter(|clause| !clause.is_empty())
+        .collect::<Vec<_>>()
+        .join(" AND ");
 
     // Assign limit parameter AFTER all other parameters
     let limit_param = param_count;
···
 
     // Only return cursor if we got a full page, indicating there might be more
     let cursor = if records.len() < limit as usize {
-        None // Last page - no more results
+        None // Last page - no more results
     } else {
         records
             .last()
···
         where_clauses.push(or_clause);
     }
 
-    let filtered_where_clauses: Vec<_> = where_clauses.into_iter().filter(|clause| !clause.is_empty()).collect();
+    let filtered_where_clauses: Vec<_> = where_clauses
+        .into_iter()
+        .filter(|clause| !clause.is_empty())
+        .collect();
     let where_sql = if filtered_where_clauses.is_empty() {
         String::new()
     } else {
···
     match group_by_field {
         crate::models::GroupByField::Simple(field) => {
             // Check if it's a table column
-            if matches!(field.as_str(), "did" | "collection" | "uri" | "cid" | "indexed_at") {
+            if matches!(
+                field.as_str(),
+                "did" | "collection" | "uri" | "cid" | "indexed_at"
+            ) {
                 format!("\"{}\" as field_{}", field, i)
             } else {
                 // JSON field
···
 
             // Check if it's a table column
             if field == "indexed_at" {
-                format!("date_trunc('{}', \"{}\")::text as field_{}", interval_str, field, i)
+                format!(
+                    "date_trunc('{}', \"{}\")::text as field_{}",
+                    interval_str, field, i
+                )
             } else {
                 // JSON field - cast to timestamp for date_trunc, then to text
-                format!("date_trunc('{}', (json->>'{}')::timestamp)::text as field_{}", interval_str, field, i)
+                format!(
+                    "date_trunc('{}', (json->>'{}')::timestamp)::text as field_{}",
+                    interval_str, field, i
+                )
             }
         }
     }
···
 
     let query = format!(
         "SELECT {} FROM record{} GROUP BY {} {} LIMIT {}",
-        select_clause, where_sql, group_by_clause.join(", "), order_by_sql, limit
+        select_clause,
+        where_sql,
+        group_by_clause.join(", "),
+        order_by_sql,
+        limit
     );
 
     tracing::debug!("Generated SQL: {}", query);
···
     // Check if it looks like JSON (starts with [ or {)
     if str_val.starts_with('[') || str_val.starts_with('{') {
         // Try to parse as JSON
-        serde_json::from_str(str_val).unwrap_or_else(|_| serde_json::Value::String(str_val.clone()))
+        serde_json::from_str(str_val)
+            .unwrap_or_else(|_| serde_json::Value::String(str_val.clone()))
     } else {
         serde_json::Value::String(str_val.clone())
     }
···
     &self,
     slice_uri: &str,
 ) -> Result<u64, DatabaseError> {
-    let result = sqlx::query("DELETE FROM record WHERE slice_uri = $1 AND collection NOT LIKE 'network.slices.%'")
-        .bind(slice_uri)
-        .execute(&self.pool)
-        .await?;
+    let result = sqlx::query(
+        "DELETE FROM record WHERE slice_uri = $1 AND collection NOT LIKE 'network.slices.%'",
+    )
+    .bind(slice_uri)
+    .execute(&self.pool)
+    .await?;
     Ok(result.rows_affected())
 }
 
···
 /// # Arguments
 /// * `uri` - AT-URI of the deleted record
 /// * `collection` - Collection name (e.g., "network.slices.lexicon")
-pub async fn handle_cascade_deletion(&self, uri: &str, collection: &str) -> Result<(), DatabaseError> {
+pub async fn handle_cascade_deletion(
+    &self,
+    uri: &str,
+    collection: &str,
+) -> Result<(), DatabaseError> {
     match collection {
         "network.slices.lexicon" => {
             // Get the lexicon record to extract collection name and slice URI
             if let Ok(Some(lexicon_record)) = self.get_record(uri).await
                 && let (Some(nsid), Some(slice_uri_from_record)) = (
                     lexicon_record.value.get("nsid").and_then(|v| v.as_str()),
-                    lexicon_record.value.get("slice").and_then(|v| v.as_str())
+                    lexicon_record.value.get("slice").and_then(|v| v.as_str()),
                 )
             {
                 // Delete all records of this collection type from the slice
-                let deleted = self.delete_records_by_collection(slice_uri_from_record, nsid).await?;
+                let deleted = self
+                    .delete_records_by_collection(slice_uri_from_record, nsid)
+                    .await?;
                 tracing::info!(
                     "Cascade delete: removed {} records of collection {} from slice {}",
-                    deleted, nsid, slice_uri_from_record
+                    deleted,
+                    nsid,
+                    slice_uri_from_record
                 );
             }
         }
···
             let records_deleted = self.delete_all_records_for_slice(slice_uri).await?;
             tracing::info!(
                 "Cascade delete: removed {} records from slice {}",
-                records_deleted, slice_uri
+                records_deleted,
+                slice_uri
             );
 
             // Delete all actors for this slice
-            let actors_deleted = super::client::Database::delete_all_actors_for_slice(self, slice_uri).await?;
+            let actors_deleted =
+                super::client::Database::delete_all_actors_for_slice(self, slice_uri).await?;
             tracing::info!(
                 "Cascade delete: removed {} actors from slice {}",
-                actors_deleted, slice_uri
+                actors_deleted,
+                slice_uri
             );
         }
         _ => {
···
             }
         })];
 
-        assert!(is_field_datetime(&lexicons, "app.bsky.feed.post", "createdAt"));
+        assert!(is_field_datetime(
+            &lexicons,
+            "app.bsky.feed.post",
+            "createdAt"
+        ));
     }
 
     #[test]
···
             }
         })];
 
-        assert!(!is_field_datetime(&lexicons, "app.bsky.feed.post", "nonexistent"));
+        assert!(!is_field_datetime(
+            &lexicons,
+            "app.bsky.feed.post",
+            "nonexistent"
+        ));
     }
 
     #[test]
···
             }
         })];
 
-        assert!(!is_field_datetime(&lexicons, "app.bsky.actor.profile", "createdAt"));
+        assert!(!is_field_datetime(
+            &lexicons,
+            "app.bsky.actor.profile",
+            "createdAt"
+        ));
    }
 
     #[test]
···
             }),
         ];
 
-        assert!(is_field_datetime(&lexicons, "app.bsky.actor.profile", "createdAt"));
+        assert!(is_field_datetime(
+            &lexicons,
+            "app.bsky.actor.profile",
+            "createdAt"
+        ));
         assert!(!is_field_datetime(&lexicons, "app.bsky.feed.post", "text"));
     }
 }
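For the datetime grouping above, the generated expression truncates a JSON timestamp before grouping. Its behavior in isolation, as a runnable psql one-liner (the `createdAt` key is illustrative):

```sql
SELECT date_trunc(
    'week',
    ('{"createdAt":"2025-01-15T12:00:00Z"}'::jsonb->>'createdAt')::timestamp
)::text;
-- => '2025-01-13 00:00:00' (the Monday that begins that week)
```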
api/src/database/types.rs (+2)

···
 /// - `eq`: Exact match (field = value)
 /// - `in_values`: Array membership (field IN (...))
 /// - `contains`: Pattern matching (field ILIKE '%value%')
+/// - `fuzzy`: Fuzzy text matching (field % value)
 /// - `gt`: Greater than (field > value)
 /// - `gte`: Greater than or equal (field >= value)
 /// - `lt`: Less than (field < value)
···
     #[serde(rename = "in")]
     pub in_values: Option<Vec<Value>>,
     pub contains: Option<String>,
+    pub fuzzy: Option<String>,
     pub gt: Option<Value>,
     pub gte: Option<Value>,
     pub lt: Option<Value>,
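Since the struct is deserialized with serde (`in_values` is renamed to `in`; the other fields keep their names), a request-side where clause exercising the new operator would plausibly look like the following. The envelope and the `title` field are assumptions for illustration, not taken from the diff:

```json
{
  "where": {
    "collection": { "eq": "app.bsky.feed.post" },
    "title": { "fuzzy": "recipie" }
  }
}
```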
api/src/graphql/dataloader.rs (+73 -40)

···
     type Value = Vec<IndexedRecord>;
     type Error = Arc<String>;
 
-    async fn load(&self, keys: &[CollectionDidKey]) -> Result<HashMap<CollectionDidKey, Self::Value>, Self::Error> {
+    async fn load(
+        &self,
+        keys: &[CollectionDidKey],
+    ) -> Result<HashMap<CollectionDidKey, Self::Value>, Self::Error> {
         // Group keys by slice_uri and collection for optimal batching
         let mut grouped: HashMap<(String, String), Vec<String>> = HashMap::new();
 
···
         where_clause.conditions.insert(
             "collection".to_string(),
             WhereCondition {
-                gt: None,
-                gte: None,
-                lt: None,
-                lte: None,
                 eq: Some(serde_json::Value::String(collection.clone())),
                 in_values: None,
                 contains: None,
+                fuzzy: None,
+                gt: None,
+                gte: None,
+                lt: None,
+                lte: None,
             },
         );
 
···
         where_clause.conditions.insert(
             "did".to_string(),
             WhereCondition {
-                gt: None,
-                gte: None,
-                lt: None,
-                lte: None,
                 eq: None,
                 in_values: Some(
                     dids.iter()
                         .map(|did| serde_json::Value::String(did.clone()))
-                        .collect()
+                        .collect(),
                 ),
                 contains: None,
+                fuzzy: None,
+                gt: None,
+                gte: None,
+                lt: None,
+                lte: None,
             },
         );
 
         // Query database with no limit - load all records for batched filtering
-        match self.db.get_slice_collections_records(
-            &slice_uri,
-            None, // No limit - load all records for this DID
-            None, // cursor
-            None, // sort
-            Some(&where_clause),
-        ).await {
+        match self
+            .db
+            .get_slice_collections_records(
+                &slice_uri,
+                None, // No limit - load all records for this DID
+                None, // cursor
+                None, // sort
+                Some(&where_clause),
+            )
+            .await
+        {
             Ok((records, _cursor)) => {
                 // Group results by DID
                 for record in records {
···
                 }
             }
             Err(e) => {
-                tracing::error!("DataLoader batch query failed for {}/{}: {}", slice_uri, collection, e);
+                tracing::error!(
+                    "DataLoader batch query failed for {}/{}: {}",
+                    slice_uri,
+                    collection,
+                    e
+                );
                 // Return empty results for failed queries rather than failing the entire batch
             }
         }
···
     type Value = Vec<IndexedRecord>;
     type Error = Arc<String>;
 
-    async fn load(&self, keys: &[CollectionUriKey]) -> Result<HashMap<CollectionUriKey, Self::Value>, Self::Error> {
+    async fn load(
+        &self,
+        keys: &[CollectionUriKey],
+    ) -> Result<HashMap<CollectionUriKey, Self::Value>, Self::Error> {
         // Group keys by (slice_uri, collection, reference_field) for optimal batching
         let mut grouped: HashMap<(String, String, String), Vec<String>> = HashMap::new();
 
         for key in keys {
             grouped
-                .entry((key.slice_uri.clone(), key.collection.clone(), key.reference_field.clone()))
+                .entry((
+                    key.slice_uri.clone(),
+                    key.collection.clone(),
+                    key.reference_field.clone(),
+                ))
                 .or_insert_with(Vec::new)
                 .push(key.parent_uri.clone());
         }
···
         where_clause.conditions.insert(
             "collection".to_string(),
             WhereCondition {
-                gt: None,
-                gte: None,
-                lt: None,
-                lte: None,
                 eq: Some(serde_json::Value::String(collection.clone())),
                 in_values: None,
                 contains: None,
+                fuzzy: None,
+                gt: None,
+                gte: None,
+                lt: None,
+                lte: None,
             },
         );
 
···
         where_clause.conditions.insert(
             reference_field.clone(),
             WhereCondition {
-                gt: None,
-                gte: None,
-                lt: None,
-                lte: None,
                 eq: None,
                 in_values: Some(
-                    parent_uris.iter()
+                    parent_uris
+                        .iter()
                         .map(|uri| serde_json::Value::String(uri.clone()))
-                        .collect()
+                        .collect(),
                 ),
                 contains: None,
+                fuzzy: None,
+                gt: None,
+                gte: None,
+                lt: None,
+                lte: None,
             },
         );
 
         // Query database with no limit - load all records for batched filtering
-        match self.db.get_slice_collections_records(
-            &slice_uri,
-            None, // No limit - load all records matching parent URIs
-            None, // cursor
-            None, // sort
-            Some(&where_clause),
-        ).await {
+        match self
+            .db
+            .get_slice_collections_records(
+                &slice_uri,
+                None, // No limit - load all records matching parent URIs
+                None, // cursor
+                None, // sort
+                Some(&where_clause),
+            )
+            .await
+        {
             Ok((records, _cursor)) => {
                 // Group results by parent URI (extract from the reference field)
                 for record in records {
···
                 }
             }
             Err(e) => {
-                tracing::error!("CollectionUriLoader batch query failed for {}/{}: {}", slice_uri, collection, e);
+                tracing::error!(
+                    "CollectionUriLoader batch query failed for {}/{}: {}",
+                    slice_uri,
+                    collection,
+                    e
+                );
                 // Return empty results for failed queries rather than failing the entire batch
             }
         }
···
         Self {
             collection_did_loader: Arc::new(AsyncGraphQLDataLoader::new(
                 CollectionDidLoader::new(db.clone()),
-                tokio::spawn
+                tokio::spawn,
             )),
             collection_uri_loader: Arc::new(AsyncGraphQLDataLoader::new(
                 CollectionUriLoader::new(db),
-                tokio::spawn
+                tokio::spawn,
             )),
         }
     }
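For context on how these loaders are meant to be hit: each resolver asks for a single key, and keys issued in the same batch window collapse into one `load` call grouped by slice and collection. A hedged usage sketch follows — `load_one` is the standard async-graphql `DataLoader` API that `AsyncGraphQLDataLoader` appears to alias, but the `CollectionDidKey` field names beyond `slice_uri` and `collection` are assumed:

```rust
// Hypothetical resolver-side call; many of these in one tick become one batched query.
async fn author_posts(
    ctx: &GraphQLContext,
    slice_uri: &str,
    did: &str,
) -> Result<Vec<IndexedRecord>, Arc<String>> {
    let key = CollectionDidKey {
        slice_uri: slice_uri.to_string(),
        collection: "app.bsky.feed.post".to_string(), // example collection
        did: did.to_string(),                         // field name assumed
    };
    Ok(ctx
        .collection_did_loader
        .load_one(key)
        .await?
        .unwrap_or_default())
}
```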
api/src/graphql/dataloaders.rs (+4 -1)

···
     // Check if this is a strongRef
     if let Some(type_val) = obj.get("$type") {
         if type_val.as_str() == Some("com.atproto.repo.strongRef") {
-            return obj.get("uri").and_then(|u| u.as_str()).map(|s| s.to_string());
+            return obj
+                .get("uri")
+                .and_then(|u| u.as_str())
+                .map(|s| s.to_string());
         }
     }
 
api/src/graphql/handler.rs (+14 -17)

···
 use async_graphql::http::{WebSocket as GraphQLWebSocket, WebSocketProtocols, WsMessage};
 use async_graphql_axum::{GraphQLRequest, GraphQLResponse};
 use axum::{
-    extract::{ws::{WebSocket, Message}, Query, State, WebSocketUpgrade},
+    extract::{
+        Query, State, WebSocketUpgrade,
+        ws::{Message, WebSocket},
+    },
     http::{HeaderMap, StatusCode},
     response::{Html, Response},
 };
-use futures_util::{StreamExt, SinkExt};
+use futures_util::{SinkExt, StreamExt};
 use serde::Deserialize;
 use std::sync::Arc;
 use tokio::sync::RwLock;
 
-use crate::errors::AppError;
 use crate::AppState;
+use crate::errors::AppError;
 use crate::graphql::GraphQLContext;
 
 /// Global schema cache (one schema per slice)
···
         Ok(s) => s,
         Err(e) => {
             tracing::error!("Failed to get GraphQL schema: {:?}", e);
-            return Ok(async_graphql::Response::from_errors(vec![async_graphql::ServerError::new(
-                format!("Schema error: {:?}", e),
-                None,
-            )])
+            return Ok(async_graphql::Response::from_errors(vec![
+                async_graphql::ServerError::new(format!("Schema error: {:?}", e), None),
+            ])
             .into());
         }
     };
···
 }
 
 /// Gets schema from cache or builds it if not cached
-async fn get_or_build_schema(
-    state: &AppState,
-    slice_uri: &str,
-) -> Result<Schema, AppError> {
+async fn get_or_build_schema(state: &AppState, slice_uri: &str) -> Result<Schema, AppError> {
     // Check cache first
     {
         let cache = SCHEMA_CACHE.read().await;
···
     }
 
     // Build schema
-    let schema = crate::graphql::build_graphql_schema(
-        state.database.clone(),
-        slice_uri.to_string(),
-    )
-    .await
-    .map_err(|e| AppError::Internal(format!("Failed to build GraphQL schema: {}", e)))?;
+    let schema =
+        crate::graphql::build_graphql_schema(state.database.clone(), slice_uri.to_string())
+            .await
+            .map_err(|e| AppError::Internal(format!("Failed to build GraphQL schema: {}", e)))?;
 
     // Cache it
     {
api/src/graphql/mod.rs (+6 -6)

···
 //! This module provides a GraphQL interface to query slice records with support
 //! for joining linked records through AT Protocol strongRef references.
 
-mod schema_builder;
-mod dataloaders;
 mod dataloader;
-mod types;
+mod dataloaders;
 pub mod handler;
 pub mod pubsub;
+mod schema_builder;
+mod types;
 
-pub use schema_builder::build_graphql_schema;
-pub use handler::{graphql_handler, graphql_playground, graphql_subscription_handler};
-pub use pubsub::{RecordUpdateEvent, RecordOperation, PUBSUB};
 pub use dataloader::GraphQLContext;
+pub use handler::{graphql_handler, graphql_playground, graphql_subscription_handler};
+pub use pubsub::{PUBSUB, RecordOperation, RecordUpdateEvent};
+pub use schema_builder::build_graphql_schema;
api/src/graphql/pubsub.rs (+1 -2)

···
 use serde::{Deserialize, Serialize};
 use std::collections::HashMap;
 use std::sync::Arc;
-use tokio::sync::{broadcast, RwLock};
+use tokio::sync::{RwLock, broadcast};
 use tracing::{debug, info};
 
 /// Event broadcast when a record is created or updated
···
     Update,
     Delete,
 }
-
 
 /// PubSub manager for broadcasting events to subscribers
 ///
api/src/graphql/schema_builder.rs (+725 -479)

···
 //! This module generates GraphQL schemas at runtime based on lexicon definitions
 //! stored in the database, enabling flexible querying of slice records.
 
-use async_graphql::dynamic::{Field, FieldFuture, FieldValue, Object, Schema, Scalar, TypeRef, InputObject, InputValue, Enum, EnumItem, Subscription, SubscriptionField, SubscriptionFieldFuture};
+use async_graphql::dynamic::{
+    Enum, EnumItem, Field, FieldFuture, FieldValue, InputObject, InputValue, Object, Scalar,
+    Schema, Subscription, SubscriptionField, SubscriptionFieldFuture, TypeRef,
+};
 use async_graphql::{Error, Value as GraphQLValue};
-use base64::engine::general_purpose;
 use base64::Engine;
+use base64::engine::general_purpose;
 use serde_json;
 use std::collections::HashMap;
 use std::sync::Arc;
 use tokio::sync::Mutex;
 
 use crate::database::Database;
-use crate::graphql::types::{extract_collection_fields, extract_record_key, GraphQLField, GraphQLType};
 use crate::graphql::PUBSUB;
 use crate::graphql::dataloader::GraphQLContext;
+use crate::graphql::types::{
+    GraphQLField, GraphQLType, extract_collection_fields, extract_record_key,
+};
 
 /// Metadata about a collection for cross-referencing
 #[derive(Clone)]
 struct CollectionMeta {
     nsid: String,
-    key_type: String, // "tid", "literal:self", or "any"
-    type_name: String, // GraphQL type name for this collection
+    key_type: String,           // "tid", "literal:self", or "any"
+    type_name: String,          // GraphQL type name for this collection
     at_uri_fields: Vec<String>, // Fields with format "at-uri" for reverse joins
 }
 
 /// Builds a dynamic GraphQL schema from lexicons for a given slice
-pub async fn build_graphql_schema(
-    database: Database,
-    slice_uri: String,
-) -> Result<Schema, String> {
+pub async fn build_graphql_schema(database: Database, slice_uri: String) -> Result<Schema, String> {
     // Fetch all lexicons for this slice
     let all_lexicons = database
         .get_lexicons_by_slice(&slice_uri)
···
     if !fields.is_empty() {
         if let Some(key_type) = extract_record_key(defs) {
             // Extract at-uri field names for reverse joins
-            let at_uri_fields: Vec<String> = fields.iter()
+            let at_uri_fields: Vec<String> = fields
+                .iter()
                 .filter(|f| f.format.as_deref() == Some("at-uri"))
                 .map(|f| f.name.clone())
                 .collect();
 
             if !at_uri_fields.is_empty() {
-                tracing::debug!(
-                    "Collection {} has at-uri fields: {:?}",
-                    nsid,
-                    at_uri_fields
-                );
+                tracing::debug!("Collection {} has at-uri fields: {:?}", nsid, at_uri_fields);
             }
 
             all_collections.push(CollectionMeta {
···
     if !fields.is_empty() {
         // Create a GraphQL type for this collection
         let type_name = nsid_to_type_name(nsid);
-        let record_type = create_record_type(&type_name, &fields, database.clone(), slice_uri.clone(), &all_collections);
+        let record_type = create_record_type(
+            &type_name,
+            &fields,
+            database.clone(),
+            slice_uri.clone(),
+            &all_collections,
+        );
 
         // Create edge and connection types for this collection (Relay standard)
         let edge_type = create_edge_type(&type_name);
···
 
         for (field_name, filter_type) in system_fields {
             if !lexicon_field_names.contains(field_name) {
-                where_input = where_input.field(InputValue::new(field_name, TypeRef::named(filter_type)));
+                where_input =
+                    where_input.field(InputValue::new(field_name, TypeRef::named(filter_type)));
             }
         }
 
···
                 GraphQLType::Int => "IntFilter",
                 _ => "StringFilter", // Default to StringFilter for strings and other types
             };
-            where_input = where_input.field(InputValue::new(&field.name, TypeRef::named(filter_type)));
+            where_input =
+                where_input.field(InputValue::new(&field.name, TypeRef::named(filter_type)));
         }
 
         // Add nested and/or support
         where_input = where_input
-            .field(InputValue::new("and", TypeRef::named_list(format!("{}WhereInput", type_name))))
-            .field(InputValue::new("or", TypeRef::named_list(format!("{}WhereInput", type_name))));
+            .field(InputValue::new(
+                "and",
+                TypeRef::named_list(format!("{}WhereInput", type_name)),
+            ))
+            .field(InputValue::new(
+                "or",
+                TypeRef::named_list(format!("{}WhereInput", type_name)),
+            ));
 
         // Create GroupByField enum for this collection
         let mut group_by_enum = Enum::new(format!("{}GroupByField", type_name));
···
 
         // Create collection-specific GroupByFieldInput
         let group_by_input = InputObject::new(format!("{}GroupByFieldInput", type_name))
-            .field(InputValue::new("field", TypeRef::named_nn(format!("{}GroupByField", type_name))))
+            .field(InputValue::new(
+                "field",
+                TypeRef::named_nn(format!("{}GroupByField", type_name)),
+            ))
            .field(InputValue::new("interval", TypeRef::named("DateInterval")));
 
         // Create collection-specific SortFieldInput
         let sort_field_input = InputObject::new(format!("{}SortFieldInput", type_name))
-            .field(InputValue::new("field", TypeRef::named_nn(format!("{}GroupByField", type_name))))
-            .field(InputValue::new("direction", TypeRef::named("SortDirection")));
+            .field(InputValue::new(
+                "field",
+                TypeRef::named_nn(format!("{}GroupByField", type_name)),
+            ))
+            .field(InputValue::new(
+                "direction",
+                TypeRef::named("SortDirection"),
+            ));
 
         // Collect the types to register with schema later
         objects_to_register.push(record_type);
···
         };
 
         // Parse sortBy argument
-        let sort_by: Option<Vec<crate::models::SortField>> = match ctx.args.get("sortBy") {
+        let sort_by: Option<Vec<crate::models::SortField>> = match ctx
+            .args
+            .get("sortBy")
+        {
             Some(val) => {
                 if let Ok(list) = val.list() {
                     let mut sort_fields = Vec::new();
                     for item in list.iter() {
                         if let Ok(obj) = item.object() {
-                            let field = obj.get("field")
-                                .and_then(|v| v.enum_name().ok().map(|s| s.to_string()))
+                            let field = obj
+                                .get("field")
+                                .and_then(|v| {
+                                    v.enum_name().ok().map(|s| s.to_string())
+                                })
                                 .unwrap_or_else(|| "indexedAt".to_string());
-                            let direction = obj.get("direction")
-                                .and_then(|v| v.enum_name().ok().map(|s| s.to_string()))
+                            let direction = obj
+                                .get("direction")
+                                .and_then(|v| {
+                                    v.enum_name().ok().map(|s| s.to_string())
+                                })
                                 .unwrap_or_else(|| "desc".to_string());
-                            sort_fields.push(crate::models::SortField { field, direction });
+                            sort_fields.push(crate::models::SortField {
+                                field,
+                                direction,
+                            });
                         }
                     }
                     Some(sort_fields)
                 } else {
                     None
                 }
-            },
+            }
             None => None,
         };
 
···
         where_clause.conditions.insert(
             "collection".to_string(),
             crate::models::WhereCondition {
-                    gt: None,
-                    gte: None,
-                    lt: None,
-                    lte: None,
+                gt: None,
+                gte: None,
+                lt: None,
+                lte: None,
                 eq: Some(serde_json::Value::String(collection.clone())),
                 in_values: None,
                 contains: None,
+                fuzzy: None,
             },
         );
 
···
         }
 
         // Resolve actorHandle to did if present
-        if let Some(actor_handle_condition) = where_clause.conditions.remove("actorHandle") {
+        if let Some(actor_handle_condition) =
+            where_clause.conditions.remove("actorHandle")
+        {
             // Collect handles to resolve
             let mut handles = Vec::new();
             if let Some(eq_value) = &actor_handle_condition.eq {
···
             // Replace actorHandle condition with did condition
             let did_condition = if dids.len() == 1 {
                 crate::models::WhereCondition {
-                        gt: None,
-                        gte: None,
-                        lt: None,
-                        lte: None,
-                        eq: Some(serde_json::Value::String(dids[0].clone())),
+                    gt: None,
+                    gte: None,
+                    lt: None,
+                    lte: None,
+                    eq: Some(serde_json::Value::String(
+                        dids[0].clone(),
+                    )),
                     in_values: None,
                     contains: None,
+                    fuzzy: None,
                 }
             } else {
                 crate::models::WhereCondition {
-                        gt: None,
-                        gte: None,
-                        lt: None,
-                        lte: None,
+                    gt: None,
+                    gte: None,
+                    lt: None,
+                    lte: None,
                     eq: None,
-                    in_values: Some(dids.into_iter().map(|d| serde_json::Value::String(d)).collect()),
+                    in_values: Some(
+                        dids.into_iter()
+                            .map(|d| {
+                                serde_json::Value::String(d)
+                            })
+                            .collect(),
+                    ),
                     contains: None,
+                    fuzzy: None,
                 }
             };
-            where_clause.conditions.insert("did".to_string(), did_condition);
+            where_clause
+                .conditions
+                .insert("did".to_string(), did_condition);
         }
         // If no DIDs found, the query will return 0 results naturally
     }
···
             Some(&where_clause),
         )
         .await
-        .map_err(|e| {
-            Error::new(format!("Database query failed: {}", e))
-        })?;
+        .map_err(|e| Error::new(format!("Database query failed: {}", e)))?;
 
         // Query database for total count
         let total_count = db
             .count_slice_collections_records(&slice, Some(&where_clause))
             .await
-            .map_err(|e| {
-                Error::new(format!("Count query failed: {}", e))
-            })? as i32;
+            .map_err(|e| Error::new(format!("Count query failed: {}", e)))?
+            as i32;
 
         // Convert records to RecordContainers
         let record_containers: Vec<RecordContainer> = records
···
                 eq: Some(serde_json::Value::String(collection.clone())),
                 in_values: None,
                 contains: None,
+                fuzzy: None,
             },
         );
 
···
                 eq: Some(serde_json::Value::String(dids[0].clone())),
                 in_values: None,
                 contains: None,
+                fuzzy: None,
             }
         } else {
             crate::models::WhereCondition {
···
                 eq: None,
                 in_values: Some(dids.into_iter().map(|d| serde_json::Value::String(d)).collect()),
                 contains: None,
+                fuzzy: None,
             }
         };
         where_clause.conditions.insert("did".to_string(), did_condition);
···
     let subscription = create_subscription_type(slice_uri.clone(), &lexicons);
 
     // Build and return the schema with complexity limits
-    let mut schema_builder = Schema::build(query.type_name(), Some(mutation.type_name()), Some(subscription.type_name()))
-        .register(query)
-        .register(mutation)
-        .register(subscription)
-        .limit_depth(50) // Higher limit to support GraphiQL introspection with reverse joins
-        .limit_complexity(5000); // Prevent expensive deeply nested queries
+    let mut schema_builder = Schema::build(
+        query.type_name(),
+        Some(mutation.type_name()),
+        Some(subscription.type_name()),
+    )
+    .register(query)
+    .register(mutation)
+    .register(subscription)
+    .limit_depth(50) // Higher limit to support GraphiQL introspection with reverse joins
+    .limit_complexity(5000); // Prevent expensive deeply nested queries
 
     // Register JSON scalar type for complex fields
     let json_scalar = Scalar::new("JSON");
···
         .field(InputValue::new("eq", TypeRef::named(TypeRef::STRING)))
         .field(InputValue::new("in", TypeRef::named_list(TypeRef::STRING)))
         .field(InputValue::new("contains", TypeRef::named(TypeRef::STRING)))
+        .field(InputValue::new("fuzzy", TypeRef::named(TypeRef::STRING)))
         .field(InputValue::new("gt", TypeRef::named(TypeRef::STRING)))
         .field(InputValue::new("gte", TypeRef::named(TypeRef::STRING)))
         .field(InputValue::new("lt", TypeRef::named(TypeRef::STRING)))
···
 /// Container to hold blob data and DID for URL generation
 #[derive(Clone)]
 struct BlobContainer {
-    blob_ref: String, // CID reference
-    mime_type: String, // MIME type
-    size: i64, // Size in bytes
-    did: String, // DID for CDN URL generation
+    blob_ref: String,  // CID reference
+    mime_type: String, // MIME type
+    size: i64,         // Size in bytes
+    did: String,       // DID for CDN URL generation
 }
 
 /// Creates a GraphQL Object type for a record collection
···
 
     // Add standard AT Protocol fields only if they don't conflict with lexicon fields
     if !lexicon_field_names.contains("uri") {
-        object = object.field(Field::new("uri", TypeRef::named_nn(TypeRef::STRING), |ctx| {
-            FieldFuture::new(async move {
-                let container = ctx.parent_value.try_downcast_ref::<RecordContainer>()?;
-                Ok(Some(GraphQLValue::from(container.record.uri.clone())))
-            })
-        }));
+        object = object.field(Field::new(
+            "uri",
+            TypeRef::named_nn(TypeRef::STRING),
+            |ctx| {
+                FieldFuture::new(async move {
+                    let container = ctx.parent_value.try_downcast_ref::<RecordContainer>()?;
+                    Ok(Some(GraphQLValue::from(container.record.uri.clone())))
+                })
+            },
+        ));
     }
 
     if !lexicon_field_names.contains("cid") {
-        object = object.field(Field::new("cid", TypeRef::named_nn(TypeRef::STRING), |ctx| {
-            FieldFuture::new(async move {
-                let container = ctx.parent_value.try_downcast_ref::<RecordContainer>()?;
-                Ok(Some(GraphQLValue::from(container.record.cid.clone())))
-            })
-        }));
+        object = object.field(Field::new(
+            "cid",
+            TypeRef::named_nn(TypeRef::STRING),
+            |ctx| {
+                FieldFuture::new(async move {
+                    let container = ctx.parent_value.try_downcast_ref::<RecordContainer>()?;
+                    Ok(Some(GraphQLValue::from(container.record.cid.clone())))
+                })
+            },
+        ));
     }
 
     if !lexicon_field_names.contains("did") {
-        object = object.field(Field::new("did", TypeRef::named_nn(TypeRef::STRING), |ctx| {
-            FieldFuture::new(async move {
-                let container = ctx.parent_value.try_downcast_ref::<RecordContainer>()?;
-                Ok(Some(GraphQLValue::from(container.record.did.clone())))
-            })
-        }));
+        object = object.field(Field::new(
+            "did",
+            TypeRef::named_nn(TypeRef::STRING),
+            |ctx| {
+                FieldFuture::new(async move {
+                    let container = ctx.parent_value.try_downcast_ref::<RecordContainer>()?;
+                    Ok(Some(GraphQLValue::from(container.record.did.clone())))
+                })
+            },
+        ));
     }
 
     if !lexicon_field_names.contains("indexedAt") {
···
         "actorHandle",
         TypeRef::named(TypeRef::STRING),
         move |ctx| {
-        let db = db_for_actor.clone();
-        let slice = slice_for_actor.clone();
-        FieldFuture::new(async move {
-            let container = ctx.parent_value.try_downcast_ref::<RecordContainer>()?;
-            let did = &container.record.did;
+            let db = db_for_actor.clone();
+            let slice = slice_for_actor.clone();
+            FieldFuture::new(async move {
+                let container = ctx.parent_value.try_downcast_ref::<RecordContainer>()?;
+                let did = &container.record.did;
 
-            // Build where clause to find actor by DID
-            let mut where_clause = crate::models::WhereClause {
-                conditions: std::collections::HashMap::new(),
-                or_conditions: None,
-                and: None,
-                or: None,
-            };
-            where_clause.conditions.insert(
-                "did".to_string(),
-                crate::models::WhereCondition {
-                    gt: None,
-                    gte: None,
-                    lt: None,
-                    lte: None,
-                    eq: Some(serde_json::Value::String(did.clone())),
-                    in_values: None,
-                    contains: None,
-                },
-            );
+                // Build where clause to find actor by DID
+                let mut where_clause = crate::models::WhereClause {
+                    conditions: std::collections::HashMap::new(),
+                    or_conditions: None,
+                    and: None,
+                    or: None,
+                };
+                where_clause.conditions.insert(
+                    "did".to_string(),
+                    crate::models::WhereCondition {
+                        gt: None,
+                        gte: None,
+                        lt: None,
+                        lte: None,
+                        eq: Some(serde_json::Value::String(did.clone())),
+                        in_values: None,
+                        contains: None,
+                        fuzzy: None,
+                    },
+                );
 
-            match db.get_slice_actors(&slice, Some(1), None, Some(&where_clause)).await {
-                Ok((actors, _cursor)) => {
-                    if let Some(actor) = actors.first() {
-                        if let Some(handle) = &actor.handle {
-                            Ok(Some(GraphQLValue::from(handle.clone())))
+                match db
+                    .get_slice_actors(&slice, Some(1), None, Some(&where_clause))
+                    .await
+                {
+                    Ok((actors, _cursor)) => {
+                        if let Some(actor) = actors.first() {
+                            if let Some(handle) = &actor.handle {
+                                Ok(Some(GraphQLValue::from(handle.clone())))
                             } else {
                                 Ok(None)
                             }
-                        }
-                        Err(e) => {
-                            tracing::debug!("Actor not found for {}: {}", did, e);
+                        } else {
                             Ok(None)
                         }
                     }
-            })
-        },
-    ));
+                    Err(e) => {
+                        tracing::debug!("Actor not found for {}: {}", did, e);
+                        Ok(None)
+                    }
+                }
+            })
+        },
+    ));
 
     // Add fields from lexicon
     for field in fields {
···
                     .unwrap_or("image/jpeg")
                     .to_string();
 
-                let size = obj
-                    .get("size")
-                    .and_then(|s| s.as_i64())
-                    .unwrap_or(0);
+                let size =
+                    obj.get("size").and_then(|s| s.as_i64()).unwrap_or(0);
 
                 let blob_container = BlobContainer {
                     blob_ref,
···
                     .unwrap_or("image/jpeg")
                     .to_string();
 
-                let size = obj
-                    .get("size")
-                    .and_then(|s| s.as_i64())
-                    .unwrap_or(0);
+                let size = obj.get("size").and_then(|s| s.as_i64()).unwrap_or(0);
 
                 let blob_container = BlobContainer {
                     blob_ref,
···
                 match db.get_record(&uri).await {
                     Ok(Some(linked_record)) => {
                         // Convert the linked record to a JSON value
-                        let record_json = serde_json::to_value(linked_record)
-                            .map_err(|e| {
+                        let record_json =
+                            serde_json::to_value(linked_record).map_err(|e| {
                                 Error::new(format!("Serialization error: {}", e))
                             })?;
 
···
 
     // Collect all string fields with format "at-uri" that might reference this collection
     // We'll check each one at runtime to see if it contains a URI to this collection
-    let uri_ref_fields: Vec<_> = fields.iter()
+    let uri_ref_fields: Vec<_> = fields
+        .iter()
         .filter(|f| matches!(f.format.as_deref(), Some("at-uri")))
         .collect();
 
···
 
     // If we found at-uri fields, create a resolver that checks each one at runtime
1033
1099
if !uri_ref_fields.is_empty() {
1034
-
let ref_field_names: Vec<String> = uri_ref_fields.iter().map(|f| f.name.clone()).collect();
1100
+
let ref_field_names: Vec<String> =
1101
+
uri_ref_fields.iter().map(|f| f.name.clone()).collect();
1035
1102
let db_for_uri_join = database.clone();
1036
1103
let target_collection = collection_nsid.clone();
1037
1104
···
1055
1122
match db.get_record(uri).await {
1056
1123
Ok(Some(record)) => {
1057
1124
let new_container = RecordContainer { record };
1058
-
return Ok(Some(FieldValue::owned_any(new_container)));
1125
+
return Ok(Some(FieldValue::owned_any(
1126
+
new_container,
1127
+
)));
1059
1128
}
1060
1129
Ok(None) => continue, // Try next field
1061
-
Err(_) => continue, // Try next field
1130
+
Err(_) => continue, // Try next field
1062
1131
}
1063
1132
}
1064
1133
}
···
1083
1152
let db = db_for_join.clone();
1084
1153
let nsid = collection_nsid.clone();
1085
1154
FieldFuture::new(async move {
1086
-
let container = ctx.parent_value.try_downcast_ref::<RecordContainer>()?;
1155
+
let container =
1156
+
ctx.parent_value.try_downcast_ref::<RecordContainer>()?;
1087
1157
let uri = format!("at://{}/{}/self", container.record.did, nsid);
1088
1158
1089
1159
match db.get_record(&uri).await {
1090
1160
Ok(Some(record)) => {
1091
-
let new_container = RecordContainer {
1092
-
record,
1093
-
};
1161
+
let new_container = RecordContainer { record };
1094
1162
Ok(Some(FieldValue::owned_any(new_container)))
1095
1163
}
1096
1164
Ok(None) => Ok(None),
···
1179
1247
eq: Some(serde_json::Value::String(nsid.clone())),
1180
1248
in_values: None,
1181
1249
contains: None,
1250
+
fuzzy: None,
1182
1251
},
1183
1252
);
1184
1253
where_clause.conditions.insert(
···
1191
1260
eq: Some(serde_json::Value::String(did.clone())),
1192
1261
in_values: None,
1193
1262
contains: None,
1263
+
fuzzy: None,
1194
1264
},
1195
1265
);
1196
1266
···
1354
1424
let collection_for_count = collection.nsid.clone();
1355
1425
let at_uri_fields_for_count = collection.at_uri_fields.clone();
1356
1426
1357
-
object = object.field(
1358
-
Field::new(
1359
-
&count_field_name,
1360
-
TypeRef::named_nn(TypeRef::INT),
1361
-
move |ctx| {
1362
-
let slice = slice_for_count.clone();
1363
-
let nsid = collection_for_count.clone();
1364
-
let db = db_for_count.clone();
1365
-
let ref_fields = at_uri_fields_for_count.clone();
1366
-
FieldFuture::new(async move {
1367
-
let container = ctx.parent_value.try_downcast_ref::<RecordContainer>()?;
1368
-
let parent_uri = &container.record.uri;
1427
+
object = object.field(Field::new(
1428
+
&count_field_name,
1429
+
TypeRef::named_nn(TypeRef::INT),
1430
+
move |ctx| {
1431
+
let slice = slice_for_count.clone();
1432
+
let nsid = collection_for_count.clone();
1433
+
let db = db_for_count.clone();
1434
+
let ref_fields = at_uri_fields_for_count.clone();
1435
+
FieldFuture::new(async move {
1436
+
let container = ctx.parent_value.try_downcast_ref::<RecordContainer>()?;
1437
+
let parent_uri = &container.record.uri;
1369
1438
1370
-
// Build where clause to count records referencing this URI
1371
-
for ref_field in &ref_fields {
1372
-
let mut where_clause = crate::models::WhereClause {
1373
-
conditions: HashMap::new(),
1374
-
or_conditions: None,
1375
-
and: None,
1376
-
or: None,
1377
-
};
1439
+
// Build where clause to count records referencing this URI
1440
+
for ref_field in &ref_fields {
1441
+
let mut where_clause = crate::models::WhereClause {
1442
+
conditions: HashMap::new(),
1443
+
or_conditions: None,
1444
+
and: None,
1445
+
or: None,
1446
+
};
1378
1447
1379
-
where_clause.conditions.insert(
1380
-
"collection".to_string(),
1381
-
crate::models::WhereCondition {
1382
-
gt: None,
1383
-
gte: None,
1384
-
lt: None,
1385
-
lte: None,
1386
-
eq: Some(serde_json::Value::String(nsid.clone())),
1387
-
in_values: None,
1388
-
contains: None,
1389
-
},
1390
-
);
1448
+
where_clause.conditions.insert(
1449
+
"collection".to_string(),
1450
+
crate::models::WhereCondition {
1451
+
gt: None,
1452
+
gte: None,
1453
+
lt: None,
1454
+
lte: None,
1455
+
eq: Some(serde_json::Value::String(nsid.clone())),
1456
+
in_values: None,
1457
+
contains: None,
1458
+
fuzzy: None,
1459
+
},
1460
+
);
1391
1461
1392
-
where_clause.conditions.insert(
1393
-
ref_field.clone(),
1394
-
crate::models::WhereCondition {
1395
-
gt: None,
1396
-
gte: None,
1397
-
lt: None,
1398
-
lte: None,
1399
-
eq: Some(serde_json::Value::String(parent_uri.clone())),
1400
-
in_values: None,
1401
-
contains: None,
1402
-
},
1403
-
);
1462
+
where_clause.conditions.insert(
1463
+
ref_field.clone(),
1464
+
crate::models::WhereCondition {
1465
+
gt: None,
1466
+
gte: None,
1467
+
lt: None,
1468
+
lte: None,
1469
+
eq: Some(serde_json::Value::String(parent_uri.clone())),
1470
+
in_values: None,
1471
+
contains: None,
1472
+
fuzzy: None,
1473
+
},
1474
+
);
1404
1475
1405
-
match db.count_slice_collections_records(&slice, Some(&where_clause)).await {
1406
-
Ok(count) if count > 0 => {
1407
-
return Ok(Some(FieldValue::value(count as i32)));
1408
-
}
1409
-
Ok(_) => continue,
1410
-
Err(e) => {
1411
-
tracing::debug!("Count error for {}: {}", nsid, e);
1412
-
continue;
1413
-
}
1476
+
match db
1477
+
.count_slice_collections_records(&slice, Some(&where_clause))
1478
+
.await
1479
+
{
1480
+
Ok(count) if count > 0 => {
1481
+
return Ok(Some(FieldValue::value(count as i32)));
1482
+
}
1483
+
Ok(_) => continue,
1484
+
Err(e) => {
1485
+
tracing::debug!("Count error for {}: {}", nsid, e);
1486
+
continue;
1414
1487
}
1415
1488
}
1489
+
}
1416
1490
1417
-
// No matching field found, return 0
1418
-
Ok(Some(FieldValue::value(0)))
1419
-
})
1420
-
},
1421
-
)
1422
-
);
1491
+
// No matching field found, return 0
1492
+
Ok(Some(FieldValue::value(0)))
1493
+
})
1494
+
},
1495
+
));
1423
1496
}
1424
1497
1425
1498
object
···
1504
1577
// For arrays of primitives, use typed arrays
1505
1578
// For arrays of complex types, use JSON scalar
1506
1579
match inner.as_ref() {
1507
-
GraphQLType::String | GraphQLType::Int | GraphQLType::Boolean | GraphQLType::Float => {
1580
+
GraphQLType::String
1581
+
| GraphQLType::Int
1582
+
| GraphQLType::Boolean
1583
+
| GraphQLType::Float => {
1508
1584
let inner_ref = match inner.as_ref() {
1509
1585
GraphQLType::String => TypeRef::STRING,
1510
1586
GraphQLType::Int => TypeRef::INT,
···
1545
1621
let mut blob = Object::new("Blob");
1546
1622
1547
1623
// ref field - CID reference
1548
-
blob = blob.field(Field::new("ref", TypeRef::named_nn(TypeRef::STRING), |ctx| {
1549
-
FieldFuture::new(async move {
1550
-
let container = ctx.parent_value.try_downcast_ref::<BlobContainer>()?;
1551
-
Ok(Some(GraphQLValue::from(container.blob_ref.clone())))
1552
-
})
1553
-
}));
1624
+
blob = blob.field(Field::new(
1625
+
"ref",
1626
+
TypeRef::named_nn(TypeRef::STRING),
1627
+
|ctx| {
1628
+
FieldFuture::new(async move {
1629
+
let container = ctx.parent_value.try_downcast_ref::<BlobContainer>()?;
1630
+
Ok(Some(GraphQLValue::from(container.blob_ref.clone())))
1631
+
})
1632
+
},
1633
+
));
1554
1634
1555
1635
// mimeType field
1556
-
blob = blob.field(Field::new("mimeType", TypeRef::named_nn(TypeRef::STRING), |ctx| {
1557
-
FieldFuture::new(async move {
1558
-
let container = ctx.parent_value.try_downcast_ref::<BlobContainer>()?;
1559
-
Ok(Some(GraphQLValue::from(container.mime_type.clone())))
1560
-
})
1561
-
}));
1636
+
blob = blob.field(Field::new(
1637
+
"mimeType",
1638
+
TypeRef::named_nn(TypeRef::STRING),
1639
+
|ctx| {
1640
+
FieldFuture::new(async move {
1641
+
let container = ctx.parent_value.try_downcast_ref::<BlobContainer>()?;
1642
+
Ok(Some(GraphQLValue::from(container.mime_type.clone())))
1643
+
})
1644
+
},
1645
+
));
1562
1646
1563
1647
// size field
1564
1648
blob = blob.field(Field::new("size", TypeRef::named_nn(TypeRef::INT), |ctx| {
···
1607
1691
fn create_sync_result_type() -> Object {
1608
1692
let mut sync_result = Object::new("SyncResult");
1609
1693
1610
-
sync_result = sync_result.field(Field::new("success", TypeRef::named_nn(TypeRef::BOOLEAN), |ctx| {
1611
-
FieldFuture::new(async move {
1612
-
let value = ctx.parent_value.downcast_ref::<GraphQLValue>()
1613
-
.ok_or_else(|| Error::new("Failed to downcast sync result"))?;
1614
-
if let GraphQLValue::Object(obj) = value {
1615
-
if let Some(success) = obj.get("success") {
1616
-
return Ok(Some(success.clone()));
1694
+
sync_result = sync_result.field(Field::new(
1695
+
"success",
1696
+
TypeRef::named_nn(TypeRef::BOOLEAN),
1697
+
|ctx| {
1698
+
FieldFuture::new(async move {
1699
+
let value = ctx
1700
+
.parent_value
1701
+
.downcast_ref::<GraphQLValue>()
1702
+
.ok_or_else(|| Error::new("Failed to downcast sync result"))?;
1703
+
if let GraphQLValue::Object(obj) = value {
1704
+
if let Some(success) = obj.get("success") {
1705
+
return Ok(Some(success.clone()));
1706
+
}
1617
1707
}
1618
-
}
1619
-
Ok(None)
1620
-
})
1621
-
}));
1708
+
Ok(None)
1709
+
})
1710
+
},
1711
+
));
1622
1712
1623
-
sync_result = sync_result.field(Field::new("reposProcessed", TypeRef::named_nn(TypeRef::INT), |ctx| {
1624
-
FieldFuture::new(async move {
1625
-
let value = ctx.parent_value.downcast_ref::<GraphQLValue>()
1626
-
.ok_or_else(|| Error::new("Failed to downcast sync result"))?;
1627
-
if let GraphQLValue::Object(obj) = value {
1628
-
if let Some(repos) = obj.get("reposProcessed") {
1629
-
return Ok(Some(repos.clone()));
1713
+
sync_result = sync_result.field(Field::new(
1714
+
"reposProcessed",
1715
+
TypeRef::named_nn(TypeRef::INT),
1716
+
|ctx| {
1717
+
FieldFuture::new(async move {
1718
+
let value = ctx
1719
+
.parent_value
1720
+
.downcast_ref::<GraphQLValue>()
1721
+
.ok_or_else(|| Error::new("Failed to downcast sync result"))?;
1722
+
if let GraphQLValue::Object(obj) = value {
1723
+
if let Some(repos) = obj.get("reposProcessed") {
1724
+
return Ok(Some(repos.clone()));
1725
+
}
1630
1726
}
1631
-
}
1632
-
Ok(None)
1633
-
})
1634
-
}));
1727
+
Ok(None)
1728
+
})
1729
+
},
1730
+
));
1635
1731
1636
-
sync_result = sync_result.field(Field::new("recordsSynced", TypeRef::named_nn(TypeRef::INT), |ctx| {
1637
-
FieldFuture::new(async move {
1638
-
let value = ctx.parent_value.downcast_ref::<GraphQLValue>()
1639
-
.ok_or_else(|| Error::new("Failed to downcast sync result"))?;
1640
-
if let GraphQLValue::Object(obj) = value {
1641
-
if let Some(records) = obj.get("recordsSynced") {
1642
-
return Ok(Some(records.clone()));
1732
+
sync_result = sync_result.field(Field::new(
1733
+
"recordsSynced",
1734
+
TypeRef::named_nn(TypeRef::INT),
1735
+
|ctx| {
1736
+
FieldFuture::new(async move {
1737
+
let value = ctx
1738
+
.parent_value
1739
+
.downcast_ref::<GraphQLValue>()
1740
+
.ok_or_else(|| Error::new("Failed to downcast sync result"))?;
1741
+
if let GraphQLValue::Object(obj) = value {
1742
+
if let Some(records) = obj.get("recordsSynced") {
1743
+
return Ok(Some(records.clone()));
1744
+
}
1643
1745
}
1644
-
}
1645
-
Ok(None)
1646
-
})
1647
-
}));
1746
+
Ok(None)
1747
+
})
1748
+
},
1749
+
));
1648
1750
1649
-
sync_result = sync_result.field(Field::new("timedOut", TypeRef::named_nn(TypeRef::BOOLEAN), |ctx| {
1650
-
FieldFuture::new(async move {
1651
-
let value = ctx.parent_value.downcast_ref::<GraphQLValue>()
1652
-
.ok_or_else(|| Error::new("Failed to downcast sync result"))?;
1653
-
if let GraphQLValue::Object(obj) = value {
1654
-
if let Some(timed_out) = obj.get("timedOut") {
1655
-
return Ok(Some(timed_out.clone()));
1751
+
sync_result = sync_result.field(Field::new(
1752
+
"timedOut",
1753
+
TypeRef::named_nn(TypeRef::BOOLEAN),
1754
+
|ctx| {
1755
+
FieldFuture::new(async move {
1756
+
let value = ctx
1757
+
.parent_value
1758
+
.downcast_ref::<GraphQLValue>()
1759
+
.ok_or_else(|| Error::new("Failed to downcast sync result"))?;
1760
+
if let GraphQLValue::Object(obj) = value {
1761
+
if let Some(timed_out) = obj.get("timedOut") {
1762
+
return Ok(Some(timed_out.clone()));
1763
+
}
1656
1764
}
1657
-
}
1658
-
Ok(None)
1659
-
})
1660
-
}));
1765
+
Ok(None)
1766
+
})
1767
+
},
1768
+
));
1661
1769
1662
-
sync_result = sync_result.field(Field::new("message", TypeRef::named_nn(TypeRef::STRING), |ctx| {
1663
-
FieldFuture::new(async move {
1664
-
let value = ctx.parent_value.downcast_ref::<GraphQLValue>()
1665
-
.ok_or_else(|| Error::new("Failed to downcast sync result"))?;
1666
-
if let GraphQLValue::Object(obj) = value {
1667
-
if let Some(message) = obj.get("message") {
1668
-
return Ok(Some(message.clone()));
1770
+
sync_result = sync_result.field(Field::new(
1771
+
"message",
1772
+
TypeRef::named_nn(TypeRef::STRING),
1773
+
|ctx| {
1774
+
FieldFuture::new(async move {
1775
+
let value = ctx
1776
+
.parent_value
1777
+
.downcast_ref::<GraphQLValue>()
1778
+
.ok_or_else(|| Error::new("Failed to downcast sync result"))?;
1779
+
if let GraphQLValue::Object(obj) = value {
1780
+
if let Some(message) = obj.get("message") {
1781
+
return Ok(Some(message.clone()));
1782
+
}
1669
1783
}
1670
-
}
1671
-
Ok(None)
1672
-
})
1673
-
}));
1784
+
Ok(None)
1785
+
})
1786
+
},
1787
+
));
1674
1788
1675
1789
sync_result
1676
1790
}
···
1698
1812
.field(InputValue::new("eq", TypeRef::named(TypeRef::STRING)))
1699
1813
.field(InputValue::new("in", TypeRef::named_list(TypeRef::STRING)))
1700
1814
.field(InputValue::new("contains", TypeRef::named(TypeRef::STRING)))
1815
+
.field(InputValue::new("fuzzy", TypeRef::named(TypeRef::STRING)))
1701
1816
}
1702
1817
1703
1818
/// Creates the IntCondition input type for int field filtering
···
1711
1826
fn create_page_info_type() -> Object {
1712
1827
let mut page_info = Object::new("PageInfo");
1713
1828
1714
-
page_info = page_info.field(Field::new("hasNextPage", TypeRef::named_nn(TypeRef::BOOLEAN), |ctx| {
1715
-
FieldFuture::new(async move {
1716
-
let value = ctx.parent_value.downcast_ref::<GraphQLValue>()
1717
-
.ok_or_else(|| Error::new("Failed to downcast PageInfo"))?;
1718
-
if let GraphQLValue::Object(obj) = value {
1719
-
if let Some(has_next) = obj.get("hasNextPage") {
1720
-
return Ok(Some(has_next.clone()));
1829
+
page_info = page_info.field(Field::new(
1830
+
"hasNextPage",
1831
+
TypeRef::named_nn(TypeRef::BOOLEAN),
1832
+
|ctx| {
1833
+
FieldFuture::new(async move {
1834
+
let value = ctx
1835
+
.parent_value
1836
+
.downcast_ref::<GraphQLValue>()
1837
+
.ok_or_else(|| Error::new("Failed to downcast PageInfo"))?;
1838
+
if let GraphQLValue::Object(obj) = value {
1839
+
if let Some(has_next) = obj.get("hasNextPage") {
1840
+
return Ok(Some(has_next.clone()));
1841
+
}
1721
1842
}
1722
-
}
1723
-
Ok(Some(GraphQLValue::from(false)))
1724
-
})
1725
-
}));
1843
+
Ok(Some(GraphQLValue::from(false)))
1844
+
})
1845
+
},
1846
+
));
1726
1847
1727
-
page_info = page_info.field(Field::new("hasPreviousPage", TypeRef::named_nn(TypeRef::BOOLEAN), |ctx| {
1728
-
FieldFuture::new(async move {
1729
-
let value = ctx.parent_value.downcast_ref::<GraphQLValue>()
1730
-
.ok_or_else(|| Error::new("Failed to downcast PageInfo"))?;
1731
-
if let GraphQLValue::Object(obj) = value {
1732
-
if let Some(has_prev) = obj.get("hasPreviousPage") {
1733
-
return Ok(Some(has_prev.clone()));
1848
+
page_info = page_info.field(Field::new(
1849
+
"hasPreviousPage",
1850
+
TypeRef::named_nn(TypeRef::BOOLEAN),
1851
+
|ctx| {
1852
+
FieldFuture::new(async move {
1853
+
let value = ctx
1854
+
.parent_value
1855
+
.downcast_ref::<GraphQLValue>()
1856
+
.ok_or_else(|| Error::new("Failed to downcast PageInfo"))?;
1857
+
if let GraphQLValue::Object(obj) = value {
1858
+
if let Some(has_prev) = obj.get("hasPreviousPage") {
1859
+
return Ok(Some(has_prev.clone()));
1860
+
}
1734
1861
}
1735
-
}
1736
-
Ok(Some(GraphQLValue::from(false)))
1737
-
})
1738
-
}));
1862
+
Ok(Some(GraphQLValue::from(false)))
1863
+
})
1864
+
},
1865
+
));
1739
1866
1740
-
page_info = page_info.field(Field::new("startCursor", TypeRef::named(TypeRef::STRING), |ctx| {
1741
-
FieldFuture::new(async move {
1742
-
let value = ctx.parent_value.downcast_ref::<GraphQLValue>()
1743
-
.ok_or_else(|| Error::new("Failed to downcast PageInfo"))?;
1744
-
if let GraphQLValue::Object(obj) = value {
1745
-
if let Some(cursor) = obj.get("startCursor") {
1746
-
return Ok(Some(cursor.clone()));
1867
+
page_info = page_info.field(Field::new(
1868
+
"startCursor",
1869
+
TypeRef::named(TypeRef::STRING),
1870
+
|ctx| {
1871
+
FieldFuture::new(async move {
1872
+
let value = ctx
1873
+
.parent_value
1874
+
.downcast_ref::<GraphQLValue>()
1875
+
.ok_or_else(|| Error::new("Failed to downcast PageInfo"))?;
1876
+
if let GraphQLValue::Object(obj) = value {
1877
+
if let Some(cursor) = obj.get("startCursor") {
1878
+
return Ok(Some(cursor.clone()));
1879
+
}
1747
1880
}
1748
-
}
1749
-
Ok(None)
1750
-
})
1751
-
}));
1881
+
Ok(None)
1882
+
})
1883
+
},
1884
+
));
1752
1885
1753
-
page_info = page_info.field(Field::new("endCursor", TypeRef::named(TypeRef::STRING), |ctx| {
1754
-
FieldFuture::new(async move {
1755
-
let value = ctx.parent_value.downcast_ref::<GraphQLValue>()
1756
-
.ok_or_else(|| Error::new("Failed to downcast PageInfo"))?;
1757
-
if let GraphQLValue::Object(obj) = value {
1758
-
if let Some(cursor) = obj.get("endCursor") {
1759
-
return Ok(Some(cursor.clone()));
1886
+
page_info = page_info.field(Field::new(
1887
+
"endCursor",
1888
+
TypeRef::named(TypeRef::STRING),
1889
+
|ctx| {
1890
+
FieldFuture::new(async move {
1891
+
let value = ctx
1892
+
.parent_value
1893
+
.downcast_ref::<GraphQLValue>()
1894
+
.ok_or_else(|| Error::new("Failed to downcast PageInfo"))?;
1895
+
if let GraphQLValue::Object(obj) = value {
1896
+
if let Some(cursor) = obj.get("endCursor") {
1897
+
return Ok(Some(cursor.clone()));
1898
+
}
1760
1899
}
1761
-
}
1762
-
Ok(None)
1763
-
})
1764
-
}));
1900
+
Ok(None)
1901
+
})
1902
+
},
1903
+
));
1765
1904
1766
1905
page_info
1767
1906
}
···
1798
1937
}));
1799
1938
1800
1939
// Add cursor field
1801
-
edge = edge.field(Field::new("cursor", TypeRef::named_nn(TypeRef::STRING), |ctx| {
1802
-
FieldFuture::new(async move {
1803
-
let edge_data = ctx.parent_value.try_downcast_ref::<EdgeData>()?;
1804
-
Ok(Some(GraphQLValue::from(edge_data.cursor.clone())))
1805
-
})
1806
-
}));
1940
+
edge = edge.field(Field::new(
1941
+
"cursor",
1942
+
TypeRef::named_nn(TypeRef::STRING),
1943
+
|ctx| {
1944
+
FieldFuture::new(async move {
1945
+
let edge_data = ctx.parent_value.try_downcast_ref::<EdgeData>()?;
1946
+
Ok(Some(GraphQLValue::from(edge_data.cursor.clone())))
1947
+
})
1948
+
},
1949
+
));
1807
1950
1808
1951
edge
1809
1952
}
···
1815
1958
let mut connection = Object::new(&connection_name);
1816
1959
1817
1960
// Add totalCount field
1818
-
connection = connection.field(Field::new("totalCount", TypeRef::named_nn(TypeRef::INT), |ctx| {
1819
-
FieldFuture::new(async move {
1820
-
let data = ctx.parent_value.try_downcast_ref::<ConnectionData>()?;
1821
-
Ok(Some(GraphQLValue::from(data.total_count)))
1822
-
})
1823
-
}));
1961
+
connection = connection.field(Field::new(
1962
+
"totalCount",
1963
+
TypeRef::named_nn(TypeRef::INT),
1964
+
|ctx| {
1965
+
FieldFuture::new(async move {
1966
+
let data = ctx.parent_value.try_downcast_ref::<ConnectionData>()?;
1967
+
Ok(Some(GraphQLValue::from(data.total_count)))
1968
+
})
1969
+
},
1970
+
));
1824
1971
1825
1972
// Add pageInfo field
1826
-
connection = connection.field(Field::new("pageInfo", TypeRef::named_nn("PageInfo"), |ctx| {
1827
-
FieldFuture::new(async move {
1828
-
let data = ctx.parent_value.try_downcast_ref::<ConnectionData>()?;
1973
+
connection = connection.field(Field::new(
1974
+
"pageInfo",
1975
+
TypeRef::named_nn("PageInfo"),
1976
+
|ctx| {
1977
+
FieldFuture::new(async move {
1978
+
let data = ctx.parent_value.try_downcast_ref::<ConnectionData>()?;
1829
1979
1830
-
let mut page_info = async_graphql::indexmap::IndexMap::new();
1831
-
page_info.insert(
1832
-
async_graphql::Name::new("hasNextPage"),
1833
-
GraphQLValue::from(data.has_next_page)
1834
-
);
1835
-
// For forward pagination only, hasPreviousPage is always false
1836
-
page_info.insert(
1837
-
async_graphql::Name::new("hasPreviousPage"),
1838
-
GraphQLValue::from(false)
1839
-
);
1980
+
let mut page_info = async_graphql::indexmap::IndexMap::new();
1981
+
page_info.insert(
1982
+
async_graphql::Name::new("hasNextPage"),
1983
+
GraphQLValue::from(data.has_next_page),
1984
+
);
1985
+
// For forward pagination only, hasPreviousPage is always false
1986
+
page_info.insert(
1987
+
async_graphql::Name::new("hasPreviousPage"),
1988
+
GraphQLValue::from(false),
1989
+
);
1840
1990
1841
-
// Add startCursor (first node's cid if available)
1842
-
if !data.nodes.is_empty() {
1843
-
if let Some(first_record) = data.nodes.first() {
1844
-
let start_cursor = general_purpose::URL_SAFE_NO_PAD.encode(first_record.record.cid.clone());
1991
+
// Add startCursor (first node's cid if available)
1992
+
if !data.nodes.is_empty() {
1993
+
if let Some(first_record) = data.nodes.first() {
1994
+
let start_cursor = general_purpose::URL_SAFE_NO_PAD
1995
+
.encode(first_record.record.cid.clone());
1996
+
page_info.insert(
1997
+
async_graphql::Name::new("startCursor"),
1998
+
GraphQLValue::from(start_cursor),
1999
+
);
2000
+
}
2001
+
}
2002
+
2003
+
// Add endCursor
2004
+
if let Some(ref cursor) = data.end_cursor {
1845
2005
page_info.insert(
1846
-
async_graphql::Name::new("startCursor"),
1847
-
GraphQLValue::from(start_cursor)
2006
+
async_graphql::Name::new("endCursor"),
2007
+
GraphQLValue::from(cursor.clone()),
1848
2008
);
1849
2009
}
1850
-
}
1851
2010
1852
-
// Add endCursor
1853
-
if let Some(ref cursor) = data.end_cursor {
1854
-
page_info.insert(
1855
-
async_graphql::Name::new("endCursor"),
1856
-
GraphQLValue::from(cursor.clone())
1857
-
);
1858
-
}
1859
-
1860
-
Ok(Some(FieldValue::owned_any(GraphQLValue::Object(page_info))))
1861
-
})
1862
-
}));
2011
+
Ok(Some(FieldValue::owned_any(GraphQLValue::Object(page_info))))
2012
+
})
2013
+
},
2014
+
));
1863
2015
1864
2016
// Add edges field (Relay standard)
1865
2017
let edge_type = format!("{}Edge", record_type_name);
1866
-
connection = connection.field(Field::new("edges", TypeRef::named_nn_list_nn(&edge_type), |ctx| {
1867
-
FieldFuture::new(async move {
1868
-
let data = ctx.parent_value.try_downcast_ref::<ConnectionData>()?;
2018
+
connection = connection.field(Field::new(
2019
+
"edges",
2020
+
TypeRef::named_nn_list_nn(&edge_type),
2021
+
|ctx| {
2022
+
FieldFuture::new(async move {
2023
+
let data = ctx.parent_value.try_downcast_ref::<ConnectionData>()?;
1869
2024
1870
-
let field_values: Vec<FieldValue<'_>> = data.nodes.iter()
1871
-
.map(|node| {
1872
-
// Use base64-encoded CID as cursor
1873
-
let cursor = general_purpose::URL_SAFE_NO_PAD.encode(node.record.cid.clone());
1874
-
let edge = EdgeData {
1875
-
node: node.clone(),
1876
-
cursor,
1877
-
};
1878
-
FieldValue::owned_any(edge)
1879
-
})
1880
-
.collect();
2025
+
let field_values: Vec<FieldValue<'_>> = data
2026
+
.nodes
2027
+
.iter()
2028
+
.map(|node| {
2029
+
// Use base64-encoded CID as cursor
2030
+
let cursor =
2031
+
general_purpose::URL_SAFE_NO_PAD.encode(node.record.cid.clone());
2032
+
let edge = EdgeData {
2033
+
node: node.clone(),
2034
+
cursor,
2035
+
};
2036
+
FieldValue::owned_any(edge)
2037
+
})
2038
+
.collect();
1881
2039
1882
-
Ok(Some(FieldValue::list(field_values)))
1883
-
})
1884
-
}));
2040
+
Ok(Some(FieldValue::list(field_values)))
2041
+
})
2042
+
},
2043
+
));
1885
2044
1886
2045
// Add nodes field (convenience, direct access to records without edges wrapper)
1887
-
connection = connection.field(Field::new("nodes", TypeRef::named_nn_list_nn(record_type_name), |ctx| {
1888
-
FieldFuture::new(async move {
1889
-
let data = ctx.parent_value.try_downcast_ref::<ConnectionData>()?;
2046
+
connection = connection.field(Field::new(
2047
+
"nodes",
2048
+
TypeRef::named_nn_list_nn(record_type_name),
2049
+
|ctx| {
2050
+
FieldFuture::new(async move {
2051
+
let data = ctx.parent_value.try_downcast_ref::<ConnectionData>()?;
1890
2052
1891
-
let field_values: Vec<FieldValue<'_>> = data.nodes.iter()
1892
-
.map(|node| FieldValue::owned_any(node.clone()))
1893
-
.collect();
2053
+
let field_values: Vec<FieldValue<'_>> = data
2054
+
.nodes
2055
+
.iter()
2056
+
.map(|node| FieldValue::owned_any(node.clone()))
2057
+
.collect();
1894
2058
1895
-
Ok(Some(FieldValue::list(field_values)))
1896
-
})
1897
-
}));
2059
+
Ok(Some(FieldValue::list(field_values)))
2060
+
})
2061
+
},
2062
+
));
1898
2063
1899
2064
connection
1900
2065
}
···
1916
2081
let slice = slice_clone.clone();
1917
2082
1918
2083
FieldFuture::new(async move {
1919
-
let did = ctx.args.get("did")
2084
+
let did = ctx
2085
+
.args
2086
+
.get("did")
1920
2087
.and_then(|v| v.string().ok())
1921
2088
.ok_or_else(|| Error::new("did argument is required"))?;
1922
2089
1923
2090
// Create sync service and call sync_user_collections
1924
2091
let cache_backend = crate::cache::CacheFactory::create_cache(
1925
-
crate::cache::CacheBackend::InMemory { ttl_seconds: None }
1926
-
).await.map_err(|e| Error::new(format!("Failed to create cache: {}", e)))?;
2092
+
crate::cache::CacheBackend::InMemory { ttl_seconds: None },
2093
+
)
2094
+
.await
2095
+
.map_err(|e| Error::new(format!("Failed to create cache: {}", e)))?;
1927
2096
let cache = Arc::new(Mutex::new(crate::cache::SliceCache::new(cache_backend)));
1928
2097
let sync_service = crate::sync::SyncService::with_cache(
1929
2098
db.clone(),
···
1939
2108
1940
2109
// Convert result to GraphQL object
1941
2110
let mut obj = async_graphql::indexmap::IndexMap::new();
1942
-
obj.insert(async_graphql::Name::new("success"), GraphQLValue::from(result.success));
1943
-
obj.insert(async_graphql::Name::new("reposProcessed"), GraphQLValue::from(result.repos_processed));
1944
-
obj.insert(async_graphql::Name::new("recordsSynced"), GraphQLValue::from(result.records_synced));
1945
-
obj.insert(async_graphql::Name::new("timedOut"), GraphQLValue::from(result.timed_out));
1946
-
obj.insert(async_graphql::Name::new("message"), GraphQLValue::from(result.message));
2111
+
obj.insert(
2112
+
async_graphql::Name::new("success"),
2113
+
GraphQLValue::from(result.success),
2114
+
);
2115
+
obj.insert(
2116
+
async_graphql::Name::new("reposProcessed"),
2117
+
GraphQLValue::from(result.repos_processed),
2118
+
);
2119
+
obj.insert(
2120
+
async_graphql::Name::new("recordsSynced"),
2121
+
GraphQLValue::from(result.records_synced),
2122
+
);
2123
+
obj.insert(
2124
+
async_graphql::Name::new("timedOut"),
2125
+
GraphQLValue::from(result.timed_out),
2126
+
);
2127
+
obj.insert(
2128
+
async_graphql::Name::new("message"),
2129
+
GraphQLValue::from(result.message),
2130
+
);
1947
2131
1948
2132
Ok(Some(FieldValue::owned_any(GraphQLValue::Object(obj))))
1949
2133
})
···
1953
2137
"did",
1954
2138
TypeRef::named_nn(TypeRef::STRING),
1955
2139
))
1956
-
.description("Sync user collections for a given DID")
2140
+
.description("Sync user collections for a given DID"),
1957
2141
);
1958
2142
1959
2143
mutation
···
1982
2166
let camel_case = nsid_to_join_field_name(nsid);
1983
2167
1984
2168
// Then pluralize the end
1985
-
if camel_case.ends_with("s") || camel_case.ends_with("x") || camel_case.ends_with("ch") || camel_case.ends_with("sh") {
2169
+
if camel_case.ends_with("s")
2170
+
|| camel_case.ends_with("x")
2171
+
|| camel_case.ends_with("ch")
2172
+
|| camel_case.ends_with("sh")
2173
+
{
1986
2174
format!("{}es", camel_case) // status -> statuses, box -> boxes
1987
2175
} else if camel_case.ends_with("y") && camel_case.len() > 1 {
1988
2176
let chars: Vec<char> = camel_case.chars().collect();
···
2027
2215
for field in fields {
2028
2216
let field_name = field.name.clone();
2029
2217
let field_name_clone = field_name.clone();
2030
-
aggregated = aggregated.field(Field::new(&field_name, TypeRef::named("JSON"), move |ctx| {
2031
-
let field_name = field_name_clone.clone();
2218
+
aggregated = aggregated.field(Field::new(
2219
+
&field_name,
2220
+
TypeRef::named("JSON"),
2221
+
move |ctx| {
2222
+
let field_name = field_name_clone.clone();
2223
+
FieldFuture::new(async move {
2224
+
let json_value = ctx.parent_value.try_downcast_ref::<serde_json::Value>()?;
2225
+
if let Some(obj) = json_value.as_object() {
2226
+
if let Some(value) = obj.get(&field_name) {
2227
+
// Convert serde_json::Value to async_graphql::Value
2228
+
let graphql_value = serde_json_to_graphql_value(value);
2229
+
return Ok(Some(graphql_value));
2230
+
}
2231
+
}
2232
+
Ok(None)
2233
+
})
2234
+
},
2235
+
));
2236
+
}
2237
+
2238
+
// Add count field
2239
+
aggregated = aggregated.field(Field::new(
2240
+
"count",
2241
+
TypeRef::named_nn(TypeRef::INT),
2242
+
|ctx| {
2032
2243
FieldFuture::new(async move {
2033
2244
let json_value = ctx.parent_value.try_downcast_ref::<serde_json::Value>()?;
2034
2245
if let Some(obj) = json_value.as_object() {
2035
-
if let Some(value) = obj.get(&field_name) {
2036
-
// Convert serde_json::Value to async_graphql::Value
2037
-
let graphql_value = serde_json_to_graphql_value(value);
2038
-
return Ok(Some(graphql_value));
2246
+
if let Some(count) = obj.get("count") {
2247
+
if let Some(count_i64) = count.as_i64() {
2248
+
return Ok(Some(GraphQLValue::from(count_i64 as i32)));
2249
+
}
2039
2250
}
2040
2251
}
2041
-
Ok(None)
2252
+
Ok(Some(GraphQLValue::from(0)))
2042
2253
})
2043
-
}));
2044
-
}
2045
-
2046
-
// Add count field
2047
-
aggregated = aggregated.field(Field::new("count", TypeRef::named_nn(TypeRef::INT), |ctx| {
2048
-
FieldFuture::new(async move {
2049
-
let json_value = ctx.parent_value.try_downcast_ref::<serde_json::Value>()?;
2050
-
if let Some(obj) = json_value.as_object() {
2051
-
if let Some(count) = obj.get("count") {
2052
-
if let Some(count_i64) = count.as_i64() {
2053
-
return Ok(Some(GraphQLValue::from(count_i64 as i32)));
2054
-
}
2055
-
}
2056
-
}
2057
-
Ok(Some(GraphQLValue::from(0)))
2058
-
})
2059
-
}));
2254
+
},
2255
+
));
2060
2256
2061
2257
aggregated
2062
2258
}
···
2113
2309
fn create_record_update_type() -> Object {
2114
2310
let mut record_update = Object::new("RecordUpdate");
2115
2311
2116
-
record_update = record_update.field(Field::new("uri", TypeRef::named_nn(TypeRef::STRING), |ctx| {
2117
-
FieldFuture::new(async move {
2118
-
let value = ctx.parent_value.downcast_ref::<GraphQLValue>()
2119
-
.ok_or_else(|| Error::new("Failed to downcast RecordUpdate"))?;
2120
-
if let GraphQLValue::Object(obj) = value {
2121
-
if let Some(uri) = obj.get("uri") {
2122
-
return Ok(Some(uri.clone()));
2312
+
record_update = record_update.field(Field::new(
2313
+
"uri",
2314
+
TypeRef::named_nn(TypeRef::STRING),
2315
+
|ctx| {
2316
+
FieldFuture::new(async move {
2317
+
let value = ctx
2318
+
.parent_value
2319
+
.downcast_ref::<GraphQLValue>()
2320
+
.ok_or_else(|| Error::new("Failed to downcast RecordUpdate"))?;
2321
+
if let GraphQLValue::Object(obj) = value {
2322
+
if let Some(uri) = obj.get("uri") {
2323
+
return Ok(Some(uri.clone()));
2324
+
}
2123
2325
}
2124
-
}
2125
-
Ok(None)
2126
-
})
2127
-
}));
2326
+
Ok(None)
2327
+
})
2328
+
},
2329
+
));
2128
2330
2129
-
record_update = record_update.field(Field::new("cid", TypeRef::named_nn(TypeRef::STRING), |ctx| {
2130
-
FieldFuture::new(async move {
2131
-
let value = ctx.parent_value.downcast_ref::<GraphQLValue>()
2132
-
.ok_or_else(|| Error::new("Failed to downcast RecordUpdate"))?;
2133
-
if let GraphQLValue::Object(obj) = value {
2134
-
if let Some(cid) = obj.get("cid") {
2135
-
return Ok(Some(cid.clone()));
2331
+
record_update = record_update.field(Field::new(
2332
+
"cid",
2333
+
TypeRef::named_nn(TypeRef::STRING),
2334
+
|ctx| {
2335
+
FieldFuture::new(async move {
2336
+
let value = ctx
2337
+
.parent_value
2338
+
.downcast_ref::<GraphQLValue>()
2339
+
.ok_or_else(|| Error::new("Failed to downcast RecordUpdate"))?;
2340
+
if let GraphQLValue::Object(obj) = value {
2341
+
if let Some(cid) = obj.get("cid") {
2342
+
return Ok(Some(cid.clone()));
2343
+
}
2136
2344
}
2137
-
}
2138
-
Ok(None)
2139
-
})
2140
-
}));
2345
+
Ok(None)
2346
+
})
2347
+
},
2348
+
));
2141
2349
2142
-
record_update = record_update.field(Field::new("did", TypeRef::named_nn(TypeRef::STRING), |ctx| {
2143
-
FieldFuture::new(async move {
2144
-
let value = ctx.parent_value.downcast_ref::<GraphQLValue>()
2145
-
.ok_or_else(|| Error::new("Failed to downcast RecordUpdate"))?;
2146
-
if let GraphQLValue::Object(obj) = value {
2147
-
if let Some(did) = obj.get("did") {
2148
-
return Ok(Some(did.clone()));
2350
+
record_update = record_update.field(Field::new(
2351
+
"did",
2352
+
TypeRef::named_nn(TypeRef::STRING),
2353
+
|ctx| {
2354
+
FieldFuture::new(async move {
2355
+
let value = ctx
2356
+
.parent_value
2357
+
.downcast_ref::<GraphQLValue>()
2358
+
.ok_or_else(|| Error::new("Failed to downcast RecordUpdate"))?;
2359
+
if let GraphQLValue::Object(obj) = value {
2360
+
if let Some(did) = obj.get("did") {
2361
+
return Ok(Some(did.clone()));
2362
+
}
2149
2363
}
2150
-
}
2151
-
Ok(None)
2152
-
})
2153
-
}));
2364
+
Ok(None)
2365
+
})
2366
+
},
2367
+
));
2154
2368
2155
-
record_update = record_update.field(Field::new("collection", TypeRef::named_nn(TypeRef::STRING), |ctx| {
2156
-
FieldFuture::new(async move {
2157
-
let value = ctx.parent_value.downcast_ref::<GraphQLValue>()
2158
-
.ok_or_else(|| Error::new("Failed to downcast RecordUpdate"))?;
2159
-
if let GraphQLValue::Object(obj) = value {
2160
-
if let Some(collection) = obj.get("collection") {
2161
-
return Ok(Some(collection.clone()));
2369
+
record_update = record_update.field(Field::new(
2370
+
"collection",
2371
+
TypeRef::named_nn(TypeRef::STRING),
2372
+
|ctx| {
2373
+
FieldFuture::new(async move {
2374
+
let value = ctx
2375
+
.parent_value
2376
+
.downcast_ref::<GraphQLValue>()
2377
+
.ok_or_else(|| Error::new("Failed to downcast RecordUpdate"))?;
2378
+
if let GraphQLValue::Object(obj) = value {
2379
+
if let Some(collection) = obj.get("collection") {
2380
+
return Ok(Some(collection.clone()));
2381
+
}
2162
2382
}
2163
-
}
2164
-
Ok(None)
2165
-
})
2166
-
}));
2383
+
Ok(None)
2384
+
})
2385
+
},
2386
+
));
2167
2387
2168
-
record_update = record_update.field(Field::new("indexedAt", TypeRef::named_nn(TypeRef::STRING), |ctx| {
2169
-
FieldFuture::new(async move {
2170
-
let value = ctx.parent_value.downcast_ref::<GraphQLValue>()
2171
-
.ok_or_else(|| Error::new("Failed to downcast RecordUpdate"))?;
2172
-
if let GraphQLValue::Object(obj) = value {
2173
-
if let Some(indexed_at) = obj.get("indexedAt") {
2174
-
return Ok(Some(indexed_at.clone()));
2388
+
record_update = record_update.field(Field::new(
2389
+
"indexedAt",
2390
+
TypeRef::named_nn(TypeRef::STRING),
2391
+
|ctx| {
2392
+
FieldFuture::new(async move {
2393
+
let value = ctx
2394
+
.parent_value
2395
+
.downcast_ref::<GraphQLValue>()
2396
+
.ok_or_else(|| Error::new("Failed to downcast RecordUpdate"))?;
2397
+
if let GraphQLValue::Object(obj) = value {
2398
+
if let Some(indexed_at) = obj.get("indexedAt") {
2399
+
return Ok(Some(indexed_at.clone()));
2400
+
}
2175
2401
}
2176
-
}
2177
-
Ok(None)
2178
-
})
2179
-
}));
2402
+
Ok(None)
2403
+
})
2404
+
},
2405
+
));
2180
2406
2181
-
record_update = record_update.field(Field::new("operation", TypeRef::named_nn(TypeRef::STRING), |ctx| {
2182
-
FieldFuture::new(async move {
2183
-
let value = ctx.parent_value.downcast_ref::<GraphQLValue>()
2184
-
.ok_or_else(|| Error::new("Failed to downcast RecordUpdate"))?;
2185
-
if let GraphQLValue::Object(obj) = value {
2186
-
if let Some(operation) = obj.get("operation") {
2187
-
return Ok(Some(operation.clone()));
2407
+
record_update = record_update.field(Field::new(
2408
+
"operation",
2409
+
TypeRef::named_nn(TypeRef::STRING),
2410
+
|ctx| {
2411
+
FieldFuture::new(async move {
2412
+
let value = ctx
2413
+
.parent_value
2414
+
.downcast_ref::<GraphQLValue>()
2415
+
.ok_or_else(|| Error::new("Failed to downcast RecordUpdate"))?;
2416
+
if let GraphQLValue::Object(obj) = value {
2417
+
if let Some(operation) = obj.get("operation") {
2418
+
return Ok(Some(operation.clone()));
2419
+
}
2188
2420
}
2189
-
}
2190
-
Ok(None)
2191
-
})
2192
-
}));
2421
+
Ok(None)
2422
+
})
2423
+
},
2424
+
));
2193
2425
2194
2426
record_update = record_update.field(Field::new("value", TypeRef::named_nn("JSON"), |ctx| {
2195
2427
FieldFuture::new(async move {
2196
-
let value = ctx.parent_value.downcast_ref::<GraphQLValue>()
2428
+
let value = ctx
2429
+
.parent_value
2430
+
.downcast_ref::<GraphQLValue>()
2197
2431
.ok_or_else(|| Error::new("Failed to downcast RecordUpdate"))?;
2198
2432
if let GraphQLValue::Object(obj) = value {
2199
2433
if let Some(val) = obj.get("value") {
···
2370
2604
}
2371
2605
2372
2606
/// Helper function to parse GraphQL where clause recursively
2373
-
fn parse_where_clause(where_obj: async_graphql::dynamic::ObjectAccessor) -> crate::models::WhereClause {
2607
+
fn parse_where_clause(
2608
+
where_obj: async_graphql::dynamic::ObjectAccessor,
2609
+
) -> crate::models::WhereClause {
2374
2610
let mut where_clause = crate::models::WhereClause {
2375
2611
conditions: HashMap::new(),
2376
2612
or_conditions: None,
···
2419
2655
eq: None,
2420
2656
in_values: None,
2421
2657
contains: None,
2658
+
fuzzy: None,
2422
2659
gt: None,
2423
2660
gte: None,
2424
2661
lt: None,
···
2456
2693
}
2457
2694
}
2458
2695
2696
+
// Parse fuzzy condition
2697
+
if let Some(fuzzy_val) = condition_obj.get("fuzzy") {
2698
+
if let Ok(fuzzy_str) = fuzzy_val.string() {
2699
+
where_condition.fuzzy = Some(fuzzy_str.to_string());
2700
+
}
2701
+
}
2702
+
2459
2703
// Parse gt condition
2460
2704
if let Some(gt_val) = condition_obj.get("gt") {
2461
2705
if let Ok(gt_str) = gt_val.string() {
···
2499
2743
field_str.to_string()
2500
2744
};
2501
2745
2502
-
where_clause.conditions.insert(db_field_name, where_condition);
2746
+
where_clause
2747
+
.conditions
2748
+
.insert(db_field_name, where_condition);
2503
2749
}
2504
2750
}
2505
2751
+8
-18
api/src/graphql/types.rs
···
33
33
}
34
34
35
35
/// Maps AT Protocol lexicon type to GraphQL type
36
-
pub fn map_lexicon_type_to_graphql(
37
-
type_name: &str,
38
-
lexicon_def: &Value,
39
-
) -> GraphQLType {
36
+
pub fn map_lexicon_type_to_graphql(type_name: &str, lexicon_def: &Value) -> GraphQLType {
40
37
match type_name {
41
38
"string" => GraphQLType::String,
42
39
"integer" => GraphQLType::Int,
···
74
71
GraphQLType::Array(Box::new(item_type))
75
72
}
76
73
"object" => {
77
-
let properties = lexicon_def
78
-
.get("properties")
79
-
.and_then(|p| p.as_object());
74
+
let properties = lexicon_def.get("properties").and_then(|p| p.as_object());
80
75
81
76
let required_fields: Vec<String> = lexicon_def
82
77
.get("required")
···
105
100
GraphQLField {
106
101
name: field_name.clone(),
107
102
format,
108
-
field_type: map_lexicon_type_to_graphql(
109
-
field_type_name,
110
-
field_def,
111
-
),
103
+
field_type: map_lexicon_type_to_graphql(field_type_name, field_def),
112
104
is_required: required_fields.contains(&field_name.to_string()),
113
105
}
114
106
})
···
119
111
GraphQLType::Json
120
112
}
121
113
}
122
-
"union" => {
123
-
GraphQLType::Union
124
-
}
114
+
"union" => GraphQLType::Union,
125
115
_ => GraphQLType::Json,
126
116
}
127
117
}
128
118
129
119
/// Extract collection schema from lexicon definitions
130
-
pub fn extract_collection_fields(
131
-
lexicon_defs: &Value,
132
-
) -> Vec<GraphQLField> {
120
+
pub fn extract_collection_fields(lexicon_defs: &Value) -> Vec<GraphQLField> {
133
121
let main_def = lexicon_defs
134
122
.get("main")
135
123
.or_else(|| lexicon_defs.get("record"));
···
153
141
.and_then(|t| t.as_str())
154
142
.unwrap_or("object");
155
143
156
-
if let GraphQLType::Object(fields) = map_lexicon_type_to_graphql(object_type_name, object_def) {
144
+
if let GraphQLType::Object(fields) =
145
+
map_lexicon_type_to_graphql(object_type_name, object_def)
146
+
{
157
147
return fields;
158
148
}
159
149
}
+6
-3
api/src/jetstream.rs
···
14
14
use crate::cache::{CacheBackend, CacheFactory, SliceCache};
15
15
use crate::database::Database;
16
16
use crate::errors::JetstreamError;
17
-
use crate::graphql::{RecordOperation, RecordUpdateEvent, PUBSUB};
17
+
use crate::graphql::{PUBSUB, RecordOperation, RecordUpdateEvent};
18
18
use crate::jetstream_cursor::PostgresCursorHandler;
19
19
use crate::logging::{LogLevel, Logger};
20
20
use crate::models::{Actor, Record};
···
324
324
325
325
// Check if this is a primary collection (starts with slice domain)
326
326
// Lexicon records for this slice are always treated as primary
327
-
let is_primary_collection = commit.collection.starts_with(&domain) || is_lexicon_for_this_slice;
327
+
let is_primary_collection =
328
+
commit.collection.starts_with(&domain) || is_lexicon_for_this_slice;
328
329
329
330
// For external collections, check actor status BEFORE expensive validation
330
331
if !is_primary_collection {
···
428
429
};
429
430
430
431
// Insert into database
431
-
if let Err(e) = self.database.batch_insert_actors(&[actor]).await {
432
+
if let Err(e) =
433
+
self.database.batch_insert_actors(&[actor]).await
434
+
{
432
435
error!("Failed to create actor {}: {}", did, e);
433
436
} else {
434
437
// Add to cache after successful database insert
+7
-7
api/src/main.rs
···
243
243
e,
244
244
retry_delay
245
245
);
246
-
jetstream_connected_clone.store(false, std::sync::atomic::Ordering::Relaxed);
246
+
jetstream_connected_clone
247
+
.store(false, std::sync::atomic::Ordering::Relaxed);
247
248
tokio::time::sleep(retry_delay).await;
248
249
retry_delay = std::cmp::min(retry_delay * 2, MAX_RETRY_DELAY);
249
250
continue;
···
261
262
match consumer_arc.start_consuming(cancellation_token).await {
262
263
Ok(_) => {
263
264
tracing::info!("Jetstream consumer shut down normally");
264
-
jetstream_connected_clone.store(false, std::sync::atomic::Ordering::Relaxed);
265
+
jetstream_connected_clone
266
+
.store(false, std::sync::atomic::Ordering::Relaxed);
265
267
}
266
268
Err(e) => {
267
269
tracing::error!("Jetstream consumer failed: {} - will reconnect", e);
268
-
jetstream_connected_clone.store(false, std::sync::atomic::Ordering::Relaxed);
270
+
jetstream_connected_clone
271
+
.store(false, std::sync::atomic::Ordering::Relaxed);
269
272
tokio::time::sleep(retry_delay).await;
270
273
retry_delay = std::cmp::min(retry_delay * 2, MAX_RETRY_DELAY);
271
274
}
···
398
401
"/graphql",
399
402
get(graphql::graphql_playground).post(graphql::graphql_handler),
400
403
)
401
-
.route(
402
-
"/graphql/ws",
403
-
get(graphql::graphql_subscription_handler),
404
-
)
404
+
.route("/graphql/ws", get(graphql::graphql_subscription_handler))
405
405
// Dynamic collection-specific XRPC endpoints (wildcard routes must come last)
406
406
.route(
407
407
"/xrpc/{*method}",
+44
-16
api/src/sync.rs
···
253
253
// First, get all repos from primary collections
254
254
let mut primary_repos = std::collections::HashSet::new();
255
255
for collection in &primary_collections {
256
-
match self.get_repos_for_collection(collection, slice_uri, max_repos).await {
256
+
match self
257
+
.get_repos_for_collection(collection, slice_uri, max_repos)
258
+
.await
259
+
{
257
260
Ok(repos) => {
258
261
info!(
259
262
"Found {} repositories for primary collection \"{}\"",
···
465
468
match database.batch_insert_records(&batch).await {
466
469
Ok(_) => {
467
470
write_count += batch_size;
468
-
info!("Database writer: Inserted batch of {} records (total: {})", batch_size, write_count);
471
+
info!(
472
+
"Database writer: Inserted batch of {} records (total: {})",
473
+
batch_size, write_count
474
+
);
469
475
}
470
476
Err(e) => {
471
477
error!("Database writer: Failed to insert batch: {}", e);
···
611
617
612
618
// Send batch to writer when buffer is full
613
619
if batch_buffer.len() >= BATCH_SIZE {
614
-
let batch_to_send = std::mem::replace(&mut batch_buffer, Vec::with_capacity(BATCH_SIZE));
620
+
let batch_to_send =
621
+
std::mem::replace(&mut batch_buffer, Vec::with_capacity(BATCH_SIZE));
615
622
let batch_count = batch_to_send.len() as i64;
616
-
info!("Sending batch of {} records to database writer", batch_count);
623
+
info!(
624
+
"Sending batch of {} records to database writer",
625
+
batch_count
626
+
);
617
627
618
628
// Send to writer channel (non-blocking)
619
629
if let Err(e) = tx.send(batch_to_send).await {
620
630
error!("Failed to send batch to writer: {}", e);
621
-
return Err(SyncError::Generic(format!("Failed to send batch to writer: {}", e)));
631
+
return Err(SyncError::Generic(format!(
632
+
"Failed to send batch to writer: {}",
633
+
e
634
+
)));
622
635
}
623
636
624
637
let mut total = total_indexed_records.lock().await;
···
629
642
// Flush any remaining records in the buffer
630
643
if !batch_buffer.is_empty() {
631
644
let batch_count = batch_buffer.len() as i64;
632
-
info!("Sending final batch of {} records to database writer", batch_count);
645
+
info!(
646
+
"Sending final batch of {} records to database writer",
647
+
batch_count
648
+
);
633
649
634
650
if let Err(e) = tx.send(batch_buffer).await {
635
651
error!("Failed to send final batch to writer: {}", e);
636
-
return Err(SyncError::Generic(format!("Failed to send final batch to writer: {}", e)));
652
+
return Err(SyncError::Generic(format!(
653
+
"Failed to send final batch to writer: {}",
654
+
e
655
+
)));
637
656
}
638
657
639
658
let mut total = total_indexed_records.lock().await;
···
642
661
643
662
// Close the channel and wait for writer to finish
644
663
drop(tx);
645
-
let write_result = writer_task.await
664
+
let write_result = writer_task
665
+
.await
646
666
.map_err(|e| SyncError::Generic(format!("Writer task panicked: {}", e)))?;
647
667
648
668
let final_count = match write_result {
···
655
675
successful_tasks, failed_tasks
656
676
);
657
677
658
-
info!(
659
-
"Indexed {} new/changed records in batches",
660
-
final_count
661
-
);
678
+
info!("Indexed {} new/changed records in batches", final_count);
662
679
663
680
info!("Backfill complete!");
664
681
···
699
716
if page_count > max_pages {
700
717
warn!(
701
718
"Reached maximum page limit ({}) for collection {} (based on repo limit {:?}, estimated max {} repos at {} per page)",
702
-
max_pages, collection, max_repos, max_pages * REPOS_PER_PAGE, REPOS_PER_PAGE
719
+
max_pages,
720
+
collection,
721
+
max_repos,
722
+
max_pages * REPOS_PER_PAGE,
723
+
REPOS_PER_PAGE
703
724
);
704
725
break;
705
726
}
···
979
1000
const CHUNK_SIZE: usize = 50; // Process DIDs in chunks
980
1001
const MAX_CONCURRENT: usize = 10; // Limit concurrent resolutions
981
1002
982
-
info!("Resolving ATP data for {} repositories in chunks", repos.len());
1003
+
info!(
1004
+
"Resolving ATP data for {} repositories in chunks",
1005
+
repos.len()
1006
+
);
983
1007
984
1008
for (chunk_idx, chunk) in repos.chunks(CHUNK_SIZE).enumerate() {
985
1009
let chunk_start = chunk_idx * CHUNK_SIZE;
···
1031
1055
}
1032
1056
}
1033
1057
1034
-
info!("Successfully resolved ATP data for {}/{} repositories", atp_map.len(), repos.len());
1058
+
info!(
1059
+
"Successfully resolved ATP data for {}/{} repositories",
1060
+
atp_map.len(),
1061
+
repos.len()
1062
+
);
1035
1063
Ok(atp_map)
1036
1064
}
1037
1065
···
1193
1221
Some(&external_collections),
1194
1222
Some(&[user_did.to_string()]), // Only sync this user's repos
1195
1223
false, // Always validate user collections
1196
-
None, // No limit for user-specific sync
1224
+
None, // No limit for user-specific sync
1197
1225
)
1198
1226
.await
1199
1227
};
+2
-2
api/src/xrpc/network/slices/slice/clear_slice_records.rs
+1
-6
api/src/xrpc/network/slices/slice/get_actors.rs
+9
-2
api/src/xrpc/network/slices/slice/get_sync_summary.rs
···
1
1
use crate::{AppState, auth, errors::AppError, sync::SyncService};
2
-
use axum::{extract::{Query, State}, http::HeaderMap, response::Json};
2
+
use axum::{
3
+
extract::{Query, State},
4
+
http::HeaderMap,
5
+
response::Json,
6
+
};
3
7
use serde::{Deserialize, Serialize};
4
8
use std::collections::HashMap;
5
9
···
144
148
145
149
// First, get repos ONLY from primary collections
146
150
for collection in &primary_collections {
147
-
match sync_service.get_repos_for_collection(collection, slice_uri, Some(applied_limit)).await {
151
+
match sync_service
152
+
.get_repos_for_collection(collection, slice_uri, Some(applied_limit))
153
+
.await
154
+
{
148
155
Ok(repos) => {
149
156
counts.insert(collection.clone(), repos.len() as i64);
150
157
discovered_repos.extend(repos);
+37
docs/graphql-api.md
···
79
79
- `eq`: Exact match
80
80
- `in`: Match any value in array
81
81
- `contains`: Substring match (case-insensitive)
82
+
- `fuzzy`: Fuzzy/similarity match (typo-tolerant)
82
83
- `gt`: Greater than (lexicographic)
83
84
- `gte`: Greater than or equal to
84
85
- `lt`: Less than
···
100
101
- `gte`: At or after datetime
101
102
- `lt`: Before datetime
102
103
- `lte`: At or before datetime
104
+
105
+
#### Fuzzy Matching Example
106
+
107
+
The `fuzzy` filter uses PostgreSQL's trigram similarity for typo-tolerant search:
108
+
109
+
```graphql
110
+
query FuzzySearch {
111
+
fmTealAlphaFeedPlays(
112
+
where: {
113
+
trackName: { fuzzy: "love" }
114
+
}
115
+
) {
116
+
edges {
117
+
node {
118
+
trackName
119
+
artists
120
+
}
121
+
}
122
+
}
123
+
}
124
+
```
125
+
126
+
With the default similarity threshold, this can match track names like:
127
+
- "Love" (exact)
128
+
- "Love Song"
129
+
- "Lovely"
130
+
- "I Love You"
131
+
- "Lover"
132
+
- "Loveless"
133
+
134
+
The fuzzy filter is great for:
135
+
- Handling typos and misspellings
136
+
- Finding similar variations of text
137
+
- Flexible search without exact matching
138
+
139
+
**Note**: Fuzzy matching scores the trigram similarity between strings, so it's more flexible than `contains`, but it can also return matches that only loosely resemble the query once the similarity threshold is crossed.
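
Under the hood this is backed by PostgreSQL's `pg_trgm` extension. As a rough illustration of the mechanism, here is the kind of SQL a `fuzzy` condition can translate to; the `records` table and JSONB `value` column below are assumptions made for the sketch, not the service's actual schema:

```sql
-- Illustrative sketch only: the table and column names are assumed,
-- not the actual schema used by the API.

-- Trigram support must be enabled once per database.
CREATE EXTENSION IF NOT EXISTS pg_trgm;

-- The % operator matches when trigram similarity exceeds the threshold
-- (0.3 by default; adjustable via the pg_trgm.similarity_threshold setting).
SELECT value->>'trackName'                     AS track_name,
       similarity(value->>'trackName', 'love') AS score
FROM records
WHERE value->>'trackName' % 'love'
ORDER BY score DESC;
```

Ordering by `similarity()` puts the closest matches first; lowering `pg_trgm.similarity_threshold` widens the net, while raising it tightens matching toward near-exact strings.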
103
140
104
141
#### Date Range Example
105
142