# Forking what is left of ZeroNet and hopefully adding an AT Proto frontend/proxy.
1import time
2
3import gevent
4
5from Plugin import PluginManager
6from Config import config
7from . import BroadcastServer
8
9
@PluginManager.registerTo("SiteAnnouncer")
class SiteAnnouncerPlugin(object):
    """SiteAnnouncer plugin that runs a local discover alongside announce()."""

    def announce(self, force=False, *args, **kwargs):
        """Call the next announce() and, when due, a local discover in parallel.

        A discover is spawned with gevent when the connection server has a
        local announcer and either ``force`` is true or the announcer's
        ``last_discover`` is more than 5 minutes old.

        Args:
            force: Start a discover regardless of when the last one ran;
                also forwarded to the next announce().
            *args: Extra positional arguments forwarded to the next announce().
            **kwargs: Extra keyword arguments forwarded to the next announce().

        Returns:
            Whatever the next announce() in the class hierarchy returns.
        """
        local_announcer = self.site.connection_server.local_announcer

        discover_thread = None
        if local_announcer and (force or time.time() - local_announcer.last_discover > 5 * 60):
            discover_thread = gevent.spawn(local_announcer.discover, force=force)

        # Pass force positionally: with `announce(force=force, *args)` the first
        # extra positional argument would be bound to the first parameter as
        # well and the call would fail with "multiple values for argument".
        # Assumes the next announce() takes force as its first parameter, like
        # this method does.
        back = super(SiteAnnouncerPlugin, self).announce(force, *args, **kwargs)

        # Wait for the discover to finish before handing the result back
        if discover_thread:
            discover_thread.join()

        return back
24
25
class LocalAnnouncer(BroadcastServer.BroadcastServer):
    """Local peer discovery built on top of BroadcastServer.

    Broadcasts "discoverRequest" messages, answers discover and site list
    messages, and keeps per-peer bookkeeping in ``known_peers``
    (peer_id -> dict with "added", "sites_changed", "updated" and "found").

    NOTE(review): the action* methods look like handlers that the base class
    dispatches by message "cmd", with the returned value sent back to the
    sender - confirm against BroadcastServer.
    """

    def __init__(self, server, listen_port):
        """Initialise the broadcast server and fill in this node's sender_info.

        Args:
            server: Object providing peer_id, port, sites and site_manager.
            listen_port: Port handed to the base class; also advertised to
                other nodes as "broadcast_port".
        """
        super(LocalAnnouncer, self).__init__("zeronet", listen_port=listen_port)
        self.server = server

        # Details about this node that go into sender_info
        info = self.sender_info
        info["peer_id"] = server.peer_id
        info["port"] = server.port
        info["broadcast_port"] = listen_port
        info["rev"] = config.rev

        self.known_peers = {}
        self.last_discover = 0

    def discover(self, force=False):
        """Drop stale known peers and broadcast a discover request.

        Args:
            force: When true, forget every known peer before broadcasting.
        """
        self.log.debug("Sending discover request (force: %s)" % force)
        self.last_discover = time.time()

        if force:
            # Probably new site added, clean cache
            self.known_peers = {}

        # Iterate over a snapshot so entries can be deleted while looping
        for peer_id, peer_info in list(self.known_peers.items()):
            expired = time.time() - peer_info["found"] > 20 * 60
            if expired:
                del self.known_peers[peer_id]
                self.log.debug("Timeout, removing from known_peers: %s" % peer_id)

        self.broadcast({"cmd": "discoverRequest", "params": {}}, port=self.listen_port)

    def actionDiscoverRequest(self, sender, params):
        """Build a discoverResponse; schedule a discover if the sender is new.

        Args:
            sender: Dict describing the requester; "peer_id" and "ip" are read.
            params: Request parameters (unused).

        Returns:
            A "discoverResponse" message carrying the current sites_changed value.
        """
        response = {
            "cmd": "discoverResponse",
            "params": {
                "sites_changed": self.server.site_manager.sites_changed
            }
        }

        peer_id = sender["peer_id"]
        if peer_id in self.known_peers:
            return response

        self.known_peers[peer_id] = {"added": time.time(), "sites_changed": 0, "updated": 0, "found": time.time()}
        self.log.debug("Got discover request from unknown peer %s (%s), time to refresh known peers" % (sender["ip"], peer_id))
        # Let the response arrive first to the requester
        gevent.spawn_later(1.0, self.discover)

        return response

    def actionDiscoverResponse(self, sender, params):
        """Handle a discover response from another node.

        Args:
            sender: Dict describing the responder; "peer_id", "ip" and "port"
                are read.
            params: Response parameters; "sites_changed" is read.

        Returns:
            A "siteListRequest" message when the sender's sites_changed value
            differs from the stored one, otherwise None.
        """
        peer_id = sender["peer_id"]
        if peer_id in self.known_peers:
            self.known_peers[peer_id]["found"] = time.time()

        if params["sites_changed"] != self.known_peers.get(peer_id, {}).get("sites_changed"):
            # Peer's site list changed, request the list of new sites
            return {"cmd": "siteListRequest"}

        # Peer's site list is the same: mark the peer as found on every site
        # that already has it
        for site in self.server.sites.values():
            peer = site.peers.get("%s:%s" % (sender["ip"], sender["port"]))
            if peer:
                peer.found("local")

    def actionSiteListRequest(self, sender, params):
        """Return the address hashes of all served sites, in chunks.

        Args:
            sender: Dict describing the requester (unused).
            params: Request parameters (unused).

        Returns:
            A list of "siteListResponse" messages, each with up to 100 hashes.
        """
        sites = list(self.server.sites.values())
        responses = []

        # Split sites into groups of 100 to avoid the UDP size limit
        for start in range(0, len(sites), 100):
            chunk = sites[start:start + 100]
            chunk_params = {}
            chunk_params["sites_changed"] = self.server.site_manager.sites_changed
            chunk_params["sites"] = [site.address_hash for site in chunk]
            responses.append({"cmd": "siteListResponse", "params": chunk_params})

        return responses

    def actionSiteListResponse(self, sender, params):
        """Add the sender as a peer to every shared site and record its state.

        Args:
            sender: Dict describing the responder; "ip", "port" and "peer_id"
                are read.
            params: Response parameters; "sites" and "sites_changed" are read.
        """
        started = time.time()
        peer_sites = set(params["sites"])
        num_found = 0
        added_sites = []

        for site in self.server.sites.values():
            if site.address_hash not in peer_sites:
                continue

            was_added = site.addPeer(sender["ip"], sender["port"], source="local")
            num_found += 1
            if not was_added:
                continue

            site.worker_manager.onPeers()
            site.updateWebsocket(peers_added=1)
            added_sites.append(site)

        # Save sites changed value to avoid unnecessary site list download
        peer_id = sender["peer_id"]
        if peer_id not in self.known_peers:
            self.known_peers[peer_id] = {"added": time.time()}

        peer_info = self.known_peers[peer_id]
        peer_info["sites_changed"] = params["sites_changed"]
        peer_info["updated"] = time.time()
        peer_info["found"] = time.time()

        self.log.debug(
            "Tracker result: Discover from %s response parsed in %.3fs, found: %s added: %s of %s" %
            (sender["ip"], time.time() - started, num_found, added_sites, len(peer_sites))
        )
