(** Crypto tests for AT Protocol.

    Tests signature verification and did:key encoding using the official
    interop test fixtures, plus basic sign/verify and JWT round-trips for the
    P-256 and K-256 key types. *)

open Atproto_crypto

let () = Mirage_crypto_rng_unix.use_default ()

(* === Fixture and decoding helpers === *)

(** [read_fixture filename] reads ["../fixtures/crypto/" ^ filename] and
    decodes it with [Atproto_json.decode].

    The channel is closed even if reading raises.

    @raise Failure with ["JSON parse error: ..."] if decoding fails. *)
let read_fixture filename =
  let path = "../fixtures/crypto/" ^ filename in
  let content = In_channel.with_open_text path In_channel.input_all in
  match Atproto_json.decode content with
  | Ok json -> json
  | Error e -> failwith ("JSON parse error: " ^ e)

(** [iter_fixture_objects filename f] loads the fixture [filename], which must
    be a JSON array of objects, and calls [f] on the fields of each object in
    order.

    @raise Failure if the top-level value is not an array or if an element is
    not an object. *)
let iter_fixture_objects filename f =
  match Atproto_json.to_array_opt (read_fixture filename) with
  | None -> failwith "expected array in fixture file"
  | Some items ->
      List.iter
        (fun item ->
          match Atproto_json.to_object_opt item with
          | Some fields -> f fields
          | None -> failwith "expected object in fixture array")
        items

(** [require_string key fields] returns the string stored under [key].

    @raise Failure with ["missing " ^ key] if [Atproto_json.get_string_opt]
    returns [None]. *)
let require_string key fields =
  match Atproto_json.get_string_opt key fields with
  | Some s -> s
  | None -> failwith ("missing " ^ key)

(** [base64_decode s] decodes standard-alphabet base64. Trailing ['=']
    padding (zero, one or two characters) is optional.

    @raise Failure on a character outside the alphabet or on a length that
    cannot encode a whole number of bytes. *)
let base64_decode s =
  let alphabet =
    "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"
  in
  let decode_table = Array.make 256 (-1) in
  String.iteri (fun i c -> decode_table.(Char.code c) <- i) alphabet;
  let len = String.length s in
  (* Strip padding before computing the output length. *)
  let padding =
    if len >= 2 && s.[len - 1] = '=' && s.[len - 2] = '=' then 2
    else if len >= 1 && s.[len - 1] = '=' then 1
    else 0
  in
  let input_len = len - padding in
  (* A lone trailing sextet carries only 6 bits and cannot form a byte. *)
  if input_len mod 4 = 1 then failwith "invalid base64 length";
  let output_len = input_len * 3 / 4 in
  let buf = Bytes.create output_len in
  (* Positions at or beyond [input_len] (padding, or past the end) count as
     zero bits. Bounding by [input_len] rather than [len] matters: '=' is not
     in the table, and letting its -1 entry into the [lor] below would turn
     the whole group into 0xff bytes. *)
  let sextet i =
    if i >= input_len then 0
    else
      let v = decode_table.(Char.code s.[i]) in
      if v < 0 then failwith "invalid base64 char" else v
  in
  let rec loop i j =
    if i < input_len then begin
      let triple =
        (sextet i lsl 18)
        lor (sextet (i + 1) lsl 12)
        lor (sextet (i + 2) lsl 6)
        lor sextet (i + 3)
      in
      if j < output_len then
        Bytes.set buf j (Char.chr ((triple lsr 16) land 0xff));
      if j + 1 < output_len then
        Bytes.set buf (j + 1) (Char.chr ((triple lsr 8) land 0xff));
      if j + 2 < output_len then
        Bytes.set buf (j + 2) (Char.chr (triple land 0xff));
      loop (i + 4) (j + 3)
    end
  in
  loop 0 0;
  Bytes.to_string buf

(** [hex_decode s] decodes a hexadecimal string (upper or lower case digits)
    into raw bytes.

    @raise Failure on a non-hex character or an odd-length input. *)
