A Python port of the Invisible Internet Project (I2P)
1"""Streaming send/receive with retransmits for I2P streaming protocol.
2
3Provides MessageInputStream (receive-side buffering and ACK generation),
4MessageOutputStream (send-side chunking and retransmit tracking), and
5StreamSession (connection state machine wrapping both streams).
6"""
7
8import random
9import struct
10import time
11
12
class MessageInputStream:
    """Receive-side stream buffer with out-of-order reassembly and ACK generation.

    Packets are stored by sequence number. Sequence numbers are expected to
    start at 0, and a packet's payload only becomes readable once every
    lower-numbered packet has arrived.
    """

    def __init__(self):
        # Packets that arrived ahead of a gap: seq -> payload.
        self._buffer: dict[int, bytes] = {}
        # Lowest sequence number that has not been moved to the read buffer.
        self._next_expected_seq: int = 0
        # In-order bytes waiting to be handed out by read().
        self._read_buffer: bytearray = bytearray()
        # Sequence number of the packet flagged as CLOSE, once one is seen.
        self._close_seq: int | None = None

    def receive_packet(self, seq_num: int, data: bytes, is_close: bool = False):
        """Buffer a received packet by sequence number.

        Args:
            seq_num: Sequence number of the packet.
            data: Packet payload.
            is_close: True if this packet carries the CLOSE flag; its
                sequence number is then remembered as the close point.
        """
        if is_close:
            self._close_seq = seq_num
        if seq_num < self._next_expected_seq:
            # Already delivered (e.g. a retransmitted duplicate). Storing it
            # would leave an entry in the buffer that is never flushed.
            return
        self._buffer[seq_num] = data
        # Advance contiguous data into the read buffer.
        self._flush_contiguous()

    def _flush_contiguous(self):
        """Move contiguous in-order packets from buffer to read buffer."""
        while self._next_expected_seq in self._buffer:
            payload = self._buffer.pop(self._next_expected_seq)
            self._read_buffer.extend(payload)
            self._next_expected_seq += 1

    def read(self, n: int) -> bytes:
        """Return up to n bytes of in-order data. Non-blocking.

        A non-positive n returns empty bytes and consumes nothing; without
        this guard a negative n would slice relative to the end of the buffer.
        """
        if n <= 0:
            return b""
        chunk = bytes(self._read_buffer[:n])
        del self._read_buffer[:n]
        return chunk

    def readable_bytes(self) -> int:
        """Number of bytes available to read in order."""
        return len(self._read_buffer)

    def generate_acks(self) -> tuple[int, list[int]]:
        """Generate ACK information.

        Returns:
            (ack_through, nack_list) where ack_through is the highest
            contiguous sequence number received (-1 if none yet), and
            nack_list contains the missing sequence numbers between it and
            the highest buffered packet.
        """
        ack_through = self._next_expected_seq - 1
        if not self._buffer:
            return ack_through, []

        highest_buffered = max(self._buffer)
        # The highest buffered packet itself is present, so the range stops
        # just before it.
        nacks = [
            seq
            for seq in range(self._next_expected_seq, highest_buffered)
            if seq not in self._buffer
        ]
        return ack_through, nacks

    def is_complete(self) -> bool:
        """True if CLOSE has been received and all data has been consumed."""
        if self._close_seq is None:
            return False
        # Everything through the close packet must be flushed and read.
        flushed_through_close = self._next_expected_seq > self._close_seq
        return flushed_through_close and not self._read_buffer
