atproto user agency toolkit for individuals and groups
at main 128 lines 3.5 kB view raw
/**
 * SQLite-backed Datastore for libp2p.
 *
 * Replaces FsDatastore to avoid filesystem churn.
 * Stores libp2p peer/routing data in a single SQLite table.
 */

import type Database from "better-sqlite3";
import { Key } from "interface-datastore";
import type { Pair, Query, KeyQuery, Batch } from "interface-datastore";
type DatastorePair = Pair;

/** A queued batch operation, applied when the batch is committed. */
type BatchOp =
  | { type: "put"; key: Key; value: Uint8Array }
  | { type: "delete"; key: Key };

/** The post-fetch refinements shared by `Query` and `KeyQuery`. */
interface QueryRefinements<T> {
  filters?: ReadonlyArray<(item: T) => boolean>;
  orders?: ReadonlyArray<(a: T, b: T) => number>;
  offset?: number;
  limit?: number;
}

/**
 * Build the error thrown when a key is absent.
 *
 * It is still a plain `Error` with the same message, but it is tagged with
 * `name: "NotFoundError"` and `code: "ERR_NOT_FOUND"`, the markers datastore
 * consumers conventionally check to tell "missing key" apart from a real
 * failure.
 */
function notFoundError(key: Key): Error {
  return Object.assign(new Error(`Key not found: ${key.toString()}`), {
    name: "NotFoundError",
    code: "ERR_NOT_FOUND",
  });
}

/**
 * Apply a query's filters, orders, offset and limit (in that order) to an
 * already-fetched list of results. The input array is not mutated.
 */
function applyQueryRefinements<T>(items: T[], q: QueryRefinements<T>): T[] {
  let result = items;
  for (const keep of q.filters ?? []) {
    result = result.filter((item) => keep(item));
  }
  // Each order re-sorts the whole list; Array.prototype.sort is stable, so
  // later orders take precedence and earlier ones break ties.
  for (const compare of q.orders ?? []) {
    result = [...result].sort(compare);
  }
  const start = Math.max(0, q.offset ?? 0);
  const end = q.limit == null ? undefined : start + Math.max(0, q.limit);
  return result.slice(start, end);
}

/**
 * Datastore that keeps every key/value pair in the `ipfs_datastore` table of
 * a caller-supplied better-sqlite3 database.
 *
 * All SQL runs synchronously; the methods are `async` only to match the
 * datastore method shapes expected by callers.
 */
export class SqliteDatastore {
  private db: Database.Database;

  // Statements are prepared once and reused; preparing on every call is
  // wasted work, especially inside putMany/getMany/deleteMany loops.
  private readonly putStmt: Database.Statement;
  private readonly getStmt: Database.Statement;
  private readonly hasStmt: Database.Statement;
  private readonly deleteStmt: Database.Statement;
  private readonly queryStmt: Database.Statement;
  private readonly queryKeysStmt: Database.Statement;

  /**
   * @param db Open better-sqlite3 database. The `ipfs_datastore` table is
   *   created if it does not exist yet.
   */
  constructor(db: Database.Database) {
    this.db = db;
    this.db.exec(`
      CREATE TABLE IF NOT EXISTS ipfs_datastore (
        key TEXT PRIMARY KEY,
        value BLOB NOT NULL
      )
    `);

    this.putStmt = db.prepare(
      "INSERT OR REPLACE INTO ipfs_datastore (key, value) VALUES (?, ?)",
    );
    this.getStmt = db.prepare("SELECT value FROM ipfs_datastore WHERE key = ?");
    this.hasStmt = db.prepare("SELECT 1 FROM ipfs_datastore WHERE key = ?");
    this.deleteStmt = db.prepare("DELETE FROM ipfs_datastore WHERE key = ?");
    // Prefix match via substr() rather than LIKE: LIKE is case-insensitive
    // for ASCII and treats `_` / `%` inside the prefix as wildcards, both of
    // which would match keys that do not actually start with the prefix.
    this.queryStmt = db.prepare(
      "SELECT key, value FROM ipfs_datastore WHERE substr(key, 1, length(?)) = ?",
    );
    this.queryKeysStmt = db.prepare(
      "SELECT key FROM ipfs_datastore WHERE substr(key, 1, length(?)) = ?",
    );
  }

