atproto relay implementation in zig zlay.waow.tech
at main 958 lines 37 kB view raw
//! relay frame validator — DID key resolution + real signature verification
//!
//! validates firehose commit frames by verifying the commit signature against
//! the pre-resolved signing key for the DID. accepts pre-decoded CBOR payload
//! from the subscriber (decoded via zat SDK). on cache miss, skips validation
//! and queues background resolution. no frame is ever blocked on network I/O.

const std = @import("std");
const zat = @import("zat");
const broadcaster = @import("broadcaster.zig");
const event_log_mod = @import("event_log.zig");
const lru = @import("lru.zig");

const Allocator = std.mem.Allocator;
const log = std.log.scoped(.relay);

/// decoded and cached signing key for a DID
const CachedKey = struct {
    key_type: zat.multicodec.KeyType,
    raw: [33]u8, // compressed public key (secp256k1 or p256)
    len: u8, // number of valid bytes in `raw`
    resolve_time: i64 = 0, // epoch seconds when resolved
};

/// outcome of validating one firehose frame.
/// valid=true + skipped=true means "not checked" (cache miss / soft failure),
/// NOT "verified" — callers must look at both flags.
pub const ValidationResult = struct {
    valid: bool,
    skipped: bool,
    data_cid: ?[]const u8 = null, // MST root CID from verified commit
    commit_rev: ?[]const u8 = null, // rev from verified commit
};

/// configuration for commit validation checks
pub const ValidatorConfig = struct {
    /// verify MST structure during signature verification
    verify_mst: bool = false, // off by default for relay throughput
    /// verify commit diffs via MST inversion (sync 1.1)
    verify_commit_diff: bool = false,
    /// max allowed operations per commit
    max_ops: usize = 200,
    /// max clock skew for rev timestamps (seconds)
    rev_clock_skew: i64 = 300, // 5 minutes
};

