an atproto pds written in F# (.NET 9) 馃
pds fsharp giraffe dotnet atproto
at main 1.6 kB view raw
1namespace PDSharp.Core 2 3open System 4open System.Threading 5 6/// Event stream (firehose) for com.atproto.sync.subscribeRepos 7module Firehose = 8 9 /// Commit event sent to subscribers 10 type CommitEvent = { 11 Seq : int64 12 Did : string 13 Rev : string 14 Commit : Cid 15 Blocks : byte[] 16 Time : DateTimeOffset 17 } 18 19 /// Mutable sequence counter for firehose events 20 let private seqCounter = ref 0L 21 22 /// Get the next sequence number (thread-safe, monotonic) 23 let nextSeq () : int64 = Interlocked.Increment(seqCounter) 24 25 /// Get current sequence without incrementing (for cursor resume) 26 let currentSeq () : int64 = seqCounter.Value 27 28 /// Create a commit event for a repository write 29 let createCommitEvent (did : string) (rev : string) (commitCid : Cid) (carBytes : byte[]) : CommitEvent = { 30 Seq = nextSeq () 31 Did = did 32 Rev = rev 33 Commit = commitCid 34 Blocks = carBytes 35 Time = DateTimeOffset.UtcNow 36 } 37 38 /// Encode a commit event to DAG-CBOR bytes for WebSocket transmission 39 /// Format follows AT Protocol #commit message structure 40 let encodeEvent (event : CommitEvent) : byte[] = 41 let eventMap : Map<string, obj> = 42 Map.ofList [ 43 "$type", box "com.atproto.sync.subscribeRepos#commit" 44 "seq", box event.Seq 45 "did", box event.Did 46 "rev", box event.Rev 47 "commit", box event.Commit 48 "blocks", box event.Blocks 49 "time", box (event.Time.ToString("o")) 50 ] 51 52 DagCbor.encode eventMap 53 54 /// Reset sequence counter (for testing) 55 let resetSeq () = 56 Interlocked.Exchange(seqCounter, 0L) |> ignore