A Python port of the Invisible Internet Project (I2P)
at main 89 lines 3.2 kB view raw
"""Tests for I2CP message handler/dispatcher."""

import os

import pytest


class TestMessageHandler:
    """Checks for registering callbacks on MessageHandler and dispatching to them."""

    def test_register_and_dispatch(self):
        """A callback registered for a type is invoked once with the dispatched message."""
        from i2p_client.handler import MessageHandler
        from i2p_client.i2cp_messages import SessionStatusMessage

        received = []
        dispatcher = MessageHandler()
        dispatcher.register(
            SessionStatusMessage.TYPE,
            lambda incoming: received.append(incoming),
        )

        status_msg = SessionStatusMessage(session_id=1, status=0)
        dispatcher.dispatch(status_msg)

        assert len(received) == 1
        assert received[0].session_id == 1

    def test_dispatch_unregistered_type(self):
        """Dispatching a message with no registered callback must not raise."""
        from i2p_client.handler import MessageHandler
        from i2p_client.i2cp_messages import GetDateMessage

        dispatcher = MessageHandler()
        # Should not raise, just ignore
        dispatcher.dispatch(GetDateMessage(version="0.9.62"))

    def test_multiple_handlers_same_type(self):
        """Two callbacks registered for the same type each see the message once."""
        from i2p_client.handler import MessageHandler
        from i2p_client.i2cp_messages import SetDateMessage

        first_seen = []
        second_seen = []
        dispatcher = MessageHandler()
        dispatcher.register(
            SetDateMessage.TYPE,
            lambda incoming: first_seen.append(incoming),
        )
        dispatcher.register(
            SetDateMessage.TYPE,
            lambda incoming: second_seen.append(incoming),
        )

        dispatcher.dispatch(SetDateMessage(date_ms=1000, version="0.9"))

        assert len(first_seen) == 1
        assert len(second_seen) == 1

    def test_dispatch_from_wire(self):
        """A message serialized with to_wire and re-parsed still reaches its callback."""
        from i2p_client.handler import MessageHandler
        from i2p_client.i2cp_messages import I2CPMessage, DestroySessionMessage

        received = []
        dispatcher = MessageHandler()
        dispatcher.register(
            DestroySessionMessage.TYPE,
            lambda incoming: received.append(incoming),
        )

        # Round-trip through the wire encoding before dispatching.
        encoded = DestroySessionMessage(session_id=99).to_wire()
        decoded = I2CPMessage.from_wire(encoded)
        dispatcher.dispatch(decoded)

        assert len(received) == 1
        assert received[0].session_id == 99


class TestRequestReplyManager:
    """Checks for matching replies to pending requests in RequestReplyManager."""

    def test_correlate_request_reply(self):
        """A reply with the registered id completes the future with that reply."""
        from i2p_client.handler import RequestReplyManager

        manager = RequestReplyManager()
        pending = manager.register_request(42)
        assert not pending.done

        manager.handle_reply(42, "the reply")

        assert pending.done
        assert pending.result == "the reply"

    def test_unknown_reply_ignored(self):
        """A reply for an id that was never registered must not raise."""
        from i2p_client.handler import RequestReplyManager

        manager = RequestReplyManager()
        # Should not raise
        manager.handle_reply(999, "orphan")

    def test_multiple_requests(self):
        """Replies delivered in reverse registration order land on the matching futures."""
        from i2p_client.handler import RequestReplyManager

        manager = RequestReplyManager()
        pending_one = manager.register_request(1)
        pending_two = manager.register_request(2)

        # Answer the second request first to check correlation is by id.
        manager.handle_reply(2, "second")
        manager.handle_reply(1, "first")

        assert pending_one.result == "first"
        assert pending_two.result == "second"

    def test_cancel_request(self):
        """Cancelling a registered request marks its future as cancelled."""
        from i2p_client.handler import RequestReplyManager

        manager = RequestReplyManager()
        pending = manager.register_request(10)

        manager.cancel(10)

        assert pending.cancelled

    def test_cancel_nonexistent(self):
        """Cancelling an id that was never registered must not raise."""
        from i2p_client.handler import RequestReplyManager

        manager = RequestReplyManager()
        # Should not raise
        manager.cancel(999)