Signed-off-by: oppiliappan me@oppi.li
+42
-19
spindle/engine/engine.go
+42
-19
spindle/engine/engine.go
···
11
11
"sync"
12
12
"time"
13
13
14
+
securejoin "github.com/cyphar/filepath-securejoin"
14
15
"github.com/docker/docker/api/types/container"
15
16
"github.com/docker/docker/api/types/image"
16
17
"github.com/docker/docker/api/types/mount"
···
18
19
"github.com/docker/docker/api/types/volume"
19
20
"github.com/docker/docker/client"
20
21
"github.com/docker/docker/pkg/stdcopy"
22
+
"golang.org/x/sync/errgroup"
21
23
"tangled.sh/tangled.sh/core/log"
22
24
"tangled.sh/tangled.sh/core/notifier"
23
25
"tangled.sh/tangled.sh/core/spindle/config"
24
26
"tangled.sh/tangled.sh/core/spindle/db"
25
27
"tangled.sh/tangled.sh/core/spindle/models"
28
+
"tangled.sh/tangled.sh/core/spindle/secrets"
26
29
)
27
30
28
31
const (
···
37
40
db *db.DB
38
41
n *notifier.Notifier
39
42
cfg *config.Config
43
+
vault secrets.Manager
40
44
41
45
cleanupMu sync.Mutex
42
46
cleanup map[string][]cleanupFunc
43
47
}
44
48
45
-
func New(ctx context.Context, cfg *config.Config, db *db.DB, n *notifier.Notifier) (*Engine, error) {
49
+
func New(ctx context.Context, cfg *config.Config, db *db.DB, n *notifier.Notifier, vault secrets.Manager) (*Engine, error) {
46
50
dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
47
51
if err != nil {
48
52
return nil, err
···
56
60
db: db,
57
61
n: n,
58
62
cfg: cfg,
63
+
vault: vault,
59
64
}
60
65
61
66
e.cleanup = make(map[string][]cleanupFunc)
···
66
71
func (e *Engine) StartWorkflows(ctx context.Context, pipeline *models.Pipeline, pipelineId models.PipelineId) {
67
72
e.l.Info("starting all workflows in parallel", "pipeline", pipelineId)
68
73
69
-
wg := sync.WaitGroup{}
74
+
// extract secrets
75
+
var allSecrets []secrets.UnlockedSecret
76
+
if didSlashRepo, err := securejoin.SecureJoin(pipeline.RepoOwner, pipeline.RepoName); err == nil {
77
+
if res, err := e.vault.GetSecretsUnlocked(secrets.DidSlashRepo(didSlashRepo)); err == nil {
78
+
allSecrets = res
79
+
}
80
+
}
81
+
82
+
workflowTimeoutStr := e.cfg.Pipelines.WorkflowTimeout
83
+
workflowTimeout, err := time.ParseDuration(workflowTimeoutStr)
84
+
if err != nil {
85
+
e.l.Error("failed to parse workflow timeout", "error", err, "timeout", workflowTimeoutStr)
86
+
workflowTimeout = 5 * time.Minute
87
+
}
88
+
e.l.Info("using workflow timeout", "timeout", workflowTimeout)
89
+
90
+
eg, ctx := errgroup.WithContext(ctx)
70
91
for _, w := range pipeline.Workflows {
71
-
wg.Add(1)
72
-
go func() error {
73
-
defer wg.Done()
92
+
eg.Go(func() error {
74
93
wid := models.WorkflowId{
75
94
PipelineId: pipelineId,
76
95
Name: w.Name,
···
102
121
defer reader.Close()
103
122
io.Copy(os.Stdout, reader)
104
123
105
-
workflowTimeoutStr := e.cfg.Pipelines.WorkflowTimeout
106
-
workflowTimeout, err := time.ParseDuration(workflowTimeoutStr)
107
-
if err != nil {
108
-
e.l.Error("failed to parse workflow timeout", "error", err, "timeout", workflowTimeoutStr)
109
-
workflowTimeout = 5 * time.Minute
110
-
}
111
-
e.l.Info("using workflow timeout", "timeout", workflowTimeout)
112
124
ctx, cancel := context.WithTimeout(ctx, workflowTimeout)
113
125
defer cancel()
114
126
115
-
err = e.StartSteps(ctx, w.Steps, wid, w.Image)
127
+
err = e.StartSteps(ctx, wid, w, allSecrets)
116
128
if err != nil {
117
129
if errors.Is(err, ErrTimedOut) {
118
130
dbErr := e.db.StatusTimeout(wid, e.n)
···
135
147
}
136
148
137
149
return nil
138
-
}()
150
+
})
139
151
}
140
152
141
-
wg.Wait()
153
+
if err = eg.Wait(); err != nil {
154
+
e.l.Error("failed to run one or more workflows", "err", err)
155
+
} else {
156
+
e.l.Error("successfully ran full pipeline")
157
+
}
142
158
}
143
159
144
160
// SetupWorkflow sets up a new network for the workflow and volumes for
···
186
202
// ONLY marks pipeline as failed if container's exit code is non-zero.
187
203
// All other errors are bubbled up.
188
204
// Fixed version of the step execution logic
189
-
func (e *Engine) StartSteps(ctx context.Context, steps []models.Step, wid models.WorkflowId, image string) error {
205
+
func (e *Engine) StartSteps(ctx context.Context, wid models.WorkflowId, w models.Workflow, secrets []secrets.UnlockedSecret) error {
206
+
workflowEnvs := ConstructEnvs(w.Environment)
207
+
for _, s := range secrets {
208
+
workflowEnvs.AddEnv(s.Key, s.Value)
209
+
}
190
210
191
-
for stepIdx, step := range steps {
211
+
for stepIdx, step := range w.Steps {
192
212
select {
193
213
case <-ctx.Done():
194
214
return ctx.Err()
195
215
default:
196
216
}
197
217
198
-
envs := ConstructEnvs(step.Environment)
218
+
envs := append(EnvVars(nil), workflowEnvs...)
219
+
for k, v := range step.Environment {
220
+
envs.AddEnv(k, v)
221
+
}
199
222
envs.AddEnv("HOME", workspaceDir)
200
223
e.l.Debug("envs for step", "step", step.Name, "envs", envs.Slice())
201
224
202
225
hostConfig := hostConfig(wid)
203
226
resp, err := e.docker.ContainerCreate(ctx, &container.Config{
204
-
Image: image,
227
+
Image: w.Image,
205
228
Cmd: []string{"bash", "-c", step.Command},
206
229
WorkingDir: workspaceDir,
207
230
Tty: false,
+9
-12
spindle/models/pipeline.go
+9
-12
spindle/models/pipeline.go
···
8
8
)
9
9
10
10
type Pipeline struct {
11
+
RepoOwner string
12
+
RepoName string
11
13
Workflows []Workflow
12
14
}
13
15
···
63
65
swf.Environment = workflowEnvToMap(twf.Environment)
64
66
swf.Image = workflowImage(twf.Dependencies, cfg.Pipelines.Nixery)
65
67
66
-
swf.addNixProfileToPath()
67
-
swf.setGlobalEnvs()
68
68
setup := &setupSteps{}
69
69
70
70
setup.addStep(nixConfStep())
···
79
79
80
80
workflows = append(workflows, *swf)
81
81
}
82
-
return &Pipeline{Workflows: workflows}
82
+
repoOwner := pl.TriggerMetadata.Repo.Did
83
+
repoName := pl.TriggerMetadata.Repo.Repo
84
+
return &Pipeline{
85
+
RepoOwner: repoOwner,
86
+
RepoName: repoName,
87
+
Workflows: workflows,
88
+
}
83
89
}
84
90
85
91
func workflowEnvToMap(envs []*tangled.Pipeline_Pair) map[string]string {
···
115
121
116
122
return path.Join(nixery, dependencies)
117
123
}
118
-
119
-
func (wf *Workflow) addNixProfileToPath() {
120
-
wf.Environment["PATH"] = "$PATH:/.nix-profile/bin"
121
-
}
122
-
123
-
func (wf *Workflow) setGlobalEnvs() {
124
-
wf.Environment["NIX_CONFIG"] = "experimental-features = nix-command flakes"
125
-
wf.Environment["HOME"] = "/tangled/workspace"
126
-
}
+3
spindle/models/setup_steps.go
+3
spindle/models/setup_steps.go
···
102
102
continue
103
103
}
104
104
105
+
if len(packages) == 0 {
106
+
customPackages = append(customPackages, registry)
107
+
}
105
108
// collect packages from custom registries
106
109
for _, pkg := range packages {
107
110
customPackages = append(customPackages, fmt.Sprintf("'%s#%s'", registry, pkg))
+7
-1
spindle/server.go
+7
-1
spindle/server.go
···
68
68
69
69
n := notifier.New()
70
70
71
-
eng, err := engine.New(ctx, cfg, d, &n)
71
+
// TODO: add hashicorp vault provider and choose here
72
+
vault, err := secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets"))
73
+
if err != nil {
74
+
return fmt.Errorf("failed to setup secrets provider: %w", err)
75
+
}
76
+
77
+
eng, err := engine.New(ctx, cfg, d, &n, vault)
72
78
if err != nil {
73
79
return err
74
80
}