wip: spindle: engines -> adapters

Signed-off-by: Seongmin Lee <git@boltless.me>

boltless.me 855593bf 54748b06

verified
+931
+459
spindle/adapters/nixery/adapter.go
··· 1 + package nixery 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "io" 7 + "log/slog" 8 + "os" 9 + "path" 10 + "path/filepath" 11 + "regexp" 12 + "runtime" 13 + "sync" 14 + "time" 15 + 16 + "github.com/bluesky-social/indigo/atproto/syntax" 17 + "github.com/docker/docker/api/types/container" 18 + "github.com/docker/docker/api/types/filters" 19 + "github.com/docker/docker/api/types/image" 20 + "github.com/docker/docker/api/types/mount" 21 + "github.com/docker/docker/api/types/network" 22 + "github.com/docker/docker/client" 23 + "github.com/stretchr/testify/assert/yaml" 24 + "tangled.org/core/api/tangled" 25 + "tangled.org/core/sets" 26 + "tangled.org/core/spindle/config" 27 + "tangled.org/core/spindle/models" 28 + "tangled.org/core/spindle/repomanager" 29 + "tangled.org/core/tid" 30 + "tangled.org/core/workflow" 31 + ) 32 + 33 + const AdapterID = "nixery" 34 + 35 + type Adapter struct { 36 + l *slog.Logger 37 + repoManager *repomanager.RepoManager 38 + docker client.APIClient 39 + Timeout time.Duration 40 + spindleDid syntax.DID 41 + cfg config.NixeryPipelines 42 + 43 + mu sync.RWMutex 44 + activeRuns map[syntax.ATURI]models.WorkflowRun 45 + subscribers sets.Set[chan<- models.WorkflowRun] 46 + } 47 + 48 + var _ models.Adapter = (*Adapter)(nil) 49 + 50 + func New(l *slog.Logger, cfg config.Config, repoManager *repomanager.RepoManager) (*Adapter, error) { 51 + dc, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) 52 + if err != nil { 53 + return nil, fmt.Errorf("creating docker client: %w", err) 54 + } 55 + return &Adapter{ 56 + l: l, 57 + repoManager: repoManager, 58 + docker: dc, 59 + Timeout: time.Minute * 5, // TODO: set timeout from config 60 + spindleDid: cfg.Server.Did(), 61 + cfg: cfg.NixeryPipelines, 62 + 63 + activeRuns: make(map[syntax.ATURI]models.WorkflowRun), 64 + subscribers: sets.New[chan<- models.WorkflowRun](), 65 + }, nil 66 + } 67 + 68 + func (a *Adapter) Init() error { 69 + // no-op 70 + return nil 71 + } 72 + 73 
+ func (a *Adapter) Shutdown(ctx context.Context) error { 74 + // TODO: cleanup spawned containers just in case 75 + panic("unimplemented") 76 + } 77 + 78 + func (a *Adapter) SetupRepo(ctx context.Context, repo syntax.ATURI) error { 79 + if err := a.repoManager.RegisterRepo(ctx, repo, []string{"/.tangled/workflows"}); err != nil { 80 + return fmt.Errorf("syncing repo: %w", err) 81 + } 82 + return nil 83 + } 84 + 85 + func (a *Adapter) ListWorkflowDefs(ctx context.Context, repo syntax.ATURI, rev string) ([]models.WorkflowDef, error) { 86 + defs, err := a.listWorkflowDefs(ctx, repo, rev) 87 + if err != nil { 88 + return nil, err 89 + } 90 + retDefs := make([]models.WorkflowDef, len(defs)) 91 + for i, def := range defs { 92 + retDefs[i] = def.AsInfo() 93 + } 94 + return retDefs, nil 95 + } 96 + 97 + func (a *Adapter) listWorkflowDefs(ctx context.Context, repo syntax.ATURI, rev string) ([]WorkflowDef, error) { 98 + workflowDir, err := a.repoManager.FileTree(ctx, repo, rev, workflow.WorkflowDir) 99 + if err != nil { 100 + return nil, fmt.Errorf("loading file tree: %w", err) 101 + } 102 + 103 + if len(workflowDir) == 0 { 104 + return nil, nil 105 + } 106 + 107 + // TODO(boltless): repoManager.FileTree() should be smart enough so we don't need to do this: 108 + gr, err := a.repoManager.Open(repo, rev) 109 + if err != nil { 110 + return nil, fmt.Errorf("opening git repo: %w", err) 111 + } 112 + 113 + var defs []WorkflowDef 114 + for _, e := range workflowDir { 115 + if !e.IsFile() { 116 + continue 117 + } 118 + 119 + fpath := filepath.Join(workflow.WorkflowDir, e.Name) 120 + contents, err := gr.RawContent(fpath) 121 + if err != nil { 122 + return nil, fmt.Errorf("reading raw content of '%s': %w", fpath, err) 123 + } 124 + 125 + var wf WorkflowDef 126 + if err := yaml.Unmarshal(contents, &wf); err != nil { 127 + return nil, fmt.Errorf("parsing yaml: %w", err) 128 + } 129 + wf.Name = e.Name 130 + 131 + defs = append(defs, wf) 132 + } 133 + 134 + return defs, nil 135 + } 136 
+ 137 + func (a *Adapter) EvaluateEvent(ctx context.Context, event models.Event) ([]models.WorkflowRun, error) { 138 + defs, err := a.listWorkflowDefs(ctx, event.SourceRepo, event.SourceSha) 139 + if err != nil { 140 + return nil, fmt.Errorf("fetching workflow definitions: %w", err) 141 + } 142 + 143 + // filter out triggered workflows 144 + var triggered []nixeryWorkflow 145 + for _, def := range defs { 146 + if def.ShouldRunOn(event) { 147 + triggered = append(triggered, nixeryWorkflow{ 148 + event: event, 149 + def: def, 150 + }) 151 + } 152 + } 153 + 154 + // TODO: append more workflows from "on_workflow" event 155 + 156 + // schedule workflows and return immediately 157 + runs := make([]models.WorkflowRun, len(triggered)) 158 + for i, workflow := range triggered { 159 + runs[i] = a.scheduleWorkflow(ctx, workflow) 160 + } 161 + return runs, nil 162 + } 163 + 164 + // NOTE: nixery adapter is volatile. GetActiveWorkflowRun will return error 165 + // when the workflow is terminated. It lets spindle to mark lost workflow.run 166 + // as "Failed". 
167 + func (a *Adapter) GetActiveWorkflowRun(ctx context.Context, runId syntax.ATURI) (models.WorkflowRun, error) { 168 + a.mu.RLock() 169 + run, exists := a.activeRuns[runId] 170 + a.mu.RUnlock() 171 + if !exists { 172 + return run, fmt.Errorf("unknown or terminated workflow") 173 + } 174 + return run, nil 175 + } 176 + 177 + func (a *Adapter) ListActiveWorkflowRuns(ctx context.Context) ([]models.WorkflowRun, error) { 178 + a.mu.RLock() 179 + defer a.mu.RUnlock() 180 + 181 + runs := make([]models.WorkflowRun, 0, len(a.activeRuns)) 182 + for _, run := range a.activeRuns { 183 + runs = append(runs, run) 184 + } 185 + return runs, nil 186 + } 187 + 188 + func (a *Adapter) SubscribeWorkflowRun(ctx context.Context) <-chan models.WorkflowRun { 189 + ch := make(chan models.WorkflowRun, 1) 190 + 191 + a.mu.Lock() 192 + a.subscribers.Insert(ch) 193 + a.mu.Unlock() 194 + 195 + // cleanup spindle stops listening 196 + go func() { 197 + <-ctx.Done() 198 + a.mu.Lock() 199 + a.subscribers.Remove(ch) 200 + a.mu.Unlock() 201 + close(ch) 202 + }() 203 + 204 + return ch 205 + } 206 + 207 + func (a *Adapter) emit(run models.WorkflowRun) { 208 + a.mu.Lock() 209 + if run.Status.IsActive() { 210 + a.activeRuns[run.AtUri()] = run 211 + } else { 212 + delete(a.activeRuns, run.AtUri()) 213 + } 214 + 215 + // Snapshot subscribers to broadcast outside the lock 216 + subs := make([]chan<- models.WorkflowRun, 0, a.subscribers.Len()) 217 + for ch := range a.subscribers.All() { 218 + subs = append(subs, ch) 219 + } 220 + a.mu.Unlock() 221 + 222 + for _, ch := range subs { 223 + select { 224 + case ch <- run: 225 + default: 226 + // avoid blocking if channel is full 227 + // spindle will catch the state by regular GetWorkflowRun poll 228 + } 229 + } 230 + } 231 + 232 + func (a *Adapter) StreamWorkflowRunLogs(ctx context.Context, runId syntax.ATURI, handle func(line models.LogLine) error) error { 233 + panic("unimplemented") 234 + } 235 + 236 + func (a *Adapter) CancelWorkflowRun(ctx 
context.Context, runId syntax.ATURI) error { 237 + // remove network 238 + if err := a.docker.NetworkRemove(ctx, networkName(runId)); err != nil { 239 + return fmt.Errorf("removing network: %w", err) 240 + } 241 + 242 + // stop & remove docker containers with label 243 + containers, err := a.docker.ContainerList(ctx, container.ListOptions{ 244 + Filters: labelFilter(tangled.CiWorkflowRunNSID, runId.String()), 245 + }) 246 + if err != nil { 247 + return fmt.Errorf("finding container with label: %w", err) 248 + } 249 + for _, c := range containers { 250 + if err := a.docker.ContainerStop(ctx, c.ID, container.StopOptions{}); err != nil { 251 + return fmt.Errorf("stopping container: %w", err) 252 + } 253 + 254 + if err := a.docker.ContainerRemove(ctx, c.ID, container.RemoveOptions{ 255 + RemoveVolumes: true, 256 + RemoveLinks: false, 257 + Force: false, 258 + }); err != nil { 259 + return fmt.Errorf("removing container: %w", err) 260 + } 261 + } 262 + return nil 263 + } 264 + 265 + func labelFilter(labelKey, labelVal string) filters.Args { 266 + filterArgs := filters.NewArgs() 267 + filterArgs.Add("label", fmt.Sprintf("%s=%s", labelKey, labelVal)) 268 + return filterArgs 269 + } 270 + 271 + const ( 272 + workspaceDir = "/tangled/workspace" 273 + homeDir = "/tangled/home" 274 + ) 275 + 276 + // scheduleWorkflow schedules a workflow run in job queue and return queued run 277 + func (a *Adapter) scheduleWorkflow(ctx context.Context, workflow nixeryWorkflow) models.WorkflowRun { 278 + l := a.l 279 + 280 + run := models.WorkflowRun{ 281 + Did: a.spindleDid, 282 + Rkey: syntax.RecordKey(tid.TID()), 283 + AdapterId: AdapterID, 284 + Name: workflow.def.Name, 285 + Status: models.WorkflowStatusPending, 286 + } 287 + 288 + a.mu.Lock() 289 + a.activeRuns[run.AtUri()] = run 290 + a.mu.Unlock() 291 + 292 + go func() { 293 + defer a.CancelWorkflowRun(ctx, run.AtUri()) 294 + 295 + containerId, err := a.initNixeryContainer(ctx, workflow.def, run.AtUri()) 296 + if err != nil { 297 + 
l.Error("failed to intialize container", "err", err) 298 + // TODO: put user-facing logs in workflow log 299 + a.emit(run.WithStatus(models.WorkflowStatusFailed)) 300 + return 301 + } 302 + 303 + ctx, cancel := context.WithTimeout(ctx, a.Timeout) 304 + defer cancel() 305 + 306 + for stepIdx, step := range workflow.def.Steps { 307 + if err := a.runStep(ctx, containerId, stepIdx, step); err != nil { 308 + l.Error("failed to run step", "stepIdx", stepIdx, "err", err) 309 + return 310 + } 311 + } 312 + l.Info("all steps completed successfully") 313 + }() 314 + 315 + l.Info("workflow scheduled to background", "workflow.run", run.AtUri()) 316 + 317 + return run 318 + } 319 + 320 + func (a *Adapter) runStep(ctx context.Context, containerId string, stepIdx int, step Step) error { 321 + // TODO: implement this 322 + 323 + // TODO: configure envs 324 + var envs []string 325 + 326 + select { 327 + case <-ctx.Done(): 328 + return ctx.Err() 329 + default: 330 + } 331 + 332 + mkExecResp, err := a.docker.ContainerExecCreate(ctx, containerId, container.ExecOptions{ 333 + Cmd: []string{"bash", "-c", step.Command}, 334 + AttachStdout: true, 335 + AttachStderr: true, 336 + Env: envs, 337 + }) 338 + if err != nil { 339 + return fmt.Errorf("creating exec: %w", err) 340 + } 341 + 342 + panic("unimplemented") 343 + } 344 + 345 + // initNixeryContainer pulls the image from nixery and start the container. 
346 + func (a *Adapter) initNixeryContainer(ctx context.Context, def WorkflowDef, runAt syntax.ATURI) (string, error) { 347 + imageName := workflowImageName(def.Dependencies, a.cfg.Nixery) 348 + 349 + _, err := a.docker.NetworkCreate(ctx, networkName(runAt), network.CreateOptions{ 350 + Driver: "bridge", 351 + }) 352 + if err != nil { 353 + return "", fmt.Errorf("creating network: %w", err) 354 + } 355 + 356 + reader, err := a.docker.ImagePull(ctx, imageName, image.PullOptions{}) 357 + if err != nil { 358 + return "", fmt.Errorf("pulling image: %w", err) 359 + } 360 + defer reader.Close() 361 + io.Copy(os.Stdout, reader) 362 + 363 + resp, err := a.docker.ContainerCreate(ctx, &container.Config{ 364 + Image: imageName, 365 + Cmd: []string{"cat"}, 366 + OpenStdin: true, // so cat stays alive :3 367 + Tty: false, 368 + Hostname: "spindle", 369 + WorkingDir: workspaceDir, 370 + Labels: map[string]string{ 371 + tangled.CiWorkflowRunNSID: runAt.String(), 372 + }, 373 + // TODO(winter): investigate whether environment variables passed here 374 + // get propagated to ContainerExec processes 375 + }, &container.HostConfig{ 376 + Mounts: []mount.Mount{ 377 + { 378 + Type: mount.TypeTmpfs, 379 + Target: "/tmp", 380 + ReadOnly: false, 381 + TmpfsOptions: &mount.TmpfsOptions{ 382 + Mode: 0o1777, // world-writeable sticky bit 383 + Options: [][]string{ 384 + {"exec"}, 385 + }, 386 + }, 387 + }, 388 + }, 389 + ReadonlyRootfs: false, 390 + CapDrop: []string{"ALL"}, 391 + CapAdd: []string{"CAP_DAC_OVERRIDE", "CAP_CHOWN", "CAP_FOWNER", "CAP_SETUID", "CAP_SETGID"}, 392 + SecurityOpt: []string{"no-new-privileges"}, 393 + ExtraHosts: []string{"host.docker.internal:host-gateway"}, 394 + }, nil, nil, "") 395 + if err != nil { 396 + return "", fmt.Errorf("creating container: %w", err) 397 + } 398 + 399 + if err := a.docker.ContainerStart(ctx, resp.ID, container.StartOptions{}); err != nil { 400 + return "", fmt.Errorf("starting container: %w", err) 401 + } 402 + 403 + mkExecResp, err := 
a.docker.ContainerExecCreate(ctx, resp.ID, container.ExecOptions{ 404 + Cmd: []string{"mkdir", "-p", workspaceDir, homeDir}, 405 + AttachStdout: true, // NOTE(winter): pretty sure this will make it so that when stdout read is done below, mkdir is done. maybe?? 406 + AttachStderr: true, // for good measure, backed up by docker/cli ("If -d is not set, attach to everything by default") 407 + }) 408 + if err != nil { 409 + return "", err 410 + } 411 + 412 + // This actually *starts* the command. Thanks, Docker! 413 + execResp, err := a.docker.ContainerExecAttach(ctx, mkExecResp.ID, container.ExecAttachOptions{}) 414 + if err != nil { 415 + return "", err 416 + } 417 + defer execResp.Close() 418 + 419 + // This is apparently best way to wait for the command to complete. 420 + _, err = io.ReadAll(execResp.Reader) 421 + if err != nil { 422 + return "", err 423 + } 424 + 425 + execInspectResp, err := a.docker.ContainerExecInspect(ctx, mkExecResp.ID) 426 + if err != nil { 427 + return "", err 428 + } 429 + 430 + if execInspectResp.ExitCode != 0 { 431 + return "", fmt.Errorf("mkdir exited with exit code %d", execInspectResp.ExitCode) 432 + } else if execInspectResp.Running { 433 + return "", fmt.Errorf("mkdir is somehow still running??") 434 + } 435 + 436 + return resp.ID, nil 437 + } 438 + 439 + func workflowImageName(deps map[string][]string, nixery string) string { 440 + var dependencies string 441 + for reg, ds := range deps { 442 + if reg == "nixpkgs" { 443 + dependencies = path.Join(ds...) 444 + } 445 + } 446 + // NOTE: shouldn't base dependencies come first? 
447 + // like: nixery.tangled.sh/arm64/bash/git/coreutils/nix 448 + dependencies = path.Join(dependencies, "bash", "git", "coreutils", "nix") 449 + if runtime.GOARCH == "arm64" { 450 + dependencies = path.Join("arm64", dependencies) 451 + } 452 + 453 + return path.Join(nixery, dependencies) 454 + } 455 + 456 + var re = regexp.MustCompile(`[^a-zA-Z0-9_.-]`) 457 + func networkName(runId syntax.ATURI) string { 458 + return re.ReplaceAllString(runId.String()[5:], "-") 459 + }
+3
spindle/adapters/nixery/readme.md
··· 1 + # Nixery spindle adapter implementation 2 + 3 + Nixery adapter uses `/.tangled/workflows/*.yml` files as workflow definitions.
+42
spindle/adapters/nixery/workflow.go
package nixery

import (
	"tangled.org/core/spindle/models"
	"tangled.org/core/workflow"
)

// nixeryWorkflow pairs a triggering event with the workflow definition it
// matched, ready to be scheduled.
type nixeryWorkflow struct {
	event models.Event // event that triggered the workflow
	def   WorkflowDef  // definition of the workflow
}

// TODO: extract general fields to workflow.WorkflowDef struct

// WorkflowDef is the nixery adapter's workflow definition spec, unmarshalled
// from a YAML file under the repository's workflow directory.
type WorkflowDef struct {
	Name      string                `yaml:"-"` // name of the workflow file (set by the loader, not YAML)
	When      []workflow.Constraint `yaml:"when"`
	CloneOpts workflow.CloneOpts    `yaml:"clone"`

	// Dependencies maps a registry name (only "nixpkgs" is honored) to the
	// nix packages baked into the run's Nixery image.
	Dependencies map[string][]string
	// Steps are executed sequentially inside the run container.
	Steps []Step
}

// Step is a single shell command executed in the workflow container.
type Step struct {
	Name    string `yaml:"name"`
	Command string `yaml:"command"` // run via `bash -c`
	// NOTE(review): field name is misspelled ("Enviornment"); the yaml tag is
	// correct, but renaming the exported field would break callers — fix in a
	// follow-up alongside its users.
	Enviornment map[string]string `yaml:"environment"`
}

// AsInfo converts the definition to the adapter-agnostic models.WorkflowDef.
func (d *WorkflowDef) AsInfo() models.WorkflowDef {
	return models.WorkflowDef{
		AdapterId: AdapterID,
		Name:      d.Name,
		When:      d.When,
	}
}

// ShouldRunOn reports whether this workflow is triggered by the event.
//
// NOTE(review): stub — currently always false, so no nixery workflow ever
// triggers. Constraint matching against d.When is still to be implemented.
func (d *WorkflowDef) ShouldRunOn(event models.Event) bool {
	// panic("unimplemented")
	return false
}
+93
spindle/models/adapter.go
package models

import (
	"context"

	"github.com/bluesky-social/indigo/atproto/syntax"
)

// Adapter is the core of the spindle. It can use its own way to configure and
// run the workflows. The workflow definition can be either yaml files in git
// repositories or even from a dedicated web UI.
//
// An adapter is expected to hold all created workflow runs.
type Adapter interface {
	// Init initializes the adapter
	Init() error

	// Shutdown gracefully shuts down background jobs
	Shutdown(ctx context.Context) error

	// SetupRepo ensures the adapter is connected to the repository.
	// This usually includes adding a repository watcher that does sparse-clone.
	SetupRepo(ctx context.Context, repo syntax.ATURI) error

	// ListWorkflowDefs parses and returns all workflow definitions in the given
	// repository at the specified revision
	ListWorkflowDefs(ctx context.Context, repo syntax.ATURI, rev string) ([]WorkflowDef, error)

	// EvaluateEvent consumes a trigger event and returns a list of triggered
	// workflow runs. It is expected to return immediately after scheduling the
	// workflows.
	EvaluateEvent(ctx context.Context, event Event) ([]WorkflowRun, error)

	// GetActiveWorkflowRun returns current state of specific workflow run.
	// This method will be called regularly for active workflow runs.
	GetActiveWorkflowRun(ctx context.Context, runId syntax.ATURI) (WorkflowRun, error)

	// NOTE(design): basically I'm not sure about this method.
	// How to properly sync workflow.run states?
	//
	// For adapters with an external engine, they will hold every past
	// workflow.run object.
	// For adapters with an internal engine, they... should also hold every
	// past workflow.run object..?
	//
	// Problem:
	// when spindle suffers downtime (spindle server shutdown),
	// external `workflow.run`s might be unsynced in "running" or "pending" state;
	// same for internal `workflow.run`s.
	//
	// BUT, spindle itself is holding the runs,
	// so it already knows the unsynced workflows (= workflows not finished);
	// therefore, it can just fetch them again.
	// Adapters with internal engines will fail to fetch a previous run,
	// leaving spindle to mark the run as "Lost" or "Failed".
	// Because of such _lacking_ adapters, spindle should be able to manually
	// mark unknown runs with a "lost" state.
	//
	// GetWorkflowRun : used for background crawling
	//   XCodeCloud: ok
	//   Nixery: (will fail if unknown) -> spindle will mark workflow as failed anyways
	// StreamWorkflowRun : used to notify real-time updates
	//   XCodeCloud: ok (but old events will be lost)
	//   Nixery: same. old events during spindle downtime will be lost
	//
	// To avoid this, each adapter should hold an outbox buffer
	//
	// |
	// v

	// StreamWorkflowRun(ctx context.Context) <-chan WorkflowRun

	// ListActiveWorkflowRuns returns current list of active workflow runs.
	// Runs where status is either Pending or Running
	ListActiveWorkflowRuns(ctx context.Context) ([]WorkflowRun, error)
	// SubscribeWorkflowRun returns a channel of run state changes; the
	// subscription ends when ctx is cancelled.
	SubscribeWorkflowRun(ctx context.Context) <-chan WorkflowRun

	// StreamWorkflowRunLogs streams logs for a running workflow execution
	StreamWorkflowRunLogs(ctx context.Context, runId syntax.ATURI, handle func(line LogLine) error) error

	// CancelWorkflowRun attempts to stop a running workflow execution.
	// It won't do anything when the workflow has already completed.
	CancelWorkflowRun(ctx context.Context, runId syntax.ATURI) error
}
+124
spindle/models/pipeline2.go
··· 1 + package models 2 + 3 + import ( 4 + "fmt" 5 + "slices" 6 + 7 + "github.com/bluesky-social/indigo/atproto/syntax" 8 + "tangled.org/core/api/tangled" 9 + ) 10 + 11 + // `sh.tangled.ci.event` 12 + type Event struct { 13 + SourceRepo syntax.ATURI // repository to find the workflow definition 14 + SourceSha string // sha to find the workflow definition 15 + TargetSha string // sha to run the workflow 16 + // union type of: 17 + // 1. PullRequestEvent 18 + // 2. PushEvent 19 + // 3. ManualEvent 20 + } 21 + 22 + func (e *Event) AsRecord() tangled.CiEvent { 23 + // var meta tangled.CiEvent_Meta 24 + // return tangled.CiEvent{ 25 + // Meta: &meta, 26 + // } 27 + panic("unimplemented") 28 + } 29 + 30 + // `sh.tangled.ci.pipeline` 31 + // 32 + // Pipeline is basically a group of workflows triggered by single event. 33 + type Pipeline2 struct { 34 + Did syntax.DID 35 + Rkey syntax.RecordKey 36 + 37 + Event Event // event that triggered the pipeline 38 + WorkflowRuns []WorkflowRun // workflow runs inside this pipeline 39 + } 40 + 41 + func (p *Pipeline2) AtUri() syntax.ATURI { 42 + return syntax.ATURI(fmt.Sprintf("at://%s/%s/%s", p.Did, tangled.CiPipelineNSID, p.Rkey)) 43 + } 44 + 45 + func (p *Pipeline2) AsRecord() tangled.CiPipeline { 46 + event := p.Event.AsRecord() 47 + runs := make([]string, len(p.WorkflowRuns)) 48 + for i, run := range p.WorkflowRuns { 49 + runs[i] = run.AtUri().String() 50 + } 51 + return tangled.CiPipeline{ 52 + Event: &event, 53 + WorkflowRuns: runs, 54 + } 55 + } 56 + 57 + // `sh.tangled.ci.workflow.run` 58 + type WorkflowRun struct { 59 + Did syntax.DID 60 + Rkey syntax.RecordKey 61 + 62 + AdapterId string // adapter id 63 + Name string // name of workflow run (not workflow definition name!) 
64 + Status WorkflowStatus // workflow status 65 + // TODO: can add some custom fields like adapter-specific log-id 66 + } 67 + 68 + func (r WorkflowRun) WithStatus(status WorkflowStatus) WorkflowRun { 69 + return WorkflowRun{ 70 + Did: r.Did, 71 + Rkey: r.Rkey, 72 + AdapterId: r.AdapterId, 73 + Name: r.Name, 74 + Status: status, 75 + } 76 + } 77 + 78 + func (r *WorkflowRun) AtUri() syntax.ATURI { 79 + return syntax.ATURI(fmt.Sprintf("at://%s/%s/%s", r.Did, tangled.CiWorkflowRunNSID, r.Rkey)) 80 + } 81 + 82 + func (r *WorkflowRun) AsRecord() tangled.CiWorkflowRun { 83 + statusStr := string(r.Status) 84 + return tangled.CiWorkflowRun{ 85 + Adapter: r.AdapterId, 86 + Name: r.Name, 87 + Status: &statusStr, 88 + } 89 + } 90 + 91 + // `sh.tangled.ci.workflow.status` 92 + type WorkflowStatus string 93 + 94 + var ( 95 + WorkflowStatusPending WorkflowStatus = "pending" 96 + WorkflowStatusRunning WorkflowStatus = "running" 97 + WorkflowStatusFailed WorkflowStatus = "failed" 98 + WorkflowStatusCancelled WorkflowStatus = "cancelled" 99 + WorkflowStatusSuccess WorkflowStatus = "success" 100 + WorkflowStatusTimeout WorkflowStatus = "timeout" 101 + 102 + activeStatuses [2]WorkflowStatus = [2]WorkflowStatus{ 103 + WorkflowStatusPending, 104 + WorkflowStatusRunning, 105 + } 106 + ) 107 + 108 + func (s WorkflowStatus) IsActive() bool { 109 + return slices.Contains(activeStatuses[:], s) 110 + } 111 + 112 + func (s WorkflowStatus) IsFinish() bool { 113 + return !s.IsActive() 114 + } 115 + 116 + // `sh.tangled.ci.workflow.def` 117 + // 118 + // Brief information of the workflow definition. A workflow can be defined in 119 + // any form. This is a common info struct for any workflow definitions 120 + type WorkflowDef struct { 121 + AdapterId string // adapter id 122 + Name string // name or the workflow (usually the yml file name) 123 + When any // events the workflow is listening to 124 + }
+40
spindle/pipeline.go
··· 1 + package spindle 2 + 3 + import ( 4 + "context" 5 + 6 + "tangled.org/core/spindle/models" 7 + ) 8 + 9 + // createPipeline creates a pipeline from given event. 10 + // It will call `EvaluateEvent` for all adapters, gather the triggered workflow 11 + // runs, and constuct a pipeline record from them. pipeline record. It will 12 + // return nil if no workflow run has triggered. 13 + // 14 + // NOTE: This method won't fail. If `adapter.EvaluateEvent` returns an error, 15 + // the error will be logged but won't bubble-up. 16 + // 17 + // NOTE: Adapters might create sub-event on its own for workflows triggered by 18 + // other workflow runs. 19 + func (s *Spindle) createPipeline(ctx context.Context, event models.Event) (*models.Pipeline2) { 20 + l := s.l 21 + 22 + pipeline := models.Pipeline2{ 23 + Event: event, 24 + } 25 + 26 + // TODO: run in parallel 27 + for id, adapter := range s.adapters { 28 + runs, err := adapter.EvaluateEvent(ctx, event) 29 + if err != nil { 30 + l.Error("failed to process trigger from adapter '%s': %w", id, err) 31 + } 32 + pipeline.WorkflowRuns = append(pipeline.WorkflowRuns, runs...) 33 + } 34 + 35 + if len(pipeline.WorkflowRuns) == 0 { 36 + return nil 37 + } 38 + 39 + return &pipeline 40 + }
+169
spindle/repomanager/repomanager.go
··· 1 + package repomanager 2 + 3 + import ( 4 + "bufio" 5 + "bytes" 6 + "context" 7 + "errors" 8 + "fmt" 9 + "os" 10 + "os/exec" 11 + "path/filepath" 12 + "slices" 13 + "strings" 14 + 15 + "github.com/bluesky-social/indigo/atproto/syntax" 16 + "github.com/go-git/go-git/v5" 17 + "github.com/go-git/go-git/v5/config" 18 + "github.com/go-git/go-git/v5/plumbing/object" 19 + kgit "tangled.org/core/knotserver/git" 20 + "tangled.org/core/types" 21 + ) 22 + 23 + // RepoManager manages a `sh.tangled.repo` record with its git context. 24 + // It can be used to efficiently fetch the filetree of the repository. 25 + type RepoManager struct { 26 + repoDir string 27 + // TODO: it would be nice if RepoManager can be configured with different 28 + // strategies: 29 + // - use db as an only source for repo records 30 + // - use atproto if record doesn't exist from the db 31 + // - always use atproto 32 + // hmm do we need `RepoStore` interface? 33 + // now `DbRepoStore` and `AtprotoRepoStore` can implement both. 34 + // all `RepoStore` objects will hold `KnotStore` interface, so they can 35 + // source the knot store if needed. 36 + 37 + // but now we can't do complex queries like "get repo with issue count" 38 + // that kind of queries will be done directly from `appview.DB` struct 39 + // is graphql better tech for atproto? 
40 + } 41 + 42 + func New(repoDir string) RepoManager { 43 + return RepoManager{ 44 + repoDir: repoDir, 45 + } 46 + } 47 + 48 + // TODO: RepoManager can return file tree from repoAt & rev 49 + // It will start syncing the repository if doesn't exist 50 + 51 + // RegisterRepo starts sparse-syncing repository with paths 52 + func (m *RepoManager) RegisterRepo(ctx context.Context, repoAt syntax.ATURI, paths []string) error { 53 + repoPath := m.repoPath(repoAt) 54 + exist, err := isDir(repoPath) 55 + if err != nil { 56 + return fmt.Errorf("checking dir info: %w", err) 57 + } 58 + var sparsePaths []string 59 + if !exist { 60 + // init bare git repo 61 + repo, err := git.PlainInit(repoPath, true) 62 + if err != nil { 63 + return fmt.Errorf("initializing repo: %w", err) 64 + } 65 + _, err = repo.CreateRemote(&config.RemoteConfig{ 66 + Name: "origin", 67 + URLs: []string{m.repoCloneUrl(repoAt)}, 68 + }) 69 + if err != nil { 70 + return fmt.Errorf("configuring repo remote: %w", err) 71 + } 72 + } else { 73 + // get sparse-checkout list 74 + sparsePaths, err = func(path string) ([]string, error) { 75 + var stdout bytes.Buffer 76 + listCmd := exec.Command("git", "-C", path, "sparse-checkout", "list") 77 + listCmd.Stdout = &stdout 78 + if err := listCmd.Run(); err != nil { 79 + return nil, err 80 + } 81 + 82 + var sparseList []string 83 + scanner := bufio.NewScanner(&stdout) 84 + for scanner.Scan() { 85 + line := strings.TrimSpace(scanner.Text()) 86 + if line == "" { 87 + continue 88 + } 89 + sparseList = append(sparseList, line) 90 + } 91 + if err := scanner.Err(); err != nil { 92 + return nil, fmt.Errorf("scanning stdout: %w", err) 93 + } 94 + 95 + return sparseList, nil 96 + }(repoPath) 97 + if err != nil { 98 + return fmt.Errorf("parsing sparse-checkout list: %w", err) 99 + } 100 + 101 + // add paths to sparse-checkout list 102 + for _, path := range paths { 103 + sparsePaths = append(sparsePaths, path) 104 + } 105 + sparsePaths = slices.Collect(slices.Values(sparsePaths)) 
106 + } 107 + 108 + // set sparse-checkout list 109 + args := append([]string{"-C", repoPath, "sparse-checkout", "set", "--no-cone"}, sparsePaths...) 110 + if err := exec.Command("git", args...).Run(); err != nil { 111 + return fmt.Errorf("setting sparse-checkout list: %w", err) 112 + } 113 + return nil 114 + } 115 + 116 + // SyncRepo sparse-fetch specific rev of the repo 117 + func (m *RepoManager) SyncRepo(ctx context.Context, repo syntax.ATURI, rev string) error { 118 + // TODO: fetch repo with rev. 119 + panic("unimplemented") 120 + } 121 + 122 + func (m *RepoManager) Open(repo syntax.ATURI, rev string) (*kgit.GitRepo, error) { 123 + // TODO: don't depend on knot/git 124 + return kgit.Open(m.repoPath(repo), rev) 125 + } 126 + 127 + func (m *RepoManager) FileTree(ctx context.Context, repo syntax.ATURI, rev, path string) ([]types.NiceTree, error) { 128 + if err := m.SyncRepo(ctx, repo, rev); err != nil { 129 + return nil, fmt.Errorf("syncing git repo") 130 + } 131 + gr, err := m.Open(repo, rev) 132 + if err != nil { 133 + return nil, err 134 + } 135 + dir, err := gr.FileTree(ctx, path) 136 + if err != nil { 137 + if errors.Is(err, object.ErrDirectoryNotFound) { 138 + return nil, nil 139 + } 140 + return nil, fmt.Errorf("loading file tree: %w", err) 141 + } 142 + return dir, err 143 + } 144 + 145 + func (m *RepoManager) repoPath(repo syntax.ATURI) string { 146 + return filepath.Join( 147 + m.repoDir, 148 + repo.Authority().String(), 149 + repo.Collection().String(), 150 + repo.RecordKey().String(), 151 + ) 152 + } 153 + 154 + func (m *RepoManager) repoCloneUrl(repo syntax.ATURI) string { 155 + // 1. get repo & knot models from db. fetch it if doesn't exist 156 + // 2. 
construct https clone url 157 + panic("unimplemented") 158 + } 159 + 160 + func isDir(path string) (bool, error) { 161 + info, err := os.Stat(path) 162 + if err == nil && info.IsDir() { 163 + return true, nil 164 + } 165 + if os.IsNotExist(err) { 166 + return false, nil 167 + } 168 + return false, err 169 + }
+1
spindle/server.go
··· 49 49 l *slog.Logger 50 50 n *notifier.Notifier 51 51 engs map[string]models.Engine 52 + adapters map[string]models.Adapter 52 53 jq *queue.Queue 53 54 cfg *config.Config 54 55 ks *eventconsumer.Consumer