Forking what is left of ZeroNet and hopefully adding an AT Proto Frontend/Proxy
1import os
2import sys
3import urllib.request
4import time
5import logging
6import json
7import shutil
8import gc
9import datetime
10import atexit
11import threading
12import socket
13
14import pytest
15import mock
16
import gevent

# Workaround for random crash when libuv used with threads:
# switch to the libev C extension loop unless a libev loop is already selected
_configured_loop = str(gevent.config.loop)
if "libev" not in _configured_loop:
    gevent.config.loop = "libev-cext"

import gevent.event
from gevent import monkey

# Patch the standard library for gevent, leaving thread and subprocess unpatched
monkey.patch_all(thread=False, subprocess=False)

# Keep a reference to the real atexit.register so this file can still use it
atexit_register = atexit.register


def _ignoreAtexitRegistration(func):
    # Don't register shutdown functions to avoid IO error on exit
    return ""


atexit.register = _ignoreAtexitRegistration
28
def pytest_addoption(parser):
    """Register the ``--slow`` command line flag (off by default)."""
    parser.addoption(
        "--slow",
        action='store_true',
        default=False,
        help="Also run slow tests",
    )
31
32
def pytest_collection_modifyitems(config, items):
    """Add a skip marker to every collected item tagged ``slow``.

    Nothing is changed when the ``--slow`` option was given on the command line.
    """
    if not config.getoption("--slow"):
        skip_marker = pytest.mark.skip(reason="need --slow option to run")
        for test_item in items:
            if "slow" in test_item.keywords:
                test_item.add_marker(skip_marker)
41
# Config
# Windows uses an explicit path to chromedriver.exe, every other platform the bare command name
CHROMEDRIVER_PATH = "tools/chrome/chromedriver.exe" if sys.platform == "win32" else "chromedriver"
SITE_URL = "http://127.0.0.1:43110"

TEST_DATA_PATH = 'src/Test/testdata'
# Put the external modules directory (../lib) and the src directory (..) at the
# front of sys.path; the src directory is inserted last, so it is searched first
for _relative_dir in ("/../lib", "/.."):
    sys.path.insert(0, os.path.abspath(os.path.dirname(__file__) + _relative_dir))

from Config import config

config.argv = ["none"]  # Dont pass any argv to config parser
config.parse(silent=True, parse_config=False)  # Plugins need to access the configuration
config.action = "test"

# Load plugins
from Plugin import PluginManager

# Settings applied before the plugins are loaded
config.data_dir = TEST_DATA_PATH  # Use test data for unittests
config.debug = True

# Set working dir (two levels above the directory of this file)
os.chdir(os.path.abspath(os.path.dirname(__file__) + "/../.."))

all_loaded = PluginManager.plugin_manager.loadPlugins()
assert all_loaded, "Not all plugin loaded successfully"

config.loadPlugins()
config.parse(parse_config=False)  # Parse again to add plugin configuration options

# Test settings, applied (again) after the second parse
config.action = "test"
config.debug = True
config.debug_socket = True
config.verbose = True
config.tor = "disable"  # Don't start Tor client
config.trackers = []
config.data_dir = TEST_DATA_PATH  # Use test data for unittests

# The log directory can be overridden through the environment
_log_dir_override = os.environ.get("ZERONET_LOG_DIR")
if _log_dir_override is not None:
    config.log_dir = _log_dir_override
config.initLogging(console_logging=False)
82
# Set custom formatter with relative time format (via: https://stackoverflow.com/questions/31521859/python-logging-module-time-since-last-log)
time_start = time.time()


class TimeFilter(logging.Filter):
    """Logging filter that adds timing and thread attributes to every record.

    The following attributes are set on each record so a format string can use them:

    - ``since_start``: seconds elapsed since the module-level ``time_start``,
      prefixed with a marker for the time passed since this filter last saw a
      record ("!" over 0.1s, "*" over 0.02s, "-" over 0.01s, " " otherwise).
    - ``thread_marker``: "T" when the record is emitted from a thread other than
      the one that created the filter, " " otherwise.
    - ``thread_title``: "(Thread#<ident>)" with the ident of the emitting thread
      in the former case, an empty string in the latter.
    """

    def __init__(self, *args, **kwargs):
        self.time_last = time.time()
        # The thread creating the filter is treated as the main thread
        self.main_thread_id = threading.current_thread().ident
        super().__init__(*args, **kwargs)

    def filter(self, record):
        """Annotate *record* and return True (records are never dropped)."""
        current_thread_id = threading.current_thread().ident
        if current_thread_id != self.main_thread_id:
            record.thread_marker = "T"
            # Name the thread that actually logged; the main thread's id would
            # be the same on every such record and identify nothing
            record.thread_title = "(Thread#%s)" % current_thread_id
        else:
            record.thread_marker = " "
            record.thread_title = ""

        now = time.time()
        since_last = now - self.time_last
        if since_last > 0.1:
            line_marker = "!"
        elif since_last > 0.02:
            line_marker = "*"
        elif since_last > 0.01:
            line_marker = "-"
        else:
            line_marker = " "

        record.since_start = "%s%.3fs" % (line_marker, now - time_start)

        self.time_last = now
        return True
