Forking what is left of ZeroNet and hopefully adding an AT Proto Frontend/Proxy
at main 253 lines 10 kB view raw
import time
import re
import collections

import gevent

from util import helper
from Plugin import PluginManager
from . import ContentDbPlugin


# We can only import plugin host classes after the plugins are loaded
@PluginManager.afterLoad
def importPluginnedClasses():
    """Bind the module-level ``config`` name once the plugins are loaded."""
    global config
    from Config import config


def processAccessLog():
    """Write the buffered web access log to ``file_optional.time_accessed``.

    Returns ``False`` when there is buffered data but the content database has
    no connection (the buffer is left untouched in that case); otherwise
    returns ``None``.
    """
    global access_log
    if access_log:
        content_db = ContentDbPlugin.content_db
        if not content_db.conn:
            return False

        s = time.time()
        # Swap in a fresh buffer before writing: entries recorded from now on
        # land in the new dict and are handled by the next run.
        access_log_prev = access_log
        access_log = collections.defaultdict(dict)
        now = int(time.time())
        num = 0
        for site_id, inner_paths in access_log_prev.items():
            # NOTE(review): the "WHERE ?" placeholder is presumably expanded from
            # the params dict by the project's Db wrapper - confirm there.
            content_db.execute(
                "UPDATE file_optional SET time_accessed = %s WHERE ?" % now,
                {"site_id": site_id, "inner_path": list(inner_paths)}
            )
            num += len(inner_paths)

        content_db.log.debug("Inserted %s web request stat in %.3fs" % (num, time.time() - s))


def processRequestLog():
    """Add the buffered per-file upload byte counts to ``file_optional.uploaded``.

    Returns ``False`` when there is buffered data but the content database has
    no connection (the buffer is left untouched in that case); otherwise
    returns ``None``.
    """
    global request_log
    if request_log:
        content_db = ContentDbPlugin.content_db
        if not content_db.conn:
            return False

        s = time.time()
        # Swap in a fresh buffer before writing (same scheme as processAccessLog)
        request_log_prev = request_log
        request_log = collections.defaultdict(lambda: collections.defaultdict(int))  # {site_id: {inner_path1: 1, inner_path2: 1...}}
        num = 0
        for site_id, uploaded_by_path in request_log_prev.items():
            for inner_path, uploaded in uploaded_by_path.items():
                content_db.execute(
                    "UPDATE file_optional SET uploaded = uploaded + %s WHERE ?" % uploaded,
                    {"site_id": site_id, "inner_path": inner_path}
                )
                num += 1
        content_db.log.debug("Inserted %s file request stat in %.3fs" % (num, time.time() - s))


if "access_log" not in locals():  # To keep between module reloads
    access_log = collections.defaultdict(dict)  # {site_id: {inner_path1: 1, inner_path2: 1...}}
    request_log = collections.defaultdict(lambda: collections.defaultdict(int))  # {site_id: {inner_path1: 1, inner_path2: 1...}}
    # NOTE(review): helper.timer presumably calls the function periodically with
    # the given interval in seconds - confirm in util.helper.
    helper.timer(61, processAccessLog)
    helper.timer(60, processRequestLog)


