A Python port of the Invisible Internet Project (I2P)
at main 229 lines 8.1 kB view raw
1"""Streaming send/receive with retransmits for I2P streaming protocol. 2 3Provides MessageInputStream (receive-side buffering and ACK generation), 4MessageOutputStream (send-side chunking and retransmit tracking), and 5StreamSession (connection state machine wrapping both streams). 6""" 7 8import random 9import struct 10import time 11 12 13class MessageInputStream: 14 """Receive-side stream buffer with out-of-order reassembly and ACK generation.""" 15 16 def __init__(self): 17 self._buffer: dict[int, bytes] = {} 18 self._next_expected_seq: int = 0 19 self._read_buffer: bytearray = bytearray() 20 self._close_seq: int | None = None 21 22 def receive_packet(self, seq_num: int, data: bytes, is_close: bool = False): 23 """Buffer a received packet by sequence number.""" 24 self._buffer[seq_num] = data 25 if is_close: 26 self._close_seq = seq_num 27 # Advance contiguous data into the read buffer 28 self._flush_contiguous() 29 30 def _flush_contiguous(self): 31 """Move contiguous in-order packets from buffer to read buffer.""" 32 while self._next_expected_seq in self._buffer: 33 self._read_buffer.extend(self._buffer.pop(self._next_expected_seq)) 34 self._next_expected_seq += 1 35 36 def read(self, n: int) -> bytes: 37 """Return up to n bytes of in-order data. Non-blocking.""" 38 result = bytes(self._read_buffer[:n]) 39 del self._read_buffer[:n] 40 return result 41 42 def readable_bytes(self) -> int: 43 """Number of bytes available to read in order.""" 44 return len(self._read_buffer) 45 46 def generate_acks(self) -> tuple[int, list[int]]: 47 """Generate ACK information. 48 49 Returns: 50 (ack_through, nack_list) where ack_through is the highest 51 contiguous sequence number received, and nack_list contains 52 missing sequence numbers in gaps. 53 """ 54 ack_through = self._next_expected_seq - 1 55 56 nacks = [] 57 if self._buffer: 58 max_buffered = max(self._buffer.keys()) 59 for seq in range(self._next_expected_seq, max_buffered): 60 if seq not in self._buffer: 61 nacks.append(seq) 62 63 return ack_through, nacks 64 65 def is_complete(self) -> bool: 66 """True if CLOSE has been received and all data has been consumed.""" 67 if self._close_seq is None: 68 return False 69 # All data through close_seq must have been flushed and read 70 return (self._next_expected_seq > self._close_seq 71 and len(self._read_buffer) == 0) 72 73 74class MessageOutputStream: 75 """Send-side stream with chunking and retransmit tracking.""" 76 77 def __init__(self, window_size: int = 64): 78 self._window_size = window_size 79 self._next_seq: int = 0 80 # Outstanding packets: seq -> (data, send_time_ms) 81 self._outstanding: dict[int, tuple[bytes, int]] = {} 82 83 def write(self, data: bytes, max_packet_size: int = 1024) -> list[tuple[int, bytes]]: 84 """Split data into chunks, assign sequence numbers. 85 86 Returns list of (seq_num, chunk) pairs. Adds them to the 87 outstanding/retransmit queue. 88 """ 89 now_ms = int(time.time() * 1000) 90 packets = [] 91 offset = 0 92 while offset < len(data): 93 chunk = data[offset:offset + max_packet_size] 94 seq = self._next_seq 95 self._next_seq += 1 96 self._outstanding[seq] = (chunk, now_ms) 97 packets.append((seq, chunk)) 98 offset += max_packet_size 99 return packets 100 101 def on_ack(self, ack_through: int, nacks: list[int] | None = None): 102 """Remove acked packets from the retransmit queue. 103 104 All seq <= ack_through AND not in nacks are considered acked. 105 """ 106 if nacks is None: 107 nacks = [] 108 nack_set = set(nacks) 109 to_remove = [ 110 seq for seq in self._outstanding 111 if seq <= ack_through and seq not in nack_set 112 ] 113 for seq in to_remove: 114 del self._outstanding[seq] 115 116 def get_retransmit_packets(self, now_ms: int, rto_ms: int = 1000) -> list[tuple[int, bytes]]: 117 """Return packets sent more than rto_ms ago that haven't been acked.""" 118 result = [] 119 for seq in sorted(self._outstanding): 120 data, send_time = self._outstanding[seq] 121 if now_ms - send_time >= rto_ms: 122 result.append((seq, data)) 123 # Update send time to now to avoid immediate re-retransmit 124 self._outstanding[seq] = (data, now_ms) 125 return result 126 127 def pending_count(self) -> int: 128 """Number of unacked packets.""" 129 return len(self._outstanding) 130 131 132class StreamSession: 133 """Connection state machine wrapping input and output streams. 134 135 States: NEW, SYN_SENT, SYN_RECEIVED, ESTABLISHED, CLOSE_SENT, 136 CLOSE_RECEIVED, CLOSED. 137 """ 138 139 _VALID_STATES = frozenset({ 140 "NEW", "SYN_SENT", "SYN_RECEIVED", "ESTABLISHED", 141 "CLOSE_SENT", "CLOSE_RECEIVED", "CLOSED", 142 }) 143 144 def __init__(self, local_id: int | None = None): 145 if local_id is None: 146 local_id = random.randint(0, 2**32 - 1) 147 self.local_id = local_id 148 self.remote_id: int | None = None 149 self._state = "NEW" 150 self._input = MessageInputStream() 151 self._output = MessageOutputStream() 152 153 @property 154 def state(self) -> str: 155 return self._state 156 157 def _require_state(self, *states: str): 158 if self._state not in states: 159 raise RuntimeError( 160 f"Invalid operation in state {self._state}, " 161 f"expected one of {states}" 162 ) 163 164 def connect(self) -> tuple[int, bytes]: 165 """Create SYN packet, transition to SYN_SENT. 166 167 Returns (seq_num, syn_data) where syn_data is a minimal SYN marker. 168 """ 169 self._require_state("NEW") 170 self._state = "SYN_SENT" 171 seq = 0 172 # SYN data encodes the local stream ID 173 syn_data = struct.pack("!I", self.local_id) 174 return seq, syn_data 175 176 def accept(self, remote_id: int) -> tuple[int, bytes]: 177 """Accept incoming SYN, transition to ESTABLISHED. 178 179 Returns (seq_num, syn_ack_data). 180 """ 181 self._require_state("NEW") 182 self.remote_id = remote_id 183 self._state = "ESTABLISHED" 184 seq = 0 185 syn_ack_data = struct.pack("!II", self.local_id, remote_id) 186 return seq, syn_ack_data 187 188 def receive_syn_ack(self, remote_id: int): 189 """Process SYN-ACK, transition from SYN_SENT to ESTABLISHED.""" 190 self._require_state("SYN_SENT") 191 self.remote_id = remote_id 192 self._state = "ESTABLISHED" 193 194 def send(self, data: bytes) -> list[tuple[int, bytes]]: 195 """Send data. Delegates to the output stream.""" 196 self._require_state("ESTABLISHED") 197 return self._output.write(data) 198 199 def receive(self, seq_num: int, data: bytes, is_close: bool = False): 200 """Receive data packet. Delegates to the input stream.""" 201 self._input.receive_packet(seq_num, data, is_close=is_close) 202 if is_close: 203 if self._state == "CLOSE_SENT": 204 self._state = "CLOSED" 205 else: 206 self._state = "CLOSE_RECEIVED" 207 208 def read(self, n: int) -> bytes: 209 """Read received data. Delegates to the input stream.""" 210 return self._input.read(n) 211 212 def close(self) -> tuple[int, bytes]: 213 """Send CLOSE, transition to CLOSE_SENT. 214 215 Returns (seq_num, close_data). 216 """ 217 self._require_state("ESTABLISHED") 218 self._state = "CLOSE_SENT" 219 close_data = struct.pack("!I", self.local_id) 220 return 0, close_data 221 222 def receive_close(self): 223 """Handle receiving a CLOSE from remote. Transition to CLOSED.""" 224 if self._state == "CLOSE_SENT": 225 self._state = "CLOSED" 226 elif self._state == "ESTABLISHED": 227 self._state = "CLOSE_RECEIVED" 228 else: 229 self._state = "CLOSED"