forked from tangled.org/core
Monorepo for Tangled

appview,spindle: group logs based on step

spindle logs now inform the beginning of a new step using a "control"
message, with logs categorized under a "data" message.

the appview in turn, is able to group logs by step:

- upon encountering a control message, it begins a new step (a
collapsible details tag)
- upon encontering a data message, it adds the log line into the last
encountered step

Signed-off-by: oppiliappan <me@oppi.li>

oppi.li 08ab8698 80373ac0

verified
Changed files
+68 -34
appview
pages
templates
pipelines
repo
spindle
engine
models
+1 -2
appview/pages/templates/repo/settings.html
··· 91 <select id="spindle" name="spindle" required class="p-1 border border-gray-200 bg-white dark:bg-gray-800 dark:text-white dark:border-gray-700"> 92 <option 93 value="" 94 - disabled 95 selected 96 > 97 - Choose a spindle 98 </option> 99 {{ range .Spindles }} 100 <option
··· 91 <select id="spindle" name="spindle" required class="p-1 border border-gray-200 bg-white dark:bg-gray-800 dark:text-white dark:border-gray-700"> 92 <option 93 value="" 94 selected 95 > 96 + None 97 </option> 98 {{ range .Spindles }} 99 <option
+25 -6
appview/pipelines/pipelines.go
··· 4 "context" 5 "encoding/json" 6 "fmt" 7 "log/slog" 8 "net/http" 9 "strings" ··· 262 } 263 }() 264 265 for { 266 select { 267 case <-ctx.Done(): ··· 284 continue 285 } 286 287 - html := fmt.Appendf(nil, ` 288 - <div id="lines" hx-swap-oob="beforeend"> 289 - <p>%s: %s</p> 290 - </div> 291 - `, logLine.Stream, logLine.Data) 292 293 - if err = clientConn.WriteMessage(websocket.TextMessage, html); err != nil { 294 l.Error("error writing to client", "err", err) 295 return 296 }
··· 4 "context" 5 "encoding/json" 6 "fmt" 7 + "html" 8 "log/slog" 9 "net/http" 10 "strings" ··· 263 } 264 }() 265 266 + stepIdx := 0 267 for { 268 select { 269 case <-ctx.Done(): ··· 286 continue 287 } 288 289 + var fragment []byte 290 + switch logLine.Kind { 291 + case spindlemodel.LogKindControl: 292 + // control messages create a new step block 293 + stepIdx++ 294 + fragment = fmt.Appendf(nil, ` 295 + <div id="lines" hx-swap-oob="beforeend"> 296 + <details id="step-%d" open> 297 + <summary>%s</summary> 298 + <div id="step-body-%d"></div> 299 + </details> 300 + </div> 301 + `, stepIdx, logLine.Content, stepIdx) 302 + case spindlemodel.LogKindData: 303 + // data messages simply insert new log lines into current step 304 + escaped := html.EscapeString(logLine.Content) 305 + fragment = fmt.Appendf(nil, ` 306 + <div id="step-body-%d" hx-swap-oob="beforeend"> 307 + <p>%s</p> 308 + </div> 309 + `, stepIdx, escaped) 310 + } 311 312 + if err = clientConn.WriteMessage(websocket.TextMessage, fragment); err != nil { 313 l.Error("error writing to client", "err", err) 314 return 315 }
+1
appview/repo/repo.go
··· 234 Owner: user.Did, 235 CreatedAt: f.CreatedAt, 236 Description: &newDescription, 237 }, 238 }, 239 })
··· 234 Owner: user.Did, 235 CreatedAt: f.CreatedAt, 236 Description: &newDescription, 237 + Spindle: &f.Spindle, 238 }, 239 }, 240 })
+14 -11
spindle/engine/engine.go
··· 227 // start tailing logs in background 228 tailDone := make(chan error, 1) 229 go func() { 230 - tailDone <- e.TailStep(ctx, resp.ID, wid, stepIdx) 231 }() 232 233 // wait for container completion or timeout ··· 307 return info.State, nil 308 } 309 310 - func (e *Engine) TailStep(ctx context.Context, containerID string, wid models.WorkflowId, stepIdx int) error { 311 logs, err := e.docker.ContainerLogs(ctx, containerID, container.LogsOptions{ 312 Follow: true, 313 ShowStdout: true, ··· 319 return err 320 } 321 322 - wfLogger, err := NewWorkflowLogger(e.cfg.Pipelines.LogDir, wid) 323 - if err != nil { 324 - e.l.Warn("failed to setup step logger; logs will not be persisted", "error", err) 325 - return err 326 - } 327 - defer wfLogger.Close() 328 - 329 _, err = stdcopy.StdCopy( 330 - wfLogger.Writer("stdout", stepIdx), 331 - wfLogger.Writer("stderr", stepIdx), 332 logs, 333 ) 334 if err != nil && err != io.EOF && !errors.Is(err, context.DeadlineExceeded) {
··· 227 // start tailing logs in background 228 tailDone := make(chan error, 1) 229 go func() { 230 + tailDone <- e.TailStep(ctx, resp.ID, wid, stepIdx, step) 231 }() 232 233 // wait for container completion or timeout ··· 307 return info.State, nil 308 } 309 310 + func (e *Engine) TailStep(ctx context.Context, containerID string, wid models.WorkflowId, stepIdx int, step models.Step) error { 311 + wfLogger, err := NewWorkflowLogger(e.cfg.Pipelines.LogDir, wid) 312 + if err != nil { 313 + e.l.Warn("failed to setup step logger; logs will not be persisted", "error", err) 314 + return err 315 + } 316 + defer wfLogger.Close() 317 + 318 + ctl := wfLogger.ControlWriter() 319 + ctl.Write([]byte(step.Command)) 320 + 321 logs, err := e.docker.ContainerLogs(ctx, containerID, container.LogsOptions{ 322 Follow: true, 323 ShowStdout: true, ··· 329 return err 330 } 331 332 _, err = stdcopy.StdCopy( 333 + wfLogger.DataWriter("stdout"), 334 + wfLogger.DataWriter("stderr"), 335 logs, 336 ) 337 if err != nil && err != io.EOF && !errors.Is(err, context.DeadlineExceeded) {
+11 -12
spindle/engine/logger.go
··· 30 }, nil 31 } 32 33 - func (l *WorkflowLogger) Write(p []byte) (n int, err error) { 34 - return l.file.Write(p) 35 } 36 37 func (l *WorkflowLogger) Close() error { 38 return l.file.Close() 39 } 40 41 - func LogFilePath(baseDir string, workflowID models.WorkflowId) string { 42 - logFilePath := filepath.Join(baseDir, fmt.Sprintf("%s.log", workflowID.String())) 43 - return logFilePath 44 } 45 46 - func (l *WorkflowLogger) Writer(stream string, stepId int) io.Writer { 47 - return &jsonWriter{logger: l, stream: stream, stepId: stepId} 48 } 49 50 type jsonWriter struct { 51 logger *WorkflowLogger 52 - stream string 53 - stepId int 54 } 55 56 func (w *jsonWriter) Write(p []byte) (int, error) { 57 line := strings.TrimRight(string(p), "\r\n") 58 59 entry := models.LogLine{ 60 - Stream: w.stream, 61 - Data: line, 62 - StepId: w.stepId, 63 } 64 65 if err := w.logger.encoder.Encode(entry); err != nil {
··· 30 }, nil 31 } 32 33 + func LogFilePath(baseDir string, workflowID models.WorkflowId) string { 34 + logFilePath := filepath.Join(baseDir, fmt.Sprintf("%s.log", workflowID.String())) 35 + return logFilePath 36 } 37 38 func (l *WorkflowLogger) Close() error { 39 return l.file.Close() 40 } 41 42 + func (l *WorkflowLogger) DataWriter(stream string) io.Writer { 43 + // TODO: emit stream 44 + return &jsonWriter{logger: l, kind: models.LogKindData} 45 } 46 47 + func (l *WorkflowLogger) ControlWriter() io.Writer { 48 + return &jsonWriter{logger: l, kind: models.LogKindControl} 49 } 50 51 type jsonWriter struct { 52 logger *WorkflowLogger 53 + kind models.LogKind 54 } 55 56 func (w *jsonWriter) Write(p []byte) (int, error) { 57 line := strings.TrimRight(string(p), "\r\n") 58 59 entry := models.LogLine{ 60 + Kind: w.kind, 61 + Content: line, 62 } 63 64 if err := w.logger.encoder.Encode(entry); err != nil {
+16 -3
spindle/models/models.go
··· 71 return slices.Contains(FinishStates[:], s) 72 } 73 74 type LogLine struct { 75 - Stream string `json:"s"` 76 - Data string `json:"d"` 77 - StepId int `json:"i"` 78 }
··· 71 return slices.Contains(FinishStates[:], s) 72 } 73 74 + type LogKind string 75 + 76 + var ( 77 + // step log data 78 + LogKindData LogKind = "data" 79 + // indicates start/end of a step 80 + LogKindControl LogKind = "control" 81 + ) 82 + 83 type LogLine struct { 84 + Kind LogKind `json:"kind"` 85 + Content string `json:"content"` 86 + 87 + // fields if kind is "data" 88 + Stream string `json:"stream,omitempty"` 89 + 90 + // fields if kind is "control" 91 }