OCaml HTTP/1, HTTP/2 and WebSocket client and server library
at main 1.3 kB view raw
(** Topic-based publish/subscribe registry.

    Subscriptions are kept per topic in a [Kcas_data.Hashtbl]. Updates to a
    topic's subscriber list ([subscribe], [unsubscribe]) are performed as a
    single kcas transaction, so two concurrent updates to the same topic
    cannot overwrite each other's changes. *)

(** Name of the channel a message is published on. *)
type topic = string

(** Payload handed to every subscriber callback. *)
type message = string

(** A registered listener: [id] is allocated by {!subscribe}, [topic] is the
    topic it was registered under, and [callback] is invoked by {!broadcast}. *)
type subscription = { id : int; topic : topic; callback : message -> unit }

(** The registry: the per-topic subscriber lists and the counter that ids are
    drawn from. *)
type t = {
  topics : (topic, subscription list) Kcas_data.Hashtbl.t;
  next_id : int Atomic.t;
}

(** [create ()] returns an empty registry whose first subscription id is [0]. *)
let create () =
  { topics = Kcas_data.Hashtbl.create (); next_id = Atomic.make 0 }

(** [subscribe t topic callback] registers [callback] for [topic] and returns
    the new subscription, which is the handle to pass to {!unsubscribe}.

    The subscription is prepended to the topic's list, so {!broadcast} calls
    the most recently added subscriber first. *)
let subscribe t topic callback =
  (* The id is allocated outside the transaction: the transaction body may be
     re-run on conflict and must not repeat side effects. *)
  let id = Atomic.fetch_and_add t.next_id 1 in
  let sub = { id; topic; callback } in
  (* Read and write in one transaction; a separate [find_opt] followed by
     [replace] could lose a subscription added concurrently in between. *)
  let update ~xt =
    let subs =
      match Kcas_data.Hashtbl.Xt.find_opt ~xt t.topics topic with
      | Some existing -> sub :: existing
      | None -> [ sub ]
    in
    Kcas_data.Hashtbl.Xt.replace ~xt t.topics topic subs
  in
  Kcas.Xt.commit { Kcas.Xt.tx = update };
  sub

(** [unsubscribe t sub] removes the subscription with [sub]'s id from
    [sub.topic]. The topic's entry is dropped entirely once its last
    subscriber is removed. Does nothing if the topic has no entry. *)
let unsubscribe t sub =
  (* Same reasoning as in [subscribe]: filter and write back atomically. *)
  let update ~xt =
    match Kcas_data.Hashtbl.Xt.find_opt ~xt t.topics sub.topic with
    | None -> ()
    | Some subs -> (
        (* Pattern-match on the result instead of [remaining = []], which
           would apply polymorphic equality to records holding closures. *)
        match List.filter (fun s -> not (Int.equal s.id sub.id)) subs with
        | [] -> Kcas_data.Hashtbl.Xt.remove ~xt t.topics sub.topic
        | remaining ->
            Kcas_data.Hashtbl.Xt.replace ~xt t.topics sub.topic remaining)
  in
  Kcas.Xt.commit { Kcas.Xt.tx = update }

(** [broadcast t topic msg] calls the callback of every subscription currently
    registered for [topic] with [msg], in list order (most recent subscriber
    first). Callbacks run in the caller; an exception raised by a callback
    propagates and the remaining callbacks are not invoked. *)
let broadcast t topic msg =
  match Kcas_data.Hashtbl.find_opt t.topics topic with
  | None -> ()
  | Some subs -> List.iter (fun sub -> sub.callback msg) subs

(** [subscriber_count t topic] is the number of subscriptions registered for
    [topic], or [0] if the topic has no entry. *)
let subscriber_count t topic =
  match Kcas_data.Hashtbl.find_opt t.topics topic with
  | None -> 0
  | Some subs -> List.length subs

(** [topics t] lists the topics that currently have an entry in the registry,
    in the order produced by the table's key sequence. *)
let topics t = Kcas_data.Hashtbl.to_seq_keys t.topics |> List.of_seq