Bring back Yahoo Pipes!
1package engine
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "time"
8
9 "github.com/google/uuid"
10 "github.com/kierank/pipes/nodes"
11 "github.com/kierank/pipes/store"
12)
13
14type PipeConfig struct {
15 Version string `json:"version"`
16 Nodes []Node `json:"nodes"`
17 Connections []Connection `json:"connections"`
18 Settings Settings `json:"settings"`
19}
20
21type Node struct {
22 ID string `json:"id"`
23 Type string `json:"type"`
24 Position Position `json:"position"`
25 Config map[string]interface{} `json:"config"`
26 Label string `json:"label,omitempty"`
27}
28
// Connection is a directed edge in a pipe's graph: the output of the Source
// node is fed to the Target node.
type Connection struct {
	// ID identifies the connection itself.
	ID string `json:"id"`

	// Source is the ID of the node whose output flows along this edge.
	Source string `json:"source"`

	// Target is the ID of the node that receives that output as an input.
	Target string `json:"target"`

	// SourceHandle is optional and omitted from JSON when empty.
	// NOTE(review): presumably names an output port; execution in this file ignores it.
	SourceHandle string `json:"sourceHandle,omitempty"`

	// TargetHandle is optional and omitted from JSON when empty.
	// NOTE(review): presumably names an input port; execution in this file ignores it.
	TargetHandle string `json:"targetHandle,omitempty"`
}
36
// Position is a pair of X/Y coordinates attached to a node.
// NOTE(review): presumably the node's placement in a visual editor; nothing in
// this file reads these values.
type Position struct {
	// X is serialized under the "x" key.
	X float64 `json:"x"`

	// Y is serialized under the "y" key.
	Y float64 `json:"y"`
}
41
42type Settings struct {
43 Schedule string `json:"schedule,omitempty"`
44 Enabled bool `json:"enabled"`
45 Timeout int `json:"timeout,omitempty"`
46 RetryConfig *RetryConfig `json:"retryConfig,omitempty"`
47}
48
// RetryConfig describes the optional retry policy referenced from Settings.
// NOTE(review): the executor code in this file does not apply it.
type RetryConfig struct {
	// MaxRetries is serialized under "maxRetries".
	MaxRetries int `json:"maxRetries"`

	// BackoffMs is serialized under "backoffMs".
	// NOTE(review): presumably a delay in milliseconds, going by the name — confirm.
	BackoffMs int `json:"backoffMs"`
}
53
54type Executor struct {
55 db *store.DB
56 registry *Registry
57}
58
59func NewExecutor(db *store.DB) *Executor {
60 return &Executor{
61 db: db,
62 registry: NewRegistry(),
63 }
64}
65
66func (e *Executor) Execute(ctx context.Context, pipeID string, triggerType string) (string, error) {
67 executionID := uuid.New().String()
68 startedAt := time.Now().Unix()
69
70 // Create execution record
71 if err := e.db.CreateExecution(executionID, pipeID, triggerType, startedAt); err != nil {
72 return "", fmt.Errorf("create execution: %w", err)
73 }
74
75 // Fetch pipe configuration
76 pipe, err := e.db.GetPipe(pipeID)
77 if err != nil {
78 return "", fmt.Errorf("get pipe: %w", err)
79 }
80
81 if pipe == nil {
82 return "", fmt.Errorf("pipe not found: %s", pipeID)
83 }
84
85 var config PipeConfig
86 if err := json.Unmarshal([]byte(pipe.Config), &config); err != nil {
87 return "", fmt.Errorf("parse config: %w", err)
88 }
89
90 // Execute pipeline
91 itemCount, err := e.executePipeline(ctx, executionID, pipeID, &config)
92
93 completedAt := time.Now().Unix()
94 durationMs := (completedAt - startedAt) * 1000
95
96 if err != nil {
97 e.db.UpdateExecutionFailed(executionID, completedAt, durationMs, err.Error())
98 return executionID, err
99 }
100
101 e.db.UpdateExecutionSuccess(executionID, completedAt, durationMs, itemCount)
102 return executionID, nil
103}
104
105func (e *Executor) executePipeline(ctx context.Context, executionID, pipeID string, config *PipeConfig) (int, error) {
106 // Topological sort to determine execution order
107 order, err := topologicalSort(config.Nodes, config.Connections)
108 if err != nil {
109 return 0, fmt.Errorf("topological sort: %w", err)
110 }
111
112 nodeResults := make(map[string][]interface{})
113 execCtx := nodes.NewContext(executionID, pipeID, e.db)
114
115 for _, nodeID := range order {
116 node := findNode(config.Nodes, nodeID)
117 if node == nil {
118 continue
119 }
120
121 // Get node implementation
122 nodeImpl, err := e.registry.Get(node.Type)
123 if err != nil {
124 return 0, fmt.Errorf("get node type %s: %w", node.Type, err)
125 }
126
127 // Gather inputs from connected nodes
128 inputs := e.gatherInputs(nodeID, config.Connections, nodeResults)
129
130 // Execute node
131 output, err := nodeImpl.Execute(ctx, node.Config, inputs, execCtx)
132 if err != nil {
133 e.db.LogExecution(executionID, nodeID, "error", fmt.Sprintf("Execution failed: %v", err))
134 return 0, fmt.Errorf("node %s (%s): %w", nodeID, node.Type, err)
135 }
136
137 nodeResults[nodeID] = output
138
139 // Log output data
140 outputJSON, _ := json.Marshal(output)
141 e.db.LogExecutionWithData(executionID, nodeID, "data", fmt.Sprintf("%d items", len(output)), string(outputJSON))
142 }
143
144 // Return item count from last node
145 if len(order) == 0 {
146 return 0, nil
147 }
148
149 lastNodeID := order[len(order)-1]
150 finalOutput := nodeResults[lastNodeID]
151 return len(finalOutput), nil
152}
153
154func (e *Executor) gatherInputs(nodeID string, connections []Connection, nodeResults map[string][]interface{}) [][]interface{} {
155 var inputs [][]interface{}
156
157 for _, conn := range connections {
158 if conn.Target == nodeID {
159 if result, ok := nodeResults[conn.Source]; ok {
160 inputs = append(inputs, result)
161 }
162 }
163 }
164
165 return inputs
166}
167
168func topologicalSort(nodes []Node, connections []Connection) ([]string, error) {
169 // Kahn's algorithm for topological sorting
170 inDegree := make(map[string]int)
171 adjacency := make(map[string][]string)
172
173 // Initialize
174 for _, n := range nodes {
175 inDegree[n.ID] = 0
176 adjacency[n.ID] = []string{}
177 }
178
179 // Build graph
180 for _, c := range connections {
181 adjacency[c.Source] = append(adjacency[c.Source], c.Target)
182 inDegree[c.Target]++
183 }
184
185 // Find sources (nodes with no incoming edges)
186 queue := []string{}
187 for id, degree := range inDegree {
188 if degree == 0 {
189 queue = append(queue, id)
190 }
191 }
192
193 sorted := []string{}
194 for len(queue) > 0 {
195 node := queue[0]
196 queue = queue[1:]
197 sorted = append(sorted, node)
198
199 for _, neighbor := range adjacency[node] {
200 inDegree[neighbor]--
201 if inDegree[neighbor] == 0 {
202 queue = append(queue, neighbor)
203 }
204 }
205 }
206
207 if len(sorted) != len(nodes) {
208 return nil, fmt.Errorf("pipeline contains a cycle")
209 }
210
211 return sorted, nil
212}
213
214func findNode(nodes []Node, id string) *Node {
215 for i := range nodes {
216 if nodes[i].ID == id {
217 return &nodes[i]
218 }
219 }
220 return nil
221}