pub const Validator = struct {
    allocator: Allocator,
    stats: *broadcaster.Stats,
    config: ValidatorConfig,
    persist: ?*event_log_mod.DiskPersist = null,
    // DID → signing key cache (decoded, ready for verification)
    cache: lru.LruCache(CachedKey),
    // background resolve queue (entries are owned, duped DID strings)
    queue: std.ArrayListUnmanaged([]const u8) = .{},
    // in-flight set — prevents duplicate DID entries in the queue.
    // keys alias the same allocations held in `queue.items`.
    queued_set: std.StringHashMapUnmanaged(void) = .{},
    queue_mutex: std.Thread.Mutex = .{},
    queue_cond: std.Thread.Condition = .{},
    resolver_threads: [max_resolver_threads]?std.Thread = .{null} ** max_resolver_threads,
    alive: std.atomic.Value(bool) = .{ .raw = true },
    max_cache_size: u32 = 250_000,

    const max_resolver_threads = 8;
    const default_resolver_threads = 4;
    const max_queue_size: usize = 100_000;

    pub fn init(allocator: Allocator, stats: *broadcaster.Stats) Validator {
        return initWithConfig(allocator, stats, .{});
    }

    pub fn initWithConfig(allocator: Allocator, stats: *broadcaster.Stats, config: ValidatorConfig) Validator {
        return .{
            .allocator = allocator,
            .stats = stats,
            .config = config,
            .cache = lru.LruCache(CachedKey).init(allocator, 250_000),
        };
    }

    pub fn deinit(self: *Validator) void {
        // signal shutdown and join resolver threads FIRST — they hold
        // references into `queue`/`queued_set`, which are freed below.
        self.alive.store(false, .release);
        self.queue_cond.broadcast();
        for (&self.resolver_threads) |*t| {
            if (t.*) |thread| {
                thread.join();
                t.* = null;
            }
        }

        self.cache.deinit();

        // free queued DIDs (queued_set keys alias these, so only free once)
        for (self.queue.items) |did| {
            self.allocator.free(did);
        }
        self.queue.deinit(self.allocator);
        self.queued_set.deinit(self.allocator);
    }

    /// start background resolver threads
    pub fn start(self: *Validator) !void {
        self.max_cache_size = parseEnvInt(u32, "VALIDATOR_CACHE_SIZE", self.max_cache_size);
        self.cache.capacity = self.max_cache_size;
        const n = parseEnvInt(u8, "RESOLVER_THREADS", default_resolver_threads);
        const count = @min(n, max_resolver_threads);
        for (self.resolver_threads[0..count]) |*t| {
            t.* = try std.Thread.spawn(.{ .stack_size = @import("main.zig").default_stack_size }, resolveLoop, .{self});
        }
    }

    /// validate a #sync frame: signature verification only (no ops, no MST).
    /// #sync resets a repo to a new commit state — used for recovery from broken streams.
    /// on cache miss, queues background resolution and skips.
    pub fn validateSync(self: *Validator, payload: zat.cbor.Value) ValidationResult {
        const did = payload.getString("did") orelse {
            _ = self.stats.skipped.fetchAdd(1, .monotonic);
            return .{ .valid = true, .skipped = true };
        };

        if (zat.Did.parse(did) == null) {
            _ = self.stats.failed.fetchAdd(1, .monotonic);
            _ = self.stats.failed_bad_did.fetchAdd(1, .monotonic);
            return .{ .valid = false, .skipped = false };
        }

        // check rev is valid TID (if present)
        if (payload.getString("rev")) |rev| {
            if (zat.Tid.parse(rev) == null) {
                _ = self.stats.failed.fetchAdd(1, .monotonic);
                _ = self.stats.failed_bad_rev.fetchAdd(1, .monotonic);
                return .{ .valid = false, .skipped = false };
            }
        }

        const blocks = payload.getBytes("blocks") orelse {
            _ = self.stats.failed.fetchAdd(1, .monotonic);
            _ = self.stats.failed_missing_blocks.fetchAdd(1, .monotonic);
            return .{ .valid = false, .skipped = false };
        };

        // #sync CAR should be small (just the signed commit block)
        // lexicon maxLength: 10000
        if (blocks.len > 10_000) {
            _ = self.stats.failed.fetchAdd(1, .monotonic);
            _ = self.stats.failed_oversized_blocks.fetchAdd(1, .monotonic);
            return .{ .valid = false, .skipped = false };
        }

        // cache lookup
        const cached_key: ?CachedKey = self.cache.get(did);

        if (cached_key == null) {
            _ = self.stats.cache_misses.fetchAdd(1, .monotonic);
            _ = self.stats.skipped.fetchAdd(1, .monotonic);
            self.queueResolve(did);
            return .{ .valid = true, .skipped = true };
        }

        _ = self.stats.cache_hits.fetchAdd(1, .monotonic);

        // verify signature (no MST, no ops)
        const public_key = zat.multicodec.PublicKey{
            .key_type = cached_key.?.key_type,
            .raw = cached_key.?.raw[0..cached_key.?.len],
        };

        // per-frame arena for CAR decode temporaries
        var arena = std.heap.ArenaAllocator.init(self.allocator);
        defer arena.deinit();

        const result = zat.verifyCommitCar(arena.allocator(), blocks, public_key, .{
            .verify_mst = false,
            .expected_did = did,
            .max_car_size = 10 * 1024,
        }) catch |err| {
            log.debug("sync verification failed for {s}: {s}", .{ did, @errorName(err) });
            // sync spec: on signature failure, key may have rotated.
            // evict cached key and queue re-resolution. skip this frame.
            self.evictKey(did);
            self.queueResolve(did);
            _ = self.stats.skipped.fetchAdd(1, .monotonic);
            return .{ .valid = true, .skipped = true };
        };

        _ = self.stats.validated.fetchAdd(1, .monotonic);
        return .{
            .valid = true,
            .skipped = false,
            .data_cid = result.commit_cid,
            .commit_rev = result.commit_rev,
        };
    }

    /// validate a commit frame using a pre-decoded CBOR payload (from SDK decoder).
    /// on cache miss, queues background resolution and skips.
    pub fn validateCommit(self: *Validator, payload: zat.cbor.Value) ValidationResult {
        // extract DID from decoded payload
        const did = payload.getString("repo") orelse {
            _ = self.stats.skipped.fetchAdd(1, .monotonic);
            return .{ .valid = true, .skipped = true };
        };

        // check cache for pre-resolved signing key
        const cached_key: ?CachedKey = self.cache.get(did);

        if (cached_key == null) {
            // cache miss — queue for background resolution, skip validation
            _ = self.stats.cache_misses.fetchAdd(1, .monotonic);
            _ = self.stats.skipped.fetchAdd(1, .monotonic);
            self.queueResolve(did);
            return .{ .valid = true, .skipped = true };
        }

        _ = self.stats.cache_hits.fetchAdd(1, .monotonic);

        // cache hit — do structure checks + signature verification
        if (self.verifyCommit(payload, did, cached_key.?)) |vr| {
            _ = self.stats.validated.fetchAdd(1, .monotonic);
            return vr;
        } else |err| {
            log.debug("commit verification failed for {s}: {s}", .{ did, @errorName(err) });
            // sync spec: on signature failure, key may have rotated.
            // evict cached key and queue re-resolution. skip this frame
            // (treat as cache miss). next commit will use the refreshed key.
            self.evictKey(did);
            self.queueResolve(did);
            _ = self.stats.skipped.fetchAdd(1, .monotonic);
            return .{ .valid = true, .skipped = true };
        }
    }

    /// full verification path for a commit frame with a cached key:
    /// structure checks, then either the sync 1.1 diff path or the
    /// legacy signature-only path. any error is treated by the caller
    /// as a soft failure (evict + re-resolve + skip).
    fn verifyCommit(self: *Validator, payload: zat.cbor.Value, expected_did: []const u8, cached_key: CachedKey) !ValidationResult {
        // commit structure checks first (cheap, no allocation)
        self.checkCommitStructure(payload) catch {
            _ = self.stats.failed_bad_structure.fetchAdd(1, .monotonic);
            return error.InvalidFrame;
        };

        // extract blocks (raw CAR bytes) from the pre-decoded payload
        const blocks = payload.getBytes("blocks") orelse return error.InvalidFrame;

        // blocks size check — lexicon maxLength: 2000000
        if (blocks.len > 2_000_000) return error.InvalidFrame;

        // build public key for verification
        const public_key = zat.multicodec.PublicKey{
            .key_type = cached_key.key_type,
            .raw = cached_key.raw[0..cached_key.len],
        };

        // run real signature verification (needs its own arena for CAR/MST temporaries)
        var arena = std.heap.ArenaAllocator.init(self.allocator);
        defer arena.deinit();
        const alloc = arena.allocator();

        // try sync 1.1 path: extract ops and use verifyCommitDiff
        if (self.config.verify_commit_diff) {
            if (self.extractOps(alloc, payload)) |msg_ops| {
                // get stored prev_data from payload
                const prev_data: ?[]const u8 = if (payload.get("prevData")) |pd| switch (pd) {
                    .cid => |c| c.raw,
                    .null => null,
                    else => null,
                } else null;

                // without prev_data there is nothing to invert against,
                // so inversion is skipped and only the signature is checked
                const diff_result = zat.verifyCommitDiff(alloc, blocks, msg_ops, prev_data, public_key, .{
                    .expected_did = expected_did,
                    .skip_inversion = prev_data == null,
                }) catch |err| {
                    return err;
                };

                return .{
                    .valid = true,
                    .skipped = false,
                    .data_cid = diff_result.data_cid,
                    .commit_rev = diff_result.commit_rev,
                };
            }
        }

        // fallback: legacy verification (signature + optional MST walk)
        const result = zat.verifyCommitCar(alloc, blocks, public_key, .{
            .verify_mst = self.config.verify_mst,
            .expected_did = expected_did,
        }) catch |err| {
            return err;
        };

        return .{
            .valid = true,
            .skipped = false,
            .data_cid = result.commit_cid,
            .commit_rev = result.commit_rev,
        };
    }

    /// extract ops from payload and convert to mst.Operation array.
    /// the firehose format uses a single "path" field ("collection/rkey"),
    /// not separate "collection"/"rkey" fields.
    /// returned slice (and the paths/CIDs it references) lives in `alloc` —
    /// callers pass an arena and must not outlive it.
    /// returns null when there are no usable ops (caller falls back to legacy path).
    fn extractOps(self: *Validator, alloc: Allocator, payload: zat.cbor.Value) ?[]const zat.MstOperation {
        _ = self;
        const ops_array = payload.getArray("ops") orelse return null;
        var ops: std.ArrayListUnmanaged(zat.MstOperation) = .{};
        for (ops_array) |op| {
            // malformed ops are silently skipped rather than failing the frame
            const action = op.getString("action") orelse continue;
            const path = op.getString("path") orelse continue;

            // validate path contains "/" (collection/rkey)
            if (std.mem.indexOfScalar(u8, path, '/') == null) continue;

            // extract CID values
            const cid_value: ?[]const u8 = if (op.get("cid")) |v| switch (v) {
                .cid => |c| c.raw,
                else => null,
            } else null;

            var value: ?[]const u8 = null;
            var prev: ?[]const u8 = null;

            if (std.mem.eql(u8, action, "create")) {
                value = cid_value;
            } else if (std.mem.eql(u8, action, "update")) {
                value = cid_value;
                prev = if (op.get("prev")) |v| switch (v) {
                    .cid => |c| c.raw,
                    else => null,
                } else null;
            } else if (std.mem.eql(u8, action, "delete")) {
                prev = if (op.get("prev")) |v| switch (v) {
                    .cid => |c| c.raw,
                    else => null,
                } else null;
            } else continue; // unknown action

            ops.append(alloc, .{
                .path = path,
                .value = value,
                .prev = prev,
            }) catch return null;
        }

        if (ops.items.len == 0) return null;
        return ops.items;
    }

    /// cheap, allocation-free structural checks on a commit payload:
    /// DID syntax, TID syntax for rev, ops count, and path shape per op.
    fn checkCommitStructure(self: *Validator, payload: zat.cbor.Value) !void {
        // check repo field is a valid DID
        const repo = payload.getString("repo") orelse return error.InvalidFrame;
        if (zat.Did.parse(repo) == null) return error.InvalidFrame;

        // check rev is a valid TID
        if (payload.getString("rev")) |rev| {
            if (zat.Tid.parse(rev) == null) return error.InvalidFrame;
        }

        // check ops count
        if (payload.get("ops")) |ops_value| {
            switch (ops_value) {
                .array => |ops| {
                    if (ops.len > self.config.max_ops) return error.InvalidFrame;
                    // validate each op has valid path (collection/rkey)
                    for (ops) |op| {
                        if (op.getString("path")) |path| {
                            if (std.mem.indexOfScalar(u8, path, '/')) |sep| {
                                const collection = path[0..sep];
                                const rkey = path[sep + 1 ..];
                                if (zat.Nsid.parse(collection) == null) return error.InvalidFrame;
                                // NOTE(review): an empty rkey ("coll/") passes this check
                                // unvalidated — confirm that is intentional.
                                if (rkey.len > 0) {
                                    if (zat.Rkey.parse(rkey) == null) return error.InvalidFrame;
                                }
                            } else return error.InvalidFrame; // path must contain '/'
                        }
                    }
                },
                else => return error.InvalidFrame,
            }
        }
    }

    /// enqueue a DID for background key resolution. best-effort:
    /// allocation failure or a full queue silently drops the request
    /// (the DID will be re-queued by a later frame).
    fn queueResolve(self: *Validator, did: []const u8) void {
        // check if already cached (race between validate and resolver)
        if (self.cache.contains(did)) return;

        const duped = self.allocator.dupe(u8, did) catch return;

        self.queue_mutex.lock();
        defer self.queue_mutex.unlock();

        // skip if already queued (prevents unbounded queue growth)
        if (self.queued_set.contains(duped)) {
            self.allocator.free(duped);
            return;
        }

        // cap queue size — drop DID without adding to queued_set so it can be re-queued later
        if (self.queue.items.len >= max_queue_size) {
            self.allocator.free(duped);
            return;
        }

        self.queue.append(self.allocator, duped) catch {
            self.allocator.free(duped);
            return;
        };
        // OOM here is swallowed: the DID stays in the queue but not in the
        // dedup set, so it may be queued twice — harmless, just extra work.
        self.queued_set.put(self.allocator, duped, {}) catch {};
        self.queue_cond.signal();
    }

    /// resolver thread body: pops DIDs off the queue, resolves DID docs,
    /// decodes signing keys into the cache, and opportunistically updates
    /// the account's host mapping from the DID doc's PDS endpoint.
    fn resolveLoop(self: *Validator) void {
        var resolver = zat.DidResolver.initWithOptions(self.allocator, .{ .keep_alive = true });
        defer resolver.deinit();

        while (self.alive.load(.acquire)) {
            var did: ?[]const u8 = null;
            {
                self.queue_mutex.lock();
                defer self.queue_mutex.unlock();
                // 1s timed wait so shutdown (alive=false) is noticed promptly
                while (self.queue.items.len == 0 and self.alive.load(.acquire)) {
                    self.queue_cond.timedWait(&self.queue_mutex, 1 * std.time.ns_per_s) catch {};
                }
                if (self.queue.items.len > 0) {
                    // FIFO pop — orderedRemove(0) is O(n) on the backing array
                    did = self.queue.orderedRemove(0);
                    // the set key aliases this same allocation; remove it
                    // before the string is freed below
                    _ = self.queued_set.remove(did.?);
                }
            }

            const d = did orelse continue;
            defer self.allocator.free(d);

            // skip if already cached (resolved while queued)
            if (self.cache.contains(d)) continue;

            // resolve DID → signing key
            const parsed = zat.Did.parse(d) orelse continue;
            var doc = resolver.resolve(parsed) catch |err| {
                log.debug("DID resolve failed for {s}: {s}", .{ d, @errorName(err) });
                continue;
            };
            defer doc.deinit();

            // extract and decode signing key
            const vm = doc.signingKey() orelse continue;
            const key_bytes = zat.multibase.decode(self.allocator, vm.public_key_multibase) catch continue;
            defer self.allocator.free(key_bytes);
            const public_key = zat.multicodec.parsePublicKey(key_bytes) catch continue;

            // store decoded key in cache (fixed-size, no pointer chasing)
            var cached = CachedKey{
                .key_type = public_key.key_type,
                .raw = undefined,
                .len = @intCast(public_key.raw.len),
                .resolve_time = std.time.timestamp(),
            };
            @memcpy(cached.raw[0..public_key.raw.len], public_key.raw);

            self.cache.put(d, cached) catch continue;

            // --- host validation (merged from migration queue) ---
            // while we have the DID doc, check PDS endpoint and update host if needed.
            // best-effort: failures don't prevent signing key caching.
            if (self.persist) |persist| {
                if (doc.pdsEndpoint()) |pds_endpoint| {
                    if (extractHostFromUrl(pds_endpoint)) |pds_host| {
                        const pds_host_id = (persist.getHostIdForHostname(pds_host) catch null) orelse continue;
                        const uid = persist.uidForDid(d) catch continue;
                        const current_host = persist.getAccountHostId(uid) catch continue;
                        if (current_host != 0 and current_host != pds_host_id) {
                            persist.setAccountHostId(uid, pds_host_id) catch {};
                            log.info("host updated via DID doc: {s} -> host {d}", .{ d, pds_host_id });
                        }
                    }
                }
            }
        }
    }

    /// evict a DID's cached signing key (e.g. on #identity event).
    /// the next commit from this DID will trigger a fresh resolution.
    pub fn evictKey(self: *Validator, did: []const u8) void {
        _ = self.cache.remove(did);
    }

    /// cache size (for diagnostics)
    pub fn cacheSize(self: *Validator) u32 {
        return self.cache.count();
    }

    /// resolve queue length (for diagnostics — non-blocking; returns 0 if contended)
    pub fn resolveQueueLen(self: *Validator) usize {
        if (!self.queue_mutex.tryLock()) return 0;
        defer self.queue_mutex.unlock();
        return self.queue.items.len;
    }

    /// resolve dedup set size (for diagnostics — non-blocking; returns 0 if contended)
    pub fn resolveQueuedSetCount(self: *Validator) u32 {
        if (!self.queue_mutex.tryLock()) return 0;
        defer self.queue_mutex.unlock();
        return self.queued_set.count();
    }

    /// signing key cache hashmap backing capacity (for memory attribution)
    pub fn cacheMapCapacity(self: *Validator) u32 {
        return self.cache.mapCapacity();
    }

    /// resolver dedup set hashmap backing capacity (for memory attribution — non-blocking)
    pub fn resolveQueuedSetCapacity(self: *Validator) u32 {
        if (!self.queue_mutex.tryLock()) return 0;
        defer self.queue_mutex.unlock();
        return self.queued_set.capacity();
    }

    pub const HostAuthority = enum { accept, migrate, reject };

    /// synchronous host authority check. called on first-seen DIDs (is_new)
    /// and host migrations (host_changed). resolves the DID doc to verify the
    /// PDS endpoint matches the incoming host. retries once on failure to
    /// handle transient network errors.
    ///
    /// returns:
    ///   .accept  — should not happen (caller should only call on new/mismatch)
    ///   .migrate — DID doc confirms this host, caller should update host_id
    ///   .reject  — DID doc does not confirm, caller should drop the event
    pub fn resolveHostAuthority(self: *Validator, did: []const u8, incoming_host_id: u64) HostAuthority {
        const persist = self.persist orelse return .migrate; // no DB — can't check

        var resolver = zat.DidResolver.initWithOptions(self.allocator, .{});
        defer resolver.deinit();

        const parsed = zat.Did.parse(did) orelse return .reject;

        // first resolve attempt
        var doc = resolver.resolve(parsed) catch {
            // retry once on network failure
            var doc2 = resolver.resolve(parsed) catch return .reject;
            defer doc2.deinit();
            return self.checkPdsHost(&doc2, persist, incoming_host_id);
        };
        defer doc.deinit();
        return self.checkPdsHost(&doc, persist, incoming_host_id);
    }

    /// compare the DID doc's PDS host against the incoming host id.
    /// any lookup failure (no endpoint, unknown hostname) rejects.
    fn checkPdsHost(self: *Validator, doc: *zat.DidDocument, persist: *event_log_mod.DiskPersist, incoming_host_id: u64) HostAuthority {
        _ = self;
        const pds_endpoint = doc.pdsEndpoint() orelse return .reject;
        const pds_host = extractHostFromUrl(pds_endpoint) orelse return .reject;
        const pds_host_id = (persist.getHostIdForHostname(pds_host) catch null) orelse return .reject;
        if (pds_host_id == incoming_host_id) return .migrate;
        return .reject;
    }
};

