porting all github actions from bluesky-social/indigo to tangled CI
at ci 5.9 kB view raw
1package main 2 3import ( 4 "context" 5 "errors" 6 "fmt" 7 "log/slog" 8 "net" 9 "net/http" 10 "os" 11 "os/signal" 12 "syscall" 13 "time" 14 15 "github.com/bluesky-social/indigo/atproto/identity" 16 17 "github.com/labstack/echo/v4" 18 "github.com/labstack/echo/v4/middleware" 19 "github.com/prometheus/client_golang/prometheus/promhttp" 20 "github.com/redis/go-redis/v9" 21 slogecho "github.com/samber/slog-echo" 22 "golang.org/x/time/rate" 23) 24 25type Server struct { 26 dir *RedisResolver 27 echo *echo.Echo 28 httpd *http.Server 29 logger *slog.Logger 30 31 // this redis client is used to store firehose offset 32 redisClient *redis.Client 33 34 // lastSeq is the most recent event sequence number we've received and begun to handle. 35 // This number is periodically persisted to redis, if redis is present. 36 // The value is best-effort (the stream handling itself is concurrent, so event numbers may not be monotonic), 37 // but nonetheless, you must use atomics when updating or reading this (to avoid data races). 38 lastSeq int64 39} 40 41type Config struct { 42 Logger *slog.Logger 43 PLCHost string 44 PLCRateLimit int 45 RedisURL string 46 Bind string 47 DisableRefresh bool 48} 49 50func NewServer(config Config) (*Server, error) { 51 logger := config.Logger 52 if logger == nil { 53 logger = slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ 54 Level: slog.LevelInfo, 55 })) 56 } 57 58 baseDir := identity.BaseDirectory{ 59 PLCURL: config.PLCHost, 60 HTTPClient: http.Client{ 61 Timeout: time.Second * 10, 62 Transport: &http.Transport{ 63 // would want this around 100ms for services doing lots of handle resolution (to reduce number of idle connections). Impacts PLC connections as well, but not too bad. 64 IdleConnTimeout: time.Millisecond * 100, 65 MaxIdleConns: 1000, 66 }, 67 }, 68 Resolver: net.Resolver{ 69 Dial: func(ctx context.Context, network, address string) (net.Conn, error) { 70 d := net.Dialer{Timeout: time.Second * 3} 71 return d.DialContext(ctx, network, address) 72 }, 73 }, 74 PLCLimiter: rate.NewLimiter(rate.Limit(config.PLCRateLimit), 1), 75 TryAuthoritativeDNS: true, 76 SkipDNSDomainSuffixes: []string{".bsky.social", ".staging.bsky.dev"}, 77 // TODO: UserAgent: "bluepages", 78 } 79 80 // TODO: config these timeouts 81 redisDir, err := NewRedisResolver(&baseDir, config.RedisURL, time.Hour*24, time.Minute*2, time.Minute*5, 50_000) 82 if err != nil { 83 return nil, err 84 } 85 redisDir.Logger = logger 86 87 // configure redis client (for firehose consumer) 88 redisOpt, err := redis.ParseURL(config.RedisURL) 89 if err != nil { 90 return nil, fmt.Errorf("parsing redis URL: %v", err) 91 } 92 redisClient := redis.NewClient(redisOpt) 93 // check redis connection 94 _, err = redisClient.Ping(context.Background()).Result() 95 if err != nil { 96 return nil, fmt.Errorf("redis ping failed: %v", err) 97 } 98 99 e := echo.New() 100 101 // httpd 102 var ( 103 httpTimeout = 1 * time.Minute 104 httpMaxHeaderBytes = 1 * (1024 * 1024) 105 ) 106 107 srv := &Server{ 108 echo: e, 109 dir: redisDir, 110 logger: logger, 111 redisClient: redisClient, 112 } 113 114 srv.httpd = &http.Server{ 115 Handler: srv, 116 Addr: config.Bind, 117 WriteTimeout: httpTimeout, 118 ReadTimeout: httpTimeout, 119 MaxHeaderBytes: httpMaxHeaderBytes, 120 } 121 122 e.HideBanner = true 123 e.Use(slogecho.New(logger)) 124 e.Use(middleware.Recover()) 125 e.Use(middleware.BodyLimit("4M")) 126 e.HTTPErrorHandler = srv.errorHandler 127 e.Use(middleware.SecureWithConfig(middleware.SecureConfig{ 128 ContentTypeNosniff: "nosniff", 129 XFrameOptions: "SAMEORIGIN", 130 HSTSMaxAge: 31536000, // 365 days 131 // TODO: 132 // ContentSecurityPolicy 133 // XSSProtection 134 })) 135 136 e.GET("/", srv.WebHome) 137 e.GET("/_health", srv.HandleHealthCheck) 138 e.GET("/xrpc/com.atproto.identity.resolveHandle", srv.ResolveHandle) 139 e.GET("/xrpc/com.atproto.identity.resolveDid", srv.ResolveDid) 140 e.GET("/xrpc/com.atproto.identity.resolveIdentity", srv.ResolveIdentity) 141 if !config.DisableRefresh { 142 e.POST("/xrpc/com.atproto.identity.refreshIdentity", srv.RefreshIdentity) 143 } 144 145 return srv, nil 146} 147 148func (srv *Server) ServeHTTP(rw http.ResponseWriter, req *http.Request) { 149 srv.echo.ServeHTTP(rw, req) 150} 151 152func (srv *Server) RunAPI() error { 153 srv.logger.Info("starting server", "bind", srv.httpd.Addr) 154 go func() { 155 if err := srv.httpd.ListenAndServe(); err != nil { 156 if !errors.Is(err, http.ErrServerClosed) { 157 srv.logger.Error("HTTP server shutting down unexpectedly", "err", err) 158 } 159 } 160 }() 161 162 // Wait for a signal to exit. 163 srv.logger.Info("registering OS exit signal handler") 164 quit := make(chan struct{}) 165 exitSignals := make(chan os.Signal, 1) 166 signal.Notify(exitSignals, syscall.SIGINT, syscall.SIGTERM) 167 go func() { 168 sig := <-exitSignals 169 srv.logger.Info("received OS exit signal", "signal", sig) 170 171 // Shut down the HTTP server 172 if err := srv.Shutdown(); err != nil { 173 srv.logger.Error("HTTP server shutdown error", "err", err) 174 } 175 176 // Trigger the return that causes an exit. 177 close(quit) 178 }() 179 <-quit 180 srv.logger.Info("graceful shutdown complete") 181 return nil 182} 183 184func (srv *Server) RunMetrics(bind string) error { 185 p := "/metrics" 186 srv.logger.Info("starting metrics endpoint", "bind", bind, "path", p) 187 http.Handle(p, promhttp.Handler()) 188 return http.ListenAndServe(bind, nil) 189} 190 191func (srv *Server) Shutdown() error { 192 srv.logger.Info("shutting down") 193 194 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) 195 defer cancel() 196 197 return srv.httpd.Shutdown(ctx) 198} 199 200type GenericError struct { 201 Error string `json:"error"` 202 Message string `json:"message"` 203} 204 205func (srv *Server) errorHandler(err error, c echo.Context) { 206 code := http.StatusInternalServerError 207 var errorMessage string 208 if he, ok := err.(*echo.HTTPError); ok { 209 code = he.Code 210 errorMessage = fmt.Sprintf("%s", he.Message) 211 } 212 if code >= 500 { 213 srv.logger.Warn("bluepages-http-internal-error", "err", err) 214 } 215 if !c.Response().Committed { 216 c.JSON(code, GenericError{Error: "InternalError", Message: errorMessage}) 217 } 218}