swim protocol in ocaml interoperable with membership lib and serf cli
at main 2.6 kB view raw
1open Types 2 3type item = { 4 msg : protocol_msg; 5 transmits : int Kcas.Loc.t; 6 created : Mtime.span; 7} 8 9type t = { queue : item Kcas_data.Queue.t; depth : int Kcas.Loc.t } 10 11let create () = { queue = Kcas_data.Queue.create (); depth = Kcas.Loc.make 0 } 12 13let enqueue t msg ~transmits ~created ~limit = 14 let item = { msg; transmits = Kcas.Loc.make transmits; created } in 15 Kcas.Xt.commit 16 { 17 tx = 18 (fun ~xt -> 19 let d = Kcas.Xt.get ~xt t.depth in 20 if d >= limit then ignore (Kcas_data.Queue.Xt.take_opt ~xt t.queue) 21 else Kcas.Xt.set ~xt t.depth (d + 1); 22 Kcas_data.Queue.Xt.add ~xt item t.queue); 23 } 24 25let depth t = Kcas.Xt.commit { tx = (fun ~xt -> Kcas.Xt.get ~xt t.depth) } 26 27let drain t ~max_bytes ~encode_size = 28 let rec loop acc bytes_used = 29 let result = 30 Kcas.Xt.commit 31 { 32 tx = 33 (fun ~xt -> 34 match Kcas_data.Queue.Xt.take_opt ~xt t.queue with 35 | None -> `Done (List.rev acc) 36 | Some item -> 37 let msg_size = encode_size item.msg in 38 if bytes_used + msg_size > max_bytes && acc <> [] then begin 39 Kcas_data.Queue.Xt.add ~xt item t.queue; 40 `Done (List.rev acc) 41 end 42 else begin 43 let remaining = Kcas.Xt.get ~xt item.transmits - 1 in 44 if remaining > 0 then begin 45 Kcas.Xt.set ~xt item.transmits remaining; 46 Kcas_data.Queue.Xt.add ~xt item t.queue 47 end 48 else Kcas.Xt.modify ~xt t.depth pred; 49 `Continue (item.msg, msg_size) 50 end); 51 } 52 in 53 match result with 54 | `Done msgs -> msgs 55 | `Continue (msg, msg_size) -> loop (msg :: acc) (bytes_used + msg_size) 56 in 57 loop [] 0 58 59let invalidate t ~invalidates newer_msg = 60 let items = 61 Kcas.Xt.commit 62 { tx = (fun ~xt -> Kcas_data.Queue.Xt.to_seq ~xt t.queue |> List.of_seq) } 63 in 64 let valid_items, removed_count = 65 List.fold_left 66 (fun (valid, removed) item -> 67 if invalidates ~newer:newer_msg ~older:item.msg then (valid, removed + 1) 68 else (item :: valid, removed)) 69 ([], 0) items 70 in 71 if removed_count > 0 then begin 72 Kcas.Xt.commit 73 { 74 tx = 75 (fun ~xt -> 76 Kcas_data.Queue.Xt.clear ~xt t.queue; 77 List.iter 78 (fun item -> Kcas_data.Queue.Xt.add ~xt item t.queue) 79 (List.rev valid_items); 80 Kcas.Xt.modify ~xt t.depth (fun d -> d - removed_count)); 81 } 82 end