1package engine
2
3import (
4 "context"
5 "errors"
6 "log/slog"
7 "sync"
8
9 securejoin "github.com/cyphar/filepath-securejoin"
10 "tangled.org/core/notifier"
11 "tangled.org/core/spindle/config"
12 "tangled.org/core/spindle/db"
13 "tangled.org/core/spindle/models"
14 "tangled.org/core/spindle/secrets"
15)
16
// ErrTimedOut is the sentinel that StartWorkflows looks for (via errors.Is)
// in a step error to record the workflow as timed out instead of failed.
var ErrTimedOut = errors.New("timed out")

// ErrWorkflowFailed is the sentinel for a workflow that failed.
// NOTE(review): it is not referenced in this part of the file; presumably
// engines return it from their step runners — confirm against callers.
var ErrWorkflowFailed = errors.New("workflow failed")
21
22func StartWorkflows(l *slog.Logger, vault secrets.Manager, cfg *config.Config, db *db.DB, n *notifier.Notifier, ctx context.Context, pipeline *models.Pipeline, pipelineId models.PipelineId) {
23 l.Info("starting all workflows in parallel", "pipeline", pipelineId)
24
25 // extract secrets
26 var allSecrets []secrets.UnlockedSecret
27 if didSlashRepo, err := securejoin.SecureJoin(pipeline.RepoOwner, pipeline.RepoName); err == nil {
28 if res, err := vault.GetSecretsUnlocked(ctx, secrets.DidSlashRepo(didSlashRepo)); err == nil {
29 allSecrets = res
30 }
31 }
32
33 var wg sync.WaitGroup
34 for eng, wfs := range pipeline.Workflows {
35 workflowTimeout := eng.WorkflowTimeout()
36 l.Info("using workflow timeout", "timeout", workflowTimeout)
37
38 for _, w := range wfs {
39 wg.Add(1)
40 go func() {
41 defer wg.Done()
42
43 wid := models.WorkflowId{
44 PipelineId: pipelineId,
45 Name: w.Name,
46 }
47
48 err := db.StatusRunning(wid, n)
49 if err != nil {
50 l.Error("failed to set workflow status to running", "wid", wid, "err", err)
51 return
52 }
53
54 wfLogger, err := models.NewWorkflowLogger(cfg.Server.LogDir, wid)
55 if err != nil {
56 l.Warn("failed to setup step logger; logs will not be persisted", "error", err)
57 wfLogger = nil
58 }
59 defer func() {
60 if wfLogger != nil {
61 wfLogger.Close()
62 }
63 }()
64
65 err = eng.SetupWorkflow(ctx, wid, &w)
66 if err != nil {
67 // TODO(winter): Should this always set StatusFailed?
68 // In the original, we only do in a subset of cases.
69 l.Error("setting up worklow", "wid", wid, "err", err)
70
71 destroyErr := eng.DestroyWorkflow(ctx, wid)
72 if destroyErr != nil {
73 l.Error("failed to destroy workflow after setup failure", "error", destroyErr)
74 }
75
76 // Parse error to extract user-friendly message
77 friendlyErr := models.ParseSetupError(err)
78
79 // Write error to log file so it shows in UI
80 if wfLogger != nil {
81 wfLogger.WriteSetupError(friendlyErr)
82 }
83
84 dbErr := db.StatusFailed(wid, friendlyErr, -1, n)
85 if dbErr != nil {
86 l.Error("failed to set workflow status to failed", "wid", wid, "err", dbErr)
87 }
88 return
89 }
90 defer eng.DestroyWorkflow(ctx, wid)
91
92 ctx, cancel := context.WithTimeout(ctx, workflowTimeout)
93 defer cancel()
94
95 for stepIdx, step := range w.Steps {
96 // log start of step
97 if wfLogger != nil {
98 wfLogger.
99 ControlWriter(stepIdx, step, models.StepStatusStart).
100 Write([]byte{0})
101 }
102
103 err = eng.RunStep(ctx, wid, &w, stepIdx, allSecrets, wfLogger)
104
105 // log end of step
106 if wfLogger != nil {
107 wfLogger.
108 ControlWriter(stepIdx, step, models.StepStatusEnd).
109 Write([]byte{0})
110 }
111
112 if err != nil {
113 if errors.Is(err, ErrTimedOut) {
114 dbErr := db.StatusTimeout(wid, n)
115 if dbErr != nil {
116 l.Error("failed to set workflow status to timeout", "wid", wid, "err", dbErr)
117 }
118 } else {
119 dbErr := db.StatusFailed(wid, err.Error(), -1, n)
120 if dbErr != nil {
121 l.Error("failed to set workflow status to failed", "wid", wid, "err", dbErr)
122 }
123 }
124 return
125 }
126 }
127
128 err = db.StatusSuccess(wid, n)
129 if err != nil {
130 l.Error("failed to set workflow status to success", "wid", wid, "err", err)
131 }
132 }()
133 }
134 }
135
136 wg.Wait()
137 l.Info("all workflows completed")
138}