# Forking what is left of ZeroNet and hopefully adding an AT Proto Frontend/Proxy
1import time
2import io
3import binascii
4
5import pytest
6import mock
7
8from Connection import ConnectionServer
9from Content.ContentManager import VerifyError
10from File import FileServer
11from File import FileRequest
12from Worker import WorkerManager
13from Peer import Peer
14from Bigfile import BigfilePiecefield, BigfilePiecefieldPacked
15from Test import Spy
16from util import Msgpack
17
18
@pytest.mark.usefixtures("resetSettings")
@pytest.mark.usefixtures("resetTempSettings")
class TestBigfile:
    """Integration tests for the Bigfile plugin.

    Covers piecemap creation/signing, per-piece hash verification, sparse
    file handling, ranged (partial) file requests and downloads, piecefield
    serialization and peer-to-peer piecefield sync, download statistics,
    prebuffering, file renames, and all-zero file edge cases.

    The tests rely on project fixtures: ``site`` (source site with storage),
    ``site_temp`` (downloading client site) and ``file_server`` (serving
    ConnectionServer bound to port 1544; clients use 1545).
    """
    # Private key used to sign the test site's content.json
    privatekey = "5KUh3PvNm5HUWoCfSUfcYvfQ2g3PrRNJWr6Q9eqdBGu23mtMntv"
    # Bigfile piece size used throughout the tests (1 MiB)
    piece_size = 1024 * 1024

    def createBigfile(self, site, inner_path="data/optional.any.iso", pieces=10):
        """Create and sign a test bigfile of ``pieces`` megabytes.

        Each of the ``pieces * 100`` writes emits 10,000 bytes
        ("TestN" left-justified to 10 chars, repeated 1000x), so the file is
        exactly ``pieces * 1,000,000`` bytes. Signing content.json generates
        the .piecemap.msgpack sidecar. Returns the file's inner_path.
        """
        f = site.storage.open(inner_path, "w")
        for i in range(pieces * 100):
            f.write(("Test%s" % i).ljust(10, "-") * 1000)
        f.close()
        assert site.content_manager.sign("content.json", self.privatekey)
        return inner_path

    def testPiecemapCreate(self, site):
        """Signing a bigfile must record size, sha512 and piecemap in content.json."""
        inner_path = self.createBigfile(site)
        content = site.storage.loadJson("content.json")
        assert "data/optional.any.iso" in content["files_optional"]
        file_node = content["files_optional"][inner_path]
        assert file_node["size"] == 10 * 1000 * 1000
        assert file_node["sha512"] == "47a72cde3be80b4a829e7674f72b7c6878cf6a70b0c58c6aa6c17d7e9948daf6"
        assert file_node["piecemap"] == inner_path + ".piecemap.msgpack"

        # The piecemap holds one sha512 digest per 1 MiB piece
        piecemap = Msgpack.unpack(site.storage.open(file_node["piecemap"], "rb").read())["optional.any.iso"]
        assert len(piecemap["sha512_pieces"]) == 10
        assert piecemap["sha512_pieces"][0] != piecemap["sha512_pieces"][1]
        assert binascii.hexlify(piecemap["sha512_pieces"][0]) == b"a73abad9992b3d0b672d0c2a292046695d31bebdcb1e150c8410bbe7c972eff3"

    def testVerifyPiece(self, site):
        """verifyPiece must accept every correct piece and reject a mismatched one."""
        inner_path = self.createBigfile(site)

        # Verify all 10 pieces against their piecemap hashes
        f = site.storage.open(inner_path, "rb")
        for i in range(10):
            piece = io.BytesIO(f.read(1024 * 1024))
            piece.seek(0)
            site.content_manager.verifyPiece(inner_path, i * 1024 * 1024, piece)
        f.close()

        # Try to verify piece 0 content against piece 1's hash (offset i=1)
        with pytest.raises(VerifyError) as err:
            i = 1
            f = site.storage.open(inner_path, "rb")
            piece = io.BytesIO(f.read(1024 * 1024))
            f.close()
            site.content_manager.verifyPiece(inner_path, i * 1024 * 1024, piece)
        assert "Invalid hash" in str(err.value)

    def testSparseFile(self, site):
        """Sparse files must allow fast writes anywhere and read back correctly."""
        inner_path = "sparsefile"

        # Create a 100MB sparse file
        site.storage.createSparseFile(inner_path, 100 * 1024 * 1024)

        # Write to file beginning
        s = time.time()
        f = site.storage.write("%s|%s-%s" % (inner_path, 0, 1024 * 1024), b"hellostart" * 1024)
        time_write_start = time.time() - s

        # Write to file end
        s = time.time()
        f = site.storage.write("%s|%s-%s" % (inner_path, 99 * 1024 * 1024, 99 * 1024 * 1024 + 1024 * 1024), b"helloend" * 1024)
        time_write_end = time.time() - s

        # Verify writes
        f = site.storage.open(inner_path)
        assert f.read(10) == b"hellostart"
        f.seek(99 * 1024 * 1024)
        assert f.read(8) == b"helloend"
        f.close()

        site.storage.delete(inner_path)

        # Writing to end should not take much longer than writing to start
        # (sparse files avoid filling the hole with zeroes)
        assert time_write_end <= max(0.1, time_write_start * 1.1)

    def testRangedFileRequest(self, file_server, site, site_temp):
        """A peer must serve an arbitrary 1 MiB byte range that verifies against the piecemap."""
        inner_path = self.createBigfile(site)

        file_server.sites[site.address] = site
        client = FileServer(file_server.ip, 1545)
        client.sites[site_temp.address] = site_temp
        site_temp.connection_server = client
        connection = client.getConnection(file_server.ip, 1544)

        # Add file_server as peer to client
        peer_file_server = site_temp.addPeer(file_server.ip, 1544)

        # Request the 6th megabyte (piece index 5) via the "path|from-to" syntax
        buff = peer_file_server.getFile(site_temp.address, "%s|%s-%s" % (inner_path, 5 * 1024 * 1024, 6 * 1024 * 1024))

        assert len(buff.getvalue()) == 1 * 1024 * 1024  # Correct block size
        assert buff.getvalue().startswith(b"Test524")  # Correct data
        buff.seek(0)
        assert site.content_manager.verifyPiece(inner_path, 5 * 1024 * 1024, buff)  # Correct hash

        connection.close()
        client.stop()

    def testRangedFileDownload(self, file_server, site, site_temp):
        """needFile with a byte range must download only the requested pieces."""
        inner_path = self.createBigfile(site)

        # Init source server
        site.connection_server = file_server
        file_server.sites[site.address] = site

        # Make sure the file and the piecemap are in the optional hashfield
        file_info = site.content_manager.getFileInfo(inner_path)
        assert site.content_manager.hashfield.hasHash(file_info["sha512"])

        piecemap_hash = site.content_manager.getFileInfo(file_info["piecemap"])["sha512"]
        assert site.content_manager.hashfield.hasHash(piecemap_hash)

        # Init client server
        client = ConnectionServer(file_server.ip, 1545)
        site_temp.connection_server = client
        peer_client = site_temp.addPeer(file_server.ip, 1544)

        # Download site
        site_temp.download(blind_includes=True, retry_bad_files=False).join(timeout=10)

        bad_files = site_temp.storage.verifyFiles(quick_check=True)["bad_files"]
        assert not bad_files

        # client_piecefield = peer_client.piecefields[file_info["sha512"]].tostring()
        # assert client_piecefield == "1" * 10

        # Download 5. and 10. block

        site_temp.needFile("%s|%s-%s" % (inner_path, 5 * 1024 * 1024, 6 * 1024 * 1024))
        site_temp.needFile("%s|%s-%s" % (inner_path, 9 * 1024 * 1024, 10 * 1024 * 1024))

        # Verify 0. block not downloaded (still sparse zeroes)
        f = site_temp.storage.open(inner_path)
        assert f.read(10) == b"\0" * 10
        # Verify 5. and 10. block downloaded
        f.seek(5 * 1024 * 1024)
        assert f.read(7) == b"Test524"
        f.seek(9 * 1024 * 1024)
        assert f.read(7) == b"943---T"

        # Verify hashfield
        assert set(site_temp.content_manager.hashfield) == set([18343, 43727])  # 18343: data/optional.any.iso, 43727: data/optional.any.iso.hashmap.msgpack

    def testOpenBigfile(self, file_server, site, site_temp):
        """openBigfile must lazily fetch only the pieces touched by reads."""
        inner_path = self.createBigfile(site)

        # Init source server
        site.connection_server = file_server
        file_server.sites[site.address] = site

        # Init client server
        client = ConnectionServer(file_server.ip, 1545)
        site_temp.connection_server = client
        site_temp.addPeer(file_server.ip, 1544)

        # Download site
        site_temp.download(blind_includes=True, retry_bad_files=False).join(timeout=10)

        # Open virtual file
        assert not site_temp.storage.isFile(inner_path)

        with site_temp.storage.openBigfile(inner_path) as f:
            with Spy.Spy(FileRequest, "route") as requests:
                f.seek(5 * 1024 * 1024)
                assert f.read(7) == b"Test524"

                f.seek(9 * 1024 * 1024)
                assert f.read(7) == b"943---T"

                assert len(requests) == 4  # 1x piecemap + 1x getpiecefield + 2x for pieces

            assert set(site_temp.content_manager.hashfield) == set([18343, 43727])

            # Only pieces 5 and 9 are marked downloaded
            assert site_temp.storage.piecefields[f.sha512].tostring() == "0000010001"
            assert f.sha512 in site_temp.getSettingsCache()["piecefields"]

            # Test requesting already downloaded
            with Spy.Spy(FileRequest, "route") as requests:
                f.seek(5 * 1024 * 1024)
                assert f.read(7) == b"Test524"

                assert len(requests) == 0

            # Test requesting multi-block overflow reads
            with Spy.Spy(FileRequest, "route") as requests:
                f.seek(5 * 1024 * 1024)  # We already have this block
                data = f.read(1024 * 1024 * 3)  # Our read overflow to 6. and 7. block
                assert data.startswith(b"Test524")
                assert data.endswith(b"Test838-")
                assert b"\0" not in data  # No null bytes allowed

                assert len(requests) == 2  # Two block download

            # Test out of range request: read is clamped to the real file size
            f.seek(5 * 1024 * 1024)
            data = f.read(1024 * 1024 * 30)
            assert len(data) == 10 * 1000 * 1000 - (5 * 1024 * 1024)

            f.seek(30 * 1024 * 1024)
            data = f.read(1024 * 1024 * 30)
            assert len(data) == 0

    @pytest.mark.parametrize("piecefield_obj", [BigfilePiecefield, BigfilePiecefieldPacked])
    def testPiecefield(self, piecefield_obj, site):
        """Both piecefield implementations must round-trip bytes and pack/unpack losslessly."""
        testdatas = [
            b"\x01" * 100 + b"\x00" * 900 + b"\x01" * 4000 + b"\x00" * 4999 + b"\x01",
            b"\x00\x01\x00\x01\x00\x01" * 10 + b"\x00\x01" * 90 + b"\x01\x00" * 400 + b"\x00" * 4999,
            b"\x01" * 10000,
            b"\x00" * 10000
        ]
        for testdata in testdatas:
            piecefield = piecefield_obj()

            piecefield.frombytes(testdata)
            assert piecefield.tobytes() == testdata
            # Indexing returns the byte value at that piece position
            assert piecefield[0] == testdata[0]
            assert piecefield[100] == testdata[100]
            assert piecefield[1000] == testdata[1000]
            assert piecefield[len(testdata) - 1] == testdata[len(testdata) - 1]

            # pack()/unpack() round-trip must preserve the exact bitmap
            packed = piecefield.pack()
            piecefield_new = piecefield_obj()
            piecefield_new.unpack(packed)
            assert piecefield.tobytes() == piecefield_new.tobytes()
            assert piecefield_new.tobytes() == testdata

    def testFileGet(self, file_server, site, site_temp):
        """getFile on a partially-downloaded bigfile must fail only for missing pieces."""
        inner_path = self.createBigfile(site)

        # Init source server
        site.connection_server = file_server
        file_server.sites[site.address] = site

        # Init client server
        site_temp.connection_server = FileServer(file_server.ip, 1545)
        site_temp.connection_server.sites[site_temp.address] = site_temp
        site_temp.addPeer(file_server.ip, 1544)

        # Download site
        site_temp.download(blind_includes=True, retry_bad_files=False).join(timeout=10)

        # Download second block
        with site_temp.storage.openBigfile(inner_path) as f:
            f.seek(1024 * 1024)
            assert f.read(1024)[0:1] != b"\0"

        # Make sure first block not downloaded (still sparse zeroes)
        with site_temp.storage.open(inner_path) as f:
            assert f.read(1024)[0:1] == b"\0"

        peer2 = site.addPeer(file_server.ip, 1545, return_peer=True)

        # Should drop error on first block request (client doesn't have it)
        assert not peer2.getFile(site.address, "%s|0-%s" % (inner_path, 1024 * 1024 * 1))

        # Should not drop error for second block request
        assert peer2.getFile(site.address, "%s|%s-%s" % (inner_path, 1024 * 1024 * 1, 1024 * 1024 * 2))

    def benchmarkPeerMemory(self, site, file_server):
        """Manual benchmark (not collected by pytest): memory cost of 25k peers."""
        # Init source server
        site.connection_server = file_server
        file_server.sites[site.address] = site

        import psutil, os
        meminfo = psutil.Process(os.getpid()).memory_info

        mem_s = meminfo()[0]
        s = time.time()
        for i in range(25000):
            site.addPeer(file_server.ip, i)
        print("%.3fs MEM: + %sKB" % (time.time() - s, (meminfo()[0] - mem_s) / 1024))  # 0.082s MEM: + 6800KB
        print(list(site.peers.values())[0].piecefields)

    def testUpdatePiecefield(self, file_server, site, site_temp):
        """updatePiecefields must populate the peer's piecefield map from the remote."""
        inner_path = self.createBigfile(site)

        server1 = file_server
        server1.sites[site.address] = site
        server2 = FileServer(file_server.ip, 1545)
        server2.sites[site_temp.address] = site_temp
        site_temp.connection_server = server2

        # Add file_server as peer to client
        server2_peer1 = site_temp.addPeer(file_server.ip, 1544)

        # Testing piecefield sync
        assert len(server2_peer1.piecefields) == 0
        assert server2_peer1.updatePiecefields()  # Query piecefields from peer
        assert len(server2_peer1.piecefields) > 0

    def testWorkerManagerPiecefieldDeny(self, file_server, site, site_temp):
        """Workers must not be assigned to peers whose piecefield lacks the wanted piece."""
        inner_path = self.createBigfile(site)

        server1 = file_server
        server1.sites[site.address] = site
        server2 = FileServer(file_server.ip, 1545)
        server2.sites[site_temp.address] = site_temp
        site_temp.connection_server = server2

        # Add file_server as peer to client
        server2_peer1 = site_temp.addPeer(file_server.ip, 1544)  # Working

        site_temp.downloadContent("content.json", download_files=False)
        site_temp.needFile("data/optional.any.iso.piecemap.msgpack")

        # Add fake peers with optional files downloaded (hashfield only, no piecefield)
        for i in range(5):
            fake_peer = site_temp.addPeer("127.0.1.%s" % i, 1544)
            fake_peer.hashfield = site.content_manager.hashfield
            fake_peer.has_hashfield = True

        with Spy.Spy(WorkerManager, "addWorker") as requests:
            site_temp.needFile("%s|%s-%s" % (inner_path, 5 * 1024 * 1024, 6 * 1024 * 1024))
            site_temp.needFile("%s|%s-%s" % (inner_path, 6 * 1024 * 1024, 7 * 1024 * 1024))

        # It should only request parts from peer1 as the other peers does not have the requested parts in piecefields
        assert len([request[1] for request in requests if request[1] != server2_peer1]) == 0

    def testWorkerManagerPiecefieldDownload(self, file_server, site, site_temp):
        """Each piece must be requested from the peer whose piecefield advertises it."""
        inner_path = self.createBigfile(site)

        server1 = file_server
        server1.sites[site.address] = site
        server2 = FileServer(file_server.ip, 1545)
        server2.sites[site_temp.address] = site_temp
        site_temp.connection_server = server2
        sha512 = site.content_manager.getFileInfo(inner_path)["sha512"]

        # Create 10 fake peers, each owning exactly one distinct piece;
        # mock out network updates so only the seeded piecefields matter
        for i in range(10):
            peer = Peer(file_server.ip, 1544, site_temp, server2)
            peer.piecefields[sha512][i] = b"\x01"
            peer.updateHashfield = mock.MagicMock(return_value=False)
            peer.updatePiecefields = mock.MagicMock(return_value=False)
            peer.findHashIds = mock.MagicMock(return_value={"nope": []})
            peer.hashfield = site.content_manager.hashfield
            peer.has_hashfield = True
            peer.key = "Peer:%s" % i
            site_temp.peers["Peer:%s" % i] = peer

        site_temp.downloadContent("content.json", download_files=False)
        site_temp.needFile("data/optional.any.iso.piecemap.msgpack")

        with Spy.Spy(Peer, "getFile") as requests:
            for i in range(10):
                site_temp.needFile("%s|%s-%s" % (inner_path, i * 1024 * 1024, (i + 1) * 1024 * 1024))

        assert len(requests) == 10
        for i in range(10):
            assert requests[i][0] == site_temp.peers["Peer:%s" % i]  # Every part should be requested from piece owner peer

    def testDownloadStats(self, file_server, site, site_temp):
        """optional_downloaded must count the bigfile once, regardless of extra reads."""
        inner_path = self.createBigfile(site)

        # Init source server
        site.connection_server = file_server
        file_server.sites[site.address] = site

        # Init client server
        client = ConnectionServer(file_server.ip, 1545)
        site_temp.connection_server = client
        site_temp.addPeer(file_server.ip, 1544)

        # Download site
        site_temp.download(blind_includes=True, retry_bad_files=False).join(timeout=10)

        # Open virtual file
        assert not site_temp.storage.isFile(inner_path)

        # Check size before downloads
        assert site_temp.settings["size"] < 10 * 1024 * 1024
        assert site_temp.settings["optional_downloaded"] == 0
        size_piecemap = site_temp.content_manager.getFileInfo(inner_path + ".piecemap.msgpack")["size"]
        size_bigfile = site_temp.content_manager.getFileInfo(inner_path)["size"]

        # First read counts the whole bigfile + piecemap as downloaded
        with site_temp.storage.openBigfile(inner_path) as f:
            assert b"\0" not in f.read(1024)
            assert site_temp.settings["optional_downloaded"] == size_piecemap + size_bigfile

        with site_temp.storage.openBigfile(inner_path) as f:
            # Don't count twice
            assert b"\0" not in f.read(1024)
            assert site_temp.settings["optional_downloaded"] == size_piecemap + size_bigfile

            # Add second block
            assert b"\0" not in f.read(1024 * 1024)
            assert site_temp.settings["optional_downloaded"] == size_piecemap + size_bigfile

    def testPrebuffer(self, file_server, site, site_temp):
        """With prebuffer set, reading a piece must also queue the following pieces."""
        inner_path = self.createBigfile(site)

        # Init source server
        site.connection_server = file_server
        file_server.sites[site.address] = site

        # Init client server
        client = ConnectionServer(file_server.ip, 1545)
        site_temp.connection_server = client
        site_temp.addPeer(file_server.ip, 1544)

        # Download site
        site_temp.download(blind_includes=True, retry_bad_files=False).join(timeout=10)

        # Open virtual file
        assert not site_temp.storage.isFile(inner_path)

        with site_temp.storage.openBigfile(inner_path, prebuffer=1024 * 1024 * 2) as f:
            with Spy.Spy(FileRequest, "route") as requests:
                f.seek(5 * 1024 * 1024)
                assert f.read(7) == b"Test524"
            # assert len(requests) == 3  # 1x piecemap + 1x getpiecefield + 1x for pieces
            # Reading piece 5 queues pieces 6-7 (2 MiB prebuffer) as tasks
            assert len([task for task in site_temp.worker_manager.tasks if task["inner_path"].startswith(inner_path)]) == 2

            time.sleep(0.5)  # Wait prebuffer download

            sha512 = site.content_manager.getFileInfo(inner_path)["sha512"]
            assert site_temp.storage.piecefields[sha512].tostring() == "0000011100"

            # No prebuffer beyond end of the file
            f.seek(9 * 1024 * 1024)
            assert b"\0" not in f.read(7)

            assert len([task for task in site_temp.worker_manager.tasks if task["inner_path"].startswith(inner_path)]) == 0

    def testDownloadAllPieces(self, file_server, site, site_temp):
        """The "|all" suffix must fetch every piece once and skip already-held pieces."""
        inner_path = self.createBigfile(site)

        # Init source server
        site.connection_server = file_server
        file_server.sites[site.address] = site

        # Init client server
        client = ConnectionServer(file_server.ip, 1545)
        site_temp.connection_server = client
        site_temp.addPeer(file_server.ip, 1544)

        # Download site
        site_temp.download(blind_includes=True, retry_bad_files=False).join(timeout=10)

        # Open virtual file
        assert not site_temp.storage.isFile(inner_path)

        with Spy.Spy(FileRequest, "route") as requests:
            site_temp.needFile("%s|all" % inner_path)

        assert len(requests) == 12  # piecemap.msgpack, getPiecefields, 10 x piece

        # Don't re-download already got pieces
        with Spy.Spy(FileRequest, "route") as requests:
            site_temp.needFile("%s|all" % inner_path)

        assert len(requests) == 0

    def testFileSize(self, file_server, site, site_temp):
        """On-disk size must stay sparse until the last piece is written."""
        inner_path = self.createBigfile(site)

        # Init source server
        site.connection_server = file_server
        file_server.sites[site.address] = site

        # Init client server
        client = ConnectionServer(file_server.ip, 1545)
        site_temp.connection_server = client
        site_temp.addPeer(file_server.ip, 1544)

        # Download site
        site_temp.download(blind_includes=True, retry_bad_files=False).join(timeout=10)

        # Open virtual file
        assert not site_temp.storage.isFile(inner_path)

        # Download first block
        site_temp.needFile("%s|%s-%s" % (inner_path, 0 * 1024 * 1024, 1 * 1024 * 1024))
        assert site_temp.storage.getSize(inner_path) < 1000 * 1000 * 10  # Size on the disk should be smaller than the real size

        # Downloading the last block extends the file to its full size
        site_temp.needFile("%s|%s-%s" % (inner_path, 9 * 1024 * 1024, 10 * 1024 * 1024))
        assert site_temp.storage.getSize(inner_path) == site.storage.getSize(inner_path)

    def testFileRename(self, file_server, site, site_temp):
        """After a rename+resign, downloaded pieces must be reused; only the new piecemap and missing pieces stream."""
        inner_path = self.createBigfile(site)

        # Init source server
        site.connection_server = file_server
        file_server.sites[site.address] = site

        # Init client server
        site_temp.connection_server = FileServer(file_server.ip, 1545)
        site_temp.connection_server.sites[site_temp.address] = site_temp
        site_temp.addPeer(file_server.ip, 1544)

        # Download site
        site_temp.download(blind_includes=True, retry_bad_files=False).join(timeout=10)

        with Spy.Spy(FileRequest, "route") as requests:
            site_temp.needFile("%s|%s-%s" % (inner_path, 0, 1 * self.piece_size))

        assert len([req for req in requests if req[1] == "streamFile"]) == 2  # 1 piece + piecemap

        # Rename the file on the source site and re-sign without the old entries
        inner_path_new = inner_path.replace(".iso", "-new.iso")
        site.storage.rename(inner_path, inner_path_new)
        site.storage.delete("data/optional.any.iso.piecemap.msgpack")
        assert site.content_manager.sign("content.json", self.privatekey, remove_missing_optional=True)

        files_optional = site.content_manager.contents["content.json"]["files_optional"].keys()

        assert "data/optional.any-new.iso.piecemap.msgpack" in files_optional
        assert "data/optional.any.iso.piecemap.msgpack" not in files_optional
        assert "data/optional.any.iso" not in files_optional

        with Spy.Spy(FileRequest, "route") as requests:
            site.publish()
            time.sleep(0.1)
            site_temp.download(blind_includes=True, retry_bad_files=False).join(timeout=10)  # Wait for download

            # The publish/update itself must not stream any file content
            assert len([req[1] for req in requests if req[1] == "streamFile"]) == 0

            with site_temp.storage.openBigfile(inner_path_new, prebuffer=0) as f:
                f.read(1024)

                # First piece already downloaded
                assert [req for req in requests if req[1] == "streamFile"] == []

                # Second piece needs to be downloaded + changed piecemap
                f.seek(self.piece_size)
                f.read(1024)
                assert [req[3]["inner_path"] for req in requests if req[1] == "streamFile"] == [inner_path_new + ".piecemap.msgpack", inner_path_new]

    @pytest.mark.parametrize("size", [1024 * 3, 1024 * 1024 * 3, 1024 * 1024 * 30])
    def testNullFileRead(self, file_server, site, site_temp, size):
        """All-zero files of various sizes must download completely (bigfile or not).

        Small sizes stay regular optional files; large ones get a piecemap and
        go through the bigfile "|all" path.
        """
        inner_path = "data/optional.iso"

        f = site.storage.open(inner_path, "w")
        f.write("\0" * size)
        f.close()
        assert site.content_manager.sign("content.json", self.privatekey)

        # Init source server
        site.connection_server = file_server
        file_server.sites[site.address] = site

        # Init client server
        site_temp.connection_server = FileServer(file_server.ip, 1545)
        site_temp.connection_server.sites[site_temp.address] = site_temp
        site_temp.addPeer(file_server.ip, 1544)

        # Download site
        site_temp.download(blind_includes=True, retry_bad_files=False).join(timeout=10)

        if "piecemap" in site.content_manager.getFileInfo(inner_path):  # Bigfile
            site_temp.needFile(inner_path + "|all")
        else:
            site_temp.needFile(inner_path)

        assert site_temp.storage.getSize(inner_path) == size