114
# Give every handler of the root logger its own TimeFilter, then the shared formatter
log = logging.getLogger()
fmt = logging.Formatter(fmt='%(since_start)s %(thread_marker)s %(levelname)-8s %(name)s %(message)s %(thread_title)s')
for _handler in log.handlers:
    _handler.addFilter(TimeFilter())
for _handler in log.handlers:
    _handler.setFormatter(fmt)
119
120from Site.Site import Site
121from Site import SiteManager
122from User import UserManager
123from File import FileServer
124from Connection import ConnectionServer
125from Crypt import CryptConnection
126from Crypt import CryptBitcoin
127from Ui import UiWebsocket
128from Tor import TorManager
129from Content import ContentDb
130from util import RateLimit
131from Db import Db
132from Debug import Debug
133
# Append Debug.Notify to the hub's NOT_ERROR tuple
_hub = gevent.get_hub()
_hub.NOT_ERROR = _hub.NOT_ERROR + (Debug.Notify,)
135
def cleanup():
    """Close all databases and delete generated files from the test data dirs.

    Looks at the entries directly inside ``config.data_dir`` and its ``-temp``
    sibling. A regular file is removed when the text after the last dot of its
    name is one of: csr, pem, srl, db, json, tmp.
    """
    removable_exts = ("csr", "pem", "srl", "db", "json", "tmp")
    Db.dbCloseAll()
    for data_dir in (config.data_dir, config.data_dir + "-temp"):
        if not os.path.isdir(data_dir):
            continue
        for name in os.listdir(data_dir):
            if name.rsplit(".", 1)[-1] in removable_exts:
                path = data_dir + "/" + name
                if os.path.isfile(path):
                    os.unlink(path)


# Registered through the saved original, because atexit.register itself was replaced
atexit_register(cleanup)
149
@pytest.fixture(scope="session")
def resetSettings(request):
    """Overwrite sites.json, filters.json and users.json in ``config.data_dir``.

    sites.json and filters.json become empty JSON objects; users.json gets a
    single user entry with a fixed master seed and no certs or sites.
    Each file is written inside a ``with`` block so its handle is closed
    right away instead of being left to the garbage collector.
    """
    with open("%s/sites.json" % config.data_dir, "w") as sites_file:
        sites_file.write("{}")
    with open("%s/filters.json" % config.data_dir, "w") as filters_file:
        filters_file.write("{}")
    with open("%s/users.json" % config.data_dir, "w") as users_file:
        users_file.write("""
        {
            "15E5rhcAUD69WbiYsYARh4YHJ4sLm2JEyc": {
                "certs": {},
                "master_seed": "024bceac1105483d66585d8a60eaf20aa8c3254b0f266e0d626ddb6114e2949a",
                "sites": {}
            }
        }
        """)
163
164
@pytest.fixture(scope="session")
def resetTempSettings(request):
    """Create the ``-temp`` data directory (if missing) with default settings.

    Writes the same sites.json, filters.json and users.json defaults as
    ``resetSettings`` into ``config.data_dir + "-temp"`` and registers a
    finalizer that deletes those three files again. Each file is written
    inside a ``with`` block so its handle is closed right away.
    """
    data_dir_temp = config.data_dir + "-temp"
    if not os.path.isdir(data_dir_temp):
        os.mkdir(data_dir_temp)
    with open("%s/sites.json" % data_dir_temp, "w") as sites_file:
        sites_file.write("{}")
    with open("%s/filters.json" % data_dir_temp, "w") as filters_file:
        filters_file.write("{}")
    with open("%s/users.json" % data_dir_temp, "w") as users_file:
        users_file.write("""
        {
            "15E5rhcAUD69WbiYsYARh4YHJ4sLm2JEyc": {
                "certs": {},
                "master_seed": "024bceac1105483d66585d8a60eaf20aa8c3254b0f266e0d626ddb6114e2949a",
                "sites": {}
            }
        }
        """)

    def cleanup():
        # Remove only the files written above; the directory itself is kept
        os.unlink("%s/sites.json" % data_dir_temp)
        os.unlink("%s/users.json" % data_dir_temp)
        os.unlink("%s/filters.json" % data_dir_temp)
    request.addfinalizer(cleanup)
