SWIM protocol in OCaml, interoperable with membership lib and serf CLI

Implement SWIM protocol library core modules

Core library implementation with 11 modules:
- types.ml/mli: Core immutable types (node_id, incarnation, member_state, protocol_msg, packet, config, env, stats)
- codec.ml: Zero-copy binary encoding/decoding with magic bytes, version, message tags
- crypto.ml: AES-256-GCM encryption using Mirage_crypto
- buffer_pool.ml/mli: Lock-free buffer pool using Kcas_data.Queue + Eio.Semaphore
- protocol_pure.ml/mli: Pure SWIM state transitions (handle_alive/suspect/dead, suspicion_timeout, etc)
- membership.ml/mli: Kcas-based member table with Kcas_data.Hashtbl
- dissemination.ml/mli: Broadcast queue with transmit counting and message invalidation
- pending_acks.ml/mli: Ack tracking with Eio.Promise for async waiting
- transport.ml: UDP/TCP networking with Eio.Net
- protocol.ml: Main protocol loop with message handlers and probe cycles
- swim.ml: Public API with Cluster module
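
As an illustration of the pure layer in protocol_pure.ml, the two scaling helpers can be sketched with the standard library alone. This is a minimal sketch, not the code from this commit: the trimmed-down `config` record, the `probe_interval` field, and the choice of base-10 logarithms are assumptions made for the example.

```ocaml
(* Hypothetical, trimmed-down config; the real record lives in types.ml. *)
type config = {
  retransmit_mult : int;
  suspicion_mult : int;
  probe_interval : float;        (* seconds; assumed field *)
  suspicion_max_timeout : float; (* seconds *)
}

(* retransmit_mult * ceil(log(node_count + 1)); base 10 is an assumption. *)
let retransmit_limit cfg ~node_count =
  let scale = ceil (log10 (float_of_int (node_count + 1))) in
  cfg.retransmit_mult * int_of_float scale

(* Grows with log(node_count) and is capped by suspicion_max_timeout.
   The lower bound of 1.0 on the scale keeps tiny clusters from getting
   a zero timeout; that floor is also an assumption. *)
let suspicion_timeout cfg ~node_count =
  let scale = Float.max 1.0 (log10 (float_of_int (max 1 node_count))) in
  Float.min cfg.suspicion_max_timeout
    (float_of_int cfg.suspicion_mult *. scale *. cfg.probe_interval)
```

Both functions take every input explicitly and perform no I/O, so they can be checked with plain assertions or property-based tests.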

Design constraints followed:
- Eio only (no Lwt/Async/Mutex/Condition)
- All coordination via kcas (no locks)
- Result types for expected errors (no exceptions)
- Zero-copy buffer management
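
The "Result types for expected errors" constraint can be sketched on a packet header check. This is a hedged example rather than the code in codec.ml: it uses `Bytes` instead of Cstruct to stay dependency-free, and the header layout (4 magic bytes followed by 1 version byte) and the error constructor names are assumptions.

```ocaml
(* Hypothetical error type; the names are illustrative only. *)
type header_error =
  [ `Too_short | `Invalid_magic | `Unsupported_version of int ]

let magic = "SWIM"
let version = 1

(* Validates the header and returns the offset of the first byte after it.
   Malformed input is reported through Error, never through an exception. *)
let decode_header (buf : Bytes.t) : (int, header_error) result =
  let header_len = String.length magic + 1 in
  if Bytes.length buf < header_len then Error `Too_short
  else if Bytes.sub_string buf 0 (String.length magic) <> magic then
    Error `Invalid_magic
  else
    match Bytes.get_uint8 buf (String.length magic) with
    | v when v = version -> Ok header_len
    | v -> Error (`Unsupported_version v)
```

A caller pattern-matches on the result, so truncated or foreign packets are handled as ordinary values on the receive path.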

+39
.beads/.gitignore
+ # SQLite databases
+ *.db
+ *.db?*
+ *.db-journal
+ *.db-wal
+ *.db-shm
+
+ # Daemon runtime files
+ daemon.lock
+ daemon.log
+ daemon.pid
+ bd.sock
+ sync-state.json
+ last-touched
+
+ # Local version tracking (prevents upgrade notification spam after git ops)
+ .local_version
+
+ # Legacy database files
+ db.sqlite
+ bd.db
+
+ # Worktree redirect file (contains relative path to main repo's .beads/)
+ # Must not be committed as paths would be wrong in other clones
+ redirect
+
+ # Merge artifacts (temporary files from 3-way merge)
+ beads.base.jsonl
+ beads.base.meta.json
+ beads.left.jsonl
+ beads.left.meta.json
+ beads.right.jsonl
+ beads.right.meta.json
+
+ # NOTE: Do NOT add negation patterns (e.g., !issues.jsonl) here.
+ # They would override fork protection in .git/info/exclude, allowing
+ # contributors to accidentally commit upstream issue databases.
+ # The JSONL files (issues.jsonl, interactions.jsonl) and config files
+ # are tracked by git by default since no pattern above ignores them.
+81
.beads/README.md
+ # Beads - AI-Native Issue Tracking
+
+ Welcome to Beads! This repository uses **Beads** for issue tracking - a modern, AI-native tool designed to live directly in your codebase alongside your code.
+
+ ## What is Beads?
+
+ Beads is issue tracking that lives in your repo, making it perfect for AI coding agents and developers who want their issues close to their code. No web UI required - everything works through the CLI and integrates seamlessly with git.
+
+ **Learn more:** [github.com/steveyegge/beads](https://github.com/steveyegge/beads)
+
+ ## Quick Start
+
+ ### Essential Commands
+
+ ```bash
+ # Create new issues
+ bd create "Add user authentication"
+
+ # View all issues
+ bd list
+
+ # View issue details
+ bd show <issue-id>
+
+ # Update issue status
+ bd update <issue-id> --status in_progress
+ bd update <issue-id> --status done
+
+ # Sync with git remote
+ bd sync
+ ```
+
+ ### Working with Issues
+
+ Issues in Beads are:
+ - **Git-native**: Stored in `.beads/issues.jsonl` and synced like code
+ - **AI-friendly**: CLI-first design works perfectly with AI coding agents
+ - **Branch-aware**: Issues can follow your branch workflow
+ - **Always in sync**: Auto-syncs with your commits
+
+ ## Why Beads?
+
+ ✨ **AI-Native Design**
+ - Built specifically for AI-assisted development workflows
+ - CLI-first interface works seamlessly with AI coding agents
+ - No context switching to web UIs
+
+ 🚀 **Developer Focused**
+ - Issues live in your repo, right next to your code
+ - Works offline, syncs when you push
+ - Fast, lightweight, and stays out of your way
+
+ 🔧 **Git Integration**
+ - Automatic sync with git commits
+ - Branch-aware issue tracking
+ - Intelligent JSONL merge resolution
+
+ ## Get Started with Beads
+
+ Try Beads in your own projects:
+
+ ```bash
+ # Install Beads
+ curl -sSL https://raw.githubusercontent.com/steveyegge/beads/main/scripts/install.sh | bash
+
+ # Initialize in your repo
+ bd init
+
+ # Create your first issue
+ bd create "Try out Beads"
+ ```
+
+ ## Learn More
+
+ - **Documentation**: [github.com/steveyegge/beads/docs](https://github.com/steveyegge/beads/tree/main/docs)
+ - **Quick Start Guide**: Run `bd quickstart`
+ - **Examples**: [github.com/steveyegge/beads/examples](https://github.com/steveyegge/beads/tree/main/examples)
+
+ ---
+
+ *Beads: Issue tracking that moves at the speed of thought* ⚡
+62
.beads/config.yaml
+ # Beads Configuration File
+ # This file configures default behavior for all bd commands in this repository
+ # All settings can also be set via environment variables (BD_* prefix)
+ # or overridden with command-line flags
+
+ # Issue prefix for this repository (used by bd init)
+ # If not set, bd init will auto-detect from directory name
+ # Example: issue-prefix: "myproject" creates issues like "myproject-1", "myproject-2", etc.
+ # issue-prefix: ""
+
+ # Use no-db mode: load from JSONL, no SQLite, write back after each command
+ # When true, bd will use .beads/issues.jsonl as the source of truth
+ # instead of SQLite database
+ # no-db: false
+
+ # Disable daemon for RPC communication (forces direct database access)
+ # no-daemon: false
+
+ # Disable auto-flush of database to JSONL after mutations
+ # no-auto-flush: false
+
+ # Disable auto-import from JSONL when it's newer than database
+ # no-auto-import: false
+
+ # Enable JSON output by default
+ # json: false
+
+ # Default actor for audit trails (overridden by BD_ACTOR or --actor)
+ # actor: ""
+
+ # Path to database (overridden by BEADS_DB or --db)
+ # db: ""
+
+ # Auto-start daemon if not running (can also use BEADS_AUTO_START_DAEMON)
+ # auto-start-daemon: true
+
+ # Debounce interval for auto-flush (can also use BEADS_FLUSH_DEBOUNCE)
+ # flush-debounce: "5s"
+
+ # Git branch for beads commits (bd sync will commit to this branch)
+ # IMPORTANT: Set this for team projects so all clones use the same sync branch.
+ # This setting persists across clones (unlike database config which is gitignored).
+ # Can also use BEADS_SYNC_BRANCH env var for local override.
+ # If not set, bd sync will require you to run 'bd config set sync.branch <branch>'.
+ # sync-branch: "beads-sync"
+
+ # Multi-repo configuration (experimental - bd-307)
+ # Allows hydrating from multiple repositories and routing writes to the correct JSONL
+ # repos:
+ #   primary: "."  # Primary repo (where this database lives)
+ #   additional:   # Additional repos to hydrate from (read-only)
+ #     - ~/beads-planning  # Personal planning repo
+ #     - ~/work-planning   # Work planning repo
+
+ # Integration settings (access with 'bd config get/set')
+ # These are stored in the database, not in this file:
+ # - jira.url
+ # - jira.project
+ # - linear.url
+ # - linear.api-key
+ # - github.org
+ # - github.repo
.beads/interactions.jsonl

This is a binary file and will not be displayed.

+20
.beads/issues.jsonl
··· 1 + {"id":"swim-294","title":"Implement test generators (test/generators.ml)","description":"Create QCheck generators for property-based testing.\n\n## Generators to implement\n\n### Basic types\n- `gen_node_id : node_id QCheck.Gen.t`\n- `gen_incarnation : incarnation QCheck.Gen.t`\n- `gen_member_state : member_state QCheck.Gen.t`\n\n### Node types\n- `gen_node_info : node_info QCheck.Gen.t`\n - Generate valid addresses\n - Random metadata strings\n\n### Protocol messages\n- `gen_ping : protocol_msg QCheck.Gen.t`\n- `gen_ping_req : protocol_msg QCheck.Gen.t`\n- `gen_ack : protocol_msg QCheck.Gen.t`\n- `gen_alive : protocol_msg QCheck.Gen.t`\n- `gen_suspect : protocol_msg QCheck.Gen.t`\n- `gen_dead : protocol_msg QCheck.Gen.t`\n- `gen_user_msg : protocol_msg QCheck.Gen.t`\n- `gen_protocol_msg : protocol_msg QCheck.Gen.t` (uniform choice)\n\n### Packets\n- `gen_packet : packet QCheck.Gen.t`\n - Valid cluster names\n - Primary + piggyback messages\n\n### Binary data\n- `gen_cstruct : Cstruct.t QCheck.Gen.t`\n - Various sizes\n\n### Arbitrary instances\n- `arb_*` wrappers with shrinkers where useful\n\n## Design constraints\n- Use QCheck.Gen combinators\n- Generate valid data by construction\n- Include edge cases (empty strings, max values)","acceptance_criteria":"- All message types have generators\n- Generators produce valid data\n- Good distribution of test cases","status":"open","priority":2,"issue_type":"task","created_at":"2026-01-08T18:49:22.04090675+01:00","created_by":"gdiazlo","updated_at":"2026-01-08T18:49:22.04090675+01:00","labels":["qcheck","test"],"dependencies":[{"issue_id":"swim-294","depends_on_id":"swim-td8","type":"blocks","created_at":"2026-01-08T18:49:22.044472866+01:00","created_by":"gdiazlo"},{"issue_id":"swim-294","depends_on_id":"swim-wdc","type":"parent-child","created_at":"2026-01-08T18:49:26.910584411+01:00","created_by":"gdiazlo"}]} 2 + {"id":"swim-461","title":"Implement crypto tests 
(test/test_crypto.ml)","description":"Property-based and unit tests for crypto module.\n\n## Property tests\n\n### Roundtrip\n- `test_crypto_roundtrip` - encrypt then decrypt equals original\n- Test with various data sizes\n\n### Key validation\n- `test_invalid_key_length_rejected`\n- Test 31, 32, 33 byte keys\n\n## Unit tests\n\n### Encryption\n- Test output size = input size + overhead (28 bytes)\n- Test nonce is prepended\n- Test different plaintexts produce different ciphertexts\n\n### Decryption\n- Test successful decryption\n- Test tampered ciphertext fails\n- Test truncated data fails\n- Test wrong key fails\n\n### Key initialization\n- Test valid 32-byte key\n- Test invalid lengths rejected\n\n## Security tests\n- Verify nonces are unique (probabilistic)\n- Verify ciphertext differs from plaintext\n\n## Design constraints\n- Use QCheck for property tests\n- Test all error paths\n- Don't expose key material in errors","acceptance_criteria":"- All property tests pass\n- All unit tests pass\n- Security properties verified\n- Error handling tested","status":"open","priority":2,"issue_type":"task","created_at":"2026-01-08T18:49:51.401236876+01:00","created_by":"gdiazlo","updated_at":"2026-01-08T18:49:51.401236876+01:00","labels":["crypto","security","test"],"dependencies":[{"issue_id":"swim-461","depends_on_id":"swim-hc9","type":"blocks","created_at":"2026-01-08T18:49:51.404483911+01:00","created_by":"gdiazlo"},{"issue_id":"swim-461","depends_on_id":"swim-294","type":"blocks","created_at":"2026-01-08T18:49:51.405793127+01:00","created_by":"gdiazlo"},{"issue_id":"swim-461","depends_on_id":"swim-wdc","type":"parent-child","created_at":"2026-01-08T18:49:56.45969199+01:00","created_by":"gdiazlo"}]} 3 + {"id":"swim-90e","title":"Implement transport.ml - Eio UDP/TCP networking","description":"Implement network transport layer using Eio.\n\n## UDP Transport\n\n### Functions\n- `create_udp_socket : Eio.Net.t -\u003e addr:string -\u003e port:int -\u003e 
Eio.Net.datagram_socket`\n- `send_udp : Eio.Net.datagram_socket -\u003e Eio.Net.Sockaddr.datagram -\u003e Cstruct.t -\u003e unit`\n- `recv_udp : Eio.Net.datagram_socket -\u003e Cstruct.t -\u003e (int * Eio.Net.Sockaddr.datagram)`\n\n## TCP Transport (for large payloads)\n\n### Functions\n- `create_tcp_listener : Eio.Net.t -\u003e addr:string -\u003e port:int -\u003e Eio.Net.listening_socket`\n- `connect_tcp : Eio.Net.t -\u003e addr:Eio.Net.Sockaddr.stream -\u003e timeout:float -\u003e clock:Eio.Time.clock -\u003e (Eio.Net.stream_socket, send_error) result`\n- `send_tcp : Eio.Net.stream_socket -\u003e Cstruct.t -\u003e (unit, send_error) result`\n- `recv_tcp : Eio.Net.stream_socket -\u003e Cstruct.t -\u003e (int, [`Connection_reset]) result`\n\n## Address parsing\n- `parse_addr : string -\u003e (Eio.Net.Sockaddr.datagram, [`Invalid_addr]) result`\n - Parse \"host:port\" format\n\n## Design constraints\n- Use Eio.Net for all I/O\n- No blocking except Eio primitives\n- Proper error handling via Result\n- Support for IPv4 and IPv6","acceptance_criteria":"- UDP send/recv works\n- TCP connect/send/recv works\n- Proper error handling\n- Address parsing robust","status":"closed","priority":1,"issue_type":"task","created_at":"2026-01-08T18:48:09.296035344+01:00","created_by":"gdiazlo","updated_at":"2026-01-08T19:39:34.082898832+01:00","closed_at":"2026-01-08T19:39:34.082898832+01:00","close_reason":"Implemented UDP and TCP transport with Eio.Net, plus address parsing (mli skipped due to complex Eio row types)","labels":["core","eio","transport"],"dependencies":[{"issue_id":"swim-90e","depends_on_id":"swim-oun","type":"blocks","created_at":"2026-01-08T18:48:09.299855321+01:00","created_by":"gdiazlo"},{"issue_id":"swim-90e","depends_on_id":"swim-wdc","type":"parent-child","created_at":"2026-01-08T18:48:15.52111057+01:00","created_by":"gdiazlo"}]} 4 + {"id":"swim-don","title":"Implement benchmarks (bench/)","description":"Performance benchmarks for critical paths.\n\n## 
bench/bench_codec.ml\n- `bench_encode_ping` - encoding a Ping message\n- `bench_encode_packet` - full packet with piggyback\n- `bench_decode_packet` - decoding a packet\n- `bench_encoded_size` - size calculation\n\n## bench/bench_crypto.ml\n- `bench_encrypt` - encryption throughput\n- `bench_decrypt` - decryption throughput\n- `bench_key_init` - key initialization\n\n## bench/bench_throughput.ml\n- `bench_broadcast_throughput` - messages/second\n- `bench_probe_cycle` - probe cycle latency\n- `bench_concurrent_probes` - parallel probe handling\n\n## bench/bench_allocations.ml\n- `bench_probe_cycle_allocations` - count allocations per probe\n- `bench_buffer_reuse_rate` - % of buffers reused\n- `bench_message_handling_allocations` - allocations per message\n\n## Performance targets to verify\n- \u003c 5 allocations per probe cycle\n- \u003e 95% buffer reuse rate\n- \u003c 3 seconds failure detection\n- \u003e 10,000 broadcast/sec\n- \u003c 1% CPU idle, \u003c 5% under load\n\n## Design constraints\n- Use core_bench or similar\n- Warm up before measuring\n- Multiple iterations for stability\n- Report with confidence intervals","acceptance_criteria":"- All benchmarks run\n- Performance targets documented\n- Regression detection possible\n- Results reproducible","status":"open","priority":3,"issue_type":"task","created_at":"2026-01-08T18:50:57.818433013+01:00","created_by":"gdiazlo","updated_at":"2026-01-08T18:50:57.818433013+01:00","labels":["bench","performance"],"dependencies":[{"issue_id":"swim-don","depends_on_id":"swim-zsi","type":"blocks","created_at":"2026-01-08T18:50:57.821397737+01:00","created_by":"gdiazlo"},{"issue_id":"swim-don","depends_on_id":"swim-wdc","type":"parent-child","created_at":"2026-01-08T18:51:03.066326187+01:00","created_by":"gdiazlo"}]} 5 + {"id":"swim-etm","title":"Implement pending_acks.ml - Ack tracking with promises","description":"Implement pending ack tracking for probe responses.\n\n## Pending_acks module\n```ocaml\ntype waiter = {\n 
promise : string option Eio.Promise.t;\n resolver : string option Eio.Promise.u;\n}\n\ntype t = {\n table : (int, waiter) Kcas_data.Hashtbl.t;\n}\n```\n\n### Functions\n- `create : unit -\u003e t`\n\n- `register : t -\u003e seq:int -\u003e waiter`\n - Create promise/resolver pair\n - Store in hashtable keyed by sequence number\n - Return waiter handle\n\n- `complete : t -\u003e seq:int -\u003e payload:string option -\u003e bool`\n - Find waiter by seq\n - Resolve promise with payload\n - Remove from table\n - Return true if found\n\n- `wait : waiter -\u003e timeout:float -\u003e clock:Eio.Time.clock -\u003e string option option`\n - Wait for promise with timeout\n - Return Some payload on success\n - Return None on timeout\n\n- `cancel : t -\u003e seq:int -\u003e unit`\n - Remove waiter from table\n - Called on timeout to cleanup\n\n## Design constraints\n- Use Eio.Promise for async waiting\n- Use Eio.Time.with_timeout for timeouts\n- Lock-free via Kcas_data.Hashtbl\n- Cleanup on timeout to prevent leaks","acceptance_criteria":"- Acks properly matched to probes\n- Timeouts work correctly\n- No memory leaks on timeout\n- Concurrent completion safe","status":"closed","priority":1,"issue_type":"task","created_at":"2026-01-08T18:47:51.390307674+01:00","created_by":"gdiazlo","updated_at":"2026-01-08T19:35:56.984403853+01:00","closed_at":"2026-01-08T19:35:56.984403853+01:00","close_reason":"Implemented pending_acks with Eio.Promise for async waiting and Kcas_data.Hashtbl for lock-free storage","labels":["core","kcas","protocol"],"dependencies":[{"issue_id":"swim-etm","depends_on_id":"swim-oun","type":"blocks","created_at":"2026-01-08T18:47:51.394677184+01:00","created_by":"gdiazlo"},{"issue_id":"swim-etm","depends_on_id":"swim-wdc","type":"parent-child","created_at":"2026-01-08T18:47:57.657173744+01:00","created_by":"gdiazlo"}]} 6 + {"id":"swim-fac","title":"Implement protocol_pure.ml - Pure SWIM state transitions","description":"Implement pure (no effects) SWIM protocol 
logic for state transitions.\n\n## Core abstraction\n```ocaml\ntype 'a transition = {\n new_state : 'a;\n broadcasts : protocol_msg list;\n events : node_event list;\n}\n```\n\n## State transition functions\n- `handle_alive : member_state -\u003e alive_msg -\u003e now:float -\u003e member_state transition`\n- `handle_suspect : member_state -\u003e suspect_msg -\u003e now:float -\u003e member_state transition`\n- `handle_dead : member_state -\u003e dead_msg -\u003e now:float -\u003e member_state transition`\n- `handle_ack : probe_state -\u003e ack_msg -\u003e probe_state transition`\n\n## Timeout calculations\n- `suspicion_timeout : config -\u003e node_count:int -\u003e float`\n - Based on suspicion_mult and log(node_count)\n - Capped by suspicion_max_timeout\n\n## Probe target selection\n- `next_probe_target : probe_index:int -\u003e members:node list -\u003e (node * int) option`\n - Round-robin with wraparound\n - Skip self\n\n## Message invalidation (for queue pruning)\n- `invalidates : protocol_msg -\u003e protocol_msg -\u003e bool`\n - Alive invalidates Suspect for same node with \u003e= incarnation\n - Dead invalidates everything for same node\n - Suspect invalidates older Suspect\n\n## State merging\n- `merge_member_state : local:member_state -\u003e remote:member_state -\u003e member_state`\n - CRDT-style merge based on incarnation\n - Dead is final (tombstone)\n - Higher incarnation wins\n\n## Retransmit calculation\n- `retransmit_limit : config -\u003e node_count:int -\u003e int`\n - Based on retransmit_mult * ceil(log(node_count + 1))\n\n## Design constraints\n- PURE functions only - no I/O, no time, no randomness\n- All inputs explicit\n- Exhaustive pattern matching\n- Fully testable with property-based tests","acceptance_criteria":"- All functions are pure (no effects)\n- Property-based tests for SWIM invariants\n- Incarnation ordering correct\n- Suspicion timeout formula matches SWIM 
paper","status":"closed","priority":1,"issue_type":"task","created_at":"2026-01-08T18:46:48.400928801+01:00","created_by":"gdiazlo","updated_at":"2026-01-08T19:29:29.816719466+01:00","closed_at":"2026-01-08T19:29:29.816719466+01:00","close_reason":"Implemented all pure SWIM state transitions: handle_alive, handle_suspect, handle_dead, suspicion_timeout, retransmit_limit, next_probe_target, invalidates, merge_member_state, select_indirect_targets","labels":["core","protocol","pure"],"dependencies":[{"issue_id":"swim-fac","depends_on_id":"swim-td8","type":"blocks","created_at":"2026-01-08T18:46:48.40501031+01:00","created_by":"gdiazlo"},{"issue_id":"swim-fac","depends_on_id":"swim-wdc","type":"parent-child","created_at":"2026-01-08T18:46:52.770706917+01:00","created_by":"gdiazlo"}]} 7 + {"id":"swim-hc9","title":"Implement crypto.ml - AES-256-GCM encryption","description":"Implement encryption layer using mirage-crypto for AES-256-GCM.\n\n## Constants\n- `nonce_size = 12`\n- `tag_size = 16`\n- `overhead = nonce_size + tag_size` (28 bytes)\n\n## Functions\n\n### Key initialization\n- `init_key : string -\u003e (key, [`Invalid_key_length]) result`\n- Must be exactly 32 bytes for AES-256\n\n### Encryption\n- `encrypt : key -\u003e Cstruct.t -\u003e Cstruct.t`\n- Generate random nonce via mirage-crypto-rng\n- Prepend nonce to ciphertext\n- Result: nonce (12) + ciphertext + tag (16)\n\n### Decryption\n- `decrypt : key -\u003e Cstruct.t -\u003e (Cstruct.t, [`Too_short | `Decryption_failed]) result`\n- Extract nonce from first 12 bytes\n- Verify and decrypt remaining data\n- Return plaintext or error\n\n## Design constraints\n- Use mirage-crypto.Cipher_block.AES.GCM\n- Use mirage-crypto-rng for nonce generation\n- Return Result types, no exceptions\n- Consider in-place decryption where possible","acceptance_criteria":"- Property-based roundtrip tests pass\n- Invalid data properly rejected\n- Key validation works\n- Nonces are unique (use 
RNG)","status":"closed","priority":1,"issue_type":"task","created_at":"2026-01-08T18:46:09.946405585+01:00","created_by":"gdiazlo","updated_at":"2026-01-08T19:24:49.736202746+01:00","closed_at":"2026-01-08T19:24:49.736202746+01:00","close_reason":"Implemented crypto.ml with AES-256-GCM using mirage-crypto. Uses Eio.Flow for secure random nonce generation.","labels":["core","crypto","security"],"dependencies":[{"issue_id":"swim-hc9","depends_on_id":"swim-oun","type":"blocks","created_at":"2026-01-08T18:46:09.950083952+01:00","created_by":"gdiazlo"},{"issue_id":"swim-hc9","depends_on_id":"swim-wdc","type":"parent-child","created_at":"2026-01-08T18:46:14.608204384+01:00","created_by":"gdiazlo"}]} 8 + {"id":"swim-iwg","title":"Implement dissemination.ml - Broadcast queue with invalidation","description":"Implement the broadcast queue for SWIM protocol message dissemination.\n\n## Broadcast_queue module\n```ocaml\ntype item = {\n msg : protocol_msg;\n transmits : int Kcas.Loc.t;\n created : Mtime.span;\n}\n\ntype t = {\n queue : item Kcas_data.Queue.t;\n depth : int Kcas.Loc.t;\n}\n```\n\n### Functions\n- `create : unit -\u003e t`\n\n- `enqueue : t -\u003e protocol_msg -\u003e transmits:int -\u003e created:Mtime.span -\u003e unit`\n - Add message with initial transmit count\n - Increment depth\n\n- `drain : t -\u003e max_bytes:int -\u003e encode_size:(protocol_msg -\u003e int) -\u003e protocol_msg list`\n - Pop messages up to max_bytes\n - Decrement transmit count\n - Re-enqueue if transmits \u003e 0\n - Return list of messages to piggyback\n\n- `depth : t -\u003e int`\n\n- `invalidate : t -\u003e invalidates:(protocol_msg -\u003e protocol_msg -\u003e bool) -\u003e protocol_msg -\u003e unit`\n - Remove messages invalidated by newer message\n - Uses Protocol_pure.invalidates\n\n## Design constraints\n- Lock-free via Kcas_data.Queue\n- Transmit counting for reliable dissemination\n- Size-aware draining for UDP packet limits\n- Message invalidation to prune stale 
updates","acceptance_criteria":"- Messages properly disseminated\n- Transmit counts respected\n- Invalidation works correctly\n- No message loss during concurrent access","status":"closed","priority":1,"issue_type":"task","created_at":"2026-01-08T18:47:32.926237507+01:00","created_by":"gdiazlo","updated_at":"2026-01-08T19:34:04.973053383+01:00","closed_at":"2026-01-08T19:34:04.973053383+01:00","close_reason":"Implemented broadcast queue with enqueue, drain (size-aware), and invalidate functions using Kcas_data.Queue","labels":["core","dissemination","kcas"],"dependencies":[{"issue_id":"swim-iwg","depends_on_id":"swim-td8","type":"blocks","created_at":"2026-01-08T18:47:32.933998652+01:00","created_by":"gdiazlo"},{"issue_id":"swim-iwg","depends_on_id":"swim-fac","type":"blocks","created_at":"2026-01-08T18:47:32.93580631+01:00","created_by":"gdiazlo"},{"issue_id":"swim-iwg","depends_on_id":"swim-wdc","type":"parent-child","created_at":"2026-01-08T18:47:40.222942145+01:00","created_by":"gdiazlo"}]} 9 + {"id":"swim-l32","title":"Implement codec tests (test/test_codec.ml)","description":"Property-based and unit tests for codec module.\n\n## Property tests\n\n### Roundtrip\n- `test_codec_roundtrip` - encode then decode equals original\n- `test_encoder_decoder_roundtrip` - for primitive types\n\n### Size calculation\n- `test_encoded_size_accurate` - encoded_size matches actual encoding\n\n### Error handling\n- `test_invalid_magic_rejected`\n- `test_unsupported_version_rejected`\n- `test_truncated_message_rejected`\n- `test_invalid_tag_rejected`\n\n## Unit tests\n\n### Encoder\n- Test write_byte, write_int16_be, etc.\n- Test write_string with various lengths\n- Test buffer overflow detection\n\n### Decoder\n- Test read operations\n- Test remaining/is_empty\n- Test boundary conditions\n\n### Message encoding\n- Test each message type individually\n- Test packet with piggyback messages\n- Test empty piggyback list\n\n## Design constraints\n- Use QCheck for property tests\n- 
Use Alcotest or similar for unit tests\n- Cover all message types\n- Test error paths","acceptance_criteria":"- All property tests pass\n- All unit tests pass\n- Edge cases covered\n- Error handling tested","status":"open","priority":2,"issue_type":"task","created_at":"2026-01-08T18:49:38.017959466+01:00","created_by":"gdiazlo","updated_at":"2026-01-08T18:49:38.017959466+01:00","labels":["codec","test"],"dependencies":[{"issue_id":"swim-l32","depends_on_id":"swim-l5y","type":"blocks","created_at":"2026-01-08T18:49:38.021527282+01:00","created_by":"gdiazlo"},{"issue_id":"swim-l32","depends_on_id":"swim-294","type":"blocks","created_at":"2026-01-08T18:49:38.02331756+01:00","created_by":"gdiazlo"},{"issue_id":"swim-l32","depends_on_id":"swim-wdc","type":"parent-child","created_at":"2026-01-08T18:49:42.065502393+01:00","created_by":"gdiazlo"}]} 10 + {"id":"swim-l5y","title":"Implement codec.ml - Zero-copy binary encoding/decoding","description":"Implement binary encoding/decoding with zero-copy semantics using Cstruct.\n\n## Components\n\n### Encoder module\n- `type t` with buf and mutable pos\n- `create : buf:Cstruct.t -\u003e t`\n- `write_byte`, `write_int16_be`, `write_int32_be`, `write_int64_be`\n- `write_string` (length-prefixed)\n- `write_bytes`\n- `to_cstruct` - returns view, no copy\n- `reset`, `remaining`\n\n### Decoder module\n- `type t` with buf and mutable pos\n- `create : Cstruct.t -\u003e t`\n- `read_byte`, `read_int16_be`, `read_int32_be`, `read_int64_be`\n- `read_string` - returns string (must copy for safety)\n- `read_bytes` - returns Cstruct view\n- `remaining`, `is_empty`\n\n### Codec module\n- Magic bytes: \"SWIM\"\n- Version: 1\n- Message tags: 0x01-0x07 for each message type\n- `encode_packet : packet -\u003e buf:Cstruct.t -\u003e (int, [`Buffer_too_small]) result`\n- `decode_packet : Cstruct.t -\u003e packet decode_result`\n- `encoded_size : protocol_msg -\u003e int` for queue draining\n\n### Helper encoders\n- `encode_node`, `encode_node_id`\n- 
`encode_option`\n- `decode_msg`\n\n## Design constraints\n- No allocations in hot path except unavoidable string creation\n- Return Result types, no exceptions\n- Use Cstruct sub-views where possible","acceptance_criteria":"- Property-based roundtrip tests pass\n- No unnecessary allocations\n- All message types encode/decode correctly\n- Error handling for truncated/invalid data","status":"closed","priority":1,"issue_type":"task","created_at":"2026-01-08T18:45:54.407900731+01:00","created_by":"gdiazlo","updated_at":"2026-01-08T19:23:12.726852552+01:00","closed_at":"2026-01-08T19:23:12.726852552+01:00","close_reason":"Implemented codec.ml with Encoder/Decoder modules, zero-copy encoding/decoding for all protocol messages, IP address parsing, and encoded_size calculation","labels":["codec","core","zero-copy"],"dependencies":[{"issue_id":"swim-l5y","depends_on_id":"swim-td8","type":"blocks","created_at":"2026-01-08T18:45:54.412742463+01:00","created_by":"gdiazlo"},{"issue_id":"swim-l5y","depends_on_id":"swim-wdc","type":"parent-child","created_at":"2026-01-08T18:45:59.779010836+01:00","created_by":"gdiazlo"}]} 11 + {"id":"swim-oll","title":"Implement membership.ml - Kcas-based member table","description":"Implement lock-free membership state management using kcas and kcas_data.\n\n## Member module\n```ocaml\ntype t = {\n node : node_info; (* Immutable *)\n state : member_state Kcas.Loc.t;\n incarnation : incarnation Kcas.Loc.t;\n state_change_time : Mtime.span Kcas.Loc.t;\n last_ack_time : Mtime.span Kcas.Loc.t;\n}\n```\n\n### Functions\n- `create : node_info -\u003e t`\n- `node : t -\u003e node_info` (pure accessor)\n- `get_state`, `get_incarnation`, `get_last_ack` (kcas reads)\n- `set_alive`, `set_suspect`, `set_dead` with `~xt:Kcas.Xt.t`\n- `record_ack : t -\u003e now:Mtime.span -\u003e xt:Kcas.Xt.t -\u003e unit`\n- `snapshot : t -\u003e xt:Kcas.Xt.t -\u003e member_snapshot`\n\n## Membership module\n```ocaml\ntype t = {\n table : (string, Member.t) 
Kcas_data.Hashtbl.t;\n count : int Kcas.Loc.t;\n}\n```\n\n### Functions\n- `create : unit -\u003e t`\n- `add : t -\u003e Member.t -\u003e unit`\n- `remove : t -\u003e node_id -\u003e unit` (returns bool for success)\n- `find : t -\u003e node_id -\u003e Member.t option`\n- `mem : t -\u003e node_id -\u003e bool`\n- `to_list : t -\u003e Member.t list` (snapshot)\n- `count : t -\u003e int`\n- `update_member : t -\u003e node_id -\u003e (Member.t -\u003e xt:Kcas.Xt.t -\u003e unit) -\u003e bool`\n\n## Design constraints\n- All state via kcas locations\n- Use Kcas_data.Hashtbl for lock-free hashtable\n- Transactional updates via Kcas.Xt.commit\n- No I/O inside transactions\n- Short transactions only","acceptance_criteria":"- Lock-free operations work correctly\n- Concurrent access safe\n- Atomic state transitions\n- Snapshot consistency","status":"closed","priority":1,"issue_type":"task","created_at":"2026-01-08T18:47:11.022624275+01:00","created_by":"gdiazlo","updated_at":"2026-01-08T19:33:07.449792483+01:00","closed_at":"2026-01-08T19:33:07.449792483+01:00","close_reason":"Implemented Member module with kcas locations and Membership table with Kcas_data.Hashtbl","labels":["core","kcas","membership"],"dependencies":[{"issue_id":"swim-oll","depends_on_id":"swim-td8","type":"blocks","created_at":"2026-01-08T18:47:11.047048045+01:00","created_by":"gdiazlo"},{"issue_id":"swim-oll","depends_on_id":"swim-wdc","type":"parent-child","created_at":"2026-01-08T18:47:20.00544253+01:00","created_by":"gdiazlo"}]} 12 + {"id":"swim-oun","title":"Project setup: dune-project, opam, dependencies","description":"Set up the project structure and dependencies for the SWIM library.\n\n## Tasks\n1. Update dune-project with proper metadata and dependencies\n2. Configure swim.opam with all required dependencies:\n - eio (\u003e= 1.0)\n - kcas (\u003e= 0.7)\n - kcas_data (\u003e= 0.7)\n - mirage-crypto\n - mirage-crypto-rng\n - cstruct\n - qcheck (for testing)\n3. 
Create lib/dune with proper library configuration\n4. Create test/dune for test configuration\n5. Create bench/dune for benchmarks (optional initially)\n6. Verify project builds with `dune build`","acceptance_criteria":"- dune build succeeds\n- opam install . --deps-only works\n- All dependencies available","status":"closed","priority":1,"issue_type":"task","created_at":"2026-01-08T18:45:16.711747605+01:00","created_by":"gdiazlo","updated_at":"2026-01-08T19:13:17.972217465+01:00","closed_at":"2026-01-08T19:13:17.972217465+01:00","close_reason":"Project setup complete: dune-project, lib/dune, test/dune configured. Build and tests pass.","labels":["infrastructure","setup"],"dependencies":[{"issue_id":"swim-oun","depends_on_id":"swim-wdc","type":"parent-child","created_at":"2026-01-08T18:45:20.330948173+01:00","created_by":"gdiazlo"}]} 13 + {"id":"swim-szx","title":"Implement kcas data structure tests (test/test_kcas.ml)","description":"Concurrent correctness tests for kcas-based data structures.\n\n## Buffer_pool tests\n- `test_buffer_pool_no_leaks` - all acquired buffers released\n- `test_buffer_pool_concurrent` - multiple fibers acquiring/releasing\n- `test_with_buffer_exception_safe` - buffer released on exception\n\n## Membership tests\n- `test_membership_concurrent_add_remove` - no lost updates\n- `test_membership_snapshot_consistency` - to_list is consistent\n- `test_membership_count_accurate` - count matches actual\n\n## Broadcast_queue tests\n- `test_broadcast_queue_fifo` - messages dequeued in order\n- `test_broadcast_queue_transmit_counting` - transmits decremented correctly\n- `test_broadcast_queue_invalidation` - old messages pruned\n- `test_broadcast_queue_concurrent` - concurrent enqueue/drain safe\n\n## Pending_acks tests\n- `test_pending_acks_complete` - ack resolves waiter\n- `test_pending_acks_timeout` - timeout returns None\n- `test_pending_acks_cancel` - cancel removes waiter\n- `test_pending_acks_concurrent` - multiple pending acks\n\n## 
Transactional tests\n- `test_atomic_member_update` - multi-location update is atomic\n- `test_transaction_retry` - conflicting transactions retry\n\n## Design constraints\n- Use Eio for concurrency\n- Test with multiple domains if possible\n- Verify linearizability properties","acceptance_criteria":"- All concurrent tests pass\n- No race conditions\n- Atomicity verified\n- Stress tests pass","status":"open","priority":2,"issue_type":"task","created_at":"2026-01-08T18:50:25.944980162+01:00","created_by":"gdiazlo","updated_at":"2026-01-08T18:50:25.944980162+01:00","labels":["concurrency","kcas","test"],"dependencies":[{"issue_id":"swim-szx","depends_on_id":"swim-xoo","type":"blocks","created_at":"2026-01-08T18:50:25.94903667+01:00","created_by":"gdiazlo"},{"issue_id":"swim-szx","depends_on_id":"swim-oll","type":"blocks","created_at":"2026-01-08T18:50:25.950569487+01:00","created_by":"gdiazlo"},{"issue_id":"swim-szx","depends_on_id":"swim-iwg","type":"blocks","created_at":"2026-01-08T18:50:25.951465481+01:00","created_by":"gdiazlo"},{"issue_id":"swim-szx","depends_on_id":"swim-etm","type":"blocks","created_at":"2026-01-08T18:50:25.952262505+01:00","created_by":"gdiazlo"},{"issue_id":"swim-szx","depends_on_id":"swim-wdc","type":"parent-child","created_at":"2026-01-08T18:50:30.713954321+01:00","created_by":"gdiazlo"}]} 14 + {"id":"swim-t28","title":"Implement protocol.ml - Main protocol loop and handlers","description":"Implement the effectful protocol runner that applies pure transitions.\n\n## Main Cluster Type\n```ocaml\ntype t = {\n config : config;\n env : env;\n self : node;\n members : Membership.t;\n incarnation : int Kcas.Loc.t;\n sequence : int Kcas.Loc.t;\n broadcast_queue : Broadcast_queue.t;\n pending_acks : Pending_acks.t;\n probe_index : int Kcas.Loc.t;\n send_pool : Buffer_pool.t;\n recv_pool : Buffer_pool.t;\n udp_sock : Eio.Net.datagram_socket;\n tcp_listener : Eio.Net.listening_socket;\n event_stream : node_event Eio.Stream.t;\n handlers : (node 
-\u003e string -\u003e string -\u003e unit) list Kcas.Loc.t;\n cipher_key : Mirage_crypto.Cipher_block.AES.GCM.key;\n stats : stats Kcas.Loc.t;\n shutdown : bool Kcas.Loc.t;\n}\n```\n\n## Protocol Loop\n- `run_protocol : t -\u003e unit`\n - Main loop: probe cycle, timing, check shutdown\n - Use Protocol_pure for state transitions\n\n- `probe_cycle : t -\u003e Member.t -\u003e unit`\n - Get sequence number\n - Drain piggyback messages\n - Send ping\n - Wait for ack with timeout\n - On timeout: indirect probe\n\n- `indirect_probe : t -\u003e Member.t -\u003e seq:int -\u003e now:Mtime.span -\u003e unit`\n - Select k random members\n - Send ping_req through them\n - Wait for any ack\n\n## Receive Loop\n- `run_udp_receiver : t -\u003e unit`\n - Acquire buffer from pool\n - Receive packet\n - Fork fiber for processing\n - Release buffer after processing\n\n- `handle_udp_packet : t -\u003e buf:Cstruct.t -\u003e addr:Eio.Net.Sockaddr.datagram -\u003e unit`\n - Decrypt\n - Decode\n - Dispatch to handler\n\n## Message Handlers\n- `handle_packet : t -\u003e addr:Eio.Net.Sockaddr.datagram -\u003e packet -\u003e unit`\n- `handle_ping : t -\u003e Ping.t -\u003e unit`\n- `handle_ping_req : t -\u003e Ping_req.t -\u003e unit`\n- `handle_ack : t -\u003e Ack.t -\u003e unit`\n- `handle_alive : t -\u003e Alive.t -\u003e unit`\n- `handle_suspect : t -\u003e Suspect.t -\u003e unit`\n- `handle_dead : t -\u003e Dead.t -\u003e unit`\n- `handle_user_msg : t -\u003e User_msg.t -\u003e unit`\n\n## Design constraints\n- Thin effectful wrapper over Protocol_pure\n- Use kcas for all state\n- Buffer pool for zero-copy I/O\n- Fork fibers for concurrent handling","acceptance_criteria":"- Protocol loop runs correctly\n- Probe cycles at configured interval\n- All message types handled\n- Stats updated 
accurately","status":"closed","priority":1,"issue_type":"task","created_at":"2026-01-08T18:48:36.304687885+01:00","created_by":"gdiazlo","updated_at":"2026-01-08T19:53:04.782054511+01:00","closed_at":"2026-01-08T19:53:04.782054511+01:00","close_reason":"Implemented main protocol loop with all message handlers, probe cycles, indirect probing, UDP receiver, and cluster state management","labels":["core","eio","protocol"],"dependencies":[{"issue_id":"swim-t28","depends_on_id":"swim-td8","type":"blocks","created_at":"2026-01-08T18:48:36.308642743+01:00","created_by":"gdiazlo"},{"issue_id":"swim-t28","depends_on_id":"swim-l5y","type":"blocks","created_at":"2026-01-08T18:48:36.310137809+01:00","created_by":"gdiazlo"},{"issue_id":"swim-t28","depends_on_id":"swim-hc9","type":"blocks","created_at":"2026-01-08T18:48:36.310988083+01:00","created_by":"gdiazlo"},{"issue_id":"swim-t28","depends_on_id":"swim-xoo","type":"blocks","created_at":"2026-01-08T18:48:36.311690387+01:00","created_by":"gdiazlo"},{"issue_id":"swim-t28","depends_on_id":"swim-fac","type":"blocks","created_at":"2026-01-08T18:48:36.3123488+01:00","created_by":"gdiazlo"},{"issue_id":"swim-t28","depends_on_id":"swim-oll","type":"blocks","created_at":"2026-01-08T18:48:36.313012122+01:00","created_by":"gdiazlo"},{"issue_id":"swim-t28","depends_on_id":"swim-iwg","type":"blocks","created_at":"2026-01-08T18:48:36.313695305+01:00","created_by":"gdiazlo"},{"issue_id":"swim-t28","depends_on_id":"swim-etm","type":"blocks","created_at":"2026-01-08T18:48:36.314462189+01:00","created_by":"gdiazlo"},{"issue_id":"swim-t28","depends_on_id":"swim-90e","type":"blocks","created_at":"2026-01-08T18:48:36.315296073+01:00","created_by":"gdiazlo"},{"issue_id":"swim-t28","depends_on_id":"swim-wdc","type":"parent-child","created_at":"2026-01-08T18:48:48.416247923+01:00","created_by":"gdiazlo"}]} 15 + {"id":"swim-td8","title":"Implement types.ml - Immutable message and node types","description":"Create the core immutable types for the 
SWIM protocol.\n\n## Types to implement\n\n### Node identification\n- `node_id = Node_id of string [@@unboxed]`\n- `incarnation = Incarnation of int [@@unboxed]`\n\n### Node information\n- `node_info` record with id, addr (Eio.Net.Sockaddr.datagram), meta\n\n### Member state\n- `member_state = Alive | Suspect | Dead`\n- `member_snapshot` record for pure operations\n\n### Protocol messages (pattern-matchable variants)\n- `Ping of { seq; sender }`\n- `Ping_req of { seq; target; sender }`\n- `Ack of { seq; responder; payload }`\n- `Alive of { node; incarnation }`\n- `Suspect of { node; incarnation; suspector }`\n- `Dead of { node; incarnation; declarator }`\n- `User_msg of { topic; payload; origin }`\n\n### Packet structure\n- `packet = { cluster; primary; piggyback }`\n\n### Error types\n- `decode_error` variants\n- `send_error` variants\n\n### Configuration\n- `config` record with all SWIM parameters\n- `default_config` value\n\n### Environment\n- `env` record with Eio dependencies (net, clock, mono_clock, random, sw)\n\n## Design constraints\n- All types immutable\n- Use [@@unboxed] where appropriate for performance\n- Pattern-matchable variants for protocol messages","acceptance_criteria":"- All types defined with proper signatures in types.mli\n- Types compile with dune build\n- No mutable fields except where kcas will manage them","status":"closed","priority":1,"issue_type":"task","created_at":"2026-01-08T18:45:34.790084068+01:00","created_by":"gdiazlo","updated_at":"2026-01-08T19:16:46.941262108+01:00","closed_at":"2026-01-08T19:16:46.941262108+01:00","close_reason":"Implemented types.ml and types.mli with all core types: node_id, incarnation, node_info, member_state, protocol_msg, packet, decode_error, send_error, node_event, config, env, 
stats","labels":["core","types"],"dependencies":[{"issue_id":"swim-td8","depends_on_id":"swim-oun","type":"blocks","created_at":"2026-01-08T18:45:34.794012265+01:00","created_by":"gdiazlo"},{"issue_id":"swim-td8","depends_on_id":"swim-wdc","type":"parent-child","created_at":"2026-01-08T18:45:39.489609655+01:00","created_by":"gdiazlo"}]} 16 + {"id":"swim-w4y","title":"Implement protocol_pure tests (test/test_pure.ml)","description":"Property-based tests for pure SWIM logic.\n\n## State transition properties\n\n### Incarnation ordering\n- `test_alive_dominates_suspect` - Alive with \u003e= incarnation beats Suspect\n- `test_higher_incarnation_wins` - Higher incarnation always dominates\n- `test_dead_is_final` - Dead state cannot be overridden\n\n### Message invalidation\n- `test_invalidation_transitive` - if A invalidates B and B invalidates C, A invalidates C\n- `test_alive_invalidates_suspect` - for same node with \u003e= incarnation\n- `test_dead_invalidates_all` - Dead invalidates Alive and Suspect for same node\n\n### Merge properties\n- `test_merge_commutative` - merge(a, b) = merge(b, a)\n- `test_merge_idempotent` - merge(a, a) = a\n- `test_merge_respects_incarnation` - higher incarnation wins\n\n### Timeout calculation\n- `test_suspicion_timeout_increases_with_nodes` - more nodes = longer timeout\n- `test_suspicion_timeout_bounded` - never exceeds max\n\n### Probe target selection\n- `test_probe_wraps_around` - index wraps at list end\n- `test_probe_skips_self` - self is never selected\n\n## Unit tests\n- Test specific transition scenarios\n- Test edge cases (empty member list, incarnation 0, etc.)\n\n## Design constraints\n- All tests on pure functions\n- No I/O or effects in tests\n- Comprehensive property coverage","acceptance_criteria":"- All SWIM invariants tested\n- Properties match SWIM paper\n- Edge cases covered\n- All tests 
pass","status":"open","priority":2,"issue_type":"task","created_at":"2026-01-08T18:50:08.398465616+01:00","created_by":"gdiazlo","updated_at":"2026-01-08T18:50:08.398465616+01:00","labels":["protocol","pure","test"],"dependencies":[{"issue_id":"swim-w4y","depends_on_id":"swim-fac","type":"blocks","created_at":"2026-01-08T18:50:08.402396924+01:00","created_by":"gdiazlo"},{"issue_id":"swim-w4y","depends_on_id":"swim-294","type":"blocks","created_at":"2026-01-08T18:50:08.40380169+01:00","created_by":"gdiazlo"},{"issue_id":"swim-w4y","depends_on_id":"swim-wdc","type":"parent-child","created_at":"2026-01-08T18:50:13.114782761+01:00","created_by":"gdiazlo"}]} 17 + {"id":"swim-wdc","title":"SWIM Protocol Library Implementation","description":"Production-ready SWIM (Scalable Weakly-consistent Infection-style Process Group Membership) protocol library in OCaml 5 for cluster membership, failure detection, and lightweight pub/sub messaging.\n\n## Core Design Principles\n- Pure functions by default, separate pure logic from effectful operations\n- Immutable data structures, mutations only through kcas\n- Zero-copy buffer management with buffer pools\n- Lock-free coordination via kcas/kcas_data\n- AES-256-GCM encryption\n\n## Dependencies (allowed)\n- eio (\u003e= 1.0), kcas (\u003e= 0.7), kcas_data (\u003e= 0.7)\n- mirage-crypto, mirage-crypto-rng, cstruct\n\n## Target Scale\n- Up to 100 nodes\n- Sub-second failure detection\n- Optimized for datacenter/cloud environments","acceptance_criteria":"- All 11 core modules implemented\n- Property-based tests for pure functions\n- Integration tests passing\n- Build and opam package working\n- Performance targets met (\u003c 5 allocations/probe, \u003e 95% buffer reuse)","status":"open","priority":1,"issue_type":"epic","created_at":"2026-01-08T18:45:08.49485159+01:00","created_by":"gdiazlo","updated_at":"2026-01-08T18:45:08.49485159+01:00","labels":["epic","ocaml5","swim"]} 18 + {"id":"swim-wwr","title":"Implement integration tests 
(test/test_integration.ml)","description":"End-to-end integration tests for the SWIM library.\n\n## Two-node tests\n- `test_two_node_join` - node2 joins node1, both see each other\n- `test_two_node_leave` - graceful leave propagates\n- `test_two_node_broadcast` - broadcast message received\n- `test_two_node_direct_send` - direct TCP message delivered\n\n## Three-node tests\n- `test_gossip_propagation` - message reaches all nodes\n- `test_indirect_probe` - indirect probe detects alive node\n- `test_failure_detection` - dead node detected and removed\n\n## Failure scenarios\n- `test_network_partition` - nodes handle partition\n- `test_node_crash` - crashed node detected as dead\n- `test_rejoin_after_crash` - node can rejoin after restart\n\n## Metadata tests\n- `test_metadata_propagation` - metadata updates reach all nodes\n- `test_metadata_update` - updated metadata replaces old\n\n## Event stream tests\n- `test_join_event_fired` - Join event on new member\n- `test_leave_event_fired` - Leave event on departure\n- `test_suspect_event_fired` - Suspect event on probe timeout\n\n## Performance tests\n- `test_convergence_time` - cluster converges within expected time\n- `test_message_throughput` - broadcast rate meets target\n\n## Design constraints\n- Use Eio_main.run for all tests\n- Proper cleanup with shutdown\n- Realistic timing (but accelerated)\n- Isolated network per test","acceptance_criteria":"- All integration tests pass\n- Failure scenarios handled\n- Performance targets met\n- Clean 
teardown","status":"open","priority":2,"issue_type":"task","created_at":"2026-01-08T18:50:43.333077327+01:00","created_by":"gdiazlo","updated_at":"2026-01-08T18:50:43.333077327+01:00","labels":["integration","test"],"dependencies":[{"issue_id":"swim-wwr","depends_on_id":"swim-zsi","type":"blocks","created_at":"2026-01-08T18:50:43.337480017+01:00","created_by":"gdiazlo"},{"issue_id":"swim-wwr","depends_on_id":"swim-wdc","type":"parent-child","created_at":"2026-01-08T18:50:46.783801496+01:00","created_by":"gdiazlo"}]} 19 + {"id":"swim-xoo","title":"Implement buffer_pool.ml - Buffer management with kcas_data","description":"Implement buffer pool for zero-copy network I/O.\n\n## Buffer_pool module\n\n### Type\n```ocaml\ntype t = {\n buffers : Cstruct.t Kcas_data.Queue.t;\n size : int;\n total : int;\n semaphore : Eio.Semaphore.t;\n}\n```\n\n### Functions\n- `create : size:int -\u003e count:int -\u003e t`\n - Pre-allocate `count` buffers of `size` bytes\n - Use Kcas_data.Queue for lock-free storage\n - Eio.Semaphore for blocking acquire\n\n- `acquire : t -\u003e Cstruct.t`\n - Block on semaphore if no buffers\n - Pop from queue\n - Reset buffer (memset 0) before returning\n\n- `try_acquire : t -\u003e Cstruct.t option`\n - Non-blocking acquire\n - Return None if no buffers available\n\n- `release : t -\u003e Cstruct.t -\u003e unit`\n - Push buffer back to queue\n - Release semaphore\n\n- `with_buffer : t -\u003e (Cstruct.t -\u003e 'a) -\u003e 'a`\n - RAII-style acquire/release\n - Use Fun.protect for exception safety\n\n- `available : t -\u003e int` - current available count\n- `total : t -\u003e int` - total pool size\n\n## Design constraints\n- Lock-free queue via kcas_data\n- Semaphore for blocking (only blocking allowed per spec)\n- Clear buffer ownership semantics\n- No memory leaks on exceptions","acceptance_criteria":"- Buffers properly recycled\n- No leaks under concurrent use\n- with_buffer is exception-safe\n- Stats 
accurate","status":"closed","priority":1,"issue_type":"task","created_at":"2026-01-08T18:46:28.146790073+01:00","created_by":"gdiazlo","updated_at":"2026-01-08T19:27:29.348943322+01:00","closed_at":"2026-01-08T19:27:29.348943322+01:00","close_reason":"Implemented buffer_pool.ml with Kcas_data.Queue and Eio.Semaphore","labels":["buffer","core","zero-copy"],"dependencies":[{"issue_id":"swim-xoo","depends_on_id":"swim-oun","type":"blocks","created_at":"2026-01-08T18:46:28.151030562+01:00","created_by":"gdiazlo"},{"issue_id":"swim-xoo","depends_on_id":"swim-wdc","type":"parent-child","created_at":"2026-01-08T18:46:32.927877844+01:00","created_by":"gdiazlo"}]} 20 + {"id":"swim-zsi","title":"Implement swim.ml/swim.mli - Public API assembly","description":"Implement the public API as specified in swim.mli.\n\n## Cluster module (public interface)\n\n### Lifecycle\n- `create : env -\u003e config -\u003e (t, [\u003e `Invalid_key | `Bind_failed of string]) result`\n - Initialize crypto\n - Create buffer pools\n - Bind sockets\n - Start protocol and receiver fibers\n\n- `join : t -\u003e seed_nodes:string list -\u003e (unit, [\u003e `No_seeds_reachable]) result`\n - Parse seed addresses\n - Send join requests\n - Wait for acks\n\n- `leave : t -\u003e ?timeout:float -\u003e unit -\u003e unit`\n - Broadcast leave\n - Wait for propagation\n - Graceful shutdown\n\n- `shutdown : t -\u003e unit`\n - Set shutdown flag\n - Close sockets\n - Release resources\n\n### Membership queries (pure)\n- `local_node : t -\u003e node`\n- `nodes : t -\u003e node list`\n- `node_count : t -\u003e int`\n- `is_alive : t -\u003e node_id -\u003e bool`\n- `find_node : t -\u003e node_id -\u003e node option`\n\n### Events\n- `events : t -\u003e node_event Eio.Stream.t`\n\n### Metadata\n- `set_meta : t -\u003e string -\u003e (unit, [\u003e `Too_large]) result`\n\n### Msg submodule\n- `broadcast : t -\u003e topic:string -\u003e payload:string -\u003e (unit, [\u003e `Too_large]) result`\n- `send : t -\u003e 
node -\u003e payload:string -\u003e (unit, [\u003e `Unreachable | `Timeout]) result`\n- `on_message : t -\u003e handler -\u003e unit`\n- `on_topic : t -\u003e string -\u003e (node -\u003e string -\u003e unit) -\u003e unit`\n\n### Health submodule\n- `stats : t -\u003e stats`\n- `is_healthy : t -\u003e bool`\n\n## Design constraints\n- All operations fiber-safe\n- No blocking except Eio primitives\n- Result types for fallible operations\n- Clean module signature in .mli","acceptance_criteria":"- Public API matches spec\n- All operations work correctly\n- Clean .mli signature\n- Documentation comments","status":"closed","priority":1,"issue_type":"task","created_at":"2026-01-08T18:49:05.567892446+01:00","created_by":"gdiazlo","updated_at":"2026-01-08T19:54:05.541094079+01:00","closed_at":"2026-01-08T19:54:05.541094079+01:00","close_reason":"Implemented Cluster module as public API with create, start, shutdown, join, broadcast, member queries, and event streaming","labels":["api","core"],"dependencies":[{"issue_id":"swim-zsi","depends_on_id":"swim-t28","type":"blocks","created_at":"2026-01-08T18:49:05.571629003+01:00","created_by":"gdiazlo"},{"issue_id":"swim-zsi","depends_on_id":"swim-wdc","type":"parent-child","created_at":"2026-01-08T18:49:09.915596516+01:00","created_by":"gdiazlo"}]}
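The probe-target properties named in the swim-w4y issue above (`test_probe_wraps_around`, `test_probe_skips_self`) can be illustrated with a small pure function. This is a minimal sketch using only the standard library: the name `next_probe_target`, the use of plain strings as node ids, and the returned `(target, next_index)` pair are assumptions for illustration, not the signatures in protocol_pure.ml. It also assumes a non-negative `index`.

```ocaml
(* Hypothetical pure helper: pick the next probe target in round-robin
   order, wrapping at the end of the list and never returning [self].
   Returns the chosen member together with the index to use next time. *)
let next_probe_target ~self ~index members =
  let n = List.length members in
  let rec go tried i =
    if tried >= n then None (* every member was [self] *)
    else
      let i = i mod n in
      let m = List.nth members i in
      if String.equal m self then go (tried + 1) (i + 1)
      else Some (m, (i + 1) mod n)
  in
  if n = 0 then None else go 0 index

let () =
  let members = [ "a"; "b"; "c" ] in
  (* Index 1 points at self ("b"), so the next member is chosen instead. *)
  assert (next_probe_target ~self:"b" ~index:1 members = Some ("c", 0));
  (* An index past the end wraps around to the start of the list. *)
  assert (next_probe_target ~self:"b" ~index:3 members = Some ("a", 1))
```

Keeping the function free of I/O and of kcas state is what makes properties like these checkable with qcheck, as the issue's design constraints require.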
+4
.beads/metadata.json
{
  "database": "beads.db",
  "jsonl_export": "issues.jsonl"
}
+3
.gitattributes
# Use bd merge for beads JSONL files
.beads/issues.jsonl merge=beads
+3
.gitignore
_build/
*.install
.merlin
+2
.ocamlformat
version = 0.28.1
profile = default
+80
.opencode/agent/ocaml.md
---
description: Expert OCaml coding agent for building, designing, and implementing OCaml code following functional programming best practices
mode: subagent
tools:
  write: true
  edit: true
  bash: true
  read: true
  glob: true
  grep: true
temperature: 0.1
---

# OCaml Agent

You are an expert OCaml coding and design partner. You never write documentation unless explicitly asked. You always plan and use bd to manage your tasks.

## Expertise Areas

### Languages
- OCaml (expert - following best practices and functional programming)
- C/C++ (systems programming, FFI)
- JavaScript/HTML (Web Components, D3.js)

### Domains
- Functional programming (algebraic effects, composable functions, recursion, immutability)
- HTTP API protocol expert
- Social network API expert (Facebook, X, LinkedIn, Bluesky, Reddit)

## Key Principles

### DO
- Write functional readable code
- Design with composition in mind
- Use OCaml standard library
- Document in `.mli` files only
- Keep functions under 70 lines
- Handle errors with `Result` types at the end of the composition chain when appropriate
- Use qualified names (`List.map`, not `map`)
- Define explicit types for domain concepts
- Consider security in every change

### DON'T
- Use Jane Street libraries (Base/Core)
- Write unit tests
- Add comments in `.ml` files
- Use catch-all `| _ ->` patterns carelessly
- Use `open` directives
- Create speculative features
- Use Boolean flags instead of types

## Code Patterns

Instead of a library with custom types (producer, >>| operators), rely on:

1. **Standard types**: `Result.t`, `Seq.t`, `option`
2. **Standard combinators**: `Result.bind`, `Seq.map`, `Seq.filter`, `Seq.fold_left`
3. **Composable function signatures**: Functions that take state, return `(state, error) result`
4. **let* syntax**: For clean Result chaining
5. **Modules and functors**: To define complex types with behaviors
6. **Error management**: At the end of a composable chain. Leverage `(state, error) result`!

## Security Checklist

When making changes, consider:
- Input validation implemented
- No secrets in logs
- SQL injection prevented (parameterized queries)
- Path traversal prevented
- Rate limiting considered
- Access control enforced
- Data sanitized before display

## Build Commands

For this project, use dune for building:
- `dune build` - Build the project
- `dune test` - Run tests
- `dune exec swim-demo` - Run the main executable
- `dune clean` - Clean build artifacts
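The Code Patterns list in this agent file (standard `Result.t`, `Result.bind`, and `let*` chaining, with errors handled at the end of the chain) could look roughly like the following. This is a minimal sketch using only the standard library: the `config` record, `parse_port`, `parse_name`, and the error variants are hypothetical names chosen for illustration and do not come from the SWIM library.

```ocaml
(* Bind [let*] to Result.bind so each step short-circuits on Error. *)
let ( let* ) = Result.bind

(* Hypothetical domain type, for illustration only. *)
type config = { port : int; name : string }

let parse_port s =
  match int_of_string_opt s with
  | Some p when p > 0 && p < 65536 -> Ok p
  | Some _ -> Error (`Out_of_range s)
  | None -> Error (`Not_a_number s)

let parse_name s = if s = "" then Error `Empty_name else Ok s

(* Each step returns a result; the first Error ends the chain. *)
let parse_config ~port ~name =
  let* port = parse_port port in
  let* name = parse_name name in
  Ok { port; name }

(* Errors are turned into messages once, at the end of the chain. *)
let describe = function
  | Ok c -> Printf.sprintf "%s listening on %d" c.name c.port
  | Error (`Out_of_range s) -> "port out of range: " ^ s
  | Error (`Not_a_number s) -> "port is not a number: " ^ s
  | Error `Empty_name -> "name must not be empty"
```

The polymorphic variants keep each step's error set open, so `parse_config` accumulates the union of possible errors without a shared error type being declared up front.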
+40
AGENTS.md
# Agent Instructions

This project uses **bd** (beads) for issue tracking. Run `bd onboard` to get started.

## Quick Reference

```bash
bd ready                              # Find available work
bd show <id>                          # View issue details
bd update <id> --status in_progress   # Claim work
bd close <id>                         # Complete work
bd sync                               # Sync with git
```

## Landing the Plane (Session Completion)

**When ending a work session**, you MUST complete ALL steps below. Work is NOT complete until `git push` succeeds.

**MANDATORY WORKFLOW:**

1. **File issues for remaining work** - Create issues for anything that needs follow-up
2. **Run quality gates** (if code changed) - Tests, linters, builds
3. **Update issue status** - Close finished work, update in-progress items
4. **PUSH TO REMOTE** - This is MANDATORY:
   ```bash
   git pull --rebase
   bd sync
   git push
   git status  # MUST show "up to date with origin"
   ```
5. **Clean up** - Clear stashes, prune remote branches
6. **Verify** - All changes committed AND pushed
7. **Hand off** - Provide context for next session

**CRITICAL RULES:**
- Work is NOT complete until `git push` succeeds
- NEVER stop before pushing - that leaves work stranded locally
- NEVER say "ready to push when you are" - YOU must push
- If push fails, resolve and retry until it succeeds
+15
LICENSE
ISC License

Copyright (c) 2026 Gabriel Díaz López de la Llave

Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.

THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT,
INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR
OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
PERFORMANCE OF THIS SOFTWARE.
+4
bin/dune
(executable
 (public_name swim-demo)
 (name main)
 (libraries swim eio_main))
+1
bin/main.ml
let () = print_endline ("SWIM library version: " ^ Swim.version)
+39
dune-project
(lang dune 3.20)

(name swim)

(generate_opam_files true)

(source
 (github gdiazlo/swim))

(authors "Guillermo Diaz-Romero <guillermo.diaz@gmail.com>")

(maintainers "Guillermo Diaz-Romero <guillermo.diaz@gmail.com>")

(license MIT)

(documentation https://github.com/gdiazlo/swim)

(package
 (name swim)
 (synopsis "SWIM protocol library for cluster membership and failure detection")
 (description
  "Production-ready SWIM (Scalable Weakly-consistent Infection-style Process Group Membership) protocol library in OCaml 5 for cluster membership, failure detection, and lightweight pub/sub messaging. Features lock-free coordination via kcas, zero-copy buffer management, and AES-256-GCM encryption.")
 (depends
  (ocaml (>= 5.1))
  (dune (>= 3.20))
  (eio (>= 1.0))
  (eio_main (>= 1.0))
  (kcas (>= 0.7))
  (kcas_data (>= 0.7))
  (mirage-crypto (>= 1.0))
  (mirage-crypto-rng (>= 1.0))
  (cstruct (>= 6.0))
  (mtime (>= 2.0))
  (qcheck (>= 0.21))
  (qcheck-alcotest (>= 0.21))
  (alcotest (>= 1.7))
  (logs (>= 0.7)))
 (tags
  (swim cluster membership gossip "failure detection" ocaml5 eio)))
+74
lib/buffer_pool.ml
(** Lock-free buffer pool using Kcas and Eio.

    Provides pre-allocated buffers for zero-copy I/O operations. Uses
    Kcas_data.Queue for lock-free buffer storage and Eio.Semaphore for blocking
    acquire when pool is exhausted. *)

type t = {
  buffers : Cstruct.t Kcas_data.Queue.t;
  buf_size : int;
  total : int;
  semaphore : Eio.Semaphore.t;
}

let create ~size ~count =
  let buffers = Kcas_data.Queue.create () in
  for _ = 1 to count do
    Kcas.Xt.commit
      {
        tx =
          (fun ~xt -> Kcas_data.Queue.Xt.add ~xt (Cstruct.create size) buffers);
      }
  done;
  {
    buffers;
    buf_size = size;
    total = count;
    semaphore = Eio.Semaphore.make count;
  }

let acquire t =
  Eio.Semaphore.acquire t.semaphore;
  let buf_opt =
    Kcas.Xt.commit
      { tx = (fun ~xt -> Kcas_data.Queue.Xt.take_opt ~xt t.buffers) }
  in
  match buf_opt with
  | Some buf ->
      Cstruct.memset buf 0;
      buf
  | None ->
      (* Should not happen if semaphore is properly synchronized,
         but handle gracefully by allocating a new buffer *)
      Cstruct.create t.buf_size

let try_acquire t =
  (* Check if semaphore has available permits without blocking *)
  if Eio.Semaphore.get_value t.semaphore > 0 then begin
    (* Race condition possible here - another fiber might acquire between
       get_value and acquire. In that case, acquire will block briefly.
       For truly non-blocking behavior, we'd need atomic CAS on semaphore. *)
    Eio.Semaphore.acquire t.semaphore;
    let buf_opt =
      Kcas.Xt.commit
        { tx = (fun ~xt -> Kcas_data.Queue.Xt.take_opt ~xt t.buffers) }
    in
    match buf_opt with
    | Some buf ->
        Cstruct.memset buf 0;
        Some buf
    | None -> Some (Cstruct.create t.buf_size)
  end
  else None

let release t buf =
  Kcas.Xt.commit { tx = (fun ~xt -> Kcas_data.Queue.Xt.add ~xt buf t.buffers) };
  Eio.Semaphore.release t.semaphore

let with_buffer t f =
  let buf = acquire t in
  Fun.protect ~finally:(fun () -> release t buf) (fun () -> f buf)

let available t = Eio.Semaphore.get_value t.semaphore
let total t = t.total
let size t = t.buf_size
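The comment in `try_acquire` notes that truly non-blocking behavior would need an atomic compare-and-set on the permit count. The following is a minimal sketch of that idea using only `Stdlib.Atomic`; the `Permits` module and its functions are hypothetical and are not part of `Buffer_pool` or `Eio.Semaphore`. It shows only the counting side, and a real pool would still need a blocking path for `acquire`.

```ocaml
(* Hypothetical permit counter with a CAS-based, never-blocking
   try_acquire. Not the Eio.Semaphore used by Buffer_pool. *)
module Permits = struct
  type t = int Atomic.t

  let make n : t = Atomic.make n

  (* Take one permit if any is left. On a lost race the loop re-reads
     the counter and retries instead of blocking. *)
  let rec try_acquire (t : t) =
    let n = Atomic.get t in
    if n <= 0 then false
    else if Atomic.compare_and_set t n (n - 1) then true
    else try_acquire t

  let release (t : t) = Atomic.incr t
  let available (t : t) = Atomic.get t
end

let () =
  let p = Permits.make 2 in
  assert (Permits.try_acquire p);
  assert (Permits.try_acquire p);
  (* The pool is exhausted: the call returns false immediately. *)
  assert (not (Permits.try_acquire p));
  Permits.release p;
  assert (Permits.available p = 1)
```

Because the check and the decrement happen in a single compare-and-set, there is no window between reading the count and taking the permit, which is the gap the `get_value` followed by `acquire` sequence leaves open.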
+10
lib/buffer_pool.mli
type t

val create : size:int -> count:int -> t
val acquire : t -> Cstruct.t
val try_acquire : t -> Cstruct.t option
val release : t -> Cstruct.t -> unit
val with_buffer : t -> (Cstruct.t -> 'a) -> 'a
val available : t -> int
val total : t -> int
val size : t -> int
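The `with_buffer` signature above relies on `Fun.protect` to return the buffer even when the callback raises. The sketch below illustrates that property with a single-threaded stand-in built on `Stdlib.Queue` and `Bytes`; the `pool` type and its functions are assumptions for illustration and deliberately omit the Kcas queue and the Eio semaphore of the real module.

```ocaml
(* Stand-in pool: a plain Stdlib.Queue of Bytes, single-threaded only.
   It mirrors the shape of Buffer_pool, not its implementation. *)
type pool = { free : Bytes.t Queue.t; buf_size : int }

let create ~size ~count =
  let free = Queue.create () in
  for _ = 1 to count do
    Queue.add (Bytes.create size) free
  done;
  { free; buf_size = size }

let acquire p =
  match Queue.take_opt p.free with
  | Some b ->
      (* Zero the buffer before handing it out again. *)
      Bytes.fill b 0 (Bytes.length b) '\000';
      b
  | None -> Bytes.create p.buf_size

let release p b = Queue.add b p.free
let available p = Queue.length p.free

(* The [finally] handler runs on both normal return and exception. *)
let with_buffer p f =
  let b = acquire p in
  Fun.protect ~finally:(fun () -> release p b) (fun () -> f b)

let () =
  let p = create ~size:16 ~count:2 in
  (try with_buffer p (fun _ -> failwith "boom") with Failure _ -> ());
  (* The buffer was returned even though the callback raised. *)
  assert (available p = 2)
```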
+307
lib/codec.ml
open Types

module Encoder = struct
  type t = { buf : Cstruct.t; mutable pos : int }

  let create ~buf = { buf; pos = 0 }

  let write_byte t v =
    Cstruct.set_uint8 t.buf t.pos v;
    t.pos <- t.pos + 1

  let write_int16_be t v =
    Cstruct.BE.set_uint16 t.buf t.pos v;
    t.pos <- t.pos + 2

  let write_int32_be t v =
    Cstruct.BE.set_uint32 t.buf t.pos v;
    t.pos <- t.pos + 4

  let write_int64_be t v =
    Cstruct.BE.set_uint64 t.buf t.pos v;
    t.pos <- t.pos + 8

  let write_string t s =
    let len = String.length s in
    write_int16_be t len;
    Cstruct.blit_from_string s 0 t.buf t.pos len;
    t.pos <- t.pos + len

  let write_bytes t cs =
    let len = Cstruct.length cs in
    Cstruct.blit cs 0 t.buf t.pos len;
    t.pos <- t.pos + len

  let to_cstruct t = Cstruct.sub t.buf 0 t.pos
  let reset t = t.pos <- 0
  let remaining t = Cstruct.length t.buf - t.pos
  let pos t = t.pos
end

module Decoder = struct
  type t = { buf : Cstruct.t; mutable pos : int }

  let create buf = { buf; pos = 0 }

  let read_byte t =
    let v = Cstruct.get_uint8 t.buf t.pos in
    t.pos <- t.pos + 1;
    v

  let read_int16_be t =
    let v = Cstruct.BE.get_uint16 t.buf t.pos in
    t.pos <- t.pos + 2;
    v

  let read_int32_be t =
    let v = Cstruct.BE.get_uint32 t.buf t.pos in
    t.pos <- t.pos + 4;
    v

  let read_int64_be t =
    let v = Cstruct.BE.get_uint64 t.buf t.pos in
    t.pos <- t.pos + 8;
    v

  let read_string t =
    let len = read_int16_be t in
    let s = Cstruct.to_string ~off:t.pos ~len t.buf in
    t.pos <- t.pos + len;
    s

  let read_bytes t ~len =
    let cs = Cstruct.sub t.buf t.pos len in
    t.pos <- t.pos + len;
    cs

  let remaining t = Cstruct.length t.buf - t.pos
  let is_empty t = t.pos >= Cstruct.length t.buf
  let pos t = t.pos
end

let magic = "SWIM"
let version = 1
let tag_ping = 0x01
let tag_ping_req = 0x02
let tag_ack = 0x03
let tag_alive = 0x04
let tag_suspect = 0x05
let tag_dead = 0x06
let tag_user_msg = 0x07
let ip_to_string ip = Fmt.to_to_string Eio.Net.Ipaddr.pp ip

let parse_ipv4 s =
  Scanf.sscanf s "%d.%d.%d.%d" (fun a b c d ->
      let buf = Bytes.create 4 in
      Bytes.set_uint8 buf 0 a;
      Bytes.set_uint8 buf 1 b;
      Bytes.set_uint8 buf 2 c;
      Bytes.set_uint8 buf 3 d;
      Eio.Net.Ipaddr.of_raw (Bytes.to_string buf))

let parse_ipv6 s =
  let parts = String.split_on_char ':' s in
  let buf = Bytes.create 16 in
  let rec fill idx = function
    | [] -> ()
    | "" :: rest when List.exists (( = ) "") rest ->
        let tail_len = List.length (List.filter (( <> ) "") rest) in
        let zeros = 8 - idx - tail_len in
        for i = 0 to (zeros * 2) - 1 do
          Bytes.set_uint8 buf ((idx * 2) + i) 0
        done;
        fill (idx + zeros) rest
    | "" :: rest -> fill idx rest
    | h :: rest ->
        let v = int_of_string ("0x" ^ h) in
        Bytes.set_uint8 buf (idx * 2) (v lsr 8);
        Bytes.set_uint8 buf ((idx * 2) + 1) (v land 0xff);
        fill (idx + 1) rest
  in
  fill 0 parts;
  Eio.Net.Ipaddr.of_raw (Bytes.to_string buf)

let ip_of_string s =
  if String.contains s ':' then parse_ipv6 s else parse_ipv4 s

let encode_addr enc (addr : Eio.Net.Sockaddr.datagram) =
  match addr with
  | `Udp (ip, port) ->
      Encoder.write_string enc (ip_to_string ip);
      Encoder.write_int16_be enc port
  | `Unix _ -> failwith "Unix sockets not supported for SWIM protocol"

let decode_addr dec : Eio.Net.Sockaddr.datagram =
  let ip_str = Decoder.read_string dec in
  let port = Decoder.read_int16_be dec in
  `Udp (ip_of_string ip_str, port)

let encode_node_id enc (node_id : node_id) =
  Encoder.write_string enc (node_id_to_string node_id)

let decode_node_id dec : node_id = node_id_of_string (Decoder.read_string dec)

let encode_node enc (node : node_info) =
  encode_node_id enc node.id;
  encode_addr enc node.addr;
  Encoder.write_string enc node.meta

let decode_node dec : node_info =
  let id = decode_node_id dec in
  let addr = decode_addr dec in
  let meta = Decoder.read_string dec in
  { id; addr; meta }

let encode_incarnation enc (inc : incarnation) =
  Encoder.write_int32_be enc (Int32.of_int (incarnation_to_int inc))

let decode_incarnation dec : incarnation =
  incarnation_of_int (Int32.to_int (Decoder.read_int32_be dec))

let encode_option encode_elem enc = function
  | None -> Encoder.write_byte enc 0
  | Some v ->
      Encoder.write_byte enc 1;
      encode_elem enc v

let decode_option decode_elem dec =
  match Decoder.read_byte dec with 0 -> None | _ -> Some (decode_elem dec)

let encode_msg enc msg =
  match msg with
  | Ping { seq; sender } ->
      Encoder.write_byte enc tag_ping;
      Encoder.write_int32_be enc (Int32.of_int seq);
      encode_node enc sender
  | Ping_req { seq; target; sender } ->
      Encoder.write_byte enc tag_ping_req;
      Encoder.write_int32_be enc (Int32.of_int seq);
      encode_node_id enc target;
      encode_node enc sender
  | Ack { seq; responder; payload } ->
      Encoder.write_byte enc tag_ack;
      Encoder.write_int32_be enc (Int32.of_int seq);
      encode_node enc responder;
      encode_option Encoder.write_string enc payload
  | Alive { node; incarnation } ->
      Encoder.write_byte enc tag_alive;
      encode_node enc node;
      encode_incarnation enc incarnation
  | Suspect { node; incarnation; suspector } ->
      Encoder.write_byte enc tag_suspect;
      encode_node_id enc node;
      encode_incarnation enc incarnation;
      encode_node_id enc suspector
  | Dead { node; incarnation; declarator } ->
      Encoder.write_byte enc tag_dead;
      encode_node_id enc node;
      encode_incarnation enc incarnation;
      encode_node_id enc declarator
  | User_msg { topic; payload; origin } ->
      Encoder.write_byte enc tag_user_msg;
      Encoder.write_string enc topic;
      Encoder.write_string enc payload;
      encode_node_id enc origin

let decode_msg dec : (protocol_msg, decode_error) result =
  let tag = Decoder.read_byte dec in
  match tag with
  | t when t = tag_ping ->
      let seq = Int32.to_int (Decoder.read_int32_be dec) in
      let sender = decode_node dec in
      Ok (Ping { seq; sender })
  | t when t = tag_ping_req ->
      let seq = Int32.to_int (Decoder.read_int32_be dec) in
      let target = decode_node_id dec in
      let sender = decode_node dec in
      Ok (Ping_req { seq; target; sender })
  | t when t = tag_ack ->
      let seq = Int32.to_int (Decoder.read_int32_be dec) in
      let responder = decode_node dec in
      let payload = decode_option Decoder.read_string dec in
      Ok (Ack { seq; responder; payload })
  | t when t = tag_alive ->
      let node = decode_node dec in
      let incarnation = decode_incarnation dec in
      Ok (Alive { node; incarnation })
  | t when t = tag_suspect ->
      let node = decode_node_id dec in
      let incarnation = decode_incarnation dec in
      let suspector = decode_node_id dec in
      Ok (Suspect { node; incarnation; suspector })
  | t when t = tag_dead ->
      let node = decode_node_id dec in
      let incarnation = decode_incarnation dec in
      let declarator = decode_node_id dec in
      Ok (Dead { node; incarnation; declarator })
  | t when t = tag_user_msg ->
      let topic = Decoder.read_string dec in
      let payload = Decoder.read_string dec in
      let origin = decode_node_id dec in
      Ok (User_msg { topic; payload; origin })
  | t ->
Error (Invalid_tag t) 243 + 244 + let encode_packet packet ~buf = 245 + let enc = Encoder.create ~buf in 246 + Encoder.write_bytes enc (Cstruct.of_string magic); 247 + Encoder.write_byte enc version; 248 + Encoder.write_string enc packet.cluster; 249 + let msg_count = 1 + List.length packet.piggyback in 250 + Encoder.write_int16_be enc msg_count; 251 + encode_msg enc packet.primary; 252 + List.iter (encode_msg enc) packet.piggyback; 253 + if Encoder.remaining enc < 0 then Error `Buffer_too_small 254 + else Ok (Encoder.pos enc) 255 + 256 + let decode_packet buf : (packet, decode_error) result = 257 + let dec = Decoder.create buf in 258 + let magic_bytes = Decoder.read_bytes dec ~len:4 in 259 + if Cstruct.to_string magic_bytes <> magic then Error Invalid_magic 260 + else 261 + let ver = Decoder.read_byte dec in 262 + if ver <> version then Error (Unsupported_version ver) 263 + else 264 + let cluster = Decoder.read_string dec in 265 + let msg_count = Decoder.read_int16_be dec in 266 + let rec decode_msgs acc remaining = 267 + if remaining = 0 then Ok (List.rev acc) 268 + else 269 + match decode_msg dec with 270 + | Error e -> Error e 271 + | Ok msg -> decode_msgs (msg :: acc) (remaining - 1) 272 + in 273 + match decode_msgs [] msg_count with 274 + | Error e -> Error e 275 + | Ok [] -> Error Truncated_message 276 + | Ok (primary :: piggyback) -> Ok { cluster; primary; piggyback } 277 + 278 + let node_id_size node_id = 2 + String.length (node_id_to_string node_id) 279 + 280 + let addr_size (addr : Eio.Net.Sockaddr.datagram) = 281 + match addr with 282 + | `Udp (ip, _) -> 283 + let ip_str = ip_to_string ip in 284 + 2 + String.length ip_str + 2 285 + | `Unix _ -> failwith "Unix sockets not supported for SWIM protocol" 286 + 287 + let node_size (node : node_info) = 288 + node_id_size node.id + addr_size node.addr + 2 + String.length node.meta 289 + 290 + let option_size f = function None -> 1 | Some v -> 1 + f v 291 + 292 + let encoded_size msg = 293 + match msg with 294 + 
| Ping { sender; _ } -> 1 + 4 + node_size sender 295 + | Ping_req { target; sender; _ } -> 296 + 1 + 4 + node_id_size target + node_size sender 297 + | Ack { responder; payload; _ } -> 298 + 1 + 4 + node_size responder 299 + + option_size (fun s -> 2 + String.length s) payload 300 + | Alive { node; _ } -> 1 + node_size node + 4 301 + | Suspect { node; suspector; _ } -> 302 + 1 + node_id_size node + 4 + node_id_size suspector 303 + | Dead { node; declarator; _ } -> 304 + 1 + node_id_size node + 4 + node_id_size declarator 305 + | User_msg { topic; payload; origin } -> 306 + 1 + 2 + String.length topic + 2 + String.length payload 307 + + node_id_size origin
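Note on the size helpers above: each string is counted as `2 + String.length s`, which suggests that `Encoder.write_string` emits a 16-bit length prefix followed by the raw bytes. The `Encoder` and `Decoder` modules are not shown in this part of the diff, so the following is only a standalone sketch of that framing using the standard library. The names `write_string`, `read_string` and `string_size` are hypothetical stand-ins, and the big-endian prefix is an assumption.

```ocaml
(* Standalone sketch: 16-bit big-endian length prefix followed by raw bytes.
   Assumption: this mirrors what Encoder.write_string does; the names below
   are illustrative and not part of the library. *)
let write_string (buf : Buffer.t) (s : string) : unit =
  let len = String.length s in
  if len > 0xffff then invalid_arg "write_string: string too long";
  Buffer.add_uint16_be buf len;
  Buffer.add_string buf s

(* Returns the decoded string and the offset just past it. *)
let read_string (data : string) (pos : int) : string * int =
  let len = String.get_uint16_be data pos in
  (String.sub data (pos + 2) len, pos + 2 + len)

(* Same arithmetic as the size helpers in codec.ml. *)
let string_size (s : string) : int = 2 + String.length s

let () =
  let buf = Buffer.create 16 in
  write_string buf "node-a";
  write_string buf "";
  let data = Buffer.contents buf in
  (* The predicted size matches the number of bytes actually written. *)
  assert (String.length data = string_size "node-a" + string_size "");
  let s1, pos = read_string data 0 in
  let s2, pos' = read_string data pos in
  assert (s1 = "node-a" && s2 = "" && pos' = String.length data)
```

Keeping the size function next to the writer, as codec.ml does, lets the piggyback budget be computed without encoding a message twice.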
+38
lib/crypto.ml
let nonce_size = 12
let tag_size = 16
let overhead = nonce_size + tag_size

type key = Mirage_crypto.AES.GCM.key

let init_key secret =
  if String.length secret <> 32 then Error `Invalid_key_length
  else Ok (Mirage_crypto.AES.GCM.of_secret secret)

let generate_nonce (random : _ Eio.Flow.source) =
  let buf = Cstruct.create nonce_size in
  Eio.Flow.read_exact random buf;
  Cstruct.to_string buf

let encrypt ~key ~random plaintext =
  let nonce = generate_nonce random in
  let ciphertext =
    Mirage_crypto.AES.GCM.authenticate_encrypt ~key ~nonce
      (Cstruct.to_string plaintext)
  in
  let result = Cstruct.create (nonce_size + String.length ciphertext) in
  Cstruct.blit_from_string nonce 0 result 0 nonce_size;
  Cstruct.blit_from_string ciphertext 0 result nonce_size
    (String.length ciphertext);
  result

let decrypt ~key data =
  if Cstruct.length data < overhead then Error `Too_short
  else
    let nonce = Cstruct.to_string (Cstruct.sub data 0 nonce_size) in
    let ciphertext =
      Cstruct.to_string
        (Cstruct.sub data nonce_size (Cstruct.length data - nonce_size))
    in
    match Mirage_crypto.AES.GCM.authenticate_decrypt ~key ~nonce ciphertext with
    | Some plaintext -> Ok (Cstruct.of_string plaintext)
    | None -> Error `Decryption_failed
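Note on the frame layout: `encrypt` prepends the 12-byte nonce to the output of `authenticate_encrypt`, and `decrypt` rejects anything shorter than `overhead` (28 bytes). Assuming the GCM tag is appended to the ciphertext, as mirage-crypto's AEAD interface does, an encrypted datagram is laid out as nonce, ciphertext, tag. The sketch below only illustrates that layout with plain strings; it performs no cryptography, and `split_frame` is a hypothetical helper that does not exist in the library.

```ocaml
(* Standalone sketch of the frame layout implied by crypto.ml:
   nonce (12 bytes) || ciphertext || GCM tag (16 bytes).
   No real cryptography happens here; split_frame is illustrative only. *)
let nonce_size = 12
let tag_size = 16
let overhead = nonce_size + tag_size

let split_frame (frame : string) :
    (string * string * string, [ `Too_short ]) result =
  let len = String.length frame in
  if len < overhead then Error `Too_short
  else
    let body_len = len - overhead in
    Ok
      ( String.sub frame 0 nonce_size,
        String.sub frame nonce_size body_len,
        String.sub frame (nonce_size + body_len) tag_size )

let () =
  let frame = String.make nonce_size 'n' ^ "hello" ^ String.make tag_size 't' in
  match split_frame frame with
  | Ok (nonce, body, tag) ->
      assert (String.length nonce = 12);
      assert (body = "hello");
      assert (String.length tag = 16)
  | Error `Too_short -> assert false
```

One consequence of this layout is that every encrypted datagram is 28 bytes longer than the encoded packet, which matters when sizing send and receive buffers.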
+74
lib/dissemination.ml
open Types

type item = {
  msg : protocol_msg;
  transmits : int Kcas.Loc.t;
  created : Mtime.span;
}

type t = { queue : item Kcas_data.Queue.t; depth : int Kcas.Loc.t }

let create () = { queue = Kcas_data.Queue.create (); depth = Kcas.Loc.make 0 }

let enqueue t msg ~transmits ~created =
  let item = { msg; transmits = Kcas.Loc.make transmits; created } in
  Kcas.Xt.commit
    {
      tx =
        (fun ~xt ->
          Kcas_data.Queue.Xt.add ~xt item t.queue;
          Kcas.Xt.modify ~xt t.depth succ);
    }

let depth t = Kcas.Xt.commit { tx = (fun ~xt -> Kcas.Xt.get ~xt t.depth) }

(* Drain inside a single transaction: a nested [Kcas.Xt.commit] would not see
   the pending takes of the enclosing transaction. Items that still have
   transmits left are set aside and re-queued after the loop, so one drain
   never returns the same message twice. *)
let drain t ~max_bytes ~encode_size =
  Kcas.Xt.commit
    {
      tx =
        (fun ~xt ->
          let rec loop acc requeue bytes_used =
            match Kcas_data.Queue.Xt.take_opt ~xt t.queue with
            | None -> (acc, requeue)
            | Some item ->
                let msg_size = encode_size item.msg in
                if bytes_used + msg_size > max_bytes && acc <> [] then
                  (* Over budget: keep the item for a later drain. *)
                  (acc, item :: requeue)
                else begin
                  let remaining = Kcas.Xt.get ~xt item.transmits - 1 in
                  Kcas.Xt.set ~xt item.transmits remaining;
                  let requeue =
                    if remaining > 0 then item :: requeue
                    else begin
                      Kcas.Xt.modify ~xt t.depth pred;
                      requeue
                    end
                  in
                  loop (item.msg :: acc) requeue (bytes_used + msg_size)
                end
          in
          let acc, requeue = loop [] [] 0 in
          List.iter
            (fun item -> Kcas_data.Queue.Xt.add ~xt item t.queue)
            (List.rev requeue);
          List.rev acc);
    }

(* Read, filter and rewrite the queue in one transaction so that a concurrent
   enqueue cannot be lost between the read and the rewrite. *)
let invalidate t ~invalidates newer_msg =
  Kcas.Xt.commit
    {
      tx =
        (fun ~xt ->
          let items = Kcas_data.Queue.Xt.to_seq ~xt t.queue |> List.of_seq in
          let valid_items, removed_count =
            List.fold_left
              (fun (valid, removed) item ->
                if invalidates ~newer:newer_msg ~older:item.msg then
                  (valid, removed + 1)
                else (item :: valid, removed))
              ([], 0) items
          in
          if removed_count > 0 then begin
            Kcas_data.Queue.Xt.clear ~xt t.queue;
            List.iter
              (fun item -> Kcas_data.Queue.Xt.add ~xt item t.queue)
              (List.rev valid_items);
            Kcas.Xt.modify ~xt t.depth (fun d -> d - removed_count)
          end);
    }
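Note on the drain semantics: each drained item has its transmit counter decremented and goes back on the queue until the counter reaches zero, and draining stops once the byte budget would be exceeded (a single oversized message is still let through). The sketch below restates that idea with `Stdlib.Queue` and plain strings, so it has none of the Kcas concurrency guarantees. The `item` type and the use of `String.length` as the size function are simplifications made for illustration.

```ocaml
(* Standalone sketch of a transmit-counted broadcast queue with a byte
   budget. Uses Stdlib.Queue instead of Kcas, so it is not domain-safe. *)
type item = { msg : string; mutable transmits : int }

let drain (q : item Queue.t) ~(max_bytes : int) : string list =
  (* Items to put back are collected separately so that a single drain
     never returns the same message twice. *)
  let requeue = Queue.create () in
  let rec loop acc used =
    match Queue.take_opt q with
    | None -> List.rev acc
    | Some item ->
        let size = String.length item.msg in
        if used + size > max_bytes && acc <> [] then begin
          (* Over budget: keep the item for a later drain. *)
          Queue.add item requeue;
          List.rev acc
        end
        else begin
          item.transmits <- item.transmits - 1;
          if item.transmits > 0 then Queue.add item requeue;
          loop (item.msg :: acc) (used + size)
        end
  in
  let out = loop [] 0 in
  Queue.transfer requeue q;
  out

let () =
  let q = Queue.create () in
  Queue.add { msg = "aaaa"; transmits = 2 } q;
  Queue.add { msg = "bbbb"; transmits = 1 } q;
  Queue.add { msg = "cccc"; transmits = 1 } q;
  (* Budget of 8 bytes fits two 4-byte messages per drain. *)
  assert (drain q ~max_bytes:8 = [ "aaaa"; "bbbb" ]);
  assert (drain q ~max_bytes:8 = [ "aaaa"; "cccc" ]);
  assert (Queue.is_empty q)
```

In the usage above, "aaaa" is sent twice because it started with two transmits, while "cccc" waits for the second drain because the first one ran out of budget.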
+22
lib/dissemination.mli
open Types

type item = {
  msg : protocol_msg;
  transmits : int Kcas.Loc.t;
  created : Mtime.span;
}

type t

val create : unit -> t
val enqueue : t -> protocol_msg -> transmits:int -> created:Mtime.span -> unit
val depth : t -> int

val drain :
  t -> max_bytes:int -> encode_size:(protocol_msg -> int) -> protocol_msg list

val invalidate :
  t ->
  invalidates:(newer:protocol_msg -> older:protocol_msg -> bool) ->
  protocol_msg ->
  unit
+14
lib/dune
(library
 (name swim)
 (public_name swim)
 (libraries
  eio
  eio_main
  kcas
  kcas_data
  mirage-crypto
  mirage-crypto-rng
  cstruct
  mtime
  logs
  fmt))
+130
lib/membership.ml
··· 1 + open Types 2 + 3 + module Member = struct 4 + type t = { 5 + node : node_info; 6 + state : member_state Kcas.Loc.t; 7 + incarnation : incarnation Kcas.Loc.t; 8 + state_change_time : Mtime.span Kcas.Loc.t; 9 + last_ack_time : Mtime.span Kcas.Loc.t; 10 + } 11 + 12 + let create ?(initial_state : member_state = Types.Alive) 13 + ?(initial_incarnation = zero_incarnation) ~(now : Mtime.span) 14 + (node : node_info) = 15 + { 16 + node; 17 + state = Kcas.Loc.make initial_state; 18 + incarnation = Kcas.Loc.make initial_incarnation; 19 + state_change_time = Kcas.Loc.make now; 20 + last_ack_time = Kcas.Loc.make now; 21 + } 22 + 23 + let node t = t.node 24 + let get_state ~xt t = Kcas.Xt.get ~xt t.state 25 + let get_incarnation ~xt t = Kcas.Xt.get ~xt t.incarnation 26 + let get_state_change_time ~xt t = Kcas.Xt.get ~xt t.state_change_time 27 + let get_last_ack_time ~xt t = Kcas.Xt.get ~xt t.last_ack_time 28 + 29 + let set_state ~xt t state ~now = 30 + Kcas.Xt.set ~xt t.state state; 31 + Kcas.Xt.set ~xt t.state_change_time now 32 + 33 + let set_incarnation ~xt t inc = Kcas.Xt.set ~xt t.incarnation inc 34 + 35 + let set_alive ~xt t ~incarnation ~now = 36 + set_state ~xt t Alive ~now; 37 + set_incarnation ~xt t incarnation 38 + 39 + let set_suspect ~xt t ~incarnation ~now = 40 + set_state ~xt t Suspect ~now; 41 + set_incarnation ~xt t incarnation 42 + 43 + let set_dead ~xt t ~incarnation ~now = 44 + set_state ~xt t Dead ~now; 45 + set_incarnation ~xt t incarnation 46 + 47 + let record_ack ~xt t ~now = Kcas.Xt.set ~xt t.last_ack_time now 48 + 49 + let snapshot ~xt t : member_snapshot = 50 + { 51 + node = t.node; 52 + state = get_state ~xt t; 53 + incarnation = get_incarnation ~xt t; 54 + state_change = get_state_change_time ~xt t; 55 + } 56 + 57 + let snapshot_now t : member_snapshot = 58 + Kcas.Xt.commit { tx = (fun ~xt -> snapshot ~xt t) } 59 + end 60 + 61 + type t = { 62 + table : (string, Member.t) Kcas_data.Hashtbl.t; 63 + count : int Kcas.Loc.t; 64 + } 65 + 66 + let 
create () = { table = Kcas_data.Hashtbl.create (); count = Kcas.Loc.make 0 } 67 + let key_of_id (Node_id s) = s 68 + 69 + let add t (member : Member.t) = 70 + Kcas.Xt.commit 71 + { 72 + tx = 73 + (fun ~xt -> 74 + let key = key_of_id member.node.id in 75 + match Kcas_data.Hashtbl.Xt.find_opt ~xt t.table key with 76 + | Some _ -> () 77 + | None -> 78 + Kcas_data.Hashtbl.Xt.replace ~xt t.table key member; 79 + Kcas.Xt.modify ~xt t.count succ); 80 + } 81 + 82 + let remove t id = 83 + Kcas.Xt.commit 84 + { 85 + tx = 86 + (fun ~xt -> 87 + let key = key_of_id id in 88 + match Kcas_data.Hashtbl.Xt.find_opt ~xt t.table key with 89 + | None -> false 90 + | Some _ -> 91 + Kcas_data.Hashtbl.Xt.remove ~xt t.table key; 92 + Kcas.Xt.modify ~xt t.count pred; 93 + true); 94 + } 95 + 96 + let find t id = 97 + Kcas.Xt.commit 98 + { 99 + tx = (fun ~xt -> Kcas_data.Hashtbl.Xt.find_opt ~xt t.table (key_of_id id)); 100 + } 101 + 102 + let mem t id = 103 + Kcas.Xt.commit 104 + { 105 + tx = 106 + (fun ~xt -> 107 + Option.is_some 108 + (Kcas_data.Hashtbl.Xt.find_opt ~xt t.table (key_of_id id))); 109 + } 110 + 111 + let to_list t = Kcas_data.Hashtbl.to_seq t.table |> Seq.map snd |> List.of_seq 112 + let to_node_list t = to_list t |> List.map Member.node 113 + let count t = Kcas.Xt.commit { tx = (fun ~xt -> Kcas.Xt.get ~xt t.count) } 114 + 115 + type member_updater = { update : 'x. Member.t -> xt:'x Kcas.Xt.t -> unit } 116 + 117 + let update_member t id updater = 118 + match Kcas_data.Hashtbl.find_opt t.table (key_of_id id) with 119 + | None -> false 120 + | Some member -> 121 + Kcas.Xt.commit { tx = (fun ~xt -> updater.update member ~xt) }; 122 + true 123 + 124 + let iter_alive t f = 125 + to_list t 126 + |> List.iter (fun m -> 127 + let snap = Member.snapshot_now m in 128 + if snap.state = Alive then f m snap) 129 + 130 + let snapshot_all t = to_list t |> List.map Member.snapshot_now
+50
lib/membership.mli
open Types

module Member : sig
  type t

  val create :
    ?initial_state:member_state ->
    ?initial_incarnation:incarnation ->
    now:Mtime.span ->
    node_info ->
    t

  val node : t -> node_info
  val get_state : xt:'x Kcas.Xt.t -> t -> member_state
  val get_incarnation : xt:'x Kcas.Xt.t -> t -> incarnation
  val get_state_change_time : xt:'x Kcas.Xt.t -> t -> Mtime.span
  val get_last_ack_time : xt:'x Kcas.Xt.t -> t -> Mtime.span
  val set_state : xt:'x Kcas.Xt.t -> t -> member_state -> now:Mtime.span -> unit
  val set_incarnation : xt:'x Kcas.Xt.t -> t -> incarnation -> unit

  val set_alive :
    xt:'x Kcas.Xt.t -> t -> incarnation:incarnation -> now:Mtime.span -> unit

  val set_suspect :
    xt:'x Kcas.Xt.t -> t -> incarnation:incarnation -> now:Mtime.span -> unit

  val set_dead :
    xt:'x Kcas.Xt.t -> t -> incarnation:incarnation -> now:Mtime.span -> unit

  val record_ack : xt:'x Kcas.Xt.t -> t -> now:Mtime.span -> unit
  val snapshot : xt:'x Kcas.Xt.t -> t -> member_snapshot
  val snapshot_now : t -> member_snapshot
end

type t

val create : unit -> t
val add : t -> Member.t -> unit
val remove : t -> node_id -> bool
val find : t -> node_id -> Member.t option
val mem : t -> node_id -> bool
val to_list : t -> Member.t list
val to_node_list : t -> node_info list
val count : t -> int

type member_updater = { update : 'x. Member.t -> xt:'x Kcas.Xt.t -> unit }

val update_member : t -> node_id -> member_updater -> bool
val iter_alive : t -> (Member.t -> member_snapshot -> unit) -> unit
val snapshot_all : t -> member_snapshot list
+47
lib/pending_acks.ml
type waiter = {
  promise : string option Eio.Promise.t;
  resolver : string option Eio.Promise.u;
}

type t = { table : (int, waiter) Kcas_data.Hashtbl.t }

let create () = { table = Kcas_data.Hashtbl.create () }

let register t ~seq =
  let promise, resolver = Eio.Promise.create () in
  let w = { promise; resolver } in
  Kcas.Xt.commit
    { tx = (fun ~xt -> Kcas_data.Hashtbl.Xt.replace ~xt t.table seq w) };
  w

let complete t ~seq ~payload =
  let found =
    Kcas.Xt.commit
      {
        tx =
          (fun ~xt ->
            match Kcas_data.Hashtbl.Xt.find_opt ~xt t.table seq with
            | None -> None
            | Some w ->
                Kcas_data.Hashtbl.Xt.remove ~xt t.table seq;
                Some w);
      }
  in
  match found with
  | None -> false
  | Some w ->
      Eio.Promise.resolve w.resolver payload;
      true

let wait w ~timeout ~clock =
  try
    Some
      (Eio.Time.with_timeout_exn clock timeout (fun () ->
           Eio.Promise.await w.promise))
  with Eio.Time.Timeout -> None

let cancel t ~seq =
  Kcas.Xt.commit
    { tx = (fun ~xt -> Kcas_data.Hashtbl.Xt.remove ~xt t.table seq) }

let pending_count t = Kcas_data.Hashtbl.length t.table
+16
lib/pending_acks.mli
type waiter = {
  promise : string option Eio.Promise.t;
  resolver : string option Eio.Promise.u;
}

type t

val create : unit -> t
val register : t -> seq:int -> waiter
val complete : t -> seq:int -> payload:string option -> bool

val wait :
  waiter -> timeout:float -> clock:_ Eio.Time.clock -> string option option

val cancel : t -> seq:int -> unit
val pending_count : t -> int
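Note on the return type of `wait`: the nested `string option option` carries two separate facts. The outer option says whether an ack arrived before the timeout, and the inner option is the ack's optional payload. A caller may find it easier to read after mapping it into a named variant, as in the sketch below. The `probe_result` type and `classify` function are hypothetical and not part of the library.

```ocaml
(* Standalone sketch: naming the two layers of the value returned by
   Pending_acks.wait. probe_result and classify are illustrative only. *)
type probe_result =
  | Timed_out (* outer None: no ack before the timeout *)
  | Acked of string option (* outer Some: ack received, payload optional *)

let classify : string option option -> probe_result = function
  | None -> Timed_out
  | Some payload -> Acked payload

let () =
  assert (classify None = Timed_out);
  assert (classify (Some None) = Acked None);
  assert (classify (Some (Some "meta")) = Acked (Some "meta"))
```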
+403
lib/protocol.ml
··· 1 + open Types 2 + 3 + type t = { 4 + config : config; 5 + self : node_info; 6 + members : Membership.t; 7 + incarnation : incarnation Kcas.Loc.t; 8 + sequence : int Kcas.Loc.t; 9 + broadcast_queue : Dissemination.t; 10 + pending_acks : Pending_acks.t; 11 + probe_index : int Kcas.Loc.t; 12 + send_pool : Buffer_pool.t; 13 + recv_pool : Buffer_pool.t; 14 + udp_sock : [ `Generic ] Eio.Net.datagram_socket_ty Eio.Resource.t; 15 + event_stream : node_event Eio.Stream.t; 16 + user_handlers : (node_info -> string -> string -> unit) list Kcas.Loc.t; 17 + cipher_key : Crypto.key; 18 + stats : stats Kcas.Loc.t; 19 + shutdown : bool Kcas.Loc.t; 20 + clock : float Eio.Time.clock_ty Eio.Resource.t; 21 + mono_clock : Eio.Time.Mono.ty Eio.Resource.t; 22 + secure_random : Eio.Flow.source_ty Eio.Resource.t; 23 + } 24 + 25 + let next_seq t = 26 + Kcas.Xt.commit 27 + { 28 + tx = 29 + (fun ~xt -> 30 + let seq = Kcas.Xt.get ~xt t.sequence in 31 + Kcas.Xt.set ~xt t.sequence (seq + 1); 32 + seq); 33 + } 34 + 35 + let get_incarnation t = 36 + Kcas.Xt.commit { tx = (fun ~xt -> Kcas.Xt.get ~xt t.incarnation) } 37 + 38 + let incr_my_incarnation t = 39 + Kcas.Xt.commit 40 + { 41 + tx = 42 + (fun ~xt -> 43 + let inc = Kcas.Xt.get ~xt t.incarnation in 44 + let new_inc = incr_incarnation inc in 45 + Kcas.Xt.set ~xt t.incarnation new_inc; 46 + new_inc); 47 + } 48 + 49 + let is_shutdown t = 50 + Kcas.Xt.commit { tx = (fun ~xt -> Kcas.Xt.get ~xt t.shutdown) } 51 + 52 + let now_mtime t = 53 + Eio.Time.Mono.now t.mono_clock 54 + |> Mtime.to_uint64_ns |> Mtime.Span.of_uint64_ns 55 + 56 + let update_stats t f = 57 + Kcas.Xt.commit 58 + { 59 + tx = 60 + (fun ~xt -> 61 + let s = Kcas.Xt.get ~xt t.stats in 62 + Kcas.Xt.set ~xt t.stats (f s)); 63 + } 64 + 65 + let emit_event t ev = Eio.Stream.add t.event_stream ev 66 + 67 + let send_packet t ~dst (packet : packet) = 68 + Buffer_pool.with_buffer t.send_pool (fun buf -> 69 + match Codec.encode_packet packet ~buf with 70 + | Error `Buffer_too_small -> () 
71 + | Ok encoded_len -> 72 + let encoded = Cstruct.sub buf 0 encoded_len in 73 + let encrypted = 74 + Crypto.encrypt ~key:t.cipher_key ~random:t.secure_random encoded 75 + in 76 + Transport.send_udp t.udp_sock dst encrypted; 77 + update_stats t (fun s -> { s with msgs_sent = s.msgs_sent + 1 })) 78 + 79 + let make_packet t ~primary ~piggyback = 80 + { cluster = t.config.cluster_name; primary; piggyback } 81 + 82 + let drain_piggyback t ~max_bytes = 83 + Dissemination.drain t.broadcast_queue ~max_bytes 84 + ~encode_size:Codec.encoded_size 85 + 86 + let enqueue_broadcast t msg = 87 + let transmits = 88 + Protocol_pure.retransmit_limit t.config 89 + ~node_count:(Membership.count t.members) 90 + in (* invalidate older entries first: a Dead msg would otherwise invalidate itself *) 91 + Dissemination.invalidate t.broadcast_queue 92 + ~invalidates:Protocol_pure.invalidates msg; 93 + Dissemination.enqueue t.broadcast_queue msg ~transmits ~created:(now_mtime t) 94 + 95 + let handle_ping t ~src (ping : protocol_msg) = 96 + match ping with 97 + | Ping { seq; sender = _ } -> 98 + let piggyback = 99 + drain_piggyback t ~max_bytes:(t.config.udp_buffer_size - 100) 100 + in 101 + let ack = Ack { seq; responder = t.self; payload = None } in 102 + let packet = make_packet t ~primary:ack ~piggyback in 103 + send_packet t ~dst:src packet 104 + | _ -> () 105 + 106 + let handle_ping_req t ~src:_ (ping_req : protocol_msg) = 107 + match ping_req with 108 + | Ping_req { seq; target; sender = _ } -> ( 109 + match Membership.find t.members target with 110 + | None -> () 111 + | Some member -> 112 + let target_addr = (Membership.Member.node member).addr in 113 + let ping = Ping { seq; sender = t.self } in 114 + let packet = make_packet t ~primary:ping ~piggyback:[] in 115 + send_packet t ~dst:target_addr packet) 116 + | _ -> () 117 + 118 + let handle_ack t (ack : protocol_msg) = 119 + match ack with 120 + | Ack { seq; responder = _; payload } -> 121 + ignore (Pending_acks.complete t.pending_acks ~seq ~payload) 122 + | _ -> () 123 + 124 + let apply_member_transition t 
member_id transition_fn = 125 + let now = now_mtime t in 126 + match Membership.find t.members member_id with 127 + | None -> () 128 + | Some member -> 129 + let snap = Membership.Member.snapshot_now member in 130 + let transition = transition_fn snap ~now in 131 + if transition.Protocol_pure.new_state.state <> snap.state then begin 132 + Membership.update_member t.members member_id 133 + { 134 + update = 135 + (fun m ~xt -> 136 + match transition.new_state.state with 137 + | Alive -> 138 + Membership.Member.set_alive ~xt m 139 + ~incarnation:transition.new_state.incarnation ~now 140 + | Suspect -> 141 + Membership.Member.set_suspect ~xt m 142 + ~incarnation:transition.new_state.incarnation ~now 143 + | Dead -> 144 + Membership.Member.set_dead ~xt m 145 + ~incarnation:transition.new_state.incarnation ~now); 146 + } 147 + |> ignore 148 + end; 149 + List.iter (fun msg -> enqueue_broadcast t msg) transition.broadcasts; 150 + List.iter (emit_event t) transition.events 151 + 152 + let handle_alive_msg t (msg : protocol_msg) = 153 + match msg with 154 + | Alive { node; incarnation = _ } -> 155 + apply_member_transition t node.id (fun snap ~now -> 156 + Protocol_pure.handle_alive ~self:t.self.id snap msg ~now) 157 + | _ -> () 158 + 159 + let handle_suspect_msg t (msg : protocol_msg) = 160 + match msg with 161 + | Suspect { node; incarnation = _; suspector = _ } -> 162 + apply_member_transition t node (fun snap ~now -> 163 + Protocol_pure.handle_suspect ~self:t.self.id snap msg ~now) 164 + | _ -> () 165 + 166 + let handle_dead_msg t (msg : protocol_msg) = 167 + match msg with 168 + | Dead { node; incarnation = _; declarator = _ } -> 169 + apply_member_transition t node (fun snap ~now -> 170 + Protocol_pure.handle_dead snap msg ~now) 171 + | _ -> () 172 + 173 + let handle_user_msg t (msg : protocol_msg) = 174 + match msg with 175 + | User_msg { topic; payload; origin } -> ( 176 + let handlers = 177 + Kcas.Xt.commit { tx = (fun ~xt -> Kcas.Xt.get ~xt t.user_handlers) } 178 + 
in 179 + match Membership.find t.members origin with 180 + | None -> () 181 + | Some member -> 182 + let node = Membership.Member.node member in 183 + List.iter (fun h -> h node topic payload) handlers) 184 + | _ -> () 185 + 186 + let handle_message t ~src (msg : protocol_msg) = 187 + match msg with 188 + | Ping _ -> handle_ping t ~src msg 189 + | Ping_req _ -> handle_ping_req t ~src msg 190 + | Ack _ -> handle_ack t msg 191 + | Alive _ -> handle_alive_msg t msg 192 + | Suspect _ -> handle_suspect_msg t msg 193 + | Dead _ -> handle_dead_msg t msg 194 + | User_msg _ -> handle_user_msg t msg 195 + 196 + let handle_packet t ~src (packet : packet) = 197 + if String.equal packet.cluster t.config.cluster_name then begin 198 + handle_message t ~src packet.primary; 199 + List.iter (handle_message t ~src) packet.piggyback; 200 + update_stats t (fun s -> { s with msgs_received = s.msgs_received + 1 }) 201 + end 202 + 203 + let process_udp_packet t ~buf ~src = 204 + match Crypto.decrypt ~key:t.cipher_key buf with 205 + | Error _ -> 206 + update_stats t (fun s -> { s with msgs_dropped = s.msgs_dropped + 1 }) 207 + | Ok decrypted -> ( 208 + match Codec.decode_packet decrypted with 209 + | Error _ -> 210 + update_stats t (fun s -> { s with msgs_dropped = s.msgs_dropped + 1 }) 211 + | Ok packet -> handle_packet t ~src packet) 212 + 213 + let run_udp_receiver t = 214 + while not (is_shutdown t) do 215 + Buffer_pool.with_buffer t.recv_pool (fun buf -> 216 + let n, src = Transport.recv_udp t.udp_sock buf in 217 + let received = Cstruct.sub buf 0 n in 218 + process_udp_packet t ~buf:received ~src) 219 + done 220 + 221 + let probe_member t (member : Membership.Member.t) = 222 + let target = Membership.Member.node member in 223 + let seq = next_seq t in 224 + let piggyback = 225 + drain_piggyback t ~max_bytes:(t.config.udp_buffer_size - 100) 226 + in 227 + let ping = Ping { seq; sender = t.self } in 228 + let packet = make_packet t ~primary:ping ~piggyback in 229 + 230 + let waiter = 
Pending_acks.register t.pending_acks ~seq in 231 + send_packet t ~dst:target.addr packet; 232 + 233 + match 234 + Pending_acks.wait waiter ~timeout:t.config.probe_timeout ~clock:t.clock 235 + with 236 + | Some _ -> 237 + let now = now_mtime t in 238 + Membership.update_member t.members target.id 239 + { update = (fun m ~xt -> Membership.Member.record_ack ~xt m ~now) } 240 + |> ignore; 241 + true 242 + | None -> 243 + Pending_acks.cancel t.pending_acks ~seq; 244 + false 245 + 246 + let indirect_probe t (member : Membership.Member.t) = 247 + let target = Membership.Member.node member in 248 + let seq = next_seq t in 249 + let ping_req = Ping_req { seq; target = target.id; sender = t.self } in 250 + 251 + let all_members = Membership.to_node_list t.members in 252 + let indirect_targets = 253 + Protocol_pure.select_indirect_targets ~self:t.self.id ~exclude:target.id 254 + ~count:t.config.indirect_checks ~members:all_members 255 + in 256 + 257 + let waiter = Pending_acks.register t.pending_acks ~seq in 258 + List.iter 259 + (fun node -> 260 + let packet = make_packet t ~primary:ping_req ~piggyback:[] in 261 + send_packet t ~dst:node.addr packet) 262 + indirect_targets; 263 + 264 + match 265 + Pending_acks.wait waiter ~timeout:t.config.probe_timeout ~clock:t.clock 266 + with 267 + | Some _ -> 268 + let now = now_mtime t in 269 + Membership.update_member t.members target.id 270 + { update = (fun m ~xt -> Membership.Member.record_ack ~xt m ~now) } 271 + |> ignore; 272 + true 273 + | None -> 274 + Pending_acks.cancel t.pending_acks ~seq; 275 + false 276 + 277 + let suspect_member t (member : Membership.Member.t) = 278 + let node = Membership.Member.node member in 279 + let inc = get_incarnation t in 280 + let msg = 281 + Suspect { node = node.id; incarnation = inc; suspector = t.self.id } 282 + in 283 + apply_member_transition t node.id (fun snap ~now -> 284 + Protocol_pure.handle_suspect ~self:t.self.id snap msg ~now) 285 + 286 + let probe_cycle t = 287 + let members = 
Membership.to_list t.members in 288 + let member_nodes = List.map Membership.Member.node members in 289 + let probe_idx = 290 + Kcas.Xt.commit { tx = (fun ~xt -> Kcas.Xt.get ~xt t.probe_index) } 291 + in 292 + 293 + match 294 + Protocol_pure.next_probe_target ~self:t.self.id ~probe_index:probe_idx 295 + ~members:member_nodes 296 + with 297 + | None -> () 298 + | Some (target_node, new_idx) -> ( 299 + Kcas.Xt.commit { tx = (fun ~xt -> Kcas.Xt.set ~xt t.probe_index new_idx) }; 300 + match Membership.find t.members target_node.id with 301 + | None -> () 302 + | Some member -> 303 + let direct_ok = probe_member t member in 304 + if not direct_ok then 305 + let indirect_ok = indirect_probe t member in 306 + if not indirect_ok then suspect_member t member) 307 + 308 + let run_protocol t = 309 + while not (is_shutdown t) do 310 + probe_cycle t; 311 + Eio.Time.sleep t.clock t.config.protocol_interval 312 + done 313 + 314 + let create ~config ~self ~udp_sock ~clock ~mono_clock ~secure_random = 315 + match Crypto.init_key config.secret_key with 316 + | Error _ -> Error `Invalid_key 317 + | Ok cipher_key -> 318 + Ok 319 + { 320 + config; 321 + self; 322 + members = Membership.create (); 323 + incarnation = Kcas.Loc.make zero_incarnation; 324 + sequence = Kcas.Loc.make 0; 325 + broadcast_queue = Dissemination.create (); 326 + pending_acks = Pending_acks.create (); 327 + probe_index = Kcas.Loc.make 0; 328 + send_pool = 329 + Buffer_pool.create ~size:config.udp_buffer_size 330 + ~count:config.send_buffer_count; 331 + recv_pool = 332 + Buffer_pool.create ~size:config.udp_buffer_size 333 + ~count:config.recv_buffer_count; 334 + udp_sock; 335 + event_stream = Eio.Stream.create 100; 336 + user_handlers = Kcas.Loc.make []; 337 + cipher_key; 338 + stats = Kcas.Loc.make empty_stats; 339 + shutdown = Kcas.Loc.make false; 340 + clock; 341 + mono_clock; 342 + secure_random; 343 + } 344 + 345 + let shutdown t = 346 + Kcas.Xt.commit { tx = (fun ~xt -> Kcas.Xt.set ~xt t.shutdown true) } 347 
+ 348 + let add_member t node_info = 349 + let now = now_mtime t in 350 + let member = Membership.Member.create ~now node_info in 351 + Membership.add t.members member; 352 + emit_event t (Join node_info) 353 + 354 + let remove_member t node_id = 355 + match Membership.find t.members node_id with 356 + | None -> false 357 + | Some member -> 358 + let node = Membership.Member.node member in 359 + let removed = Membership.remove t.members node_id in 360 + if removed then emit_event t (Leave node); 361 + removed 362 + 363 + let local_node t = t.self 364 + let members t = Membership.to_list t.members 365 + let member_count t = Membership.count t.members 366 + let events t = t.event_stream 367 + 368 + let stats t = 369 + let base = Kcas.Xt.commit { tx = (fun ~xt -> Kcas.Xt.get ~xt t.stats) } in 370 + let alive, suspect, dead = 371 + Membership.snapshot_all t.members 372 + |> List.fold_left 373 + (fun (a, s, d) snap -> 374 + match snap.state with 375 + | Alive -> (a + 1, s, d) 376 + | Suspect -> (a, s + 1, d) 377 + | Dead -> (a, s, d + 1)) 378 + (0, 0, 0) 379 + in 380 + { 381 + base with 382 + nodes_alive = alive; 383 + nodes_suspect = suspect; 384 + nodes_dead = dead; 385 + queue_depth = Dissemination.depth t.broadcast_queue; 386 + buffers_available = 387 + Buffer_pool.available t.send_pool + Buffer_pool.available t.recv_pool; 388 + buffers_total = 389 + Buffer_pool.total t.send_pool + Buffer_pool.total t.recv_pool; 390 + } 391 + 392 + let broadcast t ~topic ~payload = 393 + let msg = User_msg { topic; payload; origin = t.self.id } in 394 + enqueue_broadcast t msg 395 + 396 + let on_message t handler = 397 + Kcas.Xt.commit 398 + { 399 + tx = 400 + (fun ~xt -> 401 + let handlers = Kcas.Xt.get ~xt t.user_handlers in 402 + Kcas.Xt.set ~xt t.user_handlers (handler :: handlers)); 403 + }
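Note on the probe cycle: `probe_cycle` escalates in three steps. It first tries a direct ping (`probe_member`), falls back to asking other members to ping the target (`indirect_probe`), and only marks the target as suspect (`suspect_member`) when both fail. The sketch below isolates that control flow from the networking code. The three callbacks and the `outcome` type are hypothetical and exist only to make the escalation order testable on its own.

```ocaml
(* Standalone sketch of the direct -> indirect -> suspect escalation.
   The callbacks stand in for probe_member, indirect_probe and
   suspect_member; outcome is an illustrative type. *)
type outcome = Direct_ack | Indirect_ack | Suspected

let probe_escalation ~(direct : unit -> bool) ~(indirect : unit -> bool)
    ~(suspect : unit -> unit) : outcome =
  if direct () then Direct_ack
  else if indirect () then Indirect_ack
  else begin
    (* Both probes failed: record the suspicion exactly once. *)
    suspect ();
    Suspected
  end

let () =
  let suspected = ref 0 in
  let suspect () = incr suspected in
  let yes () = true and no () = false in
  assert (probe_escalation ~direct:yes ~indirect:no ~suspect = Direct_ack);
  assert (probe_escalation ~direct:no ~indirect:yes ~suspect = Indirect_ack);
  assert (!suspected = 0);
  assert (probe_escalation ~direct:no ~indirect:no ~suspect = Suspected);
  assert (!suspected = 1)
```

Because the second and third steps are only evaluated on failure, the indirect probe is never sent when the direct ack arrives in time.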
+190
lib/protocol_pure.ml
··· 1 + open Types 2 + 3 + type 'a transition = { 4 + new_state : 'a; 5 + broadcasts : protocol_msg list; 6 + events : node_event list; 7 + } 8 + 9 + let no_change state = { new_state = state; broadcasts = []; events = [] } 10 + 11 + let node_id_of_msg = function 12 + | Ping { sender; _ } -> sender.id 13 + | Ping_req { sender; _ } -> sender.id 14 + | Ack { responder; _ } -> responder.id 15 + | Alive { node; _ } -> node.id 16 + | Suspect { node; _ } -> node 17 + | Dead { node; _ } -> node 18 + | User_msg { origin; _ } -> origin 19 + 20 + let incarnation_of_msg = function 21 + | Alive { incarnation; _ } -> Some incarnation 22 + | Suspect { incarnation; _ } -> Some incarnation 23 + | Dead { incarnation; _ } -> Some incarnation 24 + | Ping _ | Ping_req _ | Ack _ | User_msg _ -> None 25 + 26 + let handle_alive ~(self : node_id) (member : member_snapshot) 27 + (msg : protocol_msg) ~(now : Mtime.span) : member_snapshot transition = 28 + let _ = self in 29 + match msg with 30 + | Alive { node; incarnation = msg_inc } -> 31 + if not (equal_node_id node.id member.node.id) then no_change member 32 + else 33 + let cmp = compare_incarnation msg_inc member.incarnation in 34 + if cmp > 0 then 35 + let new_member = 36 + { node; state = Alive; incarnation = msg_inc; state_change = now } 37 + in 38 + let events = 39 + match member.state with 40 + | Dead -> [ Join node ] 41 + | Suspect -> [ Alive_event node ] 42 + | Alive -> [ Update node ] 43 + in 44 + { new_state = new_member; broadcasts = [ msg ]; events } 45 + else if cmp = 0 && member.state = Suspect then 46 + let new_member = { member with state = Alive; state_change = now } in 47 + { 48 + new_state = new_member; 49 + broadcasts = [ msg ]; 50 + events = [ Alive_event node ]; 51 + } 52 + else no_change member 53 + | _ -> no_change member 54 + 55 + let handle_suspect ~(self : node_id) (member : member_snapshot) 56 + (msg : protocol_msg) ~(now : Mtime.span) : member_snapshot transition = 57 + match msg with 58 + | Suspect { node = 
msg_node; incarnation = msg_inc; suspector = _ } -> 59 + if not (equal_node_id msg_node member.node.id) then no_change member 60 + else if member.state = Dead then no_change member 61 + else if equal_node_id msg_node self then 62 + let new_inc = incr_incarnation member.incarnation in 63 + let refute = Alive { node = member.node; incarnation = new_inc } in 64 + { 65 + new_state = { member with incarnation = new_inc }; 66 + broadcasts = [ refute ]; 67 + events = []; 68 + } 69 + else 70 + let dominated = 71 + compare_incarnation msg_inc member.incarnation > 0 72 + || compare_incarnation msg_inc member.incarnation = 0 73 + && member.state = Alive 74 + in 75 + if dominated then 76 + let new_member = 77 + { 78 + member with 79 + state = Suspect; 80 + incarnation = msg_inc; 81 + state_change = now; 82 + } 83 + in 84 + { 85 + new_state = new_member; 86 + broadcasts = [ msg ]; 87 + events = [ Suspect_event member.node ]; 88 + } 89 + else no_change member 90 + | _ -> no_change member 91 + 92 + let handle_dead (member : member_snapshot) (msg : protocol_msg) 93 + ~(now : Mtime.span) : member_snapshot transition = 94 + match msg with 95 + | Dead { node = msg_node; incarnation = msg_inc; declarator = _ } -> 96 + if not (equal_node_id msg_node member.node.id) then no_change member 97 + else if member.state = Dead then no_change member 98 + else if compare_incarnation msg_inc member.incarnation >= 0 then 99 + let new_member = 100 + { 101 + member with 102 + state = Dead; 103 + incarnation = msg_inc; 104 + state_change = now; 105 + } 106 + in 107 + { 108 + new_state = new_member; 109 + broadcasts = [ msg ]; 110 + events = [ Leave member.node ]; 111 + } 112 + else no_change member 113 + | _ -> no_change member 114 + 115 + let suspicion_timeout (config : config) ~node_count = 116 + let base = 117 + float_of_int config.suspicion_mult 118 + *. log (float_of_int (max 1 node_count) +. 1.) 119 + *. 
config.protocol_interval 120 + in 121 + Float.min base config.suspicion_max_timeout 122 + 123 + let retransmit_limit (config : config) ~node_count = 124 + let log_n = log (float_of_int (max 1 node_count) +. 1.) in 125 + int_of_float (ceil (float_of_int config.retransmit_mult *. log_n)) 126 + 127 + let next_probe_target ~(self : node_id) ~probe_index ~(members : node_info list) 128 + = 129 + match members with 130 + | [] -> None 131 + | _ -> 132 + let len = List.length members in 133 + let rec find idx attempts = 134 + if attempts >= len then None 135 + else 136 + let candidate = List.nth members (idx mod len) in 137 + if equal_node_id candidate.id self then find (idx + 1) (attempts + 1) 138 + else Some (candidate, (idx + 1) mod len) 139 + in 140 + find probe_index 0 141 + 142 + let invalidates ~(newer : protocol_msg) ~(older : protocol_msg) : bool = 143 + let newer_id = node_id_of_msg newer in 144 + let older_id = node_id_of_msg older in 145 + if not (equal_node_id newer_id older_id) then false 146 + else 147 + match (newer, older) with 148 + | Dead _, (Alive _ | Suspect _ | Dead _) -> true 149 + | Alive { incarnation = new_inc; _ }, Suspect { incarnation = old_inc; _ } 150 + -> 151 + compare_incarnation new_inc old_inc >= 0 152 + | Alive { incarnation = new_inc; _ }, Alive { incarnation = old_inc; _ } -> 153 + compare_incarnation new_inc old_inc > 0 154 + | Suspect { incarnation = new_inc; _ }, Suspect { incarnation = old_inc; _ } 155 + -> 156 + compare_incarnation new_inc old_inc > 0 157 + | _ -> false 158 + 159 + let merge_member_state ~(local : member_snapshot) ~(remote : member_snapshot) : 160 + member_snapshot = 161 + if not (equal_node_id local.node.id remote.node.id) then local 162 + else 163 + match (local.state, remote.state) with 164 + | Dead, _ -> local 165 + | _, Dead -> 166 + if compare_incarnation remote.incarnation local.incarnation >= 0 then 167 + remote 168 + else local 169 + | Alive, Alive | Suspect, Suspect -> 170 + if compare_incarnation 
remote.incarnation local.incarnation > 0 then 171 + remote 172 + else local 173 + | Alive, Suspect -> 174 + if compare_incarnation remote.incarnation local.incarnation > 0 then 175 + remote 176 + else local 177 + | Suspect, Alive -> 178 + if compare_incarnation remote.incarnation local.incarnation >= 0 then 179 + remote 180 + else local 181 + 182 + let select_indirect_targets ~(self : node_id) ~exclude ~count 183 + ~(members : node_info list) : node_info list = 184 + members 185 + |> List.filter (fun m -> 186 + (not (equal_node_id m.id self)) && not (equal_node_id m.id exclude)) 187 + |> fun candidates -> 188 + let len = List.length candidates in 189 + if len <= count then candidates 190 + else List.filteri (fun i _ -> i < count) candidates
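The two scaling helpers in this file, `suspicion_timeout` and `retransmit_limit`, both grow with the natural log of the cluster size. The following is a minimal standalone sketch of that arithmetic, assuming a trimmed-down `config` record (hypothetical, holding only the four fields the formulas read) filled with the defaults from `types.ml`. It is meant for experimenting with the numbers, not as a replacement for the module.

```ocaml
(* Hypothetical, trimmed-down config: only the fields the two formulas read. *)
type config = {
  protocol_interval : float;
  suspicion_mult : int;
  suspicion_max_timeout : float;
  retransmit_mult : int;
}

(* Values copied from default_config in types.ml. *)
let default_config =
  {
    protocol_interval = 1.0;
    suspicion_mult = 4;
    suspicion_max_timeout = 60.0;
    retransmit_mult = 4;
  }

(* ln(n + 1), with n clamped to at least 1 so the result is never 0. *)
let log_nodes node_count = log (float_of_int (max 1 node_count) +. 1.)

(* Suspicion timeout in seconds, capped at suspicion_max_timeout. *)
let suspicion_timeout config ~node_count =
  Float.min
    (float_of_int config.suspicion_mult
    *. log_nodes node_count *. config.protocol_interval)
    config.suspicion_max_timeout

(* Number of times a broadcast is piggybacked before it is dropped. *)
let retransmit_limit config ~node_count =
  int_of_float
    (ceil (float_of_int config.retransmit_mult *. log_nodes node_count))

let () =
  List.iter
    (fun n ->
      Printf.printf "n=%d timeout=%.2fs retransmits=%d\n" n
        (suspicion_timeout default_config ~node_count:n)
        (retransmit_limit default_config ~node_count:n))
    [ 1; 10; 100; 1_000_000_000 ]
```

With these defaults the cap only takes effect for very large clusters, so in practice the timeout is dominated by `suspicion_mult` and `protocol_interval`.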
+52
lib/protocol_pure.mli
··· 1 + open Types 2 + 3 + type 'a transition = { 4 + new_state : 'a; 5 + broadcasts : protocol_msg list; 6 + events : node_event list; 7 + } 8 + 9 + val no_change : 'a -> 'a transition 10 + val node_id_of_msg : protocol_msg -> node_id 11 + val incarnation_of_msg : protocol_msg -> incarnation option 12 + 13 + val handle_alive : 14 + self:node_id -> 15 + member_snapshot -> 16 + protocol_msg -> 17 + now:Mtime.span -> 18 + member_snapshot transition 19 + 20 + val handle_suspect : 21 + self:node_id -> 22 + member_snapshot -> 23 + protocol_msg -> 24 + now:Mtime.span -> 25 + member_snapshot transition 26 + 27 + val handle_dead : 28 + member_snapshot -> 29 + protocol_msg -> 30 + now:Mtime.span -> 31 + member_snapshot transition 32 + 33 + val suspicion_timeout : config -> node_count:int -> float 34 + val retransmit_limit : config -> node_count:int -> int 35 + 36 + val next_probe_target : 37 + self:node_id -> 38 + probe_index:int -> 39 + members:node_info list -> 40 + (node_info * int) option 41 + 42 + val invalidates : newer:protocol_msg -> older:protocol_msg -> bool 43 + 44 + val merge_member_state : 45 + local:member_snapshot -> remote:member_snapshot -> member_snapshot 46 + 47 + val select_indirect_targets : 48 + self:node_id -> 49 + exclude:node_id -> 50 + count:int -> 51 + members:node_info list -> 52 + node_info list
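`handle_suspect` is the only handler in this interface besides `handle_alive` that takes `~self`, because a node that hears a suspicion about itself refutes it by bumping its own incarnation. The sketch below models that decision with simplified types; `member`, `outcome`, and `on_suspect` are hypothetical names using plain strings and ints rather than the library's `node_id` and `incarnation`. It mirrors the branch order of the implementation as shown in this diff and should be read as an illustration only.

```ocaml
type state = Alive | Suspect | Dead

(* Simplified stand-in for member_snapshot. *)
type member = { id : string; state : state; incarnation : int }

type outcome =
  | Unchanged
  | Refuted of member (* we were accused: bump incarnation, stay alive *)
  | Marked_suspect of member (* a peer was accused and the claim is fresh *)

let on_suspect ~self member ~target ~incarnation =
  if target <> member.id || member.state = Dead then Unchanged
  else if target = self then
    Refuted { member with incarnation = member.incarnation + 1 }
  else if
    incarnation > member.incarnation
    || (incarnation = member.incarnation && member.state = Alive)
  then Marked_suspect { member with state = Suspect; incarnation }
  else Unchanged

let () =
  let me = { id = "a"; state = Alive; incarnation = 3 } in
  match on_suspect ~self:"a" me ~target:"a" ~incarnation:3 with
  | Refuted m -> Printf.printf "refuting with incarnation %d\n" m.incarnation
  | Marked_suspect _ | Unchanged -> print_endline "no refutation"
```

A stale suspicion (lower incarnation than the local record) falls through to `Unchanged`, which is what keeps old gossip from flipping a node that has already refuted it.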
+99
lib/swim.ml
module Types = Types
module Codec = Codec
module Crypto = Crypto
module Buffer_pool = Buffer_pool
module Protocol_pure = Protocol_pure
module Membership = Membership
module Dissemination = Dissemination
module Pending_acks = Pending_acks
module Transport = Transport
module Protocol = Protocol

module Cluster = struct
  type t = { protocol : Protocol.t; sw : Eio.Switch.t }

  let create ~sw ~(env : _ Types.env) ~config =
    let net = env.stdenv#net in
    let clock = env.stdenv#clock in
    let mono_clock = env.stdenv#mono_clock in
    let secure_random = env.stdenv#secure_random in

    (* Fall back to a random name. A self-initialised PRNG state is used so
       that two unnamed nodes do not pick the same suffix. *)
    let node_name =
      match config.Types.node_name with
      | Some name -> name
      | None ->
          let rng = Random.State.make_self_init () in
          Printf.sprintf "node-%d" (Random.State.int rng 100000)
    in
    let self_id = Types.node_id_of_string node_name in

    let udp_sock =
      Transport.create_udp_socket net ~sw ~addr:config.bind_addr
        ~port:config.bind_port
    in

    (* NOTE: Eio.Net.Ipaddr.of_raw takes the raw 4- or 16-byte address, not
       dotted-quad text such as the default "0.0.0.0". *)
    let self_addr =
      `Udp (Eio.Net.Ipaddr.of_raw config.bind_addr, config.bind_port)
    in
    let self = Types.make_node_info ~id:self_id ~addr:self_addr ~meta:"" in

    match
      Protocol.create ~config ~self ~udp_sock ~clock ~mono_clock ~secure_random
    with
    | Error `Invalid_key -> Error `Invalid_key
    | Ok protocol -> Ok { protocol; sw }

  (* Run the probe loop and the UDP receiver as fibers under [t.sw]. *)
  let start t =
    Eio.Fiber.fork ~sw:t.sw (fun () -> Protocol.run_protocol t.protocol);
    Eio.Fiber.fork ~sw:t.sw (fun () -> Protocol.run_udp_receiver t.protocol)

  let shutdown t = Protocol.shutdown t.protocol
  let local_node t = Protocol.local_node t.protocol
  let members t = Protocol.members t.protocol |> List.map Membership.Member.node
  let member_count t = Protocol.member_count t.protocol
  let events t = Protocol.events t.protocol
  let stats t = Protocol.stats t.protocol
  let add_member t node_info = Protocol.add_member t.protocol node_info
  let remove_member t node_id = Protocol.remove_member t.protocol node_id

  let join t ~seed_nodes =
    let parse_and_try seed =
      match Transport.parse_udp_addr seed with
      | Error _ -> false
      | Ok addr ->
          let node_id = Types.node_id_of_string seed in
          let node = Types.make_node_info ~id:node_id ~addr ~meta:"" in
          Protocol.add_member t.protocol node;
          true
    in
    (* Visit every seed. List.exists would stop at the first one that
       parses and leave the remaining seeds unregistered. *)
    let any_success =
      List.fold_left
        (fun ok seed -> parse_and_try seed || ok)
        false seed_nodes
    in
    if any_success then Ok () else Error `No_seeds_reachable

  let broadcast t ~topic ~payload =
    Protocol.broadcast t.protocol ~topic ~payload

  let on_message t handler = Protocol.on_message t.protocol handler

  let is_alive t node_id =
    match Membership.find t.protocol.Protocol.members node_id with
    | None -> false
    | Some member ->
        let snap = Membership.Member.snapshot_now member in
        snap.Types.state = Types.Alive

  let find_node t node_id =
    match Membership.find t.protocol.Protocol.members node_id with
    | None -> None
    | Some member -> Some (Membership.Member.node member)

  let is_healthy t =
    let s = stats t in
    s.Types.nodes_alive > 0
end

let default_config = Types.default_config
let version = "0.1.0"
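When registering seed nodes as `join` does, it matters whether the traversal stops at the first success: `List.exists` short-circuits, whereas a fold visits every element. The sketch below shows the fold variant in isolation; `register_seeds` and `try_seed` are hypothetical names standing in for the real parse-and-add step, and the "valid seed" check is only a placeholder.

```ocaml
(* Apply [try_seed] to every seed and report whether at least one succeeded.
   [try_seed seed || ok] evaluates [try_seed] first, so no seed is skipped. *)
let register_seeds ~(try_seed : string -> bool) (seeds : string list) : bool =
  List.fold_left (fun ok seed -> try_seed seed || ok) false seeds

let () =
  let attempted = ref [] in
  (* Placeholder check: a seed counts as valid if it contains a ':'. *)
  let try_seed s =
    attempted := s :: !attempted;
    String.contains s ':'
  in
  let ok =
    register_seeds ~try_seed [ "10.0.0.1:7946"; "bad-seed"; "10.0.0.2:7946" ]
  in
  Printf.printf "any valid: %b, attempted: %d\n" ok (List.length !attempted)
```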
+76
lib/transport.ml
··· 1 + open Types 2 + 3 + let create_udp_socket (net : _ Eio.Net.t) ~sw ~addr ~port : 4 + _ Eio.Net.datagram_socket_ty Eio.Resource.t = 5 + let bind_addr = 6 + match Eio.Net.Ipaddr.of_raw addr with ip -> `Udp (ip, port) 7 + in 8 + Eio.Net.datagram_socket ~sw net bind_addr 9 + 10 + let send_udp sock dst buf = Eio.Net.send sock ~dst [ buf ] 11 + 12 + let recv_udp sock buf = 13 + let dst, n = Eio.Net.recv sock buf in 14 + (n, dst) 15 + 16 + let create_tcp_listener (net : _ Eio.Net.t) ~sw ~addr ~port ~backlog = 17 + let bind_addr = 18 + match Eio.Net.Ipaddr.of_raw addr with ip -> `Tcp (ip, port) 19 + in 20 + Eio.Net.listen ~sw ~backlog net bind_addr 21 + 22 + let connect_tcp (net : _ Eio.Net.t) ~sw ~addr ~timeout ~clock = 23 + try 24 + Ok 25 + (Eio.Time.with_timeout_exn clock timeout (fun () -> 26 + Eio.Net.connect ~sw net addr)) 27 + with 28 + | Eio.Time.Timeout -> Error Timeout 29 + | Eio.Io (Eio.Net.E (Connection_failure _), _) -> Error Node_unreachable 30 + | Eio.Io (Eio.Net.E (Connection_reset _), _) -> Error Connection_reset 31 + 32 + let send_tcp sock buf = 33 + try 34 + Eio.Flow.write sock [ buf ]; 35 + Ok () 36 + with 37 + | Eio.Io (Eio.Net.E (Connection_reset _), _) -> Error Connection_reset 38 + | End_of_file -> Error Connection_reset 39 + 40 + let recv_tcp sock buf = 41 + try 42 + let n = Eio.Flow.single_read sock buf in 43 + Ok n 44 + with 45 + | Eio.Io (Eio.Net.E (Connection_reset _), _) -> Error Connection_reset 46 + | End_of_file -> Error Connection_reset 47 + 48 + let parse_addr_port s = 49 + match String.rindex_opt s ':' with 50 + | None -> Error `Invalid_addr 51 + | Some idx -> ( 52 + let host = String.sub s 0 idx in 53 + let port_str = String.sub s (idx + 1) (String.length s - idx - 1) in 54 + match int_of_string_opt port_str with 55 + | None -> Error `Invalid_addr 56 + | Some port -> 57 + if port < 0 || port > 65535 then Error `Invalid_addr 58 + else Ok (host, port)) 59 + 60 + let parse_udp_addr s = 61 + match parse_addr_port s with 62 + | Error e 
-> Error e 63 + | Ok (host, port) -> ( 64 + try 65 + let ip = Eio.Net.Ipaddr.of_raw host in 66 + Ok (`Udp (ip, port)) 67 + with _ -> Error `Invalid_addr) 68 + 69 + let parse_tcp_addr s = 70 + match parse_addr_port s with 71 + | Error e -> Error e 72 + | Ok (host, port) -> ( 73 + try 74 + let ip = Eio.Net.Ipaddr.of_raw host in 75 + Ok (`Tcp (ip, port)) 76 + with _ -> Error `Invalid_addr)
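`parse_udp_addr` and `parse_tcp_addr` pass the host text to `Eio.Net.Ipaddr.of_raw`, which expects the raw 4- or 16-byte form of an address rather than dotted-quad text. The following stdlib-only sketch shows one way the textual IPv4 case could be converted first. `ipv4_to_raw` and `is_octet` are hypothetical helpers that are not part of this library, IPv6 is deliberately left out, and `parse_addr_port` is repeated here only so the block is self-contained.

```ocaml
(* Split "host:port" on the last ':' and validate the port range. *)
let parse_addr_port (s : string) : (string * int, [> `Invalid_addr ]) result =
  match String.rindex_opt s ':' with
  | None -> Error `Invalid_addr
  | Some idx -> (
      let host = String.sub s 0 idx in
      let port_str = String.sub s (idx + 1) (String.length s - idx - 1) in
      match int_of_string_opt port_str with
      | Some port when port >= 0 && port <= 65535 -> Ok (host, port)
      | _ -> Error `Invalid_addr)

(* One to three decimal digits with a value of at most 255. *)
let is_octet (x : string) : bool =
  let n = String.length x in
  n >= 1 && n <= 3
  && String.for_all (fun c -> c >= '0' && c <= '9') x
  && int_of_string x <= 255

(* Hypothetical helper: "127.0.0.1" becomes the 4-byte string 7f 00 00 01,
   which is the shape Eio.Net.Ipaddr.of_raw accepts for IPv4. *)
let ipv4_to_raw (host : string) : string option =
  match String.split_on_char '.' host with
  | [ _; _; _; _ ] as parts when List.for_all is_octet parts ->
      Some
        (String.concat ""
           (List.map
              (fun p -> String.make 1 (Char.chr (int_of_string p)))
              parts))
  | _ -> None

let () =
  match parse_addr_port "127.0.0.1:7946" with
  | Ok (host, port) -> (
      match ipv4_to_raw host with
      | Some raw -> Printf.printf "%d raw bytes, port %d\n" (String.length raw) port
      | None -> print_endline "not an IPv4 literal")
  | Error `Invalid_addr -> print_endline "invalid address"
```

Returning an `option` here keeps the conversion in line with the design constraint of using result-style values instead of exceptions for expected errors.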
+154
lib/types.ml
··· 1 + type node_id = Node_id of string [@@unboxed] 2 + 3 + let node_id_to_string (Node_id s) = s 4 + let node_id_of_string s = Node_id s 5 + let equal_node_id (Node_id a) (Node_id b) = String.equal a b 6 + let compare_node_id (Node_id a) (Node_id b) = String.compare a b 7 + 8 + type incarnation = Incarnation of int [@@unboxed] 9 + 10 + let incarnation_to_int (Incarnation i) = i 11 + let incarnation_of_int i = Incarnation i 12 + let zero_incarnation = Incarnation 0 13 + let compare_incarnation (Incarnation a) (Incarnation b) = Int.compare a b 14 + let incr_incarnation (Incarnation i) = Incarnation (i + 1) 15 + 16 + type addr = Eio.Net.Sockaddr.datagram 17 + type node_info = { id : node_id; addr : addr; meta : string } 18 + 19 + let make_node_info ~id ~addr ~meta = { id; addr; meta } 20 + 21 + type member_state = Alive | Suspect | Dead 22 + 23 + let member_state_to_string = function 24 + | Alive -> "alive" 25 + | Suspect -> "suspect" 26 + | Dead -> "dead" 27 + 28 + type member_snapshot = { 29 + node : node_info; 30 + state : member_state; 31 + incarnation : incarnation; 32 + state_change : Mtime.span; 33 + } 34 + 35 + type protocol_msg = 36 + | Ping of { seq : int; sender : node_info } 37 + | Ping_req of { seq : int; target : node_id; sender : node_info } 38 + | Ack of { seq : int; responder : node_info; payload : string option } 39 + | Alive of { node : node_info; incarnation : incarnation } 40 + | Suspect of { 41 + node : node_id; 42 + incarnation : incarnation; 43 + suspector : node_id; 44 + } 45 + | Dead of { node : node_id; incarnation : incarnation; declarator : node_id } 46 + | User_msg of { topic : string; payload : string; origin : node_id } 47 + 48 + type packet = { 49 + cluster : string; 50 + primary : protocol_msg; 51 + piggyback : protocol_msg list; 52 + } 53 + 54 + type decode_error = 55 + | Invalid_magic 56 + | Unsupported_version of int 57 + | Truncated_message 58 + | Invalid_tag of int 59 + | Decryption_failed 60 + 61 + let decode_error_to_string = 
function 62 + | Invalid_magic -> "invalid magic bytes" 63 + | Unsupported_version v -> Printf.sprintf "unsupported version: %d" v 64 + | Truncated_message -> "truncated message" 65 + | Invalid_tag t -> Printf.sprintf "invalid tag: %d" t 66 + | Decryption_failed -> "decryption failed" 67 + 68 + type send_error = Node_unreachable | Timeout | Connection_reset 69 + 70 + let send_error_to_string = function 71 + | Node_unreachable -> "node unreachable" 72 + | Timeout -> "timeout" 73 + | Connection_reset -> "connection reset" 74 + 75 + type node_event = 76 + | Join of node_info 77 + | Leave of node_info 78 + | Update of node_info 79 + | Suspect_event of node_info 80 + | Alive_event of node_info 81 + 82 + type config = { 83 + bind_addr : string; 84 + bind_port : int; 85 + node_name : string option; 86 + protocol_interval : float; 87 + probe_timeout : float; 88 + indirect_checks : int; 89 + suspicion_mult : int; 90 + suspicion_max_timeout : float; 91 + retransmit_mult : int; 92 + udp_buffer_size : int; 93 + tcp_timeout : float; 94 + send_buffer_count : int; 95 + recv_buffer_count : int; 96 + secret_key : string; 97 + cluster_name : string; 98 + } 99 + 100 + let default_config = 101 + { 102 + bind_addr = "0.0.0.0"; 103 + bind_port = 7946; 104 + node_name = None; 105 + protocol_interval = 1.0; 106 + probe_timeout = 0.5; 107 + indirect_checks = 3; 108 + suspicion_mult = 4; 109 + suspicion_max_timeout = 60.0; 110 + retransmit_mult = 4; 111 + udp_buffer_size = 1400; 112 + tcp_timeout = 10.0; 113 + send_buffer_count = 16; 114 + recv_buffer_count = 16; 115 + secret_key = String.make 32 '\x00'; 116 + cluster_name = "default"; 117 + } 118 + 119 + type 'a env = { 120 + stdenv : 'a; 121 + sw : Eio.Switch.t; 122 + } 123 + constraint 124 + 'a = 125 + < net : _ Eio.Net.t 126 + ; clock : _ Eio.Time.clock 127 + ; mono_clock : _ Eio.Time.Mono.t 128 + ; secure_random : _ Eio.Flow.source 129 + ; .. 
> 130 + 131 + type stats = { 132 + nodes_alive : int; 133 + nodes_suspect : int; 134 + nodes_dead : int; 135 + msgs_sent : int; 136 + msgs_received : int; 137 + msgs_dropped : int; 138 + queue_depth : int; 139 + buffers_available : int; 140 + buffers_total : int; 141 + } 142 + 143 + let empty_stats = 144 + { 145 + nodes_alive = 0; 146 + nodes_suspect = 0; 147 + nodes_dead = 0; 148 + msgs_sent = 0; 149 + msgs_received = 0; 150 + msgs_dropped = 0; 151 + queue_depth = 0; 152 + buffers_available = 0; 153 + buffers_total = 0; 154 + }
+115
lib/types.mli
··· 1 + type node_id = Node_id of string [@@unboxed] 2 + 3 + val node_id_to_string : node_id -> string 4 + val node_id_of_string : string -> node_id 5 + val equal_node_id : node_id -> node_id -> bool 6 + val compare_node_id : node_id -> node_id -> int 7 + 8 + type incarnation = Incarnation of int [@@unboxed] 9 + 10 + val incarnation_to_int : incarnation -> int 11 + val incarnation_of_int : int -> incarnation 12 + val zero_incarnation : incarnation 13 + val compare_incarnation : incarnation -> incarnation -> int 14 + val incr_incarnation : incarnation -> incarnation 15 + 16 + type addr = Eio.Net.Sockaddr.datagram 17 + type node_info = { id : node_id; addr : addr; meta : string } 18 + 19 + val make_node_info : id:node_id -> addr:addr -> meta:string -> node_info 20 + 21 + type member_state = Alive | Suspect | Dead 22 + 23 + val member_state_to_string : member_state -> string 24 + 25 + type member_snapshot = { 26 + node : node_info; 27 + state : member_state; 28 + incarnation : incarnation; 29 + state_change : Mtime.span; 30 + } 31 + 32 + type protocol_msg = 33 + | Ping of { seq : int; sender : node_info } 34 + | Ping_req of { seq : int; target : node_id; sender : node_info } 35 + | Ack of { seq : int; responder : node_info; payload : string option } 36 + | Alive of { node : node_info; incarnation : incarnation } 37 + | Suspect of { 38 + node : node_id; 39 + incarnation : incarnation; 40 + suspector : node_id; 41 + } 42 + | Dead of { node : node_id; incarnation : incarnation; declarator : node_id } 43 + | User_msg of { topic : string; payload : string; origin : node_id } 44 + 45 + type packet = { 46 + cluster : string; 47 + primary : protocol_msg; 48 + piggyback : protocol_msg list; 49 + } 50 + 51 + type decode_error = 52 + | Invalid_magic 53 + | Unsupported_version of int 54 + | Truncated_message 55 + | Invalid_tag of int 56 + | Decryption_failed 57 + 58 + val decode_error_to_string : decode_error -> string 59 + 60 + type send_error = Node_unreachable | Timeout | 
Connection_reset 61 + 62 + val send_error_to_string : send_error -> string 63 + 64 + type node_event = 65 + | Join of node_info 66 + | Leave of node_info 67 + | Update of node_info 68 + | Suspect_event of node_info 69 + | Alive_event of node_info 70 + 71 + type config = { 72 + bind_addr : string; 73 + bind_port : int; 74 + node_name : string option; 75 + protocol_interval : float; 76 + probe_timeout : float; 77 + indirect_checks : int; 78 + suspicion_mult : int; 79 + suspicion_max_timeout : float; 80 + retransmit_mult : int; 81 + udp_buffer_size : int; 82 + tcp_timeout : float; 83 + send_buffer_count : int; 84 + recv_buffer_count : int; 85 + secret_key : string; 86 + cluster_name : string; 87 + } 88 + 89 + val default_config : config 90 + 91 + type 'a env = { 92 + stdenv : 'a; 93 + sw : Eio.Switch.t; 94 + } 95 + constraint 96 + 'a = 97 + < clock : _ Eio.Time.clock 98 + ; mono_clock : _ Eio.Time.Mono.t 99 + ; net : _ Eio.Net.t 100 + ; secure_random : _ Eio.Flow.source 101 + ; .. > 102 + 103 + type stats = { 104 + nodes_alive : int; 105 + nodes_suspect : int; 106 + nodes_dead : int; 107 + msgs_sent : int; 108 + msgs_received : int; 109 + msgs_dropped : int; 110 + queue_depth : int; 111 + buffers_available : int; 112 + buffers_total : int; 113 + } 114 + 115 + val empty_stats : stats
+43
opencode.json
··· 1 + { 2 + "$schema": "https://opencode.ai/config.json", 3 + "snapshot": false, 4 + "lsp": { 5 + "ocaml-lsp": { 6 + "command": ["ocamllsp"], 7 + "extensions": [".ml", ".mli"] 8 + }, 9 + "clangd": { 10 + "command": ["clangd"], 11 + "extensions": [".c", ".h", ".cpp", ".hpp", ".cc", ".cxx", ".hh", ".hxx"] 12 + } 13 + }, 14 + "formatter": { 15 + "ocamlformat": { 16 + "command": ["ocamlformat", "--inplace", "$FILE"], 17 + "extensions": [".ml", ".mli"] 18 + }, 19 + "clang-format": { 20 + "command": ["clang-format", "-i", "$FILE"], 21 + "extensions": [".c", ".h", ".cpp", ".hpp", ".cc", ".cxx", ".hh", ".hxx"] 22 + } 23 + }, 24 + "mcp": { 25 + "beads": { 26 + "type": "local", 27 + "command": ["uv", "tool", "run", "beads-mcp"], 28 + "enabled": true 29 + }, 30 + "tod": { 31 + "type": "local", 32 + "command": ["tod", "mcp", "--log-file", "/tmp/tod.log"], 33 + "enabled": true 34 + }, 35 + "jetbrains": { 36 + "type": "local", 37 + "environment": { 38 + "IJ_MCP_SERVER_PORT": "64342" 39 + }, 40 + "command": [ "/home/gdiazlo/.local/share/JetBrains/Toolbox/apps/clion/jbr/bin/java", "-classpath", 
"/home/gdiazlo/.local/share/JetBrains/CLion2025.3/mcpserver/lib/mcpserver.jar:/home/gdiazlo/.local/share/JetBrains/CLion2025.3/mcpserver/lib/io.modelcontextprotocol.kotlin.sdk.jar:/home/gdiazlo/.local/share/JetBrains/CLion2025.3/mcpserver/lib/io.github.oshai.kotlin.logging.jvm.jar:/home/gdiazlo/.local/share/JetBrains/Toolbox/apps/clion/lib/util-8.jar:/home/gdiazlo/.local/share/JetBrains/Toolbox/apps/clion/lib/module-intellij.libraries.ktor.client.cio.jar:/home/gdiazlo/.local/share/JetBrains/Toolbox/apps/clion/lib/module-intellij.libraries.ktor.client.jar:/home/gdiazlo/.local/share/JetBrains/Toolbox/apps/clion/lib/module-intellij.libraries.ktor.network.tls.jar:/home/gdiazlo/.local/share/JetBrains/Toolbox/apps/clion/lib/module-intellij.libraries.ktor.io.jar:/home/gdiazlo/.local/share/JetBrains/Toolbox/apps/clion/lib/module-intellij.libraries.ktor.utils.jar:/home/gdiazlo/.local/share/JetBrains/Toolbox/apps/clion/lib/module-intellij.libraries.kotlinx.io.jar:/home/gdiazlo/.local/share/JetBrains/Toolbox/apps/clion/lib/module-intellij.libraries.kotlinx.serialization.core.jar:/home/gdiazlo/.local/share/JetBrains/Toolbox/apps/clion/lib/module-intellij.libraries.kotlinx.serialization.json.jar", "com.intellij.mcpserver.stdio.McpStdioRunnerKt" ] 41 + } 42 + } 43 + }
+48
swim.opam
··· 1 + # This file is generated by dune, edit dune-project instead 2 + opam-version: "2.0" 3 + synopsis: 4 + "SWIM protocol library for cluster membership and failure detection" 5 + description: 6 + "Production-ready SWIM (Scalable Weakly-consistent Infection-style Process Group Membership) protocol library in OCaml 5 for cluster membership, failure detection, and lightweight pub/sub messaging. Features lock-free coordination via kcas, zero-copy buffer management, and AES-256-GCM encryption." 7 + maintainer: ["Guillermo Diaz-Romero <guillermo.diaz@gmail.com>"] 8 + authors: ["Guillermo Diaz-Romero <guillermo.diaz@gmail.com>"] 9 + license: "MIT" 10 + tags: [ 11 + "swim" "cluster" "membership" "gossip" "failure detection" "ocaml5" "eio" 12 + ] 13 + homepage: "https://github.com/gdiazlo/swim" 14 + doc: "https://github.com/gdiazlo/swim" 15 + bug-reports: "https://github.com/gdiazlo/swim/issues" 16 + depends: [ 17 + "ocaml" {>= "5.1"} 18 + "dune" {>= "3.20" & >= "3.20"} 19 + "eio" {>= "1.0"} 20 + "eio_main" {>= "1.0"} 21 + "kcas" {>= "0.7"} 22 + "kcas_data" {>= "0.7"} 23 + "mirage-crypto" {>= "1.0"} 24 + "mirage-crypto-rng" {>= "1.0"} 25 + "cstruct" {>= "6.0"} 26 + "mtime" {>= "2.0"} 27 + "qcheck" {>= "0.21"} 28 + "qcheck-alcotest" {>= "0.21"} 29 + "alcotest" {>= "1.7"} 30 + "logs" {>= "0.7"} 31 + "odoc" {with-doc} 32 + ] 33 + build: [ 34 + ["dune" "subst"] {dev} 35 + [ 36 + "dune" 37 + "build" 38 + "-p" 39 + name 40 + "-j" 41 + jobs 42 + "@install" 43 + "@runtest" {with-test} 44 + "@doc" {with-doc} 45 + ] 46 + ] 47 + dev-repo: "git+https://github.com/gdiazlo/swim.git" 48 + x-maintenance-intent: ["(latest)"]
+8
test/dune
(test
 (name test_swim)
 (libraries
  swim
  alcotest
  qcheck
  qcheck-alcotest
  eio_main))
+9
test/test_swim.ml
let () =
  Alcotest.run "swim"
    [
      ( "version",
        [
          Alcotest.test_case "version is set" `Quick (fun () ->
              Alcotest.(check string) "version" "0.1.0" Swim.version);
        ] );
    ]