Monorepo for Tangled tangled.org

spindle: pass secrets to engine as env vars

Signed-off-by: oppiliappan <me@oppi.li>

oppi.li b29af2a4 9271522f

verified
Changed files
+61 -32
spindle
+42 -19
spindle/engine/engine.go
··· 11 "sync" 12 "time" 13 14 "github.com/docker/docker/api/types/container" 15 "github.com/docker/docker/api/types/image" 16 "github.com/docker/docker/api/types/mount" ··· 18 "github.com/docker/docker/api/types/volume" 19 "github.com/docker/docker/client" 20 "github.com/docker/docker/pkg/stdcopy" 21 "tangled.sh/tangled.sh/core/log" 22 "tangled.sh/tangled.sh/core/notifier" 23 "tangled.sh/tangled.sh/core/spindle/config" 24 "tangled.sh/tangled.sh/core/spindle/db" 25 "tangled.sh/tangled.sh/core/spindle/models" 26 ) 27 28 const ( ··· 37 db *db.DB 38 n *notifier.Notifier 39 cfg *config.Config 40 41 cleanupMu sync.Mutex 42 cleanup map[string][]cleanupFunc 43 } 44 45 - func New(ctx context.Context, cfg *config.Config, db *db.DB, n *notifier.Notifier) (*Engine, error) { 46 dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) 47 if err != nil { 48 return nil, err ··· 56 db: db, 57 n: n, 58 cfg: cfg, 59 } 60 61 e.cleanup = make(map[string][]cleanupFunc) ··· 66 func (e *Engine) StartWorkflows(ctx context.Context, pipeline *models.Pipeline, pipelineId models.PipelineId) { 67 e.l.Info("starting all workflows in parallel", "pipeline", pipelineId) 68 69 - wg := sync.WaitGroup{} 70 for _, w := range pipeline.Workflows { 71 - wg.Add(1) 72 - go func() error { 73 - defer wg.Done() 74 wid := models.WorkflowId{ 75 PipelineId: pipelineId, 76 Name: w.Name, ··· 102 defer reader.Close() 103 io.Copy(os.Stdout, reader) 104 105 - workflowTimeoutStr := e.cfg.Pipelines.WorkflowTimeout 106 - workflowTimeout, err := time.ParseDuration(workflowTimeoutStr) 107 - if err != nil { 108 - e.l.Error("failed to parse workflow timeout", "error", err, "timeout", workflowTimeoutStr) 109 - workflowTimeout = 5 * time.Minute 110 - } 111 - e.l.Info("using workflow timeout", "timeout", workflowTimeout) 112 ctx, cancel := context.WithTimeout(ctx, workflowTimeout) 113 defer cancel() 114 115 - err = e.StartSteps(ctx, w.Steps, wid, w.Image) 116 if err != nil { 117 if errors.Is(err, ErrTimedOut) { 118 dbErr := e.db.StatusTimeout(wid, e.n) ··· 135 } 136 137 return nil 138 - }() 139 } 140 141 - wg.Wait() 142 } 143 144 // SetupWorkflow sets up a new network for the workflow and volumes for ··· 186 // ONLY marks pipeline as failed if container's exit code is non-zero. 187 // All other errors are bubbled up. 188 // Fixed version of the step execution logic 189 - func (e *Engine) StartSteps(ctx context.Context, steps []models.Step, wid models.WorkflowId, image string) error { 190 191 - for stepIdx, step := range steps { 192 select { 193 case <-ctx.Done(): 194 return ctx.Err() 195 default: 196 } 197 198 - envs := ConstructEnvs(step.Environment) 199 envs.AddEnv("HOME", workspaceDir) 200 e.l.Debug("envs for step", "step", step.Name, "envs", envs.Slice()) 201 202 hostConfig := hostConfig(wid) 203 resp, err := e.docker.ContainerCreate(ctx, &container.Config{ 204 - Image: image, 205 Cmd: []string{"bash", "-c", step.Command}, 206 WorkingDir: workspaceDir, 207 Tty: false,
··· 11 "sync" 12 "time" 13 14 + securejoin "github.com/cyphar/filepath-securejoin" 15 "github.com/docker/docker/api/types/container" 16 "github.com/docker/docker/api/types/image" 17 "github.com/docker/docker/api/types/mount" ··· 19 "github.com/docker/docker/api/types/volume" 20 "github.com/docker/docker/client" 21 "github.com/docker/docker/pkg/stdcopy" 22 + "golang.org/x/sync/errgroup" 23 "tangled.sh/tangled.sh/core/log" 24 "tangled.sh/tangled.sh/core/notifier" 25 "tangled.sh/tangled.sh/core/spindle/config" 26 "tangled.sh/tangled.sh/core/spindle/db" 27 "tangled.sh/tangled.sh/core/spindle/models" 28 + "tangled.sh/tangled.sh/core/spindle/secrets" 29 ) 30 31 const ( ··· 40 db *db.DB 41 n *notifier.Notifier 42 cfg *config.Config 43 + vault secrets.Manager 44 45 cleanupMu sync.Mutex 46 cleanup map[string][]cleanupFunc 47 } 48 49 + func New(ctx context.Context, cfg *config.Config, db *db.DB, n *notifier.Notifier, vault secrets.Manager) (*Engine, error) { 50 dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) 51 if err != nil { 52 return nil, err ··· 60 db: db, 61 n: n, 62 cfg: cfg, 63 + vault: vault, 64 } 65 66 e.cleanup = make(map[string][]cleanupFunc) ··· 71 func (e *Engine) StartWorkflows(ctx context.Context, pipeline *models.Pipeline, pipelineId models.PipelineId) { 72 e.l.Info("starting all workflows in parallel", "pipeline", pipelineId) 73 74 + // extract secrets 75 + var allSecrets []secrets.UnlockedSecret 76 + if didSlashRepo, err := securejoin.SecureJoin(pipeline.RepoOwner, pipeline.RepoName); err == nil { 77 + if res, err := e.vault.GetSecretsUnlocked(secrets.DidSlashRepo(didSlashRepo)); err == nil { 78 + allSecrets = res 79 + } 80 + } 81 + 82 + workflowTimeoutStr := e.cfg.Pipelines.WorkflowTimeout 83 + workflowTimeout, err := time.ParseDuration(workflowTimeoutStr) 84 + if err != nil { 85 + e.l.Error("failed to parse workflow timeout", "error", err, "timeout", workflowTimeoutStr) 86 + workflowTimeout = 5 * time.Minute 87 + } 88 + e.l.Info("using workflow timeout", "timeout", workflowTimeout) 89 + 90 + eg, ctx := errgroup.WithContext(ctx) 91 for _, w := range pipeline.Workflows { 92 + eg.Go(func() error { 93 wid := models.WorkflowId{ 94 PipelineId: pipelineId, 95 Name: w.Name, ··· 121 defer reader.Close() 122 io.Copy(os.Stdout, reader) 123 124 ctx, cancel := context.WithTimeout(ctx, workflowTimeout) 125 defer cancel() 126 127 + err = e.StartSteps(ctx, wid, w, allSecrets) 128 if err != nil { 129 if errors.Is(err, ErrTimedOut) { 130 dbErr := e.db.StatusTimeout(wid, e.n) ··· 147 } 148 149 return nil 150 + }) 151 } 152 153 + if err = eg.Wait(); err != nil { 154 + e.l.Error("failed to run one or more workflows", "err", err) 155 + } else { 156 + e.l.Error("successfully ran full pipeline") 157 + } 158 } 159 160 // SetupWorkflow sets up a new network for the workflow and volumes for ··· 202 // ONLY marks pipeline as failed if container's exit code is non-zero. 203 // All other errors are bubbled up. 204 // Fixed version of the step execution logic 205 + func (e *Engine) StartSteps(ctx context.Context, wid models.WorkflowId, w models.Workflow, secrets []secrets.UnlockedSecret) error { 206 + workflowEnvs := ConstructEnvs(w.Environment) 207 + for _, s := range secrets { 208 + workflowEnvs.AddEnv(s.Key, s.Value) 209 + } 210 211 + for stepIdx, step := range w.Steps { 212 select { 213 case <-ctx.Done(): 214 return ctx.Err() 215 default: 216 } 217 218 + envs := append(EnvVars(nil), workflowEnvs...) 219 + for k, v := range step.Environment { 220 + envs.AddEnv(k, v) 221 + } 222 envs.AddEnv("HOME", workspaceDir) 223 e.l.Debug("envs for step", "step", step.Name, "envs", envs.Slice()) 224 225 hostConfig := hostConfig(wid) 226 resp, err := e.docker.ContainerCreate(ctx, &container.Config{ 227 + Image: w.Image, 228 Cmd: []string{"bash", "-c", step.Command}, 229 WorkingDir: workspaceDir, 230 Tty: false,
+9 -12
spindle/models/pipeline.go
··· 8 ) 9 10 type Pipeline struct { 11 Workflows []Workflow 12 } 13 ··· 63 swf.Environment = workflowEnvToMap(twf.Environment) 64 swf.Image = workflowImage(twf.Dependencies, cfg.Pipelines.Nixery) 65 66 - swf.addNixProfileToPath() 67 - swf.setGlobalEnvs() 68 setup := &setupSteps{} 69 70 setup.addStep(nixConfStep()) ··· 79 80 workflows = append(workflows, *swf) 81 } 82 - return &Pipeline{Workflows: workflows} 83 } 84 85 func workflowEnvToMap(envs []*tangled.Pipeline_Pair) map[string]string { ··· 115 116 return path.Join(nixery, dependencies) 117 } 118 - 119 - func (wf *Workflow) addNixProfileToPath() { 120 - wf.Environment["PATH"] = "$PATH:/.nix-profile/bin" 121 - } 122 - 123 - func (wf *Workflow) setGlobalEnvs() { 124 - wf.Environment["NIX_CONFIG"] = "experimental-features = nix-command flakes" 125 - wf.Environment["HOME"] = "/tangled/workspace" 126 - }
··· 8 ) 9 10 type Pipeline struct { 11 + RepoOwner string 12 + RepoName string 13 Workflows []Workflow 14 } 15 ··· 65 swf.Environment = workflowEnvToMap(twf.Environment) 66 swf.Image = workflowImage(twf.Dependencies, cfg.Pipelines.Nixery) 67 68 setup := &setupSteps{} 69 70 setup.addStep(nixConfStep()) ··· 79 80 workflows = append(workflows, *swf) 81 } 82 + repoOwner := pl.TriggerMetadata.Repo.Did 83 + repoName := pl.TriggerMetadata.Repo.Repo 84 + return &Pipeline{ 85 + RepoOwner: repoOwner, 86 + RepoName: repoName, 87 + Workflows: workflows, 88 + } 89 } 90 91 func workflowEnvToMap(envs []*tangled.Pipeline_Pair) map[string]string { ··· 121 122 return path.Join(nixery, dependencies) 123 }
+3
spindle/models/setup_steps.go
··· 102 continue 103 } 104 105 // collect packages from custom registries 106 for _, pkg := range packages { 107 customPackages = append(customPackages, fmt.Sprintf("'%s#%s'", registry, pkg))
··· 102 continue 103 } 104 105 + if len(packages) == 0 { 106 + customPackages = append(customPackages, registry) 107 + } 108 // collect packages from custom registries 109 for _, pkg := range packages { 110 customPackages = append(customPackages, fmt.Sprintf("'%s#%s'", registry, pkg))
+7 -1
spindle/server.go
··· 68 69 n := notifier.New() 70 71 - eng, err := engine.New(ctx, cfg, d, &n) 72 if err != nil { 73 return err 74 }
··· 68 69 n := notifier.New() 70 71 + // TODO: add hashicorp vault provider and choose here 72 + vault, err := secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets")) 73 + if err != nil { 74 + return fmt.Errorf("failed to setup secrets provider: %w", err) 75 + } 76 + 77 + eng, err := engine.New(ctx, cfg, d, &n, vault) 78 if err != nil { 79 return err 80 }