"""Tests for I2P socket abstraction.""" import asyncio from unittest.mock import AsyncMock, MagicMock, patch import pytest class TestI2PSocket: def test_create_socket(self): from i2p_streaming.connection import StreamConnection, ConnectionState from i2p_streaming.socket import I2PSocket conn = StreamConnection() conn.state = ConnectionState.ESTABLISHED sock = I2PSocket(conn) assert sock.is_connected def test_not_connected_when_closed(self): from i2p_streaming.connection import StreamConnection, ConnectionState from i2p_streaming.socket import I2PSocket conn = StreamConnection() conn.state = ConnectionState.CLOSED sock = I2PSocket(conn) assert not sock.is_connected def test_destination(self): from i2p_streaming.connection import StreamConnection, ConnectionState from i2p_streaming.socket import I2PSocket conn = StreamConnection() conn.state = ConnectionState.ESTABLISHED conn.destination = b"remote_dest" sock = I2PSocket(conn) assert sock.destination == b"remote_dest" @pytest.mark.asyncio async def test_send(self): from i2p_streaming.connection import StreamConnection, ConnectionState from i2p_streaming.socket import I2PSocket conn = StreamConnection() conn.state = ConnectionState.ESTABLISHED conn.destination = b"dest" sock = I2PSocket(conn) sent = await sock.send(b"hello") assert sent == 5 @pytest.mark.asyncio async def test_send_raises_when_not_connected(self): from i2p_streaming.connection import StreamConnection, ConnectionState from i2p_streaming.socket import I2PSocket conn = StreamConnection() conn.state = ConnectionState.CLOSED sock = I2PSocket(conn) with pytest.raises(ConnectionError): await sock.send(b"hello") @pytest.mark.asyncio async def test_recv(self): from i2p_streaming.connection import StreamConnection, ConnectionState from i2p_streaming.socket import I2PSocket conn = StreamConnection() conn.state = ConnectionState.ESTABLISHED conn.destination = b"dest" sock = I2PSocket(conn) # Write data to the recv buffer sock._recv_buffer.extend(b"world") data = await sock.recv(1024) assert data == b"world" @pytest.mark.asyncio async def test_recv_max_bytes(self): from i2p_streaming.connection import StreamConnection, ConnectionState from i2p_streaming.socket import I2PSocket conn = StreamConnection() conn.state = ConnectionState.ESTABLISHED conn.destination = b"dest" sock = I2PSocket(conn) sock._recv_buffer.extend(b"hello world") data = await sock.recv(5) assert data == b"hello" assert len(sock._recv_buffer) == 6 @pytest.mark.asyncio async def test_close(self): from i2p_streaming.connection import StreamConnection, ConnectionState from i2p_streaming.socket import I2PSocket conn = StreamConnection() conn.state = ConnectionState.ESTABLISHED conn.destination = b"dest" sock = I2PSocket(conn) await sock.close() assert not sock.is_connected class TestI2PServerSocket: @pytest.mark.asyncio async def test_accept(self): from i2p_streaming.manager import ConnectionManager from i2p_streaming.socket import I2PServerSocket, I2PSocket from i2p_streaming.packet import StreamPacket, Flags mgr = ConnectionManager() server = I2PServerSocket(mgr) # Simulate incoming SYN pkt = StreamPacket( send_id=100, recv_id=0, seq_num=0, ack_through=0, flags=Flags.SYNCHRONIZE, payload=b"" ) mgr.receive_packet(pkt) sock = await asyncio.wait_for(server.accept(), timeout=1.0) assert isinstance(sock, I2PSocket) @pytest.mark.asyncio async def test_close(self): from i2p_streaming.manager import ConnectionManager from i2p_streaming.socket import I2PServerSocket mgr = ConnectionManager() server = I2PServerSocket(mgr) await server.close() assert server._closed class TestI2PSocketManager: def test_create(self): from i2p_streaming.socket import I2PSocketManager mgr = I2PSocketManager() assert mgr is not None def test_create_with_options(self): from i2p_streaming.options import StreamOptions from i2p_streaming.socket import I2PSocketManager opts = StreamOptions(max_window_size=32) mgr = I2PSocketManager(options=opts) assert mgr._manager._options.max_window_size == 32 @pytest.mark.asyncio async def test_connect(self): from i2p_streaming.socket import I2PSocketManager, I2PSocket mgr = I2PSocketManager() sock = await mgr.connect(b"dest1") assert isinstance(sock, I2PSocket) def test_get_server_socket(self): from i2p_streaming.socket import I2PSocketManager, I2PServerSocket mgr = I2PSocketManager() server = mgr.get_server_socket() assert isinstance(server, I2PServerSocket) @pytest.mark.asyncio async def test_destroy(self): from i2p_streaming.socket import I2PSocketManager mgr = I2PSocketManager() await mgr.connect(b"dest1") await mgr.destroy() assert mgr._manager.active_count == 0