forked from tangled.org/core
Monorepo for Tangled

spindle/engine: stream logs to websocket

Logs are streamed for each running pipeline on a websocket at
/logs/{pipelineID}. engine.TailStep demuxes stdout and stderr from the
container's logs and pipes that out to corresponding stdout and stderr
channels.

These channels are maintained inside engine's container
struct, key'd by the pipeline ID, and protected by a read/write mutex.
engine.LogChannels fetches the stdout/stderr chans as recieve-only if
the pipeline is known to exist.

Signed-off-by: Anirudh Oppiliappan <anirudh@tangled.sh>

anirudh.fi 7f7d89ac d643cad8

verified
Changed files
+201 -7
spindle
+98 -5
spindle/engine/engine.go
··· 1 package engine 2 3 import ( 4 "context" 5 "fmt" 6 "io" ··· 32 l *slog.Logger 33 db *db.DB 34 n *notifier.Notifier 35 } 36 37 func New(ctx context.Context, db *db.DB, n *notifier.Notifier) (*Engine, error) { ··· 42 43 l := log.FromContext(ctx).With("component", "spindle") 44 45 - return &Engine{docker: dcli, l: l, db: db, n: n}, nil 46 } 47 48 // SetupPipeline sets up a new network for the pipeline, and possibly volumes etc. ··· 136 // ONLY marks pipeline as failed if container's exit code is non-zero. 137 // All other errors are bubbled up. 138 func (e *Engine) StartSteps(ctx context.Context, steps []*tangled.Pipeline_Step, id, image string) error { 139 for _, step := range steps { 140 hostConfig := hostConfig(id) 141 resp, err := e.docker.ContainerCreate(ctx, &container.Config{ ··· 166 wg.Add(1) 167 go func() { 168 defer wg.Done() 169 - err := e.TailStep(ctx, resp.ID) 170 if err != nil { 171 e.l.Error("failed to tail container", "container", resp.ID) 172 return ··· 211 return info.State, nil 212 } 213 214 - func (e *Engine) TailStep(ctx context.Context, containerID string) error { 215 logs, err := e.docker.ContainerLogs(ctx, containerID, container.LogsOptions{ 216 Follow: true, 217 ShowStdout: true, ··· 223 return err 224 } 225 226 go func() { 227 - _, _ = stdcopy.StdCopy(os.Stdout, os.Stdout, logs) 228 - _ = logs.Close() 229 }() 230 return nil 231 } 232 233 func workspaceVolume(id string) string {
··· 1 package engine 2 3 import ( 4 + "bufio" 5 "context" 6 "fmt" 7 "io" ··· 33 l *slog.Logger 34 db *db.DB 35 n *notifier.Notifier 36 + 37 + chanMu sync.RWMutex 38 + stdoutChans map[string]chan string 39 + stderrChans map[string]chan string 40 } 41 42 func New(ctx context.Context, db *db.DB, n *notifier.Notifier) (*Engine, error) { ··· 47 48 l := log.FromContext(ctx).With("component", "spindle") 49 50 + e := &Engine{ 51 + docker: dcli, 52 + l: l, 53 + db: db, 54 + n: n, 55 + } 56 + 57 + e.stdoutChans = make(map[string]chan string, 100) 58 + e.stderrChans = make(map[string]chan string, 100) 59 + 60 + return e, nil 61 } 62 63 // SetupPipeline sets up a new network for the pipeline, and possibly volumes etc. ··· 151 // ONLY marks pipeline as failed if container's exit code is non-zero. 152 // All other errors are bubbled up. 153 func (e *Engine) StartSteps(ctx context.Context, steps []*tangled.Pipeline_Step, id, image string) error { 154 + // set up logging channels 155 + e.chanMu.Lock() 156 + if _, exists := e.stdoutChans[id]; !exists { 157 + e.stdoutChans[id] = make(chan string, 100) 158 + } 159 + if _, exists := e.stderrChans[id]; !exists { 160 + e.stderrChans[id] = make(chan string, 100) 161 + } 162 + e.chanMu.Unlock() 163 + 164 + // close channels after all steps are complete 165 + defer func() { 166 + close(e.stdoutChans[id]) 167 + close(e.stderrChans[id]) 168 + }() 169 + 170 for _, step := range steps { 171 hostConfig := hostConfig(id) 172 resp, err := e.docker.ContainerCreate(ctx, &container.Config{ ··· 197 wg.Add(1) 198 go func() { 199 defer wg.Done() 200 + err := e.TailStep(ctx, resp.ID, id) 201 if err != nil { 202 e.l.Error("failed to tail container", "container", resp.ID) 203 return ··· 242 return info.State, nil 243 } 244 245 + func (e *Engine) TailStep(ctx context.Context, containerID, pipelineID string) error { 246 logs, err := e.docker.ContainerLogs(ctx, containerID, container.LogsOptions{ 247 Follow: true, 248 ShowStdout: true, ··· 254 return err 255 } 256 257 + // using StdCopy we demux logs and stream stdout and stderr to different 258 + // channels. 259 + // 260 + // stdout w||r stdoutCh 261 + // stderr w||r stderrCh 262 + // 263 + 264 + rpipeOut, wpipeOut := io.Pipe() 265 + rpipeErr, wpipeErr := io.Pipe() 266 + 267 go func() { 268 + defer wpipeOut.Close() 269 + defer wpipeErr.Close() 270 + _, err := stdcopy.StdCopy(wpipeOut, wpipeErr, logs) 271 + if err != nil && err != io.EOF { 272 + e.l.Error("failed to copy logs", "error", err) 273 + } 274 + }() 275 + 276 + // read from stdout and send to stdout pipe 277 + // NOTE: the stdoutCh channnel is closed further up in StartSteps 278 + // once all steps are done. 279 + go func() { 280 + e.chanMu.RLock() 281 + stdoutCh := e.stdoutChans[pipelineID] 282 + e.chanMu.RUnlock() 283 + 284 + scanner := bufio.NewScanner(rpipeOut) 285 + for scanner.Scan() { 286 + stdoutCh <- scanner.Text() 287 + } 288 + if err := scanner.Err(); err != nil { 289 + e.l.Error("failed to scan stdout", "error", err) 290 + } 291 + }() 292 + 293 + // read from stderr and send to stderr pipe 294 + // NOTE: the stderrCh channnel is closed further up in StartSteps 295 + // once all steps are done. 296 + go func() { 297 + e.chanMu.RLock() 298 + stderrCh := e.stderrChans[pipelineID] 299 + e.chanMu.RUnlock() 300 + 301 + scanner := bufio.NewScanner(rpipeErr) 302 + for scanner.Scan() { 303 + stderrCh <- scanner.Text() 304 + } 305 + if err := scanner.Err(); err != nil { 306 + e.l.Error("failed to scan stderr", "error", err) 307 + } 308 }() 309 + 310 return nil 311 + } 312 + 313 + func (e *Engine) LogChannels(pipelineID string) (stdout <-chan string, stderr <-chan string, ok bool) { 314 + e.chanMu.RLock() 315 + defer e.chanMu.RUnlock() 316 + 317 + stdoutCh, ok1 := e.stdoutChans[pipelineID] 318 + stderrCh, ok2 := e.stderrChans[pipelineID] 319 + 320 + if !ok1 || !ok2 { 321 + return nil, nil, false 322 + } 323 + return stdoutCh, stderrCh, true 324 } 325 326 func workspaceVolume(id string) string {
+3 -2
spindle/server.go
··· 6 "log/slog" 7 "net/http" 8 9 "golang.org/x/net/context" 10 "tangled.sh/tangled.sh/core/api/tangled" 11 "tangled.sh/tangled.sh/core/jetstream" ··· 52 } 53 54 n := notifier.New() 55 - 56 eng, err := engine.New(ctx, d, &n) 57 if err != nil { 58 return err ··· 89 } 90 91 func (s *Spindle) Router() http.Handler { 92 - mux := &http.ServeMux{} 93 94 mux.HandleFunc("/events", s.Events) 95 return mux 96 } 97
··· 6 "log/slog" 7 "net/http" 8 9 + "github.com/go-chi/chi/v5" 10 "golang.org/x/net/context" 11 "tangled.sh/tangled.sh/core/api/tangled" 12 "tangled.sh/tangled.sh/core/jetstream" ··· 53 } 54 55 n := notifier.New() 56 eng, err := engine.New(ctx, d, &n) 57 if err != nil { 58 return err ··· 89 } 90 91 func (s *Spindle) Router() http.Handler { 92 + mux := chi.NewRouter() 93 94 mux.HandleFunc("/events", s.Events) 95 + mux.HandleFunc("/logs/{pipelineID}", s.Logs) 96 return mux 97 } 98
+100
spindle/stream.go
··· 1 package spindle 2 3 import ( 4 "net/http" 5 "time" 6 7 "context" 8 9 "github.com/gorilla/websocket" 10 ) 11 ··· 72 } 73 } 74 } 75 } 76 77 func (s *Spindle) streamPipelines(conn *websocket.Conn, cursor *string) error {
··· 1 package spindle 2 3 import ( 4 + "fmt" 5 "net/http" 6 "time" 7 8 "context" 9 10 + "github.com/go-chi/chi/v5" 11 "github.com/gorilla/websocket" 12 ) 13 ··· 74 } 75 } 76 } 77 + } 78 + 79 + func (s *Spindle) Logs(w http.ResponseWriter, r *http.Request) { 80 + l := s.l.With("handler", "Logs") 81 + 82 + pipelineID := chi.URLParam(r, "pipelineID") 83 + if pipelineID == "" { 84 + http.Error(w, "pipelineID required", http.StatusBadRequest) 85 + return 86 + } 87 + l = l.With("pipelineID", pipelineID) 88 + 89 + conn, err := upgrader.Upgrade(w, r, nil) 90 + if err != nil { 91 + l.Error("websocket upgrade failed", "err", err) 92 + http.Error(w, "failed to upgrade", http.StatusInternalServerError) 93 + return 94 + } 95 + defer conn.Close() 96 + l.Info("upgraded http to wss") 97 + 98 + ctx, cancel := context.WithCancel(r.Context()) 99 + defer cancel() 100 + 101 + go func() { 102 + for { 103 + if _, _, err := conn.NextReader(); err != nil { 104 + l.Info("client disconnected", "err", err) 105 + cancel() 106 + return 107 + } 108 + } 109 + }() 110 + 111 + if err := s.streamLogs(ctx, conn, pipelineID); err != nil { 112 + l.Error("streamLogs failed", "err", err) 113 + } 114 + l.Info("logs connection closed") 115 + } 116 + 117 + func (s *Spindle) streamLogs(ctx context.Context, conn *websocket.Conn, pipelineID string) error { 118 + l := s.l.With("pipelineID", pipelineID) 119 + 120 + stdoutCh, stderrCh, ok := s.eng.LogChannels(pipelineID) 121 + if !ok { 122 + return fmt.Errorf("pipelineID %q not found", pipelineID) 123 + } 124 + 125 + done := make(chan struct{}) 126 + 127 + go func() { 128 + for { 129 + select { 130 + case line, ok := <-stdoutCh: 131 + if !ok { 132 + done <- struct{}{} 133 + return 134 + } 135 + msg := map[string]string{"type": "stdout", "data": line} 136 + if err := conn.WriteJSON(msg); err != nil { 137 + l.Error("write stdout failed", "err", err) 138 + done <- struct{}{} 139 + return 140 + } 141 + case <-ctx.Done(): 142 + done <- struct{}{} 143 + return 144 + } 145 + } 146 + }() 147 + 148 + go func() { 149 + for { 150 + select { 151 + case line, ok := <-stderrCh: 152 + if !ok { 153 + done <- struct{}{} 154 + return 155 + } 156 + msg := map[string]string{"type": "stderr", "data": line} 157 + if err := conn.WriteJSON(msg); err != nil { 158 + l.Error("write stderr failed", "err", err) 159 + done <- struct{}{} 160 + return 161 + } 162 + case <-ctx.Done(): 163 + done <- struct{}{} 164 + return 165 + } 166 + } 167 + }() 168 + 169 + select { 170 + case <-done: 171 + case <-ctx.Done(): 172 + } 173 + 174 + return nil 175 } 176 177 func (s *Spindle) streamPipelines(conn *websocket.Conn, cursor *string) error {