Forking what is left of ZeroNet and hopefully adding an AT Proto Frontend/Proxy
1import time
2
3import pytest
4import mock
5
6from File import FileServer
7from Crypt import CryptRsa
8from Config import config
9
10@pytest.mark.usefixtures("resetSettings")
11@pytest.mark.usefixtures("resetTempSettings")
12class TestTor:
13 def testDownload(self, tor_manager):
14 for retry in range(15):
15 time.sleep(1)
16 if tor_manager.enabled and tor_manager.conn:
17 break
18 assert tor_manager.enabled
19
20 def testManagerConnection(self, tor_manager):
21 assert "250-version" in tor_manager.request("GETINFO version")
22
23 def testAddOnion(self, tor_manager):
24 # Add
25 address = tor_manager.addOnion()
26 assert address
27 assert address in tor_manager.privatekeys
28
29 # Delete
30 assert tor_manager.delOnion(address)
31 assert address not in tor_manager.privatekeys
32
33 def testSignOnion(self, tor_manager):
34 address = tor_manager.addOnion()
35
36 # Sign
37 sign = CryptRsa.sign(b"hello", tor_manager.getPrivatekey(address))
38 assert len(sign) == 128
39
40 # Verify
41 publickey = CryptRsa.privatekeyToPublickey(tor_manager.getPrivatekey(address))
42 assert len(publickey) == 140
43 assert CryptRsa.verify(b"hello", publickey, sign)
44 assert not CryptRsa.verify(b"not hello", publickey, sign)
45
46 # Pub to address
47 assert CryptRsa.publickeyToOnion(publickey) == address
48
49 # Delete
50 tor_manager.delOnion(address)
51
52 @pytest.mark.slow
53 def testConnection(self, tor_manager, file_server, site, site_temp):
54 file_server.tor_manager.start_onions = True
55 address = file_server.tor_manager.getOnion(site.address)
56 assert address
57 print("Connecting to", address)
58 for retry in range(5): # Wait for hidden service creation
59 time.sleep(10)
60 try:
61 connection = file_server.getConnection(address + ".onion", 1544)
62 if connection:
63 break
64 except Exception as err:
65 continue
66 assert connection.handshake
67 assert not connection.handshake["peer_id"] # No peer_id for Tor connections
68
69 # Return the same connection without site specified
70 assert file_server.getConnection(address + ".onion", 1544) == connection
71 # No reuse for different site
72 assert file_server.getConnection(address + ".onion", 1544, site=site) != connection
73 assert file_server.getConnection(address + ".onion", 1544, site=site) == file_server.getConnection(address + ".onion", 1544, site=site)
74 site_temp.address = "1OTHERSITE"
75 assert file_server.getConnection(address + ".onion", 1544, site=site) != file_server.getConnection(address + ".onion", 1544, site=site_temp)
76
77 # Only allow to query from the locked site
78 file_server.sites[site.address] = site
79 connection_locked = file_server.getConnection(address + ".onion", 1544, site=site)
80 assert "body" in connection_locked.request("getFile", {"site": site.address, "inner_path": "content.json", "location": 0})
81 assert connection_locked.request("getFile", {"site": "1OTHERSITE", "inner_path": "content.json", "location": 0})["error"] == "Invalid site"
82
83 def testPex(self, file_server, site, site_temp):
84 # Register site to currently running fileserver
85 site.connection_server = file_server
86 file_server.sites[site.address] = site
87 # Create a new file server to emulate new peer connecting to our peer
88 file_server_temp = FileServer(file_server.ip, 1545)
89 site_temp.connection_server = file_server_temp
90 file_server_temp.sites[site_temp.address] = site_temp
91
92 # We will request peers from this
93 peer_source = site_temp.addPeer(file_server.ip, 1544)
94
95 # Get ip4 peers from source site
96 site.addPeer("1.2.3.4", 1555) # Add peer to source site
97 assert peer_source.pex(need_num=10) == 1
98 assert len(site_temp.peers) == 2
99 assert "1.2.3.4:1555" in site_temp.peers
100
101 # Get onion peers from source site
102 site.addPeer("bka4ht2bzxchy44r.onion", 1555)
103 assert "bka4ht2bzxchy44r.onion:1555" not in site_temp.peers
104
105 # Don't add onion peers if not supported
106 assert "onion" not in file_server_temp.supported_ip_types
107 assert peer_source.pex(need_num=10) == 0
108
109 file_server_temp.supported_ip_types.append("onion")
110 assert peer_source.pex(need_num=10) == 1
111
112 assert "bka4ht2bzxchy44r.onion:1555" in site_temp.peers
113
114 def testFindHash(self, tor_manager, file_server, site, site_temp):
115 file_server.ip_incoming = {} # Reset flood protection
116 file_server.sites[site.address] = site
117 file_server.tor_manager = tor_manager
118
119 client = FileServer(file_server.ip, 1545)
120 client.sites = {site_temp.address: site_temp}
121 site_temp.connection_server = client
122
123 # Add file_server as peer to client
124 peer_file_server = site_temp.addPeer(file_server.ip, 1544)
125
126 assert peer_file_server.findHashIds([1234]) == {}
127
128 # Add fake peer with requred hash
129 fake_peer_1 = site.addPeer("bka4ht2bzxchy44r.onion", 1544)
130 fake_peer_1.hashfield.append(1234)
131 fake_peer_2 = site.addPeer("1.2.3.5", 1545)
132 fake_peer_2.hashfield.append(1234)
133 fake_peer_2.hashfield.append(1235)
134 fake_peer_3 = site.addPeer("1.2.3.6", 1546)
135 fake_peer_3.hashfield.append(1235)
136 fake_peer_3.hashfield.append(1236)
137
138 res = peer_file_server.findHashIds([1234, 1235])
139
140 assert sorted(res[1234]) == [('1.2.3.5', 1545), ("bka4ht2bzxchy44r.onion", 1544)]
141 assert sorted(res[1235]) == [('1.2.3.5', 1545), ('1.2.3.6', 1546)]
142
143 # Test my address adding
144 site.content_manager.hashfield.append(1234)
145
146 res = peer_file_server.findHashIds([1234, 1235])
147 assert sorted(res[1234]) == [('1.2.3.5', 1545), (file_server.ip, 1544), ("bka4ht2bzxchy44r.onion", 1544)]
148 assert sorted(res[1235]) == [('1.2.3.5', 1545), ('1.2.3.6', 1546)]
149
150 def testSiteOnion(self, tor_manager):
151 with mock.patch.object(config, "tor", "always"):
152 assert tor_manager.getOnion("address1") != tor_manager.getOnion("address2")
153 assert tor_manager.getOnion("address1") == tor_manager.getOnion("address1")