+1
spindle/server.go
+1
spindle/server.go
+84
-21
spindle/stream.go
+84
-21
spindle/stream.go
···
1
1
package spindle
2
2
3
3
import (
4
+
"bufio"
4
5
"context"
5
6
"encoding/json"
6
7
"fmt"
7
8
"net/http"
8
9
"strconv"
10
+
"strings"
9
11
"time"
10
12
13
+
"tangled.sh/tangled.sh/core/spindle/engine"
11
14
"tangled.sh/tangled.sh/core/spindle/models"
12
15
13
16
"github.com/go-chi/chi/v5"
···
88
91
}
89
92
90
93
func (s *Spindle) Logs(w http.ResponseWriter, r *http.Request) {
91
-
l := s.l.With("handler", "Logs")
94
+
wid, err := getWorkflowID(r)
95
+
if err != nil {
96
+
http.Error(w, err.Error(), http.StatusBadRequest)
97
+
return
98
+
}
92
99
93
-
knot := chi.URLParam(r, "knot")
94
-
if knot == "" {
95
-
http.Error(w, "knot required", http.StatusBadRequest)
100
+
s.handleLogStream(w, r, func(ctx context.Context, conn *websocket.Conn) error {
101
+
return s.streamLogs(ctx, conn, wid)
102
+
})
103
+
}
104
+
105
+
func (s *Spindle) StepLogs(w http.ResponseWriter, r *http.Request) {
106
+
wid, err := getWorkflowID(r)
107
+
if err != nil {
108
+
http.Error(w, err.Error(), http.StatusBadRequest)
96
109
return
97
110
}
98
111
99
-
rkey := chi.URLParam(r, "rkey")
100
-
if rkey == "" {
101
-
http.Error(w, "rkey required", http.StatusBadRequest)
112
+
idxStr := chi.URLParam(r, "idx")
113
+
if idxStr == "" {
114
+
http.Error(w, "step index required", http.StatusBadRequest)
102
115
return
103
116
}
104
-
105
-
name := chi.URLParam(r, "name")
106
-
if name == "" {
107
-
http.Error(w, "name required", http.StatusBadRequest)
117
+
idx, err := strconv.Atoi(idxStr)
118
+
if err != nil {
119
+
http.Error(w, "bad step index", http.StatusBadRequest)
108
120
return
109
121
}
110
122
111
-
wid := models.WorkflowId{
112
-
PipelineId: models.PipelineId{
113
-
Knot: knot,
114
-
Rkey: rkey,
115
-
},
116
-
Name: name,
117
-
}
123
+
s.handleLogStream(w, r, func(ctx context.Context, conn *websocket.Conn) error {
124
+
return s.streamLogFromDisk(ctx, conn, wid, idx)
125
+
})
126
+
}
118
127
119
-
l = l.With("knot", knot, "rkey", rkey, "name", name)
128
+
func (s *Spindle) handleLogStream(w http.ResponseWriter, r *http.Request, streamFn func(ctx context.Context, conn *websocket.Conn) error) {
129
+
l := s.l.With("handler", "Logs")
120
130
121
131
conn, err := upgrader.Upgrade(w, r, nil)
122
132
if err != nil {
···
140
150
}
141
151
}()
142
152
143
-
if err := s.streamLogs(ctx, conn, wid); err != nil {
144
-
l.Error("streamLogs failed", "err", err)
153
+
if err := streamFn(ctx, conn); err != nil {
154
+
l.Error("log stream failed", "err", err)
145
155
}
146
156
l.Debug("logs connection closed")
147
157
}
···
206
216
return nil
207
217
}
208
218
219
+
func (s *Spindle) streamLogFromDisk(ctx context.Context, conn *websocket.Conn, wid models.WorkflowId, stepIdx int) error {
220
+
streams := []string{"stdout", "stderr"}
221
+
222
+
for _, stream := range streams {
223
+
data, err := engine.ReadStepLog(s.cfg.Pipelines.LogDir, wid.String(), stream, stepIdx)
224
+
if err != nil {
225
+
// log but continue to next stream
226
+
s.l.Error("failed to read step log", "stream", stream, "step", stepIdx, "wid", wid.String(), "err", err)
227
+
continue
228
+
}
229
+
230
+
scanner := bufio.NewScanner(strings.NewReader(data))
231
+
for scanner.Scan() {
232
+
select {
233
+
case <-ctx.Done():
234
+
return ctx.Err()
235
+
default:
236
+
msg := map[string]string{
237
+
"type": stream,
238
+
"data": scanner.Text(),
239
+
}
240
+
if err := conn.WriteJSON(msg); err != nil {
241
+
return err
242
+
}
243
+
}
244
+
}
245
+
246
+
if err := scanner.Err(); err != nil {
247
+
return fmt.Errorf("error scanning %s log: %w", stream, err)
248
+
}
249
+
}
250
+
251
+
return nil
252
+
}
253
+
209
254
func (s *Spindle) streamPipelines(conn *websocket.Conn, cursor *int64) error {
210
255
events, err := s.db.GetEvents(*cursor)
211
256
if err != nil {
···
242
287
243
288
return nil
244
289
}
290
+
291
+
func getWorkflowID(r *http.Request) (models.WorkflowId, error) {
292
+
knot := chi.URLParam(r, "knot")
293
+
rkey := chi.URLParam(r, "rkey")
294
+
name := chi.URLParam(r, "name")
295
+
296
+
if knot == "" || rkey == "" || name == "" {
297
+
return models.WorkflowId{}, fmt.Errorf("missing required parameters")
298
+
}
299
+
300
+
return models.WorkflowId{
301
+
PipelineId: models.PipelineId{
302
+
Knot: knot,
303
+
Rkey: rkey,
304
+
},
305
+
Name: name,
306
+
}, nil
307
+
}