A fork of what remains of ZeroNet, with the goal of adding an AT Protocol frontend/proxy.
at main 181 lines 7.5 kB view raw
import time
import sys
import collections
import itertools
import logging

import gevent
from util import helper
from Config import config


class ChartCollector(object):
    """Periodically samples global and per-site statistics and stores them
    as time-series rows in the chart database (``self.db``).
    """

    def __init__(self, db):
        self.db = db
        # Only the main process runs the collector loop; delay the first
        # sample by 3 minutes so startup noise is not recorded.
        if config.action == "main":
            gevent.spawn_later(60 * 3, self.collector)
        self.log = logging.getLogger("ChartCollector")
        # Per-scope ("global" / "site:<address>") cache of previously sampled
        # values, used to turn "|change" collectors into deltas.
        self.last_values = collections.defaultdict(dict)

    def setInitialLastValues(self, sites):
        """Seed last_values from persisted site settings so the first
        "|change" delta after a restart is not the full lifetime total."""
        # Recover last value of site bytes/sent
        for site in sites:
            self.last_values["site:" + site.address]["site_bytes_recv"] = site.settings.get("bytes_recv", 0)
            self.last_values["site:" + site.address]["site_bytes_sent"] = site.settings.get("bytes_sent", 0)

    def getCollectors(self):
        """Return a dict of {stat_key: callable} for global statistics.

        Keys containing "|change" are later converted to deltas against the
        previous sample. Collectors named "peer*" receive the unique peer set
        as an argument; all others take no arguments.
        """
        collectors = {}
        import main
        file_server = main.file_server
        sites = file_server.sites
        if not sites:
            return collectors
        content_db = list(sites.values())[0].content_manager.contents.db

        # Connection stats
        collectors["connection"] = lambda: len(file_server.connections)
        collectors["connection_in"] = (
            lambda: len([1 for connection in file_server.connections if connection.type == "in"])
        )
        collectors["connection_onion"] = (
            lambda: len([1 for connection in file_server.connections if connection.ip.endswith(".onion")])
        )
        collectors["connection_ping_avg"] = (
            lambda: round(1000 * helper.avg(
                [connection.last_ping_delay for connection in file_server.connections if connection.last_ping_delay]
            ))
        )
        # min() raises ValueError on an empty sequence; collectDatas treats
        # that as "no data" and records None.
        collectors["connection_ping_min"] = (
            lambda: round(1000 * min(
                [connection.last_ping_delay for connection in file_server.connections if connection.last_ping_delay]
            ))
        )
        collectors["connection_rev_avg"] = (
            lambda: helper.avg(
                [connection.handshake["rev"] for connection in file_server.connections if connection.handshake]
            )
        )

        # Request stats
        collectors["file_bytes_recv|change"] = lambda: file_server.bytes_recv
        collectors["file_bytes_sent|change"] = lambda: file_server.bytes_sent
        collectors["request_num_recv|change"] = lambda: file_server.num_recv
        collectors["request_num_sent|change"] = lambda: file_server.num_sent

        # Limit
        collectors["optional_limit"] = lambda: content_db.getOptionalLimitBytes()
        collectors["optional_used"] = lambda: content_db.getOptionalUsedBytes()
        collectors["optional_downloaded"] = lambda: sum([site.settings.get("optional_downloaded", 0) for site in sites.values()])

        # Peers (these receive the unique peer set, see collectDatas)
        collectors["peer"] = lambda peers: len(peers)
        collectors["peer_onion"] = lambda peers: len([True for peer in peers if ".onion" in peer])

        # Size
        collectors["size"] = lambda: sum([site.settings.get("size", 0) for site in sites.values()])
        collectors["size_optional"] = lambda: sum([site.settings.get("size_optional", 0) for site in sites.values()])
        collectors["content"] = lambda: sum([len(site.content_manager.contents) for site in sites.values()])

        return collectors

    def getSiteCollectors(self):
        """Return a dict of {stat_key: callable(site)} for per-site statistics."""
        site_collectors = {}

        # Size
        site_collectors["site_size"] = lambda site: site.settings.get("size", 0)
        site_collectors["site_size_optional"] = lambda site: site.settings.get("size_optional", 0)
        site_collectors["site_optional_downloaded"] = lambda site: site.settings.get("optional_downloaded", 0)
        site_collectors["site_content"] = lambda site: len(site.content_manager.contents)

        # Data transfer
        site_collectors["site_bytes_recv|change"] = lambda site: site.settings.get("bytes_recv", 0)
        site_collectors["site_bytes_sent|change"] = lambda site: site.settings.get("bytes_sent", 0)

        # Peers
        site_collectors["site_peer"] = lambda site: len(site.peers)
        site_collectors["site_peer_onion"] = lambda site: len(
            [True for peer in site.peers.values() if peer.ip.endswith(".onion")]
        )
        site_collectors["site_peer_connected"] = lambda site: len([True for peer in site.peers.values() if peer.connection])

        return site_collectors

    def getUniquePeers(self):
        """Return the set of peer keys seen across all sites (deduplicated)."""
        import main
        sites = main.file_server.sites
        return set(itertools.chain.from_iterable(
            [site.peers.keys() for site in sites.values()]
        ))

    def collectDatas(self, collectors, last_values, site=None):
        """Run every collector and return {key: rounded value or None}.

        last_values is the mutable per-scope cache used to compute deltas for
        "|change" keys; it is updated in place.
        """
        if site is None:
            peers = self.getUniquePeers()
        datas = {}
        for key, collector in collectors.items():
            try:
                if site:
                    value = collector(site)
                elif key.startswith("peer"):
                    value = collector(peers)
                else:
                    value = collector()
            except ValueError:  # e.g. min() of an empty sequence: no data yet
                value = None
            except Exception as err:
                self.log.info("Collector %s error: %s" % (key, err))
                value = None

            if "|change" in key:  # Store changes relative to last value
                key = key.replace("|change", "")
                last_value = last_values.get(key, 0)
                last_values[key] = value
                # Fix: a failed collector yields None; subtracting would raise
                # TypeError. Record None for this sample instead, and treat a
                # previously cached None as 0.
                if value is not None:
                    value = value - (last_value or 0)

            if value is None:
                datas[key] = None
            else:
                datas[key] = round(value, 3)
        return datas

    def collectGlobal(self, collectors, last_values):
        """Sample all global collectors and insert one row per stat."""
        now = int(time.time())
        s = time.time()
        datas = self.collectDatas(collectors, last_values["global"])
        values = []
        for key, value in datas.items():
            values.append((self.db.getTypeId(key), value, now))
        self.log.debug("Global collectors done in %.3fs" % (time.time() - s))

        s = time.time()
        cur = self.db.getCursor()
        cur.executemany("INSERT INTO data (type_id, value, date_added) VALUES (?, ?, ?)", values)
        self.log.debug("Global collectors inserted in %.3fs" % (time.time() - s))

    def collectSites(self, sites, collectors, last_values):
        """Sample per-site collectors for every site and insert the rows."""
        now = int(time.time())
        s = time.time()
        values = []
        for address, site in list(sites.items()):
            site_datas = self.collectDatas(collectors, last_values["site:%s" % address], site)
            for key, value in site_datas.items():
                values.append((self.db.getTypeId(key), self.db.getSiteId(address), value, now))
            time.sleep(0.001)  # Yield to other greenlets between sites
        self.log.debug("Site collections done in %.3fs" % (time.time() - s))

        s = time.time()
        cur = self.db.getCursor()
        cur.executemany("INSERT INTO data (type_id, site_id, value, date_added) VALUES (?, ?, ?, ?)", values)
        self.log.debug("Site collectors inserted in %.3fs" % (time.time() - s))

    def collector(self):
        """Main loop: global stats every 5 minutes, site stats every hour."""
        collectors = self.getCollectors()
        site_collectors = self.getSiteCollectors()
        import main
        sites = main.file_server.sites
        i = 0
        while 1:
            self.collectGlobal(collectors, self.last_values)
            if i % 12 == 0:  # Only collect sites data every hour (12 * 5 min)
                self.collectSites(sites, site_collectors, self.last_values)
            time.sleep(60 * 5)
            i += 1