A Python port of the Invisible Internet Project (I2P)
1"""NTCP2 real data-phase transport with SipHash-obfuscated frame lengths.
2
3Implements the actual I2P NTCP2 data phase framing:
4- 2-byte SipHash-obfuscated frame lengths
5- ChaCha20-Poly1305 encrypted payloads with auto-incrementing nonce
6- Block-based payload format (multiple blocks per frame)
7
8This replaces the simplified 4-byte length-prefixed framing in
9ntcp2_connection.py with the real NTCP2 wire format.
10"""
11
12import asyncio
13import struct
14import time
15
16from i2p_crypto.noise import CipherState
17from i2p_crypto.siphash import SipHashRatchet
18from i2p_transport.ntcp2_blocks import (
19 NTCP2Block,
20 datetime_block,
21 encode_blocks,
22 decode_blocks,
23 i2np_block,
24 padding_block,
25 termination_block,
26)
27
28
class NTCP2RealConnection:
    """Encrypted NTCP2 data-phase connection with SipHash length obfuscation.

    Instances are created after the Noise handshake has produced the two
    CipherState objects and the two SipHash ratchets (one of each per
    direction). This class then handles the data-phase framing.

    Wire format per frame:
        [2 bytes obfuscated length] [encrypted payload including 16-byte MAC]

    The obfuscated length is the actual encrypted payload length XORed
    with the low 16 bits of the next SipHash ratchet output.
    """

    # Size of the obfuscated length prefix that precedes every frame.
    _LEN_PREFIX_SIZE = 2
    # Authentication tag appended to each encrypted payload.
    _MAC_LEN = 16
    # Largest value the 2-byte length prefix can carry (MAC included).
    _MAX_FRAME_LEN = 0xFFFF

    def __init__(
        self,
        reader: asyncio.StreamReader,
        writer: asyncio.StreamWriter,
        cipher_send: CipherState | None,
        cipher_recv: CipherState | None,
        siphash_send: SipHashRatchet | None,
        siphash_recv: SipHashRatchet | None,
        remote_hash: bytes = b"",
    ):
        """Wrap an established stream pair with NTCP2 data-phase state.

        Args:
            reader: Stream the peer's frames are read from.
            writer: Stream our frames are written to.
            cipher_send: Cipher used to encrypt outgoing payloads.
            cipher_recv: Cipher used to decrypt incoming payloads.
            siphash_send: Ratchet that obfuscates outgoing frame lengths.
            siphash_recv: Ratchet that deobfuscates incoming frame lengths.
            remote_hash: Identifier of the remote peer, exposed unchanged
                through the ``remote_hash`` property.

        The cipher and ratchet arguments may be ``None``; the matching
        send or receive direction is then unusable.
        """
        self._reader = reader
        self._writer = writer
        self._cipher_send = cipher_send
        self._cipher_recv = cipher_recv
        self._siphash_send = siphash_send
        self._siphash_recv = siphash_recv
        self._remote_hash = remote_hash
        self._frames_sent = 0
        self._frames_received = 0
        # Idle timers start at construction so a fresh connection does not
        # look stale before its first frame.
        self._last_write_time = time.monotonic()
        self._last_read_time = time.monotonic()

    @property
    def remote_hash(self) -> bytes:
        """Remote peer hash supplied at construction time."""
        return self._remote_hash

    async def send_frame(self, blocks: list[NTCP2Block]) -> None:
        """Encode blocks, encrypt, obfuscate the length, and send one frame.

        Steps:
        1. Encode blocks to payload bytes.
        2. Check the encrypted frame will fit the 2-byte length prefix.
        3. Encrypt with cipher_send (ChaCha20-Poly1305, no AD).
           Nonce auto-increments inside CipherState.
        4. frame_len = len(encrypted) which includes the 16-byte MAC.
        5. Obfuscate frame_len with siphash_send.
        6. Write [2-byte obfuscated length][encrypted payload] to the wire.

        Args:
            blocks: Blocks to place in this frame, in order.

        Raises:
            AssertionError: If the send cipher or ratchet is missing.
            ValueError: If the encoded payload plus MAC exceeds 65535 bytes.
        """
        plaintext = encode_blocks(blocks)
        assert self._cipher_send is not None and self._siphash_send is not None, (
            "send_frame() requires cipher_send and siphash_send"
        )
        # Reject oversize payloads *before* encrypting: encrypting advances
        # the nonce, and a frame that is then never sent would leave the
        # peer unable to decrypt anything that follows.
        max_payload = self._MAX_FRAME_LEN - self._MAC_LEN
        if len(plaintext) > max_payload:
            raise ValueError(
                f"NTCP2 frame payload is {len(plaintext)} bytes; "
                f"maximum is {max_payload}"
            )
        encrypted = self._cipher_send.encrypt_with_ad(b"", plaintext)
        length_prefix = self._siphash_send.obfuscate_length(len(encrypted))
        # Single write keeps the prefix and payload contiguous on the wire.
        self._writer.write(length_prefix + encrypted)
        await self._writer.drain()
        self._frames_sent += 1
        self._last_write_time = time.monotonic()

    async def recv_frame(self) -> list[NTCP2Block]:
        """Read one frame from the wire, deobfuscate, decrypt, decode blocks.

        Steps:
        1. Read 2 bytes (obfuscated length).
        2. Deobfuscate with siphash_recv to get frame_len.
        3. Read frame_len bytes (encrypted payload + MAC).
        4. Decrypt with cipher_recv (no AD).
        5. Decode blocks from the decrypted payload.

        Returns:
            The blocks carried by the frame.

        Raises:
            AssertionError: If the receive cipher or ratchet is missing.
            asyncio.IncompleteReadError: If the stream ends mid-frame.
        """
        # Verify the receive state first so a misconfigured connection fails
        # without consuming bytes and misaligning the stream.
        assert self._siphash_recv is not None and self._cipher_recv is not None, (
            "recv_frame() requires siphash_recv and cipher_recv"
        )
        length_prefix = await self._reader.readexactly(self._LEN_PREFIX_SIZE)
        frame_len = self._siphash_recv.deobfuscate_length(length_prefix)
        encrypted = await self._reader.readexactly(frame_len)
        plaintext = self._cipher_recv.decrypt_with_ad(b"", encrypted)
        # Counted once decryption has succeeded; block decoding happens after.
        self._frames_received += 1
        self._last_read_time = time.monotonic()
        return decode_blocks(plaintext)

    async def send_i2np(self, message: bytes) -> None:
        """Send an I2NP message wrapped with DateTime and Padding blocks.

        Creates a frame containing:
        - DateTime block (current time)
        - I2NP block (the message)
        - Padding block (16 bytes)

        Args:
            message: Serialized I2NP message to deliver.
        """
        now = int(time.time())
        await self.send_frame(
            [
                datetime_block(now),
                i2np_block(message),
                padding_block(16),
            ]
        )

    async def close(self) -> None:
        """Send a Termination block and close the connection.

        The termination block includes the count of frames received
        and reason code 0 (normal shutdown). It is skipped when the writer
        is already closing, since nothing more can be delivered.

        The writer is always closed, even if sending the termination frame
        fails; in that case the failure is re-raised after the close.
        """
        try:
            if not self._writer.is_closing():
                await self.send_frame(
                    [termination_block(self._frames_received, 0)]
                )
        finally:
            # Release the transport on every path, including send failures.
            self._writer.close()
            await self._writer.wait_closed()

    def seconds_since_last_write(self) -> float:
        """Seconds since the last frame was sent (or since construction)."""
        return time.monotonic() - self._last_write_time

    def seconds_since_last_read(self) -> float:
        """Seconds since the last frame was received (or since construction)."""
        return time.monotonic() - self._last_read_time

    def is_alive(self) -> bool:
        """Check if the connection is still open (writer is not closing)."""
        return not self._writer.is_closing()