A Python port of the Invisible Internet Project (I2P)
1"""SSU2 transport protocol types.
2
3Ported from net.i2p.router.transport.udp.SSU2Header and related classes.
4"""
5
6import enum
7import os
8import struct
9
10from i2p_crypto.chacha20 import ChaCha20
11
12# ---------------------------------------------------------------------------
13# Header / packet constants
14# ---------------------------------------------------------------------------
15
SHORT_HEADER_SIZE = 16  # dest_conn_id(8) + pkt_num(4) + type(1) + flags(3)
# Long header layout per the SSU2 specification (TODO confirm against the
# long-header codec, which is not visible in this part of the file):
#   dest_conn_id(8) + pkt_num(4) + type(1) + version(1) + net_id(1) + flag(1)
#   + src_conn_id(8) + token(8) = 32.
# It is not "short header + 18 bytes": version/net_id/flag occupy the three
# bytes that the short header uses for flags.
LONG_HEADER_SIZE = 32
SESSION_HEADER_SIZE = 64  # long(32) + ephemeral_key(32)
MAC_LEN = 16  # ChaCha20-Poly1305 tag
KEY_LEN = 32  # X25519 key size; also the length required of header-protection keys
HEADER_PROT_SAMPLE_LEN = 24  # Bytes needed for header protection sampling
PROTOCOL_VERSION = 2
23
24
@enum.unique
class SSU2State(enum.Enum):
    """Connection lifecycle states used by ``SSU2SessionState``.

    Members are plain ``Enum`` values (not integers); the numeric values are
    unique and run from 0 to 5.
    """

    # Initial state of a freshly constructed SSU2SessionState.
    UNKNOWN = 0
    # Entered by SSU2SessionState.send_token_request().
    TOKEN_REQUEST_SENT = 1
    # Entered by SSU2SessionState.send_session_request().
    SESSION_REQUEST_SENT = 2
    # Entered by SSU2SessionState.receive_session_created().
    SESSION_CONFIRMED = 3
    # Entered by SSU2SessionState.confirm().
    ESTABLISHED = 4
    # Entered by SSU2SessionState.terminate().
    TERMINATED = 5
32
33
@enum.unique
class BlockType(enum.IntEnum):
    """One-byte type codes written at the start of every ``SSU2Block``.

    Being an ``IntEnum``, members can be handed straight to ``struct.pack``
    and compared with plain integers.
    """

    # NOTE(review): these numeric codes do not appear to match the block
    # numbers in the published SSU2 specification -- verify before relying
    # on wire interoperability with other implementations.
    DATA = 0
    ACK = 1
    ADDRESS = 2
    RELAY_REQUEST = 3
    RELAY_RESPONSE = 4
    RELAY_INTRO = 5
    PEER_TEST = 6
    I2NP = 7
    FIRST_FRAGMENT = 8
    FOLLOW_ON_FRAGMENT = 9
    PADDING = 254
    TERMINATION = 255
47
48
class SSU2Header:
    """Fixed-size SSU2 packet header.

    Wire layout, network byte order:
    dest_conn_id(8) + pkt_num(4) + type(1) + version(1) + net_id(2).
    """

    SIZE = 16

    # Pre-compiled codec for the layout above (8 + 4 + 1 + 1 + 2 = 16 bytes).
    _CODEC = struct.Struct("!QIBBH")

    def __init__(self, dest_conn_id: int, pkt_num: int,
                 header_type: int = 0, version: int = 2, net_id: int = 2):
        """Store the five header fields as given; no range checks are done."""
        self.dest_conn_id = dest_conn_id
        self.pkt_num = pkt_num
        self.header_type = header_type
        self.version = version
        self.net_id = net_id

    def to_bytes(self) -> bytes:
        """Serialize the header to its 16-byte wire form.

        Raises:
            struct.error: if a field does not fit its fixed-width slot.
        """
        fields = (
            self.dest_conn_id,
            self.pkt_num,
            self.header_type,
            self.version,
            self.net_id,
        )
        return self._CODEC.pack(*fields)

    @classmethod
    def from_bytes(cls, data: bytes) -> "SSU2Header":
        """Parse a header from the first ``SIZE`` bytes of *data*.

        Bytes beyond ``SIZE`` are ignored.

        Raises:
            struct.error: if *data* holds fewer than ``SIZE`` bytes.
        """
        window = data[:cls.SIZE]
        return cls(*cls._CODEC.unpack(window))
