+3
-3
spindle/config/config.go
+3
-3
spindle/config/config.go
+32
-19
spindle/engine/engine.go
+32
-19
spindle/engine/engine.go
···
102
defer reader.Close()
103
io.Copy(os.Stdout, reader)
104
105
err = e.StartSteps(ctx, w.Steps, wid, w.Image)
106
if err != nil {
107
if errors.Is(err, ErrTimedOut) {
···
177
// All other errors are bubbled up.
178
// Fixed version of the step execution logic
179
func (e *Engine) StartSteps(ctx context.Context, steps []models.Step, wid models.WorkflowId, image string) error {
180
-
stepTimeoutStr := e.cfg.Pipelines.StepTimeout
181
-
stepTimeout, err := time.ParseDuration(stepTimeoutStr)
182
-
if err != nil {
183
-
e.l.Error("failed to parse step timeout", "error", err, "timeout", stepTimeoutStr)
184
-
stepTimeout = 5 * time.Minute
185
-
}
186
-
e.l.Info("using step timeout", "timeout", stepTimeout)
187
188
for stepIdx, step := range steps {
189
envs := ConstructEnvs(step.Environment)
190
envs.AddEnv("HOME", workspaceDir)
191
e.l.Debug("envs for step", "step", step.Name, "envs", envs.Slice())
···
199
Hostname: "spindle",
200
Env: envs.Slice(),
201
}, hostConfig, nil, nil, "")
202
if err != nil {
203
return fmt.Errorf("creating container: %w", err)
204
}
···
208
return fmt.Errorf("connecting network: %w", err)
209
}
210
211
-
stepCtx, stepCancel := context.WithTimeout(ctx, stepTimeout)
212
-
213
-
err = e.docker.ContainerStart(stepCtx, resp.ID, container.StartOptions{})
214
if err != nil {
215
-
stepCancel()
216
return err
217
}
218
e.l.Info("started container", "name", resp.ID, "step", step.Name)
···
220
// start tailing logs in background
221
tailDone := make(chan error, 1)
222
go func() {
223
-
tailDone <- e.TailStep(stepCtx, resp.ID, wid, stepIdx)
224
}()
225
226
// wait for container completion or timeout
···
230
231
go func() {
232
defer close(waitDone)
233
-
state, waitErr = e.WaitStep(stepCtx, resp.ID)
234
}()
235
236
select {
···
238
239
// wait for tailing to complete
240
<-tailDone
241
-
stepCancel()
242
243
-
case <-stepCtx.Done():
244
-
e.l.Warn("step timed out; killing container", "container", resp.ID, "timeout", stepTimeout)
245
-
246
-
_ = e.DestroyStep(ctx, resp.ID)
247
248
// wait for both goroutines to finish
249
<-waitDone
250
<-tailDone
251
252
-
stepCancel()
253
return ErrTimedOut
254
}
255
256
if waitErr != nil {
···
102
defer reader.Close()
103
io.Copy(os.Stdout, reader)
104
105
+
workflowTimeoutStr := e.cfg.Pipelines.WorkflowTimeout
106
+
workflowTimeout, err := time.ParseDuration(workflowTimeoutStr)
107
+
if err != nil {
108
+
e.l.Error("failed to parse workflow timeout", "error", err, "timeout", workflowTimeoutStr)
109
+
workflowTimeout = 5 * time.Minute
110
+
}
111
+
e.l.Info("using workflow timeout", "timeout", workflowTimeout)
112
+
ctx, cancel := context.WithTimeout(ctx, workflowTimeout)
113
+
defer cancel()
114
+
115
err = e.StartSteps(ctx, w.Steps, wid, w.Image)
116
if err != nil {
117
if errors.Is(err, ErrTimedOut) {
···
187
// All other errors are bubbled up.
188
// Fixed version of the step execution logic
189
func (e *Engine) StartSteps(ctx context.Context, steps []models.Step, wid models.WorkflowId, image string) error {
190
191
for stepIdx, step := range steps {
192
+
select {
193
+
case <-ctx.Done():
194
+
return ctx.Err()
195
+
default:
196
+
}
197
+
198
envs := ConstructEnvs(step.Environment)
199
envs.AddEnv("HOME", workspaceDir)
200
e.l.Debug("envs for step", "step", step.Name, "envs", envs.Slice())
···
208
Hostname: "spindle",
209
Env: envs.Slice(),
210
}, hostConfig, nil, nil, "")
211
+
defer e.DestroyStep(ctx, resp.ID)
212
if err != nil {
213
return fmt.Errorf("creating container: %w", err)
214
}
···
218
return fmt.Errorf("connecting network: %w", err)
219
}
220
221
+
err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{})
222
if err != nil {
223
return err
224
}
225
e.l.Info("started container", "name", resp.ID, "step", step.Name)
···
227
// start tailing logs in background
228
tailDone := make(chan error, 1)
229
go func() {
230
+
tailDone <- e.TailStep(ctx, resp.ID, wid, stepIdx)
231
}()
232
233
// wait for container completion or timeout
···
237
238
go func() {
239
defer close(waitDone)
240
+
state, waitErr = e.WaitStep(ctx, resp.ID)
241
}()
242
243
select {
···
245
246
// wait for tailing to complete
247
<-tailDone
248
249
+
case <-ctx.Done():
250
+
e.l.Warn("step timed out; killing container", "container", resp.ID, "step", step.Name)
251
+
err = e.DestroyStep(context.Background(), resp.ID)
252
+
if err != nil {
253
+
e.l.Error("failed to destroy step", "container", resp.ID, "error", err)
254
+
}
255
256
// wait for both goroutines to finish
257
<-waitDone
258
<-tailDone
259
260
return ErrTimedOut
261
+
}
262
+
263
+
select {
264
+
case <-ctx.Done():
265
+
return ctx.Err()
266
+
default:
267
}
268
269
if waitErr != nil {