Live video on the AT Protocol
1package atproto
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "net/http"
8 "net/url"
9 "runtime"
10 "strings"
11 "time"
12
13 comatproto "github.com/bluesky-social/indigo/api/atproto"
14 "github.com/bluesky-social/indigo/atproto/identity"
15 "github.com/bluesky-social/indigo/atproto/syntax"
16 "github.com/bluesky-social/indigo/events"
17 "github.com/bluesky-social/indigo/events/schedulers/parallel"
18 lexutil "github.com/bluesky-social/indigo/lex/util"
19 "github.com/bluesky-social/indigo/repo"
20 "github.com/bluesky-social/indigo/repomgr"
21 "golang.org/x/sync/errgroup"
22 "stream.place/streamplace/pkg/aqhttp"
23 "stream.place/streamplace/pkg/aqtime"
24 "stream.place/streamplace/pkg/bus"
25 "stream.place/streamplace/pkg/config"
26 "stream.place/streamplace/pkg/constants"
27 "stream.place/streamplace/pkg/log"
28 "stream.place/streamplace/pkg/model"
29 notificationpkg "stream.place/streamplace/pkg/notifications"
30 "stream.place/streamplace/pkg/statedb"
31
32 "slices"
33
34 "github.com/gorilla/websocket"
35)
36
37type ATProtoSynchronizer struct {
38 CLI *config.CLI
39 Model model.Model
40 StatefulDB *statedb.StatefulDB
41 LastSeen time.Time
42 LastEvent time.Time
43 Noter notificationpkg.FirebaseNotifier
44 Bus *bus.Bus
45 PLCDirectory identity.Directory
46 CachedPLCDirectory identity.Directory
47}
48
49func (atsync *ATProtoSynchronizer) StartFirehose(ctx context.Context) error {
50 retryCount := 0
51 retryWindow := time.Now()
52
53 for {
54 if ctx.Err() != nil {
55 return nil
56 }
57 err := atsync.StartFirehoseRetry(ctx)
58 if err != nil {
59 log.Error(ctx, "firehose error", "err", err)
60
61 // Check if we're within the 1-minute window
62 now := time.Now()
63 if now.Sub(retryWindow) > time.Minute {
64 // Reset the counter if more than a minute has passed
65 retryCount = 1
66 retryWindow = now
67 } else {
68 // Increment retry count if within the window
69 retryCount++
70 if retryCount >= 3 {
71 log.Error(ctx, "firehose failed 3 times within a minute, crashing", "err", err)
72 return fmt.Errorf("firehose failed 3 times within a minute: %w", err)
73 }
74 }
75 }
76 }
77}
78
79func (atsync *ATProtoSynchronizer) StartFirehoseRetry(ctx context.Context) error {
80 ctx = log.WithLogValues(ctx, "func", "StartFirehose")
81 ctx, cancel := context.WithCancel(ctx)
82 defer cancel()
83 dialer := websocket.DefaultDialer
84 u, err := url.Parse(atsync.CLI.RelayHost)
85 if err != nil {
86 return fmt.Errorf("invalid relayHost URI: %w", err)
87 }
88 u.Path = "xrpc/com.atproto.sync.subscribeRepos"
89 // if cursor != 0 {
90 // u.RawQuery = fmt.Sprintf("cursor=%d", cursor)
91 // }
92 con, _, err := dialer.Dial(u.String(), http.Header{
93 "User-Agent": []string{aqhttp.UserAgent},
94 })
95 if err != nil {
96 return fmt.Errorf("subscribing to firehose failed (dialing): %w", err)
97 }
98
99 rsc := &events.RepoStreamCallbacks{
100 RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error {
101 go atsync.handleCommitEventOps(ctx, evt)
102 return nil
103 },
104 RepoIdentity: func(evt *comatproto.SyncSubscribeRepos_Identity) error {
105 go atsync.handleIdentityEventOps(ctx, evt)
106 return nil
107 },
108 Error: func(evt *events.ErrorFrame) error {
109 log.Error(ctx, "firehose error", "err", evt.Error, "message", evt.Message)
110 cancel()
111 return fmt.Errorf("firehose error: %s", evt.Error)
112 },
113 }
114
115 scheduler := parallel.NewScheduler(
116 10,
117 100,
118 atsync.CLI.RelayHost,
119 rsc.EventHandler,
120 )
121
122 log.Log(ctx, "starting firehose consumer", "relayHost", atsync.CLI.RelayHost)
123
124 g, ctx := errgroup.WithContext(ctx)
125
126 g.Go(func() error {
127 err := events.HandleRepoStream(ctx, con, scheduler, nil)
128 if err != nil {
129 log.Error(ctx, "firehose error", "err", err)
130 return err
131 }
132 return nil
133 })
134
135 g.Go(func() error {
136 ticker := time.NewTicker(5 * time.Second)
137 defer ticker.Stop()
138 for {
139 select {
140 case <-ctx.Done():
141 return nil
142 case <-ticker.C:
143 since := time.Since(atsync.LastEvent)
144 goroutines := runtime.NumGoroutine()
145 if since > 10*time.Second {
146 log.Warn(ctx, fmt.Sprintf("firehose is %s behind real time", since), "goroutines", goroutines)
147 } else {
148 log.Debug(ctx, fmt.Sprintf("firehose is %s behind real time", since), "goroutines", goroutines)
149 }
150 if time.Since(atsync.LastSeen) > 10*time.Second {
151 log.Warn(ctx, fmt.Sprintf("firehose dry; no new events for %s", time.Since(atsync.LastSeen)))
152 }
153 }
154 }
155 })
156
157 return g.Wait()
158}
159
160var CollectionFilter = []string{
161 constants.APP_BSKY_GRAPH_FOLLOW,
162 constants.APP_BSKY_FEED_POST,
163 constants.APP_BSKY_GRAPH_BLOCK,
164 constants.PLACE_STREAM_LIVE_RECOMMENDATIONS,
165}
166
167func (atsync *ATProtoSynchronizer) handleCommitEventOps(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) {
168 ctx = log.WithLogValues(ctx, "event", "commit", "did", evt.Repo, "rev", evt.Rev, "seq", fmt.Sprintf("%d", evt.Seq), "func", "handleCommitEventOps")
169 now := time.Now()
170 atsync.LastSeen = now
171
172 if evt.TooBig {
173 log.Warn(ctx, "skipping tooBig events for now")
174 return
175 }
176
177 rr, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks))
178 if err != nil {
179 log.Error(ctx, "failed to read repo from car", "err", err)
180 return
181 }
182
183 for _, op := range evt.Ops {
184 collection, rkey, err := syntax.ParseRepoPath(op.Path)
185 uri := fmt.Sprintf("at://%s/%s", evt.Repo, op.Path)
186 if err != nil {
187 log.Error(ctx, "invalid path in repo op", "eventKind", op.Action, "path", op.Path)
188 return
189 }
190 ctx = log.WithLogValues(ctx, "eventKind", op.Action, "collection", collection.String(), "rkey", rkey.String())
191
192 if len(CollectionFilter) > 0 {
193 keep := slices.Contains(CollectionFilter, collection.String())
194 if strings.HasPrefix(collection.String(), "place.stream.") {
195 keep = true
196 }
197 if !keep {
198 continue
199 }
200 }
201
202 aqt, err := aqtime.FromString(evt.Time)
203 if err != nil {
204 log.Error(ctx, "failed to parse time", "err", err)
205 continue
206 }
207 opTime := aqt.Time()
208 atsync.LastEvent = opTime
209
210 r, err := atsync.Model.GetRepo(evt.Repo)
211 if err != nil {
212 log.Error(ctx, "failed to get repo", "err", err)
213 continue
214 }
215 // log.Warn(ctx, "got record we care about", "collection", collection, "rkey", rkey)
216
217 ek := repomgr.EventKind(op.Action)
218 switch ek {
219 case repomgr.EvtKindCreateRecord, repomgr.EvtKindUpdateRecord:
220 // read the record bytes from blocks, and verify CID
221 rc, recCBOR, err := rr.GetRecordBytes(ctx, op.Path)
222 if err != nil {
223 log.Error(ctx, "reading record from event blocks (CAR)", "err", err)
224 break
225 }
226 if op.Cid == nil || lexutil.LexLink(rc) != *op.Cid {
227 log.Error(ctx, "mismatch between commit op CID and record block", "recordCID", rc, "opCID", op.Cid)
228 break
229 }
230
231 err = atsync.handleCreateUpdate(ctx, evt.Repo, rkey, recCBOR, op.Cid.String(), collection, ek == repomgr.EvtKindUpdateRecord, false)
232 if err != nil {
233 log.Error(ctx, "failed to handle create update", "err", err)
234 continue
235 }
236
237 case repomgr.EvtKindDeleteRecord:
238 if collection.String() == constants.APP_BSKY_GRAPH_FOLLOW {
239 if r == nil {
240 log.Debug(ctx, "no repo found for follow", "userDID", evt.Repo, "subjectDID", rkey.String())
241 continue
242 }
243 log.Debug(ctx, "deleting follow", "userDID", evt.Repo, "subjectDID", rkey.String())
244 err := atsync.Model.DeleteFollow(ctx, evt.Repo, rkey.String())
245 if err != nil {
246 log.Debug(ctx, "failed to delete follow", "err", err)
247 }
248 }
249
250 if collection.String() == constants.APP_BSKY_GRAPH_BLOCK {
251 if r == nil {
252 log.Debug(ctx, "no repo found for block", "userDID", evt.Repo, "subjectDID", rkey.String())
253 continue
254 }
255 log.Warn(ctx, "deleting block", "userDID", evt.Repo, "subjectDID", rkey.String())
256 err := atsync.Model.DeleteBlock(ctx, rkey.String())
257 if err != nil {
258 log.Error(ctx, "failed to delete block", "err", err)
259 }
260 }
261
262 if collection.String() == constants.PLACE_STREAM_KEY {
263 log.Warn(ctx, "revoking stream key", "userDID", evt.Repo, "rkey", rkey.String())
264 key, err := atsync.Model.GetSigningKeyByRKey(ctx, rkey.String())
265 if err != nil {
266 log.Error(ctx, "failed to get signing key", "err", err)
267 continue
268 }
269 if key == nil {
270 log.Warn(ctx, "no signing key found for stream key", "userDID", evt.Repo, "rkey", rkey.String())
271 continue
272 }
273 now := time.Now()
274 key.RevokedAt = &now
275 err = atsync.Model.UpdateSigningKey(key)
276 if err != nil {
277 log.Error(ctx, "failed to revoke signing key", "err", err)
278 }
279 atsync.Bus.Publish(evt.Repo, key)
280 }
281
282 if collection.String() == constants.PLACE_STREAM_CHAT_MESSAGE {
283 msg, err := atsync.Model.GetChatMessage(uri)
284 if err != nil {
285 log.Error(ctx, "failed to get chat message", "err", err)
286 continue
287 }
288 if msg == nil {
289 log.Warn(ctx, "no chat message found for uri", "uri", uri)
290 continue
291 }
292 log.Warn(ctx, "deleting chat message", "userDID", evt.Repo, "uri", uri)
293 err = atsync.Model.DeleteChatMessage(ctx, uri, &opTime)
294 if err != nil {
295 log.Error(ctx, "failed to delete chat message", "err", err)
296 continue
297 }
298 mv, err := msg.ToStreamplaceMessageView()
299 if err != nil {
300 log.Error(ctx, "failed to convert chat message to streamplace message view", "err", err)
301 continue
302 }
303 isTrue := true
304 mv.Deleted = &isTrue
305 atsync.Bus.Publish(msg.StreamerRepoDID, mv)
306 }
307
308 default:
309 log.Error(ctx, "unexpected record op kind")
310 }
311 }
312}
313
314func (atsync *ATProtoSynchronizer) handleIdentityEventOps(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Identity) {
315 handle := ""
316 if evt.Handle != nil {
317 handle = *evt.Handle
318 }
319 ctx = log.WithLogValues(ctx, "event", "identity", "did", evt.Did, "handle", handle, "func", "handleIdentityEventOps")
320 r, err := atsync.Model.GetRepo(evt.Did)
321 if err != nil {
322 log.Error(ctx, "failed to get repo", "err", err)
323 return
324 }
325 if r == nil {
326 log.Debug(ctx, "no repo found for identity", "did", evt.Did)
327 return
328 }
329 _, err = atsync.RefreshIdentity(ctx, evt.Did)
330 if err != nil {
331 log.Error(ctx, "failed to refresh ident", "err", err)
332 return
333 }
334}