A Python port of the Invisible Internet Project (I2P)
1"""NTCP2 listener, connector, and connection manager.
2
3Provides the server-side listener (NTCP2Listener) that accepts incoming
4NTCP2 connections, the client-side connector (NTCP2Connector) that
5initiates outbound connections, and a ConnectionManager for tracking
6active peer connections.
7
8All handshake messages are length-framed on the wire using a 2-byte
9big-endian length prefix during the Noise_XK handshake phase.
10"""
11
12import asyncio
13import struct
14from typing import Callable, Awaitable
15
16from i2p_transport.ntcp2_connection import NTCP2Connection
17from i2p_transport.ntcp2_handshake import NTCP2Handshake
18
19
20# ---------------------------------------------------------------------------
21# Handshake wire helpers — 2-byte big-endian length prefix
22# ---------------------------------------------------------------------------
23
async def _send_handshake_msg(writer: asyncio.StreamWriter, msg: bytes) -> None:
    """Write one length-framed handshake message and flush it.

    The frame consists of the payload length encoded as an unsigned
    16-bit big-endian integer, immediately followed by the payload.

    Args:
        writer: Stream the frame is written to.
        msg: Handshake payload to frame and send.
    """
    length_prefix = struct.pack("!H", len(msg))
    # Prefix and payload go out in a single write call.
    frame = length_prefix + msg
    writer.write(frame)
    # Wait until the transport buffer has room again before returning.
    await writer.drain()
28
29
async def _recv_handshake_msg(reader: asyncio.StreamReader) -> bytes:
    """Read one length-framed handshake message from the stream.

    Reads a 2-byte unsigned big-endian length, then exactly that many
    payload bytes.

    Args:
        reader: Stream to read the frame from.

    Returns:
        The payload bytes, without the length prefix.

    Raises:
        asyncio.IncompleteReadError: If the stream ends before the prefix
            or the announced payload has been fully received.
    """
    prefix = await reader.readexactly(2)
    (payload_len,) = struct.unpack("!H", prefix)
    payload = await reader.readexactly(payload_len)
    return payload
35
36
37# ---------------------------------------------------------------------------
38# ConnectionManager
39# ---------------------------------------------------------------------------
40
class ConnectionManager:
    """Registry of active NTCP2 connections keyed by peer hash.

    At most one connection is tracked per peer hash; adding a second
    connection under the same hash replaces the stored entry (the replaced
    connection is not closed by this class).
    """

    def __init__(self) -> None:
        # peer hash -> connection currently registered for that peer
        self._connections: dict[bytes, NTCP2Connection] = {}

    def add(self, peer_hash: bytes, connection: NTCP2Connection) -> None:
        """Register *connection* under *peer_hash*, replacing any prior entry."""
        self._connections[peer_hash] = connection

    def get(self, peer_hash: bytes) -> NTCP2Connection | None:
        """Return the connection registered for *peer_hash*, or None."""
        return self._connections.get(peer_hash)

    def remove(self, peer_hash: bytes) -> None:
        """Forget the connection for *peer_hash* (no-op if not present).

        The connection itself is not closed.
        """
        self._connections.pop(peer_hash, None)

    def active_count(self) -> int:
        """Return the number of tracked connections."""
        return len(self._connections)

    def all_peer_hashes(self) -> list[bytes]:
        """Return a new list holding every tracked peer hash."""
        return list(self._connections)

    def close_all(self) -> None:
        """Close every tracked connection and empty the registry.

        Closing is synchronous: it calls ``close()`` on each connection's
        writer and does not wait for the transports to finish closing.

        Every connection is attempted even if an earlier ``close()`` fails,
        and the registry is always left empty.

        Raises:
            Exception: The first error raised by a ``close()`` call, re-raised
                after all remaining connections have been closed.
        """
        # Detach the entries first so the registry never keeps stale
        # connections around, whatever happens while closing them.
        pending = list(self._connections.values())
        self._connections.clear()

        first_error: Exception | None = None
        for conn in pending:
            try:
                # NOTE(review): reaches into the connection's private writer;
                # prefer a public close method on NTCP2Connection if one exists.
                conn._writer.close()
            except Exception as exc:
                # Keep going so one failing socket cannot leak all the others.
                if first_error is None:
                    first_error = exc

        if first_error is not None:
            raise first_error
