Monorepo for Tangled tangled.org
1package pipelines 2 3import ( 4 "bytes" 5 "context" 6 "encoding/json" 7 "fmt" 8 "log/slog" 9 "net/http" 10 "strings" 11 "time" 12 13 "tangled.org/core/api/tangled" 14 "tangled.org/core/appview/config" 15 "tangled.org/core/appview/db" 16 "tangled.org/core/appview/middleware" 17 "tangled.org/core/appview/models" 18 "tangled.org/core/appview/oauth" 19 "tangled.org/core/appview/pages" 20 "tangled.org/core/appview/reporesolver" 21 "tangled.org/core/eventconsumer" 22 "tangled.org/core/idresolver" 23 "tangled.org/core/orm" 24 "tangled.org/core/rbac" 25 spindlemodel "tangled.org/core/spindle/models" 26 27 "github.com/go-chi/chi/v5" 28 "github.com/gorilla/websocket" 29) 30 31type Pipelines struct { 32 repoResolver *reporesolver.RepoResolver 33 idResolver *idresolver.Resolver 34 config *config.Config 35 oauth *oauth.OAuth 36 pages *pages.Pages 37 spindlestream *eventconsumer.Consumer 38 db *db.DB 39 enforcer *rbac.Enforcer 40 logger *slog.Logger 41} 42 43func (p *Pipelines) Router(mw *middleware.Middleware) http.Handler { 44 r := chi.NewRouter() 45 r.Get("/", p.Index) 46 r.Get("/{pipeline}/workflow/{workflow}", p.Workflow) 47 r.Get("/{pipeline}/workflow/{workflow}/logs", p.Logs) 48 r. 49 With(mw.RepoPermissionMiddleware("repo:owner")). 50 Post("/{pipeline}/workflow/{workflow}/cancel", p.Cancel) 51 52 return r 53} 54 55func New( 56 oauth *oauth.OAuth, 57 repoResolver *reporesolver.RepoResolver, 58 pages *pages.Pages, 59 spindlestream *eventconsumer.Consumer, 60 idResolver *idresolver.Resolver, 61 db *db.DB, 62 config *config.Config, 63 enforcer *rbac.Enforcer, 64 logger *slog.Logger, 65) *Pipelines { 66 return &Pipelines{ 67 oauth: oauth, 68 repoResolver: repoResolver, 69 pages: pages, 70 idResolver: idResolver, 71 config: config, 72 spindlestream: spindlestream, 73 db: db, 74 enforcer: enforcer, 75 logger: logger, 76 } 77} 78 79func (p *Pipelines) Index(w http.ResponseWriter, r *http.Request) { 80 user := p.oauth.GetUser(r) 81 l := p.logger.With("handler", "Index") 82 83 f, err := p.repoResolver.Resolve(r) 84 if err != nil { 85 l.Error("failed to get repo and knot", "err", err) 86 return 87 } 88 89 ps, err := db.GetPipelineStatuses( 90 p.db, 91 30, 92 orm.FilterEq("repo_owner", f.Did), 93 orm.FilterEq("repo_name", f.Name), 94 orm.FilterEq("knot", f.Knot), 95 ) 96 if err != nil { 97 l.Error("failed to query db", "err", err) 98 return 99 } 100 101 p.pages.Pipelines(w, pages.PipelinesParams{ 102 LoggedInUser: user, 103 RepoInfo: p.repoResolver.GetRepoInfo(r, user), 104 Pipelines: ps, 105 }) 106} 107 108func (p *Pipelines) Workflow(w http.ResponseWriter, r *http.Request) { 109 user := p.oauth.GetUser(r) 110 l := p.logger.With("handler", "Workflow") 111 112 f, err := p.repoResolver.Resolve(r) 113 if err != nil { 114 l.Error("failed to get repo and knot", "err", err) 115 return 116 } 117 118 pipelineId := chi.URLParam(r, "pipeline") 119 if pipelineId == "" { 120 l.Error("empty pipeline ID") 121 return 122 } 123 124 workflow := chi.URLParam(r, "workflow") 125 if workflow == "" { 126 l.Error("empty workflow name") 127 return 128 } 129 130 ps, err := db.GetPipelineStatuses( 131 p.db, 132 1, 133 orm.FilterEq("repo_owner", f.Did), 134 orm.FilterEq("repo_name", f.Name), 135 orm.FilterEq("knot", f.Knot), 136 orm.FilterEq("id", pipelineId), 137 ) 138 if err != nil { 139 l.Error("failed to query db", "err", err) 140 return 141 } 142 143 if len(ps) != 1 { 144 l.Error("invalid number of pipelines", "len", len(ps)) 145 return 146 } 147 148 singlePipeline := ps[0] 149 150 p.pages.Workflow(w, pages.WorkflowParams{ 151 LoggedInUser: user, 152 RepoInfo: p.repoResolver.GetRepoInfo(r, user), 153 Pipeline: singlePipeline, 154 Workflow: workflow, 155 }) 156} 157 158var upgrader = websocket.Upgrader{ 159 ReadBufferSize: 1024, 160 WriteBufferSize: 1024, 161} 162 163func (p *Pipelines) Logs(w http.ResponseWriter, r *http.Request) { 164 l := p.logger.With("handler", "logs") 165 166 clientConn, err := upgrader.Upgrade(w, r, nil) 167 if err != nil { 168 l.Error("websocket upgrade failed", "err", err) 169 return 170 } 171 defer func() { 172 _ = clientConn.WriteControl( 173 websocket.CloseMessage, 174 websocket.FormatCloseMessage(websocket.CloseNormalClosure, "log stream complete"), 175 time.Now().Add(time.Second), 176 ) 177 clientConn.Close() 178 }() 179 180 ctx, cancel := context.WithCancel(r.Context()) 181 defer cancel() 182 183 f, err := p.repoResolver.Resolve(r) 184 if err != nil { 185 l.Error("failed to get repo and knot", "err", err) 186 http.Error(w, "bad repo/knot", http.StatusBadRequest) 187 return 188 } 189 190 pipelineId := chi.URLParam(r, "pipeline") 191 workflow := chi.URLParam(r, "workflow") 192 if pipelineId == "" || workflow == "" { 193 http.Error(w, "missing pipeline ID or workflow", http.StatusBadRequest) 194 return 195 } 196 197 ps, err := db.GetPipelineStatuses( 198 p.db, 199 1, 200 orm.FilterEq("repo_owner", f.Did), 201 orm.FilterEq("repo_name", f.Name), 202 orm.FilterEq("knot", f.Knot), 203 orm.FilterEq("id", pipelineId), 204 ) 205 if err != nil || len(ps) != 1 { 206 l.Error("pipeline query failed", "err", err, "count", len(ps)) 207 http.Error(w, "pipeline not found", http.StatusNotFound) 208 return 209 } 210 211 singlePipeline := ps[0] 212 spindle := f.Spindle 213 knot := f.Knot 214 rkey := singlePipeline.Rkey 215 216 if spindle == "" || knot == "" || rkey == "" { 217 http.Error(w, "invalid repo info", http.StatusBadRequest) 218 return 219 } 220 221 scheme := "wss" 222 if p.config.Core.Dev { 223 scheme = "ws" 224 } 225 226 url := scheme + "://" + strings.Join([]string{spindle, "logs", knot, rkey, workflow}, "/") 227 l = l.With("url", url) 228 l.Info("logs endpoint hit") 229 230 spindleConn, _, err := websocket.DefaultDialer.Dial(url, nil) 231 if err != nil { 232 l.Error("websocket dial failed", "err", err) 233 http.Error(w, "failed to connect to log stream", http.StatusBadGateway) 234 return 235 } 236 defer spindleConn.Close() 237 238 // create a channel for incoming messages 239 evChan := make(chan logEvent, 100) 240 // start a goroutine to read from spindle 241 go readLogs(spindleConn, evChan) 242 243 stepStartTimes := make(map[int]time.Time) 244 var fragment bytes.Buffer 245 for { 246 select { 247 case <-ctx.Done(): 248 l.Info("client disconnected") 249 return 250 251 case ev, ok := <-evChan: 252 if !ok { 253 continue 254 } 255 256 if ev.err != nil && ev.isCloseError() { 257 l.Debug("graceful shutdown, tail complete", "err", err) 258 return 259 } 260 if ev.err != nil { 261 l.Error("error reading from spindle", "err", err) 262 return 263 } 264 265 var logLine spindlemodel.LogLine 266 if err = json.Unmarshal(ev.msg, &logLine); err != nil { 267 l.Error("failed to parse logline", "err", err) 268 continue 269 } 270 271 fragment.Reset() 272 273 switch logLine.Kind { 274 case spindlemodel.LogKindControl: 275 switch logLine.StepStatus { 276 case spindlemodel.StepStatusStart: 277 stepStartTimes[logLine.StepId] = logLine.Time 278 collapsed := false 279 if logLine.StepKind == spindlemodel.StepKindSystem { 280 collapsed = true 281 } 282 err = p.pages.LogBlock(&fragment, pages.LogBlockParams{ 283 Id: logLine.StepId, 284 Name: logLine.Content, 285 Command: logLine.StepCommand, 286 Collapsed: collapsed, 287 StartTime: logLine.Time, 288 }) 289 case spindlemodel.StepStatusEnd: 290 startTime := stepStartTimes[logLine.StepId] 291 endTime := logLine.Time 292 err = p.pages.LogBlockEnd(&fragment, pages.LogBlockEndParams{ 293 Id: logLine.StepId, 294 StartTime: startTime, 295 EndTime: endTime, 296 }) 297 } 298 299 case spindlemodel.LogKindData: 300 // data messages simply insert new log lines into current step 301 err = p.pages.LogLine(&fragment, pages.LogLineParams{ 302 Id: logLine.StepId, 303 Content: logLine.Content, 304 }) 305 } 306 if err != nil { 307 l.Error("failed to render log line", "err", err) 308 return 309 } 310 311 if err = clientConn.WriteMessage(websocket.TextMessage, fragment.Bytes()); err != nil { 312 l.Error("error writing to client", "err", err) 313 return 314 } 315 316 case <-time.After(30 * time.Second): 317 l.Debug("sent keepalive") 318 if err = clientConn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil { 319 l.Error("failed to write control", "err", err) 320 return 321 } 322 } 323 } 324} 325 326func (p *Pipelines) Cancel(w http.ResponseWriter, r *http.Request) { 327 l := p.logger.With("handler", "Cancel") 328 329 var ( 330 pipelineId = chi.URLParam(r, "pipeline") 331 workflow = chi.URLParam(r, "workflow") 332 ) 333 if pipelineId == "" || workflow == "" { 334 http.Error(w, "missing pipeline ID or workflow", http.StatusBadRequest) 335 return 336 } 337 338 f, err := p.repoResolver.Resolve(r) 339 if err != nil { 340 l.Error("failed to get repo and knot", "err", err) 341 http.Error(w, "bad repo/knot", http.StatusBadRequest) 342 return 343 } 344 345 pipeline, err := func() (models.Pipeline, error) { 346 ps, err := db.GetPipelineStatuses( 347 p.db, 348 1, 349 orm.FilterEq("repo_owner", f.Did), 350 orm.FilterEq("repo_name", f.Name), 351 orm.FilterEq("knot", f.Knot), 352 orm.FilterEq("id", pipelineId), 353 ) 354 if err != nil { 355 return models.Pipeline{}, err 356 } 357 if len(ps) != 1 { 358 return models.Pipeline{}, fmt.Errorf("wrong pipeline count %d", len(ps)) 359 } 360 return ps[0], nil 361 }() 362 if err != nil { 363 l.Error("pipeline query failed", "err", err) 364 http.Error(w, "pipeline not found", http.StatusNotFound) 365 } 366 var ( 367 spindle = f.Spindle 368 knot = f.Knot 369 rkey = pipeline.Rkey 370 ) 371 372 if spindle == "" || knot == "" || rkey == "" { 373 http.Error(w, "invalid repo info", http.StatusBadRequest) 374 return 375 } 376 377 spindleClient, err := p.oauth.ServiceClient( 378 r, 379 oauth.WithService(f.Spindle), 380 oauth.WithLxm(tangled.PipelineCancelPipelineNSID), 381 oauth.WithDev(p.config.Core.Dev), 382 oauth.WithTimeout(time.Second*30), // workflow cleanup usually takes time 383 ) 384 385 err = tangled.PipelineCancelPipeline( 386 r.Context(), 387 spindleClient, 388 &tangled.PipelineCancelPipeline_Input{ 389 Repo: string(f.RepoAt()), 390 Pipeline: pipeline.AtUri().String(), 391 Workflow: workflow, 392 }, 393 ) 394 err = fmt.Errorf("boo! new error") 395 errorId := "workflow-error" 396 if err != nil { 397 l.Error("failed to cancel workflow", "err", err) 398 p.pages.Notice(w, errorId, "Failed to cancel workflow") 399 return 400 } 401 l.Debug("canceled pipeline", "uri", pipeline.AtUri()) 402} 403 404// either a message or an error 405type logEvent struct { 406 msg []byte 407 err error 408} 409 410func (ev *logEvent) isCloseError() bool { 411 return websocket.IsCloseError( 412 ev.err, 413 websocket.CloseNormalClosure, 414 websocket.CloseGoingAway, 415 websocket.CloseAbnormalClosure, 416 ) 417} 418 419// read logs from spindle and pass through to chan 420func readLogs(conn *websocket.Conn, ch chan logEvent) { 421 defer close(ch) 422 423 for { 424 if conn == nil { 425 return 426 } 427 428 _, msg, err := conn.ReadMessage() 429 if err != nil { 430 ch <- logEvent{err: err} 431 return 432 } 433 ch <- logEvent{msg: msg} 434 } 435}