+20
appview/pages/pages.go
+20
appview/pages/pages.go
···
974
return p.executeRepo("repo/pipelines/pipelines", w, params)
975
}
976
977
+
type LogBlockParams struct {
978
+
Id int
979
+
Name string
980
+
Command string
981
+
Collapsed bool
982
+
}
983
+
984
+
func (p *Pages) LogBlock(w io.Writer, params LogBlockParams) error {
985
+
return p.executePlain("repo/pipelines/fragments/logBlock", w, params)
986
+
}
987
+
988
+
type LogLineParams struct {
989
+
Id int
990
+
Content string
991
+
}
992
+
993
+
func (p *Pages) LogLine(w io.Writer, params LogLineParams) error {
994
+
return p.executePlain("repo/pipelines/fragments/logLine", w, params)
995
+
}
996
+
997
type WorkflowParams struct {
998
LoggedInUser *oauth.User
999
RepoInfo repoinfo.RepoInfo
+16
appview/pages/templates/repo/pipelines/fragments/logBlock.html
+16
appview/pages/templates/repo/pipelines/fragments/logBlock.html
···
···
1
+
{{ define "repo/pipelines/fragments/logBlock" }}
2
+
<div id="lines" hx-swap-oob="beforeend">
3
+
<details id="step-{{ .Id }}" {{if not .Collapsed}}open{{end}} class="group bg-gray-100 px-2 dark:bg-gray-900">
4
+
<summary class="sticky top-0 py-1 list-none cursor-pointer py-2 bg-gray-100 dark:bg-gray-900 hover:text-gray-500 hover:dark:text-gray-400">
5
+
<div class="group-open:hidden flex items-center gap-1">
6
+
{{ i "chevron-right" "w-4 h-4" }} {{ .Name }}
7
+
</div>
8
+
<div class="hidden group-open:flex items-center gap-1">
9
+
{{ i "chevron-down" "w-4 h-4" }} {{ .Name }}
10
+
</div>
11
+
</summary>
12
+
<div class="text-blue-600 dark:text-blue-300 font-mono">{{ .Command }}</div>
13
+
<div id="step-body-{{ .Id }}" class="font-mono"></div>
14
+
</details>
15
+
</div>
16
+
{{ end }}
+6
appview/pages/templates/repo/pipelines/fragments/logLine.html
+6
appview/pages/templates/repo/pipelines/fragments/logLine.html
+5
-13
appview/pages/templates/repo/pipelines/pipelines.html
+5
-13
appview/pages/templates/repo/pipelines/pipelines.html
···
8
9
{{ define "repoContent" }}
10
<div class="flex justify-between items-center gap-4">
11
-
<div class="flex gap-4">
12
-
</div>
13
-
14
-
</div>
15
-
<div class="error" id="issues"></div>
16
-
{{ end }}
17
-
18
-
{{ define "repoAfter" }}
19
-
<section
20
-
class="w-full flex flex-col gap-2 mt-2"
21
-
>
22
{{ range .Pipelines }}
23
{{ block "pipeline" (list $ .) }} {{ end }}
24
{{ else }}
···
26
No pipelines run for this repository.
27
</p>
28
{{ end }}
29
-
</section>
30
{{ end }}
31
32
{{ define "pipeline" }}
33
{{ $root := index . 0 }}
34
{{ $p := index . 1 }}
35
-
<div class="py-4 px-6 bg-white dark:bg-gray-800 dark:text-white">
36
{{ block "pipelineHeader" $ }} {{ end }}
37
</div>
38
{{ end }}
···
8
9
{{ define "repoContent" }}
10
<div class="flex justify-between items-center gap-4">
11
+
<div class="w-full flex flex-col gap-2">
12
{{ range .Pipelines }}
13
{{ block "pipeline" (list $ .) }} {{ end }}
14
{{ else }}
···
16
No pipelines run for this repository.
17
</p>
18
{{ end }}
19
+
</div>
20
+
</div>
21
{{ end }}
22
+
23
24
{{ define "pipeline" }}
25
{{ $root := index . 0 }}
26
{{ $p := index . 1 }}
27
+
<div class="py-2 bg-white dark:bg-gray-800 dark:text-white">
28
{{ block "pipelineHeader" $ }} {{ end }}
29
</div>
30
{{ end }}
+3
-5
appview/pages/templates/repo/pipelines/workflow.html
+3
-5
appview/pages/templates/repo/pipelines/workflow.html
···
24
{{ $active := .Workflow }}
25
{{ with .Pipeline }}
26
{{ $id := .Id }}
27
-
<div class="grid grid-cols-1 rounded border border-gray-200 dark:border-gray-700 divide-y divide-gray-200 dark:divide-gray-700">
28
{{ range $name, $all := .Statuses }}
29
<a href="/{{ $.RepoInfo.FullName }}/pipelines/{{ $id }}/workflow/{{ $name }}" class="no-underline hover:no-underline hover:bg-gray-100/25 hover:dark:bg-gray-700/25">
30
<div
···
58
59
{{ define "logs" }}
60
<div id="log-stream"
61
-
class="p-2 bg-gray-100 dark:bg-gray-900 font-mono text-sm min-h-96 max-h-screen overflow-auto flex flex-col-reverse [overflow-anchor:auto_!important]"
62
hx-ext="ws"
63
ws-connect="/{{ $.RepoInfo.FullName }}/pipelines/{{ .Pipeline.Id }}/workflow/{{ .Workflow }}/logs">
64
-
<div id="lines">
65
-
<!-- Each log line should be rendered with class="item" like below -->
66
-
<!-- <div class="item">[INFO] Log line here</div> -->
67
</div>
68
</div>
69
{{ end }}
···
24
{{ $active := .Workflow }}
25
{{ with .Pipeline }}
26
{{ $id := .Id }}
27
+
<div class="sticky top-2 grid grid-cols-1 rounded border border-gray-200 dark:border-gray-700 divide-y divide-gray-200 dark:divide-gray-700">
28
{{ range $name, $all := .Statuses }}
29
<a href="/{{ $.RepoInfo.FullName }}/pipelines/{{ $id }}/workflow/{{ $name }}" class="no-underline hover:no-underline hover:bg-gray-100/25 hover:dark:bg-gray-700/25">
30
<div
···
58
59
{{ define "logs" }}
60
<div id="log-stream"
61
+
class="text-sm"
62
hx-ext="ws"
63
ws-connect="/{{ $.RepoInfo.FullName }}/pipelines/{{ .Pipeline.Id }}/workflow/{{ .Workflow }}/logs">
64
+
<div id="lines" class="flex flex-col gap-2">
65
</div>
66
</div>
67
{{ end }}
+72
-58
appview/pipelines/pipelines.go
+72
-58
appview/pipelines/pipelines.go
···
1
package pipelines
2
3
import (
4
"context"
5
"encoding/json"
6
-
"fmt"
7
-
"html"
8
"log/slog"
9
"net/http"
10
"strings"
···
170
171
ctx, cancel := context.WithCancel(r.Context())
172
defer cancel()
173
-
go func() {
174
-
for {
175
-
if _, _, err := clientConn.NextReader(); err != nil {
176
-
l.Error("failed to read", "err", err)
177
-
cancel()
178
-
return
179
-
}
180
-
}
181
-
}()
182
183
user := p.oauth.GetUser(r)
184
f, err := p.repoResolver.Resolve(r)
···
238
defer spindleConn.Close()
239
240
// create a channel for incoming messages
241
-
msgChan := make(chan []byte, 10)
242
-
errChan := make(chan error, 1)
243
-
244
// start a goroutine to read from spindle
245
-
go func() {
246
-
defer close(msgChan)
247
-
defer close(errChan)
248
-
249
-
for {
250
-
_, msg, err := spindleConn.ReadMessage()
251
-
if err != nil {
252
-
if websocket.IsCloseError(err,
253
-
websocket.CloseNormalClosure,
254
-
websocket.CloseGoingAway,
255
-
websocket.CloseAbnormalClosure) {
256
-
errChan <- nil // signal graceful end
257
-
} else {
258
-
errChan <- err
259
-
}
260
-
return
261
-
}
262
-
msgChan <- msg
263
-
}
264
-
}()
265
266
stepIdx := 0
267
for {
268
select {
269
case <-ctx.Done():
270
l.Info("client disconnected")
271
return
272
-
case err := <-errChan:
273
-
if err != nil {
274
-
l.Error("error reading from spindle", "err", err)
275
}
276
277
-
if err == nil {
278
-
l.Info("log tail complete")
279
}
280
281
-
return
282
-
case msg := <-msgChan:
283
var logLine spindlemodel.LogLine
284
-
if err = json.Unmarshal(msg, &logLine); err != nil {
285
l.Error("failed to parse logline", "err", err)
286
continue
287
}
288
289
-
var fragment []byte
290
switch logLine.Kind {
291
case spindlemodel.LogKindControl:
292
// control messages create a new step block
293
stepIdx++
294
-
fragment = fmt.Appendf(nil, `
295
-
<div id="lines" hx-swap-oob="beforeend">
296
-
<details id="step-%d" open>
297
-
<summary>%s</summary>
298
-
<div id="step-body-%d"></div>
299
-
</details>
300
-
</div>
301
-
`, stepIdx, logLine.Content, stepIdx)
302
case spindlemodel.LogKindData:
303
// data messages simply insert new log lines into current step
304
-
escaped := html.EscapeString(logLine.Content)
305
-
fragment = fmt.Appendf(nil, `
306
-
<div id="step-body-%d" hx-swap-oob="beforeend">
307
-
<p>%s</p>
308
-
</div>
309
-
`, stepIdx, escaped)
310
}
311
312
-
if err = clientConn.WriteMessage(websocket.TextMessage, fragment); err != nil {
313
l.Error("error writing to client", "err", err)
314
return
315
}
316
case <-time.After(30 * time.Second):
317
l.Debug("sent keepalive")
318
if err = clientConn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil {
319
l.Error("failed to write control", "err", err)
320
}
321
}
322
}
323
}
···
1
package pipelines
2
3
import (
4
+
"bytes"
5
"context"
6
"encoding/json"
7
"log/slog"
8
"net/http"
9
"strings"
···
169
170
ctx, cancel := context.WithCancel(r.Context())
171
defer cancel()
172
173
user := p.oauth.GetUser(r)
174
f, err := p.repoResolver.Resolve(r)
···
228
defer spindleConn.Close()
229
230
// create a channel for incoming messages
231
+
evChan := make(chan logEvent, 100)
232
// start a goroutine to read from spindle
233
+
go readLogs(spindleConn, evChan)
234
235
stepIdx := 0
236
+
var fragment bytes.Buffer
237
for {
238
select {
239
case <-ctx.Done():
240
l.Info("client disconnected")
241
return
242
+
243
+
case ev, ok := <-evChan:
244
+
if !ok {
245
+
continue
246
}
247
248
+
if ev.err != nil && ev.isCloseError() {
249
+
l.Debug("graceful shutdown, tail complete", "err", err)
250
+
return
251
+
}
252
+
if ev.err != nil {
253
+
l.Error("error reading from spindle", "err", err)
254
+
return
255
}
256
257
var logLine spindlemodel.LogLine
258
+
if err = json.Unmarshal(ev.msg, &logLine); err != nil {
259
l.Error("failed to parse logline", "err", err)
260
continue
261
}
262
263
+
fragment.Reset()
264
+
265
switch logLine.Kind {
266
case spindlemodel.LogKindControl:
267
// control messages create a new step block
268
stepIdx++
269
+
collapsed := false
270
+
if logLine.StepKind == spindlemodel.StepKindSystem {
271
+
collapsed = true
272
+
}
273
+
err = p.pages.LogBlock(&fragment, pages.LogBlockParams{
274
+
Id: stepIdx,
275
+
Name: logLine.Content,
276
+
Command: logLine.StepCommand,
277
+
Collapsed: collapsed,
278
+
})
279
case spindlemodel.LogKindData:
280
// data messages simply insert new log lines into current step
281
+
err = p.pages.LogLine(&fragment, pages.LogLineParams{
282
+
Id: stepIdx,
283
+
Content: logLine.Content,
284
+
})
285
+
}
286
+
if err != nil {
287
+
l.Error("failed to render log line", "err", err)
288
+
return
289
}
290
291
+
if err = clientConn.WriteMessage(websocket.TextMessage, fragment.Bytes()); err != nil {
292
l.Error("error writing to client", "err", err)
293
return
294
}
295
+
296
case <-time.After(30 * time.Second):
297
l.Debug("sent keepalive")
298
if err = clientConn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil {
299
l.Error("failed to write control", "err", err)
300
+
return
301
}
302
}
303
}
304
}
305
+
306
+
// either a message or an error
307
+
type logEvent struct {
308
+
msg []byte
309
+
err error
310
+
}
311
+
312
+
func (ev *logEvent) isCloseError() bool {
313
+
return websocket.IsCloseError(
314
+
ev.err,
315
+
websocket.CloseNormalClosure,
316
+
websocket.CloseGoingAway,
317
+
websocket.CloseAbnormalClosure,
318
+
)
319
+
}
320
+
321
+
// read logs from spindle and pass through to chan
322
+
func readLogs(conn *websocket.Conn, ch chan logEvent) {
323
+
defer close(ch)
324
+
325
+
for {
326
+
if conn == nil {
327
+
return
328
+
}
329
+
330
+
_, msg, err := conn.ReadMessage()
331
+
if err != nil {
332
+
ch <- logEvent{err: err}
333
+
return
334
+
}
335
+
ch <- logEvent{msg: msg}
336
+
}
337
+
}
+2
-2
spindle/engine/engine.go
+2
-2
spindle/engine/engine.go
+27
-12
spindle/engine/logger.go
+27
-12
spindle/engine/logger.go
···
41
42
func (l *WorkflowLogger) DataWriter(stream string) io.Writer {
43
// TODO: emit stream
44
-
return &jsonWriter{logger: l, kind: models.LogKindData}
45
}
46
47
-
func (l *WorkflowLogger) ControlWriter() io.Writer {
48
-
return &jsonWriter{logger: l, kind: models.LogKindControl}
49
}
50
51
-
type jsonWriter struct {
52
logger *WorkflowLogger
53
-
kind models.LogKind
54
}
55
56
-
func (w *jsonWriter) Write(p []byte) (int, error) {
57
line := strings.TrimRight(string(p), "\r\n")
58
59
-
entry := models.LogLine{
60
-
Kind: w.kind,
61
-
Content: line,
62
-
}
63
64
if err := w.logger.encoder.Encode(entry); err != nil {
65
return 0, err
66
}
67
-
68
-
return len(p), nil
69
}
···
41
42
func (l *WorkflowLogger) DataWriter(stream string) io.Writer {
43
// TODO: emit stream
44
+
return &dataWriter{
45
+
logger: l,
46
+
stream: stream,
47
+
}
48
}
49
50
+
func (l *WorkflowLogger) ControlWriter(idx int, step models.Step) io.Writer {
51
+
return &controlWriter{
52
+
logger: l,
53
+
idx: idx,
54
+
step: step,
55
+
}
56
}
57
58
+
type dataWriter struct {
59
logger *WorkflowLogger
60
+
stream string
61
}
62
63
+
func (w *dataWriter) Write(p []byte) (int, error) {
64
line := strings.TrimRight(string(p), "\r\n")
65
+
entry := models.NewDataLogLine(line, w.stream)
66
+
if err := w.logger.encoder.Encode(entry); err != nil {
67
+
return 0, err
68
+
}
69
+
return len(p), nil
70
+
}
71
72
+
type controlWriter struct {
73
+
logger *WorkflowLogger
74
+
idx int
75
+
step models.Step
76
+
}
77
78
+
func (w *controlWriter) Write(_ []byte) (int, error) {
79
+
entry := models.NewControlLogLine(w.idx, w.step)
80
if err := w.logger.encoder.Encode(entry); err != nil {
81
return 0, err
82
}
83
+
return len(w.step.Name), nil
84
}
+21
spindle/models/models.go
+21
spindle/models/models.go
···
88
Stream string `json:"stream,omitempty"`
89
90
// fields if kind is "control"
91
+
StepId int `json:"step_id,omitempty"`
92
+
StepKind StepKind `json:"step_kind,omitempty"`
93
+
StepCommand string `json:"step_command,omitempty"`
94
+
}
95
+
96
+
func NewDataLogLine(content, stream string) LogLine {
97
+
return LogLine{
98
+
Kind: LogKindData,
99
+
Content: content,
100
+
Stream: stream,
101
+
}
102
+
}
103
+
104
+
func NewControlLogLine(idx int, step Step) LogLine {
105
+
return LogLine{
106
+
Kind: LogKindControl,
107
+
Content: step.Name,
108
+
StepId: idx,
109
+
StepKind: step.Kind,
110
+
StepCommand: step.Command,
111
+
}
112
}
+15
-1
spindle/models/pipeline.go
+15
-1
spindle/models/pipeline.go
···
15
Command string
16
Name string
17
Environment map[string]string
18
}
19
20
type Workflow struct {
21
Steps []Step
···
46
sstep.Environment = stepEnvToMap(tstep.Environment)
47
sstep.Command = tstep.Command
48
sstep.Name = tstep.Name
49
swf.Steps = append(swf.Steps, sstep)
50
}
51
swf.Name = twf.Name
···
59
setup.addStep(nixConfStep())
60
setup.addStep(cloneStep(*twf, *pl.TriggerMetadata.Repo, cfg.Server.Dev))
61
setup.addStep(checkoutStep(*twf, *pl.TriggerMetadata))
62
-
setup.addStep(dependencyStep(*twf))
63
64
// append setup steps in order to the start of workflow steps
65
swf.Steps = append(*setup, swf.Steps...)
···
15
Command string
16
Name string
17
Environment map[string]string
18
+
Kind StepKind
19
}
20
+
21
+
type StepKind int
22
+
23
+
const (
24
+
// steps injected by the CI runner
25
+
StepKindSystem StepKind = iota
26
+
// steps defined by the user in the original pipeline
27
+
StepKindUser
28
+
)
29
30
type Workflow struct {
31
Steps []Step
···
56
sstep.Environment = stepEnvToMap(tstep.Environment)
57
sstep.Command = tstep.Command
58
sstep.Name = tstep.Name
59
+
sstep.Kind = StepKindUser
60
swf.Steps = append(swf.Steps, sstep)
61
}
62
swf.Name = twf.Name
···
70
setup.addStep(nixConfStep())
71
setup.addStep(cloneStep(*twf, *pl.TriggerMetadata.Repo, cfg.Server.Dev))
72
setup.addStep(checkoutStep(*twf, *pl.TriggerMetadata))
73
+
// this step could be empty
74
+
if s := dependencyStep(*twf); s != nil {
75
+
setup.addStep(*s)
76
+
}
77
78
// append setup steps in order to the start of workflow steps
79
swf.Steps = append(*setup, swf.Steps...)
+3
-3
spindle/models/setup_steps.go
+3
-3
spindle/models/setup_steps.go
···
83
// For dependencies using a custom registry (i.e. not nixpkgs), it collects
84
// all packages and adds a single 'nix profile install' step to the
85
// beginning of the workflow's step list.
86
-
func dependencyStep(twf tangled.Pipeline_Workflow) Step {
87
var customPackages []string
88
89
for _, d := range twf.Dependencies {
···
111
"NIX_SHOW_DOWNLOAD_PROGRESS": "0",
112
},
113
}
114
-
return installStep
115
}
116
-
return Step{}
117
}
···
83
// For dependencies using a custom registry (i.e. not nixpkgs), it collects
84
// all packages and adds a single 'nix profile install' step to the
85
// beginning of the workflow's step list.
86
+
func dependencyStep(twf tangled.Pipeline_Workflow) *Step {
87
var customPackages []string
88
89
for _, d := range twf.Dependencies {
···
111
"NIX_SHOW_DOWNLOAD_PROGRESS": "0",
112
},
113
}
114
+
return &installStep
115
}
116
+
return nil
117
}