#!/usr/bin/env -S uv run --script --quiet # /// script # requires-python = ">=3.12" # dependencies = ["httpx", "rich"] # /// """ Functional test suite for prefect-server API. Tests API correctness by exercising all endpoints with expected request/response patterns. Includes scheduler integration tests (which have intentional delays to verify background services). For performance benchmarking, use ./scripts/benchmark instead. Usage: ./scripts/test-api-sequence # human-readable output ./scripts/test-api-sequence --json # machine-readable for CI ./scripts/test-api-sequence --quiet # minimal output """ import json as json_lib import os import sys import time import uuid from dataclasses import dataclass, field from typing import Callable import httpx from rich.console import Console from rich.panel import Panel from rich.table import Table console = Console() BASE_URL = os.environ.get("PREFECT_API_URL", "http://localhost:4200/api") QUIET = "--json" in sys.argv or "--quiet" in sys.argv @dataclass class TestResult: name: str passed: bool duration_ms: float requests: int = 0 error: str | None = None class CountingClient(httpx.Client): """HTTP client that counts requests.""" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.request_count = 0 def request(self, *args, **kwargs): self.request_count += 1 return super().request(*args, **kwargs) def validate_response(data: dict, required_fields: list[str], field_types: dict | None = None) -> bool: """Validate response contains required fields with expected types.""" for field in required_fields: if field not in data: if not QUIET: console.print(f"[red]VALIDATION[/red]: missing field '{field}'") return False if field_types: for field, expected_type in field_types.items(): if field in data and data[field] is not None: if not isinstance(data[field], expected_type): if not QUIET: console.print(f"[red]VALIDATION[/red]: field '{field}' expected {expected_type.__name__}, got {type(data[field]).__name__}") 
return False return True def run_test(name: str, test_fn: Callable[[CountingClient], bool]) -> TestResult: """Run a test function with timing and request counting.""" if not QUIET: console.print(Panel(f"testing {name}", style="blue")) client = CountingClient(base_url=BASE_URL, timeout=10) start = time.perf_counter() try: passed = test_fn(client) duration_ms = (time.perf_counter() - start) * 1000 return TestResult( name=name, passed=passed, duration_ms=duration_ms, requests=client.request_count, ) except Exception as e: duration_ms = (time.perf_counter() - start) * 1000 return TestResult( name=name, passed=False, duration_ms=duration_ms, requests=client.request_count, error=str(e), ) finally: client.close() # ---------- test functions ---------- def test_admin(client: CountingClient) -> bool: """Test admin/health endpoints.""" # health if not QUIET: console.print("[bold]GET /health[/bold]") resp = client.get("/health") if resp.status_code != 200: if not QUIET: console.print(f"[red]FAIL[/red]: {resp.status_code}") return False if not QUIET: console.print(f" {resp.text}") # version if not QUIET: console.print("[bold]GET /admin/version[/bold]") resp = client.get("/admin/version") if resp.status_code != 200: if not QUIET: console.print(f"[red]FAIL[/red]: {resp.status_code}") return False if not QUIET: console.print(f" {resp.json()}") # csrf-token if not QUIET: console.print("[bold]GET /csrf-token[/bold]") resp = client.get("/csrf-token", params={"client": "test-client"}) if resp.status_code == 200: if not QUIET: console.print(f" token received") elif resp.status_code == 422: if not QUIET: console.print(f" csrf protection disabled (ok)") else: if not QUIET: console.print(f"[red]FAIL[/red]: {resp.status_code}") return False return True def test_flow_run(client: CountingClient, should_fail: bool = False) -> bool: """Test flow run lifecycle.""" suffix = "fail" if should_fail else "success" if not QUIET: console.print(f"server: {BASE_URL}\n") # create flow if not QUIET: 
console.print("[bold]1. POST /flows/[/bold]") resp = client.post("/flows/", json={"name": f"bench-flow-{suffix}"}) if resp.status_code not in (200, 201): if not QUIET: console.print(f"[red]FAIL[/red]: {resp.status_code} {resp.text}") return False flow = resp.json() if not validate_response(flow, ["id", "name", "created", "updated"], {"id": str, "name": str}): return False if not QUIET: console.print(f" flow_id: {flow.get('id')}") # create flow run if not QUIET: console.print("\n[bold]2. POST /flow_runs/[/bold]") resp = client.post("/flow_runs/", json={ "flow_id": flow["id"], "name": f"run-{uuid.uuid4().hex[:8]}", "state": {"type": "PENDING", "name": "Pending"}, }) if resp.status_code not in (200, 201): if not QUIET: console.print(f"[red]FAIL[/red]: {resp.status_code} {resp.text}") return False flow_run = resp.json() if not validate_response(flow_run, ["id", "name", "flow_id", "state_type"], {"id": str, "name": str}): return False flow_run_id = flow_run.get("id") if not QUIET: console.print(f" flow_run_id: {flow_run_id}") # read flow run if not QUIET: console.print("\n[bold]3. GET /flow_runs/{id}[/bold]") resp = client.get(f"/flow_runs/{flow_run_id}") if resp.status_code != 200: if not QUIET: console.print(f"[red]FAIL[/red]: {resp.status_code}") return False if not QUIET: console.print(f" state: {resp.json().get('state_type')}") # set RUNNING if not QUIET: console.print("\n[bold]4. POST /flow_runs/{id}/set_state (RUNNING)[/bold]") resp = client.post(f"/flow_runs/{flow_run_id}/set_state", json={ "state": {"type": "RUNNING", "name": "Running"}, "force": False, }) if resp.status_code not in (200, 201): if not QUIET: console.print(f"[red]FAIL[/red]: {resp.status_code}") return False if not QUIET: console.print(f" status: {resp.json().get('status')}") # set final state final_type = "FAILED" if should_fail else "COMPLETED" final_name = "Failed" if should_fail else "Completed" if not QUIET: console.print(f"\n[bold]5. 
POST /flow_runs/{{id}}/set_state ({final_type})[/bold]") resp = client.post(f"/flow_runs/{flow_run_id}/set_state", json={ "state": {"type": final_type, "name": final_name}, "force": False, }) if resp.status_code not in (200, 201): if not QUIET: console.print(f"[red]FAIL[/red]: {resp.status_code}") return False if not QUIET: console.print(f" status: {resp.json().get('status')}") # verify if not QUIET: console.print("\n[bold]6. GET /flow_runs/{id} (verify)[/bold]") resp = client.get(f"/flow_runs/{flow_run_id}") if resp.status_code != 200: return False actual_type = resp.json().get("state_type") if actual_type != final_type: if not QUIET: console.print(f"[red]FAIL[/red]: expected {final_type}, got {actual_type}") return False if not QUIET: console.print(f" [green]state: {actual_type} (correct)[/green]") return True def test_flow_with_task_runs(client: CountingClient) -> bool: """Test flow execution with nested task runs - simulates real flow execution.""" def fail(msg: str) -> bool: if not QUIET: console.print(f"[red]FAIL[/red]: {msg}") return False def log(msg: str) -> None: if not QUIET: console.print(msg) # setup: create flow log("[bold]setup: create flow[/bold]") resp = client.post("/flows/", json={"name": f"flow-with-tasks-{uuid.uuid4().hex[:8]}"}) if resp.status_code not in (200, 201): return fail(f"create flow {resp.status_code}") flow_id = resp.json().get("id") log(f" flow_id: {flow_id}") # create flow run log("[bold]1. create flow run[/bold]") resp = client.post("/flow_runs/", json={ "flow_id": flow_id, "name": f"run-with-tasks-{uuid.uuid4().hex[:8]}", "state": {"type": "PENDING", "name": "Pending"}, }) if resp.status_code not in (200, 201): return fail(f"create flow run {resp.status_code}") flow_run_id = resp.json().get("id") log(f" flow_run_id: {flow_run_id}") # start flow run log("[bold]2. 
start flow run (RUNNING)[/bold]") resp = client.post(f"/flow_runs/{flow_run_id}/set_state", json={ "state": {"type": "RUNNING", "name": "Running"}, }) if resp.status_code not in (200, 201) or resp.json().get("status") != "ACCEPT": return fail(f"start flow run failed") log(f" flow run started") # create and execute task 1 log("[bold]3. execute task 1: extract[/bold]") resp = client.post("/task_runs/", json={ "flow_run_id": flow_run_id, "task_key": "extract_data", "dynamic_key": "extract-0", "name": "extract_data-0", "state": {"type": "PENDING", "name": "Pending"}, }) if resp.status_code not in (200, 201): return fail(f"create task 1 {resp.status_code}") task1_id = resp.json().get("id") # run task 1 resp = client.post(f"/task_runs/{task1_id}/set_state", json={ "state": {"type": "RUNNING", "name": "Running"}, }) if resp.status_code not in (200, 201): return fail(f"task 1 RUNNING {resp.status_code}") resp = client.post(f"/task_runs/{task1_id}/set_state", json={ "state": {"type": "COMPLETED", "name": "Completed"}, }) if resp.status_code not in (200, 201): return fail(f"task 1 COMPLETED {resp.status_code}") log(f" task 1 completed") # create and execute task 2 (depends on task 1) log("[bold]4. execute task 2: transform[/bold]") resp = client.post("/task_runs/", json={ "flow_run_id": flow_run_id, "task_key": "transform_data", "dynamic_key": "transform-0", "name": "transform_data-0", "state": {"type": "PENDING", "name": "Pending"}, }) if resp.status_code not in (200, 201): return fail(f"create task 2 {resp.status_code}") task2_id = resp.json().get("id") resp = client.post(f"/task_runs/{task2_id}/set_state", json={ "state": {"type": "RUNNING", "name": "Running"}, }) resp = client.post(f"/task_runs/{task2_id}/set_state", json={ "state": {"type": "COMPLETED", "name": "Completed"}, }) if resp.status_code not in (200, 201): return fail(f"task 2 COMPLETED {resp.status_code}") log(f" task 2 completed") # create and execute task 3 log("[bold]5. 
execute task 3: load[/bold]") resp = client.post("/task_runs/", json={ "flow_run_id": flow_run_id, "task_key": "load_data", "dynamic_key": "load-0", "name": "load_data-0", "state": {"type": "PENDING", "name": "Pending"}, }) if resp.status_code not in (200, 201): return fail(f"create task 3 {resp.status_code}") task3_id = resp.json().get("id") resp = client.post(f"/task_runs/{task3_id}/set_state", json={ "state": {"type": "RUNNING", "name": "Running"}, }) resp = client.post(f"/task_runs/{task3_id}/set_state", json={ "state": {"type": "COMPLETED", "name": "Completed"}, }) if resp.status_code not in (200, 201): return fail(f"task 3 COMPLETED {resp.status_code}") log(f" task 3 completed") # complete flow run log("[bold]6. complete flow run[/bold]") resp = client.post(f"/flow_runs/{flow_run_id}/set_state", json={ "state": {"type": "COMPLETED", "name": "Completed"}, }) if resp.status_code not in (200, 201) or resp.json().get("status") != "ACCEPT": return fail(f"complete flow run failed") log(f" [green]flow run completed[/green]") # verify task runs can be read individually and are completed log("[bold]7. 
verify task runs completed[/bold]") for task_id, task_name in [(task1_id, "extract"), (task2_id, "transform"), (task3_id, "load")]: resp = client.get(f"/task_runs/{task_id}") if resp.status_code != 200: return fail(f"read task {task_name} {resp.status_code}") task_run = resp.json() if task_run.get("flow_run_id") != flow_run_id: return fail(f"task {task_name} not linked to flow run") if task_run.get("state_type") != "COMPLETED": return fail(f"task {task_name} not COMPLETED") log(f" [green]all 3 tasks completed and linked to flow run[/green]") return True def test_task_run(client: CountingClient) -> bool: """Test task run lifecycle.""" # create if not QUIET: console.print("[bold]POST /task_runs/[/bold]") resp = client.post("/task_runs/", json={ "task_key": "bench-task", "dynamic_key": f"key-{uuid.uuid4().hex[:8]}", "name": f"task-{uuid.uuid4().hex[:8]}", "state": {"type": "PENDING", "name": "Pending"}, }) if resp.status_code not in (200, 201): if not QUIET: console.print(f"[red]FAIL[/red]: {resp.status_code}") return False task_run_id = resp.json().get("id") if not QUIET: console.print(f" task_run_id: {task_run_id}") # read resp = client.get(f"/task_runs/{task_run_id}") if resp.status_code != 200: return False # RUNNING resp = client.post(f"/task_runs/{task_run_id}/set_state", json={ "state": {"type": "RUNNING", "name": "Running"}, "force": False, }) if resp.status_code not in (200, 201): return False if not QUIET: console.print(f" -> RUNNING: {resp.json().get('status')}") # COMPLETED resp = client.post(f"/task_runs/{task_run_id}/set_state", json={ "state": {"type": "COMPLETED", "name": "Completed"}, "force": False, }) if resp.status_code not in (200, 201): return False if not QUIET: console.print(f" -> COMPLETED: {resp.json().get('status')}") return True def test_filters(client: CountingClient) -> bool: """Test filter endpoints.""" for endpoint, label in [ ("/flows/filter", "flows"), ("/flow_runs/filter", "flow_runs"), ("/task_runs/filter", "task_runs"), ]: resp = 
client.post(endpoint, json={}) if resp.status_code != 200: if not QUIET: console.print(f"[red]FAIL[/red]: {endpoint} {resp.status_code}") return False if not QUIET: console.print(f" {label}: {len(resp.json())} items") return True def test_logs(client: CountingClient) -> bool: """Test logs endpoint.""" from datetime import datetime, timezone logs = [ {"level": 20, "message": "test log 1", "name": "test", "timestamp": datetime.now(timezone.utc).isoformat()}, {"level": 30, "message": "test log 2", "name": "test", "timestamp": datetime.now(timezone.utc).isoformat()}, ] resp = client.post("/logs/", json=logs) if resp.status_code not in (200, 201, 204): if not QUIET: console.print(f"[red]FAIL[/red]: {resp.status_code}") return False if not QUIET: console.print(f" {len(logs)} logs sent") return True def test_variables(client: CountingClient) -> bool: """Test variables API (CRUD).""" var_name = f"bench-var-{uuid.uuid4().hex[:8]}" # create if not QUIET: console.print("[bold]POST /variables/[/bold]") resp = client.post("/variables/", json={ "name": var_name, "value": {"nested": "object", "count": 42}, "tags": ["benchmark", "test"], }) if resp.status_code != 201: if not QUIET: console.print(f"[red]FAIL[/red]: create {resp.status_code}") return False variable = resp.json() if not validate_response(variable, ["id", "name", "value", "tags", "created", "updated"], {"id": str, "name": str, "tags": list}): return False var_id = variable.get("id") if not QUIET: console.print(f" created: {var_id}") # get by name resp = client.get(f"/variables/name/{var_name}") if resp.status_code != 200: return False if not QUIET: console.print(f" get by name: ok") # get by id resp = client.get(f"/variables/{var_id}") if resp.status_code != 200: return False # update by name resp = client.patch(f"/variables/name/{var_name}", json={"value": "updated"}) if resp.status_code != 204: return False if not QUIET: console.print(f" updated by name") # filter resp = client.post("/variables/filter", 
json={"limit": 10}) if resp.status_code != 200: return False if not QUIET: console.print(f" filter: {len(resp.json())} items") # count resp = client.post("/variables/count", json={}) if resp.status_code != 200: return False if not QUIET: console.print(f" count: {resp.text}") # duplicate name should fail resp = client.post("/variables/", json={"name": var_name, "value": "dupe"}) if resp.status_code != 409: if not QUIET: console.print(f"[red]FAIL[/red]: duplicate should return 409, got {resp.status_code}") return False if not QUIET: console.print(f" duplicate rejected: 409") # delete resp = client.delete(f"/variables/name/{var_name}") if resp.status_code != 204: return False if not QUIET: console.print(f" deleted") return True def test_blocks(client: CountingClient) -> bool: """Test blocks API (types, schemas, documents).""" slug = f"bench-block-{uuid.uuid4().hex[:8]}" # create block type if not QUIET: console.print("[bold]block_types[/bold]") resp = client.post("/block_types/", json={ "name": f"Bench Block {slug}", "slug": slug, "description": "benchmark block type", }) if resp.status_code not in (200, 201): if not QUIET: console.print(f"[red]FAIL[/red]: create block_type {resp.status_code}") return False block_type = resp.json() if not validate_response(block_type, ["id", "name", "slug"], {"id": str, "name": str, "slug": str}): return False block_type_id = block_type.get("id") if not QUIET: console.print(f" created: {block_type_id}") # get by slug resp = client.get(f"/block_types/slug/{slug}") if resp.status_code != 200: return False # create schema if not QUIET: console.print("[bold]block_schemas[/bold]") resp = client.post("/block_schemas/", json={ "block_type_id": block_type_id, "fields": {"properties": {"value": {"type": "string"}}}, "capabilities": ["test"], "version": "1.0.0", }) if resp.status_code not in (200, 201): if not QUIET: console.print(f"[red]FAIL[/red]: create block_schema {resp.status_code}") return False block_schema = resp.json() block_schema_id 
= block_schema.get("id") checksum = block_schema.get("checksum") if not QUIET: console.print(f" created: {block_schema_id}") # get by checksum resp = client.get(f"/block_schemas/checksum/{checksum}") if resp.status_code != 200: return False # create document if not QUIET: console.print("[bold]block_documents[/bold]") doc_name = f"bench-doc-{uuid.uuid4().hex[:8]}" resp = client.post("/block_documents/", json={ "name": doc_name, "block_type_id": block_type_id, "block_schema_id": block_schema_id, "data": {"value": "secret-value"}, }) if resp.status_code not in (200, 201): if not QUIET: console.print(f"[red]FAIL[/red]: create block_document {resp.status_code}") return False block_doc = resp.json() block_doc_id = block_doc.get("id") if not QUIET: console.print(f" created: {block_doc_id}") # get by id resp = client.get(f"/block_documents/{block_doc_id}") if resp.status_code != 200: return False # get by slug/name resp = client.get(f"/block_types/slug/{slug}/block_documents/name/{doc_name}") if resp.status_code != 200: return False # update resp = client.patch(f"/block_documents/{block_doc_id}", json={"data": {"value": "updated"}}) if resp.status_code != 204: return False if not QUIET: console.print(f" updated") # filters for endpoint in ["/block_types/filter", "/block_schemas/filter", "/block_documents/filter"]: resp = client.post(endpoint, json={}) if resp.status_code != 200: return False # delete resp = client.delete(f"/block_documents/{block_doc_id}") if resp.status_code != 204: return False if not QUIET: console.print(f" deleted") return True def test_work_pools(client: CountingClient) -> bool: """Test work pools API (pools, queues, workers).""" pool_name = f"test-pool-{uuid.uuid4().hex[:8]}" # create work pool if not QUIET: console.print("[bold]work_pools[/bold]") resp = client.post("/work_pools/", json={ "name": pool_name, "type": "process", "description": "test work pool", }) if resp.status_code not in (200, 201): if not QUIET: console.print(f"[red]FAIL[/red]: 
create work_pool {resp.status_code} {resp.text}") return False pool = resp.json() if not validate_response(pool, ["id", "name", "type", "status", "default_queue_id"], {"id": str, "name": str}): return False if not QUIET: console.print(f" created: {pool.get('id')}") # check default queue was created if not pool.get("default_queue_id"): if not QUIET: console.print("[red]FAIL[/red]: no default_queue_id") return False # get by name resp = client.get(f"/work_pools/{pool_name}") if resp.status_code != 200: if not QUIET: console.print(f"[red]FAIL[/red]: get work_pool {resp.status_code}") return False # update resp = client.patch(f"/work_pools/{pool_name}", json={"description": "updated"}) if resp.status_code != 204: if not QUIET: console.print(f"[red]FAIL[/red]: update work_pool {resp.status_code}") return False if not QUIET: console.print(" updated") # filter resp = client.post("/work_pools/filter", json={}) if resp.status_code != 200: if not QUIET: console.print(f"[red]FAIL[/red]: filter work_pools {resp.status_code}") return False pools = resp.json() if not isinstance(pools, list): return False # create queue if not QUIET: console.print("[bold]work_queues[/bold]") queue_name = f"test-queue-{uuid.uuid4().hex[:8]}" resp = client.post(f"/work_pools/{pool_name}/queues/", json={ "name": queue_name, "description": "test queue", "priority": 5, }) if resp.status_code not in (200, 201): if not QUIET: console.print(f"[red]FAIL[/red]: create work_queue {resp.status_code} {resp.text}") return False queue = resp.json() if not validate_response(queue, ["id", "name", "priority", "work_pool_id"], {"id": str, "priority": int}): return False if not QUIET: console.print(f" created: {queue.get('id')}") # get queue resp = client.get(f"/work_pools/{pool_name}/queues/{queue_name}") if resp.status_code != 200: if not QUIET: console.print(f"[red]FAIL[/red]: get work_queue {resp.status_code}") return False # filter queues resp = client.post(f"/work_pools/{pool_name}/queues/filter", json={}) if 
resp.status_code != 200: if not QUIET: console.print(f"[red]FAIL[/red]: filter work_queues {resp.status_code}") return False queues = resp.json() if not isinstance(queues, list) or len(queues) < 2: # default + our queue if not QUIET: console.print(f"[red]FAIL[/red]: expected at least 2 queues, got {len(queues) if isinstance(queues, list) else 'not a list'}") return False # worker heartbeat if not QUIET: console.print("[bold]workers[/bold]") resp = client.post(f"/work_pools/{pool_name}/workers/heartbeat", json={ "name": "test-worker-1", "heartbeat_interval_seconds": 30, }) if resp.status_code != 204: if not QUIET: console.print(f"[red]FAIL[/red]: worker heartbeat {resp.status_code} {resp.text}") return False if not QUIET: console.print(" heartbeat sent") # check pool status is now READY resp = client.get(f"/work_pools/{pool_name}") if resp.status_code != 200: return False pool = resp.json() if pool.get("status") != "READY": if not QUIET: console.print(f"[red]FAIL[/red]: expected pool status READY, got {pool.get('status')}") return False if not QUIET: console.print(" pool status: READY") # filter workers resp = client.post(f"/work_pools/{pool_name}/workers/filter", json={}) if resp.status_code != 200: if not QUIET: console.print(f"[red]FAIL[/red]: filter workers {resp.status_code}") return False workers = resp.json() if not isinstance(workers, list) or len(workers) < 1: return False # delete queue (not default) resp = client.delete(f"/work_pools/{pool_name}/queues/{queue_name}") if resp.status_code != 204: if not QUIET: console.print(f"[red]FAIL[/red]: delete work_queue {resp.status_code}") return False if not QUIET: console.print(" deleted queue") # delete pool resp = client.delete(f"/work_pools/{pool_name}") if resp.status_code != 204: if not QUIET: console.print(f"[red]FAIL[/red]: delete work_pool {resp.status_code}") return False if not QUIET: console.print(" deleted pool") return True def test_deployments(client: CountingClient) -> bool: """Test deployments API 
(deployments, schedules, create_flow_run).""" # create a flow first if not QUIET: console.print("[bold]setup: create flow[/bold]") resp = client.post("/flows/", json={"name": f"deploy-flow-{uuid.uuid4().hex[:8]}"}) if resp.status_code not in (200, 201): if not QUIET: console.print(f"[red]FAIL[/red]: create flow {resp.status_code}") return False flow = resp.json() flow_id = flow.get("id") flow_name = flow.get("name") # create deployment deployment_name = f"test-deployment-{uuid.uuid4().hex[:8]}" if not QUIET: console.print("[bold]deployments[/bold]") resp = client.post("/deployments/", json={ "name": deployment_name, "flow_id": flow_id, "description": "test deployment", "tags": ["test", "benchmark"], "parameters": {"key": "value"}, "schedules": [ {"schedule": {"interval": 3600}, "active": True}, ], }) if resp.status_code not in (200, 201): if not QUIET: console.print(f"[red]FAIL[/red]: create deployment {resp.status_code} {resp.text}") return False deployment = resp.json() if not validate_response(deployment, ["id", "name", "flow_id", "status", "schedules"], {"id": str, "name": str}): return False deployment_id = deployment.get("id") if not QUIET: console.print(f" created: {deployment_id}") # verify schedules were created schedules = deployment.get("schedules", []) if not isinstance(schedules, list) or len(schedules) != 1: if not QUIET: console.print(f"[red]FAIL[/red]: expected 1 schedule, got {len(schedules) if isinstance(schedules, list) else 'not a list'}") return False if not QUIET: console.print(f" schedules: {len(schedules)}") # get by id resp = client.get(f"/deployments/{deployment_id}") if resp.status_code != 200: if not QUIET: console.print(f"[red]FAIL[/red]: get deployment {resp.status_code}") return False # get by name resp = client.get(f"/deployments/name/{flow_name}/{deployment_name}") if resp.status_code != 200: if not QUIET: console.print(f"[red]FAIL[/red]: get deployment by name {resp.status_code}") return False if not QUIET: console.print(" get by 
name: ok") # update resp = client.patch(f"/deployments/{deployment_id}", json={"description": "updated"}) if resp.status_code != 204: if not QUIET: console.print(f"[red]FAIL[/red]: update deployment {resp.status_code}") return False if not QUIET: console.print(" updated") # filter resp = client.post("/deployments/filter", json={"limit": 10}) if resp.status_code != 200: if not QUIET: console.print(f"[red]FAIL[/red]: filter deployments {resp.status_code}") return False if not QUIET: console.print(f" filter: {len(resp.json())} items") # count resp = client.post("/deployments/count", json={}) if resp.status_code != 200: if not QUIET: console.print(f"[red]FAIL[/red]: count deployments {resp.status_code}") return False if not QUIET: console.print(f" count: {resp.text}") # pause resp = client.post(f"/deployments/{deployment_id}/pause_deployment", json={}) if resp.status_code != 204: if not QUIET: console.print(f"[red]FAIL[/red]: pause deployment {resp.status_code}") return False if not QUIET: console.print(" paused") # resume resp = client.post(f"/deployments/{deployment_id}/resume_deployment", json={}) if resp.status_code != 204: if not QUIET: console.print(f"[red]FAIL[/red]: resume deployment {resp.status_code}") return False if not QUIET: console.print(" resumed") # create flow run from deployment if not QUIET: console.print("[bold]create_flow_run[/bold]") resp = client.post(f"/deployments/{deployment_id}/create_flow_run", json={}) if resp.status_code not in (200, 201): if not QUIET: console.print(f"[red]FAIL[/red]: create flow run {resp.status_code} {resp.text}") return False flow_run = resp.json() if not validate_response(flow_run, ["id", "flow_id", "deployment_id"], {"id": str}): return False if flow_run.get("deployment_id") != deployment_id: if not QUIET: console.print(f"[red]FAIL[/red]: deployment_id mismatch") return False if not QUIET: console.print(f" created flow run: {flow_run.get('id')}") # schedules - list if not QUIET: 
console.print("[bold]deployment_schedules[/bold]") resp = client.get(f"/deployments/{deployment_id}/schedules") if resp.status_code != 200: if not QUIET: console.print(f"[red]FAIL[/red]: list schedules {resp.status_code}") return False schedules = resp.json() schedule_id = schedules[0].get("id") if schedules else None if not QUIET: console.print(f" list: {len(schedules)} schedules") # schedules - create resp = client.post(f"/deployments/{deployment_id}/schedules", json={ "schedule": {"cron": "0 0 * * *"}, "active": False, }) if resp.status_code not in (200, 201): if not QUIET: console.print(f"[red]FAIL[/red]: create schedule {resp.status_code}") return False created_schedules = resp.json() if not isinstance(created_schedules, list) or len(created_schedules) != 1: if not QUIET: console.print(f"[red]FAIL[/red]: expected 1 created schedule") return False new_schedule_id = created_schedules[0].get("id") if not QUIET: console.print(f" created schedule: {new_schedule_id}") # schedules - update resp = client.patch(f"/deployments/{deployment_id}/schedules/{new_schedule_id}", json={"active": True}) if resp.status_code != 204: if not QUIET: console.print(f"[red]FAIL[/red]: update schedule {resp.status_code}") return False if not QUIET: console.print(" updated schedule") # schedules - delete resp = client.delete(f"/deployments/{deployment_id}/schedules/{new_schedule_id}") if resp.status_code != 204: if not QUIET: console.print(f"[red]FAIL[/red]: delete schedule {resp.status_code}") return False if not QUIET: console.print(" deleted schedule") # delete deployment resp = client.delete(f"/deployments/{deployment_id}") if resp.status_code != 204: if not QUIET: console.print(f"[red]FAIL[/red]: delete deployment {resp.status_code}") return False if not QUIET: console.print(" deleted deployment") return True def test_scheduler_idempotency(client: CountingClient) -> bool: """Test that scheduler is idempotent - running twice doesn't create duplicates.""" import time as time_mod def 
fail(msg: str) -> bool: if not QUIET: console.print(f"[red]FAIL[/red]: {msg}") return False def log(msg: str) -> None: if not QUIET: console.print(msg) def wait_for_runs(deployment_id: str, min_count: int, max_wait: float = 10.0) -> int: """Poll until we have at least min_count runs, or timeout (client-side filter).""" start = time_mod.time() while time_mod.time() - start < max_wait: resp = client.post("/flow_runs/filter", json={"limit": 100}) if resp.status_code == 200: all_runs = resp.json() matching = [r for r in all_runs if r.get("deployment_id") == deployment_id] if len(matching) >= min_count: return len(matching) time_mod.sleep(0.5) return 0 # setup: create flow, work pool, deployment with interval schedule log("[bold]setup[/bold]") resp = client.post("/flows/", json={"name": f"idem-flow-{uuid.uuid4().hex[:8]}"}) if resp.status_code not in (200, 201): return fail(f"create flow {resp.status_code}") flow_id = resp.json().get("id") pool_name = f"idem-pool-{uuid.uuid4().hex[:8]}" resp = client.post("/work_pools/", json={"name": pool_name, "type": "process"}) if resp.status_code not in (200, 201): return fail(f"create work_pool {resp.status_code}") log(f" pool: {pool_name}") # create deployment with interval schedule (every hour) resp = client.post("/deployments/", json={ "name": f"idem-deploy-{uuid.uuid4().hex[:8]}", "flow_id": flow_id, "work_pool_name": pool_name, "schedules": [{"schedule": {"interval": 3600}, "active": True}], # every hour }) if resp.status_code not in (200, 201): return fail(f"create deployment {resp.status_code}") deployment = resp.json() deployment_id = deployment.get("id") log(f" deployment: {deployment_id}") # poll for scheduler to create runs and wait for tick to complete # we need to wait for count to stabilize, not just see the first run log("[bold]waiting for scheduler tick to complete...[/bold]") count_after_first = wait_for_runs(deployment_id, min_count=1) # wait for count to stabilize (scheduler tick may still be inserting) for _ in 
def test_parameter_merging(client: CountingClient) -> bool:
    """Test that schedule parameters override deployment parameters.

    Creates a deployment with base ``parameters`` and a schedule that carries
    its own ``parameters`` dict, then waits for the scheduler service to create
    runs and verifies the merge semantics on the first run:

    - keys only on the deployment are preserved,
    - keys present on both are taken from the schedule (override),
    - keys only on the schedule are added.

    Returns True on success, False (after printing FAIL unless QUIET) otherwise.
    """
    import time as time_mod

    def fail(msg: str) -> bool:
        # print a FAIL line (unless quiet) and signal test failure to caller
        if not QUIET:
            console.print(f"[red]FAIL[/red]: {msg}")
        return False

    def log(msg: str) -> None:
        if not QUIET:
            console.print(msg)

    # setup
    log("[bold]setup[/bold]")
    resp = client.post("/flows/", json={"name": f"params-flow-{uuid.uuid4().hex[:8]}"})
    if resp.status_code not in (200, 201):
        return fail(f"create flow {resp.status_code}")
    flow_id = resp.json().get("id")
    pool_name = f"params-pool-{uuid.uuid4().hex[:8]}"
    resp = client.post("/work_pools/", json={"name": pool_name, "type": "process"})
    if resp.status_code not in (200, 201):
        return fail(f"create work_pool {resp.status_code}")
    log(f" pool: {pool_name}")
    # create deployment with base parameters
    # schedule has override parameter
    resp = client.post("/deployments/", json={
        "name": f"params-deploy-{uuid.uuid4().hex[:8]}",
        "flow_id": flow_id,
        "work_pool_name": pool_name,
        "parameters": {"base_key": "base_value", "override_key": "deployment_value"},
        "schedules": [{
            "schedule": {"interval": 3600},
            "active": True,
            "parameters": {"override_key": "schedule_value", "schedule_key": "schedule_only"},
        }],
    })
    if resp.status_code not in (200, 201):
        return fail(f"create deployment {resp.status_code}")
    deployment = resp.json()
    deployment_id = deployment.get("id")
    log(f" deployment: {deployment_id}")
    log(f" deployment params: {deployment.get('parameters')}")
    # poll for scheduler to create runs (filter client-side since server filter doesn't support deployment_id yet)
    log("[bold]polling for scheduler to create runs...[/bold]")
    runs = []
    start = time_mod.time()
    max_wait = 10.0
    while time_mod.time() - start < max_wait:
        resp = client.post("/flow_runs/filter", json={"limit": 100})
        if resp.status_code == 200:
            all_runs = resp.json()
            runs = [r for r in all_runs if r.get("deployment_id") == deployment_id]
            if len(runs) > 0:
                break
        time_mod.sleep(0.5)
    log(f" found {len(runs)} runs for deployment in {time_mod.time() - start:.1f}s")
    if len(runs) == 0:
        return fail("scheduler did not create any runs (timeout)")
    # check merged parameters on first run
    run_params = runs[0].get("parameters", {})
    # some backends return parameters as a JSON string rather than a dict —
    # normalize before inspecting keys
    if isinstance(run_params, str):
        import json as json_mod
        run_params = json_mod.loads(run_params)
    log(f" run params: {run_params}")
    # verify merging:
    # - base_key should be from deployment
    # - override_key should be from schedule (override)
    # - schedule_key should be from schedule (new key)
    if run_params.get("base_key") != "base_value":
        return fail(f"base_key not preserved: {run_params.get('base_key')}")
    if run_params.get("override_key") != "schedule_value":
        return fail(f"override_key not overridden: {run_params.get('override_key')}")
    if run_params.get("schedule_key") != "schedule_only":
        return fail(f"schedule_key not added: {run_params.get('schedule_key')}")
    log(" [green]parameter merging verified[/green]")
    # cleanup (best effort; only reached on success path)
    client.delete(f"/deployments/{deployment_id}")
    client.delete(f"/work_pools/{pool_name}")
    log(" cleanup: ok")
    return True
