forked from tangled.org/core
Monorepo for Tangled

spindle/{db,engine}: propagate a notifier

Signed-off-by: Anirudh Oppiliappan <anirudh@tangled.sh>

anirudh.fi 55bf8204 8dbf6ee8

verified
Changed files
+12 -10
spindle
+11 -9
spindle/engine/engine.go
··· 18 "github.com/docker/docker/pkg/stdcopy" 19 "golang.org/x/sync/errgroup" 20 "tangled.sh/tangled.sh/core/api/tangled" 21 "tangled.sh/tangled.sh/core/log" 22 "tangled.sh/tangled.sh/core/spindle/db" 23 ) ··· 30 docker client.APIClient 31 l *slog.Logger 32 db *db.DB 33 } 34 35 - func New(ctx context.Context, db *db.DB) (*Engine, error) { 36 dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) 37 if err != nil { 38 return nil, err ··· 40 41 l := log.FromContext(ctx).With("component", "spindle") 42 43 - return &Engine{docker: dcli, l: l, db: db}, nil 44 } 45 46 // SetupPipeline sets up a new network for the pipeline, and possibly volumes etc. ··· 71 return err 72 } 73 74 - err = e.db.CreatePipeline(id) 75 return err 76 } 77 78 func (e *Engine) StartWorkflows(ctx context.Context, pipeline *tangled.Pipeline, id string) error { 79 e.l.Info("starting all workflows in parallel", "pipeline", id) 80 81 - err := e.db.MarkPipelineRunning(id) 82 if err != nil { 83 return err 84 } ··· 101 reader, err := e.docker.ImagePull(ctx, cimg, image.PullOptions{}) 102 if err != nil { 103 e.l.Error("pipeline failed!", "id", id, "error", err.Error()) 104 - err := e.db.MarkPipelineFailed(id, -1, err.Error()) 105 if err != nil { 106 return err 107 } ··· 113 err = e.StartSteps(ctx, w.Steps, id, cimg) 114 if err != nil { 115 e.l.Error("pipeline failed!", "id", id, "error", err.Error()) 116 - return e.db.MarkPipelineFailed(id, -1, err.Error()) 117 } 118 119 return nil ··· 123 err = g.Wait() 124 if err != nil { 125 e.l.Error("pipeline failed!", "id", id, "error", err.Error()) 126 - return e.db.MarkPipelineFailed(id, -1, err.Error()) 127 } 128 129 e.l.Info("pipeline success!", "id", id) 130 - return e.db.MarkPipelineSuccess(id) 131 } 132 133 // StartSteps starts all steps sequentially with the same base image. ··· 181 182 if state.ExitCode != 0 { 183 e.l.Error("pipeline failed!", "id", id, "error", state.Error, "exit_code", state.ExitCode) 184 - return e.db.MarkPipelineFailed(id, state.ExitCode, state.Error) 185 } 186 } 187
··· 18 "github.com/docker/docker/pkg/stdcopy" 19 "golang.org/x/sync/errgroup" 20 "tangled.sh/tangled.sh/core/api/tangled" 21 + "tangled.sh/tangled.sh/core/knotserver/notifier" 22 "tangled.sh/tangled.sh/core/log" 23 "tangled.sh/tangled.sh/core/spindle/db" 24 ) ··· 31 docker client.APIClient 32 l *slog.Logger 33 db *db.DB 34 + n *notifier.Notifier 35 } 36 37 + func New(ctx context.Context, db *db.DB, n *notifier.Notifier) (*Engine, error) { 38 dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) 39 if err != nil { 40 return nil, err ··· 42 43 l := log.FromContext(ctx).With("component", "spindle") 44 45 + return &Engine{docker: dcli, l: l, db: db, n: n}, nil 46 } 47 48 // SetupPipeline sets up a new network for the pipeline, and possibly volumes etc. ··· 73 return err 74 } 75 76 + err = e.db.CreatePipeline(id, e.n) 77 return err 78 } 79 80 func (e *Engine) StartWorkflows(ctx context.Context, pipeline *tangled.Pipeline, id string) error { 81 e.l.Info("starting all workflows in parallel", "pipeline", id) 82 83 + err := e.db.MarkPipelineRunning(id, e.n) 84 if err != nil { 85 return err 86 } ··· 103 reader, err := e.docker.ImagePull(ctx, cimg, image.PullOptions{}) 104 if err != nil { 105 e.l.Error("pipeline failed!", "id", id, "error", err.Error()) 106 + err := e.db.MarkPipelineFailed(id, -1, err.Error(), e.n) 107 if err != nil { 108 return err 109 } ··· 115 err = e.StartSteps(ctx, w.Steps, id, cimg) 116 if err != nil { 117 e.l.Error("pipeline failed!", "id", id, "error", err.Error()) 118 + return e.db.MarkPipelineFailed(id, -1, err.Error(), e.n) 119 } 120 121 return nil ··· 125 err = g.Wait() 126 if err != nil { 127 e.l.Error("pipeline failed!", "id", id, "error", err.Error()) 128 + return e.db.MarkPipelineFailed(id, -1, err.Error(), e.n) 129 } 130 131 e.l.Info("pipeline success!", "id", id) 132 + return e.db.MarkPipelineSuccess(id, e.n) 133 } 134 135 // StartSteps starts all steps sequentially with the same base image. ··· 183 184 if state.ExitCode != 0 { 185 e.l.Error("pipeline failed!", "id", id, "error", state.Error, "exit_code", state.ExitCode) 186 + return e.db.MarkPipelineFailed(id, state.ExitCode, state.Error, e.n) 187 } 188 } 189
+1 -1
spindle/server.go
··· 53 54 n := notifier.New() 55 56 - eng, err := engine.New(ctx, d) 57 if err != nil { 58 return err 59 }
··· 53 54 n := notifier.New() 55 56 + eng, err := engine.New(ctx, d, &n) 57 if err != nil { 58 return err 59 }