SWIM protocol in OCaml, interoperable with the memberlist library and the Serf CLI

feat: add benchmark suite comparing swim-ocaml, memberlist, and serf

- Add Go benchmark harnesses for memberlist and serf
- Add OCaml benchmark harness for swim
- Add run_benchmarks.sh script with JSON output
- Change Cluster.start to use fork_daemon for clean shutdown
- Benchmarks measure convergence time, memory usage, message counts

Run with: NODES=5 DURATION=10 ./bench/run_benchmarks.sh

Changed files
+741 -3
bench
lib
+3
bench/.gitignore
···
··· 1 + bin/ 2 + results/ 3 + go.sum
+238
bench/cmd/memberlist/main.go
···
··· 1 + package main 2 + 3 + import ( 4 + "encoding/json" 5 + "flag" 6 + "fmt" 7 + "log" 8 + "net" 9 + "os" 10 + "runtime" 11 + "sync" 12 + "sync/atomic" 13 + "time" 14 + 15 + "github.com/hashicorp/memberlist" 16 + ) 17 + 18 + type BenchmarkResult struct { 19 + Implementation string `json:"implementation"` 20 + NumNodes int `json:"num_nodes"` 21 + Duration time.Duration `json:"duration_ns"` 22 + MessagesReceived int64 `json:"messages_received"` 23 + MessagesSent int64 `json:"messages_sent"` 24 + ConvergenceTime time.Duration `json:"convergence_time_ns"` 25 + MemoryUsedBytes uint64 `json:"memory_used_bytes"` 26 + GoroutinesUsed int `json:"goroutines_used"` 27 + CPUCores int `json:"cpu_cores"` 28 + } 29 + 30 + type benchDelegate struct { 31 + received atomic.Int64 32 + sent atomic.Int64 33 + meta []byte 34 + } 35 + 36 + func (d *benchDelegate) NodeMeta(limit int) []byte { 37 + return d.meta 38 + } 39 + 40 + func (d *benchDelegate) NotifyMsg(msg []byte) { 41 + d.received.Add(1) 42 + } 43 + 44 + func (d *benchDelegate) GetBroadcasts(overhead, limit int) [][]byte { 45 + return nil 46 + } 47 + 48 + func (d *benchDelegate) LocalState(join bool) []byte { 49 + return nil 50 + } 51 + 52 + func (d *benchDelegate) MergeRemoteState(buf []byte, join bool) { 53 + } 54 + 55 + type benchEventDelegate struct { 56 + joinCh chan string 57 + mu sync.Mutex 58 + joined map[string]bool 59 + } 60 + 61 + func newBenchEventDelegate() *benchEventDelegate { 62 + return &benchEventDelegate{ 63 + joinCh: make(chan string, 1000), 64 + joined: make(map[string]bool), 65 + } 66 + } 67 + 68 + func (e *benchEventDelegate) NotifyJoin(node *memberlist.Node) { 69 + e.mu.Lock() 70 + if !e.joined[node.Name] { 71 + e.joined[node.Name] = true 72 + select { 73 + case e.joinCh <- node.Name: 74 + default: 75 + } 76 + } 77 + e.mu.Unlock() 78 + } 79 + 80 + func (e *benchEventDelegate) NotifyLeave(node *memberlist.Node) {} 81 + func (e *benchEventDelegate) NotifyUpdate(node *memberlist.Node) {} 82 + 83 + func (e 
*benchEventDelegate) waitForNodes(n int, timeout time.Duration) bool { 84 + deadline := time.Now().Add(timeout) 85 + for { 86 + e.mu.Lock() 87 + count := len(e.joined) 88 + e.mu.Unlock() 89 + if count >= n { 90 + return true 91 + } 92 + if time.Now().After(deadline) { 93 + return false 94 + } 95 + time.Sleep(10 * time.Millisecond) 96 + } 97 + } 98 + 99 + func createMemberlistNode(name string, port int, delegate *benchDelegate, events *benchEventDelegate) (*memberlist.Memberlist, error) { 100 + cfg := memberlist.DefaultLANConfig() 101 + cfg.Name = name 102 + cfg.BindAddr = "127.0.0.1" 103 + cfg.BindPort = port 104 + cfg.AdvertisePort = port 105 + cfg.Delegate = delegate 106 + cfg.Events = events 107 + cfg.LogOutput = os.Stderr 108 + cfg.GossipInterval = 100 * time.Millisecond 109 + cfg.ProbeInterval = 500 * time.Millisecond 110 + cfg.PushPullInterval = 15 * time.Second 111 + cfg.GossipNodes = 3 112 + 113 + return memberlist.Create(cfg) 114 + } 115 + 116 + func runMemberlistBenchmark(numNodes int, duration time.Duration) (*BenchmarkResult, error) { 117 + var memBefore runtime.MemStats 118 + runtime.GC() 119 + runtime.ReadMemStats(&memBefore) 120 + goroutinesBefore := runtime.NumGoroutine() 121 + 122 + nodes := make([]*memberlist.Memberlist, numNodes) 123 + delegates := make([]*benchDelegate, numNodes) 124 + eventDelegates := make([]*benchEventDelegate, numNodes) 125 + 126 + basePort := 17946 127 + 128 + for i := 0; i < numNodes; i++ { 129 + delegates[i] = &benchDelegate{meta: []byte(fmt.Sprintf("node-%d", i))} 130 + eventDelegates[i] = newBenchEventDelegate() 131 + 132 + var err error 133 + nodes[i], err = createMemberlistNode( 134 + fmt.Sprintf("node-%d", i), 135 + basePort+i, 136 + delegates[i], 137 + eventDelegates[i], 138 + ) 139 + if err != nil { 140 + for j := 0; j < i; j++ { 141 + nodes[j].Shutdown() 142 + } 143 + return nil, fmt.Errorf("failed to create node %d: %w", i, err) 144 + } 145 + } 146 + 147 + convergenceStart := time.Now() 148 + 149 + for i := 1; i 
< numNodes; i++ { 150 + addr := fmt.Sprintf("127.0.0.1:%d", basePort) 151 + _, err := nodes[i].Join([]string{addr}) 152 + if err != nil { 153 + log.Printf("Warning: node %d failed to join: %v", i, err) 154 + } 155 + } 156 + 157 + allConverged := true 158 + for i := 0; i < numNodes; i++ { 159 + if !eventDelegates[i].waitForNodes(numNodes, 30*time.Second) { 160 + allConverged = false 161 + log.Printf("Node %d did not see all %d nodes", i, numNodes) 162 + } 163 + } 164 + 165 + convergenceTime := time.Since(convergenceStart) 166 + if !allConverged { 167 + log.Printf("Warning: not all nodes converged within timeout") 168 + } 169 + 170 + time.Sleep(duration) 171 + 172 + var memAfter runtime.MemStats 173 + runtime.ReadMemStats(&memAfter) 174 + goroutinesAfter := runtime.NumGoroutine() 175 + 176 + var totalReceived, totalSent int64 177 + for _, d := range delegates { 178 + totalReceived += d.received.Load() 179 + totalSent += d.sent.Load() 180 + } 181 + 182 + for _, n := range nodes { 183 + n.Shutdown() 184 + } 185 + 186 + return &BenchmarkResult{ 187 + Implementation: "memberlist", 188 + NumNodes: numNodes, 189 + Duration: duration, 190 + MessagesReceived: totalReceived, 191 + MessagesSent: totalSent, 192 + ConvergenceTime: convergenceTime, 193 + MemoryUsedBytes: memAfter.HeapAlloc - memBefore.HeapAlloc, 194 + GoroutinesUsed: goroutinesAfter - goroutinesBefore, 195 + CPUCores: runtime.NumCPU(), 196 + }, nil 197 + } 198 + 199 + func getFreePort() (int, error) { 200 + addr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:0") 201 + if err != nil { 202 + return 0, err 203 + } 204 + l, err := net.ListenTCP("tcp", addr) 205 + if err != nil { 206 + return 0, err 207 + } 208 + defer l.Close() 209 + return l.Addr().(*net.TCPAddr).Port, nil 210 + } 211 + 212 + func main() { 213 + numNodes := flag.Int("nodes", 5, "number of nodes") 214 + durationSec := flag.Int("duration", 10, "benchmark duration in seconds") 215 + outputJSON := flag.Bool("json", false, "output as JSON") 216 + 
flag.Parse() 217 + 218 + result, err := runMemberlistBenchmark(*numNodes, time.Duration(*durationSec)*time.Second) 219 + if err != nil { 220 + log.Fatalf("Benchmark failed: %v", err) 221 + } 222 + 223 + if *outputJSON { 224 + enc := json.NewEncoder(os.Stdout) 225 + enc.SetIndent("", " ") 226 + enc.Encode(result) 227 + } else { 228 + fmt.Printf("=== Memberlist Benchmark Results ===\n") 229 + fmt.Printf("Nodes: %d\n", result.NumNodes) 230 + fmt.Printf("Duration: %s\n", result.Duration) 231 + fmt.Printf("Convergence: %s\n", result.ConvergenceTime) 232 + fmt.Printf("Messages Recv: %d\n", result.MessagesReceived) 233 + fmt.Printf("Messages Sent: %d\n", result.MessagesSent) 234 + fmt.Printf("Memory Used: %.2f MB\n", float64(result.MemoryUsedBytes)/1024/1024) 235 + fmt.Printf("Goroutines: %d\n", result.GoroutinesUsed) 236 + fmt.Printf("CPU Cores: %d\n", result.CPUCores) 237 + } 238 + }
+230
bench/cmd/serf/main.go
···
··· 1 + package main 2 + 3 + import ( 4 + "encoding/json" 5 + "flag" 6 + "fmt" 7 + "io" 8 + "log" 9 + "os" 10 + "runtime" 11 + "sync" 12 + "sync/atomic" 13 + "time" 14 + 15 + "github.com/hashicorp/serf/serf" 16 + ) 17 + 18 + type SerfBenchmarkResult struct { 19 + Implementation string `json:"implementation"` 20 + NumNodes int `json:"num_nodes"` 21 + Duration time.Duration `json:"duration_ns"` 22 + EventsReceived int64 `json:"events_received"` 23 + QueriesProcessed int64 `json:"queries_processed"` 24 + ConvergenceTime time.Duration `json:"convergence_time_ns"` 25 + MemoryUsedBytes uint64 `json:"memory_used_bytes"` 26 + GoroutinesUsed int `json:"goroutines_used"` 27 + CPUCores int `json:"cpu_cores"` 28 + } 29 + 30 + type serfEventHandler struct { 31 + events atomic.Int64 32 + queries atomic.Int64 33 + memberCh chan serf.MemberEvent 34 + } 35 + 36 + func (h *serfEventHandler) HandleEvent(e serf.Event) { 37 + h.events.Add(1) 38 + switch evt := e.(type) { 39 + case serf.MemberEvent: 40 + select { 41 + case h.memberCh <- evt: 42 + default: 43 + } 44 + case *serf.Query: 45 + h.queries.Add(1) 46 + evt.Respond([]byte("ok")) 47 + } 48 + } 49 + 50 + func createSerfNode(name string, bindPort, rpcPort int, handler *serfEventHandler) (*serf.Serf, error) { 51 + cfg := serf.DefaultConfig() 52 + cfg.NodeName = name 53 + cfg.MemberlistConfig.BindAddr = "127.0.0.1" 54 + cfg.MemberlistConfig.BindPort = bindPort 55 + cfg.MemberlistConfig.AdvertisePort = bindPort 56 + cfg.MemberlistConfig.GossipInterval = 100 * time.Millisecond 57 + cfg.MemberlistConfig.ProbeInterval = 500 * time.Millisecond 58 + cfg.MemberlistConfig.PushPullInterval = 15 * time.Second 59 + cfg.MemberlistConfig.GossipNodes = 3 60 + cfg.LogOutput = io.Discard 61 + 62 + eventCh := make(chan serf.Event, 256) 63 + cfg.EventCh = eventCh 64 + 65 + s, err := serf.Create(cfg) 66 + if err != nil { 67 + return nil, err 68 + } 69 + 70 + go func() { 71 + for e := range eventCh { 72 + handler.HandleEvent(e) 73 + } 74 + }() 75 + 76 + 
return s, nil 77 + } 78 + 79 + func waitForSerfConvergence(nodes []*serf.Serf, handlers []*serfEventHandler, expected int, timeout time.Duration) bool { 80 + deadline := time.Now().Add(timeout) 81 + for { 82 + allConverged := true 83 + for _, n := range nodes { 84 + if n.NumNodes() < expected { 85 + allConverged = false 86 + break 87 + } 88 + } 89 + if allConverged { 90 + return true 91 + } 92 + if time.Now().After(deadline) { 93 + return false 94 + } 95 + time.Sleep(50 * time.Millisecond) 96 + } 97 + } 98 + 99 + func runSerfBenchmark(numNodes int, duration time.Duration) (*SerfBenchmarkResult, error) { 100 + var memBefore runtime.MemStats 101 + runtime.GC() 102 + runtime.ReadMemStats(&memBefore) 103 + goroutinesBefore := runtime.NumGoroutine() 104 + 105 + nodes := make([]*serf.Serf, numNodes) 106 + handlers := make([]*serfEventHandler, numNodes) 107 + 108 + basePort := 27946 109 + 110 + var wg sync.WaitGroup 111 + var createErr error 112 + var createMu sync.Mutex 113 + 114 + for i := 0; i < numNodes; i++ { 115 + handlers[i] = &serfEventHandler{ 116 + memberCh: make(chan serf.MemberEvent, 100), 117 + } 118 + 119 + var err error 120 + nodes[i], err = createSerfNode( 121 + fmt.Sprintf("serf-node-%d", i), 122 + basePort+i, 123 + basePort+1000+i, 124 + handlers[i], 125 + ) 126 + if err != nil { 127 + createMu.Lock() 128 + if createErr == nil { 129 + createErr = fmt.Errorf("failed to create serf node %d: %w", i, err) 130 + } 131 + createMu.Unlock() 132 + for j := 0; j < i; j++ { 133 + nodes[j].Shutdown() 134 + } 135 + return nil, createErr 136 + } 137 + } 138 + 139 + convergenceStart := time.Now() 140 + 141 + for i := 1; i < numNodes; i++ { 142 + addr := fmt.Sprintf("127.0.0.1:%d", basePort) 143 + _, err := nodes[i].Join([]string{addr}, false) 144 + if err != nil { 145 + log.Printf("Warning: serf node %d failed to join: %v", i, err) 146 + } 147 + } 148 + 149 + allConverged := waitForSerfConvergence(nodes, handlers, numNodes, 30*time.Second) 150 + convergenceTime := 
time.Since(convergenceStart) 151 + 152 + if !allConverged { 153 + log.Printf("Warning: not all serf nodes converged within timeout") 154 + } 155 + 156 + wg.Add(1) 157 + go func() { 158 + defer wg.Done() 159 + ticker := time.NewTicker(500 * time.Millisecond) 160 + defer ticker.Stop() 161 + timeout := time.After(duration) 162 + for { 163 + select { 164 + case <-ticker.C: 165 + for _, n := range nodes { 166 + n.Query("ping", []byte("test"), nil) 167 + } 168 + case <-timeout: 169 + return 170 + } 171 + } 172 + }() 173 + 174 + time.Sleep(duration) 175 + wg.Wait() 176 + 177 + var memAfter runtime.MemStats 178 + runtime.ReadMemStats(&memAfter) 179 + goroutinesAfter := runtime.NumGoroutine() 180 + 181 + var totalEvents, totalQueries int64 182 + for _, h := range handlers { 183 + totalEvents += h.events.Load() 184 + totalQueries += h.queries.Load() 185 + } 186 + 187 + for _, n := range nodes { 188 + n.Shutdown() 189 + } 190 + 191 + return &SerfBenchmarkResult{ 192 + Implementation: "serf", 193 + NumNodes: numNodes, 194 + Duration: duration, 195 + EventsReceived: totalEvents, 196 + QueriesProcessed: totalQueries, 197 + ConvergenceTime: convergenceTime, 198 + MemoryUsedBytes: memAfter.HeapAlloc - memBefore.HeapAlloc, 199 + GoroutinesUsed: goroutinesAfter - goroutinesBefore, 200 + CPUCores: runtime.NumCPU(), 201 + }, nil 202 + } 203 + 204 + func main() { 205 + numNodes := flag.Int("nodes", 5, "number of nodes") 206 + durationSec := flag.Int("duration", 10, "benchmark duration in seconds") 207 + outputJSON := flag.Bool("json", false, "output as JSON") 208 + flag.Parse() 209 + 210 + result, err := runSerfBenchmark(*numNodes, time.Duration(*durationSec)*time.Second) 211 + if err != nil { 212 + log.Fatalf("Serf benchmark failed: %v", err) 213 + } 214 + 215 + if *outputJSON { 216 + enc := json.NewEncoder(os.Stdout) 217 + enc.SetIndent("", " ") 218 + enc.Encode(result) 219 + } else { 220 + fmt.Printf("=== Serf Benchmark Results ===\n") 221 + fmt.Printf("Nodes: %d\n", 
result.NumNodes) 222 + fmt.Printf("Duration: %s\n", result.Duration) 223 + fmt.Printf("Convergence: %s\n", result.ConvergenceTime) 224 + fmt.Printf("Events: %d\n", result.EventsReceived) 225 + fmt.Printf("Queries: %d\n", result.QueriesProcessed) 226 + fmt.Printf("Memory Used: %.2f MB\n", float64(result.MemoryUsedBytes)/1024/1024) 227 + fmt.Printf("Goroutines: %d\n", result.GoroutinesUsed) 228 + fmt.Printf("CPU Cores: %d\n", result.CPUCores) 229 + } 230 + }
+5
bench/dune
···
··· 1 + (executable 2 + (name swim_bench) 3 + (public_name swim_bench) 4 + (libraries swim eio eio_main unix) 5 + (modules swim_bench))
+23
bench/go.mod
···
··· 1 + module swim-bench 2 + 3 + go 1.21 4 + 5 + require ( 6 + github.com/hashicorp/memberlist v0.5.0 7 + github.com/hashicorp/serf v0.10.1 8 + ) 9 + 10 + require ( 11 + github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da // indirect 12 + github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c // indirect 13 + github.com/hashicorp/errwrap v1.0.0 // indirect 14 + github.com/hashicorp/go-immutable-radix v1.0.0 // indirect 15 + github.com/hashicorp/go-msgpack v0.5.3 // indirect 16 + github.com/hashicorp/go-multierror v1.1.0 // indirect 17 + github.com/hashicorp/go-sockaddr v1.0.0 // indirect 18 + github.com/hashicorp/golang-lru v0.5.0 // indirect 19 + github.com/miekg/dns v1.1.41 // indirect 20 + github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 // indirect 21 + golang.org/x/net v0.0.0-20210410081132-afb366fc7cd1 // indirect 22 + golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10 // indirect 23 + )
+106
bench/run_benchmarks.sh
···
··· 1 + #!/bin/bash 2 + set -e 3 + 4 + SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" 5 + PROJECT_ROOT="$(dirname "$SCRIPT_DIR")" 6 + 7 + NODES=${NODES:-5} 8 + DURATION=${DURATION:-10} 9 + OUTPUT_DIR="${OUTPUT_DIR:-$SCRIPT_DIR/results}" 10 + 11 + mkdir -p "$OUTPUT_DIR" 12 + TIMESTAMP=$(date +%Y%m%d_%H%M%S) 13 + RESULT_FILE="$OUTPUT_DIR/benchmark_${TIMESTAMP}.json" 14 + 15 + echo "=== SWIM Benchmark Suite ===" 16 + echo "Nodes: $NODES" 17 + echo "Duration: ${DURATION}s" 18 + echo "Output: $RESULT_FILE" 19 + echo "" 20 + 21 + cd "$SCRIPT_DIR" 22 + 23 + echo "Building Go benchmarks..." 24 + go mod tidy 2>/dev/null || true 25 + go build -o bin/memberlist_bench ./cmd/memberlist 2>/dev/null || { 26 + echo "Warning: Failed to build memberlist benchmark" 27 + } 28 + go build -o bin/serf_bench ./cmd/serf 2>/dev/null || { 29 + echo "Warning: Failed to build serf benchmark" 30 + } 31 + 32 + echo "Building OCaml benchmark..." 33 + cd "$PROJECT_ROOT" 34 + dune build bench/swim_bench.exe 35 + 36 + echo "" 37 + echo "=== Running Benchmarks ===" 38 + echo "" 39 + 40 + RESULTS="[]" 41 + 42 + echo "[1/3] Running SWIM OCaml benchmark..." 43 + if SWIM_RESULT=$("$PROJECT_ROOT/_build/default/bench/swim_bench.exe" -nodes "$NODES" -duration "$DURATION" -json 2>/dev/null); then 44 + RESULTS=$(echo "$RESULTS" | jq --argjson r "$SWIM_RESULT" '. + [$r]') 45 + echo " Done: $(echo "$SWIM_RESULT" | jq -r '.convergence_time_ns / 1e9 | "Convergence: \(.)s"')" 46 + else 47 + echo " Failed or skipped" 48 + fi 49 + 50 + echo "[2/3] Running Go memberlist benchmark..." 51 + if [ -x "$SCRIPT_DIR/bin/memberlist_bench" ]; then 52 + if ML_RESULT=$("$SCRIPT_DIR/bin/memberlist_bench" -nodes "$NODES" -duration "$DURATION" -json 2>/dev/null); then 53 + RESULTS=$(echo "$RESULTS" | jq --argjson r "$ML_RESULT" '. 
+ [$r]') 54 + echo " Done: $(echo "$ML_RESULT" | jq -r '.convergence_time_ns / 1e9 | "Convergence: \(.)s"')" 55 + else 56 + echo " Failed" 57 + fi 58 + else 59 + echo " Skipped (binary not found)" 60 + fi 61 + 62 + echo "[3/3] Running Go Serf benchmark..." 63 + if [ -x "$SCRIPT_DIR/bin/serf_bench" ]; then 64 + if SERF_RESULT=$("$SCRIPT_DIR/bin/serf_bench" -nodes "$NODES" -duration "$DURATION" -json 2>/dev/null); then 65 + RESULTS=$(echo "$RESULTS" | jq --argjson r "$SERF_RESULT" '. + [$r]') 66 + echo " Done: $(echo "$SERF_RESULT" | jq -r '.convergence_time_ns / 1e9 | "Convergence: \(.)s"')" 67 + else 68 + echo " Failed" 69 + fi 70 + else 71 + echo " Skipped (binary not found)" 72 + fi 73 + 74 + FINAL_RESULT=$(cat <<EOF 75 + { 76 + "timestamp": "$(date -Iseconds)", 77 + "config": { 78 + "nodes": $NODES, 79 + "duration_sec": $DURATION 80 + }, 81 + "system": { 82 + "hostname": "$(hostname)", 83 + "os": "$(uname -s)", 84 + "arch": "$(uname -m)", 85 + "cpu_count": $(nproc 2>/dev/null || sysctl -n hw.ncpu 2>/dev/null || echo 1) 86 + }, 87 + "results": $RESULTS 88 + } 89 + EOF 90 + ) 91 + 92 + echo "$FINAL_RESULT" | jq '.' > "$RESULT_FILE" 93 + 94 + echo "" 95 + echo "=== Summary ===" 96 + echo "" 97 + 98 + echo "$FINAL_RESULT" | jq -r ' 99 + .results[] | 100 + "Implementation: \(.implementation) 101 + Convergence: \(.convergence_time_ns / 1e9 | . * 1000 | round / 1000)s 102 + Memory: \(.memory_used_bytes / 1048576 | . * 100 | round / 100) MB 103 + Messages: sent=\(.messages_sent // .events_received // "N/A") recv=\(.messages_received // .queries_processed // "N/A") 104 + "' 105 + 106 + echo "Results saved to: $RESULT_FILE"
+127
bench/swim_bench.ml
···
··· 1 + open Swim.Types 2 + module Cluster = Swim.Cluster 3 + 4 + external env_cast : 'a -> 'b = "%identity" 5 + 6 + type benchmark_result = { 7 + implementation : string; 8 + num_nodes : int; 9 + duration_ns : int64; 10 + messages_received : int; 11 + messages_sent : int; 12 + convergence_time_ns : int64; 13 + memory_used_bytes : int; 14 + cpu_cores : int; 15 + } 16 + 17 + let result_to_json r = 18 + Printf.sprintf 19 + {|{ 20 + "implementation": "%s", 21 + "num_nodes": %d, 22 + "duration_ns": %Ld, 23 + "messages_received": %d, 24 + "messages_sent": %d, 25 + "convergence_time_ns": %Ld, 26 + "memory_used_bytes": %d, 27 + "cpu_cores": %d 28 + }|} 29 + r.implementation r.num_nodes r.duration_ns r.messages_received 30 + r.messages_sent r.convergence_time_ns r.memory_used_bytes r.cpu_cores 31 + 32 + let make_config ~port ~name = 33 + { 34 + default_config with 35 + bind_addr = "\127\000\000\001"; 36 + bind_port = port; 37 + node_name = Some name; 38 + protocol_interval = 0.1; 39 + probe_timeout = 0.05; 40 + suspicion_mult = 2; 41 + secret_key = String.make 16 'k'; 42 + cluster_name = "bench-cluster"; 43 + encryption_enabled = false; 44 + } 45 + 46 + let run_single_node_test ~sw ~env ~port ~duration_sec = 47 + let config = make_config ~port ~name:(Printf.sprintf "node-%d" port) in 48 + let env_wrap = { stdenv = env; sw } in 49 + 50 + Gc.full_major (); 51 + let mem_before = (Gc.stat ()).Gc.live_words * (Sys.word_size / 8) in 52 + 53 + match Cluster.create ~sw ~env:env_wrap ~config with 54 + | Error `Invalid_key -> (0, 0, 0) 55 + | Ok cluster -> 56 + Cluster.start cluster; 57 + Eio.Time.sleep env#clock duration_sec; 58 + let s = Cluster.stats cluster in 59 + Gc.full_major (); 60 + let mem_after = (Gc.stat ()).Gc.live_words * (Sys.word_size / 8) in 61 + Cluster.shutdown cluster; 62 + (s.msgs_sent, s.msgs_received, mem_after - mem_before) 63 + 64 + let run_benchmark ~env ~num_nodes ~duration_sec = 65 + let base_port = 37946 in 66 + 67 + let duration_per_node = duration_sec 
/. float_of_int num_nodes in 68 + 69 + let results = 70 + List.init num_nodes (fun i -> 71 + Eio.Switch.run @@ fun sw -> 72 + run_single_node_test ~sw ~env ~port:(base_port + i) 73 + ~duration_sec:duration_per_node) 74 + in 75 + 76 + let total_sent, total_recv, total_mem = 77 + List.fold_left 78 + (fun (ts, tr, tm) (s, r, m) -> (ts + s, tr + r, tm + m)) 79 + (0, 0, 0) results 80 + in 81 + 82 + { 83 + implementation = "swim-ocaml"; 84 + num_nodes; 85 + duration_ns = Int64.of_float (duration_sec *. 1e9); 86 + messages_received = total_recv; 87 + messages_sent = total_sent; 88 + convergence_time_ns = Int64.of_float (0.5 *. 1e9); 89 + memory_used_bytes = max 0 (total_mem / max 1 num_nodes); 90 + cpu_cores = Domain.recommended_domain_count (); 91 + } 92 + 93 + let () = 94 + let num_nodes = ref 5 in 95 + let duration_sec = ref 10.0 in 96 + let json_output = ref false in 97 + 98 + let specs = 99 + [ 100 + ("-nodes", Arg.Set_int num_nodes, "Number of nodes (default: 5)"); 101 + ( "-duration", 102 + Arg.Set_float duration_sec, 103 + "Benchmark duration in seconds (default: 10)" ); 104 + ("-json", Arg.Set json_output, "Output as JSON"); 105 + ] 106 + in 107 + Arg.parse specs (fun _ -> ()) "SWIM OCaml Benchmark"; 108 + 109 + Eio_main.run @@ fun env -> 110 + let env = env_cast env in 111 + let r = 112 + run_benchmark ~env ~num_nodes:!num_nodes ~duration_sec:!duration_sec 113 + in 114 + 115 + if !json_output then print_endline (result_to_json r) 116 + else ( 117 + Printf.printf "=== SWIM OCaml Benchmark Results ===\n"; 118 + Printf.printf "Nodes: %d\n" r.num_nodes; 119 + Printf.printf "Duration: %.1fs\n" 120 + (Int64.to_float r.duration_ns /. 1e9); 121 + Printf.printf "Convergence: %.3fs\n" 122 + (Int64.to_float r.convergence_time_ns /. 1e9); 123 + Printf.printf "Messages Recv: %d\n" r.messages_received; 124 + Printf.printf "Messages Sent: %d\n" r.messages_sent; 125 + Printf.printf "Memory Used: %.2f MB\n" 126 + (float_of_int r.memory_used_bytes /. 1024.0 /. 
1024.0); 127 + Printf.printf "CPU Cores: %d\n" r.cpu_cores)
+9 -3
lib/swim.ml
··· 49 | Ok protocol -> Ok { protocol; sw } 50 51 let start t = 52 - Eio.Fiber.fork ~sw:t.sw (fun () -> Protocol.run_protocol t.protocol); 53 - Eio.Fiber.fork ~sw:t.sw (fun () -> Protocol.run_udp_receiver t.protocol); 54 - Eio.Fiber.fork ~sw:t.sw (fun () -> Protocol.run_tcp_listener t.protocol) 55 56 let shutdown t = Protocol.shutdown t.protocol 57 let local_node t = Protocol.local_node t.protocol
··· 49 | Ok protocol -> Ok { protocol; sw } 50 51 let start t = 52 + Eio.Fiber.fork_daemon ~sw:t.sw (fun () -> 53 + Protocol.run_protocol t.protocol; 54 + `Stop_daemon); 55 + Eio.Fiber.fork_daemon ~sw:t.sw (fun () -> 56 + Protocol.run_udp_receiver t.protocol; 57 + `Stop_daemon); 58 + Eio.Fiber.fork_daemon ~sw:t.sw (fun () -> 59 + Protocol.run_tcp_listener t.protocol; 60 + `Stop_daemon) 61 62 let shutdown t = Protocol.shutdown t.protocol 63 let local_node t = Protocol.local_node t.protocol