Live video on the AT Protocol
1package atproto
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "net/http"
8 "net/url"
9 "runtime"
10 "time"
11
12 comatproto "github.com/bluesky-social/indigo/api/atproto"
13 "github.com/bluesky-social/indigo/atproto/syntax"
14 "github.com/bluesky-social/indigo/events"
15 "github.com/bluesky-social/indigo/events/schedulers/parallel"
16 lexutil "github.com/bluesky-social/indigo/lex/util"
17 "github.com/bluesky-social/indigo/repo"
18 "github.com/bluesky-social/indigo/repomgr"
19 "golang.org/x/sync/errgroup"
20 "stream.place/streamplace/pkg/aqhttp"
21 "stream.place/streamplace/pkg/aqtime"
22 "stream.place/streamplace/pkg/bus"
23 "stream.place/streamplace/pkg/config"
24 "stream.place/streamplace/pkg/constants"
25 "stream.place/streamplace/pkg/log"
26 "stream.place/streamplace/pkg/model"
27 notificationpkg "stream.place/streamplace/pkg/notifications"
28
29 "github.com/gorilla/websocket"
30)
31
32type ATProtoSynchronizer struct {
33 CLI *config.CLI
34 Model model.Model
35 LastSeen time.Time
36 LastEvent time.Time
37 Noter notificationpkg.FirebaseNotifier
38 Bus *bus.Bus
39}
40
41func (atsync *ATProtoSynchronizer) StartFirehose(ctx context.Context) error {
42 retryCount := 0
43 retryWindow := time.Now()
44
45 for {
46 if ctx.Err() != nil {
47 return nil
48 }
49 err := atsync.StartFirehoseRetry(ctx)
50 if err != nil {
51 log.Error(ctx, "firehose error", "err", err)
52
53 // Check if we're within the 1-minute window
54 now := time.Now()
55 if now.Sub(retryWindow) > time.Minute {
56 // Reset the counter if more than a minute has passed
57 retryCount = 1
58 retryWindow = now
59 } else {
60 // Increment retry count if within the window
61 retryCount++
62 if retryCount >= 3 {
63 log.Error(ctx, "firehose failed 3 times within a minute, crashing", "err", err)
64 return fmt.Errorf("firehose failed 3 times within a minute: %w", err)
65 }
66 }
67 }
68 }
69}
70
71func (atsync *ATProtoSynchronizer) StartFirehoseRetry(ctx context.Context) error {
72 ctx = log.WithLogValues(ctx, "func", "StartFirehose")
73 ctx, cancel := context.WithCancel(ctx)
74 defer cancel()
75 dialer := websocket.DefaultDialer
76 u, err := url.Parse(atsync.CLI.RelayHost)
77 if err != nil {
78 return fmt.Errorf("invalid relayHost URI: %w", err)
79 }
80 u.Path = "xrpc/com.atproto.sync.subscribeRepos"
81 // if cursor != 0 {
82 // u.RawQuery = fmt.Sprintf("cursor=%d", cursor)
83 // }
84 con, _, err := dialer.Dial(u.String(), http.Header{
85 "User-Agent": []string{aqhttp.UserAgent},
86 })
87 if err != nil {
88 return fmt.Errorf("subscribing to firehose failed (dialing): %w", err)
89 }
90
91 rsc := &events.RepoStreamCallbacks{
92 RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error {
93 go atsync.handleCommitEventOps(ctx, evt)
94 return nil
95 },
96 Error: func(evt *events.ErrorFrame) error {
97 log.Error(ctx, "firehose error", "err", evt.Error, "message", evt.Message)
98 cancel()
99 return fmt.Errorf("firehose error: %s", evt.Error)
100 },
101 }
102
103 scheduler := parallel.NewScheduler(
104 10,
105 100,
106 atsync.CLI.RelayHost,
107 rsc.EventHandler,
108 )
109
110 log.Log(ctx, "starting firehose consumer", "relayHost", atsync.CLI.RelayHost)
111
112 g, ctx := errgroup.WithContext(ctx)
113
114 g.Go(func() error {
115 return events.HandleRepoStream(ctx, con, scheduler, nil)
116 })
117
118 g.Go(func() error {
119 ticker := time.NewTicker(5 * time.Second)
120 defer ticker.Stop()
121 for {
122 select {
123 case <-ctx.Done():
124 return nil
125 case <-ticker.C:
126 since := time.Since(atsync.LastEvent)
127 goroutines := runtime.NumGoroutine()
128 if since > 10*time.Second {
129 log.Warn(ctx, fmt.Sprintf("firehose is %s behind real time", since), "goroutines", goroutines)
130 } else {
131 log.Debug(ctx, fmt.Sprintf("firehose is %s behind real time", since), "goroutines", goroutines)
132 }
133 if time.Since(atsync.LastSeen) > 10*time.Second {
134 log.Warn(ctx, fmt.Sprintf("firehose dry; no new events for %s", time.Since(atsync.LastSeen)))
135 }
136 }
137 }
138 })
139
140 return g.Wait()
141}
142
143var CollectionFilter = []string{
144 constants.PLACE_STREAM_KEY,
145 constants.PLACE_STREAM_LIVESTREAM,
146 constants.PLACE_STREAM_CHAT_MESSAGE,
147 constants.PLACE_STREAM_CHAT_PROFILE,
148 constants.APP_BSKY_GRAPH_FOLLOW,
149 constants.APP_BSKY_FEED_POST,
150 constants.APP_BSKY_GRAPH_BLOCK,
151}
152
153func (atsync *ATProtoSynchronizer) handleCommitEventOps(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) {
154 ctx = log.WithLogValues(ctx, "event", "commit", "did", evt.Repo, "rev", evt.Rev, "seq", fmt.Sprintf("%d", evt.Seq), "func", "handleCommitEventOps")
155 now := time.Now()
156 atsync.LastSeen = now
157
158 if evt.TooBig {
159 log.Warn(ctx, "skipping tooBig events for now")
160 return
161 }
162
163 rr, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks))
164 if err != nil {
165 log.Error(ctx, "failed to read repo from car", "err", err)
166 return
167 }
168
169 for _, op := range evt.Ops {
170 collection, rkey, err := syntax.ParseRepoPath(op.Path)
171 if err != nil {
172 log.Error(ctx, "invalid path in repo op", "eventKind", op.Action, "path", op.Path)
173 return
174 }
175 ctx = log.WithLogValues(ctx, "eventKind", op.Action, "collection", collection.String(), "rkey", rkey.String())
176
177 if len(CollectionFilter) > 0 {
178 keep := false
179 for _, c := range CollectionFilter {
180 if collection.String() == c {
181 keep = true
182 break
183 }
184 }
185 if !keep {
186 continue
187 }
188 }
189
190 aqt, err := aqtime.FromString(evt.Time)
191 if err != nil {
192 log.Error(ctx, "failed to parse time", "err", err)
193 continue
194 }
195 atsync.LastEvent = aqt.Time()
196
197 r, err := atsync.Model.GetRepo(evt.Repo)
198 if err != nil {
199 log.Error(ctx, "failed to get repo", "err", err)
200 continue
201 }
202 // log.Warn(ctx, "got record we care about", "collection", collection, "rkey", rkey)
203
204 ek := repomgr.EventKind(op.Action)
205 switch ek {
206 case repomgr.EvtKindCreateRecord, repomgr.EvtKindUpdateRecord:
207 // read the record bytes from blocks, and verify CID
208 rc, recCBOR, err := rr.GetRecordBytes(ctx, op.Path)
209 if err != nil {
210 log.Error(ctx, "reading record from event blocks (CAR)", "err", err)
211 break
212 }
213 if op.Cid == nil || lexutil.LexLink(rc) != *op.Cid {
214 log.Error(ctx, "mismatch between commit op CID and record block", "recordCID", rc, "opCID", op.Cid)
215 break
216 }
217
218 err = atsync.handleCreateUpdate(ctx, evt.Repo, rkey, recCBOR, op.Cid.String(), collection, ek == repomgr.EvtKindUpdateRecord)
219 if err != nil {
220 log.Error(ctx, "failed to handle create update", "err", err)
221 continue
222 }
223
224 case repomgr.EvtKindDeleteRecord:
225 if collection.String() == constants.APP_BSKY_GRAPH_FOLLOW {
226 if r == nil {
227 log.Debug(ctx, "no repo found for follow", "userDID", evt.Repo, "subjectDID", rkey.String())
228 continue
229 }
230 log.Debug(ctx, "deleting follow", "userDID", evt.Repo, "subjectDID", rkey.String())
231 err := atsync.Model.DeleteFollow(ctx, evt.Repo, rkey.String())
232 if err != nil {
233 log.Debug(ctx, "failed to delete follow", "err", err)
234 }
235 }
236
237 if collection.String() == constants.APP_BSKY_GRAPH_BLOCK {
238 if r == nil {
239 log.Debug(ctx, "no repo found for block", "userDID", evt.Repo, "subjectDID", rkey.String())
240 continue
241 }
242 log.Warn(ctx, "deleting block", "userDID", evt.Repo, "subjectDID", rkey.String())
243 err := atsync.Model.DeleteBlock(ctx, rkey.String())
244 if err != nil {
245 log.Error(ctx, "failed to delete block", "err", err)
246 }
247 }
248
249 default:
250 log.Error(ctx, "unexpected record op kind")
251 }
252 }
253}