+87
-3
api/atproto/cbor_gen.go
+87
-3
api/atproto/cbor_gen.go
···
194
194
195
195
cw := cbg.NewCborWriter(w)
196
196
197
-
if _, err := cw.Write([]byte{170}); err != nil {
197
+
if _, err := cw.Write([]byte{172}); err != nil {
198
198
return err
199
199
}
200
200
···
221
221
if err := v.MarshalCBOR(cw); err != nil {
222
222
return err
223
223
}
224
+
}
225
+
226
+
// t.Rev (string) (string)
227
+
if len("rev") > cbg.MaxLength {
228
+
return xerrors.Errorf("Value in field \"rev\" was too long")
229
+
}
230
+
231
+
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("rev"))); err != nil {
232
+
return err
233
+
}
234
+
if _, err := cw.WriteString(string("rev")); err != nil {
235
+
return err
236
+
}
237
+
238
+
if len(t.Rev) > cbg.MaxLength {
239
+
return xerrors.Errorf("Value in field t.Rev was too long")
240
+
}
241
+
242
+
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Rev))); err != nil {
243
+
return err
244
+
}
245
+
if _, err := cw.WriteString(string(t.Rev)); err != nil {
246
+
return err
224
247
}
225
248
226
249
// t.Seq (int64) (int64)
···
332
355
}
333
356
}
334
357
358
+
// t.Since (string) (string)
359
+
if len("since") > cbg.MaxLength {
360
+
return xerrors.Errorf("Value in field \"since\" was too long")
361
+
}
362
+
363
+
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("since"))); err != nil {
364
+
return err
365
+
}
366
+
if _, err := cw.WriteString(string("since")); err != nil {
367
+
return err
368
+
}
369
+
370
+
if t.Since == nil {
371
+
if _, err := cw.Write(cbg.CborNull); err != nil {
372
+
return err
373
+
}
374
+
} else {
375
+
if len(*t.Since) > cbg.MaxLength {
376
+
return xerrors.Errorf("Value in field t.Since was too long")
377
+
}
378
+
379
+
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(*t.Since))); err != nil {
380
+
return err
381
+
}
382
+
if _, err := cw.WriteString(string(*t.Since)); err != nil {
383
+
return err
384
+
}
385
+
}
386
+
335
387
// t.Blocks (util.LexBytes) (slice)
336
388
if len("blocks") > cbg.MaxLength {
337
389
return xerrors.Errorf("Value in field \"blocks\" was too long")
···
474
526
t.Ops[i] = &v
475
527
}
476
528
529
+
// t.Rev (string) (string)
530
+
case "rev":
531
+
532
+
{
533
+
sval, err := cbg.ReadString(cr)
534
+
if err != nil {
535
+
return err
536
+
}
537
+
538
+
t.Rev = string(sval)
539
+
}
477
540
// t.Seq (int64) (int64)
478
541
case "seq":
479
542
{
···
572
635
t.Blobs[i] = v
573
636
}
574
637
638
+
// t.Since (string) (string)
639
+
case "since":
640
+
641
+
{
642
+
b, err := cr.ReadByte()
643
+
if err != nil {
644
+
return err
645
+
}
646
+
if b != cbg.CborNull[0] {
647
+
if err := cr.UnreadByte(); err != nil {
648
+
return err
649
+
}
650
+
651
+
sval, err := cbg.ReadString(cr)
652
+
if err != nil {
653
+
return err
654
+
}
655
+
656
+
t.Since = (*string)(&sval)
657
+
}
658
+
}
575
659
// t.Blocks (util.LexBytes) (slice)
576
660
case "blocks":
577
661
···
1606
1690
return err
1607
1691
}
1608
1692
1609
-
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("com.atproto.label.defs"))); err != nil {
1693
+
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("com.atproto.label.defs#selfLabels"))); err != nil {
1610
1694
return err
1611
1695
}
1612
-
if _, err := cw.WriteString(string("com.atproto.label.defs")); err != nil {
1696
+
if _, err := cw.WriteString(string("com.atproto.label.defs#selfLabels")); err != nil {
1613
1697
return err
1614
1698
}
1615
1699
+1
-1
api/atproto/repostrongRef.go
+1
-1
api/atproto/repostrongRef.go
···
13
13
} // RepoStrongRef is a "main" in the com.atproto.repo.strongRef schema.
14
14
// RECORDTYPE: RepoStrongRef
15
15
type RepoStrongRef struct {
16
-
LexiconTypeID string `json:"$type,const=com.atproto.repo.strongRef#main,omitempty" cborgen:"$type,const=com.atproto.repo.strongRef#main,omitempty"`
16
+
LexiconTypeID string `json:"$type,const=com.atproto.repo.strongRef,omitempty" cborgen:"$type,const=com.atproto.repo.strongRef,omitempty"`
17
17
Cid string `json:"cid" cborgen:"cid"`
18
18
Uri string `json:"uri" cborgen:"uri"`
19
19
}
+6
-6
api/bsky/cbor_gen.go
+6
-6
api/bsky/cbor_gen.go
···
2867
2867
return err
2868
2868
}
2869
2869
2870
-
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("app.bsky.richtext.facet"))); err != nil {
2870
+
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("app.bsky.richtext.facet#link"))); err != nil {
2871
2871
return err
2872
2872
}
2873
-
if _, err := cw.WriteString(string("app.bsky.richtext.facet")); err != nil {
2873
+
if _, err := cw.WriteString(string("app.bsky.richtext.facet#link")); err != nil {
2874
2874
return err
2875
2875
}
2876
2876
return nil
···
2992
2992
return err
2993
2993
}
2994
2994
2995
-
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("app.bsky.richtext.facet"))); err != nil {
2995
+
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("app.bsky.richtext.facet#mention"))); err != nil {
2996
2996
return err
2997
2997
}
2998
-
if _, err := cw.WriteString(string("app.bsky.richtext.facet")); err != nil {
2998
+
if _, err := cw.WriteString(string("app.bsky.richtext.facet#mention")); err != nil {
2999
2999
return err
3000
3000
}
3001
3001
return nil
···
3280
3280
return err
3281
3281
}
3282
3282
3283
-
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("app.bsky.feed.defs"))); err != nil {
3283
+
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("app.bsky.feed.defs#notFoundPost"))); err != nil {
3284
3284
return err
3285
3285
}
3286
-
if _, err := cw.WriteString(string("app.bsky.feed.defs")); err != nil {
3286
+
if _, err := cw.WriteString(string("app.bsky.feed.defs#notFoundPost")); err != nil {
3287
3287
return err
3288
3288
}
3289
3289
+1
-1
api/bsky/embedexternal.go
+1
-1
api/bsky/embedexternal.go
···
13
13
} // EmbedExternal is a "main" in the app.bsky.embed.external schema.
14
14
// RECORDTYPE: EmbedExternal
15
15
type EmbedExternal struct {
16
-
LexiconTypeID string `json:"$type,const=app.bsky.embed.external#main" cborgen:"$type,const=app.bsky.embed.external#main"`
16
+
LexiconTypeID string `json:"$type,const=app.bsky.embed.external" cborgen:"$type,const=app.bsky.embed.external"`
17
17
External *EmbedExternal_External `json:"external" cborgen:"external"`
18
18
}
19
19
+1
-1
api/bsky/embedimages.go
+1
-1
api/bsky/embedimages.go
···
13
13
} // EmbedImages is a "main" in the app.bsky.embed.images schema.
14
14
// RECORDTYPE: EmbedImages
15
15
type EmbedImages struct {
16
-
LexiconTypeID string `json:"$type,const=app.bsky.embed.images#main" cborgen:"$type,const=app.bsky.embed.images#main"`
16
+
LexiconTypeID string `json:"$type,const=app.bsky.embed.images" cborgen:"$type,const=app.bsky.embed.images"`
17
17
Images []*EmbedImages_Image `json:"images" cborgen:"images"`
18
18
}
19
19
+1
-1
api/bsky/embedrecord.go
+1
-1
api/bsky/embedrecord.go
···
17
17
} // EmbedRecord is a "main" in the app.bsky.embed.record schema.
18
18
// RECORDTYPE: EmbedRecord
19
19
type EmbedRecord struct {
20
-
LexiconTypeID string `json:"$type,const=app.bsky.embed.record#main" cborgen:"$type,const=app.bsky.embed.record#main"`
20
+
LexiconTypeID string `json:"$type,const=app.bsky.embed.record" cborgen:"$type,const=app.bsky.embed.record"`
21
21
Record *comatprototypes.RepoStrongRef `json:"record" cborgen:"record"`
22
22
}
23
23
+1
-1
api/bsky/embedrecordWithMedia.go
+1
-1
api/bsky/embedrecordWithMedia.go
···
19
19
} // EmbedRecordWithMedia is a "main" in the app.bsky.embed.recordWithMedia schema.
20
20
// RECORDTYPE: EmbedRecordWithMedia
21
21
type EmbedRecordWithMedia struct {
22
-
LexiconTypeID string `json:"$type,const=app.bsky.embed.recordWithMedia#main" cborgen:"$type,const=app.bsky.embed.recordWithMedia#main"`
22
+
LexiconTypeID string `json:"$type,const=app.bsky.embed.recordWithMedia" cborgen:"$type,const=app.bsky.embed.recordWithMedia"`
23
23
Media *EmbedRecordWithMedia_Media `json:"media" cborgen:"media"`
24
24
Record *EmbedRecord `json:"record" cborgen:"record"`
25
25
}
+1
-11
bgs/bgs.go
+1
-11
bgs/bgs.go
···
275
275
276
276
e.GET("/xrpc/com.atproto.sync.subscribeRepos", bgs.EventsHandler)
277
277
e.GET("/xrpc/com.atproto.sync.getCheckout", bgs.HandleComAtprotoSyncGetCheckout)
278
-
e.GET("/xrpc/com.atproto.sync.getCommitPath", bgs.HandleComAtprotoSyncGetCommitPath)
279
278
e.GET("/xrpc/com.atproto.sync.getHead", bgs.HandleComAtprotoSyncGetHead)
280
279
e.GET("/xrpc/com.atproto.sync.getRecord", bgs.HandleComAtprotoSyncGetRecord)
281
280
e.GET("/xrpc/com.atproto.sync.getRepo", bgs.HandleComAtprotoSyncGetRepo)
···
755
754
return bgs.Index.Crawler.AddToCatchupQueue(ctx, host, ai, evt)
756
755
}
757
756
758
-
if err := bgs.repoman.HandleExternalUserEvent(ctx, host.ID, u.ID, u.Did, (*cid.Cid)(evt.Prev), evt.Blocks, evt.Ops); err != nil {
757
+
if err := bgs.repoman.HandleExternalUserEvent(ctx, host.ID, u.ID, u.Did, evt.Since, evt.Rev, evt.Blocks, evt.Ops); err != nil {
759
758
log.Warnw("failed handling event", "err", err, "host", host.Host, "seq", evt.Seq, "repo", u.Did, "prev", stringLink(evt.Prev), "commit", evt.Commit.String())
760
759
761
760
if errors.Is(err, carstore.ErrRepoBaseMismatch) {
···
767
766
span.SetAttributes(attribute.Bool("catchup_queue", true))
768
767
769
768
return bgs.Index.Crawler.AddToCatchupQueue(ctx, host, ai, evt)
770
-
}
771
-
772
-
if errors.Is(err, carstore.ErrRepoFork) {
773
-
log.Errorw("detected repo fork", "from", stringLink(evt.Prev), "host", host.Host, "repo", u.Did)
774
-
775
-
span.SetAttributes(attribute.Bool("catchup_queue", true))
776
-
span.SetAttributes(attribute.Bool("fork", true))
777
-
778
-
return fmt.Errorf("cannot process repo fork")
779
769
}
780
770
781
771
return fmt.Errorf("handle user event failed: %w", err)
+12
-27
bgs/handlers.go
+12
-27
bgs/handlers.go
···
12
12
comatprototypes "github.com/bluesky-social/indigo/api/atproto"
13
13
"github.com/bluesky-social/indigo/util"
14
14
"github.com/bluesky-social/indigo/xrpc"
15
-
"github.com/ipfs/go-cid"
16
15
"github.com/labstack/echo/v4"
17
16
)
18
17
19
-
func (s *BGS) handleComAtprotoSyncGetCheckout(ctx context.Context, commit string, did string) (io.Reader, error) {
18
+
func (s *BGS) handleComAtprotoSyncGetCheckout(ctx context.Context, did string) (io.Reader, error) {
20
19
/*
21
20
u, err := s.Index.LookupUserByDid(ctx, did)
22
21
if err != nil {
···
63
62
return nil, fmt.Errorf("nyi")
64
63
}
65
64
66
-
func (s *BGS) handleComAtprotoSyncGetRepo(ctx context.Context, did string, earliest string, latest string) (io.Reader, error) {
65
+
func (s *BGS) handleComAtprotoSyncGetRepo(ctx context.Context, did string, since string) (io.Reader, error) {
67
66
u, err := s.Index.LookupUserByDid(ctx, did)
68
67
if err != nil {
69
68
return nil, err
70
69
}
71
70
72
-
var earlyCid, lateCid cid.Cid
73
-
if earliest != "" {
74
-
c, err := cid.Decode(earliest)
75
-
if err != nil {
76
-
return nil, err
77
-
}
78
-
79
-
earlyCid = c
80
-
}
81
-
82
-
if latest != "" {
83
-
c, err := cid.Decode(latest)
84
-
if err != nil {
85
-
return nil, err
86
-
}
87
-
88
-
lateCid = c
89
-
}
90
-
91
71
// TODO: stream the response
92
72
buf := new(bytes.Buffer)
93
-
if err := s.repoman.ReadRepo(ctx, u.Uid, earlyCid, lateCid, buf); err != nil {
94
-
return nil, err
73
+
if err := s.repoman.ReadRepo(ctx, u.Uid, since, buf); err != nil {
74
+
return nil, fmt.Errorf("failed to read repo: %w", err)
95
75
}
96
76
97
77
return buf, nil
···
101
81
return nil, fmt.Errorf("NYI")
102
82
}
103
83
104
-
func (s *BGS) handleComAtprotoSyncRequestCrawl(ctx context.Context, host string) error {
84
+
func (s *BGS) handleComAtprotoSyncRequestCrawl(ctx context.Context, body *comatprototypes.SyncRequestCrawl_Input) error {
85
+
host := body.Hostname
105
86
if host == "" {
106
87
return fmt.Errorf("must pass valid hostname")
107
88
}
···
151
132
return s.slurper.SubscribeToPds(ctx, norm, true)
152
133
}
153
134
154
-
func (s *BGS) handleComAtprotoSyncNotifyOfUpdate(ctx context.Context, hostname string) error {
135
+
func (s *BGS) handleComAtprotoSyncNotifyOfUpdate(ctx context.Context, body *comatprototypes.SyncNotifyOfUpdate_Input) error {
155
136
// TODO:
156
137
return nil
157
138
}
···
169
150
return bytes.NewReader(b), nil
170
151
}
171
152
172
-
func (s *BGS) handleComAtprotoSyncListBlobs(ctx context.Context, did string, earliest string, latest string) (*comatprototypes.SyncListBlobs_Output, error) {
153
+
func (s *BGS) handleComAtprotoSyncListBlobs(ctx context.Context, cursor string, did string, limit int, since string) (*comatprototypes.SyncListBlobs_Output, error) {
173
154
return nil, fmt.Errorf("NYI")
174
155
}
175
156
176
157
func (s *BGS) handleComAtprotoSyncListRepos(ctx context.Context, cursor string, limit int) (*comatprototypes.SyncListRepos_Output, error) {
177
158
return nil, fmt.Errorf("NYI")
178
159
}
160
+
161
+
func (s *BGS) handleComAtprotoSyncGetLatestCommit(ctx context.Context, did string) (*comatprototypes.SyncGetLatestCommit_Output, error) {
162
+
return nil, fmt.Errorf("NYI")
163
+
}
+45
-45
bgs/stubs.go
+45
-45
bgs/stubs.go
···
1
1
package bgs
2
2
3
3
import (
4
-
"fmt"
5
4
"io"
6
5
"strconv"
7
6
···
18
17
e.GET("/xrpc/com.atproto.sync.getBlob", s.HandleComAtprotoSyncGetBlob)
19
18
e.GET("/xrpc/com.atproto.sync.getBlocks", s.HandleComAtprotoSyncGetBlocks)
20
19
e.GET("/xrpc/com.atproto.sync.getCheckout", s.HandleComAtprotoSyncGetCheckout)
21
-
e.GET("/xrpc/com.atproto.sync.getCommitPath", s.HandleComAtprotoSyncGetCommitPath)
22
20
e.GET("/xrpc/com.atproto.sync.getHead", s.HandleComAtprotoSyncGetHead)
21
+
e.GET("/xrpc/com.atproto.sync.getLatestCommit", s.HandleComAtprotoSyncGetLatestCommit)
23
22
e.GET("/xrpc/com.atproto.sync.getRecord", s.HandleComAtprotoSyncGetRecord)
24
23
e.GET("/xrpc/com.atproto.sync.getRepo", s.HandleComAtprotoSyncGetRepo)
25
24
e.GET("/xrpc/com.atproto.sync.listBlobs", s.HandleComAtprotoSyncListBlobs)
26
25
e.GET("/xrpc/com.atproto.sync.listRepos", s.HandleComAtprotoSyncListRepos)
27
-
e.GET("/xrpc/com.atproto.sync.notifyOfUpdate", s.HandleComAtprotoSyncNotifyOfUpdate)
28
-
e.GET("/xrpc/com.atproto.sync.requestCrawl", s.HandleComAtprotoSyncRequestCrawl)
26
+
e.POST("/xrpc/com.atproto.sync.notifyOfUpdate", s.HandleComAtprotoSyncNotifyOfUpdate)
27
+
e.POST("/xrpc/com.atproto.sync.requestCrawl", s.HandleComAtprotoSyncRequestCrawl)
29
28
return nil
30
29
}
31
30
···
63
62
func (s *BGS) HandleComAtprotoSyncGetCheckout(c echo.Context) error {
64
63
ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncGetCheckout")
65
64
defer span.End()
66
-
commit := c.QueryParam("commit")
67
65
did := c.QueryParam("did")
68
66
var out io.Reader
69
67
var handleErr error
70
-
// func (s *BGS) handleComAtprotoSyncGetCheckout(ctx context.Context,commit string,did string) (io.Reader, error)
71
-
out, handleErr = s.handleComAtprotoSyncGetCheckout(ctx, commit, did)
68
+
// func (s *BGS) handleComAtprotoSyncGetCheckout(ctx context.Context,did string) (io.Reader, error)
69
+
out, handleErr = s.handleComAtprotoSyncGetCheckout(ctx, did)
72
70
if handleErr != nil {
73
71
return handleErr
74
72
}
75
73
return c.Stream(200, "application/vnd.ipld.car", out)
76
74
}
77
75
78
-
func (s *BGS) HandleComAtprotoSyncGetCommitPath(c echo.Context) error {
79
-
ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncGetCommitPath")
76
+
func (s *BGS) HandleComAtprotoSyncGetHead(c echo.Context) error {
77
+
ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncGetHead")
80
78
defer span.End()
81
79
did := c.QueryParam("did")
82
-
earliest := c.QueryParam("earliest")
83
-
latest := c.QueryParam("latest")
84
-
var out *comatprototypes.SyncGetCommitPath_Output
80
+
var out *comatprototypes.SyncGetHead_Output
85
81
var handleErr error
86
-
// func (s *BGS) handleComAtprotoSyncGetCommitPath(ctx context.Context,did string,earliest string,latest string) (*comatprototypes.SyncGetCommitPath_Output, error)
87
-
out, handleErr = s.handleComAtprotoSyncGetCommitPath(ctx, did, earliest, latest)
82
+
// func (s *BGS) handleComAtprotoSyncGetHead(ctx context.Context,did string) (*comatprototypes.SyncGetHead_Output, error)
83
+
out, handleErr = s.handleComAtprotoSyncGetHead(ctx, did)
88
84
if handleErr != nil {
89
85
return handleErr
90
86
}
91
87
return c.JSON(200, out)
92
88
}
93
89
94
-
func (s *BGS) HandleComAtprotoSyncGetHead(c echo.Context) error {
95
-
ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncGetHead")
90
+
func (s *BGS) HandleComAtprotoSyncGetLatestCommit(c echo.Context) error {
91
+
ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncGetLatestCommit")
96
92
defer span.End()
97
93
did := c.QueryParam("did")
98
-
var out *comatprototypes.SyncGetHead_Output
94
+
var out *comatprototypes.SyncGetLatestCommit_Output
99
95
var handleErr error
100
-
// func (s *BGS) handleComAtprotoSyncGetHead(ctx context.Context,did string) (*comatprototypes.SyncGetHead_Output, error)
101
-
out, handleErr = s.handleComAtprotoSyncGetHead(ctx, did)
96
+
// func (s *BGS) handleComAtprotoSyncGetLatestCommit(ctx context.Context,did string) (*comatprototypes.SyncGetLatestCommit_Output, error)
97
+
out, handleErr = s.handleComAtprotoSyncGetLatestCommit(ctx, did)
102
98
if handleErr != nil {
103
99
return handleErr
104
100
}
···
126
122
ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncGetRepo")
127
123
defer span.End()
128
124
did := c.QueryParam("did")
129
-
earliest := c.QueryParam("earliest")
130
-
latest := c.QueryParam("latest")
125
+
since := c.QueryParam("since")
131
126
var out io.Reader
132
127
var handleErr error
133
-
// func (s *BGS) handleComAtprotoSyncGetRepo(ctx context.Context,did string,earliest string,latest string) (io.Reader, error)
134
-
out, handleErr = s.handleComAtprotoSyncGetRepo(ctx, did, earliest, latest)
128
+
// func (s *BGS) handleComAtprotoSyncGetRepo(ctx context.Context,did string,since string) (io.Reader, error)
129
+
out, handleErr = s.handleComAtprotoSyncGetRepo(ctx, did, since)
135
130
if handleErr != nil {
136
131
return handleErr
137
132
}
···
141
136
func (s *BGS) HandleComAtprotoSyncListBlobs(c echo.Context) error {
142
137
ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncListBlobs")
143
138
defer span.End()
139
+
cursor := c.QueryParam("cursor")
144
140
did := c.QueryParam("did")
145
-
earliest := c.QueryParam("earliest")
146
-
latest := c.QueryParam("latest")
141
+
142
+
var limit int
143
+
if p := c.QueryParam("limit"); p != "" {
144
+
var err error
145
+
limit, err = strconv.Atoi(p)
146
+
if err != nil {
147
+
return err
148
+
}
149
+
} else {
150
+
limit = 500
151
+
}
152
+
since := c.QueryParam("since")
147
153
var out *comatprototypes.SyncListBlobs_Output
148
154
var handleErr error
149
-
// func (s *BGS) handleComAtprotoSyncListBlobs(ctx context.Context,did string,earliest string,latest string) (*comatprototypes.SyncListBlobs_Output, error)
150
-
out, handleErr = s.handleComAtprotoSyncListBlobs(ctx, did, earliest, latest)
155
+
// func (s *BGS) handleComAtprotoSyncListBlobs(ctx context.Context,cursor string,did string,limit int,since string) (*comatprototypes.SyncListBlobs_Output, error)
156
+
out, handleErr = s.handleComAtprotoSyncListBlobs(ctx, cursor, did, limit, since)
151
157
if handleErr != nil {
152
158
return handleErr
153
159
}
···
182
188
func (s *BGS) HandleComAtprotoSyncNotifyOfUpdate(c echo.Context) error {
183
189
ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncNotifyOfUpdate")
184
190
defer span.End()
185
-
hostname := c.QueryParam("hostname")
191
+
192
+
var body comatprototypes.SyncNotifyOfUpdate_Input
193
+
if err := c.Bind(&body); err != nil {
194
+
return err
195
+
}
186
196
var handleErr error
187
-
// func (s *BGS) handleComAtprotoSyncNotifyOfUpdate(ctx context.Context,hostname string) error
188
-
handleErr = s.handleComAtprotoSyncNotifyOfUpdate(ctx, hostname)
197
+
// func (s *BGS) handleComAtprotoSyncNotifyOfUpdate(ctx context.Context,body *comatprototypes.SyncNotifyOfUpdate_Input) error
198
+
handleErr = s.handleComAtprotoSyncNotifyOfUpdate(ctx, &body)
189
199
if handleErr != nil {
190
200
return handleErr
191
201
}
···
196
206
ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncRequestCrawl")
197
207
defer span.End()
198
208
199
-
var hostname string
200
-
switch c.Request().Method {
201
-
case "GET":
202
-
hostname = c.QueryParam("hostname")
203
-
case "POST":
204
-
var m map[string]string
205
-
if err := c.Bind(&m); err != nil {
206
-
return err
207
-
}
208
-
209
-
hostname = m["hostname"]
210
-
default:
211
-
return fmt.Errorf("invalid method for handler")
209
+
var body comatprototypes.SyncRequestCrawl_Input
210
+
if err := c.Bind(&body); err != nil {
211
+
return err
212
212
}
213
213
var handleErr error
214
-
// func (s *BGS) handleComAtprotoSyncRequestCrawl(ctx context.Context,hostname string) error
215
-
handleErr = s.handleComAtprotoSyncRequestCrawl(ctx, hostname)
214
+
// func (s *BGS) handleComAtprotoSyncRequestCrawl(ctx context.Context,body *comatprototypes.SyncRequestCrawl_Input) error
215
+
handleErr = s.handleComAtprotoSyncRequestCrawl(ctx, &body)
216
216
if handleErr != nil {
217
217
return handleErr
218
218
}
+47
-52
carstore/bs.go
+47
-52
carstore/bs.go
···
74
74
Path string
75
75
Usr models.Uid `gorm:"index:idx_car_shards_usr;index:idx_car_shards_usr_seq,priority:1"`
76
76
Rebase bool
77
+
Rev string
77
78
}
78
79
79
80
type blockRef struct {
···
243
244
rmcids map[cid.Cid]bool
244
245
base blockstore.Blockstore
245
246
user models.Uid
247
+
baseCid cid.Cid
246
248
seq int
247
249
readonly bool
248
250
cs *CarStore
···
292
294
293
295
var ErrRepoBaseMismatch = fmt.Errorf("attempted a delta session on top of the wrong previous head")
294
296
295
-
var ErrRepoFork = fmt.Errorf("repo fork detected")
296
-
297
-
func (cs *CarStore) NewDeltaSession(ctx context.Context, user models.Uid, prev *cid.Cid) (*DeltaSession, error) {
297
+
func (cs *CarStore) NewDeltaSession(ctx context.Context, user models.Uid, since *string) (*DeltaSession, error) {
298
298
ctx, span := otel.Tracer("carstore").Start(ctx, "NewSession")
299
299
defer span.End()
300
300
···
305
305
return nil, err
306
306
}
307
307
308
-
if prev != nil {
309
-
if lastShard.Root.CID != *prev {
310
-
fork, err := cs.checkFork(ctx, user, *prev)
311
-
if err != nil {
312
-
return nil, fmt.Errorf("failed to check carstore base mismatch for fork condition: %w", err)
313
-
}
314
-
315
-
if fork {
316
-
return nil, fmt.Errorf("fork at %s: %w", prev.String(), ErrRepoFork)
317
-
}
318
-
319
-
return nil, fmt.Errorf("mismatch: %s != %s: %w", lastShard.Root.CID, prev.String(), ErrRepoBaseMismatch)
320
-
}
308
+
if since != nil && *since != lastShard.Rev {
309
+
return nil, fmt.Errorf("revision mismatch: %s != %s: %w", *since, lastShard.Rev, ErrRepoBaseMismatch)
321
310
}
322
311
323
312
return &DeltaSession{
···
329
318
prefetch: true,
330
319
cache: make(map[cid.Cid]blockformat.Block),
331
320
},
332
-
user: user,
333
-
cs: cs,
334
-
seq: lastShard.Seq + 1,
321
+
user: user,
322
+
baseCid: lastShard.Root.CID,
323
+
cs: cs,
324
+
seq: lastShard.Seq + 1,
335
325
}, nil
336
326
}
337
327
···
349
339
}, nil
350
340
}
351
341
352
-
func (cs *CarStore) ReadUserCar(ctx context.Context, user models.Uid, earlyCid, lateCid cid.Cid, incremental bool, w io.Writer) error {
342
+
func (cs *CarStore) ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, w io.Writer) error {
353
343
ctx, span := otel.Tracer("carstore").Start(ctx, "ReadUserCar")
354
344
defer span.End()
355
345
356
-
var lateSeq, earlySeq int
357
-
358
-
if earlyCid.Defined() {
346
+
var earlySeq int
347
+
if sinceRev != "" {
359
348
var untilShard CarShard
360
-
if err := cs.meta.First(&untilShard, "root = ? AND usr = ?", models.DbCID{earlyCid}, user).Error; err != nil {
349
+
if err := cs.meta.Where("rev >= ? AND usr = ?", sinceRev, user).Order("rev").First(&untilShard).Error; err != nil {
361
350
return fmt.Errorf("finding early shard: %w", err)
362
351
}
363
352
earlySeq = untilShard.Seq
364
353
}
365
354
366
-
if lateCid.Defined() {
367
-
var fromShard CarShard
368
-
if err := cs.meta.First(&fromShard, "root = ? AND usr = ?", models.DbCID{lateCid}, user).Error; err != nil {
369
-
return fmt.Errorf("finding late shard: %w", err)
355
+
q := cs.meta.Order("seq desc").Where("usr = ? AND seq >= ?", user, earlySeq)
356
+
/*
357
+
if lateCid.Defined() {
358
+
q = q.Where("seq <= ?", lateSeq)
370
359
}
371
-
lateSeq = fromShard.Seq
372
-
}
373
-
374
-
q := cs.meta.Order("seq desc").Where("usr = ? AND seq > ?", user, earlySeq)
375
-
if lateCid.Defined() {
376
-
q = q.Where("seq <= ?", lateSeq)
377
-
}
360
+
*/
378
361
var shards []CarShard
379
-
if err := q.Find(&shards).Error; err != nil {
362
+
if err := q.Debug().Find(&shards).Error; err != nil {
380
363
return err
381
364
}
382
365
383
-
if !incremental && earlyCid.Defined() {
366
+
if !incremental && earlySeq > 0 {
384
367
// have to do it the ugly way
385
368
return fmt.Errorf("nyi")
386
369
}
···
462
445
}
463
446
464
447
var _ blockstore.Blockstore = (*DeltaSession)(nil)
448
+
449
+
func (ds *DeltaSession) BaseCid() cid.Cid {
450
+
return ds.baseCid
451
+
}
465
452
466
453
func (ds *DeltaSession) Put(ctx context.Context, b blockformat.Block) error {
467
454
if ds.readonly {
···
563
550
564
551
// CloseWithRoot writes all new blocks in a car file to the writer with the
565
552
// given cid as the 'root'
566
-
func (ds *DeltaSession) CloseWithRoot(ctx context.Context, root cid.Cid) ([]byte, error) {
567
-
return ds.closeWithRoot(ctx, root, false)
553
+
func (ds *DeltaSession) CloseWithRoot(ctx context.Context, root cid.Cid, rev string) ([]byte, error) {
554
+
return ds.closeWithRoot(ctx, root, rev, false)
568
555
}
569
556
570
557
func WriteCarHeader(w io.Writer, root cid.Cid) (int64, error) {
···
585
572
return hnw, nil
586
573
}
587
574
588
-
func (ds *DeltaSession) closeWithRoot(ctx context.Context, root cid.Cid, rebase bool) ([]byte, error) {
575
+
func (ds *DeltaSession) closeWithRoot(ctx context.Context, root cid.Cid, rev string, rebase bool) ([]byte, error) {
589
576
ctx, span := otel.Tracer("carstore").Start(ctx, "CloseWithRoot")
590
577
defer span.End()
591
578
···
640
627
Seq: ds.seq,
641
628
Path: path,
642
629
Usr: ds.user,
630
+
Rev: rev,
643
631
}
644
632
645
633
if err := ds.putShard(ctx, &shard, brefs); err != nil {
···
733
721
return nil
734
722
}
735
723
736
-
func (ds *DeltaSession) CloseAsRebase(ctx context.Context, root cid.Cid) error {
737
-
_, err := ds.closeWithRoot(ctx, root, true)
724
+
func (ds *DeltaSession) CloseAsRebase(ctx context.Context, root cid.Cid, rev string) error {
725
+
_, err := ds.closeWithRoot(ctx, root, rev, true)
738
726
if err != nil {
739
727
return err
740
728
}
···
855
843
return dropset, nil
856
844
}
857
845
858
-
func (cs *CarStore) ImportSlice(ctx context.Context, uid models.Uid, prev *cid.Cid, carslice []byte) (cid.Cid, *DeltaSession, error) {
846
+
func (cs *CarStore) ImportSlice(ctx context.Context, uid models.Uid, since *string, carslice []byte) (cid.Cid, *DeltaSession, error) {
859
847
ctx, span := otel.Tracer("carstore").Start(ctx, "ImportSlice")
860
848
defer span.End()
861
849
···
868
856
return cid.Undef, nil, fmt.Errorf("invalid car file, header must have a single root (has %d)", len(carr.Header.Roots))
869
857
}
870
858
871
-
ds, err := cs.NewDeltaSession(ctx, uid, prev)
859
+
ds, err := cs.NewDeltaSession(ctx, uid, since)
872
860
if err != nil {
873
-
return cid.Undef, nil, err
861
+
return cid.Undef, nil, fmt.Errorf("new delta session failed: %w", err)
874
862
}
875
863
876
864
var cids []cid.Cid
···
890
878
}
891
879
}
892
880
893
-
base := cid.Undef
894
-
if prev != nil {
895
-
base = *prev
896
-
}
897
-
898
-
rmcids, err := BlockDiff(ctx, ds, base, cids)
881
+
rmcids, err := BlockDiff(ctx, ds, ds.baseCid, cids)
899
882
if err != nil {
900
-
return cid.Undef, nil, err
883
+
return cid.Undef, nil, fmt.Errorf("block diff failed (base=%s): %w", ds.baseCid, err)
901
884
}
902
885
903
886
ds.rmcids = rmcids
···
915
898
}
916
899
917
900
return lastShard.Root.CID, nil
901
+
}
902
+
903
+
func (cs *CarStore) GetUserRepoRev(ctx context.Context, user models.Uid) (string, error) {
904
+
lastShard, err := cs.getLastShard(ctx, user)
905
+
if err != nil {
906
+
return "", err
907
+
}
908
+
if lastShard.ID == 0 {
909
+
return "", nil
910
+
}
911
+
912
+
return lastShard.Rev, nil
918
913
}
919
914
920
915
type UserStat struct {
+25
-21
carstore/repo_test.go
+25
-21
carstore/repo_test.go
···
83
83
t.Fatal(err)
84
84
}
85
85
86
-
ncid, err := setupRepo(ctx, ds)
86
+
ncid, rev, err := setupRepo(ctx, ds)
87
87
if err != nil {
88
88
t.Fatal(err)
89
89
}
90
90
91
-
if _, err := ds.CloseWithRoot(ctx, ncid); err != nil {
91
+
if _, err := ds.CloseWithRoot(ctx, ncid, rev); err != nil {
92
92
t.Fatal(err)
93
93
}
94
94
95
95
head := ncid
96
96
for i := 0; i < 10; i++ {
97
-
ds, err := cs.NewDeltaSession(ctx, 1, &head)
97
+
ds, err := cs.NewDeltaSession(ctx, 1, &rev)
98
98
if err != nil {
99
99
t.Fatal(err)
100
100
}
···
111
111
}
112
112
113
113
kmgr := &util.FakeKeyManager{}
114
-
nroot, err := rr.Commit(ctx, kmgr.SignForUser)
114
+
nroot, nrev, err := rr.Commit(ctx, kmgr.SignForUser)
115
115
if err != nil {
116
116
t.Fatal(err)
117
117
}
118
118
119
-
if _, err := ds.CloseWithRoot(ctx, nroot); err != nil {
119
+
rev = nrev
120
+
121
+
if _, err := ds.CloseWithRoot(ctx, nroot, rev); err != nil {
120
122
t.Fatal(err)
121
123
}
122
124
···
124
126
}
125
127
126
128
buf := new(bytes.Buffer)
127
-
if err := cs.ReadUserCar(ctx, 1, cid.Undef, cid.Undef, true, buf); err != nil {
129
+
if err := cs.ReadUserCar(ctx, 1, "", true, buf); err != nil {
128
130
t.Fatal(err)
129
131
}
130
132
···
132
134
133
135
}
134
136
135
-
func setupRepo(ctx context.Context, bs blockstore.Blockstore) (cid.Cid, error) {
137
+
func setupRepo(ctx context.Context, bs blockstore.Blockstore) (cid.Cid, string, error) {
136
138
nr := repo.NewRepo(ctx, "did:foo", bs)
137
139
138
140
if _, _, err := nr.CreateRecord(ctx, "app.bsky.feed.post", &appbsky.FeedPost{
139
141
Text: fmt.Sprintf("hey look its a tweet %s", time.Now()),
140
142
}); err != nil {
141
-
return cid.Undef, err
143
+
return cid.Undef, "", err
142
144
}
143
145
144
146
kmgr := &util.FakeKeyManager{}
145
-
ncid, err := nr.Commit(ctx, kmgr.SignForUser)
147
+
ncid, rev, err := nr.Commit(ctx, kmgr.SignForUser)
146
148
if err != nil {
147
-
return cid.Undef, fmt.Errorf("commit failed: %w", err)
149
+
return cid.Undef, "", fmt.Errorf("commit failed: %w", err)
148
150
}
149
151
150
-
return ncid, nil
152
+
return ncid, rev, nil
151
153
}
152
154
153
155
func BenchmarkRepoWritesCarstore(b *testing.B) {
···
159
161
}
160
162
defer cleanup()
161
163
162
-
ds, err := cs.NewDeltaSession(ctx, 1, &cid.Undef)
164
+
ds, err := cs.NewDeltaSession(ctx, 1, nil)
163
165
if err != nil {
164
166
b.Fatal(err)
165
167
}
166
168
167
-
ncid, err := setupRepo(ctx, ds)
169
+
ncid, rev, err := setupRepo(ctx, ds)
168
170
if err != nil {
169
171
b.Fatal(err)
170
172
}
171
173
172
-
if _, err := ds.CloseWithRoot(ctx, ncid); err != nil {
174
+
if _, err := ds.CloseWithRoot(ctx, ncid, rev); err != nil {
173
175
b.Fatal(err)
174
176
}
175
177
176
178
head := ncid
177
179
b.ResetTimer()
178
180
for i := 0; i < b.N; i++ {
179
-
ds, err := cs.NewDeltaSession(ctx, 1, &head)
181
+
ds, err := cs.NewDeltaSession(ctx, 1, &rev)
180
182
if err != nil {
181
183
b.Fatal(err)
182
184
}
···
193
195
}
194
196
195
197
kmgr := &util.FakeKeyManager{}
196
-
nroot, err := rr.Commit(ctx, kmgr.SignForUser)
198
+
nroot, nrev, err := rr.Commit(ctx, kmgr.SignForUser)
197
199
if err != nil {
198
200
b.Fatal(err)
199
201
}
200
202
201
-
if _, err := ds.CloseWithRoot(ctx, nroot); err != nil {
203
+
rev = nrev
204
+
205
+
if _, err := ds.CloseWithRoot(ctx, nroot, rev); err != nil {
202
206
b.Fatal(err)
203
207
}
204
208
···
215
219
}
216
220
defer cleanup()
217
221
218
-
ncid, err := setupRepo(ctx, bs)
222
+
ncid, _, err := setupRepo(ctx, bs)
219
223
if err != nil {
220
224
b.Fatal(err)
221
225
}
···
236
240
}
237
241
238
242
kmgr := &util.FakeKeyManager{}
239
-
nroot, err := rr.Commit(ctx, kmgr.SignForUser)
243
+
nroot, _, err := rr.Commit(ctx, kmgr.SignForUser)
240
244
if err != nil {
241
245
b.Fatal(err)
242
246
}
···
253
257
b.Fatal(err)
254
258
}
255
259
256
-
ncid, err := setupRepo(ctx, bs)
260
+
ncid, _, err := setupRepo(ctx, bs)
257
261
if err != nil {
258
262
b.Fatal(err)
259
263
}
···
274
278
}
275
279
276
280
kmgr := &util.FakeKeyManager{}
277
-
nroot, err := rr.Commit(ctx, kmgr.SignForUser)
281
+
nroot, _, err := rr.Commit(ctx, kmgr.SignForUser)
278
282
if err != nil {
279
283
b.Fatal(err)
280
284
}
+3
-3
cmd/gosky/main.go
+3
-3
cmd/gosky/main.go
···
347
347
348
348
ctx := context.TODO()
349
349
350
-
repobytes, err := comatproto.SyncGetRepo(ctx, xrpcc, cctx.Args().First(), "", "")
350
+
repobytes, err := comatproto.SyncGetRepo(ctx, xrpcc, cctx.Args().First(), "")
351
351
if err != nil {
352
352
return err
353
353
}
···
647
647
arg = xrpcc.Auth.Did
648
648
}
649
649
650
-
rrb, err := comatproto.SyncGetRepo(ctx, xrpcc, arg, "", "")
650
+
rrb, err := comatproto.SyncGetRepo(ctx, xrpcc, arg, "")
651
651
if err != nil {
652
652
return err
653
653
}
···
1168
1168
return err
1169
1169
}
1170
1170
1171
-
rrb, err := comatproto.SyncGetRepo(ctx, xrpcc, rfi, "", "")
1171
+
rrb, err := comatproto.SyncGetRepo(ctx, xrpcc, rfi, "")
1172
1172
if err != nil {
1173
1173
return err
1174
1174
}
-1
cmd/supercollider/main.go
-1
cmd/supercollider/main.go
+7
-6
events/dbpersist.go
+7
-6
events/dbpersist.go
···
67
67
68
68
type RepoEventRecord struct {
69
69
Seq uint `gorm:"primarykey"`
70
+
Rev string
71
+
Since *string
70
72
Commit *models.DbCID
71
73
Prev *models.DbCID
72
74
NewHandle *string // NewHandle is only set if this is a handle change event
···
276
278
Blobs: blobs,
277
279
Time: t,
278
280
Rebase: evt.Rebase,
281
+
Rev: evt.Rev,
282
+
Since: evt.Since,
279
283
}
280
284
281
285
opsb, err := json.Marshal(evt.Ops)
···
493
497
Blobs: blobCIDs,
494
498
Rebase: rer.Rebase,
495
499
Ops: ops,
500
+
Rev: rer.Rev,
501
+
Since: rer.Since,
496
502
}
497
503
498
504
cs, err := p.readCarSlice(ctx, rer)
···
511
517
512
518
func (p *DbPersistence) readCarSlice(ctx context.Context, rer *RepoEventRecord) ([]byte, error) {
513
519
514
-
var early cid.Cid
515
-
if rer.Prev != nil && !rer.Rebase {
516
-
early = rer.Prev.CID
517
-
}
518
-
519
520
buf := new(bytes.Buffer)
520
-
if err := p.cs.ReadUserCar(ctx, rer.Repo, early, rer.Commit.CID, true, buf); err != nil {
521
+
if err := p.cs.ReadUserCar(ctx, rer.Repo, rer.Rev, true, buf); err != nil {
521
522
return nil, err
522
523
}
523
524
+299
-334
indexer/indexer.go
+299
-334
indexer/indexer.go
···
130
130
131
131
toobig := false
132
132
slice := evt.RepoSlice
133
-
if len(slice) > MaxEventSliceLength || len(outops) > MaxOpsSliceLength {
133
+
if len(slice) > MaxEventSliceLength || len(outops) > MaxOpsSliceLength || evt.TooBig {
134
134
slice = nil
135
135
outops = nil
136
136
toobig = true
137
137
}
138
138
139
-
if evt.Rebase {
140
-
if err := ix.events.HandleRebase(ctx, evt.User); err != nil {
141
-
log.Errorf("failed to handle rebase in events manager: %s", err)
142
-
}
143
-
}
144
-
145
139
log.Debugw("Sending event", "did", did)
146
140
if err := ix.events.AddEvent(ctx, &events.XRPCStreamEvent{
147
141
RepoCommit: &comatproto.SyncSubscribeRepos_Commit{
148
142
Repo: did,
149
143
Prev: (*lexutil.LexLink)(evt.OldRoot),
150
144
Blocks: slice,
145
+
Rev: evt.Rev,
146
+
Since: evt.Since,
151
147
Commit: lexutil.LexLink(evt.NewRoot),
152
148
Time: time.Now().Format(util.ISO8601),
153
149
Ops: outops,
154
150
TooBig: toobig,
155
-
Rebase: evt.Rebase,
156
151
},
157
152
PrivUid: evt.User,
158
153
}); err != nil {
···
165
160
func (ix *Indexer) handleRepoOp(ctx context.Context, evt *repomgr.RepoEvent, op *repomgr.RepoOp) error {
166
161
switch op.Kind {
167
162
case repomgr.EvtKindCreateRecord:
168
-
if err := ix.crawlRecordReferences(ctx, op); err != nil {
169
-
return err
170
-
}
171
-
172
163
if ix.doAggregations {
173
164
_, err := ix.handleRecordCreate(ctx, evt, op, true)
174
165
if err != nil {
175
166
return fmt.Errorf("handle recordCreate: %w", err)
176
167
}
177
168
}
169
+
if err := ix.crawlRecordReferences(ctx, op); err != nil {
170
+
return err
171
+
}
172
+
178
173
case repomgr.EvtKindDeleteRecord:
179
174
if ix.doAggregations {
180
175
if err := ix.handleRecordDelete(ctx, evt, op, true); err != nil {
···
194
189
return nil
195
190
}
196
191
192
+
func (ix *Indexer) crawlAtUriRef(ctx context.Context, uri string) error {
193
+
puri, err := util.ParseAtUri(uri)
194
+
if err != nil {
195
+
return err
196
+
} else {
197
+
_, err := ix.GetUserOrMissing(ctx, puri.Did)
198
+
if err != nil {
199
+
return err
200
+
}
201
+
}
202
+
return nil
203
+
}
204
+
func (ix *Indexer) crawlRecordReferences(ctx context.Context, op *repomgr.RepoOp) error {
205
+
ctx, span := otel.Tracer("indexer").Start(ctx, "crawlRecordReferences")
206
+
defer span.End()
207
+
208
+
switch rec := op.Record.(type) {
209
+
case *bsky.FeedPost:
210
+
for _, e := range rec.Entities {
211
+
if e.Type == "mention" {
212
+
_, err := ix.GetUserOrMissing(ctx, e.Value)
213
+
if err != nil {
214
+
log.Infow("failed to parse user mention", "ref", e.Value, "err", err)
215
+
}
216
+
}
217
+
}
218
+
219
+
if rec.Reply != nil {
220
+
if rec.Reply.Parent != nil {
221
+
if err := ix.crawlAtUriRef(ctx, rec.Reply.Parent.Uri); err != nil {
222
+
log.Infow("failed to crawl reply parent", "cid", op.RecCid, "replyuri", rec.Reply.Parent.Uri, "err", err)
223
+
}
224
+
}
225
+
226
+
if rec.Reply.Root != nil {
227
+
if err := ix.crawlAtUriRef(ctx, rec.Reply.Root.Uri); err != nil {
228
+
log.Infow("failed to crawl reply root", "cid", op.RecCid, "rooturi", rec.Reply.Root.Uri, "err", err)
229
+
}
230
+
}
231
+
}
232
+
233
+
return nil
234
+
case *bsky.FeedRepost:
235
+
if rec.Subject != nil {
236
+
if err := ix.crawlAtUriRef(ctx, rec.Subject.Uri); err != nil {
237
+
log.Infow("failed to crawl repost subject", "cid", op.RecCid, "subjecturi", rec.Subject.Uri, "err", err)
238
+
}
239
+
}
240
+
return nil
241
+
case *bsky.FeedLike:
242
+
if rec.Subject != nil {
243
+
if err := ix.crawlAtUriRef(ctx, rec.Subject.Uri); err != nil {
244
+
log.Infow("failed to crawl vote subject", "cid", op.RecCid, "subjecturi", rec.Subject.Uri, "err", err)
245
+
}
246
+
}
247
+
return nil
248
+
case *bsky.GraphFollow:
249
+
_, err := ix.GetUserOrMissing(ctx, rec.Subject)
250
+
if err != nil {
251
+
log.Infow("failed to crawl follow subject", "cid", op.RecCid, "subjectdid", rec.Subject, "err", err)
252
+
}
253
+
return nil
254
+
case *bsky.GraphBlock:
255
+
_, err := ix.GetUserOrMissing(ctx, rec.Subject)
256
+
if err != nil {
257
+
log.Infow("failed to crawl follow subject", "cid", op.RecCid, "subjectdid", rec.Subject, "err", err)
258
+
}
259
+
return nil
260
+
case *bsky.ActorProfile:
261
+
return nil
262
+
default:
263
+
log.Warnf("unrecognized record type: %T", op.Record)
264
+
return nil
265
+
}
266
+
}
267
+
268
+
func (ix *Indexer) GetUserOrMissing(ctx context.Context, did string) (*models.ActorInfo, error) {
269
+
ctx, span := otel.Tracer("indexer").Start(ctx, "getUserOrMissing")
270
+
defer span.End()
271
+
272
+
ai, err := ix.LookupUserByDid(ctx, did)
273
+
if err == nil {
274
+
return ai, nil
275
+
}
276
+
277
+
if !isNotFound(err) {
278
+
return nil, err
279
+
}
280
+
281
+
// unknown user... create it and send it off to the crawler
282
+
return ix.createMissingUserRecord(ctx, did)
283
+
}
284
+
285
+
func (ix *Indexer) createMissingUserRecord(ctx context.Context, did string) (*models.ActorInfo, error) {
286
+
ctx, span := otel.Tracer("indexer").Start(ctx, "createMissingUserRecord")
287
+
defer span.End()
288
+
289
+
ai, err := ix.CreateExternalUser(ctx, did)
290
+
if err != nil {
291
+
return nil, err
292
+
}
293
+
294
+
if err := ix.addUserToCrawler(ctx, ai); err != nil {
295
+
return nil, fmt.Errorf("failed to add unknown user to crawler: %w", err)
296
+
}
297
+
298
+
return ai, nil
299
+
}
300
+
301
+
func (ix *Indexer) addUserToCrawler(ctx context.Context, ai *models.ActorInfo) error {
302
+
log.Infow("Sending user to crawler: ", "did", ai.Did)
303
+
if ix.Crawler == nil {
304
+
return nil
305
+
}
306
+
307
+
return ix.Crawler.Crawl(ctx, ai)
308
+
}
309
+
310
+
func (ix *Indexer) DidForUser(ctx context.Context, uid models.Uid) (string, error) {
311
+
var ai models.ActorInfo
312
+
if err := ix.db.First(&ai, "uid = ?", uid).Error; err != nil {
313
+
return "", err
314
+
}
315
+
316
+
return ai.Did, nil
317
+
}
318
+
319
+
func (ix *Indexer) LookupUser(ctx context.Context, id models.Uid) (*models.ActorInfo, error) {
320
+
var ai models.ActorInfo
321
+
if err := ix.db.First(&ai, "uid = ?", id).Error; err != nil {
322
+
return nil, err
323
+
}
324
+
325
+
return &ai, nil
326
+
}
327
+
328
+
func (ix *Indexer) LookupUserByDid(ctx context.Context, did string) (*models.ActorInfo, error) {
329
+
var ai models.ActorInfo
330
+
if err := ix.db.Find(&ai, "did = ?", did).Error; err != nil {
331
+
return nil, err
332
+
}
333
+
334
+
if ai.ID == 0 {
335
+
return nil, gorm.ErrRecordNotFound
336
+
}
337
+
338
+
return &ai, nil
339
+
}
340
+
341
+
func (ix *Indexer) LookupUserByHandle(ctx context.Context, handle string) (*models.ActorInfo, error) {
342
+
var ai models.ActorInfo
343
+
if err := ix.db.Find(&ai, "handle = ?", handle).Error; err != nil {
344
+
return nil, err
345
+
}
346
+
347
+
if ai.ID == 0 {
348
+
return nil, gorm.ErrRecordNotFound
349
+
}
350
+
351
+
return &ai, nil
352
+
}
353
+
354
+
func (ix *Indexer) handleInitActor(ctx context.Context, evt *repomgr.RepoEvent, op *repomgr.RepoOp) error {
355
+
ai := op.ActorInfo
356
+
357
+
if err := ix.db.Clauses(clause.OnConflict{
358
+
Columns: []clause.Column{{Name: "uid"}},
359
+
UpdateAll: true,
360
+
}).Create(&models.ActorInfo{
361
+
Uid: evt.User,
362
+
Handle: ai.Handle,
363
+
Did: ai.Did,
364
+
DisplayName: ai.DisplayName,
365
+
Type: ai.Type,
366
+
PDS: evt.PDS,
367
+
}).Error; err != nil {
368
+
return fmt.Errorf("initializing new actor info: %w", err)
369
+
}
370
+
371
+
if err := ix.db.Create(&models.FollowRecord{
372
+
Follower: evt.User,
373
+
Target: evt.User,
374
+
}).Error; err != nil {
375
+
return err
376
+
}
377
+
378
+
return nil
379
+
}
380
+
381
+
func isNotFound(err error) bool {
382
+
if errors.Is(err, gorm.ErrRecordNotFound) {
383
+
return true
384
+
}
385
+
386
+
return false
387
+
}
388
+
389
+
// TODO: since this function is the only place we depend on the repomanager, i wonder if this should be wired some other way?
390
+
func (ix *Indexer) FetchAndIndexRepo(ctx context.Context, job *crawlWork) error {
391
+
ctx, span := otel.Tracer("indexer").Start(ctx, "FetchAndIndexRepo")
392
+
defer span.End()
393
+
394
+
span.SetAttributes(attribute.Int("catchup", len(job.catchup)))
395
+
396
+
ai := job.act
397
+
398
+
var pds models.PDS
399
+
if err := ix.db.First(&pds, "id = ?", ai.PDS).Error; err != nil {
400
+
return fmt.Errorf("expected to find pds record (%d) in db for crawling one of their users: %w", ai.PDS, err)
401
+
}
402
+
403
+
rev, err := ix.repomgr.GetRepoRev(ctx, ai.Uid)
404
+
if err != nil && !isNotFound(err) {
405
+
return fmt.Errorf("failed to get repo root: %w", err)
406
+
}
407
+
408
+
if !(job.initScrape || len(job.catchup) == 0) {
409
+
first := job.catchup[0]
410
+
if first.evt.Since == nil || rev == *first.evt.Since {
411
+
for _, j := range job.catchup {
412
+
if err := ix.repomgr.HandleExternalUserEvent(ctx, pds.ID, ai.Uid, ai.Did, j.evt.Since, j.evt.Rev, j.evt.Blocks, j.evt.Ops); err != nil {
413
+
// TODO: if we fail here, we should probably fall back to a repo re-sync
414
+
return fmt.Errorf("post rebase catchup failed: %w", err)
415
+
}
416
+
}
417
+
418
+
return nil
419
+
}
420
+
}
421
+
422
+
var host string
423
+
if pds.SSL {
424
+
host = "https://" + pds.Host
425
+
} else {
426
+
host = "http://" + pds.Host
427
+
}
428
+
c := &xrpc.Client{
429
+
Host: host,
430
+
}
431
+
432
+
ix.ApplyPDSClientSettings(c)
433
+
434
+
if rev == "" {
435
+
span.SetAttributes(attribute.Bool("full", true))
436
+
}
437
+
438
+
limiter := ix.GetLimiter(pds.ID)
439
+
if limiter == nil {
440
+
limiter = rate.NewLimiter(rate.Limit(pds.CrawlRateLimit), 1)
441
+
ix.SetLimiter(pds.ID, limiter)
442
+
}
443
+
444
+
// Wait to prevent DOSing the PDS when connecting to a new stream with lots of active repos
445
+
limiter.Wait(ctx)
446
+
447
+
log.Infow("SyncGetRepo", "did", ai.Did, "user", ai.Handle, "since", rev)
448
+
// TODO: max size on these? A malicious PDS could just send us a petabyte sized repo here and kill us
449
+
repo, err := comatproto.SyncGetRepo(ctx, c, ai.Did, rev)
450
+
if err != nil {
451
+
return fmt.Errorf("failed to fetch repo: %w", err)
452
+
}
453
+
454
+
// this process will send individual indexing events back to the indexer, doing a 'fast forward' of the users entire history
455
+
// we probably want alternative ways of doing this for 'very large' or 'very old' repos, but this works for now
456
+
if err := ix.repomgr.ImportNewRepo(ctx, ai.Uid, ai.Did, bytes.NewReader(repo), &rev); err != nil {
457
+
span.RecordError(err)
458
+
return fmt.Errorf("importing fetched repo (curRev: %s): %w", rev, err)
459
+
}
460
+
461
+
// TODO: this is currently doing too much work, allowing us to ignore the catchup events we've gotten
462
+
// need to do 'just enough' work...
463
+
464
+
return nil
465
+
}
466
+
467
+
func (ix *Indexer) GetPost(ctx context.Context, uri string) (*models.FeedPost, error) {
468
+
puri, err := util.ParseAtUri(uri)
469
+
if err != nil {
470
+
return nil, err
471
+
}
472
+
473
+
var post models.FeedPost
474
+
if err := ix.db.First(&post, "rkey = ? AND author = (?)", puri.Rkey, ix.db.Model(models.ActorInfo{}).Where("did = ?", puri.Did).Select("id")).Error; err != nil {
475
+
return nil, err
476
+
}
477
+
478
+
return &post, nil
479
+
}
480
+
197
481
func (ix *Indexer) handleRecordDelete(ctx context.Context, evt *repomgr.RepoEvent, op *repomgr.RepoOp, local bool) error {
198
482
log.Infow("record delete event", "collection", op.Collection)
199
483
···
227
511
228
512
log.Warn("TODO: remove notifications on delete")
229
513
/*
230
-
if err := ix.notifman.RemoveRepost(ctx, fp.Author, rr.ID, evt.User); err != nil {
231
-
return nil, err
232
-
}
514
+
if err := ix.notifman.RemoveRepost(ctx, fp.Author, rr.ID, evt.User); err != nil {
515
+
return nil, err
516
+
}
233
517
*/
234
518
235
519
case "app.bsky.feed.vote":
···
335
619
return out, nil
336
620
}
337
621
338
-
func (ix *Indexer) crawlAtUriRef(ctx context.Context, uri string) error {
339
-
puri, err := util.ParseAtUri(uri)
340
-
if err != nil {
341
-
return err
342
-
} else {
343
-
_, err := ix.GetUserOrMissing(ctx, puri.Did)
344
-
if err != nil {
345
-
return err
346
-
}
347
-
}
348
-
return nil
349
-
}
350
-
func (ix *Indexer) crawlRecordReferences(ctx context.Context, op *repomgr.RepoOp) error {
351
-
ctx, span := otel.Tracer("indexer").Start(ctx, "crawlRecordReferences")
352
-
defer span.End()
353
-
354
-
switch rec := op.Record.(type) {
355
-
case *bsky.FeedPost:
356
-
for _, e := range rec.Entities {
357
-
if e.Type == "mention" {
358
-
_, err := ix.GetUserOrMissing(ctx, e.Value)
359
-
if err != nil {
360
-
log.Infow("failed to parse user mention", "ref", e.Value, "err", err)
361
-
}
362
-
}
363
-
}
364
-
365
-
if rec.Reply != nil {
366
-
if rec.Reply.Parent != nil {
367
-
if err := ix.crawlAtUriRef(ctx, rec.Reply.Parent.Uri); err != nil {
368
-
log.Infow("failed to crawl reply parent", "cid", op.RecCid, "replyuri", rec.Reply.Parent.Uri, "err", err)
369
-
}
370
-
}
371
-
372
-
if rec.Reply.Root != nil {
373
-
if err := ix.crawlAtUriRef(ctx, rec.Reply.Root.Uri); err != nil {
374
-
log.Infow("failed to crawl reply root", "cid", op.RecCid, "rooturi", rec.Reply.Root.Uri, "err", err)
375
-
}
376
-
}
377
-
}
378
-
379
-
return nil
380
-
case *bsky.FeedRepost:
381
-
if rec.Subject != nil {
382
-
if err := ix.crawlAtUriRef(ctx, rec.Subject.Uri); err != nil {
383
-
log.Infow("failed to crawl repost subject", "cid", op.RecCid, "subjecturi", rec.Subject.Uri, "err", err)
384
-
}
385
-
}
386
-
return nil
387
-
case *bsky.FeedLike:
388
-
if rec.Subject != nil {
389
-
if err := ix.crawlAtUriRef(ctx, rec.Subject.Uri); err != nil {
390
-
log.Infow("failed to crawl vote subject", "cid", op.RecCid, "subjecturi", rec.Subject.Uri, "err", err)
391
-
}
392
-
}
393
-
return nil
394
-
case *bsky.GraphFollow:
395
-
_, err := ix.GetUserOrMissing(ctx, rec.Subject)
396
-
if err != nil {
397
-
log.Infow("failed to crawl follow subject", "cid", op.RecCid, "subjectdid", rec.Subject, "err", err)
398
-
}
399
-
return nil
400
-
case *bsky.GraphBlock:
401
-
_, err := ix.GetUserOrMissing(ctx, rec.Subject)
402
-
if err != nil {
403
-
log.Infow("failed to crawl follow subject", "cid", op.RecCid, "subjectdid", rec.Subject, "err", err)
404
-
}
405
-
return nil
406
-
case *bsky.ActorProfile:
407
-
return nil
408
-
default:
409
-
log.Warnf("unrecognized record type: %T", op.Record)
410
-
return nil
411
-
}
412
-
}
413
-
414
622
func (ix *Indexer) handleRecordCreateFeedLike(ctx context.Context, rec *bsky.FeedLike, evt *repomgr.RepoEvent, op *repomgr.RepoOp) error {
415
623
post, err := ix.GetPostOrMissing(ctx, rec.Subject.Uri)
416
624
if err != nil {
···
658
866
return nil
659
867
}
660
868
661
-
func (ix *Indexer) GetUserOrMissing(ctx context.Context, did string) (*models.ActorInfo, error) {
662
-
ctx, span := otel.Tracer("indexer").Start(ctx, "getUserOrMissing")
663
-
defer span.End()
664
-
665
-
ai, err := ix.LookupUserByDid(ctx, did)
666
-
if err == nil {
667
-
return ai, nil
668
-
}
669
-
670
-
if !isNotFound(err) {
671
-
return nil, err
672
-
}
673
-
674
-
// unknown user... create it and send it off to the crawler
675
-
return ix.createMissingUserRecord(ctx, did)
676
-
}
677
-
678
869
func (ix *Indexer) createMissingPostRecord(ctx context.Context, puri *util.ParsedUri) (*models.FeedPost, error) {
679
870
log.Warn("creating missing post record")
680
871
ai, err := ix.GetUserOrMissing(ctx, puri.Did)
···
694
885
return &fp, nil
695
886
}
696
887
697
-
func (ix *Indexer) createMissingUserRecord(ctx context.Context, did string) (*models.ActorInfo, error) {
698
-
ctx, span := otel.Tracer("indexer").Start(ctx, "createMissingUserRecord")
699
-
defer span.End()
700
-
701
-
ai, err := ix.CreateExternalUser(ctx, did)
702
-
if err != nil {
703
-
return nil, err
704
-
}
705
-
706
-
if err := ix.addUserToCrawler(ctx, ai); err != nil {
707
-
return nil, fmt.Errorf("failed to add unknown user to crawler: %w", err)
708
-
}
709
-
710
-
return ai, nil
711
-
}
712
-
713
-
func (ix *Indexer) addUserToCrawler(ctx context.Context, ai *models.ActorInfo) error {
714
-
log.Infow("Sending user to crawler: ", "did", ai.Did)
715
-
if ix.Crawler == nil {
716
-
return nil
717
-
}
718
-
719
-
return ix.Crawler.Crawl(ctx, ai)
720
-
}
721
-
722
-
func (ix *Indexer) DidForUser(ctx context.Context, uid models.Uid) (string, error) {
723
-
var ai models.ActorInfo
724
-
if err := ix.db.First(&ai, "uid = ?", uid).Error; err != nil {
725
-
return "", err
726
-
}
727
-
728
-
return ai.Did, nil
729
-
}
730
-
731
-
func (ix *Indexer) LookupUser(ctx context.Context, id models.Uid) (*models.ActorInfo, error) {
732
-
var ai models.ActorInfo
733
-
if err := ix.db.First(&ai, "uid = ?", id).Error; err != nil {
734
-
return nil, err
735
-
}
736
-
737
-
return &ai, nil
738
-
}
739
-
740
-
func (ix *Indexer) LookupUserByDid(ctx context.Context, did string) (*models.ActorInfo, error) {
741
-
var ai models.ActorInfo
742
-
if err := ix.db.Find(&ai, "did = ?", did).Error; err != nil {
743
-
return nil, err
744
-
}
745
-
746
-
if ai.ID == 0 {
747
-
return nil, gorm.ErrRecordNotFound
748
-
}
749
-
750
-
return &ai, nil
751
-
}
752
-
753
-
func (ix *Indexer) LookupUserByHandle(ctx context.Context, handle string) (*models.ActorInfo, error) {
754
-
var ai models.ActorInfo
755
-
if err := ix.db.Find(&ai, "handle = ?", handle).Error; err != nil {
756
-
return nil, err
757
-
}
758
-
759
-
if ai.ID == 0 {
760
-
return nil, gorm.ErrRecordNotFound
761
-
}
762
-
763
-
return &ai, nil
764
-
}
765
-
766
888
func (ix *Indexer) addNewPostNotification(ctx context.Context, post *bsky.FeedPost, fp *models.FeedPost, mentions []*models.ActorInfo) error {
767
889
if post.Reply != nil {
768
890
replyto, err := ix.GetPost(ctx, post.Reply.Parent.Uri)
···
788
910
func (ix *Indexer) addNewVoteNotification(ctx context.Context, postauthor models.Uid, vr *models.VoteRecord) error {
789
911
return ix.notifman.AddUpVote(ctx, vr.Voter, vr.Post, vr.ID, postauthor)
790
912
}
791
-
792
-
func (ix *Indexer) handleInitActor(ctx context.Context, evt *repomgr.RepoEvent, op *repomgr.RepoOp) error {
793
-
ai := op.ActorInfo
794
-
795
-
if err := ix.db.Clauses(clause.OnConflict{
796
-
Columns: []clause.Column{{Name: "uid"}},
797
-
UpdateAll: true,
798
-
}).Create(&models.ActorInfo{
799
-
Uid: evt.User,
800
-
Handle: ai.Handle,
801
-
Did: ai.Did,
802
-
DisplayName: ai.DisplayName,
803
-
Type: ai.Type,
804
-
PDS: evt.PDS,
805
-
}).Error; err != nil {
806
-
return fmt.Errorf("initializing new actor info: %w", err)
807
-
}
808
-
809
-
if err := ix.db.Create(&models.FollowRecord{
810
-
Follower: evt.User,
811
-
Target: evt.User,
812
-
}).Error; err != nil {
813
-
return err
814
-
}
815
-
816
-
return nil
817
-
}
818
-
819
-
func isNotFound(err error) bool {
820
-
if errors.Is(err, gorm.ErrRecordNotFound) {
821
-
return true
822
-
}
823
-
824
-
return false
825
-
}
826
-
827
-
func (ix *Indexer) GetPost(ctx context.Context, uri string) (*models.FeedPost, error) {
828
-
puri, err := util.ParseAtUri(uri)
829
-
if err != nil {
830
-
return nil, err
831
-
}
832
-
833
-
var post models.FeedPost
834
-
if err := ix.db.First(&post, "rkey = ? AND author = (?)", puri.Rkey, ix.db.Model(models.ActorInfo{}).Where("did = ?", puri.Did).Select("id")).Error; err != nil {
835
-
return nil, err
836
-
}
837
-
838
-
return &post, nil
839
-
}
840
-
841
-
// TODO: since this function is the only place we depend on the repomanager, i wonder if this should be wired some other way?
842
-
func (ix *Indexer) FetchAndIndexRepo(ctx context.Context, job *crawlWork) error {
843
-
ctx, span := otel.Tracer("indexer").Start(ctx, "FetchAndIndexRepo")
844
-
defer span.End()
845
-
846
-
span.SetAttributes(attribute.Int("catchup", len(job.catchup)))
847
-
848
-
ai := job.act
849
-
850
-
var pds models.PDS
851
-
if err := ix.db.First(&pds, "id = ?", ai.PDS).Error; err != nil {
852
-
return fmt.Errorf("expected to find pds record (%d) in db for crawling one of their users: %w", ai.PDS, err)
853
-
}
854
-
855
-
curHead, err := ix.repomgr.GetRepoRoot(ctx, ai.Uid)
856
-
if err != nil && !isNotFound(err) {
857
-
return fmt.Errorf("failed to get repo root: %w", err)
858
-
}
859
-
860
-
var rebase *comatproto.SyncSubscribeRepos_Commit
861
-
var rebaseIx int
862
-
for i, j := range job.catchup {
863
-
if j.evt.Rebase {
864
-
rebase = j.evt
865
-
rebaseIx = i
866
-
break
867
-
}
868
-
}
869
-
870
-
if rebase != nil {
871
-
if err := ix.repomgr.HandleRebase(ctx, ai.PDS, ai.Uid, ai.Did, (*cid.Cid)(rebase.Prev), (cid.Cid)(rebase.Commit), rebase.Blocks); err != nil {
872
-
return fmt.Errorf("handling rebase: %w", err)
873
-
}
874
-
// now process the rest of the catchup events
875
-
// these are all events that got received *after* the rebase, but
876
-
// before we could start processing it.
877
-
// That means these should be the next operations that get cleanly
878
-
// applied after the rebase
879
-
for _, j := range job.catchup[rebaseIx+1:] {
880
-
if err := ix.repomgr.HandleExternalUserEvent(ctx, pds.ID, ai.Uid, ai.Did, (*cid.Cid)(j.evt.Prev), j.evt.Blocks, j.evt.Ops); err != nil {
881
-
return fmt.Errorf("post rebase catchup failed: %w", err)
882
-
}
883
-
}
884
-
return nil
885
-
}
886
-
887
-
if !(job.initScrape || len(job.catchup) == 0) {
888
-
first := job.catchup[0]
889
-
if first.evt.Prev == nil || curHead == (cid.Cid)(*first.evt.Prev) {
890
-
for _, j := range job.catchup {
891
-
if err := ix.repomgr.HandleExternalUserEvent(ctx, pds.ID, ai.Uid, ai.Did, (*cid.Cid)(j.evt.Prev), j.evt.Blocks, j.evt.Ops); err != nil {
892
-
// TODO: if we fail here, we should probably fall back to a repo re-sync
893
-
return fmt.Errorf("post rebase catchup failed: %w", err)
894
-
}
895
-
}
896
-
897
-
return nil
898
-
}
899
-
}
900
-
901
-
var host string
902
-
if pds.SSL {
903
-
host = "https://" + pds.Host
904
-
} else {
905
-
host = "http://" + pds.Host
906
-
}
907
-
c := &xrpc.Client{
908
-
Host: host,
909
-
}
910
-
911
-
ix.ApplyPDSClientSettings(c)
912
-
913
-
var from string
914
-
if curHead.Defined() {
915
-
from = curHead.String()
916
-
} else {
917
-
span.SetAttributes(attribute.Bool("full", true))
918
-
}
919
-
920
-
limiter := ix.GetLimiter(pds.ID)
921
-
if limiter == nil {
922
-
limiter = rate.NewLimiter(rate.Limit(pds.CrawlRateLimit), 1)
923
-
ix.SetLimiter(pds.ID, limiter)
924
-
}
925
-
926
-
// Wait to prevent DOSing the PDS when connecting to a new stream with lots of active repos
927
-
limiter.Wait(ctx)
928
-
929
-
log.Infow("SyncGetRepo", "did", ai.Did, "user", ai.Handle, "from", from)
930
-
// TODO: max size on these? A malicious PDS could just send us a petabyte sized repo here and kill us
931
-
repo, err := comatproto.SyncGetRepo(ctx, c, ai.Did, from, "")
932
-
if err != nil {
933
-
return fmt.Errorf("failed to fetch repo: %w", err)
934
-
}
935
-
936
-
// this process will send individual indexing events back to the indexer, doing a 'fast forward' of the users entire history
937
-
// we probably want alternative ways of doing this for 'very large' or 'very old' repos, but this works for now
938
-
if err := ix.repomgr.ImportNewRepo(ctx, ai.Uid, ai.Did, bytes.NewReader(repo), curHead); err != nil {
939
-
span.RecordError(err)
940
-
return fmt.Errorf("importing fetched repo (curHead: %s): %w", from, err)
941
-
}
942
-
943
-
// TODO: this is currently doing too much work, allowing us to ignore the catchup events we've gotten
944
-
// need to do 'just enough' work...
945
-
946
-
return nil
947
-
}
+1
-1
lex/gen.go
+1
-1
lex/gen.go
···
1211
1211
omit = ",omitempty"
1212
1212
}
1213
1213
cval := ts.id
1214
-
if ts.defName != "" {
1214
+
if ts.defName != "" && ts.defName != "main" {
1215
1215
cval += "#" + ts.defName
1216
1216
}
1217
1217
pf("\tLexiconTypeID string `json:\"$type,const=%s%s\" cborgen:\"$type,const=%s%s\"`\n", cval, omit, cval, omit)
+20
-33
pds/handlers.go
+20
-33
pds/handlers.go
···
565
565
panic("not yet implemented")
566
566
}
567
567
568
-
func (s *Server) handleComAtprotoSyncGetCheckout(ctx context.Context, commit string, did string) (io.Reader, error) {
568
+
func (s *Server) handleComAtprotoSyncGetCheckout(ctx context.Context, did string) (io.Reader, error) {
569
569
panic("not yet implemented")
570
570
}
571
571
···
593
593
panic("not yet implemented")
594
594
}
595
595
596
-
func (s *Server) handleComAtprotoSyncGetRepo(ctx context.Context, did string, earliest, latest string) (io.Reader, error) {
597
-
var earlyCid cid.Cid
598
-
if earliest != "" {
599
-
cc, err := cid.Decode(earliest)
600
-
if err != nil {
601
-
return nil, err
602
-
}
603
-
604
-
earlyCid = cc
605
-
}
606
-
607
-
var lateCid cid.Cid
608
-
if latest != "" {
609
-
cc, err := cid.Decode(latest)
610
-
if err != nil {
611
-
return nil, err
612
-
}
613
-
614
-
lateCid = cc
615
-
}
616
-
596
+
func (s *Server) handleComAtprotoSyncGetRepo(ctx context.Context, did string, since string) (io.Reader, error) {
617
597
targetUser, err := s.lookupUser(ctx, did)
618
598
if err != nil {
619
599
return nil, err
620
600
}
621
601
622
602
buf := new(bytes.Buffer)
623
-
if err := s.repoman.ReadRepo(ctx, targetUser.ID, earlyCid, lateCid, buf); err != nil {
603
+
if err := s.repoman.ReadRepo(ctx, targetUser.ID, since, buf); err != nil {
624
604
return nil, err
625
605
}
626
606
···
681
661
panic("nyi")
682
662
}
683
663
684
-
func (s *Server) handleComAtprotoSyncListBlobs(ctx context.Context, did string, earliest string, latest string) (*comatprototypes.SyncListBlobs_Output, error) {
664
+
func (s *Server) handleComAtprotoSyncListBlobs(ctx context.Context, cursor string, did string, limit int, since string) (*comatprototypes.SyncListBlobs_Output, error) {
685
665
panic("nyi")
686
666
}
687
667
···
798
778
panic("nyi")
799
779
}
800
780
801
-
func (s *Server) handleComAtprotoRepoRebaseRepo(ctx context.Context, body *comatprototypes.RepoRebaseRepo_Input) error {
802
-
u, err := s.getUser(ctx)
803
-
if err != nil {
804
-
return err
805
-
}
806
-
807
-
return s.repoman.DoRebase(ctx, u.ID)
808
-
}
809
-
810
781
func (s *Server) handleAppBskyFeedDescribeFeedGenerator(ctx context.Context) (*appbskytypes.FeedDescribeFeedGenerator_Output, error) {
811
782
panic("nyi")
812
783
}
···
843
814
func (s *Server) handleComAtprotoAdminSendEmail(ctx context.Context, body *comatprototypes.AdminSendEmail_Input) (*comatprototypes.AdminSendEmail_Output, error) {
844
815
panic("nyi")
845
816
}
817
+
818
+
func (s *Server) handleAppBskyFeedGetActorLikes(ctx context.Context, actor string, cursor string, limit int) (*appbskytypes.FeedGetActorLikes_Output, error) {
819
+
panic("nyi")
820
+
}
821
+
822
+
func (s *Server) handleAppBskyNotificationRegisterPush(ctx context.Context, body *appbskytypes.NotificationRegisterPush_Input) error {
823
+
panic("nyi")
824
+
}
825
+
826
+
func (s *Server) handleComAtprotoSyncGetLatestCommit(ctx context.Context, did string) (*comatprototypes.SyncGetLatestCommit_Output, error) {
827
+
panic("nyi")
828
+
}
829
+
830
+
func (s *Server) handleComAtprotoTempUpgradeRepoVersion(ctx context.Context, body *comatprototypes.TempUpgradeRepoVersion_Input) error {
831
+
panic("nyi")
832
+
}
+2
-2
pds/server.go
+2
-2
pds/server.go
···
142
142
u.ID = subj.Uid
143
143
}
144
144
145
-
return s.repoman.HandleExternalUserEvent(ctx, host.ID, u.ID, u.Did, (*cid.Cid)(evt.Prev), evt.Blocks, evt.Ops)
145
+
return s.repoman.HandleExternalUserEvent(ctx, host.ID, u.ID, u.Did, evt.Since, evt.Rev, evt.Blocks, evt.Ops)
146
146
default:
147
147
return fmt.Errorf("invalid fed event")
148
148
}
···
338
338
}
339
339
340
340
e.HTTPErrorHandler = func(err error, ctx echo.Context) {
341
-
fmt.Printf("HANDLER ERROR: (%s) %s\n", ctx.Path(), err)
341
+
fmt.Printf("PDS HANDLER ERROR: (%s) %s\n", ctx.Path(), err)
342
342
343
343
// TODO: need to properly figure out where http error codes for error
344
344
// types get decided. This spot is reasonable, but maybe a bit weird.
+94
-60
pds/stubs.go
+94
-60
pds/stubs.go
···
20
20
e.GET("/xrpc/app.bsky.actor.searchActorsTypeahead", s.HandleAppBskyActorSearchActorsTypeahead)
21
21
e.GET("/xrpc/app.bsky.feed.describeFeedGenerator", s.HandleAppBskyFeedDescribeFeedGenerator)
22
22
e.GET("/xrpc/app.bsky.feed.getActorFeeds", s.HandleAppBskyFeedGetActorFeeds)
23
+
e.GET("/xrpc/app.bsky.feed.getActorLikes", s.HandleAppBskyFeedGetActorLikes)
23
24
e.GET("/xrpc/app.bsky.feed.getAuthorFeed", s.HandleAppBskyFeedGetAuthorFeed)
24
25
e.GET("/xrpc/app.bsky.feed.getFeed", s.HandleAppBskyFeedGetFeed)
25
26
e.GET("/xrpc/app.bsky.feed.getFeedGenerator", s.HandleAppBskyFeedGetFeedGenerator)
···
43
44
e.POST("/xrpc/app.bsky.graph.unmuteActorList", s.HandleAppBskyGraphUnmuteActorList)
44
45
e.GET("/xrpc/app.bsky.notification.getUnreadCount", s.HandleAppBskyNotificationGetUnreadCount)
45
46
e.GET("/xrpc/app.bsky.notification.listNotifications", s.HandleAppBskyNotificationListNotifications)
47
+
e.POST("/xrpc/app.bsky.notification.registerPush", s.HandleAppBskyNotificationRegisterPush)
46
48
e.POST("/xrpc/app.bsky.notification.updateSeen", s.HandleAppBskyNotificationUpdateSeen)
47
49
e.POST("/xrpc/app.bsky.unspecced.applyLabels", s.HandleAppBskyUnspeccedApplyLabels)
48
50
e.GET("/xrpc/app.bsky.unspecced.getPopular", s.HandleAppBskyUnspeccedGetPopular)
···
219
221
var handleErr error
220
222
// func (s *Server) handleAppBskyFeedGetActorFeeds(ctx context.Context,actor string,cursor string,limit int) (*appbskytypes.FeedGetActorFeeds_Output, error)
221
223
out, handleErr = s.handleAppBskyFeedGetActorFeeds(ctx, actor, cursor, limit)
224
+
if handleErr != nil {
225
+
return handleErr
226
+
}
227
+
return c.JSON(200, out)
228
+
}
229
+
230
+
func (s *Server) HandleAppBskyFeedGetActorLikes(c echo.Context) error {
231
+
ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleAppBskyFeedGetActorLikes")
232
+
defer span.End()
233
+
actor := c.QueryParam("actor")
234
+
cursor := c.QueryParam("cursor")
235
+
236
+
var limit int
237
+
if p := c.QueryParam("limit"); p != "" {
238
+
var err error
239
+
limit, err = strconv.Atoi(p)
240
+
if err != nil {
241
+
return err
242
+
}
243
+
} else {
244
+
limit = 50
245
+
}
246
+
var out *appbskytypes.FeedGetActorLikes_Output
247
+
var handleErr error
248
+
// func (s *Server) handleAppBskyFeedGetActorLikes(ctx context.Context,actor string,cursor string,limit int) (*appbskytypes.FeedGetActorLikes_Output, error)
249
+
out, handleErr = s.handleAppBskyFeedGetActorLikes(ctx, actor, cursor, limit)
222
250
if handleErr != nil {
223
251
return handleErr
224
252
}
···
751
779
return c.JSON(200, out)
752
780
}
753
781
782
+
func (s *Server) HandleAppBskyNotificationRegisterPush(c echo.Context) error {
783
+
ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleAppBskyNotificationRegisterPush")
784
+
defer span.End()
785
+
786
+
var body appbskytypes.NotificationRegisterPush_Input
787
+
if err := c.Bind(&body); err != nil {
788
+
return err
789
+
}
790
+
var handleErr error
791
+
// func (s *Server) handleAppBskyNotificationRegisterPush(ctx context.Context,body *appbskytypes.NotificationRegisterPush_Input) error
792
+
handleErr = s.handleAppBskyNotificationRegisterPush(ctx, &body)
793
+
if handleErr != nil {
794
+
return handleErr
795
+
}
796
+
return nil
797
+
}
798
+
754
799
func (s *Server) HandleAppBskyNotificationUpdateSeen(c echo.Context) error {
755
800
ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleAppBskyNotificationUpdateSeen")
756
801
defer span.End()
···
883
928
e.GET("/xrpc/com.atproto.admin.getModerationReports", s.HandleComAtprotoAdminGetModerationReports)
884
929
e.GET("/xrpc/com.atproto.admin.getRecord", s.HandleComAtprotoAdminGetRecord)
885
930
e.GET("/xrpc/com.atproto.admin.getRepo", s.HandleComAtprotoAdminGetRepo)
886
-
e.POST("/xrpc/com.atproto.admin.rebaseRepo", s.HandleComAtprotoAdminRebaseRepo)
887
931
e.POST("/xrpc/com.atproto.admin.resolveModerationReports", s.HandleComAtprotoAdminResolveModerationReports)
888
932
e.POST("/xrpc/com.atproto.admin.reverseModerationAction", s.HandleComAtprotoAdminReverseModerationAction)
889
933
e.GET("/xrpc/com.atproto.admin.searchRepos", s.HandleComAtprotoAdminSearchRepos)
···
902
946
e.GET("/xrpc/com.atproto.repo.getRecord", s.HandleComAtprotoRepoGetRecord)
903
947
e.GET("/xrpc/com.atproto.repo.listRecords", s.HandleComAtprotoRepoListRecords)
904
948
e.POST("/xrpc/com.atproto.repo.putRecord", s.HandleComAtprotoRepoPutRecord)
905
-
e.POST("/xrpc/com.atproto.repo.rebaseRepo", s.HandleComAtprotoRepoRebaseRepo)
906
949
e.POST("/xrpc/com.atproto.repo.uploadBlob", s.HandleComAtprotoRepoUploadBlob)
907
950
e.POST("/xrpc/com.atproto.server.createAccount", s.HandleComAtprotoServerCreateAccount)
908
951
e.POST("/xrpc/com.atproto.server.createAppPassword", s.HandleComAtprotoServerCreateAppPassword)
···
923
966
e.GET("/xrpc/com.atproto.sync.getBlob", s.HandleComAtprotoSyncGetBlob)
924
967
e.GET("/xrpc/com.atproto.sync.getBlocks", s.HandleComAtprotoSyncGetBlocks)
925
968
e.GET("/xrpc/com.atproto.sync.getCheckout", s.HandleComAtprotoSyncGetCheckout)
926
-
e.GET("/xrpc/com.atproto.sync.getCommitPath", s.HandleComAtprotoSyncGetCommitPath)
927
969
e.GET("/xrpc/com.atproto.sync.getHead", s.HandleComAtprotoSyncGetHead)
970
+
e.GET("/xrpc/com.atproto.sync.getLatestCommit", s.HandleComAtprotoSyncGetLatestCommit)
928
971
e.GET("/xrpc/com.atproto.sync.getRecord", s.HandleComAtprotoSyncGetRecord)
929
972
e.GET("/xrpc/com.atproto.sync.getRepo", s.HandleComAtprotoSyncGetRepo)
930
973
e.GET("/xrpc/com.atproto.sync.listBlobs", s.HandleComAtprotoSyncListBlobs)
931
974
e.GET("/xrpc/com.atproto.sync.listRepos", s.HandleComAtprotoSyncListRepos)
932
975
e.POST("/xrpc/com.atproto.sync.notifyOfUpdate", s.HandleComAtprotoSyncNotifyOfUpdate)
933
976
e.POST("/xrpc/com.atproto.sync.requestCrawl", s.HandleComAtprotoSyncRequestCrawl)
977
+
e.POST("/xrpc/com.atproto.temp.upgradeRepoVersion", s.HandleComAtprotoTempUpgradeRepoVersion)
934
978
return nil
935
979
}
936
980
···
1152
1196
return c.JSON(200, out)
1153
1197
}
1154
1198
1155
-
func (s *Server) HandleComAtprotoAdminRebaseRepo(c echo.Context) error {
1156
-
ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoAdminRebaseRepo")
1157
-
defer span.End()
1158
-
1159
-
var body comatprototypes.AdminRebaseRepo_Input
1160
-
if err := c.Bind(&body); err != nil {
1161
-
return err
1162
-
}
1163
-
var handleErr error
1164
-
// func (s *Server) handleComAtprotoAdminRebaseRepo(ctx context.Context,body *comatprototypes.AdminRebaseRepo_Input) error
1165
-
handleErr = s.handleComAtprotoAdminRebaseRepo(ctx, &body)
1166
-
if handleErr != nil {
1167
-
return handleErr
1168
-
}
1169
-
return nil
1170
-
}
1171
-
1172
1199
func (s *Server) HandleComAtprotoAdminResolveModerationReports(c echo.Context) error {
1173
1200
ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoAdminResolveModerationReports")
1174
1201
defer span.End()
···
1519
1546
return c.JSON(200, out)
1520
1547
}
1521
1548
1522
-
func (s *Server) HandleComAtprotoRepoRebaseRepo(c echo.Context) error {
1523
-
ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoRepoRebaseRepo")
1524
-
defer span.End()
1525
-
1526
-
var body comatprototypes.RepoRebaseRepo_Input
1527
-
if err := c.Bind(&body); err != nil {
1528
-
return err
1529
-
}
1530
-
var handleErr error
1531
-
// func (s *Server) handleComAtprotoRepoRebaseRepo(ctx context.Context,body *comatprototypes.RepoRebaseRepo_Input) error
1532
-
handleErr = s.handleComAtprotoRepoRebaseRepo(ctx, &body)
1533
-
if handleErr != nil {
1534
-
return handleErr
1535
-
}
1536
-
return nil
1537
-
}
1538
-
1539
1549
func (s *Server) HandleComAtprotoRepoUploadBlob(c echo.Context) error {
1540
1550
ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoRepoUploadBlob")
1541
1551
defer span.End()
···
1854
1864
func (s *Server) HandleComAtprotoSyncGetCheckout(c echo.Context) error {
1855
1865
ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncGetCheckout")
1856
1866
defer span.End()
1857
-
commit := c.QueryParam("commit")
1858
1867
did := c.QueryParam("did")
1859
1868
var out io.Reader
1860
1869
var handleErr error
1861
-
// func (s *Server) handleComAtprotoSyncGetCheckout(ctx context.Context,commit string,did string) (io.Reader, error)
1862
-
out, handleErr = s.handleComAtprotoSyncGetCheckout(ctx, commit, did)
1870
+
// func (s *Server) handleComAtprotoSyncGetCheckout(ctx context.Context,did string) (io.Reader, error)
1871
+
out, handleErr = s.handleComAtprotoSyncGetCheckout(ctx, did)
1863
1872
if handleErr != nil {
1864
1873
return handleErr
1865
1874
}
1866
1875
return c.Stream(200, "application/vnd.ipld.car", out)
1867
1876
}
1868
1877
1869
-
func (s *Server) HandleComAtprotoSyncGetCommitPath(c echo.Context) error {
1870
-
ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncGetCommitPath")
1878
+
func (s *Server) HandleComAtprotoSyncGetHead(c echo.Context) error {
1879
+
ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncGetHead")
1871
1880
defer span.End()
1872
1881
did := c.QueryParam("did")
1873
-
earliest := c.QueryParam("earliest")
1874
-
latest := c.QueryParam("latest")
1875
-
var out *comatprototypes.SyncGetCommitPath_Output
1882
+
var out *comatprototypes.SyncGetHead_Output
1876
1883
var handleErr error
1877
-
// func (s *Server) handleComAtprotoSyncGetCommitPath(ctx context.Context,did string,earliest string,latest string) (*comatprototypes.SyncGetCommitPath_Output, error)
1878
-
out, handleErr = s.handleComAtprotoSyncGetCommitPath(ctx, did, earliest, latest)
1884
+
// func (s *Server) handleComAtprotoSyncGetHead(ctx context.Context,did string) (*comatprototypes.SyncGetHead_Output, error)
1885
+
out, handleErr = s.handleComAtprotoSyncGetHead(ctx, did)
1879
1886
if handleErr != nil {
1880
1887
return handleErr
1881
1888
}
1882
1889
return c.JSON(200, out)
1883
1890
}
1884
1891
1885
-
func (s *Server) HandleComAtprotoSyncGetHead(c echo.Context) error {
1886
-
ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncGetHead")
1892
+
func (s *Server) HandleComAtprotoSyncGetLatestCommit(c echo.Context) error {
1893
+
ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncGetLatestCommit")
1887
1894
defer span.End()
1888
1895
did := c.QueryParam("did")
1889
-
var out *comatprototypes.SyncGetHead_Output
1896
+
var out *comatprototypes.SyncGetLatestCommit_Output
1890
1897
var handleErr error
1891
-
// func (s *Server) handleComAtprotoSyncGetHead(ctx context.Context,did string) (*comatprototypes.SyncGetHead_Output, error)
1892
-
out, handleErr = s.handleComAtprotoSyncGetHead(ctx, did)
1898
+
// func (s *Server) handleComAtprotoSyncGetLatestCommit(ctx context.Context,did string) (*comatprototypes.SyncGetLatestCommit_Output, error)
1899
+
out, handleErr = s.handleComAtprotoSyncGetLatestCommit(ctx, did)
1893
1900
if handleErr != nil {
1894
1901
return handleErr
1895
1902
}
···
1917
1924
ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncGetRepo")
1918
1925
defer span.End()
1919
1926
did := c.QueryParam("did")
1920
-
earliest := c.QueryParam("earliest")
1921
-
latest := c.QueryParam("latest")
1927
+
since := c.QueryParam("since")
1922
1928
var out io.Reader
1923
1929
var handleErr error
1924
-
// func (s *Server) handleComAtprotoSyncGetRepo(ctx context.Context,did string,earliest string,latest string) (io.Reader, error)
1925
-
out, handleErr = s.handleComAtprotoSyncGetRepo(ctx, did, earliest, latest)
1930
+
// func (s *Server) handleComAtprotoSyncGetRepo(ctx context.Context,did string,since string) (io.Reader, error)
1931
+
out, handleErr = s.handleComAtprotoSyncGetRepo(ctx, did, since)
1926
1932
if handleErr != nil {
1927
1933
return handleErr
1928
1934
}
···
1932
1938
func (s *Server) HandleComAtprotoSyncListBlobs(c echo.Context) error {
1933
1939
ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncListBlobs")
1934
1940
defer span.End()
1941
+
cursor := c.QueryParam("cursor")
1935
1942
did := c.QueryParam("did")
1936
-
earliest := c.QueryParam("earliest")
1937
-
latest := c.QueryParam("latest")
1943
+
1944
+
var limit int
1945
+
if p := c.QueryParam("limit"); p != "" {
1946
+
var err error
1947
+
limit, err = strconv.Atoi(p)
1948
+
if err != nil {
1949
+
return err
1950
+
}
1951
+
} else {
1952
+
limit = 500
1953
+
}
1954
+
since := c.QueryParam("since")
1938
1955
var out *comatprototypes.SyncListBlobs_Output
1939
1956
var handleErr error
1940
-
// func (s *Server) handleComAtprotoSyncListBlobs(ctx context.Context,did string,earliest string,latest string) (*comatprototypes.SyncListBlobs_Output, error)
1941
-
out, handleErr = s.handleComAtprotoSyncListBlobs(ctx, did, earliest, latest)
1957
+
// func (s *Server) handleComAtprotoSyncListBlobs(ctx context.Context,cursor string,did string,limit int,since string) (*comatprototypes.SyncListBlobs_Output, error)
1958
+
out, handleErr = s.handleComAtprotoSyncListBlobs(ctx, cursor, did, limit, since)
1942
1959
if handleErr != nil {
1943
1960
return handleErr
1944
1961
}
···
2003
2020
}
2004
2021
return nil
2005
2022
}
2023
+
2024
+
func (s *Server) HandleComAtprotoTempUpgradeRepoVersion(c echo.Context) error {
2025
+
ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoTempUpgradeRepoVersion")
2026
+
defer span.End()
2027
+
2028
+
var body comatprototypes.TempUpgradeRepoVersion_Input
2029
+
if err := c.Bind(&body); err != nil {
2030
+
return err
2031
+
}
2032
+
var handleErr error
2033
+
// func (s *Server) handleComAtprotoTempUpgradeRepoVersion(ctx context.Context,body *comatprototypes.TempUpgradeRepoVersion_Input) error
2034
+
handleErr = s.handleComAtprotoTempUpgradeRepoVersion(ctx, &body)
2035
+
if handleErr != nil {
2036
+
return handleErr
2037
+
}
2038
+
return nil
2039
+
}
+3
plc/fakedid.go
+3
plc/fakedid.go
···
4
4
"context"
5
5
"crypto/rand"
6
6
"encoding/hex"
7
+
"fmt"
7
8
8
9
"github.com/whyrusleeping/go-did"
9
10
"gorm.io/gorm"
···
32
33
if err := fd.db.First(&rec, "did = ?", udid).Error; err != nil {
33
34
return nil, err
34
35
}
36
+
37
+
fmt.Println("GET DOCUMENT: ", udid, rec.Handle, rec.Service)
35
38
36
39
d, err := did.ParseDID(rec.Did)
37
40
if err != nil {
+4
-4
repo/cbor_gen.go
+4
-4
repo/cbor_gen.go
···
68
68
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("rev"))); err != nil {
69
69
return err
70
70
}
71
-
if _, err := io.WriteString(w, string("rev")); err != nil {
71
+
if _, err := cw.WriteString(string("rev")); err != nil {
72
72
return err
73
73
}
74
74
···
79
79
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Rev))); err != nil {
80
80
return err
81
81
}
82
-
if _, err := io.WriteString(w, string(t.Rev)); err != nil {
82
+
if _, err := cw.WriteString(string(t.Rev)); err != nil {
83
83
return err
84
84
}
85
85
}
···
373
373
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("rev"))); err != nil {
374
374
return err
375
375
}
376
-
if _, err := io.WriteString(w, string("rev")); err != nil {
376
+
if _, err := cw.WriteString(string("rev")); err != nil {
377
377
return err
378
378
}
379
379
···
384
384
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Rev))); err != nil {
385
385
return err
386
386
}
387
-
if _, err := io.WriteString(w, string(t.Rev)); err != nil {
387
+
if _, err := cw.WriteString(string(t.Rev)); err != nil {
388
388
return err
389
389
}
390
390
}
+7
-7
repo/repo.go
+7
-7
repo/repo.go
···
253
253
}
254
254
255
255
// creates and writes a new SignedCommit for this repo, with `prev` pointing to old value
256
-
func (r *Repo) Commit(ctx context.Context, signer func(context.Context, string, []byte) ([]byte, error)) (cid.Cid, error) {
256
+
func (r *Repo) Commit(ctx context.Context, signer func(context.Context, string, []byte) ([]byte, error)) (cid.Cid, string, error) {
257
257
ctx, span := otel.Tracer("repo").Start(ctx, "Commit")
258
258
defer span.End()
259
259
260
260
t, err := r.getMst(ctx)
261
261
if err != nil {
262
-
return cid.Undef, err
262
+
return cid.Undef, "", err
263
263
}
264
264
265
265
rcid, err := t.GetPointer(ctx)
266
266
if err != nil {
267
-
return cid.Undef, err
267
+
return cid.Undef, "", err
268
268
}
269
269
270
270
ncom := UnsignedCommit{
···
276
276
277
277
sb, err := ncom.BytesForSigning()
278
278
if err != nil {
279
-
return cid.Undef, fmt.Errorf("failed to serialize commit: %w", err)
279
+
return cid.Undef, "", fmt.Errorf("failed to serialize commit: %w", err)
280
280
}
281
281
sig, err := signer(ctx, ncom.Did, sb)
282
282
if err != nil {
283
-
return cid.Undef, fmt.Errorf("failed to sign root: %w", err)
283
+
return cid.Undef, "", fmt.Errorf("failed to sign root: %w", err)
284
284
}
285
285
286
286
nsc := SignedCommit{
···
294
294
295
295
nsccid, err := r.cst.Put(ctx, &nsc)
296
296
if err != nil {
297
-
return cid.Undef, err
297
+
return cid.Undef, "", err
298
298
}
299
299
300
300
r.sc = nsc
301
301
r.dirty = false
302
302
303
-
return nsccid, nil
303
+
return nsccid, nsc.Rev, nil
304
304
}
305
305
306
306
func (r *Repo) getMst(ctx context.Context) (*mst.MerkleSearchTree, error) {
+13
-63
repomgr/ingest_test.go
+13
-63
repomgr/ingest_test.go
···
64
64
defer fi.Close()
65
65
66
66
ctx := context.TODO()
67
-
if err := repoman.ImportNewRepo(ctx, 2, "", fi, cid.Undef); err != nil {
67
+
if err := repoman.ImportNewRepo(ctx, 2, "", fi, nil); err != nil {
68
68
t.Fatal(err)
69
69
}
70
70
}
···
116
116
}
117
117
cs2 := testCarstore(t, dir2)
118
118
119
+
var since *string
119
120
ctx := context.TODO()
120
-
var prev *cid.Cid
121
121
for i := 0; i < 5; i++ {
122
-
slice, head, tid := doPost(t, cs2, did, prev, i)
122
+
slice, _, nrev, tid := doPost(t, cs2, did, since, i)
123
123
124
124
ops := []*atproto.SyncSubscribeRepos_RepoOp{
125
125
{
···
128
128
},
129
129
}
130
130
131
-
if err := repoman.HandleExternalUserEvent(ctx, 1, 1, did, prev, slice, ops); err != nil {
131
+
if err := repoman.HandleExternalUserEvent(ctx, 1, 1, did, since, nrev, slice, ops); err != nil {
132
132
t.Fatal(err)
133
133
}
134
134
135
-
prev = &head
135
+
since = &nrev
136
136
}
137
137
138
-
latest := *prev
139
-
140
138
// now do a few outside of the standard event stream flow
141
139
for i := 0; i < 5; i++ {
142
-
_, head, _ := doPost(t, cs2, did, prev, i)
143
-
prev = &head
140
+
_, _, nrev, _ := doPost(t, cs2, did, since, i)
141
+
since = &nrev
144
142
}
145
143
146
144
buf := new(bytes.Buffer)
147
-
if err := cs2.ReadUserCar(ctx, 1, latest, *prev, true, buf); err != nil {
145
+
if err := cs2.ReadUserCar(ctx, 1, "", true, buf); err != nil {
148
146
t.Fatal(err)
149
147
}
150
148
151
-
if err := repoman.ImportNewRepo(ctx, 1, did, buf, latest); err != nil {
149
+
if err := repoman.ImportNewRepo(ctx, 1, did, buf, nil); err != nil {
152
150
t.Fatal(err)
153
151
}
154
152
}
155
153
156
-
func doPost(t *testing.T, cs *carstore.CarStore, did string, prev *cid.Cid, postid int) ([]byte, cid.Cid, string) {
154
+
func doPost(t *testing.T, cs *carstore.CarStore, did string, prev *string, postid int) ([]byte, cid.Cid, string, string) {
157
155
ctx := context.TODO()
158
156
ds, err := cs.NewDeltaSession(ctx, 1, prev)
159
157
if err != nil {
···
169
167
t.Fatal(err)
170
168
}
171
169
172
-
root, err := r.Commit(ctx, func(context.Context, string, []byte) ([]byte, error) { return nil, nil })
173
-
if err != nil {
174
-
t.Fatal(err)
175
-
}
176
-
177
-
slice, err := ds.CloseWithRoot(ctx, root)
178
-
if err != nil {
179
-
t.Fatal(err)
180
-
}
181
-
182
-
return slice, root, tid
183
-
}
184
-
185
-
func TestRebase(t *testing.T) {
186
-
dir, err := os.MkdirTemp("", "integtest")
170
+
root, nrev, err := r.Commit(ctx, func(context.Context, string, []byte) ([]byte, error) { return nil, nil })
187
171
if err != nil {
188
172
t.Fatal(err)
189
173
}
190
174
191
-
maindb, err := gorm.Open(sqlite.Open(filepath.Join(dir, "test.sqlite")))
175
+
slice, err := ds.CloseWithRoot(ctx, root, nrev)
192
176
if err != nil {
193
177
t.Fatal(err)
194
178
}
195
-
maindb.AutoMigrate(models.ActorInfo{})
196
179
197
-
did := "did:plc:beepboop"
198
-
maindb.Create(&models.ActorInfo{
199
-
Did: did,
200
-
Uid: 1,
201
-
})
202
-
203
-
cs := testCarstore(t, dir)
204
-
205
-
repoman := NewRepoManager(cs, &util.FakeKeyManager{})
206
-
207
-
ctx := context.TODO()
208
-
if err := repoman.InitNewActor(ctx, 1, "hello.world", "did:plc:foobar", "", "", ""); err != nil {
209
-
t.Fatal(err)
210
-
}
211
-
212
-
for i := 0; i < 5; i++ {
213
-
_, _, err := repoman.CreateRecord(ctx, 1, "app.bsky.feed.post", &bsky.FeedPost{
214
-
Text: fmt.Sprintf("hello friend %d", i),
215
-
})
216
-
if err != nil {
217
-
t.Fatal(err)
218
-
}
219
-
}
220
-
221
-
if err := repoman.DoRebase(ctx, 1); err != nil {
222
-
t.Fatal(err)
223
-
}
224
-
225
-
_, _, err = repoman.CreateRecord(ctx, 1, "app.bsky.feed.post", &bsky.FeedPost{
226
-
Text: "after the rebase",
227
-
})
228
-
if err != nil {
229
-
t.Fatal(err)
230
-
}
180
+
return slice, root, nrev, tid
231
181
}
232
182
233
183
func TestDuplicateRecord(t *testing.T) {
+95
-271
repomgr/repomgr.go
+95
-271
repomgr/repomgr.go
···
16
16
"github.com/bluesky-social/indigo/models"
17
17
"github.com/bluesky-social/indigo/mst"
18
18
"github.com/bluesky-social/indigo/repo"
19
-
"github.com/bluesky-social/indigo/util"
20
19
21
20
"github.com/ipfs/go-cid"
22
21
"github.com/ipfs/go-datastore"
···
70
69
User models.Uid
71
70
OldRoot *cid.Cid
72
71
NewRoot cid.Cid
72
+
Since *string
73
+
Rev string
73
74
RepoSlice []byte
74
75
PDS uint
75
76
Ops []RepoOp
76
-
Rebase bool
77
+
TooBig bool
77
78
}
78
79
79
80
type RepoOp struct {
···
146
147
unlock := rm.lockUser(ctx, user)
147
148
defer unlock()
148
149
149
-
head, err := rm.cs.GetUserRepoHead(ctx, user)
150
+
rev, err := rm.cs.GetUserRepoRev(ctx, user)
150
151
if err != nil {
151
152
return "", cid.Undef, err
152
153
}
153
154
154
-
ds, err := rm.cs.NewDeltaSession(ctx, user, &head)
155
+
ds, err := rm.cs.NewDeltaSession(ctx, user, &rev)
155
156
if err != nil {
156
157
return "", cid.Undef, err
157
158
}
158
159
160
+
head := ds.BaseCid()
161
+
159
162
r, err := repo.OpenRepo(ctx, ds, head, true)
160
163
if err != nil {
161
164
return "", cid.Undef, err
···
166
169
return "", cid.Undef, err
167
170
}
168
171
169
-
nroot, err := r.Commit(ctx, rm.kmgr.SignForUser)
172
+
nroot, nrev, err := r.Commit(ctx, rm.kmgr.SignForUser)
170
173
if err != nil {
171
174
return "", cid.Undef, err
172
175
}
173
176
174
-
rslice, err := ds.CloseWithRoot(ctx, nroot)
177
+
fmt.Println("NEW REV: ", nrev)
178
+
179
+
rslice, err := ds.CloseWithRoot(ctx, nroot, nrev)
175
180
if err != nil {
176
181
return "", cid.Undef, fmt.Errorf("close with root: %w", err)
177
182
}
···
186
191
User: user,
187
192
OldRoot: oldroot,
188
193
NewRoot: nroot,
194
+
Rev: nrev,
195
+
Since: &rev,
189
196
Ops: []RepoOp{{
190
197
Kind: EvtKindCreateRecord,
191
198
Collection: collection,
···
207
214
unlock := rm.lockUser(ctx, user)
208
215
defer unlock()
209
216
210
-
head, err := rm.cs.GetUserRepoHead(ctx, user)
217
+
rev, err := rm.cs.GetUserRepoRev(ctx, user)
211
218
if err != nil {
212
219
return cid.Undef, err
213
220
}
214
221
215
-
ds, err := rm.cs.NewDeltaSession(ctx, user, &head)
222
+
ds, err := rm.cs.NewDeltaSession(ctx, user, &rev)
216
223
if err != nil {
217
224
return cid.Undef, err
218
225
}
219
226
227
+
head := ds.BaseCid()
220
228
r, err := repo.OpenRepo(ctx, ds, head, true)
221
229
if err != nil {
222
230
return cid.Undef, err
···
228
236
return cid.Undef, err
229
237
}
230
238
231
-
nroot, err := r.Commit(ctx, rm.kmgr.SignForUser)
239
+
nroot, nrev, err := r.Commit(ctx, rm.kmgr.SignForUser)
232
240
if err != nil {
233
241
return cid.Undef, err
234
242
}
235
243
236
-
rslice, err := ds.CloseWithRoot(ctx, nroot)
244
+
rslice, err := ds.CloseWithRoot(ctx, nroot, nrev)
237
245
if err != nil {
238
246
return cid.Undef, fmt.Errorf("close with root: %w", err)
239
247
}
···
248
256
User: user,
249
257
OldRoot: oldroot,
250
258
NewRoot: nroot,
259
+
Rev: nrev,
260
+
Since: &rev,
251
261
Ops: []RepoOp{{
252
262
Kind: EvtKindUpdateRecord,
253
263
Collection: collection,
···
269
279
unlock := rm.lockUser(ctx, user)
270
280
defer unlock()
271
281
272
-
head, err := rm.cs.GetUserRepoHead(ctx, user)
282
+
rev, err := rm.cs.GetUserRepoRev(ctx, user)
273
283
if err != nil {
274
284
return err
275
285
}
276
286
277
-
ds, err := rm.cs.NewDeltaSession(ctx, user, &head)
287
+
ds, err := rm.cs.NewDeltaSession(ctx, user, &rev)
278
288
if err != nil {
279
289
return err
280
290
}
281
291
292
+
head := ds.BaseCid()
282
293
r, err := repo.OpenRepo(ctx, ds, head, true)
283
294
if err != nil {
284
295
return err
···
289
300
return err
290
301
}
291
302
292
-
nroot, err := r.Commit(ctx, rm.kmgr.SignForUser)
303
+
nroot, nrev, err := r.Commit(ctx, rm.kmgr.SignForUser)
293
304
if err != nil {
294
305
return err
295
306
}
296
307
297
-
rslice, err := ds.CloseWithRoot(ctx, nroot)
308
+
rslice, err := ds.CloseWithRoot(ctx, nroot, nrev)
298
309
if err != nil {
299
310
return fmt.Errorf("close with root: %w", err)
300
311
}
···
309
320
User: user,
310
321
OldRoot: oldroot,
311
322
NewRoot: nroot,
323
+
Rev: nrev,
324
+
Since: &rev,
312
325
Ops: []RepoOp{{
313
326
Kind: EvtKindDeleteRecord,
314
327
Collection: collection,
···
350
363
return fmt.Errorf("setting initial actor profile: %w", err)
351
364
}
352
365
353
-
root, err := r.Commit(ctx, rm.kmgr.SignForUser)
366
+
root, nrev, err := r.Commit(ctx, rm.kmgr.SignForUser)
354
367
if err != nil {
355
368
return fmt.Errorf("committing repo for actor init: %w", err)
356
369
}
357
370
358
-
rslice, err := ds.CloseWithRoot(ctx, root)
371
+
rslice, err := ds.CloseWithRoot(ctx, root, nrev)
359
372
if err != nil {
360
373
return fmt.Errorf("close with root: %w", err)
361
374
}
···
364
377
rm.events(ctx, &RepoEvent{
365
378
User: user,
366
379
NewRoot: root,
380
+
Rev: nrev,
367
381
Ops: []RepoOp{{
368
382
Kind: EvtKindCreateRecord,
369
383
Collection: "app.bsky.actor.profile",
···
384
398
return rm.cs.GetUserRepoHead(ctx, user)
385
399
}
386
400
387
-
func (rm *RepoManager) ReadRepo(ctx context.Context, user models.Uid, earlyCid, lateCid cid.Cid, w io.Writer) error {
388
-
return rm.cs.ReadUserCar(ctx, user, earlyCid, lateCid, true, w)
401
+
func (rm *RepoManager) GetRepoRev(ctx context.Context, user models.Uid) (string, error) {
402
+
unlock := rm.lockUser(ctx, user)
403
+
defer unlock()
404
+
405
+
return rm.cs.GetUserRepoRev(ctx, user)
406
+
}
407
+
408
+
func (rm *RepoManager) ReadRepo(ctx context.Context, user models.Uid, since string, w io.Writer) error {
409
+
return rm.cs.ReadUserCar(ctx, user, since, true, w)
389
410
}
390
411
391
412
func (rm *RepoManager) GetRecord(ctx context.Context, user models.Uid, collection string, rkey string, maybeCid cid.Cid) (cid.Cid, cbg.CBORMarshaler, error) {
···
445
466
return ap, nil
446
467
}
447
468
448
-
var ErrUncleanRebase = fmt.Errorf("unclean rebase")
449
-
450
-
func (rm *RepoManager) HandleRebase(ctx context.Context, pdsid uint, uid models.Uid, did string, prev *cid.Cid, commit cid.Cid, carslice []byte) error {
451
-
ctx, span := otel.Tracer("repoman").Start(ctx, "HandleRebase")
452
-
defer span.End()
453
-
454
-
log.Infow("HandleRebase", "pds", pdsid, "uid", uid, "commit", commit)
455
-
456
-
unlock := rm.lockUser(ctx, uid)
457
-
defer unlock()
458
-
459
-
ro, err := rm.cs.ReadOnlySession(uid)
460
-
if err != nil {
461
-
return err
462
-
}
463
-
464
-
head, err := rm.cs.GetUserRepoHead(ctx, uid)
465
-
if err != nil {
466
-
return err
467
-
}
468
-
469
-
// TODO: do we allow prev to be nil in any case here?
470
-
if prev != nil {
471
-
if *prev != head {
472
-
log.Warnw("rebase 'prev' value did not match our latest head for repo", "did", did, "rprev", prev.String(), "lprev", head.String())
473
-
}
474
-
}
475
-
476
-
currepo, err := repo.OpenRepo(ctx, ro, head, true)
477
-
if err != nil {
478
-
return err
479
-
}
480
-
481
-
olddc := currepo.DataCid()
482
-
483
-
root, ds, err := rm.cs.ImportSlice(ctx, uid, nil, carslice)
484
-
if err != nil {
485
-
return fmt.Errorf("importing external carslice: %w", err)
486
-
}
487
-
488
-
r, err := repo.OpenRepo(ctx, ds, root, true)
489
-
if err != nil {
490
-
return fmt.Errorf("opening external user repo (%d, root=%s): %w", uid, root, err)
491
-
}
492
-
493
-
if r.DataCid() != olddc {
494
-
return ErrUncleanRebase
495
-
}
496
-
497
-
if err := rm.CheckRepoSig(ctx, r, did); err != nil {
498
-
return err
499
-
}
500
-
501
-
// TODO: this is moderately expensive and currently results in the users
502
-
// entire repo being held in memory
503
-
if err := r.CopyDataTo(ctx, ds); err != nil {
504
-
return err
505
-
}
506
-
507
-
if err := ds.CloseAsRebase(ctx, root); err != nil {
508
-
return fmt.Errorf("finalizing rebase: %w", err)
509
-
}
510
-
511
-
if rm.events != nil {
512
-
rm.events(ctx, &RepoEvent{
513
-
User: uid,
514
-
OldRoot: prev,
515
-
NewRoot: root,
516
-
Ops: nil,
517
-
RepoSlice: carslice,
518
-
PDS: pdsid,
519
-
Rebase: true,
520
-
})
521
-
}
522
-
523
-
return nil
524
-
}
525
-
526
-
func (rm *RepoManager) DoRebase(ctx context.Context, uid models.Uid) error {
527
-
ctx, span := otel.Tracer("repoman").Start(ctx, "DoRebase")
528
-
defer span.End()
529
-
530
-
log.Infow("DoRebase", "uid", uid)
531
-
532
-
unlock := rm.lockUser(ctx, uid)
533
-
defer unlock()
534
-
535
-
ds, err := rm.cs.NewDeltaSession(ctx, uid, nil)
536
-
if err != nil {
537
-
return err
538
-
}
539
-
540
-
head, err := rm.cs.GetUserRepoHead(ctx, uid)
541
-
if err != nil {
542
-
return err
543
-
}
544
-
545
-
r, err := repo.OpenRepo(ctx, ds, head, true)
546
-
if err != nil {
547
-
return err
548
-
}
549
-
550
-
r.Truncate()
551
-
552
-
nroot, err := r.Commit(ctx, rm.kmgr.SignForUser)
553
-
if err != nil {
554
-
return err
555
-
}
556
-
557
-
if err := r.CopyDataTo(ctx, ds); err != nil {
558
-
return err
559
-
}
560
-
561
-
if err := ds.CloseAsRebase(ctx, nroot); err != nil {
562
-
return fmt.Errorf("finalizing rebase: %w", err)
563
-
}
564
-
565
-
// outbound car slice should just be the new signed root
566
-
buf := new(bytes.Buffer)
567
-
if _, err := carstore.WriteCarHeader(buf, nroot); err != nil {
568
-
return err
569
-
}
570
-
571
-
robj, err := ds.Get(ctx, nroot)
572
-
if err != nil {
573
-
return err
574
-
}
575
-
_, err = carstore.LdWrite(buf, robj.Cid().Bytes(), robj.RawData())
576
-
if err != nil {
577
-
return err
578
-
}
579
-
580
-
if rm.events != nil {
581
-
rm.events(ctx, &RepoEvent{
582
-
User: uid,
583
-
OldRoot: &head,
584
-
NewRoot: nroot,
585
-
Ops: nil,
586
-
RepoSlice: buf.Bytes(),
587
-
PDS: 0,
588
-
Rebase: true,
589
-
})
590
-
}
591
-
592
-
return nil
593
-
}
594
-
595
469
func (rm *RepoManager) CheckRepoSig(ctx context.Context, r *repo.Repo, expdid string) error {
596
470
ctx, span := otel.Tracer("repoman").Start(ctx, "CheckRepoSig")
597
471
defer span.End()
···
615
489
return nil
616
490
}
617
491
618
-
func (rm *RepoManager) HandleExternalUserEvent(ctx context.Context, pdsid uint, uid models.Uid, did string, prev *cid.Cid, carslice []byte, ops []*atproto.SyncSubscribeRepos_RepoOp) error {
492
+
func (rm *RepoManager) HandleExternalUserEvent(ctx context.Context, pdsid uint, uid models.Uid, did string, since *string, nrev string, carslice []byte, ops []*atproto.SyncSubscribeRepos_RepoOp) error {
619
493
ctx, span := otel.Tracer("repoman").Start(ctx, "HandleExternalUserEvent")
620
494
defer span.End()
621
495
622
-
log.Infow("HandleExternalUserEvent", "pds", pdsid, "uid", uid, "prev", prev)
496
+
log.Infow("HandleExternalUserEvent", "pds", pdsid, "uid", uid, "since", since, "nrev", nrev)
623
497
624
498
unlock := rm.lockUser(ctx, uid)
625
499
defer unlock()
626
500
627
-
root, ds, err := rm.cs.ImportSlice(ctx, uid, prev, carslice)
501
+
root, ds, err := rm.cs.ImportSlice(ctx, uid, since, carslice)
628
502
if err != nil {
629
503
return fmt.Errorf("importing external carslice: %w", err)
630
504
}
···
684
558
}
685
559
}
686
560
687
-
rslice, err := ds.CloseWithRoot(ctx, root)
561
+
rslice, err := ds.CloseWithRoot(ctx, root, nrev)
688
562
if err != nil {
689
563
return fmt.Errorf("close with root: %w", err)
690
564
}
691
565
692
566
if rm.events != nil {
693
567
rm.events(ctx, &RepoEvent{
694
-
User: uid,
695
-
OldRoot: prev,
568
+
User: uid,
569
+
//OldRoot: prev,
696
570
NewRoot: root,
571
+
Rev: nrev,
572
+
Since: since,
697
573
Ops: evtops,
698
574
RepoSlice: rslice,
699
575
PDS: pdsid,
···
714
590
unlock := rm.lockUser(ctx, user)
715
591
defer unlock()
716
592
717
-
head, err := rm.cs.GetUserRepoHead(ctx, user)
593
+
rev, err := rm.cs.GetUserRepoRev(ctx, user)
718
594
if err != nil {
719
595
return err
720
596
}
721
597
722
-
ds, err := rm.cs.NewDeltaSession(ctx, user, &head)
598
+
ds, err := rm.cs.NewDeltaSession(ctx, user, &rev)
723
599
if err != nil {
724
600
return err
725
601
}
726
602
603
+
head := ds.BaseCid()
727
604
r, err := repo.OpenRepo(ctx, ds, head, true)
728
605
if err != nil {
729
606
return err
···
786
663
}
787
664
}
788
665
789
-
nroot, err := r.Commit(ctx, rm.kmgr.SignForUser)
666
+
nroot, nrev, err := r.Commit(ctx, rm.kmgr.SignForUser)
790
667
if err != nil {
791
668
return err
792
669
}
793
670
794
-
rslice, err := ds.CloseWithRoot(ctx, nroot)
671
+
rslice, err := ds.CloseWithRoot(ctx, nroot, nrev)
795
672
if err != nil {
796
673
return fmt.Errorf("close with root: %w", err)
797
674
}
···
807
684
OldRoot: oldroot,
808
685
NewRoot: nroot,
809
686
RepoSlice: rslice,
687
+
Rev: nrev,
688
+
Since: &rev,
810
689
Ops: ops,
811
690
})
812
691
}
···
814
693
return nil
815
694
}
816
695
817
-
func (rm *RepoManager) ImportNewRepo(ctx context.Context, user models.Uid, repoDid string, r io.Reader, oldest cid.Cid) error {
696
+
func (rm *RepoManager) ImportNewRepo(ctx context.Context, user models.Uid, repoDid string, r io.Reader, rev *string) error {
818
697
ctx, span := otel.Tracer("repoman").Start(ctx, "ImportNewRepo")
819
698
defer span.End()
820
699
821
700
unlock := rm.lockUser(ctx, user)
822
701
defer unlock()
823
702
824
-
head, err := rm.cs.GetUserRepoHead(ctx, user)
703
+
currev, err := rm.cs.GetUserRepoRev(ctx, user)
825
704
if err != nil {
826
705
return err
827
706
}
828
707
829
-
if head != oldest {
708
+
curhead, err := rm.cs.GetUserRepoHead(ctx, user)
709
+
if err != nil {
710
+
return err
711
+
}
712
+
713
+
if rev != nil && *rev != currev {
830
714
// TODO: we could probably just deal with this
831
715
return fmt.Errorf("ImportNewRepo called with incorrect base")
832
716
}
833
717
834
-
err = rm.processNewRepo(ctx, user, r, head, func(ctx context.Context, old, nu cid.Cid, finish func(context.Context) ([]byte, error), bs blockstore.Blockstore) error {
835
-
r, err := repo.OpenRepo(ctx, bs, nu, true)
718
+
err = rm.processNewRepo(ctx, user, r, rev, func(ctx context.Context, root cid.Cid, finish func(context.Context, string) ([]byte, error), bs blockstore.Blockstore) error {
719
+
r, err := repo.OpenRepo(ctx, bs, root, true)
836
720
if err != nil {
837
721
return fmt.Errorf("opening new repo: %w", err)
838
722
}
···
848
732
return fmt.Errorf("new user signature check failed: %w", err)
849
733
}
850
734
851
-
diffops, err := r.DiffSince(ctx, old)
735
+
diffops, err := r.DiffSince(ctx, curhead)
852
736
if err != nil {
853
737
return fmt.Errorf("diff trees: %w", err)
854
738
}
···
865
749
}
866
750
}
867
751
868
-
slice, err := finish(ctx)
752
+
slice, err := finish(ctx, scom.Rev)
869
753
if err != nil {
870
754
return err
871
755
}
872
756
873
-
var oldroot *cid.Cid
874
-
if old.Defined() {
875
-
oldroot = &old
876
-
}
877
-
878
757
if rm.events != nil {
879
758
rm.events(ctx, &RepoEvent{
880
-
User: user,
881
-
OldRoot: oldroot,
882
-
NewRoot: nu,
759
+
User: user,
760
+
//OldRoot: oldroot,
761
+
NewRoot: root,
762
+
Rev: scom.Rev,
763
+
Since: &currev,
883
764
RepoSlice: slice,
884
765
Ops: ops,
885
766
})
···
888
769
return nil
889
770
})
890
771
if err != nil {
891
-
return fmt.Errorf("process new repo (current head: %s): %w:", head, err)
772
+
return fmt.Errorf("process new repo (current rev: %s): %w:", currev, err)
892
773
}
893
774
894
775
return nil
···
944
825
}
945
826
}
946
827
947
-
func (rm *RepoManager) processNewRepo(ctx context.Context, user models.Uid, r io.Reader, until cid.Cid, cb func(ctx context.Context, old, nu cid.Cid, finish func(context.Context) ([]byte, error), bs blockstore.Blockstore) error) error {
828
+
func (rm *RepoManager) processNewRepo(ctx context.Context, user models.Uid, r io.Reader, rev *string, cb func(ctx context.Context, root cid.Cid, finish func(context.Context, string) ([]byte, error), bs blockstore.Blockstore) error) error {
948
829
ctx, span := otel.Tracer("repoman").Start(ctx, "processNewRepo")
949
830
defer span.End()
950
831
···
973
854
}
974
855
}
975
856
976
-
head := &carr.Header.Roots[0]
977
-
978
-
var commits []cid.Cid
979
-
for head != nil && *head != until {
980
-
commits = append(commits, *head)
981
-
rep, err := repo.OpenRepo(ctx, membs, *head, true)
982
-
if err != nil {
983
-
return fmt.Errorf("opening repo for backwalk (%d commits, until: %s, head: %s, carRoot: %s): %w", len(commits), until, *head, carr.Header.Roots[0], err)
984
-
}
985
-
986
-
prev, err := rep.PrevCommit(ctx)
987
-
if err != nil {
988
-
return fmt.Errorf("prevCommit: %w", err)
989
-
}
990
-
991
-
head = prev
992
-
}
993
-
994
-
if until.Defined() && (head == nil || *head != until) {
995
-
// TODO: this shouldnt be happening, but i've seen some log messages
996
-
// suggest that it might. Leaving this here to discover any cases where
997
-
// it does.
998
-
log.Errorw("reached end of walkback without finding our 'until' commit",
999
-
"until", until,
1000
-
"root", carr.Header.Roots[0],
1001
-
"commits", len(commits),
1002
-
"head", head,
1003
-
"user", user,
1004
-
)
1005
-
}
1006
-
1007
-
// now we need to generate repo slices for each commit
1008
-
1009
857
seen := make(map[cid.Cid]bool)
1010
858
1011
-
if until.Defined() {
1012
-
seen[until] = true
859
+
root := carr.Header.Roots[0]
860
+
// TODO: if there are blocks that get convergently recreated throughout
861
+
// the repos lifecycle, this will end up erroneously not including
862
+
// them. We should compute the set of blocks needed to read any repo
863
+
// ops that happened in the commit and use that for our 'output' blocks
864
+
cids, err := walkTree(ctx, seen, root, membs, true)
865
+
if err != nil {
866
+
return fmt.Errorf("walkTree: %w", err)
1013
867
}
1014
868
1015
-
cbs := membs
1016
-
if until.Defined() {
1017
-
bs, err := rm.cs.ReadOnlySession(user)
1018
-
if err != nil {
1019
-
return err
1020
-
}
1021
-
1022
-
// TODO: we technically only need this for the 'next' commit to diff against our current head.
1023
-
cbs = util.NewReadThroughBstore(bs, membs)
869
+
ds, err := rm.cs.NewDeltaSession(ctx, user, rev)
870
+
if err != nil {
871
+
return fmt.Errorf("opening delta session: %w", err)
1024
872
}
1025
873
1026
-
prev := until
1027
-
for i := len(commits) - 1; i >= 0; i-- {
1028
-
root := commits[i]
1029
-
// TODO: if there are blocks that get convergently recreated throughout
1030
-
// the repos lifecycle, this will end up erroneously not including
1031
-
// them. We should compute the set of blocks needed to read any repo
1032
-
// ops that happened in the commit and use that for our 'output' blocks
1033
-
cids, err := walkTree(ctx, seen, root, membs, true)
874
+
for _, c := range cids {
875
+
blk, err := membs.Get(ctx, c)
1034
876
if err != nil {
1035
-
return fmt.Errorf("walkTree: %w", err)
877
+
return fmt.Errorf("copying walked cids to carstore: %w", err)
1036
878
}
1037
879
1038
-
var prevptr *cid.Cid
1039
-
if prev.Defined() {
1040
-
prevptr = &prev
880
+
if err := ds.Put(ctx, blk); err != nil {
881
+
return err
1041
882
}
1042
-
ds, err := rm.cs.NewDeltaSession(ctx, user, prevptr)
1043
-
if err != nil {
1044
-
return fmt.Errorf("opening delta session (%d / %d): %w", i, len(commits)-1, err)
1045
-
}
883
+
}
1046
884
1047
-
for _, c := range cids {
1048
-
blk, err := membs.Get(ctx, c)
1049
-
if err != nil {
1050
-
return fmt.Errorf("copying walked cids to carstore: %w", err)
1051
-
}
1052
-
1053
-
if err := ds.Put(ctx, blk); err != nil {
1054
-
return err
1055
-
}
1056
-
}
1057
-
1058
-
finish := func(ctx context.Context) ([]byte, error) {
1059
-
return ds.CloseWithRoot(ctx, root)
1060
-
}
885
+
finish := func(ctx context.Context, nrev string) ([]byte, error) {
886
+
return ds.CloseWithRoot(ctx, root, nrev)
887
+
}
1061
888
1062
-
if err := cb(ctx, prev, root, finish, cbs); err != nil {
1063
-
return fmt.Errorf("cb errored (%d/%d) root: %s, prev: %s: %w", i, len(commits)-1, root, prev, err)
1064
-
}
1065
-
1066
-
prev = root
889
+
if err := cb(ctx, root, finish, membs); err != nil {
890
+
return fmt.Errorf("cb errored root: %s, rev: %s: %w", root, *rev, err)
1067
891
}
1068
892
1069
893
return nil
+1
-1
search/server.go
+1
-1
search/server.go
···
251
251
}
252
252
253
253
func (s *Server) processTooBigCommit(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error {
254
-
repodata, err := comatproto.SyncGetRepo(ctx, s.bgsxrpc, evt.Repo, "", evt.Commit.String())
254
+
repodata, err := comatproto.SyncGetRepo(ctx, s.bgsxrpc, evt.Repo, "")
255
255
if err != nil {
256
256
return err
257
257
}
+1
-1
testing/car_did_repro_test.go
+1
-1
testing/car_did_repro_test.go
+2
-122
testing/integ_test.go
+2
-122
testing/integ_test.go
···
166
166
time.Sleep(time.Millisecond * 50)
167
167
168
168
// Now, the bgs will discover a gap, and have to catch up somehow
169
+
fmt.Println("EXPECT BGS TO CATCHUP NOW")
169
170
socialSim(t, users2, 1, 0)
170
171
171
172
time.Sleep(time.Second)
···
228
229
p2.RequestScraping(t, b1)
229
230
time.Sleep(time.Millisecond * 50)
230
231
232
+
fmt.Println("AFTER THIS EXPECT THE BGS TO DO A GETREPO TO CATCH UP")
231
233
// Now, the bgs will discover a gap, and have to catch up somehow
232
234
socialSim(t, users2, 1, 0)
233
235
···
330
332
331
333
last := es2.Next()
332
334
assert.Equal(alice.did, last.RepoCommit.Repo)
333
-
}
334
-
335
-
func TestRebase(t *testing.T) {
336
-
if testing.Short() {
337
-
t.Skip("skipping BGS test in 'short' test mode")
338
-
}
339
-
assert := assert.New(t)
340
-
didr := TestPLC(t)
341
-
p1 := MustSetupPDS(t, ".tpds", didr)
342
-
p1.Run(t)
343
-
344
-
b1 := MustSetupBGS(t, didr)
345
-
b1.Run(t)
346
-
347
-
b1.tr.TrialHosts = []string{p1.RawHost()}
348
-
349
-
p1.RequestScraping(t, b1)
350
-
351
-
time.Sleep(time.Millisecond * 50)
352
-
353
-
bob := p1.MustNewUser(t, "bob.tpds")
354
-
355
-
bob.Post(t, "cats for cats")
356
-
bob.Post(t, "i am the king of the world")
357
-
bob.Post(t, "the name is bob")
358
-
bob.Post(t, "why cant i eat pie")
359
-
360
-
time.Sleep(time.Millisecond * 100)
361
-
362
-
evts1 := b1.Events(t, 0)
363
-
defer evts1.Cancel()
364
-
365
-
preRebaseEvts := evts1.WaitFor(5)
366
-
fmt.Println(preRebaseEvts)
367
-
368
-
bob.DoRebase(t)
369
-
370
-
rbevt := evts1.Next()
371
-
assert.Equal(true, rbevt.RepoCommit.Rebase)
372
-
373
-
sc := commitFromSlice(t, rbevt.RepoCommit.Blocks, (cid.Cid)(rbevt.RepoCommit.Commit))
374
-
assert.Nil(sc.Prev)
375
-
376
-
lev := preRebaseEvts[4]
377
-
oldsc := commitFromSlice(t, lev.RepoCommit.Blocks, (cid.Cid)(lev.RepoCommit.Commit))
378
-
379
-
assert.Equal(sc.Data, oldsc.Data)
380
-
381
-
evts2 := b1.Events(t, 0)
382
-
afterEvts := evts2.WaitFor(1)
383
-
assert.Equal(true, afterEvts[0].RepoCommit.Rebase)
384
-
}
385
-
386
-
func TestRebaseMulti(t *testing.T) {
387
-
if testing.Short() {
388
-
t.Skip("skipping BGS test in 'short' test mode")
389
-
}
390
-
assert := assert.New(t)
391
-
didr := TestPLC(t)
392
-
p1 := MustSetupPDS(t, ".tpds", didr)
393
-
p1.Run(t)
394
-
395
-
b1 := MustSetupBGS(t, didr)
396
-
b1.Run(t)
397
-
398
-
b1.tr.TrialHosts = []string{p1.RawHost()}
399
-
400
-
p1.RequestScraping(t, b1)
401
-
402
-
esgenesis := b1.Events(t, 0)
403
-
404
-
time.Sleep(time.Millisecond * 50)
405
-
406
-
bob := p1.MustNewUser(t, "bob.tpds")
407
-
408
-
for i := 0; i < 10; i++ {
409
-
bob.Post(t, fmt.Sprintf("this is bobs post %d", i))
410
-
}
411
-
412
-
// wait for 11 events, the first one is the actor creation
413
-
firsten := esgenesis.WaitFor(11)
414
-
_ = firsten
415
-
416
-
fmt.Println("REBASE ONE")
417
-
bob.DoRebase(t)
418
-
419
-
var posts []*atproto.RepoStrongRef
420
-
for i := 0; i < 10; i++ {
421
-
ref := bob.Post(t, fmt.Sprintf("this is bobs post after rebase %d", i))
422
-
posts = append(posts, ref)
423
-
}
424
-
425
-
time.Sleep(time.Millisecond * 50)
426
-
427
-
evts1 := b1.Events(t, 0)
428
-
defer evts1.Cancel()
429
-
430
-
all := evts1.WaitFor(11)
431
-
432
-
assert.Equal(true, all[0].RepoCommit.Rebase)
433
-
assert.Equal(int64(12), all[0].RepoCommit.Seq)
434
-
assert.Equal(posts[0].Cid, all[1].RepoCommit.Ops[0].Cid.String())
435
-
436
-
// and another one!
437
-
fmt.Println("REBASE TWO")
438
-
bob.DoRebase(t)
439
-
440
-
var posts2 []*atproto.RepoStrongRef
441
-
for i := 0; i < 15; i++ {
442
-
ref := bob.Post(t, fmt.Sprintf("this is bobs post after second rebase %d", i))
443
-
posts2 = append(posts2, ref)
444
-
}
445
-
446
-
time.Sleep(time.Millisecond * 50)
447
-
448
-
evts2 := b1.Events(t, 0)
449
-
defer evts2.Cancel()
450
-
451
-
all = evts2.WaitFor(16)
452
-
453
-
assert.Equal(true, all[0].RepoCommit.Rebase)
454
-
assert.Equal(posts2[0].Cid, all[1].RepoCommit.Ops[0].Cid.String())
455
335
}
456
336
457
337
func jsonPrint(v any) {
testing/testdata/greenground.repo.car
testing/testdata/greenground.repo.car
This is a binary file and will not be displayed.