1package main
2
3import (
4 "context"
5 "crypto/tls"
6 "encoding/json"
7 "fmt"
8 "log"
9 "log/slog"
10 "net/http"
11 "os"
12 "strings"
13 "time"
14
15 _ "github.com/joho/godotenv/autoload"
16 "go.opentelemetry.io/otel"
17 "go.opentelemetry.io/otel/attribute"
18 "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
19 "go.opentelemetry.io/otel/sdk/resource"
20 tracesdk "go.opentelemetry.io/otel/sdk/trace"
21 semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
22 "golang.org/x/time/rate"
23
24 "github.com/bluesky-social/indigo/atproto/identity"
25 "github.com/bluesky-social/indigo/search"
26 "github.com/bluesky-social/indigo/util/cliutil"
27
28 "github.com/carlmjohnson/versioninfo"
29 es "github.com/opensearch-project/opensearch-go/v2"
30 cli "github.com/urfave/cli/v2"
31)
32
33func main() {
34 if err := run(os.Args); err != nil {
35 slog.Error("exiting", "err", err)
36 os.Exit(-1)
37 }
38}
39
40func run(args []string) error {
41
42 app := cli.App{
43 Name: "palomar",
44 Usage: "search indexing and query service (using ES or OS)",
45 Version: versioninfo.Short(),
46 }
47
48 app.Flags = []cli.Flag{
49 &cli.StringFlag{
50 Name: "elastic-cert-file",
51 Usage: "certificate file path",
52 EnvVars: []string{"ES_CERT_FILE", "ELASTIC_CERT_FILE"},
53 },
54 &cli.BoolFlag{
55 Name: "elastic-insecure-ssl",
56 Usage: "if true, disable SSL cert validation",
57 EnvVars: []string{"ES_INSECURE_SSL"},
58 },
59 &cli.StringFlag{
60 Name: "elastic-username",
61 Usage: "elasticsearch username",
62 Value: "admin",
63 EnvVars: []string{"ES_USERNAME", "ELASTIC_USERNAME"},
64 },
65 &cli.StringFlag{
66 Name: "elastic-password",
67 Usage: "elasticsearch password",
68 Value: "0penSearch-Pal0mar",
69 EnvVars: []string{"ES_PASSWORD", "ELASTIC_PASSWORD"},
70 },
71 &cli.StringFlag{
72 Name: "elastic-hosts",
73 Usage: "elasticsearch hosts (schema/host/port)",
74 Value: "http://localhost:9200",
75 EnvVars: []string{"ES_HOSTS", "ELASTIC_HOSTS", "OPENSEARCH_URL", "ELASTICSEARCH_URL"},
76 },
77 &cli.StringFlag{
78 Name: "es-post-index",
79 Usage: "ES index for 'post' documents",
80 Value: "palomar_post",
81 EnvVars: []string{"ES_POST_INDEX"},
82 },
83 &cli.StringFlag{
84 Name: "es-profile-index",
85 Usage: "ES index for 'profile' documents",
86 Value: "palomar_profile",
87 EnvVars: []string{"ES_PROFILE_INDEX"},
88 },
89 &cli.StringFlag{
90 Name: "atp-relay-host",
91 Usage: "hostname and port of Relay to subscribe to",
92 Value: "wss://bsky.network",
93 EnvVars: []string{"ATP_RELAY_HOST", "ATP_BGS_HOST"},
94 },
95 &cli.StringFlag{
96 Name: "atp-plc-host",
97 Usage: "method, hostname, and port of PLC registry",
98 Value: "https://plc.directory",
99 EnvVars: []string{"ATP_PLC_HOST"},
100 },
101 &cli.IntFlag{
102 Name: "max-metadb-connections",
103 EnvVars: []string{"MAX_METADB_CONNECTIONS"},
104 Value: 40,
105 },
106 &cli.StringFlag{
107 Name: "log-level",
108 Usage: "log level (debug, info, warn, error)",
109 Value: "info",
110 EnvVars: []string{"GOLOG_LOG_LEVEL", "LOG_LEVEL"},
111 },
112 }
113
114 app.Commands = []*cli.Command{
115 runCmd,
116 elasticCheckCmd,
117 searchPostCmd,
118 searchProfileCmd,
119 }
120
121 return app.Run(args)
122}
123
124var runCmd = &cli.Command{
125 Name: "run",
126 Usage: "combined indexing+query server",
127 Flags: []cli.Flag{
128 &cli.StringFlag{
129 Name: "database-url",
130 Value: "sqlite://data/palomar/search.db",
131 EnvVars: []string{"DATABASE_URL"},
132 },
133 &cli.BoolFlag{
134 Name: "readonly",
135 EnvVars: []string{"PALOMAR_READONLY", "READONLY"},
136 },
137 &cli.StringFlag{
138 Name: "bind",
139 Usage: "IP or address, and port, to listen on for HTTP APIs",
140 Value: ":3999",
141 EnvVars: []string{"PALOMAR_BIND"},
142 },
143 &cli.StringFlag{
144 Name: "metrics-listen",
145 Usage: "IP or address, and port, to listen on for metrics APIs",
146 Value: ":3998",
147 EnvVars: []string{"PALOMAR_METRICS_LISTEN"},
148 },
149 &cli.IntFlag{
150 Name: "relay-sync-rate-limit",
151 Usage: "max repo sync (checkout) requests per second to upstream (Relay)",
152 Value: 8,
153 EnvVars: []string{"PALOMAR_RELAY_SYNC_RATE_LIMIT", "PALOMAR_BGS_SYNC_RATE_LIMIT"},
154 },
155 &cli.IntFlag{
156 Name: "index-max-concurrency",
157 Usage: "max number of concurrent index requests (HTTP POST) to search index",
158 Value: 20,
159 EnvVars: []string{"PALOMAR_INDEX_MAX_CONCURRENCY"},
160 },
161 &cli.IntFlag{
162 Name: "indexing-rate-limit",
163 Usage: "max number of documents per second to index",
164 Value: 50_000,
165 EnvVars: []string{"PALOMAR_INDEXING_RATE_LIMIT"},
166 },
167 &cli.IntFlag{
168 Name: "plc-rate-limit",
169 Usage: "max number of requests per second to PLC registry",
170 Value: 100,
171 EnvVars: []string{"PALOMAR_PLC_RATE_LIMIT"},
172 },
173 &cli.BoolFlag{
174 Name: "discover-repos",
175 Usage: "if true, discover repositories from the Relay",
176 EnvVars: []string{"PALOMAR_DISCOVER_REPOS"},
177 Value: false,
178 },
179 &cli.StringFlag{
180 Name: "pagerank-file",
181 EnvVars: []string{"PAGERANK_FILE"},
182 },
183 &cli.StringFlag{
184 Name: "bulk-posts-file",
185 EnvVars: []string{"BULK_POSTS_FILE"},
186 },
187 &cli.StringFlag{
188 Name: "bulk-profiles-file",
189 EnvVars: []string{"BULK_PROFILES_FILE"},
190 },
191 },
192 Action: func(cctx *cli.Context) error {
193 logLevel := slog.LevelInfo
194 switch cctx.String("log-level") {
195 case "debug":
196 logLevel = slog.LevelDebug
197 case "info":
198 logLevel = slog.LevelInfo
199 case "warn":
200 logLevel = slog.LevelWarn
201 case "error":
202 logLevel = slog.LevelError
203 }
204
205 logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
206 Level: logLevel,
207 AddSource: true,
208 }))
209 slog.SetDefault(logger)
210
211 readonly := cctx.Bool("readonly")
212
213 // Enable OTLP HTTP exporter
214 // For relevant environment variables:
215 // https://pkg.go.dev/go.opentelemetry.io/otel/exporters/otlp/otlptrace#readme-environment-variables
216 // At a minimum, you need to set
217 // OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4318
218 if ep := os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT"); ep != "" {
219 slog.Info("setting up trace exporter", "endpoint", ep)
220 ctx, cancel := context.WithCancel(context.Background())
221 defer cancel()
222
223 exp, err := otlptracehttp.New(ctx)
224 if err != nil {
225 log.Fatal("failed to create trace exporter", "error", err)
226 }
227 defer func() {
228 ctx, cancel := context.WithTimeout(context.Background(), time.Second)
229 defer cancel()
230 if err := exp.Shutdown(ctx); err != nil {
231 slog.Error("failed to shutdown trace exporter", "error", err)
232 }
233 }()
234
235 tp := tracesdk.NewTracerProvider(
236 tracesdk.WithBatcher(exp),
237 tracesdk.WithResource(resource.NewWithAttributes(
238 semconv.SchemaURL,
239 semconv.ServiceNameKey.String("palomar"),
240 attribute.String("env", os.Getenv("ENVIRONMENT")), // DataDog
241 attribute.String("environment", os.Getenv("ENVIRONMENT")), // Others
242 attribute.Int64("ID", 1),
243 )),
244 )
245 otel.SetTracerProvider(tp)
246 }
247
248 escli, err := createEsClient(cctx)
249 if err != nil {
250 return fmt.Errorf("failed to get elasticsearch: %w", err)
251 }
252
253 base := identity.BaseDirectory{
254 PLCURL: cctx.String("atp-plc-host"),
255 HTTPClient: http.Client{
256 Timeout: time.Second * 15,
257 },
258 PLCLimiter: rate.NewLimiter(rate.Limit(cctx.Int("plc-rate-limit")), 1),
259 TryAuthoritativeDNS: true,
260 SkipDNSDomainSuffixes: []string{".bsky.social"},
261 }
262 dir := identity.NewCacheDirectory(&base, 1_500_000, time.Hour*24, time.Minute*2, time.Minute*5)
263
264 apiConfig := search.ServerConfig{
265 Logger: logger,
266 ProfileIndex: cctx.String("es-profile-index"),
267 PostIndex: cctx.String("es-post-index"),
268 }
269
270 srv, err := search.NewServer(escli, &dir, apiConfig)
271 if err != nil {
272 return err
273 }
274
275 // Configure the indexer if we're not in readonly mode
276 if !readonly {
277 db, err := cliutil.SetupDatabase(cctx.String("database-url"), cctx.Int("max-metadb-connections"))
278 if err != nil {
279 return fmt.Errorf("failed to set up database: %w", err)
280 }
281
282 indexerConfig := search.IndexerConfig{
283 RelayHost: cctx.String("atp-relay-host"),
284 ProfileIndex: cctx.String("es-profile-index"),
285 PostIndex: cctx.String("es-post-index"),
286 Logger: logger,
287 RelaySyncRateLimit: cctx.Int("relay-sync-rate-limit"),
288 IndexMaxConcurrency: cctx.Int("index-max-concurrency"),
289 DiscoverRepos: cctx.Bool("discover-repos"),
290 IndexingRateLimit: cctx.Int("indexing-rate-limit"),
291 }
292
293 idx, err := search.NewIndexer(db, escli, &dir, indexerConfig)
294 if err != nil {
295 return fmt.Errorf("failed to set up indexer: %w", err)
296 }
297
298 srv.Indexer = idx
299 }
300
301 go func() {
302 if err := srv.RunMetrics(cctx.String("metrics-listen")); err != nil {
303 slog.Error("failed to start metrics endpoint", "error", err)
304 panic(fmt.Errorf("failed to start metrics endpoint: %w", err))
305 }
306 }()
307
308 go func() {
309 srv.RunAPI(cctx.String("bind"))
310 }()
311
312 // If we're in readonly mode, just block forever
313 if readonly {
314 select {}
315 } else if cctx.String("pagerank-file") != "" && srv.Indexer != nil {
316 // If we're not in readonly mode, and we have a pagerank file, update pageranks
317 ctx := context.Background()
318 if err := srv.Indexer.BulkIndexPageranks(ctx, cctx.String("pagerank-file")); err != nil {
319 return fmt.Errorf("failed to update pageranks: %w", err)
320 }
321 } else if cctx.String("bulk-posts-file") != "" && srv.Indexer != nil {
322 // If we're not in readonly mode, and we have a bulk posts file, index posts
323 ctx := context.Background()
324 if err := srv.Indexer.BulkIndexPosts(ctx, cctx.String("bulk-posts-file")); err != nil {
325 return fmt.Errorf("failed to bulk index posts: %w", err)
326 }
327 } else if cctx.String("bulk-profiles-file") != "" && srv.Indexer != nil {
328 // If we're not in readonly mode, and we have a bulk profiles file, index profiles
329 ctx := context.Background()
330 if err := srv.Indexer.BulkIndexProfiles(ctx, cctx.String("bulk-profiles-file")); err != nil {
331 return fmt.Errorf("failed to bulk index profiles: %w", err)
332 }
333 } else if srv.Indexer != nil {
334 // Otherwise, just run the indexer
335 ctx := context.Background()
336 if err := srv.Indexer.EnsureIndices(ctx); err != nil {
337 return fmt.Errorf("failed to create opensearch indices: %w", err)
338 }
339 if err := srv.Indexer.RunIndexer(ctx); err != nil {
340 return fmt.Errorf("failed to run indexer: %w", err)
341 }
342 }
343
344 return nil
345 },
346}
347
348var elasticCheckCmd = &cli.Command{
349 Name: "elastic-check",
350 Flags: []cli.Flag{},
351 Action: func(cctx *cli.Context) error {
352 escli, err := createEsClient(cctx)
353 if err != nil {
354 return err
355 }
356
357 // NOTE: this extra info check is redundant; createEsClient() already made this call and logged results
358 inf, err := escli.Info()
359 if err != nil {
360 return fmt.Errorf("failed to get info: %w", err)
361 }
362 defer inf.Body.Close()
363 if inf.IsError() {
364 return fmt.Errorf("failed to get info")
365 }
366 slog.Info("opensearch client connected", "client_info", inf)
367
368 resp, err := escli.Indices.Exists([]string{cctx.String("es-profile-index"), cctx.String("es-post-index")})
369 if err != nil {
370 return fmt.Errorf("failed to check index existence: %w", err)
371 }
372 defer resp.Body.Close()
373 if inf.IsError() {
374 return fmt.Errorf("failed to check index existence")
375 }
376 slog.Info("index existence", "resp", resp)
377
378 return nil
379
380 },
381}
382
383func printHits(resp *search.EsSearchResponse) {
384 fmt.Printf("%d hits in %d\n", len(resp.Hits.Hits), resp.Took)
385 for _, hit := range resp.Hits.Hits {
386 b, _ := json.Marshal(hit.Source)
387 fmt.Println(string(b))
388 }
389 return
390}
391
392var searchPostCmd = &cli.Command{
393 Name: "search-post",
394 Usage: "run a simple query against posts index",
395 Action: func(cctx *cli.Context) error {
396 escli, err := createEsClient(cctx)
397 if err != nil {
398 return err
399 }
400 res, err := search.DoSearchPosts(
401 context.Background(),
402 identity.DefaultDirectory(), // TODO: parse PLC arg
403 escli,
404 cctx.String("es-post-index"),
405 &search.PostSearchParams{
406 Query: strings.Join(cctx.Args().Slice(), " "),
407 Offset: 0,
408 Size: 20,
409 },
410 )
411 if err != nil {
412 return err
413 }
414 printHits(res)
415 return nil
416 },
417}
418
419var searchProfileCmd = &cli.Command{
420 Name: "search-profile",
421 Usage: "run a simple query against posts index",
422 Flags: []cli.Flag{
423 &cli.BoolFlag{
424 Name: "typeahead",
425 },
426 },
427 Action: func(cctx *cli.Context) error {
428 escli, err := createEsClient(cctx)
429 if err != nil {
430 return err
431 }
432 if cctx.Bool("typeahead") {
433 res, err := search.DoSearchProfilesTypeahead(
434 context.Background(),
435 escli,
436 cctx.String("es-profile-index"),
437 &search.ActorSearchParams{
438 Query: strings.Join(cctx.Args().Slice(), " "),
439 Size: 10,
440 },
441 )
442 if err != nil {
443 return err
444 }
445 printHits(res)
446 } else {
447 res, err := search.DoSearchProfiles(
448 context.Background(),
449 identity.DefaultDirectory(), // TODO: parse PLC arg
450 escli,
451 cctx.String("es-profile-index"),
452 &search.ActorSearchParams{
453 Query: strings.Join(cctx.Args().Slice(), " "),
454 Offset: 0,
455 Size: 20,
456 },
457 )
458 if err != nil {
459 return err
460 }
461 printHits(res)
462 }
463 return nil
464 },
465}
466
467func createEsClient(cctx *cli.Context) (*es.Client, error) {
468
469 addrs := []string{}
470 if hosts := cctx.String("elastic-hosts"); hosts != "" {
471 addrs = strings.Split(hosts, ",")
472 }
473
474 certfi := cctx.String("elastic-cert-file")
475 var cert []byte
476 if certfi != "" {
477 b, err := os.ReadFile(certfi)
478 if err != nil {
479 return nil, err
480 }
481
482 cert = b
483 }
484
485 insecure := cctx.Bool("elastic-insecure-ssl")
486
487 cfg := es.Config{
488 Addresses: addrs,
489 Username: cctx.String("elastic-username"),
490 Password: cctx.String("elastic-password"),
491 CACert: cert,
492 Transport: &http.Transport{
493 MaxIdleConnsPerHost: 20,
494 TLSClientConfig: &tls.Config{
495 InsecureSkipVerify: insecure,
496 },
497 },
498 }
499
500 escli, err := es.NewClient(cfg)
501 if err != nil {
502 return nil, fmt.Errorf("failed to set up client: %w", err)
503 }
504
505 info, err := escli.Info()
506 if err != nil {
507 return nil, fmt.Errorf("cannot get escli info: %w", err)
508 }
509 defer info.Body.Close()
510 slog.Debug("opensearch client initialized", "info", info)
511
512 return escli, nil
513}