A fork of what is left of ZeroNet, with the goal of adding an AT Protocol (atproto) frontend/proxy.
1import sqlite3
2import json
3import time
4import logging
5import re
6import os
7import atexit
8import threading
9import sys
10import weakref
11import errno
12
13import gevent
14
15from Debug import Debug
16from .DbCursor import DbCursor
17from util import SafeRe
18from util import helper
19from util import ThreadPool
20from Config import config
21
# Shared thread pool used to run blocking db work (e.g. commit) off the main loop
thread_pool_db = ThreadPool.ThreadPool(config.threads_db)

next_db_id = 0  # Monotonic id assigned to each Db instance (used in its logger name)
opened_dbs = []  # Every connected Db registers here; scanned by dbCleanup/dbCommitCheck/dbCloseAll
26
27
# Background loop: close idle databases to save some memory
def dbCleanup():
    """Every 5 minutes, close any opened db that allows idle closing and has
    not been queried for more than 5 minutes."""
    idle_limit = 60 * 5
    while True:
        time.sleep(idle_limit)
        for db in list(opened_dbs):  # Copy: db.close() mutates opened_dbs
            if not db.close_idle:
                continue
            if time.time() - db.last_query_time > idle_limit:
                db.close("Cleanup")
36
37
def dbCommitCheck():
    """Background loop: every 5 seconds, commit databases flagged with
    need_commit, clearing the flag only when the commit succeeded."""
    while True:
        time.sleep(5)
        for db in list(opened_dbs):  # Copy: list may change while committing
            if not db.need_commit:
                continue

            if db.commit("Interval"):
                db.need_commit = False
            time.sleep(0.1)  # Brief yield between commits
49
50
def dbCloseAll():
    """Close every opened database; registered as an atexit hook."""
    for db in list(opened_dbs):  # Copy: db.close() removes entries from opened_dbs
        db.close("Close all")
54
55
# Start the background maintenance greenlets and register the shutdown hook
gevent.spawn(dbCleanup)
gevent.spawn(dbCommitCheck)
atexit.register(dbCloseAll)
59
60
class DbTableError(Exception):
    """Raised when creating or migrating a schema table fails.

    Attributes:
        table: Name of the table that triggered the error.
    """

    def __init__(self, message, table):
        Exception.__init__(self, message)
        self.table = table
