Forking what is left of ZeroNet and hopefully adding an AT Proto Frontend/Proxy
at main 239 lines 8.5 kB view raw
import time

import gevent
import gevent.lock

from Debug import Debug
from Config import config
from Content.ContentManager import VerifyError


class WorkerDownloadError(Exception):
    """Raised by Worker.downloadTask when the peer request fails or returns nothing."""


class WorkerIOError(Exception):
    """Raised by Worker.writeTask when storing the downloaded file fails."""


class WorkerStop(Exception):
    """Raised to make Worker.downloader leave its loop and shut the worker down."""


class Worker(object):
    """Downloads the tasks handed out by a worker manager from one single peer.

    The download loop (`downloader`) runs in its own greenlet, spawned by `start`.
    """

    def __init__(self, manager, peer):
        """Create an idle worker; nothing is spawned until `start` is called.

        manager: object that hands out tasks (getTask, doneTask, failTask, ...) and owns the log
        peer: the peer the files are requested from
        """
        self.manager = manager
        self.peer = peer
        self.task = None  # Last task picked up by the downloader
        self.key = None  # Only read here (log prefix); presumably assigned by the manager - TODO confirm
        self.running = False
        self.thread = None  # Greenlet running the downloader, None if not started / stopped
        self.num_downloaded = 0
        self.num_failed = 0

    def __str__(self):
        return "Worker %s %s" % (self.manager.site.address_short, self.key)

    def __repr__(self):
        return "<%s>" % self.__str__()

    def waitForTask(self, task, timeout):  # Wait for other workers to finish the task
        """Sleep in 0.1s steps while other workers are busy with the task.

        The wait ends early when the task is done, when it has no workers left, or
        (checked once every 10 steps) when the first worker of the task has no
        connection or did not receive anything for more than a second.

        timeout: maximum wait in seconds; it is passed to range(), so it has to be an integer
        Return: always True
        """
        for sleep_i in range(1, timeout * 10):
            time.sleep(0.1)
            if task["done"] or task["workers_num"] == 0:
                if config.verbose:
                    self.manager.log.debug("%s: %s, picked task free after %ss sleep. (done: %s)" % (
                        self.key, task["inner_path"], 0.1 * sleep_i, task["done"]
                    ))
                break

            if sleep_i % 10 == 0:  # Check the other worker's activity once a second
                workers = self.manager.findWorkers(task)
                if not workers or not workers[0].peer.connection:
                    break
                worker_idle = time.time() - workers[0].peer.connection.last_recv_time
                if worker_idle > 1:
                    if config.verbose:
                        self.manager.log.debug("%s: %s, worker %s seems idle, picked up task after %ss sleep. (done: %s)" % (
                            self.key, task["inner_path"], workers[0].key, 0.1 * sleep_i, task["done"]
                        ))
                    break
        return True

    def pickTask(self):  # Find and select a new task for the worker
        """Ask the manager for the next task of this peer.

        If the task already has workers, wait a bit (see `waitForTask`) before
        returning it, to give them a chance to finish it first.

        Return: the task dict, or False if the manager had no task even after a short retry
        """
        task = self.manager.getTask(self.peer)
        if not task:  # No more task
            time.sleep(0.1)  # Wait a bit for new tasks
            task = self.manager.getTask(self.peer)
            if not task:  # Still no task, stop it
                stats = "downloaded files: %s, failed: %s" % (self.num_downloaded, self.num_failed)
                self.manager.log.debug("%s: No task found, stopping (%s)" % (self.key, stats))
                return False

        if not task["time_started"]:
            task["time_started"] = time.time()  # Task started now

        if task["workers_num"] > 0:  # Wait a bit if someone already working on it
            if task["peers"]:  # It's an update
                timeout = 3
            else:
                timeout = 1

            if task["size"] > 100 * 1024 * 1024:  # Double the wait for files above 100MB
                timeout = timeout * 2

            if config.verbose:
                self.manager.log.debug("%s: Someone already working on %s (pri: %s), sleeping %s sec..." % (
                    self.key, task["inner_path"], task["priority"], timeout
                ))

            self.waitForTask(task, timeout)
        return task

    def downloadTask(self, task):
        """Request the task's file from the peer.

        Return: the non-empty result of peer.getFile
        Raise: WorkerDownloadError if getFile raised or returned a falsy value
        """
        try:
            buff = self.peer.getFile(task["site"].address, task["inner_path"], task["size"])
        except Exception as err:
            self.manager.log.debug("%s: getFile error: %s" % (self.key, err))
            raise WorkerDownloadError(str(err))

        if not buff:
            raise WorkerDownloadError("No response")

        return buff

    def getTaskLock(self, task):
        """Return the semaphore stored in task["lock"], creating it on first use."""
        if task["lock"] is None:
            task["lock"] = gevent.lock.Semaphore()
        return task["lock"]

    def writeTask(self, task, buff):
        """Rewind buff and write it to the site storage under the task's inner_path.

        Raise: WorkerIOError on any exception raised by the storage write
        """
        buff.seek(0)
        try:
            task["site"].storage.write(task["inner_path"], buff)
        except Exception as err:
            # isinstance instead of an exact type comparison, so subclasses of Debug.Notify
            # are logged as aborts too (presumably that includes the types built by
            # Debug.createNotifyType for skip / stop - TODO confirm)
            if isinstance(err, Debug.Notify):
                self.manager.log.debug("%s: Write aborted: %s (%s: %s)" % (self.key, task["inner_path"], type(err), err))
            else:
                self.manager.log.error("%s: Error writing: %s (%s: %s)" % (self.key, task["inner_path"], type(err), err))
            raise WorkerIOError(str(err))

    def onTaskVerifyFail(self, task, error_message):
        """Account a failed download / verification against the task and the peer.

        Raise: WorkerStop when the peer collected too many hash or connection errors
        """
        self.num_failed += 1
        if self.manager.started_task_num < 50 or config.verbose:
            self.manager.log.debug(
                "%s: Verify failed: %s, error: %s, failed peers: %s, workers: %s" %
                (self.key, task["inner_path"], error_message, len(task["failed"]), task["workers_num"])
            )
        task["failed"].append(self.peer)
        self.peer.hash_failed += 1
        if self.peer.hash_failed >= max(len(self.manager.tasks), 3) or self.peer.connection_error > 10:
            # Broken peer: More fails than tasks number but atleast 3
            raise WorkerStop(
                "Too many errors (hash failed: %s, connection error: %s)" %
                (self.peer.hash_failed, self.peer.connection_error)
            )

    def handleTask(self, task):
        """Download, verify and store the file of a single task.

        Return: None if the task got done by someone else during the download,
                False if the download or the verification failed,
                True otherwise
        Raise: WorkerStop if the worker was switched off during the download or the peer
               turned out to be broken (see `onTaskVerifyFail`)
        """
        download_err = write_err = False

        write_lock = None
        lock_acquired = False  # Only release the lock if this worker is the one holding it
        try:
            try:
                buff = self.downloadTask(task)

                if task["done"] is True:  # Task done, try to find new one
                    return None

                if self.running is False:  # Worker no longer needed or got killed
                    self.manager.log.debug("%s: No longer needed, returning: %s" % (self.key, task["inner_path"]))
                    raise WorkerStop("Running got disabled")

                write_lock = self.getTaskLock(task)
                write_lock.acquire()
                lock_acquired = True
                # verifyFile returning None is taken as "same as the stored file": nothing to write
                is_same = task["site"].content_manager.verifyFile(task["inner_path"], buff) is None
                is_valid = True
            except (WorkerDownloadError, VerifyError) as err:
                download_err = err
                is_valid = False
                is_same = False

            if is_valid and not is_same:
                if self.manager.started_task_num < 50 or task["priority"] > 10 or config.verbose:
                    self.manager.log.debug("%s: Verify correct: %s" % (self.key, task["inner_path"]))
                try:
                    self.writeTask(task, buff)
                except WorkerIOError as err:
                    write_err = err

            if not task["done"]:
                if write_err:
                    self.manager.failTask(task, reason="Write error")
                    self.num_failed += 1
                    self.manager.log.error("%s: Error writing %s: %s" % (self.key, task["inner_path"], write_err))
                elif is_valid:
                    self.manager.doneTask(task)
                    self.num_downloaded += 1
        finally:
            # Release on every exit path (unexpected error, greenlet kill), otherwise the
            # other workers of the task would wait on the semaphore forever
            if lock_acquired:
                write_lock.release()

        if not is_valid:
            self.onTaskVerifyFail(task, download_err)
            time.sleep(1)
            return False

        return True

    def downloader(self):
        """Worker main loop: pick up and handle tasks until there are none or the worker stops."""
        self.peer.hash_failed = 0  # Reset hash error counter
        while self.running:
            # Try to pickup free file download task
            task = self.pickTask()

            if not task:
                break

            if task["done"]:
                continue

            self.task = task

            self.manager.addTaskWorker(task, self)

            try:
                self.handleTask(task)
            except WorkerStop as err:
                self.manager.log.debug("%s: Worker stopped: %s" % (self.key, err))
                self.manager.removeTaskWorker(task, self)
                break

            self.manager.removeTaskWorker(task, self)

        self.peer.onWorkerDone()
        self.running = False
        self.manager.removeWorker(self)

    # Start the worker
    def start(self):
        """Mark the worker as running and spawn the downloader greenlet."""
        self.running = True
        self.thread = gevent.spawn(self.downloader)

    # Skip current task
    def skip(self, reason="Unknown"):
        """Kill the downloader greenlet (if any) and start a fresh one."""
        self.manager.log.debug("%s: Force skipping (reason: %s)" % (self.key, reason))
        if self.thread:
            self.thread.kill(exception=Debug.createNotifyType("Worker skipping (reason: %s)" % reason))
        self.start()

    # Force stop the worker
    def stop(self, reason="Unknown"):
        """Kill the downloader greenlet (if any) and remove the worker from the manager."""
        self.manager.log.debug("%s: Force stopping (reason: %s)" % (self.key, reason))
        self.running = False
        if self.thread:
            self.thread.kill(exception=Debug.createNotifyType("Worker stopped (reason: %s)" % reason))
        # Reset instead of deleting the attribute: a later stop() / skip() call reads self.thread
        self.thread = None
        self.manager.removeWorker(self)