A fork of what is left of ZeroNet, with the goal of adding an AT Protocol (atproto) frontend/proxy.
1import time
2import logging
3import collections
4
5import gevent
6
7from .Worker import Worker
8from .WorkerTaskManager import WorkerTaskManager
9from Config import config
10from util import helper
11from Plugin import PluginManager
12from Debug.DebugLock import DebugLock
13import util
14
15
16@PluginManager.acceptPlugins
17class WorkerManager(object):
18
    def __init__(self, site):
        """Set up the task queue and worker registry for one site and start the watchdog loop."""
        self.site = site
        self.workers = {}  # Key: ip:port, Value: Worker.Worker
        self.tasks = WorkerTaskManager()  # Priority-ordered collection of task dicts
        self.next_task_id = 1  # Monotonic id handed to each new task
        self.lock_add_task = DebugLock(name="Lock AddTask:%s" % self.site.address_short)  # Guards concurrent task creation
        # Task dict shape (see addTaskCreate):
        # {"id": 1, "evt": evt, "workers_num": 0, "site": self.site, "inner_path": inner_path, "done": False, "optional_hash_id": None,
        #  "time_started": None, "time_added": time.time(), "peers": peers, "priority": 0, "failed": peer_ids, "lock": None or gevent.lock.RLock}
        self.started_task_num = 0  # Last added task num
        self.asked_peers = []  # Peer keys already asked via findHashIds (reset in onComplete/addTaskCreate)
        self.running = True  # Set to False to stop the checkTasks loop
        self.time_task_added = 0  # time.time() of the most recent task creation
        self.log = logging.getLogger("WorkerManager:%s" % self.site.address_short)
        self.site.greenlet_manager.spawn(self.checkTasks)  # Background watchdog for stalled/expired tasks
33
34 def __str__(self):
35 return "WorkerManager %s" % self.site.address_short
36
37 def __repr__(self):
38 return "<%s>" % self.__str__()
39
    # Check expired tasks
    def checkTasks(self):
        """Watchdog loop, spawned from __init__.

        Every 15 seconds: stops workers whose task finished, fails tasks that
        timed out (60s), and tries to find more peers/workers for slow tasks
        (started >15s ago) or when no workers are left. Runs until
        self.running is False.
        """
        while self.running:
            tasks = task = worker = workers = None  # Cleanup local variables
            announced = False
            time.sleep(15)  # Check every 15 sec

            # Clean up workers
            for worker in list(self.workers.values()):
                if worker.task and worker.task["done"]:
                    worker.skip(reason="Task done")  # Stop workers with task done

            if not self.tasks:
                continue

            tasks = self.tasks[:]  # Copy it so removing elements wont cause any problem
            num_tasks_started = len([task for task in tasks if task["time_started"]])

            self.log.debug(
                "Tasks: %s, started: %s, bad files: %s, total started: %s" %
                (len(tasks), num_tasks_started, len(self.site.bad_files), self.started_task_num)
            )

            for task in tasks:
                if task["time_started"] and time.time() >= task["time_started"] + 60:
                    self.log.debug("Timeout, Skipping: %s" % task)  # Task taking too long time, skip it
                    # Skip to next file workers
                    workers = self.findWorkers(task)
                    if workers:
                        for worker in workers:
                            worker.skip(reason="Task timeout")
                    else:
                        self.failTask(task, reason="No workers")

                elif time.time() >= task["time_added"] + 60 and not self.workers:  # No workers left
                    self.failTask(task, reason="Timeout")

                elif (task["time_started"] and time.time() >= task["time_started"] + 15) or not self.workers:
                    # Find more workers: Task started more than 15 sec ago or no workers
                    workers = self.findWorkers(task)
                    self.log.debug(
                        "Slow task: %s, (workers: %s, optional_hash_id: %s, peers: %s, failed: %s, asked: %s)" %
                        (
                            task["inner_path"], len(workers), task["optional_hash_id"],
                            len(task["peers"] or []), len(task["failed"]), len(self.asked_peers)
                        )
                    )
                    # Only announce once per check round
                    if not announced and task["site"].isAddedRecently():
                        task["site"].announce(mode="more")  # Find more peers
                        announced = True
                    if task["optional_hash_id"]:
                        if self.workers:
                            # Ask more peers for the optional file the longer it has been running
                            if not task["time_started"]:
                                ask_limit = 20
                            else:
                                ask_limit = max(10, time.time() - task["time_started"])
                            if len(self.asked_peers) < ask_limit and len(task["peers"] or []) <= len(task["failed"]) * 2:
                                # Re-search for high priority
                                self.startFindOptional(find_more=True)
                        if task["peers"]:
                            # Retry known source peers that have not failed and are not already working on it
                            peers_try = [peer for peer in task["peers"] if peer not in task["failed"] and peer not in workers]
                            if peers_try:
                                self.startWorkers(peers_try, force_num=5, reason="Task checker (optional, has peers)")
                            else:
                                self.startFindOptional(find_more=True)
                        else:
                            self.startFindOptional(find_more=True)
                    else:
                        if task["peers"]:  # Release the peer lock
                            self.log.debug("Task peer lock release: %s" % task["inner_path"])
                            task["peers"] = []
                        self.startWorkers(reason="Task checker")

            # Queue significantly longer than the worker pool: spin up more workers
            if len(self.tasks) > len(self.workers) * 2 and len(self.workers) < self.getMaxWorkers():
                self.startWorkers(reason="Task checker (need more workers)")

        self.log.debug("checkTasks stopped running")
