fork
Configure Feed
Select the types of activity you want to include in your feed.
Monorepo for Tangled
tangled.org
fork
Configure Feed
Select the types of activity you want to include in your feed.
1package knotserver
2
3import (
4 "context"
5 "encoding/json"
6 "net/http"
7 "strconv"
8 "time"
9
10 "github.com/gorilla/websocket"
11 "tangled.org/core/log"
12)
13
// upgrader turns incoming HTTP requests into websocket connections using
// 1024-byte read and write buffers. No CheckOrigin, Error, or subprotocol
// settings are configured here, so the library defaults apply for those.
var upgrader = websocket.Upgrader{
	ReadBufferSize:  1024,
	WriteBufferSize: 1024,
}
18
19func (h *Knot) Events(w http.ResponseWriter, r *http.Request) {
20 l := log.SubLogger(h.l, "eventstream")
21 l.Debug("received new connection")
22
23 conn, err := upgrader.Upgrade(w, r, nil)
24 if err != nil {
25 l.Error("websocket upgrade failed", "err", err)
26 w.WriteHeader(http.StatusInternalServerError)
27 return
28 }
29 defer conn.Close()
30 l.Debug("upgraded http to wss")
31
32 ch := h.n.Subscribe()
33 defer h.n.Unsubscribe(ch)
34
35 ctx, cancel := context.WithCancel(r.Context())
36 defer cancel()
37 go func() {
38 for {
39 if _, _, err := conn.NextReader(); err != nil {
40 l.Error("failed to read", "err", err)
41 cancel()
42 return
43 }
44 }
45 }()
46
47 defaultCursor := time.Now().UnixNano()
48 cursorStr := r.URL.Query().Get("cursor")
49 cursor, err := strconv.ParseInt(cursorStr, 10, 64)
50 if err != nil {
51 l.Error("empty or invalid cursor", "invalidCursor", cursorStr, "default", defaultCursor)
52 }
53 if cursor == 0 {
54 cursor = defaultCursor
55 }
56
57 // complete backfill first before going to live data
58 l.Debug("going through backfill", "cursor", cursor)
59 if err := h.streamOps(conn, &cursor); err != nil {
60 l.Error("failed to backfill", "err", err)
61 return
62 }
63
64 for {
65 // wait for new data or timeout
66 select {
67 case <-ctx.Done():
68 l.Debug("stopping stream: client closed connection")
69 return
70 case <-ch:
71 // we have been notified of new data
72 l.Debug("going through live data", "cursor", cursor)
73 if err := h.streamOps(conn, &cursor); err != nil {
74 l.Error("failed to stream", "err", err)
75 return
76 }
77 case <-time.After(30 * time.Second):
78 // send a keep-alive
79 if err = conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil {
80 l.Error("failed to write control", "err", err)
81 }
82 }
83 }
84}
85
86func (h *Knot) streamOps(conn *websocket.Conn, cursor *int64) error {
87 events, err := h.db.GetEvents(*cursor)
88 if err != nil {
89 h.l.Error("failed to fetch events from db", "err", err, "cursor", cursor)
90 return err
91 }
92
93 for _, event := range events {
94 // first extract the inner json into a map
95 var eventJson map[string]any
96 err := json.Unmarshal([]byte(event.EventJson), &eventJson)
97 if err != nil {
98 h.l.Error("failed to unmarshal event", "err", err)
99 return err
100 }
101
102 jsonMsg, err := json.Marshal(map[string]any{
103 "rkey": event.Rkey,
104 "nsid": event.Nsid,
105 "event": eventJson,
106 })
107 if err != nil {
108 h.l.Error("failed to marshal record", "err", err)
109 return err
110 }
111
112 if err := conn.WriteMessage(websocket.TextMessage, jsonMsg); err != nil {
113 h.l.Debug("err", "err", err)
114 return err
115 }
116 *cursor = event.Created
117 }
118
119 return nil
120}