A container registry that uses the AT Protocol for manifest storage and S3 for blob storage.
README.md

OPA Gatekeeper External Data Provider for ATProto Signatures#

This is a reference implementation of an OPA Gatekeeper External Data Provider that verifies ATProto signatures on ATCR container images.

Overview#

Gatekeeper's External Data Provider feature allows Rego policies to call external HTTP services for data validation. This provider implements signature verification as an HTTP service that Gatekeeper can query.

Architecture#

Kubernetes Pod Creation
       ↓
OPA Gatekeeper (admission webhook)
       ↓
Rego Policy (constraint template)
       ↓
External Data Provider API call
       ↓
ATProto Verification Service ← This service
       ↓
   1. Resolve image digest
   2. Discover signature artifacts
   3. Parse ATProto signature metadata
   4. Resolve DID to public key
   5. Fetch commit from PDS
   6. Verify K-256 signature
   7. Check trust policy
       ↓
   Return: verified=true/false + metadata

Files#

  • main.go - HTTP server and provider endpoints
  • verifier.go - ATProto signature verification logic
  • resolver.go - DID and PDS resolution
  • crypto.go - K-256 signature verification
  • trust-policy.yaml - Trust policy configuration
  • Dockerfile - Build provider service image
  • deployment.yaml - Kubernetes deployment manifest
  • provider-crd.yaml - Gatekeeper Provider custom resource
  • constraint-template.yaml - Rego constraint template
  • constraint.yaml - Policy constraint example

Prerequisites#

  • Go 1.21+
  • Kubernetes cluster with OPA Gatekeeper installed
  • Access to ATCR registry

Building#

# Build binary
CGO_ENABLED=0 go build -o atcr-provider \
  -ldflags="-w -s" \
  ./main.go

# Build Docker image
docker build -t atcr.io/atcr/gatekeeper-provider:latest .

# Push to registry
docker push atcr.io/atcr/gatekeeper-provider:latest

Deployment#

1. Create Trust Policy ConfigMap#

kubectl create namespace gatekeeper-system
kubectl create configmap atcr-trust-policy \
  --from-file=trust-policy.yaml \
  -n gatekeeper-system

2. Deploy Provider Service#

kubectl apply -f deployment.yaml

3. Configure Gatekeeper Provider#

kubectl apply -f provider-crd.yaml

4. Create Constraint Template#

kubectl apply -f constraint-template.yaml

5. Create Constraint#

kubectl apply -f constraint.yaml

6. Test#

# Try to create pod with signed image (should succeed)
kubectl run test-signed --image=atcr.io/alice/myapp:latest

# Try to create pod with unsigned image (should fail)
kubectl run test-unsigned --image=atcr.io/malicious/fake:latest

# Check constraint status
kubectl get constraint atcr-signatures-required -o yaml

API Specification#

Provider Endpoint#

POST /provide

Request:

{
  "keys": ["image"],
  "values": [
    "atcr.io/alice/myapp:latest",
    "atcr.io/bob/webapp:v1.0"
  ]
}

Response:

{
  "responses": [
    {
      "image": "atcr.io/alice/myapp:latest",
      "verified": true,
      "did": "did:plc:alice123",
      "handle": "alice.bsky.social",
      "signedAt": "2025-10-31T12:34:56Z",
      "commitCid": "bafyreih8..."
    },
    {
      "image": "atcr.io/bob/webapp:v1.0",
      "verified": false,
      "error": "no signature found"
    }
  ]
}

Health Check#

GET /health

Response:

{
  "status": "ok",
  "version": "1.0.0"
}

Configuration#

Trust Policy Format#

# trust-policy.yaml
version: 1.0

trustedDIDs:
  did:plc:alice123:
    name: "Alice (DevOps)"
    validFrom: "2024-01-01T00:00:00Z"
    expiresAt: null

  did:plc:bob456:
    name: "Bob (Security)"
    validFrom: "2024-06-01T00:00:00Z"
    expiresAt: "2025-12-31T23:59:59Z"

policies:
  - name: production
    scope: "atcr.io/*/prod-*"
    require:
      signature: true
      trustedDIDs:
        - did:plc:alice123
        - did:plc:bob456
    action: enforce

Provider Configuration#

Environment variables:

  • TRUST_POLICY_PATH - Path to trust policy file (default: /config/trust-policy.yaml)
  • HTTP_PORT - HTTP server port (default: 8080)
  • LOG_LEVEL - Log level: debug, info, warn, error (default: info)
  • CACHE_ENABLED - Enable caching (default: true)
  • CACHE_TTL - Cache TTL in seconds (default: 300)
  • DID_RESOLVER_TIMEOUT - DID resolution timeout (default: 10s)
  • PDS_TIMEOUT - PDS XRPC timeout (default: 10s)

Rego Policy Examples#

Simple Verification#

package atcrsignatures

import future.keywords.contains
import future.keywords.if
import future.keywords.in

provider := "atcr-verifier"

violation[{"msg": msg}] {
    container := input.review.object.spec.containers[_]
    startswith(container.image, "atcr.io/")

    # Call external provider
    response := external_data({
        "provider": provider,
        "keys": ["image"],
        "values": [container.image]
    })

    # Check verification result
    not response[_].verified == true

    msg := sprintf("Image %v has no valid ATProto signature", [container.image])
}

Advanced Verification with DID Trust#

package atcrsignatures

