forked from
tangled.org/core
Monorepo for Tangled — https://tangled.org
1package knotserver
2
3import (
4 "context"
5 "encoding/json"
6 "net/http"
7 "strconv"
8 "time"
9
10 "github.com/bluesky-social/indigo/xrpc"
11 "github.com/gorilla/websocket"
12 "tangled.org/core/api/tangled"
13 "tangled.org/core/log"
14)
15
16var upgrader = websocket.Upgrader{
17 ReadBufferSize: 1024,
18 WriteBufferSize: 1024,
19}
20
21func (h *Knot) Events(w http.ResponseWriter, r *http.Request) {
22 l := log.SubLogger(h.l, "eventstream")
23 l.Debug("received new connection")
24
25 conn, err := upgrader.Upgrade(w, r, nil)
26 if err != nil {
27 l.Error("websocket upgrade failed", "err", err)
28 w.WriteHeader(http.StatusInternalServerError)
29 return
30 }
31 defer conn.Close()
32 l.Debug("upgraded http to wss")
33
34 ch := h.n.Subscribe()
35 defer h.n.Unsubscribe(ch)
36
37 ctx, cancel := context.WithCancel(r.Context())
38 defer cancel()
39 go func() {
40 for {
41 if _, _, err := conn.NextReader(); err != nil {
42 l.Error("failed to read", "err", err)
43 cancel()
44 return
45 }
46 }
47 }()
48
49 var cursor int64
50 cursorStr := r.URL.Query().Get("cursor")
51 if cursorStr != "" {
52 cursor, err = strconv.ParseInt(cursorStr, 10, 64)
53 if err != nil {
54 l.Error("invalid cursor, starting from beginning", "invalidCursor", cursorStr)
55 cursor = 0
56 }
57 }
58
59 l.Debug("going through backfill", "cursor", cursor)
60 if err := h.drainBackfill(conn, &cursor, 10_000); err != nil {
61 l.Error("failed to backfill", "err", err)
62 return
63 }
64
65 // try request crawl when connection closed
66 defer func() {
67 go func() {
68 retryCtx, retryCancel := context.WithTimeout(context.Background(), 10*time.Second)
69 defer retryCancel()
70 if err := h.requestCrawl(retryCtx); err != nil {
71 l.Error("error requesting crawls", "err", err)
72 }
73 }()
74 }()
75
76 for {
77 // wait for new data or timeout
78 select {
79 case <-ctx.Done():
80 l.Debug("stopping stream: client closed connection")
81 return
82 case <-ch:
83 l.Debug("going through live data", "cursor", cursor)
84 if _, err := h.streamOps(conn, &cursor); err != nil {
85 l.Error("failed to stream", "err", err)
86 return
87 }
88 case <-time.After(30 * time.Second):
89 // send a keep-alive
90 if err = conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil {
91 l.Error("failed to write control", "err", err)
92 }
93 }
94 }
95}
96
97func (h *Knot) drainBackfill(conn *websocket.Conn, cursor *int64, maxBatches int) error {
98 for range maxBatches {
99 n, err := h.streamOps(conn, cursor)
100 if err != nil {
101 return err
102 }
103 if n < 100 {
104 return nil
105 }
106 }
107 h.l.Warn("backfill hit batch limit", "maxBatches", maxBatches, "cursor", *cursor)
108 return nil
109}
110
111func (h *Knot) streamOps(conn *websocket.Conn, cursor *int64) (int, error) {
112 events, err := h.db.GetEvents(*cursor)
113 if err != nil {
114 h.l.Error("failed to fetch events from db", "err", err, "cursor", cursor)
115 return 0, err
116 }
117
118 for _, event := range events {
119 var eventJson map[string]any
120 err := json.Unmarshal([]byte(event.EventJson), &eventJson)
121 if err != nil {
122 h.l.Error("failed to unmarshal event", "err", err)
123 return 0, err
124 }
125
126 jsonMsg, err := json.Marshal(map[string]any{
127 "rkey": event.Rkey,
128 "nsid": event.Nsid,
129 "event": eventJson,
130 "created": event.Created,
131 })
132 if err != nil {
133 h.l.Error("failed to marshal record", "err", err)
134 return 0, err
135 }
136
137 if err := conn.WriteMessage(websocket.TextMessage, jsonMsg); err != nil {
138 h.l.Debug("err", "err", err)
139 return 0, err
140 }
141 *cursor = event.Created
142 }
143
144 return len(events), nil
145}
146
147func (h *Knot) requestCrawl(ctx context.Context) error {
148 h.l.Info("requesting crawl", "mirrors", h.c.KnotMirrors)
149 input := &tangled.SyncRequestCrawl_Input{
150 Hostname: h.c.Server.Hostname,
151 }
152 for _, knotmirror := range h.c.KnotMirrors {
153 xrpcc := xrpc.Client{Host: knotmirror}
154 if err := tangled.SyncRequestCrawl(ctx, &xrpcc, input); err != nil {
155 h.l.Error("error requesting crawl", "err", err)
156 } else {
157 h.l.Info("crawl requested successfully")
158 }
159 }
160 return nil
161}