187
188
@pytest.fixture()
def site(request):
    """Provide the test site, restored from its ``-original`` data copy.

    The site directory is wiped and re-created from the ``-original`` copy,
    then a second Site object is created and registered in the site manager;
    that second object is what the fixture returns. The finalizer reads the
    ``site`` name when it runs, so it cleans up the second object.
    """
    address = "1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT"
    greenlets_before = [obj for obj in gc.get_objects() if isinstance(obj, gevent.Greenlet)]

    # Reset ratelimit
    RateLimit.queue_db = {}
    RateLimit.called_db = {}

    site = Site(address)

    # Always use original data
    site_dir = site.storage.getPath("")
    assert address in site_dir  # Make sure we dont delete everything
    shutil.rmtree(site_dir, True)
    shutil.copytree(site_dir + "-original", site_dir)

    # Add to site manager
    SiteManager.site_manager.get(address)
    site.announce = mock.MagicMock(return_value=True)  # Don't try to find peers from the net

    def cleanup():
        site.delete()
        content_db = site.content_manager.contents.db
        content_db.close("Test cleanup")
        content_db.timer_check_optional.kill()
        SiteManager.site_manager.sites.clear()
        db_path = "%s/content.db" % config.data_dir
        os.unlink(db_path)
        del ContentDb.content_dbs[db_path]
        # Kill every greenlet that did not exist before the fixture was set up
        leftover_greenlets = [
            obj for obj in gc.get_objects()
            if isinstance(obj, gevent.Greenlet) and obj not in greenlets_before
        ]
        gevent.killall(leftover_greenlets)
    request.addfinalizer(cleanup)

    site.greenlet_manager.stopGreenlets()
    site = Site(address)  # Create new Site object to load content.json files
    if not SiteManager.site_manager.sites:
        SiteManager.site_manager.sites = {}
    SiteManager.site_manager.sites[address] = site
    site.settings["serving"] = True
    return site
225
226
@pytest.fixture()
def site_temp(request):
    """Provide a second copy of the test site that lives in the ``-temp`` data dir.

    The Site object is created while ``Config.config.data_dir`` is patched to
    the ``-temp`` directory. The finalizer deletes the site, removes its
    content.db and kills the greenlets created since the fixture started.
    """
    greenlets_before = [obj for obj in gc.get_objects() if isinstance(obj, gevent.Greenlet)]
    temp_data_dir = config.data_dir + "-temp"
    with mock.patch("Config.config.data_dir", temp_data_dir):
        temp_site = Site("1TeSTvb4w2PWE81S2rEELgmX2GCCExQGT")
        temp_site.settings["serving"] = True
        temp_site.announce = mock.MagicMock(return_value=True)  # Don't try to find peers from the net

    def cleanup():
        temp_site.delete()
        content_db = temp_site.content_manager.contents.db
        content_db.close("Test cleanup")
        content_db.timer_check_optional.kill()
        # Built from config.data_dir as it is when the finalizer runs
        db_path = "%s-temp/content.db" % config.data_dir
        os.unlink(db_path)
        del ContentDb.content_dbs[db_path]
        leftover_greenlets = [
            obj for obj in gc.get_objects()
            if isinstance(obj, gevent.Greenlet) and obj not in greenlets_before
        ]
        gevent.killall(leftover_greenlets)
    request.addfinalizer(cleanup)
    temp_site.log = logging.getLogger("Temp:%s" % temp_site.address_short)
    return temp_site
246
247
@pytest.fixture(scope="session")
def user():
    """Return the user from the user manager (created if none exists) with empty sites."""
    manager = UserManager.user_manager
    test_user = manager.get() or manager.create()
    test_user.sites = {}  # Reset user data
    return test_user
255
256
@pytest.fixture(scope="session")
def browser(request):
    """Start a headless Chrome through selenium and return the driver.

    The test is skipped when selenium cannot be imported or the driver fails
    to start. The driver is quit by a finalizer.
    """
    try:
        from selenium import webdriver
        print("Starting chromedriver...")
        chrome_options = webdriver.chrome.options.Options()
        for argument in ("--headless", "--window-size=1920x1080", "--log-level=1"):
            chrome_options.add_argument(argument)
        driver = webdriver.Chrome(
            executable_path=CHROMEDRIVER_PATH,
            service_log_path=os.path.devnull,
            options=chrome_options
        )

        def shutdown():
            driver.quit()
        request.addfinalizer(shutdown)
    except Exception as err:
        raise pytest.skip("Test requires selenium + chromedriver: %s" % err)
    return driver
