Forking what is left of ZeroNet and hopefully adding an AT Proto Frontend/Proxy
1import io
2
3
4class TestDb:
5 def testCheckTables(self, db):
6 tables = [row["name"] for row in db.execute("SELECT name FROM sqlite_master WHERE type='table'")]
7 assert "keyvalue" in tables # To store simple key -> value
8 assert "json" in tables # Json file path registry
9 assert "test" in tables # The table defined in dbschema.json
10
11 # Verify test table
12 cols = [col["name"] for col in db.execute("PRAGMA table_info(test)")]
13 assert "test_id" in cols
14 assert "title" in cols
15
16 # Add new table
17 assert "newtest" not in tables
18 db.schema["tables"]["newtest"] = {
19 "cols": [
20 ["newtest_id", "INTEGER"],
21 ["newtitle", "TEXT"],
22 ],
23 "indexes": ["CREATE UNIQUE INDEX newtest_id ON newtest(newtest_id)"],
24 "schema_changed": 1426195822
25 }
26 db.checkTables()
27 tables = [row["name"] for row in db.execute("SELECT name FROM sqlite_master WHERE type='table'")]
28 assert "test" in tables
29 assert "newtest" in tables
30
31 def testQueries(self, db):
32 # Test insert
33 for i in range(100):
34 db.execute("INSERT INTO test ?", {"test_id": i, "title": "Test #%s" % i})
35
36 assert db.execute("SELECT COUNT(*) AS num FROM test").fetchone()["num"] == 100
37
38 # Test single select
39 assert db.execute("SELECT COUNT(*) AS num FROM test WHERE ?", {"test_id": 1}).fetchone()["num"] == 1
40
41 # Test multiple select
42 assert db.execute("SELECT COUNT(*) AS num FROM test WHERE ?", {"test_id": [1, 2, 3]}).fetchone()["num"] == 3
43 assert db.execute(
44 "SELECT COUNT(*) AS num FROM test WHERE ?",
45 {"test_id": [1, 2, 3], "title": "Test #2"}
46 ).fetchone()["num"] == 1
47 assert db.execute(
48 "SELECT COUNT(*) AS num FROM test WHERE ?",
49 {"test_id": [1, 2, 3], "title": ["Test #2", "Test #3", "Test #4"]}
50 ).fetchone()["num"] == 2
51
52 # Test multiple select using named params
53 assert db.execute("SELECT COUNT(*) AS num FROM test WHERE test_id IN :test_id", {"test_id": [1, 2, 3]}).fetchone()["num"] == 3
54 assert db.execute(
55 "SELECT COUNT(*) AS num FROM test WHERE test_id IN :test_id AND title = :title",
56 {"test_id": [1, 2, 3], "title": "Test #2"}
57 ).fetchone()["num"] == 1
58 assert db.execute(
59 "SELECT COUNT(*) AS num FROM test WHERE test_id IN :test_id AND title IN :title",
60 {"test_id": [1, 2, 3], "title": ["Test #2", "Test #3", "Test #4"]}
61 ).fetchone()["num"] == 2
62
63 # Large ammount of IN values
64 assert db.execute(
65 "SELECT COUNT(*) AS num FROM test WHERE ?",
66 {"not__test_id": list(range(2, 3000))}
67 ).fetchone()["num"] == 2
68 assert db.execute(
69 "SELECT COUNT(*) AS num FROM test WHERE ?",
70 {"test_id": list(range(50, 3000))}
71 ).fetchone()["num"] == 50
72
73 assert db.execute(
74 "SELECT COUNT(*) AS num FROM test WHERE ?",
75 {"not__title": ["Test #%s" % i for i in range(50, 3000)]}
76 ).fetchone()["num"] == 50
77
78 assert db.execute(
79 "SELECT COUNT(*) AS num FROM test WHERE ?",
80 {"title__like": "%20%"}
81 ).fetchone()["num"] == 1
82
83 # Test named parameter escaping
84 assert db.execute(
85 "SELECT COUNT(*) AS num FROM test WHERE test_id = :test_id AND title LIKE :titlelike",
86 {"test_id": 1, "titlelike": "Test%"}
87 ).fetchone()["num"] == 1
88
89 def testEscaping(self, db):
90 # Test insert
91 for i in range(100):
92 db.execute("INSERT INTO test ?", {"test_id": i, "title": "Test '\" #%s" % i})
93
94 assert db.execute(
95 "SELECT COUNT(*) AS num FROM test WHERE ?",
96 {"title": "Test '\" #1"}
97 ).fetchone()["num"] == 1
98
99 assert db.execute(
100 "SELECT COUNT(*) AS num FROM test WHERE ?",
101 {"title": ["Test '\" #%s" % i for i in range(0, 50)]}
102 ).fetchone()["num"] == 50
103
104 assert db.execute(
105 "SELECT COUNT(*) AS num FROM test WHERE ?",
106 {"not__title": ["Test '\" #%s" % i for i in range(50, 3000)]}
107 ).fetchone()["num"] == 50
108
109
110 def testUpdateJson(self, db):
111 f = io.BytesIO()
112 f.write("""
113 {
114 "test": [
115 {"test_id": 1, "title": "Test 1 title", "extra col": "Ignore it"}
116 ]
117 }
118 """.encode())
119 f.seek(0)
120 assert db.updateJson(db.db_dir + "data.json", f) is True
121 assert db.execute("SELECT COUNT(*) AS num FROM test_importfilter").fetchone()["num"] == 1
122 assert db.execute("SELECT COUNT(*) AS num FROM test").fetchone()["num"] == 1
123
124 def testUnsafePattern(self, db):
125 db.schema["maps"] = {"[A-Za-z.]*": db.schema["maps"]["data.json"]} # Only repetition of . supported
126 f = io.StringIO()
127 f.write("""
128 {
129 "test": [
130 {"test_id": 1, "title": "Test 1 title", "extra col": "Ignore it"}
131 ]
132 }
133 """)
134 f.seek(0)
135 assert db.updateJson(db.db_dir + "data.json", f) is False
136 assert db.execute("SELECT COUNT(*) AS num FROM test_importfilter").fetchone()["num"] == 0
137 assert db.execute("SELECT COUNT(*) AS num FROM test").fetchone()["num"] == 0