A Python port of the Invisible Internet Project (I2P)
1"""Tests for OutboundCache and HashPair."""
2
3from i2p_router.outbound_cache import HashPair, OutboundCache
4
5
def test_hashpair_equality():
    """HashPairs built from identical byte strings compare and hash equal."""
    # Build both pairs independently so the check is about value, not identity.
    left, right = (HashPair(b"\x01" * 32, b"\x02" * 32) for _ in range(2))

    assert left == right
    assert hash(left) == hash(right)
11
12
def test_hashpair_inequality():
    """Swapping the two byte strings of a HashPair yields an unequal pair."""
    ones = b"\x01" * 32
    twos = b"\x02" * 32

    original_order = HashPair(ones, twos)
    swapped_order = HashPair(twos, ones)

    assert original_order != swapped_order
17
18
def test_lease_cache_hit_miss():
    """A lease lookup returns None on a fresh cache and the stored object afterwards."""
    outbound = OutboundCache()
    key = HashPair(b"\x01" * 32, b"\x02" * 32)

    # Miss: nothing has been stored under this key yet.
    assert outbound.get_cached_lease(key) is None

    # Hit: a plain object() stands in for a lease, since only identity is checked.
    stored_lease = object()
    outbound.set_cached_lease(key, stored_lease)
    assert outbound.get_cached_lease(key) is stored_lease
26
27
def test_tunnel_cache():
    """A cached tunnel is returned by identity until it is cleared, then None."""
    outbound = OutboundCache()
    key = HashPair(b"\x01" * 32, b"\x02" * 32)
    # A plain object() stands in for a tunnel, since only identity is checked.
    stored_tunnel = object()

    outbound.set_cached_tunnel(key, stored_tunnel)
    assert outbound.get_cached_tunnel(key) is stored_tunnel

    # Clearing the entry makes the next lookup come back empty.
    outbound.clear_cached_tunnel(key)
    assert outbound.get_cached_tunnel(key) is None
36
37
def test_reply_rate_limiting():
    """After a reply request is recorded, further requests are denied for a while.

    The second argument of each call is presumably a timestamp, and the
    interval itself is defined by OutboundCache -- confirm both against
    that class.
    """
    outbound = OutboundCache()
    key = HashPair(b"\x01" * 32, b"\x02" * 32)
    first_request_at = 1000

    # Nothing recorded yet, so a reply may be requested.
    assert outbound.should_request_reply(key, first_request_at) is True
    outbound.record_reply_request(key, first_request_at)

    # Too soon after the recorded request.
    too_soon = outbound.should_request_reply(key, 50_000)
    assert too_soon is False

    # After the interval has passed.
    after_interval = outbound.should_request_reply(key, 61_001)
    assert after_interval is True
47
48
def test_leaseset_ack_tracking():
    """The leaseset-ack flag is False initially, True once recorded, False once cleared."""
    outbound = OutboundCache()
    key = HashPair(b"\x01" * 32, b"\x02" * 32)

    # Fresh cache: no ack has been recorded for this key.
    assert outbound.is_leaseset_acked(key) is False

    # Recording an ack flips the flag on.
    outbound.record_leaseset_ack(key, 5000)
    assert outbound.is_leaseset_acked(key) is True

    # Clearing it flips the flag back off.
    outbound.clear_leaseset_ack(key)
    assert outbound.is_leaseset_acked(key) is False
56 assert cache.is_leaseset_acked(pair) is False