let hex_decode s =
  let nibble c =
    match c with
    | '0' .. '9' -> Char.code c - Char.code '0'
    | 'a' .. 'f' -> Char.code c - Char.code 'a' + 10
    | 'A' .. 'F' -> Char.code c - Char.code 'A' + 10
    | _ -> failwith "invalid hex char"
  in
  let len = String.length s in
  (* An odd length would silently drop the final digit. *)
  if len mod 2 <> 0 then failwith "invalid hex length";
  String.init (len / 2) (fun i ->
      Char.chr ((nibble s.[2 * i] lsl 4) lor nibble s.[(2 * i) + 1]))

(** [corrupt_first_byte signature] returns a copy of [signature] whose first
    byte is incremented modulo 256; all other bytes are unchanged. *)
let corrupt_first_byte signature =
  String.mapi
    (fun i c -> if i = 0 then Char.chr ((Char.code c + 1) mod 256) else c)
    signature

(** Current time as whole seconds, from [Unix.time]. *)
let unix_now () = Int64.of_float (Unix.time ())

(* === Signature verification tests === *)

(** Runs every case in [signature-fixtures.json]: decodes the did:key, checks
    the reported algorithm and compares the verification outcome with the
    fixture's [validSignature] flag. *)
let test_signature_verification () =
  iter_fixture_objects "signature-fixtures.json" (fun fields ->
      let comment =
        Option.value ~default:"unknown"
          (Atproto_json.get_string_opt "comment" fields)
      in
      let message_b64 = require_string "messageBase64" fields in
      let algorithm = require_string "algorithm" fields in
      let public_key_did = require_string "publicKeyDid" fields in
      let signature_b64 = require_string "signatureBase64" fields in
      let valid_signature =
        match
          Option.bind
            (Atproto_json.get "validSignature" fields)
            Atproto_json.to_bool_opt
        with
        | Some b -> b
        | None -> failwith "missing validSignature"
      in
      let tags =
        match Atproto_json.get_array_opt "tags" fields with
        | Some tags -> List.filter_map Atproto_json.to_string_opt tags
        | None -> []
      in
      (* Decode inputs *)
      let message = base64_decode message_b64 in
      let signature = base64_decode signature_b64 in
      if List.mem "der-encoded" tags then begin
        (* Only raw signatures are supported, so DER-encoded cases are not
           run through the normal path; instead we assert that verification
           rejects them. *)
        Printf.printf "SKIP (DER): %s\n%!" comment;
        match Did_key.decode public_key_did with
        | Ok key ->
            let result = Did_key.verify key message signature in
            Alcotest.(check bool)
              ("DER should fail: " ^ comment)
              false (Result.is_ok result)
        | Error _ ->
            (* An undecodable key is also a rejection, which is correct. *)
            ()
      end
      else begin
        Printf.printf "TEST %s: %s (alg=%s)\n%!"
          (if valid_signature then "valid" else "invalid")
          comment algorithm;
        (* Decode the did:key *)
        match Did_key.decode public_key_did with
        | Error e ->
            Alcotest.fail
              (Printf.sprintf "Failed to decode did:key: %s - %s"
                 public_key_did
                 (Did_key.error_to_string e))
        | Ok key ->
            (* Verify the algorithm matches *)
            let expected_alg = Did_key.algorithm key in
            Alcotest.(check string) "algorithm matches" algorithm expected_alg;
            (* Verify the signature *)
            let result = Did_key.verify key message signature in
            let is_valid = Result.is_ok result in
            Alcotest.(check bool)
              (Printf.sprintf "signature validity: %s" comment)
              valid_signature is_valid
      end)

(* === did:key encoding tests for K-256 === *)

(** For each case in [w3c_didkey_K256.json]: derives the public key from the
    hex private key, checks the encoded did:key against the fixture, and
    checks that decoding and re-encoding yields the same did:key. *)
let test_didkey_k256 () =
  iter_fixture_objects "w3c_didkey_K256.json" (fun fields ->
      let private_key_hex = require_string "privateKeyBytesHex" fields in
      let expected_did = require_string "publicDidKey" fields in
      (* Decode private key and derive public key *)
      let priv_bytes = hex_decode private_key_hex in
      match K256.private_of_bytes priv_bytes with
      | Error e ->
          Alcotest.fail
            (Printf.sprintf "Failed to decode K256 private key: %s"
               (K256.error_to_string e))
      | Ok priv -> (
          let pub = K256.public priv in
          let did = Did_key.encode (K256 pub) in
          Printf.printf "K256 did:key test: %s\n%!" expected_did;
          Alcotest.(check string) "did:key matches" expected_did did;
          (* Also test roundtrip: decode, then re-encode the decoded key. *)
          match Did_key.decode did with
          | Error e ->
              Alcotest.fail
                (Printf.sprintf "Failed to decode generated did:key: %s"
                   (Did_key.error_to_string e))
          | Ok (K256 pub') ->
              Alcotest.(check string)
                "did:key roundtrip" did
                (Did_key.encode (K256 pub'))
          | Ok (P256 _) -> Alcotest.fail "decoded as P256 instead of K256"))

(* === did:key encoding tests for P-256 === *)

(** For each case in [w3c_didkey_P256.json]: derives the public key from the
    base58btc private key, checks the encoded did:key against the fixture, and
    checks that decoding and re-encoding yields the same did:key. *)
let test_didkey_p256 () =
  iter_fixture_objects "w3c_didkey_P256.json" (fun fields ->
      let private_key_b58 = require_string "privateKeyBytesBase58" fields in
      let expected_did = require_string "publicDidKey" fields in
      (* Decode private key and derive public key *)
      match Atproto_multibase.Base58btc.decode private_key_b58 with
      | Error _ -> Alcotest.fail "Failed to decode base58 private key"
      | Ok priv_bytes -> (
          let priv_str = Bytes.to_string priv_bytes in
          match P256.private_of_bytes priv_str with
          | Error e ->
              Alcotest.fail
                (Printf.sprintf "Failed to decode P256 private key: %s"
                   (P256.error_to_string e))
          | Ok priv -> (
              let pub = P256.public priv in
              let did = Did_key.encode (P256 pub) in
              Printf.printf "P256 did:key test: %s\n%!" expected_did;
              Alcotest.(check string) "did:key matches" expected_did did;
              (* Also test roundtrip: decode, then re-encode the decoded
                 key. *)
              match Did_key.decode did with
              | Error e ->
                  Alcotest.fail
                    (Printf.sprintf "Failed to decode generated did:key: %s"
                       (Did_key.error_to_string e))
              | Ok (P256 pub') ->
                  Alcotest.(check string)
                    "did:key roundtrip" did
                    (Did_key.encode (P256 pub'))
              | Ok (K256 _) ->
                  Alcotest.fail "decoded as K256 instead of P256")))

(* === Basic P256 signing tests === *)

(** A freshly generated P-256 key produces a 64-byte signature that verifies
    under its own public key. *)
let test_p256_sign_verify () =
  let priv = P256.generate () in
  let pub = P256.public priv in
  let message = "Hello, AT Protocol!" in
  let signature = P256.sign priv message in
  (* Verify signature is correct length *)
  Alcotest.(check int) "signature length" 64 (String.length signature);
  (* Verify signature is valid *)
  match P256.verify pub message signature with
  | Ok () -> ()
  | Error e ->
      Alcotest.fail
        (Printf.sprintf "signature verification failed: %s"
           (P256.error_to_string e))

(** A P-256 signature with its first byte altered must not verify. *)
let test_p256_invalid_signature () =
  let priv = P256.generate () in
  let pub = P256.public priv in
  let message = "Hello, AT Protocol!" in
  let signature = P256.sign priv message in
  (* Modify signature - it should fail verification *)
  let bad_sig = corrupt_first_byte signature in
  match P256.verify pub message bad_sig with
  | Ok () -> Alcotest.fail "modified signature should not verify"
  | Error _ -> ()

(* === Basic K256 signing tests === *)

(** A freshly generated K-256 key produces a 64-byte signature that verifies
    under its own public key. *)
let test_k256_sign_verify () =
  let priv = K256.generate () in
  let pub = K256.public priv in
  let message = "Hello, AT Protocol!" in
  let signature = K256.sign priv message in
  (* Verify signature is correct length *)
  Alcotest.(check int) "signature length" 64 (String.length signature);
  (* Verify signature is valid *)
  match K256.verify pub message signature with
  | Ok () -> ()
  | Error e ->
      Alcotest.fail
        (Printf.sprintf "signature verification failed: %s"
           (K256.error_to_string e))

(** A K-256 signature with its first byte altered must not verify. *)
let test_k256_invalid_signature () =
  let priv = K256.generate () in
  let pub = K256.public priv in
  let message = "Hello, AT Protocol!" in
  let signature = K256.sign priv message in
  (* Modify signature - it should fail verification *)
  let bad_sig = corrupt_first_byte signature in
  match K256.verify pub message bad_sig with
  | Ok () -> Alcotest.fail "modified signature should not verify"
  | Error _ -> ()

(* === JWT tests === *)

(** Creates a P-256-signed token, checks it has three dot-separated parts,
    then verifies it and checks the decoded claims and header type. *)
let test_jwt_create_verify_p256 () =
  let priv = P256.generate () in
  let pub = P256.public priv in
  let now = unix_now () in
  let exp = Int64.add now 3600L in
  (* 1 hour from now *)
  let claims : Jwt.claims =
    {
      iss = "did:plc:test123";
      sub = Some "did:plc:user456";
      aud = "https://bsky.social";
      exp;
      iat = now;
      jti = Some "unique-id-123";
      lxm = None;
      nonce = None;
      scope = Some "atproto";
    }
  in
  let token = Jwt.create ~key:(Jwt.P256_key priv) ~typ:"at+jwt" ~claims in
  let token_str = Jwt.to_string token in
  (* Verify token structure (3 parts separated by dots) *)
  let parts = String.split_on_char '.' token_str in
  Alcotest.(check int) "JWT has 3 parts" 3 (List.length parts);
  (* Verify we can decode and verify *)
  match Jwt.decode_and_verify ~key:(Jwt.P256_pub pub) ~now token_str with
  | Ok decoded ->
      Alcotest.(check string) "iss" "did:plc:test123" decoded.claims.iss;
      Alcotest.(check (option string))
        "sub" (Some "did:plc:user456") decoded.claims.sub;
      Alcotest.(check string) "aud" "https://bsky.social" decoded.claims.aud;
      Alcotest.(check string) "typ" "at+jwt" decoded.header.typ
  | Error e ->
      Alcotest.fail
        (Printf.sprintf "JWT verification failed: %s" (Jwt.error_to_string e))

(** Creates and verifies a K-256-signed token, checking the header algorithm
    string and the [lxm] claim. *)
let test_jwt_create_verify_k256 () =
  let priv = K256.generate () in
  let pub = K256.public priv in
  let now = unix_now () in
  let exp = Int64.add now 3600L in
  let claims : Jwt.claims =
    {
      iss = "did:plc:test123";
      sub = None;
      aud = "did:web:pds.example.com";
      exp;
      iat = now;
      jti = None;
      lxm = Some "com.atproto.repo.createRecord";
      nonce = None;
      scope = None;
    }
  in
  let token = Jwt.create ~key:(Jwt.K256_key priv) ~typ:"at+jwt" ~claims in
  match
    Jwt.decode_and_verify ~key:(Jwt.K256_pub pub) ~now (Jwt.to_string token)
  with
  | Ok decoded ->
      Alcotest.(check string)
        "algorithm" "ES256K"
        (Jwt.algorithm_to_string decoded.header.alg);
      Alcotest.(check (option string))
        "lxm" (Some "com.atproto.repo.createRecord") decoded.claims.lxm
  | Error e ->
      Alcotest.fail
        (Printf.sprintf "JWT verification failed: %s" (Jwt.error_to_string e))

(** A token whose [exp] lies in the past must be rejected with [`Expired]. *)
let test_jwt_expired () =
  let priv = P256.generate () in
  let pub = P256.public priv in
  let now = unix_now () in
  let exp = Int64.sub now 3600L in
  (* Expired 1 hour ago *)
  let claims : Jwt.claims =
    {
      iss = "did:plc:test123";
      sub = None;
      aud = "https://bsky.social";
      exp;
      iat = Int64.sub now 7200L;
      (* Created 2 hours ago *)
      jti = None;
      lxm = None;
      nonce = None;
      scope = None;
    }
  in
  let token = Jwt.create ~key:(Jwt.P256_key priv) ~typ:"at+jwt" ~claims in
  match
    Jwt.decode_and_verify ~key:(Jwt.P256_pub pub) ~now (Jwt.to_string token)
  with
  | Ok _ -> Alcotest.fail "Expired token should not verify"
  | Error `Expired -> ()
  | Error e ->
      Alcotest.fail
        (Printf.sprintf "Expected Expired error, got: %s"
           (Jwt.error_to_string e))

(** A token verified against a different public key must be rejected with
    [`Invalid_signature]. *)
let test_jwt_invalid_signature () =
  let priv = P256.generate () in
  let other_priv = P256.generate () in
  let other_pub = P256.public other_priv in
  let now = unix_now () in
  let exp = Int64.add now 3600L in
  let claims : Jwt.claims =
    {
      iss = "did:plc:test123";
      sub = None;
      aud = "https://bsky.social";
      exp;
      iat = now;
      jti = None;
      lxm = None;
      nonce = None;
      scope = None;
    }
  in
  let token = Jwt.create ~key:(Jwt.P256_key priv) ~typ:"at+jwt" ~claims in
  (* Verify with a different key - should fail *)
  match
    Jwt.decode_and_verify ~key:(Jwt.P256_pub other_pub) ~now
      (Jwt.to_string token)
  with
  | Ok _ -> Alcotest.fail "Token signed with different key should not verify"
  | Error `Invalid_signature -> ()
  | Error e ->
      Alcotest.fail
        (Printf.sprintf "Expected Invalid_signature error, got: %s"
           (Jwt.error_to_string e))

(** [Jwt.decode_unverified] exposes the header type and claims of a token
    without needing a key. *)
let test_jwt_decode_unverified () =
  let priv = P256.generate () in
  let now = unix_now () in
  let exp = Int64.add now 3600L in
  let claims : Jwt.claims =
    {
      iss = "did:plc:issuer";
      sub = Some "did:plc:subject";
      aud = "https://audience.example";
      exp;
      iat = now;
      jti = Some "jti-value";
      lxm = None;
      nonce = None;
      scope = None;
    }
  in
  let token = Jwt.create ~key:(Jwt.P256_key priv) ~typ:"refresh+jwt" ~claims in
  (* Decode without verification *)
  match Jwt.decode_unverified (Jwt.to_string token) with
  | Ok decoded ->
      Alcotest.(check string) "typ" "refresh+jwt" decoded.header.typ;
      Alcotest.(check string) "iss" "did:plc:issuer" decoded.claims.iss
  | Error e ->
      Alcotest.fail
        (Printf.sprintf "Decode failed: %s" (Jwt.error_to_string e))

(** A string with too many dot-separated parts must be rejected with
    [`Invalid_format]. *)
let test_jwt_invalid_format () =
  match Jwt.decode_unverified "not.a.valid.jwt.with.too.many.parts" with
  | Ok _ -> Alcotest.fail "Invalid format should fail"
  | Error `Invalid_format -> ()
  | Error e ->
      Alcotest.fail
        (Printf.sprintf "Expected Invalid_format, got: %s"
           (Jwt.error_to_string e))

(** [Jwt.create_access_token] yields a verifiable token with type ["at+jwt"]
    and the requested scope. *)
let test_jwt_access_token_helper () =
  let priv = P256.generate () in
  let pub = P256.public priv in
  let now = unix_now () in
  let exp = Int64.add now 3600L in
  let token =
    Jwt.create_access_token ~key:(Jwt.P256_key priv) ~iss:"did:plc:issuer"
      ~sub:"did:plc:subject" ~aud:"https://pds.example.com" ~exp ~iat:now
      ~scope:"atproto transition:generic" ()
  in
  match
    Jwt.decode_and_verify ~key:(Jwt.P256_pub pub) ~now (Jwt.to_string token)
  with
  | Ok decoded ->
      Alcotest.(check string) "typ" "at+jwt" decoded.header.typ;
      Alcotest.(check (option string))
        "scope" (Some "atproto transition:generic") decoded.claims.scope
  | Error e ->
      Alcotest.fail
        (Printf.sprintf "Verification failed: %s" (Jwt.error_to_string e))

(** [Jwt.create_service_token] yields a verifiable token carrying the
    requested [lxm] and no [sub]. *)
let test_jwt_service_token_helper () =
  let priv = K256.generate () in
  let pub = K256.public priv in
  let now = unix_now () in
  let exp = Int64.add now 60L in
  (* Short-lived service token *)
  let token =
    Jwt.create_service_token ~key:(Jwt.K256_key priv) ~iss:"did:plc:service"
      ~aud:"did:web:pds.example.com" ~exp ~iat:now
      ~lxm:"com.atproto.server.createSession" ()
  in
  match
    Jwt.decode_and_verify ~key:(Jwt.K256_pub pub) ~now (Jwt.to_string token)
  with
  | Ok decoded ->
      Alcotest.(check (option string))
        "lxm" (Some "com.atproto.server.createSession") decoded.claims.lxm;
      Alcotest.(check (option string))
        "sub should be None" None decoded.claims.sub
  | Error e ->
      Alcotest.fail
        (Printf.sprintf "Verification failed: %s" (Jwt.error_to_string e))

(* === Test suites === *)

let signature_tests =
  [
    Alcotest.test_case "signature verification" `Quick
      test_signature_verification;
  ]

let didkey_tests =
  [
    Alcotest.test_case "K-256 did:key encoding" `Quick test_didkey_k256;
    Alcotest.test_case "P-256 did:key encoding" `Quick test_didkey_p256;
  ]

let p256_tests =
  [
    Alcotest.test_case "sign and verify" `Quick test_p256_sign_verify;
    Alcotest.test_case "invalid signature" `Quick test_p256_invalid_signature;
  ]

let k256_tests =
  [
    Alcotest.test_case "sign and verify" `Quick test_k256_sign_verify;
    Alcotest.test_case "invalid signature" `Quick test_k256_invalid_signature;
  ]

let jwt_tests =
  [
    Alcotest.test_case "create and verify P256" `Quick
      test_jwt_create_verify_p256;
    Alcotest.test_case "create and verify K256" `Quick
      test_jwt_create_verify_k256;
    Alcotest.test_case "expired token" `Quick test_jwt_expired;
    Alcotest.test_case "invalid signature" `Quick test_jwt_invalid_signature;
    Alcotest.test_case "decode unverified" `Quick test_jwt_decode_unverified;
    Alcotest.test_case "invalid format" `Quick test_jwt_invalid_format;
    Alcotest.test_case "access token helper" `Quick
      test_jwt_access_token_helper;
    Alcotest.test_case "service token helper" `Quick
      test_jwt_service_token_helper;
  ]

let () =
  Alcotest.run "atproto-crypto"
    [
      ("signature", signature_tests);
      ("did_key", didkey_tests);
      ("p256", p256_tests);
      ("k256", k256_tests);
      ("jwt", jwt_tests);
    ]