A database layer inspired by Caqti and Ecto.
at main 8.2 kB view raw
(** Declarative schema migrations.

    This module defines data types describing schema changes, small
    constructor helpers for building them, rendering of those changes to SQL
    text, and pure planning functions that turn a set of migrations plus the
    list of already-applied versions into an ordered list of actions.

    The SQL emitted here uses PostgreSQL-flavoured syntax ([TIMESTAMPTZ],
    [NOW()], [$1] placeholders, [ALTER COLUMN ... TYPE]). Table, column and
    index names, column types and defaults are inserted verbatim, without
    quoting or escaping. *)

(** Description of a single table column. *)
type column_def = {
  col_name : string;
  col_type : string;  (** SQL type, inserted verbatim. *)
  col_nullable : bool;  (** When [false], [NOT NULL] is emitted. *)
  col_primary_key : bool;
  col_unique : bool;
  col_default : string option;  (** SQL expression, inserted verbatim. *)
  col_references : (string * string) option;
      (** [(table, column)] target of a [REFERENCES] clause. *)
}

(** Description of an index. When [idx_name] is [None] a name is derived from
    the table and column names at rendering time. *)
type index_def = {
  idx_name : string option;
  idx_table : string;
  idx_columns : string list;
  idx_unique : bool;
}

(** A single schema operation. [Execute] carries raw SQL that is passed
    through unchanged. *)
type operation =
  | Create_table of { name : string; columns : column_def list }
  | Drop_table of string
  | Alter_table of { name : string; changes : alter_change list }
  | Create_index of index_def
  | Drop_index of string
  | Execute of string

(** One change inside an [Alter_table] operation. *)
and alter_change =
  | Add_column of column_def
  | Drop_column of string
  | Rename_column of { from : string; to_ : string }
  | Alter_column of {
      name : string;
      new_type : string option;
      new_nullable : bool option;
    }

(** A migration: a version, a name, and the operations to apply ([up]) and to
    revert ([down]) it. *)
type t = {
  version : int64;
  name : string;
  up : operation list;
  down : operation list;
}

(** [column name sql_type] builds a column definition. Columns are nullable,
    non-primary-key and non-unique unless stated otherwise. *)
let column ?(nullable = true) ?(primary_key = false) ?(unique = false) ?default
    ?references name sql_type =
  {
    col_name = name;
    col_type = sql_type;
    col_nullable = nullable;
    col_primary_key = primary_key;
    col_unique = unique;
    col_default = default;
    col_references = references;
  }

let create_table name columns = Create_table { name; columns }
let drop_table name = Drop_table name
let add_column col = Add_column col
let drop_column name = Drop_column name
let rename_column ~from ~to_ = Rename_column { from; to_ }
let alter_table name changes = Alter_table { name; changes }

(** [create_index table columns] builds a [Create_index] operation; the index
    is non-unique and unnamed unless [~unique] / [~name] are given. *)
let create_index ?(unique = false) ?name table columns =
  Create_index
    {
      idx_name = name;
      idx_table = table;
      idx_columns = columns;
      idx_unique = unique;
    }

let drop_index name = Drop_index name
let execute sql = Execute sql

(** The [inserted_at] / [updated_at] column pair, both [TIMESTAMPTZ NOT NULL]
    defaulting to [NOW()]. *)
let timestamps () =
  [
    column "inserted_at" "TIMESTAMPTZ" ~nullable:false ~default:"NOW()";
    column "updated_at" "TIMESTAMPTZ" ~nullable:false ~default:"NOW()";
  ]

let migration ~version ~name ~up ~down = { version; name; up; down }

(** Renders a column definition as it appears inside [CREATE TABLE] or
    [ADD COLUMN]. Clause order is: name, type, [NOT NULL], [PRIMARY KEY],
    [UNIQUE], [DEFAULT], [REFERENCES]. *)
let column_to_sql c =
  let flag enabled keyword = if enabled then Some keyword else None in
  [
    Some c.col_name;
    Some c.col_type;
    flag (not c.col_nullable) "NOT NULL";
    flag c.col_primary_key "PRIMARY KEY";
    flag c.col_unique "UNIQUE";
    Option.map (fun d -> "DEFAULT " ^ d) c.col_default;
    Option.map
      (fun (table, col) -> Printf.sprintf "REFERENCES %s(%s)" table col)
      c.col_references;
  ]
  |> List.filter_map Fun.id
  |> String.concat " "

