A locally focused Bluesky AppView
at master 6.9 kB view raw
1package main 2 3import ( 4 "context" 5 "fmt" 6 "log/slog" 7 "net/http" 8 "sync" 9 "time" 10 11 "github.com/bluesky-social/indigo/api/atproto" 12 "github.com/bluesky-social/indigo/cmd/relay/stream" 13 "github.com/bluesky-social/indigo/cmd/relay/stream/schedulers/parallel" 14 jsclient "github.com/bluesky-social/jetstream/pkg/client" 15 jsparallel "github.com/bluesky-social/jetstream/pkg/client/schedulers/parallel" 16 "github.com/bluesky-social/jetstream/pkg/models" 17 "github.com/gorilla/websocket" 18) 19 20type SyncConfig struct { 21 Backends []SyncBackend `json:"backends"` 22} 23 24type SyncBackend struct { 25 Type string `json:"type"` 26 Host string `json:"host"` 27 MaxWorkers int `json:"max_workers,omitempty"` 28} 29 30func (s *Server) StartSyncEngine(ctx context.Context, sc *SyncConfig) error { 31 for _, be := range sc.Backends { 32 switch be.Type { 33 case "firehose": 34 go s.runSyncFirehose(ctx, be) 35 case "jetstream": 36 go s.runSyncJetstream(ctx, be) 37 default: 38 return fmt.Errorf("unrecognized sync backend type: %q", be.Type) 39 } 40 } 41 42 <-ctx.Done() 43 return fmt.Errorf("exiting sync routine") 44} 45 46const failureTimeInterval = time.Second * 5 47 48func (s *Server) runSyncFirehose(ctx context.Context, be SyncBackend) { 49 var failures int 50 for { 51 seqno, err := loadLastSeq(s.db, be.Host) 52 if err != nil { 53 fmt.Println("failed to load sequence number, starting over", err) 54 } 55 56 maxWorkers := 10 57 if be.MaxWorkers != 0 { 58 maxWorkers = be.MaxWorkers 59 } 60 61 start := time.Now() 62 if err := s.startLiveTail(ctx, be.Host, int(seqno), maxWorkers, 20); err != nil { 63 slog.Error("firehose connection lost", "host", be.Host, "error", err) 64 } 65 66 elapsed := time.Since(start) 67 68 if elapsed > failureTimeInterval { 69 failures = 0 70 continue 71 } 72 failures++ 73 74 delay := delayForFailureCount(failures) 75 slog.Warn("retrying connection after delay", "host", be.Host, "delay", delay) 76 } 77} 78 79func (s *Server) runSyncJetstream(ctx 
context.Context, be SyncBackend) { 80 var failures int 81 for { 82 // Load last cursor (stored as sequence number in same table) 83 cursor, err := loadLastSeq(s.db, be.Host) 84 if err != nil { 85 slog.Warn("failed to load jetstream cursor, starting from live", "error", err) 86 cursor = 0 87 } 88 89 maxWorkers := 10 90 if be.MaxWorkers != 0 { 91 maxWorkers = be.MaxWorkers 92 } 93 94 start := time.Now() 95 if err := s.startJetstreamTail(ctx, be.Host, cursor, maxWorkers); err != nil { 96 slog.Error("jetstream connection lost", "host", be.Host, "error", err) 97 } 98 99 elapsed := time.Since(start) 100 101 if elapsed > failureTimeInterval { 102 failures = 0 103 continue 104 } 105 failures++ 106 107 delay := delayForFailureCount(failures) 108 slog.Warn("retrying jetstream connection after delay", "host", be.Host, "delay", delay) 109 time.Sleep(delay) 110 } 111} 112 113func delayForFailureCount(n int) time.Duration { 114 if n < 5 { 115 return (time.Second * 5) + (time.Second * 2 * time.Duration(n)) 116 } 117 118 return time.Second * 30 119} 120 121func (s *Server) startLiveTail(ctx context.Context, host string, curs int, parWorkers, maxQ int) error { 122 ctx, cancel := context.WithCancel(ctx) 123 defer cancel() 124 125 slog.Info("starting live tail") 126 127 // Connect to the Relay websocket 128 urlStr := fmt.Sprintf("wss://%s/xrpc/com.atproto.sync.subscribeRepos?cursor=%d", host, curs) 129 130 d := websocket.DefaultDialer 131 con, _, err := d.Dial(urlStr, http.Header{ 132 "User-Agent": []string{"konbini/0.0.1"}, 133 }) 134 if err != nil { 135 return fmt.Errorf("failed to connect to relay: %w", err) 136 } 137 138 var lelk sync.Mutex 139 lastEvent := time.Now() 140 141 go func() { 142 tick := time.NewTicker(time.Second) 143 defer tick.Stop() 144 for { 145 select { 146 case <-tick.C: 147 lelk.Lock() 148 let := lastEvent 149 lelk.Unlock() 150 151 if time.Since(let) > time.Second*30 { 152 slog.Error("firehose connection timed out") 153 con.Close() 154 return 155 } 156 case 
<-ctx.Done(): 157 return 158 } 159 } 160 }() 161 162 var cclk sync.Mutex 163 var completeCursor int64 164 165 rsc := &stream.RepoStreamCallbacks{ 166 RepoCommit: func(evt *atproto.SyncSubscribeRepos_Commit) error { 167 ctx := context.Background() 168 169 firehoseCursorGauge.WithLabelValues("ingest").Set(float64(evt.Seq)) 170 171 s.seqLk.Lock() 172 if evt.Seq > s.lastSeq { 173 curs = int(evt.Seq) 174 s.lastSeq = evt.Seq 175 176 if evt.Seq%1000 == 0 { 177 if err := storeLastSeq(s.db, host, evt.Seq); err != nil { 178 fmt.Println("failed to store seqno: ", err) 179 } 180 } 181 } 182 s.seqLk.Unlock() 183 184 lelk.Lock() 185 lastEvent = time.Now() 186 lelk.Unlock() 187 188 if err := s.backend.HandleEvent(ctx, evt); err != nil { 189 return fmt.Errorf("handle event (%s,%d): %w", evt.Repo, evt.Seq, err) 190 } 191 192 cclk.Lock() 193 if evt.Seq > completeCursor { 194 completeCursor = evt.Seq 195 firehoseCursorGauge.WithLabelValues("complete").Set(float64(evt.Seq)) 196 } 197 cclk.Unlock() 198 199 return nil 200 }, 201 RepoInfo: func(info *atproto.SyncSubscribeRepos_Info) error { 202 return nil 203 }, 204 // TODO: all the other event types 205 Error: func(errf *stream.ErrorFrame) error { 206 return fmt.Errorf("error frame: %s: %s", errf.Error, errf.Message) 207 }, 208 } 209 210 sched := parallel.NewScheduler(parWorkers, maxQ, con.RemoteAddr().String(), rsc.EventHandler) 211 212 return stream.HandleRepoStream(ctx, con, sched, slog.Default()) 213} 214 215func (s *Server) startJetstreamTail(ctx context.Context, host string, cursor int64, parWorkers int) error { 216 ctx, cancel := context.WithCancel(ctx) 217 defer cancel() 218 219 slog.Info("starting jetstream tail", "host", host, "cursor", cursor) 220 221 // Create a scheduler for parallel processing 222 lastStored := int64(0) 223 sched := jsparallel.NewScheduler( 224 parWorkers, 225 host, 226 slog.Default(), 227 func(ctx context.Context, event *models.Event) error { 228 // Update cursor tracking 229 s.seqLk.Lock() 230 if 
event.TimeUS > s.lastSeq { 231 s.lastSeq = event.TimeUS 232 if event.TimeUS-lastStored > 1_000_000 { 233 // Store checkpoint periodically 234 if err := storeLastSeq(s.db, host, event.TimeUS); err != nil { 235 slog.Error("failed to store jetstream cursor", "error", err) 236 } 237 lastStored = event.TimeUS 238 } 239 } 240 s.seqLk.Unlock() 241 242 // Update metrics 243 firehoseCursorGauge.WithLabelValues("ingest").Set(float64(event.TimeUS)) 244 245 // Convert Jetstream event to ATProto event format 246 if event.Commit != nil { 247 248 if err := s.backend.HandleEventJetstream(ctx, event); err != nil { 249 return fmt.Errorf("handle event (%s,%d): %w", event.Did, event.TimeUS, err) 250 } 251 252 firehoseCursorGauge.WithLabelValues("complete").Set(float64(event.TimeUS)) 253 } 254 255 return nil 256 }, 257 ) 258 259 // Configure Jetstream client 260 config := jsclient.DefaultClientConfig() 261 config.WebsocketURL = fmt.Sprintf("wss://%s/subscribe", host) 262 263 // Prepare cursor pointer 264 var cursorPtr *int64 265 if cursor > 0 { 266 cursorPtr = &cursor 267 } 268 269 // Create and connect client 270 client, err := jsclient.NewClient( 271 config, 272 slog.Default(), 273 sched, 274 ) 275 if err != nil { 276 return fmt.Errorf("create jetstream client: %w", err) 277 } 278 279 // Start reading from Jetstream 280 return client.ConnectAndRead(ctx, cursorPtr) 281}