Monorepo for Tangled — https://tangled.org
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at master 161 lines 3.9 kB view raw
1package knotserver 2 3import ( 4 "context" 5 "encoding/json" 6 "net/http" 7 "strconv" 8 "time" 9 10 "github.com/bluesky-social/indigo/xrpc" 11 "github.com/gorilla/websocket" 12 "tangled.org/core/api/tangled" 13 "tangled.org/core/log" 14) 15 16var upgrader = websocket.Upgrader{ 17 ReadBufferSize: 1024, 18 WriteBufferSize: 1024, 19} 20 21func (h *Knot) Events(w http.ResponseWriter, r *http.Request) { 22 l := log.SubLogger(h.l, "eventstream") 23 l.Debug("received new connection") 24 25 conn, err := upgrader.Upgrade(w, r, nil) 26 if err != nil { 27 l.Error("websocket upgrade failed", "err", err) 28 w.WriteHeader(http.StatusInternalServerError) 29 return 30 } 31 defer conn.Close() 32 l.Debug("upgraded http to wss") 33 34 ch := h.n.Subscribe() 35 defer h.n.Unsubscribe(ch) 36 37 ctx, cancel := context.WithCancel(r.Context()) 38 defer cancel() 39 go func() { 40 for { 41 if _, _, err := conn.NextReader(); err != nil { 42 l.Error("failed to read", "err", err) 43 cancel() 44 return 45 } 46 } 47 }() 48 49 var cursor int64 50 cursorStr := r.URL.Query().Get("cursor") 51 if cursorStr != "" { 52 cursor, err = strconv.ParseInt(cursorStr, 10, 64) 53 if err != nil { 54 l.Error("invalid cursor, starting from beginning", "invalidCursor", cursorStr) 55 cursor = 0 56 } 57 } 58 59 l.Debug("going through backfill", "cursor", cursor) 60 if err := h.drainBackfill(conn, &cursor, 10_000); err != nil { 61 l.Error("failed to backfill", "err", err) 62 return 63 } 64 65 // try request crawl when connection closed 66 defer func() { 67 go func() { 68 retryCtx, retryCancel := context.WithTimeout(context.Background(), 10*time.Second) 69 defer retryCancel() 70 if err := h.requestCrawl(retryCtx); err != nil { 71 l.Error("error requesting crawls", "err", err) 72 } 73 }() 74 }() 75 76 for { 77 // wait for new data or timeout 78 select { 79 case <-ctx.Done(): 80 l.Debug("stopping stream: client closed connection") 81 return 82 case <-ch: 83 l.Debug("going through live data", "cursor", cursor) 84 if _, err := h.streamOps(conn, &cursor); err != nil { 85 l.Error("failed to stream", "err", err) 86 return 87 } 88 case <-time.After(30 * time.Second): 89 // send a keep-alive 90 if err = conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil { 91 l.Error("failed to write control", "err", err) 92 } 93 } 94 } 95} 96 97func (h *Knot) drainBackfill(conn *websocket.Conn, cursor *int64, maxBatches int) error { 98 for range maxBatches { 99 n, err := h.streamOps(conn, cursor) 100 if err != nil { 101 return err 102 } 103 if n < 100 { 104 return nil 105 } 106 } 107 h.l.Warn("backfill hit batch limit", "maxBatches", maxBatches, "cursor", *cursor) 108 return nil 109} 110 111func (h *Knot) streamOps(conn *websocket.Conn, cursor *int64) (int, error) { 112 events, err := h.db.GetEvents(*cursor) 113 if err != nil { 114 h.l.Error("failed to fetch events from db", "err", err, "cursor", cursor) 115 return 0, err 116 } 117 118 for _, event := range events { 119 var eventJson map[string]any 120 err := json.Unmarshal([]byte(event.EventJson), &eventJson) 121 if err != nil { 122 h.l.Error("failed to unmarshal event", "err", err) 123 return 0, err 124 } 125 126 jsonMsg, err := json.Marshal(map[string]any{ 127 "rkey": event.Rkey, 128 "nsid": event.Nsid, 129 "event": eventJson, 130 "created": event.Created, 131 }) 132 if err != nil { 133 h.l.Error("failed to marshal record", "err", err) 134 return 0, err 135 } 136 137 if err := conn.WriteMessage(websocket.TextMessage, jsonMsg); err != nil { 138 h.l.Debug("err", "err", err) 139 return 0, err 140 } 141 *cursor = event.Created 142 } 143 144 return len(events), nil 145} 146 147func (h *Knot) requestCrawl(ctx context.Context) error { 148 h.l.Info("requesting crawl", "mirrors", h.c.KnotMirrors) 149 input := &tangled.SyncRequestCrawl_Input{ 150 Hostname: h.c.Server.Hostname, 151 } 152 for _, knotmirror := range h.c.KnotMirrors { 153 xrpcc := xrpc.Client{Host: knotmirror} 154 if err := tangled.SyncRequestCrawl(ctx, &xrpcc, input); err != nil { 155 h.l.Error("error requesting crawl", "err", err) 156 } else { 157 h.l.Info("crawl requested successfully") 158 } 159 } 160 return nil 161}