"""Tests for OutboundMessageDistributor.""" import time from i2p_router.outbound_message_distributor import OutboundMessageDistributor def test_distribute_to_known_peer(): dist = OutboundMessageDistributor(transport_manager=object()) assert dist.distribute(b"msg", b"\x01" * 32) is True assert dist.known_router_count == 1 def test_known_peer_not_throttled(): dist = OutboundMessageDistributor(transport_manager=object()) peer = b"\x01" * 32 # First call registers the peer dist.distribute(b"msg", peer) # Subsequent calls to same peer never throttled for _ in range(100): assert dist.distribute(b"msg", peer) is True def test_throttle_new_routers(): dist = OutboundMessageDistributor(transport_manager=object()) now = time.monotonic() # Fill up the window for i in range(48): peer = i.to_bytes(32, 'big') dist.distribute(b"msg", peer, now=now) # 49th new router should be throttled assert dist.distribute(b"msg", b"\xff" * 32, now=now) is False def test_no_transport_returns_false(): dist = OutboundMessageDistributor(transport_manager=None) assert dist.distribute(b"msg", b"\x01" * 32) is False def test_window_expires(): dist = OutboundMessageDistributor(transport_manager=object()) now = time.monotonic() for i in range(48): dist.distribute(b"msg", i.to_bytes(32, 'big'), now=now) # After window expires, new routers allowed again future = now + 31 assert dist.distribute(b"msg", b"\xff" * 32, now=future) is True