1package engine
2
3import (
4 "context"
5 "errors"
6 "log/slog"
7 "sync"
8
9 securejoin "github.com/cyphar/filepath-securejoin"
10 "tangled.org/core/notifier"
11 "tangled.org/core/spindle/config"
12 "tangled.org/core/spindle/db"
13 "tangled.org/core/spindle/models"
14 "tangled.org/core/spindle/secrets"
15)
16
// Sentinel errors for workflow execution. Compare with errors.Is.
var (
	// ErrTimedOut marks a step error as a timeout. StartWorkflows checks for
	// it with errors.Is and records the workflow as timed out rather than
	// failed.
	ErrTimedOut = errors.New("timed out")
	// ErrWorkflowFailed is not referenced in this part of the file.
	// NOTE(review): presumably returned by engine implementations when a
	// workflow step fails — confirm against its users.
	ErrWorkflowFailed = errors.New("workflow failed")
)
21
22func StartWorkflows(l *slog.Logger, vault secrets.Manager, cfg *config.Config, db *db.DB, n *notifier.Notifier, ctx context.Context, pipeline *models.Pipeline, pipelineId models.PipelineId) {
23 l.Info("starting all workflows in parallel", "pipeline", pipelineId)
24
25 // extract secrets
26 var allSecrets []secrets.UnlockedSecret
27 if didSlashRepo, err := securejoin.SecureJoin(pipeline.RepoOwner, pipeline.RepoName); err == nil {
28 if res, err := vault.GetSecretsUnlocked(ctx, secrets.DidSlashRepo(didSlashRepo)); err == nil {
29 allSecrets = res
30 }
31 }
32
33 secretValues := make([]string, len(allSecrets))
34 for i, s := range allSecrets {
35 secretValues[i] = s.Value
36 }
37
38 var wg sync.WaitGroup
39 for eng, wfs := range pipeline.Workflows {
40 workflowTimeout := eng.WorkflowTimeout()
41 l.Info("using workflow timeout", "timeout", workflowTimeout)
42
43 for _, w := range wfs {
44 wg.Add(1)
45 go func() {
46 defer wg.Done()
47
48 wid := models.WorkflowId{
49 PipelineId: pipelineId,
50 Name: w.Name,
51 }
52
53 wfLogger, err := models.NewFileWorkflowLogger(cfg.Server.LogDir, wid, secretValues)
54 if err != nil {
55 l.Warn("failed to setup step logger; logs will not be persisted", "error", err)
56 wfLogger = models.NullLogger{}
57 } else {
58 l.Info("setup step logger; logs will be persisted", "logDir", cfg.Server.LogDir, "wid", wid)
59 defer wfLogger.Close()
60 }
61
62 err = db.StatusRunning(wid, n)
63 if err != nil {
64 l.Error("failed to set workflow status to running", "wid", wid, "err", err)
65 return
66 }
67
68 err = eng.SetupWorkflow(ctx, wid, &w, wfLogger)
69 if err != nil {
70 // TODO(winter): Should this always set StatusFailed?
71 // In the original, we only do in a subset of cases.
72 l.Error("setting up worklow", "wid", wid, "err", err)
73
74 destroyErr := eng.DestroyWorkflow(ctx, wid)
75 if destroyErr != nil {
76 l.Error("failed to destroy workflow after setup failure", "error", destroyErr)
77 }
78
79 dbErr := db.StatusFailed(wid, err.Error(), -1, n)
80 if dbErr != nil {
81 l.Error("failed to set workflow status to failed", "wid", wid, "err", dbErr)
82 }
83 return
84 }
85 defer eng.DestroyWorkflow(ctx, wid)
86
87 ctx, cancel := context.WithTimeout(ctx, workflowTimeout)
88 defer cancel()
89
90 for stepIdx, step := range w.Steps {
91 // log start of step
92 if wfLogger != nil {
93 wfLogger.
94 ControlWriter(stepIdx, step, models.StepStatusStart).
95 Write([]byte{0})
96 }
97
98 err = eng.RunStep(ctx, wid, &w, stepIdx, allSecrets, wfLogger)
99
100 // log end of step
101 if wfLogger != nil {
102 wfLogger.
103 ControlWriter(stepIdx, step, models.StepStatusEnd).
104 Write([]byte{0})
105 }
106
107 if err != nil {
108 if errors.Is(err, ErrTimedOut) {
109 dbErr := db.StatusTimeout(wid, n)
110 if dbErr != nil {
111 l.Error("failed to set workflow status to timeout", "wid", wid, "err", dbErr)
112 }
113 } else {
114 dbErr := db.StatusFailed(wid, err.Error(), -1, n)
115 if dbErr != nil {
116 l.Error("failed to set workflow status to failed", "wid", wid, "err", dbErr)
117 }
118 }
119 return
120 }
121 }
122
123 err = db.StatusSuccess(wid, n)
124 if err != nil {
125 l.Error("failed to set workflow status to success", "wid", wid, "err", err)
126 }
127 }()
128 }
129 }
130
131 wg.Wait()
132 l.Info("all workflows completed")
133}