Forking what is left of ZeroNet and hopefully adding an AT Proto Frontend/Proxy
at main 186 lines 5.7 kB view raw
"""Debug helpers: compact exception/traceback formatting and a gevent block checker."""
import sys
import os
import re
from Config import config


# Non fatal exception
class Notify(Exception):
    """Exception whose string form is just its message.

    formatException() returns Notify instances unchanged instead of
    formatting them with a traceback.
    """

    # Class-level default so str() works even when no message was given
    message = ""

    def __init__(self, message=None):
        if message:
            self.message = message

    def __str__(self):
        return self.message


# Gevent greenlet.kill accept Exception type
def createNotifyType(message):
    """Return a new Notify subclass carrying `message` as a class attribute."""
    return type("Notify", (Notify, ), {"message": message})


def formatExceptionMessage(err):
    """Return "<ExceptionClassName>: <message>" for the exception `err`.

    The message is the last element of `err.args` when there are any,
    otherwise the result of `err.__str__()`.
    """
    err_type = err.__class__.__name__
    if err.args:
        err_message = err.args[-1]
    else:
        err_message = err.__str__()
    return "%s: %s" % (err_type, err_message)


# sys.path entries whose last component is site-packages / dist-packages, plus the stdlib directory
python_lib_dirs = [path.replace("\\", "/") for path in sys.path if re.sub(r".*[\\/]", "", path) in ("site-packages", "dist-packages")]
python_lib_dirs.append(os.path.dirname(os.__file__).replace("\\", "/"))  # TODO: check if returns the correct path for PyPy

# Two directory levels above this file, with forward slashes
root_dir = os.path.realpath(os.path.dirname(__file__) + "/../../")
root_dir = root_dir.replace("\\", "/")


def _describeFramePath(path):
    """Classify a forward-slash normalized frame path.

    Returns a tuple (file_title, is_builtin, is_skippable_builtin).
    """
    if path.startswith("src/gevent/"):
        return "<gevent>/" + path[len("src/gevent/"):], True, False

    if path in ("<frozen importlib._bootstrap>", "<frozen importlib._bootstrap_external>"):
        return "(importlib)", True, True

    # Installed / standard library modules: "<module>/rest/of/path.py"
    for base in python_lib_dirs:
        if path.startswith(base + "/"):
            module_name, *tail = path[len(base + "/"):].split("/")
            if module_name.endswith(".py"):
                module_name = module_name[:-3]
            return "/".join(["<%s>" % module_name] + tail), True, False

    # Project files: strip the most specific known base directory
    for base in (root_dir + "/src", root_dir + "/plugins", root_dir):
        if path.startswith(base + "/"):
            return path[len(base + "/"):], False, False

    # For unknown paths, do our best to hide absolute path
    file_title = path
    for needle in ("/zeronet/", "/core/"):
        if needle in file_title.lower():
            file_title = "?/" + file_title[file_title.lower().rindex(needle) + len(needle):]
    return file_title, False, False


def _compressFileTitle(file_title):
    """Drop leading directories that are a prefix of the part following them.

    Path compression: A/AB/ABC/X/Y.py -> ABC/X/Y.py
    E.g.: in 'Db/DbCursor.py' the directory part is unnecessary.
    Titles starting with "/" are returned unchanged.
    """
    if file_title.startswith("/"):
        return file_title

    parts = file_title.split("/")
    prev_part = ""
    # The extra "" sentinel makes the loop stop after the last real part at the latest
    # (unless every part is empty). The first part always matches the empty prefix,
    # so part_index is at least 1 on exit.
    for part_index, part in enumerate(parts + [""]):
        if not part.startswith(prev_part):
            break
        prev_part = part
    return "/".join(parts[part_index - 1:])


