A fork of what is left of ZeroNet, with the goal of adding an AT Protocol frontend/proxy.
1import time
2import re
3from util import helper
4
5# Special sqlite cursor
6
7
8class DbCursor:
9
    def __init__(self, db):
        # The Db wrapper this cursor operates on; provides the sqlite connection,
        # the shared lock, logging and commit bookkeeping (declared elsewhere).
        self.db = db
        # When True, every query is logged, not only the slow ones
        self.logging = False
13
14 def quoteValue(self, value):
15 if type(value) is int:
16 return str(value)
17 else:
18 return "'%s'" % value.replace("'", "''")
19
20 def parseQuery(self, query, params):
21 query_type = query.split(" ", 1)[0].upper()
22 if isinstance(params, dict) and "?" in query: # Make easier select and insert by allowing dict params
23 if query_type in ("SELECT", "DELETE", "UPDATE"):
24 # Convert param dict to SELECT * FROM table WHERE key = ? AND key2 = ? format
25 query_wheres = []
26 values = []
27 for key, value in params.items():
28 if type(value) is list:
29 if key.startswith("not__"):
30 field = key.replace("not__", "")
31 operator = "NOT IN"
32 else:
33 field = key
34 operator = "IN"
35 if len(value) > 100:
36 # Embed values in query to avoid "too many SQL variables" error
37 query_values = ",".join(map(helper.sqlquote, value))
38 else:
39 query_values = ",".join(["?"] * len(value))
40 values += value
41 query_wheres.append(
42 "%s %s (%s)" %
43 (field, operator, query_values)
44 )
45 else:
46 if key.startswith("not__"):
47 query_wheres.append(key.replace("not__", "") + " != ?")
48 elif key.endswith("__like"):
49 query_wheres.append(key.replace("__like", "") + " LIKE ?")
50 elif key.endswith(">"):
51 query_wheres.append(key.replace(">", "") + " > ?")
52 elif key.endswith("<"):
53 query_wheres.append(key.replace("<", "") + " < ?")
54 else:
55 query_wheres.append(key + " = ?")
56 values.append(value)
57 wheres = " AND ".join(query_wheres)
58 if wheres == "":
59 wheres = "1"
60 query = re.sub("(.*)[?]", "\\1 %s" % wheres, query) # Replace the last ?
61 params = values
62 else:
63 # Convert param dict to INSERT INTO table (key, key2) VALUES (?, ?) format
64 keys = ", ".join(params.keys())
65 values = ", ".join(['?' for key in params.keys()])
66 keysvalues = "(%s) VALUES (%s)" % (keys, values)
67 query = re.sub("(.*)[?]", "\\1%s" % keysvalues, query) # Replace the last ?
68 params = tuple(params.values())
69 elif isinstance(params, dict) and ":" in query:
70 new_params = dict()
71 values = []
72 for key, value in params.items():
73 if type(value) is list:
74 for idx, val in enumerate(value):
75 new_params[key + "__" + str(idx)] = val
76
77 new_names = [":" + key + "__" + str(idx) for idx in range(len(value))]
78 query = re.sub(r":" + re.escape(key) + r"([)\s]|$)", "(%s)%s" % (", ".join(new_names), r"\1"), query)
79 else:
80 new_params[key] = value
81
82 params = new_params
83 return query, params
84
    def execute(self, query, params=None):
        # Run a single SQL statement on the shared connection, serialized by
        # self.db.lock. `params` may be a dict (translated by parseQuery) or a
        # plain sequence. Returns the cursor for write-type statements
        # (UPDATE/DELETE/INSERT/CREATE), otherwise the cursor.execute() result.
        query = query.strip()
        # Busy-wait while the db is committing or deliberately sleeping
        while self.db.progress_sleeping or self.db.commiting:
            time.sleep(0.1)

        self.db.last_query_time = time.time()

        query, params = self.parseQuery(query, params)

        cursor = self.db.getConn().cursor()
        self.db.cursors.add(cursor)
        if self.db.lock.locked():
            self.db.log.debug("Locked for %.3fs" % (time.time() - self.db.lock.time_lock))

        try:
            s = time.time()
            self.db.lock.acquire(True)
            # VACUUM cannot run inside a transaction: flush pending writes first
            if query.upper().strip("; ") == "VACUUM":
                self.db.commit("vacuum called")
            if params:
                res = cursor.execute(query, params)
            else:
                res = cursor.execute(query)
        finally:
            self.db.lock.release()

        taken_query = time.time() - s
        # Slow queries (>1s) are always logged, even with self.logging off
        if self.logging or taken_query > 1:
            if params:  # Query has parameters
                self.db.log.debug("Query: " + query + " " + str(params) + " (Done in %.4f)" % (time.time() - s))
            else:
                self.db.log.debug("Query: " + query + " (Done in %.4f)" % (time.time() - s))

        # Log query stats
        if self.db.collect_stats:
            if query not in self.db.query_stats:
                self.db.query_stats[query] = {"call": 0, "time": 0.0}
            self.db.query_stats[query]["call"] += 1
            self.db.query_stats[query]["time"] += time.time() - s

        query_type = query.split(" ", 1)[0].upper()
        is_update_query = query_type in ["UPDATE", "DELETE", "INSERT", "CREATE"]
        # Mark the db dirty so a later commit cycle flushes the change
        if not self.db.need_commit and is_update_query:
            self.db.need_commit = True

        if is_update_query:
            return cursor
        else:
            return res