117
118 # Returns the next free or less worked task
119 def getTask(self, peer):
120 for task in self.tasks: # Find a task
121 if task["peers"] and peer not in task["peers"]:
122 continue # This peer not allowed to pick this task
123 if peer in task["failed"]:
124 continue # Peer already tried to solve this, but failed
125 if task["optional_hash_id"] and task["peers"] is None:
126 continue # No peers found yet for the optional task
127 if task["done"]:
128 continue
129 return task
130
131 def removeSolvedFileTasks(self, mark_as_good=True):
132 for task in self.tasks[:]:
133 if task["inner_path"] not in self.site.bad_files:
134 self.log.debug("No longer in bad_files, marking as %s: %s" % (mark_as_good, task["inner_path"]))
135 task["done"] = True
136 task["evt"].set(mark_as_good)
137 self.tasks.remove(task)
138 if not self.tasks:
139 self.started_task_num = 0
140 self.site.updateWebsocket()
141
    # New peers added to site
    def onPeers(self):
        """Event hook: the site discovered new peers — try to start more workers."""
        self.startWorkers(reason="More peers found")
145
146 def getMaxWorkers(self):
147 if len(self.tasks) > 50:
148 return config.workers * 3
149 else:
150 return config.workers
151
152 # Add new worker
153 def addWorker(self, peer, multiplexing=False, force=False):
154 key = peer.key
155 if len(self.workers) > self.getMaxWorkers() and not force:
156 return False
157 if multiplexing: # Add even if we already have worker for this peer
158 key = "%s/%s" % (key, len(self.workers))
159 if key not in self.workers:
160 # We dont have worker for that peer and workers num less than max
161 task = self.getTask(peer)
162 if task:
163 worker = Worker(self, peer)
164 self.workers[key] = worker
165 worker.key = key
166 worker.start()
167 return worker
168 else:
169 return False
170 else: # We have worker for this peer or its over the limit
171 return False
172
173 def taskAddPeer(self, task, peer):
174 if task["peers"] is None:
175 task["peers"] = []
176 if peer in task["failed"]:
177 return False
178
179 if peer not in task["peers"]:
180 task["peers"].append(peer)
181 return True
182
    # Start workers to process tasks
    def startWorkers(self, peers=None, force_num=0, reason="Unknown"):
        """Spawn workers for the queued tasks.

        peers: restrict worker creation to these peers; when None, connected
        (and, if few, recent) site peers are used.
        force_num: number of workers to add ignoring the pool limit.
        Returns False when there is nothing to do; otherwise None.
        """
        if not self.tasks:
            return False  # No task for workers
        max_workers = min(self.getMaxWorkers(), len(self.site.peers))
        if len(self.workers) >= max_workers and not peers:
            return False  # Workers number already maxed and no starting peers defined
        self.log.debug(
            "Starting workers (%s), tasks: %s, peers: %s, workers: %s" %
            (reason, len(self.tasks), len(peers or []), len(self.workers))
        )
        if not peers:
            peers = self.site.getConnectedPeers()
            if len(peers) < max_workers:
                peers += self.site.getRecentPeers(max_workers * 2)
        if type(peers) is set:
            peers = list(peers)  # Needs to be sortable below

        # Sort by ping; idle, connected peers with a known ping come first, everything else sorts last (9999)
        peers.sort(key=lambda peer: peer.connection.last_ping_delay if peer.connection and peer.connection.last_ping_delay and len(peer.connection.waiting_requests) == 0 and peer.connection.connected else 9999)

        for peer in peers:  # One worker for every peer
            # NOTE(review): `peer not in peers` can never be true while iterating
            # `peers` itself — this guard looks like a leftover from an earlier
            # version and is effectively dead code; confirm before removing.
            if peers and peer not in peers:
                continue  # If peers defined and peer not valid

            if force_num:
                worker = self.addWorker(peer, force=True)
                force_num -= 1
            else:
                worker = self.addWorker(peer)

            if worker:
                self.log.debug("Added worker: %s (rep: %s), workers: %s/%s" % (peer.key, peer.reputation, len(self.workers), max_workers))