71
72
class SSU2Block:
    """SSU2 block: type(1) + length(2) + payload.

    The length field is an unsigned 16-bit big-endian count of payload bytes.
    """

    def __init__(self, block_type: "BlockType", payload: bytes):
        """Store the block type and raw payload as given."""
        self.block_type = block_type
        self.payload = payload

    def to_bytes(self) -> bytes:
        """Serialize to the 3-byte block header followed by the payload.

        Raises:
            struct.error: if the payload is longer than 65535 bytes or the
                block type does not fit in one byte.
        """
        return struct.pack("!BH", self.block_type, len(self.payload)) + self.payload

    @staticmethod
    def _read_exact(stream, count: int) -> bytes:
        """Read up to *count* bytes, looping over short reads until EOF."""
        chunks = []
        remaining = count
        while remaining > 0:
            chunk = stream.read(remaining)
            if not chunk:  # EOF (or no data available): stop, caller validates
                break
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)

    @classmethod
    def from_bytes(cls, data: bytes) -> "SSU2Block":
        """Parse one block from the start of *data*.

        Bytes after the declared payload are ignored.

        Raises:
            struct.error: if *data* holds fewer than 3 header bytes.
            ValueError: if the declared length exceeds the bytes available,
                or the type byte is not a known ``BlockType``.
        """
        block_type, length = struct.unpack("!BH", data[:3])
        payload = data[3:3 + length]
        # A slice never fails, so a truncated block must be detected
        # explicitly instead of being returned with a short payload.
        if len(payload) != length:
            raise ValueError(
                f"truncated SSU2 block: declared {length} payload bytes, "
                f"got {len(payload)}"
            )
        return cls(BlockType(block_type), payload)

    @classmethod
    def from_stream(cls, stream) -> "SSU2Block":
        """Read one block from a binary file-like object with ``read(n)``.

        Raises:
            struct.error: if the stream ends before the 3-byte header.
            ValueError: if the stream ends before the declared payload is
                complete, or the type byte is not a known ``BlockType``.
        """
        header = cls._read_exact(stream, 3)
        block_type, length = struct.unpack("!BH", header)
        payload = cls._read_exact(stream, length)
        if len(payload) != length:
            raise ValueError(
                f"truncated SSU2 block: declared {length} payload bytes, "
                f"got {len(payload)}"
            )
        return cls(BlockType(block_type), payload)
94
95
class SSU2SessionState:
    """Tracks handshake progress and peer-test / path-validation flags.

    Only ``receive_session_created`` and ``confirm`` check the current state;
    ``send_token_request``, ``send_session_request`` and ``terminate`` set the
    new state unconditionally.
    """

    def __init__(self):
        """Start in ``SSU2State.UNKNOWN`` with no test or challenge active."""
        self.state = SSU2State.UNKNOWN
        self.peer_test_active = False
        self.path_challenge_pending = False
        # Nonce issued by the most recent start_path_validation(), if any.
        self._path_nonce: bytes | None = None

    def send_token_request(self):
        """Record that a token request was sent."""
        self.state = SSU2State.TOKEN_REQUEST_SENT

    def send_session_request(self):
        """Record that a session request was sent."""
        self.state = SSU2State.SESSION_REQUEST_SENT

    def receive_session_created(self):
        """Move from SESSION_REQUEST_SENT to SESSION_CONFIRMED.

        Raises:
            RuntimeError: if the current state is not SESSION_REQUEST_SENT.
        """
        current = self.state
        if current != SSU2State.SESSION_REQUEST_SENT:
            raise RuntimeError(f"Cannot receive session created from {current}")
        self.state = SSU2State.SESSION_CONFIRMED

    def confirm(self):
        """Move from SESSION_CONFIRMED to ESTABLISHED.

        Raises:
            RuntimeError: if the current state is not SESSION_CONFIRMED.
        """
        current = self.state
        if current != SSU2State.SESSION_CONFIRMED:
            raise RuntimeError(f"Cannot confirm from {current}")
        self.state = SSU2State.ESTABLISHED

    def terminate(self):
        """Mark the session terminated, whatever state it was in."""
        self.state = SSU2State.TERMINATED

    def start_peer_test(self):
        """Flag a peer test as active."""
        self.peer_test_active = True

    def start_path_validation(self) -> bytes:
        """Begin path validation and return a fresh 8-byte random nonce.

        The nonce is also kept on the instance and the pending flag is set.
        """
        nonce = os.urandom(8)
        self._path_nonce = nonce
        self.path_challenge_pending = True
        return nonce