118
119
@PluginManager.registerTo("FileServer")
class FileServerPlugin(object):
    """FileServer plugin that creates the LocalAnnouncer and starts/stops it with the server."""

    def __init__(self, *args, **kwargs):
        """Initialise the server, then create the local announcer if allowed.

        The announcer is only created when a broadcast port is configured,
        ``config.tor`` is not "always" and UDP is not disabled; otherwise
        ``local_announcer`` is None.
        """
        super(FileServerPlugin, self).__init__(*args, **kwargs)
        if config.broadcast_port and config.tor != "always" and not config.disable_udp:
            self.local_announcer = LocalAnnouncer(self, config.broadcast_port)
        else:
            self.local_announcer = None

    def start(self, *args, **kwargs):
        """Spawn the local announcer (if any), then start the server itself.

        Returns:
            Whatever the next start() in the class hierarchy returns.
        """
        if self.local_announcer:
            # Run the announcer in its own greenlet so start() is not held up by it
            gevent.spawn(self.local_announcer.start)
        return super(FileServerPlugin, self).start(*args, **kwargs)

    def stop(self, *args, **kwargs):
        """Stop the local announcer (if any), then stop the server itself.

        Arguments are accepted and forwarded unchanged, matching __init__ and
        start, so this plugin does not restrict the signature of the stop()
        it wraps.

        Returns:
            Whatever the next stop() in the class hierarchy returns.
        """
        if self.local_announcer:
            self.local_announcer.stop()
        return super(FileServerPlugin, self).stop(*args, **kwargs)
139
140
@PluginManager.registerTo("ConfigPlugin")
class ConfigPlugin(object):
    """Config plugin that registers the AnnounceLocal command line option."""

    def createArguments(self):
        """Add the --broadcast_port option, then defer to the next createArguments().

        Returns:
            Whatever the next createArguments() in the class hierarchy returns.
        """
        plugin_group = self.parser.add_argument_group("AnnounceLocal plugin")
        plugin_group.add_argument(
            "--broadcast_port",
            help="UDP broadcasting port for local peer discovery",
            default=1544,
            type=int,
            metavar="port",
        )

        return super(ConfigPlugin, self).createArguments()