A Python port of the Invisible Internet Project (I2P)
1"""NTCP2 Noise handshake integration.
2
3Wraps the Noise_XK HandshakeState for the NTCP2 3-message handshake,
4and provides frame encryption/decryption and message fragmentation
5after the handshake completes.
6"""
7
8from i2p_crypto.noise import HandshakeState, CipherState
9from i2p_crypto.x25519 import X25519DH
10from i2p_transport.ntcp2 import NTCP2Frame, FrameType
11
12
13# Placeholder payload sent by responder/initiator for RouterInfo blocks
14_ROUTER_INFO_PLACEHOLDER = b""
15
16
17class NTCP2Handshake:
18 """Wraps HandshakeState('Noise_XK') for the NTCP2 3-message handshake."""
19
20 def __init__(
21 self,
22 our_static: tuple[bytes, bytes],
23 peer_static_pub: bytes | None = None,
24 initiator: bool = True,
25 ):
26 """
27 Args:
28 our_static: (private_key, public_key) tuple.
29 peer_static_pub: Remote static public key (needed for initiator).
30 initiator: True if we initiate the connection.
31 """
32 self._initiator = initiator
33 self._hs = HandshakeState(
34 pattern="Noise_XK",
35 initiator=initiator,
36 s=our_static,
37 rs=peer_static_pub,
38 )
39 self._step = 0 # tracks which handshake step we are on
40
41 # -- Initiator step 1 --------------------------------------------------
42
43 def create_message_1(self, options: bytes = b"") -> bytes:
44 """Initiator sends message 1: write_message(options)."""
45 if not self._initiator:
46 raise RuntimeError("Only initiator can create message 1")
47 if self._step != 0:
48 raise RuntimeError("create_message_1 must be called first")
49 msg = self._hs.write_message(options)
50 self._step = 1
51 return msg
52
53 # -- Responder step: receive msg1, produce msg2 -------------------------
54
55 def process_message_1(self, msg1: bytes) -> bytes:
56 """Responder receives msg1, returns msg2."""
57 if self._initiator:
58 raise RuntimeError("Only responder can process message 1")
59 if self._step != 0:
60 raise RuntimeError("process_message_1 must be called first")
61 self._hs.read_message(msg1)
62 msg2 = self._hs.write_message(_ROUTER_INFO_PLACEHOLDER)
63 self._step = 2
64 return msg2
65
66 # -- Initiator step: receive msg2, produce msg3 -------------------------
67
68 def process_message_2(self, msg2: bytes) -> bytes:
69 """Initiator receives msg2, returns msg3."""
70 if not self._initiator:
71 raise RuntimeError("Only initiator can process message 2")
72 if self._step != 1:
73 raise RuntimeError("Must call create_message_1 before process_message_2")
74 self._hs.read_message(msg2)
75 msg3 = self._hs.write_message(_ROUTER_INFO_PLACEHOLDER)
76 self._step = 3
77 return msg3
78
79 # -- Responder step: receive msg3 ---------------------------------------
80
81 def process_message_3(self, msg3: bytes) -> None:
82 """Responder receives msg3."""
83 if self._initiator:
84 raise RuntimeError("Only responder can process message 3")
85 if self._step != 2:
86 raise RuntimeError("Must call process_message_1 before process_message_3")
87 self._hs.read_message(msg3)
88 self._step = 3
89
90 # -- Post-handshake -----------------------------------------------------
91
92 def split(self) -> tuple[CipherState, CipherState]:
93 """After handshake, return (send_cipher, recv_cipher).
94
95 Noise split() produces (c1, c2). By convention:
96 - Initiator sends with c1, receives with c2.
97 - Responder sends with c2, receives with c1.
98 """
99 c1, c2 = self._hs.split()
100 if self._initiator:
101 return c1, c2
102 else:
103 return c2, c1
104
105 def is_complete(self) -> bool:
106 return self._hs.complete
107
108 def remote_static_key(self) -> bytes | None:
109 return self._hs.remote_static
110
111
112class NTCP2FrameCodec:
113 """Encrypts/decrypts NTCP2 frames after handshake completion."""
114
115 def encrypt_frame(self, cipher: CipherState, frame: NTCP2Frame) -> bytes:
116 """Serialize frame and encrypt with the given CipherState."""
117 frame_bytes = frame.to_bytes()
118 return cipher.encrypt_with_ad(b"", frame_bytes)
119
120 def decrypt_frame(self, cipher: CipherState, encrypted: bytes) -> NTCP2Frame:
121 """Decrypt and parse an NTCP2Frame."""
122 frame_bytes = cipher.decrypt_with_ad(b"", encrypted)
123 return NTCP2Frame.from_bytes(frame_bytes)
124
125
126class NTCP2MessageFragmenter:
127 """Fragment/reassemble I2NP messages across NTCP2 frames."""
128
129 def fragment(self, i2np_bytes: bytes, max_payload: int = 65535) -> list[NTCP2Frame]:
130 """Split i2np_bytes into chunks, each becoming an I2NP frame."""
131 if len(i2np_bytes) == 0:
132 return [NTCP2Frame(FrameType.I2NP, b"")]
133
134 frames = []
135 offset = 0
136 while offset < len(i2np_bytes):
137 chunk = i2np_bytes[offset : offset + max_payload]
138 frames.append(NTCP2Frame(FrameType.I2NP, chunk))
139 offset += max_payload
140 return frames
141
142 def reassemble(self, frames: list[NTCP2Frame]) -> bytes:
143 """Concatenate payloads of all I2NP frames."""
144 return b"".join(f.payload for f in frames)