+93
knotserver/events.go
+93
knotserver/events.go
···
1
+
package knotserver
2
+
3
+
import (
4
+
"context"
5
+
"net/http"
6
+
"time"
7
+
8
+
"github.com/gorilla/websocket"
9
+
)
10
+
11
+
var upgrader = websocket.Upgrader{
12
+
ReadBufferSize: 1024,
13
+
WriteBufferSize: 1024,
14
+
}
15
+
16
+
func (h *Handle) OpLog(w http.ResponseWriter, r *http.Request) {
17
+
l := h.l.With("handler", "OpLog")
18
+
l.Info("received new connection")
19
+
20
+
conn, err := upgrader.Upgrade(w, r, nil)
21
+
if err != nil {
22
+
l.Error("websocket upgrade failed", "err", err)
23
+
w.WriteHeader(http.StatusInternalServerError)
24
+
return
25
+
}
26
+
defer conn.Close()
27
+
l.Info("upgraded http to wss")
28
+
29
+
ch := h.n.Subscribe()
30
+
defer h.n.Unsubscribe(ch)
31
+
32
+
ctx, cancel := context.WithCancel(r.Context())
33
+
defer cancel()
34
+
go func() {
35
+
for {
36
+
if _, _, err := conn.NextReader(); err != nil {
37
+
l.Error("failed to read", "err", err)
38
+
cancel()
39
+
return
40
+
}
41
+
}
42
+
}()
43
+
44
+
cursor := ""
45
+
46
+
// complete backfill first before going to live data
47
+
l.Info("going through backfill", "cursor", cursor)
48
+
if err := h.streamOps(conn, &cursor); err != nil {
49
+
l.Error("failed to backfill", "err", err)
50
+
return
51
+
}
52
+
53
+
for {
54
+
// wait for new data or timeout
55
+
select {
56
+
case <-ctx.Done():
57
+
l.Info("stopping stream: client closed connection")
58
+
return
59
+
case <-ch:
60
+
// we have been notified of new data
61
+
l.Info("going through live data", "cursor", cursor)
62
+
if err := h.streamOps(conn, &cursor); err != nil {
63
+
l.Error("failed to stream", "err", err)
64
+
return
65
+
}
66
+
case <-time.After(30 * time.Second):
67
+
// send a keep-alive
68
+
l.Info("sent keepalive")
69
+
if err = conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil {
70
+
l.Error("failed to write control", "err", err)
71
+
}
72
+
}
73
+
}
74
+
}
75
+
76
+
func (h *Handle) streamOps(conn *websocket.Conn, cursor *string) error {
77
+
ops, err := h.db.GetOps(*cursor)
78
+
if err != nil {
79
+
h.l.Debug("err", "err", err)
80
+
return err
81
+
}
82
+
h.l.Debug("ops", "ops", ops)
83
+
84
+
for _, op := range ops {
85
+
if err := conn.WriteJSON(op); err != nil {
86
+
h.l.Debug("err", "err", err)
87
+
return err
88
+
}
89
+
*cursor = op.Tid
90
+
}
91
+
92
+
return nil
93
+
}