forked from tangled.org/core
Monorepo for Tangled

spindle/{config,engine}: configure a timeout for steps

WaitStep and TailStep now run in goroutines, and are tracked with a
stepCtx which has a timeout attached. Once stepCtx expires, the step is
killed with DestroyStep.

The default timeout is set to 5m, and is configurable using
SPINDLE_PIPELINES_STEP_TIMEOUT.

Signed-off-by: Anirudh Oppiliappan <anirudh@tangled.sh>

anirudh.fi 93cfd4e4 b1136410

verified
Changed files
+56 -22
spindle
+2 -1
spindle/config/config.go
··· 17 18 type Pipelines struct { 19 // TODO: change default to nixery.tangled.sh 20 - Nixery string `env:"NIXERY, default=nixery.dev"` 21 } 22 23 type Config struct {
··· 17 18 type Pipelines struct { 19 // TODO: change default to nixery.tangled.sh 20 + Nixery string `env:"NIXERY, default=nixery.dev"` 21 + StepTimeout string `env:"STEP_TIMEOUT, default=5m"` 22 } 23 24 type Config struct {
+2 -2
spindle/engine/ansi_stripper.go
··· 3 import ( 4 "io" 5 6 - "github.com/go-enry/go-enry/v2/regex" 7 ) 8 9 // regex to match ANSI escape codes (e.g., color codes, cursor moves) 10 const ansi = "[\u001B\u009B][[\\]()#;?]*(?:(?:(?:[a-zA-Z\\d]*(?:;[a-zA-Z\\d]*)*)?\u0007)|(?:(?:\\d{1,4}(?:;\\d{0,4})*)?[\\dA-PRZcf-ntqry=><~]))" 11 12 - var re = regex.MustCompile(ansi) 13 14 type ansiStrippingWriter struct { 15 underlying io.Writer
··· 3 import ( 4 "io" 5 6 + "regexp" 7 ) 8 9 // regex to match ANSI escape codes (e.g., color codes, cursor moves) 10 const ansi = "[\u001B\u009B][[\\]()#;?]*(?:(?:(?:[a-zA-Z\\d]*(?:;[a-zA-Z\\d]*)*)?\u0007)|(?:(?:\\d{1,4}(?:;\\d{0,4})*)?[\\dA-PRZcf-ntqry=><~]))" 11 12 + var re = regexp.MustCompile(ansi) 13 14 type ansiStrippingWriter struct { 15 underlying io.Writer
+52 -19
spindle/engine/engine.go
··· 3 import ( 4 "bufio" 5 "context" 6 "fmt" 7 "io" 8 "log/slog" 9 "os" 10 "strings" 11 "sync" 12 13 "github.com/docker/docker/api/types/container" 14 "github.com/docker/docker/api/types/image" ··· 176 // StartSteps starts all steps sequentially with the same base image. 177 // ONLY marks pipeline as failed if container's exit code is non-zero. 178 // All other errors are bubbled up. 179 func (e *Engine) StartSteps(ctx context.Context, steps []models.Step, wid models.WorkflowId, image string) error { 180 - // set up logging channels 181 e.chanMu.Lock() 182 if _, exists := e.stdoutChans[wid.String()]; !exists { 183 e.stdoutChans[wid.String()] = make(chan string, 100) ··· 216 return fmt.Errorf("connecting network: %w", err) 217 } 218 219 - err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{}) 220 if err != nil { 221 return err 222 } 223 e.l.Info("started container", "name", resp.ID, "step", step.Name) 224 225 - wg := sync.WaitGroup{} 226 227 - wg.Add(1) 228 go func() { 229 - defer wg.Done() 230 - err := e.TailStep(ctx, resp.ID, wid) 231 - if err != nil { 232 - e.l.Error("failed to tail container", "container", resp.ID) 233 - return 234 - } 235 }() 236 237 - // wait until all logs are piped 238 - wg.Wait() 239 240 - state, err := e.WaitStep(ctx, resp.ID) 241 - if err != nil { 242 - return err 243 } 244 245 err = e.DestroyStep(ctx, resp.ID) ··· 253 if err != nil { 254 return err 255 } 256 - return fmt.Errorf("error: %s, exit code: %d, oom: %s", state.Error, state.ExitCode, state.OOMKilled) 257 } 258 } 259 260 return nil 261 - 262 } 263 264 func (e *Engine) WaitStep(ctx context.Context, containerID string) (*container.State, error) { ··· 318 defer wpipeOut.Close() 319 defer wpipeErr.Close() 320 _, err := stdcopy.StdCopy(wpipeOut, wpipeErr, tee) 321 - if err != nil && err != io.EOF { 322 e.l.Error("failed to copy logs", "error", err) 323 } 324 }() ··· 393 394 for _, fn := range fns { 395 if err := fn(ctx); err != nil { 396 - e.l.Error("failed to cleanup 
workflow resource", "workflowId", wid) 397 } 398 } 399 return nil
··· 3 import ( 4 "bufio" 5 "context" 6 + "errors" 7 "fmt" 8 "io" 9 "log/slog" 10 "os" 11 "strings" 12 "sync" 13 + "time" 14 15 "github.com/docker/docker/api/types/container" 16 "github.com/docker/docker/api/types/image" ··· 178 // StartSteps starts all steps sequentially with the same base image. 179 // ONLY marks pipeline as failed if container's exit code is non-zero. 180 // All other errors are bubbled up. 181 + // Fixed version of the step execution logic 182 func (e *Engine) StartSteps(ctx context.Context, steps []models.Step, wid models.WorkflowId, image string) error { 183 + stepTimeoutStr := e.cfg.Pipelines.StepTimeout 184 + stepTimeout, err := time.ParseDuration(stepTimeoutStr) 185 + if err != nil { 186 + e.l.Error("failed to parse step timeout", "error", err, "timeout", stepTimeoutStr) 187 + stepTimeout = 5 * time.Minute 188 + } 189 + e.l.Info("using step timeout", "timeout", stepTimeout) 190 + 191 e.chanMu.Lock() 192 if _, exists := e.stdoutChans[wid.String()]; !exists { 193 e.stdoutChans[wid.String()] = make(chan string, 100) ··· 226 return fmt.Errorf("connecting network: %w", err) 227 } 228 229 + stepCtx, stepCancel := context.WithTimeout(ctx, stepTimeout) 230 + 231 + err = e.docker.ContainerStart(stepCtx, resp.ID, container.StartOptions{}) 232 if err != nil { 233 + stepCancel() 234 return err 235 } 236 e.l.Info("started container", "name", resp.ID, "step", step.Name) 237 238 + // start tailing logs in background 239 + tailDone := make(chan error, 1) 240 + go func() { 241 + tailDone <- e.TailStep(stepCtx, resp.ID, wid) 242 + }() 243 244 + // wait for container completion or timeout 245 + waitDone := make(chan struct{}) 246 + var state *container.State 247 + var waitErr error 248 + 249 go func() { 250 + defer close(waitDone) 251 + state, waitErr = e.WaitStep(stepCtx, resp.ID) 252 }() 253 254 + select { 255 + case <-waitDone: 256 + // container finished normally 257 + stepCancel() 258 + 259 + // wait for tailing to complete 260 + <-tailDone 261 + 262 + 
case <-stepCtx.Done(): 263 + e.l.Warn("step timed out; killing container", "container", resp.ID, "timeout", stepTimeout) 264 + 265 + _ = e.DestroyStep(ctx, resp.ID) 266 + 267 + // wait for both goroutines to finish 268 + <-waitDone 269 + <-tailDone 270 + 271 + stepCancel() 272 + return fmt.Errorf("step timed out after %v", stepTimeout) 273 + } 274 275 + if waitErr != nil { 276 + return waitErr 277 } 278 279 err = e.DestroyStep(ctx, resp.ID) ··· 287 if err != nil { 288 return err 289 } 290 + return fmt.Errorf("error: %s, exit code: %d, oom: %t", state.Error, state.ExitCode, state.OOMKilled) 291 } 292 } 293 294 return nil 295 } 296 297 func (e *Engine) WaitStep(ctx context.Context, containerID string) (*container.State, error) { ··· 351 defer wpipeOut.Close() 352 defer wpipeErr.Close() 353 _, err := stdcopy.StdCopy(wpipeOut, wpipeErr, tee) 354 + if err != nil && err != io.EOF && !errors.Is(context.DeadlineExceeded, err) { 355 e.l.Error("failed to copy logs", "error", err) 356 } 357 }() ··· 426 427 for _, fn := range fns { 428 if err := fn(ctx); err != nil { 429 + e.l.Error("failed to cleanup workflow resource", "workflowId", wid, "error", err) 430 } 431 } 432 return nil