+56
PDSharp.Core/Firehose.fs
+56
PDSharp.Core/Firehose.fs
···
···
1
+
namespace PDSharp.Core
2
+
3
+
open System
4
+
open System.Threading
5
+
6
+
/// Event stream (firehose) for com.atproto.sync.subscribeRepos
7
+
module Firehose =
8
+
9
+
/// Commit event sent to subscribers
10
+
type CommitEvent = {
11
+
Seq : int64
12
+
Did : string
13
+
Rev : string
14
+
Commit : Cid
15
+
Blocks : byte[]
16
+
Time : DateTimeOffset
17
+
}
18
+
19
+
/// Mutable sequence counter for firehose events
20
+
let private seqCounter = ref 0L
21
+
22
+
/// Get the next sequence number (thread-safe, monotonic)
23
+
let nextSeq () : int64 = Interlocked.Increment(seqCounter)
24
+
25
+
/// Get current sequence without incrementing (for cursor resume)
26
+
let currentSeq () : int64 = seqCounter.Value
27
+
28
+
/// Create a commit event for a repository write
29
+
let createCommitEvent (did : string) (rev : string) (commitCid : Cid) (carBytes : byte[]) : CommitEvent = {
30
+
Seq = nextSeq ()
31
+
Did = did
32
+
Rev = rev
33
+
Commit = commitCid
34
+
Blocks = carBytes
35
+
Time = DateTimeOffset.UtcNow
36
+
}
37
+
38
+
/// Encode a commit event to DAG-CBOR bytes for WebSocket transmission
39
+
/// Format follows AT Protocol #commit message structure
40
+
let encodeEvent (event : CommitEvent) : byte[] =
41
+
let eventMap : Map<string, obj> =
42
+
Map.ofList [
43
+
"$type", box "com.atproto.sync.subscribeRepos#commit"
44
+
"seq", box event.Seq
45
+
"did", box event.Did
46
+
"rev", box event.Rev
47
+
"commit", box event.Commit
48
+
"blocks", box event.Blocks
49
+
"time", box (event.Time.ToString("o"))
50
+
]
51
+
52
+
DagCbor.encode eventMap
53
+
54
+
/// Reset sequence counter (for testing)
55
+
let resetSeq () =
56
+
Interlocked.Exchange(seqCounter, 0L) |> ignore
+1
PDSharp.Core/PDSharp.Core.fsproj
+1
PDSharp.Core/PDSharp.Core.fsproj
+55
PDSharp.Tests/Firehose.Tests.fs
+55
PDSharp.Tests/Firehose.Tests.fs
···
···
1
+
module Firehose.Tests
2
+
3
+
open Xunit
4
+
open PDSharp.Core
5
+
open PDSharp.Core.Firehose
6
+
open PDSharp.Core.Crypto
7
+
8
+
[<Fact>]
9
+
let ``nextSeq monotonically increases`` () =
10
+
resetSeq ()
11
+
let seq1 = nextSeq ()
12
+
let seq2 = nextSeq ()
13
+
let seq3 = nextSeq ()
14
+
15
+
Assert.Equal(1L, seq1)
16
+
Assert.Equal(2L, seq2)
17
+
Assert.Equal(3L, seq3)
18
+
19
+
[<Fact>]
20
+
let ``currentSeq returns without incrementing`` () =
21
+
resetSeq ()
22
+
let _ = nextSeq () // 1
23
+
let _ = nextSeq () // 2
24
+
let current = currentSeq ()
25
+
let next = nextSeq ()
26
+
27
+
Assert.Equal(2L, current)
28
+
Assert.Equal(3L, next)
29
+
30
+
[<Fact>]
31
+
let ``createCommitEvent has correct fields`` () =
32
+
resetSeq ()
33
+
let hash = sha256Str "test"
34
+
let cid = Cid.FromHash hash
35
+
let carBytes = [| 0x01uy; 0x02uy |]
36
+
37
+
let event = createCommitEvent "did:web:test" "rev123" cid carBytes
38
+
39
+
Assert.Equal(1L, event.Seq)
40
+
Assert.Equal("did:web:test", event.Did)
41
+
Assert.Equal("rev123", event.Rev)
42
+
Assert.Equal<byte[]>(cid.Bytes, event.Commit.Bytes)
43
+
Assert.Equal<byte[]>(carBytes, event.Blocks)
44
+
45
+
[<Fact>]
46
+
let ``encodeEvent produces valid CBOR`` () =
47
+
resetSeq ()
48
+
let hash = sha256Str "test"
49
+
let cid = Cid.FromHash hash
50
+
let carBytes = [| 0x01uy; 0x02uy |]
51
+
let event = createCommitEvent "did:web:test" "rev123" cid carBytes
52
+
let encoded = encodeEvent event
53
+
54
+
Assert.True(encoded.Length > 0)
55
+
Assert.True(encoded.[0] >= 0xa0uy, "Should encode as CBOR map")
+1
PDSharp.Tests/PDSharp.Tests.fsproj
+1
PDSharp.Tests/PDSharp.Tests.fsproj
+87
-1
PDSharp/Program.fs
+87
-1
PDSharp/Program.fs
···
16
open PDSharp.Core.Repository
17
open PDSharp.Core.Mst
18
open PDSharp.Core.Crypto
19
20
module App =
21
/// Repo state per DID: MST root, collections, current rev, head commit CID
···
39
let blockStore = MemoryBlockStore()
40
let mutable signingKeys : Map<string, EcKeyPair> = Map.empty
41
42
let getOrCreateKey (did : string) =
43
match Map.tryFind did signingKeys with
44
| Some k -> k
···
155
}
156
157
repos <- Map.add did updatedRepo repos
158
159
let uri = $"at://{did}/{request.collection}/{rkey}"
160
ctx.SetStatusCode 200
···
304
305
repos <- Map.add did updatedRepo repos
306
307
ctx.SetStatusCode 200
308
309
return!
···
496
return! ctx.WriteBytesAsync data
497
}
498
499
let webApp =
500
choose [
501
GET
···
507
GET >=> route "/xrpc/com.atproto.sync.getRepo" >=> getRepoHandler
508
GET >=> route "/xrpc/com.atproto.sync.getBlocks" >=> getBlocksHandler
509
GET >=> route "/xrpc/com.atproto.sync.getBlob" >=> getBlobHandler
510
route "/" >=> text "PDSharp PDS is running."
511
RequestErrors.NOT_FOUND "Not Found"
512
]
513
514
-
let configureApp (app : IApplicationBuilder) = app.UseGiraffe webApp
515
516
let configureServices (config : AppConfig) (services : IServiceCollection) =
517
services.AddGiraffe() |> ignore
···
16
open PDSharp.Core.Repository
17
open PDSharp.Core.Mst
18
open PDSharp.Core.Crypto
19
+
open PDSharp.Core.Firehose
20
21
module App =
22
/// Repo state per DID: MST root, collections, current rev, head commit CID
···
40
let blockStore = MemoryBlockStore()
41
let mutable signingKeys : Map<string, EcKeyPair> = Map.empty
42
43
+
// Firehose subscriber management
44
+
open System.Net.WebSockets
45
+
open System.Collections.Concurrent
46
+
47
+
/// Connected WebSocket subscribers
48
+
let subscribers = ConcurrentDictionary<Guid, WebSocket>()
49
+
50
+
/// Broadcast a commit event to all connected subscribers
51
+
let broadcastEvent (event : CommitEvent) =
52
+
let eventBytes = encodeEvent event
53
+
let segment = ArraySegment<byte>(eventBytes)
54
+
55
+
for kvp in subscribers do
56
+
let ws = kvp.Value
57
+
58
+
if ws.State = WebSocketState.Open then
59
+
try
60
+
ws.SendAsync(segment, WebSocketMessageType.Binary, true, Threading.CancellationToken.None)
61
+
|> Async.AwaitTask
62
+
|> Async.RunSynchronously
63
+
with _ ->
64
+
subscribers.TryRemove(kvp.Key) |> ignore
65
+
66
let getOrCreateKey (did : string) =
67
match Map.tryFind did signingKeys with
68
| Some k -> k
···
179
}
180
181
repos <- Map.add did updatedRepo repos
182
+
183
+
let! allBlocks = (blockStore :> IBlockStore).GetAllCidsAndData()
184
+
let carBytes = Car.createCar [ commitCid ] allBlocks
185
+
let event = createCommitEvent did newRev commitCid carBytes
186
+
broadcastEvent event
187
188
let uri = $"at://{did}/{request.collection}/{rkey}"
189
ctx.SetStatusCode 200
···
333
334
repos <- Map.add did updatedRepo repos
335
336
+
let! allBlocks = (blockStore :> IBlockStore).GetAllCidsAndData()
337
+
let carBytes = Car.createCar [ commitCid ] allBlocks
338
+
let event = createCommitEvent did newRev commitCid carBytes
339
+
broadcastEvent event
340
+
341
ctx.SetStatusCode 200
342
343
return!
···
530
return! ctx.WriteBytesAsync data
531
}
532
533
+
/// subscribeRepos: WebSocket firehose endpoint
534
+
let subscribeReposHandler : HttpHandler =
535
+
fun next ctx -> task {
536
+
if ctx.WebSockets.IsWebSocketRequest then
537
+
let cursor =
538
+
match ctx.Request.Query.TryGetValue("cursor") with
539
+
| true, v when not (String.IsNullOrWhiteSpace(v.ToString())) ->
540
+
Int64.TryParse(v.ToString())
541
+
|> function
542
+
| true, n -> Some n
543
+
| _ -> None
544
+
| _ -> None
545
+
546
+
let! webSocket = ctx.WebSockets.AcceptWebSocketAsync()
547
+
let id = Guid.NewGuid()
548
+
subscribers.TryAdd(id, webSocket) |> ignore
549
+
550
+
let buffer = Array.zeroCreate<byte> 1024
551
+
552
+
try
553
+
let mutable loop = true
554
+
555
+
while loop && webSocket.State = WebSocketState.Open do
556
+
let! result = webSocket.ReceiveAsync(ArraySegment(buffer), Threading.CancellationToken.None)
557
+
558
+
if result.MessageType = WebSocketMessageType.Close then
559
+
loop <- false
560
+
finally
561
+
subscribers.TryRemove(id) |> ignore
562
+
563
+
if webSocket.State = WebSocketState.Open then
564
+
webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closed", Threading.CancellationToken.None)
565
+
|> Async.AwaitTask
566
+
|> Async.RunSynchronously
567
+
568
+
return Some ctx
569
+
else
570
+
ctx.SetStatusCode 400
571
+
572
+
return!
573
+
json
574
+
{
575
+
error = "InvalidRequest"
576
+
message = "WebSocket upgrade required"
577
+
}
578
+
next
579
+
ctx
580
+
}
581
+
582
let webApp =
583
choose [
584
GET
···
590
GET >=> route "/xrpc/com.atproto.sync.getRepo" >=> getRepoHandler
591
GET >=> route "/xrpc/com.atproto.sync.getBlocks" >=> getBlocksHandler
592
GET >=> route "/xrpc/com.atproto.sync.getBlob" >=> getBlobHandler
593
+
GET >=> route "/xrpc/com.atproto.sync.subscribeRepos" >=> subscribeReposHandler
594
route "/" >=> text "PDSharp PDS is running."
595
RequestErrors.NOT_FOUND "Not Found"
596
]
597
598
+
let configureApp (app : IApplicationBuilder) =
599
+
app.UseWebSockets() |> ignore
600
+
app.UseGiraffe webApp
601
602
let configureServices (config : AppConfig) (services : IServiceCollection) =
603
services.AddGiraffe() |> ignore
+66
-2
README.md
+66
-2
README.md
···
34
35
The server will start at `http://localhost:5000`.
36
37
+
## API Testing
38
39
+
### Server Info
40
41
```bash
42
curl http://localhost:5000/xrpc/com.atproto.server.describeServer
43
+
```
44
+
45
+
### Record Operations
46
+
47
+
**Create a record:**
48
+
49
+
```bash
50
+
curl -X POST http://localhost:5000/xrpc/com.atproto.repo.createRecord \
51
+
-H "Content-Type: application/json" \
52
+
-d '{"repo":"did:web:test","collection":"app.bsky.feed.post","record":{"text":"Hello, ATProto!"}}'
53
+
```
54
+
55
+
**Get a record** (use the rkey from createRecord response):
56
+
57
+
```bash
58
+
curl "http://localhost:5000/xrpc/com.atproto.repo.getRecord?repo=did:web:test&collection=app.bsky.feed.post&rkey=<RKEY>"
59
+
```
60
+
61
+
**Put a record** (upsert with explicit rkey):
62
+
63
+
```bash
64
+
curl -X POST http://localhost:5000/xrpc/com.atproto.repo.putRecord \
65
+
-H "Content-Type: application/json" \
66
+
-d '{"repo":"did:web:test","collection":"app.bsky.feed.post","rkey":"my-post","record":{"text":"Updated!"}}'
67
+
```
68
+
69
+
### Sync & CAR Export
70
+
71
+
**Get entire repository as CAR:**
72
+
73
+
```bash
74
+
curl "http://localhost:5000/xrpc/com.atproto.sync.getRepo?did=did:web:test" -o repo.car
75
+
```
76
+
77
+
**Get specific blocks** (comma-separated CIDs):
78
+
79
+
```bash
80
+
curl "http://localhost:5000/xrpc/com.atproto.sync.getBlocks?did=did:web:test&cids=<CID1>,<CID2>" -o blocks.car
81
+
```
82
+
83
+
**Get a blob by CID:**
84
+
85
+
```bash
86
+
curl "http://localhost:5000/xrpc/com.atproto.sync.getBlob?did=did:web:test&cid=<BLOB_CID>"
87
+
```
88
+
89
+
### Firehose (WebSocket)
90
+
91
+
Subscribe to real-time commit events using [websocat](https://github.com/vi/websocat):
92
+
93
+
```bash
94
+
# Install websocat (macOS)
95
+
brew install websocat
96
+
97
+
# Connect to firehose
98
+
websocat ws://localhost:5000/xrpc/com.atproto.sync.subscribeRepos
99
+
```
100
+
101
+
Then create/update records in another terminal to see CBOR-encoded commit events stream in real-time.
102
+
103
+
**With cursor for resumption:**
104
+
105
+
```bash
106
+
websocat "ws://localhost:5000/xrpc/com.atproto.sync.subscribeRepos?cursor=5"
107
```
108
109
## Configuration
+3
-3
roadmap.txt
+3
-3
roadmap.txt
···
43
--------------------------------------------------------------------------------
44
Milestone G: subscribeRepos Firehose
45
--------------------------------------------------------------------------------
46
-
- Monotonic sequence number + commit event generation
47
-
- WebSocket streaming for subscribeRepos
48
DoD: Relay/client receives commit events after writes
49
--------------------------------------------------------------------------------
50
Milestone H: Account + Sessions
···
120
[x] MST producing deterministic root CIDs
121
[x] putRecord + blockstore operational
122
[x] CAR export + sync endpoints
123
-
[ ] subscribeRepos firehose
124
[ ] Authentication (createAccount, createSession)
125
[ ] Lexicon validation
126
[ ] Domain + TLS configured
···
43
--------------------------------------------------------------------------------
44
Milestone G: subscribeRepos Firehose
45
--------------------------------------------------------------------------------
46
+
- [x] Monotonic sequence number + commit event generation
47
+
- [x] WebSocket streaming for subscribeRepos
48
DoD: Relay/client receives commit events after writes
49
--------------------------------------------------------------------------------
50
Milestone H: Account + Sessions
···
120
[x] MST producing deterministic root CIDs
121
[x] putRecord + blockstore operational
122
[x] CAR export + sync endpoints
123
+
[x] subscribeRepos firehose
124
[ ] Authentication (createAccount, createSession)
125
[ ] Lexicon validation
126
[ ] Domain + TLS configured