fork of indigo with slightly nicer lexgen
at main 7.2 kB view raw
1package main 2 3import ( 4 "context" 5 "fmt" 6 "log/slog" 7 "os" 8 "os/signal" 9 "syscall" 10 "time" 11 12 _ "github.com/joho/godotenv/autoload" 13 _ "go.uber.org/automaxprocs" 14 _ "net/http/pprof" 15 16 "github.com/bluesky-social/indigo/events/pebblepersist" 17 "github.com/bluesky-social/indigo/splitter" 18 "github.com/bluesky-social/indigo/util/svcutil" 19 20 "github.com/carlmjohnson/versioninfo" 21 "github.com/urfave/cli/v2" 22 "go.opentelemetry.io/otel" 23 "go.opentelemetry.io/otel/attribute" 24 "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" 25 "go.opentelemetry.io/otel/sdk/resource" 26 tracesdk "go.opentelemetry.io/otel/sdk/trace" 27 semconv "go.opentelemetry.io/otel/semconv/v1.4.0" 28) 29 30func main() { 31 if err := run(os.Args); err != nil { 32 slog.Error("exiting", "err", err) 33 os.Exit(1) 34 } 35} 36 37func run(args []string) error { 38 39 app := cli.App{ 40 Name: "rainbow", 41 Usage: "atproto firehose fan-out daemon", 42 Version: versioninfo.Short(), 43 Action: runSplitter, 44 } 45 46 app.Flags = []cli.Flag{ 47 &cli.StringFlag{ 48 Name: "log-level", 49 Usage: "log verbosity level (eg: warn, info, debug)", 50 EnvVars: []string{"RAINBOW_LOG_LEVEL", "GO_LOG_LEVEL", "LOG_LEVEL"}, 51 }, 52 &cli.StringFlag{ 53 Name: "upstream-host", 54 Value: "http://localhost:2470", 55 Usage: "URL (schema and hostname, no path) of the upstream host (eg, relay)", 56 EnvVars: []string{"ATP_RELAY_HOST", "RAINBOW_RELAY_HOST"}, 57 }, 58 &cli.StringFlag{ 59 Name: "persist-db", 60 Value: "./rainbow.db", 61 Usage: "path to persistence db", 62 EnvVars: []string{"RAINBOW_DB_PATH"}, 63 }, 64 &cli.StringFlag{ 65 Name: "cursor-file", 66 Value: "./rainbow-cursor", 67 Usage: "write upstream cursor number to this file", 68 EnvVars: []string{"RAINBOW_CURSOR_PATH"}, 69 }, 70 &cli.StringFlag{ 71 Name: "api-listen", 72 Value: ":2480", 73 EnvVars: []string{"RAINBOW_API_LISTEN"}, 74 }, 75 &cli.StringFlag{ 76 Name: "metrics-listen", 77 Value: ":2481", 78 EnvVars: []string{"RAINBOW_METRICS_LISTEN", "SPLITTER_METRICS_LISTEN"}, 79 }, 80 &cli.Float64Flag{ 81 Name: "persist-hours", 82 Value: 24 * 3, 83 EnvVars: []string{"RAINBOW_PERSIST_HOURS", "SPLITTER_PERSIST_HOURS"}, 84 Usage: "hours to buffer (float, may be fractional)", 85 }, 86 &cli.Int64Flag{ 87 Name: "persist-bytes", 88 Value: 0, 89 Usage: "max bytes target for event cache, 0 to disable size target trimming", 90 EnvVars: []string{"RAINBOW_PERSIST_BYTES", "SPLITTER_PERSIST_BYTES"}, 91 }, 92 &cli.StringSliceFlag{ 93 // TODO: better name for this argument 94 Name: "next-crawler", 95 Usage: "forward POST requestCrawl to these hosts (schema and host, no path) in addition to upstream-host. Comma-separated or multiple flags", 96 EnvVars: []string{"RAINBOW_NEXT_CRAWLER", "RELAY_NEXT_CRAWLER"}, 97 }, 98 &cli.StringFlag{ 99 Name: "collectiondir-host", 100 Value: "http://localhost:2510", 101 Usage: "host (schema and hostname, no path) of upstream collectiondir instance, for com.atproto.sync.listReposByCollection", 102 EnvVars: []string{"RAINBOW_COLLECTIONDIR_HOST"}, 103 }, 104 &cli.StringFlag{ 105 Name: "env", 106 Usage: "operating environment (eg, 'prod', 'test')", 107 Value: "dev", 108 EnvVars: []string{"ENVIRONMENT"}, 109 }, 110 &cli.BoolFlag{ 111 Name: "enable-otel-otlp", 112 Usage: "enables OTEL OTLP exporter endpoint", 113 EnvVars: []string{"RAINBOW_ENABLE_OTEL_OTLP", "ENABLE_OTEL_OTLP"}, 114 }, 115 &cli.StringFlag{ 116 Name: "otel-otlp-endpoint", 117 Usage: "OTEL traces export endpoint", 118 Value: "http://localhost:4318", 119 EnvVars: []string{"OTEL_EXPORTER_OTLP_ENDPOINT"}, 120 }, 121 } 122 123 return app.Run(args) 124} 125 126func runSplitter(cctx *cli.Context) error { 127 // Trap SIGINT to trigger a shutdown. 128 signals := make(chan os.Signal, 1) 129 signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM) 130 131 logger := svcutil.ConfigLogger(cctx, os.Stdout).With("system", "rainbow") 132 133 // Enable OTLP HTTP exporter 134 // For relevant environment variables: 135 // https://pkg.go.dev/go.opentelemetry.io/otel/exporters/otlp/otlptrace#readme-environment-variables 136 if cctx.Bool("enable-otel-otlp") { 137 ep := cctx.String("otel-otlp-endpoint") 138 logger.Info("setting up trace exporter", "endpoint", ep) 139 ctx, cancel := context.WithCancel(context.Background()) 140 defer cancel() 141 142 exp, err := otlptracehttp.New(ctx) 143 if err != nil { 144 logger.Error("failed to create trace exporter", "error", err) 145 os.Exit(1) 146 } 147 defer func() { 148 ctx, cancel := context.WithTimeout(context.Background(), time.Second) 149 defer cancel() 150 if err := exp.Shutdown(ctx); err != nil { 151 logger.Error("failed to shutdown trace exporter", "error", err) 152 } 153 }() 154 155 env := cctx.String("env") 156 tp := tracesdk.NewTracerProvider( 157 tracesdk.WithBatcher(exp), 158 tracesdk.WithResource(resource.NewWithAttributes( 159 semconv.SchemaURL, 160 semconv.ServiceNameKey.String("splitter"), 161 attribute.String("env", env), // DataDog 162 attribute.String("environment", env), // Others 163 attribute.Int64("ID", 1), 164 )), 165 ) 166 otel.SetTracerProvider(tp) 167 } 168 169 persistPath := cctx.String("persist-db") 170 upstreamHost := cctx.String("upstream-host") 171 collectionDirHost := cctx.String("collectiondir-host") 172 nextCrawlers := cctx.StringSlice("next-crawler") 173 174 var spl *splitter.Splitter 175 var err error 176 if persistPath != "" { 177 logger.Info("building splitter with storage at", "path", persistPath) 178 ppopts := pebblepersist.PebblePersistOptions{ 179 DbPath: persistPath, 180 PersistDuration: time.Duration(float64(time.Hour) * cctx.Float64("persist-hours")), 181 GCPeriod: 5 * time.Minute, 182 MaxBytes: uint64(cctx.Int64("persist-bytes")), 183 } 184 conf := splitter.SplitterConfig{ 185 UpstreamHost: upstreamHost, 186 CollectionDirHost: collectionDirHost, 187 CursorFile: cctx.String("cursor-file"), 188 PebbleOptions: &ppopts, 189 UserAgent: fmt.Sprintf("rainbow/%s (atproto-relay)", versioninfo.Short()), 190 } 191 spl, err = splitter.NewSplitter(conf, nextCrawlers) 192 } else { 193 logger.Info("building in-memory splitter") 194 conf := splitter.SplitterConfig{ 195 UpstreamHost: upstreamHost, 196 CollectionDirHost: collectionDirHost, 197 CursorFile: cctx.String("cursor-file"), 198 } 199 spl, err = splitter.NewSplitter(conf, nextCrawlers) 200 } 201 if err != nil { 202 logger.Error("failed to create splitter", "path", persistPath, "error", err) 203 os.Exit(1) 204 return err 205 } 206 207 // set up metrics endpoint 208 metricsListen := cctx.String("metrics-listen") 209 go func() { 210 if err := spl.StartMetrics(metricsListen); err != nil { 211 logger.Error("failed to start metrics endpoint", "err", err) 212 os.Exit(1) 213 } 214 }() 215 216 runErr := make(chan error, 1) 217 218 go func() { 219 err := spl.StartAPI(cctx.String("api-listen")) 220 runErr <- err 221 }() 222 223 logger.Info("startup complete") 224 select { 225 case <-signals: 226 logger.Info("received shutdown signal") 227 if err := spl.Shutdown(); err != nil { 228 logger.Error("error during Splitter shutdown", "err", err) 229 } 230 case err := <-runErr: 231 if err != nil { 232 logger.Error("error during Splitter startup", "err", err) 233 } 234 logger.Info("shutting down") 235 if err := spl.Shutdown(); err != nil { 236 logger.Error("error during Splitter shutdown", "err", err) 237 } 238 } 239 240 logger.Info("shutdown complete") 241 242 return nil 243}