A fork of what is left of ZeroNet, with the goal of adding an AT Protocol (atproto) frontend/proxy.
1import logging
2import time
3import random
4import socket
5import sys
6
7import gevent
8import gevent.pool
9from gevent.server import StreamServer
10
11import util
12from util import helper
13from Config import config
14from .FileRequest import FileRequest
15from Peer import PeerPortchecker
16from Site import SiteManager
17from Connection import ConnectionServer
18from Plugin import PluginManager
19from Debug import Debug
20
21
22@PluginManager.acceptPlugins
23class FileServer(ConnectionServer):
24
25 def __init__(self, ip=config.fileserver_ip, port=config.fileserver_port, ip_type=config.fileserver_ip_type):
26 self.site_manager = SiteManager.site_manager
27 self.portchecker = PeerPortchecker.PeerPortchecker(self)
28 self.log = logging.getLogger("FileServer")
29 self.ip_type = ip_type
30 self.ip_external_list = []
31
32 self.supported_ip_types = ["ipv4"] # Outgoing ip_type support
33 if helper.getIpType(ip) == "ipv6" or self.isIpv6Supported():
34 self.supported_ip_types.append("ipv6")
35
36 if ip_type == "ipv6" or (ip_type == "dual" and "ipv6" in self.supported_ip_types):
37 ip = ip.replace("*", "::")
38 else:
39 ip = ip.replace("*", "0.0.0.0")
40
41 if config.tor == "always":
42 port = config.tor_hs_port
43 config.fileserver_port = port
44 elif port == 0: # Use random port
45 port_range_from, port_range_to = list(map(int, config.fileserver_port_range.split("-")))
46 port = self.getRandomPort(ip, port_range_from, port_range_to)
47 config.fileserver_port = port
48 if not port:
49 raise Exception("Can't find bindable port")
50 if not config.tor == "always":
51 config.saveValue("fileserver_port", port) # Save random port value for next restart
52 config.arguments.fileserver_port = port
53
54 ConnectionServer.__init__(self, ip, port, self.handleRequest)
55 self.log.debug("Supported IP types: %s" % self.supported_ip_types)
56
57 if ip_type == "dual" and ip == "::":
58 # Also bind to ipv4 addres in dual mode
59 try:
60 self.log.debug("Binding proxy to %s:%s" % ("::", self.port))
61 self.stream_server_proxy = StreamServer(
62 ("0.0.0.0", self.port), self.handleIncomingConnection, spawn=self.pool, backlog=100
63 )
64 except Exception as err:
65 self.log.info("StreamServer proxy create error: %s" % Debug.formatException(err))
66
67 self.port_opened = {}
68
69 self.sites = self.site_manager.sites
70 self.last_request = time.time()
71 self.files_parsing = {}
72 self.ui_server = None
73
74 def getRandomPort(self, ip, port_range_from, port_range_to):
75 self.log.info("Getting random port in range %s-%s..." % (port_range_from, port_range_to))
76 tried = []
77 for bind_retry in range(100):
78 port = random.randint(port_range_from, port_range_to)
79 if port in tried:
80 continue
81 tried.append(port)
82 sock = helper.createSocket(ip)
83 try:
84 sock.bind((ip, port))
85 success = True
86 except Exception as err:
87 self.log.warning("Error binding to port %s: %s" % (port, err))
88 success = False
89 sock.close()
90 if success:
91 self.log.info("Found unused random port: %s" % port)
92 return port
93 else:
94 time.sleep(0.1)
95 return False
96
97 def isIpv6Supported(self):
98 if config.tor == "always":
99 return True
100 # Test if we can connect to ipv6 address
101 ipv6_testip = "fcec:ae97:8902:d810:6c92:ec67:efb2:3ec5"
102 try:
103 sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
104 sock.connect((ipv6_testip, 80))
105 local_ipv6 = sock.getsockname()[0]
106 if local_ipv6 == "::1":
107 self.log.debug("IPv6 not supported, no local IPv6 address")
108 return False
109 else:
110 self.log.debug("IPv6 supported on IP %s" % local_ipv6)
111 return True
112 except socket.error as err:
113 self.log.warning("IPv6 not supported: %s" % err)
114 return False
115 except Exception as err:
116 self.log.error("IPv6 check error: %s" % err)
117 return False
118
119 def listenProxy(self):
120 try:
121 self.stream_server_proxy.serve_forever()
122 except Exception as err:
123 if err.errno == 98: # Address already in use error
124 self.log.debug("StreamServer proxy listen error: %s" % err)
125 else:
126 self.log.info("StreamServer proxy listen error: %s" % err)
127
128 # Handle request to fileserver
129 def handleRequest(self, connection, message):
130 if config.verbose:
131 if "params" in message:
132 self.log.debug(
133 "FileRequest: %s %s %s %s" %
134 (str(connection), message["cmd"], message["params"].get("site"), message["params"].get("inner_path"))
135 )
136 else:
137 self.log.debug("FileRequest: %s %s" % (str(connection), message["cmd"]))
138 req = FileRequest(self, connection)
139 req.route(message["cmd"], message.get("req_id"), message.get("params"))
140 if not self.has_internet and not connection.is_private_ip:
141 self.has_internet = True
142 self.onInternetOnline()
143
144 def onInternetOnline(self):
145 self.log.info("Internet online")
146 gevent.spawn(self.checkSites, check_files=False, force_port_check=True)
147
148 # Reload the FileRequest class to prevent restarts in debug mode
149 def reload(self):
150 global FileRequest
151 import imp
152 FileRequest = imp.load_source("FileRequest", "src/File/FileRequest.py").FileRequest
153
    def portCheck(self):
        """Probe whether our fileserver port is reachable from outside.

        Returns {"ipv4": bool|None, "ipv6": bool|None} (None = not tested).
        Side effects: rewrites self.port_opened and self.ip_external_list,
        appends every discovered external (ip, port) pair to the peer
        blacklist so we never connect to ourselves, and pings the UI
        websocket (if attached) before and after the probe.
        """
        if config.offline:
            # Offline mode: report "not tested" without any network traffic
            self.log.info("Offline mode: port check disabled")
            res = {"ipv4": None, "ipv6": None}
            self.port_opened = res
            return res

        if config.ip_external:
            # External IPs configured manually: trust them, skip probing
            for ip_external in config.ip_external:
                SiteManager.peer_blacklist.append((ip_external, self.port))  # Add myself to peer blacklist

            ip_external_types = set([helper.getIpType(ip) for ip in config.ip_external])
            res = {
                "ipv4": "ipv4" in ip_external_types,
                "ipv6": "ipv6" in ip_external_types
            }
            self.ip_external_list = config.ip_external
            self.port_opened.update(res)
            self.log.info("Server port opened based on configuration ipv4: %s, ipv6: %s" % (res["ipv4"], res["ipv6"]))
            return res

        self.port_opened = {}
        if self.ui_server:
            self.ui_server.updateWebsocket()

        # Run the ipv6 probe concurrently with the ipv4 probe below
        if "ipv6" in self.supported_ip_types:
            res_ipv6_thread = gevent.spawn(self.portchecker.portCheck, self.port, "ipv6")
        else:
            res_ipv6_thread = None

        res_ipv4 = self.portchecker.portCheck(self.port, "ipv4")
        if not res_ipv4["opened"] and config.tor != "always":
            # Closed: try to open the port (e.g. UPnP), then probe again
            if self.portchecker.portOpen(self.port):
                res_ipv4 = self.portchecker.portCheck(self.port, "ipv4")

        if res_ipv6_thread is None:
            res_ipv6 = {"ip": None, "opened": None}
        else:
            res_ipv6 = res_ipv6_thread.get()
            # Sanity check: the ipv6 probe must report an actual ipv6 address
            if res_ipv6["opened"] and not helper.getIpType(res_ipv6["ip"]) == "ipv6":
                self.log.info("Invalid IPv6 address from port check: %s" % res_ipv6["ip"])
                res_ipv6["opened"] = False

        self.ip_external_list = []
        for res_ip in [res_ipv4, res_ipv6]:
            if res_ip["ip"] and res_ip["ip"] not in self.ip_external_list:
                self.ip_external_list.append(res_ip["ip"])
                SiteManager.peer_blacklist.append((res_ip["ip"], self.port))

        self.log.info("Server port opened ipv4: %s, ipv6: %s" % (res_ipv4["opened"], res_ipv6["opened"]))

        res = {"ipv4": res_ipv4["opened"], "ipv6": res_ipv6["opened"]}

        # Add external IPs from local interfaces
        interface_ips = helper.getInterfaceIps("ipv4")
        if "ipv6" in self.supported_ip_types:
            interface_ips += helper.getInterfaceIps("ipv6")
        for ip in interface_ips:
            if not helper.isPrivateIp(ip) and ip not in self.ip_external_list:
                self.ip_external_list.append(ip)
                res[helper.getIpType(ip)] = True  # We have opened port if we have external ip
                SiteManager.peer_blacklist.append((ip, self.port))
                self.log.debug("External ip found on interfaces: %s" % ip)

        self.port_opened.update(res)

        if self.ui_server:
            self.ui_server.updateWebsocket()

        return res
