# Forking what is left of ZeroNet and hopefully adding an AT Proto frontend/proxy
1import logging
2import time
3import sys
4import itertools
5import collections
6
7import gevent
8
9import io
10from Debug import Debug
11from Config import config
12from util import helper
13from .PeerHashfield import PeerHashfield
14from Plugin import PluginManager
15
16if config.use_tempfiles:
17 import tempfile
18
19
# Communicate with remote peers
21@PluginManager.acceptPlugins
22class Peer(object):
23 __slots__ = (
24 "ip", "port", "site", "key", "connection", "connection_server", "time_found", "time_response", "time_hashfield",
25 "time_added", "has_hashfield", "is_tracker_connection", "time_my_hashfield_sent", "last_ping", "reputation",
26 "last_content_json_update", "hashfield", "connection_error", "hash_failed", "download_bytes", "download_time"
27 )
28
29 def __init__(self, ip, port, site=None, connection_server=None):
30 self.ip = ip
31 self.port = port
32 self.site = site
33 self.key = "%s:%s" % (ip, port)
34
35 self.connection = None
36 self.connection_server = connection_server
37 self.has_hashfield = False # Lazy hashfield object not created yet
38 self.time_hashfield = None # Last time peer's hashfiled downloaded
39 self.time_my_hashfield_sent = None # Last time my hashfield sent to peer
40 self.time_found = time.time() # Time of last found in the torrent tracker
41 self.time_response = None # Time of last successful response from peer
42 self.time_added = time.time()
43 self.last_ping = None # Last response time for ping
44 self.is_tracker_connection = False # Tracker connection instead of normal peer
45 self.reputation = 0 # More likely to connect if larger
46 self.last_content_json_update = 0.0 # Modify date of last received content.json
47
48 self.connection_error = 0 # Series of connection error
49 self.hash_failed = 0 # Number of bad files from peer
50 self.download_bytes = 0 # Bytes downloaded
51 self.download_time = 0 # Time spent to download
52
53 def __getattr__(self, key):
54 if key == "hashfield":
55 self.has_hashfield = True
56 self.hashfield = PeerHashfield()
57 return self.hashfield
58 else:
59 return getattr(self, key)
60
61 def log(self, text):
62 if not config.verbose:
63 return # Only log if we are in debug mode
64 if self.site:
65 self.site.log.debug("%s:%s %s" % (self.ip, self.port, text))
66 else:
67 logging.debug("%s:%s %s" % (self.ip, self.port, text))
68
69 # Connect to host
70 def connect(self, connection=None):
71 if self.reputation < -10:
72 self.reputation = -10
73 if self.reputation > 10:
74 self.reputation = 10
75
76 if self.connection:
77 self.log("Getting connection (Closing %s)..." % self.connection)
78 self.connection.close("Connection change")
79 else:
80 self.log("Getting connection (reputation: %s)..." % self.reputation)
81
82 if connection: # Connection specified
83 self.log("Assigning connection %s" % connection)
84 self.connection = connection
85 self.connection.sites += 1
86 else: # Try to find from connection pool or create new connection
87 self.connection = None
88
89 try:
90 if self.connection_server:
91 connection_server = self.connection_server
92 elif self.site:
93 connection_server = self.site.connection_server
94 else:
95 import main
96 connection_server = main.file_server
97 self.connection = connection_server.getConnection(self.ip, self.port, site=self.site, is_tracker_connection=self.is_tracker_connection)
98 self.reputation += 1
99 self.connection.sites += 1
100 except Exception as err:
101 self.onConnectionError("Getting connection error")
102 self.log("Getting connection error: %s (connection_error: %s, hash_failed: %s)" %
103 (Debug.formatException(err), self.connection_error, self.hash_failed))
104 self.connection = None
105 return self.connection
106
107 # Check if we have connection to peer
108 def findConnection(self):
109 if self.connection and self.connection.connected: # We have connection to peer
110 return self.connection
111 else: # Try to find from other sites connections
112 self.connection = self.site.connection_server.getConnection(self.ip, self.port, create=False, site=self.site)
113 if self.connection:
114 self.connection.sites += 1
115 return self.connection
116
117 def __str__(self):
118 if self.site:
119 return "Peer:%-12s of %s" % (self.ip, self.site.address_short)
120 else:
121 return "Peer:%-12s" % self.ip
122
123 def __repr__(self):
124 return "<%s>" % self.__str__()
125
126 def packMyAddress(self):
127 if self.ip.endswith(".onion"):
128 return helper.packOnionAddress(self.ip, self.port)
129 else:
130 return helper.packAddress(self.ip, self.port)
131
132 # Found a peer from a source
133 def found(self, source="other"):
134 if self.reputation < 5:
135 if source == "tracker":
136 if self.ip.endswith(".onion"):
137 self.reputation += 1
138 else:
139 self.reputation += 2
140 elif source == "local":
141 self.reputation += 20
142
143 if source in ("tracker", "local"):
144 self.site.peers_recent.appendleft(self)
145 self.time_found = time.time()
146
147 # Send a command to peer and return response value
148 def request(self, cmd, params={}, stream_to=None):
149 if not self.connection or self.connection.closed:
150 self.connect()
151 if not self.connection:
152 self.onConnectionError("Reconnect error")
153 return None # Connection failed
154
155 self.log("Send request: %s %s %s %s" % (params.get("site", ""), cmd, params.get("inner_path", ""), params.get("location", "")))
156
157 for retry in range(1, 4): # Retry 3 times
158 try:
159 if not self.connection:
160 raise Exception("No connection found")
161 res = self.connection.request(cmd, params, stream_to)
162 if not res:
163 raise Exception("Send error")
164 if "error" in res:
165 self.log("%s error: %s" % (cmd, res["error"]))
166 self.onConnectionError("Response error")
167 break
168 else: # Successful request, reset connection error num
169 self.connection_error = 0
170 self.time_response = time.time()
171 if res:
172 return res
173 else:
174 raise Exception("Invalid response: %s" % res)
175 except Exception as err:
176 if type(err).__name__ == "Notify": # Greenlet killed by worker
177 self.log("Peer worker got killed: %s, aborting cmd: %s" % (err.message, cmd))
178 break
179 else:
180 self.onConnectionError("Request error")
181 self.log(
182 "%s (connection_error: %s, hash_failed: %s, retry: %s)" %
183 (Debug.formatException(err), self.connection_error, self.hash_failed, retry)
184 )
185 time.sleep(1 * retry)
186 self.connect()
187 return None # Failed after 4 retry
188
    # Get a file content from peer
    def getFile(self, site, inner_path, file_size=None, pos_from=0, pos_to=None, streaming=False):
        """Download (part of) a file from this peer into a buffer.

        Args:
            site: Site identifier passed straight through to the request
                  (presumably the site address string — confirm against callers).
            inner_path: Path of the file within the site.
            file_size: Total file size if known; files over 5MB use bigger chunks.
            pos_from: Byte offset to start reading from.
            pos_to: Byte offset to stop at; falsy means read to end of file.
            streaming: Force the "streamFile" command even for small reads.

        Returns:
            A buffer (SpooledTemporaryFile or BytesIO) rewound to position 0,
            or False on any request error.
        """
        # Bigger per-request chunks for large files to cut round-trips
        if file_size and file_size > 5 * 1024 * 1024:
            max_read_size = 1024 * 1024
        else:
            max_read_size = 512 * 1024

        if pos_to:
            read_bytes = min(max_read_size, pos_to - pos_from)
        else:
            read_bytes = max_read_size

        location = pos_from

        # Spooled tempfile keeps small downloads in memory and spills big ones to disk
        if config.use_tempfiles:
            buff = tempfile.SpooledTemporaryFile(max_size=16 * 1024, mode='w+b')
        else:
            buff = io.BytesIO()

        s = time.time()
        while True:  # Read in smaller parts
            if config.stream_downloads or read_bytes > 256 * 1024 or streaming:
                # streamFile writes the body straight into buff via stream_to
                res = self.request("streamFile", {"site": site, "inner_path": inner_path, "location": location, "read_bytes": read_bytes, "file_size": file_size}, stream_to=buff)
                if not res or "location" not in res:  # Error
                    return False
            else:
                self.log("Send: %s" % inner_path)
                res = self.request("getFile", {"site": site, "inner_path": inner_path, "location": location, "read_bytes": read_bytes, "file_size": file_size})
                if not res or "location" not in res:  # Error
                    return False
                self.log("Recv: %s" % inner_path)
                buff.write(res["body"])
                res["body"] = None  # Save memory

            if res["location"] == res["size"] or res["location"] == pos_to:  # End of file
                break
            else:
                location = res["location"]
                if pos_to:
                    # Shrink the final chunk so we don't read past pos_to
                    read_bytes = min(max_read_size, pos_to - location)

        # Account the received byte count (for a ranged read, the requested span)
        if pos_to:
            recv = pos_to - pos_from
        else:
            recv = res["location"]

        self.download_bytes += recv
        self.download_time += (time.time() - s)
        if self.site:
            self.site.settings["bytes_recv"] = self.site.settings.get("bytes_recv", 0) + recv
        self.log("Downloaded: %s, pos: %s, read_bytes: %s" % (inner_path, buff.tell(), read_bytes))
        buff.seek(0)
        return buff
