-32
api/atproto/identityresolveHandle.go
-32
api/atproto/identityresolveHandle.go
···
1
-
// Code generated by cmd/lexgen (see Makefile's lexgen); DO NOT EDIT.
2
-
3
-
package atproto
4
-
5
-
// schema: com.atproto.identity.resolveHandle
6
-
7
-
import (
8
-
"context"
9
-
10
-
"github.com/bluesky-social/indigo/xrpc"
11
-
)
12
-
13
-
// IdentityResolveHandle_Output is the output of a com.atproto.identity.resolveHandle call.
14
-
type IdentityResolveHandle_Output struct {
15
-
Did string `json:"did" cborgen:"did"`
16
-
}
17
-
18
-
// IdentityResolveHandle calls the XRPC method "com.atproto.identity.resolveHandle".
19
-
//
20
-
// handle: The handle to resolve. If not supplied, will resolve the host's own handle.
21
-
func IdentityResolveHandle(ctx context.Context, c *xrpc.Client, handle string) (*IdentityResolveHandle_Output, error) {
22
-
var out IdentityResolveHandle_Output
23
-
24
-
params := map[string]interface{}{
25
-
"handle": handle,
26
-
}
27
-
if err := c.Do(ctx, xrpc.Query, "", "com.atproto.identity.resolveHandle", params, nil, &out); err != nil {
28
-
return nil, err
29
-
}
30
-
31
-
return &out, nil
32
-
}
+98
-4
api/extra.go
+98
-4
api/extra.go
···
3
3
import (
4
4
"context"
5
5
"fmt"
6
+
"io/ioutil"
7
+
"net"
8
+
"net/http"
6
9
"net/url"
10
+
"strings"
7
11
8
-
comatproto "github.com/bluesky-social/indigo/api/atproto"
9
12
"github.com/bluesky-social/indigo/did"
10
13
"github.com/bluesky-social/indigo/xrpc"
14
+
logging "github.com/ipfs/go-log"
11
15
otel "go.opentelemetry.io/otel"
12
16
)
13
17
14
-
func ResolveDidToHandle(ctx context.Context, xrpcc *xrpc.Client, res did.Resolver, udid string) (string, string, error) {
18
+
var log = logging.Logger("api")
19
+
20
+
func ResolveDidToHandle(ctx context.Context, xrpcc *xrpc.Client, res did.Resolver, hr HandleResolver, udid string) (string, string, error) {
15
21
ctx, span := otel.Tracer("gosky").Start(ctx, "resolveDidToHandle")
16
22
defer span.End()
17
23
···
49
55
return "", "", fmt.Errorf("our XRPC client is authed for a different pds (%s != %s)", svc.ServiceEndpoint, xrpcc.Host)
50
56
}
51
57
52
-
verdid, err := comatproto.IdentityResolveHandle(ctx, xrpcc, handle)
58
+
verdid, err := hr.ResolveHandleToDid(ctx, handle)
53
59
if err != nil {
54
60
return "", "", err
55
61
}
56
62
57
-
if verdid.Did != udid {
63
+
if verdid != udid {
58
64
return "", "", fmt.Errorf("pds server reported different did for claimed handle")
59
65
}
60
66
61
67
return handle, svc.ServiceEndpoint, nil
62
68
}
69
+
70
+
type HandleResolver interface {
71
+
ResolveHandleToDid(ctx context.Context, handle string) (string, error)
72
+
}
73
+
74
+
type ProdHandleResolver struct {
75
+
}
76
+
77
+
func (dr *ProdHandleResolver) ResolveHandleToDid(ctx context.Context, handle string) (string, error) {
78
+
c := http.DefaultClient
79
+
80
+
resp, wkerr := c.Get(fmt.Sprintf("https://%s/.well-known/atproto-did", handle))
81
+
if wkerr == nil && resp.StatusCode == 200 {
82
+
b, err := ioutil.ReadAll(resp.Body)
83
+
if err != nil {
84
+
return "", fmt.Errorf("failed to read resolved did: %w", err)
85
+
}
86
+
87
+
parsed, err := did.ParseDID(string(b))
88
+
if err != nil {
89
+
return "", err
90
+
}
91
+
92
+
return parsed.String(), nil
93
+
}
94
+
log.Infof("failed to resolve handle (%s) through well-known route: %s", handle, wkerr)
95
+
96
+
res, err := net.LookupTXT("_atproto." + handle)
97
+
if err != nil {
98
+
return "", fmt.Errorf("handle lookup failed: %w", err)
99
+
}
100
+
101
+
for _, s := range res {
102
+
if strings.HasPrefix(s, "did=") {
103
+
parts := strings.Split(s, "=")
104
+
pdid, err := did.ParseDID(parts[1])
105
+
if err != nil {
106
+
return "", fmt.Errorf("invalid did in record: %w", err)
107
+
}
108
+
109
+
return pdid.String(), nil
110
+
}
111
+
}
112
+
113
+
return "", fmt.Errorf("no did record found for handle %q", handle)
114
+
}
115
+
116
+
type TestHandleResolver struct {
117
+
TrialHosts []string
118
+
}
119
+
120
+
func (tr *TestHandleResolver) ResolveHandleToDid(ctx context.Context, handle string) (string, error) {
121
+
c := http.DefaultClient
122
+
123
+
for _, h := range tr.TrialHosts {
124
+
req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/.well-known/atproto-did", h), nil)
125
+
if err != nil {
126
+
return "", err
127
+
}
128
+
129
+
req.Host = handle
130
+
131
+
resp, err := c.Do(req)
132
+
if err != nil {
133
+
log.Warnf("failed to get did: %s", err)
134
+
continue
135
+
}
136
+
137
+
if resp.StatusCode != 200 {
138
+
log.Warnf("got non-200 status code while fetching did: %d", resp.StatusCode)
139
+
continue
140
+
}
141
+
142
+
b, err := ioutil.ReadAll(resp.Body)
143
+
if err != nil {
144
+
return "", fmt.Errorf("failed to read resolved did: %w", err)
145
+
}
146
+
147
+
parsed, err := did.ParseDID(string(b))
148
+
if err != nil {
149
+
return "", err
150
+
}
151
+
152
+
return parsed.String(), nil
153
+
}
154
+
155
+
return "", fmt.Errorf("no did record found for handle %q", handle)
156
+
}
+15
-4
bgs/bgs.go
+15
-4
bgs/bgs.go
···
14
14
"time"
15
15
16
16
"contrib.go.opencensus.io/exporter/prometheus"
17
+
"github.com/bluesky-social/indigo/api"
17
18
atproto "github.com/bluesky-social/indigo/api/atproto"
18
19
comatproto "github.com/bluesky-social/indigo/api/atproto"
19
20
"github.com/bluesky-social/indigo/blobs"
···
48
49
didr did.Resolver
49
50
50
51
blobs blobs.BlobStore
52
+
hr api.HandleResolver
53
+
54
+
// TODO: work on doing away with this flag in favor of more pluggable
55
+
// pieces that abstract the need for explicit ssl checks
56
+
ssl bool
51
57
52
58
crawlOnly bool
53
59
···
58
64
repoman *repomgr.RepoManager
59
65
}
60
66
61
-
func NewBGS(db *gorm.DB, ix *indexer.Indexer, repoman *repomgr.RepoManager, evtman *events.EventManager, didr did.Resolver, blobs blobs.BlobStore, ssl bool) (*BGS, error) {
67
+
func NewBGS(db *gorm.DB, ix *indexer.Indexer, repoman *repomgr.RepoManager, evtman *events.EventManager, didr did.Resolver, blobs blobs.BlobStore, hr api.HandleResolver, ssl bool) (*BGS, error) {
62
68
db.AutoMigrate(User{})
63
69
db.AutoMigrate(AuthToken{})
64
70
db.AutoMigrate(models.PDS{})
···
67
73
Index: ix,
68
74
db: db,
69
75
76
+
hr: hr,
70
77
repoman: repoman,
71
78
events: evtman,
72
79
didr: didr,
···
605
612
peering.Host = durl.Host
606
613
peering.SSL = (durl.Scheme == "https")
607
614
615
+
if s.ssl != peering.SSL {
616
+
return nil, fmt.Errorf("did references non-ssl PDS, this is disallowed in prod: %q %q", did, svc.ServiceEndpoint)
617
+
}
618
+
608
619
if err := s.db.Create(&peering).Error; err != nil {
609
620
return nil, err
610
621
}
···
625
636
626
637
handle := hurl.Host
627
638
628
-
res, err := atproto.IdentityResolveHandle(ctx, c, handle)
639
+
resdid, err := s.hr.ResolveHandleToDid(ctx, handle)
629
640
if err != nil {
630
641
return nil, fmt.Errorf("failed to resolve users claimed handle (%q) on pds: %w", handle, err)
631
642
}
632
643
633
-
if res.Did != did {
634
-
return nil, fmt.Errorf("claimed handle did not match servers response (%s != %s)", res.Did, did)
644
+
if resdid != did {
645
+
return nil, fmt.Errorf("claimed handle did not match servers response (%s != %s)", resdid, did)
635
646
}
636
647
637
648
s.extUserLk.Lock()
+6
-1
cmd/bigsky/main.go
+6
-1
cmd/bigsky/main.go
···
243
243
blobstore = &blobs.DiskBlobStore{bsdir}
244
244
}
245
245
246
-
bgs, err := bgs.NewBGS(db, ix, repoman, evtman, cachedidr, blobstore, !cctx.Bool("crawl-insecure-ws"))
246
+
var hr api.HandleResolver = &api.ProdHandleResolver{}
247
+
if cctx.Bool("crawl-insecure-ws") {
248
+
hr = &api.TestHandleResolver{}
249
+
}
250
+
251
+
bgs, err := bgs.NewBGS(db, ix, repoman, evtman, cachedidr, blobstore, hr, !cctx.Bool("crawl-insecure-ws"))
247
252
if err != nil {
248
253
return err
249
254
}
+20
-9
cmd/gosky/admin.go
+20
-9
cmd/gosky/admin.go
···
57
57
58
58
ctx := context.Background()
59
59
60
-
resp, err := atproto.IdentityResolveHandle(ctx, xrpcc, cctx.Args().First())
60
+
phr := &api.ProdHandleResolver{}
61
+
62
+
rdid, err := phr.ResolveHandleToDid(ctx, cctx.Args().First())
61
63
if err != nil {
62
64
return fmt.Errorf("resolve handle %q: %w", cctx.Args().First(), err)
63
65
}
···
65
67
adminKey := cctx.String("admin-password")
66
68
xrpcc.AdminToken = &adminKey
67
69
68
-
rep, err := atproto.AdminGetRepo(ctx, xrpcc, resp.Did)
70
+
rep, err := atproto.AdminGetRepo(ctx, xrpcc, rdid)
69
71
if err != nil {
70
-
return fmt.Errorf("getRepo %s: %w", resp.Did, err)
72
+
return fmt.Errorf("getRepo %s: %w", rdid, err)
71
73
}
72
74
73
75
b, err := json.MarshalIndent(rep, "", " ")
···
91
93
if fa == "admin" {
92
94
invby = fa
93
95
} else {
94
-
handle, _, err := api.ResolveDidToHandle(ctx, xrpcc, plcc, fa)
96
+
handle, _, err := api.ResolveDidToHandle(ctx, xrpcc, plcc, phr, fa)
95
97
if err != nil {
96
98
return fmt.Errorf("resolve did %q: %w", fa, err)
97
99
}
···
364
366
&cli.BoolFlag{
365
367
Name: "raw",
366
368
},
369
+
&cli.BoolFlag{
370
+
Name: "resolved",
371
+
Value: true,
372
+
},
373
+
&cli.BoolFlag{
374
+
Name: "template-output",
375
+
},
367
376
},
368
377
Action: func(cctx *cli.Context) error {
369
378
xrpcc, err := cliutil.GetXrpcClient(cctx, false)
···
376
385
adminKey := cctx.String("admin-password")
377
386
xrpcc.AdminToken = &adminKey
378
387
379
-
resp, err := atproto.AdminGetModerationReports(ctx, xrpcc, "", "", 100, true, "")
388
+
resp, err := atproto.AdminGetModerationReports(ctx, xrpcc, "", "", 100, cctx.Bool("resolved"), "")
380
389
if err != nil {
381
390
return err
382
391
}
···
430
439
adminKey := cctx.String("admin-password")
431
440
xrpcc.AdminToken = &adminKey
432
441
442
+
phr := &api.ProdHandleResolver{}
433
443
handle := cctx.Args().First()
434
444
if !strings.HasPrefix(handle, "did:") {
435
-
resp, err := atproto.IdentityResolveHandle(ctx, xrpcc, handle)
445
+
resp, err := phr.ResolveHandleToDid(ctx, handle)
436
446
if err != nil {
437
447
return err
438
448
}
439
449
440
-
handle = resp.Did
450
+
handle = resp
441
451
}
442
452
443
453
return atproto.AdminDisableAccountInvites(ctx, xrpcc, &atproto.AdminDisableAccountInvites_Input{
···
469
479
470
480
handle := cctx.Args().First()
471
481
if !strings.HasPrefix(handle, "did:") {
472
-
resp, err := atproto.IdentityResolveHandle(ctx, xrpcc, handle)
482
+
phr := &api.ProdHandleResolver{}
483
+
resp, err := phr.ResolveHandleToDid(ctx, handle)
473
484
if err != nil {
474
485
return err
475
486
}
476
487
477
-
handle = resp.Did
488
+
handle = resp
478
489
}
479
490
480
491
return atproto.AdminEnableAccountInvites(ctx, xrpcc, &atproto.AdminEnableAccountInvites_Input{
+349
-1
cmd/gosky/debug.go
+349
-1
cmd/gosky/debug.go
···
6
6
"encoding/json"
7
7
"fmt"
8
8
"io"
9
+
"io/ioutil"
9
10
"net/http"
11
+
"os"
12
+
"os/signal"
10
13
"strconv"
14
+
"strings"
15
+
"syscall"
11
16
12
17
"github.com/bluesky-social/indigo/api/atproto"
13
18
comatproto "github.com/bluesky-social/indigo/api/atproto"
···
34
39
inspectEventCmd,
35
40
debugStreamCmd,
36
41
debugFeedGenCmd,
42
+
debugFeedViewCmd,
43
+
compareStreamsCmd,
37
44
},
38
45
}
39
46
···
216
223
es = evt.Prev.String()
217
224
}
218
225
219
-
if cs != es {
226
+
if !evt.Rebase && cs != es {
220
227
fmt.Printf("\nEvent at sequence %d has mismatch between slice prev and struct prev: %s != %s\n", evt.Seq, prev, evt.Prev)
221
228
}
222
229
}
···
254
261
},
255
262
}
256
263
264
+
var compareStreamsCmd = &cli.Command{
265
+
Name: "compare-streams",
266
+
Flags: []cli.Flag{
267
+
&cli.StringFlag{
268
+
Name: "host1",
269
+
Required: true,
270
+
},
271
+
&cli.StringFlag{
272
+
Name: "host2",
273
+
Required: true,
274
+
},
275
+
},
276
+
ArgsUsage: `<cursor>`,
277
+
Action: func(cctx *cli.Context) error {
278
+
h1 := cctx.String("host1")
279
+
h2 := cctx.String("host2")
280
+
281
+
url1 := fmt.Sprintf("%s/xrpc/com.atproto.sync.subscribeRepos", h1)
282
+
url2 := fmt.Sprintf("%s/xrpc/com.atproto.sync.subscribeRepos", h2)
283
+
284
+
d := websocket.DefaultDialer
285
+
286
+
eventChans := []chan *comatproto.SyncSubscribeRepos_Commit{
287
+
make(chan *comatproto.SyncSubscribeRepos_Commit, 2),
288
+
make(chan *comatproto.SyncSubscribeRepos_Commit, 2),
289
+
}
290
+
291
+
buffers := []map[string][]*comatproto.SyncSubscribeRepos_Commit{
292
+
make(map[string][]*comatproto.SyncSubscribeRepos_Commit),
293
+
make(map[string][]*comatproto.SyncSubscribeRepos_Commit),
294
+
}
295
+
296
+
addToBuffer := func(n int, event *comatproto.SyncSubscribeRepos_Commit) {
297
+
buffers[n][event.Repo] = append(buffers[n][event.Repo], event)
298
+
}
299
+
300
+
pll := func(ll *lexutil.LexLink) string {
301
+
if ll == nil {
302
+
return "<nil>"
303
+
}
304
+
return ll.String()
305
+
}
306
+
307
+
findMatchAndRemove := func(n int, event *comatproto.SyncSubscribeRepos_Commit) (*comatproto.SyncSubscribeRepos_Commit, error) {
308
+
buf := buffers[n]
309
+
slice, ok := buf[event.Repo]
310
+
if !ok || len(slice) == 0 {
311
+
return nil, nil
312
+
}
313
+
314
+
for i, ev := range slice {
315
+
if ev.Commit == event.Commit {
316
+
if pll(ev.Prev) != pll(event.Prev) {
317
+
// same commit different prev??
318
+
return nil, fmt.Errorf("matched event with same commit but different prev: (%d) %d - %d", n, ev.Seq, event.Seq)
319
+
}
320
+
}
321
+
322
+
if i != 0 {
323
+
fmt.Printf("detected skipped event: %d (%d)\n", slice[0].Seq, i)
324
+
}
325
+
326
+
slice = slice[i+1:]
327
+
buf[event.Repo] = slice
328
+
return ev, nil
329
+
}
330
+
331
+
return nil, fmt.Errorf("did not find matching event despite having events in buffer")
332
+
}
333
+
334
+
printCurrentDelta := func() {
335
+
var a, b int
336
+
for _, sl := range buffers[0] {
337
+
a += len(sl)
338
+
}
339
+
for _, sl := range buffers[1] {
340
+
b += len(sl)
341
+
}
342
+
343
+
fmt.Printf("%d %d\n", a, b)
344
+
}
345
+
346
+
printDetailedDelta := func() {
347
+
for did, sl := range buffers[0] {
348
+
osl := buffers[1][did]
349
+
if len(osl) > 0 && len(sl) > 0 {
350
+
fmt.Printf("%s had mismatched events on both streams (%d, %d)\n", did, len(sl), len(osl))
351
+
}
352
+
353
+
}
354
+
}
355
+
356
+
// Create two goroutines for reading events from two URLs
357
+
for i, url := range []string{url1, url2} {
358
+
go func(i int, url string) {
359
+
con, _, err := d.Dial(url, http.Header{})
360
+
if err != nil {
361
+
log.Fatalf("Dial failure on url%d: %s", i+1, err)
362
+
}
363
+
364
+
ctx := context.TODO()
365
+
rsc := &events.RepoStreamCallbacks{
366
+
RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error {
367
+
eventChans[i] <- evt
368
+
return nil
369
+
},
370
+
// TODO: all the other Repo* event types
371
+
Error: func(evt *events.ErrorFrame) error {
372
+
return fmt.Errorf("%s: %s", evt.Error, evt.Message)
373
+
},
374
+
}
375
+
if err := events.HandleRepoStream(ctx, con, &events.SequentialScheduler{rsc.EventHandler}); err != nil {
376
+
log.Fatalf("HandleRepoStream failure on url%d: %s", i+1, err)
377
+
}
378
+
}(i, url)
379
+
}
380
+
381
+
ch := make(chan os.Signal, 1)
382
+
signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT)
383
+
384
+
// Compare events from the two URLs
385
+
for {
386
+
select {
387
+
case event := <-eventChans[0]:
388
+
partner, err := findMatchAndRemove(1, event)
389
+
if err != nil {
390
+
fmt.Println("checking for match failed: ", err)
391
+
continue
392
+
}
393
+
if partner == nil {
394
+
addToBuffer(0, event)
395
+
} else {
396
+
// the good case
397
+
fmt.Println("Match found")
398
+
}
399
+
400
+
case event := <-eventChans[1]:
401
+
partner, err := findMatchAndRemove(0, event)
402
+
if err != nil {
403
+
fmt.Println("checking for match failed: ", err)
404
+
continue
405
+
}
406
+
if partner == nil {
407
+
addToBuffer(1, event)
408
+
} else {
409
+
// the good case
410
+
fmt.Println("Match found")
411
+
}
412
+
case <-ch:
413
+
printDetailedDelta()
414
+
/*
415
+
b, err := json.Marshal(buffers)
416
+
if err != nil {
417
+
return err
418
+
}
419
+
420
+
fmt.Println(string(b))
421
+
*/
422
+
return nil
423
+
}
424
+
425
+
printCurrentDelta()
426
+
}
427
+
return nil
428
+
},
429
+
}
430
+
257
431
var debugFeedGenCmd = &cli.Command{
258
432
Name: "debug-feed",
259
433
Action: func(cctx *cli.Context) error {
···
385
559
return nil
386
560
},
387
561
}
562
+
var debugFeedViewCmd = &cli.Command{
563
+
Name: "viewFeed",
564
+
Action: func(cctx *cli.Context) error {
565
+
xrpcc, err := cliutil.GetXrpcClient(cctx, true)
566
+
if err != nil {
567
+
return err
568
+
}
569
+
570
+
didr := cliutil.GetDidResolver(cctx)
571
+
572
+
uri := cctx.Args().First()
573
+
puri, err := util.ParseAtUri(uri)
574
+
if err != nil {
575
+
return err
576
+
}
577
+
578
+
ctx := context.TODO()
579
+
580
+
out, err := atproto.RepoGetRecord(ctx, xrpcc, "", puri.Collection, puri.Did, puri.Rkey)
581
+
if err != nil {
582
+
return fmt.Errorf("getting record: %w", err)
583
+
}
584
+
585
+
fgr, ok := out.Value.Val.(*bsky.FeedGenerator)
586
+
if !ok {
587
+
return fmt.Errorf("invalid feedgen record")
588
+
}
589
+
590
+
doc, err := didr.GetDocument(ctx, fgr.Did)
591
+
if err != nil {
592
+
return err
593
+
}
594
+
595
+
var ss *did.Service
596
+
for _, s := range doc.Service {
597
+
if s.ID.String() == "#bsky_fg" {
598
+
cp := s
599
+
ss = &cp
600
+
break
601
+
}
602
+
}
603
+
604
+
if ss == nil {
605
+
return fmt.Errorf("No '#bsky_fg' service entry found in feedgens DID document")
606
+
}
607
+
608
+
fgclient := &xrpc.Client{
609
+
Host: ss.ServiceEndpoint,
610
+
}
611
+
612
+
cache, err := loadCache("postcache.json")
613
+
if err != nil {
614
+
return err
615
+
}
616
+
var cacheUpdate bool
617
+
618
+
var cursor string
619
+
getPage := func(curs string) ([]*bsky.FeedDefs_PostView, error) {
620
+
skel, err := bsky.FeedGetFeedSkeleton(ctx, fgclient, cursor, uri, 30)
621
+
if err != nil {
622
+
return nil, fmt.Errorf("failed to fetch feed skeleton: %w", err)
623
+
}
624
+
625
+
if skel.Cursor != nil {
626
+
cursor = *skel.Cursor
627
+
}
628
+
629
+
var posts []*bsky.FeedDefs_PostView
630
+
for _, fp := range skel.Feed {
631
+
cached, ok := cache[fp.Post]
632
+
if ok {
633
+
posts = append(posts, cached)
634
+
continue
635
+
}
636
+
fps, err := bsky.FeedGetPosts(ctx, xrpcc, []string{fp.Post})
637
+
if err != nil {
638
+
return nil, err
639
+
}
640
+
641
+
if len(fps.Posts) == 0 {
642
+
fmt.Println("FAILED TO GET POST: ", fp.Post)
643
+
continue
644
+
}
645
+
p := fps.Posts[0]
646
+
rec := p.Record.Val.(*bsky.FeedPost)
647
+
rec.Embed = nil // nil out embeds since they sometimes fail to json marshal...
648
+
posts = append(posts, p)
649
+
cache[fp.Post] = p
650
+
cacheUpdate = true
651
+
}
652
+
653
+
return posts, nil
654
+
}
655
+
656
+
printPosts := func(posts []*bsky.FeedDefs_PostView) {
657
+
for _, p := range posts {
658
+
text := p.Record.Val.(*bsky.FeedPost).Text
659
+
text = strings.Replace(text, "\n", " ", -1)
660
+
if len(text) > 70 {
661
+
text = text[:70] + "..."
662
+
}
663
+
fmt.Printf("%s: %s\n", *p.Author.DisplayName, text)
664
+
}
665
+
}
666
+
667
+
seen := make(map[string]bool)
668
+
for i := 1; i < 5; i++ {
669
+
fmt.Printf("PAGE %d - cursor: %s\n", i, cursor)
670
+
posts, err := getPage(cursor)
671
+
if err != nil {
672
+
return err
673
+
}
674
+
var alreadySeen int
675
+
for _, p := range posts {
676
+
if seen[p.Uri] {
677
+
alreadySeen++
678
+
}
679
+
seen[p.Uri] = true
680
+
}
681
+
fmt.Printf("Already saw %d / %d posts in page 1\n", alreadySeen, len(posts))
682
+
printPosts(posts)
683
+
fmt.Println("")
684
+
fmt.Println("")
685
+
}
686
+
687
+
if cacheUpdate {
688
+
if err := saveCache("postcache.json", cache); err != nil {
689
+
return err
690
+
}
691
+
}
692
+
693
+
return nil
694
+
},
695
+
}
696
+
697
+
func loadCache(filename string) (map[string]*bsky.FeedDefs_PostView, error) {
698
+
var data map[string]*bsky.FeedDefs_PostView
699
+
700
+
jsonFile, err := os.Open(filename)
701
+
if err != nil {
702
+
if os.IsNotExist(err) {
703
+
return make(map[string]*bsky.FeedDefs_PostView), nil
704
+
}
705
+
706
+
return nil, fmt.Errorf("failed to open file: %w", err)
707
+
}
708
+
defer jsonFile.Close()
709
+
710
+
byteValue, err := ioutil.ReadAll(jsonFile)
711
+
if err != nil {
712
+
return nil, fmt.Errorf("failed to read file: %w", err)
713
+
}
714
+
715
+
err = json.Unmarshal(byteValue, &data)
716
+
if err != nil {
717
+
return nil, fmt.Errorf("failed to unmarshal json: %w", err)
718
+
}
719
+
720
+
return data, nil
721
+
}
722
+
723
+
func saveCache(filename string, data map[string]*bsky.FeedDefs_PostView) error {
724
+
file, err := json.MarshalIndent(data, "", " ")
725
+
if err != nil {
726
+
return fmt.Errorf("failed to marshal json: %w", err)
727
+
}
728
+
729
+
err = ioutil.WriteFile(filename, file, 0644)
730
+
if err != nil {
731
+
return fmt.Errorf("failed to write file: %w", err)
732
+
}
733
+
734
+
return nil
735
+
}
+10
-12
cmd/gosky/main.go
+10
-12
cmd/gosky/main.go
···
236
236
return err
237
237
}
238
238
239
-
h, _, err := api.ResolveDidToHandle(context.TODO(), xrpcc, s, did)
239
+
phr := &api.ProdHandleResolver{}
240
+
h, _, err := api.ResolveDidToHandle(context.TODO(), xrpcc, s, phr, did)
240
241
if err != nil {
241
242
return err
242
243
}
···
877
878
Action: func(cctx *cli.Context) error {
878
879
ctx := context.TODO()
879
880
880
-
xrpcc, err := cliutil.GetXrpcClient(cctx, false)
881
-
if err != nil {
882
-
return err
883
-
}
884
-
885
881
args, err := needArgs(cctx, "handle")
886
882
if err != nil {
887
883
return err
888
884
}
889
885
handle := args[0]
890
886
891
-
out, err := comatproto.IdentityResolveHandle(ctx, xrpcc, handle)
887
+
phr := &api.ProdHandleResolver{}
888
+
out, err := phr.ResolveHandleToDid(ctx, handle)
892
889
if err != nil {
893
890
return err
894
891
}
895
892
896
-
fmt.Println(out.Did)
893
+
fmt.Println(out)
897
894
898
895
return nil
899
896
},
···
1207
1204
count := cctx.Int("useCount")
1208
1205
num := cctx.Int("num")
1209
1206
1207
+
phr := &api.ProdHandleResolver{}
1210
1208
if bulkfi := cctx.String("bulk"); bulkfi != "" {
1211
1209
xrpcc.AdminToken = &adminKey
1212
1210
dids, err := readDids(bulkfi)
···
1216
1214
1217
1215
for i, d := range dids {
1218
1216
if !strings.HasPrefix(d, "did:plc:") {
1219
-
out, err := comatproto.IdentityResolveHandle(context.TODO(), xrpcc, d)
1217
+
out, err := phr.ResolveHandleToDid(context.TODO(), d)
1220
1218
if err != nil {
1221
1219
return fmt.Errorf("failed to resolve %q: %w", d, err)
1222
1220
}
1223
1221
1224
-
dids[i] = out.Did
1222
+
dids[i] = out
1225
1223
}
1226
1224
}
1227
1225
···
1240
1238
var usrdid []string
1241
1239
if forUser := cctx.Args().Get(0); forUser != "" {
1242
1240
if !strings.HasPrefix(forUser, "did:") {
1243
-
resp, err := comatproto.IdentityResolveHandle(context.TODO(), xrpcc, forUser)
1241
+
resp, err := phr.ResolveHandleToDid(context.TODO(), forUser)
1244
1242
if err != nil {
1245
1243
return fmt.Errorf("resolving handle: %w", err)
1246
1244
}
1247
1245
1248
-
usrdid = []string{resp.Did}
1246
+
usrdid = []string{resp}
1249
1247
} else {
1250
1248
usrdid = []string{forUser}
1251
1249
}
+2
did/alias.go
+2
did/alias.go
-15
labeler/xrpc_endpoints.go
-15
labeler/xrpc_endpoints.go
···
14
14
15
15
func (s *Server) RegisterHandlersComAtproto(e *echo.Echo) error {
16
16
// handle/account hosting
17
-
e.GET("/xrpc/com.atproto.identity.resolveHandle", s.HandleComAtprotoIdentityResolveHandle)
18
17
e.GET("/xrpc/com.atproto.server.describeServer", s.HandleComAtprotoServerDescribeServer)
19
18
// TODO: session create/refresh/delete?
20
19
···
50
49
e.GET("/xrpc/com.atproto.admin.searchRepos", echo.WrapHandler(rp))
51
50
52
51
return nil
53
-
}
54
-
55
-
func (s *Server) HandleComAtprotoIdentityResolveHandle(c echo.Context) error {
56
-
ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoIdentityResolveHandle")
57
-
defer span.End()
58
-
handle := c.QueryParam("handle")
59
-
var out *atproto.IdentityResolveHandle_Output
60
-
var handleErr error
61
-
// func (s *Server) handleComAtprotoIdentityResolveHandle(ctx context.Context,handle string) (*atproto.IdentityResolveHandle_Output, error)
62
-
out, handleErr = s.handleComAtprotoIdentityResolveHandle(ctx, handle)
63
-
if handleErr != nil {
64
-
return handleErr
65
-
}
66
-
return c.JSON(200, out)
67
52
}
68
53
69
54
func (s *Server) HandleComAtprotoServerDescribeServer(c echo.Context) error {
-11
labeler/xrpc_handlers.go
-11
labeler/xrpc_handlers.go
···
16
16
"gorm.io/gorm"
17
17
)
18
18
19
-
func (s *Server) handleComAtprotoIdentityResolveHandle(ctx context.Context, handle string) (*atproto.IdentityResolveHandle_Output, error) {
20
-
// only the one handle, for labelmaker
21
-
if handle == "" {
22
-
return &atproto.IdentityResolveHandle_Output{Did: s.user.SigningKey.Public().DID()}, nil
23
-
} else if handle == s.user.Handle {
24
-
return &atproto.IdentityResolveHandle_Output{Did: s.user.Did}, nil
25
-
} else {
26
-
return nil, echo.NewHTTPError(404, "user not found: %s", handle)
27
-
}
28
-
}
29
-
30
19
func (s *Server) handleComAtprotoServerDescribeServer(ctx context.Context) (*atproto.ServerDescribeServer_Output, error) {
31
20
invcode := true
32
21
return &atproto.ServerDescribeServer_Output{
-12
pds/handlers.go
-12
pds/handlers.go
···
373
373
panic("not yet implemented")
374
374
}
375
375
376
-
func (s *Server) handleComAtprotoIdentityResolveHandle(ctx context.Context, handle string) (*comatprototypes.IdentityResolveHandle_Output, error) {
377
-
if handle == "" {
378
-
return &comatprototypes.IdentityResolveHandle_Output{Did: s.signingKey.Public().DID()}, nil
379
-
}
380
-
u, err := s.lookupUserByHandle(ctx, handle)
381
-
if err != nil {
382
-
return nil, err
383
-
}
384
-
385
-
return &comatprototypes.IdentityResolveHandle_Output{Did: u.Did}, nil
386
-
}
387
-
388
376
func (s *Server) handleComAtprotoRepoApplyWrites(ctx context.Context, body *comatprototypes.RepoApplyWrites_Input) error {
389
377
u, err := s.getUser(ctx)
390
378
if err != nil {
+25
-3
pds/server.go
+25
-3
pds/server.go
···
12
12
"strings"
13
13
"time"
14
14
15
+
"github.com/bluesky-social/indigo/api/atproto"
15
16
comatproto "github.com/bluesky-social/indigo/api/atproto"
16
17
bsky "github.com/bluesky-social/indigo/api/bsky"
17
18
"github.com/bluesky-social/indigo/carstore"
···
173
174
c := &xrpc.Client{Host: svc.ServiceEndpoint}
174
175
175
176
if peering.ID == 0 {
176
-
pdsdid, err := comatproto.IdentityResolveHandle(ctx, c, "")
177
+
cfg, err := atproto.ServerDescribeServer(ctx, c)
177
178
if err != nil {
178
179
// TODO: failing this shouldnt halt our indexing
179
-
return nil, fmt.Errorf("failed to get accounts config for unrecognized pds: %w", err)
180
+
return nil, fmt.Errorf("failed to check unrecognized pds: %w", err)
180
181
}
181
182
183
+
// since handles can be anything, checking against this list doesnt matter...
184
+
_ = cfg
185
+
182
186
// TODO: could check other things, a valid response is good enough for now
183
187
peering.Host = svc.ServiceEndpoint
184
-
peering.Did = pdsdid.Did
185
188
186
189
if err := s.db.Create(&peering).Error; err != nil {
187
190
return nil, err
···
324
327
ctx = context.WithValue(ctx, "auth", auth)
325
328
c.SetRequest(c.Request().WithContext(ctx))
326
329
return true
330
+
case "/.well-known/atproto-did":
331
+
return true
327
332
default:
328
333
return false
329
334
}
···
350
355
s.RegisterHandlersAppBsky(e)
351
356
e.GET("/xrpc/com.atproto.sync.subscribeRepos", s.EventsHandler)
352
357
e.GET("/xrpc/_health", s.HandleHealthCheck)
358
+
e.GET("/.well-known/atproto-did", s.HandleResolveDid)
353
359
354
360
// In order to support booting on random ports in tests, we need to tell the
355
361
// Echo instance it's already got a port, and then use its StartServer
···
371
377
} else {
372
378
return c.JSON(200, HealthStatus{Status: "ok"})
373
379
}
380
+
}
381
+
382
+
func (s *Server) HandleResolveDid(c echo.Context) error {
383
+
ctx := c.Request().Context()
384
+
385
+
handle := c.Request().Host
386
+
if hh := c.Request().Header.Get("Host"); hh != "" {
387
+
handle = hh
388
+
}
389
+
390
+
u, err := s.lookupUserByHandle(ctx, handle)
391
+
if err != nil {
392
+
return fmt.Errorf("resolving %q: %w", handle, err)
393
+
}
394
+
395
+
return c.String(200, u.Did)
374
396
}
375
397
376
398
type User struct {
-15
pds/stubs.go
-15
pds/stubs.go
···
804
804
e.POST("/xrpc/com.atproto.admin.takeModerationAction", s.HandleComAtprotoAdminTakeModerationAction)
805
805
e.POST("/xrpc/com.atproto.admin.updateAccountEmail", s.HandleComAtprotoAdminUpdateAccountEmail)
806
806
e.POST("/xrpc/com.atproto.admin.updateAccountHandle", s.HandleComAtprotoAdminUpdateAccountHandle)
807
-
e.GET("/xrpc/com.atproto.identity.resolveHandle", s.HandleComAtprotoIdentityResolveHandle)
808
807
e.POST("/xrpc/com.atproto.identity.updateHandle", s.HandleComAtprotoIdentityUpdateHandle)
809
808
e.GET("/xrpc/com.atproto.label.queryLabels", s.HandleComAtprotoLabelQueryLabels)
810
809
e.POST("/xrpc/com.atproto.moderation.createReport", s.HandleComAtprotoModerationCreateReport)
···
1164
1163
return handleErr
1165
1164
}
1166
1165
return nil
1167
-
}
1168
-
1169
-
func (s *Server) HandleComAtprotoIdentityResolveHandle(c echo.Context) error {
1170
-
ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoIdentityResolveHandle")
1171
-
defer span.End()
1172
-
handle := c.QueryParam("handle")
1173
-
var out *comatprototypes.IdentityResolveHandle_Output
1174
-
var handleErr error
1175
-
// func (s *Server) handleComAtprotoIdentityResolveHandle(ctx context.Context,handle string) (*comatprototypes.IdentityResolveHandle_Output, error)
1176
-
out, handleErr = s.handleComAtprotoIdentityResolveHandle(ctx, handle)
1177
-
if handleErr != nil {
1178
-
return handleErr
1179
-
}
1180
-
return c.JSON(200, out)
1181
1166
}
1182
1167
1183
1168
func (s *Server) HandleComAtprotoIdentityUpdateHandle(c echo.Context) error {
+2
-1
search/handlers.go
+2
-1
search/handlers.go
···
18
18
}
19
19
20
20
func (s *Server) handleFromDid(ctx context.Context, did string) (string, error) {
21
-
handle, _, err := api.ResolveDidToHandle(ctx, s.xrpcc, s.plc, did)
21
+
phr := &api.ProdHandleResolver{}
22
+
handle, _, err := api.ResolveDidToHandle(ctx, s.xrpcc, s.plc, phr, did)
22
23
if err != nil {
23
24
return "", err
24
25
}
+12
testing/integ_test.go
+12
testing/integ_test.go
···
32
32
b1 := MustSetupBGS(t, "localhost:8231", didr)
33
33
b1.Run(t)
34
34
35
+
b1.tr.TrialHosts = []string{p1.host}
36
+
35
37
p1.RequestScraping(t, b1)
36
38
37
39
time.Sleep(time.Millisecond * 50)
···
128
130
b1 := MustSetupBGS(t, "localhost:8281", didr)
129
131
b1.Run(t)
130
132
133
+
b1.tr.TrialHosts = []string{p1.host, p2.host}
134
+
131
135
p1.RequestScraping(t, b1)
132
136
time.Sleep(time.Millisecond * 100)
133
137
···
191
195
192
196
b1 := MustSetupBGS(t, "localhost:8291", didr)
193
197
b1.Run(t)
198
+
199
+
b1.tr.TrialHosts = []string{p1.host, p2.host}
194
200
195
201
p1.RequestScraping(t, b1)
196
202
time.Sleep(time.Millisecond * 50)
···
246
252
b1 := MustSetupBGS(t, "localhost:8391", didr)
247
253
b1.Run(t)
248
254
255
+
b1.tr.TrialHosts = []string{p1.host}
256
+
249
257
p1.RequestScraping(t, b1)
250
258
time.Sleep(time.Millisecond * 50)
251
259
···
278
286
279
287
b1 := MustSetupBGS(t, "localhost:3231", didr)
280
288
b1.Run(t)
289
+
290
+
b1.tr.TrialHosts = []string{p1.host}
281
291
282
292
p1.RequestScraping(t, b1)
283
293
···
329
339
330
340
b1 := MustSetupBGS(t, "localhost:1531", didr)
331
341
b1.Run(t)
342
+
343
+
b1.tr.TrialHosts = []string{p1.host}
332
344
333
345
p1.RequestScraping(t, b1)
334
346
+5
-1
testing/utils.go
+5
-1
testing/utils.go
···
373
373
type TestBGS struct {
374
374
bgs *bgs.BGS
375
375
host string
376
+
tr *api.TestHandleResolver
376
377
}
377
378
378
379
func MustSetupBGS(t *testing.T, host string, didr plc.PLCClient) *TestBGS {
···
435
436
}
436
437
})
437
438
438
-
b, err := bgs.NewBGS(maindb, ix, repoman, evtman, didr, nil, false)
439
+
tr := &api.TestHandleResolver{}
440
+
441
+
b, err := bgs.NewBGS(maindb, ix, repoman, evtman, didr, nil, tr, false)
439
442
if err != nil {
440
443
return nil, err
441
444
}
···
443
446
return &TestBGS{
444
447
bgs: b,
445
448
host: host,
449
+
tr: tr,
446
450
}, nil
447
451
}
448
452