# A Python port of the Invisible Internet Project (I2P)
1"""Tests for NTCP2 real data-phase transport with SipHash-obfuscated frame lengths."""
2
3import asyncio
4import struct
5import time
6
7import pytest
8
9from i2p_crypto.noise import CipherState
10from i2p_crypto.siphash import SipHashRatchet
11from i2p_transport.ntcp2_blocks import (
12 BLOCK_DATETIME,
13 BLOCK_I2NP,
14 BLOCK_PADDING,
15 BLOCK_TERMINATION,
16 NTCP2Block,
17 decode_blocks,
18 encode_blocks,
19)
20from i2p_transport.ntcp2_real_connection import NTCP2RealConnection
21
22
23# ---------------------------------------------------------------------------
24# Mock reader / writer
25# ---------------------------------------------------------------------------
26
class MockWriter:
    """In-memory substitute for an asyncio stream writer.

    Every chunk handed to :meth:`write` is appended to ``data`` so a test can
    inspect the exact bytes that would have gone on the wire.  Closing only
    flips an internal flag; no real resource is involved.
    """

    def __init__(self) -> None:
        self._closed: bool = False
        self.data: bytearray = bytearray()

    def write(self, data) -> None:
        """Append *data* to the captured output buffer."""
        self.data.extend(data)

    async def drain(self) -> None:
        """Do nothing; present so the mock can be awaited like a real writer."""
        return None

    def close(self) -> None:
        """Mark the writer as closed."""
        self._closed = True

    async def wait_closed(self) -> None:
        """Do nothing; there is no underlying transport to wait for."""
        return None

    def is_closing(self) -> bool:
        """Return ``True`` once :meth:`close` has been called."""
        return self._closed
46
47
class MockReader:
    """In-memory substitute for an asyncio stream reader.

    Bytes are held in an internal buffer together with a read cursor.  More
    bytes can be appended at any time with :meth:`feed`.
    """

    def __init__(self, data: bytes = b""):
        self._data = bytearray(data)
        self._pos = 0  # index of the next unread byte in ``_data``

    def feed(self, data: bytes) -> None:
        """Append *data* to the end of the readable buffer."""
        self._data.extend(data)

    async def readexactly(self, n: int) -> bytes:
        """Return exactly *n* unread bytes and advance the cursor.

        Args:
            n: Number of bytes to read; must not be negative.

        Returns:
            The next *n* bytes of the buffer.

        Raises:
            ValueError: If *n* is negative (same contract as
                ``asyncio.StreamReader.readexactly``).
            asyncio.IncompleteReadError: If fewer than *n* unread bytes are
                buffered.  ``partial`` carries the unread remainder and the
                cursor is left where it was.
        """
        # A negative size would otherwise yield a bogus slice and move the
        # cursor backwards, silently re-reading earlier data.
        if n < 0:
            raise ValueError("readexactly size can not be less than zero")

        end = self._pos + n
        if end > len(self._data):
            raise asyncio.IncompleteReadError(bytes(self._data[self._pos:]), n)

        chunk = bytes(self._data[self._pos:end])
        self._pos = end
        return chunk
64
65
66# ---------------------------------------------------------------------------
67# Helpers to create matched cipher + siphash pairs
68# ---------------------------------------------------------------------------
69
def _make_cipher_pair():
    """Build two ``CipherState`` objects from one shared key.

    Returns:
        A ``(cipher_send, cipher_recv)`` tuple; both are constructed from the
        same 32-byte key consisting of ``0x01`` bytes.
    """
    shared_key = bytes([0x01]) * 32
    cipher_send = CipherState(shared_key)
    cipher_recv = CipherState(shared_key)
    return cipher_send, cipher_recv
