Forking what is left of ZeroNet and hopefully adding an AT Proto Frontend/Proxy
at main 373 lines 15 kB view raw
1import logging 2import time 3import sys 4import socket 5from collections import defaultdict 6 7import gevent 8import msgpack 9from gevent.server import StreamServer 10from gevent.pool import Pool 11 12import util 13from util import helper 14from Debug import Debug 15from .Connection import Connection 16from Config import config 17from Crypt import CryptConnection 18from Crypt import CryptHash 19from Tor import TorManager 20from Site import SiteManager 21 22 23class ConnectionServer(object): 24 def __init__(self, ip=None, port=None, request_handler=None): 25 if not ip: 26 if config.fileserver_ip_type == "ipv6": 27 ip = "::1" 28 else: 29 ip = "127.0.0.1" 30 port = 15441 31 self.ip = ip 32 self.port = port 33 self.last_connection_id = 1 # Connection id incrementer 34 self.log = logging.getLogger("ConnServer") 35 self.port_opened = {} 36 self.peer_blacklist = SiteManager.peer_blacklist 37 38 self.tor_manager = TorManager(self.ip, self.port) 39 self.connections = [] # Connections 40 self.whitelist = config.ip_local # No flood protection on this ips 41 self.ip_incoming = {} # Incoming connections from ip in the last minute to avoid connection flood 42 self.broken_ssl_ips = {} # Peerids of broken ssl connections 43 self.ips = {} # Connection by ip 44 self.has_internet = True # Internet outage detection 45 46 self.stream_server = None 47 self.stream_server_proxy = None 48 self.running = False 49 self.stopping = False 50 self.thread_checker = None 51 52 self.stat_recv = defaultdict(lambda: defaultdict(int)) 53 self.stat_sent = defaultdict(lambda: defaultdict(int)) 54 self.bytes_recv = 0 55 self.bytes_sent = 0 56 self.num_recv = 0 57 self.num_sent = 0 58 59 self.num_incoming = 0 60 self.num_outgoing = 0 61 self.had_external_incoming = False 62 63 self.timecorrection = 0.0 64 self.pool = Pool(500) # do not accept more than 500 connections 65 66 # Bittorrent style peerid 67 self.peer_id = "-UT3530-%s" % CryptHash.random(12, "base64") 68 69 # Check msgpack version 70 if msgpack.version[0] == 0 and msgpack.version[1] < 4: 71 self.log.error( 72 "Error: Unsupported msgpack version: %s (<0.4.0), please run `sudo apt-get install python-pip; sudo pip install msgpack --upgrade`" % 73 str(msgpack.version) 74 ) 75 sys.exit(0) 76 77 if request_handler: 78 self.handleRequest = request_handler 79 80 def start(self, check_connections=True): 81 if self.stopping: 82 return False 83 self.running = True 84 if check_connections: 85 self.thread_checker = gevent.spawn(self.checkConnections) 86 CryptConnection.manager.loadCerts() 87 if config.tor != "disable": 88 self.tor_manager.start() 89 if not self.port: 90 self.log.info("No port found, not binding") 91 return False 92 93 self.log.debug("Binding to: %s:%s, (msgpack: %s), supported crypt: %s" % ( 94 self.ip, self.port, ".".join(map(str, msgpack.version)), 95 CryptConnection.manager.crypt_supported 96 )) 97 try: 98 self.stream_server = StreamServer( 99 (self.ip, self.port), self.handleIncomingConnection, spawn=self.pool, backlog=100 100 ) 101 except Exception as err: 102 self.log.info("StreamServer create error: %s" % Debug.formatException(err)) 103 104 def listen(self): 105 if not self.running: 106 return None 107 108 if self.stream_server_proxy: 109 gevent.spawn(self.listenProxy) 110 try: 111 self.stream_server.serve_forever() 112 except Exception as err: 113 self.log.info("StreamServer listen error: %s" % err) 114 return False 115 self.log.debug("Stopped.") 116 117 def stop(self): 118 self.log.debug("Stopping %s" % self.stream_server) 119 self.stopping = True 120 self.running = False 121 if self.thread_checker: 122 gevent.kill(self.thread_checker) 123 if self.stream_server: 124 self.stream_server.stop() 125 126 def closeConnections(self): 127 self.log.debug("Closing all connection: %s" % len(self.connections)) 128 for connection in self.connections[:]: 129 connection.close("Close all connections") 130 131 def handleIncomingConnection(self, sock, addr): 132 if config.offline: 133 sock.close() 134 return False 135 136 ip, port = addr[0:2] 137 ip = ip.lower() 138 if ip.startswith("::ffff:"): # IPv6 to IPv4 mapping 139 ip = ip.replace("::ffff:", "", 1) 140 self.num_incoming += 1 141 142 if not self.had_external_incoming and not helper.isPrivateIp(ip): 143 self.had_external_incoming = True 144 145 # Connection flood protection 146 if ip in self.ip_incoming and ip not in self.whitelist: 147 self.ip_incoming[ip] += 1 148 if self.ip_incoming[ip] > 6: # Allow 6 in 1 minute from same ip 149 self.log.debug("Connection flood detected from %s" % ip) 150 time.sleep(30) 151 sock.close() 152 return False 153 else: 154 self.ip_incoming[ip] = 1 155 156 connection = Connection(self, ip, port, sock) 157 self.connections.append(connection) 158 if ip not in config.ip_local: 159 self.ips[ip] = connection 160 connection.handleIncomingConnection(sock) 161 162 def handleMessage(self, *args, **kwargs): 163 pass 164 165 def getConnection(self, ip=None, port=None, peer_id=None, create=True, site=None, is_tracker_connection=False): 166 ip_type = helper.getIpType(ip) 167 has_per_site_onion = (ip.endswith(".onion") or self.port_opened.get(ip_type, None) == False) and self.tor_manager.start_onions and site 168 if has_per_site_onion: # Site-unique connection for Tor 169 if ip.endswith(".onion"): 170 site_onion = self.tor_manager.getOnion(site.address) 171 else: 172 site_onion = self.tor_manager.getOnion("global") 173 key = ip + site_onion 174 else: 175 key = ip 176 177 # Find connection by ip 178 if key in self.ips: 179 connection = self.ips[key] 180 if not peer_id or connection.handshake.get("peer_id") == peer_id: # Filter by peer_id 181 if not connection.connected and create: 182 succ = connection.event_connected.get() # Wait for connection 183 if not succ: 184 raise Exception("Connection event return error") 185 return connection 186 187 # Recover from connection pool 188 for connection in self.connections: 189 if connection.ip == ip: 190 if peer_id and connection.handshake.get("peer_id") != peer_id: # Does not match 191 continue 192 if ip.endswith(".onion") and self.tor_manager.start_onions and ip.replace(".onion", "") != connection.target_onion: 193 # For different site 194 continue 195 if not connection.connected and create: 196 succ = connection.event_connected.get() # Wait for connection 197 if not succ: 198 raise Exception("Connection event return error") 199 return connection 200 201 # No connection found 202 if create and not config.offline: # Allow to create new connection if not found 203 if port == 0: 204 raise Exception("This peer is not connectable") 205 206 if (ip, port) in self.peer_blacklist and not is_tracker_connection: 207 raise Exception("This peer is blacklisted") 208 209 try: 210 if has_per_site_onion: # Lock connection to site 211 connection = Connection(self, ip, port, target_onion=site_onion, is_tracker_connection=is_tracker_connection) 212 else: 213 connection = Connection(self, ip, port, is_tracker_connection=is_tracker_connection) 214 self.num_outgoing += 1 215 self.ips[key] = connection 216 self.connections.append(connection) 217 connection.log("Connecting... (site: %s)" % site) 218 succ = connection.connect() 219 if not succ: 220 connection.close("Connection event return error") 221 raise Exception("Connection event return error") 222 223 except Exception as err: 224 connection.close("%s Connect error: %s" % (ip, Debug.formatException(err))) 225 raise err 226 227 if len(self.connections) > config.global_connected_limit: 228 gevent.spawn(self.checkMaxConnections) 229 230 return connection 231 else: 232 return None 233 234 def removeConnection(self, connection): 235 # Delete if same as in registry 236 if self.ips.get(connection.ip) == connection: 237 del self.ips[connection.ip] 238 # Site locked connection 239 if connection.target_onion: 240 if self.ips.get(connection.ip + connection.target_onion) == connection: 241 del self.ips[connection.ip + connection.target_onion] 242 # Cert pinned connection 243 if connection.cert_pin and self.ips.get(connection.ip + "#" + connection.cert_pin) == connection: 244 del self.ips[connection.ip + "#" + connection.cert_pin] 245 246 if connection in self.connections: 247 self.connections.remove(connection) 248 249 def checkConnections(self): 250 run_i = 0 251 time.sleep(15) 252 while self.running: 253 run_i += 1 254 self.ip_incoming = {} # Reset connected ips counter 255 last_message_time = 0 256 s = time.time() 257 for connection in self.connections[:]: # Make a copy 258 if connection.ip.endswith(".onion") or config.tor == "always": 259 timeout_multipler = 2 260 else: 261 timeout_multipler = 1 262 263 idle = time.time() - max(connection.last_recv_time, connection.start_time, connection.last_message_time) 264 if connection.last_message_time > last_message_time and not connection.is_private_ip: 265 # Message from local IPs does not means internet connection 266 last_message_time = connection.last_message_time 267 268 if connection.unpacker and idle > 30: 269 # Delete the unpacker if not needed 270 del connection.unpacker 271 connection.unpacker = None 272 273 elif connection.last_cmd_sent == "announce" and idle > 20: # Bootstrapper connection close after 20 sec 274 connection.close("[Cleanup] Tracker connection, idle: %.3fs" % idle) 275 276 if idle > 60 * 60: 277 # Wake up after 1h 278 connection.close("[Cleanup] After wakeup, idle: %.3fs" % idle) 279 280 elif idle > 20 * 60 and connection.last_send_time < time.time() - 10: 281 # Idle more than 20 min and we have not sent request in last 10 sec 282 if not connection.ping(): 283 connection.close("[Cleanup] Ping timeout") 284 285 elif idle > 10 * timeout_multipler and connection.incomplete_buff_recv > 0: 286 # Incomplete data with more than 10 sec idle 287 connection.close("[Cleanup] Connection buff stalled") 288 289 elif idle > 10 * timeout_multipler and connection.protocol == "?": # No connection after 10 sec 290 connection.close( 291 "[Cleanup] Connect timeout: %.3fs" % idle 292 ) 293 294 elif idle > 10 * timeout_multipler and connection.waiting_requests and time.time() - connection.last_send_time > 10 * timeout_multipler: 295 # Sent command and no response in 10 sec 296 connection.close( 297 "[Cleanup] Command %s timeout: %.3fs" % (connection.last_cmd_sent, time.time() - connection.last_send_time) 298 ) 299 300 elif idle < 60 and connection.bad_actions > 40: 301 connection.close( 302 "[Cleanup] Too many bad actions: %s" % connection.bad_actions 303 ) 304 305 elif idle > 5 * 60 and connection.sites == 0: 306 connection.close( 307 "[Cleanup] No site for connection" 308 ) 309 310 elif run_i % 90 == 0: 311 # Reset bad action counter every 30 min 312 connection.bad_actions = 0 313 314 # Internet outage detection 315 if time.time() - last_message_time > max(60, 60 * 10 / max(1, float(len(self.connections)) / 50)): 316 # Offline: Last message more than 60-600sec depending on connection number 317 if self.has_internet and last_message_time: 318 self.has_internet = False 319 self.onInternetOffline() 320 else: 321 # Online 322 if not self.has_internet: 323 self.has_internet = True 324 self.onInternetOnline() 325 326 self.timecorrection = self.getTimecorrection() 327 328 if time.time() - s > 0.01: 329 self.log.debug("Connection cleanup in %.3fs" % (time.time() - s)) 330 331 time.sleep(15) 332 self.log.debug("Checkconnections ended") 333 334 @util.Noparallel(blocking=False) 335 def checkMaxConnections(self): 336 if len(self.connections) < config.global_connected_limit: 337 return 0 338 339 s = time.time() 340 num_connected_before = len(self.connections) 341 self.connections.sort(key=lambda connection: connection.sites) 342 num_closed = 0 343 for connection in self.connections: 344 idle = time.time() - max(connection.last_recv_time, connection.start_time, connection.last_message_time) 345 if idle > 60: 346 connection.close("Connection limit reached") 347 num_closed += 1 348 if num_closed > config.global_connected_limit * 0.1: 349 break 350 351 self.log.debug("Closed %s connections of %s after reached limit %s in %.3fs" % ( 352 num_closed, num_connected_before, config.global_connected_limit, time.time() - s 353 )) 354 return num_closed 355 356 def onInternetOnline(self): 357 self.log.info("Internet online") 358 359 def onInternetOffline(self): 360 self.had_external_incoming = False 361 self.log.info("Internet offline") 362 363 def getTimecorrection(self): 364 corrections = sorted([ 365 connection.handshake.get("time") - connection.handshake_time + connection.last_ping_delay 366 for connection in self.connections 367 if connection.handshake.get("time") and connection.last_ping_delay 368 ]) 369 if len(corrections) < 9: 370 return 0.0 371 mid = int(len(corrections) / 2 - 1) 372 median = (corrections[mid - 1] + corrections[mid] + corrections[mid + 1]) / 3 373 return median