A Python port of the Invisible Internet Project (I2P)
1"""Tests for I2CP message handler/dispatcher."""
2
3import os
4
5import pytest
6
7
class TestMessageHandler:
    """Checks that MessageHandler routes I2CP messages to registered callbacks."""

    def test_register_and_dispatch(self):
        """A callback registered for a message type receives the dispatched message."""
        from i2p_client.handler import MessageHandler
        from i2p_client.i2cp_messages import SessionStatusMessage

        received = []

        def collect(msg):
            received.append(msg)

        dispatcher = MessageHandler()
        dispatcher.register(SessionStatusMessage.TYPE, collect)

        status_message = SessionStatusMessage(session_id=1, status=0)
        dispatcher.dispatch(status_message)

        assert len(received) == 1
        assert received[0].session_id == 1

    def test_dispatch_unregistered_type(self):
        """Dispatching a type with no registered callback must not raise."""
        from i2p_client.handler import MessageHandler
        from i2p_client.i2cp_messages import GetDateMessage

        dispatcher = MessageHandler()
        unhandled = GetDateMessage(version="0.9.62")
        # No assertion needed: the test passes as long as dispatch() returns.
        dispatcher.dispatch(unhandled)

    def test_multiple_handlers_same_type(self):
        """Every callback registered for the same type is invoked once."""
        from i2p_client.handler import MessageHandler
        from i2p_client.i2cp_messages import SetDateMessage

        first_sink = []
        second_sink = []

        def collect_first(msg):
            first_sink.append(msg)

        def collect_second(msg):
            second_sink.append(msg)

        dispatcher = MessageHandler()
        dispatcher.register(SetDateMessage.TYPE, collect_first)
        dispatcher.register(SetDateMessage.TYPE, collect_second)

        dispatcher.dispatch(SetDateMessage(date_ms=1000, version="0.9"))

        assert len(first_sink) == 1
        assert len(second_sink) == 1

    def test_dispatch_from_wire(self):
        """A message serialized with to_wire() and re-parsed is still dispatched by type."""
        from i2p_client.handler import MessageHandler
        from i2p_client.i2cp_messages import I2CPMessage, DestroySessionMessage

        received = []

        def collect(msg):
            received.append(msg)

        dispatcher = MessageHandler()
        dispatcher.register(DestroySessionMessage.TYPE, collect)

        # Round-trip through the wire encoding before dispatching.
        encoded = DestroySessionMessage(session_id=99).to_wire()
        decoded = I2CPMessage.from_wire(encoded)
        dispatcher.dispatch(decoded)

        assert len(received) == 1
        assert received[0].session_id == 99
50
51
class TestRequestReplyManager:
    """Checks request/reply correlation in RequestReplyManager."""

    def test_correlate_request_reply(self):
        """A reply with the matching id completes the pending future with its value."""
        from i2p_client.handler import RequestReplyManager

        manager = RequestReplyManager()
        pending = manager.register_request(42)
        assert not pending.done

        manager.handle_reply(42, "the reply")

        assert pending.done
        assert pending.result == "the reply"

    def test_unknown_reply_ignored(self):
        """A reply for an id that was never registered must not raise."""
        from i2p_client.handler import RequestReplyManager

        manager = RequestReplyManager()
        # No assertion needed: the test passes as long as handle_reply() returns.
        manager.handle_reply(999, "orphan")

    def test_multiple_requests(self):
        """Replies arriving out of order are matched to the right futures."""
        from i2p_client.handler import RequestReplyManager

        manager = RequestReplyManager()
        first_pending = manager.register_request(1)
        second_pending = manager.register_request(2)

        # Deliver replies in the reverse of registration order.
        manager.handle_reply(2, "second")
        manager.handle_reply(1, "first")

        assert first_pending.result == "first"
        assert second_pending.result == "second"

    def test_cancel_request(self):
        """Cancelling a registered request marks its future as cancelled."""
        from i2p_client.handler import RequestReplyManager

        manager = RequestReplyManager()
        pending = manager.register_request(10)

        manager.cancel(10)

        assert pending.cancelled

    def test_cancel_nonexistent(self):
        """Cancelling an id that was never registered must not raise."""
        from i2p_client.handler import RequestReplyManager

        manager = RequestReplyManager()
        # No assertion needed: the test passes as long as cancel() returns.
        manager.cancel(999)