+17
-20
spindle/engine/engine.go
+17
-20
spindle/engine/engine.go
···
3
import (
4
"context"
5
"errors"
6
-
"fmt"
7
"log/slog"
8
9
securejoin "github.com/cyphar/filepath-securejoin"
10
-
"golang.org/x/sync/errgroup"
11
"tangled.org/core/notifier"
12
"tangled.org/core/spindle/config"
13
"tangled.org/core/spindle/db"
···
31
}
32
}
33
34
-
eg, ctx := errgroup.WithContext(ctx)
35
for eng, wfs := range pipeline.Workflows {
36
workflowTimeout := eng.WorkflowTimeout()
37
l.Info("using workflow timeout", "timeout", workflowTimeout)
38
39
for _, w := range wfs {
40
-
eg.Go(func() error {
41
wid := models.WorkflowId{
42
PipelineId: pipelineId,
43
Name: w.Name,
···
45
46
err := db.StatusRunning(wid, n)
47
if err != nil {
48
-
return err
49
}
50
51
err = eng.SetupWorkflow(ctx, wid, &w)
···
61
62
dbErr := db.StatusFailed(wid, err.Error(), -1, n)
63
if dbErr != nil {
64
-
return dbErr
65
}
66
-
return err
67
}
68
defer eng.DestroyWorkflow(ctx, wid)
69
···
99
if errors.Is(err, ErrTimedOut) {
100
dbErr := db.StatusTimeout(wid, n)
101
if dbErr != nil {
102
-
return dbErr
103
}
104
} else {
105
dbErr := db.StatusFailed(wid, err.Error(), -1, n)
106
if dbErr != nil {
107
-
return dbErr
108
}
109
}
110
-
111
-
return fmt.Errorf("starting steps image: %w", err)
112
}
113
}
114
115
err = db.StatusSuccess(wid, n)
116
if err != nil {
117
-
return err
118
}
119
-
120
-
return nil
121
-
})
122
}
123
}
124
125
-
if err := eg.Wait(); err != nil {
126
-
l.Error("failed to run one or more workflows", "err", err)
127
-
} else {
128
-
l.Info("successfully ran full pipeline")
129
-
}
130
}
···
3
import (
4
"context"
5
"errors"
6
"log/slog"
7
+
"sync"
8
9
securejoin "github.com/cyphar/filepath-securejoin"
10
"tangled.org/core/notifier"
11
"tangled.org/core/spindle/config"
12
"tangled.org/core/spindle/db"
···
30
}
31
}
32
33
+
var wg sync.WaitGroup
34
for eng, wfs := range pipeline.Workflows {
35
workflowTimeout := eng.WorkflowTimeout()
36
l.Info("using workflow timeout", "timeout", workflowTimeout)
37
38
for _, w := range wfs {
39
+
wg.Add(1)
40
+
go func() {
41
+
defer wg.Done()
42
+
43
wid := models.WorkflowId{
44
PipelineId: pipelineId,
45
Name: w.Name,
···
47
48
err := db.StatusRunning(wid, n)
49
if err != nil {
50
+
l.Error("failed to set workflow status to running", "wid", wid, "err", err)
51
+
return
52
}
53
54
err = eng.SetupWorkflow(ctx, wid, &w)
···
64
65
dbErr := db.StatusFailed(wid, err.Error(), -1, n)
66
if dbErr != nil {
67
+
l.Error("failed to set workflow status to failed", "wid", wid, "err", dbErr)
68
}
69
+
return
70
}
71
defer eng.DestroyWorkflow(ctx, wid)
72
···
102
if errors.Is(err, ErrTimedOut) {
103
dbErr := db.StatusTimeout(wid, n)
104
if dbErr != nil {
105
+
l.Error("failed to set workflow status to timeout", "wid", wid, "err", dbErr)
106
}
107
} else {
108
dbErr := db.StatusFailed(wid, err.Error(), -1, n)
109
if dbErr != nil {
110
+
l.Error("failed to set workflow status to failed", "wid", wid, "err", dbErr)
111
}
112
}
113
+
return
114
}
115
}
116
117
err = db.StatusSuccess(wid, n)
118
if err != nil {
119
+
l.Error("failed to set workflow status to success", "wid", wid, "err", err)
120
}
121
+
}()
122
}
123
}
124
125
+
wg.Wait()
126
+
l.Info("all workflows completed")
127
}