forked from
tangled.org/core
fork
Configure Feed
Select the types of activity you want to include in your feed.
this repo has no description
fork
Configure Feed
Select the types of activity you want to include in your feed.
1package knotserver
2
3import (
4 "context"
5 "encoding/json"
6 "net/http"
7 "strconv"
8 "time"
9
10 "github.com/gorilla/websocket"
11)
12
13var upgrader = websocket.Upgrader{
14 ReadBufferSize: 1024,
15 WriteBufferSize: 1024,
16}
17
18func (h *Handle) Events(w http.ResponseWriter, r *http.Request) {
19 l := h.l.With("handler", "OpLog")
20 l.Debug("received new connection")
21
22 conn, err := upgrader.Upgrade(w, r, nil)
23 if err != nil {
24 l.Error("websocket upgrade failed", "err", err)
25 w.WriteHeader(http.StatusInternalServerError)
26 return
27 }
28 defer conn.Close()
29 l.Debug("upgraded http to wss")
30
31 ch := h.n.Subscribe()
32 defer h.n.Unsubscribe(ch)
33
34 ctx, cancel := context.WithCancel(r.Context())
35 defer cancel()
36 go func() {
37 for {
38 if _, _, err := conn.NextReader(); err != nil {
39 l.Error("failed to read", "err", err)
40 cancel()
41 return
42 }
43 }
44 }()
45
46 cursorStr := r.URL.Query().Get("cursor")
47 cursor, err := strconv.ParseInt(cursorStr, 10, 64)
48 if err != nil {
49 l.Error("empty or invalid cursor, defaulting to zero", "invalidCursor", cursorStr)
50 }
51
52 // complete backfill first before going to live data
53 l.Info("going through backfill", "cursor", cursor)
54 l.Debug("going through backfill", "cursor", cursor)
55 if err := h.streamOps(conn, &cursor); err != nil {
56 l.Error("failed to backfill", "err", err)
57 return
58 }
59 for {
60 // wait for new data or timeout
61 select {
62 case <-ctx.Done():
63 l.Debug("stopping stream: client closed connection")
64 return
65 case <-ch:
66 // we have been notified of new data
67 l.Debug("going through live data", "cursor", cursor)
68 if err := h.streamOps(conn, &cursor); err != nil {
69 l.Error("failed to stream", "err", err)
70 return
71 }
72 case <-time.After(30 * time.Second):
73 // send a keep-alive
74 l.Debug("sent keepalive")
75 if err = conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil {
76 l.Error("failed to write control", "err", err)
77 }
78 }
79 }
80}
81
82func (h *Handle) streamOps(conn *websocket.Conn, cursor *int64) error {
83 events, err := h.db.GetEvents(*cursor)
84 if err != nil {
85 h.l.Error("failed to fetch events from db", "err", err, "cursor", cursor)
86 return err
87 }
88 h.l.Debug("ops", "ops", events)
89
90 for _, event := range events {
91 // first extract the inner json into a map
92 var eventJson map[string]any
93 err := json.Unmarshal([]byte(event.EventJson), &eventJson)
94 if err != nil {
95 h.l.Error("failed to unmarshal event", "err", err)
96 return err
97 }
98
99 jsonMsg, err := json.Marshal(map[string]any{
100 "rkey": event.Rkey,
101 "nsid": event.Nsid,
102 "event": eventJson,
103 })
104 if err != nil {
105 h.l.Error("failed to marshal record", "err", err)
106 return err
107 }
108
109 if err := conn.WriteMessage(websocket.TextMessage, jsonMsg); err != nil {
110 h.l.Debug("err", "err", err)
111 return err
112 }
113 *cursor = event.Created
114 }
115
116 return nil
117}