a love letter to tangled (android, iOS, and a search API)
at main 275 lines 7.1 kB view raw
1package store 2 3import ( 4 "database/sql" 5 "embed" 6 "fmt" 7 "log/slog" 8 "sort" 9 "strings" 10 "time" 11 12 _ "github.com/jackc/pgx/v5/stdlib" 13 _ "modernc.org/sqlite" 14) 15 16//go:embed migrations/*.sql migrations_postgres/*.sql 17var migrationsFS embed.FS 18 19type Backend string 20 21const ( 22 BackendPostgres Backend = "postgres" 23 BackendSQLite Backend = "sqlite" 24) 25 26type migrationMode struct { 27 backend Backend 28 targetDescription string 29} 30 31// DetectBackend returns the configured database backend for the given URL. 32func DetectBackend(url string) Backend { 33 if strings.HasPrefix(url, "file:") { 34 return BackendSQLite 35 } 36 return BackendPostgres 37} 38 39// Open establishes a connection to the database. 40func Open(url string) (*sql.DB, error) { 41 driver, dsn := driverAndDSN(url) 42 db, err := sql.Open(driver, dsn) 43 if err != nil { 44 return nil, fmt.Errorf("open db: %w", err) 45 } 46 switch DetectBackend(url) { 47 case BackendSQLite: 48 if err := configureLocalSQLite(db); err != nil { 49 db.Close() 50 return nil, err 51 } 52 case BackendPostgres: 53 configurePostgresPool(db) 54 } 55 if err := db.Ping(); err != nil { 56 db.Close() 57 return nil, fmt.Errorf("ping db: %w", err) 58 } 59 return db, nil 60} 61 62func configureLocalSQLite(db *sql.DB) error { 63 if _, err := db.Exec(`PRAGMA busy_timeout = 5000`); err != nil { 64 return fmt.Errorf("configure sqlite busy_timeout: %w", err) 65 } 66 if _, err := db.Exec(`PRAGMA journal_mode = WAL`); err != nil { 67 return fmt.Errorf("configure sqlite wal mode: %w", err) 68 } 69 if _, err := db.Exec(`PRAGMA synchronous = NORMAL`); err != nil { 70 return fmt.Errorf("configure sqlite synchronous mode: %w", err) 71 } 72 73 db.SetMaxOpenConns(1) 74 db.SetMaxIdleConns(1) 75 db.SetConnMaxLifetime(0) 76 db.SetConnMaxIdleTime(5 * time.Minute) 77 return nil 78} 79 80func configurePostgresPool(db *sql.DB) { 81 db.SetMaxOpenConns(10) 82 db.SetMaxIdleConns(10) 83 db.SetConnMaxLifetime(30 * time.Minute) 84 db.SetConnMaxIdleTime(5 * time.Minute) 85} 86 87func driverAndDSN(url string) (driver, dsn string) { 88 if strings.HasPrefix(url, "file:") { 89 return "sqlite", strings.TrimPrefix(url, "file:") 90 } 91 return "pgx", url 92} 93 94// Migrate runs embedded SQL migrations for the selected backend. 95func Migrate(db *sql.DB, url string) error { 96 switch DetectBackend(url) { 97 case BackendSQLite: 98 return migrateSQLite(db, url) 99 default: 100 return migratePostgres(db) 101 } 102} 103 104func migrateSQLite(db *sql.DB, url string) error { 105 if _, err := db.Exec(`CREATE TABLE IF NOT EXISTS schema_migrations ( 106 filename TEXT PRIMARY KEY, 107 applied_at TEXT NOT NULL 108 )`); err != nil { 109 return fmt.Errorf("create schema_migrations table: %w", err) 110 } 111 112 if err := backfillSQLiteMigrationHistory(db); err != nil { 113 return fmt.Errorf("backfill migration history: %w", err) 114 } 115 116 mode := migrationMode{ 117 backend: BackendSQLite, 118 targetDescription: migrationTargetDescription(url), 119 } 120 return runMigrations(db, "migrations", "?", mode) 121} 122 123func migratePostgres(db *sql.DB) error { 124 if _, err := db.Exec(`CREATE TABLE IF NOT EXISTS schema_migrations ( 125 filename TEXT PRIMARY KEY, 126 applied_at TIMESTAMPTZ NOT NULL DEFAULT NOW() 127 )`); err != nil { 128 return fmt.Errorf("create schema_migrations table: %w", err) 129 } 130 131 mode := migrationMode{ 132 backend: BackendPostgres, 133 targetDescription: "postgresql", 134 } 135 return runMigrations(db, "migrations_postgres", "$", mode) 136} 137 138func runMigrations(db *sql.DB, dir, placeholderPrefix string, mode migrationMode) error { 139 entries, err := migrationsFS.ReadDir(dir) 140 if err != nil { 141 return fmt.Errorf("read migrations dir: %w", err) 142 } 143 sort.Slice(entries, func(i, j int) bool { 144 return entries[i].Name() < entries[j].Name() 145 }) 146 for _, entry := range entries { 147 if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".sql") { 148 continue 149 } 150 151 var already int 152 query := `SELECT COUNT(*) FROM schema_migrations WHERE filename = ?` 153 args := []any{entry.Name()} 154 if placeholderPrefix == "$" { 155 query = `SELECT COUNT(*) FROM schema_migrations WHERE filename = $1` 156 } 157 if err := db.QueryRow(query, args...).Scan(&already); err != nil { 158 return fmt.Errorf("check migration %s: %w", entry.Name(), err) 159 } 160 if already > 0 { 161 slog.Debug("migration already applied, skipping", "file", entry.Name()) 162 continue 163 } 164 165 data, err := migrationsFS.ReadFile(dir + "/" + entry.Name()) 166 if err != nil { 167 return fmt.Errorf("read migration %s: %w", entry.Name(), err) 168 } 169 if err := execMigration(db, entry.Name(), string(data), mode); err != nil { 170 return err 171 } 172 173 insert := `INSERT INTO schema_migrations (filename, applied_at) VALUES (?, datetime('now'))` 174 if placeholderPrefix == "$" { 175 insert = `INSERT INTO schema_migrations (filename) VALUES ($1)` 176 } 177 if _, err := db.Exec(insert, entry.Name()); err != nil { 178 return fmt.Errorf("record migration %s: %w", entry.Name(), err) 179 } 180 slog.Info("migration applied", "file", entry.Name()) 181 } 182 return nil 183} 184 185func backfillSQLiteMigrationHistory(db *sql.DB) error { 186 var count int 187 if err := db.QueryRow(`SELECT COUNT(*) FROM schema_migrations`).Scan(&count); err != nil || count > 0 { 188 return nil 189 } 190 191 if !sqliteTableExists(db, "documents") { 192 return nil 193 } 194 195 mark := func(filename string) { 196 _, _ = db.Exec( 197 `INSERT OR IGNORE INTO schema_migrations (filename, applied_at) VALUES (?, datetime('now'))`, 198 filename, 199 ) 200 } 201 202 mark("001_initial.sql") 203 204 if sqliteTableExists(db, "identity_handles") { 205 mark("002_identity_handles.sql") 206 } 207 if sqliteTableExists(db, "documents_fts") { 208 mark("003_documents_fts5.sql") 209 } 210 if sqliteColumnExists(db, "documents", "web_url") { 211 mark("004_web_url.sql") 212 } 213 return nil 214} 215 216func sqliteTableExists(db *sql.DB, table string) bool { 217 var n int 218 _ = db.QueryRow( 219 `SELECT COUNT(*) FROM sqlite_master WHERE type IN ('table','view') AND name = ?`, table, 220 ).Scan(&n) 221 return n > 0 222} 223 224func sqliteColumnExists(db *sql.DB, table, column string) bool { 225 var n int 226 _ = db.QueryRow( 227 `SELECT COUNT(*) FROM pragma_table_info(?) WHERE name = ?`, table, column, 228 ).Scan(&n) 229 return n > 0 230} 231 232func execMigration(db *sql.DB, name, content string, mode migrationMode) error { 233 for _, stmt := range splitStatements(content) { 234 if _, err := db.Exec(stmt); err != nil { 235 if mode.backend == BackendSQLite { 236 upper := strings.ToUpper(stmt) 237 if strings.Contains(upper, "LIBSQL_VECTOR_IDX") { 238 slog.Debug("migration: skipping unsupported vector index DDL", 239 "migration", name, 240 ) 241 continue 242 } 243 if strings.Contains(upper, "CREATE VIRTUAL TABLE") && 244 strings.Contains(upper, "USING FTS5") { 245 return fmt.Errorf( 246 "migration %s: SQLite FTS5 statement failed on %s: %w\nstatement: %s", 247 name, mode.targetDescription, err, stmt, 248 ) 249 } 250 } 251 return fmt.Errorf("migration %s: exec failed: %w\nstatement: %s", name, err, stmt) 252 } 253 } 254 return nil 255} 256 257func migrationTargetDescription(url string) string { 258 switch DetectBackend(url) { 259 case BackendSQLite: 260 return "local SQLite" 261 default: 262 return "postgresql" 263 } 264} 265 266func splitStatements(content string) []string { 267 var stmts []string 268 for _, s := range strings.Split(content, ";") { 269 s = strings.TrimSpace(s) 270 if s != "" { 271 stmts = append(stmts, s) 272 } 273 } 274 return stmts 275}