1package main
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "io"
8 "log/slog"
9 _ "net/http/pprof"
10 "os"
11 "runtime"
12 "strings"
13
14 "github.com/bluesky-social/indigo/atproto/identity/apidir"
15 "github.com/bluesky-social/indigo/atproto/syntax"
16
17 "github.com/carlmjohnson/versioninfo"
18 _ "github.com/joho/godotenv/autoload"
19 "github.com/urfave/cli/v2"
20)
21
22func main() {
23 if err := run(os.Args); err != nil {
24 slog.Error("exiting", "err", err)
25 os.Exit(-1)
26 }
27}
28
29func run(args []string) error {
30
31 app := cli.App{
32 Name: "bluepages",
33 Usage: "atproto identity directory",
34 Version: versioninfo.Short(),
35 Flags: []cli.Flag{
36 &cli.StringFlag{
37 Name: "atp-relay-host",
38 Usage: "hostname and port of Relay to subscribe to",
39 Value: "wss://bsky.network",
40 EnvVars: []string{"ATP_RELAY_HOST", "ATP_BGS_HOST"},
41 },
42 &cli.StringFlag{
43 Name: "atp-plc-host",
44 Usage: "method, hostname, and port of PLC registry",
45 Value: "https://plc.directory",
46 EnvVars: []string{"ATP_PLC_HOST"},
47 },
48 &cli.IntFlag{
49 Name: "plc-rate-limit",
50 Usage: "max number of requests per second to PLC registry",
51 Value: 300,
52 EnvVars: []string{"BLUEPAGES_PLC_RATE_LIMIT"},
53 },
54 &cli.StringFlag{
55 Name: "redis-url",
56 Usage: "redis connection URL: redis://<user>:<pass>@<hostname>:6379/<db>",
57 Value: "redis://localhost:6379/0",
58 EnvVars: []string{"BLUEPAGES_REDIS_URL"},
59 },
60 &cli.StringFlag{
61 Name: "log-level",
62 Usage: "log verbosity level (eg: warn, info, debug)",
63 EnvVars: []string{"BLUEPAGES_LOG_LEVEL", "GO_LOG_LEVEL", "LOG_LEVEL"},
64 },
65 },
66 Commands: []*cli.Command{
67 &cli.Command{
68 Name: "serve",
69 Usage: "run the bluepages API daemon",
70 Action: runServeCmd,
71 Flags: []cli.Flag{
72 &cli.StringFlag{
73 Name: "bind",
74 Usage: "Specify the local IP/port to bind to",
75 Required: false,
76 Value: ":6600",
77 EnvVars: []string{"BLUEPAGES_BIND"},
78 },
79 &cli.StringFlag{
80 Name: "metrics-listen",
81 Usage: "IP or address, and port, to listen on for metrics APIs",
82 Value: ":3989",
83 EnvVars: []string{"BLUEPAGES_METRICS_LISTEN"},
84 },
85 &cli.BoolFlag{
86 Name: "disable-firehose-consumer",
87 Usage: "don't consume #identity events from firehose",
88 EnvVars: []string{"BLUEPAGES_DISABLE_FIREHOSE_CONSUMER"},
89 },
90 &cli.BoolFlag{
91 Name: "disable-refresh",
92 Usage: "disable the refreshIdentity API endpoint",
93 EnvVars: []string{"BLUEPAGES_DISABLE_REFRESH"},
94 },
95 &cli.IntFlag{
96 Name: "firehose-parallelism",
97 Usage: "number of concurrent firehose workers",
98 Value: 4,
99 EnvVars: []string{"BLUEPAGES_FIREHOSE_PARALLELISM"},
100 },
101 },
102 },
103 &cli.Command{
104 Name: "resolve-handle",
105 ArgsUsage: `<handle>`,
106 Usage: "query service for handle resoltion",
107 Action: runResolveHandleCmd,
108 Flags: []cli.Flag{
109 &cli.StringFlag{
110 Name: "host",
111 Usage: "bluepages server to send request to",
112 Value: "http://localhost:6600",
113 EnvVars: []string{"BLUEPAGES_HOST"},
114 },
115 },
116 },
117 &cli.Command{
118 Name: "resolve-did",
119 ArgsUsage: `<did>`,
120 Usage: "query service for DID document resoltion",
121 Action: runResolveDIDCmd,
122 Flags: []cli.Flag{
123 &cli.StringFlag{
124 Name: "host",
125 Usage: "bluepages server to send request to",
126 Value: "http://localhost:6600",
127 EnvVars: []string{"BLUEPAGES_HOST"},
128 },
129 },
130 },
131 &cli.Command{
132 Name: "lookup",
133 ArgsUsage: `<at-identifier>`,
134 Usage: "query service for identity resoltion",
135 Action: runLookupCmd,
136 Flags: []cli.Flag{
137 &cli.StringFlag{
138 Name: "host",
139 Usage: "bluepages server to send request to",
140 Value: "http://localhost:6600",
141 EnvVars: []string{"BLUEPAGES_HOST"},
142 },
143 },
144 },
145 &cli.Command{
146 Name: "refresh",
147 ArgsUsage: `<at-identifier>`,
148 Usage: "ask service to refresh identity",
149 Action: runRefreshCmd,
150 Flags: []cli.Flag{
151 &cli.StringFlag{
152 Name: "host",
153 Usage: "bluepages server to send request to",
154 Value: "http://localhost:6600",
155 EnvVars: []string{"BLUEPAGES_HOST"},
156 },
157 },
158 },
159 },
160 }
161
162 return app.Run(args)
163}
164
165func configLogger(cctx *cli.Context, writer io.Writer) *slog.Logger {
166 var level slog.Level
167 switch strings.ToLower(cctx.String("log-level")) {
168 case "error":
169 level = slog.LevelError
170 case "warn":
171 level = slog.LevelWarn
172 case "info":
173 level = slog.LevelInfo
174 case "debug":
175 level = slog.LevelDebug
176 default:
177 level = slog.LevelInfo
178 }
179 logger := slog.New(slog.NewJSONHandler(writer, &slog.HandlerOptions{
180 Level: level,
181 }))
182 slog.SetDefault(logger)
183 return logger
184}
185
186func configClient(cctx *cli.Context) apidir.APIDirectory {
187 return apidir.NewAPIDirectory(cctx.String("host"))
188}
189
190func runServeCmd(cctx *cli.Context) error {
191 logger := configLogger(cctx, os.Stdout)
192 ctx := context.Background()
193
194 srv, err := NewServer(
195 Config{
196 Logger: logger,
197 Bind: cctx.String("bind"),
198 RedisURL: cctx.String("redis-url"),
199 PLCHost: cctx.String("atp-plc-host"),
200 PLCRateLimit: cctx.Int("plc-rate-limit"),
201 DisableRefresh: cctx.Bool("disable-refresh"),
202 },
203 )
204 if err != nil {
205 return fmt.Errorf("failed to construct server: %v", err)
206 }
207
208 if !cctx.Bool("disable-firehose-consumer") {
209 go func() {
210 firehoseHost := cctx.String("atp-relay-host")
211 firehoseParallelism := cctx.Int("firehose-parallelism")
212 if err := srv.RunFirehoseConsumer(ctx, firehoseHost, firehoseParallelism); err != nil {
213 slog.Error("firehose consumer thread failed", "err", err)
214 // NOTE: not crashing or halting process here
215 }
216 }()
217 go func() {
218 if err := srv.RunPersistCursor(ctx); err != nil {
219 slog.Error("firehose persist thread failed", "err", err)
220 // NOTE: not crashing or halting process here
221 }
222 }()
223 }
224
225 // prometheus HTTP endpoint: /metrics
226 go func() {
227 // TODO: what is this tuning for? just cargo-culted it
228 runtime.SetBlockProfileRate(10)
229 runtime.SetMutexProfileFraction(10)
230 if err := srv.RunMetrics(cctx.String("metrics-listen")); err != nil {
231 slog.Error("failed to start metrics endpoint", "error", err)
232 // NOTE: not crashing or halting process here
233 }
234 }()
235
236 return srv.RunAPI()
237}
238
239func runResolveHandleCmd(cctx *cli.Context) error {
240 ctx := context.Background()
241 dir := configClient(cctx)
242
243 s := cctx.Args().First()
244 if s == "" {
245 return fmt.Errorf("need to provide identifier for resolution")
246 }
247 handle, err := syntax.ParseHandle(s)
248 if err != nil {
249 return err
250 }
251
252 did, err := dir.ResolveHandle(ctx, handle)
253 if err != nil {
254 return err
255 }
256 fmt.Println(did.String())
257 return nil
258}
259
260func runResolveDIDCmd(cctx *cli.Context) error {
261 ctx := context.Background()
262 dir := configClient(cctx)
263
264 s := cctx.Args().First()
265 if s == "" {
266 return fmt.Errorf("need to provide identifier for resolution")
267 }
268 did, err := syntax.ParseDID(s)
269 if err != nil {
270 return err
271 }
272
273 raw, err := dir.ResolveDIDRaw(ctx, did)
274 if err != nil {
275 return err
276 }
277 b, err := json.MarshalIndent(raw, "", " ")
278 if err != nil {
279 return err
280 }
281 fmt.Println(string(b))
282 return nil
283}
284
285func runLookupCmd(cctx *cli.Context) error {
286 ctx := context.Background()
287 dir := configClient(cctx)
288
289 s := cctx.Args().First()
290 if s == "" {
291 return fmt.Errorf("need to provide identifier for resolution")
292 }
293 atid, err := syntax.ParseAtIdentifier(s)
294 if err != nil {
295 return err
296 }
297
298 ident, err := dir.Lookup(ctx, *atid)
299 if err != nil {
300 return err
301 }
302
303 b, err := json.MarshalIndent(ident, "", " ")
304 if err != nil {
305 return err
306 }
307 fmt.Println(string(b))
308 return nil
309}
310
311func runRefreshCmd(cctx *cli.Context) error {
312 ctx := context.Background()
313 dir := configClient(cctx)
314
315 s := cctx.Args().First()
316 if s == "" {
317 return fmt.Errorf("need to provide identifier for resolution")
318 }
319 atid, err := syntax.ParseAtIdentifier(s)
320 if err != nil {
321 return err
322 }
323
324 err = dir.Purge(ctx, *atid)
325 if err != nil {
326 return err
327 }
328
329 ident, err := dir.Lookup(ctx, *atid)
330 if err != nil {
331 return err
332 }
333
334 b, err := json.MarshalIndent(ident, "", " ")
335 if err != nil {
336 return err
337 }
338 fmt.Println(string(b))
339 return nil
340}