const std = @import("std"); const backend = @import("../backend.zig"); const log = @import("../../logging.zig"); /// Initialize SQLite schema pub fn init() !void { // flow table try backend.db.exec( \\CREATE TABLE IF NOT EXISTS flow ( \\ id TEXT PRIMARY KEY, \\ created TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')), \\ updated TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')), \\ name TEXT NOT NULL UNIQUE, \\ tags TEXT DEFAULT '[]' \\) , .{}); // flow_run table try backend.db.exec( \\CREATE TABLE IF NOT EXISTS flow_run ( \\ id TEXT PRIMARY KEY, \\ created TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')), \\ updated TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')), \\ flow_id TEXT REFERENCES flow(id), \\ name TEXT NOT NULL, \\ parameters TEXT DEFAULT '{}', \\ tags TEXT DEFAULT '[]', \\ state_id TEXT, \\ state_type TEXT, \\ state_name TEXT, \\ state_timestamp TEXT, \\ run_count INTEGER DEFAULT 0, \\ expected_start_time TEXT, \\ next_scheduled_start_time TEXT, \\ start_time TEXT, \\ end_time TEXT, \\ total_run_time REAL DEFAULT 0.0, \\ deployment_id TEXT, \\ deployment_version TEXT, \\ work_queue_name TEXT, \\ work_queue_id TEXT, \\ auto_scheduled INTEGER DEFAULT 0, \\ idempotency_key TEXT, \\ empirical_policy TEXT DEFAULT '{}' \\) , .{}); // flow_run_state table try backend.db.exec( \\CREATE TABLE IF NOT EXISTS flow_run_state ( \\ id TEXT PRIMARY KEY, \\ created TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')), \\ flow_run_id TEXT REFERENCES flow_run(id) ON DELETE CASCADE, \\ type TEXT NOT NULL, \\ name TEXT NOT NULL, \\ timestamp TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')) \\) , .{}); // task_run table try backend.db.exec( \\CREATE TABLE IF NOT EXISTS task_run ( \\ id TEXT PRIMARY KEY, \\ created TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')), \\ updated TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')), \\ flow_run_id TEXT REFERENCES flow_run(id), \\ name TEXT NOT NULL, \\ task_key TEXT NOT NULL, \\ dynamic_key TEXT NOT NULL, \\ cache_key TEXT, \\ tags TEXT DEFAULT '[]', \\ state_id TEXT, \\ state_type TEXT, \\ state_name TEXT, \\ state_timestamp TEXT, \\ run_count INTEGER DEFAULT 0, \\ expected_start_time TEXT, \\ start_time TEXT, \\ end_time TEXT, \\ total_run_time REAL DEFAULT 0.0 \\) , .{}); // task_run_state table try backend.db.exec( \\CREATE TABLE IF NOT EXISTS task_run_state ( \\ id TEXT PRIMARY KEY, \\ created TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')), \\ task_run_id TEXT REFERENCES task_run(id) ON DELETE CASCADE, \\ type TEXT NOT NULL, \\ name TEXT NOT NULL, \\ timestamp TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')) \\) , .{}); // events table try backend.db.exec( \\CREATE TABLE IF NOT EXISTS events ( \\ id TEXT PRIMARY KEY, \\ created TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')), \\ updated TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')), \\ occurred TEXT NOT NULL, \\ event TEXT NOT NULL, \\ resource_id TEXT NOT NULL, \\ resource TEXT NOT NULL DEFAULT '{}', \\ related_resource_ids TEXT DEFAULT '[]', \\ related TEXT DEFAULT '[]', \\ payload TEXT DEFAULT '{}', \\ received TEXT NOT NULL, \\ recorded TEXT NOT NULL, \\ follows TEXT \\) , .{}); // block_type table try backend.db.exec( \\CREATE TABLE IF NOT EXISTS block_type ( \\ id TEXT PRIMARY KEY, \\ created TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')), \\ updated TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')), \\ name TEXT NOT NULL, \\ slug TEXT NOT NULL UNIQUE, \\ logo_url TEXT, \\ documentation_url TEXT, \\ description TEXT, \\ code_example TEXT, \\ is_protected INTEGER DEFAULT 0 \\) , .{}); // block_schema table try backend.db.exec( \\CREATE TABLE IF NOT EXISTS block_schema ( \\ id TEXT PRIMARY KEY, \\ created TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')), \\ updated TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')), \\ checksum TEXT NOT NULL, \\ fields TEXT NOT NULL DEFAULT '{}', \\ capabilities TEXT NOT NULL DEFAULT '[]', \\ version TEXT NOT NULL DEFAULT '1', \\ block_type_id TEXT NOT NULL REFERENCES block_type(id) ON DELETE CASCADE, \\ UNIQUE(checksum, version) \\) , .{}); // block_document table try backend.db.exec( \\CREATE TABLE IF NOT EXISTS block_document ( \\ id TEXT PRIMARY KEY, \\ created TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')), \\ updated TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')), \\ name TEXT, \\ data TEXT NOT NULL DEFAULT '{}', \\ is_anonymous INTEGER DEFAULT 0, \\ block_type_id TEXT NOT NULL REFERENCES block_type(id), \\ block_type_name TEXT, \\ block_schema_id TEXT NOT NULL REFERENCES block_schema(id), \\ UNIQUE(block_type_id, name) \\) , .{}); // variable table try backend.db.exec( \\CREATE TABLE IF NOT EXISTS variable ( \\ id TEXT PRIMARY KEY, \\ created TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')), \\ updated TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')), \\ name TEXT NOT NULL UNIQUE, \\ value TEXT DEFAULT 'null', \\ tags TEXT DEFAULT '[]' \\) , .{}); // work_pool table try backend.db.exec( \\CREATE TABLE IF NOT EXISTS work_pool ( \\ id TEXT PRIMARY KEY, \\ created TEXT NOT NULL, \\ updated TEXT NOT NULL, \\ name TEXT NOT NULL UNIQUE, \\ description TEXT, \\ type TEXT NOT NULL DEFAULT 'process', \\ base_job_template TEXT DEFAULT '{}', \\ is_paused INTEGER DEFAULT 0, \\ concurrency_limit INTEGER, \\ default_queue_id TEXT, \\ status TEXT DEFAULT 'NOT_READY' \\) , .{}); // work_queue table try backend.db.exec( \\CREATE TABLE IF NOT EXISTS work_queue ( \\ id TEXT PRIMARY KEY, \\ created TEXT NOT NULL, \\ updated TEXT NOT NULL, \\ name TEXT NOT NULL, \\ description TEXT DEFAULT '', \\ is_paused INTEGER DEFAULT 0, \\ concurrency_limit INTEGER, \\ priority INTEGER DEFAULT 1, \\ work_pool_id TEXT NOT NULL REFERENCES work_pool(id) ON DELETE CASCADE, \\ last_polled TEXT, \\ status TEXT DEFAULT 'NOT_READY', \\ UNIQUE(work_pool_id, name) \\) , .{}); // worker table try backend.db.exec( \\CREATE TABLE IF NOT EXISTS worker ( \\ id TEXT PRIMARY KEY, \\ created TEXT NOT NULL, \\ updated TEXT NOT NULL, \\ name TEXT NOT NULL, \\ work_pool_id TEXT NOT NULL REFERENCES work_pool(id) ON DELETE CASCADE, \\ last_heartbeat_time TEXT, \\ heartbeat_interval_seconds INTEGER, \\ status TEXT DEFAULT 'OFFLINE', \\ UNIQUE(work_pool_id, name) \\) , .{}); // deployment table try backend.db.exec( \\CREATE TABLE IF NOT EXISTS deployment ( \\ id TEXT PRIMARY KEY, \\ created TEXT NOT NULL, \\ updated TEXT NOT NULL, \\ name TEXT NOT NULL, \\ flow_id TEXT NOT NULL REFERENCES flow(id) ON DELETE CASCADE, \\ version TEXT, \\ description TEXT, \\ paused INTEGER DEFAULT 0, \\ status TEXT DEFAULT 'NOT_READY', \\ last_polled TEXT, \\ parameters TEXT DEFAULT '{}', \\ parameter_openapi_schema TEXT, \\ enforce_parameter_schema INTEGER DEFAULT 1, \\ tags TEXT DEFAULT '[]', \\ labels TEXT DEFAULT '{}', \\ path TEXT, \\ entrypoint TEXT, \\ job_variables TEXT DEFAULT '{}', \\ pull_steps TEXT, \\ work_pool_name TEXT, \\ work_queue_name TEXT, \\ work_queue_id TEXT REFERENCES work_queue(id) ON DELETE SET NULL, \\ storage_document_id TEXT, \\ infrastructure_document_id TEXT, \\ concurrency_limit INTEGER, \\ UNIQUE(flow_id, name) \\) , .{}); // deployment_schedule table try backend.db.exec( \\CREATE TABLE IF NOT EXISTS deployment_schedule ( \\ id TEXT PRIMARY KEY, \\ created TEXT NOT NULL, \\ updated TEXT NOT NULL, \\ deployment_id TEXT NOT NULL REFERENCES deployment(id) ON DELETE CASCADE, \\ schedule TEXT NOT NULL, \\ active INTEGER DEFAULT 1, \\ max_scheduled_runs INTEGER, \\ parameters TEXT DEFAULT '{}', \\ slug TEXT, \\ UNIQUE(deployment_id, slug) \\) , .{}); // concurrency_limit table (v2 only - no v1 support) try backend.db.exec( \\CREATE TABLE IF NOT EXISTS concurrency_limit ( \\ id TEXT PRIMARY KEY, \\ created TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')), \\ updated TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')), \\ name TEXT NOT NULL UNIQUE, \\ "limit" INTEGER NOT NULL, \\ active INTEGER DEFAULT 1, \\ active_slots INTEGER DEFAULT 0, \\ denied_slots INTEGER DEFAULT 0, \\ slot_decay_per_second REAL DEFAULT 0.0, \\ avg_slot_occupancy_seconds REAL DEFAULT 2.0 \\) , .{}); // indexes try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_flow_run__state_type ON flow_run(state_type)", .{}); try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_flow_run__flow_id ON flow_run(flow_id)", .{}); try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_flow_run__next_scheduled_start_time ON flow_run(next_scheduled_start_time)", .{}); try backend.db.exec("CREATE UNIQUE INDEX IF NOT EXISTS uq_flow_run__flow_id_idempotency_key ON flow_run(flow_id, idempotency_key) WHERE idempotency_key IS NOT NULL", .{}); try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_flow_run_state__flow_run_id ON flow_run_state(flow_run_id)", .{}); try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_task_run_state__task_run_id ON task_run_state(task_run_id)", .{}); try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_task_run__flow_run_id ON task_run(flow_run_id)", .{}); try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_task_run__task_key_dynamic_key ON task_run(task_key, dynamic_key)", .{}); try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_events__occurred ON events(occurred)", .{}); try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_events__event__id ON events(event, id)", .{}); try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_events__event_resource_id_occurred ON events(event, resource_id, occurred)", .{}); try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_events__occurred_id ON events(occurred, id)", .{}); try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_block_schema__block_type_id ON block_schema(block_type_id)", .{}); try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_block_schema__checksum ON block_schema(checksum)", .{}); try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_block_document__block_type_id ON block_document(block_type_id)", .{}); try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_block_document__block_schema_id ON block_document(block_schema_id)", .{}); try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_block_document__name ON block_document(name)", .{}); try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_variable__name ON variable(name)", .{}); try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_work_pool__name ON work_pool(name)", .{}); try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_work_pool__type ON work_pool(type)", .{}); try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_work_queue__work_pool_id ON work_queue(work_pool_id)", .{}); try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_work_queue__priority ON work_queue(priority)", .{}); try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_worker__work_pool_id ON worker(work_pool_id)", .{}); try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_flow_run__deployment_id ON flow_run(deployment_id)", .{}); try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_deployment__flow_id ON deployment(flow_id)", .{}); try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_deployment__work_queue_id ON deployment(work_queue_id)", .{}); try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_deployment__work_pool_name ON deployment(work_pool_name)", .{}); try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_deployment__created ON deployment(created)", .{}); try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_deployment__updated ON deployment(updated)", .{}); try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_deployment_schedule__deployment_id ON deployment_schedule(deployment_id)", .{}); try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_deployment_schedule__active ON deployment_schedule(active)", .{}); try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_concurrency_limit__name ON concurrency_limit(name)", .{}); log.info("database", "sqlite schema initialized", .{}); }