An atproto PDS written in F# (.NET 9)
pds
fsharp
giraffe
dotnet
atproto
namespace PDSharp.Core

open System
open System.Threading

/// Event stream (firehose) for com.atproto.sync.subscribeRepos.
///
/// Holds a process-wide sequence counter, the `CommitEvent` record that is
/// handed to subscribers, and the encoder that turns an event into bytes.
module Firehose =

  /// Commit event sent to subscribers.
  type CommitEvent = {
    /// Sequence number of the event (`createCommitEvent` takes it from `nextSeq`).
    Seq : int64
    /// DID string supplied by the caller; presumably the repository owner's DID.
    Did : string
    /// Revision string supplied by the caller.
    Rev : string
    /// CID of the commit this event announces.
    Commit : Cid
    /// Raw block bytes; presumably a CAR-encoded slice (the producer names it `carBytes`).
    Blocks : byte[]
    /// Creation timestamp (`createCommitEvent` uses `DateTimeOffset.UtcNow`).
    Time : DateTimeOffset
  }

  /// Mutable sequence counter for firehose events.
  /// Every access goes through `Interlocked` so concurrent writers and readers
  /// never observe a torn or stale-by-construction value.
  let private seqCounter = ref 0L

  /// Get the next sequence number (thread-safe, monotonic).
  /// The counter starts at 0, so the first value handed out is 1.
  let nextSeq () : int64 = Interlocked.Increment(seqCounter)

  /// Get current sequence without incrementing (for cursor resume).
  /// Returns 0 when no sequence number has been handed out yet.
  let currentSeq () : int64 =
    // A plain read of an int64 is not atomic on 32-bit runtimes and may tear
    // against a concurrent Increment. Compare-exchanging 0 with 0 never changes
    // the stored value but returns it atomically.
    Interlocked.CompareExchange(seqCounter, 0L, 0L)

  /// Create a commit event for a repository write.
  ///
  /// Side effect: consumes one sequence number via `nextSeq`, so every call
  /// advances the firehose counter even if the event is never broadcast.
  /// The timestamp is taken from the system clock in UTC at call time.
  let createCommitEvent (did : string) (rev : string) (commitCid : Cid) (carBytes : byte[]) : CommitEvent = {
    Seq = nextSeq ()
    Did = did
    Rev = rev
    Commit = commitCid
    Blocks = carBytes
    Time = DateTimeOffset.UtcNow
  }

  /// Encode a commit event to DAG-CBOR bytes for WebSocket transmission.
  /// Format follows AT Protocol #commit message structure.
  ///
  /// The event is flattened into a string-keyed map of boxed values and passed
  /// to `DagCbor.encode`; how each boxed value is serialised is decided there.
  /// The timestamp is written with the round-trip ("o") format.
  let encodeEvent (event : CommitEvent) : byte[] =
    // NOTE(review): the published subscribeRepos lexicon appears to name the DID
    // field `repo` and to frame messages as a header object followed by the
    // body - confirm that this single-map shape is what consumers expect.
    let eventMap : Map<string, obj> =
      Map.ofList [
        "$type", box "com.atproto.sync.subscribeRepos#commit"
        "seq", box event.Seq
        "did", box event.Did
        "rev", box event.Rev
        "commit", box event.Commit
        "blocks", box event.Blocks
        "time", box (event.Time.ToString("o"))
      ]

    DagCbor.encode eventMap

  /// Reset sequence counter (for testing).
  /// Atomically stores 0, so the next `nextSeq` call returns 1 again.
  let resetSeq () =
    Interlocked.Exchange(seqCounter, 0L) |> ignore