+16
examples/firehose_demo/dune
+16
examples/firehose_demo/dune
···
14
14
mirage-crypto-rng.unix
15
15
base64
16
16
cstruct))
17
+
18
+
; Lwt-based firehose demo. Libraries whose modules are referenced directly by
; firehose_demo_lwt.ml (Yojson, Ipaddr_unix, Websocket, Unix) are listed
; explicitly instead of being picked up only as transitive dependencies.
(executable
 (name firehose_demo_lwt)
 (public_name firehose_demo_lwt)
 (package atproto)
 (libraries
  atproto-sync
  atproto-ipld
  climate
  uri
  yojson
  unix
  lwt
  lwt.unix
  ipaddr.unix
  websocket
  websocket-lwt-unix
  conduit-lwt-unix
  tls-lwt
  mirage-crypto-rng.unix))
+380
examples/firehose_demo/firehose_demo_lwt.ml
+380
examples/firehose_demo/firehose_demo_lwt.ml
···
1
+
(** Firehose Demo (Lwt version) - AT Protocol real-time events. Uses
2
+
websocket-lwt-unix for WebSocket handling. Use --help for options. *)
3
+
4
+
open Lwt.Infix
5
+
module Firehose = Atproto_sync.Firehose
6
+
module Car = Atproto_ipld.Car
7
+
module Dag_cbor = Atproto_ipld.Dag_cbor
8
+
module Dag_json = Atproto_ipld.Dag_json
9
+
module Cid = Atproto_ipld.Cid
10
+
11
+
(** [truncate n s] returns [s] unchanged when it is at most [n] bytes long.
    Otherwise it returns a prefix of [s] followed by ["..."], at most [n] bytes
    in total.

    - When [n < 3] there is no room for the ellipsis, so the result is just the
      first [n] bytes ([""] for [n <= 0]) instead of raising
      [Invalid_argument].
    - The cut point is moved back over UTF-8 continuation bytes so that a
      multi-byte character is never split in half. *)
let truncate n s =
  let len = String.length s in
  if len <= n then s
  else if n < 3 then String.sub s 0 (max 0 n)
  else
    let is_continuation i = Char.code s.[i] land 0xC0 = 0x80 in
    (* [i] is the index of the first dropped byte; it is always < [len]. *)
    let rec boundary i =
      if i > 0 && is_continuation i then boundary (i - 1) else i
    in
    String.sub s 0 (boundary (n - 3)) ^ "..."
13
+
14
+
(** [single_line s] is [s] with every line feed and carriage return replaced
    by a single space, so the text can be printed on one line. *)
let single_line s =
  let flatten = function '\n' | '\r' -> ' ' | ch -> ch in
  String.map flatten s
16
+
17
+
(** [opt_or d o] is [v] when [o] is [Some v], and the fallback [d] otherwise. *)
let opt_or d o = match o with Some v -> v | None -> d
18
+
19
+
(** [extract_blocks blocks_data] reads [blocks_data] with [Car.read] and
    returns an association list from each block's CID (as a string) to its
    decoded DAG-CBOR value. Empty input and CAR read errors give the empty
    list; blocks whose payload fails to decode are dropped. *)
let extract_blocks blocks_data =
  let decode_block (blk : Car.block) =
    match Dag_cbor.decode blk.data with
    | Ok value -> Some (Cid.to_string blk.cid, value)
    | _ -> None
  in
  match blocks_data with
  | "" -> []
  | raw -> (
      match Car.read raw with
      | Ok (_, blks) -> List.filter_map decode_block blks
      | Error _ -> [])
31
+
32
+
(** [find_block blocks op] looks up the decoded record for [op] in [blocks]
    (as produced by [extract_blocks]), keyed by the string form of [op.cid].
    Returns [None] when the operation carries no CID or no block matches. *)
let find_block blocks (op : Firehose.operation) =
  Option.bind op.cid (fun cid -> List.assoc_opt (Cid.to_string cid) blocks)
36
+
37
+
(** [get_text cbor] returns the ["text"] entry of a DAG-CBOR map as looked up
    by [Firehose.get_string], or [None] when [cbor] is not a map. *)
let get_text = function
  | Dag_cbor.Map fields -> Firehose.get_string "text" fields
  | _ -> None
