+411
-1
Diff
round #0
+6
-1
spindle/config/config.go
+6
-1
spindle/config/config.go
···
45
45
LogBucket string `env:"LOG_BUCKET"`
46
46
}
47
47
48
+
// DockerPipelines holds the settings for the docker pipeline engine.
type DockerPipelines struct {
	// WorkflowTimeout is the maximum run time of a workflow, written as a
	// duration string (for example "5m"). It is read from the
	// WORKFLOW_TIMEOUT environment variable and defaults to 5m.
	WorkflowTimeout string `env:"WORKFLOW_TIMEOUT, default=5m"`
}
51
+
48
52
type Config struct {
49
53
Server Server `env:",prefix=SPINDLE_SERVER_"`
50
-
NixeryPipelines NixeryPipelines `env:",prefix=SPINDLE_NIXERY_PIPELINES_"`
51
54
S3 S3 `env:",prefix=SPINDLE_S3_"`
55
+
NixeryPipelines NixeryPipelines `env:",prefix=SPINDLE_NIXERY_PIPELINES_"`
56
+
DockerPipelines DockerPipelines `env:",prefix=SPINDLE_DOCKER_PIPELINES_"`
52
57
}
53
58
54
59
func Load(ctx context.Context) (*Config, error) {
+398
spindle/engines/docker/engine.go
+398
spindle/engines/docker/engine.go
···
1
+
package docker
2
+
3
+
import (
4
+
"context"
5
+
"errors"
6
+
"fmt"
7
+
"io"
8
+
"log/slog"
9
+
"os"
10
+
"sync"
11
+
"time"
12
+
13
+
"github.com/docker/docker/api/types/container"
14
+
"github.com/docker/docker/api/types/image"
15
+
"github.com/docker/docker/api/types/mount"
16
+
"github.com/docker/docker/api/types/network"
17
+
"github.com/docker/docker/client"
18
+
"github.com/docker/docker/pkg/stdcopy"
19
+
"gopkg.in/yaml.v3"
20
+
"tangled.org/core/api/tangled"
21
+
"tangled.org/core/log"
22
+
"tangled.org/core/spindle/config"
23
+
"tangled.org/core/spindle/engine"
24
+
"tangled.org/core/spindle/engines/common"
25
+
engineerrors "tangled.org/core/spindle/engines/errors"
26
+
"tangled.org/core/spindle/models"
27
+
"tangled.org/core/spindle/secrets"
28
+
)
29
+
30
+
const (
	// workspaceDir is the working directory of the workflow container.
	workspaceDir = "/tangled/workspace"

	// homeDir is exported as HOME to every step.
	homeDir = "/tangled/home"

	// defaultImage is used when a workflow does not name an image.
	defaultImage = "ubuntu:22.04"
)
35
+
36
+
type cleanupFunc func(context.Context) error
37
+
38
+
type Engine struct {
39
+
docker client.APIClient
40
+
l *slog.Logger
41
+
cfg *config.Config
42
+
43
+
cleanupMu sync.Mutex
44
+
cleanup map[string][]cleanupFunc
45
+
}
46
+
47
+
type Step struct {
48
+
name string
49
+
kind models.StepKind
50
+
command string
51
+
environment map[string]string
52
+
}
53
+
54
+
func (s Step) Name() string {
55
+
return s.name
56
+
}
57
+
58
+
func (s Step) Command() string {
59
+
return s.command
60
+
}
61
+
62
+
func (s Step) Kind() models.StepKind {
63
+
return s.kind
64
+
}
65
+
66
+
// setupSteps get added to start of Steps
67
+
type setupSteps []models.Step
68
+
69
+
// addStep adds a step to the beginning of the workflow's steps.
70
+
func (ss *setupSteps) addStep(step models.Step) {
71
+
*ss = append(*ss, step)
72
+
}
73
+
74
+
// addlFields is the docker-engine-specific state stored in
// models.Workflow.Data.
type addlFields struct {
	image     string // image the workflow container is created from
	container string // id of the workflow container; set by SetupWorkflow
}
78
+
79
+
func (e *Engine) InitWorkflow(twf tangled.Pipeline_Workflow, tpl tangled.Pipeline) (*models.Workflow, error) {
80
+
swf := &models.Workflow{}
81
+
addl := addlFields{}
82
+
83
+
dwf := &struct {
84
+
Steps []struct {
85
+
Command string `yaml:"command"`
86
+
Name string `yaml:"name"`
87
+
Environment map[string]string `yaml:"environment"`
88
+
} `yaml:"steps"`
89
+
Environment map[string]string `yaml:"environment"`
90
+
Image string `yaml:"image"`
91
+
}{}
92
+
err := yaml.Unmarshal([]byte(twf.Raw), &dwf)
93
+
if err != nil {
94
+
return nil, err
95
+
}
96
+
97
+
for _, dstep := range dwf.Steps {
98
+
sstep := Step{}
99
+
sstep.environment = dstep.Environment
100
+
sstep.command = dstep.Command
101
+
sstep.name = dstep.Name
102
+
sstep.kind = models.StepKindUser
103
+
swf.Steps = append(swf.Steps, sstep)
104
+
}
105
+
swf.Name = twf.Name
106
+
swf.Environment = dwf.Environment
107
+
108
+
// Use specified image or default
109
+
if dwf.Image != "" {
110
+
addl.image = dwf.Image
111
+
} else {
112
+
addl.image = defaultImage
113
+
}
114
+
115
+
setup := &setupSteps{}
116
+
117
+
// Add clone step
118
+
setup.addStep(models.BuildCloneStep(twf, *tpl.TriggerMetadata, e.cfg.Server.Dev))
119
+
120
+
// append setup steps in order to the start of workflow steps
121
+
swf.Steps = append(*setup, swf.Steps...)
122
+
swf.Data = addl
123
+
124
+
return swf, nil
125
+
}
126
+
127
+
func (e *Engine) WorkflowTimeout() time.Duration {
128
+
workflowTimeoutStr := e.cfg.DockerPipelines.WorkflowTimeout
129
+
workflowTimeout, err := time.ParseDuration(workflowTimeoutStr)
130
+
if err != nil {
131
+
e.l.Error("failed to parse workflow timeout", "error", err, "timeout", workflowTimeoutStr)
132
+
workflowTimeout = 5 * time.Minute
133
+
}
134
+
135
+
return workflowTimeout
136
+
}
137
+
138
+
func New(ctx context.Context, cfg *config.Config) (*Engine, error) {
139
+
dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
140
+
if err != nil {
141
+
return nil, err
142
+
}
143
+
144
+
l := log.FromContext(ctx).With("component", "spindle-docker")
145
+
146
+
e := &Engine{
147
+
docker: dcli,
148
+
l: l,
149
+
cfg: cfg,
150
+
}
151
+
152
+
e.cleanup = make(map[string][]cleanupFunc)
153
+
154
+
return e, nil
155
+
}
156
+
157
+
func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId, wf *models.Workflow) error {
158
+
e.l.Info("setting up workflow", "workflow", wid)
159
+
160
+
_, err := e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{
161
+
Driver: "bridge",
162
+
})
163
+
if err != nil {
164
+
return err
165
+
}
166
+
e.registerCleanup(wid, func(ctx context.Context) error {
167
+
return e.docker.NetworkRemove(ctx, networkName(wid))
168
+
})
169
+
170
+
addl := wf.Data.(addlFields)
171
+
172
+
reader, err := e.docker.ImagePull(ctx, addl.image, image.PullOptions{})
173
+
if err != nil {
174
+
e.l.Error("pipeline image pull failed!", "image", addl.image, "workflowId", wid, "error", err.Error())
175
+
176
+
return fmt.Errorf("pulling image: %w", err)
177
+
}
178
+
defer reader.Close()
179
+
io.Copy(os.Stdout, reader)
180
+
181
+
resp, err := e.docker.ContainerCreate(ctx, &container.Config{
182
+
Image: addl.image,
183
+
Cmd: []string{"sleep", "infinity"}, // Keep container running
184
+
OpenStdin: true,
185
+
Tty: false,
186
+
Hostname: "spindle",
187
+
WorkingDir: workspaceDir,
188
+
Labels: map[string]string{
189
+
"sh.tangled.pipeline/workflow_id": wid.String(),
190
+
},
191
+
}, &container.HostConfig{
192
+
Mounts: []mount.Mount{
193
+
{
194
+
Type: mount.TypeTmpfs,
195
+
Target: "/tmp",
196
+
ReadOnly: false,
197
+
TmpfsOptions: &mount.TmpfsOptions{
198
+
Mode: 0o1777, // world-writeable sticky bit
199
+
Options: [][]string{
200
+
{"exec"},
201
+
},
202
+
},
203
+
},
204
+
},
205
+
ReadonlyRootfs: false,
206
+
CapDrop: []string{"ALL"},
207
+
CapAdd: []string{"CAP_DAC_OVERRIDE", "CAP_CHOWN", "CAP_FOWNER", "CAP_SETUID", "CAP_SETGID"},
208
+
SecurityOpt: []string{"no-new-privileges"},
209
+
ExtraHosts: []string{"host.docker.internal:host-gateway"},
210
+
}, nil, nil, "")
211
+
if err != nil {
212
+
return fmt.Errorf("creating container: %w", err)
213
+
}
214
+
e.registerCleanup(wid, func(ctx context.Context) error {
215
+
err = e.docker.ContainerStop(ctx, resp.ID, container.StopOptions{})
216
+
if err != nil {
217
+
return err
218
+
}
219
+
220
+
return e.docker.ContainerRemove(ctx, resp.ID, container.RemoveOptions{
221
+
RemoveVolumes: true,
222
+
RemoveLinks: false,
223
+
Force: false,
224
+
})
225
+
})
226
+
227
+
err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{})
228
+
if err != nil {
229
+
return fmt.Errorf("starting container: %w", err)
230
+
}
231
+
232
+
// Create necessary directories
233
+
mkExecResp, err := e.docker.ContainerExecCreate(ctx, resp.ID, container.ExecOptions{
234
+
Cmd: []string{"mkdir", "-p", workspaceDir, homeDir},
235
+
AttachStdout: true,
236
+
AttachStderr: true,
237
+
})
238
+
if err != nil {
239
+
return err
240
+
}
241
+
242
+
// This actually *starts* the command. Thanks, Docker!
243
+
execResp, err := e.docker.ContainerExecAttach(ctx, mkExecResp.ID, container.ExecAttachOptions{})
244
+
if err != nil {
245
+
return err
246
+
}
247
+
defer execResp.Close()
248
+
249
+
// This is apparently best way to wait for the command to complete.
250
+
_, err = io.ReadAll(execResp.Reader)
251
+
if err != nil {
252
+
return err
253
+
}
254
+
255
+
execInspectResp, err := e.docker.ContainerExecInspect(ctx, mkExecResp.ID)
256
+
if err != nil {
257
+
return err
258
+
}
259
+
260
+
if execInspectResp.ExitCode != 0 {
261
+
return fmt.Errorf("mkdir exited with exit code %d", execInspectResp.ExitCode)
262
+
} else if execInspectResp.Running {
263
+
return errors.New("mkdir is somehow still running??")
264
+
}
265
+
266
+
addl.container = resp.ID
267
+
wf.Data = addl
268
+
269
+
return nil
270
+
}
271
+
272
+
func (e *Engine) RunStep(ctx context.Context, wid models.WorkflowId, w *models.Workflow, idx int, secrets []secrets.UnlockedSecret, wfLogger *models.WorkflowLogger) error {
273
+
addl := w.Data.(addlFields)
274
+
workflowEnvs := common.ConstructEnvs(w.Environment)
275
+
for _, s := range secrets {
276
+
workflowEnvs.AddEnv(s.Key, s.Value)
277
+
}
278
+
279
+
step := w.Steps[idx]
280
+
281
+
select {
282
+
case <-ctx.Done():
283
+
return ctx.Err()
284
+
default:
285
+
}
286
+
287
+
envs := append(common.EnvVars(nil), workflowEnvs...)
288
+
if dockerStep, ok := step.(Step); ok {
289
+
for k, v := range dockerStep.environment {
290
+
envs.AddEnv(k, v)
291
+
}
292
+
}
293
+
envs.AddEnv("HOME", homeDir)
294
+
295
+
mkExecResp, err := e.docker.ContainerExecCreate(ctx, addl.container, container.ExecOptions{
296
+
Cmd: []string{"sh", "-c", step.Command()},
297
+
AttachStdout: true,
298
+
AttachStderr: true,
299
+
Env: envs,
300
+
})
301
+
if err != nil {
302
+
return fmt.Errorf("creating exec: %w", err)
303
+
}
304
+
305
+
// start tailing logs in background
306
+
tailDone := make(chan error, 1)
307
+
go func() {
308
+
tailDone <- e.tailStep(ctx, wfLogger, mkExecResp.ID, wid, idx, step)
309
+
}()
310
+
311
+
select {
312
+
case <-tailDone:
313
+
314
+
case <-ctx.Done():
315
+
e.l.Warn("step timed out", "step", step.Name())
316
+
<-tailDone
317
+
return engine.ErrTimedOut
318
+
}
319
+
320
+
select {
321
+
case <-ctx.Done():
322
+
return ctx.Err()
323
+
default:
324
+
}
325
+
326
+
execInspectResp, err := e.docker.ContainerExecInspect(ctx, mkExecResp.ID)
327
+
if err != nil {
328
+
return err
329
+
}
330
+
331
+
if execInspectResp.ExitCode != 0 {
332
+
inspectResp, err := e.docker.ContainerInspect(ctx, addl.container)
333
+
if err != nil {
334
+
return err
335
+
}
336
+
337
+
e.l.Error("workflow failed!", "workflow_id", wid.String(), "exit_code", execInspectResp.ExitCode, "oom_killed", inspectResp.State.OOMKilled)
338
+
339
+
if inspectResp.State.OOMKilled {
340
+
return engineerrors.ErrOOMKilled
341
+
}
342
+
return engine.ErrWorkflowFailed
343
+
}
344
+
345
+
return nil
346
+
}
347
+
348
+
func (e *Engine) tailStep(ctx context.Context, wfLogger *models.WorkflowLogger, execID string, wid models.WorkflowId, stepIdx int, step models.Step) error {
349
+
if wfLogger == nil {
350
+
return nil
351
+
}
352
+
353
+
// This actually *starts* the command. Thanks, Docker!
354
+
logs, err := e.docker.ContainerExecAttach(ctx, execID, container.ExecAttachOptions{})
355
+
if err != nil {
356
+
return err
357
+
}
358
+
defer logs.Close()
359
+
360
+
_, err = stdcopy.StdCopy(
361
+
wfLogger.DataWriter(stepIdx, "stdout"),
362
+
wfLogger.DataWriter(stepIdx, "stderr"),
363
+
logs.Reader,
364
+
)
365
+
if err != nil && err != io.EOF && !errors.Is(err, context.DeadlineExceeded) {
366
+
return fmt.Errorf("failed to copy logs: %w", err)
367
+
}
368
+
369
+
return nil
370
+
}
371
+
372
+
func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error {
373
+
e.cleanupMu.Lock()
374
+
key := wid.String()
375
+
376
+
fns := e.cleanup[key]
377
+
delete(e.cleanup, key)
378
+
e.cleanupMu.Unlock()
379
+
380
+
for _, fn := range fns {
381
+
if err := fn(ctx); err != nil {
382
+
e.l.Error("failed to cleanup workflow resource", "workflowId", wid, "error", err)
383
+
}
384
+
}
385
+
return nil
386
+
}
387
+
388
+
func (e *Engine) registerCleanup(wid models.WorkflowId, fn cleanupFunc) {
389
+
e.cleanupMu.Lock()
390
+
defer e.cleanupMu.Unlock()
391
+
392
+
key := wid.String()
393
+
e.cleanup[key] = append(e.cleanup[key], fn)
394
+
}
395
+
396
+
func networkName(wid models.WorkflowId) string {
397
+
return fmt.Sprintf("workflow-network-%s", wid)
398
+
}
+7
spindle/server.go
+7
spindle/server.go
···
22
22
"tangled.org/core/spindle/config"
23
23
"tangled.org/core/spindle/db"
24
24
"tangled.org/core/spindle/engine"
25
+
"tangled.org/core/spindle/engines/docker"
25
26
"tangled.org/core/spindle/engines/nixery"
26
27
"tangled.org/core/spindle/models"
27
28
"tangled.org/core/spindle/queue"
···
250
251
return err
251
252
}
252
253
254
+
dockerEng, err := docker.New(ctx, cfg)
255
+
if err != nil {
256
+
return err
257
+
}
258
+
253
259
s, err := New(ctx, cfg, map[string]models.Engine{
254
260
"nixery": nixeryEng,
261
+
"docker": dockerEng,
255
262
})
256
263
if err != nil {
257
264
return err
History
1 round
0 comments
anirudh.fi
submitted
#0
1 commit
expand
collapse
spindle/engines: add docker engine
3/3 failed
expand
collapse
no conflicts, ready to merge