A database layer inspired by Caqti and Ecto.
(** Description of a single table column, as rendered by [column_to_sql]. *)
type column_def = {
  col_name : string;  (** Column identifier, emitted verbatim. *)
  col_type : string;  (** SQL type text, emitted verbatim. *)
  col_nullable : bool;  (** When [false], a NOT NULL constraint is emitted. *)
  col_primary_key : bool;  (** When [true], PRIMARY KEY is emitted. *)
  col_unique : bool;  (** When [true], UNIQUE is emitted. *)
  col_default : string option;
      (** Raw SQL expression placed after DEFAULT; it is not quoted. *)
  col_references : (string * string) option;
      (** Foreign-key target as a [(table, column)] pair. *)
}
10
(** Description of an index over one or more columns of a table. *)
type index_def = {
  idx_name : string option;
      (** Explicit index name; when [None] the SQL renderer derives one from
          the table and column names. *)
  idx_table : string;  (** Table the index is created on. *)
  idx_columns : string list;  (** Indexed columns, in order. *)
  idx_unique : bool;  (** When [true], a UNIQUE index is created. *)
}
17
(** A single schema-changing step inside a migration. *)
type operation =
  | Create_table of { name : string; columns : column_def list }
      (** Create table [name] with the given columns. *)
  | Drop_table of string  (** Drop the named table. *)
  | Alter_table of { name : string; changes : alter_change list }
      (** Apply [changes] to table [name] in one statement. *)
  | Create_index of index_def  (** Create the described index. *)
  | Drop_index of string  (** Drop the named index. *)
  | Execute of string  (** Raw SQL, passed through unchanged. *)

(** One modification inside an [Alter_table] operation. *)
and alter_change =
  | Add_column of column_def  (** Add a new column. *)
  | Drop_column of string  (** Drop the named column. *)
  | Rename_column of { from : string; to_ : string }
      (** Rename column [from] to [to_]. *)
  | Alter_column of {
      name : string;  (** Column being altered. *)
      new_type : string option;  (** New SQL type, if it should change. *)
      new_nullable : bool option;  (** Requested nullability, if any. *)
    }
35
(** A migration: a versioned pair of forward and backward operation lists. *)
type t = {
  version : int64;  (** Identifier used for ordering and bookkeeping. *)
  name : string;  (** Human-readable label. *)
  up : operation list;  (** Operations rendered when migrating forward. *)
  down : operation list;  (** Operations rendered when rolling back. *)
}
42
(** [column name sql_type] builds a {!column_def}.

    Defaults: the column is nullable, is not a primary key, is not unique,
    and has neither a default expression nor a foreign-key reference.

    @param nullable whether NULL values are allowed (default [true])
    @param primary_key whether the column is the primary key (default [false])
    @param unique whether a UNIQUE constraint is added (default [false])
    @param default raw SQL default expression
    @param references foreign-key target as [(table, column)] *)
let column ?(nullable = true) ?(primary_key = false) ?(unique = false) ?default
    ?references name sql_type =
  let col_name = name and col_type = sql_type in
  {
    col_name;
    col_type;
    col_nullable = nullable;
    col_primary_key = primary_key;
    col_unique = unique;
    col_default = default;
    col_references = references;
  }
54
(** [create_table name columns] is the operation creating table [name]. *)
let create_table name columns = Create_table { name; columns }

(** [drop_table table] is the operation dropping [table]. *)
let drop_table table = Drop_table table

(** [add_column def] is the alter-change adding the column [def]. *)
let add_column def = Add_column def

(** [drop_column column_name] is the alter-change dropping that column. *)
let drop_column column_name = Drop_column column_name

(** [rename_column ~from ~to_] is the alter-change renaming [from] to [to_]. *)
let rename_column ~from ~to_ = Rename_column { from; to_ }

(** [alter_table name changes] groups [changes] into a single operation on
    table [name]. *)
let alter_table name changes = Alter_table { name; changes }
61
(** [create_index table columns] is the operation creating an index on
    [columns] of [table].

    @param unique create a UNIQUE index (default [false])
    @param name explicit index name; left as [None] when omitted *)