274
275
@pytest.fixture(scope="session")
def site_url():
    """Return ``SITE_URL`` after checking that it can be fetched.

    The test is skipped when the request fails. The response is read inside a
    ``with`` block so the connection is closed instead of being leaked.
    """
    try:
        with urllib.request.urlopen(SITE_URL) as response:
            response.read()
    except Exception as err:
        raise pytest.skip("Test requires zeronet client running: %s" % err)
    return SITE_URL
283
284
@pytest.fixture(params=['ipv4', 'ipv6'])
def file_server(request):
    """Parametrized file server: resolves to ``file_server4`` for 'ipv4', else ``file_server6``."""
    fixture_name = "file_server4" if request.param == "ipv4" else "file_server6"
    return request.getfixturevalue(fixture_name)
291
292
@pytest.fixture
def file_server4(request):
    """Start a FileServer on 127.0.0.1:1544 and return it once it is running.

    The server is started in a greenlet; the fixture then tries up to 10 times
    (0.1s apart) to open a connection to it, printing the error of every
    failed attempt. A finalizer stops the server.
    """
    time.sleep(0.1)
    file_server = FileServer("127.0.0.1", 1544)
    file_server.ip_external = "1.2.3.4"  # Fake external ip

    def listen():
        ConnectionServer.start(file_server)
        ConnectionServer.listen(file_server)

    gevent.spawn(listen)
    # Wait for port opening
    for _ in range(10):
        time.sleep(0.1)  # Port opening
        try:
            conn = file_server.getConnection("127.0.0.1", 1544)
            conn.close()
            break
        except Exception as err:
            # This is the IPv4 server, so the message must not say "FileServer6"
            print("FileServer4 startup error", Debug.formatException(err))
    assert file_server.running
    file_server.ip_incoming = {}  # Reset flood protection

    def stop():
        file_server.stop()
    request.addfinalizer(stop)
    return file_server
320
321
@pytest.fixture
def file_server6(request):
    """Start a FileServer on [::1]:1544 and return it once it is running.

    The test is skipped when an IPv6 UDP socket cannot be created and
    connected to ::1. Otherwise the server is started in a greenlet and the
    fixture tries up to 10 times (0.1s apart) to open a connection to it,
    printing the error of every failed attempt. A finalizer stops the server.
    """
    try:
        # The probe socket is only needed for the check, so close it right away
        with socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) as sock:
            sock.connect(("::1", 80, 1, 1))
        has_ipv6 = True
    except OSError:
        has_ipv6 = False
    if not has_ipv6:
        pytest.skip("Ipv6 not supported")

    time.sleep(0.1)
    file_server6 = FileServer("::1", 1544)
    file_server6.ip_external = 'fca5:95d6:bfde:d902:8951:276e:1111:a22c'  # Fake external ip

    def listen():
        ConnectionServer.start(file_server6)
        ConnectionServer.listen(file_server6)

    gevent.spawn(listen)
    # Wait for port opening
    for _ in range(10):
        time.sleep(0.1)  # Port opening
        try:
            conn = file_server6.getConnection("::1", 1544)
            conn.close()
            break
        except Exception as err:
            print("FileServer6 startup error", Debug.formatException(err))
    assert file_server6.running
    file_server6.ip_incoming = {}  # Reset flood protection

    def stop():
        file_server6.stop()
    request.addfinalizer(stop)
    return file_server6
359
360
@pytest.fixture()
def ui_websocket(site, user):
    """Return a UiWebsocket for *site* and *user* that is wired to a mock socket.

    The returned object gets an extra ``testAction(action, *args, **kwargs)``
    helper that sends a command through ``handleRequest`` and returns the
    "result" field of the response sent back to the mock socket.
    """
    class WsMock:
        """Stand-in websocket that hands the sent result over through an AsyncResult."""

        def __init__(self):
            self.result = gevent.event.AsyncResult()

        def send(self, data):
            logging.debug("WsMock: Set result (data: %s) called by %s" % (data, Debug.formatStack()))
            message = json.loads(data)
            self.result.set(message["result"])

        def getResult(self):
            logging.debug("WsMock: Get result")
            value = self.result.get()
            logging.debug("WsMock: Got result (data: %s)" % value)
            # Fresh AsyncResult for the next request
            self.result = gevent.event.AsyncResult()
            return value

    websocket = UiWebsocket(WsMock(), site, None, user, None)

    def testAction(action, *args, **kwargs):
        # Positional arguments take precedence; keyword arguments are used only without them
        params = list(args) if args else kwargs
        websocket.handleRequest({"id": 0, "cmd": action, "params": params})
        return websocket.ws.getResult()

    websocket.testAction = testAction
    return websocket
