forked from
lewis.moe/bspds-sandbox
I've been saying "PDSes seem easy enough, they're what, some CRUD to a db? I can do that in my sleep". well i'm sleeping rn so let's go
1mod common;
2
3use cid::Cid;
4use common::*;
5use futures::{SinkExt, stream::StreamExt};
6use iroh_car::CarReader;
7use reqwest::StatusCode;
8use serde::{Deserialize, Serialize};
9use serde_json::{Value, json};
10use std::io::Cursor;
11use tokio_tungstenite::{connect_async, tungstenite};
12
13#[derive(Debug, Deserialize, Serialize)]
14struct FrameHeader {
15 op: i64,
16 t: String,
17}
18
19#[derive(Debug, Deserialize)]
20struct CommitFrame {
21 seq: i64,
22 #[serde(default)]
23 rebase: bool,
24 #[serde(rename = "tooBig", default)]
25 too_big: bool,
26 repo: String,
27 commit: Cid,
28 rev: String,
29 since: Option<String>,
30 #[serde(with = "serde_bytes")]
31 blocks: Vec<u8>,
32 ops: Vec<RepoOp>,
33 #[serde(default)]
34 blobs: Vec<Cid>,
35 time: String,
36 #[serde(rename = "prevData")]
37 prev_data: Option<Cid>,
38}
39
40#[derive(Debug, Deserialize)]
41struct RepoOp {
42 action: String,
43 path: String,
44 cid: Option<Cid>,
45 prev: Option<Cid>,
46}
47
/// Returns the byte length of the first complete CBOR data item in `bytes`.
///
/// The frame parsers in this file treat a message as two CBOR items laid back to back and use
/// the returned offset to split header from body. Despite the name, the first item is not
/// required to be a map: any well-formed, definite-length item is measured.
///
/// # Errors
///
/// Returns `Err` when the input is empty or truncated, when a declared string length runs
/// past the end of the buffer, or when an additional-info value of 28..=31 is encountered
/// (reserved values and indefinite lengths are not supported).
fn find_cbor_map_end(bytes: &[u8]) -> Result<usize, String> {
    /// Consumes exactly `n` bytes at `*pos`, failing instead of stepping past the buffer.
    fn take<'a>(bytes: &'a [u8], pos: &mut usize, n: usize) -> Result<&'a [u8], String> {
        let end = (*pos)
            .checked_add(n)
            .filter(|&end| end <= bytes.len())
            .ok_or_else(|| String::from("Unexpected end"))?;
        let chunk = &bytes[*pos..end];
        *pos = end;
        Ok(chunk)
    }

    /// Decodes the argument that follows an initial byte: either the additional-info value
    /// itself (0..=23) or a big-endian integer of 1, 2, 4 or 8 bytes (24..=27).
    fn read_uint(bytes: &[u8], pos: &mut usize, additional: u8) -> Result<u64, String> {
        let width = match additional {
            0..=23 => return Ok(u64::from(additional)),
            24 => 1,
            25 => 2,
            26 => 4,
            27 => 8,
            _ => return Err(format!("Invalid additional info: {}", additional)),
        };
        let chunk = take(bytes, pos, width)?;
        Ok(chunk
            .iter()
            .fold(0u64, |acc, &byte| (acc << 8) | u64::from(byte)))
    }

    /// Advances `*pos` past one complete data item, recursing into containers and tags.
    fn skip_value(bytes: &[u8], pos: &mut usize) -> Result<(), String> {
        let initial = take(bytes, pos, 1)?[0];
        let major = initial >> 5;
        let additional = initial & 0x1f;

        match major {
            // Unsigned / negative integers: only the argument follows.
            0 | 1 => read_uint(bytes, pos, additional).map(|_| ()),
            // Byte / text strings: the argument is the payload length, which must fit in
            // the remaining input.
            2 | 3 => {
                let len = usize::try_from(read_uint(bytes, pos, additional)?)
                    .map_err(|_| String::from("Unexpected end"))?;
                take(bytes, pos, len).map(|_| ())
            }
            // Arrays: `count` nested items.
            4 => {
                let count = read_uint(bytes, pos, additional)?;
                for _ in 0..count {
                    skip_value(bytes, pos)?;
                }
                Ok(())
            }
            // Maps: `count` key/value pairs.
            5 => {
                let count = read_uint(bytes, pos, additional)?;
                for _ in 0..count {
                    skip_value(bytes, pos)?;
                    skip_value(bytes, pos)?;
                }
                Ok(())
            }
            // Tags: a tag number followed by the tagged item.
            6 => {
                read_uint(bytes, pos, additional)?;
                skip_value(bytes, pos)
            }
            // Simple values and floats use the same widths as integer arguments
            // (0/1/2/4/8 trailing bytes), so the payload is skipped the same way.
            7 => read_uint(bytes, pos, additional).map(|_| ()),
            _ => Err(format!("Unknown major type: {}", major)),
        }
    }

    let mut pos = 0;
    skip_value(bytes, &mut pos)?;
    Ok(pos)
}
150
151fn parse_frame(bytes: &[u8]) -> Result<(FrameHeader, CommitFrame), String> {
152 let header_len = find_cbor_map_end(bytes)?;
153 let header: FrameHeader = serde_ipld_dagcbor::from_slice(&bytes[..header_len])
154 .map_err(|e| format!("Failed to parse header: {:?}", e))?;
155
156 if header.t != "#commit" {
157 return Err(format!("Not a commit frame: {}", header.t));
158 }
159
160 let remaining = &bytes[header_len..];
161 let frame: CommitFrame = serde_ipld_dagcbor::from_slice(remaining)
162 .map_err(|e| format!("Failed to parse commit frame: {:?}", e))?;
163
164 Ok((header, frame))
165}
166
/// Checks `s` against the AT Protocol TID syntax: exactly 13 characters drawn from the
/// base32-sortable alphabet `234567abcdefghijklmnopqrstuvwxyz`, with the first character
/// limited to `234567abcdefghij`.
///
/// Uppercase letters, the digits `0`, `1`, `8`, `9` and any non-ASCII character are rejected.
fn is_valid_tid(s: &str) -> bool {
    const ALPHABET: &[u8] = b"234567abcdefghijklmnopqrstuvwxyz";
    const TID_LEN: usize = 13;

    let bytes = s.as_bytes();
    bytes.len() == TID_LEN
        && bytes.iter().all(|b| ALPHABET.contains(b))
        // The high bit of the encoded 64-bit value is always zero, which confines the
        // first character to the lower half of the alphabet.
        && ALPHABET[..16].contains(&bytes[0])
}
170
/// Returns `true` only for timestamps shaped exactly like `YYYY-MM-DDTHH:MM:SS.mmmZ`:
/// fixed-width ASCII digit fields, millisecond precision and a literal `Z` suffix.
///
/// Only the shape is checked; field ranges (month 1..=12 and so on) are not.
fn is_valid_time_format(s: &str) -> bool {
    // `d` stands for one ASCII digit; every other byte must match literally.
    const PATTERN: &[u8] = b"dddd-dd-ddTdd:dd:dd.dddZ";

    let bytes = s.as_bytes();
    bytes.len() == PATTERN.len()
        && bytes.iter().zip(PATTERN).all(|(&b, &p)| match p {
            b'd' => b.is_ascii_digit(),
            literal => b == literal,
        })
}
192
193#[tokio::test]
194async fn test_firehose_frame_structure() {
195 let client = client();
196 let (token, did) = create_account_and_login(&client).await;
197
198 let url = format!(
199 "ws://127.0.0.1:{}/xrpc/com.atproto.sync.subscribeRepos",
200 app_port()
201 );
202 let (mut ws_stream, _) = connect_async(&url).await.expect("Failed to connect");
203 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
204
205 let post_text = "Testing firehose validation!";
206 let post_payload = json!({
207 "repo": did,
208 "collection": "app.bsky.feed.post",
209 "record": {
210 "$type": "app.bsky.feed.post",
211 "text": post_text,
212 "createdAt": chrono::Utc::now().to_rfc3339(),
213 }
214 });
215 let res = client
216 .post(format!(
217 "{}/xrpc/com.atproto.repo.createRecord",
218 base_url().await
219 ))
220 .bearer_auth(&token)
221 .json(&post_payload)
222 .send()
223 .await
224 .expect("Failed to create post");
225 assert_eq!(res.status(), StatusCode::OK);
226
227 let mut frame_opt: Option<(FrameHeader, CommitFrame)> = None;
228 let timeout = tokio::time::timeout(std::time::Duration::from_secs(10), async {
229 loop {
230 let msg = ws_stream.next().await.unwrap().unwrap();
231 let raw_bytes = match msg {
232 tungstenite::Message::Binary(bin) => bin,
233 _ => continue,
234 };
235 if let Ok((h, f)) = parse_frame(&raw_bytes)
236 && f.repo == did
237 {
238 frame_opt = Some((h, f));
239 break;
240 }
241 }
242 })
243 .await;
244 assert!(timeout.is_ok(), "Timed out waiting for event for our DID");
245 let (header, frame) = frame_opt.expect("No matching frame found");
246
247 println!("\n=== Frame Structure Validation ===\n");
248
249 println!("Header:");
250 println!(" op: {} (expected: 1)", header.op);
251 println!(" t: {} (expected: #commit)", header.t);
252 assert_eq!(header.op, 1, "Header op should be 1");
253 assert_eq!(header.t, "#commit", "Header t should be #commit");
254
255 println!("\nCommitFrame fields:");
256 println!(" seq: {}", frame.seq);
257 println!(" rebase: {}", frame.rebase);
258 println!(" tooBig: {}", frame.too_big);
259 println!(" repo: {}", frame.repo);
260 println!(" commit: {}", frame.commit);
261 println!(
262 " rev: {} (valid TID: {})",
263 frame.rev,
264 is_valid_tid(&frame.rev)
265 );
266 println!(" since: {:?}", frame.since);
267 println!(" blocks length: {} bytes", frame.blocks.len());
268 println!(" ops count: {}", frame.ops.len());
269 println!(" blobs count: {}", frame.blobs.len());
270 println!(
271 " time: {} (valid format: {})",
272 frame.time,
273 is_valid_time_format(&frame.time)
274 );
275 println!(
276 " prevData: {:?} (IMPORTANT - should have value for updates)",
277 frame.prev_data
278 );
279
280 assert_eq!(frame.repo, did, "Frame repo should match DID");
281 assert!(
282 is_valid_tid(&frame.rev),
283 "Rev should be valid TID format, got: {}",
284 frame.rev
285 );
286 assert!(!frame.blocks.is_empty(), "Blocks should not be empty");
287 assert!(
288 is_valid_time_format(&frame.time),
289 "Time should be ISO 8601 with milliseconds and Z suffix"
290 );
291
292 println!("\nOps validation:");
293 for (i, op) in frame.ops.iter().enumerate() {
294 println!(" Op {}:", i);
295 println!(" action: {}", op.action);
296 println!(" path: {}", op.path);
297 println!(" cid: {:?}", op.cid);
298 println!(
299 " prev: {:?} (should be Some for updates/deletes)",
300 op.prev
301 );
302
303 assert!(
304 ["create", "update", "delete"].contains(&op.action.as_str()),
305 "Invalid action: {}",
306 op.action
307 );
308 assert!(
309 op.path.contains('/'),
310 "Path should contain collection/rkey: {}",
311 op.path
312 );
313
314 if op.action == "create" {
315 assert!(op.cid.is_some(), "Create op should have cid");
316 }
317 }
318
319 println!("\nCAR validation:");
320 let mut car_reader = CarReader::new(Cursor::new(&frame.blocks)).await.unwrap();
321 let car_header = car_reader.header().clone();
322 println!(" CAR roots: {:?}", car_header.roots());
323
324 assert!(
325 !car_header.roots().is_empty(),
326 "CAR should have at least one root"
327 );
328 assert_eq!(
329 car_header.roots()[0],
330 frame.commit,
331 "First CAR root should be commit CID"
332 );
333
334 let mut block_cids: Vec<Cid> = Vec::new();
335 while let Ok(Some((cid, _))) = car_reader.next_block().await {
336 block_cids.push(cid);
337 }
338 println!(" CAR blocks: {} total", block_cids.len());
339 for cid in &block_cids {
340 println!(" - {}", cid);
341 }
342
343 assert!(
344 block_cids.contains(&frame.commit),
345 "CAR should contain commit block"
346 );
347
348 for op in &frame.ops {
349 if let Some(ref cid) = op.cid {
350 assert!(
351 block_cids.contains(cid),
352 "CAR should contain op's record block: {}",
353 cid
354 );
355 }
356 }
357
358 println!("\n=== Validation Complete ===\n");
359
360 ws_stream.send(tungstenite::Message::Close(None)).await.ok();
361}
362
363#[tokio::test]
364async fn test_firehose_update_has_prev_field() {
365 let client = client();
366 let (token, did) = create_account_and_login(&client).await;
367
368 let profile_payload = json!({
369 "repo": did,
370 "collection": "app.bsky.actor.profile",
371 "rkey": "self",
372 "record": {
373 "$type": "app.bsky.actor.profile",
374 "displayName": "Test User v1",
375 }
376 });
377 let res = client
378 .post(format!(
379 "{}/xrpc/com.atproto.repo.putRecord",
380 base_url().await
381 ))
382 .bearer_auth(&token)
383 .json(&profile_payload)
384 .send()
385 .await
386 .expect("Failed to create profile");
387 assert_eq!(res.status(), StatusCode::OK);
388 let first_profile: Value = res.json().await.unwrap();
389 let first_cid = first_profile["cid"].as_str().unwrap();
390
391 let url = format!(
392 "ws://127.0.0.1:{}/xrpc/com.atproto.sync.subscribeRepos",
393 app_port()
394 );
395 let (mut ws_stream, _) = connect_async(&url).await.expect("Failed to connect");
396 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
397
398 let update_payload = json!({
399 "repo": did,
400 "collection": "app.bsky.actor.profile",
401 "rkey": "self",
402 "record": {
403 "$type": "app.bsky.actor.profile",
404 "displayName": "Test User v2",
405 }
406 });
407 let res = client
408 .post(format!(
409 "{}/xrpc/com.atproto.repo.putRecord",
410 base_url().await
411 ))
412 .bearer_auth(&token)
413 .json(&update_payload)
414 .send()
415 .await
416 .expect("Failed to update profile");
417 assert_eq!(res.status(), StatusCode::OK);
418
419 let mut frame_opt: Option<CommitFrame> = None;
420 let timeout = tokio::time::timeout(std::time::Duration::from_secs(20), async {
421 loop {
422 let msg = match ws_stream.next().await {
423 Some(Ok(m)) => m,
424 _ => continue,
425 };
426 let raw_bytes = match msg {
427 tungstenite::Message::Binary(bin) => bin,
428 _ => continue,
429 };
430 if let Ok((_, f)) = parse_frame(&raw_bytes)
431 && f.repo == did
432 {
433 frame_opt = Some(f);
434 break;
435 }
436 }
437 })
438 .await;
439 assert!(timeout.is_ok(), "Timed out waiting for update commit");
440 let frame = frame_opt.expect("No matching frame found");
441
442 println!("\n=== Update Operation Validation ===\n");
443 println!("First profile CID: {}", first_cid);
444 println!("Frame prevData: {:?}", frame.prev_data);
445
446 for op in &frame.ops {
447 println!(
448 "Op: action={}, path={}, cid={:?}, prev={:?}",
449 op.action, op.path, op.cid, op.prev
450 );
451
452 if op.action == "update" && op.path.contains("app.bsky.actor.profile") {
453 assert!(
454 op.prev.is_some(),
455 "Update operation should have 'prev' field with old CID! Got: {:?}",
456 op.prev
457 );
458 println!(" ✓ Update op has prev field: {:?}", op.prev);
459 }
460 }
461
462 println!("\n=== Validation Complete ===\n");
463
464 ws_stream.send(tungstenite::Message::Close(None)).await.ok();
465}
466
467#[tokio::test]
468async fn test_firehose_commit_has_prev_data() {
469 let client = client();
470 let (token, did) = create_account_and_login(&client).await;
471
472 let url = format!(
473 "ws://127.0.0.1:{}/xrpc/com.atproto.sync.subscribeRepos",
474 app_port()
475 );
476 let (mut ws_stream, _) = connect_async(&url).await.expect("Failed to connect");
477 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
478
479 let post_payload = json!({
480 "repo": did,
481 "collection": "app.bsky.feed.post",
482 "record": {
483 "$type": "app.bsky.feed.post",
484 "text": "First post",
485 "createdAt": chrono::Utc::now().to_rfc3339(),
486 }
487 });
488 client
489 .post(format!(
490 "{}/xrpc/com.atproto.repo.createRecord",
491 base_url().await
492 ))
493 .bearer_auth(&token)
494 .json(&post_payload)
495 .send()
496 .await
497 .expect("Failed to create first post");
498
499 let mut first_frame_opt: Option<CommitFrame> = None;
500 let timeout = tokio::time::timeout(std::time::Duration::from_secs(10), async {
501 loop {
502 let msg = ws_stream.next().await.unwrap().unwrap();
503 let raw_bytes = match msg {
504 tungstenite::Message::Binary(bin) => bin,
505 _ => continue,
506 };
507 if let Ok((_, f)) = parse_frame(&raw_bytes)
508 && f.repo == did
509 {
510 first_frame_opt = Some(f);
511 break;
512 }
513 }
514 })
515 .await;
516 assert!(timeout.is_ok(), "Timed out waiting for first commit");
517 let first_frame = first_frame_opt.expect("No first frame found");
518
519 println!("\n=== First Commit ===");
520 println!(
521 " prevData: {:?} (first commit may be None)",
522 first_frame.prev_data
523 );
524 println!(
525 " since: {:?} (first commit should be None)",
526 first_frame.since
527 );
528
529 let post_payload2 = json!({
530 "repo": did,
531 "collection": "app.bsky.feed.post",
532 "record": {
533 "$type": "app.bsky.feed.post",
534 "text": "Second post",
535 "createdAt": chrono::Utc::now().to_rfc3339(),
536 }
537 });
538 client
539 .post(format!(
540 "{}/xrpc/com.atproto.repo.createRecord",
541 base_url().await
542 ))
543 .bearer_auth(&token)
544 .json(&post_payload2)
545 .send()
546 .await
547 .expect("Failed to create second post");
548
549 let mut second_frame_opt: Option<CommitFrame> = None;
550 let timeout = tokio::time::timeout(std::time::Duration::from_secs(10), async {
551 loop {
552 let msg = ws_stream.next().await.unwrap().unwrap();
553 let raw_bytes = match msg {
554 tungstenite::Message::Binary(bin) => bin,
555 _ => continue,
556 };
557 if let Ok((_, f)) = parse_frame(&raw_bytes)
558 && f.repo == did
559 {
560 second_frame_opt = Some(f);
561 break;
562 }
563 }
564 })
565 .await;
566 assert!(timeout.is_ok(), "Timed out waiting for second commit");
567 let second_frame = second_frame_opt.expect("No second frame found");
568
569 println!("\n=== Second Commit ===");
570 println!(
571 " prevData: {:?} (should have value - MST root CID)",
572 second_frame.prev_data
573 );
574 println!(
575 " since: {:?} (should have value - previous rev)",
576 second_frame.since
577 );
578
579 assert!(
580 second_frame.since.is_some(),
581 "Second commit should have 'since' field pointing to first commit rev"
582 );
583
584 println!("\n=== Validation Complete ===\n");
585
586 ws_stream.send(tungstenite::Message::Close(None)).await.ok();
587}
588
589#[tokio::test]
590async fn test_compare_raw_cbor_encoding() {
591 let client = client();
592 let (token, did) = create_account_and_login(&client).await;
593
594 let url = format!(
595 "ws://127.0.0.1:{}/xrpc/com.atproto.sync.subscribeRepos",
596 app_port()
597 );
598 let (mut ws_stream, _) = connect_async(&url).await.expect("Failed to connect");
599 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
600
601 let post_payload = json!({
602 "repo": did,
603 "collection": "app.bsky.feed.post",
604 "record": {
605 "$type": "app.bsky.feed.post",
606 "text": "CBOR encoding test",
607 "createdAt": chrono::Utc::now().to_rfc3339(),
608 }
609 });
610 client
611 .post(format!(
612 "{}/xrpc/com.atproto.repo.createRecord",
613 base_url().await
614 ))
615 .bearer_auth(&token)
616 .json(&post_payload)
617 .send()
618 .await
619 .expect("Failed to create post");
620
621 let mut raw_bytes_opt: Option<Vec<u8>> = None;
622 let timeout = tokio::time::timeout(std::time::Duration::from_secs(10), async {
623 loop {
624 let msg = ws_stream.next().await.unwrap().unwrap();
625 let raw = match msg {
626 tungstenite::Message::Binary(bin) => bin,
627 _ => continue,
628 };
629 if let Ok((_, f)) = parse_frame(&raw)
630 && f.repo == did
631 {
632 raw_bytes_opt = Some(raw.to_vec());
633 break;
634 }
635 }
636 })
637 .await;
638 assert!(timeout.is_ok(), "Timed out waiting for event for our DID");
639 let raw_bytes = raw_bytes_opt.expect("No matching frame found");
640
641 println!("\n=== Raw CBOR Analysis ===\n");
642 println!("Total frame size: {} bytes", raw_bytes.len());
643
644 fn bytes_to_hex(bytes: &[u8]) -> String {
645 bytes
646 .iter()
647 .map(|b| format!("{:02x}", b))
648 .collect::<Vec<_>>()
649 .join("")
650 }
651
652 println!(
653 "First 64 bytes (hex): {}",
654 bytes_to_hex(&raw_bytes[..64.min(raw_bytes.len())])
655 );
656
657 let header_end = find_cbor_map_end(&raw_bytes).expect("Failed to find header end");
658
659 println!("\nHeader section: {} bytes", header_end);
660 println!("Header hex: {}", bytes_to_hex(&raw_bytes[..header_end]));
661
662 println!("\nPayload section: {} bytes", raw_bytes.len() - header_end);
663
664 println!("\n=== Analysis Complete ===\n");
665
666 ws_stream.send(tungstenite::Message::Close(None)).await.ok();
667}
668
669#[derive(Debug, Deserialize)]
670struct ErrorFrameHeader {
671 op: i64,
672}
673
674#[derive(Debug, Deserialize)]
675struct ErrorFrameBody {
676 error: String,
677 #[allow(dead_code)]
678 message: Option<String>,
679}
680
681#[derive(Debug, Deserialize)]
682struct InfoFrameHeader {
683 #[allow(dead_code)]
684 op: i64,
685 t: String,
686}
687
688#[derive(Debug, Deserialize)]
689struct InfoFrameBody {
690 name: String,
691 #[allow(dead_code)]
692 message: Option<String>,
693}
694
695fn parse_error_frame(bytes: &[u8]) -> Result<(ErrorFrameHeader, ErrorFrameBody), String> {
696 let header_len = find_cbor_map_end(bytes)?;
697 let header: ErrorFrameHeader = serde_ipld_dagcbor::from_slice(&bytes[..header_len])
698 .map_err(|e| format!("Failed to parse error header: {:?}", e))?;
699
700 if header.op != -1 {
701 return Err(format!("Not an error frame, op: {}", header.op));
702 }
703
704 let remaining = &bytes[header_len..];
705 let body: ErrorFrameBody = serde_ipld_dagcbor::from_slice(remaining)
706 .map_err(|e| format!("Failed to parse error body: {:?}", e))?;
707
708 Ok((header, body))
709}
710
711fn parse_info_frame(bytes: &[u8]) -> Result<(InfoFrameHeader, InfoFrameBody), String> {
712 let header_len = find_cbor_map_end(bytes)?;
713 let header: InfoFrameHeader = serde_ipld_dagcbor::from_slice(&bytes[..header_len])
714 .map_err(|e| format!("Failed to parse info header: {:?}", e))?;
715
716 if header.t != "#info" {
717 return Err(format!("Not an info frame, t: {}", header.t));
718 }
719
720 let remaining = &bytes[header_len..];
721 let body: InfoFrameBody = serde_ipld_dagcbor::from_slice(remaining)
722 .map_err(|e| format!("Failed to parse info body: {:?}", e))?;
723
724 Ok((header, body))
725}
726
727#[tokio::test]
728async fn test_firehose_future_cursor_error() {
729 let _ = base_url().await;
730
731 let future_cursor = 9999999999i64;
732 let url = format!(
733 "ws://127.0.0.1:{}/xrpc/com.atproto.sync.subscribeRepos?cursor={}",
734 app_port(),
735 future_cursor
736 );
737
738 let (mut ws_stream, _) = connect_async(&url).await.expect("Failed to connect");
739
740 let timeout = tokio::time::timeout(std::time::Duration::from_secs(10), async {
741 loop {
742 match ws_stream.next().await {
743 Some(Ok(tungstenite::Message::Binary(bin))) => {
744 if let Ok((header, body)) = parse_error_frame(&bin) {
745 println!("Received error frame: {:?} {:?}", header, body);
746 assert_eq!(header.op, -1, "Error frame op should be -1");
747 assert_eq!(body.error, "FutureCursor", "Error should be FutureCursor");
748 return true;
749 }
750 }
751 Some(Ok(tungstenite::Message::Close(_))) => {
752 println!("Connection closed");
753 return false;
754 }
755 None => {
756 println!("Stream ended");
757 return false;
758 }
759 _ => continue,
760 }
761 }
762 })
763 .await;
764
765 match timeout {
766 Ok(received_error) => {
767 assert!(
768 received_error,
769 "Should have received FutureCursor error frame before connection closed"
770 );
771 }
772 Err(_) => {
773 panic!(
774 "Timed out waiting for FutureCursor error - connection should close quickly with error"
775 );
776 }
777 }
778}
779
780#[tokio::test]
781async fn test_firehose_outdated_cursor_info() {
782 let client = client();
783 let (token, did) = create_account_and_login(&client).await;
784
785 let post_payload = json!({
786 "repo": did,
787 "collection": "app.bsky.feed.post",
788 "record": {
789 "$type": "app.bsky.feed.post",
790 "text": "Post for outdated cursor test",
791 "createdAt": chrono::Utc::now().to_rfc3339(),
792 }
793 });
794 let _ = client
795 .post(format!(
796 "{}/xrpc/com.atproto.repo.createRecord",
797 base_url().await
798 ))
799 .bearer_auth(&token)
800 .json(&post_payload)
801 .send()
802 .await
803 .expect("Failed to create post");
804
805 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
806
807 let outdated_cursor = 1i64;
808 let url = format!(
809 "ws://127.0.0.1:{}/xrpc/com.atproto.sync.subscribeRepos?cursor={}",
810 app_port(),
811 outdated_cursor
812 );
813
814 let (mut ws_stream, _) = connect_async(&url).await.expect("Failed to connect");
815
816 let mut found_info = false;
817 let mut found_commit = false;
818
819 let timeout = tokio::time::timeout(std::time::Duration::from_secs(15), async {
820 loop {
821 match ws_stream.next().await {
822 Some(Ok(tungstenite::Message::Binary(bin))) => {
823 if let Ok((header, body)) = parse_info_frame(&bin) {
824 println!("Received info frame: {:?} {:?}", header, body);
825 if body.name == "OutdatedCursor" {
826 found_info = true;
827 println!("Found OutdatedCursor info frame!");
828 }
829 } else if let Ok((_, frame)) = parse_frame(&bin)
830 && frame.repo == did
831 {
832 found_commit = true;
833 println!("Found commit for our DID");
834 }
835 if found_commit {
836 break;
837 }
838 }
839 Some(Ok(tungstenite::Message::Close(_))) => break,
840 None => break,
841 _ => continue,
842 }
843 }
844 })
845 .await;
846
847 assert!(timeout.is_ok(), "Timed out");
848 assert!(
849 found_commit,
850 "Should have received commits even with outdated cursor"
851 );
852}