+13
-3
spindle/engine/engine.go
+13
-3
spindle/engine/engine.go
···
79
defer cancel()
80
81
for stepIdx, step := range w.Steps {
82
if wfLogger != nil {
83
-
ctl := wfLogger.ControlWriter(stepIdx, step)
84
-
ctl.Write([]byte(step.Name()))
85
}
86
87
err = eng.RunStep(ctx, wid, &w, stepIdx, allSecrets, wfLogger)
88
if err != nil {
89
if errors.Is(err, ErrTimedOut) {
90
dbErr := db.StatusTimeout(wid, n)
···
115
if err := eg.Wait(); err != nil {
116
l.Error("failed to run one or more workflows", "err", err)
117
} else {
118
-
l.Error("successfully ran full pipeline")
119
}
120
}
···
79
defer cancel()
80
81
for stepIdx, step := range w.Steps {
82
+
// log start of step
83
if wfLogger != nil {
84
+
wfLogger.
85
+
ControlWriter(stepIdx, step, models.StepStatusStart).
86
+
Write([]byte{0})
87
}
88
89
err = eng.RunStep(ctx, wid, &w, stepIdx, allSecrets, wfLogger)
90
+
91
+
// log end of step
92
+
if wfLogger != nil {
93
+
wfLogger.
94
+
ControlWriter(stepIdx, step, models.StepStatusEnd).
95
+
Write([]byte{0})
96
+
}
97
+
98
if err != nil {
99
if errors.Is(err, ErrTimedOut) {
100
dbErr := db.StatusTimeout(wid, n)
···
125
if err := eg.Wait(); err != nil {
126
l.Error("failed to run one or more workflows", "err", err)
127
} else {
128
+
l.Info("successfully ran full pipeline")
129
}
130
}
+2
-2
spindle/engines/nixery/engine.go
+2
-2
spindle/engines/nixery/engine.go
+14
-11
spindle/models/logger.go
+14
-11
spindle/models/logger.go
···
37
return l.file.Close()
38
}
39
40
-
func (l *WorkflowLogger) DataWriter(stream string) io.Writer {
41
-
// TODO: emit stream
42
return &dataWriter{
43
logger: l,
44
stream: stream,
45
}
46
}
47
48
-
func (l *WorkflowLogger) ControlWriter(idx int, step Step) io.Writer {
49
return &controlWriter{
50
-
logger: l,
51
-
idx: idx,
52
-
step: step,
53
}
54
}
55
56
type dataWriter struct {
57
logger *WorkflowLogger
58
stream string
59
}
60
61
func (w *dataWriter) Write(p []byte) (int, error) {
62
line := strings.TrimRight(string(p), "\r\n")
63
-
entry := NewDataLogLine(line, w.stream)
64
if err := w.logger.encoder.Encode(entry); err != nil {
65
return 0, err
66
}
···
68
}
69
70
type controlWriter struct {
71
-
logger *WorkflowLogger
72
-
idx int
73
-
step Step
74
}
75
76
func (w *controlWriter) Write(_ []byte) (int, error) {
77
-
entry := NewControlLogLine(w.idx, w.step)
78
if err := w.logger.encoder.Encode(entry); err != nil {
79
return 0, err
80
}
···
37
return l.file.Close()
38
}
39
40
+
func (l *WorkflowLogger) DataWriter(idx int, stream string) io.Writer {
41
return &dataWriter{
42
logger: l,
43
+
idx: idx,
44
stream: stream,
45
}
46
}
47
48
+
func (l *WorkflowLogger) ControlWriter(idx int, step Step, stepStatus StepStatus) io.Writer {
49
return &controlWriter{
50
+
logger: l,
51
+
idx: idx,
52
+
step: step,
53
+
stepStatus: stepStatus,
54
}
55
}
56
57
type dataWriter struct {
58
logger *WorkflowLogger
59
+
idx int
60
stream string
61
}
62
63
func (w *dataWriter) Write(p []byte) (int, error) {
64
line := strings.TrimRight(string(p), "\r\n")
65
+
entry := NewDataLogLine(w.idx, line, w.stream)
66
if err := w.logger.encoder.Encode(entry); err != nil {
67
return 0, err
68
}
···
70
}
71
72
type controlWriter struct {
73
+
logger *WorkflowLogger
74
+
idx int
75
+
step Step
76
+
stepStatus StepStatus
77
}
78
79
func (w *controlWriter) Write(_ []byte) (int, error) {
80
+
entry := NewControlLogLine(w.idx, w.step, w.stepStatus)
81
if err := w.logger.encoder.Encode(entry); err != nil {
82
return 0, err
83
}
+23
-8
spindle/models/models.go
+23
-8
spindle/models/models.go
···
4
"fmt"
5
"regexp"
6
"slices"
7
8
"tangled.org/core/api/tangled"
9
···
76
var (
77
// step log data
78
LogKindData LogKind = "data"
79
-
// indicates start/end of a step
80
LogKindControl LogKind = "control"
81
)
82
83
type LogLine struct {
84
-
Kind LogKind `json:"kind"`
85
-
Content string `json:"content"`
86
87
// fields if kind is "data"
88
Stream string `json:"stream,omitempty"`
89
90
// fields if kind is "control"
91
-
StepId int `json:"step_id,omitempty"`
92
-
StepKind StepKind `json:"step_kind,omitempty"`
93
-
StepCommand string `json:"step_command,omitempty"`
94
}
95
96
-
func NewDataLogLine(content, stream string) LogLine {
97
return LogLine{
98
Kind: LogKindData,
99
Content: content,
100
Stream: stream,
101
}
102
}
103
104
-
func NewControlLogLine(idx int, step Step) LogLine {
105
return LogLine{
106
Kind: LogKindControl,
107
Content: step.Name(),
108
StepId: idx,
109
StepKind: step.Kind(),
110
StepCommand: step.Command(),
111
}
···
4
"fmt"
5
"regexp"
6
"slices"
7
+
"time"
8
9
"tangled.org/core/api/tangled"
10
···
77
var (
78
// step log data
79
LogKindData LogKind = "data"
80
+
// indicates status of a step
81
LogKindControl LogKind = "control"
82
)
83
84
+
// step status indicator in control log lines
85
+
type StepStatus string
86
+
87
+
var (
88
+
StepStatusStart StepStatus = "start"
89
+
StepStatusEnd StepStatus = "end"
90
+
)
91
+
92
type LogLine struct {
93
+
Kind LogKind `json:"kind"`
94
+
Content string `json:"content"`
95
+
Time time.Time `json:"time"`
96
+
StepId int `json:"step_id"`
97
98
// fields if kind is "data"
99
Stream string `json:"stream,omitempty"`
100
101
// fields if kind is "control"
102
+
StepStatus StepStatus `json:"step_status,omitempty"`
103
+
StepKind StepKind `json:"step_kind,omitempty"`
104
+
StepCommand string `json:"step_command,omitempty"`
105
}
106
107
+
func NewDataLogLine(idx int, content, stream string) LogLine {
108
return LogLine{
109
Kind: LogKindData,
110
+
Time: time.Now(),
111
Content: content,
112
+
StepId: idx,
113
Stream: stream,
114
}
115
}
116
117
+
func NewControlLogLine(idx int, step Step, status StepStatus) LogLine {
118
return LogLine{
119
Kind: LogKindControl,
120
+
Time: time.Now(),
121
Content: step.Name(),
122
StepId: idx,
123
+
StepStatus: status,
124
StepKind: step.Kind(),
125
StepCommand: step.Command(),
126
}