[DEPRECATED] Go implementation of plcbundle
1package server
2
3import (
4 "bufio"
5 "context"
6 "fmt"
7 "net/http"
8 "os"
9 "strconv"
10 "time"
11
12 "github.com/goccy/go-json"
13 "github.com/gorilla/websocket"
14 "tangled.org/atscan.net/plcbundle-go/internal/plcclient"
15 "tangled.org/atscan.net/plcbundle-go/internal/types"
16)
17
18var upgrader = websocket.Upgrader{
19 ReadBufferSize: 1024,
20 WriteBufferSize: 1024,
21 CheckOrigin: func(r *http.Request) bool {
22 return true
23 },
24}
25
26func (s *Server) handleWebSocket() http.HandlerFunc {
27 return func(w http.ResponseWriter, r *http.Request) {
28 cursorStr := r.URL.Query().Get("cursor")
29 var cursor int
30
31 if cursorStr == "" {
32 cursor = s.manager.GetCurrentCursor()
33 } else {
34 var err error
35 cursor, err = strconv.Atoi(cursorStr)
36 if err != nil || cursor < 0 {
37 http.Error(w, "Invalid cursor: must be non-negative integer", 400)
38 return
39 }
40 }
41
42 conn, err := upgrader.Upgrade(w, r, nil)
43 if err != nil {
44 fmt.Fprintf(os.Stderr, "WebSocket upgrade failed: %v\n", err)
45 return
46 }
47 defer conn.Close()
48
49 conn.SetPongHandler(func(string) error {
50 conn.SetReadDeadline(time.Now().Add(60 * time.Second))
51 return nil
52 })
53
54 done := make(chan struct{})
55
56 go func() {
57 defer close(done)
58 for {
59 _, _, err := conn.ReadMessage()
60 if err != nil {
61 if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
62 fmt.Fprintf(os.Stderr, "WebSocket: client closed connection\n")
63 }
64 return
65 }
66 }
67 }()
68
69 bgCtx := context.Background()
70
71 if err := s.streamLive(bgCtx, conn, cursor, done); err != nil {
72 fmt.Fprintf(os.Stderr, "WebSocket stream error: %v\n", err)
73 }
74 }
75}
76
77func (s *Server) streamLive(ctx context.Context, conn *websocket.Conn, startCursor int, done chan struct{}) error {
78 index := s.manager.GetIndex()
79 bundles := index.GetBundles()
80 currentRecord := startCursor
81
82 // Stream existing bundles
83 if len(bundles) > 0 {
84 startBundleIdx := startCursor / types.BUNDLE_SIZE
85 startPosition := startCursor % types.BUNDLE_SIZE
86
87 if startBundleIdx < len(bundles) {
88 for i := startBundleIdx; i < len(bundles); i++ {
89 skipUntil := 0
90 if i == startBundleIdx {
91 skipUntil = startPosition
92 }
93
94 newRecordCount, err := s.streamBundle(ctx, conn, bundles[i].BundleNumber, skipUntil, done)
95 if err != nil {
96 return err
97 }
98 currentRecord += newRecordCount
99 }
100 }
101 }
102
103 lastSeenMempoolCount := 0
104 if err := s.streamMempool(conn, startCursor, len(bundles)*types.BUNDLE_SIZE, ¤tRecord, &lastSeenMempoolCount, done); err != nil {
105 return err
106 }
107
108 ticker := time.NewTicker(500 * time.Millisecond)
109 defer ticker.Stop()
110
111 lastBundleCount := len(bundles)
112
113 for {
114 select {
115 case <-done:
116 return nil
117
118 case <-ticker.C:
119 index = s.manager.GetIndex()
120 bundles = index.GetBundles()
121
122 if len(bundles) > lastBundleCount {
123 newBundleCount := len(bundles) - lastBundleCount
124 currentRecord += newBundleCount * types.BUNDLE_SIZE
125 lastBundleCount = len(bundles)
126 lastSeenMempoolCount = 0
127 }
128
129 if err := s.streamMempool(conn, startCursor, len(bundles)*types.BUNDLE_SIZE, ¤tRecord, &lastSeenMempoolCount, done); err != nil {
130 return err
131 }
132
133 if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
134 return err
135 }
136 }
137 }
138}
139
140func (s *Server) streamBundle(ctx context.Context, conn *websocket.Conn, bundleNumber int, skipUntil int, done chan struct{}) (int, error) {
141 reader, err := s.manager.StreamBundleDecompressed(ctx, bundleNumber)
142 if err != nil {
143 return 0, nil
144 }
145 defer reader.Close()
146
147 scanner := bufio.NewScanner(reader)
148 buf := make([]byte, 0, 64*1024)
149 scanner.Buffer(buf, 1024*1024)
150
151 position := 0
152 streamed := 0
153
154 for scanner.Scan() {
155 line := scanner.Bytes()
156 if len(line) == 0 {
157 continue
158 }
159
160 if position < skipUntil {
161 position++
162 continue
163 }
164
165 select {
166 case <-done:
167 return streamed, nil
168 default:
169 }
170
171 if err := conn.WriteMessage(websocket.TextMessage, line); err != nil {
172 return streamed, err
173 }
174
175 position++
176 streamed++
177
178 if streamed%1000 == 0 {
179 conn.WriteMessage(websocket.PingMessage, nil)
180 }
181 }
182
183 if err := scanner.Err(); err != nil {
184 return streamed, fmt.Errorf("scanner error on bundle %d: %w", bundleNumber, err)
185 }
186
187 return streamed, nil
188}
189
190func (s *Server) streamMempool(conn *websocket.Conn, startCursor int, bundleRecordBase int, currentRecord *int, lastSeenCount *int, done chan struct{}) error {
191 mempoolOps, err := s.manager.GetMempoolOperations()
192 if err != nil {
193 return nil
194 }
195
196 if len(mempoolOps) <= *lastSeenCount {
197 return nil
198 }
199
200 for i := *lastSeenCount; i < len(mempoolOps); i++ {
201 recordNum := bundleRecordBase + i
202 if recordNum < startCursor {
203 continue
204 }
205
206 select {
207 case <-done:
208 return nil
209 default:
210 }
211
212 if err := sendOperation(conn, mempoolOps[i]); err != nil {
213 return err
214 }
215 *currentRecord++
216 }
217
218 *lastSeenCount = len(mempoolOps)
219 return nil
220}
221
222func sendOperation(conn *websocket.Conn, op plcclient.PLCOperation) error {
223 var data []byte
224 var err error
225
226 if len(op.RawJSON) > 0 {
227 data = op.RawJSON
228 } else {
229 data, err = json.Marshal(op)
230 if err != nil {
231 return nil
232 }
233 }
234
235 if err := conn.WriteMessage(websocket.TextMessage, data); err != nil {
236 if !websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
237 fmt.Fprintf(os.Stderr, "WebSocket write error: %v\n", err)
238 }
239 return err
240 }
241
242 return nil
243}