@PluginManager.registerTo("ContentManager")
class ContentManagerPlugin(object):
    """ContentManager extension that mirrors optional file state into ``file_optional``."""

    def __init__(self, *args, **kwargs):
        # {inner_path: bool} memo for isPinned(); reset whenever rows may change
        self.cache_is_pinned = {}
        super(ContentManagerPlugin, self).__init__(*args, **kwargs)

    def optionalDownloaded(self, inner_path, hash_id, size=None, own=False):
        """Mark the optional file as downloaded in the db, then delegate to the parent.

        For a big file piece (``file|range``) the row of the file itself is updated.
        """
        # Keep only the part before the first "|" (big file pieces carry a
        # "|range" suffix); a path without "|" is returned unchanged.
        file_inner_path = inner_path.partition("|")[0]

        self.contents.db.executeDelayed(
            "UPDATE file_optional SET time_downloaded = :now, is_downloaded = 1, peer = peer + 1 WHERE site_id = :site_id AND inner_path = :inner_path AND is_downloaded = 0",
            {"now": int(time.time()), "site_id": self.contents.db.site_ids[self.site.address], "inner_path": file_inner_path}
        )

        return super(ContentManagerPlugin, self).optionalDownloaded(inner_path, hash_id, size, own)

    def optionalRemoved(self, inner_path, hash_id, size=None):
        """Mark the optional file as not downloaded and unpinned in the db.

        The parent implementation is only called when a row was actually
        changed; otherwise ``False`` is returned.
        """
        res = self.contents.db.execute(
            "UPDATE file_optional SET is_downloaded = 0, is_pinned = 0, peer = peer - 1 WHERE site_id = :site_id AND inner_path = :inner_path AND is_downloaded = 1",
            {"site_id": self.contents.db.site_ids[self.site.address], "inner_path": inner_path}
        )

        if res.rowcount > 0:
            back = super(ContentManagerPlugin, self).optionalRemoved(inner_path, hash_id, size)
            # Re-add to hashfield if we have other file with the same hash_id
            if self.isDownloaded(hash_id=hash_id, force_check_db=True):
                self.hashfield.appendHashId(hash_id)
        else:
            back = False
        self.cache_is_pinned = {}
        return back

    def optionalRenamed(self, inner_path_old, inner_path_new):
        """Delegate to the parent, then rename the file's row in ``file_optional``."""
        back = super(ContentManagerPlugin, self).optionalRenamed(inner_path_old, inner_path_new)
        self.cache_is_pinned = {}
        self.contents.db.execute(
            "UPDATE file_optional SET inner_path = :inner_path_new WHERE site_id = :site_id AND inner_path = :inner_path_old",
            {"site_id": self.contents.db.site_ids[self.site.address], "inner_path_old": inner_path_old, "inner_path_new": inner_path_new}
        )
        return back

    def isDownloaded(self, inner_path=None, hash_id=None, force_check_db=False):
        """Return True if the db has the file flagged as downloaded.

        The row is looked up by ``inner_path`` when given, otherwise by
        ``hash_id``. Unless ``force_check_db`` is set, a ``hash_id`` missing
        from ``self.hashfield`` answers ``False`` without querying the db.
        """
        if hash_id and not force_check_db and hash_id not in self.hashfield:
            return False

        if inner_path:
            res = self.contents.db.execute(
                "SELECT is_downloaded FROM file_optional WHERE site_id = :site_id AND inner_path = :inner_path LIMIT 1",
                {"site_id": self.contents.db.site_ids[self.site.address], "inner_path": inner_path}
            )
        else:
            res = self.contents.db.execute(
                "SELECT is_downloaded FROM file_optional WHERE site_id = :site_id AND hash_id = :hash_id AND is_downloaded = 1 LIMIT 1",
                {"site_id": self.contents.db.site_ids[self.site.address], "hash_id": hash_id}
            )
        row = res.fetchone()
        return bool(row and row["is_downloaded"])

    def isPinned(self, inner_path):
        """Return True if the file is flagged as pinned; results are memoized per inner_path."""
        if inner_path in self.cache_is_pinned:
            self.site.log.debug("Cached is pinned: %s" % inner_path)
            return self.cache_is_pinned[inner_path]

        res = self.contents.db.execute(
            "SELECT is_pinned FROM file_optional WHERE site_id = :site_id AND inner_path = :inner_path LIMIT 1",
            {"site_id": self.contents.db.site_ids[self.site.address], "inner_path": inner_path}
        )
        row = res.fetchone()

        # A missing row counts as not pinned
        is_pinned = bool(row and row[0])

        self.cache_is_pinned[inner_path] = is_pinned
        self.site.log.debug("Cache set is pinned: %s %s" % (inner_path, is_pinned))

        return is_pinned

    def setPin(self, inner_path, is_pinned):
        """Store the pinned flag for ``inner_path`` and drop the isPinned() memo."""
        content_db = self.contents.db
        site_id = content_db.site_ids[self.site.address]
        content_db.execute("UPDATE file_optional SET is_pinned = %d WHERE ?" % is_pinned, {"site_id": site_id, "inner_path": inner_path})
        self.cache_is_pinned = {}

    def optionalDelete(self, inner_path):
        """Delete the optional file through the parent unless it is pinned (then return False)."""
        if self.isPinned(inner_path):
            self.site.log.debug("Skip deleting pinned optional file: %s" % inner_path)
            return False
        else:
            return super(ContentManagerPlugin, self).optionalDelete(inner_path)