import future.keywords.contains
import future.keywords.if
import future.keywords.in

provider := "atcr-verifier"

trusted_dids := [
    "did:plc:alice123",
    "did:plc:bob456"
]

violation[{"msg": msg}] {
    container := input.review.object.spec.containers[_]
    startswith(container.image, "atcr.io/")

    # Call external provider
    response := external_data({
        "provider": provider,
        "keys": ["image"],
        "values": [container.image]
    })

    # Get response for this image
    result := response[_]
    result.image == container.image

    # Check if verified
    not result.verified == true
    msg := sprintf("Image %v failed signature verification: %v", [container.image, result.error])
}

violation[{"msg": msg}] {
    container := input.review.object.spec.containers[_]
    startswith(container.image, "atcr.io/")

    # Call external provider
    response := external_data({
        "provider": provider,
        "keys": ["image"],
        "values": [container.image]
    })

    # Get response for this image
    result := response[_]
    result.image == container.image
    result.verified == true

    # Check DID is trusted
    not result.did in trusted_dids
    msg := sprintf("Image %v signed by untrusted DID: %v", [container.image, result.did])
}

Namespace-Specific Policies#

package atcrsignatures

import future.keywords.contains
import future.keywords.if
import future.keywords.in

provider := "atcr-verifier"

# Production namespaces require signatures
production_namespaces := ["production", "prod", "staging"]

violation[{"msg": msg}] {
    # Only apply to production namespaces
    input.review.object.metadata.namespace in production_namespaces

    container := input.review.object.spec.containers[_]
    startswith(container.image, "atcr.io/")

    # Call external provider
    response := external_data({
        "provider": provider,
        "keys": ["image"],
        "values": [container.image]
    })

    # Check verification result
    not response[_].verified == true

    msg := sprintf("Production namespace requires signed images. Image %v is not signed", [container.image])
}

Performance Considerations#

Caching#

The provider caches:

  • Signature verification results (TTL: 5 minutes)
  • DID documents (TTL: 5 minutes)
  • PDS endpoints (TTL: 5 minutes)
  • Public keys (TTL: 5 minutes)

Enable/disable via CACHE_ENABLED environment variable.

Timeouts#

  • DID_RESOLVER_TIMEOUT - DID resolution timeout (default: 10s)
  • PDS_TIMEOUT - PDS XRPC calls timeout (default: 10s)
  • HTTP client timeout: 30s total

Horizontal Scaling#

The provider is stateless and can be scaled horizontally:

apiVersion: apps/v1
kind: Deployment
spec:
  replicas: 3  # Scale up for high traffic

Rate Limiting#

Consider implementing rate limiting for:

  • Gatekeeper → Provider requests
  • Provider → DID resolver
  • Provider → PDS

Monitoring#

Metrics#

The provider exposes Prometheus metrics at /metrics:

# Request metrics
atcr_provider_requests_total{status="success|failure"}
atcr_provider_request_duration_seconds

# Verification metrics
atcr_provider_verifications_total{result="verified|failed|error"}
atcr_provider_verification_duration_seconds

# Cache metrics
atcr_provider_cache_hits_total
atcr_provider_cache_misses_total

Logging#

Structured JSON logging with fields:

  • image - Image being verified
  • did - Signer DID (if found)
  • duration - Verification duration
  • error - Error message (if failed)

Health Checks#

# Liveness probe
curl http://localhost:8080/health

# Readiness probe
curl http://localhost:8080/ready

Troubleshooting#

Provider Not Reachable#

# Check provider pod status
kubectl get pods -n gatekeeper-system -l app=atcr-provider

# Check service
kubectl get svc -n gatekeeper-system atcr-provider

# Test connectivity from Gatekeeper pod
kubectl exec -n gatekeeper-system deployment/gatekeeper-controller-manager -- \
  curl http://atcr-provider.gatekeeper-system/health

Verification Failing#

# Check provider logs
kubectl logs -n gatekeeper-system deployment/atcr-provider

# Test verification manually
kubectl run test-curl --rm -it --image=curlimages/curl -- \
  curl -X POST http://atcr-provider.gatekeeper-system/provide \
  -H "Content-Type: application/json" \
  -d '{"keys":["image"],"values":["atcr.io/alice/myapp:latest"]}'

Policy Not Enforcing#

# Check Gatekeeper logs
kubectl logs -n gatekeeper-system deployment/gatekeeper-controller-manager

# Check constraint status
kubectl get constraint atcr-signatures-required -o yaml

# Test policy manually with conftest
conftest test -p constraint-template.yaml pod.yaml

Security Considerations#

Network Policies#

Restrict network access:

apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: atcr-provider
  namespace: gatekeeper-system
spec:
  podSelector:
    matchLabels:
      app: atcr-provider
  ingress:
  - from:
    - podSelector:
        matchLabels:
          control-plane: controller-manager  # Gatekeeper
    ports:
    - port: 8080
  egress:
  - to:  # PLC directory
    - namespaceSelector: {}
    ports:
    - port: 443

Authentication#

The provider should only be accessible from Gatekeeper. Options:

  • Network policies (recommended for Kubernetes)
  • Mutual TLS
  • API tokens

Trust Policy Management#

  • Store trust policy in version control
  • Use GitOps (Flux, ArgoCD) for updates
  • Review DID changes carefully
  • Audit policy modifications

See Also#

Support#

For issues or questions: