+2
-2
api/Cargo.toml
+2
-2
api/Cargo.toml
···
63
63
sqlxmq = "0.6"
64
64
regex = "1.11.2"
65
65
66
-
# Redis for caching
67
-
redis = { version = "0.32", features = ["tokio-comp", "connection-manager"] }
66
+
# Redis for caching and pub/sub
67
+
redis = { version = "0.32", features = ["tokio-comp", "connection-manager", "aio"] }
68
68
69
69
# GraphQL server
70
70
async-graphql = { version = "7.0", features = ["dynamic-schema", "dataloader"] }
+500
-24
api/src/graphql/schema_builder.rs
+500
-24
api/src/graphql/schema_builder.rs
···
21
21
add_cancel_job_mutation, add_create_oauth_client_mutation, add_delete_job_mutation,
22
22
add_delete_oauth_client_mutation, add_delete_slice_records_mutation, add_get_sync_summary_query,
23
23
add_jetstream_logs_query, add_jetstream_logs_subscription, add_oauth_clients_query,
24
-
add_slice_records_query, add_sparklines_query, add_sparklines_field_to_slice,
25
-
add_start_sync_mutation, add_stats_field_to_slice, add_sync_job_logs_query, add_sync_job_query,
26
-
add_sync_job_subscription, add_sync_jobs_query, add_update_oauth_client_mutation,
27
-
add_upload_blob_mutation, create_blob_upload_response_type, create_collection_stats_type,
28
-
create_collection_summary_type, create_delete_slice_records_output_type,
29
-
create_jetstream_log_entry_type, create_oauth_client_type, create_slice_record_type,
30
-
create_slice_record_edge_type, create_slice_records_connection_type,
31
-
create_slice_records_where_input, create_slice_sparkline_type, create_slice_stats_type,
32
-
create_sparkline_point_type, create_start_sync_output_type, create_sync_job_result_type,
33
-
create_sync_job_type, create_sync_summary_type,
24
+
add_oauth_clients_field_to_slice, add_slice_records_query, add_sparklines_query,
25
+
add_sparklines_field_to_slice, add_start_sync_mutation, add_stats_field_to_slice,
26
+
add_sync_job_logs_query, add_sync_job_query, add_sync_job_subscription, add_sync_jobs_query,
27
+
add_update_oauth_client_mutation, add_upload_blob_mutation, create_blob_upload_response_type,
28
+
create_collection_stats_type, create_collection_summary_type,
29
+
create_delete_slice_records_output_type, create_jetstream_log_entry_type,
30
+
create_oauth_client_type, create_slice_record_type, create_slice_record_edge_type,
31
+
create_slice_records_connection_type, create_slice_records_where_input,
32
+
create_slice_sparkline_type, create_slice_stats_type, create_sparkline_point_type,
33
+
create_start_sync_output_type, create_sync_job_result_type, create_sync_job_type,
34
+
create_sync_summary_type,
34
35
};
35
36
use crate::graphql::types::{extract_collection_fields, extract_record_key, GraphQLField, GraphQLType};
36
37
use crate::graphql::PUBSUB;
···
44
45
at_uri_fields: Vec<String>, // Fields with format "at-uri" for reverse joins
45
46
}
46
47
48
+
/// Type registry for tracking generated nested object types
49
+
type TypeRegistry = HashMap<String, Object>;
50
+
51
+
/// Container for nested object field values
52
+
#[derive(Clone)]
53
+
struct NestedObjectContainer {
54
+
data: serde_json::Value,
55
+
}
56
+
57
+
/// Generates a unique type name for a nested object field
58
+
fn generate_nested_type_name(parent_type: &str, field_name: &str) -> String {
59
+
let mut chars = field_name.chars();
60
+
let capitalized_field = match chars.next() {
61
+
None => String::new(),
62
+
Some(first) => first.to_uppercase().collect::<String>() + chars.as_str(),
63
+
};
64
+
format!("{}{}", parent_type, capitalized_field)
65
+
}
66
+
67
+
/// Resolves a lexicon ref and generates a GraphQL type for it
68
+
/// Returns the generated type name
69
+
fn resolve_lexicon_ref_type(
70
+
ref_nsid: &str,
71
+
current_lexicon_nsid: &str,
72
+
all_lexicons: &[serde_json::Value],
73
+
type_registry: &mut TypeRegistry,
74
+
database: &Database,
75
+
) -> String {
76
+
// Handle different ref formats:
77
+
// 1. Local ref: #image
78
+
// 2. External ref with specific def: app.bsky.embed.defs#aspectRatio
79
+
// 3. External ref to main: community.lexicon.location.hthree
80
+
let (target_nsid, def_name) = if ref_nsid.starts_with('#') {
81
+
// Local ref - use current lexicon NSID and the def name without #
82
+
(current_lexicon_nsid, &ref_nsid[1..])
83
+
} else if let Some(hash_pos) = ref_nsid.find('#') {
84
+
// External ref with specific def - split on #
85
+
(&ref_nsid[..hash_pos], &ref_nsid[hash_pos + 1..])
86
+
} else {
87
+
// External ref to main def
88
+
(ref_nsid, "main")
89
+
};
90
+
91
+
// Generate type name from NSID and def name
92
+
let type_name = if def_name == "main" {
93
+
// For refs to main: CommunityLexiconLocationHthree
94
+
nsid_to_type_name(target_nsid)
95
+
} else {
96
+
// For refs to specific def: AppBskyEmbedDefsAspectRatio
97
+
format!("{}{}", nsid_to_type_name(target_nsid), capitalize_first(def_name))
98
+
};
99
+
100
+
// Check if already generated
101
+
if type_registry.contains_key(&type_name) {
102
+
return type_name;
103
+
}
104
+
105
+
// Find the lexicon definition
106
+
let lexicon = all_lexicons.iter().find(|lex| {
107
+
lex.get("id").and_then(|id| id.as_str()) == Some(target_nsid)
108
+
});
109
+
110
+
if let Some(lex) = lexicon {
111
+
// Extract the definition (either "main" or specific def like "image")
112
+
if let Some(defs) = lex.get("defs") {
113
+
if let Some(def) = defs.get(def_name) {
114
+
// Extract fields from this specific definition
115
+
if let Some(properties) = def.get("properties") {
116
+
let fields = extract_fields_from_properties(properties);
117
+
118
+
if !fields.is_empty() {
119
+
// Generate the type using existing nested object generator
120
+
generate_nested_object_type(&type_name, &fields, type_registry, database);
121
+
return type_name;
122
+
}
123
+
}
124
+
}
125
+
}
126
+
}
127
+
128
+
// Fallback: couldn't resolve the ref, will use JSON
129
+
tracing::warn!("Could not resolve lexicon ref: {} (target: {}, def: {})", ref_nsid, target_nsid, def_name);
130
+
type_name
131
+
}
132
+
133
+
/// Capitalizes the first character of a string
134
+
fn capitalize_first(s: &str) -> String {
135
+
let mut chars = s.chars();
136
+
match chars.next() {
137
+
None => String::new(),
138
+
Some(first) => first.to_uppercase().collect::<String>() + chars.as_str(),
139
+
}
140
+
}
141
+
142
+
/// Extracts fields from a lexicon properties object
143
+
fn extract_fields_from_properties(properties: &serde_json::Value) -> Vec<GraphQLField> {
144
+
let mut fields = Vec::new();
145
+
146
+
if let Some(props) = properties.as_object() {
147
+
for (field_name, field_def) in props {
148
+
let field_type_str = field_def.get("type").and_then(|t| t.as_str()).unwrap_or("unknown");
149
+
let field_type = crate::graphql::types::map_lexicon_type_to_graphql(field_type_str, field_def);
150
+
151
+
// Check if field is required
152
+
let is_required = false; // We'd need the parent's "required" array to know this
153
+
154
+
// Extract format if present
155
+
let format = field_def.get("format").and_then(|f| f.as_str()).map(|s| s.to_string());
156
+
157
+
fields.push(GraphQLField {
158
+
name: field_name.clone(),
159
+
field_type,
160
+
is_required,
161
+
format,
162
+
});
163
+
}
164
+
}
165
+
166
+
fields
167
+
}
168
+
169
+
/// Recursively generates GraphQL object types for nested objects
170
+
/// Returns the type name of the generated object type
171
+
fn generate_nested_object_type(
172
+
type_name: &str,
173
+
fields: &[GraphQLField],
174
+
type_registry: &mut TypeRegistry,
175
+
database: &Database,
176
+
) -> String {
177
+
// Check if type already exists in registry
178
+
if type_registry.contains_key(type_name) {
179
+
return type_name.to_string();
180
+
}
181
+
182
+
let mut object = Object::new(type_name);
183
+
184
+
// Add fields to the object
185
+
for field in fields {
186
+
let field_name = field.name.clone();
187
+
let field_name_for_field = field_name.clone(); // Clone for Field::new
188
+
let field_type = field.field_type.clone();
189
+
190
+
// Determine the TypeRef for this field
191
+
let type_ref = match &field.field_type {
192
+
GraphQLType::Object(nested_fields) => {
193
+
// Generate nested object type recursively
194
+
let nested_type_name = generate_nested_type_name(type_name, &field_name);
195
+
let actual_type_name = generate_nested_object_type(
196
+
&nested_type_name,
197
+
nested_fields,
198
+
type_registry,
199
+
database,
200
+
);
201
+
202
+
if field.is_required {
203
+
TypeRef::named_nn(actual_type_name)
204
+
} else {
205
+
TypeRef::named(actual_type_name)
206
+
}
207
+
}
208
+
GraphQLType::Array(inner) => {
209
+
if let GraphQLType::Object(nested_fields) = inner.as_ref() {
210
+
// Generate nested object type for array items
211
+
let nested_type_name = generate_nested_type_name(type_name, &field_name);
212
+
let actual_type_name = generate_nested_object_type(
213
+
&nested_type_name,
214
+
nested_fields,
215
+
type_registry,
216
+
database,
217
+
);
218
+
219
+
if field.is_required {
220
+
TypeRef::named_nn_list(actual_type_name)
221
+
} else {
222
+
TypeRef::named_list(actual_type_name)
223
+
}
224
+
} else {
225
+
// Use standard type ref for arrays of primitives
226
+
graphql_type_to_typeref(&field.field_type, field.is_required)
227
+
}
228
+
}
229
+
_ => {
230
+
// Use standard type ref for other types
231
+
graphql_type_to_typeref(&field.field_type, field.is_required)
232
+
}
233
+
};
234
+
235
+
// Add field with resolver
236
+
object = object.field(Field::new(&field_name_for_field, type_ref, move |ctx| {
237
+
let field_name = field_name.clone();
238
+
let field_type = field_type.clone();
239
+
240
+
FieldFuture::new(async move {
241
+
// Get parent container
242
+
let container = ctx.parent_value.try_downcast_ref::<NestedObjectContainer>()?;
243
+
let value = container.data.get(&field_name);
244
+
245
+
if let Some(val) = value {
246
+
if val.is_null() {
247
+
return Ok(None);
248
+
}
249
+
250
+
// For nested objects, wrap in container
251
+
if matches!(field_type, GraphQLType::Object(_)) {
252
+
let nested_container = NestedObjectContainer {
253
+
data: val.clone(),
254
+
};
255
+
return Ok(Some(FieldValue::owned_any(nested_container)));
256
+
}
257
+
258
+
// For arrays of objects, wrap each item
259
+
if let GraphQLType::Array(inner) = &field_type {
260
+
if matches!(inner.as_ref(), GraphQLType::Object(_)) {
261
+
if let Some(arr) = val.as_array() {
262
+
let containers: Vec<FieldValue> = arr
263
+
.iter()
264
+
.map(|item| {
265
+
let nested_container = NestedObjectContainer {
266
+
data: item.clone(),
267
+
};
268
+
FieldValue::owned_any(nested_container)
269
+
})
270
+
.collect();
271
+
return Ok(Some(FieldValue::list(containers)));
272
+
}
273
+
return Ok(Some(FieldValue::list(Vec::<FieldValue>::new())));
274
+
}
275
+
}
276
+
277
+
// For other types, return the GraphQL value
278
+
let graphql_val = json_to_graphql_value(val);
279
+
Ok(Some(FieldValue::value(graphql_val)))
280
+
} else {
281
+
Ok(None)
282
+
}
283
+
})
284
+
}));
285
+
}
286
+
287
+
// Store the generated type in registry
288
+
type_registry.insert(type_name.to_string(), object);
289
+
type_name.to_string()
290
+
}
291
+
47
292
/// Builds a dynamic GraphQL schema from lexicons for a given slice
48
293
pub async fn build_graphql_schema(database: Database, slice_uri: String, auth_base_url: String) -> Result<Schema, String> {
49
294
// Fetch all lexicons for this slice
···
109
354
}
110
355
}
111
356
357
+
// Initialize type registry for nested object types
358
+
let mut type_registry: TypeRegistry = HashMap::new();
359
+
112
360
// Second pass: create types and queries
113
361
for lexicon in &lexicons {
114
362
// get_lexicons_by_slice returns {lexicon: 1, id: "nsid", defs: {...}}
···
134
382
database.clone(),
135
383
slice_uri.clone(),
136
384
&all_collections,
385
+
auth_base_url.clone(),
386
+
&mut type_registry,
387
+
&lexicons,
388
+
nsid,
137
389
);
138
390
139
391
// Create edge and connection types for this collection (Relay standard)
···
1049
1301
schema_builder = schema_builder.register(mutation_input);
1050
1302
}
1051
1303
1304
+
// Register all nested object types from the type registry
1305
+
for (_, nested_type) in type_registry {
1306
+
schema_builder = schema_builder.register(nested_type);
1307
+
}
1308
+
1052
1309
schema_builder
1053
1310
.finish()
1054
1311
.map_err(|e| format!("Schema build error: {:?}", e))
···
1062
1319
1063
1320
/// Container to hold blob data and DID for URL generation
1064
1321
#[derive(Clone)]
1065
-
struct BlobContainer {
1066
-
blob_ref: String, // CID reference
1067
-
mime_type: String, // MIME type
1068
-
size: i64, // Size in bytes
1069
-
did: String, // DID for CDN URL generation
1322
+
pub struct BlobContainer {
1323
+
pub blob_ref: String, // CID reference
1324
+
pub mime_type: String, // MIME type
1325
+
pub size: i64, // Size in bytes
1326
+
pub did: String, // DID for CDN URL generation
1070
1327
}
1071
1328
1072
1329
/// Creates a GraphQL Object type for a record collection
···
1076
1333
database: Database,
1077
1334
slice_uri: String,
1078
1335
all_collections: &[CollectionMeta],
1336
+
auth_base_url: String,
1337
+
type_registry: &mut TypeRegistry,
1338
+
all_lexicons: &[serde_json::Value],
1339
+
lexicon_nsid: &str,
1079
1340
) -> Object {
1080
1341
let mut object = Object::new(type_name);
1081
1342
···
1219
1480
let field_type = field.field_type.clone();
1220
1481
let db_clone = database.clone();
1221
1482
1222
-
let type_ref = graphql_type_to_typeref(&field.field_type, field.is_required);
1483
+
// Determine type ref - handle nested objects and lexicon refs specially
1484
+
let type_ref = match &field.field_type {
1485
+
GraphQLType::LexiconRef(ref_nsid) => {
1486
+
// Resolve lexicon ref and generate type for it
1487
+
let resolved_type_name = resolve_lexicon_ref_type(
1488
+
ref_nsid,
1489
+
lexicon_nsid,
1490
+
all_lexicons,
1491
+
type_registry,
1492
+
&database,
1493
+
);
1494
+
1495
+
if field.is_required {
1496
+
TypeRef::named_nn(resolved_type_name)
1497
+
} else {
1498
+
TypeRef::named(resolved_type_name)
1499
+
}
1500
+
}
1501
+
GraphQLType::Object(nested_fields) => {
1502
+
// Generate nested object type
1503
+
let nested_type_name = generate_nested_type_name(type_name, &field_name);
1504
+
let actual_type_name = generate_nested_object_type(
1505
+
&nested_type_name,
1506
+
nested_fields,
1507
+
type_registry,
1508
+
&database,
1509
+
);
1510
+
1511
+
if field.is_required {
1512
+
TypeRef::named_nn(actual_type_name)
1513
+
} else {
1514
+
TypeRef::named(actual_type_name)
1515
+
}
1516
+
}
1517
+
GraphQLType::Array(inner) => {
1518
+
match inner.as_ref() {
1519
+
GraphQLType::LexiconRef(ref_nsid) => {
1520
+
// Resolve lexicon ref for array items
1521
+
let resolved_type_name = resolve_lexicon_ref_type(
1522
+
ref_nsid,
1523
+
lexicon_nsid,
1524
+
all_lexicons,
1525
+
type_registry,
1526
+
&database,
1527
+
);
1528
+
1529
+
if field.is_required {
1530
+
TypeRef::named_nn_list(resolved_type_name)
1531
+
} else {
1532
+
TypeRef::named_list(resolved_type_name)
1533
+
}
1534
+
}
1535
+
GraphQLType::Object(nested_fields) => {
1536
+
// Generate nested object type for array items
1537
+
let nested_type_name = generate_nested_type_name(type_name, &field_name);
1538
+
let actual_type_name = generate_nested_object_type(
1539
+
&nested_type_name,
1540
+
nested_fields,
1541
+
type_registry,
1542
+
&database,
1543
+
);
1544
+
1545
+
if field.is_required {
1546
+
TypeRef::named_nn_list(actual_type_name)
1547
+
} else {
1548
+
TypeRef::named_list(actual_type_name)
1549
+
}
1550
+
}
1551
+
_ => graphql_type_to_typeref(&field.field_type, field.is_required),
1552
+
}
1553
+
}
1554
+
_ => graphql_type_to_typeref(&field.field_type, field.is_required),
1555
+
};
1223
1556
1224
1557
object = object.field(Field::new(&field_name_for_field, type_ref, move |ctx| {
1225
1558
let field_name = field_name.clone();
···
1343
1676
}
1344
1677
}
1345
1678
1346
-
// For non-ref fields, return the raw JSON value
1679
+
// Check if this is a lexicon ref field
1680
+
if matches!(field_type, GraphQLType::LexiconRef(_)) {
1681
+
let nested_container = NestedObjectContainer {
1682
+
data: val.clone(),
1683
+
};
1684
+
return Ok(Some(FieldValue::owned_any(nested_container)));
1685
+
}
1686
+
1687
+
// Check if this is a nested object field
1688
+
if matches!(field_type, GraphQLType::Object(_)) {
1689
+
let nested_container = NestedObjectContainer {
1690
+
data: val.clone(),
1691
+
};
1692
+
return Ok(Some(FieldValue::owned_any(nested_container)));
1693
+
}
1694
+
1695
+
// Check if this is an array of nested objects or lexicon refs
1696
+
if let GraphQLType::Array(inner) = &field_type {
1697
+
if matches!(inner.as_ref(), GraphQLType::LexiconRef(_)) || matches!(inner.as_ref(), GraphQLType::Object(_)) {
1698
+
if let Some(arr) = val.as_array() {
1699
+
let containers: Vec<FieldValue> = arr
1700
+
.iter()
1701
+
.map(|item| {
1702
+
let nested_container = NestedObjectContainer {
1703
+
data: item.clone(),
1704
+
};
1705
+
FieldValue::owned_any(nested_container)
1706
+
})
1707
+
.collect();
1708
+
return Ok(Some(FieldValue::list(containers)));
1709
+
}
1710
+
return Ok(Some(FieldValue::list(Vec::<FieldValue>::new())));
1711
+
}
1712
+
}
1713
+
1714
+
// For non-ref, non-object fields, return the raw JSON value
1347
1715
let graphql_val = json_to_graphql_value(val);
1348
1716
Ok(Some(FieldValue::value(graphql_val)))
1349
1717
} else {
···
1834
2202
));
1835
2203
}
1836
2204
1837
-
// Add sparklines and stats fields for NetworkSlicesSlice type
2205
+
// Add sparklines, stats, and oauth clients fields for NetworkSlicesSlice type
1838
2206
if type_name == "NetworkSlicesSlice" {
1839
2207
object = add_sparklines_field_to_slice(object, database.clone());
1840
2208
object = add_stats_field_to_slice(object, database.clone());
2209
+
object = add_oauth_clients_field_to_slice(object, auth_base_url);
1841
2210
}
1842
2211
1843
2212
object
···
1910
2279
// Always nullable since blob data might be missing or malformed
1911
2280
TypeRef::named("Blob")
1912
2281
}
1913
-
GraphQLType::Json | GraphQLType::Ref | GraphQLType::Object(_) | GraphQLType::Union => {
1914
-
// JSON scalar type - linked records and complex objects return as JSON
2282
+
GraphQLType::Json | GraphQLType::Ref | GraphQLType::LexiconRef(_) | GraphQLType::Object(_) | GraphQLType::Union => {
2283
+
// JSON scalar type - linked records, lexicon refs, and complex objects return as JSON (fallback)
1915
2284
if is_required {
1916
2285
TypeRef::named_nn("JSON")
1917
2286
} else {
···
2500
2869
let type_name = nsid_to_type_name(nsid);
2501
2870
2502
2871
// Add create mutation
2503
-
mutation = add_create_mutation(mutation, &type_name, nsid, database.clone(), slice_uri.clone());
2872
+
mutation = add_create_mutation(mutation, &type_name, nsid, &fields, database.clone(), slice_uri.clone());
2504
2873
2505
2874
// Add update mutation
2506
-
mutation = add_update_mutation(mutation, &type_name, nsid, database.clone(), slice_uri.clone());
2875
+
mutation = add_update_mutation(mutation, &type_name, nsid, &fields, database.clone(), slice_uri.clone());
2507
2876
2508
2877
// Add delete mutation
2509
2878
mutation = add_delete_mutation(mutation, &type_name, nsid, database.clone(), slice_uri.clone());
···
3153
3522
input
3154
3523
}
3155
3524
3525
+
/// Transforms fields in record data from GraphQL format to AT Protocol format
3526
+
///
3527
+
/// Blob fields:
3528
+
/// - GraphQL format: `{ref: "bafyrei...", mimeType: "...", size: 123}`
3529
+
/// - AT Protocol format: `{$type: "blob", ref: {$link: "bafyrei..."}, mimeType: "...", size: 123}`
3530
+
///
3531
+
/// Lexicon ref fields:
3532
+
/// - Adds `$type: "{ref_nsid}"` to objects (e.g., `{$type: "community.lexicon.location.hthree#main", ...}`)
3533
+
///
3534
+
/// Nested objects:
3535
+
/// - Recursively processes nested objects and arrays
3536
+
fn transform_fields_for_atproto(
3537
+
mut data: serde_json::Value,
3538
+
fields: &[GraphQLField],
3539
+
) -> serde_json::Value {
3540
+
if let serde_json::Value::Object(ref mut map) = data {
3541
+
for field in fields {
3542
+
if let Some(field_value) = map.get_mut(&field.name) {
3543
+
match &field.field_type {
3544
+
GraphQLType::Blob => {
3545
+
// Transform single blob field
3546
+
if let Some(blob_obj) = field_value.as_object_mut() {
3547
+
// Add $type: "blob"
3548
+
blob_obj.insert("$type".to_string(), serde_json::Value::String("blob".to_string()));
3549
+
3550
+
// Check if ref is a string (GraphQL format)
3551
+
if let Some(serde_json::Value::String(cid)) = blob_obj.get("ref") {
3552
+
// Transform to {$link: "cid"} (AT Protocol format)
3553
+
let link_obj = serde_json::json!({
3554
+
"$link": cid
3555
+
});
3556
+
blob_obj.insert("ref".to_string(), link_obj);
3557
+
}
3558
+
}
3559
+
}
3560
+
GraphQLType::LexiconRef(ref_nsid) => {
3561
+
// Transform lexicon ref field by adding $type
3562
+
if let Some(ref_obj) = field_value.as_object_mut() {
3563
+
ref_obj.insert("$type".to_string(), serde_json::Value::String(ref_nsid.clone()));
3564
+
}
3565
+
}
3566
+
GraphQLType::Object(nested_fields) => {
3567
+
// Recursively transform nested objects
3568
+
*field_value = transform_fields_for_atproto(field_value.clone(), nested_fields);
3569
+
}
3570
+
GraphQLType::Array(inner) => {
3571
+
match inner.as_ref() {
3572
+
GraphQLType::Blob => {
3573
+
// Transform array of blobs
3574
+
if let Some(arr) = field_value.as_array_mut() {
3575
+
for blob_value in arr {
3576
+
if let Some(blob_obj) = blob_value.as_object_mut() {
3577
+
// Add $type: "blob"
3578
+
blob_obj.insert("$type".to_string(), serde_json::Value::String("blob".to_string()));
3579
+
3580
+
if let Some(serde_json::Value::String(cid)) = blob_obj.get("ref") {
3581
+
let link_obj = serde_json::json!({
3582
+
"$link": cid
3583
+
});
3584
+
blob_obj.insert("ref".to_string(), link_obj);
3585
+
}
3586
+
}
3587
+
}
3588
+
}
3589
+
}
3590
+
GraphQLType::LexiconRef(ref_nsid) => {
3591
+
// Transform array of lexicon refs
3592
+
if let Some(arr) = field_value.as_array_mut() {
3593
+
for ref_value in arr {
3594
+
if let Some(ref_obj) = ref_value.as_object_mut() {
3595
+
ref_obj.insert("$type".to_string(), serde_json::Value::String(ref_nsid.clone()));
3596
+
}
3597
+
}
3598
+
}
3599
+
}
3600
+
GraphQLType::Object(nested_fields) => {
3601
+
// Transform array of objects recursively
3602
+
if let Some(arr) = field_value.as_array_mut() {
3603
+
for item in arr {
3604
+
*item = transform_fields_for_atproto(item.clone(), nested_fields);
3605
+
}
3606
+
}
3607
+
}
3608
+
_ => {} // Other array types don't need transformation
3609
+
}
3610
+
}
3611
+
_ => {} // Other field types don't need transformation
3612
+
}
3613
+
}
3614
+
}
3615
+
}
3616
+
3617
+
data
3618
+
}
3619
+
3156
3620
/// Adds a create mutation for a collection
3157
3621
fn add_create_mutation(
3158
3622
mutation: Object,
3159
3623
type_name: &str,
3160
3624
nsid: &str,
3625
+
fields: &[GraphQLField],
3161
3626
database: Database,
3162
3627
slice_uri: String,
3163
3628
) -> Object {
3164
3629
let mutation_name = format!("create{}", type_name);
3165
3630
let nsid = nsid.to_string();
3166
3631
let nsid_clone = nsid.clone();
3632
+
let fields = fields.to_vec();
3167
3633
3168
3634
mutation.field(
3169
3635
Field::new(
···
3173
3639
let db = database.clone();
3174
3640
let slice = slice_uri.clone();
3175
3641
let collection = nsid.clone();
3642
+
let fields = fields.clone();
3176
3643
3177
3644
FieldFuture::new(async move {
3178
3645
// Get GraphQL context which contains auth info
···
3188
3655
.ok_or_else(|| Error::new("Missing input argument"))?;
3189
3656
3190
3657
// Convert GraphQL value to JSON using deserialize
3191
-
let record_data: serde_json::Value = input.deserialize()
3658
+
let mut record_data: serde_json::Value = input.deserialize()
3192
3659
.map_err(|e| Error::new(format!("Failed to deserialize input: {:?}", e)))?;
3660
+
3661
+
// Transform fields from GraphQL to AT Protocol format (adds $type, transforms blob refs)
3662
+
record_data = transform_fields_for_atproto(record_data, &fields);
3193
3663
3194
3664
// Optional rkey argument
3195
3665
let rkey = ctx.args.get("rkey")
···
3318
3788
mutation: Object,
3319
3789
type_name: &str,
3320
3790
nsid: &str,
3791
+
fields: &[GraphQLField],
3321
3792
database: Database,
3322
3793
slice_uri: String,
3323
3794
) -> Object {
3324
3795
let mutation_name = format!("update{}", type_name);
3325
3796
let nsid = nsid.to_string();
3326
3797
let nsid_clone = nsid.clone();
3798
+
let fields = fields.to_vec();
3327
3799
3328
3800
mutation.field(
3329
3801
Field::new(
···
3333
3805
let db = database.clone();
3334
3806
let slice = slice_uri.clone();
3335
3807
let collection = nsid.clone();
3808
+
let fields = fields.clone();
3336
3809
3337
3810
FieldFuture::new(async move {
3338
3811
// Get GraphQL context which contains auth info
···
3354
3827
.ok_or_else(|| Error::new("Missing input argument"))?;
3355
3828
3356
3829
// Convert GraphQL value to JSON using deserialize
3357
-
let record_data: serde_json::Value = input.deserialize()
3830
+
let mut record_data: serde_json::Value = input.deserialize()
3358
3831
.map_err(|e| Error::new(format!("Failed to deserialize input: {:?}", e)))?;
3832
+
3833
+
// Transform fields from GraphQL to AT Protocol format (adds $type, transforms blob refs)
3834
+
record_data = transform_fields_for_atproto(record_data, &fields);
3359
3835
3360
3836
// Verify OAuth token and get user info
3361
3837
let user_info = crate::auth::verify_oauth_token_cached(
+29
-18
api/src/graphql/schema_ext/blob_upload.rs
+29
-18
api/src/graphql/schema_ext/blob_upload.rs
···
1
1
//! GraphQL schema extension for blob uploads
2
2
3
3
use async_graphql::dynamic::{Field, FieldFuture, FieldValue, InputValue, Object, TypeRef};
4
-
use async_graphql::{Error, Value as GraphQLValue};
4
+
use async_graphql::Error;
5
5
use base64::engine::general_purpose;
6
6
use base64::Engine;
7
7
8
8
use crate::atproto_extensions::upload_blob as atproto_upload_blob;
9
9
use crate::auth;
10
-
11
-
/// Container for blob upload response
12
-
#[derive(Clone)]
13
-
struct BlobUploadContainer {
14
-
blob: serde_json::Value,
15
-
}
10
+
use crate::graphql::schema_builder::BlobContainer;
16
11
17
12
/// Creates the BlobUploadResponse GraphQL type
18
13
pub fn create_blob_upload_response_type() -> Object {
19
14
let mut response = Object::new("BlobUploadResponse");
20
15
21
-
response = response.field(Field::new("blob", TypeRef::named_nn("JSON"), |ctx| {
16
+
// Return the Blob type instead of JSON to ensure consistent ref field handling
17
+
response = response.field(Field::new("blob", TypeRef::named_nn("Blob"), |ctx| {
22
18
FieldFuture::new(async move {
23
-
let container = ctx.parent_value.try_downcast_ref::<BlobUploadContainer>()?;
24
-
// Convert serde_json::Value to async_graphql::Value
25
-
let graphql_value: GraphQLValue = serde_json::from_value(container.blob.clone())
26
-
.map_err(|e| async_graphql::Error::new(format!("Failed to convert blob to GraphQL value: {}", e)))?;
27
-
Ok(Some(graphql_value))
19
+
// The BlobContainer is passed through from the mutation resolver
20
+
// The Blob type resolver will handle extracting the fields
21
+
let container = ctx.parent_value.try_downcast_ref::<BlobContainer>()?;
22
+
Ok(Some(FieldValue::owned_any(container.clone())))
28
23
})
29
24
}));
30
25
···
70
65
.decode(data_base64)
71
66
.map_err(|e| Error::new(format!("Invalid base64 data: {}", e)))?;
72
67
68
+
// Verify OAuth token to get user info (needed for DID)
69
+
let user_info = auth::verify_oauth_token_cached(
70
+
token,
71
+
&auth_base,
72
+
gql_ctx.auth_cache.clone(),
73
+
)
74
+
.await
75
+
.map_err(|e| Error::new(format!("Invalid token: {}", e)))?;
76
+
73
77
// Get ATProto DPoP auth and PDS URL for this user
74
78
let (dpop_auth, pds_url) = auth::get_atproto_auth_for_user_cached(
75
79
token,
···
91
95
.await
92
96
.map_err(|e| Error::new(format!("Failed to upload blob: {}", e)))?;
93
97
94
-
// Convert blob to JSON value
95
-
let blob_json = serde_json::to_value(&upload_result.blob)
96
-
.map_err(|e| Error::new(format!("Failed to serialize blob: {}", e)))?;
98
+
// Extract the DID from user info
99
+
let did = user_info.did.unwrap_or(user_info.sub);
100
+
101
+
// Create BlobContainer with flattened ref field (CID string)
102
+
// This ensures the GraphQL Blob type returns ref as a String, not an object
103
+
let blob_container = BlobContainer {
104
+
blob_ref: upload_result.blob.r#ref.link.clone(), // Extract CID from ref.$link
105
+
mime_type: upload_result.blob.mime_type.clone(),
106
+
size: upload_result.blob.size as i64,
107
+
did,
108
+
};
97
109
98
-
let container = BlobUploadContainer { blob: blob_json };
99
-
Ok(Some(FieldValue::owned_any(container)))
110
+
Ok(Some(FieldValue::owned_any(blob_container)))
100
111
})
101
112
},
102
113
)
+1
api/src/graphql/schema_ext/mod.rs
+1
api/src/graphql/schema_ext/mod.rs
+107
api/src/graphql/schema_ext/oauth.rs
+107
api/src/graphql/schema_ext/oauth.rs
···
156
156
oauth_client
157
157
}
158
158
159
+
/// Add oauthClients field to NetworkSlicesSlice type
160
+
pub fn add_oauth_clients_field_to_slice(
161
+
object: Object,
162
+
auth_base_url: String,
163
+
) -> Object {
164
+
use crate::graphql::schema_builder::RecordContainer;
165
+
166
+
let base_url_for_oauth = auth_base_url.clone();
167
+
168
+
object.field(
169
+
Field::new(
170
+
"oauthClients",
171
+
TypeRef::named_nn_list_nn("OAuthClient"),
172
+
move |ctx| {
173
+
let base_url = base_url_for_oauth.clone();
174
+
175
+
FieldFuture::new(async move {
176
+
let container = ctx.parent_value.try_downcast_ref::<RecordContainer>()?;
177
+
let slice_uri = &container.record.uri;
178
+
179
+
// Get pool from context and create database instance
180
+
let pool = ctx.data::<sqlx::PgPool>()
181
+
.map_err(|_| Error::new("Database pool not found in context"))?;
182
+
let database = crate::database::Database::new(pool.clone());
183
+
184
+
// Fetch OAuth clients from database
185
+
let clients = database
186
+
.get_oauth_clients_for_slice(slice_uri)
187
+
.await
188
+
.map_err(|e| Error::new(format!("Failed to fetch OAuth clients: {}", e)))?;
189
+
190
+
if clients.is_empty() {
191
+
return Ok(Some(FieldValue::list(Vec::<FieldValue<'_>>::new())));
192
+
}
193
+
194
+
// Fetch details from AIP server
195
+
let http_client = Client::new();
196
+
let mut client_data_list = Vec::new();
197
+
198
+
for oauth_client in clients {
199
+
let aip_url = format!("{}/oauth/clients/{}", base_url, oauth_client.client_id);
200
+
let mut request_builder = http_client.get(&aip_url);
201
+
202
+
if let Some(token) = &oauth_client.registration_access_token {
203
+
request_builder = request_builder.bearer_auth(token);
204
+
}
205
+
206
+
match request_builder.send().await {
207
+
Ok(response) if response.status().is_success() => {
208
+
if let Ok(response_text) = response.text().await {
209
+
if let Ok(aip_client) = serde_json::from_str::<AipClientResponse>(&response_text) {
210
+
client_data_list.push(OAuthClientData {
211
+
client_id: aip_client.client_id,
212
+
client_secret: aip_client.client_secret,
213
+
client_name: aip_client.client_name,
214
+
redirect_uris: aip_client.redirect_uris,
215
+
grant_types: aip_client.grant_types,
216
+
response_types: aip_client.response_types,
217
+
scope: aip_client.scope,
218
+
client_uri: aip_client.client_uri,
219
+
logo_uri: aip_client.logo_uri,
220
+
tos_uri: aip_client.tos_uri,
221
+
policy_uri: aip_client.policy_uri,
222
+
created_at: oauth_client.created_at,
223
+
created_by_did: oauth_client.created_by_did,
224
+
});
225
+
}
226
+
}
227
+
}
228
+
_ => {
229
+
// Fallback for clients we can't fetch details for
230
+
client_data_list.push(OAuthClientData {
231
+
client_id: oauth_client.client_id,
232
+
client_secret: None,
233
+
client_name: "Unknown".to_string(),
234
+
redirect_uris: vec![],
235
+
grant_types: vec!["authorization_code".to_string()],
236
+
response_types: vec!["code".to_string()],
237
+
scope: None,
238
+
client_uri: None,
239
+
logo_uri: None,
240
+
tos_uri: None,
241
+
policy_uri: None,
242
+
created_at: oauth_client.created_at,
243
+
created_by_did: oauth_client.created_by_did,
244
+
});
245
+
}
246
+
}
247
+
}
248
+
249
+
// Convert to GraphQL values
250
+
let field_values: Vec<FieldValue<'_>> = client_data_list
251
+
.into_iter()
252
+
.map(|client_data| {
253
+
let container = OAuthClientContainer { client: client_data };
254
+
FieldValue::owned_any(container)
255
+
})
256
+
.collect();
257
+
258
+
Ok(Some(FieldValue::list(field_values)))
259
+
})
260
+
},
261
+
)
262
+
.description("Get all OAuth clients for this slice")
263
+
)
264
+
}
265
+
159
266
/// Add oauthClients query to the Query type
160
267
pub fn add_oauth_clients_query(query: Object, slice_uri: String, auth_base_url: String) -> Object {
161
268
query.field(
+143
-4
api/src/graphql/schema_ext/sync.rs
+143
-4
api/src/graphql/schema_ext/sync.rs
···
10
10
use uuid::Uuid;
11
11
use base64::engine::general_purpose;
12
12
use base64::Engine;
13
+
use redis::aio::ConnectionManager;
14
+
use redis::{Client, AsyncCommands};
15
+
use futures_util::StreamExt;
13
16
14
17
/// Global broadcast channel for sync job status updates
15
18
/// This allows real-time job status streaming to GraphQL subscriptions
16
19
static JOB_CHANNEL: OnceLock<Arc<Mutex<broadcast::Sender<JobStatus>>>> = OnceLock::new();
20
+
21
+
/// Global Redis client for cross-process pub/sub (optional)
22
+
static REDIS_CLIENT: OnceLock<Option<Client>> = OnceLock::new();
17
23
18
24
/// Initialize or get the global job channel
19
25
fn get_job_channel() -> Arc<Mutex<broadcast::Sender<JobStatus>>> {
···
27
33
28
34
/// Publish a sync job status update to subscribers
29
35
pub async fn publish_sync_job_update(job_status: JobStatus) {
36
+
// Publish to in-memory broadcast channel (for same-process subscribers)
30
37
let sender = get_job_channel();
31
38
let sender_lock = sender.lock().await;
32
-
let _ = sender_lock.send(job_status); // Ignore errors if no subscribers
39
+
let _ = sender_lock.send(job_status.clone()); // Ignore errors if no subscribers
40
+
drop(sender_lock);
41
+
42
+
// Also publish to Redis for cross-process communication (if Redis is configured)
43
+
if let Some(Some(client)) = REDIS_CLIENT.get() {
44
+
if let Err(e) = publish_to_redis(client, &job_status).await {
45
+
tracing::warn!("Failed to publish job status to Redis: {}", e);
46
+
}
47
+
}
48
+
}
49
+
50
+
/// Publish job status to Redis for cross-process communication
51
+
async fn publish_to_redis(client: &Client, job_status: &JobStatus) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
52
+
let mut conn = ConnectionManager::new(client.clone()).await?;
53
+
let payload = serde_json::to_string(job_status)?;
54
+
let _: () = conn.publish("sync_job_updates", payload).await?;
55
+
Ok(())
33
56
}
34
57
35
58
/// Container for JobStatus to implement Any trait for GraphQL
···
226
249
FieldFuture::new(async move {
227
250
let container = ctx.parent_value.try_downcast_ref::<JobStatusContainer>()?;
228
251
Ok(Some(GraphQLValue::from(container.status.job_id.to_string())))
252
+
})
253
+
}));
254
+
255
+
job = job.field(Field::new("sliceUri", TypeRef::named_nn(TypeRef::STRING), |ctx| {
256
+
FieldFuture::new(async move {
257
+
let container = ctx.parent_value.try_downcast_ref::<JobStatusContainer>()?;
258
+
Ok(Some(GraphQLValue::from(container.status.slice_uri.clone())))
229
259
})
230
260
}));
231
261
···
711
741
let mut receiver = sender_lock.subscribe();
712
742
drop(sender_lock); // Release lock
713
743
744
+
// Get optional slice filter from arguments
745
+
let slice_filter: Option<String> = ctx.args.get("slice")
746
+
.and_then(|val| val.string().ok())
747
+
.map(|s| s.to_string());
748
+
714
749
let stream = async_stream::stream! {
715
750
while let Ok(job_status) = receiver.recv().await {
716
751
// Filter by job_id if provided
···
720
755
}
721
756
}
722
757
723
-
// Filter by slice_uri if provided (need to query for slice_uri)
724
-
// For now, skip slice filtering since JobStatus doesn't include slice_uri
725
-
// TODO: Add slice_uri to JobStatus or query it separately
758
+
// Filter by slice_uri if provided
759
+
if let Some(ref filter_slice) = slice_filter {
760
+
if &job_status.slice_uri != filter_slice {
761
+
continue;
762
+
}
763
+
}
726
764
727
765
// Convert to GraphQL value and yield
728
766
let container = JobStatusContainer { status: job_status };
···
822
860
.description("Delete a sync job from the database")
823
861
)
824
862
}
863
+
864
+
/// Initialize Redis pub/sub for sync job updates
865
+
///
866
+
/// This function should be called once at application startup.
867
+
/// It initializes the Redis client and starts a background task to listen for
868
+
/// job updates from other processes (e.g., worker processes).
869
+
///
870
+
/// # Arguments
871
+
/// * `redis_url` - Optional Redis connection URL. If None, Redis pub/sub is disabled.
872
+
pub fn initialize_redis_pubsub(redis_url: Option<String>) {
873
+
// Initialize Redis client (or None if not configured)
874
+
let client = redis_url.and_then(|url| {
875
+
match Client::open(url.as_str()) {
876
+
Ok(client) => {
877
+
tracing::info!("Initialized Redis client for sync job pub/sub");
878
+
Some(client)
879
+
}
880
+
Err(e) => {
881
+
tracing::error!("Failed to create Redis client for sync job pub/sub: {}", e);
882
+
None
883
+
}
884
+
}
885
+
});
886
+
887
+
let has_redis = client.is_some();
888
+
REDIS_CLIENT.get_or_init(|| client);
889
+
890
+
// Start Redis subscription listener task if Redis is available
891
+
if has_redis {
892
+
start_redis_listener();
893
+
} else {
894
+
tracing::info!("Redis not configured - sync job updates will use in-memory broadcast only");
895
+
}
896
+
}
897
+
898
+
/// Start a background task that subscribes to Redis and forwards messages to the in-memory broadcast channel
899
+
fn start_redis_listener() {
900
+
tokio::spawn(async {
901
+
tracing::info!("Starting Redis subscription listener for sync job updates");
902
+
903
+
loop {
904
+
// Get Redis client
905
+
let client = match REDIS_CLIENT.get() {
906
+
Some(Some(client)) => client,
907
+
_ => {
908
+
tracing::error!("Redis client not available for subscription");
909
+
return;
910
+
}
911
+
};
912
+
913
+
// Connect and subscribe
914
+
match subscribe_to_redis(client).await {
915
+
Ok(_) => {
916
+
tracing::warn!("Redis subscription ended, reconnecting in 5 seconds...");
917
+
}
918
+
Err(e) => {
919
+
tracing::error!("Redis subscription error: {}, reconnecting in 5 seconds...", e);
920
+
}
921
+
}
922
+
923
+
// Wait before reconnecting
924
+
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
925
+
}
926
+
});
927
+
}
928
+
929
+
/// Subscribe to Redis channel and forward messages to in-memory broadcast
930
+
async fn subscribe_to_redis(client: &Client) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
931
+
// Create a pub/sub connection from the client
932
+
let mut pubsub = client.get_async_pubsub().await?;
933
+
934
+
pubsub.subscribe("sync_job_updates").await?;
935
+
tracing::info!("Subscribed to Redis channel: sync_job_updates");
936
+
937
+
// Get the in-memory broadcast sender
938
+
let sender = get_job_channel();
939
+
940
+
loop {
941
+
let msg = pubsub.on_message().next().await;
942
+
if let Some(msg) = msg {
943
+
let payload: String = msg.get_payload()?;
944
+
945
+
// Deserialize JobStatus from JSON
946
+
match serde_json::from_str::<JobStatus>(&payload) {
947
+
Ok(job_status) => {
948
+
// Forward to in-memory broadcast channel
949
+
let sender_lock = sender.lock().await;
950
+
if let Err(e) = sender_lock.send(job_status.clone()) {
951
+
tracing::debug!("No local subscribers for job update: {}", e);
952
+
}
953
+
drop(sender_lock);
954
+
955
+
tracing::debug!("Forwarded job update from Redis: job_id={}", job_status.job_id);
956
+
}
957
+
Err(e) => {
958
+
tracing::warn!("Failed to deserialize job status from Redis: {}", e);
959
+
}
960
+
}
961
+
}
962
+
}
963
+
}
+6
-1
api/src/graphql/types.rs
+6
-1
api/src/graphql/types.rs
···
20
20
Float,
21
21
/// Reference to another record (for strongRef)
22
22
Ref,
23
+
/// Reference to a lexicon type definition (e.g., community.lexicon.location.hthree)
24
+
LexiconRef(String),
23
25
/// Array of a type
24
26
Array(Box<GraphQLType>),
25
27
/// Object with nested fields
···
45
47
"unknown" => GraphQLType::Json,
46
48
"null" => GraphQLType::Json,
47
49
"ref" => {
48
-
// Check if this is a strongRef (link to another record)
50
+
// Check if this is a strongRef (link to another record) or a lexicon type ref
49
51
let ref_name = lexicon_def
50
52
.get("ref")
51
53
.and_then(|r| r.as_str())
···
53
55
54
56
if ref_name == "com.atproto.repo.strongRef" {
55
57
GraphQLType::Ref
58
+
} else if !ref_name.is_empty() {
59
+
// This is a reference to a lexicon type definition
60
+
GraphQLType::LexiconRef(ref_name.to_string())
56
61
} else {
57
62
GraphQLType::Json
58
63
}
+19
api/src/jobs.rs
+19
api/src/jobs.rs
···
148
148
// Publish job running status to subscribers
149
149
let running_status = JobStatus {
150
150
job_id: payload.job_id,
151
+
slice_uri: payload.slice_uri.clone(),
151
152
status: "running".to_string(),
152
153
created_at: now,
153
154
started_at: Some(now),
···
260
261
// Publish job status update to GraphQL subscribers
261
262
let job_status = JobStatus {
262
263
job_id: payload.job_id,
264
+
slice_uri: payload.slice_uri.clone(),
263
265
status: "completed".to_string(),
264
266
created_at: chrono::Utc::now(),
265
267
started_at: Some(chrono::Utc::now()),
···
337
339
// Publish job status update to GraphQL subscribers
338
340
let job_status = JobStatus {
339
341
job_id: payload.job_id,
342
+
slice_uri: payload.slice_uri.clone(),
340
343
status: "failed".to_string(),
341
344
created_at: chrono::Utc::now(),
342
345
started_at: Some(chrono::Utc::now()),
···
536
539
// Publish job creation event to subscribers
537
540
let job_status = JobStatus {
538
541
job_id,
542
+
slice_uri: slice_uri.clone(),
539
543
status: "pending".to_string(),
540
544
created_at: chrono::Utc::now(),
541
545
started_at: None,
···
559
563
pub struct JobStatus {
560
564
/// Unique identifier for the job
561
565
pub job_id: Uuid,
566
+
/// Slice URI this job belongs to
567
+
pub slice_uri: String,
562
568
/// Current status: "pending", "running", "completed", or "failed"
563
569
pub status: String,
564
570
/// Timestamp when job was enqueued
···
611
617
612
618
return Ok(Some(JobStatus {
613
619
job_id,
620
+
slice_uri: result.slice_uri,
614
621
status: result.status,
615
622
created_at: result.created_at,
616
623
started_at: Some(result.created_at),
···
647
654
648
655
match queue_row {
649
656
Some(row) => {
657
+
// Extract slice_uri from payload JSON
658
+
let slice_uri = row.payload_json
659
+
.as_ref()
660
+
.and_then(|json| json.get("slice_uri"))
661
+
.and_then(|v| v.as_str())
662
+
.unwrap_or_default()
663
+
.to_string();
664
+
650
665
// Determine status based on attempt_at timestamp
651
666
let status = if row.attempt_at.is_none() {
652
667
"completed".to_string()
···
662
677
663
678
Ok(Some(JobStatus {
664
679
job_id,
680
+
slice_uri,
665
681
status: status.clone(),
666
682
created_at: row.created_at.unwrap_or_else(chrono::Utc::now),
667
683
started_at: if status == "running" || status == "completed" {
···
790
806
791
807
results.push(JobStatus {
792
808
job_id: row.job_id.unwrap_or_else(Uuid::new_v4),
809
+
slice_uri: row.slice_uri.clone().unwrap_or_default(),
793
810
status: row.status.unwrap_or_default(),
794
811
created_at: row.created_at.unwrap_or_else(chrono::Utc::now),
795
812
started_at: row.created_at,
···
902
919
903
920
results.push(JobStatus {
904
921
job_id: row.job_id.unwrap_or_else(Uuid::new_v4),
922
+
slice_uri: row.slice_uri.clone().unwrap_or_default(),
905
923
status: row.status.unwrap_or_default(),
906
924
created_at: row.created_at.unwrap_or_else(chrono::Utc::now),
907
925
started_at: row.created_at,
···
1055
1073
// Publish job status update for subscribers
1056
1074
let job_status = JobStatus {
1057
1075
job_id,
1076
+
slice_uri,
1058
1077
status: "cancelled".to_string(),
1059
1078
created_at: now,
1060
1079
started_at: None,
+10
-2
api/src/main.rs
+10
-2
api/src/main.rs
···
109
109
// Start GraphQL PubSub cleanup task
110
110
graphql::pubsub::start_cleanup_task();
111
111
112
+
// Initialize Redis pub/sub for cross-process sync job updates
113
+
let redis_url = env::var("REDIS_URL").ok();
114
+
graphql::schema_ext::sync::initialize_redis_pubsub(redis_url);
115
+
112
116
// Detect process type from environment (supports both PROCESS_TYPE and FLY_PROCESS_GROUP)
113
117
let process_type = env::var("PROCESS_TYPE")
114
118
.or_else(|_| env::var("FLY_PROCESS_GROUP"))
···
183
187
if now.duration_since(window_start) >= RECONNECT_WINDOW {
184
188
reconnect_count = 0;
185
189
window_start = now;
190
+
retry_delay = tokio::time::Duration::from_secs(5); // Reset delay after window passes
186
191
}
187
192
188
193
// Check rate limit
···
198
203
}
199
204
200
205
reconnect_count += 1;
206
+
tracing::info!("Jetstream connection attempt #{} (retry delay: {:?})", reconnect_count, retry_delay);
201
207
202
208
// Read cursor position from database
203
209
let initial_cursor =
···
261
267
let cancellation_token = atproto_jetstream::CancellationToken::new();
262
268
match consumer_arc.start_consuming(cancellation_token).await {
263
269
Ok(_) => {
264
-
tracing::info!("Jetstream consumer shut down normally");
270
+
tracing::info!("Jetstream consumer shut down normally - reconnecting in {:?}", retry_delay);
265
271
jetstream_connected_clone
266
272
.store(false, std::sync::atomic::Ordering::Relaxed);
273
+
tokio::time::sleep(retry_delay).await;
274
+
retry_delay = std::cmp::min(retry_delay * 2, MAX_RETRY_DELAY);
267
275
}
268
276
Err(e) => {
269
-
tracing::error!("Jetstream consumer failed: {} - will reconnect", e);
277
+
tracing::error!("Jetstream consumer failed: {} - reconnecting in {:?}", e, retry_delay);
270
278
jetstream_connected_clone
271
279
.store(false, std::sync::atomic::Ordering::Relaxed);
272
280
tokio::time::sleep(retry_delay).await;
+24
-11
crates/slices-lexicon/src/validation/primitive/string.rs
+24
-11
crates/slices-lexicon/src/validation/primitive/string.rs
···
577
577
578
578
/// Validates TID (Timestamp Identifier) format
579
579
///
580
-
/// TID format: 13-character base32-encoded timestamp + random bits
581
-
/// Uses Crockford base32 alphabet: 0123456789ABCDEFGHJKMNPQRSTVWXYZ (case-insensitive)
580
+
/// TID format: 13-character base32-sortable encoded timestamp + random bits
581
+
/// Uses ATProto base32-sortable alphabet: 234567abcdefghijklmnopqrstuvwxyz (lowercase only)
582
582
pub fn is_valid_tid(&self, value: &str) -> bool {
583
583
use regex::Regex;
584
584
···
586
586
return false;
587
587
}
588
588
589
-
// TID uses Crockford base32 (case-insensitive, excludes I, L, O, U)
590
-
let tid_regex = Regex::new(r"^[0-9A-HJKMNP-TV-Z]{13}$").unwrap();
591
-
let uppercase_value = value.to_uppercase();
589
+
// TID uses base32-sortable (s32) - lowercase only
590
+
// First character must be from limited set (ensures top bit is 0)
591
+
// Remaining 12 characters from full base32-sortable alphabet
592
+
let tid_regex = Regex::new(r"^[234567abcdefghij][234567abcdefghijklmnopqrstuvwxyz]{12}$").unwrap();
592
593
593
-
tid_regex.is_match(&uppercase_value)
594
+
tid_regex.is_match(value)
594
595
}
595
596
596
597
/// Validates Record Key format
···
1096
1097
1097
1098
let validator = StringValidator;
1098
1099
1099
-
// Valid TIDs (13 characters, Crockford base32)
1100
-
assert!(validator.validate_data(&json!("3JZFKJT0000ZZ"), &schema, &ctx).is_ok());
1101
-
assert!(validator.validate_data(&json!("3jzfkjt0000zz"), &schema, &ctx).is_ok()); // case insensitive
1100
+
// Valid TIDs (base32-sortable, 13 chars, lowercase)
1101
+
assert!(validator.validate_data(&json!("3m3zm7eurxk26"), &schema, &ctx).is_ok());
1102
+
assert!(validator.validate_data(&json!("2222222222222"), &schema, &ctx).is_ok()); // minimum TID
1103
+
assert!(validator.validate_data(&json!("a222222222222"), &schema, &ctx).is_ok()); // leading 'a' (lower bound)
1104
+
assert!(validator.validate_data(&json!("j234567abcdef"), &schema, &ctx).is_ok()); // leading 'j' (upper bound)
1105
+
1102
1106
1103
-
// Invalid TIDs
1107
+
// Invalid TIDs - uppercase not allowed (charset is lowercase only)
1108
+
assert!(validator.validate_data(&json!("3m3zM7eurxk26"), &schema, &ctx).is_err()); // mixed case
1109
+
1110
+
// Invalid TIDs - wrong length
1104
1111
assert!(validator.validate_data(&json!("too-short"), &schema, &ctx).is_err());
1105
1112
assert!(validator.validate_data(&json!("too-long-string"), &schema, &ctx).is_err());
1113
+
1114
+
// Invalid TIDs - invalid characters (hyphen/punct rejected; digits 0,1,8,9 not allowed)
1106
1115
assert!(validator.validate_data(&json!("invalid-chars!"), &schema, &ctx).is_err());
1107
-
assert!(validator.validate_data(&json!("invalid-ILOU0"), &schema, &ctx).is_err()); // invalid chars (I, L, O, U)
1116
+
assert!(validator.validate_data(&json!("xyz1234567890"), &schema, &ctx).is_err()); // has 0,1,8,9
1117
+
1118
+
// Invalid TIDs - first character must be one of 234567abcdefghij
1119
+
assert!(validator.validate_data(&json!("k222222222222"), &schema, &ctx).is_err()); // leading 'k' forbidden
1120
+
assert!(validator.validate_data(&json!("z234567abcdef"), &schema, &ctx).is_err()); // leading 'z' forbidden
1108
1121
}
1109
1122
1110
1123
#[test]
+146
-3
deno.lock
+146
-3
deno.lock
···
22
22
"jsr:@std/path@^1.1.1": "1.1.2",
23
23
"jsr:@std/streams@^1.0.10": "1.0.12",
24
24
"npm:@deno/vite-plugin@^1.0.5": "1.0.5_vite@7.1.10__@types+node@24.7.2__picomatch@4.0.3_@types+node@24.7.2_@types+node@24.2.0",
25
+
"npm:@libsql/client@0.6.0": "0.6.0",
25
26
"npm:@shikijs/core@^3.7.0": "3.13.0",
26
27
"npm:@shikijs/engine-oniguruma@^3.7.0": "3.13.0",
27
28
"npm:@shikijs/types@^3.7.0": "3.13.0",
···
483
484
"@jridgewell/sourcemap-codec"
484
485
]
485
486
},
487
+
"@libsql/client@0.6.0": {
488
+
"integrity": "sha512-qhQzTG/y2IEVbL3+9PULDvlQFWJ/RnjFXECr/Nc3nRngGiiMysDaOV5VUzYk7DulUX98EA4wi+z3FspKrUplUA==",
489
+
"dependencies": [
490
+
"@libsql/core",
491
+
"@libsql/hrana-client",
492
+
"js-base64",
493
+
"libsql"
494
+
]
495
+
},
496
+
"@libsql/core@0.6.2": {
497
+
"integrity": "sha512-c2P4M+4u/4b2L02A0KjggO3UW51rGkhxr/7fzJO0fEAqsqrWGxuNj2YtRkina/oxfYvAof6xjp8RucNoIV/Odw==",
498
+
"dependencies": [
499
+
"js-base64"
500
+
]
501
+
},
502
+
"@libsql/darwin-arm64@0.3.19": {
503
+
"integrity": "sha512-rmOqsLcDI65zzxlUOoEiPJLhqmbFsZF6p4UJQ2kMqB+Kc0Rt5/A1OAdOZ/Wo8fQfJWjR1IbkbpEINFioyKf+nQ==",
504
+
"os": ["darwin"],
505
+
"cpu": ["arm64"]
506
+
},
507
+
"@libsql/darwin-x64@0.3.19": {
508
+
"integrity": "sha512-q9O55B646zU+644SMmOQL3FIfpmEvdWpRpzubwFc2trsa+zoBlSkHuzU9v/C+UNoPHQVRMP7KQctJ455I/h/xw==",
509
+
"os": ["darwin"],
510
+
"cpu": ["x64"]
511
+
},
512
+
"@libsql/hrana-client@0.6.2": {
513
+
"integrity": "sha512-MWxgD7mXLNf9FXXiM0bc90wCjZSpErWKr5mGza7ERy2FJNNMXd7JIOv+DepBA1FQTIfI8TFO4/QDYgaQC0goNw==",
514
+
"dependencies": [
515
+
"@libsql/isomorphic-fetch",
516
+
"@libsql/isomorphic-ws",
517
+
"js-base64",
518
+
"node-fetch@3.3.2"
519
+
]
520
+
},
521
+
"@libsql/isomorphic-fetch@0.2.5": {
522
+
"integrity": "sha512-8s/B2TClEHms2yb+JGpsVRTPBfy1ih/Pq6h6gvyaNcYnMVJvgQRY7wAa8U2nD0dppbCuDU5evTNMEhrQ17ZKKg=="
523
+
},
524
+
"@libsql/isomorphic-ws@0.1.5": {
525
+
"integrity": "sha512-DtLWIH29onUYR00i0GlQ3UdcTRC6EP4u9w/h9LxpUZJWRMARk6dQwZ6Jkd+QdwVpuAOrdxt18v0K2uIYR3fwFg==",
526
+
"dependencies": [
527
+
"@types/ws",
528
+
"ws"
529
+
]
530
+
},
531
+
"@libsql/linux-arm64-gnu@0.3.19": {
532
+
"integrity": "sha512-mgeAUU1oqqh57k7I3cQyU6Trpdsdt607eFyEmH5QO7dv303ti+LjUvh1pp21QWV6WX7wZyjeJV1/VzEImB+jRg==",
533
+
"os": ["linux"],
534
+
"cpu": ["arm64"]
535
+
},
536
+
"@libsql/linux-arm64-musl@0.3.19": {
537
+
"integrity": "sha512-VEZtxghyK6zwGzU9PHohvNxthruSxBEnRrX7BSL5jQ62tN4n2JNepJ6SdzXp70pdzTfwroOj/eMwiPt94gkVRg==",
538
+
"os": ["linux"],
539
+
"cpu": ["arm64"]
540
+
},
541
+
"@libsql/linux-x64-gnu@0.3.19": {
542
+
"integrity": "sha512-2t/J7LD5w2f63wGihEO+0GxfTyYIyLGEvTFEsMO16XI5o7IS9vcSHrxsvAJs4w2Pf907uDjmc7fUfMg6L82BrQ==",
543
+
"os": ["linux"],
544
+
"cpu": ["x64"]
545
+
},
546
+
"@libsql/linux-x64-musl@0.3.19": {
547
+
"integrity": "sha512-BLsXyJaL8gZD8+3W2LU08lDEd9MIgGds0yPy5iNPp8tfhXx3pV/Fge2GErN0FC+nzt4DYQtjL+A9GUMglQefXQ==",
548
+
"os": ["linux"],
549
+
"cpu": ["x64"]
550
+
},
551
+
"@libsql/win32-x64-msvc@0.3.19": {
552
+
"integrity": "sha512-ay1X9AobE4BpzG0XPw1gplyLZPGHIgJOovvW23gUrukRegiUP62uzhpRbKNogLlUOynyXeq//prHgPXiebUfWg==",
553
+
"os": ["win32"],
554
+
"cpu": ["x64"]
555
+
},
486
556
"@napi-rs/wasm-runtime@1.0.7": {
487
557
"integrity": "sha512-SeDnOO0Tk7Okiq6DbXmmBODgOAb9dp9gjlphokTUxmt8U3liIP1ZsozBahH69j/RJv+Rfs6IwUKHTgQYJ/HBAw==",
488
558
"dependencies": [
···
490
560
"@emnapi/runtime",
491
561
"@tybys/wasm-util"
492
562
]
563
+
},
564
+
"@neon-rs/load@0.0.4": {
565
+
"integrity": "sha512-kTPhdZyTQxB+2wpiRcFWrDcejc4JI6tkPuS7UZCG4l6Zvc5kU/gGQ/ozvHTh1XR5tS+UlfAfGuPajjzQjCiHCw=="
493
566
},
494
567
"@nodelib/fs.scandir@2.1.5": {
495
568
"integrity": "sha512-vq24Bq3ym5HEQm2NKCr3yXDwjc7vTsEThRDnkp2DK9p1uqLR+DHurm/NOTo0KG7HYHU7eppKZj3MyqYuMBf62g==",
···
749
822
"@tailwindcss/oxide@4.1.14": {
750
823
"integrity": "sha512-23yx+VUbBwCg2x5XWdB8+1lkPajzLmALEfMb51zZUBYaYVPDQvBSD/WYDqiVyBIo2BZFa3yw1Rpy3G2Jp+K0dw==",
751
824
"dependencies": [
752
-
"detect-libc",
825
+
"detect-libc@2.1.2",
753
826
"tar"
754
827
],
755
828
"optionalDependencies": [
···
982
1055
"@types/unist@3.0.3": {
983
1056
"integrity": "sha512-ko/gIFJRv177XgZsZcBwnqJN5x/Gien8qNOn0D5bQU/zAzVf9Zt3BlcUiLqhV9y4ARk0GbT3tnUiPNgnTXzc/Q=="
984
1057
},
1058
+
"@types/ws@8.18.1": {
1059
+
"integrity": "sha512-ThVF6DCVhA8kUGy+aazFQ4kXQ7E1Ty7A3ypFOe0IcJV8O/M511G99AW24irKrW56Wt44yG9+ij8FaqoBGkuBXg==",
1060
+
"dependencies": [
1061
+
"@types/node@24.2.0"
1062
+
]
1063
+
},
985
1064
"@ungap/structured-clone@1.3.0": {
986
1065
"integrity": "sha512-WmoN8qaIAo7WTYWbAZuG8PYEhn5fkz7dZrqTBZ7dtt//lL2Gwms1IcnQ5yHqjDfX8Ft5j4YzDM23f87zBfDe9g=="
987
1066
},
···
1138
1217
"cross-fetch@3.2.0": {
1139
1218
"integrity": "sha512-Q+xVJLoGOeIMXZmbUK4HYk+69cQH6LudR0Vu/pRm2YlU/hDV9CiS0gKUMaWY5f2NeUH9C1nV3bsTlCo0FsTV1Q==",
1140
1219
"dependencies": [
1141
-
"node-fetch"
1220
+
"node-fetch@2.7.0"
1142
1221
]
1143
1222
},
1144
1223
"csstype@3.1.3": {
1145
1224
"integrity": "sha512-M1uQkMl8rQK/szD0LNhtqxIPLpimGm8sOBwU7lLnCpSbTyY3yeU1Vc7l4KT5zT4s/yOxHH5O7tIuuLOCnLADRw=="
1146
1225
},
1226
+
"data-uri-to-buffer@4.0.1": {
1227
+
"integrity": "sha512-0R9ikRb668HB7QDxT1vkpuUBtqc53YyAwMwGeUFKRojY/NWKvdZ+9UYtRfGmhqNbRkTSVpMbmyhXipFFv2cb/A=="
1228
+
},
1147
1229
"debug@4.4.3": {
1148
1230
"integrity": "sha512-RGwwWnwQvkVfavKVt22FGLw+xYSdzARwm0ru6DhTVA3umU5hZc28V3kO4stgYryrTlLpuvgI9GiijltAjNbcqA==",
1149
1231
"dependencies": [
···
1152
1234
},
1153
1235
"dequal@2.0.3": {
1154
1236
"integrity": "sha512-0je+qPKHEMohvfRTCEo3CrPG6cAzAYgmzKyxRiYSSDkS6eGJdyVJm7WaYA5ECaAD9wLB2T4EEeymA5aFVcYXCA=="
1237
+
},
1238
+
"detect-libc@2.0.2": {
1239
+
"integrity": "sha512-UX6sGumvvqSaXgdKGUsgZWqcUyIXZ/vZTrlRT/iobiKhGL0zL4d3osHj3uqllWJK+i+sixDS/3COVEOFbupFyw=="
1155
1240
},
1156
1241
"detect-libc@2.1.2": {
1157
1242
"integrity": "sha512-Btj2BOOO83o3WyH59e8MgXsxEQVcarkUOpEYrubB0urwnN10yQ364rsiByU11nZlqWYZm05i/of7io4mzihBtQ=="
···
1258
1343
"picomatch@4.0.3"
1259
1344
]
1260
1345
},
1346
+
"fetch-blob@3.2.0": {
1347
+
"integrity": "sha512-7yAQpD2UMJzLi1Dqv7qFYnPbaPx7ZfFK6PiIxQ4PfkGPyNyl2Ugx+a/umUonmKqjhM4DnfbMvdX6otXq83soQQ==",
1348
+
"dependencies": [
1349
+
"node-domexception",
1350
+
"web-streams-polyfill"
1351
+
]
1352
+
},
1261
1353
"fill-range@7.1.1": {
1262
1354
"integrity": "sha512-YsGpe3WHLK8ZYi4tWDg2Jy3ebRz2rXowDxnld4bkQB00cc/1Zw9AWnC0i9ztDJitivtQvaI9KaLyKrc+hBW0yg==",
1263
1355
"dependencies": [
1264
1356
"to-regex-range"
1357
+
]
1358
+
},
1359
+
"formdata-polyfill@4.0.10": {
1360
+
"integrity": "sha512-buewHzMvYL29jdeQTVILecSaZKnt/RJWjoZCF5OW60Z67/GmSLBkOFM7qh1PI3zFNtJbaZL5eQu1vLfazOwj4g==",
1361
+
"dependencies": [
1362
+
"fetch-blob"
1265
1363
]
1266
1364
},
1267
1365
"fraction.js@4.3.7": {
···
1381
1479
"integrity": "sha512-ekilCSN1jwRvIbgeg/57YFh8qQDNbwDb9xT/qu2DAHbFFZUicIl4ygVaAvzveMhMVr3LnpSKTNnwt8PoOfmKhQ==",
1382
1480
"bin": true
1383
1481
},
1482
+
"js-base64@3.7.8": {
1483
+
"integrity": "sha512-hNngCeKxIUQiEUN3GPJOkz4wF/YvdUdbNL9hsBcMQTkKzboD7T/q3OYOuuPZLUE6dBxSGpwhk5mwuDud7JVAow=="
1484
+
},
1384
1485
"js-tokens@4.0.0": {
1385
1486
"integrity": "sha512-RdJUflcE3cUzKiMqQgsCu06FPu9UdIJO0beYbPhHN4k6apgJtifcoCtT9bcxOpYBtpD2kCM6Sbzg4CausW/PKQ=="
1386
1487
},
···
1406
1507
"integrity": "sha512-XmOWe7eyHYH14cLdVPoyg+GOH3rYX++KpzrylJwSW98t3Nk+U8XOl8FWKOgwtzdb8lXGf6zYwDUzeHMWfxasyg==",
1407
1508
"bin": true
1408
1509
},
1510
+
"libsql@0.3.19": {
1511
+
"integrity": "sha512-Aj5cQ5uk/6fHdmeW0TiXK42FqUlwx7ytmMLPSaUQPin5HKKKuUPD62MAbN4OEweGBBI7q1BekoEN4gPUEL6MZA==",
1512
+
"dependencies": [
1513
+
"@neon-rs/load",
1514
+
"detect-libc@2.0.2"
1515
+
],
1516
+
"optionalDependencies": [
1517
+
"@libsql/darwin-arm64",
1518
+
"@libsql/darwin-x64",
1519
+
"@libsql/linux-arm64-gnu",
1520
+
"@libsql/linux-arm64-musl",
1521
+
"@libsql/linux-x64-gnu",
1522
+
"@libsql/linux-x64-musl",
1523
+
"@libsql/win32-x64-msvc"
1524
+
],
1525
+
"os": ["darwin", "linux", "win32"],
1526
+
"cpu": ["x64", "arm64", "wasm32"]
1527
+
},
1409
1528
"lightningcss-darwin-arm64@1.30.1": {
1410
1529
"integrity": "sha512-c8JK7hyE65X1MHMN+Viq9n11RRC7hgin3HhYKhrMyaXflk5GVplZ60IxyoVtzILeKr+xAJwg6zK6sjTBJ0FKYQ==",
1411
1530
"os": ["darwin"],
···
1459
1578
"lightningcss@1.30.1": {
1460
1579
"integrity": "sha512-xi6IyHML+c9+Q3W0S4fCQJOym42pyurFiJUHEcEyHS0CeKzia4yZDEsLlqOFykxOdHpNy0NmvVO31vcSqAxJCg==",
1461
1580
"dependencies": [
1462
-
"detect-libc"
1581
+
"detect-libc@2.1.2"
1463
1582
],
1464
1583
"optionalDependencies": [
1465
1584
"lightningcss-darwin-arm64",
···
1588
1707
"integrity": "sha512-N8SpfPUnUp1bK+PMYW8qSWdl9U+wwNWI4QKxOYDy9JAro3WMX7p2OeVRF9v+347pnakNevPmiHhNmZ2HbFA76w==",
1589
1708
"bin": true
1590
1709
},
1710
+
"node-domexception@1.0.0": {
1711
+
"integrity": "sha512-/jKZoMpw0F8GRwl4/eLROPA3cfcXtLApP0QzLmUT/HuPCZWyB7IY9ZrMeKw2O/nFIqPQB3PVM9aYm0F312AXDQ==",
1712
+
"deprecated": true
1713
+
},
1591
1714
"node-fetch@2.7.0": {
1592
1715
"integrity": "sha512-c4FRfUm/dbcWZ7U+1Wq0AwCyFL+3nt2bEw05wfxSz+DWpWsitgmSgYmy2dQdWyKC1694ELPqMs/YzUSNozLt8A==",
1593
1716
"dependencies": [
1594
1717
"whatwg-url"
1718
+
]
1719
+
},
1720
+
"node-fetch@3.3.2": {
1721
+
"integrity": "sha512-dRB78srN/l6gqWulah9SrxeYnxeddIG30+GOqK/9OlLVyLg3HPnr6SqOWTWOXKRwC2eGYCkZ59NNuSgvSrpgOA==",
1722
+
"dependencies": [
1723
+
"data-uri-to-buffer",
1724
+
"fetch-blob",
1725
+
"formdata-polyfill"
1595
1726
]
1596
1727
},
1597
1728
"node-releases@2.0.23": {
···
2087
2218
],
2088
2219
"bin": true
2089
2220
},
2221
+
"web-streams-polyfill@3.3.3": {
2222
+
"integrity": "sha512-d2JWLCivmZYTSIoge9MsgFCZrt571BikcWGYkjC1khllbTeDlGqZ2D8vD8E/lJa8WGWbb7Plm8/XJYV7IJHZZw=="
2223
+
},
2090
2224
"webidl-conversions@3.0.1": {
2091
2225
"integrity": "sha512-2JAn3z8AR6rjK8Sm8orRC0h/bcl/DqL7tRPdGZ4I1CjdF+EaMLmYxBHyXuKL849eucPFhvBoxMsflfOb8kxaeQ=="
2092
2226
},
···
2096
2230
"tr46",
2097
2231
"webidl-conversions"
2098
2232
]
2233
+
},
2234
+
"ws@8.18.3": {
2235
+
"integrity": "sha512-PEIGCY5tSlUt50cqyMXfCzX+oOPqN0vuGqWzbcJ2xvnkzkq46oOpz7dQaTDBdfICb4N14+GARUDw2XV2N4tvzg=="
2099
2236
},
2100
2237
"xtend@4.0.2": {
2101
2238
"integrity": "sha512-LKYU1iAXJXUgAXn9URjiu+MWhyUXHsvfp7mcuYm9dSUKK0/CjtrUwFAxD82/mCWbtLsGjFIad0wIsod4zrTAEQ=="
···
2188
2325
"npm:ts-morph@26.0.0"
2189
2326
]
2190
2327
},
2328
+
"packages/oauth": {
2329
+
"dependencies": [
2330
+
"npm:@libsql/client@0.6.0"
2331
+
]
2332
+
},
2191
2333
"packages/session": {
2192
2334
"dependencies": [
2335
+
"npm:@libsql/client@0.6.0",
2193
2336
"npm:pg@^8.16.3"
2194
2337
]
2195
2338
}
+20
-7
docs/graphql-api.md
+20
-7
docs/graphql-api.md
···
881
881
882
882
**Returns:**
883
883
884
-
- `blob`: A JSON blob object containing:
885
-
- `ref`: The CID (content identifier) reference for the blob
886
-
- `mimeType`: The MIME type of the uploaded blob
887
-
- `size`: The size of the blob in bytes
884
+
- `blob`: A Blob object containing:
885
+
- `ref` (String): The CID (content identifier) reference for the blob
886
+
- `mimeType` (String): The MIME type of the uploaded blob
887
+
- `size` (Int): The size of the blob in bytes
888
+
- `url` (String): CDN URL for the blob (supports presets)
888
889
889
890
**Example with Variables:**
890
891
···
897
898
898
899
**Usage in Records:**
899
900
900
-
After uploading a blob, use the returned blob object in your record mutations:
901
+
After uploading a blob, use the returned blob object in your record mutations. You can provide the blob as a complete object with `ref` as a String:
901
902
902
903
```graphql
903
904
mutation UpdateProfile($avatar: JSON) {
···
905
906
rkey: "self"
906
907
input: {
907
908
displayName: "My Name"
908
-
avatar: $avatar # Use the blob object from uploadBlob
909
+
avatar: $avatar # Blob object with ref as String (CID)
909
910
}
910
911
) {
911
912
uri
912
913
displayName
913
914
avatar {
914
-
ref
915
+
ref # Returns as String (CID)
915
916
mimeType
916
917
size
917
918
url(preset: "avatar")
···
919
920
}
920
921
}
921
922
```
923
+
924
+
**Example blob object for mutations:**
925
+
926
+
```json
927
+
{
928
+
"ref": "bafyreigbtj4x7ip5legnfznufuopl4sg4knzc2cof6duas4b3q2fy6swua",
929
+
"mimeType": "image/jpeg",
930
+
"size": 245678
931
+
}
932
+
```
933
+
934
+
**Note:** The GraphQL API automatically handles the conversion between the GraphQL format (where `ref` is a String containing the CID) and the AT Protocol format (where `ref` is an object `{$link: "cid"}`). You always work with `ref` as a simple String in GraphQL queries and mutations.
922
935
923
936
### Create Records
924
937
+5
-1
frontend-v2/schema.graphql
+5
-1
frontend-v2/schema.graphql
···
931
931
}
932
932
933
933
type BlobUploadResponse {
934
-
blob: JSON!
934
+
blob: Blob!
935
935
}
936
936
937
937
type CollectionStats {
···
1495
1495
Get statistics for this slice including collection counts, record counts, and actor counts
1496
1496
"""
1497
1497
stats: SliceStats!
1498
+
1499
+
"""Get all OAuth clients for this slice"""
1500
+
oauthClients: [OAuthClient!]!
1498
1501
}
1499
1502
1500
1503
type NetworkSlicesSliceAggregated {
···
2105
2108
type SyncJob {
2106
2109
id: ID!
2107
2110
jobId: String!
2111
+
sliceUri: String!
2108
2112
status: String!
2109
2113
createdAt: String!
2110
2114
startedAt: String
+8
-7
frontend-v2/server/profile-init.ts
+8
-7
frontend-v2/server/profile-init.ts
···
18
18
export async function initializeUserProfile(
19
19
userDid: string,
20
20
userHandle: string,
21
-
tokens: TokenInfo
21
+
tokens: TokenInfo,
22
22
): Promise<void> {
23
23
if (!API_URL || !SLICE_URI) {
24
24
console.error("Missing API_URL or VITE_SLICE_URI environment variables");
···
26
26
}
27
27
28
28
try {
29
-
const graphqlUrl = `${API_URL}/graphql?slice=${encodeURIComponent(SLICE_URI)}`;
29
+
const graphqlUrl = `${API_URL}/graphql?slice=${
30
+
encodeURIComponent(SLICE_URI)
31
+
}`;
30
32
const authHeader = `${tokens.tokenType} ${tokens.accessToken}`;
31
33
32
34
// 1. Check if profile already exists
···
132
134
});
133
135
134
136
if (!bskyResponse.ok) {
135
-
throw new Error(`Fetch Bluesky profile failed: ${bskyResponse.statusText}`);
137
+
throw new Error(
138
+
`Fetch Bluesky profile failed: ${bskyResponse.statusText}`,
139
+
);
136
140
}
137
141
138
142
const bskyData = await bskyResponse.json();
···
160
164
) {
161
165
// Reconstruct blob format for AT Protocol
162
166
profileInput.avatar = {
163
-
$type: "blob",
164
-
ref: {
165
-
$link: bskyProfile.avatar.ref,
166
-
},
167
+
ref: bskyProfile.avatar.ref,
167
168
mimeType: bskyProfile.avatar.mimeType,
168
169
size: bskyProfile.avatar.size,
169
170
};
+66
-68
frontend-v2/src/__generated__/OAuthClientsQuery.graphql.ts
+66
-68
frontend-v2/src/__generated__/OAuthClientsQuery.graphql.ts
···
1
1
/**
2
-
* @generated SignedSource<<4c24e57ecdb7163fc62f4069422aac37>>
2
+
* @generated SignedSource<<4bc4595b4bf2e4b263476f66a31ccca4>>
3
3
* @lightSyntaxTransform
4
4
* @nogrep
5
5
*/
···
41
41
lte?: string | null | undefined;
42
42
};
43
43
export type OAuthClientsQuery$variables = {
44
-
slice: string;
45
44
where?: NetworkSlicesSliceWhereInput | null | undefined;
46
45
};
47
46
export type OAuthClientsQuery$data = {
···
51
50
readonly actorHandle: string | null | undefined;
52
51
readonly did: string;
53
52
readonly name: string;
53
+
readonly oauthClients: ReadonlyArray<{
54
+
readonly clientId: string;
55
+
readonly clientName: string;
56
+
readonly clientSecret: string | null | undefined;
57
+
readonly clientUri: string | null | undefined;
58
+
readonly createdAt: string;
59
+
readonly createdByDid: string;
60
+
readonly logoUri: string | null | undefined;
61
+
readonly policyUri: string | null | undefined;
62
+
readonly redirectUris: ReadonlyArray<string>;
63
+
readonly scope: string | null | undefined;
64
+
readonly tosUri: string | null | undefined;
65
+
}>;
66
+
readonly uri: string;
54
67
};
55
68
}>;
56
69
};
57
-
readonly oauthClients: ReadonlyArray<{
58
-
readonly clientId: string;
59
-
readonly clientName: string;
60
-
readonly clientSecret: string | null | undefined;
61
-
readonly clientUri: string | null | undefined;
62
-
readonly createdAt: string;
63
-
readonly createdByDid: string;
64
-
readonly logoUri: string | null | undefined;
65
-
readonly policyUri: string | null | undefined;
66
-
readonly redirectUris: ReadonlyArray<string>;
67
-
readonly scope: string | null | undefined;
68
-
readonly tosUri: string | null | undefined;
69
-
}>;
70
70
};
71
71
export type OAuthClientsQuery = {
72
72
response: OAuthClientsQuery$data;
···
78
78
{
79
79
"defaultValue": null,
80
80
"kind": "LocalArgument",
81
-
"name": "slice"
81
+
"name": "where"
82
+
}
83
+
],
84
+
v1 = [
85
+
{
86
+
"kind": "Literal",
87
+
"name": "first",
88
+
"value": 1
82
89
},
83
90
{
84
-
"defaultValue": null,
85
-
"kind": "LocalArgument",
86
-
"name": "where"
91
+
"kind": "Variable",
92
+
"name": "where",
93
+
"variableName": "where"
87
94
}
88
95
],
89
-
v1 = {
96
+
v2 = {
97
+
"alias": null,
98
+
"args": null,
99
+
"kind": "ScalarField",
100
+
"name": "name",
101
+
"storageKey": null
102
+
},
103
+
v3 = {
90
104
"alias": null,
91
-
"args": [
92
-
{
93
-
"kind": "Variable",
94
-
"name": "slice",
95
-
"variableName": "slice"
96
-
}
97
-
],
105
+
"args": null,
106
+
"kind": "ScalarField",
107
+
"name": "did",
108
+
"storageKey": null
109
+
},
110
+
v4 = {
111
+
"alias": null,
112
+
"args": null,
113
+
"kind": "ScalarField",
114
+
"name": "actorHandle",
115
+
"storageKey": null
116
+
},
117
+
v5 = {
118
+
"alias": null,
119
+
"args": null,
120
+
"kind": "ScalarField",
121
+
"name": "uri",
122
+
"storageKey": null
123
+
},
124
+
v6 = {
125
+
"alias": null,
126
+
"args": null,
98
127
"concreteType": "OAuthClient",
99
128
"kind": "LinkedField",
100
129
"name": "oauthClients",
···
179
208
}
180
209
],
181
210
"storageKey": null
182
-
},
183
-
v2 = [
184
-
{
185
-
"kind": "Literal",
186
-
"name": "first",
187
-
"value": 1
188
-
},
189
-
{
190
-
"kind": "Variable",
191
-
"name": "where",
192
-
"variableName": "where"
193
-
}
194
-
],
195
-
v3 = {
196
-
"alias": null,
197
-
"args": null,
198
-
"kind": "ScalarField",
199
-
"name": "name",
200
-
"storageKey": null
201
-
},
202
-
v4 = {
203
-
"alias": null,
204
-
"args": null,
205
-
"kind": "ScalarField",
206
-
"name": "did",
207
-
"storageKey": null
208
-
},
209
-
v5 = {
210
-
"alias": null,
211
-
"args": null,
212
-
"kind": "ScalarField",
213
-
"name": "actorHandle",
214
-
"storageKey": null
215
211
};
216
212
return {
217
213
"fragment": {
···
220
216
"metadata": null,
221
217
"name": "OAuthClientsQuery",
222
218
"selections": [
223
-
(v1/*: any*/),
224
219
{
225
220
"alias": null,
226
-
"args": (v2/*: any*/),
221
+
"args": (v1/*: any*/),
227
222
"concreteType": "NetworkSlicesSliceConnection",
228
223
"kind": "LinkedField",
229
224
"name": "networkSlicesSlices",
···
245
240
"name": "node",
246
241
"plural": false,
247
242
"selections": [
243
+
(v2/*: any*/),
248
244
(v3/*: any*/),
249
245
(v4/*: any*/),
250
-
(v5/*: any*/)
246
+
(v5/*: any*/),
247
+
(v6/*: any*/)
251
248
],
252
249
"storageKey": null
253
250
}
···
267
264
"kind": "Operation",
268
265
"name": "OAuthClientsQuery",
269
266
"selections": [
270
-
(v1/*: any*/),
271
267
{
272
268
"alias": null,
273
-
"args": (v2/*: any*/),
269
+
"args": (v1/*: any*/),
274
270
"concreteType": "NetworkSlicesSliceConnection",
275
271
"kind": "LinkedField",
276
272
"name": "networkSlicesSlices",
···
292
288
"name": "node",
293
289
"plural": false,
294
290
"selections": [
291
+
(v2/*: any*/),
295
292
(v3/*: any*/),
296
293
(v4/*: any*/),
297
294
(v5/*: any*/),
295
+
(v6/*: any*/),
298
296
{
299
297
"alias": null,
300
298
"args": null,
···
314
312
]
315
313
},
316
314
"params": {
317
-
"cacheID": "20abc4b49d5c52da4a3ad1935662056a",
315
+
"cacheID": "953a2b7074ba3074cca3f11991af440e",
318
316
"id": null,
319
317
"metadata": {},
320
318
"name": "OAuthClientsQuery",
321
319
"operationKind": "query",
322
-
"text": "query OAuthClientsQuery(\n $slice: String!\n $where: NetworkSlicesSliceWhereInput\n) {\n oauthClients(slice: $slice) {\n clientId\n clientSecret\n clientName\n redirectUris\n scope\n clientUri\n logoUri\n tosUri\n policyUri\n createdAt\n createdByDid\n }\n networkSlicesSlices(first: 1, where: $where) {\n edges {\n node {\n name\n did\n actorHandle\n id\n }\n }\n }\n}\n"
320
+
"text": "query OAuthClientsQuery(\n $where: NetworkSlicesSliceWhereInput\n) {\n networkSlicesSlices(first: 1, where: $where) {\n edges {\n node {\n name\n did\n actorHandle\n uri\n oauthClients {\n clientId\n clientSecret\n clientName\n redirectUris\n scope\n clientUri\n logoUri\n tosUri\n policyUri\n createdAt\n createdByDid\n }\n id\n }\n }\n }\n}\n"
323
321
}
324
322
};
325
323
})();
326
324
327
-
(node as any).hash = "b3eda4c7e0bda285a5261efa81e7b5cd";
325
+
(node as any).hash = "4c0e3d21f0879129255130f260edcb75";
328
326
329
327
export default node;
+35
-6
frontend-v2/src/__generated__/ProfileSettingsUploadBlobMutation.graphql.ts
+35
-6
frontend-v2/src/__generated__/ProfileSettingsUploadBlobMutation.graphql.ts
···
1
1
/**
2
-
* @generated SignedSource<<a2334c7e93bb6d5b4748df1211a418ae>>
2
+
* @generated SignedSource<<728b9a3525f975b6c58a5cdcd323f89e>>
3
3
* @lightSyntaxTransform
4
4
* @nogrep
5
5
*/
···
15
15
};
16
16
export type ProfileSettingsUploadBlobMutation$data = {
17
17
readonly uploadBlob: {
18
-
readonly blob: any;
18
+
readonly blob: {
19
+
readonly mimeType: string;
20
+
readonly ref: string;
21
+
readonly size: number;
22
+
};
19
23
};
20
24
};
21
25
export type ProfileSettingsUploadBlobMutation = {
···
59
63
{
60
64
"alias": null,
61
65
"args": null,
62
-
"kind": "ScalarField",
66
+
"concreteType": "Blob",
67
+
"kind": "LinkedField",
63
68
"name": "blob",
69
+
"plural": false,
70
+
"selections": [
71
+
{
72
+
"alias": null,
73
+
"args": null,
74
+
"kind": "ScalarField",
75
+
"name": "ref",
76
+
"storageKey": null
77
+
},
78
+
{
79
+
"alias": null,
80
+
"args": null,
81
+
"kind": "ScalarField",
82
+
"name": "mimeType",
83
+
"storageKey": null
84
+
},
85
+
{
86
+
"alias": null,
87
+
"args": null,
88
+
"kind": "ScalarField",
89
+
"name": "size",
90
+
"storageKey": null
91
+
}
92
+
],
64
93
"storageKey": null
65
94
}
66
95
],
···
85
114
"selections": (v1/*: any*/)
86
115
},
87
116
"params": {
88
-
"cacheID": "3a4a6b19d2898f14635b098941614cab",
117
+
"cacheID": "afd8db2ee7590308e81afc0b0e5c86dd",
89
118
"id": null,
90
119
"metadata": {},
91
120
"name": "ProfileSettingsUploadBlobMutation",
92
121
"operationKind": "mutation",
93
-
"text": "mutation ProfileSettingsUploadBlobMutation(\n $data: String!\n $mimeType: String!\n) {\n uploadBlob(data: $data, mimeType: $mimeType) {\n blob\n }\n}\n"
122
+
"text": "mutation ProfileSettingsUploadBlobMutation(\n $data: String!\n $mimeType: String!\n) {\n uploadBlob(data: $data, mimeType: $mimeType) {\n blob {\n ref\n mimeType\n size\n }\n }\n}\n"
94
123
}
95
124
};
96
125
})();
97
126
98
-
(node as any).hash = "76da65b07a282ed7f2dee12b4cac82d6";
127
+
(node as any).hash = "74a3a8bf43181cd62d2e81c45be384e5";
99
128
100
129
export default node;
+19
-23
frontend-v2/src/pages/OAuthClients.tsx
+19
-23
frontend-v2/src/pages/OAuthClients.tsx
···
138
138
export default function OAuthClients() {
139
139
const { handle, rkey } = useParams<{ handle: string; rkey: string }>();
140
140
141
-
// Build slice URI from params
142
-
const sliceUri =
143
-
`at://did:placeholder/${handle}/network.slices.slice/${rkey}`;
144
-
145
141
return (
146
142
<Suspense
147
143
fallback={
···
152
148
</Layout>
153
149
}
154
150
>
155
-
<OAuthClientsWrapper sliceUri={sliceUri} handle={handle!} rkey={rkey!} />
151
+
<OAuthClientsWrapper handle={handle!} rkey={rkey!} />
156
152
</Suspense>
157
153
);
158
154
}
159
155
160
156
function OAuthClientsWrapper(
161
-
{ sliceUri, handle, rkey }: {
162
-
sliceUri: string;
157
+
{ handle, rkey }: {
163
158
handle: string;
164
159
rkey: string;
165
160
},
166
161
) {
167
162
const { session } = useSessionContext();
163
+
168
164
const data = useLazyLoadQuery<OAuthClientsQuery>(
169
165
graphql`
170
166
query OAuthClientsQuery(
171
-
$slice: String!
172
167
$where: NetworkSlicesSliceWhereInput
173
168
) {
174
-
oauthClients(slice: $slice) {
175
-
clientId
176
-
clientSecret
177
-
clientName
178
-
redirectUris
179
-
scope
180
-
clientUri
181
-
logoUri
182
-
tosUri
183
-
policyUri
184
-
createdAt
185
-
createdByDid
186
-
}
187
169
networkSlicesSlices(first: 1, where: $where) {
188
170
edges {
189
171
node {
190
172
name
191
173
did
192
174
actorHandle
175
+
uri
176
+
oauthClients {
177
+
clientId
178
+
clientSecret
179
+
clientName
180
+
redirectUris
181
+
scope
182
+
clientUri
183
+
logoUri
184
+
tosUri
185
+
policyUri
186
+
createdAt
187
+
createdByDid
188
+
}
193
189
}
194
190
}
195
191
}
196
192
}
197
193
`,
198
194
{
199
-
slice: sliceUri,
200
195
where: {
201
196
actorHandle: { eq: handle },
202
197
uri: { contains: rkey },
···
207
202
208
203
const slice = data.networkSlicesSlices.edges[0]?.node;
209
204
const sliceName = slice?.name;
205
+
const sliceUri = slice?.uri || `at://${slice?.did}/network.slices.slice/${rkey}`;
210
206
211
207
// Check if current user is the slice owner or admin
212
208
const isOwner = isSliceOwner(slice, session);
···
229
225
}
230
226
>
231
227
<OAuthClientsContent
232
-
clients={data.oauthClients || []}
228
+
clients={slice?.oauthClients || []}
233
229
sliceUri={sliceUri}
234
230
/>
235
231
</Layout>
+18
-13
frontend-v2/src/pages/ProfileSettings.tsx
+18
-13
frontend-v2/src/pages/ProfileSettings.tsx
···
1
-
import { useParams, Link } from "react-router-dom";
1
+
import { Link, useParams } from "react-router-dom";
2
2
import { useState } from "react";
3
3
import { graphql, useLazyLoadQuery, useMutation } from "react-relay";
4
4
import type { ProfileSettingsQuery } from "../__generated__/ProfileSettingsQuery.graphql.ts";
···
44
44
where: {
45
45
actorHandle: { eq: handle },
46
46
},
47
-
}
47
+
},
48
48
);
49
49
50
50
const profile = data.networkSlicesActorProfiles.edges[0]?.node;
···
59
59
graphql`
60
60
mutation ProfileSettingsUploadBlobMutation($data: String!, $mimeType: String!) {
61
61
uploadBlob(data: $data, mimeType: $mimeType) {
62
-
blob
62
+
blob {
63
+
ref
64
+
mimeType
65
+
size
66
+
}
63
67
}
64
68
}
65
-
`
69
+
`,
66
70
);
67
71
68
72
const [commitUpdateProfile, isUpdatingProfile] = useMutation(
···
80
84
}
81
85
}
82
86
}
83
-
`
87
+
`,
84
88
);
85
89
86
90
const [commitCreateProfile, isCreatingProfile] = useMutation(
···
98
102
}
99
103
}
100
104
}
101
-
`
105
+
`,
102
106
);
103
107
104
108
// Helper to convert File to base64
···
108
112
reader.onload = () => {
109
113
const arrayBuffer = reader.result as ArrayBuffer;
110
114
const bytes = new Uint8Array(arrayBuffer);
111
-
const binary = Array.from(bytes).map(b => String.fromCharCode(b)).join('');
115
+
const binary = Array.from(bytes).map((b) => String.fromCharCode(b))
116
+
.join("");
112
117
resolve(btoa(binary));
113
118
};
114
119
reader.onerror = reject;
···
129
134
// Upload new avatar
130
135
const base64Data = await fileToBase64(avatarFile);
131
136
132
-
const uploadResult = await new Promise<{ uploadBlob: { blob: unknown } }>((resolve, reject) => {
137
+
const uploadResult = await new Promise<
138
+
{ uploadBlob: { blob: unknown } }
139
+
>((resolve, reject) => {
133
140
commitUploadBlob({
134
141
variables: {
135
142
data: base64Data,
136
143
mimeType: avatarFile.type,
137
144
},
138
-
onCompleted: (data) => resolve(data as { uploadBlob: { blob: unknown } }),
145
+
onCompleted: (data) =>
146
+
resolve(data as { uploadBlob: { blob: unknown } }),
139
147
onError: (error) => reject(error),
140
148
});
141
149
});
···
144
152
} else if (profile?.avatar) {
145
153
// Keep existing avatar - reconstruct blob with $type field for AT Protocol
146
154
avatarBlob = {
147
-
$type: "blob",
148
-
ref: {
149
-
$link: profile.avatar.ref,
150
-
},
155
+
ref: profile.avatar.ref,
151
156
mimeType: profile.avatar.mimeType,
152
157
size: profile.avatar.size,
153
158
};
+11
-11
packages/session/src/adapters/postgres.ts
+11
-11
packages/session/src/adapters/postgres.ts
···
6
6
user_id: string;
7
7
handle: string | null;
8
8
is_authenticated: boolean;
9
-
data: string | null;
10
-
created_at: Date;
11
-
expires_at: Date;
12
-
last_accessed_at: Date;
9
+
data: Record<string, unknown> | null;
10
+
created_at: number;
11
+
expires_at: number;
12
+
last_accessed_at: number;
13
13
}
14
14
15
15
export class PostgresAdapter implements SessionAdapter {
···
100
100
data.userId,
101
101
data.handle || null,
102
102
data.isAuthenticated,
103
-
data.data ? JSON.stringify(data.data) : null,
103
+
data.data || null,
104
104
data.createdAt,
105
105
data.expiresAt,
106
106
data.lastAccessedAt,
···
116
116
updates: Partial<SessionData>
117
117
): Promise<boolean> {
118
118
const setParts: string[] = [];
119
-
const values: (string | number | boolean | null)[] = [];
119
+
const values: (string | number | boolean | null | Record<string, unknown>)[] = [];
120
120
let paramIndex = 1;
121
121
122
122
if (updates.userId !== undefined) {
···
136
136
137
137
if (updates.data !== undefined) {
138
138
setParts.push(`data = $${paramIndex++}`);
139
-
values.push(updates.data ? JSON.stringify(updates.data) : null);
139
+
values.push(updates.data || null);
140
140
}
141
141
142
142
if (updates.expiresAt !== undefined) {
···
226
226
userId: row.user_id,
227
227
handle: row.handle || undefined,
228
228
isAuthenticated: row.is_authenticated,
229
-
data: row.data ? JSON.parse(row.data) : undefined,
230
-
createdAt: row.created_at.getTime(),
231
-
expiresAt: row.expires_at.getTime(),
232
-
lastAccessedAt: row.last_accessed_at.getTime(),
229
+
data: row.data || undefined,
230
+
createdAt: row.created_at,
231
+
expiresAt: row.expires_at,
232
+
lastAccessedAt: row.last_accessed_at,
233
233
};
234
234
}
235
235