spindle/engines/nixery: provision one container per workflow #427

merged
opened by winter.bsky.social targeting master from winter.bsky.social/core: push-luoyqwkpromz

This moves away from the old approach of creating a fresh container for every step, with a handful of shared volumes, to the behavior most users would expect: changes made in one step are visible to the following steps, not just changes under the workspace or /etc/nix. It also paves the way for a more generic Docker image engine, since users can do things like apt install without the results being blown away between steps.
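
In rough outline, the new lifecycle looks like the sketch below, using the same Docker SDK calls as engine.go in this diff. It is only an illustration: error handling, cleanup registration, networking, mounts, and the security options are elided, and runWorkflow, imageRef, stepCmds, and the nixery.dev/shell image are placeholder names rather than the engine's real API.

    // Sketch only: condensed from engine.go in this PR; not the engine's real API.
    package main

    import (
        "context"
        "io"
        "os"

        "github.com/docker/docker/api/types/container"
        "github.com/docker/docker/api/types/image"
        "github.com/docker/docker/client"
        "github.com/docker/docker/pkg/stdcopy"
    )

    // runWorkflow provisions one container for the whole workflow and runs
    // every step as an exec inside it, so state persists across steps.
    func runWorkflow(ctx context.Context, imageRef string, stepCmds [][]string) error {
        cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
        if err != nil {
            return err
        }

        pull, err := cli.ImagePull(ctx, imageRef, image.PullOptions{})
        if err != nil {
            return err
        }
        io.Copy(io.Discard, pull)
        pull.Close()

        // The container just runs `cat` with stdin held open so it idles
        // until the workflow is torn down.
        created, err := cli.ContainerCreate(ctx, &container.Config{
            Image:     imageRef,
            Cmd:       []string{"cat"},
            OpenStdin: true,
        }, nil, nil, nil, "")
        if err != nil {
            return err
        }
        defer cli.ContainerRemove(context.Background(), created.ID, container.RemoveOptions{Force: true})

        if err := cli.ContainerStart(ctx, created.ID, container.StartOptions{}); err != nil {
            return err
        }

        for _, cmd := range stepCmds {
            ex, err := cli.ContainerExecCreate(ctx, created.ID, container.ExecOptions{
                Cmd:          cmd,
                AttachStdout: true,
                AttachStderr: true,
            })
            if err != nil {
                return err
            }
            // Attaching starts the exec; demultiplex its output as it runs.
            att, err := cli.ContainerExecAttach(ctx, ex.ID, container.ExecAttachOptions{})
            if err != nil {
                return err
            }
            stdcopy.StdCopy(os.Stdout, os.Stderr, att.Reader)
            att.Close()
        }
        return nil
    }

    func main() {
        // Example: a file written by the first step is still there for the second.
        steps := [][]string{
            {"bash", "-c", "date > /scratch.txt"},
            {"bash", "-c", "cat /scratch.txt"},
        }
        if err := runWorkflow(context.Background(), "nixery.dev/shell", steps); err != nil {
            panic(err)
        }
    }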

Signed-off-by: Winter <winter@winter.cafe>

Changed files: +121 -187
nix/vm.nix (+2 -2)
Before:

···
   self,
 }:
 nixpkgs.lib.nixosSystem {
-  system = "x86_64-linux";
   modules = [
     self.nixosModules.knot
     self.nixosModules.spindle
···
   services.tangled-spindle = {
     enable = true;
     server = {
-      owner = "did:plc:qfpnj4og54vl56wngdriaxug";
       hostname = "localhost:6555";
       listenAddr = "0.0.0.0:6555";
       dev = true;

After:

···
   self,
 }:
 nixpkgs.lib.nixosSystem {
+  system = "aarch64-linux";
   modules = [
     self.nixosModules.knot
     self.nixosModules.spindle
···
   services.tangled-spindle = {
     enable = true;
     server = {
+      owner = "did:plc:pdrr2fgsfkvfbznqucjmyeee";
       hostname = "localhost:6555";
       listenAddr = "0.0.0.0:6555";
       dev = true;
spindle/engines/nixery/engine.go (+117 -184)
··· 9 "os" 10 "path" 11 "runtime" 12 - "strings" 13 "sync" 14 "time" 15 ··· 17 "github.com/docker/docker/api/types/image" 18 "github.com/docker/docker/api/types/mount" 19 "github.com/docker/docker/api/types/network" 20 - "github.com/docker/docker/api/types/volume" 21 "github.com/docker/docker/client" 22 "github.com/docker/docker/pkg/stdcopy" 23 "gopkg.in/yaml.v3" ··· 72 } 73 74 type addlFields struct { 75 - image string 76 - env map[string]string 77 } 78 79 func (e *Engine) InitWorkflow(twf tangled.Pipeline_Workflow, tpl tangled.Pipeline) (*models.Workflow, error) { ··· 170 return e, nil 171 } 172 173 - // SetupWorkflow sets up a new network for the workflow and volumes for 174 - // the workspace and Nix store. These are persisted across steps and are 175 - // destroyed at the end of the workflow. 176 func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId, wf *models.Workflow) error { 177 e.l.Info("setting up workflow", "workflow", wid) 178 179 - _, err := e.docker.VolumeCreate(ctx, volume.CreateOptions{ 180 - Name: workspaceVolume(wid), 181 - Driver: "local", 182 }) 183 if err != nil { 184 return err 185 } 186 e.registerCleanup(wid, func(ctx context.Context) error { 187 - return e.docker.VolumeRemove(ctx, workspaceVolume(wid), true) 188 }) 189 190 - _, err = e.docker.VolumeCreate(ctx, volume.CreateOptions{ 191 - Name: nixVolume(wid), 192 - Driver: "local", 193 - }) 194 if err != nil { 195 - return err 196 } 197 e.registerCleanup(wid, func(ctx context.Context) error { 198 - return e.docker.VolumeRemove(ctx, nixVolume(wid), true) 199 }) 200 201 - _, err = e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{ 202 - Driver: "bridge", 203 }) 204 if err != nil { 205 return err 206 } 207 - e.registerCleanup(wid, func(ctx context.Context) error { 208 - return e.docker.NetworkRemove(ctx, networkName(wid)) 209 - }) 210 211 - addl := wf.Data.(addlFields) 212 213 - reader, err := e.docker.ImagePull(ctx, addl.image, image.PullOptions{}) 214 if err != nil { 215 - e.l.Error("pipeline image pull failed!", "image", addl.image, "workflowId", wid, "error", err.Error()) 216 217 - return fmt.Errorf("pulling image: %w", err) 218 } 219 - defer reader.Close() 220 - io.Copy(os.Stdout, reader) 221 222 return nil 223 } 224 225 func (e *Engine) RunStep(ctx context.Context, wid models.WorkflowId, w *models.Workflow, idx int, secrets []secrets.UnlockedSecret, wfLogger *engine.WorkflowLogger) error { 226 - workflowEnvs := ConstructEnvs(w.Data.(addlFields).env) 227 for _, s := range secrets { 228 workflowEnvs.AddEnv(s.Key, s.Value) 229 } ··· 243 envs.AddEnv("HOME", workspaceDir) 244 e.l.Debug("envs for step", "step", step.Name, "envs", envs.Slice()) 245 246 - hostConfig := hostConfig(wid) 247 - resp, err := e.docker.ContainerCreate(ctx, &container.Config{ 248 - Image: w.Data.(addlFields).image, 249 - Cmd: []string{"bash", "-c", step.command}, 250 - WorkingDir: workspaceDir, 251 - Tty: false, 252 - Hostname: "spindle", 253 - Env: envs.Slice(), 254 - }, hostConfig, nil, nil, "") 255 - defer e.DestroyStep(ctx, resp.ID) 256 - if err != nil { 257 - return fmt.Errorf("creating container: %w", err) 258 - } 259 - 260 - err = e.docker.NetworkConnect(ctx, networkName(wid), resp.ID, nil) 261 if err != nil { 262 - return fmt.Errorf("connecting network: %w", err) 263 } 264 265 - err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{}) 266 - if err != nil { 267 - return err 268 - } 269 - e.l.Info("started container", "name", resp.ID, "step", step.Name) 270 - 271 // start tailing logs in 
background 272 tailDone := make(chan error, 1) 273 go func() { 274 - tailDone <- e.TailStep(ctx, wfLogger, resp.ID, wid, idx, step) 275 - }() 276 - 277 - // wait for container completion or timeout 278 - waitDone := make(chan struct{}) 279 - var state *container.State 280 - var waitErr error 281 - 282 - go func() { 283 - defer close(waitDone) 284 - state, waitErr = e.WaitStep(ctx, resp.ID) 285 }() 286 287 select { 288 - case <-waitDone: 289 - 290 - // wait for tailing to complete 291 - <-tailDone 292 293 case <-ctx.Done(): 294 - e.l.Warn("step timed out; killing container", "container", resp.ID, "step", step.Name) 295 - err = e.DestroyStep(context.Background(), resp.ID) 296 - if err != nil { 297 - e.l.Error("failed to destroy step", "container", resp.ID, "error", err) 298 - } 299 300 - // wait for both goroutines to finish 301 - <-waitDone 302 <-tailDone 303 304 return engine.ErrTimedOut ··· 310 default: 311 } 312 313 - if waitErr != nil { 314 - return waitErr 315 - } 316 - 317 - err = e.DestroyStep(ctx, resp.ID) 318 if err != nil { 319 return err 320 } 321 322 - if state.ExitCode != 0 { 323 - e.l.Error("workflow failed!", "workflow_id", wid.String(), "error", state.Error, "exit_code", state.ExitCode, "oom_killed", state.OOMKilled) 324 - if state.OOMKilled { 325 - return ErrOOMKilled 326 - } 327 - return engine.ErrWorkflowFailed 328 - } 329 - 330 - return nil 331 - } 332 - 333 - func (e *Engine) WaitStep(ctx context.Context, containerID string) (*container.State, error) { 334 - wait, errCh := e.docker.ContainerWait(ctx, containerID, container.WaitConditionNotRunning) 335 - select { 336 - case err := <-errCh: 337 if err != nil { 338 - return nil, err 339 } 340 - case <-wait: 341 - } 342 343 - e.l.Info("waited for container", "name", containerID) 344 345 - info, err := e.docker.ContainerInspect(ctx, containerID) 346 - if err != nil { 347 - return nil, err 348 } 349 350 - return info.State, nil 351 } 352 353 - func (e *Engine) TailStep(ctx context.Context, wfLogger *engine.WorkflowLogger, containerID string, wid models.WorkflowId, stepIdx int, step models.Step) error { 354 if wfLogger == nil { 355 return nil 356 } 357 358 - logs, err := e.docker.ContainerLogs(ctx, containerID, container.LogsOptions{ 359 - Follow: true, 360 - ShowStdout: true, 361 - ShowStderr: true, 362 - Details: false, 363 - Timestamps: false, 364 - }) 365 if err != nil { 366 return err 367 } 368 369 _, err = stdcopy.StdCopy( 370 wfLogger.DataWriter("stdout"), 371 wfLogger.DataWriter("stderr"), 372 - logs, 373 ) 374 if err != nil && err != io.EOF && !errors.Is(err, context.DeadlineExceeded) { 375 return fmt.Errorf("failed to copy logs: %w", err) ··· 378 return nil 379 } 380 381 - func (e *Engine) DestroyStep(ctx context.Context, containerID string) error { 382 - err := e.docker.ContainerKill(ctx, containerID, "9") // SIGKILL 383 - if err != nil && !isErrContainerNotFoundOrNotRunning(err) { 384 - return err 385 - } 386 - 387 - if err := e.docker.ContainerRemove(ctx, containerID, container.RemoveOptions{ 388 - RemoveVolumes: true, 389 - RemoveLinks: false, 390 - Force: false, 391 - }); err != nil && !isErrContainerNotFoundOrNotRunning(err) { 392 - return err 393 - } 394 - 395 - return nil 396 - } 397 - 398 func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error { 399 e.cleanupMu.Lock() 400 key := wid.String() ··· 419 e.cleanup[key] = append(e.cleanup[key], fn) 420 } 421 422 - func workspaceVolume(wid models.WorkflowId) string { 423 - return fmt.Sprintf("workspace-%s", wid) 424 - } 425 - 426 - func 
nixVolume(wid models.WorkflowId) string { 427 - return fmt.Sprintf("nix-%s", wid) 428 - } 429 - 430 func networkName(wid models.WorkflowId) string { 431 return fmt.Sprintf("workflow-network-%s", wid) 432 } 433 - 434 - func hostConfig(wid models.WorkflowId) *container.HostConfig { 435 - hostConfig := &container.HostConfig{ 436 - Mounts: []mount.Mount{ 437 - { 438 - Type: mount.TypeVolume, 439 - Source: workspaceVolume(wid), 440 - Target: workspaceDir, 441 - }, 442 - { 443 - Type: mount.TypeVolume, 444 - Source: nixVolume(wid), 445 - Target: "/nix", 446 - }, 447 - { 448 - Type: mount.TypeTmpfs, 449 - Target: "/tmp", 450 - ReadOnly: false, 451 - TmpfsOptions: &mount.TmpfsOptions{ 452 - Mode: 0o1777, // world-writeable sticky bit 453 - Options: [][]string{ 454 - {"exec"}, 455 - }, 456 - }, 457 - }, 458 - { 459 - Type: mount.TypeVolume, 460 - Source: "etc-nix-" + wid.String(), 461 - Target: "/etc/nix", 462 - }, 463 - }, 464 - ReadonlyRootfs: false, 465 - CapDrop: []string{"ALL"}, 466 - CapAdd: []string{"CAP_DAC_OVERRIDE"}, 467 - SecurityOpt: []string{"no-new-privileges"}, 468 - ExtraHosts: []string{"host.docker.internal:host-gateway"}, 469 - } 470 - 471 - return hostConfig 472 - } 473 - 474 - // thanks woodpecker 475 - func isErrContainerNotFoundOrNotRunning(err error) bool { 476 - // Error response from daemon: Cannot kill container: ...: No such container: ... 477 - // Error response from daemon: Cannot kill container: ...: Container ... is not running" 478 - // Error response from podman daemon: can only kill running containers. ... is in state exited 479 - // Error: No such container: ... 480 - return err != nil && (strings.Contains(err.Error(), "No such container") || strings.Contains(err.Error(), "is not running") || strings.Contains(err.Error(), "can only kill running containers")) 481 - }
··· 9 "os" 10 "path" 11 "runtime" 12 "sync" 13 "time" 14 ··· 16 "github.com/docker/docker/api/types/image" 17 "github.com/docker/docker/api/types/mount" 18 "github.com/docker/docker/api/types/network" 19 "github.com/docker/docker/client" 20 "github.com/docker/docker/pkg/stdcopy" 21 "gopkg.in/yaml.v3" ··· 70 } 71 72 type addlFields struct { 73 + image string 74 + container string 75 + env map[string]string 76 } 77 78 func (e *Engine) InitWorkflow(twf tangled.Pipeline_Workflow, tpl tangled.Pipeline) (*models.Workflow, error) { ··· 169 return e, nil 170 } 171 172 func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId, wf *models.Workflow) error { 173 e.l.Info("setting up workflow", "workflow", wid) 174 175 + _, err := e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{ 176 + Driver: "bridge", 177 }) 178 if err != nil { 179 return err 180 } 181 e.registerCleanup(wid, func(ctx context.Context) error { 182 + return e.docker.NetworkRemove(ctx, networkName(wid)) 183 }) 184 185 + addl := wf.Data.(addlFields) 186 + 187 + reader, err := e.docker.ImagePull(ctx, addl.image, image.PullOptions{}) 188 if err != nil { 189 + e.l.Error("pipeline image pull failed!", "image", addl.image, "workflowId", wid, "error", err.Error()) 190 + 191 + return fmt.Errorf("pulling image: %w", err) 192 + } 193 + defer reader.Close() 194 + io.Copy(os.Stdout, reader) 195 + 196 + resp, err := e.docker.ContainerCreate(ctx, &container.Config{ 197 + Image: addl.image, 198 + Cmd: []string{"cat"}, 199 + OpenStdin: true, // so cat stays alive :3 200 + Tty: false, 201 + Hostname: "spindle", 202 + // TODO(winter): investigate whether environment variables passed here 203 + // get propagated to ContainerExec processes 204 + }, &container.HostConfig{ 205 + Mounts: []mount.Mount{ 206 + { 207 + Type: mount.TypeTmpfs, 208 + Target: "/tmp", 209 + ReadOnly: false, 210 + TmpfsOptions: &mount.TmpfsOptions{ 211 + Mode: 0o1777, // world-writeable sticky bit 212 + Options: [][]string{ 213 + {"exec"}, 214 + }, 215 + }, 216 + }, 217 + }, 218 + ReadonlyRootfs: false, 219 + CapDrop: []string{"ALL"}, 220 + CapAdd: []string{"CAP_DAC_OVERRIDE"}, 221 + SecurityOpt: []string{"no-new-privileges"}, 222 + ExtraHosts: []string{"host.docker.internal:host-gateway"}, 223 + }, nil, nil, "") 224 + if err != nil { 225 + return fmt.Errorf("creating container: %w", err) 226 } 227 e.registerCleanup(wid, func(ctx context.Context) error { 228 + err = e.docker.ContainerStop(ctx, resp.ID, container.StopOptions{}) 229 + if err != nil { 230 + return err 231 + } 232 + 233 + return e.docker.ContainerRemove(ctx, resp.ID, container.RemoveOptions{ 234 + RemoveVolumes: true, 235 + RemoveLinks: false, 236 + Force: false, 237 + }) 238 }) 239 240 + err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{}) 241 + if err != nil { 242 + return fmt.Errorf("starting container: %w", err) 243 + } 244 + 245 + mkExecResp, err := e.docker.ContainerExecCreate(ctx, resp.ID, container.ExecOptions{ 246 + Cmd: []string{"mkdir", "-p", workspaceDir}, 247 + AttachStdout: true, // NOTE(winter): pretty sure this will make it so that when stdout read is done below, mkdir is done. maybe?? 
248 + AttachStderr: true, // for good measure, backed up by docker/cli ("If -d is not set, attach to everything by default") 249 }) 250 if err != nil { 251 return err 252 } 253 254 + execResp, err := e.docker.ContainerExecAttach(ctx, mkExecResp.ID, container.ExecAttachOptions{}) 255 + if err != nil { 256 + return err 257 + } 258 + defer execResp.Close() 259 260 + _, err = io.ReadAll(execResp.Reader) 261 if err != nil { 262 + return err 263 + } 264 265 + execInspectResp, err := e.docker.ContainerExecInspect(ctx, mkExecResp.ID) 266 + if err != nil { 267 + return err 268 } 269 + 270 + if execInspectResp.ExitCode != 0 { 271 + return fmt.Errorf("mkdir exited with exit code %d", execInspectResp.ExitCode) 272 + } else if execInspectResp.Running { 273 + return errors.New("mkdir is somehow still running??") 274 + } 275 + 276 + addl.container = resp.ID 277 + wf.Data = addl 278 279 return nil 280 } 281 282 func (e *Engine) RunStep(ctx context.Context, wid models.WorkflowId, w *models.Workflow, idx int, secrets []secrets.UnlockedSecret, wfLogger *engine.WorkflowLogger) error { 283 + addl := w.Data.(addlFields) 284 + workflowEnvs := ConstructEnvs(addl.env) 285 + // TODO(winter): should SetupWorkflow also have secret access? 286 + // IMO yes, but probably worth thinking on. 287 for _, s := range secrets { 288 workflowEnvs.AddEnv(s.Key, s.Value) 289 } ··· 303 envs.AddEnv("HOME", workspaceDir) 304 e.l.Debug("envs for step", "step", step.Name, "envs", envs.Slice()) 305 306 + mkExecResp, err := e.docker.ContainerExecCreate(ctx, addl.container, container.ExecOptions{ 307 + Cmd: []string{"bash", "-c", step.command}, 308 + AttachStdout: true, 309 + AttachStderr: true, 310 + }) 311 if err != nil { 312 + return fmt.Errorf("creating exec: %w", err) 313 } 314 315 // start tailing logs in background 316 tailDone := make(chan error, 1) 317 go func() { 318 + tailDone <- e.TailStep(ctx, wfLogger, mkExecResp.ID, wid, idx, step) 319 }() 320 321 select { 322 + case <-tailDone: 323 324 case <-ctx.Done(): 325 + // cleanup will be handled by DestroyWorkflow, since 326 + // Docker doesn't provide an API to kill an exec run 327 + // (sure, we could grab the PID and kill it ourselves, 328 + // but that's wasted effort) 329 + e.l.Warn("step timed out", "step", step.Name) 330 331 <-tailDone 332 333 return engine.ErrTimedOut ··· 339 default: 340 } 341 342 + execInspectResp, err := e.docker.ContainerExecInspect(ctx, mkExecResp.ID) 343 if err != nil { 344 return err 345 } 346 347 + if execInspectResp.ExitCode != 0 { 348 + inspectResp, err := e.docker.ContainerInspect(ctx, addl.container) 349 if err != nil { 350 + return err 351 } 352 353 + e.l.Error("workflow failed!", "workflow_id", wid.String(), "exit_code", execInspectResp.ExitCode, "oom_killed", inspectResp.State.OOMKilled) 354 355 + if inspectResp.State.OOMKilled { 356 + return ErrOOMKilled 357 + } 358 + return engine.ErrWorkflowFailed 359 } 360 361 + return nil 362 } 363 364 + func (e *Engine) TailStep(ctx context.Context, wfLogger *engine.WorkflowLogger, execID string, wid models.WorkflowId, stepIdx int, step models.Step) error { 365 if wfLogger == nil { 366 return nil 367 } 368 369 + // NOTE: This actually *starts* the command. Thanks, Docker! 
370 + logs, err := e.docker.ContainerExecAttach(ctx, execID, container.ExecAttachOptions{}) 371 if err != nil { 372 return err 373 } 374 + defer logs.Close() 375 376 _, err = stdcopy.StdCopy( 377 wfLogger.DataWriter("stdout"), 378 wfLogger.DataWriter("stderr"), 379 + logs.Reader, 380 ) 381 if err != nil && err != io.EOF && !errors.Is(err, context.DeadlineExceeded) { 382 return fmt.Errorf("failed to copy logs: %w", err) ··· 385 return nil 386 } 387 388 func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error { 389 e.cleanupMu.Lock() 390 key := wid.String() ··· 409 e.cleanup[key] = append(e.cleanup[key], fn) 410 } 411 412 func networkName(wid models.WorkflowId) string { 413 return fmt.Sprintf("workflow-network-%s", wid) 414 }
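
On the TODO about environment propagation: the exec API also takes a per-exec environment, so step envs need not depend on the container-level Config.Env at all. A hypothetical helper along those lines (not part of this change; imports as in engine.go) might look like this, with env and workdir standing in for envs.Slice() and workspaceDir from RunStep:

    // Hypothetical alternative, not part of this PR: pass the step's
    // environment (and working directory) on the exec itself rather than
    // relying on the container's Config.Env reaching exec'd processes.
    func runStepExec(ctx context.Context, cli *client.Client, containerID, command string, env []string, workdir string) (int, error) {
        ex, err := cli.ContainerExecCreate(ctx, containerID, container.ExecOptions{
            Cmd:          []string{"bash", "-c", command},
            Env:          env,     // e.g. envs.Slice() from RunStep
            WorkingDir:   workdir, // e.g. workspaceDir
            AttachStdout: true,
            AttachStderr: true,
        })
        if err != nil {
            return 0, err
        }

        // As noted in TailStep, attaching is what actually starts the exec.
        att, err := cli.ContainerExecAttach(ctx, ex.ID, container.ExecAttachOptions{})
        if err != nil {
            return 0, err
        }
        defer att.Close()

        if _, err := stdcopy.StdCopy(os.Stdout, os.Stderr, att.Reader); err != nil && err != io.EOF {
            return 0, err
        }

        insp, err := cli.ContainerExecInspect(ctx, ex.ID)
        if err != nil {
            return 0, err
        }
        return insp.ExitCode, nil
    }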
spindle/engines/nixery/setup_steps.go (+2 -1)
Before:

···
 )

 func nixConfStep() Step {
-    setupCmd := `echo 'extra-experimental-features = nix-command flakes' >> /etc/nix/nix.conf
 echo 'build-users-group = ' >> /etc/nix/nix.conf`
     return Step{
         command: setupCmd,

After:

···
 )

 func nixConfStep() Step {
+    setupCmd := `mkdir -p /etc/nix
+echo 'extra-experimental-features = nix-command flakes' >> /etc/nix/nix.conf
 echo 'build-users-group = ' >> /etc/nix/nix.conf`
     return Step{
         command: setupCmd,
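
With the /etc/nix volume gone, the step has to create the directory itself before writing to it. Assuming the image doesn't already ship a nix.conf, the file ends up containing:

    extra-experimental-features = nix-command flakes
    build-users-group = 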