216
217 # Find peers for optional hash in local hash tables and add to task peers
218 def findOptionalTasks(self, optional_tasks, reset_task=False):
219 found = collections.defaultdict(list) # { found_hash: [peer1, peer2...], ...}
220
221 for peer in list(self.site.peers.values()):
222 if not peer.has_hashfield:
223 continue
224
225 hashfield_set = set(peer.hashfield) # Finding in set is much faster
226 for task in optional_tasks:
227 optional_hash_id = task["optional_hash_id"]
228 if optional_hash_id in hashfield_set:
229 if reset_task and len(task["failed"]) > 0:
230 task["failed"] = []
231 if peer in task["failed"]:
232 continue
233 if self.taskAddPeer(task, peer):
234 found[optional_hash_id].append(peer)
235
236 return found
237
238 # Find peers for optional hash ids in local hash tables
239 def findOptionalHashIds(self, optional_hash_ids, limit=0):
240 found = collections.defaultdict(list) # { found_hash_id: [peer1, peer2...], ...}
241
242 for peer in list(self.site.peers.values()):
243 if not peer.has_hashfield:
244 continue
245
246 hashfield_set = set(peer.hashfield) # Finding in set is much faster
247 for optional_hash_id in optional_hash_ids:
248 if optional_hash_id in hashfield_set:
249 found[optional_hash_id].append(peer)
250 if limit and len(found[optional_hash_id]) >= limit:
251 optional_hash_ids.remove(optional_hash_id)
252
253 return found
254
255 # Add peers to tasks from found result
256 def addOptionalPeers(self, found_ips):
257 found = collections.defaultdict(list)
258 for hash_id, peer_ips in found_ips.items():
259 task = [task for task in self.tasks if task["optional_hash_id"] == hash_id]
260 if task: # Found task, lets take the first
261 task = task[0]
262 else:
263 continue
264 for peer_ip in peer_ips:
265 peer = self.site.addPeer(peer_ip[0], peer_ip[1], return_peer=True, source="optional")
266 if not peer:
267 continue
268 if self.taskAddPeer(task, peer):
269 found[hash_id].append(peer)
270 if peer.hashfield.appendHashId(hash_id): # Peer has this file
271 peer.time_hashfield = None # Peer hashfield probably outdated
272
273 return found
274
    # Start find peers for optional files
    @util.Noparallel(blocking=False, ignore_args=True)
    def startFindOptional(self, reset_task=False, find_more=False, high_priority=False):
        """Locate source peers for all queued optional-file tasks.

        Escalates through four stages, starting workers after each stage that
        yields peers: (1) locally known hashfields, (2) refreshed hashfields of
        connected peers, (3) findHashIds on connected peers, (4) findHashIds on
        random connectable peers. Restarts itself if new tasks arrived while
        running. Non-blocking and deduplicated via Noparallel.
        """
        # Wait for more file requests to batch them up (short wait when urgent)
        if len(self.tasks) < 20 or high_priority:
            time.sleep(0.01)
        elif len(self.tasks) > 90:
            time.sleep(5)
        else:
            time.sleep(0.5)

        optional_tasks = [task for task in self.tasks if task["optional_hash_id"]]
        if not optional_tasks:
            return False
        optional_hash_ids = set([task["optional_hash_id"] for task in optional_tasks])
        time_tasks = self.time_task_added  # Snapshot to detect tasks added while we run

        self.log.debug(
            "Finding peers for optional files: %s (reset_task: %s, find_more: %s)" %
            (optional_hash_ids, reset_task, find_more)
        )
        # Stage 1: locally known peer hashfields
        found = self.findOptionalTasks(optional_tasks, reset_task=reset_task)

        if found:
            found_peers = set([peer for peers in list(found.values()) for peer in peers])
            self.startWorkers(found_peers, force_num=3, reason="Optional found in local peers")

        if len(found) < len(optional_hash_ids) or find_more or (high_priority and any(len(peers) < 10 for peers in found.values())):
            self.log.debug("No local result for optional files: %s" % (optional_hash_ids - set(found)))

            # Stage 2: query hashfield from connected peers
            threads = []
            peers = self.site.getConnectedPeers()
            if not peers:
                peers = self.site.getConnectablePeers()
            for peer in peers:
                threads.append(self.site.greenlet_manager.spawn(peer.updateHashfield, force=find_more))
            gevent.joinall(threads, timeout=5)

            if time_tasks != self.time_task_added:  # New task added since start
                optional_tasks = [task for task in self.tasks if task["optional_hash_id"]]
                optional_hash_ids = set([task["optional_hash_id"] for task in optional_tasks])

            found = self.findOptionalTasks(optional_tasks)
            self.log.debug("Found optional files after query hashtable connected peers: %s/%s" % (
                len(found), len(optional_hash_ids)
            ))

            if found:
                found_peers = set([peer for hash_id_peers in list(found.values()) for peer in hash_id_peers])
                self.startWorkers(found_peers, force_num=3, reason="Optional found in connected peers")

        if len(found) < len(optional_hash_ids) or find_more:
            self.log.debug(
                "No connected hashtable result for optional files: %s (asked: %s)" %
                (optional_hash_ids - set(found), len(self.asked_peers))
            )
            if not self.tasks:
                self.log.debug("No tasks, stopping finding optional peers")
                return

            # Stage 3: ask connected peers via findHashIds (at most 10 not yet asked)
            threads = []
            peers = [peer for peer in self.site.getConnectedPeers() if peer.key not in self.asked_peers][0:10]
            if not peers:
                peers = self.site.getConnectablePeers(ignore=self.asked_peers)

            for peer in peers:
                threads.append(self.site.greenlet_manager.spawn(peer.findHashIds, list(optional_hash_ids)))
                self.asked_peers.append(peer.key)

            # Poll the spawned queries for up to 5 seconds
            for i in range(5):
                time.sleep(1)

                thread_values = [thread.value for thread in threads if thread.value]
                if not thread_values:
                    continue

                found_ips = helper.mergeDicts(thread_values)
                found = self.addOptionalPeers(found_ips)
                self.log.debug("Found optional files after findhash connected peers: %s/%s (asked: %s)" % (
                    len(found), len(optional_hash_ids), len(threads)
                ))

                if found:
                    found_peers = set([peer for hash_id_peers in list(found.values()) for peer in hash_id_peers])
                    self.startWorkers(found_peers, force_num=3, reason="Optional found by findhash connected peers")

                if len(thread_values) == len(threads):
                    # Got result from all started thread
                    break

        if len(found) < len(optional_hash_ids):
            self.log.debug(
                "No findHash result, try random peers: %s (asked: %s)" %
                (optional_hash_ids - set(found), len(self.asked_peers))
            )
            # Stage 4: try to query random (connectable) peers

            if time_tasks != self.time_task_added:  # New task added since start
                optional_tasks = [task for task in self.tasks if task["optional_hash_id"]]
                optional_hash_ids = set([task["optional_hash_id"] for task in optional_tasks])

            threads = []
            peers = self.site.getConnectablePeers(ignore=self.asked_peers)

            for peer in peers:
                threads.append(self.site.greenlet_manager.spawn(peer.findHashIds, list(optional_hash_ids)))
                self.asked_peers.append(peer.key)

            gevent.joinall(threads, timeout=15)

            found_ips = helper.mergeDicts([thread.value for thread in threads if thread.value])
            found = self.addOptionalPeers(found_ips)
            self.log.debug("Found optional files after findhash random peers: %s/%s" % (len(found), len(optional_hash_ids)))

            if found:
                found_peers = set([peer for hash_id_peers in list(found.values()) for peer in hash_id_peers])
                self.startWorkers(found_peers, force_num=3, reason="Option found using findhash random peers")

        if len(found) < len(optional_hash_ids):
            self.log.debug("No findhash result for optional files: %s" % (optional_hash_ids - set(found)))

        if time_tasks != self.time_task_added:  # New task added since start
            self.log.debug("New task since start, restarting...")
            self.site.greenlet_manager.spawnLater(0.1, self.startFindOptional)
        else:
            self.log.debug("startFindOptional ended")
