Highly ambitious ATProtocol AppView service and sdks

fix sync job subscription slice filtering

Add slice_uri field to JobStatus struct and implement proper filtering in GraphQL subscriptions to prevent cross-slice job updates from appearing on the wrong slice's sync page.

- Add slice_uri to JobStatus struct in jobs.rs
- Update all JobStatus creations to include slice_uri from payload or database
- Implement slice filtering in syncJobUpdated subscription
- Add sliceUri field to SyncJob GraphQL type

Changed files
+38 -3
api
src
graphql
schema_ext
frontend-v2
+18 -3
api/src/graphql/schema_ext/sync.rs
··· 229 229 }) 230 230 })); 231 231 232 + job = job.field(Field::new("sliceUri", TypeRef::named_nn(TypeRef::STRING), |ctx| { 233 + FieldFuture::new(async move { 234 + let container = ctx.parent_value.try_downcast_ref::<JobStatusContainer>()?; 235 + Ok(Some(GraphQLValue::from(container.status.slice_uri.clone()))) 236 + }) 237 + })); 238 + 232 239 job = job.field(Field::new("status", TypeRef::named_nn(TypeRef::STRING), |ctx| { 233 240 FieldFuture::new(async move { 234 241 let container = ctx.parent_value.try_downcast_ref::<JobStatusContainer>()?; ··· 711 718 let mut receiver = sender_lock.subscribe(); 712 719 drop(sender_lock); // Release lock 713 720 721 + // Get optional slice filter from arguments 722 + let slice_filter: Option<String> = ctx.args.get("slice") 723 + .and_then(|val| val.string().ok()) 724 + .map(|s| s.to_string()); 725 + 714 726 let stream = async_stream::stream! { 715 727 while let Ok(job_status) = receiver.recv().await { 716 728 // Filter by job_id if provided ··· 720 732 } 721 733 } 722 734 723 - // Filter by slice_uri if provided (need to query for slice_uri) 724 - // For now, skip slice filtering since JobStatus doesn't include slice_uri 725 - // TODO: Add slice_uri to JobStatus or query it separately 735 + // Filter by slice_uri if provided 736 + if let Some(ref filter_slice) = slice_filter { 737 + if &job_status.slice_uri != filter_slice { 738 + continue; 739 + } 740 + } 726 741 727 742 // Convert to GraphQL value and yield 728 743 let container = JobStatusContainer { status: job_status };
+19
api/src/jobs.rs
··· 148 148 // Publish job running status to subscribers 149 149 let running_status = JobStatus { 150 150 job_id: payload.job_id, 151 + slice_uri: payload.slice_uri.clone(), 151 152 status: "running".to_string(), 152 153 created_at: now, 153 154 started_at: Some(now), ··· 260 261 // Publish job status update to GraphQL subscribers 261 262 let job_status = JobStatus { 262 263 job_id: payload.job_id, 264 + slice_uri: payload.slice_uri.clone(), 263 265 status: "completed".to_string(), 264 266 created_at: chrono::Utc::now(), 265 267 started_at: Some(chrono::Utc::now()), ··· 337 339 // Publish job status update to GraphQL subscribers 338 340 let job_status = JobStatus { 339 341 job_id: payload.job_id, 342 + slice_uri: payload.slice_uri.clone(), 340 343 status: "failed".to_string(), 341 344 created_at: chrono::Utc::now(), 342 345 started_at: Some(chrono::Utc::now()), ··· 536 539 // Publish job creation event to subscribers 537 540 let job_status = JobStatus { 538 541 job_id, 542 + slice_uri: slice_uri.clone(), 539 543 status: "pending".to_string(), 540 544 created_at: chrono::Utc::now(), 541 545 started_at: None, ··· 559 563 pub struct JobStatus { 560 564 /// Unique identifier for the job 561 565 pub job_id: Uuid, 566 + /// Slice URI this job belongs to 567 + pub slice_uri: String, 562 568 /// Current status: "pending", "running", "completed", or "failed" 563 569 pub status: String, 564 570 /// Timestamp when job was enqueued ··· 611 617 612 618 return Ok(Some(JobStatus { 613 619 job_id, 620 + slice_uri: result.slice_uri, 614 621 status: result.status, 615 622 created_at: result.created_at, 616 623 started_at: Some(result.created_at), ··· 647 654 648 655 match queue_row { 649 656 Some(row) => { 657 + // Extract slice_uri from payload JSON 658 + let slice_uri = row.payload_json 659 + .as_ref() 660 + .and_then(|json| json.get("slice_uri")) 661 + .and_then(|v| v.as_str()) 662 + .unwrap_or_default() 663 + .to_string(); 664 + 650 665 // Determine status based on attempt_at timestamp 651 666 let status = if row.attempt_at.is_none() { 652 667 "completed".to_string() ··· 662 677 663 678 Ok(Some(JobStatus { 664 679 job_id, 680 + slice_uri, 665 681 status: status.clone(), 666 682 created_at: row.created_at.unwrap_or_else(chrono::Utc::now), 667 683 started_at: if status == "running" || status == "completed" { ··· 790 806 791 807 results.push(JobStatus { 792 808 job_id: row.job_id.unwrap_or_else(Uuid::new_v4), 809 + slice_uri: row.slice_uri.clone().unwrap_or_default(), 793 810 status: row.status.unwrap_or_default(), 794 811 created_at: row.created_at.unwrap_or_else(chrono::Utc::now), 795 812 started_at: row.created_at, ··· 902 919 903 920 results.push(JobStatus { 904 921 job_id: row.job_id.unwrap_or_else(Uuid::new_v4), 922 + slice_uri: row.slice_uri.clone().unwrap_or_default(), 905 923 status: row.status.unwrap_or_default(), 906 924 created_at: row.created_at.unwrap_or_else(chrono::Utc::now), 907 925 started_at: row.created_at, ··· 1055 1073 // Publish job status update for subscribers 1056 1074 let job_status = JobStatus { 1057 1075 job_id, 1076 + slice_uri, 1058 1077 status: "cancelled".to_string(), 1059 1078 created_at: now, 1060 1079 started_at: None,
+1
frontend-v2/schema.graphql
··· 2105 2105 type SyncJob { 2106 2106 id: ID! 2107 2107 jobId: String! 2108 + sliceUri: String! 2108 2109 status: String! 2109 2110 createdAt: String! 2110 2111 startedAt: String