def test_get_scheduled_flow_runs(client: CountingClient) -> bool:
    """Test get_scheduled_flow_runs endpoint (worker polling).

    Creates a flow, a process-type work pool, and a deployment, then verifies:

    - both pool and deployment start in ``NOT_READY`` status,
    - a run created in ``SCHEDULED`` state appears in the pool's
      ``get_scheduled_flow_runs`` response with matching pool/queue ids,
    - polling flips both pool and deployment status to ``READY``,
    - the ``work_queue_names`` and ``scheduled_before`` filters are accepted.

    Returns True on success, False (after printing FAIL unless QUIET) otherwise.
    """
    from datetime import datetime, timezone

    def fail(msg: str) -> bool:
        # print a FAIL line (unless quiet) and signal test failure to caller
        if not QUIET:
            console.print(f"[red]FAIL[/red]: {msg}")
        return False

    def log(msg: str) -> None:
        if not QUIET:
            console.print(msg)

    # setup
    log("[bold]setup: create flow[/bold]")
    resp = client.post("/flows/", json={"name": f"schedule-flow-{uuid.uuid4().hex[:8]}"})
    if resp.status_code not in (200, 201):
        return fail(f"create flow {resp.status_code}")
    flow_id = resp.json().get("id")
    pool_name = f"schedule-pool-{uuid.uuid4().hex[:8]}"
    resp = client.post("/work_pools/", json={"name": pool_name, "type": "process"})
    if resp.status_code not in (200, 201):
        return fail(f"create work_pool {resp.status_code}")
    pool = resp.json()
    pool_id, default_queue_id = pool.get("id"), pool.get("default_queue_id")
    log(f" pool: {pool_id}")
    # pool must report NOT_READY until a worker has polled it
    if pool.get("status") != "NOT_READY":
        return fail(f"expected pool NOT_READY, got {pool.get('status')}")
    resp = client.post("/deployments/", json={"name": f"schedule-deploy-{uuid.uuid4().hex[:8]}", "flow_id": flow_id, "work_pool_name": pool_name})
    if resp.status_code not in (200, 201):
        return fail(f"create deployment {resp.status_code}")
    deployment = resp.json()
    deployment_id = deployment.get("id")
    log(f" deployment: {deployment_id}")
    if deployment.get("status") != "NOT_READY":
        return fail(f"expected deployment NOT_READY, got {deployment.get('status')}")
    # create scheduled flow run
    log("[bold]create scheduled flow run[/bold]")
    resp = client.post(f"/deployments/{deployment_id}/create_flow_run", json={"state": {"type": "SCHEDULED", "name": "Scheduled"}})
    if resp.status_code not in (200, 201):
        return fail(f"create flow run {resp.status_code}")
    flow_run = resp.json()
    flow_run_id = flow_run.get("id")
    log(f" flow_run: {flow_run_id}")
    log(f" state: {flow_run.get('state_type')}")
    if flow_run.get("state_type") != "SCHEDULED":
        return fail(f"expected SCHEDULED, got {flow_run.get('state_type')}")
    # test polling
    log("[bold]get_scheduled_flow_runs[/bold]")
    resp = client.post(f"/work_pools/{pool_name}/get_scheduled_flow_runs", json={})
    if resp.status_code != 200:
        return fail(f"get_scheduled_flow_runs {resp.status_code}")
    scheduled_runs = resp.json()
    if not isinstance(scheduled_runs, list):
        return fail(f"expected list, got {type(scheduled_runs)}")
    log(f" returned {len(scheduled_runs)} runs")
    # verify our run is in results: each item pairs a flow_run with the
    # pool/queue ids it was drawn from
    found = any(item.get("flow_run", {}).get("id") == flow_run_id and item.get("work_pool_id") == pool_id and item.get("work_queue_id") == default_queue_id for item in scheduled_runs)
    if not found:
        return fail("scheduled flow run not found in results")
    log(" flow run found in results")
    # verify status changes: the act of polling marks pool + deployment READY
    resp = client.get(f"/work_pools/{pool_name}")
    if resp.status_code != 200 or resp.json().get("status") != "READY":
        return fail(f"expected pool READY after polling")
    log(" pool status: READY")
    resp = client.get(f"/deployments/{deployment_id}")
    if resp.status_code != 200 or resp.json().get("status") != "READY":
        return fail(f"expected deployment READY after polling")
    log(" deployment status: READY")
    # test filters
    resp = client.post(f"/work_pools/{pool_name}/get_scheduled_flow_runs", json={"work_queue_names": ["default"]})
    if resp.status_code != 200:
        return fail(f"filter test {resp.status_code}")
    log(" filtered by queue: ok")
    resp = client.post(f"/work_pools/{pool_name}/get_scheduled_flow_runs", json={"scheduled_before": datetime.now(timezone.utc).isoformat()})
    if resp.status_code != 200:
        return fail(f"scheduled_before test {resp.status_code}")
    log(f" scheduled_before filter: {len(resp.json())} runs")
    # cleanup (best effort; only reached on success path)
    client.delete(f"/deployments/{deployment_id}")
    client.delete(f"/work_pools/{pool_name}")
    log(" cleanup: ok")
    return True
