1// Copyright 2019 CUE Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// Package task provides a registry for tasks to be used by commands.
16package task
17
18import (
19 "context"
20 "io"
21 "sync"
22 "sync/atomic"
23
24 "cuelang.org/go/cue"
25 "cuelang.org/go/cue/errors"
26 "cuelang.org/go/cue/token"
27 "cuelang.org/go/internal/value"
28 "cuelang.org/go/tools/flow"
29)
30
31// A Context provides context for running a task.
32type Context struct {
33 Context context.Context
34
35 TaskKey func(v cue.Value) (string, error)
36
37 Root cue.Value
38 Stdin io.Reader
39 Stdout io.Writer
40 Stderr io.Writer
41 Obj cue.Value
42 Err errors.Error
43}
44
45func (c *Context) Lookup(field string) cue.Value {
46 f := c.Obj.LookupPath(cue.MakePath(cue.Str(field)))
47 if !f.Exists() {
48 c.addErr(f, nil, "could not find field %q", field)
49 return cue.Value{}
50 }
51 if err := f.Err(); err != nil {
52 c.Err = errors.Append(c.Err, errors.Promote(err, "lookup"))
53 }
54 return f
55}
56
57func (c *Context) Int64(field string) int64 {
58 f := c.Obj.LookupPath(cue.MakePath(cue.Str(field)))
59 value, err := f.Int64()
60 if err != nil {
61 c.addErr(f, err, "invalid integer argument")
62 return 0
63 }
64 return value
65}
66
67func (c *Context) String(field string) string {
68 f := c.Obj.LookupPath(cue.MakePath(cue.Str(field)))
69 value, err := f.String()
70 if err != nil {
71 c.addErr(f, err, "invalid string argument")
72 return ""
73 }
74 return value
75}
76
77func (c *Context) Bytes(field string) []byte {
78 f := c.Obj.LookupPath(cue.MakePath(cue.Str(field)))
79 value, err := f.Bytes()
80 if err != nil {
81 c.addErr(f, err, "invalid bytes argument")
82 return nil
83 }
84 return value
85}
86
87func (c *Context) addErr(v cue.Value, wrap error, format string, args ...interface{}) {
88
89 err := &taskError{
90 task: c.Obj,
91 v: v,
92 Message: errors.NewMessagef(format, args...),
93 }
94 c.Err = errors.Append(c.Err, errors.Wrap(err, wrap))
95}
96
97// ErrLegacy is a sentinel error value that may be returned by a TaskKey
98// function to indicate that the task is a legacy task. This will cause the
99// configuration value to be passed to the RunnerFunc instead of an empty
100// value.
101var ErrLegacy error = errors.New("legacy task error")
102
103// NewTaskFunc creates a flow.TaskFunc that uses global settings from Context
104// and a taskKey function to determine the kind of task to run.
105func (c Context) TaskFunc(didWork *atomic.Bool) flow.TaskFunc {
106 return func(v cue.Value) (flow.Runner, error) {
107 kind, err := c.TaskKey(v)
108 var isLegacy bool
109 if err == ErrLegacy {
110 err = nil
111 isLegacy = true
112 }
113 if err != nil || kind == "" {
114 return nil, err
115 }
116
117 didWork.Store(true)
118
119 rf := Lookup(kind)
120 if rf == nil {
121 return nil, errors.Newf(v.Pos(), "runner of kind %q not found", kind)
122 }
123
124 // Verify entry against template.
125 v = value.UnifyBuiltin(v, kind)
126 if err := v.Err(); err != nil {
127 err = v.Validate()
128 return nil, errors.Promote(err, "newTask")
129 }
130
131 runner, err := rf(v)
132 if err != nil {
133 return nil, errors.Promote(err, "errors running task")
134 }
135
136 if !isLegacy {
137 v = cue.Value{}
138 }
139
140 return c.flowFunc(runner, v), nil
141 }
142}
143
144// flowFunc takes a Runner and a schema v, which should only be defined for
145// legacy task ids.
146func (c Context) flowFunc(runner Runner, v cue.Value) flow.RunnerFunc {
147 return flow.RunnerFunc(func(t *flow.Task) error {
148 // Set task-specific values.
149 c.Context = t.Context()
150 c.Obj = t.Value()
151 if v.Exists() {
152 c.Obj = c.Obj.Unify(v)
153 }
154 value, err := runner.Run(&c)
155 if err != nil {
156 return err
157 }
158 if value != nil {
159 _ = t.Fill(value)
160 }
161 return nil
162 })
163}
164
165// taskError wraps some error values to retain position information about the
166// error.
167type taskError struct {
168 task cue.Value
169 v cue.Value
170 errors.Message
171}
172
173var _ errors.Error = &taskError{}
174
175func (t *taskError) Path() (a []string) {
176 for _, x := range t.v.Path().Selectors() {
177 a = append(a, x.String())
178 }
179 return a
180}
181
182func (t *taskError) Position() token.Pos {
183 return t.task.Pos()
184}
185
186func (t *taskError) InputPositions() (a []token.Pos) {
187 _, nx := value.ToInternal(t.v)
188
189 for x := range nx.LeafConjuncts() {
190 if src := x.Source(); src != nil {
191 a = append(a, src.Pos())
192 }
193 }
194 return a
195}
196
197// A RunnerFunc creates a Runner.
198type RunnerFunc func(v cue.Value) (Runner, error)
199
200// A Runner defines a command type.
201type Runner interface {
202 // Init is called with the original configuration before any task is run.
203 // As a result, the configuration may be incomplete, but allows some
204 // validation before tasks are kicked off.
205 // Init(v cue.Value)
206
207 // Runner runs given the current value and returns a new value which is to
208 // be unified with the original result.
209 Run(ctx *Context) (results interface{}, err error)
210}
211
212// Register registers a task for cue commands.
213func Register(key string, f RunnerFunc) {
214 runners.Store(key, f)
215}
216
217// Lookup returns the RunnerFunc for a key.
218func Lookup(key string) RunnerFunc {
219 v, ok := runners.Load(key)
220 if !ok {
221 return nil
222 }
223 return v.(RunnerFunc)
224}
225
226var runners sync.Map