/// extract hostname from a URL like "https://pds.example.com" or "https://pds.example.com:443/path".
/// returns a slice into `url` (no allocation); null for an empty host.
pub fn extractHostFromUrl(url: []const u8) ?[]const u8 {
    // strip scheme
    var rest = url;
    if (std.mem.startsWith(u8, rest, "https://")) {
        rest = rest["https://".len..];
    } else if (std.mem.startsWith(u8, rest, "http://")) {
        rest = rest["http://".len..];
    }
    // strip path
    if (std.mem.indexOfScalar(u8, rest, '/')) |i| {
        rest = rest[0..i];
    }
    // strip port
    if (std.mem.indexOfScalar(u8, rest, ':')) |i| {
        rest = rest[0..i];
    }
    if (rest.len == 0) return null;
    return rest;
}

/// read an integer from the environment; malformed or missing values fall
/// back to `default`.
fn parseEnvInt(comptime T: type, key: []const u8, default: T) T {
    const val = std.posix.getenv(key) orelse return default;
    return std.fmt.parseInt(T, val, 10) catch default;
}

// --- tests ---

test "validateCommit skips on cache miss" {
    var stats = broadcaster.Stats{};
    var v = Validator.init(std.testing.allocator, &stats);
    defer v.deinit();

    // build a commit payload using SDK
    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "seq", .value = .{ .unsigned = 42 } },
        .{ .key = "rev", .value = .{ .text = "3k2abc000000" } },
        .{ .key = "time", .value = .{ .text = "2024-01-15T10:30:00Z" } },
    } };

    const result = v.validateCommit(payload);
    try std.testing.expect(result.valid);
    try std.testing.expect(result.skipped);
    try std.testing.expectEqual(@as(u64, 1), stats.cache_misses.load(.acquire));
}

