(** Topic-based publish/subscribe registry.

    Subscriptions are stored per topic in a [Kcas_data.Hashtbl]; every
    read-modify-write of a topic's subscriber list is performed inside a
    single kcas transaction so that concurrent [subscribe] / [unsubscribe]
    calls on the same topic cannot overwrite each other's updates. *)

(** Name of a channel that messages are published to. *)
type topic = string

(** Payload delivered to subscribers. *)
type message = string

(** A registered listener. [id] is unique per registry (allocated from
    [t.next_id]); [topic] is the topic it was registered under; [callback]
    is invoked with each message broadcast on that topic. *)
type subscription = { id : int; topic : topic; callback : message -> unit }

(** Registry state: the per-topic subscriber lists and the counter used to
    allocate subscription ids. *)
type t = {
  topics : (topic, subscription list) Kcas_data.Hashtbl.t;
  next_id : int Atomic.t;
}

(** [create ()] returns an empty registry whose first subscription id
    will be [0]. *)
let create () =
  { topics = Kcas_data.Hashtbl.create (); next_id = Atomic.make 0 }

(** [subscribe t topic callback] registers [callback] on [topic] and returns
    the new subscription, which can later be passed to {!unsubscribe}.
    The new subscription is placed at the head of the topic's list. *)
let subscribe t topic callback =
  (* Allocate the id outside the transaction: the transaction body may be
     re-run on conflict and must not consume an id per attempt. *)
  let id = Atomic.fetch_and_add t.next_id 1 in
  let sub = { id; topic; callback } in
  (* Read the current list and write the extended one in one transaction;
     a separate find_opt followed by replace could lose a subscription
     added concurrently by another domain. *)
  let tx ~xt =
    let existing =
      Kcas_data.Hashtbl.Xt.find_opt ~xt t.topics topic
      |> Option.value ~default:[]
    in
    Kcas_data.Hashtbl.Xt.replace ~xt t.topics topic (sub :: existing)
  in
  Kcas.Xt.commit { Kcas.Xt.tx = tx };
  sub

(** [unsubscribe t sub] removes every subscription whose id equals [sub.id]
    from [sub.topic]. The topic's binding is dropped entirely once its list
    becomes empty. Does nothing if the topic has no binding. *)
let unsubscribe t sub =
  (* Same reasoning as in [subscribe]: filter and write back atomically so
     a concurrent subscribe/unsubscribe on this topic is not overwritten. *)
  let tx ~xt =
    match Kcas_data.Hashtbl.Xt.find_opt ~xt t.topics sub.topic with
    | None -> ()
    | Some subs -> (
        match List.filter (fun s -> not (Int.equal s.id sub.id)) subs with
        | [] -> Kcas_data.Hashtbl.Xt.remove ~xt t.topics sub.topic
        | remaining ->
            Kcas_data.Hashtbl.Xt.replace ~xt t.topics sub.topic remaining)
  in
  Kcas.Xt.commit { Kcas.Xt.tx = tx }

(** [broadcast t topic msg] calls the callback of each subscription
    currently bound to [topic] with [msg], in list order (most recently
    subscribed first). Does nothing if the topic has no binding.
    An exception raised by a callback propagates to the caller and the
    remaining callbacks are not invoked. *)
let broadcast t topic msg =
  match Kcas_data.Hashtbl.find_opt t.topics topic with
  | None -> ()
  | Some subs -> List.iter (fun sub -> sub.callback msg) subs

(** [subscriber_count t topic] is the number of subscriptions currently
    bound to [topic], or [0] if the topic has no binding. *)
let subscriber_count t topic =
  match Kcas_data.Hashtbl.find_opt t.topics topic with
  | None -> 0
  | Some subs -> List.length subs

(** [topics t] lists the topics that currently have a binding in the
    registry. *)
let topics t = Kcas_data.Hashtbl.to_seq_keys t.topics |> List.of_seq