Forking what is left of ZeroNet and hopefully adding an AT Proto Frontend/Proxy
1import time
2
3import gevent
4import gevent.lock
5
6from Debug import Debug
7from Config import config
8from Content.ContentManager import VerifyError
9
10
class WorkerDownloadError(Exception):
    """Raised by Worker.downloadTask when fetching a file from the peer
    fails or yields an empty response."""
13
14
class WorkerIOError(Exception):
    """Raised by Worker.writeTask when writing a downloaded file to the
    site's storage fails."""
17
18
class WorkerStop(Exception):
    """Raised to end a worker's download loop: the worker got disabled or
    its peer produced too many errors."""
21
22
class Worker(object):
    """Download files for one site from a single peer.

    ``start()`` runs ``downloader()`` in a gevent greenlet. The loop asks the
    manager for tasks, downloads each file from ``peer``, verifies it with the
    site's content manager, writes it to the site's storage and reports the
    outcome back to the manager.
    """

    def __init__(self, manager, peer):
        self.manager = manager  # Hands out tasks and tracks workers
        self.peer = peer  # Peer every file of this worker is downloaded from
        self.task = None  # Task currently being processed
        self.key = None  # Log identifier; presumably assigned by the manager
        self.running = False
        self.thread = None  # Greenlet running downloader(), set by start()
        self.num_downloaded = 0
        self.num_failed = 0

    def __str__(self):
        return "Worker %s %s" % (self.manager.site.address_short, self.key)

    def __repr__(self):
        return "<%s>" % self.__str__()

    def waitForTask(self, task, timeout):
        """Wait up to about ``timeout`` seconds for other workers on ``task``.

        Polls every 0.1 s. Stops early when the task is done or has no
        workers left. Once per second it also stops when the task's first
        worker has no connection or has received nothing for over a second.

        Returns:
            Always True.
        """
        for sleep_i in range(1, timeout * 10):
            time.sleep(0.1)
            if task["done"] or task["workers_num"] == 0:
                if config.verbose:
                    self.manager.log.debug("%s: %s, picked task free after %ss sleep. (done: %s)" % (
                        self.key, task["inner_path"], 0.1 * sleep_i, task["done"]
                    ))
                break

            if sleep_i % 10 == 0:
                workers = self.manager.findWorkers(task)
                if not workers or not workers[0].peer.connection:
                    break
                worker_idle = time.time() - workers[0].peer.connection.last_recv_time
                if worker_idle > 1:
                    if config.verbose:
                        self.manager.log.debug("%s: %s, worker %s seems idle, picked up task after %ss sleep. (done: %s)" % (
                            self.key, task["inner_path"], workers[0].key, 0.1 * sleep_i, task["done"]
                        ))
                    break
        return True

    def pickTask(self):
        """Get the next task for this worker's peer from the manager.

        Retries once after 0.1 s if nothing is available. Stamps
        ``time_started`` on first pickup. If other workers already run the
        task, waits for them via waitForTask. The wait is 1 s, or 3 s when
        the task has ``peers`` (an update), and is doubled for files over
        100 MB.

        Returns:
            The task dict, or False when there is no task left.
        """
        task = self.manager.getTask(self.peer)
        if not task:  # No more task
            time.sleep(0.1)  # Wait a bit for new tasks
            task = self.manager.getTask(self.peer)
            if not task:  # Still no task, stop it
                stats = "downloaded files: %s, failed: %s" % (self.num_downloaded, self.num_failed)
                self.manager.log.debug("%s: No task found, stopping (%s)" % (self.key, stats))
                return False

        if not task["time_started"]:
            task["time_started"] = time.time()  # Task started now

        if task["workers_num"] > 0:  # Wait a bit if someone already working on it
            if task["peers"]:  # It's an update
                timeout = 3
            else:
                timeout = 1

            if task["size"] > 100 * 1024 * 1024:
                timeout = timeout * 2

            if config.verbose:
                self.manager.log.debug("%s: Someone already working on %s (pri: %s), sleeping %s sec..." % (
                    self.key, task["inner_path"], task["priority"], timeout
                ))

            self.waitForTask(task, timeout)
        return task

    def downloadTask(self, task):
        """Fetch the file of ``task`` from the peer.

        Returns:
            The buffer returned by ``peer.getFile``.

        Raises:
            WorkerDownloadError: ``getFile`` raised, or returned an empty result.
        """
        try:
            buff = self.peer.getFile(task["site"].address, task["inner_path"], task["size"])
        except Exception as err:
            self.manager.log.debug("%s: getFile error: %s" % (self.key, err))
            raise WorkerDownloadError(str(err)) from err

        if not buff:
            raise WorkerDownloadError("No response")

        return buff

    def getTaskLock(self, task):
        """Return the write lock of ``task``, creating it on first use.

        The lock lives in the task dict, so every worker of the task shares it.
        """
        if task["lock"] is None:
            task["lock"] = gevent.lock.Semaphore()
        return task["lock"]

    def writeTask(self, task, buff):
        """Rewind ``buff`` and write it to the site's storage.

        Raises:
            WorkerIOError: Wraps any exception raised by the storage write.
        """
        buff.seek(0)
        try:
            task["site"].storage.write(task["inner_path"], buff)
        except Exception as err:
            # Debug.Notify marks a deliberate abort and is logged at debug
            # level only. isinstance() also matches subclasses. The kill types
            # built by Debug.createNotifyType in skip()/stop() are presumably
            # such subclasses; the old `type(err) ==` check missed them.
            if isinstance(err, Debug.Notify):
                self.manager.log.debug("%s: Write aborted: %s (%s: %s)" % (self.key, task["inner_path"], type(err), err))
            else:
                self.manager.log.error("%s: Error writing: %s (%s: %s)" % (self.key, task["inner_path"], type(err), err))
            raise WorkerIOError(str(err)) from err

    def onTaskVerifyFail(self, task, error_message):
        """Record that downloading/verifying ``task`` from this peer failed.

        Raises:
            WorkerStop: ``peer.hash_failed`` reached max(number of manager
                tasks, 3), or ``peer.connection_error`` exceeds 10.
        """
        self.num_failed += 1
        if self.manager.started_task_num < 50 or config.verbose:
            self.manager.log.debug(
                "%s: Verify failed: %s, error: %s, failed peers: %s, workers: %s" %
                (self.key, task["inner_path"], error_message, len(task["failed"]), task["workers_num"])
            )
        task["failed"].append(self.peer)
        self.peer.hash_failed += 1
        if self.peer.hash_failed >= max(len(self.manager.tasks), 3) or self.peer.connection_error > 10:
            # Broken peer: More fails than tasks number but atleast 3
            raise WorkerStop(
                "Too many errors (hash failed: %s, connection error: %s)" %
                (self.peer.hash_failed, self.peer.connection_error)
            )

    def handleTask(self, task):
        """Download, verify and store the file of ``task``.

        Returns:
            None if the task got done by someone else during the download.
            False if the download or verification failed.
            True otherwise.

        Raises:
            WorkerStop: The worker got disabled, or the peer failed too often.
        """
        download_err = write_err = False

        write_lock = None  # Set only once this worker actually holds the lock
        try:
            try:
                buff = self.downloadTask(task)

                if task["done"] is True:  # Task done, try to find new one
                    return None

                if self.running is False:  # Worker no longer needed or got killed
                    self.manager.log.debug("%s: No longer needed, returning: %s" % (self.key, task["inner_path"]))
                    raise WorkerStop("Running got disabled")

                task_lock = self.getTaskLock(task)
                task_lock.acquire()
                # Record the lock only after acquire() succeeded, so the
                # finally clause can never release another worker's lock
                write_lock = task_lock
                # A None result from verifyFile() is treated as "file unchanged"
                is_same = task["site"].content_manager.verifyFile(task["inner_path"], buff) is None
                is_valid = True
            except (WorkerDownloadError, VerifyError) as err:
                download_err = err
                is_valid = False
                is_same = False

            if is_valid and not is_same:
                if self.manager.started_task_num < 50 or task["priority"] > 10 or config.verbose:
                    self.manager.log.debug("%s: Verify correct: %s" % (self.key, task["inner_path"]))
                try:
                    self.writeTask(task, buff)
                except WorkerIOError as err:
                    write_err = err

            if not task["done"]:
                if write_err:
                    self.manager.failTask(task, reason="Write error")
                    self.num_failed += 1
                    self.manager.log.error("%s: Error writing %s: %s" % (self.key, task["inner_path"], write_err))
                elif is_valid:
                    self.manager.doneTask(task)
                    self.num_downloaded += 1
        finally:
            # Release even on unexpected exceptions or a greenlet kill;
            # otherwise the other workers of this task would block forever
            if write_lock is not None and write_lock.locked():
                write_lock.release()

        if not is_valid:
            self.onTaskVerifyFail(task, download_err)
            time.sleep(1)
            return False

        return True

    def downloader(self):
        """Worker greenlet main loop.

        Processes tasks until none is left or the worker stops, then
        unregisters the worker from the manager.
        """
        self.peer.hash_failed = 0  # Reset hash error counter
        while self.running:
            # Try to pickup free file download task
            task = self.pickTask()

            if not task:
                break

            if task["done"]:
                continue

            self.task = task

            self.manager.addTaskWorker(task, self)
            try:
                self.handleTask(task)
            except WorkerStop as err:
                self.manager.log.debug("%s: Worker stopped: %s" % (self.key, err))
                break
            finally:
                # Always undo addTaskWorker(), also when handleTask() raised
                # something unexpected or the greenlet got killed
                self.manager.removeTaskWorker(task, self)

        self.peer.onWorkerDone()
        self.running = False
        self.manager.removeWorker(self)

    def start(self):
        """Mark the worker running and spawn the downloader greenlet."""
        self.running = True
        self.thread = gevent.spawn(self.downloader)

    def skip(self, reason="Unknown"):
        """Abort the current task by killing the greenlet, then restart the worker."""
        self.manager.log.debug("%s: Force skipping (reason: %s)" % (self.key, reason))
        if self.thread:
            self.thread.kill(exception=Debug.createNotifyType("Worker skipping (reason: %s)" % reason))
        self.start()

    def stop(self, reason="Unknown"):
        """Force stop the worker and remove it from the manager."""
        self.manager.log.debug("%s: Force stopping (reason: %s)" % (self.key, reason))
        self.running = False
        if self.thread:
            self.thread.kill(exception=Debug.createNotifyType("Worker stopped (reason: %s)" % reason))
        # Reset instead of `del`, so a repeated stop()/skip() does not raise AttributeError
        self.thread = None
        self.manager.removeWorker(self)
239 self.manager.removeWorker(self)