Live video on the AT Protocol
1package atproto
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "net/http"
8 "net/url"
9 "runtime"
10 "time"
11
12 comatproto "github.com/bluesky-social/indigo/api/atproto"
13 "github.com/bluesky-social/indigo/atproto/syntax"
14 "github.com/bluesky-social/indigo/events"
15 "github.com/bluesky-social/indigo/events/schedulers/parallel"
16 lexutil "github.com/bluesky-social/indigo/lex/util"
17 "github.com/bluesky-social/indigo/repo"
18 "github.com/bluesky-social/indigo/repomgr"
19 "golang.org/x/sync/errgroup"
20 "stream.place/streamplace/pkg/aqhttp"
21 "stream.place/streamplace/pkg/aqtime"
22 "stream.place/streamplace/pkg/bus"
23 "stream.place/streamplace/pkg/config"
24 "stream.place/streamplace/pkg/constants"
25 "stream.place/streamplace/pkg/log"
26 "stream.place/streamplace/pkg/model"
27 notificationpkg "stream.place/streamplace/pkg/notifications"
28
29 "github.com/gorilla/websocket"
30)
31
32type ATProtoSynchronizer struct {
33 CLI *config.CLI
34 Model model.Model
35 LastSeen time.Time
36 LastEvent time.Time
37 Noter notificationpkg.FirebaseNotifier
38 Bus *bus.Bus
39}
40
41func (atsync *ATProtoSynchronizer) StartFirehose(ctx context.Context) error {
42 retryCount := 0
43 retryWindow := time.Now()
44
45 for {
46 if ctx.Err() != nil {
47 return nil
48 }
49 err := atsync.StartFirehoseRetry(ctx)
50 if err != nil {
51 log.Error(ctx, "firehose error", "err", err)
52
53 // Check if we're within the 1-minute window
54 now := time.Now()
55 if now.Sub(retryWindow) > time.Minute {
56 // Reset the counter if more than a minute has passed
57 retryCount = 1
58 retryWindow = now
59 } else {
60 // Increment retry count if within the window
61 retryCount++
62 if retryCount >= 3 {
63 log.Error(ctx, "firehose failed 3 times within a minute, crashing", "err", err)
64 return fmt.Errorf("firehose failed 3 times within a minute: %w", err)
65 }
66 }
67 }
68 }
69}
70
71func (atsync *ATProtoSynchronizer) StartFirehoseRetry(ctx context.Context) error {
72 ctx = log.WithLogValues(ctx, "func", "StartFirehose")
73 ctx, cancel := context.WithCancel(ctx)
74 defer cancel()
75 dialer := websocket.DefaultDialer
76 u, err := url.Parse(atsync.CLI.RelayHost)
77 if err != nil {
78 return fmt.Errorf("invalid relayHost URI: %w", err)
79 }
80 u.Path = "xrpc/com.atproto.sync.subscribeRepos"
81 // if cursor != 0 {
82 // u.RawQuery = fmt.Sprintf("cursor=%d", cursor)
83 // }
84 con, _, err := dialer.Dial(u.String(), http.Header{
85 "User-Agent": []string{aqhttp.UserAgent},
86 })
87 if err != nil {
88 return fmt.Errorf("subscribing to firehose failed (dialing): %w", err)
89 }
90
91 rsc := &events.RepoStreamCallbacks{
92 RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error {
93 go atsync.handleCommitEventOps(ctx, evt)
94 return nil
95 },
96 Error: func(evt *events.ErrorFrame) error {
97 log.Error(ctx, "firehose error", "err", evt.Error, "message", evt.Message)
98 cancel()
99 return fmt.Errorf("firehose error: %s", evt.Error)
100 },
101 }
102
103 scheduler := parallel.NewScheduler(
104 10,
105 100,
106 atsync.CLI.RelayHost,
107 rsc.EventHandler,
108 )
109
110 log.Log(ctx, "starting firehose consumer", "relayHost", atsync.CLI.RelayHost)
111
112 g, ctx := errgroup.WithContext(ctx)
113
114 g.Go(func() error {
115 return events.HandleRepoStream(ctx, con, scheduler, nil)
116 })
117
118 g.Go(func() error {
119 ticker := time.NewTicker(5 * time.Second)
120 defer ticker.Stop()
121 for {
122 select {
123 case <-ctx.Done():
124 return nil
125 case <-ticker.C:
126 since := time.Since(atsync.LastEvent)
127 goroutines := runtime.NumGoroutine()
128 if since > 10*time.Second {
129 log.Warn(ctx, fmt.Sprintf("firehose is %s behind real time", since), "goroutines", goroutines)
130 } else {
131 log.Debug(ctx, fmt.Sprintf("firehose is %s behind real time", since), "goroutines", goroutines)
132 }
133 if time.Since(atsync.LastSeen) > 10*time.Second {
134 log.Warn(ctx, fmt.Sprintf("firehose dry; no new events for %s", time.Since(atsync.LastSeen)))
135 }
136 }
137 }
138 })
139
140 return g.Wait()
141}
142
143var CollectionFilter = []string{
144 constants.PLACE_STREAM_KEY,
145 constants.PLACE_STREAM_LIVESTREAM,
146 constants.PLACE_STREAM_CHAT_MESSAGE,
147 constants.PLACE_STREAM_CHAT_PROFILE,
148 constants.APP_BSKY_GRAPH_FOLLOW,
149 constants.APP_BSKY_FEED_POST,
150 constants.APP_BSKY_GRAPH_BLOCK,
151 constants.PLACE_STREAM_SERVER_SETTINGS,
152}
153
154func (atsync *ATProtoSynchronizer) handleCommitEventOps(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) {
155 ctx = log.WithLogValues(ctx, "event", "commit", "did", evt.Repo, "rev", evt.Rev, "seq", fmt.Sprintf("%d", evt.Seq), "func", "handleCommitEventOps")
156 now := time.Now()
157 atsync.LastSeen = now
158
159 if evt.TooBig {
160 log.Warn(ctx, "skipping tooBig events for now")
161 return
162 }
163
164 rr, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks))
165 if err != nil {
166 log.Error(ctx, "failed to read repo from car", "err", err)
167 return
168 }
169
170 for _, op := range evt.Ops {
171 collection, rkey, err := syntax.ParseRepoPath(op.Path)
172 if err != nil {
173 log.Error(ctx, "invalid path in repo op", "eventKind", op.Action, "path", op.Path)
174 return
175 }
176 ctx = log.WithLogValues(ctx, "eventKind", op.Action, "collection", collection.String(), "rkey", rkey.String())
177
178 if len(CollectionFilter) > 0 {
179 keep := false
180 for _, c := range CollectionFilter {
181 if collection.String() == c {
182 keep = true
183 break
184 }
185 }
186 if !keep {
187 continue
188 }
189 }
190
191 aqt, err := aqtime.FromString(evt.Time)
192 if err != nil {
193 log.Error(ctx, "failed to parse time", "err", err)
194 continue
195 }
196 atsync.LastEvent = aqt.Time()
197
198 r, err := atsync.Model.GetRepo(evt.Repo)
199 if err != nil {
200 log.Error(ctx, "failed to get repo", "err", err)
201 continue
202 }
203 // log.Warn(ctx, "got record we care about", "collection", collection, "rkey", rkey)
204
205 ek := repomgr.EventKind(op.Action)
206 switch ek {
207 case repomgr.EvtKindCreateRecord, repomgr.EvtKindUpdateRecord:
208 // read the record bytes from blocks, and verify CID
209 rc, recCBOR, err := rr.GetRecordBytes(ctx, op.Path)
210 if err != nil {
211 log.Error(ctx, "reading record from event blocks (CAR)", "err", err)
212 break
213 }
214 if op.Cid == nil || lexutil.LexLink(rc) != *op.Cid {
215 log.Error(ctx, "mismatch between commit op CID and record block", "recordCID", rc, "opCID", op.Cid)
216 break
217 }
218
219 err = atsync.handleCreateUpdate(ctx, evt.Repo, rkey, recCBOR, op.Cid.String(), collection, ek == repomgr.EvtKindUpdateRecord, false)
220 if err != nil {
221 log.Error(ctx, "failed to handle create update", "err", err)
222 continue
223 }
224
225 case repomgr.EvtKindDeleteRecord:
226 if collection.String() == constants.APP_BSKY_GRAPH_FOLLOW {
227 if r == nil {
228 log.Debug(ctx, "no repo found for follow", "userDID", evt.Repo, "subjectDID", rkey.String())
229 continue
230 }
231 log.Debug(ctx, "deleting follow", "userDID", evt.Repo, "subjectDID", rkey.String())
232 err := atsync.Model.DeleteFollow(ctx, evt.Repo, rkey.String())
233 if err != nil {
234 log.Debug(ctx, "failed to delete follow", "err", err)
235 }
236 }
237
238 if collection.String() == constants.APP_BSKY_GRAPH_BLOCK {
239 if r == nil {
240 log.Debug(ctx, "no repo found for block", "userDID", evt.Repo, "subjectDID", rkey.String())
241 continue
242 }
243 log.Warn(ctx, "deleting block", "userDID", evt.Repo, "subjectDID", rkey.String())
244 err := atsync.Model.DeleteBlock(ctx, rkey.String())
245 if err != nil {
246 log.Error(ctx, "failed to delete block", "err", err)
247 }
248 }
249
250 if collection.String() == constants.PLACE_STREAM_KEY {
251 log.Warn(ctx, "revoking stream key", "userDID", evt.Repo, "rkey", rkey.String())
252 key, err := atsync.Model.GetSigningKeyByRKey(ctx, rkey.String())
253 if err != nil {
254 log.Error(ctx, "failed to get signing key", "err", err)
255 continue
256 }
257 if key == nil {
258 log.Warn(ctx, "no signing key found for stream key", "userDID", evt.Repo, "rkey", rkey.String())
259 continue
260 }
261 now := time.Now()
262 key.RevokedAt = &now
263 err = atsync.Model.UpdateSigningKey(key)
264 if err != nil {
265 log.Error(ctx, "failed to revoke signing key", "err", err)
266 }
267 atsync.Bus.Publish(evt.Repo, key)
268 }
269
270 default:
271 log.Error(ctx, "unexpected record op kind")
272 }
273 }
274}