//! fixed-size ring buffer for firehose frames //! //! used for per-consumer send buffers (non-blocking broadcast) //! and global frame history (cursor replay). const std = @import("std"); const Allocator = std.mem.Allocator; pub const Frame = struct { seq: u64, data: []const u8, // owned by the ring buffer pub const empty: Frame = .{ .seq = 0, .data = &.{} }; }; /// thread-safe fixed-size ring buffer of frames. /// push overwrites oldest entries when full. /// data is duped into the buffer and freed on overwrite/deinit. pub fn RingBuffer(comptime capacity: usize) type { return struct { entries: [capacity]Frame = [_]Frame{Frame.empty} ** capacity, write_pos: usize = 0, // next write position read_pos: usize = 0, // next read position (for pop) len: usize = 0, allocator: Allocator, mutex: std.Thread.Mutex = .{}, const Self = @This(); pub fn init(allocator: Allocator) Self { return .{ .allocator = allocator }; } pub fn deinit(self: *Self) void { // free all live entries var i: usize = 0; while (i < self.len) : (i += 1) { const idx = (self.read_pos + i) % capacity; const entry = self.entries[idx]; if (entry.data.len > 0) { self.allocator.free(entry.data); } } self.* = undefined; } /// push a frame. if full, overwrites oldest. returns false if alloc failed. pub fn push(self: *Self, seq: u64, data: []const u8) bool { self.mutex.lock(); defer self.mutex.unlock(); return self.pushUnlocked(seq, data); } fn pushUnlocked(self: *Self, seq: u64, data: []const u8) bool { const duped = self.allocator.dupe(u8, data) catch return false; // free old entry if overwriting if (self.len == capacity) { const old = self.entries[self.write_pos]; if (old.data.len > 0) { self.allocator.free(old.data); } // advance read_pos since we're overwriting the oldest self.read_pos = (self.read_pos + 1) % capacity; } else { self.len += 1; } self.entries[self.write_pos] = .{ .seq = seq, .data = duped }; self.write_pos = (self.write_pos + 1) % capacity; return true; } /// pop the oldest frame. caller owns the returned data. pub fn pop(self: *Self) ?Frame { self.mutex.lock(); defer self.mutex.unlock(); return self.popUnlocked(); } fn popUnlocked(self: *Self) ?Frame { if (self.len == 0) return null; const frame = self.entries[self.read_pos]; self.entries[self.read_pos] = Frame.empty; self.read_pos = (self.read_pos + 1) % capacity; self.len -= 1; return frame; } /// number of frames currently buffered (non-blocking — returns 0 if lock is contended) pub fn count(self: *Self) usize { if (!self.mutex.tryLock()) return 0; defer self.mutex.unlock(); return self.len; } /// check if buffer is full pub fn isFull(self: *Self) bool { self.mutex.lock(); defer self.mutex.unlock(); return self.len == capacity; } /// get all frames with seq > cursor, ordered by seq. /// caller owns the returned slice AND frame data. pub fn framesSince(self: *Self, allocator: Allocator, cursor: u64) ![]const Frame { self.mutex.lock(); defer self.mutex.unlock(); var result: std.ArrayList(Frame) = .{}; errdefer { for (result.items) |f| allocator.free(f.data); result.deinit(allocator); } var i: usize = 0; while (i < self.len) : (i += 1) { const idx = (self.read_pos + i) % capacity; const entry = self.entries[idx]; if (entry.seq > cursor) { const duped = try allocator.dupe(u8, entry.data); try result.append(allocator, .{ .seq = entry.seq, .data = duped }); } } return try result.toOwnedSlice(allocator); } /// oldest seq in the buffer, or null if empty pub fn oldestSeq(self: *Self) ?u64 { self.mutex.lock(); defer self.mutex.unlock(); if (self.len == 0) return null; return self.entries[self.read_pos].seq; } /// newest seq in the buffer, or null if empty pub fn newestSeq(self: *Self) ?u64 { self.mutex.lock(); defer self.mutex.unlock(); if (self.len == 0) return null; const newest_idx = if (self.write_pos == 0) capacity - 1 else self.write_pos - 1; return self.entries[newest_idx].seq; } }; } // === tests === test "push and pop" { var buf = RingBuffer(4).init(std.testing.allocator); defer buf.deinit(); try std.testing.expect(buf.push(1, "hello")); try std.testing.expect(buf.push(2, "world")); try std.testing.expectEqual(@as(usize, 2), buf.count()); const f1 = buf.pop().?; defer std.testing.allocator.free(f1.data); try std.testing.expectEqual(@as(u64, 1), f1.seq); try std.testing.expectEqualStrings("hello", f1.data); const f2 = buf.pop().?; defer std.testing.allocator.free(f2.data); try std.testing.expectEqual(@as(u64, 2), f2.seq); try std.testing.expect(buf.pop() == null); } test "overwrite when full" { var buf = RingBuffer(3).init(std.testing.allocator); defer buf.deinit(); try std.testing.expect(buf.push(1, "a")); try std.testing.expect(buf.push(2, "b")); try std.testing.expect(buf.push(3, "c")); try std.testing.expectEqual(@as(usize, 3), buf.count()); // push overwrites oldest (seq=1) try std.testing.expect(buf.push(4, "d")); try std.testing.expectEqual(@as(usize, 3), buf.count()); const f1 = buf.pop().?; defer std.testing.allocator.free(f1.data); try std.testing.expectEqual(@as(u64, 2), f1.seq); } test "framesSince" { var buf = RingBuffer(8).init(std.testing.allocator); defer buf.deinit(); for (1..6) |i| { try std.testing.expect(buf.push(@intCast(i), "data")); } const frames = try buf.framesSince(std.testing.allocator, 3); defer { for (frames) |f| std.testing.allocator.free(f.data); std.testing.allocator.free(frames); } try std.testing.expectEqual(@as(usize, 2), frames.len); try std.testing.expectEqual(@as(u64, 4), frames[0].seq); try std.testing.expectEqual(@as(u64, 5), frames[1].seq); } test "oldestSeq and newestSeq" { var buf = RingBuffer(4).init(std.testing.allocator); defer buf.deinit(); try std.testing.expect(buf.oldestSeq() == null); try std.testing.expect(buf.newestSeq() == null); try std.testing.expect(buf.push(10, "x")); try std.testing.expect(buf.push(20, "y")); try std.testing.expect(buf.push(30, "z")); try std.testing.expectEqual(@as(u64, 10), buf.oldestSeq().?); try std.testing.expectEqual(@as(u64, 30), buf.newestSeq().?); } test "empty buffer operations" { var buf = RingBuffer(4).init(std.testing.allocator); defer buf.deinit(); try std.testing.expectEqual(@as(usize, 0), buf.count()); try std.testing.expect(!buf.isFull()); try std.testing.expect(buf.pop() == null); const frames = try buf.framesSince(std.testing.allocator, 0); defer std.testing.allocator.free(frames); try std.testing.expectEqual(@as(usize, 0), frames.len); } test "wrap-around with pop and push" { var buf = RingBuffer(3).init(std.testing.allocator); defer buf.deinit(); // fill try std.testing.expect(buf.push(1, "a")); try std.testing.expect(buf.push(2, "b")); try std.testing.expect(buf.push(3, "c")); // pop two const f1 = buf.pop().?; std.testing.allocator.free(f1.data); const f2 = buf.pop().?; std.testing.allocator.free(f2.data); // push two more (wraps around) try std.testing.expect(buf.push(4, "d")); try std.testing.expect(buf.push(5, "e")); try std.testing.expectEqual(@as(usize, 3), buf.count()); try std.testing.expectEqual(@as(u64, 3), buf.oldestSeq().?); try std.testing.expectEqual(@as(u64, 5), buf.newestSeq().?); }