(** Renders one [alter_change] as zero or more [ALTER TABLE] clauses.

    [Alter_column] yields one clause per requested change: a [TYPE] clause
    when [new_type] is set, and a [SET NOT NULL] / [DROP NOT NULL] clause when
    [new_nullable] is set. With neither set it yields no clause at all, so a
    no-op change never leaves an empty entry in the clause list. *)
let alter_change_to_sql = function
  | Add_column c -> [ Printf.sprintf "ADD COLUMN %s" (column_to_sql c) ]
  | Drop_column col -> [ Printf.sprintf "DROP COLUMN %s" col ]
  | Rename_column { from; to_ } ->
      [ Printf.sprintf "RENAME COLUMN %s TO %s" from to_ ]
  | Alter_column { name; new_type; new_nullable } ->
      let type_clause =
        Option.map
          (fun ty -> Printf.sprintf "ALTER COLUMN %s TYPE %s" name ty)
          new_type
      in
      let nullability_clause =
        Option.map
          (fun nullable ->
            Printf.sprintf "ALTER COLUMN %s %s NOT NULL" name
              (if nullable then "DROP" else "SET"))
          new_nullable
      in
      List.filter_map Fun.id [ type_clause; nullability_clause ]

(** Renders a single operation as one SQL statement (no trailing semicolon).

    For [Alter_table], all clauses are joined with [", "] into one statement.
    If the change list produces no clauses the result is ["ALTER TABLE name "],
    which is not a valid statement; callers should not build empty
    alterations.

    NOTE(review): PostgreSQL documents that [RENAME] cannot be combined with
    other actions in a single [ALTER TABLE]; mixing [Rename_column] with other
    changes in one [Alter_table] probably needs separate operations - confirm
    against the target database. *)
let operation_to_sql = function
  | Create_table { name; columns } ->
      let col_defs = columns |> List.map column_to_sql |> String.concat ",\n " in
      Printf.sprintf "CREATE TABLE %s (\n %s\n)" name col_defs
  | Drop_table name -> Printf.sprintf "DROP TABLE %s" name
  | Alter_table { name; changes } ->
      let clauses =
        changes |> List.map alter_change_to_sql |> List.concat
        |> String.concat ", "
      in
      Printf.sprintf "ALTER TABLE %s %s" name clauses
  | Create_index { idx_name; idx_table; idx_columns; idx_unique } ->
      let name =
        match idx_name with
        | Some n -> n
        | None ->
            (* Default name: <table>_<col1>_<col2>..._index *)
            Printf.sprintf "%s_%s_index" idx_table
              (String.concat "_" idx_columns)
      in
      Printf.sprintf "CREATE %sINDEX %s ON %s (%s)"
        (if idx_unique then "UNIQUE " else "")
        name idx_table
        (String.concat ", " idx_columns)
  | Drop_index name -> Printf.sprintf "DROP INDEX %s" name
  | Execute sql -> sql

(** Bookkeeping table recording which migrations have been applied. *)
let schema_migrations_table =
  Create_table
    {
      name = "schema_migrations";
      columns =
        [
          column "version" "BIGINT" ~nullable:false ~primary_key:true;
          column "name" "VARCHAR(255)" ~nullable:false;
          column "inserted_at" "TIMESTAMPTZ" ~nullable:false ~default:"NOW()";
        ];
    }

let create_schema_migrations_sql = operation_to_sql schema_migrations_table

let insert_migration_sql =
  "INSERT INTO schema_migrations (version, name) VALUES ($1, $2)"

let delete_migration_sql = "DELETE FROM schema_migrations WHERE version = $1"

let get_applied_versions_sql =
  "SELECT version FROM schema_migrations ORDER BY version"

let get_migration_status_sql =
  "SELECT version, name, inserted_at FROM schema_migrations ORDER BY version"

type migration_status = {
  applied_versions : int64 list;
  pending : t list;
  last_applied : int64 option;
}

(** Sorts migrations by ascending version. *)
let sort_migrations migrations =
  List.sort (fun m1 m2 -> Int64.compare m1.version m2.version) migrations

