A Python port of the Invisible Internet Project (I2P)
at main 46 lines 1.6 kB view raw
"""Tests for OutboundMessageDistributor."""

import time

from i2p_router.outbound_message_distributor import OutboundMessageDistributor

# Number of distinct, previously unseen routers these tests register at a
# single timestamp before expecting the next new router to be refused.
# NOTE(review): presumably mirrors the distributor's new-router limit per
# window -- confirm against the implementation.
_NEW_ROUTERS_BEFORE_THROTTLE = 48

# A 32-byte router hash that never collides with the ones from _peer_hash()
# for the small indices used below.
_UNSEEN_PEER = b"\xff" * 32


def _peer_hash(index):
    """Return a distinct 32-byte big-endian router hash for *index*."""
    return index.to_bytes(32, "big")


def _fill_new_router_window(dist, now):
    """Distribute one message to each of the first 48 distinct peers at *now*."""
    for i in range(_NEW_ROUTERS_BEFORE_THROTTLE):
        dist.distribute(b"msg", _peer_hash(i), now=now)


def test_distribute_to_known_peer():
    """A first distribute() succeeds and records exactly one router."""
    dist = OutboundMessageDistributor(transport_manager=object())
    assert dist.distribute(b"msg", b"\x01" * 32) is True
    assert dist.known_router_count == 1


def test_known_peer_not_throttled():
    """Repeated sends to an already-registered peer are always accepted."""
    dist = OutboundMessageDistributor(transport_manager=object())
    peer = b"\x01" * 32
    # First call registers the peer
    dist.distribute(b"msg", peer)
    # Subsequent calls to same peer never throttled
    for _ in range(100):
        assert dist.distribute(b"msg", peer) is True


def test_throttle_new_routers():
    """The 49th new router seen at the same timestamp is refused."""
    dist = OutboundMessageDistributor(transport_manager=object())
    now = time.monotonic()
    # Fill up the window
    _fill_new_router_window(dist, now)
    # 49th new router should be throttled
    assert dist.distribute(b"msg", _UNSEEN_PEER, now=now) is False


def test_no_transport_returns_false():
    """Without a transport manager, distribute() reports failure."""
    dist = OutboundMessageDistributor(transport_manager=None)
    assert dist.distribute(b"msg", b"\x01" * 32) is False


def test_window_expires():
    """A new router is accepted again once the throttle window has passed."""
    dist = OutboundMessageDistributor(transport_manager=object())
    now = time.monotonic()
    _fill_new_router_window(dist, now)
    # After window expires, new routers allowed again.
    # NOTE(review): 31 s is presumably just past the window length -- confirm.
    future = now + 31
    assert dist.distribute(b"msg", _UNSEEN_PEER, now=future) is True