+25
-44
examples/firehose_demo/firehose_demo.ml
+25
-44
examples/firehose_demo/firehose_demo.ml
···
74
74
| Firehose.Info m -> Printf.sprintf "#info name=%s" m.name
75
75
| Firehose.StreamError err -> Printf.sprintf "#error %s" err
76
76
77
-
let json_of_op blocks (op : Firehose.operation) =
78
-
let action =
79
-
match op.action with
80
-
| `Create -> "create"
81
-
| `Update -> "update"
82
-
| `Delete -> "delete"
77
+
let json_of_op_with_record blocks (op : Firehose.operation) =
78
+
let base_json = Firehose.operation_to_json op in
79
+
match find_block blocks op with
80
+
| Some cbor ->
81
+
let record_json = Yojson.Safe.from_string (Dag_json.encode_string cbor) in
82
+
Firehose.add_record_to_op_json base_json record_json
83
+
| None -> base_json
84
+
85
+
let json_of_event ?(rich = false) event =
86
+
let json =
87
+
match event with
88
+
| Firehose.Commit evt when rich -> (
89
+
let blocks = extract_blocks evt.blocks in
90
+
let ops = `List (List.map (json_of_op_with_record blocks) evt.ops) in
91
+
let base = Firehose.commit_event_to_json evt in
92
+
(* Replace the ops field with our enriched version *)
93
+
match base with
94
+
| `Assoc fields ->
95
+
`Assoc
96
+
(List.map
97
+
(fun (k, v) -> if k = "ops" then (k, ops) else (k, v))
98
+
fields)
99
+
| _ -> base)
100
+
| _ -> Firehose.event_to_json event
83
101
in
84
-
let cid_json =
85
-
match op.cid with
86
-
| Some c -> ",\"cid\":\"" ^ Cid.to_string c ^ "\""
87
-
| None -> ""
88
-
in
89
-
let record_json =
90
-
match find_block blocks op with
91
-
| Some cbor -> ",\"record\":" ^ Dag_json.encode_string cbor
92
-
| None -> ""
93
-
in
94
-
Printf.sprintf "{\"action\":\"%s\",\"path\":\"%s\"%s%s}" action op.path
95
-
cid_json record_json
96
-
97
-
let json_of_event ?(rich = false) = function
98
-
| Firehose.Commit evt ->
99
-
let blocks = if rich then extract_blocks evt.blocks else [] in
100
-
let ops = String.concat "," (List.map (json_of_op blocks) evt.ops) in
101
-
Printf.sprintf
102
-
"{\"type\":\"commit\",\"seq\":%Ld,\"repo\":\"%s\",\"ops\":[%s]}" evt.seq
103
-
evt.repo ops
104
-
| Firehose.Identity e ->
105
-
Printf.sprintf "{\"type\":\"identity\",\"seq\":%Ld,\"did\":\"%s\"}" e.seq
106
-
e.did
107
-
| Firehose.Account e ->
108
-
Printf.sprintf
109
-
"{\"type\":\"account\",\"seq\":%Ld,\"did\":\"%s\",\"active\":%b}" e.seq
110
-
e.did e.active
111
-
| Firehose.Handle e ->
112
-
Printf.sprintf
113
-
"{\"type\":\"handle\",\"seq\":%Ld,\"did\":\"%s\",\"handle\":\"%s\"}"
114
-
e.seq e.did e.handle
115
-
| Firehose.Tombstone e ->
116
-
Printf.sprintf "{\"type\":\"tombstone\",\"seq\":%Ld,\"did\":\"%s\"}" e.seq
117
-
e.did
118
-
| Firehose.Info m ->
119
-
Printf.sprintf "{\"type\":\"info\",\"name\":\"%s\"}" m.name
120
-
| Firehose.StreamError err ->
121
-
Printf.sprintf "{\"type\":\"error\",\"message\":\"%s\"}" err
102
+
Yojson.Safe.to_string json
122
103
123
104
type filter =
124
105
| Posts
+121
lib/sync/firehose.ml
+121
lib/sync/firehose.ml
···
341
341
| Tombstone e -> Some e.did
342
342
| Info _ -> None
343
343
| StreamError _ -> None
344
+
345
+
(** {1 JSON Serialization} *)
346
+
347
+
(** Convert an operation to JSON. Does not include record content - use
348
+
[add_record_to_op_json] to add it. *)
349
+
let operation_to_json (op : operation) : Yojson.Safe.t =
350
+
let action =
351
+
match op.action with
352
+
| `Create -> "create"
353
+
| `Update -> "update"
354
+
| `Delete -> "delete"
355
+
in
356
+
let base = [ ("action", `String action); ("path", `String op.path) ] in
357
+
let with_cid =
358
+
match op.cid with
359
+
| Some c -> base @ [ ("cid", `String (Cid.to_string c)) ]
360
+
| None -> base
361
+
in
362
+
`Assoc with_cid
363
+
364
+
(** Add a record field to an operation JSON object *)
365
+
let add_record_to_op_json (op_json : Yojson.Safe.t) (record : Yojson.Safe.t) :
366
+
Yojson.Safe.t =
367
+
match op_json with
368
+
| `Assoc fields -> `Assoc (fields @ [ ("record", record) ])
369
+
| _ -> op_json
370
+
371
+
(** Convert a commit event to JSON. Operations don't include record content. *)
372
+
let commit_event_to_json (evt : commit_event) : Yojson.Safe.t =
373
+
let ops = `List (List.map operation_to_json evt.ops) in
374
+
`Assoc
375
+
[
376
+
("type", `String "commit");
377
+
("seq", `Intlit (Int64.to_string evt.seq));
378
+
("repo", `String evt.repo);
379
+
("rev", `String evt.rev);
380
+
("commit", `String (Cid.to_string evt.commit));
381
+
("ops", ops);
382
+
]
383
+
384
+
(** Convert an identity event to JSON *)
385
+
let identity_event_to_json (e : identity_event) : Yojson.Safe.t =
386
+
let base =
387
+
[
388
+
("type", `String "identity");
389
+
("seq", `Intlit (Int64.to_string e.seq));
390
+
("did", `String e.did);
391
+
("time", `String e.time);
392
+
]
393
+
in
394
+
let with_handle =
395
+
match e.handle with
396
+
| Some h -> base @ [ ("handle", `String h) ]
397
+
| None -> base
398
+
in
399
+
`Assoc with_handle
400
+
401
+
(** Convert an account event to JSON *)
402
+
let account_event_to_json (e : account_event) : Yojson.Safe.t =
403
+
let base =
404
+
[
405
+
("type", `String "account");
406
+
("seq", `Intlit (Int64.to_string e.seq));
407
+
("did", `String e.did);
408
+
("time", `String e.time);
409
+
("active", `Bool e.active);
410
+
]
411
+
in
412
+
let with_status =
413
+
match e.status with
414
+
| Some s -> base @ [ ("status", `String s) ]
415
+
| None -> base
416
+
in
417
+
`Assoc with_status
418
+
419
+
(** Convert a handle event to JSON *)
420
+
let handle_event_to_json (e : handle_event) : Yojson.Safe.t =
421
+
`Assoc
422
+
[
423
+
("type", `String "handle");
424
+
("seq", `Intlit (Int64.to_string e.seq));
425
+
("did", `String e.did);
426
+
("time", `String e.time);
427
+
("handle", `String e.handle);
428
+
]
429
+
430
+
(** Convert a tombstone event to JSON *)
431
+
let tombstone_event_to_json (e : tombstone_event) : Yojson.Safe.t =
432
+
`Assoc
433
+
[
434
+
("type", `String "tombstone");
435
+
("seq", `Intlit (Int64.to_string e.seq));
436
+
("did", `String e.did);
437
+
("time", `String e.time);
438
+
]
439
+
440
+
(** Convert an info message to JSON *)
441
+
let info_message_to_json (m : info_message) : Yojson.Safe.t =
442
+
let base = [ ("type", `String "info"); ("name", `String m.name) ] in
443
+
let with_msg =
444
+
match m.message with
445
+
| Some msg -> base @ [ ("message", `String msg) ]
446
+
| None -> base
447
+
in
448
+
`Assoc with_msg
449
+
450
+
(** Convert any event to JSON. For commit events, operations don't include
451
+
record content - extract blocks separately and use [add_record_to_op_json].
452
+
*)
453
+
let event_to_json = function
454
+
| Commit evt -> commit_event_to_json evt
455
+
| Identity e -> identity_event_to_json e
456
+
| Account e -> account_event_to_json e
457
+
| Handle e -> handle_event_to_json e
458
+
| Tombstone e -> tombstone_event_to_json e
459
+
| Info m -> info_message_to_json m
460
+
| StreamError msg ->
461
+
`Assoc [ ("type", `String "error"); ("message", `String msg) ]
462
+
463
+
(** Convert an event to a JSON string *)
464
+
let event_to_json_string evt = Yojson.Safe.to_string (event_to_json evt)