def test_retry_failed_flows(client: CountingClient) -> bool:
    """Test RetryFailedFlows rule - retry cycle when configured.

    Creates a flow run with ``empirical_policy`` retries=2/retry_delay=1,
    drives it PENDING → RUNNING → FAILED, and expects the orchestrator to
    REJECT the FAILED transition and reschedule the run (AwaitingRetry /
    SCHEDULED). Then completes a full retry cycle through COMPLETED.

    Returns True on success, False (after printing FAIL unless QUIET) otherwise.
    """

    def fail(msg: str) -> bool:
        # print a FAIL line (unless quiet) and signal test failure to caller
        if not QUIET:
            console.print(f"[red]FAIL[/red]: {msg}")
        return False

    def log(msg: str) -> None:
        if not QUIET:
            console.print(msg)

    # setup
    log("[bold]setup[/bold]")
    resp = client.post("/flows/", json={"name": f"retry-flow-{uuid.uuid4().hex[:8]}"})
    if resp.status_code not in (200, 201):
        return fail(f"create flow {resp.status_code}")
    flow_id = resp.json().get("id")
    # create flow run with retry configuration via empirical_policy
    log("[bold]create flow run with retries=2, retry_delay=1[/bold]")
    resp = client.post("/flow_runs/", json={
        "flow_id": flow_id,
        "name": f"retry-run-{uuid.uuid4().hex[:8]}",
        "state": {"type": "PENDING", "name": "Pending"},
        "empirical_policy": {"retries": 2, "retry_delay": 1},
    })
    if resp.status_code not in (200, 201):
        return fail(f"create flow run {resp.status_code}")
    flow_run = resp.json()
    flow_run_id = flow_run.get("id")
    log(f" flow_run_id: {flow_run_id}")
    # run_count starts at 0, increments when entering RUNNING
    # transition to RUNNING (run_count becomes 1)
    log("[bold]PENDING → RUNNING (first attempt)[/bold]")
    resp = client.post(f"/flow_runs/{flow_run_id}/set_state", json={
        "state": {"type": "RUNNING", "name": "Running"},
    })
    if resp.status_code not in (200, 201):
        return fail(f"set RUNNING {resp.status_code}")
    if resp.json().get("status") != "ACCEPT":
        return fail(f"expected ACCEPT for RUNNING, got {resp.json().get('status')}")
    # verify run_count
    resp = client.get(f"/flow_runs/{flow_run_id}")
    if resp.status_code != 200:
        return fail(f"get flow run {resp.status_code}")
    run_count = resp.json().get("run_count", 0)
    log(f" run_count after RUNNING: {run_count}")
    # fail - with retries available, should REJECT and schedule retry
    log("[bold]RUNNING → FAILED (expect REJECT + AwaitingRetry)[/bold]")
    resp = client.post(f"/flow_runs/{flow_run_id}/set_state", json={
        "state": {"type": "FAILED", "name": "Failed", "message": "Test failure"},
    })
    if resp.status_code not in (200, 201):
        return fail(f"set FAILED {resp.status_code}")
    result = resp.json()
    status = result.get("status")
    log(f" status: {status}")
    # with retries, should get REJECT and the run should be in AwaitingRetry state
    if status == "REJECT":
        log(f" [green]REJECT (retries available)[/green]")
        # verify state is now AwaitingRetry (SCHEDULED)
        resp = client.get(f"/flow_runs/{flow_run_id}")
        if resp.status_code != 200:
            return fail(f"get flow run {resp.status_code}")
        actual_state = resp.json().get("state_type")
        actual_name = resp.json().get("state_name")
        log(f" state: {actual_state}/{actual_name}")
        if actual_state != "SCHEDULED":
            return fail(f"expected SCHEDULED (AwaitingRetry), got {actual_state}")
        log(f" [green]state: SCHEDULED/AwaitingRetry (correct)[/green]")
    elif status == "ACCEPT":
        # this could happen if run_count > retries - check if that's the case
        log(f" ACCEPT (retries may be exhausted)")
        return True
    else:
        return fail(f"unexpected status: {status}")
    # complete the retry cycle: SCHEDULED → PENDING → RUNNING → COMPLETED
    log("[bold]retry cycle: SCHEDULED → PENDING → RUNNING → COMPLETED[/bold]")
    # SCHEDULED → PENDING
    resp = client.post(f"/flow_runs/{flow_run_id}/set_state", json={
        "state": {"type": "PENDING", "name": "Pending"},
    })
    if resp.status_code not in (200, 201):
        return fail(f"retry PENDING {resp.status_code}")
    log(f" → PENDING: {resp.json().get('status')}")
    # PENDING → RUNNING (run_count increments)
    resp = client.post(f"/flow_runs/{flow_run_id}/set_state", json={
        "state": {"type": "RUNNING", "name": "Running"},
    })
    if resp.status_code not in (200, 201):
        return fail(f"retry RUNNING {resp.status_code}")
    log(f" → RUNNING: {resp.json().get('status')}")
    # verify run_count incremented
    resp = client.get(f"/flow_runs/{flow_run_id}")
    run_count = resp.json().get("run_count", 0)
    log(f" run_count: {run_count}")
    # complete successfully
    resp = client.post(f"/flow_runs/{flow_run_id}/set_state", json={
        "state": {"type": "COMPLETED", "name": "Completed"},
    })
    if resp.status_code not in (200, 201):
        return fail(f"COMPLETED {resp.status_code}")
    if resp.json().get("status") != "ACCEPT":
        return fail(f"expected ACCEPT for COMPLETED, got {resp.json().get('status')}")
    log(f" [green]→ COMPLETED: ACCEPT[/green]")
    return True
