+1
spindle/config/config.go
+1
spindle/config/config.go
+19
-7
spindle/engine/engine.go
+19
-7
spindle/engine/engine.go
···
203
203
close(e.stderrChans[wid.String()])
204
204
}()
205
205
206
-
for _, step := range steps {
206
+
for stepIdx, step := range steps {
207
207
envs := ConstructEnvs(step.Environment)
208
208
envs.AddEnv("HOME", workspaceDir)
209
209
e.l.Debug("envs for step", "step", step.Name, "envs", envs.Slice())
···
238
238
// start tailing logs in background
239
239
tailDone := make(chan error, 1)
240
240
go func() {
241
-
tailDone <- e.TailStep(stepCtx, resp.ID, wid)
241
+
tailDone <- e.TailStep(stepCtx, resp.ID, wid, stepIdx)
242
242
}()
243
243
244
244
// wait for container completion or timeout
···
314
314
return info.State, nil
315
315
}
316
316
317
-
func (e *Engine) TailStep(ctx context.Context, containerID string, wid models.WorkflowId) error {
317
+
func (e *Engine) TailStep(ctx context.Context, containerID string, wid models.WorkflowId, stepIdx int) error {
318
318
logs, err := e.docker.ContainerLogs(ctx, containerID, container.LogsOptions{
319
319
Follow: true,
320
320
ShowStdout: true,
···
326
326
return err
327
327
}
328
328
329
-
var devOutput io.Writer = io.Discard
329
+
stepLogger, err := NewStepLogger(e.cfg.Pipelines.LogDir, wid.String(), stepIdx)
330
+
if err != nil {
331
+
e.l.Warn("failed to setup step logger; logs will not be persisted", "error", err)
332
+
}
333
+
334
+
var logOutput io.Writer = io.Discard
335
+
330
336
if e.cfg.Server.Dev {
331
-
devOutput = &ansiStrippingWriter{underlying: os.Stdout}
337
+
logOutput = &ansiStrippingWriter{underlying: os.Stdout}
332
338
}
333
339
334
-
tee := io.TeeReader(logs, devOutput)
340
+
tee := io.TeeReader(logs, logOutput)
335
341
336
342
// using StdCopy we demux logs and stream stdout and stderr to different
337
343
// channels.
···
343
349
rpipeOut, wpipeOut := io.Pipe()
344
350
rpipeErr, wpipeErr := io.Pipe()
345
351
352
+
// sets up a io.MultiWriter to write to both the pipe
353
+
// and the file-based logger.
354
+
multiOut := io.MultiWriter(wpipeOut, stepLogger.Stdout())
355
+
multiErr := io.MultiWriter(wpipeErr, stepLogger.Stderr())
356
+
346
357
wg := sync.WaitGroup{}
347
358
348
359
wg.Add(1)
···
350
361
defer wg.Done()
351
362
defer wpipeOut.Close()
352
363
defer wpipeErr.Close()
353
-
_, err := stdcopy.StdCopy(wpipeOut, wpipeErr, tee)
364
+
defer stepLogger.Close()
365
+
_, err := stdcopy.StdCopy(multiOut, multiErr, tee)
354
366
if err != nil && err != io.EOF && !errors.Is(context.DeadlineExceeded, err) {
355
367
e.l.Error("failed to copy logs", "error", err)
356
368
}
+72
spindle/engine/logger.go
+72
spindle/engine/logger.go
···
1
+
package engine
2
+
3
+
import (
4
+
"fmt"
5
+
"io"
6
+
"os"
7
+
"path/filepath"
8
+
)
9
+
10
+
type StepLogger struct {
11
+
stderr *os.File
12
+
stdout *os.File
13
+
}
14
+
15
+
func NewStepLogger(baseDir, workflowID string, stepIdx int) (*StepLogger, error) {
16
+
dir := filepath.Join(baseDir, workflowID)
17
+
if err := os.MkdirAll(dir, 0755); err != nil {
18
+
return nil, fmt.Errorf("creating log dir: %w", err)
19
+
}
20
+
21
+
stdoutPath := logFilePath(baseDir, workflowID, "stdout", stepIdx)
22
+
stderrPath := logFilePath(baseDir, workflowID, "stderr", stepIdx)
23
+
24
+
stdoutFile, err := os.Create(stdoutPath)
25
+
if err != nil {
26
+
return nil, fmt.Errorf("creating stdout log file: %w", err)
27
+
}
28
+
29
+
stderrFile, err := os.Create(stderrPath)
30
+
if err != nil {
31
+
stdoutFile.Close()
32
+
return nil, fmt.Errorf("creating stderr log file: %w", err)
33
+
}
34
+
35
+
return &StepLogger{
36
+
stdout: stdoutFile,
37
+
stderr: stderrFile,
38
+
}, nil
39
+
}
40
+
41
+
func (l *StepLogger) Stdout() io.Writer {
42
+
return l.stdout
43
+
}
44
+
45
+
func (l *StepLogger) Stderr() io.Writer {
46
+
return l.stderr
47
+
}
48
+
49
+
func (l *StepLogger) Close() error {
50
+
err1 := l.stdout.Close()
51
+
err2 := l.stderr.Close()
52
+
if err1 != nil {
53
+
return err1
54
+
}
55
+
return err2
56
+
}
57
+
58
+
func ReadStepLog(baseDir, workflowID, stream string, stepIdx int) (string, error) {
59
+
logPath := logFilePath(baseDir, workflowID, stream, stepIdx)
60
+
61
+
data, err := os.ReadFile(logPath)
62
+
if err != nil {
63
+
return "", fmt.Errorf("error reading log file: %w", err)
64
+
}
65
+
66
+
return string(data), nil
67
+
}
68
+
69
+
func logFilePath(baseDir, workflowID, stream string, stepIdx int) string {
70
+
logFilePath := filepath.Join(baseDir, workflowID, fmt.Sprintf("%d-%s.log", stepIdx, stream))
71
+
return logFilePath
72
+
}