cmd/rainbow/README.md  +3 -3
···
 - retains upstream firehose "sequence numbers"
 - does not validate events (signatures, repo tree, hashes, etc), just passes through
 - does not archive or mirror individual records or entire repositories (or implement related API endpoints)
-- disk I/O intensive: fast NVMe disks are recommended, and RAM is helpful for caching
+- somewhat disk I/O intensive: fast NVMe disks are recommended, and RAM is helpful for caching
 - single golang binary for easy deployment
-- observability: logging, prometheus metrics, OTEL traces
+- observability: logging, prometheus metrics, and OTEL traces

 ## Running

···
 From the top level of this repo, you can build:

 ```shell
-go build ./cmd/rainbow -o rainbow-bin
+go build -o rainbow ./cmd/rainbow
 ```

 or just run it, and see configuration options:
cmd/rainbow/main.go  +62 -50
···
 import (
 	"context"
 	"log/slog"
-	_ "net/http/pprof"
 	"os"
 	"os/signal"
 	"syscall"
 	"time"

+	_ "github.com/joho/godotenv/autoload"
+	_ "go.uber.org/automaxprocs"
+	_ "net/http/pprof"
+
 	"github.com/bluesky-social/indigo/events/pebblepersist"
 	"github.com/bluesky-social/indigo/splitter"
+	"github.com/bluesky-social/indigo/util/svcutil"

 	"github.com/carlmjohnson/versioninfo"
-	_ "github.com/joho/godotenv/autoload"
 	"github.com/urfave/cli/v2"
 	"go.opentelemetry.io/otel"
 	"go.opentelemetry.io/otel/attribute"
···
 	"go.opentelemetry.io/otel/sdk/resource"
 	tracesdk "go.opentelemetry.io/otel/sdk/trace"
 	semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
-	_ "go.uber.org/automaxprocs"
 )

-var log = slog.Default().With("system", "rainbow")
-
-func init() {
-	// control log level using, eg, GOLOG_LOG_LEVEL=debug
-	//logging.SetAllLoggers(logging.LevelDebug)
-}
-
 func main() {
-	run(os.Args)
+	if err := run(os.Args); err != nil {
+		slog.Error("exiting", "err", err)
+		os.Exit(1)
+	}
 }

-func run(args []string) {
+func run(args []string) error {
+
 	app := cli.App{
 		Name:    "rainbow",
 		Usage:   "atproto firehose fan-out daemon",
 		Version: versioninfo.Short(),
+		Action:  runSplitter,
 	}

 	app.Flags = []cli.Flag{
-		// TODO: unimplemented, always assumes https:// and wss://
-		//&cli.BoolFlag{
-		//	Name:    "crawl-insecure-ws",
-		//	Usage:   "when connecting to PDS instances, use ws:// instead of wss://",
-		//	EnvVars: []string{"RAINBOW_INSECURE_CRAWL"},
-		//},
 		&cli.StringFlag{
-			Name:    "splitter-host",
+			Name:    "log-level",
+			Usage:   "log verbosity level (eg: warn, info, debug)",
+			EnvVars: []string{"RAINBOW_LOG_LEVEL", "GO_LOG_LEVEL", "LOG_LEVEL"},
+		},
+		&cli.StringFlag{
+			Name:    "upstream-host",
 			Value:   "bsky.network",
+			Usage:   "simple hostname (no URI scheme) of the upstream host (eg, relay)",
 			EnvVars: []string{"ATP_RELAY_HOST", "RAINBOW_RELAY_HOST"},
 		},
 		&cli.StringFlag{
···
 		&cli.StringSliceFlag{
 			Name:    "next-crawler",
 			Usage:   "forward POST requestCrawl to this url, should be machine root url and not xrpc/requestCrawl, comma separated list",
-			EnvVars: []string{"RELAY_NEXT_CRAWLER"},
+			EnvVars: []string{"RAINBOW_NEXT_CRAWLER", "RELAY_NEXT_CRAWLER"},
+		},
+		&cli.StringFlag{
+			Name:    "env",
+			Usage:   "operating environment (eg, 'prod', 'test')",
+			Value:   "dev",
+			EnvVars: []string{"ENVIRONMENT"},
+		},
+		&cli.BoolFlag{
+			Name:  "enable-otel-otlp",
+			Usage: "enables OTEL OTLP exporter endpoint",
+		},
+		&cli.StringFlag{
+			Name:    "otel-otlp-endpoint",
+			Usage:   "OTEL traces export endpoint",
+			Value:   "http://localhost:4318",
+			EnvVars: []string{"OTEL_EXPORTER_OTLP_ENDPOINT"},
 		},
 	}

-	// TODO: slog.SetDefault and set module `var log *slog.Logger` based on flags and env
-
-	app.Action = Splitter
-	err := app.Run(os.Args)
-	if err != nil {
-		log.Error(err.Error())
-		os.Exit(1)
-	}
+	return app.Run(args)
 }

-func Splitter(cctx *cli.Context) error {
+func runSplitter(cctx *cli.Context) error {
 	// Trap SIGINT to trigger a shutdown.
 	signals := make(chan os.Signal, 1)
 	signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)

+	logger := svcutil.ConfigLogger(cctx, os.Stdout).With("system", "rainbow")
+
 	// Enable OTLP HTTP exporter
 	// For relevant environment variables:
 	// https://pkg.go.dev/go.opentelemetry.io/otel/exporters/otlp/otlptrace#readme-environment-variables
-	// At a minimum, you need to set
-	// OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4318
-	if ep := os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT"); ep != "" {
-		log.Info("setting up trace exporter", "endpoint", ep)
+	if cctx.Bool("enable-otel-otlp") {
+		ep := cctx.String("otel-otlp-endpoint")
+		logger.Info("setting up trace exporter", "endpoint", ep)
 		ctx, cancel := context.WithCancel(context.Background())
 		defer cancel()

 		exp, err := otlptracehttp.New(ctx)
 		if err != nil {
-			log.Error("failed to create trace exporter", "error", err)
+			logger.Error("failed to create trace exporter", "error", err)
 			os.Exit(1)
 		}
 		defer func() {
 			ctx, cancel := context.WithTimeout(context.Background(), time.Second)
 			defer cancel()
 			if err := exp.Shutdown(ctx); err != nil {
-				log.Error("failed to shutdown trace exporter", "error", err)
+				logger.Error("failed to shutdown trace exporter", "error", err)
 			}
 		}()

+		env := cctx.String("env")
 		tp := tracesdk.NewTracerProvider(
 			tracesdk.WithBatcher(exp),
 			tracesdk.WithResource(resource.NewWithAttributes(
 				semconv.SchemaURL,
 				semconv.ServiceNameKey.String("splitter"),
-				attribute.String("env", os.Getenv("ENVIRONMENT")),         // DataDog
-				attribute.String("environment", os.Getenv("ENVIRONMENT")), // Others
+				attribute.String("env", env),         // DataDog
+				attribute.String("environment", env), // Others
 				attribute.Int64("ID", 1),
 			)),
 		)
···
 	}

 	persistPath := cctx.String("persist-db")
-	upstreamHost := cctx.String("splitter-host")
+	upstreamHost := cctx.String("upstream-host")
 	nextCrawlers := cctx.StringSlice("next-crawler")

 	var spl *splitter.Splitter
 	var err error
 	if persistPath != "" {
-		log.Info("building splitter with storage at", "path", persistPath)
+		logger.Info("building splitter with storage at", "path", persistPath)
 		ppopts := pebblepersist.PebblePersistOptions{
 			DbPath:          persistPath,
 			PersistDuration: time.Duration(float64(time.Hour) * cctx.Float64("persist-hours")),
···
 		}
 		spl, err = splitter.NewSplitter(conf, nextCrawlers)
 	} else {
-		log.Info("building in-memory splitter")
+		logger.Info("building in-memory splitter")
 		conf := splitter.SplitterConfig{
 			UpstreamHost: upstreamHost,
 			CursorFile:   cctx.String("cursor-file"),
···
 		spl, err = splitter.NewSplitter(conf, nextCrawlers)
 	}
 	if err != nil {
-		log.Error("failed to create splitter", "path", persistPath, "error", err)
+		logger.Error("failed to create splitter", "path", persistPath, "error", err)
 		os.Exit(1)
 		return err
 	}

 	// set up metrics endpoint
+	metricsListen := cctx.String("metrics-listen")
 	go func() {
-		if err := spl.StartMetrics(cctx.String("metrics-listen")); err != nil {
-			log.Error("failed to start metrics endpoint", "err", err)
+		if err := spl.StartMetrics(metricsListen); err != nil {
+			logger.Error("failed to start metrics endpoint", "err", err)
 			os.Exit(1)
 		}
 	}()
···
 		runErr <- err
 	}()

-	log.Info("startup complete")
+	logger.Info("startup complete")
 	select {
 	case <-signals:
-		log.Info("received shutdown signal")
+		logger.Info("received shutdown signal")
 		if err := spl.Shutdown(); err != nil {
-			log.Error("error during Splitter shutdown", "err", err)
+			logger.Error("error during Splitter shutdown", "err", err)
 		}
 	case err := <-runErr:
 		if err != nil {
-			log.Error("error during Splitter startup", "err", err)
+			logger.Error("error during Splitter startup", "err", err)
 		}
-		log.Info("shutting down")
+		logger.Info("shutting down")
 		if err := spl.Shutdown(); err != nil {
-			log.Error("error during Splitter shutdown", "err", err)
+			logger.Error("error during Splitter shutdown", "err", err)
 		}
 	}

-	log.Info("shutdown complete")
+	logger.Info("shutdown complete")

 	return nil
 }
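The OTLP setup above only reads the endpoint indirectly: `otlptracehttp.New(ctx)` is constructed without options, so the exporter resolves its target from the `OTEL_EXPORTER_OTLP_ENDPOINT` environment variable (which also backs the new `--otel-otlp-endpoint` flag), while the flag's resolved value is only logged. As a minimal sketch, assuming the pinned otlptracehttp version exposes `WithEndpointURL`, the value could also be handed to the exporter directly; `newOTLPExporter` is a hypothetical helper, not part of this change:

```go
package main

import (
	"context"

	"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
)

// newOTLPExporter builds the OTLP HTTP trace exporter from an explicit endpoint
// URL (eg, the resolved --otel-otlp-endpoint flag value) instead of relying on
// the exporter's own environment lookup. WithEndpointURL is assumed to be
// available in the OpenTelemetry SDK version pinned by this repo; older SDKs
// would need WithEndpoint("host:port") plus WithInsecure for plain-http targets.
func newOTLPExporter(ctx context.Context, endpointURL string) (*otlptrace.Exporter, error) {
	return otlptracehttp.New(ctx, otlptracehttp.WithEndpointURL(endpointURL))
}
```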
util/svcutil/logger.go  +30
+package svcutil
+
+import (
+	"io"
+	"log/slog"
+	"strings"
+
+	"github.com/urfave/cli/v2"
+)
+
+func ConfigLogger(cctx *cli.Context, writer io.Writer) *slog.Logger {
+	var level slog.Level
+	switch strings.ToLower(cctx.String("log-level")) {
+	case "error":
+		level = slog.LevelError
+	case "warn":
+		level = slog.LevelWarn
+	case "info":
+		level = slog.LevelInfo
+	case "debug":
+		level = slog.LevelDebug
+	default:
+		level = slog.LevelInfo
+	}
+	logger := slog.New(slog.NewJSONHandler(writer, &slog.HandlerOptions{
+		Level: level,
+	}))
+	slog.SetDefault(logger)
+	return logger
+}
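`svcutil.ConfigLogger` maps the `log-level` string flag (case-insensitively) to a slog level, falling back to info, installs a JSON handler on the given writer, and sets the result as the process-wide default logger. A minimal usage sketch for reuse from another command follows; the `example` app is hypothetical and simply mirrors the flag wiring rainbow now uses:

```go
package main

import (
	"os"

	"github.com/bluesky-social/indigo/util/svcutil"
	"github.com/urfave/cli/v2"
)

func main() {
	app := &cli.App{
		Name: "example",
		// ConfigLogger reads the "log-level" flag off the cli context, so the
		// command must define a string flag with exactly that name.
		Flags: []cli.Flag{
			&cli.StringFlag{
				Name:    "log-level",
				Usage:   "log verbosity level (eg: warn, info, debug)",
				EnvVars: []string{"LOG_LEVEL"},
			},
		},
		Action: func(cctx *cli.Context) error {
			// Returns the configured *slog.Logger and also sets it as slog's default.
			logger := svcutil.ConfigLogger(cctx, os.Stdout).With("system", "example")
			logger.Info("logger configured", "level", cctx.String("log-level"))
			return nil
		},
	}
	if err := app.Run(os.Args); err != nil {
		os.Exit(1)
	}
}
```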