A Python port of the Invisible Internet Project (I2P)
"""Tier 4 protocol gap tests: FragmentHandler + FragmentBuilder.

Tests tunnel message fragmentation and reassembly per the I2P spec:
1024-byte tunnel data blocks, multi-fragment messages, and expiry.
"""
6
7import os
8import time
9
10import pytest
11
12from i2p_tunnel.fragment_handler import FragmentHandler, FragmentedMessage
13from i2p_tunnel.fragment_builder import FragmentBuilder
14
15
# -- Constants --
17
class TestConstants:
    """Pin the protocol constants exposed by the builder and the handler."""

    def test_block_size(self):
        """FragmentBuilder.BLOCK_SIZE equals 1024."""
        expected_block_size = 1024
        assert FragmentBuilder.BLOCK_SIZE == expected_block_size

    def test_delivery_types(self):
        """DELIVERY_LOCAL, DELIVERY_TUNNEL and DELIVERY_ROUTER equal 0, 1 and 2."""
        expected_codes = (
            ("DELIVERY_LOCAL", 0),
            ("DELIVERY_TUNNEL", 1),
            ("DELIVERY_ROUTER", 2),
        )
        # Checked in the same order as the constants are listed above.
        for attr_name, code in expected_codes:
            assert getattr(FragmentBuilder, attr_name) == code

    def test_max_defragment_time(self):
        """FragmentHandler.MAX_DEFRAGMENT_TIME_MS equals 60 000."""
        assert FragmentHandler.MAX_DEFRAGMENT_TIME_MS == 60 * 1000
29
30
# -- FragmentBuilder: single-fragment messages --
32
class TestBuildSingleFragment:
    """Messages small enough to be emitted as a single block by FragmentBuilder."""

    def test_small_message_single_block(self):
        """A 200-byte message builds into exactly one 1024-byte block."""
        msg = os.urandom(200)
        blocks = FragmentBuilder.build(msg, delivery_type=0)
        assert len(blocks) == 1
        assert len(blocks[0]) == 1024

    def test_single_fragment_control_byte(self):
        """A 100-byte local-delivery message builds into exactly one block.

        NOTE(review): despite its name, this test does not inspect the
        control byte; it only checks the block count. The block layout is
        presumably ``padding + 0x00 + control + size(2) + payload`` with
        delivery_type=0 and the fragmented flag clear -- TODO confirm against
        FragmentBuilder and add a direct assertion on the control byte.
        """
        msg = os.urandom(100)
        blocks = FragmentBuilder.build(msg, delivery_type=0)
        # No block contents are examined here, so no block is pulled out of
        # the list; an empty result fails the assertion below.
        assert len(blocks) == 1

    def test_local_delivery(self):
        """Local delivery (type 0): the handler returns the original payload once."""
        msg = os.urandom(200)
        blocks = FragmentBuilder.build(msg, delivery_type=0)
        handler = FragmentHandler()
        results = []
        for block in blocks:
            result = handler.receive_block(block)
            # receive_block yields None for blocks that do not complete a message.
            if result is not None:
                results.append(result)
        assert len(results) == 1
        assert results[0].payload == msg

    def test_tunnel_delivery(self):
        """Tunnel delivery (type 1): tunnel_id and router_hash survive the roundtrip."""
        msg = os.urandom(150)
        router_hash = os.urandom(32)
        blocks = FragmentBuilder.build(
            msg,
            delivery_type=1,
            target_tunnel_id=0xDEADBEEF,
            target_router_hash=router_hash,
        )
        handler = FragmentHandler()
        result = handler.receive_block(blocks[0])
        assert result is not None
        assert result.payload == msg
        assert result.delivery_type == 1
        assert result.tunnel_id == 0xDEADBEEF
        assert result.router_hash == router_hash

    def test_router_delivery(self):
        """Router delivery (type 2): router_hash survives the roundtrip."""
        msg = os.urandom(150)
        router_hash = os.urandom(32)
        blocks = FragmentBuilder.build(
            msg,
            delivery_type=2,
            target_router_hash=router_hash,
        )
        handler = FragmentHandler()
        result = handler.receive_block(blocks[0])
        assert result is not None
        assert result.payload == msg
        assert result.delivery_type == 2
        assert result.router_hash == router_hash
99
100
# -- FragmentBuilder: multi-fragment messages --
102
class TestBuildMultiFragment:
    """Messages too large for one block are split into several blocks."""

    def test_large_message_multiple_blocks(self):
        """A 3000-byte message yields more than one block, each 1024 bytes long."""
        payload = os.urandom(3000)
        built = FragmentBuilder.build(payload, delivery_type=0, message_id=0x12345678)
        assert len(built) > 1
        for chunk in built:
            chunk_len = len(chunk)
            assert chunk_len == 1024

    def test_all_blocks_1024_bytes(self):
        """Every block is exactly 1024 bytes for a range of message sizes."""
        sizes = (500, 1000, 2000, 5000, 10000)
        for size in sizes:
            blocks = FragmentBuilder.build(
                os.urandom(size),
                delivery_type=0,
                message_id=1,
            )
            for i, block in enumerate(blocks):
                assert len(block) == 1024, f"Block {i} for msg size {size} is {len(block)} bytes"
