A Python port of the Invisible Internet Project (I2P)
1"""Tests for ClientManager."""
2
3from i2p_router.client_manager import ClientManager, ClientSession
4
5
def test_register_and_deliver():
    """Deliver a payload to a registered session and check the callback saw it."""
    dest = b"\x01" * 32
    inbox = []

    def on_message(p):
        # Record every payload handed to the session's message callback.
        inbox.append(p)

    session = ClientSession(dest_hash=dest, on_message=on_message)
    manager = ClientManager()
    manager.register_session(session)

    delivered = manager.message_received(dest, b"hello")

    assert delivered is True
    assert inbox == [b"hello"]
16
17
def test_deliver_to_unknown():
    """message_received returns False when nothing is registered for the hash."""
    unknown_dest = b"\x02" * 32
    manager = ClientManager()

    outcome = manager.message_received(unknown_dest, b"data")

    assert outcome is False
21
22
def test_unregister():
    """is_local is True after registration and False after unregistration."""
    dest = b"\x01" * 32
    manager = ClientManager()
    manager.register_session(
        ClientSession(dest_hash=dest, on_message=lambda p: None)
    )

    # The destination must be reported as local while the session is registered.
    known_before = manager.is_local(dest)
    assert known_before is True

    manager.unregister_session(dest)

    known_after = manager.is_local(dest)
    assert known_after is False
30
31
def test_delivery_status_callback():
    """report_delivery_status returns True and hands (42, 1) to on_status."""
    dest = b"\x01" * 32
    seen = []

    def on_status(mid, s):
        # Capture each reported pair so the test can compare it afterwards.
        seen.append((mid, s))

    session = ClientSession(
        dest_hash=dest,
        on_message=lambda p: None,
        on_status=on_status,
    )
    manager = ClientManager()
    manager.register_session(session)

    reported = manager.report_delivery_status(dest, 42, 1)

    assert reported is True
    assert seen == [(42, 1)]
43
44
def test_session_count():
    """session_count reads 0 on a fresh manager and 1 after one registration."""
    manager = ClientManager()
    assert manager.session_count == 0

    only_session = ClientSession(
        dest_hash=b"\x01" * 32,
        on_message=lambda p: None,
    )
    manager.register_session(only_session)

    assert manager.session_count == 1