A Python port of the Invisible Internet Project (I2P)
at main 56 lines 1.7 kB view raw
1"""Tests for OutboundCache and HashPair.""" 2 3from i2p_router.outbound_cache import HashPair, OutboundCache 4 5 6def test_hashpair_equality(): 7 a = HashPair(b"\x01" * 32, b"\x02" * 32) 8 b = HashPair(b"\x01" * 32, b"\x02" * 32) 9 assert a == b 10 assert hash(a) == hash(b) 11 12 13def test_hashpair_inequality(): 14 a = HashPair(b"\x01" * 32, b"\x02" * 32) 15 b = HashPair(b"\x02" * 32, b"\x01" * 32) 16 assert a != b 17 18 19def test_lease_cache_hit_miss(): 20 cache = OutboundCache() 21 pair = HashPair(b"\x01" * 32, b"\x02" * 32) 22 assert cache.get_cached_lease(pair) is None 23 lease = object() 24 cache.set_cached_lease(pair, lease) 25 assert cache.get_cached_lease(pair) is lease 26 27 28def test_tunnel_cache(): 29 cache = OutboundCache() 30 pair = HashPair(b"\x01" * 32, b"\x02" * 32) 31 tunnel = object() 32 cache.set_cached_tunnel(pair, tunnel) 33 assert cache.get_cached_tunnel(pair) is tunnel 34 cache.clear_cached_tunnel(pair) 35 assert cache.get_cached_tunnel(pair) is None 36 37 38def test_reply_rate_limiting(): 39 cache = OutboundCache() 40 pair = HashPair(b"\x01" * 32, b"\x02" * 32) 41 assert cache.should_request_reply(pair, 1000) is True 42 cache.record_reply_request(pair, 1000) 43 # Too soon 44 assert cache.should_request_reply(pair, 50_000) is False 45 # After interval 46 assert cache.should_request_reply(pair, 61_001) is True 47 48 49def test_leaseset_ack_tracking(): 50 cache = OutboundCache() 51 pair = HashPair(b"\x01" * 32, b"\x02" * 32) 52 assert cache.is_leaseset_acked(pair) is False 53 cache.record_leaseset_ack(pair, 5000) 54 assert cache.is_leaseset_acked(pair) is True 55 cache.clear_leaseset_ack(pair) 56 assert cache.is_leaseset_acked(pair) is False