(* atproto libraries implementation in OCaml *)
1(** Tests for Firehose (Event Stream) Client and Repository Sync *)
2
3open Atproto_sync
4open Atproto_ipld
5module Repo_sync = Atproto_sync.Repo_sync
6
7(** {1 Test Helpers} *)
8
(** [make_frame header_cbor payload_cbor] DAG-CBOR-encodes both values and
    returns the encoded header immediately followed by the encoded payload. *)
let make_frame header_cbor payload_cbor =
  let encoded_header = Dag_cbor.encode header_cbor in
  let encoded_payload = Dag_cbor.encode payload_cbor in
  String.concat "" [ encoded_header; encoded_payload ]
14
(** [message_header event_type] is the header map of a regular message frame:
    ["op"] is [1] and ["t"] carries [event_type]. *)
let message_header event_type =
  let op_field = ("op", Dag_cbor.Int 1L) in
  let type_field = ("t", Dag_cbor.String event_type) in
  Dag_cbor.Map [ op_field; type_field ]
18
(** [error_header ()] is the header map of an error frame: a single ["op"]
    entry set to [-1]. *)
let error_header () =
  let op_field = ("op", Dag_cbor.Int Int64.minus_one) in
  Dag_cbor.Map [ op_field ]
21
(** CID shared by the tests below, parsed once when the module is initialised.
    @raise Failure if [Cid.of_string] rejects the hard-coded string. *)
let test_cid =
  (* Described by the original author as a CIDv1 / dag-cbor / sha2-256 value. *)
  let encoded = "bafyreie5cvv4h45feadgeuwhbcutmh6t2ceseocckahdoe6uat64zmz454" in
  match Cid.of_string encoded with
  | Error _ -> failwith "Invalid test CID"
  | Ok parsed -> parsed
