fork of indigo with slightly nicer lexgen
at main 7.8 kB view raw
1package search 2 3import ( 4 "bufio" 5 "context" 6 "encoding/hex" 7 "encoding/json" 8 "fmt" 9 "os" 10 "strconv" 11 "strings" 12 "sync" 13 14 appbsky "github.com/bluesky-social/indigo/api/bsky" 15 "github.com/bluesky-social/indigo/atproto/identity" 16 "github.com/bluesky-social/indigo/atproto/syntax" 17 18 "github.com/ipfs/go-cid" 19) 20 21type pagerankJob struct { 22 did syntax.DID 23 rank float64 24} 25 26// BulkIndexPageranks updates the pageranks for the DIDs in the Search Index from a CSV file. 27func (idx *Indexer) BulkIndexPageranks(ctx context.Context, pagerankFile string) error { 28 f, err := os.Open(pagerankFile) 29 if err != nil { 30 return fmt.Errorf("failed to open csv file: %w", err) 31 } 32 defer f.Close() 33 34 // Run 5 pagerank indexers in parallel 35 for i := 0; i < 5; i++ { 36 go idx.runPagerankIndexer(ctx) 37 } 38 39 logger := idx.logger.With("source", "bulk_index_pageranks") 40 41 queue := make(chan string, 20_000) 42 wg := &sync.WaitGroup{} 43 workerCount := 20 44 for i := 0; i < workerCount; i++ { 45 wg.Add(1) 46 go func() { 47 defer wg.Done() 48 for line := range queue { 49 if err := idx.processPagerankCSVLine(line); err != nil { 50 logger.Error("failed to process line", "err", err) 51 } 52 } 53 }() 54 } 55 56 // Create a scanner to read the file line by line 57 scanner := bufio.NewScanner(f) 58 buf := make([]byte, 0, 64*1024) 59 scanner.Buffer(buf, 1024*1024) 60 61 linesRead := 0 62 63 // Iterate over each line in the file 64 for scanner.Scan() { 65 line := scanner.Text() 66 67 queue <- line 68 69 linesRead++ 70 if linesRead%100_000 == 0 { 71 idx.logger.Info("processed csv lines", "lines", linesRead) 72 } 73 } 74 75 close(queue) 76 77 // Check for any scanner errors 78 if err := scanner.Err(); err != nil { 79 return fmt.Errorf("error reading csv file: %w", err) 80 } 81 82 wg.Wait() 83 84 idx.logger.Info("finished processing csv file", "lines", linesRead) 85 86 return nil 87} 88 89// BulkIndexPosts indexes posts from a CSV file. 90func (idx *Indexer) BulkIndexPosts(ctx context.Context, postsFile string) error { 91 f, err := os.Open(postsFile) 92 if err != nil { 93 return fmt.Errorf("failed to open csv file: %w", err) 94 } 95 defer f.Close() 96 97 // Run 5 post indexers in parallel 98 for i := 0; i < 5; i++ { 99 go idx.runPostIndexer(ctx) 100 } 101 102 logger := idx.logger.With("source", "bulk_index_posts") 103 104 queue := make(chan string, 20_000) 105 wg := &sync.WaitGroup{} 106 workerCount := 20 107 for i := 0; i < workerCount; i++ { 108 wg.Add(1) 109 go func() { 110 defer wg.Done() 111 for line := range queue { 112 if err := idx.processPostCSVLine(line); err != nil { 113 logger.Error("failed to process line", "err", err) 114 } 115 } 116 }() 117 } 118 119 // Create a scanner to read the file line by line 120 scanner := bufio.NewScanner(f) 121 buf := make([]byte, 0, 64*1024) 122 scanner.Buffer(buf, 1024*1024) 123 124 linesRead := 0 125 126 // Iterate over each line in the file 127 for scanner.Scan() { 128 line := scanner.Text() 129 130 queue <- line 131 132 linesRead++ 133 if linesRead%100_000 == 0 { 134 idx.logger.Info("processed csv lines", "lines", linesRead) 135 } 136 } 137 138 close(queue) 139 140 // Check for any scanner errors 141 if err := scanner.Err(); err != nil { 142 return fmt.Errorf("error reading csv file: %w", err) 143 } 144 145 wg.Wait() 146 147 idx.logger.Info("finished processing csv file", "lines", linesRead) 148 149 return nil 150} 151 152// BulkIndexProfiles indexes profiles from a CSV file. 153func (idx *Indexer) BulkIndexProfiles(ctx context.Context, profilesFile string) error { 154 f, err := os.Open(profilesFile) 155 if err != nil { 156 return fmt.Errorf("failed to open csv file: %w", err) 157 } 158 defer f.Close() 159 160 for i := 0; i < 5; i++ { 161 go idx.runProfileIndexer(ctx) 162 } 163 164 logger := idx.logger.With("source", "bulk_index_profiles") 165 166 queue := make(chan string, 20_000) 167 wg := &sync.WaitGroup{} 168 workerCount := 20 169 for i := 0; i < workerCount; i++ { 170 wg.Add(1) 171 go func() { 172 defer wg.Done() 173 for line := range queue { 174 if err := idx.processProfileCSVLine(line); err != nil { 175 logger.Error("failed to process line", "err", err) 176 } 177 } 178 }() 179 } 180 181 // Create a scanner to read the file line by line 182 scanner := bufio.NewScanner(f) 183 buf := make([]byte, 0, 64*1024) 184 scanner.Buffer(buf, 1024*1024) 185 186 linesRead := 0 187 188 // Iterate over each line in the file 189 for scanner.Scan() { 190 line := scanner.Text() 191 192 queue <- line 193 194 linesRead++ 195 if linesRead%100_000 == 0 { 196 idx.logger.Info("processed csv lines", "lines", linesRead) 197 } 198 } 199 200 close(queue) 201 202 // Check for any scanner errors 203 if err := scanner.Err(); err != nil { 204 return fmt.Errorf("error reading csv file: %w", err) 205 } 206 207 wg.Wait() 208 209 idx.logger.Info("finished processing csv file", "lines", linesRead) 210 211 return nil 212} 213 214func (idx *Indexer) processPagerankCSVLine(line string) error { 215 // Split the line into DID and rank 216 parts := strings.Split(line, ",") 217 if len(parts) != 2 { 218 return fmt.Errorf("invalid pagerank line: %s", line) 219 } 220 221 did, err := syntax.ParseDID(parts[0]) 222 if err != nil { 223 return fmt.Errorf("invalid DID: %s", parts[0]) 224 } 225 226 rank, err := strconv.ParseFloat(parts[1], 64) 227 if err != nil { 228 return fmt.Errorf("invalid pagerank value: %s", parts[1]) 229 } 230 231 job := PagerankIndexJob{ 232 did: did, 233 rank: rank, 234 } 235 236 // Send the job to the pagerank queue 237 idx.pagerankQueue <- &job 238 239 return nil 240} 241 242func (idx *Indexer) processPostCSVLine(line string) error { 243 // CSV is formatted as 244 // actor_did,rkey,taken_down(time or null),violates_threadgate(False or null),cid,raw(post JSON as hex) 245 parts := strings.Split(line, ",") 246 if len(parts) != 6 { 247 return fmt.Errorf("invalid csv line: %s", line) 248 } 249 250 did, err := syntax.ParseDID(parts[0]) 251 if err != nil { 252 return fmt.Errorf("invalid DID: %s", parts[0]) 253 } 254 255 rkey, err := syntax.ParseRecordKey(parts[1]) 256 if err != nil { 257 return fmt.Errorf("invalid record key: %s", parts[1]) 258 } 259 260 isTakenDown := false 261 if parts[2] != "" && parts[2] != "null" { 262 isTakenDown = true 263 } 264 265 violatesThreadgate := false 266 if parts[3] != "" && parts[3] != "False" { 267 violatesThreadgate = true 268 } 269 270 if isTakenDown || violatesThreadgate { 271 return nil 272 } 273 274 cid, err := cid.Parse(parts[4]) 275 if err != nil { 276 return fmt.Errorf("invalid CID: %s", parts[4]) 277 } 278 279 if len(parts[5]) <= 2 { 280 return nil 281 } 282 283 raw, err := hex.DecodeString(parts[5][2:]) 284 if err != nil { 285 return fmt.Errorf("invalid raw record (%s/%s): %s", did, rkey, parts[5][2:]) 286 } 287 288 post := appbsky.FeedPost{} 289 if err := json.Unmarshal(raw, &post); err != nil { 290 return fmt.Errorf("failed to unmarshal post: %w", err) 291 } 292 293 job := PostIndexJob{ 294 did: did, 295 rkey: rkey.String(), 296 rcid: cid, 297 record: &post, 298 } 299 300 // Send the job to the post queue 301 idx.postQueue <- &job 302 303 return nil 304} 305 306func (idx *Indexer) processProfileCSVLine(line string) error { 307 // CSV is formatted as 308 // actor_did,taken_down(time or null),cid,handle,raw(profile JSON as hex) 309 parts := strings.Split(line, ",") 310 if len(parts) != 5 { 311 return fmt.Errorf("invalid csv line: %s", line) 312 } 313 314 did, err := syntax.ParseDID(parts[0]) 315 if err != nil { 316 return fmt.Errorf("invalid DID: %s", parts[0]) 317 } 318 319 isTakenDown := false 320 if parts[1] != "" && parts[1] != "null" { 321 isTakenDown = true 322 } 323 324 if isTakenDown { 325 return nil 326 } 327 328 // Skip actors without profile records 329 if parts[2] == "" { 330 return nil 331 } 332 333 cid, err := cid.Parse(parts[2]) 334 if err != nil { 335 return fmt.Errorf("invalid CID: %s", parts[2]) 336 } 337 338 if len(parts[3]) <= 2 { 339 return nil 340 } 341 342 raw, err := hex.DecodeString(parts[4][2:]) 343 if err != nil { 344 return fmt.Errorf("invalid raw record (%s): %s", did, parts[4][2:]) 345 } 346 347 profile := appbsky.ActorProfile{} 348 if err := json.Unmarshal(raw, &profile); err != nil { 349 return fmt.Errorf("failed to unmarshal profile: %w", err) 350 } 351 352 ident := identity.Identity{DID: did} 353 354 handle, err := syntax.ParseHandle(parts[3]) 355 if err != nil { 356 ident.Handle = syntax.HandleInvalid 357 } else { 358 ident.Handle = handle 359 } 360 361 job := ProfileIndexJob{ 362 ident: &ident, 363 rcid: cid, 364 record: &profile, 365 } 366 367 // Send the job to the profile queue 368 idx.profileQueue <- &job 369 370 return nil 371}