Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at eli/localhost-dev-is-back 244 lines 7.0 kB view raw
1package atproto 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "net/http" 8 "net/url" 9 "time" 10 11 comatproto "github.com/bluesky-social/indigo/api/atproto" 12 "github.com/bluesky-social/indigo/atproto/label" 13 "github.com/bluesky-social/indigo/atproto/syntax" 14 "github.com/bluesky-social/indigo/events" 15 "github.com/bluesky-social/indigo/events/schedulers/parallel" 16 "github.com/bluesky-social/indigo/util" 17 "github.com/gorilla/websocket" 18 "golang.org/x/sync/errgroup" 19 "stream.place/streamplace/pkg/aqhttp" 20 "stream.place/streamplace/pkg/log" 21 "stream.place/streamplace/pkg/model" 22) 23 24func (atsync *ATProtoSynchronizer) StartLabelerFirehose(ctx context.Context, did string) error { 25 retryCount := 0 26 retryWindow := time.Now() 27 28 for { 29 if ctx.Err() != nil { 30 return nil 31 } 32 err := atsync.StartLabelerFirehoseRetry(ctx, did) 33 if err != nil { 34 log.Error(ctx, "firehose error", "err", err) 35 36 // Check if we're within the 1-minute window 37 now := time.Now() 38 if now.Sub(retryWindow) > time.Minute { 39 // Reset the counter if more than a minute has passed 40 retryCount = 1 41 retryWindow = now 42 } else { 43 // Increment retry count if within the window 44 retryCount++ 45 if retryCount >= 3 { 46 log.Error(ctx, "firehose failed 3 times within a minute, crashing", "err", err) 47 return fmt.Errorf("firehose failed 3 times within a minute: %w", err) 48 } 49 } 50 } 51 } 52} 53 54func (atsync *ATProtoSynchronizer) StartLabelerFirehoseRetry(ctx context.Context, did string) error { 55 ctx = log.WithLogValues(ctx, "func", "StartLabelerFirehose") 56 57 ident, err := atsync.resolveIdent(ctx, did, true) 58 if err != nil { 59 return fmt.Errorf("failed to resolve DID %s: %w", did, err) 60 } 61 62 ctx = log.WithLogValues(ctx, "labelerDID", ident.DID.String(), "labelerHandle", ident.Handle.String()) 63 64 pub, err := ident.GetPublicKey("atproto_label") 65 if err != nil { 66 return fmt.Errorf("failed to get public key for labeler %s: %w", did, err) 67 } 68 69 labeler, ok := ident.Services["atproto_labeler"] 70 if !ok { 71 return fmt.Errorf("labeler %s does not have a atproto_labeler service", did) 72 } 73 74 ctx = log.WithLogValues(ctx, "func", "StartFirehose") 75 ctx, cancel := context.WithCancel(ctx) 76 defer cancel() 77 dialer := websocket.DefaultDialer 78 u, err := url.Parse(labeler.URL) 79 if err != nil { 80 return fmt.Errorf("invalid labeler URI: %w", err) 81 } 82 u.Path = "xrpc/com.atproto.label.subscribeLabels" 83 if u.Scheme == "http" { 84 u.Scheme = "ws" 85 } else if u.Scheme == "https" { 86 u.Scheme = "wss" 87 } else { 88 return fmt.Errorf("invalid labeler URI scheme: %s", labeler.URL) 89 } 90 dbLabeler, err := atsync.Model.GetLabeler(did) 91 if err != nil { 92 return fmt.Errorf("failed to get labeler %s: %w", did, err) 93 } 94 if dbLabeler == nil { 95 dbLabeler, err = atsync.Model.CreateLabeler(did) 96 if err != nil { 97 return fmt.Errorf("failed to create labeler %s: %w", did, err) 98 } 99 } 100 query := u.Query() 101 query.Set("cursor", fmt.Sprintf("%d", dbLabeler.Cursor)) 102 u.RawQuery = query.Encode() 103 104 con, _, err := dialer.Dial(u.String(), http.Header{ 105 "User-Agent": []string{aqhttp.UserAgent}, 106 }) 107 if err != nil { 108 return fmt.Errorf("subscribing to firehose failed (dialing): %w", err) 109 } 110 111 rsc := &events.RepoStreamCallbacks{ 112 LabelLabels: func(evt *comatproto.LabelSubscribeLabels_Labels) error { 113 err = atsync.Model.UpdateLabelerCursor(did, evt.Seq) 114 if err != nil { 115 log.Error(ctx, "failed to update labeler cursor", "err", err) 116 } 117 for _, labelLex := range evt.Labels { 118 l := label.FromLexicon(labelLex) 119 err = l.VerifySignature(pub) 120 if err != nil { 121 log.Error(ctx, "failed to verify label signature", "err", err) 122 continue 123 } 124 err = l.VerifySyntax() 125 if err != nil { 126 log.Error(ctx, "failed to verify label syntax", "err", err) 127 continue 128 } 129 bs := bytes.Buffer{} 130 err = labelLex.MarshalCBOR(&bs) 131 if err != nil { 132 log.Error(ctx, "failed to marshal label", "err", err) 133 continue 134 } 135 cts, err := time.Parse(util.ISO8601, l.CreatedAt) 136 if err != nil { 137 log.Error(ctx, "failed to parse label created time", "err", err) 138 continue 139 } 140 var exp time.Time 141 if l.ExpiresAt != nil { 142 e, err := time.Parse(util.ISO8601, *l.ExpiresAt) 143 if err != nil { 144 log.Error(ctx, "failed to parse label expiration time", "err", err) 145 continue 146 } 147 exp = e.UTC() 148 } else { 149 exp = time.Time{} 150 } 151 neg := false 152 if l.Negated != nil { 153 neg = *l.Negated 154 } 155 156 var targetDID string 157 158 // the URI can either be a true URI or a DID, so 159 aturi, err1 := syntax.ParseATURI(l.URI) 160 if err1 != nil { 161 did, err2 := syntax.ParseDID(l.URI) 162 if err2 != nil { 163 log.Error(ctx, "failed to parse label URI as either ATURI or DID", "err1", err1, "err2", err2) 164 continue 165 } 166 targetDID = did.String() 167 } else { 168 did, err := aturi.Authority().AsDID() 169 if err != nil { 170 log.Error(ctx, "failed to parse label URI as ATURI", "err", err) 171 continue 172 } 173 targetDID = did.String() 174 // if it's a chat message, attempt to send it to the streamers' websocket 175 if aturi.Collection() == "place.stream.chat.message" { 176 msg, err := atsync.Model.GetChatMessage(l.URI) 177 if err != nil { 178 log.Error(ctx, "failed to get chat message for label", "err", err) 179 continue 180 } 181 chatView, err := msg.ToStreamplaceMessageView() 182 if err != nil { 183 log.Error(ctx, "failed to convert chat message to streamplace message view", "err", err) 184 continue 185 } 186 isTrue := true 187 chatView.Deleted = &isTrue 188 atsync.Bus.Publish(msg.StreamerRepoDID, chatView) 189 } 190 } 191 192 log.Log(ctx, "labeler label", "targetDID", targetDID, "uri", l.URI, "cid", l.CID, "createdAt", cts, "expiresAt", exp, "negated", neg, "sourceDID", l.SourceDID, "val", l.Val, "version", l.Version) 193 err = atsync.Model.CreateLabel(&model.Label{ 194 Cid: l.CID, 195 Cts: cts.UTC(), 196 Exp: exp, 197 Neg: neg, 198 Sig: l.Sig, 199 Src: l.SourceDID, 200 Uri: l.URI, 201 Val: l.Val, 202 Ver: &l.Version, 203 Record: bs.Bytes(), 204 RepoDID: targetDID, 205 }) 206 if err != nil { 207 log.Error(ctx, "failed to create label", "err", err) 208 continue 209 } 210 atsync.Bus.Publish(targetDID, labelLex) 211 } 212 return nil 213 }, 214 LabelInfo: func(evt *comatproto.LabelSubscribeLabels_Info) error { 215 log.Log(ctx, "labeler info", "name", evt.Name, "message", evt.Message) 216 return nil 217 }, 218 Error: func(evt *events.ErrorFrame) error { 219 log.Error(ctx, "firehose error", "err", evt.Error, "message", evt.Message) 220 cancel() 221 return fmt.Errorf("firehose error: %s", evt.Error) 222 }, 223 } 224 225 scheduler := parallel.NewScheduler( 226 10, 227 100, 228 did, 229 rsc.EventHandler, 230 ) 231 232 log.Log(ctx, "starting labeler firehose consumer", "labelerDID", did) 233 234 g, ctx := errgroup.WithContext(ctx) 235 236 g.Go(func() error { 237 return events.HandleRepoStream(ctx, con, scheduler, nil) 238 }) 239 240 <-ctx.Done() 241 242 return nil 243 244}