return fail(f"retry RUNNING {resp.status_code}") log(f" → RUNNING: {resp.json().get('status')}") # verify run_count incremented resp = client.get(f"/flow_runs/{flow_run_id}") run_count = resp.json().get("run_count", 0) log(f" run_count: {run_count}") # complete successfully resp = client.post(f"/flow_runs/{flow_run_id}/set_state", json={ "state": {"type": "COMPLETED", "name": "Completed"}, }) if resp.status_code not in (200, 201): return fail(f"COMPLETED {resp.status_code}") if resp.json().get("status") != "ACCEPT": return fail(f"expected ACCEPT for COMPLETED, got {resp.json().get('status')}") log(f" [green]→ COMPLETED: ACCEPT[/green]") return True def test_cancellation_flow(client: CountingClient) -> bool: """Test cancellation states (CANCELLING, CANCELLED).""" def fail(msg: str) -> bool: if not QUIET: console.print(f"[red]FAIL[/red]: {msg}") return False def log(msg: str) -> None: if not QUIET: console.print(msg) # setup log("[bold]setup[/bold]") resp = client.post("/flows/", json={"name": f"cancel-flow-{uuid.uuid4().hex[:8]}"}) if resp.status_code not in (200, 201): return fail(f"create flow {resp.status_code}") flow_id = resp.json().get("id") resp = client.post("/flow_runs/", json={ "flow_id": flow_id, "name": f"cancel-run-{uuid.uuid4().hex[:8]}", "state": {"type": "PENDING", "name": "Pending"}, }) if resp.status_code not in (200, 201): return fail(f"create flow run {resp.status_code}") flow_run_id = resp.json().get("id") log(f" flow_run_id: {flow_run_id}") # start the run log("[bold]PENDING → RUNNING[/bold]") resp = client.post(f"/flow_runs/{flow_run_id}/set_state", json={ "state": {"type": "RUNNING", "name": "Running"}, }) if resp.status_code not in (200, 201) or resp.json().get("status") != "ACCEPT": return fail(f"RUNNING transition failed") log(f" status: ACCEPT") # cancel while running - first CANCELLING log("[bold]RUNNING → CANCELLING[/bold]") resp = client.post(f"/flow_runs/{flow_run_id}/set_state", json={ "state": {"type": "CANCELLING", "name": 
def test_serve_pattern(client: CountingClient) -> bool:
    """Test Runner/.serve() pattern: poll /deployments/get_scheduled_flow_runs, execute locally.

    This simulates what happens when you call flow.serve() - a Runner process polls the
    deployments endpoint for scheduled runs and executes them in subprocesses.
    """
    from datetime import datetime, timezone

    def fail(msg: str) -> bool:
        if not QUIET:
            console.print(f"[red]FAIL[/red]: {msg}")
        return False

    def log(msg: str) -> None:
        if not QUIET:
            console.print(msg)

    def transition(run_id, state_type, state_name):
        # helper: propose a state transition for the run under test
        return client.post(
            f"/flow_runs/{run_id}/set_state",
            json={"state": {"type": state_type, "name": state_name}},
        )

    # setup: create flow and deployment (no work pool - serve doesn't use workers)
    log("[bold]setup: flow + deployment (no work pool)[/bold]")
    response = client.post("/flows/", json={"name": f"serve-flow-{uuid.uuid4().hex[:8]}"})
    if response.status_code not in (200, 201):
        return fail(f"create flow {response.status_code}")
    flow_id = response.json().get("id")
    response = client.post("/deployments/", json={
        "name": f"serve-deploy-{uuid.uuid4().hex[:8]}",
        "flow_id": flow_id,
        # no work_pool_name - this is the serve pattern
    })
    if response.status_code not in (200, 201):
        return fail(f"create deployment {response.status_code}")
    deployment_id = response.json().get("id")
    log(f" deployment: {deployment_id}")

    # create a scheduled flow run
    log("[bold]1. create scheduled flow run[/bold]")
    response = client.post(f"/deployments/{deployment_id}/create_flow_run", json={
        "state": {"type": "SCHEDULED", "name": "Scheduled"},
    })
    if response.status_code not in (200, 201):
        return fail(f"create flow run {response.status_code}")
    flow_run_id = response.json().get("id")
    log(f" flow_run: {flow_run_id}")

    # Runner polls: POST /deployments/get_scheduled_flow_runs
    log("[bold]2. Runner polls /deployments/get_scheduled_flow_runs[/bold]")
    response = client.post("/deployments/get_scheduled_flow_runs", json={
        "deployment_ids": [deployment_id],
        "scheduled_before": datetime.now(timezone.utc).isoformat(),
        "limit": 10,
    })
    if response.status_code != 200:
        return fail(f"poll {response.status_code}")
    scheduled = response.json()
    if not any(run.get("id") == flow_run_id for run in scheduled):
        return fail("run not found in scheduled runs")
    log(f" found {len(scheduled)} scheduled run(s)")

    # Runner executes: SCHEDULED → PENDING → RUNNING → COMPLETED
    log("[bold]3. Runner executes flow locally[/bold]")
    # claim the run
    response = transition(flow_run_id, "PENDING", "Pending")
    if response.status_code not in (200, 201):
        return fail(f"PENDING {response.status_code}")
    log(f" → PENDING: {response.json().get('status')}")
    # execute
    response = transition(flow_run_id, "RUNNING", "Running")
    if response.status_code not in (200, 201):
        return fail(f"RUNNING {response.status_code}")
    log(f" → RUNNING: {response.json().get('status')}")
    # complete
    response = transition(flow_run_id, "COMPLETED", "Completed")
    if response.status_code not in (200, 201):
        return fail(f"COMPLETED {response.status_code}")
    if response.json().get("status") != "ACCEPT":
        return fail(f"expected ACCEPT, got {response.json().get('status')}")
    log(f" [green]→ COMPLETED: ACCEPT[/green]")

    # cleanup
    client.delete(f"/deployments/{deployment_id}")
    log(" cleanup: ok")
    return True
