Forking what is left of ZeroNet and hopefully adding an AT Proto Frontend/Proxy
at main 414 lines 17 kB view raw
import time
import collections
import itertools
import re

import gevent

from util import helper
from Plugin import PluginManager
from Config import config
from Debug import Debug

if "content_db" not in locals():  # To keep between module reloads
    content_db = None


@PluginManager.registerTo("ContentDb")
class ContentDbPlugin(object):
    """ContentDb plugin that tracks optional files in a `file_optional` table.

    Keeps an in-memory lookup of known optional files (keyed by the last
    8 characters of the inner path), maintains per-file peer counts and
    removes downloaded optional files when the configured limit is exceeded.
    """

    def __init__(self, *args, **kwargs):
        global content_db
        content_db = self
        self.filled = {}  # Site addresses that already filled from content.json
        self.need_filling = False  # file_optional table just created, fill data from content.json files
        self.time_peer_numbers_updated = 0
        self.my_optional_files = {}  # Last 50 site_address/inner_path called by fileWrite (auto-pinning these files)
        self.optional_files = collections.defaultdict(dict)
        self.optional_files_loaded = False
        # presumably runs checkOptionalLimit every 5 minutes; verify against helper.timer
        self.timer_check_optional = helper.timer(60 * 5, self.checkOptionalLimit)
        super(ContentDbPlugin, self).__init__(*args, **kwargs)

    def getSchema(self):
        """Return the parent schema extended with the `file_optional` table."""
        schema = super(ContentDbPlugin, self).getSchema()

        # Need file_optional table
        schema["tables"]["file_optional"] = {
            "cols": [
                ["file_id", "INTEGER PRIMARY KEY UNIQUE NOT NULL"],
                ["site_id", "INTEGER REFERENCES site (site_id) ON DELETE CASCADE"],
                ["inner_path", "TEXT"],
                ["hash_id", "INTEGER"],
                ["size", "INTEGER"],
                ["peer", "INTEGER DEFAULT 0"],
                ["uploaded", "INTEGER DEFAULT 0"],
                ["is_downloaded", "INTEGER DEFAULT 0"],
                ["is_pinned", "INTEGER DEFAULT 0"],
                ["time_added", "INTEGER DEFAULT 0"],
                ["time_downloaded", "INTEGER DEFAULT 0"],
                ["time_accessed", "INTEGER DEFAULT 0"]
            ],
            "indexes": [
                "CREATE UNIQUE INDEX file_optional_key ON file_optional (site_id, inner_path)",
                "CREATE INDEX is_downloaded ON file_optional (is_downloaded)"
            ],
            "schema_changed": 11
        }

        return schema

    def initSite(self, site):
        """Initialise the site in the parent, then fill `file_optional` for it if the table was just (re)created."""
        super(ContentDbPlugin, self).initSite(site)
        if self.need_filling:
            self.fillTableFileOptional(site)

    def checkTables(self):
        """Run the parent table check and remember whether `file_optional` needs to be filled."""
        changed_tables = super(ContentDbPlugin, self).checkTables()
        if "file_optional" in changed_tables:
            self.need_filling = True
        return changed_tables

    # Load optional files ending
    def loadFilesOptional(self):
        """Load the optional file lookup and per-site size stats from the database.

        Populates `self.optional_files`, writes `size_optional` and
        `optional_downloaded` into each known site's settings, and, when the
        table was just created and the configured limit is below the size
        already downloaded, raises `optional_limit` to the downloaded size + 10%.
        """
        s = time.time()
        num = 0
        total = 0
        total_downloaded = 0
        res = content_db.execute("SELECT site_id, inner_path, size, is_downloaded FROM file_optional")
        site_sizes = collections.defaultdict(lambda: collections.defaultdict(int))
        for row in res:
            self.optional_files[row["site_id"]][row["inner_path"][-8:]] = 1
            num += 1

            # Update site size stats
            site_sizes[row["site_id"]]["size_optional"] += row["size"]
            if row["is_downloaded"]:
                site_sizes[row["site_id"]]["optional_downloaded"] += row["size"]

        # Set site size stats to sites.json settings
        site_ids_reverse = {val: key for key, val in self.site_ids.items()}
        for site_id, stats in site_sizes.items():
            site_address = site_ids_reverse.get(site_id)
            if not site_address or site_address not in self.sites:
                self.log.error("Not found site_id: %s" % site_id)
                continue
            site = self.sites[site_address]
            site.settings["size_optional"] = stats["size_optional"]
            site.settings["optional_downloaded"] = stats["optional_downloaded"]
            total += stats["size_optional"]
            total_downloaded += stats["optional_downloaded"]

        self.log.info(
            "Loaded %s optional files: %.2fMB, downloaded: %.2fMB in %.3fs" %
            (num, float(total) / 1024 / 1024, float(total_downloaded) / 1024 / 1024, time.time() - s)
        )

        if self.need_filling:
            # Resolve the limit once: a percent based limit queries the free disk space on every call
            limit_bytes = self.getOptionalLimitBytes()
            if 0 <= limit_bytes < total_downloaded:
                limit_new = round((float(total_downloaded) / 1024 / 1024 / 1024) * 1.1, 2)  # Downloaded size + 10% (in GB)
                self.log.info(
                    "First startup after update and limit is smaller than downloaded files size (%.2fGB), increasing it from %.2fGB to %.2fGB" %
                    (float(total_downloaded) / 1024 / 1024 / 1024, float(limit_bytes) / 1024 / 1024 / 1024, limit_new)
                )
                config.saveValue("optional_limit", limit_new)
                config.optional_limit = str(limit_new)

    # Predicts if the file is optional
    def isOptionalFile(self, site_id, inner_path):
        """Return a truthy value if a file with the same 8 character path ending is registered for the site.

        Only the last 8 characters of `inner_path` are compared, so this is a
        prediction: different paths sharing the same ending also match.
        """
        return self.optional_files[site_id].get(inner_path[-8:])

    # Fill file_optional table with optional files found in sites
    def fillTableFileOptional(self, site):
        """Insert the optional files of every content.json of `site` into `file_optional`.

        Also pins the files whose path contains the current user's auth address
        and marks the site as filled. Returns False if the site has no id,
        otherwise None.
        """
        s = time.time()
        site_id = self.site_ids.get(site.address)
        if not site_id:
            return False
        num = 0
        cur = self.getCursor()
        try:
            res = cur.execute("SELECT * FROM content WHERE size_files_optional > 0 AND site_id = %s" % site_id)
            for row in res.fetchall():
                try:
                    # Lookup is inside the try: a content.json that fails to load should not abort the whole fill
                    content = site.content_manager.contents[row["inner_path"]]
                    num += self.setContentFilesOptional(site, row["inner_path"], content, cur=cur)
                except Exception as err:
                    self.log.error("Error loading %s into file_optional: %s" % (row["inner_path"], err))
        finally:
            # Release the cursor even if the query or the loop fails
            cur.close()

        # Set my files to pinned
        from User import UserManager
        user = UserManager.user_manager.get()
        if not user:
            user = UserManager.user_manager.create()
        auth_address = user.getAuthAddress(site.address)
        res = self.execute(
            "UPDATE file_optional SET is_pinned = 1 WHERE site_id = :site_id AND inner_path LIKE :inner_path",
            {"site_id": site_id, "inner_path": "%%/%s/%%" % auth_address}
        )

        self.log.debug(
            "Filled file_optional table for %s in %.3fs (loaded: %s, is_pinned: %s)" %
            (site.address, time.time() - s, num, res.rowcount)
        )
        self.filled[site.address] = True

    def setContentFilesOptional(self, site, content_inner_path, content, cur=None):
        """Insert or update a `file_optional` row for every `files_optional` entry of `content`.

        `cur` is the object used for `insertOrUpdate`; defaults to this db.
        Download, peer, pin and time values are only set when the row is
        inserted (passed as `oninsert`). Returns the number of files processed.
        """
        if not cur:
            cur = self

        num = 0
        site_id = self.site_ids[site.address]
        content_inner_dir = helper.getDirname(content_inner_path)
        for relative_inner_path, file in content.get("files_optional", {}).items():
            file_inner_path = content_inner_dir + relative_inner_path
            # The first 4 hex characters of the sha512 are used as the hashfield id
            hash_id = int(file["sha512"][0:4], 16)
            is_downloaded = 1 if hash_id in site.content_manager.hashfield else 0
            is_pinned = 1 if site.address + "/" + content_inner_dir in self.my_optional_files else 0
            cur.insertOrUpdate("file_optional", {
                "hash_id": hash_id,
                "size": int(file["size"])
            }, {
                "site_id": site_id,
                "inner_path": file_inner_path
            }, oninsert={
                "time_added": int(time.time()),
                "time_downloaded": int(time.time()) if is_downloaded else 0,
                "is_downloaded": is_downloaded,
                "peer": is_downloaded,
                "is_pinned": is_pinned
            })
            self.optional_files[site_id][file_inner_path[-8:]] = 1
            num += 1

        return num

    def setContent(self, site, inner_path, content, size=0):
        """Store the content in the parent, then sync its optional files and remove the ones no longer listed."""
        super(ContentDbPlugin, self).setContent(site, inner_path, content, size=size)
        old_content = site.content_manager.contents.get(inner_path, {})
        # Skip while the initial table fill is still pending for this site
        if (not self.need_filling or self.filled.get(site.address)) and ("files_optional" in content or "files_optional" in old_content):
            self.setContentFilesOptional(site, inner_path, content)
            # Check deleted files
            if old_content:
                old_files = old_content.get("files_optional", {}).keys()
                new_files = content.get("files_optional", {}).keys()
                content_inner_dir = helper.getDirname(inner_path)
                deleted = [content_inner_dir + key for key in old_files if key not in new_files]
                if deleted:
                    site_id = self.site_ids[site.address]
                    self.execute("DELETE FROM file_optional WHERE ?", {"site_id": site_id, "inner_path": deleted})

    def deleteContent(self, site, inner_path):
        """Delete the `file_optional` rows listed by the content, then delete the content in the parent."""
        content = site.content_manager.contents.get(inner_path)
        if content and "files_optional" in content:
            site_id = self.site_ids[site.address]
            content_inner_dir = helper.getDirname(inner_path)
            optional_inner_paths = [
                content_inner_dir + relative_inner_path
                for relative_inner_path in content.get("files_optional", {}).keys()
            ]
            self.execute("DELETE FROM file_optional WHERE ?", {"site_id": site_id, "inner_path": optional_inner_paths})
        super(ContentDbPlugin, self).deleteContent(site, inner_path)

    def updatePeerNumbers(self):
        """Recount how many hashfields (peers' and our own) contain each optional file and store changed counts.

        Sites without optional files, sites not being served and sites with no
        hashfield change since the last run are skipped.
        """
        s = time.time()
        num_file = 0
        num_updated = 0
        num_site = 0
        for site in list(self.sites.values()):
            if not site.content_manager.has_optional_files:
                continue
            if not site.isServing():
                continue
            has_updated_hashfield = next((
                peer
                for peer in site.peers.values()
                if peer.has_hashfield and peer.hashfield.time_changed > self.time_peer_numbers_updated
            ), None)

            if not has_updated_hashfield and site.content_manager.hashfield.time_changed < self.time_peer_numbers_updated:
                continue

            hashfield_peers = itertools.chain.from_iterable(
                peer.hashfield.storage
                for peer in site.peers.values()
                if peer.has_hashfield
            )
            peer_nums = collections.Counter(
                itertools.chain(
                    hashfield_peers,
                    site.content_manager.hashfield
                )
            )

            # .get(): a site missing from site_ids is skipped by the guard below instead of raising KeyError
            site_id = self.site_ids.get(site.address)
            if not site_id:
                continue

            res = self.execute("SELECT file_id, hash_id, peer FROM file_optional WHERE ?", {"site_id": site_id})
            updates = {}
            for row in res:
                peer_num = peer_nums.get(row["hash_id"], 0)
                if peer_num != row["peer"]:
                    updates[row["file_id"]] = peer_num

            for file_id, peer_num in updates.items():
                self.execute("UPDATE file_optional SET peer = ? WHERE file_id = ?", (peer_num, file_id))

            num_updated += len(updates)
            num_file += len(peer_nums)
            num_site += 1

        self.time_peer_numbers_updated = time.time()
        self.log.debug("%s/%s peer number for %s site updated in %.3fs" % (num_updated, num_file, num_site, time.time() - s))

    def _queryPaged(self, query, page_size=50):
        """Yield every row of `query`, fetching `page_size` rows per statement using LIMIT offset, count."""
        limit_start = 0
        while True:
            num = 0
            res = self.execute("%s LIMIT %s, %s" % (query, limit_start, page_size))
            for row in res:
                yield row
                num += 1
            if num < page_size:  # Short page: no more rows
                break
            limit_start += page_size

    def queryDeletableFiles(self):
        """Yield deletable `file_optional` rows, best deletion candidates first.

        Three passes are made over the rows matching getOptionalUsedWhere():
        well seeded files (peer > 10), then less seeded files, both putting
        files not accessed in the last week first, and finally the less
        seeded files again ordered by access time. A row can be yielded more
        than once; the caller stops iterating when enough space was freed.
        """
        one_week = 60 * 60 * 24 * 7  # Was 60 * 60 * 7 (7 hours), contradicting the "last week" intent

        # First return the files with atleast 10 seeder and not accessed in last week
        query = """
            SELECT * FROM file_optional
            WHERE peer > 10 AND %s
            ORDER BY time_accessed < %s DESC, uploaded / size
        """ % (self.getOptionalUsedWhere(), int(time.time() - one_week))
        for row in self._queryPaged(query):
            yield row

        self.log.debug("queryDeletableFiles returning less-seeded files")

        # Then return files less seeder but still not accessed in last week
        query = """
            SELECT * FROM file_optional
            WHERE peer <= 10 AND %s
            ORDER BY peer DESC, time_accessed < %s DESC, uploaded / size
        """ % (self.getOptionalUsedWhere(), int(time.time() - one_week))
        for row in self._queryPaged(query):
            yield row

        self.log.debug("queryDeletableFiles returning everyting")

        # At the end return all files
        # NOTE(review): this query still filters peer <= 10 -- confirm that is intended
        query = """
            SELECT * FROM file_optional
            WHERE peer <= 10 AND %s
            ORDER BY peer DESC, time_accessed, uploaded / size
        """ % self.getOptionalUsedWhere()
        for row in self._queryPaged(query):
            yield row

    def getOptionalLimitBytes(self):
        """Return config.optional_limit in bytes.

        A value ending with "%" is taken as a percentage of helper.getFreeSpace(),
        anything else as a number of gigabytes.
        """
        if config.optional_limit.endswith("%"):
            limit_percent = float(re.sub("[^0-9.]", "", config.optional_limit))
            limit_bytes = helper.getFreeSpace() * (limit_percent / 100)
        else:
            limit_bytes = float(re.sub("[^0-9.]", "", config.optional_limit)) * 1024 * 1024 * 1024
        return limit_bytes

    def getOptionalUsedWhere(self):
        """Return the SQL condition selecting files that count towards the limit.

        Matches downloaded, not pinned files smaller than
        config.optional_limit_exclude_minsize (MB) that do not belong to own sites.
        """
        maxsize = config.optional_limit_exclude_minsize * 1024 * 1024
        query = "is_downloaded = 1 AND is_pinned = 0 AND size < %s" % maxsize

        # Don't delete optional files from owned sites
        my_site_ids = []
        for address, site in self.sites.items():
            if site.settings["own"]:
                my_site_ids.append(str(self.site_ids[address]))

        if my_site_ids:
            query += " AND site_id NOT IN (%s)" % ", ".join(my_site_ids)
        return query

    def getOptionalUsedBytes(self):
        """Return the summed size of the files matching getOptionalUsedWhere() (0 if there are none)."""
        size = self.execute("SELECT SUM(size) FROM file_optional WHERE %s" % self.getOptionalUsedWhere()).fetchone()[0]
        if not size:
            size = 0
        return size

    def getOptionalNeedDelete(self, size):
        """Return how many bytes have to be deleted to get `size` used bytes under the configured limit.

        For a percent based limit the used size is added back to the free space
        before the percentage is applied. The result is <= 0 when nothing has
        to be deleted.
        """
        if config.optional_limit.endswith("%"):
            limit_percent = float(re.sub("[^0-9.]", "", config.optional_limit))
            need_delete = size - ((helper.getFreeSpace() + size) * (limit_percent / 100))
        else:
            need_delete = size - self.getOptionalLimitBytes()
        return need_delete

    def checkOptionalLimit(self, limit=None):
        """Delete downloaded optional files until the used size is back under the limit.

        Returns False if the limit is negative or nothing has to be deleted,
        otherwise None.

        NOTE(review): `limit` is only used for the validity check and the log
        message; the amount to delete is always computed from
        config.optional_limit by getOptionalNeedDelete() -- confirm intended.
        """
        if not limit:
            limit = self.getOptionalLimitBytes()

        if limit < 0:
            self.log.debug("Invalid limit for optional files: %s" % limit)
            return False

        size = self.getOptionalUsedBytes()

        need_delete = self.getOptionalNeedDelete(size)

        self.log.debug(
            "Optional size: %.1fMB/%.1fMB, Need delete: %.1fMB" %
            (float(size) / 1024 / 1024, float(limit) / 1024 / 1024, float(need_delete) / 1024 / 1024)
        )
        if need_delete <= 0:
            return False

        self.updatePeerNumbers()

        site_ids_reverse = {val: key for key, val in self.site_ids.items()}
        deleted_file_ids = []
        for row in self.queryDeletableFiles():
            site_address = site_ids_reverse.get(row["site_id"])
            site = self.sites.get(site_address)
            if not site:
                self.log.error("No site found for id: %s" % row["site_id"])
                continue
            site.log.debug("Deleting %s %.3f MB left" % (row["inner_path"], float(need_delete) / 1024 / 1024))
            # Recorded before the delete attempt: the row is marked as not downloaded even if deleting fails
            deleted_file_ids.append(row["file_id"])
            try:
                site.content_manager.optionalRemoved(row["inner_path"], row["hash_id"], row["size"])
                site.storage.delete(row["inner_path"])
                need_delete -= row["size"]
            except Exception as err:
                site.log.error("Error deleting %s: %s" % (row["inner_path"], err))

            if need_delete <= 0:
                break

        cur = self.getCursor()
        try:
            for file_id in deleted_file_ids:
                cur.execute("UPDATE file_optional SET is_downloaded = 0, is_pinned = 0, peer = peer - 1 WHERE ?", {"file_id": file_id})
        finally:
            # Release the cursor even if an update fails
            cur.close()


@PluginManager.registerTo("SiteManager")
class SiteManagerPlugin(object):
    """SiteManager plugin that loads the optional file lookup once sites are available."""

    def load(self, *args, **kwargs):
        """Run the parent load, then load the optional files on the first call that has sites and a db connection."""
        back = super(SiteManagerPlugin, self).load(*args, **kwargs)
        if self.sites and not content_db.optional_files_loaded and content_db.conn:
            content_db.optional_files_loaded = True
            content_db.loadFilesOptional()
        return back