A Python port of the Invisible Internet Project (I2P)
1"""Tests for I2P socket abstraction."""
2
3import asyncio
4from unittest.mock import AsyncMock, MagicMock, patch
5
6import pytest
7
8
9class TestI2PSocket:
10 def test_create_socket(self):
11 from i2p_streaming.connection import StreamConnection, ConnectionState
12 from i2p_streaming.socket import I2PSocket
13 conn = StreamConnection()
14 conn.state = ConnectionState.ESTABLISHED
15 sock = I2PSocket(conn)
16 assert sock.is_connected
17
18 def test_not_connected_when_closed(self):
19 from i2p_streaming.connection import StreamConnection, ConnectionState
20 from i2p_streaming.socket import I2PSocket
21 conn = StreamConnection()
22 conn.state = ConnectionState.CLOSED
23 sock = I2PSocket(conn)
24 assert not sock.is_connected
25
26 def test_destination(self):
27 from i2p_streaming.connection import StreamConnection, ConnectionState
28 from i2p_streaming.socket import I2PSocket
29 conn = StreamConnection()
30 conn.state = ConnectionState.ESTABLISHED
31 conn.destination = b"remote_dest"
32 sock = I2PSocket(conn)
33 assert sock.destination == b"remote_dest"
34
35 @pytest.mark.asyncio
36 async def test_send(self):
37 from i2p_streaming.connection import StreamConnection, ConnectionState
38 from i2p_streaming.socket import I2PSocket
39 conn = StreamConnection()
40 conn.state = ConnectionState.ESTABLISHED
41 conn.destination = b"dest"
42 sock = I2PSocket(conn)
43 sent = await sock.send(b"hello")
44 assert sent == 5
45
46 @pytest.mark.asyncio
47 async def test_send_raises_when_not_connected(self):
48 from i2p_streaming.connection import StreamConnection, ConnectionState
49 from i2p_streaming.socket import I2PSocket
50 conn = StreamConnection()
51 conn.state = ConnectionState.CLOSED
52 sock = I2PSocket(conn)
53 with pytest.raises(ConnectionError):
54 await sock.send(b"hello")
55
56 @pytest.mark.asyncio
57 async def test_recv(self):
58 from i2p_streaming.connection import StreamConnection, ConnectionState
59 from i2p_streaming.socket import I2PSocket
60 conn = StreamConnection()
61 conn.state = ConnectionState.ESTABLISHED
62 conn.destination = b"dest"
63 sock = I2PSocket(conn)
64 # Write data to the recv buffer
65 sock._recv_buffer.extend(b"world")
66 data = await sock.recv(1024)
67 assert data == b"world"
68
69 @pytest.mark.asyncio
70 async def test_recv_max_bytes(self):
71 from i2p_streaming.connection import StreamConnection, ConnectionState
72 from i2p_streaming.socket import I2PSocket
73 conn = StreamConnection()
74 conn.state = ConnectionState.ESTABLISHED
75 conn.destination = b"dest"
76 sock = I2PSocket(conn)
77 sock._recv_buffer.extend(b"hello world")
78 data = await sock.recv(5)
79 assert data == b"hello"
80 assert len(sock._recv_buffer) == 6
81
82 @pytest.mark.asyncio
83 async def test_close(self):
84 from i2p_streaming.connection import StreamConnection, ConnectionState
85 from i2p_streaming.socket import I2PSocket
86 conn = StreamConnection()
87 conn.state = ConnectionState.ESTABLISHED
88 conn.destination = b"dest"
89 sock = I2PSocket(conn)
90 await sock.close()
91 assert not sock.is_connected
92
93
94class TestI2PServerSocket:
95 @pytest.mark.asyncio
96 async def test_accept(self):
97 from i2p_streaming.manager import ConnectionManager
98 from i2p_streaming.socket import I2PServerSocket, I2PSocket
99 from i2p_streaming.packet import StreamPacket, Flags
100 mgr = ConnectionManager()
101 server = I2PServerSocket(mgr)
102 # Simulate incoming SYN
103 pkt = StreamPacket(
104 send_id=100, recv_id=0, seq_num=0,
105 ack_through=0, flags=Flags.SYNCHRONIZE, payload=b""
106 )
107 mgr.receive_packet(pkt)
108 sock = await asyncio.wait_for(server.accept(), timeout=1.0)
109 assert isinstance(sock, I2PSocket)
110
111 @pytest.mark.asyncio
112 async def test_close(self):
113 from i2p_streaming.manager import ConnectionManager
114 from i2p_streaming.socket import I2PServerSocket
115 mgr = ConnectionManager()
116 server = I2PServerSocket(mgr)
117 await server.close()
118 assert server._closed
119
120
121class TestI2PSocketManager:
122 def test_create(self):
123 from i2p_streaming.socket import I2PSocketManager
124 mgr = I2PSocketManager()
125 assert mgr is not None
126
127 def test_create_with_options(self):
128 from i2p_streaming.options import StreamOptions
129 from i2p_streaming.socket import I2PSocketManager
130 opts = StreamOptions(max_window_size=32)
131 mgr = I2PSocketManager(options=opts)
132 assert mgr._manager._options.max_window_size == 32
133
134 @pytest.mark.asyncio
135 async def test_connect(self):
136 from i2p_streaming.socket import I2PSocketManager, I2PSocket
137 mgr = I2PSocketManager()
138 sock = await mgr.connect(b"dest1")
139 assert isinstance(sock, I2PSocket)
140
141 def test_get_server_socket(self):
142 from i2p_streaming.socket import I2PSocketManager, I2PServerSocket
143 mgr = I2PSocketManager()
144 server = mgr.get_server_socket()
145 assert isinstance(server, I2PServerSocket)
146
147 @pytest.mark.asyncio
148 async def test_destroy(self):
149 from i2p_streaming.socket import I2PSocketManager
150 mgr = I2PSocketManager()
151 await mgr.connect(b"dest1")
152 await mgr.destroy()
153 assert mgr._manager.active_count == 0