Forking what is left of ZeroNet and hopefully adding an AT Proto Frontend/Proxy
at main 147 lines 6.0 kB view raw
1import time 2 3import gevent 4 5from Plugin import PluginManager 6from Config import config 7from . import BroadcastServer 8 9 10@PluginManager.registerTo("SiteAnnouncer") 11class SiteAnnouncerPlugin(object): 12 def announce(self, force=False, *args, **kwargs): 13 local_announcer = self.site.connection_server.local_announcer 14 15 thread = None 16 if local_announcer and (force or time.time() - local_announcer.last_discover > 5 * 60): 17 thread = gevent.spawn(local_announcer.discover, force=force) 18 back = super(SiteAnnouncerPlugin, self).announce(force=force, *args, **kwargs) 19 20 if thread: 21 thread.join() 22 23 return back 24 25 26class LocalAnnouncer(BroadcastServer.BroadcastServer): 27 def __init__(self, server, listen_port): 28 super(LocalAnnouncer, self).__init__("zeronet", listen_port=listen_port) 29 self.server = server 30 31 self.sender_info["peer_id"] = self.server.peer_id 32 self.sender_info["port"] = self.server.port 33 self.sender_info["broadcast_port"] = listen_port 34 self.sender_info["rev"] = config.rev 35 36 self.known_peers = {} 37 self.last_discover = 0 38 39 def discover(self, force=False): 40 self.log.debug("Sending discover request (force: %s)" % force) 41 self.last_discover = time.time() 42 if force: # Probably new site added, clean cache 43 self.known_peers = {} 44 45 for peer_id, known_peer in list(self.known_peers.items()): 46 if time.time() - known_peer["found"] > 20 * 60: 47 del(self.known_peers[peer_id]) 48 self.log.debug("Timeout, removing from known_peers: %s" % peer_id) 49 self.broadcast({"cmd": "discoverRequest", "params": {}}, port=self.listen_port) 50 51 def actionDiscoverRequest(self, sender, params): 52 back = { 53 "cmd": "discoverResponse", 54 "params": { 55 "sites_changed": self.server.site_manager.sites_changed 56 } 57 } 58 59 if sender["peer_id"] not in self.known_peers: 60 self.known_peers[sender["peer_id"]] = {"added": time.time(), "sites_changed": 0, "updated": 0, "found": time.time()} 61 self.log.debug("Got discover request from unknown peer %s (%s), time to refresh known peers" % (sender["ip"], sender["peer_id"])) 62 gevent.spawn_later(1.0, self.discover) # Let the response arrive first to the requester 63 64 return back 65 66 def actionDiscoverResponse(self, sender, params): 67 if sender["peer_id"] in self.known_peers: 68 self.known_peers[sender["peer_id"]]["found"] = time.time() 69 if params["sites_changed"] != self.known_peers.get(sender["peer_id"], {}).get("sites_changed"): 70 # Peer's site list changed, request the list of new sites 71 return {"cmd": "siteListRequest"} 72 else: 73 # Peer's site list is the same 74 for site in self.server.sites.values(): 75 peer = site.peers.get("%s:%s" % (sender["ip"], sender["port"])) 76 if peer: 77 peer.found("local") 78 79 def actionSiteListRequest(self, sender, params): 80 back = [] 81 sites = list(self.server.sites.values()) 82 83 # Split adresses to group of 100 to avoid UDP size limit 84 site_groups = [sites[i:i + 100] for i in range(0, len(sites), 100)] 85 for site_group in site_groups: 86 res = {} 87 res["sites_changed"] = self.server.site_manager.sites_changed 88 res["sites"] = [site.address_hash for site in site_group] 89 back.append({"cmd": "siteListResponse", "params": res}) 90 return back 91 92 def actionSiteListResponse(self, sender, params): 93 s = time.time() 94 peer_sites = set(params["sites"]) 95 num_found = 0 96 added_sites = [] 97 for site in self.server.sites.values(): 98 if site.address_hash in peer_sites: 99 added = site.addPeer(sender["ip"], sender["port"], source="local") 100 num_found += 1 101 if added: 102 site.worker_manager.onPeers() 103 site.updateWebsocket(peers_added=1) 104 added_sites.append(site) 105 106 # Save sites changed value to avoid unnecessary site list download 107 if sender["peer_id"] not in self.known_peers: 108 self.known_peers[sender["peer_id"]] = {"added": time.time()} 109 110 self.known_peers[sender["peer_id"]]["sites_changed"] = params["sites_changed"] 111 self.known_peers[sender["peer_id"]]["updated"] = time.time() 112 self.known_peers[sender["peer_id"]]["found"] = time.time() 113 114 self.log.debug( 115 "Tracker result: Discover from %s response parsed in %.3fs, found: %s added: %s of %s" % 116 (sender["ip"], time.time() - s, num_found, added_sites, len(peer_sites)) 117 ) 118 119 120@PluginManager.registerTo("FileServer") 121class FileServerPlugin(object): 122 def __init__(self, *args, **kwargs): 123 super(FileServerPlugin, self).__init__(*args, **kwargs) 124 if config.broadcast_port and config.tor != "always" and not config.disable_udp: 125 self.local_announcer = LocalAnnouncer(self, config.broadcast_port) 126 else: 127 self.local_announcer = None 128 129 def start(self, *args, **kwargs): 130 if self.local_announcer: 131 gevent.spawn(self.local_announcer.start) 132 return super(FileServerPlugin, self).start(*args, **kwargs) 133 134 def stop(self): 135 if self.local_announcer: 136 self.local_announcer.stop() 137 res = super(FileServerPlugin, self).stop() 138 return res 139 140 141@PluginManager.registerTo("ConfigPlugin") 142class ConfigPlugin(object): 143 def createArguments(self): 144 group = self.parser.add_argument_group("AnnounceLocal plugin") 145 group.add_argument('--broadcast_port', help='UDP broadcasting port for local peer discovery', default=1544, type=int, metavar='port') 146 147 return super(ConfigPlugin, self).createArguments()