242
243 # Send a ping request
244 def ping(self):
245 response_time = None
246 for retry in range(1, 3): # Retry 3 times
247 s = time.time()
248 with gevent.Timeout(10.0, False): # 10 sec timeout, don't raise exception
249 res = self.request("ping")
250
251 if res and "body" in res and res["body"] == b"Pong!":
252 response_time = time.time() - s
253 break # All fine, exit from for loop
254 # Timeout reached or bad response
255 self.onConnectionError("Ping timeout")
256 self.connect()
257 time.sleep(1)
258
259 if response_time:
260 self.log("Ping: %.3f" % response_time)
261 else:
262 self.log("Ping failed")
263 self.last_ping = response_time
264 return response_time
265
266 # Request peer exchange from peer
267 def pex(self, site=None, need_num=5):
268 if not site:
269 site = self.site # If no site defined request peers for this site
270
271 # give back 5 connectible peers
272 packed_peers = helper.packPeers(self.site.getConnectablePeers(5, allow_private=False))
273 request = {"site": site.address, "peers": packed_peers["ipv4"], "need": need_num}
274 if packed_peers["onion"]:
275 request["peers_onion"] = packed_peers["onion"]
276 if packed_peers["ipv6"]:
277 request["peers_ipv6"] = packed_peers["ipv6"]
278 res = self.request("pex", request)
279 if not res or "error" in res:
280 return False
281 added = 0
282
283 # Remove unsupported peer types
284 if "peers_ipv6" in res and self.connection and "ipv6" not in self.connection.server.supported_ip_types:
285 del res["peers_ipv6"]
286
287 if "peers_onion" in res and self.connection and "onion" not in self.connection.server.supported_ip_types:
288 del res["peers_onion"]
289
290 # Add IPv4 + IPv6
291 for peer in itertools.chain(res.get("peers", []), res.get("peers_ipv6", [])):
292 address = helper.unpackAddress(peer)
293 if site.addPeer(*address, source="pex"):
294 added += 1
295
296 # Add Onion
297 for peer in res.get("peers_onion", []):
298 address = helper.unpackOnionAddress(peer)
299 if site.addPeer(*address, source="pex"):
300 added += 1
301
302 if added:
303 self.log("Added peers using pex: %s" % added)
304
305 return added
306
307 # List modified files since the date
308 # Return: {inner_path: modification date,...}
309 def listModified(self, since):
310 return self.request("listModified", {"since": since, "site": self.site.address})
311
312 def updateHashfield(self, force=False):
313 # Don't update hashfield again in 5 min
314 if self.time_hashfield and time.time() - self.time_hashfield < 5 * 60 and not force:
315 return False
316
317 self.time_hashfield = time.time()
318 res = self.request("getHashfield", {"site": self.site.address})
319 if not res or "error" in res or "hashfield_raw" not in res:
320 return False
321 self.hashfield.replaceFromBytes(res["hashfield_raw"])
322
323 return self.hashfield
324
325 # Find peers for hashids
326 # Return: {hash1: ["ip:port", "ip:port",...],...}
327 def findHashIds(self, hash_ids):
328 res = self.request("findHashIds", {"site": self.site.address, "hash_ids": hash_ids})
329 if not res or "error" in res or type(res) is not dict:
330 return False
331
332 back = collections.defaultdict(list)
333
334 for ip_type in ["ipv4", "ipv6", "onion"]:
335 if ip_type == "ipv4":
336 key = "peers"
337 else:
338 key = "peers_%s" % ip_type
339 for hash, peers in list(res.get(key, {}).items())[0:30]:
340 if ip_type == "onion":
341 unpacker_func = helper.unpackOnionAddress
342 else:
343 unpacker_func = helper.unpackAddress
344
345 back[hash] += list(map(unpacker_func, peers))
346
347 for hash in res.get("my", []):
348 if self.connection:
349 back[hash].append((self.connection.ip, self.connection.port))
350 else:
351 back[hash].append((self.ip, self.port))
352
353 return back
354
355 # Send my hashfield to peer
356 # Return: True if sent
357 def sendMyHashfield(self):
358 if self.connection and self.connection.handshake.get("rev", 0) < 510:
359 return False # Not supported
360 if self.time_my_hashfield_sent and self.site.content_manager.hashfield.time_changed <= self.time_my_hashfield_sent:
361 return False # Peer already has the latest hashfield
362
363 res = self.request("setHashfield", {"site": self.site.address, "hashfield_raw": self.site.content_manager.hashfield.tobytes()})
364 if not res or "error" in res:
365 return False
366 else:
367 self.time_my_hashfield_sent = time.time()
368 return True
369
370 def publish(self, address, inner_path, body, modified, diffs=[]):
371 if len(body) > 10 * 1024 and self.connection and self.connection.handshake.get("rev", 0) >= 4095:
372 # To save bw we don't push big content.json to peers
373 body = b""
374
375 return self.request("update", {
376 "site": address,
377 "inner_path": inner_path,
378 "body": body,
379 "modified": modified,
380 "diffs": diffs
381 })
382
383 # Stop and remove from site
384 def remove(self, reason="Removing"):
385 self.log("Removing peer...Connection error: %s, Hash failed: %s" % (self.connection_error, self.hash_failed))
386 if self.site and self.key in self.site.peers:
387 del(self.site.peers[self.key])
388
389 if self.site and self in self.site.peers_recent:
390 self.site.peers_recent.remove(self)
391
392 if self.connection:
393 self.connection.close(reason)
394
395 # - EVENTS -
396
397 # On connection error
398 def onConnectionError(self, reason="Unknown"):
399 self.connection_error += 1
400 if self.site and len(self.site.peers) > 200:
401 limit = 3
402 else:
403 limit = 6
404 self.reputation -= 1
405 if self.connection_error >= limit: # Dead peer
406 self.remove("Peer connection: %s" % reason)
407
408 # Done working with peer
409 def onWorkerDone(self):
410 pass