porting all github actions from bluesky-social/indigo to tangled CI
at ci 9.6 kB view raw
1package search 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "net/http" 8 "net/url" 9 "strings" 10 "time" 11 12 comatproto "github.com/bluesky-social/indigo/api/atproto" 13 bsky "github.com/bluesky-social/indigo/api/bsky" 14 "github.com/bluesky-social/indigo/atproto/syntax" 15 "github.com/bluesky-social/indigo/backfill" 16 "github.com/bluesky-social/indigo/events" 17 "github.com/bluesky-social/indigo/events/schedulers/autoscaling" 18 lexutil "github.com/bluesky-social/indigo/lex/util" 19 "github.com/bluesky-social/indigo/repo" 20 typegen "github.com/whyrusleeping/cbor-gen" 21 22 "github.com/carlmjohnson/versioninfo" 23 "github.com/gorilla/websocket" 24 "github.com/ipfs/go-cid" 25) 26 27func (idx *Indexer) getLastCursor() (int64, error) { 28 var lastSeq LastSeq 29 if err := idx.db.Find(&lastSeq).Error; err != nil { 30 return 0, err 31 } 32 33 if lastSeq.ID == 0 { 34 return 0, idx.db.Create(&lastSeq).Error 35 } 36 37 return lastSeq.Seq, nil 38} 39 40func (idx *Indexer) updateLastCursor(curs int64) error { 41 return idx.db.Model(LastSeq{}).Where("id = 1").Update("seq", curs).Error 42} 43 44func (idx *Indexer) RunIndexer(ctx context.Context) error { 45 cur, err := idx.getLastCursor() 46 if err != nil { 47 return fmt.Errorf("get last cursor: %w", err) 48 } 49 50 // Start the indexer batch workers 51 go idx.runPostIndexer(ctx) 52 go idx.runProfileIndexer(ctx) 53 54 err = idx.bfs.LoadJobs(ctx) 55 if err != nil { 56 return fmt.Errorf("loading backfill jobs: %w", err) 57 } 58 go idx.bf.Start() 59 60 if idx.enableRepoDiscovery { 61 go idx.discoverRepos() 62 } 63 64 d := websocket.DefaultDialer 65 u, err := url.Parse(idx.relayhost) 66 if err != nil { 67 return fmt.Errorf("invalid bgshost URI: %w", err) 68 } 69 u.Path = "xrpc/com.atproto.sync.subscribeRepos" 70 if cur != 0 { 71 u.RawQuery = fmt.Sprintf("cursor=%d", cur) 72 } 73 con, _, err := d.Dial(u.String(), http.Header{ 74 "User-Agent": []string{fmt.Sprintf("palomar/%s", versioninfo.Short())}, 75 }) 76 if err != nil { 77 return fmt.Errorf("events dial failed: %w", err) 78 } 79 80 rsc := &events.RepoStreamCallbacks{ 81 RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error { 82 ctx := context.Background() 83 ctx, span := tracer.Start(ctx, "RepoCommit") 84 defer span.End() 85 86 defer func() { 87 if evt.Seq%50 == 0 { 88 if err := idx.updateLastCursor(evt.Seq); err != nil { 89 idx.logger.Error("failed to persist cursor", "err", err) 90 } 91 } 92 }() 93 logEvt := idx.logger.With("repo", evt.Repo, "rev", evt.Rev, "seq", evt.Seq) 94 if evt.TooBig && evt.Since != nil { 95 // TODO: handle this case (instead of return nil) 96 logEvt.Error("skipping non-genesis tooBig events for now") 97 return nil 98 } 99 100 if evt.TooBig { 101 if err := idx.processTooBigCommit(ctx, evt); err != nil { 102 // TODO: handle this case (instead of return nil) 103 logEvt.Error("failed to process tooBig event", "err", err) 104 return nil 105 } 106 107 return nil 108 } 109 110 // Pass events to the backfiller which will process or buffer as needed 111 if err := idx.bf.HandleEvent(ctx, evt); err != nil { 112 logEvt.Error("failed to handle event", "err", err) 113 } 114 115 return nil 116 117 }, 118 // TODO: process RepoIdentity 119 RepoIdentity: func(evt *comatproto.SyncSubscribeRepos_Identity) error { 120 ctx := context.Background() 121 ctx, span := tracer.Start(ctx, "RepoIdentity") 122 defer span.End() 123 124 did, err := syntax.ParseDID(evt.Did) 125 if err != nil { 126 idx.logger.Error("bad DID in RepoIdentity event", "did", evt.Did, "seq", evt.Seq, "err", err) 127 return nil 128 } 129 ident, err := idx.dir.LookupDID(ctx, did) 130 if err != nil { 131 idx.logger.Error("failed identity resolution in RepoIdentity event", "did", evt.Did, "seq", evt.Seq, "err", err) 132 return nil 133 } 134 if err := idx.updateUserHandle(ctx, did, ident.Handle.String()); err != nil { 135 // TODO: handle this case (instead of return nil) 136 idx.logger.Error("failed to update user handle", "did", evt.Did, "handle", ident.Handle, "seq", evt.Seq, "err", err) 137 } 138 return nil 139 }, 140 } 141 142 return events.HandleRepoStream( 143 ctx, con, autoscaling.NewScheduler( 144 autoscaling.DefaultAutoscaleSettings(), 145 idx.relayhost, 146 rsc.EventHandler, 147 ), 148 idx.logger, 149 ) 150} 151 152func (idx *Indexer) discoverRepos() { 153 ctx := context.Background() 154 log := idx.logger.With("func", "discoverRepos") 155 log.Info("starting repo discovery") 156 157 cursor := "" 158 limit := int64(500) 159 160 total := 0 161 totalErrored := 0 162 163 for { 164 resp, err := comatproto.SyncListRepos(ctx, idx.relayXRPC, cursor, limit) 165 if err != nil { 166 log.Error("failed to list repos", "err", err) 167 time.Sleep(5 * time.Second) 168 continue 169 } 170 log.Info("got repo page", "count", len(resp.Repos), "cursor", resp.Cursor) 171 errored := 0 172 for _, repo := range resp.Repos { 173 _, err := idx.bfs.GetOrCreateJob(ctx, repo.Did, backfill.StateEnqueued) 174 if err != nil { 175 log.Error("failed to get or create job", "did", repo.Did, "err", err) 176 errored++ 177 } 178 } 179 log.Info("enqueued repos", "total", len(resp.Repos), "errored", errored) 180 totalErrored += errored 181 total += len(resp.Repos) 182 if resp.Cursor != nil && *resp.Cursor != "" { 183 cursor = *resp.Cursor 184 } else { 185 break 186 } 187 } 188 189 log.Info("finished repo discovery", "totalJobs", total, "totalErrored", totalErrored) 190} 191 192func (idx *Indexer) handleCreateOrUpdate(ctx context.Context, rawDID string, rev string, path string, recB *[]byte, rcid *cid.Cid) error { 193 logger := idx.logger.With("func", "handleCreateOrUpdate", "did", rawDID, "rev", rev, "path", path) 194 // Since this gets called in a backfill job, we need to check if the path is a post or profile 195 if !strings.Contains(path, "app.bsky.feed.post") && !strings.Contains(path, "app.bsky.actor.profile") { 196 return nil 197 } 198 199 did, err := syntax.ParseDID(rawDID) 200 if err != nil { 201 return fmt.Errorf("bad DID syntax in event: %w", err) 202 } 203 204 // CBOR Unmarshal the record 205 recCBOR, err := lexutil.CborDecodeValue(*recB) 206 if err != nil { 207 return fmt.Errorf("cbor decode: %w", err) 208 } 209 210 rec, ok := recCBOR.(typegen.CBORMarshaler) 211 if !ok { 212 return fmt.Errorf("failed to cast record to CBORMarshaler") 213 } 214 215 parts := strings.SplitN(path, "/", 3) 216 if len(parts) < 2 { 217 logger.Warn("skipping post record with malformed path") 218 return nil 219 } 220 221 switch rec := rec.(type) { 222 case *bsky.FeedPost: 223 rkey, err := syntax.ParseTID(parts[1]) 224 if err != nil { 225 logger.Warn("skipping post record with non-TID rkey") 226 return nil 227 } 228 229 job := PostIndexJob{ 230 did: did, 231 record: rec, 232 rcid: *rcid, 233 rkey: rkey.String(), 234 } 235 236 // Send the job to the bulk indexer 237 idx.postQueue <- &job 238 postsIndexed.Inc() 239 case *bsky.ActorProfile: 240 if parts[1] != "self" { 241 return nil 242 } 243 244 ident, err := idx.dir.LookupDID(ctx, did) 245 if err != nil { 246 return fmt.Errorf("resolving identity: %w", err) 247 } 248 if ident == nil { 249 return fmt.Errorf("identity not found for did: %s", did.String()) 250 } 251 252 job := ProfileIndexJob{ 253 ident: ident, 254 record: rec, 255 rcid: *rcid, 256 } 257 258 // Send the job to the bulk indexer 259 idx.profileQueue <- &job 260 profilesIndexed.Inc() 261 default: 262 } 263 return nil 264} 265 266func (idx *Indexer) handleDelete(ctx context.Context, rawDID, rev, path string) error { 267 // Since this gets called in a backfill job, we need to check if the path is a post or profile 268 if !strings.Contains(path, "app.bsky.feed.post") && !strings.Contains(path, "app.bsky.actor.profile") { 269 return nil 270 } 271 272 did, err := syntax.ParseDID(rawDID) 273 if err != nil { 274 return fmt.Errorf("invalid DID in event: %w", err) 275 } 276 277 switch { 278 // TODO: handle profile deletes, its an edge case, but worth doing still 279 case strings.Contains(path, "app.bsky.feed.post"): 280 if err := idx.deletePost(ctx, did, path); err != nil { 281 return err 282 } 283 postsDeleted.Inc() 284 case strings.Contains(path, "app.bsky.actor.profile"): 285 // profilesDeleted.Inc() 286 } 287 288 return nil 289} 290 291func (idx *Indexer) processTooBigCommit(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error { 292 logger := idx.logger.With("func", "processTooBigCommit", "repo", evt.Repo, "rev", evt.Rev, "seq", evt.Seq) 293 294 repodata, err := comatproto.SyncGetRepo(ctx, idx.relayXRPC, evt.Repo, "") 295 if err != nil { 296 return err 297 } 298 299 r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(repodata)) 300 if err != nil { 301 return err 302 } 303 304 did, err := syntax.ParseDID(evt.Repo) 305 if err != nil { 306 return fmt.Errorf("bad DID in repo event: %w", err) 307 } 308 309 ident, err := idx.dir.LookupDID(ctx, did) 310 if err != nil { 311 return err 312 } 313 if ident == nil { 314 return fmt.Errorf("identity not found for did: %s", did.String()) 315 } 316 317 return r.ForEach(ctx, "", func(k string, v cid.Cid) error { 318 if strings.HasPrefix(k, "app.bsky.feed.post") || strings.HasPrefix(k, "app.bsky.actor.profile") { 319 rcid, rec, err := r.GetRecord(ctx, k) 320 if err != nil { 321 // TODO: handle this case (instead of return nil) 322 idx.logger.Error("failed to get record from repo checkout", "path", k, "err", err) 323 return nil 324 } 325 326 parts := strings.SplitN(k, "/", 3) 327 if len(parts) < 2 { 328 logger.Warn("skipping post record with malformed path") 329 return nil 330 } 331 332 switch rec := rec.(type) { 333 case *bsky.FeedPost: 334 rkey, err := syntax.ParseTID(parts[1]) 335 if err != nil { 336 logger.Warn("skipping post record with non-TID rkey") 337 return nil 338 } 339 340 job := PostIndexJob{ 341 did: did, 342 record: rec, 343 rcid: rcid, 344 rkey: rkey.String(), 345 } 346 347 // Send the job to the bulk indexer 348 idx.postQueue <- &job 349 case *bsky.ActorProfile: 350 if parts[1] != "self" { 351 return nil 352 } 353 354 job := ProfileIndexJob{ 355 ident: ident, 356 record: rec, 357 rcid: rcid, 358 } 359 360 // Send the job to the bulk indexer 361 idx.profileQueue <- &job 362 default: 363 } 364 365 } 366 return nil 367 }) 368}