"""Streaming send/receive with retransmits for I2P streaming protocol. Provides MessageInputStream (receive-side buffering and ACK generation), MessageOutputStream (send-side chunking and retransmit tracking), and StreamSession (connection state machine wrapping both streams). """ import random import struct import time class MessageInputStream: """Receive-side stream buffer with out-of-order reassembly and ACK generation.""" def __init__(self): self._buffer: dict[int, bytes] = {} self._next_expected_seq: int = 0 self._read_buffer: bytearray = bytearray() self._close_seq: int | None = None def receive_packet(self, seq_num: int, data: bytes, is_close: bool = False): """Buffer a received packet by sequence number.""" self._buffer[seq_num] = data if is_close: self._close_seq = seq_num # Advance contiguous data into the read buffer self._flush_contiguous() def _flush_contiguous(self): """Move contiguous in-order packets from buffer to read buffer.""" while self._next_expected_seq in self._buffer: self._read_buffer.extend(self._buffer.pop(self._next_expected_seq)) self._next_expected_seq += 1 def read(self, n: int) -> bytes: """Return up to n bytes of in-order data. Non-blocking.""" result = bytes(self._read_buffer[:n]) del self._read_buffer[:n] return result def readable_bytes(self) -> int: """Number of bytes available to read in order.""" return len(self._read_buffer) def generate_acks(self) -> tuple[int, list[int]]: """Generate ACK information. Returns: (ack_through, nack_list) where ack_through is the highest contiguous sequence number received, and nack_list contains missing sequence numbers in gaps. """ ack_through = self._next_expected_seq - 1 nacks = [] if self._buffer: max_buffered = max(self._buffer.keys()) for seq in range(self._next_expected_seq, max_buffered): if seq not in self._buffer: nacks.append(seq) return ack_through, nacks def is_complete(self) -> bool: """True if CLOSE has been received and all data has been consumed.""" if self._close_seq is None: return False # All data through close_seq must have been flushed and read return (self._next_expected_seq > self._close_seq and len(self._read_buffer) == 0) class MessageOutputStream: """Send-side stream with chunking and retransmit tracking.""" def __init__(self, window_size: int = 64): self._window_size = window_size self._next_seq: int = 0 # Outstanding packets: seq -> (data, send_time_ms) self._outstanding: dict[int, tuple[bytes, int]] = {} def write(self, data: bytes, max_packet_size: int = 1024) -> list[tuple[int, bytes]]: """Split data into chunks, assign sequence numbers. Returns list of (seq_num, chunk) pairs. Adds them to the outstanding/retransmit queue. """ now_ms = int(time.time() * 1000) packets = [] offset = 0 while offset < len(data): chunk = data[offset:offset + max_packet_size] seq = self._next_seq self._next_seq += 1 self._outstanding[seq] = (chunk, now_ms) packets.append((seq, chunk)) offset += max_packet_size return packets def on_ack(self, ack_through: int, nacks: list[int] | None = None): """Remove acked packets from the retransmit queue. All seq <= ack_through AND not in nacks are considered acked. """ if nacks is None: nacks = [] nack_set = set(nacks) to_remove = [ seq for seq in self._outstanding if seq <= ack_through and seq not in nack_set ] for seq in to_remove: del self._outstanding[seq] def get_retransmit_packets(self, now_ms: int, rto_ms: int = 1000) -> list[tuple[int, bytes]]: """Return packets sent more than rto_ms ago that haven't been acked.""" result = [] for seq in sorted(self._outstanding): data, send_time = self._outstanding[seq] if now_ms - send_time >= rto_ms: result.append((seq, data)) # Update send time to now to avoid immediate re-retransmit self._outstanding[seq] = (data, now_ms) return result def pending_count(self) -> int: """Number of unacked packets.""" return len(self._outstanding) class StreamSession: """Connection state machine wrapping input and output streams. States: NEW, SYN_SENT, SYN_RECEIVED, ESTABLISHED, CLOSE_SENT, CLOSE_RECEIVED, CLOSED. """ _VALID_STATES = frozenset({ "NEW", "SYN_SENT", "SYN_RECEIVED", "ESTABLISHED", "CLOSE_SENT", "CLOSE_RECEIVED", "CLOSED", }) def __init__(self, local_id: int | None = None): if local_id is None: local_id = random.randint(0, 2**32 - 1) self.local_id = local_id self.remote_id: int | None = None self._state = "NEW" self._input = MessageInputStream() self._output = MessageOutputStream() @property def state(self) -> str: return self._state def _require_state(self, *states: str): if self._state not in states: raise RuntimeError( f"Invalid operation in state {self._state}, " f"expected one of {states}" ) def connect(self) -> tuple[int, bytes]: """Create SYN packet, transition to SYN_SENT. Returns (seq_num, syn_data) where syn_data is a minimal SYN marker. """ self._require_state("NEW") self._state = "SYN_SENT" seq = 0 # SYN data encodes the local stream ID syn_data = struct.pack("!I", self.local_id) return seq, syn_data def accept(self, remote_id: int) -> tuple[int, bytes]: """Accept incoming SYN, transition to ESTABLISHED. Returns (seq_num, syn_ack_data). """ self._require_state("NEW") self.remote_id = remote_id self._state = "ESTABLISHED" seq = 0 syn_ack_data = struct.pack("!II", self.local_id, remote_id) return seq, syn_ack_data def receive_syn_ack(self, remote_id: int): """Process SYN-ACK, transition from SYN_SENT to ESTABLISHED.""" self._require_state("SYN_SENT") self.remote_id = remote_id self._state = "ESTABLISHED" def send(self, data: bytes) -> list[tuple[int, bytes]]: """Send data. Delegates to the output stream.""" self._require_state("ESTABLISHED") return self._output.write(data) def receive(self, seq_num: int, data: bytes, is_close: bool = False): """Receive data packet. Delegates to the input stream.""" self._input.receive_packet(seq_num, data, is_close=is_close) if is_close: if self._state == "CLOSE_SENT": self._state = "CLOSED" else: self._state = "CLOSE_RECEIVED" def read(self, n: int) -> bytes: """Read received data. Delegates to the input stream.""" return self._input.read(n) def close(self) -> tuple[int, bytes]: """Send CLOSE, transition to CLOSE_SENT. Returns (seq_num, close_data). """ self._require_state("ESTABLISHED") self._state = "CLOSE_SENT" close_data = struct.pack("!I", self.local_id) return 0, close_data def receive_close(self): """Handle receiving a CLOSE from remote. Transition to CLOSED.""" if self._state == "CLOSE_SENT": self._state = "CLOSED" elif self._state == "ESTABLISHED": self._state = "CLOSE_RECEIVED" else: self._state = "CLOSED"