prefect server in zig
1const std = @import("std");
2const backend = @import("../backend.zig");
3const log = @import("../../logging.zig");
4
5/// Initialize PostgreSQL schema
6pub fn init() !void {
7 // flow table
8 // NOTE: Using TEXT for timestamp columns to match SQLite and simplify reads
9 try backend.db.exec(
10 \\CREATE TABLE IF NOT EXISTS flow (
11 \\ id TEXT PRIMARY KEY,
12 \\ created TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'),
13 \\ updated TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'),
14 \\ name TEXT NOT NULL UNIQUE,
15 \\ tags JSONB DEFAULT '[]'
16 \\)
17 , .{});
18
19 // flow_run table
20 // NOTE: Using TEXT for timestamp columns to match SQLite and simplify reads
21 // (pg.zig returns native TIMESTAMP as i64 microseconds, not text)
22 try backend.db.exec(
23 \\CREATE TABLE IF NOT EXISTS flow_run (
24 \\ id TEXT PRIMARY KEY,
25 \\ created TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'),
26 \\ updated TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'),
27 \\ flow_id TEXT REFERENCES flow(id),
28 \\ name TEXT NOT NULL,
29 \\ parameters JSONB DEFAULT '{}',
30 \\ tags JSONB DEFAULT '[]',
31 \\ state_id TEXT,
32 \\ state_type TEXT,
33 \\ state_name TEXT,
34 \\ state_timestamp TEXT,
35 \\ run_count BIGINT DEFAULT 0,
36 \\ expected_start_time TEXT,
37 \\ next_scheduled_start_time TEXT,
38 \\ start_time TEXT,
39 \\ end_time TEXT,
40 \\ total_run_time DOUBLE PRECISION DEFAULT 0.0,
41 \\ deployment_id TEXT,
42 \\ deployment_version TEXT,
43 \\ work_queue_name TEXT,
44 \\ work_queue_id TEXT,
45 \\ auto_scheduled INTEGER DEFAULT 0,
46 \\ idempotency_key TEXT,
47 \\ empirical_policy TEXT DEFAULT '{}',
48 \\ state_transition_id TEXT
49 \\)
50 , .{});
51
52 // flow_run_state table
53 try backend.db.exec(
54 \\CREATE TABLE IF NOT EXISTS flow_run_state (
55 \\ id TEXT PRIMARY KEY,
56 \\ created TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'),
57 \\ flow_run_id TEXT REFERENCES flow_run(id) ON DELETE CASCADE,
58 \\ type TEXT NOT NULL,
59 \\ name TEXT NOT NULL,
60 \\ timestamp TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"')
61 \\)
62 , .{});
63
64 // task_run table
65 try backend.db.exec(
66 \\CREATE TABLE IF NOT EXISTS task_run (
67 \\ id TEXT PRIMARY KEY,
68 \\ created TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'),
69 \\ updated TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'),
70 \\ flow_run_id TEXT REFERENCES flow_run(id),
71 \\ name TEXT NOT NULL,
72 \\ task_key TEXT NOT NULL,
73 \\ dynamic_key TEXT NOT NULL,
74 \\ cache_key TEXT,
75 \\ tags JSONB DEFAULT '[]',
76 \\ state_id TEXT,
77 \\ state_type TEXT,
78 \\ state_name TEXT,
79 \\ state_timestamp TEXT,
80 \\ run_count BIGINT DEFAULT 0,
81 \\ expected_start_time TEXT,
82 \\ start_time TEXT,
83 \\ end_time TEXT,
84 \\ total_run_time DOUBLE PRECISION DEFAULT 0.0
85 \\)
86 , .{});
87
88 // task_run_state table
89 try backend.db.exec(
90 \\CREATE TABLE IF NOT EXISTS task_run_state (
91 \\ id TEXT PRIMARY KEY,
92 \\ created TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'),
93 \\ task_run_id TEXT REFERENCES task_run(id) ON DELETE CASCADE,
94 \\ type TEXT NOT NULL,
95 \\ name TEXT NOT NULL,
96 \\ timestamp TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"')
97 \\)
98 , .{});
99
100 // events table
101 try backend.db.exec(
102 \\CREATE TABLE IF NOT EXISTS events (
103 \\ id TEXT PRIMARY KEY,
104 \\ created TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'),
105 \\ updated TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'),
106 \\ occurred TEXT NOT NULL,
107 \\ event TEXT NOT NULL,
108 \\ resource_id TEXT NOT NULL,
109 \\ resource JSONB NOT NULL DEFAULT '{}',
110 \\ related_resource_ids JSONB DEFAULT '[]',
111 \\ related JSONB DEFAULT '[]',
112 \\ payload JSONB DEFAULT '{}',
113 \\ received TEXT NOT NULL,
114 \\ recorded TEXT NOT NULL,
115 \\ follows TEXT
116 \\)
117 , .{});
118
119 // block_type table
120 // NOTE: Using INTEGER for is_protected to match SQLite
121 try backend.db.exec(
122 \\CREATE TABLE IF NOT EXISTS block_type (
123 \\ id TEXT PRIMARY KEY,
124 \\ created TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'),
125 \\ updated TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'),
126 \\ name TEXT NOT NULL,
127 \\ slug TEXT NOT NULL UNIQUE,
128 \\ logo_url TEXT,
129 \\ documentation_url TEXT,
130 \\ description TEXT,
131 \\ code_example TEXT,
132 \\ is_protected INTEGER DEFAULT 0
133 \\)
134 , .{});
135
136 // block_schema table
137 try backend.db.exec(
138 \\CREATE TABLE IF NOT EXISTS block_schema (
139 \\ id TEXT PRIMARY KEY,
140 \\ created TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'),
141 \\ updated TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'),
142 \\ checksum TEXT NOT NULL,
143 \\ fields JSONB NOT NULL DEFAULT '{}',
144 \\ capabilities JSONB NOT NULL DEFAULT '[]',
145 \\ version TEXT NOT NULL DEFAULT '1',
146 \\ block_type_id TEXT NOT NULL REFERENCES block_type(id) ON DELETE CASCADE,
147 \\ UNIQUE(checksum, version)
148 \\)
149 , .{});
150
151 // block_document table
152 // NOTE: Using INTEGER for is_anonymous to match SQLite
153 try backend.db.exec(
154 \\CREATE TABLE IF NOT EXISTS block_document (
155 \\ id TEXT PRIMARY KEY,
156 \\ created TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'),
157 \\ updated TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'),
158 \\ name TEXT,
159 \\ data JSONB NOT NULL DEFAULT '{}',
160 \\ is_anonymous INTEGER DEFAULT 0,
161 \\ block_type_id TEXT NOT NULL REFERENCES block_type(id),
162 \\ block_type_name TEXT,
163 \\ block_schema_id TEXT NOT NULL REFERENCES block_schema(id),
164 \\ UNIQUE(block_type_id, name)
165 \\)
166 , .{});
167
168 // variable table
169 try backend.db.exec(
170 \\CREATE TABLE IF NOT EXISTS variable (
171 \\ id TEXT PRIMARY KEY,
172 \\ created TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'),
173 \\ updated TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'),
174 \\ name TEXT NOT NULL UNIQUE,
175 \\ value JSONB DEFAULT 'null',
176 \\ tags JSONB DEFAULT '[]'
177 \\)
178 , .{});
179
180 // work_pool table
181 // NOTE: Using INTEGER for is_paused to match SQLite
182 try backend.db.exec(
183 \\CREATE TABLE IF NOT EXISTS work_pool (
184 \\ id TEXT PRIMARY KEY,
185 \\ created TEXT NOT NULL,
186 \\ updated TEXT NOT NULL,
187 \\ name TEXT NOT NULL UNIQUE,
188 \\ description TEXT,
189 \\ type TEXT NOT NULL DEFAULT 'process',
190 \\ base_job_template JSONB DEFAULT '{}',
191 \\ is_paused INTEGER DEFAULT 0,
192 \\ concurrency_limit BIGINT,
193 \\ default_queue_id TEXT,
194 \\ status TEXT DEFAULT 'NOT_READY'
195 \\)
196 , .{});
197
198 // work_queue table
199 // NOTE: Using INTEGER for is_paused to match SQLite
200 try backend.db.exec(
201 \\CREATE TABLE IF NOT EXISTS work_queue (
202 \\ id TEXT PRIMARY KEY,
203 \\ created TEXT NOT NULL,
204 \\ updated TEXT NOT NULL,
205 \\ name TEXT NOT NULL,
206 \\ description TEXT DEFAULT '',
207 \\ is_paused INTEGER DEFAULT 0,
208 \\ concurrency_limit BIGINT,
209 \\ priority BIGINT DEFAULT 1,
210 \\ work_pool_id TEXT NOT NULL REFERENCES work_pool(id) ON DELETE CASCADE,
211 \\ last_polled TEXT,
212 \\ status TEXT DEFAULT 'NOT_READY',
213 \\ UNIQUE(work_pool_id, name)
214 \\)
215 , .{});
216
217 // worker table
218 try backend.db.exec(
219 \\CREATE TABLE IF NOT EXISTS worker (
220 \\ id TEXT PRIMARY KEY,
221 \\ created TEXT NOT NULL,
222 \\ updated TEXT NOT NULL,
223 \\ name TEXT NOT NULL,
224 \\ work_pool_id TEXT NOT NULL REFERENCES work_pool(id) ON DELETE CASCADE,
225 \\ last_heartbeat_time TEXT,
226 \\ heartbeat_interval_seconds BIGINT,
227 \\ status TEXT DEFAULT 'OFFLINE',
228 \\ UNIQUE(work_pool_id, name)
229 \\)
230 , .{});
231
232 // deployment table
233 try backend.db.exec(
234 \\CREATE TABLE IF NOT EXISTS deployment (
235 \\ id TEXT PRIMARY KEY,
236 \\ created TEXT NOT NULL,
237 \\ updated TEXT NOT NULL,
238 \\ name TEXT NOT NULL,
239 \\ flow_id TEXT NOT NULL REFERENCES flow(id) ON DELETE CASCADE,
240 \\ version TEXT,
241 \\ description TEXT,
242 \\ paused INTEGER DEFAULT 0,
243 \\ status TEXT DEFAULT 'NOT_READY',
244 \\ last_polled TEXT,
245 \\ parameters JSONB DEFAULT '{}',
246 \\ parameter_openapi_schema TEXT,
247 \\ enforce_parameter_schema INTEGER DEFAULT 1,
248 \\ tags JSONB DEFAULT '[]',
249 \\ labels JSONB DEFAULT '{}',
250 \\ path TEXT,
251 \\ entrypoint TEXT,
252 \\ job_variables JSONB DEFAULT '{}',
253 \\ pull_steps TEXT,
254 \\ work_pool_name TEXT,
255 \\ work_queue_name TEXT,
256 \\ work_queue_id TEXT REFERENCES work_queue(id) ON DELETE SET NULL,
257 \\ storage_document_id TEXT,
258 \\ infrastructure_document_id TEXT,
259 \\ concurrency_limit BIGINT,
260 \\ UNIQUE(flow_id, name)
261 \\)
262 , .{});
263
264 // deployment_schedule table
265 try backend.db.exec(
266 \\CREATE TABLE IF NOT EXISTS deployment_schedule (
267 \\ id TEXT PRIMARY KEY,
268 \\ created TEXT NOT NULL,
269 \\ updated TEXT NOT NULL,
270 \\ deployment_id TEXT NOT NULL REFERENCES deployment(id) ON DELETE CASCADE,
271 \\ schedule TEXT NOT NULL,
272 \\ active INTEGER DEFAULT 1,
273 \\ max_scheduled_runs BIGINT,
274 \\ parameters JSONB DEFAULT '{}',
275 \\ slug TEXT,
276 \\ UNIQUE(deployment_id, slug)
277 \\)
278 , .{});
279
280 // concurrency_limit table (v2 only - no v1 support)
281 try backend.db.exec(
282 \\CREATE TABLE IF NOT EXISTS concurrency_limit (
283 \\ id TEXT PRIMARY KEY,
284 \\ created TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
285 \\ updated TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
286 \\ name TEXT NOT NULL UNIQUE,
287 \\ "limit" INTEGER NOT NULL,
288 \\ active BOOLEAN DEFAULT TRUE,
289 \\ active_slots INTEGER DEFAULT 0,
290 \\ denied_slots INTEGER DEFAULT 0,
291 \\ slot_decay_per_second DOUBLE PRECISION DEFAULT 0.0,
292 \\ avg_slot_occupancy_seconds DOUBLE PRECISION DEFAULT 2.0
293 \\)
294 , .{});
295
296 // indexes
297 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_flow_run__state_type ON flow_run(state_type)", .{});
298 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_flow_run__flow_id ON flow_run(flow_id)", .{});
299 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_flow_run__next_scheduled_start_time ON flow_run(next_scheduled_start_time)", .{});
300 try backend.db.exec("CREATE UNIQUE INDEX IF NOT EXISTS uq_flow_run__flow_id_idempotency_key ON flow_run(flow_id, idempotency_key) WHERE idempotency_key IS NOT NULL", .{});
301 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_flow_run_state__flow_run_id ON flow_run_state(flow_run_id)", .{});
302 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_task_run_state__task_run_id ON task_run_state(task_run_id)", .{});
303 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_task_run__flow_run_id ON task_run(flow_run_id)", .{});
304 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_task_run__task_key_dynamic_key ON task_run(task_key, dynamic_key)", .{});
305 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_events__occurred ON events(occurred)", .{});
306 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_events__event__id ON events(event, id)", .{});
307 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_events__event_resource_id_occurred ON events(event, resource_id, occurred)", .{});
308 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_events__occurred_id ON events(occurred, id)", .{});
309 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_block_schema__block_type_id ON block_schema(block_type_id)", .{});
310 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_block_schema__checksum ON block_schema(checksum)", .{});
311 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_block_document__block_type_id ON block_document(block_type_id)", .{});
312 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_block_document__block_schema_id ON block_document(block_schema_id)", .{});
313 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_block_document__name ON block_document(name)", .{});
314 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_variable__name ON variable(name)", .{});
315 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_work_pool__name ON work_pool(name)", .{});
316 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_work_pool__type ON work_pool(type)", .{});
317 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_work_queue__work_pool_id ON work_queue(work_pool_id)", .{});
318 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_work_queue__priority ON work_queue(priority)", .{});
319 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_worker__work_pool_id ON worker(work_pool_id)", .{});
320 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_flow_run__deployment_id ON flow_run(deployment_id)", .{});
321 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_deployment__flow_id ON deployment(flow_id)", .{});
322 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_deployment__work_queue_id ON deployment(work_queue_id)", .{});
323 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_deployment__work_pool_name ON deployment(work_pool_name)", .{});
324 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_deployment__created ON deployment(created)", .{});
325 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_deployment__updated ON deployment(updated)", .{});
326 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_deployment_schedule__deployment_id ON deployment_schedule(deployment_id)", .{});
327 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_deployment_schedule__active ON deployment_schedule(active)", .{});
328 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_concurrency_limit__name ON concurrency_limit(name)", .{});
329
330 log.info("database", "postgres schema initialized", .{});
331}