A fork of what remains of ZeroNet, which will hopefully add an AT Protocol (AT Proto) frontend/proxy.
1# Included modules
2import os
3import time
4import json
5import collections
6import itertools
7
8# Third party modules
9import gevent
10
11from Debug import Debug
12from Config import config
13from util import RateLimit
14from util import Msgpack
15from util import helper
16from Plugin import PluginManager
17from contextlib import closing
18
# Default number of bytes read/sent per getFile/streamFile request (512 KiB)
FILE_BUFF = 512 * 1024
20
21
class RequestError(Exception):
    """Signal a request-level problem while serving a file to a peer.

    Raised inside ``FileRequest.handleGetFile``, which catches it and reports
    ``"File read error: <message>"`` back to the requesting peer.
    """
24
25
# Incoming requests
@PluginManager.acceptPlugins
class FileRequest(object):
    """Serve a single incoming peer request received on ``connection``.

    ``route`` dispatches the command to the matching ``action<Cmd>`` method,
    and ``response`` sends at most one reply tagged with the request id.
    """

    __slots__ = ("server", "connection", "req_id", "sites", "log", "responded")

    def __init__(self, server, connection):
        self.server = server
        self.connection = connection

        self.req_id = None  # Set by route(); echoed back in the response "to" field
        self.sites = self.server.sites
        self.log = server.log
        self.responded = False  # Responded to the request

    def send(self, msg, streaming=False):
        """Send ``msg`` on the connection; silently dropped if the connection is closed."""
        if not self.connection.closed:
            self.connection.send(msg, streaming)

    def sendRawfile(self, file, read_bytes):
        """Send ``read_bytes`` raw bytes of ``file``; skipped if the connection is closed."""
        if not self.connection.closed:
            self.connection.sendRawfile(file, read_bytes)

    def response(self, msg, streaming=False):
        """Send the reply to the current request (only the first call has an effect).

        A non-dict ``msg`` is wrapped as ``{"body": msg}``; a dict ``msg`` is
        modified in place to carry ``cmd="response"`` and ``to=<req_id>``.
        """
        if self.responded:
            if config.verbose:
                self.log.debug("Req id %s already responded" % self.req_id)
            return
        if not isinstance(msg, dict):  # If msg not a dict create a {"body": msg}
            msg = {"body": msg}
        msg["cmd"] = "response"
        msg["to"] = self.req_id
        self.responded = True
        self.send(msg, streaming=streaming)

    # Route file requests
    def route(self, cmd, req_id, params):
        """Dispatch ``cmd`` to the matching ``action<Cmd>`` handler.

        On onion-locked connections, requests for a site outside the
        connection's valid sites are rejected. ``update`` commands go through
        ``RateLimit.callAsync``. Other non-IO-bound commands are delayed by the
        CPU time the connection has used so far, and their run time (minus
        time spent sending) is added to ``connection.cpu_time``.

        Returns False when the request is rejected by the site lock, else None.
        """
        self.req_id = req_id
        # Don't allow other sites than locked
        if "site" in params and self.connection.target_onion:
            valid_sites = self.connection.getValidSites()
            if params["site"] not in valid_sites and valid_sites != ["global"]:
                self.response({"error": "Invalid site"})
                self.connection.log(
                    "Site lock violation: %s not in %s, target onion: %s" %
                    (params["site"], valid_sites, self.connection.target_onion)
                )
                self.connection.badAction(5)
                return False

        if cmd == "update":
            event = "%s update %s %s" % (self.connection.id, params["site"], params["inner_path"])
            # If called more than once within 15 sec only keep the last update
            RateLimit.callAsync(event, max(self.connection.bad_actions, 15), self.actionUpdate, params)
            return

        # cmd[:1] rather than cmd[0]: an empty command maps to "action" (no such
        # handler) and is reported via actionUnknown instead of raising IndexError
        func_name = "action" + cmd[:1].upper() + cmd[1:]
        func = getattr(self, func_name, None)
        is_io_bound = cmd in ("getFile", "streamFile")  # Skip IO bound functions

        if not is_io_bound:
            if self.connection.cpu_time > 0.5:
                self.log.debug(
                    "Delay %s %s, cpu_time used by connection: %.3fs" %
                    (self.connection.ip, cmd, self.connection.cpu_time)
                )
                time.sleep(self.connection.cpu_time)
                if self.connection.cpu_time > 5:
                    self.connection.close("Cpu time: %.3fs" % self.connection.cpu_time)
            s = time.time()

        if func:
            func(params)
        else:
            self.actionUnknown(cmd, params)

        if not is_io_bound:
            taken = time.time() - s
            # Don't bill the time spent sending data as cpu time
            taken_sent = self.connection.last_sent_time - self.connection.last_send_time
            self.connection.cpu_time += taken - taken_sent

    # Update a site file request
    def actionUpdate(self, params):
        """Handle a peer announcing an updated content.json.

        ``params`` carries ``site``, ``inner_path``, ``body`` (when empty, the
        file is downloaded from the sender) and optionally ``modified`` and
        ``diffs``. A verified, newer file is written to storage, its changed
        files are downloaded in a background greenlet, and
        ``site.publish(..., limit=3)`` is scheduled for the site's next
        ``onComplete``.

        Returns False for an unknown/not-serving site, otherwise None.
        """
        site = self.sites.get(params["site"])
        if not site or not site.isServing():  # Site unknown or not serving
            self.response({"error": "Unknown site"})
            self.connection.badAction(1)
            self.connection.badAction(5)
            return False

        inner_path = params.get("inner_path", "")
        current_content_modified = site.content_manager.contents.get(inner_path, {}).get("modified", 0)
        body = params["body"]

        if not inner_path.endswith("content.json"):
            self.response({"error": "Only content.json update allowed"})
            self.connection.badAction(5)
            return

        valid = None  # None: same/earlier content than ours, or already being processed
        # Reported if verifyFile() rejects the file without raising; replaced by the
        # exception when it does raise (previously this name could be unbound)
        error = "Verification failed"
        should_validate_content = True
        if "modified" in params and params["modified"] <= current_content_modified:
            should_validate_content = False  # Same or earlier content as we have
        elif not body:  # No body sent, we have to download it first
            site.log.debug("Missing body from update for file %s, downloading ..." % inner_path)
            peer = site.addPeer(self.connection.ip, self.connection.port, return_peer=True, source="update")  # Add or get peer
            try:
                body = peer.getFile(site.address, inner_path).read()
            except Exception as err:
                site.log.debug("Can't download updated file %s: %s" % (inner_path, err))
                self.response({"error": "File invalid update: Can't download updated file"})
                self.connection.badAction(5)
                return

        if should_validate_content:
            try:
                content = json.loads(body.decode())
                # Read "modified" inside the try: a JSON document that is not an
                # object, or lacks the field, is rejected like malformed JSON
                # instead of raising an unhandled KeyError/TypeError below
                modified = content["modified"]
            except Exception as err:
                site.log.debug("Update for %s is invalid JSON: %s" % (inner_path, err))
                self.response({"error": "File invalid JSON"})
                self.connection.badAction(5)
                return

            file_uri = "%s/%s:%s" % (site.address, inner_path, modified)

            if self.server.files_parsing.get(file_uri):  # Check if we already working on it
                valid = None  # Same file
            else:
                try:
                    valid = site.content_manager.verifyFile(inner_path, content)
                except Exception as err:
                    site.log.debug("Update for %s is invalid: %s" % (inner_path, err))
                    error = err
                    valid = False

        if valid is True:  # Valid and changed
            site.log.info("Update for %s looks valid, saving..." % inner_path)
            self.server.files_parsing[file_uri] = True
            site.storage.write(inner_path, body)
            del params["body"]

            site.onFileDone(inner_path)  # Trigger filedone

            # Only content.json updates get this far (checked above), so always
            # download every changed file it references from the sender
            peer = site.addPeer(self.connection.ip, self.connection.port, return_peer=True, source="update")  # Add or get peer
            # On complete publish to other peers
            diffs = params.get("diffs", {})
            site.onComplete.once(lambda: site.publish(inner_path=inner_path, diffs=diffs, limit=3), "publish_%s" % inner_path)

            # Load new content file and download changed files in new thread
            def downloader():
                try:
                    site.downloadContent(inner_path, peer=peer, diffs=diffs)
                finally:
                    # Always clear the in-progress marker: if it stayed set after a
                    # failed download, later updates of this version would be
                    # treated as "not changed" forever
                    self.server.files_parsing.pop(file_uri, None)

            gevent.spawn(downloader)

            self.response({"ok": "Thanks, file %s updated!" % inner_path})
            self.connection.goodAction()

        elif valid is None:  # Not changed
            peer = site.addPeer(self.connection.ip, self.connection.port, return_peer=True, source="update old")  # Add or get peer
            if peer:
                if not peer.connection:
                    peer.connect(self.connection)  # Assign current connection to peer
                if inner_path in site.content_manager.contents:
                    peer.last_content_json_update = site.content_manager.contents[inner_path]["modified"]
                if config.verbose:
                    site.log.debug(
                        "Same version, adding new peer for locked files: %s, tasks: %s" %
                        (peer.key, len(site.worker_manager.tasks))
                    )
                for task in site.worker_manager.tasks:  # New peer add to every ongoing task
                    if task["peers"] and not task["optional_hash_id"]:
                        # Download file from this peer too if its peer locked
                        site.needFile(task["inner_path"], peer=peer, update=True, blocking=False)

            self.response({"ok": "File not changed"})
            self.connection.badAction()

        else:  # Invalid sign or sha hash
            self.response({"error": "File %s invalid: %s" % (inner_path, error)})
            self.connection.badAction(5)

    def isReadable(self, site, inner_path, file, pos):
        """Return whether ``file`` can be read at ``pos``; this default always allows it.

        Only consulted by handleGetFile for files larger than the requested
        chunk; presumably overridden by plugins (e.g. partially downloaded big
        files) - NOTE(review): confirm.
        """
        return True

    # Send file content request
    def handleGetFile(self, params, streaming=False):
        """Send a chunk of ``params["inner_path"]`` starting at ``params["location"]``.

        ``params["read_bytes"]`` (default FILE_BUFF) bounds the chunk size. If
        ``params["file_size"]`` is given for a file that fits in one chunk, it
        must match the local size. With ``streaming`` the chunk is sent as raw
        bytes after a metadata response; otherwise it is sent in the response
        ``body``.

        Returns ``{"bytes_sent", "file_size", "location"}`` on success and
        False on any error (an error response is sent to the peer).
        """
        site = self.sites.get(params["site"])
        if not site or not site.isServing():  # Site unknown or not serving
            self.response({"error": "Unknown site"})
            self.connection.badAction(5)
            return False
        try:
            file_path = site.storage.getPath(params["inner_path"])
            if streaming:
                file_obj = site.storage.open(params["inner_path"])
            else:
                file_obj = Msgpack.FilePart(file_path, "rb")

            with file_obj as file:
                file.seek(params["location"])
                read_bytes = params.get("read_bytes", FILE_BUFF)
                file_size = os.fstat(file.fileno()).st_size

                if file_size > read_bytes:  # Check if file is readable at current position (for big files)
                    if not self.isReadable(site, params["inner_path"], file, params["location"]):
                        raise RequestError("File not readable at position: %s" % params["location"])
                else:
                    if params.get("file_size") and params["file_size"] != file_size:
                        self.connection.badAction(2)
                        raise RequestError("File size does not match: %sB != %sB" % (params["file_size"], file_size))

                if not streaming:
                    file.read_bytes = read_bytes

                if params["location"] > file_size:
                    self.connection.badAction(5)
                    raise RequestError("Bad file location")

                if streaming:
                    back = {
                        "size": file_size,
                        "location": min(file.tell() + read_bytes, file_size),
                        "stream_bytes": min(read_bytes, file_size - params["location"])
                    }
                    self.response(back)
                    self.sendRawfile(file, read_bytes=read_bytes)
                else:
                    back = {
                        "body": file,
                        "size": file_size,
                        "location": min(file.tell() + file.read_bytes, file_size)
                    }
                    self.response(back, streaming=True)

                bytes_sent = min(read_bytes, file_size - params["location"])  # Number of bytes we going to send
                site.settings["bytes_sent"] = site.settings.get("bytes_sent", 0) + bytes_sent
            if config.debug_socket:
                self.log.debug("File %s at position %s sent %s bytes" % (file_path, params["location"], bytes_sent))

            # Add peer to site if not added before
            connected_peer = site.addPeer(self.connection.ip, self.connection.port, source="request")
            if connected_peer:  # Just added
                connected_peer.connect(self.connection)  # Assign current connection to peer

            return {"bytes_sent": bytes_sent, "file_size": file_size, "location": params["location"]}

        except RequestError as err:
            self.log.debug("GetFile %s %s %s request error: %s" % (self.connection, params["site"], params["inner_path"], Debug.formatException(err)))
            self.response({"error": "File read error: %s" % err})
            return False  # Consistent with the other error paths (was an implicit None)
        except OSError as err:
            if config.verbose:
                self.log.debug("GetFile read error: %s" % Debug.formatException(err))
            self.response({"error": "File read error"})
            return False
        except Exception as err:
            self.log.error("GetFile exception: %s" % Debug.formatException(err))
            self.response({"error": "File read exception"})
            return False

    def actionGetFile(self, params):
        """Serve a file chunk embedded in the response body."""
        return self.handleGetFile(params)

    def actionStreamFile(self, params):
        """Serve a file chunk as raw bytes following the metadata response."""
        return self.handleGetFile(params, streaming=True)

    # Peer exchange request
    def actionPex(self, params):
        """Exchange peer lists with the requester.

        Adds the requester and every peer it sent (``peers``, ``peers_ipv6``,
        ``peers_onion``) to the site, then replies with up to ``params["need"]``
        connectable peers that were not in the sent list.
        """
        site = self.sites.get(params["site"])
        if not site or not site.isServing():  # Site unknown or not serving
            self.response({"error": "Unknown site"})
            self.connection.badAction(5)
            return False

        got_peer_keys = []
        added = 0

        # Add requester peer to site
        connected_peer = site.addPeer(self.connection.ip, self.connection.port, source="request")

        if connected_peer:  # It was not registered before
            added += 1
            connected_peer.connect(self.connection)  # Assign current connection to peer

        # Add sent peers to site
        for packed_address in itertools.chain(params.get("peers", []), params.get("peers_ipv6", [])):
            address = helper.unpackAddress(packed_address)
            got_peer_keys.append("%s:%s" % address)
            if site.addPeer(*address, source="pex"):
                added += 1

        # Add sent onion peers to site
        for packed_address in params.get("peers_onion", []):
            address = helper.unpackOnionAddress(packed_address)
            got_peer_keys.append("%s:%s" % address)
            if site.addPeer(*address, source="pex"):
                added += 1

        # Send back peers that is not in the sent list and connectable (not port 0)
        packed_peers = helper.packPeers(site.getConnectablePeers(params["need"], ignore=got_peer_keys, allow_private=False))

        if added:
            site.worker_manager.onPeers()
            if config.verbose:
                self.log.debug(
                    "Added %s peers to %s using pex, sending back %s" %
                    (added, site, {key: len(val) for key, val in packed_peers.items()})
                )

        back = {
            "peers": packed_peers["ipv4"],
            "peers_ipv6": packed_peers["ipv6"],
            "peers_onion": packed_peers["onion"]
        }

        self.response(back)

    # Get modified content.json files since
    def actionListModified(self, params):
        """Reply with the site's content.json files modified since ``params["since"]``."""
        site = self.sites.get(params["site"])
        if not site or not site.isServing():  # Site unknown or not serving
            self.response({"error": "Unknown site"})
            self.connection.badAction(5)
            return False
        modified_files = site.content_manager.listModified(params["since"])

        # Add peer to site if not added before
        connected_peer = site.addPeer(self.connection.ip, self.connection.port, source="request")
        if connected_peer:  # Just added
            connected_peer.connect(self.connection)  # Assign current connection to peer

        self.response({"modified_files": modified_files})

    def actionGetHashfield(self, params):
        """Reply with our raw hashfield for the site and record when it was sent."""
        site = self.sites.get(params["site"])
        if not site or not site.isServing():  # Site unknown or not serving
            self.response({"error": "Unknown site"})
            self.connection.badAction(5)
            return False

        # Add peer to site if not added before
        peer = site.addPeer(self.connection.ip, self.connection.port, return_peer=True, source="request")
        # addPeer() can return a falsy value even with return_peer=True (actionUpdate
        # guards for it too), so don't dereference it unconditionally
        if peer:
            if not peer.connection:  # Just added
                peer.connect(self.connection)  # Assign current connection to peer

            peer.time_my_hashfield_sent = time.time()  # Don't send again if not changed

        self.response({"hashfield_raw": site.content_manager.hashfield.tobytes()})

    def findHashIds(self, site, hash_ids, limit=100):
        """Return ``{ip_type: {hash_id: [packed_address, ...]}}`` of peers having the hash ids.

        At most 20 addresses are kept per hash id and ip type; ``limit`` is
        passed through to ``worker_manager.findOptionalHashIds``.
        """
        back = collections.defaultdict(lambda: collections.defaultdict(list))
        found = site.worker_manager.findOptionalHashIds(hash_ids, limit=limit)

        for hash_id, peers in found.items():
            for peer in peers:
                ip_type = helper.getIpType(peer.ip)
                if len(back[ip_type][hash_id]) < 20:
                    back[ip_type][hash_id].append(peer.packMyAddress())
        return back

    def actionFindHashIds(self, params):
        """Reply with peers known to have ``params["hash_ids"]`` and which of them we have.

        Lookups are limited to 10 results (after a short sleep) when the
        connection has used a lot of CPU time or repeated the same query within
        5 minutes.
        """
        site = self.sites.get(params["site"])
        s = time.time()
        if not site or not site.isServing():  # Site unknown or not serving
            self.response({"error": "Unknown site"})
            self.connection.badAction(5)
            return False

        event_key = "%s_findHashIds_%s_%s" % (self.connection.ip, params["site"], len(params["hash_ids"]))
        if self.connection.cpu_time > 0.5 or not RateLimit.isAllowed(event_key, 60 * 5):
            time.sleep(0.1)
            back = self.findHashIds(site, params["hash_ids"], limit=10)
        else:
            back = self.findHashIds(site, params["hash_ids"])
        RateLimit.called(event_key)

        my_hashfield_set = set(site.content_manager.hashfield)
        my_hashes = [hash_id for hash_id in params["hash_ids"] if hash_id in my_hashfield_set]

        if config.verbose:
            self.log.debug(
                "Found: %s for %s hashids in %.3fs" %
                ({key: len(val) for key, val in back.items()}, len(params["hash_ids"]), time.time() - s)
            )
        self.response({"peers": back["ipv4"], "peers_onion": back["onion"], "peers_ipv6": back["ipv6"], "my": my_hashes})

    def actionSetHashfield(self, params):
        """Replace the requester peer's hashfield with ``params["hashfield_raw"]``."""
        site = self.sites.get(params["site"])
        if not site or not site.isServing():  # Site unknown or not serving
            self.response({"error": "Unknown site"})
            self.connection.badAction(5)
            return False

        # Add or get peer
        peer = site.addPeer(self.connection.ip, self.connection.port, return_peer=True, connection=self.connection, source="request")
        if not peer:
            # addPeer() may refuse the address; previously this raised AttributeError
            self.response({"error": "Unable to add peer"})
            return False
        if not peer.connection:
            peer.connect(self.connection)
        peer.hashfield.replaceFromBytes(params["hashfield_raw"])
        self.response({"ok": "Updated"})

    # Send a simple Pong! answer
    def actionPing(self, params):
        """Reply with ``b"Pong!"``."""
        self.response(b"Pong!")

    # Check requested port of the other peer
    def actionCheckport(self, params):
        """Try a TCP connect (5s timeout) back to the requester on ``params["port"]``.

        Replies with ``status`` "open" or "closed" and the requester's IP as
        seen by us (``ip_external``).
        """
        if helper.getIpType(self.connection.ip) == "ipv6":
            sock_address = (self.connection.ip, params["port"], 0, 0)
        else:
            sock_address = (self.connection.ip, params["port"])

        with closing(helper.createSocket(self.connection.ip)) as sock:
            sock.settimeout(5)
            if sock.connect_ex(sock_address) == 0:
                self.response({"status": "open", "ip_external": self.connection.ip})
            else:
                self.response({"status": "closed", "ip_external": self.connection.ip})

    # Unknown command
    def actionUnknown(self, cmd, params):
        """Reject an unrecognised command and penalise the connection."""
        self.response({"error": "Unknown command: %s" % cmd})
        self.connection.badAction(5)