134
135 def executemany(self, query, params):
136 while self.db.progress_sleeping or self.db.commiting:
137 time.sleep(0.1)
138
139 self.db.last_query_time = time.time()
140
141 s = time.time()
142 cursor = self.db.getConn().cursor()
143 self.db.cursors.add(cursor)
144
145 try:
146 self.db.lock.acquire(True)
147 cursor.executemany(query, params)
148 finally:
149 self.db.lock.release()
150
151 taken_query = time.time() - s
152 if self.logging or taken_query > 0.1:
153 self.db.log.debug("Execute many: %s (Done in %.4f)" % (query, taken_query))
154
155 self.db.need_commit = True
156
157 return cursor
158
159 # Creates on updates a database row without incrementing the rowid
160 def insertOrUpdate(self, table, query_sets, query_wheres, oninsert={}):
161 sql_sets = ["%s = :%s" % (key, key) for key in query_sets.keys()]
162 sql_wheres = ["%s = :%s" % (key, key) for key in query_wheres.keys()]
163
164 params = query_sets
165 params.update(query_wheres)
166 res = self.execute(
167 "UPDATE %s SET %s WHERE %s" % (table, ", ".join(sql_sets), " AND ".join(sql_wheres)),
168 params
169 )
170 if res.rowcount == 0:
171 params.update(oninsert) # Add insert-only fields
172 self.execute("INSERT INTO %s ?" % table, params)
173
174 # Create new table
175 # Return: True on success
176 def createTable(self, table, cols):
177 # TODO: Check current structure
178 self.execute("DROP TABLE IF EXISTS %s" % table)
179 col_definitions = []
180 for col_name, col_type in cols:
181 col_definitions.append("%s %s" % (col_name, col_type))
182
183 self.execute("CREATE TABLE %s (%s)" % (table, ",".join(col_definitions)))
184 return True
185
186 # Create indexes on table
187 # Return: True on success
188 def createIndexes(self, table, indexes):
189 for index in indexes:
190 if not index.strip().upper().startswith("CREATE"):
191 self.db.log.error("Index command should start with CREATE: %s" % index)
192 continue
193 self.execute(index)
194
195 # Create table if not exist
196 # Return: True if updated
197 def needTable(self, table, cols, indexes=None, version=1):
198 current_version = self.db.getTableVersion(table)
199 if int(current_version) < int(version): # Table need update or not extis
200 self.db.log.debug("Table %s outdated...version: %s need: %s, rebuilding..." % (table, current_version, version))
201 self.createTable(table, cols)
202 if indexes:
203 self.createIndexes(table, indexes)
204 self.execute(
205 "INSERT OR REPLACE INTO keyvalue ?",
206 {"json_id": 0, "key": "table.%s.version" % table, "value": version}
207 )
208 return True
209 else: # Not changed
210 return False
211
212 # Get or create a row for json file
213 # Return: The database row
214 def getJsonRow(self, file_path):
215 directory, file_name = re.match("^(.*?)/*([^/]*)$", file_path).groups()
216 if self.db.schema["version"] == 1:
217 # One path field
218 res = self.execute("SELECT * FROM json WHERE ? LIMIT 1", {"path": file_path})
219 row = res.fetchone()
220 if not row: # No row yet, create it
221 self.execute("INSERT INTO json ?", {"path": file_path})
222 res = self.execute("SELECT * FROM json WHERE ? LIMIT 1", {"path": file_path})
223 row = res.fetchone()
224 elif self.db.schema["version"] == 2:
225 # Separate directory, file_name (easier join)
226 res = self.execute("SELECT * FROM json WHERE ? LIMIT 1", {"directory": directory, "file_name": file_name})
227 row = res.fetchone()
228 if not row: # No row yet, create it
229 self.execute("INSERT INTO json ?", {"directory": directory, "file_name": file_name})
230 res = self.execute("SELECT * FROM json WHERE ? LIMIT 1", {"directory": directory, "file_name": file_name})
231 row = res.fetchone()
232 elif self.db.schema["version"] == 3:
233 # Separate site, directory, file_name (for merger sites)
234 site_address, directory = re.match("^([^/]*)/(.*)$", directory).groups()
235 res = self.execute("SELECT * FROM json WHERE ? LIMIT 1", {"site": site_address, "directory": directory, "file_name": file_name})
236 row = res.fetchone()
237 if not row: # No row yet, create it
238 self.execute("INSERT INTO json ?", {"site": site_address, "directory": directory, "file_name": file_name})
239 res = self.execute("SELECT * FROM json WHERE ? LIMIT 1", {"site": site_address, "directory": directory, "file_name": file_name})
240 row = res.fetchone()
241 else:
242 raise Exception("Dbschema version %s not supported" % self.db.schema.get("version"))
243 return row
244
    def close(self):
        # No-op: nothing held by the cursor itself needs releasing here.
        # NOTE(review): cursors are registered in self.db.cursors on execute();
        # presumably the Db object handles their cleanup — not visible in this file.
        pass