I've been saying "PDSes seem easy enough, they're what, some CRUD to a db? I can do that in my sleep". well i'm sleeping rn so let's go
at main 26 kB view raw
1mod common; 2 3use cid::Cid; 4use common::*; 5use futures::{SinkExt, stream::StreamExt}; 6use iroh_car::CarReader; 7use reqwest::StatusCode; 8use serde::{Deserialize, Serialize}; 9use serde_json::{Value, json}; 10use std::io::Cursor; 11use tokio_tungstenite::{connect_async, tungstenite}; 12 13#[derive(Debug, Deserialize, Serialize)] 14struct FrameHeader { 15 op: i64, 16 t: String, 17} 18 19#[derive(Debug, Deserialize)] 20struct CommitFrame { 21 seq: i64, 22 #[serde(default)] 23 rebase: bool, 24 #[serde(rename = "tooBig", default)] 25 too_big: bool, 26 repo: String, 27 commit: Cid, 28 rev: String, 29 since: Option<String>, 30 #[serde(with = "serde_bytes")] 31 blocks: Vec<u8>, 32 ops: Vec<RepoOp>, 33 #[serde(default)] 34 blobs: Vec<Cid>, 35 time: String, 36 #[serde(rename = "prevData")] 37 prev_data: Option<Cid>, 38} 39 40#[derive(Debug, Deserialize)] 41struct RepoOp { 42 action: String, 43 path: String, 44 cid: Option<Cid>, 45 prev: Option<Cid>, 46} 47 48fn find_cbor_map_end(bytes: &[u8]) -> Result<usize, String> { 49 let mut pos = 0; 50 51 fn read_uint(bytes: &[u8], pos: &mut usize, additional: u8) -> Result<u64, String> { 52 match additional { 53 0..=23 => Ok(additional as u64), 54 24 => { 55 if *pos >= bytes.len() { 56 return Err("Unexpected end".into()); 57 } 58 let val = bytes[*pos] as u64; 59 *pos += 1; 60 Ok(val) 61 } 62 25 => { 63 if *pos + 2 > bytes.len() { 64 return Err("Unexpected end".into()); 65 } 66 let val = u16::from_be_bytes([bytes[*pos], bytes[*pos + 1]]) as u64; 67 *pos += 2; 68 Ok(val) 69 } 70 26 => { 71 if *pos + 4 > bytes.len() { 72 return Err("Unexpected end".into()); 73 } 74 let val = u32::from_be_bytes([ 75 bytes[*pos], 76 bytes[*pos + 1], 77 bytes[*pos + 2], 78 bytes[*pos + 3], 79 ]) as u64; 80 *pos += 4; 81 Ok(val) 82 } 83 27 => { 84 if *pos + 8 > bytes.len() { 85 return Err("Unexpected end".into()); 86 } 87 let val = u64::from_be_bytes([ 88 bytes[*pos], 89 bytes[*pos + 1], 90 bytes[*pos + 2], 91 bytes[*pos + 3], 92 bytes[*pos + 4], 93 bytes[*pos + 5], 94 bytes[*pos + 6], 95 bytes[*pos + 7], 96 ]); 97 *pos += 8; 98 Ok(val) 99 } 100 _ => Err(format!("Invalid additional info: {}", additional)), 101 } 102 } 103 104 fn skip_value(bytes: &[u8], pos: &mut usize) -> Result<(), String> { 105 if *pos >= bytes.len() { 106 return Err("Unexpected end".into()); 107 } 108 let initial = bytes[*pos]; 109 *pos += 1; 110 let major = initial >> 5; 111 let additional = initial & 0x1f; 112 113 match major { 114 0 | 1 => { 115 read_uint(bytes, pos, additional)?; 116 Ok(()) 117 } 118 2 | 3 => { 119 let len = read_uint(bytes, pos, additional)? as usize; 120 *pos += len; 121 Ok(()) 122 } 123 4 => { 124 let len = read_uint(bytes, pos, additional)?; 125 for _ in 0..len { 126 skip_value(bytes, pos)?; 127 } 128 Ok(()) 129 } 130 5 => { 131 let len = read_uint(bytes, pos, additional)?; 132 for _ in 0..len { 133 skip_value(bytes, pos)?; 134 skip_value(bytes, pos)?; 135 } 136 Ok(()) 137 } 138 6 => { 139 read_uint(bytes, pos, additional)?; 140 skip_value(bytes, pos) 141 } 142 7 => Ok(()), 143 _ => Err(format!("Unknown major type: {}", major)), 144 } 145 } 146 147 skip_value(bytes, &mut pos)?; 148 Ok(pos) 149} 150 151fn parse_frame(bytes: &[u8]) -> Result<(FrameHeader, CommitFrame), String> { 152 let header_len = find_cbor_map_end(bytes)?; 153 let header: FrameHeader = serde_ipld_dagcbor::from_slice(&bytes[..header_len]) 154 .map_err(|e| format!("Failed to parse header: {:?}", e))?; 155 156 if header.t != "#commit" { 157 return Err(format!("Not a commit frame: {}", header.t)); 158 } 159 160 let remaining = &bytes[header_len..]; 161 let frame: CommitFrame = serde_ipld_dagcbor::from_slice(remaining) 162 .map_err(|e| format!("Failed to parse commit frame: {:?}", e))?; 163 164 Ok((header, frame)) 165} 166 167fn is_valid_tid(s: &str) -> bool { 168 s.len() == 13 && s.chars().all(|c| c.is_alphanumeric()) 169} 170 171fn is_valid_time_format(s: &str) -> bool { 172 if !s.ends_with('Z') { 173 return false; 174 } 175 let parts: Vec<&str> = s.split('T').collect(); 176 if parts.len() != 2 { 177 return false; 178 } 179 let time_part = parts[1].trim_end_matches('Z'); 180 let time_parts: Vec<&str> = time_part.split(':').collect(); 181 if time_parts.len() != 3 { 182 return false; 183 } 184 let seconds_part = time_parts[2]; 185 if let Some(dot_pos) = seconds_part.find('.') { 186 let millis = &seconds_part[dot_pos + 1..]; 187 millis.len() == 3 188 } else { 189 false 190 } 191} 192 193#[tokio::test] 194async fn test_firehose_frame_structure() { 195 let client = client(); 196 let (token, did) = create_account_and_login(&client).await; 197 198 let url = format!( 199 "ws://127.0.0.1:{}/xrpc/com.atproto.sync.subscribeRepos", 200 app_port() 201 ); 202 let (mut ws_stream, _) = connect_async(&url).await.expect("Failed to connect"); 203 tokio::time::sleep(std::time::Duration::from_millis(100)).await; 204 205 let post_text = "Testing firehose validation!"; 206 let post_payload = json!({ 207 "repo": did, 208 "collection": "app.bsky.feed.post", 209 "record": { 210 "$type": "app.bsky.feed.post", 211 "text": post_text, 212 "createdAt": chrono::Utc::now().to_rfc3339(), 213 } 214 }); 215 let res = client 216 .post(format!( 217 "{}/xrpc/com.atproto.repo.createRecord", 218 base_url().await 219 )) 220 .bearer_auth(&token) 221 .json(&post_payload) 222 .send() 223 .await 224 .expect("Failed to create post"); 225 assert_eq!(res.status(), StatusCode::OK); 226 227 let mut frame_opt: Option<(FrameHeader, CommitFrame)> = None; 228 let timeout = tokio::time::timeout(std::time::Duration::from_secs(10), async { 229 loop { 230 let msg = ws_stream.next().await.unwrap().unwrap(); 231 let raw_bytes = match msg { 232 tungstenite::Message::Binary(bin) => bin, 233 _ => continue, 234 }; 235 if let Ok((h, f)) = parse_frame(&raw_bytes) 236 && f.repo == did 237 { 238 frame_opt = Some((h, f)); 239 break; 240 } 241 } 242 }) 243 .await; 244 assert!(timeout.is_ok(), "Timed out waiting for event for our DID"); 245 let (header, frame) = frame_opt.expect("No matching frame found"); 246 247 println!("\n=== Frame Structure Validation ===\n"); 248 249 println!("Header:"); 250 println!(" op: {} (expected: 1)", header.op); 251 println!(" t: {} (expected: #commit)", header.t); 252 assert_eq!(header.op, 1, "Header op should be 1"); 253 assert_eq!(header.t, "#commit", "Header t should be #commit"); 254 255 println!("\nCommitFrame fields:"); 256 println!(" seq: {}", frame.seq); 257 println!(" rebase: {}", frame.rebase); 258 println!(" tooBig: {}", frame.too_big); 259 println!(" repo: {}", frame.repo); 260 println!(" commit: {}", frame.commit); 261 println!( 262 " rev: {} (valid TID: {})", 263 frame.rev, 264 is_valid_tid(&frame.rev) 265 ); 266 println!(" since: {:?}", frame.since); 267 println!(" blocks length: {} bytes", frame.blocks.len()); 268 println!(" ops count: {}", frame.ops.len()); 269 println!(" blobs count: {}", frame.blobs.len()); 270 println!( 271 " time: {} (valid format: {})", 272 frame.time, 273 is_valid_time_format(&frame.time) 274 ); 275 println!( 276 " prevData: {:?} (IMPORTANT - should have value for updates)", 277 frame.prev_data 278 ); 279 280 assert_eq!(frame.repo, did, "Frame repo should match DID"); 281 assert!( 282 is_valid_tid(&frame.rev), 283 "Rev should be valid TID format, got: {}", 284 frame.rev 285 ); 286 assert!(!frame.blocks.is_empty(), "Blocks should not be empty"); 287 assert!( 288 is_valid_time_format(&frame.time), 289 "Time should be ISO 8601 with milliseconds and Z suffix" 290 ); 291 292 println!("\nOps validation:"); 293 for (i, op) in frame.ops.iter().enumerate() { 294 println!(" Op {}:", i); 295 println!(" action: {}", op.action); 296 println!(" path: {}", op.path); 297 println!(" cid: {:?}", op.cid); 298 println!( 299 " prev: {:?} (should be Some for updates/deletes)", 300 op.prev 301 ); 302 303 assert!( 304 ["create", "update", "delete"].contains(&op.action.as_str()), 305 "Invalid action: {}", 306 op.action 307 ); 308 assert!( 309 op.path.contains('/'), 310 "Path should contain collection/rkey: {}", 311 op.path 312 ); 313 314 if op.action == "create" { 315 assert!(op.cid.is_some(), "Create op should have cid"); 316 } 317 } 318 319 println!("\nCAR validation:"); 320 let mut car_reader = CarReader::new(Cursor::new(&frame.blocks)).await.unwrap(); 321 let car_header = car_reader.header().clone(); 322 println!(" CAR roots: {:?}", car_header.roots()); 323 324 assert!( 325 !car_header.roots().is_empty(), 326 "CAR should have at least one root" 327 ); 328 assert_eq!( 329 car_header.roots()[0], 330 frame.commit, 331 "First CAR root should be commit CID" 332 ); 333 334 let mut block_cids: Vec<Cid> = Vec::new(); 335 while let Ok(Some((cid, _))) = car_reader.next_block().await { 336 block_cids.push(cid); 337 } 338 println!(" CAR blocks: {} total", block_cids.len()); 339 for cid in &block_cids { 340 println!(" - {}", cid); 341 } 342 343 assert!( 344 block_cids.contains(&frame.commit), 345 "CAR should contain commit block" 346 ); 347 348 for op in &frame.ops { 349 if let Some(ref cid) = op.cid { 350 assert!( 351 block_cids.contains(cid), 352 "CAR should contain op's record block: {}", 353 cid 354 ); 355 } 356 } 357 358 println!("\n=== Validation Complete ===\n"); 359 360 ws_stream.send(tungstenite::Message::Close(None)).await.ok(); 361} 362 363#[tokio::test] 364async fn test_firehose_update_has_prev_field() { 365 let client = client(); 366 let (token, did) = create_account_and_login(&client).await; 367 368 let profile_payload = json!({ 369 "repo": did, 370 "collection": "app.bsky.actor.profile", 371 "rkey": "self", 372 "record": { 373 "$type": "app.bsky.actor.profile", 374 "displayName": "Test User v1", 375 } 376 }); 377 let res = client 378 .post(format!( 379 "{}/xrpc/com.atproto.repo.putRecord", 380 base_url().await 381 )) 382 .bearer_auth(&token) 383 .json(&profile_payload) 384 .send() 385 .await 386 .expect("Failed to create profile"); 387 assert_eq!(res.status(), StatusCode::OK); 388 let first_profile: Value = res.json().await.unwrap(); 389 let first_cid = first_profile["cid"].as_str().unwrap(); 390 391 let url = format!( 392 "ws://127.0.0.1:{}/xrpc/com.atproto.sync.subscribeRepos", 393 app_port() 394 ); 395 let (mut ws_stream, _) = connect_async(&url).await.expect("Failed to connect"); 396 tokio::time::sleep(std::time::Duration::from_millis(100)).await; 397 398 let update_payload = json!({ 399 "repo": did, 400 "collection": "app.bsky.actor.profile", 401 "rkey": "self", 402 "record": { 403 "$type": "app.bsky.actor.profile", 404 "displayName": "Test User v2", 405 } 406 }); 407 let res = client 408 .post(format!( 409 "{}/xrpc/com.atproto.repo.putRecord", 410 base_url().await 411 )) 412 .bearer_auth(&token) 413 .json(&update_payload) 414 .send() 415 .await 416 .expect("Failed to update profile"); 417 assert_eq!(res.status(), StatusCode::OK); 418 419 let mut frame_opt: Option<CommitFrame> = None; 420 let timeout = tokio::time::timeout(std::time::Duration::from_secs(20), async { 421 loop { 422 let msg = match ws_stream.next().await { 423 Some(Ok(m)) => m, 424 _ => continue, 425 }; 426 let raw_bytes = match msg { 427 tungstenite::Message::Binary(bin) => bin, 428 _ => continue, 429 }; 430 if let Ok((_, f)) = parse_frame(&raw_bytes) 431 && f.repo == did 432 { 433 frame_opt = Some(f); 434 break; 435 } 436 } 437 }) 438 .await; 439 assert!(timeout.is_ok(), "Timed out waiting for update commit"); 440 let frame = frame_opt.expect("No matching frame found"); 441 442 println!("\n=== Update Operation Validation ===\n"); 443 println!("First profile CID: {}", first_cid); 444 println!("Frame prevData: {:?}", frame.prev_data); 445 446 for op in &frame.ops { 447 println!( 448 "Op: action={}, path={}, cid={:?}, prev={:?}", 449 op.action, op.path, op.cid, op.prev 450 ); 451 452 if op.action == "update" && op.path.contains("app.bsky.actor.profile") { 453 assert!( 454 op.prev.is_some(), 455 "Update operation should have 'prev' field with old CID! Got: {:?}", 456 op.prev 457 ); 458 println!(" ✓ Update op has prev field: {:?}", op.prev); 459 } 460 } 461 462 println!("\n=== Validation Complete ===\n"); 463 464 ws_stream.send(tungstenite::Message::Close(None)).await.ok(); 465} 466 467#[tokio::test] 468async fn test_firehose_commit_has_prev_data() { 469 let client = client(); 470 let (token, did) = create_account_and_login(&client).await; 471 472 let url = format!( 473 "ws://127.0.0.1:{}/xrpc/com.atproto.sync.subscribeRepos", 474 app_port() 475 ); 476 let (mut ws_stream, _) = connect_async(&url).await.expect("Failed to connect"); 477 tokio::time::sleep(std::time::Duration::from_millis(100)).await; 478 479 let post_payload = json!({ 480 "repo": did, 481 "collection": "app.bsky.feed.post", 482 "record": { 483 "$type": "app.bsky.feed.post", 484 "text": "First post", 485 "createdAt": chrono::Utc::now().to_rfc3339(), 486 } 487 }); 488 client 489 .post(format!( 490 "{}/xrpc/com.atproto.repo.createRecord", 491 base_url().await 492 )) 493 .bearer_auth(&token) 494 .json(&post_payload) 495 .send() 496 .await 497 .expect("Failed to create first post"); 498 499 let mut first_frame_opt: Option<CommitFrame> = None; 500 let timeout = tokio::time::timeout(std::time::Duration::from_secs(10), async { 501 loop { 502 let msg = ws_stream.next().await.unwrap().unwrap(); 503 let raw_bytes = match msg { 504 tungstenite::Message::Binary(bin) => bin, 505 _ => continue, 506 }; 507 if let Ok((_, f)) = parse_frame(&raw_bytes) 508 && f.repo == did 509 { 510 first_frame_opt = Some(f); 511 break; 512 } 513 } 514 }) 515 .await; 516 assert!(timeout.is_ok(), "Timed out waiting for first commit"); 517 let first_frame = first_frame_opt.expect("No first frame found"); 518 519 println!("\n=== First Commit ==="); 520 println!( 521 " prevData: {:?} (first commit may be None)", 522 first_frame.prev_data 523 ); 524 println!( 525 " since: {:?} (first commit should be None)", 526 first_frame.since 527 ); 528 529 let post_payload2 = json!({ 530 "repo": did, 531 "collection": "app.bsky.feed.post", 532 "record": { 533 "$type": "app.bsky.feed.post", 534 "text": "Second post", 535 "createdAt": chrono::Utc::now().to_rfc3339(), 536 } 537 }); 538 client 539 .post(format!( 540 "{}/xrpc/com.atproto.repo.createRecord", 541 base_url().await 542 )) 543 .bearer_auth(&token) 544 .json(&post_payload2) 545 .send() 546 .await 547 .expect("Failed to create second post"); 548 549 let mut second_frame_opt: Option<CommitFrame> = None; 550 let timeout = tokio::time::timeout(std::time::Duration::from_secs(10), async { 551 loop { 552 let msg = ws_stream.next().await.unwrap().unwrap(); 553 let raw_bytes = match msg { 554 tungstenite::Message::Binary(bin) => bin, 555 _ => continue, 556 }; 557 if let Ok((_, f)) = parse_frame(&raw_bytes) 558 && f.repo == did 559 { 560 second_frame_opt = Some(f); 561 break; 562 } 563 } 564 }) 565 .await; 566 assert!(timeout.is_ok(), "Timed out waiting for second commit"); 567 let second_frame = second_frame_opt.expect("No second frame found"); 568 569 println!("\n=== Second Commit ==="); 570 println!( 571 " prevData: {:?} (should have value - MST root CID)", 572 second_frame.prev_data 573 ); 574 println!( 575 " since: {:?} (should have value - previous rev)", 576 second_frame.since 577 ); 578 579 assert!( 580 second_frame.since.is_some(), 581 "Second commit should have 'since' field pointing to first commit rev" 582 ); 583 584 println!("\n=== Validation Complete ===\n"); 585 586 ws_stream.send(tungstenite::Message::Close(None)).await.ok(); 587} 588 589#[tokio::test] 590async fn test_compare_raw_cbor_encoding() { 591 let client = client(); 592 let (token, did) = create_account_and_login(&client).await; 593 594 let url = format!( 595 "ws://127.0.0.1:{}/xrpc/com.atproto.sync.subscribeRepos", 596 app_port() 597 ); 598 let (mut ws_stream, _) = connect_async(&url).await.expect("Failed to connect"); 599 tokio::time::sleep(std::time::Duration::from_millis(100)).await; 600 601 let post_payload = json!({ 602 "repo": did, 603 "collection": "app.bsky.feed.post", 604 "record": { 605 "$type": "app.bsky.feed.post", 606 "text": "CBOR encoding test", 607 "createdAt": chrono::Utc::now().to_rfc3339(), 608 } 609 }); 610 client 611 .post(format!( 612 "{}/xrpc/com.atproto.repo.createRecord", 613 base_url().await 614 )) 615 .bearer_auth(&token) 616 .json(&post_payload) 617 .send() 618 .await 619 .expect("Failed to create post"); 620 621 let mut raw_bytes_opt: Option<Vec<u8>> = None; 622 let timeout = tokio::time::timeout(std::time::Duration::from_secs(10), async { 623 loop { 624 let msg = ws_stream.next().await.unwrap().unwrap(); 625 let raw = match msg { 626 tungstenite::Message::Binary(bin) => bin, 627 _ => continue, 628 }; 629 if let Ok((_, f)) = parse_frame(&raw) 630 && f.repo == did 631 { 632 raw_bytes_opt = Some(raw.to_vec()); 633 break; 634 } 635 } 636 }) 637 .await; 638 assert!(timeout.is_ok(), "Timed out waiting for event for our DID"); 639 let raw_bytes = raw_bytes_opt.expect("No matching frame found"); 640 641 println!("\n=== Raw CBOR Analysis ===\n"); 642 println!("Total frame size: {} bytes", raw_bytes.len()); 643 644 fn bytes_to_hex(bytes: &[u8]) -> String { 645 bytes 646 .iter() 647 .map(|b| format!("{:02x}", b)) 648 .collect::<Vec<_>>() 649 .join("") 650 } 651 652 println!( 653 "First 64 bytes (hex): {}", 654 bytes_to_hex(&raw_bytes[..64.min(raw_bytes.len())]) 655 ); 656 657 let header_end = find_cbor_map_end(&raw_bytes).expect("Failed to find header end"); 658 659 println!("\nHeader section: {} bytes", header_end); 660 println!("Header hex: {}", bytes_to_hex(&raw_bytes[..header_end])); 661 662 println!("\nPayload section: {} bytes", raw_bytes.len() - header_end); 663 664 println!("\n=== Analysis Complete ===\n"); 665 666 ws_stream.send(tungstenite::Message::Close(None)).await.ok(); 667} 668 669#[derive(Debug, Deserialize)] 670struct ErrorFrameHeader { 671 op: i64, 672} 673 674#[derive(Debug, Deserialize)] 675struct ErrorFrameBody { 676 error: String, 677 #[allow(dead_code)] 678 message: Option<String>, 679} 680 681#[derive(Debug, Deserialize)] 682struct InfoFrameHeader { 683 #[allow(dead_code)] 684 op: i64, 685 t: String, 686} 687 688#[derive(Debug, Deserialize)] 689struct InfoFrameBody { 690 name: String, 691 #[allow(dead_code)] 692 message: Option<String>, 693} 694 695fn parse_error_frame(bytes: &[u8]) -> Result<(ErrorFrameHeader, ErrorFrameBody), String> { 696 let header_len = find_cbor_map_end(bytes)?; 697 let header: ErrorFrameHeader = serde_ipld_dagcbor::from_slice(&bytes[..header_len]) 698 .map_err(|e| format!("Failed to parse error header: {:?}", e))?; 699 700 if header.op != -1 { 701 return Err(format!("Not an error frame, op: {}", header.op)); 702 } 703 704 let remaining = &bytes[header_len..]; 705 let body: ErrorFrameBody = serde_ipld_dagcbor::from_slice(remaining) 706 .map_err(|e| format!("Failed to parse error body: {:?}", e))?; 707 708 Ok((header, body)) 709} 710 711fn parse_info_frame(bytes: &[u8]) -> Result<(InfoFrameHeader, InfoFrameBody), String> { 712 let header_len = find_cbor_map_end(bytes)?; 713 let header: InfoFrameHeader = serde_ipld_dagcbor::from_slice(&bytes[..header_len]) 714 .map_err(|e| format!("Failed to parse info header: {:?}", e))?; 715 716 if header.t != "#info" { 717 return Err(format!("Not an info frame, t: {}", header.t)); 718 } 719 720 let remaining = &bytes[header_len..]; 721 let body: InfoFrameBody = serde_ipld_dagcbor::from_slice(remaining) 722 .map_err(|e| format!("Failed to parse info body: {:?}", e))?; 723 724 Ok((header, body)) 725} 726 727#[tokio::test] 728async fn test_firehose_future_cursor_error() { 729 let _ = base_url().await; 730 731 let future_cursor = 9999999999i64; 732 let url = format!( 733 "ws://127.0.0.1:{}/xrpc/com.atproto.sync.subscribeRepos?cursor={}", 734 app_port(), 735 future_cursor 736 ); 737 738 let (mut ws_stream, _) = connect_async(&url).await.expect("Failed to connect"); 739 740 let timeout = tokio::time::timeout(std::time::Duration::from_secs(10), async { 741 loop { 742 match ws_stream.next().await { 743 Some(Ok(tungstenite::Message::Binary(bin))) => { 744 if let Ok((header, body)) = parse_error_frame(&bin) { 745 println!("Received error frame: {:?} {:?}", header, body); 746 assert_eq!(header.op, -1, "Error frame op should be -1"); 747 assert_eq!(body.error, "FutureCursor", "Error should be FutureCursor"); 748 return true; 749 } 750 } 751 Some(Ok(tungstenite::Message::Close(_))) => { 752 println!("Connection closed"); 753 return false; 754 } 755 None => { 756 println!("Stream ended"); 757 return false; 758 } 759 _ => continue, 760 } 761 } 762 }) 763 .await; 764 765 match timeout { 766 Ok(received_error) => { 767 assert!( 768 received_error, 769 "Should have received FutureCursor error frame before connection closed" 770 ); 771 } 772 Err(_) => { 773 panic!( 774 "Timed out waiting for FutureCursor error - connection should close quickly with error" 775 ); 776 } 777 } 778} 779 780#[tokio::test] 781async fn test_firehose_outdated_cursor_info() { 782 let client = client(); 783 let (token, did) = create_account_and_login(&client).await; 784 785 let post_payload = json!({ 786 "repo": did, 787 "collection": "app.bsky.feed.post", 788 "record": { 789 "$type": "app.bsky.feed.post", 790 "text": "Post for outdated cursor test", 791 "createdAt": chrono::Utc::now().to_rfc3339(), 792 } 793 }); 794 let _ = client 795 .post(format!( 796 "{}/xrpc/com.atproto.repo.createRecord", 797 base_url().await 798 )) 799 .bearer_auth(&token) 800 .json(&post_payload) 801 .send() 802 .await 803 .expect("Failed to create post"); 804 805 tokio::time::sleep(std::time::Duration::from_millis(100)).await; 806 807 let outdated_cursor = 1i64; 808 let url = format!( 809 "ws://127.0.0.1:{}/xrpc/com.atproto.sync.subscribeRepos?cursor={}", 810 app_port(), 811 outdated_cursor 812 ); 813 814 let (mut ws_stream, _) = connect_async(&url).await.expect("Failed to connect"); 815 816 let mut found_info = false; 817 let mut found_commit = false; 818 819 let timeout = tokio::time::timeout(std::time::Duration::from_secs(15), async { 820 loop { 821 match ws_stream.next().await { 822 Some(Ok(tungstenite::Message::Binary(bin))) => { 823 if let Ok((header, body)) = parse_info_frame(&bin) { 824 println!("Received info frame: {:?} {:?}", header, body); 825 if body.name == "OutdatedCursor" { 826 found_info = true; 827 println!("Found OutdatedCursor info frame!"); 828 } 829 } else if let Ok((_, frame)) = parse_frame(&bin) 830 && frame.repo == did 831 { 832 found_commit = true; 833 println!("Found commit for our DID"); 834 } 835 if found_commit { 836 break; 837 } 838 } 839 Some(Ok(tungstenite::Message::Close(_))) => break, 840 None => break, 841 _ => continue, 842 } 843 } 844 }) 845 .await; 846 847 assert!(timeout.is_ok(), "Timed out"); 848 assert!( 849 found_commit, 850 "Should have received commits even with outdated cursor" 851 ); 852}