Mirror of @tangled.org/core. Running on a Raspberry Pi Zero 2 (Please be gentle).
at HEAD 149 lines 3.7 kB view raw
1package knotserver 2 3import ( 4 "context" 5 "encoding/json" 6 "net/http" 7 "strconv" 8 "time" 9 10 "github.com/bluesky-social/indigo/xrpc" 11 "github.com/gorilla/websocket" 12 "tangled.org/core/api/tangled" 13 "tangled.org/core/log" 14) 15 16var upgrader = websocket.Upgrader{ 17 ReadBufferSize: 1024, 18 WriteBufferSize: 1024, 19} 20 21func (h *Knot) Events(w http.ResponseWriter, r *http.Request) { 22 l := log.SubLogger(h.l, "eventstream") 23 l.Debug("received new connection") 24 25 conn, err := upgrader.Upgrade(w, r, nil) 26 if err != nil { 27 l.Error("websocket upgrade failed", "err", err) 28 w.WriteHeader(http.StatusInternalServerError) 29 return 30 } 31 defer conn.Close() 32 l.Debug("upgraded http to wss") 33 34 ch := h.n.Subscribe() 35 defer h.n.Unsubscribe(ch) 36 37 ctx, cancel := context.WithCancel(r.Context()) 38 defer cancel() 39 go func() { 40 for { 41 if _, _, err := conn.NextReader(); err != nil { 42 l.Error("failed to read", "err", err) 43 cancel() 44 return 45 } 46 } 47 }() 48 49 defaultCursor := time.Now().UnixNano() 50 cursorStr := r.URL.Query().Get("cursor") 51 cursor, err := strconv.ParseInt(cursorStr, 10, 64) 52 if err != nil { 53 l.Error("empty or invalid cursor", "invalidCursor", cursorStr, "default", defaultCursor) 54 } 55 if cursor == 0 { 56 cursor = defaultCursor 57 } 58 59 // complete backfill first before going to live data 60 l.Debug("going through backfill", "cursor", cursor) 61 if err := h.streamOps(conn, &cursor); err != nil { 62 l.Error("failed to backfill", "err", err) 63 return 64 } 65 66 // try request crawl when connection closed 67 defer func() { 68 go func() { 69 retryCtx, retryCancel := context.WithTimeout(context.Background(), 10*time.Second) 70 defer retryCancel() 71 if err := h.requestCrawl(retryCtx); err != nil { 72 l.Error("error requesting crawls", "err", err) 73 } 74 }() 75 }() 76 77 for { 78 // wait for new data or timeout 79 select { 80 case <-ctx.Done(): 81 l.Debug("stopping stream: client closed connection") 82 return 83 case <-ch: 84 // we have been notified of new data 85 l.Debug("going through live data", "cursor", cursor) 86 if err := h.streamOps(conn, &cursor); err != nil { 87 l.Error("failed to stream", "err", err) 88 return 89 } 90 case <-time.After(30 * time.Second): 91 // send a keep-alive 92 if err = conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil { 93 l.Error("failed to write control", "err", err) 94 } 95 } 96 } 97} 98 99func (h *Knot) streamOps(conn *websocket.Conn, cursor *int64) error { 100 events, err := h.db.GetEvents(*cursor) 101 if err != nil { 102 h.l.Error("failed to fetch events from db", "err", err, "cursor", cursor) 103 return err 104 } 105 106 for _, event := range events { 107 // first extract the inner json into a map 108 var eventJson map[string]any 109 err := json.Unmarshal([]byte(event.EventJson), &eventJson) 110 if err != nil { 111 h.l.Error("failed to unmarshal event", "err", err) 112 return err 113 } 114 115 jsonMsg, err := json.Marshal(map[string]any{ 116 "rkey": event.Rkey, 117 "nsid": event.Nsid, 118 "event": eventJson, 119 }) 120 if err != nil { 121 h.l.Error("failed to marshal record", "err", err) 122 return err 123 } 124 125 if err := conn.WriteMessage(websocket.TextMessage, jsonMsg); err != nil { 126 h.l.Debug("err", "err", err) 127 return err 128 } 129 *cursor = event.Created 130 } 131 132 return nil 133} 134 135func (h *Knot) requestCrawl(ctx context.Context) error { 136 h.l.Info("requesting crawl", "mirrors", h.c.KnotMirrors) 137 input := &tangled.SyncRequestCrawl_Input{ 138 Hostname: h.c.Server.Hostname, 139 } 140 for _, knotmirror := range h.c.KnotMirrors { 141 xrpcc := xrpc.Client{Host: knotmirror} 142 if err := tangled.SyncRequestCrawl(ctx, &xrpcc, input); err != nil { 143 h.l.Error("error requesting crawl", "err", err) 144 } else { 145 h.l.Info("crawl requested successfully") 146 } 147 } 148 return nil 149}