30
31(** {1 Frame Decoding Tests} *)
32
(** Decodes a [#commit] frame carrying a single [create] op and checks the
    decoded sequence number, repo, revision, [since], blocks, op list and
    [too_big] flag. *)
let test_decode_commit_event () =
  let create_op =
    Dag_cbor.(
      Map
        [
          ("action", String "create");
          ("path", String "app.bsky.feed.post/abc123");
          ("cid", Link test_cid);
        ])
  in
  let body =
    Dag_cbor.(
      Map
        [
          ("seq", Int 12345L);
          ("repo", String "did:plc:test123");
          ("rev", String "3jui7kd2z2y2a");
          ("since", String "3jui7kd2z2y2b");
          ("commit", Link test_cid);
          ("blocks", Bytes "\x00\x01\x02\x03");
          ("ops", Array [ create_op ]);
          ("tooBig", Bool false);
        ])
  in
  let decoded =
    body |> make_frame (message_header "#commit") |> Firehose.decode_frame
  in
  match decoded with
  | Error err -> Alcotest.fail (Firehose.error_to_string err)
  | Ok (Firehose.Commit ev) ->
      Alcotest.(check int64) "seq" 12345L ev.seq;
      Alcotest.(check string) "repo" "did:plc:test123" ev.repo;
      Alcotest.(check string) "rev" "3jui7kd2z2y2a" ev.rev;
      Alcotest.(check (option string)) "since" (Some "3jui7kd2z2y2b") ev.since;
      Alcotest.(check string) "blocks" "\x00\x01\x02\x03" ev.blocks;
      Alcotest.(check int) "ops count" 1 (List.length ev.ops);
      (* The length check above guarantees the list is non-empty here. *)
      let first_op = List.hd ev.ops in
      Alcotest.(check bool) "action is create" true (first_op.action = `Create);
      Alcotest.(check string)
        "op path" "app.bsky.feed.post/abc123" first_op.path;
      Alcotest.(check bool) "too_big" false ev.too_big
  | Ok _ -> Alcotest.fail "Expected Commit event"
71
(** Decodes an [#identity] frame that includes a handle and checks all four
    decoded fields. *)
let test_decode_identity_event () =
  let body =
    Dag_cbor.(
      Map
        [
          ("seq", Int 99L);
          ("did", String "did:plc:user123");
          ("time", String "2024-01-15T10:30:00Z");
          ("handle", String "alice.bsky.social");
        ])
  in
  let decoded =
    body |> make_frame (message_header "#identity") |> Firehose.decode_frame
  in
  match decoded with
  | Error err -> Alcotest.fail (Firehose.error_to_string err)
  | Ok (Firehose.Identity ev) ->
      Alcotest.(check int64) "seq" 99L ev.seq;
      Alcotest.(check string) "did" "did:plc:user123" ev.did;
      Alcotest.(check string) "time" "2024-01-15T10:30:00Z" ev.time;
      Alcotest.(check (option string))
        "handle" (Some "alice.bsky.social") ev.handle
  | Ok _ -> Alcotest.fail "Expected Identity event"
92
(** Decodes an [#identity] frame whose payload has no ["handle"] entry and
    checks that the decoded handle is [None]. *)
let test_decode_identity_event_no_handle () =
  let body =
    Dag_cbor.(
      Map
        [
          ("seq", Int 100L);
          ("did", String "did:plc:user456");
          ("time", String "2024-01-15T11:00:00Z");
        ])
  in
  let decoded =
    body |> make_frame (message_header "#identity") |> Firehose.decode_frame
  in
  match decoded with
  | Error err -> Alcotest.fail (Firehose.error_to_string err)
  | Ok (Firehose.Identity ev) ->
      Alcotest.(check int64) "seq" 100L ev.seq;
      Alcotest.(check (option string)) "handle" None ev.handle
  | Ok _ -> Alcotest.fail "Expected Identity event"
109
(** Decodes an [#account] frame and checks the sequence number, DID, [active]
    flag and optional status. *)
let test_decode_account_event () =
  let body =
    Dag_cbor.(
      Map
        [
          ("seq", Int 200L);
          ("did", String "did:plc:account123");
          ("time", String "2024-01-15T12:00:00Z");
          ("active", Bool true);
          ("status", String "active");
        ])
  in
  let decoded =
    body |> make_frame (message_header "#account") |> Firehose.decode_frame
  in
  match decoded with
  | Error err -> Alcotest.fail (Firehose.error_to_string err)
  | Ok (Firehose.Account ev) ->
      Alcotest.(check int64) "seq" 200L ev.seq;
      Alcotest.(check string) "did" "did:plc:account123" ev.did;
      Alcotest.(check bool) "active" true ev.active;
      Alcotest.(check (option string)) "status" (Some "active") ev.status
  | Ok _ -> Alcotest.fail "Expected Account event"
130
(** Decodes a [#handle] frame and checks the sequence number, DID and handle. *)
let test_decode_handle_event () =
  let body =
    Dag_cbor.(
      Map
        [
          ("seq", Int 300L);
          ("did", String "did:plc:handle123");
          ("time", String "2024-01-15T13:00:00Z");
          ("handle", String "newhandle.bsky.social");
        ])
  in
  let decoded =
    body |> make_frame (message_header "#handle") |> Firehose.decode_frame
  in
  match decoded with
  | Error err -> Alcotest.fail (Firehose.error_to_string err)
  | Ok (Firehose.Handle ev) ->
      Alcotest.(check int64) "seq" 300L ev.seq;
      Alcotest.(check string) "did" "did:plc:handle123" ev.did;
      Alcotest.(check string) "handle" "newhandle.bsky.social" ev.handle
  | Ok _ -> Alcotest.fail "Expected Handle event"
149
(** Decodes a [#tombstone] frame and checks the sequence number, DID and
    timestamp. *)
let test_decode_tombstone_event () =
  let body =
    Dag_cbor.(
      Map
        [
          ("seq", Int 400L);
          ("did", String "did:plc:deleted123");
          ("time", String "2024-01-15T14:00:00Z");
        ])
  in
  let decoded =
    body |> make_frame (message_header "#tombstone") |> Firehose.decode_frame
  in
  match decoded with
  | Error err -> Alcotest.fail (Firehose.error_to_string err)
  | Ok (Firehose.Tombstone ev) ->
      Alcotest.(check int64) "seq" 400L ev.seq;
      Alcotest.(check string) "did" "did:plc:deleted123" ev.did;
      Alcotest.(check string) "time" "2024-01-15T14:00:00Z" ev.time
  | Ok _ -> Alcotest.fail "Expected Tombstone event"
167
(** Decodes an [#info] frame and checks its name and optional message. *)
let test_decode_info_event () =
  let body =
    Dag_cbor.(
      Map
        [
          ("name", String "OutdatedCursor");
          ("message", String "Cursor is outdated");
        ])
  in
  let decoded =
    body |> make_frame (message_header "#info") |> Firehose.decode_frame
  in
  match decoded with
  | Error err -> Alcotest.fail (Firehose.error_to_string err)
  | Ok (Firehose.Info info) ->
      Alcotest.(check string) "name" "OutdatedCursor" info.name;
      Alcotest.(check (option string))
        "message" (Some "Cursor is outdated") info.message
  | Ok _ -> Alcotest.fail "Expected Info event"
184
(** Decodes an error frame (header built by [error_header]) and checks that it
    surfaces as [Firehose.StreamError] carrying the payload's ["error"]
    string. *)
let test_decode_stream_error () =
  let body = Dag_cbor.(Map [ ("error", String "FutureCursor") ]) in
  let decoded = body |> make_frame (error_header ()) |> Firehose.decode_frame in
  match decoded with
  | Error err -> Alcotest.fail (Firehose.error_to_string err)
  | Ok (Firehose.StreamError reason) ->
      Alcotest.(check string) "error" "FutureCursor" reason
  | Ok _ -> Alcotest.fail "Expected StreamError event"
193
(** Decodes a message frame whose ["t"] is [#unknown] and expects a
    [Firehose.Protocol_error] with a non-empty message. Only non-emptiness is
    asserted; the message text itself is not inspected. *)
let test_decode_unknown_event_type () =
  let body = Dag_cbor.(Map [ ("foo", String "bar") ]) in
  let decoded =
    body |> make_frame (message_header "#unknown") |> Firehose.decode_frame
  in
  match decoded with
  | Ok _ -> Alcotest.fail "Expected Protocol_error"
  | Error (Firehose.Protocol_error msg) ->
      Alcotest.(check bool) "contains unknown type" true (msg <> "")
  | Error other ->
      Alcotest.fail ("Wrong error: " ^ Firehose.error_to_string other)
202
(** Feeds a plain-text string to the decoder and expects
    [Firehose.Decode_error]. *)
let test_decode_invalid_cbor () =
  let garbage = "not valid cbor" in
  match Firehose.decode_frame garbage with
  | Ok _ -> Alcotest.fail "Expected Decode_error"
  | Error (Firehose.Decode_error _) -> ()
  | Error other ->
      Alcotest.fail ("Wrong error: " ^ Firehose.error_to_string other)
208
(** Decodes a frame consisting of an encoded header only and expects a
    [Firehose.Decode_error] with a non-empty message. Only non-emptiness is
    asserted; the message text itself is not inspected. *)
let test_decode_missing_payload () =
  let decoded =
    message_header "#commit" |> Dag_cbor.encode |> Firehose.decode_frame
  in
  match decoded with
  | Ok _ -> Alcotest.fail "Expected Decode_error"
  | Error (Firehose.Decode_error msg) ->
      Alcotest.(check bool) "mentions payload" true (msg <> "")
  | Error other ->
      Alcotest.fail ("Wrong error: " ^ Firehose.error_to_string other)
216
217(** {1 Helper Function Tests} *)
218
(** Checks [Firehose.event_seq] on four hand-built events: commit and identity
    events yield their sequence number, info and stream-error events yield
    [None]. *)
let test_event_seq () =
  let commit_evt =
    Firehose.Commit
      {
        seq = 123L;
        repo = "did:plc:test";
        rev = "abc";
        since = None;
        commit = test_cid;
        blocks = "";
        ops = [];
        too_big = false;
      }
  in
  let identity_evt =
    Firehose.Identity
      { seq = 456L; did = "did:plc:test"; time = ""; handle = None }
  in
  let info_evt = Firehose.Info { name = "test"; message = None } in
  let stream_error = Firehose.StreamError "test error" in
  (* Table of (label, expected sequence number, event), checked in order. *)
  [
    ("commit seq", Some 123L, commit_evt);
    ("identity seq", Some 456L, identity_evt);
    ("info seq", None, info_evt);
    ("error seq", None, stream_error);
  ]
  |> List.iter (fun (label, expected, event) ->
         Alcotest.(check (option int64))
           label expected (Firehose.event_seq event))
249
(** Checks [Firehose.event_did] on three hand-built events: a commit yields its
    [repo], an identity event yields its [did], and an info event yields
    [None]. *)
let test_event_did () =
  let commit_evt =
    Firehose.Commit
      {
        seq = 0L;
        repo = "did:plc:repo123";
        rev = "";
        since = None;
        commit = test_cid;
        blocks = "";
        ops = [];
        too_big = false;
      }
  in
  let identity_evt =
    Firehose.Identity
      { seq = 0L; did = "did:plc:identity456"; time = ""; handle = None }
  in
  let info_evt = Firehose.Info { name = ""; message = None } in
  (* Table of (label, expected DID, event), checked in order. *)
  [
    ("commit did", Some "did:plc:repo123", commit_evt);
    ("identity did", Some "did:plc:identity456", identity_evt);
    ("info did", None, info_evt);
  ]
  |> List.iter (fun (label, expected, event) ->
         Alcotest.(check (option string))
           label expected (Firehose.event_did event))
276
277(** {1 Config Tests} *)
278
(** Builds a config without a cursor and checks that [Firehose.build_uri]
    renders back exactly the endpoint it was given. *)
let test_config_no_cursor () =
  let endpoint = "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos" in
  let rendered =
    Firehose.config ~uri:(Uri.of_string endpoint) ()
    |> Firehose.build_uri |> Uri.to_string
  in
  Alcotest.(check string) "uri without cursor" endpoint rendered
289
(** Builds a config with [~cursor:12345L] and checks that the URI produced by
    [Firehose.build_uri] carries that cursor as a query parameter.

    The earlier assertion only required the rendered URI to be longer than 50
    characters, which the 55-character endpoint satisfies on its own, so the
    test could not fail when the cursor was dropped.

    NOTE(review): assumes the parameter is named [cursor] and rendered in
    decimal, as in the [com.atproto.sync.subscribeRepos] lexicon — confirm
    against [Firehose.build_uri]. *)
let test_config_with_cursor () =
  let uri =
    Uri.of_string "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos"
  in
  let cfg = Firehose.config ~uri ~cursor:12345L () in
  let built = Firehose.build_uri cfg in
  Alcotest.(check (option string))
    "uri has cursor param" (Some "12345")
    (Uri.get_query_param built "cursor")
299
300(** {1 Repo_sync Tests} *)
301
(** Exercises the in-memory blockstore: a block stored with [put] is returned
    by [get], and a CID that was never stored resolves to [None].
    @raise Failure if the second hard-coded CID string does not parse. *)
let test_memory_blockstore () =
  let blocks = Repo_sync.create_memory_blockstore () in
  let expect_lookup label expected cid =
    Alcotest.(check (option string)) label expected (blocks.get cid)
  in
  (* Round-trip a block through put/get. *)
  blocks.put test_cid "test data";
  expect_lookup "get returns data" (Some "test data") test_cid;
  (* Look up a CID that was never stored. *)
  let absent_cid =
    match
      Cid.of_string
        "bafyreib5uam2ik53lqxqxqxu5ebhxyppafxhgq6ysuvvxe4qjg5ynpz7t4"
    with
    | Error _ -> failwith "Invalid CID"
    | Ok parsed -> parsed
  in
  expect_lookup "missing block returns None" None absent_cid
319
(** Converts a commit event with one create, one update and one delete op into
    a diff and checks the entry count, each entry's action, the first entry's
    collection/rkey split, and that the delete entry has no CID. *)
let test_diff_from_commit_event () =
  let event : Firehose.commit_event =
    {
      seq = 12345L;
      repo = "did:plc:test123";
      rev = "3jui7kd2z2y2a";
      since = None;
      commit = test_cid;
      blocks = "";
      ops =
        [
          {
            action = `Create;
            path = "app.bsky.feed.post/abc123";
            cid = Some test_cid;
          };
          {
            action = `Update;
            path = "app.bsky.actor.profile/self";
            cid = Some test_cid;
          };
          { action = `Delete; path = "app.bsky.feed.like/xyz789"; cid = None };
        ];
      too_big = false;
    }
  in
  let entries = Repo_sync.diff_from_commit_event event in
  let expect_true label holds = Alcotest.(check bool) label true holds in
  Alcotest.(check int) "diff count" 3 (List.length entries);
  (* Entries are inspected in the same order as the ops above. *)
  let entry_at = List.nth entries in
  let created = entry_at 0 in
  expect_true "first is Create" (created.action = Repo_sync.Create);
  Alcotest.(check string)
    "first collection" "app.bsky.feed.post" created.collection;
  Alcotest.(check string) "first rkey" "abc123" created.rkey;
  let updated = entry_at 1 in
  expect_true "second is Update" (updated.action = Repo_sync.Update);
  let deleted = entry_at 2 in
  expect_true "third is Delete" (deleted.action = Repo_sync.Delete);
  expect_true "third cid is None" (deleted.cid = None)