131
132
133# ---------------------------------------------------------------------------
134# Header protection
135# ---------------------------------------------------------------------------
136
class SSU2HeaderProtection:
    """ChaCha20-based header protection.

    Ported from net.i2p.router.transport.udp.SSU2Header.

    The first 8 bytes of the header are XORed with a ChaCha20 keystream
    derived from a key and a nonce sampled from the encrypted payload.
    This prevents on-path observers from reading connection IDs.

    All operations mutate the packet in place and are their own inverse,
    because XOR with the same keystream is symmetric.

    NOTE(review): the nonce here is sampled from the 12 bytes immediately
    after the header; the published SSU2 specification appears to sample
    from the trailing bytes of the packet instead -- confirm before relying
    on interoperability with other implementations.
    """

    # Number of body bytes, directly after the header, sampled as the nonce.
    _NONCE_SAMPLE_LEN = 12
    # Width in bytes of each XOR-masked header region.
    _MASK_LEN = 8

    def __init__(self, header_key1: bytes, header_key2: bytes):
        """Two keys: one for short headers, one for long/handshake headers.

        Keys longer than KEY_LEN are truncated to their first KEY_LEN bytes.

        Args:
            header_key1: 32-byte key used for short header protection
                         and the first region of long headers.
            header_key2: 32-byte key used for the second region of long headers.

        Raises:
            ValueError: if either key is shorter than KEY_LEN bytes.
        """
        if len(header_key1) < KEY_LEN:
            raise ValueError(f"header_key1 must be at least {KEY_LEN} bytes")
        if len(header_key2) < KEY_LEN:
            raise ValueError(f"header_key2 must be at least {KEY_LEN} bytes")
        self._key1 = header_key1[:KEY_LEN]
        self._key2 = header_key2[:KEY_LEN]

    @staticmethod
    def _xor_region(packet: bytearray, offset: int, length: int,
                    keystream: bytes) -> None:
        """XOR a region of *packet* in-place with *keystream*."""
        for i in range(length):
            packet[offset + i] ^= keystream[i]

    @classmethod
    def _sample_nonce(cls, packet: bytearray, header_size: int) -> bytes:
        """Return the 16-byte ChaCha20 nonce sampled just after the header.

        The packet length is validated here, before any byte is modified, so
        a short packet is rejected cleanly instead of failing part-way
        through the XOR and leaving the header half-masked.

        Raises:
            ValueError: if *packet* is too short to hold the header plus the
                12 sampled bytes.
        """
        needed = header_size + cls._NONCE_SAMPLE_LEN
        if len(packet) < needed:
            raise ValueError(
                f"packet too short for header protection: need at least "
                f"{needed} bytes, got {len(packet)}"
            )
        sample = bytes(packet[header_size:needed])
        # Use counter=0 by passing a 16-byte nonce with 4-byte zero prefix
        return b"\x00\x00\x00\x00" + sample

    @classmethod
    def _keystream(cls, key: bytes, full_nonce: bytes) -> bytes:
        """Return the header mask: ChaCha20 applied to 8 zero bytes.

        Presumably the project's ``ChaCha20.encrypt`` returns output of the
        same length as its input -- verify against that class.
        """
        return ChaCha20.encrypt(key, full_nonce, b"\x00" * cls._MASK_LEN)

    def encrypt_short_header(self, packet: bytearray) -> None:
        """Encrypt short header in-place.

        Nonce is sampled from packet[SHORT_HEADER_SIZE:SHORT_HEADER_SIZE+12]
        (the first 12 bytes of the encrypted body).
        Keystream = ChaCha20(key1, nonce, counter=0)[:8]
        XOR first 8 bytes of header with keystream.

        Raises:
            ValueError: if *packet* is shorter than SHORT_HEADER_SIZE + 12.
        """
        full_nonce = self._sample_nonce(packet, SHORT_HEADER_SIZE)
        keystream = self._keystream(self._key1, full_nonce)
        self._xor_region(packet, 0, self._MASK_LEN, keystream)

    def decrypt_short_header(self, packet: bytearray) -> None:
        """Decrypt short header in-place (same operation as encrypt — XOR is symmetric)."""
        self.encrypt_short_header(packet)

    def encrypt_long_header(self, packet: bytearray) -> None:
        """Encrypt long header in-place.

        For long headers two regions are protected:
          - bytes [0:8]   XORed with keystream from key1
          - bytes [12:20] XORed with keystream from key2
        Nonce sampled from packet[LONG_HEADER_SIZE:LONG_HEADER_SIZE+12].

        Raises:
            ValueError: if *packet* is shorter than LONG_HEADER_SIZE + 12.
        """
        full_nonce = self._sample_nonce(packet, LONG_HEADER_SIZE)

        # Derive both masks before touching the packet so that a failure in
        # either derivation cannot leave only one region masked.
        ks1 = self._keystream(self._key1, full_nonce)
        ks2 = self._keystream(self._key2, full_nonce)

        self._xor_region(packet, 0, self._MASK_LEN, ks1)
        self._xor_region(packet, 12, self._MASK_LEN, ks2)

    def decrypt_long_header(self, packet: bytearray) -> None:
        """Decrypt long header in-place (same operation as encrypt)."""
        self.encrypt_long_header(packet)