fork of indigo with slightly nicer lexgen

rainbow: small refactors, and improve request forwarding (#1023)

Some of this is just moving code around so I could understand which pieces
were doing what. I then improved how the request proxying and requestCrawl
forwarding work, as part of new relay ops.

None of the core configuration or behaviors should change: I tweaked some
argument names, but not the env var names, so nothing about how we
deploy/operate rainbow needs to change.

No behavior of the core code (ringbuf, upstream websocket subscription,
or serving to clients) has changed.

Authored by bnewbold.net and committed by GitHub (539c5771, d6e162c3)

Changed files (+582 -513)
cmd/rainbow/README.md (+3 -3)
````diff
···
 - retains upstream firehose "sequence numbers"
 - does not validate events (signatures, repo tree, hashes, etc), just passes through
 - does not archive or mirror individual records or entire repositories (or implement related API endpoints)
-- disk I/O intensive: fast NVMe disks are recommended, and RAM is helpful for caching
+- somewhat disk I/O intensive: fast NVMe disks are recommended, and RAM is helpful for caching
 - single golang binary for easy deployment
-- observability: logging, prometheus metrics, OTEL traces
+- observability: logging, prometheus metrics, and OTEL traces
 
 ## Running
 
···
 From the top level of this repo, you can build:
 
 ```shell
-go build ./cmd/rainbow -o rainbow-bin
+go build ./cmd/rainbow -o rainbow
 ```
 
 or just run it, and see configuration options:
````
cmd/rainbow/main.go (+83 -58)
````diff
···
 
 import (
     "context"
+    "fmt"
     "log/slog"
-    _ "net/http/pprof"
     "os"
     "os/signal"
     "syscall"
     "time"
 
+    _ "github.com/joho/godotenv/autoload"
+    _ "go.uber.org/automaxprocs"
+    _ "net/http/pprof"
+
     "github.com/bluesky-social/indigo/events/pebblepersist"
     "github.com/bluesky-social/indigo/splitter"
+    "github.com/bluesky-social/indigo/util/svcutil"
 
     "github.com/carlmjohnson/versioninfo"
-    _ "github.com/joho/godotenv/autoload"
     "github.com/urfave/cli/v2"
     "go.opentelemetry.io/otel"
     "go.opentelemetry.io/otel/attribute"
···
     "go.opentelemetry.io/otel/sdk/resource"
     tracesdk "go.opentelemetry.io/otel/sdk/trace"
     semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
-    _ "go.uber.org/automaxprocs"
 )
 
-var log = slog.Default().With("system", "rainbow")
-
-func init() {
-    // control log level using, eg, GOLOG_LOG_LEVEL=debug
-    //logging.SetAllLoggers(logging.LevelDebug)
-}
-
 func main() {
-    run(os.Args)
+    if err := run(os.Args); err != nil {
+        slog.Error("exiting", "err", err)
+        os.Exit(1)
+    }
 }
 
-func run(args []string) {
+func run(args []string) error {
+
     app := cli.App{
         Name:    "rainbow",
         Usage:   "atproto firehose fan-out daemon",
         Version: versioninfo.Short(),
+        Action:  runSplitter,
     }
 
     app.Flags = []cli.Flag{
-        // TODO: unimplemented, always assumes https:// and wss://
-        //&cli.BoolFlag{
-        //    Name:    "crawl-insecure-ws",
-        //    Usage:   "when connecting to PDS instances, use ws:// instead of wss://",
-        //    EnvVars: []string{"RAINBOW_INSECURE_CRAWL"},
-        //},
+        &cli.StringFlag{
+            Name:    "log-level",
+            Usage:   "log verbosity level (eg: warn, info, debug)",
+            EnvVars: []string{"RAINBOW_LOG_LEVEL", "GO_LOG_LEVEL", "LOG_LEVEL"},
+        },
         &cli.StringFlag{
-            Name:    "splitter-host",
-            Value:   "bsky.network",
+            Name:    "upstream-host",
+            Value:   "http://localhost:2470",
+            Usage:   "URL (schema and hostname, no path) of the upstream host (eg, relay)",
             EnvVars: []string{"ATP_RELAY_HOST", "RAINBOW_RELAY_HOST"},
         },
         &cli.StringFlag{
···
             EnvVars: []string{"RAINBOW_PERSIST_BYTES", "SPLITTER_PERSIST_BYTES"},
         },
         &cli.StringSliceFlag{
+            // TODO: better name for this argument
             Name:    "next-crawler",
-            Usage:   "forward POST requestCrawl to this url, should be machine root url and not xrpc/requestCrawl, comma separated list",
-            EnvVars: []string{"RELAY_NEXT_CRAWLER"},
+            Usage:   "forward POST requestCrawl to these hosts (schema and host, no path) in addition to upstream-host. Comma-separated or multiple flags",
+            EnvVars: []string{"RAINBOW_NEXT_CRAWLER", "RELAY_NEXT_CRAWLER"},
+        },
+        &cli.StringFlag{
+            Name:    "collectiondir-host",
+            Value:   "http://localhost:2510",
+            Usage:   "host (schema and hostname, no path) of upstream collectiondir instance, for com.atproto.sync.listReposByCollection",
+            EnvVars: []string{"RAINBOW_COLLECTIONDIR_HOST"},
+        },
+        &cli.StringFlag{
+            Name:    "env",
+            Usage:   "operating environment (eg, 'prod', 'test')",
+            Value:   "dev",
+            EnvVars: []string{"ENVIRONMENT"},
+        },
+        &cli.BoolFlag{
+            Name:    "enable-otel-otlp",
+            Usage:   "enables OTEL OTLP exporter endpoint",
+            EnvVars: []string{"RAINBOW_ENABLE_OTEL_OTLP", "ENABLE_OTEL_OTLP"},
+        },
+        &cli.StringFlag{
+            Name:    "otel-otlp-endpoint",
+            Usage:   "OTEL traces export endpoint",
+            Value:   "http://localhost:4318",
+            EnvVars: []string{"OTEL_EXPORTER_OTLP_ENDPOINT"},
         },
     }
 
-    // TODO: slog.SetDefault and set module `var log *slog.Logger` based on flags and env
-
-    app.Action = Splitter
-    err := app.Run(os.Args)
-    if err != nil {
-        log.Error(err.Error())
-        os.Exit(1)
-    }
+    return app.Run(args)
 }
 
-func Splitter(cctx *cli.Context) error {
+func runSplitter(cctx *cli.Context) error {
     // Trap SIGINT to trigger a shutdown.
     signals := make(chan os.Signal, 1)
     signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
 
+    logger := svcutil.ConfigLogger(cctx, os.Stdout).With("system", "rainbow")
+
     // Enable OTLP HTTP exporter
     // For relevant environment variables:
     // https://pkg.go.dev/go.opentelemetry.io/otel/exporters/otlp/otlptrace#readme-environment-variables
-    // At a minimum, you need to set
-    // OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4318
-    if ep := os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT"); ep != "" {
-        log.Info("setting up trace exporter", "endpoint", ep)
+    if cctx.Bool("enable-otel-otlp") {
+        ep := cctx.String("otel-otlp-endpoint")
+        logger.Info("setting up trace exporter", "endpoint", ep)
         ctx, cancel := context.WithCancel(context.Background())
         defer cancel()
 
         exp, err := otlptracehttp.New(ctx)
         if err != nil {
-            log.Error("failed to create trace exporter", "error", err)
+            logger.Error("failed to create trace exporter", "error", err)
             os.Exit(1)
         }
         defer func() {
             ctx, cancel := context.WithTimeout(context.Background(), time.Second)
             defer cancel()
             if err := exp.Shutdown(ctx); err != nil {
-                log.Error("failed to shutdown trace exporter", "error", err)
+                logger.Error("failed to shutdown trace exporter", "error", err)
             }
         }()
 
+        env := cctx.String("env")
         tp := tracesdk.NewTracerProvider(
             tracesdk.WithBatcher(exp),
             tracesdk.WithResource(resource.NewWithAttributes(
                 semconv.SchemaURL,
                 semconv.ServiceNameKey.String("splitter"),
-                attribute.String("env", os.Getenv("ENVIRONMENT")),         // DataDog
-                attribute.String("environment", os.Getenv("ENVIRONMENT")), // Others
+                attribute.String("env", env),         // DataDog
+                attribute.String("environment", env), // Others
                 attribute.Int64("ID", 1),
             )),
         )
···
     }
 
     persistPath := cctx.String("persist-db")
-    upstreamHost := cctx.String("splitter-host")
+    upstreamHost := cctx.String("upstream-host")
+    collectionDirHost := cctx.String("collectiondir-host")
     nextCrawlers := cctx.StringSlice("next-crawler")
 
     var spl *splitter.Splitter
     var err error
     if persistPath != "" {
-        log.Info("building splitter with storage at", "path", persistPath)
+        logger.Info("building splitter with storage at", "path", persistPath)
         ppopts := pebblepersist.PebblePersistOptions{
             DbPath:          persistPath,
             PersistDuration: time.Duration(float64(time.Hour) * cctx.Float64("persist-hours")),
···
             MaxBytes:        uint64(cctx.Int64("persist-bytes")),
         }
         conf := splitter.SplitterConfig{
-            UpstreamHost:  upstreamHost,
-            CursorFile:    cctx.String("cursor-file"),
-            PebbleOptions: &ppopts,
+            UpstreamHost:      upstreamHost,
+            CollectionDirHost: collectionDirHost,
+            CursorFile:        cctx.String("cursor-file"),
+            PebbleOptions:     &ppopts,
+            UserAgent:         fmt.Sprintf("rainbow/%s", versioninfo.Short()),
         }
         spl, err = splitter.NewSplitter(conf, nextCrawlers)
     } else {
-        log.Info("building in-memory splitter")
+        logger.Info("building in-memory splitter")
         conf := splitter.SplitterConfig{
-            UpstreamHost: upstreamHost,
-            CursorFile:   cctx.String("cursor-file"),
+            UpstreamHost:      upstreamHost,
+            CollectionDirHost: collectionDirHost,
+            CursorFile:        cctx.String("cursor-file"),
         }
         spl, err = splitter.NewSplitter(conf, nextCrawlers)
     }
     if err != nil {
-        log.Error("failed to create splitter", "path", persistPath, "error", err)
+        logger.Error("failed to create splitter", "path", persistPath, "error", err)
         os.Exit(1)
         return err
     }
 
     // set up metrics endpoint
+    metricsListen := cctx.String("metrics-listen")
     go func() {
-        if err := spl.StartMetrics(cctx.String("metrics-listen")); err != nil {
-            log.Error("failed to start metrics endpoint", "err", err)
+        if err := spl.StartMetrics(metricsListen); err != nil {
+            logger.Error("failed to start metrics endpoint", "err", err)
             os.Exit(1)
         }
     }()
···
     runErr := make(chan error, 1)
 
     go func() {
-        err := spl.Start(cctx.String("api-listen"))
+        err := spl.StartAPI(cctx.String("api-listen"))
         runErr <- err
     }()
 
-    log.Info("startup complete")
+    logger.Info("startup complete")
     select {
     case <-signals:
-        log.Info("received shutdown signal")
+        logger.Info("received shutdown signal")
         if err := spl.Shutdown(); err != nil {
-            log.Error("error during Splitter shutdown", "err", err)
+            logger.Error("error during Splitter shutdown", "err", err)
         }
     case err := <-runErr:
         if err != nil {
-            log.Error("error during Splitter startup", "err", err)
+            logger.Error("error during Splitter startup", "err", err)
         }
-        log.Info("shutting down")
+        logger.Info("shutting down")
         if err := spl.Shutdown(); err != nil {
-            log.Error("error during Splitter shutdown", "err", err)
+            logger.Error("error during Splitter shutdown", "err", err)
         }
     }
 
-    log.Info("shutdown complete")
+    logger.Info("shutdown complete")
 
     return nil
 }
````
splitter/firehose.go (new file, +200)
```go
package splitter

import (
    "context"
    "fmt"
    "net"
    "strconv"
    "sync"
    "time"

    "github.com/bluesky-social/indigo/events"

    "github.com/gorilla/websocket"
    "github.com/labstack/echo/v4"
    "github.com/prometheus/client_golang/prometheus"
    dto "github.com/prometheus/client_model/go"
)

func (s *Splitter) HandleSubscribeRepos(c echo.Context) error {
    var since *int64
    if sinceVal := c.QueryParam("cursor"); sinceVal != "" {
        sval, err := strconv.ParseInt(sinceVal, 10, 64)
        if err != nil {
            return err
        }
        since = &sval
    }

    // NOTE: the request context outlives the HTTP 101 response; it lives as long as the WebSocket is open, and then gets cancelled. That is the behavior we want for this ctx, but should be careful if spawning goroutines which should outlive the WebSocket connection.
    // https://github.com/bluesky-social/indigo/pull/1023#pullrequestreview-2768335762
    ctx, cancel := context.WithCancel(c.Request().Context())
    defer cancel()

    // TODO: authhhh
    conn, err := websocket.Upgrade(c.Response(), c.Request(), c.Response().Header(), 10<<10, 10<<10)
    if err != nil {
        return fmt.Errorf("upgrading websocket: %w", err)
    }

    defer conn.Close()

    lastWriteLk := sync.Mutex{}
    lastWrite := time.Now()

    // Start a goroutine to ping the client every 30 seconds to check if it's
    // still alive. If the client doesn't respond to a ping within 5 seconds,
    // we'll close the connection and teardown the consumer.
    go func() {
        ticker := time.NewTicker(30 * time.Second)
        defer ticker.Stop()

        for {
            select {
            case <-ticker.C:
                lastWriteLk.Lock()
                lw := lastWrite
                lastWriteLk.Unlock()

                if time.Since(lw) < 30*time.Second {
                    continue
                }

                if err := conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(5*time.Second)); err != nil {
                    s.logger.Error("failed to ping client", "err", err)
                    cancel()
                    return
                }
            case <-ctx.Done():
                return
            }
        }
    }()

    conn.SetPingHandler(func(message string) error {
        err := conn.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(time.Second*60))
        if err == websocket.ErrCloseSent {
            return nil
        } else if e, ok := err.(net.Error); ok && e.Temporary() {
            return nil
        }
        return err
    })

    // Start a goroutine to read messages from the client and discard them.
    go func() {
        for {
            _, _, err := conn.ReadMessage()
            if err != nil {
                s.logger.Error("failed to read message from client", "err", err)
                cancel()
                return
            }
        }
    }()

    ident := c.RealIP() + "-" + c.Request().UserAgent()

    evts, cleanup, err := s.events.Subscribe(ctx, ident, func(evt *events.XRPCStreamEvent) bool { return true }, since)
    if err != nil {
        return err
    }
    defer cleanup()

    // Keep track of the consumer for metrics and admin endpoints
    consumer := SocketConsumer{
        RemoteAddr:  c.RealIP(),
        UserAgent:   c.Request().UserAgent(),
        ConnectedAt: time.Now(),
    }
    sentCounter := eventsSentCounter.WithLabelValues(consumer.RemoteAddr, consumer.UserAgent)
    consumer.EventsSent = sentCounter

    consumerID := s.registerConsumer(&consumer)
    defer s.cleanupConsumer(consumerID)

    s.logger.Info("new consumer",
        "remote_addr", consumer.RemoteAddr,
        "user_agent", consumer.UserAgent,
        "cursor", since,
        "consumer_id", consumerID,
    )
    activeClientGauge.Inc()
    defer activeClientGauge.Dec()

    for {
        select {
        case evt, ok := <-evts:
            if !ok {
                s.logger.Error("event stream closed unexpectedly")
                return nil
            }

            wc, err := conn.NextWriter(websocket.BinaryMessage)
            if err != nil {
                s.logger.Error("failed to get next writer", "err", err)
                return err
            }

            if evt.Preserialized != nil {
                _, err = wc.Write(evt.Preserialized)
            } else {
                err = evt.Serialize(wc)
            }
            if err != nil {
                return fmt.Errorf("failed to write event: %w", err)
            }

            if err := wc.Close(); err != nil {
                s.logger.Warn("failed to flush-close our event write", "err", err)
                return nil
            }

            lastWriteLk.Lock()
            lastWrite = time.Now()
            lastWriteLk.Unlock()
            sentCounter.Inc()
        case <-ctx.Done():
            return nil
        }
    }
}

type SocketConsumer struct {
    UserAgent   string
    RemoteAddr  string
    ConnectedAt time.Time
    EventsSent  prometheus.Counter
}

func (s *Splitter) registerConsumer(c *SocketConsumer) uint64 {
    s.consumersLk.Lock()
    defer s.consumersLk.Unlock()

    id := s.nextConsumerID
    s.nextConsumerID++

    s.consumers[id] = c

    return id
}

func (s *Splitter) cleanupConsumer(id uint64) {
    s.consumersLk.Lock()
    defer s.consumersLk.Unlock()

    c := s.consumers[id]

    var m = &dto.Metric{}
    if err := c.EventsSent.Write(m); err != nil {
        s.logger.Error("failed to get sent counter", "err", err)
    }

    s.logger.Info("consumer disconnected",
        "consumer_id", id,
        "remote_addr", c.RemoteAddr,
        "user_agent", c.UserAgent,
        "events_sent", m.Counter.GetValue())

    delete(s.consumers, id)
}
```
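A quick way to sanity-check the consumer-facing contract above (binary event frames, optional `cursor` query parameter, server-side pings) is a throwaway gorilla/websocket client along these lines. The listen address here is an assumption, not a rainbow default; substitute whatever `--api-listen` points at:

```go
package main

import (
    "log"
    "net/http"

    "github.com/gorilla/websocket"
)

func main() {
    // Assumed local rainbow instance; adjust host/port to match --api-listen.
    u := "ws://localhost:2480/xrpc/com.atproto.sync.subscribeRepos?cursor=0"
    h := http.Header{"User-Agent": []string{"rainbow-smoke-test"}}

    conn, _, err := websocket.DefaultDialer.Dial(u, h)
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    for {
        // each frame is a passed-through DAG-CBOR header+body pair
        msgType, frame, err := conn.ReadMessage()
        if err != nil {
            log.Fatal(err)
        }
        log.Printf("msgType=%d bytes=%d", msgType, len(frame))
    }
}
```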
splitter/handlers.go (new file, +141)
```go
package splitter

import (
    "context"
    "fmt"
    "io"
    "net/http"
    "net/url"
    "strings"

    comatproto "github.com/bluesky-social/indigo/api/atproto"
    "github.com/bluesky-social/indigo/xrpc"

    "github.com/labstack/echo/v4"
)

type HealthStatus struct {
    Service string `json:"service,const=rainbow"`
    Status  string `json:"status"`
    Message string `json:"msg,omitempty"`
}

func (s *Splitter) HandleHealthCheck(c echo.Context) error {
    return c.JSON(http.StatusOK, HealthStatus{Status: "ok"})
}

var homeMessage string = `
          _      _
 _ _ __ _(_)_ _ | |__  _____ __ __
| '_/ _' | | ' \| '_ \/ _ \ V  V /
|_| \__,_|_|_||_|_.__/\___/\_/\_/

This is an atproto [https://atproto.com] firehose fanout service, running the 'rainbow' codebase [https://github.com/bluesky-social/indigo]

The firehose WebSocket path is at:  /xrpc/com.atproto.sync.subscribeRepos
`

func (s *Splitter) HandleHomeMessage(c echo.Context) error {
    return c.String(http.StatusOK, homeMessage)
}

func (s *Splitter) HandleComAtprotoSyncRequestCrawl(c echo.Context) error {
    ctx := c.Request().Context()
    var body comatproto.SyncRequestCrawl_Input
    if err := c.Bind(&body); err != nil {
        return c.JSON(http.StatusBadRequest, xrpc.XRPCError{ErrStr: "BadRequest", Message: fmt.Sprintf("invalid body: %s", err)})
    }
    if body.Hostname == "" {
        return c.JSON(http.StatusBadRequest, xrpc.XRPCError{ErrStr: "BadRequest", Message: "must include a hostname"})
    }

    // first forward to the upstream
    xrpcc := xrpc.Client{
        Client: s.upstreamClient,
        Host:   s.conf.UpstreamHostHTTP(),
    }

    err := comatproto.SyncRequestCrawl(ctx, &xrpcc, &body)
    if err != nil {
        httpError, ok := err.(*xrpc.Error)
        if ok {
            return c.JSON(httpError.StatusCode, xrpc.XRPCError{ErrStr: "UpstreamError", Message: fmt.Sprintf("%s", httpError.Wrapped)})
        }
        return c.JSON(http.StatusInternalServerError, xrpc.XRPCError{ErrStr: "ProxyRequestFailed", Message: fmt.Sprintf("failed forwarding request: %s", err)})
    }

    // if that was successful, then forward on to the other upstreams (in goroutines)
    for _, c := range s.nextCrawlers {
        // intentional local copy of loop variable
        hostname := c.String()
        go func() {
            // new context to outlive original HTTP request
            ctx := context.Background()
            xrpcc := xrpc.Client{
                Client: s.upstreamClient,
                Host:   hostname,
            }
            if err := comatproto.SyncRequestCrawl(ctx, &xrpcc, &body); err != nil {
                s.logger.Warn("failed to forward requestCrawl", "upstream", hostname, "targetHost", body.Hostname, "err", err)
                return
            }
            s.logger.Info("successfully forwarded requestCrawl", "upstream", hostname, "targetHost", body.Hostname)
        }()
    }

    return c.JSON(http.StatusOK, map[string]any{"success": true})
}

// Proxies a request to the single upstream (relay)
func (s *Splitter) ProxyRequestUpstream(c echo.Context) error {
    u, err := url.Parse(s.conf.UpstreamHostHTTP())
    if err != nil {
        return err
    }
    return s.ProxyRequest(c, u.Host, u.Scheme)
}

// Proxies a request to the collectiondir
func (s *Splitter) ProxyRequestCollectionDir(c echo.Context) error {
    u, err := url.Parse(s.conf.CollectionDirHost)
    if err != nil {
        return err
    }
    return s.ProxyRequest(c, u.Host, u.Scheme)
}

func (s *Splitter) ProxyRequest(c echo.Context, hostname, scheme string) error {

    req := c.Request()
    respWriter := c.Response()

    u := req.URL
    u.Scheme = scheme
    u.Host = hostname
    upstreamReq, err := http.NewRequest(req.Method, u.String(), req.Body)
    if err != nil {
        s.logger.Warn("proxy request failed", "err", err)
        return c.JSON(http.StatusBadRequest, xrpc.XRPCError{ErrStr: "BadRequest", Message: "failed to proxy to upstream relay"})
    }

    for k, vals := range req.Header {
        if strings.ToLower(k) == "accept" {
            upstreamReq.Header.Add(k, vals[0])
        }
    }

    upstreamResp, err := s.upstreamClient.Do(upstreamReq)
    if err != nil {
        return c.JSON(http.StatusBadRequest, xrpc.XRPCError{ErrStr: "BadRequest", Message: "failed to proxy to upstream relay"})
    }
    defer upstreamResp.Body.Close()

    respWriter.Header()["Content-Type"] = []string{upstreamResp.Header.Get("Content-Type")}
    respWriter.WriteHeader(upstreamResp.StatusCode)

    _, err = io.Copy(respWriter, upstreamResp.Body)
    if err != nil {
        s.logger.Error("error copying proxy body", "err", err)
    }

    return nil
}
```
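The requestCrawl handler above binds the standard com.atproto.sync.requestCrawl lexicon input, so a plain stdlib POST is enough to exercise the forward-then-fan-out path end-to-end; the port is again an assumption:

```go
package main

import (
    "bytes"
    "fmt"
    "io"
    "net/http"
)

func main() {
    // Assumed local rainbow instance; the body matches the lexicon input schema.
    body := []byte(`{"hostname": "pds.example.com"}`)
    resp, err := http.Post(
        "http://localhost:2480/xrpc/com.atproto.sync.requestCrawl",
        "application/json",
        bytes.NewReader(body),
    )
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()

    out, _ := io.ReadAll(resp.Body)
    // expect {"success":true} once the upstream relay has accepted the crawl
    fmt.Println(resp.StatusCode, string(out))
}
```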
splitter/splitter.go (+125 -452)
````diff
···
 package splitter
 
 import (
-    "bytes"
     "context"
-    "encoding/json"
     "errors"
     "fmt"
     "io"
···
     "sync"
     "time"
 
-    "github.com/bluesky-social/indigo/api/atproto"
-    comatproto "github.com/bluesky-social/indigo/api/atproto"
     "github.com/bluesky-social/indigo/events"
     "github.com/bluesky-social/indigo/events/pebblepersist"
     "github.com/bluesky-social/indigo/events/schedulers/sequential"
     "github.com/bluesky-social/indigo/util"
     "github.com/bluesky-social/indigo/util/svcutil"
-    "github.com/bluesky-social/indigo/xrpc"
 
     "github.com/gorilla/websocket"
     "github.com/labstack/echo/v4"
     "github.com/labstack/echo/v4/middleware"
-    "github.com/prometheus/client_golang/prometheus"
     "github.com/prometheus/client_golang/prometheus/promhttp"
-    dto "github.com/prometheus/client_model/go"
-    "go.opentelemetry.io/otel"
 )
 
 type Splitter struct {
···
 
     conf SplitterConfig
 
-    log *slog.Logger
+    logger *slog.Logger
 
-    httpC        *http.Client
-    nextCrawlers []*url.URL
+    upstreamClient *http.Client
+    nextCrawlers   []*url.URL
 }
 
 type SplitterConfig struct {
-    UpstreamHost  string
-    CursorFile    string
-    PebbleOptions *pebblepersist.PebblePersistOptions
+    UpstreamHost      string
+    CollectionDirHost string
+    CursorFile        string
+    UserAgent         string
+    PebbleOptions     *pebblepersist.PebblePersistOptions
+    Logger            *slog.Logger
 }
 
-func (sc *SplitterConfig) XrpcRootUrl() string {
-    if strings.HasPrefix(sc.UpstreamHost, "http://") {
-        return sc.UpstreamHost
+func (sc *SplitterConfig) UpstreamHostWebsocket() string {
+
+    if !strings.Contains(sc.UpstreamHost, "://") {
+        return "wss://" + sc.UpstreamHost
     }
-    if strings.HasPrefix(sc.UpstreamHost, "https://") {
-        return sc.UpstreamHost
+    u, err := url.Parse(sc.UpstreamHost)
+    if err != nil {
+        // this will cause an error downstream
+        return ""
     }
-    if strings.HasPrefix(sc.UpstreamHost, "ws://") {
-        return "http://" + sc.UpstreamHost[5:]
-    }
-    if strings.HasPrefix(sc.UpstreamHost, "wss://") {
-        return "https://" + sc.UpstreamHost[6:]
+
+    switch u.Scheme {
+    case "http", "ws":
+        return "ws://" + u.Host
+    case "https", "wss":
+        return "wss://" + u.Host
+    default:
+        return "wss://" + u.Host
     }
-    return "https://" + sc.UpstreamHost
 }
 
-func (sc *SplitterConfig) UpstreamUrl() string {
-    if strings.HasPrefix(sc.UpstreamHost, "ws://") {
-        return "http://" + sc.UpstreamHost[7:]
+func (sc *SplitterConfig) UpstreamHostHTTP() string {
+
+    if !strings.Contains(sc.UpstreamHost, "://") {
+        return "https://" + sc.UpstreamHost
     }
-    if strings.HasPrefix(sc.UpstreamHost, "wss://") {
-        return "https://" + sc.UpstreamHost[8:]
+    u, err := url.Parse(sc.UpstreamHost)
+    if err != nil {
+        // this will cause an error downstream
+        return ""
     }
-    if strings.HasPrefix(sc.UpstreamHost, "ws://") {
-        return sc.UpstreamHost
+
+    switch u.Scheme {
+    case "http", "ws":
+        return "http://" + u.Host
+    case "https", "wss":
+        return "https://" + u.Host
+    default:
+        return "https://" + u.Host
     }
-    if strings.HasPrefix(sc.UpstreamHost, "wss://") {
-        return sc.UpstreamHost
-    }
-    return "wss://" + sc.UpstreamHost
 }
 
 func NewSplitter(conf SplitterConfig, nextCrawlers []string) (*Splitter, error) {
+
+    logger := conf.Logger
+    if logger == nil {
+        logger = slog.Default().With("system", "splitter")
+    }
+
     var nextCrawlerURLs []*url.URL
-    log := slog.Default().With("system", "splitter")
     if len(nextCrawlers) > 0 {
         nextCrawlerURLs = make([]*url.URL, len(nextCrawlers))
         for i, tu := range nextCrawlers {
···
             if err != nil {
                 return nil, fmt.Errorf("failed to parse next-crawler url: %w", err)
             }
-            log.Info("configuring relay for requestCrawl", "host", nextCrawlerURLs[i])
+            logger.Info("configuring relay for requestCrawl", "host", nextCrawlerURLs[i])
         }
     }
 
-    _, err := url.Parse(conf.UpstreamUrl())
+    _, err := url.Parse(conf.UpstreamHostHTTP())
     if err != nil {
-        return nil, fmt.Errorf("failed to parse upstream url %#v: %w", conf.UpstreamUrl(), err)
+        return nil, fmt.Errorf("failed to parse upstream url %#v: %w", conf.UpstreamHostHTTP(), err)
     }
 
     s := &Splitter{
-        conf:         conf,
-        consumers:    make(map[uint64]*SocketConsumer),
-        log:          log,
-        httpC:        util.RobustHTTPClient(),
-        nextCrawlers: nextCrawlerURLs,
+        conf:           conf,
+        consumers:      make(map[uint64]*SocketConsumer),
+        logger:         logger,
+        upstreamClient: util.RobustHTTPClient(),
+        nextCrawlers:   nextCrawlerURLs,
     }
 
     if conf.PebbleOptions == nil {
···
 
     return s, nil
 }
-func NewDiskSplitter(host, path string, persistHours float64, maxBytes int64) (*Splitter, error) {
-    ppopts := pebblepersist.PebblePersistOptions{
-        DbPath:          path,
-        PersistDuration: time.Duration(float64(time.Hour) * persistHours),
-        GCPeriod:        5 * time.Minute,
-        MaxBytes:        uint64(maxBytes),
-    }
-    conf := SplitterConfig{
-        UpstreamHost:  host,
-        CursorFile:    "cursor-file",
-        PebbleOptions: &ppopts,
-    }
-    pp, err := pebblepersist.NewPebblePersistance(&ppopts)
-    if err != nil {
-        return nil, err
-    }
-
-    go pp.GCThread(context.Background())
-    em := events.NewEventManager(pp)
-    return &Splitter{
-        conf:      conf,
-        pp:        pp,
-        events:    em,
-        consumers: make(map[uint64]*SocketConsumer),
-        log:       slog.Default().With("system", "splitter"),
-    }, nil
-}
 
-func (s *Splitter) Start(addr string) error {
+func (s *Splitter) StartAPI(addr string) error {
     var lc net.ListenConfig
     ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
     defer cancel()
···
     if err != nil {
         return err
     }
-    return s.StartWithListener(li)
+    return s.startWithListener(li)
 }
 
 func (s *Splitter) StartMetrics(listen string) error {
···
     return nil
 }
 
-func (s *Splitter) StartWithListener(listen net.Listener) error {
+func (s *Splitter) startWithListener(listen net.Listener) error {
     e := echo.New()
     e.HideBanner = true
 
···
     */
 
     e.Use(svcutil.MetricsMiddleware)
+    e.HTTPErrorHandler = s.errorHandler
 
-    e.HTTPErrorHandler = func(err error, ctx echo.Context) {
-        switch err := err.(type) {
-        case *echo.HTTPError:
-            if err2 := ctx.JSON(err.Code, map[string]any{
-                "error": err.Message,
-            }); err2 != nil {
-                s.log.Error("Failed to write http error", "err", err2)
-            }
-        default:
-            sendHeader := true
-            if ctx.Path() == "/xrpc/com.atproto.sync.subscribeRepos" {
-                sendHeader = false
-            }
-
-            s.log.Warn("HANDLER ERROR", "path", ctx.Path(), "err", err)
-
-            if strings.HasPrefix(ctx.Path(), "/admin/") {
-                ctx.JSON(500, map[string]any{
-                    "error": err.Error(),
-                })
-                return
-            }
-
-            if sendHeader {
-                ctx.Response().WriteHeader(500)
-            }
-        }
-    }
-
-    // TODO: this API is temporary until we formalize what we want here
+    e.POST("/xrpc/com.atproto.sync.requestCrawl", s.HandleComAtprotoSyncRequestCrawl)
+    e.GET("/xrpc/com.atproto.sync.subscribeRepos", s.HandleSubscribeRepos)
 
-    e.POST("/xrpc/com.atproto.sync.requestCrawl", s.RequestCrawlHandler)
-    e.GET("/xrpc/com.atproto.sync.subscribeRepos", s.EventsHandler)
-    e.GET("/xrpc/com.atproto.sync.listRepos", s.HandleComAtprotoSyncListRepos)
+    // proxy endpoints to upstream (relay)
+    e.GET("/xrpc/com.atproto.sync.listRepos", s.ProxyRequestUpstream)
+    e.GET("/xrpc/com.atproto.sync.getRepoStatus", s.ProxyRequestUpstream)
+    e.GET("/xrpc/com.atproto.sync.getLatestCommit", s.ProxyRequestUpstream)
+    e.GET("/xrpc/com.atproto.sync.listHosts", s.ProxyRequestUpstream)
+    e.GET("/xrpc/com.atproto.sync.getHostStatus", s.ProxyRequestUpstream)
+    e.GET("/xrpc/com.atproto.sync.getRepo", s.ProxyRequestUpstream)
+
+    // proxy endpoint to collectiondir
+    e.GET("/xrpc/com.atproto.sync.listReposByCollection", s.ProxyRequestCollectionDir)
 
     e.GET("/xrpc/_health", s.HandleHealthCheck)
     e.GET("/_health", s.HandleHealthCheck)
···
     return e.StartServer(srv)
 }
 
-type HealthStatus struct {
-    Status  string `json:"status"`
-    Message string `json:"msg,omitempty"`
-}
-
-func (s *Splitter) HandleHealthCheck(c echo.Context) error {
-    return c.JSON(200, HealthStatus{Status: "ok"})
-}
-
-var homeMessage string = `
-          _      _
- _ _ __ _(_)_ _ | |__  _____ __ __
-| '_/ _' | | ' \| '_ \/ _ \ V  V /
-|_| \__,_|_|_||_|_.__/\___/\_/\_/
-
-This is an atproto [https://atproto.com] firehose fanout service, running the 'rainbow' codebase [https://github.com/bluesky-social/indigo]
-
-The firehose WebSocket path is at:  /xrpc/com.atproto.sync.subscribeRepos
-`
-
-func (s *Splitter) HandleHomeMessage(c echo.Context) error {
-    return c.String(http.StatusOK, homeMessage)
-}
-
-type XRPCError struct {
-    Message string `json:"message"`
-}
-
-func (s *Splitter) RequestCrawlHandler(c echo.Context) error {
-    ctx := c.Request().Context()
-    var body comatproto.SyncRequestCrawl_Input
-    if err := c.Bind(&body); err != nil {
-        return c.JSON(http.StatusBadRequest, XRPCError{Message: fmt.Sprintf("invalid body: %s", err)})
-    }
-
-    host := body.Hostname
-    if host == "" {
-        return echo.NewHTTPError(http.StatusBadRequest, "must pass hostname")
-    }
-
-    if !strings.HasPrefix(host, "http://") && !strings.HasPrefix(host, "https://") {
-        host = "https://" + host
-    }
-
-    u, err := url.Parse(host)
-    if err != nil {
-        return echo.NewHTTPError(http.StatusBadRequest, "failed to parse hostname")
-    }
-
-    if u.Scheme == "http" {
-        return echo.NewHTTPError(http.StatusBadRequest, "this server requires https")
-    }
-    if u.Path != "" {
-        return echo.NewHTTPError(http.StatusBadRequest, "must pass hostname without path")
-    }
-
-    if u.Query().Encode() != "" {
-        return echo.NewHTTPError(http.StatusBadRequest, "must pass hostname without query")
-    }
-
-    host = u.Host // potentially hostname:port
-
-    clientHost := fmt.Sprintf("%s://%s", u.Scheme, host)
-
-    xrpcC := &xrpc.Client{
-        Host:   clientHost,
-        Client: http.DefaultClient, // not using the client that auto-retries
-    }
-
-    desc, err := atproto.ServerDescribeServer(ctx, xrpcC)
-    if err != nil {
-        errMsg := fmt.Sprintf("requested host (%s) failed to respond to describe request", clientHost)
-        return echo.NewHTTPError(http.StatusBadRequest, errMsg)
-    }
-
-    // Maybe we could do something with this response later
-    _ = desc
-
-    if len(s.nextCrawlers) != 0 {
-        blob, err := json.Marshal(body)
-        if err != nil {
-            s.log.Warn("could not forward requestCrawl, json err", "err", err)
-        } else {
-            go func(bodyBlob []byte) {
-                for _, remote := range s.nextCrawlers {
-                    if remote == nil {
-                        continue
-                    }
-
-                    pu := remote.JoinPath("/xrpc/com.atproto.sync.requestCrawl")
-                    response, err := s.httpC.Post(pu.String(), "application/json", bytes.NewReader(bodyBlob))
-                    if response != nil && response.Body != nil {
-                        response.Body.Close()
-                    }
-                    if err != nil || response == nil {
-                        s.log.Warn("requestCrawl forward failed", "host", remote, "err", err)
-                    } else if response.StatusCode != http.StatusOK {
-                        s.log.Warn("requestCrawl forward failed", "host", remote, "status", response.Status)
-                    } else {
-                        s.log.Info("requestCrawl forward successful", "host", remote)
-                    }
-                }
-            }(blob)
-        }
-    }
-
-    return c.JSON(200, HealthStatus{Status: "ok"})
-}
-
-func (s *Splitter) HandleComAtprotoSyncListRepos(c echo.Context) error {
-    ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncListRepos")
-    defer span.End()
-
-    cursorQuery := c.QueryParam("cursor")
-    limitQuery := c.QueryParam("limit")
-
-    var err error
-
-    limit := int64(500)
-    if limitQuery != "" {
-        limit, err = strconv.ParseInt(limitQuery, 10, 64)
-        if err != nil || limit < 1 || limit > 1000 {
-            return c.JSON(http.StatusBadRequest, XRPCError{Message: fmt.Sprintf("invalid limit: %s", limitQuery)})
-        }
-    }
-
-    client := xrpc.Client{
-        Client: s.httpC,
-        Host:   s.conf.XrpcRootUrl(),
-    }
-
-    out, handleErr := atproto.SyncListRepos(ctx, &client, cursorQuery, limit)
-    if handleErr != nil {
-        return handleErr
-    }
-    return c.JSON(200, out)
-}
-
-func (s *Splitter) EventsHandler(c echo.Context) error {
-    var since *int64
-    if sinceVal := c.QueryParam("cursor"); sinceVal != "" {
-        sval, err := strconv.ParseInt(sinceVal, 10, 64)
-        if err != nil {
-            return err
-        }
-        since = &sval
-    }
-
-    ctx, cancel := context.WithCancel(c.Request().Context())
-    defer cancel()
-
-    // TODO: authhhh
-    conn, err := websocket.Upgrade(c.Response(), c.Request(), c.Response().Header(), 10<<10, 10<<10)
-    if err != nil {
-        return fmt.Errorf("upgrading websocket: %w", err)
-    }
-
-    defer conn.Close()
-
-    lastWriteLk := sync.Mutex{}
-    lastWrite := time.Now()
-
-    // Start a goroutine to ping the client every 30 seconds to check if it's
-    // still alive. If the client doesn't respond to a ping within 5 seconds,
-    // we'll close the connection and teardown the consumer.
-    go func() {
-        ticker := time.NewTicker(30 * time.Second)
-        defer ticker.Stop()
-
-        for {
-            select {
-            case <-ticker.C:
-                lastWriteLk.Lock()
-                lw := lastWrite
-                lastWriteLk.Unlock()
-
-                if time.Since(lw) < 30*time.Second {
-                    continue
-                }
-
-                if err := conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(5*time.Second)); err != nil {
-                    s.log.Error("failed to ping client", "err", err)
-                    cancel()
-                    return
-                }
-            case <-ctx.Done():
-                return
-            }
-        }
-    }()
-
-    conn.SetPingHandler(func(message string) error {
-        err := conn.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(time.Second*60))
-        if err == websocket.ErrCloseSent {
-            return nil
-        } else if e, ok := err.(net.Error); ok && e.Temporary() {
-            return nil
-        }
-        return err
-    })
-
-    // Start a goroutine to read messages from the client and discard them.
-    go func() {
-        for {
-            _, _, err := conn.ReadMessage()
-            if err != nil {
-                s.log.Error("failed to read message from client", "err", err)
-                cancel()
-                return
-            }
-        }
-    }()
-
-    ident := c.RealIP() + "-" + c.Request().UserAgent()
-
-    evts, cleanup, err := s.events.Subscribe(ctx, ident, func(evt *events.XRPCStreamEvent) bool { return true }, since)
-    if err != nil {
-        return err
-    }
-    defer cleanup()
-
-    // Keep track of the consumer for metrics and admin endpoints
-    consumer := SocketConsumer{
-        RemoteAddr:  c.RealIP(),
-        UserAgent:   c.Request().UserAgent(),
-        ConnectedAt: time.Now(),
-    }
-    sentCounter := eventsSentCounter.WithLabelValues(consumer.RemoteAddr, consumer.UserAgent)
-    consumer.EventsSent = sentCounter
-
-    consumerID := s.registerConsumer(&consumer)
-    defer s.cleanupConsumer(consumerID)
-
-    s.log.Info("new consumer",
-        "remote_addr", consumer.RemoteAddr,
-        "user_agent", consumer.UserAgent,
-        "cursor", since,
-        "consumer_id", consumerID,
-    )
-    activeClientGauge.Inc()
-    defer activeClientGauge.Dec()
-
-    for {
-        select {
-        case evt, ok := <-evts:
-            if !ok {
-                s.log.Error("event stream closed unexpectedly")
-                return nil
-            }
-
-            wc, err := conn.NextWriter(websocket.BinaryMessage)
-            if err != nil {
-                s.log.Error("failed to get next writer", "err", err)
-                return err
-            }
-
-            if evt.Preserialized != nil {
-                _, err = wc.Write(evt.Preserialized)
-            } else {
-                err = evt.Serialize(wc)
-            }
-            if err != nil {
-                return fmt.Errorf("failed to write event: %w", err)
-            }
-
-            if err := wc.Close(); err != nil {
-                s.log.Warn("failed to flush-close our event write", "err", err)
-                return nil
-            }
-
-            lastWriteLk.Lock()
-            lastWrite = time.Now()
-            lastWriteLk.Unlock()
-            sentCounter.Inc()
-        case <-ctx.Done():
-            return nil
-        }
-    }
-}
-
-type SocketConsumer struct {
-    UserAgent   string
-    RemoteAddr  string
-    ConnectedAt time.Time
-    EventsSent  prometheus.Counter
-}
-
-func (s *Splitter) registerConsumer(c *SocketConsumer) uint64 {
-    s.consumersLk.Lock()
-    defer s.consumersLk.Unlock()
-
-    id := s.nextConsumerID
-    s.nextConsumerID++
-
-    s.consumers[id] = c
-
-    return id
-}
-
-func (s *Splitter) cleanupConsumer(id uint64) {
-    s.consumersLk.Lock()
-    defer s.consumersLk.Unlock()
-
-    c := s.consumers[id]
-
-    var m = &dto.Metric{}
-    if err := c.EventsSent.Write(m); err != nil {
-        s.log.Error("failed to get sent counter", "err", err)
-    }
-
-    s.log.Info("consumer disconnected",
-        "consumer_id", id,
-        "remote_addr", c.RemoteAddr,
-        "user_agent", c.UserAgent,
-        "events_sent", m.Counter.GetValue())
-
-    delete(s.consumers, id)
-}
+func (s *Splitter) errorHandler(err error, ctx echo.Context) {
+    switch err := err.(type) {
+    case *echo.HTTPError:
+        if err2 := ctx.JSON(err.Code, map[string]any{
+            "error": err.Message,
+        }); err2 != nil {
+            s.logger.Error("Failed to write http error", "err", err2)
+        }
+    default:
+        sendHeader := true
+        if ctx.Path() == "/xrpc/com.atproto.sync.subscribeRepos" {
+            sendHeader = false
+        }
+
+        s.logger.Warn("HANDLER ERROR", "path", ctx.Path(), "err", err)
+
+        if strings.HasPrefix(ctx.Path(), "/admin/") {
+            ctx.JSON(500, map[string]any{
+                "error": err.Error(),
+            })
+            return
+        }
+
+        if sendHeader {
+            ctx.Response().WriteHeader(500)
+        }
+    }
+}
+
+func (s *Splitter) getLastCursor() (int64, error) {
+    if s.pp != nil {
+        seq, millis, _, err := s.pp.GetLast(context.Background())
+        if err == nil {
+            s.logger.Debug("got last cursor from pebble", "seq", seq, "millis", millis)
+            return seq, nil
+        } else if errors.Is(err, pebblepersist.ErrNoLast) {
+            s.logger.Info("pebble no last")
+        } else {
+            s.logger.Error("pebble seq fail", "err", err)
+        }
+    }
+
+    fi, err := os.Open(s.conf.CursorFile)
+    if err != nil {
+        if os.IsNotExist(err) {
+            return -1, nil
+        }
+        return -1, err
+    }
+
+    b, err := io.ReadAll(fi)
+    if err != nil {
+        return -1, err
+    }
+
+    v, err := strconv.ParseInt(string(b), 10, 64)
+    if err != nil {
+        return -1, err
+    }
+
+    return v, nil
+}
+
+func (s *Splitter) writeCursor(curs int64) error {
+    return os.WriteFile(s.conf.CursorFile, []byte(fmt.Sprint(curs)), 0664)
+}
 
 func sleepForBackoff(b int) time.Duration {
···
 func (s *Splitter) subscribeWithRedialer(ctx context.Context, cursor int64) {
     d := websocket.Dialer{}
 
-    upstreamUrl, err := url.Parse(s.conf.UpstreamUrl())
+    upstreamUrl, err := url.Parse(s.conf.UpstreamHostWebsocket())
     if err != nil {
         panic(err) // this should have been checked in NewSplitter
     }
     upstreamUrl = upstreamUrl.JoinPath("/xrpc/com.atproto.sync.subscribeRepos")
+
+    header := http.Header{
+        "User-Agent": []string{s.conf.UserAgent},
+    }
 
     var backoff int
     for {
···
         default:
         }
 
-        header := http.Header{
-            "User-Agent": []string{"bgs-rainbow-v0"},
-        }
-
         var uurl string
         if cursor < 0 {
             upstreamUrl.RawQuery = ""
···
         }
         con, res, err := d.DialContext(ctx, uurl, header)
         if err != nil {
-            s.log.Warn("dialing failed", "url", uurl, "err", err, "backoff", backoff)
+            s.logger.Warn("dialing failed", "url", uurl, "err", err, "backoff", backoff)
             time.Sleep(sleepForBackoff(backoff))
             backoff++
 
             continue
         }
 
-        s.log.Info("event subscription response", "code", res.StatusCode)
+        s.logger.Info("event subscription response", "code", res.StatusCode)
 
-        if err := s.handleConnection(ctx, con, &cursor); err != nil {
-            s.log.Warn("connection failed", "url", uurl, "err", err)
+        if err := s.handleUpstreamConnection(ctx, con, &cursor); err != nil {
+            s.logger.Warn("upstream connection failed", "url", uurl, "err", err)
         }
     }
 }
 
-func (s *Splitter) handleConnection(ctx context.Context, con *websocket.Conn, lastCursor *int64) error {
+func (s *Splitter) handleUpstreamConnection(ctx context.Context, con *websocket.Conn, lastCursor *int64) error {
     ctx, cancel := context.WithCancel(ctx)
     defer cancel()
 
···
         if seq%5000 == 0 {
             // TODO: don't need this after we move to getting seq from pebble
             if err := s.writeCursor(seq); err != nil {
-                s.log.Error("write cursor failed", "err", err)
+                s.logger.Error("write cursor failed", "err", err)
             }
         }
 
···
 
     return events.HandleRepoStream(ctx, con, sched, nil)
 }
-
-func (s *Splitter) getLastCursor() (int64, error) {
-    if s.pp != nil {
-        seq, millis, _, err := s.pp.GetLast(context.Background())
-        if err == nil {
-            s.log.Debug("got last cursor from pebble", "seq", seq, "millis", millis)
-            return seq, nil
-        } else if errors.Is(err, pebblepersist.ErrNoLast) {
-            s.log.Info("pebble no last")
-        } else {
-            s.log.Error("pebble seq fail", "err", err)
-        }
-    }
-
-    fi, err := os.Open(s.conf.CursorFile)
-    if err != nil {
-        if os.IsNotExist(err) {
-            return -1, nil
-        }
-        return -1, err
-    }
-
-    b, err := io.ReadAll(fi)
-    if err != nil {
-        return -1, err
-    }
-
-    v, err := strconv.ParseInt(string(b), 10, 64)
-    if err != nil {
-        return -1, err
-    }
-
-    return v, nil
-}
-
-func (s *Splitter) writeCursor(curs int64) error {
-    return os.WriteFile(s.conf.CursorFile, []byte(fmt.Sprint(curs)), 0664)
-}
````
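Since the old prefix-trimming helpers had some dead branches, the new scheme-switch normalization is worth pinning down. As I read the new code, a table test along these lines should pass (a sketch, not part of the PR):

```go
package splitter

import "testing"

// Expected behavior of the new normalization helpers, per the code above:
// bare hostnames default to TLS, http/ws and https/wss schemes map onto
// each other, and any path is dropped in favor of u.Host.
func TestUpstreamHostNormalization(t *testing.T) {
	cases := []struct{ in, ws, http string }{
		{"bsky.network", "wss://bsky.network", "https://bsky.network"},
		{"http://localhost:2470", "ws://localhost:2470", "http://localhost:2470"},
		{"wss://relay.example.com", "wss://relay.example.com", "https://relay.example.com"},
	}
	for _, c := range cases {
		sc := SplitterConfig{UpstreamHost: c.in}
		if got := sc.UpstreamHostWebsocket(); got != c.ws {
			t.Errorf("UpstreamHostWebsocket(%q) = %q, want %q", c.in, got, c.ws)
		}
		if got := sc.UpstreamHostHTTP(); got != c.http {
			t.Errorf("UpstreamHostHTTP(%q) = %q, want %q", c.in, got, c.http)
		}
	}
}
```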
util/svcutil/logger.go (new file, +30)
```go
package svcutil

import (
    "io"
    "log/slog"
    "strings"

    "github.com/urfave/cli/v2"
)

func ConfigLogger(cctx *cli.Context, writer io.Writer) *slog.Logger {
    var level slog.Level
    switch strings.ToLower(cctx.String("log-level")) {
    case "error":
        level = slog.LevelError
    case "warn":
        level = slog.LevelWarn
    case "info":
        level = slog.LevelInfo
    case "debug":
        level = slog.LevelDebug
    default:
        level = slog.LevelInfo
    }
    logger := slog.New(slog.NewJSONHandler(writer, &slog.HandlerOptions{
        Level: level,
    }))
    slog.SetDefault(logger)
    return logger
}
```
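ConfigLogger only consults the string flag `log-level` (and sets the process-wide slog default as a side effect), so it can be exercised outside a full app with a hand-built cli.Context; a minimal sketch:

```go
package main

import (
	"flag"
	"os"

	"github.com/bluesky-social/indigo/util/svcutil"
	"github.com/urfave/cli/v2"
)

func main() {
	// Build a cli.Context carrying a log-level value, the only flag
	// ConfigLogger consults, then log through the result.
	set := flag.NewFlagSet("demo", flag.ContinueOnError)
	set.String("log-level", "debug", "")
	cctx := cli.NewContext(cli.NewApp(), set, nil)

	logger := svcutil.ConfigLogger(cctx, os.Stdout).With("system", "demo")
	logger.Debug("debug is enabled at this level")
	logger.Info("hello from the JSON handler")
}
```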