let create_index ?(unique = false) ?name table columns =
  let definition =
    {
      idx_name = name;
      idx_table = table;
      idx_columns = columns;
      idx_unique = unique;
    }
  in
  Create_index definition
70
(** [drop_index index_name] is the operation dropping the named index. *)
let drop_index index_name = Drop_index index_name

(** [execute statement] wraps raw SQL as an operation. *)
let execute statement = Execute statement
73
(** [timestamps ()] returns the two bookkeeping columns [inserted_at] and
    [updated_at], both non-nullable TIMESTAMPTZ defaulting to NOW(). *)
let timestamps () =
  let timestamp_column name =
    column ~nullable:false ~default:"NOW()" name "TIMESTAMPTZ"
  in
  [ timestamp_column "inserted_at"; timestamp_column "updated_at" ]
79
(** [migration ~version ~name ~up ~down] assembles a migration record from
    its four components. *)
let migration ~version ~name ~up ~down : t = { version; name; up; down }
81
(** [column_to_sql c] renders a column definition as a space-separated SQL
    fragment: name, type, then whichever of NOT NULL, PRIMARY KEY, UNIQUE,
    DEFAULT and REFERENCES apply, in that order. *)
let column_to_sql c =
  (* [keyword_if cond kw] contributes [kw] only when [cond] holds. *)
  let keyword_if cond kw = if cond then Some kw else None in
  let default_clause = Option.map (fun d -> "DEFAULT " ^ d) c.col_default in
  let references_clause =
    Option.map
      (fun (table, col) -> Printf.sprintf "REFERENCES %s(%s)" table col)
      c.col_references
  in
  [
    Some c.col_name;
    Some c.col_type;
    keyword_if (not c.col_nullable) "NOT NULL";
    keyword_if c.col_primary_key "PRIMARY KEY";
    keyword_if c.col_unique "UNIQUE";
    default_clause;
    references_clause;
  ]
  |> List.filter_map Fun.id
  |> String.concat " "
99
(** [operation_to_sql op] renders a single operation as one SQL statement.

    [Alter_table] changes are joined with commas into one statement. An
    [Alter_column] change contributes one clause per requested modification:
    a TYPE clause when [new_type] is set, and a SET NOT NULL / DROP NOT NULL
    clause when [new_nullable] is set. A change requesting nothing
    contributes no clause at all, so no empty entries (and no stray commas)
    appear in the output.

    NOTE(review): the clause syntax matches PostgreSQL, which the rest of
    the file appears to target; confirm before using with other databases.
    An [Alter_table] whose changes yield no clause still renders a bare
    ALTER TABLE header, as before. *)
let operation_to_sql = function
  | Create_table { name; columns } ->
      let col_defs = List.map column_to_sql columns in
      Printf.sprintf "CREATE TABLE %s (\n %s\n)" name
        (String.concat ",\n " col_defs)
  | Drop_table name -> Printf.sprintf "DROP TABLE %s" name
  | Alter_table { name; changes } ->
      (* Each change yields zero or more clauses. *)
      let clauses_of_change = function
        | Add_column c -> [ Printf.sprintf "ADD COLUMN %s" (column_to_sql c) ]
        | Drop_column col -> [ Printf.sprintf "DROP COLUMN %s" col ]
        | Rename_column { from; to_ } ->
            [ Printf.sprintf "RENAME COLUMN %s TO %s" from to_ ]
        | Alter_column { name = col; new_type; new_nullable } ->
            let type_clause =
              match new_type with
              | Some t -> [ Printf.sprintf "ALTER COLUMN %s TYPE %s" col t ]
              | None -> []
            in
            let nullability_clause =
              match new_nullable with
              | Some true ->
                  [ Printf.sprintf "ALTER COLUMN %s DROP NOT NULL" col ]
              | Some false ->
                  [ Printf.sprintf "ALTER COLUMN %s SET NOT NULL" col ]
              | None -> []
            in
            type_clause @ nullability_clause
      in
      let clauses = changes |> List.map clauses_of_change |> List.concat in
      Printf.sprintf "ALTER TABLE %s %s" name (String.concat ", " clauses)
  | Create_index { idx_name; idx_table; idx_columns; idx_unique } ->
      (* Fall back to a generated name when none was supplied. *)
      let name =
        match idx_name with
        | Some n -> n
        | None ->
            Printf.sprintf "%s_%s_index" idx_table
              (String.concat "_" idx_columns)
      in
      let unique_str = if idx_unique then "UNIQUE " else "" in
      Printf.sprintf "CREATE %sINDEX %s ON %s (%s)" unique_str name idx_table
        (String.concat ", " idx_columns)
  | Drop_index name -> Printf.sprintf "DROP INDEX %s" name
  | Execute sql -> sql
