Forking what is left of ZeroNet and hopefully adding an AT Proto Frontend/Proxy
at main 635 lines 27 kB view raw
import socket
import time

import gevent
try:
    from gevent.coros import RLock
except ImportError:  # Module was renamed in newer gevent releases
    from gevent.lock import RLock

from Config import config
from Debug import Debug
from util import Msgpack
from Crypt import CryptConnection
from util import helper


class Connection(object):
    """A single peer connection owned by a connection server.

    Handles opening the socket (direct, Tor or SOCKS proxy), the optional TLS
    wrapping, the handshake exchange, msgpack message framing, raw file
    streaming and the matching of responses to pending requests.
    """

    __slots__ = (
        "sock", "sock_wrapped", "ip", "port", "cert_pin", "target_onion", "id", "protocol", "type", "server", "unpacker", "unpacker_bytes", "req_id", "ip_type",
        "handshake", "crypt", "connected", "event_connected", "closed", "start_time", "handshake_time", "last_recv_time", "is_private_ip", "is_tracker_connection",
        "last_message_time", "last_send_time", "last_sent_time", "incomplete_buff_recv", "bytes_recv", "bytes_sent", "cpu_time", "send_lock",
        "last_ping_delay", "last_req_time", "last_cmd_sent", "last_cmd_recv", "bad_actions", "sites", "name", "waiting_requests", "waiting_streams"
    )

    def __init__(self, server, ip, port, sock=None, target_onion=None, is_tracker_connection=False):
        """Initialise the connection state; no network I/O happens here.

        server: owning connection server (supplies the connection id, logger and statistics)
        ip: peer address, optionally suffixed with "#<cert_pin>"
        port: peer port (converted with int())
        sock: already accepted socket for incoming connections
        target_onion: requested onion address
        is_tracker_connection: mark the connection as a tracker connection
        """
        self.sock = sock
        self.cert_pin = None
        if "#" in ip:
            ip, self.cert_pin = ip.split("#")
        self.target_onion = target_onion  # Requested onion address
        self.id = server.last_connection_id
        server.last_connection_id += 1
        self.protocol = "?"
        self.type = "?"
        self.ip_type = "?"
        self.port = int(port)
        self.setIp(ip)

        if helper.isPrivateIp(self.ip) and self.ip not in config.ip_local:
            self.is_private_ip = True
        else:
            self.is_private_ip = False
        self.is_tracker_connection = is_tracker_connection

        self.server = server
        self.unpacker = None  # Stream incoming socket messages here
        self.unpacker_bytes = 0  # How many bytes the unpacker received
        self.req_id = 0  # Last request id
        self.handshake = {}  # Handshake info got from peer
        self.crypt = None  # Connection encryption method
        self.sock_wrapped = False  # Socket wrapped to encryption

        self.connected = False
        self.event_connected = gevent.event.AsyncResult()  # Solves on handshake received
        self.closed = False

        # Stats
        self.start_time = time.time()
        self.handshake_time = 0
        self.last_recv_time = 0
        self.last_message_time = 0
        self.last_send_time = 0
        self.last_sent_time = 0
        self.incomplete_buff_recv = 0
        self.bytes_recv = 0
        self.bytes_sent = 0
        self.last_ping_delay = None
        self.last_req_time = 0
        self.last_cmd_sent = None
        self.last_cmd_recv = None
        self.bad_actions = 0
        self.sites = 0
        self.cpu_time = 0.0
        self.send_lock = RLock()

        self.name = None
        self.updateName()

        self.waiting_requests = {}  # Waiting sent requests
        self.waiting_streams = {}  # Waiting response file streams

    def setIp(self, ip):
        """Store the peer address, re-detect its type and refresh the display name."""
        self.ip = ip
        self.ip_type = helper.getIpType(ip)
        self.updateName()

    def createSocket(self):
        """Return a new, unconnected TCP socket suitable for the peer address."""
        # An AF_INET6 socket is only created when the socket module has no
        # "socket_noproxy" attribute (presumably set when the module is patched
        # to go through a proxy - TODO confirm); otherwise IPv6 peers also get
        # an AF_INET socket.
        if helper.getIpType(self.ip) == "ipv6" and not hasattr(socket, "socket_noproxy"):
            return socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
        else:
            return socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    def updateName(self):
        """Rebuild the name used in log lines from the id, ip and protocol."""
        self.name = "Conn#%2s %-12s [%s]" % (self.id, self.ip, self.protocol)

    def __str__(self):
        return self.name

    def __repr__(self):
        return "<%s>" % self.__str__()

    def log(self, text):
        """Write a debug message prefixed with the connection name to the server log."""
        self.server.log.debug("%s > %s" % (self.name, text))

    def getValidSites(self):
        """Return the site keys whose onion address equals this connection's target onion."""
        return [key for key, val in self.server.tor_manager.site_onions.items() if val == self.target_onion]

    def badAction(self, weight=1):
        """Add penalty points; sleep 5 seconds above 20 points, close the connection above 40."""
        self.bad_actions += weight
        if self.bad_actions > 40:
            self.close("Too many bad actions")
        elif self.bad_actions > 20:
            time.sleep(5)

    def goodAction(self):
        """Reset the penalty counter."""
        self.bad_actions = 0

    def connect(self):
        """Open the connection to the peer and wait for the handshake.

        Returns the value the handshake event is resolved with: True when the
        handshake has been received, False when the connection was closed first.
        Raises an Exception when the address can't be reached in the current
        Tor configuration; socket errors raised while connecting propagate.
        """
        self.type = "out"
        if self.ip_type == "onion":
            if not self.server.tor_manager or not self.server.tor_manager.enabled:
                raise Exception("Can't connect to onion addresses, no Tor controller present")
            self.sock = self.server.tor_manager.createSocket(self.ip, self.port)
        elif config.tor == "always" and helper.isPrivateIp(self.ip) and self.ip not in config.ip_local:
            raise Exception("Can't connect to local IPs in Tor: always mode")
        elif config.trackers_proxy != "disable" and config.tor != "always" and self.is_tracker_connection:
            if config.trackers_proxy == "tor":
                self.sock = self.server.tor_manager.createSocket(self.ip, self.port)
            else:
                import socks
                self.sock = socks.socksocket()
                proxy_ip, proxy_port = config.trackers_proxy.split(":")
                self.sock.set_proxy(socks.PROXY_TYPE_SOCKS5, proxy_ip, int(proxy_port))
        else:
            self.sock = self.createSocket()

        if hasattr(socket, "TCP_NODELAY"):
            self.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

        timeout_before = self.sock.gettimeout()
        self.sock.settimeout(30)
        if self.ip_type == "ipv6" and not hasattr(self.sock, "proxy"):
            sock_address = (self.ip, self.port, 1, 1)
        else:
            sock_address = (self.ip, self.port)

        self.sock.connect(sock_address)

        # Implicit SSL
        should_encrypt = not self.ip_type == "onion" and self.ip not in self.server.broken_ssl_ips and self.ip not in config.ip_local
        if self.cert_pin:
            # A pinned certificate makes TLS mandatory: errors propagate to the caller
            self.sock = CryptConnection.manager.wrapSocket(self.sock, "tls-rsa", cert_pin=self.cert_pin)
            self.sock.do_handshake()
            self.crypt = "tls-rsa"
            self.sock_wrapped = True
        elif should_encrypt and "tls-rsa" in CryptConnection.manager.crypt_supported:
            try:
                self.sock = CryptConnection.manager.wrapSocket(self.sock, "tls-rsa")
                self.sock.do_handshake()
                self.crypt = "tls-rsa"
                self.sock_wrapped = True
            except Exception as err:
                if not config.force_encryption:
                    self.log("Crypt connection error, adding %s:%s as broken ssl. %s" % (self.ip, self.port, Debug.formatException(err)))
                    self.server.broken_ssl_ips[self.ip] = True
                # Retry on a fresh, unencrypted socket
                self.sock.close()
                self.crypt = None
                self.sock = self.createSocket()
                self.sock.settimeout(30)
                self.sock.connect(sock_address)

        # Detect protocol
        self.send({"cmd": "handshake", "req_id": 0, "params": self.getHandshakeInfo()})
        event_connected = self.event_connected  # Keep a reference: setHandshake() clears the attribute
        gevent.spawn(self.messageLoop)
        connect_res = event_connected.get()  # Wait for handshake
        self.sock.settimeout(timeout_before)
        return connect_res

    def handleIncomingConnection(self, sock):
        """Serve an accepted connection: detect implicit TLS, then run the message loop."""
        self.log("Incoming connection...")

        if hasattr(socket, "TCP_NODELAY"):
            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

        self.type = "in"
        if self.ip not in config.ip_local:  # Clearnet: Check implicit SSL
            try:
                first_byte = sock.recv(1, gevent.socket.MSG_PEEK)
                if first_byte == b"\x16":  # First byte of a TLS handshake record
                    self.log("Crypt in connection using implicit SSL")
                    self.sock = CryptConnection.manager.wrapSocket(self.sock, "tls-rsa", True)
                    self.sock_wrapped = True
                    self.crypt = "tls-rsa"
            except Exception as err:
                self.log("Socket peek error: %s" % Debug.formatException(err))
        self.messageLoop()

    def getMsgpackUnpacker(self):
        """Return a new msgpack unpacker matching the peer's "use_bin_type" handshake flag."""
        if self.handshake and self.handshake.get("use_bin_type"):
            return Msgpack.getUnpacker(fallback=True, decode=False)
        else:  # Backward compatibility for <0.7.0
            return Msgpack.getUnpacker(fallback=True, decode=True)

    def messageLoop(self):
        """Read from the socket and dispatch decoded messages until the connection ends.

        Returns False right away when there is no socket; otherwise the
        connection is closed when the loop finishes for any reason.
        """
        if not self.sock:
            self.log("Socket error: No socket found")
            return False
        self.protocol = "v2"
        self.updateName()
        self.connected = True
        buff_len = 0
        req_len = 0  # Bytes received since the last complete message
        self.unpacker_bytes = 0

        try:
            while not self.closed:
                buff = self.sock.recv(64 * 1024)
                if not buff:
                    break  # Connection closed
                buff_len = len(buff)

                # Statistics
                self.last_recv_time = time.time()
                self.incomplete_buff_recv += 1
                self.bytes_recv += buff_len
                self.server.bytes_recv += buff_len
                req_len += buff_len

                if not self.unpacker:
                    self.unpacker = self.getMsgpackUnpacker()
                    self.unpacker_bytes = 0

                self.unpacker.feed(buff)
                self.unpacker_bytes += buff_len

                while True:
                    try:
                        message = next(self.unpacker)
                    except StopIteration:
                        break  # No more complete message in the buffer
                    if not isinstance(message, dict):
                        if config.debug_socket:
                            self.log("Invalid message type: %s, content: %r, buffer: %r" % (type(message), message, buff[0:16]))
                        raise Exception("Invalid message type: %s" % type(message))

                    # Stats
                    self.incomplete_buff_recv = 0
                    stat_key = message.get("cmd", "unknown")
                    if stat_key == "response" and "to" in message:
                        cmd_sent = self.waiting_requests.get(message["to"], {"cmd": "unknown"})["cmd"]
                        stat_key = "response: %s" % cmd_sent
                    if stat_key == "update":
                        stat_key = "update: %s" % message["params"]["site"]
                    self.server.stat_recv[stat_key]["bytes"] += req_len
                    self.server.stat_recv[stat_key]["num"] += 1
                    if "stream_bytes" in message:
                        self.server.stat_recv[stat_key]["bytes"] += message["stream_bytes"]
                    req_len = 0

                    # Handle message
                    if "stream_bytes" in message:
                        # Raw stream data follows the message: restart the unpacker
                        # with whatever remains of the buffer after the stream
                        buff_left = self.handleStream(message, buff)
                        self.unpacker = self.getMsgpackUnpacker()
                        self.unpacker.feed(buff_left)
                        self.unpacker_bytes = len(buff_left)
                        if config.debug_socket:
                            self.log("Start new unpacker with buff_left: %r" % buff_left)
                    else:
                        self.handleMessage(message)

                    message = None
        except Exception as err:
            if not self.closed:
                self.log("Socket error: %s" % Debug.formatException(err))
                self.server.stat_recv["error: %s" % err]["bytes"] += req_len
                self.server.stat_recv["error: %s" % err]["num"] += 1
        self.close("MessageLoop ended (closed: %s)" % self.closed)  # MessageLoop ended, close connection

    def getUnpackerUnprocessedBytesNum(self):
        """Return how many of the bytes fed to the unpacker have not been consumed yet."""
        if hasattr(self.unpacker, "tell"):
            bytes_num = self.unpacker_bytes - self.unpacker.tell()
        else:  # Unpacker without tell(): read its internal buffer counters
            bytes_num = self.unpacker._fb_buf_n - self.unpacker._fb_buf_o
        return bytes_num

    def handleStream(self, message, buff):
        """Stream the raw bytes following a response directly to the waiting file.

        message: response message carrying "stream_bytes" and "to"
        buff: last chunk received from the socket (may already contain stream data)
        Returns the part of buff located after the stream data (b"" if none).
        """
        stream_bytes_left = message["stream_bytes"]
        file = self.waiting_streams[message["to"]]

        unprocessed_bytes_num = self.getUnpackerUnprocessedBytesNum()

        if unprocessed_bytes_num:  # Found stream bytes in unpacker
            unpacker_stream_bytes = min(unprocessed_bytes_num, stream_bytes_left)
            buff_stream_start = len(buff) - unprocessed_bytes_num
            file.write(buff[buff_stream_start:buff_stream_start + unpacker_stream_bytes])
            stream_bytes_left -= unpacker_stream_bytes
        else:
            unpacker_stream_bytes = 0

        if config.debug_socket:
            self.log(
                "Starting stream %s: %s bytes (%s from unpacker, buff size: %s, unprocessed: %s)" %
                (message["to"], message["stream_bytes"], unpacker_stream_bytes, len(buff), unprocessed_bytes_num)
            )

        try:
            while 1:
                if stream_bytes_left <= 0:
                    break
                stream_buff = self.sock.recv(min(64 * 1024, stream_bytes_left))
                if not stream_buff:
                    break  # Connection closed
                buff_len = len(stream_buff)
                stream_bytes_left -= buff_len
                file.write(stream_buff)

                # Statistics
                self.last_recv_time = time.time()
                self.incomplete_buff_recv += 1
                self.bytes_recv += buff_len
                self.server.bytes_recv += buff_len
        except Exception as err:
            self.log("Stream read error: %s" % Debug.formatException(err))

        if config.debug_socket:
            self.log("End stream %s, file pos: %s" % (message["to"], file.tell()))

        self.incomplete_buff_recv = 0
        self.waiting_requests[message["to"]]["evt"].set(message)  # Set the response to event
        del self.waiting_streams[message["to"]]
        del self.waiting_requests[message["to"]]

        if unpacker_stream_bytes:
            return buff[buff_stream_start + unpacker_stream_bytes:]
        else:
            return b""

    def getHandshakeInfo(self):
        """Build and return the handshake dict describing this side of the connection."""
        # No TLS for onion connections
        if self.ip_type == "onion":
            crypt_supported = []
        elif self.ip in self.server.broken_ssl_ips:
            crypt_supported = []
        else:
            crypt_supported = CryptConnection.manager.crypt_supported
        # No peer id for onion connections
        if self.ip_type == "onion" or self.ip in config.ip_local:
            peer_id = ""
        else:
            peer_id = self.server.peer_id
        # Setup peer lock from requested onion address
        if self.handshake and self.handshake.get("target_ip", "").endswith(".onion") and self.server.tor_manager.start_onions:
            self.target_onion = self.handshake.get("target_ip").replace(".onion", "")  # My onion address
            # NOTE(review): this only warns when no site onion is registered at all,
            # not when the target is missing from the registered ones - confirm intent
            if not self.server.tor_manager.site_onions.values():
                self.server.log.warning("Unknown target onion address: %s" % self.target_onion)

        handshake = {
            "version": config.version,
            "protocol": "v2",
            "use_bin_type": True,
            "peer_id": peer_id,
            "fileserver_port": self.server.port,
            "port_opened": self.server.port_opened.get(self.ip_type, None),
            "target_ip": self.ip,
            "rev": config.rev,
            "crypt_supported": crypt_supported,
            "crypt": self.crypt,
            "time": int(time.time())
        }
        if self.target_onion:
            handshake["onion"] = self.target_onion
        elif self.ip_type == "onion":
            handshake["onion"] = self.server.tor_manager.getOnion("global")

        if self.is_tracker_connection:
            handshake["tracker_connection"] = True

        if config.debug_socket:
            self.log("My Handshake: %s" % handshake)

        return handshake

    def setHandshake(self, handshake):
        """Apply the handshake received from the peer and mark the connection as established.

        Returns False (after closing the connection) when the peer turns out
        to be this very node, None otherwise.
        """
        if config.debug_socket:
            self.log("Remote Handshake: %s" % handshake)

        if handshake.get("peer_id") == self.server.peer_id and not handshake.get("tracker_connection") and not self.is_tracker_connection:
            self.close("Same peer id, can't connect to myself")
            self.server.peer_blacklist.append((handshake["target_ip"], handshake["fileserver_port"]))
            return False

        self.handshake = handshake
        if handshake.get("port_opened", None) is False and "onion" not in handshake and not self.is_private_ip:  # Not connectable
            self.port = 0
        else:
            self.port = int(handshake["fileserver_port"])  # Set peer fileserver port

        if handshake.get("use_bin_type") and self.unpacker:
            unprocessed_bytes_num = self.getUnpackerUnprocessedBytesNum()
            self.log("Changing unpacker to bin type (unprocessed bytes: %s)" % unprocessed_bytes_num)
            unprocessed_bytes = self.unpacker.read_bytes(unprocessed_bytes_num)
            self.unpacker = self.getMsgpackUnpacker()  # Create new unpacker for different msgpack type
            self.unpacker_bytes = 0
            if unprocessed_bytes:
                self.unpacker.feed(unprocessed_bytes)

        # Check if we can encrypt the connection
        if handshake.get("crypt_supported") and self.ip not in self.server.broken_ssl_ips:
            if isinstance(handshake["crypt_supported"][0], bytes):
                handshake["crypt_supported"] = [item.decode() for item in handshake["crypt_supported"]]  # Backward compatibility

            if self.ip_type == "onion" or self.ip in config.ip_local:
                crypt = None
            elif handshake.get("crypt"):  # Recommended crypt by server
                crypt = handshake["crypt"]
            else:  # Select the best supported on both sides
                crypt = CryptConnection.manager.selectCrypt(handshake["crypt_supported"])

            if crypt:
                self.crypt = crypt

        if self.type == "in" and handshake.get("onion") and not self.ip_type == "onion":  # Set incoming connection's onion address
            if self.server.ips.get(self.ip) == self:
                del self.server.ips[self.ip]
            self.setIp(handshake["onion"] + ".onion")
            self.log("Changing ip to %s" % self.ip)
            self.server.ips[self.ip] = self
            self.updateName()

        self.event_connected.set(True)  # Mark handshake as done
        self.event_connected = None
        self.handshake_time = time.time()

    def handleMessage(self, message):
        """Dispatch one decoded message: a response, a handshake or a request."""
        cmd = message["cmd"]

        self.last_message_time = time.time()
        self.last_cmd_recv = cmd
        if cmd == "response":  # New style response
            if message["to"] in self.waiting_requests:
                if self.last_send_time and len(self.waiting_requests) == 1:
                    ping = time.time() - self.last_send_time
                    self.last_ping_delay = ping
                self.waiting_requests[message["to"]]["evt"].set(message)  # Set the response to event
                del self.waiting_requests[message["to"]]
            elif message["to"] == 0:  # Other peers handshake
                ping = time.time() - self.start_time
                if config.debug_socket:
                    self.log("Handshake response: %s, ping: %s" % (message, ping))
                self.last_ping_delay = ping
                # Server switched to crypt, lets do it also if not crypted already
                if message.get("crypt") and not self.sock_wrapped:
                    self.crypt = message["crypt"]
                    server = (self.type == "in")
                    self.log("Crypt out connection using: %s (server side: %s, ping: %.3fs)..." % (self.crypt, server, ping))
                    self.sock = CryptConnection.manager.wrapSocket(self.sock, self.crypt, server, cert_pin=self.cert_pin)
                    self.sock.do_handshake()
                    self.sock_wrapped = True

                if not self.sock_wrapped and self.cert_pin:
                    self.close("Crypt connection error: Socket not encrypted, but certificate pin present")
                    return

                self.setHandshake(message)
            else:
                self.log("Unknown response: %s" % message)
        elif cmd:
            self.server.num_recv += 1
            if cmd == "handshake":
                self.handleHandshake(message)
            else:
                self.server.handleRequest(self, message)

    def handleHandshake(self, message):
        """Answer an incoming handshake request and switch to encryption if one was selected."""
        self.setHandshake(message["params"])
        data = self.getHandshakeInfo()
        data["cmd"] = "response"
        data["to"] = message["req_id"]
        self.send(data)  # Send response to handshake
        # Sent crypt request to client
        if self.crypt and not self.sock_wrapped:
            server = (self.type == "in")
            self.log("Crypt in connection using: %s (server side: %s)..." % (self.crypt, server))
            try:
                self.sock = CryptConnection.manager.wrapSocket(self.sock, self.crypt, server, cert_pin=self.cert_pin)
                self.sock_wrapped = True
            except Exception as err:
                if not config.force_encryption:
                    self.log("Crypt connection error, adding %s:%s as broken ssl. %s" % (self.ip, self.port, Debug.formatException(err)))
                    self.server.broken_ssl_ips[self.ip] = True
                self.close("Broken ssl")

        if not self.sock_wrapped and self.cert_pin:
            self.close("Crypt connection error: Socket not encrypted, but certificate pin present")

    def send(self, message, streaming=False):
        """Serialize a message with msgpack and send it to the peer.

        message: dict to send
        streaming: pack and send through Msgpack.stream instead of packing in memory first
        Returns True on success, False when there is no socket or sending failed
        (a send failure also closes the connection).
        """
        self.last_send_time = time.time()
        if config.debug_socket:
            self.log("Send: %s, to: %s, streaming: %s, site: %s, inner_path: %s, req_id: %s" % (
                message.get("cmd"), message.get("to"), streaming,
                message.get("params", {}).get("site"), message.get("params", {}).get("inner_path"),
                message.get("req_id"))
            )

        if not self.sock:
            self.log("Send error: missing socket")
            return False

        if not self.connected and message.get("cmd") != "handshake":
            self.log("Wait for handshake before send request")
            self.event_connected.get()

        try:
            stat_key = message.get("cmd", "unknown")
            if stat_key == "response":
                stat_key = "response: %s" % self.last_cmd_recv
            else:
                self.server.num_sent += 1

            self.server.stat_sent[stat_key]["num"] += 1
            if streaming:
                with self.send_lock:
                    bytes_sent = Msgpack.stream(message, self.sock.sendall)
                self.bytes_sent += bytes_sent
                self.server.bytes_sent += bytes_sent
                self.server.stat_sent[stat_key]["bytes"] += bytes_sent
                message = None
            else:
                data = Msgpack.pack(message)
                self.bytes_sent += len(data)
                self.server.bytes_sent += len(data)
                self.server.stat_sent[stat_key]["bytes"] += len(data)
                message = None
                with self.send_lock:
                    self.sock.sendall(data)
        except Exception as err:
            self.close("Send error: %s (cmd: %s)" % (err, stat_key))
            return False
        self.last_sent_time = time.time()
        return True

    def sendRawfile(self, file, read_bytes):
        """Stream up to read_bytes bytes of a file to the peer without msgpacking.

        file: object with a read(size) method returning bytes
        read_bytes: number of bytes to send
        Returns True; socket errors propagate to the caller.
        """
        buff = 64 * 1024
        bytes_left = read_bytes
        bytes_sent = 0
        while True:
            self.last_send_time = time.time()
            data = file.read(min(bytes_left, buff))
            if not data:
                break  # End of file reached: nothing more to send
            bytes_sent += len(data)
            with self.send_lock:
                self.sock.sendall(data)
            # Count what was actually read, so a short read doesn't truncate the transfer
            bytes_left -= len(data)
            if bytes_left <= 0:
                break
        self.bytes_sent += bytes_sent
        self.server.bytes_sent += bytes_sent
        self.server.stat_sent["raw_file"]["num"] += 1
        self.server.stat_sent["raw_file"]["bytes"] += bytes_sent
        return True

    def request(self, cmd, params=None, stream_to=None):
        """Create a request, send it to the peer and wait for the answer.

        cmd: command name
        params: command parameters (defaults to an empty dict)
        stream_to: file-like object that receives the raw stream of the response, if any
        Returns the response message, or False when the connection timed out
        or was closed before the response arrived.
        """
        if params is None:
            params = {}  # Fresh dict per call instead of a shared mutable default

        # Last command sent more than 10 sec ago, timeout
        if self.waiting_requests and self.protocol == "v2" and time.time() - max(self.last_req_time, self.last_recv_time) > 10:
            self.close("Request %s timeout: %.3fs" % (self.last_cmd_sent, time.time() - self.last_send_time))
            return False

        self.last_req_time = time.time()
        self.last_cmd_sent = cmd
        self.req_id += 1
        data = {"cmd": cmd, "req_id": self.req_id, "params": params}
        event = gevent.event.AsyncResult()  # Create new event for response
        self.waiting_requests[self.req_id] = {"evt": event, "cmd": cmd}
        if stream_to:
            self.waiting_streams[self.req_id] = stream_to
        self.send(data)  # Send request
        res = event.get()  # Wait until event solves
        return res

    def ping(self):
        """Send a ping request; return True if the peer answers b"Pong!" within 10 seconds."""
        s = time.time()
        response = None
        with gevent.Timeout(10.0, False):
            try:
                response = self.request("ping")
            except Exception as err:
                self.log("Ping error: %s" % Debug.formatException(err))
        if response and "body" in response and response["body"] == b"Pong!":
            self.last_ping_delay = time.time() - s
            return True
        else:
            return False

    def close(self, reason="Unknown"):
        """Close the connection and resolve every pending request with False.

        Returns False if the connection was already closed, None otherwise.
        """
        if self.closed:
            return False  # Already closed
        self.closed = True
        self.connected = False
        if self.event_connected:
            self.event_connected.set(False)

        self.log(
            "Closing connection: %s, waiting_requests: %s, sites: %s, buff: %s..." %
            (reason, len(self.waiting_requests), self.sites, self.incomplete_buff_recv)
        )
        for request in self.waiting_requests.values():  # Mark pending requests failed
            request["evt"].set(False)
        self.waiting_requests = {}
        self.waiting_streams = {}
        self.sites = 0
        self.server.removeConnection(self)  # Remove connection from server registry
        try:
            if self.sock:
                self.sock.shutdown(gevent.socket.SHUT_WR)
                self.sock.close()
        except Exception as err:
            if config.debug_socket:
                self.log("Close error: %s" % err)

        # Little cleanup
        self.sock = None
        self.unpacker = None
        self.event_connected = None