test "validateCommit skips when no repo field" {
    var stats = broadcaster.Stats{};
    var v = Validator.init(std.testing.allocator, &stats);
    defer v.deinit();

    // payload without "repo" field
    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "seq", .value = .{ .unsigned = 42 } },
    } };

    const result = v.validateCommit(payload);
    try std.testing.expect(result.valid);
    try std.testing.expect(result.skipped);
    try std.testing.expectEqual(@as(u64, 1), stats.skipped.load(.acquire));
}

test "checkCommitStructure rejects invalid DID" {
    var stats = broadcaster.Stats{};
    var v = Validator.init(std.testing.allocator, &stats);
    defer v.deinit();

    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = "not-a-did" } },
    } };

    try std.testing.expectError(error.InvalidFrame, v.checkCommitStructure(payload));
}

test "checkCommitStructure accepts valid commit" {
    var stats = broadcaster.Stats{};
    var v = Validator.init(std.testing.allocator, &stats);
    defer v.deinit();

    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "rev", .value = .{ .text = "3k2abcdefghij" } },
    } };

    try v.checkCommitStructure(payload);
}

test "validateSync skips on cache miss" {
    var stats = broadcaster.Stats{};
    var v = Validator.init(std.testing.allocator, &stats);
    defer v.deinit();

    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "did", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "seq", .value = .{ .unsigned = 42 } },
        .{ .key = "rev", .value = .{ .text = "3k2abcdefghij" } },
        .{ .key = "blocks", .value = .{ .bytes = "deadbeef" } },
    } };

    const result = v.validateSync(payload);
    try std.testing.expect(result.valid);
    try std.testing.expect(result.skipped);
    try std.testing.expectEqual(@as(u64, 1), stats.cache_misses.load(.acquire));
}

