# A Python port of the Invisible Internet Project (I2P)
# at main 203 lines 7.1 kB view raw
"""NTCP2 listener, connector, and connection manager.

Provides the server-side listener (NTCP2Listener) that accepts incoming
NTCP2 connections, the client-side connector (NTCP2Connector) that
initiates outbound connections, and a ConnectionManager for tracking
active peer connections.

All handshake messages are length-framed on the wire using a 2-byte
big-endian length prefix during the Noise_XK handshake phase.
"""

import asyncio
import logging
import struct
from typing import Callable, Awaitable

from i2p_transport.ntcp2_connection import NTCP2Connection
from i2p_transport.ntcp2_handshake import NTCP2Handshake

logger = logging.getLogger(__name__)


# ---------------------------------------------------------------------------
# Handshake wire helpers — 2-byte big-endian length prefix
# ---------------------------------------------------------------------------

# Pre-compiled codec for the 2-byte big-endian (network order) length prefix
# shared by the send and receive helpers so both sides agree on the framing.
_LENGTH_PREFIX = struct.Struct("!H")


async def _send_handshake_msg(writer: asyncio.StreamWriter, msg: bytes) -> None:
    """Send a handshake message with a 2-byte big-endian length prefix.

    Args:
        writer: Stream to write the framed message to.
        msg: Raw handshake payload.

    Raises:
        struct.error: If ``msg`` is longer than 65535 bytes and therefore
            does not fit the 2-byte length prefix.
    """
    writer.write(_LENGTH_PREFIX.pack(len(msg)) + msg)
    # Wait for the transport buffer to flush before the next handshake step.
    await writer.drain()


async def _recv_handshake_msg(reader: asyncio.StreamReader) -> bytes:
    """Receive a handshake message (2-byte length prefix, then payload).

    Args:
        reader: Stream to read the framed message from.

    Returns:
        The payload bytes that followed the length prefix.

    Raises:
        asyncio.IncompleteReadError: If the stream ends before the prefix
            or the announced payload has been fully received.
    """
    prefix = await reader.readexactly(_LENGTH_PREFIX.size)
    (length,) = _LENGTH_PREFIX.unpack(prefix)
    return await reader.readexactly(length)


# ---------------------------------------------------------------------------
# ConnectionManager
# ---------------------------------------------------------------------------

class ConnectionManager:
    """Tracks active NTCP2 connections keyed by peer hash."""

    def __init__(self) -> None:
        # Maps peer hash -> the connection currently registered for it.
        self._connections: dict[bytes, NTCP2Connection] = {}

    def add(self, peer_hash: bytes, connection: NTCP2Connection) -> None:
        """Store a connection for the given peer hash.

        An existing entry for the same hash is replaced; the replaced
        connection is not closed by this method.
        """
        self._connections[peer_hash] = connection

    def get(self, peer_hash: bytes) -> NTCP2Connection | None:
        """Retrieve a connection by peer hash, or None if not found."""
        return self._connections.get(peer_hash)

    def remove(self, peer_hash: bytes) -> None:
        """Remove a connection by peer hash (no-op if not present)."""
        self._connections.pop(peer_hash, None)

    def active_count(self) -> int:
        """Return the number of tracked connections."""
        return len(self._connections)

    def all_peer_hashes(self) -> list[bytes]:
        """Return a list of all tracked peer hashes."""
        return list(self._connections)

    def close_all(self) -> None:
        """Close all connections synchronously (calls writer.close()).

        The registry is emptied before any connection is closed, and a
        failure while closing one connection is logged rather than raised,
        so every tracked connection gets a close attempt.
        """
        # Snapshot and clear first: the registry must end up empty even if
        # an individual close() fails part-way through.
        pending = list(self._connections.values())
        self._connections.clear()
        for conn in pending:
            try:
                conn._writer.close()
            except Exception:
                logger.exception("Failed to close NTCP2 connection")


# ---------------------------------------------------------------------------
# NTCP2Listener — accepts incoming connections
# ---------------------------------------------------------------------------

