A Python port of the Invisible Internet Project (I2P)
1"""SipHash-2-4 implementation for I2P NTCP2 length obfuscation.
2
3Implements the SipHash-2-4 PRF as specified by Aumasson and Bernstein,
4plus a ratcheting wrapper used in I2P's NTCP2 transport protocol.
5
6Reference: https://www.aumasson.jp/siphash/siphash.pdf
7"""
8
# All-ones 64-bit mask: AND-ing with it truncates an unbounded Python int
# to an unsigned 64-bit value.
MASK64 = 0xFFFFFFFFFFFFFFFF
10
11
def _rotl64(x: int, b: int) -> int:
    """Rotate a 64-bit value left by ``b`` bit positions.

    Args:
        x: Value to rotate. Only its low 64 bits are used, so a negative or
            oversized int is treated as its 64-bit two's-complement pattern.
        b: Rotation distance in bits; reduced modulo 64.

    Returns:
        The rotated value as an unsigned 64-bit integer.
    """
    # Truncate first: Python ints are unbounded and ``>>`` sign-extends
    # negative values, so stray high bits would otherwise be OR-ed back into
    # the low end of the result.
    x &= MASK64
    # Keep both shift counts in range; a distance above 64 would otherwise
    # produce a negative right-shift count and raise ValueError.
    b %= 64
    return ((x << b) | (x >> (64 - b))) & MASK64
15
16
def siphash_2_4(k0: int, k1: int, data: bytes) -> int:
    """Compute the SipHash-2-4 of ``data`` under the 128-bit key (k0, k1).

    Args:
        k0: First 64-bit key half. Only the low 64 bits are used, so a
            signed (negative) value is read as its two's-complement pattern.
        k1: Second 64-bit key half, treated the same way as ``k0``.
        data: Arbitrary-length message bytes.

    Returns:
        64-bit unsigned hash value.
    """
    # Normalise the key halves: without this, a negative or over-wide key
    # leaks extra high bits into the state and corrupts the rotations.
    k0 &= MASK64
    k1 &= MASK64

    # Initial state: key halves XOR-ed with the ASCII constant
    # "somepseudorandomlygeneratedbytes", split into four 64-bit words.
    v0 = k0 ^ 0x736F6D6570736575
    v1 = k1 ^ 0x646F72616E646F6D
    v2 = k0 ^ 0x6C7967656E657261
    v3 = k1 ^ 0x7465646279746573

    length = len(data)
    # Offset where the trailing partial block (0..7 bytes) begins.
    tail_start = length - (length % 8)

    # Compression: absorb each full 8-byte little-endian word.
    for offset in range(0, tail_start, 8):
        m = int.from_bytes(data[offset : offset + 8], "little")
        v3 ^= m
        for _ in range(2):  # the "2" in SipHash-2-4
            v0, v1, v2, v3 = _sipround(v0, v1, v2, v3)
        v0 ^= m

    # Final word: leftover bytes in the low positions (zero padded) with
    # the message length modulo 256 in the most significant byte. The tail
    # is at most 7 bytes, so it never overlaps the length byte.
    last = ((length & 0xFF) << 56) | int.from_bytes(data[tail_start:], "little")

    v3 ^= last
    for _ in range(2):
        v0, v1, v2, v3 = _sipround(v0, v1, v2, v3)
    v0 ^= last

    # Finalization: flip the low byte of v2, then run 4 rounds (the "4").
    v2 ^= 0xFF
    for _ in range(4):
        v0, v1, v2, v3 = _sipround(v0, v1, v2, v3)

    return (v0 ^ v1 ^ v2 ^ v3) & MASK64
62
63
def _sipround(v0: int, v1: int, v2: int, v3: int) -> tuple[int, int, int, int]:
    """Apply one SipRound (add-rotate-xor mixing step) to the state.

    Args:
        v0: First 64-bit state word.
        v1: Second 64-bit state word.
        v2: Third 64-bit state word.
        v3: Fourth 64-bit state word.

    Returns:
        The four updated state words, in the same order.
    """
    a, b, c, d = v0, v1, v2, v3

    # First half: mix the (a, b) pair and the (c, d) pair separately.
    a = (a + b) & MASK64
    b = _rotl64(b, 13) ^ a
    a = _rotl64(a, 32)

    c = (c + d) & MASK64
    d = _rotl64(d, 16) ^ c

    # Second half: cross the pairs so every word influences every other.
    a = (a + d) & MASK64
    d = _rotl64(d, 21) ^ a

    c = (c + b) & MASK64
    b = _rotl64(b, 17) ^ c
    c = _rotl64(c, 32)

    return a, b, c, d
85
86
class SipHashRatchet:
    """Ratcheting SipHash for I2P NTCP2 frame length obfuscation.

    Each call to next() hashes the current IV (as 8 little-endian bytes)
    and replaces the IV with the hash output. Two instances built from the
    same key and initial IV therefore produce the same stream of values,
    provided both sides advance the ratchet the same number of times.
    """

    def __init__(self, k0: int, k1: int, iv: int) -> None:
        """Store the key halves and initial IV, each truncated to 64 bits.

        Args:
            k0: First 64-bit key half.
            k1: Second 64-bit key half.
            iv: Initial 64-bit IV.
        """
        # Mask the keys the same way as the IV so that signed or over-wide
        # inputs are consistently read as 64-bit two's-complement patterns.
        self._k0 = k0 & MASK64
        self._k1 = k1 & MASK64
        self._iv = iv & MASK64

    def next(self) -> int:
        """Compute SipHash of the current IV, advance the IV, return the hash."""
        h = siphash_2_4(self._k0, self._k1, self._iv.to_bytes(8, "little"))
        self._iv = h
        return h

    def obfuscate_length(self, length: int) -> bytes:
        """XOR a 2-byte big-endian length with the low 2 bytes of next().

        Args:
            length: Frame length (0..65535).

        Returns:
            2 bytes of obfuscated length.

        Raises:
            OverflowError: If length does not fit in 2 unsigned bytes. The
                ratchet is not advanced in that case.
        """
        # Encode before advancing: if the length is out of range this
        # raises while the IV is still untouched, so a rejected call cannot
        # desynchronize this ratchet from the peer's.
        length_bytes = length.to_bytes(2, "big")
        mask_bytes = (self.next() & 0xFFFF).to_bytes(2, "big")
        return bytes(a ^ b for a, b in zip(length_bytes, mask_bytes))

    def deobfuscate_length(self, data: bytes) -> int:
        """Reverse obfuscate_length to recover the original length.

        Args:
            data: Exactly 2 bytes of obfuscated length.

        Returns:
            Original frame length.

        Raises:
            ValueError: If data is not exactly 2 bytes. The ratchet is not
                advanced in that case.
        """
        # Validate before advancing, for the same reason as above.
        if len(data) != 2:
            raise ValueError(f"Expected 2 bytes, got {len(data)}")
        mask_bytes = (self.next() & 0xFFFF).to_bytes(2, "big")
        plain = bytes(a ^ b for a, b in zip(data, mask_bytes))
        return int.from_bytes(plain, "big")