403
404 # Stop all worker
405 def stopWorkers(self):
406 num = 0
407 for worker in list(self.workers.values()):
408 worker.stop(reason="Stopping all workers")
409 num += 1
410 tasks = self.tasks[:] # Copy
411 for task in tasks: # Mark all current task as failed
412 self.failTask(task, reason="Stopping all workers")
413 return num
414
415 # Find workers by task
416 def findWorkers(self, task):
417 workers = []
418 for worker in list(self.workers.values()):
419 if worker.task == task:
420 workers.append(worker)
421 return workers
422
    # Ends and remove a worker
    def removeWorker(self, worker):
        """Stop tracking a worker; if the pool got small, look for more optional
        sources or restart workers for the remaining tasks."""
        worker.running = False
        if worker.key in self.workers:
            del(self.workers[worker.key])
            self.log.debug("Removed worker, workers: %s/%s" % (len(self.workers), self.getMaxWorkers()))
        # Pool shrunk below a third of the limit and we have not asked too many peers yet
        if len(self.workers) <= self.getMaxWorkers() / 3 and len(self.asked_peers) < 10:
            optional_task = next((task for task in self.tasks if task["optional_hash_id"]), None)
            if optional_task:
                if len(self.workers) == 0:
                    self.startFindOptional(find_more=True)
                else:
                    self.startFindOptional()
            elif self.tasks and not self.workers and worker.task and len(worker.task["failed"]) < 20:
                self.log.debug("Starting new workers... (tasks: %s)" % len(self.tasks))
                self.startWorkers(reason="Removed worker")
