# Fork of what is left of ZeroNet, with the hope of adding an AT Proto frontend/proxy.
1import time
2import sys
3import collections
4import itertools
5import logging
6
7import gevent
8from util import helper
9from Config import config
10
11
class ChartCollector(object):
    """Periodically sample global and per-site statistics into the chart database.

    Collectors are callables stored in dicts keyed by the statistic name. A key
    ending in ``|change`` marks a counter: instead of the sampled value itself,
    the difference from the previously sampled value is stored (see
    ``collectDatas``).
    """

    def __init__(self, db):
        """Store the database handle and, in the "main" action, schedule collection.

        Args:
            db: Database object; this class calls ``getTypeId``, ``getSiteId``
                and ``getCursor`` on it.
        """
        self.db = db
        # Initialise all state before scheduling the greenlet so the collector
        # can never observe a half-constructed instance.
        self.log = logging.getLogger("ChartCollector")
        self.last_values = collections.defaultdict(dict)
        if config.action == "main":
            # First collection runs 3 minutes after construction.
            gevent.spawn_later(60 * 3, self.collector)

    def setInitialLastValues(self, sites):
        """Seed the per-site transfer counters from each site's stored settings.

        Without this, the first ``|change`` sample of a site would be computed
        against 0 instead of the site's already accumulated byte counters.

        Args:
            sites: Iterable of site objects exposing ``address`` and ``settings``.
        """
        for site in sites:
            site_last_values = self.last_values["site:" + site.address]
            site_last_values["site_bytes_recv"] = site.settings.get("bytes_recv", 0)
            site_last_values["site_bytes_sent"] = site.settings.get("bytes_sent", 0)

    def getCollectors(self):
        """Build the global collectors.

        Returns:
            dict mapping statistic key to a callable. Keys starting with
            ``peer`` take the set of unique peers as their only argument; all
            others take no arguments. Returns an empty dict when the file
            server has no sites yet.
        """
        collectors = {}
        # NOTE(review): imported lazily, presumably to avoid a circular import - confirm.
        import main
        file_server = main.file_server
        sites = file_server.sites
        if not sites:
            return collectors
        # The content db is reached through the first site's content manager.
        content_db = list(sites.values())[0].content_manager.contents.db

        # Connection stats
        collectors["connection"] = lambda: len(file_server.connections)
        collectors["connection_in"] = (
            lambda: sum(1 for connection in file_server.connections if connection.type == "in")
        )
        collectors["connection_onion"] = (
            lambda: sum(1 for connection in file_server.connections if connection.ip.endswith(".onion"))
        )
        collectors["connection_ping_avg"] = (
            lambda: round(1000 * helper.avg(
                [connection.last_ping_delay for connection in file_server.connections if connection.last_ping_delay]
            ))
        )
        # min() raises ValueError when no connection has a ping delay yet;
        # collectDatas turns that into a None sample.
        collectors["connection_ping_min"] = (
            lambda: round(1000 * min(
                [connection.last_ping_delay for connection in file_server.connections if connection.last_ping_delay]
            ))
        )
        collectors["connection_rev_avg"] = (
            lambda: helper.avg(
                [connection.handshake["rev"] for connection in file_server.connections if connection.handshake]
            )
        )

        # Request stats
        collectors["file_bytes_recv|change"] = lambda: file_server.bytes_recv
        collectors["file_bytes_sent|change"] = lambda: file_server.bytes_sent
        collectors["request_num_recv|change"] = lambda: file_server.num_recv
        collectors["request_num_sent|change"] = lambda: file_server.num_sent

        # Limit
        collectors["optional_limit"] = lambda: content_db.getOptionalLimitBytes()
        collectors["optional_used"] = lambda: content_db.getOptionalUsedBytes()
        collectors["optional_downloaded"] = (
            lambda: sum(site.settings.get("optional_downloaded", 0) for site in sites.values())
        )

        # Peers
        collectors["peer"] = lambda peers: len(peers)
        collectors["peer_onion"] = lambda peers: sum(1 for peer in peers if ".onion" in peer)

        # Size
        collectors["size"] = lambda: sum(site.settings.get("size", 0) for site in sites.values())
        collectors["size_optional"] = lambda: sum(site.settings.get("size_optional", 0) for site in sites.values())
        collectors["content"] = lambda: sum(len(site.content_manager.contents) for site in sites.values())

        return collectors

    def getSiteCollectors(self):
        """Build the per-site collectors.

        Returns:
            dict mapping statistic key to a callable taking a site object.
        """
        site_collectors = {}

        # Size
        site_collectors["site_size"] = lambda site: site.settings.get("size", 0)
        site_collectors["site_size_optional"] = lambda site: site.settings.get("size_optional", 0)
        site_collectors["site_optional_downloaded"] = lambda site: site.settings.get("optional_downloaded", 0)
        site_collectors["site_content"] = lambda site: len(site.content_manager.contents)

        # Data transfer
        site_collectors["site_bytes_recv|change"] = lambda site: site.settings.get("bytes_recv", 0)
        site_collectors["site_bytes_sent|change"] = lambda site: site.settings.get("bytes_sent", 0)

        # Peers
        site_collectors["site_peer"] = lambda site: len(site.peers)
        site_collectors["site_peer_onion"] = lambda site: sum(
            1 for peer in site.peers.values() if peer.ip.endswith(".onion")
        )
        site_collectors["site_peer_connected"] = lambda site: sum(
            1 for peer in site.peers.values() if peer.connection
        )

        return site_collectors

    def getUniquePeers(self):
        """Return the set of peer keys across all sites of the file server."""
        import main
        sites = main.file_server.sites
        return set(itertools.chain.from_iterable(
            site.peers.keys() for site in sites.values()
        ))

    def collectDatas(self, collectors, last_values, site=None):
        """Run every collector once and return the sampled values.

        Args:
            collectors: dict of key -> callable, as built by ``getCollectors``
                or ``getSiteCollectors``.
            last_values: dict holding the previous raw sample of every
                ``|change`` key (stored under the key without the suffix).
                Updated in place.
            site: Site passed to every collector; ``None`` runs the collectors
                in global mode.

        Returns:
            dict of key (``|change`` suffix stripped) -> value rounded to 3
            decimals, or ``None`` when the collector failed.
        """
        peers = None
        if site is None:
            peers = self.getUniquePeers()
        datas = {}
        for key, collector in collectors.items():
            try:
                if site is not None:
                    value = collector(site)
                elif key.startswith("peer"):
                    value = collector(peers)
                else:
                    value = collector()
            except ValueError:
                # Expected for aggregates over an empty sequence, e.g. min([]).
                value = None
            except Exception as err:
                self.log.info("Collector %s error: %s", key, err)
                value = None

            if "|change" in key:  # Store changes relative to last value
                key = key.replace("|change", "")
                # A failed sample has no meaningful delta; keep the previous
                # baseline so the next successful sample is still correct.
                if value is not None:
                    last_value = last_values.get(key, 0)
                    last_values[key] = value
                    value = value - last_value

            datas[key] = None if value is None else round(value, 3)
        return datas

    def collectGlobal(self, collectors, last_values):
        """Sample the global collectors and insert one row per statistic.

        Args:
            collectors: Global collectors dict.
            last_values: Mapping whose ``"global"`` entry holds the previous
                raw samples of the ``|change`` keys.
        """
        now = int(time.time())
        s = time.time()
        datas = self.collectDatas(collectors, last_values["global"])
        values = [(self.db.getTypeId(key), value, now) for key, value in datas.items()]
        self.log.debug("Global collectors done in %.3fs" % (time.time() - s))

        s = time.time()
        cur = self.db.getCursor()
        cur.executemany("INSERT INTO data (type_id, value, date_added) VALUES (?, ?, ?)", values)
        self.log.debug("Global collectors inserted in %.3fs" % (time.time() - s))

    def collectSites(self, sites, collectors, last_values):
        """Sample the per-site collectors for every site and insert the rows.

        Args:
            sites: dict of address -> site object.
            collectors: Per-site collectors dict.
            last_values: Mapping whose ``"site:<address>"`` entries hold the
                previous raw samples of the ``|change`` keys.
        """
        now = int(time.time())
        s = time.time()
        values = []
        # Iterate over a snapshot: the loop sleeps between sites.
        for address, site in list(sites.items()):
            site_datas = self.collectDatas(collectors, last_values["site:%s" % address], site)
            for key, value in site_datas.items():
                values.append((self.db.getTypeId(key), self.db.getSiteId(address), value, now))
            # NOTE(review): presumably a cooperative yield under gevent
            # monkey-patching - confirm.
            time.sleep(0.001)
        self.log.debug("Site collections done in %.3fs" % (time.time() - s))

        s = time.time()
        cur = self.db.getCursor()
        cur.executemany("INSERT INTO data (type_id, site_id, value, date_added) VALUES (?, ?, ?, ?)", values)
        self.log.debug("Site collectors inserted in %.3fs" % (time.time() - s))

    def collector(self):
        """Collection loop: global stats every 5 minutes, site stats every 12th pass.

        Never returns.
        """
        collectors = {}
        site_collectors = self.getSiteCollectors()
        import main
        sites = main.file_server.sites
        i = 0
        while 1:
            # getCollectors() returns {} while no site is loaded; retry until
            # it succeeds so global stats are not disabled for the whole run.
            if not collectors:
                collectors = self.getCollectors()
            self.collectGlobal(collectors, self.last_values)
            if i % 12 == 0:  # Only collect sites data every hour
                self.collectSites(sites, site_collectors, self.last_values)
            time.sleep(60 * 5)
            i += 1