74
75
def _make_siphash_pair():
    """Build two ``SipHashRatchet`` objects from the same parameters.

    Returns:
        A ``(siphash_send, siphash_recv)`` tuple; both are constructed with
        identical ``k0``, ``k1`` and ``iv`` arguments.
    """
    k0 = 0x0706050403020100
    k1 = 0x0F0E0D0C0B0A0908
    iv = 0
    siphash_send = SipHashRatchet(k0, k1, iv)
    siphash_recv = SipHashRatchet(k0, k1, iv)
    return siphash_send, siphash_recv
80
81
82# ---------------------------------------------------------------------------
83# Tests
84# ---------------------------------------------------------------------------
85
class TestConstruction:
    """Checks how ``NTCP2RealConnection`` exposes ``remote_hash`` after construction."""

    @staticmethod
    def _leading_args():
        """Return the six leading positional constructor arguments, in order."""
        stream_in, stream_out = MockReader(), MockWriter()
        cipher_send, cipher_recv = _make_cipher_pair()
        mask_send, mask_recv = _make_siphash_pair()
        return (
            stream_in,
            stream_out,
            cipher_send,
            cipher_recv,
            mask_send,
            mask_recv,
        )

    def test_construction_stores_attributes(self):
        """An explicitly supplied remote hash is readable back unchanged."""
        supplied_hash = b"\xaa" * 32
        conn = NTCP2RealConnection(*self._leading_args(), supplied_hash)
        assert conn.remote_hash == supplied_hash

    def test_construction_default_remote_hash(self):
        """Omitting the remote hash leaves ``remote_hash`` as empty bytes."""
        conn = NTCP2RealConnection(*self._leading_args())
        assert conn.remote_hash == b""
100
101
class TestSendRecvRoundtrip:
    """Frames written by one connection are readable by a matching one."""

    def test_single_frame_roundtrip(self):
        """send_frame -> wire bytes -> recv_frame produces same blocks."""

        async def scenario():
            key = b"\x11" * 32
            tx_cipher = CipherState(key)
            rx_cipher = CipherState(key)
            tx_mask, rx_mask = _make_siphash_pair()

            # Sending side: only the send cipher / send ratchet are supplied.
            sink = MockWriter()
            sender = NTCP2RealConnection(
                MockReader(), sink, tx_cipher, None, tx_mask, None
            )
            outgoing = [NTCP2Block(BLOCK_I2NP, b"hello i2np")]
            await sender.send_frame(outgoing)

            wire = bytes(sink.data)
            assert len(wire) > 2

            # Receiving side: replay the captured bytes through a fresh
            # connection that only has the receive cipher / ratchet.
            source = MockReader(wire)
            receiver = NTCP2RealConnection(
                source, MockWriter(), None, rx_cipher, None, rx_mask
            )
            received = await receiver.recv_frame()

            assert len(received) == 1
            first = received[0]
            assert first.block_type == BLOCK_I2NP
            assert first.data == b"hello i2np"

        asyncio.run(scenario())

    def test_frame_format_structure(self):
        """Wire format: [2 bytes obfuscated length][encrypted payload + 16 MAC]."""

        async def scenario():
            tx_cipher = CipherState(b"\x22" * 32)
            tx_mask, _ = _make_siphash_pair()

            sink = MockWriter()
            conn = NTCP2RealConnection(
                MockReader(), sink, tx_cipher, None, tx_mask, None
            )

            outgoing = [NTCP2Block(BLOCK_I2NP, b"x" * 10)]
            await conn.send_frame(outgoing)

            # Everything after the 2-byte length prefix is the encrypted part.
            body = bytes(sink.data)[2:]
            expected_body_len = len(encode_blocks(outgoing)) + 16
            assert len(body) == expected_body_len

        asyncio.run(scenario())
