A Python port of the Invisible Internet Project (I2P)
at main 144 lines 5.3 kB view raw
1"""NTCP2 Noise handshake integration. 2 3Wraps the Noise_XK HandshakeState for the NTCP2 3-message handshake, 4and provides frame encryption/decryption and message fragmentation 5after the handshake completes. 6""" 7 8from i2p_crypto.noise import HandshakeState, CipherState 9from i2p_crypto.x25519 import X25519DH 10from i2p_transport.ntcp2 import NTCP2Frame, FrameType 11 12 13# Placeholder payload sent by responder/initiator for RouterInfo blocks 14_ROUTER_INFO_PLACEHOLDER = b"" 15 16 17class NTCP2Handshake: 18 """Wraps HandshakeState('Noise_XK') for the NTCP2 3-message handshake.""" 19 20 def __init__( 21 self, 22 our_static: tuple[bytes, bytes], 23 peer_static_pub: bytes | None = None, 24 initiator: bool = True, 25 ): 26 """ 27 Args: 28 our_static: (private_key, public_key) tuple. 29 peer_static_pub: Remote static public key (needed for initiator). 30 initiator: True if we initiate the connection. 31 """ 32 self._initiator = initiator 33 self._hs = HandshakeState( 34 pattern="Noise_XK", 35 initiator=initiator, 36 s=our_static, 37 rs=peer_static_pub, 38 ) 39 self._step = 0 # tracks which handshake step we are on 40 41 # -- Initiator step 1 -------------------------------------------------- 42 43 def create_message_1(self, options: bytes = b"") -> bytes: 44 """Initiator sends message 1: write_message(options).""" 45 if not self._initiator: 46 raise RuntimeError("Only initiator can create message 1") 47 if self._step != 0: 48 raise RuntimeError("create_message_1 must be called first") 49 msg = self._hs.write_message(options) 50 self._step = 1 51 return msg 52 53 # -- Responder step: receive msg1, produce msg2 ------------------------- 54 55 def process_message_1(self, msg1: bytes) -> bytes: 56 """Responder receives msg1, returns msg2.""" 57 if self._initiator: 58 raise RuntimeError("Only responder can process message 1") 59 if self._step != 0: 60 raise RuntimeError("process_message_1 must be called first") 61 self._hs.read_message(msg1) 62 msg2 = self._hs.write_message(_ROUTER_INFO_PLACEHOLDER) 63 self._step = 2 64 return msg2 65 66 # -- Initiator step: receive msg2, produce msg3 ------------------------- 67 68 def process_message_2(self, msg2: bytes) -> bytes: 69 """Initiator receives msg2, returns msg3.""" 70 if not self._initiator: 71 raise RuntimeError("Only initiator can process message 2") 72 if self._step != 1: 73 raise RuntimeError("Must call create_message_1 before process_message_2") 74 self._hs.read_message(msg2) 75 msg3 = self._hs.write_message(_ROUTER_INFO_PLACEHOLDER) 76 self._step = 3 77 return msg3 78 79 # -- Responder step: receive msg3 --------------------------------------- 80 81 def process_message_3(self, msg3: bytes) -> None: 82 """Responder receives msg3.""" 83 if self._initiator: 84 raise RuntimeError("Only responder can process message 3") 85 if self._step != 2: 86 raise RuntimeError("Must call process_message_1 before process_message_3") 87 self._hs.read_message(msg3) 88 self._step = 3 89 90 # -- Post-handshake ----------------------------------------------------- 91 92 def split(self) -> tuple[CipherState, CipherState]: 93 """After handshake, return (send_cipher, recv_cipher). 94 95 Noise split() produces (c1, c2). By convention: 96 - Initiator sends with c1, receives with c2. 97 - Responder sends with c2, receives with c1. 98 """ 99 c1, c2 = self._hs.split() 100 if self._initiator: 101 return c1, c2 102 else: 103 return c2, c1 104 105 def is_complete(self) -> bool: 106 return self._hs.complete 107 108 def remote_static_key(self) -> bytes | None: 109 return self._hs.remote_static 110 111 112class NTCP2FrameCodec: 113 """Encrypts/decrypts NTCP2 frames after handshake completion.""" 114 115 def encrypt_frame(self, cipher: CipherState, frame: NTCP2Frame) -> bytes: 116 """Serialize frame and encrypt with the given CipherState.""" 117 frame_bytes = frame.to_bytes() 118 return cipher.encrypt_with_ad(b"", frame_bytes) 119 120 def decrypt_frame(self, cipher: CipherState, encrypted: bytes) -> NTCP2Frame: 121 """Decrypt and parse an NTCP2Frame.""" 122 frame_bytes = cipher.decrypt_with_ad(b"", encrypted) 123 return NTCP2Frame.from_bytes(frame_bytes) 124 125 126class NTCP2MessageFragmenter: 127 """Fragment/reassemble I2NP messages across NTCP2 frames.""" 128 129 def fragment(self, i2np_bytes: bytes, max_payload: int = 65535) -> list[NTCP2Frame]: 130 """Split i2np_bytes into chunks, each becoming an I2NP frame.""" 131 if len(i2np_bytes) == 0: 132 return [NTCP2Frame(FrameType.I2NP, b"")] 133 134 frames = [] 135 offset = 0 136 while offset < len(i2np_bytes): 137 chunk = i2np_bytes[offset : offset + max_payload] 138 frames.append(NTCP2Frame(FrameType.I2NP, chunk)) 139 offset += max_payload 140 return frames 141 142 def reassemble(self, frames: list[NTCP2Frame]) -> bytes: 143 """Concatenate payloads of all I2NP frames.""" 144 return b"".join(f.payload for f in frames)