Fork of what is left of ZeroNet, with the goal of adding an AT Protocol frontend/proxy.
Db/Db.py — 519 lines, 20 kB
import sqlite3
import json
import time
import logging
import re
import os
import atexit
import threading
import sys
import weakref
import errno

import gevent

from Debug import Debug
from .DbCursor import DbCursor
from util import SafeRe
from util import helper
from util import ThreadPool
from Config import config

# Shared worker pool: all sqlite commits are funneled through these threads
thread_pool_db = ThreadPool.ThreadPool(config.threads_db)

next_db_id = 0
opened_dbs = []


# Close idle databases to save some memory
def dbCleanup():
    while 1:
        time.sleep(60 * 5)
        for db in opened_dbs[:]:
            idle = time.time() - db.last_query_time
            if idle > 60 * 5 and db.close_idle:
                db.close("Cleanup")


# Periodically flush databases that flagged themselves as needing a commit
def dbCommitCheck():
    while 1:
        time.sleep(5)
        for db in opened_dbs[:]:
            if not db.need_commit:
                continue

            success = db.commit("Interval")
            if success:
                db.need_commit = False
            time.sleep(0.1)


# Close every open database (registered as an atexit hook below)
def dbCloseAll():
    for db in opened_dbs[:]:
        db.close("Close all")


gevent.spawn(dbCleanup)
gevent.spawn(dbCommitCheck)
atexit.register(dbCloseAll)


class DbTableError(Exception):
    """Raised when creating/upgrading a schema table fails; carries the table name."""
    def __init__(self, message, table):
        super().__init__(message)
        self.table = table