def test_worker_pattern(client: CountingClient) -> bool:
    """Test Worker pattern: poll /work_pools/{name}/get_scheduled_flow_runs, dispatch to infra.

    This simulates what a Prefect Worker does - it polls a work pool for scheduled runs
    and dispatches them to infrastructure (k8s, docker, etc). The worker doesn't execute
    the flow itself; infrastructure does.
    """

    def fail(msg: str) -> bool:
        # print a FAIL line (unless quiet) and signal test failure to caller
        if not QUIET:
            console.print(f"[red]FAIL[/red]: {msg}")
        return False

    def log(msg: str) -> None:
        if not QUIET:
            console.print(msg)

    # setup: flow + work pool + deployment with work_pool_name
    log("[bold]setup: flow + work pool + deployment[/bold]")
    resp = client.post("/flows/", json={"name": f"worker-flow-{uuid.uuid4().hex[:8]}"})
    if resp.status_code not in (200, 201):
        return fail(f"create flow {resp.status_code}")
    flow_id = resp.json().get("id")
    pool_name = f"worker-pool-{uuid.uuid4().hex[:8]}"
    resp = client.post("/work_pools/", json={"name": pool_name, "type": "kubernetes"})
    if resp.status_code not in (200, 201):
        return fail(f"create work_pool {resp.status_code}")
    log(f" pool: {pool_name}")
    resp = client.post("/deployments/", json={
        "name": f"worker-deploy-{uuid.uuid4().hex[:8]}",
        "flow_id": flow_id,
        "work_pool_name": pool_name,  # KEY: uses work pool
    })
    if resp.status_code not in (200, 201):
        return fail(f"create deployment {resp.status_code}")
    deployment_id = resp.json().get("id")
    log(f" deployment: {deployment_id}")

    # worker registers via heartbeat
    log("[bold]1. Worker sends heartbeat[/bold]")
    resp = client.post(f"/work_pools/{pool_name}/workers/heartbeat", json={
        "name": "k8s-worker-1",
        "heartbeat_interval_seconds": 30,
    })
    # heartbeat returns 204 No Content on success
    if resp.status_code != 204:
        return fail(f"heartbeat {resp.status_code}")
    log(f" heartbeat sent")

    # create a scheduled flow run
    log("[bold]2. create scheduled flow run[/bold]")
    resp = client.post(f"/deployments/{deployment_id}/create_flow_run", json={
        "state": {"type": "SCHEDULED", "name": "Scheduled"},
    })
    if resp.status_code not in (200, 201):
        return fail(f"create flow run {resp.status_code}")
    flow_run_id = resp.json().get("id")
    log(f" flow_run: {flow_run_id}")

    # Worker polls: POST /work_pools/{name}/get_scheduled_flow_runs
    log("[bold]3. Worker polls /work_pools/{name}/get_scheduled_flow_runs[/bold]")
    resp = client.post(f"/work_pools/{pool_name}/get_scheduled_flow_runs", json={})
    if resp.status_code != 200:
        return fail(f"poll {resp.status_code}")
    scheduled = resp.json()
    found = any(r.get("flow_run", {}).get("id") == flow_run_id for r in scheduled)
    if not found:
        return fail("run not found in work pool queue")
    log(f" found run in work pool queue")

    # Worker dispatches to infrastructure (simulated - infra reports back state)
    log("[bold]4. Infrastructure executes and reports state[/bold]")
    # infra claims
    # FIX: each transition now checks the HTTP status before reading the JSON
    # body, matching test_serve_pattern — previously a failed request would
    # either crash on .json() or silently log a bogus status.
    resp = client.post(f"/flow_runs/{flow_run_id}/set_state", json={
        "state": {"type": "PENDING", "name": "Pending"},
    })
    if resp.status_code not in (200, 201):
        return fail(f"PENDING {resp.status_code}")
    log(f" → PENDING: {resp.json().get('status')}")
    # infra runs
    resp = client.post(f"/flow_runs/{flow_run_id}/set_state", json={
        "state": {"type": "RUNNING", "name": "Running"},
    })
    if resp.status_code not in (200, 201):
        return fail(f"RUNNING {resp.status_code}")
    log(f" → RUNNING: {resp.json().get('status')}")
    # infra completes
    resp = client.post(f"/flow_runs/{flow_run_id}/set_state", json={
        "state": {"type": "COMPLETED", "name": "Completed"},
    })
    if resp.status_code not in (200, 201):
        return fail(f"COMPLETED {resp.status_code}")
    if resp.json().get("status") != "ACCEPT":
        return fail(f"expected ACCEPT, got {resp.json().get('status')}")
    log(f" [green]→ COMPLETED: ACCEPT[/green]")

    # cleanup
    client.delete(f"/deployments/{deployment_id}")
    client.delete(f"/work_pools/{pool_name}")
    log(" cleanup: ok")
    return True
