tangled
alpha
login
or
join now
atscan.net
/
plcbundle
A Transparent and Verifiable Way to Sync the AT Protocol's PLC Directory
14
fork
atom
overview
issues
2
pulls
pipelines
websocket cursor from current
tree.fail
3 months ago
28580f40
b6f3ed45
1/1
build.yml
success
1min 5s
+52
-16
2 changed files
expand all
collapse all
unified
split
bundle
manager.go
cmd
plcbundle
server.go
+16
bundle/manager.go
···
1296
1296
}
1297
1297
return m.plcClient.GetBaseURL()
1298
1298
}
1299
1299
+
1300
1300
+
// GetCurrentCursor returns the current latest cursor position (including mempool)
1301
1301
+
// Cursor format: (bundleNumber × BUNDLE_SIZE) + position
1302
1302
+
func (m *Manager) GetCurrentCursor() int {
1303
1303
+
index := m.GetIndex()
1304
1304
+
bundles := index.GetBundles()
1305
1305
+
cursor := len(bundles) * BUNDLE_SIZE
1306
1306
+
1307
1307
+
// Add mempool operations to get true latest position
1308
1308
+
mempoolStats := m.GetMempoolStats()
1309
1309
+
if count, ok := mempoolStats["count"].(int); ok {
1310
1310
+
cursor += count
1311
1311
+
}
1312
1312
+
1313
1313
+
return cursor
1314
1314
+
}
+36
-16
cmd/plcbundle/server.go
···
182
182
if wsEnabled {
183
183
fmt.Fprintf(w, "\nWebSocket Endpoints\n")
184
184
fmt.Fprintf(w, "━━━━━━━━━━━━━━━━━━━\n")
185
185
-
fmt.Fprintf(w, " WS /ws?cursor=N Live stream all records from cursor N\n")
186
186
-
fmt.Fprintf(w, " Streams all bundles, then mempool\n")
187
187
-
fmt.Fprintf(w, " Continues streaming new operations live\n")
188
188
-
fmt.Fprintf(w, " Connection stays open until client closes\n")
189
189
-
fmt.Fprintf(w, " Cursor: global record number (0-based)\n")
190
190
-
fmt.Fprintf(w, " Example: 88410345 = bundle 8841, pos 345\n")
185
185
+
fmt.Fprintf(w, " WS /ws Live stream (new operations only)\n")
186
186
+
fmt.Fprintf(w, " WS /ws?cursor=0 Stream all from beginning\n")
187
187
+
fmt.Fprintf(w, " WS /ws?cursor=N Stream from cursor N\n")
188
188
+
fmt.Fprintf(w, "\n")
189
189
+
fmt.Fprintf(w, "Cursor Format:\n")
190
190
+
fmt.Fprintf(w, " Global record number: (bundleNumber × 10,000) + position\n")
191
191
+
fmt.Fprintf(w, " Example: 88410345 = bundle 8841, position 345\n")
192
192
+
fmt.Fprintf(w, " Default: starts from latest (skips all historical data)\n")
193
193
+
194
194
+
// Get current cursor
195
195
+
latestCursor := mgr.GetCurrentCursor()
196
196
+
bundledOps := len(index.GetBundles()) * bundle.BUNDLE_SIZE
197
197
+
mempoolOps := latestCursor - bundledOps
198
198
+
199
199
+
if syncMode && mempoolOps > 0 {
200
200
+
fmt.Fprintf(w, " Current latest: %d (%d bundled + %d mempool)\n",
201
201
+
latestCursor, bundledOps, mempoolOps)
202
202
+
} else {
203
203
+
fmt.Fprintf(w, " Current latest: %d (%d bundles)\n",
204
204
+
latestCursor, len(index.GetBundles()))
205
205
+
}
191
206
}
192
207
193
208
fmt.Fprintf(w, "\nExamples\n")
···
200
215
fmt.Fprintf(w, " curl %s/jsonl/1\n\n", baseURL)
201
216
202
217
if wsEnabled {
203
203
-
fmt.Fprintf(w, " # Stream all operations via WebSocket (from beginning)\n")
218
218
+
fmt.Fprintf(w, " # Stream live updates only (default)\n")
204
219
fmt.Fprintf(w, " websocat %s/ws\n\n", wsURL)
205
205
-
fmt.Fprintf(w, " # Stream from cursor 10000\n")
206
206
-
fmt.Fprintf(w, " websocat '%s/ws?cursor=10000'\n\n", wsURL)
207
207
-
fmt.Fprintf(w, " # Stream and save to file\n")
208
208
-
fmt.Fprintf(w, " websocat %s/ws > all_operations.jsonl\n\n", wsURL)
209
209
-
fmt.Fprintf(w, " # Stream with jq for pretty printing\n")
210
210
-
fmt.Fprintf(w, " websocat %s/ws | jq .\n\n", wsURL)
220
220
+
fmt.Fprintf(w, " # Stream everything from the beginning\n")
221
221
+
fmt.Fprintf(w, " websocat '%s/ws?cursor=0'\n\n", wsURL)
222
222
+
fmt.Fprintf(w, " # Stream from specific bundle (e.g., bundle 100)\n")
223
223
+
fmt.Fprintf(w, " websocat '%s/ws?cursor=1000000'\n\n", wsURL)
224
224
+
fmt.Fprintf(w, " # Resume from last position\n")
225
225
+
fmt.Fprintf(w, " websocat '%s/ws?cursor=88410345'\n\n", wsURL)
211
226
}
212
227
213
228
if syncMode {
···
321
336
// handleWebSocket streams all records via WebSocket starting from cursor
322
337
// Keeps connection alive and streams new records as they arrive
323
338
func handleWebSocket(w http.ResponseWriter, r *http.Request, mgr *bundle.Manager) {
324
324
-
// Parse cursor from query parameter (defaults to 0)
339
339
+
// Parse cursor from query parameter
325
340
cursorStr := r.URL.Query().Get("cursor")
326
326
-
cursor := 0
327
327
-
if cursorStr != "" {
341
341
+
var cursor int
342
342
+
343
343
+
if cursorStr == "" {
344
344
+
// DEFAULT: Start from latest (current state including mempool)
345
345
+
cursor = mgr.GetCurrentCursor()
346
346
+
} else {
347
347
+
// Explicit cursor provided
328
348
var err error
329
349
cursor, err = strconv.Atoi(cursorStr)
330
350
if err != nil || cursor < 0 {