fork of indigo with slightly nicer lexgen
at main 4.4 kB view raw
1package search 2 3import ( 4 "context" 5 "fmt" 6 "io" 7 "log/slog" 8 "net/http" 9 "os" 10 "strings" 11 12 "github.com/bluesky-social/indigo/atproto/identity" 13 14 "github.com/carlmjohnson/versioninfo" 15 "github.com/labstack/echo/v4" 16 "github.com/labstack/echo/v4/middleware" 17 es "github.com/opensearch-project/opensearch-go/v2" 18 "github.com/prometheus/client_golang/prometheus/promhttp" 19 slogecho "github.com/samber/slog-echo" 20 "go.opentelemetry.io/contrib/instrumentation/github.com/labstack/echo/otelecho" 21 22 _ "net/http/pprof" // For pprof in the metrics server 23) 24 25type LastSeq struct { 26 ID uint `gorm:"primarykey"` 27 Seq int64 28} 29 30type ServerConfig struct { 31 Logger *slog.Logger 32 ProfileIndex string 33 PostIndex string 34 AtlantisAddresses []string 35} 36 37type Server struct { 38 escli *es.Client 39 postIndex string 40 profileIndex string 41 dir identity.Directory 42 echo *echo.Echo 43 logger *slog.Logger 44 45 Indexer *Indexer 46} 47 48func NewServer(escli *es.Client, dir identity.Directory, config ServerConfig) (*Server, error) { 49 logger := config.Logger 50 if logger == nil { 51 logger = slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ 52 Level: slog.LevelInfo, 53 })) 54 } 55 56 serv := Server{ 57 escli: escli, 58 postIndex: config.PostIndex, 59 profileIndex: config.ProfileIndex, 60 dir: dir, 61 logger: logger, 62 } 63 64 return &serv, nil 65} 66 67func (s *Server) EnsureIndices(ctx context.Context) error { 68 69 indices := []struct { 70 Name string 71 SchemaJSON string 72 }{ 73 {Name: s.postIndex, SchemaJSON: palomarPostSchemaJSON}, 74 {Name: s.profileIndex, SchemaJSON: palomarProfileSchemaJSON}, 75 } 76 for _, idx := range indices { 77 resp, err := s.escli.Indices.Exists([]string{idx.Name}) 78 if err != nil { 79 return err 80 } 81 defer resp.Body.Close() 82 io.ReadAll(resp.Body) 83 if resp.IsError() && resp.StatusCode != 404 { 84 return fmt.Errorf("failed to check index existence") 85 } 86 if resp.StatusCode == 404 { 87 s.logger.Warn("creating opensearch index", "index", idx.Name) 88 if len(idx.SchemaJSON) < 2 { 89 return fmt.Errorf("empty schema file (go:embed failed)") 90 } 91 buf := strings.NewReader(idx.SchemaJSON) 92 resp, err := s.escli.Indices.Create( 93 idx.Name, 94 s.escli.Indices.Create.WithBody(buf)) 95 if err != nil { 96 return err 97 } 98 defer resp.Body.Close() 99 errBytes, err := io.ReadAll(resp.Body) 100 if resp.IsError() { 101 s.logger.Error("failed to create index", "index", idx.Name, "response", string(errBytes)) 102 return fmt.Errorf("failed to create index") 103 } 104 if err != nil { 105 return err 106 } 107 } 108 } 109 return nil 110} 111 112type HealthStatus struct { 113 Service string `json:"service,const=palomar"` 114 Status string `json:"status"` 115 Version string `json:"version"` 116 Message string `json:"msg,omitempty"` 117} 118 119func (a *Server) handleHealthCheck(c echo.Context) error { 120 if a.Indexer != nil { 121 if err := a.Indexer.db.Exec("SELECT 1").Error; err != nil { 122 a.logger.Error("healthcheck can't connect to database", "err", err) 123 return c.JSON(500, HealthStatus{Status: "error", Version: versioninfo.Short(), Message: "can't connect to database"}) 124 } 125 } 126 return c.JSON(200, HealthStatus{Status: "ok", Version: versioninfo.Short()}) 127} 128 129func (s *Server) RunAPI(listen string) error { 130 131 s.logger.Info("Configuring HTTP server") 132 e := echo.New() 133 e.HideBanner = true 134 e.Use(slogecho.New(s.logger)) 135 e.Use(middleware.Recover()) 136 e.Use(MetricsMiddleware) 137 e.Use(middleware.BodyLimit("64M")) 138 e.Use(otelecho.Middleware("palomar")) 139 140 e.HTTPErrorHandler = func(err error, ctx echo.Context) { 141 code := 500 142 if he, ok := err.(*echo.HTTPError); ok { 143 code = he.Code 144 } 145 s.logger.Warn("HTTP request error", "statusCode", code, "path", ctx.Path(), "err", err) 146 ctx.Response().WriteHeader(code) 147 } 148 149 e.Use(middleware.CORS()) 150 e.GET("/", s.handleHealthCheck) 151 e.GET("/_health", s.handleHealthCheck) 152 e.GET("/metrics", echo.WrapHandler(promhttp.Handler())) 153 e.GET("/xrpc/app.bsky.unspecced.searchPostsSkeleton", s.handleSearchPostsSkeleton) 154 e.GET("/xrpc/app.bsky.unspecced.searchActorsSkeleton", s.handleSearchActorsSkeleton) 155 s.echo = e 156 157 s.logger.Info("starting search API daemon", "bind", listen) 158 return s.echo.Start(listen) 159} 160 161func (s *Server) RunMetrics(listen string) error { 162 http.Handle("/metrics", promhttp.Handler()) 163 return http.ListenAndServe(listen, nil) 164} 165 166func (s *Server) Shutdown(ctx context.Context) error { 167 return s.echo.Shutdown(ctx) 168}