class NTCP2Listener:
    """Listens for incoming NTCP2 connections and performs the responder handshake.

    Args:
        host: Bind address (e.g. "0.0.0.0" or "127.0.0.1").
        port: Bind port (use 0 to let the OS pick a free port).
        our_static: (private_key, public_key) X25519 keypair.
        on_connection: Optional async callback invoked with each new
            NTCP2Connection after a successful handshake.
    """

    def __init__(
        self,
        host: str,
        port: int,
        our_static: tuple[bytes, bytes],
        on_connection: Callable[[NTCP2Connection], Awaitable[None]] | None = None,
    ) -> None:
        self._host = host
        self._port = port
        self._our_static = our_static
        self._on_connection = on_connection

    async def start(self) -> asyncio.Server:
        """Start listening and return the asyncio.Server object."""
        return await asyncio.start_server(
            self._handle_client, self._host, self._port
        )

    async def _handle_client(
        self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
    ) -> None:
        """Run the Noise_XK handshake as responder, then notify callback.

        Any failure in the handshake or in the ``on_connection`` callback is
        logged and the socket is closed; the error is not propagated. If the
        handler is cancelled, the socket is closed and the cancellation is
        re-raised.
        """
        try:
            hs = NTCP2Handshake(
                our_static=self._our_static,
                peer_static_pub=None,
                initiator=False,
            )

            # Responder: read msg1, send msg2, read msg3
            msg1 = await _recv_handshake_msg(reader)
            msg2 = hs.process_message_1(msg1)
            await _send_handshake_msg(writer, msg2)

            msg3 = await _recv_handshake_msg(reader)
            hs.process_message_3(msg3)

            # Derive transport cipher states
            send_cipher, recv_cipher = hs.split()

            conn = NTCP2Connection(
                reader=reader,
                writer=writer,
                cipher_send=send_cipher,
                cipher_recv=recv_cipher,
                # Fall back to an empty hash when the handshake exposes no
                # remote static key.
                remote_hash=hs.remote_static_key() or b"",
            )

            if self._on_connection is not None:
                await self._on_connection(conn)

        except asyncio.CancelledError:
            # CancelledError derives from BaseException, so the generic
            # handler below would miss it and leave the socket open.
            writer.close()
            raise
        except Exception:
            # Best-effort: one bad peer must not take the listener down,
            # but the failure should still be visible to operators.
            logger.warning(
                "NTCP2 inbound connection from %s aborted",
                writer.get_extra_info("peername"),
                exc_info=True,
            )
            writer.close()


# ---------------------------------------------------------------------------
# NTCP2Connector — initiates outbound connections
# ---------------------------------------------------------------------------

class NTCP2Connector:
    """Connects to a remote NTCP2 peer and performs the initiator handshake."""

    async def connect(
        self,
        host: str,
        port: int,
        our_static: tuple[bytes, bytes],
        peer_static_pub: bytes,
    ) -> NTCP2Connection:
        """Open a TCP connection and perform the Noise_XK handshake as initiator.

        Args:
            host: Remote host address.
            port: Remote port.
            our_static: (private_key, public_key) X25519 keypair.
            peer_static_pub: Remote peer's static X25519 public key.

        Returns:
            An established NTCP2Connection ready for frame exchange.

        Raises:
            ConnectionRefusedError: If the TCP connection cannot be established.
            Exception: Any error raised during the handshake is re-raised
                after the socket has been closed.
        """
        reader, writer = await asyncio.open_connection(host, port)

        try:
            hs = NTCP2Handshake(
                our_static=our_static,
                peer_static_pub=peer_static_pub,
                initiator=True,
            )

            # Initiator: send msg1, read msg2, send msg3
            msg1 = hs.create_message_1()
            await _send_handshake_msg(writer, msg1)

            msg2 = await _recv_handshake_msg(reader)
            msg3 = hs.process_message_2(msg2)
            await _send_handshake_msg(writer, msg3)

            # Derive transport cipher states
            send_cipher, recv_cipher = hs.split()

            return NTCP2Connection(
                reader=reader,
                writer=writer,
                cipher_send=send_cipher,
                cipher_recv=recv_cipher,
                remote_hash=peer_static_pub,
            )

        except BaseException:
            # BaseException (not Exception) so that cancellation — e.g. a
            # caller's asyncio.wait_for() timeout — also releases the socket.
            writer.close()
            raise