+20
-12
examples/firehose_demo/firehose_demo_lwt.ml
+20
-12
examples/firehose_demo/firehose_demo_lwt.ml
···
8
8
module Dag_json = Atproto_ipld.Dag_json
9
9
module Cid = Atproto_ipld.Cid
10
10
11
+
(* Global interrupt flag - set by signal handler *)
12
+
let interrupted = ref false
13
+
11
14
let truncate n s =
12
15
if String.length s <= n then s else String.sub s 0 (n - 3) ^ "..."
13
16
···
251
254
| Firehose.Ws_recv _ws ->
252
255
Some
253
256
(fun k ->
254
-
match !ws_conn with
255
-
| None -> continue k (Error "Not connected")
256
-
| Some conn ->
257
-
let result = Lwt_main.run (Ws.recv conn) in
258
-
continue k result)
257
+
if !interrupted then continue k (Error "Interrupted")
258
+
else
259
+
match !ws_conn with
260
+
| None -> continue k (Error "Not connected")
261
+
| Some conn ->
262
+
let result = Lwt_main.run (Ws.recv conn) in
263
+
if !interrupted then continue k (Error "Interrupted")
264
+
else continue k result)
259
265
| Firehose.Ws_close _ws ->
260
266
Some
261
267
(fun k ->
···
325
331
s.total s.matched s.last_seq elapsed rate
326
332
327
333
let stats = { total = 0; matched = 0; last_seq = 0L; start = 0. }
328
-
let interrupted = ref false
334
+
let json_mode = ref false
335
+
let handle_sigint _ = interrupted := true
329
336
330
337
let run config =
331
338
let uri =
···
357
364
if not config.json then Printf.printf "Connected!\n\n%!";
358
365
match Firehose.subscribe cfg ~handler with
359
366
| Ok () -> ()
360
-
| Error e -> Printf.eprintf "Error: %s\n%!" (Firehose.error_to_string e))
367
+
| Error _ -> () (* Errors on exit are expected - just exit cleanly *))
361
368
362
369
let () =
363
370
let config =
364
371
Climate.Command.run
365
372
~program_name:(Climate.Program_name.Literal "firehose_demo_lwt") cli
366
373
in
374
+
json_mode := config.json;
367
375
if not config.json then begin
368
376
Printf.printf
369
377
"AT Protocol Firehose Demo (Lwt)\n===============================\n\n%!";
370
378
if config.filters <> [] then
371
379
Printf.printf "Filtering: %s\n\n%!"
372
-
(String.concat ", " (List.map filter_to_string config.filters))
380
+
(String.concat ", " (List.map filter_to_string config.filters));
381
+
Printf.printf "Press Ctrl+C to stop and show stats.\n\n%!"
373
382
end;
374
-
Sys.set_signal Sys.sigint (Sys.Signal_handle (fun _ -> interrupted := true));
383
+
(* Use Lwt_unix signal handling for proper integration with Lwt *)
384
+
let _ = Lwt_unix.on_signal Sys.sigint (fun _ -> handle_sigint ()) in
375
385
stats.start <- Unix.gettimeofday ();
376
386
Mirage_crypto_rng_unix.use_default ();
377
-
(try run config with
378
-
| Failure m -> Printf.eprintf "Error: %s\n%!" m
379
-
| e -> Printf.eprintf "Error: %s\n%!" (Printexc.to_string e));
387
+
(try run config with _ -> ());
380
388
if not config.json then print_stats stats