Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at eli/fix-gitlab 253 lines 7.3 kB view raw
1package atproto 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "net/http" 8 "net/url" 9 "runtime" 10 "time" 11 12 comatproto "github.com/bluesky-social/indigo/api/atproto" 13 "github.com/bluesky-social/indigo/atproto/syntax" 14 "github.com/bluesky-social/indigo/events" 15 "github.com/bluesky-social/indigo/events/schedulers/parallel" 16 lexutil "github.com/bluesky-social/indigo/lex/util" 17 "github.com/bluesky-social/indigo/repo" 18 "github.com/bluesky-social/indigo/repomgr" 19 "golang.org/x/sync/errgroup" 20 "stream.place/streamplace/pkg/aqhttp" 21 "stream.place/streamplace/pkg/aqtime" 22 "stream.place/streamplace/pkg/bus" 23 "stream.place/streamplace/pkg/config" 24 "stream.place/streamplace/pkg/constants" 25 "stream.place/streamplace/pkg/log" 26 "stream.place/streamplace/pkg/model" 27 notificationpkg "stream.place/streamplace/pkg/notifications" 28 29 "github.com/gorilla/websocket" 30) 31 32type ATProtoSynchronizer struct { 33 CLI *config.CLI 34 Model model.Model 35 LastSeen time.Time 36 LastEvent time.Time 37 Noter notificationpkg.FirebaseNotifier 38 Bus *bus.Bus 39} 40 41func (atsync *ATProtoSynchronizer) StartFirehose(ctx context.Context) error { 42 retryCount := 0 43 retryWindow := time.Now() 44 45 for { 46 if ctx.Err() != nil { 47 return nil 48 } 49 err := atsync.StartFirehoseRetry(ctx) 50 if err != nil { 51 log.Error(ctx, "firehose error", "err", err) 52 53 // Check if we're within the 1-minute window 54 now := time.Now() 55 if now.Sub(retryWindow) > time.Minute { 56 // Reset the counter if more than a minute has passed 57 retryCount = 1 58 retryWindow = now 59 } else { 60 // Increment retry count if within the window 61 retryCount++ 62 if retryCount >= 3 { 63 log.Error(ctx, "firehose failed 3 times within a minute, crashing", "err", err) 64 return fmt.Errorf("firehose failed 3 times within a minute: %w", err) 65 } 66 } 67 } 68 } 69} 70 71func (atsync *ATProtoSynchronizer) StartFirehoseRetry(ctx context.Context) error { 72 ctx = log.WithLogValues(ctx, "func", "StartFirehose") 73 ctx, cancel := context.WithCancel(ctx) 74 defer cancel() 75 dialer := websocket.DefaultDialer 76 u, err := url.Parse(atsync.CLI.RelayHost) 77 if err != nil { 78 return fmt.Errorf("invalid relayHost URI: %w", err) 79 } 80 u.Path = "xrpc/com.atproto.sync.subscribeRepos" 81 // if cursor != 0 { 82 // u.RawQuery = fmt.Sprintf("cursor=%d", cursor) 83 // } 84 con, _, err := dialer.Dial(u.String(), http.Header{ 85 "User-Agent": []string{aqhttp.UserAgent}, 86 }) 87 if err != nil { 88 return fmt.Errorf("subscribing to firehose failed (dialing): %w", err) 89 } 90 91 rsc := &events.RepoStreamCallbacks{ 92 RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error { 93 go atsync.handleCommitEventOps(ctx, evt) 94 return nil 95 }, 96 Error: func(evt *events.ErrorFrame) error { 97 log.Error(ctx, "firehose error", "err", evt.Error, "message", evt.Message) 98 cancel() 99 return fmt.Errorf("firehose error: %s", evt.Error) 100 }, 101 } 102 103 scheduler := parallel.NewScheduler( 104 10, 105 100, 106 atsync.CLI.RelayHost, 107 rsc.EventHandler, 108 ) 109 110 log.Log(ctx, "starting firehose consumer", "relayHost", atsync.CLI.RelayHost) 111 112 g, ctx := errgroup.WithContext(ctx) 113 114 g.Go(func() error { 115 return events.HandleRepoStream(ctx, con, scheduler, nil) 116 }) 117 118 g.Go(func() error { 119 ticker := time.NewTicker(5 * time.Second) 120 defer ticker.Stop() 121 for { 122 select { 123 case <-ctx.Done(): 124 return nil 125 case <-ticker.C: 126 since := time.Since(atsync.LastEvent) 127 goroutines := runtime.NumGoroutine() 128 if since > 10*time.Second { 129 log.Warn(ctx, fmt.Sprintf("firehose is %s behind real time", since), "goroutines", goroutines) 130 } else { 131 log.Debug(ctx, fmt.Sprintf("firehose is %s behind real time", since), "goroutines", goroutines) 132 } 133 if time.Since(atsync.LastSeen) > 10*time.Second { 134 log.Warn(ctx, fmt.Sprintf("firehose dry; no new events for %s", time.Since(atsync.LastSeen))) 135 } 136 } 137 } 138 }) 139 140 return g.Wait() 141} 142 143var CollectionFilter = []string{ 144 constants.PLACE_STREAM_KEY, 145 constants.PLACE_STREAM_LIVESTREAM, 146 constants.PLACE_STREAM_CHAT_MESSAGE, 147 constants.PLACE_STREAM_CHAT_PROFILE, 148 constants.APP_BSKY_GRAPH_FOLLOW, 149 constants.APP_BSKY_FEED_POST, 150 constants.APP_BSKY_GRAPH_BLOCK, 151} 152 153func (atsync *ATProtoSynchronizer) handleCommitEventOps(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) { 154 ctx = log.WithLogValues(ctx, "event", "commit", "did", evt.Repo, "rev", evt.Rev, "seq", fmt.Sprintf("%d", evt.Seq), "func", "handleCommitEventOps") 155 now := time.Now() 156 atsync.LastSeen = now 157 158 if evt.TooBig { 159 log.Warn(ctx, "skipping tooBig events for now") 160 return 161 } 162 163 rr, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks)) 164 if err != nil { 165 log.Error(ctx, "failed to read repo from car", "err", err) 166 return 167 } 168 169 for _, op := range evt.Ops { 170 collection, rkey, err := syntax.ParseRepoPath(op.Path) 171 if err != nil { 172 log.Error(ctx, "invalid path in repo op", "eventKind", op.Action, "path", op.Path) 173 return 174 } 175 ctx = log.WithLogValues(ctx, "eventKind", op.Action, "collection", collection.String(), "rkey", rkey.String()) 176 177 if len(CollectionFilter) > 0 { 178 keep := false 179 for _, c := range CollectionFilter { 180 if collection.String() == c { 181 keep = true 182 break 183 } 184 } 185 if !keep { 186 continue 187 } 188 } 189 190 aqt, err := aqtime.FromString(evt.Time) 191 if err != nil { 192 log.Error(ctx, "failed to parse time", "err", err) 193 continue 194 } 195 atsync.LastEvent = aqt.Time() 196 197 r, err := atsync.Model.GetRepo(evt.Repo) 198 if err != nil { 199 log.Error(ctx, "failed to get repo", "err", err) 200 continue 201 } 202 // log.Warn(ctx, "got record we care about", "collection", collection, "rkey", rkey) 203 204 ek := repomgr.EventKind(op.Action) 205 switch ek { 206 case repomgr.EvtKindCreateRecord, repomgr.EvtKindUpdateRecord: 207 // read the record bytes from blocks, and verify CID 208 rc, recCBOR, err := rr.GetRecordBytes(ctx, op.Path) 209 if err != nil { 210 log.Error(ctx, "reading record from event blocks (CAR)", "err", err) 211 break 212 } 213 if op.Cid == nil || lexutil.LexLink(rc) != *op.Cid { 214 log.Error(ctx, "mismatch between commit op CID and record block", "recordCID", rc, "opCID", op.Cid) 215 break 216 } 217 218 err = atsync.handleCreateUpdate(ctx, evt.Repo, rkey, recCBOR, op.Cid.String(), collection, ek == repomgr.EvtKindUpdateRecord) 219 if err != nil { 220 log.Error(ctx, "failed to handle create update", "err", err) 221 continue 222 } 223 224 case repomgr.EvtKindDeleteRecord: 225 if collection.String() == constants.APP_BSKY_GRAPH_FOLLOW { 226 if r == nil { 227 log.Debug(ctx, "no repo found for follow", "userDID", evt.Repo, "subjectDID", rkey.String()) 228 continue 229 } 230 log.Debug(ctx, "deleting follow", "userDID", evt.Repo, "subjectDID", rkey.String()) 231 err := atsync.Model.DeleteFollow(ctx, evt.Repo, rkey.String()) 232 if err != nil { 233 log.Debug(ctx, "failed to delete follow", "err", err) 234 } 235 } 236 237 if collection.String() == constants.APP_BSKY_GRAPH_BLOCK { 238 if r == nil { 239 log.Debug(ctx, "no repo found for block", "userDID", evt.Repo, "subjectDID", rkey.String()) 240 continue 241 } 242 log.Warn(ctx, "deleting block", "userDID", evt.Repo, "subjectDID", rkey.String()) 243 err := atsync.Model.DeleteBlock(ctx, rkey.String()) 244 if err != nil { 245 log.Error(ctx, "failed to delete block", "err", err) 246 } 247 } 248 249 default: 250 log.Error(ctx, "unexpected record op kind") 251 } 252 } 253}