+3
cmd/palomar/Dockerfile copy.opensearch
+3
cmd/palomar/Dockerfile copy.opensearch
+2
cmd/palomar/Dockerfile.opensearch-dashboards
+2
cmd/palomar/Dockerfile.opensearch-dashboards
+110
cmd/palomar/docker-compose.yml
+110
cmd/palomar/docker-compose.yml
···
1
+
version: "3.9"
2
+
services:
3
+
opensearch:
4
+
container_name: opensearch
5
+
build:
6
+
context: ../../
7
+
dockerfile: cmd/palomar/Dockerfile.opensearch
8
+
ports:
9
+
- "9200:9200"
10
+
- "9600:9600"
11
+
environment:
12
+
- "discovery.type=single-node"
13
+
- "cluster.name=opensearch-palomar"
14
+
- "plugins.security.disabled=true"
15
+
- "bootstrap.memory_lock=true" # Disable JVM heap memory swapping
16
+
- "OPENSEARCH_JAVA_OPTS=-Xms4096m -Xmx4096m" # Set min and max JVM heap sizes to at least 50% of system RAM
17
+
ulimits:
18
+
memlock:
19
+
soft: -1
20
+
hard: -1
21
+
nofile:
22
+
soft: 65536
23
+
hard: 65536
24
+
volumes:
25
+
- type: bind
26
+
source: ../../data/opensearch
27
+
target: /usr/share/opensearch/data
28
+
indexer:
29
+
container_name: indexer
30
+
build:
31
+
context: ../../
32
+
dockerfile: cmd/palomar/Dockerfile
33
+
environment:
34
+
- "GOLOG_LOG_LEVEL=info"
35
+
- "ATP_PLC_HOST=https://plc.directory"
36
+
- "ATP_BGS_HOST=wss://bsky.network"
37
+
- "ELASTIC_HOSTS=http://opensearch:9200"
38
+
- "ES_INSECURE_SSL=true"
39
+
- "ENVIRONMENT=dev"
40
+
- "ES_POST_INDEX=palomar_post_dev"
41
+
- "ES_PROFILE_INDEX=palomar_profile_dev"
42
+
- "PALOMAR_DISCOVER_REPOS=false"
43
+
- "PALOMAR_BGS_SYNC_RATE_LIMIT=20"
44
+
- "PALOMAR_INDEX_MAX_CONCURRENCY=5"
45
+
- "DATABASE_URL=sqlite:///data/palomar/search.db"
46
+
depends_on:
47
+
- opensearch
48
+
volumes:
49
+
- type: bind
50
+
source: ../../data
51
+
target: /data
52
+
# pagerank:
53
+
# container_name: pagerank
54
+
# build:
55
+
# context: ../../
56
+
# dockerfile: cmd/palomar/Dockerfile
57
+
# environment:
58
+
# - "GOLOG_LOG_LEVEL=info"
59
+
# - "ATP_PLC_HOST=https://plc.directory"
60
+
# - "ATP_BGS_HOST=wss://bsky.network"
61
+
# - "ELASTIC_HOSTS=http://opensearch:9200"
62
+
# - "ES_INSECURE_SSL=true"
63
+
# - "ENVIRONMENT=dev"
64
+
# - "ES_POST_INDEX=palomar_post_dev"
65
+
# - "ES_PROFILE_INDEX=palomar_profile_dev"
66
+
# - "PALOMAR_DISCOVER_REPOS=false"
67
+
# - "PALOMAR_BGS_SYNC_RATE_LIMIT=20"
68
+
# - "PALOMAR_INDEX_MAX_CONCURRENCY=5"
69
+
# - "DATABASE_URL=sqlite:///data/palomar/pagerank.db"
70
+
# - "PAGERANK_FILE=/data/palomar/pageranks.csv"
71
+
# depends_on:
72
+
# - opensearch
73
+
# volumes:
74
+
# - type: bind
75
+
# source: ../../data
76
+
# target: /data
77
+
api:
78
+
container_name: api
79
+
build:
80
+
context: ../../
81
+
dockerfile: cmd/palomar/Dockerfile
82
+
ports:
83
+
- "3999:3999"
84
+
- "3998:3998"
85
+
environment:
86
+
- "GOLOG_LOG_LEVEL=info"
87
+
- "ATP_PLC_HOST=https://plc.directory"
88
+
- "ATP_BGS_HOST=wss://bsky.network"
89
+
- "ELASTIC_HOSTS=http://opensearch:9200"
90
+
- "ES_INSECURE_SSL=true"
91
+
- "ENVIRONMENT=dev"
92
+
- "ES_POST_INDEX=palomar_post_dev"
93
+
- "ES_PROFILE_INDEX=palomar_profile_dev"
94
+
- "DATABASE_URL=sqlite:///data/palomar/search.db"
95
+
- "PALOMAR_READONLY=true"
96
+
volumes:
97
+
- type: bind
98
+
source: ../../data
99
+
target: /data
100
+
opensearch-dashboards:
101
+
build:
102
+
context: ../../
103
+
dockerfile: cmd/palomar/Dockerfile.opensearch-dashboards
104
+
container_name: opensearch-dashboards
105
+
ports:
106
+
- 5601:5601
107
+
environment:
108
+
OPENSEARCH_HOSTS: '["http://opensearch:9200"]'
109
+
networks:
110
+
default:
+93
-20
cmd/palomar/main.go
+93
-20
cmd/palomar/main.go
···
103
103
EnvVars: []string{"MAX_METADB_CONNECTIONS"},
104
104
Value: 40,
105
105
},
106
+
&cli.StringFlag{
107
+
Name: "log-level",
108
+
Usage: "log level (debug, info, warn, error)",
109
+
Value: "info",
110
+
EnvVars: []string{"GOLOG_LOG_LEVEL", "LOG_LEVEL"},
111
+
},
106
112
}
107
113
108
114
app.Commands = []*cli.Command{
···
153
159
EnvVars: []string{"PALOMAR_INDEX_MAX_CONCURRENCY"},
154
160
},
155
161
&cli.IntFlag{
162
+
Name: "indexing-rate-limit",
163
+
Usage: "max number of documents per second to index",
164
+
Value: 50_000,
165
+
EnvVars: []string{"PALOMAR_INDEXING_RATE_LIMIT"},
166
+
},
167
+
&cli.IntFlag{
156
168
Name: "plc-rate-limit",
157
169
Usage: "max number of requests per second to PLC registry",
158
170
Value: 100,
···
164
176
EnvVars: []string{"PALOMAR_DISCOVER_REPOS"},
165
177
Value: false,
166
178
},
179
+
&cli.StringFlag{
180
+
Name: "pagerank-file",
181
+
EnvVars: []string{"PAGERANK_FILE"},
182
+
},
183
+
&cli.StringFlag{
184
+
Name: "bulk-posts-file",
185
+
EnvVars: []string{"BULK_POSTS_FILE"},
186
+
},
187
+
&cli.StringFlag{
188
+
Name: "bulk-profiles-file",
189
+
EnvVars: []string{"BULK_PROFILES_FILE"},
190
+
},
167
191
},
168
192
Action: func(cctx *cli.Context) error {
193
+
logLevel := slog.LevelInfo
194
+
switch cctx.String("log-level") {
195
+
case "debug":
196
+
logLevel = slog.LevelDebug
197
+
case "info":
198
+
logLevel = slog.LevelInfo
199
+
case "warn":
200
+
logLevel = slog.LevelWarn
201
+
case "error":
202
+
logLevel = slog.LevelError
203
+
}
204
+
169
205
logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
170
-
Level: slog.LevelInfo,
206
+
Level: logLevel,
207
+
AddSource: true,
171
208
}))
172
209
slog.SetDefault(logger)
210
+
211
+
readonly := cctx.Bool("readonly")
173
212
174
213
// Enable OTLP HTTP exporter
175
214
// For relevant environment variables:
···
206
245
otel.SetTracerProvider(tp)
207
246
}
208
247
209
-
db, err := cliutil.SetupDatabase(cctx.String("database-url"), cctx.Int("max-metadb-connections"))
210
-
if err != nil {
211
-
return err
212
-
}
213
-
214
248
escli, err := createEsClient(cctx)
215
249
if err != nil {
216
250
return fmt.Errorf("failed to get elasticsearch: %w", err)
217
251
}
218
252
219
-
// TODO: replace this with "bingo" resolver
220
253
base := identity.BaseDirectory{
221
254
PLCURL: cctx.String("atp-plc-host"),
222
255
HTTPClient: http.Client{
···
228
261
}
229
262
dir := identity.NewCacheDirectory(&base, 1_500_000, time.Hour*24, time.Minute*2, time.Minute*5)
230
263
231
-
srv, err := search.NewServer(
232
-
db,
233
-
escli,
234
-
&dir,
235
-
search.Config{
264
+
apiConfig := search.ServerConfig{
265
+
Logger: logger,
266
+
ProfileIndex: cctx.String("es-profile-index"),
267
+
PostIndex: cctx.String("es-post-index"),
268
+
}
269
+
270
+
srv, err := search.NewServer(escli, &dir, apiConfig)
271
+
if err != nil {
272
+
return err
273
+
}
274
+
275
+
// Configure the indexer if we're not in readonly mode
276
+
if !readonly {
277
+
db, err := cliutil.SetupDatabase(cctx.String("database-url"), cctx.Int("max-metadb-connections"))
278
+
if err != nil {
279
+
return fmt.Errorf("failed to set up database: %w", err)
280
+
}
281
+
282
+
indexerConfig := search.IndexerConfig{
236
283
RelayHost: cctx.String("atp-relay-host"),
237
284
ProfileIndex: cctx.String("es-profile-index"),
238
285
PostIndex: cctx.String("es-post-index"),
239
286
Logger: logger,
240
287
RelaySyncRateLimit: cctx.Int("relay-sync-rate-limit"),
241
288
IndexMaxConcurrency: cctx.Int("index-max-concurrency"),
242
-
},
243
-
)
244
-
if err != nil {
245
-
return err
289
+
DiscoverRepos: cctx.Bool("discover-repos"),
290
+
IndexingRateLimit: cctx.Int("indexing-rate-limit"),
291
+
}
292
+
293
+
idx, err := search.NewIndexer(db, escli, &dir, indexerConfig)
294
+
if err != nil {
295
+
return fmt.Errorf("failed to set up indexer: %w", err)
296
+
}
297
+
298
+
srv.Indexer = idx
246
299
}
247
300
248
301
go func() {
···
256
309
srv.RunAPI(cctx.String("bind"))
257
310
}()
258
311
259
-
if cctx.Bool("readonly") {
312
+
// If we're in readonly mode, just block forever
313
+
if readonly {
260
314
select {}
261
-
} else {
315
+
} else if cctx.String("pagerank-file") != "" && srv.Indexer != nil {
316
+
// If we're not in readonly mode, and we have a pagerank file, update pageranks
262
317
ctx := context.Background()
263
-
if err := srv.EnsureIndices(ctx); err != nil {
318
+
if err := srv.Indexer.BulkIndexPageranks(ctx, cctx.String("pagerank-file")); err != nil {
319
+
return fmt.Errorf("failed to update pageranks: %w", err)
320
+
}
321
+
} else if cctx.String("bulk-posts-file") != "" && srv.Indexer != nil {
322
+
// If we're not in readonly mode, and we have a bulk posts file, index posts
323
+
ctx := context.Background()
324
+
if err := srv.Indexer.BulkIndexPosts(ctx, cctx.String("bulk-posts-file")); err != nil {
325
+
return fmt.Errorf("failed to bulk index posts: %w", err)
326
+
}
327
+
} else if cctx.String("bulk-profiles-file") != "" && srv.Indexer != nil {
328
+
// If we're not in readonly mode, and we have a bulk profiles file, index profiles
329
+
ctx := context.Background()
330
+
if err := srv.Indexer.BulkIndexProfiles(ctx, cctx.String("bulk-profiles-file")); err != nil {
331
+
return fmt.Errorf("failed to bulk index profiles: %w", err)
332
+
}
333
+
} else if srv.Indexer != nil {
334
+
// Otherwise, just run the indexer
335
+
ctx := context.Background()
336
+
if err := srv.Indexer.EnsureIndices(ctx); err != nil {
264
337
return fmt.Errorf("failed to create opensearch indices: %w", err)
265
338
}
266
-
if err := srv.RunIndexer(ctx); err != nil {
339
+
if err := srv.Indexer.RunIndexer(ctx); err != nil {
267
340
return fmt.Errorf("failed to run indexer: %w", err)
268
341
}
269
342
}
+4
cmd/palomar/opensearch_dashboards.yml
+4
cmd/palomar/opensearch_dashboards.yml
+57
cmd/palomar/pagerank.sh
+57
cmd/palomar/pagerank.sh
···
1
+
#!/bin/bash
2
+
set -o errexit
3
+
set -o nounset
4
+
set -o pipefail
5
+
6
+
export SCYLLA_KEYSPACE="${SCYLLA_KEYSPACE:-}"
7
+
export SCYLLA_HOST="${SCYLLA_HOST:-}"
8
+
9
+
# Used by pagerank.
10
+
export FOLLOWS_FILE="/data/follows.csv"
11
+
export ACTORS_FILE="/data/actors.csv"
12
+
export OUTPUT_FILE="/data/pageranks.csv"
13
+
export EXPECTED_ACTOR_COUNT="5000000"
14
+
export RUST_LOG="info"
15
+
16
+
# Used by palomar.
17
+
export PAGERANK_FILE="${OUTPUT_FILE}"
18
+
export PALOMAR_INDEXING_RATE_LIMIT="10000"
19
+
20
+
function run_pagerank {
21
+
# Check that the required environment variables are set.
22
+
if [[ "${SCYLLA_KEYSPACE}" == "" ]]; then
23
+
echo "SCYLLA_KEYSPACE is not set"
24
+
exit 1
25
+
fi
26
+
27
+
if [[ "${SCYLLA_HOST}" == "" ]]; then
28
+
echo "SCYLLA_HOST is not set"
29
+
exit 1
30
+
fi
31
+
32
+
# Dump the tables to CSV files.
33
+
rm --force "${FOLLOWS_FILE}"
34
+
cqlsh \
35
+
"--keyspace=${SCYLLA_KEYSPACE}" \
36
+
"--request-timeout=1200" \
37
+
--execute "COPY follows (actor_did, subject_did) TO '${FOLLOWS_FILE}' WITH HEADER = FALSE;" \
38
+
"${SCYLLA_HOST}"
39
+
40
+
rm --force "${ACTORS_FILE}"
41
+
cqlsh \
42
+
"--keyspace=${SCYLLA_KEYSPACE}" \
43
+
"--request-timeout=1200" \
44
+
--execute "COPY actors (did) TO '${ACTORS_FILE}' WITH HEADER = FALSE;" \
45
+
"${SCYLLA_HOST}"
46
+
47
+
# Run the pagerank file which reads in the table CSV files and outputs a CSV.
48
+
/usr/local/bin/pagerank
49
+
50
+
# Run palomar with the pagerank CSV file.
51
+
/palomar run
52
+
}
53
+
54
+
while true; do
55
+
run_pagerank
56
+
sleep 24h
57
+
done
+1
-1
go.mod
+1
-1
go.mod
···
4
4
5
5
require (
6
6
contrib.go.opencensus.io/exporter/prometheus v0.4.2
7
+
github.com/PuerkitoBio/purell v1.2.1
7
8
github.com/RussellLuo/slidingwindow v0.0.0-20200528002341-535bb99d338b
8
9
github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de
9
10
github.com/brianvoe/gofakeit/v6 v6.25.0
···
74
75
)
75
76
76
77
require (
77
-
github.com/PuerkitoBio/purell v1.2.1 // indirect
78
78
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
79
79
github.com/go-redis/redis v6.15.9+incompatible // indirect
80
80
github.com/hashicorp/golang-lru v1.0.2 // indirect
+371
search/bulk.go
+371
search/bulk.go
···
1
+
package search
2
+
3
+
import (
4
+
"bufio"
5
+
"context"
6
+
"encoding/hex"
7
+
"encoding/json"
8
+
"fmt"
9
+
"os"
10
+
"strconv"
11
+
"strings"
12
+
"sync"
13
+
14
+
appbsky "github.com/bluesky-social/indigo/api/bsky"
15
+
"github.com/bluesky-social/indigo/atproto/identity"
16
+
"github.com/bluesky-social/indigo/atproto/syntax"
17
+
18
+
"github.com/ipfs/go-cid"
19
+
)
20
+
21
+
type pagerankJob struct {
22
+
did syntax.DID
23
+
rank float64
24
+
}
25
+
26
+
// BulkIndexPageranks updates the pageranks for the DIDs in the Search Index from a CSV file.
27
+
func (idx *Indexer) BulkIndexPageranks(ctx context.Context, pagerankFile string) error {
28
+
f, err := os.Open(pagerankFile)
29
+
if err != nil {
30
+
return fmt.Errorf("failed to open csv file: %w", err)
31
+
}
32
+
defer f.Close()
33
+
34
+
// Run 5 pagerank indexers in parallel
35
+
for i := 0; i < 5; i++ {
36
+
go idx.runPagerankIndexer(ctx)
37
+
}
38
+
39
+
logger := idx.logger.With("source", "bulk_index_pageranks")
40
+
41
+
queue := make(chan string, 20_000)
42
+
wg := &sync.WaitGroup{}
43
+
workerCount := 20
44
+
for i := 0; i < workerCount; i++ {
45
+
wg.Add(1)
46
+
go func() {
47
+
defer wg.Done()
48
+
for line := range queue {
49
+
if err := idx.processPagerankCSVLine(line); err != nil {
50
+
logger.Error("failed to process line", "err", err)
51
+
}
52
+
}
53
+
}()
54
+
}
55
+
56
+
// Create a scanner to read the file line by line
57
+
scanner := bufio.NewScanner(f)
58
+
buf := make([]byte, 0, 64*1024)
59
+
scanner.Buffer(buf, 1024*1024)
60
+
61
+
linesRead := 0
62
+
63
+
// Iterate over each line in the file
64
+
for scanner.Scan() {
65
+
line := scanner.Text()
66
+
67
+
queue <- line
68
+
69
+
linesRead++
70
+
if linesRead%100_000 == 0 {
71
+
idx.logger.Info("processed csv lines", "lines", linesRead)
72
+
}
73
+
}
74
+
75
+
close(queue)
76
+
77
+
// Check for any scanner errors
78
+
if err := scanner.Err(); err != nil {
79
+
return fmt.Errorf("error reading csv file: %w", err)
80
+
}
81
+
82
+
wg.Wait()
83
+
84
+
idx.logger.Info("finished processing csv file", "lines", linesRead)
85
+
86
+
return nil
87
+
}
88
+
89
+
// BulkIndexPosts indexes posts from a CSV file.
90
+
func (idx *Indexer) BulkIndexPosts(ctx context.Context, postsFile string) error {
91
+
f, err := os.Open(postsFile)
92
+
if err != nil {
93
+
return fmt.Errorf("failed to open csv file: %w", err)
94
+
}
95
+
defer f.Close()
96
+
97
+
// Run 5 post indexers in parallel
98
+
for i := 0; i < 5; i++ {
99
+
go idx.runPostIndexer(ctx)
100
+
}
101
+
102
+
logger := idx.logger.With("source", "bulk_index_posts")
103
+
104
+
queue := make(chan string, 20_000)
105
+
wg := &sync.WaitGroup{}
106
+
workerCount := 20
107
+
for i := 0; i < workerCount; i++ {
108
+
wg.Add(1)
109
+
go func() {
110
+
defer wg.Done()
111
+
for line := range queue {
112
+
if err := idx.processPostCSVLine(line); err != nil {
113
+
logger.Error("failed to process line", "err", err)
114
+
}
115
+
}
116
+
}()
117
+
}
118
+
119
+
// Create a scanner to read the file line by line
120
+
scanner := bufio.NewScanner(f)
121
+
buf := make([]byte, 0, 64*1024)
122
+
scanner.Buffer(buf, 1024*1024)
123
+
124
+
linesRead := 0
125
+
126
+
// Iterate over each line in the file
127
+
for scanner.Scan() {
128
+
line := scanner.Text()
129
+
130
+
queue <- line
131
+
132
+
linesRead++
133
+
if linesRead%100_000 == 0 {
134
+
idx.logger.Info("processed csv lines", "lines", linesRead)
135
+
}
136
+
}
137
+
138
+
close(queue)
139
+
140
+
// Check for any scanner errors
141
+
if err := scanner.Err(); err != nil {
142
+
return fmt.Errorf("error reading csv file: %w", err)
143
+
}
144
+
145
+
wg.Wait()
146
+
147
+
idx.logger.Info("finished processing csv file", "lines", linesRead)
148
+
149
+
return nil
150
+
}
151
+
152
+
// BulkIndexProfiles indexes profiles from a CSV file.
153
+
func (idx *Indexer) BulkIndexProfiles(ctx context.Context, profilesFile string) error {
154
+
f, err := os.Open(profilesFile)
155
+
if err != nil {
156
+
return fmt.Errorf("failed to open csv file: %w", err)
157
+
}
158
+
defer f.Close()
159
+
160
+
for i := 0; i < 5; i++ {
161
+
go idx.runProfileIndexer(ctx)
162
+
}
163
+
164
+
logger := idx.logger.With("source", "bulk_index_profiles")
165
+
166
+
queue := make(chan string, 20_000)
167
+
wg := &sync.WaitGroup{}
168
+
workerCount := 20
169
+
for i := 0; i < workerCount; i++ {
170
+
wg.Add(1)
171
+
go func() {
172
+
defer wg.Done()
173
+
for line := range queue {
174
+
if err := idx.processProfileCSVLine(line); err != nil {
175
+
logger.Error("failed to process line", "err", err)
176
+
}
177
+
}
178
+
}()
179
+
}
180
+
181
+
// Create a scanner to read the file line by line
182
+
scanner := bufio.NewScanner(f)
183
+
buf := make([]byte, 0, 64*1024)
184
+
scanner.Buffer(buf, 1024*1024)
185
+
186
+
linesRead := 0
187
+
188
+
// Iterate over each line in the file
189
+
for scanner.Scan() {
190
+
line := scanner.Text()
191
+
192
+
queue <- line
193
+
194
+
linesRead++
195
+
if linesRead%100_000 == 0 {
196
+
idx.logger.Info("processed csv lines", "lines", linesRead)
197
+
}
198
+
}
199
+
200
+
close(queue)
201
+
202
+
// Check for any scanner errors
203
+
if err := scanner.Err(); err != nil {
204
+
return fmt.Errorf("error reading csv file: %w", err)
205
+
}
206
+
207
+
wg.Wait()
208
+
209
+
idx.logger.Info("finished processing csv file", "lines", linesRead)
210
+
211
+
return nil
212
+
}
213
+
214
+
func (idx *Indexer) processPagerankCSVLine(line string) error {
215
+
// Split the line into DID and rank
216
+
parts := strings.Split(line, ",")
217
+
if len(parts) != 2 {
218
+
return fmt.Errorf("invalid pagerank line: %s", line)
219
+
}
220
+
221
+
did, err := syntax.ParseDID(parts[0])
222
+
if err != nil {
223
+
return fmt.Errorf("invalid DID: %s", parts[0])
224
+
}
225
+
226
+
rank, err := strconv.ParseFloat(parts[1], 64)
227
+
if err != nil {
228
+
return fmt.Errorf("invalid pagerank value: %s", parts[1])
229
+
}
230
+
231
+
job := PagerankIndexJob{
232
+
did: did,
233
+
rank: rank,
234
+
}
235
+
236
+
// Send the job to the pagerank queue
237
+
idx.pagerankQueue <- &job
238
+
239
+
return nil
240
+
}
241
+
242
+
func (idx *Indexer) processPostCSVLine(line string) error {
243
+
// CSV is formatted as
244
+
// actor_did,rkey,taken_down(time or null),violates_threadgate(False or null),cid,raw(post JSON as hex)
245
+
parts := strings.Split(line, ",")
246
+
if len(parts) != 6 {
247
+
return fmt.Errorf("invalid csv line: %s", line)
248
+
}
249
+
250
+
did, err := syntax.ParseDID(parts[0])
251
+
if err != nil {
252
+
return fmt.Errorf("invalid DID: %s", parts[0])
253
+
}
254
+
255
+
rkey, err := syntax.ParseRecordKey(parts[1])
256
+
if err != nil {
257
+
return fmt.Errorf("invalid record key: %s", parts[1])
258
+
}
259
+
260
+
isTakenDown := false
261
+
if parts[2] != "" && parts[2] != "null" {
262
+
isTakenDown = true
263
+
}
264
+
265
+
violatesThreadgate := false
266
+
if parts[3] != "" && parts[3] != "False" {
267
+
violatesThreadgate = true
268
+
}
269
+
270
+
if isTakenDown || violatesThreadgate {
271
+
return nil
272
+
}
273
+
274
+
cid, err := cid.Parse(parts[4])
275
+
if err != nil {
276
+
return fmt.Errorf("invalid CID: %s", parts[4])
277
+
}
278
+
279
+
if len(parts[5]) <= 2 {
280
+
return nil
281
+
}
282
+
283
+
raw, err := hex.DecodeString(parts[5][2:])
284
+
if err != nil {
285
+
return fmt.Errorf("invalid raw record (%s/%s): %s", did, rkey, parts[5][2:])
286
+
}
287
+
288
+
post := appbsky.FeedPost{}
289
+
if err := json.Unmarshal(raw, &post); err != nil {
290
+
return fmt.Errorf("failed to unmarshal post: %w", err)
291
+
}
292
+
293
+
job := PostIndexJob{
294
+
did: did,
295
+
rkey: rkey.String(),
296
+
rcid: cid,
297
+
record: &post,
298
+
}
299
+
300
+
// Send the job to the post queue
301
+
idx.postQueue <- &job
302
+
303
+
return nil
304
+
}
305
+
306
+
func (idx *Indexer) processProfileCSVLine(line string) error {
307
+
// CSV is formatted as
308
+
// actor_did,taken_down(time or null),cid,handle,raw(profile JSON as hex)
309
+
parts := strings.Split(line, ",")
310
+
if len(parts) != 5 {
311
+
return fmt.Errorf("invalid csv line: %s", line)
312
+
}
313
+
314
+
did, err := syntax.ParseDID(parts[0])
315
+
if err != nil {
316
+
return fmt.Errorf("invalid DID: %s", parts[0])
317
+
}
318
+
319
+
isTakenDown := false
320
+
if parts[1] != "" && parts[1] != "null" {
321
+
isTakenDown = true
322
+
}
323
+
324
+
if isTakenDown {
325
+
return nil
326
+
}
327
+
328
+
// Skip actors without profile records
329
+
if parts[2] == "" {
330
+
return nil
331
+
}
332
+
333
+
cid, err := cid.Parse(parts[2])
334
+
if err != nil {
335
+
return fmt.Errorf("invalid CID: %s", parts[2])
336
+
}
337
+
338
+
if len(parts[3]) <= 2 {
339
+
return nil
340
+
}
341
+
342
+
raw, err := hex.DecodeString(parts[4][2:])
343
+
if err != nil {
344
+
return fmt.Errorf("invalid raw record (%s): %s", did, parts[4][2:])
345
+
}
346
+
347
+
profile := appbsky.ActorProfile{}
348
+
if err := json.Unmarshal(raw, &profile); err != nil {
349
+
return fmt.Errorf("failed to unmarshal profile: %w", err)
350
+
}
351
+
352
+
ident := identity.Identity{DID: did}
353
+
354
+
handle, err := syntax.ParseHandle(parts[3])
355
+
if err != nil {
356
+
ident.Handle = syntax.HandleInvalid
357
+
} else {
358
+
ident.Handle = handle
359
+
}
360
+
361
+
job := ProfileIndexJob{
362
+
ident: &ident,
363
+
rcid: cid,
364
+
record: &profile,
365
+
}
366
+
367
+
// Send the job to the profile queue
368
+
idx.profileQueue <- &job
369
+
370
+
return nil
371
+
}
+110
-59
search/firehose.go
+110
-59
search/firehose.go
···
24
24
"github.com/ipfs/go-cid"
25
25
)
26
26
27
-
func (s *Server) getLastCursor() (int64, error) {
27
+
func (idx *Indexer) getLastCursor() (int64, error) {
28
28
var lastSeq LastSeq
29
-
if err := s.db.Find(&lastSeq).Error; err != nil {
29
+
if err := idx.db.Find(&lastSeq).Error; err != nil {
30
30
return 0, err
31
31
}
32
32
33
33
if lastSeq.ID == 0 {
34
-
return 0, s.db.Create(&lastSeq).Error
34
+
return 0, idx.db.Create(&lastSeq).Error
35
35
}
36
36
37
37
return lastSeq.Seq, nil
38
38
}
39
39
40
-
func (s *Server) updateLastCursor(curs int64) error {
41
-
return s.db.Model(LastSeq{}).Where("id = 1").Update("seq", curs).Error
40
+
func (idx *Indexer) updateLastCursor(curs int64) error {
41
+
return idx.db.Model(LastSeq{}).Where("id = 1").Update("seq", curs).Error
42
42
}
43
43
44
-
func (s *Server) RunIndexer(ctx context.Context) error {
45
-
cur, err := s.getLastCursor()
44
+
func (idx *Indexer) RunIndexer(ctx context.Context) error {
45
+
cur, err := idx.getLastCursor()
46
46
if err != nil {
47
47
return fmt.Errorf("get last cursor: %w", err)
48
48
}
49
49
50
-
err = s.bfs.LoadJobs(ctx)
50
+
// Start the indexer batch workers
51
+
go idx.runPostIndexer(ctx)
52
+
go idx.runProfileIndexer(ctx)
53
+
54
+
err = idx.bfs.LoadJobs(ctx)
51
55
if err != nil {
52
56
return fmt.Errorf("loading backfill jobs: %w", err)
53
57
}
54
-
go s.bf.Start()
58
+
go idx.bf.Start()
55
59
56
-
if s.enableRepoDiscovery {
57
-
go s.discoverRepos()
60
+
if idx.enableRepoDiscovery {
61
+
go idx.discoverRepos()
58
62
}
59
63
60
64
d := websocket.DefaultDialer
61
-
u, err := url.Parse(s.relayHost)
65
+
u, err := url.Parse(idx.relayhost)
62
66
if err != nil {
63
-
return fmt.Errorf("invalid relayHost URI: %w", err)
67
+
return fmt.Errorf("invalid bgshost URI: %w", err)
64
68
}
65
69
u.Path = "xrpc/com.atproto.sync.subscribeRepos"
66
70
if cur != 0 {
···
81
85
82
86
defer func() {
83
87
if evt.Seq%50 == 0 {
84
-
if err := s.updateLastCursor(evt.Seq); err != nil {
85
-
s.logger.Error("failed to persist cursor", "err", err)
88
+
if err := idx.updateLastCursor(evt.Seq); err != nil {
89
+
idx.logger.Error("failed to persist cursor", "err", err)
86
90
}
87
91
}
88
92
}()
89
-
logEvt := s.logger.With("repo", evt.Repo, "rev", evt.Rev, "seq", evt.Seq)
93
+
logEvt := idx.logger.With("repo", evt.Repo, "rev", evt.Rev, "seq", evt.Seq)
90
94
if evt.TooBig && evt.Since != nil {
91
95
// TODO: handle this case (instead of return nil)
92
96
logEvt.Error("skipping non-genesis tooBig events for now")
···
94
98
}
95
99
96
100
if evt.TooBig {
97
-
if err := s.processTooBigCommit(ctx, evt); err != nil {
101
+
if err := idx.processTooBigCommit(ctx, evt); err != nil {
98
102
// TODO: handle this case (instead of return nil)
99
103
logEvt.Error("failed to process tooBig event", "err", err)
100
104
return nil
···
104
108
}
105
109
106
110
// Pass events to the backfiller which will process or buffer as needed
107
-
if err := s.bf.HandleEvent(ctx, evt); err != nil {
111
+
if err := idx.bf.HandleEvent(ctx, evt); err != nil {
108
112
logEvt.Error("failed to handle event", "err", err)
109
113
}
110
114
···
118
122
119
123
did, err := syntax.ParseDID(evt.Did)
120
124
if err != nil {
121
-
s.logger.Error("bad DID in RepoHandle event", "did", evt.Did, "handle", evt.Handle, "seq", evt.Seq, "err", err)
125
+
idx.logger.Error("bad DID in RepoHandle event", "did", evt.Did, "handle", evt.Handle, "seq", evt.Seq, "err", err)
122
126
return nil
123
127
}
124
-
if err := s.updateUserHandle(ctx, did, evt.Handle); err != nil {
128
+
if err := idx.updateUserHandle(ctx, did, evt.Handle); err != nil {
125
129
// TODO: handle this case (instead of return nil)
126
-
s.logger.Error("failed to update user handle", "did", evt.Did, "handle", evt.Handle, "seq", evt.Seq, "err", err)
130
+
idx.logger.Error("failed to update user handle", "did", evt.Did, "handle", evt.Handle, "seq", evt.Seq, "err", err)
127
131
}
128
132
return nil
129
133
},
···
132
136
return events.HandleRepoStream(
133
137
ctx, con, autoscaling.NewScheduler(
134
138
autoscaling.DefaultAutoscaleSettings(),
135
-
s.relayHost,
139
+
idx.relayhost,
136
140
rsc.EventHandler,
137
141
),
138
142
)
139
143
}
140
144
141
-
func (s *Server) discoverRepos() {
145
+
func (idx *Indexer) discoverRepos() {
142
146
ctx := context.Background()
143
-
log := s.logger.With("func", "discoverRepos")
147
+
log := idx.logger.With("func", "discoverRepos")
144
148
log.Info("starting repo discovery")
145
149
146
150
cursor := ""
···
150
154
totalErrored := 0
151
155
152
156
for {
153
-
resp, err := comatproto.SyncListRepos(ctx, s.relayClient, cursor, limit)
157
+
resp, err := comatproto.SyncListRepos(ctx, idx.relayXRPC, cursor, limit)
154
158
if err != nil {
155
159
log.Error("failed to list repos", "err", err)
156
160
time.Sleep(5 * time.Second)
···
159
163
log.Info("got repo page", "count", len(resp.Repos), "cursor", resp.Cursor)
160
164
errored := 0
161
165
for _, repo := range resp.Repos {
162
-
_, err := s.bfs.GetOrCreateJob(ctx, repo.Did, backfill.StateEnqueued)
166
+
_, err := idx.bfs.GetOrCreateJob(ctx, repo.Did, backfill.StateEnqueued)
163
167
if err != nil {
164
168
log.Error("failed to get or create job", "did", repo.Did, "err", err)
165
169
errored++
···
178
182
log.Info("finished repo discovery", "totalJobs", total, "totalErrored", totalErrored)
179
183
}
180
184
181
-
func (s *Server) handleCreateOrUpdate(ctx context.Context, rawDID string, rev string, path string, recB *[]byte, rcid *cid.Cid) error {
185
+
func (idx *Indexer) handleCreateOrUpdate(ctx context.Context, rawDID string, rev string, path string, recB *[]byte, rcid *cid.Cid) error {
186
+
logger := idx.logger.With("func", "handleCreateOrUpdate", "did", rawDID, "rev", rev, "path", path)
182
187
// Since this gets called in a backfill job, we need to check if the path is a post or profile
183
188
if !strings.Contains(path, "app.bsky.feed.post") && !strings.Contains(path, "app.bsky.actor.profile") {
184
189
return nil
···
189
194
return fmt.Errorf("bad DID syntax in event: %w", err)
190
195
}
191
196
192
-
ident, err := s.dir.LookupDID(ctx, did)
193
-
if err != nil {
194
-
return fmt.Errorf("resolving identity: %w", err)
195
-
}
196
-
if ident == nil {
197
-
return fmt.Errorf("identity not found for did: %s", did.String())
198
-
}
199
-
200
197
// CBOR Unmarshal the record
201
198
recCBOR, err := lexutil.CborDecodeValue(*recB)
202
199
if err != nil {
···
208
205
return fmt.Errorf("failed to cast record to CBORMarshaler")
209
206
}
210
207
208
+
parts := strings.SplitN(path, "/", 3)
209
+
if len(parts) < 2 {
210
+
logger.Warn("skipping post record with malformed path")
211
+
return nil
212
+
}
213
+
211
214
switch rec := rec.(type) {
212
215
case *bsky.FeedPost:
213
-
if err := s.indexPost(ctx, ident, rec, path, *rcid); err != nil {
214
-
postsFailed.Inc()
215
-
return fmt.Errorf("indexing post for %s: %w", did.String(), err)
216
+
rkey, err := syntax.ParseTID(parts[1])
217
+
if err != nil {
218
+
logger.Warn("skipping post record with non-TID rkey")
219
+
return nil
220
+
}
221
+
222
+
job := PostIndexJob{
223
+
did: did,
224
+
record: rec,
225
+
rcid: *rcid,
226
+
rkey: rkey.String(),
216
227
}
228
+
229
+
// Send the job to the bulk indexer
230
+
idx.postQueue <- &job
217
231
postsIndexed.Inc()
218
232
case *bsky.ActorProfile:
219
-
if err := s.indexProfile(ctx, ident, rec, path, *rcid); err != nil {
220
-
profilesFailed.Inc()
221
-
return fmt.Errorf("indexing profile for %s: %w", did.String(), err)
233
+
if parts[1] != "self" {
234
+
return nil
235
+
}
236
+
237
+
ident, err := idx.dir.LookupDID(ctx, did)
238
+
if err != nil {
239
+
return fmt.Errorf("resolving identity: %w", err)
222
240
}
241
+
if ident == nil {
242
+
return fmt.Errorf("identity not found for did: %s", did.String())
243
+
}
244
+
245
+
job := ProfileIndexJob{
246
+
ident: ident,
247
+
record: rec,
248
+
rcid: *rcid,
249
+
}
250
+
251
+
// Send the job to the bulk indexer
252
+
idx.profileQueue <- &job
223
253
profilesIndexed.Inc()
224
254
default:
225
255
}
226
256
return nil
227
257
}
228
258
229
-
func (s *Server) handleDelete(ctx context.Context, rawDID, rev, path string) error {
259
+
func (idx *Indexer) handleDelete(ctx context.Context, rawDID, rev, path string) error {
230
260
// Since this gets called in a backfill job, we need to check if the path is a post or profile
231
261
if !strings.Contains(path, "app.bsky.feed.post") && !strings.Contains(path, "app.bsky.actor.profile") {
232
262
return nil
···
237
267
return fmt.Errorf("invalid DID in event: %w", err)
238
268
}
239
269
240
-
ident, err := s.dir.LookupDID(ctx, did)
241
-
if err != nil {
242
-
return err
243
-
}
244
-
if ident == nil {
245
-
return fmt.Errorf("identity not found for did: %s", did.String())
246
-
}
247
-
248
270
switch {
249
271
// TODO: handle profile deletes, its an edge case, but worth doing still
250
272
case strings.Contains(path, "app.bsky.feed.post"):
251
-
if err := s.deletePost(ctx, ident, path); err != nil {
273
+
if err := idx.deletePost(ctx, did, path); err != nil {
252
274
return err
253
275
}
254
276
postsDeleted.Inc()
···
259
281
return nil
260
282
}
261
283
262
-
func (s *Server) processTooBigCommit(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error {
263
-
repodata, err := comatproto.SyncGetRepo(ctx, s.relayClient, evt.Repo, "")
284
+
func (idx *Indexer) processTooBigCommit(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error {
285
+
logger := idx.logger.With("func", "processTooBigCommit", "repo", evt.Repo, "rev", evt.Rev, "seq", evt.Seq)
286
+
287
+
repodata, err := comatproto.SyncGetRepo(ctx, idx.relayXRPC, evt.Repo, "")
264
288
if err != nil {
265
289
return err
266
290
}
···
275
299
return fmt.Errorf("bad DID in repo event: %w", err)
276
300
}
277
301
278
-
ident, err := s.dir.LookupDID(ctx, did)
302
+
ident, err := idx.dir.LookupDID(ctx, did)
279
303
if err != nil {
280
304
return err
281
305
}
···
288
312
rcid, rec, err := r.GetRecord(ctx, k)
289
313
if err != nil {
290
314
// TODO: handle this case (instead of return nil)
291
-
s.logger.Error("failed to get record from repo checkout", "path", k, "err", err)
315
+
idx.logger.Error("failed to get record from repo checkout", "path", k, "err", err)
316
+
return nil
317
+
}
318
+
319
+
parts := strings.SplitN(k, "/", 3)
320
+
if len(parts) < 2 {
321
+
logger.Warn("skipping post record with malformed path")
292
322
return nil
293
323
}
294
324
295
325
switch rec := rec.(type) {
296
326
case *bsky.FeedPost:
297
-
if err := s.indexPost(ctx, ident, rec, k, rcid); err != nil {
298
-
return fmt.Errorf("indexing post: %w", err)
327
+
rkey, err := syntax.ParseTID(parts[1])
328
+
if err != nil {
329
+
logger.Warn("skipping post record with non-TID rkey")
330
+
return nil
299
331
}
332
+
333
+
job := PostIndexJob{
334
+
did: did,
335
+
record: rec,
336
+
rcid: rcid,
337
+
rkey: rkey.String(),
338
+
}
339
+
340
+
// Send the job to the bulk indexer
341
+
idx.postQueue <- &job
300
342
case *bsky.ActorProfile:
301
-
if err := s.indexProfile(ctx, ident, rec, k, rcid); err != nil {
302
-
return fmt.Errorf("indexing profile: %w", err)
343
+
if parts[1] != "self" {
344
+
return nil
345
+
}
346
+
347
+
job := ProfileIndexJob{
348
+
ident: ident,
349
+
record: rec,
350
+
rcid: rcid,
303
351
}
352
+
353
+
// Send the job to the bulk indexer
354
+
idx.profileQueue <- &job
304
355
default:
305
356
}
306
357
-53
search/handlers.go
-53
search/handlers.go
···
276
276
return e.JSON(200, out)
277
277
}
278
278
279
-
type IndexError struct {
280
-
DID string `json:"did"`
281
-
Err string `json:"err"`
282
-
}
283
-
284
-
func (s *Server) handleIndexRepos(e echo.Context) error {
285
-
ctx, span := tracer.Start(e.Request().Context(), "handleIndexRepos")
286
-
defer span.End()
287
-
288
-
dids, ok := e.QueryParams()["did"]
289
-
if !ok {
290
-
return e.JSON(400, map[string]any{
291
-
"error": "must pass at least one did to index",
292
-
})
293
-
}
294
-
295
-
for _, did := range dids {
296
-
_, err := syntax.ParseDID(did)
297
-
if err != nil {
298
-
return e.JSON(400, map[string]any{
299
-
"error": fmt.Sprintf("invalid DID (%s): %s", did, err),
300
-
})
301
-
}
302
-
}
303
-
304
-
errs := []IndexError{}
305
-
successes := 0
306
-
skipped := 0
307
-
for _, did := range dids {
308
-
job, err := s.bfs.GetJob(ctx, did)
309
-
if job == nil && err == nil {
310
-
err := s.bfs.EnqueueJob(ctx, did)
311
-
if err != nil {
312
-
errs = append(errs, IndexError{
313
-
DID: did,
314
-
Err: err.Error(),
315
-
})
316
-
continue
317
-
}
318
-
successes++
319
-
continue
320
-
}
321
-
skipped++
322
-
}
323
-
324
-
return e.JSON(200, map[string]any{
325
-
"numEnqueued": successes,
326
-
"numSkipped": skipped,
327
-
"numErrored": len(errs),
328
-
"errors": errs,
329
-
})
330
-
}
331
-
332
279
func (s *Server) SearchPosts(ctx context.Context, params *PostSearchParams) (*appbsky.UnspeccedSearchPostsSkeleton_Output, error) {
333
280
ctx, span := tracer.Start(ctx, "SearchPosts")
334
281
defer span.End()
+459
-73
search/indexing.go
+459
-73
search/indexing.go
···
3
3
import (
4
4
"bytes"
5
5
"context"
6
+
_ "embed"
6
7
"encoding/json"
7
8
"fmt"
8
9
"io"
10
+
"log/slog"
11
+
"os"
9
12
"strings"
13
+
"time"
10
14
11
15
appbsky "github.com/bluesky-social/indigo/api/bsky"
12
16
"github.com/bluesky-social/indigo/atproto/identity"
13
17
"github.com/bluesky-social/indigo/atproto/syntax"
18
+
"github.com/bluesky-social/indigo/backfill"
19
+
"github.com/bluesky-social/indigo/xrpc"
14
20
"github.com/ipfs/go-cid"
21
+
"github.com/labstack/echo/v4"
15
22
"go.opentelemetry.io/otel/attribute"
23
+
"golang.org/x/time/rate"
24
+
gorm "gorm.io/gorm"
16
25
26
+
es "github.com/opensearch-project/opensearch-go/v2"
17
27
esapi "github.com/opensearch-project/opensearch-go/v2/opensearchapi"
18
28
)
19
29
20
-
func (s *Server) deletePost(ctx context.Context, ident *identity.Identity, recordPath string) error {
30
+
type Indexer struct {
31
+
escli *es.Client
32
+
postIndex string
33
+
profileIndex string
34
+
db *gorm.DB
35
+
relayhost string
36
+
relayXRPC *xrpc.Client
37
+
dir identity.Directory
38
+
echo *echo.Echo
39
+
logger *slog.Logger
40
+
41
+
bfs *backfill.Gormstore
42
+
bf *backfill.Backfiller
43
+
44
+
enableRepoDiscovery bool
45
+
46
+
indexLimiter *rate.Limiter
47
+
profileQueue chan *ProfileIndexJob
48
+
postQueue chan *PostIndexJob
49
+
pagerankQueue chan *PagerankIndexJob
50
+
}
51
+
52
+
type IndexerConfig struct {
53
+
RelayHost string
54
+
ProfileIndex string
55
+
PostIndex string
56
+
Logger *slog.Logger
57
+
RelaySyncRateLimit int
58
+
IndexMaxConcurrency int
59
+
DiscoverRepos bool
60
+
IndexingRateLimit int
61
+
}
62
+
63
+
type ProfileIndexJob struct {
64
+
ident *identity.Identity
65
+
record *appbsky.ActorProfile
66
+
rcid cid.Cid
67
+
}
68
+
69
+
type PostIndexJob struct {
70
+
did syntax.DID
71
+
record *appbsky.FeedPost
72
+
rcid cid.Cid
73
+
rkey string
74
+
}
75
+
76
+
type PagerankIndexJob struct {
77
+
did syntax.DID
78
+
rank float64
79
+
}
80
+
81
+
func NewIndexer(db *gorm.DB, escli *es.Client, dir identity.Directory, config IndexerConfig) (*Indexer, error) {
82
+
logger := config.Logger
83
+
if logger == nil {
84
+
logger = slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
85
+
Level: slog.LevelInfo,
86
+
}))
87
+
}
88
+
logger = logger.With("component", "indexer")
89
+
90
+
logger.Info("running database migrations")
91
+
db.AutoMigrate(&LastSeq{})
92
+
db.AutoMigrate(&backfill.GormDBJob{})
93
+
94
+
relayWS := config.RelayHost
95
+
if !strings.HasPrefix(relayWS, "ws") {
96
+
return nil, fmt.Errorf("specified bgs host must include 'ws://' or 'wss://'")
97
+
}
98
+
99
+
relayHTTP := strings.Replace(relayWS, "ws", "http", 1)
100
+
relayXRPC := &xrpc.Client{
101
+
Host: relayHTTP,
102
+
}
103
+
104
+
limiter := rate.NewLimiter(rate.Limit(config.IndexingRateLimit), 10_000)
105
+
106
+
idx := &Indexer{
107
+
escli: escli,
108
+
profileIndex: config.ProfileIndex,
109
+
postIndex: config.PostIndex,
110
+
db: db,
111
+
relayhost: config.RelayHost,
112
+
relayXRPC: relayXRPC,
113
+
dir: dir,
114
+
logger: logger,
115
+
enableRepoDiscovery: config.DiscoverRepos,
116
+
117
+
indexLimiter: limiter,
118
+
profileQueue: make(chan *ProfileIndexJob, 1000),
119
+
postQueue: make(chan *PostIndexJob, 1000),
120
+
pagerankQueue: make(chan *PagerankIndexJob, 1000),
121
+
}
122
+
123
+
bfstore := backfill.NewGormstore(db)
124
+
opts := backfill.DefaultBackfillOptions()
125
+
126
+
if config.RelaySyncRateLimit > 0 {
127
+
opts.SyncRequestsPerSecond = config.RelaySyncRateLimit
128
+
opts.ParallelBackfills = 2 * config.RelaySyncRateLimit
129
+
} else {
130
+
opts.SyncRequestsPerSecond = 8
131
+
}
132
+
133
+
opts.CheckoutPath = fmt.Sprintf("%s/xrpc/com.atproto.sync.getRepo", relayHTTP)
134
+
if config.IndexMaxConcurrency > 0 {
135
+
opts.ParallelRecordCreates = config.IndexMaxConcurrency
136
+
} else {
137
+
opts.ParallelRecordCreates = 20
138
+
}
139
+
opts.NSIDFilter = "app.bsky."
140
+
bf := backfill.NewBackfiller(
141
+
"search",
142
+
bfstore,
143
+
idx.handleCreateOrUpdate,
144
+
idx.handleCreateOrUpdate,
145
+
idx.handleDelete,
146
+
opts,
147
+
)
148
+
149
+
idx.bfs = bfstore
150
+
idx.bf = bf
151
+
152
+
return idx, nil
153
+
}
154
+
155
+
//go:embed post_schema.json
156
+
var palomarPostSchemaJSON string
157
+
158
+
//go:embed profile_schema.json
159
+
var palomarProfileSchemaJSON string
160
+
161
+
func (idx *Indexer) EnsureIndices(ctx context.Context) error {
162
+
indices := []struct {
163
+
Name string
164
+
SchemaJSON string
165
+
}{
166
+
{Name: idx.postIndex, SchemaJSON: palomarPostSchemaJSON},
167
+
{Name: idx.profileIndex, SchemaJSON: palomarProfileSchemaJSON},
168
+
}
169
+
for _, index := range indices {
170
+
resp, err := idx.escli.Indices.Exists([]string{index.Name})
171
+
if err != nil {
172
+
return err
173
+
}
174
+
defer resp.Body.Close()
175
+
io.ReadAll(resp.Body)
176
+
if resp.IsError() && resp.StatusCode != 404 {
177
+
return fmt.Errorf("failed to check index existence")
178
+
}
179
+
if resp.StatusCode == 404 {
180
+
idx.logger.Warn("creating opensearch index", "index", index.Name)
181
+
if len(index.SchemaJSON) < 2 {
182
+
return fmt.Errorf("empty schema file (go:embed failed)")
183
+
}
184
+
buf := strings.NewReader(index.SchemaJSON)
185
+
resp, err := idx.escli.Indices.Create(
186
+
index.Name,
187
+
idx.escli.Indices.Create.WithBody(buf))
188
+
if err != nil {
189
+
return err
190
+
}
191
+
defer resp.Body.Close()
192
+
io.ReadAll(resp.Body)
193
+
if resp.IsError() {
194
+
return fmt.Errorf("failed to create index")
195
+
}
196
+
}
197
+
}
198
+
return nil
199
+
}
200
+
201
+
func (idx *Indexer) runPostIndexer(ctx context.Context) {
202
+
ctx, span := tracer.Start(ctx, "runPostIndexer")
203
+
defer span.End()
204
+
205
+
// Batch up to 1000 posts at a time, or every 5 seconds
206
+
tick := time.NewTicker(5 * time.Second)
207
+
defer tick.Stop()
208
+
209
+
var posts []*PostIndexJob
210
+
for {
211
+
select {
212
+
case <-ctx.Done():
213
+
return
214
+
case <-tick.C:
215
+
if len(posts) > 0 {
216
+
err := idx.indexLimiter.WaitN(ctx, len(posts))
217
+
if err != nil {
218
+
idx.logger.Error("failed to wait for rate limiter", "err", err)
219
+
continue
220
+
}
221
+
err = idx.indexPosts(ctx, posts)
222
+
if err != nil {
223
+
idx.logger.Error("failed to index posts", "err", err)
224
+
}
225
+
posts = posts[:0]
226
+
}
227
+
case job := <-idx.postQueue:
228
+
posts = append(posts, job)
229
+
if len(posts) >= 1000 {
230
+
err := idx.indexLimiter.WaitN(ctx, len(posts))
231
+
if err != nil {
232
+
idx.logger.Error("failed to wait for rate limiter", "err", err)
233
+
continue
234
+
}
235
+
err = idx.indexPosts(ctx, posts)
236
+
if err != nil {
237
+
idx.logger.Error("failed to index posts", "err", err)
238
+
}
239
+
posts = posts[:0]
240
+
}
241
+
}
242
+
}
243
+
}
244
+
245
+
func (idx *Indexer) runProfileIndexer(ctx context.Context) {
246
+
ctx, span := tracer.Start(ctx, "runProfileIndexer")
247
+
defer span.End()
248
+
249
+
// Batch up to 1000 profiles at a time, or every 5 seconds
250
+
tick := time.NewTicker(5 * time.Second)
251
+
defer tick.Stop()
252
+
253
+
var profiles []*ProfileIndexJob
254
+
for {
255
+
select {
256
+
case <-ctx.Done():
257
+
return
258
+
case <-tick.C:
259
+
if len(profiles) > 0 {
260
+
err := idx.indexLimiter.WaitN(ctx, len(profiles))
261
+
if err != nil {
262
+
idx.logger.Error("failed to wait for rate limiter", "err", err)
263
+
continue
264
+
}
265
+
err = idx.indexProfiles(ctx, profiles)
266
+
if err != nil {
267
+
idx.logger.Error("failed to index profiles", "err", err)
268
+
}
269
+
profiles = profiles[:0]
270
+
}
271
+
case job := <-idx.profileQueue:
272
+
profiles = append(profiles, job)
273
+
if len(profiles) >= 1000 {
274
+
err := idx.indexLimiter.WaitN(ctx, len(profiles))
275
+
if err != nil {
276
+
idx.logger.Error("failed to wait for rate limiter", "err", err)
277
+
continue
278
+
}
279
+
err = idx.indexProfiles(ctx, profiles)
280
+
if err != nil {
281
+
idx.logger.Error("failed to index profiles", "err", err)
282
+
}
283
+
profiles = profiles[:0]
284
+
}
285
+
}
286
+
}
287
+
}
288
+
289
+
func (idx *Indexer) runPagerankIndexer(ctx context.Context) {
290
+
ctx, span := tracer.Start(ctx, "runPagerankIndexer")
291
+
defer span.End()
292
+
293
+
// Batch up to 1000 pageranks at a time, or every 5 seconds
294
+
tick := time.NewTicker(5 * time.Second)
295
+
defer tick.Stop()
296
+
297
+
var pageranks []*PagerankIndexJob
298
+
for {
299
+
select {
300
+
case <-ctx.Done():
301
+
return
302
+
case <-tick.C:
303
+
if len(pageranks) > 0 {
304
+
err := idx.indexLimiter.WaitN(ctx, len(pageranks))
305
+
if err != nil {
306
+
idx.logger.Error("failed to wait for rate limiter", "err", err)
307
+
continue
308
+
}
309
+
err = idx.indexPageranks(ctx, pageranks)
310
+
if err != nil {
311
+
idx.logger.Error("failed to index pageranks", "err", err)
312
+
}
313
+
pageranks = pageranks[:0]
314
+
}
315
+
case job := <-idx.pagerankQueue:
316
+
pageranks = append(pageranks, job)
317
+
if len(pageranks) >= 1000 {
318
+
err := idx.indexLimiter.WaitN(ctx, len(pageranks))
319
+
if err != nil {
320
+
idx.logger.Error("failed to wait for rate limiter", "err", err)
321
+
continue
322
+
}
323
+
err = idx.indexPageranks(ctx, pageranks)
324
+
if err != nil {
325
+
idx.logger.Error("failed to index pageranks", "err", err)
326
+
}
327
+
pageranks = pageranks[:0]
328
+
}
329
+
}
330
+
}
331
+
}
332
+
333
+
func (idx *Indexer) deletePost(ctx context.Context, did syntax.DID, recordPath string) error {
21
334
ctx, span := tracer.Start(ctx, "deletePost")
22
335
defer span.End()
23
-
span.SetAttributes(attribute.String("repo", ident.DID.String()), attribute.String("path", recordPath))
336
+
span.SetAttributes(attribute.String("repo", did.String()), attribute.String("path", recordPath))
24
337
25
-
logger := s.logger.With("repo", ident.DID, "path", recordPath, "op", "deletePost")
338
+
logger := idx.logger.With("repo", did, "path", recordPath, "op", "deletePost")
26
339
27
340
parts := strings.SplitN(recordPath, "/", 3)
28
341
if len(parts) < 2 {
···
35
348
return nil
36
349
}
37
350
38
-
docID := fmt.Sprintf("%s_%s", ident.DID.String(), rkey)
351
+
docID := fmt.Sprintf("%s_%s", did.String(), rkey)
39
352
logger.Info("deleting post from index", "docID", docID)
40
353
req := esapi.DeleteRequest{
41
-
Index: s.postIndex,
354
+
Index: idx.postIndex,
42
355
DocumentID: docID,
43
356
Refresh: "true",
44
357
}
45
358
46
-
res, err := req.Do(ctx, s.escli)
359
+
err = idx.indexLimiter.Wait(ctx)
360
+
if err != nil {
361
+
logger.Warn("failed to wait for rate limiter", "err", err)
362
+
return err
363
+
}
364
+
res, err := req.Do(ctx, idx.escli)
47
365
if err != nil {
48
366
return fmt.Errorf("failed to delete post: %w", err)
49
367
}
···
59
377
return nil
60
378
}
61
379
62
-
func (s *Server) indexPost(ctx context.Context, ident *identity.Identity, rec *appbsky.FeedPost, path string, rcid cid.Cid) error {
63
-
ctx, span := tracer.Start(ctx, "indexPost")
380
+
func (idx *Indexer) indexPosts(ctx context.Context, jobs []*PostIndexJob) error {
381
+
ctx, span := tracer.Start(ctx, "indexPosts")
64
382
defer span.End()
65
-
span.SetAttributes(attribute.String("repo", ident.DID.String()), attribute.String("path", path))
383
+
span.SetAttributes(attribute.Int("num_posts", len(jobs)))
66
384
67
-
log := s.logger.With("repo", ident.DID, "path", path, "op", "indexPost")
68
-
parts := strings.SplitN(path, "/", 3)
69
-
if len(parts) < 2 {
70
-
log.Warn("skipping post record with malformed path")
71
-
return nil
72
-
}
73
-
rkey, err := syntax.ParseTID(parts[1])
74
-
if err != nil {
75
-
log.Warn("skipping post record with non-TID rkey")
76
-
return nil
77
-
}
385
+
log := idx.logger.With("op", "indexPosts")
386
+
start := time.Now()
78
387
79
-
log = log.With("rkey", rkey)
388
+
var buf bytes.Buffer
389
+
for i := range jobs {
390
+
job := jobs[i]
391
+
doc := TransformPost(job.record, job.did, job.rkey, job.rcid.String())
392
+
docBytes, err := json.Marshal(doc)
393
+
if err != nil {
394
+
log.Warn("failed to marshal post", "err", err)
395
+
return err
396
+
}
80
397
81
-
doc := TransformPost(rec, ident, rkey.String(), rcid.String())
82
-
b, err := json.Marshal(doc)
83
-
if err != nil {
84
-
return err
85
-
}
398
+
indexScript := []byte(fmt.Sprintf(`{"index":{"_id":"%s"}}%s`, doc.DocId(), "\n"))
399
+
docBytes = append(docBytes, "\n"...)
86
400
87
-
log.Debug("indexing post")
88
-
req := esapi.IndexRequest{
89
-
Index: s.postIndex,
90
-
DocumentID: doc.DocId(),
91
-
Body: bytes.NewReader(b),
401
+
buf.Grow(len(indexScript) + len(docBytes))
402
+
buf.Write(indexScript)
403
+
buf.Write(docBytes)
92
404
}
93
405
94
-
res, err := req.Do(ctx, s.escli)
406
+
log.Info("indexing posts", "num_posts", len(jobs))
407
+
408
+
res, err := idx.escli.Bulk(bytes.NewReader(buf.Bytes()), idx.escli.Bulk.WithIndex(idx.postIndex))
95
409
if err != nil {
96
-
log.Warn("failed to send indexing request", "err", err)
97
-
return fmt.Errorf("failed to send indexing request: %w", err)
410
+
log.Warn("failed to send bulk indexing request", "err", err)
411
+
return fmt.Errorf("failed to send bulk indexing request: %w", err)
98
412
}
99
413
defer res.Body.Close()
100
-
body, err := io.ReadAll(res.Body)
101
-
if err != nil {
102
-
log.Warn("failed to read indexing response", "err", err)
103
-
return fmt.Errorf("failed to read indexing response: %w", err)
104
-
}
414
+
105
415
if res.IsError() {
106
-
log.Warn("opensearch indexing error", "status_code", res.StatusCode, "response", res, "body", string(body))
107
-
return fmt.Errorf("indexing error, code=%d", res.StatusCode)
416
+
body, err := io.ReadAll(res.Body)
417
+
if err != nil {
418
+
log.Warn("failed to read bulk indexing response", "err", err)
419
+
return fmt.Errorf("failed to read bulk indexing response: %w", err)
420
+
}
421
+
log.Warn("opensearch bulk indexing error", "status_code", res.StatusCode, "response", res, "body", string(body))
422
+
return fmt.Errorf("bulk indexing error, code=%d", res.StatusCode)
108
423
}
424
+
425
+
log.Info("indexed posts", "num_posts", len(jobs), "duration", time.Since(start))
426
+
109
427
return nil
110
428
}
111
429
112
-
func (s *Server) indexProfile(ctx context.Context, ident *identity.Identity, rec *appbsky.ActorProfile, path string, rcid cid.Cid) error {
113
-
ctx, span := tracer.Start(ctx, "indexProfile")
430
+
func (idx *Indexer) indexProfiles(ctx context.Context, jobs []*ProfileIndexJob) error {
431
+
ctx, span := tracer.Start(ctx, "indexProfiles")
114
432
defer span.End()
115
-
span.SetAttributes(attribute.String("repo", ident.DID.String()), attribute.String("path", path))
433
+
span.SetAttributes(attribute.Int("num_profiles", len(jobs)))
434
+
435
+
log := idx.logger.With("op", "indexProfiles")
436
+
start := time.Now()
116
437
117
-
log := s.logger.With("repo", ident.DID, "path", path, "op", "indexProfile")
118
-
parts := strings.SplitN(path, "/", 3)
119
-
if len(parts) != 2 || parts[1] != "self" {
120
-
log.Warn("skipping indexing non-canonical profile record", "did", ident.DID, "path", path)
121
-
return nil
438
+
var buf bytes.Buffer
439
+
for i := range jobs {
440
+
job := jobs[i]
441
+
442
+
doc := TransformProfile(job.record, job.ident, job.rcid.String())
443
+
docBytes, err := json.Marshal(doc)
444
+
if err != nil {
445
+
log.Warn("failed to marshal profile", "err", err)
446
+
return err
447
+
}
448
+
449
+
indexScript := []byte(fmt.Sprintf(`{"index":{"_id":"%s"}}%s`, job.ident.DID.String(), "\n"))
450
+
docBytes = append(docBytes, "\n"...)
451
+
452
+
buf.Grow(len(indexScript) + len(docBytes))
453
+
buf.Write(indexScript)
454
+
buf.Write(docBytes)
122
455
}
123
456
124
-
log.Info("indexing profile", "handle", ident.Handle)
457
+
log.Info("indexing profiles", "num_profiles", len(jobs))
125
458
126
-
doc := TransformProfile(rec, ident, rcid.String())
127
-
b, err := json.Marshal(doc)
459
+
res, err := idx.escli.Bulk(bytes.NewReader(buf.Bytes()), idx.escli.Bulk.WithIndex(idx.profileIndex))
128
460
if err != nil {
129
-
return err
461
+
log.Warn("failed to send bulk indexing request", "err", err)
462
+
return fmt.Errorf("failed to send bulk indexing request: %w", err)
463
+
}
464
+
defer res.Body.Close()
465
+
466
+
if res.IsError() {
467
+
body, err := io.ReadAll(res.Body)
468
+
if err != nil {
469
+
log.Warn("failed to read bulk indexing response", "err", err)
470
+
return fmt.Errorf("failed to read bulk indexing response: %w", err)
471
+
}
472
+
log.Warn("opensearch bulk indexing error", "status_code", res.StatusCode, "response", res, "body", string(body))
473
+
return fmt.Errorf("bulk indexing error, code=%d", res.StatusCode)
130
474
}
131
-
req := esapi.IndexRequest{
132
-
Index: s.profileIndex,
133
-
DocumentID: ident.DID.String(),
134
-
Body: bytes.NewReader(b),
475
+
476
+
log.Info("indexed profiles", "num_profiles", len(jobs), "duration", time.Since(start))
477
+
478
+
return nil
479
+
}
480
+
481
+
// updateProfilePagranks uses the OpenSearch bulk API to update the pageranks for the given DIDs
482
+
func (idx *Indexer) indexPageranks(ctx context.Context, pageranks []*PagerankIndexJob) error {
483
+
ctx, span := tracer.Start(ctx, "indexPageranks")
484
+
defer span.End()
485
+
span.SetAttributes(attribute.Int("num_profiles", len(pageranks)))
486
+
487
+
log := idx.logger.With("op", "indexPageranks")
488
+
489
+
log.Info("updating profile pageranks")
490
+
491
+
var buf bytes.Buffer
492
+
for _, pr := range pageranks {
493
+
updateScript := map[string]any{
494
+
"script": map[string]any{
495
+
"source": "ctx._source.pagerank = params.pagerank",
496
+
"lang": "painless",
497
+
"params": map[string]any{
498
+
"pagerank": pr.rank,
499
+
},
500
+
},
501
+
}
502
+
updateScriptJSON, err := json.Marshal(updateScript)
503
+
if err != nil {
504
+
log.Warn("failed to marshal update script", "err", err)
505
+
return err
506
+
}
507
+
508
+
updateMetaJSON := []byte(fmt.Sprintf(`{"update":{"_id":"%s"}}%s`, pr.did.String(), "\n"))
509
+
updateScriptJSON = append(updateScriptJSON, "\n"...)
510
+
511
+
buf.Grow(len(updateMetaJSON) + len(updateScriptJSON))
512
+
buf.Write(updateMetaJSON)
513
+
buf.Write(updateScriptJSON)
135
514
}
136
515
137
-
res, err := req.Do(ctx, s.escli)
516
+
res, err := idx.escli.Bulk(bytes.NewReader(buf.Bytes()), idx.escli.Bulk.WithIndex(idx.profileIndex))
138
517
if err != nil {
139
-
log.Warn("failed to send indexing request", "err", err)
140
-
return fmt.Errorf("failed to send indexing request: %w", err)
518
+
log.Warn("failed to send bulk indexing request", "err", err)
519
+
return fmt.Errorf("failed to send bulk indexing request: %w", err)
141
520
}
142
521
defer res.Body.Close()
143
-
body, err := io.ReadAll(res.Body)
144
-
if err != nil {
145
-
log.Warn("failed to read indexing response", "err", err)
146
-
return fmt.Errorf("failed to read indexing response: %w", err)
147
-
}
522
+
148
523
if res.IsError() {
149
-
log.Warn("opensearch indexing error", "status_code", res.StatusCode, "response", res, "body", string(body))
150
-
return fmt.Errorf("indexing error, code=%d", res.StatusCode)
524
+
body, err := io.ReadAll(res.Body)
525
+
if err != nil {
526
+
log.Warn("failed to read bulk indexing response", "err", err)
527
+
return fmt.Errorf("failed to read bulk indexing response: %w", err)
528
+
}
529
+
log.Warn("opensearch bulk indexing error", "status_code", res.StatusCode, "response", res, "body", string(body))
530
+
return fmt.Errorf("bulk indexing error, code=%d", res.StatusCode)
151
531
}
532
+
152
533
return nil
153
534
}
154
535
155
-
func (s *Server) updateUserHandle(ctx context.Context, did syntax.DID, handle string) error {
536
+
func (idx *Indexer) updateUserHandle(ctx context.Context, did syntax.DID, handle string) error {
156
537
ctx, span := tracer.Start(ctx, "updateUserHandle")
157
538
defer span.End()
158
539
span.SetAttributes(attribute.String("repo", did.String()), attribute.String("event.handle", handle))
159
540
160
-
log := s.logger.With("repo", did.String(), "op", "updateUserHandle", "handle_from_event", handle)
541
+
log := idx.logger.With("repo", did.String(), "op", "updateUserHandle", "handle_from_event", handle)
161
542
162
-
err := s.dir.Purge(ctx, did.AtIdentifier())
543
+
err := idx.dir.Purge(ctx, did.AtIdentifier())
163
544
if err != nil {
164
545
log.Warn("failed to purge DID from directory", "err", err)
165
546
return err
166
547
}
167
548
168
-
ident, err := s.dir.LookupDID(ctx, did)
549
+
ident, err := idx.dir.LookupDID(ctx, did)
169
550
if err != nil {
170
551
log.Warn("failed to lookup DID in directory", "err", err)
171
552
return err
···
194
575
}
195
576
196
577
req := esapi.UpdateRequest{
197
-
Index: s.profileIndex,
578
+
Index: idx.profileIndex,
198
579
DocumentID: did.String(),
199
580
Body: bytes.NewReader(b),
200
581
}
201
582
202
-
res, err := req.Do(ctx, s.escli)
583
+
err = idx.indexLimiter.Wait(ctx)
584
+
if err != nil {
585
+
log.Warn("failed to wait for rate limiter", "err", err)
586
+
return err
587
+
}
588
+
res, err := req.Do(ctx, idx.escli)
203
589
if err != nil {
204
590
log.Warn("failed to send indexing request", "err", err)
205
591
return fmt.Errorf("failed to send indexing request: %w", err)
+29
-90
search/server.go
+29
-90
search/server.go
···
2
2
3
3
import (
4
4
"context"
5
-
_ "embed"
6
5
"fmt"
7
6
"io"
8
7
"log/slog"
···
11
10
"strings"
12
11
13
12
"github.com/bluesky-social/indigo/atproto/identity"
14
-
"github.com/bluesky-social/indigo/backfill"
15
-
"github.com/bluesky-social/indigo/xrpc"
16
13
17
14
"github.com/carlmjohnson/versioninfo"
18
15
"github.com/labstack/echo/v4"
···
21
18
"github.com/prometheus/client_golang/prometheus/promhttp"
22
19
slogecho "github.com/samber/slog-echo"
23
20
"go.opentelemetry.io/contrib/instrumentation/github.com/labstack/echo/otelecho"
24
-
gorm "gorm.io/gorm"
21
+
22
+
_ "net/http/pprof" // For pprof in the metrics server
25
23
)
26
24
25
+
type LastSeq struct {
26
+
ID uint `gorm:"primarykey"`
27
+
Seq int64
28
+
}
29
+
30
+
type ServerConfig struct {
31
+
Logger *slog.Logger
32
+
ProfileIndex string
33
+
PostIndex string
34
+
AtlantisAddresses []string
35
+
}
36
+
27
37
type Server struct {
28
38
escli *es.Client
29
39
postIndex string
30
40
profileIndex string
31
-
db *gorm.DB
32
-
relayHost string
33
-
relayClient *xrpc.Client
34
41
dir identity.Directory
35
42
echo *echo.Echo
36
43
logger *slog.Logger
37
44
38
-
bfs *backfill.Gormstore
39
-
bf *backfill.Backfiller
40
-
41
-
enableRepoDiscovery bool
42
-
}
43
-
44
-
type LastSeq struct {
45
-
ID uint `gorm:"primarykey"`
46
-
Seq int64
45
+
Indexer *Indexer
47
46
}
48
47
49
-
type Config struct {
50
-
RelayHost string
51
-
ProfileIndex string
52
-
PostIndex string
53
-
Logger *slog.Logger
54
-
RelaySyncRateLimit int
55
-
IndexMaxConcurrency int
56
-
DiscoverRepos bool
57
-
}
58
-
59
-
func NewServer(db *gorm.DB, escli *es.Client, dir identity.Directory, config Config) (*Server, error) {
48
+
func NewServer(escli *es.Client, dir identity.Directory, config ServerConfig) (*Server, error) {
60
49
logger := config.Logger
61
50
if logger == nil {
62
51
logger = slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
···
64
53
}))
65
54
}
66
55
67
-
logger.Info("running database migrations")
68
-
db.AutoMigrate(&LastSeq{})
69
-
db.AutoMigrate(&backfill.GormDBJob{})
70
-
71
-
relayws := config.RelayHost
72
-
if !strings.HasPrefix(relayws, "ws") {
73
-
return nil, fmt.Errorf("specified relay host must include 'ws://' or 'wss://'")
56
+
serv := Server{
57
+
escli: escli,
58
+
postIndex: config.PostIndex,
59
+
profileIndex: config.ProfileIndex,
60
+
dir: dir,
61
+
logger: logger,
74
62
}
75
63
76
-
relayhttp := strings.Replace(relayws, "ws", "http", 1)
77
-
relayClient := &xrpc.Client{
78
-
Host: relayhttp,
79
-
}
80
-
81
-
s := &Server{
82
-
escli: escli,
83
-
profileIndex: config.ProfileIndex,
84
-
postIndex: config.PostIndex,
85
-
db: db,
86
-
relayHost: config.RelayHost, // NOTE: the original URL, not 'relayhttp'
87
-
relayClient: relayClient,
88
-
dir: dir,
89
-
logger: logger,
90
-
enableRepoDiscovery: config.DiscoverRepos,
91
-
}
92
-
93
-
bfstore := backfill.NewGormstore(db)
94
-
opts := backfill.DefaultBackfillOptions()
95
-
if config.RelaySyncRateLimit > 0 {
96
-
opts.SyncRequestsPerSecond = config.RelaySyncRateLimit
97
-
opts.ParallelBackfills = 2 * config.RelaySyncRateLimit
98
-
} else {
99
-
opts.SyncRequestsPerSecond = 8
100
-
}
101
-
opts.CheckoutPath = fmt.Sprintf("%s/xrpc/com.atproto.sync.getRepo", relayhttp)
102
-
if config.IndexMaxConcurrency > 0 {
103
-
opts.ParallelRecordCreates = config.IndexMaxConcurrency
104
-
} else {
105
-
opts.ParallelRecordCreates = 20
106
-
}
107
-
opts.NSIDFilter = "app.bsky."
108
-
bf := backfill.NewBackfiller(
109
-
"search",
110
-
bfstore,
111
-
s.handleCreateOrUpdate,
112
-
s.handleCreateOrUpdate,
113
-
s.handleDelete,
114
-
opts,
115
-
)
116
-
117
-
s.bfs = bfstore
118
-
s.bf = bf
119
-
120
-
return s, nil
64
+
return &serv, nil
121
65
}
122
66
123
-
//go:embed post_schema.json
124
-
var palomarPostSchemaJSON string
125
-
126
-
//go:embed profile_schema.json
127
-
var palomarProfileSchemaJSON string
128
-
129
67
func (s *Server) EnsureIndices(ctx context.Context) error {
130
68
131
69
indices := []struct {
···
178
116
Message string `json:"msg,omitempty"`
179
117
}
180
118
181
-
func (s *Server) handleHealthCheck(c echo.Context) error {
182
-
if err := s.db.Exec("SELECT 1").Error; err != nil {
183
-
s.logger.Error("healthcheck can't connect to database", "err", err)
184
-
return c.JSON(500, HealthStatus{Status: "error", Version: versioninfo.Short(), Message: "can't connect to database"})
119
+
func (a *Server) handleHealthCheck(c echo.Context) error {
120
+
if a.Indexer != nil {
121
+
if err := a.Indexer.db.Exec("SELECT 1").Error; err != nil {
122
+
a.logger.Error("healthcheck can't connect to database", "err", err)
123
+
return c.JSON(500, HealthStatus{Status: "error", Version: versioninfo.Short(), Message: "can't connect to database"})
124
+
}
185
125
}
186
126
return c.JSON(200, HealthStatus{Status: "ok", Version: versioninfo.Short()})
187
127
}
···
212
152
e.GET("/metrics", echo.WrapHandler(promhttp.Handler()))
213
153
e.GET("/xrpc/app.bsky.unspecced.searchPostsSkeleton", s.handleSearchPostsSkeleton)
214
154
e.GET("/xrpc/app.bsky.unspecced.searchActorsSkeleton", s.handleSearchActorsSkeleton)
215
-
e.GET("/xrpc/app.bsky.unspecced.indexRepos", s.handleIndexRepos)
216
155
s.echo = e
217
156
218
157
s.logger.Info("starting search API daemon", "bind", listen)
+4
-4
search/transform.go
+4
-4
search/transform.go
···
102
102
}
103
103
}
104
104
105
-
func TransformPost(post *appbsky.FeedPost, ident *identity.Identity, rkey, cid string) PostDoc {
105
+
func TransformPost(post *appbsky.FeedPost, did syntax.DID, rkey, cid string) PostDoc {
106
106
altText := []string{}
107
107
if post.Embed != nil && post.Embed.EmbedImages != nil {
108
108
for _, img := range post.Embed.EmbedImages.Images {
···
145
145
if post.Embed != nil && post.Embed.EmbedRecordWithMedia != nil {
146
146
embedATURI = &post.Embed.EmbedRecordWithMedia.Record.Record.Uri
147
147
}
148
-
var embedImgCount int = 0
148
+
var embedImgCount int
149
149
var embedImgAltText []string
150
150
var embedImgAltTextJA []string
151
151
if post.Embed != nil && post.Embed.EmbedImages != nil {
···
178
178
179
179
doc := PostDoc{
180
180
DocIndexTs: syntax.DatetimeNow().String(),
181
-
DID: ident.DID.String(),
181
+
DID: did.String(),
182
182
RecordRkey: rkey,
183
183
RecordCID: cid,
184
184
Text: post.Text,
···
210
210
s := dt.String()
211
211
doc.CreatedAt = &s
212
212
} else {
213
-
slog.Warn("rejecting future post CreatedAt", "datetime", dt.String(), "did", ident.DID.String(), "rkey", rkey)
213
+
slog.Warn("rejecting future post CreatedAt", "datetime", dt.String(), "did", did.String(), "rkey", rkey)
214
214
s := syntax.DatetimeNow().String()
215
215
doc.CreatedAt = &s
216
216
}
+1
-1
search/transform_test.go
+1
-1
search/transform_test.go
···
108
108
Handle: syntax.Handle(row.Handle),
109
109
DID: syntax.DID(row.DID),
110
110
}
111
-
doc := TransformPost(row.PostRecord, &repo, row.Rkey, row.Cid)
111
+
doc := TransformPost(row.PostRecord, repo.DID, row.Rkey, row.Cid)
112
112
doc.DocIndexTs = "2006-01-02T15:04:05.000Z"
113
113
assert.Equal(row.PostDoc, doc)
114
114
assert.Equal(row.DocId, doc.DocId())