class Db(object):
    """Sqlite-backed database described by a JSON schema dict.

    The schema defines the db name, version, tables and json-file mappings.
    Connections are opened lazily and may be closed when idle (close_idle).
    """

    def __init__(self, schema, db_path, close_idle=False):
        global next_db_id
        self.db_path = db_path
        self.db_dir = os.path.dirname(db_path) + "/"
        self.schema = schema
        self.schema["version"] = self.schema.get("version", 1)
        self.conn = None
        self.cur = None
        self.cursors = weakref.WeakSet()  # Live DbCursor objects; weak so GC'd cursors drop out
        self.id = next_db_id
        next_db_id += 1
        self.progress_sleeping = False
        self.commiting = False
        self.log = logging.getLogger("Db#%s:%s" % (self.id, schema["db_name"]))
        self.table_names = None
        self.collect_stats = False
        self.foreign_keys = False
        self.need_commit = False  # Set by cursors; dbCommitCheck flushes it
        self.query_stats = {}
        self.db_keyvalues = {}  # Cache of internal keyvalue rows (table versions)
        self.delayed_queue = []  # Buffered (command, (args, kwargs)) pairs
        self.delayed_queue_thread = None  # Greenlet scheduled to flush delayed_queue
        self.close_idle = close_idle
        self.last_query_time = time.time()
        self.last_sleep_time = time.time()
        self.num_execute_since_sleep = 0
        self.lock = ThreadPool.Lock()
        self.connect_lock = ThreadPool.Lock()

    def __repr__(self):
        return "<Db#%s:%s close_idle:%s>" % (id(self), self.db_path, self.close_idle)

    def connect(self):
        """Open the sqlite connection (idempotent), creating the db dir if needed."""
        self.connect_lock.acquire(True)
        try:
            if self.conn:
                self.log.debug("Already connected, connection ignored")
                return

            if self not in opened_dbs:
                opened_dbs.append(self)
            s = time.time()
            try:  # Directory not exist yet
                os.makedirs(self.db_dir)
                self.log.debug("Created Db path: %s" % self.db_dir)
            except OSError as err:
                if err.errno != errno.EEXIST:
                    raise err
            if not os.path.isfile(self.db_path):
                self.log.debug("Db file not exist yet: %s" % self.db_path)
            self.conn = sqlite3.connect(self.db_path, isolation_level="DEFERRED", check_same_thread=False)
            self.conn.row_factory = sqlite3.Row
            # Yield to other greenlets every ~5M sqlite opcodes (see progress())
            self.conn.set_progress_handler(self.progress, 5000000)
            self.conn.execute('PRAGMA journal_mode=WAL')
            if self.foreign_keys:
                self.conn.execute("PRAGMA foreign_keys = ON")
            self.cur = self.getCursor()

            self.log.debug(
                "Connected to %s in %.3fs (opened: %s, sqlite version: %s)..." %
                (self.db_path, time.time() - s, len(opened_dbs), sqlite3.version)
            )
            self.log.debug("Connect by thread: %s" % threading.current_thread().ident)
            self.log.debug("Connect called by %s" % Debug.formatStack())
        finally:
            self.connect_lock.release()

    def getConn(self):
        if not self.conn:
            self.connect()
        return self.conn

    def progress(self, *args, **kwargs):
        # Called by sqlite during long queries; briefly sleep so other
        # greenlets can run. commit() is skipped while this flag is set.
        self.progress_sleeping = True
        time.sleep(0.001)
        self.progress_sleeping = False

    # Execute query using dbcursor
    def execute(self, query, params=None):
        if not self.conn:
            self.connect()
        return self.cur.execute(query, params)

    @thread_pool_db.wrap
    def commit(self, reason="Unknown"):
        """Commit the current transaction. Return True on success, False if skipped/failed."""
        if self.progress_sleeping:
            self.log.debug("Commit ignored: Progress sleeping")
            return False

        if not self.conn:
            self.log.debug("Commit ignored: No connection")
            return False

        if self.commiting:
            self.log.debug("Commit ignored: Already commiting")
            return False

        try:
            s = time.time()
            self.commiting = True
            self.conn.commit()
            self.log.debug("Commited in %.3fs (reason: %s)" % (time.time() - s, reason))
            return True
        except Exception as err:
            if "SQL statements in progress" in str(err):
                self.log.warning("Commit delayed: %s (reason: %s)" % (Debug.formatException(err), reason))
            else:
                self.log.error("Commit error: %s (reason: %s)" % (Debug.formatException(err), reason))
            return False
        finally:
            self.commiting = False

    def insertOrUpdate(self, *args, **kwargs):
        if not self.conn:
            self.connect()
        return self.cur.insertOrUpdate(*args, **kwargs)

    def executeDelayed(self, *args, **kwargs):
        """Queue an execute() to be flushed ~1s later by processDelayed()."""
        if not self.delayed_queue_thread:
            self.delayed_queue_thread = gevent.spawn_later(1, self.processDelayed)
        self.delayed_queue.append(("execute", (args, kwargs)))

    def insertOrUpdateDelayed(self, *args, **kwargs):
        """Queue an insertOrUpdate() to be flushed ~1s later by processDelayed()."""
        # Bug fix: key the scheduling on delayed_queue_thread and keep the
        # greenlet reference, exactly like executeDelayed().  The old code
        # checked `not self.delayed_queue` and discarded the greenlet, which
        # could spawn a duplicate flusher and never reset the thread marker.
        if not self.delayed_queue_thread:
            self.delayed_queue_thread = gevent.spawn_later(1, self.processDelayed)
        self.delayed_queue.append(("insertOrUpdate", (args, kwargs)))

    def processDelayed(self):
        """Flush every queued delayed command on a single cursor."""
        if not self.delayed_queue:
            self.log.debug("processDelayed aborted")
            return
        if not self.conn:
            self.connect()

        s = time.time()
        cur = self.getCursor()
        for command, params in self.delayed_queue:
            if command == "insertOrUpdate":
                cur.insertOrUpdate(*params[0], **params[1])
            else:
                cur.execute(*params[0], **params[1])

        if len(self.delayed_queue) > 10:
            self.log.debug("Processed %s delayed queue in %.3fs" % (len(self.delayed_queue), time.time() - s))
        self.delayed_queue = []
        self.delayed_queue_thread = None

    def close(self, reason="Unknown"):
        """Flush pending work, wait for cursors, commit and close the connection."""
        if not self.conn:
            return False
        self.connect_lock.acquire()
        # Bug fix: release connect_lock in a finally block — previously any
        # exception here (e.g. from processDelayed or commit) left the lock
        # held forever, deadlocking every later connect()/close().
        try:
            s = time.time()
            if self.delayed_queue:
                self.processDelayed()
            if self in opened_dbs:
                opened_dbs.remove(self)
            self.need_commit = False
            self.commit("Closing: %s" % reason)
            self.log.debug("Close called by %s" % Debug.formatStack())
            for i in range(5):  # Back off up to ~1s waiting for cursors to finish
                if len(self.cursors) == 0:
                    break
                self.log.debug("Pending cursors: %s" % len(self.cursors))
                time.sleep(0.1 * i)
            if len(self.cursors):
                self.log.debug("Killing cursors: %s" % len(self.cursors))
                self.conn.interrupt()

            if self.cur:
                self.cur.close()
            if self.conn:
                ThreadPool.main_loop.call(self.conn.close)
            self.conn = None
            self.cur = None
            self.log.debug("%s closed (reason: %s) in %.3fs, opened: %s" % (self.db_path, reason, time.time() - s, len(opened_dbs)))
        finally:
            self.connect_lock.release()
        return True

    # Gets a cursor object to database
    # Return: Cursor class
    def getCursor(self):
        if not self.conn:
            self.connect()

        cur = DbCursor(self)
        return cur

    def getSharedCursor(self):
        if not self.conn:
            self.connect()
        return self.cur
