1package main
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "sort"
8
9 comatproto "github.com/bluesky-social/indigo/api/atproto"
10 "github.com/bluesky-social/indigo/atproto/syntax"
11 "github.com/bluesky-social/indigo/xrpc"
12
13 "github.com/urfave/cli/v2"
14)
15
16var cmdRelay = &cli.Command{
17 Name: "relay",
18 Usage: "sub-commands for relays",
19 Flags: []cli.Flag{
20 &cli.StringFlag{
21 Name: "relay-host",
22 Usage: "method, hostname, and port of Relay instance",
23 Value: "https://bsky.network",
24 EnvVars: []string{"ATP_RELAY_HOST", "RELAY_HOST"},
25 },
26 },
27 Subcommands: []*cli.Command{
28 &cli.Command{
29 Name: "account",
30 Usage: "sub-commands for accounts/repos on relay",
31 Subcommands: []*cli.Command{
32 &cli.Command{
33 Name: "list",
34 Aliases: []string{"ls"},
35 Usage: "enumerate all accounts",
36 Flags: []cli.Flag{
37 &cli.StringFlag{
38 Name: "collection",
39 Aliases: []string{"c"},
40 Usage: "collection (NSID) to match",
41 },
42 &cli.BoolFlag{
43 Name: "json",
44 Usage: "print output as JSON lines",
45 },
46 },
47 Action: runRelayAccountList,
48 },
49 &cli.Command{
50 Name: "status",
51 ArgsUsage: `<did>`,
52 Usage: "describe status of individual account",
53 Flags: []cli.Flag{
54 &cli.BoolFlag{
55 Name: "json",
56 Usage: "print output as JSON",
57 },
58 },
59 Action: runRelayAccountStatus,
60 },
61 },
62 },
63 &cli.Command{
64 Name: "host",
65 Usage: "sub-commands for upstream hosts (eg, PDS)",
66 Subcommands: []*cli.Command{
67 &cli.Command{
68 Name: "request-crawl",
69 Aliases: []string{"add"},
70 Usage: "request crawl of upstream host (eg, PDS)",
71 ArgsUsage: `<hostname>`,
72 Action: runRelayHostRequestCrawl,
73 },
74 &cli.Command{
75 Name: "list",
76 Aliases: []string{"ls"},
77 Usage: "enumerate all hosts indexed by relay",
78 Flags: []cli.Flag{
79 &cli.BoolFlag{
80 Name: "json",
81 Usage: "print output as JSON lines",
82 },
83 },
84 Action: runRelayHostList,
85 },
86 &cli.Command{
87 Name: "status",
88 ArgsUsage: `<hostname>`,
89 Usage: "describe status of individual host",
90 Flags: []cli.Flag{
91 &cli.BoolFlag{
92 Name: "json",
93 Usage: "print output as JSON",
94 },
95 },
96 Action: runRelayHostStatus,
97 },
98 &cli.Command{
99 Name: "diff",
100 Usage: "compare host set (and seq) between two relay instances",
101 ArgsUsage: `<relay-A-url> <relay-B-url>`,
102 Flags: []cli.Flag{
103 &cli.BoolFlag{
104 Name: "verbose",
105 Usage: "print all hosts",
106 },
107 &cli.IntFlag{
108 Name: "seq-slop",
109 Value: 100,
110 Usage: "sequence delta allowed as close enough",
111 },
112 },
113 Action: runRelayHostDiff,
114 },
115 },
116 },
117 cmdRelayAdmin,
118 },
119}
120
121func runRelayAccountList(cctx *cli.Context) error {
122 ctx := cctx.Context
123
124 if cctx.Args().Len() > 0 {
125 return fmt.Errorf("unexpected arguments")
126 }
127
128 client := xrpc.Client{
129 Host: cctx.String("relay-host"),
130 UserAgent: userAgent(),
131 }
132
133 collection := cctx.String("collection")
134 cursor := ""
135 var size int64 = 500
136 for {
137 if collection != "" {
138 resp, err := comatproto.SyncListReposByCollection(ctx, &client, collection, cursor, size)
139 if err != nil {
140 return err
141 }
142 for _, r := range resp.Repos {
143 fmt.Println(r.Did)
144 }
145
146 if resp.Cursor == nil || *resp.Cursor == "" {
147 break
148 }
149 cursor = *resp.Cursor
150 } else {
151 resp, err := comatproto.SyncListRepos(ctx, &client, cursor, size)
152 if err != nil {
153 return err
154 }
155
156 for _, r := range resp.Repos {
157 if cctx.Bool("json") {
158 b, err := json.Marshal(r)
159 if err != nil {
160 return err
161 }
162 fmt.Println(string(b))
163 } else {
164 status := "unknown"
165 if r.Active != nil && *r.Active {
166 status = "active"
167 } else if r.Status != nil {
168 status = *r.Status
169 }
170 fmt.Printf("%s\t%s\t%s\n", r.Did, status, r.Rev)
171 }
172 }
173
174 if resp.Cursor == nil || *resp.Cursor == "" {
175 break
176 }
177 cursor = *resp.Cursor
178 }
179 }
180 return nil
181}
182
183func runRelayAccountStatus(cctx *cli.Context) error {
184 ctx := cctx.Context
185
186 didStr := cctx.Args().First()
187 if didStr == "" {
188 return fmt.Errorf("need to provide account DID as argument")
189 }
190 if cctx.Args().Len() != 1 {
191 return fmt.Errorf("unexpected arguments")
192 }
193
194 did, err := syntax.ParseDID(didStr)
195 if err != nil {
196 return err
197 }
198
199 client := xrpc.Client{
200 Host: cctx.String("relay-host"),
201 UserAgent: userAgent(),
202 }
203
204 r, err := comatproto.SyncGetRepoStatus(ctx, &client, did.String())
205 if err != nil {
206 return err
207 }
208
209 if cctx.Bool("json") {
210 b, err := json.Marshal(r)
211 if err != nil {
212 return err
213 }
214 fmt.Println(string(b))
215 } else {
216 status := "unknown"
217 if r.Active {
218 status = "active"
219 } else if r.Status != nil {
220 status = *r.Status
221 }
222 rev := ""
223 if r.Rev != nil {
224 rev = *r.Rev
225 }
226 fmt.Printf("%s\t%s\t%s\n", r.Did, status, rev)
227 }
228
229 return nil
230}
231
232func runRelayHostRequestCrawl(cctx *cli.Context) error {
233 ctx := cctx.Context
234
235 hostname := cctx.Args().First()
236 if hostname == "" {
237 return fmt.Errorf("need to provide hostname as argument")
238 }
239 if cctx.Args().Len() != 1 {
240 return fmt.Errorf("unexpected arguments")
241 }
242
243 client := xrpc.Client{
244 Host: cctx.String("relay-host"),
245 UserAgent: userAgent(),
246 }
247
248 err := comatproto.SyncRequestCrawl(ctx, &client, &comatproto.SyncRequestCrawl_Input{Hostname: hostname})
249 if err != nil {
250 return err
251 }
252 fmt.Println("success")
253 return nil
254}
255
256func runRelayHostList(cctx *cli.Context) error {
257 ctx := cctx.Context
258
259 if cctx.Args().Len() > 0 {
260 return fmt.Errorf("unexpected arguments")
261 }
262
263 client := xrpc.Client{
264 Host: cctx.String("relay-host"),
265 UserAgent: userAgent(),
266 }
267
268 cursor := ""
269 var size int64 = 500
270 for {
271 resp, err := comatproto.SyncListHosts(ctx, &client, cursor, size)
272 if err != nil {
273 return err
274 }
275
276 for _, h := range resp.Hosts {
277 if cctx.Bool("json") {
278 b, err := json.Marshal(h)
279 if err != nil {
280 return err
281 }
282 fmt.Println(string(b))
283 } else {
284 status := ""
285 if h.Status != nil {
286 status = *h.Status
287 }
288 count := ""
289 if h.AccountCount != nil {
290 count = fmt.Sprintf("%d", *h.AccountCount)
291 }
292 seq := ""
293 if h.Seq != nil {
294 seq = fmt.Sprintf("%d", *h.Seq)
295 }
296 fmt.Printf("%s\t%s\t%s\t%s\n", h.Hostname, status, count, seq)
297 }
298 }
299
300 if resp.Cursor == nil || *resp.Cursor == "" {
301 break
302 }
303 cursor = *resp.Cursor
304 }
305 return nil
306}
307
308func runRelayHostStatus(cctx *cli.Context) error {
309 ctx := cctx.Context
310
311 hostname := cctx.Args().First()
312 if hostname == "" {
313 return fmt.Errorf("need to provide hostname as argument")
314 }
315 if cctx.Args().Len() != 1 {
316 return fmt.Errorf("unexpected arguments")
317 }
318
319 client := xrpc.Client{
320 Host: cctx.String("relay-host"),
321 UserAgent: userAgent(),
322 }
323
324 h, err := comatproto.SyncGetHostStatus(ctx, &client, hostname)
325 if err != nil {
326 return err
327 }
328
329 if cctx.Bool("json") {
330 b, err := json.Marshal(h)
331 if err != nil {
332 return err
333 }
334 fmt.Println(string(b))
335 } else {
336 status := ""
337 if h.Status != nil {
338 status = *h.Status
339 }
340 count := ""
341 if h.AccountCount != nil {
342 count = fmt.Sprintf("%d", *h.AccountCount)
343 }
344 seq := ""
345 if h.Seq != nil {
346 seq = fmt.Sprintf("%d", *h.Seq)
347 }
348 fmt.Printf("%s\t%s\t%s\t%s\n", h.Hostname, status, count, seq)
349 }
350
351 return nil
352}
353
// hostInfo is the per-host summary that fetchHosts extracts from a relay's
// host listing and that runRelayHostDiff compares between two relays.
type hostInfo struct {
	// Hostname is the host's name as reported by the relay; runRelayHostDiff
	// uses it as the key when matching hosts between relays.
	Hostname string
	// Status is the status string reported by the relay (runRelayHostDiff
	// special-cases the value "active").
	Status string
	// Seq is the sequence number reported by the relay for this host.
	Seq int64
}
359
360func fetchHosts(ctx context.Context, relayHost string) ([]hostInfo, error) {
361
362 client := xrpc.Client{
363 Host: relayHost,
364 UserAgent: userAgent(),
365 }
366
367 hosts := []hostInfo{}
368 cursor := ""
369 var size int64 = 500
370 for {
371 resp, err := comatproto.SyncListHosts(ctx, &client, cursor, size)
372 if err != nil {
373 return nil, err
374 }
375
376 for _, h := range resp.Hosts {
377 if h.Status == nil || h.Seq == nil || *h.Seq <= 0 {
378 continue
379 }
380
381 // TODO: only active or idle hosts?
382 info := hostInfo{
383 Hostname: h.Hostname,
384 Status: *h.Status,
385 Seq: *h.Seq,
386 }
387 hosts = append(hosts, info)
388 }
389
390 if resp.Cursor == nil || *resp.Cursor == "" {
391 break
392 }
393 cursor = *resp.Cursor
394 }
395 return hosts, nil
396}
397
398func runRelayHostDiff(cctx *cli.Context) error {
399 ctx := cctx.Context
400 verbose := cctx.Bool("verbose")
401 seqSlop := cctx.Int64("seq-slop")
402
403 if cctx.Args().Len() != 2 {
404 return fmt.Errorf("expected two relay URLs are args")
405 }
406
407 urlOne := cctx.Args().Get(0)
408 urlTwo := cctx.Args().Get(1)
409
410 listOne, err := fetchHosts(ctx, urlOne)
411 if err != nil {
412 return err
413 }
414 listTwo, err := fetchHosts(ctx, urlTwo)
415 if err != nil {
416 return err
417 }
418
419 allHosts := make(map[string]bool)
420 mapOne := make(map[string]hostInfo)
421 for _, val := range listOne {
422 allHosts[val.Hostname] = true
423 mapOne[val.Hostname] = val
424 }
425 mapTwo := make(map[string]hostInfo)
426 for _, val := range listTwo {
427 allHosts[val.Hostname] = true
428 mapTwo[val.Hostname] = val
429 }
430
431 names := []string{}
432 for k, _ := range allHosts {
433 names = append(names, k)
434 }
435 sort.Strings(names)
436
437 for _, k := range names {
438 one, okOne := mapOne[k]
439 two, okTwo := mapTwo[k]
440 if !okOne {
441 if !verbose && two.Status != "active" {
442 continue
443 }
444 fmt.Printf("%s\t\t%s/%d\tA-missing\n", k, two.Status, two.Seq)
445 } else if !okTwo {
446 if !verbose && one.Status != "active" {
447 continue
448 }
449 fmt.Printf("%s\t%s/%d\t\tB-missing\n", k, one.Status, one.Seq)
450 } else {
451 status := ""
452 if one.Status != two.Status {
453 status = "diff-status"
454 } else {
455 delta := max(one.Seq, two.Seq) - min(one.Seq, two.Seq)
456 if delta == 0 {
457 status = "sync"
458 if !verbose {
459 continue
460 }
461 } else if delta < seqSlop {
462 status = "nearly"
463 if !verbose {
464 continue
465 }
466 } else {
467 status = fmt.Sprintf("delta=%d", delta)
468 }
469 }
470 fmt.Printf("%s\t%s/%d\t%s/%d\t%s\n", k, one.Status, one.Seq, two.Status, two.Seq, status)
471 }
472 }
473
474 return nil
475}