(** Migrations whose version is not in [applied_versions], sorted by ascending
    version. *)
let pending_migrations ~applied_versions migrations =
  let is_applied v = List.exists (Int64.equal v) applied_versions in
  migrations
  |> List.filter (fun m -> not (is_applied m.version))
  |> sort_migrations

let generate_up_sql migration = List.map operation_to_sql migration.up
let generate_down_sql migration = List.map operation_to_sql migration.down

(** One step of a migration plan. *)
type migration_action =
  | Migrate of { sql : string list; version : int64; name : string }
  | Rollback of { sql : string list; version : int64 }
  | CreateSchemaTable of string
  | RecordMigration of { version : int64; name : string }
  | RemoveMigration of int64

(** Plans a forward run: for every pending migration (ascending version), a
    [Migrate] action followed by a [RecordMigration] action. When [target] is
    [Some v], only pending migrations with version [<= v] are included. *)
let plan_migrate ~applied_versions ~target migrations =
  let pending = pending_migrations ~applied_versions migrations in
  let to_run =
    match target with
    | None -> pending
    | Some v -> List.filter (fun m -> Int64.compare m.version v <= 0) pending
  in
  to_run
  |> List.map (fun m ->
         [
           Migrate
             { sql = generate_up_sql m; version = m.version; name = m.name };
           RecordMigration { version = m.version; name = m.name };
         ])
  |> List.concat

(** Plans a rollback of the most recently applied versions (highest version
    first): for each, a [Rollback] action followed by a [RemoveMigration]
    action.

    [step = None] rolls back one version; [step = Some n] rolls back up to
    [n]. A non-positive [n] rolls back nothing. Applied versions that have no
    matching entry in [migrations] are skipped, since there is no [down]
    definition to run for them. *)
let plan_rollback ~applied_versions ~step migrations =
  let newest_first =
    List.sort (fun a b -> Int64.compare b a) applied_versions
  in
  let count = match step with None -> 1 | Some n -> n in
  let rec take n lst =
    (* [n <= 0] (not [n = 0]) so a negative step cannot consume the whole
       list. *)
    if n <= 0 then []
    else match lst with [] -> [] | x :: rest -> x :: take (n - 1) rest
  in
  take count newest_first
  |> List.filter_map (fun v ->
         List.find_opt (fun m -> Int64.equal m.version v) migrations)
  |> List.map (fun m ->
         [
           Rollback { sql = generate_down_sql m; version = m.version };
           RemoveMigration m.version;
         ])
  |> List.concat

(** Escapes a string for use inside a single-quoted SQL literal by doubling
    every single quote. *)
let escape_sql_string s = String.concat "''" (String.split_on_char '\'' s)

(** Renders one action as a list of SQL statements. The migration name in
    [RecordMigration] is embedded as a quoted literal, with single quotes
    escaped. *)
let action_to_sql = function
  | CreateSchemaTable sql -> [ sql ]
  | Migrate { sql; _ } | Rollback { sql; _ } -> sql
  | RecordMigration { version; name } ->
      [
        Printf.sprintf
          "INSERT INTO schema_migrations (version, name) VALUES (%Ld, '%s')"
          version (escape_sql_string name);
      ]
  | RemoveMigration version ->
      [
        Printf.sprintf "DELETE FROM schema_migrations WHERE version = %Ld"
          version;
      ]

let actions_to_sql actions = actions |> List.map action_to_sql |> List.concat

(** Human-readable status report: applied versions (in the order given, with
    the migration name when it is known, ["(unknown)"] otherwise) followed by
    pending migrations in ascending version order. *)
let format_status ~applied_versions ~migrations =
  let pending = pending_migrations ~applied_versions migrations in
  let buf = Buffer.create 256 in
  Buffer.add_string buf "Applied migrations:\n";
  List.iter
    (fun v ->
      let label =
        match List.find_opt (fun m -> Int64.equal m.version v) migrations with
        | Some m -> m.name
        | None -> "(unknown)"
      in
      Printf.bprintf buf " [✓] %Ld: %s\n" v label)
    applied_versions;
  Buffer.add_string buf "\nPending migrations:\n";
  List.iter
    (fun m -> Printf.bprintf buf " [ ] %Ld: %s\n" m.version m.name)
    pending;
  Buffer.contents buf