this repo has no description
at master 226 lines 5.5 kB view raw
1// Copyright 2019 CUE Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15// Package task provides a registry for tasks to be used by commands. 16package task 17 18import ( 19 "context" 20 "io" 21 "sync" 22 "sync/atomic" 23 24 "cuelang.org/go/cue" 25 "cuelang.org/go/cue/errors" 26 "cuelang.org/go/cue/token" 27 "cuelang.org/go/internal/value" 28 "cuelang.org/go/tools/flow" 29) 30 31// A Context provides context for running a task. 32type Context struct { 33 Context context.Context 34 35 TaskKey func(v cue.Value) (string, error) 36 37 Root cue.Value 38 Stdin io.Reader 39 Stdout io.Writer 40 Stderr io.Writer 41 Obj cue.Value 42 Err errors.Error 43} 44 45func (c *Context) Lookup(field string) cue.Value { 46 f := c.Obj.LookupPath(cue.MakePath(cue.Str(field))) 47 if !f.Exists() { 48 c.addErr(f, nil, "could not find field %q", field) 49 return cue.Value{} 50 } 51 if err := f.Err(); err != nil { 52 c.Err = errors.Append(c.Err, errors.Promote(err, "lookup")) 53 } 54 return f 55} 56 57func (c *Context) Int64(field string) int64 { 58 f := c.Obj.LookupPath(cue.MakePath(cue.Str(field))) 59 value, err := f.Int64() 60 if err != nil { 61 c.addErr(f, err, "invalid integer argument") 62 return 0 63 } 64 return value 65} 66 67func (c *Context) String(field string) string { 68 f := c.Obj.LookupPath(cue.MakePath(cue.Str(field))) 69 value, err := f.String() 70 if err != nil { 71 c.addErr(f, err, "invalid string argument") 72 return "" 73 } 74 return value 75} 76 77func (c *Context) Bytes(field string) []byte { 78 f := c.Obj.LookupPath(cue.MakePath(cue.Str(field))) 79 value, err := f.Bytes() 80 if err != nil { 81 c.addErr(f, err, "invalid bytes argument") 82 return nil 83 } 84 return value 85} 86 87func (c *Context) addErr(v cue.Value, wrap error, format string, args ...interface{}) { 88 89 err := &taskError{ 90 task: c.Obj, 91 v: v, 92 Message: errors.NewMessagef(format, args...), 93 } 94 c.Err = errors.Append(c.Err, errors.Wrap(err, wrap)) 95} 96 97// ErrLegacy is a sentinel error value that may be returned by a TaskKey 98// function to indicate that the task is a legacy task. This will cause the 99// configuration value to be passed to the RunnerFunc instead of an empty 100// value. 101var ErrLegacy error = errors.New("legacy task error") 102 103// NewTaskFunc creates a flow.TaskFunc that uses global settings from Context 104// and a taskKey function to determine the kind of task to run. 105func (c Context) TaskFunc(didWork *atomic.Bool) flow.TaskFunc { 106 return func(v cue.Value) (flow.Runner, error) { 107 kind, err := c.TaskKey(v) 108 var isLegacy bool 109 if err == ErrLegacy { 110 err = nil 111 isLegacy = true 112 } 113 if err != nil || kind == "" { 114 return nil, err 115 } 116 117 didWork.Store(true) 118 119 rf := Lookup(kind) 120 if rf == nil { 121 return nil, errors.Newf(v.Pos(), "runner of kind %q not found", kind) 122 } 123 124 // Verify entry against template. 125 v = value.UnifyBuiltin(v, kind) 126 if err := v.Err(); err != nil { 127 err = v.Validate() 128 return nil, errors.Promote(err, "newTask") 129 } 130 131 runner, err := rf(v) 132 if err != nil { 133 return nil, errors.Promote(err, "errors running task") 134 } 135 136 if !isLegacy { 137 v = cue.Value{} 138 } 139 140 return c.flowFunc(runner, v), nil 141 } 142} 143 144// flowFunc takes a Runner and a schema v, which should only be defined for 145// legacy task ids. 146func (c Context) flowFunc(runner Runner, v cue.Value) flow.RunnerFunc { 147 return flow.RunnerFunc(func(t *flow.Task) error { 148 // Set task-specific values. 149 c.Context = t.Context() 150 c.Obj = t.Value() 151 if v.Exists() { 152 c.Obj = c.Obj.Unify(v) 153 } 154 value, err := runner.Run(&c) 155 if err != nil { 156 return err 157 } 158 if value != nil { 159 _ = t.Fill(value) 160 } 161 return nil 162 }) 163} 164 165// taskError wraps some error values to retain position information about the 166// error. 167type taskError struct { 168 task cue.Value 169 v cue.Value 170 errors.Message 171} 172 173var _ errors.Error = &taskError{} 174 175func (t *taskError) Path() (a []string) { 176 for _, x := range t.v.Path().Selectors() { 177 a = append(a, x.String()) 178 } 179 return a 180} 181 182func (t *taskError) Position() token.Pos { 183 return t.task.Pos() 184} 185 186func (t *taskError) InputPositions() (a []token.Pos) { 187 _, nx := value.ToInternal(t.v) 188 189 for x := range nx.LeafConjuncts() { 190 if src := x.Source(); src != nil { 191 a = append(a, src.Pos()) 192 } 193 } 194 return a 195} 196 197// A RunnerFunc creates a Runner. 198type RunnerFunc func(v cue.Value) (Runner, error) 199 200// A Runner defines a command type. 201type Runner interface { 202 // Init is called with the original configuration before any task is run. 203 // As a result, the configuration may be incomplete, but allows some 204 // validation before tasks are kicked off. 205 // Init(v cue.Value) 206 207 // Runner runs given the current value and returns a new value which is to 208 // be unified with the original result. 209 Run(ctx *Context) (results interface{}, err error) 210} 211 212// Register registers a task for cue commands. 213func Register(key string, f RunnerFunc) { 214 runners.Store(key, f) 215} 216 217// Lookup returns the RunnerFunc for a key. 218func Lookup(key string) RunnerFunc { 219 v, ok := runners.Load(key) 220 if !ok { 221 return nil 222 } 223 return v.(RunnerFunc) 224} 225 226var runners sync.Map