OCaml HTTP/1, HTTP/2 and WebSocket client and server library
(** Name of a channel that messages are published on. *)
type topic = string

(** Payload handed to each subscriber's callback. *)
type message = string

(** One registration of a callback on a topic. *)
type subscription = {
  id : int;  (** Identifier used to tell subscriptions apart on removal. *)
  topic : topic;  (** Topic this subscription is registered under. *)
  callback : message -> unit;  (** Invoked with every broadcast message. *)
}
4
(** State of a pub/sub registry. *)
type t = {
  topics : (topic, subscription list) Kcas_data.Hashtbl.t;
      (** Subscriptions currently registered, keyed by topic. *)
  next_id : int Atomic.t;
      (** Counter from which subscription identifiers are drawn. *)
}
9
(** [create ()] returns a fresh registry with no topics and an identifier
    counter starting at [0]. *)
let create () =
  let next_id = Atomic.make 0 in
  let topics = Kcas_data.Hashtbl.create () in
  { topics; next_id }
12
(** [subscribe t topic callback] registers [callback] on [topic] and returns
    the resulting subscription, which can later be passed to [unsubscribe].

    The new subscription is placed at the front of the topic's list. The
    lookup and the update of the topic's list run inside a single [Kcas]
    transaction, so concurrent calls on the same topic cannot overwrite each
    other's registration. *)
let subscribe t topic callback =
  (* Allocate the id outside the transaction: the transaction body may be
     re-run on conflict and must not consume an id per attempt. *)
  let id = Atomic.fetch_and_add t.next_id 1 in
  let sub = { id; topic; callback } in
  let tx ~xt =
    let subs =
      match Kcas_data.Hashtbl.Xt.find_opt ~xt t.topics topic with
      | Some existing -> sub :: existing
      | None -> [ sub ]
    in
    Kcas_data.Hashtbl.Xt.replace ~xt t.topics topic subs
  in
  Kcas.Xt.commit { tx };
  sub
23
(** [unsubscribe t sub] removes the subscription whose [id] equals [sub.id]
    from [sub.topic]. The topic's entry is dropped from the table when no
    subscriptions remain. Does nothing if the topic has no entry.

    The lookup, filtering and write-back run inside a single [Kcas]
    transaction, so a concurrent [subscribe] or [unsubscribe] on the same
    topic cannot be lost or undone by this call. *)
let unsubscribe t sub =
  let tx ~xt =
    match Kcas_data.Hashtbl.Xt.find_opt ~xt t.topics sub.topic with
    | None -> ()
    | Some subs -> (
        (* Match on the list shape instead of using polymorphic [=] on a
           list whose elements contain closures. *)
        match List.filter (fun s -> s.id <> sub.id) subs with
        | [] -> Kcas_data.Hashtbl.Xt.remove ~xt t.topics sub.topic
        | remaining ->
            Kcas_data.Hashtbl.Xt.replace ~xt t.topics sub.topic remaining)
  in
  Kcas.Xt.commit { tx }
31
(** [broadcast t topic msg] calls the callback of every subscription stored
    under [topic] with [msg], in the order the subscriptions appear in the
    stored list. Does nothing when [topic] has no entry.

    Callbacks run synchronously in the caller; an exception raised by one
    propagates and the remaining callbacks are not invoked. *)
let broadcast t topic msg =
  let deliver { callback; _ } = callback msg in
  Kcas_data.Hashtbl.find_opt t.topics topic |> Option.iter (List.iter deliver)
36
(** [subscriber_count t topic] is the number of subscriptions stored under
    [topic], or [0] when the topic has no entry. *)
let subscriber_count t topic =
  Kcas_data.Hashtbl.find_opt t.topics topic
  |> Option.fold ~none:0 ~some:List.length
41
(** [topics t] lists the keys currently present in the topic table, in the
    order produced by the table's key sequence. *)
let topics t = List.of_seq (Kcas_data.Hashtbl.to_seq_keys t.topics)