//! atproto relay implementation in zig
//! zlay.waow.tech
1//! fixed-size ring buffer for firehose frames
2//!
3//! used for per-consumer send buffers (non-blocking broadcast)
4//! and global frame history (cursor replay).
5
6const std = @import("std");
7const Allocator = std.mem.Allocator;
8
/// A single firehose frame: a sequence number plus its serialized payload.
pub const Frame = struct {
    seq: u64,
    data: []const u8, // owned by the ring buffer until popped

    /// Sentinel for unused ring slots: seq 0 with a zero-length payload.
    pub const empty = Frame{ .seq = 0, .data = &.{} };
};
15
16/// thread-safe fixed-size ring buffer of frames.
17/// push overwrites oldest entries when full.
18/// data is duped into the buffer and freed on overwrite/deinit.
/// thread-safe fixed-size ring buffer of frames.
/// push overwrites oldest entries when full.
/// data is duped into the buffer and freed on overwrite/deinit;
/// `pop` and `framesSince` transfer ownership of returned data to the caller.
pub fn RingBuffer(comptime capacity: usize) type {
    // capacity 0 would make every `% capacity` below a remainder-division
    // by zero (and index into a zero-length array) — reject at compile time.
    if (capacity == 0) @compileError("RingBuffer capacity must be at least 1");

    return struct {
        entries: [capacity]Frame = [_]Frame{Frame.empty} ** capacity,
        write_pos: usize = 0, // next write position
        read_pos: usize = 0, // next read position (for pop)
        len: usize = 0, // live entry count, 0..capacity
        allocator: Allocator,
        mutex: std.Thread.Mutex = .{},

        const Self = @This();

        pub fn init(allocator: Allocator) Self {
            return .{ .allocator = allocator };
        }

        /// free all buffered frame data and invalidate the buffer.
        /// not locked: the caller must guarantee exclusive access by now.
        pub fn deinit(self: *Self) void {
            // free all live entries (they sit at read_pos..read_pos+len, wrapping)
            var i: usize = 0;
            while (i < self.len) : (i += 1) {
                const idx = (self.read_pos + i) % capacity;
                const entry = self.entries[idx];
                if (entry.data.len > 0) {
                    self.allocator.free(entry.data);
                }
            }
            self.* = undefined;
        }

        /// push a frame. if full, overwrites oldest. returns false if alloc failed.
        pub fn push(self: *Self, seq: u64, data: []const u8) bool {
            self.mutex.lock();
            defer self.mutex.unlock();
            return self.pushUnlocked(seq, data);
        }

        fn pushUnlocked(self: *Self, seq: u64, data: []const u8) bool {
            // dupe first so a failed allocation leaves the buffer untouched
            const duped = self.allocator.dupe(u8, data) catch return false;

            // free old entry if overwriting
            if (self.len == capacity) {
                const old = self.entries[self.write_pos];
                if (old.data.len > 0) {
                    self.allocator.free(old.data);
                }
                // advance read_pos since we're overwriting the oldest
                self.read_pos = (self.read_pos + 1) % capacity;
            } else {
                self.len += 1;
            }

            self.entries[self.write_pos] = .{ .seq = seq, .data = duped };
            self.write_pos = (self.write_pos + 1) % capacity;
            return true;
        }

        /// pop the oldest frame. caller owns the returned data.
        pub fn pop(self: *Self) ?Frame {
            self.mutex.lock();
            defer self.mutex.unlock();
            return self.popUnlocked();
        }

        fn popUnlocked(self: *Self) ?Frame {
            if (self.len == 0) return null;
            const frame = self.entries[self.read_pos];
            self.entries[self.read_pos] = Frame.empty;
            self.read_pos = (self.read_pos + 1) % capacity;
            self.len -= 1;
            return frame;
        }

        /// number of frames currently buffered (non-blocking — returns 0 if lock is contended)
        pub fn count(self: *Self) usize {
            if (!self.mutex.tryLock()) return 0;
            defer self.mutex.unlock();
            return self.len;
        }

        /// check if buffer is full
        pub fn isFull(self: *Self) bool {
            self.mutex.lock();
            defer self.mutex.unlock();
            return self.len == capacity;
        }

        /// get all frames with seq > cursor, in insertion order (equal to seq
        /// order when pushes use monotonic seqs).
        /// caller owns the returned slice AND frame data.
        pub fn framesSince(self: *Self, allocator: Allocator, cursor: u64) ![]const Frame {
            self.mutex.lock();
            defer self.mutex.unlock();

            var result: std.ArrayList(Frame) = .empty;
            errdefer {
                // release any frames duped before the failure
                for (result.items) |f| allocator.free(f.data);
                result.deinit(allocator);
            }

            var i: usize = 0;
            while (i < self.len) : (i += 1) {
                const idx = (self.read_pos + i) % capacity;
                const entry = self.entries[idx];
                if (entry.seq > cursor) {
                    const duped = try allocator.dupe(u8, entry.data);
                    try result.append(allocator, .{ .seq = entry.seq, .data = duped });
                }
            }
            return try result.toOwnedSlice(allocator);
        }

        /// oldest seq in the buffer, or null if empty
        pub fn oldestSeq(self: *Self) ?u64 {
            self.mutex.lock();
            defer self.mutex.unlock();
            if (self.len == 0) return null;
            return self.entries[self.read_pos].seq;
        }

        /// newest seq in the buffer, or null if empty
        pub fn newestSeq(self: *Self) ?u64 {
            self.mutex.lock();
            defer self.mutex.unlock();
            if (self.len == 0) return null;
            // write_pos points one past the most recent write; step back, wrapping
            const newest_idx = if (self.write_pos == 0) capacity - 1 else self.write_pos - 1;
            return self.entries[newest_idx].seq;
        }
    };
}
146
147// === tests ===
148
test "push and pop" {
    const t = std.testing;
    var buf = RingBuffer(4).init(t.allocator);
    defer buf.deinit();

    try t.expect(buf.push(1, "hello"));
    try t.expect(buf.push(2, "world"));
    try t.expectEqual(@as(usize, 2), buf.count());

    // frames come back oldest-first, with data ownership transferred
    const first = buf.pop() orelse unreachable;
    defer t.allocator.free(first.data);
    try t.expectEqual(@as(u64, 1), first.seq);
    try t.expectEqualStrings("hello", first.data);

    const second = buf.pop() orelse unreachable;
    defer t.allocator.free(second.data);
    try t.expectEqual(@as(u64, 2), second.seq);

    // drained: nothing left to pop
    try t.expect(buf.pop() == null);
}
168
test "overwrite when full" {
    const t = std.testing;
    var buf = RingBuffer(3).init(t.allocator);
    defer buf.deinit();

    try t.expect(buf.push(1, "a"));
    try t.expect(buf.push(2, "b"));
    try t.expect(buf.push(3, "c"));
    try t.expectEqual(@as(usize, 3), buf.count());

    // a fourth push evicts the oldest frame (seq=1); count stays at capacity
    try t.expect(buf.push(4, "d"));
    try t.expectEqual(@as(usize, 3), buf.count());

    const oldest = buf.pop() orelse unreachable;
    defer t.allocator.free(oldest.data);
    try t.expectEqual(@as(u64, 2), oldest.seq);
}
186
test "framesSince" {
    const t = std.testing;
    var buf = RingBuffer(8).init(t.allocator);
    defer buf.deinit();

    var seq: u64 = 1;
    while (seq <= 5) : (seq += 1) {
        try t.expect(buf.push(seq, "data"));
    }

    // cursor 3 means "already have up to seq 3" — expect 4 and 5 back
    const replay = try buf.framesSince(t.allocator, 3);
    defer {
        for (replay) |frame| t.allocator.free(frame.data);
        t.allocator.free(replay);
    }

    try t.expectEqual(@as(usize, 2), replay.len);
    try t.expectEqual(@as(u64, 4), replay[0].seq);
    try t.expectEqual(@as(u64, 5), replay[1].seq);
}
205
test "oldestSeq and newestSeq" {
    const t = std.testing;
    var buf = RingBuffer(4).init(t.allocator);
    defer buf.deinit();

    // empty buffer reports no sequence numbers at either end
    try t.expectEqual(@as(?u64, null), buf.oldestSeq());
    try t.expectEqual(@as(?u64, null), buf.newestSeq());

    const seqs = [_]u64{ 10, 20, 30 };
    const payloads = [_][]const u8{ "x", "y", "z" };
    for (seqs, payloads) |s, p| {
        try t.expect(buf.push(s, p));
    }

    try t.expectEqual(@as(u64, 10), buf.oldestSeq().?);
    try t.expectEqual(@as(u64, 30), buf.newestSeq().?);
}
220
test "empty buffer operations" {
    const t = std.testing;
    var buf = RingBuffer(4).init(t.allocator);
    defer buf.deinit();

    // all queries on a fresh buffer are benign no-ops
    try t.expectEqual(@as(usize, 0), buf.count());
    try t.expect(buf.isFull() == false);
    try t.expect(buf.pop() == null);

    const replay = try buf.framesSince(t.allocator, 0);
    defer t.allocator.free(replay);
    try t.expectEqual(@as(usize, 0), replay.len);
}
233
test "wrap-around with pop and push" {
    const t = std.testing;
    var buf = RingBuffer(3).init(t.allocator);
    defer buf.deinit();

    // fill to capacity
    try t.expect(buf.push(1, "a"));
    try t.expect(buf.push(2, "b"));
    try t.expect(buf.push(3, "c"));

    // drop the two oldest entries
    var dropped: usize = 0;
    while (dropped < 2) : (dropped += 1) {
        const frame = buf.pop() orelse unreachable;
        t.allocator.free(frame.data);
    }

    // refill; the write position wraps past the end of the array
    try t.expect(buf.push(4, "d"));
    try t.expect(buf.push(5, "e"));

    try t.expectEqual(@as(usize, 3), buf.count());
    try t.expectEqual(@as(u64, 3), buf.oldestSeq().?);
    try t.expectEqual(@as(u64, 5), buf.newestSeq().?);
}