41
+
42
+
(** [format_op blocks op] renders one repo operation as an action symbol
    ([+] create, [~] update, [-] delete) followed by the record path. When the
    record is present in [blocks] and has a text field, the text is appended in
    double quotes, flattened to one line and truncated to 70 bytes. *)
let format_op blocks (op : Firehose.operation) =
  let symbol =
    match op.action with `Create -> "+" | `Update -> "~" | `Delete -> "-"
  in
  let quoted_text =
    match Option.bind (find_block blocks op) get_text with
    | Some text -> " \"" ^ truncate 70 (single_line text) ^ "\""
    | None -> ""
  in
  symbol ^ op.path ^ quoted_text
55
+
56
+
(** [format_event ?rich event] renders [event] as a human-readable line
    starting with a [#kind] tag. For commits the operations follow the header;
    with [~rich:true] record blocks are decoded so that operation text can be
    shown, and the header and operations are separated by newlines instead of
    spaces. [rich] has no effect on the other event kinds. *)
let format_event ?(rich = false) event =
  match event with
  | Firehose.Commit evt ->
      let sep = if rich then "\n " else " " in
      let blocks = if rich then extract_blocks evt.blocks else [] in
      let ops = evt.ops |> List.map (format_op blocks) |> String.concat sep in
      Printf.sprintf "#commit seq=%Ld repo=%s%s%s" evt.seq evt.repo sep ops
  | Firehose.Identity e ->
      let handle = opt_or "-" e.handle in
      Printf.sprintf "#identity seq=%Ld did=%s handle=%s" e.seq e.did handle
  | Firehose.Account e ->
      Printf.sprintf "#account seq=%Ld did=%s active=%b" e.seq e.did e.active
  | Firehose.Handle e ->
      Printf.sprintf "#handle seq=%Ld did=%s handle=%s" e.seq e.did e.handle
  | Firehose.Tombstone e ->
      Printf.sprintf "#tombstone seq=%Ld did=%s" e.seq e.did
  | Firehose.Info m -> Printf.sprintf "#info name=%s" m.name
  | Firehose.StreamError err -> Printf.sprintf "#error %s" err
78
+
79
+
(** [json_of_op_with_record blocks op] is the JSON form of [op]. When the
    record for [op] is found in [blocks], it is converted to JSON via its
    DAG-JSON encoding and attached with [Firehose.add_record_to_op_json]. *)
let json_of_op_with_record blocks (op : Firehose.operation) =
  let op_json = Firehose.operation_to_json op in
  match find_block blocks op with
  | None -> op_json
  | Some cbor ->
      let record = cbor |> Dag_json.encode_string |> Yojson.Safe.from_string in
      Firehose.add_record_to_op_json op_json record
86
+
87
+
(** [json_of_event ?rich event] serialises [event] to a JSON string. With
    [~rich:true], the ["ops"] field of a commit is replaced by operations that
    embed their decoded records; every other case uses
    [Firehose.event_to_json] unchanged. *)
let json_of_event ?(rich = false) event =
  let json =
    match (rich, event) with
    | true, Firehose.Commit evt -> (
        let blocks = extract_blocks evt.blocks in
        let rich_ops =
          `List (List.map (json_of_op_with_record blocks) evt.ops)
        in
        match Firehose.commit_event_to_json evt with
        | `Assoc fields ->
            (* Swap in the enriched operations, keep all other fields. *)
            let swap_ops ((key, _) as field) =
              if key = "ops" then (key, rich_ops) else field
            in
            `Assoc (List.map swap_ops fields)
        | other -> other)
    | _ -> Firehose.event_to_json event
  in
  Yojson.Safe.to_string json
104
+
105
+
(** Event categories that can be selected on the command line. The first
    eight select commits by the collection of their operations; the rest
    select whole event kinds. *)