154
155
class TestSipHashObfuscation:
    """The on-wire length prefix must not equal the real frame length."""

    def test_obfuscated_length_differs_from_plain(self):
        """The 2-byte length on the wire should differ from the actual length."""

        async def scenario():
            tx_cipher = CipherState(b"\x33" * 32)
            tx_mask, _ = _make_siphash_pair()

            sink = MockWriter()
            conn = NTCP2RealConnection(
                MockReader(), sink, tx_cipher, None, tx_mask, None
            )
            await conn.send_frame([NTCP2Block(BLOCK_I2NP, b"test")])

            wire = bytes(sink.data)
            # Interpret the first two bytes as a big-endian unsigned short.
            (length_on_wire,) = struct.unpack("!H", wire[:2])
            real_length = len(wire) - 2
            assert length_on_wire != real_length

        asyncio.run(scenario())
176
177
class TestMultiFrameExchange:
    """Two consecutive frames on one connection decode in order."""

    def test_nonce_increments_and_siphash_ratchets(self):
        """Multiple frames: each uses next nonce and next SipHash value."""

        async def scenario():
            key = b"\x44" * 32
            tx_cipher = CipherState(key)
            rx_cipher = CipherState(key)
            tx_mask, rx_mask = _make_siphash_pair()

            sink = MockWriter()
            sender = NTCP2RealConnection(
                MockReader(), sink, tx_cipher, None, tx_mask, None
            )

            await sender.send_frame([NTCP2Block(BLOCK_I2NP, b"frame1")])
            # Remember where the first frame ends so the stream can be split.
            boundary = len(sink.data)
            await sender.send_frame([NTCP2Block(BLOCK_I2NP, b"frame2")])
            stream = bytes(sink.data)

            first_wire = stream[:boundary]
            second_wire = stream[boundary:]

            # Both frames have the same size, so their 2-byte length prefixes
            # can only differ through the obfuscation applied to them.
            assert first_wire[:2] != second_wire[:2]

            source = MockReader(stream)
            receiver = NTCP2RealConnection(
                source, MockWriter(), None, rx_cipher, None, rx_mask
            )
            first_blocks = await receiver.recv_frame()
            second_blocks = await receiver.recv_frame()

            assert first_blocks[0].data == b"frame1"
            assert second_blocks[0].data == b"frame2"

        asyncio.run(scenario())
213
214
class TestSendI2NP:
    """``send_i2np`` output as seen by a receiving connection."""

    def test_send_i2np_includes_datetime_and_padding(self):
        """send_i2np wraps message in I2NP block with DateTime and Padding."""

        async def scenario():
            key = b"\x55" * 32
            tx_cipher = CipherState(key)
            rx_cipher = CipherState(key)
            tx_mask, rx_mask = _make_siphash_pair()

            sink = MockWriter()
            sender = NTCP2RealConnection(
                MockReader(), sink, tx_cipher, None, tx_mask, None
            )
            await sender.send_i2np(b"i2np-payload-data")

            source = MockReader(bytes(sink.data))
            receiver = NTCP2RealConnection(
                source, MockWriter(), None, rx_cipher, None, rx_mask
            )
            received = await receiver.recv_frame()

            def of_type(wanted):
                """Return the received blocks whose type equals *wanted*."""
                return [blk for blk in received if blk.block_type == wanted]

            # All three block kinds must be present in the single frame.
            seen_types = [blk.block_type for blk in received]
            assert BLOCK_DATETIME in seen_types
            assert BLOCK_I2NP in seen_types
            assert BLOCK_PADDING in seen_types

            # Exactly one I2NP block, carrying the payload verbatim.
            i2np_blocks = of_type(BLOCK_I2NP)
            assert len(i2np_blocks) == 1
            assert i2np_blocks[0].data == b"i2np-payload-data"

            # Exactly one DateTime block with a 4-byte body.
            datetime_blocks = of_type(BLOCK_DATETIME)
            assert len(datetime_blocks) == 1
            assert len(datetime_blocks[0].data) == 4

        asyncio.run(scenario())
