porting all github actions from bluesky-social/indigo to tangled CI
at main 5.2 kB view raw
1package main 2 3import ( 4 "context" 5 "fmt" 6 "log" 7 "log/slog" 8 "net/http" 9 "net/url" 10 "os" 11 "os/signal" 12 "sync" 13 "syscall" 14 "time" 15 16 "github.com/bluesky-social/indigo/events" 17 "github.com/bluesky-social/indigo/events/schedulers/sequential" 18 "github.com/gorilla/websocket" 19 "github.com/prometheus/client_golang/prometheus/promhttp" 20 _ "go.uber.org/automaxprocs" 21 22 "github.com/carlmjohnson/versioninfo" 23 "github.com/urfave/cli/v2" 24) 25 26func main() { 27 app := cli.App{ 28 Name: "sonar", 29 Usage: "atproto firehose monitoring tool", 30 Version: versioninfo.Short(), 31 } 32 33 app.Flags = []cli.Flag{ 34 &cli.StringFlag{ 35 Name: "ws-url", 36 Usage: "full websocket path to the ATProto SubscribeRepos XRPC endpoint", 37 Value: "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos", 38 EnvVars: []string{"SONAR_WS_URL"}, 39 }, 40 &cli.StringFlag{ 41 Name: "log-level", 42 Usage: "log level", 43 Value: "info", 44 EnvVars: []string{"SONAR_LOG_LEVEL"}, 45 }, 46 &cli.IntFlag{ 47 Name: "port", 48 Usage: "listen port for metrics server", 49 Value: 8345, 50 EnvVars: []string{"SONAR_PORT"}, 51 }, 52 &cli.IntFlag{ 53 Name: "max-queue-size", 54 Usage: "max number of events to queue", 55 Value: 10, 56 }, 57 &cli.StringFlag{ 58 Name: "cursor-file", 59 Usage: "path to cursor file", 60 Value: "sonar_cursor.json", 61 EnvVars: []string{"SONAR_CURSOR_FILE"}, 62 }, 63 } 64 65 app.Action = runSonar 66 67 err := app.Run(os.Args) 68 if err != nil { 69 log.Fatal(err) 70 } 71} 72 73func runSonar(cctx *cli.Context) error { 74 ctx := cctx.Context 75 ctx, cancel := context.WithCancel(ctx) 76 defer cancel() 77 78 // Trap SIGINT to trigger a shutdown. 79 signals := make(chan os.Signal, 1) 80 signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM) 81 logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ 82 Level: slog.LevelInfo, 83 })) 84 defer func() { 85 logger.Info("main function teardown") 86 }() 87 88 logger = logger.With("source", "sonar_main") 89 logger.Info("starting sonar") 90 91 u, err := url.Parse(cctx.String("ws-url")) 92 if err != nil { 93 log.Fatalf("failed to parse ws-url: %+v", err) 94 } 95 96 s, err := NewSonar(logger, cctx.String("cursor-file"), u.String()) 97 if err != nil { 98 log.Fatalf("failed to create sonar: %+v", err) 99 } 100 101 wg := sync.WaitGroup{} 102 103 pool := sequential.NewScheduler(u.Host, s.HandleStreamEvent) 104 105 // Start a goroutine to manage the cursor file, saving the current cursor every 5 seconds. 106 wg.Add(1) 107 go func() { 108 defer wg.Done() 109 ticker := time.NewTicker(5 * time.Second) 110 logger := logger.With("source", "cursor_file_manager") 111 112 for { 113 select { 114 case <-ctx.Done(): 115 logger.Info("shutting down cursor file manager") 116 err := s.WriteCursorFile() 117 if err != nil { 118 logger.Error("failed to write cursor file", "err", err) 119 } 120 logger.Info("cursor file manager shut down successfully") 121 return 122 case <-ticker.C: 123 err := s.WriteCursorFile() 124 if err != nil { 125 logger.Error("failed to write cursor file", "err", err) 126 } 127 } 128 } 129 }() 130 131 // Start a goroutine to manage the liveness checker, shutting down if no events are received for 15 seconds 132 wg.Add(1) 133 go func() { 134 defer wg.Done() 135 ticker := time.NewTicker(15 * time.Second) 136 lastSeq := int64(0) 137 138 logger = logger.With("source", "liveness_checker") 139 140 for { 141 select { 142 case <-ctx.Done(): 143 logger.Info("shutting down liveness checker") 144 return 145 case <-ticker.C: 146 s.ProgMux.Lock() 147 seq := s.Progress.LastSeq 148 s.ProgMux.Unlock() 149 if seq <= lastSeq { 150 logger.Error("no new events in last 15 seconds, shutting down for docker to restart me") 151 cancel() 152 } else { 153 logger.Info("last event sequence", "seq", seq) 154 lastSeq = seq 155 } 156 } 157 } 158 }() 159 160 mux := http.NewServeMux() 161 mux.Handle("/metrics", promhttp.Handler()) 162 163 metricServer := &http.Server{ 164 Addr: fmt.Sprintf(":%d", cctx.Int("port")), 165 Handler: mux, 166 } 167 168 // Startup metrics server 169 wg.Add(1) 170 go func() { 171 defer wg.Done() 172 logger = logger.With("source", "metrics_server") 173 174 logger.Info("metrics server listening", "port", cctx.Int("port")) 175 176 if err := metricServer.ListenAndServe(); err != http.ErrServerClosed { 177 log.Fatalf("failed to start metrics server: %+v", err) 178 } 179 logger.Info("metrics server shut down successfully") 180 }() 181 182 if s.Progress.LastSeq >= 0 { 183 u.RawQuery = fmt.Sprintf("cursor=%d", s.Progress.LastSeq) 184 } 185 186 logger.Info("connecting to WebSocket", "url", u.String()) 187 c, _, err := websocket.DefaultDialer.Dial(u.String(), http.Header{ 188 "User-Agent": []string{"sonar/1.1"}, 189 }) 190 if err != nil { 191 logger.Info("failed to connect to websocket", "err", err) 192 return err 193 } 194 defer c.Close() 195 196 wg.Add(1) 197 go func() { 198 defer wg.Done() 199 err = events.HandleRepoStream(ctx, c, pool, logger) 200 logger.Info("HandleRepoStream returned unexpectedly", "err", err) 201 cancel() 202 }() 203 204 select { 205 case <-signals: 206 cancel() 207 fmt.Println("shutting down on signal") 208 case <-ctx.Done(): 209 fmt.Println("shutting down on context done") 210 } 211 212 logger.Info("shutting down, waiting for workers to clean up") 213 214 if err := metricServer.Shutdown(ctx); err != nil { 215 logger.Error("failed to shut down metrics server", "err", err) 216 wg.Done() 217 } 218 219 wg.Wait() 220 logger.Info("shut down successfully") 221 222 return nil 223}