spindle/engines/nixery: provision one container per workflow #427

merged
opened by winter.bsky.social targeting master from winter.bsky.social/core: push-luoyqwkpromz

This moves away from the old approach of creating a new container for every step and sharing state through volumes, to the model most users would expect: any changes made in one step are visible to the following steps, not just files under the workspace or /etc/nix. It also paves the way for a more generic Docker image engine, since users can do things like apt install without the results being blown away between steps.
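In other words, the engine now keeps a single long-lived container per workflow and runs each step as an exec inside it. Below is a minimal sketch of that pattern against the Docker Go SDK; the function name, image, and example steps are illustrative only, and the real engine's image pull, mounts, network, secrets, and cleanup are omitted.

```go
package main

import (
	"context"
	"os"

	"github.com/docker/docker/api/types/container"
	"github.com/docker/docker/client"
	"github.com/docker/docker/pkg/stdcopy"
)

// runWorkflow creates one container for the whole workflow (kept alive by
// `cat` reading from an open stdin) and runs every step as an exec inside
// it, so filesystem changes made by one step are visible to the next.
// Assumes the image is already present locally (the engine pulls it first).
func runWorkflow(ctx context.Context, cli *client.Client, img string, steps [][]string) error {
	resp, err := cli.ContainerCreate(ctx, &container.Config{
		Image:     img,
		Cmd:       []string{"cat"},
		OpenStdin: true, // keeps cat (and thus the container) running
	}, nil, nil, nil, "")
	if err != nil {
		return err
	}
	if err := cli.ContainerStart(ctx, resp.ID, container.StartOptions{}); err != nil {
		return err
	}

	for _, cmd := range steps {
		// Each step is an exec in the same container.
		exec, err := cli.ContainerExecCreate(ctx, resp.ID, container.ExecOptions{
			Cmd:          cmd,
			AttachStdout: true,
			AttachStderr: true,
		})
		if err != nil {
			return err
		}
		// Attaching starts the exec; StdCopy returns once the process exits
		// and the stream is closed.
		attach, err := cli.ContainerExecAttach(ctx, exec.ID, container.ExecAttachOptions{})
		if err != nil {
			return err
		}
		_, _ = stdcopy.StdCopy(os.Stdout, os.Stderr, attach.Reader)
		attach.Close()
	}
	return nil
}

func main() {
	cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
	if err != nil {
		panic(err)
	}
	// Hypothetical steps: the second step sees state written by the first.
	err = runWorkflow(context.Background(), cli, "nixery.dev/shell", [][]string{
		{"sh", "-c", "echo hello > /tmp/state"},
		{"sh", "-c", "cat /tmp/state"},
	})
	if err != nil {
		panic(err)
	}
}
```

The diff below takes the same approach: step completion is detected by the attached exec stream ending (followed by ContainerExecInspect for the exit code) rather than by waiting on the container itself.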

Signed-off-by: Winter <winter@winter.cafe>

Changed files (+119 -185)

spindle/engines/nixery/engine.go (+117 -184)
···
     "os"
     "path"
     "runtime"
-    "strings"
     "sync"
     "time"

···
     "github.com/docker/docker/api/types/image"
     "github.com/docker/docker/api/types/mount"
     "github.com/docker/docker/api/types/network"
-    "github.com/docker/docker/api/types/volume"
     "github.com/docker/docker/client"
     "github.com/docker/docker/pkg/stdcopy"
     "gopkg.in/yaml.v3"
···
 }

 type addlFields struct {
-    image string
-    env   map[string]string
+    image     string
+    container string
+    env       map[string]string
 }

 func (e *Engine) InitWorkflow(twf tangled.Pipeline_Workflow, tpl tangled.Pipeline) (*models.Workflow, error) {
···
     return e, nil
 }

-// SetupWorkflow sets up a new network for the workflow and volumes for
-// the workspace and Nix store. These are persisted across steps and are
-// destroyed at the end of the workflow.
 func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId, wf *models.Workflow) error {
     e.l.Info("setting up workflow", "workflow", wid)

-    _, err := e.docker.VolumeCreate(ctx, volume.CreateOptions{
-        Name:   workspaceVolume(wid),
-        Driver: "local",
+    _, err := e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{
+        Driver: "bridge",
     })
     if err != nil {
         return err
     }
     e.registerCleanup(wid, func(ctx context.Context) error {
-        return e.docker.VolumeRemove(ctx, workspaceVolume(wid), true)
+        return e.docker.NetworkRemove(ctx, networkName(wid))
     })

-    _, err = e.docker.VolumeCreate(ctx, volume.CreateOptions{
-        Name:   nixVolume(wid),
-        Driver: "local",
-    })
+    addl := wf.Data.(addlFields)
+
+    reader, err := e.docker.ImagePull(ctx, addl.image, image.PullOptions{})
     if err != nil {
-        return err
+        e.l.Error("pipeline image pull failed!", "image", addl.image, "workflowId", wid, "error", err.Error())
+
+        return fmt.Errorf("pulling image: %w", err)
+    }
+    defer reader.Close()
+    io.Copy(os.Stdout, reader)
+
+    resp, err := e.docker.ContainerCreate(ctx, &container.Config{
+        Image:     addl.image,
+        Cmd:       []string{"cat"},
+        OpenStdin: true, // so cat stays alive :3
+        Tty:       false,
+        Hostname:  "spindle",
+        // TODO(winter): investigate whether environment variables passed here
+        // get propagated to ContainerExec processes
+    }, &container.HostConfig{
+        Mounts: []mount.Mount{
+            {
+                Type:     mount.TypeTmpfs,
+                Target:   "/tmp",
+                ReadOnly: false,
+                TmpfsOptions: &mount.TmpfsOptions{
+                    Mode: 0o1777, // world-writeable sticky bit
+                    Options: [][]string{
+                        {"exec"},
+                    },
+                },
+            },
+        },
+        ReadonlyRootfs: false,
+        CapDrop:        []string{"ALL"},
+        CapAdd:         []string{"CAP_DAC_OVERRIDE"},
+        SecurityOpt:    []string{"no-new-privileges"},
+        ExtraHosts:     []string{"host.docker.internal:host-gateway"},
+    }, nil, nil, "")
+    if err != nil {
+        return fmt.Errorf("creating container: %w", err)
     }
     e.registerCleanup(wid, func(ctx context.Context) error {
-        return e.docker.VolumeRemove(ctx, nixVolume(wid), true)
+        err = e.docker.ContainerStop(ctx, resp.ID, container.StopOptions{})
+        if err != nil {
+            return err
+        }
+
+        return e.docker.ContainerRemove(ctx, resp.ID, container.RemoveOptions{
+            RemoveVolumes: true,
+            RemoveLinks:   false,
+            Force:         false,
+        })
     })

-    _, err = e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{
-        Driver: "bridge",
+    err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{})
+    if err != nil {
+        return fmt.Errorf("starting container: %w", err)
+    }
+
+    mkExecResp, err := e.docker.ContainerExecCreate(ctx, resp.ID, container.ExecOptions{
+        Cmd:          []string{"mkdir", "-p", workspaceDir},
+        AttachStdout: true, // NOTE(winter): pretty sure this will make it so that when stdout read is done below, mkdir is done. maybe??
+        AttachStderr: true, // for good measure, backed up by docker/cli ("If -d is not set, attach to everything by default")
     })
     if err != nil {
         return err
     }
-    e.registerCleanup(wid, func(ctx context.Context) error {
-        return e.docker.NetworkRemove(ctx, networkName(wid))
-    })

-    addl := wf.Data.(addlFields)
+    execResp, err := e.docker.ContainerExecAttach(ctx, mkExecResp.ID, container.ExecAttachOptions{})
+    if err != nil {
+        return err
+    }
+    defer execResp.Close()

-    reader, err := e.docker.ImagePull(ctx, addl.image, image.PullOptions{})
+    _, err = io.ReadAll(execResp.Reader)
     if err != nil {
-        e.l.Error("pipeline image pull failed!", "image", addl.image, "workflowId", wid, "error", err.Error())
+        return err
+    }

-        return fmt.Errorf("pulling image: %w", err)
+    execInspectResp, err := e.docker.ContainerExecInspect(ctx, mkExecResp.ID)
+    if err != nil {
+        return err
     }
-    defer reader.Close()
-    io.Copy(os.Stdout, reader)
+
+    if execInspectResp.ExitCode != 0 {
+        return fmt.Errorf("mkdir exited with exit code %d", execInspectResp.ExitCode)
+    } else if execInspectResp.Running {
+        return errors.New("mkdir is somehow still running??")
+    }
+
+    addl.container = resp.ID
+    wf.Data = addl

     return nil
 }

 func (e *Engine) RunStep(ctx context.Context, wid models.WorkflowId, w *models.Workflow, idx int, secrets []secrets.UnlockedSecret, wfLogger *models.WorkflowLogger) error {
-    workflowEnvs := ConstructEnvs(w.Data.(addlFields).env)
+    addl := w.Data.(addlFields)
+    workflowEnvs := ConstructEnvs(addl.env)
+    // TODO(winter): should SetupWorkflow also have secret access?
+    // IMO yes, but probably worth thinking on.
     for _, s := range secrets {
         workflowEnvs.AddEnv(s.Key, s.Value)
     }
···
     envs.AddEnv("HOME", workspaceDir)
     e.l.Debug("envs for step", "step", step.Name, "envs", envs.Slice())

-    hostConfig := hostConfig(wid)
-    resp, err := e.docker.ContainerCreate(ctx, &container.Config{
-        Image:      w.Data.(addlFields).image,
-        Cmd:        []string{"bash", "-c", step.command},
-        WorkingDir: workspaceDir,
-        Tty:        false,
-        Hostname:   "spindle",
-        Env:        envs.Slice(),
-    }, hostConfig, nil, nil, "")
-    defer e.DestroyStep(ctx, resp.ID)
-    if err != nil {
-        return fmt.Errorf("creating container: %w", err)
-    }
-
-    err = e.docker.NetworkConnect(ctx, networkName(wid), resp.ID, nil)
+    mkExecResp, err := e.docker.ContainerExecCreate(ctx, addl.container, container.ExecOptions{
+        Cmd:          []string{"bash", "-c", step.command},
+        AttachStdout: true,
+        AttachStderr: true,
+    })
     if err != nil {
-        return fmt.Errorf("connecting network: %w", err)
+        return fmt.Errorf("creating exec: %w", err)
     }

-    err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{})
-    if err != nil {
-        return err
-    }
-    e.l.Info("started container", "name", resp.ID, "step", step.Name)
-
     // start tailing logs in background
     tailDone := make(chan error, 1)
     go func() {
-        tailDone <- e.TailStep(ctx, wfLogger, resp.ID, wid, idx, step)
-    }()
-
-    // wait for container completion or timeout
-    waitDone := make(chan struct{})
-    var state *container.State
-    var waitErr error
-
-    go func() {
-        defer close(waitDone)
-        state, waitErr = e.WaitStep(ctx, resp.ID)
+        tailDone <- e.TailStep(ctx, wfLogger, mkExecResp.ID, wid, idx, step)
     }()

     select {
-    case <-waitDone:
-
-        // wait for tailing to complete
-        <-tailDone
+    case <-tailDone:

     case <-ctx.Done():
-        e.l.Warn("step timed out; killing container", "container", resp.ID, "step", step.Name)
-        err = e.DestroyStep(context.Background(), resp.ID)
-        if err != nil {
-            e.l.Error("failed to destroy step", "container", resp.ID, "error", err)
-        }
+        // cleanup will be handled by DestroyWorkflow, since
+        // Docker doesn't provide an API to kill an exec run
+        // (sure, we could grab the PID and kill it ourselves,
+        // but that's wasted effort)
+        e.l.Warn("step timed out", "step", step.Name)

-        // wait for both goroutines to finish
-        <-waitDone
         <-tailDone

         return engine.ErrTimedOut
···
     default:
     }

-    if waitErr != nil {
-        return waitErr
-    }
-
-    err = e.DestroyStep(ctx, resp.ID)
+    execInspectResp, err := e.docker.ContainerExecInspect(ctx, mkExecResp.ID)
     if err != nil {
         return err
     }

-    if state.ExitCode != 0 {
-        e.l.Error("workflow failed!", "workflow_id", wid.String(), "error", state.Error, "exit_code", state.ExitCode, "oom_killed", state.OOMKilled)
-        if state.OOMKilled {
-            return ErrOOMKilled
-        }
-        return engine.ErrWorkflowFailed
-    }
-
-    return nil
-}
-
-func (e *Engine) WaitStep(ctx context.Context, containerID string) (*container.State, error) {
-    wait, errCh := e.docker.ContainerWait(ctx, containerID, container.WaitConditionNotRunning)
-    select {
-    case err := <-errCh:
+    if execInspectResp.ExitCode != 0 {
+        inspectResp, err := e.docker.ContainerInspect(ctx, addl.container)
         if err != nil {
-            return nil, err
+            return err
         }
-    case <-wait:
-    }

-    e.l.Info("waited for container", "name", containerID)
+        e.l.Error("workflow failed!", "workflow_id", wid.String(), "exit_code", execInspectResp.ExitCode, "oom_killed", inspectResp.State.OOMKilled)

-    info, err := e.docker.ContainerInspect(ctx, containerID)
-    if err != nil {
-        return nil, err
+        if inspectResp.State.OOMKilled {
+            return ErrOOMKilled
+        }
+        return engine.ErrWorkflowFailed
     }

-    return info.State, nil
+    return nil
 }

-func (e *Engine) TailStep(ctx context.Context, wfLogger *models.WorkflowLogger, containerID string, wid models.WorkflowId, stepIdx int, step models.Step) error {
+func (e *Engine) TailStep(ctx context.Context, wfLogger *models.WorkflowLogger, execID string, wid models.WorkflowId, stepIdx int, step models.Step) error {
     if wfLogger == nil {
         return nil
     }

-    logs, err := e.docker.ContainerLogs(ctx, containerID, container.LogsOptions{
-        Follow:     true,
-        ShowStdout: true,
-        ShowStderr: true,
-        Details:    false,
-        Timestamps: false,
-    })
+    // NOTE: This actually *starts* the command. Thanks, Docker!
+    logs, err := e.docker.ContainerExecAttach(ctx, execID, container.ExecAttachOptions{})
     if err != nil {
         return err
     }
+    defer logs.Close()

     _, err = stdcopy.StdCopy(
         wfLogger.DataWriter("stdout"),
         wfLogger.DataWriter("stderr"),
-        logs,
+        logs.Reader,
     )
     if err != nil && err != io.EOF && !errors.Is(err, context.DeadlineExceeded) {
         return fmt.Errorf("failed to copy logs: %w", err)
···
     return nil
 }

-func (e *Engine) DestroyStep(ctx context.Context, containerID string) error {
-    err := e.docker.ContainerKill(ctx, containerID, "9") // SIGKILL
-    if err != nil && !isErrContainerNotFoundOrNotRunning(err) {
-        return err
-    }
-
-    if err := e.docker.ContainerRemove(ctx, containerID, container.RemoveOptions{
-        RemoveVolumes: true,
-        RemoveLinks:   false,
-        Force:         false,
-    }); err != nil && !isErrContainerNotFoundOrNotRunning(err) {
-        return err
-    }
-
-    return nil
-}
-
 func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error {
     e.cleanupMu.Lock()
     key := wid.String()
···
     e.cleanup[key] = append(e.cleanup[key], fn)
 }

-func workspaceVolume(wid models.WorkflowId) string {
-    return fmt.Sprintf("workspace-%s", wid)
-}
-
-func nixVolume(wid models.WorkflowId) string {
-    return fmt.Sprintf("nix-%s", wid)
-}
-
 func networkName(wid models.WorkflowId) string {
     return fmt.Sprintf("workflow-network-%s", wid)
 }
-
-func hostConfig(wid models.WorkflowId) *container.HostConfig {
-    hostConfig := &container.HostConfig{
-        Mounts: []mount.Mount{
-            {
-                Type:   mount.TypeVolume,
-                Source: workspaceVolume(wid),
-                Target: workspaceDir,
-            },
-            {
-                Type:   mount.TypeVolume,
-                Source: nixVolume(wid),
-                Target: "/nix",
-            },
-            {
-                Type:     mount.TypeTmpfs,
-                Target:   "/tmp",
-                ReadOnly: false,
-                TmpfsOptions: &mount.TmpfsOptions{
-                    Mode: 0o1777, // world-writeable sticky bit
-                    Options: [][]string{
-                        {"exec"},
-                    },
-                },
-            },
-            {
-                Type:   mount.TypeVolume,
-                Source: "etc-nix-" + wid.String(),
-                Target: "/etc/nix",
-            },
-        },
-        ReadonlyRootfs: false,
-        CapDrop:        []string{"ALL"},
-        CapAdd:         []string{"CAP_DAC_OVERRIDE"},
-        SecurityOpt:    []string{"no-new-privileges"},
-        ExtraHosts:     []string{"host.docker.internal:host-gateway"},
-    }
-
-    return hostConfig
-}
-
-// thanks woodpecker
-func isErrContainerNotFoundOrNotRunning(err error) bool {
-    // Error response from daemon: Cannot kill container: ...: No such container: ...
-    // Error response from daemon: Cannot kill container: ...: Container ... is not running"
-    // Error response from podman daemon: can only kill running containers. ... is in state exited
-    // Error: No such container: ...
-    return err != nil && (strings.Contains(err.Error(), "No such container") || strings.Contains(err.Error(), "is not running") || strings.Contains(err.Error(), "can only kill running containers"))
-}
spindle/engines/nixery/setup_steps.go (+2 -1)
···
 )

 func nixConfStep() Step {
-    setupCmd := `echo 'extra-experimental-features = nix-command flakes' >> /etc/nix/nix.conf
+    setupCmd := `mkdir -p /etc/nix
+echo 'extra-experimental-features = nix-command flakes' >> /etc/nix/nix.conf
 echo 'build-users-group = ' >> /etc/nix/nix.conf`
     return Step{
         command: setupCmd,