Monorepo for Tangled — tangled.org

spindle/engine: clean up envs, stdout on dev and use new models

Signed-off-by: Anirudh Oppiliappan <anirudh@tangled.sh>

authored by anirudh.fi and committed by Tangled a14a3170 fe615590

Changed files
+96 -72
spindle
+7 -1
spindle/config/config.go
··· 15 15 Owner string `env:"OWNER, required"` 16 16 } 17 17 18 + type Pipelines struct { 19 + // TODO: change default to nixery.tangled.sh 20 + Nixery string `env:"NIXERY, default=nixery.dev"` 21 + } 22 + 18 23 type Config struct { 19 - Server Server `env:",prefix=SPINDLE_SERVER_"` 24 + Server Server `env:",prefix=SPINDLE_SERVER_"` 25 + Pipelines Pipelines `env:",prefix=SPINDLE_PIPELINES_"` 20 26 } 21 27 22 28 func Load(ctx context.Context) (*Config, error) {
+21
spindle/engine/ansi_stripper.go
··· 1 + package engine 2 + 3 + import ( 4 + "io" 5 + 6 + "github.com/go-enry/go-enry/v2/regex" 7 + ) 8 + 9 + // regex to match ANSI escape codes (e.g., color codes, cursor moves) 10 + const ansi = "[\u001B\u009B][[\\]()#;?]*(?:(?:(?:[a-zA-Z\\d]*(?:;[a-zA-Z\\d]*)*)?\u0007)|(?:(?:\\d{1,4}(?:;\\d{0,4})*)?[\\dA-PRZcf-ntqry=><~]))" 11 + 12 + var re = regex.MustCompile(ansi) 13 + 14 + type ansiStrippingWriter struct { 15 + underlying io.Writer 16 + } 17 + 18 + func (w *ansiStrippingWriter) Write(p []byte) (int, error) { 19 + clean := re.ReplaceAll(p, []byte{}) 20 + return w.underlying.Write(clean) 21 + }
+42 -28
spindle/engine/engine.go
··· 7 7 "io" 8 8 "log/slog" 9 9 "os" 10 - "path" 11 10 "strings" 12 11 "sync" 13 12 ··· 18 17 "github.com/docker/docker/api/types/volume" 19 18 "github.com/docker/docker/client" 20 19 "github.com/docker/docker/pkg/stdcopy" 21 - "tangled.sh/tangled.sh/core/api/tangled" 22 20 "tangled.sh/tangled.sh/core/log" 23 21 "tangled.sh/tangled.sh/core/notifier" 22 + "tangled.sh/tangled.sh/core/spindle/config" 24 23 "tangled.sh/tangled.sh/core/spindle/db" 25 24 "tangled.sh/tangled.sh/core/spindle/models" 26 25 ) ··· 36 35 l *slog.Logger 37 36 db *db.DB 38 37 n *notifier.Notifier 38 + cfg *config.Config 39 39 40 40 chanMu sync.RWMutex 41 41 stdoutChans map[string]chan string ··· 45 45 cleanup map[string][]cleanupFunc 46 46 } 47 47 48 - func New(ctx context.Context, db *db.DB, n *notifier.Notifier) (*Engine, error) { 48 + func New(ctx context.Context, cfg *config.Config, db *db.DB, n *notifier.Notifier) (*Engine, error) { 49 49 dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) 50 50 if err != nil { 51 51 return nil, err ··· 58 58 l: l, 59 59 db: db, 60 60 n: n, 61 + cfg: cfg, 61 62 } 62 63 63 64 e.stdoutChans = make(map[string]chan string, 100) ··· 68 69 return e, nil 69 70 } 70 71 71 - func (e *Engine) StartWorkflows(ctx context.Context, pipeline *tangled.Pipeline, pipelineId models.PipelineId) { 72 + func (e *Engine) StartWorkflows(ctx context.Context, pipeline *models.Pipeline, pipelineId models.PipelineId) { 72 73 e.l.Info("starting all workflows in parallel", "pipeline", pipelineId) 73 74 74 75 wg := sync.WaitGroup{} ··· 93 94 } 94 95 defer e.DestroyWorkflow(ctx, wid) 95 96 96 - // TODO: actual checks for image/registry etc. 97 - var deps string 98 - for _, d := range w.Dependencies { 99 - if d.Registry == "nixpkgs" { 100 - deps = path.Join(d.Packages...) 
101 - } 102 - } 103 - 104 - // load defaults from somewhere else 105 - deps = path.Join(deps, "bash", "git", "coreutils", "nix") 106 - 107 - cimg := path.Join("nixery.dev", deps) 108 - reader, err := e.docker.ImagePull(ctx, cimg, image.PullOptions{}) 97 + reader, err := e.docker.ImagePull(ctx, w.Image, image.PullOptions{}) 109 98 if err != nil { 110 99 e.l.Error("pipeline failed!", "workflowId", wid, "error", err.Error()) 111 100 ··· 119 108 defer reader.Close() 120 109 io.Copy(os.Stdout, reader) 121 110 122 - err = e.StartSteps(ctx, w.Steps, wid, cimg) 111 + err = e.StartSteps(ctx, w.Steps, wid, w.Image) 123 112 if err != nil { 124 113 e.l.Error("workflow failed!", "wid", wid.String(), "error", err.Error()) 125 114 126 - err := e.db.StatusFailed(wid, err.Error(), -1, e.n) 127 - if err != nil { 128 - return err 115 + dbErr := e.db.StatusFailed(wid, err.Error(), -1, e.n) 116 + if dbErr != nil { 117 + return dbErr 129 118 } 130 119 131 120 return fmt.Errorf("starting steps image: %w", err) ··· 187 176 // StartSteps starts all steps sequentially with the same base image. 188 177 // ONLY marks pipeline as failed if container's exit code is non-zero. 189 178 // All other errors are bubbled up. 
190 - func (e *Engine) StartSteps(ctx context.Context, steps []*tangled.Pipeline_Step, wid models.WorkflowId, image string) error { 179 + func (e *Engine) StartSteps(ctx context.Context, steps []models.Step, wid models.WorkflowId, image string) error { 191 180 // set up logging channels 192 181 e.chanMu.Lock() 193 182 if _, exists := e.stdoutChans[wid.String()]; !exists { ··· 259 248 } 260 249 261 250 if state.ExitCode != 0 { 262 - e.l.Error("workflow failed!", "workflow_id", wid.String(), "error", state.Error, "exit_code", state.ExitCode) 263 - return fmt.Errorf("%s", state.Error) 251 + e.l.Error("workflow failed!", "workflow_id", wid.String(), "error", state.Error, "exit_code", state.ExitCode, "oom_killed", state.OOMKilled) 252 + err := e.db.StatusFailed(wid, state.Error, int64(state.ExitCode), e.n) 253 + if err != nil { 254 + return err 255 + } 256 + return fmt.Errorf("error: %s, exit code: %d, oom: %t", state.Error, state.ExitCode, state.OOMKilled) 264 257 } 265 258 } 266 259 ··· 293 286 Follow: true, 294 287 ShowStdout: true, 295 288 ShowStderr: true, 296 - Details: false, 289 + Details: true, 297 290 Timestamps: false, 298 291 }) 299 292 if err != nil { 300 293 return err 301 294 } 302 295 296 + var devOutput io.Writer = io.Discard 297 + if e.cfg.Server.Dev { 298 + devOutput = &ansiStrippingWriter{underlying: os.Stdout} 299 + } 300 + 301 + tee := io.TeeReader(logs, devOutput) 302 + 303 303 // using StdCopy we demux logs and stream stdout and stderr to different 304 304 // channels.
305 305 // ··· 310 310 rpipeOut, wpipeOut := io.Pipe() 311 311 rpipeErr, wpipeErr := io.Pipe() 312 312 313 + wg := sync.WaitGroup{} 314 + 315 + wg.Add(1) 313 316 go func() { 317 + defer wg.Done() 314 318 defer wpipeOut.Close() 315 319 defer wpipeErr.Close() 316 - _, err := stdcopy.StdCopy(wpipeOut, wpipeErr, logs) 320 + _, err := stdcopy.StdCopy(wpipeOut, wpipeErr, tee) 317 321 if err != nil && err != io.EOF { 318 322 e.l.Error("failed to copy logs", "error", err) 319 323 } ··· 322 326 // read from stdout and send to stdout pipe 323 327 // NOTE: the stdoutCh channel is closed further up in StartSteps 324 328 // once all steps are done. 329 + wg.Add(1) 325 330 go func() { 331 + defer wg.Done() 326 332 e.chanMu.RLock() 327 333 stdoutCh := e.stdoutChans[wid.String()] 328 334 e.chanMu.RUnlock() ··· 339 345 // read from stderr and send to stderr pipe 340 346 // NOTE: the stderrCh channel is closed further up in StartSteps 341 347 // once all steps are done. 348 + wg.Add(1) 342 349 go func() { 350 + defer wg.Done() 343 351 e.chanMu.RLock() 344 352 stderrCh := e.stderrChans[wid.String()] 345 353 e.chanMu.RUnlock() ··· 352 360 e.l.Error("failed to scan stderr", "error", err) 353 361 } 354 362 }() 363 + 364 + wg.Wait() 355 365 356 366 return nil 357 367 } ··· 435 445 Source: nixVolume(wid), 436 446 Target: "/nix", 437 447 }, 448 + { 449 + Type: mount.TypeTmpfs, 450 + Target: "/tmp", 451 + }, 438 452 }, 439 - ReadonlyRootfs: true, 453 + ReadonlyRootfs: false, 440 454 CapDrop: []string{"ALL"}, 441 - SecurityOpt: []string{"no-new-privileges"}, 455 + SecurityOpt: []string{"seccomp=unconfined"}, 442 456 } 443 457 444 458 return hostConfig
+4 -8
spindle/engine/envs.go
··· 2 2 3 3 import ( 4 4 "fmt" 5 - 6 - "tangled.sh/tangled.sh/core/api/tangled" 7 5 ) 8 6 9 7 type EnvVars []string 10 8 11 9 // ConstructEnvs converts a map of environment variable key/value pairs 12 10 // into a docker-friendly []string{"KEY=value", ...} slice. 13 - func ConstructEnvs(envs []*tangled.Pipeline_Step_Environment_Elem) EnvVars { 11 + func ConstructEnvs(envs map[string]string) EnvVars { 14 12 var dockerEnvs EnvVars 15 - for _, env := range envs { 16 - if env != nil { 17 - ev := fmt.Sprintf("%s=%s", env.Key, env.Value) 18 - dockerEnvs = append(dockerEnvs, ev) 19 - } 13 + for k, v := range envs { 14 + ev := fmt.Sprintf("%s=%s", k, v) 15 + dockerEnvs = append(dockerEnvs, ev) 20 16 } 21 17 return dockerEnvs 22 18 }
+4 -19
spindle/engine/envs_test.go
··· 3 3 import ( 4 4 "reflect" 5 5 "testing" 6 - 7 - "tangled.sh/tangled.sh/core/api/tangled" 8 6 ) 9 7 10 8 func TestConstructEnvs(t *testing.T) { 11 9 tests := []struct { 12 10 name string 13 - in []*tangled.Pipeline_Step_Environment_Elem 11 + in map[string]string 14 12 want EnvVars 15 13 }{ 16 14 { 17 15 name: "empty input", 18 - in: []*tangled.Pipeline_Step_Environment_Elem{}, 16 + in: make(map[string]string), 19 17 want: EnvVars{}, 20 18 }, 21 19 { 22 20 name: "single env var", 23 - in: []*tangled.Pipeline_Step_Environment_Elem{ 24 - {Key: "FOO", Value: "bar"}, 25 - }, 21 + in: map[string]string{"FOO": "bar"}, 26 22 want: EnvVars{"FOO=bar"}, 27 23 }, 28 24 { 29 25 name: "multiple env vars", 30 - in: []*tangled.Pipeline_Step_Environment_Elem{ 31 - {Key: "FOO", Value: "bar"}, 32 - {Key: "BAZ", Value: "qux"}, 33 - }, 26 + in: map[string]string{"FOO": "bar", "BAZ": "qux"}, 34 27 want: EnvVars{"FOO=bar", "BAZ=qux"}, 35 - }, 36 - { 37 - name: "nil entries are skipped", 38 - in: []*tangled.Pipeline_Step_Environment_Elem{ 39 - nil, 40 - {Key: "FOO", Value: "bar"}, 41 - }, 42 - want: EnvVars{"FOO=bar"}, 43 28 }, 44 29 } 45 30
+6 -6
spindle/models/pipeline.go
··· 4 4 "path" 5 5 6 6 "tangled.sh/tangled.sh/core/api/tangled" 7 + "tangled.sh/tangled.sh/core/spindle/config" 7 8 ) 8 9 9 10 type Pipeline struct { ··· 35 36 // In the process, dependencies are resolved: nixpkgs deps 36 37 // are constructed atop nixery and set as the Workflow.Image, 37 38 // and ones from custom registries 38 - func ToPipeline(pl tangled.Pipeline, dev bool) *Pipeline { 39 + func ToPipeline(pl tangled.Pipeline, cfg config.Config) *Pipeline { 39 40 workflows := []Workflow{} 40 41 41 42 for _, twf := range pl.Workflows { ··· 49 50 } 50 51 swf.Name = twf.Name 51 52 swf.Environment = workflowEnvToMap(twf.Environment) 52 - swf.Image = workflowImage(twf.Dependencies) 53 + swf.Image = workflowImage(twf.Dependencies, cfg.Pipelines.Nixery) 53 54 54 55 swf.addNixProfileToPath() 55 56 setup := &setupSteps{} 56 57 57 - setup.addStep(cloneStep(*twf, *pl.TriggerMetadata.Repo, dev)) 58 + setup.addStep(cloneStep(*twf, *pl.TriggerMetadata.Repo, cfg.Server.Dev)) 58 59 setup.addStep(checkoutStep(*twf, *pl.TriggerMetadata)) 59 60 setup.addStep(dependencyStep(*twf)) 60 61 ··· 82 83 return envMap 83 84 } 84 85 85 - func workflowImage(deps []tangled.Pipeline_Dependencies_Elem) string { 86 + func workflowImage(deps []tangled.Pipeline_Dependencies_Elem, nixery string) string { 86 87 var dependencies string 87 88 for _, d := range deps { 88 89 if d.Registry == "nixpkgs" { ··· 93 94 // load defaults from somewhere else 94 95 dependencies = path.Join(dependencies, "bash", "git", "coreutils", "nix") 95 96 96 - // TODO: this should use nixery from the config 97 - return path.Join("nixery.dev", dependencies) 97 + return path.Join(nixery, dependencies) 98 98 } 99 99 100 100 func (wf *Workflow) addNixProfileToPath() {
+12 -10
spindle/server.go
··· 59 59 60 60 n := notifier.New() 61 61 62 - eng, err := engine.New(ctx, d, &n) 62 + eng, err := engine.New(ctx, cfg, d, &n) 63 63 if err != nil { 64 64 return err 65 65 } ··· 153 153 154 154 func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 155 155 if msg.Nsid == tangled.PipelineNSID { 156 - pipeline := tangled.Pipeline{} 157 - err := json.Unmarshal(msg.EventJson, &pipeline) 156 + tpl := tangled.Pipeline{} 157 + err := json.Unmarshal(msg.EventJson, &tpl) 158 158 if err != nil { 159 159 fmt.Println("error unmarshalling", err) 160 160 return err 161 161 } 162 162 163 - if pipeline.TriggerMetadata == nil { 163 + if tpl.TriggerMetadata == nil { 164 164 return fmt.Errorf("no trigger metadata found") 165 165 } 166 166 167 - if pipeline.TriggerMetadata.Repo == nil { 167 + if tpl.TriggerMetadata.Repo == nil { 168 168 return fmt.Errorf("no repo data found") 169 169 } 170 170 171 171 // filter by repos 172 172 _, err = s.db.GetRepo( 173 - pipeline.TriggerMetadata.Repo.Knot, 174 - pipeline.TriggerMetadata.Repo.Did, 175 - pipeline.TriggerMetadata.Repo.Repo, 173 + tpl.TriggerMetadata.Repo.Knot, 174 + tpl.TriggerMetadata.Repo.Did, 175 + tpl.TriggerMetadata.Repo.Repo, 176 176 ) 177 177 if err != nil { 178 178 return err ··· 183 183 Rkey: msg.Rkey, 184 184 } 185 185 186 - for _, w := range pipeline.Workflows { 186 + for _, w := range tpl.Workflows { 187 187 if w != nil { 188 188 err := s.db.StatusPending(models.WorkflowId{ 189 189 PipelineId: pipelineId, ··· 195 195 } 196 196 } 197 197 198 + spl := models.ToPipeline(tpl, *s.cfg) 199 + 198 200 ok := s.jq.Enqueue(queue.Job{ 199 201 Run: func() error { 200 - s.eng.StartWorkflows(ctx, &pipeline, pipelineId) 202 + s.eng.StartWorkflows(ctx, spl, pipelineId) 201 203 return nil 202 204 }, 203 205 OnFail: func(jobError error) {