"""NTCP2 real data-phase transport with SipHash-obfuscated frame lengths.

Implements the actual I2P NTCP2 data phase framing:

- 2-byte SipHash-obfuscated frame lengths
- ChaCha20-Poly1305 encrypted payloads with auto-incrementing nonce
- Block-based payload format (multiple blocks per frame)

This replaces the simplified 4-byte length-prefixed framing in
ntcp2_connection.py with the real NTCP2 wire format.
"""

import asyncio
import struct
import time

from i2p_crypto.noise import CipherState
from i2p_crypto.siphash import SipHashRatchet
from i2p_transport.ntcp2_blocks import (
    NTCP2Block,
    datetime_block,
    encode_blocks,
    decode_blocks,
    i2np_block,
    padding_block,
    termination_block,
)

# Size of the authentication tag that the framing described below appends
# to every encrypted payload.
_MAC_LEN = 16
# The wire length field is 2 bytes wide, so one encrypted frame
# (payload + MAC) can never exceed 65535 bytes.
_MAX_FRAME_LEN = 0xFFFF
_MAX_PLAINTEXT_LEN = _MAX_FRAME_LEN - _MAC_LEN


class NTCP2RealConnection:
    """Encrypted NTCP2 connection with SipHash length obfuscation.

    After the Noise handshake completes and produces CipherState objects
    and SipHash ratchets, this class handles the data-phase framing.

    Wire format per frame::

        [2 bytes obfuscated length] [encrypted payload including 16-byte MAC]

    The obfuscated length is the actual encrypted payload length XORed
    with the low 16 bits of the next SipHash ratchet output.
    """

    def __init__(
        self,
        reader: asyncio.StreamReader,
        writer: asyncio.StreamWriter,
        cipher_send: CipherState | None,
        cipher_recv: CipherState | None,
        siphash_send: SipHashRatchet | None,
        siphash_recv: SipHashRatchet | None,
        remote_hash: bytes = b"",
    ):
        """Store the stream pair and the per-direction crypto state.

        Args:
            reader: Stream the encrypted frames are read from.
            writer: Stream the encrypted frames are written to.
            cipher_send: Cipher state used by ``send_frame``; may be None,
                in which case sending fails with AssertionError.
            cipher_recv: Cipher state used by ``recv_frame``; may be None,
                in which case receiving fails with AssertionError.
            siphash_send: Length obfuscator for outgoing frames.
            siphash_recv: Length deobfuscator for incoming frames.
            remote_hash: Identifier of the peer, exposed via ``remote_hash``.
        """
        self._reader = reader
        self._writer = writer
        self._cipher_send = cipher_send
        self._cipher_recv = cipher_recv
        self._siphash_send = siphash_send
        self._siphash_recv = siphash_recv
        self._remote_hash = remote_hash
        # Frame counters; the received count is reported in the
        # termination block sent by close().
        self._frames_sent = 0
        self._frames_received = 0
        # Monotonic timestamps so idle-time queries are immune to
        # wall-clock adjustments.
        self._last_write_time = time.monotonic()
        self._last_read_time = time.monotonic()

    @property
    def remote_hash(self) -> bytes:
        """Return the remote hash supplied at construction time."""
        return self._remote_hash

    async def send_frame(self, blocks: list[NTCP2Block]) -> None:
        """Encode blocks, encrypt, obfuscate length, and send.

        Steps:
            1. Encode blocks to payload bytes.
            2. Encrypt with cipher_send (ChaCha20-Poly1305, no AD).
               Nonce auto-increments inside CipherState.
            3. frame_len = len(encrypted) which includes 16-byte MAC.
            4. Obfuscate frame_len with siphash_send.
            5. Write [2-byte obfuscated length][encrypted payload] to wire.

        Args:
            blocks: Blocks to pack into a single frame.

        Raises:
            AssertionError: If the send cipher or send SipHash state is None.
            ValueError: If the encoded blocks are too large to fit in one
                frame (more than 65535 bytes once the MAC is added).
        """
        # Narrows the Optional attributes; sending needs both pieces of
        # handshake output.
        assert self._cipher_send is not None and self._siphash_send is not None

        plaintext = encode_blocks(blocks)
        # Reject oversized payloads BEFORE encrypting: encryption advances
        # the nonce, so failing afterwards would leave the cipher state out
        # of step with the peer.
        if len(plaintext) > _MAX_PLAINTEXT_LEN:
            raise ValueError(
                f"NTCP2 frame payload too large: {len(plaintext)} bytes "
                f"(maximum {_MAX_PLAINTEXT_LEN})"
            )

        encrypted = self._cipher_send.encrypt_with_ad(b"", plaintext)
        length_prefix = self._siphash_send.obfuscate_length(len(encrypted))

        # Single write so the length prefix and its payload are queued
        # together.
        self._writer.write(length_prefix + encrypted)
        await self._writer.drain()

        self._frames_sent += 1
        self._last_write_time = time.monotonic()

    async def recv_frame(self) -> list[NTCP2Block]:
        """Read a frame from the wire, deobfuscate, decrypt, decode blocks.

        Steps:
            1. Read 2 bytes (obfuscated length).
            2. Deobfuscate with siphash_recv to get frame_len.
            3. Read frame_len bytes (encrypted payload + MAC).
            4. Decrypt with cipher_recv (no AD).
            5. Decode blocks from decrypted payload.

        Returns:
            The blocks decoded from the decrypted frame payload.

        Raises:
            AssertionError: If the receive cipher or receive SipHash state
                is None.
            asyncio.IncompleteReadError: If the stream ends before a full
                length prefix or frame body has been read.
        """
        # Check state before touching the stream so a misconfigured
        # connection does not swallow bytes it cannot interpret.
        assert self._siphash_recv is not None and self._cipher_recv is not None

        length_prefix = await self._reader.readexactly(2)
        frame_len = self._siphash_recv.deobfuscate_length(length_prefix)
        encrypted = await self._reader.readexactly(frame_len)
        plaintext = self._cipher_recv.decrypt_with_ad(b"", encrypted)

        # Counted once decryption succeeds, before block decoding.
        self._frames_received += 1
        self._last_read_time = time.monotonic()
        return decode_blocks(plaintext)

    async def send_i2np(self, message: bytes) -> None:
        """Send an I2NP message wrapped with DateTime and Padding blocks.

        Creates a frame containing:
            - DateTime block (current time, whole seconds)
            - I2NP block (the message)
            - Padding block (size 16)

        Args:
            message: Serialized I2NP message to send.
        """
        now = int(time.time())
        await self.send_frame(
            [
                datetime_block(now),
                i2np_block(message),
                padding_block(16),
            ]
        )

    async def close(self) -> None:
        """Send a Termination block and close the connection.

        The termination block includes the count of frames received and
        reason code 0 (normal shutdown).

        The underlying writer is closed even when the termination frame
        cannot be sent; in that case the send error still propagates to
        the caller after the writer has been closed.
        """
        try:
            await self.send_frame(
                [termination_block(self._frames_received, 0)]
            )
        finally:
            # Always release the socket, otherwise a failed send would
            # leak the transport.
            self._writer.close()
            await self._writer.wait_closed()

    def seconds_since_last_write(self) -> float:
        """Seconds since the last frame was sent."""
        return time.monotonic() - self._last_write_time

    def seconds_since_last_read(self) -> float:
        """Seconds since the last frame was received."""
        return time.monotonic() - self._last_read_time

    def is_alive(self) -> bool:
        """Check if the connection is still open."""
        return not self._writer.is_closing()