224
225 # Check site file integrity
226 def checkSite(self, site, check_files=False):
227 if site.isServing():
228 site.announce(mode="startup") # Announce site to tracker
229 site.update(check_files=check_files) # Update site's content.json and download changed files
230 site.sendMyHashfield()
231 site.updateHashfield()
232
    # Check sites integrity
    @util.Noparallel()
    def checkSites(self, check_files=False, force_port_check=False):
        """Run the external port check (when needed) and verify every site.

        check_files: also verify site files on disk.
        force_port_check: re-run portCheck() even if already done.
        @util.Noparallel() serializes overlapping invocations.
        """
        self.log.debug("Checking sites...")
        s = time.time()
        sites_checking = False
        if not self.port_opened or force_port_check:  # Test and open port if not tested yet
            if len(self.sites) <= 2:  # Don't wait port opening on first startup
                # Few sites: check them concurrently while the port check runs
                sites_checking = True
                for address, site in list(self.sites.items()):
                    gevent.spawn(self.checkSite, site, check_files)

            self.portCheck()

            # No ipv4 reachability: fall back to Tor onion services
            if not self.port_opened["ipv4"]:
                self.tor_manager.startOnions()

        if not sites_checking:
            check_pool = gevent.pool.Pool(5)  # At most 5 site checks in flight
            # Check sites integrity
            # Most recently modified sites are checked first
            for site in sorted(list(self.sites.values()), key=lambda site: site.settings.get("modified", 0), reverse=True):
                if not site.isServing():
                    continue
                check_thread = check_pool.spawn(self.checkSite, site, check_files)  # Check in new thread
                time.sleep(2)  # Rate-limit check spawning
                if site.settings.get("modified", 0) < time.time() - 60 * 60 * 24:  # Not so active site, wait some sec to finish
                    check_thread.join(timeout=5)
        self.log.debug("Checksites done in %.3fs" % (time.time() - s))
