atproto libraries implementation in ocaml
at main 29 kB view raw
1(** Tests for Firehose (Event Stream) Client and Repository Sync *) 2 3open Atproto_sync 4open Atproto_ipld 5module Repo_sync = Atproto_sync.Repo_sync 6 7(** {1 Test Helpers} *) 8 9(** Build a frame from header and payload CBOR values *) 10let make_frame header_cbor payload_cbor = 11 let header_bytes = Dag_cbor.encode header_cbor in 12 let payload_bytes = Dag_cbor.encode payload_cbor in 13 header_bytes ^ payload_bytes 14 15(** Build a message frame header *) 16let message_header event_type = 17 Dag_cbor.Map [ ("op", Dag_cbor.Int 1L); ("t", Dag_cbor.String event_type) ] 18 19(** Build an error frame header *) 20let error_header () = Dag_cbor.Map [ ("op", Dag_cbor.Int (-1L)) ] 21 22(** Create a test CID *) 23let test_cid = 24 (* A valid CID for testing - CIDv1, dag-cbor, sha2-256 *) 25 match 26 Cid.of_string "bafyreie5cvv4h45feadgeuwhbcutmh6t2ceseocckahdoe6uat64zmz454" 27 with 28 | Ok cid -> cid 29 | Error _ -> failwith "Invalid test CID" 30 31(** {1 Frame Decoding Tests} *) 32 33let test_decode_commit_event () = 34 let payload = 35 Dag_cbor.Map 36 [ 37 ("seq", Dag_cbor.Int 12345L); 38 ("repo", Dag_cbor.String "did:plc:test123"); 39 ("rev", Dag_cbor.String "3jui7kd2z2y2a"); 40 ("since", Dag_cbor.String "3jui7kd2z2y2b"); 41 ("commit", Dag_cbor.Link test_cid); 42 ("blocks", Dag_cbor.Bytes "\x00\x01\x02\x03"); 43 ( "ops", 44 Dag_cbor.Array 45 [ 46 Dag_cbor.Map 47 [ 48 ("action", Dag_cbor.String "create"); 49 ("path", Dag_cbor.String "app.bsky.feed.post/abc123"); 50 ("cid", Dag_cbor.Link test_cid); 51 ]; 52 ] ); 53 ("tooBig", Dag_cbor.Bool false); 54 ] 55 in 56 let frame = make_frame (message_header "#commit") payload in 57 match Firehose.decode_frame frame with 58 | Ok (Firehose.Commit evt) -> 59 Alcotest.(check int64) "seq" 12345L evt.seq; 60 Alcotest.(check string) "repo" "did:plc:test123" evt.repo; 61 Alcotest.(check string) "rev" "3jui7kd2z2y2a" evt.rev; 62 Alcotest.(check (option string)) "since" (Some "3jui7kd2z2y2b") evt.since; 63 Alcotest.(check string) "blocks" "\x00\x01\x02\x03" evt.blocks; 64 Alcotest.(check int) "ops count" 1 (List.length evt.ops); 65 let op = List.hd evt.ops in 66 Alcotest.(check bool) "action is create" true (op.action = `Create); 67 Alcotest.(check string) "op path" "app.bsky.feed.post/abc123" op.path; 68 Alcotest.(check bool) "too_big" false evt.too_big 69 | Ok _ -> Alcotest.fail "Expected Commit event" 70 | Error e -> Alcotest.fail (Firehose.error_to_string e) 71 72let test_decode_identity_event () = 73 let payload = 74 Dag_cbor.Map 75 [ 76 ("seq", Dag_cbor.Int 99L); 77 ("did", Dag_cbor.String "did:plc:user123"); 78 ("time", Dag_cbor.String "2024-01-15T10:30:00Z"); 79 ("handle", Dag_cbor.String "alice.bsky.social"); 80 ] 81 in 82 let frame = make_frame (message_header "#identity") payload in 83 match Firehose.decode_frame frame with 84 | Ok (Firehose.Identity evt) -> 85 Alcotest.(check int64) "seq" 99L evt.seq; 86 Alcotest.(check string) "did" "did:plc:user123" evt.did; 87 Alcotest.(check string) "time" "2024-01-15T10:30:00Z" evt.time; 88 Alcotest.(check (option string)) 89 "handle" (Some "alice.bsky.social") evt.handle 90 | Ok _ -> Alcotest.fail "Expected Identity event" 91 | Error e -> Alcotest.fail (Firehose.error_to_string e) 92 93let test_decode_identity_event_no_handle () = 94 let payload = 95 Dag_cbor.Map 96 [ 97 ("seq", Dag_cbor.Int 100L); 98 ("did", Dag_cbor.String "did:plc:user456"); 99 ("time", Dag_cbor.String "2024-01-15T11:00:00Z"); 100 ] 101 in 102 let frame = make_frame (message_header "#identity") payload in 103 match Firehose.decode_frame frame with 104 | Ok (Firehose.Identity evt) -> 105 Alcotest.(check int64) "seq" 100L evt.seq; 106 Alcotest.(check (option string)) "handle" None evt.handle 107 | Ok _ -> Alcotest.fail "Expected Identity event" 108 | Error e -> Alcotest.fail (Firehose.error_to_string e) 109 110let test_decode_account_event () = 111 let payload = 112 Dag_cbor.Map 113 [ 114 ("seq", Dag_cbor.Int 200L); 115 ("did", Dag_cbor.String "did:plc:account123"); 116 ("time", Dag_cbor.String "2024-01-15T12:00:00Z"); 117 ("active", Dag_cbor.Bool true); 118 ("status", Dag_cbor.String "active"); 119 ] 120 in 121 let frame = make_frame (message_header "#account") payload in 122 match Firehose.decode_frame frame with 123 | Ok (Firehose.Account evt) -> 124 Alcotest.(check int64) "seq" 200L evt.seq; 125 Alcotest.(check string) "did" "did:plc:account123" evt.did; 126 Alcotest.(check bool) "active" true evt.active; 127 Alcotest.(check (option string)) "status" (Some "active") evt.status 128 | Ok _ -> Alcotest.fail "Expected Account event" 129 | Error e -> Alcotest.fail (Firehose.error_to_string e) 130 131let test_decode_handle_event () = 132 let payload = 133 Dag_cbor.Map 134 [ 135 ("seq", Dag_cbor.Int 300L); 136 ("did", Dag_cbor.String "did:plc:handle123"); 137 ("time", Dag_cbor.String "2024-01-15T13:00:00Z"); 138 ("handle", Dag_cbor.String "newhandle.bsky.social"); 139 ] 140 in 141 let frame = make_frame (message_header "#handle") payload in 142 match Firehose.decode_frame frame with 143 | Ok (Firehose.Handle evt) -> 144 Alcotest.(check int64) "seq" 300L evt.seq; 145 Alcotest.(check string) "did" "did:plc:handle123" evt.did; 146 Alcotest.(check string) "handle" "newhandle.bsky.social" evt.handle 147 | Ok _ -> Alcotest.fail "Expected Handle event" 148 | Error e -> Alcotest.fail (Firehose.error_to_string e) 149 150let test_decode_tombstone_event () = 151 let payload = 152 Dag_cbor.Map 153 [ 154 ("seq", Dag_cbor.Int 400L); 155 ("did", Dag_cbor.String "did:plc:deleted123"); 156 ("time", Dag_cbor.String "2024-01-15T14:00:00Z"); 157 ] 158 in 159 let frame = make_frame (message_header "#tombstone") payload in 160 match Firehose.decode_frame frame with 161 | Ok (Firehose.Tombstone evt) -> 162 Alcotest.(check int64) "seq" 400L evt.seq; 163 Alcotest.(check string) "did" "did:plc:deleted123" evt.did; 164 Alcotest.(check string) "time" "2024-01-15T14:00:00Z" evt.time 165 | Ok _ -> Alcotest.fail "Expected Tombstone event" 166 | Error e -> Alcotest.fail (Firehose.error_to_string e) 167 168let test_decode_info_event () = 169 let payload = 170 Dag_cbor.Map 171 [ 172 ("name", Dag_cbor.String "OutdatedCursor"); 173 ("message", Dag_cbor.String "Cursor is outdated"); 174 ] 175 in 176 let frame = make_frame (message_header "#info") payload in 177 match Firehose.decode_frame frame with 178 | Ok (Firehose.Info msg) -> 179 Alcotest.(check string) "name" "OutdatedCursor" msg.name; 180 Alcotest.(check (option string)) 181 "message" (Some "Cursor is outdated") msg.message 182 | Ok _ -> Alcotest.fail "Expected Info event" 183 | Error e -> Alcotest.fail (Firehose.error_to_string e) 184 185let test_decode_stream_error () = 186 let payload = Dag_cbor.Map [ ("error", Dag_cbor.String "FutureCursor") ] in 187 let frame = make_frame (error_header ()) payload in 188 match Firehose.decode_frame frame with 189 | Ok (Firehose.StreamError msg) -> 190 Alcotest.(check string) "error" "FutureCursor" msg 191 | Ok _ -> Alcotest.fail "Expected StreamError event" 192 | Error e -> Alcotest.fail (Firehose.error_to_string e) 193 194let test_decode_unknown_event_type () = 195 let payload = Dag_cbor.Map [ ("foo", Dag_cbor.String "bar") ] in 196 let frame = make_frame (message_header "#unknown") payload in 197 match Firehose.decode_frame frame with 198 | Error (Firehose.Protocol_error msg) -> 199 Alcotest.(check bool) "contains unknown type" true (String.length msg > 0) 200 | Ok _ -> Alcotest.fail "Expected Protocol_error" 201 | Error e -> Alcotest.fail ("Wrong error: " ^ Firehose.error_to_string e) 202 203let test_decode_invalid_cbor () = 204 match Firehose.decode_frame "not valid cbor" with 205 | Error (Firehose.Decode_error _) -> () 206 | Ok _ -> Alcotest.fail "Expected Decode_error" 207 | Error e -> Alcotest.fail ("Wrong error: " ^ Firehose.error_to_string e) 208 209let test_decode_missing_payload () = 210 let header = Dag_cbor.encode (message_header "#commit") in 211 match Firehose.decode_frame header with 212 | Error (Firehose.Decode_error msg) -> 213 Alcotest.(check bool) "mentions payload" true (String.length msg > 0) 214 | Ok _ -> Alcotest.fail "Expected Decode_error" 215 | Error e -> Alcotest.fail ("Wrong error: " ^ Firehose.error_to_string e) 216 217(** {1 Helper Function Tests} *) 218 219let test_event_seq () = 220 let commit_evt = 221 Firehose.Commit 222 { 223 seq = 123L; 224 repo = "did:plc:test"; 225 rev = "abc"; 226 since = None; 227 commit = test_cid; 228 blocks = ""; 229 ops = []; 230 too_big = false; 231 } 232 in 233 let identity_evt = 234 Firehose.Identity 235 { seq = 456L; did = "did:plc:test"; time = ""; handle = None } 236 in 237 let info_evt = Firehose.Info { name = "test"; message = None } in 238 let stream_error = Firehose.StreamError "test error" in 239 Alcotest.(check (option int64)) 240 "commit seq" (Some 123L) 241 (Firehose.event_seq commit_evt); 242 Alcotest.(check (option int64)) 243 "identity seq" (Some 456L) 244 (Firehose.event_seq identity_evt); 245 Alcotest.(check (option int64)) "info seq" None (Firehose.event_seq info_evt); 246 Alcotest.(check (option int64)) 247 "error seq" None 248 (Firehose.event_seq stream_error) 249 250let test_event_did () = 251 let commit_evt = 252 Firehose.Commit 253 { 254 seq = 0L; 255 repo = "did:plc:repo123"; 256 rev = ""; 257 since = None; 258 commit = test_cid; 259 blocks = ""; 260 ops = []; 261 too_big = false; 262 } 263 in 264 let identity_evt = 265 Firehose.Identity 266 { seq = 0L; did = "did:plc:identity456"; time = ""; handle = None } 267 in 268 let info_evt = Firehose.Info { name = ""; message = None } in 269 Alcotest.(check (option string)) 270 "commit did" (Some "did:plc:repo123") 271 (Firehose.event_did commit_evt); 272 Alcotest.(check (option string)) 273 "identity did" (Some "did:plc:identity456") 274 (Firehose.event_did identity_evt); 275 Alcotest.(check (option string)) "info did" None (Firehose.event_did info_evt) 276 277(** {1 Config Tests} *) 278 279let test_config_no_cursor () = 280 let uri = 281 Uri.of_string "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos" 282 in 283 let cfg = Firehose.config ~uri () in 284 let built = Firehose.build_uri cfg in 285 Alcotest.(check string) 286 "uri without cursor" 287 "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos" 288 (Uri.to_string built) 289 290let test_config_with_cursor () = 291 let uri = 292 Uri.of_string "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos" 293 in 294 let cfg = Firehose.config ~uri ~cursor:12345L () in 295 let built = Firehose.build_uri cfg in 296 Alcotest.(check bool) 297 "uri has cursor param" true 298 (String.length (Uri.to_string built) > 50) 299 300(** {1 Repo_sync Tests} *) 301 302let test_memory_blockstore () = 303 let store = Repo_sync.create_memory_blockstore () in 304 (* Test put and get *) 305 store.put test_cid "test data"; 306 Alcotest.(check (option string)) 307 "get returns data" (Some "test data") (store.get test_cid); 308 (* Test missing block *) 309 let other_cid = 310 match 311 Cid.of_string 312 "bafyreib5uam2ik53lqxqxqxu5ebhxyppafxhgq6ysuvvxe4qjg5ynpz7t4" 313 with 314 | Ok cid -> cid 315 | Error _ -> failwith "Invalid CID" 316 in 317 Alcotest.(check (option string)) 318 "missing block returns None" None (store.get other_cid) 319 320let test_diff_from_commit_event () = 321 let commit_event : Firehose.commit_event = 322 { 323 seq = 12345L; 324 repo = "did:plc:test123"; 325 rev = "3jui7kd2z2y2a"; 326 since = None; 327 commit = test_cid; 328 blocks = ""; 329 ops = 330 [ 331 { 332 action = `Create; 333 path = "app.bsky.feed.post/abc123"; 334 cid = Some test_cid; 335 }; 336 { 337 action = `Update; 338 path = "app.bsky.actor.profile/self"; 339 cid = Some test_cid; 340 }; 341 { action = `Delete; path = "app.bsky.feed.like/xyz789"; cid = None }; 342 ]; 343 too_big = false; 344 } 345 in 346 let diff = Repo_sync.diff_from_commit_event commit_event in 347 Alcotest.(check int) "diff count" 3 (List.length diff); 348 let entry1 = List.nth diff 0 in 349 Alcotest.(check bool) "first is Create" true (entry1.action = Repo_sync.Create); 350 Alcotest.(check string) 351 "first collection" "app.bsky.feed.post" entry1.collection; 352 Alcotest.(check string) "first rkey" "abc123" entry1.rkey; 353 let entry2 = List.nth diff 1 in 354 Alcotest.(check bool) 355 "second is Update" true 356 (entry2.action = Repo_sync.Update); 357 let entry3 = List.nth diff 2 in 358 Alcotest.(check bool) "third is Delete" true (entry3.action = Repo_sync.Delete); 359 Alcotest.(check bool) "third cid is None" true (entry3.cid = None) 360 361let test_sync_state_from_commit_event () = 362 let commit_event : Firehose.commit_event = 363 { 364 seq = 12345L; 365 repo = "did:plc:testdid"; 366 rev = "3jui7kd2z2y2a"; 367 since = None; 368 commit = test_cid; 369 blocks = ""; 370 ops = []; 371 too_big = false; 372 } 373 in 374 let state = Repo_sync.sync_state_from_commit_event commit_event in 375 Alcotest.(check string) "did" "did:plc:testdid" state.did; 376 Alcotest.(check string) "rev" "3jui7kd2z2y2a" state.rev; 377 Alcotest.(check string) 378 "commit cid" (Cid.to_string test_cid) 379 (Cid.to_string state.commit) 380 381let test_load_car_blocks () = 382 (* Create a simple CAR file with one block *) 383 let block_data = Dag_cbor.encode (Dag_cbor.String "hello world") in 384 let block_cid = Cid.of_dag_cbor block_data in 385 let car_data = 386 Car.write ~roots:[ block_cid ] 387 ~blocks:[ { cid = block_cid; data = block_data } ] 388 in 389 let store = Repo_sync.create_memory_blockstore () in 390 match Repo_sync.load_car_blocks store car_data with 391 | Ok roots -> 392 Alcotest.(check int) "one root" 1 (List.length roots); 393 Alcotest.(check string) 394 "root matches" (Cid.to_string block_cid) 395 (Cid.to_string (List.hd roots)); 396 Alcotest.(check (option string)) 397 "block loaded" (Some block_data) (store.get block_cid) 398 | Error e -> Alcotest.fail (Repo_sync.error_to_string e) 399 400let test_load_car_blocks_invalid () = 401 let store = Repo_sync.create_memory_blockstore () in 402 match Repo_sync.load_car_blocks store "not a CAR file" with 403 | Error (Repo_sync.Invalid_car _) -> () 404 | Error e -> Alcotest.fail ("Wrong error: " ^ Repo_sync.error_to_string e) 405 | Ok _ -> Alcotest.fail "Expected Invalid_car error" 406 407let test_parse_commit () = 408 let commit_cbor = 409 Dag_cbor.Map 410 [ 411 ("did", Dag_cbor.String "did:plc:test123"); 412 ("version", Dag_cbor.Int 3L); 413 ("data", Dag_cbor.Link test_cid); 414 ("rev", Dag_cbor.String "3jui7kd2z2y2a"); 415 ] 416 in 417 let data = Dag_cbor.encode commit_cbor in 418 match Repo_sync.parse_commit data with 419 | Ok commit -> 420 Alcotest.(check string) "did" "did:plc:test123" commit.did; 421 Alcotest.(check int) "version" 3 commit.version; 422 Alcotest.(check string) "rev" "3jui7kd2z2y2a" commit.rev; 423 Alcotest.(check bool) "prev is None" true (commit.prev = None) 424 | Error e -> Alcotest.fail (Repo_sync.error_to_string e) 425 426let test_parse_commit_with_prev () = 427 let prev_cid = 428 match 429 Cid.of_string 430 "bafyreib5uam2ik53lqxqxqxu5ebhxyppafxhgq6ysuvvxe4qjg5ynpz7t4" 431 with 432 | Ok cid -> cid 433 | Error _ -> failwith "Invalid CID" 434 in 435 let commit_cbor = 436 Dag_cbor.Map 437 [ 438 ("did", Dag_cbor.String "did:plc:test123"); 439 ("version", Dag_cbor.Int 3L); 440 ("data", Dag_cbor.Link test_cid); 441 ("rev", Dag_cbor.String "3jui7kd2z2y2a"); 442 ("prev", Dag_cbor.Link prev_cid); 443 ] 444 in 445 let data = Dag_cbor.encode commit_cbor in 446 match Repo_sync.parse_commit data with 447 | Ok commit -> Alcotest.(check bool) "prev is Some" true (commit.prev <> None) 448 | Error e -> Alcotest.fail (Repo_sync.error_to_string e) 449 450let test_parse_commit_invalid () = 451 (* Missing required fields *) 452 let invalid_cbor = Dag_cbor.Map [ ("did", Dag_cbor.String "did:plc:test") ] in 453 let data = Dag_cbor.encode invalid_cbor in 454 match Repo_sync.parse_commit data with 455 | Error (Repo_sync.Invalid_commit _) -> () 456 | Error e -> Alcotest.fail ("Wrong error: " ^ Repo_sync.error_to_string e) 457 | Ok _ -> Alcotest.fail "Expected Invalid_commit error" 458 459let test_cursor_roundtrip () = 460 let cursor_str = "12345" in 461 match Repo_sync.cursor_of_string cursor_str with 462 | Some cursor -> 463 Alcotest.(check int64) "cursor seq" 12345L cursor.seq; 464 Alcotest.(check string) 465 "cursor to string" "12345" 466 (Repo_sync.cursor_to_string cursor) 467 | None -> Alcotest.fail "Failed to parse cursor" 468 469let test_cursor_invalid () = 470 match Repo_sync.cursor_of_string "not a number" with 471 | None -> () 472 | Some _ -> Alcotest.fail "Expected None for invalid cursor" 473 474let test_cursor_of_event () = 475 let commit_event = 476 Firehose.Commit 477 { 478 seq = 99999L; 479 repo = "did:plc:test"; 480 rev = "abc"; 481 since = None; 482 commit = test_cid; 483 blocks = ""; 484 ops = []; 485 too_big = false; 486 } 487 in 488 match Repo_sync.cursor_of_event commit_event with 489 | Some cursor -> Alcotest.(check int64) "cursor seq" 99999L cursor.seq 490 | None -> Alcotest.fail "Expected cursor from commit event" 491 492let test_cursor_of_event_no_seq () = 493 let info_event = Firehose.Info { name = "test"; message = None } in 494 match Repo_sync.cursor_of_event info_event with 495 | None -> () 496 | Some _ -> Alcotest.fail "Expected None for event without seq" 497 498let test_apply_diff_create () = 499 let store = Repo_sync.create_memory_blockstore () in 500 let record_data = Dag_cbor.encode (Dag_cbor.String "record content") in 501 let record_cid = Cid.of_dag_cbor record_data in 502 store.put record_cid record_data; 503 let diff = 504 [ 505 { 506 Repo_sync.action = Repo_sync.Create; 507 collection = "app.bsky.feed.post"; 508 rkey = "abc123"; 509 cid = Some record_cid; 510 }; 511 ] 512 in 513 let received = ref [] in 514 let on_record entry data = received := (entry, data) :: !received in 515 let result = Repo_sync.apply_diff ~store ~on_record diff in 516 Alcotest.(check int) "applied" 1 result.applied; 517 Alcotest.(check int) "skipped" 0 result.skipped; 518 Alcotest.(check int) "errors" 0 (List.length result.errors); 519 Alcotest.(check int) "received callbacks" 1 (List.length !received) 520 521let test_apply_diff_delete () = 522 let store = Repo_sync.create_memory_blockstore () in 523 let diff = 524 [ 525 { 526 Repo_sync.action = Repo_sync.Delete; 527 collection = "app.bsky.feed.post"; 528 rkey = "abc123"; 529 cid = None; 530 }; 531 ] 532 in 533 let received = ref [] in 534 let on_record entry data = received := (entry, data) :: !received in 535 let result = Repo_sync.apply_diff ~store ~on_record diff in 536 Alcotest.(check int) "applied" 1 result.applied; 537 Alcotest.(check int) "skipped" 0 result.skipped; 538 (* Verify callback received None for data *) 539 match !received with 540 | [ (_, None) ] -> () 541 | _ -> Alcotest.fail "Expected delete callback with None data" 542 543let test_apply_diff_missing_block () = 544 let store = Repo_sync.create_memory_blockstore () in 545 (* Create a CID but don't add the block to store *) 546 let diff = 547 [ 548 { 549 Repo_sync.action = Repo_sync.Create; 550 collection = "app.bsky.feed.post"; 551 rkey = "abc123"; 552 cid = Some test_cid; 553 }; 554 ] 555 in 556 let received = ref [] in 557 let on_record entry data = received := (entry, data) :: !received in 558 let result = Repo_sync.apply_diff ~store ~on_record diff in 559 Alcotest.(check int) "applied" 0 result.applied; 560 Alcotest.(check int) "skipped" 1 result.skipped; 561 Alcotest.(check int) "errors" 1 (List.length result.errors) 562 563let test_process_commit_event () = 564 (* Create a CAR file with a block *) 565 let record_data = Dag_cbor.encode (Dag_cbor.String "post content") in 566 let record_cid = Cid.of_dag_cbor record_data in 567 let car_data = 568 Car.write ~roots:[ record_cid ] 569 ~blocks:[ { cid = record_cid; data = record_data } ] 570 in 571 let commit_event : Firehose.commit_event = 572 { 573 seq = 12345L; 574 repo = "did:plc:test123"; 575 rev = "3jui7kd2z2y2a"; 576 since = None; 577 commit = test_cid; 578 blocks = car_data; 579 ops = 580 [ 581 { 582 action = `Create; 583 path = "app.bsky.feed.post/abc123"; 584 cid = Some record_cid; 585 }; 586 ]; 587 too_big = false; 588 } 589 in 590 let store = Repo_sync.create_memory_blockstore () in 591 match Repo_sync.process_commit_event ~store commit_event with 592 | Ok diff -> 593 Alcotest.(check int) "diff count" 1 (List.length diff); 594 (* Verify block was loaded *) 595 Alcotest.(check (option string)) 596 "block loaded" (Some record_data) (store.get record_cid) 597 | Error e -> Alcotest.fail (Repo_sync.error_to_string e) 598 599let test_error_to_string () = 600 let errors = 601 [ 602 Repo_sync.Parse_error "test"; 603 Repo_sync.Invalid_car "bad car"; 604 Repo_sync.Missing_block test_cid; 605 Repo_sync.Invalid_commit "bad commit"; 606 Repo_sync.Sync_error "sync failed"; 607 ] 608 in 609 List.iter 610 (fun e -> 611 let s = Repo_sync.error_to_string e in 612 Alcotest.(check bool) "error string not empty" true (String.length s > 0)) 613 errors 614 615(** {1 Commit Proof Fixture Tests} *) 616 617(** Load commit-proof-fixtures.json *) 618let load_commit_proof_fixtures () = 619 (* During tests, the working directory is _build/default/test/sync *) 620 let paths = 621 [ 622 "../../../../test/fixtures/firehose/commit-proof-fixtures.json"; 623 "../../../test/fixtures/firehose/commit-proof-fixtures.json"; 624 "../../test/fixtures/firehose/commit-proof-fixtures.json"; 625 "test/fixtures/firehose/commit-proof-fixtures.json"; 626 ] 627 in 628 let rec try_paths = function 629 | [] -> failwith "Could not find commit-proof-fixtures.json" 630 | path :: rest -> 631 if Sys.file_exists path then ( 632 let ic = open_in path in 633 let content = really_input_string ic (in_channel_length ic) in 634 close_in ic; 635 content) 636 else try_paths rest 637 in 638 match Atproto_json.decode (try_paths paths) with 639 | Ok json -> ( 640 match Atproto_json.to_array_opt json with 641 | Some fixtures -> fixtures 642 | None -> failwith "Expected array of fixtures") 643 | Error e -> failwith ("JSON parse error: " ^ e) 644 645(** Extract string from JSON *) 646let json_string json = 647 match Atproto_json.to_string_opt json with 648 | Some s -> s 649 | None -> failwith "Expected string" 650 651(** Extract string list from JSON *) 652let json_string_list json = 653 match Atproto_json.to_array_opt json with 654 | Some items -> List.map json_string items 655 | None -> failwith "Expected array of strings" 656 657(** Get field from JSON object *) 658let json_field name json = 659 match Atproto_json.to_object_opt json with 660 | Some pairs -> ( 661 match Atproto_json.get name pairs with 662 | Some v -> v 663 | None -> failwith ("Expected object with field " ^ name)) 664 | None -> failwith ("Expected object with field " ^ name) 665 666module Mst = Atproto_mst 667 668(** Test a single commit-proof fixture *) 669let test_commit_proof_fixture fixture () = 670 let comment = json_string (json_field "comment" fixture) in 671 let leaf_value_str = json_string (json_field "leafValue" fixture) in 672 let keys = json_string_list (json_field "keys" fixture) in 673 let adds = json_string_list (json_field "adds" fixture) in 674 let dels = json_string_list (json_field "dels" fixture) in 675 let root_before_str = json_string (json_field "rootBeforeCommit" fixture) in 676 let root_after_str = json_string (json_field "rootAfterCommit" fixture) in 677 678 (* Parse the leaf value CID - all values in the MST point to this *) 679 let leaf_value = 680 match Cid.of_string leaf_value_str with 681 | Ok cid -> cid 682 | Error e -> failwith ("Invalid leaf value CID: " ^ Cid.error_to_string e) 683 in 684 685 (* Parse expected root CIDs *) 686 let expected_root_before = 687 match Cid.of_string root_before_str with 688 | Ok cid -> cid 689 | Error e -> 690 failwith ("Invalid rootBeforeCommit CID: " ^ Cid.error_to_string e) 691 in 692 let expected_root_after = 693 match Cid.of_string root_after_str with 694 | Ok cid -> cid 695 | Error e -> 696 failwith ("Invalid rootAfterCommit CID: " ^ Cid.error_to_string e) 697 in 698 699 (* Create blockstore and MST module *) 700 let store = Mst.Memory_blockstore.create () in 701 let module M = Mst.Make (Mst.Memory_blockstore) in 702 (* Build initial MST from keys *) 703 let entries = List.map (fun k -> (k, leaf_value)) keys in 704 let root_before = M.of_entries store entries in 705 706 (* Verify root before commit *) 707 Alcotest.(check string) 708 (Printf.sprintf "[%s] rootBeforeCommit" comment) 709 (Cid.to_string expected_root_before) 710 (Cid.to_string root_before); 711 712 (* Apply adds *) 713 let root_with_adds = 714 List.fold_left 715 (fun root key -> M.add store root key leaf_value) 716 root_before adds 717 in 718 719 (* Apply deletes *) 720 let root_after = 721 List.fold_left (fun root key -> M.delete store root key) root_with_adds dels 722 in 723 724 (* Verify root after commit *) 725 Alcotest.(check string) 726 (Printf.sprintf "[%s] rootAfterCommit" comment) 727 (Cid.to_string expected_root_after) 728 (Cid.to_string root_after) 729 730(** Generate test cases from fixtures *) 731let commit_proof_tests () = 732 let fixtures = load_commit_proof_fixtures () in 733 List.mapi 734 (fun i fixture -> 735 let comment = 736 try json_string (json_field "comment" fixture) 737 with _ -> Printf.sprintf "fixture %d" i 738 in 739 Alcotest.test_case comment `Quick (test_commit_proof_fixture fixture)) 740 fixtures 741 742(** {1 Test Runner} *) 743 744let () = 745 Alcotest.run "Sync" 746 [ 747 ( "frame_decoding", 748 [ 749 Alcotest.test_case "decode commit event" `Quick 750 test_decode_commit_event; 751 Alcotest.test_case "decode identity event" `Quick 752 test_decode_identity_event; 753 Alcotest.test_case "decode identity (no handle)" `Quick 754 test_decode_identity_event_no_handle; 755 Alcotest.test_case "decode account event" `Quick 756 test_decode_account_event; 757 Alcotest.test_case "decode handle event" `Quick 758 test_decode_handle_event; 759 Alcotest.test_case "decode tombstone event" `Quick 760 test_decode_tombstone_event; 761 Alcotest.test_case "decode info event" `Quick test_decode_info_event; 762 Alcotest.test_case "decode stream error" `Quick 763 test_decode_stream_error; 764 Alcotest.test_case "decode unknown event type" `Quick 765 test_decode_unknown_event_type; 766 Alcotest.test_case "decode invalid cbor" `Quick 767 test_decode_invalid_cbor; 768 Alcotest.test_case "decode missing payload" `Quick 769 test_decode_missing_payload; 770 ] ); 771 ( "helpers", 772 [ 773 Alcotest.test_case "event_seq" `Quick test_event_seq; 774 Alcotest.test_case "event_did" `Quick test_event_did; 775 ] ); 776 ( "config", 777 [ 778 Alcotest.test_case "config no cursor" `Quick test_config_no_cursor; 779 Alcotest.test_case "config with cursor" `Quick test_config_with_cursor; 780 ] ); 781 ( "repo_sync_blockstore", 782 [ 783 Alcotest.test_case "memory blockstore" `Quick test_memory_blockstore; 784 Alcotest.test_case "load car blocks" `Quick test_load_car_blocks; 785 Alcotest.test_case "load car blocks invalid" `Quick 786 test_load_car_blocks_invalid; 787 ] ); 788 ( "repo_sync_diff", 789 [ 790 Alcotest.test_case "diff from commit event" `Quick 791 test_diff_from_commit_event; 792 Alcotest.test_case "sync state from commit event" `Quick 793 test_sync_state_from_commit_event; 794 ] ); 795 ( "repo_sync_commit", 796 [ 797 Alcotest.test_case "parse commit" `Quick test_parse_commit; 798 Alcotest.test_case "parse commit with prev" `Quick 799 test_parse_commit_with_prev; 800 Alcotest.test_case "parse commit invalid" `Quick 801 test_parse_commit_invalid; 802 ] ); 803 ( "repo_sync_cursor", 804 [ 805 Alcotest.test_case "cursor roundtrip" `Quick test_cursor_roundtrip; 806 Alcotest.test_case "cursor invalid" `Quick test_cursor_invalid; 807 Alcotest.test_case "cursor of event" `Quick test_cursor_of_event; 808 Alcotest.test_case "cursor of event no seq" `Quick 809 test_cursor_of_event_no_seq; 810 ] ); 811 ( "repo_sync_apply", 812 [ 813 Alcotest.test_case "apply diff create" `Quick test_apply_diff_create; 814 Alcotest.test_case "apply diff delete" `Quick test_apply_diff_delete; 815 Alcotest.test_case "apply diff missing block" `Quick 816 test_apply_diff_missing_block; 817 Alcotest.test_case "process commit event" `Quick 818 test_process_commit_event; 819 ] ); 820 ( "repo_sync_errors", 821 [ Alcotest.test_case "error to string" `Quick test_error_to_string ] ); 822 ("commit_proof_fixtures", commit_proof_tests ()); 823 ]