auto-reconnecting jetstream proxy

add support for wantedCollections

+98 -9
+98 -9
main.go
··· 2 2 3 3 import ( 4 4 "context" 5 + "encoding/json" 5 6 "fmt" 6 7 "log/slog" 7 8 "net/http" ··· 24 25 // want yours here? contact me 25 26 } 26 27 28 + // Event represents a Jetstream event 29 + type Event struct { 30 + Did string `json:"did"` 31 + TimeUS int64 `json:"time_us"` 32 + Kind string `json:"kind,omitempty"` 33 + Commit *Commit `json:"commit,omitempty"` 34 + } 35 + 36 + // Commit represents a commit event 37 + type Commit struct { 38 + Rev string `json:"rev,omitempty"` 39 + Operation string `json:"operation,omitempty"` 40 + Collection string `json:"collection,omitempty"` 41 + RKey string `json:"rkey,omitempty"` 42 + Record json.RawMessage `json:"record,omitempty"` 43 + CID string `json:"cid,omitempty"` 44 + } 45 + 46 + // Message wraps a Jetstream event with both parsed and raw forms 47 + type Message struct { 48 + Event *Event 49 + Raw []byte 50 + } 51 + 27 52 // Broadcaster manages subscribers to Jetstream events 28 53 type Broadcaster struct { 29 - listeners []chan []byte 54 + listeners []chan *Message 30 55 mu sync.Mutex 31 56 connected atomic.Bool 32 57 lastMessageTime atomic.Int64 // Unix timestamp in seconds 33 58 } 34 59 35 60 // Subscribe returns a new channel that will receive Jetstream events 36 - func (b *Broadcaster) Subscribe() chan []byte { 61 + func (b *Broadcaster) Subscribe() chan *Message { 37 62 b.mu.Lock() 38 63 defer b.mu.Unlock() 39 64 40 65 // firehose can be more-than-1k events per second, 41 66 // prefer to create a large buffer for the subscribers 42 - ch := make(chan []byte, 10000) 67 + ch := make(chan *Message, 10000) 43 68 b.listeners = append(b.listeners, ch) 44 69 return ch 45 70 } 46 71 47 - func (b *Broadcaster) Unsubscribe(ch chan []byte) { 72 + func (b *Broadcaster) Unsubscribe(ch chan *Message) { 48 73 b.mu.Lock() 49 74 defer b.mu.Unlock() 50 75 ··· 57 82 } 58 83 } 59 84 60 - func (b *Broadcaster) Broadcast(message []byte) { 85 + func (b *Broadcaster) Broadcast(rawMessage []byte) { 61 86 b.lastMessageTime.Store(time.Now().Unix()) 62 87 88 + // Parse the event once 89 + var event Event 90 + if err := json.Unmarshal(rawMessage, &event); err != nil { 91 + slog.Debug("Failed to parse event", slog.Any("error", err)) 92 + // Broadcast anyway with nil event 93 + } 94 + 95 + msg := &Message{ 96 + Event: &event, 97 + Raw: rawMessage, 98 + } 99 + 63 100 b.mu.Lock() 64 101 defer b.mu.Unlock() 65 102 66 103 for _, ch := range b.listeners { 67 104 select { 68 - case ch <- message: 105 + case ch <- msg: 69 106 // event sent successfully. we don't want to block 70 107 default: 71 108 // channel full, skip to avoid blocking ··· 120 157 } 121 158 } 122 159 160 + // matchesCollection checks if an event matches any of the wanted collections 161 + func matchesCollection(event *Event, wantedCollections []string) bool { 162 + // Always pass through account and identity events 163 + if event.Kind == "account" || event.Kind == "identity" { 164 + return true 165 + } 166 + 167 + // If no wanted collections specified, pass everything 168 + if len(wantedCollections) == 0 { 169 + return true 170 + } 171 + 172 + // For commit events, check the collection 173 + if event.Commit == nil { 174 + return false 175 + } 176 + 177 + collection := event.Commit.Collection 178 + for _, wanted := range wantedCollections { 179 + // Support wildcard matching like "app.bsky.graph.*" 180 + if strings.HasSuffix(wanted, ".*") { 181 + prefix := strings.TrimSuffix(wanted, ".*") 182 + if strings.HasPrefix(collection, prefix+".") || collection == prefix { 183 + return true 184 + } 185 + } else if collection == wanted { 186 + return true 187 + } 188 + } 189 + 190 + return false 191 + } 192 + 123 193 // handleSubscribe upgrades HTTP connection to websocket and streams events 124 194 func handleSubscribe(broadcaster *Broadcaster) http.HandlerFunc { 125 195 return func(w http.ResponseWriter, r *http.Request) { ··· 130 200 } 131 201 defer conn.Close() 132 202 203 + // Parse wantedCollections from query params 204 + wantedCollections := r.URL.Query()["wantedCollections"] 205 + if len(wantedCollections) > 100 { 206 + slog.Warn("Client requested too many collections, limiting to 100", slog.Int("requested", len(wantedCollections))) 207 + wantedCollections = wantedCollections[:100] 208 + } 209 + 133 210 // Subscribe to broadcaster 134 211 ch := broadcaster.Subscribe() 135 212 defer broadcaster.Unsubscribe(ch) 136 213 137 - slog.Info("Client connected", slog.String("remote", r.RemoteAddr)) 214 + if len(wantedCollections) > 0 { 215 + slog.Info("Client connected", slog.String("remote", r.RemoteAddr), slog.Any("wantedCollections", wantedCollections)) 216 + } else { 217 + slog.Info("Client connected", slog.String("remote", r.RemoteAddr)) 218 + } 138 219 139 220 // Stream events to client 140 - for message := range ch { 141 - err := conn.WriteMessage(websocket.TextMessage, message) 221 + for msg := range ch { 222 + // If filtering is enabled, check the event 223 + if len(wantedCollections) > 0 && msg.Event != nil { 224 + // Check if event matches wanted collections 225 + if !matchesCollection(msg.Event, wantedCollections) { 226 + continue 227 + } 228 + } 229 + 230 + err := conn.WriteMessage(websocket.TextMessage, msg.Raw) 142 231 if err != nil { 143 232 slog.Debug("Client disconnected", slog.String("remote", r.RemoteAddr), slog.Any("error", err)) 144 233 break