134
(** Operation creating the [schema_migrations] bookkeeping table, with a
    BIGINT primary-key [version], a [name], and an [inserted_at] timestamp
    defaulting to NOW(). *)
let schema_migrations_table =
  let bookkeeping_columns =
    [
      column ~nullable:false ~primary_key:true "version" "BIGINT";
      column ~nullable:false "name" "VARCHAR(255)";
      column ~nullable:false ~default:"NOW()" "inserted_at" "TIMESTAMPTZ";
    ]
  in
  Create_table { name = "schema_migrations"; columns = bookkeeping_columns }
146
(** SQL text of the statement that creates the bookkeeping table. *)
let create_schema_migrations_sql = schema_migrations_table |> operation_to_sql
148
(** Parameterised statement recording a migration; takes the version and the
    name as positional parameters. *)
let insert_migration_sql =
  "INSERT INTO schema_migrations (version, name) VALUES ($1, $2)"

(** Parameterised statement removing the record of one version. *)
let delete_migration_sql = "DELETE FROM schema_migrations WHERE version = $1"

(** Query listing recorded versions in ascending order. *)
let get_applied_versions_sql =
  "SELECT version FROM schema_migrations ORDER BY version"

(** Query listing version, name and insertion time of every recorded
    migration, in ascending version order. *)
let get_migration_status_sql =
  "SELECT version, name, inserted_at FROM schema_migrations ORDER BY version"
159
(** Summary of which migrations have run and which are still outstanding. *)
type migration_status = {
  applied_versions : int64 list;  (** Versions already recorded as applied. *)
  pending : t list;  (** Migrations not yet applied. *)
  last_applied : int64 option;
      (** Most recently applied version, if any. *)
}
165
(** [sort_migrations migrations] orders migrations by ascending version. *)
let sort_migrations migrations =
  let by_version a b = Int64.compare a.version b.version in
  List.sort by_version migrations
168
(** [pending_migrations ~applied_versions migrations] keeps the migrations
    whose version is absent from [applied_versions], sorted by ascending
    version. *)
let pending_migrations ~applied_versions migrations =
  let not_yet_applied m = not (List.mem m.version applied_versions) in
  sort_migrations (List.filter not_yet_applied migrations)
174
(** [generate_up_sql m] renders each forward operation of [m] as SQL. *)
let generate_up_sql m = m.up |> List.map operation_to_sql

(** [generate_down_sql m] renders each rollback operation of [m] as SQL. *)
let generate_down_sql m = m.down |> List.map operation_to_sql
177
(** A step of a migration plan, as produced by the planning functions and
    rendered by [action_to_sql]. *)
type migration_action =
  | Migrate of { sql : string list; version : int64; name : string }
      (** Forward statements of one migration. *)
  | Rollback of { sql : string list; version : int64 }
      (** Backward statements of one migration. *)
  | CreateSchemaTable of string
      (** A single SQL statement, carried as text. *)
  | RecordMigration of { version : int64; name : string }
      (** Mark a version as applied in the bookkeeping table. *)
  | RemoveMigration of int64
      (** Remove a version from the bookkeeping table. *)
184
(** [plan_migrate ~applied_versions ~target migrations] lists the actions
    needed to bring the schema forward.

    Pending migrations are taken in ascending version order. When [target]
    is [Some v], only those with a version no greater than [v] are included.
    Each selected migration contributes a [Migrate] action followed by a
    [RecordMigration] action. *)
