(** A pending request: the two halves of one [Eio.Promise.create ()] pair.
    [promise] is what a fiber awaits; [resolver] is what {!complete} fulfils. *)
type waiter = {
  promise : string option Eio.Promise.t;
  resolver : string option Eio.Promise.u;
}

(** Table of pending waiters, keyed by integer sequence number. *)
type t = { table : (int, waiter) Kcas_data.Hashtbl.t }

(** [create ()] returns a registry backed by a fresh, empty table. *)
let create () =
  let table = Kcas_data.Hashtbl.create () in
  { table }

(** [register t ~seq] creates a new promise/resolver pair, stores it in the
    table under [seq] with [Hashtbl.Xt.replace], and returns it.

    Any waiter previously stored under [seq] is not resolved by this
    function. *)
let register t ~seq =
  let promise, resolver = Eio.Promise.create () in
  let fresh = { promise; resolver } in
  let store ~xt = Kcas_data.Hashtbl.Xt.replace ~xt t.table seq fresh in
  Kcas.Xt.commit { tx = store };
  fresh

(** [complete t ~seq ~payload] looks up and removes the waiter bound to [seq]
    within a single transaction, then resolves its promise with [payload].

    @return [true] if a waiter was found (and resolved), [false] if no waiter
    was bound to [seq]. *)
let complete t ~seq ~payload =
  (* Lookup and removal share one transaction; the looked-up entry (if any)
     is the transaction's result. *)
  let take ~xt =
    let entry = Kcas_data.Hashtbl.Xt.find_opt ~xt t.table seq in
    if Option.is_some entry then Kcas_data.Hashtbl.Xt.remove ~xt t.table seq;
    entry
  in
  match Kcas.Xt.commit { tx = take } with
  | Some { resolver; _ } ->
    (* Resolution is performed after the commit, not inside the transaction
       function. *)
    Eio.Promise.resolve resolver payload;
    true
  | None -> false

(** [wait w ~timeout ~clock] awaits [w.promise] under
    [Eio.Time.with_timeout_exn clock timeout].

    @return [Some v] when the promise yields [v], or [None] when
    [Eio.Time.Timeout] is raised.

    This function has no access to the table, so it does not remove any
    entry when it times out; see {!cancel}. *)
let wait w ~timeout ~clock =
  let await_result () = Eio.Promise.await w.promise in
  match Eio.Time.with_timeout_exn clock timeout await_result with
  | value -> Some value
  | exception Eio.Time.Timeout -> None

(** [cancel t ~seq] removes the binding for [seq] from the table. The
    removed waiter's promise is not resolved by this function. *)
let cancel t ~seq =
  let drop ~xt = Kcas_data.Hashtbl.Xt.remove ~xt t.table seq in
  Kcas.Xt.commit { tx = drop }

(** [pending_count t] is the value of [Kcas_data.Hashtbl.length] for the
    table of [t]. *)
let pending_count { table } = Kcas_data.Hashtbl.length table