Live video on the AT Protocol
1package atproto
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "net/http"
8 "net/url"
9 "time"
10
11 comatproto "github.com/bluesky-social/indigo/api/atproto"
12 "github.com/bluesky-social/indigo/atproto/label"
13 "github.com/bluesky-social/indigo/atproto/syntax"
14 "github.com/bluesky-social/indigo/events"
15 "github.com/bluesky-social/indigo/events/schedulers/parallel"
16 "github.com/bluesky-social/indigo/util"
17 "github.com/gorilla/websocket"
18 "golang.org/x/sync/errgroup"
19 "stream.place/streamplace/pkg/aqhttp"
20 "stream.place/streamplace/pkg/log"
21 "stream.place/streamplace/pkg/model"
22)
23
24func (atsync *ATProtoSynchronizer) StartLabelerFirehose(ctx context.Context, did string) error {
25 retryCount := 0
26 retryWindow := time.Now()
27
28 for {
29 if ctx.Err() != nil {
30 return nil
31 }
32 err := atsync.StartLabelerFirehoseRetry(ctx, did)
33 if err != nil {
34 log.Error(ctx, "firehose error", "err", err)
35
36 // Check if we're within the 1-minute window
37 now := time.Now()
38 if now.Sub(retryWindow) > time.Minute {
39 // Reset the counter if more than a minute has passed
40 retryCount = 1
41 retryWindow = now
42 } else {
43 // Increment retry count if within the window
44 retryCount++
45 if retryCount >= 3 {
46 log.Error(ctx, "firehose failed 3 times within a minute, crashing", "err", err)
47 return fmt.Errorf("firehose failed 3 times within a minute: %w", err)
48 }
49 }
50 }
51 }
52}
53
54func (atsync *ATProtoSynchronizer) StartLabelerFirehoseRetry(ctx context.Context, did string) error {
55 ctx = log.WithLogValues(ctx, "func", "StartLabelerFirehose")
56
57 ident, err := atsync.resolveIdent(ctx, did, true)
58 if err != nil {
59 return fmt.Errorf("failed to resolve DID %s: %w", did, err)
60 }
61
62 ctx = log.WithLogValues(ctx, "labelerDID", ident.DID.String(), "labelerHandle", ident.Handle.String())
63
64 pub, err := ident.GetPublicKey("atproto_label")
65 if err != nil {
66 return fmt.Errorf("failed to get public key for labeler %s: %w", did, err)
67 }
68
69 labeler, ok := ident.Services["atproto_labeler"]
70 if !ok {
71 return fmt.Errorf("labeler %s does not have a atproto_labeler service", did)
72 }
73
74 ctx = log.WithLogValues(ctx, "func", "StartFirehose")
75 ctx, cancel := context.WithCancel(ctx)
76 defer cancel()
77 dialer := websocket.DefaultDialer
78 u, err := url.Parse(labeler.URL)
79 if err != nil {
80 return fmt.Errorf("invalid labeler URI: %w", err)
81 }
82 u.Path = "xrpc/com.atproto.label.subscribeLabels"
83 if u.Scheme == "http" {
84 u.Scheme = "ws"
85 } else if u.Scheme == "https" {
86 u.Scheme = "wss"
87 } else {
88 return fmt.Errorf("invalid labeler URI scheme: %s", labeler.URL)
89 }
90 dbLabeler, err := atsync.Model.GetLabeler(did)
91 if err != nil {
92 return fmt.Errorf("failed to get labeler %s: %w", did, err)
93 }
94 if dbLabeler == nil {
95 dbLabeler, err = atsync.Model.CreateLabeler(did)
96 if err != nil {
97 return fmt.Errorf("failed to create labeler %s: %w", did, err)
98 }
99 }
100 query := u.Query()
101 query.Set("cursor", fmt.Sprintf("%d", dbLabeler.Cursor))
102 u.RawQuery = query.Encode()
103
104 con, _, err := dialer.Dial(u.String(), http.Header{
105 "User-Agent": []string{aqhttp.UserAgent},
106 })
107 if err != nil {
108 return fmt.Errorf("subscribing to firehose failed (dialing): %w", err)
109 }
110
111 rsc := &events.RepoStreamCallbacks{
112 LabelLabels: func(evt *comatproto.LabelSubscribeLabels_Labels) error {
113 err = atsync.Model.UpdateLabelerCursor(did, evt.Seq)
114 if err != nil {
115 log.Error(ctx, "failed to update labeler cursor", "err", err)
116 }
117 for _, labelLex := range evt.Labels {
118 l := label.FromLexicon(labelLex)
119 err = l.VerifySignature(pub)
120 if err != nil {
121 log.Error(ctx, "failed to verify label signature", "err", err)
122 continue
123 }
124 err = l.VerifySyntax()
125 if err != nil {
126 log.Error(ctx, "failed to verify label syntax", "err", err)
127 continue
128 }
129 bs := bytes.Buffer{}
130 err = labelLex.MarshalCBOR(&bs)
131 if err != nil {
132 log.Error(ctx, "failed to marshal label", "err", err)
133 continue
134 }
135 cts, err := time.Parse(util.ISO8601, l.CreatedAt)
136 if err != nil {
137 log.Error(ctx, "failed to parse label created time", "err", err)
138 continue
139 }
140 var exp time.Time
141 if l.ExpiresAt != nil {
142 e, err := time.Parse(util.ISO8601, *l.ExpiresAt)
143 if err != nil {
144 log.Error(ctx, "failed to parse label expiration time", "err", err)
145 continue
146 }
147 exp = e.UTC()
148 } else {
149 exp = time.Time{}
150 }
151 neg := false
152 if l.Negated != nil {
153 neg = *l.Negated
154 }
155
156 var targetDID string
157
158 // the URI can either be a true URI or a DID, so
159 aturi, err1 := syntax.ParseATURI(l.URI)
160 if err1 != nil {
161 did, err2 := syntax.ParseDID(l.URI)
162 if err2 != nil {
163 log.Error(ctx, "failed to parse label URI as either ATURI or DID", "err1", err1, "err2", err2)
164 continue
165 }
166 targetDID = did.String()
167 } else {
168 did, err := aturi.Authority().AsDID()
169 if err != nil {
170 log.Error(ctx, "failed to parse label URI as ATURI", "err", err)
171 continue
172 }
173 targetDID = did.String()
174 // if it's a chat message, attempt to send it to the streamers' websocket
175 if aturi.Collection() == "place.stream.chat.message" {
176 msg, err := atsync.Model.GetChatMessage(l.URI)
177 if err != nil {
178 log.Error(ctx, "failed to get chat message for label", "err", err)
179 continue
180 }
181 chatView, err := msg.ToStreamplaceMessageView()
182 if err != nil {
183 log.Error(ctx, "failed to convert chat message to streamplace message view", "err", err)
184 continue
185 }
186 isTrue := true
187 chatView.Deleted = &isTrue
188 atsync.Bus.Publish(msg.StreamerRepoDID, chatView)
189 }
190 }
191
192 log.Log(ctx, "labeler label", "targetDID", targetDID, "uri", l.URI, "cid", l.CID, "createdAt", cts, "expiresAt", exp, "negated", neg, "sourceDID", l.SourceDID, "val", l.Val, "version", l.Version)
193 err = atsync.Model.CreateLabel(&model.Label{
194 Cid: l.CID,
195 Cts: cts.UTC(),
196 Exp: exp,
197 Neg: neg,
198 Sig: l.Sig,
199 Src: l.SourceDID,
200 Uri: l.URI,
201 Val: l.Val,
202 Ver: &l.Version,
203 Record: bs.Bytes(),
204 RepoDID: targetDID,
205 })
206 if err != nil {
207 log.Error(ctx, "failed to create label", "err", err)
208 continue
209 }
210 atsync.Bus.Publish(targetDID, labelLex)
211 }
212 return nil
213 },
214 LabelInfo: func(evt *comatproto.LabelSubscribeLabels_Info) error {
215 log.Log(ctx, "labeler info", "name", evt.Name, "message", evt.Message)
216 return nil
217 },
218 Error: func(evt *events.ErrorFrame) error {
219 log.Error(ctx, "firehose error", "err", evt.Error, "message", evt.Message)
220 cancel()
221 return fmt.Errorf("firehose error: %s", evt.Error)
222 },
223 }
224
225 scheduler := parallel.NewScheduler(
226 10,
227 100,
228 did,
229 rsc.EventHandler,
230 )
231
232 log.Log(ctx, "starting labeler firehose consumer", "labelerDID", did)
233
234 g, ctx := errgroup.WithContext(ctx)
235
236 g.Go(func() error {
237 return events.HandleRepoStream(ctx, con, scheduler, nil)
238 })
239
240 <-ctx.Done()
241
242 return nil
243
244}