261
    def cleanupSites(self):
        """Endless housekeeping loop (one pass every ~20 minutes).

        Per pass: cleans up idle peers, re-announces via PEX, retries failed
        content updates / bad files, and collects a set of "protected" peer
        keys (peers of poorly-connected, recently modified sites) that the
        next pass must not disconnect. Meant to be spawned as a greenlet.
        """
        import gc
        startup = True
        time.sleep(5 * 60)  # Sites already cleaned up on startup
        peers_protected = set([])  # Peer keys exempt from cleanup this pass
        while 1:
            # Sites health care every 20 min
            self.log.debug(
                "Running site cleanup, connections: %s, internet: %s, protected peers: %s" %
                (len(self.connections), self.has_internet, len(peers_protected))
            )

            for address, site in list(self.sites.items()):
                if not site.isServing():
                    continue

                if not startup:
                    site.cleanupPeers(peers_protected)

                time.sleep(1)  # Prevent too quick request

            peers_protected = set([])
            for address, site in list(self.sites.items()):
                if not site.isServing():
                    continue

                if site.peers:
                    # Bounded PEX announce; timeout swallowed (exception=False)
                    with gevent.Timeout(10, exception=False):
                        site.announcer.announcePex()

                # Last check modification failed
                if site.content_updated is False:
                    site.update()
                elif site.bad_files:
                    site.retryBadFiles()

                if time.time() - site.settings.get("modified", 0) < 60 * 60 * 24 * 7:
                    # Keep active connections if site has been modified witin 7 days
                    connected_num = site.needConnections(check_site_on_reconnect=True)

                    if connected_num < config.connected_limit:  # This site has small amount of peers, protect them from closing
                        peers_protected.update([peer.key for peer in site.getConnectedPeers()])

                time.sleep(1)  # Prevent too quick request

            site = None  # Drop the loop reference so gc can reclaim the site
            gc.collect()  # Implicit garbage collection
            startup = False
            time.sleep(60 * 20)
