Forking what is left of ZeroNet and hopefully adding an AT Proto Frontend/Proxy
at main 246 lines 10 kB view raw
import time
import re

from util import helper


class DbCursor:
    """Special sqlite cursor.

    Wraps the cursors of a ``db`` object: rewrites dict parameters into SQL
    (see ``parseQuery``), serializes statement execution through ``db.lock``,
    logs slow queries, optionally collects per-query statistics and flags
    the database as needing a commit after modifying statements.
    """

    def __init__(self, db):
        """Store the owning database object; per-query logging starts disabled."""
        self.db = db
        self.logging = False

    def quoteValue(self, value):
        """Return ``value`` rendered as an SQL literal.

        Exact ``int`` values are rendered bare; anything else is treated as a
        string: single quotes are doubled and the result wrapped in quotes.
        The exact type check is deliberate: ``bool`` is not rendered as a
        number (it has no ``replace`` and raises ``AttributeError``).
        """
        if type(value) is int:
            return str(value)
        return "'%s'" % value.replace("'", "''")

    @staticmethod
    def _replaceLastPlaceholder(query, replacement):
        """Replace the last ``?`` of the query with ``replacement``.

        ``.`` does not match newlines, so in a multi-line query the last ``?``
        of every line containing one is replaced. A callable is passed to
        ``re.sub`` so that backslashes in the generated SQL are inserted
        literally instead of being parsed as template escapes.
        """
        return re.sub("(.*)[?]", lambda match: match.group(1) + replacement, query)

    def _buildWheres(self, params):
        """Turn a param dict into ``(where_sql, values)`` for positional binding.

        Key conventions:
          - ``not__field``  -> ``field != ?`` (or ``NOT IN`` for list values)
          - ``field__like`` -> ``field LIKE ?``
          - ``field>`` / ``field<`` -> ``field > ?`` / ``field < ?``
          - list value      -> ``field IN (...)``
          - anything else   -> ``field = ?``

        An empty dict yields the always-true condition ``1``.
        """
        prefix_not = "not__"
        suffix_like = "__like"
        conditions = []
        values = []
        for key, value in params.items():
            negated = key.startswith(prefix_not)
            if isinstance(value, list):
                field = key[len(prefix_not):] if negated else key
                operator = "NOT IN" if negated else "IN"
                if len(value) > 100:
                    # Embed values in query to avoid "too many SQL variables" error
                    query_values = ",".join(map(helper.sqlquote, value))
                else:
                    query_values = ",".join(["?"] * len(value))
                    values += value
                conditions.append("%s %s (%s)" % (field, operator, query_values))
            else:
                # Only the detected prefix/suffix is stripped, never inner occurrences
                if negated:
                    conditions.append(key[len(prefix_not):] + " != ?")
                elif key.endswith(suffix_like):
                    conditions.append(key[:-len(suffix_like)] + " LIKE ?")
                elif key.endswith(">"):
                    conditions.append(key[:-1] + " > ?")
                elif key.endswith("<"):
                    conditions.append(key[:-1] + " < ?")
                else:
                    conditions.append(key + " = ?")
                values.append(value)
        return " AND ".join(conditions) or "1", values

    def parseQuery(self, query, params):
        """Expand dict parameters into plain SQL plus sqlite-compatible params.

        - dict params and a ``?`` in the query:
            SELECT/DELETE/UPDATE: the last ``?`` becomes a WHERE condition
            built from the dict (``key = ? AND key2 = ?``), params become a
            list of values.
            Any other statement: the last ``?`` becomes
            ``(key, key2) VALUES (?, ?)``, params become a tuple of values.
        - dict params and a ``:`` in the query: list values are expanded,
          ``:key`` becomes ``(:key__0, :key__1, ...)`` and each item gets its
          own named parameter.
        - anything else is returned unchanged.

        The passed dict is never modified.

        Return: (query, params)
        """
        if not isinstance(params, dict):
            return query, params

        if "?" in query:  # Make easier select and insert by allowing dict params
            query_type = query.split(" ", 1)[0].upper()
            if query_type in ("SELECT", "DELETE", "UPDATE"):
                # Convert param dict to SELECT * FROM table WHERE key = ? AND key2 = ? format
                wheres, values = self._buildWheres(params)
                query = self._replaceLastPlaceholder(query, " " + wheres)
                params = values
            else:
                # Convert param dict to INSERT INTO table (key, key2) VALUES (?, ?) format
                keys = ", ".join(params.keys())
                placeholders = ", ".join(["?"] * len(params))
                keysvalues = "(%s) VALUES (%s)" % (keys, placeholders)
                query = self._replaceLastPlaceholder(query, keysvalues)
                params = tuple(params.values())
        elif ":" in query:
            new_params = {}
            for key, value in params.items():
                if isinstance(value, list):
                    new_names = []
                    for idx, val in enumerate(value):
                        item_name = key + "__" + str(idx)
                        new_params[item_name] = val
                        new_names.append(":" + item_name)
                    expanded = "(%s)" % ", ".join(new_names)
                    # Only replace :key when followed by ")", whitespace or end of
                    # query, so a longer name starting with the same text is kept.
                    # "expanded" is bound as a default to avoid late binding.
                    query = re.sub(
                        r":" + re.escape(key) + r"([)\s]|$)",
                        lambda match, expanded=expanded: expanded + match.group(1),
                        query
                    )
                else:
                    new_params[key] = value
            params = new_params
        return query, params

    def execute(self, query, params=None):
        """Execute a single statement while holding the database lock.

        Waits while the database reports ``progress_sleeping`` or
        ``commiting``, expands dict params with ``parseQuery`` and commits
        before a VACUUM. Queries slower than 1 second (or all of them when
        ``self.logging`` is set) are written to the debug log.

        Return: the new cursor for UPDATE/DELETE/INSERT/CREATE statements,
        otherwise the result of the underlying ``cursor.execute``.
        """
        query = query.strip()
        while self.db.progress_sleeping or self.db.commiting:
            time.sleep(0.1)

        self.db.last_query_time = time.time()

        query, params = self.parseQuery(query, params)

        cursor = self.db.getConn().cursor()
        self.db.cursors.add(cursor)
        if self.db.lock.locked():
            self.db.log.debug("Locked for %.3fs" % (time.time() - self.db.lock.time_lock))

        s = time.time()  # Started before acquiring, so the time spent waiting for the lock is included
        # Acquire outside the try block: if acquiring fails, there is nothing to release
        self.db.lock.acquire(True)
        try:
            if query.upper().strip("; ") == "VACUUM":
                self.db.commit("vacuum called")
            if params:
                res = cursor.execute(query, params)
            else:
                res = cursor.execute(query)
        finally:
            self.db.lock.release()

        taken_query = time.time() - s
        if self.logging or taken_query > 1:
            if params:  # Query has parameters
                self.db.log.debug("Query: " + query + " " + str(params) + " (Done in %.4f)" % taken_query)
            else:
                self.db.log.debug("Query: " + query + " (Done in %.4f)" % taken_query)

        # Log query stats
        if self.db.collect_stats:
            if query not in self.db.query_stats:
                self.db.query_stats[query] = {"call": 0, "time": 0.0}
            self.db.query_stats[query]["call"] += 1
            self.db.query_stats[query]["time"] += taken_query

        query_type = query.split(" ", 1)[0].upper()
        is_update_query = query_type in ["UPDATE", "DELETE", "INSERT", "CREATE"]
        if is_update_query and not self.db.need_commit:
            self.db.need_commit = True

        if is_update_query:
            return cursor
        return res

    def executemany(self, query, params):
        """Execute ``query`` once for every item of ``params`` under the database lock.

        No dict expansion is done here: query and params are passed to the
        underlying ``cursor.executemany`` as they are. Calls slower than
        0.1 second (or all of them when ``self.logging`` is set) are written
        to the debug log and the database is always flagged as needing a commit.

        Return: the cursor used for the execution
        """
        while self.db.progress_sleeping or self.db.commiting:
            time.sleep(0.1)

        self.db.last_query_time = time.time()

        s = time.time()
        cursor = self.db.getConn().cursor()
        self.db.cursors.add(cursor)

        # Acquire outside the try block: if acquiring fails, there is nothing to release
        self.db.lock.acquire(True)
        try:
            cursor.executemany(query, params)
        finally:
            self.db.lock.release()

        taken_query = time.time() - s
        if self.logging or taken_query > 0.1:
            self.db.log.debug("Execute many: %s (Done in %.4f)" % (query, taken_query))

        self.db.need_commit = True

        return cursor

    # Creates or updates a database row without incrementing the rowid
    def insertOrUpdate(self, table, query_sets, query_wheres, oninsert=None):
        """Update the row matching ``query_wheres``, or insert it if no row was updated.

        table: Name of the table
        query_sets: dict of column -> value to set
        query_wheres: dict of column -> value identifying the row
        oninsert: Optional dict of extra column -> value used only when inserting

        None of the passed dicts are modified.
        """
        sql_sets = ["%s = :%s" % (key, key) for key in query_sets.keys()]
        sql_wheres = ["%s = :%s" % (key, key) for key in query_wheres.keys()]

        params = dict(query_sets)  # Work on a copy to leave the caller's dict untouched
        params.update(query_wheres)
        res = self.execute(
            "UPDATE %s SET %s WHERE %s" % (table, ", ".join(sql_sets), " AND ".join(sql_wheres)),
            params
        )
        if res.rowcount == 0:
            if oninsert:
                params.update(oninsert)  # Add insert-only fields
            self.execute("INSERT INTO %s ?" % table, params)

    # Create new table
    # Return: True on success
    def createTable(self, table, cols):
        """Drop ``table`` if it exists and create it again from ``cols``.

        cols: Iterable of (column name, column type) pairs
        Any existing content of the table is lost.
        """
        # TODO: Check current structure
        self.execute("DROP TABLE IF EXISTS %s" % table)
        col_definitions = ["%s %s" % (col_name, col_type) for col_name, col_type in cols]
        self.execute("CREATE TABLE %s (%s)" % (table, ",".join(col_definitions)))
        return True

    # Create indexes on table
    # Return: None
    def createIndexes(self, table, indexes):
        """Execute every index creation command of ``indexes``.

        Commands not starting with CREATE are logged as errors and skipped.
        ``table`` is not used by the method itself.
        """
        for index in indexes:
            if not index.strip().upper().startswith("CREATE"):
                self.db.log.error("Index command should start with CREATE: %s" % index)
                continue
            self.execute(index)

    # Create table if not exist
    # Return: True if updated
    def needTable(self, table, cols, indexes=None, version=1):
        """Rebuild ``table`` if its stored version is lower than ``version``.

        On rebuild the table is dropped and recreated, the indexes (if any)
        are created and the new version is stored in the keyvalue table under
        the key ``table.<table>.version``.

        Return: True if the table was rebuilt, False if it was up to date
        """
        current_version = self.db.getTableVersion(table)
        if int(current_version) >= int(version):  # Not changed
            return False

        # Table needs update or does not exist
        self.db.log.debug("Table %s outdated...version: %s need: %s, rebuilding..." % (table, current_version, version))
        self.createTable(table, cols)
        if indexes:
            self.createIndexes(table, indexes)
        self.execute(
            "INSERT OR REPLACE INTO keyvalue ?",
            {"json_id": 0, "key": "table.%s.version" % table, "value": version}
        )
        return True

    # Get or create a row for json file
    # Return: The database row
    def getJsonRow(self, file_path):
        """Return the row of the json table for ``file_path``, creating it if missing.

        The columns identifying the row depend on ``db.schema["version"]``:
          1: path
          2: directory, file_name
          3: site, directory, file_name (for merger sites; the part of the
             directory before the first "/" is the site address)

        Raises Exception on any other schema version.
        """
        directory, file_name = re.match("^(.*?)/*([^/]*)$", file_path).groups()
        schema_version = self.db.schema["version"]
        if schema_version == 1:
            # One path field
            criteria = {"path": file_path}
        elif schema_version == 2:
            # Separate directory, file_name (easier join)
            criteria = {"directory": directory, "file_name": file_name}
        elif schema_version == 3:
            # Separate site, directory, file_name (for merger sites)
            site_address, directory = re.match("^([^/]*)/(.*)$", directory).groups()
            criteria = {"site": site_address, "directory": directory, "file_name": file_name}
        else:
            raise Exception("Dbschema version %s not supported" % self.db.schema.get("version"))

        select_query = "SELECT * FROM json WHERE ? LIMIT 1"
        row = self.execute(select_query, criteria).fetchone()
        if not row:  # No row yet, create it
            self.execute("INSERT INTO json ?", criteria)
            row = self.execute(select_query, criteria).fetchone()
        return row

    def close(self):
        """Do nothing; kept so the object can be closed like a regular cursor."""
        pass