// alf: the atproto Latency Fabric — https://alf.fly.dev/
1// ABOUTME: Kysely database configuration and schema initialization for ALF (Atproto Latency Fabric) 2 3import { Kysely, SqliteDialect, PostgresDialect, Dialect } from 'kysely'; 4import SQLite from 'better-sqlite3'; 5import { Pool as PgPool } from 'pg'; 6import type { Database } from './schema.js'; 7import type { ServiceConfig } from './config.js'; 8import { createLogger } from './logger.js'; 9import path from 'path'; 10import fs from 'fs'; 11 12const logger = createLogger('Database'); 13 14/** 15 * Create a Kysely dialect based on the configuration 16 */ 17export function createDialect(config: ServiceConfig): Dialect { 18 if (config.databaseType === 'postgres') { 19 if (!config.databaseUrl) { 20 throw new Error('DATABASE_URL is required for PostgreSQL'); 21 } 22 23 const pool = new PgPool({ 24 connectionString: config.databaseUrl, 25 max: 10, 26 }); 27 28 return new PostgresDialect({ pool }); 29 } 30 31 // SQLite (default) 32 const dbFile = config.databasePath; 33 34 if (dbFile !== ':memory:') { 35 const dbDir = path.dirname(dbFile); 36 if (!fs.existsSync(dbDir)) { 37 fs.mkdirSync(dbDir, { recursive: true }); 38 } 39 } 40 41 return new SqliteDialect({ 42 database: new SQLite(dbFile), 43 }); 44} 45 46/** 47 * Create a Kysely database instance 48 */ 49export function createDb(config: ServiceConfig): Kysely<Database> { 50 const dialect = createDialect(config); 51 return new Kysely<Database>({ dialect }); 52} 53 54/** 55 * Initialize database schema (create tables and indexes) 56 */ 57export async function initializeSchema(db: Kysely<Database>, config: ServiceConfig): Promise<void> { 58 const isPostgres = config.databaseType === 'postgres'; 59 const intType = isPostgres ? 'integer' : 'integer'; 60 const bigintType = isPostgres ? 
'bigint' : 'integer'; 61 62 // user_authorizations table 63 await db.schema 64 .createTable('user_authorizations') 65 .ifNotExists() 66 .addColumn('user_did', 'text', (col) => col.primaryKey()) 67 .addColumn('pds_url', 'text', (col) => col.notNull()) 68 .addColumn('refresh_token', 'text', (col) => col.notNull()) 69 .addColumn('dpop_private_key', 'text', (col) => col.notNull()) 70 .addColumn('token_scope', 'text', (col) => col.notNull()) 71 .addColumn('auth_type', 'text', (col) => col.notNull().defaultTo('oauth')) 72 .addColumn('created_at', bigintType as 'integer', (col) => col.notNull()) 73 .addColumn('updated_at', bigintType as 'integer', (col) => col.notNull()) 74 .execute(); 75 76 // Migrate: add auth_type column to existing user_authorizations table 77 try { 78 await db.schema 79 .alterTable('user_authorizations') 80 .addColumn('auth_type', 'text', (col) => col.notNull().defaultTo('oauth')) 81 .execute(); 82 } catch { 83 // Column already exists — no-op 84 } 85 86 // oauth_states table 87 await db.schema 88 .createTable('oauth_states') 89 .ifNotExists() 90 .addColumn('state_key', 'text', (col) => col.primaryKey()) 91 .addColumn('state_data', 'text', (col) => col.notNull()) 92 .addColumn('expires_at', bigintType as 'integer', (col) => col.notNull()) 93 .addColumn('created_at', bigintType as 'integer', (col) => col.notNull()) 94 .execute(); 95 96 await db.schema 97 .createIndex('idx_oauth_states_expires_at') 98 .ifNotExists() 99 .on('oauth_states') 100 .column('expires_at') 101 .execute(); 102 103 // drafts table 104 await db.schema 105 .createTable('drafts') 106 .ifNotExists() 107 .addColumn('uri', 'text', (col) => col.primaryKey()) 108 .addColumn('user_did', 'text', (col) => col.notNull()) 109 .addColumn('collection', 'text', (col) => col.notNull()) 110 .addColumn('rkey', 'text', (col) => col.notNull()) 111 .addColumn('record', 'text') 112 .addColumn('record_cid', 'text') 113 .addColumn('action', 'text', (col) => col.notNull()) 114 .addColumn('status', 'text', 
(col) => col.notNull()) 115 .addColumn('scheduled_at', bigintType as 'integer') 116 .addColumn('retry_count', intType as 'integer', (col) => col.notNull().defaultTo(0)) 117 .addColumn('created_at', bigintType as 'integer', (col) => col.notNull()) 118 .addColumn('updated_at', bigintType as 'integer', (col) => col.notNull()) 119 .addColumn('published_at', bigintType as 'integer') 120 .addColumn('failure_reason', 'text') 121 .addColumn('trigger_key_hash', 'text') 122 .addColumn('trigger_key_encrypted', 'text') 123 .addColumn('schedule_id', 'text') 124 .execute(); 125 126 // Migrations: add trigger and schedule columns to existing drafts table 127 for (const col of ['trigger_key_hash', 'trigger_key_encrypted', 'schedule_id']) { 128 try { 129 await db.schema 130 .alterTable('drafts') 131 .addColumn(col, 'text') 132 .execute(); 133 } catch { 134 // Column already exists — no-op 135 } 136 } 137 138 await db.schema 139 .createIndex('idx_drafts_scheduled_at_status') 140 .ifNotExists() 141 .on('drafts') 142 .columns(['scheduled_at', 'status']) 143 .execute(); 144 145 await db.schema 146 .createIndex('idx_drafts_user_did_status') 147 .ifNotExists() 148 .on('drafts') 149 .columns(['user_did', 'status']) 150 .execute(); 151 152 // Index for O(1) trigger key lookup 153 await db.schema 154 .createIndex('idx_drafts_trigger_key_hash') 155 .ifNotExists() 156 .unique() 157 .on('drafts') 158 .column('trigger_key_hash') 159 .execute(); 160 161 // draft_blobs table 162 const blobDataType = isPostgres ? 'bytea' : 'blob'; 163 const idColumnType = isPostgres ? 'serial' : 'integer'; 164 await db.schema 165 .createTable('draft_blobs') 166 .ifNotExists() 167 .addColumn('id', idColumnType as 'integer', (col) => 168 isPostgres ? 
col.primaryKey() : col.primaryKey().autoIncrement(), 169 ) 170 .addColumn('user_did', 'text', (col) => col.notNull()) 171 .addColumn('cid', 'text', (col) => col.notNull()) 172 .addColumn('data', blobDataType as 'blob') 173 .addColumn('mime_type', 'text', (col) => col.notNull()) 174 .addColumn('size', intType as 'integer', (col) => col.notNull()) 175 .addColumn('created_at', bigintType as 'integer', (col) => col.notNull()) 176 .execute(); 177 178 await db.schema 179 .createIndex('idx_draft_blobs_user_did_cid') 180 .ifNotExists() 181 .unique() 182 .on('draft_blobs') 183 .columns(['user_did', 'cid']) 184 .execute(); 185 186 // schedules table 187 await db.schema 188 .createTable('schedules') 189 .ifNotExists() 190 .addColumn('id', 'text', (col) => col.primaryKey()) 191 .addColumn('user_did', 'text', (col) => col.notNull()) 192 .addColumn('collection', 'text', (col) => col.notNull()) 193 .addColumn('record', 'text') 194 .addColumn('content_url', 'text') 195 .addColumn('recurrence_rule', 'text', (col) => col.notNull()) 196 .addColumn('timezone', 'text', (col) => col.notNull()) 197 .addColumn('status', 'text', (col) => col.notNull().defaultTo('active')) 198 .addColumn('fire_count', intType as 'integer', (col) => col.notNull().defaultTo(0)) 199 .addColumn('created_at', bigintType as 'integer', (col) => col.notNull()) 200 .addColumn('updated_at', bigintType as 'integer', (col) => col.notNull()) 201 .addColumn('last_fired_at', bigintType as 'integer') 202 .addColumn('next_draft_uri', 'text') 203 .execute(); 204 205 await db.schema 206 .createIndex('idx_schedules_user_did_status') 207 .ifNotExists() 208 .on('schedules') 209 .columns(['user_did', 'status']) 210 .execute(); 211 212 logger.info(`${config.databaseType} database schema initialized`); 213}