72
73
74# ---------------------------------------------------------------------------
75# NTCP2Listener — accepts incoming connections
76# ---------------------------------------------------------------------------
77
class NTCP2Listener:
    """Listens for incoming NTCP2 connections and performs the responder handshake.

    Args:
        host: Bind address (e.g. "0.0.0.0" or "127.0.0.1").
        port: Bind port (use 0 to let the OS pick a free port).
        our_static: (private_key, public_key) X25519 keypair.
        on_connection: Optional async callback invoked with each new
            NTCP2Connection after a successful handshake.
    """

    def __init__(
        self,
        host: str,
        port: int,
        our_static: tuple[bytes, bytes],
        on_connection: Callable[[NTCP2Connection], Awaitable[None]] | None = None,
    ) -> None:
        self._host = host
        self._port = port
        self._our_static = our_static
        self._on_connection = on_connection

    async def start(self) -> asyncio.Server:
        """Start listening and return the asyncio.Server object.

        The server is not retained by this object; the caller owns it and
        is responsible for closing it.
        """
        return await asyncio.start_server(
            self._handle_client, self._host, self._port
        )

    async def _perform_handshake(
        self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
    ) -> NTCP2Connection:
        """Run the Noise_XK handshake as responder and build the connection."""
        hs = NTCP2Handshake(
            our_static=self._our_static,
            peer_static_pub=None,
            initiator=False,
        )

        # Responder: read msg1, send msg2, read msg3
        msg1 = await _recv_handshake_msg(reader)
        msg2 = hs.process_message_1(msg1)
        await _send_handshake_msg(writer, msg2)

        msg3 = await _recv_handshake_msg(reader)
        hs.process_message_3(msg3)

        # Derive transport cipher states
        send_cipher, recv_cipher = hs.split()

        return NTCP2Connection(
            reader=reader,
            writer=writer,
            cipher_send=send_cipher,
            cipher_recv=recv_cipher,
            # Falls back to empty bytes when the handshake reports no remote key.
            remote_hash=hs.remote_static_key() or b"",
        )

    async def _handle_client(
        self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
    ) -> None:
        """Run the Noise_XK handshake as responder, then notify callback.

        Failures are best-effort: an error during the handshake or inside
        the ``on_connection`` callback closes this client's socket and is
        logged, but never propagates. Cancellation (and any other
        non-``Exception`` exit) also closes the socket, then propagates.
        """
        # Function-scope import: this module has no top-level logging import.
        import logging

        log = logging.getLogger(__name__)
        callback_started = False
        try:
            conn = await self._perform_handshake(reader, writer)

            if self._on_connection is not None:
                callback_started = True
                await self._on_connection(conn)

        except Exception:
            # Handshake failures can be triggered by any remote peer, so they
            # are logged quietly; a failing callback is a local bug and is
            # logged as an error.
            level = logging.ERROR if callback_started else logging.DEBUG
            log.log(
                level,
                "NTCP2 inbound connection dropped (%s failed)",
                "on_connection callback" if callback_started else "handshake",
                exc_info=True,
            )
            writer.close()
        except BaseException:
            # asyncio.CancelledError is not an Exception subclass; without
            # this branch a cancelled handler would leak the open socket.
            writer.close()
            raise
143
144
145# ---------------------------------------------------------------------------
146# NTCP2Connector — initiates outbound connections
147# ---------------------------------------------------------------------------
148
class NTCP2Connector:
    """Connects to a remote NTCP2 peer and performs the initiator handshake."""

    async def connect(
        self,
        host: str,
        port: int,
        our_static: tuple[bytes, bytes],
        peer_static_pub: bytes,
    ) -> NTCP2Connection:
        """Open a TCP connection and perform the Noise_XK handshake as initiator.

        Args:
            host: Remote host address.
            port: Remote port.
            our_static: (private_key, public_key) X25519 keypair.
            peer_static_pub: Remote peer's static X25519 public key.

        Returns:
            An established NTCP2Connection ready for frame exchange. Its
            ``remote_hash`` is set to ``peer_static_pub``.

        Raises:
            ConnectionRefusedError: If the TCP connection cannot be established.
            Exception: Any error raised during the handshake is re-raised
                after the socket has been closed.
        """
        reader, writer = await asyncio.open_connection(host, port)

        try:
            hs = NTCP2Handshake(
                our_static=our_static,
                peer_static_pub=peer_static_pub,
                initiator=True,
            )

            # Initiator: send msg1, read msg2, send msg3
            msg1 = hs.create_message_1()
            await _send_handshake_msg(writer, msg1)

            msg2 = await _recv_handshake_msg(reader)
            msg3 = hs.process_message_2(msg2)
            await _send_handshake_msg(writer, msg3)

            # Derive transport cipher states
            send_cipher, recv_cipher = hs.split()

            return NTCP2Connection(
                reader=reader,
                writer=writer,
                cipher_send=send_cipher,
                cipher_recv=recv_cipher,
                remote_hash=peer_static_pub,
            )

        except BaseException:
            # BaseException rather than Exception: asyncio.CancelledError is
            # not an Exception subclass, so a caller that times out the
            # handshake (e.g. via asyncio.wait_for) would otherwise leak the
            # open socket.
            writer.close()
            raise
203 raise