Forking what is left of ZeroNet and hopefully adding an AT Proto Frontend/Proxy
at main 448 lines 20 kB view raw
1# Included modules 2import os 3import time 4import json 5import collections 6import itertools 7 8# Third party modules 9import gevent 10 11from Debug import Debug 12from Config import config 13from util import RateLimit 14from util import Msgpack 15from util import helper 16from Plugin import PluginManager 17from contextlib import closing 18 19FILE_BUFF = 1024 * 512 20 21 22class RequestError(Exception): 23 pass 24 25 26# Incoming requests 27@PluginManager.acceptPlugins 28class FileRequest(object): 29 __slots__ = ("server", "connection", "req_id", "sites", "log", "responded") 30 31 def __init__(self, server, connection): 32 self.server = server 33 self.connection = connection 34 35 self.req_id = None 36 self.sites = self.server.sites 37 self.log = server.log 38 self.responded = False # Responded to the request 39 40 def send(self, msg, streaming=False): 41 if not self.connection.closed: 42 self.connection.send(msg, streaming) 43 44 def sendRawfile(self, file, read_bytes): 45 if not self.connection.closed: 46 self.connection.sendRawfile(file, read_bytes) 47 48 def response(self, msg, streaming=False): 49 if self.responded: 50 if config.verbose: 51 self.log.debug("Req id %s already responded" % self.req_id) 52 return 53 if not isinstance(msg, dict): # If msg not a dict create a {"body": msg} 54 msg = {"body": msg} 55 msg["cmd"] = "response" 56 msg["to"] = self.req_id 57 self.responded = True 58 self.send(msg, streaming=streaming) 59 60 # Route file requests 61 def route(self, cmd, req_id, params): 62 self.req_id = req_id 63 # Don't allow other sites than locked 64 if "site" in params and self.connection.target_onion: 65 valid_sites = self.connection.getValidSites() 66 if params["site"] not in valid_sites and valid_sites != ["global"]: 67 self.response({"error": "Invalid site"}) 68 self.connection.log( 69 "Site lock violation: %s not in %s, target onion: %s" % 70 (params["site"], valid_sites, self.connection.target_onion) 71 ) 72 self.connection.badAction(5) 73 return False 74 75 if cmd == "update": 76 event = "%s update %s %s" % (self.connection.id, params["site"], params["inner_path"]) 77 # If called more than once within 15 sec only keep the last update 78 RateLimit.callAsync(event, max(self.connection.bad_actions, 15), self.actionUpdate, params) 79 else: 80 func_name = "action" + cmd[0].upper() + cmd[1:] 81 func = getattr(self, func_name, None) 82 if cmd not in ["getFile", "streamFile"]: # Skip IO bound functions 83 if self.connection.cpu_time > 0.5: 84 self.log.debug( 85 "Delay %s %s, cpu_time used by connection: %.3fs" % 86 (self.connection.ip, cmd, self.connection.cpu_time) 87 ) 88 time.sleep(self.connection.cpu_time) 89 if self.connection.cpu_time > 5: 90 self.connection.close("Cpu time: %.3fs" % self.connection.cpu_time) 91 s = time.time() 92 if func: 93 func(params) 94 else: 95 self.actionUnknown(cmd, params) 96 97 if cmd not in ["getFile", "streamFile"]: 98 taken = time.time() - s 99 taken_sent = self.connection.last_sent_time - self.connection.last_send_time 100 self.connection.cpu_time += taken - taken_sent 101 102 # Update a site file request 103 def actionUpdate(self, params): 104 site = self.sites.get(params["site"]) 105 if not site or not site.isServing(): # Site unknown or not serving 106 self.response({"error": "Unknown site"}) 107 self.connection.badAction(1) 108 self.connection.badAction(5) 109 return False 110 111 inner_path = params.get("inner_path", "") 112 current_content_modified = site.content_manager.contents.get(inner_path, {}).get("modified", 0) 113 body = params["body"] 114 115 if not inner_path.endswith("content.json"): 116 self.response({"error": "Only content.json update allowed"}) 117 self.connection.badAction(5) 118 return 119 120 should_validate_content = True 121 if "modified" in params and params["modified"] <= current_content_modified: 122 should_validate_content = False 123 valid = None # Same or earlier content as we have 124 elif not body: # No body sent, we have to download it first 125 site.log.debug("Missing body from update for file %s, downloading ..." % inner_path) 126 peer = site.addPeer(self.connection.ip, self.connection.port, return_peer=True, source="update") # Add or get peer 127 try: 128 body = peer.getFile(site.address, inner_path).read() 129 except Exception as err: 130 site.log.debug("Can't download updated file %s: %s" % (inner_path, err)) 131 self.response({"error": "File invalid update: Can't download updaed file"}) 132 self.connection.badAction(5) 133 return 134 135 if should_validate_content: 136 try: 137 content = json.loads(body.decode()) 138 except Exception as err: 139 site.log.debug("Update for %s is invalid JSON: %s" % (inner_path, err)) 140 self.response({"error": "File invalid JSON"}) 141 self.connection.badAction(5) 142 return 143 144 file_uri = "%s/%s:%s" % (site.address, inner_path, content["modified"]) 145 146 if self.server.files_parsing.get(file_uri): # Check if we already working on it 147 valid = None # Same file 148 else: 149 try: 150 valid = site.content_manager.verifyFile(inner_path, content) 151 except Exception as err: 152 site.log.debug("Update for %s is invalid: %s" % (inner_path, err)) 153 error = err 154 valid = False 155 156 if valid is True: # Valid and changed 157 site.log.info("Update for %s looks valid, saving..." % inner_path) 158 self.server.files_parsing[file_uri] = True 159 site.storage.write(inner_path, body) 160 del params["body"] 161 162 site.onFileDone(inner_path) # Trigger filedone 163 164 if inner_path.endswith("content.json"): # Download every changed file from peer 165 peer = site.addPeer(self.connection.ip, self.connection.port, return_peer=True, source="update") # Add or get peer 166 # On complete publish to other peers 167 diffs = params.get("diffs", {}) 168 site.onComplete.once(lambda: site.publish(inner_path=inner_path, diffs=diffs, limit=3), "publish_%s" % inner_path) 169 170 # Load new content file and download changed files in new thread 171 def downloader(): 172 site.downloadContent(inner_path, peer=peer, diffs=params.get("diffs", {})) 173 del self.server.files_parsing[file_uri] 174 175 gevent.spawn(downloader) 176 else: 177 del self.server.files_parsing[file_uri] 178 179 self.response({"ok": "Thanks, file %s updated!" % inner_path}) 180 self.connection.goodAction() 181 182 elif valid is None: # Not changed 183 peer = site.addPeer(self.connection.ip, self.connection.port, return_peer=True, source="update old") # Add or get peer 184 if peer: 185 if not peer.connection: 186 peer.connect(self.connection) # Assign current connection to peer 187 if inner_path in site.content_manager.contents: 188 peer.last_content_json_update = site.content_manager.contents[inner_path]["modified"] 189 if config.verbose: 190 site.log.debug( 191 "Same version, adding new peer for locked files: %s, tasks: %s" % 192 (peer.key, len(site.worker_manager.tasks)) 193 ) 194 for task in site.worker_manager.tasks: # New peer add to every ongoing task 195 if task["peers"] and not task["optional_hash_id"]: 196 # Download file from this peer too if its peer locked 197 site.needFile(task["inner_path"], peer=peer, update=True, blocking=False) 198 199 self.response({"ok": "File not changed"}) 200 self.connection.badAction() 201 202 else: # Invalid sign or sha hash 203 self.response({"error": "File %s invalid: %s" % (inner_path, error)}) 204 self.connection.badAction(5) 205 206 def isReadable(self, site, inner_path, file, pos): 207 return True 208 209 # Send file content request 210 def handleGetFile(self, params, streaming=False): 211 site = self.sites.get(params["site"]) 212 if not site or not site.isServing(): # Site unknown or not serving 213 self.response({"error": "Unknown site"}) 214 self.connection.badAction(5) 215 return False 216 try: 217 file_path = site.storage.getPath(params["inner_path"]) 218 if streaming: 219 file_obj = site.storage.open(params["inner_path"]) 220 else: 221 file_obj = Msgpack.FilePart(file_path, "rb") 222 223 with file_obj as file: 224 file.seek(params["location"]) 225 read_bytes = params.get("read_bytes", FILE_BUFF) 226 file_size = os.fstat(file.fileno()).st_size 227 228 if file_size > read_bytes: # Check if file is readable at current position (for big files) 229 if not self.isReadable(site, params["inner_path"], file, params["location"]): 230 raise RequestError("File not readable at position: %s" % params["location"]) 231 else: 232 if params.get("file_size") and params["file_size"] != file_size: 233 self.connection.badAction(2) 234 raise RequestError("File size does not match: %sB != %sB" % (params["file_size"], file_size)) 235 236 if not streaming: 237 file.read_bytes = read_bytes 238 239 if params["location"] > file_size: 240 self.connection.badAction(5) 241 raise RequestError("Bad file location") 242 243 if streaming: 244 back = { 245 "size": file_size, 246 "location": min(file.tell() + read_bytes, file_size), 247 "stream_bytes": min(read_bytes, file_size - params["location"]) 248 } 249 self.response(back) 250 self.sendRawfile(file, read_bytes=read_bytes) 251 else: 252 back = { 253 "body": file, 254 "size": file_size, 255 "location": min(file.tell() + file.read_bytes, file_size) 256 } 257 self.response(back, streaming=True) 258 259 bytes_sent = min(read_bytes, file_size - params["location"]) # Number of bytes we going to send 260 site.settings["bytes_sent"] = site.settings.get("bytes_sent", 0) + bytes_sent 261 if config.debug_socket: 262 self.log.debug("File %s at position %s sent %s bytes" % (file_path, params["location"], bytes_sent)) 263 264 # Add peer to site if not added before 265 connected_peer = site.addPeer(self.connection.ip, self.connection.port, source="request") 266 if connected_peer: # Just added 267 connected_peer.connect(self.connection) # Assign current connection to peer 268 269 return {"bytes_sent": bytes_sent, "file_size": file_size, "location": params["location"]} 270 271 except RequestError as err: 272 self.log.debug("GetFile %s %s %s request error: %s" % (self.connection, params["site"], params["inner_path"], Debug.formatException(err))) 273 self.response({"error": "File read error: %s" % err}) 274 except OSError as err: 275 if config.verbose: 276 self.log.debug("GetFile read error: %s" % Debug.formatException(err)) 277 self.response({"error": "File read error"}) 278 return False 279 except Exception as err: 280 self.log.error("GetFile exception: %s" % Debug.formatException(err)) 281 self.response({"error": "File read exception"}) 282 return False 283 284 def actionGetFile(self, params): 285 return self.handleGetFile(params) 286 287 def actionStreamFile(self, params): 288 return self.handleGetFile(params, streaming=True) 289 290 # Peer exchange request 291 def actionPex(self, params): 292 site = self.sites.get(params["site"]) 293 if not site or not site.isServing(): # Site unknown or not serving 294 self.response({"error": "Unknown site"}) 295 self.connection.badAction(5) 296 return False 297 298 got_peer_keys = [] 299 added = 0 300 301 # Add requester peer to site 302 connected_peer = site.addPeer(self.connection.ip, self.connection.port, source="request") 303 304 if connected_peer: # It was not registered before 305 added += 1 306 connected_peer.connect(self.connection) # Assign current connection to peer 307 308 # Add sent peers to site 309 for packed_address in itertools.chain(params.get("peers", []), params.get("peers_ipv6", [])): 310 address = helper.unpackAddress(packed_address) 311 got_peer_keys.append("%s:%s" % address) 312 if site.addPeer(*address, source="pex"): 313 added += 1 314 315 # Add sent onion peers to site 316 for packed_address in params.get("peers_onion", []): 317 address = helper.unpackOnionAddress(packed_address) 318 got_peer_keys.append("%s:%s" % address) 319 if site.addPeer(*address, source="pex"): 320 added += 1 321 322 # Send back peers that is not in the sent list and connectable (not port 0) 323 packed_peers = helper.packPeers(site.getConnectablePeers(params["need"], ignore=got_peer_keys, allow_private=False)) 324 325 if added: 326 site.worker_manager.onPeers() 327 if config.verbose: 328 self.log.debug( 329 "Added %s peers to %s using pex, sending back %s" % 330 (added, site, {key: len(val) for key, val in packed_peers.items()}) 331 ) 332 333 back = { 334 "peers": packed_peers["ipv4"], 335 "peers_ipv6": packed_peers["ipv6"], 336 "peers_onion": packed_peers["onion"] 337 } 338 339 self.response(back) 340 341 # Get modified content.json files since 342 def actionListModified(self, params): 343 site = self.sites.get(params["site"]) 344 if not site or not site.isServing(): # Site unknown or not serving 345 self.response({"error": "Unknown site"}) 346 self.connection.badAction(5) 347 return False 348 modified_files = site.content_manager.listModified(params["since"]) 349 350 # Add peer to site if not added before 351 connected_peer = site.addPeer(self.connection.ip, self.connection.port, source="request") 352 if connected_peer: # Just added 353 connected_peer.connect(self.connection) # Assign current connection to peer 354 355 self.response({"modified_files": modified_files}) 356 357 def actionGetHashfield(self, params): 358 site = self.sites.get(params["site"]) 359 if not site or not site.isServing(): # Site unknown or not serving 360 self.response({"error": "Unknown site"}) 361 self.connection.badAction(5) 362 return False 363 364 # Add peer to site if not added before 365 peer = site.addPeer(self.connection.ip, self.connection.port, return_peer=True, source="request") 366 if not peer.connection: # Just added 367 peer.connect(self.connection) # Assign current connection to peer 368 369 peer.time_my_hashfield_sent = time.time() # Don't send again if not changed 370 371 self.response({"hashfield_raw": site.content_manager.hashfield.tobytes()}) 372 373 def findHashIds(self, site, hash_ids, limit=100): 374 back = collections.defaultdict(lambda: collections.defaultdict(list)) 375 found = site.worker_manager.findOptionalHashIds(hash_ids, limit=limit) 376 377 for hash_id, peers in found.items(): 378 for peer in peers: 379 ip_type = helper.getIpType(peer.ip) 380 if len(back[ip_type][hash_id]) < 20: 381 back[ip_type][hash_id].append(peer.packMyAddress()) 382 return back 383 384 def actionFindHashIds(self, params): 385 site = self.sites.get(params["site"]) 386 s = time.time() 387 if not site or not site.isServing(): # Site unknown or not serving 388 self.response({"error": "Unknown site"}) 389 self.connection.badAction(5) 390 return False 391 392 event_key = "%s_findHashIds_%s_%s" % (self.connection.ip, params["site"], len(params["hash_ids"])) 393 if self.connection.cpu_time > 0.5 or not RateLimit.isAllowed(event_key, 60 * 5): 394 time.sleep(0.1) 395 back = self.findHashIds(site, params["hash_ids"], limit=10) 396 else: 397 back = self.findHashIds(site, params["hash_ids"]) 398 RateLimit.called(event_key) 399 400 my_hashes = [] 401 my_hashfield_set = set(site.content_manager.hashfield) 402 for hash_id in params["hash_ids"]: 403 if hash_id in my_hashfield_set: 404 my_hashes.append(hash_id) 405 406 if config.verbose: 407 self.log.debug( 408 "Found: %s for %s hashids in %.3fs" % 409 ({key: len(val) for key, val in back.items()}, len(params["hash_ids"]), time.time() - s) 410 ) 411 self.response({"peers": back["ipv4"], "peers_onion": back["onion"], "peers_ipv6": back["ipv6"], "my": my_hashes}) 412 413 def actionSetHashfield(self, params): 414 site = self.sites.get(params["site"]) 415 if not site or not site.isServing(): # Site unknown or not serving 416 self.response({"error": "Unknown site"}) 417 self.connection.badAction(5) 418 return False 419 420 # Add or get peer 421 peer = site.addPeer(self.connection.ip, self.connection.port, return_peer=True, connection=self.connection, source="request") 422 if not peer.connection: 423 peer.connect(self.connection) 424 peer.hashfield.replaceFromBytes(params["hashfield_raw"]) 425 self.response({"ok": "Updated"}) 426 427 # Send a simple Pong! answer 428 def actionPing(self, params): 429 self.response(b"Pong!") 430 431 # Check requested port of the other peer 432 def actionCheckport(self, params): 433 if helper.getIpType(self.connection.ip) == "ipv6": 434 sock_address = (self.connection.ip, params["port"], 0, 0) 435 else: 436 sock_address = (self.connection.ip, params["port"]) 437 438 with closing(helper.createSocket(self.connection.ip)) as sock: 439 sock.settimeout(5) 440 if sock.connect_ex(sock_address) == 0: 441 self.response({"status": "open", "ip_external": self.connection.ip}) 442 else: 443 self.response({"status": "closed", "ip_external": self.connection.ip}) 444 445 # Unknown command 446 def actionUnknown(self, cmd, params): 447 self.response({"error": "Unknown command: %s" % cmd}) 448 self.connection.badAction(5)