test "validateSync rejects invalid DID" {
    var stats = broadcaster.Stats{};
    var v = Validator.init(std.testing.allocator, &stats);
    defer v.deinit();

    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "did", .value = .{ .text = "not-a-did" } },
        .{ .key = "blocks", .value = .{ .bytes = "deadbeef" } },
    } };

    const result = v.validateSync(payload);
    try std.testing.expect(!result.valid);
    try std.testing.expect(!result.skipped);
    try std.testing.expectEqual(@as(u64, 1), stats.failed.load(.acquire));
}

test "validateSync rejects missing blocks" {
    var stats = broadcaster.Stats{};
    var v = Validator.init(std.testing.allocator, &stats);
    defer v.deinit();

    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "did", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "rev", .value = .{ .text = "3k2abcdefghij" } },
    } };

    const result = v.validateSync(payload);
    try std.testing.expect(!result.valid);
    try std.testing.expect(!result.skipped);
    try std.testing.expectEqual(@as(u64, 1), stats.failed.load(.acquire));
}

test "validateSync skips when no did field" {
    var stats = broadcaster.Stats{};
    var v = Validator.init(std.testing.allocator, &stats);
    defer v.deinit();

    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "seq", .value = .{ .unsigned = 42 } },
    } };

    const result = v.validateSync(payload);
    try std.testing.expect(result.valid);
    try std.testing.expect(result.skipped);
    try std.testing.expectEqual(@as(u64, 1), stats.skipped.load(.acquire));
}

