+26
-3
appview/pipelines/pipelines.go
+26
-3
appview/pipelines/pipelines.go
···
158
l.Error("websocket upgrade failed", "err", err)
159
return
160
}
161
-
defer clientConn.Close()
162
163
ctx, cancel := context.WithCancel(r.Context())
164
defer cancel()
···
236
// start a goroutine to read from spindle
237
go func() {
238
defer close(msgChan)
239
for {
240
_, msg, err := spindleConn.ReadMessage()
241
if err != nil {
242
-
errChan <- err
243
return
244
}
245
msgChan <- msg
···
252
l.Info("client disconnected")
253
return
254
case err := <-errChan:
255
-
l.Error("error reading from spindle", "err", err)
256
return
257
case msg := <-msgChan:
258
var logLine spindlemodel.LogLine
···
158
l.Error("websocket upgrade failed", "err", err)
159
return
160
}
161
+
defer func() {
162
+
_ = clientConn.WriteControl(
163
+
websocket.CloseMessage,
164
+
websocket.FormatCloseMessage(websocket.CloseNormalClosure, "log stream complete"),
165
+
time.Now().Add(time.Second),
166
+
)
167
+
clientConn.Close()
168
+
}()
169
170
ctx, cancel := context.WithCancel(r.Context())
171
defer cancel()
···
243
// start a goroutine to read from spindle
244
go func() {
245
defer close(msgChan)
246
+
defer close(errChan)
247
+
248
for {
249
_, msg, err := spindleConn.ReadMessage()
250
if err != nil {
251
+
if websocket.IsCloseError(err,
252
+
websocket.CloseNormalClosure,
253
+
websocket.CloseGoingAway,
254
+
websocket.CloseAbnormalClosure) {
255
+
errChan <- nil // signal graceful end
256
+
} else {
257
+
errChan <- err
258
+
}
259
return
260
}
261
msgChan <- msg
···
268
l.Info("client disconnected")
269
return
270
case err := <-errChan:
271
+
if err != nil {
272
+
l.Error("error reading from spindle", "err", err)
273
+
}
274
+
275
+
if err == nil {
276
+
l.Info("log tail complete")
277
+
}
278
+
279
return
280
case msg := <-msgChan:
281
var logLine spindlemodel.LogLine
+34
spindle/db/events.go
+34
spindle/db/events.go
···
120
121
}
122
123
+
func (d *DB) GetStatus(workflowId models.WorkflowId) (*tangled.PipelineStatus, error) {
124
+
pipelineAtUri := workflowId.PipelineId.AtUri()
125
+
126
+
var eventJson string
127
+
err := d.QueryRow(
128
+
`
129
+
select
130
+
event from events
131
+
where
132
+
nsid = ?
133
+
and json_extract(event, '$.pipeline') = ?
134
+
and json_extract(event, '$.workflow') = ?
135
+
order by
136
+
created desc
137
+
limit
138
+
1
139
+
`,
140
+
tangled.PipelineStatusNSID,
141
+
string(pipelineAtUri),
142
+
workflowId.Name,
143
+
).Scan(&eventJson)
144
+
145
+
if err != nil {
146
+
return nil, err
147
+
}
148
+
149
+
var status tangled.PipelineStatus
150
+
if err := json.Unmarshal([]byte(eventJson), &status); err != nil {
151
+
return nil, err
152
+
}
153
+
154
+
return &status, nil
155
+
}
156
+
157
func (d *DB) StatusPending(workflowId models.WorkflowId, n *notifier.Notifier) error {
158
return d.createStatusEvent(workflowId, models.StatusKindPending, nil, nil, n)
159
}
+5
-1
spindle/engine/engine.go
+5
-1
spindle/engine/engine.go
···
326
}
327
defer wfLogger.Close()
328
329
+
_, err = stdcopy.StdCopy(
330
+
wfLogger.Writer("stdout", stepIdx),
331
+
wfLogger.Writer("stderr", stepIdx),
332
+
logs,
333
+
)
334
if err != nil && err != io.EOF && !errors.Is(err, context.DeadlineExceeded) {
335
return fmt.Errorf("failed to copy logs: %w", err)
336
}
+5
-23
spindle/engine/logger.go
+5
-23
spindle/engine/logger.go
···
17
}
18
19
func NewWorkflowLogger(baseDir string, wid models.WorkflowId) (*WorkflowLogger, error) {
20
-
dir := filepath.Join(baseDir, wid.String())
21
-
if err := os.MkdirAll(dir, 0755); err != nil {
22
-
return nil, fmt.Errorf("creating log dir: %w", err)
23
-
}
24
-
25
path := LogFilePath(baseDir, wid)
26
27
-
file, err := os.Create(path)
28
if err != nil {
29
return nil, fmt.Errorf("creating log file: %w", err)
30
}
···
43
return l.file.Close()
44
}
45
46
-
func OpenLogFile(baseDir string, workflowID models.WorkflowId) (*os.File, error) {
47
-
logPath := LogFilePath(baseDir, workflowID)
48
-
49
-
file, err := os.Open(logPath)
50
-
if err != nil {
51
-
return nil, fmt.Errorf("error opening log file: %w", err)
52
-
}
53
-
54
-
return file, nil
55
-
}
56
-
57
func LogFilePath(baseDir string, workflowID models.WorkflowId) string {
58
logFilePath := filepath.Join(baseDir, fmt.Sprintf("%s.log", workflowID.String()))
59
return logFilePath
60
}
61
62
-
func (l *WorkflowLogger) Stdout() io.Writer {
63
-
return &jsonWriter{logger: l, stream: "stdout"}
64
-
}
65
-
66
-
func (l *WorkflowLogger) Stderr() io.Writer {
67
-
return &jsonWriter{logger: l, stream: "stderr"}
68
}
69
70
type jsonWriter struct {
71
logger *WorkflowLogger
72
stream string
73
}
74
75
func (w *jsonWriter) Write(p []byte) (int, error) {
···
78
entry := models.LogLine{
79
Stream: w.stream,
80
Data: line,
81
}
82
83
if err := w.logger.encoder.Encode(entry); err != nil {
···
17
}
18
19
func NewWorkflowLogger(baseDir string, wid models.WorkflowId) (*WorkflowLogger, error) {
20
path := LogFilePath(baseDir, wid)
21
22
+
file, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
23
if err != nil {
24
return nil, fmt.Errorf("creating log file: %w", err)
25
}
···
38
return l.file.Close()
39
}
40
41
func LogFilePath(baseDir string, workflowID models.WorkflowId) string {
42
logFilePath := filepath.Join(baseDir, fmt.Sprintf("%s.log", workflowID.String()))
43
return logFilePath
44
}
45
46
+
func (l *WorkflowLogger) Writer(stream string, stepId int) io.Writer {
47
+
return &jsonWriter{logger: l, stream: stream, stepId: stepId}
48
}
49
50
type jsonWriter struct {
51
logger *WorkflowLogger
52
stream string
53
+
stepId int
54
}
55
56
func (w *jsonWriter) Write(p []byte) (int, error) {
···
59
entry := models.LogLine{
60
Stream: w.stream,
61
Data: line,
62
+
StepId: w.stepId,
63
}
64
65
if err := w.logger.encoder.Encode(entry); err != nil {
+1
spindle/models/models.go
+1
spindle/models/models.go
+29
-7
spindle/stream.go
+29
-7
spindle/stream.go
···
4
"context"
5
"encoding/json"
6
"fmt"
7
"net/http"
8
"strconv"
9
"time"
···
105
http.Error(w, "failed to upgrade", http.StatusInternalServerError)
106
return
107
}
108
-
defer conn.Close()
109
l.Debug("upgraded http to wss")
110
111
ctx, cancel := context.WithCancel(r.Context())
···
122
}()
123
124
if err := s.streamLogsFromDisk(ctx, conn, wid); err != nil {
125
-
l.Error("log stream failed", "err", err)
126
}
127
-
l.Debug("logs connection closed")
128
}
129
130
func (s *Spindle) streamLogsFromDisk(ctx context.Context, conn *websocket.Conn, wid models.WorkflowId) error {
131
filePath := engine.LogFilePath(s.cfg.Pipelines.LogDir, wid)
132
133
config := tail.Config{
134
-
Follow: true,
135
-
ReOpen: true,
136
MustExist: false,
137
-
Location: &tail.SeekInfo{Offset: 0, Whence: 0},
138
-
Logger: tail.DiscardingLogger,
139
}
140
141
t, err := tail.TailFile(filePath, config)
···
149
case <-ctx.Done():
150
return ctx.Err()
151
case line := <-t.Lines:
152
if line == nil {
153
return fmt.Errorf("tail channel closed unexpectedly")
154
}
···
4
"context"
5
"encoding/json"
6
"fmt"
7
+
"io"
8
"net/http"
9
"strconv"
10
"time"
···
106
http.Error(w, "failed to upgrade", http.StatusInternalServerError)
107
return
108
}
109
+
defer func() {
110
+
_ = conn.WriteControl(
111
+
websocket.CloseMessage,
112
+
websocket.FormatCloseMessage(websocket.CloseNormalClosure, "log stream complete"),
113
+
time.Now().Add(time.Second),
114
+
)
115
+
conn.Close()
116
+
}()
117
l.Debug("upgraded http to wss")
118
119
ctx, cancel := context.WithCancel(r.Context())
···
130
}()
131
132
if err := s.streamLogsFromDisk(ctx, conn, wid); err != nil {
133
+
l.Info("log stream ended", "err", err)
134
}
135
+
136
+
l.Info("logs connection closed")
137
}
138
139
func (s *Spindle) streamLogsFromDisk(ctx context.Context, conn *websocket.Conn, wid models.WorkflowId) error {
140
+
status, err := s.db.GetStatus(wid)
141
+
if err != nil {
142
+
return err
143
+
}
144
+
isFinished := models.StatusKind(status.Status).IsFinish()
145
+
146
filePath := engine.LogFilePath(s.cfg.Pipelines.LogDir, wid)
147
148
config := tail.Config{
149
+
Follow: !isFinished,
150
+
ReOpen: !isFinished,
151
MustExist: false,
152
+
Location: &tail.SeekInfo{
153
+
Offset: 0,
154
+
Whence: io.SeekStart,
155
+
},
156
+
// Logger: tail.DiscardingLogger,
157
}
158
159
t, err := tail.TailFile(filePath, config)
···
167
case <-ctx.Done():
168
return ctx.Err()
169
case line := <-t.Lines:
170
+
if line == nil && isFinished {
171
+
return fmt.Errorf("tail completed")
172
+
}
173
+
174
if line == nil {
175
return fmt.Errorf("tail channel closed unexpectedly")
176
}