Forking what is left of ZeroNet and hopefully adding an AT Proto Frontend/Proxy
import socket
import time

import gevent
try:
    from gevent.coros import RLock
except ImportError:
    # gevent.coros could not be imported: take RLock from gevent.lock instead.
    # Only ImportError is caught so unrelated failures are not hidden.
    from gevent.lock import RLock

from Config import config
from Debug import Debug
from util import Msgpack
from Crypt import CryptConnection
from util import helper
15
16
class Connection(object):
    """One peer connection of the file server, exchanging msgpack messages.

    An instance is used either for an outgoing connection (``connect``) or for
    an accepted one (``handleIncomingConnection``). Both paths end up in
    ``messageLoop``, which reads the socket until the connection is closed.
    Requests sent with ``request`` are matched to their responses through
    ``waiting_requests`` (keyed by request id).
    """

    __slots__ = (
        "sock", "sock_wrapped", "ip", "port", "cert_pin", "target_onion", "id", "protocol", "type", "server", "unpacker", "unpacker_bytes", "req_id", "ip_type",
        "handshake", "crypt", "connected", "event_connected", "closed", "start_time", "handshake_time", "last_recv_time", "is_private_ip", "is_tracker_connection",
        "last_message_time", "last_send_time", "last_sent_time", "incomplete_buff_recv", "bytes_recv", "bytes_sent", "cpu_time", "send_lock",
        "last_ping_delay", "last_req_time", "last_cmd_sent", "last_cmd_recv", "bad_actions", "sites", "name", "waiting_requests", "waiting_streams"
    )

    def __init__(self, server, ip, port, sock=None, target_onion=None, is_tracker_connection=False):
        """Set up the connection state; no network I/O is done here.

        Args:
            server: Owning server object. Its ``last_connection_id`` is read
                as this connection's id and then incremented.
            ip: Peer address. May carry a certificate pin as ``"<ip>#<pin>"``.
            port: Peer port (converted with ``int``).
            sock: Existing socket for this connection, or None.
            target_onion: Requested onion address, if any.
            is_tracker_connection: True when the peer is a tracker.
        """
        self.sock = sock
        self.cert_pin = None
        if "#" in ip:
            ip, self.cert_pin = ip.split("#")
        self.target_onion = target_onion  # Requested onion address
        self.id = server.last_connection_id
        server.last_connection_id += 1
        self.protocol = "?"
        self.type = "?"
        self.ip_type = "?"
        self.port = int(port)
        self.setIp(ip)  # Also fills ip_type and name

        # Private address that is not explicitly listed as a local ip
        self.is_private_ip = bool(helper.isPrivateIp(self.ip) and self.ip not in config.ip_local)
        self.is_tracker_connection = is_tracker_connection

        self.server = server
        self.unpacker = None  # Stream incoming socket messages here
        self.unpacker_bytes = 0  # How many bytes the unpacker received
        self.req_id = 0  # Last request id
        self.handshake = {}  # Handshake info got from peer
        self.crypt = None  # Connection encryption method
        self.sock_wrapped = False  # Socket wrapped to encryption

        self.connected = False
        self.event_connected = gevent.event.AsyncResult()  # Solves on handshake received
        self.closed = False

        # Stats
        self.start_time = time.time()
        self.handshake_time = 0
        self.last_recv_time = 0
        self.last_message_time = 0
        self.last_send_time = 0
        self.last_sent_time = 0
        self.incomplete_buff_recv = 0
        self.bytes_recv = 0
        self.bytes_sent = 0
        self.last_ping_delay = None
        self.last_req_time = 0
        self.last_cmd_sent = None
        self.last_cmd_recv = None
        self.bad_actions = 0
        self.sites = 0
        self.cpu_time = 0.0
        self.send_lock = RLock()

        self.name = None
        self.updateName()

        self.waiting_requests = {}  # Waiting sent requests
        self.waiting_streams = {}  # Waiting response file streams

    def setIp(self, ip):
        """Store the peer address and refresh the derived ip type and name."""
        self.ip = ip
        self.ip_type = helper.getIpType(ip)
        self.updateName()

    def createSocket(self):
        """Return a new, unconnected TCP socket for ``self.ip``.

        An AF_INET6 socket is created only for IPv6 peers when the ``socket``
        module has no ``socket_noproxy`` attribute (presumably that attribute
        marks a proxy-patched socket module - confirm against the proxy setup).
        """
        if helper.getIpType(self.ip) == "ipv6" and not hasattr(socket, "socket_noproxy"):
            return socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
        # Everything else, including IPv6 when using proxy, is created as IPv4
        return socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    def updateName(self):
        """Rebuild the display name from id, ip and protocol."""
        self.name = "Conn#%2s %-12s [%s]" % (self.id, self.ip, self.protocol)

    def __str__(self):
        return self.name

    def __repr__(self):
        return "<%s>" % self.__str__()

    def log(self, text):
        """Write a debug line, prefixed with the connection name, to the server log."""
        self.server.log.debug("%s > %s" % (self.name, text))

    def getValidSites(self):
        """Return the ``site_onions`` keys whose onion equals ``self.target_onion``."""
        return [key for key, val in self.server.tor_manager.site_onions.items() if val == self.target_onion]

    def badAction(self, weight=1):
        """Increase the bad action counter; throttle above 20, close above 40."""
        self.bad_actions += weight
        if self.bad_actions > 40:
            self.close("Too many bad actions")
        elif self.bad_actions > 20:
            time.sleep(5)

    def goodAction(self):
        """Reset the bad action counter."""
        self.bad_actions = 0

    # Open connection to peer and wait for handshake
    def connect(self):
        """Open the outgoing connection, send the handshake and wait for the reply.

        Returns:
            The value the handshake event resolves to: ``setHandshake`` sets
            True, ``close`` sets False.

        Raises:
            Exception: For onion peers without an enabled Tor manager, and for
                private, non-local addresses when ``config.tor`` is ``"always"``.
                Socket errors from connecting also propagate.
        """
        self.type = "out"
        if self.ip_type == "onion":
            if not self.server.tor_manager or not self.server.tor_manager.enabled:
                raise Exception("Can't connect to onion addresses, no Tor controller present")
            self.sock = self.server.tor_manager.createSocket(self.ip, self.port)
        elif config.tor == "always" and helper.isPrivateIp(self.ip) and self.ip not in config.ip_local:
            raise Exception("Can't connect to local IPs in Tor: always mode")
        elif config.trackers_proxy != "disable" and config.tor != "always" and self.is_tracker_connection:
            if config.trackers_proxy == "tor":
                self.sock = self.server.tor_manager.createSocket(self.ip, self.port)
            else:
                # trackers_proxy is expected in "host:port" form here
                import socks
                self.sock = socks.socksocket()
                proxy_ip, proxy_port = config.trackers_proxy.split(":")
                self.sock.set_proxy(socks.PROXY_TYPE_SOCKS5, proxy_ip, int(proxy_port))
        else:
            self.sock = self.createSocket()

        if hasattr(socket, "TCP_NODELAY"):
            self.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

        timeout_before = self.sock.gettimeout()
        self.sock.settimeout(30)
        if self.ip_type == "ipv6" and not hasattr(self.sock, "proxy"):
            sock_address = (self.ip, self.port, 1, 1)
        else:
            sock_address = (self.ip, self.port)

        self.sock.connect(sock_address)

        # Implicit SSL
        should_encrypt = self.ip_type != "onion" and self.ip not in self.server.broken_ssl_ips and self.ip not in config.ip_local
        if self.cert_pin:
            # A pinned certificate: errors are not caught, so there is no plain-text fallback
            self.sock = CryptConnection.manager.wrapSocket(self.sock, "tls-rsa", cert_pin=self.cert_pin)
            self.sock.do_handshake()
            self.crypt = "tls-rsa"
            self.sock_wrapped = True
        elif should_encrypt and "tls-rsa" in CryptConnection.manager.crypt_supported:
            try:
                self.sock = CryptConnection.manager.wrapSocket(self.sock, "tls-rsa")
                self.sock.do_handshake()
                self.crypt = "tls-rsa"
                self.sock_wrapped = True
            except Exception as err:
                if not config.force_encryption:
                    self.log("Crypt connection error, adding %s:%s as broken ssl. %s" % (self.ip, self.port, Debug.formatException(err)))
                    self.server.broken_ssl_ips[self.ip] = True
                # Retry on a fresh, unencrypted socket
                self.sock.close()
                self.crypt = None
                self.sock = self.createSocket()
                self.sock.settimeout(30)
                self.sock.connect(sock_address)

        # Detect protocol
        self.send({"cmd": "handshake", "req_id": 0, "params": self.getHandshakeInfo()})
        # Keep a local reference: setHandshake()/close() reset self.event_connected to None
        event_connected = self.event_connected
        gevent.spawn(self.messageLoop)
        connect_res = event_connected.get()  # Wait for handshake
        self.sock.settimeout(timeout_before)
        return connect_res

    # Handle incoming connection
    def handleIncomingConnection(self, sock):
        """Serve an accepted connection: detect implicit TLS, then run the message loop.

        Args:
            sock: The accepted socket. It is used for the socket option and the
                peek; the TLS wrapping is applied to ``self.sock`` (presumably
                the same object, passed to the constructor - confirm in caller).
        """
        self.log("Incoming connection...")

        if hasattr(socket, "TCP_NODELAY"):
            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

        self.type = "in"
        if self.ip not in config.ip_local:  # Clearnet: Check implicit SSL
            try:
                # Peek the first byte without consuming it
                first_byte = sock.recv(1, socket.MSG_PEEK)
                if first_byte == b"\x16":
                    self.log("Crypt in connection using implicit SSL")
                    self.sock = CryptConnection.manager.wrapSocket(self.sock, "tls-rsa", True)
                    self.sock_wrapped = True
                    self.crypt = "tls-rsa"
            except Exception as err:
                self.log("Socket peek error: %s" % Debug.formatException(err))
        self.messageLoop()

    def getMsgpackUnpacker(self):
        """Return a new unpacker matching the peer's ``use_bin_type`` handshake flag."""
        if self.handshake and self.handshake.get("use_bin_type"):
            return Msgpack.getUnpacker(fallback=True, decode=False)
        else:  # Backward compatibility for <0.7.0
            return Msgpack.getUnpacker(fallback=True, decode=True)

    # Message loop for connection
    def messageLoop(self):
        """Read the socket and dispatch every decoded message until closed.

        Returns False right away when there is no socket. Otherwise the loop
        ends when the peer closes the socket, the connection is marked closed,
        or an exception is raised; in each case ``close`` is called at the end.
        """
        if not self.sock:
            self.log("Socket error: No socket found")
            return False
        self.protocol = "v2"
        self.updateName()
        self.connected = True
        buff_len = 0
        req_len = 0  # Bytes received since the last complete message (for stats)
        self.unpacker_bytes = 0

        try:
            while not self.closed:
                buff = self.sock.recv(64 * 1024)
                if not buff:
                    break  # Connection closed
                buff_len = len(buff)

                # Statistics
                self.last_recv_time = time.time()
                self.incomplete_buff_recv += 1
                self.bytes_recv += buff_len
                self.server.bytes_recv += buff_len
                req_len += buff_len

                if not self.unpacker:
                    self.unpacker = self.getMsgpackUnpacker()
                    self.unpacker_bytes = 0

                self.unpacker.feed(buff)
                self.unpacker_bytes += buff_len

                while True:
                    try:
                        message = next(self.unpacker)
                    except StopIteration:
                        break  # No more complete message in the buffer
                    if type(message) is not dict:
                        if config.debug_socket:
                            self.log("Invalid message type: %s, content: %r, buffer: %r" % (type(message), message, buff[0:16]))
                        raise Exception("Invalid message type: %s" % type(message))

                    # Stats
                    self.incomplete_buff_recv = 0
                    stat_key = message.get("cmd", "unknown")
                    if stat_key == "response" and "to" in message:
                        cmd_sent = self.waiting_requests.get(message["to"], {"cmd": "unknown"})["cmd"]
                        stat_key = "response: %s" % cmd_sent
                    if stat_key == "update":
                        stat_key = "update: %s" % message["params"]["site"]
                    self.server.stat_recv[stat_key]["bytes"] += req_len
                    self.server.stat_recv[stat_key]["num"] += 1
                    if "stream_bytes" in message:
                        self.server.stat_recv[stat_key]["bytes"] += message["stream_bytes"]
                    req_len = 0

                    # Handle message
                    if "stream_bytes" in message:
                        # Raw bytes follow the message: hand them to the stream
                        # handler, then restart the unpacker with what is left
                        buff_left = self.handleStream(message, buff)
                        self.unpacker = self.getMsgpackUnpacker()
                        self.unpacker.feed(buff_left)
                        self.unpacker_bytes = len(buff_left)
                        if config.debug_socket:
                            self.log("Start new unpacker with buff_left: %r" % buff_left)
                    else:
                        self.handleMessage(message)

                    message = None  # Drop the reference to the handled message
        except Exception as err:
            if not self.closed:
                self.log("Socket error: %s" % Debug.formatException(err))
                self.server.stat_recv["error: %s" % err]["bytes"] += req_len
                self.server.stat_recv["error: %s" % err]["num"] += 1
        self.close("MessageLoop ended (closed: %s)" % self.closed)  # MessageLoop ended, close connection

    def getUnpackerUnprocessedBytesNum(self):
        """Return how many bytes fed to the unpacker have not been consumed yet."""
        if hasattr(self.unpacker, "tell"):
            bytes_num = self.unpacker_bytes - self.unpacker.tell()
        else:
            # Unpacker without tell(): use its internal buffer counters
            bytes_num = self.unpacker._fb_buf_n - self.unpacker._fb_buf_o
        return bytes_num

    # Stream socket directly to a file
    def handleStream(self, message, buff):
        """Write the raw bytes that follow ``message`` into its waiting stream.

        Args:
            message: Response dict with ``"stream_bytes"`` (payload length) and
                ``"to"`` (id of the request that registered the stream).
            buff: The last buffer read from the socket; its tail may already
                contain the beginning of the stream.

        Returns:
            The bytes of ``buff`` located after the stream payload, or ``b""``
            when the unpacker held no stream bytes.
        """
        stream_bytes_left = message["stream_bytes"]
        file = self.waiting_streams[message["to"]]

        unprocessed_bytes_num = self.getUnpackerUnprocessedBytesNum()

        if unprocessed_bytes_num:  # Found stream bytes in unpacker
            unpacker_stream_bytes = min(unprocessed_bytes_num, stream_bytes_left)
            buff_stream_start = len(buff) - unprocessed_bytes_num
            file.write(buff[buff_stream_start:buff_stream_start + unpacker_stream_bytes])
            stream_bytes_left -= unpacker_stream_bytes
        else:
            unpacker_stream_bytes = 0

        if config.debug_socket:
            self.log(
                "Starting stream %s: %s bytes (%s from unpacker, buff size: %s, unprocessed: %s)" %
                (message["to"], message["stream_bytes"], unpacker_stream_bytes, len(buff), unprocessed_bytes_num)
            )

        try:
            while stream_bytes_left > 0:
                stream_buff = self.sock.recv(min(64 * 1024, stream_bytes_left))
                if not stream_buff:
                    break  # Socket closed before the whole stream arrived
                buff_len = len(stream_buff)
                stream_bytes_left -= buff_len
                file.write(stream_buff)

                # Statistics
                self.last_recv_time = time.time()
                self.incomplete_buff_recv += 1
                self.bytes_recv += buff_len
                self.server.bytes_recv += buff_len
        except Exception as err:
            self.log("Stream read error: %s" % Debug.formatException(err))

        if config.debug_socket:
            self.log("End stream %s, file pos: %s" % (message["to"], file.tell()))

        self.incomplete_buff_recv = 0
        self.waiting_requests[message["to"]]["evt"].set(message)  # Set the response to event
        del self.waiting_streams[message["to"]]
        del self.waiting_requests[message["to"]]

        if unpacker_stream_bytes:
            return buff[buff_stream_start + unpacker_stream_bytes:]
        else:
            return b""

    # My handshake info
    def getHandshakeInfo(self):
        """Build the handshake dict describing this side of the connection.

        Side effect: when the peer's handshake names an ``.onion`` target and
        the Tor manager has ``start_onions`` set, ``self.target_onion`` is
        updated from it.
        """
        # No TLS for onion connections and for peers with known broken ssl
        if self.ip_type == "onion" or self.ip in self.server.broken_ssl_ips:
            crypt_supported = []
        else:
            crypt_supported = CryptConnection.manager.crypt_supported
        # No peer id for onion connections
        if self.ip_type == "onion" or self.ip in config.ip_local:
            peer_id = ""
        else:
            peer_id = self.server.peer_id
        # Setup peer lock from requested onion address
        if self.handshake and self.handshake.get("target_ip", "").endswith(".onion") and self.server.tor_manager.start_onions:
            self.target_onion = self.handshake.get("target_ip").replace(".onion", "")  # My onion address
            # Warn when the requested onion is not one of ours. The previous
            # check only fired when site_onions was completely empty.
            if self.target_onion not in self.server.tor_manager.site_onions.values():
                self.server.log.warning("Unknown target onion address: %s" % self.target_onion)

        handshake = {
            "version": config.version,
            "protocol": "v2",
            "use_bin_type": True,
            "peer_id": peer_id,
            "fileserver_port": self.server.port,
            "port_opened": self.server.port_opened.get(self.ip_type, None),
            "target_ip": self.ip,
            "rev": config.rev,
            "crypt_supported": crypt_supported,
            "crypt": self.crypt,
            "time": int(time.time())
        }
        if self.target_onion:
            handshake["onion"] = self.target_onion
        elif self.ip_type == "onion":
            handshake["onion"] = self.server.tor_manager.getOnion("global")

        if self.is_tracker_connection:
            handshake["tracker_connection"] = True

        if config.debug_socket:
            self.log("My Handshake: %s" % handshake)

        return handshake

    def setHandshake(self, handshake):
        """Apply the peer's handshake and mark the connection as established.

        Returns:
            False when the peer turns out to be this node itself (the
            connection is closed and the address blacklisted), otherwise None.
        """
        if config.debug_socket:
            self.log("Remote Handshake: %s" % handshake)

        if handshake.get("peer_id") == self.server.peer_id and not handshake.get("tracker_connection") and not self.is_tracker_connection:
            self.close("Same peer id, can't connect to myself")
            self.server.peer_blacklist.append((handshake["target_ip"], handshake["fileserver_port"]))
            return False

        self.handshake = handshake
        if handshake.get("port_opened", None) is False and "onion" not in handshake and not self.is_private_ip:  # Not connectable
            self.port = 0
        else:
            self.port = int(handshake["fileserver_port"])  # Set peer fileserver port

        if handshake.get("use_bin_type") and self.unpacker:
            # Move the not yet decoded bytes over to an unpacker of the new type
            unprocessed_bytes_num = self.getUnpackerUnprocessedBytesNum()
            self.log("Changing unpacker to bin type (unprocessed bytes: %s)" % unprocessed_bytes_num)
            unprocessed_bytes = self.unpacker.read_bytes(unprocessed_bytes_num)
            self.unpacker = self.getMsgpackUnpacker()  # Create new unpacker for different msgpack type
            self.unpacker_bytes = 0
            if unprocessed_bytes:
                self.unpacker.feed(unprocessed_bytes)

        # Check if we can encrypt the connection
        if handshake.get("crypt_supported") and self.ip not in self.server.broken_ssl_ips:
            if type(handshake["crypt_supported"][0]) is bytes:
                handshake["crypt_supported"] = [item.decode() for item in handshake["crypt_supported"]]  # Backward compatibility

            if self.ip_type == "onion" or self.ip in config.ip_local:
                crypt = None
            elif handshake.get("crypt"):  # Recommended crypt by server
                crypt = handshake["crypt"]
            else:  # Select the best supported on both sides
                crypt = CryptConnection.manager.selectCrypt(handshake["crypt_supported"])

            if crypt:
                self.crypt = crypt

        if self.type == "in" and handshake.get("onion") and self.ip_type != "onion":  # Set incoming connection's onion address
            if self.server.ips.get(self.ip) == self:
                del self.server.ips[self.ip]
            self.setIp(handshake["onion"] + ".onion")
            self.log("Changing ip to %s" % self.ip)
            self.server.ips[self.ip] = self
            self.updateName()

        self.event_connected.set(True)  # Mark handshake as done
        self.event_connected = None
        self.handshake_time = time.time()

    # Handle incoming message
    def handleMessage(self, message):
        """Route one decoded message: responses, handshakes and other requests.

        Args:
            message: Decoded message dict; must contain ``"cmd"``.
        """
        cmd = message["cmd"]

        self.last_message_time = time.time()
        self.last_cmd_recv = cmd
        if cmd == "response":  # New style response
            if message["to"] in self.waiting_requests:
                # The delay is only meaningful when a single request is pending
                if self.last_send_time and len(self.waiting_requests) == 1:
                    ping = time.time() - self.last_send_time
                    self.last_ping_delay = ping
                self.waiting_requests[message["to"]]["evt"].set(message)  # Set the response to event
                del self.waiting_requests[message["to"]]
            elif message["to"] == 0:  # Other peers handshake
                ping = time.time() - self.start_time
                if config.debug_socket:
                    self.log("Handshake response: %s, ping: %s" % (message, ping))
                self.last_ping_delay = ping
                # Server switched to crypt, lets do it also if not crypted already
                if message.get("crypt") and not self.sock_wrapped:
                    self.crypt = message["crypt"]
                    server = (self.type == "in")
                    self.log("Crypt out connection using: %s (server side: %s, ping: %.3fs)..." % (self.crypt, server, ping))
                    self.sock = CryptConnection.manager.wrapSocket(self.sock, self.crypt, server, cert_pin=self.cert_pin)
                    self.sock.do_handshake()
                    self.sock_wrapped = True

                if not self.sock_wrapped and self.cert_pin:
                    self.close("Crypt connection error: Socket not encrypted, but certificate pin present")
                    return

                self.setHandshake(message)
            else:
                self.log("Unknown response: %s" % message)
        elif cmd:
            self.server.num_recv += 1
            if cmd == "handshake":
                self.handleHandshake(message)
            else:
                self.server.handleRequest(self, message)

    # Incoming handshake set request
    def handleHandshake(self, message):
        """Answer a handshake request and switch to encryption when one was agreed."""
        self.setHandshake(message["params"])
        data = self.getHandshakeInfo()
        data["cmd"] = "response"
        data["to"] = message["req_id"]
        self.send(data)  # Send response to handshake
        # Sent crypt request to client
        if self.crypt and not self.sock_wrapped:
            server = (self.type == "in")
            self.log("Crypt in connection using: %s (server side: %s)..." % (self.crypt, server))
            try:
                self.sock = CryptConnection.manager.wrapSocket(self.sock, self.crypt, server, cert_pin=self.cert_pin)
                self.sock_wrapped = True
            except Exception as err:
                if not config.force_encryption:
                    self.log("Crypt connection error, adding %s:%s as broken ssl. %s" % (self.ip, self.port, Debug.formatException(err)))
                    self.server.broken_ssl_ips[self.ip] = True
                self.close("Broken ssl")

        if not self.sock_wrapped and self.cert_pin:
            self.close("Crypt connection error: Socket not encrypted, but certificate pin present")

    # Send data to connection
    def send(self, message, streaming=False):
        """Pack ``message`` with msgpack and write it to the socket.

        Args:
            message: Message dict to send.
            streaming: When True the message is written through
                ``Msgpack.stream`` instead of being packed in one piece.

        Returns:
            True on success; False when there is no socket or sending failed
            (in the latter case the connection is closed).
        """
        self.last_send_time = time.time()
        if config.debug_socket:
            self.log("Send: %s, to: %s, streaming: %s, site: %s, inner_path: %s, req_id: %s" % (
                message.get("cmd"), message.get("to"), streaming,
                message.get("params", {}).get("site"), message.get("params", {}).get("inner_path"),
                message.get("req_id"))
            )

        if not self.sock:
            self.log("Send error: missing socket")
            return False

        if not self.connected and message.get("cmd") != "handshake":
            self.log("Wait for handshake before send request")
            self.event_connected.get()

        try:
            stat_key = message.get("cmd", "unknown")
            if stat_key == "response":
                stat_key = "response: %s" % self.last_cmd_recv
            else:
                self.server.num_sent += 1

            self.server.stat_sent[stat_key]["num"] += 1
            if streaming:
                with self.send_lock:
                    bytes_sent = Msgpack.stream(message, self.sock.sendall)
                self.bytes_sent += bytes_sent
                self.server.bytes_sent += bytes_sent
                self.server.stat_sent[stat_key]["bytes"] += bytes_sent
                message = None
            else:
                data = Msgpack.pack(message)
                self.bytes_sent += len(data)
                self.server.bytes_sent += len(data)
                self.server.stat_sent[stat_key]["bytes"] += len(data)
                message = None  # Only the packed data is needed from here
                with self.send_lock:
                    self.sock.sendall(data)
        except Exception as err:
            self.close("Send error: %s (cmd: %s)" % (err, stat_key))
            return False
        self.last_sent_time = time.time()
        return True

    # Stream file to connection without msgpacking
    def sendRawfile(self, file, read_bytes):
        """Copy up to ``read_bytes`` bytes from ``file`` to the socket, unpacked.

        The remaining byte count is reduced by the number of bytes actually
        read, so a ``file.read`` that returns fewer bytes than requested no
        longer cuts the transfer short. Sending stops early when the file runs
        out of data.

        Args:
            file: Object with a ``read(size)`` method returning bytes.
            read_bytes: Number of bytes to send.

        Returns:
            True (errors from the socket or the file propagate as exceptions).
        """
        buff = 64 * 1024
        bytes_left = read_bytes
        bytes_sent = 0
        while True:
            self.last_send_time = time.time()
            data = file.read(min(bytes_left, buff))
            if not data:
                break  # Nothing (more) to read
            with self.send_lock:
                self.sock.sendall(data)
            bytes_sent += len(data)
            bytes_left -= len(data)
            if bytes_left <= 0:
                break
        self.bytes_sent += bytes_sent
        self.server.bytes_sent += bytes_sent
        self.server.stat_sent["raw_file"]["num"] += 1
        self.server.stat_sent["raw_file"]["bytes"] += bytes_sent
        return True

    # Create and send a request to peer
    def request(self, cmd, params=None, stream_to=None):
        """Send a request to the peer and block until its response arrives.

        Args:
            cmd: Command name.
            params: Command parameters; an empty dict is sent when omitted.
            stream_to: Optional file-like object that receives a streamed
                response body (registered in ``waiting_streams``).

        Returns:
            The response message, or False when the connection timed out or
            was closed before a response arrived.
        """
        if params is None:
            params = {}  # Fresh dict per call instead of a shared mutable default

        # Last command sent more than 10 sec ago, timeout
        if self.waiting_requests and self.protocol == "v2" and time.time() - max(self.last_req_time, self.last_recv_time) > 10:
            self.close("Request %s timeout: %.3fs" % (self.last_cmd_sent, time.time() - self.last_send_time))
            return False

        self.last_req_time = time.time()
        self.last_cmd_sent = cmd
        self.req_id += 1
        data = {"cmd": cmd, "req_id": self.req_id, "params": params}
        event = gevent.event.AsyncResult()  # Create new event for response
        self.waiting_requests[self.req_id] = {"evt": event, "cmd": cmd}
        if stream_to:
            self.waiting_streams[self.req_id] = stream_to
        self.send(data)  # Send request
        res = event.get()  # Wait until event solves
        return res

    def ping(self):
        """Send a ping request; return True when ``b"Pong!"`` comes back within 10 seconds."""
        s = time.time()
        response = None
        with gevent.Timeout(10.0, False):
            try:
                response = self.request("ping")
            except Exception as err:
                self.log("Ping error: %s" % Debug.formatException(err))
        if response and "body" in response and response["body"] == b"Pong!":
            self.last_ping_delay = time.time() - s
            return True
        else:
            return False

    # Close connection
    def close(self, reason="Unknown"):
        """Close the connection and fail every pending request.

        Args:
            reason: Text written to the debug log.

        Returns:
            False when the connection was already closed, otherwise None.
        """
        if self.closed:
            return False  # Already closed
        self.closed = True
        self.connected = False
        if self.event_connected:
            self.event_connected.set(False)

        self.log(
            "Closing connection: %s, waiting_requests: %s, sites: %s, buff: %s..." %
            (reason, len(self.waiting_requests), self.sites, self.incomplete_buff_recv)
        )
        for request in self.waiting_requests.values():  # Mark pending requests failed
            request["evt"].set(False)
        self.waiting_requests = {}
        self.waiting_streams = {}
        self.sites = 0
        self.server.removeConnection(self)  # Remove connection from server registry

        sock = self.sock
        if sock:
            # shutdown() and close() are guarded separately so that a failing
            # shutdown can not prevent the socket from being closed
            try:
                sock.shutdown(socket.SHUT_WR)
            except Exception as err:
                if config.debug_socket:
                    self.log("Close error: %s" % err)
            try:
                sock.close()
            except Exception as err:
                if config.debug_socket:
                    self.log("Close error: %s" % err)

        # Little cleanup
        self.sock = None
        self.unpacker = None
        self.event_connected = None