//! atproto relay implementation in zig
//! zlay.waow.tech
1//! relay frame validator — DID key resolution + real signature verification
2//!
3//! validates firehose commit frames by verifying the commit signature against
4//! the pre-resolved signing key for the DID. accepts pre-decoded CBOR payload
5//! from the subscriber (decoded via zat SDK). on cache miss, skips validation
6//! and queues background resolution. no frame is ever blocked on network I/O.
7
8const std = @import("std");
9const zat = @import("zat");
10const broadcaster = @import("broadcaster.zig");
11const event_log_mod = @import("event_log.zig");
12const lru = @import("lru.zig");
13
14const Allocator = std.mem.Allocator;
15const log = std.log.scoped(.relay);
16
/// decoded and cached signing key for a DID
/// stored by value in the LRU cache — fixed-size, no heap pointers to chase.
const CachedKey = struct {
    key_type: zat.multicodec.KeyType,
    // fixed-size buffer; only raw[0..len] bytes are meaningful
    raw: [33]u8, // compressed public key (secp256k1 or p256)
    // number of valid bytes in `raw`
    len: u8,
    resolve_time: i64 = 0, // epoch seconds when resolved
};
24
/// outcome of validating a single firehose frame.
/// valid=true + skipped=true means the frame was NOT checked (cache miss,
/// missing fields, or signature failure pending key re-resolution);
/// valid=true + skipped=false means the signature actually verified.
pub const ValidationResult = struct {
    valid: bool,
    skipped: bool,
    data_cid: ?[]const u8 = null, // MST root CID from verified commit
    commit_rev: ?[]const u8 = null, // rev from verified commit
};
31
/// configuration for commit validation checks
pub const ValidatorConfig = struct {
    /// verify MST structure during signature verification
    verify_mst: bool = false, // off by default for relay throughput
    /// verify commit diffs via MST inversion (sync 1.1)
    verify_commit_diff: bool = false,
    /// max allowed operations per commit
    max_ops: usize = 200,
    /// max clock skew for rev timestamps (seconds)
    /// NOTE(review): not referenced anywhere in this file — confirm it is
    /// consumed elsewhere, or remove it.
    rev_clock_skew: i64 = 300, // 5 minutes
};
43
/// firehose frame validator. hot path (validateCommit/validateSync) never
/// blocks on network I/O: on a signing-key cache miss the frame is skipped
/// and the DID is queued for background resolution by resolver threads.
pub const Validator = struct {
    allocator: Allocator,
    stats: *broadcaster.Stats,
    config: ValidatorConfig,
    // optional disk persistence — enables PDS host lookups/updates during resolution
    persist: ?*event_log_mod.DiskPersist = null,
    // DID → signing key cache (decoded, ready for verification)
    // NOTE(review): read from validate callers and written from resolver
    // threads without queue_mutex — assumes lru.LruCache is internally
    // synchronized; verify.
    cache: lru.LruCache(CachedKey),
    // background resolve queue
    // queue owns its DID strings (duped in queueResolve, freed by the
    // resolver after processing or in deinit)
    queue: std.ArrayListUnmanaged([]const u8) = .{},
    // in-flight set — prevents duplicate DID entries in the queue
    queued_set: std.StringHashMapUnmanaged(void) = .{},
    queue_mutex: std.Thread.Mutex = .{},
    queue_cond: std.Thread.Condition = .{},
    resolver_threads: [max_resolver_threads]?std.Thread = .{null} ** max_resolver_threads,
    // shutdown flag — cleared by deinit() so resolver threads exit
    alive: std.atomic.Value(bool) = .{ .raw = true },
    max_cache_size: u32 = 250_000,

    const max_resolver_threads = 8;
    const default_resolver_threads = 4;
    const max_queue_size: usize = 100_000;

    /// init with default config. resolver threads are NOT started here —
    /// call start() to spawn them.
    pub fn init(allocator: Allocator, stats: *broadcaster.Stats) Validator {
        return initWithConfig(allocator, stats, .{});
    }

    /// init with an explicit config (see ValidatorConfig for knobs).
    pub fn initWithConfig(allocator: Allocator, stats: *broadcaster.Stats, config: ValidatorConfig) Validator {
        return .{
            .allocator = allocator,
            .stats = stats,
            .config = config,
            .cache = lru.LruCache(CachedKey).init(allocator, 250_000),
        };
    }

    /// stop and join resolver threads, then free the key cache, any DIDs
    /// still sitting in the resolve queue, and the dedup set.
    pub fn deinit(self: *Validator) void {
        self.alive.store(false, .release);
        // wake any resolvers blocked in timedWait; worst case they notice
        // `alive` on the next 1s timeout
        self.queue_cond.broadcast();
        for (&self.resolver_threads) |*t| {
            if (t.*) |thread| {
                thread.join();
                t.* = null;
            }
        }

        self.cache.deinit();

        // free queued DIDs
        for (self.queue.items) |did| {
            self.allocator.free(did);
        }
        self.queue.deinit(self.allocator);
        self.queued_set.deinit(self.allocator);
    }

    /// start background resolver threads
    /// reads VALIDATOR_CACHE_SIZE and RESOLVER_THREADS from the environment;
    /// thread count is capped at max_resolver_threads.
    pub fn start(self: *Validator) !void {
        self.max_cache_size = parseEnvInt(u32, "VALIDATOR_CACHE_SIZE", self.max_cache_size);
        self.cache.capacity = self.max_cache_size;
        const n = parseEnvInt(u8, "RESOLVER_THREADS", default_resolver_threads);
        const count = @min(n, max_resolver_threads);
        for (self.resolver_threads[0..count]) |*t| {
            t.* = try std.Thread.spawn(.{ .stack_size = @import("main.zig").default_stack_size }, resolveLoop, .{self});
        }
    }

    /// validate a #sync frame: signature verification only (no ops, no MST).
    /// #sync resets a repo to a new commit state — used for recovery from broken streams.
    /// on cache miss, queues background resolution and skips.
    pub fn validateSync(self: *Validator, payload: zat.cbor.Value) ValidationResult {
        // no "did" field → cannot identify the repo; pass the frame through
        const did = payload.getString("did") orelse {
            _ = self.stats.skipped.fetchAdd(1, .monotonic);
            return .{ .valid = true, .skipped = true };
        };

        if (zat.Did.parse(did) == null) {
            _ = self.stats.failed.fetchAdd(1, .monotonic);
            _ = self.stats.failed_bad_did.fetchAdd(1, .monotonic);
            return .{ .valid = false, .skipped = false };
        }

        // check rev is valid TID (if present)
        if (payload.getString("rev")) |rev| {
            if (zat.Tid.parse(rev) == null) {
                _ = self.stats.failed.fetchAdd(1, .monotonic);
                _ = self.stats.failed_bad_rev.fetchAdd(1, .monotonic);
                return .{ .valid = false, .skipped = false };
            }
        }

        const blocks = payload.getBytes("blocks") orelse {
            _ = self.stats.failed.fetchAdd(1, .monotonic);
            _ = self.stats.failed_missing_blocks.fetchAdd(1, .monotonic);
            return .{ .valid = false, .skipped = false };
        };

        // #sync CAR should be small (just the signed commit block)
        // lexicon maxLength: 10000
        if (blocks.len > 10_000) {
            _ = self.stats.failed.fetchAdd(1, .monotonic);
            _ = self.stats.failed_oversized_blocks.fetchAdd(1, .monotonic);
            return .{ .valid = false, .skipped = false };
        }

        // cache lookup
        const cached_key: ?CachedKey = self.cache.get(did);

        if (cached_key == null) {
            _ = self.stats.cache_misses.fetchAdd(1, .monotonic);
            _ = self.stats.skipped.fetchAdd(1, .monotonic);
            self.queueResolve(did);
            return .{ .valid = true, .skipped = true };
        }

        _ = self.stats.cache_hits.fetchAdd(1, .monotonic);

        // verify signature (no MST, no ops)
        const public_key = zat.multicodec.PublicKey{
            .key_type = cached_key.?.key_type,
            .raw = cached_key.?.raw[0..cached_key.?.len],
        };

        // arena for CAR decode temporaries; freed wholesale on return
        var arena = std.heap.ArenaAllocator.init(self.allocator);
        defer arena.deinit();

        const result = zat.verifyCommitCar(arena.allocator(), blocks, public_key, .{
            .verify_mst = false,
            .expected_did = did,
            .max_car_size = 10 * 1024,
        }) catch |err| {
            log.debug("sync verification failed for {s}: {s}", .{ did, @errorName(err) });
            // sync spec: on signature failure, key may have rotated.
            // evict cached key and queue re-resolution. skip this frame.
            self.evictKey(did);
            self.queueResolve(did);
            _ = self.stats.skipped.fetchAdd(1, .monotonic);
            return .{ .valid = true, .skipped = true };
        };

        _ = self.stats.validated.fetchAdd(1, .monotonic);
        return .{
            .valid = true,
            .skipped = false,
            // NOTE(review): data_cid is filled from result.commit_cid here —
            // confirm the verifier result has no separate MST-root field.
            .data_cid = result.commit_cid,
            .commit_rev = result.commit_rev,
        };
    }

    /// validate a commit frame using a pre-decoded CBOR payload (from SDK decoder).
    /// on cache miss, queues background resolution and skips.
    pub fn validateCommit(self: *Validator, payload: zat.cbor.Value) ValidationResult {
        // extract DID from decoded payload
        const did = payload.getString("repo") orelse {
            _ = self.stats.skipped.fetchAdd(1, .monotonic);
            return .{ .valid = true, .skipped = true };
        };

        // check cache for pre-resolved signing key
        const cached_key: ?CachedKey = self.cache.get(did);

        if (cached_key == null) {
            // cache miss — queue for background resolution, skip validation
            _ = self.stats.cache_misses.fetchAdd(1, .monotonic);
            _ = self.stats.skipped.fetchAdd(1, .monotonic);
            self.queueResolve(did);
            return .{ .valid = true, .skipped = true };
        }

        _ = self.stats.cache_hits.fetchAdd(1, .monotonic);

        // cache hit — do structure checks + signature verification
        if (self.verifyCommit(payload, did, cached_key.?)) |vr| {
            _ = self.stats.validated.fetchAdd(1, .monotonic);
            return vr;
        } else |err| {
            log.debug("commit verification failed for {s}: {s}", .{ did, @errorName(err) });
            // sync spec: on signature failure, key may have rotated.
            // evict cached key and queue re-resolution. skip this frame
            // (treat as cache miss). next commit will use the refreshed key.
            self.evictKey(did);
            self.queueResolve(did);
            _ = self.stats.skipped.fetchAdd(1, .monotonic);
            return .{ .valid = true, .skipped = true };
        }
    }

    /// structure checks + signature verification for one commit frame.
    /// returns error.InvalidFrame on structural problems; propagates
    /// verification errors from the zat SDK otherwise.
    fn verifyCommit(self: *Validator, payload: zat.cbor.Value, expected_did: []const u8, cached_key: CachedKey) !ValidationResult {
        // commit structure checks first (cheap, no allocation)
        self.checkCommitStructure(payload) catch {
            _ = self.stats.failed_bad_structure.fetchAdd(1, .monotonic);
            return error.InvalidFrame;
        };

        // extract blocks (raw CAR bytes) from the pre-decoded payload
        const blocks = payload.getBytes("blocks") orelse return error.InvalidFrame;

        // blocks size check — lexicon maxLength: 2000000
        if (blocks.len > 2_000_000) return error.InvalidFrame;

        // build public key for verification
        const public_key = zat.multicodec.PublicKey{
            .key_type = cached_key.key_type,
            .raw = cached_key.raw[0..cached_key.len],
        };

        // run real signature verification (needs its own arena for CAR/MST temporaries)
        var arena = std.heap.ArenaAllocator.init(self.allocator);
        defer arena.deinit();
        const alloc = arena.allocator();

        // try sync 1.1 path: extract ops and use verifyCommitDiff
        if (self.config.verify_commit_diff) {
            if (self.extractOps(alloc, payload)) |msg_ops| {
                // get stored prev_data from payload
                const prev_data: ?[]const u8 = if (payload.get("prevData")) |pd| switch (pd) {
                    .cid => |c| c.raw,
                    .null => null,
                    else => null,
                } else null;

                const diff_result = zat.verifyCommitDiff(alloc, blocks, msg_ops, prev_data, public_key, .{
                    .expected_did = expected_did,
                    // without a prevData anchor there is nothing to invert against
                    .skip_inversion = prev_data == null,
                }) catch |err| {
                    return err;
                };

                return .{
                    .valid = true,
                    .skipped = false,
                    .data_cid = diff_result.data_cid,
                    .commit_rev = diff_result.commit_rev,
                };
            }
        }

        // fallback: legacy verification (signature + optional MST walk)
        const result = zat.verifyCommitCar(alloc, blocks, public_key, .{
            .verify_mst = self.config.verify_mst,
            .expected_did = expected_did,
        }) catch |err| {
            return err;
        };

        return .{
            .valid = true,
            .skipped = false,
            // NOTE(review): data_cid filled from result.commit_cid (cf. the
            // diff path above which uses data_cid) — confirm intended.
            .data_cid = result.commit_cid,
            .commit_rev = result.commit_rev,
        };
    }

    /// extract ops from payload and convert to mst.Operation array.
    /// the firehose format uses a single "path" field ("collection/rkey"),
    /// not separate "collection"/"rkey" fields.
    /// returns null when there are no usable ops (or on allocation failure);
    /// returned slice is owned by `alloc` (caller's arena).
    fn extractOps(self: *Validator, alloc: Allocator, payload: zat.cbor.Value) ?[]const zat.MstOperation {
        _ = self;
        const ops_array = payload.getArray("ops") orelse return null;
        var ops: std.ArrayListUnmanaged(zat.MstOperation) = .{};
        for (ops_array) |op| {
            // malformed ops are silently dropped, not treated as fatal
            const action = op.getString("action") orelse continue;
            const path = op.getString("path") orelse continue;

            // validate path contains "/" (collection/rkey)
            if (std.mem.indexOfScalar(u8, path, '/') == null) continue;

            // extract CID values
            const cid_value: ?[]const u8 = if (op.get("cid")) |v| switch (v) {
                .cid => |c| c.raw,
                else => null,
            } else null;

            var value: ?[]const u8 = null;
            var prev: ?[]const u8 = null;

            if (std.mem.eql(u8, action, "create")) {
                value = cid_value;
            } else if (std.mem.eql(u8, action, "update")) {
                value = cid_value;
                prev = if (op.get("prev")) |v| switch (v) {
                    .cid => |c| c.raw,
                    else => null,
                } else null;
            } else if (std.mem.eql(u8, action, "delete")) {
                prev = if (op.get("prev")) |v| switch (v) {
                    .cid => |c| c.raw,
                    else => null,
                } else null;
            } else continue;

            ops.append(alloc, .{
                .path = path,
                .value = value,
                .prev = prev,
            }) catch return null;
        }

        if (ops.items.len == 0) return null;
        return ops.items;
    }

    /// cheap structural validation of a commit frame: DID, TID rev, op count,
    /// and "collection/rkey" path shape. no allocation.
    fn checkCommitStructure(self: *Validator, payload: zat.cbor.Value) !void {
        // check repo field is a valid DID
        const repo = payload.getString("repo") orelse return error.InvalidFrame;
        if (zat.Did.parse(repo) == null) return error.InvalidFrame;

        // check rev is a valid TID
        if (payload.getString("rev")) |rev| {
            if (zat.Tid.parse(rev) == null) return error.InvalidFrame;
        }

        // check ops count
        if (payload.get("ops")) |ops_value| {
            switch (ops_value) {
                .array => |ops| {
                    if (ops.len > self.config.max_ops) return error.InvalidFrame;
                    // validate each op has valid path (collection/rkey)
                    for (ops) |op| {
                        if (op.getString("path")) |path| {
                            if (std.mem.indexOfScalar(u8, path, '/')) |sep| {
                                const collection = path[0..sep];
                                const rkey = path[sep + 1 ..];
                                if (zat.Nsid.parse(collection) == null) return error.InvalidFrame;
                                if (rkey.len > 0) {
                                    if (zat.Rkey.parse(rkey) == null) return error.InvalidFrame;
                                }
                            } else return error.InvalidFrame; // path must contain '/'
                        }
                    }
                },
                else => return error.InvalidFrame,
            }
        }
    }

    /// enqueue a DID for background signing-key resolution.
    /// deduplicated via queued_set and bounded by max_queue_size; dupes the
    /// DID string (queue owns the copy). best-effort: all failures drop
    /// silently so the hot path never blocks or errors.
    fn queueResolve(self: *Validator, did: []const u8) void {
        // check if already cached (race between validate and resolver)
        if (self.cache.contains(did)) return;

        const duped = self.allocator.dupe(u8, did) catch return;

        self.queue_mutex.lock();
        defer self.queue_mutex.unlock();

        // skip if already queued (prevents unbounded queue growth)
        if (self.queued_set.contains(duped)) {
            self.allocator.free(duped);
            return;
        }

        // cap queue size — drop DID without adding to queued_set so it can be re-queued later
        if (self.queue.items.len >= max_queue_size) {
            self.allocator.free(duped);
            return;
        }

        self.queue.append(self.allocator, duped) catch {
            self.allocator.free(duped);
            return;
        };
        self.queued_set.put(self.allocator, duped, {}) catch {};
        self.queue_cond.signal();
    }

    /// background worker: pops DIDs off the queue, resolves their DID doc,
    /// decodes the signing key, and stores it in the cache. also opportunistically
    /// updates the account's PDS host mapping when persistence is configured.
    /// runs until `alive` is cleared by deinit().
    fn resolveLoop(self: *Validator) void {
        var resolver = zat.DidResolver.initWithOptions(self.allocator, .{ .keep_alive = true });
        defer resolver.deinit();

        while (self.alive.load(.acquire)) {
            var did: ?[]const u8 = null;
            {
                self.queue_mutex.lock();
                defer self.queue_mutex.unlock();
                // 1s timeout bounds shutdown latency if a broadcast is missed
                while (self.queue.items.len == 0 and self.alive.load(.acquire)) {
                    self.queue_cond.timedWait(&self.queue_mutex, 1 * std.time.ns_per_s) catch {};
                }
                if (self.queue.items.len > 0) {
                    did = self.queue.orderedRemove(0);
                    _ = self.queued_set.remove(did.?);
                }
            }

            const d = did orelse continue;
            // we now own the duped DID string; freed whichever way the
            // iteration exits (every `continue` below still runs this)
            defer self.allocator.free(d);

            // skip if already cached (resolved while queued)
            if (self.cache.contains(d)) continue;

            // resolve DID → signing key
            const parsed = zat.Did.parse(d) orelse continue;
            var doc = resolver.resolve(parsed) catch |err| {
                log.debug("DID resolve failed for {s}: {s}", .{ d, @errorName(err) });
                continue;
            };
            defer doc.deinit();

            // extract and decode signing key
            const vm = doc.signingKey() orelse continue;
            const key_bytes = zat.multibase.decode(self.allocator, vm.public_key_multibase) catch continue;
            defer self.allocator.free(key_bytes);
            const public_key = zat.multicodec.parsePublicKey(key_bytes) catch continue;

            // store decoded key in cache (fixed-size, no pointer chasing)
            var cached = CachedKey{
                .key_type = public_key.key_type,
                .raw = undefined,
                .len = @intCast(public_key.raw.len),
                .resolve_time = std.time.timestamp(),
            };
            @memcpy(cached.raw[0..public_key.raw.len], public_key.raw);

            self.cache.put(d, cached) catch continue;

            // --- host validation (merged from migration queue) ---
            // while we have the DID doc, check PDS endpoint and update host if needed.
            // best-effort: failures don't prevent signing key caching.
            if (self.persist) |persist| {
                if (doc.pdsEndpoint()) |pds_endpoint| {
                    if (extractHostFromUrl(pds_endpoint)) |pds_host| {
                        const pds_host_id = (persist.getHostIdForHostname(pds_host) catch null) orelse continue;
                        const uid = persist.uidForDid(d) catch continue;
                        const current_host = persist.getAccountHostId(uid) catch continue;
                        // current_host == 0 means "unknown" — leave it alone
                        if (current_host != 0 and current_host != pds_host_id) {
                            persist.setAccountHostId(uid, pds_host_id) catch {};
                            log.info("host updated via DID doc: {s} -> host {d}", .{ d, pds_host_id });
                        }
                    }
                }
            }
        }
    }

    /// evict a DID's cached signing key (e.g. on #identity event).
    /// the next commit from this DID will trigger a fresh resolution.
    pub fn evictKey(self: *Validator, did: []const u8) void {
        _ = self.cache.remove(did);
    }

    /// cache size (for diagnostics)
    pub fn cacheSize(self: *Validator) u32 {
        return self.cache.count();
    }

    /// resolve queue length (for diagnostics — non-blocking)
    /// returns 0 when the lock is contended rather than waiting.
    pub fn resolveQueueLen(self: *Validator) usize {
        if (!self.queue_mutex.tryLock()) return 0;
        defer self.queue_mutex.unlock();
        return self.queue.items.len;
    }

    /// resolve dedup set size (for diagnostics — non-blocking)
    pub fn resolveQueuedSetCount(self: *Validator) u32 {
        if (!self.queue_mutex.tryLock()) return 0;
        defer self.queue_mutex.unlock();
        return self.queued_set.count();
    }

    /// signing key cache hashmap backing capacity (for memory attribution)
    pub fn cacheMapCapacity(self: *Validator) u32 {
        return self.cache.mapCapacity();
    }

    /// resolver dedup set hashmap backing capacity (for memory attribution — non-blocking)
    pub fn resolveQueuedSetCapacity(self: *Validator) u32 {
        if (!self.queue_mutex.tryLock()) return 0;
        defer self.queue_mutex.unlock();
        return self.queued_set.capacity();
    }

    pub const HostAuthority = enum { accept, migrate, reject };

    /// synchronous host authority check. called on first-seen DIDs (is_new)
    /// and host migrations (host_changed). resolves the DID doc to verify the
    /// PDS endpoint matches the incoming host. retries once on failure to
    /// handle transient network errors.
    ///
    /// NOTE: this DOES block on network I/O — unlike validateCommit/validateSync.
    ///
    /// returns:
    ///   .accept  — should not happen (caller should only call on new/mismatch)
    ///   .migrate — DID doc confirms this host, caller should update host_id
    ///   .reject  — DID doc does not confirm, caller should drop the event
    pub fn resolveHostAuthority(self: *Validator, did: []const u8, incoming_host_id: u64) HostAuthority {
        const persist = self.persist orelse return .migrate; // no DB — can't check

        var resolver = zat.DidResolver.initWithOptions(self.allocator, .{});
        defer resolver.deinit();

        const parsed = zat.Did.parse(did) orelse return .reject;

        // first resolve attempt
        var doc = resolver.resolve(parsed) catch {
            // retry once on network failure
            var doc2 = resolver.resolve(parsed) catch return .reject;
            defer doc2.deinit();
            return self.checkPdsHost(&doc2, persist, incoming_host_id);
        };
        defer doc.deinit();
        return self.checkPdsHost(&doc, persist, incoming_host_id);
    }

    /// compare the DID doc's PDS hostname (mapped to a host id via persist)
    /// against the incoming host id. .migrate on match, .reject otherwise.
    fn checkPdsHost(self: *Validator, doc: *zat.DidDocument, persist: *event_log_mod.DiskPersist, incoming_host_id: u64) HostAuthority {
        _ = self;
        const pds_endpoint = doc.pdsEndpoint() orelse return .reject;
        const pds_host = extractHostFromUrl(pds_endpoint) orelse return .reject;
        const pds_host_id = (persist.getHostIdForHostname(pds_host) catch null) orelse return .reject;
        if (pds_host_id == incoming_host_id) return .migrate;
        return .reject;
    }
};
551
/// extract hostname from a URL like "https://pds.example.com" or "https://pds.example.com:443/path".
/// also handles bracketed IPv6 authorities ("https://[::1]:8080/x" → "[::1]");
/// previously the port strip cut at the first ':' inside the bracket and
/// returned a mangled "[".
/// returns a slice into `url` (no allocation); null when no host remains.
pub fn extractHostFromUrl(url: []const u8) ?[]const u8 {
    // strip scheme
    var rest = url;
    if (std.mem.startsWith(u8, rest, "https://")) {
        rest = rest["https://".len..];
    } else if (std.mem.startsWith(u8, rest, "http://")) {
        rest = rest["http://".len..];
    }
    // strip path (must happen before the port strip so ':' in the path is never seen)
    if (std.mem.indexOfScalar(u8, rest, '/')) |i| {
        rest = rest[0..i];
    }
    // bracketed IPv6 literal: keep "[...]" intact and drop any ":port" after it
    if (rest.len > 0 and rest[0] == '[') {
        if (std.mem.indexOfScalar(u8, rest, ']')) |end| {
            return rest[0 .. end + 1];
        }
        return null; // unterminated bracket — malformed authority
    }
    // strip port
    if (std.mem.indexOfScalar(u8, rest, ':')) |i| {
        rest = rest[0..i];
    }
    if (rest.len == 0) return null;
    return rest;
}
572
/// read the environment variable `key` as a base-10 integer of type T.
/// falls back to `default` when the variable is unset or unparseable.
fn parseEnvInt(comptime T: type, key: []const u8, default: T) T {
    const raw = std.posix.getenv(key) orelse return default;
    const parsed = std.fmt.parseInt(T, raw, 10) catch return default;
    return parsed;
}
577
578// --- tests ---
579
test "validateCommit skips on cache miss" {
    var counters = broadcaster.Stats{};
    var validator = Validator.init(std.testing.allocator, &counters);
    defer validator.deinit();

    // commit frame for a DID with no cached signing key
    const frame: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "seq", .value = .{ .unsigned = 42 } },
        .{ .key = "rev", .value = .{ .text = "3k2abc000000" } },
        .{ .key = "time", .value = .{ .text = "2024-01-15T10:30:00Z" } },
    } };

    const res = validator.validateCommit(frame);
    try std.testing.expect(res.valid and res.skipped);
    try std.testing.expectEqual(@as(u64, 1), counters.cache_misses.load(.acquire));
}
598
test "validateCommit skips when no repo field" {
    var counters = broadcaster.Stats{};
    var validator = Validator.init(std.testing.allocator, &counters);
    defer validator.deinit();

    // no "repo" key → validator cannot identify the DID → frame is skipped
    const frame: zat.cbor.Value = .{ .map = &.{
        .{ .key = "seq", .value = .{ .unsigned = 42 } },
    } };

    const res = validator.validateCommit(frame);
    try std.testing.expect(res.valid and res.skipped);
    try std.testing.expectEqual(@as(u64, 1), counters.skipped.load(.acquire));
}
614
test "checkCommitStructure rejects invalid DID" {
    var counters = broadcaster.Stats{};
    var validator = Validator.init(std.testing.allocator, &counters);
    defer validator.deinit();

    // "repo" that does not parse as a DID
    const frame: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = "not-a-did" } },
    } };

    try std.testing.expectError(error.InvalidFrame, validator.checkCommitStructure(frame));
}
626
test "checkCommitStructure accepts valid commit" {
    var counters = broadcaster.Stats{};
    var validator = Validator.init(std.testing.allocator, &counters);
    defer validator.deinit();

    // well-formed DID + TID rev, no ops
    const frame: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "rev", .value = .{ .text = "3k2abcdefghij" } },
    } };

    try validator.checkCommitStructure(frame);
}
639
test "validateSync skips on cache miss" {
    var counters = broadcaster.Stats{};
    var validator = Validator.init(std.testing.allocator, &counters);
    defer validator.deinit();

    // structurally valid #sync frame, but no key cached for the DID
    const frame: zat.cbor.Value = .{ .map = &.{
        .{ .key = "did", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "seq", .value = .{ .unsigned = 42 } },
        .{ .key = "rev", .value = .{ .text = "3k2abcdefghij" } },
        .{ .key = "blocks", .value = .{ .bytes = "deadbeef" } },
    } };

    const res = validator.validateSync(frame);
    try std.testing.expect(res.valid and res.skipped);
    try std.testing.expectEqual(@as(u64, 1), counters.cache_misses.load(.acquire));
}
657
test "validateSync rejects invalid DID" {
    var counters = broadcaster.Stats{};
    var validator = Validator.init(std.testing.allocator, &counters);
    defer validator.deinit();

    // unparseable "did" is a hard failure, not a skip
    const frame: zat.cbor.Value = .{ .map = &.{
        .{ .key = "did", .value = .{ .text = "not-a-did" } },
        .{ .key = "blocks", .value = .{ .bytes = "deadbeef" } },
    } };

    const res = validator.validateSync(frame);
    try std.testing.expect(!res.valid and !res.skipped);
    try std.testing.expectEqual(@as(u64, 1), counters.failed.load(.acquire));
}
673
test "validateSync rejects missing blocks" {
    var counters = broadcaster.Stats{};
    var validator = Validator.init(std.testing.allocator, &counters);
    defer validator.deinit();

    // #sync requires a "blocks" CAR — absence is a hard failure
    const frame: zat.cbor.Value = .{ .map = &.{
        .{ .key = "did", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "rev", .value = .{ .text = "3k2abcdefghij" } },
    } };

    const res = validator.validateSync(frame);
    try std.testing.expect(!res.valid and !res.skipped);
    try std.testing.expectEqual(@as(u64, 1), counters.failed.load(.acquire));
}
689
test "validateSync skips when no did field" {
    var counters = broadcaster.Stats{};
    var validator = Validator.init(std.testing.allocator, &counters);
    defer validator.deinit();

    // missing "did" → frame passes through as skipped
    const frame: zat.cbor.Value = .{ .map = &.{
        .{ .key = "seq", .value = .{ .unsigned = 42 } },
    } };

    const res = validator.validateSync(frame);
    try std.testing.expect(res.valid and res.skipped);
    try std.testing.expectEqual(@as(u64, 1), counters.skipped.load(.acquire));
}
704
test "LRU cache evicts least recently used" {
    var counters = broadcaster.Stats{};
    var validator = Validator.init(std.testing.allocator, &counters);
    validator.cache.capacity = 3;
    defer validator.deinit();

    const stub = CachedKey{ .key_type = .p256, .raw = .{0} ** 33, .len = 33 };

    try validator.cache.put("did:plc:aaa", stub);
    try validator.cache.put("did:plc:bbb", stub);
    try validator.cache.put("did:plc:ccc", stub);

    // touch "aaa" so that "bbb" becomes the least recently used entry
    _ = validator.cache.get("did:plc:aaa");

    // fourth insert into a capacity-3 cache must evict "bbb"
    try validator.cache.put("did:plc:ddd", stub);

    try std.testing.expect(validator.cache.get("did:plc:bbb") == null);
    try std.testing.expect(validator.cache.get("did:plc:aaa") != null);
    try std.testing.expect(validator.cache.get("did:plc:ccc") != null);
    try std.testing.expect(validator.cache.get("did:plc:ddd") != null);
    try std.testing.expectEqual(@as(u32, 3), validator.cache.count());
}
729
test "checkCommitStructure rejects too many ops" {
    var counters = broadcaster.Stats{};
    var validator = Validator.initWithConfig(std.testing.allocator, &counters, .{ .max_ops = 2 });
    defer validator.deinit();

    // three identical ops against a configured limit of two
    const one_op: zat.cbor.Value = .{ .map = &.{.{ .key = "action", .value = .{ .text = "create" } }} };
    const ops = [_]zat.cbor.Value{ one_op, one_op, one_op };

    const frame: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "ops", .value = .{ .array = &ops } },
    } };

    try std.testing.expectError(error.InvalidFrame, validator.checkCommitStructure(frame));
}
749
750// --- spec conformance tests ---
751
test "spec: #commit blocks > 2,000,000 bytes rejected" {
    // lexicon maxLength for #commit blocks: 2,000,000
    var counters = broadcaster.Stats{};
    var validator = Validator.init(std.testing.allocator, &counters);
    defer validator.deinit();

    // seed a fake cached key so validateCommit reaches the size check
    // instead of skipping on cache miss
    const did = "did:plc:test123";
    try validator.cache.put(did, .{
        .key_type = .p256,
        .raw = .{0} ** 33,
        .len = 33,
        .resolve_time = 100,
    });

    // one byte over the lexicon limit
    const oversized = try std.testing.allocator.alloc(u8, 2_000_001);
    defer std.testing.allocator.free(oversized);
    @memset(oversized, 0);

    const frame: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = did } },
        .{ .key = "rev", .value = .{ .text = "3k2abcdefghij" } },
        .{ .key = "blocks", .value = .{ .bytes = oversized } },
    } };

    const res = validator.validateCommit(frame);
    try std.testing.expect(!res.valid or res.skipped);
}
781
test "spec: #commit blocks = 2,000,000 bytes accepted (boundary)" {
    // lexicon maxLength for #commit blocks: 2,000,000 — exactly at limit should pass size check
    var counters = broadcaster.Stats{};
    var validator = Validator.init(std.testing.allocator, &counters);
    defer validator.deinit();

    // seed a fake cached key so the size check is actually reached
    const did = "did:plc:test123";
    try validator.cache.put(did, .{
        .key_type = .p256,
        .raw = .{0} ** 33,
        .len = 33,
        .resolve_time = 100,
    });

    // exactly at the limit — must not be rejected for size; signature
    // verification may still fail on the zeroed CAR, which is fine
    const boundary = try std.testing.allocator.alloc(u8, 2_000_000);
    defer std.testing.allocator.free(boundary);
    @memset(boundary, 0);

    const frame: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = did } },
        .{ .key = "rev", .value = .{ .text = "3k2abcdefghij" } },
        .{ .key = "blocks", .value = .{ .bytes = boundary } },
    } };

    const res = validator.validateCommit(frame);
    // sig failure → skipped=true under the evict-and-requeue policy, so
    // either outcome here proves the size gate passed
    try std.testing.expect(res.valid or res.skipped);
}
812
test "spec: #sync blocks > 10,000 bytes rejected" {
    // lexicon maxLength for #sync blocks: 10,000
    var counters = broadcaster.Stats{};
    var validator = Validator.init(std.testing.allocator, &counters);
    defer validator.deinit();

    // one byte over the limit
    const frame: zat.cbor.Value = .{ .map = &.{
        .{ .key = "did", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "rev", .value = .{ .text = "3k2abcdefghij" } },
        .{ .key = "blocks", .value = .{ .bytes = &([_]u8{0} ** 10_001) } },
    } };

    const res = validator.validateSync(frame);
    try std.testing.expect(!res.valid and !res.skipped);
}
829
test "spec: #sync blocks = 10,000 bytes accepted (boundary)" {
    // lexicon maxLength for #sync blocks: 10,000 — exactly at limit should pass size check
    var counters = broadcaster.Stats{};
    var validator = Validator.init(std.testing.allocator, &counters);
    defer validator.deinit();

    const frame: zat.cbor.Value = .{ .map = &.{
        .{ .key = "did", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "rev", .value = .{ .text = "3k2abcdefghij" } },
        .{ .key = "blocks", .value = .{ .bytes = &([_]u8{0} ** 10_000) } },
    } };

    const res = validator.validateSync(frame);
    // size gate passes; no cached key for the DID → skipped on cache miss
    try std.testing.expect(res.valid and res.skipped);
}
847
test "extractOps reads path field from firehose format" {
    var counters = broadcaster.Stats{};
    var validator = Validator.initWithConfig(std.testing.allocator, &counters, .{ .verify_commit_diff = true });
    defer validator.deinit();

    // arena: extractOps builds its result list on the allocator we pass in
    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();

    const ops = [_]zat.cbor.Value{
        .{ .map = &.{
            .{ .key = "action", .value = .{ .text = "create" } },
            .{ .key = "path", .value = .{ .text = "app.bsky.feed.post/3k2abc000000" } },
            .{ .key = "cid", .value = .{ .cid = .{ .raw = "fakecid12345" } } },
        } },
        .{ .map = &.{
            .{ .key = "action", .value = .{ .text = "delete" } },
            .{ .key = "path", .value = .{ .text = "app.bsky.feed.like/3k2def000000" } },
        } },
    };

    const frame: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "ops", .value = .{ .array = &ops } },
    } };

    const extracted = validator.extractOps(arena.allocator(), frame);
    try std.testing.expect(extracted != null);
    try std.testing.expectEqual(@as(usize, 2), extracted.?.len);
    try std.testing.expectEqualStrings("app.bsky.feed.post/3k2abc000000", extracted.?[0].path);
    try std.testing.expectEqualStrings("app.bsky.feed.like/3k2def000000", extracted.?[1].path);
    try std.testing.expect(extracted.?[0].value != null); // create carries a cid
    try std.testing.expect(extracted.?[1].value == null); // delete does not
}
882
test "extractOps rejects malformed path without slash" {
    var counters = broadcaster.Stats{};
    var validator = Validator.initWithConfig(std.testing.allocator, &counters, .{ .verify_commit_diff = true });
    defer validator.deinit();

    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();

    // single op whose path has no '/' separator
    const ops = [_]zat.cbor.Value{
        .{ .map = &.{
            .{ .key = "action", .value = .{ .text = "create" } },
            .{ .key = "path", .value = .{ .text = "noslash" } },
            .{ .key = "cid", .value = .{ .cid = .{ .raw = "fakecid12345" } } },
        } },
    };

    const frame: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "ops", .value = .{ .array = &ops } },
    } };

    // every op is dropped (no slash) → extractOps yields null
    try std.testing.expect(validator.extractOps(arena.allocator(), frame) == null);
}
908
test "checkCommitStructure validates path field" {
    var counters = broadcaster.Stats{};
    var validator = Validator.init(std.testing.allocator, &counters);
    defer validator.deinit();

    // well-formed "collection/rkey" path passes
    const good_ops = [_]zat.cbor.Value{
        .{ .map = &.{
            .{ .key = "action", .value = .{ .text = "create" } },
            .{ .key = "path", .value = .{ .text = "app.bsky.feed.post/3k2abc000000" } },
        } },
    };
    const good_frame: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "ops", .value = .{ .array = &good_ops } },
    } };
    try validator.checkCommitStructure(good_frame);

    // collection segment that is not a valid NSID is rejected
    const bad_ops = [_]zat.cbor.Value{
        .{ .map = &.{
            .{ .key = "action", .value = .{ .text = "create" } },
            .{ .key = "path", .value = .{ .text = "not-an-nsid/3k2abc000000" } },
        } },
    };
    const bad_frame: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "ops", .value = .{ .array = &bad_ops } },
    } };
    try std.testing.expectError(error.InvalidFrame, validator.checkCommitStructure(bad_frame));
}
944
945test "queueResolve deduplicates repeated DIDs" {
946 var stats = broadcaster.Stats{};
947 var v = Validator.init(std.testing.allocator, &stats);
948 defer v.deinit();
949
950 // queue the same DID 100 times
951 for (0..100) |_| {
952 v.queueResolve("did:plc:duplicate");
953 }
954
955 // should have exactly 1 entry, not 100
956 try std.testing.expectEqual(@as(usize, 1), v.queue.items.len);
957 try std.testing.expectEqual(@as(u32, 1), v.queued_set.count());
958}