A Python port of the Invisible Internet Project (I2P)
at main 258 lines 9.3 kB view raw
1"""Tests for tunnel hop encryption/decryption. 2 3Covers: TunnelLayerEncryptor, TunnelLayerDecryptor, 4 OutboundTunnelEncryptor, InboundTunnelDecryptor, 5 BuildRecordEncryptor, BuildReplyDecryptor. 6""" 7 8import os 9import hashlib 10 11import pytest 12 13from i2p_tunnel.crypto import ( 14 TunnelLayerEncryptor, 15 TunnelLayerDecryptor, 16 OutboundTunnelEncryptor, 17 InboundTunnelDecryptor, 18 BuildRecordEncryptor, 19 BuildReplyDecryptor, 20) 21from i2p_crypto.aes import AESEngine 22from i2p_crypto.elgamal import ElGamalEngine 23from i2p_tunnel.builder import BuildRecord 24 25 26# --------------------------------------------------------------------------- 27# Helpers 28# --------------------------------------------------------------------------- 29 30def _random_key() -> bytes: 31 """32-byte random AES key.""" 32 return os.urandom(32) 33 34 35def _random_iv_key() -> bytes: 36 """32-byte random IV key (first 16 bytes used as IV).""" 37 return os.urandom(32) 38 39 40TUNNEL_DATA_SIZE = 1024 # Standard I2P tunnel data size 41 42 43# --------------------------------------------------------------------------- 44# TunnelLayerEncryptor / TunnelLayerDecryptor — single layer 45# --------------------------------------------------------------------------- 46 47class TestSingleLayerRoundtrip: 48 """encrypt_layer -> decrypt_layer must recover the original data.""" 49 50 def test_roundtrip_1024_bytes(self): 51 plaintext = os.urandom(TUNNEL_DATA_SIZE) 52 layer_key = _random_key() 53 iv_key = _random_iv_key() 54 55 ciphertext = TunnelLayerEncryptor.encrypt_layer(plaintext, layer_key, iv_key) 56 assert ciphertext != plaintext 57 assert len(ciphertext) == TUNNEL_DATA_SIZE 58 59 recovered = TunnelLayerDecryptor.decrypt_layer(ciphertext, layer_key, iv_key) 60 assert recovered == plaintext 61 62 def test_roundtrip_multiple_of_16(self): 63 """Any multiple-of-16 data should work, not just 1024.""" 64 for size in (16, 32, 256, 512): 65 plaintext = os.urandom(size) 66 layer_key = _random_key() 67 iv_key = _random_iv_key() 68 69 ct = TunnelLayerEncryptor.encrypt_layer(plaintext, layer_key, iv_key) 70 assert len(ct) == size 71 pt = TunnelLayerDecryptor.decrypt_layer(ct, layer_key, iv_key) 72 assert pt == plaintext 73 74 def test_different_keys_different_ciphertext(self): 75 """Different layer keys must produce different ciphertext.""" 76 plaintext = os.urandom(TUNNEL_DATA_SIZE) 77 iv_key = _random_iv_key() 78 key_a = _random_key() 79 key_b = _random_key() 80 81 ct_a = TunnelLayerEncryptor.encrypt_layer(plaintext, key_a, iv_key) 82 ct_b = TunnelLayerEncryptor.encrypt_layer(plaintext, key_b, iv_key) 83 assert ct_a != ct_b 84 85 86# --------------------------------------------------------------------------- 87# OutboundTunnelEncryptor — multi-hop 88# --------------------------------------------------------------------------- 89 90class TestOutboundTunnelEncryptor: 91 """Outbound encryption applies layers in reverse; each hop peels one.""" 92 93 def test_three_hop_peel(self): 94 """Simulate 3-hop outbound tunnel: encrypt, then each hop decrypts.""" 95 plaintext = os.urandom(TUNNEL_DATA_SIZE) 96 97 # 3 hops: gateway (0), middle (1), endpoint (2) 98 hop_keys = [(_random_key(), _random_iv_key()) for _ in range(3)] 99 100 encrypted = OutboundTunnelEncryptor.encrypt(plaintext, hop_keys) 101 assert encrypted != plaintext 102 103 # Each hop decrypts one layer in forward order 104 data = encrypted 105 for layer_key, iv_key in hop_keys: 106 data = TunnelLayerDecryptor.decrypt_layer(data, layer_key, iv_key) 107 108 assert data == plaintext 109 110 def test_single_hop(self): 111 plaintext = os.urandom(TUNNEL_DATA_SIZE) 112 hop_keys = [(_random_key(), _random_iv_key())] 113 114 encrypted = OutboundTunnelEncryptor.encrypt(plaintext, hop_keys) 115 decrypted = TunnelLayerDecryptor.decrypt_layer( 116 encrypted, hop_keys[0][0], hop_keys[0][1] 117 ) 118 assert decrypted == plaintext 119 120 121# --------------------------------------------------------------------------- 122# InboundTunnelDecryptor — multi-hop 123# --------------------------------------------------------------------------- 124 125class TestInboundTunnelDecryptor: 126 """Inbound decryption peels layers in reverse order to match hop encryption.""" 127 128 def test_three_hop_roundtrip(self): 129 """Each hop encrypts in forward order; endpoint decrypts in reverse. 130 131 In an inbound tunnel, each hop encrypts a layer as the message 132 passes through (hop 0 first, then hop 1, then hop 2). The 133 endpoint decrypts in reverse order (hop 2 first) to recover 134 the original plaintext. 135 """ 136 plaintext = os.urandom(TUNNEL_DATA_SIZE) 137 138 hop_keys = [(_random_key(), _random_iv_key()) for _ in range(3)] 139 140 # Simulate each hop encrypting in forward order 141 data = plaintext 142 for layer_key, iv_key in hop_keys: 143 data = TunnelLayerEncryptor.encrypt_layer(data, layer_key, iv_key) 144 145 # Tunnel endpoint decrypts all layers in forward order 146 recovered = InboundTunnelDecryptor.decrypt(data, hop_keys) 147 assert recovered == plaintext 148 149 def test_single_hop_inbound(self): 150 """Single hop: one encrypt + one decrypt is a simple roundtrip.""" 151 plaintext = os.urandom(TUNNEL_DATA_SIZE) 152 hop_keys = [(_random_key(), _random_iv_key())] 153 154 encrypted = TunnelLayerEncryptor.encrypt_layer( 155 plaintext, hop_keys[0][0], hop_keys[0][1] 156 ) 157 recovered = InboundTunnelDecryptor.decrypt(encrypted, hop_keys) 158 assert recovered == plaintext 159 160 161# --------------------------------------------------------------------------- 162# BuildRecordEncryptor — ElGamal 163# --------------------------------------------------------------------------- 164 165class TestBuildRecordEncryptor: 166 """Build record encrypt/decrypt roundtrip using ElGamal.""" 167 168 def test_roundtrip(self): 169 pub_key, priv_key = ElGamalEngine.generate_keypair() 170 171 # Create a 222-byte build record 172 record = BuildRecord( 173 receive_tunnel_id=12345, 174 our_ident=os.urandom(32), 175 next_tunnel_id=67890, 176 next_ident=os.urandom(32), 177 layer_key=os.urandom(32), 178 iv_key=os.urandom(32), 179 reply_key=os.urandom(32), 180 reply_iv=os.urandom(16), 181 is_gateway=True, 182 is_endpoint=False, 183 ) 184 record_bytes = record.to_bytes() 185 assert len(record_bytes) == BuildRecord.SIZE # 222 186 187 encrypted = BuildRecordEncryptor.encrypt_record(record_bytes, pub_key) 188 assert len(encrypted) == 528 189 190 # Decrypt: strip padding and ElGamal-decrypt 191 # The first 514 bytes are the ElGamal ciphertext 192 decrypted = ElGamalEngine.decrypt(encrypted[:514], priv_key) 193 assert decrypted is not None 194 assert decrypted == record_bytes 195 196 def test_output_size(self): 197 pub_key, _ = ElGamalEngine.generate_keypair() 198 record_bytes = os.urandom(222) 199 encrypted = BuildRecordEncryptor.encrypt_record(record_bytes, pub_key) 200 assert len(encrypted) == 528 201 202 203# --------------------------------------------------------------------------- 204# BuildReplyDecryptor 205# --------------------------------------------------------------------------- 206 207class TestBuildReplyDecryptor: 208 """Build reply AES encrypt→decrypt roundtrip.""" 209 210 def test_roundtrip(self): 211 reply_key = _random_key() 212 reply_iv = os.urandom(16) 213 # Reply record is typically 496 bytes, but must be multiple of 16 214 reply_data = os.urandom(496) 215 216 # Encrypt (simulating what the hop does) 217 encrypted = AESEngine.encrypt(reply_data, reply_key, reply_iv[:16]) 218 219 decrypted = BuildReplyDecryptor.decrypt_reply(encrypted, reply_key, reply_iv) 220 assert decrypted == reply_data 221 222 def test_different_keys_different_output(self): 223 reply_iv = os.urandom(16) 224 reply_data = os.urandom(496) 225 226 key_a = _random_key() 227 key_b = _random_key() 228 229 enc_a = AESEngine.encrypt(reply_data, key_a, reply_iv[:16]) 230 enc_b = AESEngine.encrypt(reply_data, key_b, reply_iv[:16]) 231 232 dec_a = BuildReplyDecryptor.decrypt_reply(enc_a, key_a, reply_iv) 233 dec_b = BuildReplyDecryptor.decrypt_reply(enc_b, key_b, reply_iv) 234 235 assert dec_a == dec_b == reply_data 236 assert enc_a != enc_b 237 238 239# --------------------------------------------------------------------------- 240# Data alignment 241# --------------------------------------------------------------------------- 242 243class TestDataAlignment: 244 """Standard tunnel data is 1024 bytes — verify it works end-to-end.""" 245 246 def test_1024_byte_outbound_inbound(self): 247 plaintext = os.urandom(1024) 248 hop_keys = [(_random_key(), _random_iv_key()) for _ in range(3)] 249 250 # Outbound: encrypt 251 encrypted = OutboundTunnelEncryptor.encrypt(plaintext, hop_keys) 252 assert len(encrypted) == 1024 253 254 # Simulate each hop decrypting 255 data = encrypted 256 for lk, ik in hop_keys: 257 data = TunnelLayerDecryptor.decrypt_layer(data, lk, ik) 258 assert data == plaintext