Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at eli/mobile-performance 274 lines 8.0 kB view raw
1package atproto 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "net/http" 8 "net/url" 9 "runtime" 10 "time" 11 12 comatproto "github.com/bluesky-social/indigo/api/atproto" 13 "github.com/bluesky-social/indigo/atproto/syntax" 14 "github.com/bluesky-social/indigo/events" 15 "github.com/bluesky-social/indigo/events/schedulers/parallel" 16 lexutil "github.com/bluesky-social/indigo/lex/util" 17 "github.com/bluesky-social/indigo/repo" 18 "github.com/bluesky-social/indigo/repomgr" 19 "golang.org/x/sync/errgroup" 20 "stream.place/streamplace/pkg/aqhttp" 21 "stream.place/streamplace/pkg/aqtime" 22 "stream.place/streamplace/pkg/bus" 23 "stream.place/streamplace/pkg/config" 24 "stream.place/streamplace/pkg/constants" 25 "stream.place/streamplace/pkg/log" 26 "stream.place/streamplace/pkg/model" 27 notificationpkg "stream.place/streamplace/pkg/notifications" 28 29 "github.com/gorilla/websocket" 30) 31 32type ATProtoSynchronizer struct { 33 CLI *config.CLI 34 Model model.Model 35 LastSeen time.Time 36 LastEvent time.Time 37 Noter notificationpkg.FirebaseNotifier 38 Bus *bus.Bus 39} 40 41func (atsync *ATProtoSynchronizer) StartFirehose(ctx context.Context) error { 42 retryCount := 0 43 retryWindow := time.Now() 44 45 for { 46 if ctx.Err() != nil { 47 return nil 48 } 49 err := atsync.StartFirehoseRetry(ctx) 50 if err != nil { 51 log.Error(ctx, "firehose error", "err", err) 52 53 // Check if we're within the 1-minute window 54 now := time.Now() 55 if now.Sub(retryWindow) > time.Minute { 56 // Reset the counter if more than a minute has passed 57 retryCount = 1 58 retryWindow = now 59 } else { 60 // Increment retry count if within the window 61 retryCount++ 62 if retryCount >= 3 { 63 log.Error(ctx, "firehose failed 3 times within a minute, crashing", "err", err) 64 return fmt.Errorf("firehose failed 3 times within a minute: %w", err) 65 } 66 } 67 } 68 } 69} 70 71func (atsync *ATProtoSynchronizer) StartFirehoseRetry(ctx context.Context) error { 72 ctx = log.WithLogValues(ctx, "func", "StartFirehose") 73 ctx, cancel := context.WithCancel(ctx) 74 defer cancel() 75 dialer := websocket.DefaultDialer 76 u, err := url.Parse(atsync.CLI.RelayHost) 77 if err != nil { 78 return fmt.Errorf("invalid relayHost URI: %w", err) 79 } 80 u.Path = "xrpc/com.atproto.sync.subscribeRepos" 81 // if cursor != 0 { 82 // u.RawQuery = fmt.Sprintf("cursor=%d", cursor) 83 // } 84 con, _, err := dialer.Dial(u.String(), http.Header{ 85 "User-Agent": []string{aqhttp.UserAgent}, 86 }) 87 if err != nil { 88 return fmt.Errorf("subscribing to firehose failed (dialing): %w", err) 89 } 90 91 rsc := &events.RepoStreamCallbacks{ 92 RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error { 93 go atsync.handleCommitEventOps(ctx, evt) 94 return nil 95 }, 96 Error: func(evt *events.ErrorFrame) error { 97 log.Error(ctx, "firehose error", "err", evt.Error, "message", evt.Message) 98 cancel() 99 return fmt.Errorf("firehose error: %s", evt.Error) 100 }, 101 } 102 103 scheduler := parallel.NewScheduler( 104 10, 105 100, 106 atsync.CLI.RelayHost, 107 rsc.EventHandler, 108 ) 109 110 log.Log(ctx, "starting firehose consumer", "relayHost", atsync.CLI.RelayHost) 111 112 g, ctx := errgroup.WithContext(ctx) 113 114 g.Go(func() error { 115 return events.HandleRepoStream(ctx, con, scheduler, nil) 116 }) 117 118 g.Go(func() error { 119 ticker := time.NewTicker(5 * time.Second) 120 defer ticker.Stop() 121 for { 122 select { 123 case <-ctx.Done(): 124 return nil 125 case <-ticker.C: 126 since := time.Since(atsync.LastEvent) 127 goroutines := runtime.NumGoroutine() 128 if since > 10*time.Second { 129 log.Warn(ctx, fmt.Sprintf("firehose is %s behind real time", since), "goroutines", goroutines) 130 } else { 131 log.Debug(ctx, fmt.Sprintf("firehose is %s behind real time", since), "goroutines", goroutines) 132 } 133 if time.Since(atsync.LastSeen) > 10*time.Second { 134 log.Warn(ctx, fmt.Sprintf("firehose dry; no new events for %s", time.Since(atsync.LastSeen))) 135 } 136 } 137 } 138 }) 139 140 return g.Wait() 141} 142 143var CollectionFilter = []string{ 144 constants.PLACE_STREAM_KEY, 145 constants.PLACE_STREAM_LIVESTREAM, 146 constants.PLACE_STREAM_CHAT_MESSAGE, 147 constants.PLACE_STREAM_CHAT_PROFILE, 148 constants.APP_BSKY_GRAPH_FOLLOW, 149 constants.APP_BSKY_FEED_POST, 150 constants.APP_BSKY_GRAPH_BLOCK, 151 constants.PLACE_STREAM_SERVER_SETTINGS, 152} 153 154func (atsync *ATProtoSynchronizer) handleCommitEventOps(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) { 155 ctx = log.WithLogValues(ctx, "event", "commit", "did", evt.Repo, "rev", evt.Rev, "seq", fmt.Sprintf("%d", evt.Seq), "func", "handleCommitEventOps") 156 now := time.Now() 157 atsync.LastSeen = now 158 159 if evt.TooBig { 160 log.Warn(ctx, "skipping tooBig events for now") 161 return 162 } 163 164 rr, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks)) 165 if err != nil { 166 log.Error(ctx, "failed to read repo from car", "err", err) 167 return 168 } 169 170 for _, op := range evt.Ops { 171 collection, rkey, err := syntax.ParseRepoPath(op.Path) 172 if err != nil { 173 log.Error(ctx, "invalid path in repo op", "eventKind", op.Action, "path", op.Path) 174 return 175 } 176 ctx = log.WithLogValues(ctx, "eventKind", op.Action, "collection", collection.String(), "rkey", rkey.String()) 177 178 if len(CollectionFilter) > 0 { 179 keep := false 180 for _, c := range CollectionFilter { 181 if collection.String() == c { 182 keep = true 183 break 184 } 185 } 186 if !keep { 187 continue 188 } 189 } 190 191 aqt, err := aqtime.FromString(evt.Time) 192 if err != nil { 193 log.Error(ctx, "failed to parse time", "err", err) 194 continue 195 } 196 atsync.LastEvent = aqt.Time() 197 198 r, err := atsync.Model.GetRepo(evt.Repo) 199 if err != nil { 200 log.Error(ctx, "failed to get repo", "err", err) 201 continue 202 } 203 // log.Warn(ctx, "got record we care about", "collection", collection, "rkey", rkey) 204 205 ek := repomgr.EventKind(op.Action) 206 switch ek { 207 case repomgr.EvtKindCreateRecord, repomgr.EvtKindUpdateRecord: 208 // read the record bytes from blocks, and verify CID 209 rc, recCBOR, err := rr.GetRecordBytes(ctx, op.Path) 210 if err != nil { 211 log.Error(ctx, "reading record from event blocks (CAR)", "err", err) 212 break 213 } 214 if op.Cid == nil || lexutil.LexLink(rc) != *op.Cid { 215 log.Error(ctx, "mismatch between commit op CID and record block", "recordCID", rc, "opCID", op.Cid) 216 break 217 } 218 219 err = atsync.handleCreateUpdate(ctx, evt.Repo, rkey, recCBOR, op.Cid.String(), collection, ek == repomgr.EvtKindUpdateRecord, false) 220 if err != nil { 221 log.Error(ctx, "failed to handle create update", "err", err) 222 continue 223 } 224 225 case repomgr.EvtKindDeleteRecord: 226 if collection.String() == constants.APP_BSKY_GRAPH_FOLLOW { 227 if r == nil { 228 log.Debug(ctx, "no repo found for follow", "userDID", evt.Repo, "subjectDID", rkey.String()) 229 continue 230 } 231 log.Debug(ctx, "deleting follow", "userDID", evt.Repo, "subjectDID", rkey.String()) 232 err := atsync.Model.DeleteFollow(ctx, evt.Repo, rkey.String()) 233 if err != nil { 234 log.Debug(ctx, "failed to delete follow", "err", err) 235 } 236 } 237 238 if collection.String() == constants.APP_BSKY_GRAPH_BLOCK { 239 if r == nil { 240 log.Debug(ctx, "no repo found for block", "userDID", evt.Repo, "subjectDID", rkey.String()) 241 continue 242 } 243 log.Warn(ctx, "deleting block", "userDID", evt.Repo, "subjectDID", rkey.String()) 244 err := atsync.Model.DeleteBlock(ctx, rkey.String()) 245 if err != nil { 246 log.Error(ctx, "failed to delete block", "err", err) 247 } 248 } 249 250 if collection.String() == constants.PLACE_STREAM_KEY { 251 log.Warn(ctx, "revoking stream key", "userDID", evt.Repo, "rkey", rkey.String()) 252 key, err := atsync.Model.GetSigningKeyByRKey(ctx, rkey.String()) 253 if err != nil { 254 log.Error(ctx, "failed to get signing key", "err", err) 255 continue 256 } 257 if key == nil { 258 log.Warn(ctx, "no signing key found for stream key", "userDID", evt.Repo, "rkey", rkey.String()) 259 continue 260 } 261 now := time.Now() 262 key.RevokedAt = &now 263 err = atsync.Model.UpdateSigningKey(key) 264 if err != nil { 265 log.Error(ctx, "failed to revoke signing key", "err", err) 266 } 267 atsync.Bus.Publish(evt.Repo, key) 268 } 269 270 default: 271 log.Error(ctx, "unexpected record op kind") 272 } 273 } 274}