A fork of what is left of ZeroNet, with the goal of adding an AT Protocol (atproto) frontend/proxy.
at main 600 lines 26 kB view raw
1import time 2import logging 3import collections 4 5import gevent 6 7from .Worker import Worker 8from .WorkerTaskManager import WorkerTaskManager 9from Config import config 10from util import helper 11from Plugin import PluginManager 12from Debug.DebugLock import DebugLock 13import util 14 15 16@PluginManager.acceptPlugins 17class WorkerManager(object): 18 19 def __init__(self, site): 20 self.site = site 21 self.workers = {} # Key: ip:port, Value: Worker.Worker 22 self.tasks = WorkerTaskManager() 23 self.next_task_id = 1 24 self.lock_add_task = DebugLock(name="Lock AddTask:%s" % self.site.address_short) 25 # {"id": 1, "evt": evt, "workers_num": 0, "site": self.site, "inner_path": inner_path, "done": False, "optional_hash_id": None, 26 # "time_started": None, "time_added": time.time(), "peers": peers, "priority": 0, "failed": peer_ids, "lock": None or gevent.lock.RLock} 27 self.started_task_num = 0 # Last added task num 28 self.asked_peers = [] 29 self.running = True 30 self.time_task_added = 0 31 self.log = logging.getLogger("WorkerManager:%s" % self.site.address_short) 32 self.site.greenlet_manager.spawn(self.checkTasks) 33 34 def __str__(self): 35 return "WorkerManager %s" % self.site.address_short 36 37 def __repr__(self): 38 return "<%s>" % self.__str__() 39 40 # Check expired tasks 41 def checkTasks(self): 42 while self.running: 43 tasks = task = worker = workers = None # Cleanup local variables 44 announced = False 45 time.sleep(15) # Check every 15 sec 46 47 # Clean up workers 48 for worker in list(self.workers.values()): 49 if worker.task and worker.task["done"]: 50 worker.skip(reason="Task done") # Stop workers with task done 51 52 if not self.tasks: 53 continue 54 55 tasks = self.tasks[:] # Copy it so removing elements wont cause any problem 56 num_tasks_started = len([task for task in tasks if task["time_started"]]) 57 58 self.log.debug( 59 "Tasks: %s, started: %s, bad files: %s, total started: %s" % 60 (len(tasks), num_tasks_started, 
len(self.site.bad_files), self.started_task_num) 61 ) 62 63 for task in tasks: 64 if task["time_started"] and time.time() >= task["time_started"] + 60: 65 self.log.debug("Timeout, Skipping: %s" % task) # Task taking too long time, skip it 66 # Skip to next file workers 67 workers = self.findWorkers(task) 68 if workers: 69 for worker in workers: 70 worker.skip(reason="Task timeout") 71 else: 72 self.failTask(task, reason="No workers") 73 74 elif time.time() >= task["time_added"] + 60 and not self.workers: # No workers left 75 self.failTask(task, reason="Timeout") 76 77 elif (task["time_started"] and time.time() >= task["time_started"] + 15) or not self.workers: 78 # Find more workers: Task started more than 15 sec ago or no workers 79 workers = self.findWorkers(task) 80 self.log.debug( 81 "Slow task: %s, (workers: %s, optional_hash_id: %s, peers: %s, failed: %s, asked: %s)" % 82 ( 83 task["inner_path"], len(workers), task["optional_hash_id"], 84 len(task["peers"] or []), len(task["failed"]), len(self.asked_peers) 85 ) 86 ) 87 if not announced and task["site"].isAddedRecently(): 88 task["site"].announce(mode="more") # Find more peers 89 announced = True 90 if task["optional_hash_id"]: 91 if self.workers: 92 if not task["time_started"]: 93 ask_limit = 20 94 else: 95 ask_limit = max(10, time.time() - task["time_started"]) 96 if len(self.asked_peers) < ask_limit and len(task["peers"] or []) <= len(task["failed"]) * 2: 97 # Re-search for high priority 98 self.startFindOptional(find_more=True) 99 if task["peers"]: 100 peers_try = [peer for peer in task["peers"] if peer not in task["failed"] and peer not in workers] 101 if peers_try: 102 self.startWorkers(peers_try, force_num=5, reason="Task checker (optional, has peers)") 103 else: 104 self.startFindOptional(find_more=True) 105 else: 106 self.startFindOptional(find_more=True) 107 else: 108 if task["peers"]: # Release the peer lock 109 self.log.debug("Task peer lock release: %s" % task["inner_path"]) 110 task["peers"] = [] 
111 self.startWorkers(reason="Task checker") 112 113 if len(self.tasks) > len(self.workers) * 2 and len(self.workers) < self.getMaxWorkers(): 114 self.startWorkers(reason="Task checker (need more workers)") 115 116 self.log.debug("checkTasks stopped running") 117 118 # Returns the next free or less worked task 119 def getTask(self, peer): 120 for task in self.tasks: # Find a task 121 if task["peers"] and peer not in task["peers"]: 122 continue # This peer not allowed to pick this task 123 if peer in task["failed"]: 124 continue # Peer already tried to solve this, but failed 125 if task["optional_hash_id"] and task["peers"] is None: 126 continue # No peers found yet for the optional task 127 if task["done"]: 128 continue 129 return task 130 131 def removeSolvedFileTasks(self, mark_as_good=True): 132 for task in self.tasks[:]: 133 if task["inner_path"] not in self.site.bad_files: 134 self.log.debug("No longer in bad_files, marking as %s: %s" % (mark_as_good, task["inner_path"])) 135 task["done"] = True 136 task["evt"].set(mark_as_good) 137 self.tasks.remove(task) 138 if not self.tasks: 139 self.started_task_num = 0 140 self.site.updateWebsocket() 141 142 # New peers added to site 143 def onPeers(self): 144 self.startWorkers(reason="More peers found") 145 146 def getMaxWorkers(self): 147 if len(self.tasks) > 50: 148 return config.workers * 3 149 else: 150 return config.workers 151 152 # Add new worker 153 def addWorker(self, peer, multiplexing=False, force=False): 154 key = peer.key 155 if len(self.workers) > self.getMaxWorkers() and not force: 156 return False 157 if multiplexing: # Add even if we already have worker for this peer 158 key = "%s/%s" % (key, len(self.workers)) 159 if key not in self.workers: 160 # We dont have worker for that peer and workers num less than max 161 task = self.getTask(peer) 162 if task: 163 worker = Worker(self, peer) 164 self.workers[key] = worker 165 worker.key = key 166 worker.start() 167 return worker 168 else: 169 return False 170 
else: # We have worker for this peer or its over the limit 171 return False 172 173 def taskAddPeer(self, task, peer): 174 if task["peers"] is None: 175 task["peers"] = [] 176 if peer in task["failed"]: 177 return False 178 179 if peer not in task["peers"]: 180 task["peers"].append(peer) 181 return True 182 183 # Start workers to process tasks 184 def startWorkers(self, peers=None, force_num=0, reason="Unknown"): 185 if not self.tasks: 186 return False # No task for workers 187 max_workers = min(self.getMaxWorkers(), len(self.site.peers)) 188 if len(self.workers) >= max_workers and not peers: 189 return False # Workers number already maxed and no starting peers defined 190 self.log.debug( 191 "Starting workers (%s), tasks: %s, peers: %s, workers: %s" % 192 (reason, len(self.tasks), len(peers or []), len(self.workers)) 193 ) 194 if not peers: 195 peers = self.site.getConnectedPeers() 196 if len(peers) < max_workers: 197 peers += self.site.getRecentPeers(max_workers * 2) 198 if type(peers) is set: 199 peers = list(peers) 200 201 # Sort by ping 202 peers.sort(key=lambda peer: peer.connection.last_ping_delay if peer.connection and peer.connection.last_ping_delay and len(peer.connection.waiting_requests) == 0 and peer.connection.connected else 9999) 203 204 for peer in peers: # One worker for every peer 205 if peers and peer not in peers: 206 continue # If peers defined and peer not valid 207 208 if force_num: 209 worker = self.addWorker(peer, force=True) 210 force_num -= 1 211 else: 212 worker = self.addWorker(peer) 213 214 if worker: 215 self.log.debug("Added worker: %s (rep: %s), workers: %s/%s" % (peer.key, peer.reputation, len(self.workers), max_workers)) 216 217 # Find peers for optional hash in local hash tables and add to task peers 218 def findOptionalTasks(self, optional_tasks, reset_task=False): 219 found = collections.defaultdict(list) # { found_hash: [peer1, peer2...], ...} 220 221 for peer in list(self.site.peers.values()): 222 if not peer.has_hashfield: 
223 continue 224 225 hashfield_set = set(peer.hashfield) # Finding in set is much faster 226 for task in optional_tasks: 227 optional_hash_id = task["optional_hash_id"] 228 if optional_hash_id in hashfield_set: 229 if reset_task and len(task["failed"]) > 0: 230 task["failed"] = [] 231 if peer in task["failed"]: 232 continue 233 if self.taskAddPeer(task, peer): 234 found[optional_hash_id].append(peer) 235 236 return found 237 238 # Find peers for optional hash ids in local hash tables 239 def findOptionalHashIds(self, optional_hash_ids, limit=0): 240 found = collections.defaultdict(list) # { found_hash_id: [peer1, peer2...], ...} 241 242 for peer in list(self.site.peers.values()): 243 if not peer.has_hashfield: 244 continue 245 246 hashfield_set = set(peer.hashfield) # Finding in set is much faster 247 for optional_hash_id in optional_hash_ids: 248 if optional_hash_id in hashfield_set: 249 found[optional_hash_id].append(peer) 250 if limit and len(found[optional_hash_id]) >= limit: 251 optional_hash_ids.remove(optional_hash_id) 252 253 return found 254 255 # Add peers to tasks from found result 256 def addOptionalPeers(self, found_ips): 257 found = collections.defaultdict(list) 258 for hash_id, peer_ips in found_ips.items(): 259 task = [task for task in self.tasks if task["optional_hash_id"] == hash_id] 260 if task: # Found task, lets take the first 261 task = task[0] 262 else: 263 continue 264 for peer_ip in peer_ips: 265 peer = self.site.addPeer(peer_ip[0], peer_ip[1], return_peer=True, source="optional") 266 if not peer: 267 continue 268 if self.taskAddPeer(task, peer): 269 found[hash_id].append(peer) 270 if peer.hashfield.appendHashId(hash_id): # Peer has this file 271 peer.time_hashfield = None # Peer hashfield probably outdated 272 273 return found 274 275 # Start find peers for optional files 276 @util.Noparallel(blocking=False, ignore_args=True) 277 def startFindOptional(self, reset_task=False, find_more=False, high_priority=False): 278 # Wait for more file 
requests 279 if len(self.tasks) < 20 or high_priority: 280 time.sleep(0.01) 281 elif len(self.tasks) > 90: 282 time.sleep(5) 283 else: 284 time.sleep(0.5) 285 286 optional_tasks = [task for task in self.tasks if task["optional_hash_id"]] 287 if not optional_tasks: 288 return False 289 optional_hash_ids = set([task["optional_hash_id"] for task in optional_tasks]) 290 time_tasks = self.time_task_added 291 292 self.log.debug( 293 "Finding peers for optional files: %s (reset_task: %s, find_more: %s)" % 294 (optional_hash_ids, reset_task, find_more) 295 ) 296 found = self.findOptionalTasks(optional_tasks, reset_task=reset_task) 297 298 if found: 299 found_peers = set([peer for peers in list(found.values()) for peer in peers]) 300 self.startWorkers(found_peers, force_num=3, reason="Optional found in local peers") 301 302 if len(found) < len(optional_hash_ids) or find_more or (high_priority and any(len(peers) < 10 for peers in found.values())): 303 self.log.debug("No local result for optional files: %s" % (optional_hash_ids - set(found))) 304 305 # Query hashfield from connected peers 306 threads = [] 307 peers = self.site.getConnectedPeers() 308 if not peers: 309 peers = self.site.getConnectablePeers() 310 for peer in peers: 311 threads.append(self.site.greenlet_manager.spawn(peer.updateHashfield, force=find_more)) 312 gevent.joinall(threads, timeout=5) 313 314 if time_tasks != self.time_task_added: # New task added since start 315 optional_tasks = [task for task in self.tasks if task["optional_hash_id"]] 316 optional_hash_ids = set([task["optional_hash_id"] for task in optional_tasks]) 317 318 found = self.findOptionalTasks(optional_tasks) 319 self.log.debug("Found optional files after query hashtable connected peers: %s/%s" % ( 320 len(found), len(optional_hash_ids) 321 )) 322 323 if found: 324 found_peers = set([peer for hash_id_peers in list(found.values()) for peer in hash_id_peers]) 325 self.startWorkers(found_peers, force_num=3, reason="Optional found in connected 
peers") 326 327 if len(found) < len(optional_hash_ids) or find_more: 328 self.log.debug( 329 "No connected hashtable result for optional files: %s (asked: %s)" % 330 (optional_hash_ids - set(found), len(self.asked_peers)) 331 ) 332 if not self.tasks: 333 self.log.debug("No tasks, stopping finding optional peers") 334 return 335 336 # Try to query connected peers 337 threads = [] 338 peers = [peer for peer in self.site.getConnectedPeers() if peer.key not in self.asked_peers][0:10] 339 if not peers: 340 peers = self.site.getConnectablePeers(ignore=self.asked_peers) 341 342 for peer in peers: 343 threads.append(self.site.greenlet_manager.spawn(peer.findHashIds, list(optional_hash_ids))) 344 self.asked_peers.append(peer.key) 345 346 for i in range(5): 347 time.sleep(1) 348 349 thread_values = [thread.value for thread in threads if thread.value] 350 if not thread_values: 351 continue 352 353 found_ips = helper.mergeDicts(thread_values) 354 found = self.addOptionalPeers(found_ips) 355 self.log.debug("Found optional files after findhash connected peers: %s/%s (asked: %s)" % ( 356 len(found), len(optional_hash_ids), len(threads) 357 )) 358 359 if found: 360 found_peers = set([peer for hash_id_peers in list(found.values()) for peer in hash_id_peers]) 361 self.startWorkers(found_peers, force_num=3, reason="Optional found by findhash connected peers") 362 363 if len(thread_values) == len(threads): 364 # Got result from all started thread 365 break 366 367 if len(found) < len(optional_hash_ids): 368 self.log.debug( 369 "No findHash result, try random peers: %s (asked: %s)" % 370 (optional_hash_ids - set(found), len(self.asked_peers)) 371 ) 372 # Try to query random peers 373 374 if time_tasks != self.time_task_added: # New task added since start 375 optional_tasks = [task for task in self.tasks if task["optional_hash_id"]] 376 optional_hash_ids = set([task["optional_hash_id"] for task in optional_tasks]) 377 378 threads = [] 379 peers = 
self.site.getConnectablePeers(ignore=self.asked_peers) 380 381 for peer in peers: 382 threads.append(self.site.greenlet_manager.spawn(peer.findHashIds, list(optional_hash_ids))) 383 self.asked_peers.append(peer.key) 384 385 gevent.joinall(threads, timeout=15) 386 387 found_ips = helper.mergeDicts([thread.value for thread in threads if thread.value]) 388 found = self.addOptionalPeers(found_ips) 389 self.log.debug("Found optional files after findhash random peers: %s/%s" % (len(found), len(optional_hash_ids))) 390 391 if found: 392 found_peers = set([peer for hash_id_peers in list(found.values()) for peer in hash_id_peers]) 393 self.startWorkers(found_peers, force_num=3, reason="Option found using findhash random peers") 394 395 if len(found) < len(optional_hash_ids): 396 self.log.debug("No findhash result for optional files: %s" % (optional_hash_ids - set(found))) 397 398 if time_tasks != self.time_task_added: # New task added since start 399 self.log.debug("New task since start, restarting...") 400 self.site.greenlet_manager.spawnLater(0.1, self.startFindOptional) 401 else: 402 self.log.debug("startFindOptional ended") 403 404 # Stop all worker 405 def stopWorkers(self): 406 num = 0 407 for worker in list(self.workers.values()): 408 worker.stop(reason="Stopping all workers") 409 num += 1 410 tasks = self.tasks[:] # Copy 411 for task in tasks: # Mark all current task as failed 412 self.failTask(task, reason="Stopping all workers") 413 return num 414 415 # Find workers by task 416 def findWorkers(self, task): 417 workers = [] 418 for worker in list(self.workers.values()): 419 if worker.task == task: 420 workers.append(worker) 421 return workers 422 423 # Ends and remove a worker 424 def removeWorker(self, worker): 425 worker.running = False 426 if worker.key in self.workers: 427 del(self.workers[worker.key]) 428 self.log.debug("Removed worker, workers: %s/%s" % (len(self.workers), self.getMaxWorkers())) 429 if len(self.workers) <= self.getMaxWorkers() / 3 and 
len(self.asked_peers) < 10: 430 optional_task = next((task for task in self.tasks if task["optional_hash_id"]), None) 431 if optional_task: 432 if len(self.workers) == 0: 433 self.startFindOptional(find_more=True) 434 else: 435 self.startFindOptional() 436 elif self.tasks and not self.workers and worker.task and len(worker.task["failed"]) < 20: 437 self.log.debug("Starting new workers... (tasks: %s)" % len(self.tasks)) 438 self.startWorkers(reason="Removed worker") 439 440 # Tasks sorted by this 441 def getPriorityBoost(self, inner_path): 442 if inner_path == "content.json": 443 return 9999 # Content.json always priority 444 if inner_path == "index.html": 445 return 9998 # index.html also important 446 if "-default" in inner_path: 447 return -4 # Default files are cloning not important 448 elif inner_path.endswith("all.css"): 449 return 14 # boost css files priority 450 elif inner_path.endswith("all.js"): 451 return 13 # boost js files priority 452 elif inner_path.endswith("dbschema.json"): 453 return 12 # boost database specification 454 elif inner_path.endswith("content.json"): 455 return 1 # boost included content.json files priority a bit 456 elif inner_path.endswith(".json"): 457 if len(inner_path) < 50: # Boost non-user json files 458 return 11 459 else: 460 return 2 461 return 0 462 463 def addTaskUpdate(self, task, peer, priority=0): 464 if priority > task["priority"]: 465 self.tasks.updateItem(task, "priority", priority) 466 if peer and task["peers"]: # This peer also has new version, add it to task possible peers 467 task["peers"].append(peer) 468 self.log.debug("Added peer %s to %s" % (peer.key, task["inner_path"])) 469 self.startWorkers([peer], reason="Added new task (update received by peer)") 470 elif peer and peer in task["failed"]: 471 task["failed"].remove(peer) # New update arrived, remove the peer from failed peers 472 self.log.debug("Removed peer %s from failed %s" % (peer.key, task["inner_path"])) 473 self.startWorkers([peer], reason="Added new 
task (peer failed before)") 474 475 def addTaskCreate(self, inner_path, peer, priority=0, file_info=None): 476 evt = gevent.event.AsyncResult() 477 if peer: 478 peers = [peer] # Only download from this peer 479 else: 480 peers = None 481 if not file_info: 482 file_info = self.site.content_manager.getFileInfo(inner_path) 483 if file_info and file_info["optional"]: 484 optional_hash_id = helper.toHashId(file_info["sha512"]) 485 else: 486 optional_hash_id = None 487 if file_info: 488 size = file_info.get("size", 0) 489 else: 490 size = 0 491 492 self.lock_add_task.acquire() 493 494 # Check again if we have task for this file 495 task = self.tasks.findTask(inner_path) 496 if task: 497 self.addTaskUpdate(task, peer, priority) 498 return task 499 500 priority += self.getPriorityBoost(inner_path) 501 502 if self.started_task_num == 0: # Boost priority for first requested file 503 priority += 1 504 505 task = { 506 "id": self.next_task_id, "evt": evt, "workers_num": 0, "site": self.site, "inner_path": inner_path, "done": False, 507 "optional_hash_id": optional_hash_id, "time_added": time.time(), "time_started": None, "lock": None, 508 "time_action": None, "peers": peers, "priority": priority, "failed": [], "size": size 509 } 510 511 self.tasks.append(task) 512 self.lock_add_task.release() 513 514 self.next_task_id += 1 515 self.started_task_num += 1 516 if config.verbose: 517 self.log.debug( 518 "New task: %s, peer lock: %s, priority: %s, optional_hash_id: %s, tasks started: %s" % 519 (task["inner_path"], peers, priority, optional_hash_id, self.started_task_num) 520 ) 521 522 self.time_task_added = time.time() 523 524 if optional_hash_id: 525 if self.asked_peers: 526 del self.asked_peers[:] # Reset asked peers 527 self.startFindOptional(high_priority=priority > 0) 528 529 if peers: 530 self.startWorkers(peers, reason="Added new optional task") 531 532 else: 533 self.startWorkers(peers, reason="Added new task") 534 return task 535 536 # Create new task and return 
asyncresult 537 def addTask(self, inner_path, peer=None, priority=0, file_info=None): 538 self.site.onFileStart(inner_path) # First task, trigger site download started 539 task = self.tasks.findTask(inner_path) 540 if task: # Already has task for that file 541 self.addTaskUpdate(task, peer, priority) 542 else: # No task for that file yet 543 task = self.addTaskCreate(inner_path, peer, priority, file_info) 544 return task 545 546 def addTaskWorker(self, task, worker): 547 try: 548 self.tasks.updateItem(task, "workers_num", task["workers_num"] + 1) 549 except ValueError: 550 task["workers_num"] += 1 551 552 def removeTaskWorker(self, task, worker): 553 try: 554 self.tasks.updateItem(task, "workers_num", task["workers_num"] - 1) 555 except ValueError: 556 task["workers_num"] -= 1 557 if len(task["failed"]) >= len(self.workers): 558 fail_reason = "Too many fails: %s (workers: %s)" % (len(task["failed"]), len(self.workers)) 559 self.failTask(task, reason=fail_reason) 560 561 # Wait for other tasks 562 def checkComplete(self): 563 time.sleep(0.1) 564 if not self.tasks: 565 self.log.debug("Check complete: No tasks") 566 self.onComplete() 567 568 def onComplete(self): 569 self.started_task_num = 0 570 del self.asked_peers[:] 571 self.site.onComplete() # No more task trigger site complete 572 573 # Mark a task done 574 def doneTask(self, task): 575 task["done"] = True 576 self.tasks.remove(task) # Remove from queue 577 if task["optional_hash_id"]: 578 self.log.debug( 579 "Downloaded optional file in %.3fs, adding to hashfield: %s" % 580 (time.time() - task["time_started"], task["inner_path"]) 581 ) 582 self.site.content_manager.optionalDownloaded(task["inner_path"], task["optional_hash_id"], task["size"]) 583 self.site.onFileDone(task["inner_path"]) 584 task["evt"].set(True) 585 if not self.tasks: 586 self.site.greenlet_manager.spawn(self.checkComplete) 587 588 # Mark a task failed 589 def failTask(self, task, reason="Unknown"): 590 try: 591 self.tasks.remove(task) # Remove 
from queue 592 except ValueError as err: 593 return False 594 595 self.log.debug("Task %s failed (Reason: %s)" % (task["inner_path"], reason)) 596 task["done"] = True 597 self.site.onFileFail(task["inner_path"]) 598 task["evt"].set(False) 599 if not self.tasks: 600 self.site.greenlet_manager.spawn(self.checkComplete)