Kubernetes Operator for Tangled Spindles
1/*
2Copyright 2025 Evan Jarrett.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package main
18
19import (
20 "context"
21 "crypto/tls"
22 _ "embed"
23 "flag"
24 "fmt"
25 "os"
26 "path/filepath"
27
28 // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
29 // to ensure that exec-entrypoint and run can make use of them.
30 _ "k8s.io/client-go/plugin/pkg/client/auth"
31
32 "gopkg.in/yaml.v3"
33 corev1 "k8s.io/api/core/v1"
34 "k8s.io/apimachinery/pkg/api/resource"
35 "k8s.io/apimachinery/pkg/runtime"
36 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
37 clientgoscheme "k8s.io/client-go/kubernetes/scheme"
38 ctrl "sigs.k8s.io/controller-runtime"
39 "sigs.k8s.io/controller-runtime/pkg/certwatcher"
40 "sigs.k8s.io/controller-runtime/pkg/healthz"
41 "sigs.k8s.io/controller-runtime/pkg/log/zap"
42 "sigs.k8s.io/controller-runtime/pkg/metrics/filters"
43 metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
44 "sigs.k8s.io/controller-runtime/pkg/webhook"
45
46 "tangled.org/core/spindle"
47 "tangled.org/core/spindle/config"
48 "tangled.org/core/spindle/models"
49
50 loomv1alpha1 "tangled.org/evan.jarrett.net/loom/api/v1alpha1"
51 "tangled.org/evan.jarrett.net/loom/internal/controller"
52 "tangled.org/evan.jarrett.net/loom/internal/engine"
53 // +kubebuilder:scaffold:imports
54)
55
// Configuration types decoded from the loom ConfigMap file (YAML). Every
// field carries an explicit yaml tag, so the on-disk key names are
// independent of the Go field names.
type (
	// LoomConfig is the root of the loom configuration file.
	LoomConfig struct {
		// MaxConcurrentJobs, when greater than zero, overrides the spindle
		// server's maximum job count.
		MaxConcurrentJobs int `yaml:"maxConcurrentJobs"`
		// Template configures the jobs the operator creates.
		Template LoomTemplateConfig `yaml:"template"`
	}

	// LoomTemplateConfig groups the job template settings.
	LoomTemplateConfig struct {
		// ResourceProfiles lists the configured resource profiles, in file order.
		ResourceProfiles []ResourceProfileConfig `yaml:"resourceProfiles"`
	}

	// ResourceProfileConfig is one resource profile as written in the ConfigMap.
	ResourceProfileConfig struct {
		// NodeSelector is copied unchanged into the API ResourceProfile.
		NodeSelector map[string]string `yaml:"nodeSelector"`
		// Resources holds the unparsed requests and limits for the profile.
		Resources ResourceConfig `yaml:"resources"`
	}

	// ResourceConfig holds requests and limits as raw strings; they are parsed
	// into Kubernetes quantities later, when converted to ResourceRequirements.
	ResourceConfig struct {
		Requests ResourceValues `yaml:"requests"`
		Limits   ResourceValues `yaml:"limits"`
	}

	// ResourceValues holds CPU and memory amounts as quantity strings
	// (for example "500m" or "1Gi"); an empty string means "not set".
	ResourceValues struct {
		CPU    string `yaml:"cpu"`
		Memory string `yaml:"memory"`
	}
)
84
// motd holds the contents of motd.txt, embedded into the binary at build
// time. main writes these bytes to a temporary file and sets the spindle
// config's Server.MOTDFile to that file's path.
//
//go:embed motd.txt
var motd []byte
87
88var (
89 scheme = runtime.NewScheme()
90 setupLog = ctrl.Log.WithName("setup")
91)
92
93func init() {
94 utilruntime.Must(clientgoscheme.AddToScheme(scheme))
95
96 utilruntime.Must(loomv1alpha1.AddToScheme(scheme))
97 // +kubebuilder:scaffold:scheme
98}
99
100// loadLoomConfig loads configuration from the ConfigMap file
101func loadLoomConfig(configPath string) (*LoomConfig, error) {
102 // Read config file
103 data, err := os.ReadFile(configPath)
104 if err != nil {
105 return nil, fmt.Errorf("failed to read config file: %w", err)
106 }
107
108 // Parse YAML
109 var cfg LoomConfig
110 if err := yaml.Unmarshal(data, &cfg); err != nil {
111 return nil, fmt.Errorf("failed to parse config: %w", err)
112 }
113
114 return &cfg, nil
115}
116
117// convertToResourceRequirements converts string-based resource config to Kubernetes ResourceRequirements
118func convertToResourceRequirements(cfg ResourceConfig) (corev1.ResourceRequirements, error) {
119 reqs := corev1.ResourceRequirements{
120 Requests: corev1.ResourceList{},
121 Limits: corev1.ResourceList{},
122 }
123
124 // Parse requests
125 if cfg.Requests.CPU != "" {
126 cpuQty, err := resource.ParseQuantity(cfg.Requests.CPU)
127 if err != nil {
128 return reqs, fmt.Errorf("invalid CPU request %q: %w", cfg.Requests.CPU, err)
129 }
130 reqs.Requests[corev1.ResourceCPU] = cpuQty
131 }
132 if cfg.Requests.Memory != "" {
133 memQty, err := resource.ParseQuantity(cfg.Requests.Memory)
134 if err != nil {
135 return reqs, fmt.Errorf("invalid memory request %q: %w", cfg.Requests.Memory, err)
136 }
137 reqs.Requests[corev1.ResourceMemory] = memQty
138 }
139
140 // Parse limits
141 if cfg.Limits.CPU != "" {
142 cpuQty, err := resource.ParseQuantity(cfg.Limits.CPU)
143 if err != nil {
144 return reqs, fmt.Errorf("invalid CPU limit %q: %w", cfg.Limits.CPU, err)
145 }
146 reqs.Limits[corev1.ResourceCPU] = cpuQty
147 }
148 if cfg.Limits.Memory != "" {
149 memQty, err := resource.ParseQuantity(cfg.Limits.Memory)
150 if err != nil {
151 return reqs, fmt.Errorf("invalid memory limit %q: %w", cfg.Limits.Memory, err)
152 }
153 reqs.Limits[corev1.ResourceMemory] = memQty
154 }
155
156 return reqs, nil
157}
158
159// convertToResourceProfiles converts ConfigMap profiles to API ResourceProfiles
160func convertToResourceProfiles(profiles []ResourceProfileConfig) ([]loomv1alpha1.ResourceProfile, error) {
161 result := make([]loomv1alpha1.ResourceProfile, 0, len(profiles))
162
163 for i, p := range profiles {
164 resources, err := convertToResourceRequirements(p.Resources)
165 if err != nil {
166 return nil, fmt.Errorf("failed to convert profile %d resources: %w", i, err)
167 }
168
169 result = append(result, loomv1alpha1.ResourceProfile{
170 NodeSelector: p.NodeSelector,
171 Resources: resources,
172 })
173 }
174
175 return result, nil
176}
177
178// initializeSpindle creates a spindle server with KubernetesEngine
179func initializeSpindle(ctx context.Context, cfg *config.Config, mgr ctrl.Manager, loomCfg *LoomConfig) (*spindle.Spindle, error) {
180 // Initialize Kubernetes engine
181 // Get namespace from environment (injected via Downward API)
182 namespace := os.Getenv("POD_NAMESPACE")
183 if namespace == "" {
184 namespace = "default"
185 }
186
187 // Convert resource profiles to Kubernetes types
188 profiles, err := convertToResourceProfiles(loomCfg.Template.ResourceProfiles)
189 if err != nil {
190 return nil, fmt.Errorf("failed to convert resource profiles: %w", err)
191 }
192
193 // Create template from loom config
194 template := loomv1alpha1.SpindleTemplate{
195 ResourceProfiles: profiles,
196 }
197
198 // Create spindle server first (without engine) to get access to vault
199 s, err := spindle.New(ctx, cfg, map[string]models.Engine{})
200 if err != nil {
201 return nil, fmt.Errorf("failed to create spindle: %w", err)
202 }
203
204 // Now create kubernetes engine with access to vault
205 kubeEngine := engine.NewKubernetesEngine(mgr.GetClient(), mgr.GetConfig(), namespace, template, s.Vault())
206
207 // Register the engine with spindle by adding to the engines map
208 s.Engines()["kubernetes"] = kubeEngine
209
210 return s, nil
211}
212
213// nolint:gocyclo
214func main() {
215 var metricsAddr string
216 var metricsCertPath, metricsCertName, metricsCertKey string
217 var webhookCertPath, webhookCertName, webhookCertKey string
218 var enableLeaderElection bool
219 var probeAddr string
220 var secureMetrics bool
221 var enableHTTP2 bool
222 var tlsOpts []func(*tls.Config)
223 flag.StringVar(&metricsAddr, "metrics-bind-address", "0", "The address the metrics endpoint binds to. "+
224 "Use :8443 for HTTPS or :8080 for HTTP, or leave as 0 to disable the metrics service.")
225 flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
226 flag.BoolVar(&enableLeaderElection, "leader-elect", false,
227 "Enable leader election for controller manager. "+
228 "Enabling this will ensure there is only one active controller manager.")
229 flag.BoolVar(&secureMetrics, "metrics-secure", true,
230 "If set, the metrics endpoint is served securely via HTTPS. Use --metrics-secure=false to use HTTP instead.")
231 flag.StringVar(&webhookCertPath, "webhook-cert-path", "", "The directory that contains the webhook certificate.")
232 flag.StringVar(&webhookCertName, "webhook-cert-name", "tls.crt", "The name of the webhook certificate file.")
233 flag.StringVar(&webhookCertKey, "webhook-cert-key", "tls.key", "The name of the webhook key file.")
234 flag.StringVar(&metricsCertPath, "metrics-cert-path", "",
235 "The directory that contains the metrics server certificate.")
236 flag.StringVar(&metricsCertName, "metrics-cert-name", "tls.crt", "The name of the metrics server certificate file.")
237 flag.StringVar(&metricsCertKey, "metrics-cert-key", "tls.key", "The name of the metrics server key file.")
238 flag.BoolVar(&enableHTTP2, "enable-http2", false,
239 "If set, HTTP/2 will be enabled for the metrics and webhook servers")
240 opts := zap.Options{
241 Development: true,
242 }
243 opts.BindFlags(flag.CommandLine)
244 flag.Parse()
245
246 ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))
247
248 // if the enable-http2 flag is false (the default), http/2 should be disabled
249 // due to its vulnerabilities. More specifically, disabling http/2 will
250 // prevent from being vulnerable to the HTTP/2 Stream Cancellation and
251 // Rapid Reset CVEs. For more information see:
252 // - https://github.com/advisories/GHSA-qppj-fm5r-hxr3
253 // - https://github.com/advisories/GHSA-4374-p667-p6c8
254 disableHTTP2 := func(c *tls.Config) {
255 setupLog.Info("disabling http/2")
256 c.NextProtos = []string{"http/1.1"}
257 }
258
259 if !enableHTTP2 {
260 tlsOpts = append(tlsOpts, disableHTTP2)
261 }
262
263 // Create watchers for metrics and webhooks certificates
264 var metricsCertWatcher, webhookCertWatcher *certwatcher.CertWatcher
265
266 // Initial webhook TLS options
267 webhookTLSOpts := tlsOpts
268
269 if len(webhookCertPath) > 0 {
270 setupLog.Info("Initializing webhook certificate watcher using provided certificates",
271 "webhook-cert-path", webhookCertPath, "webhook-cert-name", webhookCertName, "webhook-cert-key", webhookCertKey)
272
273 var err error
274 webhookCertWatcher, err = certwatcher.New(
275 filepath.Join(webhookCertPath, webhookCertName),
276 filepath.Join(webhookCertPath, webhookCertKey),
277 )
278 if err != nil {
279 setupLog.Error(err, "Failed to initialize webhook certificate watcher")
280 os.Exit(1)
281 }
282
283 webhookTLSOpts = append(webhookTLSOpts, func(config *tls.Config) {
284 config.GetCertificate = webhookCertWatcher.GetCertificate
285 })
286 }
287
288 webhookServer := webhook.NewServer(webhook.Options{
289 TLSOpts: webhookTLSOpts,
290 })
291
292 // Metrics endpoint is enabled in 'config/default/kustomization.yaml'. The Metrics options configure the server.
293 // More info:
294 // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.21.0/pkg/metrics/server
295 // - https://book.kubebuilder.io/reference/metrics.html
296 metricsServerOptions := metricsserver.Options{
297 BindAddress: metricsAddr,
298 SecureServing: secureMetrics,
299 TLSOpts: tlsOpts,
300 }
301
302 if secureMetrics {
303 // FilterProvider is used to protect the metrics endpoint with authn/authz.
304 // These configurations ensure that only authorized users and service accounts
305 // can access the metrics endpoint. The RBAC are configured in 'config/rbac/kustomization.yaml'. More info:
306 // https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.21.0/pkg/metrics/filters#WithAuthenticationAndAuthorization
307 metricsServerOptions.FilterProvider = filters.WithAuthenticationAndAuthorization
308 }
309
310 // If the certificate is not specified, controller-runtime will automatically
311 // generate self-signed certificates for the metrics server. While convenient for development and testing,
312 // this setup is not recommended for production.
313 //
314 // TODO(user): If you enable certManager, uncomment the following lines:
315 // - [METRICS-WITH-CERTS] at config/default/kustomization.yaml to generate and use certificates
316 // managed by cert-manager for the metrics server.
317 // - [PROMETHEUS-WITH-CERTS] at config/prometheus/kustomization.yaml for TLS certification.
318 if len(metricsCertPath) > 0 {
319 setupLog.Info("Initializing metrics certificate watcher using provided certificates",
320 "metrics-cert-path", metricsCertPath, "metrics-cert-name", metricsCertName, "metrics-cert-key", metricsCertKey)
321
322 var err error
323 metricsCertWatcher, err = certwatcher.New(
324 filepath.Join(metricsCertPath, metricsCertName),
325 filepath.Join(metricsCertPath, metricsCertKey),
326 )
327 if err != nil {
328 setupLog.Error(err, "to initialize metrics certificate watcher", "error", err)
329 os.Exit(1)
330 }
331
332 metricsServerOptions.TLSOpts = append(metricsServerOptions.TLSOpts, func(config *tls.Config) {
333 config.GetCertificate = metricsCertWatcher.GetCertificate
334 })
335 }
336
337 // Create context for spindle initialization
338 ctx := context.Background()
339
340 // Load loom configuration from ConfigMap
341 loomCfg, err := loadLoomConfig("/etc/loom/config.yaml")
342 if err != nil {
343 setupLog.Error(err, "failed to load loom config")
344 os.Exit(1)
345 }
346 setupLog.Info("Loom configuration loaded",
347 "maxConcurrentJobs", loomCfg.MaxConcurrentJobs,
348 "resourceProfiles", len(loomCfg.Template.ResourceProfiles))
349
350 // Load spindle configuration from environment
351 spindleCfg, err := config.Load(ctx)
352 if err != nil {
353 setupLog.Error(err, "failed to load spindle config")
354 os.Exit(1)
355 }
356
357 // Override maxJobCount from loom config
358 if loomCfg.MaxConcurrentJobs > 0 {
359 spindleCfg.Server.MaxJobCount = loomCfg.MaxConcurrentJobs
360 }
361
362 // Write embedded MOTD to temp file and configure spindle to use it
363 motdFile, err := os.CreateTemp("", "loom-motd-*.txt")
364 if err != nil {
365 setupLog.Error(err, "failed to create MOTD temp file")
366 os.Exit(1)
367 }
368 if _, err := motdFile.Write(motd); err != nil {
369 setupLog.Error(err, "failed to write MOTD temp file")
370 os.Exit(1)
371 }
372 motdFile.Close()
373 spindleCfg.Server.MOTDFile = motdFile.Name()
374
375 mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
376 Scheme: scheme,
377 Metrics: metricsServerOptions,
378 WebhookServer: webhookServer,
379 HealthProbeBindAddress: probeAddr,
380 LeaderElection: enableLeaderElection,
381 LeaderElectionID: "d9c2f6b8.j5t.io",
382 // LeaderElectionReleaseOnCancel defines if the leader should step down voluntarily
383 // when the Manager ends. This requires the binary to immediately end when the
384 // Manager is stopped, otherwise, this setting is unsafe. Setting this significantly
385 // speeds up voluntary leader transitions as the new leader don't have to wait
386 // LeaseDuration time first.
387 //
388 // In the default scaffold provided, the program ends immediately after
389 // the manager stops, so would be fine to enable this option. However,
390 // if you are doing or is intended to do any operation such as perform cleanups
391 // after the manager stops then its usage might be unsafe.
392 // LeaderElectionReleaseOnCancel: true,
393 })
394 if err != nil {
395 setupLog.Error(err, "unable to start manager")
396 os.Exit(1)
397 }
398
399 // Initialize spindle server with KubernetesEngine
400 s, err := initializeSpindle(ctx, spindleCfg, mgr, loomCfg)
401 if err != nil {
402 setupLog.Error(err, "failed to initialize spindle")
403 os.Exit(1)
404 }
405 defer s.Queue().Stop()
406
407 setupLog.Info("spindle server initialized successfully")
408
409 // Start spindle HTTP server in background
410 go func() {
411 setupLog.Info("starting spindle HTTP server", "address", spindleCfg.Server.ListenAddr)
412 if err := s.Start(ctx); err != nil {
413 setupLog.Error(err, "spindle HTTP server error")
414 }
415 }()
416
417 // Get loom image from environment (used for runner init container)
418 loomImage := os.Getenv("LOOM_IMAGE")
419 if loomImage == "" {
420 loomImage = "atcr.io/evan.jarrett.net/loom:latest" // default fallback
421 }
422
423 // Setup controller with spindle components
424 if err := (&controller.SpindleSetReconciler{
425 Client: mgr.GetClient(),
426 Scheme: mgr.GetScheme(),
427 Config: mgr.GetConfig(),
428 Spindle: s,
429 LoomImage: loomImage,
430 }).SetupWithManager(mgr); err != nil {
431 setupLog.Error(err, "unable to create controller", "controller", "SpindleSet")
432 os.Exit(1)
433 }
434 // +kubebuilder:scaffold:builder
435
436 if metricsCertWatcher != nil {
437 setupLog.Info("Adding metrics certificate watcher to manager")
438 if err := mgr.Add(metricsCertWatcher); err != nil {
439 setupLog.Error(err, "unable to add metrics certificate watcher to manager")
440 os.Exit(1)
441 }
442 }
443
444 if webhookCertWatcher != nil {
445 setupLog.Info("Adding webhook certificate watcher to manager")
446 if err := mgr.Add(webhookCertWatcher); err != nil {
447 setupLog.Error(err, "unable to add webhook certificate watcher to manager")
448 os.Exit(1)
449 }
450 }
451
452 if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
453 setupLog.Error(err, "unable to set up health check")
454 os.Exit(1)
455 }
456 if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil {
457 setupLog.Error(err, "unable to set up ready check")
458 os.Exit(1)
459 }
460
461 setupLog.Info("starting manager")
462 if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
463 setupLog.Error(err, "problem running manager")
464 os.Exit(1)
465 }
466}