+162
crates/atproto-jetstream/src/consumer.rs
+162
crates/atproto-jetstream/src/consumer.rs
···
456
456
self.id.clone()
457
457
}
458
458
}
459
+
460
+
#[cfg(test)]
461
+
mod tests {
462
+
use super::*;
463
+
464
+
#[test]
465
+
fn test_parse_account_event() {
466
+
let json_str = r#"{"did":"did:plc:yn72uqr4ihkjfbz7us7buqsq","time_us":1757517640675638,"kind":"account","account":{"active":false,"did":"did:plc:yn72uqr4ihkjfbz7us7buqsq","seq":13206502767,"status":"takendown","time":"2025-09-10T15:20:40.439Z"}}"#;
467
+
468
+
let event = serde_json::from_str::<JetstreamEvent>(json_str)
469
+
.expect("Failed to parse account event JSON");
470
+
471
+
match event {
472
+
JetstreamEvent::Account { did, time_us, kind, identity } => {
473
+
assert_eq!(did, "did:plc:yn72uqr4ihkjfbz7us7buqsq");
474
+
assert_eq!(time_us, 1757517640675638);
475
+
assert_eq!(kind, "account");
476
+
477
+
// Verify the account data structure
478
+
assert!(identity.is_object());
479
+
let account_obj = identity.as_object().unwrap();
480
+
assert_eq!(account_obj.get("active").unwrap(), &serde_json::json!(false));
481
+
assert_eq!(account_obj.get("did").unwrap(), &serde_json::json!("did:plc:yn72uqr4ihkjfbz7us7buqsq"));
482
+
assert_eq!(account_obj.get("seq").unwrap(), &serde_json::json!(13206502767i64));
483
+
assert_eq!(account_obj.get("status").unwrap(), &serde_json::json!("takendown"));
484
+
assert_eq!(account_obj.get("time").unwrap(), &serde_json::json!("2025-09-10T15:20:40.439Z"));
485
+
}
486
+
_ => panic!("Expected JetstreamEvent::Account variant, got {:?}", event),
487
+
}
488
+
}
489
+
490
+
#[test]
491
+
fn test_parse_identity_event() {
492
+
let json_str = r#"{"did":"did:plc:mbuadp4xzlbmc2ncqp3pmtox","time_us":1757517628039893,"kind":"identity","identity":{"did":"did:plc:mbuadp4xzlbmc2ncqp3pmtox","handle":"nhieothv.bsky.social","seq":13206497272,"time":"2025-09-10T15:20:27.610Z"}}"#;
493
+
494
+
let event = serde_json::from_str::<JetstreamEvent>(json_str)
495
+
.expect("Failed to parse identity event JSON");
496
+
497
+
match event {
498
+
JetstreamEvent::Identity { did, time_us, kind, identity } => {
499
+
assert_eq!(did, "did:plc:mbuadp4xzlbmc2ncqp3pmtox");
500
+
assert_eq!(time_us, 1757517628039893);
501
+
assert_eq!(kind, "identity");
502
+
503
+
// Verify the identity data structure
504
+
assert!(identity.is_object());
505
+
let identity_obj = identity.as_object().unwrap();
506
+
assert_eq!(identity_obj.get("did").unwrap(), &serde_json::json!("did:plc:mbuadp4xzlbmc2ncqp3pmtox"));
507
+
assert_eq!(identity_obj.get("handle").unwrap(), &serde_json::json!("nhieothv.bsky.social"));
508
+
assert_eq!(identity_obj.get("seq").unwrap(), &serde_json::json!(13206497272i64));
509
+
assert_eq!(identity_obj.get("time").unwrap(), &serde_json::json!("2025-09-10T15:20:27.610Z"));
510
+
}
511
+
_ => panic!("Expected JetstreamEvent::Identity variant, got {:?}", event),
512
+
}
513
+
}
514
+
515
+
#[test]
516
+
fn test_parse_delete_event() {
517
+
let json_str = r#"{"did":"did:plc:5ozthefrqdo5kqnxzfgthhpp","time_us":1757519323847323,"kind":"commit","commit":{"rev":"3lyileto4q52k","operation":"delete","collection":"app.bsky.graph.follow","rkey":"3lxqxntaew32z"}}"#;
518
+
519
+
let event = serde_json::from_str::<JetstreamEvent>(json_str)
520
+
.expect("Failed to parse delete event JSON");
521
+
522
+
match event {
523
+
JetstreamEvent::Delete { did, time_us, kind, commit } => {
524
+
assert_eq!(did, "did:plc:5ozthefrqdo5kqnxzfgthhpp");
525
+
assert_eq!(time_us, 1757519323847323);
526
+
assert_eq!(kind, "commit");
527
+
528
+
// Verify the delete operation details
529
+
assert_eq!(commit.rev, "3lyileto4q52k");
530
+
assert_eq!(commit.operation, "delete");
531
+
assert_eq!(commit.collection, "app.bsky.graph.follow");
532
+
assert_eq!(commit.rkey, "3lxqxntaew32z");
533
+
}
534
+
_ => panic!("Expected JetstreamEvent::Delete variant, got {:?}", event),
535
+
}
536
+
}
537
+
538
+
#[test]
539
+
fn test_parse_commit_event() {
540
+
let json_str = r#"{"did":"did:plc:suq5ijgyqmsawwf5tskf654x","time_us":1757519323848962,"kind":"commit","commit":{"rev":"3lyiletdopl2c","operation":"create","collection":"app.bsky.feed.like","rkey":"3lyiletddxt2c","record":{"$type":"app.bsky.feed.like","createdAt":"2025-09-10T15:47:13.086Z","subject":{"cid":"bafyreib2pygab7z5l7nkqf6bchcvgt4jwsqiaenpf3sr65lugum2uvzzf4","uri":"at://did:plc:yw65rktdby2chplqdytqzcao/app.bsky.feed.post/3lyildyjxgs2o"}},"cid":"bafyreigroo6vhxt62ufcndhaxzas6btq4jmniuz4egszbwuqgiyisqwqoy"}}"#;
541
+
542
+
let event = serde_json::from_str::<JetstreamEvent>(json_str)
543
+
.expect("Failed to parse commit event JSON");
544
+
545
+
match event {
546
+
JetstreamEvent::Commit { did, time_us, kind, commit } => {
547
+
assert_eq!(did, "did:plc:suq5ijgyqmsawwf5tskf654x");
548
+
assert_eq!(time_us, 1757519323848962);
549
+
assert_eq!(kind, "commit");
550
+
551
+
// Verify the commit operation details
552
+
assert_eq!(commit.rev, "3lyiletdopl2c");
553
+
assert_eq!(commit.operation, "create");
554
+
assert_eq!(commit.collection, "app.bsky.feed.like");
555
+
assert_eq!(commit.rkey, "3lyiletddxt2c");
556
+
assert_eq!(commit.cid, "bafyreigroo6vhxt62ufcndhaxzas6btq4jmniuz4egszbwuqgiyisqwqoy");
557
+
558
+
// Verify the record data structure
559
+
assert!(commit.record.is_object());
560
+
let record_obj = commit.record.as_object().unwrap();
561
+
assert_eq!(record_obj.get("$type").unwrap(), &serde_json::json!("app.bsky.feed.like"));
562
+
assert_eq!(record_obj.get("createdAt").unwrap(), &serde_json::json!("2025-09-10T15:47:13.086Z"));
563
+
564
+
// Verify the subject within the record
565
+
let subject = record_obj.get("subject").unwrap().as_object().unwrap();
566
+
assert_eq!(subject.get("cid").unwrap(), &serde_json::json!("bafyreib2pygab7z5l7nkqf6bchcvgt4jwsqiaenpf3sr65lugum2uvzzf4"));
567
+
assert_eq!(subject.get("uri").unwrap(), &serde_json::json!("at://did:plc:yw65rktdby2chplqdytqzcao/app.bsky.feed.post/3lyildyjxgs2o"));
568
+
}
569
+
_ => panic!("Expected JetstreamEvent::Commit variant, got {:?}", event),
570
+
}
571
+
}
572
+
573
+
#[test]
574
+
fn test_parse_commit_update_event() {
575
+
let json_str = r#"{"did":"did:plc:mek6cpladv2xrlu2zdykoxgz","time_us":1757519523286358,"kind":"commit","commit":{"rev":"3lyilmalk762z","operation":"update","collection":"app.bsky.actor.profile","rkey":"self","record":{"$type":"app.bsky.actor.profile","avatar":{"$type":"blob","ref":{"$link":"bafkreibmn7xi5iwugioov463wux62dg4m4w6qqrbsnileaobrzgxdwbsqy"},"mimeType":"image/jpeg","size":289838},"banner":{"$type":"blob","ref":{"$link":"bafkreicjgdlfs6fyyddjklfzrf6w2boychodkdebtjaiwavhcffjtuavsi"},"mimeType":"image/jpeg","size":676693},"description":"ela/dela | parte da fauna fantástica do céu azul | praticamente inofensiva","displayName":"la mucura mística","pinnedPost":{"cid":"bafyreihn2t4efvipbcignd6rlybmoecb7hx4jgntsojhpibjzxno3zhbuq","uri":"at://did:plc:mek6cpladv2xrlu2zdykoxgz/app.bsky.feed.post/3lxarfbd4ts2j"}},"cid":"bafyreifpmgw3podvvm4raq6zewn6jhoa73t7mlgf3f7hty2adb6f2ga7j4"}}"#;
576
+
577
+
let event = serde_json::from_str::<JetstreamEvent>(json_str)
578
+
.expect("Failed to parse commit update event JSON");
579
+
580
+
match event {
581
+
JetstreamEvent::Commit { did, time_us, kind, commit } => {
582
+
assert_eq!(did, "did:plc:mek6cpladv2xrlu2zdykoxgz");
583
+
assert_eq!(time_us, 1757519523286358);
584
+
assert_eq!(kind, "commit");
585
+
586
+
// Verify the commit operation details
587
+
assert_eq!(commit.rev, "3lyilmalk762z");
588
+
assert_eq!(commit.operation, "update");
589
+
assert_eq!(commit.collection, "app.bsky.actor.profile");
590
+
assert_eq!(commit.rkey, "self");
591
+
assert_eq!(commit.cid, "bafyreifpmgw3podvvm4raq6zewn6jhoa73t7mlgf3f7hty2adb6f2ga7j4");
592
+
593
+
// Verify the record data structure
594
+
assert!(commit.record.is_object());
595
+
let record_obj = commit.record.as_object().unwrap();
596
+
assert_eq!(record_obj.get("$type").unwrap(), &serde_json::json!("app.bsky.actor.profile"));
597
+
assert_eq!(record_obj.get("description").unwrap(), &serde_json::json!("ela/dela | parte da fauna fantástica do céu azul | praticamente inofensiva"));
598
+
assert_eq!(record_obj.get("displayName").unwrap(), &serde_json::json!("la mucura mística"));
599
+
600
+
// Verify avatar blob
601
+
let avatar = record_obj.get("avatar").unwrap().as_object().unwrap();
602
+
assert_eq!(avatar.get("$type").unwrap(), &serde_json::json!("blob"));
603
+
assert_eq!(avatar.get("mimeType").unwrap(), &serde_json::json!("image/jpeg"));
604
+
assert_eq!(avatar.get("size").unwrap(), &serde_json::json!(289838));
605
+
606
+
// Verify banner blob
607
+
let banner = record_obj.get("banner").unwrap().as_object().unwrap();
608
+
assert_eq!(banner.get("$type").unwrap(), &serde_json::json!("blob"));
609
+
assert_eq!(banner.get("mimeType").unwrap(), &serde_json::json!("image/jpeg"));
610
+
assert_eq!(banner.get("size").unwrap(), &serde_json::json!(676693));
611
+
612
+
// Verify pinned post
613
+
let pinned_post = record_obj.get("pinnedPost").unwrap().as_object().unwrap();
614
+
assert_eq!(pinned_post.get("cid").unwrap(), &serde_json::json!("bafyreihn2t4efvipbcignd6rlybmoecb7hx4jgntsojhpibjzxno3zhbuq"));
615
+
assert_eq!(pinned_post.get("uri").unwrap(), &serde_json::json!("at://did:plc:mek6cpladv2xrlu2zdykoxgz/app.bsky.feed.post/3lxarfbd4ts2j"));
616
+
}
617
+
_ => panic!("Expected JetstreamEvent::Commit variant, got {:?}", event),
618
+
}
619
+
}
620
+
}