72
73
74class MessageOutputStream:
75 """Send-side stream with chunking and retransmit tracking."""
76
77 def __init__(self, window_size: int = 64):
78 self._window_size = window_size
79 self._next_seq: int = 0
80 # Outstanding packets: seq -> (data, send_time_ms)
81 self._outstanding: dict[int, tuple[bytes, int]] = {}
82
83 def write(self, data: bytes, max_packet_size: int = 1024) -> list[tuple[int, bytes]]:
84 """Split data into chunks, assign sequence numbers.
85
86 Returns list of (seq_num, chunk) pairs. Adds them to the
87 outstanding/retransmit queue.
88 """
89 now_ms = int(time.time() * 1000)
90 packets = []
91 offset = 0
92 while offset < len(data):
93 chunk = data[offset:offset + max_packet_size]
94 seq = self._next_seq
95 self._next_seq += 1
96 self._outstanding[seq] = (chunk, now_ms)
97 packets.append((seq, chunk))
98 offset += max_packet_size
99 return packets
100
101 def on_ack(self, ack_through: int, nacks: list[int] | None = None):
102 """Remove acked packets from the retransmit queue.
103
104 All seq <= ack_through AND not in nacks are considered acked.
105 """
106 if nacks is None:
107 nacks = []
108 nack_set = set(nacks)
109 to_remove = [
110 seq for seq in self._outstanding
111 if seq <= ack_through and seq not in nack_set
112 ]
113 for seq in to_remove:
114 del self._outstanding[seq]
115
116 def get_retransmit_packets(self, now_ms: int, rto_ms: int = 1000) -> list[tuple[int, bytes]]:
117 """Return packets sent more than rto_ms ago that haven't been acked."""
118 result = []
119 for seq in sorted(self._outstanding):
120 data, send_time = self._outstanding[seq]
121 if now_ms - send_time >= rto_ms:
122 result.append((seq, data))
123 # Update send time to now to avoid immediate re-retransmit
124 self._outstanding[seq] = (data, now_ms)
125 return result
126
127 def pending_count(self) -> int:
128 """Number of unacked packets."""
129 return len(self._outstanding)
130
131
132class StreamSession:
133 """Connection state machine wrapping input and output streams.
134
135 States: NEW, SYN_SENT, SYN_RECEIVED, ESTABLISHED, CLOSE_SENT,
136 CLOSE_RECEIVED, CLOSED.
137 """
138
139 _VALID_STATES = frozenset({
140 "NEW", "SYN_SENT", "SYN_RECEIVED", "ESTABLISHED",
141 "CLOSE_SENT", "CLOSE_RECEIVED", "CLOSED",
142 })
143
144 def __init__(self, local_id: int | None = None):
145 if local_id is None:
146 local_id = random.randint(0, 2**32 - 1)
147 self.local_id = local_id
148 self.remote_id: int | None = None
149 self._state = "NEW"
150 self._input = MessageInputStream()
151 self._output = MessageOutputStream()
152
153 @property
154 def state(self) -> str:
155 return self._state
156
157 def _require_state(self, *states: str):
158 if self._state not in states:
159 raise RuntimeError(
160 f"Invalid operation in state {self._state}, "
161 f"expected one of {states}"
162 )
163
164 def connect(self) -> tuple[int, bytes]:
165 """Create SYN packet, transition to SYN_SENT.
166
167 Returns (seq_num, syn_data) where syn_data is a minimal SYN marker.
168 """
169 self._require_state("NEW")
170 self._state = "SYN_SENT"
171 seq = 0
172 # SYN data encodes the local stream ID
173 syn_data = struct.pack("!I", self.local_id)
174 return seq, syn_data
175
176 def accept(self, remote_id: int) -> tuple[int, bytes]:
177 """Accept incoming SYN, transition to ESTABLISHED.
178
179 Returns (seq_num, syn_ack_data).
180 """
181 self._require_state("NEW")
182 self.remote_id = remote_id
183 self._state = "ESTABLISHED"
184 seq = 0
185 syn_ack_data = struct.pack("!II", self.local_id, remote_id)
186 return seq, syn_ack_data
187
188 def receive_syn_ack(self, remote_id: int):
189 """Process SYN-ACK, transition from SYN_SENT to ESTABLISHED."""
190 self._require_state("SYN_SENT")
191 self.remote_id = remote_id
192 self._state = "ESTABLISHED"
193
194 def send(self, data: bytes) -> list[tuple[int, bytes]]:
195 """Send data. Delegates to the output stream."""
196 self._require_state("ESTABLISHED")
197 return self._output.write(data)
198
199 def receive(self, seq_num: int, data: bytes, is_close: bool = False):
200 """Receive data packet. Delegates to the input stream."""
201 self._input.receive_packet(seq_num, data, is_close=is_close)
202 if is_close:
203 if self._state == "CLOSE_SENT":
204 self._state = "CLOSED"
205 else:
206 self._state = "CLOSE_RECEIVED"
207
208 def read(self, n: int) -> bytes:
209 """Read received data. Delegates to the input stream."""
210 return self._input.read(n)
211
212 def close(self) -> tuple[int, bytes]:
213 """Send CLOSE, transition to CLOSE_SENT.
214
215 Returns (seq_num, close_data).
216 """
217 self._require_state("ESTABLISHED")
218 self._state = "CLOSE_SENT"
219 close_data = struct.pack("!I", self.local_id)
220 return 0, close_data
221
222 def receive_close(self):
223 """Handle receiving a CLOSE from remote. Transition to CLOSED."""
224 if self._state == "CLOSE_SENT":
225 self._state = "CLOSED"
226 elif self._state == "ESTABLISHED":
227 self._state = "CLOSE_RECEIVED"
228 else:
229 self._state = "CLOSED"