Monorepo for Tangled
at master 149 lines 4.1 kB view raw
1package engine 2 3import ( 4 "context" 5 "errors" 6 "fmt" 7 "log/slog" 8 "path/filepath" 9 "sync" 10 11 securejoin "github.com/cyphar/filepath-securejoin" 12 "tangled.org/core/notifier" 13 "tangled.org/core/spindle/config" 14 "tangled.org/core/spindle/db" 15 "tangled.org/core/spindle/models" 16 "tangled.org/core/spindle/secrets" 17) 18 19var ( 20 ErrTimedOut = errors.New("timed out") 21 ErrWorkflowFailed = errors.New("workflow failed") 22) 23 24func StartWorkflows(l *slog.Logger, vault secrets.Manager, cfg *config.Config, db *db.DB, n *notifier.Notifier, ctx context.Context, pipeline *models.Pipeline, pipelineId models.PipelineId) { 25 l.Info("starting all workflows in parallel", "pipeline", pipelineId) 26 27 // extract secrets 28 var allSecrets []secrets.UnlockedSecret 29 if didSlashRepo, err := securejoin.SecureJoin(pipeline.RepoOwner, pipeline.RepoName); err == nil { 30 if res, err := vault.GetSecretsUnlocked(ctx, secrets.DidSlashRepo(didSlashRepo)); err == nil { 31 allSecrets = res 32 } 33 } 34 35 secretValues := make([]string, len(allSecrets)) 36 for i, s := range allSecrets { 37 secretValues[i] = s.Value 38 } 39 40 s3, err := NewS3(cfg.S3.LogBucket) 41 if err != nil { 42 l.Error("error creating s3 client", "err", err) 43 } 44 45 var wg sync.WaitGroup 46 for eng, wfs := range pipeline.Workflows { 47 workflowTimeout := eng.WorkflowTimeout() 48 l.Info("using workflow timeout", "timeout", workflowTimeout) 49 50 for _, w := range wfs { 51 wg.Add(1) 52 go func() { 53 defer wg.Done() 54 55 wid := models.WorkflowId{ 56 PipelineId: pipelineId, 57 Name: w.Name, 58 } 59 60 defer func() { 61 if s3 != nil { 62 logFile := filepath.Join(cfg.Server.LogDir, fmt.Sprintf("%s.log", wid.String())) 63 if err := s3.WriteFile(ctx, logFile); err != nil { 64 l.Error("error uploading logs", "err", err) 65 } 66 } 67 }() 68 69 wfLogger, err := models.NewFileWorkflowLogger(cfg.Server.LogDir, wid, secretValues) 70 if err != nil { 71 l.Warn("failed to setup step logger; logs will not be persisted", "error", err) 72 wfLogger = models.NullLogger{} 73 } else { 74 l.Info("setup step logger; logs will be persisted", "logDir", cfg.Server.LogDir, "wid", wid) 75 defer wfLogger.Close() 76 } 77 78 err = db.StatusRunning(wid, n) 79 if err != nil { 80 l.Error("failed to set workflow status to running", "wid", wid, "err", err) 81 return 82 } 83 84 err = eng.SetupWorkflow(ctx, wid, &w, wfLogger) 85 if err != nil { 86 // TODO(winter): Should this always set StatusFailed? 87 // In the original, we only do in a subset of cases. 88 l.Error("setting up worklow", "wid", wid, "err", err) 89 90 destroyErr := eng.DestroyWorkflow(ctx, wid) 91 if destroyErr != nil { 92 l.Error("failed to destroy workflow after setup failure", "error", destroyErr) 93 } 94 95 dbErr := db.StatusFailed(wid, err.Error(), -1, n) 96 if dbErr != nil { 97 l.Error("failed to set workflow status to failed", "wid", wid, "err", dbErr) 98 } 99 return 100 } 101 defer eng.DestroyWorkflow(ctx, wid) 102 103 ctx, cancel := context.WithTimeout(ctx, workflowTimeout) 104 defer cancel() 105 106 for stepIdx, step := range w.Steps { 107 // log start of step 108 if wfLogger != nil { 109 wfLogger. 110 ControlWriter(stepIdx, step, models.StepStatusStart). 111 Write([]byte{0}) 112 } 113 114 err = eng.RunStep(ctx, wid, &w, stepIdx, allSecrets, wfLogger) 115 116 // log end of step 117 if wfLogger != nil { 118 wfLogger. 119 ControlWriter(stepIdx, step, models.StepStatusEnd). 120 Write([]byte{0}) 121 } 122 123 if err != nil { 124 if errors.Is(err, ErrTimedOut) { 125 dbErr := db.StatusTimeout(wid, n) 126 if dbErr != nil { 127 l.Error("failed to set workflow status to timeout", "wid", wid, "err", dbErr) 128 } 129 } else { 130 dbErr := db.StatusFailed(wid, err.Error(), -1, n) 131 if dbErr != nil { 132 l.Error("failed to set workflow status to failed", "wid", wid, "err", dbErr) 133 } 134 } 135 return 136 } 137 } 138 139 err = db.StatusSuccess(wid, n) 140 if err != nil { 141 l.Error("failed to set workflow status to success", "wid", wid, "err", err) 142 } 143 }() 144 } 145 } 146 147 wg.Wait() 148 l.Info("all workflows completed") 149}