forked from tangled.org/core

Configure Feed — select the types of activity you want to include in your feed.
1package spindle
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "io"
8 "net/http"
9 "strconv"
10 "time"
11
12 "tangled.sh/tangled.sh/core/spindle/engine"
13 "tangled.sh/tangled.sh/core/spindle/models"
14
15 "github.com/go-chi/chi/v5"
16 "github.com/gorilla/websocket"
17 "github.com/hpcloud/tail"
18)
19
// upgrader performs the HTTP -> websocket handshake for the Events and
// Logs handlers. CheckOrigin is not set, so gorilla's default
// same-origin policy applies to incoming upgrade requests.
var upgrader = websocket.Upgrader{
	ReadBufferSize:  1024,
	WriteBufferSize: 1024,
}
24
25func (s *Spindle) Events(w http.ResponseWriter, r *http.Request) {
26 l := s.l.With("handler", "Events")
27 l.Debug("received new connection")
28
29 conn, err := upgrader.Upgrade(w, r, nil)
30 if err != nil {
31 l.Error("websocket upgrade failed", "err", err)
32 w.WriteHeader(http.StatusInternalServerError)
33 return
34 }
35 defer conn.Close()
36 l.Debug("upgraded http to wss")
37
38 ch := s.n.Subscribe()
39 defer s.n.Unsubscribe(ch)
40
41 ctx, cancel := context.WithCancel(r.Context())
42 defer cancel()
43 go func() {
44 for {
45 if _, _, err := conn.NextReader(); err != nil {
46 l.Error("failed to read", "err", err)
47 cancel()
48 return
49 }
50 }
51 }()
52
53 defaultCursor := time.Now().UnixNano()
54 cursorStr := r.URL.Query().Get("cursor")
55 cursor, err := strconv.ParseInt(cursorStr, 10, 64)
56 if err != nil {
57 l.Error("empty or invalid cursor", "invalidCursor", cursorStr, "default", defaultCursor)
58 }
59 if cursor == 0 {
60 cursor = defaultCursor
61 }
62
63 // complete backfill first before going to live data
64 l.Debug("going through backfill", "cursor", cursor)
65 if err := s.streamPipelines(conn, &cursor); err != nil {
66 l.Error("failed to backfill", "err", err)
67 return
68 }
69
70 for {
71 // wait for new data or timeout
72 select {
73 case <-ctx.Done():
74 l.Debug("stopping stream: client closed connection")
75 return
76 case <-ch:
77 // we have been notified of new data
78 l.Debug("going through live data", "cursor", cursor)
79 if err := s.streamPipelines(conn, &cursor); err != nil {
80 l.Error("failed to stream", "err", err)
81 return
82 }
83 case <-time.After(30 * time.Second):
84 // send a keep-alive
85 l.Debug("sent keepalive")
86 if err = conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil {
87 l.Error("failed to write control", "err", err)
88 }
89 }
90 }
91}
92
93func (s *Spindle) Logs(w http.ResponseWriter, r *http.Request) {
94 wid, err := getWorkflowID(r)
95 if err != nil {
96 http.Error(w, err.Error(), http.StatusBadRequest)
97 return
98 }
99
100 l := s.l.With("handler", "Logs")
101 l = s.l.With("wid", wid)
102
103 conn, err := upgrader.Upgrade(w, r, nil)
104 if err != nil {
105 l.Error("websocket upgrade failed", "err", err)
106 http.Error(w, "failed to upgrade", http.StatusInternalServerError)
107 return
108 }
109 defer func() {
110 _ = conn.WriteControl(
111 websocket.CloseMessage,
112 websocket.FormatCloseMessage(websocket.CloseNormalClosure, "log stream complete"),
113 time.Now().Add(time.Second),
114 )
115 conn.Close()
116 }()
117 l.Debug("upgraded http to wss")
118
119 ctx, cancel := context.WithCancel(r.Context())
120 defer cancel()
121
122 go func() {
123 for {
124 if _, _, err := conn.NextReader(); err != nil {
125 l.Debug("client disconnected", "err", err)
126 cancel()
127 return
128 }
129 }
130 }()
131
132 if err := s.streamLogsFromDisk(ctx, conn, wid); err != nil {
133 l.Info("log stream ended", "err", err)
134 }
135
136 l.Info("logs connection closed")
137}
138
139func (s *Spindle) streamLogsFromDisk(ctx context.Context, conn *websocket.Conn, wid models.WorkflowId) error {
140 status, err := s.db.GetStatus(wid)
141 if err != nil {
142 return err
143 }
144 isFinished := models.StatusKind(status.Status).IsFinish()
145
146 filePath := engine.LogFilePath(s.cfg.Pipelines.LogDir, wid)
147
148 config := tail.Config{
149 Follow: !isFinished,
150 ReOpen: !isFinished,
151 MustExist: false,
152 Location: &tail.SeekInfo{
153 Offset: 0,
154 Whence: io.SeekStart,
155 },
156 // Logger: tail.DiscardingLogger,
157 }
158
159 t, err := tail.TailFile(filePath, config)
160 if err != nil {
161 return fmt.Errorf("failed to tail log file: %w", err)
162 }
163 defer t.Stop()
164
165 for {
166 select {
167 case <-ctx.Done():
168 return ctx.Err()
169 case line := <-t.Lines:
170 if line == nil && isFinished {
171 return fmt.Errorf("tail completed")
172 }
173
174 if line == nil {
175 return fmt.Errorf("tail channel closed unexpectedly")
176 }
177
178 if line.Err != nil {
179 return fmt.Errorf("error tailing log file: %w", line.Err)
180 }
181
182 if err := conn.WriteMessage(websocket.TextMessage, []byte(line.Text)); err != nil {
183 return fmt.Errorf("failed to write to websocket: %w", err)
184 }
185 }
186 }
187}
188
189func (s *Spindle) streamPipelines(conn *websocket.Conn, cursor *int64) error {
190 events, err := s.db.GetEvents(*cursor)
191 if err != nil {
192 s.l.Debug("err", "err", err)
193 return err
194 }
195 s.l.Debug("ops", "ops", events)
196
197 for _, event := range events {
198 // first extract the inner json into a map
199 var eventJson map[string]any
200 err := json.Unmarshal([]byte(event.EventJson), &eventJson)
201 if err != nil {
202 s.l.Error("failed to unmarshal event", "err", err)
203 return err
204 }
205
206 jsonMsg, err := json.Marshal(map[string]any{
207 "rkey": event.Rkey,
208 "nsid": event.Nsid,
209 "event": eventJson,
210 })
211 if err != nil {
212 s.l.Error("failed to marshal record", "err", err)
213 return err
214 }
215
216 if err := conn.WriteMessage(websocket.TextMessage, jsonMsg); err != nil {
217 s.l.Debug("err", "err", err)
218 return err
219 }
220 *cursor = event.Created
221 }
222
223 return nil
224}
225
226func getWorkflowID(r *http.Request) (models.WorkflowId, error) {
227 knot := chi.URLParam(r, "knot")
228 rkey := chi.URLParam(r, "rkey")
229 name := chi.URLParam(r, "name")
230
231 if knot == "" || rkey == "" || name == "" {
232 return models.WorkflowId{}, fmt.Errorf("missing required parameters")
233 }
234
235 return models.WorkflowId{
236 PipelineId: models.PipelineId{
237 Knot: knot,
238 Rkey: rkey,
239 },
240 Name: name,
241 }, nil
242}