Live video on the AT Protocol
1package atproto
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "net/http"
8 "net/url"
9 "time"
10
11 comatproto "github.com/bluesky-social/indigo/api/atproto"
12 "github.com/bluesky-social/indigo/atproto/labeling"
13 "github.com/bluesky-social/indigo/atproto/syntax"
14 "github.com/bluesky-social/indigo/events"
15 "github.com/bluesky-social/indigo/events/schedulers/parallel"
16 "github.com/bluesky-social/indigo/util"
17 "github.com/gorilla/websocket"
18 "golang.org/x/sync/errgroup"
19 "stream.place/streamplace/pkg/aqhttp"
20 "stream.place/streamplace/pkg/log"
21 "stream.place/streamplace/pkg/model"
22 "stream.place/streamplace/pkg/spmetrics"
23)
24
25func (atsync *ATProtoSynchronizer) StartLabelerFirehose(ctx context.Context, did string) error {
26 retryCount := 0
27 retryWindow := time.Now()
28
29 for {
30 if ctx.Err() != nil {
31 return nil
32 }
33 err := atsync.StartLabelerFirehoseRetry(ctx, did)
34 if err != nil {
35 log.Error(ctx, "firehose error", "err", err)
36
37 // Check if we're within the 1-minute window
38 now := time.Now()
39 if now.Sub(retryWindow) > time.Minute {
40 // Reset the counter if more than a minute has passed
41 retryCount = 1
42 retryWindow = now
43 } else {
44 // Increment retry count if within the window
45 retryCount++
46 if retryCount >= 3 {
47 log.Warn(ctx, "firehose failed 3 times within a minute, backing off", "err", err)
48 time.Sleep(5 * time.Second)
49 retryCount = 0
50 retryWindow = time.Now()
51 }
52 }
53 }
54 }
55}
56
57func (atsync *ATProtoSynchronizer) StartLabelerFirehoseRetry(ctx context.Context, did string) error {
58 ctx = log.WithLogValues(ctx, "func", "StartLabelerFirehose")
59
60 ident, err := atsync.resolveIdent(ctx, did, true)
61 if err != nil {
62 return fmt.Errorf("failed to resolve DID %s: %w", did, err)
63 }
64
65 ctx = log.WithLogValues(ctx, "labelerDID", ident.DID.String(), "labelerHandle", ident.Handle.String())
66
67 pub, err := ident.GetPublicKey("atproto_label")
68 if err != nil {
69 return fmt.Errorf("failed to get public key for labeler %s: %w", did, err)
70 }
71
72 labeler, ok := ident.Services["atproto_labeler"]
73 if !ok {
74 return fmt.Errorf("labeler %s does not have a atproto_labeler service", did)
75 }
76
77 ctx = log.WithLogValues(ctx, "func", "StartFirehose")
78 ctx, cancel := context.WithCancel(ctx)
79 defer cancel()
80 dialer := websocket.DefaultDialer
81 u, err := url.Parse(labeler.URL)
82 if err != nil {
83 return fmt.Errorf("invalid labeler URI: %w", err)
84 }
85 u.Path = "xrpc/com.atproto.label.subscribeLabels"
86 if u.Scheme == "http" {
87 u.Scheme = "ws"
88 } else if u.Scheme == "https" {
89 u.Scheme = "wss"
90 } else {
91 return fmt.Errorf("invalid labeler URI scheme: %s", labeler.URL)
92 }
93 dbLabeler, err := atsync.Model.GetLabeler(did)
94 if err != nil {
95 return fmt.Errorf("failed to get labeler %s: %w", did, err)
96 }
97 if dbLabeler == nil {
98 dbLabeler, err = atsync.Model.CreateLabeler(did)
99 if err != nil {
100 return fmt.Errorf("failed to create labeler %s: %w", did, err)
101 }
102 }
103 query := u.Query()
104 query.Set("cursor", fmt.Sprintf("%d", dbLabeler.Cursor))
105 u.RawQuery = query.Encode()
106
107 con, _, err := dialer.Dial(u.String(), http.Header{
108 "User-Agent": []string{aqhttp.UserAgent},
109 })
110 if err != nil {
111 return fmt.Errorf("subscribing to firehose failed (dialing): %w", err)
112 }
113 spmetrics.LabelerFirehosesConnected.WithLabelValues(did).Inc()
114 defer spmetrics.LabelerFirehosesConnected.WithLabelValues(did).Dec()
115 rsc := &events.RepoStreamCallbacks{
116 LabelLabels: func(evt *comatproto.LabelSubscribeLabels_Labels) error {
117 err = atsync.Model.UpdateLabelerCursor(did, evt.Seq)
118 if err != nil {
119 log.Error(ctx, "failed to update labeler cursor", "err", err)
120 }
121 for _, labelLex := range evt.Labels {
122 l := labeling.FromLexicon(labelLex)
123 err = l.VerifySignature(pub)
124 if err != nil {
125 log.Error(ctx, "failed to verify label signature", "err", err)
126 continue
127 }
128 err = l.VerifySyntax()
129 if err != nil {
130 log.Error(ctx, "failed to verify label syntax", "err", err)
131 continue
132 }
133 bs := bytes.Buffer{}
134 err = labelLex.MarshalCBOR(&bs)
135 if err != nil {
136 log.Error(ctx, "failed to marshal label", "err", err)
137 continue
138 }
139 cts, err := time.Parse(util.ISO8601, l.CreatedAt)
140 if err != nil {
141 log.Error(ctx, "failed to parse label created time", "err", err)
142 continue
143 }
144 var exp time.Time
145 if l.ExpiresAt != nil {
146 e, err := time.Parse(util.ISO8601, *l.ExpiresAt)
147 if err != nil {
148 log.Error(ctx, "failed to parse label expiration time", "err", err)
149 continue
150 }
151 exp = e.UTC()
152 } else {
153 exp = time.Time{}
154 }
155 neg := false
156 if l.Negated != nil {
157 neg = *l.Negated
158 }
159
160 var targetDID string
161
162 // the URI can either be a true URI or a DID, so
163 aturi, err1 := syntax.ParseATURI(l.URI)
164 if err1 != nil {
165 did, err2 := syntax.ParseDID(l.URI)
166 if err2 != nil {
167 log.Error(ctx, "failed to parse label URI as either ATURI or DID", "err1", err1, "err2", err2)
168 continue
169 }
170 targetDID = did.String()
171 } else {
172 did, err := aturi.Authority().AsDID()
173 if err != nil {
174 log.Error(ctx, "failed to parse label URI as ATURI", "err", err)
175 continue
176 }
177 targetDID = did.String()
178 // if it's a chat message, attempt to send it to the streamers' websocket
179 if aturi.Collection() == "place.stream.chat.message" {
180 msg, err := atsync.Model.GetChatMessage(l.URI)
181 if err != nil {
182 log.Error(ctx, "failed to get chat message for label", "err", err)
183 continue
184 }
185 chatView, err := msg.ToStreamplaceMessageView()
186 if err != nil {
187 log.Error(ctx, "failed to convert chat message to streamplace message view", "err", err)
188 continue
189 }
190 isTrue := true
191 chatView.Deleted = &isTrue
192 atsync.Bus.Publish(msg.StreamerRepoDID, chatView)
193 }
194 }
195
196 log.Log(ctx, "labeler label", "targetDID", targetDID, "uri", l.URI, "cid", l.CID, "createdAt", cts, "expiresAt", exp, "negated", neg, "sourceDID", l.SourceDID, "val", l.Val, "version", l.Version)
197 err = atsync.Model.CreateLabel(&model.Label{
198 Cid: l.CID,
199 Cts: cts.UTC(),
200 Exp: exp,
201 Neg: neg,
202 Sig: l.Sig,
203 Src: l.SourceDID,
204 Uri: l.URI,
205 Val: l.Val,
206 Ver: &l.Version,
207 Record: bs.Bytes(),
208 RepoDID: targetDID,
209 })
210 if err != nil {
211 log.Error(ctx, "failed to create label", "err", err)
212 continue
213 }
214 atsync.Bus.Publish(targetDID, labelLex)
215 }
216 return nil
217 },
218 LabelInfo: func(evt *comatproto.LabelSubscribeLabels_Info) error {
219 log.Log(ctx, "labeler info", "name", evt.Name, "message", evt.Message)
220 return nil
221 },
222 Error: func(evt *events.ErrorFrame) error {
223 log.Error(ctx, "firehose error", "err", evt.Error, "message", evt.Message)
224 cancel()
225 return fmt.Errorf("firehose error: %s", evt.Error)
226 },
227 }
228
229 scheduler := parallel.NewScheduler(
230 10,
231 100,
232 did,
233 rsc.EventHandler,
234 )
235
236 log.Log(ctx, "starting labeler firehose consumer", "labelerDID", did)
237
238 g, ctx := errgroup.WithContext(ctx)
239
240 g.Go(func() error {
241 return events.HandleRepoStream(ctx, con, scheduler, nil)
242 })
243
244 <-ctx.Done()
245
246 return nil
247
248}