[DEPRECATED] Go implementation of plcbundle
at main 5.4 kB view raw
1package server 2 3import ( 4 "bufio" 5 "context" 6 "fmt" 7 "net/http" 8 "os" 9 "strconv" 10 "time" 11 12 "github.com/goccy/go-json" 13 "github.com/gorilla/websocket" 14 "tangled.org/atscan.net/plcbundle-go/internal/plcclient" 15 "tangled.org/atscan.net/plcbundle-go/internal/types" 16) 17 18var upgrader = websocket.Upgrader{ 19 ReadBufferSize: 1024, 20 WriteBufferSize: 1024, 21 CheckOrigin: func(r *http.Request) bool { 22 return true 23 }, 24} 25 26func (s *Server) handleWebSocket() http.HandlerFunc { 27 return func(w http.ResponseWriter, r *http.Request) { 28 cursorStr := r.URL.Query().Get("cursor") 29 var cursor int 30 31 if cursorStr == "" { 32 cursor = s.manager.GetCurrentCursor() 33 } else { 34 var err error 35 cursor, err = strconv.Atoi(cursorStr) 36 if err != nil || cursor < 0 { 37 http.Error(w, "Invalid cursor: must be non-negative integer", 400) 38 return 39 } 40 } 41 42 conn, err := upgrader.Upgrade(w, r, nil) 43 if err != nil { 44 fmt.Fprintf(os.Stderr, "WebSocket upgrade failed: %v\n", err) 45 return 46 } 47 defer conn.Close() 48 49 conn.SetPongHandler(func(string) error { 50 conn.SetReadDeadline(time.Now().Add(60 * time.Second)) 51 return nil 52 }) 53 54 done := make(chan struct{}) 55 56 go func() { 57 defer close(done) 58 for { 59 _, _, err := conn.ReadMessage() 60 if err != nil { 61 if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) { 62 fmt.Fprintf(os.Stderr, "WebSocket: client closed connection\n") 63 } 64 return 65 } 66 } 67 }() 68 69 bgCtx := context.Background() 70 71 if err := s.streamLive(bgCtx, conn, cursor, done); err != nil { 72 fmt.Fprintf(os.Stderr, "WebSocket stream error: %v\n", err) 73 } 74 } 75} 76 77func (s *Server) streamLive(ctx context.Context, conn *websocket.Conn, startCursor int, done chan struct{}) error { 78 index := s.manager.GetIndex() 79 bundles := index.GetBundles() 80 currentRecord := startCursor 81 82 // Stream existing bundles 83 if len(bundles) > 0 { 84 startBundleIdx := startCursor / types.BUNDLE_SIZE 85 startPosition := startCursor % types.BUNDLE_SIZE 86 87 if startBundleIdx < len(bundles) { 88 for i := startBundleIdx; i < len(bundles); i++ { 89 skipUntil := 0 90 if i == startBundleIdx { 91 skipUntil = startPosition 92 } 93 94 newRecordCount, err := s.streamBundle(ctx, conn, bundles[i].BundleNumber, skipUntil, done) 95 if err != nil { 96 return err 97 } 98 currentRecord += newRecordCount 99 } 100 } 101 } 102 103 lastSeenMempoolCount := 0 104 if err := s.streamMempool(conn, startCursor, len(bundles)*types.BUNDLE_SIZE, &currentRecord, &lastSeenMempoolCount, done); err != nil { 105 return err 106 } 107 108 ticker := time.NewTicker(500 * time.Millisecond) 109 defer ticker.Stop() 110 111 lastBundleCount := len(bundles) 112 113 for { 114 select { 115 case <-done: 116 return nil 117 118 case <-ticker.C: 119 index = s.manager.GetIndex() 120 bundles = index.GetBundles() 121 122 if len(bundles) > lastBundleCount { 123 newBundleCount := len(bundles) - lastBundleCount 124 currentRecord += newBundleCount * types.BUNDLE_SIZE 125 lastBundleCount = len(bundles) 126 lastSeenMempoolCount = 0 127 } 128 129 if err := s.streamMempool(conn, startCursor, len(bundles)*types.BUNDLE_SIZE, &currentRecord, &lastSeenMempoolCount, done); err != nil { 130 return err 131 } 132 133 if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil { 134 return err 135 } 136 } 137 } 138} 139 140func (s *Server) streamBundle(ctx context.Context, conn *websocket.Conn, bundleNumber int, skipUntil int, done chan struct{}) (int, error) { 141 reader, err := s.manager.StreamBundleDecompressed(ctx, bundleNumber) 142 if err != nil { 143 return 0, nil 144 } 145 defer reader.Close() 146 147 scanner := bufio.NewScanner(reader) 148 buf := make([]byte, 0, 64*1024) 149 scanner.Buffer(buf, 1024*1024) 150 151 position := 0 152 streamed := 0 153 154 for scanner.Scan() { 155 line := scanner.Bytes() 156 if len(line) == 0 { 157 continue 158 } 159 160 if position < skipUntil { 161 position++ 162 continue 163 } 164 165 select { 166 case <-done: 167 return streamed, nil 168 default: 169 } 170 171 if err := conn.WriteMessage(websocket.TextMessage, line); err != nil { 172 return streamed, err 173 } 174 175 position++ 176 streamed++ 177 178 if streamed%1000 == 0 { 179 conn.WriteMessage(websocket.PingMessage, nil) 180 } 181 } 182 183 if err := scanner.Err(); err != nil { 184 return streamed, fmt.Errorf("scanner error on bundle %d: %w", bundleNumber, err) 185 } 186 187 return streamed, nil 188} 189 190func (s *Server) streamMempool(conn *websocket.Conn, startCursor int, bundleRecordBase int, currentRecord *int, lastSeenCount *int, done chan struct{}) error { 191 mempoolOps, err := s.manager.GetMempoolOperations() 192 if err != nil { 193 return nil 194 } 195 196 if len(mempoolOps) <= *lastSeenCount { 197 return nil 198 } 199 200 for i := *lastSeenCount; i < len(mempoolOps); i++ { 201 recordNum := bundleRecordBase + i 202 if recordNum < startCursor { 203 continue 204 } 205 206 select { 207 case <-done: 208 return nil 209 default: 210 } 211 212 if err := sendOperation(conn, mempoolOps[i]); err != nil { 213 return err 214 } 215 *currentRecord++ 216 } 217 218 *lastSeenCount = len(mempoolOps) 219 return nil 220} 221 222func sendOperation(conn *websocket.Conn, op plcclient.PLCOperation) error { 223 var data []byte 224 var err error 225 226 if len(op.RawJSON) > 0 { 227 data = op.RawJSON 228 } else { 229 data, err = json.Marshal(op) 230 if err != nil { 231 return nil 232 } 233 } 234 235 if err := conn.WriteMessage(websocket.TextMessage, data); err != nil { 236 if !websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) { 237 fmt.Fprintf(os.Stderr, "WebSocket write error: %v\n", err) 238 } 239 return err 240 } 241 242 return nil 243}