A from-scratch atproto PDS implementation in Python (mirrors https://github.com/DavidBuchanan314/millipds)
1# TODO: some smarter way of handling migrations 2 3import apsw 4import apsw.bestpractice 5 6apsw.bestpractice.apply(apsw.bestpractice.recommended) 7 8from millipds import static_config 9 10 11def migrate(con: apsw.Connection): 12 version_now, *_ = con.execute("SELECT db_version FROM config").fetchone() 13 14 assert version_now == 2 15 16 con.execute( 17 """ 18 CREATE TABLE revoked_token( 19 did TEXT NOT NULL, 20 jti TEXT NOT NULL, 21 expires_at INTEGER NOT NULL, 22 PRIMARY KEY (did, jti) 23 ) STRICT, WITHOUT ROWID 24 """ 25 ) 26 27 con.execute("UPDATE config SET db_version=3") 28 29 30if __name__ == "__main__": 31 with apsw.Connection(static_config.MAIN_DB_PATH) as con: 32 migrate(con) 33 34 print("v2 -> v3 Migration successful")