A Python port of the Invisible Internet Project (I2P)
at main 138 lines 3.8 kB view raw
"""SipHash-2-4 implementation for I2P NTCP2 length obfuscation.

Implements the SipHash-2-4 PRF as specified by Aumasson and Bernstein,
plus a ratcheting wrapper used in I2P's NTCP2 transport protocol.

Reference: https://www.aumasson.jp/siphash/siphash.pdf
"""

MASK64 = 0xFFFFFFFFFFFFFFFF


def _rotl64(x: int, b: int) -> int:
    """Rotate a 64-bit value left by ``b`` bits.

    ``x`` must already be in the range 0..2**64-1: the right shift is
    applied before masking, so stray high (or sign) bits would leak into
    the result.
    """
    return ((x << b) | (x >> (64 - b))) & MASK64


def _sipround(v0: int, v1: int, v2: int, v3: int) -> tuple[int, int, int, int]:
    """Apply one SipRound to the four 64-bit state words.

    Additions are reduced modulo 2**64; the returned words are all in the
    range 0..2**64-1 provided the inputs were.
    """
    v0 = (v0 + v1) & MASK64
    v1 = _rotl64(v1, 13)
    v1 ^= v0
    v0 = _rotl64(v0, 32)

    v2 = (v2 + v3) & MASK64
    v3 = _rotl64(v3, 16)
    v3 ^= v2

    v0 = (v0 + v3) & MASK64
    v3 = _rotl64(v3, 21)
    v3 ^= v0

    v2 = (v2 + v1) & MASK64
    v1 = _rotl64(v1, 17)
    v1 ^= v2
    v2 = _rotl64(v2, 32)

    return v0, v1, v2, v3


def siphash_2_4(k0: int, k1: int, data: bytes) -> int:
    """Compute SipHash-2-4.

    Args:
        k0: First 64-bit key half. Reduced modulo 2**64, so a signed
            64-bit representation of the same bit pattern is accepted.
        k1: Second 64-bit key half. Reduced modulo 2**64 like ``k0``.
        data: Arbitrary-length message bytes.

    Returns:
        64-bit unsigned hash value.
    """
    # Normalise the keys before they enter the state: _rotl64 relies on
    # its input fitting in 64 bits, so a negative or over-wide key would
    # otherwise corrupt the very first round.
    k0 &= MASK64
    k1 &= MASK64

    v0 = k0 ^ 0x736F6D6570736575  # ASCII "somepseu"
    v1 = k1 ^ 0x646F72616E646F6D  # ASCII "dorandom"
    v2 = k0 ^ 0x6C7967656E657261  # ASCII "lygenera"
    v3 = k1 ^ 0x7465646279746573  # ASCII "tedbytes"

    length = len(data)
    tail_start = length - (length % 8)

    # Compression: absorb each full 8-byte little-endian word.
    for offset in range(0, tail_start, 8):
        m = int.from_bytes(data[offset : offset + 8], "little")
        v3 ^= m
        for _ in range(2):
            v0, v1, v2, v3 = _sipround(v0, v1, v2, v3)
        v0 ^= m

    # Final word: the 0..7 leftover bytes (little-endian, zero-padded)
    # with the message length modulo 256 in the most significant byte.
    last = ((length & 0xFF) << 56) | int.from_bytes(data[tail_start:], "little")

    v3 ^= last
    for _ in range(2):
        v0, v1, v2, v3 = _sipround(v0, v1, v2, v3)
    v0 ^= last

    # Finalization: 4 rounds after flipping the low byte of v2.
    v2 ^= 0xFF
    for _ in range(4):
        v0, v1, v2, v3 = _sipround(v0, v1, v2, v3)

    return (v0 ^ v1 ^ v2 ^ v3) & MASK64


class SipHashRatchet:
    """Ratcheting SipHash for I2P NTCP2 frame length obfuscation.

    Each call to next() hashes the current IV (as 8-byte LE) and
    advances the IV to the hash output. This produces a deterministic
    stream of pseudo-random values from a shared key and initial IV.

    obfuscate_length() and deobfuscate_length() each consume exactly one
    value from that stream, so two ratchets built from the same key and
    IV stay in step as long as they process the same number of frames.
    """

    def __init__(self, k0: int, k1: int, iv: int) -> None:
        """Store the key halves and initial IV, each reduced to 64 bits."""
        self._k0 = k0 & MASK64
        self._k1 = k1 & MASK64
        self._iv = iv & MASK64

    def next(self) -> int:
        """Compute SipHash of current IV, advance IV, return hash."""
        h = siphash_2_4(self._k0, self._k1, self._iv.to_bytes(8, "little"))
        self._iv = h
        return h

    def obfuscate_length(self, length: int) -> bytes:
        """XOR a 2-byte big-endian length with the low 2 bytes of next().

        Args:
            length: Frame length (0..65535).

        Returns:
            2 bytes of obfuscated length.

        Raises:
            OverflowError: If length does not fit in 2 unsigned bytes.
                The ratchet is not advanced in that case.
        """
        # Encode first so an out-of-range length fails before the ratchet
        # state is consumed.
        length_bytes = length.to_bytes(2, "big")
        mask_bytes = (self.next() & 0xFFFF).to_bytes(2, "big")
        return bytes(a ^ b for a, b in zip(length_bytes, mask_bytes))

    def deobfuscate_length(self, data: bytes) -> int:
        """Reverse obfuscate_length to recover the original length.

        Args:
            data: Exactly 2 bytes of obfuscated length.

        Returns:
            Original frame length.

        Raises:
            ValueError: If data is not exactly 2 bytes. The ratchet is
                not advanced in that case.
        """
        if len(data) != 2:
            raise ValueError(f"Expected 2 bytes, got {len(data)}")
        mask_bytes = (self.next() & 0xFFFF).to_bytes(2, "big")
        plain = bytes(a ^ b for a, b in zip(data, mask_bytes))
        return int.from_bytes(plain, "big")