1// Copyright 2020 CUE Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package flow_test
16
17import (
18 "context"
19 "fmt"
20 "os"
21 "path"
22 "strings"
23 "sync"
24 "testing"
25 "time"
26
27 "cuelang.org/go/cue"
28 "cuelang.org/go/cue/cuecontext"
29 "cuelang.org/go/cue/errors"
30 "cuelang.org/go/cue/format"
31 "cuelang.org/go/cue/stats"
32 "cuelang.org/go/internal/cuetdtest"
33 "cuelang.org/go/internal/cuetxtar"
34 "cuelang.org/go/tools/flow"
35)
36
37// TestTasks tests the logic that determines which nodes are tasks and what are
38// their dependencies.
39func TestFlow(t *testing.T) {
40 test := cuetxtar.TxTarTest{
41 Root: "./testdata",
42 Name: "run",
43 Matrix: cuetdtest.SmallMatrix,
44 }
45
46 test.Run(t, func(t *cuetxtar.Test) {
47 v := t.CueContext().BuildInstance(t.Instance())
48 if err := v.Err(); err != nil {
49 t.Fatal(errors.Details(err, nil))
50 }
51
52 seqNum = 0
53
54 var tasksTotal stats.Counts
55
56 updateFunc := func(c *flow.Controller, task *flow.Task) error {
57 str := flow.MermaidGraph(c)
58 step := fmt.Sprintf("t%d", seqNum)
59 fmt.Fprintln(t.Writer(step), str)
60
61 if task != nil {
62 n := task.Value().Syntax(cue.Final())
63 b, err := format.Node(n)
64 if err != nil {
65 t.Fatal(err)
66 }
67 fmt.Fprintln(t.Writer(path.Join(step, "value")), string(b))
68
69 if t.M.IsDefault() {
70 stats := task.Stats()
71 tasksTotal.Add(stats)
72 fmt.Fprintln(t.Writer(path.Join(step, "stats")), &stats)
73 }
74 }
75
76 incSeqNum()
77
78 return nil
79 }
80
81 cfg := &flow.Config{
82 Root: cue.ParsePath("root"),
83 InferTasks: t.Bool("InferTasks"),
84 IgnoreConcrete: t.Bool("IgnoreConcrete"),
85 FindHiddenTasks: t.Bool("FindHiddenTasks"),
86 UpdateFunc: updateFunc,
87 }
88
89 c := flow.New(cfg, v, taskFunc)
90
91 w := t.Writer("errors")
92 if err := c.Run(context.Background()); err != nil {
93 cwd, _ := os.Getwd()
94 fmt.Fprint(w, "error: ")
95 errors.Print(w, err, &errors.Config{
96 Cwd: cwd,
97 ToSlash: true,
98 })
99 }
100
101 if !t.M.IsDefault() {
102 return
103 }
104
105 totals := c.Stats()
106 if tasksTotal != zeroStats && totals != tasksTotal {
107 t.Errorf(diffMsg, tasksTotal, totals, tasksTotal.Since(totals))
108 }
109 fmt.Fprintln(t.Writer("stats/totals"), totals)
110 })
111}
112
113var zeroStats stats.Counts
114
115const diffMsg = `
116stats: task totals different from controller:
117task totals:
118%v
119
120controller totals:
121%v
122
123task totals - controller totals:
124%v`
125
126func TestFlowValuePanic(t *testing.T) {
127 f := `
128 root: {
129 a: {
130 $id: "slow"
131 out: string
132 }
133 b: {
134 $id: "slow"
135 $after: a
136 out: string
137 }
138 }
139 `
140 ctx := cuecontext.New()
141 v := ctx.CompileString(f)
142
143 ch := make(chan bool, 1)
144
145 cfg := &flow.Config{
146 Root: cue.ParsePath("root"),
147 UpdateFunc: func(c *flow.Controller, t *flow.Task) error {
148 ch <- true
149 return nil
150 },
151 }
152
153 c := flow.New(cfg, v, taskFunc)
154
155 defer func() { recover() }()
156
157 go c.Run(context.TODO())
158
159 // Call Value amidst two task runs. This should trigger a panic as the flow
160 // is not terminated.
161 <-ch
162 c.Value()
163 <-ch
164
165 t.Errorf("Value() did not panic")
166}
167
168func taskFunc(v cue.Value) (flow.Runner, error) {
169 idPath := cue.MakePath(cue.Str("$id"))
170 valPath := cue.MakePath(cue.Str("val"))
171
172 switch name, err := v.LookupPath(idPath).String(); name {
173 default:
174 if err == nil {
175 return flow.RunnerFunc(func(t *flow.Task) error {
176 t.Fill(map[string]string{"stdout": "foo"})
177 return nil
178 }), nil
179 } else if v.LookupPath(idPath).Exists() {
180 return nil, err
181 }
182
183 case "valToOut":
184 return flow.RunnerFunc(func(t *flow.Task) error {
185 if str, err := t.Value().LookupPath(valPath).String(); err == nil {
186 t.Fill(map[string]string{"out": str})
187 }
188 return nil
189 }), nil
190
191 case "failure":
192 return flow.RunnerFunc(func(t *flow.Task) error {
193 return errors.New("failure")
194 }), nil
195
196 case "abort":
197 return flow.RunnerFunc(func(t *flow.Task) error {
198 return flow.ErrAbort
199 }), nil
200
201 case "list":
202 return flow.RunnerFunc(func(t *flow.Task) error {
203 t.Fill(map[string][]int{"out": {1, 2}})
204 return nil
205 }), nil
206
207 case "slow":
208 return flow.RunnerFunc(func(t *flow.Task) error {
209 time.Sleep(10 * time.Millisecond)
210 t.Fill(map[string]string{"out": "finished"})
211 return nil
212 }), nil
213
214 case "sequenced":
215 // This task is used to serialize different runners in case
216 // non-deterministic scheduling is possible.
217 return flow.RunnerFunc(func(t *flow.Task) error {
218 seq, err := t.Value().LookupPath(cue.MakePath(cue.Str("seq"))).Int64()
219 if err != nil {
220 return err
221 }
222
223 waitSeqNum(seq)
224
225 if str, err := t.Value().LookupPath(valPath).String(); err == nil {
226 t.Fill(map[string]string{"out": str})
227 }
228
229 return nil
230 }), nil
231 }
232 return nil, nil
233}
234
235// These vars are used to serialize tasks that are run in parallel. This allows
236// for testing running tasks in parallel, while obtaining deterministic output.
237var (
238 seqNum int64
239 seqLock sync.Mutex
240 seqCond = sync.NewCond(&seqLock)
241)
242
243func incSeqNum() {
244 seqCond.L.Lock()
245 seqNum++
246 seqCond.Broadcast()
247 seqCond.L.Unlock()
248}
249
250func waitSeqNum(seq int64) {
251 seqCond.L.Lock()
252 for seq != seqNum {
253 seqCond.Wait()
254 }
255 seqCond.L.Unlock()
256}
257
258// DO NOT REMOVE: for testing purposes.
259func TestX(t *testing.T) {
260 in := `
261 `
262
263 if strings.TrimSpace(in) == "" {
264 t.Skip()
265 }
266
267 rt := cuecontext.New()
268 v := rt.CompileString(in)
269 if err := v.Err(); err != nil {
270 t.Fatal(err)
271 }
272
273 c := flow.New(&flow.Config{
274 Root: cue.ParsePath("root"),
275 UpdateFunc: func(c *flow.Controller, ft *flow.Task) error {
276 if ft != nil {
277 t.Errorf("\nTASK:\n%s", ft.Stats())
278 }
279 return nil
280 },
281 }, v, taskFunc)
282
283 t.Error(flow.MermaidGraph(c))
284
285 if err := c.Run(context.Background()); err != nil {
286 t.Fatal(errors.Details(err, nil))
287 }
288
289 t.Errorf("\nCONTROLLER:\n%s", c.Stats())
290}