1package search
2
3import (
4 "bufio"
5 "context"
6 "encoding/hex"
7 "encoding/json"
8 "fmt"
9 "os"
10 "strconv"
11 "strings"
12 "sync"
13
14 appbsky "github.com/bluesky-social/indigo/api/bsky"
15 "github.com/bluesky-social/indigo/atproto/identity"
16 "github.com/bluesky-social/indigo/atproto/syntax"
17
18 "github.com/ipfs/go-cid"
19)
20
// pagerankJob holds a DID together with its pagerank value.
//
// NOTE(review): nothing in this file references this type —
// processPagerankCSVLine builds a PagerankIndexJob instead. Confirm whether
// it is still used elsewhere in the package before removing it.
type pagerankJob struct {
	did  syntax.DID
	rank float64
}
25
26// BulkIndexPageranks updates the pageranks for the DIDs in the Search Index from a CSV file.
27func (idx *Indexer) BulkIndexPageranks(ctx context.Context, pagerankFile string) error {
28 f, err := os.Open(pagerankFile)
29 if err != nil {
30 return fmt.Errorf("failed to open csv file: %w", err)
31 }
32 defer f.Close()
33
34 // Run 5 pagerank indexers in parallel
35 for i := 0; i < 5; i++ {
36 go idx.runPagerankIndexer(ctx)
37 }
38
39 logger := idx.logger.With("source", "bulk_index_pageranks")
40
41 queue := make(chan string, 20_000)
42 wg := &sync.WaitGroup{}
43 workerCount := 20
44 for i := 0; i < workerCount; i++ {
45 wg.Add(1)
46 go func() {
47 defer wg.Done()
48 for line := range queue {
49 if err := idx.processPagerankCSVLine(line); err != nil {
50 logger.Error("failed to process line", "err", err)
51 }
52 }
53 }()
54 }
55
56 // Create a scanner to read the file line by line
57 scanner := bufio.NewScanner(f)
58 buf := make([]byte, 0, 64*1024)
59 scanner.Buffer(buf, 1024*1024)
60
61 linesRead := 0
62
63 // Iterate over each line in the file
64 for scanner.Scan() {
65 line := scanner.Text()
66
67 queue <- line
68
69 linesRead++
70 if linesRead%100_000 == 0 {
71 idx.logger.Info("processed csv lines", "lines", linesRead)
72 }
73 }
74
75 close(queue)
76
77 // Check for any scanner errors
78 if err := scanner.Err(); err != nil {
79 return fmt.Errorf("error reading csv file: %w", err)
80 }
81
82 wg.Wait()
83
84 idx.logger.Info("finished processing csv file", "lines", linesRead)
85
86 return nil
87}
88
89// BulkIndexPosts indexes posts from a CSV file.
90func (idx *Indexer) BulkIndexPosts(ctx context.Context, postsFile string) error {
91 f, err := os.Open(postsFile)
92 if err != nil {
93 return fmt.Errorf("failed to open csv file: %w", err)
94 }
95 defer f.Close()
96
97 // Run 5 post indexers in parallel
98 for i := 0; i < 5; i++ {
99 go idx.runPostIndexer(ctx)
100 }
101
102 logger := idx.logger.With("source", "bulk_index_posts")
103
104 queue := make(chan string, 20_000)
105 wg := &sync.WaitGroup{}
106 workerCount := 20
107 for i := 0; i < workerCount; i++ {
108 wg.Add(1)
109 go func() {
110 defer wg.Done()
111 for line := range queue {
112 if err := idx.processPostCSVLine(line); err != nil {
113 logger.Error("failed to process line", "err", err)
114 }
115 }
116 }()
117 }
118
119 // Create a scanner to read the file line by line
120 scanner := bufio.NewScanner(f)
121 buf := make([]byte, 0, 64*1024)
122 scanner.Buffer(buf, 1024*1024)
123
124 linesRead := 0
125
126 // Iterate over each line in the file
127 for scanner.Scan() {
128 line := scanner.Text()
129
130 queue <- line
131
132 linesRead++
133 if linesRead%100_000 == 0 {
134 idx.logger.Info("processed csv lines", "lines", linesRead)
135 }
136 }
137
138 close(queue)
139
140 // Check for any scanner errors
141 if err := scanner.Err(); err != nil {
142 return fmt.Errorf("error reading csv file: %w", err)
143 }
144
145 wg.Wait()
146
147 idx.logger.Info("finished processing csv file", "lines", linesRead)
148
149 return nil
150}
151
152// BulkIndexProfiles indexes profiles from a CSV file.
153func (idx *Indexer) BulkIndexProfiles(ctx context.Context, profilesFile string) error {
154 f, err := os.Open(profilesFile)
155 if err != nil {
156 return fmt.Errorf("failed to open csv file: %w", err)
157 }
158 defer f.Close()
159
160 for i := 0; i < 5; i++ {
161 go idx.runProfileIndexer(ctx)
162 }
163
164 logger := idx.logger.With("source", "bulk_index_profiles")
165
166 queue := make(chan string, 20_000)
167 wg := &sync.WaitGroup{}
168 workerCount := 20
169 for i := 0; i < workerCount; i++ {
170 wg.Add(1)
171 go func() {
172 defer wg.Done()
173 for line := range queue {
174 if err := idx.processProfileCSVLine(line); err != nil {
175 logger.Error("failed to process line", "err", err)
176 }
177 }
178 }()
179 }
180
181 // Create a scanner to read the file line by line
182 scanner := bufio.NewScanner(f)
183 buf := make([]byte, 0, 64*1024)
184 scanner.Buffer(buf, 1024*1024)
185
186 linesRead := 0
187
188 // Iterate over each line in the file
189 for scanner.Scan() {
190 line := scanner.Text()
191
192 queue <- line
193
194 linesRead++
195 if linesRead%100_000 == 0 {
196 idx.logger.Info("processed csv lines", "lines", linesRead)
197 }
198 }
199
200 close(queue)
201
202 // Check for any scanner errors
203 if err := scanner.Err(); err != nil {
204 return fmt.Errorf("error reading csv file: %w", err)
205 }
206
207 wg.Wait()
208
209 idx.logger.Info("finished processing csv file", "lines", linesRead)
210
211 return nil
212}
213
214func (idx *Indexer) processPagerankCSVLine(line string) error {
215 // Split the line into DID and rank
216 parts := strings.Split(line, ",")
217 if len(parts) != 2 {
218 return fmt.Errorf("invalid pagerank line: %s", line)
219 }
220
221 did, err := syntax.ParseDID(parts[0])
222 if err != nil {
223 return fmt.Errorf("invalid DID: %s", parts[0])
224 }
225
226 rank, err := strconv.ParseFloat(parts[1], 64)
227 if err != nil {
228 return fmt.Errorf("invalid pagerank value: %s", parts[1])
229 }
230
231 job := PagerankIndexJob{
232 did: did,
233 rank: rank,
234 }
235
236 // Send the job to the pagerank queue
237 idx.pagerankQueue <- &job
238
239 return nil
240}
241
242func (idx *Indexer) processPostCSVLine(line string) error {
243 // CSV is formatted as
244 // actor_did,rkey,taken_down(time or null),violates_threadgate(False or null),cid,raw(post JSON as hex)
245 parts := strings.Split(line, ",")
246 if len(parts) != 6 {
247 return fmt.Errorf("invalid csv line: %s", line)
248 }
249
250 did, err := syntax.ParseDID(parts[0])
251 if err != nil {
252 return fmt.Errorf("invalid DID: %s", parts[0])
253 }
254
255 rkey, err := syntax.ParseRecordKey(parts[1])
256 if err != nil {
257 return fmt.Errorf("invalid record key: %s", parts[1])
258 }
259
260 isTakenDown := false
261 if parts[2] != "" && parts[2] != "null" {
262 isTakenDown = true
263 }
264
265 violatesThreadgate := false
266 if parts[3] != "" && parts[3] != "False" {
267 violatesThreadgate = true
268 }
269
270 if isTakenDown || violatesThreadgate {
271 return nil
272 }
273
274 cid, err := cid.Parse(parts[4])
275 if err != nil {
276 return fmt.Errorf("invalid CID: %s", parts[4])
277 }
278
279 if len(parts[5]) <= 2 {
280 return nil
281 }
282
283 raw, err := hex.DecodeString(parts[5][2:])
284 if err != nil {
285 return fmt.Errorf("invalid raw record (%s/%s): %s", did, rkey, parts[5][2:])
286 }
287
288 post := appbsky.FeedPost{}
289 if err := json.Unmarshal(raw, &post); err != nil {
290 return fmt.Errorf("failed to unmarshal post: %w", err)
291 }
292
293 job := PostIndexJob{
294 did: did,
295 rkey: rkey.String(),
296 rcid: cid,
297 record: &post,
298 }
299
300 // Send the job to the post queue
301 idx.postQueue <- &job
302
303 return nil
304}
305
306func (idx *Indexer) processProfileCSVLine(line string) error {
307 // CSV is formatted as
308 // actor_did,taken_down(time or null),cid,handle,raw(profile JSON as hex)
309 parts := strings.Split(line, ",")
310 if len(parts) != 5 {
311 return fmt.Errorf("invalid csv line: %s", line)
312 }
313
314 did, err := syntax.ParseDID(parts[0])
315 if err != nil {
316 return fmt.Errorf("invalid DID: %s", parts[0])
317 }
318
319 isTakenDown := false
320 if parts[1] != "" && parts[1] != "null" {
321 isTakenDown = true
322 }
323
324 if isTakenDown {
325 return nil
326 }
327
328 // Skip actors without profile records
329 if parts[2] == "" {
330 return nil
331 }
332
333 cid, err := cid.Parse(parts[2])
334 if err != nil {
335 return fmt.Errorf("invalid CID: %s", parts[2])
336 }
337
338 if len(parts[3]) <= 2 {
339 return nil
340 }
341
342 raw, err := hex.DecodeString(parts[4][2:])
343 if err != nil {
344 return fmt.Errorf("invalid raw record (%s): %s", did, parts[4][2:])
345 }
346
347 profile := appbsky.ActorProfile{}
348 if err := json.Unmarshal(raw, &profile); err != nil {
349 return fmt.Errorf("failed to unmarshal profile: %w", err)
350 }
351
352 ident := identity.Identity{DID: did}
353
354 handle, err := syntax.ParseHandle(parts[3])
355 if err != nil {
356 ident.Handle = syntax.HandleInvalid
357 } else {
358 ident.Handle = handle
359 }
360
361 job := ProfileIndexJob{
362 ident: &ident,
363 rcid: cid,
364 record: &profile,
365 }
366
367 // Send the job to the profile queue
368 idx.profileQueue <- &job
369
370 return nil
371}