1package main
2
3import (
4 "context"
5 "fmt"
6 "log"
7 "log/slog"
8 "net/http"
9 "net/url"
10 "os"
11 "os/signal"
12 "sync"
13 "syscall"
14 "time"
15
16 "github.com/bluesky-social/indigo/events"
17 "github.com/bluesky-social/indigo/events/schedulers/sequential"
18 "github.com/gorilla/websocket"
19 "github.com/prometheus/client_golang/prometheus/promhttp"
20 _ "go.uber.org/automaxprocs"
21
22 "github.com/carlmjohnson/versioninfo"
23 "github.com/urfave/cli/v2"
24)
25
26func main() {
27 app := cli.App{
28 Name: "sonar",
29 Usage: "atproto firehose monitoring tool",
30 Version: versioninfo.Short(),
31 }
32
33 app.Flags = []cli.Flag{
34 &cli.StringFlag{
35 Name: "ws-url",
36 Usage: "full websocket path to the ATProto SubscribeRepos XRPC endpoint",
37 Value: "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos",
38 EnvVars: []string{"SONAR_WS_URL"},
39 },
40 &cli.StringFlag{
41 Name: "log-level",
42 Usage: "log level",
43 Value: "info",
44 EnvVars: []string{"SONAR_LOG_LEVEL"},
45 },
46 &cli.IntFlag{
47 Name: "port",
48 Usage: "listen port for metrics server",
49 Value: 8345,
50 EnvVars: []string{"SONAR_PORT"},
51 },
52 &cli.IntFlag{
53 Name: "max-queue-size",
54 Usage: "max number of events to queue",
55 Value: 10,
56 },
57 &cli.StringFlag{
58 Name: "cursor-file",
59 Usage: "path to cursor file",
60 Value: "sonar_cursor.json",
61 EnvVars: []string{"SONAR_CURSOR_FILE"},
62 },
63 }
64
65 app.Action = runSonar
66
67 err := app.Run(os.Args)
68 if err != nil {
69 log.Fatal(err)
70 }
71}
72
73func runSonar(cctx *cli.Context) error {
74 ctx := cctx.Context
75 ctx, cancel := context.WithCancel(ctx)
76 defer cancel()
77
78 // Trap SIGINT to trigger a shutdown.
79 signals := make(chan os.Signal, 1)
80 signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
81 logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
82 Level: slog.LevelInfo,
83 }))
84 defer func() {
85 logger.Info("main function teardown")
86 }()
87
88 logger = logger.With("source", "sonar_main")
89 logger.Info("starting sonar")
90
91 u, err := url.Parse(cctx.String("ws-url"))
92 if err != nil {
93 log.Fatalf("failed to parse ws-url: %+v", err)
94 }
95
96 s, err := NewSonar(logger, cctx.String("cursor-file"), u.String())
97 if err != nil {
98 log.Fatalf("failed to create sonar: %+v", err)
99 }
100
101 wg := sync.WaitGroup{}
102
103 pool := sequential.NewScheduler(u.Host, s.HandleStreamEvent)
104
105 // Start a goroutine to manage the cursor file, saving the current cursor every 5 seconds.
106 wg.Add(1)
107 go func() {
108 defer wg.Done()
109 ticker := time.NewTicker(5 * time.Second)
110 logger := logger.With("source", "cursor_file_manager")
111
112 for {
113 select {
114 case <-ctx.Done():
115 logger.Info("shutting down cursor file manager")
116 err := s.WriteCursorFile()
117 if err != nil {
118 logger.Error("failed to write cursor file", "err", err)
119 }
120 logger.Info("cursor file manager shut down successfully")
121 return
122 case <-ticker.C:
123 err := s.WriteCursorFile()
124 if err != nil {
125 logger.Error("failed to write cursor file", "err", err)
126 }
127 }
128 }
129 }()
130
131 // Start a goroutine to manage the liveness checker, shutting down if no events are received for 15 seconds
132 wg.Add(1)
133 go func() {
134 defer wg.Done()
135 ticker := time.NewTicker(15 * time.Second)
136 lastSeq := int64(0)
137
138 logger = logger.With("source", "liveness_checker")
139
140 for {
141 select {
142 case <-ctx.Done():
143 logger.Info("shutting down liveness checker")
144 return
145 case <-ticker.C:
146 s.ProgMux.Lock()
147 seq := s.Progress.LastSeq
148 s.ProgMux.Unlock()
149 if seq <= lastSeq {
150 logger.Error("no new events in last 15 seconds, shutting down for docker to restart me")
151 cancel()
152 } else {
153 logger.Info("last event sequence", "seq", seq)
154 lastSeq = seq
155 }
156 }
157 }
158 }()
159
160 mux := http.NewServeMux()
161 mux.Handle("/metrics", promhttp.Handler())
162
163 metricServer := &http.Server{
164 Addr: fmt.Sprintf(":%d", cctx.Int("port")),
165 Handler: mux,
166 }
167
168 // Startup metrics server
169 wg.Add(1)
170 go func() {
171 defer wg.Done()
172 logger = logger.With("source", "metrics_server")
173
174 logger.Info("metrics server listening", "port", cctx.Int("port"))
175
176 if err := metricServer.ListenAndServe(); err != http.ErrServerClosed {
177 log.Fatalf("failed to start metrics server: %+v", err)
178 }
179 logger.Info("metrics server shut down successfully")
180 }()
181
182 if s.Progress.LastSeq >= 0 {
183 u.RawQuery = fmt.Sprintf("cursor=%d", s.Progress.LastSeq)
184 }
185
186 logger.Info("connecting to WebSocket", "url", u.String())
187 c, _, err := websocket.DefaultDialer.Dial(u.String(), http.Header{
188 "User-Agent": []string{"sonar/1.1"},
189 })
190 if err != nil {
191 logger.Info("failed to connect to websocket", "err", err)
192 return err
193 }
194 defer c.Close()
195
196 wg.Add(1)
197 go func() {
198 defer wg.Done()
199 err = events.HandleRepoStream(ctx, c, pool, logger)
200 logger.Info("HandleRepoStream returned unexpectedly", "err", err)
201 cancel()
202 }()
203
204 select {
205 case <-signals:
206 cancel()
207 fmt.Println("shutting down on signal")
208 case <-ctx.Done():
209 fmt.Println("shutting down on context done")
210 }
211
212 logger.Info("shutting down, waiting for workers to clean up")
213
214 if err := metricServer.Shutdown(ctx); err != nil {
215 logger.Error("failed to shut down metrics server", "err", err)
216 wg.Done()
217 }
218
219 wg.Wait()
220 logger.Info("shut down successfully")
221
222 return nil
223}