test "LRU cache evicts least recently used" {
    var stats = broadcaster.Stats{};
    var v = Validator.init(std.testing.allocator, &stats);
    v.cache.capacity = 3;
    defer v.deinit();

    const mk = CachedKey{ .key_type = .p256, .raw = .{0} ** 33, .len = 33 };

    try v.cache.put("did:plc:aaa", mk);
    try v.cache.put("did:plc:bbb", mk);
    try v.cache.put("did:plc:ccc", mk);

    // access "aaa" to promote it
    _ = v.cache.get("did:plc:aaa");

    // insert "ddd" — should evict "bbb" (LRU)
    try v.cache.put("did:plc:ddd", mk);

    try std.testing.expect(v.cache.get("did:plc:bbb") == null);
    try std.testing.expect(v.cache.get("did:plc:aaa") != null);
    try std.testing.expect(v.cache.get("did:plc:ccc") != null);
    try std.testing.expect(v.cache.get("did:plc:ddd") != null);
    try std.testing.expectEqual(@as(u32, 3), v.cache.count());
}

test "checkCommitStructure rejects too many ops" {
    var stats = broadcaster.Stats{};
    var v = Validator.initWithConfig(std.testing.allocator, &stats, .{ .max_ops = 2 });
    defer v.deinit();

    // build ops array with 3 items (over limit of 2)
    const ops = [_]zat.cbor.Value{
        .{ .map = &.{.{ .key = "action", .value = .{ .text = "create" } }} },
        .{ .map = &.{.{ .key = "action", .value = .{ .text = "create" } }} },
        .{ .map = &.{.{ .key = "action", .value = .{ .text = "create" } }} },
    };

    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "ops", .value = .{ .array = &ops } },
    } };

    try std.testing.expectError(error.InvalidFrame, v.checkCommitStructure(payload));
}

