fork of indigo with slightly nicer lexgen
at main 8.3 kB view raw
1package main 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "io" 8 "log/slog" 9 _ "net/http/pprof" 10 "os" 11 "runtime" 12 "strings" 13 14 "github.com/bluesky-social/indigo/atproto/identity/apidir" 15 "github.com/bluesky-social/indigo/atproto/syntax" 16 17 "github.com/carlmjohnson/versioninfo" 18 _ "github.com/joho/godotenv/autoload" 19 "github.com/urfave/cli/v2" 20) 21 22func main() { 23 if err := run(os.Args); err != nil { 24 slog.Error("exiting", "err", err) 25 os.Exit(-1) 26 } 27} 28 29func run(args []string) error { 30 31 app := cli.App{ 32 Name: "bluepages", 33 Usage: "atproto identity directory", 34 Version: versioninfo.Short(), 35 Flags: []cli.Flag{ 36 &cli.StringFlag{ 37 Name: "atp-relay-host", 38 Usage: "hostname and port of Relay to subscribe to", 39 Value: "wss://bsky.network", 40 EnvVars: []string{"ATP_RELAY_HOST", "ATP_BGS_HOST"}, 41 }, 42 &cli.StringFlag{ 43 Name: "atp-plc-host", 44 Usage: "method, hostname, and port of PLC registry", 45 Value: "https://plc.directory", 46 EnvVars: []string{"ATP_PLC_HOST"}, 47 }, 48 &cli.IntFlag{ 49 Name: "plc-rate-limit", 50 Usage: "max number of requests per second to PLC registry", 51 Value: 300, 52 EnvVars: []string{"BLUEPAGES_PLC_RATE_LIMIT"}, 53 }, 54 &cli.StringFlag{ 55 Name: "redis-url", 56 Usage: "redis connection URL: redis://<user>:<pass>@<hostname>:6379/<db>", 57 Value: "redis://localhost:6379/0", 58 EnvVars: []string{"BLUEPAGES_REDIS_URL"}, 59 }, 60 &cli.StringFlag{ 61 Name: "log-level", 62 Usage: "log verbosity level (eg: warn, info, debug)", 63 EnvVars: []string{"BLUEPAGES_LOG_LEVEL", "GO_LOG_LEVEL", "LOG_LEVEL"}, 64 }, 65 }, 66 Commands: []*cli.Command{ 67 &cli.Command{ 68 Name: "serve", 69 Usage: "run the bluepages API daemon", 70 Action: runServeCmd, 71 Flags: []cli.Flag{ 72 &cli.StringFlag{ 73 Name: "bind", 74 Usage: "Specify the local IP/port to bind to", 75 Required: false, 76 Value: ":6600", 77 EnvVars: []string{"BLUEPAGES_BIND"}, 78 }, 79 &cli.StringFlag{ 80 Name: "metrics-listen", 81 Usage: "IP or address, and port, to listen on for metrics APIs", 82 Value: ":3989", 83 EnvVars: []string{"BLUEPAGES_METRICS_LISTEN"}, 84 }, 85 &cli.BoolFlag{ 86 Name: "disable-firehose-consumer", 87 Usage: "don't consume #identity events from firehose", 88 EnvVars: []string{"BLUEPAGES_DISABLE_FIREHOSE_CONSUMER"}, 89 }, 90 &cli.BoolFlag{ 91 Name: "disable-refresh", 92 Usage: "disable the refreshIdentity API endpoint", 93 EnvVars: []string{"BLUEPAGES_DISABLE_REFRESH"}, 94 }, 95 &cli.IntFlag{ 96 Name: "firehose-parallelism", 97 Usage: "number of concurrent firehose workers", 98 Value: 4, 99 EnvVars: []string{"BLUEPAGES_FIREHOSE_PARALLELISM"}, 100 }, 101 }, 102 }, 103 &cli.Command{ 104 Name: "resolve-handle", 105 ArgsUsage: `<handle>`, 106 Usage: "query service for handle resoltion", 107 Action: runResolveHandleCmd, 108 Flags: []cli.Flag{ 109 &cli.StringFlag{ 110 Name: "host", 111 Usage: "bluepages server to send request to", 112 Value: "http://localhost:6600", 113 EnvVars: []string{"BLUEPAGES_HOST"}, 114 }, 115 }, 116 }, 117 &cli.Command{ 118 Name: "resolve-did", 119 ArgsUsage: `<did>`, 120 Usage: "query service for DID document resoltion", 121 Action: runResolveDIDCmd, 122 Flags: []cli.Flag{ 123 &cli.StringFlag{ 124 Name: "host", 125 Usage: "bluepages server to send request to", 126 Value: "http://localhost:6600", 127 EnvVars: []string{"BLUEPAGES_HOST"}, 128 }, 129 }, 130 }, 131 &cli.Command{ 132 Name: "lookup", 133 ArgsUsage: `<at-identifier>`, 134 Usage: "query service for identity resoltion", 135 Action: runLookupCmd, 136 Flags: []cli.Flag{ 137 &cli.StringFlag{ 138 Name: "host", 139 Usage: "bluepages server to send request to", 140 Value: "http://localhost:6600", 141 EnvVars: []string{"BLUEPAGES_HOST"}, 142 }, 143 }, 144 }, 145 &cli.Command{ 146 Name: "refresh", 147 ArgsUsage: `<at-identifier>`, 148 Usage: "ask service to refresh identity", 149 Action: runRefreshCmd, 150 Flags: []cli.Flag{ 151 &cli.StringFlag{ 152 Name: "host", 153 Usage: "bluepages server to send request to", 154 Value: "http://localhost:6600", 155 EnvVars: []string{"BLUEPAGES_HOST"}, 156 }, 157 }, 158 }, 159 }, 160 } 161 162 return app.Run(args) 163} 164 165func configLogger(cctx *cli.Context, writer io.Writer) *slog.Logger { 166 var level slog.Level 167 switch strings.ToLower(cctx.String("log-level")) { 168 case "error": 169 level = slog.LevelError 170 case "warn": 171 level = slog.LevelWarn 172 case "info": 173 level = slog.LevelInfo 174 case "debug": 175 level = slog.LevelDebug 176 default: 177 level = slog.LevelInfo 178 } 179 logger := slog.New(slog.NewJSONHandler(writer, &slog.HandlerOptions{ 180 Level: level, 181 })) 182 slog.SetDefault(logger) 183 return logger 184} 185 186func configClient(cctx *cli.Context) apidir.APIDirectory { 187 return apidir.NewAPIDirectory(cctx.String("host")) 188} 189 190func runServeCmd(cctx *cli.Context) error { 191 logger := configLogger(cctx, os.Stdout) 192 ctx := context.Background() 193 194 srv, err := NewServer( 195 Config{ 196 Logger: logger, 197 Bind: cctx.String("bind"), 198 RedisURL: cctx.String("redis-url"), 199 PLCHost: cctx.String("atp-plc-host"), 200 PLCRateLimit: cctx.Int("plc-rate-limit"), 201 DisableRefresh: cctx.Bool("disable-refresh"), 202 }, 203 ) 204 if err != nil { 205 return fmt.Errorf("failed to construct server: %v", err) 206 } 207 208 if !cctx.Bool("disable-firehose-consumer") { 209 go func() { 210 firehoseHost := cctx.String("atp-relay-host") 211 firehoseParallelism := cctx.Int("firehose-parallelism") 212 if err := srv.RunFirehoseConsumer(ctx, firehoseHost, firehoseParallelism); err != nil { 213 slog.Error("firehose consumer thread failed", "err", err) 214 // NOTE: not crashing or halting process here 215 } 216 }() 217 go func() { 218 if err := srv.RunPersistCursor(ctx); err != nil { 219 slog.Error("firehose persist thread failed", "err", err) 220 // NOTE: not crashing or halting process here 221 } 222 }() 223 } 224 225 // prometheus HTTP endpoint: /metrics 226 go func() { 227 // TODO: what is this tuning for? just cargo-culted it 228 runtime.SetBlockProfileRate(10) 229 runtime.SetMutexProfileFraction(10) 230 if err := srv.RunMetrics(cctx.String("metrics-listen")); err != nil { 231 slog.Error("failed to start metrics endpoint", "error", err) 232 // NOTE: not crashing or halting process here 233 } 234 }() 235 236 return srv.RunAPI() 237} 238 239func runResolveHandleCmd(cctx *cli.Context) error { 240 ctx := context.Background() 241 dir := configClient(cctx) 242 243 s := cctx.Args().First() 244 if s == "" { 245 return fmt.Errorf("need to provide identifier for resolution") 246 } 247 handle, err := syntax.ParseHandle(s) 248 if err != nil { 249 return err 250 } 251 252 did, err := dir.ResolveHandle(ctx, handle) 253 if err != nil { 254 return err 255 } 256 fmt.Println(did.String()) 257 return nil 258} 259 260func runResolveDIDCmd(cctx *cli.Context) error { 261 ctx := context.Background() 262 dir := configClient(cctx) 263 264 s := cctx.Args().First() 265 if s == "" { 266 return fmt.Errorf("need to provide identifier for resolution") 267 } 268 did, err := syntax.ParseDID(s) 269 if err != nil { 270 return err 271 } 272 273 raw, err := dir.ResolveDIDRaw(ctx, did) 274 if err != nil { 275 return err 276 } 277 b, err := json.MarshalIndent(raw, "", " ") 278 if err != nil { 279 return err 280 } 281 fmt.Println(string(b)) 282 return nil 283} 284 285func runLookupCmd(cctx *cli.Context) error { 286 ctx := context.Background() 287 dir := configClient(cctx) 288 289 s := cctx.Args().First() 290 if s == "" { 291 return fmt.Errorf("need to provide identifier for resolution") 292 } 293 atid, err := syntax.ParseAtIdentifier(s) 294 if err != nil { 295 return err 296 } 297 298 ident, err := dir.Lookup(ctx, *atid) 299 if err != nil { 300 return err 301 } 302 303 b, err := json.MarshalIndent(ident, "", " ") 304 if err != nil { 305 return err 306 } 307 fmt.Println(string(b)) 308 return nil 309} 310 311func runRefreshCmd(cctx *cli.Context) error { 312 ctx := context.Background() 313 dir := configClient(cctx) 314 315 s := cctx.Args().First() 316 if s == "" { 317 return fmt.Errorf("need to provide identifier for resolution") 318 } 319 atid, err := syntax.ParseAtIdentifier(s) 320 if err != nil { 321 return err 322 } 323 324 err = dir.Purge(ctx, *atid) 325 if err != nil { 326 return err 327 } 328 329 ident, err := dir.Lookup(ctx, *atid) 330 if err != nil { 331 return err 332 } 333 334 b, err := json.MarshalIndent(ident, "", " ") 335 if err != nil { 336 return err 337 } 338 fmt.Println(string(b)) 339 return nil 340}