1package search
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "net/http"
8 "net/url"
9 "strings"
10 "time"
11
12 comatproto "github.com/bluesky-social/indigo/api/atproto"
13 bsky "github.com/bluesky-social/indigo/api/bsky"
14 "github.com/bluesky-social/indigo/atproto/syntax"
15 "github.com/bluesky-social/indigo/backfill"
16 "github.com/bluesky-social/indigo/events"
17 "github.com/bluesky-social/indigo/events/schedulers/autoscaling"
18 lexutil "github.com/bluesky-social/indigo/lex/util"
19 "github.com/bluesky-social/indigo/repo"
20 typegen "github.com/whyrusleeping/cbor-gen"
21
22 "github.com/carlmjohnson/versioninfo"
23 "github.com/gorilla/websocket"
24 "github.com/ipfs/go-cid"
25)
26
27func (idx *Indexer) getLastCursor() (int64, error) {
28 var lastSeq LastSeq
29 if err := idx.db.Find(&lastSeq).Error; err != nil {
30 return 0, err
31 }
32
33 if lastSeq.ID == 0 {
34 return 0, idx.db.Create(&lastSeq).Error
35 }
36
37 return lastSeq.Seq, nil
38}
39
40func (idx *Indexer) updateLastCursor(curs int64) error {
41 return idx.db.Model(LastSeq{}).Where("id = 1").Update("seq", curs).Error
42}
43
44func (idx *Indexer) RunIndexer(ctx context.Context) error {
45 cur, err := idx.getLastCursor()
46 if err != nil {
47 return fmt.Errorf("get last cursor: %w", err)
48 }
49
50 // Start the indexer batch workers
51 go idx.runPostIndexer(ctx)
52 go idx.runProfileIndexer(ctx)
53
54 err = idx.bfs.LoadJobs(ctx)
55 if err != nil {
56 return fmt.Errorf("loading backfill jobs: %w", err)
57 }
58 go idx.bf.Start()
59
60 if idx.enableRepoDiscovery {
61 go idx.discoverRepos()
62 }
63
64 d := websocket.DefaultDialer
65 u, err := url.Parse(idx.relayhost)
66 if err != nil {
67 return fmt.Errorf("invalid bgshost URI: %w", err)
68 }
69 u.Path = "xrpc/com.atproto.sync.subscribeRepos"
70 if cur != 0 {
71 u.RawQuery = fmt.Sprintf("cursor=%d", cur)
72 }
73 con, _, err := d.Dial(u.String(), http.Header{
74 "User-Agent": []string{fmt.Sprintf("palomar/%s", versioninfo.Short())},
75 })
76 if err != nil {
77 return fmt.Errorf("events dial failed: %w", err)
78 }
79
80 rsc := &events.RepoStreamCallbacks{
81 RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error {
82 ctx := context.Background()
83 ctx, span := tracer.Start(ctx, "RepoCommit")
84 defer span.End()
85
86 defer func() {
87 if evt.Seq%50 == 0 {
88 if err := idx.updateLastCursor(evt.Seq); err != nil {
89 idx.logger.Error("failed to persist cursor", "err", err)
90 }
91 }
92 }()
93 logEvt := idx.logger.With("repo", evt.Repo, "rev", evt.Rev, "seq", evt.Seq)
94 if evt.TooBig && evt.Since != nil {
95 // TODO: handle this case (instead of return nil)
96 logEvt.Error("skipping non-genesis tooBig events for now")
97 return nil
98 }
99
100 if evt.TooBig {
101 if err := idx.processTooBigCommit(ctx, evt); err != nil {
102 // TODO: handle this case (instead of return nil)
103 logEvt.Error("failed to process tooBig event", "err", err)
104 return nil
105 }
106
107 return nil
108 }
109
110 // Pass events to the backfiller which will process or buffer as needed
111 if err := idx.bf.HandleEvent(ctx, evt); err != nil {
112 logEvt.Error("failed to handle event", "err", err)
113 }
114
115 return nil
116
117 },
118 // TODO: process RepoIdentity
119 RepoIdentity: func(evt *comatproto.SyncSubscribeRepos_Identity) error {
120 ctx := context.Background()
121 ctx, span := tracer.Start(ctx, "RepoIdentity")
122 defer span.End()
123
124 did, err := syntax.ParseDID(evt.Did)
125 if err != nil {
126 idx.logger.Error("bad DID in RepoIdentity event", "did", evt.Did, "seq", evt.Seq, "err", err)
127 return nil
128 }
129 ident, err := idx.dir.LookupDID(ctx, did)
130 if err != nil {
131 idx.logger.Error("failed identity resolution in RepoIdentity event", "did", evt.Did, "seq", evt.Seq, "err", err)
132 return nil
133 }
134 if err := idx.updateUserHandle(ctx, did, ident.Handle.String()); err != nil {
135 // TODO: handle this case (instead of return nil)
136 idx.logger.Error("failed to update user handle", "did", evt.Did, "handle", ident.Handle, "seq", evt.Seq, "err", err)
137 }
138 return nil
139 },
140 }
141
142 return events.HandleRepoStream(
143 ctx, con, autoscaling.NewScheduler(
144 autoscaling.DefaultAutoscaleSettings(),
145 idx.relayhost,
146 rsc.EventHandler,
147 ),
148 idx.logger,
149 )
150}
151
152func (idx *Indexer) discoverRepos() {
153 ctx := context.Background()
154 log := idx.logger.With("func", "discoverRepos")
155 log.Info("starting repo discovery")
156
157 cursor := ""
158 limit := int64(500)
159
160 total := 0
161 totalErrored := 0
162
163 for {
164 resp, err := comatproto.SyncListRepos(ctx, idx.relayXRPC, cursor, limit)
165 if err != nil {
166 log.Error("failed to list repos", "err", err)
167 time.Sleep(5 * time.Second)
168 continue
169 }
170 log.Info("got repo page", "count", len(resp.Repos), "cursor", resp.Cursor)
171 errored := 0
172 for _, repo := range resp.Repos {
173 _, err := idx.bfs.GetOrCreateJob(ctx, repo.Did, backfill.StateEnqueued)
174 if err != nil {
175 log.Error("failed to get or create job", "did", repo.Did, "err", err)
176 errored++
177 }
178 }
179 log.Info("enqueued repos", "total", len(resp.Repos), "errored", errored)
180 totalErrored += errored
181 total += len(resp.Repos)
182 if resp.Cursor != nil && *resp.Cursor != "" {
183 cursor = *resp.Cursor
184 } else {
185 break
186 }
187 }
188
189 log.Info("finished repo discovery", "totalJobs", total, "totalErrored", totalErrored)
190}
191
192func (idx *Indexer) handleCreateOrUpdate(ctx context.Context, rawDID string, rev string, path string, recB *[]byte, rcid *cid.Cid) error {
193 logger := idx.logger.With("func", "handleCreateOrUpdate", "did", rawDID, "rev", rev, "path", path)
194 // Since this gets called in a backfill job, we need to check if the path is a post or profile
195 if !strings.Contains(path, "app.bsky.feed.post") && !strings.Contains(path, "app.bsky.actor.profile") {
196 return nil
197 }
198
199 did, err := syntax.ParseDID(rawDID)
200 if err != nil {
201 return fmt.Errorf("bad DID syntax in event: %w", err)
202 }
203
204 // CBOR Unmarshal the record
205 recCBOR, err := lexutil.CborDecodeValue(*recB)
206 if err != nil {
207 return fmt.Errorf("cbor decode: %w", err)
208 }
209
210 rec, ok := recCBOR.(typegen.CBORMarshaler)
211 if !ok {
212 return fmt.Errorf("failed to cast record to CBORMarshaler")
213 }
214
215 parts := strings.SplitN(path, "/", 3)
216 if len(parts) < 2 {
217 logger.Warn("skipping post record with malformed path")
218 return nil
219 }
220
221 switch rec := rec.(type) {
222 case *bsky.FeedPost:
223 rkey, err := syntax.ParseTID(parts[1])
224 if err != nil {
225 logger.Warn("skipping post record with non-TID rkey")
226 return nil
227 }
228
229 job := PostIndexJob{
230 did: did,
231 record: rec,
232 rcid: *rcid,
233 rkey: rkey.String(),
234 }
235
236 // Send the job to the bulk indexer
237 idx.postQueue <- &job
238 postsIndexed.Inc()
239 case *bsky.ActorProfile:
240 if parts[1] != "self" {
241 return nil
242 }
243
244 ident, err := idx.dir.LookupDID(ctx, did)
245 if err != nil {
246 return fmt.Errorf("resolving identity: %w", err)
247 }
248 if ident == nil {
249 return fmt.Errorf("identity not found for did: %s", did.String())
250 }
251
252 job := ProfileIndexJob{
253 ident: ident,
254 record: rec,
255 rcid: *rcid,
256 }
257
258 // Send the job to the bulk indexer
259 idx.profileQueue <- &job
260 profilesIndexed.Inc()
261 default:
262 }
263 return nil
264}
265
266func (idx *Indexer) handleDelete(ctx context.Context, rawDID, rev, path string) error {
267 // Since this gets called in a backfill job, we need to check if the path is a post or profile
268 if !strings.Contains(path, "app.bsky.feed.post") && !strings.Contains(path, "app.bsky.actor.profile") {
269 return nil
270 }
271
272 did, err := syntax.ParseDID(rawDID)
273 if err != nil {
274 return fmt.Errorf("invalid DID in event: %w", err)
275 }
276
277 switch {
278 // TODO: handle profile deletes, its an edge case, but worth doing still
279 case strings.Contains(path, "app.bsky.feed.post"):
280 if err := idx.deletePost(ctx, did, path); err != nil {
281 return err
282 }
283 postsDeleted.Inc()
284 case strings.Contains(path, "app.bsky.actor.profile"):
285 // profilesDeleted.Inc()
286 }
287
288 return nil
289}
290
291func (idx *Indexer) processTooBigCommit(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error {
292 logger := idx.logger.With("func", "processTooBigCommit", "repo", evt.Repo, "rev", evt.Rev, "seq", evt.Seq)
293
294 repodata, err := comatproto.SyncGetRepo(ctx, idx.relayXRPC, evt.Repo, "")
295 if err != nil {
296 return err
297 }
298
299 r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(repodata))
300 if err != nil {
301 return err
302 }
303
304 did, err := syntax.ParseDID(evt.Repo)
305 if err != nil {
306 return fmt.Errorf("bad DID in repo event: %w", err)
307 }
308
309 ident, err := idx.dir.LookupDID(ctx, did)
310 if err != nil {
311 return err
312 }
313 if ident == nil {
314 return fmt.Errorf("identity not found for did: %s", did.String())
315 }
316
317 return r.ForEach(ctx, "", func(k string, v cid.Cid) error {
318 if strings.HasPrefix(k, "app.bsky.feed.post") || strings.HasPrefix(k, "app.bsky.actor.profile") {
319 rcid, rec, err := r.GetRecord(ctx, k)
320 if err != nil {
321 // TODO: handle this case (instead of return nil)
322 idx.logger.Error("failed to get record from repo checkout", "path", k, "err", err)
323 return nil
324 }
325
326 parts := strings.SplitN(k, "/", 3)
327 if len(parts) < 2 {
328 logger.Warn("skipping post record with malformed path")
329 return nil
330 }
331
332 switch rec := rec.(type) {
333 case *bsky.FeedPost:
334 rkey, err := syntax.ParseTID(parts[1])
335 if err != nil {
336 logger.Warn("skipping post record with non-TID rkey")
337 return nil
338 }
339
340 job := PostIndexJob{
341 did: did,
342 record: rec,
343 rcid: rcid,
344 rkey: rkey.String(),
345 }
346
347 // Send the job to the bulk indexer
348 idx.postQueue <- &job
349 case *bsky.ActorProfile:
350 if parts[1] != "self" {
351 return nil
352 }
353
354 job := ProfileIndexJob{
355 ident: ident,
356 record: rec,
357 rcid: rcid,
358 }
359
360 // Send the job to the bulk indexer
361 idx.profileQueue <- &job
362 default:
363 }
364
365 }
366 return nil
367 })
368}