forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
at master 133 lines 3.7 kB view raw
1package engine 2 3import ( 4 "context" 5 "errors" 6 "log/slog" 7 "sync" 8 9 securejoin "github.com/cyphar/filepath-securejoin" 10 "tangled.org/core/notifier" 11 "tangled.org/core/spindle/config" 12 "tangled.org/core/spindle/db" 13 "tangled.org/core/spindle/models" 14 "tangled.org/core/spindle/secrets" 15) 16 17var ( 18 ErrTimedOut = errors.New("timed out") 19 ErrWorkflowFailed = errors.New("workflow failed") 20) 21 22func StartWorkflows(l *slog.Logger, vault secrets.Manager, cfg *config.Config, db *db.DB, n *notifier.Notifier, ctx context.Context, pipeline *models.Pipeline, pipelineId models.PipelineId) { 23 l.Info("starting all workflows in parallel", "pipeline", pipelineId) 24 25 // extract secrets 26 var allSecrets []secrets.UnlockedSecret 27 if didSlashRepo, err := securejoin.SecureJoin(pipeline.RepoOwner, pipeline.RepoName); err == nil { 28 if res, err := vault.GetSecretsUnlocked(ctx, secrets.DidSlashRepo(didSlashRepo)); err == nil { 29 allSecrets = res 30 } 31 } 32 33 secretValues := make([]string, len(allSecrets)) 34 for i, s := range allSecrets { 35 secretValues[i] = s.Value 36 } 37 38 var wg sync.WaitGroup 39 for eng, wfs := range pipeline.Workflows { 40 workflowTimeout := eng.WorkflowTimeout() 41 l.Info("using workflow timeout", "timeout", workflowTimeout) 42 43 for _, w := range wfs { 44 wg.Add(1) 45 go func() { 46 defer wg.Done() 47 48 wid := models.WorkflowId{ 49 PipelineId: pipelineId, 50 Name: w.Name, 51 } 52 53 wfLogger, err := models.NewFileWorkflowLogger(cfg.Server.LogDir, wid, secretValues) 54 if err != nil { 55 l.Warn("failed to setup step logger; logs will not be persisted", "error", err) 56 wfLogger = models.NullLogger{} 57 } else { 58 l.Info("setup step logger; logs will be persisted", "logDir", cfg.Server.LogDir, "wid", wid) 59 defer wfLogger.Close() 60 } 61 62 err = db.StatusRunning(wid, n) 63 if err != nil { 64 l.Error("failed to set workflow status to running", "wid", wid, "err", err) 65 return 66 } 67 68 err = eng.SetupWorkflow(ctx, wid, &w, wfLogger) 69 if err != nil { 70 // TODO(winter): Should this always set StatusFailed? 71 // In the original, we only do in a subset of cases. 72 l.Error("setting up worklow", "wid", wid, "err", err) 73 74 destroyErr := eng.DestroyWorkflow(ctx, wid) 75 if destroyErr != nil { 76 l.Error("failed to destroy workflow after setup failure", "error", destroyErr) 77 } 78 79 dbErr := db.StatusFailed(wid, err.Error(), -1, n) 80 if dbErr != nil { 81 l.Error("failed to set workflow status to failed", "wid", wid, "err", dbErr) 82 } 83 return 84 } 85 defer eng.DestroyWorkflow(ctx, wid) 86 87 ctx, cancel := context.WithTimeout(ctx, workflowTimeout) 88 defer cancel() 89 90 for stepIdx, step := range w.Steps { 91 // log start of step 92 if wfLogger != nil { 93 wfLogger. 94 ControlWriter(stepIdx, step, models.StepStatusStart). 95 Write([]byte{0}) 96 } 97 98 err = eng.RunStep(ctx, wid, &w, stepIdx, allSecrets, wfLogger) 99 100 // log end of step 101 if wfLogger != nil { 102 wfLogger. 103 ControlWriter(stepIdx, step, models.StepStatusEnd). 104 Write([]byte{0}) 105 } 106 107 if err != nil { 108 if errors.Is(err, ErrTimedOut) { 109 dbErr := db.StatusTimeout(wid, n) 110 if dbErr != nil { 111 l.Error("failed to set workflow status to timeout", "wid", wid, "err", dbErr) 112 } 113 } else { 114 dbErr := db.StatusFailed(wid, err.Error(), -1, n) 115 if dbErr != nil { 116 l.Error("failed to set workflow status to failed", "wid", wid, "err", dbErr) 117 } 118 } 119 return 120 } 121 } 122 123 err = db.StatusSuccess(wid, n) 124 if err != nil { 125 l.Error("failed to set workflow status to success", "wid", wid, "err", err) 126 } 127 }() 128 } 129 } 130 131 wg.Wait() 132 l.Info("all workflows completed") 133}