Forking what is left of ZeroNet and hopefully adding an AT Proto Frontend/Proxy
at main 118 lines 4.0 kB view raw
import time
import socket
import gevent

import pytest
import mock

from Crypt import CryptConnection
from Connection import ConnectionServer
from Config import config


@pytest.mark.usefixtures("resetSettings")
class TestConnection:
    """Connection-level tests run against the ``file_server`` fixtures.

    Each test creates its own client-side ``ConnectionServer`` and connects
    to the fixture server on port 1544.
    """

    def testIpv6(self, file_server6):
        """Ping over the IPv6 fixture server, then expect IPv4 to fail."""
        assert ":" in file_server6.ip

        client = ConnectionServer(file_server6.ip, 1545)
        connection = client.getConnection(file_server6.ip, 1544)

        assert connection.ping()

        # Close connection
        connection.close()
        client.stop()
        time.sleep(0.01)
        assert len(file_server6.connections) == 0

        # Should not be able to reach it on an ipv4 ip
        with pytest.raises(socket.error):
            client = ConnectionServer("127.0.0.1", 1545)
            connection = client.getConnection("127.0.0.1", 1544)

    def testSslConnection(self, file_server):
        """Connect with local ips un-listed and expect handshake and crypt set."""
        client = ConnectionServer(file_server.ip, 1545)
        assert file_server != client

        # Connect to myself
        with mock.patch('Config.config.ip_local', return_value=[]):  # SSL not used for local ips
            connection = client.getConnection(file_server.ip, 1544)

        assert len(file_server.connections) == 1
        assert connection.handshake
        assert connection.crypt

        # Close connection
        connection.close("Test ended")
        client.stop()
        time.sleep(0.1)
        assert len(file_server.connections) == 0
        assert file_server.num_incoming == 2  # One for file_server fixture, one for the test

    def testRawConnection(self, file_server):
        """Connect with no supported crypto and expect an un-encrypted connection."""
        client = ConnectionServer(file_server.ip, 1545)
        assert file_server != client

        # Remove all supported crypto
        crypt_supported_bk = CryptConnection.manager.crypt_supported
        CryptConnection.manager.crypt_supported = []
        try:
            with mock.patch('Config.config.ip_local', return_value=[]):  # SSL not used for local ips
                connection = client.getConnection(file_server.ip, 1544)
            assert len(file_server.connections) == 1
            assert not connection.crypt

            # Close connection
            connection.close()
            client.stop()
            time.sleep(0.01)
            assert len(file_server.connections) == 0
        finally:
            # Reset supported crypts even when an assertion above fails, so the
            # emptied global list cannot leak into later tests
            CryptConnection.manager.crypt_supported = crypt_supported_bk

    def testPing(self, file_server, site):
        """Open a connection to the fixture server and expect ping() to succeed."""
        client = ConnectionServer(file_server.ip, 1545)
        connection = client.getConnection(file_server.ip, 1544)

        assert connection.ping()

        connection.close()
        client.stop()

    def testGetConnection(self, file_server):
        """Expect getConnection() to return the existing connection by ip/port and by peer_id."""
        client = ConnectionServer(file_server.ip, 1545)
        connection = client.getConnection(file_server.ip, 1544)

        # Get connection by ip/port
        connection2 = client.getConnection(file_server.ip, 1544)
        assert connection == connection2

        # Get connection by peerid
        assert not client.getConnection(file_server.ip, 1544, peer_id="notexists", create=False)
        connection2 = client.getConnection(file_server.ip, 1544, peer_id=connection.handshake["peer_id"], create=False)
        assert connection2 == connection

        connection.close()
        client.stop()

    def testFloodProtection(self, file_server):
        """Expect the 7th connection attempt to time out once the whitelist is disabled."""
        whitelist = file_server.whitelist  # Save for reset
        file_server.whitelist = []  # Disable 127.0.0.1 whitelist
        try:
            client = ConnectionServer(file_server.ip, 1545)

            # Only allow 6 connection in 1 minute
            for reconnect in range(6):
                connection = client.getConnection(file_server.ip, 1544)
                assert connection.handshake
                connection.close()

            # The 7th one will time out
            with pytest.raises(gevent.Timeout):
                with gevent.Timeout(0.1):
                    connection = client.getConnection(file_server.ip, 1544)
        finally:
            # Reset whitelist even on failure so the shared fixture is left
            # as it was found
            file_server.whitelist = whitelist