+161
api/src/database/records.rs
+161
api/src/database/records.rs
···
10
10
use super::types::{SortField, WhereClause};
11
11
use crate::errors::DatabaseError;
12
12
use crate::models::{IndexedRecord, Record};
13
+
use sqlx::Row;
13
14
14
15
impl Database {
15
16
/// Inserts a single record into the database.
···
459
460
460
461
let count = query_builder.fetch_one(&self.pool).await?;
461
462
Ok(count)
463
+
}
464
+
465
+
/// Queries aggregated records with GROUP BY support.
466
+
///
467
+
/// # Arguments
468
+
/// * `slice_uri` - AT-URI of the slice to query
469
+
/// * `group_by_fields` - JSON paths to group by (e.g., ["releaseMbId", "releaseName"])
470
+
/// * `where_clause` - Optional WHERE conditions
471
+
/// * `order_by_count` - Optional ordering ("asc" or "desc")
472
+
/// * `limit` - Maximum number of groups to return
473
+
///
474
+
/// # Returns
475
+
/// Vec of (field_values, count) tuples
476
+
pub async fn get_aggregated_records(
477
+
&self,
478
+
slice_uri: &str,
479
+
group_by_fields: &[String],
480
+
where_clause: Option<&WhereClause>,
481
+
order_by_count: Option<&str>,
482
+
limit: Option<i32>,
483
+
) -> Result<Vec<serde_json::Value>, DatabaseError> {
484
+
if group_by_fields.is_empty() {
485
+
return Ok(Vec::new());
486
+
}
487
+
488
+
let limit = limit.unwrap_or(50).min(1000);
489
+
let mut param_count = 1;
490
+
491
+
// Build SELECT clause with JSON field extraction
492
+
let select_fields: Vec<String> = group_by_fields
493
+
.iter()
494
+
.enumerate()
495
+
.map(|(i, field)| {
496
+
// Check if it's a table column
497
+
if matches!(field.as_str(), "did" | "collection" | "uri" | "cid" | "indexed_at") {
498
+
format!("\"{}\" as field_{}", field, i)
499
+
} else {
500
+
// JSON field
501
+
format!("json->>'{}' as field_{}", field, i)
502
+
}
503
+
})
504
+
.collect();
505
+
506
+
let select_clause = format!("{}, COUNT(*) as count", select_fields.join(", "));
507
+
508
+
// Build GROUP BY clause
509
+
let group_by_clause: Vec<String> = (0..group_by_fields.len())
510
+
.map(|i| format!("field_{}", i))
511
+
.collect();
512
+
513
+
// Build WHERE clause
514
+
let mut where_clauses = vec![format!("slice_uri = ${}", param_count)];
515
+
param_count += 1;
516
+
517
+
let (and_conditions, or_conditions) =
518
+
build_where_conditions(where_clause, &mut param_count);
519
+
where_clauses.extend(and_conditions);
520
+
521
+
if !or_conditions.is_empty() {
522
+
let or_clause = format!("({})", or_conditions.join(" OR "));
523
+
where_clauses.push(or_clause);
524
+
}
525
+
526
+
let where_sql = format!(" WHERE {}", where_clauses.join(" AND "));
527
+
528
+
// Build ORDER BY clause
529
+
let order_by_sql = match order_by_count {
530
+
Some("asc") => " ORDER BY count ASC",
531
+
Some("desc") | Some(_) | None => " ORDER BY count DESC",
532
+
};
533
+
534
+
let query = format!(
535
+
"SELECT {} FROM record{} GROUP BY {} {} LIMIT {}",
536
+
select_clause, where_sql, group_by_clause.join(", "), order_by_sql, limit
537
+
);
538
+
539
+
let mut query_builder = sqlx::query(&query);
540
+
query_builder = query_builder.bind(slice_uri);
541
+
542
+
// Bind WHERE parameters manually
543
+
if let Some(clause) = where_clause {
544
+
for condition in clause.conditions.values() {
545
+
if let Some(eq_value) = &condition.eq {
546
+
if let Some(str_val) = eq_value.as_str() {
547
+
query_builder = query_builder.bind(str_val);
548
+
} else {
549
+
query_builder = query_builder.bind(eq_value.to_string());
550
+
}
551
+
}
552
+
if let Some(in_values) = &condition.in_values {
553
+
let str_values: Vec<String> = in_values
554
+
.iter()
555
+
.filter_map(|v| v.as_str().map(|s| s.to_string()))
556
+
.collect();
557
+
query_builder = query_builder.bind(str_values);
558
+
}
559
+
if let Some(contains_value) = &condition.contains {
560
+
query_builder = query_builder.bind(contains_value);
561
+
}
562
+
}
563
+
564
+
if let Some(or_conditions) = &clause.or_conditions {
565
+
for condition in or_conditions.values() {
566
+
if let Some(eq_value) = &condition.eq {
567
+
if let Some(str_val) = eq_value.as_str() {
568
+
query_builder = query_builder.bind(str_val);
569
+
} else {
570
+
query_builder = query_builder.bind(eq_value.to_string());
571
+
}
572
+
}
573
+
if let Some(in_values) = &condition.in_values {
574
+
let str_values: Vec<String> = in_values
575
+
.iter()
576
+
.filter_map(|v| v.as_str().map(|s| s.to_string()))
577
+
.collect();
578
+
query_builder = query_builder.bind(str_values);
579
+
}
580
+
if let Some(contains_value) = &condition.contains {
581
+
query_builder = query_builder.bind(contains_value);
582
+
}
583
+
}
584
+
}
585
+
}
586
+
587
+
let rows = query_builder.fetch_all(&self.pool).await?;
588
+
589
+
// Convert rows to JSON objects
590
+
let mut results = Vec::new();
591
+
for row in rows {
592
+
let mut obj = serde_json::Map::new();
593
+
594
+
// Extract grouped field values
595
+
for (i, field_name) in group_by_fields.iter().enumerate() {
596
+
let col_name = format!("field_{}", i);
597
+
let value: Option<String> = row.try_get(col_name.as_str()).ok();
598
+
599
+
// Try to parse as JSON first (for arrays/objects), otherwise use as string
600
+
let json_value = if let Some(ref str_val) = value {
601
+
// Check if it looks like JSON (starts with [ or {)
602
+
if str_val.starts_with('[') || str_val.starts_with('{') {
603
+
// Try to parse as JSON
604
+
serde_json::from_str(str_val).unwrap_or_else(|_| serde_json::Value::String(str_val.clone()))
605
+
} else {
606
+
serde_json::Value::String(str_val.clone())
607
+
}
608
+
} else {
609
+
serde_json::Value::Null
610
+
};
611
+
612
+
obj.insert(field_name.clone(), json_value);
613
+
}
614
+
615
+
// Extract count
616
+
let count: i64 = row.try_get("count").unwrap_or(0);
617
+
obj.insert("count".to_string(), serde_json::Value::Number(count.into()));
618
+
619
+
results.push(serde_json::Value::Object(obj));
620
+
}
621
+
622
+
Ok(results)
462
623
}
463
624
464
625
/// Deletes a record by URI.
+293
api/src/graphql/schema_builder.rs
+293
api/src/graphql/schema_builder.rs
···
351
351
))
352
352
.description(format!("Query {} records", nsid)),
353
353
);
354
+
355
+
// Add aggregated query field for this collection
356
+
let aggregated_query_name = format!("{}Aggregated", collection_query_name);
357
+
let aggregated_type_name = format!("{}Aggregated", &type_name);
358
+
359
+
// Create aggregated type
360
+
let aggregated_type = create_aggregated_type(&aggregated_type_name, &fields);
361
+
objects_to_register.push(aggregated_type);
362
+
363
+
let db_clone_agg = database.clone();
364
+
let slice_clone_agg = slice_uri.clone();
365
+
let nsid_clone_agg = nsid.to_string();
366
+
367
+
query = query.field(
368
+
Field::new(
369
+
&aggregated_query_name,
370
+
TypeRef::named_nn_list_nn(&aggregated_type_name),
371
+
move |ctx| {
372
+
let db = db_clone_agg.clone();
373
+
let slice = slice_clone_agg.clone();
374
+
let collection = nsid_clone_agg.clone();
375
+
376
+
FieldFuture::new(async move {
377
+
// Parse groupBy argument
378
+
let group_by_fields: Vec<String> = match ctx.args.get("groupBy") {
379
+
Some(val) => {
380
+
if let Ok(list) = val.list() {
381
+
list.iter()
382
+
.filter_map(|v| v.string().ok().map(|s| s.to_string()))
383
+
.collect()
384
+
} else {
385
+
Vec::new()
386
+
}
387
+
}
388
+
None => Vec::new(),
389
+
};
390
+
391
+
if group_by_fields.is_empty() {
392
+
return Err(Error::new("groupBy is required for aggregated queries"));
393
+
}
394
+
395
+
// Parse limit argument
396
+
let limit: i32 = match ctx.args.get("limit") {
397
+
Some(val) => val.i64().unwrap_or(50) as i32,
398
+
None => 50,
399
+
};
400
+
401
+
// Parse orderBy argument
402
+
let order_by_count: Option<String> = match ctx.args.get("orderBy") {
403
+
Some(val) => {
404
+
if let Ok(obj) = val.object() {
405
+
obj.get("count")
406
+
.and_then(|v| v.string().ok())
407
+
.map(|s| s.to_string())
408
+
} else {
409
+
None
410
+
}
411
+
}
412
+
None => None,
413
+
};
414
+
415
+
// Build where clause for this collection
416
+
let mut where_clause = crate::models::WhereClause {
417
+
conditions: HashMap::new(),
418
+
or_conditions: None,
419
+
};
420
+
421
+
// Always filter by collection
422
+
where_clause.conditions.insert(
423
+
"collection".to_string(),
424
+
crate::models::WhereCondition {
425
+
eq: Some(serde_json::Value::String(collection.clone())),
426
+
in_values: None,
427
+
contains: None,
428
+
},
429
+
);
430
+
431
+
// Parse where argument if provided
432
+
if let Some(where_val) = ctx.args.get("where") {
433
+
if let Ok(where_obj) = where_val.object() {
434
+
for (field_name, condition_val) in where_obj.iter() {
435
+
if let Ok(condition_obj) = condition_val.object() {
436
+
let mut where_condition = crate::models::WhereCondition {
437
+
eq: None,
438
+
in_values: None,
439
+
contains: None,
440
+
};
441
+
442
+
// Parse eq condition
443
+
if let Some(eq_val) = condition_obj.get("eq") {
444
+
if let Ok(eq_str) = eq_val.string() {
445
+
where_condition.eq = Some(serde_json::Value::String(eq_str.to_string()));
446
+
} else if let Ok(eq_i64) = eq_val.i64() {
447
+
where_condition.eq = Some(serde_json::Value::Number(eq_i64.into()));
448
+
}
449
+
}
450
+
451
+
// Parse in condition
452
+
if let Some(in_val) = condition_obj.get("in") {
453
+
if let Ok(in_list) = in_val.list() {
454
+
let mut values = Vec::new();
455
+
for item in in_list.iter() {
456
+
if let Ok(s) = item.string() {
457
+
values.push(serde_json::Value::String(s.to_string()));
458
+
} else if let Ok(i) = item.i64() {
459
+
values.push(serde_json::Value::Number(i.into()));
460
+
}
461
+
}
462
+
where_condition.in_values = Some(values);
463
+
}
464
+
}
465
+
466
+
// Parse contains condition
467
+
if let Some(contains_val) = condition_obj.get("contains") {
468
+
if let Ok(contains_str) = contains_val.string() {
469
+
where_condition.contains = Some(contains_str.to_string());
470
+
}
471
+
}
472
+
473
+
where_clause.conditions.insert(field_name.to_string(), where_condition);
474
+
}
475
+
}
476
+
}
477
+
}
478
+
479
+
// Resolve actorHandle to did if present
480
+
if let Some(actor_handle_condition) = where_clause.conditions.remove("actorHandle") {
481
+
let mut handles = Vec::new();
482
+
if let Some(eq_value) = &actor_handle_condition.eq {
483
+
if let Some(handle_str) = eq_value.as_str() {
484
+
handles.push(handle_str.to_string());
485
+
}
486
+
}
487
+
if let Some(in_values) = &actor_handle_condition.in_values {
488
+
for value in in_values {
489
+
if let Some(handle_str) = value.as_str() {
490
+
handles.push(handle_str.to_string());
491
+
}
492
+
}
493
+
}
494
+
495
+
if !handles.is_empty() {
496
+
match db.resolve_handles_to_dids(&handles, &slice).await {
497
+
Ok(dids) => {
498
+
if !dids.is_empty() {
499
+
let did_condition = if dids.len() == 1 {
500
+
crate::models::WhereCondition {
501
+
eq: Some(serde_json::Value::String(dids[0].clone())),
502
+
in_values: None,
503
+
contains: None,
504
+
}
505
+
} else {
506
+
crate::models::WhereCondition {
507
+
eq: None,
508
+
in_values: Some(dids.into_iter().map(|d| serde_json::Value::String(d)).collect()),
509
+
contains: None,
510
+
}
511
+
};
512
+
where_clause.conditions.insert("did".to_string(), did_condition);
513
+
}
514
+
}
515
+
Err(_) => {
516
+
// If resolution fails, skip the condition
517
+
}
518
+
}
519
+
}
520
+
}
521
+
522
+
// Query database for aggregated records
523
+
let results = db
524
+
.get_aggregated_records(
525
+
&slice,
526
+
&group_by_fields,
527
+
Some(&where_clause),
528
+
order_by_count.as_deref(),
529
+
Some(limit),
530
+
)
531
+
.await
532
+
.map_err(|e| {
533
+
Error::new(format!("Aggregation query failed: {}", e))
534
+
})?;
535
+
536
+
// Convert JSON values to GraphQL values
537
+
let field_values: Vec<FieldValue<'_>> = results
538
+
.into_iter()
539
+
.map(|json_val| FieldValue::owned_any(json_val))
540
+
.collect();
541
+
542
+
Ok(Some(FieldValue::list(field_values)))
543
+
})
544
+
},
545
+
)
546
+
.argument(async_graphql::dynamic::InputValue::new(
547
+
"groupBy",
548
+
TypeRef::named_nn_list_nn(TypeRef::STRING),
549
+
))
550
+
.argument(async_graphql::dynamic::InputValue::new(
551
+
"where",
552
+
TypeRef::named("JSON"),
553
+
))
554
+
.argument(async_graphql::dynamic::InputValue::new(
555
+
"orderBy",
556
+
TypeRef::named("AggregationOrderBy"),
557
+
))
558
+
.argument(async_graphql::dynamic::InputValue::new(
559
+
"limit",
560
+
TypeRef::named(TypeRef::INT),
561
+
))
562
+
.description(format!("Aggregated query for {} records with GROUP BY support", nsid)),
563
+
);
354
564
}
355
565
}
356
566
···
388
598
389
599
let int_condition_input = create_int_condition_input();
390
600
schema_builder = schema_builder.register(int_condition_input);
601
+
602
+
// Register AggregationOrderBy input type
603
+
let aggregation_order_by_input = create_aggregation_order_by_input();
604
+
schema_builder = schema_builder.register(aggregation_order_by_input);
391
605
392
606
// Register PageInfo type
393
607
let page_info_type = create_page_info_type();
···
1443
1657
1444
1658
result
1445
1659
}
1660
+
1661
+
/// Creates an aggregated type for GROUP BY queries
1662
+
/// Returns a dynamic object with the grouped fields plus a count field
1663
+
fn create_aggregated_type(type_name: &str, fields: &[GraphQLField]) -> Object {
1664
+
let mut aggregated = Object::new(type_name);
1665
+
1666
+
// Add fields from the lexicon that can be grouped
1667
+
// Use JSON type for all fields to support both strings and complex types
1668
+
for field in fields {
1669
+
let field_name = field.name.clone();
1670
+
let field_name_clone = field_name.clone();
1671
+
aggregated = aggregated.field(Field::new(&field_name, TypeRef::named("JSON"), move |ctx| {
1672
+
let field_name = field_name_clone.clone();
1673
+
FieldFuture::new(async move {
1674
+
let json_value = ctx.parent_value.try_downcast_ref::<serde_json::Value>()?;
1675
+
if let Some(obj) = json_value.as_object() {
1676
+
if let Some(value) = obj.get(&field_name) {
1677
+
// Convert serde_json::Value to async_graphql::Value
1678
+
let graphql_value = serde_json_to_graphql_value(value);
1679
+
return Ok(Some(graphql_value));
1680
+
}
1681
+
}
1682
+
Ok(None)
1683
+
})
1684
+
}));
1685
+
}
1686
+
1687
+
// Add count field
1688
+
aggregated = aggregated.field(Field::new("count", TypeRef::named_nn(TypeRef::INT), |ctx| {
1689
+
FieldFuture::new(async move {
1690
+
let json_value = ctx.parent_value.try_downcast_ref::<serde_json::Value>()?;
1691
+
if let Some(obj) = json_value.as_object() {
1692
+
if let Some(count) = obj.get("count") {
1693
+
if let Some(count_i64) = count.as_i64() {
1694
+
return Ok(Some(GraphQLValue::from(count_i64 as i32)));
1695
+
}
1696
+
}
1697
+
}
1698
+
Ok(Some(GraphQLValue::from(0)))
1699
+
})
1700
+
}));
1701
+
1702
+
aggregated
1703
+
}
1704
+
1705
+
/// Creates the AggregationOrderBy input type for ordering by count
1706
+
fn create_aggregation_order_by_input() -> InputObject {
1707
+
InputObject::new("AggregationOrderBy")
1708
+
.field(InputValue::new("count", TypeRef::named("SortDirection")))
1709
+
}
1710
+
1711
+
/// Converts a serde_json::Value to an async_graphql::Value
1712
+
fn serde_json_to_graphql_value(value: &serde_json::Value) -> GraphQLValue {
1713
+
match value {
1714
+
serde_json::Value::Null => GraphQLValue::Null,
1715
+
serde_json::Value::Bool(b) => GraphQLValue::Boolean(*b),
1716
+
serde_json::Value::Number(n) => {
1717
+
if let Some(i) = n.as_i64() {
1718
+
GraphQLValue::Number(i.into())
1719
+
} else if let Some(f) = n.as_f64() {
1720
+
GraphQLValue::Number(serde_json::Number::from_f64(f).unwrap().into())
1721
+
} else {
1722
+
GraphQLValue::Null
1723
+
}
1724
+
}
1725
+
serde_json::Value::String(s) => GraphQLValue::String(s.clone()),
1726
+
serde_json::Value::Array(arr) => {
1727
+
let values: Vec<GraphQLValue> = arr.iter().map(serde_json_to_graphql_value).collect();
1728
+
GraphQLValue::List(values)
1729
+
}
1730
+
serde_json::Value::Object(obj) => {
1731
+
let mut map = async_graphql::indexmap::IndexMap::new();
1732
+
for (k, v) in obj {
1733
+
map.insert(async_graphql::Name::new(k), serde_json_to_graphql_value(v));
1734
+
}
1735
+
GraphQLValue::Object(map)
1736
+
}
1737
+
}
1738
+
}