// --- spec conformance tests ---

test "spec: #commit blocks > 2,000,000 bytes rejected" {
    // lexicon maxLength for #commit blocks: 2,000,000
    var stats = broadcaster.Stats{};
    var v = Validator.init(std.testing.allocator, &stats);
    defer v.deinit();

    // insert a fake cached key so we reach the blocks size check
    const did = "did:plc:test123";
    try v.cache.put(did, .{
        .key_type = .p256,
        .raw = .{0} ** 33,
        .len = 33,
        .resolve_time = 100,
    });

    // blocks with 2,000,001 bytes (1 byte over limit)
    const oversized_blocks = try std.testing.allocator.alloc(u8, 2_000_001);
    defer std.testing.allocator.free(oversized_blocks);
    @memset(oversized_blocks, 0);

    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = did } },
        .{ .key = "rev", .value = .{ .text = "3k2abcdefghij" } },
        .{ .key = "blocks", .value = .{ .bytes = oversized_blocks } },
    } };

    const result = v.validateCommit(payload);
    try std.testing.expect(!result.valid or result.skipped);
}

test "spec: #commit blocks = 2,000,000 bytes accepted (boundary)" {
    // lexicon maxLength for #commit blocks: 2,000,000 — exactly at limit should pass size check
    var stats = broadcaster.Stats{};
    var v = Validator.init(std.testing.allocator, &stats);
    defer v.deinit();

    const did = "did:plc:test123";
    try v.cache.put(did, .{
        .key_type = .p256,
        .raw = .{0} ** 33,
        .len = 33,
        .resolve_time = 100,
    });

    // exactly 2,000,000 bytes — should pass size check (may fail signature verify, that's ok)
    const exact_blocks = try std.testing.allocator.alloc(u8, 2_000_000);
    defer std.testing.allocator.free(exact_blocks);
    @memset(exact_blocks, 0);

    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = did } },
        .{ .key = "rev", .value = .{ .text = "3k2abcdefghij" } },
        .{ .key = "blocks", .value = .{ .bytes = exact_blocks } },
    } };

    const result = v.validateCommit(payload);
    // should not be rejected for size — may fail signature verification (that's fine,
    // it means we passed the size check). with P1.1c, sig failure → skipped=true.
    try std.testing.expect(result.valid or result.skipped);
}

test "spec: #sync blocks > 10,000 bytes rejected" {
    // lexicon maxLength for #sync blocks: 10,000
    var stats = broadcaster.Stats{};
    var v = Validator.init(std.testing.allocator, &stats);
    defer v.deinit();

    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "did", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "rev", .value = .{ .text = "3k2abcdefghij" } },
        .{ .key = "blocks", .value = .{ .bytes = &([_]u8{0} ** 10_001) } },
    } };

    const result = v.validateSync(payload);
    try std.testing.expect(!result.valid);
    try std.testing.expect(!result.skipped);
}

