+15
-16
hydration/hydrator.go
+15
-16
hydration/hydrator.go
···
10
10
db *gorm.DB
11
11
dir identity.Directory
12
12
13
-
missingActorCallback func(string)
14
-
missingPostCallback func(string)
15
-
missingFeedGeneratorCallback func(string)
13
+
missingRecordCallback func(string, bool)
16
14
}
17
15
18
16
// NewHydrator creates a new Hydrator
···
23
21
}
24
22
}
25
23
26
-
func (h *Hydrator) SetMissingActorCallback(fn func(string)) {
27
-
h.missingActorCallback = fn
24
+
// SetMissingRecordCallback sets the callback for when a record is missing
25
+
// The callback receives an identifier which can be:
26
+
// - A DID (e.g., "did:plc:...") for actors/profiles
27
+
// - An AT-URI (e.g., "at://did:plc:.../app.bsky.feed.post/...") for posts
28
+
// - An AT-URI (e.g., "at://did:plc:.../app.bsky.feed.generator/...") for feed generators
29
+
func (h *Hydrator) SetMissingRecordCallback(fn func(string, bool)) {
30
+
h.missingRecordCallback = fn
28
31
}
29
32
30
-
func (h *Hydrator) addMissingActor(did string) {
31
-
if h.missingActorCallback != nil {
32
-
h.missingActorCallback(did)
33
+
// AddMissingRecord reports a missing record that needs to be fetched
34
+
func (h *Hydrator) AddMissingRecord(identifier string, wait bool) {
35
+
if h.missingRecordCallback != nil {
36
+
h.missingRecordCallback(identifier, wait)
33
37
}
34
38
}
35
39
36
-
func (h *Hydrator) SetMissingFeedGeneratorCallback(fn func(string)) {
37
-
h.missingFeedGeneratorCallback = fn
38
-
}
39
-
40
-
func (h *Hydrator) AddMissingFeedGenerator(uri string) {
41
-
if h.missingFeedGeneratorCallback != nil {
42
-
h.missingFeedGeneratorCallback(uri)
43
-
}
40
+
// addMissingActor is a convenience method for adding missing actors
41
+
func (h *Hydrator) addMissingActor(did string) {
42
+
h.AddMissingRecord(did, false)
44
43
}
45
44
46
45
// HydrateCtx contains context for hydration operations
+23
missing.go
+23
missing.go
···
24
24
type MissingRecord struct {
25
25
Type MissingRecordType
26
26
Identifier string // DID for profiles, AT-URI for posts/feedgens
27
+
Wait bool
28
+
29
+
waitch chan struct{}
27
30
}
28
31
29
32
func (s *Server) addMissingRecord(ctx context.Context, rec MissingRecord) {
33
+
if rec.Wait {
34
+
rec.waitch = make(chan struct{})
35
+
}
36
+
30
37
select {
31
38
case s.missingRecords <- rec:
32
39
case <-ctx.Done():
33
40
}
41
+
42
+
if rec.Wait {
43
+
select {
44
+
case <-rec.waitch:
45
+
case <-ctx.Done():
46
+
}
47
+
}
34
48
}
35
49
36
50
// Legacy methods for backward compatibility
···
74
88
75
89
if err != nil {
76
90
slog.Warn("failed to fetch missing record", "type", rec.Type, "identifier", rec.Identifier, "error", err)
91
+
}
92
+
93
+
if rec.Wait {
94
+
close(rec.waitch)
77
95
}
78
96
}
79
97
}
80
98
81
99
func (s *Server) fetchMissingProfile(ctx context.Context, did string) error {
100
+
s.backend.addRelevantDid(did)
101
+
82
102
repo, err := s.backend.getOrCreateRepo(ctx, did)
83
103
if err != nil {
84
104
return err
···
126
146
collection := puri.Collection().String()
127
147
rkey := puri.RecordKey().String()
128
148
149
+
s.backend.addRelevantDid(did)
150
+
129
151
repo, err := s.backend.getOrCreateRepo(ctx, did)
130
152
if err != nil {
131
153
return err
···
172
194
did := puri.Authority().String()
173
195
collection := puri.Collection().String()
174
196
rkey := puri.RecordKey().String()
197
+
s.backend.addRelevantDid(did)
175
198
176
199
repo, err := s.backend.getOrCreateRepo(ctx, did)
177
200
if err != nil {
+23
-4
pgbackend.go
+23
-4
pgbackend.go
···
409
409
return &r, nil
410
410
}
411
411
412
-
func (b *PostgresBackend) TrackMissingActor(did string) {
413
-
b.s.addMissingProfile(context.TODO(), did)
412
+
func (b *PostgresBackend) TrackMissingRecord(identifier string, wait bool) {
413
+
b.s.addMissingRecord(context.TODO(), MissingRecord{
414
+
Type: inferRecordType(identifier),
415
+
Identifier: identifier,
416
+
Wait: wait,
417
+
})
414
418
}
415
419
416
-
func (b *PostgresBackend) TrackMissingFeedGenerator(uri string) {
417
-
b.s.addMissingFeedGenerator(context.TODO(), uri)
420
+
// inferRecordType determines the record type based on the identifier format
421
+
func inferRecordType(identifier string) MissingRecordType {
422
+
if strings.HasPrefix(identifier, "did:") {
423
+
return MissingRecordTypeProfile
424
+
}
425
+
426
+
if strings.HasPrefix(identifier, "at://") {
427
+
if strings.Contains(identifier, "/app.bsky.feed.post/") {
428
+
return MissingRecordTypePost
429
+
}
430
+
if strings.Contains(identifier, "/app.bsky.feed.generator/") {
431
+
return MissingRecordTypeFeedGenerator
432
+
}
433
+
}
434
+
435
+
// Default to post if we can't determine
436
+
return MissingRecordTypePost
418
437
}
+36
-18
xrpc/feed/getFeed.go
+36
-18
xrpc/feed/getFeed.go
···
5
5
"log/slog"
6
6
"net/http"
7
7
"strconv"
8
+
"strings"
9
+
"sync"
8
10
9
11
"github.com/bluesky-social/indigo/api/bsky"
10
12
"github.com/bluesky-social/indigo/atproto/identity"
···
64
66
}
65
67
66
68
if feedGen.ID == 0 {
67
-
hydrator.AddMissingFeedGenerator(feedURI)
69
+
hydrator.AddMissingRecord(feedURI, true)
68
70
return c.JSON(http.StatusNotFound, map[string]any{
69
71
"error": "NotFound",
70
72
"message": "feed generator not found",
···
150
152
151
153
// Hydrate the posts from the skeleton
152
154
posts := make([]*bsky.FeedDefs_FeedViewPost, 0, len(skeleton.Feed))
153
-
for _, skeletonPost := range skeleton.Feed {
154
-
postURI, err := syntax.ParseATURI(skeletonPost.Post)
155
-
if err != nil {
156
-
slog.Warn("invalid post URI in skeleton", "uri", skeletonPost.Post, "error", err)
157
-
continue
158
-
}
155
+
var wg sync.WaitGroup
156
+
for i := range skeleton.Feed {
157
+
wg.Add(1)
158
+
go func(ix int) {
159
+
defer wg.Done()
160
+
skeletonPost := skeleton.Feed[ix]
161
+
postURI, err := syntax.ParseATURI(skeletonPost.Post)
162
+
if err != nil {
163
+
slog.Warn("invalid post URI in skeleton", "uri", skeletonPost.Post, "error", err)
164
+
return
165
+
}
159
166
160
-
postInfo, err := hydrator.HydratePost(ctx, string(postURI), viewer)
161
-
if err != nil {
162
-
slog.Warn("failed to hydrate post", "uri", postURI, "error", err)
163
-
continue
164
-
}
167
+
postInfo, err := hydrator.HydratePost(ctx, postURI.String(), viewer)
168
+
if err != nil {
169
+
if strings.Contains(err.Error(), "post not found") {
170
+
hydrator.AddMissingRecord(postURI.String(), true)
171
+
postInfo, err = hydrator.HydratePost(ctx, postURI.String(), viewer)
172
+
if err != nil {
173
+
slog.Error("failed to hydrate post after fetch missing", "uri", postURI, "error", err)
174
+
return
175
+
}
176
+
} else {
177
+
slog.Warn("failed to hydrate post", "uri", postURI, "error", err)
178
+
return
179
+
}
180
+
}
165
181
166
-
authorInfo, err := hydrator.HydrateActor(ctx, postInfo.Author)
167
-
if err != nil {
168
-
slog.Warn("failed to hydrate author", "did", postInfo.Author, "error", err)
169
-
continue
170
-
}
182
+
authorInfo, err := hydrator.HydrateActor(ctx, postInfo.Author)
183
+
if err != nil {
184
+
slog.Warn("failed to hydrate author", "did", postInfo.Author, "error", err)
185
+
return
186
+
}
171
187
172
-
posts = append(posts, views.FeedViewPost(postInfo, authorInfo))
188
+
posts[ix] = views.FeedViewPost(postInfo, authorInfo)
189
+
}(i)
173
190
}
191
+
wg.Wait()
174
192
175
193
output := &bsky.FeedGetFeed_Output{
176
194
Feed: posts,
+1
-1
xrpc/feed/getFeedGenerator.go
+1
-1
xrpc/feed/getFeedGenerator.go
+7
-4
xrpc/server.go
+7
-4
xrpc/server.go
···
31
31
type Backend interface {
32
32
// Add methods as needed for data access
33
33
34
-
TrackMissingActor(did string)
35
-
TrackMissingFeedGenerator(uri string)
34
+
TrackMissingRecord(identifier string, wait bool)
36
35
}
37
36
38
37
// NewServer creates a new XRPC server
···
60
59
hydrator: hydration.NewHydrator(db, dir),
61
60
}
62
61
63
-
s.hydrator.SetMissingActorCallback(backend.TrackMissingActor)
64
-
s.hydrator.SetMissingFeedGeneratorCallback(backend.TrackMissingFeedGenerator)
62
+
s.hydrator.SetMissingRecordCallback(backend.TrackMissingRecord)
65
63
66
64
// Register XRPC endpoints
67
65
s.registerEndpoints()
···
78
76
// registerEndpoints registers all XRPC endpoints
79
77
func (s *Server) registerEndpoints() {
80
78
// XRPC endpoints follow the pattern: /xrpc/<namespace>.<method>
79
+
80
+
s.e.GET("/.well-known/did.json", func(c echo.Context) error {
81
+
return c.File("did.json")
82
+
})
83
+
81
84
xrpcGroup := s.e.Group("/xrpc")
82
85
83
86
// com.atproto.identity.*