Monorepo for Tangled
tangled.org
1package pipelines
2
3import (
4 "bytes"
5 "context"
6 "encoding/json"
7 "fmt"
8 "log/slog"
9 "net/http"
10 "strings"
11 "time"
12
13 "tangled.org/core/api/tangled"
14 "tangled.org/core/appview/config"
15 "tangled.org/core/appview/db"
16 "tangled.org/core/appview/middleware"
17 "tangled.org/core/appview/models"
18 "tangled.org/core/appview/oauth"
19 "tangled.org/core/appview/pages"
20 "tangled.org/core/appview/reporesolver"
21 "tangled.org/core/eventconsumer"
22 "tangled.org/core/idresolver"
23 "tangled.org/core/orm"
24 "tangled.org/core/rbac"
25 spindlemodel "tangled.org/core/spindle/models"
26
27 "github.com/go-chi/chi/v5"
28 "github.com/gorilla/websocket"
29)
30
31type Pipelines struct {
32 repoResolver *reporesolver.RepoResolver
33 idResolver *idresolver.Resolver
34 config *config.Config
35 oauth *oauth.OAuth
36 pages *pages.Pages
37 spindlestream *eventconsumer.Consumer
38 db *db.DB
39 enforcer *rbac.Enforcer
40 logger *slog.Logger
41}
42
43func (p *Pipelines) Router(mw *middleware.Middleware) http.Handler {
44 r := chi.NewRouter()
45 r.Get("/", p.Index)
46 r.Get("/{pipeline}/workflow/{workflow}", p.Workflow)
47 r.Get("/{pipeline}/workflow/{workflow}/logs", p.Logs)
48 r.
49 With(mw.RepoPermissionMiddleware("repo:owner")).
50 Post("/{pipeline}/workflow/{workflow}/cancel", p.Cancel)
51
52 return r
53}
54
55func New(
56 oauth *oauth.OAuth,
57 repoResolver *reporesolver.RepoResolver,
58 pages *pages.Pages,
59 spindlestream *eventconsumer.Consumer,
60 idResolver *idresolver.Resolver,
61 db *db.DB,
62 config *config.Config,
63 enforcer *rbac.Enforcer,
64 logger *slog.Logger,
65) *Pipelines {
66 return &Pipelines{
67 oauth: oauth,
68 repoResolver: repoResolver,
69 pages: pages,
70 idResolver: idResolver,
71 config: config,
72 spindlestream: spindlestream,
73 db: db,
74 enforcer: enforcer,
75 logger: logger,
76 }
77}
78
79func (p *Pipelines) Index(w http.ResponseWriter, r *http.Request) {
80 user := p.oauth.GetUser(r)
81 l := p.logger.With("handler", "Index")
82
83 f, err := p.repoResolver.Resolve(r)
84 if err != nil {
85 l.Error("failed to get repo and knot", "err", err)
86 return
87 }
88
89 ps, err := db.GetPipelineStatuses(
90 p.db,
91 30,
92 orm.FilterEq("repo_owner", f.Did),
93 orm.FilterEq("repo_name", f.Name),
94 orm.FilterEq("knot", f.Knot),
95 )
96 if err != nil {
97 l.Error("failed to query db", "err", err)
98 return
99 }
100
101 p.pages.Pipelines(w, pages.PipelinesParams{
102 LoggedInUser: user,
103 RepoInfo: p.repoResolver.GetRepoInfo(r, user),
104 Pipelines: ps,
105 })
106}
107
108func (p *Pipelines) Workflow(w http.ResponseWriter, r *http.Request) {
109 user := p.oauth.GetUser(r)
110 l := p.logger.With("handler", "Workflow")
111
112 f, err := p.repoResolver.Resolve(r)
113 if err != nil {
114 l.Error("failed to get repo and knot", "err", err)
115 return
116 }
117
118 pipelineId := chi.URLParam(r, "pipeline")
119 if pipelineId == "" {
120 l.Error("empty pipeline ID")
121 return
122 }
123
124 workflow := chi.URLParam(r, "workflow")
125 if workflow == "" {
126 l.Error("empty workflow name")
127 return
128 }
129
130 ps, err := db.GetPipelineStatuses(
131 p.db,
132 1,
133 orm.FilterEq("repo_owner", f.Did),
134 orm.FilterEq("repo_name", f.Name),
135 orm.FilterEq("knot", f.Knot),
136 orm.FilterEq("id", pipelineId),
137 )
138 if err != nil {
139 l.Error("failed to query db", "err", err)
140 return
141 }
142
143 if len(ps) != 1 {
144 l.Error("invalid number of pipelines", "len", len(ps))
145 return
146 }
147
148 singlePipeline := ps[0]
149
150 p.pages.Workflow(w, pages.WorkflowParams{
151 LoggedInUser: user,
152 RepoInfo: p.repoResolver.GetRepoInfo(r, user),
153 Pipeline: singlePipeline,
154 Workflow: workflow,
155 })
156}
157
158var upgrader = websocket.Upgrader{
159 ReadBufferSize: 1024,
160 WriteBufferSize: 1024,
161}
162
163func (p *Pipelines) Logs(w http.ResponseWriter, r *http.Request) {
164 l := p.logger.With("handler", "logs")
165
166 clientConn, err := upgrader.Upgrade(w, r, nil)
167 if err != nil {
168 l.Error("websocket upgrade failed", "err", err)
169 return
170 }
171 defer func() {
172 _ = clientConn.WriteControl(
173 websocket.CloseMessage,
174 websocket.FormatCloseMessage(websocket.CloseNormalClosure, "log stream complete"),
175 time.Now().Add(time.Second),
176 )
177 clientConn.Close()
178 }()
179
180 ctx, cancel := context.WithCancel(r.Context())
181 defer cancel()
182
183 f, err := p.repoResolver.Resolve(r)
184 if err != nil {
185 l.Error("failed to get repo and knot", "err", err)
186 http.Error(w, "bad repo/knot", http.StatusBadRequest)
187 return
188 }
189
190 pipelineId := chi.URLParam(r, "pipeline")
191 workflow := chi.URLParam(r, "workflow")
192 if pipelineId == "" || workflow == "" {
193 http.Error(w, "missing pipeline ID or workflow", http.StatusBadRequest)
194 return
195 }
196
197 ps, err := db.GetPipelineStatuses(
198 p.db,
199 1,
200 orm.FilterEq("repo_owner", f.Did),
201 orm.FilterEq("repo_name", f.Name),
202 orm.FilterEq("knot", f.Knot),
203 orm.FilterEq("id", pipelineId),
204 )
205 if err != nil || len(ps) != 1 {
206 l.Error("pipeline query failed", "err", err, "count", len(ps))
207 http.Error(w, "pipeline not found", http.StatusNotFound)
208 return
209 }
210
211 singlePipeline := ps[0]
212 spindle := f.Spindle
213 knot := f.Knot
214 rkey := singlePipeline.Rkey
215
216 if spindle == "" || knot == "" || rkey == "" {
217 http.Error(w, "invalid repo info", http.StatusBadRequest)
218 return
219 }
220
221 scheme := "wss"
222 if p.config.Core.Dev {
223 scheme = "ws"
224 }
225
226 url := scheme + "://" + strings.Join([]string{spindle, "logs", knot, rkey, workflow}, "/")
227 l = l.With("url", url)
228 l.Info("logs endpoint hit")
229
230 spindleConn, _, err := websocket.DefaultDialer.Dial(url, nil)
231 if err != nil {
232 l.Error("websocket dial failed", "err", err)
233 http.Error(w, "failed to connect to log stream", http.StatusBadGateway)
234 return
235 }
236 defer spindleConn.Close()
237
238 // create a channel for incoming messages
239 evChan := make(chan logEvent, 100)
240 // start a goroutine to read from spindle
241 go readLogs(spindleConn, evChan)
242
243 stepStartTimes := make(map[int]time.Time)
244 var fragment bytes.Buffer
245 for {
246 select {
247 case <-ctx.Done():
248 l.Info("client disconnected")
249 return
250
251 case ev, ok := <-evChan:
252 if !ok {
253 continue
254 }
255
256 if ev.err != nil && ev.isCloseError() {
257 l.Debug("graceful shutdown, tail complete", "err", err)
258 return
259 }
260 if ev.err != nil {
261 l.Error("error reading from spindle", "err", err)
262 return
263 }
264
265 var logLine spindlemodel.LogLine
266 if err = json.Unmarshal(ev.msg, &logLine); err != nil {
267 l.Error("failed to parse logline", "err", err)
268 continue
269 }
270
271 fragment.Reset()
272
273 switch logLine.Kind {
274 case spindlemodel.LogKindControl:
275 switch logLine.StepStatus {
276 case spindlemodel.StepStatusStart:
277 stepStartTimes[logLine.StepId] = logLine.Time
278 collapsed := false
279 if logLine.StepKind == spindlemodel.StepKindSystem {
280 collapsed = true
281 }
282 err = p.pages.LogBlock(&fragment, pages.LogBlockParams{
283 Id: logLine.StepId,
284 Name: logLine.Content,
285 Command: logLine.StepCommand,
286 Collapsed: collapsed,
287 StartTime: logLine.Time,
288 })
289 case spindlemodel.StepStatusEnd:
290 startTime := stepStartTimes[logLine.StepId]
291 endTime := logLine.Time
292 err = p.pages.LogBlockEnd(&fragment, pages.LogBlockEndParams{
293 Id: logLine.StepId,
294 StartTime: startTime,
295 EndTime: endTime,
296 })
297 }
298
299 case spindlemodel.LogKindData:
300 // data messages simply insert new log lines into current step
301 err = p.pages.LogLine(&fragment, pages.LogLineParams{
302 Id: logLine.StepId,
303 Content: logLine.Content,
304 })
305 }
306 if err != nil {
307 l.Error("failed to render log line", "err", err)
308 return
309 }
310
311 if err = clientConn.WriteMessage(websocket.TextMessage, fragment.Bytes()); err != nil {
312 l.Error("error writing to client", "err", err)
313 return
314 }
315
316 case <-time.After(30 * time.Second):
317 l.Debug("sent keepalive")
318 if err = clientConn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil {
319 l.Error("failed to write control", "err", err)
320 return
321 }
322 }
323 }
324}
325
326func (p *Pipelines) Cancel(w http.ResponseWriter, r *http.Request) {
327 l := p.logger.With("handler", "Cancel")
328
329 var (
330 pipelineId = chi.URLParam(r, "pipeline")
331 workflow = chi.URLParam(r, "workflow")
332 )
333 if pipelineId == "" || workflow == "" {
334 http.Error(w, "missing pipeline ID or workflow", http.StatusBadRequest)
335 return
336 }
337
338 f, err := p.repoResolver.Resolve(r)
339 if err != nil {
340 l.Error("failed to get repo and knot", "err", err)
341 http.Error(w, "bad repo/knot", http.StatusBadRequest)
342 return
343 }
344
345 pipeline, err := func() (models.Pipeline, error) {
346 ps, err := db.GetPipelineStatuses(
347 p.db,
348 1,
349 orm.FilterEq("repo_owner", f.Did),
350 orm.FilterEq("repo_name", f.Name),
351 orm.FilterEq("knot", f.Knot),
352 orm.FilterEq("id", pipelineId),
353 )
354 if err != nil {
355 return models.Pipeline{}, err
356 }
357 if len(ps) != 1 {
358 return models.Pipeline{}, fmt.Errorf("wrong pipeline count %d", len(ps))
359 }
360 return ps[0], nil
361 }()
362 if err != nil {
363 l.Error("pipeline query failed", "err", err)
364 http.Error(w, "pipeline not found", http.StatusNotFound)
365 }
366 var (
367 spindle = f.Spindle
368 knot = f.Knot
369 rkey = pipeline.Rkey
370 )
371
372 if spindle == "" || knot == "" || rkey == "" {
373 http.Error(w, "invalid repo info", http.StatusBadRequest)
374 return
375 }
376
377 spindleClient, err := p.oauth.ServiceClient(
378 r,
379 oauth.WithService(f.Spindle),
380 oauth.WithLxm(tangled.PipelineCancelPipelineNSID),
381 oauth.WithDev(p.config.Core.Dev),
382 oauth.WithTimeout(time.Second*30), // workflow cleanup usually takes time
383 )
384
385 err = tangled.PipelineCancelPipeline(
386 r.Context(),
387 spindleClient,
388 &tangled.PipelineCancelPipeline_Input{
389 Repo: string(f.RepoAt()),
390 Pipeline: pipeline.AtUri().String(),
391 Workflow: workflow,
392 },
393 )
394 err = fmt.Errorf("boo! new error")
395 errorId := "workflow-error"
396 if err != nil {
397 l.Error("failed to cancel workflow", "err", err)
398 p.pages.Notice(w, errorId, "Failed to cancel workflow")
399 return
400 }
401 l.Debug("canceled pipeline", "uri", pipeline.AtUri())
402}
403
404// either a message or an error
405type logEvent struct {
406 msg []byte
407 err error
408}
409
410func (ev *logEvent) isCloseError() bool {
411 return websocket.IsCloseError(
412 ev.err,
413 websocket.CloseNormalClosure,
414 websocket.CloseGoingAway,
415 websocket.CloseAbnormalClosure,
416 )
417}
418
419// read logs from spindle and pass through to chan
420func readLogs(conn *websocket.Conn, ch chan logEvent) {
421 defer close(ch)
422
423 for {
424 if conn == nil {
425 return
426 }
427
428 _, msg, err := conn.ReadMessage()
429 if err != nil {
430 ch <- logEvent{err: err}
431 return
432 }
433 ch <- logEvent{msg: msg}
434 }
435}