+8
-117
spindle/engine/engine.go
+8
-117
spindle/engine/engine.go
···
1
1
package engine
2
2
3
3
import (
4
-
"bufio"
5
4
"context"
6
5
"errors"
7
6
"fmt"
···
39
38
n *notifier.Notifier
40
39
cfg *config.Config
41
40
42
-
chanMu sync.RWMutex
43
-
stdoutChans map[string]chan string
44
-
stderrChans map[string]chan string
45
-
46
41
cleanupMu sync.Mutex
47
42
cleanup map[string][]cleanupFunc
48
43
}
···
62
57
n: n,
63
58
cfg: cfg,
64
59
}
65
-
66
-
e.stdoutChans = make(map[string]chan string, 100)
67
-
e.stderrChans = make(map[string]chan string, 100)
68
60
69
61
e.cleanup = make(map[string][]cleanupFunc)
70
62
···
188
180
}
189
181
e.l.Info("using step timeout", "timeout", stepTimeout)
190
182
191
-
e.chanMu.Lock()
192
-
if _, exists := e.stdoutChans[wid.String()]; !exists {
193
-
e.stdoutChans[wid.String()] = make(chan string, 100)
194
-
}
195
-
if _, exists := e.stderrChans[wid.String()]; !exists {
196
-
e.stderrChans[wid.String()] = make(chan string, 100)
197
-
}
198
-
e.chanMu.Unlock()
199
-
200
-
// close channels after all steps are complete
201
-
defer func() {
202
-
close(e.stdoutChans[wid.String()])
203
-
close(e.stderrChans[wid.String()])
204
-
}()
205
-
206
183
for stepIdx, step := range steps {
207
184
envs := ConstructEnvs(step.Environment)
208
185
envs.AddEnv("HOME", workspaceDir)
···
282
259
283
260
if state.ExitCode != 0 {
284
261
e.l.Error("workflow failed!", "workflow_id", wid.String(), "error", state.Error, "exit_code", state.ExitCode, "oom_killed", state.OOMKilled)
285
-
err := e.db.StatusFailed(wid, state.Error, int64(state.ExitCode), e.n)
286
-
if err != nil {
287
-
return err
288
-
}
289
262
return fmt.Errorf("error: %s, exit code: %d, oom: %t", state.Error, state.ExitCode, state.OOMKilled)
290
263
}
291
264
}
···
318
291
Follow: true,
319
292
ShowStdout: true,
320
293
ShowStderr: true,
321
-
Details: true,
294
+
Details: false,
322
295
Timestamps: false,
323
296
})
324
297
if err != nil {
325
298
return err
326
299
}
327
300
328
-
stepLogger, err := NewStepLogger(e.cfg.Pipelines.LogDir, wid.String(), stepIdx)
301
+
wfLogger, err := NewWorkflowLogger(e.cfg.Pipelines.LogDir, wid)
329
302
if err != nil {
330
303
e.l.Warn("failed to setup step logger; logs will not be persisted", "error", err)
304
+
return err
331
305
}
332
-
333
-
var logOutput io.Writer = io.Discard
306
+
defer wfLogger.Close()
334
307
335
-
if e.cfg.Server.Dev {
336
-
logOutput = &ansiStrippingWriter{underlying: os.Stdout}
308
+
_, err = stdcopy.StdCopy(wfLogger.Stdout(), wfLogger.Stderr(), logs)
309
+
if err != nil && err != io.EOF && !errors.Is(err, context.DeadlineExceeded) {
310
+
return fmt.Errorf("failed to copy logs: %w", err)
337
311
}
338
312
339
-
tee := io.TeeReader(logs, logOutput)
340
-
341
-
// using StdCopy we demux logs and stream stdout and stderr to different
342
-
// channels.
343
-
//
344
-
// stdout w||r stdoutCh
345
-
// stderr w||r stderrCh
346
-
//
347
-
348
-
rpipeOut, wpipeOut := io.Pipe()
349
-
rpipeErr, wpipeErr := io.Pipe()
350
-
351
-
// sets up a io.MultiWriter to write to both the pipe
352
-
// and the file-based logger.
353
-
multiOut := io.MultiWriter(wpipeOut, stepLogger.Stdout())
354
-
multiErr := io.MultiWriter(wpipeErr, stepLogger.Stderr())
355
-
356
-
wg := sync.WaitGroup{}
357
-
358
-
wg.Add(1)
359
-
go func() {
360
-
defer wg.Done()
361
-
defer wpipeOut.Close()
362
-
defer wpipeErr.Close()
363
-
defer stepLogger.Close()
364
-
_, err := stdcopy.StdCopy(multiOut, multiErr, tee)
365
-
if err != nil && err != io.EOF && !errors.Is(context.DeadlineExceeded, err) {
366
-
e.l.Error("failed to copy logs", "error", err)
367
-
}
368
-
}()
369
-
370
-
// read from stdout and send to stdout pipe
371
-
// NOTE: the stdoutCh channnel is closed further up in StartSteps
372
-
// once all steps are done.
373
-
wg.Add(1)
374
-
go func() {
375
-
defer wg.Done()
376
-
e.chanMu.RLock()
377
-
stdoutCh := e.stdoutChans[wid.String()]
378
-
e.chanMu.RUnlock()
379
-
380
-
scanner := bufio.NewScanner(rpipeOut)
381
-
for scanner.Scan() {
382
-
stdoutCh <- scanner.Text()
383
-
}
384
-
if err := scanner.Err(); err != nil {
385
-
e.l.Error("failed to scan stdout", "error", err)
386
-
}
387
-
}()
388
-
389
-
// read from stderr and send to stderr pipe
390
-
// NOTE: the stderrCh channnel is closed further up in StartSteps
391
-
// once all steps are done.
392
-
wg.Add(1)
393
-
go func() {
394
-
defer wg.Done()
395
-
e.chanMu.RLock()
396
-
stderrCh := e.stderrChans[wid.String()]
397
-
e.chanMu.RUnlock()
398
-
399
-
scanner := bufio.NewScanner(rpipeErr)
400
-
for scanner.Scan() {
401
-
stderrCh <- scanner.Text()
402
-
}
403
-
if err := scanner.Err(); err != nil {
404
-
e.l.Error("failed to scan stderr", "error", err)
405
-
}
406
-
}()
407
-
408
-
wg.Wait()
409
-
410
313
return nil
411
314
}
412
315
···
441
344
}
442
345
}
443
346
return nil
444
-
}
445
-
446
-
func (e *Engine) LogChannels(wid models.WorkflowId) (stdout <-chan string, stderr <-chan string, ok bool) {
447
-
e.chanMu.RLock()
448
-
defer e.chanMu.RUnlock()
449
-
450
-
stdoutCh, ok1 := e.stdoutChans[wid.String()]
451
-
stderrCh, ok2 := e.stderrChans[wid.String()]
452
-
453
-
if !ok1 || !ok2 {
454
-
return nil, nil, false
455
-
}
456
-
return stdoutCh, stderrCh, true
457
347
}
458
348
459
349
func (e *Engine) registerCleanup(wid models.WorkflowId, fn cleanupFunc) {
···
507
397
CapDrop: []string{"ALL"},
508
398
CapAdd: []string{"CAP_DAC_OVERRIDE"},
509
399
SecurityOpt: []string{"no-new-privileges"},
400
+
ExtraHosts: []string{"host.docker.internal:host-gateway"},
510
401
}
511
402
512
403
return hostConfig
+53
-37
spindle/engine/logger.go
+53
-37
spindle/engine/logger.go
···
1
1
package engine
2
2
3
3
import (
4
+
"encoding/json"
4
5
"fmt"
5
6
"io"
6
7
"os"
7
8
"path/filepath"
9
+
"strings"
10
+
11
+
"tangled.sh/tangled.sh/core/spindle/models"
8
12
)
9
13
10
-
type StepLogger struct {
11
-
stderr *os.File
12
-
stdout *os.File
14
+
// WorkflowLogger persists the log output of a workflow to a single file.
type WorkflowLogger struct {
	// file is the log file that all output ends up in.
	file *os.File
	// encoder serializes log entries as JSON.
	encoder *json.Encoder
}
14
18
15
-
func NewStepLogger(baseDir, workflowID string, stepIdx int) (*StepLogger, error) {
16
-
dir := filepath.Join(baseDir, workflowID)
19
+
func NewWorkflowLogger(baseDir string, wid models.WorkflowId) (*WorkflowLogger, error) {
20
+
dir := filepath.Join(baseDir, wid.String())
17
21
if err := os.MkdirAll(dir, 0755); err != nil {
18
22
return nil, fmt.Errorf("creating log dir: %w", err)
19
23
}
20
24
21
-
stdoutPath := logFilePath(baseDir, workflowID, "stdout", stepIdx)
22
-
stderrPath := logFilePath(baseDir, workflowID, "stderr", stepIdx)
25
+
path := LogFilePath(baseDir, wid)
23
26
24
-
stdoutFile, err := os.Create(stdoutPath)
27
+
file, err := os.Create(path)
25
28
if err != nil {
26
-
return nil, fmt.Errorf("creating stdout log file: %w", err)
29
+
return nil, fmt.Errorf("creating log file: %w", err)
27
30
}
28
31
29
-
stderrFile, err := os.Create(stderrPath)
32
+
return &WorkflowLogger{
33
+
file: file,
34
+
encoder: json.NewEncoder(file),
35
+
}, nil
36
+
}
37
+
38
+
func (l *WorkflowLogger) Write(p []byte) (n int, err error) {
39
+
return l.file.Write(p)
40
+
}
41
+
42
+
func (l *WorkflowLogger) Close() error {
43
+
return l.file.Close()
44
+
}
45
+
46
+
func OpenLogFile(baseDir string, workflowID models.WorkflowId) (*os.File, error) {
47
+
logPath := LogFilePath(baseDir, workflowID)
48
+
49
+
file, err := os.Open(logPath)
30
50
if err != nil {
31
-
stdoutFile.Close()
32
-
return nil, fmt.Errorf("creating stderr log file: %w", err)
51
+
return nil, fmt.Errorf("error opening log file: %w", err)
33
52
}
34
53
35
-
return &StepLogger{
36
-
stdout: stdoutFile,
37
-
stderr: stderrFile,
38
-
}, nil
54
+
return file, nil
39
55
}
40
56
41
-
func (l *StepLogger) Stdout() io.Writer {
42
-
return l.stdout
57
+
func LogFilePath(baseDir string, workflowID models.WorkflowId) string {
58
+
logFilePath := filepath.Join(baseDir, fmt.Sprintf("%s.log", workflowID.String()))
59
+
return logFilePath
60
+
}
61
+
62
+
func (l *WorkflowLogger) Stdout() io.Writer {
63
+
return &jsonWriter{logger: l, stream: "stdout"}
43
64
}
44
65
45
-
func (l *StepLogger) Stderr() io.Writer {
46
-
return l.stderr
66
+
func (l *WorkflowLogger) Stderr() io.Writer {
67
+
return &jsonWriter{logger: l, stream: "stderr"}
47
68
}
48
69
49
-
func (l *StepLogger) Close() error {
50
-
err1 := l.stdout.Close()
51
-
err2 := l.stderr.Close()
52
-
if err1 != nil {
53
-
return err1
54
-
}
55
-
return err2
70
+
type jsonWriter struct {
71
+
logger *WorkflowLogger
72
+
stream string
56
73
}
57
74
58
-
func ReadStepLog(baseDir, workflowID, stream string, stepIdx int) (string, error) {
59
-
logPath := logFilePath(baseDir, workflowID, stream, stepIdx)
75
+
func (w *jsonWriter) Write(p []byte) (int, error) {
76
+
line := strings.TrimRight(string(p), "\r\n")
60
77
61
-
data, err := os.ReadFile(logPath)
62
-
if err != nil {
63
-
return "", fmt.Errorf("error reading log file: %w", err)
78
+
entry := models.LogLine{
79
+
Stream: w.stream,
80
+
Data: line,
64
81
}
65
82
66
-
return string(data), nil
67
-
}
83
+
if err := w.logger.encoder.Encode(entry); err != nil {
84
+
return 0, err
85
+
}
68
86
69
-
func logFilePath(baseDir, workflowID, stream string, stepIdx int) string {
70
-
logFilePath := filepath.Join(baseDir, workflowID, fmt.Sprintf("%d-%s.log", stepIdx, stream))
71
-
return logFilePath
87
+
return len(p), nil
72
88
}
+5
spindle/models/models.go
+5
spindle/models/models.go
+5
spindle/models/pipeline.go
+5
spindle/models/pipeline.go
···
53
53
swf.Image = workflowImage(twf.Dependencies, cfg.Pipelines.Nixery)
54
54
55
55
swf.addNixProfileToPath()
56
+
swf.enableNixFlakes()
56
57
setup := &setupSteps{}
57
58
58
59
setup.addStep(nixConfStep())
···
101
102
// addNixProfileToPath sets the workflow's PATH environment entry so that
// /.nix-profile/bin is appended after the existing $PATH.
func (wf *Workflow) addNixProfileToPath() {
	const pathWithNixProfile = "$PATH:/.nix-profile/bin"
	wf.Environment["PATH"] = pathWithNixProfile
}
105
+
106
+
func (wf *Workflow) enableNixFlakes() {
107
+
wf.Environment["NIX_CONFIG"] = "experimental-features = nix-command flakes"
108
+
}
-1
spindle/server.go
-1
spindle/server.go
+27
-114
spindle/stream.go
+27
-114
spindle/stream.go
···
1
1
package spindle
2
2
3
3
import (
4
-
"bufio"
5
4
"context"
6
5
"encoding/json"
7
6
"fmt"
8
7
"net/http"
9
8
"strconv"
10
-
"strings"
11
9
"time"
12
10
13
11
"tangled.sh/tangled.sh/core/spindle/engine"
···
15
13
16
14
"github.com/go-chi/chi/v5"
17
15
"github.com/gorilla/websocket"
16
+
"github.com/hpcloud/tail"
18
17
)
19
18
20
19
var upgrader = websocket.Upgrader{
···
97
96
return
98
97
}
99
98
100
-
s.handleLogStream(w, r, func(ctx context.Context, conn *websocket.Conn) error {
101
-
return s.streamLogs(ctx, conn, wid)
102
-
})
103
-
}
104
-
105
-
func (s *Spindle) StepLogs(w http.ResponseWriter, r *http.Request) {
106
-
wid, err := getWorkflowID(r)
107
-
if err != nil {
108
-
http.Error(w, err.Error(), http.StatusBadRequest)
109
-
return
110
-
}
111
-
112
-
idxStr := chi.URLParam(r, "idx")
113
-
if idxStr == "" {
114
-
http.Error(w, "step index required", http.StatusBadRequest)
115
-
return
116
-
}
117
-
idx, err := strconv.Atoi(idxStr)
118
-
if err != nil {
119
-
http.Error(w, "bad step index", http.StatusBadRequest)
120
-
return
121
-
}
122
-
123
-
s.handleLogStream(w, r, func(ctx context.Context, conn *websocket.Conn) error {
124
-
return s.streamLogFromDisk(ctx, conn, wid, idx)
125
-
})
126
-
}
127
-
128
-
func (s *Spindle) handleLogStream(w http.ResponseWriter, r *http.Request, streamFn func(ctx context.Context, conn *websocket.Conn) error) {
129
99
l := s.l.With("handler", "Logs")
100
+
l = s.l.With("wid", wid)
130
101
131
102
conn, err := upgrader.Upgrade(w, r, nil)
132
103
if err != nil {
···
150
121
}
151
122
}()
152
123
153
-
if err := streamFn(ctx, conn); err != nil {
124
+
if err := s.streamLogsFromDisk(ctx, conn, wid); err != nil {
154
125
l.Error("log stream failed", "err", err)
155
126
}
156
127
l.Debug("logs connection closed")
157
128
}
158
129
159
-
func (s *Spindle) streamLogs(ctx context.Context, conn *websocket.Conn, wid models.WorkflowId) error {
160
-
l := s.l.With("workflow_id", wid.String())
130
+
func (s *Spindle) streamLogsFromDisk(ctx context.Context, conn *websocket.Conn, wid models.WorkflowId) error {
131
+
filePath := engine.LogFilePath(s.cfg.Pipelines.LogDir, wid)
161
132
162
-
stdoutCh, stderrCh, ok := s.eng.LogChannels(wid)
163
-
if !ok {
164
-
return fmt.Errorf("workflow_id %q not found", wid.String())
133
+
config := tail.Config{
134
+
Follow: true,
135
+
ReOpen: true,
136
+
MustExist: false,
137
+
Location: &tail.SeekInfo{Offset: 0, Whence: 0},
138
+
Logger: tail.DiscardingLogger,
165
139
}
166
140
167
-
done := make(chan struct{})
141
+
t, err := tail.TailFile(filePath, config)
142
+
if err != nil {
143
+
return fmt.Errorf("failed to tail log file: %w", err)
144
+
}
145
+
defer t.Stop()
168
146
169
-
go func() {
170
-
for {
171
-
select {
172
-
case line, ok := <-stdoutCh:
173
-
if !ok {
174
-
done <- struct{}{}
175
-
return
176
-
}
177
-
msg := map[string]string{"type": "stdout", "data": line}
178
-
if err := conn.WriteJSON(msg); err != nil {
179
-
l.Error("write stdout failed", "err", err)
180
-
done <- struct{}{}
181
-
return
182
-
}
183
-
case <-ctx.Done():
184
-
done <- struct{}{}
185
-
return
147
+
for {
148
+
select {
149
+
case <-ctx.Done():
150
+
return ctx.Err()
151
+
case line := <-t.Lines:
152
+
if line == nil {
153
+
return fmt.Errorf("tail channel closed unexpectedly")
186
154
}
187
-
}
188
-
}()
189
155
190
-
go func() {
191
-
for {
192
-
select {
193
-
case line, ok := <-stderrCh:
194
-
if !ok {
195
-
done <- struct{}{}
196
-
return
197
-
}
198
-
msg := map[string]string{"type": "stderr", "data": line}
199
-
if err := conn.WriteJSON(msg); err != nil {
200
-
l.Error("write stderr failed", "err", err)
201
-
done <- struct{}{}
202
-
return
203
-
}
204
-
case <-ctx.Done():
205
-
done <- struct{}{}
206
-
return
156
+
if line.Err != nil {
157
+
return fmt.Errorf("error tailing log file: %w", line.Err)
207
158
}
208
-
}
209
-
}()
210
-
211
-
select {
212
-
case <-done:
213
-
case <-ctx.Done():
214
-
}
215
159
216
-
return nil
217
-
}
218
-
219
-
func (s *Spindle) streamLogFromDisk(ctx context.Context, conn *websocket.Conn, wid models.WorkflowId, stepIdx int) error {
220
-
streams := []string{"stdout", "stderr"}
221
-
222
-
for _, stream := range streams {
223
-
data, err := engine.ReadStepLog(s.cfg.Pipelines.LogDir, wid.String(), stream, stepIdx)
224
-
if err != nil {
225
-
// log but continue to next stream
226
-
s.l.Error("failed to read step log", "stream", stream, "step", stepIdx, "wid", wid.String(), "err", err)
227
-
continue
228
-
}
229
-
230
-
scanner := bufio.NewScanner(strings.NewReader(data))
231
-
for scanner.Scan() {
232
-
select {
233
-
case <-ctx.Done():
234
-
return ctx.Err()
235
-
default:
236
-
msg := map[string]string{
237
-
"type": stream,
238
-
"data": scanner.Text(),
239
-
}
240
-
if err := conn.WriteJSON(msg); err != nil {
241
-
return err
242
-
}
160
+
if err := conn.WriteMessage(websocket.TextMessage, []byte(line.Text)); err != nil {
161
+
return fmt.Errorf("failed to write to websocket: %w", err)
243
162
}
244
163
}
245
-
246
-
if err := scanner.Err(); err != nil {
247
-
return fmt.Errorf("error scanning %s log: %w", stream, err)
248
-
}
249
164
}
250
-
251
-
return nil
252
165
}
253
166
254
167
func (s *Spindle) streamPipelines(conn *websocket.Conn, cursor *int64) error {