swim protocol in ocaml interoperable with membership lib and serf cli

fix(gossip): bound broadcast queue to prevent memory leaks

- Add max_gossip_queue_depth to config (default 5000)
- Enforce limit in Dissemination.enqueue by dropping oldest items (ring buffer)
- Prevents unbounded memory growth when broadcast rate exceeds gossip bandwidth

+5 -3
lib/dissemination.ml
··· 10 10 11 11 let create () = { queue = Kcas_data.Queue.create (); depth = Kcas.Loc.make 0 } 12 12 13 - let enqueue t msg ~transmits ~created = 13 + let enqueue t msg ~transmits ~created ~limit = 14 14 let item = { msg; transmits = Kcas.Loc.make transmits; created } in 15 15 Kcas.Xt.commit 16 16 { 17 17 tx = 18 18 (fun ~xt -> 19 - Kcas_data.Queue.Xt.add ~xt item t.queue; 20 - Kcas.Xt.modify ~xt t.depth succ); 19 + let d = Kcas.Xt.get ~xt t.depth in 20 + if d >= limit then ignore (Kcas_data.Queue.Xt.take_opt ~xt t.queue) 21 + else Kcas.Xt.set ~xt t.depth (d + 1); 22 + Kcas_data.Queue.Xt.add ~xt item t.queue); 21 23 } 22 24 23 25 let depth t = Kcas.Xt.commit { tx = (fun ~xt -> Kcas.Xt.get ~xt t.depth) }
+4 -1
lib/dissemination.mli
··· 9 9 type t 10 10 11 11 val create : unit -> t 12 - val enqueue : t -> protocol_msg -> transmits:int -> created:Mtime.span -> unit 12 + 13 + val enqueue : 14 + t -> protocol_msg -> transmits:int -> created:Mtime.span -> limit:int -> unit 15 + 13 16 val depth : t -> int 14 17 15 18 val drain :
+2 -1
lib/protocol.ml
··· 94 94 Protocol_pure.retransmit_limit t.config 95 95 ~node_count:(Membership.count t.members) 96 96 in 97 - Dissemination.enqueue t.broadcast_queue msg ~transmits ~created:(now_mtime t); 97 + Dissemination.enqueue t.broadcast_queue msg ~transmits ~created:(now_mtime t) 98 + ~limit:t.config.max_gossip_queue_depth; 98 99 Dissemination.invalidate t.broadcast_queue 99 100 ~invalidates:Protocol_pure.invalidates msg 100 101
+2
lib/types.ml
··· 116 116 encryption_enabled : bool; 117 117 gossip_verify_incoming : bool; 118 118 gossip_verify_outgoing : bool; 119 + max_gossip_queue_depth : int; 119 120 } 120 121 121 122 let default_config = ··· 139 140 encryption_enabled = false; 140 141 gossip_verify_incoming = true; 141 142 gossip_verify_outgoing = true; 143 + max_gossip_queue_depth = 5000; 142 144 } 143 145 144 146 type 'a env = {
+1
lib/types.mli
··· 92 92 encryption_enabled : bool; 93 93 gossip_verify_incoming : bool; 94 94 gossip_verify_outgoing : bool; 95 + max_gossip_queue_depth : int; 95 96 } 96 97 97 98 val default_config : config
+3 -1
test/generators.ml
··· 198 198 and+ label = oneof [ return ""; gen_topic ] 199 199 and+ encryption_enabled = bool 200 200 and+ gossip_verify_incoming = bool 201 - and+ gossip_verify_outgoing = bool in 201 + and+ gossip_verify_outgoing = bool 202 + and+ max_gossip_queue_depth = int_range 10 10000 in 202 203 { 203 204 bind_addr; 204 205 bind_port; ··· 219 220 encryption_enabled; 220 221 gossip_verify_incoming; 221 222 gossip_verify_outgoing; 223 + max_gossip_queue_depth; 222 224 } 223 225 224 226 let gen_decode_error : decode_error QCheck.Gen.t =