387
388
@pytest.fixture(scope="session")
def tor_manager():
    """Return a started TorManager (fileserver port 1544) with onions started.

    The test is skipped when any step fails or no connection is established.
    """
    try:
        manager = TorManager(fileserver_port=1544)
        manager.start()
        assert manager.conn is not None
        manager.startOnions()
    except Exception as err:
        raise pytest.skip("Test requires Tor with ControlPort: %s, %s" % (config.tor_controller, err))
    return manager
399
400
@pytest.fixture()
def db(request):
    """Create a fresh test database at ``<data_dir>/zeronet.db`` and return it.

    An existing file at that path is removed first. The schema has two tables
    with the same columns, ``test`` and ``test_importfilter``, both filled from
    data.json. A finalizer closes the database and deletes the file.
    """
    db_path = "%s/zeronet.db" % config.data_dir

    def tableSchema(index_query):
        # Both tables have the same columns and differ only in their unique index
        return {
            "cols": [
                ["test_id", "INTEGER"],
                ["title", "TEXT"],
                ["json_id", "INTEGER REFERENCES json (json_id)"]
            ],
            "indexes": [index_query],
            "schema_changed": 1426195822
        }

    schema = {
        "db_name": "TestDb",
        "db_file": db_path,
        "maps": {
            "data.json": {
                "to_table": [
                    "test",
                    {"node": "test", "table": "test_importfilter", "import_cols": ["test_id", "title"]}
                ]
            }
        },
        "tables": {
            "test": tableSchema("CREATE UNIQUE INDEX test_id ON test(test_id)"),
            "test_importfilter": tableSchema(
                "CREATE UNIQUE INDEX test_importfilter_id ON test_importfilter(test_id)"
            )
        }
    }

    if os.path.isfile(db_path):
        os.unlink(db_path)
    test_db = Db.Db(schema, db_path)
    test_db.checkTables()

    def stop():
        test_db.close("Test db cleanup")
        os.unlink(db_path)

    request.addfinalizer(stop)
    return test_db
448
449
@pytest.fixture(params=["sslcrypto", "sslcrypto_fallback", "libsecp256k1"])
def crypt_bitcoin_lib(request, monkeypatch):
    """Return CryptBitcoin with the parametrized library set as ``lib_verify_best`` and loaded."""
    lib_name = request.param
    monkeypatch.setattr(CryptBitcoin, "lib_verify_best", lib_name)
    CryptBitcoin.loadLib(lib_name)
    return CryptBitcoin
455
@pytest.fixture(scope='function', autouse=True)
def logCaseStart(request):
    """Reset the module-level ``time_start`` and log a start marker before every test."""
    global time_start
    time_start = time.time()
    start_message = "---- Start test case: %s ----" % request._pyfuncitem
    logging.debug(start_message)
    yield  # Wait until all test done
462
463
# Workaround for pytest bug when logging in atexit/post-fixture handlers (I/O operation on closed file)
def workaroundPytestLogError():
    """Patch pytest's ``EncodedFile`` so writing to or flushing a closed file is ignored.

    ``write`` is wrapped around the original implementation and keeps its
    return value; ``flush`` is replaced by a flush of the underlying buffer.
    In both, a ValueError whose message starts with
    "I/O operation on closed file" is swallowed and any other ValueError is
    re-raised.
    """
    import _pytest.capture
    write_original = _pytest.capture.EncodedFile.write

    def isClosedFileError(err):
        # CPython ends this message with a period, so match by prefix, not equality
        return str(err).startswith("I/O operation on closed file")

    def write_patched(obj, *args, **kwargs):
        try:
            return write_original(obj, *args, **kwargs)
        except ValueError as err:
            if not isClosedFileError(err):
                raise
            return None

    def flush_patched(obj, *args, **kwargs):
        try:
            obj.buffer.flush(*args, **kwargs)
        except ValueError as err:
            if not isClosedFileError(err):
                raise

    _pytest.capture.EncodedFile.write = write_patched
    _pytest.capture.EncodedFile.flush = flush_patched


workaroundPytestLogError()
492
@pytest.fixture(scope='session', autouse=True)
def disableLog():
    """Raise the root logger level to CRITICAL once the test session is over."""
    yield  # Wait until all test done
    root_logger = logging.getLogger('')
    critical_level_name = logging.getLevelName(logging.CRITICAL)
    root_logger.setLevel(critical_level_name)
497