360
(** Derives a sync state from a commit event and checks that its DID, revision
    and commit CID mirror the event's [repo], [rev] and [commit]. *)
let test_sync_state_from_commit_event () =
  let event : Firehose.commit_event =
    {
      seq = 12345L;
      repo = "did:plc:testdid";
      rev = "3jui7kd2z2y2a";
      since = None;
      commit = test_cid;
      blocks = "";
      ops = [];
      too_big = false;
    }
  in
  let derived = Repo_sync.sync_state_from_commit_event event in
  let expect_string label expected actual =
    Alcotest.(check string) label expected actual
  in
  expect_string "did" "did:plc:testdid" derived.did;
  expect_string "rev" "3jui7kd2z2y2a" derived.rev;
  (* CIDs are compared through their string rendering. *)
  expect_string "commit cid" (Cid.to_string test_cid)
    (Cid.to_string derived.commit)
380
(** Writes a CAR archive holding one block (also its only root), loads it into
    a fresh in-memory blockstore, and checks the returned root list and the
    stored block. *)
let test_load_car_blocks () =
  let payload = Dag_cbor.encode (Dag_cbor.String "hello world") in
  let payload_cid = Cid.of_dag_cbor payload in
  let archive =
    Car.write ~roots:[ payload_cid ]
      ~blocks:[ { cid = payload_cid; data = payload } ]
  in
  let blocks = Repo_sync.create_memory_blockstore () in
  match Repo_sync.load_car_blocks blocks archive with
  | Error err -> Alcotest.fail (Repo_sync.error_to_string err)
  | Ok roots ->
      Alcotest.(check int) "one root" 1 (List.length roots);
      (* The length check above guarantees the list is non-empty here. *)
      let first_root = List.hd roots in
      Alcotest.(check string)
        "root matches" (Cid.to_string payload_cid)
        (Cid.to_string first_root);
      Alcotest.(check (option string))
        "block loaded" (Some payload) (blocks.get payload_cid)
399
(** Loads a plain-text string as if it were a CAR archive and expects
    [Repo_sync.Invalid_car]. *)
let test_load_car_blocks_invalid () =
  let blocks = Repo_sync.create_memory_blockstore () in
  let outcome = Repo_sync.load_car_blocks blocks "not a CAR file" in
  match outcome with
  | Ok _ -> Alcotest.fail "Expected Invalid_car error"
  | Error (Repo_sync.Invalid_car _) -> ()
  | Error other ->
      Alcotest.fail ("Wrong error: " ^ Repo_sync.error_to_string other)
406
(** Parses an encoded commit map without a ["prev"] entry and checks the DID,
    version, revision, and that [prev] is [None]. *)
let test_parse_commit () =
  let encoded =
    Dag_cbor.(
      Map
        [
          ("did", String "did:plc:test123");
          ("version", Int 3L);
          ("data", Link test_cid);
          ("rev", String "3jui7kd2z2y2a");
        ])
    |> Dag_cbor.encode
  in
  match Repo_sync.parse_commit encoded with
  | Error err -> Alcotest.fail (Repo_sync.error_to_string err)
  | Ok parsed ->
      Alcotest.(check string) "did" "did:plc:test123" parsed.did;
      Alcotest.(check int) "version" 3 parsed.version;
      Alcotest.(check string) "rev" "3jui7kd2z2y2a" parsed.rev;
      Alcotest.(check bool) "prev is None" true (parsed.prev = None)
425
(** Parses an encoded commit map that includes a ["prev"] link and checks that
    the parsed commit's [prev] is populated.
    @raise Failure if the hard-coded previous-commit CID does not parse. *)
let test_parse_commit_with_prev () =
  let previous =
    match
      Cid.of_string
        "bafyreib5uam2ik53lqxqxqxu5ebhxyppafxhgq6ysuvvxe4qjg5ynpz7t4"
    with
    | Error _ -> failwith "Invalid CID"
    | Ok parsed -> parsed
  in
  let encoded =
    Dag_cbor.(
      Map
        [
          ("did", String "did:plc:test123");
          ("version", Int 3L);
          ("data", Link test_cid);
          ("rev", String "3jui7kd2z2y2a");
          ("prev", Link previous);
        ])
    |> Dag_cbor.encode
  in
  match Repo_sync.parse_commit encoded with
  | Error err -> Alcotest.fail (Repo_sync.error_to_string err)
  | Ok parsed ->
      let has_prev = match parsed.prev with Some _ -> true | None -> false in
      Alcotest.(check bool) "prev is Some" true has_prev
449
(** Parses an encoded map that contains only a ["did"] entry and expects
    [Repo_sync.Invalid_commit]. *)
let test_parse_commit_invalid () =
  (* Every entry other than "did" is deliberately left out. *)
  let encoded =
    Dag_cbor.(Map [ ("did", String "did:plc:test") ]) |> Dag_cbor.encode
  in
  match Repo_sync.parse_commit encoded with
  | Ok _ -> Alcotest.fail "Expected Invalid_commit error"
  | Error (Repo_sync.Invalid_commit _) -> ()
  | Error other ->
      Alcotest.fail ("Wrong error: " ^ Repo_sync.error_to_string other)
458
(** Parses the cursor string ["12345"], checks its sequence number, and checks
    that rendering it back yields the same string. *)
let test_cursor_roundtrip () =
  match Repo_sync.cursor_of_string "12345" with
  | None -> Alcotest.fail "Failed to parse cursor"
  | Some parsed ->
      Alcotest.(check int64) "cursor seq" 12345L parsed.seq;
      let rendered = Repo_sync.cursor_to_string parsed in
      Alcotest.(check string) "cursor to string" "12345" rendered
468
(** Checks that a non-numeric cursor string is rejected with [None]. *)
let test_cursor_invalid () =
  let parsed = Repo_sync.cursor_of_string "not a number" in
  match parsed with
  | Some _ -> Alcotest.fail "Expected None for invalid cursor"
  | None -> ()
473
(** Checks that a cursor derived from a commit event carries the event's
    sequence number. *)
let test_cursor_of_event () =
  let derived =
    Repo_sync.cursor_of_event
      (Firehose.Commit
         {
           seq = 99999L;
           repo = "did:plc:test";
           rev = "abc";
           since = None;
           commit = test_cid;
           blocks = "";
           ops = [];
           too_big = false;
         })
  in
  match derived with
  | None -> Alcotest.fail "Expected cursor from commit event"
  | Some cursor -> Alcotest.(check int64) "cursor seq" 99999L cursor.seq
491
(** Checks that no cursor is derived from an info event. *)
let test_cursor_of_event_no_seq () =
  let derived =
    Repo_sync.cursor_of_event (Firehose.Info { name = "test"; message = None })
  in
  match derived with
  | Some _ -> Alcotest.fail "Expected None for event without seq"
  | None -> ()
497
(** Applies a one-entry create diff whose block is present in the store and
    checks the applied/skipped/error counts and that the callback fired
    once. *)
let test_apply_diff_create () =
  let blocks = Repo_sync.create_memory_blockstore () in
  let payload = Dag_cbor.encode (Dag_cbor.String "record content") in
  let payload_cid = Cid.of_dag_cbor payload in
  blocks.put payload_cid payload;
  let creation =
    {
      Repo_sync.action = Repo_sync.Create;
      collection = "app.bsky.feed.post";
      rkey = "abc123";
      cid = Some payload_cid;
    }
  in
  (* Callback invocations are accumulated here, most recent first. *)
  let calls = ref [] in
  let record_call entry data = calls := (entry, data) :: !calls in
  let outcome =
    Repo_sync.apply_diff ~store:blocks ~on_record:record_call [ creation ]
  in
  let expect_count label expected actual =
    Alcotest.(check int) label expected actual
  in
  expect_count "applied" 1 outcome.applied;
  expect_count "skipped" 0 outcome.skipped;
  expect_count "errors" 0 (List.length outcome.errors);
  expect_count "received callbacks" 1 (List.length !calls)
520
(** Applies a one-entry delete diff against an empty store and checks that it
    counts as applied and that the callback received [None] as record data. *)
let test_apply_diff_delete () =
  let blocks = Repo_sync.create_memory_blockstore () in
  let deletion =
    {
      Repo_sync.action = Repo_sync.Delete;
      collection = "app.bsky.feed.post";
      rkey = "abc123";
      cid = None;
    }
  in
  (* Callback invocations are accumulated here, most recent first. *)
  let calls = ref [] in
  let record_call entry data = calls := (entry, data) :: !calls in
  let outcome =
    Repo_sync.apply_diff ~store:blocks ~on_record:record_call [ deletion ]
  in
  Alcotest.(check int) "applied" 1 outcome.applied;
  Alcotest.(check int) "skipped" 0 outcome.skipped;
  (* Exactly one callback is expected, and it must carry no data. *)
  match !calls with
  | [ (_, None) ] -> ()
  | _ -> Alcotest.fail "Expected delete callback with None data"
542
(** Applies a one-entry create diff whose CID was never put in the store and
    checks that the entry is skipped and one error is reported. *)
let test_apply_diff_missing_block () =
  let blocks = Repo_sync.create_memory_blockstore () in
  (* [test_cid] is referenced by the diff but intentionally not stored. *)
  let creation =
    {
      Repo_sync.action = Repo_sync.Create;
      collection = "app.bsky.feed.post";
      rkey = "abc123";
      cid = Some test_cid;
    }
  in
  let calls = ref [] in
  let record_call entry data = calls := (entry, data) :: !calls in
  let outcome =
    Repo_sync.apply_diff ~store:blocks ~on_record:record_call [ creation ]
  in
  let expect_count label expected actual =
    Alcotest.(check int) label expected actual
  in
  expect_count "applied" 0 outcome.applied;
  expect_count "skipped" 1 outcome.skipped;
  expect_count "errors" 1 (List.length outcome.errors)
562
(** Processes a commit event whose [blocks] field is a one-block CAR archive
    and whose single op creates that block's record; checks that one diff
    entry comes back and that the block ended up in the store. *)
let test_process_commit_event () =
  let payload = Dag_cbor.encode (Dag_cbor.String "post content") in
  let payload_cid = Cid.of_dag_cbor payload in
  let archive =
    Car.write ~roots:[ payload_cid ]
      ~blocks:[ { cid = payload_cid; data = payload } ]
  in
  let event : Firehose.commit_event =
    {
      seq = 12345L;
      repo = "did:plc:test123";
      rev = "3jui7kd2z2y2a";
      since = None;
      commit = test_cid;
      blocks = archive;
      ops =
        [
          {
            action = `Create;
            path = "app.bsky.feed.post/abc123";
            cid = Some payload_cid;
          };
        ];
      too_big = false;
    }
  in
  let blocks = Repo_sync.create_memory_blockstore () in
  match Repo_sync.process_commit_event ~store:blocks event with
  | Error err -> Alcotest.fail (Repo_sync.error_to_string err)
  | Ok entries ->
      Alcotest.(check int) "diff count" 1 (List.length entries);
      (* The archive's block must now be retrievable from the store. *)
      Alcotest.(check (option string))
        "block loaded" (Some payload) (blocks.get payload_cid)
598
(** Checks that [Repo_sync.error_to_string] yields a non-empty string for one
    sample of each error constructor listed below. *)
let test_error_to_string () =
  let rendering_is_non_empty err =
    let rendered = Repo_sync.error_to_string err in
    Alcotest.(check bool)
      "error string not empty" true
      (String.length rendered > 0)
  in
  List.iter rendering_is_non_empty
    [
      Repo_sync.Parse_error "test";
      Repo_sync.Invalid_car "bad car";
      Repo_sync.Missing_block test_cid;
      Repo_sync.Invalid_commit "bad commit";
      Repo_sync.Sync_error "sync failed";
    ]
614
615(** {1 Commit Proof Fixture Tests} *)
616
(** [load_commit_proof_fixtures ()] reads [commit-proof-fixtures.json] from the
    first candidate path that exists and returns the elements of its top-level
    JSON array.

    The file is opened in binary mode so that [in_channel_length] matches the
    number of bytes actually read, and the channel is closed even when reading
    raises.

    @raise Failure
      if no candidate path exists, the content is not valid JSON, or the
      top-level value is not an array. *)
let load_commit_proof_fixtures () =
  (* Several relative paths are tried; the original author notes that tests
     run from [_build/default/test/sync]. *)
  let candidates =
    [
      "../../../../test/fixtures/firehose/commit-proof-fixtures.json";
      "../../../test/fixtures/firehose/commit-proof-fixtures.json";
      "../../test/fixtures/firehose/commit-proof-fixtures.json";
      "test/fixtures/firehose/commit-proof-fixtures.json";
    ]
  in
  let read_all path =
    let ic = open_in_bin path in
    Fun.protect
      ~finally:(fun () -> close_in_noerr ic)
      (fun () -> really_input_string ic (in_channel_length ic))
  in
  let content =
    match List.find_opt Sys.file_exists candidates with
    | Some path -> read_all path
    | None -> failwith "Could not find commit-proof-fixtures.json"
  in
  match Atproto_json.decode content with
  | Ok json -> (
      match Atproto_json.to_array_opt json with
      | Some fixtures -> fixtures
      | None -> failwith "Expected array of fixtures")
  | Error e -> failwith ("JSON parse error: " ^ e)
644
(** [json_string json] is the string held by [json].
    @raise Failure if [json] is not a JSON string. *)
let json_string json =
  let as_string = Atproto_json.to_string_opt json in
  match as_string with
  | None -> failwith "Expected string"
  | Some text -> text
650
(** [json_string_list json] is the list of strings held by the JSON array
    [json].
    @raise Failure if [json] is not an array or an element is not a string. *)
let json_string_list json =
  let as_array = Atproto_json.to_array_opt json in
  match as_array with
  | None -> failwith "Expected array of strings"
  | Some elements -> List.map json_string elements
656
(** [json_field name json] is the value bound to [name] in the JSON object
    [json].
    @raise Failure if [json] is not an object or has no field [name]. *)
let json_field name json =
  (* Both failure modes report the same message. *)
  let missing () = failwith ("Expected object with field " ^ name) in
  match Atproto_json.to_object_opt json with
  | None -> missing ()
  | Some members -> (
      match Atproto_json.get name members with
      | None -> missing ()
      | Some value -> value)
665
666module Mst = Atproto_mst
667
(** [test_commit_proof_fixture fixture ()] replays one entry of the
    commit-proof fixture file. It builds an MST from the fixture's [keys]
    (every key mapped to [leafValue]) and compares the root CID with
    [rootBeforeCommit]; it then applies [adds] followed by [dels] and compares
    the resulting root CID with [rootAfterCommit].
    @raise Failure
      if a fixture field is missing or mistyped, or a CID string does not
      parse. *)
let test_commit_proof_fixture fixture () =
  let text_of name = json_string (json_field name fixture) in
  let texts_of name = json_string_list (json_field name fixture) in
  (* All fields are extracted before any CID is parsed. *)
  let comment = text_of "comment" in
  let leaf_text = text_of "leafValue" in
  let keys = texts_of "keys" in
  let adds = texts_of "adds" in
  let dels = texts_of "dels" in
  let before_text = text_of "rootBeforeCommit" in
  let after_text = text_of "rootAfterCommit" in

  (* [cid_of prefix text] parses [text], failing with [prefix] followed by the
     rendered CID error. *)
  let cid_of prefix text =
    match Cid.of_string text with
    | Ok cid -> cid
    | Error e -> failwith (prefix ^ Cid.error_to_string e)
  in
  (* Every key in the tree points at this one CID. *)
  let leaf_value = cid_of "Invalid leaf value CID: " leaf_text in
  let expected_before = cid_of "Invalid rootBeforeCommit CID: " before_text in
  let expected_after = cid_of "Invalid rootAfterCommit CID: " after_text in

  (* Root CIDs are compared through their string rendering. *)
  let check_root label expected actual =
    Alcotest.(check string)
      label (Cid.to_string expected) (Cid.to_string actual)
  in

  let store = Mst.Memory_blockstore.create () in
  let module Tree = Mst.Make (Mst.Memory_blockstore) in
  let initial_root =
    keys |> List.map (fun key -> (key, leaf_value)) |> Tree.of_entries store
  in
  check_root
    (Printf.sprintf "[%s] rootBeforeCommit" comment)
    expected_before initial_root;

  (* Insertions are applied first, then removals. *)
  let insert root key = Tree.add store root key leaf_value in
  let remove root key = Tree.delete store root key in
  let root_with_adds = List.fold_left insert initial_root adds in
  let final_root = List.fold_left remove root_with_adds dels in
  check_root
    (Printf.sprintf "[%s] rootAfterCommit" comment)
    expected_after final_root
729
(** [commit_proof_tests ()] loads the commit-proof fixtures and turns each one
    into a [`Quick] Alcotest case running [test_commit_proof_fixture].

    A case is named after the fixture's ["comment"] field. When that field is
    missing or not a string, the helpers raise [Failure] and the case falls
    back to the name ["fixture <index>"]. Only [Failure] is caught, so any
    other exception propagates instead of being silently swallowed.
    @raise Failure if the fixture file cannot be loaded. *)
let commit_proof_tests () =
  load_commit_proof_fixtures ()
  |> List.mapi (fun i fixture ->
         let case_name =
           match json_string (json_field "comment" fixture) with
           | comment -> comment
           | exception Failure _ -> Printf.sprintf "fixture %d" i
         in
         Alcotest.test_case case_name `Quick
           (test_commit_proof_fixture fixture))
741
742(** {1 Test Runner} *)
743
let () =
  (* Every hand-written case in this executable runs at [`Quick] speed. *)
  let quick label run = Alcotest.test_case label `Quick run in
  Alcotest.run "Sync"
    [
      ( "frame_decoding",
        [
          quick "decode commit event" test_decode_commit_event;
          quick "decode identity event" test_decode_identity_event;
          quick "decode identity (no handle)"
            test_decode_identity_event_no_handle;
          quick "decode account event" test_decode_account_event;
          quick "decode handle event" test_decode_handle_event;
          quick "decode tombstone event" test_decode_tombstone_event;
          quick "decode info event" test_decode_info_event;
          quick "decode stream error" test_decode_stream_error;
          quick "decode unknown event type" test_decode_unknown_event_type;
          quick "decode invalid cbor" test_decode_invalid_cbor;
          quick "decode missing payload" test_decode_missing_payload;
        ] );
      ( "helpers",
        [
          quick "event_seq" test_event_seq;
          quick "event_did" test_event_did;
        ] );
      ( "config",
        [
          quick "config no cursor" test_config_no_cursor;
          quick "config with cursor" test_config_with_cursor;
        ] );
      ( "repo_sync_blockstore",
        [
          quick "memory blockstore" test_memory_blockstore;
          quick "load car blocks" test_load_car_blocks;
          quick "load car blocks invalid" test_load_car_blocks_invalid;
        ] );
      ( "repo_sync_diff",
        [
          quick "diff from commit event" test_diff_from_commit_event;
          quick "sync state from commit event"
            test_sync_state_from_commit_event;
        ] );
      ( "repo_sync_commit",
        [
          quick "parse commit" test_parse_commit;
          quick "parse commit with prev" test_parse_commit_with_prev;
          quick "parse commit invalid" test_parse_commit_invalid;
        ] );
      ( "repo_sync_cursor",
        [
          quick "cursor roundtrip" test_cursor_roundtrip;
          quick "cursor invalid" test_cursor_invalid;
          quick "cursor of event" test_cursor_of_event;
          quick "cursor of event no seq" test_cursor_of_event_no_seq;
        ] );
      ( "repo_sync_apply",
        [
          quick "apply diff create" test_apply_diff_create;
          quick "apply diff delete" test_apply_diff_delete;
          quick "apply diff missing block" test_apply_diff_missing_block;
          quick "process commit event" test_process_commit_event;
        ] );
      ("repo_sync_errors", [ quick "error to string" test_error_to_string ]);
      (* These cases are generated from the fixture file when the list is
         built. *)
      ("commit_proof_fixtures", commit_proof_tests ());
    ]
823 ]