Forking what is left of ZeroNet and hopefully adding an AT Proto Frontend/Proxy
1import time
2import re
3import collections
4
5import gevent
6
7from util import helper
8from Plugin import PluginManager
9from . import ContentDbPlugin
10
11
# We can only import plugin host classes after the plugins are loaded
@PluginManager.afterLoad
def importPluginnedClasses():
    """Bind the host application's ``config`` object as a global of this module.

    The import lives inside the function (registered through
    ``PluginManager.afterLoad``) instead of at the top of the file, so it only
    runs when this hook is invoked.
    """
    global config
    import Config as config_module
    config = config_module.config
17
18
def processAccessLog():
    """Flush the buffered web accesses into ``file_optional.time_accessed``.

    The module-level ``access_log`` buffer is swapped for a fresh, empty one
    and one UPDATE is issued per site, passing the site id and the list of
    accessed inner paths as the query parameters.

    Returns:
        False if the content database has no connection, None otherwise
        (also when the buffer was empty and nothing had to be written).
    """
    global access_log
    if not access_log:
        return None

    db = ContentDbPlugin.content_db
    if not db.conn:
        return False

    started = time.time()
    # Detach the pending records so accesses logged from now on go to a new buffer
    pending, access_log = access_log, collections.defaultdict(dict)
    query = "UPDATE file_optional SET time_accessed = %s WHERE ?" % int(time.time())
    updated = 0
    for site_id, inner_paths in pending.items():
        db.execute(query, {"site_id": site_id, "inner_path": list(inner_paths.keys())})
        updated += len(inner_paths)

    db.log.debug("Inserted %s web request stat in %.3fs" % (updated, time.time() - started))
39
40
def processRequestLog():
    """Flush the buffered upload counters into ``file_optional.uploaded``.

    The module-level ``request_log`` buffer is swapped for a fresh, empty one
    and one UPDATE is issued per (site, inner path) pair, adding the buffered
    amount to the stored ``uploaded`` value.

    Returns:
        False if the content database has no connection, None otherwise
        (also when the buffer was empty and nothing had to be written).
    """
    global request_log
    if not request_log:
        return None

    db = ContentDbPlugin.content_db
    if not db.conn:
        return False

    started = time.time()
    # Detach the pending counters so requests logged from now on go to a new buffer
    pending = request_log
    request_log = collections.defaultdict(lambda: collections.defaultdict(int))  # {site_id: {inner_path: uploaded}}
    updated = 0
    for site_id, uploaded_by_path in pending.items():
        for inner_path, uploaded in uploaded_by_path.items():
            params = {"site_id": site_id, "inner_path": inner_path}
            db.execute("UPDATE file_optional SET uploaded = uploaded + %s WHERE ?" % uploaded, params)
            updated += 1
    db.log.debug("Inserted %s file request stat in %.3fs" % (updated, time.time() - started))
60
61
# Create the buffers and register the flush functions only when they do not
# exist yet, to keep the collected data between module reloads
if "access_log" not in locals():
    # {site_id: {inner_path: accumulated stats["bytes_sent"]}}
    request_log = collections.defaultdict(lambda: collections.defaultdict(int))
    # {site_id: {inner_path1: 1, inner_path2: 1...}}
    access_log = collections.defaultdict(dict)
    # NOTE(review): first argument is presumably the interval in seconds - confirm in util.helper
    helper.timer(61, processAccessLog)
    helper.timer(60, processRequestLog)
