1package search
2
3import (
4 "context"
5 "fmt"
6 "io"
7 "log/slog"
8 "net/http"
9 "os"
10 "strings"
11
12 "github.com/bluesky-social/indigo/atproto/identity"
13
14 "github.com/carlmjohnson/versioninfo"
15 "github.com/labstack/echo/v4"
16 "github.com/labstack/echo/v4/middleware"
17 es "github.com/opensearch-project/opensearch-go/v2"
18 "github.com/prometheus/client_golang/prometheus/promhttp"
19 slogecho "github.com/samber/slog-echo"
20 "go.opentelemetry.io/contrib/instrumentation/github.com/labstack/echo/otelecho"
21
22 _ "net/http/pprof" // For pprof in the metrics server
23)
24
// LastSeq is a database model holding a single sequence number.
//
// NOTE(review): presumably the last event-stream sequence handled by the
// indexer, persisted so processing can resume; confirm against the code that
// reads and writes this row.
type LastSeq struct {
	// ID is the row's primary key, as declared by the gorm tag.
	ID uint `gorm:"primarykey"`
	// Seq is the stored sequence value.
	Seq int64
}
29
// ServerConfig carries the optional settings accepted by NewServer.
type ServerConfig struct {
	// Logger is the logger the server will use. When nil, NewServer installs
	// a JSON logger writing to stdout at info level.
	Logger *slog.Logger
	// ProfileIndex is the name of the OpenSearch index used for profiles.
	ProfileIndex string
	// PostIndex is the name of the OpenSearch index used for posts.
	PostIndex string
	// AtlantisAddresses is not read by NewServer.
	// NOTE(review): it may be consumed elsewhere in the package; confirm
	// before removing.
	AtlantisAddresses []string
}
36
// Server bundles the dependencies of the search daemon: the OpenSearch
// client, the index names, an identity directory, the HTTP server and a
// logger.
type Server struct {
	// escli is the OpenSearch client used to check for and create indices.
	escli *es.Client
	// postIndex is the name of the post index (from ServerConfig.PostIndex).
	postIndex string
	// profileIndex is the name of the profile index (from
	// ServerConfig.ProfileIndex).
	profileIndex string
	// dir is the identity directory handed to NewServer.
	dir identity.Directory
	// echo is the HTTP server; it is only set once RunAPI has been called.
	echo *echo.Echo
	// logger is never nil after NewServer.
	logger *slog.Logger

	// Indexer is optional. When set, the health check pings its database.
	Indexer *Indexer
}
47
48func NewServer(escli *es.Client, dir identity.Directory, config ServerConfig) (*Server, error) {
49 logger := config.Logger
50 if logger == nil {
51 logger = slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
52 Level: slog.LevelInfo,
53 }))
54 }
55
56 serv := Server{
57 escli: escli,
58 postIndex: config.PostIndex,
59 profileIndex: config.ProfileIndex,
60 dir: dir,
61 logger: logger,
62 }
63
64 return &serv, nil
65}
66
67func (s *Server) EnsureIndices(ctx context.Context) error {
68
69 indices := []struct {
70 Name string
71 SchemaJSON string
72 }{
73 {Name: s.postIndex, SchemaJSON: palomarPostSchemaJSON},
74 {Name: s.profileIndex, SchemaJSON: palomarProfileSchemaJSON},
75 }
76 for _, idx := range indices {
77 resp, err := s.escli.Indices.Exists([]string{idx.Name})
78 if err != nil {
79 return err
80 }
81 defer resp.Body.Close()
82 io.ReadAll(resp.Body)
83 if resp.IsError() && resp.StatusCode != 404 {
84 return fmt.Errorf("failed to check index existence")
85 }
86 if resp.StatusCode == 404 {
87 s.logger.Warn("creating opensearch index", "index", idx.Name)
88 if len(idx.SchemaJSON) < 2 {
89 return fmt.Errorf("empty schema file (go:embed failed)")
90 }
91 buf := strings.NewReader(idx.SchemaJSON)
92 resp, err := s.escli.Indices.Create(
93 idx.Name,
94 s.escli.Indices.Create.WithBody(buf))
95 if err != nil {
96 return err
97 }
98 defer resp.Body.Close()
99 errBytes, err := io.ReadAll(resp.Body)
100 if resp.IsError() {
101 s.logger.Error("failed to create index", "index", idx.Name, "response", string(errBytes))
102 return fmt.Errorf("failed to create index")
103 }
104 if err != nil {
105 return err
106 }
107 }
108 }
109 return nil
110}
111
// HealthStatus is the JSON document returned by the health-check endpoints.
type HealthStatus struct {
	// Service identifies the service.
	// NOTE(review): "const=palomar" is not an option understood by
	// encoding/json, so the tag does not fill this field in; it is serialized
	// with whatever value the struct holds.
	Service string `json:"service,const=palomar"`
	// Status is "ok" or "error".
	Status string `json:"status"`
	// Version is the build version string.
	Version string `json:"version"`
	// Message is an optional human-readable detail, omitted when empty.
	Message string `json:"msg,omitempty"`
}
118
119func (a *Server) handleHealthCheck(c echo.Context) error {
120 if a.Indexer != nil {
121 if err := a.Indexer.db.Exec("SELECT 1").Error; err != nil {
122 a.logger.Error("healthcheck can't connect to database", "err", err)
123 return c.JSON(500, HealthStatus{Status: "error", Version: versioninfo.Short(), Message: "can't connect to database"})
124 }
125 }
126 return c.JSON(200, HealthStatus{Status: "ok", Version: versioninfo.Short()})
127}
128
129func (s *Server) RunAPI(listen string) error {
130
131 s.logger.Info("Configuring HTTP server")
132 e := echo.New()
133 e.HideBanner = true
134 e.Use(slogecho.New(s.logger))
135 e.Use(middleware.Recover())
136 e.Use(MetricsMiddleware)
137 e.Use(middleware.BodyLimit("64M"))
138 e.Use(otelecho.Middleware("palomar"))
139
140 e.HTTPErrorHandler = func(err error, ctx echo.Context) {
141 code := 500
142 if he, ok := err.(*echo.HTTPError); ok {
143 code = he.Code
144 }
145 s.logger.Warn("HTTP request error", "statusCode", code, "path", ctx.Path(), "err", err)
146 ctx.Response().WriteHeader(code)
147 }
148
149 e.Use(middleware.CORS())
150 e.GET("/", s.handleHealthCheck)
151 e.GET("/_health", s.handleHealthCheck)
152 e.GET("/metrics", echo.WrapHandler(promhttp.Handler()))
153 e.GET("/xrpc/app.bsky.unspecced.searchPostsSkeleton", s.handleSearchPostsSkeleton)
154 e.GET("/xrpc/app.bsky.unspecced.searchActorsSkeleton", s.handleSearchActorsSkeleton)
155 s.echo = e
156
157 s.logger.Info("starting search API daemon", "bind", listen)
158 return s.echo.Start(listen)
159}
160
161func (s *Server) RunMetrics(listen string) error {
162 http.Handle("/metrics", promhttp.Handler())
163 return http.ListenAndServe(listen, nil)
164}
165
166func (s *Server) Shutdown(ctx context.Context) error {
167 return s.echo.Shutdown(ctx)
168}