test "spec: #sync blocks = 10,000 bytes accepted (boundary)" {
    // lexicon maxLength for #sync blocks: 10,000 — exactly at limit should pass size check
    var stats = broadcaster.Stats{};
    var v = Validator.init(std.testing.allocator, &stats);
    defer v.deinit();

    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "did", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "rev", .value = .{ .text = "3k2abcdefghij" } },
        .{ .key = "blocks", .value = .{ .bytes = &([_]u8{0} ** 10_000) } },
    } };

    const result = v.validateSync(payload);
    // should pass size check — will be a cache miss → skipped (no cached key)
    try std.testing.expect(result.valid);
    try std.testing.expect(result.skipped);
}

test "extractOps reads path field from firehose format" {
    var stats = broadcaster.Stats{};
    var v = Validator.initWithConfig(std.testing.allocator, &stats, .{ .verify_commit_diff = true });
    defer v.deinit();

    // use arena since extractOps allocates an ArrayList internally
    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();

    const ops = [_]zat.cbor.Value{
        .{ .map = &.{
            .{ .key = "action", .value = .{ .text = "create" } },
            .{ .key = "path", .value = .{ .text = "app.bsky.feed.post/3k2abc000000" } },
            .{ .key = "cid", .value = .{ .cid = .{ .raw = "fakecid12345" } } },
        } },
        .{ .map = &.{
            .{ .key = "action", .value = .{ .text = "delete" } },
            .{ .key = "path", .value = .{ .text = "app.bsky.feed.like/3k2def000000" } },
        } },
    };

    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "ops", .value = .{ .array = &ops } },
    } };

    const result = v.extractOps(arena.allocator(), payload);
    try std.testing.expect(result != null);
    try std.testing.expectEqual(@as(usize, 2), result.?.len);
    try std.testing.expectEqualStrings("app.bsky.feed.post/3k2abc000000", result.?[0].path);
    try std.testing.expectEqualStrings("app.bsky.feed.like/3k2def000000", result.?[1].path);
    try std.testing.expect(result.?[0].value != null); // create has cid
    try std.testing.expect(result.?[1].value == null); // delete has no cid
}

test "extractOps rejects malformed path without slash" {
    var stats = broadcaster.Stats{};
    var v = Validator.initWithConfig(std.testing.allocator, &stats, .{ .verify_commit_diff = true });
    defer v.deinit();

    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();

    const ops = [_]zat.cbor.Value{
        .{ .map = &.{
            .{ .key = "action", .value = .{ .text = "create" } },
            .{ .key = "path", .value = .{ .text = "noslash" } },
            .{ .key = "cid", .value = .{ .cid = .{ .raw = "fakecid12345" } } },
        } },
    };

    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "ops", .value = .{ .array = &ops } },
    } };

    // malformed path (no slash) → all ops skipped → returns null
    const result = v.extractOps(arena.allocator(), payload);
    try std.testing.expect(result == null);
}

test "checkCommitStructure validates path field" {
    var stats = broadcaster.Stats{};
    var v = Validator.init(std.testing.allocator, &stats);
    defer v.deinit();

    // valid path
    const valid_ops = [_]zat.cbor.Value{
        .{ .map = &.{
            .{ .key = "action", .value = .{ .text = "create" } },
            .{ .key = "path", .value = .{ .text = "app.bsky.feed.post/3k2abc000000" } },
        } },
    };

    const valid_payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "ops", .value = .{ .array = &valid_ops } },
    } };

    try v.checkCommitStructure(valid_payload);

    // invalid collection in path
    const invalid_ops = [_]zat.cbor.Value{
        .{ .map = &.{
            .{ .key = "action", .value = .{ .text = "create" } },
            .{ .key = "path", .value = .{ .text = "not-an-nsid/3k2abc000000" } },
        } },
    };

    const invalid_payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "ops", .value = .{ .array = &invalid_ops } },
    } };

    try std.testing.expectError(error.InvalidFrame, v.checkCommitStructure(invalid_payload));
}

test "queueResolve deduplicates repeated DIDs" {
    var stats = broadcaster.Stats{};
    var v = Validator.init(std.testing.allocator, &stats);
    defer v.deinit();

    // queue the same DID 100 times
    for (0..100) |_| {
        v.queueResolve("did:plc:duplicate");
    }

    // should have exactly 1 entry, not 100
    try std.testing.expectEqual(@as(usize, 1), v.queue.items.len);
    try std.testing.expectEqual(@as(u32, 1), v.queued_set.count());
}