+7
-1
spindle/config/config.go
+7
-1
spindle/config/config.go
···
15
Owner string `env:"OWNER, required"`
16
}
17
18
+
type Pipelines struct {
19
+
// TODO: change default to nixery.tangled.sh
20
+
Nixery string `env:"NIXERY, default=nixery.dev"`
21
+
}
22
+
23
type Config struct {
24
+
Server Server `env:",prefix=SPINDLE_SERVER_"`
25
+
Pipelines Pipelines `env:",prefix=SPINDLE_PIPELINES_"`
26
}
27
28
func Load(ctx context.Context) (*Config, error) {
+21
spindle/engine/ansi_stripper.go
+21
spindle/engine/ansi_stripper.go
···
···
1
+
package engine
2
+
3
+
import (
4
+
"io"
5
+
6
+
"github.com/go-enry/go-enry/v2/regex"
7
+
)
8
+
9
+
// regex to match ANSI escape codes (e.g., color codes, cursor moves)
10
+
const ansi = "[\u001B\u009B][[\\]()#;?]*(?:(?:(?:[a-zA-Z\\d]*(?:;[a-zA-Z\\d]*)*)?\u0007)|(?:(?:\\d{1,4}(?:;\\d{0,4})*)?[\\dA-PRZcf-ntqry=><~]))"
11
+
12
+
var re = regex.MustCompile(ansi)
13
+
14
+
type ansiStrippingWriter struct {
15
+
underlying io.Writer
16
+
}
17
+
18
+
func (w *ansiStrippingWriter) Write(p []byte) (int, error) {
19
+
clean := re.ReplaceAll(p, []byte{})
20
+
return w.underlying.Write(clean)
21
+
}
+42
-28
spindle/engine/engine.go
+42
-28
spindle/engine/engine.go
···
7
"io"
8
"log/slog"
9
"os"
10
-
"path"
11
"strings"
12
"sync"
13
···
18
"github.com/docker/docker/api/types/volume"
19
"github.com/docker/docker/client"
20
"github.com/docker/docker/pkg/stdcopy"
21
-
"tangled.sh/tangled.sh/core/api/tangled"
22
"tangled.sh/tangled.sh/core/log"
23
"tangled.sh/tangled.sh/core/notifier"
24
"tangled.sh/tangled.sh/core/spindle/db"
25
"tangled.sh/tangled.sh/core/spindle/models"
26
)
···
36
l *slog.Logger
37
db *db.DB
38
n *notifier.Notifier
39
40
chanMu sync.RWMutex
41
stdoutChans map[string]chan string
···
45
cleanup map[string][]cleanupFunc
46
}
47
48
-
func New(ctx context.Context, db *db.DB, n *notifier.Notifier) (*Engine, error) {
49
dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
50
if err != nil {
51
return nil, err
···
58
l: l,
59
db: db,
60
n: n,
61
}
62
63
e.stdoutChans = make(map[string]chan string, 100)
···
68
return e, nil
69
}
70
71
-
func (e *Engine) StartWorkflows(ctx context.Context, pipeline *tangled.Pipeline, pipelineId models.PipelineId) {
72
e.l.Info("starting all workflows in parallel", "pipeline", pipelineId)
73
74
wg := sync.WaitGroup{}
···
93
}
94
defer e.DestroyWorkflow(ctx, wid)
95
96
-
// TODO: actual checks for image/registry etc.
97
-
var deps string
98
-
for _, d := range w.Dependencies {
99
-
if d.Registry == "nixpkgs" {
100
-
deps = path.Join(d.Packages...)
101
-
}
102
-
}
103
-
104
-
// load defaults from somewhere else
105
-
deps = path.Join(deps, "bash", "git", "coreutils", "nix")
106
-
107
-
cimg := path.Join("nixery.dev", deps)
108
-
reader, err := e.docker.ImagePull(ctx, cimg, image.PullOptions{})
109
if err != nil {
110
e.l.Error("pipeline failed!", "workflowId", wid, "error", err.Error())
111
···
119
defer reader.Close()
120
io.Copy(os.Stdout, reader)
121
122
-
err = e.StartSteps(ctx, w.Steps, wid, cimg)
123
if err != nil {
124
e.l.Error("workflow failed!", "wid", wid.String(), "error", err.Error())
125
126
-
err := e.db.StatusFailed(wid, err.Error(), -1, e.n)
127
-
if err != nil {
128
-
return err
129
}
130
131
return fmt.Errorf("starting steps image: %w", err)
···
187
// StartSteps starts all steps sequentially with the same base image.
188
// ONLY marks pipeline as failed if container's exit code is non-zero.
189
// All other errors are bubbled up.
190
-
func (e *Engine) StartSteps(ctx context.Context, steps []*tangled.Pipeline_Step, wid models.WorkflowId, image string) error {
191
// set up logging channels
192
e.chanMu.Lock()
193
if _, exists := e.stdoutChans[wid.String()]; !exists {
···
259
}
260
261
if state.ExitCode != 0 {
262
-
e.l.Error("workflow failed!", "workflow_id", wid.String(), "error", state.Error, "exit_code", state.ExitCode)
263
-
return fmt.Errorf("%s", state.Error)
264
}
265
}
266
···
293
Follow: true,
294
ShowStdout: true,
295
ShowStderr: true,
296
-
Details: false,
297
Timestamps: false,
298
})
299
if err != nil {
300
return err
301
}
302
303
// using StdCopy we demux logs and stream stdout and stderr to different
304
// channels.
305
//
···
310
rpipeOut, wpipeOut := io.Pipe()
311
rpipeErr, wpipeErr := io.Pipe()
312
313
go func() {
314
defer wpipeOut.Close()
315
defer wpipeErr.Close()
316
-
_, err := stdcopy.StdCopy(wpipeOut, wpipeErr, logs)
317
if err != nil && err != io.EOF {
318
e.l.Error("failed to copy logs", "error", err)
319
}
···
322
// read from stdout and send to stdout pipe
323
// NOTE: the stdoutCh channnel is closed further up in StartSteps
324
// once all steps are done.
325
go func() {
326
e.chanMu.RLock()
327
stdoutCh := e.stdoutChans[wid.String()]
328
e.chanMu.RUnlock()
···
339
// read from stderr and send to stderr pipe
340
// NOTE: the stderrCh channnel is closed further up in StartSteps
341
// once all steps are done.
342
go func() {
343
e.chanMu.RLock()
344
stderrCh := e.stderrChans[wid.String()]
345
e.chanMu.RUnlock()
···
352
e.l.Error("failed to scan stderr", "error", err)
353
}
354
}()
355
356
return nil
357
}
···
435
Source: nixVolume(wid),
436
Target: "/nix",
437
},
438
},
439
-
ReadonlyRootfs: true,
440
CapDrop: []string{"ALL"},
441
-
SecurityOpt: []string{"no-new-privileges"},
442
}
443
444
return hostConfig
···
7
"io"
8
"log/slog"
9
"os"
10
"strings"
11
"sync"
12
···
17
"github.com/docker/docker/api/types/volume"
18
"github.com/docker/docker/client"
19
"github.com/docker/docker/pkg/stdcopy"
20
"tangled.sh/tangled.sh/core/log"
21
"tangled.sh/tangled.sh/core/notifier"
22
+
"tangled.sh/tangled.sh/core/spindle/config"
23
"tangled.sh/tangled.sh/core/spindle/db"
24
"tangled.sh/tangled.sh/core/spindle/models"
25
)
···
35
l *slog.Logger
36
db *db.DB
37
n *notifier.Notifier
38
+
cfg *config.Config
39
40
chanMu sync.RWMutex
41
stdoutChans map[string]chan string
···
45
cleanup map[string][]cleanupFunc
46
}
47
48
+
func New(ctx context.Context, cfg *config.Config, db *db.DB, n *notifier.Notifier) (*Engine, error) {
49
dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
50
if err != nil {
51
return nil, err
···
58
l: l,
59
db: db,
60
n: n,
61
+
cfg: cfg,
62
}
63
64
e.stdoutChans = make(map[string]chan string, 100)
···
69
return e, nil
70
}
71
72
+
func (e *Engine) StartWorkflows(ctx context.Context, pipeline *models.Pipeline, pipelineId models.PipelineId) {
73
e.l.Info("starting all workflows in parallel", "pipeline", pipelineId)
74
75
wg := sync.WaitGroup{}
···
94
}
95
defer e.DestroyWorkflow(ctx, wid)
96
97
+
reader, err := e.docker.ImagePull(ctx, w.Image, image.PullOptions{})
98
if err != nil {
99
e.l.Error("pipeline failed!", "workflowId", wid, "error", err.Error())
100
···
108
defer reader.Close()
109
io.Copy(os.Stdout, reader)
110
111
+
err = e.StartSteps(ctx, w.Steps, wid, w.Image)
112
if err != nil {
113
e.l.Error("workflow failed!", "wid", wid.String(), "error", err.Error())
114
115
+
dbErr := e.db.StatusFailed(wid, err.Error(), -1, e.n)
116
+
if dbErr != nil {
117
+
return dbErr
118
}
119
120
return fmt.Errorf("starting steps image: %w", err)
···
176
// StartSteps starts all steps sequentially with the same base image.
177
// ONLY marks pipeline as failed if container's exit code is non-zero.
178
// All other errors are bubbled up.
179
+
func (e *Engine) StartSteps(ctx context.Context, steps []models.Step, wid models.WorkflowId, image string) error {
180
// set up logging channels
181
e.chanMu.Lock()
182
if _, exists := e.stdoutChans[wid.String()]; !exists {
···
248
}
249
250
if state.ExitCode != 0 {
251
+
e.l.Error("workflow failed!", "workflow_id", wid.String(), "error", state.Error, "exit_code", state.ExitCode, "oom_killed", state.OOMKilled)
252
+
err := e.db.StatusFailed(wid, state.Error, int64(state.ExitCode), e.n)
253
+
if err != nil {
254
+
return err
255
+
}
256
+
return fmt.Errorf("error: %s, exit code: %d, oom: %s", state.Error, state.ExitCode, state.OOMKilled)
257
}
258
}
259
···
286
Follow: true,
287
ShowStdout: true,
288
ShowStderr: true,
289
+
Details: true,
290
Timestamps: false,
291
})
292
if err != nil {
293
return err
294
}
295
296
+
var devOutput io.Writer = io.Discard
297
+
if e.cfg.Server.Dev {
298
+
devOutput = &ansiStrippingWriter{underlying: os.Stdout}
299
+
}
300
+
301
+
tee := io.TeeReader(logs, devOutput)
302
+
303
// using StdCopy we demux logs and stream stdout and stderr to different
304
// channels.
305
//
···
310
rpipeOut, wpipeOut := io.Pipe()
311
rpipeErr, wpipeErr := io.Pipe()
312
313
+
wg := sync.WaitGroup{}
314
+
315
+
wg.Add(1)
316
go func() {
317
+
defer wg.Done()
318
defer wpipeOut.Close()
319
defer wpipeErr.Close()
320
+
_, err := stdcopy.StdCopy(wpipeOut, wpipeErr, tee)
321
if err != nil && err != io.EOF {
322
e.l.Error("failed to copy logs", "error", err)
323
}
···
326
// read from stdout and send to stdout pipe
327
// NOTE: the stdoutCh channnel is closed further up in StartSteps
328
// once all steps are done.
329
+
wg.Add(1)
330
go func() {
331
+
defer wg.Done()
332
e.chanMu.RLock()
333
stdoutCh := e.stdoutChans[wid.String()]
334
e.chanMu.RUnlock()
···
345
// read from stderr and send to stderr pipe
346
// NOTE: the stderrCh channnel is closed further up in StartSteps
347
// once all steps are done.
348
+
wg.Add(1)
349
go func() {
350
+
defer wg.Done()
351
e.chanMu.RLock()
352
stderrCh := e.stderrChans[wid.String()]
353
e.chanMu.RUnlock()
···
360
e.l.Error("failed to scan stderr", "error", err)
361
}
362
}()
363
+
364
+
wg.Wait()
365
366
return nil
367
}
···
445
Source: nixVolume(wid),
446
Target: "/nix",
447
},
448
+
{
449
+
Type: mount.TypeTmpfs,
450
+
Target: "/tmp",
451
+
},
452
},
453
+
ReadonlyRootfs: false,
454
CapDrop: []string{"ALL"},
455
+
SecurityOpt: []string{"seccomp=unconfined"},
456
}
457
458
return hostConfig
+4
-8
spindle/engine/envs.go
+4
-8
spindle/engine/envs.go
···
2
3
import (
4
"fmt"
5
-
6
-
"tangled.sh/tangled.sh/core/api/tangled"
7
)
8
9
type EnvVars []string
10
11
// ConstructEnvs converts a tangled.Pipeline_Step_Environment_Elem.{Key,Value}
12
// representation into a docker-friendly []string{"KEY=value", ...} slice.
13
-
func ConstructEnvs(envs []*tangled.Pipeline_Step_Environment_Elem) EnvVars {
14
var dockerEnvs EnvVars
15
-
for _, env := range envs {
16
-
if env != nil {
17
-
ev := fmt.Sprintf("%s=%s", env.Key, env.Value)
18
-
dockerEnvs = append(dockerEnvs, ev)
19
-
}
20
}
21
return dockerEnvs
22
}
···
2
3
import (
4
"fmt"
5
)
6
7
type EnvVars []string
8
9
// ConstructEnvs converts a tangled.Pipeline_Step_Environment_Elem.{Key,Value}
10
// representation into a docker-friendly []string{"KEY=value", ...} slice.
11
+
func ConstructEnvs(envs map[string]string) EnvVars {
12
var dockerEnvs EnvVars
13
+
for k, v := range envs {
14
+
ev := fmt.Sprintf("%s=%s", k, v)
15
+
dockerEnvs = append(dockerEnvs, ev)
16
}
17
return dockerEnvs
18
}
+4
-19
spindle/engine/envs_test.go
+4
-19
spindle/engine/envs_test.go
···
3
import (
4
"reflect"
5
"testing"
6
-
7
-
"tangled.sh/tangled.sh/core/api/tangled"
8
)
9
10
func TestConstructEnvs(t *testing.T) {
11
tests := []struct {
12
name string
13
-
in []*tangled.Pipeline_Step_Environment_Elem
14
want EnvVars
15
}{
16
{
17
name: "empty input",
18
-
in: []*tangled.Pipeline_Step_Environment_Elem{},
19
want: EnvVars{},
20
},
21
{
22
name: "single env var",
23
-
in: []*tangled.Pipeline_Step_Environment_Elem{
24
-
{Key: "FOO", Value: "bar"},
25
-
},
26
want: EnvVars{"FOO=bar"},
27
},
28
{
29
name: "multiple env vars",
30
-
in: []*tangled.Pipeline_Step_Environment_Elem{
31
-
{Key: "FOO", Value: "bar"},
32
-
{Key: "BAZ", Value: "qux"},
33
-
},
34
want: EnvVars{"FOO=bar", "BAZ=qux"},
35
-
},
36
-
{
37
-
name: "nil entries are skipped",
38
-
in: []*tangled.Pipeline_Step_Environment_Elem{
39
-
nil,
40
-
{Key: "FOO", Value: "bar"},
41
-
},
42
-
want: EnvVars{"FOO=bar"},
43
},
44
}
45
···
3
import (
4
"reflect"
5
"testing"
6
)
7
8
func TestConstructEnvs(t *testing.T) {
9
tests := []struct {
10
name string
11
+
in map[string]string
12
want EnvVars
13
}{
14
{
15
name: "empty input",
16
+
in: make(map[string]string),
17
want: EnvVars{},
18
},
19
{
20
name: "single env var",
21
+
in: map[string]string{"FOO": "bar"},
22
want: EnvVars{"FOO=bar"},
23
},
24
{
25
name: "multiple env vars",
26
+
in: map[string]string{"FOO": "bar", "BAZ": "qux"},
27
want: EnvVars{"FOO=bar", "BAZ=qux"},
28
},
29
}
30
+6
-6
spindle/models/pipeline.go
+6
-6
spindle/models/pipeline.go
···
4
"path"
5
6
"tangled.sh/tangled.sh/core/api/tangled"
7
)
8
9
type Pipeline struct {
···
35
// In the process, dependencies are resolved: nixpkgs deps
36
// are constructed atop nixery and set as the Workflow.Image,
37
// and ones from custom registries
38
-
func ToPipeline(pl tangled.Pipeline, dev bool) *Pipeline {
39
workflows := []Workflow{}
40
41
for _, twf := range pl.Workflows {
···
49
}
50
swf.Name = twf.Name
51
swf.Environment = workflowEnvToMap(twf.Environment)
52
-
swf.Image = workflowImage(twf.Dependencies)
53
54
swf.addNixProfileToPath()
55
setup := &setupSteps{}
56
57
-
setup.addStep(cloneStep(*twf, *pl.TriggerMetadata.Repo, dev))
58
setup.addStep(checkoutStep(*twf, *pl.TriggerMetadata))
59
setup.addStep(dependencyStep(*twf))
60
···
82
return envMap
83
}
84
85
-
func workflowImage(deps []tangled.Pipeline_Dependencies_Elem) string {
86
var dependencies string
87
for _, d := range deps {
88
if d.Registry == "nixpkgs" {
···
93
// load defaults from somewhere else
94
dependencies = path.Join(dependencies, "bash", "git", "coreutils", "nix")
95
96
-
// TODO: this should use nixery from the config
97
-
return path.Join("nixery.dev", dependencies)
98
}
99
100
func (wf *Workflow) addNixProfileToPath() {
···
4
"path"
5
6
"tangled.sh/tangled.sh/core/api/tangled"
7
+
"tangled.sh/tangled.sh/core/spindle/config"
8
)
9
10
type Pipeline struct {
···
36
// In the process, dependencies are resolved: nixpkgs deps
37
// are constructed atop nixery and set as the Workflow.Image,
38
// and ones from custom registries
39
+
func ToPipeline(pl tangled.Pipeline, cfg config.Config) *Pipeline {
40
workflows := []Workflow{}
41
42
for _, twf := range pl.Workflows {
···
50
}
51
swf.Name = twf.Name
52
swf.Environment = workflowEnvToMap(twf.Environment)
53
+
swf.Image = workflowImage(twf.Dependencies, cfg.Pipelines.Nixery)
54
55
swf.addNixProfileToPath()
56
setup := &setupSteps{}
57
58
+
setup.addStep(cloneStep(*twf, *pl.TriggerMetadata.Repo, cfg.Server.Dev))
59
setup.addStep(checkoutStep(*twf, *pl.TriggerMetadata))
60
setup.addStep(dependencyStep(*twf))
61
···
83
return envMap
84
}
85
86
+
func workflowImage(deps []tangled.Pipeline_Dependencies_Elem, nixery string) string {
87
var dependencies string
88
for _, d := range deps {
89
if d.Registry == "nixpkgs" {
···
94
// load defaults from somewhere else
95
dependencies = path.Join(dependencies, "bash", "git", "coreutils", "nix")
96
97
+
return path.Join(nixery, dependencies)
98
}
99
100
func (wf *Workflow) addNixProfileToPath() {
+12
-10
spindle/server.go
+12
-10
spindle/server.go
···
59
60
n := notifier.New()
61
62
-
eng, err := engine.New(ctx, d, &n)
63
if err != nil {
64
return err
65
}
···
153
154
func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error {
155
if msg.Nsid == tangled.PipelineNSID {
156
-
pipeline := tangled.Pipeline{}
157
-
err := json.Unmarshal(msg.EventJson, &pipeline)
158
if err != nil {
159
fmt.Println("error unmarshalling", err)
160
return err
161
}
162
163
-
if pipeline.TriggerMetadata == nil {
164
return fmt.Errorf("no trigger metadata found")
165
}
166
167
-
if pipeline.TriggerMetadata.Repo == nil {
168
return fmt.Errorf("no repo data found")
169
}
170
171
// filter by repos
172
_, err = s.db.GetRepo(
173
-
pipeline.TriggerMetadata.Repo.Knot,
174
-
pipeline.TriggerMetadata.Repo.Did,
175
-
pipeline.TriggerMetadata.Repo.Repo,
176
)
177
if err != nil {
178
return err
···
183
Rkey: msg.Rkey,
184
}
185
186
-
for _, w := range pipeline.Workflows {
187
if w != nil {
188
err := s.db.StatusPending(models.WorkflowId{
189
PipelineId: pipelineId,
···
195
}
196
}
197
198
ok := s.jq.Enqueue(queue.Job{
199
Run: func() error {
200
-
s.eng.StartWorkflows(ctx, &pipeline, pipelineId)
201
return nil
202
},
203
OnFail: func(jobError error) {
···
59
60
n := notifier.New()
61
62
+
eng, err := engine.New(ctx, cfg, d, &n)
63
if err != nil {
64
return err
65
}
···
153
154
func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error {
155
if msg.Nsid == tangled.PipelineNSID {
156
+
tpl := tangled.Pipeline{}
157
+
err := json.Unmarshal(msg.EventJson, &tpl)
158
if err != nil {
159
fmt.Println("error unmarshalling", err)
160
return err
161
}
162
163
+
if tpl.TriggerMetadata == nil {
164
return fmt.Errorf("no trigger metadata found")
165
}
166
167
+
if tpl.TriggerMetadata.Repo == nil {
168
return fmt.Errorf("no repo data found")
169
}
170
171
// filter by repos
172
_, err = s.db.GetRepo(
173
+
tpl.TriggerMetadata.Repo.Knot,
174
+
tpl.TriggerMetadata.Repo.Did,
175
+
tpl.TriggerMetadata.Repo.Repo,
176
)
177
if err != nil {
178
return err
···
183
Rkey: msg.Rkey,
184
}
185
186
+
for _, w := range tpl.Workflows {
187
if w != nil {
188
err := s.db.StatusPending(models.WorkflowId{
189
PipelineId: pipelineId,
···
195
}
196
}
197
198
+
spl := models.ToPipeline(tpl, *s.cfg)
199
+
200
ok := s.jq.Enqueue(queue.Job{
201
Run: func() error {
202
+
s.eng.StartWorkflows(ctx, spl, pipelineId)
203
return nil
204
},
205
OnFail: func(jobError error) {