Forking what is left of ZeroNet and hopefully adding an AT Proto Frontend/Proxy
at main 410 lines 16 kB view raw
import logging
import time
import sys
import itertools
import collections

import gevent

import io
from Debug import Debug
from Config import config
from util import helper
from .PeerHashfield import PeerHashfield
from Plugin import PluginManager

if config.use_tempfiles:
    import tempfile


# Communicate remote peers
@PluginManager.acceptPlugins
class Peer(object):
    """A remote peer of a site: connection handling, requests and statistics.

    The ``hashfield`` slot is created lazily on first access (see
    ``__getattr__``); ``has_hashfield`` tells whether that has happened yet.
    """

    __slots__ = (
        "ip", "port", "site", "key", "connection", "connection_server", "time_found", "time_response", "time_hashfield",
        "time_added", "has_hashfield", "is_tracker_connection", "time_my_hashfield_sent", "last_ping", "reputation",
        "last_content_json_update", "hashfield", "connection_error", "hash_failed", "download_bytes", "download_time"
    )

    def __init__(self, ip, port, site=None, connection_server=None):
        """Create a peer record for ``ip``:``port``; no connection is opened here."""
        self.ip = ip
        self.port = port
        self.site = site
        self.key = "%s:%s" % (ip, port)

        self.connection = None
        self.connection_server = connection_server
        self.has_hashfield = False  # Lazy hashfield object not created yet
        self.time_hashfield = None  # Last time peer's hashfield downloaded
        self.time_my_hashfield_sent = None  # Last time my hashfield sent to peer
        self.time_found = time.time()  # Time of last found in the torrent tracker
        self.time_response = None  # Time of last successful response from peer
        self.time_added = time.time()
        self.last_ping = None  # Last response time for ping
        self.is_tracker_connection = False  # Tracker connection instead of normal peer
        self.reputation = 0  # More likely to connect if larger
        self.last_content_json_update = 0.0  # Modify date of last received content.json

        self.connection_error = 0  # Series of connection error
        self.hash_failed = 0  # Number of bad files from peer
        self.download_bytes = 0  # Bytes downloaded
        self.download_time = 0  # Time spent to download

    def __getattr__(self, key):
        """Lazily create the ``hashfield`` slot on first access.

        Only called when normal attribute lookup has already failed, so any
        other name is a genuinely missing attribute.

        Raises:
            AttributeError: for every key other than ``"hashfield"``.
        """
        if key == "hashfield":
            self.has_hashfield = True
            self.hashfield = PeerHashfield()
            return self.hashfield
        # Calling getattr(self, key) here would re-enter __getattr__ and
        # recurse until RecursionError; report the missing attribute instead.
        raise AttributeError("%r object has no attribute %r" % (type(self).__name__, key))

    def log(self, text):
        """Write a debug line prefixed with ip:port; does nothing unless config.verbose."""
        if not config.verbose:
            return  # Only log if we are in debug mode
        if self.site:
            self.site.log.debug("%s:%s %s" % (self.ip, self.port, text))
        else:
            logging.debug("%s:%s %s" % (self.ip, self.port, text))

    # Connect to host
    def connect(self, connection=None):
        """Attach a connection to this peer.

        Uses ``connection`` when given, otherwise asks a connection server
        (own, the site's, or ``main.file_server``) for one. Any existing
        connection is closed first.

        Returns:
            The connection object, or None if getting one failed.
        """
        # Keep reputation within [-10, 10]
        self.reputation = max(-10, min(10, self.reputation))

        if self.connection:
            self.log("Getting connection (Closing %s)..." % self.connection)
            self.connection.close("Connection change")
        else:
            self.log("Getting connection (reputation: %s)..." % self.reputation)

        if connection:  # Connection specified
            self.log("Assigning connection %s" % connection)
            self.connection = connection
            self.connection.sites += 1
        else:  # Try to find from connection pool or create new connection
            self.connection = None

            try:
                if self.connection_server:
                    connection_server = self.connection_server
                elif self.site:
                    connection_server = self.site.connection_server
                else:
                    import main
                    connection_server = main.file_server
                self.connection = connection_server.getConnection(
                    self.ip, self.port, site=self.site, is_tracker_connection=self.is_tracker_connection
                )
                self.reputation += 1
                self.connection.sites += 1
            except Exception as err:
                self.onConnectionError("Getting connection error")
                self.log(
                    "Getting connection error: %s (connection_error: %s, hash_failed: %s)" %
                    (Debug.formatException(err), self.connection_error, self.hash_failed)
                )
                self.connection = None
        return self.connection

    # Check if we have connection to peer
    def findConnection(self):
        """Return a live connection if one exists, without creating a new one."""
        if self.connection and self.connection.connected:  # We have connection to peer
            return self.connection

        # Try to find from other sites connections
        self.connection = self.site.connection_server.getConnection(
            self.ip, self.port, create=False, site=self.site
        )
        if self.connection:
            self.connection.sites += 1
        return self.connection

    def __str__(self):
        if self.site:
            return "Peer:%-12s of %s" % (self.ip, self.site.address_short)
        else:
            return "Peer:%-12s" % self.ip

    def __repr__(self):
        return "<%s>" % self.__str__()

    def packMyAddress(self):
        """Pack ip and port with the helper matching the address type (.onion or not)."""
        if self.ip.endswith(".onion"):
            return helper.packOnionAddress(self.ip, self.port)
        else:
            return helper.packAddress(self.ip, self.port)

    # Found a peer from a source
    def found(self, source="other"):
        """Record that the peer was (re)discovered via ``source``.

        Raises reputation while it is below 5 ("tracker": +1 for .onion,
        +2 otherwise; "local": +20), pushes the peer to the front of
        ``site.peers_recent`` for tracker/local sources and refreshes
        ``time_found``.
        """
        if self.reputation < 5:
            if source == "tracker":
                if self.ip.endswith(".onion"):
                    self.reputation += 1
                else:
                    self.reputation += 2
            elif source == "local":
                self.reputation += 20

        if source in ("tracker", "local"):
            self.site.peers_recent.appendleft(self)
        self.time_found = time.time()

    # Send a command to peer and return response value
    def request(self, cmd, params=None, stream_to=None):
        """Send ``cmd`` with ``params`` to the peer, making up to 3 attempts.

        Args:
            cmd: Command name passed to ``connection.request``.
            params: Dict of command parameters; defaults to a fresh empty dict.
            stream_to: Optional object passed through to ``connection.request``.

        Returns:
            The response on success, otherwise None (no connection, error
            response, worker killed, or all attempts failed).
        """
        if params is None:  # Avoid sharing one mutable default dict between calls
            params = {}

        if not self.connection or self.connection.closed:
            self.connect()
            if not self.connection:
                self.onConnectionError("Reconnect error")
                return None  # Connection failed

        self.log("Send request: %s %s %s %s" % (
            params.get("site", ""), cmd, params.get("inner_path", ""), params.get("location", "")
        ))

        for retry in range(1, 4):  # Up to 3 attempts
            try:
                if not self.connection:
                    raise Exception("No connection found")
                res = self.connection.request(cmd, params, stream_to)
                if not res:
                    raise Exception("Send error")
                if "error" in res:
                    self.log("%s error: %s" % (cmd, res["error"]))
                    self.onConnectionError("Response error")
                    break
                # Successful request, reset connection error num
                self.connection_error = 0
                self.time_response = time.time()
                return res
            except Exception as err:
                if type(err).__name__ == "Notify":  # Greenlet killed by worker
                    # Not every exception object carries a .message attribute
                    self.log("Peer worker got killed: %s, aborting cmd: %s" % (getattr(err, "message", ""), cmd))
                    break
                else:
                    self.onConnectionError("Request error")
                    self.log(
                        "%s (connection_error: %s, hash_failed: %s, retry: %s)" %
                        (Debug.formatException(err), self.connection_error, self.hash_failed, retry)
                    )
                    time.sleep(1 * retry)  # Wait longer after each failed attempt
                    self.connect()
        return None  # Failed after 3 attempts

    # Get a file content from peer
    def getFile(self, site, inner_path, file_size=None, pos_from=0, pos_to=None, streaming=False):
        """Download a file (or the byte range pos_from..pos_to) in chunks.

        Chunks are at most 1 MiB for files above 5 MiB and 512 KiB otherwise.
        "streamFile" is used when config.stream_downloads or ``streaming`` is
        set or the chunk is larger than 256 KiB; otherwise "getFile".

        Returns:
            A file-like buffer positioned at 0, or False on any request error.
        """
        if file_size and file_size > 5 * 1024 * 1024:
            max_read_size = 1024 * 1024
        else:
            max_read_size = 512 * 1024

        if pos_to:
            read_bytes = min(max_read_size, pos_to - pos_from)
        else:
            read_bytes = max_read_size

        location = pos_from

        if config.use_tempfiles:
            buff = tempfile.SpooledTemporaryFile(max_size=16 * 1024, mode='w+b')
        else:
            buff = io.BytesIO()

        s = time.time()
        while True:  # Read in smaller parts
            if config.stream_downloads or read_bytes > 256 * 1024 or streaming:
                res = self.request(
                    "streamFile",
                    {"site": site, "inner_path": inner_path, "location": location, "read_bytes": read_bytes, "file_size": file_size},
                    stream_to=buff
                )
                if not res or "location" not in res:  # Error
                    return False
            else:
                self.log("Send: %s" % inner_path)
                res = self.request(
                    "getFile",
                    {"site": site, "inner_path": inner_path, "location": location, "read_bytes": read_bytes, "file_size": file_size}
                )
                if not res or "location" not in res:  # Error
                    return False
                self.log("Recv: %s" % inner_path)
                buff.write(res["body"])
                res["body"] = None  # Save memory

            if res["location"] == res["size"] or res["location"] == pos_to:  # End of file
                break
            else:
                location = res["location"]
                if pos_to:
                    read_bytes = min(max_read_size, pos_to - location)

        if pos_to:
            recv = pos_to - pos_from
        else:
            recv = res["location"]

        # Download statistics
        self.download_bytes += recv
        self.download_time += (time.time() - s)
        if self.site:
            self.site.settings["bytes_recv"] = self.site.settings.get("bytes_recv", 0) + recv
        self.log("Downloaded: %s, pos: %s, read_bytes: %s" % (inner_path, buff.tell(), read_bytes))
        buff.seek(0)
        return buff

    # Send a ping request
    def ping(self):
        """Ping the peer and store the result in ``last_ping``.

        Returns:
            Response time in seconds, or None if no attempt succeeded.
        """
        response_time = None
        for _ in range(1, 3):  # Two attempts
            s = time.time()
            with gevent.Timeout(10.0, False):  # 10 sec timeout, don't raise exception
                res = self.request("ping")

                if res and "body" in res and res["body"] == b"Pong!":
                    response_time = time.time() - s
                    break  # All fine, exit from for loop
            # Timeout reached or bad response
            self.onConnectionError("Ping timeout")
            self.connect()
            time.sleep(1)

        if response_time is not None:
            self.log("Ping: %.3f" % response_time)
        else:
            self.log("Ping failed")
        self.last_ping = response_time
        return response_time

    # Request peer exchange from peer
    def pex(self, site=None, need_num=5):
        """Exchange peers with this peer.

        Sends up to 5 of our connectable peers and adds the peers received
        in the response to ``site`` (defaults to ``self.site``).

        Returns:
            Number of peers added, or False if the request failed.
        """
        if not site:
            site = self.site  # If no site defined request peers for this site

        # give back 5 connectible peers
        packed_peers = helper.packPeers(self.site.getConnectablePeers(5, allow_private=False))
        request = {"site": site.address, "peers": packed_peers["ipv4"], "need": need_num}
        if packed_peers["onion"]:
            request["peers_onion"] = packed_peers["onion"]
        if packed_peers["ipv6"]:
            request["peers_ipv6"] = packed_peers["ipv6"]
        res = self.request("pex", request)
        if not res or "error" in res:
            return False
        added = 0

        # Remove unsupported peer types
        if "peers_ipv6" in res and self.connection and "ipv6" not in self.connection.server.supported_ip_types:
            del res["peers_ipv6"]

        if "peers_onion" in res and self.connection and "onion" not in self.connection.server.supported_ip_types:
            del res["peers_onion"]

        # Add IPv4 + IPv6
        for peer in itertools.chain(res.get("peers", []), res.get("peers_ipv6", [])):
            address = helper.unpackAddress(peer)
            if site.addPeer(*address, source="pex"):
                added += 1

        # Add Onion
        for peer in res.get("peers_onion", []):
            address = helper.unpackOnionAddress(peer)
            if site.addPeer(*address, source="pex"):
                added += 1

        if added:
            self.log("Added peers using pex: %s" % added)

        return added

    # List modified files since the date
    # Return: {inner_path: modification date,...}
    def listModified(self, since):
        return self.request("listModified", {"since": since, "site": self.site.address})

    def updateHashfield(self, force=False):
        """Download the peer's hashfield unless it was requested in the last 5 minutes.

        Returns:
            The updated hashfield, or False if skipped or the request failed.
        """
        # Don't update hashfield again in 5 min
        if self.time_hashfield and time.time() - self.time_hashfield < 5 * 60 and not force:
            return False

        self.time_hashfield = time.time()
        res = self.request("getHashfield", {"site": self.site.address})
        if not res or "error" in res or "hashfield_raw" not in res:
            return False
        self.hashfield.replaceFromBytes(res["hashfield_raw"])

        return self.hashfield

    # Find peers for hashids
    # Return: {hash1: ["ip:port", "ip:port",...],...}
    def findHashIds(self, hash_ids):
        """Ask the peer which peers have the given hash ids.

        At most 30 hash entries per address type are taken from the response.

        Returns:
            defaultdict mapping hash id to a list of unpacked addresses, or
            False if the request failed or the response is not a dict.
        """
        res = self.request("findHashIds", {"site": self.site.address, "hash_ids": hash_ids})
        # Check the type before the membership test so a non-dict response is rejected cleanly
        if not res or not isinstance(res, dict) or "error" in res:
            return False

        back = collections.defaultdict(list)

        for ip_type in ["ipv4", "ipv6", "onion"]:
            if ip_type == "ipv4":
                key = "peers"
            else:
                key = "peers_%s" % ip_type

            if ip_type == "onion":
                unpacker_func = helper.unpackOnionAddress
            else:
                unpacker_func = helper.unpackAddress

            for hash_id, peers in itertools.islice(res.get(key, {}).items(), 30):
                back[hash_id].extend(map(unpacker_func, peers))

        # Hashes the answering peer has itself
        for hash_id in res.get("my", []):
            if self.connection:
                back[hash_id].append((self.connection.ip, self.connection.port))
            else:
                back[hash_id].append((self.ip, self.port))

        return back

    # Send my hashfield to peer
    # Return: True if sent
    def sendMyHashfield(self):
        if self.connection and self.connection.handshake.get("rev", 0) < 510:
            return False  # Not supported
        if self.time_my_hashfield_sent and self.site.content_manager.hashfield.time_changed <= self.time_my_hashfield_sent:
            return False  # Peer already has the latest hashfield

        res = self.request(
            "setHashfield",
            {"site": self.site.address, "hashfield_raw": self.site.content_manager.hashfield.tobytes()}
        )
        if not res or "error" in res:
            return False
        else:
            self.time_my_hashfield_sent = time.time()
            return True

    def publish(self, address, inner_path, body, modified, diffs=None):
        """Send an "update" request for ``inner_path`` of site ``address``.

        Args:
            diffs: Diff data sent along with the update; defaults to an empty list.

        Returns:
            Whatever ``request`` returns (the response, or None on failure).
        """
        if diffs is None:  # Avoid sharing one mutable default list between calls
            diffs = []

        if len(body) > 10 * 1024 and self.connection and self.connection.handshake.get("rev", 0) >= 4095:
            # To save bw we don't push big content.json to peers
            body = b""

        return self.request("update", {
            "site": address,
            "inner_path": inner_path,
            "body": body,
            "modified": modified,
            "diffs": diffs
        })

    # Stop and remove from site
    def remove(self, reason="Removing"):
        """Drop the peer from the site's peer collections and close its connection."""
        self.log("Removing peer...Connection error: %s, Hash failed: %s" % (self.connection_error, self.hash_failed))
        if self.site and self.key in self.site.peers:
            del(self.site.peers[self.key])

        if self.site and self in self.site.peers_recent:
            self.site.peers_recent.remove(self)

        if self.connection:
            self.connection.close(reason)

    # - EVENTS -

    # On connection error
    def onConnectionError(self, reason="Unknown"):
        """Count a connection error, lower reputation and remove the peer once the limit is hit.

        The limit is 3 errors when the site has more than 200 peers, 6 otherwise.
        """
        self.connection_error += 1
        if self.site and len(self.site.peers) > 200:
            limit = 3
        else:
            limit = 6
        self.reputation -= 1
        if self.connection_error >= limit:  # Dead peer
            self.remove("Peer connection: %s" % reason)

    # Done working with peer
    def onWorkerDone(self):
        pass