439
440 # Tasks sorted by this
441 def getPriorityBoost(self, inner_path):
442 if inner_path == "content.json":
443 return 9999 # Content.json always priority
444 if inner_path == "index.html":
445 return 9998 # index.html also important
446 if "-default" in inner_path:
447 return -4 # Default files are cloning not important
448 elif inner_path.endswith("all.css"):
449 return 14 # boost css files priority
450 elif inner_path.endswith("all.js"):
451 return 13 # boost js files priority
452 elif inner_path.endswith("dbschema.json"):
453 return 12 # boost database specification
454 elif inner_path.endswith("content.json"):
455 return 1 # boost included content.json files priority a bit
456 elif inner_path.endswith(".json"):
457 if len(inner_path) < 50: # Boost non-user json files
458 return 11
459 else:
460 return 2
461 return 0
462
463 def addTaskUpdate(self, task, peer, priority=0):
464 if priority > task["priority"]:
465 self.tasks.updateItem(task, "priority", priority)
466 if peer and task["peers"]: # This peer also has new version, add it to task possible peers
467 task["peers"].append(peer)
468 self.log.debug("Added peer %s to %s" % (peer.key, task["inner_path"]))
469 self.startWorkers([peer], reason="Added new task (update received by peer)")
470 elif peer and peer in task["failed"]:
471 task["failed"].remove(peer) # New update arrived, remove the peer from failed peers
472 self.log.debug("Removed peer %s from failed %s" % (peer.key, task["inner_path"]))
473 self.startWorkers([peer], reason="Added new task (peer failed before)")
474
475 def addTaskCreate(self, inner_path, peer, priority=0, file_info=None):
476 evt = gevent.event.AsyncResult()
477 if peer:
478 peers = [peer] # Only download from this peer
479 else:
480 peers = None
481 if not file_info:
482 file_info = self.site.content_manager.getFileInfo(inner_path)
483 if file_info and file_info["optional"]:
484 optional_hash_id = helper.toHashId(file_info["sha512"])
485 else:
486 optional_hash_id = None
487 if file_info:
488 size = file_info.get("size", 0)
489 else:
490 size = 0
491
492 self.lock_add_task.acquire()
493
494 # Check again if we have task for this file
495 task = self.tasks.findTask(inner_path)
496 if task:
497 self.addTaskUpdate(task, peer, priority)
498 return task
499
500 priority += self.getPriorityBoost(inner_path)
501
502 if self.started_task_num == 0: # Boost priority for first requested file
503 priority += 1
504
505 task = {
506 "id": self.next_task_id, "evt": evt, "workers_num": 0, "site": self.site, "inner_path": inner_path, "done": False,
507 "optional_hash_id": optional_hash_id, "time_added": time.time(), "time_started": None, "lock": None,
508 "time_action": None, "peers": peers, "priority": priority, "failed": [], "size": size
509 }
510
511 self.tasks.append(task)
512 self.lock_add_task.release()
513
514 self.next_task_id += 1
515 self.started_task_num += 1
516 if config.verbose:
517 self.log.debug(
518 "New task: %s, peer lock: %s, priority: %s, optional_hash_id: %s, tasks started: %s" %
519 (task["inner_path"], peers, priority, optional_hash_id, self.started_task_num)
520 )
521
522 self.time_task_added = time.time()
523
524 if optional_hash_id:
525 if self.asked_peers:
526 del self.asked_peers[:] # Reset asked peers
527 self.startFindOptional(high_priority=priority > 0)
528
529 if peers:
530 self.startWorkers(peers, reason="Added new optional task")
531
532 else:
533 self.startWorkers(peers, reason="Added new task")
534 return task
535
536 # Create new task and return asyncresult
537 def addTask(self, inner_path, peer=None, priority=0, file_info=None):
538 self.site.onFileStart(inner_path) # First task, trigger site download started
539 task = self.tasks.findTask(inner_path)
540 if task: # Already has task for that file
541 self.addTaskUpdate(task, peer, priority)
542 else: # No task for that file yet
543 task = self.addTaskCreate(inner_path, peer, priority, file_info)
544 return task
545
    def addTaskWorker(self, task, worker):
        """Register that a worker started on the task (workers_num += 1)."""
        try:
            self.tasks.updateItem(task, "workers_num", task["workers_num"] + 1)
        except ValueError:
            # Task no longer tracked by the queue; keep the dict itself consistent
            task["workers_num"] += 1
