A Python port of the Invisible Internet Project (I2P)
at main 49 lines 1.4 kB view raw
1"""Tests for ClientManager.""" 2 3from i2p_router.client_manager import ClientManager, ClientSession 4 5 6def test_register_and_deliver(): 7 received = [] 8 session = ClientSession( 9 dest_hash=b"\x01" * 32, 10 on_message=lambda p: received.append(p), 11 ) 12 mgr = ClientManager() 13 mgr.register_session(session) 14 assert mgr.message_received(b"\x01" * 32, b"hello") is True 15 assert received == [b"hello"] 16 17 18def test_deliver_to_unknown(): 19 mgr = ClientManager() 20 assert mgr.message_received(b"\x02" * 32, b"data") is False 21 22 23def test_unregister(): 24 mgr = ClientManager() 25 session = ClientSession(dest_hash=b"\x01" * 32, on_message=lambda p: None) 26 mgr.register_session(session) 27 assert mgr.is_local(b"\x01" * 32) is True 28 mgr.unregister_session(b"\x01" * 32) 29 assert mgr.is_local(b"\x01" * 32) is False 30 31 32def test_delivery_status_callback(): 33 statuses = [] 34 session = ClientSession( 35 dest_hash=b"\x01" * 32, 36 on_message=lambda p: None, 37 on_status=lambda mid, s: statuses.append((mid, s)), 38 ) 39 mgr = ClientManager() 40 mgr.register_session(session) 41 assert mgr.report_delivery_status(b"\x01" * 32, 42, 1) is True 42 assert statuses == [(42, 1)] 43 44 45def test_session_count(): 46 mgr = ClientManager() 47 assert mgr.session_count == 0 48 mgr.register_session(ClientSession(dest_hash=b"\x01" * 32, on_message=lambda p: None)) 49 assert mgr.session_count == 1