1package main
2
3import (
4 "context"
5 "fmt"
6 "log/slog"
7 "os"
8 "os/signal"
9 "syscall"
10 "time"
11
12 _ "github.com/joho/godotenv/autoload"
13 _ "go.uber.org/automaxprocs"
14 _ "net/http/pprof"
15
16 "github.com/bluesky-social/indigo/events/pebblepersist"
17 "github.com/bluesky-social/indigo/splitter"
18 "github.com/bluesky-social/indigo/util/svcutil"
19
20 "github.com/carlmjohnson/versioninfo"
21 "github.com/urfave/cli/v2"
22 "go.opentelemetry.io/otel"
23 "go.opentelemetry.io/otel/attribute"
24 "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
25 "go.opentelemetry.io/otel/sdk/resource"
26 tracesdk "go.opentelemetry.io/otel/sdk/trace"
27 semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
28)
29
30func main() {
31 if err := run(os.Args); err != nil {
32 slog.Error("exiting", "err", err)
33 os.Exit(1)
34 }
35}
36
37func run(args []string) error {
38
39 app := cli.App{
40 Name: "rainbow",
41 Usage: "atproto firehose fan-out daemon",
42 Version: versioninfo.Short(),
43 Action: runSplitter,
44 }
45
46 app.Flags = []cli.Flag{
47 &cli.StringFlag{
48 Name: "log-level",
49 Usage: "log verbosity level (eg: warn, info, debug)",
50 EnvVars: []string{"RAINBOW_LOG_LEVEL", "GO_LOG_LEVEL", "LOG_LEVEL"},
51 },
52 &cli.StringFlag{
53 Name: "upstream-host",
54 Value: "http://localhost:2470",
55 Usage: "URL (schema and hostname, no path) of the upstream host (eg, relay)",
56 EnvVars: []string{"ATP_RELAY_HOST", "RAINBOW_RELAY_HOST"},
57 },
58 &cli.StringFlag{
59 Name: "persist-db",
60 Value: "./rainbow.db",
61 Usage: "path to persistence db",
62 EnvVars: []string{"RAINBOW_DB_PATH"},
63 },
64 &cli.StringFlag{
65 Name: "cursor-file",
66 Value: "./rainbow-cursor",
67 Usage: "write upstream cursor number to this file",
68 EnvVars: []string{"RAINBOW_CURSOR_PATH"},
69 },
70 &cli.StringFlag{
71 Name: "api-listen",
72 Value: ":2480",
73 EnvVars: []string{"RAINBOW_API_LISTEN"},
74 },
75 &cli.StringFlag{
76 Name: "metrics-listen",
77 Value: ":2481",
78 EnvVars: []string{"RAINBOW_METRICS_LISTEN", "SPLITTER_METRICS_LISTEN"},
79 },
80 &cli.Float64Flag{
81 Name: "persist-hours",
82 Value: 24 * 3,
83 EnvVars: []string{"RAINBOW_PERSIST_HOURS", "SPLITTER_PERSIST_HOURS"},
84 Usage: "hours to buffer (float, may be fractional)",
85 },
86 &cli.Int64Flag{
87 Name: "persist-bytes",
88 Value: 0,
89 Usage: "max bytes target for event cache, 0 to disable size target trimming",
90 EnvVars: []string{"RAINBOW_PERSIST_BYTES", "SPLITTER_PERSIST_BYTES"},
91 },
92 &cli.StringSliceFlag{
93 // TODO: better name for this argument
94 Name: "next-crawler",
95 Usage: "forward POST requestCrawl to these hosts (schema and host, no path) in addition to upstream-host. Comma-separated or multiple flags",
96 EnvVars: []string{"RAINBOW_NEXT_CRAWLER", "RELAY_NEXT_CRAWLER"},
97 },
98 &cli.StringFlag{
99 Name: "collectiondir-host",
100 Value: "http://localhost:2510",
101 Usage: "host (schema and hostname, no path) of upstream collectiondir instance, for com.atproto.sync.listReposByCollection",
102 EnvVars: []string{"RAINBOW_COLLECTIONDIR_HOST"},
103 },
104 &cli.StringFlag{
105 Name: "env",
106 Usage: "operating environment (eg, 'prod', 'test')",
107 Value: "dev",
108 EnvVars: []string{"ENVIRONMENT"},
109 },
110 &cli.BoolFlag{
111 Name: "enable-otel-otlp",
112 Usage: "enables OTEL OTLP exporter endpoint",
113 EnvVars: []string{"RAINBOW_ENABLE_OTEL_OTLP", "ENABLE_OTEL_OTLP"},
114 },
115 &cli.StringFlag{
116 Name: "otel-otlp-endpoint",
117 Usage: "OTEL traces export endpoint",
118 Value: "http://localhost:4318",
119 EnvVars: []string{"OTEL_EXPORTER_OTLP_ENDPOINT"},
120 },
121 }
122
123 return app.Run(args)
124}
125
126func runSplitter(cctx *cli.Context) error {
127 // Trap SIGINT to trigger a shutdown.
128 signals := make(chan os.Signal, 1)
129 signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
130
131 logger := svcutil.ConfigLogger(cctx, os.Stdout).With("system", "rainbow")
132
133 // Enable OTLP HTTP exporter
134 // For relevant environment variables:
135 // https://pkg.go.dev/go.opentelemetry.io/otel/exporters/otlp/otlptrace#readme-environment-variables
136 if cctx.Bool("enable-otel-otlp") {
137 ep := cctx.String("otel-otlp-endpoint")
138 logger.Info("setting up trace exporter", "endpoint", ep)
139 ctx, cancel := context.WithCancel(context.Background())
140 defer cancel()
141
142 exp, err := otlptracehttp.New(ctx)
143 if err != nil {
144 logger.Error("failed to create trace exporter", "error", err)
145 os.Exit(1)
146 }
147 defer func() {
148 ctx, cancel := context.WithTimeout(context.Background(), time.Second)
149 defer cancel()
150 if err := exp.Shutdown(ctx); err != nil {
151 logger.Error("failed to shutdown trace exporter", "error", err)
152 }
153 }()
154
155 env := cctx.String("env")
156 tp := tracesdk.NewTracerProvider(
157 tracesdk.WithBatcher(exp),
158 tracesdk.WithResource(resource.NewWithAttributes(
159 semconv.SchemaURL,
160 semconv.ServiceNameKey.String("splitter"),
161 attribute.String("env", env), // DataDog
162 attribute.String("environment", env), // Others
163 attribute.Int64("ID", 1),
164 )),
165 )
166 otel.SetTracerProvider(tp)
167 }
168
169 persistPath := cctx.String("persist-db")
170 upstreamHost := cctx.String("upstream-host")
171 collectionDirHost := cctx.String("collectiondir-host")
172 nextCrawlers := cctx.StringSlice("next-crawler")
173
174 var spl *splitter.Splitter
175 var err error
176 if persistPath != "" {
177 logger.Info("building splitter with storage at", "path", persistPath)
178 ppopts := pebblepersist.PebblePersistOptions{
179 DbPath: persistPath,
180 PersistDuration: time.Duration(float64(time.Hour) * cctx.Float64("persist-hours")),
181 GCPeriod: 5 * time.Minute,
182 MaxBytes: uint64(cctx.Int64("persist-bytes")),
183 }
184 conf := splitter.SplitterConfig{
185 UpstreamHost: upstreamHost,
186 CollectionDirHost: collectionDirHost,
187 CursorFile: cctx.String("cursor-file"),
188 PebbleOptions: &ppopts,
189 UserAgent: fmt.Sprintf("rainbow/%s (atproto-relay)", versioninfo.Short()),
190 }
191 spl, err = splitter.NewSplitter(conf, nextCrawlers)
192 } else {
193 logger.Info("building in-memory splitter")
194 conf := splitter.SplitterConfig{
195 UpstreamHost: upstreamHost,
196 CollectionDirHost: collectionDirHost,
197 CursorFile: cctx.String("cursor-file"),
198 }
199 spl, err = splitter.NewSplitter(conf, nextCrawlers)
200 }
201 if err != nil {
202 logger.Error("failed to create splitter", "path", persistPath, "error", err)
203 os.Exit(1)
204 return err
205 }
206
207 // set up metrics endpoint
208 metricsListen := cctx.String("metrics-listen")
209 go func() {
210 if err := spl.StartMetrics(metricsListen); err != nil {
211 logger.Error("failed to start metrics endpoint", "err", err)
212 os.Exit(1)
213 }
214 }()
215
216 runErr := make(chan error, 1)
217
218 go func() {
219 err := spl.StartAPI(cctx.String("api-listen"))
220 runErr <- err
221 }()
222
223 logger.Info("startup complete")
224 select {
225 case <-signals:
226 logger.Info("received shutdown signal")
227 if err := spl.Shutdown(); err != nil {
228 logger.Error("error during Splitter shutdown", "err", err)
229 }
230 case err := <-runErr:
231 if err != nil {
232 logger.Error("error during Splitter startup", "err", err)
233 }
234 logger.Info("shutting down")
235 if err := spl.Shutdown(); err != nil {
236 logger.Error("error during Splitter shutdown", "err", err)
237 }
238 }
239
240 logger.Info("shutdown complete")
241
242 return nil
243}