Forking what is left of ZeroNet and hopefully adding an AT Proto Frontend/Proxy
at main 403 lines 16 kB view raw
import errno
import logging
import time
import random
import socket
import sys

import gevent
import gevent.pool
from gevent.server import StreamServer

import util
from util import helper
from Config import config
from .FileRequest import FileRequest
from Peer import PeerPortchecker
from Site import SiteManager
from Connection import ConnectionServer
from Plugin import PluginManager
from Debug import Debug


@PluginManager.acceptPlugins
class FileServer(ConnectionServer):
    """Connection server that routes incoming messages to FileRequest and
    runs the periodic site maintenance loops (check, announce, cleanup,
    wakeup detection).
    """

    def __init__(self, ip=config.fileserver_ip, port=config.fileserver_port, ip_type=config.fileserver_ip_type):
        """Resolve the bind address/port and initialise the underlying ConnectionServer.

        ip: bind address; a "*" is replaced by "::" or "0.0.0.0" depending on ip_type.
        port: bind port; 0 selects a random free port from config.fileserver_port_range.
        ip_type: "ipv4", "ipv6" or "dual".

        Raises Exception if a random port was requested and none could be bound.
        """
        self.site_manager = SiteManager.site_manager
        self.portchecker = PeerPortchecker.PeerPortchecker(self)
        self.log = logging.getLogger("FileServer")
        self.ip_type = ip_type
        self.ip_external_list = []

        self.supported_ip_types = ["ipv4"]  # Outgoing ip_type support
        if helper.getIpType(ip) == "ipv6" or self.isIpv6Supported():
            self.supported_ip_types.append("ipv6")

        if ip_type == "ipv6" or (ip_type == "dual" and "ipv6" in self.supported_ip_types):
            ip = ip.replace("*", "::")
        else:
            ip = ip.replace("*", "0.0.0.0")

        if config.tor == "always":
            port = config.tor_hs_port
            config.fileserver_port = port
        elif port == 0:  # Use random port
            port_range_from, port_range_to = list(map(int, config.fileserver_port_range.split("-")))
            port = self.getRandomPort(ip, port_range_from, port_range_to)
            config.fileserver_port = port
            if not port:
                raise Exception("Can't find bindable port")
            if not config.tor == "always":
                config.saveValue("fileserver_port", port)  # Save random port value for next restart
                config.arguments.fileserver_port = port

        ConnectionServer.__init__(self, ip, port, self.handleRequest)
        self.log.debug("Supported IP types: %s" % self.supported_ip_types)

        if ip_type == "dual" and ip == "::":
            # Also bind to ipv4 address in dual mode
            try:
                self.log.debug("Binding proxy to %s:%s" % ("::", self.port))
                self.stream_server_proxy = StreamServer(
                    ("0.0.0.0", self.port), self.handleIncomingConnection, spawn=self.pool, backlog=100
                )
            except Exception as err:
                self.log.info("StreamServer proxy create error: %s" % Debug.formatException(err))

        self.port_opened = {}

        self.sites = self.site_manager.sites
        self.last_request = time.time()
        self.files_parsing = {}
        self.ui_server = None

    def getRandomPort(self, ip, port_range_from, port_range_to):
        """Try up to 100 random ports in the inclusive range and return the
        first one that can be bound on ip, or False if none was found.
        """
        self.log.info("Getting random port in range %s-%s..." % (port_range_from, port_range_to))
        tried = set()
        for _ in range(100):
            port = random.randint(port_range_from, port_range_to)
            if port in tried:
                continue
            tried.add(port)
            sock = helper.createSocket(ip)
            try:
                sock.bind((ip, port))
                bound = True
            except Exception as err:
                self.log.warning("Error binding to port %s: %s" % (port, err))
                bound = False
            finally:
                # Always release the probe socket, whatever bind() did
                sock.close()
            if bound:
                self.log.info("Found unused random port: %s" % port)
                return port
            time.sleep(0.1)
        return False

    def isIpv6Supported(self):
        """Return True if an IPv6 UDP socket can be connected to a test
        address and gets a local address other than "::1".

        Always returns True when config.tor is "always".
        """
        if config.tor == "always":
            return True
        # Test if we can connect to ipv6 address
        ipv6_testip = "fcec:ae97:8902:d810:6c92:ec67:efb2:3ec5"
        sock = None
        try:
            sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
            sock.connect((ipv6_testip, 80))
            local_ipv6 = sock.getsockname()[0]
            if local_ipv6 == "::1":
                self.log.debug("IPv6 not supported, no local IPv6 address")
                return False
            else:
                self.log.debug("IPv6 supported on IP %s" % local_ipv6)
                return True
        except socket.error as err:
            self.log.warning("IPv6 not supported: %s" % err)
            return False
        except Exception as err:
            self.log.error("IPv6 check error: %s" % err)
            return False
        finally:
            # The probe socket is only needed for the check; do not leak it
            if sock is not None:
                sock.close()

    def listenProxy(self):
        """Serve the secondary (ipv4) StreamServer until it stops, logging any listen error."""
        try:
            self.stream_server_proxy.serve_forever()
        except Exception as err:
            # Not every exception carries an errno attribute
            if getattr(err, "errno", None) == errno.EADDRINUSE:  # Address already in use error
                self.log.debug("StreamServer proxy listen error: %s" % err)
            else:
                self.log.info("StreamServer proxy listen error: %s" % err)

    # Handle request to fileserver
    def handleRequest(self, connection, message):
        """Route an incoming message to a new FileRequest and flag the
        internet as online on the first request from a non-private IP.
        """
        if config.verbose:
            if "params" in message:
                self.log.debug(
                    "FileRequest: %s %s %s %s" %
                    (str(connection), message["cmd"], message["params"].get("site"), message["params"].get("inner_path"))
                )
            else:
                self.log.debug("FileRequest: %s %s" % (str(connection), message["cmd"]))
        req = FileRequest(self, connection)
        req.route(message["cmd"], message.get("req_id"), message.get("params"))
        # NOTE(review): has_internet is not set in this class; presumably provided by ConnectionServer
        if not self.has_internet and not connection.is_private_ip:
            self.has_internet = True
            self.onInternetOnline()

    def onInternetOnline(self):
        """Re-check sites (without file verification) and force a new port check."""
        self.log.info("Internet online")
        gevent.spawn(self.checkSites, check_files=False, force_port_check=True)

    # Reload the FileRequest class to prevent restarts in debug mode
    def reload(self):
        """Re-load src/File/FileRequest.py and rebind the module-level FileRequest class."""
        global FileRequest
        source_path = "src/File/FileRequest.py"
        try:
            import imp
        except ImportError:
            # imp was removed in Python 3.12: load the source file through importlib instead
            import importlib.util
            spec = importlib.util.spec_from_file_location("FileRequest", source_path)
            module = importlib.util.module_from_spec(spec)
            sys.modules["FileRequest"] = module
            spec.loader.exec_module(module)
        else:
            module = imp.load_source("FileRequest", source_path)
        FileRequest = module.FileRequest

    def _addToPeerBlacklist(self, ip):
        """Add (ip, own port) to SiteManager.peer_blacklist unless it is already listed."""
        entry = (ip, self.port)
        if entry not in SiteManager.peer_blacklist:
            SiteManager.peer_blacklist.append(entry)

    def portCheck(self):
        """Determine whether the server port is reachable over ipv4 / ipv6.

        Updates self.port_opened and self.ip_external_list, adds the own
        addresses to the peer blacklist and returns {"ipv4": ..., "ipv6": ...}.
        Both values are None in offline mode.
        """
        if config.offline:
            self.log.info("Offline mode: port check disabled")
            res = {"ipv4": None, "ipv6": None}
            self.port_opened = res
            return res

        if config.ip_external:
            for ip_external in config.ip_external:
                self._addToPeerBlacklist(ip_external)  # Add myself to peer blacklist

            ip_external_types = {helper.getIpType(ip) for ip in config.ip_external}
            res = {
                "ipv4": "ipv4" in ip_external_types,
                "ipv6": "ipv6" in ip_external_types
            }
            self.ip_external_list = config.ip_external
            self.port_opened.update(res)
            self.log.info("Server port opened based on configuration ipv4: %s, ipv6: %s" % (res["ipv4"], res["ipv6"]))
            return res

        self.port_opened = {}
        if self.ui_server:
            self.ui_server.updateWebsocket()

        # Run the ipv6 check in the background while the ipv4 check runs here
        if "ipv6" in self.supported_ip_types:
            res_ipv6_thread = gevent.spawn(self.portchecker.portCheck, self.port, "ipv6")
        else:
            res_ipv6_thread = None

        res_ipv4 = self.portchecker.portCheck(self.port, "ipv4")
        if not res_ipv4["opened"] and config.tor != "always":
            # Try to open the port, then test again
            if self.portchecker.portOpen(self.port):
                res_ipv4 = self.portchecker.portCheck(self.port, "ipv4")

        if res_ipv6_thread is None:
            res_ipv6 = {"ip": None, "opened": None}
        else:
            res_ipv6 = res_ipv6_thread.get()
            if res_ipv6["opened"] and not helper.getIpType(res_ipv6["ip"]) == "ipv6":
                self.log.info("Invalid IPv6 address from port check: %s" % res_ipv6["ip"])
                res_ipv6["opened"] = False

        self.ip_external_list = []
        for res_ip in [res_ipv4, res_ipv6]:
            if res_ip["ip"] and res_ip["ip"] not in self.ip_external_list:
                self.ip_external_list.append(res_ip["ip"])
                self._addToPeerBlacklist(res_ip["ip"])

        self.log.info("Server port opened ipv4: %s, ipv6: %s" % (res_ipv4["opened"], res_ipv6["opened"]))

        res = {"ipv4": res_ipv4["opened"], "ipv6": res_ipv6["opened"]}

        # Add external IPs from local interfaces
        # Copy first so the list returned by the helper is never modified in place
        interface_ips = list(helper.getInterfaceIps("ipv4"))
        if "ipv6" in self.supported_ip_types:
            interface_ips.extend(helper.getInterfaceIps("ipv6"))
        for ip in interface_ips:
            if not helper.isPrivateIp(ip) and ip not in self.ip_external_list:
                self.ip_external_list.append(ip)
                res[helper.getIpType(ip)] = True  # We have opened port if we have external ip
                self._addToPeerBlacklist(ip)
                self.log.debug("External ip found on interfaces: %s" % ip)

        self.port_opened.update(res)

        if self.ui_server:
            self.ui_server.updateWebsocket()

        return res

    # Check site file integrity
    def checkSite(self, site, check_files=False):
        """Announce and update a single site and exchange hashfields; does nothing if the site is not serving."""
        if site.isServing():
            site.announce(mode="startup")  # Announce site to tracker
            site.update(check_files=check_files)  # Update site's content.json and download changed files
            site.sendMyHashfield()
            site.updateHashfield()

    # Check sites integrity
    @util.Noparallel()
    def checkSites(self, check_files=False, force_port_check=False):
        """Run the port check if needed, then check every serving site.

        check_files: passed through to checkSite.
        force_port_check: run portCheck even if a previous result exists.
        """
        self.log.debug("Checking sites...")
        s = time.time()
        sites_checking = False
        if not self.port_opened or force_port_check:  # Test and open port if not tested yet
            if len(self.sites) <= 2:  # Don't wait port opening on first startup
                sites_checking = True
                for address, site in list(self.sites.items()):
                    gevent.spawn(self.checkSite, site, check_files)

            self.portCheck()

            if not self.port_opened.get("ipv4"):
                self.tor_manager.startOnions()

        if not sites_checking:
            check_pool = gevent.pool.Pool(5)
            # Check sites integrity, most recently modified first
            for site in sorted(self.sites.values(), key=lambda site: site.settings.get("modified", 0), reverse=True):
                if not site.isServing():
                    continue
                check_thread = check_pool.spawn(self.checkSite, site, check_files)  # Check in new thread
                time.sleep(2)
                if site.settings.get("modified", 0) < time.time() - 60 * 60 * 24:  # Not so active site, wait some sec to finish
                    check_thread.join(timeout=5)
        self.log.debug("Checksites done in %.3fs" % (time.time() - s))

    def cleanupSites(self):
        """Endless maintenance loop: every 20 minutes clean up peers, exchange
        peers, retry failed updates / bad files and keep connections of
        recently modified sites alive.
        """
        import gc
        startup = True
        time.sleep(5 * 60)  # Sites already cleaned up on startup
        peers_protected = set()
        while 1:
            # Sites health care every 20 min
            self.log.debug(
                "Running site cleanup, connections: %s, internet: %s, protected peers: %s" %
                (len(self.connections), self.has_internet, len(peers_protected))
            )

            for address, site in list(self.sites.items()):
                if not site.isServing():
                    continue

                if not startup:
                    site.cleanupPeers(peers_protected)

                time.sleep(1)  # Prevent too quick request

            peers_protected = set()
            for address, site in list(self.sites.items()):
                if not site.isServing():
                    continue

                if site.peers:
                    with gevent.Timeout(10, exception=False):
                        site.announcer.announcePex()

                # Last check modification failed
                if site.content_updated is False:
                    site.update()
                elif site.bad_files:
                    site.retryBadFiles()

                if time.time() - site.settings.get("modified", 0) < 60 * 60 * 24 * 7:
                    # Keep active connections if site has been modified within 7 days
                    connected_num = site.needConnections(check_site_on_reconnect=True)

                    if connected_num < config.connected_limit:  # This site has small amount of peers, protect them from closing
                        peers_protected.update([peer.key for peer in site.getConnectedPeers()])

                time.sleep(1)  # Prevent too quick request

            site = None  # Drop the reference to the last site before collecting
            gc.collect()  # Explicit garbage collection
            startup = False
            time.sleep(60 * 20)

    def announceSite(self, site):
        """Announce one site in "update" mode and exchange hashfields; own or
        recently modified sites also get their connections checked.
        """
        site.announce(mode="update", pex=False)
        active_site = time.time() - site.settings.get("modified", 0) < 24 * 60 * 60
        if site.settings["own"] or active_site:
            # Check connections more frequently on own and active sites to speed-up first connections
            site.needConnections(check_site_on_reconnect=True)
        site.sendMyHashfield(3)
        site.updateHashfield(3)

    # Announce sites every 20 min
    def announceSites(self):
        """Endless loop announcing every serving site, paced so that one full
        round per tracker fits into 20 minutes.
        """
        time.sleep(5 * 60)  # Sites already announced on startup
        while 1:
            config.loadTrackersFile()
            s = time.time()
            for address, site in list(self.sites.items()):
                if not site.isServing():
                    continue
                gevent.spawn(self.announceSite, site).join(timeout=10)
                time.sleep(1)
            taken = time.time() - s

            # Query all trackers one-by-one in 20 minutes evenly distributed
            # An empty tracker list must not crash the loop with ZeroDivisionError
            tracker_num = max(1, len(config.trackers))
            sleep = max(0, 60 * 20 / tracker_num - taken)

            self.log.debug("Site announce tracker done in %.3fs, sleeping for %.3fs..." % (taken, sleep))
            time.sleep(sleep)

    # Detects if computer back from wakeup
    def wakeupWatcher(self):
        """Endless loop polling every 30 seconds for a time jump of more than
        3 minutes or a change in the local IP list; either triggers a site
        check with a forced port check.
        """
        last_time = time.time()
        last_my_ips = socket.gethostbyname_ex('')[2]
        while 1:
            time.sleep(30)
            is_time_changed = time.time() - max(self.last_request, last_time) > 60 * 3
            if is_time_changed:
                # If taken more than 3 minute then the computer was in sleep mode
                self.log.info(
                    "Wakeup detected: time warp from %0.f to %0.f (%0.f sleep seconds), acting like startup..." %
                    (last_time, time.time(), time.time() - last_time)
                )

            my_ips = socket.gethostbyname_ex('')[2]
            is_ip_changed = my_ips != last_my_ips
            if is_ip_changed:
                self.log.info("IP change detected from %s to %s" % (last_my_ips, my_ips))

            if is_time_changed or is_ip_changed:
                self.checkSites(check_files=False, force_port_check=True)

            last_time = time.time()
            last_my_ips = my_ips

    # Bind and start serving sites
    def start(self, check_sites=True):
        """Start listening, spawn the background maintenance greenlets and
        hand control to ConnectionServer.listen.

        check_sites: also spawn an initial checkSites run.
        Returns False without doing anything if the server is stopping.
        """
        if self.stopping:
            return False

        ConnectionServer.start(self)

        try:
            self.stream_server.start()
        except Exception as err:
            self.log.error("Error listening on: %s:%s: %s" % (self.ip, self.port, err))

        self.sites = self.site_manager.list()
        if config.debug:
            # Auto reload FileRequest on change
            from Debug import DebugReloader
            DebugReloader.watcher.addCallback(self.reload)

        if check_sites:  # Open port, Update sites, Check files integrity
            gevent.spawn(self.checkSites)

        gevent.spawn(self.announceSites)
        gevent.spawn(self.cleanupSites)
        gevent.spawn(self.wakeupWatcher)

        ConnectionServer.listen(self)

        self.log.debug("Stopped.")

    def stop(self):
        """Close the UPnP-opened port if there is one, then stop the ConnectionServer."""
        if self.running and self.portchecker.upnp_port_opened:
            self.log.debug('Closing port %d' % self.port)
            try:
                self.portchecker.portClose(self.port)
                self.log.info('Closed port via upnp.')
            except Exception as err:
                self.log.info("Failed at attempt to use upnp to close port: %s" % err)

        return ConnectionServer.stop(self)