// prefect server in zig
// (code-viewer header: at main, 331 lines, 16 kB, view raw)
const std = @import("std");
const backend = @import("../backend.zig");
const log = @import("../../logging.zig");

/// Initialize PostgreSQL schema.
///
/// Runs every statement in `table_statements`, then every statement in
/// `index_statements`, in declaration order, through `backend.db.exec` with
/// no bind arguments. The first failing statement aborts initialization and
/// its error is returned to the caller; statements after it are not run.
/// On success a single info line is logged.
pub fn init() !void {
    // Tables first: the index statements below refer to these tables.
    inline for (table_statements) |create_table_sql| {
        try backend.db.exec(create_table_sql, .{});
    }

    inline for (index_statements) |create_index_sql| {
        try backend.db.exec(create_index_sql, .{});
    }

    log.info("database", "postgres schema initialized", .{});
}

// Table DDL, in execution order. Order matters: a table named in a
// REFERENCES clause is created before the table that references it.
// Every statement uses CREATE TABLE IF NOT EXISTS.
const table_statements = .{
    // flow table
    // NOTE: Using TEXT for timestamp columns to match SQLite and simplify reads
    \\CREATE TABLE IF NOT EXISTS flow (
    \\    id TEXT PRIMARY KEY,
    \\    created TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'),
    \\    updated TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'),
    \\    name TEXT NOT NULL UNIQUE,
    \\    tags JSONB DEFAULT '[]'
    \\)
    ,

    // flow_run table
    // NOTE: Using TEXT for timestamp columns to match SQLite and simplify reads
    // (pg.zig returns native TIMESTAMP as i64 microseconds, not text)
    \\CREATE TABLE IF NOT EXISTS flow_run (
    \\    id TEXT PRIMARY KEY,
    \\    created TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'),
    \\    updated TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'),
    \\    flow_id TEXT REFERENCES flow(id),
    \\    name TEXT NOT NULL,
    \\    parameters JSONB DEFAULT '{}',
    \\    tags JSONB DEFAULT '[]',
    \\    state_id TEXT,
    \\    state_type TEXT,
    \\    state_name TEXT,
    \\    state_timestamp TEXT,
    \\    run_count BIGINT DEFAULT 0,
    \\    expected_start_time TEXT,
    \\    next_scheduled_start_time TEXT,
    \\    start_time TEXT,
    \\    end_time TEXT,
    \\    total_run_time DOUBLE PRECISION DEFAULT 0.0,
    \\    deployment_id TEXT,
    \\    deployment_version TEXT,
    \\    work_queue_name TEXT,
    \\    work_queue_id TEXT,
    \\    auto_scheduled INTEGER DEFAULT 0,
    \\    idempotency_key TEXT,
    \\    empirical_policy TEXT DEFAULT '{}',
    \\    state_transition_id TEXT
    \\)
    ,

    // flow_run_state table
    \\CREATE TABLE IF NOT EXISTS flow_run_state (
    \\    id TEXT PRIMARY KEY,
    \\    created TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'),
    \\    flow_run_id TEXT REFERENCES flow_run(id) ON DELETE CASCADE,
    \\    type TEXT NOT NULL,
    \\    name TEXT NOT NULL,
    \\    timestamp TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"')
    \\)
    ,

    // task_run table
    \\CREATE TABLE IF NOT EXISTS task_run (
    \\    id TEXT PRIMARY KEY,
    \\    created TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'),
    \\    updated TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'),
    \\    flow_run_id TEXT REFERENCES flow_run(id),
    \\    name TEXT NOT NULL,
    \\    task_key TEXT NOT NULL,
    \\    dynamic_key TEXT NOT NULL,
    \\    cache_key TEXT,
    \\    tags JSONB DEFAULT '[]',
    \\    state_id TEXT,
    \\    state_type TEXT,
    \\    state_name TEXT,
    \\    state_timestamp TEXT,
    \\    run_count BIGINT DEFAULT 0,
    \\    expected_start_time TEXT,
    \\    start_time TEXT,
    \\    end_time TEXT,
    \\    total_run_time DOUBLE PRECISION DEFAULT 0.0
    \\)
    ,

    // task_run_state table
    \\CREATE TABLE IF NOT EXISTS task_run_state (
    \\    id TEXT PRIMARY KEY,
    \\    created TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'),
    \\    task_run_id TEXT REFERENCES task_run(id) ON DELETE CASCADE,
    \\    type TEXT NOT NULL,
    \\    name TEXT NOT NULL,
    \\    timestamp TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"')
    \\)
    ,

    // events table
    \\CREATE TABLE IF NOT EXISTS events (
    \\    id TEXT PRIMARY KEY,
    \\    created TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'),
    \\    updated TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'),
    \\    occurred TEXT NOT NULL,
    \\    event TEXT NOT NULL,
    \\    resource_id TEXT NOT NULL,
    \\    resource JSONB NOT NULL DEFAULT '{}',
    \\    related_resource_ids JSONB DEFAULT '[]',
    \\    related JSONB DEFAULT '[]',
    \\    payload JSONB DEFAULT '{}',
    \\    received TEXT NOT NULL,
    \\    recorded TEXT NOT NULL,
    \\    follows TEXT
    \\)
    ,

    // block_type table
    // NOTE: Using INTEGER for is_protected to match SQLite
    \\CREATE TABLE IF NOT EXISTS block_type (
    \\    id TEXT PRIMARY KEY,
    \\    created TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'),
    \\    updated TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'),
    \\    name TEXT NOT NULL,
    \\    slug TEXT NOT NULL UNIQUE,
    \\    logo_url TEXT,
    \\    documentation_url TEXT,
    \\    description TEXT,
    \\    code_example TEXT,
    \\    is_protected INTEGER DEFAULT 0
    \\)
    ,

    // block_schema table
    \\CREATE TABLE IF NOT EXISTS block_schema (
    \\    id TEXT PRIMARY KEY,
    \\    created TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'),
    \\    updated TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'),
    \\    checksum TEXT NOT NULL,
    \\    fields JSONB NOT NULL DEFAULT '{}',
    \\    capabilities JSONB NOT NULL DEFAULT '[]',
    \\    version TEXT NOT NULL DEFAULT '1',
    \\    block_type_id TEXT NOT NULL REFERENCES block_type(id) ON DELETE CASCADE,
    \\    UNIQUE(checksum, version)
    \\)
    ,

    // block_document table
    // NOTE: Using INTEGER for is_anonymous to match SQLite
    \\CREATE TABLE IF NOT EXISTS block_document (
    \\    id TEXT PRIMARY KEY,
    \\    created TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'),
    \\    updated TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'),
    \\    name TEXT,
    \\    data JSONB NOT NULL DEFAULT '{}',
    \\    is_anonymous INTEGER DEFAULT 0,
    \\    block_type_id TEXT NOT NULL REFERENCES block_type(id),
    \\    block_type_name TEXT,
    \\    block_schema_id TEXT NOT NULL REFERENCES block_schema(id),
    \\    UNIQUE(block_type_id, name)
    \\)
    ,

    // variable table
    \\CREATE TABLE IF NOT EXISTS variable (
    \\    id TEXT PRIMARY KEY,
    \\    created TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'),
    \\    updated TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'),
    \\    name TEXT NOT NULL UNIQUE,
    \\    value JSONB DEFAULT 'null',
    \\    tags JSONB DEFAULT '[]'
    \\)
    ,

    // work_pool table
    // NOTE: Using INTEGER for is_paused to match SQLite
    \\CREATE TABLE IF NOT EXISTS work_pool (
    \\    id TEXT PRIMARY KEY,
    \\    created TEXT NOT NULL,
    \\    updated TEXT NOT NULL,
    \\    name TEXT NOT NULL UNIQUE,
    \\    description TEXT,
    \\    type TEXT NOT NULL DEFAULT 'process',
    \\    base_job_template JSONB DEFAULT '{}',
    \\    is_paused INTEGER DEFAULT 0,
    \\    concurrency_limit BIGINT,
    \\    default_queue_id TEXT,
    \\    status TEXT DEFAULT 'NOT_READY'
    \\)
    ,

    // work_queue table
    // NOTE: Using INTEGER for is_paused to match SQLite
    \\CREATE TABLE IF NOT EXISTS work_queue (
    \\    id TEXT PRIMARY KEY,
    \\    created TEXT NOT NULL,
    \\    updated TEXT NOT NULL,
    \\    name TEXT NOT NULL,
    \\    description TEXT DEFAULT '',
    \\    is_paused INTEGER DEFAULT 0,
    \\    concurrency_limit BIGINT,
    \\    priority BIGINT DEFAULT 1,
    \\    work_pool_id TEXT NOT NULL REFERENCES work_pool(id) ON DELETE CASCADE,
    \\    last_polled TEXT,
    \\    status TEXT DEFAULT 'NOT_READY',
    \\    UNIQUE(work_pool_id, name)
    \\)
    ,

    // worker table
    \\CREATE TABLE IF NOT EXISTS worker (
    \\    id TEXT PRIMARY KEY,
    \\    created TEXT NOT NULL,
    \\    updated TEXT NOT NULL,
    \\    name TEXT NOT NULL,
    \\    work_pool_id TEXT NOT NULL REFERENCES work_pool(id) ON DELETE CASCADE,
    \\    last_heartbeat_time TEXT,
    \\    heartbeat_interval_seconds BIGINT,
    \\    status TEXT DEFAULT 'OFFLINE',
    \\    UNIQUE(work_pool_id, name)
    \\)
    ,

    // deployment table
    \\CREATE TABLE IF NOT EXISTS deployment (
    \\    id TEXT PRIMARY KEY,
    \\    created TEXT NOT NULL,
    \\    updated TEXT NOT NULL,
    \\    name TEXT NOT NULL,
    \\    flow_id TEXT NOT NULL REFERENCES flow(id) ON DELETE CASCADE,
    \\    version TEXT,
    \\    description TEXT,
    \\    paused INTEGER DEFAULT 0,
    \\    status TEXT DEFAULT 'NOT_READY',
    \\    last_polled TEXT,
    \\    parameters JSONB DEFAULT '{}',
    \\    parameter_openapi_schema TEXT,
    \\    enforce_parameter_schema INTEGER DEFAULT 1,
    \\    tags JSONB DEFAULT '[]',
    \\    labels JSONB DEFAULT '{}',
    \\    path TEXT,
    \\    entrypoint TEXT,
    \\    job_variables JSONB DEFAULT '{}',
    \\    pull_steps TEXT,
    \\    work_pool_name TEXT,
    \\    work_queue_name TEXT,
    \\    work_queue_id TEXT REFERENCES work_queue(id) ON DELETE SET NULL,
    \\    storage_document_id TEXT,
    \\    infrastructure_document_id TEXT,
    \\    concurrency_limit BIGINT,
    \\    UNIQUE(flow_id, name)
    \\)
    ,

    // deployment_schedule table
    \\CREATE TABLE IF NOT EXISTS deployment_schedule (
    \\    id TEXT PRIMARY KEY,
    \\    created TEXT NOT NULL,
    \\    updated TEXT NOT NULL,
    \\    deployment_id TEXT NOT NULL REFERENCES deployment(id) ON DELETE CASCADE,
    \\    schedule TEXT NOT NULL,
    \\    active INTEGER DEFAULT 1,
    \\    max_scheduled_runs BIGINT,
    \\    parameters JSONB DEFAULT '{}',
    \\    slug TEXT,
    \\    UNIQUE(deployment_id, slug)
    \\)
    ,

    // concurrency_limit table (v2 only - no v1 support)
    // NOTE(review): unlike the tables above, created/updated here are
    // TIMESTAMP WITH TIME ZONE (not TEXT) and active is BOOLEAN (not INTEGER).
    // Confirm the code reading this table expects native types.
    \\CREATE TABLE IF NOT EXISTS concurrency_limit (
    \\    id TEXT PRIMARY KEY,
    \\    created TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    \\    updated TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    \\    name TEXT NOT NULL UNIQUE,
    \\    "limit" INTEGER NOT NULL,
    \\    active BOOLEAN DEFAULT TRUE,
    \\    active_slots INTEGER DEFAULT 0,
    \\    denied_slots INTEGER DEFAULT 0,
    \\    slot_decay_per_second DOUBLE PRECISION DEFAULT 0.0,
    \\    avg_slot_occupancy_seconds DOUBLE PRECISION DEFAULT 2.0
    \\)
    ,
};

