fork of indigo with slightly nicer lexgen
at main 5.6 kB view raw
1package indexer 2 3import ( 4 "context" 5 "errors" 6 "fmt" 7 "log/slog" 8 "time" 9 10 comatproto "github.com/bluesky-social/indigo/api/atproto" 11 "github.com/bluesky-social/indigo/did" 12 "github.com/bluesky-social/indigo/events" 13 lexutil "github.com/bluesky-social/indigo/lex/util" 14 "github.com/bluesky-social/indigo/models" 15 "github.com/bluesky-social/indigo/repomgr" 16 "github.com/bluesky-social/indigo/util" 17 "github.com/bluesky-social/indigo/xrpc" 18 19 "go.opentelemetry.io/otel" 20 "gorm.io/gorm" 21) 22 23const MaxEventSliceLength = 1000000 24const MaxOpsSliceLength = 200 25 26type Indexer struct { 27 db *gorm.DB 28 29 events *events.EventManager 30 didr did.Resolver 31 32 Crawler *CrawlDispatcher 33 34 SendRemoteFollow func(context.Context, string, uint) error 35 CreateExternalUser func(context.Context, string) (*models.ActorInfo, error) 36 ApplyPDSClientSettings func(*xrpc.Client) 37 38 log *slog.Logger 39} 40 41func NewIndexer(db *gorm.DB, evtman *events.EventManager, didr did.Resolver, fetcher *RepoFetcher, crawl bool) (*Indexer, error) { 42 db.AutoMigrate(&models.FeedPost{}) 43 db.AutoMigrate(&models.ActorInfo{}) 44 db.AutoMigrate(&models.FollowRecord{}) 45 db.AutoMigrate(&models.VoteRecord{}) 46 db.AutoMigrate(&models.RepostRecord{}) 47 48 ix := &Indexer{ 49 db: db, 50 events: evtman, 51 didr: didr, 52 SendRemoteFollow: func(context.Context, string, uint) error { 53 return nil 54 }, 55 ApplyPDSClientSettings: func(*xrpc.Client) {}, 56 log: slog.Default().With("system", "indexer"), 57 } 58 59 if crawl { 60 c, err := NewCrawlDispatcher(fetcher, fetcher.MaxConcurrency, ix.log) 61 if err != nil { 62 return nil, err 63 } 64 65 ix.Crawler = c 66 ix.Crawler.Run() 67 } 68 69 return ix, nil 70} 71 72func (ix *Indexer) Shutdown() { 73 if ix.Crawler != nil { 74 ix.Crawler.Shutdown() 75 } 76} 77 78func (ix *Indexer) HandleRepoEvent(ctx context.Context, evt *repomgr.RepoEvent) error { 79 ctx, span := otel.Tracer("indexer").Start(ctx, "HandleRepoEvent") 80 defer span.End() 81 82 ix.log.Debug("Handling Repo Event!", "uid", evt.User) 83 84 outops := make([]*comatproto.SyncSubscribeRepos_RepoOp, 0, len(evt.Ops)) 85 for _, op := range evt.Ops { 86 link := (*lexutil.LexLink)(op.RecCid) 87 outops = append(outops, &comatproto.SyncSubscribeRepos_RepoOp{ 88 Path: op.Collection + "/" + op.Rkey, 89 Action: string(op.Kind), 90 Cid: link, 91 }) 92 } 93 94 did, err := ix.DidForUser(ctx, evt.User) 95 if err != nil { 96 return err 97 } 98 99 toobig := false 100 slice := evt.RepoSlice 101 if len(slice) > MaxEventSliceLength || len(outops) > MaxOpsSliceLength { 102 slice = []byte{} 103 outops = nil 104 toobig = true 105 } 106 107 ix.log.Debug("Sending event", "did", did) 108 if err := ix.events.AddEvent(ctx, &events.XRPCStreamEvent{ 109 RepoCommit: &comatproto.SyncSubscribeRepos_Commit{ 110 Repo: did, 111 Blocks: slice, 112 Rev: evt.Rev, 113 Since: evt.Since, 114 Commit: lexutil.LexLink(evt.NewRoot), 115 Time: time.Now().Format(util.ISO8601), 116 Ops: outops, 117 TooBig: toobig, 118 }, 119 PrivUid: evt.User, 120 }); err != nil { 121 return fmt.Errorf("failed to push event: %s", err) 122 } 123 124 return nil 125} 126 127func (ix *Indexer) GetUserOrMissing(ctx context.Context, did string) (*models.ActorInfo, error) { 128 ctx, span := otel.Tracer("indexer").Start(ctx, "getUserOrMissing") 129 defer span.End() 130 131 ai, err := ix.LookupUserByDid(ctx, did) 132 if err == nil { 133 return ai, nil 134 } 135 136 if !isNotFound(err) { 137 return nil, err 138 } 139 140 // unknown user... create it and send it off to the crawler 141 return ix.createMissingUserRecord(ctx, did) 142} 143 144func (ix *Indexer) createMissingUserRecord(ctx context.Context, did string) (*models.ActorInfo, error) { 145 ctx, span := otel.Tracer("indexer").Start(ctx, "createMissingUserRecord") 146 defer span.End() 147 148 externalUserCreationAttempts.Inc() 149 150 ai, err := ix.CreateExternalUser(ctx, did) 151 if err != nil { 152 return nil, err 153 } 154 155 if err := ix.addUserToCrawler(ctx, ai); err != nil { 156 return nil, fmt.Errorf("failed to add unknown user to crawler: %w", err) 157 } 158 159 return ai, nil 160} 161 162func (ix *Indexer) addUserToCrawler(ctx context.Context, ai *models.ActorInfo) error { 163 ix.log.Debug("Sending user to crawler: ", "did", ai.Did) 164 if ix.Crawler == nil { 165 return nil 166 } 167 168 return ix.Crawler.Crawl(ctx, ai) 169} 170 171func (ix *Indexer) DidForUser(ctx context.Context, uid models.Uid) (string, error) { 172 var ai models.ActorInfo 173 if err := ix.db.First(&ai, "uid = ?", uid).Error; err != nil { 174 return "", err 175 } 176 177 return ai.Did, nil 178} 179 180func (ix *Indexer) LookupUser(ctx context.Context, id models.Uid) (*models.ActorInfo, error) { 181 var ai models.ActorInfo 182 if err := ix.db.First(&ai, "uid = ?", id).Error; err != nil { 183 return nil, err 184 } 185 186 return &ai, nil 187} 188 189func (ix *Indexer) LookupUserByDid(ctx context.Context, did string) (*models.ActorInfo, error) { 190 var ai models.ActorInfo 191 if err := ix.db.Find(&ai, "did = ?", did).Error; err != nil { 192 return nil, err 193 } 194 195 if ai.ID == 0 { 196 return nil, gorm.ErrRecordNotFound 197 } 198 199 return &ai, nil 200} 201 202func (ix *Indexer) LookupUserByHandle(ctx context.Context, handle string) (*models.ActorInfo, error) { 203 var ai models.ActorInfo 204 if err := ix.db.Find(&ai, "handle = ?", handle).Error; err != nil { 205 return nil, err 206 } 207 208 if ai.ID == 0 { 209 return nil, gorm.ErrRecordNotFound 210 } 211 212 return &ai, nil 213} 214 215func isNotFound(err error) bool { 216 if errors.Is(err, gorm.ErrRecordNotFound) { 217 return true 218 } 219 220 return false 221} 222 223func (ix *Indexer) GetPost(ctx context.Context, uri string) (*models.FeedPost, error) { 224 puri, err := util.ParseAtUri(uri) 225 if err != nil { 226 return nil, err 227 } 228 229 var post models.FeedPost 230 if err := ix.db.First(&post, "rkey = ? AND author = (?)", puri.Rkey, ix.db.Model(models.ActorInfo{}).Where("did = ?", puri.Did).Select("id")).Error; err != nil { 231 return nil, err 232 } 233 234 return &post, nil 235}