forked from
tangled.org/core
Monorepo for Tangled
1package engine
2
3import (
4 "context"
5 "errors"
6 "fmt"
7 "log/slog"
8 "path/filepath"
9 "sync"
10
11 securejoin "github.com/cyphar/filepath-securejoin"
12 "tangled.org/core/notifier"
13 "tangled.org/core/spindle/config"
14 "tangled.org/core/spindle/db"
15 "tangled.org/core/spindle/models"
16 "tangled.org/core/spindle/secrets"
17)
18
// Sentinel errors shared by the workflow runner and the engines it drives.
// Match them with errors.Is rather than by comparing messages.
var (
	// ErrTimedOut marks a step that ran out of time. StartWorkflows tests
	// step errors against it to record a "timeout" status instead of a
	// generic failure.
	ErrTimedOut = errors.New("timed out")

	// ErrWorkflowFailed marks a workflow that did not complete successfully.
	// NOTE(review): not referenced in this file; presumably returned by
	// engine implementations — confirm against callers.
	ErrWorkflowFailed = errors.New("workflow failed")
)
23
24func StartWorkflows(l *slog.Logger, vault secrets.Manager, cfg *config.Config, db *db.DB, n *notifier.Notifier, ctx context.Context, pipeline *models.Pipeline, pipelineId models.PipelineId) {
25 l.Info("starting all workflows in parallel", "pipeline", pipelineId)
26
27 // extract secrets
28 var allSecrets []secrets.UnlockedSecret
29 if didSlashRepo, err := securejoin.SecureJoin(pipeline.RepoOwner, pipeline.RepoName); err == nil {
30 if res, err := vault.GetSecretsUnlocked(ctx, secrets.DidSlashRepo(didSlashRepo)); err == nil {
31 allSecrets = res
32 }
33 }
34
35 secretValues := make([]string, len(allSecrets))
36 for i, s := range allSecrets {
37 secretValues[i] = s.Value
38 }
39
40 s3, err := NewS3(cfg.S3.LogBucket)
41 if err != nil {
42 l.Error("error creating s3 client", "err", err)
43 }
44
45 var wg sync.WaitGroup
46 for eng, wfs := range pipeline.Workflows {
47 workflowTimeout := eng.WorkflowTimeout()
48 l.Info("using workflow timeout", "timeout", workflowTimeout)
49
50 for _, w := range wfs {
51 wg.Add(1)
52 go func() {
53 defer wg.Done()
54
55 wid := models.WorkflowId{
56 PipelineId: pipelineId,
57 Name: w.Name,
58 }
59
60 defer func() {
61 if s3 != nil {
62 logFile := filepath.Join(cfg.Server.LogDir, fmt.Sprintf("%s.log", wid.String()))
63 if err := s3.WriteFile(ctx, logFile); err != nil {
64 l.Error("error uploading logs", "err", err)
65 }
66 }
67 }()
68
69 wfLogger, err := models.NewFileWorkflowLogger(cfg.Server.LogDir, wid, secretValues)
70 if err != nil {
71 l.Warn("failed to setup step logger; logs will not be persisted", "error", err)
72 wfLogger = models.NullLogger{}
73 } else {
74 l.Info("setup step logger; logs will be persisted", "logDir", cfg.Server.LogDir, "wid", wid)
75 defer wfLogger.Close()
76 }
77
78 err = db.StatusRunning(wid, n)
79 if err != nil {
80 l.Error("failed to set workflow status to running", "wid", wid, "err", err)
81 return
82 }
83
84 err = eng.SetupWorkflow(ctx, wid, &w, wfLogger)
85 if err != nil {
86 // TODO(winter): Should this always set StatusFailed?
87 // In the original, we only do in a subset of cases.
88 l.Error("setting up worklow", "wid", wid, "err", err)
89
90 destroyErr := eng.DestroyWorkflow(ctx, wid)
91 if destroyErr != nil {
92 l.Error("failed to destroy workflow after setup failure", "error", destroyErr)
93 }
94
95 dbErr := db.StatusFailed(wid, err.Error(), -1, n)
96 if dbErr != nil {
97 l.Error("failed to set workflow status to failed", "wid", wid, "err", dbErr)
98 }
99 return
100 }
101 defer eng.DestroyWorkflow(ctx, wid)
102
103 ctx, cancel := context.WithTimeout(ctx, workflowTimeout)
104 defer cancel()
105
106 for stepIdx, step := range w.Steps {
107 // log start of step
108 if wfLogger != nil {
109 wfLogger.
110 ControlWriter(stepIdx, step, models.StepStatusStart).
111 Write([]byte{0})
112 }
113
114 err = eng.RunStep(ctx, wid, &w, stepIdx, allSecrets, wfLogger)
115
116 // log end of step
117 if wfLogger != nil {
118 wfLogger.
119 ControlWriter(stepIdx, step, models.StepStatusEnd).
120 Write([]byte{0})
121 }
122
123 if err != nil {
124 if errors.Is(err, ErrTimedOut) {
125 dbErr := db.StatusTimeout(wid, n)
126 if dbErr != nil {
127 l.Error("failed to set workflow status to timeout", "wid", wid, "err", dbErr)
128 }
129 } else {
130 dbErr := db.StatusFailed(wid, err.Error(), -1, n)
131 if dbErr != nil {
132 l.Error("failed to set workflow status to failed", "wid", wid, "err", dbErr)
133 }
134 }
135 return
136 }
137 }
138
139 err = db.StatusSuccess(wid, n)
140 if err != nil {
141 l.Error("failed to set workflow status to success", "wid", wid, "err", err)
142 }
143 }()
144 }
145 }
146
147 wg.Wait()
148 l.Info("all workflows completed")
149}