// Index DDL, in execution order. Every statement uses IF NOT EXISTS.
const index_statements = .{
    "CREATE INDEX IF NOT EXISTS ix_flow_run__state_type ON flow_run(state_type)",
    "CREATE INDEX IF NOT EXISTS ix_flow_run__flow_id ON flow_run(flow_id)",
    "CREATE INDEX IF NOT EXISTS ix_flow_run__next_scheduled_start_time ON flow_run(next_scheduled_start_time)",
    // Partial unique index: only rows with a non-NULL idempotency_key are constrained.
    "CREATE UNIQUE INDEX IF NOT EXISTS uq_flow_run__flow_id_idempotency_key ON flow_run(flow_id, idempotency_key) WHERE idempotency_key IS NOT NULL",
    "CREATE INDEX IF NOT EXISTS ix_flow_run_state__flow_run_id ON flow_run_state(flow_run_id)",
    "CREATE INDEX IF NOT EXISTS ix_task_run_state__task_run_id ON task_run_state(task_run_id)",
    "CREATE INDEX IF NOT EXISTS ix_task_run__flow_run_id ON task_run(flow_run_id)",
    "CREATE INDEX IF NOT EXISTS ix_task_run__task_key_dynamic_key ON task_run(task_key, dynamic_key)",
    "CREATE INDEX IF NOT EXISTS ix_events__occurred ON events(occurred)",
    "CREATE INDEX IF NOT EXISTS ix_events__event__id ON events(event, id)",
    "CREATE INDEX IF NOT EXISTS ix_events__event_resource_id_occurred ON events(event, resource_id, occurred)",
    "CREATE INDEX IF NOT EXISTS ix_events__occurred_id ON events(occurred, id)",
    "CREATE INDEX IF NOT EXISTS ix_block_schema__block_type_id ON block_schema(block_type_id)",
    "CREATE INDEX IF NOT EXISTS ix_block_schema__checksum ON block_schema(checksum)",
    "CREATE INDEX IF NOT EXISTS ix_block_document__block_type_id ON block_document(block_type_id)",
    "CREATE INDEX IF NOT EXISTS ix_block_document__block_schema_id ON block_document(block_schema_id)",
    "CREATE INDEX IF NOT EXISTS ix_block_document__name ON block_document(name)",
    "CREATE INDEX IF NOT EXISTS ix_variable__name ON variable(name)",
    "CREATE INDEX IF NOT EXISTS ix_work_pool__name ON work_pool(name)",
    "CREATE INDEX IF NOT EXISTS ix_work_pool__type ON work_pool(type)",
    "CREATE INDEX IF NOT EXISTS ix_work_queue__work_pool_id ON work_queue(work_pool_id)",
    "CREATE INDEX IF NOT EXISTS ix_work_queue__priority ON work_queue(priority)",
    "CREATE INDEX IF NOT EXISTS ix_worker__work_pool_id ON worker(work_pool_id)",
    "CREATE INDEX IF NOT EXISTS ix_flow_run__deployment_id ON flow_run(deployment_id)",
    "CREATE INDEX IF NOT EXISTS ix_deployment__flow_id ON deployment(flow_id)",
    "CREATE INDEX IF NOT EXISTS ix_deployment__work_queue_id ON deployment(work_queue_id)",
    "CREATE INDEX IF NOT EXISTS ix_deployment__work_pool_name ON deployment(work_pool_name)",
    "CREATE INDEX IF NOT EXISTS ix_deployment__created ON deployment(created)",
    "CREATE INDEX IF NOT EXISTS ix_deployment__updated ON deployment(updated)",
    "CREATE INDEX IF NOT EXISTS ix_deployment_schedule__deployment_id ON deployment_schedule(deployment_id)",
    "CREATE INDEX IF NOT EXISTS ix_deployment_schedule__active ON deployment_schedule(active)",
    "CREATE INDEX IF NOT EXISTS ix_concurrency_limit__name ON concurrency_limit(name)",
};