Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at eli/optional-convergence 334 lines 10 kB view raw
1package atproto 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "net/http" 8 "net/url" 9 "runtime" 10 "strings" 11 "time" 12 13 comatproto "github.com/bluesky-social/indigo/api/atproto" 14 "github.com/bluesky-social/indigo/atproto/identity" 15 "github.com/bluesky-social/indigo/atproto/syntax" 16 "github.com/bluesky-social/indigo/events" 17 "github.com/bluesky-social/indigo/events/schedulers/parallel" 18 lexutil "github.com/bluesky-social/indigo/lex/util" 19 "github.com/bluesky-social/indigo/repo" 20 "github.com/bluesky-social/indigo/repomgr" 21 "golang.org/x/sync/errgroup" 22 "stream.place/streamplace/pkg/aqhttp" 23 "stream.place/streamplace/pkg/aqtime" 24 "stream.place/streamplace/pkg/bus" 25 "stream.place/streamplace/pkg/config" 26 "stream.place/streamplace/pkg/constants" 27 "stream.place/streamplace/pkg/log" 28 "stream.place/streamplace/pkg/model" 29 notificationpkg "stream.place/streamplace/pkg/notifications" 30 "stream.place/streamplace/pkg/statedb" 31 32 "slices" 33 34 "github.com/gorilla/websocket" 35) 36 37type ATProtoSynchronizer struct { 38 CLI *config.CLI 39 Model model.Model 40 StatefulDB *statedb.StatefulDB 41 LastSeen time.Time 42 LastEvent time.Time 43 Noter notificationpkg.FirebaseNotifier 44 Bus *bus.Bus 45 PLCDirectory identity.Directory 46 CachedPLCDirectory identity.Directory 47} 48 49func (atsync *ATProtoSynchronizer) StartFirehose(ctx context.Context) error { 50 retryCount := 0 51 retryWindow := time.Now() 52 53 for { 54 if ctx.Err() != nil { 55 return nil 56 } 57 err := atsync.StartFirehoseRetry(ctx) 58 if err != nil { 59 log.Error(ctx, "firehose error", "err", err) 60 61 // Check if we're within the 1-minute window 62 now := time.Now() 63 if now.Sub(retryWindow) > time.Minute { 64 // Reset the counter if more than a minute has passed 65 retryCount = 1 66 retryWindow = now 67 } else { 68 // Increment retry count if within the window 69 retryCount++ 70 if retryCount >= 3 { 71 log.Error(ctx, "firehose failed 3 times within a minute, crashing", "err", err) 72 return fmt.Errorf("firehose failed 3 times within a minute: %w", err) 73 } 74 } 75 } 76 } 77} 78 79func (atsync *ATProtoSynchronizer) StartFirehoseRetry(ctx context.Context) error { 80 ctx = log.WithLogValues(ctx, "func", "StartFirehose") 81 ctx, cancel := context.WithCancel(ctx) 82 defer cancel() 83 dialer := websocket.DefaultDialer 84 u, err := url.Parse(atsync.CLI.RelayHost) 85 if err != nil { 86 return fmt.Errorf("invalid relayHost URI: %w", err) 87 } 88 u.Path = "xrpc/com.atproto.sync.subscribeRepos" 89 // if cursor != 0 { 90 // u.RawQuery = fmt.Sprintf("cursor=%d", cursor) 91 // } 92 con, _, err := dialer.Dial(u.String(), http.Header{ 93 "User-Agent": []string{aqhttp.UserAgent}, 94 }) 95 if err != nil { 96 return fmt.Errorf("subscribing to firehose failed (dialing): %w", err) 97 } 98 99 rsc := &events.RepoStreamCallbacks{ 100 RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error { 101 go atsync.handleCommitEventOps(ctx, evt) 102 return nil 103 }, 104 RepoIdentity: func(evt *comatproto.SyncSubscribeRepos_Identity) error { 105 go atsync.handleIdentityEventOps(ctx, evt) 106 return nil 107 }, 108 Error: func(evt *events.ErrorFrame) error { 109 log.Error(ctx, "firehose error", "err", evt.Error, "message", evt.Message) 110 cancel() 111 return fmt.Errorf("firehose error: %s", evt.Error) 112 }, 113 } 114 115 scheduler := parallel.NewScheduler( 116 10, 117 100, 118 atsync.CLI.RelayHost, 119 rsc.EventHandler, 120 ) 121 122 log.Log(ctx, "starting firehose consumer", "relayHost", atsync.CLI.RelayHost) 123 124 g, ctx := errgroup.WithContext(ctx) 125 126 g.Go(func() error { 127 err := events.HandleRepoStream(ctx, con, scheduler, nil) 128 if err != nil { 129 log.Error(ctx, "firehose error", "err", err) 130 return err 131 } 132 return nil 133 }) 134 135 g.Go(func() error { 136 ticker := time.NewTicker(5 * time.Second) 137 defer ticker.Stop() 138 for { 139 select { 140 case <-ctx.Done(): 141 return nil 142 case <-ticker.C: 143 since := time.Since(atsync.LastEvent) 144 goroutines := runtime.NumGoroutine() 145 if since > 10*time.Second { 146 log.Warn(ctx, fmt.Sprintf("firehose is %s behind real time", since), "goroutines", goroutines) 147 } else { 148 log.Debug(ctx, fmt.Sprintf("firehose is %s behind real time", since), "goroutines", goroutines) 149 } 150 if time.Since(atsync.LastSeen) > 10*time.Second { 151 log.Warn(ctx, fmt.Sprintf("firehose dry; no new events for %s", time.Since(atsync.LastSeen))) 152 } 153 } 154 } 155 }) 156 157 return g.Wait() 158} 159 160var CollectionFilter = []string{ 161 constants.APP_BSKY_GRAPH_FOLLOW, 162 constants.APP_BSKY_FEED_POST, 163 constants.APP_BSKY_GRAPH_BLOCK, 164 constants.PLACE_STREAM_LIVE_RECOMMENDATIONS, 165} 166 167func (atsync *ATProtoSynchronizer) handleCommitEventOps(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) { 168 ctx = log.WithLogValues(ctx, "event", "commit", "did", evt.Repo, "rev", evt.Rev, "seq", fmt.Sprintf("%d", evt.Seq), "func", "handleCommitEventOps") 169 now := time.Now() 170 atsync.LastSeen = now 171 172 if evt.TooBig { 173 log.Warn(ctx, "skipping tooBig events for now") 174 return 175 } 176 177 rr, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks)) 178 if err != nil { 179 log.Error(ctx, "failed to read repo from car", "err", err) 180 return 181 } 182 183 for _, op := range evt.Ops { 184 collection, rkey, err := syntax.ParseRepoPath(op.Path) 185 uri := fmt.Sprintf("at://%s/%s", evt.Repo, op.Path) 186 if err != nil { 187 log.Error(ctx, "invalid path in repo op", "eventKind", op.Action, "path", op.Path) 188 return 189 } 190 ctx = log.WithLogValues(ctx, "eventKind", op.Action, "collection", collection.String(), "rkey", rkey.String()) 191 192 if len(CollectionFilter) > 0 { 193 keep := slices.Contains(CollectionFilter, collection.String()) 194 if strings.HasPrefix(collection.String(), "place.stream.") { 195 keep = true 196 } 197 if !keep { 198 continue 199 } 200 } 201 202 aqt, err := aqtime.FromString(evt.Time) 203 if err != nil { 204 log.Error(ctx, "failed to parse time", "err", err) 205 continue 206 } 207 opTime := aqt.Time() 208 atsync.LastEvent = opTime 209 210 r, err := atsync.Model.GetRepo(evt.Repo) 211 if err != nil { 212 log.Error(ctx, "failed to get repo", "err", err) 213 continue 214 } 215 // log.Warn(ctx, "got record we care about", "collection", collection, "rkey", rkey) 216 217 ek := repomgr.EventKind(op.Action) 218 switch ek { 219 case repomgr.EvtKindCreateRecord, repomgr.EvtKindUpdateRecord: 220 // read the record bytes from blocks, and verify CID 221 rc, recCBOR, err := rr.GetRecordBytes(ctx, op.Path) 222 if err != nil { 223 log.Error(ctx, "reading record from event blocks (CAR)", "err", err) 224 break 225 } 226 if op.Cid == nil || lexutil.LexLink(rc) != *op.Cid { 227 log.Error(ctx, "mismatch between commit op CID and record block", "recordCID", rc, "opCID", op.Cid) 228 break 229 } 230 231 err = atsync.handleCreateUpdate(ctx, evt.Repo, rkey, recCBOR, op.Cid.String(), collection, ek == repomgr.EvtKindUpdateRecord, false) 232 if err != nil { 233 log.Error(ctx, "failed to handle create update", "err", err) 234 continue 235 } 236 237 case repomgr.EvtKindDeleteRecord: 238 if collection.String() == constants.APP_BSKY_GRAPH_FOLLOW { 239 if r == nil { 240 log.Debug(ctx, "no repo found for follow", "userDID", evt.Repo, "subjectDID", rkey.String()) 241 continue 242 } 243 log.Debug(ctx, "deleting follow", "userDID", evt.Repo, "subjectDID", rkey.String()) 244 err := atsync.Model.DeleteFollow(ctx, evt.Repo, rkey.String()) 245 if err != nil { 246 log.Debug(ctx, "failed to delete follow", "err", err) 247 } 248 } 249 250 if collection.String() == constants.APP_BSKY_GRAPH_BLOCK { 251 if r == nil { 252 log.Debug(ctx, "no repo found for block", "userDID", evt.Repo, "subjectDID", rkey.String()) 253 continue 254 } 255 log.Warn(ctx, "deleting block", "userDID", evt.Repo, "subjectDID", rkey.String()) 256 err := atsync.Model.DeleteBlock(ctx, rkey.String()) 257 if err != nil { 258 log.Error(ctx, "failed to delete block", "err", err) 259 } 260 } 261 262 if collection.String() == constants.PLACE_STREAM_KEY { 263 log.Warn(ctx, "revoking stream key", "userDID", evt.Repo, "rkey", rkey.String()) 264 key, err := atsync.Model.GetSigningKeyByRKey(ctx, rkey.String()) 265 if err != nil { 266 log.Error(ctx, "failed to get signing key", "err", err) 267 continue 268 } 269 if key == nil { 270 log.Warn(ctx, "no signing key found for stream key", "userDID", evt.Repo, "rkey", rkey.String()) 271 continue 272 } 273 now := time.Now() 274 key.RevokedAt = &now 275 err = atsync.Model.UpdateSigningKey(key) 276 if err != nil { 277 log.Error(ctx, "failed to revoke signing key", "err", err) 278 } 279 atsync.Bus.Publish(evt.Repo, key) 280 } 281 282 if collection.String() == constants.PLACE_STREAM_CHAT_MESSAGE { 283 msg, err := atsync.Model.GetChatMessage(uri) 284 if err != nil { 285 log.Error(ctx, "failed to get chat message", "err", err) 286 continue 287 } 288 if msg == nil { 289 log.Warn(ctx, "no chat message found for uri", "uri", uri) 290 continue 291 } 292 log.Warn(ctx, "deleting chat message", "userDID", evt.Repo, "uri", uri) 293 err = atsync.Model.DeleteChatMessage(ctx, uri, &opTime) 294 if err != nil { 295 log.Error(ctx, "failed to delete chat message", "err", err) 296 continue 297 } 298 mv, err := msg.ToStreamplaceMessageView() 299 if err != nil { 300 log.Error(ctx, "failed to convert chat message to streamplace message view", "err", err) 301 continue 302 } 303 isTrue := true 304 mv.Deleted = &isTrue 305 atsync.Bus.Publish(msg.StreamerRepoDID, mv) 306 } 307 308 default: 309 log.Error(ctx, "unexpected record op kind") 310 } 311 } 312} 313 314func (atsync *ATProtoSynchronizer) handleIdentityEventOps(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Identity) { 315 handle := "" 316 if evt.Handle != nil { 317 handle = *evt.Handle 318 } 319 ctx = log.WithLogValues(ctx, "event", "identity", "did", evt.Did, "handle", handle, "func", "handleIdentityEventOps") 320 r, err := atsync.Model.GetRepo(evt.Did) 321 if err != nil { 322 log.Error(ctx, "failed to get repo", "err", err) 323 return 324 } 325 if r == nil { 326 log.Debug(ctx, "no repo found for identity", "did", evt.Did) 327 return 328 } 329 _, err = atsync.RefreshIdentity(ctx, evt.Did) 330 if err != nil { 331 log.Error(ctx, "failed to refresh ident", "err", err) 332 return 333 } 334}