type filter =
  | Posts  (** operations on [app.bsky.feed.post] *)
  | Likes  (** operations on [app.bsky.feed.like] *)
  | Follows  (** operations on [app.bsky.graph.follow] *)
  | Reposts  (** operations on [app.bsky.feed.repost] *)
  | Blocks  (** operations on [app.bsky.graph.block] *)
  | Lists  (** operations on [app.bsky.graph.list] *)
  | Profiles  (** operations on [app.bsky.actor.profile] *)
  | Feeds  (** operations on [app.bsky.feed.generator] *)
  | Commits  (** every commit event *)
  | Identities  (** identity events *)
  | Accounts  (** account events *)
  | Handles  (** handle events *)
  | Tombstones  (** tombstone events *)
119
+
120
+
(** Table pairing each command-line filter name with its [filter] value. It
    backs both [filter_of_string] and [filter_to_string]. *)
let filters =
  [
    (* commit filters keyed by record collection *)
    ("posts", Posts); ("likes", Likes); ("follows", Follows);
    ("reposts", Reposts); ("blocks", Blocks); ("lists", Lists);
    ("profiles", Profiles); ("feeds", Feeds);
    (* whole event kinds *)
    ("commits", Commits); ("identities", Identities); ("accounts", Accounts);
    ("handles", Handles); ("tombstones", Tombstones);
  ]
136
+
137
+
(** [filter_of_string s] is the filter named [s] in [filters], if any. *)
let filter_of_string s =
  List.find_map
    (fun (name, f) -> if String.equal name s then Some f else None)
    filters
138
+
(** [filter_to_string f] is the command-line name of [f].
    @raise Not_found if [f] has no entry in [filters]. *)
let filter_to_string f =
  let name, _ = List.find (fun (_, candidate) -> candidate = f) filters in
  name
139
+
140
+
(** [prefix_match prefix path] is [true] iff [path] begins with [prefix]. The
    empty prefix matches every path. *)
let prefix_match prefix path = String.starts_with ~prefix path
143
+
144
+
(** [op_matches filter op] tells whether the repo operation [op] is selected
    by [filter].

    Collection filters compare against the collection part of [op.path]: the
    path must be the collection NSID itself or start with the NSID followed by
    ['/']. A bare prefix test is not enough, because some collection names
    extend others (e.g. [app.bsky.feed.postgate] vs [app.bsky.feed.post],
    [app.bsky.graph.listitem] vs [app.bsky.graph.list]).

    [Commits] accepts every operation. The event-kind filters never match an
    individual operation. *)
let op_matches filter (op : Firehose.operation) =
  let in_collection nsid =
    String.equal op.path nsid || prefix_match (nsid ^ "/") op.path
  in
  match filter with
  | Posts -> in_collection "app.bsky.feed.post"
  | Likes -> in_collection "app.bsky.feed.like"
  | Follows -> in_collection "app.bsky.graph.follow"
  | Reposts -> in_collection "app.bsky.feed.repost"
  | Blocks -> in_collection "app.bsky.graph.block"
  | Lists -> in_collection "app.bsky.graph.list"
  | Profiles -> in_collection "app.bsky.actor.profile"
  | Feeds -> in_collection "app.bsky.feed.generator"
  | Commits -> true
  | Identities | Accounts | Handles | Tombstones -> false
156
+
157
+
(** [event_matches filters event] decides whether [event] should be shown.
    An empty filter list accepts everything. A commit is accepted when
    [Commits] is requested or when any requested filter matches one of its
    operations. Identity, account, handle and tombstone events need their own
    filter. Info and stream-error events are always accepted. *)
let event_matches filters event =
  let wants f = List.mem f filters in
  match (filters, event) with
  | [], _ -> true
  | _, Firehose.Commit evt ->
      let selects f = f = Commits || List.exists (op_matches f) evt.ops in
      List.exists selects filters
  | _, Firehose.Identity _ -> wants Identities
  | _, Firehose.Account _ -> wants Accounts
  | _, Firehose.Handle _ -> wants Handles
  | _, Firehose.Tombstone _ -> wants Tombstones
  | _, (Firehose.Info _ | Firehose.StreamError _) -> true
