1package indexer
2
3import (
4 "context"
5 "errors"
6 "fmt"
7 "log/slog"
8 "time"
9
10 comatproto "github.com/bluesky-social/indigo/api/atproto"
11 "github.com/bluesky-social/indigo/did"
12 "github.com/bluesky-social/indigo/events"
13 lexutil "github.com/bluesky-social/indigo/lex/util"
14 "github.com/bluesky-social/indigo/models"
15 "github.com/bluesky-social/indigo/repomgr"
16 "github.com/bluesky-social/indigo/util"
17 "github.com/bluesky-social/indigo/xrpc"
18
19 "go.opentelemetry.io/otel"
20 "gorm.io/gorm"
21)
22
// Size limits applied by HandleRepoEvent before a commit is emitted. When
// either limit is exceeded the commit is sent without blocks or ops and is
// flagged as too big.
const (
	// MaxEventSliceLength is the largest repo slice, in bytes, that is
	// forwarded in a commit event.
	MaxEventSliceLength = 1000000

	// MaxOpsSliceLength is the largest number of ops that is forwarded in
	// a commit event.
	MaxOpsSliceLength = 200
)
25
26type Indexer struct {
27 db *gorm.DB
28
29 events *events.EventManager
30 didr did.Resolver
31
32 Crawler *CrawlDispatcher
33
34 SendRemoteFollow func(context.Context, string, uint) error
35 CreateExternalUser func(context.Context, string) (*models.ActorInfo, error)
36 ApplyPDSClientSettings func(*xrpc.Client)
37
38 log *slog.Logger
39}
40
41func NewIndexer(db *gorm.DB, evtman *events.EventManager, didr did.Resolver, fetcher *RepoFetcher, crawl bool) (*Indexer, error) {
42 db.AutoMigrate(&models.FeedPost{})
43 db.AutoMigrate(&models.ActorInfo{})
44 db.AutoMigrate(&models.FollowRecord{})
45 db.AutoMigrate(&models.VoteRecord{})
46 db.AutoMigrate(&models.RepostRecord{})
47
48 ix := &Indexer{
49 db: db,
50 events: evtman,
51 didr: didr,
52 SendRemoteFollow: func(context.Context, string, uint) error {
53 return nil
54 },
55 ApplyPDSClientSettings: func(*xrpc.Client) {},
56 log: slog.Default().With("system", "indexer"),
57 }
58
59 if crawl {
60 c, err := NewCrawlDispatcher(fetcher, fetcher.MaxConcurrency, ix.log)
61 if err != nil {
62 return nil, err
63 }
64
65 ix.Crawler = c
66 ix.Crawler.Run()
67 }
68
69 return ix, nil
70}
71
72func (ix *Indexer) Shutdown() {
73 if ix.Crawler != nil {
74 ix.Crawler.Shutdown()
75 }
76}
77
78func (ix *Indexer) HandleRepoEvent(ctx context.Context, evt *repomgr.RepoEvent) error {
79 ctx, span := otel.Tracer("indexer").Start(ctx, "HandleRepoEvent")
80 defer span.End()
81
82 ix.log.Debug("Handling Repo Event!", "uid", evt.User)
83
84 outops := make([]*comatproto.SyncSubscribeRepos_RepoOp, 0, len(evt.Ops))
85 for _, op := range evt.Ops {
86 link := (*lexutil.LexLink)(op.RecCid)
87 outops = append(outops, &comatproto.SyncSubscribeRepos_RepoOp{
88 Path: op.Collection + "/" + op.Rkey,
89 Action: string(op.Kind),
90 Cid: link,
91 })
92 }
93
94 did, err := ix.DidForUser(ctx, evt.User)
95 if err != nil {
96 return err
97 }
98
99 toobig := false
100 slice := evt.RepoSlice
101 if len(slice) > MaxEventSliceLength || len(outops) > MaxOpsSliceLength {
102 slice = []byte{}
103 outops = nil
104 toobig = true
105 }
106
107 ix.log.Debug("Sending event", "did", did)
108 if err := ix.events.AddEvent(ctx, &events.XRPCStreamEvent{
109 RepoCommit: &comatproto.SyncSubscribeRepos_Commit{
110 Repo: did,
111 Blocks: slice,
112 Rev: evt.Rev,
113 Since: evt.Since,
114 Commit: lexutil.LexLink(evt.NewRoot),
115 Time: time.Now().Format(util.ISO8601),
116 Ops: outops,
117 TooBig: toobig,
118 },
119 PrivUid: evt.User,
120 }); err != nil {
121 return fmt.Errorf("failed to push event: %s", err)
122 }
123
124 return nil
125}
126
127func (ix *Indexer) GetUserOrMissing(ctx context.Context, did string) (*models.ActorInfo, error) {
128 ctx, span := otel.Tracer("indexer").Start(ctx, "getUserOrMissing")
129 defer span.End()
130
131 ai, err := ix.LookupUserByDid(ctx, did)
132 if err == nil {
133 return ai, nil
134 }
135
136 if !isNotFound(err) {
137 return nil, err
138 }
139
140 // unknown user... create it and send it off to the crawler
141 return ix.createMissingUserRecord(ctx, did)
142}
143
144func (ix *Indexer) createMissingUserRecord(ctx context.Context, did string) (*models.ActorInfo, error) {
145 ctx, span := otel.Tracer("indexer").Start(ctx, "createMissingUserRecord")
146 defer span.End()
147
148 externalUserCreationAttempts.Inc()
149
150 ai, err := ix.CreateExternalUser(ctx, did)
151 if err != nil {
152 return nil, err
153 }
154
155 if err := ix.addUserToCrawler(ctx, ai); err != nil {
156 return nil, fmt.Errorf("failed to add unknown user to crawler: %w", err)
157 }
158
159 return ai, nil
160}
161
162func (ix *Indexer) addUserToCrawler(ctx context.Context, ai *models.ActorInfo) error {
163 ix.log.Debug("Sending user to crawler: ", "did", ai.Did)
164 if ix.Crawler == nil {
165 return nil
166 }
167
168 return ix.Crawler.Crawl(ctx, ai)
169}
170
171func (ix *Indexer) DidForUser(ctx context.Context, uid models.Uid) (string, error) {
172 var ai models.ActorInfo
173 if err := ix.db.First(&ai, "uid = ?", uid).Error; err != nil {
174 return "", err
175 }
176
177 return ai.Did, nil
178}
179
180func (ix *Indexer) LookupUser(ctx context.Context, id models.Uid) (*models.ActorInfo, error) {
181 var ai models.ActorInfo
182 if err := ix.db.First(&ai, "uid = ?", id).Error; err != nil {
183 return nil, err
184 }
185
186 return &ai, nil
187}
188
189func (ix *Indexer) LookupUserByDid(ctx context.Context, did string) (*models.ActorInfo, error) {
190 var ai models.ActorInfo
191 if err := ix.db.Find(&ai, "did = ?", did).Error; err != nil {
192 return nil, err
193 }
194
195 if ai.ID == 0 {
196 return nil, gorm.ErrRecordNotFound
197 }
198
199 return &ai, nil
200}
201
202func (ix *Indexer) LookupUserByHandle(ctx context.Context, handle string) (*models.ActorInfo, error) {
203 var ai models.ActorInfo
204 if err := ix.db.Find(&ai, "handle = ?", handle).Error; err != nil {
205 return nil, err
206 }
207
208 if ai.ID == 0 {
209 return nil, gorm.ErrRecordNotFound
210 }
211
212 return &ai, nil
213}
214
215func isNotFound(err error) bool {
216 if errors.Is(err, gorm.ErrRecordNotFound) {
217 return true
218 }
219
220 return false
221}
222
223func (ix *Indexer) GetPost(ctx context.Context, uri string) (*models.FeedPost, error) {
224 puri, err := util.ParseAtUri(uri)
225 if err != nil {
226 return nil, err
227 }
228
229 var post models.FeedPost
230 if err := ix.db.First(&post, "rkey = ? AND author = (?)", puri.Rkey, ix.db.Model(models.ActorInfo{}).Where("did = ?", puri.Did).Select("id")).Error; err != nil {
231 return nil, err
232 }
233
234 return &post, nil
235}