Forking what is left of ZeroNet and hopefully adding an AT Proto Frontend/Proxy
at main 156 lines 6.5 kB view raw
import time
import re

import gevent

from Config import config
from Db import Db
from util import helper


class BootstrapperDb(Db.Db):
    """SQLite-backed registry for the bootstrapper (tracker) service.

    Stores which peers have announced which content hashes and serves
    peer lists for a given hash.  Schema: `peer` (address/port/type),
    `hash` (hash blob -> id), `peer_to_hash` (many-to-many join).

    NOTE(review): several queries pass a dict for a bare ``?`` placeholder
    (e.g. ``"INSERT INTO peer ?"``); this relies on the project's Db.Db
    wrapper expanding the dict into columns/WHERE clauses — confirm
    against Db.Db before changing any of those calls.
    """

    def __init__(self):
        self.version = 7  # schema version, compared against PRAGMA user_version
        self.hash_ids = {}  # hash -> hash_id in-memory cache, mirrors the `hash` table
        super(BootstrapperDb, self).__init__({"db_name": "Bootstrapper"}, "%s/bootstrapper.db" % config.data_dir)
        self.foreign_keys = True
        self.checkTables()
        self.updateHashCache()
        gevent.spawn(self.cleanup)  # background sweep of stale peers

    def cleanup(self):
        """Every 4 minutes, delete peers that have not announced in 40 minutes."""
        while True:
            time.sleep(4 * 60)
            # Timestamps are stored as local-time strings; compare lexically.
            # NOTE(review): assumes CURRENT_TIMESTAMP and localtime() agree — verify TZ handling.
            timeout = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time() - 60 * 40))
            self.execute("DELETE FROM peer WHERE date_announced < ?", [timeout])

    def updateHashCache(self):
        """Load the full hash -> hash_id mapping from the db into self.hash_ids."""
        res = self.execute("SELECT * FROM hash")
        self.hash_ids = {row["hash"]: row["hash_id"] for row in res}
        self.log.debug("Loaded %s hash_ids" % len(self.hash_ids))

    def checkTables(self):
        """Recreate the schema if the stored user_version is older than self.version."""
        version = int(self.execute("PRAGMA user_version").fetchone()[0])
        self.log.debug("Db version: %s, needed: %s" % (version, self.version))
        if version < self.version:
            self.createTables()
        else:
            self.execute("VACUUM")

    def createTables(self):
        """Drop every table/index/trigger and build a fresh schema, then stamp user_version."""
        # Delete all tables
        self.execute("PRAGMA writable_schema = 1")
        self.execute("DELETE FROM sqlite_master WHERE type IN ('table', 'index', 'trigger')")
        self.execute("PRAGMA writable_schema = 0")
        self.execute("VACUUM")
        self.execute("PRAGMA INTEGRITY_CHECK")
        # Create new tables
        self.execute("""
            CREATE TABLE peer (
                peer_id        INTEGER  PRIMARY KEY ASC AUTOINCREMENT NOT NULL UNIQUE,
                type           TEXT,
                address        TEXT,
                port           INTEGER NOT NULL,
                date_added     DATETIME DEFAULT (CURRENT_TIMESTAMP),
                date_announced DATETIME DEFAULT (CURRENT_TIMESTAMP)
            );
        """)
        self.execute("CREATE UNIQUE INDEX peer_key ON peer (address, port);")

        self.execute("""
            CREATE TABLE peer_to_hash (
                peer_to_hash_id INTEGER PRIMARY KEY AUTOINCREMENT UNIQUE NOT NULL,
                peer_id         INTEGER REFERENCES peer (peer_id) ON DELETE CASCADE,
                hash_id         INTEGER REFERENCES hash (hash_id)
            );
        """)
        self.execute("CREATE INDEX peer_id ON peer_to_hash (peer_id);")
        self.execute("CREATE INDEX hash_id ON peer_to_hash (hash_id);")

        self.execute("""
            CREATE TABLE hash (
                hash_id    INTEGER  PRIMARY KEY AUTOINCREMENT UNIQUE NOT NULL,
                hash       BLOB     UNIQUE NOT NULL,
                date_added DATETIME DEFAULT (CURRENT_TIMESTAMP)
            );
        """)
        self.execute("PRAGMA user_version = %s" % self.version)

    def getHashId(self, hash):
        """Return the db id for *hash*, inserting a new row on first sight."""
        if hash not in self.hash_ids:
            self.log.debug("New hash: %s" % repr(hash))
            res = self.execute("INSERT OR IGNORE INTO hash ?", {"hash": hash})
            # NOTE(review): if the INSERT is IGNOREd (row already in db but not in
            # the cache) lastrowid may not be this hash's id — confirm Db.Db behavior.
            self.hash_ids[hash] = res.lastrowid
        return self.hash_ids[hash]

    def peerAnnounce(self, ip_type, address, port=None, hashes=None, onion_signed=False, delete_missing_hashes=False):
        """Record a peer's announce of *hashes*.

        Inserts/refreshes the peer row, then diffs the announced hash ids
        against the stored ones, adding new links and (optionally) deleting
        missing ones.  Unsigned onion announces of a NEW peer are rejected.

        Returns the number of changed hash links (or len(hashes) for a
        rejected unsigned onion announce; 0 when nothing changed).
        """
        if hashes is None:  # avoid the shared-mutable-default pitfall
            hashes = []
        hashes_ids_announced = []
        for hash in hashes:
            hashes_ids_announced.append(self.getHashId(hash))

        # Check user
        res = self.execute("SELECT peer_id FROM peer WHERE ? LIMIT 1", {"address": address, "port": port})

        user_row = res.fetchone()
        now = time.strftime("%Y-%m-%d %H:%M:%S")
        if user_row:
            peer_id = user_row["peer_id"]
            self.execute("UPDATE peer SET date_announced = ? WHERE peer_id = ?", (now, peer_id))
        else:
            self.log.debug("New peer: %s signed: %s" % (address, onion_signed))
            if ip_type == "onion" and not onion_signed:
                # New onion peers must prove ownership of the address.
                return len(hashes)
            res = self.execute("INSERT INTO peer ?", {"type": ip_type, "address": address, "port": port, "date_announced": now})
            peer_id = res.lastrowid

        # Check user's hashes
        res = self.execute("SELECT * FROM peer_to_hash WHERE ?", {"peer_id": peer_id})
        hash_ids_db = [row["hash_id"] for row in res]
        if hash_ids_db != hashes_ids_announced:
            hash_ids_added = set(hashes_ids_announced) - set(hash_ids_db)
            hash_ids_removed = set(hash_ids_db) - set(hashes_ids_announced)
            if ip_type != "onion" or onion_signed:  # onion links require a signed announce
                for hash_id in hash_ids_added:
                    self.execute("INSERT INTO peer_to_hash ?", {"peer_id": peer_id, "hash_id": hash_id})
                if hash_ids_removed and delete_missing_hashes:
                    self.execute("DELETE FROM peer_to_hash WHERE ?", {"peer_id": peer_id, "hash_id": list(hash_ids_removed)})

            return len(hash_ids_added) + len(hash_ids_removed)
        else:
            return 0

    def peerList(self, hash, address=None, onions=None, port=None, limit=30, need_types=None, order=True):
        """Return up to *limit* packed peers for *hash*, keyed by address type.

        The requester's own address (*address*/*port* or its *onions*) is
        excluded from the results.  Only types in *need_types* are packed.
        Returns {"ipv4": [...], "ipv6": [...], "onion": [...]}.
        """
        if onions is None:  # avoid mutable default arguments
            onions = []
        if need_types is None:
            need_types = ["ipv4", "onion"]
        back = {"ipv4": [], "ipv6": [], "onion": []}
        if limit == 0:
            return back
        hashid = self.getHashId(hash)

        if order:
            order_sql = "ORDER BY date_announced DESC"
        else:
            order_sql = ""
        where_sql = "hash_id = :hashid"
        if onions:
            # Strip everything but [a-z0-9,] before quoting to keep the SQL safe.
            onions_escaped = ["'%s'" % re.sub("[^a-z0-9,]", "", onion) for onion in onions if isinstance(onion, str)]
            where_sql += " AND address NOT IN (%s)" % ",".join(onions_escaped)
        elif address:
            where_sql += " AND NOT (address = :address AND port = :port)"

        query = """
            SELECT type, address, port
            FROM peer_to_hash
            LEFT JOIN peer USING (peer_id)
            WHERE %s
            %s
            LIMIT :limit
        """ % (where_sql, order_sql)
        res = self.execute(query, {"hashid": hashid, "address": address, "port": port, "limit": limit})

        for row in res:
            if row["type"] in need_types:
                if row["type"] == "onion":
                    packed = helper.packOnionAddress(row["address"], row["port"])
                else:
                    packed = helper.packAddress(str(row["address"]), row["port"])
                back[row["type"]].append(packed)
        return back