Forking what is left of ZeroNet and hopefully adding an AT Proto Frontend/Proxy
1import time
2import collections
3import itertools
4import re
5
6import gevent
7
8from util import helper
9from Plugin import PluginManager
10from Config import config
11from Debug import Debug
12
# Only initialise on first import so an existing value survives module reloads
if "content_db" not in locals():
    content_db = None
15
16
@PluginManager.registerTo("ContentDb")
class ContentDbPlugin(object):
    """ContentDb plugin that tracks optional files in a ``file_optional`` table.

    Keeps an in-memory index of known optional files per site, fills the table
    from content.json data, maintains per-file peer counts and removes
    downloaded optional files when the configured size limit is exceeded.
    """

    def __init__(self, *args, **kwargs):
        global content_db
        content_db = self  # Module-level handle to the most recently created instance
        self.filled = {}  # Site addresses that already filled from content.json
        self.need_filling = False  # file_optional table just created, fill data from content.json files
        self.time_peer_numbers_updated = 0
        self.my_optional_files = {}  # Last 50 site_address/inner_path called by fileWrite (auto-pinning these files)
        self.optional_files = collections.defaultdict(dict)  # {site_id: {last 8 chars of inner_path: 1}}
        self.optional_files_loaded = False
        # NOTE(review): presumably schedules checkOptionalLimit every 5 minutes - confirm against helper.timer
        self.timer_check_optional = helper.timer(60 * 5, self.checkOptionalLimit)
        super(ContentDbPlugin, self).__init__(*args, **kwargs)

    def getSchema(self):
        """Return the parent schema extended with the ``file_optional`` table."""
        schema = super(ContentDbPlugin, self).getSchema()

        # Need file_optional table
        schema["tables"]["file_optional"] = {
            "cols": [
                ["file_id", "INTEGER PRIMARY KEY UNIQUE NOT NULL"],
                ["site_id", "INTEGER REFERENCES site (site_id) ON DELETE CASCADE"],
                ["inner_path", "TEXT"],
                ["hash_id", "INTEGER"],
                ["size", "INTEGER"],
                ["peer", "INTEGER DEFAULT 0"],
                ["uploaded", "INTEGER DEFAULT 0"],
                ["is_downloaded", "INTEGER DEFAULT 0"],
                ["is_pinned", "INTEGER DEFAULT 0"],
                ["time_added", "INTEGER DEFAULT 0"],
                ["time_downloaded", "INTEGER DEFAULT 0"],
                ["time_accessed", "INTEGER DEFAULT 0"]
            ],
            "indexes": [
                "CREATE UNIQUE INDEX file_optional_key ON file_optional (site_id, inner_path)",
                "CREATE INDEX is_downloaded ON file_optional (is_downloaded)"
            ],
            "schema_changed": 11
        }

        return schema

    def initSite(self, site):
        """Initialise the site in the parent and fill file_optional for it if the table was just (re)created."""
        super(ContentDbPlugin, self).initSite(site)
        if self.need_filling:
            self.fillTableFileOptional(site)

    def checkTables(self):
        """Run the parent table check and remember if file_optional has to be filled from content.json."""
        changed_tables = super(ContentDbPlugin, self).checkTables()
        if "file_optional" in changed_tables:
            self.need_filling = True
        return changed_tables

    # Load optional files ending
    def loadFilesOptional(self):
        """Load the optional file index and per-site size statistics from the file_optional table.

        Only the last 8 characters of every inner_path are stored in
        ``self.optional_files``. The summed sizes are written to each site's
        settings ("size_optional" and "optional_downloaded").
        """
        s = time.time()
        num = 0
        total = 0
        total_downloaded = 0
        res = content_db.execute("SELECT site_id, inner_path, size, is_downloaded FROM file_optional")
        site_sizes = collections.defaultdict(lambda: collections.defaultdict(int))
        for row in res:
            self.optional_files[row["site_id"]][row["inner_path"][-8:]] = 1
            num += 1

            # Update site size stats
            site_sizes[row["site_id"]]["size_optional"] += row["size"]
            if row["is_downloaded"]:
                site_sizes[row["site_id"]]["optional_downloaded"] += row["size"]

        # Store site size stats in the site settings
        site_ids_reverse = {val: key for key, val in self.site_ids.items()}
        for site_id, stats in site_sizes.items():
            site_address = site_ids_reverse.get(site_id)
            if not site_address or site_address not in self.sites:
                self.log.error("Not found site_id: %s" % site_id)
                continue
            site = self.sites[site_address]
            site.settings["size_optional"] = stats["size_optional"]
            site.settings["optional_downloaded"] = stats["optional_downloaded"]
            total += stats["size_optional"]
            total_downloaded += stats["optional_downloaded"]

        self.log.info(
            "Loaded %s optional files: %.2fMB, downloaded: %.2fMB in %.3fs" %
            (num, float(total) / 1024 / 1024, float(total_downloaded) / 1024 / 1024, time.time() - s)
        )

        # The limit is only evaluated when the table was just created, and only once
        if self.need_filling:
            limit_bytes = self.getOptionalLimitBytes()
            if 0 <= limit_bytes < total_downloaded:
                limit_new = round((float(total_downloaded) / 1024 / 1024 / 1024) * 1.1, 2)  # Downloaded size + 10%
                self.log.info(
                    "First startup after update and limit is smaller than downloaded files size (%.2fGB), increasing it from %.2fGB to %.2fGB" %
                    (float(total_downloaded) / 1024 / 1024 / 1024, float(limit_bytes) / 1024 / 1024 / 1024, limit_new)
                )
                config.saveValue("optional_limit", limit_new)
                config.optional_limit = str(limit_new)

    # Predicts if the file is optional
    def isOptionalFile(self, site_id, inner_path):
        """Return 1 if a known optional file of the site ends with the same 8 characters, else None.

        Only the path ending is compared, so this is a prediction and can
        match a different file that shares the same ending.
        """
        return self.optional_files[site_id].get(inner_path[-8:])

    # Fill file_optional table with optional files found in sites
    def fillTableFileOptional(self, site):
        """Fill file_optional from every content.json of the site that lists optional files.

        Files whose inner_path contains the current user's auth address as a
        directory are marked as pinned afterwards.

        Returns False if the site has no site_id, otherwise None.
        """
        s = time.time()
        site_id = self.site_ids.get(site.address)
        if not site_id:
            return False
        cur = self.getCursor()
        num = 0
        try:
            res = cur.execute("SELECT * FROM content WHERE size_files_optional > 0 AND site_id = %s" % site_id)
            for row in res.fetchall():
                try:
                    # Lookup is inside the try so a single unloadable content.json does not abort the whole fill
                    content = site.content_manager.contents[row["inner_path"]]
                    num += self.setContentFilesOptional(site, row["inner_path"], content, cur=cur)
                except Exception as err:
                    self.log.error("Error loading %s into file_optional: %s" % (row["inner_path"], err))
        finally:
            cur.close()

        # Set my files to pinned
        from User import UserManager
        user = UserManager.user_manager.get()
        if not user:
            user = UserManager.user_manager.create()
        auth_address = user.getAuthAddress(site.address)
        res = self.execute(
            "UPDATE file_optional SET is_pinned = 1 WHERE site_id = :site_id AND inner_path LIKE :inner_path",
            {"site_id": site_id, "inner_path": "%%/%s/%%" % auth_address}
        )

        self.log.debug(
            "Filled file_optional table for %s in %.3fs (loaded: %s, is_pinned: %s)" %
            (site.address, time.time() - s, num, res.rowcount)
        )
        self.filled[site.address] = True

    def setContentFilesOptional(self, site, content_inner_path, content, cur=None):
        """Insert or update a file_optional row for every optional file listed in a content.json.

        Args:
            site: Site owning the content.json.
            content_inner_path: Inner path of the content.json.
            content: Parsed content.json dict; its "files_optional" entries are read.
            cur: Object providing insertOrUpdate(); defaults to this db.

        Returns:
            Number of optional files processed.
        """
        if not cur:
            cur = self

        num = 0
        site_id = self.site_ids[site.address]
        content_inner_dir = helper.getDirname(content_inner_path)
        # Pinning depends only on the content.json's directory, so it is the same for every file in it
        if site.address + "/" + content_inner_dir in self.my_optional_files:
            is_pinned = 1
        else:
            is_pinned = 0
        for relative_inner_path, file in content.get("files_optional", {}).items():
            file_inner_path = content_inner_dir + relative_inner_path
            hash_id = int(file["sha512"][0:4], 16)  # First 4 hex digits of the sha512 as integer
            if hash_id in site.content_manager.hashfield:
                is_downloaded = 1
            else:
                is_downloaded = 0
            cur.insertOrUpdate("file_optional", {
                "hash_id": hash_id,
                "size": int(file["size"])
            }, {
                "site_id": site_id,
                "inner_path": file_inner_path
            }, oninsert={
                "time_added": int(time.time()),
                "time_downloaded": int(time.time()) if is_downloaded else 0,
                "is_downloaded": is_downloaded,
                "peer": is_downloaded,
                "is_pinned": is_pinned
            })
            self.optional_files[site_id][file_inner_path[-8:]] = 1
            num += 1

        return num

    def setContent(self, site, inner_path, content, size=0):
        """Store the content in the parent db and sync its optional files into file_optional.

        Optional files that were listed in the previous version of the
        content.json but are missing from the new one are deleted from the table.
        """
        super(ContentDbPlugin, self).setContent(site, inner_path, content, size=size)
        old_content = site.content_manager.contents.get(inner_path, {})
        # While the initial fill is pending, the site is handled by fillTableFileOptional instead
        if (not self.need_filling or self.filled.get(site.address)) and ("files_optional" in content or "files_optional" in old_content):
            self.setContentFilesOptional(site, inner_path, content)
            # Check deleted files
            if old_content:
                old_files = old_content.get("files_optional", {}).keys()
                new_files = content.get("files_optional", {}).keys()
                content_inner_dir = helper.getDirname(inner_path)
                deleted = [content_inner_dir + key for key in old_files if key not in new_files]
                if deleted:
                    site_id = self.site_ids[site.address]
                    self.execute("DELETE FROM file_optional WHERE ?", {"site_id": site_id, "inner_path": deleted})

    def deleteContent(self, site, inner_path):
        """Delete the file_optional rows of a content.json's optional files, then delete the content in the parent."""
        content = site.content_manager.contents.get(inner_path)
        if content and "files_optional" in content:
            site_id = self.site_ids[site.address]
            content_inner_dir = helper.getDirname(inner_path)
            optional_inner_paths = [
                content_inner_dir + relative_inner_path
                for relative_inner_path in content.get("files_optional", {}).keys()
            ]
            self.execute("DELETE FROM file_optional WHERE ?", {"site_id": site_id, "inner_path": optional_inner_paths})
        super(ContentDbPlugin, self).deleteContent(site, inner_path)

    def updatePeerNumbers(self):
        """Recalculate the ``peer`` column of file_optional from the known hashfields.

        A site is skipped if it has no optional files, is not serving, is not
        in site_ids, or if neither a peer hashfield nor our own hashfield
        changed since the last run. The peer number of a file is how many
        times its hash_id occurs in the peers' hashfields plus our own.
        """
        s = time.time()
        num_file = 0
        num_updated = 0
        num_site = 0
        for site in list(self.sites.values()):
            if not site.content_manager.has_optional_files:
                continue
            if not site.isServing():
                continue
            has_updated_hashfield = next((
                peer
                for peer in site.peers.values()
                if peer.has_hashfield and peer.hashfield.time_changed > self.time_peer_numbers_updated
            ), None)

            if not has_updated_hashfield and site.content_manager.hashfield.time_changed < self.time_peer_numbers_updated:
                continue

            # .get() so a site missing from site_ids is skipped instead of raising KeyError
            site_id = self.site_ids.get(site.address)
            if not site_id:
                continue

            hashfield_peers = itertools.chain.from_iterable(
                peer.hashfield.storage
                for peer in site.peers.values()
                if peer.has_hashfield
            )
            peer_nums = collections.Counter(
                itertools.chain(
                    hashfield_peers,
                    site.content_manager.hashfield
                )
            )

            res = self.execute("SELECT file_id, hash_id, peer FROM file_optional WHERE ?", {"site_id": site_id})
            updates = {}
            for row in res:
                peer_num = peer_nums.get(row["hash_id"], 0)
                if peer_num != row["peer"]:
                    updates[row["file_id"]] = peer_num

            # Rows are collected first and written after the SELECT result has been consumed
            for file_id, peer_num in updates.items():
                self.execute("UPDATE file_optional SET peer = ? WHERE file_id = ?", (peer_num, file_id))

            num_updated += len(updates)
            num_file += len(peer_nums)
            num_site += 1

        self.time_peer_numbers_updated = time.time()
        self.log.debug("%s/%s peer number for %s site updated in %.3fs" % (num_updated, num_file, num_site, time.time() - s))

    def _queryPaged(self, query, page_size=50):
        """Yield every row of the query, fetching it in LIMIT/offset pages of page_size rows."""
        limit_start = 0
        while True:
            num = 0
            res = self.execute("%s LIMIT %s, %s" % (query, limit_start, page_size))
            for row in res:
                yield row
                num += 1
            if num < page_size:  # Short page: no more rows
                break
            limit_start += page_size

    def queryDeletableFiles(self):
        """Yield file_optional rows that can be deleted, best deletion candidates first.

        Rows are produced in three phases, each restricted by
        getOptionalUsedWhere(): files with more than 10 peers, then files with
        at most 10 peers (both preferring files not accessed in the last week),
        and finally files with at most 10 peers ordered by access time. A row
        can be yielded more than once across the last two phases.
        """
        week = 60 * 60 * 24 * 7

        # First return the files with more than 10 seeders, preferring files not accessed in the last week
        query = """
            SELECT * FROM file_optional
            WHERE peer > 10 AND %s
            ORDER BY time_accessed < %s DESC, uploaded / size
        """ % (self.getOptionalUsedWhere(), int(time.time() - week))
        for row in self._queryPaged(query):
            yield row

        self.log.debug("queryDeletableFiles returning less-seeded files")

        # Then return files with fewer seeders, still preferring files not accessed in the last week
        query = """
            SELECT * FROM file_optional
            WHERE peer <= 10 AND %s
            ORDER BY peer DESC, time_accessed < %s DESC, uploaded / size
        """ % (self.getOptionalUsedWhere(), int(time.time() - week))
        for row in self._queryPaged(query):
            yield row

        self.log.debug("queryDeletableFiles returning everyting")

        # At the end return all files
        query = """
            SELECT * FROM file_optional
            WHERE peer <= 10 AND %s
            ORDER BY peer DESC, time_accessed, uploaded / size
        """ % self.getOptionalUsedWhere()
        for row in self._queryPaged(query):
            yield row

    def getOptionalLimitBytes(self):
        """Return config.optional_limit converted to bytes.

        A value ending with "%" is a percentage of the free space reported by
        helper.getFreeSpace(); anything else is a size in GB. Every character
        except digits and "." is stripped before parsing (including a minus sign).
        """
        if config.optional_limit.endswith("%"):
            limit_percent = float(re.sub("[^0-9.]", "", config.optional_limit))
            limit_bytes = helper.getFreeSpace() * (limit_percent / 100)
        else:
            limit_bytes = float(re.sub("[^0-9.]", "", config.optional_limit)) * 1024 * 1024 * 1024
        return limit_bytes

    def getOptionalUsedWhere(self):
        """Return the SQL condition selecting files that count towards the limit.

        Selects downloaded, unpinned files smaller than
        config.optional_limit_exclude_minsize MB that do not belong to an owned site.
        """
        maxsize = config.optional_limit_exclude_minsize * 1024 * 1024
        query = "is_downloaded = 1 AND is_pinned = 0 AND size < %s" % maxsize

        # Don't delete optional files from owned sites
        my_site_ids = []
        for address, site in self.sites.items():
            if site.settings["own"]:
                my_site_ids.append(str(self.site_ids[address]))

        if my_site_ids:
            query += " AND site_id NOT IN (%s)" % ", ".join(my_site_ids)
        return query

    def getOptionalUsedBytes(self):
        """Return the summed size of the files matching getOptionalUsedWhere() (0 if there are none)."""
        size = self.execute("SELECT SUM(size) FROM file_optional WHERE %s" % self.getOptionalUsedWhere()).fetchone()[0]
        if not size:
            size = 0
        return size

    def getOptionalNeedDelete(self, size):
        """Return how many bytes have to be deleted when ``size`` bytes are used (<= 0: nothing).

        For a percentage limit the allowed size is that percentage of
        (free space + size); otherwise it is getOptionalLimitBytes().
        """
        if config.optional_limit.endswith("%"):
            limit_percent = float(re.sub("[^0-9.]", "", config.optional_limit))
            need_delete = size - ((helper.getFreeSpace() + size) * (limit_percent / 100))
        else:
            need_delete = size - self.getOptionalLimitBytes()
        return need_delete

    def checkOptionalLimit(self, limit=None):
        """Delete downloaded optional files until the used size is back under the limit.

        Args:
            limit: Limit in bytes, used for the validity check and the log
                message; defaults to getOptionalLimitBytes(). The amount to
                delete always comes from getOptionalNeedDelete().

        Returns:
            False if the limit is invalid or nothing has to be deleted, otherwise None.
        """
        if limit is None:
            limit = self.getOptionalLimitBytes()

        if limit < 0:
            self.log.debug("Invalid limit for optional files: %s" % limit)
            return False

        size = self.getOptionalUsedBytes()

        need_delete = self.getOptionalNeedDelete(size)

        self.log.debug(
            "Optional size: %.1fMB/%.1fMB, Need delete: %.1fMB" %
            (float(size) / 1024 / 1024, float(limit) / 1024 / 1024, float(need_delete) / 1024 / 1024)
        )
        if need_delete <= 0:
            return False

        self.updatePeerNumbers()

        site_ids_reverse = {val: key for key, val in self.site_ids.items()}
        deleted_file_ids = []
        for row in self.queryDeletableFiles():
            site_address = site_ids_reverse.get(row["site_id"])
            site = self.sites.get(site_address)
            if not site:
                self.log.error("No site found for id: %s" % row["site_id"])
                continue
            site.log.debug("Deleting %s %.3f MB left" % (row["inner_path"], float(need_delete) / 1024 / 1024))
            # Recorded before the delete attempt: the row is marked as not downloaded even if deleting fails
            deleted_file_ids.append(row["file_id"])
            try:
                site.content_manager.optionalRemoved(row["inner_path"], row["hash_id"], row["size"])
                site.storage.delete(row["inner_path"])
                need_delete -= row["size"]
            except Exception as err:
                site.log.error("Error deleting %s: %s" % (row["inner_path"], err))

            if need_delete <= 0:
                break

        # Rows are updated only after the candidate query has finished
        cur = self.getCursor()
        try:
            for file_id in deleted_file_ids:
                cur.execute("UPDATE file_optional SET is_downloaded = 0, is_pinned = 0, peer = peer - 1 WHERE ?", {"file_id": file_id})
        finally:
            cur.close()
405
406
@PluginManager.registerTo("SiteManager")
class SiteManagerPlugin(object):
    """SiteManager plugin that loads the optional file index once sites are available."""

    def load(self, *args, **kwargs):
        """Run the parent load, then load the optional files a single time.

        The optional files are loaded only if there are sites, they were not
        loaded before and the content db has a connection.
        """
        result = super(SiteManagerPlugin, self).load(*args, **kwargs)
        if not self.sites or content_db.optional_files_loaded or not content_db.conn:
            return result

        # Flag is set before loading, same as the load order requires
        content_db.optional_files_loaded = True
        content_db.loadFilesOptional()
        return result