A Python port of the Invisible Internet Project (I2P)
at main 153 lines 5.6 kB view raw
"""Tests for I2P socket abstraction."""

import asyncio
from unittest.mock import AsyncMock, MagicMock, patch

import pytest


def _make_socket(*, established=True, destination=None):
    """Return an I2PSocket wrapping a freshly created StreamConnection.

    Args:
        established: When true the connection state is set to
            ConnectionState.ESTABLISHED, otherwise to ConnectionState.CLOSED.
        destination: Value assigned to the connection's ``destination``
            attribute; the attribute is left untouched when this is None.

    The project imports stay inside the function so they are resolved only
    when a test actually runs, as each test did originally.
    """
    from i2p_streaming.connection import StreamConnection, ConnectionState
    from i2p_streaming.socket import I2PSocket

    connection = StreamConnection()
    if established:
        connection.state = ConnectionState.ESTABLISHED
    else:
        connection.state = ConnectionState.CLOSED
    if destination is not None:
        connection.destination = destination
    return I2PSocket(connection)


class TestI2PSocket:
    """Checks for I2PSocket built on top of a StreamConnection."""

    def test_create_socket(self):
        """A socket over an ESTABLISHED connection reports is_connected."""
        assert _make_socket().is_connected

    def test_not_connected_when_closed(self):
        """A socket over a CLOSED connection does not report is_connected."""
        assert not _make_socket(established=False).is_connected

    def test_destination(self):
        """The socket exposes the destination set on its connection."""
        remote = b"remote_dest"
        socket_under_test = _make_socket(destination=remote)
        assert socket_under_test.destination == b"remote_dest"

    @pytest.mark.asyncio
    async def test_send(self):
        """send() returns the number of bytes handed to it."""
        socket_under_test = _make_socket(destination=b"dest")
        byte_count = await socket_under_test.send(b"hello")
        assert byte_count == 5

    @pytest.mark.asyncio
    async def test_send_raises_when_not_connected(self):
        """send() raises ConnectionError when the connection is CLOSED."""
        socket_under_test = _make_socket(established=False)
        with pytest.raises(ConnectionError):
            await socket_under_test.send(b"hello")

    @pytest.mark.asyncio
    async def test_recv(self):
        """recv() hands back whatever is waiting in the receive buffer."""
        socket_under_test = _make_socket(destination=b"dest")
        # Put data straight into the receive buffer instead of going
        # through the packet path.
        socket_under_test._recv_buffer.extend(b"world")
        received = await socket_under_test.recv(1024)
        assert received == b"world"

    @pytest.mark.asyncio
    async def test_recv_max_bytes(self):
        """recv(n) returns at most n bytes and keeps the rest buffered."""
        socket_under_test = _make_socket(destination=b"dest")
        socket_under_test._recv_buffer.extend(b"hello world")
        received = await socket_under_test.recv(5)
        assert received == b"hello"
        # 11 bytes were buffered and 5 were read, so 6 must remain.
        assert len(socket_under_test._recv_buffer) == 6

    @pytest.mark.asyncio
    async def test_close(self):
        """After close() the socket no longer reports is_connected."""
        socket_under_test = _make_socket(destination=b"dest")
        await socket_under_test.close()
        assert not socket_under_test.is_connected


class TestI2PServerSocket:
    """Checks for I2PServerSocket driven by a ConnectionManager."""

    @pytest.mark.asyncio
    async def test_accept(self):
        """accept() yields an I2PSocket once a SYNCHRONIZE packet arrives."""
        from i2p_streaming.manager import ConnectionManager
        from i2p_streaming.socket import I2PServerSocket, I2PSocket
        from i2p_streaming.packet import StreamPacket, Flags

        manager = ConnectionManager()
        listener = I2PServerSocket(manager)
        # Simulate an incoming SYN by feeding the packet to the manager.
        syn_packet = StreamPacket(
            send_id=100,
            recv_id=0,
            seq_num=0,
            ack_through=0,
            flags=Flags.SYNCHRONIZE,
            payload=b"",
        )
        manager.receive_packet(syn_packet)
        # Bound the wait so a missing connection fails the test
        # rather than hanging it.
        accepted = await asyncio.wait_for(listener.accept(), timeout=1.0)
        assert isinstance(accepted, I2PSocket)

    @pytest.mark.asyncio
    async def test_close(self):
        """close() sets the server socket's _closed flag."""
        from i2p_streaming.manager import ConnectionManager
        from i2p_streaming.socket import I2PServerSocket

        listener = I2PServerSocket(ConnectionManager())
        await listener.close()
        assert listener._closed


class TestI2PSocketManager:
    """Checks for the I2PSocketManager facade."""

    def test_create(self):
        """The manager can be constructed without arguments."""
        from i2p_streaming.socket import I2PSocketManager

        socket_manager = I2PSocketManager()
        assert socket_manager is not None

    def test_create_with_options(self):
        """Options given at construction reach the inner manager."""
        from i2p_streaming.options import StreamOptions
        from i2p_streaming.socket import I2PSocketManager

        stream_options = StreamOptions(max_window_size=32)
        socket_manager = I2PSocketManager(options=stream_options)
        assert socket_manager._manager._options.max_window_size == 32

    @pytest.mark.asyncio
    async def test_connect(self):
        """connect() returns an I2PSocket."""
        from i2p_streaming.socket import I2PSocketManager, I2PSocket

        socket_manager = I2PSocketManager()
        connected = await socket_manager.connect(b"dest1")
        assert isinstance(connected, I2PSocket)

    def test_get_server_socket(self):
        """get_server_socket() returns an I2PServerSocket."""
        from i2p_streaming.socket import I2PSocketManager, I2PServerSocket

        socket_manager = I2PSocketManager()
        listener = socket_manager.get_server_socket()
        assert isinstance(listener, I2PServerSocket)

    @pytest.mark.asyncio
    async def test_destroy(self):
        """destroy() leaves the inner manager with no active connections."""
        from i2p_streaming.socket import I2PSocketManager

        socket_manager = I2PSocketManager()
        await socket_manager.connect(b"dest1")
        await socket_manager.destroy()
        assert socket_manager._manager.active_count == 0