alf: the atproto Latency Fabric
alf.fly.dev/
1// ABOUTME: Kysely database configuration and schema initialization for ALF (Atproto Latency Fabric)
2
3import { Kysely, SqliteDialect, PostgresDialect, Dialect } from 'kysely';
4import SQLite from 'better-sqlite3';
5import { Pool as PgPool } from 'pg';
6import type { Database } from './schema.js';
7import type { ServiceConfig } from './config.js';
8import { createLogger } from './logger.js';
9import path from 'path';
10import fs from 'fs';
11
12const logger = createLogger('Database');
13
14/**
15 * Create a Kysely dialect based on the configuration
16 */
17export function createDialect(config: ServiceConfig): Dialect {
18 if (config.databaseType === 'postgres') {
19 if (!config.databaseUrl) {
20 throw new Error('DATABASE_URL is required for PostgreSQL');
21 }
22
23 const pool = new PgPool({
24 connectionString: config.databaseUrl,
25 max: 10,
26 });
27
28 return new PostgresDialect({ pool });
29 }
30
31 // SQLite (default)
32 const dbFile = config.databasePath;
33
34 if (dbFile !== ':memory:') {
35 const dbDir = path.dirname(dbFile);
36 if (!fs.existsSync(dbDir)) {
37 fs.mkdirSync(dbDir, { recursive: true });
38 }
39 }
40
41 return new SqliteDialect({
42 database: new SQLite(dbFile),
43 });
44}
45
46/**
47 * Create a Kysely database instance
48 */
49export function createDb(config: ServiceConfig): Kysely<Database> {
50 const dialect = createDialect(config);
51 return new Kysely<Database>({ dialect });
52}
53
54/**
55 * Initialize database schema (create tables and indexes)
56 */
57export async function initializeSchema(db: Kysely<Database>, config: ServiceConfig): Promise<void> {
58 const isPostgres = config.databaseType === 'postgres';
59 const intType = isPostgres ? 'integer' : 'integer';
60 const bigintType = isPostgres ? 'bigint' : 'integer';
61
62 // user_authorizations table
63 await db.schema
64 .createTable('user_authorizations')
65 .ifNotExists()
66 .addColumn('user_did', 'text', (col) => col.primaryKey())
67 .addColumn('pds_url', 'text', (col) => col.notNull())
68 .addColumn('refresh_token', 'text', (col) => col.notNull())
69 .addColumn('dpop_private_key', 'text', (col) => col.notNull())
70 .addColumn('token_scope', 'text', (col) => col.notNull())
71 .addColumn('auth_type', 'text', (col) => col.notNull().defaultTo('oauth'))
72 .addColumn('created_at', bigintType as 'integer', (col) => col.notNull())
73 .addColumn('updated_at', bigintType as 'integer', (col) => col.notNull())
74 .execute();
75
76 // Migrate: add auth_type column to existing user_authorizations table
77 try {
78 await db.schema
79 .alterTable('user_authorizations')
80 .addColumn('auth_type', 'text', (col) => col.notNull().defaultTo('oauth'))
81 .execute();
82 } catch {
83 // Column already exists — no-op
84 }
85
86 // oauth_states table
87 await db.schema
88 .createTable('oauth_states')
89 .ifNotExists()
90 .addColumn('state_key', 'text', (col) => col.primaryKey())
91 .addColumn('state_data', 'text', (col) => col.notNull())
92 .addColumn('expires_at', bigintType as 'integer', (col) => col.notNull())
93 .addColumn('created_at', bigintType as 'integer', (col) => col.notNull())
94 .execute();
95
96 await db.schema
97 .createIndex('idx_oauth_states_expires_at')
98 .ifNotExists()
99 .on('oauth_states')
100 .column('expires_at')
101 .execute();
102
103 // drafts table
104 await db.schema
105 .createTable('drafts')
106 .ifNotExists()
107 .addColumn('uri', 'text', (col) => col.primaryKey())
108 .addColumn('user_did', 'text', (col) => col.notNull())
109 .addColumn('collection', 'text', (col) => col.notNull())
110 .addColumn('rkey', 'text', (col) => col.notNull())
111 .addColumn('record', 'text')
112 .addColumn('record_cid', 'text')
113 .addColumn('action', 'text', (col) => col.notNull())
114 .addColumn('status', 'text', (col) => col.notNull())
115 .addColumn('scheduled_at', bigintType as 'integer')
116 .addColumn('retry_count', intType as 'integer', (col) => col.notNull().defaultTo(0))
117 .addColumn('created_at', bigintType as 'integer', (col) => col.notNull())
118 .addColumn('updated_at', bigintType as 'integer', (col) => col.notNull())
119 .addColumn('published_at', bigintType as 'integer')
120 .addColumn('failure_reason', 'text')
121 .addColumn('trigger_key_hash', 'text')
122 .addColumn('trigger_key_encrypted', 'text')
123 .addColumn('schedule_id', 'text')
124 .execute();
125
126 // Migrations: add trigger and schedule columns to existing drafts table
127 for (const col of ['trigger_key_hash', 'trigger_key_encrypted', 'schedule_id']) {
128 try {
129 await db.schema
130 .alterTable('drafts')
131 .addColumn(col, 'text')
132 .execute();
133 } catch {
134 // Column already exists — no-op
135 }
136 }
137
138 await db.schema
139 .createIndex('idx_drafts_scheduled_at_status')
140 .ifNotExists()
141 .on('drafts')
142 .columns(['scheduled_at', 'status'])
143 .execute();
144
145 await db.schema
146 .createIndex('idx_drafts_user_did_status')
147 .ifNotExists()
148 .on('drafts')
149 .columns(['user_did', 'status'])
150 .execute();
151
152 // Index for O(1) trigger key lookup
153 await db.schema
154 .createIndex('idx_drafts_trigger_key_hash')
155 .ifNotExists()
156 .unique()
157 .on('drafts')
158 .column('trigger_key_hash')
159 .execute();
160
161 // draft_blobs table
162 const blobDataType = isPostgres ? 'bytea' : 'blob';
163 const idColumnType = isPostgres ? 'serial' : 'integer';
164 await db.schema
165 .createTable('draft_blobs')
166 .ifNotExists()
167 .addColumn('id', idColumnType as 'integer', (col) =>
168 isPostgres ? col.primaryKey() : col.primaryKey().autoIncrement(),
169 )
170 .addColumn('user_did', 'text', (col) => col.notNull())
171 .addColumn('cid', 'text', (col) => col.notNull())
172 .addColumn('data', blobDataType as 'blob')
173 .addColumn('mime_type', 'text', (col) => col.notNull())
174 .addColumn('size', intType as 'integer', (col) => col.notNull())
175 .addColumn('created_at', bigintType as 'integer', (col) => col.notNull())
176 .execute();
177
178 await db.schema
179 .createIndex('idx_draft_blobs_user_did_cid')
180 .ifNotExists()
181 .unique()
182 .on('draft_blobs')
183 .columns(['user_did', 'cid'])
184 .execute();
185
186 // schedules table
187 await db.schema
188 .createTable('schedules')
189 .ifNotExists()
190 .addColumn('id', 'text', (col) => col.primaryKey())
191 .addColumn('user_did', 'text', (col) => col.notNull())
192 .addColumn('collection', 'text', (col) => col.notNull())
193 .addColumn('record', 'text')
194 .addColumn('content_url', 'text')
195 .addColumn('recurrence_rule', 'text', (col) => col.notNull())
196 .addColumn('timezone', 'text', (col) => col.notNull())
197 .addColumn('status', 'text', (col) => col.notNull().defaultTo('active'))
198 .addColumn('fire_count', intType as 'integer', (col) => col.notNull().defaultTo(0))
199 .addColumn('created_at', bigintType as 'integer', (col) => col.notNull())
200 .addColumn('updated_at', bigintType as 'integer', (col) => col.notNull())
201 .addColumn('last_fired_at', bigintType as 'integer')
202 .addColumn('next_draft_uri', 'text')
203 .execute();
204
205 await db.schema
206 .createIndex('idx_schedules_user_did_status')
207 .ifNotExists()
208 .on('schedules')
209 .columns(['user_did', 'status'])
210 .execute();
211
212 logger.info(`${config.databaseType} database schema initialized`);
213}