Kubernetes Operator for Tangled Spindles
at main 466 lines 16 kB view raw
1/* 2Copyright 2025 Evan Jarrett. 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package main 18 19import ( 20 "context" 21 "crypto/tls" 22 _ "embed" 23 "flag" 24 "fmt" 25 "os" 26 "path/filepath" 27 28 // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.) 29 // to ensure that exec-entrypoint and run can make use of them. 30 _ "k8s.io/client-go/plugin/pkg/client/auth" 31 32 "gopkg.in/yaml.v3" 33 corev1 "k8s.io/api/core/v1" 34 "k8s.io/apimachinery/pkg/api/resource" 35 "k8s.io/apimachinery/pkg/runtime" 36 utilruntime "k8s.io/apimachinery/pkg/util/runtime" 37 clientgoscheme "k8s.io/client-go/kubernetes/scheme" 38 ctrl "sigs.k8s.io/controller-runtime" 39 "sigs.k8s.io/controller-runtime/pkg/certwatcher" 40 "sigs.k8s.io/controller-runtime/pkg/healthz" 41 "sigs.k8s.io/controller-runtime/pkg/log/zap" 42 "sigs.k8s.io/controller-runtime/pkg/metrics/filters" 43 metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" 44 "sigs.k8s.io/controller-runtime/pkg/webhook" 45 46 "tangled.org/core/spindle" 47 "tangled.org/core/spindle/config" 48 "tangled.org/core/spindle/models" 49 50 loomv1alpha1 "tangled.org/evan.jarrett.net/loom/api/v1alpha1" 51 "tangled.org/evan.jarrett.net/loom/internal/controller" 52 "tangled.org/evan.jarrett.net/loom/internal/engine" 53 // +kubebuilder:scaffold:imports 54) 55 56// LoomConfig holds configuration from the ConfigMap 57type LoomConfig struct { 58 MaxConcurrentJobs int `yaml:"maxConcurrentJobs"` 59 Template LoomTemplateConfig `yaml:"template"` 60} 61 62// LoomTemplateConfig holds job template configuration 63type LoomTemplateConfig struct { 64 ResourceProfiles []ResourceProfileConfig `yaml:"resourceProfiles"` 65} 66 67// ResourceProfileConfig holds a resource profile from ConfigMap 68type ResourceProfileConfig struct { 69 NodeSelector map[string]string `yaml:"nodeSelector"` 70 Resources ResourceConfig `yaml:"resources"` 71} 72 73// ResourceConfig holds resource requirements as strings for parsing 74type ResourceConfig struct { 75 Requests ResourceValues `yaml:"requests"` 76 Limits ResourceValues `yaml:"limits"` 77} 78 79// ResourceValues holds CPU and memory as strings 80type ResourceValues struct { 81 CPU string `yaml:"cpu"` 82 Memory string `yaml:"memory"` 83} 84 85//go:embed motd.txt 86var motd []byte 87 88var ( 89 scheme = runtime.NewScheme() 90 setupLog = ctrl.Log.WithName("setup") 91) 92 93func init() { 94 utilruntime.Must(clientgoscheme.AddToScheme(scheme)) 95 96 utilruntime.Must(loomv1alpha1.AddToScheme(scheme)) 97 // +kubebuilder:scaffold:scheme 98} 99 100// loadLoomConfig loads configuration from the ConfigMap file 101func loadLoomConfig(configPath string) (*LoomConfig, error) { 102 // Read config file 103 data, err := os.ReadFile(configPath) 104 if err != nil { 105 return nil, fmt.Errorf("failed to read config file: %w", err) 106 } 107 108 // Parse YAML 109 var cfg LoomConfig 110 if err := yaml.Unmarshal(data, &cfg); err != nil { 111 return nil, fmt.Errorf("failed to parse config: %w", err) 112 } 113 114 return &cfg, nil 115} 116 117// convertToResourceRequirements converts string-based resource config to Kubernetes ResourceRequirements 118func convertToResourceRequirements(cfg ResourceConfig) (corev1.ResourceRequirements, error) { 119 reqs := corev1.ResourceRequirements{ 120 Requests: corev1.ResourceList{}, 121 Limits: corev1.ResourceList{}, 122 } 123 124 // Parse requests 125 if cfg.Requests.CPU != "" { 126 cpuQty, err := resource.ParseQuantity(cfg.Requests.CPU) 127 if err != nil { 128 return reqs, fmt.Errorf("invalid CPU request %q: %w", cfg.Requests.CPU, err) 129 } 130 reqs.Requests[corev1.ResourceCPU] = cpuQty 131 } 132 if cfg.Requests.Memory != "" { 133 memQty, err := resource.ParseQuantity(cfg.Requests.Memory) 134 if err != nil { 135 return reqs, fmt.Errorf("invalid memory request %q: %w", cfg.Requests.Memory, err) 136 } 137 reqs.Requests[corev1.ResourceMemory] = memQty 138 } 139 140 // Parse limits 141 if cfg.Limits.CPU != "" { 142 cpuQty, err := resource.ParseQuantity(cfg.Limits.CPU) 143 if err != nil { 144 return reqs, fmt.Errorf("invalid CPU limit %q: %w", cfg.Limits.CPU, err) 145 } 146 reqs.Limits[corev1.ResourceCPU] = cpuQty 147 } 148 if cfg.Limits.Memory != "" { 149 memQty, err := resource.ParseQuantity(cfg.Limits.Memory) 150 if err != nil { 151 return reqs, fmt.Errorf("invalid memory limit %q: %w", cfg.Limits.Memory, err) 152 } 153 reqs.Limits[corev1.ResourceMemory] = memQty 154 } 155 156 return reqs, nil 157} 158 159// convertToResourceProfiles converts ConfigMap profiles to API ResourceProfiles 160func convertToResourceProfiles(profiles []ResourceProfileConfig) ([]loomv1alpha1.ResourceProfile, error) { 161 result := make([]loomv1alpha1.ResourceProfile, 0, len(profiles)) 162 163 for i, p := range profiles { 164 resources, err := convertToResourceRequirements(p.Resources) 165 if err != nil { 166 return nil, fmt.Errorf("failed to convert profile %d resources: %w", i, err) 167 } 168 169 result = append(result, loomv1alpha1.ResourceProfile{ 170 NodeSelector: p.NodeSelector, 171 Resources: resources, 172 }) 173 } 174 175 return result, nil 176} 177 178// initializeSpindle creates a spindle server with KubernetesEngine 179func initializeSpindle(ctx context.Context, cfg *config.Config, mgr ctrl.Manager, loomCfg *LoomConfig) (*spindle.Spindle, error) { 180 // Initialize Kubernetes engine 181 // Get namespace from environment (injected via Downward API) 182 namespace := os.Getenv("POD_NAMESPACE") 183 if namespace == "" { 184 namespace = "default" 185 } 186 187 // Convert resource profiles to Kubernetes types 188 profiles, err := convertToResourceProfiles(loomCfg.Template.ResourceProfiles) 189 if err != nil { 190 return nil, fmt.Errorf("failed to convert resource profiles: %w", err) 191 } 192 193 // Create template from loom config 194 template := loomv1alpha1.SpindleTemplate{ 195 ResourceProfiles: profiles, 196 } 197 198 // Create spindle server first (without engine) to get access to vault 199 s, err := spindle.New(ctx, cfg, map[string]models.Engine{}) 200 if err != nil { 201 return nil, fmt.Errorf("failed to create spindle: %w", err) 202 } 203 204 // Now create kubernetes engine with access to vault 205 kubeEngine := engine.NewKubernetesEngine(mgr.GetClient(), mgr.GetConfig(), namespace, template, s.Vault()) 206 207 // Register the engine with spindle by adding to the engines map 208 s.Engines()["kubernetes"] = kubeEngine 209 210 return s, nil 211} 212 213// nolint:gocyclo 214func main() { 215 var metricsAddr string 216 var metricsCertPath, metricsCertName, metricsCertKey string 217 var webhookCertPath, webhookCertName, webhookCertKey string 218 var enableLeaderElection bool 219 var probeAddr string 220 var secureMetrics bool 221 var enableHTTP2 bool 222 var tlsOpts []func(*tls.Config) 223 flag.StringVar(&metricsAddr, "metrics-bind-address", "0", "The address the metrics endpoint binds to. "+ 224 "Use :8443 for HTTPS or :8080 for HTTP, or leave as 0 to disable the metrics service.") 225 flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.") 226 flag.BoolVar(&enableLeaderElection, "leader-elect", false, 227 "Enable leader election for controller manager. "+ 228 "Enabling this will ensure there is only one active controller manager.") 229 flag.BoolVar(&secureMetrics, "metrics-secure", true, 230 "If set, the metrics endpoint is served securely via HTTPS. Use --metrics-secure=false to use HTTP instead.") 231 flag.StringVar(&webhookCertPath, "webhook-cert-path", "", "The directory that contains the webhook certificate.") 232 flag.StringVar(&webhookCertName, "webhook-cert-name", "tls.crt", "The name of the webhook certificate file.") 233 flag.StringVar(&webhookCertKey, "webhook-cert-key", "tls.key", "The name of the webhook key file.") 234 flag.StringVar(&metricsCertPath, "metrics-cert-path", "", 235 "The directory that contains the metrics server certificate.") 236 flag.StringVar(&metricsCertName, "metrics-cert-name", "tls.crt", "The name of the metrics server certificate file.") 237 flag.StringVar(&metricsCertKey, "metrics-cert-key", "tls.key", "The name of the metrics server key file.") 238 flag.BoolVar(&enableHTTP2, "enable-http2", false, 239 "If set, HTTP/2 will be enabled for the metrics and webhook servers") 240 opts := zap.Options{ 241 Development: true, 242 } 243 opts.BindFlags(flag.CommandLine) 244 flag.Parse() 245 246 ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts))) 247 248 // if the enable-http2 flag is false (the default), http/2 should be disabled 249 // due to its vulnerabilities. More specifically, disabling http/2 will 250 // prevent from being vulnerable to the HTTP/2 Stream Cancellation and 251 // Rapid Reset CVEs. For more information see: 252 // - https://github.com/advisories/GHSA-qppj-fm5r-hxr3 253 // - https://github.com/advisories/GHSA-4374-p667-p6c8 254 disableHTTP2 := func(c *tls.Config) { 255 setupLog.Info("disabling http/2") 256 c.NextProtos = []string{"http/1.1"} 257 } 258 259 if !enableHTTP2 { 260 tlsOpts = append(tlsOpts, disableHTTP2) 261 } 262 263 // Create watchers for metrics and webhooks certificates 264 var metricsCertWatcher, webhookCertWatcher *certwatcher.CertWatcher 265 266 // Initial webhook TLS options 267 webhookTLSOpts := tlsOpts 268 269 if len(webhookCertPath) > 0 { 270 setupLog.Info("Initializing webhook certificate watcher using provided certificates", 271 "webhook-cert-path", webhookCertPath, "webhook-cert-name", webhookCertName, "webhook-cert-key", webhookCertKey) 272 273 var err error 274 webhookCertWatcher, err = certwatcher.New( 275 filepath.Join(webhookCertPath, webhookCertName), 276 filepath.Join(webhookCertPath, webhookCertKey), 277 ) 278 if err != nil { 279 setupLog.Error(err, "Failed to initialize webhook certificate watcher") 280 os.Exit(1) 281 } 282 283 webhookTLSOpts = append(webhookTLSOpts, func(config *tls.Config) { 284 config.GetCertificate = webhookCertWatcher.GetCertificate 285 }) 286 } 287 288 webhookServer := webhook.NewServer(webhook.Options{ 289 TLSOpts: webhookTLSOpts, 290 }) 291 292 // Metrics endpoint is enabled in 'config/default/kustomization.yaml'. The Metrics options configure the server. 293 // More info: 294 // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.21.0/pkg/metrics/server 295 // - https://book.kubebuilder.io/reference/metrics.html 296 metricsServerOptions := metricsserver.Options{ 297 BindAddress: metricsAddr, 298 SecureServing: secureMetrics, 299 TLSOpts: tlsOpts, 300 } 301 302 if secureMetrics { 303 // FilterProvider is used to protect the metrics endpoint with authn/authz. 304 // These configurations ensure that only authorized users and service accounts 305 // can access the metrics endpoint. The RBAC are configured in 'config/rbac/kustomization.yaml'. More info: 306 // https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.21.0/pkg/metrics/filters#WithAuthenticationAndAuthorization 307 metricsServerOptions.FilterProvider = filters.WithAuthenticationAndAuthorization 308 } 309 310 // If the certificate is not specified, controller-runtime will automatically 311 // generate self-signed certificates for the metrics server. While convenient for development and testing, 312 // this setup is not recommended for production. 313 // 314 // TODO(user): If you enable certManager, uncomment the following lines: 315 // - [METRICS-WITH-CERTS] at config/default/kustomization.yaml to generate and use certificates 316 // managed by cert-manager for the metrics server. 317 // - [PROMETHEUS-WITH-CERTS] at config/prometheus/kustomization.yaml for TLS certification. 318 if len(metricsCertPath) > 0 { 319 setupLog.Info("Initializing metrics certificate watcher using provided certificates", 320 "metrics-cert-path", metricsCertPath, "metrics-cert-name", metricsCertName, "metrics-cert-key", metricsCertKey) 321 322 var err error 323 metricsCertWatcher, err = certwatcher.New( 324 filepath.Join(metricsCertPath, metricsCertName), 325 filepath.Join(metricsCertPath, metricsCertKey), 326 ) 327 if err != nil { 328 setupLog.Error(err, "to initialize metrics certificate watcher", "error", err) 329 os.Exit(1) 330 } 331 332 metricsServerOptions.TLSOpts = append(metricsServerOptions.TLSOpts, func(config *tls.Config) { 333 config.GetCertificate = metricsCertWatcher.GetCertificate 334 }) 335 } 336 337 // Create context for spindle initialization 338 ctx := context.Background() 339 340 // Load loom configuration from ConfigMap 341 loomCfg, err := loadLoomConfig("/etc/loom/config.yaml") 342 if err != nil { 343 setupLog.Error(err, "failed to load loom config") 344 os.Exit(1) 345 } 346 setupLog.Info("Loom configuration loaded", 347 "maxConcurrentJobs", loomCfg.MaxConcurrentJobs, 348 "resourceProfiles", len(loomCfg.Template.ResourceProfiles)) 349 350 // Load spindle configuration from environment 351 spindleCfg, err := config.Load(ctx) 352 if err != nil { 353 setupLog.Error(err, "failed to load spindle config") 354 os.Exit(1) 355 } 356 357 // Override maxJobCount from loom config 358 if loomCfg.MaxConcurrentJobs > 0 { 359 spindleCfg.Server.MaxJobCount = loomCfg.MaxConcurrentJobs 360 } 361 362 // Write embedded MOTD to temp file and configure spindle to use it 363 motdFile, err := os.CreateTemp("", "loom-motd-*.txt") 364 if err != nil { 365 setupLog.Error(err, "failed to create MOTD temp file") 366 os.Exit(1) 367 } 368 if _, err := motdFile.Write(motd); err != nil { 369 setupLog.Error(err, "failed to write MOTD temp file") 370 os.Exit(1) 371 } 372 motdFile.Close() 373 spindleCfg.Server.MOTDFile = motdFile.Name() 374 375 mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ 376 Scheme: scheme, 377 Metrics: metricsServerOptions, 378 WebhookServer: webhookServer, 379 HealthProbeBindAddress: probeAddr, 380 LeaderElection: enableLeaderElection, 381 LeaderElectionID: "d9c2f6b8.j5t.io", 382 // LeaderElectionReleaseOnCancel defines if the leader should step down voluntarily 383 // when the Manager ends. This requires the binary to immediately end when the 384 // Manager is stopped, otherwise, this setting is unsafe. Setting this significantly 385 // speeds up voluntary leader transitions as the new leader don't have to wait 386 // LeaseDuration time first. 387 // 388 // In the default scaffold provided, the program ends immediately after 389 // the manager stops, so would be fine to enable this option. However, 390 // if you are doing or is intended to do any operation such as perform cleanups 391 // after the manager stops then its usage might be unsafe. 392 // LeaderElectionReleaseOnCancel: true, 393 }) 394 if err != nil { 395 setupLog.Error(err, "unable to start manager") 396 os.Exit(1) 397 } 398 399 // Initialize spindle server with KubernetesEngine 400 s, err := initializeSpindle(ctx, spindleCfg, mgr, loomCfg) 401 if err != nil { 402 setupLog.Error(err, "failed to initialize spindle") 403 os.Exit(1) 404 } 405 defer s.Queue().Stop() 406 407 setupLog.Info("spindle server initialized successfully") 408 409 // Start spindle HTTP server in background 410 go func() { 411 setupLog.Info("starting spindle HTTP server", "address", spindleCfg.Server.ListenAddr) 412 if err := s.Start(ctx); err != nil { 413 setupLog.Error(err, "spindle HTTP server error") 414 } 415 }() 416 417 // Get loom image from environment (used for runner init container) 418 loomImage := os.Getenv("LOOM_IMAGE") 419 if loomImage == "" { 420 loomImage = "atcr.io/evan.jarrett.net/loom:latest" // default fallback 421 } 422 423 // Setup controller with spindle components 424 if err := (&controller.SpindleSetReconciler{ 425 Client: mgr.GetClient(), 426 Scheme: mgr.GetScheme(), 427 Config: mgr.GetConfig(), 428 Spindle: s, 429 LoomImage: loomImage, 430 }).SetupWithManager(mgr); err != nil { 431 setupLog.Error(err, "unable to create controller", "controller", "SpindleSet") 432 os.Exit(1) 433 } 434 // +kubebuilder:scaffold:builder 435 436 if metricsCertWatcher != nil { 437 setupLog.Info("Adding metrics certificate watcher to manager") 438 if err := mgr.Add(metricsCertWatcher); err != nil { 439 setupLog.Error(err, "unable to add metrics certificate watcher to manager") 440 os.Exit(1) 441 } 442 } 443 444 if webhookCertWatcher != nil { 445 setupLog.Info("Adding webhook certificate watcher to manager") 446 if err := mgr.Add(webhookCertWatcher); err != nil { 447 setupLog.Error(err, "unable to add webhook certificate watcher to manager") 448 os.Exit(1) 449 } 450 } 451 452 if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil { 453 setupLog.Error(err, "unable to set up health check") 454 os.Exit(1) 455 } 456 if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil { 457 setupLog.Error(err, "unable to set up ready check") 458 os.Exit(1) 459 } 460 461 setupLog.Info("starting manager") 462 if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil { 463 setupLog.Error(err, "problem running manager") 464 os.Exit(1) 465 } 466}