let plan_migrate ~applied_versions ~target migrations =
  let within_target m =
    match target with
    | None -> true
    | Some limit -> Int64.compare m.version limit <= 0
  in
  let actions_for m =
    [
      Migrate { sql = generate_up_sql m; version = m.version; name = m.name };
      RecordMigration { version = m.version; name = m.name };
    ]
  in
  pending_migrations ~applied_versions migrations
  |> List.filter within_target
  |> List.map actions_for
  |> List.concat
199
(** [plan_rollback ~applied_versions ~step migrations] lists the actions
    needed to undo the most recently applied migrations.

    Applied versions are considered newest first. With [step = None] only
    the newest one is rolled back; with [step = Some n] the newest [n] are.
    A zero or negative [n] yields an empty plan (previously a negative [n]
    never reached the base case and selected every applied version).

    Applied versions that have no matching entry in [migrations] are
    skipped. Each selected migration contributes a [Rollback] action
    followed by a [RemoveMigration] action. *)
let plan_rollback ~applied_versions ~step migrations =
  let newest_first =
    List.sort (fun a b -> Int64.compare b a) applied_versions
  in
  (* First [n] elements; non-positive [n] selects nothing. *)
  let rec take n versions =
    if n <= 0 then []
    else
      match versions with
      | [] -> []
      | v :: rest -> v :: take (n - 1) rest
  in
  let count = match step with None -> 1 | Some n -> n in
  take count newest_first
  |> List.filter_map (fun v ->
         match List.find_opt (fun m -> Int64.equal m.version v) migrations with
         | Some m ->
             Some
               [
                 Rollback { sql = generate_down_sql m; version = m.version };
                 RemoveMigration m.version;
               ]
         | None -> None)
  |> List.flatten
226
(** [action_to_sql action] renders a plan action as a list of SQL
    statements.

    [Migrate] and [Rollback] return their statements unchanged.
    [RecordMigration] and [RemoveMigration] produce one literal (not
    parameterised) statement against [schema_migrations].

    The migration name is embedded in a single-quoted SQL string literal, so
    any single quote it contains is doubled first. Without this, a name
    containing a quote would terminate the literal early and corrupt the
    statement.

    NOTE(review): quote doubling is sufficient only when the database treats
    backslashes in string literals as ordinary characters; confirm for the
    target server configuration. *)
let action_to_sql = function
  | CreateSchemaTable sql -> [ sql ]
  | Migrate { sql; _ } -> sql
  | Rollback { sql; _ } -> sql
  | RecordMigration { version; name } ->
      (* Splitting on the quote and rejoining with two quotes doubles each. *)
      let escaped_name = String.concat "''" (String.split_on_char '\'' name) in
      [
        Printf.sprintf
          "INSERT INTO schema_migrations (version, name) VALUES (%Ld, '%s')"
          version escaped_name;
      ]
  | RemoveMigration version ->
      [
        Printf.sprintf "DELETE FROM schema_migrations WHERE version = %Ld"
          version;
      ]
242
(** [actions_to_sql actions] renders every action and concatenates the
    resulting statements in order. *)
let actions_to_sql actions = List.concat (List.map action_to_sql actions)
244
(** [format_status ~applied_versions ~migrations] builds a two-section text
    report.

    The first section lists [applied_versions] in the order given, each with
    the name of the matching migration or [(unknown)] when none matches. The
    second section lists pending migrations in ascending version order. *)
let format_status ~applied_versions ~migrations =
  let outstanding = pending_migrations ~applied_versions migrations in
  let out = Buffer.create 256 in
  let name_of version =
    List.find_opt (fun m -> Int64.equal m.version version) migrations
    |> Option.map (fun m -> m.name)
  in
  Buffer.add_string out "Applied migrations:\n";
  List.iter
    (fun version ->
      match name_of version with
      | Some name -> Printf.bprintf out " [✓] %Ld: %s\n" version name
      | None -> Printf.bprintf out " [✓] %Ld: (unknown)\n" version)
    applied_versions;
  Buffer.add_string out "\nPending migrations:\n";
  List.iter
    (fun m -> Printf.bprintf out " [ ] %Ld: %s\n" m.version m.name)
    outstanding;
  Buffer.contents out
260 Buffer.contents buf