  /**
   * Store `val` under `key`, replacing any existing value.
   *
   * @returns The key that was written.
   */
  async put(key: Key, val: Uint8Array): Promise<Key> {
    this.putStmt.run(key.toString(), Buffer.from(val));
    return key;
  }

  /**
   * Fetch the value stored under `key`.
   *
   * @returns A fresh copy of the stored bytes.
   * @throws Error with `name === "NotFoundError"` and
   *   `code === "ERR_NOT_FOUND"` when the key is absent.
   */
  async get(key: Key): Promise<Uint8Array> {
    const row = this.getStmt.get(key.toString()) as
      | { value: Buffer }
      | undefined;
    if (!row) {
      throw notFoundError(key);
    }
    return new Uint8Array(row.value);
  }

  /** Report whether a value is stored under `key`. */
  async has(key: Key): Promise<boolean> {
    return this.hasStmt.get(key.toString()) !== undefined;
  }

  /** Remove `key`; a no-op when the key is absent. */
  async delete(key: Key): Promise<void> {
    this.deleteStmt.run(key.toString());
  }

  /** Store each pair from `source` in turn, yielding its key once written. */
  async *putMany(
    source: AsyncIterable<DatastorePair> | Iterable<DatastorePair>,
  ): AsyncGenerator<Key> {
    for await (const { key, value } of source) {
      await this.put(key, value);
      yield key;
    }
  }

  /**
   * Fetch each key from `source` in turn, yielding key/value pairs.
   *
   * @throws The same not-found error as {@link get} on the first missing key.
   */
  async *getMany(
    source: AsyncIterable<Key> | Iterable<Key>,
  ): AsyncGenerator<DatastorePair> {
    for await (const key of source) {
      const value = await this.get(key);
      yield { key, value };
    }
  }

  /** Delete each key from `source` in turn, yielding it once removed. */
  async *deleteMany(
    source: AsyncIterable<Key> | Iterable<Key>,
  ): AsyncGenerator<Key> {
    for await (const key of source) {
      await this.delete(key);
      yield key;
    }
  }

  /**
   * Iterate the key/value pairs whose key starts with `q.prefix` (default
   * `"/"`), then apply `q.filters`, `q.orders`, `q.offset` and `q.limit`.
   *
   * The SQL runs when iteration starts, not when `query` is called, and all
   * matching rows are loaded into memory before the first pair is yielded.
   */
  query(q: Query): AsyncIterable<DatastorePair> {
    const self = this;
    return (async function* () {
      const prefix = q.prefix ?? "/";
      const rows = self.queryStmt.all(prefix, prefix) as Array<{
        key: string;
        value: Buffer;
      }>;
      const pairs = rows.map(
        (row): DatastorePair => ({
          key: new Key(row.key),
          value: new Uint8Array(row.value),
        }),
      );
      yield* applyQueryRefinements(pairs, q);
    })();
  }

  /**
   * Iterate the keys that start with `q.prefix` (default `"/"`), then apply
   * `q.filters`, `q.orders`, `q.offset` and `q.limit`.
   *
   * The SQL runs when iteration starts, not when `queryKeys` is called.
   */
  queryKeys(q: KeyQuery): AsyncIterable<Key> {
    const self = this;
    return (async function* () {
      const prefix = q.prefix ?? "/";
      const rows = self.queryKeysStmt.all(prefix, prefix) as Array<{
        key: string;
      }>;
      const keys = rows.map((row) => new Key(row.key));
      yield* applyQueryRefinements(keys, q);
    })();
  }

  /**
   * Start a batch. `put`/`delete` only queue operations; `commit` applies
   * them in call order inside a single SQLite transaction, so either all of
   * them are persisted or none are.
   */
  batch(): Batch {
    const ops: BatchOp[] = [];
    return {
      put: (key: Key, value: Uint8Array) => {
        ops.push({ type: "put", key, value });
      },
      delete: (key: Key) => {
        ops.push({ type: "delete", key });
      },
      commit: async () => {
        const applyAll = this.db.transaction(() => {
          for (const op of ops) {
            if (op.type === "put") {
              this.putStmt.run(op.key.toString(), Buffer.from(op.value));
            } else {
              this.deleteStmt.run(op.key.toString());
            }
          }
        });
        applyAll();
        // Clear only after the transaction succeeded, so a repeated commit()
        // does not replay operations while a failed one can be retried.
        ops.length = 0;
      },
    };
  }
}