+42
-19
spindle/engine/engine.go
+42
-19
spindle/engine/engine.go
···
11
"sync"
12
"time"
13
14
"github.com/docker/docker/api/types/container"
15
"github.com/docker/docker/api/types/image"
16
"github.com/docker/docker/api/types/mount"
···
18
"github.com/docker/docker/api/types/volume"
19
"github.com/docker/docker/client"
20
"github.com/docker/docker/pkg/stdcopy"
21
"tangled.sh/tangled.sh/core/log"
22
"tangled.sh/tangled.sh/core/notifier"
23
"tangled.sh/tangled.sh/core/spindle/config"
24
"tangled.sh/tangled.sh/core/spindle/db"
25
"tangled.sh/tangled.sh/core/spindle/models"
26
)
27
28
const (
···
37
db *db.DB
38
n *notifier.Notifier
39
cfg *config.Config
40
41
cleanupMu sync.Mutex
42
cleanup map[string][]cleanupFunc
43
}
44
45
-
func New(ctx context.Context, cfg *config.Config, db *db.DB, n *notifier.Notifier) (*Engine, error) {
46
dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
47
if err != nil {
48
return nil, err
···
56
db: db,
57
n: n,
58
cfg: cfg,
59
}
60
61
e.cleanup = make(map[string][]cleanupFunc)
···
66
func (e *Engine) StartWorkflows(ctx context.Context, pipeline *models.Pipeline, pipelineId models.PipelineId) {
67
e.l.Info("starting all workflows in parallel", "pipeline", pipelineId)
68
69
-
wg := sync.WaitGroup{}
70
for _, w := range pipeline.Workflows {
71
-
wg.Add(1)
72
-
go func() error {
73
-
defer wg.Done()
74
wid := models.WorkflowId{
75
PipelineId: pipelineId,
76
Name: w.Name,
···
102
defer reader.Close()
103
io.Copy(os.Stdout, reader)
104
105
-
workflowTimeoutStr := e.cfg.Pipelines.WorkflowTimeout
106
-
workflowTimeout, err := time.ParseDuration(workflowTimeoutStr)
107
-
if err != nil {
108
-
e.l.Error("failed to parse workflow timeout", "error", err, "timeout", workflowTimeoutStr)
109
-
workflowTimeout = 5 * time.Minute
110
-
}
111
-
e.l.Info("using workflow timeout", "timeout", workflowTimeout)
112
ctx, cancel := context.WithTimeout(ctx, workflowTimeout)
113
defer cancel()
114
115
-
err = e.StartSteps(ctx, w.Steps, wid, w.Image)
116
if err != nil {
117
if errors.Is(err, ErrTimedOut) {
118
dbErr := e.db.StatusTimeout(wid, e.n)
···
135
}
136
137
return nil
138
-
}()
139
}
140
141
-
wg.Wait()
142
}
143
144
// SetupWorkflow sets up a new network for the workflow and volumes for
···
186
// ONLY marks pipeline as failed if container's exit code is non-zero.
187
// All other errors are bubbled up.
188
// Fixed version of the step execution logic
189
-
func (e *Engine) StartSteps(ctx context.Context, steps []models.Step, wid models.WorkflowId, image string) error {
190
191
-
for stepIdx, step := range steps {
192
select {
193
case <-ctx.Done():
194
return ctx.Err()
195
default:
196
}
197
198
-
envs := ConstructEnvs(step.Environment)
199
envs.AddEnv("HOME", workspaceDir)
200
e.l.Debug("envs for step", "step", step.Name, "envs", envs.Slice())
201
202
hostConfig := hostConfig(wid)
203
resp, err := e.docker.ContainerCreate(ctx, &container.Config{
204
-
Image: image,
205
Cmd: []string{"bash", "-c", step.Command},
206
WorkingDir: workspaceDir,
207
Tty: false,
···
11
"sync"
12
"time"
13
14
+
securejoin "github.com/cyphar/filepath-securejoin"
15
"github.com/docker/docker/api/types/container"
16
"github.com/docker/docker/api/types/image"
17
"github.com/docker/docker/api/types/mount"
···
19
"github.com/docker/docker/api/types/volume"
20
"github.com/docker/docker/client"
21
"github.com/docker/docker/pkg/stdcopy"
22
+
"golang.org/x/sync/errgroup"
23
"tangled.sh/tangled.sh/core/log"
24
"tangled.sh/tangled.sh/core/notifier"
25
"tangled.sh/tangled.sh/core/spindle/config"
26
"tangled.sh/tangled.sh/core/spindle/db"
27
"tangled.sh/tangled.sh/core/spindle/models"
28
+
"tangled.sh/tangled.sh/core/spindle/secrets"
29
)
30
31
const (
···
40
db *db.DB
41
n *notifier.Notifier
42
cfg *config.Config
43
+
vault secrets.Manager
44
45
cleanupMu sync.Mutex
46
cleanup map[string][]cleanupFunc
47
}
48
49
+
func New(ctx context.Context, cfg *config.Config, db *db.DB, n *notifier.Notifier, vault secrets.Manager) (*Engine, error) {
50
dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
51
if err != nil {
52
return nil, err
···
60
db: db,
61
n: n,
62
cfg: cfg,
63
+
vault: vault,
64
}
65
66
e.cleanup = make(map[string][]cleanupFunc)
···
71
func (e *Engine) StartWorkflows(ctx context.Context, pipeline *models.Pipeline, pipelineId models.PipelineId) {
72
e.l.Info("starting all workflows in parallel", "pipeline", pipelineId)
73
74
+
// extract secrets
75
+
var allSecrets []secrets.UnlockedSecret
76
+
if didSlashRepo, err := securejoin.SecureJoin(pipeline.RepoOwner, pipeline.RepoName); err == nil {
77
+
if res, err := e.vault.GetSecretsUnlocked(secrets.DidSlashRepo(didSlashRepo)); err == nil {
78
+
allSecrets = res
79
+
}
80
+
}
81
+
82
+
workflowTimeoutStr := e.cfg.Pipelines.WorkflowTimeout
83
+
workflowTimeout, err := time.ParseDuration(workflowTimeoutStr)
84
+
if err != nil {
85
+
e.l.Error("failed to parse workflow timeout", "error", err, "timeout", workflowTimeoutStr)
86
+
workflowTimeout = 5 * time.Minute
87
+
}
88
+
e.l.Info("using workflow timeout", "timeout", workflowTimeout)
89
+
90
+
eg, ctx := errgroup.WithContext(ctx)
91
for _, w := range pipeline.Workflows {
92
+
eg.Go(func() error {
93
wid := models.WorkflowId{
94
PipelineId: pipelineId,
95
Name: w.Name,
···
121
defer reader.Close()
122
io.Copy(os.Stdout, reader)
123
124
ctx, cancel := context.WithTimeout(ctx, workflowTimeout)
125
defer cancel()
126
127
+
err = e.StartSteps(ctx, wid, w, allSecrets)
128
if err != nil {
129
if errors.Is(err, ErrTimedOut) {
130
dbErr := e.db.StatusTimeout(wid, e.n)
···
147
}
148
149
return nil
150
+
})
151
}
152
153
+
if err = eg.Wait(); err != nil {
154
+
e.l.Error("failed to run one or more workflows", "err", err)
155
+
} else {
156
+
e.l.Error("successfully ran full pipeline")
157
+
}
158
}
159
160
// SetupWorkflow sets up a new network for the workflow and volumes for
···
202
// ONLY marks pipeline as failed if container's exit code is non-zero.
203
// All other errors are bubbled up.
204
// Fixed version of the step execution logic
205
+
func (e *Engine) StartSteps(ctx context.Context, wid models.WorkflowId, w models.Workflow, secrets []secrets.UnlockedSecret) error {
206
+
workflowEnvs := ConstructEnvs(w.Environment)
207
+
for _, s := range secrets {
208
+
workflowEnvs.AddEnv(s.Key, s.Value)
209
+
}
210
211
+
for stepIdx, step := range w.Steps {
212
select {
213
case <-ctx.Done():
214
return ctx.Err()
215
default:
216
}
217
218
+
envs := append(EnvVars(nil), workflowEnvs...)
219
+
for k, v := range step.Environment {
220
+
envs.AddEnv(k, v)
221
+
}
222
envs.AddEnv("HOME", workspaceDir)
223
e.l.Debug("envs for step", "step", step.Name, "envs", envs.Slice())
224
225
hostConfig := hostConfig(wid)
226
resp, err := e.docker.ContainerCreate(ctx, &container.Config{
227
+
Image: w.Image,
228
Cmd: []string{"bash", "-c", step.Command},
229
WorkingDir: workspaceDir,
230
Tty: false,
+9
-12
spindle/models/pipeline.go
+9
-12
spindle/models/pipeline.go
···
8
)
9
10
type Pipeline struct {
11
Workflows []Workflow
12
}
13
···
63
swf.Environment = workflowEnvToMap(twf.Environment)
64
swf.Image = workflowImage(twf.Dependencies, cfg.Pipelines.Nixery)
65
66
-
swf.addNixProfileToPath()
67
-
swf.setGlobalEnvs()
68
setup := &setupSteps{}
69
70
setup.addStep(nixConfStep())
···
79
80
workflows = append(workflows, *swf)
81
}
82
-
return &Pipeline{Workflows: workflows}
83
}
84
85
func workflowEnvToMap(envs []*tangled.Pipeline_Pair) map[string]string {
···
115
116
return path.Join(nixery, dependencies)
117
}
118
-
119
-
func (wf *Workflow) addNixProfileToPath() {
120
-
wf.Environment["PATH"] = "$PATH:/.nix-profile/bin"
121
-
}
122
-
123
-
func (wf *Workflow) setGlobalEnvs() {
124
-
wf.Environment["NIX_CONFIG"] = "experimental-features = nix-command flakes"
125
-
wf.Environment["HOME"] = "/tangled/workspace"
126
-
}
···
8
)
9
10
type Pipeline struct {
11
+
RepoOwner string
12
+
RepoName string
13
Workflows []Workflow
14
}
15
···
65
swf.Environment = workflowEnvToMap(twf.Environment)
66
swf.Image = workflowImage(twf.Dependencies, cfg.Pipelines.Nixery)
67
68
setup := &setupSteps{}
69
70
setup.addStep(nixConfStep())
···
79
80
workflows = append(workflows, *swf)
81
}
82
+
repoOwner := pl.TriggerMetadata.Repo.Did
83
+
repoName := pl.TriggerMetadata.Repo.Repo
84
+
return &Pipeline{
85
+
RepoOwner: repoOwner,
86
+
RepoName: repoName,
87
+
Workflows: workflows,
88
+
}
89
}
90
91
func workflowEnvToMap(envs []*tangled.Pipeline_Pair) map[string]string {
···
121
122
return path.Join(nixery, dependencies)
123
}
+3
spindle/models/setup_steps.go
+3
spindle/models/setup_steps.go
+7
-1
spindle/server.go
+7
-1
spindle/server.go
···
68
69
n := notifier.New()
70
71
+
// TODO: add hashicorp vault provider and choose here
72
+
vault, err := secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets"))
73
+
if err != nil {
74
+
return fmt.Errorf("failed to setup secrets provider: %w", err)
75
+
}
76
+
77
+
eng, err := engine.New(ctx, cfg, d, &n, vault)
78
if err != nil {
79
return err
80
}