Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at eli/multistream-fixes 248 lines 7.2 kB view raw
1package atproto 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "net/http" 8 "net/url" 9 "time" 10 11 comatproto "github.com/bluesky-social/indigo/api/atproto" 12 "github.com/bluesky-social/indigo/atproto/labeling" 13 "github.com/bluesky-social/indigo/atproto/syntax" 14 "github.com/bluesky-social/indigo/events" 15 "github.com/bluesky-social/indigo/events/schedulers/parallel" 16 "github.com/bluesky-social/indigo/util" 17 "github.com/gorilla/websocket" 18 "golang.org/x/sync/errgroup" 19 "stream.place/streamplace/pkg/aqhttp" 20 "stream.place/streamplace/pkg/log" 21 "stream.place/streamplace/pkg/model" 22 "stream.place/streamplace/pkg/spmetrics" 23) 24 25func (atsync *ATProtoSynchronizer) StartLabelerFirehose(ctx context.Context, did string) error { 26 retryCount := 0 27 retryWindow := time.Now() 28 29 for { 30 if ctx.Err() != nil { 31 return nil 32 } 33 err := atsync.StartLabelerFirehoseRetry(ctx, did) 34 if err != nil { 35 log.Error(ctx, "firehose error", "err", err) 36 37 // Check if we're within the 1-minute window 38 now := time.Now() 39 if now.Sub(retryWindow) > time.Minute { 40 // Reset the counter if more than a minute has passed 41 retryCount = 1 42 retryWindow = now 43 } else { 44 // Increment retry count if within the window 45 retryCount++ 46 if retryCount >= 3 { 47 log.Warn(ctx, "firehose failed 3 times within a minute, backing off", "err", err) 48 time.Sleep(5 * time.Second) 49 retryCount = 0 50 retryWindow = time.Now() 51 } 52 } 53 } 54 } 55} 56 57func (atsync *ATProtoSynchronizer) StartLabelerFirehoseRetry(ctx context.Context, did string) error { 58 ctx = log.WithLogValues(ctx, "func", "StartLabelerFirehose") 59 60 ident, err := atsync.resolveIdent(ctx, did, true) 61 if err != nil { 62 return fmt.Errorf("failed to resolve DID %s: %w", did, err) 63 } 64 65 ctx = log.WithLogValues(ctx, "labelerDID", ident.DID.String(), "labelerHandle", ident.Handle.String()) 66 67 pub, err := ident.GetPublicKey("atproto_label") 68 if err != nil { 69 return fmt.Errorf("failed to get public key for labeler %s: %w", did, err) 70 } 71 72 labeler, ok := ident.Services["atproto_labeler"] 73 if !ok { 74 return fmt.Errorf("labeler %s does not have a atproto_labeler service", did) 75 } 76 77 ctx = log.WithLogValues(ctx, "func", "StartFirehose") 78 ctx, cancel := context.WithCancel(ctx) 79 defer cancel() 80 dialer := websocket.DefaultDialer 81 u, err := url.Parse(labeler.URL) 82 if err != nil { 83 return fmt.Errorf("invalid labeler URI: %w", err) 84 } 85 u.Path = "xrpc/com.atproto.label.subscribeLabels" 86 if u.Scheme == "http" { 87 u.Scheme = "ws" 88 } else if u.Scheme == "https" { 89 u.Scheme = "wss" 90 } else { 91 return fmt.Errorf("invalid labeler URI scheme: %s", labeler.URL) 92 } 93 dbLabeler, err := atsync.Model.GetLabeler(did) 94 if err != nil { 95 return fmt.Errorf("failed to get labeler %s: %w", did, err) 96 } 97 if dbLabeler == nil { 98 dbLabeler, err = atsync.Model.CreateLabeler(did) 99 if err != nil { 100 return fmt.Errorf("failed to create labeler %s: %w", did, err) 101 } 102 } 103 query := u.Query() 104 query.Set("cursor", fmt.Sprintf("%d", dbLabeler.Cursor)) 105 u.RawQuery = query.Encode() 106 107 con, _, err := dialer.Dial(u.String(), http.Header{ 108 "User-Agent": []string{aqhttp.UserAgent}, 109 }) 110 if err != nil { 111 return fmt.Errorf("subscribing to firehose failed (dialing): %w", err) 112 } 113 spmetrics.LabelerFirehosesConnected.WithLabelValues(did).Inc() 114 defer spmetrics.LabelerFirehosesConnected.WithLabelValues(did).Dec() 115 rsc := &events.RepoStreamCallbacks{ 116 LabelLabels: func(evt *comatproto.LabelSubscribeLabels_Labels) error { 117 err = atsync.Model.UpdateLabelerCursor(did, evt.Seq) 118 if err != nil { 119 log.Error(ctx, "failed to update labeler cursor", "err", err) 120 } 121 for _, labelLex := range evt.Labels { 122 l := labeling.FromLexicon(labelLex) 123 err = l.VerifySignature(pub) 124 if err != nil { 125 log.Error(ctx, "failed to verify label signature", "err", err) 126 continue 127 } 128 err = l.VerifySyntax() 129 if err != nil { 130 log.Error(ctx, "failed to verify label syntax", "err", err) 131 continue 132 } 133 bs := bytes.Buffer{} 134 err = labelLex.MarshalCBOR(&bs) 135 if err != nil { 136 log.Error(ctx, "failed to marshal label", "err", err) 137 continue 138 } 139 cts, err := time.Parse(util.ISO8601, l.CreatedAt) 140 if err != nil { 141 log.Error(ctx, "failed to parse label created time", "err", err) 142 continue 143 } 144 var exp time.Time 145 if l.ExpiresAt != nil { 146 e, err := time.Parse(util.ISO8601, *l.ExpiresAt) 147 if err != nil { 148 log.Error(ctx, "failed to parse label expiration time", "err", err) 149 continue 150 } 151 exp = e.UTC() 152 } else { 153 exp = time.Time{} 154 } 155 neg := false 156 if l.Negated != nil { 157 neg = *l.Negated 158 } 159 160 var targetDID string 161 162 // the URI can either be a true URI or a DID, so 163 aturi, err1 := syntax.ParseATURI(l.URI) 164 if err1 != nil { 165 did, err2 := syntax.ParseDID(l.URI) 166 if err2 != nil { 167 log.Error(ctx, "failed to parse label URI as either ATURI or DID", "err1", err1, "err2", err2) 168 continue 169 } 170 targetDID = did.String() 171 } else { 172 did, err := aturi.Authority().AsDID() 173 if err != nil { 174 log.Error(ctx, "failed to parse label URI as ATURI", "err", err) 175 continue 176 } 177 targetDID = did.String() 178 // if it's a chat message, attempt to send it to the streamers' websocket 179 if aturi.Collection() == "place.stream.chat.message" { 180 msg, err := atsync.Model.GetChatMessage(l.URI) 181 if err != nil { 182 log.Error(ctx, "failed to get chat message for label", "err", err) 183 continue 184 } 185 chatView, err := msg.ToStreamplaceMessageView() 186 if err != nil { 187 log.Error(ctx, "failed to convert chat message to streamplace message view", "err", err) 188 continue 189 } 190 isTrue := true 191 chatView.Deleted = &isTrue 192 atsync.Bus.Publish(msg.StreamerRepoDID, chatView) 193 } 194 } 195 196 log.Log(ctx, "labeler label", "targetDID", targetDID, "uri", l.URI, "cid", l.CID, "createdAt", cts, "expiresAt", exp, "negated", neg, "sourceDID", l.SourceDID, "val", l.Val, "version", l.Version) 197 err = atsync.Model.CreateLabel(&model.Label{ 198 Cid: l.CID, 199 Cts: cts.UTC(), 200 Exp: exp, 201 Neg: neg, 202 Sig: l.Sig, 203 Src: l.SourceDID, 204 Uri: l.URI, 205 Val: l.Val, 206 Ver: &l.Version, 207 Record: bs.Bytes(), 208 RepoDID: targetDID, 209 }) 210 if err != nil { 211 log.Error(ctx, "failed to create label", "err", err) 212 continue 213 } 214 atsync.Bus.Publish(targetDID, labelLex) 215 } 216 return nil 217 }, 218 LabelInfo: func(evt *comatproto.LabelSubscribeLabels_Info) error { 219 log.Log(ctx, "labeler info", "name", evt.Name, "message", evt.Message) 220 return nil 221 }, 222 Error: func(evt *events.ErrorFrame) error { 223 log.Error(ctx, "firehose error", "err", evt.Error, "message", evt.Message) 224 cancel() 225 return fmt.Errorf("firehose error: %s", evt.Error) 226 }, 227 } 228 229 scheduler := parallel.NewScheduler( 230 10, 231 100, 232 did, 233 rsc.EventHandler, 234 ) 235 236 log.Log(ctx, "starting labeler firehose consumer", "labelerDID", did) 237 238 g, ctx := errgroup.WithContext(ctx) 239 240 g.Go(func() error { 241 return events.HandleRepoStream(ctx, con, scheduler, nil) 242 }) 243 244 <-ctx.Done() 245 246 return nil 247 248}