65
66
class Db(object):
    """sqlite3 wrapper that maps site json files into relational tables.

    The table layout is driven by a schema dict (dbschema.json style: db_name,
    version, tables, maps). Instances register themselves in the module-level
    `opened_dbs` list so the background greenlets (dbCleanup, dbCommitCheck)
    can close idle connections and flush pending commits.
    """

    def __init__(self, schema, db_path, close_idle=False):
        """
        Args:
            schema: Parsed schema dict (keys: db_name, version, tables, maps, ...).
            db_path: Path of the sqlite file; created lazily on first connect().
            close_idle: Allow dbCleanup to close this db after 5 minutes idle.
        """
        global next_db_id
        self.db_path = db_path
        self.db_dir = os.path.dirname(db_path) + "/"
        self.schema = schema
        self.schema["version"] = self.schema.get("version", 1)  # Default schema version: 1
        self.conn = None  # sqlite3.Connection, None until connect()
        self.cur = None  # Shared DbCursor (see getSharedCursor)
        self.cursors = weakref.WeakSet()  # Live DbCursor objects; drained/interrupted on close()
        self.id = next_db_id
        next_db_id += 1
        self.progress_sleeping = False  # True while yielding inside sqlite's progress handler
        self.commiting = False  # Guard against overlapping commits
        self.log = logging.getLogger("Db#%s:%s" % (self.id, schema["db_name"]))
        self.table_names = None
        self.collect_stats = False  # When True, cursors record per-query stats into query_stats
        self.foreign_keys = False  # Enable PRAGMA foreign_keys on connect when True
        self.need_commit = False  # Set by cursors; consumed by dbCommitCheck
        self.query_stats = {}
        self.db_keyvalues = {}  # Cache of internal keyvalue rows (table versions)
        self.delayed_queue = []  # Pending ("execute"/"insertOrUpdate", (args, kwargs)) commands
        self.delayed_queue_thread = None  # Greenlet scheduled to run processDelayed
        self.close_idle = close_idle
        self.last_query_time = time.time()
        self.last_sleep_time = time.time()
        self.num_execute_since_sleep = 0
        self.lock = ThreadPool.Lock()
        self.connect_lock = ThreadPool.Lock()

    def __repr__(self):
        return "<Db#%s:%s close_idle:%s>" % (id(self), self.db_path, self.close_idle)

    def connect(self):
        """Open the sqlite connection, creating the db directory if needed.

        Idempotent: a second call while connected is a no-op. Registers the
        instance in opened_dbs and enables WAL journaling.
        """
        self.connect_lock.acquire(True)
        try:
            if self.conn:
                self.log.debug("Already connected, connection ignored")
                return

            if self not in opened_dbs:
                opened_dbs.append(self)
            s = time.time()
            try:  # Directory not exist yet
                os.makedirs(self.db_dir)
                self.log.debug("Created Db path: %s" % self.db_dir)
            except OSError as err:
                if err.errno != errno.EEXIST:
                    raise err
            if not os.path.isfile(self.db_path):
                self.log.debug("Db file not exist yet: %s" % self.db_path)
            # check_same_thread=False: connection is shared across gevent/thread-pool threads
            self.conn = sqlite3.connect(self.db_path, isolation_level="DEFERRED", check_same_thread=False)
            self.conn.row_factory = sqlite3.Row
            # Yield to other greenlets every 5M sqlite vm instructions (see progress())
            self.conn.set_progress_handler(self.progress, 5000000)
            self.conn.execute('PRAGMA journal_mode=WAL')
            if self.foreign_keys:
                self.conn.execute("PRAGMA foreign_keys = ON")
            self.cur = self.getCursor()

            self.log.debug(
                "Connected to %s in %.3fs (opened: %s, sqlite version: %s)..." %
                (self.db_path, time.time() - s, len(opened_dbs), sqlite3.version)
            )
            self.log.debug("Connect by thread: %s" % threading.current_thread().ident)
            self.log.debug("Connect called by %s" % Debug.formatStack())
        finally:
            self.connect_lock.release()

    def getConn(self):
        """Return the sqlite connection, connecting first if necessary."""
        if not self.conn:
            self.connect()
        return self.conn

    def progress(self, *args, **kwargs):
        """sqlite progress handler: briefly sleep so long-running queries
        don't starve other greenlets; commit() is skipped while sleeping."""
        self.progress_sleeping = True
        time.sleep(0.001)
        self.progress_sleeping = False

    # Execute query using dbcursor
    def execute(self, query, params=None):
        """Run a query on the shared cursor (connects on demand)."""
        if not self.conn:
            self.connect()
        return self.cur.execute(query, params)

    @thread_pool_db.wrap
    def commit(self, reason="Unknown"):
        """Commit the current transaction (runs on the db thread pool).

        Returns True on success, False when skipped (no connection, progress
        handler sleeping, already committing) or on error.
        """
        if self.progress_sleeping:
            self.log.debug("Commit ignored: Progress sleeping")
            return False

        if not self.conn:
            self.log.debug("Commit ignored: No connection")
            return False

        if self.commiting:
            self.log.debug("Commit ignored: Already commiting")
            return False

        try:
            s = time.time()
            self.commiting = True
            self.conn.commit()
            self.log.debug("Commited in %.3fs (reason: %s)" % (time.time() - s, reason))
            return True
        except Exception as err:
            if "SQL statements in progress" in str(err):
                self.log.warning("Commit delayed: %s (reason: %s)" % (Debug.formatException(err), reason))
            else:
                self.log.error("Commit error: %s (reason: %s)" % (Debug.formatException(err), reason))
            return False
        finally:
            self.commiting = False

    def insertOrUpdate(self, *args, **kwargs):
        """Proxy to the shared cursor's insertOrUpdate (connects on demand)."""
        if not self.conn:
            self.connect()
        return self.cur.insertOrUpdate(*args, **kwargs)

    def executeDelayed(self, *args, **kwargs):
        """Queue an execute() call to run batched ~1s later in processDelayed."""
        if not self.delayed_queue_thread:
            self.delayed_queue_thread = gevent.spawn_later(1, self.processDelayed)
        self.delayed_queue.append(("execute", (args, kwargs)))

    def insertOrUpdateDelayed(self, *args, **kwargs):
        """Queue an insertOrUpdate() call to run batched ~1s later.

        Fixed to match executeDelayed: check/store delayed_queue_thread so a
        single processDelayed greenlet is scheduled per batch (the old code
        tested the queue and never stored the greenlet, spawning duplicates).
        """
        if not self.delayed_queue_thread:
            self.delayed_queue_thread = gevent.spawn_later(1, self.processDelayed)
        self.delayed_queue.append(("insertOrUpdate", (args, kwargs)))

    def processDelayed(self):
        """Run every queued delayed command on a single cursor, then reset the queue."""
        if not self.delayed_queue:
            self.log.debug("processDelayed aborted")
            return
        if not self.conn:
            self.connect()

        s = time.time()
        cur = self.getCursor()
        for command, params in self.delayed_queue:
            if command == "insertOrUpdate":
                cur.insertOrUpdate(*params[0], **params[1])
            else:
                cur.execute(*params[0], **params[1])

        if len(self.delayed_queue) > 10:
            self.log.debug("Processed %s delayed queue in %.3fs" % (len(self.delayed_queue), time.time() - s))
        self.delayed_queue = []
        self.delayed_queue_thread = None

    def close(self, reason="Unknown"):
        """Flush delayed queries, commit, wait for/interrupt pending cursors
        and close the connection. Returns False if not connected."""
        if not self.conn:
            return False
        self.connect_lock.acquire()
        s = time.time()
        if self.delayed_queue:
            self.processDelayed()
        if self in opened_dbs:
            opened_dbs.remove(self)
        self.need_commit = False
        self.commit("Closing: %s" % reason)
        self.log.debug("Close called by %s" % Debug.formatStack())
        # Give pending cursors up to ~1s (increasing backoff) to finish
        for i in range(5):
            if len(self.cursors) == 0:
                break
            self.log.debug("Pending cursors: %s" % len(self.cursors))
            time.sleep(0.1 * i)
        if len(self.cursors):
            self.log.debug("Killing cursors: %s" % len(self.cursors))
            self.conn.interrupt()

        if self.cur:
            self.cur.close()
        if self.conn:
            # Close on the main loop to avoid cross-thread sqlite access
            ThreadPool.main_loop.call(self.conn.close)
        self.conn = None
        self.cur = None
        self.log.debug("%s closed (reason: %s) in %.3fs, opened: %s" % (self.db_path, reason, time.time() - s, len(opened_dbs)))
        self.connect_lock.release()
        return True

    # Gets a cursor object to database
    # Return: Cursor class
    def getCursor(self):
        """Create a new DbCursor bound to this db (connects on demand)."""
        if not self.conn:
            self.connect()

        cur = DbCursor(self)
        return cur

    def getSharedCursor(self):
        """Return the shared cursor created at connect time."""
        if not self.conn:
            self.connect()
        return self.cur

    # Get the table version
    # Return: Table version (0 if not stored yet), or False if the keyvalue
    # table does not exist / cannot be queried
    def getTableVersion(self, table_name):
        if not self.db_keyvalues:  # Get db keyvalues
            try:
                res = self.execute("SELECT * FROM keyvalue WHERE json_id=0")  # json_id = 0 is internal keyvalues
            except sqlite3.OperationalError as err:  # Table not exist
                self.log.debug("Query table version error: %s" % err)
                return False

            for row in res:
                self.db_keyvalues[row["key"]] = row["value"]

        return self.db_keyvalues.get("table.%s.version" % table_name, 0)

    # Check Db tables
    # Return: <list> Changed table names
    def checkTables(self):
        """Ensure internal (keyvalue, json) and schema-defined tables exist at
        the right version, creating/migrating as needed.

        Raises:
            DbTableError: if a schema table cannot be created.
        """
        s = time.time()
        changed_tables = []

        cur = self.getSharedCursor()

        # Check internal tables
        # Check keyvalue table
        changed = cur.needTable("keyvalue", [
            ["keyvalue_id", "INTEGER PRIMARY KEY AUTOINCREMENT"],
            ["key", "TEXT"],
            ["value", "INTEGER"],
            ["json_id", "INTEGER"],
        ], [
            "CREATE UNIQUE INDEX key_id ON keyvalue(json_id, key)"
        ], version=self.schema["version"])
        if changed:
            changed_tables.append("keyvalue")

        # Create json table if no custom one defined
        # Layout depends on schema version: v1 = path, v2 = directory+file_name,
        # v3 = site+directory+file_name
        if "json" not in self.schema.get("tables", {}):
            if self.schema["version"] == 1:
                changed = cur.needTable("json", [
                    ["json_id", "INTEGER PRIMARY KEY AUTOINCREMENT"],
                    ["path", "VARCHAR(255)"]
                ], [
                    "CREATE UNIQUE INDEX path ON json(path)"
                ], version=self.schema["version"])
            elif self.schema["version"] == 2:
                changed = cur.needTable("json", [
                    ["json_id", "INTEGER PRIMARY KEY AUTOINCREMENT"],
                    ["directory", "VARCHAR(255)"],
                    ["file_name", "VARCHAR(255)"]
                ], [
                    "CREATE UNIQUE INDEX path ON json(directory, file_name)"
                ], version=self.schema["version"])
            elif self.schema["version"] == 3:
                changed = cur.needTable("json", [
                    ["json_id", "INTEGER PRIMARY KEY AUTOINCREMENT"],
                    ["site", "VARCHAR(255)"],
                    ["directory", "VARCHAR(255)"],
                    ["file_name", "VARCHAR(255)"]
                ], [
                    "CREATE UNIQUE INDEX path ON json(directory, site, file_name)"
                ], version=self.schema["version"])
            if changed:
                changed_tables.append("json")

        # Check schema tables
        for table_name, table_settings in self.schema.get("tables", {}).items():
            try:
                indexes = table_settings.get("indexes", [])
                version = table_settings.get("schema_changed", 0)
                changed = cur.needTable(
                    table_name, table_settings["cols"],
                    indexes, version=version
                )
                if changed:
                    changed_tables.append(table_name)
            except Exception as err:
                self.log.error("Error creating table %s: %s" % (table_name, Debug.formatException(err)))
                raise DbTableError(err, table_name)

        self.log.debug("Db check done in %.3fs, changed tables: %s" % (time.time() - s, changed_tables))
        if changed_tables:
            self.db_keyvalues = {}  # Refresh table version cache

        return changed_tables

    # Update json file to db
    # Return: True if matched
    def updateJson(self, file_path, file=None, cur=None):
        """Load a json file and upsert its content into the mapped tables.

        Args:
            file_path: Absolute path; must live under self.db_dir to match.
            file: Optional open file object, or False to treat as deleted.
            cur: Optional cursor; defaults to the shared cursor.
        Returns:
            True if the path matched any schema map, False otherwise.
        """
        if not file_path.startswith(self.db_dir):
            return False  # Not from the db dir: Skipping
        relative_path = file_path[len(self.db_dir):]  # File path relative to db file

        # Check if filename matches any of mappings in schema
        matched_maps = []
        for match, map_settings in self.schema["maps"].items():
            try:
                if SafeRe.match(match, relative_path):
                    matched_maps.append(map_settings)
            except SafeRe.UnsafePatternError as err:
                self.log.error(err)

        # No match found for the file
        if not matched_maps:
            return False

        # Load the json file
        try:
            if file is None:  # Open file is not file object passed
                file = open(file_path, "rb")

            if file is False:  # File deleted
                data = {}
            else:
                if file_path.endswith("json.gz"):
                    file = helper.limitedGzipFile(fileobj=file)

                if sys.version_info.major == 3 and sys.version_info.minor < 6:
                    data = json.loads(file.read().decode("utf8"))
                else:
                    data = json.load(file)
        except Exception as err:
            self.log.debug("Json file %s load error: %s" % (file_path, err))
            data = {}

        # No cursor specificed
        if not cur:
            cur = self.getSharedCursor()
            cur.logging = False

        # Row for current json file if required (only created when a map needs it)
        if not data or [dbmap for dbmap in matched_maps if "to_keyvalue" in dbmap or "to_table" in dbmap]:
            json_row = cur.getJsonRow(relative_path)

        # Check matched mappings in schema
        for dbmap in matched_maps:
            # Insert non-relational key values
            if dbmap.get("to_keyvalue"):
                # Get current values
                res = cur.execute("SELECT * FROM keyvalue WHERE json_id = ?", (json_row["json_id"],))
                current_keyvalue = {}
                current_keyvalue_id = {}
                for row in res:
                    current_keyvalue[row["key"]] = row["value"]
                    current_keyvalue_id[row["key"]] = row["keyvalue_id"]

                for key in dbmap["to_keyvalue"]:
                    if key not in current_keyvalue:  # Keyvalue not exist yet in the db
                        cur.execute(
                            "INSERT INTO keyvalue ?",
                            {"key": key, "value": data.get(key), "json_id": json_row["json_id"]}
                        )
                    elif data.get(key) != current_keyvalue[key]:  # Keyvalue different value
                        cur.execute(
                            "UPDATE keyvalue SET value = ? WHERE keyvalue_id = ?",
                            (data.get(key), current_keyvalue_id[key])
                        )

            # Insert data to json table for easier joins
            if dbmap.get("to_json_table"):
                directory, file_name = re.match("^(.*?)/*([^/]*)$", relative_path).groups()
                data_json_row = dict(cur.getJsonRow(directory + "/" + dbmap.get("file_name", file_name)))
                changed = False
                for key in dbmap["to_json_table"]:
                    if data.get(key) != data_json_row.get(key):
                        changed = True
                if changed:
                    # Add the custom col values
                    data_json_row.update({key: val for key, val in data.items() if key in dbmap["to_json_table"]})
                    cur.execute("INSERT OR REPLACE INTO json ?", data_json_row)

            # Insert data to tables
            for table_settings in dbmap.get("to_table", []):
                if isinstance(table_settings, dict):  # Custom settings
                    table_name = table_settings["table"]  # Table name to insert datas
                    node = table_settings.get("node", table_name)  # Node keyname in data json file
                    key_col = table_settings.get("key_col")  # Map dict key as this col
                    val_col = table_settings.get("val_col")  # Map dict value as this col
                    import_cols = table_settings.get("import_cols")
                    replaces = table_settings.get("replaces")
                else:  # Simple settings
                    table_name = table_settings
                    node = table_settings
                    key_col = None
                    val_col = None
                    import_cols = None
                    replaces = None

                # Fill import cols from table cols
                if not import_cols:
                    import_cols = set([item[0] for item in self.schema["tables"][table_name]["cols"]])

                cur.execute("DELETE FROM %s WHERE json_id = ?" % table_name, (json_row["json_id"],))

                if node not in data:
                    continue

                if key_col:  # Map as dict
                    for key, val in data[node].items():
                        if val_col:  # Single value
                            cur.execute(
                                "INSERT OR REPLACE INTO %s ?" % table_name,
                                {key_col: key, val_col: val, "json_id": json_row["json_id"]}
                            )
                        else:  # Multi value
                            if type(val) is dict:  # Single row
                                row = val
                                if import_cols:
                                    row = {key: row[key] for key in row if key in import_cols}  # Filter row by import_cols
                                row[key_col] = key
                                # Replace in value if necessary
                                if replaces:
                                    for replace_key, replace in replaces.items():
                                        if replace_key in row:
                                            for replace_from, replace_to in replace.items():
                                                row[replace_key] = row[replace_key].replace(replace_from, replace_to)

                                row["json_id"] = json_row["json_id"]
                                cur.execute("INSERT OR REPLACE INTO %s ?" % table_name, row)
                            elif type(val) is list:  # Multi row
                                for row in val:
                                    row[key_col] = key
                                    row["json_id"] = json_row["json_id"]
                                    cur.execute("INSERT OR REPLACE INTO %s ?" % table_name, row)
                else:  # Map as list
                    for row in data[node]:
                        row["json_id"] = json_row["json_id"]
                        if import_cols:
                            row = {key: row[key] for key in row if key in import_cols}  # Filter row by import_cols
                        cur.execute("INSERT OR REPLACE INTO %s ?" % table_name, row)

        # Cleanup json row
        if not data:
            self.log.debug("Cleanup json row for %s" % file_path)
            cur.execute("DELETE FROM json WHERE json_id = %s" % json_row["json_id"])

        return True
498
499
if __name__ == "__main__":
    # Ad-hoc manual test: rebuild the ZeroTalk database from local json files
    # and print per-query statistics. Expects zerotalk.schema.json and
    # data/users/* in the current working directory.
    s = time.time()
    console_log = logging.StreamHandler()
    logging.getLogger('').setLevel(logging.DEBUG)
    logging.getLogger('').addHandler(console_log)
    console_log.setLevel(logging.DEBUG)
    dbjson = Db(json.load(open("zerotalk.schema.json")), "data/users/zerotalk.db")
    dbjson.collect_stats = True  # Record per-query timing into query_stats
    dbjson.checkTables()
    cur = dbjson.getCursor()
    cur.logging = False  # Silence per-query logging during the bulk import
    dbjson.updateJson("data/users/content.json", cur=cur)
    for user_dir in os.listdir("data/users"):
        if os.path.isdir("data/users/%s" % user_dir):
            dbjson.updateJson("data/users/%s/data.json" % user_dir, cur=cur)
    cur.logging = True
    print("Done in %.3fs" % (time.time() - s))
    for query, stats in sorted(dbjson.query_stats.items()):
        print("-", query, stats)