551
    def removeTaskWorker(self, task, worker):
        """Unregister a worker from the task (workers_num -= 1); fail the task
        when at least as many peers failed on it as there are workers left."""
        try:
            self.tasks.updateItem(task, "workers_num", task["workers_num"] - 1)
        except ValueError:
            # Task no longer tracked by the queue; keep the dict itself consistent
            task["workers_num"] -= 1
        if len(task["failed"]) >= len(self.workers):
            fail_reason = "Too many fails: %s (workers: %s)" % (len(task["failed"]), len(self.workers))
            self.failTask(task, reason=fail_reason)
560
    # Wait for other tasks
    def checkComplete(self):
        """Delay briefly (so racing tasks can be queued), then fire onComplete
        if the queue is still empty."""
        time.sleep(0.1)
        if not self.tasks:
            self.log.debug("Check complete: No tasks")
            self.onComplete()
567
568 def onComplete(self):
569 self.started_task_num = 0
570 del self.asked_peers[:]
571 self.site.onComplete() # No more task trigger site complete
572
573 # Mark a task done
574 def doneTask(self, task):
575 task["done"] = True
576 self.tasks.remove(task) # Remove from queue
577 if task["optional_hash_id"]:
578 self.log.debug(
579 "Downloaded optional file in %.3fs, adding to hashfield: %s" %
580 (time.time() - task["time_started"], task["inner_path"])
581 )
582 self.site.content_manager.optionalDownloaded(task["inner_path"], task["optional_hash_id"], task["size"])
583 self.site.onFileDone(task["inner_path"])
584 task["evt"].set(True)
585 if not self.tasks:
586 self.site.greenlet_manager.spawn(self.checkComplete)
587
588 # Mark a task failed
589 def failTask(self, task, reason="Unknown"):
590 try:
591 self.tasks.remove(task) # Remove from queue
592 except ValueError as err:
593 return False
594
595 self.log.debug("Task %s failed (Reason: %s)" % (task["inner_path"], reason))
596 task["done"] = True
597 self.site.onFileFail(task["inner_path"])
598 task["evt"].set(False)
599 if not self.tasks:
600 self.site.greenlet_manager.spawn(self.checkComplete)