+36
-7
backend/backend.go
+36
-7
backend/backend.go
···
10
11
"github.com/bluesky-social/indigo/api/atproto"
12
"github.com/bluesky-social/indigo/api/bsky"
13
"github.com/bluesky-social/indigo/atproto/syntax"
14
"github.com/bluesky-social/indigo/util"
15
"github.com/bluesky-social/indigo/xrpc"
···
26
27
// PostgresBackend handles database operations
28
type PostgresBackend struct {
29
-
db *gorm.DB
30
-
pgx *pgxpool.Pool
31
-
tracker RecordTracker
32
33
client *xrpc.Client
34
···
46
didByIDCache *lru.TwoQueueCache[uint, string]
47
48
postInfoCache *lru.TwoQueueCache[string, cachedPostInfo]
49
}
50
51
type cachedPostInfo struct {
···
54
}
55
56
// NewPostgresBackend creates a new PostgresBackend
57
-
func NewPostgresBackend(mydid string, db *gorm.DB, pgx *pgxpool.Pool, client *xrpc.Client, tracker RecordTracker) (*PostgresBackend, error) {
58
rc, _ := lru.New2Q[string, *Repo](1_000_000)
59
pc, _ := lru.New2Q[string, cachedPostInfo](1_000_000)
60
revc, _ := lru.New2Q[uint, string](1_000_000)
···
65
mydid: mydid,
66
db: db,
67
pgx: pgx,
68
-
tracker: tracker,
69
relevantDids: make(map[string]bool),
70
repoCache: rc,
71
postInfoCache: pc,
72
revCache: revc,
73
didByIDCache: dbic,
74
}
75
76
r, err := b.GetOrCreateRepo(context.TODO(), mydid)
···
79
}
80
81
b.myrepo = r
82
return b, nil
83
}
84
85
// TrackMissingRecord implements the RecordTracker interface
86
func (b *PostgresBackend) TrackMissingRecord(identifier string, wait bool) {
87
-
if b.tracker != nil {
88
-
b.tracker.TrackMissingRecord(identifier, wait)
89
}
90
}
91
92
// DidToID converts a DID to a database ID
···
10
11
"github.com/bluesky-social/indigo/api/atproto"
12
"github.com/bluesky-social/indigo/api/bsky"
13
+
"github.com/bluesky-social/indigo/atproto/identity"
14
"github.com/bluesky-social/indigo/atproto/syntax"
15
"github.com/bluesky-social/indigo/util"
16
"github.com/bluesky-social/indigo/xrpc"
···
27
28
// PostgresBackend handles database operations
29
type PostgresBackend struct {
30
+
db *gorm.DB
31
+
pgx *pgxpool.Pool
32
+
33
+
dir identity.Directory
34
35
client *xrpc.Client
36
···
48
didByIDCache *lru.TwoQueueCache[uint, string]
49
50
postInfoCache *lru.TwoQueueCache[string, cachedPostInfo]
51
+
52
+
missingRecords chan MissingRecord
53
}
54
55
type cachedPostInfo struct {
···
58
}
59
60
// NewPostgresBackend creates a new PostgresBackend
61
+
func NewPostgresBackend(mydid string, db *gorm.DB, pgx *pgxpool.Pool, client *xrpc.Client, dir identity.Directory) (*PostgresBackend, error) {
62
rc, _ := lru.New2Q[string, *Repo](1_000_000)
63
pc, _ := lru.New2Q[string, cachedPostInfo](1_000_000)
64
revc, _ := lru.New2Q[uint, string](1_000_000)
···
69
mydid: mydid,
70
db: db,
71
pgx: pgx,
72
relevantDids: make(map[string]bool),
73
repoCache: rc,
74
postInfoCache: pc,
75
revCache: revc,
76
didByIDCache: dbic,
77
+
dir: dir,
78
+
79
+
missingRecords: make(chan MissingRecord, 1000),
80
}
81
82
r, err := b.GetOrCreateRepo(context.TODO(), mydid)
···
85
}
86
87
b.myrepo = r
88
+
89
+
go b.missingRecordFetcher()
90
return b, nil
91
}
92
93
// TrackMissingRecord implements the RecordTracker interface
94
func (b *PostgresBackend) TrackMissingRecord(identifier string, wait bool) {
95
+
mr := MissingRecord{
96
+
Type: mrTypeFromIdent(identifier),
97
+
Identifier: identifier,
98
+
Wait: wait,
99
+
}
100
+
101
+
b.addMissingRecord(context.TODO(), mr)
102
+
}
103
+
104
+
func mrTypeFromIdent(ident string) MissingRecordType {
105
+
if strings.HasPrefix(ident, "did:") {
106
+
return MissingRecordTypeProfile
107
+
}
108
+
109
+
puri, _ := syntax.ParseATURI(ident)
110
+
switch puri.Collection().String() {
111
+
case "app.bsky.feed.post":
112
+
return MissingRecordTypePost
113
+
case "app.bsky.feed.generator":
114
+
return MissingRecordTypeFeedGenerator
115
+
default:
116
+
return MissingRecordTypeUnknown
117
}
118
+
119
}
120
121
// DidToID converts a DID to a database ID
+8
-8
handlers.go
+8
-8
handlers.go
···
146
}
147
148
if profile.Raw == nil || len(profile.Raw) == 0 {
149
-
s.addMissingProfile(ctx, accdid)
150
return e.JSON(404, map[string]any{
151
"error": "missing profile info for user",
152
})
···
307
}
308
309
if profile.Raw == nil || len(profile.Raw) == 0 {
310
-
s.addMissingProfile(ctx, r.Did)
311
return &authorInfo{
312
Handle: resp.Handle.String(),
313
Did: r.Did,
···
379
380
uri := fmt.Sprintf("at://%s/app.bsky.feed.post/%s", r.Did, p.Rkey)
381
if len(p.Raw) == 0 || p.NotFound {
382
-
s.addMissingPost(ctx, uri)
383
posts[ix] = postResponse{
384
Uri: uri,
385
Missing: true,
···
515
quotedPost, err := s.backend.GetPostByUri(ctx, quotedURI, "*")
516
if err != nil {
517
slog.Warn("failed to get quoted post", "uri", quotedURI, "error", err)
518
-
s.addMissingPost(ctx, quotedURI)
519
return s.buildQuoteFallback(quotedURI, quotedCid)
520
}
521
522
if quotedPost == nil || quotedPost.Raw == nil || len(quotedPost.Raw) == 0 || quotedPost.NotFound {
523
-
s.addMissingPost(ctx, quotedURI)
524
return s.buildQuoteFallback(quotedURI, quotedCid)
525
}
526
···
707
prof = &p
708
}
709
} else {
710
-
s.addMissingProfile(ctx, r.Did)
711
}
712
713
users = append(users, engagementUser{
···
767
prof = &p
768
}
769
} else {
770
-
s.addMissingProfile(ctx, r.Did)
771
}
772
773
users = append(users, engagementUser{
···
835
prof = &p
836
}
837
} else {
838
-
s.addMissingProfile(ctx, r.Did)
839
}
840
841
users = append(users, engagementUser{
···
146
}
147
148
if profile.Raw == nil || len(profile.Raw) == 0 {
149
+
s.backend.TrackMissingRecord(accdid, false)
150
return e.JSON(404, map[string]any{
151
"error": "missing profile info for user",
152
})
···
307
}
308
309
if profile.Raw == nil || len(profile.Raw) == 0 {
310
+
s.backend.TrackMissingRecord(r.Did, false)
311
return &authorInfo{
312
Handle: resp.Handle.String(),
313
Did: r.Did,
···
379
380
uri := fmt.Sprintf("at://%s/app.bsky.feed.post/%s", r.Did, p.Rkey)
381
if len(p.Raw) == 0 || p.NotFound {
382
+
s.backend.TrackMissingRecord(uri, false)
383
posts[ix] = postResponse{
384
Uri: uri,
385
Missing: true,
···
515
quotedPost, err := s.backend.GetPostByUri(ctx, quotedURI, "*")
516
if err != nil {
517
slog.Warn("failed to get quoted post", "uri", quotedURI, "error", err)
518
+
s.backend.TrackMissingRecord(quotedURI, false)
519
return s.buildQuoteFallback(quotedURI, quotedCid)
520
}
521
522
if quotedPost == nil || quotedPost.Raw == nil || len(quotedPost.Raw) == 0 || quotedPost.NotFound {
523
+
s.backend.TrackMissingRecord(quotedURI, false)
524
return s.buildQuoteFallback(quotedURI, quotedCid)
525
}
526
···
707
prof = &p
708
}
709
} else {
710
+
s.backend.TrackMissingRecord(r.Did, false)
711
}
712
713
users = append(users, engagementUser{
···
767
prof = &p
768
}
769
} else {
770
+
s.backend.TrackMissingRecord(r.Did, false)
771
}
772
773
users = append(users, engagementUser{
···
835
prof = &p
836
}
837
} else {
838
+
s.backend.TrackMissingRecord(r.Did, false)
839
}
840
841
users = append(users, engagementUser{
+3
hydration/post.go
+3
hydration/post.go
···
388
389
// hydrateEmbeddedRecord hydrates an embedded record (for quote posts, etc.)
390
func (h *Hydrator) hydrateEmbeddedRecord(ctx context.Context, uri string, viewerDID string) *bsky.EmbedRecord_View_Record {
391
// Check if it's a post URI
392
if !isPostURI(uri) {
393
// Could be a feed generator, list, labeler, or starter pack
···
388
389
// hydrateEmbeddedRecord hydrates an embedded record (for quote posts, etc.)
390
func (h *Hydrator) hydrateEmbeddedRecord(ctx context.Context, uri string, viewerDID string) *bsky.EmbedRecord_View_Record {
391
+
ctx, span := tracer.Start(ctx, "hydrateEmbeddedRecord")
392
+
defer span.End()
393
+
394
// Check if it's a post URI
395
if !isPostURI(uri) {
396
// Could be a feed generator, list, labeler, or starter pack
+3
-7
main.go
+3
-7
main.go
···
200
client: cc,
201
dir: dir,
202
203
-
missingRecords: make(chan MissingRecord, 1024),
204
-
db: db,
205
}
206
fmt.Println("MY DID: ", s.mydid)
207
208
-
pgb, err := backend.NewPostgresBackend(mydid, db, pool, cc, nil)
209
if err != nil {
210
return err
211
}
···
242
http.ListenAndServe(":4445", nil)
243
}()
244
245
-
go s.missingRecordFetcher()
246
-
247
seqno, err := loadLastSeq(db, "firehose_seq")
248
if err != nil {
249
fmt.Println("failed to load sequence number, starting over", err)
···
267
seqLk sync.Mutex
268
lastSeq int64
269
270
-
mpLk sync.Mutex
271
-
missingRecords chan MissingRecord
272
273
db *gorm.DB
274
}
···
200
client: cc,
201
dir: dir,
202
203
+
db: db,
204
}
205
fmt.Println("MY DID: ", s.mydid)
206
207
+
pgb, err := backend.NewPostgresBackend(mydid, db, pool, cc, dir)
208
if err != nil {
209
return err
210
}
···
241
http.ListenAndServe(":4445", nil)
242
}()
243
244
seqno, err := loadLastSeq(db, "firehose_seq")
245
if err != nil {
246
fmt.Println("failed to load sequence number, starting over", err)
···
264
seqLk sync.Mutex
265
lastSeq int64
266
267
+
mpLk sync.Mutex
268
269
db *gorm.DB
270
}
+24
-47
missing.go
backend/missing.go
+24
-47
missing.go
backend/missing.go
···
1
-
package main
2
3
import (
4
"bytes"
···
19
MissingRecordTypeProfile MissingRecordType = "profile"
20
MissingRecordTypePost MissingRecordType = "post"
21
MissingRecordTypeFeedGenerator MissingRecordType = "feedgenerator"
22
)
23
24
type MissingRecord struct {
···
29
waitch chan struct{}
30
}
31
32
-
func (s *Server) addMissingRecord(ctx context.Context, rec MissingRecord) {
33
if rec.Wait {
34
rec.waitch = make(chan struct{})
35
}
36
37
select {
38
-
case s.missingRecords <- rec:
39
case <-ctx.Done():
40
}
41
···
47
}
48
}
49
50
-
// Legacy methods for backward compatibility
51
-
func (s *Server) addMissingProfile(ctx context.Context, did string) {
52
-
s.addMissingRecord(ctx, MissingRecord{
53
-
Type: MissingRecordTypeProfile,
54
-
Identifier: did,
55
-
})
56
-
}
57
-
58
-
func (s *Server) addMissingPost(ctx context.Context, uri string) {
59
-
slog.Info("adding missing post to fetch queue", "uri", uri)
60
-
s.addMissingRecord(ctx, MissingRecord{
61
-
Type: MissingRecordTypePost,
62
-
Identifier: uri,
63
-
})
64
-
}
65
-
66
-
func (s *Server) addMissingFeedGenerator(ctx context.Context, uri string) {
67
-
slog.Info("adding missing feed generator to fetch queue", "uri", uri)
68
-
s.addMissingRecord(ctx, MissingRecord{
69
-
Type: MissingRecordTypeFeedGenerator,
70
-
Identifier: uri,
71
-
})
72
-
}
73
-
74
-
func (s *Server) missingRecordFetcher() {
75
-
for rec := range s.missingRecords {
76
var err error
77
switch rec.Type {
78
case MissingRecordTypeProfile:
79
-
err = s.fetchMissingProfile(context.TODO(), rec.Identifier)
80
case MissingRecordTypePost:
81
-
err = s.fetchMissingPost(context.TODO(), rec.Identifier)
82
case MissingRecordTypeFeedGenerator:
83
-
err = s.fetchMissingFeedGenerator(context.TODO(), rec.Identifier)
84
default:
85
slog.Error("unknown missing record type", "type", rec.Type)
86
continue
···
96
}
97
}
98
99
-
func (s *Server) fetchMissingProfile(ctx context.Context, did string) error {
100
-
s.backend.AddRelevantDid(did)
101
102
-
repo, err := s.backend.GetOrCreateRepo(ctx, did)
103
if err != nil {
104
return err
105
}
106
107
-
resp, err := s.dir.LookupDID(ctx, syntax.DID(did))
108
if err != nil {
109
return err
110
}
···
133
return err
134
}
135
136
-
return s.backend.HandleUpdateProfile(ctx, repo, "self", "", buf.Bytes(), cc)
137
}
138
139
-
func (s *Server) fetchMissingPost(ctx context.Context, uri string) error {
140
puri, err := syntax.ParseATURI(uri)
141
if err != nil {
142
return fmt.Errorf("invalid AT URI: %s", uri)
···
146
collection := puri.Collection().String()
147
rkey := puri.RecordKey().String()
148
149
-
s.backend.AddRelevantDid(did)
150
151
-
repo, err := s.backend.GetOrCreateRepo(ctx, did)
152
if err != nil {
153
return err
154
}
155
156
-
resp, err := s.dir.LookupDID(ctx, syntax.DID(did))
157
if err != nil {
158
return err
159
}
···
182
return err
183
}
184
185
-
return s.backend.HandleCreatePost(ctx, repo, rkey, buf.Bytes(), cc)
186
}
187
188
-
func (s *Server) fetchMissingFeedGenerator(ctx context.Context, uri string) error {
189
puri, err := syntax.ParseATURI(uri)
190
if err != nil {
191
return fmt.Errorf("invalid AT URI: %s", uri)
···
194
did := puri.Authority().String()
195
collection := puri.Collection().String()
196
rkey := puri.RecordKey().String()
197
-
s.backend.AddRelevantDid(did)
198
199
-
repo, err := s.backend.GetOrCreateRepo(ctx, did)
200
if err != nil {
201
return err
202
}
203
204
-
resp, err := s.dir.LookupDID(ctx, syntax.DID(did))
205
if err != nil {
206
return err
207
}
···
230
return err
231
}
232
233
-
return s.backend.HandleCreateFeedGenerator(ctx, repo, rkey, buf.Bytes(), cc)
234
}
···
1
+
package backend
2
3
import (
4
"bytes"
···
19
MissingRecordTypeProfile MissingRecordType = "profile"
20
MissingRecordTypePost MissingRecordType = "post"
21
MissingRecordTypeFeedGenerator MissingRecordType = "feedgenerator"
22
+
MissingRecordTypeUnknown MissingRecordType = "unknown"
23
)
24
25
type MissingRecord struct {
···
30
waitch chan struct{}
31
}
32
33
+
func (b *PostgresBackend) addMissingRecord(ctx context.Context, rec MissingRecord) {
34
if rec.Wait {
35
rec.waitch = make(chan struct{})
36
}
37
38
select {
39
+
case b.missingRecords <- rec:
40
case <-ctx.Done():
41
}
42
···
48
}
49
}
50
51
+
func (b *PostgresBackend) missingRecordFetcher() {
52
+
for rec := range b.missingRecords {
53
var err error
54
switch rec.Type {
55
case MissingRecordTypeProfile:
56
+
err = b.fetchMissingProfile(context.TODO(), rec.Identifier)
57
case MissingRecordTypePost:
58
+
err = b.fetchMissingPost(context.TODO(), rec.Identifier)
59
case MissingRecordTypeFeedGenerator:
60
+
err = b.fetchMissingFeedGenerator(context.TODO(), rec.Identifier)
61
default:
62
slog.Error("unknown missing record type", "type", rec.Type)
63
continue
···
73
}
74
}
75
76
+
func (b *PostgresBackend) fetchMissingProfile(ctx context.Context, did string) error {
77
+
b.AddRelevantDid(did)
78
79
+
repo, err := b.GetOrCreateRepo(ctx, did)
80
if err != nil {
81
return err
82
}
83
84
+
resp, err := b.dir.LookupDID(ctx, syntax.DID(did))
85
if err != nil {
86
return err
87
}
···
110
return err
111
}
112
113
+
return b.HandleUpdateProfile(ctx, repo, "self", "", buf.Bytes(), cc)
114
}
115
116
+
func (b *PostgresBackend) fetchMissingPost(ctx context.Context, uri string) error {
117
puri, err := syntax.ParseATURI(uri)
118
if err != nil {
119
return fmt.Errorf("invalid AT URI: %s", uri)
···
123
collection := puri.Collection().String()
124
rkey := puri.RecordKey().String()
125
126
+
b.AddRelevantDid(did)
127
128
+
repo, err := b.GetOrCreateRepo(ctx, did)
129
if err != nil {
130
return err
131
}
132
133
+
resp, err := b.dir.LookupDID(ctx, syntax.DID(did))
134
if err != nil {
135
return err
136
}
···
159
return err
160
}
161
162
+
return b.HandleCreatePost(ctx, repo, rkey, buf.Bytes(), cc)
163
}
164
165
+
func (b *PostgresBackend) fetchMissingFeedGenerator(ctx context.Context, uri string) error {
166
puri, err := syntax.ParseATURI(uri)
167
if err != nil {
168
return fmt.Errorf("invalid AT URI: %s", uri)
···
171
did := puri.Authority().String()
172
collection := puri.Collection().String()
173
rkey := puri.RecordKey().String()
174
+
b.AddRelevantDid(did)
175
176
+
repo, err := b.GetOrCreateRepo(ctx, did)
177
if err != nil {
178
return err
179
}
180
181
+
resp, err := b.dir.LookupDID(ctx, syntax.DID(did))
182
if err != nil {
183
return err
184
}
···
207
return err
208
}
209
210
+
return b.HandleCreateFeedGenerator(ctx, repo, rkey, buf.Bytes(), cc)
211
}
+12
xrpc/notification/listNotifications.go
+12
xrpc/notification/listNotifications.go
···
131
cursorPtr = &cursor
132
}
133
134
+
var lastSeen time.Time
135
+
if err := db.Raw("SELECT seen_at FROM notification_seens WHERE repo = (select id from repos where did = ?)", viewer).Scan(&lastSeen).Error; err != nil {
136
+
return err
137
+
}
138
+
139
+
var lastSeenStr *string
140
+
if !lastSeen.IsZero() {
141
+
s := lastSeen.Format(time.RFC3339)
142
+
lastSeenStr = &s
143
+
}
144
+
145
output := &bsky.NotificationListNotifications_Output{
146
Notifications: notifications,
147
Cursor: cursorPtr,
148
+
SeenAt: lastSeenStr,
149
}
150
151
return c.JSON(http.StatusOK, output)