atproto relay implementation in zig zlay.waow.tech
at main 256 lines 8.7 kB view raw
//! fixed-size ring buffer for firehose frames
//!
//! used for per-consumer send buffers (non-blocking broadcast)
//! and global frame history (cursor replay).

const std = @import("std");
const Allocator = std.mem.Allocator;

/// a single sequenced frame stored in (or handed out by) a ring buffer.
pub const Frame = struct {
    /// sequence number supplied by the producer on push.
    seq: u64,
    /// frame payload. owned by the ring buffer while stored; ownership moves
    /// to the caller when the frame is returned from `pop` / `framesSince`.
    data: []const u8,

    /// placeholder value for unused slots (seq 0, zero-length data).
    pub const empty: Frame = .{ .seq = 0, .data = &.{} };
};

/// thread-safe fixed-size ring buffer of frames.
/// push overwrites oldest entries when full.
/// data is duped into the buffer and freed on overwrite/deinit.
///
/// `capacity` is the maximum number of frames held at once and must be > 0.
pub fn RingBuffer(comptime capacity: usize) type {
    // every index computation below is `% capacity`; reject zero up front
    // with a readable message instead of a division-by-zero error later.
    if (capacity == 0) @compileError("RingBuffer capacity must be greater than zero");

    return struct {
        entries: [capacity]Frame = [_]Frame{Frame.empty} ** capacity,
        write_pos: usize = 0, // next write position
        read_pos: usize = 0, // next read position (for pop) == oldest live entry
        len: usize = 0, // number of live entries
        allocator: Allocator,
        mutex: std.Thread.Mutex = .{},

        const Self = @This();

        /// create an empty buffer. `allocator` is used to dupe pushed data
        /// and to free it on overwrite/deinit.
        pub fn init(allocator: Allocator) Self {
            return .{ .allocator = allocator };
        }

        /// free the data of every live entry and invalidate the buffer.
        /// does not take the mutex; the caller must make sure no other
        /// thread is still using the buffer.
        pub fn deinit(self: *Self) void {
            var i: usize = 0;
            while (i < self.len) : (i += 1) {
                const entry = self.entries[(self.read_pos + i) % capacity];
                // zero-length payloads are skipped here, same as on overwrite
                if (entry.data.len > 0) self.allocator.free(entry.data);
            }
            self.* = undefined;
        }

        /// push a frame. if full, overwrites oldest. returns false if alloc failed.
        /// `data` is copied; the caller keeps ownership of its own slice.
        pub fn push(self: *Self, seq: u64, data: []const u8) bool {
            self.mutex.lock();
            defer self.mutex.unlock();
            return self.pushUnlocked(seq, data);
        }

        /// push implementation; the caller must hold `mutex`.
        fn pushUnlocked(self: *Self, seq: u64, data: []const u8) bool {
            // dupe first so that a failed allocation leaves the buffer untouched
            const duped = self.allocator.dupe(u8, data) catch return false;

            if (self.len == capacity) {
                // full: write_pos == read_pos, so this slot holds the oldest frame
                const old = self.entries[self.write_pos];
                if (old.data.len > 0) self.allocator.free(old.data);
                // advance read_pos since we're overwriting the oldest
                self.read_pos = (self.read_pos + 1) % capacity;
            } else {
                self.len += 1;
            }

            self.entries[self.write_pos] = .{ .seq = seq, .data = duped };
            self.write_pos = (self.write_pos + 1) % capacity;
            return true;
        }

        /// pop the oldest frame, or null if the buffer is empty.
        /// caller owns the returned data and must free it with the
        /// allocator that was passed to `init`.
        pub fn pop(self: *Self) ?Frame {
            self.mutex.lock();
            defer self.mutex.unlock();
            return self.popUnlocked();
        }

        /// pop implementation; the caller must hold `mutex`.
        fn popUnlocked(self: *Self) ?Frame {
            if (self.len == 0) return null;
            const frame = self.entries[self.read_pos];
            // clear the slot so the buffer no longer references data it gave away
            self.entries[self.read_pos] = Frame.empty;
            self.read_pos = (self.read_pos + 1) % capacity;
            self.len -= 1;
            return frame;
        }

        /// number of frames currently buffered (non-blocking — returns 0 if lock is contended)
        pub fn count(self: *Self) usize {
            if (!self.mutex.tryLock()) return 0;
            defer self.mutex.unlock();
            return self.len;
        }

        /// check if buffer is full
        pub fn isFull(self: *Self) bool {
            self.mutex.lock();
            defer self.mutex.unlock();
            return self.len == capacity;
        }

        /// get all frames with seq > cursor, in buffer order (oldest first),
        /// which is seq order as long as frames were pushed with increasing seq.
        /// caller owns the returned slice AND frame data; both are allocated
        /// with `allocator` (not the buffer's own allocator).
        /// on error nothing is leaked: every copy made so far is freed.
        pub fn framesSince(self: *Self, allocator: Allocator, cursor: u64) ![]const Frame {
            self.mutex.lock();
            defer self.mutex.unlock();

            var result: std.ArrayList(Frame) = .empty;
            errdefer {
                for (result.items) |f| allocator.free(f.data);
                result.deinit(allocator);
            }

            var i: usize = 0;
            while (i < self.len) : (i += 1) {
                const entry = self.entries[(self.read_pos + i) % capacity];
                if (entry.seq <= cursor) continue;

                const duped = try allocator.dupe(u8, entry.data);
                // until append succeeds `duped` is not reachable through
                // `result.items`, so the errdefer above would miss it
                errdefer allocator.free(duped);
                try result.append(allocator, .{ .seq = entry.seq, .data = duped });
            }
            return try result.toOwnedSlice(allocator);
        }

        /// oldest seq in the buffer, or null if empty
        pub fn oldestSeq(self: *Self) ?u64 {
            self.mutex.lock();
            defer self.mutex.unlock();
            if (self.len == 0) return null;
            return self.entries[self.read_pos].seq;
        }

        /// newest seq in the buffer, or null if empty
        pub fn newestSeq(self: *Self) ?u64 {
            self.mutex.lock();
            defer self.mutex.unlock();
            if (self.len == 0) return null;
            // write_pos points one past the newest entry; step back with wrap
            const newest_idx = if (self.write_pos == 0) capacity - 1 else self.write_pos - 1;
            return self.entries[newest_idx].seq;
        }
    };
}

