spindle/engines/nixery: provision one container per workflow #427

merged
opened by winter.bsky.social targeting master from winter.bsky.social/core: push-luoyqwkpromz

This moves away from the old approach of creating a container per step with shared volumes to the one most users would expect: any changes made in one step are accessible to the following steps, not just changes under the workspace or /etc/nix. This also paves the way for a more generic Docker image engine, since users can do things like apt install without the results being blown away between steps.

Signed-off-by: Winter <winter@winter.cafe>
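For readers new to the code, here is a minimal sketch of the pattern this change adopts: one long-lived container per workflow, kept alive with `cat`, with each step run as a Docker exec inside it so filesystem state persists between steps. This is illustrative only; the image name, step commands, and client setup are assumptions for the example, not code from this PR (the real implementation is in the diff below).

```go
package main

import (
	"context"
	"fmt"
	"os"

	"github.com/docker/docker/api/types/container"
	"github.com/docker/docker/client"
	"github.com/docker/docker/pkg/stdcopy"
)

func main() {
	ctx := context.Background()

	cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
	if err != nil {
		panic(err)
	}

	// One container for the whole workflow; `cat` with stdin held open keeps it alive.
	created, err := cli.ContainerCreate(ctx, &container.Config{
		Image:     "nixery.dev/shell/coreutils", // placeholder image
		Cmd:       []string{"cat"},
		OpenStdin: true,
	}, nil, nil, nil, "")
	if err != nil {
		panic(err)
	}
	defer cli.ContainerRemove(ctx, created.ID, container.RemoveOptions{Force: true})

	if err := cli.ContainerStart(ctx, created.ID, container.StartOptions{}); err != nil {
		panic(err)
	}

	// Each "step" is an exec in the same container, so changes carry over between steps.
	for _, cmd := range []string{"echo hello > /tmp/state", "cat /tmp/state"} {
		execCreated, err := cli.ContainerExecCreate(ctx, created.ID, container.ExecOptions{
			Cmd:          []string{"sh", "-c", cmd},
			AttachStdout: true,
			AttachStderr: true,
		})
		if err != nil {
			panic(err)
		}

		// Attaching starts the exec; draining the multiplexed stream waits for it to finish.
		attach, err := cli.ContainerExecAttach(ctx, execCreated.ID, container.ExecAttachOptions{})
		if err != nil {
			panic(err)
		}
		stdcopy.StdCopy(os.Stdout, os.Stderr, attach.Reader)
		attach.Close()

		inspect, err := cli.ContainerExecInspect(ctx, execCreated.ID)
		if err != nil {
			panic(err)
		}
		if inspect.ExitCode != 0 {
			panic(fmt.Sprintf("step %q exited with code %d", cmd, inspect.ExitCode))
		}
	}
}
```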

Changed files (+121 -185)

spindle/engines/nixery/engine.go (+119 -184)
···
     "os"
     "path"
     "runtime"
-    "strings"
     "sync"
     "time"
···
     "github.com/docker/docker/api/types/image"
     "github.com/docker/docker/api/types/mount"
     "github.com/docker/docker/api/types/network"
-    "github.com/docker/docker/api/types/volume"
     "github.com/docker/docker/client"
     "github.com/docker/docker/pkg/stdcopy"
     "gopkg.in/yaml.v3"
···
 }

 type addlFields struct {
-    image string
-    env   map[string]string
+    image     string
+    container string
+    env       map[string]string
 }

 func (e *Engine) InitWorkflow(twf tangled.Pipeline_Workflow, tpl tangled.Pipeline) (*models.Workflow, error) {
···
     return e, nil
 }

-// SetupWorkflow sets up a new network for the workflow and volumes for
-// the workspace and Nix store. These are persisted across steps and are
-// destroyed at the end of the workflow.
 func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId, wf *models.Workflow) error {
     e.l.Info("setting up workflow", "workflow", wid)

-    _, err := e.docker.VolumeCreate(ctx, volume.CreateOptions{
-        Name:   workspaceVolume(wid),
-        Driver: "local",
+    _, err := e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{
+        Driver: "bridge",
     })
     if err != nil {
         return err
     }
     e.registerCleanup(wid, func(ctx context.Context) error {
-        return e.docker.VolumeRemove(ctx, workspaceVolume(wid), true)
+        return e.docker.NetworkRemove(ctx, networkName(wid))
     })

-    _, err = e.docker.VolumeCreate(ctx, volume.CreateOptions{
-        Name:   nixVolume(wid),
-        Driver: "local",
-    })
+    addl := wf.Data.(addlFields)
+
+    reader, err := e.docker.ImagePull(ctx, addl.image, image.PullOptions{})
     if err != nil {
-        return err
+        e.l.Error("pipeline image pull failed!", "image", addl.image, "workflowId", wid, "error", err.Error())
+
+        return fmt.Errorf("pulling image: %w", err)
+    }
+    defer reader.Close()
+    io.Copy(os.Stdout, reader)
+
+    resp, err := e.docker.ContainerCreate(ctx, &container.Config{
+        Image:     addl.image,
+        Cmd:       []string{"cat"},
+        OpenStdin: true, // so cat stays alive :3
+        Tty:       false,
+        Hostname:  "spindle",
+        // TODO(winter): investigate whether environment variables passed here
+        // get propagated to ContainerExec processes
+    }, &container.HostConfig{
+        Mounts: []mount.Mount{
+            {
+                Type:     mount.TypeTmpfs,
+                Target:   "/tmp",
+                ReadOnly: false,
+                TmpfsOptions: &mount.TmpfsOptions{
+                    Mode: 0o1777, // world-writeable sticky bit
+                    Options: [][]string{
+                        {"exec"},
+                    },
+                },
+            },
+        },
+        ReadonlyRootfs: false,
+        CapDrop:        []string{"ALL"},
+        CapAdd:         []string{"CAP_DAC_OVERRIDE"},
+        SecurityOpt:    []string{"no-new-privileges"},
+        ExtraHosts:     []string{"host.docker.internal:host-gateway"},
+    }, nil, nil, "")
+    if err != nil {
+        return fmt.Errorf("creating container: %w", err)
     }
     e.registerCleanup(wid, func(ctx context.Context) error {
-        return e.docker.VolumeRemove(ctx, nixVolume(wid), true)
+        err = e.docker.ContainerStop(ctx, resp.ID, container.StopOptions{})
+        if err != nil {
+            return err
+        }
+
+        return e.docker.ContainerRemove(ctx, resp.ID, container.RemoveOptions{
+            RemoveVolumes: true,
+            RemoveLinks:   false,
+            Force:         false,
+        })
     })

-    _, err = e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{
-        Driver: "bridge",
+    err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{})
+    if err != nil {
+        return fmt.Errorf("starting container: %w", err)
+    }
+
+    mkExecResp, err := e.docker.ContainerExecCreate(ctx, resp.ID, container.ExecOptions{
+        Cmd:          []string{"mkdir", "-p", workspaceDir},
+        AttachStdout: true, // NOTE(winter): pretty sure this will make it so that when stdout read is done below, mkdir is done. maybe??
+        AttachStderr: true, // for good measure, backed up by docker/cli ("If -d is not set, attach to everything by default")
     })
     if err != nil {
         return err
     }
-    e.registerCleanup(wid, func(ctx context.Context) error {
-        return e.docker.NetworkRemove(ctx, networkName(wid))
-    })

-    addl := wf.Data.(addlFields)
+    // This actually *starts* the command. Thanks, Docker!
+    execResp, err := e.docker.ContainerExecAttach(ctx, mkExecResp.ID, container.ExecAttachOptions{})
+    if err != nil {
+        return err
+    }
+    defer execResp.Close()

-    reader, err := e.docker.ImagePull(ctx, addl.image, image.PullOptions{})
+    // This is apparently best way to wait for the command to complete.
+    _, err = io.ReadAll(execResp.Reader)
     if err != nil {
-        e.l.Error("pipeline image pull failed!", "image", addl.image, "workflowId", wid, "error", err.Error())
+        return err
+    }

-        return fmt.Errorf("pulling image: %w", err)
+    execInspectResp, err := e.docker.ContainerExecInspect(ctx, mkExecResp.ID)
+    if err != nil {
+        return err
     }
-    defer reader.Close()
-    io.Copy(os.Stdout, reader)
+
+    if execInspectResp.ExitCode != 0 {
+        return fmt.Errorf("mkdir exited with exit code %d", execInspectResp.ExitCode)
+    } else if execInspectResp.Running {
+        return errors.New("mkdir is somehow still running??")
+    }
+
+    addl.container = resp.ID
+    wf.Data = addl

     return nil
 }

 func (e *Engine) RunStep(ctx context.Context, wid models.WorkflowId, w *models.Workflow, idx int, secrets []secrets.UnlockedSecret, wfLogger *models.WorkflowLogger) error {
-    workflowEnvs := ConstructEnvs(w.Data.(addlFields).env)
+    addl := w.Data.(addlFields)
+    workflowEnvs := ConstructEnvs(addl.env)
+    // TODO(winter): should SetupWorkflow also have secret access?
+    // IMO yes, but probably worth thinking on.
     for _, s := range secrets {
         workflowEnvs.AddEnv(s.Key, s.Value)
     }
···
     envs.AddEnv("HOME", workspaceDir)
     e.l.Debug("envs for step", "step", step.Name, "envs", envs.Slice())

-    hostConfig := hostConfig(wid)
-    resp, err := e.docker.ContainerCreate(ctx, &container.Config{
-        Image:      w.Data.(addlFields).image,
-        Cmd:        []string{"bash", "-c", step.command},
-        WorkingDir: workspaceDir,
-        Tty:        false,
-        Hostname:   "spindle",
-        Env:        envs.Slice(),
-    }, hostConfig, nil, nil, "")
-    defer e.DestroyStep(ctx, resp.ID)
-    if err != nil {
-        return fmt.Errorf("creating container: %w", err)
-    }
-
-    err = e.docker.NetworkConnect(ctx, networkName(wid), resp.ID, nil)
+    mkExecResp, err := e.docker.ContainerExecCreate(ctx, addl.container, container.ExecOptions{
+        Cmd:          []string{"bash", "-c", step.command},
+        AttachStdout: true,
+        AttachStderr: true,
+    })
     if err != nil {
-        return fmt.Errorf("connecting network: %w", err)
+        return fmt.Errorf("creating exec: %w", err)
     }

-    err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{})
-    if err != nil {
-        return err
-    }
-    e.l.Info("started container", "name", resp.ID, "step", step.Name)
-
     // start tailing logs in background
     tailDone := make(chan error, 1)
     go func() {
-        tailDone <- e.TailStep(ctx, wfLogger, resp.ID, wid, idx, step)
-    }()
-
-    // wait for container completion or timeout
-    waitDone := make(chan struct{})
-    var state *container.State
-    var waitErr error
-
-    go func() {
-        defer close(waitDone)
-        state, waitErr = e.WaitStep(ctx, resp.ID)
+        tailDone <- e.TailStep(ctx, wfLogger, mkExecResp.ID, wid, idx, step)
     }()

     select {
-    case <-waitDone:
-
-        // wait for tailing to complete
-        <-tailDone
+    case <-tailDone:

     case <-ctx.Done():
-        e.l.Warn("step timed out; killing container", "container", resp.ID, "step", step.Name)
-        err = e.DestroyStep(context.Background(), resp.ID)
-        if err != nil {
-            e.l.Error("failed to destroy step", "container", resp.ID, "error", err)
-        }
+        // cleanup will be handled by DestroyWorkflow, since
+        // Docker doesn't provide an API to kill an exec run
+        // (sure, we could grab the PID and kill it ourselves,
+        // but that's wasted effort)
+        e.l.Warn("step timed out", "step", step.Name)

-        // wait for both goroutines to finish
-        <-waitDone
         <-tailDone

         return engine.ErrTimedOut
···
     default:
     }

-    if waitErr != nil {
-        return waitErr
-    }
-
-    err = e.DestroyStep(ctx, resp.ID)
+    execInspectResp, err := e.docker.ContainerExecInspect(ctx, mkExecResp.ID)
     if err != nil {
         return err
     }

-    if state.ExitCode != 0 {
-        e.l.Error("workflow failed!", "workflow_id", wid.String(), "error", state.Error, "exit_code", state.ExitCode, "oom_killed", state.OOMKilled)
-        if state.OOMKilled {
-            return ErrOOMKilled
-        }
-        return engine.ErrWorkflowFailed
-    }
-
-    return nil
-}
-
-func (e *Engine) WaitStep(ctx context.Context, containerID string) (*container.State, error) {
-    wait, errCh := e.docker.ContainerWait(ctx, containerID, container.WaitConditionNotRunning)
-    select {
-    case err := <-errCh:
+    if execInspectResp.ExitCode != 0 {
+        inspectResp, err := e.docker.ContainerInspect(ctx, addl.container)
         if err != nil {
-            return nil, err
+            return err
         }
-    case <-wait:
-    }

-    e.l.Info("waited for container", "name", containerID)
+        e.l.Error("workflow failed!", "workflow_id", wid.String(), "exit_code", execInspectResp.ExitCode, "oom_killed", inspectResp.State.OOMKilled)

-    info, err := e.docker.ContainerInspect(ctx, containerID)
-    if err != nil {
-        return nil, err
+        if inspectResp.State.OOMKilled {
+            return ErrOOMKilled
+        }
+        return engine.ErrWorkflowFailed
     }

-    return info.State, nil
+    return nil
 }

-func (e *Engine) TailStep(ctx context.Context, wfLogger *models.WorkflowLogger, containerID string, wid models.WorkflowId, stepIdx int, step models.Step) error {
+func (e *Engine) TailStep(ctx context.Context, wfLogger *models.WorkflowLogger, execID string, wid models.WorkflowId, stepIdx int, step models.Step) error {
     if wfLogger == nil {
         return nil
     }

-    logs, err := e.docker.ContainerLogs(ctx, containerID, container.LogsOptions{
-        Follow:     true,
-        ShowStdout: true,
-        ShowStderr: true,
-        Details:    false,
-        Timestamps: false,
-    })
+    // This actually *starts* the command. Thanks, Docker!
+    logs, err := e.docker.ContainerExecAttach(ctx, execID, container.ExecAttachOptions{})
     if err != nil {
         return err
     }
+    defer logs.Close()

     _, err = stdcopy.StdCopy(
         wfLogger.DataWriter("stdout"),
         wfLogger.DataWriter("stderr"),
-        logs,
+        logs.Reader,
     )
     if err != nil && err != io.EOF && !errors.Is(err, context.DeadlineExceeded) {
         return fmt.Errorf("failed to copy logs: %w", err)
···
     return nil
 }

-func (e *Engine) DestroyStep(ctx context.Context, containerID string) error {
-    err := e.docker.ContainerKill(ctx, containerID, "9") // SIGKILL
-    if err != nil && !isErrContainerNotFoundOrNotRunning(err) {
-        return err
-    }
-
-    if err := e.docker.ContainerRemove(ctx, containerID, container.RemoveOptions{
-        RemoveVolumes: true,
-        RemoveLinks:   false,
-        Force:         false,
-    }); err != nil && !isErrContainerNotFoundOrNotRunning(err) {
-        return err
-    }
-
-    return nil
-}
-
 func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error {
     e.cleanupMu.Lock()
     key := wid.String()
···
     e.cleanup[key] = append(e.cleanup[key], fn)
 }

-func workspaceVolume(wid models.WorkflowId) string {
-    return fmt.Sprintf("workspace-%s", wid)
-}
-
-func nixVolume(wid models.WorkflowId) string {
-    return fmt.Sprintf("nix-%s", wid)
-}
-
 func networkName(wid models.WorkflowId) string {
     return fmt.Sprintf("workflow-network-%s", wid)
 }
-
-func hostConfig(wid models.WorkflowId) *container.HostConfig {
-    hostConfig := &container.HostConfig{
-        Mounts: []mount.Mount{
-            {
-                Type:   mount.TypeVolume,
-                Source: workspaceVolume(wid),
-                Target: workspaceDir,
-            },
-            {
-                Type:   mount.TypeVolume,
-                Source: nixVolume(wid),
-                Target: "/nix",
-            },
-            {
-                Type:     mount.TypeTmpfs,
-                Target:   "/tmp",
-                ReadOnly: false,
-                TmpfsOptions: &mount.TmpfsOptions{
-                    Mode: 0o1777, // world-writeable sticky bit
-                    Options: [][]string{
-                        {"exec"},
-                    },
-                },
-            },
-            {
-                Type:   mount.TypeVolume,
-                Source: "etc-nix-" + wid.String(),
-                Target: "/etc/nix",
-            },
-        },
-        ReadonlyRootfs: false,
-        CapDrop:        []string{"ALL"},
-        CapAdd:         []string{"CAP_DAC_OVERRIDE"},
-        SecurityOpt:    []string{"no-new-privileges"},
-        ExtraHosts:     []string{"host.docker.internal:host-gateway"},
-    }
-
-    return hostConfig
-}
-
-// thanks woodpecker
-func isErrContainerNotFoundOrNotRunning(err error) bool {
-    // Error response from daemon: Cannot kill container: ...: No such container: ...
-    // Error response from daemon: Cannot kill container: ...: Container ... is not running"
-    // Error response from podman daemon: can only kill running containers. ... is in state exited
-    // Error: No such container: ...
-    return err != nil && (strings.Contains(err.Error(), "No such container") || strings.Contains(err.Error(), "is not running") || strings.Contains(err.Error(), "can only kill running containers"))
-}

spindle/engines/nixery/setup_steps.go (+2 -1)
···
 )

 func nixConfStep() Step {
-    setupCmd := `echo 'extra-experimental-features = nix-command flakes' >> /etc/nix/nix.conf
+    setupCmd := `mkdir -p /etc/nix
+echo 'extra-experimental-features = nix-command flakes' >> /etc/nix/nix.conf
 echo 'build-users-group = ' >> /etc/nix/nix.conf`
     return Step{
         command: setupCmd,