119
120
# -- FragmentHandler: defragmentation --
122
class TestDefragmentSingle:
    """Handler behaviour when the first block of a small message arrives."""

    def test_single_fragment_immediate(self):
        """The first block of a 200-byte message yields the full payload at once."""
        payload = os.urandom(200)
        built = FragmentBuilder.build(payload, delivery_type=0)
        reassembler = FragmentHandler()
        message = reassembler.receive_block(built[0])
        assert message is not None
        assert message.payload == payload
132
133
class TestDefragmentMulti:
    """Reassembly of messages that span several blocks."""

    @staticmethod
    def _feed(handler, blocks):
        """Pass *blocks* to *handler* in order and return every non-None result.

        Results are filtered with ``is not None`` rather than truthiness so a
        completed message object that happens to evaluate falsy is still
        counted.
        """
        completed = []
        for block in blocks:
            result = handler.receive_block(block)
            if result is not None:
                completed.append(result)
        return completed

    def test_multi_fragment_in_order(self):
        """Multiple fragments in order -> exactly one complete message."""
        msg = os.urandom(3000)
        blocks = FragmentBuilder.build(msg, delivery_type=0, message_id=42)
        results = self._feed(FragmentHandler(), blocks)
        assert len(results) == 1
        assert results[0].payload == msg

    def test_multi_fragment_out_of_order(self):
        """Fragments arriving last, first, middle still reassemble."""
        msg = os.urandom(3000)
        blocks = FragmentBuilder.build(msg, delivery_type=0, message_id=99)
        # At least three blocks are needed for a distinct last/first/middle order.
        assert len(blocks) >= 3

        # Send last, first, middle
        reordered = [blocks[-1]] + [blocks[0]] + blocks[1:-1]
        results = self._feed(FragmentHandler(), reordered)
        assert len(results) == 1
        assert results[0].payload == msg

    def test_two_interleaved_messages(self):
        """Two messages interleaved block by block -> both reassemble correctly."""
        msg_a = os.urandom(2500)
        msg_b = os.urandom(2500)
        blocks_a = FragmentBuilder.build(msg_a, delivery_type=0, message_id=100)
        blocks_b = FragmentBuilder.build(msg_b, delivery_type=0, message_id=200)

        # Alternate a[0], b[0], a[1], b[1], ...; a shorter sequence simply
        # stops contributing once it is exhausted.
        interleaved = []
        for i in range(max(len(blocks_a), len(blocks_b))):
            for sequence in (blocks_a, blocks_b):
                if i < len(sequence):
                    interleaved.append(sequence[i])

        results = self._feed(FragmentHandler(), interleaved)
        assert len(results) == 2
        payloads = {r.payload for r in results}
        assert msg_a in payloads
        assert msg_b in payloads
189
190
class TestDefragmentExpiry:
    """Behaviour of FragmentHandler.expire_old on an incomplete message."""

    def test_expire_incomplete(self):
        """An incomplete message is dropped when expiring 61 s past the current time."""
        payload = os.urandom(3000)
        fragments = FragmentBuilder.build(payload, delivery_type=0, message_id=77)
        reassembler = FragmentHandler()

        # Deliver only the first fragment so the message stays incomplete.
        reassembler.receive_block(fragments[0])
        assert len(reassembler.pending) == 1

        # Ask the handler to expire as of a timestamp 61 000 ms in the future.
        now_ms = time.time() * 1000
        expired_count = reassembler.expire_old(now_ms + 61_000)
        assert expired_count == 1
        assert len(reassembler.pending) == 0

    def test_no_expire_recent(self):
        """A fragment received just now is kept when expiring at the current time."""
        payload = os.urandom(3000)
        fragments = FragmentBuilder.build(payload, delivery_type=0, message_id=88)
        reassembler = FragmentHandler()
        reassembler.receive_block(fragments[0])

        # Expiring as of "now" must leave the pending entry in place.
        now_ms = time.time() * 1000
        expired_count = reassembler.expire_old(now_ms)
        assert expired_count == 0
        assert len(reassembler.pending) == 1
218
219
# -- Roundtrip tests --
221
class TestRoundtrip:
    """Build-then-reassemble checks across message sizes and delivery info."""

    @pytest.mark.parametrize("size", [1, 100, 500, 996, 997, 1500, 3000, 10000])
    def test_roundtrip_various_sizes(self, size):
        """A message of *size* random bytes comes back unchanged, exactly once."""
        original = os.urandom(size)
        fragments = FragmentBuilder.build(original, delivery_type=0, message_id=size)
        reassembler = FragmentHandler()
        # Feed every fragment in order, keeping only the non-None results.
        outcomes = (reassembler.receive_block(fragment) for fragment in fragments)
        completed = [outcome for outcome in outcomes if outcome is not None]
        assert len(completed) == 1
        assert completed[0].payload == original

    def test_roundtrip_with_delivery_info(self):
        """Delivery type, tunnel id and router hash survive a multi-block roundtrip."""
        original = os.urandom(2500)
        router_hash = os.urandom(32)
        build_options = {
            "delivery_type": 1,
            "target_tunnel_id": 0xCAFEBABE,
            "target_router_hash": router_hash,
            "message_id": 555,
        }
        fragments = FragmentBuilder.build(original, **build_options)
        reassembler = FragmentHandler()
        outcomes = (reassembler.receive_block(fragment) for fragment in fragments)
        completed = [outcome for outcome in outcomes if outcome is not None]
        assert len(completed) == 1
        message = completed[0]
        assert message.payload == original
        assert message.delivery_type == 1
        assert message.tunnel_id == 0xCAFEBABE
        assert message.router_hash == router_hash