Highly ambitious ATProtocol AppView service and SDKs

Compare changes

Choose any two refs to compare.

Changed files
+1169 -201
api
crates
slices-lexicon
src
validation
primitive
docs
frontend-v2
packages
session
src
adapters
+2 -2
api/Cargo.toml
··· 63 63 sqlxmq = "0.6" 64 64 regex = "1.11.2" 65 65 66 - # Redis for caching 67 - redis = { version = "0.32", features = ["tokio-comp", "connection-manager"] } 66 + # Redis for caching and pub/sub 67 + redis = { version = "0.32", features = ["tokio-comp", "connection-manager", "aio"] } 68 68 69 69 # GraphQL server 70 70 async-graphql = { version = "7.0", features = ["dynamic-schema", "dataloader"] }
+500 -24
api/src/graphql/schema_builder.rs
··· 21 21 add_cancel_job_mutation, add_create_oauth_client_mutation, add_delete_job_mutation, 22 22 add_delete_oauth_client_mutation, add_delete_slice_records_mutation, add_get_sync_summary_query, 23 23 add_jetstream_logs_query, add_jetstream_logs_subscription, add_oauth_clients_query, 24 - add_slice_records_query, add_sparklines_query, add_sparklines_field_to_slice, 25 - add_start_sync_mutation, add_stats_field_to_slice, add_sync_job_logs_query, add_sync_job_query, 26 - add_sync_job_subscription, add_sync_jobs_query, add_update_oauth_client_mutation, 27 - add_upload_blob_mutation, create_blob_upload_response_type, create_collection_stats_type, 28 - create_collection_summary_type, create_delete_slice_records_output_type, 29 - create_jetstream_log_entry_type, create_oauth_client_type, create_slice_record_type, 30 - create_slice_record_edge_type, create_slice_records_connection_type, 31 - create_slice_records_where_input, create_slice_sparkline_type, create_slice_stats_type, 32 - create_sparkline_point_type, create_start_sync_output_type, create_sync_job_result_type, 33 - create_sync_job_type, create_sync_summary_type, 24 + add_oauth_clients_field_to_slice, add_slice_records_query, add_sparklines_query, 25 + add_sparklines_field_to_slice, add_start_sync_mutation, add_stats_field_to_slice, 26 + add_sync_job_logs_query, add_sync_job_query, add_sync_job_subscription, add_sync_jobs_query, 27 + add_update_oauth_client_mutation, add_upload_blob_mutation, create_blob_upload_response_type, 28 + create_collection_stats_type, create_collection_summary_type, 29 + create_delete_slice_records_output_type, create_jetstream_log_entry_type, 30 + create_oauth_client_type, create_slice_record_type, create_slice_record_edge_type, 31 + create_slice_records_connection_type, create_slice_records_where_input, 32 + create_slice_sparkline_type, create_slice_stats_type, create_sparkline_point_type, 33 + create_start_sync_output_type, create_sync_job_result_type, create_sync_job_type, 34 + 
create_sync_summary_type, 34 35 }; 35 36 use crate::graphql::types::{extract_collection_fields, extract_record_key, GraphQLField, GraphQLType}; 36 37 use crate::graphql::PUBSUB; ··· 44 45 at_uri_fields: Vec<String>, // Fields with format "at-uri" for reverse joins 45 46 } 46 47 48 + /// Type registry for tracking generated nested object types 49 + type TypeRegistry = HashMap<String, Object>; 50 + 51 + /// Container for nested object field values 52 + #[derive(Clone)] 53 + struct NestedObjectContainer { 54 + data: serde_json::Value, 55 + } 56 + 57 + /// Generates a unique type name for a nested object field 58 + fn generate_nested_type_name(parent_type: &str, field_name: &str) -> String { 59 + let mut chars = field_name.chars(); 60 + let capitalized_field = match chars.next() { 61 + None => String::new(), 62 + Some(first) => first.to_uppercase().collect::<String>() + chars.as_str(), 63 + }; 64 + format!("{}{}", parent_type, capitalized_field) 65 + } 66 + 67 + /// Resolves a lexicon ref and generates a GraphQL type for it 68 + /// Returns the generated type name 69 + fn resolve_lexicon_ref_type( 70 + ref_nsid: &str, 71 + current_lexicon_nsid: &str, 72 + all_lexicons: &[serde_json::Value], 73 + type_registry: &mut TypeRegistry, 74 + database: &Database, 75 + ) -> String { 76 + // Handle different ref formats: 77 + // 1. Local ref: #image 78 + // 2. External ref with specific def: app.bsky.embed.defs#aspectRatio 79 + // 3. 
External ref to main: community.lexicon.location.hthree 80 + let (target_nsid, def_name) = if ref_nsid.starts_with('#') { 81 + // Local ref - use current lexicon NSID and the def name without # 82 + (current_lexicon_nsid, &ref_nsid[1..]) 83 + } else if let Some(hash_pos) = ref_nsid.find('#') { 84 + // External ref with specific def - split on # 85 + (&ref_nsid[..hash_pos], &ref_nsid[hash_pos + 1..]) 86 + } else { 87 + // External ref to main def 88 + (ref_nsid, "main") 89 + }; 90 + 91 + // Generate type name from NSID and def name 92 + let type_name = if def_name == "main" { 93 + // For refs to main: CommunityLexiconLocationHthree 94 + nsid_to_type_name(target_nsid) 95 + } else { 96 + // For refs to specific def: AppBskyEmbedDefsAspectRatio 97 + format!("{}{}", nsid_to_type_name(target_nsid), capitalize_first(def_name)) 98 + }; 99 + 100 + // Check if already generated 101 + if type_registry.contains_key(&type_name) { 102 + return type_name; 103 + } 104 + 105 + // Find the lexicon definition 106 + let lexicon = all_lexicons.iter().find(|lex| { 107 + lex.get("id").and_then(|id| id.as_str()) == Some(target_nsid) 108 + }); 109 + 110 + if let Some(lex) = lexicon { 111 + // Extract the definition (either "main" or specific def like "image") 112 + if let Some(defs) = lex.get("defs") { 113 + if let Some(def) = defs.get(def_name) { 114 + // Extract fields from this specific definition 115 + if let Some(properties) = def.get("properties") { 116 + let fields = extract_fields_from_properties(properties); 117 + 118 + if !fields.is_empty() { 119 + // Generate the type using existing nested object generator 120 + generate_nested_object_type(&type_name, &fields, type_registry, database); 121 + return type_name; 122 + } 123 + } 124 + } 125 + } 126 + } 127 + 128 + // Fallback: couldn't resolve the ref, will use JSON 129 + tracing::warn!("Could not resolve lexicon ref: {} (target: {}, def: {})", ref_nsid, target_nsid, def_name); 130 + type_name 131 + } 132 + 133 + /// Capitalizes the 
first character of a string 134 + fn capitalize_first(s: &str) -> String { 135 + let mut chars = s.chars(); 136 + match chars.next() { 137 + None => String::new(), 138 + Some(first) => first.to_uppercase().collect::<String>() + chars.as_str(), 139 + } 140 + } 141 + 142 + /// Extracts fields from a lexicon properties object 143 + fn extract_fields_from_properties(properties: &serde_json::Value) -> Vec<GraphQLField> { 144 + let mut fields = Vec::new(); 145 + 146 + if let Some(props) = properties.as_object() { 147 + for (field_name, field_def) in props { 148 + let field_type_str = field_def.get("type").and_then(|t| t.as_str()).unwrap_or("unknown"); 149 + let field_type = crate::graphql::types::map_lexicon_type_to_graphql(field_type_str, field_def); 150 + 151 + // Check if field is required 152 + let is_required = false; // We'd need the parent's "required" array to know this 153 + 154 + // Extract format if present 155 + let format = field_def.get("format").and_then(|f| f.as_str()).map(|s| s.to_string()); 156 + 157 + fields.push(GraphQLField { 158 + name: field_name.clone(), 159 + field_type, 160 + is_required, 161 + format, 162 + }); 163 + } 164 + } 165 + 166 + fields 167 + } 168 + 169 + /// Recursively generates GraphQL object types for nested objects 170 + /// Returns the type name of the generated object type 171 + fn generate_nested_object_type( 172 + type_name: &str, 173 + fields: &[GraphQLField], 174 + type_registry: &mut TypeRegistry, 175 + database: &Database, 176 + ) -> String { 177 + // Check if type already exists in registry 178 + if type_registry.contains_key(type_name) { 179 + return type_name.to_string(); 180 + } 181 + 182 + let mut object = Object::new(type_name); 183 + 184 + // Add fields to the object 185 + for field in fields { 186 + let field_name = field.name.clone(); 187 + let field_name_for_field = field_name.clone(); // Clone for Field::new 188 + let field_type = field.field_type.clone(); 189 + 190 + // Determine the TypeRef for this field 191 
+ let type_ref = match &field.field_type { 192 + GraphQLType::Object(nested_fields) => { 193 + // Generate nested object type recursively 194 + let nested_type_name = generate_nested_type_name(type_name, &field_name); 195 + let actual_type_name = generate_nested_object_type( 196 + &nested_type_name, 197 + nested_fields, 198 + type_registry, 199 + database, 200 + ); 201 + 202 + if field.is_required { 203 + TypeRef::named_nn(actual_type_name) 204 + } else { 205 + TypeRef::named(actual_type_name) 206 + } 207 + } 208 + GraphQLType::Array(inner) => { 209 + if let GraphQLType::Object(nested_fields) = inner.as_ref() { 210 + // Generate nested object type for array items 211 + let nested_type_name = generate_nested_type_name(type_name, &field_name); 212 + let actual_type_name = generate_nested_object_type( 213 + &nested_type_name, 214 + nested_fields, 215 + type_registry, 216 + database, 217 + ); 218 + 219 + if field.is_required { 220 + TypeRef::named_nn_list(actual_type_name) 221 + } else { 222 + TypeRef::named_list(actual_type_name) 223 + } 224 + } else { 225 + // Use standard type ref for arrays of primitives 226 + graphql_type_to_typeref(&field.field_type, field.is_required) 227 + } 228 + } 229 + _ => { 230 + // Use standard type ref for other types 231 + graphql_type_to_typeref(&field.field_type, field.is_required) 232 + } 233 + }; 234 + 235 + // Add field with resolver 236 + object = object.field(Field::new(&field_name_for_field, type_ref, move |ctx| { 237 + let field_name = field_name.clone(); 238 + let field_type = field_type.clone(); 239 + 240 + FieldFuture::new(async move { 241 + // Get parent container 242 + let container = ctx.parent_value.try_downcast_ref::<NestedObjectContainer>()?; 243 + let value = container.data.get(&field_name); 244 + 245 + if let Some(val) = value { 246 + if val.is_null() { 247 + return Ok(None); 248 + } 249 + 250 + // For nested objects, wrap in container 251 + if matches!(field_type, GraphQLType::Object(_)) { 252 + let nested_container 
= NestedObjectContainer { 253 + data: val.clone(), 254 + }; 255 + return Ok(Some(FieldValue::owned_any(nested_container))); 256 + } 257 + 258 + // For arrays of objects, wrap each item 259 + if let GraphQLType::Array(inner) = &field_type { 260 + if matches!(inner.as_ref(), GraphQLType::Object(_)) { 261 + if let Some(arr) = val.as_array() { 262 + let containers: Vec<FieldValue> = arr 263 + .iter() 264 + .map(|item| { 265 + let nested_container = NestedObjectContainer { 266 + data: item.clone(), 267 + }; 268 + FieldValue::owned_any(nested_container) 269 + }) 270 + .collect(); 271 + return Ok(Some(FieldValue::list(containers))); 272 + } 273 + return Ok(Some(FieldValue::list(Vec::<FieldValue>::new()))); 274 + } 275 + } 276 + 277 + // For other types, return the GraphQL value 278 + let graphql_val = json_to_graphql_value(val); 279 + Ok(Some(FieldValue::value(graphql_val))) 280 + } else { 281 + Ok(None) 282 + } 283 + }) 284 + })); 285 + } 286 + 287 + // Store the generated type in registry 288 + type_registry.insert(type_name.to_string(), object); 289 + type_name.to_string() 290 + } 291 + 47 292 /// Builds a dynamic GraphQL schema from lexicons for a given slice 48 293 pub async fn build_graphql_schema(database: Database, slice_uri: String, auth_base_url: String) -> Result<Schema, String> { 49 294 // Fetch all lexicons for this slice ··· 109 354 } 110 355 } 111 356 357 + // Initialize type registry for nested object types 358 + let mut type_registry: TypeRegistry = HashMap::new(); 359 + 112 360 // Second pass: create types and queries 113 361 for lexicon in &lexicons { 114 362 // get_lexicons_by_slice returns {lexicon: 1, id: "nsid", defs: {...}} ··· 134 382 database.clone(), 135 383 slice_uri.clone(), 136 384 &all_collections, 385 + auth_base_url.clone(), 386 + &mut type_registry, 387 + &lexicons, 388 + nsid, 137 389 ); 138 390 139 391 // Create edge and connection types for this collection (Relay standard) ··· 1049 1301 schema_builder = 
schema_builder.register(mutation_input); 1050 1302 } 1051 1303 1304 + // Register all nested object types from the type registry 1305 + for (_, nested_type) in type_registry { 1306 + schema_builder = schema_builder.register(nested_type); 1307 + } 1308 + 1052 1309 schema_builder 1053 1310 .finish() 1054 1311 .map_err(|e| format!("Schema build error: {:?}", e)) ··· 1062 1319 1063 1320 /// Container to hold blob data and DID for URL generation 1064 1321 #[derive(Clone)] 1065 - struct BlobContainer { 1066 - blob_ref: String, // CID reference 1067 - mime_type: String, // MIME type 1068 - size: i64, // Size in bytes 1069 - did: String, // DID for CDN URL generation 1322 + pub struct BlobContainer { 1323 + pub blob_ref: String, // CID reference 1324 + pub mime_type: String, // MIME type 1325 + pub size: i64, // Size in bytes 1326 + pub did: String, // DID for CDN URL generation 1070 1327 } 1071 1328 1072 1329 /// Creates a GraphQL Object type for a record collection ··· 1076 1333 database: Database, 1077 1334 slice_uri: String, 1078 1335 all_collections: &[CollectionMeta], 1336 + auth_base_url: String, 1337 + type_registry: &mut TypeRegistry, 1338 + all_lexicons: &[serde_json::Value], 1339 + lexicon_nsid: &str, 1079 1340 ) -> Object { 1080 1341 let mut object = Object::new(type_name); 1081 1342 ··· 1219 1480 let field_type = field.field_type.clone(); 1220 1481 let db_clone = database.clone(); 1221 1482 1222 - let type_ref = graphql_type_to_typeref(&field.field_type, field.is_required); 1483 + // Determine type ref - handle nested objects and lexicon refs specially 1484 + let type_ref = match &field.field_type { 1485 + GraphQLType::LexiconRef(ref_nsid) => { 1486 + // Resolve lexicon ref and generate type for it 1487 + let resolved_type_name = resolve_lexicon_ref_type( 1488 + ref_nsid, 1489 + lexicon_nsid, 1490 + all_lexicons, 1491 + type_registry, 1492 + &database, 1493 + ); 1494 + 1495 + if field.is_required { 1496 + TypeRef::named_nn(resolved_type_name) 1497 + } else { 
1498 + TypeRef::named(resolved_type_name) 1499 + } 1500 + } 1501 + GraphQLType::Object(nested_fields) => { 1502 + // Generate nested object type 1503 + let nested_type_name = generate_nested_type_name(type_name, &field_name); 1504 + let actual_type_name = generate_nested_object_type( 1505 + &nested_type_name, 1506 + nested_fields, 1507 + type_registry, 1508 + &database, 1509 + ); 1510 + 1511 + if field.is_required { 1512 + TypeRef::named_nn(actual_type_name) 1513 + } else { 1514 + TypeRef::named(actual_type_name) 1515 + } 1516 + } 1517 + GraphQLType::Array(inner) => { 1518 + match inner.as_ref() { 1519 + GraphQLType::LexiconRef(ref_nsid) => { 1520 + // Resolve lexicon ref for array items 1521 + let resolved_type_name = resolve_lexicon_ref_type( 1522 + ref_nsid, 1523 + lexicon_nsid, 1524 + all_lexicons, 1525 + type_registry, 1526 + &database, 1527 + ); 1528 + 1529 + if field.is_required { 1530 + TypeRef::named_nn_list(resolved_type_name) 1531 + } else { 1532 + TypeRef::named_list(resolved_type_name) 1533 + } 1534 + } 1535 + GraphQLType::Object(nested_fields) => { 1536 + // Generate nested object type for array items 1537 + let nested_type_name = generate_nested_type_name(type_name, &field_name); 1538 + let actual_type_name = generate_nested_object_type( 1539 + &nested_type_name, 1540 + nested_fields, 1541 + type_registry, 1542 + &database, 1543 + ); 1544 + 1545 + if field.is_required { 1546 + TypeRef::named_nn_list(actual_type_name) 1547 + } else { 1548 + TypeRef::named_list(actual_type_name) 1549 + } 1550 + } 1551 + _ => graphql_type_to_typeref(&field.field_type, field.is_required), 1552 + } 1553 + } 1554 + _ => graphql_type_to_typeref(&field.field_type, field.is_required), 1555 + }; 1223 1556 1224 1557 object = object.field(Field::new(&field_name_for_field, type_ref, move |ctx| { 1225 1558 let field_name = field_name.clone(); ··· 1343 1676 } 1344 1677 } 1345 1678 1346 - // For non-ref fields, return the raw JSON value 1679 + // Check if this is a lexicon ref field 
1680 + if matches!(field_type, GraphQLType::LexiconRef(_)) { 1681 + let nested_container = NestedObjectContainer { 1682 + data: val.clone(), 1683 + }; 1684 + return Ok(Some(FieldValue::owned_any(nested_container))); 1685 + } 1686 + 1687 + // Check if this is a nested object field 1688 + if matches!(field_type, GraphQLType::Object(_)) { 1689 + let nested_container = NestedObjectContainer { 1690 + data: val.clone(), 1691 + }; 1692 + return Ok(Some(FieldValue::owned_any(nested_container))); 1693 + } 1694 + 1695 + // Check if this is an array of nested objects or lexicon refs 1696 + if let GraphQLType::Array(inner) = &field_type { 1697 + if matches!(inner.as_ref(), GraphQLType::LexiconRef(_)) || matches!(inner.as_ref(), GraphQLType::Object(_)) { 1698 + if let Some(arr) = val.as_array() { 1699 + let containers: Vec<FieldValue> = arr 1700 + .iter() 1701 + .map(|item| { 1702 + let nested_container = NestedObjectContainer { 1703 + data: item.clone(), 1704 + }; 1705 + FieldValue::owned_any(nested_container) 1706 + }) 1707 + .collect(); 1708 + return Ok(Some(FieldValue::list(containers))); 1709 + } 1710 + return Ok(Some(FieldValue::list(Vec::<FieldValue>::new()))); 1711 + } 1712 + } 1713 + 1714 + // For non-ref, non-object fields, return the raw JSON value 1347 1715 let graphql_val = json_to_graphql_value(val); 1348 1716 Ok(Some(FieldValue::value(graphql_val))) 1349 1717 } else { ··· 1834 2202 )); 1835 2203 } 1836 2204 1837 - // Add sparklines and stats fields for NetworkSlicesSlice type 2205 + // Add sparklines, stats, and oauth clients fields for NetworkSlicesSlice type 1838 2206 if type_name == "NetworkSlicesSlice" { 1839 2207 object = add_sparklines_field_to_slice(object, database.clone()); 1840 2208 object = add_stats_field_to_slice(object, database.clone()); 2209 + object = add_oauth_clients_field_to_slice(object, auth_base_url); 1841 2210 } 1842 2211 1843 2212 object ··· 1910 2279 // Always nullable since blob data might be missing or malformed 1911 2280 
TypeRef::named("Blob") 1912 2281 } 1913 - GraphQLType::Json | GraphQLType::Ref | GraphQLType::Object(_) | GraphQLType::Union => { 1914 - // JSON scalar type - linked records and complex objects return as JSON 2282 + GraphQLType::Json | GraphQLType::Ref | GraphQLType::LexiconRef(_) | GraphQLType::Object(_) | GraphQLType::Union => { 2283 + // JSON scalar type - linked records, lexicon refs, and complex objects return as JSON (fallback) 1915 2284 if is_required { 1916 2285 TypeRef::named_nn("JSON") 1917 2286 } else { ··· 2500 2869 let type_name = nsid_to_type_name(nsid); 2501 2870 2502 2871 // Add create mutation 2503 - mutation = add_create_mutation(mutation, &type_name, nsid, database.clone(), slice_uri.clone()); 2872 + mutation = add_create_mutation(mutation, &type_name, nsid, &fields, database.clone(), slice_uri.clone()); 2504 2873 2505 2874 // Add update mutation 2506 - mutation = add_update_mutation(mutation, &type_name, nsid, database.clone(), slice_uri.clone()); 2875 + mutation = add_update_mutation(mutation, &type_name, nsid, &fields, database.clone(), slice_uri.clone()); 2507 2876 2508 2877 // Add delete mutation 2509 2878 mutation = add_delete_mutation(mutation, &type_name, nsid, database.clone(), slice_uri.clone()); ··· 3153 3522 input 3154 3523 } 3155 3524 3525 + /// Transforms fields in record data from GraphQL format to AT Protocol format 3526 + /// 3527 + /// Blob fields: 3528 + /// - GraphQL format: `{ref: "bafyrei...", mimeType: "...", size: 123}` 3529 + /// - AT Protocol format: `{$type: "blob", ref: {$link: "bafyrei..."}, mimeType: "...", size: 123}` 3530 + /// 3531 + /// Lexicon ref fields: 3532 + /// - Adds `$type: "{ref_nsid}"` to objects (e.g., `{$type: "community.lexicon.location.hthree#main", ...}`) 3533 + /// 3534 + /// Nested objects: 3535 + /// - Recursively processes nested objects and arrays 3536 + fn transform_fields_for_atproto( 3537 + mut data: serde_json::Value, 3538 + fields: &[GraphQLField], 3539 + ) -> serde_json::Value { 3540 + 
if let serde_json::Value::Object(ref mut map) = data { 3541 + for field in fields { 3542 + if let Some(field_value) = map.get_mut(&field.name) { 3543 + match &field.field_type { 3544 + GraphQLType::Blob => { 3545 + // Transform single blob field 3546 + if let Some(blob_obj) = field_value.as_object_mut() { 3547 + // Add $type: "blob" 3548 + blob_obj.insert("$type".to_string(), serde_json::Value::String("blob".to_string())); 3549 + 3550 + // Check if ref is a string (GraphQL format) 3551 + if let Some(serde_json::Value::String(cid)) = blob_obj.get("ref") { 3552 + // Transform to {$link: "cid"} (AT Protocol format) 3553 + let link_obj = serde_json::json!({ 3554 + "$link": cid 3555 + }); 3556 + blob_obj.insert("ref".to_string(), link_obj); 3557 + } 3558 + } 3559 + } 3560 + GraphQLType::LexiconRef(ref_nsid) => { 3561 + // Transform lexicon ref field by adding $type 3562 + if let Some(ref_obj) = field_value.as_object_mut() { 3563 + ref_obj.insert("$type".to_string(), serde_json::Value::String(ref_nsid.clone())); 3564 + } 3565 + } 3566 + GraphQLType::Object(nested_fields) => { 3567 + // Recursively transform nested objects 3568 + *field_value = transform_fields_for_atproto(field_value.clone(), nested_fields); 3569 + } 3570 + GraphQLType::Array(inner) => { 3571 + match inner.as_ref() { 3572 + GraphQLType::Blob => { 3573 + // Transform array of blobs 3574 + if let Some(arr) = field_value.as_array_mut() { 3575 + for blob_value in arr { 3576 + if let Some(blob_obj) = blob_value.as_object_mut() { 3577 + // Add $type: "blob" 3578 + blob_obj.insert("$type".to_string(), serde_json::Value::String("blob".to_string())); 3579 + 3580 + if let Some(serde_json::Value::String(cid)) = blob_obj.get("ref") { 3581 + let link_obj = serde_json::json!({ 3582 + "$link": cid 3583 + }); 3584 + blob_obj.insert("ref".to_string(), link_obj); 3585 + } 3586 + } 3587 + } 3588 + } 3589 + } 3590 + GraphQLType::LexiconRef(ref_nsid) => { 3591 + // Transform array of lexicon refs 3592 + if let Some(arr) = 
field_value.as_array_mut() { 3593 + for ref_value in arr { 3594 + if let Some(ref_obj) = ref_value.as_object_mut() { 3595 + ref_obj.insert("$type".to_string(), serde_json::Value::String(ref_nsid.clone())); 3596 + } 3597 + } 3598 + } 3599 + } 3600 + GraphQLType::Object(nested_fields) => { 3601 + // Transform array of objects recursively 3602 + if let Some(arr) = field_value.as_array_mut() { 3603 + for item in arr { 3604 + *item = transform_fields_for_atproto(item.clone(), nested_fields); 3605 + } 3606 + } 3607 + } 3608 + _ => {} // Other array types don't need transformation 3609 + } 3610 + } 3611 + _ => {} // Other field types don't need transformation 3612 + } 3613 + } 3614 + } 3615 + } 3616 + 3617 + data 3618 + } 3619 + 3156 3620 /// Adds a create mutation for a collection 3157 3621 fn add_create_mutation( 3158 3622 mutation: Object, 3159 3623 type_name: &str, 3160 3624 nsid: &str, 3625 + fields: &[GraphQLField], 3161 3626 database: Database, 3162 3627 slice_uri: String, 3163 3628 ) -> Object { 3164 3629 let mutation_name = format!("create{}", type_name); 3165 3630 let nsid = nsid.to_string(); 3166 3631 let nsid_clone = nsid.clone(); 3632 + let fields = fields.to_vec(); 3167 3633 3168 3634 mutation.field( 3169 3635 Field::new( ··· 3173 3639 let db = database.clone(); 3174 3640 let slice = slice_uri.clone(); 3175 3641 let collection = nsid.clone(); 3642 + let fields = fields.clone(); 3176 3643 3177 3644 FieldFuture::new(async move { 3178 3645 // Get GraphQL context which contains auth info ··· 3188 3655 .ok_or_else(|| Error::new("Missing input argument"))?; 3189 3656 3190 3657 // Convert GraphQL value to JSON using deserialize 3191 - let record_data: serde_json::Value = input.deserialize() 3658 + let mut record_data: serde_json::Value = input.deserialize() 3192 3659 .map_err(|e| Error::new(format!("Failed to deserialize input: {:?}", e)))?; 3660 + 3661 + // Transform fields from GraphQL to AT Protocol format (adds $type, transforms blob refs) 3662 + record_data = 
transform_fields_for_atproto(record_data, &fields); 3193 3663 3194 3664 // Optional rkey argument 3195 3665 let rkey = ctx.args.get("rkey") ··· 3318 3788 mutation: Object, 3319 3789 type_name: &str, 3320 3790 nsid: &str, 3791 + fields: &[GraphQLField], 3321 3792 database: Database, 3322 3793 slice_uri: String, 3323 3794 ) -> Object { 3324 3795 let mutation_name = format!("update{}", type_name); 3325 3796 let nsid = nsid.to_string(); 3326 3797 let nsid_clone = nsid.clone(); 3798 + let fields = fields.to_vec(); 3327 3799 3328 3800 mutation.field( 3329 3801 Field::new( ··· 3333 3805 let db = database.clone(); 3334 3806 let slice = slice_uri.clone(); 3335 3807 let collection = nsid.clone(); 3808 + let fields = fields.clone(); 3336 3809 3337 3810 FieldFuture::new(async move { 3338 3811 // Get GraphQL context which contains auth info ··· 3354 3827 .ok_or_else(|| Error::new("Missing input argument"))?; 3355 3828 3356 3829 // Convert GraphQL value to JSON using deserialize 3357 - let record_data: serde_json::Value = input.deserialize() 3830 + let mut record_data: serde_json::Value = input.deserialize() 3358 3831 .map_err(|e| Error::new(format!("Failed to deserialize input: {:?}", e)))?; 3832 + 3833 + // Transform fields from GraphQL to AT Protocol format (adds $type, transforms blob refs) 3834 + record_data = transform_fields_for_atproto(record_data, &fields); 3359 3835 3360 3836 // Verify OAuth token and get user info 3361 3837 let user_info = crate::auth::verify_oauth_token_cached(
+29 -18
api/src/graphql/schema_ext/blob_upload.rs
··· 1 1 //! GraphQL schema extension for blob uploads 2 2 3 3 use async_graphql::dynamic::{Field, FieldFuture, FieldValue, InputValue, Object, TypeRef}; 4 - use async_graphql::{Error, Value as GraphQLValue}; 4 + use async_graphql::Error; 5 5 use base64::engine::general_purpose; 6 6 use base64::Engine; 7 7 8 8 use crate::atproto_extensions::upload_blob as atproto_upload_blob; 9 9 use crate::auth; 10 - 11 - /// Container for blob upload response 12 - #[derive(Clone)] 13 - struct BlobUploadContainer { 14 - blob: serde_json::Value, 15 - } 10 + use crate::graphql::schema_builder::BlobContainer; 16 11 17 12 /// Creates the BlobUploadResponse GraphQL type 18 13 pub fn create_blob_upload_response_type() -> Object { 19 14 let mut response = Object::new("BlobUploadResponse"); 20 15 21 - response = response.field(Field::new("blob", TypeRef::named_nn("JSON"), |ctx| { 16 + // Return the Blob type instead of JSON to ensure consistent ref field handling 17 + response = response.field(Field::new("blob", TypeRef::named_nn("Blob"), |ctx| { 22 18 FieldFuture::new(async move { 23 - let container = ctx.parent_value.try_downcast_ref::<BlobUploadContainer>()?; 24 - // Convert serde_json::Value to async_graphql::Value 25 - let graphql_value: GraphQLValue = serde_json::from_value(container.blob.clone()) 26 - .map_err(|e| async_graphql::Error::new(format!("Failed to convert blob to GraphQL value: {}", e)))?; 27 - Ok(Some(graphql_value)) 19 + // The BlobContainer is passed through from the mutation resolver 20 + // The Blob type resolver will handle extracting the fields 21 + let container = ctx.parent_value.try_downcast_ref::<BlobContainer>()?; 22 + Ok(Some(FieldValue::owned_any(container.clone()))) 28 23 }) 29 24 })); 30 25 ··· 70 65 .decode(data_base64) 71 66 .map_err(|e| Error::new(format!("Invalid base64 data: {}", e)))?; 72 67 68 + // Verify OAuth token to get user info (needed for DID) 69 + let user_info = auth::verify_oauth_token_cached( 70 + token, 71 + &auth_base, 72 + 
gql_ctx.auth_cache.clone(), 73 + ) 74 + .await 75 + .map_err(|e| Error::new(format!("Invalid token: {}", e)))?; 76 + 73 77 // Get ATProto DPoP auth and PDS URL for this user 74 78 let (dpop_auth, pds_url) = auth::get_atproto_auth_for_user_cached( 75 79 token, ··· 91 95 .await 92 96 .map_err(|e| Error::new(format!("Failed to upload blob: {}", e)))?; 93 97 94 - // Convert blob to JSON value 95 - let blob_json = serde_json::to_value(&upload_result.blob) 96 - .map_err(|e| Error::new(format!("Failed to serialize blob: {}", e)))?; 98 + // Extract the DID from user info 99 + let did = user_info.did.unwrap_or(user_info.sub); 100 + 101 + // Create BlobContainer with flattened ref field (CID string) 102 + // This ensures the GraphQL Blob type returns ref as a String, not an object 103 + let blob_container = BlobContainer { 104 + blob_ref: upload_result.blob.r#ref.link.clone(), // Extract CID from ref.$link 105 + mime_type: upload_result.blob.mime_type.clone(), 106 + size: upload_result.blob.size as i64, 107 + did, 108 + }; 97 109 98 - let container = BlobUploadContainer { blob: blob_json }; 99 - Ok(Some(FieldValue::owned_any(container))) 110 + Ok(Some(FieldValue::owned_any(blob_container))) 100 111 }) 101 112 }, 102 113 )
+1
api/src/graphql/schema_ext/mod.rs
··· 66 66 pub use oauth::{ 67 67 create_oauth_client_type, 68 68 add_oauth_clients_query, 69 + add_oauth_clients_field_to_slice, 69 70 add_create_oauth_client_mutation, 70 71 add_update_oauth_client_mutation, 71 72 add_delete_oauth_client_mutation,
+107
api/src/graphql/schema_ext/oauth.rs
··· 156 156 oauth_client 157 157 } 158 158 159 + /// Add oauthClients field to NetworkSlicesSlice type 160 + pub fn add_oauth_clients_field_to_slice( 161 + object: Object, 162 + auth_base_url: String, 163 + ) -> Object { 164 + use crate::graphql::schema_builder::RecordContainer; 165 + 166 + let base_url_for_oauth = auth_base_url.clone(); 167 + 168 + object.field( 169 + Field::new( 170 + "oauthClients", 171 + TypeRef::named_nn_list_nn("OAuthClient"), 172 + move |ctx| { 173 + let base_url = base_url_for_oauth.clone(); 174 + 175 + FieldFuture::new(async move { 176 + let container = ctx.parent_value.try_downcast_ref::<RecordContainer>()?; 177 + let slice_uri = &container.record.uri; 178 + 179 + // Get pool from context and create database instance 180 + let pool = ctx.data::<sqlx::PgPool>() 181 + .map_err(|_| Error::new("Database pool not found in context"))?; 182 + let database = crate::database::Database::new(pool.clone()); 183 + 184 + // Fetch OAuth clients from database 185 + let clients = database 186 + .get_oauth_clients_for_slice(slice_uri) 187 + .await 188 + .map_err(|e| Error::new(format!("Failed to fetch OAuth clients: {}", e)))?; 189 + 190 + if clients.is_empty() { 191 + return Ok(Some(FieldValue::list(Vec::<FieldValue<'_>>::new()))); 192 + } 193 + 194 + // Fetch details from AIP server 195 + let http_client = Client::new(); 196 + let mut client_data_list = Vec::new(); 197 + 198 + for oauth_client in clients { 199 + let aip_url = format!("{}/oauth/clients/{}", base_url, oauth_client.client_id); 200 + let mut request_builder = http_client.get(&aip_url); 201 + 202 + if let Some(token) = &oauth_client.registration_access_token { 203 + request_builder = request_builder.bearer_auth(token); 204 + } 205 + 206 + match request_builder.send().await { 207 + Ok(response) if response.status().is_success() => { 208 + if let Ok(response_text) = response.text().await { 209 + if let Ok(aip_client) = serde_json::from_str::<AipClientResponse>(&response_text) { 210 + 
client_data_list.push(OAuthClientData { 211 + client_id: aip_client.client_id, 212 + client_secret: aip_client.client_secret, 213 + client_name: aip_client.client_name, 214 + redirect_uris: aip_client.redirect_uris, 215 + grant_types: aip_client.grant_types, 216 + response_types: aip_client.response_types, 217 + scope: aip_client.scope, 218 + client_uri: aip_client.client_uri, 219 + logo_uri: aip_client.logo_uri, 220 + tos_uri: aip_client.tos_uri, 221 + policy_uri: aip_client.policy_uri, 222 + created_at: oauth_client.created_at, 223 + created_by_did: oauth_client.created_by_did, 224 + }); 225 + } 226 + } 227 + } 228 + _ => { 229 + // Fallback for clients we can't fetch details for 230 + client_data_list.push(OAuthClientData { 231 + client_id: oauth_client.client_id, 232 + client_secret: None, 233 + client_name: "Unknown".to_string(), 234 + redirect_uris: vec![], 235 + grant_types: vec!["authorization_code".to_string()], 236 + response_types: vec!["code".to_string()], 237 + scope: None, 238 + client_uri: None, 239 + logo_uri: None, 240 + tos_uri: None, 241 + policy_uri: None, 242 + created_at: oauth_client.created_at, 243 + created_by_did: oauth_client.created_by_did, 244 + }); 245 + } 246 + } 247 + } 248 + 249 + // Convert to GraphQL values 250 + let field_values: Vec<FieldValue<'_>> = client_data_list 251 + .into_iter() 252 + .map(|client_data| { 253 + let container = OAuthClientContainer { client: client_data }; 254 + FieldValue::owned_any(container) 255 + }) 256 + .collect(); 257 + 258 + Ok(Some(FieldValue::list(field_values))) 259 + }) 260 + }, 261 + ) 262 + .description("Get all OAuth clients for this slice") 263 + ) 264 + } 265 + 159 266 /// Add oauthClients query to the Query type 160 267 pub fn add_oauth_clients_query(query: Object, slice_uri: String, auth_base_url: String) -> Object { 161 268 query.field(
+143 -4
api/src/graphql/schema_ext/sync.rs
··· 10 10 use uuid::Uuid; 11 11 use base64::engine::general_purpose; 12 12 use base64::Engine; 13 + use redis::aio::ConnectionManager; 14 + use redis::{Client, AsyncCommands}; 15 + use futures_util::StreamExt; 13 16 14 17 /// Global broadcast channel for sync job status updates 15 18 /// This allows real-time job status streaming to GraphQL subscriptions 16 19 static JOB_CHANNEL: OnceLock<Arc<Mutex<broadcast::Sender<JobStatus>>>> = OnceLock::new(); 20 + 21 + /// Global Redis client for cross-process pub/sub (optional) 22 + static REDIS_CLIENT: OnceLock<Option<Client>> = OnceLock::new(); 17 23 18 24 /// Initialize or get the global job channel 19 25 fn get_job_channel() -> Arc<Mutex<broadcast::Sender<JobStatus>>> { ··· 27 33 28 34 /// Publish a sync job status update to subscribers 29 35 pub async fn publish_sync_job_update(job_status: JobStatus) { 36 + // Publish to in-memory broadcast channel (for same-process subscribers) 30 37 let sender = get_job_channel(); 31 38 let sender_lock = sender.lock().await; 32 - let _ = sender_lock.send(job_status); // Ignore errors if no subscribers 39 + let _ = sender_lock.send(job_status.clone()); // Ignore errors if no subscribers 40 + drop(sender_lock); 41 + 42 + // Also publish to Redis for cross-process communication (if Redis is configured) 43 + if let Some(Some(client)) = REDIS_CLIENT.get() { 44 + if let Err(e) = publish_to_redis(client, &job_status).await { 45 + tracing::warn!("Failed to publish job status to Redis: {}", e); 46 + } 47 + } 48 + } 49 + 50 + /// Publish job status to Redis for cross-process communication 51 + async fn publish_to_redis(client: &Client, job_status: &JobStatus) -> Result<(), Box<dyn std::error::Error + Send + Sync>> { 52 + let mut conn = ConnectionManager::new(client.clone()).await?; 53 + let payload = serde_json::to_string(job_status)?; 54 + let _: () = conn.publish("sync_job_updates", payload).await?; 55 + Ok(()) 33 56 } 34 57 35 58 /// Container for JobStatus to implement Any trait for GraphQL 
··· 226 249 FieldFuture::new(async move { 227 250 let container = ctx.parent_value.try_downcast_ref::<JobStatusContainer>()?; 228 251 Ok(Some(GraphQLValue::from(container.status.job_id.to_string()))) 252 + }) 253 + })); 254 + 255 + job = job.field(Field::new("sliceUri", TypeRef::named_nn(TypeRef::STRING), |ctx| { 256 + FieldFuture::new(async move { 257 + let container = ctx.parent_value.try_downcast_ref::<JobStatusContainer>()?; 258 + Ok(Some(GraphQLValue::from(container.status.slice_uri.clone()))) 229 259 }) 230 260 })); 231 261 ··· 711 741 let mut receiver = sender_lock.subscribe(); 712 742 drop(sender_lock); // Release lock 713 743 744 + // Get optional slice filter from arguments 745 + let slice_filter: Option<String> = ctx.args.get("slice") 746 + .and_then(|val| val.string().ok()) 747 + .map(|s| s.to_string()); 748 + 714 749 let stream = async_stream::stream! { 715 750 while let Ok(job_status) = receiver.recv().await { 716 751 // Filter by job_id if provided ··· 720 755 } 721 756 } 722 757 723 - // Filter by slice_uri if provided (need to query for slice_uri) 724 - // For now, skip slice filtering since JobStatus doesn't include slice_uri 725 - // TODO: Add slice_uri to JobStatus or query it separately 758 + // Filter by slice_uri if provided 759 + if let Some(ref filter_slice) = slice_filter { 760 + if &job_status.slice_uri != filter_slice { 761 + continue; 762 + } 763 + } 726 764 727 765 // Convert to GraphQL value and yield 728 766 let container = JobStatusContainer { status: job_status }; ··· 822 860 .description("Delete a sync job from the database") 823 861 ) 824 862 } 863 + 864 + /// Initialize Redis pub/sub for sync job updates 865 + /// 866 + /// This function should be called once at application startup. 867 + /// It initializes the Redis client and starts a background task to listen for 868 + /// job updates from other processes (e.g., worker processes). 869 + /// 870 + /// # Arguments 871 + /// * `redis_url` - Optional Redis connection URL. 
If None, Redis pub/sub is disabled. 872 + pub fn initialize_redis_pubsub(redis_url: Option<String>) { 873 + // Initialize Redis client (or None if not configured) 874 + let client = redis_url.and_then(|url| { 875 + match Client::open(url.as_str()) { 876 + Ok(client) => { 877 + tracing::info!("Initialized Redis client for sync job pub/sub"); 878 + Some(client) 879 + } 880 + Err(e) => { 881 + tracing::error!("Failed to create Redis client for sync job pub/sub: {}", e); 882 + None 883 + } 884 + } 885 + }); 886 + 887 + let has_redis = client.is_some(); 888 + REDIS_CLIENT.get_or_init(|| client); 889 + 890 + // Start Redis subscription listener task if Redis is available 891 + if has_redis { 892 + start_redis_listener(); 893 + } else { 894 + tracing::info!("Redis not configured - sync job updates will use in-memory broadcast only"); 895 + } 896 + } 897 + 898 + /// Start a background task that subscribes to Redis and forwards messages to the in-memory broadcast channel 899 + fn start_redis_listener() { 900 + tokio::spawn(async { 901 + tracing::info!("Starting Redis subscription listener for sync job updates"); 902 + 903 + loop { 904 + // Get Redis client 905 + let client = match REDIS_CLIENT.get() { 906 + Some(Some(client)) => client, 907 + _ => { 908 + tracing::error!("Redis client not available for subscription"); 909 + return; 910 + } 911 + }; 912 + 913 + // Connect and subscribe 914 + match subscribe_to_redis(client).await { 915 + Ok(_) => { 916 + tracing::warn!("Redis subscription ended, reconnecting in 5 seconds..."); 917 + } 918 + Err(e) => { 919 + tracing::error!("Redis subscription error: {}, reconnecting in 5 seconds...", e); 920 + } 921 + } 922 + 923 + // Wait before reconnecting 924 + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; 925 + } 926 + }); 927 + } 928 + 929 + /// Subscribe to Redis channel and forward messages to in-memory broadcast 930 + async fn subscribe_to_redis(client: &Client) -> Result<(), Box<dyn std::error::Error + Send + 
Sync>> { 931 + // Create a pub/sub connection from the client 932 + let mut pubsub = client.get_async_pubsub().await?; 933 + 934 + pubsub.subscribe("sync_job_updates").await?; 935 + tracing::info!("Subscribed to Redis channel: sync_job_updates"); 936 + 937 + // Get the in-memory broadcast sender 938 + let sender = get_job_channel(); 939 + 940 + loop { 941 + let msg = pubsub.on_message().next().await; 942 + if let Some(msg) = msg { 943 + let payload: String = msg.get_payload()?; 944 + 945 + // Deserialize JobStatus from JSON 946 + match serde_json::from_str::<JobStatus>(&payload) { 947 + Ok(job_status) => { 948 + // Forward to in-memory broadcast channel 949 + let sender_lock = sender.lock().await; 950 + if let Err(e) = sender_lock.send(job_status.clone()) { 951 + tracing::debug!("No local subscribers for job update: {}", e); 952 + } 953 + drop(sender_lock); 954 + 955 + tracing::debug!("Forwarded job update from Redis: job_id={}", job_status.job_id); 956 + } 957 + Err(e) => { 958 + tracing::warn!("Failed to deserialize job status from Redis: {}", e); 959 + } 960 + } 961 + } 962 + } 963 + }
+6 -1
api/src/graphql/types.rs
··· 20 20 Float, 21 21 /// Reference to another record (for strongRef) 22 22 Ref, 23 + /// Reference to a lexicon type definition (e.g., community.lexicon.location.hthree) 24 + LexiconRef(String), 23 25 /// Array of a type 24 26 Array(Box<GraphQLType>), 25 27 /// Object with nested fields ··· 45 47 "unknown" => GraphQLType::Json, 46 48 "null" => GraphQLType::Json, 47 49 "ref" => { 48 - // Check if this is a strongRef (link to another record) 50 + // Check if this is a strongRef (link to another record) or a lexicon type ref 49 51 let ref_name = lexicon_def 50 52 .get("ref") 51 53 .and_then(|r| r.as_str()) ··· 53 55 54 56 if ref_name == "com.atproto.repo.strongRef" { 55 57 GraphQLType::Ref 58 + } else if !ref_name.is_empty() { 59 + // This is a reference to a lexicon type definition 60 + GraphQLType::LexiconRef(ref_name.to_string()) 56 61 } else { 57 62 GraphQLType::Json 58 63 }
+19
api/src/jobs.rs
··· 148 148 // Publish job running status to subscribers 149 149 let running_status = JobStatus { 150 150 job_id: payload.job_id, 151 + slice_uri: payload.slice_uri.clone(), 151 152 status: "running".to_string(), 152 153 created_at: now, 153 154 started_at: Some(now), ··· 260 261 // Publish job status update to GraphQL subscribers 261 262 let job_status = JobStatus { 262 263 job_id: payload.job_id, 264 + slice_uri: payload.slice_uri.clone(), 263 265 status: "completed".to_string(), 264 266 created_at: chrono::Utc::now(), 265 267 started_at: Some(chrono::Utc::now()), ··· 337 339 // Publish job status update to GraphQL subscribers 338 340 let job_status = JobStatus { 339 341 job_id: payload.job_id, 342 + slice_uri: payload.slice_uri.clone(), 340 343 status: "failed".to_string(), 341 344 created_at: chrono::Utc::now(), 342 345 started_at: Some(chrono::Utc::now()), ··· 536 539 // Publish job creation event to subscribers 537 540 let job_status = JobStatus { 538 541 job_id, 542 + slice_uri: slice_uri.clone(), 539 543 status: "pending".to_string(), 540 544 created_at: chrono::Utc::now(), 541 545 started_at: None, ··· 559 563 pub struct JobStatus { 560 564 /// Unique identifier for the job 561 565 pub job_id: Uuid, 566 + /// Slice URI this job belongs to 567 + pub slice_uri: String, 562 568 /// Current status: "pending", "running", "completed", or "failed" 563 569 pub status: String, 564 570 /// Timestamp when job was enqueued ··· 611 617 612 618 return Ok(Some(JobStatus { 613 619 job_id, 620 + slice_uri: result.slice_uri, 614 621 status: result.status, 615 622 created_at: result.created_at, 616 623 started_at: Some(result.created_at), ··· 647 654 648 655 match queue_row { 649 656 Some(row) => { 657 + // Extract slice_uri from payload JSON 658 + let slice_uri = row.payload_json 659 + .as_ref() 660 + .and_then(|json| json.get("slice_uri")) 661 + .and_then(|v| v.as_str()) 662 + .unwrap_or_default() 663 + .to_string(); 664 + 650 665 // Determine status based on attempt_at 
timestamp 651 666 let status = if row.attempt_at.is_none() { 652 667 "completed".to_string() ··· 662 677 663 678 Ok(Some(JobStatus { 664 679 job_id, 680 + slice_uri, 665 681 status: status.clone(), 666 682 created_at: row.created_at.unwrap_or_else(chrono::Utc::now), 667 683 started_at: if status == "running" || status == "completed" { ··· 790 806 791 807 results.push(JobStatus { 792 808 job_id: row.job_id.unwrap_or_else(Uuid::new_v4), 809 + slice_uri: row.slice_uri.clone().unwrap_or_default(), 793 810 status: row.status.unwrap_or_default(), 794 811 created_at: row.created_at.unwrap_or_else(chrono::Utc::now), 795 812 started_at: row.created_at, ··· 902 919 903 920 results.push(JobStatus { 904 921 job_id: row.job_id.unwrap_or_else(Uuid::new_v4), 922 + slice_uri: row.slice_uri.clone().unwrap_or_default(), 905 923 status: row.status.unwrap_or_default(), 906 924 created_at: row.created_at.unwrap_or_else(chrono::Utc::now), 907 925 started_at: row.created_at, ··· 1055 1073 // Publish job status update for subscribers 1056 1074 let job_status = JobStatus { 1057 1075 job_id, 1076 + slice_uri, 1058 1077 status: "cancelled".to_string(), 1059 1078 created_at: now, 1060 1079 started_at: None,
+10 -2
api/src/main.rs
··· 109 109 // Start GraphQL PubSub cleanup task 110 110 graphql::pubsub::start_cleanup_task(); 111 111 112 + // Initialize Redis pub/sub for cross-process sync job updates 113 + let redis_url = env::var("REDIS_URL").ok(); 114 + graphql::schema_ext::sync::initialize_redis_pubsub(redis_url); 115 + 112 116 // Detect process type from environment (supports both PROCESS_TYPE and FLY_PROCESS_GROUP) 113 117 let process_type = env::var("PROCESS_TYPE") 114 118 .or_else(|_| env::var("FLY_PROCESS_GROUP")) ··· 183 187 if now.duration_since(window_start) >= RECONNECT_WINDOW { 184 188 reconnect_count = 0; 185 189 window_start = now; 190 + retry_delay = tokio::time::Duration::from_secs(5); // Reset delay after window passes 186 191 } 187 192 188 193 // Check rate limit ··· 198 203 } 199 204 200 205 reconnect_count += 1; 206 + tracing::info!("Jetstream connection attempt #{} (retry delay: {:?})", reconnect_count, retry_delay); 201 207 202 208 // Read cursor position from database 203 209 let initial_cursor = ··· 261 267 let cancellation_token = atproto_jetstream::CancellationToken::new(); 262 268 match consumer_arc.start_consuming(cancellation_token).await { 263 269 Ok(_) => { 264 - tracing::info!("Jetstream consumer shut down normally"); 270 + tracing::info!("Jetstream consumer shut down normally - reconnecting in {:?}", retry_delay); 265 271 jetstream_connected_clone 266 272 .store(false, std::sync::atomic::Ordering::Relaxed); 273 + tokio::time::sleep(retry_delay).await; 274 + retry_delay = std::cmp::min(retry_delay * 2, MAX_RETRY_DELAY); 267 275 } 268 276 Err(e) => { 269 - tracing::error!("Jetstream consumer failed: {} - will reconnect", e); 277 + tracing::error!("Jetstream consumer failed: {} - reconnecting in {:?}", e, retry_delay); 270 278 jetstream_connected_clone 271 279 .store(false, std::sync::atomic::Ordering::Relaxed); 272 280 tokio::time::sleep(retry_delay).await;
+24 -11
crates/slices-lexicon/src/validation/primitive/string.rs
··· 577 577 578 578 /// Validates TID (Timestamp Identifier) format 579 579 /// 580 - /// TID format: 13-character base32-encoded timestamp + random bits 581 - /// Uses Crockford base32 alphabet: 0123456789ABCDEFGHJKMNPQRSTVWXYZ (case-insensitive) 580 + /// TID format: 13-character base32-sortable encoded timestamp + random bits 581 + /// Uses ATProto base32-sortable alphabet: 234567abcdefghijklmnopqrstuvwxyz (lowercase only) 582 582 pub fn is_valid_tid(&self, value: &str) -> bool { 583 583 use regex::Regex; 584 584 ··· 586 586 return false; 587 587 } 588 588 589 - // TID uses Crockford base32 (case-insensitive, excludes I, L, O, U) 590 - let tid_regex = Regex::new(r"^[0-9A-HJKMNP-TV-Z]{13}$").unwrap(); 591 - let uppercase_value = value.to_uppercase(); 589 + // TID uses base32-sortable (s32) - lowercase only 590 + // First character must be from limited set (ensures top bit is 0) 591 + // Remaining 12 characters from full base32-sortable alphabet 592 + let tid_regex = Regex::new(r"^[234567abcdefghij][234567abcdefghijklmnopqrstuvwxyz]{12}$").unwrap(); 592 593 593 - tid_regex.is_match(&uppercase_value) 594 + tid_regex.is_match(value) 594 595 } 595 596 596 597 /// Validates Record Key format ··· 1096 1097 1097 1098 let validator = StringValidator; 1098 1099 1099 - // Valid TIDs (13 characters, Crockford base32) 1100 - assert!(validator.validate_data(&json!("3JZFKJT0000ZZ"), &schema, &ctx).is_ok()); 1101 - assert!(validator.validate_data(&json!("3jzfkjt0000zz"), &schema, &ctx).is_ok()); // case insensitive 1100 + // Valid TIDs (base32-sortable, 13 chars, lowercase) 1101 + assert!(validator.validate_data(&json!("3m3zm7eurxk26"), &schema, &ctx).is_ok()); 1102 + assert!(validator.validate_data(&json!("2222222222222"), &schema, &ctx).is_ok()); // minimum TID 1103 + assert!(validator.validate_data(&json!("a222222222222"), &schema, &ctx).is_ok()); // leading 'a' (lower bound) 1104 + assert!(validator.validate_data(&json!("j234567abcdef"), &schema, &ctx).is_ok()); // leading 
'j' (upper bound) 1105 + 1102 1106 1103 - // Invalid TIDs 1107 + // Invalid TIDs - uppercase not allowed (charset is lowercase only) 1108 + assert!(validator.validate_data(&json!("3m3zM7eurxk26"), &schema, &ctx).is_err()); // mixed case 1109 + 1110 + // Invalid TIDs - wrong length 1104 1111 assert!(validator.validate_data(&json!("too-short"), &schema, &ctx).is_err()); 1105 1112 assert!(validator.validate_data(&json!("too-long-string"), &schema, &ctx).is_err()); 1113 + 1114 + // Invalid TIDs - invalid characters (hyphen/punct rejected; digits 0,1,8,9 not allowed) 1106 1115 assert!(validator.validate_data(&json!("invalid-chars!"), &schema, &ctx).is_err()); 1107 - assert!(validator.validate_data(&json!("invalid-ILOU0"), &schema, &ctx).is_err()); // invalid chars (I, L, O, U) 1116 + assert!(validator.validate_data(&json!("xyz1234567890"), &schema, &ctx).is_err()); // has 0,1,8,9 1117 + 1118 + // Invalid TIDs - first character must be one of 234567abcdefghij 1119 + assert!(validator.validate_data(&json!("k222222222222"), &schema, &ctx).is_err()); // leading 'k' forbidden 1120 + assert!(validator.validate_data(&json!("z234567abcdef"), &schema, &ctx).is_err()); // leading 'z' forbidden 1108 1121 } 1109 1122 1110 1123 #[test]
+146 -3
deno.lock
··· 22 22 "jsr:@std/path@^1.1.1": "1.1.2", 23 23 "jsr:@std/streams@^1.0.10": "1.0.12", 24 24 "npm:@deno/vite-plugin@^1.0.5": "1.0.5_vite@7.1.10__@types+node@24.7.2__picomatch@4.0.3_@types+node@24.7.2_@types+node@24.2.0", 25 + "npm:@libsql/client@0.6.0": "0.6.0", 25 26 "npm:@shikijs/core@^3.7.0": "3.13.0", 26 27 "npm:@shikijs/engine-oniguruma@^3.7.0": "3.13.0", 27 28 "npm:@shikijs/types@^3.7.0": "3.13.0", ··· 483 484 "@jridgewell/sourcemap-codec" 484 485 ] 485 486 }, 487 + "@libsql/client@0.6.0": { 488 + "integrity": "sha512-qhQzTG/y2IEVbL3+9PULDvlQFWJ/RnjFXECr/Nc3nRngGiiMysDaOV5VUzYk7DulUX98EA4wi+z3FspKrUplUA==", 489 + "dependencies": [ 490 + "@libsql/core", 491 + "@libsql/hrana-client", 492 + "js-base64", 493 + "libsql" 494 + ] 495 + }, 496 + "@libsql/core@0.6.2": { 497 + "integrity": "sha512-c2P4M+4u/4b2L02A0KjggO3UW51rGkhxr/7fzJO0fEAqsqrWGxuNj2YtRkina/oxfYvAof6xjp8RucNoIV/Odw==", 498 + "dependencies": [ 499 + "js-base64" 500 + ] 501 + }, 502 + "@libsql/darwin-arm64@0.3.19": { 503 + "integrity": "sha512-rmOqsLcDI65zzxlUOoEiPJLhqmbFsZF6p4UJQ2kMqB+Kc0Rt5/A1OAdOZ/Wo8fQfJWjR1IbkbpEINFioyKf+nQ==", 504 + "os": ["darwin"], 505 + "cpu": ["arm64"] 506 + }, 507 + "@libsql/darwin-x64@0.3.19": { 508 + "integrity": "sha512-q9O55B646zU+644SMmOQL3FIfpmEvdWpRpzubwFc2trsa+zoBlSkHuzU9v/C+UNoPHQVRMP7KQctJ455I/h/xw==", 509 + "os": ["darwin"], 510 + "cpu": ["x64"] 511 + }, 512 + "@libsql/hrana-client@0.6.2": { 513 + "integrity": "sha512-MWxgD7mXLNf9FXXiM0bc90wCjZSpErWKr5mGza7ERy2FJNNMXd7JIOv+DepBA1FQTIfI8TFO4/QDYgaQC0goNw==", 514 + "dependencies": [ 515 + "@libsql/isomorphic-fetch", 516 + "@libsql/isomorphic-ws", 517 + "js-base64", 518 + "node-fetch@3.3.2" 519 + ] 520 + }, 521 + "@libsql/isomorphic-fetch@0.2.5": { 522 + "integrity": "sha512-8s/B2TClEHms2yb+JGpsVRTPBfy1ih/Pq6h6gvyaNcYnMVJvgQRY7wAa8U2nD0dppbCuDU5evTNMEhrQ17ZKKg==" 523 + }, 524 + "@libsql/isomorphic-ws@0.1.5": { 525 + "integrity": 
"sha512-DtLWIH29onUYR00i0GlQ3UdcTRC6EP4u9w/h9LxpUZJWRMARk6dQwZ6Jkd+QdwVpuAOrdxt18v0K2uIYR3fwFg==", 526 + "dependencies": [ 527 + "@types/ws", 528 + "ws" 529 + ] 530 + }, 531 + "@libsql/linux-arm64-gnu@0.3.19": { 532 + "integrity": "sha512-mgeAUU1oqqh57k7I3cQyU6Trpdsdt607eFyEmH5QO7dv303ti+LjUvh1pp21QWV6WX7wZyjeJV1/VzEImB+jRg==", 533 + "os": ["linux"], 534 + "cpu": ["arm64"] 535 + }, 536 + "@libsql/linux-arm64-musl@0.3.19": { 537 + "integrity": "sha512-VEZtxghyK6zwGzU9PHohvNxthruSxBEnRrX7BSL5jQ62tN4n2JNepJ6SdzXp70pdzTfwroOj/eMwiPt94gkVRg==", 538 + "os": ["linux"], 539 + "cpu": ["arm64"] 540 + }, 541 + "@libsql/linux-x64-gnu@0.3.19": { 542 + "integrity": "sha512-2t/J7LD5w2f63wGihEO+0GxfTyYIyLGEvTFEsMO16XI5o7IS9vcSHrxsvAJs4w2Pf907uDjmc7fUfMg6L82BrQ==", 543 + "os": ["linux"], 544 + "cpu": ["x64"] 545 + }, 546 + "@libsql/linux-x64-musl@0.3.19": { 547 + "integrity": "sha512-BLsXyJaL8gZD8+3W2LU08lDEd9MIgGds0yPy5iNPp8tfhXx3pV/Fge2GErN0FC+nzt4DYQtjL+A9GUMglQefXQ==", 548 + "os": ["linux"], 549 + "cpu": ["x64"] 550 + }, 551 + "@libsql/win32-x64-msvc@0.3.19": { 552 + "integrity": "sha512-ay1X9AobE4BpzG0XPw1gplyLZPGHIgJOovvW23gUrukRegiUP62uzhpRbKNogLlUOynyXeq//prHgPXiebUfWg==", 553 + "os": ["win32"], 554 + "cpu": ["x64"] 555 + }, 486 556 "@napi-rs/wasm-runtime@1.0.7": { 487 557 "integrity": "sha512-SeDnOO0Tk7Okiq6DbXmmBODgOAb9dp9gjlphokTUxmt8U3liIP1ZsozBahH69j/RJv+Rfs6IwUKHTgQYJ/HBAw==", 488 558 "dependencies": [ ··· 490 560 "@emnapi/runtime", 491 561 "@tybys/wasm-util" 492 562 ] 563 + }, 564 + "@neon-rs/load@0.0.4": { 565 + "integrity": "sha512-kTPhdZyTQxB+2wpiRcFWrDcejc4JI6tkPuS7UZCG4l6Zvc5kU/gGQ/ozvHTh1XR5tS+UlfAfGuPajjzQjCiHCw==" 493 566 }, 494 567 "@nodelib/fs.scandir@2.1.5": { 495 568 "integrity": "sha512-vq24Bq3ym5HEQm2NKCr3yXDwjc7vTsEThRDnkp2DK9p1uqLR+DHurm/NOTo0KG7HYHU7eppKZj3MyqYuMBf62g==", ··· 749 822 "@tailwindcss/oxide@4.1.14": { 750 823 "integrity": "sha512-23yx+VUbBwCg2x5XWdB8+1lkPajzLmALEfMb51zZUBYaYVPDQvBSD/WYDqiVyBIo2BZFa3yw1Rpy3G2Jp+K0dw==", 751 824 
"dependencies": [ 752 - "detect-libc", 825 + "detect-libc@2.1.2", 753 826 "tar" 754 827 ], 755 828 "optionalDependencies": [ ··· 982 1055 "@types/unist@3.0.3": { 983 1056 "integrity": "sha512-ko/gIFJRv177XgZsZcBwnqJN5x/Gien8qNOn0D5bQU/zAzVf9Zt3BlcUiLqhV9y4ARk0GbT3tnUiPNgnTXzc/Q==" 984 1057 }, 1058 + "@types/ws@8.18.1": { 1059 + "integrity": "sha512-ThVF6DCVhA8kUGy+aazFQ4kXQ7E1Ty7A3ypFOe0IcJV8O/M511G99AW24irKrW56Wt44yG9+ij8FaqoBGkuBXg==", 1060 + "dependencies": [ 1061 + "@types/node@24.2.0" 1062 + ] 1063 + }, 985 1064 "@ungap/structured-clone@1.3.0": { 986 1065 "integrity": "sha512-WmoN8qaIAo7WTYWbAZuG8PYEhn5fkz7dZrqTBZ7dtt//lL2Gwms1IcnQ5yHqjDfX8Ft5j4YzDM23f87zBfDe9g==" 987 1066 }, ··· 1138 1217 "cross-fetch@3.2.0": { 1139 1218 "integrity": "sha512-Q+xVJLoGOeIMXZmbUK4HYk+69cQH6LudR0Vu/pRm2YlU/hDV9CiS0gKUMaWY5f2NeUH9C1nV3bsTlCo0FsTV1Q==", 1140 1219 "dependencies": [ 1141 - "node-fetch" 1220 + "node-fetch@2.7.0" 1142 1221 ] 1143 1222 }, 1144 1223 "csstype@3.1.3": { 1145 1224 "integrity": "sha512-M1uQkMl8rQK/szD0LNhtqxIPLpimGm8sOBwU7lLnCpSbTyY3yeU1Vc7l4KT5zT4s/yOxHH5O7tIuuLOCnLADRw==" 1146 1225 }, 1226 + "data-uri-to-buffer@4.0.1": { 1227 + "integrity": "sha512-0R9ikRb668HB7QDxT1vkpuUBtqc53YyAwMwGeUFKRojY/NWKvdZ+9UYtRfGmhqNbRkTSVpMbmyhXipFFv2cb/A==" 1228 + }, 1147 1229 "debug@4.4.3": { 1148 1230 "integrity": "sha512-RGwwWnwQvkVfavKVt22FGLw+xYSdzARwm0ru6DhTVA3umU5hZc28V3kO4stgYryrTlLpuvgI9GiijltAjNbcqA==", 1149 1231 "dependencies": [ ··· 1152 1234 }, 1153 1235 "dequal@2.0.3": { 1154 1236 "integrity": "sha512-0je+qPKHEMohvfRTCEo3CrPG6cAzAYgmzKyxRiYSSDkS6eGJdyVJm7WaYA5ECaAD9wLB2T4EEeymA5aFVcYXCA==" 1237 + }, 1238 + "detect-libc@2.0.2": { 1239 + "integrity": "sha512-UX6sGumvvqSaXgdKGUsgZWqcUyIXZ/vZTrlRT/iobiKhGL0zL4d3osHj3uqllWJK+i+sixDS/3COVEOFbupFyw==" 1155 1240 }, 1156 1241 "detect-libc@2.1.2": { 1157 1242 "integrity": "sha512-Btj2BOOO83o3WyH59e8MgXsxEQVcarkUOpEYrubB0urwnN10yQ364rsiByU11nZlqWYZm05i/of7io4mzihBtQ==" ··· 1258 1343 "picomatch@4.0.3" 1259 1344 ] 1260 1345 
}, 1346 + "fetch-blob@3.2.0": { 1347 + "integrity": "sha512-7yAQpD2UMJzLi1Dqv7qFYnPbaPx7ZfFK6PiIxQ4PfkGPyNyl2Ugx+a/umUonmKqjhM4DnfbMvdX6otXq83soQQ==", 1348 + "dependencies": [ 1349 + "node-domexception", 1350 + "web-streams-polyfill" 1351 + ] 1352 + }, 1261 1353 "fill-range@7.1.1": { 1262 1354 "integrity": "sha512-YsGpe3WHLK8ZYi4tWDg2Jy3ebRz2rXowDxnld4bkQB00cc/1Zw9AWnC0i9ztDJitivtQvaI9KaLyKrc+hBW0yg==", 1263 1355 "dependencies": [ 1264 1356 "to-regex-range" 1357 + ] 1358 + }, 1359 + "formdata-polyfill@4.0.10": { 1360 + "integrity": "sha512-buewHzMvYL29jdeQTVILecSaZKnt/RJWjoZCF5OW60Z67/GmSLBkOFM7qh1PI3zFNtJbaZL5eQu1vLfazOwj4g==", 1361 + "dependencies": [ 1362 + "fetch-blob" 1265 1363 ] 1266 1364 }, 1267 1365 "fraction.js@4.3.7": { ··· 1381 1479 "integrity": "sha512-ekilCSN1jwRvIbgeg/57YFh8qQDNbwDb9xT/qu2DAHbFFZUicIl4ygVaAvzveMhMVr3LnpSKTNnwt8PoOfmKhQ==", 1382 1480 "bin": true 1383 1481 }, 1482 + "js-base64@3.7.8": { 1483 + "integrity": "sha512-hNngCeKxIUQiEUN3GPJOkz4wF/YvdUdbNL9hsBcMQTkKzboD7T/q3OYOuuPZLUE6dBxSGpwhk5mwuDud7JVAow==" 1484 + }, 1384 1485 "js-tokens@4.0.0": { 1385 1486 "integrity": "sha512-RdJUflcE3cUzKiMqQgsCu06FPu9UdIJO0beYbPhHN4k6apgJtifcoCtT9bcxOpYBtpD2kCM6Sbzg4CausW/PKQ==" 1386 1487 }, ··· 1406 1507 "integrity": "sha512-XmOWe7eyHYH14cLdVPoyg+GOH3rYX++KpzrylJwSW98t3Nk+U8XOl8FWKOgwtzdb8lXGf6zYwDUzeHMWfxasyg==", 1407 1508 "bin": true 1408 1509 }, 1510 + "libsql@0.3.19": { 1511 + "integrity": "sha512-Aj5cQ5uk/6fHdmeW0TiXK42FqUlwx7ytmMLPSaUQPin5HKKKuUPD62MAbN4OEweGBBI7q1BekoEN4gPUEL6MZA==", 1512 + "dependencies": [ 1513 + "@neon-rs/load", 1514 + "detect-libc@2.0.2" 1515 + ], 1516 + "optionalDependencies": [ 1517 + "@libsql/darwin-arm64", 1518 + "@libsql/darwin-x64", 1519 + "@libsql/linux-arm64-gnu", 1520 + "@libsql/linux-arm64-musl", 1521 + "@libsql/linux-x64-gnu", 1522 + "@libsql/linux-x64-musl", 1523 + "@libsql/win32-x64-msvc" 1524 + ], 1525 + "os": ["darwin", "linux", "win32"], 1526 + "cpu": ["x64", "arm64", "wasm32"] 1527 + }, 1409 1528 
"lightningcss-darwin-arm64@1.30.1": { 1410 1529 "integrity": "sha512-c8JK7hyE65X1MHMN+Viq9n11RRC7hgin3HhYKhrMyaXflk5GVplZ60IxyoVtzILeKr+xAJwg6zK6sjTBJ0FKYQ==", 1411 1530 "os": ["darwin"], ··· 1459 1578 "lightningcss@1.30.1": { 1460 1579 "integrity": "sha512-xi6IyHML+c9+Q3W0S4fCQJOym42pyurFiJUHEcEyHS0CeKzia4yZDEsLlqOFykxOdHpNy0NmvVO31vcSqAxJCg==", 1461 1580 "dependencies": [ 1462 - "detect-libc" 1581 + "detect-libc@2.1.2" 1463 1582 ], 1464 1583 "optionalDependencies": [ 1465 1584 "lightningcss-darwin-arm64", ··· 1588 1707 "integrity": "sha512-N8SpfPUnUp1bK+PMYW8qSWdl9U+wwNWI4QKxOYDy9JAro3WMX7p2OeVRF9v+347pnakNevPmiHhNmZ2HbFA76w==", 1589 1708 "bin": true 1590 1709 }, 1710 + "node-domexception@1.0.0": { 1711 + "integrity": "sha512-/jKZoMpw0F8GRwl4/eLROPA3cfcXtLApP0QzLmUT/HuPCZWyB7IY9ZrMeKw2O/nFIqPQB3PVM9aYm0F312AXDQ==", 1712 + "deprecated": true 1713 + }, 1591 1714 "node-fetch@2.7.0": { 1592 1715 "integrity": "sha512-c4FRfUm/dbcWZ7U+1Wq0AwCyFL+3nt2bEw05wfxSz+DWpWsitgmSgYmy2dQdWyKC1694ELPqMs/YzUSNozLt8A==", 1593 1716 "dependencies": [ 1594 1717 "whatwg-url" 1718 + ] 1719 + }, 1720 + "node-fetch@3.3.2": { 1721 + "integrity": "sha512-dRB78srN/l6gqWulah9SrxeYnxeddIG30+GOqK/9OlLVyLg3HPnr6SqOWTWOXKRwC2eGYCkZ59NNuSgvSrpgOA==", 1722 + "dependencies": [ 1723 + "data-uri-to-buffer", 1724 + "fetch-blob", 1725 + "formdata-polyfill" 1595 1726 ] 1596 1727 }, 1597 1728 "node-releases@2.0.23": { ··· 2087 2218 ], 2088 2219 "bin": true 2089 2220 }, 2221 + "web-streams-polyfill@3.3.3": { 2222 + "integrity": "sha512-d2JWLCivmZYTSIoge9MsgFCZrt571BikcWGYkjC1khllbTeDlGqZ2D8vD8E/lJa8WGWbb7Plm8/XJYV7IJHZZw==" 2223 + }, 2090 2224 "webidl-conversions@3.0.1": { 2091 2225 "integrity": "sha512-2JAn3z8AR6rjK8Sm8orRC0h/bcl/DqL7tRPdGZ4I1CjdF+EaMLmYxBHyXuKL849eucPFhvBoxMsflfOb8kxaeQ==" 2092 2226 }, ··· 2096 2230 "tr46", 2097 2231 "webidl-conversions" 2098 2232 ] 2233 + }, 2234 + "ws@8.18.3": { 2235 + "integrity": 
"sha512-PEIGCY5tSlUt50cqyMXfCzX+oOPqN0vuGqWzbcJ2xvnkzkq46oOpz7dQaTDBdfICb4N14+GARUDw2XV2N4tvzg==" 2099 2236 }, 2100 2237 "xtend@4.0.2": { 2101 2238 "integrity": "sha512-LKYU1iAXJXUgAXn9URjiu+MWhyUXHsvfp7mcuYm9dSUKK0/CjtrUwFAxD82/mCWbtLsGjFIad0wIsod4zrTAEQ==" ··· 2188 2325 "npm:ts-morph@26.0.0" 2189 2326 ] 2190 2327 }, 2328 + "packages/oauth": { 2329 + "dependencies": [ 2330 + "npm:@libsql/client@0.6.0" 2331 + ] 2332 + }, 2191 2333 "packages/session": { 2192 2334 "dependencies": [ 2335 + "npm:@libsql/client@0.6.0", 2193 2336 "npm:pg@^8.16.3" 2194 2337 ] 2195 2338 }
+20 -7
docs/graphql-api.md
··· 881 881 882 882 **Returns:** 883 883 884 - - `blob`: A JSON blob object containing: 885 - - `ref`: The CID (content identifier) reference for the blob 886 - - `mimeType`: The MIME type of the uploaded blob 887 - - `size`: The size of the blob in bytes 884 + - `blob`: A Blob object containing: 885 + - `ref` (String): The CID (content identifier) reference for the blob 886 + - `mimeType` (String): The MIME type of the uploaded blob 887 + - `size` (Int): The size of the blob in bytes 888 + - `url` (String): CDN URL for the blob (supports presets) 888 889 889 890 **Example with Variables:** 890 891 ··· 897 898 898 899 **Usage in Records:** 899 900 900 - After uploading a blob, use the returned blob object in your record mutations: 901 + After uploading a blob, use the returned blob object in your record mutations. You can provide the blob as a complete object with `ref` as a String: 901 902 902 903 ```graphql 903 904 mutation UpdateProfile($avatar: JSON) { ··· 905 906 rkey: "self" 906 907 input: { 907 908 displayName: "My Name" 908 - avatar: $avatar # Use the blob object from uploadBlob 909 + avatar: $avatar # Blob object with ref as String (CID) 909 910 } 910 911 ) { 911 912 uri 912 913 displayName 913 914 avatar { 914 - ref 915 + ref # Returns as String (CID) 915 916 mimeType 916 917 size 917 918 url(preset: "avatar") ··· 919 920 } 920 921 } 921 922 ``` 923 + 924 + **Example blob object for mutations:** 925 + 926 + ```json 927 + { 928 + "ref": "bafyreigbtj4x7ip5legnfznufuopl4sg4knzc2cof6duas4b3q2fy6swua", 929 + "mimeType": "image/jpeg", 930 + "size": 245678 931 + } 932 + ``` 933 + 934 + **Note:** The GraphQL API automatically handles the conversion between the GraphQL format (where `ref` is a String containing the CID) and the AT Protocol format (where `ref` is an object `{$link: "cid"}`). You always work with `ref` as a simple String in GraphQL queries and mutations. 922 935 923 936 ### Create Records 924 937
+5 -1
frontend-v2/schema.graphql
··· 931 931 } 932 932 933 933 type BlobUploadResponse { 934 - blob: JSON! 934 + blob: Blob! 935 935 } 936 936 937 937 type CollectionStats { ··· 1495 1495 Get statistics for this slice including collection counts, record counts, and actor counts 1496 1496 """ 1497 1497 stats: SliceStats! 1498 + 1499 + """Get all OAuth clients for this slice""" 1500 + oauthClients: [OAuthClient!]! 1498 1501 } 1499 1502 1500 1503 type NetworkSlicesSliceAggregated { ··· 2105 2108 type SyncJob { 2106 2109 id: ID! 2107 2110 jobId: String! 2111 + sliceUri: String! 2108 2112 status: String! 2109 2113 createdAt: String! 2110 2114 startedAt: String
+8 -7
frontend-v2/server/profile-init.ts
··· 18 18 export async function initializeUserProfile( 19 19 userDid: string, 20 20 userHandle: string, 21 - tokens: TokenInfo 21 + tokens: TokenInfo, 22 22 ): Promise<void> { 23 23 if (!API_URL || !SLICE_URI) { 24 24 console.error("Missing API_URL or VITE_SLICE_URI environment variables"); ··· 26 26 } 27 27 28 28 try { 29 - const graphqlUrl = `${API_URL}/graphql?slice=${encodeURIComponent(SLICE_URI)}`; 29 + const graphqlUrl = `${API_URL}/graphql?slice=${ 30 + encodeURIComponent(SLICE_URI) 31 + }`; 30 32 const authHeader = `${tokens.tokenType} ${tokens.accessToken}`; 31 33 32 34 // 1. Check if profile already exists ··· 132 134 }); 133 135 134 136 if (!bskyResponse.ok) { 135 - throw new Error(`Fetch Bluesky profile failed: ${bskyResponse.statusText}`); 137 + throw new Error( 138 + `Fetch Bluesky profile failed: ${bskyResponse.statusText}`, 139 + ); 136 140 } 137 141 138 142 const bskyData = await bskyResponse.json(); ··· 160 164 ) { 161 165 // Reconstruct blob format for AT Protocol 162 166 profileInput.avatar = { 163 - $type: "blob", 164 - ref: { 165 - $link: bskyProfile.avatar.ref, 166 - }, 167 + ref: bskyProfile.avatar.ref, 167 168 mimeType: bskyProfile.avatar.mimeType, 168 169 size: bskyProfile.avatar.size, 169 170 };
+66 -68
frontend-v2/src/__generated__/OAuthClientsQuery.graphql.ts
··· 1 1 /** 2 - * @generated SignedSource<<4c24e57ecdb7163fc62f4069422aac37>> 2 + * @generated SignedSource<<4bc4595b4bf2e4b263476f66a31ccca4>> 3 3 * @lightSyntaxTransform 4 4 * @nogrep 5 5 */ ··· 41 41 lte?: string | null | undefined; 42 42 }; 43 43 export type OAuthClientsQuery$variables = { 44 - slice: string; 45 44 where?: NetworkSlicesSliceWhereInput | null | undefined; 46 45 }; 47 46 export type OAuthClientsQuery$data = { ··· 51 50 readonly actorHandle: string | null | undefined; 52 51 readonly did: string; 53 52 readonly name: string; 53 + readonly oauthClients: ReadonlyArray<{ 54 + readonly clientId: string; 55 + readonly clientName: string; 56 + readonly clientSecret: string | null | undefined; 57 + readonly clientUri: string | null | undefined; 58 + readonly createdAt: string; 59 + readonly createdByDid: string; 60 + readonly logoUri: string | null | undefined; 61 + readonly policyUri: string | null | undefined; 62 + readonly redirectUris: ReadonlyArray<string>; 63 + readonly scope: string | null | undefined; 64 + readonly tosUri: string | null | undefined; 65 + }>; 66 + readonly uri: string; 54 67 }; 55 68 }>; 56 69 }; 57 - readonly oauthClients: ReadonlyArray<{ 58 - readonly clientId: string; 59 - readonly clientName: string; 60 - readonly clientSecret: string | null | undefined; 61 - readonly clientUri: string | null | undefined; 62 - readonly createdAt: string; 63 - readonly createdByDid: string; 64 - readonly logoUri: string | null | undefined; 65 - readonly policyUri: string | null | undefined; 66 - readonly redirectUris: ReadonlyArray<string>; 67 - readonly scope: string | null | undefined; 68 - readonly tosUri: string | null | undefined; 69 - }>; 70 70 }; 71 71 export type OAuthClientsQuery = { 72 72 response: OAuthClientsQuery$data; ··· 78 78 { 79 79 "defaultValue": null, 80 80 "kind": "LocalArgument", 81 - "name": "slice" 81 + "name": "where" 82 + } 83 + ], 84 + v1 = [ 85 + { 86 + "kind": "Literal", 87 + "name": "first", 88 + "value": 1 82 89 }, 
83 90 { 84 - "defaultValue": null, 85 - "kind": "LocalArgument", 86 - "name": "where" 91 + "kind": "Variable", 92 + "name": "where", 93 + "variableName": "where" 87 94 } 88 95 ], 89 - v1 = { 96 + v2 = { 97 + "alias": null, 98 + "args": null, 99 + "kind": "ScalarField", 100 + "name": "name", 101 + "storageKey": null 102 + }, 103 + v3 = { 90 104 "alias": null, 91 - "args": [ 92 - { 93 - "kind": "Variable", 94 - "name": "slice", 95 - "variableName": "slice" 96 - } 97 - ], 105 + "args": null, 106 + "kind": "ScalarField", 107 + "name": "did", 108 + "storageKey": null 109 + }, 110 + v4 = { 111 + "alias": null, 112 + "args": null, 113 + "kind": "ScalarField", 114 + "name": "actorHandle", 115 + "storageKey": null 116 + }, 117 + v5 = { 118 + "alias": null, 119 + "args": null, 120 + "kind": "ScalarField", 121 + "name": "uri", 122 + "storageKey": null 123 + }, 124 + v6 = { 125 + "alias": null, 126 + "args": null, 98 127 "concreteType": "OAuthClient", 99 128 "kind": "LinkedField", 100 129 "name": "oauthClients", ··· 179 208 } 180 209 ], 181 210 "storageKey": null 182 - }, 183 - v2 = [ 184 - { 185 - "kind": "Literal", 186 - "name": "first", 187 - "value": 1 188 - }, 189 - { 190 - "kind": "Variable", 191 - "name": "where", 192 - "variableName": "where" 193 - } 194 - ], 195 - v3 = { 196 - "alias": null, 197 - "args": null, 198 - "kind": "ScalarField", 199 - "name": "name", 200 - "storageKey": null 201 - }, 202 - v4 = { 203 - "alias": null, 204 - "args": null, 205 - "kind": "ScalarField", 206 - "name": "did", 207 - "storageKey": null 208 - }, 209 - v5 = { 210 - "alias": null, 211 - "args": null, 212 - "kind": "ScalarField", 213 - "name": "actorHandle", 214 - "storageKey": null 215 211 }; 216 212 return { 217 213 "fragment": { ··· 220 216 "metadata": null, 221 217 "name": "OAuthClientsQuery", 222 218 "selections": [ 223 - (v1/*: any*/), 224 219 { 225 220 "alias": null, 226 - "args": (v2/*: any*/), 221 + "args": (v1/*: any*/), 227 222 "concreteType": "NetworkSlicesSliceConnection", 
228 223 "kind": "LinkedField", 229 224 "name": "networkSlicesSlices", ··· 245 240 "name": "node", 246 241 "plural": false, 247 242 "selections": [ 243 + (v2/*: any*/), 248 244 (v3/*: any*/), 249 245 (v4/*: any*/), 250 - (v5/*: any*/) 246 + (v5/*: any*/), 247 + (v6/*: any*/) 251 248 ], 252 249 "storageKey": null 253 250 } ··· 267 264 "kind": "Operation", 268 265 "name": "OAuthClientsQuery", 269 266 "selections": [ 270 - (v1/*: any*/), 271 267 { 272 268 "alias": null, 273 - "args": (v2/*: any*/), 269 + "args": (v1/*: any*/), 274 270 "concreteType": "NetworkSlicesSliceConnection", 275 271 "kind": "LinkedField", 276 272 "name": "networkSlicesSlices", ··· 292 288 "name": "node", 293 289 "plural": false, 294 290 "selections": [ 291 + (v2/*: any*/), 295 292 (v3/*: any*/), 296 293 (v4/*: any*/), 297 294 (v5/*: any*/), 295 + (v6/*: any*/), 298 296 { 299 297 "alias": null, 300 298 "args": null, ··· 314 312 ] 315 313 }, 316 314 "params": { 317 - "cacheID": "20abc4b49d5c52da4a3ad1935662056a", 315 + "cacheID": "953a2b7074ba3074cca3f11991af440e", 318 316 "id": null, 319 317 "metadata": {}, 320 318 "name": "OAuthClientsQuery", 321 319 "operationKind": "query", 322 - "text": "query OAuthClientsQuery(\n $slice: String!\n $where: NetworkSlicesSliceWhereInput\n) {\n oauthClients(slice: $slice) {\n clientId\n clientSecret\n clientName\n redirectUris\n scope\n clientUri\n logoUri\n tosUri\n policyUri\n createdAt\n createdByDid\n }\n networkSlicesSlices(first: 1, where: $where) {\n edges {\n node {\n name\n did\n actorHandle\n id\n }\n }\n }\n}\n" 320 + "text": "query OAuthClientsQuery(\n $where: NetworkSlicesSliceWhereInput\n) {\n networkSlicesSlices(first: 1, where: $where) {\n edges {\n node {\n name\n did\n actorHandle\n uri\n oauthClients {\n clientId\n clientSecret\n clientName\n redirectUris\n scope\n clientUri\n logoUri\n tosUri\n policyUri\n createdAt\n createdByDid\n }\n id\n }\n }\n }\n}\n" 323 321 } 324 322 }; 325 323 })(); 326 324 327 - (node as any).hash = 
"b3eda4c7e0bda285a5261efa81e7b5cd"; 325 + (node as any).hash = "4c0e3d21f0879129255130f260edcb75"; 328 326 329 327 export default node;
+35 -6
frontend-v2/src/__generated__/ProfileSettingsUploadBlobMutation.graphql.ts
··· 1 1 /** 2 - * @generated SignedSource<<a2334c7e93bb6d5b4748df1211a418ae>> 2 + * @generated SignedSource<<728b9a3525f975b6c58a5cdcd323f89e>> 3 3 * @lightSyntaxTransform 4 4 * @nogrep 5 5 */ ··· 15 15 }; 16 16 export type ProfileSettingsUploadBlobMutation$data = { 17 17 readonly uploadBlob: { 18 - readonly blob: any; 18 + readonly blob: { 19 + readonly mimeType: string; 20 + readonly ref: string; 21 + readonly size: number; 22 + }; 19 23 }; 20 24 }; 21 25 export type ProfileSettingsUploadBlobMutation = { ··· 59 63 { 60 64 "alias": null, 61 65 "args": null, 62 - "kind": "ScalarField", 66 + "concreteType": "Blob", 67 + "kind": "LinkedField", 63 68 "name": "blob", 69 + "plural": false, 70 + "selections": [ 71 + { 72 + "alias": null, 73 + "args": null, 74 + "kind": "ScalarField", 75 + "name": "ref", 76 + "storageKey": null 77 + }, 78 + { 79 + "alias": null, 80 + "args": null, 81 + "kind": "ScalarField", 82 + "name": "mimeType", 83 + "storageKey": null 84 + }, 85 + { 86 + "alias": null, 87 + "args": null, 88 + "kind": "ScalarField", 89 + "name": "size", 90 + "storageKey": null 91 + } 92 + ], 64 93 "storageKey": null 65 94 } 66 95 ], ··· 85 114 "selections": (v1/*: any*/) 86 115 }, 87 116 "params": { 88 - "cacheID": "3a4a6b19d2898f14635b098941614cab", 117 + "cacheID": "afd8db2ee7590308e81afc0b0e5c86dd", 89 118 "id": null, 90 119 "metadata": {}, 91 120 "name": "ProfileSettingsUploadBlobMutation", 92 121 "operationKind": "mutation", 93 - "text": "mutation ProfileSettingsUploadBlobMutation(\n $data: String!\n $mimeType: String!\n) {\n uploadBlob(data: $data, mimeType: $mimeType) {\n blob\n }\n}\n" 122 + "text": "mutation ProfileSettingsUploadBlobMutation(\n $data: String!\n $mimeType: String!\n) {\n uploadBlob(data: $data, mimeType: $mimeType) {\n blob {\n ref\n mimeType\n size\n }\n }\n}\n" 94 123 } 95 124 }; 96 125 })(); 97 126 98 - (node as any).hash = "76da65b07a282ed7f2dee12b4cac82d6"; 127 + (node as any).hash = "74a3a8bf43181cd62d2e81c45be384e5"; 99 128 100 129 
export default node;
+19 -23
frontend-v2/src/pages/OAuthClients.tsx
··· 138 138 export default function OAuthClients() { 139 139 const { handle, rkey } = useParams<{ handle: string; rkey: string }>(); 140 140 141 - // Build slice URI from params 142 - const sliceUri = 143 - `at://did:placeholder/${handle}/network.slices.slice/${rkey}`; 144 - 145 141 return ( 146 142 <Suspense 147 143 fallback={ ··· 152 148 </Layout> 153 149 } 154 150 > 155 - <OAuthClientsWrapper sliceUri={sliceUri} handle={handle!} rkey={rkey!} /> 151 + <OAuthClientsWrapper handle={handle!} rkey={rkey!} /> 156 152 </Suspense> 157 153 ); 158 154 } 159 155 160 156 function OAuthClientsWrapper( 161 - { sliceUri, handle, rkey }: { 162 - sliceUri: string; 157 + { handle, rkey }: { 163 158 handle: string; 164 159 rkey: string; 165 160 }, 166 161 ) { 167 162 const { session } = useSessionContext(); 163 + 168 164 const data = useLazyLoadQuery<OAuthClientsQuery>( 169 165 graphql` 170 166 query OAuthClientsQuery( 171 - $slice: String! 172 167 $where: NetworkSlicesSliceWhereInput 173 168 ) { 174 - oauthClients(slice: $slice) { 175 - clientId 176 - clientSecret 177 - clientName 178 - redirectUris 179 - scope 180 - clientUri 181 - logoUri 182 - tosUri 183 - policyUri 184 - createdAt 185 - createdByDid 186 - } 187 169 networkSlicesSlices(first: 1, where: $where) { 188 170 edges { 189 171 node { 190 172 name 191 173 did 192 174 actorHandle 175 + uri 176 + oauthClients { 177 + clientId 178 + clientSecret 179 + clientName 180 + redirectUris 181 + scope 182 + clientUri 183 + logoUri 184 + tosUri 185 + policyUri 186 + createdAt 187 + createdByDid 188 + } 193 189 } 194 190 } 195 191 } 196 192 } 197 193 `, 198 194 { 199 - slice: sliceUri, 200 195 where: { 201 196 actorHandle: { eq: handle }, 202 197 uri: { contains: rkey }, ··· 207 202 208 203 const slice = data.networkSlicesSlices.edges[0]?.node; 209 204 const sliceName = slice?.name; 205 + const sliceUri = slice?.uri || `at://${slice?.did}/network.slices.slice/${rkey}`; 210 206 211 207 // Check if current user is the slice owner or 
admin 212 208 const isOwner = isSliceOwner(slice, session); ··· 229 225 } 230 226 > 231 227 <OAuthClientsContent 232 - clients={data.oauthClients || []} 228 + clients={slice?.oauthClients || []} 233 229 sliceUri={sliceUri} 234 230 /> 235 231 </Layout>
+18 -13
frontend-v2/src/pages/ProfileSettings.tsx
··· 1 - import { useParams, Link } from "react-router-dom"; 1 + import { Link, useParams } from "react-router-dom"; 2 2 import { useState } from "react"; 3 3 import { graphql, useLazyLoadQuery, useMutation } from "react-relay"; 4 4 import type { ProfileSettingsQuery } from "../__generated__/ProfileSettingsQuery.graphql.ts"; ··· 44 44 where: { 45 45 actorHandle: { eq: handle }, 46 46 }, 47 - } 47 + }, 48 48 ); 49 49 50 50 const profile = data.networkSlicesActorProfiles.edges[0]?.node; ··· 59 59 graphql` 60 60 mutation ProfileSettingsUploadBlobMutation($data: String!, $mimeType: String!) { 61 61 uploadBlob(data: $data, mimeType: $mimeType) { 62 - blob 62 + blob { 63 + ref 64 + mimeType 65 + size 66 + } 63 67 } 64 68 } 65 - ` 69 + `, 66 70 ); 67 71 68 72 const [commitUpdateProfile, isUpdatingProfile] = useMutation( ··· 80 84 } 81 85 } 82 86 } 83 - ` 87 + `, 84 88 ); 85 89 86 90 const [commitCreateProfile, isCreatingProfile] = useMutation( ··· 98 102 } 99 103 } 100 104 } 101 - ` 105 + `, 102 106 ); 103 107 104 108 // Helper to convert File to base64 ··· 108 112 reader.onload = () => { 109 113 const arrayBuffer = reader.result as ArrayBuffer; 110 114 const bytes = new Uint8Array(arrayBuffer); 111 - const binary = Array.from(bytes).map(b => String.fromCharCode(b)).join(''); 115 + const binary = Array.from(bytes).map((b) => String.fromCharCode(b)) 116 + .join(""); 112 117 resolve(btoa(binary)); 113 118 }; 114 119 reader.onerror = reject; ··· 129 134 // Upload new avatar 130 135 const base64Data = await fileToBase64(avatarFile); 131 136 132 - const uploadResult = await new Promise<{ uploadBlob: { blob: unknown } }>((resolve, reject) => { 137 + const uploadResult = await new Promise< 138 + { uploadBlob: { blob: unknown } } 139 + >((resolve, reject) => { 133 140 commitUploadBlob({ 134 141 variables: { 135 142 data: base64Data, 136 143 mimeType: avatarFile.type, 137 144 }, 138 - onCompleted: (data) => resolve(data as { uploadBlob: { blob: unknown } }), 145 + onCompleted: 
(data) => 146 + resolve(data as { uploadBlob: { blob: unknown } }), 139 147 onError: (error) => reject(error), 140 148 }); 141 149 }); ··· 144 152 } else if (profile?.avatar) { 145 153 // Keep existing avatar - reconstruct blob with $type field for AT Protocol 146 154 avatarBlob = { 147 - $type: "blob", 148 - ref: { 149 - $link: profile.avatar.ref, 150 - }, 155 + ref: profile.avatar.ref, 151 156 mimeType: profile.avatar.mimeType, 152 157 size: profile.avatar.size, 153 158 };
+11 -11
packages/session/src/adapters/postgres.ts
··· 6 6 user_id: string; 7 7 handle: string | null; 8 8 is_authenticated: boolean; 9 - data: string | null; 10 - created_at: Date; 11 - expires_at: Date; 12 - last_accessed_at: Date; 9 + data: Record<string, unknown> | null; 10 + created_at: number; 11 + expires_at: number; 12 + last_accessed_at: number; 13 13 } 14 14 15 15 export class PostgresAdapter implements SessionAdapter { ··· 100 100 data.userId, 101 101 data.handle || null, 102 102 data.isAuthenticated, 103 - data.data ? JSON.stringify(data.data) : null, 103 + data.data || null, 104 104 data.createdAt, 105 105 data.expiresAt, 106 106 data.lastAccessedAt, ··· 116 116 updates: Partial<SessionData> 117 117 ): Promise<boolean> { 118 118 const setParts: string[] = []; 119 - const values: (string | number | boolean | null)[] = []; 119 + const values: (string | number | boolean | null | Record<string, unknown>)[] = []; 120 120 let paramIndex = 1; 121 121 122 122 if (updates.userId !== undefined) { ··· 136 136 137 137 if (updates.data !== undefined) { 138 138 setParts.push(`data = $${paramIndex++}`); 139 - values.push(updates.data ? JSON.stringify(updates.data) : null); 139 + values.push(updates.data || null); 140 140 } 141 141 142 142 if (updates.expiresAt !== undefined) { ··· 226 226 userId: row.user_id, 227 227 handle: row.handle || undefined, 228 228 isAuthenticated: row.is_authenticated, 229 - data: row.data ? JSON.parse(row.data) : undefined, 230 - createdAt: row.created_at.getTime(), 231 - expiresAt: row.expires_at.getTime(), 232 - lastAccessedAt: row.last_accessed_at.getTime(), 229 + data: row.data || undefined, 230 + createdAt: row.created_at, 231 + expiresAt: row.expires_at, 232 + lastAccessedAt: row.last_accessed_at, 233 233 }; 234 234 } 235 235