311
312 def announceSite(self, site):
313 site.announce(mode="update", pex=False)
314 active_site = time.time() - site.settings.get("modified", 0) < 24 * 60 * 60
315 if site.settings["own"] or active_site:
316 # Check connections more frequently on own and active sites to speed-up first connections
317 site.needConnections(check_site_on_reconnect=True)
318 site.sendMyHashfield(3)
319 site.updateHashfield(3)
320
321 # Announce sites every 20 min
322 def announceSites(self):
323 time.sleep(5 * 60) # Sites already announced on startup
324 while 1:
325 config.loadTrackersFile()
326 s = time.time()
327 for address, site in list(self.sites.items()):
328 if not site.isServing():
329 continue
330 gevent.spawn(self.announceSite, site).join(timeout=10)
331 time.sleep(1)
332 taken = time.time() - s
333
334 # Query all trackers one-by-one in 20 minutes evenly distributed
335 sleep = max(0, 60 * 20 / len(config.trackers) - taken)
336
337 self.log.debug("Site announce tracker done in %.3fs, sleeping for %.3fs..." % (taken, sleep))
338 time.sleep(sleep)
339
340 # Detects if computer back from wakeup
341 def wakeupWatcher(self):
342 last_time = time.time()
343 last_my_ips = socket.gethostbyname_ex('')[2]
344 while 1:
345 time.sleep(30)
346 is_time_changed = time.time() - max(self.last_request, last_time) > 60 * 3
347 if is_time_changed:
348 # If taken more than 3 minute then the computer was in sleep mode
349 self.log.info(
350 "Wakeup detected: time warp from %0.f to %0.f (%0.f sleep seconds), acting like startup..." %
351 (last_time, time.time(), time.time() - last_time)
352 )
353
354 my_ips = socket.gethostbyname_ex('')[2]
355 is_ip_changed = my_ips != last_my_ips
356 if is_ip_changed:
357 self.log.info("IP change detected from %s to %s" % (last_my_ips, my_ips))
358
359 if is_time_changed or is_ip_changed:
360 self.checkSites(check_files=False, force_port_check=True)
361
362 last_time = time.time()
363 last_my_ips = my_ips
364
    # Bind and start serving sites
    def start(self, check_sites=True):
        """Bind the listening socket(s), spawn the maintenance greenlets and
        serve until stopped.

        check_sites: also spawn the initial port check / site integrity check.
        Returns False immediately when the server is already stopping;
        otherwise blocks in ConnectionServer.listen() until shutdown.
        """
        if self.stopping:
            return False

        ConnectionServer.start(self)

        try:
            self.stream_server.start()
        except Exception as err:
            self.log.error("Error listening on: %s:%s: %s" % (self.ip, self.port, err))

        self.sites = self.site_manager.list()
        if config.debug:
            # Auto reload FileRequest on change
            from Debug import DebugReloader
            DebugReloader.watcher.addCallback(self.reload)

        if check_sites:  # Open port, Update sites, Check files integrity
            gevent.spawn(self.checkSites)

        # Background maintenance loops (run forever as greenlets)
        thread_announce_sites = gevent.spawn(self.announceSites)
        thread_cleanup_sites = gevent.spawn(self.cleanupSites)
        thread_wakeup_watcher = gevent.spawn(self.wakeupWatcher)

        ConnectionServer.listen(self)  # Blocks until the server is stopped

        self.log.debug("Stopped.")
393
394 def stop(self):
395 if self.running and self.portchecker.upnp_port_opened:
396 self.log.debug('Closing port %d' % self.port)
397 try:
398 self.portchecker.portClose(self.port)
399 self.log.info('Closed port via upnp.')
400 except Exception as err:
401 self.log.info("Failed at attempt to use upnp to close port: %s" % err)
402
403 return ConnectionServer.stop(self)