forked from
tangled.org/core
Mirror of @tangled.org/core. Running on a Raspberry Pi Zero 2 (Please be gentle).
1package knotserver
2
3import (
4 "context"
5 "encoding/json"
6 "net/http"
7 "strconv"
8 "time"
9
10 "github.com/bluesky-social/indigo/xrpc"
11 "github.com/gorilla/websocket"
12 "tangled.org/core/api/tangled"
13 "tangled.org/core/log"
14)
15
16var upgrader = websocket.Upgrader{
17 ReadBufferSize: 1024,
18 WriteBufferSize: 1024,
19}
20
21func (h *Knot) Events(w http.ResponseWriter, r *http.Request) {
22 l := log.SubLogger(h.l, "eventstream")
23 l.Debug("received new connection")
24
25 conn, err := upgrader.Upgrade(w, r, nil)
26 if err != nil {
27 l.Error("websocket upgrade failed", "err", err)
28 w.WriteHeader(http.StatusInternalServerError)
29 return
30 }
31 defer conn.Close()
32 l.Debug("upgraded http to wss")
33
34 ch := h.n.Subscribe()
35 defer h.n.Unsubscribe(ch)
36
37 ctx, cancel := context.WithCancel(r.Context())
38 defer cancel()
39 go func() {
40 for {
41 if _, _, err := conn.NextReader(); err != nil {
42 l.Error("failed to read", "err", err)
43 cancel()
44 return
45 }
46 }
47 }()
48
49 defaultCursor := time.Now().UnixNano()
50 cursorStr := r.URL.Query().Get("cursor")
51 cursor, err := strconv.ParseInt(cursorStr, 10, 64)
52 if err != nil {
53 l.Error("empty or invalid cursor", "invalidCursor", cursorStr, "default", defaultCursor)
54 }
55 if cursor == 0 {
56 cursor = defaultCursor
57 }
58
59 // complete backfill first before going to live data
60 l.Debug("going through backfill", "cursor", cursor)
61 if err := h.streamOps(conn, &cursor); err != nil {
62 l.Error("failed to backfill", "err", err)
63 return
64 }
65
66 // try request crawl when connection closed
67 defer func() {
68 go func() {
69 retryCtx, retryCancel := context.WithTimeout(context.Background(), 10*time.Second)
70 defer retryCancel()
71 if err := h.requestCrawl(retryCtx); err != nil {
72 l.Error("error requesting crawls", "err", err)
73 }
74 }()
75 }()
76
77 for {
78 // wait for new data or timeout
79 select {
80 case <-ctx.Done():
81 l.Debug("stopping stream: client closed connection")
82 return
83 case <-ch:
84 // we have been notified of new data
85 l.Debug("going through live data", "cursor", cursor)
86 if err := h.streamOps(conn, &cursor); err != nil {
87 l.Error("failed to stream", "err", err)
88 return
89 }
90 case <-time.After(30 * time.Second):
91 // send a keep-alive
92 if err = conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil {
93 l.Error("failed to write control", "err", err)
94 }
95 }
96 }
97}
98
99func (h *Knot) streamOps(conn *websocket.Conn, cursor *int64) error {
100 events, err := h.db.GetEvents(*cursor)
101 if err != nil {
102 h.l.Error("failed to fetch events from db", "err", err, "cursor", cursor)
103 return err
104 }
105
106 for _, event := range events {
107 // first extract the inner json into a map
108 var eventJson map[string]any
109 err := json.Unmarshal([]byte(event.EventJson), &eventJson)
110 if err != nil {
111 h.l.Error("failed to unmarshal event", "err", err)
112 return err
113 }
114
115 jsonMsg, err := json.Marshal(map[string]any{
116 "rkey": event.Rkey,
117 "nsid": event.Nsid,
118 "event": eventJson,
119 })
120 if err != nil {
121 h.l.Error("failed to marshal record", "err", err)
122 return err
123 }
124
125 if err := conn.WriteMessage(websocket.TextMessage, jsonMsg); err != nil {
126 h.l.Debug("err", "err", err)
127 return err
128 }
129 *cursor = event.Created
130 }
131
132 return nil
133}
134
135func (h *Knot) requestCrawl(ctx context.Context) error {
136 h.l.Info("requesting crawl", "mirrors", h.c.KnotMirrors)
137 input := &tangled.SyncRequestCrawl_Input{
138 Hostname: h.c.Server.Hostname,
139 }
140 for _, knotmirror := range h.c.KnotMirrors {
141 xrpcc := xrpc.Client{Host: knotmirror}
142 if err := tangled.SyncRequestCrawl(ctx, &xrpcc, input); err != nil {
143 h.l.Error("error requesting crawl", "err", err)
144 } else {
145 h.l.Info("crawl requested successfully")
146 }
147 }
148 return nil
149}