"""NTCP2 asyncio connection layer. Provides encrypted frame transport over an established NTCP2 session. After the Noise handshake completes, this class handles sending and receiving length-prefixed, encrypted NTCP2 frames. """ import asyncio import struct from i2p_crypto.noise import CipherState from i2p_transport.ntcp2 import NTCP2Frame, FrameType class NTCP2Connection: """Encrypted NTCP2 connection over asyncio streams. Wraps a pair of asyncio StreamReader/StreamWriter with Noise CipherState objects to provide encrypted frame transport. """ def __init__( self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, cipher_send: CipherState, cipher_recv: CipherState, remote_hash: bytes = b"", ): self._reader = reader self._writer = writer self._cipher_send = cipher_send self._cipher_recv = cipher_recv self._remote_hash = remote_hash @property def remote_hash(self) -> bytes: return self._remote_hash async def send_frame(self, frame: NTCP2Frame) -> None: """Serialize, encrypt, and send a frame with a 4-byte length prefix.""" frame_bytes = frame.to_bytes() encrypted = self._cipher_send.encrypt_with_ad(b"", frame_bytes) length_prefix = struct.pack("!I", len(encrypted)) self._writer.write(length_prefix + encrypted) await self._writer.drain() async def recv_frame(self) -> NTCP2Frame: """Read a length-prefixed encrypted frame, decrypt, and parse it.""" length_data = await self._reader.readexactly(4) length = struct.unpack("!I", length_data)[0] encrypted = await self._reader.readexactly(length) decrypted = self._cipher_recv.decrypt_with_ad(b"", encrypted) return NTCP2Frame.from_bytes(decrypted) async def send_i2np(self, message_type: int, payload: bytes) -> None: """Send an I2NP message wrapped in an NTCP2 frame.""" inner = struct.pack("!BH", message_type, len(payload)) + payload frame = NTCP2Frame(FrameType.I2NP, inner) await self.send_frame(frame) async def close(self) -> None: """Send a termination frame and close the connection.""" # Termination payload: reason(1 byte, 0=normal) + valid_received(8 bytes) termination_payload = struct.pack("!BQ", 0, 0) frame = NTCP2Frame(FrameType.TERMINATION, termination_payload) await self.send_frame(frame) self._writer.close() await self._writer.wait_closed() def is_alive(self) -> bool: """Check if the connection is still open.""" return not self._writer.is_closing()