version or None if not exist 263 def getTableVersion(self, table_name): 264 if not self.db_keyvalues: # Get db keyvalues 265 try: 266 res = self.execute("SELECT * FROM keyvalue WHERE json_id=0") # json_id = 0 is internal keyvalues 267 except sqlite3.OperationalError as err: # Table not exist 268 self.log.debug("Query table version error: %s" % err) 269 return False 270 271 for row in res: 272 self.db_keyvalues[row["key"]] = row["value"] 273 274 return self.db_keyvalues.get("table.%s.version" % table_name, 0) 275 276 # Check Db tables 277 # Return: <list> Changed table names 278 def checkTables(self): 279 s = time.time() 280 changed_tables = [] 281 282 cur = self.getSharedCursor() 283 284 # Check internal tables 285 # Check keyvalue table 286 changed = cur.needTable("keyvalue", [ 287 ["keyvalue_id", "INTEGER PRIMARY KEY AUTOINCREMENT"], 288 ["key", "TEXT"], 289 ["value", "INTEGER"], 290 ["json_id", "INTEGER"], 291 ], [ 292 "CREATE UNIQUE INDEX key_id ON keyvalue(json_id, key)" 293 ], version=self.schema["version"]) 294 if changed: 295 changed_tables.append("keyvalue") 296 297 # Create json table if no custom one defined 298 if "json" not in self.schema.get("tables", {}): 299 if self.schema["version"] == 1: 300 changed = cur.needTable("json", [ 301 ["json_id", "INTEGER PRIMARY KEY AUTOINCREMENT"], 302 ["path", "VARCHAR(255)"] 303 ], [ 304 "CREATE UNIQUE INDEX path ON json(path)" 305 ], version=self.schema["version"]) 306 elif self.schema["version"] == 2: 307 changed = cur.needTable("json", [ 308 ["json_id", "INTEGER PRIMARY KEY AUTOINCREMENT"], 309 ["directory", "VARCHAR(255)"], 310 ["file_name", "VARCHAR(255)"] 311 ], [ 312 "CREATE UNIQUE INDEX path ON json(directory, file_name)" 313 ], version=self.schema["version"]) 314 elif self.schema["version"] == 3: 315 changed = cur.needTable("json", [ 316 ["json_id", "INTEGER PRIMARY KEY AUTOINCREMENT"], 317 ["site", "VARCHAR(255)"], 318 ["directory", "VARCHAR(255)"], 319 ["file_name", "VARCHAR(255)"] 320 ], [ 321 "CREATE 
UNIQUE INDEX path ON json(directory, site, file_name)" 322 ], version=self.schema["version"]) 323 if changed: 324 changed_tables.append("json") 325 326 # Check schema tables 327 for table_name, table_settings in self.schema.get("tables", {}).items(): 328 try: 329 indexes = table_settings.get("indexes", []) 330 version = table_settings.get("schema_changed", 0) 331 changed = cur.needTable( 332 table_name, table_settings["cols"], 333 indexes, version=version 334 ) 335 if changed: 336 changed_tables.append(table_name) 337 except Exception as err: 338 self.log.error("Error creating table %s: %s" % (table_name, Debug.formatException(err))) 339 raise DbTableError(err, table_name) 340 341 self.log.debug("Db check done in %.3fs, changed tables: %s" % (time.time() - s, changed_tables)) 342 if changed_tables: 343 self.db_keyvalues = {} # Refresh table version cache 344 345 return changed_tables 346 347 # Update json file to db 348 # Return: True if matched 349 def updateJson(self, file_path, file=None, cur=None): 350 if not file_path.startswith(self.db_dir): 351 return False # Not from the db dir: Skipping 352 relative_path = file_path[len(self.db_dir):] # File path realative to db file 353 354 # Check if filename matches any of mappings in schema 355 matched_maps = [] 356 for match, map_settings in self.schema["maps"].items(): 357 try: 358 if SafeRe.match(match, relative_path): 359 matched_maps.append(map_settings) 360 except SafeRe.UnsafePatternError as err: 361 self.log.error(err) 362 363 # No match found for the file 364 if not matched_maps: 365 return False 366 367 # Load the json file 368 try: 369 if file is None: # Open file is not file object passed 370 file = open(file_path, "rb") 371 372 if file is False: # File deleted 373 data = {} 374 else: 375 if file_path.endswith("json.gz"): 376 file = helper.limitedGzipFile(fileobj=file) 377 378 if sys.version_info.major == 3 and sys.version_info.minor < 6: 379 data = json.loads(file.read().decode("utf8")) 380 else: 381 data 
= json.load(file) 382 except Exception as err: 383 self.log.debug("Json file %s load error: %s" % (file_path, err)) 384 data = {} 385 386 # No cursor specificed 387 if not cur: 388 cur = self.getSharedCursor() 389 cur.logging = False 390 391 # Row for current json file if required 392 if not data or [dbmap for dbmap in matched_maps if "to_keyvalue" in dbmap or "to_table" in dbmap]: 393 json_row = cur.getJsonRow(relative_path) 394 395 # Check matched mappings in schema 396 for dbmap in matched_maps: 397 # Insert non-relational key values 398 if dbmap.get("to_keyvalue"): 399 # Get current values 400 res = cur.execute("SELECT * FROM keyvalue WHERE json_id = ?", (json_row["json_id"],)) 401 current_keyvalue = {} 402 current_keyvalue_id = {} 403 for row in res: 404 current_keyvalue[row["key"]] = row["value"] 405 current_keyvalue_id[row["key"]] = row["keyvalue_id"] 406 407 for key in dbmap["to_keyvalue"]: 408 if key not in current_keyvalue: # Keyvalue not exist yet in the db 409 cur.execute( 410 "INSERT INTO keyvalue ?", 411 {"key": key, "value": data.get(key), "json_id": json_row["json_id"]} 412 ) 413 elif data.get(key) != current_keyvalue[key]: # Keyvalue different value 414 cur.execute( 415 "UPDATE keyvalue SET value = ? 
WHERE keyvalue_id = ?", 416 (data.get(key), current_keyvalue_id[key]) 417 ) 418 419 # Insert data to json table for easier joins 420 if dbmap.get("to_json_table"): 421 directory, file_name = re.match("^(.*?)/*([^/]*)$", relative_path).groups() 422 data_json_row = dict(cur.getJsonRow(directory + "/" + dbmap.get("file_name", file_name))) 423 changed = False 424 for key in dbmap["to_json_table"]: 425 if data.get(key) != data_json_row.get(key): 426 changed = True 427 if changed: 428 # Add the custom col values 429 data_json_row.update({key: val for key, val in data.items() if key in dbmap["to_json_table"]}) 430 cur.execute("INSERT OR REPLACE INTO json ?", data_json_row) 431 432 # Insert data to tables 433 for table_settings in dbmap.get("to_table", []): 434 if isinstance(table_settings, dict): # Custom settings 435 table_name = table_settings["table"] # Table name to insert datas 436 node = table_settings.get("node", table_name) # Node keyname in data json file 437 key_col = table_settings.get("key_col") # Map dict key as this col 438 val_col = table_settings.get("val_col") # Map dict value as this col 439 import_cols = table_settings.get("import_cols") 440 replaces = table_settings.get("replaces") 441 else: # Simple settings 442 table_name = table_settings 443 node = table_settings 444 key_col = None 445 val_col = None 446 import_cols = None 447 replaces = None 448 449 # Fill import cols from table cols 450 if not import_cols: 451 import_cols = set([item[0] for item in self.schema["tables"][table_name]["cols"]]) 452 453 cur.execute("DELETE FROM %s WHERE json_id = ?" % table_name, (json_row["json_id"],)) 454 455 if node not in data: 456 continue 457 458 if key_col: # Map as dict 459 for key, val in data[node].items(): 460 if val_col: # Single value 461 cur.execute( 462 "INSERT OR REPLACE INTO %s ?" 
% table_name, 463 {key_col: key, val_col: val, "json_id": json_row["json_id"]} 464 ) 465 else: # Multi value 466 if type(val) is dict: # Single row 467 row = val 468 if import_cols: 469 row = {key: row[key] for key in row if key in import_cols} # Filter row by import_cols 470 row[key_col] = key 471 # Replace in value if necessary 472 if replaces: 473 for replace_key, replace in replaces.items(): 474 if replace_key in row: 475 for replace_from, replace_to in replace.items(): 476 row[replace_key] = row[replace_key].replace(replace_from, replace_to) 477 478 row["json_id"] = json_row["json_id"] 479 cur.execute("INSERT OR REPLACE INTO %s ?" % table_name, row) 480 elif type(val) is list: # Multi row 481 for row in val: 482 row[key_col] = key 483 row["json_id"] = json_row["json_id"] 484 cur.execute("INSERT OR REPLACE INTO %s ?" % table_name, row) 485 else: # Map as list 486 for row in data[node]: 487 row["json_id"] = json_row["json_id"] 488 if import_cols: 489 row = {key: row[key] for key in row if key in import_cols} # Filter row by import_cols 490 cur.execute("INSERT OR REPLACE INTO %s ?" 
% table_name, row) 491 492 # Cleanup json row 493 if not data: 494 self.log.debug("Cleanup json row for %s" % file_path) 495 cur.execute("DELETE FROM json WHERE json_id = %s" % json_row["json_id"]) 496 497 return True 498 499 500if __name__ == "__main__": 501 s = time.time() 502 console_log = logging.StreamHandler() 503 logging.getLogger('').setLevel(logging.DEBUG) 504 logging.getLogger('').addHandler(console_log) 505 console_log.setLevel(logging.DEBUG) 506 dbjson = Db(json.load(open("zerotalk.schema.json")), "data/users/zerotalk.db") 507 dbjson.collect_stats = True 508 dbjson.checkTables() 509 cur = dbjson.getCursor() 510 cur.logging = False 511 dbjson.updateJson("data/users/content.json", cur=cur) 512 for user_dir in os.listdir("data/users"): 513 if os.path.isdir("data/users/%s" % user_dir): 514 dbjson.updateJson("data/users/%s/data.json" % user_dir, cur=cur) 515 # print ".", 516 cur.logging = True 517 print("Done in %.3fs" % (time.time() - s)) 518 for query, stats in sorted(dbjson.query_stats.items()): 519 print("-", query, stats)