forked from
tangled.org/core
fork
Configure Feed
Select the types of activity you want to include in your feed.
Monorepo for Tangled
fork
Configure Feed
Select the types of activity you want to include in your feed.
1package knotserver
2
3import (
4 "context"
5 "encoding/json"
6 "net/http"
7 "strconv"
8 "time"
9
10 "github.com/gorilla/websocket"
11)
12
13var upgrader = websocket.Upgrader{
14 ReadBufferSize: 1024,
15 WriteBufferSize: 1024,
16}
17
18func (h *Handle) Events(w http.ResponseWriter, r *http.Request) {
19 l := h.l.With("handler", "OpLog")
20 l.Debug("received new connection")
21
22 conn, err := upgrader.Upgrade(w, r, nil)
23 if err != nil {
24 l.Error("websocket upgrade failed", "err", err)
25 w.WriteHeader(http.StatusInternalServerError)
26 return
27 }
28 defer conn.Close()
29 l.Debug("upgraded http to wss")
30
31 ch := h.n.Subscribe()
32 defer h.n.Unsubscribe(ch)
33
34 ctx, cancel := context.WithCancel(r.Context())
35 defer cancel()
36 go func() {
37 for {
38 if _, _, err := conn.NextReader(); err != nil {
39 l.Error("failed to read", "err", err)
40 cancel()
41 return
42 }
43 }
44 }()
45
46 defaultCursor := time.Now().UnixNano()
47 cursorStr := r.URL.Query().Get("cursor")
48 cursor, err := strconv.ParseInt(cursorStr, 10, 64)
49 if err != nil {
50 l.Error("empty or invalid cursor", "invalidCursor", cursorStr, "default", defaultCursor)
51 }
52 if cursor == 0 {
53 cursor = defaultCursor
54 }
55
56 // complete backfill first before going to live data
57 l.Debug("going through backfill", "cursor", cursor)
58 if err := h.streamOps(conn, &cursor); err != nil {
59 l.Error("failed to backfill", "err", err)
60 return
61 }
62
63 for {
64 // wait for new data or timeout
65 select {
66 case <-ctx.Done():
67 l.Debug("stopping stream: client closed connection")
68 return
69 case <-ch:
70 // we have been notified of new data
71 l.Debug("going through live data", "cursor", cursor)
72 if err := h.streamOps(conn, &cursor); err != nil {
73 l.Error("failed to stream", "err", err)
74 return
75 }
76 case <-time.After(30 * time.Second):
77 // send a keep-alive
78 l.Debug("sent keepalive")
79 if err = conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil {
80 l.Error("failed to write control", "err", err)
81 }
82 }
83 }
84}
85
86func (h *Handle) streamOps(conn *websocket.Conn, cursor *int64) error {
87 events, err := h.db.GetEvents(*cursor)
88 if err != nil {
89 h.l.Error("failed to fetch events from db", "err", err, "cursor", cursor)
90 return err
91 }
92 h.l.Debug("ops", "ops", events)
93
94 for _, event := range events {
95 // first extract the inner json into a map
96 var eventJson map[string]any
97 err := json.Unmarshal([]byte(event.EventJson), &eventJson)
98 if err != nil {
99 h.l.Error("failed to unmarshal event", "err", err)
100 return err
101 }
102
103 jsonMsg, err := json.Marshal(map[string]any{
104 "rkey": event.Rkey,
105 "nsid": event.Nsid,
106 "event": eventJson,
107 })
108 if err != nil {
109 h.l.Error("failed to marshal record", "err", err)
110 return err
111 }
112
113 if err := conn.WriteMessage(websocket.TextMessage, jsonMsg); err != nil {
114 h.l.Debug("err", "err", err)
115 return err
116 }
117 *cursor = event.Created
118 }
119
120 return nil
121}