spindle/engine/engine.go  +125 -45
Before:

···
	"log/slog"
	"os"
	"path"
	"sync"

	"github.com/docker/docker/api/types/container"
	"github.com/docker/docker/api/types/image"
···
	workspaceDir = "/tangled/workspace"
)

type Engine struct {
	docker client.APIClient
	l      *slog.Logger
···
	chanMu      sync.RWMutex
	stdoutChans map[string]chan string
	stderrChans map[string]chan string
}

func New(ctx context.Context, db *db.DB, n *notifier.Notifier) (*Engine, error) {
···
	e.stdoutChans = make(map[string]chan string, 100)
	e.stderrChans = make(map[string]chan string, 100)

	return e, nil
}

-// SetupPipeline sets up a new network for the pipeline, and possibly volumes etc.
-// in the future. In here also goes other setup steps.
-func (e *Engine) SetupPipeline(ctx context.Context, pipeline *tangled.Pipeline, atUri, id string) error {
-	e.l.Info("setting up pipeline", "pipeline", id)
-
-	_, err := e.docker.VolumeCreate(ctx, volume.CreateOptions{
-		Name:   workspaceVolume(id),
-		Driver: "local",
-	})
-	if err != nil {
-		return err
-	}
-
-	_, err = e.docker.VolumeCreate(ctx, volume.CreateOptions{
-		Name:   nixVolume(id),
-		Driver: "local",
-	})
-	if err != nil {
-		return err
-	}
-
-	_, err = e.docker.NetworkCreate(ctx, pipelineName(id), network.CreateOptions{
-		Driver: "bridge",
-	})
-	if err != nil {
-		return err
-	}
-
-	err = e.db.CreatePipeline(id, atUri, e.n)
-	return err
-}
-
func (e *Engine) StartWorkflows(ctx context.Context, pipeline *tangled.Pipeline, id string) error {
	e.l.Info("starting all workflows in parallel", "pipeline", id)
···
	g := errgroup.Group{}
	for _, w := range pipeline.Workflows {
		g.Go(func() error {
			// TODO: actual checks for image/registry etc.
			var deps string
			for _, d := range w.Dependencies {
···
			defer reader.Close()
			io.Copy(os.Stdout, reader)

-			err = e.StartSteps(ctx, w.Steps, id, cimg)
			if err != nil {
				e.l.Error("pipeline failed!", "id", id, "error", err.Error())
				return e.db.MarkPipelineFailed(id, -1, err.Error(), e.n)
···
	return e.db.MarkPipelineSuccess(id, e.n)
}

// StartSteps starts all steps sequentially with the same base image.
// ONLY marks pipeline as failed if container's exit code is non-zero.
// All other errors are bubbled up.
-func (e *Engine) StartSteps(ctx context.Context, steps []*tangled.Pipeline_Step, id, image string) error {
	// set up logging channels
	e.chanMu.Lock()
	if _, exists := e.stdoutChans[id]; !exists {
···
	}()

	for _, step := range steps {
-		hostConfig := hostConfig(id)
		resp, err := e.docker.ContainerCreate(ctx, &container.Config{
			Image: image,
			Cmd:   []string{"bash", "-c", step.Command},
···
			return fmt.Errorf("creating container: %w", err)
		}

-		err = e.docker.NetworkConnect(ctx, pipelineName(id), resp.ID, nil)
		if err != nil {
			return fmt.Errorf("connecting network: %w", err)
		}
···
		wg.Wait()

		state, err := e.WaitStep(ctx, resp.ID)
		if err != nil {
			return err
		}
···
	return nil
}

func (e *Engine) LogChannels(pipelineID string) (stdout <-chan string, stderr <-chan string, ok bool) {
	e.chanMu.RLock()
	defer e.chanMu.RUnlock()
···
	return stdoutCh, stderrCh, true
}

-func workspaceVolume(id string) string {
-	return "workspace-" + id
}

-func nixVolume(id string) string {
-	return "nix-" + id
}

-func pipelineName(id string) string {
-	return "pipeline-" + id
}

-func hostConfig(id string) *container.HostConfig {
	hostConfig := &container.HostConfig{
		Mounts: []mount.Mount{
			{
				Type:   mount.TypeVolume,
-				Source: workspaceVolume(id),
				Target: workspaceDir,
			},
			{
				Type:   mount.TypeVolume,
-				Source: nixVolume(id),
				Target: "/nix",
			},
		},
···

	return hostConfig
}
After:

···
	"log/slog"
	"os"
	"path"
+	"strings"
	"sync"
+	"syscall"

	"github.com/docker/docker/api/types/container"
	"github.com/docker/docker/api/types/image"
···
	workspaceDir = "/tangled/workspace"
)

+type cleanupFunc func(context.Context) error
+
type Engine struct {
	docker client.APIClient
	l      *slog.Logger
···
	chanMu      sync.RWMutex
	stdoutChans map[string]chan string
	stderrChans map[string]chan string
+
+	cleanupMu sync.Mutex
+	cleanup   map[string][]cleanupFunc
}

func New(ctx context.Context, db *db.DB, n *notifier.Notifier) (*Engine, error) {
···
	e.stdoutChans = make(map[string]chan string, 100)
	e.stderrChans = make(map[string]chan string, 100)

+	e.cleanup = make(map[string][]cleanupFunc)
+
	return e, nil
}

func (e *Engine) StartWorkflows(ctx context.Context, pipeline *tangled.Pipeline, id string) error {
	e.l.Info("starting all workflows in parallel", "pipeline", id)
···
	g := errgroup.Group{}
	for _, w := range pipeline.Workflows {
		g.Go(func() error {
+			err := e.SetupWorkflow(ctx, id, w.Name)
+			if err != nil {
+				return err
+			}
+
+			defer e.DestroyWorkflow(ctx, id, w.Name)
+
			// TODO: actual checks for image/registry etc.
			var deps string
			for _, d := range w.Dependencies {
···
			defer reader.Close()
			io.Copy(os.Stdout, reader)

+			err = e.StartSteps(ctx, w.Steps, w.Name, id, cimg)
			if err != nil {
				e.l.Error("pipeline failed!", "id", id, "error", err.Error())
				return e.db.MarkPipelineFailed(id, -1, err.Error(), e.n)
···
	return e.db.MarkPipelineSuccess(id, e.n)
}

+// SetupWorkflow sets up a new network for the workflow and volumes for
+// the workspace and Nix store. These are persisted across steps and are
+// destroyed at the end of the workflow.
+func (e *Engine) SetupWorkflow(ctx context.Context, id, workflowName string) error {
+	e.l.Info("setting up workflow", "pipeline", id, "workflow", workflowName)
+
+	_, err := e.docker.VolumeCreate(ctx, volume.CreateOptions{
+		Name:   workspaceVolume(id, workflowName),
+		Driver: "local",
+	})
+	if err != nil {
+		return err
+	}
+	e.registerCleanup(id, workflowName, func(ctx context.Context) error {
+		return e.docker.VolumeRemove(ctx, workspaceVolume(id, workflowName), true)
+	})
+
+	_, err = e.docker.VolumeCreate(ctx, volume.CreateOptions{
+		Name:   nixVolume(id, workflowName),
+		Driver: "local",
+	})
+	if err != nil {
+		return err
+	}
+	e.registerCleanup(id, workflowName, func(ctx context.Context) error {
+		return e.docker.VolumeRemove(ctx, nixVolume(id, workflowName), true)
+	})
+
+	_, err = e.docker.NetworkCreate(ctx, networkName(id, workflowName), network.CreateOptions{
+		Driver: "bridge",
+	})
+	if err != nil {
+		return err
+	}
+	e.registerCleanup(id, workflowName, func(ctx context.Context) error {
+		return e.docker.NetworkRemove(ctx, networkName(id, workflowName))
+	})
+
+	return nil
+}
+
// StartSteps starts all steps sequentially with the same base image.
// ONLY marks pipeline as failed if container's exit code is non-zero.
// All other errors are bubbled up.
+func (e *Engine) StartSteps(ctx context.Context, steps []*tangled.Pipeline_Step, workflowName, id, image string) error {
	// set up logging channels
	e.chanMu.Lock()
	if _, exists := e.stdoutChans[id]; !exists {
···
	}()

	for _, step := range steps {
+		hostConfig := hostConfig(id, workflowName)
		resp, err := e.docker.ContainerCreate(ctx, &container.Config{
			Image: image,
			Cmd:   []string{"bash", "-c", step.Command},
···
			return fmt.Errorf("creating container: %w", err)
		}

+		err = e.docker.NetworkConnect(ctx, networkName(id, workflowName), resp.ID, nil)
		if err != nil {
			return fmt.Errorf("connecting network: %w", err)
		}
···
		wg.Wait()

		state, err := e.WaitStep(ctx, resp.ID)
+		if err != nil {
+			return err
+		}
+
+		err = e.DestroyStep(ctx, resp.ID, id)
		if err != nil {
			return err
		}
···
	return nil
}

+func (e *Engine) DestroyStep(ctx context.Context, containerID, pipelineID string) error {
+	err := e.docker.ContainerKill(ctx, containerID, syscall.SIGKILL.String())
+	if err != nil && !isErrContainerNotFoundOrNotRunning(err) {
+		return err
+	}
+
+	if err := e.docker.ContainerRemove(ctx, containerID, container.RemoveOptions{
+		RemoveVolumes: true,
+		RemoveLinks:   false,
+		Force:         false,
+	}); err != nil && !isErrContainerNotFoundOrNotRunning(err) {
+		return err
+	}
+
+	return nil
+}
+
+func (e *Engine) DestroyWorkflow(ctx context.Context, pipelineID, workflowName string) error {
+	e.cleanupMu.Lock()
+	key := fmt.Sprintf("%s-%s", pipelineID, workflowName)
+
+	fns := e.cleanup[key]
+	delete(e.cleanup, key)
+	e.cleanupMu.Unlock()
+
+	for _, fn := range fns {
+		if err := fn(ctx); err != nil {
+			e.l.Error("failed to cleanup workflow resource", "pipeline", pipelineID, "workflow", workflowName, "err", err)
+		}
+	}
+	return nil
+}
+
func (e *Engine) LogChannels(pipelineID string) (stdout <-chan string, stderr <-chan string, ok bool) {
	e.chanMu.RLock()
	defer e.chanMu.RUnlock()
···
	return stdoutCh, stderrCh, true
}

+func (e *Engine) registerCleanup(pipelineID, workflowName string, fn cleanupFunc) {
+	e.cleanupMu.Lock()
+	defer e.cleanupMu.Unlock()
+
+	key := fmt.Sprintf("%s-%s", pipelineID, workflowName)
+	e.cleanup[key] = append(e.cleanup[key], fn)
}

+func workspaceVolume(id, name string) string {
+	return fmt.Sprintf("workspace-%s-%s", id, name)
}

+func nixVolume(id, name string) string {
+	return fmt.Sprintf("nix-%s-%s", id, name)
+}
+
+func networkName(id, name string) string {
+	return fmt.Sprintf("workflow-network-%s-%s", id, name)
}

+func hostConfig(id, name string) *container.HostConfig {
	hostConfig := &container.HostConfig{
		Mounts: []mount.Mount{
			{
				Type:   mount.TypeVolume,
+				Source: workspaceVolume(id, name),
				Target: workspaceDir,
			},
			{
				Type:   mount.TypeVolume,
+				Source: nixVolume(id, name),
				Target: "/nix",
			},
		},
···

	return hostConfig
}
+
+// thanks woodpecker
+func isErrContainerNotFoundOrNotRunning(err error) bool {
+	// Error response from daemon: Cannot kill container: ...: No such container: ...
+	// Error response from daemon: Cannot kill container: ...: Container ... is not running"
+	// Error response from podman daemon: can only kill running containers. ... is in state exited
+	// Error: No such container: ...
+	return err != nil && (strings.Contains(err.Error(), "No such container") || strings.Contains(err.Error(), "is not running") || strings.Contains(err.Error(), "can only kill running containers"))
+}
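
The new cleanup plumbing is a small register-then-drain pattern: SetupWorkflow pairs every resource it creates with a teardown closure filed under a pipeline+workflow key, and DestroyWorkflow drains that key exactly once. A minimal, self-contained sketch of the pattern, assuming nothing beyond the standard library (the `tracker` type and printed messages are illustrative stand-ins, not code from this diff):

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// cleanupFunc mirrors the type added in the diff: one teardown
// step that can fail independently of the others.
type cleanupFunc func(context.Context) error

// tracker is a hypothetical stand-in for the Engine's
// cleanupMu/cleanup fields.
type tracker struct {
	mu      sync.Mutex
	cleanup map[string][]cleanupFunc
}

func (t *tracker) register(key string, fn cleanupFunc) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.cleanup[key] = append(t.cleanup[key], fn)
}

// drain deletes the key while holding the lock, then runs the
// teardowns outside it, logging failures instead of aborting;
// this is the same shape as DestroyWorkflow above.
func (t *tracker) drain(ctx context.Context, key string) {
	t.mu.Lock()
	fns := t.cleanup[key]
	delete(t.cleanup, key)
	t.mu.Unlock()

	for _, fn := range fns {
		if err := fn(ctx); err != nil {
			fmt.Println("cleanup failed:", err)
		}
	}
}

func main() {
	t := &tracker{cleanup: make(map[string][]cleanupFunc)}
	key := "pipeline-1-build"
	t.register(key, func(context.Context) error { fmt.Println("volume removed"); return nil })
	t.register(key, func(context.Context) error { fmt.Println("network removed"); return nil })
	t.drain(context.Background(), key)
	t.drain(context.Background(), key) // second call is a no-op: the key is already gone
}
```

One property worth noting: teardowns run in registration order, so here the volumes are removed before the network; iterating `fns` in reverse would give LIFO ordering if a later resource ever depends on an earlier one.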
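Because DestroyStep can race with a container that already exited, isErrContainerNotFoundOrNotRunning has to classify daemon errors by message text. A table-driven test is a cheap way to pin those strings down; this is a hypothetical sketch (test file and case names invented, assuming the function lives in the `engine` package):

```go
package engine

import (
	"errors"
	"testing"
)

func TestIsErrContainerNotFoundOrNotRunning(t *testing.T) {
	cases := []struct {
		name string
		err  error
		want bool
	}{
		{"nil error", nil, false},
		{"docker no such container", errors.New("Error: No such container: f00"), true},
		{"docker not running", errors.New("Cannot kill container: f00: Container f00 is not running"), true},
		{"podman exited", errors.New("can only kill running containers. f00 is in state exited"), true},
		{"unrelated failure", errors.New("permission denied"), false},
	}
	for _, c := range cases {
		if got := isErrContainerNotFoundOrNotRunning(c.err); got != c.want {
			t.Errorf("%s: got %v, want %v", c.name, got, c.want)
		}
	}
}
```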
spindle/server.go  +4 -2
Before:

···
		pipelineAtUri := fmt.Sprintf("at://%s/did:web:%s/%s", tangled.PipelineNSID, pipeline.TriggerMetadata.Repo.Knot, msg.Rkey)

		rkey := TID()
-		err = s.eng.SetupPipeline(ctx, &pipeline, pipelineAtUri, rkey)
		if err != nil {
			return err
		}
		return s.eng.StartWorkflows(ctx, &pipeline, rkey)
	},
	OnFail: func(error) {
-		s.l.Error("pipeline setup failed", "error", err)
	},
})
if ok {
After:

···
		pipelineAtUri := fmt.Sprintf("at://%s/did:web:%s/%s", tangled.PipelineNSID, pipeline.TriggerMetadata.Repo.Knot, msg.Rkey)

		rkey := TID()
+
+		err = s.db.CreatePipeline(rkey, pipelineAtUri, s.n)
		if err != nil {
			return err
		}
+
		return s.eng.StartWorkflows(ctx, &pipeline, rkey)
	},
	OnFail: func(error) {
+		s.l.Error("pipeline run failed", "error", err)
	},
})
if ok {
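
The server change is the other half of removing SetupPipeline: creating the DB record is now the server's job, done before any containers run, so the engine only ever sees the rkey. A self-contained sketch of that ordering with stub types; everything here except the call sequence is invented for illustration:

```go
package main

import (
	"context"
	"fmt"
)

// Stubs standing in for spindle's db and engine; only the call
// ordering mirrors the diff, the types themselves are hypothetical.
type pipelineDB struct{}

func (pipelineDB) CreatePipeline(rkey, atURI string) error {
	fmt.Println("recorded pipeline", rkey, "->", atURI)
	return nil
}

type engine struct{}

func (engine) StartWorkflows(ctx context.Context, rkey string) error {
	fmt.Println("running workflows for", rkey)
	return nil
}

func main() {
	ctx := context.Background()
	db, eng := pipelineDB{}, engine{}

	rkey := "3l5aexample22" // stands in for TID(), a time-ordered record key
	atURI := "at://<nsid>/did:web:<knot>/" + rkey // shape from the fmt.Sprintf above

	// Create the row first: if this fails, no containers are started,
	// and any later workflow failure has a record to attach to.
	if err := db.CreatePipeline(rkey, atURI); err != nil {
		fmt.Println("pipeline setup failed:", err)
		return
	}
	if err := eng.StartWorkflows(ctx, rkey); err != nil {
		fmt.Println("pipeline run failed:", err)
	}
}
```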