bring back yahoo pipes!
at main 5.5 kB view raw
1package engine 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "time" 8 9 "github.com/google/uuid" 10 "github.com/kierank/pipes/nodes" 11 "github.com/kierank/pipes/store" 12) 13 14type PipeConfig struct { 15 Version string `json:"version"` 16 Nodes []Node `json:"nodes"` 17 Connections []Connection `json:"connections"` 18 Settings Settings `json:"settings"` 19} 20 21type Node struct { 22 ID string `json:"id"` 23 Type string `json:"type"` 24 Position Position `json:"position"` 25 Config map[string]interface{} `json:"config"` 26 Label string `json:"label,omitempty"` 27} 28 29type Connection struct { 30 ID string `json:"id"` 31 Source string `json:"source"` 32 Target string `json:"target"` 33 SourceHandle string `json:"sourceHandle,omitempty"` 34 TargetHandle string `json:"targetHandle,omitempty"` 35} 36 37type Position struct { 38 X float64 `json:"x"` 39 Y float64 `json:"y"` 40} 41 42type Settings struct { 43 Schedule string `json:"schedule,omitempty"` 44 Enabled bool `json:"enabled"` 45 Timeout int `json:"timeout,omitempty"` 46 RetryConfig *RetryConfig `json:"retryConfig,omitempty"` 47} 48 49type RetryConfig struct { 50 MaxRetries int `json:"maxRetries"` 51 BackoffMs int `json:"backoffMs"` 52} 53 54type Executor struct { 55 db *store.DB 56 registry *Registry 57} 58 59func NewExecutor(db *store.DB) *Executor { 60 return &Executor{ 61 db: db, 62 registry: NewRegistry(), 63 } 64} 65 66func (e *Executor) Execute(ctx context.Context, pipeID string, triggerType string) (string, error) { 67 executionID := uuid.New().String() 68 startedAt := time.Now().Unix() 69 70 // Create execution record 71 if err := e.db.CreateExecution(executionID, pipeID, triggerType, startedAt); err != nil { 72 return "", fmt.Errorf("create execution: %w", err) 73 } 74 75 // Fetch pipe configuration 76 pipe, err := e.db.GetPipe(pipeID) 77 if err != nil { 78 return "", fmt.Errorf("get pipe: %w", err) 79 } 80 81 if pipe == nil { 82 return "", fmt.Errorf("pipe not found: %s", pipeID) 83 } 84 85 var config PipeConfig 86 if err := json.Unmarshal([]byte(pipe.Config), &config); err != nil { 87 return "", fmt.Errorf("parse config: %w", err) 88 } 89 90 // Execute pipeline 91 itemCount, err := e.executePipeline(ctx, executionID, pipeID, &config) 92 93 completedAt := time.Now().Unix() 94 durationMs := (completedAt - startedAt) * 1000 95 96 if err != nil { 97 e.db.UpdateExecutionFailed(executionID, completedAt, durationMs, err.Error()) 98 return executionID, err 99 } 100 101 e.db.UpdateExecutionSuccess(executionID, completedAt, durationMs, itemCount) 102 return executionID, nil 103} 104 105func (e *Executor) executePipeline(ctx context.Context, executionID, pipeID string, config *PipeConfig) (int, error) { 106 // Topological sort to determine execution order 107 order, err := topologicalSort(config.Nodes, config.Connections) 108 if err != nil { 109 return 0, fmt.Errorf("topological sort: %w", err) 110 } 111 112 nodeResults := make(map[string][]interface{}) 113 execCtx := nodes.NewContext(executionID, pipeID, e.db) 114 115 for _, nodeID := range order { 116 node := findNode(config.Nodes, nodeID) 117 if node == nil { 118 continue 119 } 120 121 // Get node implementation 122 nodeImpl, err := e.registry.Get(node.Type) 123 if err != nil { 124 return 0, fmt.Errorf("get node type %s: %w", node.Type, err) 125 } 126 127 // Gather inputs from connected nodes 128 inputs := e.gatherInputs(nodeID, config.Connections, nodeResults) 129 130 // Execute node 131 output, err := nodeImpl.Execute(ctx, node.Config, inputs, execCtx) 132 if err != nil { 133 e.db.LogExecution(executionID, nodeID, "error", fmt.Sprintf("Execution failed: %v", err)) 134 return 0, fmt.Errorf("node %s (%s): %w", nodeID, node.Type, err) 135 } 136 137 nodeResults[nodeID] = output 138 139 // Log output data 140 outputJSON, _ := json.Marshal(output) 141 e.db.LogExecutionWithData(executionID, nodeID, "data", fmt.Sprintf("%d items", len(output)), string(outputJSON)) 142 } 143 144 // Return item count from last node 145 if len(order) == 0 { 146 return 0, nil 147 } 148 149 lastNodeID := order[len(order)-1] 150 finalOutput := nodeResults[lastNodeID] 151 return len(finalOutput), nil 152} 153 154func (e *Executor) gatherInputs(nodeID string, connections []Connection, nodeResults map[string][]interface{}) [][]interface{} { 155 var inputs [][]interface{} 156 157 for _, conn := range connections { 158 if conn.Target == nodeID { 159 if result, ok := nodeResults[conn.Source]; ok { 160 inputs = append(inputs, result) 161 } 162 } 163 } 164 165 return inputs 166} 167 168func topologicalSort(nodes []Node, connections []Connection) ([]string, error) { 169 // Kahn's algorithm for topological sorting 170 inDegree := make(map[string]int) 171 adjacency := make(map[string][]string) 172 173 // Initialize 174 for _, n := range nodes { 175 inDegree[n.ID] = 0 176 adjacency[n.ID] = []string{} 177 } 178 179 // Build graph 180 for _, c := range connections { 181 adjacency[c.Source] = append(adjacency[c.Source], c.Target) 182 inDegree[c.Target]++ 183 } 184 185 // Find sources (nodes with no incoming edges) 186 queue := []string{} 187 for id, degree := range inDegree { 188 if degree == 0 { 189 queue = append(queue, id) 190 } 191 } 192 193 sorted := []string{} 194 for len(queue) > 0 { 195 node := queue[0] 196 queue = queue[1:] 197 sorted = append(sorted, node) 198 199 for _, neighbor := range adjacency[node] { 200 inDegree[neighbor]-- 201 if inDegree[neighbor] == 0 { 202 queue = append(queue, neighbor) 203 } 204 } 205 } 206 207 if len(sorted) != len(nodes) { 208 return nil, fmt.Errorf("pipeline contains a cycle") 209 } 210 211 return sorted, nil 212} 213 214func findNode(nodes []Node, id string) *Node { 215 for i := range nodes { 216 if nodes[i].ID == id { 217 return &nodes[i] 218 } 219 } 220 return nil 221}