# ORM models

## base class

all models inherit from `Base` (a SQLAlchemy `DeclarativeBase`):

- **auto table names**: CamelCase → snake_case (e.g., `FlowRun` → `flow_run`)
- **universal columns**:
  - `id`: UUID primary key with server-side generation (`GenerateUUID()`)
  - `created`: UTC timezone-aware timestamp with a server default
  - `updated`: indexed timestamp with `onupdate=sa.func.now()`
- **eager defaults**: `eager_defaults=True` fetches server-generated values immediately after INSERT/UPDATE (inline via `RETURNING` where the backend supports it, otherwise with a follow-up SELECT), instead of expiring them for a lazy load on next access

## custom type decorators

### UUID

- **PostgreSQL**: native `postgresql.UUID()`
- **SQLite**: `CHAR(36)` string representation

### Timestamp

- **PostgreSQL**: `TIMESTAMP(timezone=True)`
- **SQLite**: naive `DATETIME()`, stored as UTC
- always returns a timezone-aware UTC value on read

### JSON

- **PostgreSQL**: `postgresql.JSONB(none_as_null=True)`: binary, indexable, queryable
- **SQLite**: `sqlite.JSON(none_as_null=True)`
- sanitizes special floats (NaN, Infinity), which PostgreSQL does not accept in JSON

### Pydantic

- wrapper over JSON that auto-serializes/deserializes Pydantic models
- used for: `StateDetails`, `FlowRunPolicy`, `CreatedBy`, etc.
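The base class's auto table-name rule can be sketched in plain Python. `table_name_for` is a hypothetical helper, not the project's actual implementation; it assumes the simplest rule (an underscore before every interior capital), so acronym-heavy names such as `UUIDThing` would split letter by letter.

```python
import re

# Zero-width match before every uppercase letter that is not the first character.
_INTERIOR_CAPITAL = re.compile(r"(?<!^)(?=[A-Z])")


def table_name_for(class_name: str) -> str:
    """Hypothetical helper: CamelCase class name -> snake_case table name."""
    return _INTERIOR_CAPITAL.sub("_", class_name).lower()


for name in ("Flow", "FlowRun", "TaskRunState", "BlockDocument"):
    print(name, "->", table_name_for(name))
```

In a real `DeclarativeBase`, a rule like this would typically be wired into `__tablename__` so subclasses never declare it by hand.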
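The Timestamp decorator's contract (store UTC, always hand back UTC-aware values) can be sketched as the two conversion hooks a SQLAlchemy `TypeDecorator` would implement (`process_bind_param` and `process_result_value`). They are written here as plain functions with hypothetical names so they run without a database; rejecting naive input on write is an assumption of this sketch, not necessarily what the real type does.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def bind_for_sqlite(value: Optional[datetime]) -> Optional[datetime]:
    """Write side (hypothetical): aware datetime -> naive UTC for SQLite's DATETIME."""
    if value is None:
        return None
    if value.tzinfo is None:
        # Assumption of this sketch: naive input is ambiguous, so reject it.
        raise ValueError("expected a timezone-aware datetime")
    return value.astimezone(timezone.utc).replace(tzinfo=None)


def normalize_on_read(value: Optional[datetime]) -> Optional[datetime]:
    """Read side for both dialects: always return a UTC-aware datetime."""
    if value is None:
        return None
    if value.tzinfo is None:
        # SQLite: a naive value that was stored as UTC.
        return value.replace(tzinfo=timezone.utc)
    # PostgreSQL: an aware value, possibly in the session's time zone.
    return value.astimezone(timezone.utc)


eastern = timezone(timedelta(hours=-5))
original = datetime(2024, 1, 1, 7, 0, tzinfo=eastern)
stored = bind_for_sqlite(original)    # naive 2024-01-01 12:00
restored = normalize_on_read(stored)  # 2024-01-01 12:00+00:00
```

Aware datetimes compare by instant, so `restored == original` holds even though the two carry different offsets.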
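PostgreSQL's JSON types reject `NaN` and `Infinity`, while Python's `json.dumps` writes them as non-standard tokens by default. A minimal sketch of a sanitizing step, assuming (hypothetically) that non-finite floats are replaced with `null`; the replacement policy the real JSON type uses is not specified here.

```python
import json
import math
from typing import Any


def sanitize_special_floats(value: Any) -> Any:
    """Recursively replace NaN and +/-Infinity with None (hypothetical policy)."""
    if isinstance(value, float) and not math.isfinite(value):
        return None
    if isinstance(value, dict):
        return {key: sanitize_special_floats(item) for key, item in value.items()}
    if isinstance(value, (list, tuple)):
        return [sanitize_special_floats(item) for item in value]
    return value


payload = {"loss": float("nan"), "bounds": [0.0, float("inf")], "name": "run"}

# Default json.dumps emits the invalid tokens NaN / Infinity.
print(json.dumps(payload))
# After sanitizing, strict encoding (allow_nan=False) succeeds.
print(json.dumps(sanitize_special_floats(payload), allow_nan=False))
```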
### GenerateUUID (server default)

- **PostgreSQL**: `GEN_RANDOM_UUID()`
- **SQLite**: a `hex(randomblob())` expression that assembles a UUID v4-formatted string

## key models

### core execution

| model | key fields | notes |
|-------|------------|-------|
| Flow | name, tags, labels | unique name constraint |
| FlowRun | flow_id, state_type, state_name, parameters, empirical_policy | inherits from `Run` mixin |
| TaskRun | flow_run_id, task_key, dynamic_key, cache_key | inherits from `Run` mixin |
| FlowRunState | flow_run_id, type, name, timestamp, state_details | many states per FlowRun (one-to-many) |
| TaskRunState | task_run_id, type, name, timestamp, state_details | many states per TaskRun (one-to-many) |

### Run mixin (shared by FlowRun and TaskRun)

```
state_type: StateType enum
state_name: string
state_timestamp: datetime
start_time: datetime (nullable)
end_time: datetime (nullable)
expected_start_time: datetime (nullable)
total_run_time: float (seconds)
run_count: int
```

### deployments

| model | key fields | notes |
|-------|------------|-------|
| Deployment | flow_id, name, status, concurrency_limit | unique (flow_id, name) |
| DeploymentSchedule | deployment_id, schedule, active | cron/interval schedules |

### work management

| model | key fields | notes |
|-------|------------|-------|
| WorkPool | name, type, status | pool infrastructure config |
| WorkQueue | work_pool_id, name, priority, concurrency_limit | routes runs to workers |
| Worker | work_pool_id, name, last_heartbeat_time | heartbeat tracking |

### blocks

| model | key fields | notes |
|-------|------------|-------|
| BlockType | name, slug | schema definitions |
| BlockSchema | block_type_id, checksum, capabilities | versioned schemas |
| BlockDocument | block_type_id, block_schema_id, name, data | encrypted credential storage |

### artifacts

| model | key fields | notes |
|-------|------------|-------|
| Artifact | key, type, data, flow_run_id, task_run_id | flexible result storage |
| ArtifactCollection | key, latest_id | tracks latest artifact per key |

## relationship patterns

### one-to-many (standard)

```python
from sqlalchemy.orm import Mapped, relationship

# inside class Flow(Base): Flow → FlowRuns
flow_runs: Mapped[list["FlowRun"]] = relationship(
    back_populates="flow", lazy="raise"
)
```

### lazy loading strategies

- `lazy="raise"`: raises instead of emitting an implicit lazy-load query, so N+1 problems surface as errors
- `lazy="selectin"`: eager-loads related rows in a separate SELECT with an IN clause

### foreign keys

```python
import sqlalchemy as sa
from sqlalchemy.orm import mapped_column

# cascade delete: deleting a flow deletes its runs
flow_id = mapped_column(sa.ForeignKey("flow.id", ondelete="cascade"))

# nullable: the reference is set to NULL when the parent task run is deleted
parent_task_run_id = mapped_column(
    sa.ForeignKey("task_run.id", ondelete="SET NULL", use_alter=True)
)
```

`use_alter=True` creates the FK constraint with a separate `ALTER TABLE` after the tables exist, which resolves circular dependencies between tables.

## state tracking pattern

models maintain BOTH:

1. **state history**: a one-to-many relationship from the run to its state table (e.g. FlowRun → FlowRunState)
2. **current state pointer**: an optional FK to the latest state row, declared with `use_alter=True`

a unique constraint on `(run_id, timestamp DESC)` ensures at most one state per run per instant.

## JSON field patterns

```python
from sqlalchemy.orm import Mapped, mapped_column

# JSON and Pydantic are the custom type decorators described above

# plain JSON with defaults
tags: Mapped[list[str]] = mapped_column(JSON, server_default="[]", default=list)
parameters: Mapped[dict] = mapped_column(JSON, server_default="{}", default=dict)

# Pydantic-backed JSON
empirical_policy: Mapped[FlowRunPolicy] = mapped_column(
    Pydantic(FlowRunPolicy),
    server_default="{}",
    default=FlowRunPolicy,
)
```

### FlowRunPolicy schema

the `empirical_policy` column stores retry/pause behavior as JSON:

| field | type | default | description |
|-------|------|---------|-------------|
| max_retries | int | 0 | **deprecated**: use `retries` |
| retry_delay_seconds | float | 0 | **deprecated**: use `retry_delay` |
| retries | int? | null | number of retries allowed |
| retry_delay | int? | null | delay between retries (seconds) |
| pause_keys | set[str]? | [] | tracks pauses observed |
| resuming | bool? | false | indicates resuming from pause |
| retry_type | "in_process" \| "reschedule"? | null | retry execution mode |

the `RetryFailedFlows` orchestration rule is built on this schema.

## dialect-specific indexes

```python
import sqlalchemy as sa

# GIN index for JSON operations (emitted on PostgreSQL only)
sa.Index(
    "ix_events__related_resource_ids_gin",
    "related_resource_ids",
    postgresql_using="gin",
).ddl_if(dialect="postgresql")

# partial index (both dialects); `cls` is the FlowRun model class
sa.Index(
    "ix_flow_run__state_type_scheduled",
    cls.deployment_id,
    cls.auto_scheduled,
    cls.next_scheduled_start_time,
    postgresql_where=cls.state_type == StateType.SCHEDULED,
    sqlite_where=cls.state_type == StateType.SCHEDULED,
)
```

## default value pattern

both server and Python defaults are set, for safety:

```python
created: Mapped[DateTime] = mapped_column(
    server_default=sa.func.now(),  # database-side
    default=lambda: now("UTC"),  # ORM-side
)
```
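One way to see why the default value pattern sets both defaults: the server default protects any INSERT that bypasses the ORM (raw SQL, migrations, other tools), while the Python default lets ORM inserts send a timezone-aware value computed in the application. A sketch with the standard-library `sqlite3` module, using a hypothetical cut-down `flow` table rather than the real schema:

```python
import sqlite3
from datetime import datetime, timezone

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE flow (
        id INTEGER PRIMARY KEY,
        name TEXT NOT NULL,
        tags TEXT NOT NULL DEFAULT '[]',
        created TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
    )
    """
)

# Raw SQL that omits tags/created: the server defaults fill them in.
conn.execute("INSERT INTO flow (name) VALUES ('raw-insert')")

# ORM-style insert: values are computed in Python (tz-aware UTC) and sent
# explicitly, so the server defaults are not used for this row.
python_created = datetime.now(timezone.utc).isoformat()
conn.execute(
    "INSERT INTO flow (name, tags, created) VALUES (?, ?, ?)",
    ("orm-insert", "[]", python_created),
)

rows = conn.execute("SELECT name, tags, created FROM flow ORDER BY id").fetchall()
for row in rows:
    print(row)
```

In this raw sketch the two paths store differently formatted text (`YYYY-MM-DD HH:MM:SS` versus an ISO string with `+00:00`); without a normalizing column type such as the Timestamp decorator, readers would have to handle both.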
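The SQLite branch of `GenerateUUID` builds a v4-formatted string from `hex(randomblob())`. The exact SQL the project emits is not shown in these notes; the expression below is an assumed equivalent, run as a column default through the standard-library `sqlite3` module and checked with Python's `uuid` module.

```python
import re
import sqlite3
import uuid

# Assumed SQLite expression (not necessarily the project's exact SQL):
# 8-4-4-4-12 hex groups, version nibble fixed to "4", and the variant
# nibble picked from "89ab" so the result is an RFC 4122 UUID.
SQLITE_UUID4 = """
lower(
    hex(randomblob(4)) || '-' ||
    hex(randomblob(2)) || '-' ||
    '4' || substr(hex(randomblob(2)), 2) || '-' ||
    substr('89ab', 1 + (random() & 3), 1) || substr(hex(randomblob(2)), 2) || '-' ||
    hex(randomblob(6))
)
"""

UUID4_PATTERN = re.compile(
    r"^[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$"
)

conn = sqlite3.connect(":memory:")

# Used as a server default: the expression is evaluated for each inserted row.
conn.execute(
    f"CREATE TABLE flow (id CHAR(36) PRIMARY KEY DEFAULT ({SQLITE_UUID4}), name TEXT)"
)
conn.execute("INSERT INTO flow (name) VALUES ('a'), ('b')")
ids = [row[0] for row in conn.execute("SELECT id FROM flow ORDER BY name")]

# Python's uuid module agrees that the strings are version-4 UUIDs.
for value in ids:
    assert UUID4_PATTERN.match(value), value
    assert uuid.UUID(value).version == 4
```

`random() & 3` is used instead of `abs(random()) % 4` because `abs()` raises an integer-overflow error on SQLite's most negative integer.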
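The two `ondelete` behaviors from the foreign keys section can be observed directly in SQLite with the standard-library `sqlite3` module. The tables are hypothetical cut-downs of `flow`, `flow_run`, and `task_run`; note that SQLite only enforces foreign keys after `PRAGMA foreign_keys = ON`.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # SQLite enforces FKs only when enabled
conn.executescript(
    """
    CREATE TABLE flow (id TEXT PRIMARY KEY);
    CREATE TABLE flow_run (
        id TEXT PRIMARY KEY,
        flow_id TEXT NOT NULL REFERENCES flow (id) ON DELETE CASCADE
    );
    CREATE TABLE task_run (
        id TEXT PRIMARY KEY,
        parent_task_run_id TEXT REFERENCES task_run (id) ON DELETE SET NULL
    );
    INSERT INTO flow VALUES ('f1');
    INSERT INTO flow_run VALUES ('r1', 'f1'), ('r2', 'f1');
    INSERT INTO task_run VALUES ('parent', NULL), ('child', 'parent');
    """
)

# CASCADE: deleting the flow removes its runs.
conn.execute("DELETE FROM flow WHERE id = 'f1'")
remaining_runs = conn.execute("SELECT count(*) FROM flow_run").fetchone()[0]

# SET NULL: deleting the parent keeps the child but clears its reference.
conn.execute("DELETE FROM task_run WHERE id = 'parent'")
child_parent = conn.execute(
    "SELECT parent_task_run_id FROM task_run WHERE id = 'child'"
).fetchone()[0]
```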
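The state tracking pattern (an append-only history plus a current-state pointer, with one state per run per instant) can be sketched in SQLite via `sqlite3`. Table and column names are hypothetical simplifications of `flow_run` / `flow_run_state`. SQLite accepts the forward reference from `flow_run` to the not-yet-created `flow_run_state` at `CREATE TABLE` time, so this sketch needs no equivalent of `use_alter=True`.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")
conn.executescript(
    """
    CREATE TABLE flow_run (
        id TEXT PRIMARY KEY,
        -- current-state pointer: nullable, refers forward to flow_run_state
        state_id TEXT REFERENCES flow_run_state (id) ON DELETE SET NULL
    );
    CREATE TABLE flow_run_state (
        id TEXT PRIMARY KEY,
        flow_run_id TEXT NOT NULL REFERENCES flow_run (id) ON DELETE CASCADE,
        type TEXT NOT NULL,
        timestamp TEXT NOT NULL
    );
    -- at most one state per run per instant
    CREATE UNIQUE INDEX uq_flow_run_state__flow_run_id_timestamp
        ON flow_run_state (flow_run_id, timestamp DESC);
    """
)


def set_state(run_id: str, state_id: str, state_type: str, ts: str) -> None:
    """Hypothetical helper: append to the history, then move the pointer."""
    conn.execute(
        "INSERT INTO flow_run_state VALUES (?, ?, ?, ?)",
        (state_id, run_id, state_type, ts),
    )
    conn.execute("UPDATE flow_run SET state_id = ? WHERE id = ?", (state_id, run_id))


conn.execute("INSERT INTO flow_run (id) VALUES ('run-1')")
set_state("run-1", "s1", "SCHEDULED", "2024-01-01T00:00:00Z")
set_state("run-1", "s2", "RUNNING", "2024-01-01T00:00:05Z")

try:
    # Same run, same instant: the unique index rejects it, and the pointer
    # is never moved because the INSERT fails first.
    set_state("run-1", "s3", "FAILED", "2024-01-01T00:00:05Z")
except sqlite3.IntegrityError:
    duplicate_rejected = True
else:
    duplicate_rejected = False

current = conn.execute(
    "SELECT s.type FROM flow_run r JOIN flow_run_state s ON s.id = r.state_id "
    "WHERE r.id = 'run-1'"
).fetchone()[0]
history = [
    row[0]
    for row in conn.execute(
        "SELECT type FROM flow_run_state WHERE flow_run_id = 'run-1' ORDER BY timestamp"
    )
]
```

Keeping the pointer alongside the history means "what state is this run in?" is a single join instead of a latest-row-per-group query.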
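Because the whole FlowRunPolicy lives in one JSON column, code reading it has to cope with the deprecated fields. A minimal sketch that uses a standard-library dataclass as a stand-in for the Pydantic model, plus a hypothetical `effective_retry_settings` helper that prefers the newer fields and falls back to the deprecated ones. This precedence rule is an assumption for illustration, not documented behavior of `RetryFailedFlows`.

```python
import json
from dataclasses import dataclass, field
from typing import Literal, Optional


@dataclass
class FlowRunPolicySketch:
    """Hypothetical stand-in for FlowRunPolicy (dataclass instead of Pydantic)."""

    max_retries: int = 0  # deprecated
    retry_delay_seconds: float = 0  # deprecated
    retries: Optional[int] = None
    retry_delay: Optional[int] = None
    pause_keys: Optional[set] = field(default_factory=set)
    resuming: Optional[bool] = False
    retry_type: Optional[Literal["in_process", "reschedule"]] = None

    @classmethod
    def from_json(cls, raw: str) -> "FlowRunPolicySketch":
        data = json.loads(raw)
        # JSON has no set type, so pause_keys round-trips through a list.
        if data.get("pause_keys") is not None:
            data["pause_keys"] = set(data["pause_keys"])
        return cls(**data)


def effective_retry_settings(policy: FlowRunPolicySketch) -> tuple[int, float]:
    """Assumed precedence: new fields win, deprecated fields are the fallback."""
    retries = policy.retries if policy.retries is not None else policy.max_retries
    delay = (
        policy.retry_delay
        if policy.retry_delay is not None
        else policy.retry_delay_seconds
    )
    return retries, delay


# In this sketch, the column's server_default of "{}" decodes to all defaults.
empty = FlowRunPolicySketch.from_json("{}")
legacy = FlowRunPolicySketch.from_json('{"max_retries": 3, "retry_delay_seconds": 10}')
current = FlowRunPolicySketch.from_json('{"retries": 2, "retry_delay": 30, "max_retries": 3}')
```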