# A Python port of the Invisible Internet Project (I2P)
1"""NTCP2 asyncio connection layer.
2
3Provides encrypted frame transport over an established NTCP2 session.
4After the Noise handshake completes, this class handles sending and
5receiving length-prefixed, encrypted NTCP2 frames.
6"""
7
8import asyncio
9import struct
10
11from i2p_crypto.noise import CipherState
12from i2p_transport.ntcp2 import NTCP2Frame, FrameType
13
14
class NTCP2Connection:
    """Encrypted NTCP2 connection over asyncio streams.

    Wraps an asyncio ``StreamReader``/``StreamWriter`` pair together with two
    Noise ``CipherState`` objects (one per direction) to provide encrypted
    frame transport.

    Wire format used by this class: a 4-byte big-endian unsigned length
    prefix (sent in the clear) followed by exactly that many bytes of
    ciphertext. Every frame is encrypted/decrypted with empty associated
    data.
    """

    def __init__(
        self,
        reader: asyncio.StreamReader,
        writer: asyncio.StreamWriter,
        cipher_send: CipherState,
        cipher_recv: CipherState,
        remote_hash: bytes = b"",
    ):
        """Bind the stream pair and the per-direction cipher states.

        Args:
            reader: Stream that encrypted frames are read from.
            writer: Stream that encrypted frames are written to.
            cipher_send: Cipher state used to encrypt outgoing frames.
            cipher_recv: Cipher state used to decrypt incoming frames.
            remote_hash: Identifier of the remote peer, exposed unchanged
                through :attr:`remote_hash`. Defaults to empty bytes.
        """
        self._reader = reader
        self._writer = writer
        self._cipher_send = cipher_send
        self._cipher_recv = cipher_recv
        self._remote_hash = remote_hash

    @property
    def remote_hash(self) -> bytes:
        """Return the remote peer hash supplied at construction time."""
        return self._remote_hash

    async def send_frame(self, frame: NTCP2Frame) -> None:
        """Serialize, encrypt, and send a frame with a 4-byte length prefix.

        Args:
            frame: Frame to send; only its ``to_bytes()`` method is used.

        Raises:
            struct.error: If the ciphertext length does not fit in 32 bits.
        """
        plaintext = frame.to_bytes()
        ciphertext = self._cipher_send.encrypt_with_ad(b"", plaintext)
        # The prefix carries the *ciphertext* length so the receiver knows
        # how many bytes to read before it can decrypt anything.
        header = struct.pack("!I", len(ciphertext))
        # Single write keeps prefix and body together in the send buffer.
        self._writer.write(header + ciphertext)
        await self._writer.drain()

    async def recv_frame(self) -> NTCP2Frame:
        """Read a length-prefixed encrypted frame, decrypt, and parse it.

        Returns:
            The frame produced by ``NTCP2Frame.from_bytes`` on the decrypted
            payload.

        Raises:
            asyncio.IncompleteReadError: If the stream ends before the prefix
                or the announced number of ciphertext bytes has arrived.
        """
        header = await self._reader.readexactly(4)
        (length,) = struct.unpack("!I", header)
        # NOTE(review): ``length`` is peer-controlled and unbounded here (up
        # to 2**32 - 1), and it is consumed before the frame is
        # authenticated. Consider enforcing a protocol-appropriate maximum
        # once the largest legitimate frame size is confirmed.
        ciphertext = await self._reader.readexactly(length)
        plaintext = self._cipher_recv.decrypt_with_ad(b"", ciphertext)
        return NTCP2Frame.from_bytes(plaintext)

    async def send_i2np(self, message_type: int, payload: bytes) -> None:
        """Send an I2NP message wrapped in an NTCP2 frame.

        The frame body is ``type (1 byte) + payload length (2 bytes,
        big-endian) + payload``.

        Args:
            message_type: I2NP message type; must fit in one unsigned byte.
            payload: Message body; its length must fit in 16 bits.

        Raises:
            struct.error: If ``message_type`` or ``len(payload)`` is out of
                range for its field.
        """
        inner = struct.pack("!BH", message_type, len(payload)) + payload
        await self.send_frame(NTCP2Frame(FrameType.I2NP, inner))

    async def close(self) -> None:
        """Send a termination frame and close the connection.

        Calling this on a connection whose writer is already closing is a
        no-op apart from waiting for the close to finish. If sending the
        termination frame fails, the writer is still closed before the
        error propagates, so the underlying transport is never leaked.
        """
        if self._writer.is_closing():
            # A termination frame can no longer be delivered; writing to a
            # closing transport would only fail. Just wait for shutdown.
            await self._writer.wait_closed()
            return

        # Termination payload: reason (1 byte, 0 = normal) +
        # valid_received (8 bytes).
        termination_payload = struct.pack("!BQ", 0, 0)
        frame = NTCP2Frame(FrameType.TERMINATION, termination_payload)
        try:
            await self.send_frame(frame)
        finally:
            # Always release the transport, even when the peer has already
            # dropped the connection and the send above raised.
            self._writer.close()
            await self._writer.wait_closed()

    def is_alive(self) -> bool:
        """Return True while the underlying writer is not closed or closing."""
        return not self._writer.is_closing()