{"contents":"package flymachines\n\nimport (\n\t\"context\"\n\t\"encoding/base64\"\n\t\"fmt\"\n\t\"io\"\n\t\"log/slog\"\n\t\"path\"\n\t\"runtime\"\n\t\"strings\"\n\t\"sync\"\n\t\"time\"\n\n\t\"gopkg.in/yaml.v3\"\n\t\"tangled.org/core/api/tangled\"\n\t\"tangled.org/core/log\"\n\t\"tangled.org/core/spindle/config\"\n\t\"tangled.org/core/spindle/engine\"\n\t\"tangled.org/core/spindle/models\"\n\t\"tangled.org/core/spindle/secrets\"\n)\n\nconst (\n\tworkspaceDir = \"/tangled/workspace\"\n\thomeDir = \"/tangled/home\"\n)\n\ntype addlFields struct {\n\timage string\n\tmachineID string\n\tenv map[string]string\n}\n\n// Step implements models.Step for Fly machine execution.\ntype Step struct {\n\tname string\n\tkind models.StepKind\n\tcommand string\n\tenvironment map[string]string\n}\n\nfunc (s Step) Name() string { return s.name }\nfunc (s Step) Command() string { return s.command }\nfunc (s Step) Kind() models.StepKind { return s.kind }\n\ntype Engine struct {\n\tclient *Client\n\tl *slog.Logger\n\tcoreCfg *config.Config\n\tflyCfg *Config\n\n\tcleanupMu sync.Mutex\n\tcleanup map[string][]func(context.Context) error\n}\n\nfunc New(ctx context.Context, coreCfg *config.Config, flyCfg *Config) (*Engine, error) {\n\tl := log.FromContext(ctx).With(\"component\", \"fly-engine\")\n\n\tclient := NewClient(flyCfg.ApiUrl, flyCfg.ApiToken, flyCfg.App)\n\n\treturn \u0026Engine{\n\t\tclient: client,\n\t\tl: l,\n\t\tcoreCfg: coreCfg,\n\t\tflyCfg: flyCfg,\n\t\tcleanup: make(map[string][]func(context.Context) error),\n\t}, nil\n}\n\nfunc (e *Engine) InitWorkflow(twf tangled.Pipeline_Workflow, tpl tangled.Pipeline) (*models.Workflow, error) {\n\twf := \u0026models.Workflow{}\n\taddl := addlFields{}\n\n\tdwf := \u0026struct {\n\t\tSteps []struct {\n\t\t\tCommand string `yaml:\"command\"`\n\t\t\tName string `yaml:\"name\"`\n\t\t\tEnvironment map[string]string `yaml:\"environment\"`\n\t\t} `yaml:\"steps\"`\n\t\tDependencies map[string][]string `yaml:\"dependencies\"`\n\t\tEnvironment map[string]string `yaml:\"environment\"`\n\t}{}\n\tif err := yaml.Unmarshal([]byte(twf.Raw), dwf); err != nil {\n\t\treturn nil, err\n\t}\n\n\tfor _, s := range dwf.Steps {\n\t\twf.Steps = append(wf.Steps, Step{\n\t\t\tname: s.Name,\n\t\t\tcommand: s.Command,\n\t\t\tkind: models.StepKindUser,\n\t\t\tenvironment: s.Environment,\n\t\t})\n\t}\n\twf.Name = twf.Name\n\taddl.env = dwf.Environment\n\taddl.image = workflowImage(dwf.Dependencies, e.coreCfg.NixeryPipelines.Nixery)\n\n\t// Reuse core's BuildCloneStep for git setup\n\tvar setup []models.Step\n\tsetup = append(setup, nixConfStep())\n\tsetup = append(setup, models.BuildCloneStep(twf, *tpl.TriggerMetadata, e.coreCfg.Server.Dev))\n\tif s := dependencyStep(dwf.Dependencies); s != nil {\n\t\tsetup = append(setup, *s)\n\t}\n\twf.Steps = append(setup, wf.Steps...)\n\twf.Data = addl\n\n\treturn wf, nil\n}\n\nfunc (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId, wf *models.Workflow) error {\n\te.l.Info(\"creating fly machine for workflow\", \"workflow\", wid)\n\taddl := wf.Data.(addlFields)\n\n\tm, err := e.client.CreateMachine(ctx, CreateMachineRequest{\n\t\tRegion: e.flyCfg.Region,\n\t\tConfig: MachineConfig{\n\t\t\tImage: addl.image,\n\t\t\tInit: \u0026MachineInit{\n\t\t\t\tExec: []string{\"sleep\", \"infinity\"},\n\t\t\t},\n\t\t\tGuest: \u0026Guest{\n\t\t\t\tCPUKind: e.flyCfg.CPUKind,\n\t\t\t\tCPUs: e.flyCfg.CPUs,\n\t\t\t\tMemoryMB: e.flyCfg.MemoryMB,\n\t\t\t},\n\t\t\tAutoDestroy: true,\n\t\t},\n\t})\n\tif err != nil {\n\t\treturn fmt.Errorf(\"creating fly machine: %w\", err)\n\t}\n\te.l.Info(\"created fly machine\", \"id\", m.ID, \"workflow\", wid)\n\n\te.registerCleanup(wid, func(ctx context.Context) error {\n\t\te.l.Info(\"destroying fly machine\", \"id\", m.ID)\n\t\treturn e.client.DeleteMachine(ctx, m.ID, true)\n\t})\n\n\tif err := e.client.WaitForState(ctx, m.ID, \"started\", 30*time.Second); err != nil {\n\t\treturn fmt.Errorf(\"waiting for machine start: %w\", err)\n\t}\n\n\tmkdirCmd := buildExecCommand(nil, \"/\", fmt.Sprintf(\"mkdir -p %s %s\", workspaceDir, homeDir))\n\tresp, err := e.client.Exec(ctx, m.ID, ExecRequest{\n\t\tCmd: mkdirCmd,\n\t\tTimeout: 30,\n\t})\n\tif err != nil {\n\t\treturn fmt.Errorf(\"mkdir: %w\", err)\n\t}\n\tif resp.ExitCode != 0 {\n\t\treturn fmt.Errorf(\"mkdir exited with code %d: %s\", resp.ExitCode, resp.Stderr)\n\t}\n\n\taddl.machineID = m.ID\n\twf.Data = addl\n\treturn nil\n}\n\nfunc (e *Engine) RunStep(ctx context.Context, wid models.WorkflowId, w *models.Workflow, idx int, secs []secrets.UnlockedSecret, wfLogger *models.WorkflowLogger) error {\n\taddl := w.Data.(addlFields)\n\n\tselect {\n\tcase \u003c-ctx.Done():\n\t\treturn ctx.Err()\n\tdefault:\n\t}\n\n\tstep := w.Steps[idx].(Step)\n\n\tenvs := make(map[string]string)\n\tfor k, v := range addl.env {\n\t\tenvs[k] = v\n\t}\n\tfor _, s := range secs {\n\t\tenvs[s.Key] = s.Value\n\t}\n\tfor k, v := range step.environment {\n\t\tenvs[k] = v\n\t}\n\tenvs[\"HOME\"] = homeDir\n\n\tcmd := buildExecCommand(envs, workspaceDir, step.command)\n\n\ttimeout := int(e.WorkflowTimeout().Seconds())\n\tif deadline, ok := ctx.Deadline(); ok {\n\t\tremaining := int(time.Until(deadline).Seconds())\n\t\tif remaining \u003c 1 {\n\t\t\tremaining = 1\n\t\t}\n\t\ttimeout = remaining\n\t}\n\n\tresp, err := e.client.Exec(ctx, addl.machineID, ExecRequest{\n\t\tCmd: cmd,\n\t\tTimeout: timeout,\n\t})\n\tif err != nil {\n\t\tif ctx.Err() != nil {\n\t\t\treturn engine.ErrTimedOut\n\t\t}\n\t\treturn fmt.Errorf(\"exec step %q: %w\", step.Name(), err)\n\t}\n\n\tif wfLogger != nil {\n\t\twriteLines(wfLogger.DataWriter(idx, \"stdout\"), resp.Stdout)\n\t\twriteLines(wfLogger.DataWriter(idx, \"stderr\"), resp.Stderr)\n\t}\n\n\tif resp.ExitCode != 0 {\n\t\te.l.Error(\"step failed\", \"workflow\", wid, \"step\", step.Name(), \"exit_code\", resp.ExitCode)\n\t\treturn engine.ErrWorkflowFailed\n\t}\n\n\treturn nil\n}\n\nfunc (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error {\n\te.cleanupMu.Lock()\n\tkey := wid.String()\n\tfns := e.cleanup[key]\n\tdelete(e.cleanup, key)\n\te.cleanupMu.Unlock()\n\n\tfor _, fn := range fns {\n\t\tif err := fn(ctx); err != nil {\n\t\t\te.l.Error(\"cleanup failed\", \"workflow\", wid, \"error\", err)\n\t\t}\n\t}\n\treturn nil\n}\n\nfunc (e *Engine) WorkflowTimeout() time.Duration {\n\td, err := time.ParseDuration(e.flyCfg.WorkflowTimeout)\n\tif err != nil {\n\t\te.l.Error(\"failed to parse workflow timeout\", \"error\", err)\n\t\treturn 5 * time.Minute\n\t}\n\treturn d\n}\n\nfunc (e *Engine) registerCleanup(wid models.WorkflowId, fn func(context.Context) error) {\n\te.cleanupMu.Lock()\n\tdefer e.cleanupMu.Unlock()\n\tkey := wid.String()\n\te.cleanup[key] = append(e.cleanup[key], fn)\n}\n\n// buildExecCommand wraps a command with env var exports and working directory,\n// base64-encoding the script to avoid shell quoting issues.\nfunc buildExecCommand(envs map[string]string, workDir, command string) string {\n\tvar script strings.Builder\n\tfor k, v := range envs {\n\t\tescaped := strings.ReplaceAll(v, \"'\", \"'\\\\''\")\n\t\tfmt.Fprintf(\u0026script, \"export %s='%s'\\n\", k, escaped)\n\t}\n\tif workDir != \"\" {\n\t\tfmt.Fprintf(\u0026script, \"cd '%s'\\n\", workDir)\n\t}\n\tscript.WriteString(command)\n\tscript.WriteString(\"\\n\")\n\n\tencoded := base64.StdEncoding.EncodeToString([]byte(script.String()))\n\treturn fmt.Sprintf(\"echo %s | base64 -d | bash\", encoded)\n}\n\nfunc writeLines(w io.Writer, s string) {\n\tif s == \"\" {\n\t\treturn\n\t}\n\tfor _, line := range strings.Split(s, \"\\n\") {\n\t\tif line != \"\" {\n\t\t\tw.Write([]byte(line))\n\t\t}\n\t}\n}\n\n// workflowImage builds the nixery image reference from pipeline dependencies.\nfunc workflowImage(deps map[string][]string, nixery string) string {\n\tvar dependencies string\n\tfor reg, ds := range deps {\n\t\tif reg == \"nixpkgs\" {\n\t\t\tdependencies = path.Join(ds...)\n\t\t}\n\t}\n\tdependencies = path.Join(dependencies, \"bash\", \"git\", \"coreutils\", \"nix\")\n\tif runtime.GOARCH == \"arm64\" {\n\t\tdependencies = path.Join(\"arm64\", dependencies)\n\t}\n\treturn path.Join(nixery, dependencies)\n}\n\nfunc nixConfStep() Step {\n\tcmd := `mkdir -p /etc/nix\necho 'extra-experimental-features = nix-command flakes' \u003e\u003e /etc/nix/nix.conf\necho 'build-users-group = ' \u003e\u003e /etc/nix/nix.conf`\n\treturn Step{command: cmd, name: \"Configure Nix\", kind: models.StepKindSystem}\n}\n\nfunc dependencyStep(deps map[string][]string) *Step {\n\tvar pkgs []string\n\tfor reg, packages := range deps {\n\t\tif reg == \"nixpkgs\" {\n\t\t\tcontinue\n\t\t}\n\t\tif len(packages) == 0 {\n\t\t\tpkgs = append(pkgs, reg)\n\t\t}\n\t\tfor _, pkg := range packages {\n\t\t\tpkgs = append(pkgs, fmt.Sprintf(\"'%s#%s'\", reg, pkg))\n\t\t}\n\t}\n\tif len(pkgs) \u003e 0 {\n\t\tcmd := fmt.Sprintf(\n\t\t\t\"nix --extra-experimental-features nix-command --extra-experimental-features flakes profile install %s\",\n\t\t\tstrings.Join(pkgs, \" \"),\n\t\t)\n\t\treturn \u0026Step{\n\t\t\tcommand: cmd,\n\t\t\tname: \"Install custom dependencies\",\n\t\t\tkind: models.StepKindSystem,\n\t\t\tenvironment: map[string]string{\n\t\t\t\t\"NIX_NO_COLOR\": \"1\",\n\t\t\t\t\"NIX_SHOW_DOWNLOAD_PROGRESS\": \"0\",\n\t\t\t},\n\t\t}\n\t}\n\treturn nil\n}\n","path":"flymachines/engine.go","ref":"main"}