@PluginManager.registerTo("WorkerManager")
class WorkerManagerPlugin(object):
    """WorkerManager extension that flushes delayed db queries when the task list empties."""

    def doneTask(self, task):
        super(WorkerManagerPlugin, self).doneTask(task)

        if task["optional_hash_id"] and not self.tasks:  # Execute delayed queries immediately after tasks finished
            ContentDbPlugin.content_db.processDelayed()


@PluginManager.registerTo("UiRequest")
class UiRequestPlugin(object):
    """UiRequest extension that records web accesses to optional files."""

    def parsePath(self, path):
        """Parse the path through the parent and note optional file accesses in ``access_log``."""
        global access_log
        path_parts = super(UiRequestPlugin, self).parsePath(path)
        if path_parts:
            site_id = ContentDbPlugin.content_db.site_ids.get(path_parts["request_address"])
            if site_id:
                if ContentDbPlugin.content_db.isOptionalFile(site_id, path_parts["inner_path"]):
                    access_log[site_id][path_parts["inner_path"]] = 1
        return path_parts


@PluginManager.registerTo("FileRequest")
class FileRequestPlugin(object):
    """FileRequest extension that accumulates bytes sent per optional file."""

    def actionGetFile(self, params):
        stats = super(FileRequestPlugin, self).actionGetFile(params)
        self.recordFileRequest(params["site"], params["inner_path"], stats)
        return stats

    def actionStreamFile(self, params):
        stats = super(FileRequestPlugin, self).actionStreamFile(params)
        self.recordFileRequest(params["site"], params["inner_path"], stats)
        return stats

    def recordFileRequest(self, site_address, inner_path, stats):
        """Add ``stats["bytes_sent"]`` to ``request_log`` for an optional file.

        Returns ``False`` when ``stats`` is falsy, otherwise ``None``.
        """
        if not stats:
            # Only track the last request of files
            return False
        # .get() so a site without a site_id is skipped by the check below
        # instead of raising KeyError (same lookup style as UiRequestPlugin.parsePath)
        site_id = ContentDbPlugin.content_db.site_ids.get(site_address)
        if site_id and ContentDbPlugin.content_db.isOptionalFile(site_id, inner_path):
            request_log[site_id][inner_path] += stats["bytes_sent"]


@PluginManager.registerTo("Site")
class SitePlugin(object):
    """Site extension adding optional-help and pinned big file handling."""

    def isDownloadable(self, inner_path):
        """Return the parent's answer if truthy, else True when the path starts with an ``optional_help`` key."""
        is_downloadable = super(SitePlugin, self).isDownloadable(inner_path)
        if is_downloadable:
            return is_downloadable

        for path in self.settings.get("optional_help", {}):
            if inner_path.startswith(path):
                return True

        return False

    def fileForgot(self, inner_path):
        """Skip forgetting a big file piece whose file is pinned (returns False); else delegate."""
        if "|" in inner_path and self.content_manager.isPinned(re.sub(r"\|.*", "", inner_path)):
            self.log.debug("File %s is pinned, no fileForgot" % inner_path)
            return False
        else:
            return super(SitePlugin, self).fileForgot(inner_path)

    def fileDone(self, inner_path):
        """Reset the retry counters of sibling pieces when an often-failed big file piece finishes."""
        if "|" in inner_path and self.bad_files.get(inner_path, 0) > 5:  # Idle optional file done
            inner_path_file = re.sub(r"\|.*", "", inner_path)
            num_changed = 0
            # Only values of existing keys are reassigned, so iterating the dict here is safe
            for key, val in self.bad_files.items():
                if key.startswith(inner_path_file) and val > 1:
                    self.bad_files[key] = 1
                    num_changed += 1
            self.log.debug("Idle optional file piece done, changed retry number of %s pieces." % num_changed)
            if num_changed:
                gevent.spawn(self.retryBadFiles)

        return super(SitePlugin, self).fileDone(inner_path)


@PluginManager.registerTo("ConfigPlugin")
class ConfigPlugin(object):
    """Config extension registering the OptionalManager command line arguments."""

    def createArguments(self):
        group = self.parser.add_argument_group("OptionalManager plugin")
        group.add_argument('--optional_limit', help='Limit total size of optional files', default="10%", metavar="GB or free space %")
        group.add_argument('--optional_limit_exclude_minsize', help='Exclude files larger than this limit from optional size limit calculation', default=20, metavar="MB", type=int)

        return super(ConfigPlugin, self).createArguments()