251
252
class TestBlockBasedPayloads:
    """Frames carrying several blocks keep block order, type and data."""

    def test_multi_block_frame(self):
        """A frame with DateTime + I2NP + Padding blocks roundtrips."""

        async def scenario():
            key = b"\x66" * 32
            tx_cipher = CipherState(key)
            rx_cipher = CipherState(key)
            tx_mask, rx_mask = _make_siphash_pair()

            # (block type, body) for each block, in transmission order.
            expected = [
                (BLOCK_DATETIME, struct.pack("!I", 1000000)),
                (BLOCK_I2NP, b"msg-content"),
                (BLOCK_PADDING, b"\x00" * 16),
            ]
            outgoing = [NTCP2Block(kind, body) for kind, body in expected]

            sink = MockWriter()
            sender = NTCP2RealConnection(
                MockReader(), sink, tx_cipher, None, tx_mask, None
            )
            await sender.send_frame(outgoing)

            source = MockReader(bytes(sink.data))
            receiver = NTCP2RealConnection(
                source, MockWriter(), None, rx_cipher, None, rx_mask
            )
            received = await receiver.recv_frame()

            assert len(received) == 3
            for blk, (kind, body) in zip(received, expected):
                assert blk.block_type == kind
                assert blk.data == body

        asyncio.run(scenario())
290
291
class TestClose:
    """Behaviour of ``NTCP2RealConnection.close``."""

    def test_close_full_roundtrip(self):
        """Full roundtrip: send frames, close, verify termination on recv side."""

        async def scenario():
            key = b"\x88" * 32
            tx_cipher = CipherState(key)
            rx_cipher = CipherState(key)
            tx_mask, rx_mask = _make_siphash_pair()

            sink = MockWriter()
            sender = NTCP2RealConnection(
                MockReader(), sink, tx_cipher, None, tx_mask, None
            )
            await sender.send_frame([NTCP2Block(BLOCK_I2NP, b"msg1")])
            await sender.close()

            source = MockReader(bytes(sink.data))
            receiver = NTCP2RealConnection(
                source, MockWriter(), None, rx_cipher, None, rx_mask
            )

            # First frame on the wire is the ordinary data frame.
            data_frame = await receiver.recv_frame()
            assert data_frame[0].block_type == BLOCK_I2NP
            assert data_frame[0].data == b"msg1"

            # The frame written by close() must hold one termination block.
            closing_frame = await receiver.recv_frame()
            terminations = [
                blk
                for blk in closing_frame
                if blk.block_type == BLOCK_TERMINATION
            ]
            assert len(terminations) == 1

        asyncio.run(scenario())

    def test_close_closes_writer(self):
        """close() should close the underlying writer."""

        async def scenario():
            tx_cipher = CipherState(b"\x99" * 32)
            tx_mask, _ = _make_siphash_pair()

            sink = MockWriter()
            conn = NTCP2RealConnection(
                MockReader(), sink, tx_cipher, None, tx_mask, None
            )
            await conn.close()
            assert sink._closed

        asyncio.run(scenario())
339
340
class TestIsAlive:
    """``is_alive`` before and after the connection is closed."""

    def test_is_alive_initially_true(self):
        """A freshly constructed connection reports itself alive."""
        tx_cipher = CipherState(b"\xaa" * 32)
        tx_mask, _ = _make_siphash_pair()
        sink = MockWriter()
        conn = NTCP2RealConnection(
            MockReader(), sink, tx_cipher, None, tx_mask, None
        )
        assert conn.is_alive()

    def test_is_alive_false_after_close(self):
        """After ``close()`` the connection no longer reports itself alive."""

        async def scenario():
            tx_cipher = CipherState(b"\xbb" * 32)
            tx_mask, _ = _make_siphash_pair()
            sink = MockWriter()
            conn = NTCP2RealConnection(
                MockReader(), sink, tx_cipher, None, tx_mask, None
            )
            await conn.close()
            assert not conn.is_alive()

        asyncio.run(scenario())
360 asyncio.run(_run())