// === tests ===

test "push and pop" {
    var buf = RingBuffer(4).init(std.testing.allocator);
    defer buf.deinit();

    try std.testing.expect(buf.push(1, "hello"));
    try std.testing.expect(buf.push(2, "world"));
    try std.testing.expectEqual(@as(usize, 2), buf.count());

    const f1 = buf.pop().?;
    defer std.testing.allocator.free(f1.data);
    try std.testing.expectEqual(@as(u64, 1), f1.seq);
    try std.testing.expectEqualStrings("hello", f1.data);

    const f2 = buf.pop().?;
    defer std.testing.allocator.free(f2.data);
    try std.testing.expectEqual(@as(u64, 2), f2.seq);

    try std.testing.expect(buf.pop() == null);
}

test "overwrite when full" {
    var buf = RingBuffer(3).init(std.testing.allocator);
    defer buf.deinit();

    try std.testing.expect(buf.push(1, "a"));
    try std.testing.expect(buf.push(2, "b"));
    try std.testing.expect(buf.push(3, "c"));
    try std.testing.expectEqual(@as(usize, 3), buf.count());

    // push overwrites oldest (seq=1)
    try std.testing.expect(buf.push(4, "d"));
    try std.testing.expectEqual(@as(usize, 3), buf.count());

    const f1 = buf.pop().?;
    defer std.testing.allocator.free(f1.data);
    try std.testing.expectEqual(@as(u64, 2), f1.seq);
}

test "framesSince" {
    var buf = RingBuffer(8).init(std.testing.allocator);
    defer buf.deinit();

    for (1..6) |i| {
        try std.testing.expect(buf.push(@intCast(i), "data"));
    }

    const frames = try buf.framesSince(std.testing.allocator, 3);
    defer {
        for (frames) |f| std.testing.allocator.free(f.data);
        std.testing.allocator.free(frames);
    }

    try std.testing.expectEqual(@as(usize, 2), frames.len);
    try std.testing.expectEqual(@as(u64, 4), frames[0].seq);
    try std.testing.expectEqual(@as(u64, 5), frames[1].seq);
}

test "framesSince does not leak when append fails" {
    var buf = RingBuffer(4).init(std.testing.allocator);
    defer buf.deinit();

    try std.testing.expect(buf.push(1, "hello"));

    // allocation 0 is the payload dupe, allocation 1 is the list growth
    var failing = std.testing.FailingAllocator.init(std.testing.allocator, .{ .fail_index = 1 });
    try std.testing.expectError(error.OutOfMemory, buf.framesSince(failing.allocator(), 0));
    try std.testing.expectEqual(failing.allocated_bytes, failing.freed_bytes);
}

test "oldestSeq and newestSeq" {
    var buf = RingBuffer(4).init(std.testing.allocator);
    defer buf.deinit();

    try std.testing.expect(buf.oldestSeq() == null);
    try std.testing.expect(buf.newestSeq() == null);

    try std.testing.expect(buf.push(10, "x"));
    try std.testing.expect(buf.push(20, "y"));
    try std.testing.expect(buf.push(30, "z"));

    try std.testing.expectEqual(@as(u64, 10), buf.oldestSeq().?);
    try std.testing.expectEqual(@as(u64, 30), buf.newestSeq().?);
}

test "empty buffer operations" {
    var buf = RingBuffer(4).init(std.testing.allocator);
    defer buf.deinit();

    try std.testing.expectEqual(@as(usize, 0), buf.count());
    try std.testing.expect(!buf.isFull());
    try std.testing.expect(buf.pop() == null);

    const frames = try buf.framesSince(std.testing.allocator, 0);
    defer std.testing.allocator.free(frames);
    try std.testing.expectEqual(@as(usize, 0), frames.len);
}

test "wrap-around with pop and push" {
    var buf = RingBuffer(3).init(std.testing.allocator);
    defer buf.deinit();

    // fill
    try std.testing.expect(buf.push(1, "a"));
    try std.testing.expect(buf.push(2, "b"));
    try std.testing.expect(buf.push(3, "c"));

    // pop two
    const f1 = buf.pop().?;
    std.testing.allocator.free(f1.data);
    const f2 = buf.pop().?;
    std.testing.allocator.free(f2.data);

    // push two more (wraps around)
    try std.testing.expect(buf.push(4, "d"));
    try std.testing.expect(buf.push(5, "e"));

    try std.testing.expectEqual(@as(usize, 3), buf.count());
    try std.testing.expectEqual(@as(u64, 3), buf.oldestSeq().?);
    try std.testing.expectEqual(@as(u64, 5), buf.newestSeq().?);
}