1package main
2
3import (
4 "context"
5 "errors"
6 "fmt"
7 "log/slog"
8 "net"
9 "net/http"
10 "os"
11 "os/signal"
12 "syscall"
13 "time"
14
15 "github.com/bluesky-social/indigo/atproto/identity"
16
17 "github.com/labstack/echo/v4"
18 "github.com/labstack/echo/v4/middleware"
19 "github.com/prometheus/client_golang/prometheus/promhttp"
20 "github.com/redis/go-redis/v9"
21 slogecho "github.com/samber/slog-echo"
22 "golang.org/x/time/rate"
23)
24
25type Server struct {
26 dir *RedisResolver
27 echo *echo.Echo
28 httpd *http.Server
29 logger *slog.Logger
30
31 // this redis client is used to store firehose offset
32 redisClient *redis.Client
33
34 // lastSeq is the most recent event sequence number we've received and begun to handle.
35 // This number is periodically persisted to redis, if redis is present.
36 // The value is best-effort (the stream handling itself is concurrent, so event numbers may not be monotonic),
37 // but nonetheless, you must use atomics when updating or reading this (to avoid data races).
38 lastSeq int64
39}
40
// Config carries the settings NewServer needs to build a Server.
type Config struct {
	// Logger is the structured logger used by the server. When nil, NewServer
	// falls back to a JSON logger on stdout at info level.
	Logger *slog.Logger

	// PLCHost is passed to the identity directory as its PLC URL.
	PLCHost string

	// PLCRateLimit is converted to a rate.Limit (events per second) for the
	// PLC limiter, which is created with a burst of 1.
	PLCRateLimit int

	// RedisURL is used both for the redis-backed resolver and for the
	// standalone redis client; it must be parseable by redis.ParseURL.
	RedisURL string

	// Bind is the listen address of the API HTTP server.
	Bind string

	// DisableRefresh, when true, leaves the refreshIdentity POST route
	// unregistered.
	DisableRefresh bool
}
49
50func NewServer(config Config) (*Server, error) {
51 logger := config.Logger
52 if logger == nil {
53 logger = slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
54 Level: slog.LevelInfo,
55 }))
56 }
57
58 baseDir := identity.BaseDirectory{
59 PLCURL: config.PLCHost,
60 HTTPClient: http.Client{
61 Timeout: time.Second * 10,
62 Transport: &http.Transport{
63 // would want this around 100ms for services doing lots of handle resolution (to reduce number of idle connections). Impacts PLC connections as well, but not too bad.
64 IdleConnTimeout: time.Millisecond * 100,
65 MaxIdleConns: 1000,
66 },
67 },
68 Resolver: net.Resolver{
69 Dial: func(ctx context.Context, network, address string) (net.Conn, error) {
70 d := net.Dialer{Timeout: time.Second * 3}
71 return d.DialContext(ctx, network, address)
72 },
73 },
74 PLCLimiter: rate.NewLimiter(rate.Limit(config.PLCRateLimit), 1),
75 TryAuthoritativeDNS: true,
76 SkipDNSDomainSuffixes: []string{".bsky.social", ".staging.bsky.dev"},
77 // TODO: UserAgent: "bluepages",
78 }
79
80 // TODO: config these timeouts
81 redisDir, err := NewRedisResolver(&baseDir, config.RedisURL, time.Hour*24, time.Minute*2, time.Minute*5, 50_000)
82 if err != nil {
83 return nil, err
84 }
85 redisDir.Logger = logger
86
87 // configure redis client (for firehose consumer)
88 redisOpt, err := redis.ParseURL(config.RedisURL)
89 if err != nil {
90 return nil, fmt.Errorf("parsing redis URL: %v", err)
91 }
92 redisClient := redis.NewClient(redisOpt)
93 // check redis connection
94 _, err = redisClient.Ping(context.Background()).Result()
95 if err != nil {
96 return nil, fmt.Errorf("redis ping failed: %v", err)
97 }
98
99 e := echo.New()
100
101 // httpd
102 var (
103 httpTimeout = 1 * time.Minute
104 httpMaxHeaderBytes = 1 * (1024 * 1024)
105 )
106
107 srv := &Server{
108 echo: e,
109 dir: redisDir,
110 logger: logger,
111 redisClient: redisClient,
112 }
113
114 srv.httpd = &http.Server{
115 Handler: srv,
116 Addr: config.Bind,
117 WriteTimeout: httpTimeout,
118 ReadTimeout: httpTimeout,
119 MaxHeaderBytes: httpMaxHeaderBytes,
120 }
121
122 e.HideBanner = true
123 e.Use(slogecho.New(logger))
124 e.Use(middleware.Recover())
125 e.Use(middleware.BodyLimit("4M"))
126 e.HTTPErrorHandler = srv.errorHandler
127 e.Use(middleware.SecureWithConfig(middleware.SecureConfig{
128 ContentTypeNosniff: "nosniff",
129 XFrameOptions: "SAMEORIGIN",
130 HSTSMaxAge: 31536000, // 365 days
131 // TODO:
132 // ContentSecurityPolicy
133 // XSSProtection
134 }))
135
136 e.GET("/", srv.WebHome)
137 e.GET("/_health", srv.HandleHealthCheck)
138 e.GET("/xrpc/com.atproto.identity.resolveHandle", srv.ResolveHandle)
139 e.GET("/xrpc/com.atproto.identity.resolveDid", srv.ResolveDid)
140 e.GET("/xrpc/com.atproto.identity.resolveIdentity", srv.ResolveIdentity)
141 if !config.DisableRefresh {
142 e.POST("/xrpc/com.atproto.identity.refreshIdentity", srv.RefreshIdentity)
143 }
144
145 return srv, nil
146}
147
148func (srv *Server) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
149 srv.echo.ServeHTTP(rw, req)
150}
151
152func (srv *Server) RunAPI() error {
153 srv.logger.Info("starting server", "bind", srv.httpd.Addr)
154 go func() {
155 if err := srv.httpd.ListenAndServe(); err != nil {
156 if !errors.Is(err, http.ErrServerClosed) {
157 srv.logger.Error("HTTP server shutting down unexpectedly", "err", err)
158 }
159 }
160 }()
161
162 // Wait for a signal to exit.
163 srv.logger.Info("registering OS exit signal handler")
164 quit := make(chan struct{})
165 exitSignals := make(chan os.Signal, 1)
166 signal.Notify(exitSignals, syscall.SIGINT, syscall.SIGTERM)
167 go func() {
168 sig := <-exitSignals
169 srv.logger.Info("received OS exit signal", "signal", sig)
170
171 // Shut down the HTTP server
172 if err := srv.Shutdown(); err != nil {
173 srv.logger.Error("HTTP server shutdown error", "err", err)
174 }
175
176 // Trigger the return that causes an exit.
177 close(quit)
178 }()
179 <-quit
180 srv.logger.Info("graceful shutdown complete")
181 return nil
182}
183
184func (srv *Server) RunMetrics(bind string) error {
185 p := "/metrics"
186 srv.logger.Info("starting metrics endpoint", "bind", bind, "path", p)
187 http.Handle(p, promhttp.Handler())
188 return http.ListenAndServe(bind, nil)
189}
190
191func (srv *Server) Shutdown() error {
192 srv.logger.Info("shutting down")
193
194 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
195 defer cancel()
196
197 return srv.httpd.Shutdown(ctx)
198}
199
// GenericError is the JSON body that errorHandler writes for failed requests.
type GenericError struct {
	// Error is a short error identifier, serialized under the "error" key.
	Error string `json:"error"`

	// Message is the human-readable detail, serialized under the "message" key.
	Message string `json:"message"`
}
204
205func (srv *Server) errorHandler(err error, c echo.Context) {
206 code := http.StatusInternalServerError
207 var errorMessage string
208 if he, ok := err.(*echo.HTTPError); ok {
209 code = he.Code
210 errorMessage = fmt.Sprintf("%s", he.Message)
211 }
212 if code >= 500 {
213 srv.logger.Warn("bluepages-http-internal-error", "err", err)
214 }
215 if !c.Response().Committed {
216 c.JSON(code, GenericError{Error: "InternalError", Message: errorMessage})
217 }
218}