auto-reconnecting jetstream proxy

mirror ws over

Changed files
+187 -5
+4 -1
go.mod
··· 2 2 3 3 go 1.25.1 4 4 5 - require github.com/bluesky-social/jetstream v0.0.0-20251009222037-7d7efa58d7f1 // indirect 5 + require ( 6 + github.com/bluesky-social/jetstream v0.0.0-20251009222037-7d7efa58d7f1 // indirect 7 + github.com/gorilla/websocket v1.5.3 // indirect 8 + )
+2
go.sum
··· 1 1 github.com/bluesky-social/jetstream v0.0.0-20251009222037-7d7efa58d7f1 h1:ovcRKN1iXZnY5WApVg+0Hw2RkwMH0ziA7lSAA8vellU= 2 2 github.com/bluesky-social/jetstream v0.0.0-20251009222037-7d7efa58d7f1/go.mod h1:5PtGi4r/PjEVBBl+0xWuQn4mBEjr9h6xsfDBADS6cHs= 3 + github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= 4 + github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
+181 -4
main.go
··· 8 8 "strings" 9 9 "sync" 10 10 "time" 11 + 12 + "github.com/gorilla/websocket" 11 13 ) 12 14 13 15 var DEFAULT_POOL = []string{ ··· 20 22 // want yours here? contact me 21 23 } 22 24 25 + // Broadcaster manages subscribers to Jetstream events 26 + type Broadcaster struct { 27 + listeners []chan []byte 28 + mu sync.Mutex 29 + } 30 + 31 + // Subscribe returns a new channel that will receive Jetstream events 32 + func (b *Broadcaster) Subscribe() chan []byte { 33 + b.mu.Lock() 34 + defer b.mu.Unlock() 35 + 36 + // firehose can be more-than-1k events per second, 37 + // prefer to create a large buffer for the subscribers 38 + ch := make(chan []byte, 10000) 39 + b.listeners = append(b.listeners, ch) 40 + return ch 41 + } 42 + 43 + func (b *Broadcaster) Unsubscribe(ch chan []byte) { 44 + b.mu.Lock() 45 + defer b.mu.Unlock() 46 + 47 + for i, listener := range b.listeners { 48 + if listener == ch { 49 + b.listeners = append(b.listeners[:i], b.listeners[i+1:]...) 50 + close(ch) 51 + break 52 + } 53 + } 54 + } 55 + 56 + func (b *Broadcaster) Broadcast(message []byte) { 57 + b.mu.Lock() 58 + defer b.mu.Unlock() 59 + 60 + for _, ch := range b.listeners { 61 + select { 62 + case ch <- message: 63 + // event sent successfully. we don't want to block 64 + default: 65 + // channel full, skip to avoid blocking 66 + slog.Warn("jetstream broadcast: channel full, dropping event") 67 + } 68 + } 69 + } 70 + 23 71 type latencyResult struct { 24 72 url string 25 73 latency time.Duration ··· 29 77 func measureLatency(url string) (time.Duration, error) { 30 78 httpsURL := strings.Replace(url, "wss://", "https://", 1) 31 79 80 + client := &http.Client{ 81 + Timeout: 20 * time.Second, 82 + } 83 + 32 84 start := time.Now() 33 - resp, err := http.Get(httpsURL) 85 + resp, err := client.Get(httpsURL) 34 86 if err != nil { 35 87 return 0, err 36 88 } ··· 39 91 return time.Since(start), nil 40 92 } 41 93 94 + var upgrader = websocket.Upgrader{ 95 + CheckOrigin: func(r *http.Request) bool { 96 + return true // Allow all origins 97 + }, 98 + } 99 + 100 + // handleSubscribe upgrades HTTP connection to websocket and streams events 101 + func handleSubscribe(broadcaster *Broadcaster) http.HandlerFunc { 102 + return func(w http.ResponseWriter, r *http.Request) { 103 + conn, err := upgrader.Upgrade(w, r, nil) 104 + if err != nil { 105 + slog.Error("Failed to upgrade connection", slog.Any("error", err)) 106 + return 107 + } 108 + defer conn.Close() 109 + 110 + // Subscribe to broadcaster 111 + ch := broadcaster.Subscribe() 112 + defer broadcaster.Unsubscribe(ch) 113 + 114 + slog.Info("Client connected", slog.String("remote", r.RemoteAddr)) 115 + 116 + // Stream events to client 117 + for message := range ch { 118 + err := conn.WriteMessage(websocket.TextMessage, message) 119 + if err != nil { 120 + slog.Debug("Client disconnected", slog.String("remote", r.RemoteAddr), slog.Any("error", err)) 121 + break 122 + } 123 + } 124 + 125 + slog.Info("Client disconnected", slog.String("remote", r.RemoteAddr)) 126 + } 127 + } 128 + 129 + // connectToUpstream maintains a connection to the upstream websocket and broadcasts messages 130 + func connectToUpstream(pool []string, broadcaster *Broadcaster) { 131 + backoff := time.Second 132 + maxBackoff := time.Minute 133 + var currentUpstream string 134 + 135 + for { 136 + // Find best upstream (re-evaluate on each connection attempt) 137 + bestUpstream, err := findBestUpstream(pool) 138 + if err != nil { 139 + slog.Error("Failed to find best upstream", slog.Any("error", err)) 140 + time.Sleep(backoff) 141 + backoff *= 2 142 + if backoff > maxBackoff { 143 + backoff = maxBackoff 144 + } 145 + continue 146 + } 147 + 148 + if bestUpstream != currentUpstream { 149 + slog.Info("Switching to new upstream", slog.String("url", bestUpstream)) 150 + currentUpstream = bestUpstream 151 + } 152 + 153 + slog.Info("Connecting to upstream", slog.String("url", currentUpstream)) 154 + 155 + conn, _, err := websocket.DefaultDialer.Dial(currentUpstream+"/subscribe", nil) 156 + if err != nil { 157 + slog.Error("Failed to connect to upstream", slog.String("url", currentUpstream), slog.Any("error", err)) 158 + time.Sleep(backoff) 159 + backoff *= 2 160 + if backoff > maxBackoff { 161 + backoff = maxBackoff 162 + } 163 + continue 164 + } 165 + 166 + slog.Info("Connected to upstream", slog.String("url", currentUpstream)) 167 + backoff = time.Second // Reset backoff on successful connection 168 + 169 + // Read messages from upstream and broadcast them 170 + for { 171 + messageType, message, err := conn.ReadMessage() 172 + if err != nil { 173 + slog.Error("Error reading from upstream", slog.Any("error", err)) 174 + conn.Close() 175 + break 176 + } 177 + 178 + // Only broadcast text/binary messages 179 + if messageType == websocket.TextMessage || messageType == websocket.BinaryMessage { 180 + broadcaster.Broadcast(message) 181 + } 182 + } 183 + 184 + // Connection lost, will re-evaluate best upstream on next iteration 185 + slog.Info("Connection lost, finding new upstream", slog.Duration("backoff", backoff)) 186 + time.Sleep(backoff) 187 + backoff *= 2 188 + if backoff > maxBackoff { 189 + backoff = maxBackoff 190 + } 191 + } 192 + } 193 + 42 194 func findBestUpstream(pool []string) (string, error) { 43 195 44 196 // Measure latency concurrently ··· 96 248 slog.SetLogLoggerLevel(slog.LevelDebug) 97 249 } 98 250 99 - bestUpstream, err := findBestUpstream(pool) 100 - if err != nil { 251 + envPort := os.Getenv("PORT") 252 + port := envPort 253 + if envPort == "" { 254 + port = "8096" 255 + } 256 + 257 + envHost := os.Getenv("HOST") 258 + host := envHost 259 + if envHost == "" { 260 + // should be running on the same hardware as your service 261 + host = "127.0.0.1" 262 + } 263 + 264 + bindAddr := fmt.Sprintf("%s:%s", host, port) 265 + 266 + // Create broadcaster and start upstream connection 267 + // connectToUpstream will continuously find the best upstream and reconnect on failures 268 + broadcaster := &Broadcaster{} 269 + go connectToUpstream(pool, broadcaster) 270 + 271 + // Setup HTTP server 272 + http.HandleFunc("/subscribe", handleSubscribe(broadcaster)) 273 + 274 + slog.Info("Starting proxy server", slog.String("bind", bindAddr)) 275 + if err := http.ListenAndServe(bindAddr, nil); err != nil { 276 + slog.Error("Server failed", slog.Any("error", err)) 101 277 panic(err) 102 278 } 103 279 104 - fmt.Println(bestUpstream) 280 + // TODO (future) let zlib compression be env'd 281 + // TODO: the proxy subscribes to all lexicons, but then filters out at client level. add env var for lex filtering too 105 282 }