A Python port of the Invisible Internet Project (I2P)
1"""Tests for OutboundMessageDistributor."""
2
3import time
4from i2p_router.outbound_message_distributor import OutboundMessageDistributor
5
6
7def test_distribute_to_known_peer():
8 dist = OutboundMessageDistributor(transport_manager=object())
9 assert dist.distribute(b"msg", b"\x01" * 32) is True
10 assert dist.known_router_count == 1
11
12
13def test_known_peer_not_throttled():
14 dist = OutboundMessageDistributor(transport_manager=object())
15 peer = b"\x01" * 32
16 # First call registers the peer
17 dist.distribute(b"msg", peer)
18 # Subsequent calls to same peer never throttled
19 for _ in range(100):
20 assert dist.distribute(b"msg", peer) is True
21
22
23def test_throttle_new_routers():
24 dist = OutboundMessageDistributor(transport_manager=object())
25 now = time.monotonic()
26 # Fill up the window
27 for i in range(48):
28 peer = i.to_bytes(32, 'big')
29 dist.distribute(b"msg", peer, now=now)
30 # 49th new router should be throttled
31 assert dist.distribute(b"msg", b"\xff" * 32, now=now) is False
32
33
34def test_no_transport_returns_false():
35 dist = OutboundMessageDistributor(transport_manager=None)
36 assert dist.distribute(b"msg", b"\x01" * 32) is False
37
38
39def test_window_expires():
40 dist = OutboundMessageDistributor(transport_manager=object())
41 now = time.monotonic()
42 for i in range(48):
43 dist.distribute(b"msg", i.to_bytes(32, 'big'), now=now)
44 # After window expires, new routers allowed again
45 future = now + 31
46 assert dist.distribute(b"msg", b"\xff" * 32, now=future) is True