+6
-6
knotserver/events.go
+6
-6
knotserver/events.go
···
16
17
func (h *Handle) Events(w http.ResponseWriter, r *http.Request) {
18
l := h.l.With("handler", "OpLog")
19
-
l.Info("received new connection")
20
21
conn, err := upgrader.Upgrade(w, r, nil)
22
if err != nil {
···
25
return
26
}
27
defer conn.Close()
28
-
l.Info("upgraded http to wss")
29
30
ch := h.n.Subscribe()
31
defer h.n.Unsubscribe(ch)
···
45
cursor := r.URL.Query().Get("cursor")
46
47
// complete backfill first before going to live data
48
-
l.Info("going through backfill", "cursor", cursor)
49
if err := h.streamOps(conn, &cursor); err != nil {
50
l.Error("failed to backfill", "err", err)
51
return
···
55
// wait for new data or timeout
56
select {
57
case <-ctx.Done():
58
-
l.Info("stopping stream: client closed connection")
59
return
60
case <-ch:
61
// we have been notified of new data
62
-
l.Info("going through live data", "cursor", cursor)
63
if err := h.streamOps(conn, &cursor); err != nil {
64
l.Error("failed to stream", "err", err)
65
return
66
}
67
case <-time.After(30 * time.Second):
68
// send a keep-alive
69
-
l.Info("sent keepalive")
70
if err = conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil {
71
l.Error("failed to write control", "err", err)
72
}
···
16
17
func (h *Handle) Events(w http.ResponseWriter, r *http.Request) {
18
l := h.l.With("handler", "OpLog")
19
+
l.Debug("received new connection")
20
21
conn, err := upgrader.Upgrade(w, r, nil)
22
if err != nil {
···
25
return
26
}
27
defer conn.Close()
28
+
l.Debug("upgraded http to wss")
29
30
ch := h.n.Subscribe()
31
defer h.n.Unsubscribe(ch)
···
45
cursor := r.URL.Query().Get("cursor")
46
47
// complete backfill first before going to live data
48
+
l.Debug("going through backfill", "cursor", cursor)
49
if err := h.streamOps(conn, &cursor); err != nil {
50
l.Error("failed to backfill", "err", err)
51
return
···
55
// wait for new data or timeout
56
select {
57
case <-ctx.Done():
58
+
l.Debug("stopping stream: client closed connection")
59
return
60
case <-ch:
61
// we have been notified of new data
62
+
l.Debug("going through live data", "cursor", cursor)
63
if err := h.streamOps(conn, &cursor); err != nil {
64
l.Error("failed to stream", "err", err)
65
return
66
}
67
case <-time.After(30 * time.Second):
68
// send a keep-alive
69
+
l.Debug("sent keepalive")
70
if err = conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil {
71
l.Error("failed to write control", "err", err)
72
}