this repo has no description
at master 290 lines 6.3 kB view raw
1// Copyright 2020 CUE Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package flow_test 16 17import ( 18 "context" 19 "fmt" 20 "os" 21 "path" 22 "strings" 23 "sync" 24 "testing" 25 "time" 26 27 "cuelang.org/go/cue" 28 "cuelang.org/go/cue/cuecontext" 29 "cuelang.org/go/cue/errors" 30 "cuelang.org/go/cue/format" 31 "cuelang.org/go/cue/stats" 32 "cuelang.org/go/internal/cuetdtest" 33 "cuelang.org/go/internal/cuetxtar" 34 "cuelang.org/go/tools/flow" 35) 36 37// TestTasks tests the logic that determines which nodes are tasks and what are 38// their dependencies. 39func TestFlow(t *testing.T) { 40 test := cuetxtar.TxTarTest{ 41 Root: "./testdata", 42 Name: "run", 43 Matrix: cuetdtest.SmallMatrix, 44 } 45 46 test.Run(t, func(t *cuetxtar.Test) { 47 v := t.CueContext().BuildInstance(t.Instance()) 48 if err := v.Err(); err != nil { 49 t.Fatal(errors.Details(err, nil)) 50 } 51 52 seqNum = 0 53 54 var tasksTotal stats.Counts 55 56 updateFunc := func(c *flow.Controller, task *flow.Task) error { 57 str := flow.MermaidGraph(c) 58 step := fmt.Sprintf("t%d", seqNum) 59 fmt.Fprintln(t.Writer(step), str) 60 61 if task != nil { 62 n := task.Value().Syntax(cue.Final()) 63 b, err := format.Node(n) 64 if err != nil { 65 t.Fatal(err) 66 } 67 fmt.Fprintln(t.Writer(path.Join(step, "value")), string(b)) 68 69 if t.M.IsDefault() { 70 stats := task.Stats() 71 tasksTotal.Add(stats) 72 fmt.Fprintln(t.Writer(path.Join(step, "stats")), &stats) 73 } 74 } 75 76 incSeqNum() 77 78 return nil 79 } 80 81 cfg := &flow.Config{ 82 Root: cue.ParsePath("root"), 83 InferTasks: t.Bool("InferTasks"), 84 IgnoreConcrete: t.Bool("IgnoreConcrete"), 85 FindHiddenTasks: t.Bool("FindHiddenTasks"), 86 UpdateFunc: updateFunc, 87 } 88 89 c := flow.New(cfg, v, taskFunc) 90 91 w := t.Writer("errors") 92 if err := c.Run(context.Background()); err != nil { 93 cwd, _ := os.Getwd() 94 fmt.Fprint(w, "error: ") 95 errors.Print(w, err, &errors.Config{ 96 Cwd: cwd, 97 ToSlash: true, 98 }) 99 } 100 101 if !t.M.IsDefault() { 102 return 103 } 104 105 totals := c.Stats() 106 if tasksTotal != zeroStats && totals != tasksTotal { 107 t.Errorf(diffMsg, tasksTotal, totals, tasksTotal.Since(totals)) 108 } 109 fmt.Fprintln(t.Writer("stats/totals"), totals) 110 }) 111} 112 113var zeroStats stats.Counts 114 115const diffMsg = ` 116stats: task totals different from controller: 117task totals: 118%v 119 120controller totals: 121%v 122 123task totals - controller totals: 124%v` 125 126func TestFlowValuePanic(t *testing.T) { 127 f := ` 128 root: { 129 a: { 130 $id: "slow" 131 out: string 132 } 133 b: { 134 $id: "slow" 135 $after: a 136 out: string 137 } 138 } 139 ` 140 ctx := cuecontext.New() 141 v := ctx.CompileString(f) 142 143 ch := make(chan bool, 1) 144 145 cfg := &flow.Config{ 146 Root: cue.ParsePath("root"), 147 UpdateFunc: func(c *flow.Controller, t *flow.Task) error { 148 ch <- true 149 return nil 150 }, 151 } 152 153 c := flow.New(cfg, v, taskFunc) 154 155 defer func() { recover() }() 156 157 go c.Run(context.TODO()) 158 159 // Call Value amidst two task runs. This should trigger a panic as the flow 160 // is not terminated. 161 <-ch 162 c.Value() 163 <-ch 164 165 t.Errorf("Value() did not panic") 166} 167 168func taskFunc(v cue.Value) (flow.Runner, error) { 169 idPath := cue.MakePath(cue.Str("$id")) 170 valPath := cue.MakePath(cue.Str("val")) 171 172 switch name, err := v.LookupPath(idPath).String(); name { 173 default: 174 if err == nil { 175 return flow.RunnerFunc(func(t *flow.Task) error { 176 t.Fill(map[string]string{"stdout": "foo"}) 177 return nil 178 }), nil 179 } else if v.LookupPath(idPath).Exists() { 180 return nil, err 181 } 182 183 case "valToOut": 184 return flow.RunnerFunc(func(t *flow.Task) error { 185 if str, err := t.Value().LookupPath(valPath).String(); err == nil { 186 t.Fill(map[string]string{"out": str}) 187 } 188 return nil 189 }), nil 190 191 case "failure": 192 return flow.RunnerFunc(func(t *flow.Task) error { 193 return errors.New("failure") 194 }), nil 195 196 case "abort": 197 return flow.RunnerFunc(func(t *flow.Task) error { 198 return flow.ErrAbort 199 }), nil 200 201 case "list": 202 return flow.RunnerFunc(func(t *flow.Task) error { 203 t.Fill(map[string][]int{"out": {1, 2}}) 204 return nil 205 }), nil 206 207 case "slow": 208 return flow.RunnerFunc(func(t *flow.Task) error { 209 time.Sleep(10 * time.Millisecond) 210 t.Fill(map[string]string{"out": "finished"}) 211 return nil 212 }), nil 213 214 case "sequenced": 215 // This task is used to serialize different runners in case 216 // non-deterministic scheduling is possible. 217 return flow.RunnerFunc(func(t *flow.Task) error { 218 seq, err := t.Value().LookupPath(cue.MakePath(cue.Str("seq"))).Int64() 219 if err != nil { 220 return err 221 } 222 223 waitSeqNum(seq) 224 225 if str, err := t.Value().LookupPath(valPath).String(); err == nil { 226 t.Fill(map[string]string{"out": str}) 227 } 228 229 return nil 230 }), nil 231 } 232 return nil, nil 233} 234 235// These vars are used to serialize tasks that are run in parallel. This allows 236// for testing running tasks in parallel, while obtaining deterministic output. 237var ( 238 seqNum int64 239 seqLock sync.Mutex 240 seqCond = sync.NewCond(&seqLock) 241) 242 243func incSeqNum() { 244 seqCond.L.Lock() 245 seqNum++ 246 seqCond.Broadcast() 247 seqCond.L.Unlock() 248} 249 250func waitSeqNum(seq int64) { 251 seqCond.L.Lock() 252 for seq != seqNum { 253 seqCond.Wait() 254 } 255 seqCond.L.Unlock() 256} 257 258// DO NOT REMOVE: for testing purposes. 259func TestX(t *testing.T) { 260 in := ` 261 ` 262 263 if strings.TrimSpace(in) == "" { 264 t.Skip() 265 } 266 267 rt := cuecontext.New() 268 v := rt.CompileString(in) 269 if err := v.Err(); err != nil { 270 t.Fatal(err) 271 } 272 273 c := flow.New(&flow.Config{ 274 Root: cue.ParsePath("root"), 275 UpdateFunc: func(c *flow.Controller, ft *flow.Task) error { 276 if ft != nil { 277 t.Errorf("\nTASK:\n%s", ft.Stats()) 278 } 279 return nil 280 }, 281 }, v, taskFunc) 282 283 t.Error(flow.MermaidGraph(c)) 284 285 if err := c.Run(context.Background()); err != nil { 286 t.Fatal(errors.Details(err, nil)) 287 } 288 289 t.Errorf("\nCONTROLLER:\n%s", c.Stats()) 290}