"""SipHash-2-4 implementation for I2P NTCP2 length obfuscation. Implements the SipHash-2-4 PRF as specified by Aumasson and Bernstein, plus a ratcheting wrapper used in I2P's NTCP2 transport protocol. Reference: https://www.aumasson.jp/siphash/siphash.pdf """ MASK64 = 0xFFFFFFFFFFFFFFFF def _rotl64(x: int, b: int) -> int: """Rotate left a 64-bit value.""" return ((x << b) | (x >> (64 - b))) & MASK64 def siphash_2_4(k0: int, k1: int, data: bytes) -> int: """Compute SipHash-2-4. Args: k0: First 64-bit key half (unsigned). k1: Second 64-bit key half (unsigned). data: Arbitrary-length message bytes. Returns: 64-bit unsigned hash value. """ v0 = k0 ^ 0x736F6D6570736575 v1 = k1 ^ 0x646F72616E646F6D v2 = k0 ^ 0x6C7967656E657261 v3 = k1 ^ 0x7465646279746573 length = len(data) # Process full 8-byte blocks num_blocks = length // 8 for i in range(num_blocks): m = int.from_bytes(data[i * 8 : i * 8 + 8], "little") v3 ^= m # 2 compression rounds for _ in range(2): v0, v1, v2, v3 = _sipround(v0, v1, v2, v3) v0 ^= m # Final block: remaining bytes padded with zeros, length mod 256 in MSB remaining = data[num_blocks * 8 :] last = (length & 0xFF) << 56 for i, byte in enumerate(remaining): last |= byte << (8 * i) v3 ^= last for _ in range(2): v0, v1, v2, v3 = _sipround(v0, v1, v2, v3) v0 ^= last # Finalization v2 ^= 0xFF for _ in range(4): v0, v1, v2, v3 = _sipround(v0, v1, v2, v3) return (v0 ^ v1 ^ v2 ^ v3) & MASK64 def _sipround(v0: int, v1: int, v2: int, v3: int) -> tuple[int, int, int, int]: """One SipRound.""" v0 = (v0 + v1) & MASK64 v1 = _rotl64(v1, 13) v1 ^= v0 v0 = _rotl64(v0, 32) v2 = (v2 + v3) & MASK64 v3 = _rotl64(v3, 16) v3 ^= v2 v0 = (v0 + v3) & MASK64 v3 = _rotl64(v3, 21) v3 ^= v0 v2 = (v2 + v1) & MASK64 v1 = _rotl64(v1, 17) v1 ^= v2 v2 = _rotl64(v2, 32) return v0, v1, v2, v3 class SipHashRatchet: """Ratcheting SipHash for I2P NTCP2 frame length obfuscation. Each call to next() hashes the current IV (as 8-byte LE) and advances the IV to the hash output. This produces a deterministic stream of pseudo-random values from a shared key and initial IV. """ def __init__(self, k0: int, k1: int, iv: int) -> None: self._k0 = k0 self._k1 = k1 self._iv = iv & MASK64 def next(self) -> int: """Compute SipHash of current IV, advance IV, return hash.""" data = self._iv.to_bytes(8, "little") h = siphash_2_4(self._k0, self._k1, data) self._iv = h return h def obfuscate_length(self, length: int) -> bytes: """XOR a 2-byte big-endian length with the low 2 bytes of next(). Args: length: Frame length (0..65535). Returns: 2 bytes of obfuscated length. """ mask = self.next() length_bytes = length.to_bytes(2, "big") mask_bytes = (mask & 0xFFFF).to_bytes(2, "big") return bytes(a ^ b for a, b in zip(length_bytes, mask_bytes)) def deobfuscate_length(self, data: bytes) -> int: """Reverse obfuscate_length to recover the original length. Args: data: Exactly 2 bytes of obfuscated length. Returns: Original frame length. Raises: ValueError: If data is not exactly 2 bytes. """ if len(data) != 2: raise ValueError(f"Expected 2 bytes, got {len(data)}") mask = self.next() mask_bytes = (mask & 0xFFFF).to_bytes(2, "big") plain = bytes(a ^ b for a, b in zip(data, mask_bytes)) return int.from_bytes(plain, "big")