forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
at spindle-log-err 138 lines 3.8 kB view raw
1package engine 2 3import ( 4 "context" 5 "errors" 6 "log/slog" 7 "sync" 8 9 securejoin "github.com/cyphar/filepath-securejoin" 10 "tangled.org/core/notifier" 11 "tangled.org/core/spindle/config" 12 "tangled.org/core/spindle/db" 13 "tangled.org/core/spindle/models" 14 "tangled.org/core/spindle/secrets" 15) 16 17var ( 18 ErrTimedOut = errors.New("timed out") 19 ErrWorkflowFailed = errors.New("workflow failed") 20) 21 22func StartWorkflows(l *slog.Logger, vault secrets.Manager, cfg *config.Config, db *db.DB, n *notifier.Notifier, ctx context.Context, pipeline *models.Pipeline, pipelineId models.PipelineId) { 23 l.Info("starting all workflows in parallel", "pipeline", pipelineId) 24 25 // extract secrets 26 var allSecrets []secrets.UnlockedSecret 27 if didSlashRepo, err := securejoin.SecureJoin(pipeline.RepoOwner, pipeline.RepoName); err == nil { 28 if res, err := vault.GetSecretsUnlocked(ctx, secrets.DidSlashRepo(didSlashRepo)); err == nil { 29 allSecrets = res 30 } 31 } 32 33 var wg sync.WaitGroup 34 for eng, wfs := range pipeline.Workflows { 35 workflowTimeout := eng.WorkflowTimeout() 36 l.Info("using workflow timeout", "timeout", workflowTimeout) 37 38 for _, w := range wfs { 39 wg.Add(1) 40 go func() { 41 defer wg.Done() 42 43 wid := models.WorkflowId{ 44 PipelineId: pipelineId, 45 Name: w.Name, 46 } 47 48 err := db.StatusRunning(wid, n) 49 if err != nil { 50 l.Error("failed to set workflow status to running", "wid", wid, "err", err) 51 return 52 } 53 54 wfLogger, err := models.NewWorkflowLogger(cfg.Server.LogDir, wid) 55 if err != nil { 56 l.Warn("failed to setup step logger; logs will not be persisted", "error", err) 57 wfLogger = nil 58 } 59 defer func() { 60 if wfLogger != nil { 61 wfLogger.Close() 62 } 63 }() 64 65 err = eng.SetupWorkflow(ctx, wid, &w) 66 if err != nil { 67 // TODO(winter): Should this always set StatusFailed? 68 // In the original, we only do in a subset of cases. 69 l.Error("setting up worklow", "wid", wid, "err", err) 70 71 destroyErr := eng.DestroyWorkflow(ctx, wid) 72 if destroyErr != nil { 73 l.Error("failed to destroy workflow after setup failure", "error", destroyErr) 74 } 75 76 // Parse error to extract user-friendly message 77 friendlyErr := models.ParseSetupError(err) 78 79 // Write error to log file so it shows in UI 80 if wfLogger != nil { 81 wfLogger.WriteSetupError(friendlyErr) 82 } 83 84 dbErr := db.StatusFailed(wid, friendlyErr, -1, n) 85 if dbErr != nil { 86 l.Error("failed to set workflow status to failed", "wid", wid, "err", dbErr) 87 } 88 return 89 } 90 defer eng.DestroyWorkflow(ctx, wid) 91 92 ctx, cancel := context.WithTimeout(ctx, workflowTimeout) 93 defer cancel() 94 95 for stepIdx, step := range w.Steps { 96 // log start of step 97 if wfLogger != nil { 98 wfLogger. 99 ControlWriter(stepIdx, step, models.StepStatusStart). 100 Write([]byte{0}) 101 } 102 103 err = eng.RunStep(ctx, wid, &w, stepIdx, allSecrets, wfLogger) 104 105 // log end of step 106 if wfLogger != nil { 107 wfLogger. 108 ControlWriter(stepIdx, step, models.StepStatusEnd). 109 Write([]byte{0}) 110 } 111 112 if err != nil { 113 if errors.Is(err, ErrTimedOut) { 114 dbErr := db.StatusTimeout(wid, n) 115 if dbErr != nil { 116 l.Error("failed to set workflow status to timeout", "wid", wid, "err", dbErr) 117 } 118 } else { 119 dbErr := db.StatusFailed(wid, err.Error(), -1, n) 120 if dbErr != nil { 121 l.Error("failed to set workflow status to failed", "wid", wid, "err", dbErr) 122 } 123 } 124 return 125 } 126 } 127 128 err = db.StatusSuccess(wid, n) 129 if err != nil { 130 l.Error("failed to set workflow status to success", "wid", wid, "err", err) 131 } 132 }() 133 } 134 } 135 136 wg.Wait() 137 l.Info("all workflows completed") 138}