Forking what is left of ZeroNet and hopefully adding an AT Proto Frontend/Proxy
1import sys
2import os
3import re
4from Config import config
5
6
7# Non fatal exception
8class Notify(Exception):
9 def __init__(self, message=None):
10 if message:
11 self.message = message
12
13 def __str__(self):
14 return self.message
15
16
17# Gevent greenlet.kill accept Exception type
18def createNotifyType(message):
19 return type("Notify", (Notify, ), {"message": message})
20
21
22def formatExceptionMessage(err):
23 err_type = err.__class__.__name__
24 if err.args:
25 err_message = err.args[-1]
26 else:
27 err_message = err.__str__()
28 return "%s: %s" % (err_type, err_message)
29
30
31python_lib_dirs = [path.replace("\\", "/") for path in sys.path if re.sub(r".*[\\/]", "", path) in ("site-packages", "dist-packages")]
32python_lib_dirs.append(os.path.dirname(os.__file__).replace("\\", "/")) # TODO: check if returns the correct path for PyPy
33
34root_dir = os.path.realpath(os.path.dirname(__file__) + "/../../")
35root_dir = root_dir.replace("\\", "/")
36
37
38def formatTraceback(items, limit=None, fold_builtin=True):
39 back = []
40 i = 0
41 prev_file_title = ""
42 is_prev_builtin = False
43
44 for path, line in items:
45 i += 1
46 is_last = i == len(items)
47 path = path.replace("\\", "/")
48
49 if path.startswith("src/gevent/"):
50 file_title = "<gevent>/" + path[len("src/gevent/"):]
51 is_builtin = True
52 is_skippable_builtin = False
53 elif path in ("<frozen importlib._bootstrap>", "<frozen importlib._bootstrap_external>"):
54 file_title = "(importlib)"
55 is_builtin = True
56 is_skippable_builtin = True
57 else:
58 is_skippable_builtin = False
59 for base in python_lib_dirs:
60 if path.startswith(base + "/"):
61 file_title = path[len(base + "/"):]
62 module_name, *tail = file_title.split("/")
63 if module_name.endswith(".py"):
64 module_name = module_name[:-3]
65 file_title = "/".join(["<%s>" % module_name] + tail)
66 is_builtin = True
67 break
68 else:
69 is_builtin = False
70 for base in (root_dir + "/src", root_dir + "/plugins", root_dir):
71 if path.startswith(base + "/"):
72 file_title = path[len(base + "/"):]
73 break
74 else:
75 # For unknown paths, do our best to hide absolute path
76 file_title = path
77 for needle in ("/zeronet/", "/core/"):
78 if needle in file_title.lower():
79 file_title = "?/" + file_title[file_title.lower().rindex(needle) + len(needle):]
80
81 # Path compression: A/AB/ABC/X/Y.py -> ABC/X/Y.py
82 # E.g.: in 'Db/DbCursor.py' the directory part is unnecessary
83 if not file_title.startswith("/"):
84 prev_part = ""
85 for i, part in enumerate(file_title.split("/") + [""]):
86 if not part.startswith(prev_part):
87 break
88 prev_part = part
89 file_title = "/".join(file_title.split("/")[i - 1:])
90
91 if is_skippable_builtin and fold_builtin:
92 pass
93 elif is_builtin and is_prev_builtin and not is_last and fold_builtin:
94 if back[-1] != "...":
95 back.append("...")
96 else:
97 if file_title == prev_file_title:
98 back.append("%s" % line)
99 else:
100 back.append("%s line %s" % (file_title, line))
101
102 prev_file_title = file_title
103 is_prev_builtin = is_builtin
104
105 if limit and i >= limit:
106 back.append("...")
107 break
108 return back
109
110
111def formatException(err=None, format="text"):
112 import traceback
113 if type(err) == Notify:
114 return err
115 elif type(err) == tuple and err and err[0] is not None: # Passed trackeback info
116 exc_type, exc_obj, exc_tb = err
117 err = None
118 else: # No trackeback info passed, get latest
119 exc_type, exc_obj, exc_tb = sys.exc_info()
120
121 if not err:
122 if hasattr(err, "message"):
123 err = exc_obj.message
124 else:
125 err = exc_obj
126
127 tb = formatTraceback([[frame[0], frame[1]] for frame in traceback.extract_tb(exc_tb)])
128 if format == "html":
129 return "%s: %s<br><small class='multiline'>%s</small>" % (repr(err), err, " > ".join(tb))
130 else:
131 return "%s: %s in %s" % (exc_type.__name__, err, " > ".join(tb))
132
133
134def formatStack(limit=None):
135 import inspect
136 tb = formatTraceback([[frame[1], frame[2]] for frame in inspect.stack()[1:]], limit=limit)
137 return " > ".join(tb)
138
139
140# Test if gevent eventloop blocks
141import logging
142import gevent
143import time
144
145
146num_block = 0
147
148
149def testBlock():
150 global num_block
151 logging.debug("Gevent block checker started")
152 last_time = time.time()
153 while 1:
154 time.sleep(1)
155 if time.time() - last_time > 1.1:
156 logging.debug("Gevent block detected: %.3fs" % (time.time() - last_time - 1))
157 num_block += 1
158 last_time = time.time()
159
160
161gevent.spawn(testBlock)
162
163
164if __name__ == "__main__":
165 try:
166 print(1 / 0)
167 except Exception as err:
168 print(type(err).__name__)
169 print("1/0 error: %s" % formatException(err))
170
171 def loadJson():
172 json.loads("Errr")
173
174 import json
175 try:
176 loadJson()
177 except Exception as err:
178 print(err)
179 print("Json load error: %s" % formatException(err))
180
181 try:
182 raise Notify("nothing...")
183 except Exception as err:
184 print("Notify: %s" % formatException(err))
185
186 loadJson()