atproto user agency toolkit for individuals and groups
1/**
2 * SQLite-backed Datastore for libp2p.
3 *
4 * Replaces FsDatastore to avoid filesystem churn.
5 * Stores libp2p peer/routing data in a single SQLite table.
6 */
7
8import type Database from "better-sqlite3";
9import { Key } from "interface-datastore";
10import type { Pair, Query, KeyQuery, Batch } from "interface-datastore";
11type DatastorePair = Pair;
12
13export class SqliteDatastore {
14 private db: Database.Database;
15
16 constructor(db: Database.Database) {
17 this.db = db;
18 this.db.exec(`
19 CREATE TABLE IF NOT EXISTS ipfs_datastore (
20 key TEXT PRIMARY KEY,
21 value BLOB NOT NULL
22 )
23 `);
24 }
25
26 async put(key: Key, val: Uint8Array): Promise<Key> {
27 this.db
28 .prepare("INSERT OR REPLACE INTO ipfs_datastore (key, value) VALUES (?, ?)")
29 .run(key.toString(), Buffer.from(val));
30 return key;
31 }
32
33 async get(key: Key): Promise<Uint8Array> {
34 const row = this.db
35 .prepare("SELECT value FROM ipfs_datastore WHERE key = ?")
36 .get(key.toString()) as { value: Buffer } | undefined;
37 if (!row) {
38 throw new Error(`Key not found: ${key.toString()}`);
39 }
40 return new Uint8Array(row.value);
41 }
42
43 async has(key: Key): Promise<boolean> {
44 const row = this.db
45 .prepare("SELECT 1 FROM ipfs_datastore WHERE key = ?")
46 .get(key.toString());
47 return row !== undefined;
48 }
49
50 async delete(key: Key): Promise<void> {
51 this.db
52 .prepare("DELETE FROM ipfs_datastore WHERE key = ?")
53 .run(key.toString());
54 }
55
56 async * putMany(source: AsyncIterable<DatastorePair> | Iterable<DatastorePair>): AsyncGenerator<Key> {
57 for await (const { key, value } of source) {
58 await this.put(key, value);
59 yield key;
60 }
61 }
62
63 async * getMany(source: AsyncIterable<Key> | Iterable<Key>): AsyncGenerator<DatastorePair> {
64 for await (const key of source) {
65 const value = await this.get(key);
66 yield { key, value };
67 }
68 }
69
70 async * deleteMany(source: AsyncIterable<Key> | Iterable<Key>): AsyncGenerator<Key> {
71 for await (const key of source) {
72 await this.delete(key);
73 yield key;
74 }
75 }
76
77 query(q: Query): AsyncIterable<DatastorePair> {
78 const self = this;
79 return (async function* () {
80 const prefix = q.prefix ?? "/";
81 const rows = self.db
82 .prepare("SELECT key, value FROM ipfs_datastore WHERE key LIKE ?")
83 .all(prefix + "%") as Array<{ key: string; value: Buffer }>;
84 for (const row of rows) {
85 yield { key: new Key(row.key), value: new Uint8Array(row.value) };
86 }
87 })();
88 }
89
90 queryKeys(q: KeyQuery): AsyncIterable<Key> {
91 const self = this;
92 return (async function* () {
93 const prefix = q.prefix ?? "/";
94 const rows = self.db
95 .prepare("SELECT key FROM ipfs_datastore WHERE key LIKE ?")
96 .all(prefix + "%") as Array<{ key: string }>;
97 for (const row of rows) {
98 yield new Key(row.key);
99 }
100 })();
101 }
102
103 batch(): Batch {
104 const ops: Array<{ type: "put"; key: Key; value: Uint8Array } | { type: "delete"; key: Key }> = [];
105 return {
106 put: (key: Key, value: Uint8Array) => {
107 ops.push({ type: "put", key, value });
108 },
109 delete: (key: Key) => {
110 ops.push({ type: "delete", key });
111 },
112 commit: async () => {
113 const insertStmt = this.db.prepare("INSERT OR REPLACE INTO ipfs_datastore (key, value) VALUES (?, ?)");
114 const deleteStmt = this.db.prepare("DELETE FROM ipfs_datastore WHERE key = ?");
115 const tx = this.db.transaction(() => {
116 for (const op of ops) {
117 if (op.type === "put") {
118 insertStmt.run(op.key.toString(), Buffer.from(op.value));
119 } else {
120 deleteStmt.run(op.key.toString());
121 }
122 }
123 });
124 tx();
125 },
126 };
127 }
128}