171
+
172
+
(** WebSocket client using websocket-lwt-unix *)
173
+
module Ws = struct
  type conn = Websocket_lwt_unix.conn

  (** [connect uri] resolves the host of [uri] and opens a WebSocket to the
      first address returned. Missing parts default to host ["localhost"],
      scheme ["wss"], and port 443 for [wss] or 80 otherwise. A [ws] URI uses
      plain TCP; every other scheme uses TLS.

      [uri] is handed to the WebSocket client unchanged. Rebuilding it with
      [Uri.with_path uri (Uri.path_and_query uri)] would move the query string
      into the path, where its ['?'] gets percent-encoded, breaking requests
      that carry a query such as a cursor.

      DNS failures and any exception raised while connecting are returned as
      [Error msg]; the promise is never rejected. *)
  let connect uri : (conn, string) result Lwt.t =
    let host = Uri.host uri |> Option.value ~default:"localhost" in
    let scheme = Uri.scheme uri |> Option.value ~default:"wss" in
    let port =
      Uri.port uri |> Option.value ~default:(if scheme = "wss" then 443 else 80)
    in
    Lwt.catch
      (fun () ->
        (* Resolve hostname to IP *)
        Lwt_unix.getaddrinfo host (string_of_int port)
          [ Unix.AI_SOCKTYPE Unix.SOCK_STREAM ]
        >>= function
        | [] -> Lwt.return (Error ("DNS failed: " ^ host))
        | ai :: _ ->
            let ip =
              match ai.Unix.ai_addr with
              | Unix.ADDR_INET (addr, _) -> Ipaddr_unix.of_inet_addr addr
              | Unix.ADDR_UNIX _ -> failwith "Unexpected address type"
            in
            let endp =
              if scheme = "ws" then `TCP (`IP ip, `Port port)
              else `TLS_native (`Hostname host, `IP ip, `Port port)
            in
            Websocket_lwt_unix.connect endp uri >|= fun conn -> Ok conn)
      (fun exn -> Lwt.return (Error (Printexc.to_string exn)))

  (** [recv conn] waits for the next binary frame on [conn] and returns its
      payload. Pings are answered with a pong carrying the same payload; text,
      pong and any other frames are skipped. A close frame yields
      [Error "Connection closed"]; exceptions from the transport are returned
      as [Error msg]. *)
  let recv conn : (string, string) result Lwt.t =
    let open Websocket in
    let rec read_binary () =
      Lwt.catch
        (fun () ->
          Websocket_lwt_unix.read conn >>= fun frame ->
          match frame.Frame.opcode with
          | Frame.Opcode.Binary -> Lwt.return (Ok frame.Frame.content)
          | Frame.Opcode.Text -> read_binary () (* Skip text frames *)
          | Frame.Opcode.Close -> Lwt.return (Error "Connection closed")
          | Frame.Opcode.Ping ->
              (* Respond to ping with pong *)
              let pong =
                Frame.create ~opcode:Frame.Opcode.Pong
                  ~content:frame.Frame.content ()
              in
              Websocket_lwt_unix.write conn pong >>= fun () -> read_binary ()
          | Frame.Opcode.Pong -> read_binary ()
          | _ -> read_binary ())
        (fun exn -> Lwt.return (Error (Printexc.to_string exn)))
    in
    read_binary ()

  (** [close conn] sends a close frame with status 1000 and then closes the
      transport. It is best effort: any exception raised on the way is
      ignored, so the returned promise always resolves. *)
  let close conn =
    Lwt.catch
      (fun () ->
        let close_frame = Websocket.Frame.close 1000 in
        Websocket_lwt_unix.write conn close_frame >>= fun () ->
        Websocket_lwt_unix.close_transport conn)
      (fun _ -> Lwt.return_unit)
end
232
+
233
+
(** [with_websocket_lwt f] runs [f ()] under an effect handler that serves the
    [Firehose.Ws_connect], [Firehose.Ws_recv] and [Firehose.Ws_close] effects
    by running the corresponding [Ws] operation to completion with
    [Lwt_main.run]. All other effects are left unhandled.

    The handler tracks a single live connection in a local reference; the
    handle carried by [Ws_recv] and [Ws_close] is ignored in favour of it.
    Whatever connection is still open when [f] returns or raises is closed, so
    an early exit does not leak the socket.

    NOTE(review): [Obj.magic] converts [Ws.conn] into the handle type expected
    by the [Ws_connect] continuation; this relies on Firehose treating that
    handle as opaque - confirm against the Firehose interface. *)
let with_websocket_lwt f =
  let open Effect.Deep in
  (* We need to track Lwt state across effect invocations *)
  let ws_conn : Ws.conn option ref = ref None in
  (* Close the tracked connection, if any. Clearing the reference first makes
     a second call a no-op. *)
  let release () =
    match !ws_conn with
    | None -> ()
    | Some conn ->
        ws_conn := None;
        Lwt_main.run (Ws.close conn)
  in
  Fun.protect ~finally:release (fun () ->
      try_with f ()
        {
          effc =
            (fun (type a) (eff : a Effect.t) ->
              match eff with
              | Firehose.Ws_connect uri ->
                  Some
                    (fun (k : (a, _) continuation) ->
                      let result = Lwt_main.run (Ws.connect uri) in
                      (match result with
                      | Ok conn -> ws_conn := Some conn
                      | Error _ -> ());
                      continue k (Result.map Obj.magic result))
              | Firehose.Ws_recv _ws ->
                  Some
                    (fun k ->
                      match !ws_conn with
                      | None -> continue k (Error "Not connected")
                      | Some conn -> continue k (Lwt_main.run (Ws.recv conn)))
              | Firehose.Ws_close _ws ->
                  Some
                    (fun k ->
                      release ();
                      continue k ())
              | _ -> None);
        })
270
+
271
+
(** Command-line configuration of the demo. *)
type config = {
  cursor : int64 option;  (** sequence number to resume from ([--cursor]) *)
  limit : int option;  (** stop after this many matched events ([--limit]) *)
  filters : filter list;  (** requested filters; empty means show everything *)
  json : bool;  (** emit JSON instead of the text format ([--json]) *)
  rich : bool;  (** include record content ([--content]) *)
}
278
+
279
+
(** [parse_filters s] parses a comma-separated list of filter names, ignoring
    surrounding whitespace and empty entries, and returns the recognised
    filters in order.

    Unknown names are still skipped, but a warning listing them and the valid
    names is written to stderr. Without it a typo silently produces an empty
    filter list, which matches every event. *)
let parse_filters s =
  let names =
    String.split_on_char ',' s
    |> List.map String.trim
    |> List.filter (fun name -> name <> "")
  in
  let known, unknown =
    List.partition_map
      (fun name ->
        match filter_of_string name with
        | Some f -> Either.Left f
        | None -> Either.Right name)
      names
  in
  if unknown <> [] then
    Printf.eprintf "Warning: ignoring unknown filter(s): %s (known: %s)\n%!"
      (String.concat ", " unknown)
      (String.concat ", " (List.map fst filters));
  known
282
+
283
+
(** Climate argument converter for 64-bit integers. Parsing accepts whatever
    [Int64.of_string_opt] accepts and reports ["invalid integer"] otherwise;
    values are printed in decimal. *)
let int64_conv =
  let parse text =
    Int64.of_string_opt text |> Option.to_result ~none:(`Msg "invalid integer")
  in
  let print ppf value = Format.fprintf ppf "%Ld" value in
  Climate.Arg_parser.make_conv ~parse ~print ()
292
+
293
+
(** Command-line parser producing a [config] from [--cursor], [--limit],
    [--filter], [--json] and [--content]. A missing [--filter] gives an empty
    filter list. *)
let config_parser =
  let open Climate.Arg_parser in
  let+ cursor = named_opt [ "cursor" ] int64_conv ~doc:"Resume from sequence"
  and+ limit = named_opt [ "limit" ] int ~doc:"Stop after N events"
  and+ filter_arg =
    named_opt [ "filter" ] string
      ~doc:"Filter (posts,likes,follows,reposts,etc.)"
  and+ json = flag [ "json" ] ~doc:"JSON output"
  and+ rich = flag [ "content" ] ~doc:"Show record content" in
  let filters =
    match filter_arg with Some spec -> parse_filters spec | None -> []
  in
  { cursor; limit; filters; json; rich }
309
+
310
+
(** The demo's single command, built from [config_parser]. *)
let cli =
  let doc = "AT Protocol firehose demo (Lwt)" in
  Climate.Command.singleton ~doc config_parser
312
+
313
+
(** Running counters reported by [print_stats]. *)
type stats = {
  mutable total : int;  (** events received *)
  mutable matched : int;  (** events that passed the filters *)
  mutable last_seq : int64;  (** sequence number of the latest event that had one *)
  mutable start : float;  (** start time, as returned by [Unix.gettimeofday] *)
}
319
+
320
+
(** [print_stats s] prints a one-line summary of [s] to stdout: event counts,
    last sequence number, elapsed seconds since [s.start] and the average
    event rate (0 when no time has elapsed). *)
let print_stats { total; matched; last_seq; start } =
  let elapsed = Unix.gettimeofday () -. start in
  let rate = if elapsed > 0. then float_of_int total /. elapsed else 0. in
  Printf.printf
    "\n--- Stats: %d events (%d matched) | seq=%Ld | %.1fs | %.1f evt/s ---\n%!"
    total matched last_seq elapsed rate
326
+
327
+
(** Process-wide counters, updated by the event handler in [run]. [start] is
    set by the entry point before streaming begins. *)
let stats = { start = 0.; last_seq = 0L; matched = 0; total = 0 }

(** Set to [true] by the SIGINT handler; the event handler polls it to stop
    the subscription. *)
let interrupted = ref false
329
+
330
+
(** [run config] subscribes to the [bsky.network] firehose and prints every
    event accepted by [config.filters], as JSON or text depending on
    [config.json].

    The handler updates the global [stats] for each event and stops the
    subscription once [config.limit] matched events have been printed or
    [interrupted] has been set. A subscription error is reported on stderr.

    In text mode, "Connected!" is printed when the first event arrives rather
    than before the connection attempt, so it is no longer shown when the
    connection fails. *)
let run config =
  let uri =
    Uri.of_string "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos"
  in
  let cfg = Firehose.config ~uri ?cursor:config.cursor () in
  if not config.json then
    Printf.printf "Connecting to %s...\n%!"
      (Uri.to_string (Firehose.build_uri cfg));
  let fmt =
    if config.json then json_of_event ~rich:config.rich
    else format_event ~rich:config.rich
  in
  (* Whether the "Connected!" banner has been handled for this run. *)
  let announced = ref false in
  let handler event =
    if not !announced then begin
      announced := true;
      if not config.json then Printf.printf "Connected!\n\n%!"
    end;
    stats.total <- stats.total + 1;
    (match Firehose.event_seq event with
    | Some s -> stats.last_seq <- s
    | None -> ());
    if not (event_matches config.filters event) then not !interrupted
    else begin
      print_endline (fmt event);
      stats.matched <- stats.matched + 1;
      let limit_reached =
        match config.limit with
        | Some cap -> stats.matched >= cap
        | None -> false
      in
      (* Returning [false] asks the subscription to stop. *)
      not (limit_reached || !interrupted)
    end
  in
  with_websocket_lwt (fun () ->
      match Firehose.subscribe cfg ~handler with
      | Ok () -> ()
      | Error e -> Printf.eprintf "Error: %s\n%!" (Firehose.error_to_string e))
361
+
362
+
(* Entry point: parse the command line, print the banner (text mode only),
   install the SIGINT handler, start the clock, set up the RNG, run the
   subscription, and finally print statistics (text mode only). Exceptions
   escaping [run] are reported on stderr. *)
let () =
  let config =
    Climate.Command.run
      ~program_name:(Climate.Program_name.Literal "firehose_demo_lwt") cli
  in
  let text_mode = not config.json in
  if text_mode then begin
    Printf.printf
      "AT Protocol Firehose Demo (Lwt)\n===============================\n\n%!";
    match config.filters with
    | [] -> ()
    | selected ->
        Printf.printf "Filtering: %s\n\n%!"
          (String.concat ", " (List.map filter_to_string selected))
  end;
  Sys.set_signal Sys.sigint (Sys.Signal_handle (fun _ -> interrupted := true));
  stats.start <- Unix.gettimeofday ();
  Mirage_crypto_rng_unix.use_default ();
  (match run config with
  | () -> ()
  | exception Failure msg -> Printf.eprintf "Error: %s\n%!" msg
  | exception exn -> Printf.eprintf "Error: %s\n%!" (Printexc.to_string exn));
  if text_mode then print_stats stats