67
68
@PluginManager.registerTo("ContentManager")
class ContentManagerPlugin(object):
    """Plugin class registered to "ContentManager".

    Keeps the ``file_optional`` table in step with optional file download,
    removal, rename and pin events, and answers ``isDownloaded`` /
    ``isPinned`` from that table.
    """

    def __init__(self, *args, **kwargs):
        # inner_path -> bool results of isPinned(); replaced by an empty dict
        # whenever removal, rename or setPin may have changed the stored value
        self.cache_is_pinned = {}
        super(ContentManagerPlugin, self).__init__(*args, **kwargs)

    def optionalDownloaded(self, inner_path, hash_id, size=None, own=False):
        """Queue the "downloaded" update for the file, then call the parent implementation.

        For a big file piece path the part before "|" identifies the row.
        Only rows with ``is_downloaded = 0`` are touched by the query.
        """
        if "|" in inner_path:  # Big file piece
            file_inner_path, _file_range = inner_path.split("|")
        else:
            file_inner_path = inner_path

        db = self.contents.db
        query_params = {
            "now": int(time.time()),
            "site_id": db.site_ids[self.site.address],
            "inner_path": file_inner_path
        }
        db.executeDelayed(
            "UPDATE file_optional SET time_downloaded = :now, is_downloaded = 1, peer = peer + 1 WHERE site_id = :site_id AND inner_path = :inner_path AND is_downloaded = 0",
            query_params
        )

        return super(ContentManagerPlugin, self).optionalDownloaded(inner_path, hash_id, size, own)

    def optionalRemoved(self, inner_path, hash_id, size=None):
        """Clear the downloaded/pinned flags of the file and forward the removal.

        The parent implementation is only called when the UPDATE changed a
        row; otherwise False is returned. The pin cache is dropped either way.
        """
        db = self.contents.db
        cursor = db.execute(
            "UPDATE file_optional SET is_downloaded = 0, is_pinned = 0, peer = peer - 1 WHERE site_id = :site_id AND inner_path = :inner_path AND is_downloaded = 1",
            {"site_id": db.site_ids[self.site.address], "inner_path": inner_path}
        )

        back = False
        if cursor.rowcount > 0:
            back = super(ContentManagerPlugin, self).optionalRemoved(inner_path, hash_id, size)
            # Re-add to hashfield if we have other file with the same hash_id
            if self.isDownloaded(hash_id=hash_id, force_check_db=True):
                self.hashfield.appendHashId(hash_id)

        self.cache_is_pinned = {}
        return back

    def optionalRenamed(self, inner_path_old, inner_path_new):
        """Call the parent implementation, drop the pin cache and rename the row in ``file_optional``."""
        back = super(ContentManagerPlugin, self).optionalRenamed(inner_path_old, inner_path_new)
        self.cache_is_pinned = {}

        db = self.contents.db
        rename_params = {
            "site_id": db.site_ids[self.site.address],
            "inner_path_old": inner_path_old,
            "inner_path_new": inner_path_new
        }
        db.execute(
            "UPDATE file_optional SET inner_path = :inner_path_new WHERE site_id = :site_id AND inner_path = :inner_path_old",
            rename_params
        )
        return back

    def isDownloaded(self, inner_path=None, hash_id=None, force_check_db=False):
        """Return True if the database marks the file as downloaded.

        The lookup is done by ``inner_path`` when it is given, otherwise by
        ``hash_id``. Unless ``force_check_db`` is set, a ``hash_id`` missing
        from ``self.hashfield`` yields False without querying the database.
        """
        if hash_id and not force_check_db and hash_id not in self.hashfield:
            return False

        db = self.contents.db
        params = {"site_id": db.site_ids[self.site.address]}
        if inner_path:
            query = "SELECT is_downloaded FROM file_optional WHERE site_id = :site_id AND inner_path = :inner_path LIMIT 1"
            params["inner_path"] = inner_path
        else:
            query = "SELECT is_downloaded FROM file_optional WHERE site_id = :site_id AND hash_id = :hash_id AND is_downloaded = 1 LIMIT 1"
            params["hash_id"] = hash_id

        row = db.execute(query, params).fetchone()
        return bool(row and row["is_downloaded"])

    def isPinned(self, inner_path):
        """Return whether the file is pinned, using ``self.cache_is_pinned`` before the database."""
        if inner_path in self.cache_is_pinned:
            self.site.log.debug("Cached is pinned: %s" % inner_path)
            return self.cache_is_pinned[inner_path]

        db = self.contents.db
        row = db.execute(
            "SELECT is_pinned FROM file_optional WHERE site_id = :site_id AND inner_path = :inner_path LIMIT 1",
            {"site_id": db.site_ids[self.site.address], "inner_path": inner_path}
        ).fetchone()

        # A missing row counts as not pinned
        is_pinned = bool(row and row[0])

        self.cache_is_pinned[inner_path] = is_pinned
        self.site.log.debug("Cache set is pinned: %s %s" % (inner_path, is_pinned))

        return is_pinned

    def setPin(self, inner_path, is_pinned):
        """Store the pin flag for ``inner_path`` in ``file_optional`` and drop the pin cache."""
        db = self.contents.db
        params = {"site_id": db.site_ids[self.site.address], "inner_path": inner_path}
        db.execute("UPDATE file_optional SET is_pinned = %d WHERE ?" % is_pinned, params)
        self.cache_is_pinned = {}

    def optionalDelete(self, inner_path):
        """Delete the optional file through the parent implementation unless it is pinned (then return False)."""
        if not self.isPinned(inner_path):
            return super(ContentManagerPlugin, self).optionalDelete(inner_path)

        self.site.log.debug("Skip deleting pinned optional file: %s" % inner_path)
        return False
166
167
@PluginManager.registerTo("WorkerManager")
class WorkerManagerPlugin(object):
    """Plugin class registered to "WorkerManager"."""

    def doneTask(self, task):
        """Finish the task through the parent implementation, then flush the delayed queries.

        The flush only happens when the finished task has an
        ``optional_hash_id`` and ``self.tasks`` is empty afterwards.
        """
        super(WorkerManagerPlugin, self).doneTask(task)

        if not task["optional_hash_id"] or self.tasks:
            return

        # Execute delayed queries immediately after tasks finished
        ContentDbPlugin.content_db.processDelayed()
