zig library for atproto applications
4
fork

Configure Feed

Select the types of activity you want to include in your feed.

jetstream: handle partial message reads

rockorager.dev f99758b2 04691ae8

verified
+32 -4
+28 -4
src/jetstream.zig
··· 25 25 conn: *stda.tls.Client, 26 26 }, 27 27 28 + buffer: std.ArrayListUnmanaged(u8) = .empty, 29 + 28 30 const hosts = [_][]const u8{ 29 31 "jetstream1.us-east.bsky.network", 30 32 "jetstream2.us-east.bsky.network", ··· 138 140 pub fn deinit(self: *Stream) void { 139 141 self.gpa.free(self.query); 140 142 self.gpa.free(self.challenge); 143 + self.buffer.deinit(self.gpa); 141 144 switch (self.state) { 142 145 .dns => {}, 143 146 .connect => {}, ··· 281 284 const n = try result.recv; 282 285 const buf = self.state.conn.read_buf[0..n]; 283 286 try self.decodeFrames(io, buf); 284 - @panic("here"); 285 287 }, 286 288 287 289 .write => { ··· 295 297 } 296 298 297 299 fn decodeFrames(self: *Stream, io: *ourio.Ring, bytes: []const u8) !void { 298 - var iter: FrameIterator = .{ .bytes = bytes }; 300 + try self.buffer.appendSlice(self.gpa, bytes); 301 + var iter: FrameIterator = .{ .bytes = self.buffer.items }; 299 302 while (try iter.next()) |data| { 300 303 switch (data) { 301 304 .text => |s| { ··· 310 313 else => return error.UnsupportedOp, 311 314 } 312 315 } 316 + 317 + self.buffer.replaceRangeAssumeCapacity(0, iter.idx, ""); 313 318 } 314 319 }; 315 320 ··· 356 361 if (byte2.mask) return error.MaskNotSupported; 357 362 358 363 const len: usize, const data_start: usize = switch (byte2.len) { 359 - 126 => .{ std.mem.readInt(u16, self.bytes[self.idx..][0..2], .big), self.idx + 2 }, 360 - 127 => .{ std.mem.readInt(u64, self.bytes[self.idx..][0..8], .big), self.idx + 8 }, 364 + 126 => blk: { 365 + if (self.idx + 2 > self.bytes.len) { 366 + self.idx -= 2; 367 + return null; 368 + } 369 + break :blk .{ 370 + std.mem.readInt(u16, self.bytes[self.idx..][0..2], .big), 371 + self.idx + 2, 372 + }; 373 + }, 374 + 127 => blk: { 375 + if (self.idx + 8 > self.bytes.len) { 376 + self.idx -= 2; 377 + return null; 378 + } 379 + break :blk .{ std.mem.readInt(u64, self.bytes[self.idx..][0..8], .big), self.idx + 8 }; 380 + }, 361 381 else => .{ byte2.len, self.idx }, 362 382 }; 363 383 364 384 const end = data_start + len; 385 + if (data_start >= self.bytes.len or end > self.bytes.len) { 386 + self.idx -|= 2; 387 + return null; 388 + } 365 389 defer self.idx = end; 366 390 367 391 switch (byte1.opcode) {
+4
src/root.zig
··· 1 1 const std = @import("std"); 2 2 const testing = std.testing; 3 3 4 + const jetstream = @import("jetstream.zig"); 5 + 6 + pub const Jetstream = jetstream.Stream; 7 + 4 8 test { 5 9 _ = @import("jetstream.zig"); 6 10 }