+38
README.md
+38
README.md
···
201
201
202
202
It will take a minute but it should pull all records from that user.
203
203
204
+
## Upstream Firehose Configuration
205
+
206
+
Konbini supports both standard firehose endpoints as well as jetstream. If
207
+
bandwidth and CPU usage is a concern, and you trust the jetstream endpoint,
208
+
then it may be worth trying that out.
209
+
210
+
The configuration file is formatted as follows:
211
+
212
+
```json
213
+
{
214
+
"backends": [
215
+
{
216
+
"type": "jetstream",
217
+
"host": "jetstream1.us-west.bsky.network"
218
+
}
219
+
]
220
+
}
221
+
```
222
+
223
+
The default (implicit) configuration file looks like this:
224
+
225
+
```json
226
+
{
227
+
"backends": [
228
+
{
229
+
"type": "firehose",
230
+
"host": "bsky.network"
231
+
}
232
+
]
233
+
}
234
+
```
235
+
236
+
Note that this is an array of backends, you can specify multiple upstreams, and
237
+
konbini will read from all of them. The main intended purpose of this is to be
238
+
able to subscribe directly to PDSs. PDSs currently only support the full
239
+
firehose endpoint, not jetstream, so be sure to specify a type of "firehose"
240
+
for individual PDS endpoints.
241
+
204
242
## License
205
243
206
244
MIT (whyrusleeping)
+55
-7
backend/backend.go
+55
-7
backend/backend.go
···
10
10
11
11
"github.com/bluesky-social/indigo/api/atproto"
12
12
"github.com/bluesky-social/indigo/api/bsky"
13
+
"github.com/bluesky-social/indigo/atproto/identity"
13
14
"github.com/bluesky-social/indigo/atproto/syntax"
14
15
"github.com/bluesky-social/indigo/util"
15
16
"github.com/bluesky-social/indigo/xrpc"
···
26
27
27
28
// PostgresBackend handles database operations
28
29
type PostgresBackend struct {
29
-
db *gorm.DB
30
-
pgx *pgxpool.Pool
31
-
tracker RecordTracker
30
+
db *gorm.DB
31
+
pgx *pgxpool.Pool
32
+
33
+
dir identity.Directory
32
34
33
35
client *xrpc.Client
34
36
···
43
45
repoCache *lru.TwoQueueCache[string, *Repo]
44
46
reposLk sync.Mutex
45
47
48
+
didByIDCache *lru.TwoQueueCache[uint, string]
49
+
46
50
postInfoCache *lru.TwoQueueCache[string, cachedPostInfo]
51
+
52
+
missingRecords chan MissingRecord
47
53
}
48
54
49
55
type cachedPostInfo struct {
···
52
58
}
53
59
54
60
// NewPostgresBackend creates a new PostgresBackend
55
-
func NewPostgresBackend(mydid string, db *gorm.DB, pgx *pgxpool.Pool, client *xrpc.Client, tracker RecordTracker) (*PostgresBackend, error) {
61
+
func NewPostgresBackend(mydid string, db *gorm.DB, pgx *pgxpool.Pool, client *xrpc.Client, dir identity.Directory) (*PostgresBackend, error) {
56
62
rc, _ := lru.New2Q[string, *Repo](1_000_000)
57
63
pc, _ := lru.New2Q[string, cachedPostInfo](1_000_000)
58
64
revc, _ := lru.New2Q[uint, string](1_000_000)
65
+
dbic, _ := lru.New2Q[uint, string](1_000_000)
59
66
60
67
b := &PostgresBackend{
61
68
client: client,
62
69
mydid: mydid,
63
70
db: db,
64
71
pgx: pgx,
65
-
tracker: tracker,
66
72
relevantDids: make(map[string]bool),
67
73
repoCache: rc,
68
74
postInfoCache: pc,
69
75
revCache: revc,
76
+
didByIDCache: dbic,
77
+
dir: dir,
78
+
79
+
missingRecords: make(chan MissingRecord, 1000),
70
80
}
71
81
72
82
r, err := b.GetOrCreateRepo(context.TODO(), mydid)
···
75
85
}
76
86
77
87
b.myrepo = r
88
+
89
+
go b.missingRecordFetcher()
78
90
return b, nil
79
91
}
80
92
81
93
// TrackMissingRecord implements the RecordTracker interface
82
94
func (b *PostgresBackend) TrackMissingRecord(identifier string, wait bool) {
83
-
if b.tracker != nil {
84
-
b.tracker.TrackMissingRecord(identifier, wait)
95
+
mr := MissingRecord{
96
+
Type: mrTypeFromIdent(identifier),
97
+
Identifier: identifier,
98
+
Wait: wait,
99
+
}
100
+
101
+
b.addMissingRecord(context.TODO(), mr)
102
+
}
103
+
104
+
func mrTypeFromIdent(ident string) MissingRecordType {
105
+
if strings.HasPrefix(ident, "did:") {
106
+
return MissingRecordTypeProfile
107
+
}
108
+
109
+
puri, _ := syntax.ParseATURI(ident)
110
+
switch puri.Collection().String() {
111
+
case "app.bsky.feed.post":
112
+
return MissingRecordTypePost
113
+
case "app.bsky.feed.generator":
114
+
return MissingRecordTypeFeedGenerator
115
+
default:
116
+
return MissingRecordTypeUnknown
85
117
}
118
+
86
119
}
87
120
88
121
// DidToID converts a DID to a database ID
···
363
396
}
364
397
365
398
return &r, nil
399
+
}
400
+
401
+
func (b *PostgresBackend) DidFromID(ctx context.Context, uid uint) (string, error) {
402
+
val, ok := b.didByIDCache.Get(uid)
403
+
if ok {
404
+
return val, nil
405
+
}
406
+
407
+
r, err := b.GetRepoByID(ctx, uid)
408
+
if err != nil {
409
+
return "", err
410
+
}
411
+
412
+
b.didByIDCache.Add(uid, r.Did)
413
+
return r.Did, nil
366
414
}
367
415
368
416
func (b *PostgresBackend) checkPostExists(ctx context.Context, repo *Repo, rkey string) (bool, error) {
+1208
backend/events.go
+1208
backend/events.go
···
1
+
package backend
2
+
3
+
import (
4
+
"bytes"
5
+
"context"
6
+
"encoding/json"
7
+
"fmt"
8
+
"log/slog"
9
+
"strings"
10
+
"time"
11
+
12
+
"github.com/bluesky-social/indigo/api/atproto"
13
+
"github.com/bluesky-social/indigo/api/bsky"
14
+
"github.com/bluesky-social/indigo/atproto/syntax"
15
+
lexutil "github.com/bluesky-social/indigo/lex/util"
16
+
"github.com/bluesky-social/indigo/repo"
17
+
jsmodels "github.com/bluesky-social/jetstream/pkg/models"
18
+
"github.com/ipfs/go-cid"
19
+
"github.com/jackc/pgx/v5/pgconn"
20
+
"github.com/prometheus/client_golang/prometheus"
21
+
"github.com/prometheus/client_golang/prometheus/promauto"
22
+
23
+
. "github.com/whyrusleeping/konbini/models"
24
+
)
25
+
26
+
var handleOpHist = promauto.NewHistogramVec(prometheus.HistogramOpts{
27
+
Name: "handle_op_duration",
28
+
Help: "A histogram of op handling durations",
29
+
Buckets: prometheus.ExponentialBuckets(1, 2, 15),
30
+
}, []string{"op", "collection"})
31
+
32
+
func (b *PostgresBackend) HandleEvent(ctx context.Context, evt *atproto.SyncSubscribeRepos_Commit) error {
33
+
r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks))
34
+
if err != nil {
35
+
return fmt.Errorf("failed to read event repo: %w", err)
36
+
}
37
+
38
+
for _, op := range evt.Ops {
39
+
switch op.Action {
40
+
case "create":
41
+
c, rec, err := r.GetRecordBytes(ctx, op.Path)
42
+
if err != nil {
43
+
return err
44
+
}
45
+
if err := b.HandleCreate(ctx, evt.Repo, evt.Rev, op.Path, rec, &c); err != nil {
46
+
return fmt.Errorf("create record failed: %w", err)
47
+
}
48
+
case "update":
49
+
c, rec, err := r.GetRecordBytes(ctx, op.Path)
50
+
if err != nil {
51
+
return err
52
+
}
53
+
if err := b.HandleUpdate(ctx, evt.Repo, evt.Rev, op.Path, rec, &c); err != nil {
54
+
return fmt.Errorf("update record failed: %w", err)
55
+
}
56
+
case "delete":
57
+
if err := b.HandleDelete(ctx, evt.Repo, evt.Rev, op.Path); err != nil {
58
+
return fmt.Errorf("delete record failed: %w", err)
59
+
}
60
+
}
61
+
}
62
+
63
+
// TODO: sync with the Since field to make sure we don't miss events we care about
64
+
/*
65
+
if err := bf.Store.UpdateRev(ctx, evt.Repo, evt.Rev); err != nil {
66
+
return fmt.Errorf("failed to update rev: %w", err)
67
+
}
68
+
*/
69
+
70
+
return nil
71
+
}
72
+
73
+
func cborBytesFromEvent(evt *jsmodels.Event) ([]byte, error) {
74
+
val, err := lexutil.NewFromType(evt.Commit.Collection)
75
+
if err != nil {
76
+
return nil, fmt.Errorf("failed to load event record type: %w", err)
77
+
}
78
+
79
+
if err := json.Unmarshal(evt.Commit.Record, val); err != nil {
80
+
return nil, err
81
+
}
82
+
83
+
cval, ok := val.(lexutil.CBOR)
84
+
if !ok {
85
+
return nil, fmt.Errorf("decoded type was not cbor marshalable")
86
+
}
87
+
88
+
buf := new(bytes.Buffer)
89
+
if err := cval.MarshalCBOR(buf); err != nil {
90
+
return nil, fmt.Errorf("failed to marshal event to cbor: %w", err)
91
+
}
92
+
93
+
rec := buf.Bytes()
94
+
return rec, nil
95
+
}
96
+
97
+
func (b *PostgresBackend) HandleEventJetstream(ctx context.Context, evt *jsmodels.Event) error {
98
+
99
+
path := evt.Commit.Collection + "/" + evt.Commit.RKey
100
+
switch evt.Commit.Operation {
101
+
case jsmodels.CommitOperationCreate:
102
+
rec, err := cborBytesFromEvent(evt)
103
+
if err != nil {
104
+
return err
105
+
}
106
+
107
+
c, err := cid.Decode(evt.Commit.CID)
108
+
if err != nil {
109
+
return err
110
+
}
111
+
112
+
if err := b.HandleCreate(ctx, evt.Did, evt.Commit.Rev, path, &rec, &c); err != nil {
113
+
return fmt.Errorf("create record failed: %w", err)
114
+
}
115
+
case jsmodels.CommitOperationUpdate:
116
+
rec, err := cborBytesFromEvent(evt)
117
+
if err != nil {
118
+
return err
119
+
}
120
+
121
+
c, err := cid.Decode(evt.Commit.CID)
122
+
if err != nil {
123
+
return err
124
+
}
125
+
126
+
if err := b.HandleUpdate(ctx, evt.Did, evt.Commit.Rev, path, &rec, &c); err != nil {
127
+
return fmt.Errorf("update record failed: %w", err)
128
+
}
129
+
case jsmodels.CommitOperationDelete:
130
+
if err := b.HandleDelete(ctx, evt.Did, evt.Commit.Rev, path); err != nil {
131
+
return fmt.Errorf("delete record failed: %w", err)
132
+
}
133
+
}
134
+
135
+
return nil
136
+
}
137
+
138
+
func (b *PostgresBackend) HandleCreate(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid) error {
139
+
start := time.Now()
140
+
141
+
rr, err := b.GetOrCreateRepo(ctx, repo)
142
+
if err != nil {
143
+
return fmt.Errorf("get user failed: %w", err)
144
+
}
145
+
146
+
lrev, err := b.revForRepo(rr)
147
+
if err != nil {
148
+
return err
149
+
}
150
+
if lrev != "" {
151
+
if rev < lrev {
152
+
slog.Info("skipping old rev create", "did", rr.Did, "rev", rev, "oldrev", lrev, "path", path)
153
+
return nil
154
+
}
155
+
}
156
+
157
+
parts := strings.Split(path, "/")
158
+
if len(parts) != 2 {
159
+
return fmt.Errorf("invalid path in HandleCreate: %q", path)
160
+
}
161
+
col := parts[0]
162
+
rkey := parts[1]
163
+
164
+
defer func() {
165
+
handleOpHist.WithLabelValues("create", col).Observe(float64(time.Since(start).Milliseconds()))
166
+
}()
167
+
168
+
if rkey == "" {
169
+
fmt.Printf("messed up path: %q\n", rkey)
170
+
}
171
+
172
+
switch col {
173
+
case "app.bsky.feed.post":
174
+
if err := b.HandleCreatePost(ctx, rr, rkey, *rec, *cid); err != nil {
175
+
return err
176
+
}
177
+
case "app.bsky.feed.like":
178
+
if err := b.HandleCreateLike(ctx, rr, rkey, *rec, *cid); err != nil {
179
+
return err
180
+
}
181
+
case "app.bsky.feed.repost":
182
+
if err := b.HandleCreateRepost(ctx, rr, rkey, *rec, *cid); err != nil {
183
+
return err
184
+
}
185
+
case "app.bsky.graph.follow":
186
+
if err := b.HandleCreateFollow(ctx, rr, rkey, *rec, *cid); err != nil {
187
+
return err
188
+
}
189
+
case "app.bsky.graph.block":
190
+
if err := b.HandleCreateBlock(ctx, rr, rkey, *rec, *cid); err != nil {
191
+
return err
192
+
}
193
+
case "app.bsky.graph.list":
194
+
if err := b.HandleCreateList(ctx, rr, rkey, *rec, *cid); err != nil {
195
+
return err
196
+
}
197
+
case "app.bsky.graph.listitem":
198
+
if err := b.HandleCreateListitem(ctx, rr, rkey, *rec, *cid); err != nil {
199
+
return err
200
+
}
201
+
case "app.bsky.graph.listblock":
202
+
if err := b.HandleCreateListblock(ctx, rr, rkey, *rec, *cid); err != nil {
203
+
return err
204
+
}
205
+
case "app.bsky.actor.profile":
206
+
if err := b.HandleCreateProfile(ctx, rr, rkey, rev, *rec, *cid); err != nil {
207
+
return err
208
+
}
209
+
case "app.bsky.feed.generator":
210
+
if err := b.HandleCreateFeedGenerator(ctx, rr, rkey, *rec, *cid); err != nil {
211
+
return err
212
+
}
213
+
case "app.bsky.feed.threadgate":
214
+
if err := b.HandleCreateThreadgate(ctx, rr, rkey, *rec, *cid); err != nil {
215
+
return err
216
+
}
217
+
case "chat.bsky.actor.declaration":
218
+
if err := b.HandleCreateChatDeclaration(ctx, rr, rkey, *rec, *cid); err != nil {
219
+
return err
220
+
}
221
+
case "app.bsky.feed.postgate":
222
+
if err := b.HandleCreatePostGate(ctx, rr, rkey, *rec, *cid); err != nil {
223
+
return err
224
+
}
225
+
case "app.bsky.graph.starterpack":
226
+
if err := b.HandleCreateStarterPack(ctx, rr, rkey, *rec, *cid); err != nil {
227
+
return err
228
+
}
229
+
default:
230
+
slog.Debug("unrecognized record type", "repo", repo, "path", path, "rev", rev)
231
+
}
232
+
233
+
b.revCache.Add(rr.ID, rev)
234
+
return nil
235
+
}
236
+
237
+
func (b *PostgresBackend) HandleCreatePost(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
238
+
exists, err := b.checkPostExists(ctx, repo, rkey)
239
+
if err != nil {
240
+
return err
241
+
}
242
+
243
+
// still technically a race condition if two creates for the same post happen concurrently... probably fine
244
+
if exists {
245
+
return nil
246
+
}
247
+
248
+
var rec bsky.FeedPost
249
+
if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
250
+
uri := "at://" + repo.Did + "/app.bsky.feed.post/" + rkey
251
+
slog.Warn("skipping post with malformed data", "uri", uri, "error", err)
252
+
return nil // Skip this post rather than failing the entire event
253
+
}
254
+
255
+
reldids := []string{repo.Did}
256
+
// care about a post if its in a thread of a user we are interested in
257
+
if rec.Reply != nil && rec.Reply.Parent != nil && rec.Reply.Root != nil {
258
+
reldids = append(reldids, rec.Reply.Parent.Uri, rec.Reply.Root.Uri)
259
+
}
260
+
// TODO: maybe also care if its mentioning a user we care about or quoting a user we care about?
261
+
if !b.anyRelevantIdents(reldids...) {
262
+
return nil
263
+
}
264
+
265
+
uri := "at://" + repo.Did + "/app.bsky.feed.post/" + rkey
266
+
slog.Warn("adding post", "uri", uri)
267
+
268
+
created, err := syntax.ParseDatetimeLenient(rec.CreatedAt)
269
+
if err != nil {
270
+
return fmt.Errorf("invalid timestamp: %w", err)
271
+
}
272
+
273
+
p := Post{
274
+
Created: created.Time(),
275
+
Indexed: time.Now(),
276
+
Author: repo.ID,
277
+
Rkey: rkey,
278
+
Raw: recb,
279
+
Cid: cc.String(),
280
+
}
281
+
282
+
if rec.Reply != nil && rec.Reply.Parent != nil {
283
+
if rec.Reply.Root == nil {
284
+
return fmt.Errorf("post reply had nil root")
285
+
}
286
+
287
+
pinfo, err := b.postInfoForUri(ctx, rec.Reply.Parent.Uri)
288
+
if err != nil {
289
+
return fmt.Errorf("getting reply parent: %w", err)
290
+
}
291
+
292
+
p.ReplyTo = pinfo.ID
293
+
p.ReplyToUsr = pinfo.Author
294
+
295
+
thread, err := b.postIDForUri(ctx, rec.Reply.Root.Uri)
296
+
if err != nil {
297
+
return fmt.Errorf("getting thread root: %w", err)
298
+
}
299
+
300
+
p.InThread = thread
301
+
302
+
r, err := b.GetOrCreateRepo(ctx, b.mydid)
303
+
if err != nil {
304
+
return err
305
+
}
306
+
307
+
if p.ReplyToUsr == r.ID {
308
+
if err := b.AddNotification(ctx, r.ID, p.Author, uri, cc, NotifKindReply); err != nil {
309
+
slog.Warn("failed to create notification", "uri", uri, "error", err)
310
+
}
311
+
}
312
+
}
313
+
314
+
if rec.Embed != nil {
315
+
var rpref string
316
+
if rec.Embed.EmbedRecord != nil && rec.Embed.EmbedRecord.Record != nil {
317
+
rpref = rec.Embed.EmbedRecord.Record.Uri
318
+
}
319
+
if rec.Embed.EmbedRecordWithMedia != nil &&
320
+
rec.Embed.EmbedRecordWithMedia.Record != nil &&
321
+
rec.Embed.EmbedRecordWithMedia.Record.Record != nil {
322
+
rpref = rec.Embed.EmbedRecordWithMedia.Record.Record.Uri
323
+
}
324
+
325
+
if rpref != "" && strings.Contains(rpref, "app.bsky.feed.post") {
326
+
rp, err := b.postIDForUri(ctx, rpref)
327
+
if err != nil {
328
+
return fmt.Errorf("getting quote subject: %w", err)
329
+
}
330
+
331
+
p.Reposting = rp
332
+
}
333
+
}
334
+
335
+
if err := b.doPostCreate(ctx, &p); err != nil {
336
+
return err
337
+
}
338
+
339
+
// Check for mentions and create notifications
340
+
if rec.Facets != nil {
341
+
for _, facet := range rec.Facets {
342
+
for _, feature := range facet.Features {
343
+
if feature.RichtextFacet_Mention != nil {
344
+
mentionDid := feature.RichtextFacet_Mention.Did
345
+
// This is a mention
346
+
mentionedRepo, err := b.GetOrCreateRepo(ctx, mentionDid)
347
+
if err != nil {
348
+
slog.Warn("failed to get repo for mention", "did", mentionDid, "error", err)
349
+
continue
350
+
}
351
+
352
+
// Create notification if the mentioned user is the current user
353
+
if mentionedRepo.ID == b.myrepo.ID {
354
+
if err := b.AddNotification(ctx, b.myrepo.ID, p.Author, uri, cc, NotifKindMention); err != nil {
355
+
slog.Warn("failed to create mention notification", "uri", uri, "error", err)
356
+
}
357
+
}
358
+
}
359
+
}
360
+
}
361
+
}
362
+
363
+
b.postInfoCache.Add(uri, cachedPostInfo{
364
+
ID: p.ID,
365
+
Author: p.Author,
366
+
})
367
+
368
+
return nil
369
+
}
370
+
371
+
func (b *PostgresBackend) doPostCreate(ctx context.Context, p *Post) error {
372
+
/*
373
+
if err := b.db.Clauses(clause.OnConflict{
374
+
Columns: []clause.Column{{Name: "author"}, {Name: "rkey"}},
375
+
DoUpdates: clause.AssignmentColumns([]string{"cid", "not_found", "raw", "created", "indexed"}),
376
+
}).Create(p).Error; err != nil {
377
+
return err
378
+
}
379
+
*/
380
+
381
+
query := `
382
+
INSERT INTO posts (author, rkey, cid, not_found, raw, created, indexed, reposting, reply_to, reply_to_usr, in_thread)
383
+
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
384
+
ON CONFLICT (author, rkey)
385
+
DO UPDATE SET
386
+
cid = $3,
387
+
not_found = $4,
388
+
raw = $5,
389
+
created = $6,
390
+
indexed = $7,
391
+
reposting = $8,
392
+
reply_to = $9,
393
+
reply_to_usr = $10,
394
+
in_thread = $11
395
+
RETURNING id
396
+
`
397
+
398
+
// Execute the query with parameters from the Post struct
399
+
if err := b.pgx.QueryRow(
400
+
ctx,
401
+
query,
402
+
p.Author,
403
+
p.Rkey,
404
+
p.Cid,
405
+
p.NotFound,
406
+
p.Raw,
407
+
p.Created,
408
+
p.Indexed,
409
+
p.Reposting,
410
+
p.ReplyTo,
411
+
p.ReplyToUsr,
412
+
p.InThread,
413
+
).Scan(&p.ID); err != nil {
414
+
return err
415
+
}
416
+
417
+
return nil
418
+
}
419
+
420
+
func (b *PostgresBackend) HandleCreateLike(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
421
+
var rec bsky.FeedLike
422
+
if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
423
+
return err
424
+
}
425
+
426
+
if !b.anyRelevantIdents(repo.Did, rec.Subject.Uri) {
427
+
return nil
428
+
}
429
+
430
+
created, err := syntax.ParseDatetimeLenient(rec.CreatedAt)
431
+
if err != nil {
432
+
return fmt.Errorf("invalid timestamp: %w", err)
433
+
}
434
+
435
+
pinfo, err := b.postInfoForUri(ctx, rec.Subject.Uri)
436
+
if err != nil {
437
+
return fmt.Errorf("getting like subject: %w", err)
438
+
}
439
+
440
+
if _, err := b.pgx.Exec(ctx, `INSERT INTO "likes" ("created","indexed","author","rkey","subject","cid") VALUES ($1, $2, $3, $4, $5, $6)`, created.Time(), time.Now(), repo.ID, rkey, pinfo.ID, cc.String()); err != nil {
441
+
pgErr, ok := err.(*pgconn.PgError)
442
+
if ok && pgErr.Code == "23505" {
443
+
return nil
444
+
}
445
+
return err
446
+
}
447
+
448
+
// Create notification if the liked post belongs to the current user
449
+
if pinfo.Author == b.myrepo.ID {
450
+
uri := fmt.Sprintf("at://%s/app.bsky.feed.like/%s", repo.Did, rkey)
451
+
if err := b.AddNotification(ctx, b.myrepo.ID, repo.ID, uri, cc, NotifKindLike); err != nil {
452
+
slog.Warn("failed to create like notification", "uri", uri, "error", err)
453
+
}
454
+
}
455
+
456
+
return nil
457
+
}
458
+
459
+
func (b *PostgresBackend) HandleCreateRepost(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
460
+
var rec bsky.FeedRepost
461
+
if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
462
+
return err
463
+
}
464
+
465
+
if !b.anyRelevantIdents(repo.Did, rec.Subject.Uri) {
466
+
return nil
467
+
}
468
+
469
+
created, err := syntax.ParseDatetimeLenient(rec.CreatedAt)
470
+
if err != nil {
471
+
return fmt.Errorf("invalid timestamp: %w", err)
472
+
}
473
+
474
+
pinfo, err := b.postInfoForUri(ctx, rec.Subject.Uri)
475
+
if err != nil {
476
+
return fmt.Errorf("getting repost subject: %w", err)
477
+
}
478
+
479
+
if _, err := b.pgx.Exec(ctx, `INSERT INTO "reposts" ("created","indexed","author","rkey","subject") VALUES ($1, $2, $3, $4, $5)`, created.Time(), time.Now(), repo.ID, rkey, pinfo.ID); err != nil {
480
+
pgErr, ok := err.(*pgconn.PgError)
481
+
if ok && pgErr.Code == "23505" {
482
+
return nil
483
+
}
484
+
return err
485
+
}
486
+
487
+
// Create notification if the reposted post belongs to the current user
488
+
if pinfo.Author == b.myrepo.ID {
489
+
uri := fmt.Sprintf("at://%s/app.bsky.feed.repost/%s", repo.Did, rkey)
490
+
if err := b.AddNotification(ctx, b.myrepo.ID, repo.ID, uri, cc, NotifKindRepost); err != nil {
491
+
slog.Warn("failed to create repost notification", "uri", uri, "error", err)
492
+
}
493
+
}
494
+
495
+
return nil
496
+
}
497
+
498
+
func (b *PostgresBackend) HandleCreateFollow(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
499
+
var rec bsky.GraphFollow
500
+
if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
501
+
return err
502
+
}
503
+
504
+
if !b.anyRelevantIdents(repo.Did, rec.Subject) {
505
+
return nil
506
+
}
507
+
508
+
created, err := syntax.ParseDatetimeLenient(rec.CreatedAt)
509
+
if err != nil {
510
+
return fmt.Errorf("invalid timestamp: %w", err)
511
+
}
512
+
513
+
subj, err := b.GetOrCreateRepo(ctx, rec.Subject)
514
+
if err != nil {
515
+
return err
516
+
}
517
+
518
+
if _, err := b.pgx.Exec(ctx, "INSERT INTO follows (created, indexed, author, rkey, subject) VALUES ($1, $2, $3, $4, $5) ON CONFLICT DO NOTHING", created.Time(), time.Now(), repo.ID, rkey, subj.ID); err != nil {
519
+
return err
520
+
}
521
+
522
+
return nil
523
+
}
524
+
525
+
func (b *PostgresBackend) HandleCreateBlock(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
526
+
var rec bsky.GraphBlock
527
+
if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
528
+
return err
529
+
}
530
+
531
+
if !b.anyRelevantIdents(repo.Did, rec.Subject) {
532
+
return nil
533
+
}
534
+
535
+
created, err := syntax.ParseDatetimeLenient(rec.CreatedAt)
536
+
if err != nil {
537
+
return fmt.Errorf("invalid timestamp: %w", err)
538
+
}
539
+
540
+
subj, err := b.GetOrCreateRepo(ctx, rec.Subject)
541
+
if err != nil {
542
+
return err
543
+
}
544
+
545
+
if err := b.db.Create(&Block{
546
+
Created: created.Time(),
547
+
Indexed: time.Now(),
548
+
Author: repo.ID,
549
+
Rkey: rkey,
550
+
Subject: subj.ID,
551
+
}).Error; err != nil {
552
+
return err
553
+
}
554
+
555
+
return nil
556
+
}
557
+
558
+
func (b *PostgresBackend) HandleCreateList(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
559
+
var rec bsky.GraphList
560
+
if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
561
+
return err
562
+
}
563
+
564
+
if !b.anyRelevantIdents(repo.Did) {
565
+
return nil
566
+
}
567
+
568
+
created, err := syntax.ParseDatetimeLenient(rec.CreatedAt)
569
+
if err != nil {
570
+
return fmt.Errorf("invalid timestamp: %w", err)
571
+
}
572
+
573
+
if err := b.db.Create(&List{
574
+
Created: created.Time(),
575
+
Indexed: time.Now(),
576
+
Author: repo.ID,
577
+
Rkey: rkey,
578
+
Raw: recb,
579
+
}).Error; err != nil {
580
+
return err
581
+
}
582
+
583
+
return nil
584
+
}
585
+
586
+
func (b *PostgresBackend) HandleCreateListitem(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
587
+
var rec bsky.GraphListitem
588
+
if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
589
+
return err
590
+
}
591
+
if !b.anyRelevantIdents(repo.Did) {
592
+
return nil
593
+
}
594
+
595
+
created, err := syntax.ParseDatetimeLenient(rec.CreatedAt)
596
+
if err != nil {
597
+
return fmt.Errorf("invalid timestamp: %w", err)
598
+
}
599
+
600
+
subj, err := b.GetOrCreateRepo(ctx, rec.Subject)
601
+
if err != nil {
602
+
return err
603
+
}
604
+
605
+
list, err := b.GetOrCreateList(ctx, rec.List)
606
+
if err != nil {
607
+
return err
608
+
}
609
+
610
+
if err := b.db.Create(&ListItem{
611
+
Created: created.Time(),
612
+
Indexed: time.Now(),
613
+
Author: repo.ID,
614
+
Rkey: rkey,
615
+
Subject: subj.ID,
616
+
List: list.ID,
617
+
}).Error; err != nil {
618
+
return err
619
+
}
620
+
621
+
return nil
622
+
}
623
+
624
+
func (b *PostgresBackend) HandleCreateListblock(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
625
+
var rec bsky.GraphListblock
626
+
if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
627
+
return err
628
+
}
629
+
630
+
if !b.anyRelevantIdents(repo.Did, rec.Subject) {
631
+
return nil
632
+
}
633
+
634
+
created, err := syntax.ParseDatetimeLenient(rec.CreatedAt)
635
+
if err != nil {
636
+
return fmt.Errorf("invalid timestamp: %w", err)
637
+
}
638
+
639
+
list, err := b.GetOrCreateList(ctx, rec.Subject)
640
+
if err != nil {
641
+
return err
642
+
}
643
+
644
+
if err := b.db.Create(&ListBlock{
645
+
Created: created.Time(),
646
+
Indexed: time.Now(),
647
+
Author: repo.ID,
648
+
Rkey: rkey,
649
+
List: list.ID,
650
+
}).Error; err != nil {
651
+
return err
652
+
}
653
+
654
+
return nil
655
+
}
656
+
657
+
func (b *PostgresBackend) HandleCreateProfile(ctx context.Context, repo *Repo, rkey, rev string, recb []byte, cc cid.Cid) error {
658
+
if !b.anyRelevantIdents(repo.Did) {
659
+
return nil
660
+
}
661
+
662
+
if err := b.db.Create(&Profile{
663
+
//Created: created.Time(),
664
+
Indexed: time.Now(),
665
+
Repo: repo.ID,
666
+
Raw: recb,
667
+
Rev: rev,
668
+
}).Error; err != nil {
669
+
return err
670
+
}
671
+
672
+
return nil
673
+
}
674
+
675
+
func (b *PostgresBackend) HandleUpdateProfile(ctx context.Context, repo *Repo, rkey, rev string, recb []byte, cc cid.Cid) error {
676
+
if !b.anyRelevantIdents(repo.Did) {
677
+
return nil
678
+
}
679
+
680
+
if err := b.db.Create(&Profile{
681
+
Indexed: time.Now(),
682
+
Repo: repo.ID,
683
+
Raw: recb,
684
+
Rev: rev,
685
+
}).Error; err != nil {
686
+
return err
687
+
}
688
+
689
+
return nil
690
+
}
691
+
692
+
func (b *PostgresBackend) HandleCreateFeedGenerator(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
693
+
if !b.anyRelevantIdents(repo.Did) {
694
+
return nil
695
+
}
696
+
697
+
var rec bsky.FeedGenerator
698
+
if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
699
+
return err
700
+
}
701
+
702
+
created, err := syntax.ParseDatetimeLenient(rec.CreatedAt)
703
+
if err != nil {
704
+
return fmt.Errorf("invalid timestamp: %w", err)
705
+
}
706
+
707
+
if err := b.db.Create(&FeedGenerator{
708
+
Created: created.Time(),
709
+
Indexed: time.Now(),
710
+
Author: repo.ID,
711
+
Rkey: rkey,
712
+
Did: rec.Did,
713
+
Raw: recb,
714
+
}).Error; err != nil {
715
+
return err
716
+
}
717
+
718
+
return nil
719
+
}
720
+
721
+
func (b *PostgresBackend) HandleCreateThreadgate(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
722
+
if !b.anyRelevantIdents(repo.Did) {
723
+
return nil
724
+
}
725
+
var rec bsky.FeedThreadgate
726
+
if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
727
+
return err
728
+
}
729
+
730
+
created, err := syntax.ParseDatetimeLenient(rec.CreatedAt)
731
+
if err != nil {
732
+
return fmt.Errorf("invalid timestamp: %w", err)
733
+
}
734
+
735
+
pid, err := b.postIDForUri(ctx, rec.Post)
736
+
if err != nil {
737
+
return err
738
+
}
739
+
740
+
if err := b.db.Create(&ThreadGate{
741
+
Created: created.Time(),
742
+
Indexed: time.Now(),
743
+
Author: repo.ID,
744
+
Rkey: rkey,
745
+
Post: pid,
746
+
}).Error; err != nil {
747
+
return err
748
+
}
749
+
750
+
return nil
751
+
}
752
+
753
+
func (b *PostgresBackend) HandleCreateChatDeclaration(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
754
+
// TODO: maybe track these?
755
+
return nil
756
+
}
757
+
758
+
func (b *PostgresBackend) HandleCreatePostGate(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
759
+
if !b.anyRelevantIdents(repo.Did) {
760
+
return nil
761
+
}
762
+
var rec bsky.FeedPostgate
763
+
if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
764
+
return err
765
+
}
766
+
created, err := syntax.ParseDatetimeLenient(rec.CreatedAt)
767
+
if err != nil {
768
+
return fmt.Errorf("invalid timestamp: %w", err)
769
+
}
770
+
771
+
refPost, err := b.postInfoForUri(ctx, rec.Post)
772
+
if err != nil {
773
+
return err
774
+
}
775
+
776
+
if err := b.db.Create(&PostGate{
777
+
Created: created.Time(),
778
+
Indexed: time.Now(),
779
+
Author: repo.ID,
780
+
Rkey: rkey,
781
+
Subject: refPost.ID,
782
+
Raw: recb,
783
+
}).Error; err != nil {
784
+
return err
785
+
}
786
+
787
+
return nil
788
+
}
789
+
790
+
func (b *PostgresBackend) HandleCreateStarterPack(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
791
+
if !b.anyRelevantIdents(repo.Did) {
792
+
return nil
793
+
}
794
+
var rec bsky.GraphStarterpack
795
+
if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
796
+
return err
797
+
}
798
+
created, err := syntax.ParseDatetimeLenient(rec.CreatedAt)
799
+
if err != nil {
800
+
return fmt.Errorf("invalid timestamp: %w", err)
801
+
}
802
+
803
+
list, err := b.GetOrCreateList(ctx, rec.List)
804
+
if err != nil {
805
+
return err
806
+
}
807
+
808
+
if err := b.db.Create(&StarterPack{
809
+
Created: created.Time(),
810
+
Indexed: time.Now(),
811
+
Author: repo.ID,
812
+
Rkey: rkey,
813
+
Raw: recb,
814
+
List: list.ID,
815
+
}).Error; err != nil {
816
+
return err
817
+
}
818
+
819
+
return nil
820
+
}
821
+
822
+
func (b *PostgresBackend) HandleUpdate(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid) error {
823
+
start := time.Now()
824
+
825
+
rr, err := b.GetOrCreateRepo(ctx, repo)
826
+
if err != nil {
827
+
return fmt.Errorf("get user failed: %w", err)
828
+
}
829
+
830
+
lrev, err := b.revForRepo(rr)
831
+
if err != nil {
832
+
return err
833
+
}
834
+
if lrev != "" {
835
+
if rev < lrev {
836
+
//slog.Info("skipping old rev create", "did", rr.Did, "rev", rev, "oldrev", lrev, "path", path)
837
+
return nil
838
+
}
839
+
}
840
+
841
+
parts := strings.Split(path, "/")
842
+
if len(parts) != 2 {
843
+
return fmt.Errorf("invalid path in HandleCreate: %q", path)
844
+
}
845
+
col := parts[0]
846
+
rkey := parts[1]
847
+
848
+
defer func() {
849
+
handleOpHist.WithLabelValues("update", col).Observe(float64(time.Since(start).Milliseconds()))
850
+
}()
851
+
852
+
if rkey == "" {
853
+
fmt.Printf("messed up path: %q\n", rkey)
854
+
}
855
+
856
+
switch col {
857
+
/*
858
+
case "app.bsky.feed.post":
859
+
if err := s.HandleCreatePost(ctx, rr, rkey, *rec, *cid); err != nil {
860
+
return err
861
+
}
862
+
case "app.bsky.feed.like":
863
+
if err := s.HandleCreateLike(ctx, rr, rkey, *rec, *cid); err != nil {
864
+
return err
865
+
}
866
+
case "app.bsky.feed.repost":
867
+
if err := s.HandleCreateRepost(ctx, rr, rkey, *rec, *cid); err != nil {
868
+
return err
869
+
}
870
+
case "app.bsky.graph.follow":
871
+
if err := s.HandleCreateFollow(ctx, rr, rkey, *rec, *cid); err != nil {
872
+
return err
873
+
}
874
+
case "app.bsky.graph.block":
875
+
if err := s.HandleCreateBlock(ctx, rr, rkey, *rec, *cid); err != nil {
876
+
return err
877
+
}
878
+
case "app.bsky.graph.list":
879
+
if err := s.HandleCreateList(ctx, rr, rkey, *rec, *cid); err != nil {
880
+
return err
881
+
}
882
+
case "app.bsky.graph.listitem":
883
+
if err := s.HandleCreateListitem(ctx, rr, rkey, *rec, *cid); err != nil {
884
+
return err
885
+
}
886
+
case "app.bsky.graph.listblock":
887
+
if err := s.HandleCreateListblock(ctx, rr, rkey, *rec, *cid); err != nil {
888
+
return err
889
+
}
890
+
*/
891
+
case "app.bsky.actor.profile":
892
+
if err := b.HandleUpdateProfile(ctx, rr, rkey, rev, *rec, *cid); err != nil {
893
+
return err
894
+
}
895
+
/*
896
+
case "app.bsky.feed.generator":
897
+
if err := s.HandleCreateFeedGenerator(ctx, rr, rkey, *rec, *cid); err != nil {
898
+
return err
899
+
}
900
+
case "app.bsky.feed.threadgate":
901
+
if err := s.HandleCreateThreadgate(ctx, rr, rkey, *rec, *cid); err != nil {
902
+
return err
903
+
}
904
+
case "chat.bsky.actor.declaration":
905
+
if err := s.HandleCreateChatDeclaration(ctx, rr, rkey, *rec, *cid); err != nil {
906
+
return err
907
+
}
908
+
*/
909
+
default:
910
+
slog.Debug("unrecognized record type in update", "repo", repo, "path", path, "rev", rev)
911
+
}
912
+
913
+
return nil
914
+
}
915
+
916
+
func (b *PostgresBackend) HandleDelete(ctx context.Context, repo string, rev string, path string) error {
917
+
start := time.Now()
918
+
919
+
rr, err := b.GetOrCreateRepo(ctx, repo)
920
+
if err != nil {
921
+
return fmt.Errorf("get user failed: %w", err)
922
+
}
923
+
924
+
lrev, ok := b.revCache.Get(rr.ID)
925
+
if ok {
926
+
if rev < lrev {
927
+
//slog.Info("skipping old rev delete", "did", rr.Did, "rev", rev, "oldrev", lrev)
928
+
return nil
929
+
}
930
+
}
931
+
932
+
parts := strings.Split(path, "/")
933
+
if len(parts) != 2 {
934
+
return fmt.Errorf("invalid path in HandleDelete: %q", path)
935
+
}
936
+
col := parts[0]
937
+
rkey := parts[1]
938
+
939
+
defer func() {
940
+
handleOpHist.WithLabelValues("create", col).Observe(float64(time.Since(start).Milliseconds()))
941
+
}()
942
+
943
+
switch col {
944
+
case "app.bsky.feed.post":
945
+
if err := b.HandleDeletePost(ctx, rr, rkey); err != nil {
946
+
return err
947
+
}
948
+
case "app.bsky.feed.like":
949
+
if err := b.HandleDeleteLike(ctx, rr, rkey); err != nil {
950
+
return err
951
+
}
952
+
case "app.bsky.feed.repost":
953
+
if err := b.HandleDeleteRepost(ctx, rr, rkey); err != nil {
954
+
return err
955
+
}
956
+
case "app.bsky.graph.follow":
957
+
if err := b.HandleDeleteFollow(ctx, rr, rkey); err != nil {
958
+
return err
959
+
}
960
+
case "app.bsky.graph.block":
961
+
if err := b.HandleDeleteBlock(ctx, rr, rkey); err != nil {
962
+
return err
963
+
}
964
+
case "app.bsky.graph.list":
965
+
if err := b.HandleDeleteList(ctx, rr, rkey); err != nil {
966
+
return err
967
+
}
968
+
case "app.bsky.graph.listitem":
969
+
if err := b.HandleDeleteListitem(ctx, rr, rkey); err != nil {
970
+
return err
971
+
}
972
+
case "app.bsky.graph.listblock":
973
+
if err := b.HandleDeleteListblock(ctx, rr, rkey); err != nil {
974
+
return err
975
+
}
976
+
case "app.bsky.actor.profile":
977
+
if err := b.HandleDeleteProfile(ctx, rr, rkey); err != nil {
978
+
return err
979
+
}
980
+
case "app.bsky.feed.generator":
981
+
if err := b.HandleDeleteFeedGenerator(ctx, rr, rkey); err != nil {
982
+
return err
983
+
}
984
+
case "app.bsky.feed.threadgate":
985
+
if err := b.HandleDeleteThreadgate(ctx, rr, rkey); err != nil {
986
+
return err
987
+
}
988
+
default:
989
+
slog.Warn("delete unrecognized record type", "repo", repo, "path", path, "rev", rev)
990
+
}
991
+
992
+
b.revCache.Add(rr.ID, rev)
993
+
return nil
994
+
}
995
+
996
+
func (b *PostgresBackend) HandleDeletePost(ctx context.Context, repo *Repo, rkey string) error {
997
+
var p Post
998
+
if err := b.db.Find(&p, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil {
999
+
return err
1000
+
}
1001
+
1002
+
if p.ID == 0 {
1003
+
//slog.Warn("delete of unknown post record", "repo", repo.Did, "rkey", rkey)
1004
+
return nil
1005
+
}
1006
+
1007
+
if err := b.db.Delete(&Post{}, p.ID).Error; err != nil {
1008
+
return err
1009
+
}
1010
+
1011
+
return nil
1012
+
}
1013
+
1014
+
func (b *PostgresBackend) HandleDeleteLike(ctx context.Context, repo *Repo, rkey string) error {
1015
+
var like Like
1016
+
if err := b.db.Find(&like, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil {
1017
+
return err
1018
+
}
1019
+
1020
+
if like.ID == 0 {
1021
+
//slog.Warn("delete of missing like", "repo", repo.Did, "rkey", rkey)
1022
+
return nil
1023
+
}
1024
+
1025
+
if err := b.db.Exec("DELETE FROM likes WHERE id = ?", like.ID).Error; err != nil {
1026
+
return err
1027
+
}
1028
+
1029
+
return nil
1030
+
}
1031
+
1032
+
func (b *PostgresBackend) HandleDeleteRepost(ctx context.Context, repo *Repo, rkey string) error {
1033
+
var repost Repost
1034
+
if err := b.db.Find(&repost, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil {
1035
+
return err
1036
+
}
1037
+
1038
+
if repost.ID == 0 {
1039
+
//return fmt.Errorf("delete of missing repost: %s %s", repo.Did, rkey)
1040
+
return nil
1041
+
}
1042
+
1043
+
if err := b.db.Exec("DELETE FROM reposts WHERE id = ?", repost.ID).Error; err != nil {
1044
+
return err
1045
+
}
1046
+
1047
+
return nil
1048
+
}
1049
+
1050
+
func (b *PostgresBackend) HandleDeleteFollow(ctx context.Context, repo *Repo, rkey string) error {
1051
+
var follow Follow
1052
+
if err := b.db.Find(&follow, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil {
1053
+
return err
1054
+
}
1055
+
1056
+
if follow.ID == 0 {
1057
+
//slog.Warn("delete of missing follow", "repo", repo.Did, "rkey", rkey)
1058
+
return nil
1059
+
}
1060
+
1061
+
if err := b.db.Exec("DELETE FROM follows WHERE id = ?", follow.ID).Error; err != nil {
1062
+
return err
1063
+
}
1064
+
1065
+
return nil
1066
+
}
1067
+
1068
+
func (b *PostgresBackend) HandleDeleteBlock(ctx context.Context, repo *Repo, rkey string) error {
1069
+
var block Block
1070
+
if err := b.db.Find(&block, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil {
1071
+
return err
1072
+
}
1073
+
1074
+
if block.ID == 0 {
1075
+
//slog.Warn("delete of missing block", "repo", repo.Did, "rkey", rkey)
1076
+
return nil
1077
+
}
1078
+
1079
+
if err := b.db.Exec("DELETE FROM blocks WHERE id = ?", block.ID).Error; err != nil {
1080
+
return err
1081
+
}
1082
+
1083
+
return nil
1084
+
}
1085
+
1086
+
func (b *PostgresBackend) HandleDeleteList(ctx context.Context, repo *Repo, rkey string) error {
1087
+
var list List
1088
+
if err := b.db.Find(&list, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil {
1089
+
return err
1090
+
}
1091
+
1092
+
if list.ID == 0 {
1093
+
return nil
1094
+
//return fmt.Errorf("delete of missing list: %s %s", repo.Did, rkey)
1095
+
}
1096
+
1097
+
if err := b.db.Exec("DELETE FROM lists WHERE id = ?", list.ID).Error; err != nil {
1098
+
return err
1099
+
}
1100
+
1101
+
return nil
1102
+
}
1103
+
1104
+
func (b *PostgresBackend) HandleDeleteListitem(ctx context.Context, repo *Repo, rkey string) error {
1105
+
var item ListItem
1106
+
if err := b.db.Find(&item, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil {
1107
+
return err
1108
+
}
1109
+
1110
+
if item.ID == 0 {
1111
+
return nil
1112
+
//return fmt.Errorf("delete of missing listitem: %s %s", repo.Did, rkey)
1113
+
}
1114
+
1115
+
if err := b.db.Exec("DELETE FROM list_items WHERE id = ?", item.ID).Error; err != nil {
1116
+
return err
1117
+
}
1118
+
1119
+
return nil
1120
+
}
1121
+
1122
+
func (b *PostgresBackend) HandleDeleteListblock(ctx context.Context, repo *Repo, rkey string) error {
1123
+
var block ListBlock
1124
+
if err := b.db.Find(&block, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil {
1125
+
return err
1126
+
}
1127
+
1128
+
if block.ID == 0 {
1129
+
return nil
1130
+
//return fmt.Errorf("delete of missing listblock: %s %s", repo.Did, rkey)
1131
+
}
1132
+
1133
+
if err := b.db.Exec("DELETE FROM list_blocks WHERE id = ?", block.ID).Error; err != nil {
1134
+
return err
1135
+
}
1136
+
1137
+
return nil
1138
+
}
1139
+
1140
+
func (b *PostgresBackend) HandleDeleteFeedGenerator(ctx context.Context, repo *Repo, rkey string) error {
1141
+
var feedgen FeedGenerator
1142
+
if err := b.db.Find(&feedgen, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil {
1143
+
return err
1144
+
}
1145
+
1146
+
if feedgen.ID == 0 {
1147
+
return nil
1148
+
//return fmt.Errorf("delete of missing feedgen: %s %s", repo.Did, rkey)
1149
+
}
1150
+
1151
+
if err := b.db.Exec("DELETE FROM feed_generators WHERE id = ?", feedgen.ID).Error; err != nil {
1152
+
return err
1153
+
}
1154
+
1155
+
return nil
1156
+
}
1157
+
1158
+
func (b *PostgresBackend) HandleDeleteThreadgate(ctx context.Context, repo *Repo, rkey string) error {
1159
+
var threadgate ThreadGate
1160
+
if err := b.db.Find(&threadgate, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil {
1161
+
return err
1162
+
}
1163
+
1164
+
if threadgate.ID == 0 {
1165
+
return nil
1166
+
//return fmt.Errorf("delete of missing threadgate: %s %s", repo.Did, rkey)
1167
+
}
1168
+
1169
+
if err := b.db.Exec("DELETE FROM thread_gates WHERE id = ?", threadgate.ID).Error; err != nil {
1170
+
return err
1171
+
}
1172
+
1173
+
return nil
1174
+
}
1175
+
1176
+
func (b *PostgresBackend) HandleDeleteProfile(ctx context.Context, repo *Repo, rkey string) error {
1177
+
var profile Profile
1178
+
if err := b.db.Find(&profile, "repo = ?", repo.ID).Error; err != nil {
1179
+
return err
1180
+
}
1181
+
1182
+
if profile.ID == 0 {
1183
+
return nil
1184
+
}
1185
+
1186
+
if err := b.db.Exec("DELETE FROM profiles WHERE id = ?", profile.ID).Error; err != nil {
1187
+
return err
1188
+
}
1189
+
1190
+
return nil
1191
+
}
1192
+
1193
+
const (
1194
+
NotifKindReply = "reply"
1195
+
NotifKindLike = "like"
1196
+
NotifKindMention = "mention"
1197
+
NotifKindRepost = "repost"
1198
+
)
1199
+
1200
+
func (b *PostgresBackend) AddNotification(ctx context.Context, forUser, author uint, recordUri string, recordCid cid.Cid, kind string) error {
1201
+
return b.db.Create(&Notification{
1202
+
For: forUser,
1203
+
Author: author,
1204
+
Source: recordUri,
1205
+
SourceCid: recordCid.String(),
1206
+
Kind: kind,
1207
+
}).Error
1208
+
}
+211
backend/missing.go
+211
backend/missing.go
···
1
+
package backend
2
+
3
+
import (
4
+
"bytes"
5
+
"context"
6
+
"fmt"
7
+
"log/slog"
8
+
9
+
"github.com/bluesky-social/indigo/api/atproto"
10
+
"github.com/bluesky-social/indigo/api/bsky"
11
+
"github.com/bluesky-social/indigo/atproto/syntax"
12
+
xrpclib "github.com/bluesky-social/indigo/xrpc"
13
+
"github.com/ipfs/go-cid"
14
+
)
15
+
16
+
type MissingRecordType string
17
+
18
+
const (
19
+
MissingRecordTypeProfile MissingRecordType = "profile"
20
+
MissingRecordTypePost MissingRecordType = "post"
21
+
MissingRecordTypeFeedGenerator MissingRecordType = "feedgenerator"
22
+
MissingRecordTypeUnknown MissingRecordType = "unknown"
23
+
)
24
+
25
+
type MissingRecord struct {
26
+
Type MissingRecordType
27
+
Identifier string // DID for profiles, AT-URI for posts/feedgens
28
+
Wait bool
29
+
30
+
waitch chan struct{}
31
+
}
32
+
33
+
func (b *PostgresBackend) addMissingRecord(ctx context.Context, rec MissingRecord) {
34
+
if rec.Wait {
35
+
rec.waitch = make(chan struct{})
36
+
}
37
+
38
+
select {
39
+
case b.missingRecords <- rec:
40
+
case <-ctx.Done():
41
+
}
42
+
43
+
if rec.Wait {
44
+
select {
45
+
case <-rec.waitch:
46
+
case <-ctx.Done():
47
+
}
48
+
}
49
+
}
50
+
51
+
func (b *PostgresBackend) missingRecordFetcher() {
52
+
for rec := range b.missingRecords {
53
+
var err error
54
+
switch rec.Type {
55
+
case MissingRecordTypeProfile:
56
+
err = b.fetchMissingProfile(context.TODO(), rec.Identifier)
57
+
case MissingRecordTypePost:
58
+
err = b.fetchMissingPost(context.TODO(), rec.Identifier)
59
+
case MissingRecordTypeFeedGenerator:
60
+
err = b.fetchMissingFeedGenerator(context.TODO(), rec.Identifier)
61
+
default:
62
+
slog.Error("unknown missing record type", "type", rec.Type)
63
+
continue
64
+
}
65
+
66
+
if err != nil {
67
+
slog.Warn("failed to fetch missing record", "type", rec.Type, "identifier", rec.Identifier, "error", err)
68
+
}
69
+
70
+
if rec.Wait {
71
+
close(rec.waitch)
72
+
}
73
+
}
74
+
}
75
+
76
+
func (b *PostgresBackend) fetchMissingProfile(ctx context.Context, did string) error {
77
+
b.AddRelevantDid(did)
78
+
79
+
repo, err := b.GetOrCreateRepo(ctx, did)
80
+
if err != nil {
81
+
return err
82
+
}
83
+
84
+
resp, err := b.dir.LookupDID(ctx, syntax.DID(did))
85
+
if err != nil {
86
+
return err
87
+
}
88
+
89
+
c := &xrpclib.Client{
90
+
Host: resp.PDSEndpoint(),
91
+
}
92
+
93
+
rec, err := atproto.RepoGetRecord(ctx, c, "", "app.bsky.actor.profile", did, "self")
94
+
if err != nil {
95
+
return err
96
+
}
97
+
98
+
prof, ok := rec.Value.Val.(*bsky.ActorProfile)
99
+
if !ok {
100
+
return fmt.Errorf("record we got back wasnt a profile somehow")
101
+
}
102
+
103
+
buf := new(bytes.Buffer)
104
+
if err := prof.MarshalCBOR(buf); err != nil {
105
+
return err
106
+
}
107
+
108
+
cc, err := cid.Decode(*rec.Cid)
109
+
if err != nil {
110
+
return err
111
+
}
112
+
113
+
return b.HandleUpdateProfile(ctx, repo, "self", "", buf.Bytes(), cc)
114
+
}
115
+
116
+
func (b *PostgresBackend) fetchMissingPost(ctx context.Context, uri string) error {
117
+
puri, err := syntax.ParseATURI(uri)
118
+
if err != nil {
119
+
return fmt.Errorf("invalid AT URI: %s", uri)
120
+
}
121
+
122
+
did := puri.Authority().String()
123
+
collection := puri.Collection().String()
124
+
rkey := puri.RecordKey().String()
125
+
126
+
b.AddRelevantDid(did)
127
+
128
+
repo, err := b.GetOrCreateRepo(ctx, did)
129
+
if err != nil {
130
+
return err
131
+
}
132
+
133
+
resp, err := b.dir.LookupDID(ctx, syntax.DID(did))
134
+
if err != nil {
135
+
return err
136
+
}
137
+
138
+
c := &xrpclib.Client{
139
+
Host: resp.PDSEndpoint(),
140
+
}
141
+
142
+
rec, err := atproto.RepoGetRecord(ctx, c, "", collection, did, rkey)
143
+
if err != nil {
144
+
return err
145
+
}
146
+
147
+
post, ok := rec.Value.Val.(*bsky.FeedPost)
148
+
if !ok {
149
+
return fmt.Errorf("record we got back wasn't a post somehow")
150
+
}
151
+
152
+
buf := new(bytes.Buffer)
153
+
if err := post.MarshalCBOR(buf); err != nil {
154
+
return err
155
+
}
156
+
157
+
cc, err := cid.Decode(*rec.Cid)
158
+
if err != nil {
159
+
return err
160
+
}
161
+
162
+
return b.HandleCreatePost(ctx, repo, rkey, buf.Bytes(), cc)
163
+
}
164
+
165
+
func (b *PostgresBackend) fetchMissingFeedGenerator(ctx context.Context, uri string) error {
166
+
puri, err := syntax.ParseATURI(uri)
167
+
if err != nil {
168
+
return fmt.Errorf("invalid AT URI: %s", uri)
169
+
}
170
+
171
+
did := puri.Authority().String()
172
+
collection := puri.Collection().String()
173
+
rkey := puri.RecordKey().String()
174
+
b.AddRelevantDid(did)
175
+
176
+
repo, err := b.GetOrCreateRepo(ctx, did)
177
+
if err != nil {
178
+
return err
179
+
}
180
+
181
+
resp, err := b.dir.LookupDID(ctx, syntax.DID(did))
182
+
if err != nil {
183
+
return err
184
+
}
185
+
186
+
c := &xrpclib.Client{
187
+
Host: resp.PDSEndpoint(),
188
+
}
189
+
190
+
rec, err := atproto.RepoGetRecord(ctx, c, "", collection, did, rkey)
191
+
if err != nil {
192
+
return err
193
+
}
194
+
195
+
feedGen, ok := rec.Value.Val.(*bsky.FeedGenerator)
196
+
if !ok {
197
+
return fmt.Errorf("record we got back wasn't a feed generator somehow")
198
+
}
199
+
200
+
buf := new(bytes.Buffer)
201
+
if err := feedGen.MarshalCBOR(buf); err != nil {
202
+
return err
203
+
}
204
+
205
+
cc, err := cid.Decode(*rec.Cid)
206
+
if err != nil {
207
+
return err
208
+
}
209
+
210
+
return b.HandleCreateFeedGenerator(ctx, repo, rkey, buf.Bytes(), cc)
211
+
}
+6
-5
go.mod
+6
-5
go.mod
···
3
3
go 1.25.1
4
4
5
5
require (
6
-
github.com/bluesky-social/indigo v0.0.0-20250909204019-c5eaa30f683f
6
+
github.com/bluesky-social/indigo v0.0.0-20251009212240-20524de167fe
7
+
github.com/bluesky-social/jetstream v0.0.0-20251009222037-7d7efa58d7f1
7
8
github.com/gorilla/websocket v1.5.1
8
9
github.com/hashicorp/golang-lru/v2 v2.0.7
9
10
github.com/ipfs/go-cid v0.4.1
···
60
61
github.com/ipfs/go-metrics-interface v0.0.1 // indirect
61
62
github.com/ipfs/go-peertaskqueue v0.8.1 // indirect
62
63
github.com/ipfs/go-verifcid v0.0.3 // indirect
63
-
github.com/ipld/go-car v0.6.1-0.20230509095817-92d28eb23ba4 // indirect
64
+
github.com/ipld/go-car v0.6.2 // indirect
64
65
github.com/ipld/go-codec-dagpb v1.6.0 // indirect
65
66
github.com/ipld/go-ipld-prime v0.21.0 // indirect
66
67
github.com/jackc/pgpassfile v1.0.0 // indirect
···
69
70
github.com/jbenet/goprocess v0.1.4 // indirect
70
71
github.com/jinzhu/inflection v1.0.0 // indirect
71
72
github.com/jinzhu/now v1.1.5 // indirect
72
-
github.com/klauspost/compress v1.17.3 // indirect
73
+
github.com/klauspost/compress v1.17.9 // indirect
73
74
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
74
75
github.com/lestrrat-go/blackmagic v1.0.1 // indirect
75
76
github.com/lestrrat-go/httpcc v1.0.1 // indirect
···
91
92
github.com/orandin/slog-gorm v1.3.2 // indirect
92
93
github.com/polydawn/refmt v0.89.1-0.20221221234430-40501e09de1f // indirect
93
94
github.com/prometheus/client_model v0.6.1 // indirect
94
-
github.com/prometheus/common v0.48.0 // indirect
95
-
github.com/prometheus/procfs v0.12.0 // indirect
95
+
github.com/prometheus/common v0.54.0 // indirect
96
+
github.com/prometheus/procfs v0.15.1 // indirect
96
97
github.com/redis/go-redis/v9 v9.3.0 // indirect
97
98
github.com/russross/blackfriday/v2 v2.1.0 // indirect
98
99
github.com/segmentio/asm v1.2.0 // indirect
+12
-10
go.sum
+12
-10
go.sum
···
6
6
github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
7
7
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
8
8
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
9
-
github.com/bluesky-social/indigo v0.0.0-20250909204019-c5eaa30f683f h1:FugOoTzh0nCMTWGqNGsjttFWVPcwxaaGD3p/nE9V8qY=
10
-
github.com/bluesky-social/indigo v0.0.0-20250909204019-c5eaa30f683f/go.mod h1:n6QE1NDPFoi7PRbMUZmc2y7FibCqiVU4ePpsvhHUBR8=
9
+
github.com/bluesky-social/indigo v0.0.0-20251009212240-20524de167fe h1:VBhaqE5ewQgXbY5SfSWFZC/AwHFo7cHxZKFYi2ce9Yo=
10
+
github.com/bluesky-social/indigo v0.0.0-20251009212240-20524de167fe/go.mod h1:RuQVrCGm42QNsgumKaR6se+XkFKfCPNwdCiTvqKRUck=
11
+
github.com/bluesky-social/jetstream v0.0.0-20251009222037-7d7efa58d7f1 h1:ovcRKN1iXZnY5WApVg+0Hw2RkwMH0ziA7lSAA8vellU=
12
+
github.com/bluesky-social/jetstream v0.0.0-20251009222037-7d7efa58d7f1/go.mod h1:5PtGi4r/PjEVBBl+0xWuQn4mBEjr9h6xsfDBADS6cHs=
11
13
github.com/bradfitz/gomemcache v0.0.0-20230905024940-24af94b03874 h1:N7oVaKyGp8bttX0bfZGmcGkjz7DLQXhAn3DNd3T0ous=
12
14
github.com/bradfitz/gomemcache v0.0.0-20230905024940-24af94b03874/go.mod h1:r5xuitiExdLAJ09PR7vBVENGvp4ZuTBeWTGtxuX3K+c=
13
15
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
···
158
160
github.com/ipfs/go-peertaskqueue v0.8.1/go.mod h1:Oxxd3eaK279FxeydSPPVGHzbwVeHjatZ2GA8XD+KbPU=
159
161
github.com/ipfs/go-verifcid v0.0.3 h1:gmRKccqhWDocCRkC+a59g5QW7uJw5bpX9HWBevXa0zs=
160
162
github.com/ipfs/go-verifcid v0.0.3/go.mod h1:gcCtGniVzelKrbk9ooUSX/pM3xlH73fZZJDzQJRvOUw=
161
-
github.com/ipld/go-car v0.6.1-0.20230509095817-92d28eb23ba4 h1:oFo19cBmcP0Cmg3XXbrr0V/c+xU9U1huEZp8+OgBzdI=
162
-
github.com/ipld/go-car v0.6.1-0.20230509095817-92d28eb23ba4/go.mod h1:6nkFF8OmR5wLKBzRKi7/YFJpyYR7+oEn1DX+mMWnlLA=
163
+
github.com/ipld/go-car v0.6.2 h1:Hlnl3Awgnq8icK+ze3iRghk805lu8YNq3wlREDTF2qc=
164
+
github.com/ipld/go-car v0.6.2/go.mod h1:oEGXdwp6bmxJCZ+rARSkDliTeYnVzv3++eXajZ+Bmr8=
163
165
github.com/ipld/go-car/v2 v2.13.1 h1:KnlrKvEPEzr5IZHKTXLAEub+tPrzeAFQVRlSQvuxBO4=
164
166
github.com/ipld/go-car/v2 v2.13.1/go.mod h1:QkdjjFNGit2GIkpQ953KBwowuoukoM75nP/JI1iDJdo=
165
167
github.com/ipld/go-codec-dagpb v1.6.0 h1:9nYazfyu9B1p3NAgfVdpRco3Fs2nFC72DqVsMj6rOcc=
···
188
190
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
189
191
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
190
192
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
191
-
github.com/klauspost/compress v1.17.3 h1:qkRjuerhUU1EmXLYGkSH6EZL+vPSxIrYjLNAK4slzwA=
192
-
github.com/klauspost/compress v1.17.3/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
193
+
github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA=
194
+
github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
193
195
github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM=
194
196
github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
195
197
github.com/koron/go-ssdp v0.0.3 h1:JivLMY45N76b4p/vsWGOKewBQu6uf39y8l+AQ7sDKx8=
···
312
314
github.com/prometheus/client_golang v1.19.1/go.mod h1:mP78NwGzrVks5S2H6ab8+ZZGJLZUq1hoULYBAYBw1Ho=
313
315
github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E=
314
316
github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY=
315
-
github.com/prometheus/common v0.48.0 h1:QO8U2CdOzSn1BBsmXJXduaaW+dY/5QLjfB8svtSzKKE=
316
-
github.com/prometheus/common v0.48.0/go.mod h1:0/KsvlIEfPQCQ5I2iNSAWKPZziNCvRs5EC6ILDTlAPc=
317
-
github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo=
318
-
github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo=
317
+
github.com/prometheus/common v0.54.0 h1:ZlZy0BgJhTwVZUn7dLOkwCZHUkrAqd3WYtcFCWnM1D8=
318
+
github.com/prometheus/common v0.54.0/go.mod h1:/TQgMJP5CuVYveyT7n/0Ix8yLNNXy9yRSkhnLTHPDIQ=
319
+
github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc=
320
+
github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
319
321
github.com/redis/go-redis/v9 v9.0.0-rc.4/go.mod h1:Vo3EsyWnicKnSKCA7HhgnvnyA74wOA69Cd2Meli5mmA=
320
322
github.com/redis/go-redis/v9 v9.3.0 h1:RiVDjmig62jIWp7Kk4XVLs0hzV6pI3PyTnnL0cnn0u0=
321
323
github.com/redis/go-redis/v9 v9.3.0/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
+8
-8
handlers.go
+8
-8
handlers.go
···
146
146
}
147
147
148
148
if profile.Raw == nil || len(profile.Raw) == 0 {
149
-
s.addMissingProfile(ctx, accdid)
149
+
s.backend.TrackMissingRecord(accdid, false)
150
150
return e.JSON(404, map[string]any{
151
151
"error": "missing profile info for user",
152
152
})
···
307
307
}
308
308
309
309
if profile.Raw == nil || len(profile.Raw) == 0 {
310
-
s.addMissingProfile(ctx, r.Did)
310
+
s.backend.TrackMissingRecord(r.Did, false)
311
311
return &authorInfo{
312
312
Handle: resp.Handle.String(),
313
313
Did: r.Did,
···
379
379
380
380
uri := fmt.Sprintf("at://%s/app.bsky.feed.post/%s", r.Did, p.Rkey)
381
381
if len(p.Raw) == 0 || p.NotFound {
382
-
s.addMissingPost(ctx, uri)
382
+
s.backend.TrackMissingRecord(uri, false)
383
383
posts[ix] = postResponse{
384
384
Uri: uri,
385
385
Missing: true,
···
515
515
quotedPost, err := s.backend.GetPostByUri(ctx, quotedURI, "*")
516
516
if err != nil {
517
517
slog.Warn("failed to get quoted post", "uri", quotedURI, "error", err)
518
-
s.addMissingPost(ctx, quotedURI)
518
+
s.backend.TrackMissingRecord(quotedURI, false)
519
519
return s.buildQuoteFallback(quotedURI, quotedCid)
520
520
}
521
521
522
522
if quotedPost == nil || quotedPost.Raw == nil || len(quotedPost.Raw) == 0 || quotedPost.NotFound {
523
-
s.addMissingPost(ctx, quotedURI)
523
+
s.backend.TrackMissingRecord(quotedURI, false)
524
524
return s.buildQuoteFallback(quotedURI, quotedCid)
525
525
}
526
526
···
707
707
prof = &p
708
708
}
709
709
} else {
710
-
s.addMissingProfile(ctx, r.Did)
710
+
s.backend.TrackMissingRecord(r.Did, false)
711
711
}
712
712
713
713
users = append(users, engagementUser{
···
767
767
prof = &p
768
768
}
769
769
} else {
770
-
s.addMissingProfile(ctx, r.Did)
770
+
s.backend.TrackMissingRecord(r.Did, false)
771
771
}
772
772
773
773
users = append(users, engagementUser{
···
835
835
prof = &p
836
836
}
837
837
} else {
838
-
s.addMissingProfile(ctx, r.Did)
838
+
s.backend.TrackMissingRecord(r.Did, false)
839
839
}
840
840
841
841
users = append(users, engagementUser{
+16
-10
hydration/post.go
+16
-10
hydration/post.go
···
17
17
18
18
// PostInfo contains hydrated post information
19
19
type PostInfo struct {
20
+
ID uint
20
21
URI string
21
22
Cid string
22
23
Post *bsky.FeedPost
···
39
40
ctx, span := tracer.Start(ctx, "hydratePost")
40
41
defer span.End()
41
42
43
+
p, err := h.backend.GetPostByUri(ctx, uri, "*")
44
+
if err != nil {
45
+
return nil, err
46
+
}
47
+
48
+
return h.HydratePostDB(ctx, uri, p, viewerDID)
49
+
}
50
+
51
+
func (h *Hydrator) HydratePostDB(ctx context.Context, uri string, dbPost *models.Post, viewerDID string) (*PostInfo, error) {
42
52
autoFetch, _ := ctx.Value("auto-fetch").(bool)
43
53
44
54
authorDid := extractDIDFromURI(uri)
···
47
57
return nil, err
48
58
}
49
59
50
-
// Query post from database
51
-
var dbPost models.Post
52
-
if err := h.db.Raw(`SELECT * FROM posts WHERE author = ? AND rkey = ? `, r.ID, extractRkeyFromURI(uri)).Scan(&dbPost).Error; err != nil {
53
-
return nil, fmt.Errorf("failed to query post: %w", err)
54
-
}
55
-
56
60
if dbPost.NotFound || len(dbPost.Raw) == 0 {
57
61
if autoFetch {
58
62
h.AddMissingRecord(uri, true)
59
-
if err := h.db.Raw(`SELECT * FROM posts WHERE author = ? AND rkey = ? `, r.ID, extractRkeyFromURI(uri)).Scan(&dbPost).Error; err != nil {
63
+
if err := h.db.Raw(`SELECT * FROM posts WHERE author = ? AND rkey = ?`, r.ID, extractRkeyFromURI(uri)).Scan(&dbPost).Error; err != nil {
60
64
return nil, fmt.Errorf("failed to query post: %w", err)
61
65
}
62
66
if dbPost.NotFound || len(dbPost.Raw) == 0 {
···
75
79
76
80
var wg sync.WaitGroup
77
81
78
-
// Get author DID
79
-
80
-
authorDID := extractDIDFromURI(uri)
82
+
authorDID := r.Did
81
83
82
84
// Get engagement counts
83
85
var likes, reposts, replies int
···
121
123
wg.Wait()
122
124
123
125
info := &PostInfo{
126
+
ID: dbPost.ID,
124
127
URI: uri,
125
128
Cid: dbPost.Cid,
126
129
Post: &feedPost,
···
385
388
386
389
// hydrateEmbeddedRecord hydrates an embedded record (for quote posts, etc.)
387
390
func (h *Hydrator) hydrateEmbeddedRecord(ctx context.Context, uri string, viewerDID string) *bsky.EmbedRecord_View_Record {
391
+
ctx, span := tracer.Start(ctx, "hydrateEmbeddedRecord")
392
+
defer span.End()
393
+
388
394
// Check if it's a post URI
389
395
if !isPostURI(uri) {
390
396
// Could be a feed generator, list, labeler, or starter pack
+10
hydration/utils.go
+10
hydration/utils.go
···
5
5
"fmt"
6
6
7
7
"github.com/bluesky-social/indigo/atproto/syntax"
8
+
"github.com/whyrusleeping/market/models"
8
9
)
9
10
10
11
func (h *Hydrator) NormalizeUri(ctx context.Context, uri string) (string, error) {
···
27
28
28
29
return fmt.Sprintf("at://%s/%s/%s", did, puri.Collection().String(), puri.RecordKey().String()), nil
29
30
}
31
+
32
+
func (h *Hydrator) UriForPost(ctx context.Context, p *models.Post) (string, error) {
33
+
did, err := h.backend.DidFromID(ctx, p.Author)
34
+
if err != nil {
35
+
return "", err
36
+
}
37
+
38
+
return fmt.Sprintf("at://%s/app.bsky.feed.post/%s", did, p.Rkey), nil
39
+
}
+40
-103
main.go
+40
-103
main.go
···
3
3
import (
4
4
"bytes"
5
5
"context"
6
+
"encoding/json"
6
7
"fmt"
7
8
"log"
8
9
"log/slog"
···
19
20
"github.com/bluesky-social/indigo/atproto/identity"
20
21
"github.com/bluesky-social/indigo/atproto/identity/redisdir"
21
22
"github.com/bluesky-social/indigo/atproto/syntax"
22
-
"github.com/bluesky-social/indigo/cmd/relay/stream"
23
-
"github.com/bluesky-social/indigo/cmd/relay/stream/schedulers/parallel"
24
23
"github.com/bluesky-social/indigo/repo"
25
24
"github.com/bluesky-social/indigo/util/cliutil"
26
25
xrpclib "github.com/bluesky-social/indigo/xrpc"
27
-
"github.com/gorilla/websocket"
28
26
"github.com/ipfs/go-cid"
29
27
"github.com/jackc/pgx/v5/pgxpool"
30
28
"github.com/prometheus/client_golang/prometheus"
···
71
69
&cli.StringFlag{
72
70
Name: "redis-url",
73
71
},
72
+
&cli.StringFlag{
73
+
Name: "sync-config",
74
+
},
74
75
}
75
76
app.Action = func(cctx *cli.Context) error {
76
77
db, err := cliutil.SetupDatabase(cctx.String("db-url"), cctx.Int("max-db-connections"))
···
131
132
db.AutoMigrate(StarterPack{})
132
133
db.AutoMigrate(backend.SyncInfo{})
133
134
db.AutoMigrate(Notification{})
135
+
db.AutoMigrate(NotificationSeen{})
134
136
db.AutoMigrate(SequenceTracker{})
135
137
db.Exec("CREATE INDEX IF NOT EXISTS reposts_subject_idx ON reposts (subject)")
136
138
db.Exec("CREATE INDEX IF NOT EXISTS posts_reply_to_idx ON posts (reply_to)")
139
+
db.Exec("CREATE INDEX IF NOT EXISTS posts_in_thread_idx ON posts (in_thread)")
137
140
138
141
ctx := context.TODO()
139
142
···
198
201
client: cc,
199
202
dir: dir,
200
203
201
-
missingRecords: make(chan MissingRecord, 1024),
202
-
db: db,
204
+
db: db,
203
205
}
204
-
fmt.Println("MY DID: ", s.mydid)
205
206
206
-
pgb, err := backend.NewPostgresBackend(mydid, db, pool, cc, nil)
207
+
pgb, err := backend.NewPostgresBackend(mydid, db, pool, cc, dir)
207
208
if err != nil {
208
209
return err
209
210
}
···
240
241
http.ListenAndServe(":4445", nil)
241
242
}()
242
243
243
-
go s.missingRecordFetcher()
244
+
sc := SyncConfig{
245
+
Backends: []SyncBackend{
246
+
{
247
+
Type: "firehose",
248
+
Host: "bsky.network",
249
+
},
250
+
},
251
+
}
244
252
245
-
seqno, err := loadLastSeq(db, "firehose_seq")
246
-
if err != nil {
247
-
fmt.Println("failed to load sequence number, starting over", err)
253
+
if scfn := cctx.String("sync-config"); scfn != "" {
254
+
{
255
+
scfi, err := os.Open(scfn)
256
+
if err != nil {
257
+
return err
258
+
}
259
+
defer scfi.Close()
260
+
261
+
var lsc SyncConfig
262
+
if err := json.NewDecoder(scfi).Decode(&lsc); err != nil {
263
+
return err
264
+
}
265
+
sc = lsc
266
+
}
248
267
}
249
268
250
-
return s.startLiveTail(ctx, int(seqno), 10, 20)
269
+
/*
270
+
sc.Backends[0] = SyncBackend{
271
+
Type: "jetstream",
272
+
Host: "jetstream1.us-west.bsky.network",
273
+
}
274
+
*/
275
+
276
+
return s.StartSyncEngine(ctx, &sc)
277
+
251
278
}
252
279
253
280
app.RunAndExitOnError()
···
265
292
seqLk sync.Mutex
266
293
lastSeq int64
267
294
268
-
mpLk sync.Mutex
269
-
missingRecords chan MissingRecord
295
+
mpLk sync.Mutex
270
296
271
297
db *gorm.DB
272
298
}
···
274
300
func (s *Server) getXrpcClient() (*xrpclib.Client, error) {
275
301
// TODO: handle refreshing the token periodically
276
302
return s.client, nil
277
-
}
278
-
279
-
func (s *Server) startLiveTail(ctx context.Context, curs int, parWorkers, maxQ int) error {
280
-
slog.Info("starting live tail")
281
-
282
-
// Connect to the Relay websocket
283
-
urlStr := fmt.Sprintf("wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos?cursor=%d", curs)
284
-
285
-
d := websocket.DefaultDialer
286
-
con, _, err := d.Dial(urlStr, http.Header{
287
-
"User-Agent": []string{"market/0.0.1"},
288
-
})
289
-
if err != nil {
290
-
return fmt.Errorf("failed to connect to relay: %w", err)
291
-
}
292
-
293
-
var lelk sync.Mutex
294
-
lastEvent := time.Now()
295
-
296
-
go func() {
297
-
for range time.Tick(time.Second) {
298
-
lelk.Lock()
299
-
let := lastEvent
300
-
lelk.Unlock()
301
-
302
-
if time.Since(let) > time.Second*30 {
303
-
slog.Error("firehose connection timed out")
304
-
con.Close()
305
-
return
306
-
}
307
-
308
-
}
309
-
310
-
}()
311
-
312
-
var cclk sync.Mutex
313
-
var completeCursor int64
314
-
315
-
rsc := &stream.RepoStreamCallbacks{
316
-
RepoCommit: func(evt *atproto.SyncSubscribeRepos_Commit) error {
317
-
ctx := context.Background()
318
-
319
-
firehoseCursorGauge.WithLabelValues("ingest").Set(float64(evt.Seq))
320
-
321
-
s.seqLk.Lock()
322
-
if evt.Seq > s.lastSeq {
323
-
curs = int(evt.Seq)
324
-
s.lastSeq = evt.Seq
325
-
326
-
if evt.Seq%1000 == 0 {
327
-
if err := storeLastSeq(s.db, "firehose_seq", evt.Seq); err != nil {
328
-
fmt.Println("failed to store seqno: ", err)
329
-
}
330
-
}
331
-
}
332
-
s.seqLk.Unlock()
333
-
334
-
lelk.Lock()
335
-
lastEvent = time.Now()
336
-
lelk.Unlock()
337
-
338
-
if err := s.backend.HandleEvent(ctx, evt); err != nil {
339
-
return fmt.Errorf("handle event (%s,%d): %w", evt.Repo, evt.Seq, err)
340
-
}
341
-
342
-
cclk.Lock()
343
-
if evt.Seq > completeCursor {
344
-
completeCursor = evt.Seq
345
-
firehoseCursorGauge.WithLabelValues("complete").Set(float64(evt.Seq))
346
-
}
347
-
cclk.Unlock()
348
-
349
-
return nil
350
-
},
351
-
RepoInfo: func(info *atproto.SyncSubscribeRepos_Info) error {
352
-
return nil
353
-
},
354
-
// TODO: all the other event types
355
-
Error: func(errf *stream.ErrorFrame) error {
356
-
return fmt.Errorf("error frame: %s: %s", errf.Error, errf.Message)
357
-
},
358
-
}
359
-
360
-
sched := parallel.NewScheduler(parWorkers, maxQ, con.RemoteAddr().String(), rsc.EventHandler)
361
-
362
-
//s.eventScheduler = sched
363
-
//s.streamFinished = make(chan struct{})
364
-
365
-
return stream.HandleRepoStream(ctx, con, sched, slog.Default())
366
303
}
367
304
368
305
func (s *Server) resolveAccountIdent(ctx context.Context, acc string) (string, error) {
-234
missing.go
-234
missing.go
···
1
-
package main
2
-
3
-
import (
4
-
"bytes"
5
-
"context"
6
-
"fmt"
7
-
"log/slog"
8
-
9
-
"github.com/bluesky-social/indigo/api/atproto"
10
-
"github.com/bluesky-social/indigo/api/bsky"
11
-
"github.com/bluesky-social/indigo/atproto/syntax"
12
-
xrpclib "github.com/bluesky-social/indigo/xrpc"
13
-
"github.com/ipfs/go-cid"
14
-
)
15
-
16
-
type MissingRecordType string
17
-
18
-
const (
19
-
MissingRecordTypeProfile MissingRecordType = "profile"
20
-
MissingRecordTypePost MissingRecordType = "post"
21
-
MissingRecordTypeFeedGenerator MissingRecordType = "feedgenerator"
22
-
)
23
-
24
-
type MissingRecord struct {
25
-
Type MissingRecordType
26
-
Identifier string // DID for profiles, AT-URI for posts/feedgens
27
-
Wait bool
28
-
29
-
waitch chan struct{}
30
-
}
31
-
32
-
func (s *Server) addMissingRecord(ctx context.Context, rec MissingRecord) {
33
-
if rec.Wait {
34
-
rec.waitch = make(chan struct{})
35
-
}
36
-
37
-
select {
38
-
case s.missingRecords <- rec:
39
-
case <-ctx.Done():
40
-
}
41
-
42
-
if rec.Wait {
43
-
select {
44
-
case <-rec.waitch:
45
-
case <-ctx.Done():
46
-
}
47
-
}
48
-
}
49
-
50
-
// Legacy methods for backward compatibility
51
-
func (s *Server) addMissingProfile(ctx context.Context, did string) {
52
-
s.addMissingRecord(ctx, MissingRecord{
53
-
Type: MissingRecordTypeProfile,
54
-
Identifier: did,
55
-
})
56
-
}
57
-
58
-
func (s *Server) addMissingPost(ctx context.Context, uri string) {
59
-
slog.Info("adding missing post to fetch queue", "uri", uri)
60
-
s.addMissingRecord(ctx, MissingRecord{
61
-
Type: MissingRecordTypePost,
62
-
Identifier: uri,
63
-
})
64
-
}
65
-
66
-
func (s *Server) addMissingFeedGenerator(ctx context.Context, uri string) {
67
-
slog.Info("adding missing feed generator to fetch queue", "uri", uri)
68
-
s.addMissingRecord(ctx, MissingRecord{
69
-
Type: MissingRecordTypeFeedGenerator,
70
-
Identifier: uri,
71
-
})
72
-
}
73
-
74
-
func (s *Server) missingRecordFetcher() {
75
-
for rec := range s.missingRecords {
76
-
var err error
77
-
switch rec.Type {
78
-
case MissingRecordTypeProfile:
79
-
err = s.fetchMissingProfile(context.TODO(), rec.Identifier)
80
-
case MissingRecordTypePost:
81
-
err = s.fetchMissingPost(context.TODO(), rec.Identifier)
82
-
case MissingRecordTypeFeedGenerator:
83
-
err = s.fetchMissingFeedGenerator(context.TODO(), rec.Identifier)
84
-
default:
85
-
slog.Error("unknown missing record type", "type", rec.Type)
86
-
continue
87
-
}
88
-
89
-
if err != nil {
90
-
slog.Warn("failed to fetch missing record", "type", rec.Type, "identifier", rec.Identifier, "error", err)
91
-
}
92
-
93
-
if rec.Wait {
94
-
close(rec.waitch)
95
-
}
96
-
}
97
-
}
98
-
99
-
func (s *Server) fetchMissingProfile(ctx context.Context, did string) error {
100
-
s.backend.AddRelevantDid(did)
101
-
102
-
repo, err := s.backend.GetOrCreateRepo(ctx, did)
103
-
if err != nil {
104
-
return err
105
-
}
106
-
107
-
resp, err := s.dir.LookupDID(ctx, syntax.DID(did))
108
-
if err != nil {
109
-
return err
110
-
}
111
-
112
-
c := &xrpclib.Client{
113
-
Host: resp.PDSEndpoint(),
114
-
}
115
-
116
-
rec, err := atproto.RepoGetRecord(ctx, c, "", "app.bsky.actor.profile", did, "self")
117
-
if err != nil {
118
-
return err
119
-
}
120
-
121
-
prof, ok := rec.Value.Val.(*bsky.ActorProfile)
122
-
if !ok {
123
-
return fmt.Errorf("record we got back wasnt a profile somehow")
124
-
}
125
-
126
-
buf := new(bytes.Buffer)
127
-
if err := prof.MarshalCBOR(buf); err != nil {
128
-
return err
129
-
}
130
-
131
-
cc, err := cid.Decode(*rec.Cid)
132
-
if err != nil {
133
-
return err
134
-
}
135
-
136
-
return s.backend.HandleUpdateProfile(ctx, repo, "self", "", buf.Bytes(), cc)
137
-
}
138
-
139
-
func (s *Server) fetchMissingPost(ctx context.Context, uri string) error {
140
-
puri, err := syntax.ParseATURI(uri)
141
-
if err != nil {
142
-
return fmt.Errorf("invalid AT URI: %s", uri)
143
-
}
144
-
145
-
did := puri.Authority().String()
146
-
collection := puri.Collection().String()
147
-
rkey := puri.RecordKey().String()
148
-
149
-
s.backend.AddRelevantDid(did)
150
-
151
-
repo, err := s.backend.GetOrCreateRepo(ctx, did)
152
-
if err != nil {
153
-
return err
154
-
}
155
-
156
-
resp, err := s.dir.LookupDID(ctx, syntax.DID(did))
157
-
if err != nil {
158
-
return err
159
-
}
160
-
161
-
c := &xrpclib.Client{
162
-
Host: resp.PDSEndpoint(),
163
-
}
164
-
165
-
rec, err := atproto.RepoGetRecord(ctx, c, "", collection, did, rkey)
166
-
if err != nil {
167
-
return err
168
-
}
169
-
170
-
post, ok := rec.Value.Val.(*bsky.FeedPost)
171
-
if !ok {
172
-
return fmt.Errorf("record we got back wasn't a post somehow")
173
-
}
174
-
175
-
buf := new(bytes.Buffer)
176
-
if err := post.MarshalCBOR(buf); err != nil {
177
-
return err
178
-
}
179
-
180
-
cc, err := cid.Decode(*rec.Cid)
181
-
if err != nil {
182
-
return err
183
-
}
184
-
185
-
return s.backend.HandleCreatePost(ctx, repo, rkey, buf.Bytes(), cc)
186
-
}
187
-
188
-
func (s *Server) fetchMissingFeedGenerator(ctx context.Context, uri string) error {
189
-
puri, err := syntax.ParseATURI(uri)
190
-
if err != nil {
191
-
return fmt.Errorf("invalid AT URI: %s", uri)
192
-
}
193
-
194
-
did := puri.Authority().String()
195
-
collection := puri.Collection().String()
196
-
rkey := puri.RecordKey().String()
197
-
s.backend.AddRelevantDid(did)
198
-
199
-
repo, err := s.backend.GetOrCreateRepo(ctx, did)
200
-
if err != nil {
201
-
return err
202
-
}
203
-
204
-
resp, err := s.dir.LookupDID(ctx, syntax.DID(did))
205
-
if err != nil {
206
-
return err
207
-
}
208
-
209
-
c := &xrpclib.Client{
210
-
Host: resp.PDSEndpoint(),
211
-
}
212
-
213
-
rec, err := atproto.RepoGetRecord(ctx, c, "", collection, did, rkey)
214
-
if err != nil {
215
-
return err
216
-
}
217
-
218
-
feedGen, ok := rec.Value.Val.(*bsky.FeedGenerator)
219
-
if !ok {
220
-
return fmt.Errorf("record we got back wasn't a feed generator somehow")
221
-
}
222
-
223
-
buf := new(bytes.Buffer)
224
-
if err := feedGen.MarshalCBOR(buf); err != nil {
225
-
return err
226
-
}
227
-
228
-
cc, err := cid.Decode(*rec.Cid)
229
-
if err != nil {
230
-
return err
231
-
}
232
-
233
-
return s.backend.HandleCreateFeedGenerator(ctx, repo, rkey, buf.Bytes(), cc)
234
-
}
+5
models/models.go
+5
models/models.go
+8
sync-config-jetstream.json
+8
sync-config-jetstream.json
+281
sync.go
+281
sync.go
···
1
+
package main
2
+
3
+
import (
4
+
"context"
5
+
"fmt"
6
+
"log/slog"
7
+
"net/http"
8
+
"sync"
9
+
"time"
10
+
11
+
"github.com/bluesky-social/indigo/api/atproto"
12
+
"github.com/bluesky-social/indigo/cmd/relay/stream"
13
+
"github.com/bluesky-social/indigo/cmd/relay/stream/schedulers/parallel"
14
+
jsclient "github.com/bluesky-social/jetstream/pkg/client"
15
+
jsparallel "github.com/bluesky-social/jetstream/pkg/client/schedulers/parallel"
16
+
"github.com/bluesky-social/jetstream/pkg/models"
17
+
"github.com/gorilla/websocket"
18
+
)
19
+
20
+
type SyncConfig struct {
21
+
Backends []SyncBackend `json:"backends"`
22
+
}
23
+
24
+
type SyncBackend struct {
25
+
Type string `json:"type"`
26
+
Host string `json:"host"`
27
+
MaxWorkers int `json:"max_workers,omitempty"`
28
+
}
29
+
30
+
func (s *Server) StartSyncEngine(ctx context.Context, sc *SyncConfig) error {
31
+
for _, be := range sc.Backends {
32
+
switch be.Type {
33
+
case "firehose":
34
+
go s.runSyncFirehose(ctx, be)
35
+
case "jetstream":
36
+
go s.runSyncJetstream(ctx, be)
37
+
default:
38
+
return fmt.Errorf("unrecognized sync backend type: %q", be.Type)
39
+
}
40
+
}
41
+
42
+
<-ctx.Done()
43
+
return fmt.Errorf("exiting sync routine")
44
+
}
45
+
46
+
const failureTimeInterval = time.Second * 5
47
+
48
+
func (s *Server) runSyncFirehose(ctx context.Context, be SyncBackend) {
49
+
var failures int
50
+
for {
51
+
seqno, err := loadLastSeq(s.db, be.Host)
52
+
if err != nil {
53
+
fmt.Println("failed to load sequence number, starting over", err)
54
+
}
55
+
56
+
maxWorkers := 10
57
+
if be.MaxWorkers != 0 {
58
+
maxWorkers = be.MaxWorkers
59
+
}
60
+
61
+
start := time.Now()
62
+
if err := s.startLiveTail(ctx, be.Host, int(seqno), maxWorkers, 20); err != nil {
63
+
slog.Error("firehose connection lost", "host", be.Host, "error", err)
64
+
}
65
+
66
+
elapsed := time.Since(start)
67
+
68
+
if elapsed > failureTimeInterval {
69
+
failures = 0
70
+
continue
71
+
}
72
+
failures++
73
+
74
+
delay := delayForFailureCount(failures)
75
+
slog.Warn("retrying connection after delay", "host", be.Host, "delay", delay)
76
+
}
77
+
}
78
+
79
+
func (s *Server) runSyncJetstream(ctx context.Context, be SyncBackend) {
80
+
var failures int
81
+
for {
82
+
// Load last cursor (stored as sequence number in same table)
83
+
cursor, err := loadLastSeq(s.db, be.Host)
84
+
if err != nil {
85
+
slog.Warn("failed to load jetstream cursor, starting from live", "error", err)
86
+
cursor = 0
87
+
}
88
+
89
+
maxWorkers := 10
90
+
if be.MaxWorkers != 0 {
91
+
maxWorkers = be.MaxWorkers
92
+
}
93
+
94
+
start := time.Now()
95
+
if err := s.startJetstreamTail(ctx, be.Host, cursor, maxWorkers); err != nil {
96
+
slog.Error("jetstream connection lost", "host", be.Host, "error", err)
97
+
}
98
+
99
+
elapsed := time.Since(start)
100
+
101
+
if elapsed > failureTimeInterval {
102
+
failures = 0
103
+
continue
104
+
}
105
+
failures++
106
+
107
+
delay := delayForFailureCount(failures)
108
+
slog.Warn("retrying jetstream connection after delay", "host", be.Host, "delay", delay)
109
+
time.Sleep(delay)
110
+
}
111
+
}
112
+
113
+
func delayForFailureCount(n int) time.Duration {
114
+
if n < 5 {
115
+
return (time.Second * 5) + (time.Second * 2 * time.Duration(n))
116
+
}
117
+
118
+
return time.Second * 30
119
+
}
120
+
121
+
func (s *Server) startLiveTail(ctx context.Context, host string, curs int, parWorkers, maxQ int) error {
122
+
ctx, cancel := context.WithCancel(ctx)
123
+
defer cancel()
124
+
125
+
slog.Info("starting live tail")
126
+
127
+
// Connect to the Relay websocket
128
+
urlStr := fmt.Sprintf("wss://%s/xrpc/com.atproto.sync.subscribeRepos?cursor=%d", host, curs)
129
+
130
+
d := websocket.DefaultDialer
131
+
con, _, err := d.Dial(urlStr, http.Header{
132
+
"User-Agent": []string{"konbini/0.0.1"},
133
+
})
134
+
if err != nil {
135
+
return fmt.Errorf("failed to connect to relay: %w", err)
136
+
}
137
+
138
+
var lelk sync.Mutex
139
+
lastEvent := time.Now()
140
+
141
+
go func() {
142
+
tick := time.NewTicker(time.Second)
143
+
defer tick.Stop()
144
+
for {
145
+
select {
146
+
case <-tick.C:
147
+
lelk.Lock()
148
+
let := lastEvent
149
+
lelk.Unlock()
150
+
151
+
if time.Since(let) > time.Second*30 {
152
+
slog.Error("firehose connection timed out")
153
+
con.Close()
154
+
return
155
+
}
156
+
case <-ctx.Done():
157
+
return
158
+
}
159
+
}
160
+
}()
161
+
162
+
var cclk sync.Mutex
163
+
var completeCursor int64
164
+
165
+
rsc := &stream.RepoStreamCallbacks{
166
+
RepoCommit: func(evt *atproto.SyncSubscribeRepos_Commit) error {
167
+
ctx := context.Background()
168
+
169
+
firehoseCursorGauge.WithLabelValues("ingest").Set(float64(evt.Seq))
170
+
171
+
s.seqLk.Lock()
172
+
if evt.Seq > s.lastSeq {
173
+
curs = int(evt.Seq)
174
+
s.lastSeq = evt.Seq
175
+
176
+
if evt.Seq%1000 == 0 {
177
+
if err := storeLastSeq(s.db, host, evt.Seq); err != nil {
178
+
fmt.Println("failed to store seqno: ", err)
179
+
}
180
+
}
181
+
}
182
+
s.seqLk.Unlock()
183
+
184
+
lelk.Lock()
185
+
lastEvent = time.Now()
186
+
lelk.Unlock()
187
+
188
+
if err := s.backend.HandleEvent(ctx, evt); err != nil {
189
+
return fmt.Errorf("handle event (%s,%d): %w", evt.Repo, evt.Seq, err)
190
+
}
191
+
192
+
cclk.Lock()
193
+
if evt.Seq > completeCursor {
194
+
completeCursor = evt.Seq
195
+
firehoseCursorGauge.WithLabelValues("complete").Set(float64(evt.Seq))
196
+
}
197
+
cclk.Unlock()
198
+
199
+
return nil
200
+
},
201
+
RepoInfo: func(info *atproto.SyncSubscribeRepos_Info) error {
202
+
return nil
203
+
},
204
+
// TODO: all the other event types
205
+
Error: func(errf *stream.ErrorFrame) error {
206
+
return fmt.Errorf("error frame: %s: %s", errf.Error, errf.Message)
207
+
},
208
+
}
209
+
210
+
sched := parallel.NewScheduler(parWorkers, maxQ, con.RemoteAddr().String(), rsc.EventHandler)
211
+
212
+
return stream.HandleRepoStream(ctx, con, sched, slog.Default())
213
+
}
214
+
215
+
func (s *Server) startJetstreamTail(ctx context.Context, host string, cursor int64, parWorkers int) error {
216
+
ctx, cancel := context.WithCancel(ctx)
217
+
defer cancel()
218
+
219
+
slog.Info("starting jetstream tail", "host", host, "cursor", cursor)
220
+
221
+
// Create a scheduler for parallel processing
222
+
lastStored := int64(0)
223
+
sched := jsparallel.NewScheduler(
224
+
parWorkers,
225
+
host,
226
+
slog.Default(),
227
+
func(ctx context.Context, event *models.Event) error {
228
+
// Update cursor tracking
229
+
s.seqLk.Lock()
230
+
if event.TimeUS > s.lastSeq {
231
+
s.lastSeq = event.TimeUS
232
+
if event.TimeUS-lastStored > 1_000_000 {
233
+
// Store checkpoint periodically
234
+
if err := storeLastSeq(s.db, host, event.TimeUS); err != nil {
235
+
slog.Error("failed to store jetstream cursor", "error", err)
236
+
}
237
+
lastStored = event.TimeUS
238
+
}
239
+
}
240
+
s.seqLk.Unlock()
241
+
242
+
// Update metrics
243
+
firehoseCursorGauge.WithLabelValues("ingest").Set(float64(event.TimeUS))
244
+
245
+
// Convert Jetstream event to ATProto event format
246
+
if event.Commit != nil {
247
+
248
+
if err := s.backend.HandleEventJetstream(ctx, event); err != nil {
249
+
return fmt.Errorf("handle event (%s,%d): %w", event.Did, event.TimeUS, err)
250
+
}
251
+
252
+
firehoseCursorGauge.WithLabelValues("complete").Set(float64(event.TimeUS))
253
+
}
254
+
255
+
return nil
256
+
},
257
+
)
258
+
259
+
// Configure Jetstream client
260
+
config := jsclient.DefaultClientConfig()
261
+
config.WebsocketURL = fmt.Sprintf("wss://%s/subscribe", host)
262
+
263
+
// Prepare cursor pointer
264
+
var cursorPtr *int64
265
+
if cursor > 0 {
266
+
cursorPtr = &cursor
267
+
}
268
+
269
+
// Create and connect client
270
+
client, err := jsclient.NewClient(
271
+
config,
272
+
slog.Default(),
273
+
sched,
274
+
)
275
+
if err != nil {
276
+
return fmt.Errorf("create jetstream client: %w", err)
277
+
}
278
+
279
+
// Start reading from Jetstream
280
+
return client.ConnectAndRead(ctx, cursorPtr)
281
+
}
+11
-11
xrpc/feed/getPostThread.go
+11
-11
xrpc/feed/getPostThread.go
···
15
15
func HandleGetPostThread(c echo.Context, db *gorm.DB, hydrator *hydration.Hydrator) error {
16
16
uriParam := c.QueryParam("uri")
17
17
if uriParam == "" {
18
-
return c.JSON(http.StatusBadRequest, map[string]interface{}{
18
+
return c.JSON(http.StatusBadRequest, map[string]any{
19
19
"error": "InvalidRequest",
20
20
"message": "uri parameter is required",
21
21
})
···
27
27
// Hydrate the requested post
28
28
postInfo, err := hydrator.HydratePost(ctx, uriParam, viewer)
29
29
if err != nil {
30
-
return c.JSON(http.StatusNotFound, map[string]interface{}{
30
+
return c.JSON(http.StatusNotFound, map[string]any{
31
31
"error": "NotFound",
32
32
"message": "post not found",
33
33
})
···
74
74
uri: uri,
75
75
replyTo: tp.ReplyTo,
76
76
inThread: tp.InThread,
77
-
replies: []interface{}{},
77
+
replies: []any{},
78
78
}
79
79
}
80
80
···
98
98
}
99
99
100
100
if rootNode == nil {
101
-
return c.JSON(http.StatusNotFound, map[string]interface{}{
101
+
return c.JSON(http.StatusNotFound, map[string]any{
102
102
"error": "NotFound",
103
103
"message": "thread root not found",
104
104
})
···
107
107
// Build the response by traversing the tree
108
108
thread := buildThreadView(ctx, db, rootNode, postsByID, hydrator, viewer, nil)
109
109
110
-
return c.JSON(http.StatusOK, map[string]interface{}{
110
+
return c.JSON(http.StatusOK, map[string]any{
111
111
"thread": thread,
112
112
})
113
113
}
···
117
117
uri string
118
118
replyTo uint
119
119
inThread uint
120
-
replies []interface{}
120
+
replies []any
121
121
}
122
122
123
-
func buildThreadView(ctx context.Context, db *gorm.DB, node *threadPostNode, allNodes map[uint]*threadPostNode, hydrator *hydration.Hydrator, viewer string, parent interface{}) interface{} {
123
+
func buildThreadView(ctx context.Context, db *gorm.DB, node *threadPostNode, allNodes map[uint]*threadPostNode, hydrator *hydration.Hydrator, viewer string, parent any) any {
124
124
// Hydrate this post
125
125
postInfo, err := hydrator.HydratePost(ctx, node.uri, viewer)
126
126
if err != nil {
127
127
// Return a notFound post
128
-
return map[string]interface{}{
128
+
return map[string]any{
129
129
"$type": "app.bsky.feed.defs#notFoundPost",
130
130
"uri": node.uri,
131
131
}
···
134
134
// Hydrate author
135
135
authorInfo, err := hydrator.HydrateActor(ctx, postInfo.Author)
136
136
if err != nil {
137
-
return map[string]interface{}{
137
+
return map[string]any{
138
138
"$type": "app.bsky.feed.defs#notFoundPost",
139
139
"uri": node.uri,
140
140
}
141
141
}
142
142
143
143
// Build replies
144
-
var replies []interface{}
144
+
var replies []any
145
145
for _, replyNode := range node.replies {
146
146
if rn, ok := replyNode.(*threadPostNode); ok {
147
147
replyView := buildThreadView(ctx, db, rn, allNodes, hydrator, viewer, nil)
···
150
150
}
151
151
152
152
// Build the thread view post
153
-
var repliesForView interface{}
153
+
var repliesForView any
154
154
if len(replies) > 0 {
155
155
repliesForView = replies
156
156
}
+92
-11
xrpc/notification/listNotifications.go
+92
-11
xrpc/notification/listNotifications.go
···
13
13
lexutil "github.com/bluesky-social/indigo/lex/util"
14
14
"github.com/labstack/echo/v4"
15
15
"github.com/whyrusleeping/konbini/hydration"
16
+
models "github.com/whyrusleeping/konbini/models"
16
17
"github.com/whyrusleeping/konbini/views"
17
-
"github.com/whyrusleeping/market/models"
18
18
"gorm.io/gorm"
19
+
"gorm.io/gorm/clause"
19
20
)
20
21
21
22
// HandleListNotifications implements app.bsky.notification.listNotifications
22
23
func HandleListNotifications(c echo.Context, db *gorm.DB, hydrator *hydration.Hydrator) error {
23
24
viewer := getUserDID(c)
24
25
if viewer == "" {
25
-
return c.JSON(http.StatusUnauthorized, map[string]interface{}{
26
+
return c.JSON(http.StatusUnauthorized, map[string]any{
26
27
"error": "AuthenticationRequired",
27
28
"message": "authentication required",
28
29
})
···
77
78
}
78
79
query += ` ORDER BY n.created_at DESC LIMIT ?`
79
80
80
-
var queryArgs []interface{}
81
+
var queryArgs []any
81
82
queryArgs = append(queryArgs, viewer)
82
83
if cursor > 0 {
83
84
queryArgs = append(queryArgs, cursor)
···
85
86
queryArgs = append(queryArgs, limit)
86
87
87
88
if err := db.Raw(query, queryArgs...).Scan(&rows).Error; err != nil {
88
-
return c.JSON(http.StatusInternalServerError, map[string]interface{}{
89
+
return c.JSON(http.StatusInternalServerError, map[string]any{
89
90
"error": "InternalError",
90
91
"message": "failed to query notifications",
91
92
})
···
130
131
cursorPtr = &cursor
131
132
}
132
133
134
+
var lastSeen time.Time
135
+
if err := db.Raw("SELECT seen_at FROM notification_seens WHERE repo = (select id from repos where did = ?)", viewer).Scan(&lastSeen).Error; err != nil {
136
+
return err
137
+
}
138
+
139
+
var lastSeenStr *string
140
+
if !lastSeen.IsZero() {
141
+
s := lastSeen.Format(time.RFC3339)
142
+
lastSeenStr = &s
143
+
}
144
+
133
145
output := &bsky.NotificationListNotifications_Output{
134
146
Notifications: notifications,
135
147
Cursor: cursorPtr,
148
+
SeenAt: lastSeenStr,
136
149
}
137
150
138
151
return c.JSON(http.StatusOK, output)
···
142
155
func HandleGetUnreadCount(c echo.Context, db *gorm.DB, hydrator *hydration.Hydrator) error {
143
156
viewer := getUserDID(c)
144
157
if viewer == "" {
145
-
return c.JSON(http.StatusUnauthorized, map[string]interface{}{
158
+
return c.JSON(http.StatusUnauthorized, map[string]any{
146
159
"error": "AuthenticationRequired",
147
160
"message": "authentication required",
148
161
})
149
162
}
150
163
151
-
// For now, return 0 - we'd need to track read state in the database
152
-
return c.JSON(http.StatusOK, map[string]interface{}{
153
-
"count": 0,
164
+
var repo models.Repo
165
+
if err := db.Find(&repo, "did = ?", viewer).Error; err != nil {
166
+
return err
167
+
}
168
+
169
+
var lastSeen time.Time
170
+
if err := db.Raw("SELECT seen_at FROM notification_seens WHERE repo = ?", repo.ID).Scan(&lastSeen).Error; err != nil {
171
+
return err
172
+
}
173
+
174
+
var count int
175
+
query := `SELECT count(*) FROM notifications WHERE created_at > ? AND for = ?`
176
+
if err := db.Raw(query, lastSeen, repo.ID).Scan(&count).Error; err != nil {
177
+
return c.JSON(http.StatusInternalServerError, map[string]any{
178
+
"error": "InternalError",
179
+
"message": "failed to count unread notifications",
180
+
})
181
+
}
182
+
183
+
return c.JSON(http.StatusOK, map[string]any{
184
+
"count": count,
154
185
})
155
186
}
156
187
···
158
189
func HandleUpdateSeen(c echo.Context, db *gorm.DB, hydrator *hydration.Hydrator) error {
159
190
viewer := getUserDID(c)
160
191
if viewer == "" {
161
-
return c.JSON(http.StatusUnauthorized, map[string]interface{}{
192
+
return c.JSON(http.StatusUnauthorized, map[string]any{
162
193
"error": "AuthenticationRequired",
163
194
"message": "authentication required",
164
195
})
165
196
}
166
197
167
-
// For now, just return success - we'd need to track seen timestamps in the database
168
-
return c.JSON(http.StatusOK, map[string]interface{}{})
198
+
var body bsky.NotificationUpdateSeen_Input
199
+
if err := c.Bind(&body); err != nil {
200
+
return c.JSON(http.StatusBadRequest, map[string]any{
201
+
"error": "InvalidRequest",
202
+
"message": "invalid request body",
203
+
})
204
+
}
205
+
206
+
// Parse the seenAt timestamp
207
+
seenAt, err := time.Parse(time.RFC3339, body.SeenAt)
208
+
if err != nil {
209
+
return c.JSON(http.StatusBadRequest, map[string]any{
210
+
"error": "InvalidRequest",
211
+
"message": "invalid seenAt timestamp",
212
+
})
213
+
}
214
+
215
+
// Get the viewer's repo ID
216
+
var repoID uint
217
+
if err := db.Raw("SELECT id FROM repos WHERE did = ?", viewer).Scan(&repoID).Error; err != nil {
218
+
return c.JSON(http.StatusInternalServerError, map[string]any{
219
+
"error": "InternalError",
220
+
"message": "failed to find viewer repo",
221
+
})
222
+
}
223
+
224
+
if repoID == 0 {
225
+
return c.JSON(http.StatusInternalServerError, map[string]any{
226
+
"error": "InternalError",
227
+
"message": "viewer repo not found",
228
+
})
229
+
}
230
+
231
+
// Upsert the NotificationSeen record
232
+
notifSeen := models.NotificationSeen{
233
+
Repo: repoID,
234
+
SeenAt: seenAt,
235
+
}
236
+
237
+
err = db.Clauses(clause.OnConflict{
238
+
Columns: []clause.Column{{Name: "repo"}},
239
+
DoUpdates: clause.AssignmentColumns([]string{"seen_at"}),
240
+
}).Create(¬ifSeen).Error
241
+
242
+
if err != nil {
243
+
return c.JSON(http.StatusInternalServerError, map[string]any{
244
+
"error": "InternalError",
245
+
"message": "failed to update seen timestamp",
246
+
})
247
+
}
248
+
249
+
return c.JSON(http.StatusOK, map[string]any{})
169
250
}
170
251
171
252
func getUserDID(c echo.Context) string {
+159
-131
xrpc/unspecced/getPostThreadV2.go
+159
-131
xrpc/unspecced/getPostThreadV2.go
···
1
1
package unspecced
2
2
3
3
import (
4
+
"bytes"
4
5
"context"
5
6
"fmt"
6
7
"log/slog"
7
8
"net/http"
8
9
"strconv"
10
+
"sync"
9
11
10
12
"github.com/bluesky-social/indigo/api/bsky"
11
13
"github.com/labstack/echo/v4"
12
14
"github.com/whyrusleeping/konbini/hydration"
13
15
"github.com/whyrusleeping/konbini/views"
16
+
"github.com/whyrusleeping/market/models"
17
+
"go.opentelemetry.io/otel"
14
18
"gorm.io/gorm"
15
19
)
16
20
21
+
var tracer = otel.Tracer("xrpc/unspecced")
22
+
17
23
// HandleGetPostThreadV2 implements app.bsky.unspecced.getPostThreadV2
18
24
func HandleGetPostThreadV2(c echo.Context, db *gorm.DB, hydrator *hydration.Hydrator) error {
19
-
ctx := c.Request().Context()
25
+
ctx, span := tracer.Start(c.Request().Context(), "getPostThreadV2")
26
+
defer span.End()
20
27
ctx = context.WithValue(ctx, "auto-fetch", true)
21
28
22
29
// Parse parameters
···
69
76
})
70
77
}
71
78
72
-
// Determine the root post ID for the thread
73
-
rootPostID := anchorPostInfo.InThread
74
-
if rootPostID == 0 {
75
-
// This post is the root - get its ID
76
-
var postID uint
77
-
db.Raw(`
78
-
SELECT id FROM posts
79
-
WHERE author = (SELECT id FROM repos WHERE did = ?)
80
-
AND rkey = ?
81
-
`, extractDIDFromURI(anchorUri), extractRkeyFromURI(anchorUri)).Scan(&postID)
82
-
rootPostID = postID
79
+
threadID := anchorPostInfo.InThread
80
+
if threadID == 0 {
81
+
threadID = anchorPostInfo.ID
83
82
}
84
83
85
-
// Query all posts in this thread
86
-
type threadPostRow struct {
87
-
ID uint
88
-
Rkey string
89
-
ReplyTo uint
90
-
InThread uint
91
-
AuthorDid string
84
+
var threadPosts []*models.Post
85
+
if err := db.Raw("SELECT * FROM posts WHERE in_thread = ? OR id = ?", threadID, anchorPostInfo.ID).Scan(&threadPosts).Error; err != nil {
86
+
return err
92
87
}
93
-
var threadPosts []threadPostRow
94
-
db.Raw(`
95
-
SELECT p.id, p.rkey, p.reply_to, p.in_thread, r.did as author_did
96
-
FROM posts p
97
-
JOIN repos r ON r.id = p.author
98
-
WHERE (p.id = ? OR p.in_thread = ?)
99
-
AND p.not_found = false
100
-
ORDER BY p.created ASC
101
-
`, rootPostID, rootPostID).Scan(&threadPosts)
102
88
103
-
// Build a map of posts by ID
104
-
postsByID := make(map[uint]*threadNode)
105
-
for _, tp := range threadPosts {
106
-
uri := fmt.Sprintf("at://%s/app.bsky.feed.post/%s", tp.AuthorDid, tp.Rkey)
107
-
postsByID[tp.ID] = &threadNode{
108
-
id: tp.ID,
109
-
uri: uri,
110
-
replyTo: tp.ReplyTo,
111
-
inThread: tp.InThread,
112
-
children: []*threadNode{},
113
-
}
114
-
}
89
+
fmt.Println("GOT THREAD POSTS: ", len(threadPosts))
115
90
116
-
// Build parent-child relationships
117
-
for _, node := range postsByID {
118
-
if node.replyTo != 0 {
119
-
parent := postsByID[node.replyTo]
120
-
if parent != nil {
121
-
parent.children = append(parent.children, node)
122
-
}
123
-
}
91
+
treeNodes, err := buildThreadTree(ctx, hydrator, db, threadPosts)
92
+
if err != nil {
93
+
return fmt.Errorf("failed to construct tree: %w", err)
124
94
}
125
95
126
-
// Find the anchor node
127
-
anchorID := uint(0)
128
-
for id, node := range postsByID {
129
-
if node.uri == anchorUri {
130
-
anchorID = id
131
-
break
132
-
}
133
-
}
134
-
135
-
if anchorID == 0 {
136
-
return c.JSON(http.StatusNotFound, map[string]interface{}{
137
-
"error": "NotFound",
138
-
"message": "anchor post not found in thread",
139
-
})
140
-
}
141
-
142
-
anchorNode := postsByID[anchorID]
96
+
anchor := treeNodes[anchorPostInfo.ID]
143
97
144
98
// Build flat thread items list
145
99
var threadItems []*bsky.UnspeccedGetPostThreadV2_ThreadItem
···
147
101
148
102
// Add parents if requested
149
103
if above {
150
-
parents := collectParents(anchorNode, postsByID)
151
-
for i := len(parents) - 1; i >= 0; i-- {
152
-
depth := int64(-(len(parents) - i))
153
-
item := buildThreadItem(ctx, hydrator, parents[i], depth, viewer)
104
+
parent := anchor.parent
105
+
depth := int64(-1)
106
+
for parent != nil {
107
+
if parent.missing {
108
+
fmt.Println("Parent missing: ", depth)
109
+
item := &bsky.UnspeccedGetPostThreadV2_ThreadItem{
110
+
Depth: depth,
111
+
Uri: parent.uri,
112
+
Value: &bsky.UnspeccedGetPostThreadV2_ThreadItem_Value{
113
+
UnspeccedDefs_ThreadItemNotFound: &bsky.UnspeccedDefs_ThreadItemNotFound{
114
+
LexiconTypeID: "app.bsky.unspecced.defs#threadItemNotFound",
115
+
},
116
+
},
117
+
}
118
+
119
+
threadItems = append(threadItems, item)
120
+
break
121
+
}
122
+
123
+
item := buildThreadItem(ctx, hydrator, parent, depth, viewer)
154
124
if item != nil {
155
125
threadItems = append(threadItems, item)
156
126
}
127
+
128
+
parent = parent.parent
129
+
depth--
157
130
}
158
131
}
159
132
160
133
// Add anchor post (depth 0)
161
-
anchorItem := buildThreadItem(ctx, hydrator, anchorNode, 0, viewer)
134
+
anchorItem := buildThreadItem(ctx, hydrator, anchor, 0, viewer)
162
135
if anchorItem != nil {
163
136
threadItems = append(threadItems, anchorItem)
164
137
}
165
138
166
139
// Add replies below anchor
167
140
if below > 0 {
168
-
replies, hasMore := collectReplies(ctx, hydrator, anchorNode, 1, below, branchingFactor, sort, viewer)
141
+
replies, err := collectReplies(ctx, hydrator, anchor, 0, below, branchingFactor, sort, viewer)
142
+
if err != nil {
143
+
return err
144
+
}
169
145
threadItems = append(threadItems, replies...)
170
-
hasOtherReplies = hasMore
146
+
//hasOtherReplies = hasMore
171
147
}
172
148
173
149
return c.JSON(http.StatusOK, &bsky.UnspeccedGetPostThreadV2_Output{
···
176
152
})
177
153
}
178
154
179
-
type threadNode struct {
180
-
id uint
181
-
uri string
182
-
replyTo uint
183
-
inThread uint
184
-
children []*threadNode
185
-
}
155
+
func collectReplies(ctx context.Context, hydrator *hydration.Hydrator, curnode *threadTree, depth int64, below int64, branchingFactor int64, sort string, viewer string) ([]*bsky.UnspeccedGetPostThreadV2_ThreadItem, error) {
156
+
if below == 0 {
157
+
return nil, nil
158
+
}
186
159
187
-
func collectParents(node *threadNode, allNodes map[uint]*threadNode) []*threadNode {
188
-
var parents []*threadNode
189
-
current := node
190
-
for current.replyTo != 0 {
191
-
parent := allNodes[current.replyTo]
192
-
if parent == nil {
193
-
break
194
-
}
195
-
parents = append(parents, parent)
196
-
current = parent
160
+
type parThreadResults struct {
161
+
node *bsky.UnspeccedGetPostThreadV2_ThreadItem
162
+
children []*bsky.UnspeccedGetPostThreadV2_ThreadItem
197
163
}
198
-
return parents
199
-
}
164
+
165
+
results := make([]parThreadResults, len(curnode.children))
166
+
167
+
var wg sync.WaitGroup
168
+
for i := range curnode.children {
169
+
ix := i
170
+
wg.Go(func() {
171
+
child := curnode.children[ix]
172
+
173
+
results[ix].node = buildThreadItem(ctx, hydrator, child, depth+1, viewer)
174
+
if child.missing {
175
+
return
176
+
}
200
177
201
-
func collectReplies(ctx context.Context, hydrator *hydration.Hydrator, node *threadNode, currentDepth, maxDepth, branchingFactor int64, sort string, viewer string) ([]*bsky.UnspeccedGetPostThreadV2_ThreadItem, bool) {
202
-
var items []*bsky.UnspeccedGetPostThreadV2_ThreadItem
203
-
hasMore := false
178
+
sub, err := collectReplies(ctx, hydrator, child, depth+1, below-1, branchingFactor, sort, viewer)
179
+
if err != nil {
180
+
slog.Error("failed to collect replies", "node", child.uri, "error", err)
181
+
return
182
+
}
204
183
205
-
if currentDepth > maxDepth {
206
-
return items, false
184
+
results[ix].children = sub
185
+
})
207
186
}
208
187
209
-
// Sort children based on sort parameter
210
-
children := node.children
211
-
// TODO: Actually sort based on the sort parameter (newest/oldest/top)
212
-
// For now, just use the order we have
188
+
wg.Wait()
213
189
214
-
// Limit to branchingFactor
215
-
limit := int(branchingFactor)
216
-
if len(children) > limit {
217
-
hasMore = true
218
-
children = children[:limit]
190
+
var out []*bsky.UnspeccedGetPostThreadV2_ThreadItem
191
+
for _, res := range results {
192
+
out = append(out, res.node)
193
+
out = append(out, res.children...)
219
194
}
220
195
221
-
for _, child := range children {
222
-
item := buildThreadItem(ctx, hydrator, child, currentDepth, viewer)
223
-
if item != nil {
224
-
items = append(items, item)
196
+
return out, nil
197
+
}
225
198
226
-
// Recursively collect replies
227
-
if currentDepth < maxDepth {
228
-
childReplies, childHasMore := collectReplies(ctx, hydrator, child, currentDepth+1, maxDepth, branchingFactor, sort, viewer)
229
-
items = append(items, childReplies...)
230
-
if childHasMore {
231
-
hasMore = true
232
-
}
233
-
}
199
+
func buildThreadItem(ctx context.Context, hydrator *hydration.Hydrator, node *threadTree, depth int64, viewer string) *bsky.UnspeccedGetPostThreadV2_ThreadItem {
200
+
if node.missing {
201
+
return &bsky.UnspeccedGetPostThreadV2_ThreadItem{
202
+
Depth: depth,
203
+
Uri: node.uri,
204
+
Value: &bsky.UnspeccedGetPostThreadV2_ThreadItem_Value{
205
+
UnspeccedDefs_ThreadItemNotFound: &bsky.UnspeccedDefs_ThreadItemNotFound{
206
+
LexiconTypeID: "app.bsky.unspecced.defs#threadItemNotFound",
207
+
},
208
+
},
234
209
}
235
210
}
236
211
237
-
return items, hasMore
238
-
}
239
-
240
-
func buildThreadItem(ctx context.Context, hydrator *hydration.Hydrator, node *threadNode, depth int64, viewer string) *bsky.UnspeccedGetPostThreadV2_ThreadItem {
241
212
// Hydrate the post
242
-
postInfo, err := hydrator.HydratePost(ctx, node.uri, viewer)
213
+
postInfo, err := hydrator.HydratePostDB(ctx, node.uri, node.val, viewer)
243
214
if err != nil {
215
+
slog.Error("failed to hydrate post in thread item", "uri", node.uri, "error", err)
244
216
// Return not found item
245
217
return &bsky.UnspeccedGetPostThreadV2_ThreadItem{
246
218
Depth: depth,
···
256
228
// Hydrate author
257
229
authorInfo, err := hydrator.HydrateActor(ctx, postInfo.Author)
258
230
if err != nil {
231
+
slog.Error("failed to hydrate actor in thread item", "author", postInfo.Author, "error", err)
259
232
return &bsky.UnspeccedGetPostThreadV2_ThreadItem{
260
233
Depth: depth,
261
234
Uri: node.uri,
···
319
292
return string(parts)
320
293
}
321
294
322
-
func extractRkeyFromURI(uri string) string {
323
-
// URI format: at://did:plc:xxx/collection/rkey
324
-
if len(uri) < 5 || uri[:5] != "at://" {
325
-
return ""
295
+
type threadTree struct {
296
+
parent *threadTree
297
+
children []*threadTree
298
+
299
+
val *models.Post
300
+
301
+
missing bool
302
+
303
+
uri string
304
+
cid string
305
+
}
306
+
307
+
func buildThreadTree(ctx context.Context, hydrator *hydration.Hydrator, db *gorm.DB, posts []*models.Post) (map[uint]*threadTree, error) {
308
+
nodes := make(map[uint]*threadTree)
309
+
for _, p := range posts {
310
+
puri, err := hydrator.UriForPost(ctx, p)
311
+
if err != nil {
312
+
return nil, err
313
+
}
314
+
315
+
t := &threadTree{
316
+
val: p,
317
+
uri: puri,
318
+
}
319
+
320
+
nodes[p.ID] = t
326
321
}
327
-
// Find last slash
328
-
for i := len(uri) - 1; i >= 5; i-- {
329
-
if uri[i] == '/' {
330
-
return uri[i+1:]
322
+
323
+
missing := make(map[uint]*threadTree)
324
+
for _, node := range nodes {
325
+
if node.val.ReplyTo == 0 {
326
+
continue
331
327
}
328
+
329
+
pnode, ok := nodes[node.val.ReplyTo]
330
+
if !ok {
331
+
pnode = &threadTree{
332
+
missing: true,
333
+
}
334
+
missing[node.val.ReplyTo] = pnode
335
+
336
+
var bspost bsky.FeedPost
337
+
if err := bspost.UnmarshalCBOR(bytes.NewReader(node.val.Raw)); err != nil {
338
+
return nil, err
339
+
}
340
+
341
+
if bspost.Reply == nil || bspost.Reply.Parent == nil {
342
+
return nil, fmt.Errorf("node with parent had no parent in object")
343
+
}
344
+
345
+
pnode.uri = bspost.Reply.Parent.Uri
346
+
pnode.cid = bspost.Reply.Parent.Cid
347
+
348
+
/* Maybe we could force hydrate these?
349
+
hydrator.AddMissingRecord(puri, true)
350
+
*/
351
+
}
352
+
353
+
pnode.children = append(pnode.children, node)
354
+
node.parent = pnode
332
355
}
333
-
return ""
356
+
357
+
for k, v := range missing {
358
+
nodes[k] = v
359
+
}
360
+
361
+
return nodes, nil
334
362
}