175
176
@PluginManager.registerTo("UiRequest")
class UiRequestPlugin(object):
    """Plugin class registered to "UiRequest"."""

    def parsePath(self, path):
        """Parse the path through the parent implementation and note accesses to optional files.

        When the parsed request address has a site id and the inner path is
        an optional file, the pair is marked in the module-level
        ``access_log`` buffer. The parent's result is returned unchanged.
        """
        path_parts = super(UiRequestPlugin, self).parsePath(path)
        if not path_parts:
            return path_parts

        content_db = ContentDbPlugin.content_db
        site_id = content_db.site_ids.get(path_parts["request_address"])
        if site_id and content_db.isOptionalFile(site_id, path_parts["inner_path"]):
            access_log[site_id][path_parts["inner_path"]] = 1
        return path_parts
188
189
@PluginManager.registerTo("FileRequest")
class FileRequestPlugin(object):
    """Plugin class registered to "FileRequest"; counts the bytes sent for optional files."""

    def actionGetFile(self, params):
        """Serve the file through the parent implementation and record the request; returns the parent's result."""
        stats = super(FileRequestPlugin, self).actionGetFile(params)
        self.recordFileRequest(params["site"], params["inner_path"], stats)
        return stats

    def actionStreamFile(self, params):
        """Stream the file through the parent implementation and record the request; returns the parent's result."""
        stats = super(FileRequestPlugin, self).actionStreamFile(params)
        self.recordFileRequest(params["site"], params["inner_path"], stats)
        return stats

    def recordFileRequest(self, site_address, inner_path, stats):
        """Add ``stats["bytes_sent"]`` to the module-level ``request_log`` buffer for an optional file.

        Args:
            site_address: Address of the requested site, looked up in ``content_db.site_ids``.
            inner_path: Path of the requested file inside the site.
            stats: Result of the parent action; nothing is recorded when it is falsy.

        Returns:
            False when ``stats`` is falsy, None otherwise.
        """
        if not stats:
            # Only track the last request of files
            # NOTE(review): presumably the parent only returns stats for the final part - confirm
            return False

        content_db = ContentDbPlugin.content_db
        # .get() instead of indexing: an address without a site id must be skipped
        # (as the check below intends) rather than raise KeyError
        site_id = content_db.site_ids.get(site_address)
        if site_id and content_db.isOptionalFile(site_id, inner_path):
            request_log[site_id][inner_path] += stats["bytes_sent"]
209
210
@PluginManager.registerTo("Site")
class SitePlugin(object):
    """Plugin class registered to "Site"."""

    def isDownloadable(self, inner_path):
        """Return the parent's answer when truthy, else whether the path starts with an "optional_help" key."""
        parent_answer = super(SitePlugin, self).isDownloadable(inner_path)
        if parent_answer:
            return parent_answer

        help_paths = self.settings.get("optional_help", {}).keys()
        return any(inner_path.startswith(help_path) for help_path in help_paths)

    def fileForgot(self, inner_path):
        """Forward to the parent implementation, except for pieces ("file|range") of a pinned file (returns False)."""
        if "|" not in inner_path or not self.content_manager.isPinned(re.sub(r"\|.*", "", inner_path)):
            return super(SitePlugin, self).fileForgot(inner_path)

        self.log.debug("File %s is pinned, no fileForgot" % inner_path)
        return False

    def fileDone(self, inner_path):
        """Reset the retry counters of sibling pieces when an idle big file piece finishes.

        Applies only to piece paths whose ``bad_files`` counter is above 5:
        every ``bad_files`` key starting with the file's path and holding a
        value above 1 is set back to 1, and ``retryBadFiles`` is spawned if
        anything changed. The parent implementation is always called last.
        """
        if "|" in inner_path and self.bad_files.get(inner_path, 0) > 5:  # Idle optional file done
            file_prefix = re.sub(r"\|.*", "", inner_path)
            stalled_keys = [
                key for key, retries in self.bad_files.items()
                if key.startswith(file_prefix) and retries > 1
            ]
            for key in stalled_keys:
                self.bad_files[key] = 1
            num_changed = len(stalled_keys)
            self.log.debug("Idle optional file piece done, changed retry number of %s pieces." % num_changed)
            if num_changed:
                gevent.spawn(self.retryBadFiles)

        return super(SitePlugin, self).fileDone(inner_path)
244
245
@PluginManager.registerTo("ConfigPlugin")
class ConfigPlugin(object):
    """Plugin class registered to "ConfigPlugin"; adds the OptionalManager command line options."""

    def createArguments(self):
        """Add the "OptionalManager plugin" argument group, then return the parent's createArguments() result."""
        optional_group = self.parser.add_argument_group("OptionalManager plugin")
        optional_group.add_argument(
            '--optional_limit',
            help='Limit total size of optional files',
            default="10%",
            metavar="GB or free space %"
        )
        optional_group.add_argument(
            '--optional_limit_exclude_minsize',
            help='Exclude files larger than this limit from optional size limit calculation',
            default=20,
            metavar="MB",
            type=int
        )

        return super(ConfigPlugin, self).createArguments()
253 return super(ConfigPlugin, self).createArguments()