Forking what is left of ZeroNet and hopefully adding an AT Proto Frontend/Proxy
1import time
2import re
3import html
4import os
5
6from Plugin import PluginManager
7from Translate import Translate
8from Config import config
9from util.Flag import flag
10
11from .ContentFilterStorage import ContentFilterStorage
12
13
14plugin_dir = os.path.dirname(__file__)
15
16if "_" not in locals():
17 _ = Translate(plugin_dir + "/languages/")
18
19
20@PluginManager.registerTo("SiteManager")
21class SiteManagerPlugin(object):
22 def load(self, *args, **kwargs):
23 global filter_storage
24 super(SiteManagerPlugin, self).load(*args, **kwargs)
25 filter_storage = ContentFilterStorage(site_manager=self)
26
27 def add(self, address, *args, **kwargs):
28 should_ignore_block = kwargs.get("ignore_block") or kwargs.get("settings")
29 if should_ignore_block:
30 block_details = None
31 elif filter_storage.isSiteblocked(address):
32 block_details = filter_storage.getSiteblockDetails(address)
33 else:
34 address_hashed = filter_storage.getSiteAddressHashed(address)
35 if filter_storage.isSiteblocked(address_hashed):
36 block_details = filter_storage.getSiteblockDetails(address_hashed)
37 else:
38 block_details = None
39
40 if block_details:
41 raise Exception("Site blocked: %s" % html.escape(block_details.get("reason", "unknown reason")))
42 else:
43 return super(SiteManagerPlugin, self).add(address, *args, **kwargs)
44
45
46@PluginManager.registerTo("UiWebsocket")
47class UiWebsocketPlugin(object):
48 # Mute
49 def cbMuteAdd(self, to, auth_address, cert_user_id, reason):
50 filter_storage.file_content["mutes"][auth_address] = {
51 "cert_user_id": cert_user_id, "reason": reason, "source": self.site.address, "date_added": time.time()
52 }
53 filter_storage.save()
54 filter_storage.changeDbs(auth_address, "remove")
55 self.response(to, "ok")
56
57 @flag.no_multiuser
58 def actionMuteAdd(self, to, auth_address, cert_user_id, reason):
59 if "ADMIN" in self.getPermissions(to):
60 self.cbMuteAdd(to, auth_address, cert_user_id, reason)
61 else:
62 self.cmd(
63 "confirm",
64 [_["Hide all content from <b>%s</b>?"] % html.escape(cert_user_id), _["Mute"]],
65 lambda res: self.cbMuteAdd(to, auth_address, cert_user_id, reason)
66 )
67
68 @flag.no_multiuser
69 def cbMuteRemove(self, to, auth_address):
70 del filter_storage.file_content["mutes"][auth_address]
71 filter_storage.save()
72 filter_storage.changeDbs(auth_address, "load")
73 self.response(to, "ok")
74
75 @flag.no_multiuser
76 def actionMuteRemove(self, to, auth_address):
77 if "ADMIN" in self.getPermissions(to):
78 self.cbMuteRemove(to, auth_address)
79 else:
80 cert_user_id = html.escape(filter_storage.file_content["mutes"][auth_address]["cert_user_id"])
81 self.cmd(
82 "confirm",
83 [_["Unmute <b>%s</b>?"] % cert_user_id, _["Unmute"]],
84 lambda res: self.cbMuteRemove(to, auth_address)
85 )
86
87 @flag.admin
88 def actionMuteList(self, to):
89 self.response(to, filter_storage.file_content["mutes"])
90
91 # Siteblock
92 @flag.no_multiuser
93 @flag.admin
94 def actionSiteblockIgnoreAddSite(self, to, site_address):
95 if site_address in filter_storage.site_manager.sites:
96 return {"error": "Site already added"}
97 else:
98 if filter_storage.site_manager.need(site_address, ignore_block=True):
99 return "ok"
100 else:
101 return {"error": "Invalid address"}
102
103 @flag.no_multiuser
104 @flag.admin
105 def actionSiteblockAdd(self, to, site_address, reason=None):
106 filter_storage.file_content["siteblocks"][site_address] = {"date_added": time.time(), "reason": reason}
107 filter_storage.save()
108 self.response(to, "ok")
109
110 @flag.no_multiuser
111 @flag.admin
112 def actionSiteblockRemove(self, to, site_address):
113 del filter_storage.file_content["siteblocks"][site_address]
114 filter_storage.save()
115 self.response(to, "ok")
116
117 @flag.admin
118 def actionSiteblockList(self, to):
119 self.response(to, filter_storage.file_content["siteblocks"])
120
121 @flag.admin
122 def actionSiteblockGet(self, to, site_address):
123 if filter_storage.isSiteblocked(site_address):
124 res = filter_storage.getSiteblockDetails(site_address)
125 else:
126 site_address_hashed = filter_storage.getSiteAddressHashed(site_address)
127 if filter_storage.isSiteblocked(site_address_hashed):
128 res = filter_storage.getSiteblockDetails(site_address_hashed)
129 else:
130 res = {"error": "Site block not found"}
131 self.response(to, res)
132
133 # Include
134 @flag.no_multiuser
135 def actionFilterIncludeAdd(self, to, inner_path, description=None, address=None):
136 if address:
137 if "ADMIN" not in self.getPermissions(to):
138 return self.response(to, {"error": "Forbidden: Only ADMIN sites can manage different site include"})
139 site = self.server.sites[address]
140 else:
141 address = self.site.address
142 site = self.site
143
144 if "ADMIN" in self.getPermissions(to):
145 self.cbFilterIncludeAdd(to, True, address, inner_path, description)
146 else:
147 content = site.storage.loadJson(inner_path)
148 title = _["New shared global content filter: <b>%s</b> (%s sites, %s users)"] % (
149 html.escape(inner_path), len(content.get("siteblocks", {})), len(content.get("mutes", {}))
150 )
151
152 self.cmd(
153 "confirm",
154 [title, "Add"],
155 lambda res: self.cbFilterIncludeAdd(to, res, address, inner_path, description)
156 )
157
158 def cbFilterIncludeAdd(self, to, res, address, inner_path, description):
159 if not res:
160 self.response(to, res)
161 return False
162
163 filter_storage.includeAdd(address, inner_path, description)
164 self.response(to, "ok")
165
166 @flag.no_multiuser
167 def actionFilterIncludeRemove(self, to, inner_path, address=None):
168 if address:
169 if "ADMIN" not in self.getPermissions(to):
170 return self.response(to, {"error": "Forbidden: Only ADMIN sites can manage different site include"})
171 else:
172 address = self.site.address
173
174 key = "%s/%s" % (address, inner_path)
175 if key not in filter_storage.file_content["includes"]:
176 self.response(to, {"error": "Include not found"})
177 filter_storage.includeRemove(address, inner_path)
178 self.response(to, "ok")
179
180 def actionFilterIncludeList(self, to, all_sites=False, filters=False):
181 if all_sites and "ADMIN" not in self.getPermissions(to):
182 return self.response(to, {"error": "Forbidden: Only ADMIN sites can list all sites includes"})
183
184 back = []
185 includes = filter_storage.file_content.get("includes", {}).values()
186 for include in includes:
187 if not all_sites and include["address"] != self.site.address:
188 continue
189 if filters:
190 include = dict(include) # Don't modify original file_content
191 include_site = filter_storage.site_manager.get(include["address"])
192 if not include_site:
193 continue
194 content = include_site.storage.loadJson(include["inner_path"])
195 include["mutes"] = content.get("mutes", {})
196 include["siteblocks"] = content.get("siteblocks", {})
197 back.append(include)
198 self.response(to, back)
199
200
201@PluginManager.registerTo("SiteStorage")
202class SiteStoragePlugin(object):
203 def updateDbFile(self, inner_path, file=None, cur=None):
204 if file is not False: # File deletion always allowed
205 # Find for bitcoin addresses in file path
206 matches = re.findall("/(1[A-Za-z0-9]{26,35})/", inner_path)
207 # Check if any of the adresses are in the mute list
208 for auth_address in matches:
209 if filter_storage.isMuted(auth_address):
210 self.log.debug("Mute match: %s, ignoring %s" % (auth_address, inner_path))
211 return False
212
213 return super(SiteStoragePlugin, self).updateDbFile(inner_path, file=file, cur=cur)
214
215 def onUpdated(self, inner_path, file=None):
216 file_path = "%s/%s" % (self.site.address, inner_path)
217 if file_path in filter_storage.file_content["includes"]:
218 self.log.debug("Filter file updated: %s" % inner_path)
219 filter_storage.includeUpdateAll()
220 return super(SiteStoragePlugin, self).onUpdated(inner_path, file=file)
221
222
223@PluginManager.registerTo("UiRequest")
224class UiRequestPlugin(object):
225 def actionWrapper(self, path, extra_headers=None):
226 match = re.match(r"/(?P<address>[A-Za-z0-9\._-]+)(?P<inner_path>/.*|$)", path)
227 if not match:
228 return False
229 address = match.group("address")
230
231 if self.server.site_manager.get(address): # Site already exists
232 return super(UiRequestPlugin, self).actionWrapper(path, extra_headers)
233
234 if self.isDomain(address):
235 address = self.resolveDomain(address)
236
237 if address:
238 address_hashed = filter_storage.getSiteAddressHashed(address)
239 else:
240 address_hashed = None
241
242 if filter_storage.isSiteblocked(address) or filter_storage.isSiteblocked(address_hashed):
243 site = self.server.site_manager.get(config.homepage)
244 if not extra_headers:
245 extra_headers = {}
246
247 script_nonce = self.getScriptNonce()
248
249 self.sendHeader(extra_headers=extra_headers, script_nonce=script_nonce)
250 return iter([super(UiRequestPlugin, self).renderWrapper(
251 site, path, "uimedia/plugins/contentfilter/blocklisted.html?address=" + address,
252 "Blacklisted site", extra_headers, show_loadingscreen=False, script_nonce=script_nonce
253 )])
254 else:
255 return super(UiRequestPlugin, self).actionWrapper(path, extra_headers)
256
257 def actionUiMedia(self, path, *args, **kwargs):
258 if path.startswith("/uimedia/plugins/contentfilter/"):
259 file_path = path.replace("/uimedia/plugins/contentfilter/", plugin_dir + "/media/")
260 return self.actionFile(file_path)
261 else:
262 return super(UiRequestPlugin, self).actionUiMedia(path)