Monorepo for Tangled tangled.org

spindle: stream logs from disk

Signed-off-by: Anirudh Oppiliappan <anirudh@tangled.sh>

authored by anirudh.fi and committed by Tangled 3a3a98e0 ad84c0f8

Changed files
+85 -21
spindle
+1
spindle/server.go
··· 148 148 w.Write([]byte(s.cfg.Server.Owner)) 149 149 }) 150 150 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs) 151 + mux.HandleFunc("/logs/{knot}/{rkey}/{name}/{idx}", s.StepLogs) 151 152 return mux 152 153 } 153 154
+84 -21
spindle/stream.go
··· 1 1 package spindle 2 2 3 3 import ( 4 + "bufio" 4 5 "context" 5 6 "encoding/json" 6 7 "fmt" 7 8 "net/http" 8 9 "strconv" 10 + "strings" 9 11 "time" 10 12 13 + "tangled.sh/tangled.sh/core/spindle/engine" 11 14 "tangled.sh/tangled.sh/core/spindle/models" 12 15 13 16 "github.com/go-chi/chi/v5" ··· 88 91 } 89 92 90 93 func (s *Spindle) Logs(w http.ResponseWriter, r *http.Request) { 91 - l := s.l.With("handler", "Logs") 94 + wid, err := getWorkflowID(r) 95 + if err != nil { 96 + http.Error(w, err.Error(), http.StatusBadRequest) 97 + return 98 + } 92 99 93 - knot := chi.URLParam(r, "knot") 94 - if knot == "" { 95 - http.Error(w, "knot required", http.StatusBadRequest) 100 + s.handleLogStream(w, r, func(ctx context.Context, conn *websocket.Conn) error { 101 + return s.streamLogs(ctx, conn, wid) 102 + }) 103 + } 104 + 105 + func (s *Spindle) StepLogs(w http.ResponseWriter, r *http.Request) { 106 + wid, err := getWorkflowID(r) 107 + if err != nil { 108 + http.Error(w, err.Error(), http.StatusBadRequest) 96 109 return 97 110 } 98 111 99 - rkey := chi.URLParam(r, "rkey") 100 - if rkey == "" { 101 - http.Error(w, "rkey required", http.StatusBadRequest) 112 + idxStr := chi.URLParam(r, "idx") 113 + if idxStr == "" { 114 + http.Error(w, "step index required", http.StatusBadRequest) 102 115 return 103 116 } 104 - 105 - name := chi.URLParam(r, "name") 106 - if name == "" { 107 - http.Error(w, "name required", http.StatusBadRequest) 117 + idx, err := strconv.Atoi(idxStr) 118 + if err != nil { 119 + http.Error(w, "bad step index", http.StatusBadRequest) 108 120 return 109 121 } 110 122 111 - wid := models.WorkflowId{ 112 - PipelineId: models.PipelineId{ 113 - Knot: knot, 114 - Rkey: rkey, 115 - }, 116 - Name: name, 117 - } 123 + s.handleLogStream(w, r, func(ctx context.Context, conn *websocket.Conn) error { 124 + return s.streamLogFromDisk(ctx, conn, wid, idx) 125 + }) 126 + } 118 127 119 - l = l.With("knot", knot, "rkey", rkey, "name", name) 128 + func (s *Spindle) handleLogStream(w http.ResponseWriter, r *http.Request, streamFn func(ctx context.Context, conn *websocket.Conn) error) { 129 + l := s.l.With("handler", "Logs") 120 130 121 131 conn, err := upgrader.Upgrade(w, r, nil) 122 132 if err != nil { ··· 140 150 } 141 151 }() 142 152 143 - if err := s.streamLogs(ctx, conn, wid); err != nil { 144 - l.Error("streamLogs failed", "err", err) 153 + if err := streamFn(ctx, conn); err != nil { 154 + l.Error("log stream failed", "err", err) 145 155 } 146 156 l.Debug("logs connection closed") 147 157 } ··· 206 216 return nil 207 217 } 208 218 219 + func (s *Spindle) streamLogFromDisk(ctx context.Context, conn *websocket.Conn, wid models.WorkflowId, stepIdx int) error { 220 + streams := []string{"stdout", "stderr"} 221 + 222 + for _, stream := range streams { 223 + data, err := engine.ReadStepLog(s.cfg.Pipelines.LogDir, wid.String(), stream, stepIdx) 224 + if err != nil { 225 + // log but continue to next stream 226 + s.l.Error("failed to read step log", "stream", stream, "step", stepIdx, "wid", wid.String(), "err", err) 227 + continue 228 + } 229 + 230 + scanner := bufio.NewScanner(strings.NewReader(data)) 231 + for scanner.Scan() { 232 + select { 233 + case <-ctx.Done(): 234 + return ctx.Err() 235 + default: 236 + msg := map[string]string{ 237 + "type": stream, 238 + "data": scanner.Text(), 239 + } 240 + if err := conn.WriteJSON(msg); err != nil { 241 + return err 242 + } 243 + } 244 + } 245 + 246 + if err := scanner.Err(); err != nil { 247 + return fmt.Errorf("error scanning %s log: %w", stream, err) 248 + } 249 + } 250 + 251 + return nil 252 + } 253 + 209 254 func (s *Spindle) streamPipelines(conn *websocket.Conn, cursor *int64) error { 210 255 events, err := s.db.GetEvents(*cursor) 211 256 if err != nil { ··· 242 287 243 288 return nil 244 289 } 290 + 291 + func getWorkflowID(r *http.Request) (models.WorkflowId, error) { 292 + knot := chi.URLParam(r, "knot") 293 + rkey := chi.URLParam(r, "rkey") 294 + name := chi.URLParam(r, "name") 295 + 296 + if knot == "" || rkey == "" || name == "" { 297 + return models.WorkflowId{}, fmt.Errorf("missing required parameters") 298 + } 299 + 300 + return models.WorkflowId{ 301 + PipelineId: models.PipelineId{ 302 + Knot: knot, 303 + Rkey: rkey, 304 + }, 305 + Name: name, 306 + }, nil 307 + }