an atproto pds written in F# (.NET 9) 🦒
pds fsharp giraffe dotnet atproto

feat: CARv1 writer

* add CID parsing utilities

+9 -4
PDSharp.Core/BlockStore.fs
··· 10 10 abstract member Get : Cid -> Async<byte[] option> 11 11 abstract member Put : byte[] -> Async<Cid> 12 12 abstract member Has : Cid -> Async<bool> 13 + abstract member GetAllCidsAndData : unit -> Async<(Cid * byte[]) list> 13 14 14 15 /// In-memory implementation of IBlockStore for testing 15 16 type MemoryBlockStore() = 16 - let store = ConcurrentDictionary<string, byte[]>() 17 + let store = ConcurrentDictionary<string, (Cid * byte[])>() 17 18 18 19 let cidKey (cid : Cid) = 19 20 System.Convert.ToBase64String(cid.Bytes) ··· 21 22 interface IBlockStore with 22 23 member _.Get(cid : Cid) = async { 23 24 let key = cidKey cid 24 - let success, data = store.TryGetValue(key) 25 - return if success then Some data else None 25 + 26 + match store.TryGetValue(key) with 27 + | true, (_, data) -> return Some data 28 + | false, _ -> return None 26 29 } 27 30 28 31 member _.Put(data : byte[]) = async { 29 32 let hash = Crypto.sha256 data 30 33 let cid = Cid.FromHash hash 31 34 let key = cidKey cid 32 - store.[key] <- data 35 + store.[key] <- (cid, data) 33 36 return cid 34 37 } 35 38 ··· 37 40 let key = cidKey cid 38 41 return store.ContainsKey(key) 39 42 } 43 + 44 + member _.GetAllCidsAndData() = async { return store.Values |> Seq.toList } 40 45 41 46 /// Get the number of blocks stored (for testing) 42 47 member _.Count = store.Count
+85
PDSharp.Core/Car.fs
··· 1 + namespace PDSharp.Core 2 + 3 + open System 4 + open System.IO 5 + 6 + /// CARv1 (Content Addressable aRchives) writer module 7 + /// Implements the CAR format per https://ipld.io/specs/transport/car/carv1/ 8 + module Car = 9 + /// Encode an unsigned integer as LEB128 varint 10 + let encodeVarint (value : int) : byte[] = 11 + if value < 0 then 12 + failwith "Varint value must be non-negative" 13 + elif value = 0 then 14 + [| 0uy |] 15 + else 16 + use ms = new MemoryStream() 17 + let mutable v = value 18 + 19 + while v > 0 do 20 + let mutable b = byte (v &&& 0x7F) 21 + v <- v >>> 7 22 + 23 + if v > 0 then 24 + b <- b ||| 0x80uy 25 + 26 + ms.WriteByte(b) 27 + 28 + ms.ToArray() 29 + 30 + /// Create CAR header as DAG-CBOR encoded bytes 31 + /// Header format: { version: 1, roots: [CID, ...] } 32 + let createHeader (roots : Cid list) : byte[] = 33 + let headerMap = 34 + Map.ofList [ ("roots", box (roots |> List.map box)); ("version", box 1) ] 35 + 36 + DagCbor.encode headerMap 37 + 38 + /// Encode a single block section: varint(len) | CID bytes | block data 39 + let encodeBlock (cid : Cid) (data : byte[]) : byte[] = 40 + let cidBytes = cid.Bytes 41 + let sectionLen = cidBytes.Length + data.Length 42 + let varintBytes = encodeVarint sectionLen 43 + 44 + use ms = new MemoryStream() 45 + ms.Write(varintBytes, 0, varintBytes.Length) 46 + ms.Write(cidBytes, 0, cidBytes.Length) 47 + ms.Write(data, 0, data.Length) 48 + ms.ToArray() 49 + 50 + /// Create a complete CARv1 file from roots and blocks 51 + /// CAR format: [varint | header] [varint | CID | block]... 52 + let createCar (roots : Cid list) (blocks : (Cid * byte[]) seq) : byte[] = 53 + use ms = new MemoryStream() 54 + 55 + let headerBytes = createHeader roots 56 + let headerVarint = encodeVarint headerBytes.Length 57 + ms.Write(headerVarint, 0, headerVarint.Length) 58 + ms.Write(headerBytes, 0, headerBytes.Length) 59 + 60 + for cid, data in blocks do 61 + let blockSection = encodeBlock cid data 62 + ms.Write(blockSection, 0, blockSection.Length) 63 + 64 + ms.ToArray() 65 + 66 + /// Create a CAR from a single root with an async block fetcher 67 + let createCarAsync (roots : Cid list) (getBlock : Cid -> Async<byte[] option>) (allCids : Cid seq) = async { 68 + use ms = new MemoryStream() 69 + 70 + let headerBytes = createHeader roots 71 + let headerVarint = encodeVarint headerBytes.Length 72 + ms.Write(headerVarint, 0, headerVarint.Length) 73 + ms.Write(headerBytes, 0, headerBytes.Length) 74 + 75 + for cid in allCids do 76 + let! dataOpt = getBlock cid 77 + 78 + match dataOpt with 79 + | Some data -> 80 + let blockSection = encodeBlock cid data 81 + ms.Write(blockSection, 0, blockSection.Length) 82 + | None -> () 83 + 84 + return ms.ToArray() 85 + }
+41
PDSharp.Core/Cid.fs
··· 43 43 44 44 sb.ToString() 45 45 46 + let FromString (s : string) : byte[] option = 47 + if String.IsNullOrEmpty s then 48 + Some [||] 49 + else 50 + try 51 + let bits = s.Length * 5 52 + let bytes = Array.zeroCreate<byte> (bits / 8) 53 + let mutable buffer = 0 54 + let mutable bitsInBuffer = 0 55 + let mutable byteIndex = 0 56 + 57 + for c in s do 58 + let idx = alphabet.IndexOf(Char.ToLowerInvariant c) 59 + 60 + if idx < 0 then 61 + failwith "Invalid base32 character" 62 + 63 + buffer <- buffer <<< 5 ||| idx 64 + bitsInBuffer <- bitsInBuffer + 5 65 + 66 + if bitsInBuffer >= 8 then 67 + bitsInBuffer <- bitsInBuffer - 8 68 + 69 + if byteIndex < bytes.Length then 70 + bytes.[byteIndex] <- byte ((buffer >>> bitsInBuffer) &&& 0xFF) 71 + byteIndex <- byteIndex + 1 72 + 73 + Some bytes 74 + with _ -> 75 + None 76 + 46 77 /// Basic CID implementation for AT Protocol (CIDv1 + dag-cbor + sha2-256) 47 78 /// 48 79 /// Constants for ATProto defaults: ··· 65 96 cidBytes.[3] <- 0x20uy 66 97 Array.Copy(hash, 0, cidBytes, 4, 32) 67 98 Cid cidBytes 99 + 100 + static member TryParse(s : string) : Cid option = 101 + if String.IsNullOrWhiteSpace s then 102 + None 103 + elif s.StartsWith("b") then 104 + match Base32Encoding.FromString(s.Substring(1)) with 105 + | Some bytes when bytes.Length = 36 -> Some(Cid bytes) 106 + | _ -> None 107 + else 108 + None 68 109 69 110 override this.ToString() = 70 111 "b" + Base32Encoding.ToString(this.Bytes)
+1
PDSharp.Core/PDSharp.Core.fsproj
··· 11 11 <Compile Include="Crypto.fs" /> 12 12 <Compile Include="Mst.fs" /> 13 13 <Compile Include="BlockStore.fs" /> 14 + <Compile Include="Car.fs" /> 14 15 <Compile Include="AtUri.fs" /> 15 16 <Compile Include="Repository.fs" /> 16 17 <Compile Include="DidResolver.fs" />
+71
PDSharp.Tests/Car.Tests.fs
··· 1 + module CarTests 2 + 3 + open Xunit 4 + open PDSharp.Core 5 + open PDSharp.Core.Car 6 + open PDSharp.Core.Crypto 7 + 8 + [<Fact>] 9 + let ``Varint encodes zero correctly`` () = 10 + let result = encodeVarint 0 11 + Assert.Equal<byte[]>([| 0uy |], result) 12 + 13 + [<Fact>] 14 + let ``Varint encodes single byte values correctly`` () = 15 + let result1 = encodeVarint 1 16 + Assert.Equal<byte[]>([| 1uy |], result1) 17 + 18 + let result127 = encodeVarint 127 19 + Assert.Equal<byte[]>([| 127uy |], result127) 20 + 21 + [<Fact>] 22 + let ``Varint encodes multi-byte values correctly`` () = 23 + let result128 = encodeVarint 128 24 + Assert.Equal<byte[]>([| 0x80uy; 0x01uy |], result128) 25 + 26 + let result300 = encodeVarint 300 27 + Assert.Equal<byte[]>([| 0xACuy; 0x02uy |], result300) 28 + 29 + let result16384 = encodeVarint 16384 30 + Assert.Equal<byte[]>([| 0x80uy; 0x80uy; 0x01uy |], result16384) 31 + 32 + [<Fact>] 33 + let ``CAR header starts with version and roots`` () = 34 + let hash = sha256Str "test-root" 35 + let root = Cid.FromHash hash 36 + let header = createHeader [ root ] 37 + 38 + Assert.True(header.Length > 0, "Header should not be empty") 39 + Assert.True(header.[0] >= 0xa0uy && header.[0] <= 0xbfuy, "Header should be a CBOR map") 40 + 41 + [<Fact>] 42 + let ``CAR block section is varint + CID + data`` () = 43 + let hash = sha256Str "test-block" 44 + let cid = Cid.FromHash hash 45 + let data = [| 1uy; 2uy; 3uy; 4uy |] 46 + 47 + let block = encodeBlock cid data 48 + 49 + Assert.Equal(40uy, block.[0]) 50 + Assert.Equal(41, block.Length) 51 + 52 + [<Fact>] 53 + let ``Full CAR creation produces valid structure`` () = 54 + let hash = sha256Str "root-data" 55 + let rootCid = Cid.FromHash hash 56 + let blocks = [ (rootCid, [| 1uy; 2uy; 3uy |]) ] 57 + let car = createCar [ rootCid ] blocks 58 + 59 + Assert.True(car.Length > 0, "CAR should not be empty") 60 + Assert.True(car.[0] < 128uy, "Header length should fit in one varint byte for small headers") 61 + 62 + [<Fact>] 63 + let ``CAR with multiple blocks`` () = 64 + let hash1 = sha256Str "block1" 65 + let hash2 = sha256Str "block2" 66 + let cid1 = Cid.FromHash hash1 67 + let cid2 = Cid.FromHash hash2 68 + 69 + let blocks = [ cid1, [| 1uy; 2uy; 3uy |]; cid2, [| 4uy; 5uy; 6uy; 7uy |] ] 70 + let car = createCar [ cid1 ] blocks 71 + Assert.True(car.Length > 80, "CAR with two blocks should be substantial")
+1 -2
PDSharp.Tests/PDSharp.Tests.fsproj
··· 1 1 <Project Sdk="Microsoft.NET.Sdk"> 2 - 3 2 <PropertyGroup> 4 3 <TargetFramework>net9.0</TargetFramework> 5 4 <IsPackable>false</IsPackable> ··· 12 11 <Compile Include="BlockStore.Tests.fs" /> 13 12 <Compile Include="AtUri.Tests.fs" /> 14 13 <Compile Include="Repository.Tests.fs" /> 14 + <Compile Include="Car.Tests.fs" /> 15 15 <Compile Include="Program.fs" /> 16 16 </ItemGroup> 17 17 ··· 26 26 <ProjectReference Include="..\PDSharp\PDSharp.fsproj" /> 27 27 <ProjectReference Include="..\PDSharp.Core\PDSharp.Core.fsproj" /> 28 28 </ItemGroup> 29 - 30 29 </Project>
+16
PDSharp.Tests/Tests.fs
··· 21 21 Assert.Equal("did:web:example.com", config.DidHost) 22 22 23 23 [<Fact>] 24 + let ``CID TryParse roundtrip`` () = 25 + let hash = Crypto.sha256Str "test-data" 26 + let cid = Cid.FromHash hash 27 + let cidStr = cid.ToString() 28 + 29 + match Cid.TryParse cidStr with 30 + | Some parsed -> Assert.Equal<byte[]>(cid.Bytes, parsed.Bytes) 31 + | None -> Assert.Fail "TryParse should succeed for valid CID" 32 + 33 + [<Fact>] 34 + let ``CID TryParse returns None for invalid`` () = 35 + Assert.True(Cid.TryParse("invalid").IsNone) 36 + Assert.True(Cid.TryParse("").IsNone) 37 + Assert.True(Cid.TryParse("btooshort").IsNone) 38 + 39 + [<Fact>] 24 40 let ``Can instantiate DescribeServerResponse`` () = 25 41 let response = { 26 42 availableUserDomains = [ "example.com" ]
+171
PDSharp/Program.fs
··· 328 328 ctx 329 329 } 330 330 331 + /// sync.getRepo: Export entire repository as CAR file 332 + let getRepoHandler : HttpHandler = 333 + fun next ctx -> task { 334 + let did = ctx.Request.Query.["did"].ToString() 335 + 336 + if String.IsNullOrWhiteSpace(did) then 337 + ctx.SetStatusCode 400 338 + 339 + return! 340 + json 341 + { 342 + error = "InvalidRequest" 343 + message = "Missing required query parameter: did" 344 + } 345 + next 346 + ctx 347 + else 348 + match Map.tryFind did repos with 349 + | None -> 350 + ctx.SetStatusCode 404 351 + 352 + return! 353 + json 354 + { 355 + error = "RepoNotFound" 356 + message = $"Repository not found: {did}" 357 + } 358 + next 359 + ctx 360 + | Some repoData -> 361 + match repoData.Head with 362 + | None -> 363 + ctx.SetStatusCode 404 364 + 365 + return! 366 + json 367 + { 368 + error = "RepoNotFound" 369 + message = "Repository has no commits" 370 + } 371 + next 372 + ctx 373 + | Some headCid -> 374 + let! allBlocks = (blockStore :> IBlockStore).GetAllCidsAndData() 375 + let carBytes = Car.createCar [ headCid ] allBlocks 376 + ctx.SetContentType "application/vnd.ipld.car" 377 + ctx.SetStatusCode 200 378 + return! ctx.WriteBytesAsync carBytes 379 + } 380 + 381 + /// sync.getBlocks: Fetch specific blocks by CID 382 + let getBlocksHandler : HttpHandler = 383 + fun next ctx -> task { 384 + let did = ctx.Request.Query.["did"].ToString() 385 + let cidsParam = ctx.Request.Query.["cids"].ToString() 386 + 387 + if String.IsNullOrWhiteSpace did || String.IsNullOrWhiteSpace cidsParam then 388 + ctx.SetStatusCode 400 389 + 390 + return! 391 + json 392 + { 393 + error = "InvalidRequest" 394 + message = "Missing required query parameters: did, cids" 395 + } 396 + next 397 + ctx 398 + else 399 + match Map.tryFind did repos with 400 + | None -> 401 + ctx.SetStatusCode 404 402 + 403 + return! 404 + json 405 + { 406 + error = "RepoNotFound" 407 + message = $"Repository not found: {did}" 408 + } 409 + next 410 + ctx 411 + | Some _ -> 412 + let cidStrs = cidsParam.Split(',') |> Array.map (fun s -> s.Trim()) 413 + let parsedCids = cidStrs |> Array.choose Cid.TryParse |> Array.toList 414 + let! allBlocks = (blockStore :> IBlockStore).GetAllCidsAndData() 415 + 416 + let filteredBlocks = 417 + if parsedCids.IsEmpty then 418 + allBlocks 419 + else 420 + allBlocks 421 + |> List.filter (fun (c, _) -> parsedCids |> List.exists (fun pc -> pc.Bytes = c.Bytes)) 422 + 423 + let roots = 424 + if filteredBlocks.Length > 0 then 425 + [ fst filteredBlocks.[0] ] 426 + else 427 + [] 428 + 429 + let carBytes = Car.createCar roots filteredBlocks 430 + ctx.SetContentType "application/vnd.ipld.car" 431 + ctx.SetStatusCode 200 432 + return! ctx.WriteBytesAsync carBytes 433 + } 434 + 435 + /// sync.getBlob: Fetch a blob by CID 436 + let getBlobHandler : HttpHandler = 437 + fun next ctx -> task { 438 + let did = ctx.Request.Query.["did"].ToString() 439 + let cidStr = ctx.Request.Query.["cid"].ToString() 440 + 441 + if String.IsNullOrWhiteSpace(did) || String.IsNullOrWhiteSpace(cidStr) then 442 + ctx.SetStatusCode 400 443 + 444 + return! 445 + json 446 + { 447 + error = "InvalidRequest" 448 + message = "Missing required query parameters: did, cid" 449 + } 450 + next 451 + ctx 452 + else 453 + match Map.tryFind did repos with 454 + | None -> 455 + ctx.SetStatusCode 404 456 + 457 + return! 458 + json 459 + { 460 + error = "RepoNotFound" 461 + message = $"Repository not found: {did}" 462 + } 463 + next 464 + ctx 465 + | Some _ -> 466 + match Cid.TryParse cidStr with 467 + | None -> 468 + ctx.SetStatusCode 400 469 + 470 + return! 471 + json 472 + { 473 + error = "InvalidRequest" 474 + message = $"Invalid CID format: {cidStr}" 475 + } 476 + next 477 + ctx 478 + | Some cid -> 479 + let! dataOpt = (blockStore :> IBlockStore).Get(cid) 480 + 481 + match dataOpt with 482 + | None -> 483 + ctx.SetStatusCode 404 484 + 485 + return! 486 + json 487 + { 488 + error = "BlobNotFound" 489 + message = $"Blob not found: {cidStr}" 490 + } 491 + next 492 + ctx 493 + | Some data -> 494 + ctx.SetContentType "application/octet-stream" 495 + ctx.SetStatusCode 200 496 + return! ctx.WriteBytesAsync data 497 + } 498 + 331 499 let webApp = 332 500 choose [ 333 501 GET ··· 336 504 POST >=> route "/xrpc/com.atproto.repo.createRecord" >=> createRecordHandler 337 505 GET >=> route "/xrpc/com.atproto.repo.getRecord" >=> getRecordHandler 338 506 POST >=> route "/xrpc/com.atproto.repo.putRecord" >=> putRecordHandler 507 + GET >=> route "/xrpc/com.atproto.sync.getRepo" >=> getRepoHandler 508 + GET >=> route "/xrpc/com.atproto.sync.getBlocks" >=> getBlocksHandler 509 + GET >=> route "/xrpc/com.atproto.sync.getBlob" >=> getBlobHandler 339 510 route "/" >=> text "PDSharp PDS is running." 340 511 RequestErrors.NOT_FOUND "Not Found" 341 512 ]
+3 -3
roadmap.txt
··· 37 37 -------------------------------------------------------------------------------- 38 38 Milestone F: CAR Export + Sync Endpoints 39 39 -------------------------------------------------------------------------------- 40 - - CARv1 writer (roots = commit CID, blocks stream) 41 - - Implement: sync.getRepo, sync.getBlocks, sync.getBlob 40 + - [x] CARv1 writer (roots = commit CID, blocks stream) 41 + - [x] Implement: sync.getRepo, sync.getBlocks, sync.getBlob 42 42 DoD: External services can fetch repo snapshot + blocks 43 43 -------------------------------------------------------------------------------- 44 44 Milestone G: subscribeRepos Firehose ··· 119 119 [x] DAG-CBOR + CID generation correct 120 120 [x] MST producing deterministic root CIDs 121 121 [x] putRecord + blockstore operational 122 - [ ] CAR export + sync endpoints 122 + [x] CAR export + sync endpoints 123 123 [ ] subscribeRepos firehose 124 124 [ ] Authentication (createAccount, createSession) 125 125 [ ] Lexicon validation