+30
-4
examples/firehose_demo/firehose_demo_lwt.ml
+30
-4
examples/firehose_demo/firehose_demo_lwt.ml
···
8
8
module Dag_json = Atproto_ipld.Dag_json
9
9
module Cid = Atproto_ipld.Cid
10
10
11
-
(* Global interrupt flag - set by signal handler *)
11
+
(* Global interrupt handling via Lwt *)
12
+
let interrupt_wakener : (unit, exn) result Lwt.u option ref = ref None
12
13
let interrupted = ref false
14
+
15
+
let trigger_interrupt () =
16
+
interrupted := true;
17
+
match !interrupt_wakener with
18
+
| Some w ->
19
+
Lwt.wakeup_later w (Error Exit);
20
+
interrupt_wakener := None
21
+
| None -> ()
13
22
14
23
let truncate n s =
15
24
if String.length s <= n then s else String.sub s 0 (n - 3) ^ "..."
···
205
214
let frag_buf = Buffer.create 65536
206
215
let frag_opcode = ref Websocket.Frame.Opcode.Continuation
207
216
208
-
let recv conn : (string, string) result Lwt.t =
217
+
let recv_impl conn : (string, string) result Lwt.t =
209
218
let open Websocket in
210
219
let rec read_binary () =
211
220
Lwt.catch
···
261
270
in
262
271
read_binary ()
263
272
273
+
(** Interruptible recv - races against interrupt signal *)
274
+
let recv conn : (string, string) result Lwt.t =
275
+
(* Create a waiter that can be woken by Ctrl+C *)
276
+
let interrupt_waiter, wakener = Lwt.wait () in
277
+
interrupt_wakener := Some wakener;
278
+
(* Race between recv and interrupt *)
279
+
Lwt.pick
280
+
[
281
+
recv_impl conn;
282
+
( interrupt_waiter >>= function
283
+
| Ok () -> Lwt.return (Error "Interrupted")
284
+
| Error _ -> Lwt.return (Error "Interrupted") );
285
+
]
286
+
>>= fun result ->
287
+
(* Clear the wakener if we won the race *)
288
+
interrupt_wakener := None;
289
+
Lwt.return result
290
+
264
291
let close conn =
265
292
Lwt.catch
266
293
(fun () ->
···
366
393
367
394
let stats = { total = 0; matched = 0; last_seq = 0L; start = 0. }
368
395
let json_mode = ref false
369
-
let handle_sigint _ = interrupted := true
370
396
371
397
let run config =
372
398
let uri =
···
419
445
Printf.printf "Press Ctrl+C to stop and show stats.\n\n%!"
420
446
end;
421
447
(* Use Lwt_unix signal handling for proper integration with Lwt *)
422
-
let _ = Lwt_unix.on_signal Sys.sigint (fun _ -> handle_sigint ()) in
448
+
let _ = Lwt_unix.on_signal Sys.sigint (fun _ -> trigger_interrupt ()) in
423
449
stats.start <- Unix.gettimeofday ();
424
450
Mirage_crypto_rng_unix.use_default ();
425
451
(try run config with _ -> ());