porting all github actions from bluesky-social/indigo to tangled CI
at main 14 kB view raw
1package main 2 3import ( 4 "context" 5 "crypto/tls" 6 "encoding/json" 7 "fmt" 8 "log" 9 "log/slog" 10 "net/http" 11 "os" 12 "strings" 13 "time" 14 15 _ "github.com/joho/godotenv/autoload" 16 "go.opentelemetry.io/otel" 17 "go.opentelemetry.io/otel/attribute" 18 "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" 19 "go.opentelemetry.io/otel/sdk/resource" 20 tracesdk "go.opentelemetry.io/otel/sdk/trace" 21 semconv "go.opentelemetry.io/otel/semconv/v1.4.0" 22 "golang.org/x/time/rate" 23 24 "github.com/bluesky-social/indigo/atproto/identity" 25 "github.com/bluesky-social/indigo/search" 26 "github.com/bluesky-social/indigo/util/cliutil" 27 28 "github.com/carlmjohnson/versioninfo" 29 es "github.com/opensearch-project/opensearch-go/v2" 30 cli "github.com/urfave/cli/v2" 31) 32 33func main() { 34 if err := run(os.Args); err != nil { 35 slog.Error("exiting", "err", err) 36 os.Exit(-1) 37 } 38} 39 40func run(args []string) error { 41 42 app := cli.App{ 43 Name: "palomar", 44 Usage: "search indexing and query service (using ES or OS)", 45 Version: versioninfo.Short(), 46 } 47 48 app.Flags = []cli.Flag{ 49 &cli.StringFlag{ 50 Name: "elastic-cert-file", 51 Usage: "certificate file path", 52 EnvVars: []string{"ES_CERT_FILE", "ELASTIC_CERT_FILE"}, 53 }, 54 &cli.BoolFlag{ 55 Name: "elastic-insecure-ssl", 56 Usage: "if true, disable SSL cert validation", 57 EnvVars: []string{"ES_INSECURE_SSL"}, 58 }, 59 &cli.StringFlag{ 60 Name: "elastic-username", 61 Usage: "elasticsearch username", 62 Value: "admin", 63 EnvVars: []string{"ES_USERNAME", "ELASTIC_USERNAME"}, 64 }, 65 &cli.StringFlag{ 66 Name: "elastic-password", 67 Usage: "elasticsearch password", 68 Value: "0penSearch-Pal0mar", 69 EnvVars: []string{"ES_PASSWORD", "ELASTIC_PASSWORD"}, 70 }, 71 &cli.StringFlag{ 72 Name: "elastic-hosts", 73 Usage: "elasticsearch hosts (schema/host/port)", 74 Value: "http://localhost:9200", 75 EnvVars: []string{"ES_HOSTS", "ELASTIC_HOSTS", "OPENSEARCH_URL", "ELASTICSEARCH_URL"}, 76 }, 77 &cli.StringFlag{ 78 Name: "es-post-index", 79 Usage: "ES index for 'post' documents", 80 Value: "palomar_post", 81 EnvVars: []string{"ES_POST_INDEX"}, 82 }, 83 &cli.StringFlag{ 84 Name: "es-profile-index", 85 Usage: "ES index for 'profile' documents", 86 Value: "palomar_profile", 87 EnvVars: []string{"ES_PROFILE_INDEX"}, 88 }, 89 &cli.StringFlag{ 90 Name: "atp-relay-host", 91 Usage: "hostname and port of Relay to subscribe to", 92 Value: "wss://bsky.network", 93 EnvVars: []string{"ATP_RELAY_HOST", "ATP_BGS_HOST"}, 94 }, 95 &cli.StringFlag{ 96 Name: "atp-plc-host", 97 Usage: "method, hostname, and port of PLC registry", 98 Value: "https://plc.directory", 99 EnvVars: []string{"ATP_PLC_HOST"}, 100 }, 101 &cli.IntFlag{ 102 Name: "max-metadb-connections", 103 EnvVars: []string{"MAX_METADB_CONNECTIONS"}, 104 Value: 40, 105 }, 106 &cli.StringFlag{ 107 Name: "log-level", 108 Usage: "log level (debug, info, warn, error)", 109 Value: "info", 110 EnvVars: []string{"GOLOG_LOG_LEVEL", "LOG_LEVEL"}, 111 }, 112 } 113 114 app.Commands = []*cli.Command{ 115 runCmd, 116 elasticCheckCmd, 117 searchPostCmd, 118 searchProfileCmd, 119 } 120 121 return app.Run(args) 122} 123 124var runCmd = &cli.Command{ 125 Name: "run", 126 Usage: "combined indexing+query server", 127 Flags: []cli.Flag{ 128 &cli.StringFlag{ 129 Name: "database-url", 130 Value: "sqlite://data/palomar/search.db", 131 EnvVars: []string{"DATABASE_URL"}, 132 }, 133 &cli.BoolFlag{ 134 Name: "readonly", 135 EnvVars: []string{"PALOMAR_READONLY", "READONLY"}, 136 }, 137 &cli.StringFlag{ 138 Name: "bind", 139 Usage: "IP or address, and port, to listen on for HTTP APIs", 140 Value: ":3999", 141 EnvVars: []string{"PALOMAR_BIND"}, 142 }, 143 &cli.StringFlag{ 144 Name: "metrics-listen", 145 Usage: "IP or address, and port, to listen on for metrics APIs", 146 Value: ":3998", 147 EnvVars: []string{"PALOMAR_METRICS_LISTEN"}, 148 }, 149 &cli.IntFlag{ 150 Name: "relay-sync-rate-limit", 151 Usage: "max repo sync (checkout) requests per second to upstream (Relay)", 152 Value: 8, 153 EnvVars: []string{"PALOMAR_RELAY_SYNC_RATE_LIMIT", "PALOMAR_BGS_SYNC_RATE_LIMIT"}, 154 }, 155 &cli.IntFlag{ 156 Name: "index-max-concurrency", 157 Usage: "max number of concurrent index requests (HTTP POST) to search index", 158 Value: 20, 159 EnvVars: []string{"PALOMAR_INDEX_MAX_CONCURRENCY"}, 160 }, 161 &cli.IntFlag{ 162 Name: "indexing-rate-limit", 163 Usage: "max number of documents per second to index", 164 Value: 50_000, 165 EnvVars: []string{"PALOMAR_INDEXING_RATE_LIMIT"}, 166 }, 167 &cli.IntFlag{ 168 Name: "plc-rate-limit", 169 Usage: "max number of requests per second to PLC registry", 170 Value: 100, 171 EnvVars: []string{"PALOMAR_PLC_RATE_LIMIT"}, 172 }, 173 &cli.BoolFlag{ 174 Name: "discover-repos", 175 Usage: "if true, discover repositories from the Relay", 176 EnvVars: []string{"PALOMAR_DISCOVER_REPOS"}, 177 Value: false, 178 }, 179 &cli.StringFlag{ 180 Name: "pagerank-file", 181 EnvVars: []string{"PAGERANK_FILE"}, 182 }, 183 &cli.StringFlag{ 184 Name: "bulk-posts-file", 185 EnvVars: []string{"BULK_POSTS_FILE"}, 186 }, 187 &cli.StringFlag{ 188 Name: "bulk-profiles-file", 189 EnvVars: []string{"BULK_PROFILES_FILE"}, 190 }, 191 }, 192 Action: func(cctx *cli.Context) error { 193 logLevel := slog.LevelInfo 194 switch cctx.String("log-level") { 195 case "debug": 196 logLevel = slog.LevelDebug 197 case "info": 198 logLevel = slog.LevelInfo 199 case "warn": 200 logLevel = slog.LevelWarn 201 case "error": 202 logLevel = slog.LevelError 203 } 204 205 logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ 206 Level: logLevel, 207 AddSource: true, 208 })) 209 slog.SetDefault(logger) 210 211 readonly := cctx.Bool("readonly") 212 213 // Enable OTLP HTTP exporter 214 // For relevant environment variables: 215 // https://pkg.go.dev/go.opentelemetry.io/otel/exporters/otlp/otlptrace#readme-environment-variables 216 // At a minimum, you need to set 217 // OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4318 218 if ep := os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT"); ep != "" { 219 slog.Info("setting up trace exporter", "endpoint", ep) 220 ctx, cancel := context.WithCancel(context.Background()) 221 defer cancel() 222 223 exp, err := otlptracehttp.New(ctx) 224 if err != nil { 225 log.Fatal("failed to create trace exporter", "error", err) 226 } 227 defer func() { 228 ctx, cancel := context.WithTimeout(context.Background(), time.Second) 229 defer cancel() 230 if err := exp.Shutdown(ctx); err != nil { 231 slog.Error("failed to shutdown trace exporter", "error", err) 232 } 233 }() 234 235 tp := tracesdk.NewTracerProvider( 236 tracesdk.WithBatcher(exp), 237 tracesdk.WithResource(resource.NewWithAttributes( 238 semconv.SchemaURL, 239 semconv.ServiceNameKey.String("palomar"), 240 attribute.String("env", os.Getenv("ENVIRONMENT")), // DataDog 241 attribute.String("environment", os.Getenv("ENVIRONMENT")), // Others 242 attribute.Int64("ID", 1), 243 )), 244 ) 245 otel.SetTracerProvider(tp) 246 } 247 248 escli, err := createEsClient(cctx) 249 if err != nil { 250 return fmt.Errorf("failed to get elasticsearch: %w", err) 251 } 252 253 base := identity.BaseDirectory{ 254 PLCURL: cctx.String("atp-plc-host"), 255 HTTPClient: http.Client{ 256 Timeout: time.Second * 15, 257 }, 258 PLCLimiter: rate.NewLimiter(rate.Limit(cctx.Int("plc-rate-limit")), 1), 259 TryAuthoritativeDNS: true, 260 SkipDNSDomainSuffixes: []string{".bsky.social"}, 261 } 262 dir := identity.NewCacheDirectory(&base, 1_500_000, time.Hour*24, time.Minute*2, time.Minute*5) 263 264 apiConfig := search.ServerConfig{ 265 Logger: logger, 266 ProfileIndex: cctx.String("es-profile-index"), 267 PostIndex: cctx.String("es-post-index"), 268 } 269 270 srv, err := search.NewServer(escli, &dir, apiConfig) 271 if err != nil { 272 return err 273 } 274 275 // Configure the indexer if we're not in readonly mode 276 if !readonly { 277 db, err := cliutil.SetupDatabase(cctx.String("database-url"), cctx.Int("max-metadb-connections")) 278 if err != nil { 279 return fmt.Errorf("failed to set up database: %w", err) 280 } 281 282 indexerConfig := search.IndexerConfig{ 283 RelayHost: cctx.String("atp-relay-host"), 284 ProfileIndex: cctx.String("es-profile-index"), 285 PostIndex: cctx.String("es-post-index"), 286 Logger: logger, 287 RelaySyncRateLimit: cctx.Int("relay-sync-rate-limit"), 288 IndexMaxConcurrency: cctx.Int("index-max-concurrency"), 289 DiscoverRepos: cctx.Bool("discover-repos"), 290 IndexingRateLimit: cctx.Int("indexing-rate-limit"), 291 } 292 293 idx, err := search.NewIndexer(db, escli, &dir, indexerConfig) 294 if err != nil { 295 return fmt.Errorf("failed to set up indexer: %w", err) 296 } 297 298 srv.Indexer = idx 299 } 300 301 go func() { 302 if err := srv.RunMetrics(cctx.String("metrics-listen")); err != nil { 303 slog.Error("failed to start metrics endpoint", "error", err) 304 panic(fmt.Errorf("failed to start metrics endpoint: %w", err)) 305 } 306 }() 307 308 go func() { 309 srv.RunAPI(cctx.String("bind")) 310 }() 311 312 // If we're in readonly mode, just block forever 313 if readonly { 314 select {} 315 } else if cctx.String("pagerank-file") != "" && srv.Indexer != nil { 316 // If we're not in readonly mode, and we have a pagerank file, update pageranks 317 ctx := context.Background() 318 if err := srv.Indexer.BulkIndexPageranks(ctx, cctx.String("pagerank-file")); err != nil { 319 return fmt.Errorf("failed to update pageranks: %w", err) 320 } 321 } else if cctx.String("bulk-posts-file") != "" && srv.Indexer != nil { 322 // If we're not in readonly mode, and we have a bulk posts file, index posts 323 ctx := context.Background() 324 if err := srv.Indexer.BulkIndexPosts(ctx, cctx.String("bulk-posts-file")); err != nil { 325 return fmt.Errorf("failed to bulk index posts: %w", err) 326 } 327 } else if cctx.String("bulk-profiles-file") != "" && srv.Indexer != nil { 328 // If we're not in readonly mode, and we have a bulk profiles file, index profiles 329 ctx := context.Background() 330 if err := srv.Indexer.BulkIndexProfiles(ctx, cctx.String("bulk-profiles-file")); err != nil { 331 return fmt.Errorf("failed to bulk index profiles: %w", err) 332 } 333 } else if srv.Indexer != nil { 334 // Otherwise, just run the indexer 335 ctx := context.Background() 336 if err := srv.Indexer.EnsureIndices(ctx); err != nil { 337 return fmt.Errorf("failed to create opensearch indices: %w", err) 338 } 339 if err := srv.Indexer.RunIndexer(ctx); err != nil { 340 return fmt.Errorf("failed to run indexer: %w", err) 341 } 342 } 343 344 return nil 345 }, 346} 347 348var elasticCheckCmd = &cli.Command{ 349 Name: "elastic-check", 350 Flags: []cli.Flag{}, 351 Action: func(cctx *cli.Context) error { 352 escli, err := createEsClient(cctx) 353 if err != nil { 354 return err 355 } 356 357 // NOTE: this extra info check is redundant; createEsClient() already made this call and logged results 358 inf, err := escli.Info() 359 if err != nil { 360 return fmt.Errorf("failed to get info: %w", err) 361 } 362 defer inf.Body.Close() 363 if inf.IsError() { 364 return fmt.Errorf("failed to get info") 365 } 366 slog.Info("opensearch client connected", "client_info", inf) 367 368 resp, err := escli.Indices.Exists([]string{cctx.String("es-profile-index"), cctx.String("es-post-index")}) 369 if err != nil { 370 return fmt.Errorf("failed to check index existence: %w", err) 371 } 372 defer resp.Body.Close() 373 if inf.IsError() { 374 return fmt.Errorf("failed to check index existence") 375 } 376 slog.Info("index existence", "resp", resp) 377 378 return nil 379 380 }, 381} 382 383func printHits(resp *search.EsSearchResponse) { 384 fmt.Printf("%d hits in %d\n", len(resp.Hits.Hits), resp.Took) 385 for _, hit := range resp.Hits.Hits { 386 b, _ := json.Marshal(hit.Source) 387 fmt.Println(string(b)) 388 } 389 return 390} 391 392var searchPostCmd = &cli.Command{ 393 Name: "search-post", 394 Usage: "run a simple query against posts index", 395 Action: func(cctx *cli.Context) error { 396 escli, err := createEsClient(cctx) 397 if err != nil { 398 return err 399 } 400 res, err := search.DoSearchPosts( 401 context.Background(), 402 identity.DefaultDirectory(), // TODO: parse PLC arg 403 escli, 404 cctx.String("es-post-index"), 405 &search.PostSearchParams{ 406 Query: strings.Join(cctx.Args().Slice(), " "), 407 Offset: 0, 408 Size: 20, 409 }, 410 ) 411 if err != nil { 412 return err 413 } 414 printHits(res) 415 return nil 416 }, 417} 418 419var searchProfileCmd = &cli.Command{ 420 Name: "search-profile", 421 Usage: "run a simple query against posts index", 422 Flags: []cli.Flag{ 423 &cli.BoolFlag{ 424 Name: "typeahead", 425 }, 426 }, 427 Action: func(cctx *cli.Context) error { 428 escli, err := createEsClient(cctx) 429 if err != nil { 430 return err 431 } 432 if cctx.Bool("typeahead") { 433 res, err := search.DoSearchProfilesTypeahead( 434 context.Background(), 435 escli, 436 cctx.String("es-profile-index"), 437 &search.ActorSearchParams{ 438 Query: strings.Join(cctx.Args().Slice(), " "), 439 Size: 10, 440 }, 441 ) 442 if err != nil { 443 return err 444 } 445 printHits(res) 446 } else { 447 res, err := search.DoSearchProfiles( 448 context.Background(), 449 identity.DefaultDirectory(), // TODO: parse PLC arg 450 escli, 451 cctx.String("es-profile-index"), 452 &search.ActorSearchParams{ 453 Query: strings.Join(cctx.Args().Slice(), " "), 454 Offset: 0, 455 Size: 20, 456 }, 457 ) 458 if err != nil { 459 return err 460 } 461 printHits(res) 462 } 463 return nil 464 }, 465} 466 467func createEsClient(cctx *cli.Context) (*es.Client, error) { 468 469 addrs := []string{} 470 if hosts := cctx.String("elastic-hosts"); hosts != "" { 471 addrs = strings.Split(hosts, ",") 472 } 473 474 certfi := cctx.String("elastic-cert-file") 475 var cert []byte 476 if certfi != "" { 477 b, err := os.ReadFile(certfi) 478 if err != nil { 479 return nil, err 480 } 481 482 cert = b 483 } 484 485 insecure := cctx.Bool("elastic-insecure-ssl") 486 487 cfg := es.Config{ 488 Addresses: addrs, 489 Username: cctx.String("elastic-username"), 490 Password: cctx.String("elastic-password"), 491 CACert: cert, 492 Transport: &http.Transport{ 493 MaxIdleConnsPerHost: 20, 494 TLSClientConfig: &tls.Config{ 495 InsecureSkipVerify: insecure, 496 }, 497 }, 498 } 499 500 escli, err := es.NewClient(cfg) 501 if err != nil { 502 return nil, fmt.Errorf("failed to set up client: %w", err) 503 } 504 505 info, err := escli.Info() 506 if err != nil { 507 return nil, fmt.Errorf("cannot get escli info: %w", err) 508 } 509 defer info.Body.Close() 510 slog.Debug("opensearch client initialized", "info", info) 511 512 return escli, nil 513}