···44 let cursor =
55 match Dream.query req "cursor" with
66 | Some s ->
77- max 0 (Option.value (int_of_string_opt s) ~default:0)
77+ Some (max 0 (Option.value (int_of_string_opt s) ~default:0))
88 | None ->
99- 0
99+ None
1010 in
1111 let closed = ref false in
1212 let send (bytes : bytes) =
+104-91
pegasus/lib/sequencer.ml
···621621end
622622623623module Live = struct
624624- let stream_with_backfill ~(conn : Data_store.t) ~(cursor : int)
625625- ~(send : bytes -> unit Lwt.t) : unit Lwt.t =
626626- let%lwt sub = Bus.subscribe () in
627627- let send_consumer_too_slow () =
628628- let err =
629629- { error= "ConsumerTooSlow"
630630- ; message=
631631- Some
632632- "you're not consuming messages fast enough! maybe \
633633- com.atproto.sync.getRepo is more your speed?" }
634634- in
635635- send (Frame.encode_error err)
624624+ let send_consumer_too_slow ~send =
625625+ let err =
626626+ { error= "ConsumerTooSlow"
627627+ ; message=
628628+ Some
629629+ "you're not consuming messages fast enough! maybe \
630630+ com.atproto.sync.getRepo is more your speed?" }
636631 in
632632+ send (Frame.encode_error err)
633633+634634+ let live_loop ~(conn : Data_store.t) ~(sub : Bus.subscriber) ~(send : bytes -> unit Lwt.t)
635635+ ~(start_seq : int) : unit Lwt.t =
636636+ let rec loop last =
637637+ if sub.Bus.closed then
638638+ match sub.Bus.close_reason with
639639+ | Some "ConsumerTooSlow" ->
640640+ send_consumer_too_slow ~send
641641+ | _ ->
642642+ Lwt.return_unit
643643+ else
644644+ Lwt.catch
645645+ (fun () ->
646646+ let%lwt it = Bus.wait_next sub in
647647+ if it.seq <= last then loop last
648648+ else if it.seq > last + 1 then
649649+ let%lwt gap =
650650+ DB.request_seq_range ~earliest_seq:last ~latest_seq:(it.seq - 1)
651651+ ~limit:1000 conn
652652+ in
653653+ let%lwt () =
654654+ Lwt_list.iter_s
655655+ (fun ev ->
656656+ if ev.seq <= last then Lwt.return_unit
657657+ else
658658+ send
659659+ ( match ev.kind with
660660+ | Message (m, _) ->
661661+ Frame.encode_message ~seq:ev.seq ~time:ev.time m
662662+ | Error e ->
663663+ Frame.encode_error e ) )
664664+ gap
665665+ in
666666+ send it.bytes >>= fun () -> loop it.seq
667667+ else send it.bytes >>= fun () -> loop it.seq )
668668+ (fun _exn ->
669669+ (* check if any failure was due to slow consumer *)
670670+ match sub.Bus.close_reason with
671671+ | Some "ConsumerTooSlow" ->
672672+ send_consumer_too_slow ~send
673673+ | _ ->
674674+ Lwt.return_unit )
675675+ in
676676+ loop start_seq
677677+678678+ let stream_live ~(conn : Data_store.t) ~(send : bytes -> unit Lwt.t) : unit Lwt.t =
679679+ let%lwt sub = Bus.subscribe () in
637680 Lwt.finalize
638681 (fun () ->
639639- let%lwt head_db = DB.latest_seq conn in
640640- let cutoff = head_db in
641641- (* try backfill from buffer first *)
642642- let%lwt ring = Bus.ring_after cursor in
643643- let ring_covers =
644644- match ring with
645645- | [] ->
646646- false
647647- | first :: _ -> (
648648- match List.rev ring with
649649- | [] ->
650650- false
651651- | last :: _ ->
652652- (* ring covers if it goes from <= cursor all the way to >= cutoff *)
653653- first.seq <= cursor && last.seq >= cutoff )
654654- in
655655- ( if ring_covers then
656656- Lwt_list.iter_s (fun (it : Bus.item) -> send it.bytes) ring
657657- else
658658- let%lwt events =
659659- DB.request_seq_range ~earliest_seq:cursor ~latest_seq:cutoff
660660- ~limit:1000 conn
682682+ let%lwt start_seq = DB.latest_seq conn in
683683+ live_loop ~conn ~sub ~send ~start_seq )
684684+ (fun () -> Bus.unsubscribe sub)
685685+686686+ let stream_with_backfill ~(conn : Data_store.t) ~(cursor : int option)
687687+ ~(send : bytes -> unit Lwt.t) : unit Lwt.t =
688688+ match cursor with
689689+ | None ->
690690+ stream_live ~conn ~send
691691+ | Some cursor ->
692692+ let%lwt sub = Bus.subscribe () in
693693+ Lwt.finalize
694694+ (fun () ->
695695+ let%lwt head_db = DB.latest_seq conn in
696696+ let cutoff = head_db in
697697+ (* try backfill from buffer first *)
698698+ let%lwt ring = Bus.ring_after cursor in
699699+ let ring_covers =
700700+ match ring with
701701+ | [] ->
702702+ false
703703+ | first :: _ -> (
704704+ match List.rev ring with
705705+ | [] ->
706706+ false
707707+ | last :: _ ->
708708+ (* ring covers if it goes from <= cursor all the way to >= cutoff *)
709709+ first.seq <= cursor && last.seq >= cutoff )
661710 in
662662- Lwt_list.iter_s
663663- (fun ev ->
664664- match ev.kind with
665665- | Error _ ->
666666- Lwt.return_unit
667667- | Message (payload, _) ->
668668- send
669669- (Frame.encode_message ~seq:ev.seq ~time:ev.time payload) )
670670- events )
671671- >>= fun () ->
672672- (* bail if consumer too slow *)
673673- if sub.Bus.closed then
674674- match sub.Bus.close_reason with
675675- | Some "ConsumerTooSlow" ->
676676- send_consumer_too_slow ()
677677- | _ ->
678678- Lwt.return_unit
679679- else
680680- (* live tail *)
681681- let rec loop last =
711711+ ( if ring_covers then
712712+ Lwt_list.iter_s (fun (it : Bus.item) -> send it.bytes) ring
713713+ else
714714+ let%lwt events =
715715+ DB.request_seq_range ~earliest_seq:cursor ~latest_seq:cutoff
716716+ ~limit:1000 conn
717717+ in
718718+ Lwt_list.iter_s
719719+ (fun ev ->
720720+ match ev.kind with
721721+ | Error _ ->
722722+ Lwt.return_unit
723723+ | Message (payload, _) ->
724724+ send
725725+ (Frame.encode_message ~seq:ev.seq ~time:ev.time payload) )
726726+ events )
727727+ >>= fun () ->
728728+ (* bail if consumer too slow *)
682729 if sub.Bus.closed then
683730 match sub.Bus.close_reason with
684731 | Some "ConsumerTooSlow" ->
685685- send_consumer_too_slow ()
732732+ send_consumer_too_slow ~send
686733 | _ ->
687734 Lwt.return_unit
688688- else
689689- Lwt.catch
690690- (fun () ->
691691- let%lwt it = Bus.wait_next sub in
692692- if it.seq <= last then loop last
693693- else if it.seq > last + 1 then
694694- let%lwt gap =
695695- DB.request_seq_range ~earliest_seq:last ~latest_seq:it.seq
696696- ~limit:1000 conn
697697- in
698698- let%lwt () =
699699- Lwt_list.iter_s
700700- (fun ev ->
701701- if ev.seq <= last then Lwt.return_unit
702702- else
703703- send
704704- ( match ev.kind with
705705- | Message (m, _) ->
706706- Frame.encode_message ~seq:ev.seq ~time:ev.time
707707- m
708708- | Error e ->
709709- Frame.encode_error e ) )
710710- gap
711711- in
712712- send it.bytes >>= fun () -> loop it.seq
713713- else send it.bytes >>= fun () -> loop it.seq )
714714- (fun _exn ->
715715- (* check if any failure was due to slow consumer *)
716716- match sub.Bus.close_reason with
717717- | Some "ConsumerTooSlow" ->
718718- send_consumer_too_slow ()
719719- | _ ->
720720- Lwt.return_unit )
721721- in
722722- loop cutoff )
723723- (fun () -> Bus.unsubscribe sub)
735735+ else live_loop ~conn ~sub ~send ~start_seq:cutoff )
736736+ (fun () -> Bus.unsubscribe sub)
724737end
725738726739let sequence_commit (conn : Data_store.t) ~(did : string) ~(commit : Cid.t)
+2-2
pegasus/test/test_sequencer.ml
···107107 in
108108 let stream =
109109 Lwt.catch
110110- (fun () -> Sequencer.Live.stream_with_backfill ~conn ~cursor:0 ~send)
110110+ (fun () -> Sequencer.Live.stream_with_backfill ~conn ~cursor:(Some 0) ~send)
111111 (fun _ -> Lwt.return_unit)
112112 in
113113 let _ = Lwt.async (fun () -> stream) in
···162162 Lwt.async (fun () ->
163163 Lwt.catch
164164 (fun () ->
165165- Sequencer.Live.stream_with_backfill ~conn ~cursor:0 ~send )
165165+ Sequencer.Live.stream_with_backfill ~conn ~cursor:(Some 0) ~send )
166166 (fun _ -> Lwt.return_unit) )
167167 in
168168 (* after stream starts, directly insert identity row with seq=3, then publish identity for seq=4 via bus *)