const std = @import("std"); const backend = @import("../backend.zig"); const log = @import("../../logging.zig"); /// Initialize PostgreSQL schema pub fn init() !void { // flow table // NOTE: Using TEXT for timestamp columns to match SQLite and simplify reads try backend.db.exec( \\CREATE TABLE IF NOT EXISTS flow ( \\ id TEXT PRIMARY KEY, \\ created TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'), \\ updated TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'), \\ name TEXT NOT NULL UNIQUE, \\ tags JSONB DEFAULT '[]' \\) , .{}); // flow_run table // NOTE: Using TEXT for timestamp columns to match SQLite and simplify reads // (pg.zig returns native TIMESTAMP as i64 microseconds, not text) try backend.db.exec( \\CREATE TABLE IF NOT EXISTS flow_run ( \\ id TEXT PRIMARY KEY, \\ created TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'), \\ updated TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'), \\ flow_id TEXT REFERENCES flow(id), \\ name TEXT NOT NULL, \\ parameters JSONB DEFAULT '{}', \\ tags JSONB DEFAULT '[]', \\ state_id TEXT, \\ state_type TEXT, \\ state_name TEXT, \\ state_timestamp TEXT, \\ run_count BIGINT DEFAULT 0, \\ expected_start_time TEXT, \\ next_scheduled_start_time TEXT, \\ start_time TEXT, \\ end_time TEXT, \\ total_run_time DOUBLE PRECISION DEFAULT 0.0, \\ deployment_id TEXT, \\ deployment_version TEXT, \\ work_queue_name TEXT, \\ work_queue_id TEXT, \\ auto_scheduled INTEGER DEFAULT 0, \\ idempotency_key TEXT, \\ empirical_policy TEXT DEFAULT '{}', \\ state_transition_id TEXT \\) , .{}); // flow_run_state table try backend.db.exec( \\CREATE TABLE IF NOT EXISTS flow_run_state ( \\ id TEXT PRIMARY KEY, \\ created TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'), \\ flow_run_id TEXT REFERENCES flow_run(id) ON DELETE CASCADE, \\ type TEXT NOT NULL, \\ name TEXT NOT NULL, \\ timestamp TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"') \\) , .{}); // task_run table try backend.db.exec( \\CREATE TABLE IF NOT EXISTS task_run ( \\ id TEXT PRIMARY KEY, \\ created TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'), \\ updated TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'), \\ flow_run_id TEXT REFERENCES flow_run(id), \\ name TEXT NOT NULL, \\ task_key TEXT NOT NULL, \\ dynamic_key TEXT NOT NULL, \\ cache_key TEXT, \\ tags JSONB DEFAULT '[]', \\ state_id TEXT, \\ state_type TEXT, \\ state_name TEXT, \\ state_timestamp TEXT, \\ run_count BIGINT DEFAULT 0, \\ expected_start_time TEXT, \\ start_time TEXT, \\ end_time TEXT, \\ total_run_time DOUBLE PRECISION DEFAULT 0.0 \\) , .{}); // task_run_state table try backend.db.exec( \\CREATE TABLE IF NOT EXISTS task_run_state ( \\ id TEXT PRIMARY KEY, \\ created TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'), \\ task_run_id TEXT REFERENCES task_run(id) ON DELETE CASCADE, \\ type TEXT NOT NULL, \\ name TEXT NOT NULL, \\ timestamp TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"') \\) , .{}); // events table try backend.db.exec( \\CREATE TABLE IF NOT EXISTS events ( \\ id TEXT PRIMARY KEY, \\ created TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'), \\ updated TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'), \\ occurred TEXT NOT NULL, \\ event TEXT NOT NULL, \\ resource_id TEXT NOT NULL, \\ resource JSONB NOT NULL DEFAULT '{}', \\ related_resource_ids JSONB DEFAULT '[]', \\ related JSONB DEFAULT '[]', \\ payload JSONB DEFAULT '{}', \\ received TEXT NOT NULL, \\ recorded TEXT NOT NULL, \\ follows TEXT \\) , .{}); // block_type table // NOTE: Using INTEGER for is_protected to match SQLite try backend.db.exec( \\CREATE TABLE IF NOT EXISTS block_type ( \\ id TEXT PRIMARY KEY, \\ created TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'), \\ updated TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'), \\ name TEXT NOT NULL, \\ slug TEXT NOT NULL UNIQUE, \\ logo_url TEXT, \\ documentation_url TEXT, \\ description TEXT, \\ code_example TEXT, \\ is_protected INTEGER DEFAULT 0 \\) , .{}); // block_schema table try backend.db.exec( \\CREATE TABLE IF NOT EXISTS block_schema ( \\ id TEXT PRIMARY KEY, \\ created TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'), \\ updated TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'), \\ checksum TEXT NOT NULL, \\ fields JSONB NOT NULL DEFAULT '{}', \\ capabilities JSONB NOT NULL DEFAULT '[]', \\ version TEXT NOT NULL DEFAULT '1', \\ block_type_id TEXT NOT NULL REFERENCES block_type(id) ON DELETE CASCADE, \\ UNIQUE(checksum, version) \\) , .{}); // block_document table // NOTE: Using INTEGER for is_anonymous to match SQLite try backend.db.exec( \\CREATE TABLE IF NOT EXISTS block_document ( \\ id TEXT PRIMARY KEY, \\ created TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'), \\ updated TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'), \\ name TEXT, \\ data JSONB NOT NULL DEFAULT '{}', \\ is_anonymous INTEGER DEFAULT 0, \\ block_type_id TEXT NOT NULL REFERENCES block_type(id), \\ block_type_name TEXT, \\ block_schema_id TEXT NOT NULL REFERENCES block_schema(id), \\ UNIQUE(block_type_id, name) \\) , .{}); // variable table try backend.db.exec( \\CREATE TABLE IF NOT EXISTS variable ( \\ id TEXT PRIMARY KEY, \\ created TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'), \\ updated TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'), \\ name TEXT NOT NULL UNIQUE, \\ value JSONB DEFAULT 'null', \\ tags JSONB DEFAULT '[]' \\) , .{}); // work_pool table // NOTE: Using INTEGER for is_paused to match SQLite try backend.db.exec( \\CREATE TABLE IF NOT EXISTS work_pool ( \\ id TEXT PRIMARY KEY, \\ created TEXT NOT NULL, \\ updated TEXT NOT NULL, \\ name TEXT NOT NULL UNIQUE, \\ description TEXT, \\ type TEXT NOT NULL DEFAULT 'process', \\ base_job_template JSONB DEFAULT '{}', \\ is_paused INTEGER DEFAULT 0, \\ concurrency_limit BIGINT, \\ default_queue_id TEXT, \\ status TEXT DEFAULT 'NOT_READY' \\) , .{}); // work_queue table // NOTE: Using INTEGER for is_paused to match SQLite try backend.db.exec( \\CREATE TABLE IF NOT EXISTS work_queue ( \\ id TEXT PRIMARY KEY, \\ created TEXT NOT NULL, \\ updated TEXT NOT NULL, \\ name TEXT NOT NULL, \\ description TEXT DEFAULT '', \\ is_paused INTEGER DEFAULT 0, \\ concurrency_limit BIGINT, \\ priority BIGINT DEFAULT 1, \\ work_pool_id TEXT NOT NULL REFERENCES work_pool(id) ON DELETE CASCADE, \\ last_polled TEXT, \\ status TEXT DEFAULT 'NOT_READY', \\ UNIQUE(work_pool_id, name) \\) , .{}); // worker table try backend.db.exec( \\CREATE TABLE IF NOT EXISTS worker ( \\ id TEXT PRIMARY KEY, \\ created TEXT NOT NULL, \\ updated TEXT NOT NULL, \\ name TEXT NOT NULL, \\ work_pool_id TEXT NOT NULL REFERENCES work_pool(id) ON DELETE CASCADE, \\ last_heartbeat_time TEXT, \\ heartbeat_interval_seconds BIGINT, \\ status TEXT DEFAULT 'OFFLINE', \\ UNIQUE(work_pool_id, name) \\) , .{}); // deployment table try backend.db.exec( \\CREATE TABLE IF NOT EXISTS deployment ( \\ id TEXT PRIMARY KEY, \\ created TEXT NOT NULL, \\ updated TEXT NOT NULL, \\ name TEXT NOT NULL, \\ flow_id TEXT NOT NULL REFERENCES flow(id) ON DELETE CASCADE, \\ version TEXT, \\ description TEXT, \\ paused INTEGER DEFAULT 0, \\ status TEXT DEFAULT 'NOT_READY', \\ last_polled TEXT, \\ parameters JSONB DEFAULT '{}', \\ parameter_openapi_schema TEXT, \\ enforce_parameter_schema INTEGER DEFAULT 1, \\ tags JSONB DEFAULT '[]', \\ labels JSONB DEFAULT '{}', \\ path TEXT, \\ entrypoint TEXT, \\ job_variables JSONB DEFAULT '{}', \\ pull_steps TEXT, \\ work_pool_name TEXT, \\ work_queue_name TEXT, \\ work_queue_id TEXT REFERENCES work_queue(id) ON DELETE SET NULL, \\ storage_document_id TEXT, \\ infrastructure_document_id TEXT, \\ concurrency_limit BIGINT, \\ UNIQUE(flow_id, name) \\) , .{}); // deployment_schedule table try backend.db.exec( \\CREATE TABLE IF NOT EXISTS deployment_schedule ( \\ id TEXT PRIMARY KEY, \\ created TEXT NOT NULL, \\ updated TEXT NOT NULL, \\ deployment_id TEXT NOT NULL REFERENCES deployment(id) ON DELETE CASCADE, \\ schedule TEXT NOT NULL, \\ active INTEGER DEFAULT 1, \\ max_scheduled_runs BIGINT, \\ parameters JSONB DEFAULT '{}', \\ slug TEXT, \\ UNIQUE(deployment_id, slug) \\) , .{}); // concurrency_limit table (v2 only - no v1 support) try backend.db.exec( \\CREATE TABLE IF NOT EXISTS concurrency_limit ( \\ id TEXT PRIMARY KEY, \\ created TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, \\ updated TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, \\ name TEXT NOT NULL UNIQUE, \\ "limit" INTEGER NOT NULL, \\ active BOOLEAN DEFAULT TRUE, \\ active_slots INTEGER DEFAULT 0, \\ denied_slots INTEGER DEFAULT 0, \\ slot_decay_per_second DOUBLE PRECISION DEFAULT 0.0, \\ avg_slot_occupancy_seconds DOUBLE PRECISION DEFAULT 2.0 \\) , .{}); // indexes try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_flow_run__state_type ON flow_run(state_type)", .{}); try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_flow_run__flow_id ON flow_run(flow_id)", .{}); try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_flow_run__next_scheduled_start_time ON flow_run(next_scheduled_start_time)", .{}); try backend.db.exec("CREATE UNIQUE INDEX IF NOT EXISTS uq_flow_run__flow_id_idempotency_key ON flow_run(flow_id, idempotency_key) WHERE idempotency_key IS NOT NULL", .{}); try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_flow_run_state__flow_run_id ON flow_run_state(flow_run_id)", .{}); try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_task_run_state__task_run_id ON task_run_state(task_run_id)", .{}); try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_task_run__flow_run_id ON task_run(flow_run_id)", .{}); try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_task_run__task_key_dynamic_key ON task_run(task_key, dynamic_key)", .{}); try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_events__occurred ON events(occurred)", .{}); try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_events__event__id ON events(event, id)", .{}); try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_events__event_resource_id_occurred ON events(event, resource_id, occurred)", .{}); try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_events__occurred_id ON events(occurred, id)", .{}); try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_block_schema__block_type_id ON block_schema(block_type_id)", .{}); try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_block_schema__checksum ON block_schema(checksum)", .{}); try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_block_document__block_type_id ON block_document(block_type_id)", .{}); try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_block_document__block_schema_id ON block_document(block_schema_id)", .{}); try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_block_document__name ON block_document(name)", .{}); try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_variable__name ON variable(name)", .{}); try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_work_pool__name ON work_pool(name)", .{}); try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_work_pool__type ON work_pool(type)", .{}); try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_work_queue__work_pool_id ON work_queue(work_pool_id)", .{}); try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_work_queue__priority ON work_queue(priority)", .{}); try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_worker__work_pool_id ON worker(work_pool_id)", .{}); try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_flow_run__deployment_id ON flow_run(deployment_id)", .{}); try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_deployment__flow_id ON deployment(flow_id)", .{}); try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_deployment__work_queue_id ON deployment(work_queue_id)", .{}); try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_deployment__work_pool_name ON deployment(work_pool_name)", .{}); try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_deployment__created ON deployment(created)", .{}); try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_deployment__updated ON deployment(updated)", .{}); try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_deployment_schedule__deployment_id ON deployment_schedule(deployment_id)", .{}); try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_deployment_schedule__active ON deployment_schedule(active)", .{}); try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_concurrency_limit__name ON concurrency_limit(name)", .{}); log.info("database", "postgres schema initialized", .{}); }