def formatTraceback(items, limit=None, fold_builtin=True):
    """Convert (path, line) pairs into a list of short, readable entries.

    items: sized sequence of (path, line) pairs.
    limit: when set, stop after that many items and append "...".
    fold_builtin: when true, importlib frames are dropped and runs of
        consecutive library frames are collapsed into a single "..."
        (the last item is never folded).

    Returns a list of strings: "<file title> line <line>", or only the line
    number when the file is the same as the previous item's.
    """
    back = []
    prev_file_title = ""
    is_prev_builtin = False
    num_items = len(items)

    # The frame counter is kept separate from the index used by path compression,
    # otherwise is_last and the limit check work on a clobbered value
    for i, (path, line) in enumerate(items, 1):
        is_last = i == num_items
        path = path.replace("\\", "/")

        file_title, is_builtin, is_skippable_builtin = _describeFramePath(path)
        file_title = _compressFileTitle(file_title)

        if is_skippable_builtin and fold_builtin:
            pass
        elif is_builtin and is_prev_builtin and not is_last and fold_builtin:
            # back can still be empty here if every previous frame was skipped
            if not back or back[-1] != "...":
                back.append("...")
        else:
            if file_title == prev_file_title:
                back.append("%s" % line)
            else:
                back.append("%s line %s" % (file_title, line))

        prev_file_title = file_title
        is_prev_builtin = is_builtin

        if limit and i >= limit:
            back.append("...")
            break
    return back


def formatException(err=None, format="text"):
    """Format an exception together with its compacted traceback.

    err: a Notify instance (returned unchanged), an
        (exc_type, exc_obj, exc_tb) tuple, an exception instance, or None
        to use the exception currently being handled.
    format: "html" returns an HTML snippet, anything else returns
        "<type name>: <message> in <traceback>".
    """
    import traceback
    if isinstance(err, Notify):  # Also matches the types made by createNotifyType
        return err
    elif isinstance(err, tuple) and err and err[0] is not None:  # Passed traceback info
        exc_type, exc_obj, exc_tb = err
        err = None
    else:  # No traceback info passed, get latest
        exc_type, exc_obj, exc_tb = sys.exc_info()
        if exc_type is None and isinstance(err, BaseException):
            # Called outside of an except block: use the exception's own details
            exc_type, exc_obj, exc_tb = type(err), err, err.__traceback__

    if not err:
        if hasattr(exc_obj, "message"):
            err = exc_obj.message
        else:
            err = exc_obj

    tb = formatTraceback([[frame[0], frame[1]] for frame in traceback.extract_tb(exc_tb)])
    if format == "html":
        # NOTE(review): err is inserted without HTML escaping - confirm callers only pass trusted text
        return "%s: %s<br><small class='multiline'>%s</small>" % (repr(err), err, " > ".join(tb))
    else:
        return "%s: %s in %s" % (exc_type.__name__, err, " > ".join(tb))


def formatStack(limit=None):
    """Return the caller's stack as a " > " joined string of formatTraceback entries."""
    import inspect
    # [1:] skips the frame of formatStack itself
    tb = formatTraceback([[frame[1], frame[2]] for frame in inspect.stack()[1:]], limit=limit)
    return " > ".join(tb)


# Test if gevent eventloop blocks
import logging
import gevent
import time


num_block = 0


def testBlock():
    """Loop forever, logging and counting wake-ups that come more than 0.1s late."""
    global num_block
    logging.debug("Gevent block checker started")
    last_time = time.time()
    while True:
        # NOTE(review): presumably time.sleep is monkey-patched by gevent elsewhere - confirm
        time.sleep(1)
        elapsed = time.time() - last_time
        if elapsed > 1.1:
            logging.debug("Gevent block detected: %.3fs", elapsed - 1)
            num_block += 1
        last_time = time.time()


gevent.spawn(testBlock)


if __name__ == "__main__":
    try:
        print(1 / 0)
    except Exception as err:
        print(type(err).__name__)
        print("1/0 error: %s" % formatException(err))

    def loadJson():
        json.loads("Errr")

    import json
    try:
        loadJson()
    except Exception as err:
        print(err)
        print("Json load error: %s" % formatException(err))

    try:
        raise Notify("nothing...")
    except Exception as err:
        print("Notify: %s" % formatException(err))

    # Not wrapped in try/except: this call raises and ends the script with a traceback
    loadJson()