A fork of what is left of ZeroNet, with the hope of adding an AT Protocol frontend/proxy.
1import time
2import socket
3import gevent
4
5import pytest
6import mock
7
8from Crypt import CryptConnection
9from Connection import ConnectionServer
10from Config import config
11
12
@pytest.mark.usefixtures("resetSettings")
class TestConnection:
    """Connection tests that run a client ConnectionServer against the file server fixtures.

    Every test creates its own client ConnectionServer on port 1545 and
    connects to the fixture server on port 1544.
    """

    def testIpv6(self, file_server6):
        """Ping the IPv6 fixture server, then expect a socket.error when going through 127.0.0.1."""
        assert ":" in file_server6.ip

        client = ConnectionServer(file_server6.ip, 1545)
        connection = client.getConnection(file_server6.ip, 1544)

        assert connection.ping()

        # Close connection
        connection.close()
        client.stop()
        time.sleep(0.01)  # Brief pause before checking that the server dropped the connection
        assert len(file_server6.connections) == 0

        # Should not be able to reach it on an ipv4 ip
        with pytest.raises(socket.error):
            client = ConnectionServer("127.0.0.1", 1545)
            connection = client.getConnection("127.0.0.1", 1544)

    def testSslConnection(self, file_server):
        """Connect with config.ip_local patched out and check that the connection has a crypt set."""
        client = ConnectionServer(file_server.ip, 1545)
        assert file_server != client

        # Connect to myself
        with mock.patch('Config.config.ip_local', return_value=[]):  # SSL not used for local ips
            connection = client.getConnection(file_server.ip, 1544)

        assert len(file_server.connections) == 1
        assert connection.handshake
        assert connection.crypt

        # Close connection
        connection.close("Test ended")
        client.stop()
        time.sleep(0.1)  # Brief pause before checking that the server dropped the connection
        assert len(file_server.connections) == 0
        assert file_server.num_incoming == 2  # One for file_server fixture, one for the test

    def testRawConnection(self, file_server):
        """Connect with no supported crypto and check that the connection has no crypt set."""
        client = ConnectionServer(file_server.ip, 1545)
        assert file_server != client

        # Remove all supported crypto
        crypt_supported_bk = CryptConnection.manager.crypt_supported
        CryptConnection.manager.crypt_supported = []
        try:
            with mock.patch('Config.config.ip_local', return_value=[]):  # SSL not used for local ips
                connection = client.getConnection(file_server.ip, 1544)
            assert len(file_server.connections) == 1
            assert not connection.crypt

            # Close connection
            connection.close()
            client.stop()
            time.sleep(0.01)  # Brief pause before checking that the server dropped the connection
            assert len(file_server.connections) == 0
        finally:
            # Reset supported crypts even when an assertion above fails,
            # so the emptied list does not leak into other tests
            CryptConnection.manager.crypt_supported = crypt_supported_bk

    def testPing(self, file_server, site):
        """Open a connection to the fixture server and check that ping() succeeds."""
        client = ConnectionServer(file_server.ip, 1545)
        connection = client.getConnection(file_server.ip, 1544)

        assert connection.ping()

        connection.close()
        client.stop()

    def testGetConnection(self, file_server):
        """Check that getConnection returns the existing connection by ip/port and by peer_id."""
        client = ConnectionServer(file_server.ip, 1545)
        connection = client.getConnection(file_server.ip, 1544)

        # Get connection by ip/port
        connection2 = client.getConnection(file_server.ip, 1544)
        assert connection == connection2

        # Get connection by peerid
        assert not client.getConnection(file_server.ip, 1544, peer_id="notexists", create=False)
        connection2 = client.getConnection(file_server.ip, 1544, peer_id=connection.handshake["peer_id"], create=False)
        assert connection2 == connection

        connection.close()
        client.stop()

    def testFloodProtection(self, file_server):
        """With the whitelist emptied, six reconnects succeed and the seventh times out."""
        whitelist = file_server.whitelist  # Save for reset
        file_server.whitelist = []  # Disable 127.0.0.1 whitelist
        try:
            client = ConnectionServer(file_server.ip, 1545)

            # Only allow 6 connection in 1 minute
            for _ in range(6):
                connection = client.getConnection(file_server.ip, 1544)
                assert connection.handshake
                connection.close()

            # The 7. one will timeout
            with pytest.raises(gevent.Timeout):
                with gevent.Timeout(0.1):
                    connection = client.getConnection(file_server.ip, 1544)
        finally:
            # Reset whitelist even when an assertion above fails
            file_server.whitelist = whitelist