Worker polls /work_pools/{name}/get_scheduled_flow_runs[/bold]") resp = client.post(f"/work_pools/{pool_name}/get_scheduled_flow_runs", json={}) if resp.status_code != 200: return fail(f"poll {resp.status_code}") scheduled = resp.json() found = any(r.get("flow_run", {}).get("id") == flow_run_id for r in scheduled) if not found: return fail("run not found in work pool queue") log(f" found run in work pool queue") # Worker dispatches to infrastructure (simulated - infra reports back state) log("[bold]4. Infrastructure executes and reports state[/bold]") # infra claims resp = client.post(f"/flow_runs/{flow_run_id}/set_state", json={ "state": {"type": "PENDING", "name": "Pending"}, }) log(f" → PENDING: {resp.json().get('status')}") # infra runs resp = client.post(f"/flow_runs/{flow_run_id}/set_state", json={ "state": {"type": "RUNNING", "name": "Running"}, }) log(f" → RUNNING: {resp.json().get('status')}") # infra completes resp = client.post(f"/flow_runs/{flow_run_id}/set_state", json={ "state": {"type": "COMPLETED", "name": "Completed"}, }) if resp.json().get("status") != "ACCEPT": return fail(f"expected ACCEPT, got {resp.json().get('status')}") log(f" [green]→ COMPLETED: ACCEPT[/green]") # cleanup client.delete(f"/deployments/{deployment_id}") client.delete(f"/work_pools/{pool_name}") log(" cleanup: ok") return True def test_late_runs(client: CountingClient) -> bool: """Test that late_runs service marks overdue scheduled runs as Late.""" from datetime import datetime, timezone, timedelta import time as time_mod fail = lambda msg: (QUIET or console.print(f"[red]FAIL[/red]: {msg}"), False)[1] log = lambda msg: QUIET or console.print(msg) resp = client.post("/flows/", json={"name": f"late-flow-{uuid.uuid4().hex[:8]}"}) if resp.status_code not in (200, 201): return fail(f"create flow {resp.status_code}") flow_id = resp.json().get("id") # create flow run with past scheduled time (60s ago, threshold is 15s) past_time = (datetime.now(timezone.utc) - 
timedelta(seconds=60)).isoformat() resp = client.post("/flow_runs/", json={ "flow_id": flow_id, "name": f"late-run-{uuid.uuid4().hex[:8]}", "state": {"type": "SCHEDULED", "name": "Scheduled"}, "next_scheduled_start_time": past_time}) if resp.status_code not in (200, 201): return fail(f"create flow run {resp.status_code}") flow_run_id = resp.json().get("id") # wait for late_runs service (runs every 5s) for i in range(15): time_mod.sleep(1) resp = client.get(f"/flow_runs/{flow_run_id}") if resp.status_code == 200 and resp.json().get("state_name") == "Late": log(f" [green]state changed to Late after {i+1}s[/green]") return True return fail("late_runs service did not mark run as Late within timeout") def test_work_queue_priority(client: CountingClient) -> bool: """Test work queue priority ordering in get_scheduled_flow_runs.""" fail = lambda msg: (QUIET or console.print(f"[red]FAIL[/red]: {msg}"), False)[1] log = lambda msg: QUIET or console.print(msg) resp = client.post("/flows/", json={"name": f"priority-flow-{uuid.uuid4().hex[:8]}"}) if resp.status_code not in (200, 201): return fail(f"create flow {resp.status_code}") flow_id = resp.json().get("id") pool_name = f"priority-pool-{uuid.uuid4().hex[:8]}" resp = client.post("/work_pools/", json={"name": pool_name, "type": "process"}) if resp.status_code not in (200, 201): return fail(f"create pool {resp.status_code}") # create high and low priority queues for name, pri in [("high-priority", 1), ("low-priority", 100)]: resp = client.post(f"/work_pools/{pool_name}/queues/", json={"name": name, "priority": pri}) if resp.status_code not in (200, 201): return fail(f"create {name} queue {resp.status_code}") # create deployments for each queue and flow runs in reverse priority order deployments, flow_runs = {}, {} for q in ["high-priority", "low-priority", "default"]: resp = client.post("/deployments/", json={"name": f"d-{q}-{uuid.uuid4().hex[:8]}", "flow_id": flow_id, "work_pool_name": pool_name, "work_queue_name": q}) if 
def main():
    """Run the full API test suite and exit 0 if all sections pass, 1 otherwise.

    Output format is selected by CLI flags: --json emits a machine-readable
    summary for CI/benchmark tooling; otherwise a rich table is printed.
    """
    json_output = "--json" in sys.argv
    if not QUIET:
        console.print("\n[bold cyan]prefect-server API test[/bold cyan]\n")

    results = _run_all_tests()

    total_duration = sum(r.duration_ms for r in results)
    total_requests = sum(r.requests for r in results)
    all_passed = all(r.passed for r in results)

    if json_output:
        _emit_json(results, all_passed, total_duration, total_requests)
    else:
        _emit_table(results, total_duration, total_requests)
        if all_passed:
            console.print("\n[bold green]all tests passed[/bold green]")
        else:
            console.print("\n[bold red]some tests failed[/bold red]")

    sys.exit(0 if all_passed else 1)


def _run_all_tests() -> list[TestResult]:
    """Run every test section in order and return one TestResult per section."""
    results: list[TestResult] = []
    # core API tests (fast, no sleeps)
    results.append(run_test("admin", test_admin))
    results.append(run_test("flow_run (success)", lambda c: test_flow_run(c, should_fail=False)))
    results.append(run_test("flow_run (failure)", lambda c: test_flow_run(c, should_fail=True)))
    results.append(run_test("flow_with_task_runs", test_flow_with_task_runs))
    results.append(run_test("task_run", test_task_run))
    results.append(run_test("filters", test_filters))
    results.append(run_test("logs", test_logs))
    results.append(run_test("variables", test_variables))
    results.append(run_test("blocks", test_blocks))
    results.append(run_test("work_pools", test_work_pools))
    results.append(run_test("deployments", test_deployments))
    # execution patterns (two distinct models)
    results.append(run_test("serve_pattern", test_serve_pattern))  # Runner: polls deployments
    results.append(run_test("worker_pattern", test_worker_pattern))  # Worker: polls work pools
    results.append(run_test("work_queue_priority", test_work_queue_priority))
    results.append(run_test("retry_failed_flows", test_retry_failed_flows))
    results.append(run_test("cancellation_flow", test_cancellation_flow))
    results.append(run_test("late_runs", test_late_runs))  # background service marks overdue runs
    # scheduler integration tests (require sleeps for background service)
    results.append(run_test("scheduler_idempotency", test_scheduler_idempotency))
    results.append(run_test("parameter_merging", test_parameter_merging))
    return results


def _emit_json(results: list[TestResult], all_passed: bool, total_duration: float, total_requests: int) -> None:
    """Print the machine-readable summary consumed by the benchmark script."""
    output = {
        "passed": all_passed,
        "total_duration_ms": total_duration,
        "total_requests": total_requests,
        "sections": [
            {
                "name": r.name,
                "passed": r.passed,
                "duration_ms": r.duration_ms,
                "requests": r.requests,
                "error": r.error,
            }
            for r in results
        ],
    }
    print(json_lib.dumps(output))


def _emit_table(results: list[TestResult], total_duration: float, total_requests: int) -> None:
    """Render the human-readable per-section summary table."""
    console.print("\n" + "=" * 60)
    table = Table(title="test results")
    table.add_column("section", style="cyan")
    table.add_column("time", justify="right")
    table.add_column("reqs", justify="right")
    table.add_column("status", justify="center")
    for r in results:
        status = "[green]✓[/green]" if r.passed else "[red]✗[/red]"
        table.add_row(r.name, f"{r.duration_ms:.1f}ms", str(r.requests), status)
    table.add_row("", "", "", "", style="dim")
    table.add_row("[bold]total[/bold]", f"[bold]{total_duration:.1f}ms[/bold]", f"[bold]{total_requests}[/bold]", "")
    console.print(table)


if __name__ == "__main__":
    main()