Forking what is left of ZeroNet and hopefully adding an AT Proto Frontend/Proxy
1import logging
2import time
3import sys
4import socket
5from collections import defaultdict
6
7import gevent
8import msgpack
9from gevent.server import StreamServer
10from gevent.pool import Pool
11
12import util
13from util import helper
14from Debug import Debug
15from .Connection import Connection
16from Config import config
17from Crypt import CryptConnection
18from Crypt import CryptHash
19from Tor import TorManager
20from Site import SiteManager
21
22
23class ConnectionServer(object):
24 def __init__(self, ip=None, port=None, request_handler=None):
25 if not ip:
26 if config.fileserver_ip_type == "ipv6":
27 ip = "::1"
28 else:
29 ip = "127.0.0.1"
30 port = 15441
31 self.ip = ip
32 self.port = port
33 self.last_connection_id = 1 # Connection id incrementer
34 self.log = logging.getLogger("ConnServer")
35 self.port_opened = {}
36 self.peer_blacklist = SiteManager.peer_blacklist
37
38 self.tor_manager = TorManager(self.ip, self.port)
39 self.connections = [] # Connections
40 self.whitelist = config.ip_local # No flood protection on this ips
41 self.ip_incoming = {} # Incoming connections from ip in the last minute to avoid connection flood
42 self.broken_ssl_ips = {} # Peerids of broken ssl connections
43 self.ips = {} # Connection by ip
44 self.has_internet = True # Internet outage detection
45
46 self.stream_server = None
47 self.stream_server_proxy = None
48 self.running = False
49 self.stopping = False
50 self.thread_checker = None
51
52 self.stat_recv = defaultdict(lambda: defaultdict(int))
53 self.stat_sent = defaultdict(lambda: defaultdict(int))
54 self.bytes_recv = 0
55 self.bytes_sent = 0
56 self.num_recv = 0
57 self.num_sent = 0
58
59 self.num_incoming = 0
60 self.num_outgoing = 0
61 self.had_external_incoming = False
62
63 self.timecorrection = 0.0
64 self.pool = Pool(500) # do not accept more than 500 connections
65
66 # Bittorrent style peerid
67 self.peer_id = "-UT3530-%s" % CryptHash.random(12, "base64")
68
69 # Check msgpack version
70 if msgpack.version[0] == 0 and msgpack.version[1] < 4:
71 self.log.error(
72 "Error: Unsupported msgpack version: %s (<0.4.0), please run `sudo apt-get install python-pip; sudo pip install msgpack --upgrade`" %
73 str(msgpack.version)
74 )
75 sys.exit(0)
76
77 if request_handler:
78 self.handleRequest = request_handler
79
80 def start(self, check_connections=True):
81 if self.stopping:
82 return False
83 self.running = True
84 if check_connections:
85 self.thread_checker = gevent.spawn(self.checkConnections)
86 CryptConnection.manager.loadCerts()
87 if config.tor != "disable":
88 self.tor_manager.start()
89 if not self.port:
90 self.log.info("No port found, not binding")
91 return False
92
93 self.log.debug("Binding to: %s:%s, (msgpack: %s), supported crypt: %s" % (
94 self.ip, self.port, ".".join(map(str, msgpack.version)),
95 CryptConnection.manager.crypt_supported
96 ))
97 try:
98 self.stream_server = StreamServer(
99 (self.ip, self.port), self.handleIncomingConnection, spawn=self.pool, backlog=100
100 )
101 except Exception as err:
102 self.log.info("StreamServer create error: %s" % Debug.formatException(err))
103
104 def listen(self):
105 if not self.running:
106 return None
107
108 if self.stream_server_proxy:
109 gevent.spawn(self.listenProxy)
110 try:
111 self.stream_server.serve_forever()
112 except Exception as err:
113 self.log.info("StreamServer listen error: %s" % err)
114 return False
115 self.log.debug("Stopped.")
116
117 def stop(self):
118 self.log.debug("Stopping %s" % self.stream_server)
119 self.stopping = True
120 self.running = False
121 if self.thread_checker:
122 gevent.kill(self.thread_checker)
123 if self.stream_server:
124 self.stream_server.stop()
125
126 def closeConnections(self):
127 self.log.debug("Closing all connection: %s" % len(self.connections))
128 for connection in self.connections[:]:
129 connection.close("Close all connections")
130
131 def handleIncomingConnection(self, sock, addr):
132 if config.offline:
133 sock.close()
134 return False
135
136 ip, port = addr[0:2]
137 ip = ip.lower()
138 if ip.startswith("::ffff:"): # IPv6 to IPv4 mapping
139 ip = ip.replace("::ffff:", "", 1)
140 self.num_incoming += 1
141
142 if not self.had_external_incoming and not helper.isPrivateIp(ip):
143 self.had_external_incoming = True
144
145 # Connection flood protection
146 if ip in self.ip_incoming and ip not in self.whitelist:
147 self.ip_incoming[ip] += 1
148 if self.ip_incoming[ip] > 6: # Allow 6 in 1 minute from same ip
149 self.log.debug("Connection flood detected from %s" % ip)
150 time.sleep(30)
151 sock.close()
152 return False
153 else:
154 self.ip_incoming[ip] = 1
155
156 connection = Connection(self, ip, port, sock)
157 self.connections.append(connection)
158 if ip not in config.ip_local:
159 self.ips[ip] = connection
160 connection.handleIncomingConnection(sock)
161
162 def handleMessage(self, *args, **kwargs):
163 pass
164
165 def getConnection(self, ip=None, port=None, peer_id=None, create=True, site=None, is_tracker_connection=False):
166 ip_type = helper.getIpType(ip)
167 has_per_site_onion = (ip.endswith(".onion") or self.port_opened.get(ip_type, None) == False) and self.tor_manager.start_onions and site
168 if has_per_site_onion: # Site-unique connection for Tor
169 if ip.endswith(".onion"):
170 site_onion = self.tor_manager.getOnion(site.address)
171 else:
172 site_onion = self.tor_manager.getOnion("global")
173 key = ip + site_onion
174 else:
175 key = ip
176
177 # Find connection by ip
178 if key in self.ips:
179 connection = self.ips[key]
180 if not peer_id or connection.handshake.get("peer_id") == peer_id: # Filter by peer_id
181 if not connection.connected and create:
182 succ = connection.event_connected.get() # Wait for connection
183 if not succ:
184 raise Exception("Connection event return error")
185 return connection
186
187 # Recover from connection pool
188 for connection in self.connections:
189 if connection.ip == ip:
190 if peer_id and connection.handshake.get("peer_id") != peer_id: # Does not match
191 continue
192 if ip.endswith(".onion") and self.tor_manager.start_onions and ip.replace(".onion", "") != connection.target_onion:
193 # For different site
194 continue
195 if not connection.connected and create:
196 succ = connection.event_connected.get() # Wait for connection
197 if not succ:
198 raise Exception("Connection event return error")
199 return connection
200
201 # No connection found
202 if create and not config.offline: # Allow to create new connection if not found
203 if port == 0:
204 raise Exception("This peer is not connectable")
205
206 if (ip, port) in self.peer_blacklist and not is_tracker_connection:
207 raise Exception("This peer is blacklisted")
208
209 try:
210 if has_per_site_onion: # Lock connection to site
211 connection = Connection(self, ip, port, target_onion=site_onion, is_tracker_connection=is_tracker_connection)
212 else:
213 connection = Connection(self, ip, port, is_tracker_connection=is_tracker_connection)
214 self.num_outgoing += 1
215 self.ips[key] = connection
216 self.connections.append(connection)
217 connection.log("Connecting... (site: %s)" % site)
218 succ = connection.connect()
219 if not succ:
220 connection.close("Connection event return error")
221 raise Exception("Connection event return error")
222
223 except Exception as err:
224 connection.close("%s Connect error: %s" % (ip, Debug.formatException(err)))
225 raise err
226
227 if len(self.connections) > config.global_connected_limit:
228 gevent.spawn(self.checkMaxConnections)
229
230 return connection
231 else:
232 return None
233
234 def removeConnection(self, connection):
235 # Delete if same as in registry
236 if self.ips.get(connection.ip) == connection:
237 del self.ips[connection.ip]
238 # Site locked connection
239 if connection.target_onion:
240 if self.ips.get(connection.ip + connection.target_onion) == connection:
241 del self.ips[connection.ip + connection.target_onion]
242 # Cert pinned connection
243 if connection.cert_pin and self.ips.get(connection.ip + "#" + connection.cert_pin) == connection:
244 del self.ips[connection.ip + "#" + connection.cert_pin]
245
246 if connection in self.connections:
247 self.connections.remove(connection)
248
249 def checkConnections(self):
250 run_i = 0
251 time.sleep(15)
252 while self.running:
253 run_i += 1
254 self.ip_incoming = {} # Reset connected ips counter
255 last_message_time = 0
256 s = time.time()
257 for connection in self.connections[:]: # Make a copy
258 if connection.ip.endswith(".onion") or config.tor == "always":
259 timeout_multipler = 2
260 else:
261 timeout_multipler = 1
262
263 idle = time.time() - max(connection.last_recv_time, connection.start_time, connection.last_message_time)
264 if connection.last_message_time > last_message_time and not connection.is_private_ip:
265 # Message from local IPs does not means internet connection
266 last_message_time = connection.last_message_time
267
268 if connection.unpacker and idle > 30:
269 # Delete the unpacker if not needed
270 del connection.unpacker
271 connection.unpacker = None
272
273 elif connection.last_cmd_sent == "announce" and idle > 20: # Bootstrapper connection close after 20 sec
274 connection.close("[Cleanup] Tracker connection, idle: %.3fs" % idle)
275
276 if idle > 60 * 60:
277 # Wake up after 1h
278 connection.close("[Cleanup] After wakeup, idle: %.3fs" % idle)
279
280 elif idle > 20 * 60 and connection.last_send_time < time.time() - 10:
281 # Idle more than 20 min and we have not sent request in last 10 sec
282 if not connection.ping():
283 connection.close("[Cleanup] Ping timeout")
284
285 elif idle > 10 * timeout_multipler and connection.incomplete_buff_recv > 0:
286 # Incomplete data with more than 10 sec idle
287 connection.close("[Cleanup] Connection buff stalled")
288
289 elif idle > 10 * timeout_multipler and connection.protocol == "?": # No connection after 10 sec
290 connection.close(
291 "[Cleanup] Connect timeout: %.3fs" % idle
292 )
293
294 elif idle > 10 * timeout_multipler and connection.waiting_requests and time.time() - connection.last_send_time > 10 * timeout_multipler:
295 # Sent command and no response in 10 sec
296 connection.close(
297 "[Cleanup] Command %s timeout: %.3fs" % (connection.last_cmd_sent, time.time() - connection.last_send_time)
298 )
299
300 elif idle < 60 and connection.bad_actions > 40:
301 connection.close(
302 "[Cleanup] Too many bad actions: %s" % connection.bad_actions
303 )
304
305 elif idle > 5 * 60 and connection.sites == 0:
306 connection.close(
307 "[Cleanup] No site for connection"
308 )
309
310 elif run_i % 90 == 0:
311 # Reset bad action counter every 30 min
312 connection.bad_actions = 0
313
314 # Internet outage detection
315 if time.time() - last_message_time > max(60, 60 * 10 / max(1, float(len(self.connections)) / 50)):
316 # Offline: Last message more than 60-600sec depending on connection number
317 if self.has_internet and last_message_time:
318 self.has_internet = False
319 self.onInternetOffline()
320 else:
321 # Online
322 if not self.has_internet:
323 self.has_internet = True
324 self.onInternetOnline()
325
326 self.timecorrection = self.getTimecorrection()
327
328 if time.time() - s > 0.01:
329 self.log.debug("Connection cleanup in %.3fs" % (time.time() - s))
330
331 time.sleep(15)
332 self.log.debug("Checkconnections ended")
333
334 @util.Noparallel(blocking=False)
335 def checkMaxConnections(self):
336 if len(self.connections) < config.global_connected_limit:
337 return 0
338
339 s = time.time()
340 num_connected_before = len(self.connections)
341 self.connections.sort(key=lambda connection: connection.sites)
342 num_closed = 0
343 for connection in self.connections:
344 idle = time.time() - max(connection.last_recv_time, connection.start_time, connection.last_message_time)
345 if idle > 60:
346 connection.close("Connection limit reached")
347 num_closed += 1
348 if num_closed > config.global_connected_limit * 0.1:
349 break
350
351 self.log.debug("Closed %s connections of %s after reached limit %s in %.3fs" % (
352 num_closed, num_connected_before, config.global_connected_limit, time.time() - s
353 ))
354 return num_closed
355
356 def onInternetOnline(self):
357 self.log.info("Internet online")
358
359 def onInternetOffline(self):
360 self.had_external_incoming = False
361 self.log.info("Internet offline")
362
363 def getTimecorrection(self):
364 corrections = sorted([
365 connection.handshake.get("time") - connection.handshake_time + connection.last_ping_delay
366 for connection in self.connections
367 if connection.handshake.get("time") and connection.last_ping_delay
368 ])
369 if len(corrections) < 9:
370 return 0.0
371 mid = int(len(corrections) / 2 - 1)
372 median = (corrections[mid - 1] + corrections[mid] + corrections[mid + 1]) / 3
373 return median