+38
README.md
+38
README.md
···
201
201
202
202
It will take a minute but it should pull all records from that user.
203
203
204
+
## Upstream Firehose Configuration
205
+
206
+
Konbini supports both standard firehose endpoints as well as jetstream. If
207
+
bandwidth and CPU usage is a concern, and you trust the jetstream endpoint,
208
+
then it may be worth trying that out.
209
+
210
+
The configuration file is formatted as follows:
211
+
212
+
```json
213
+
{
214
+
"backends": [
215
+
{
216
+
"type": "jetstream",
217
+
"host": "jetstream1.us-west.bsky.network"
218
+
}
219
+
]
220
+
}
221
+
```
222
+
223
+
The default (implicit) configuration file looks like this:
224
+
225
+
```json
226
+
{
227
+
"backends": [
228
+
{
229
+
"type": "firehose",
230
+
"host": "bsky.network"
231
+
}
232
+
]
233
+
}
234
+
```
235
+
236
+
Note that this is an array of backends, you can specify multiple upstreams, and
237
+
konbini will read from all of them. The main intended purpose of this is to be
238
+
able to subscribe directly to PDSs. PDSs currently only support the full
239
+
firehose endpoint, not jetstream, so be sure to specify a type of "firehose"
240
+
for individual PDS endpoints.
241
+
204
242
## License
205
243
206
244
MIT (whyrusleeping)
+55
-7
backend/backend.go
+55
-7
backend/backend.go
···
10
10
11
11
"github.com/bluesky-social/indigo/api/atproto"
12
12
"github.com/bluesky-social/indigo/api/bsky"
13
+
"github.com/bluesky-social/indigo/atproto/identity"
13
14
"github.com/bluesky-social/indigo/atproto/syntax"
14
15
"github.com/bluesky-social/indigo/util"
15
16
"github.com/bluesky-social/indigo/xrpc"
···
26
27
27
28
// PostgresBackend handles database operations
28
29
type PostgresBackend struct {
29
-
db *gorm.DB
30
-
pgx *pgxpool.Pool
31
-
tracker RecordTracker
30
+
db *gorm.DB
31
+
pgx *pgxpool.Pool
32
+
33
+
dir identity.Directory
32
34
33
35
client *xrpc.Client
34
36
···
43
45
repoCache *lru.TwoQueueCache[string, *Repo]
44
46
reposLk sync.Mutex
45
47
48
+
didByIDCache *lru.TwoQueueCache[uint, string]
49
+
46
50
postInfoCache *lru.TwoQueueCache[string, cachedPostInfo]
51
+
52
+
missingRecords chan MissingRecord
47
53
}
48
54
49
55
type cachedPostInfo struct {
···
52
58
}
53
59
54
60
// NewPostgresBackend creates a new PostgresBackend
55
-
func NewPostgresBackend(mydid string, db *gorm.DB, pgx *pgxpool.Pool, client *xrpc.Client, tracker RecordTracker) (*PostgresBackend, error) {
61
+
func NewPostgresBackend(mydid string, db *gorm.DB, pgx *pgxpool.Pool, client *xrpc.Client, dir identity.Directory) (*PostgresBackend, error) {
56
62
rc, _ := lru.New2Q[string, *Repo](1_000_000)
57
63
pc, _ := lru.New2Q[string, cachedPostInfo](1_000_000)
58
64
revc, _ := lru.New2Q[uint, string](1_000_000)
65
+
dbic, _ := lru.New2Q[uint, string](1_000_000)
59
66
60
67
b := &PostgresBackend{
61
68
client: client,
62
69
mydid: mydid,
63
70
db: db,
64
71
pgx: pgx,
65
-
tracker: tracker,
66
72
relevantDids: make(map[string]bool),
67
73
repoCache: rc,
68
74
postInfoCache: pc,
69
75
revCache: revc,
76
+
didByIDCache: dbic,
77
+
dir: dir,
78
+
79
+
missingRecords: make(chan MissingRecord, 1000),
70
80
}
71
81
72
82
r, err := b.GetOrCreateRepo(context.TODO(), mydid)
···
75
85
}
76
86
77
87
b.myrepo = r
88
+
89
+
go b.missingRecordFetcher()
78
90
return b, nil
79
91
}
80
92
81
93
// TrackMissingRecord implements the RecordTracker interface
82
94
func (b *PostgresBackend) TrackMissingRecord(identifier string, wait bool) {
83
-
if b.tracker != nil {
84
-
b.tracker.TrackMissingRecord(identifier, wait)
95
+
mr := MissingRecord{
96
+
Type: mrTypeFromIdent(identifier),
97
+
Identifier: identifier,
98
+
Wait: wait,
99
+
}
100
+
101
+
b.addMissingRecord(context.TODO(), mr)
102
+
}
103
+
104
+
func mrTypeFromIdent(ident string) MissingRecordType {
105
+
if strings.HasPrefix(ident, "did:") {
106
+
return MissingRecordTypeProfile
107
+
}
108
+
109
+
puri, _ := syntax.ParseATURI(ident)
110
+
switch puri.Collection().String() {
111
+
case "app.bsky.feed.post":
112
+
return MissingRecordTypePost
113
+
case "app.bsky.feed.generator":
114
+
return MissingRecordTypeFeedGenerator
115
+
default:
116
+
return MissingRecordTypeUnknown
85
117
}
118
+
86
119
}
87
120
88
121
// DidToID converts a DID to a database ID
···
363
396
}
364
397
365
398
return &r, nil
399
+
}
400
+
401
+
func (b *PostgresBackend) DidFromID(ctx context.Context, uid uint) (string, error) {
402
+
val, ok := b.didByIDCache.Get(uid)
403
+
if ok {
404
+
return val, nil
405
+
}
406
+
407
+
r, err := b.GetRepoByID(ctx, uid)
408
+
if err != nil {
409
+
return "", err
410
+
}
411
+
412
+
b.didByIDCache.Add(uid, r.Did)
413
+
return r.Did, nil
366
414
}
367
415
368
416
func (b *PostgresBackend) checkPostExists(ctx context.Context, repo *Repo, rkey string) (bool, error) {
+68
backend/events.go
+68
backend/events.go
···
3
3
import (
4
4
"bytes"
5
5
"context"
6
+
"encoding/json"
6
7
"fmt"
7
8
"log/slog"
8
9
"strings"
···
11
12
"github.com/bluesky-social/indigo/api/atproto"
12
13
"github.com/bluesky-social/indigo/api/bsky"
13
14
"github.com/bluesky-social/indigo/atproto/syntax"
15
+
lexutil "github.com/bluesky-social/indigo/lex/util"
14
16
"github.com/bluesky-social/indigo/repo"
17
+
jsmodels "github.com/bluesky-social/jetstream/pkg/models"
15
18
"github.com/ipfs/go-cid"
16
19
"github.com/jackc/pgx/v5/pgconn"
17
20
"github.com/prometheus/client_golang/prometheus"
···
63
66
return fmt.Errorf("failed to update rev: %w", err)
64
67
}
65
68
*/
69
+
70
+
return nil
71
+
}
72
+
73
+
func cborBytesFromEvent(evt *jsmodels.Event) ([]byte, error) {
74
+
val, err := lexutil.NewFromType(evt.Commit.Collection)
75
+
if err != nil {
76
+
return nil, fmt.Errorf("failed to load event record type: %w", err)
77
+
}
78
+
79
+
if err := json.Unmarshal(evt.Commit.Record, val); err != nil {
80
+
return nil, err
81
+
}
82
+
83
+
cval, ok := val.(lexutil.CBOR)
84
+
if !ok {
85
+
return nil, fmt.Errorf("decoded type was not cbor marshalable")
86
+
}
87
+
88
+
buf := new(bytes.Buffer)
89
+
if err := cval.MarshalCBOR(buf); err != nil {
90
+
return nil, fmt.Errorf("failed to marshal event to cbor: %w", err)
91
+
}
92
+
93
+
rec := buf.Bytes()
94
+
return rec, nil
95
+
}
96
+
97
+
func (b *PostgresBackend) HandleEventJetstream(ctx context.Context, evt *jsmodels.Event) error {
98
+
99
+
path := evt.Commit.Collection + "/" + evt.Commit.RKey
100
+
switch evt.Commit.Operation {
101
+
case jsmodels.CommitOperationCreate:
102
+
rec, err := cborBytesFromEvent(evt)
103
+
if err != nil {
104
+
return err
105
+
}
106
+
107
+
c, err := cid.Decode(evt.Commit.CID)
108
+
if err != nil {
109
+
return err
110
+
}
111
+
112
+
if err := b.HandleCreate(ctx, evt.Did, evt.Commit.Rev, path, &rec, &c); err != nil {
113
+
return fmt.Errorf("create record failed: %w", err)
114
+
}
115
+
case jsmodels.CommitOperationUpdate:
116
+
rec, err := cborBytesFromEvent(evt)
117
+
if err != nil {
118
+
return err
119
+
}
120
+
121
+
c, err := cid.Decode(evt.Commit.CID)
122
+
if err != nil {
123
+
return err
124
+
}
125
+
126
+
if err := b.HandleUpdate(ctx, evt.Did, evt.Commit.Rev, path, &rec, &c); err != nil {
127
+
return fmt.Errorf("update record failed: %w", err)
128
+
}
129
+
case jsmodels.CommitOperationDelete:
130
+
if err := b.HandleDelete(ctx, evt.Did, evt.Commit.Rev, path); err != nil {
131
+
return fmt.Errorf("delete record failed: %w", err)
132
+
}
133
+
}
66
134
67
135
return nil
68
136
}
+211
backend/missing.go
+211
backend/missing.go
···
1
+
package backend
2
+
3
+
import (
4
+
"bytes"
5
+
"context"
6
+
"fmt"
7
+
"log/slog"
8
+
9
+
"github.com/bluesky-social/indigo/api/atproto"
10
+
"github.com/bluesky-social/indigo/api/bsky"
11
+
"github.com/bluesky-social/indigo/atproto/syntax"
12
+
xrpclib "github.com/bluesky-social/indigo/xrpc"
13
+
"github.com/ipfs/go-cid"
14
+
)
15
+
16
+
type MissingRecordType string
17
+
18
+
const (
19
+
MissingRecordTypeProfile MissingRecordType = "profile"
20
+
MissingRecordTypePost MissingRecordType = "post"
21
+
MissingRecordTypeFeedGenerator MissingRecordType = "feedgenerator"
22
+
MissingRecordTypeUnknown MissingRecordType = "unknown"
23
+
)
24
+
25
+
type MissingRecord struct {
26
+
Type MissingRecordType
27
+
Identifier string // DID for profiles, AT-URI for posts/feedgens
28
+
Wait bool
29
+
30
+
waitch chan struct{}
31
+
}
32
+
33
+
func (b *PostgresBackend) addMissingRecord(ctx context.Context, rec MissingRecord) {
34
+
if rec.Wait {
35
+
rec.waitch = make(chan struct{})
36
+
}
37
+
38
+
select {
39
+
case b.missingRecords <- rec:
40
+
case <-ctx.Done():
41
+
}
42
+
43
+
if rec.Wait {
44
+
select {
45
+
case <-rec.waitch:
46
+
case <-ctx.Done():
47
+
}
48
+
}
49
+
}
50
+
51
+
func (b *PostgresBackend) missingRecordFetcher() {
52
+
for rec := range b.missingRecords {
53
+
var err error
54
+
switch rec.Type {
55
+
case MissingRecordTypeProfile:
56
+
err = b.fetchMissingProfile(context.TODO(), rec.Identifier)
57
+
case MissingRecordTypePost:
58
+
err = b.fetchMissingPost(context.TODO(), rec.Identifier)
59
+
case MissingRecordTypeFeedGenerator:
60
+
err = b.fetchMissingFeedGenerator(context.TODO(), rec.Identifier)
61
+
default:
62
+
slog.Error("unknown missing record type", "type", rec.Type)
63
+
continue
64
+
}
65
+
66
+
if err != nil {
67
+
slog.Warn("failed to fetch missing record", "type", rec.Type, "identifier", rec.Identifier, "error", err)
68
+
}
69
+
70
+
if rec.Wait {
71
+
close(rec.waitch)
72
+
}
73
+
}
74
+
}
75
+
76
+
func (b *PostgresBackend) fetchMissingProfile(ctx context.Context, did string) error {
77
+
b.AddRelevantDid(did)
78
+
79
+
repo, err := b.GetOrCreateRepo(ctx, did)
80
+
if err != nil {
81
+
return err
82
+
}
83
+
84
+
resp, err := b.dir.LookupDID(ctx, syntax.DID(did))
85
+
if err != nil {
86
+
return err
87
+
}
88
+
89
+
c := &xrpclib.Client{
90
+
Host: resp.PDSEndpoint(),
91
+
}
92
+
93
+
rec, err := atproto.RepoGetRecord(ctx, c, "", "app.bsky.actor.profile", did, "self")
94
+
if err != nil {
95
+
return err
96
+
}
97
+
98
+
prof, ok := rec.Value.Val.(*bsky.ActorProfile)
99
+
if !ok {
100
+
return fmt.Errorf("record we got back wasnt a profile somehow")
101
+
}
102
+
103
+
buf := new(bytes.Buffer)
104
+
if err := prof.MarshalCBOR(buf); err != nil {
105
+
return err
106
+
}
107
+
108
+
cc, err := cid.Decode(*rec.Cid)
109
+
if err != nil {
110
+
return err
111
+
}
112
+
113
+
return b.HandleUpdateProfile(ctx, repo, "self", "", buf.Bytes(), cc)
114
+
}
115
+
116
+
func (b *PostgresBackend) fetchMissingPost(ctx context.Context, uri string) error {
117
+
puri, err := syntax.ParseATURI(uri)
118
+
if err != nil {
119
+
return fmt.Errorf("invalid AT URI: %s", uri)
120
+
}
121
+
122
+
did := puri.Authority().String()
123
+
collection := puri.Collection().String()
124
+
rkey := puri.RecordKey().String()
125
+
126
+
b.AddRelevantDid(did)
127
+
128
+
repo, err := b.GetOrCreateRepo(ctx, did)
129
+
if err != nil {
130
+
return err
131
+
}
132
+
133
+
resp, err := b.dir.LookupDID(ctx, syntax.DID(did))
134
+
if err != nil {
135
+
return err
136
+
}
137
+
138
+
c := &xrpclib.Client{
139
+
Host: resp.PDSEndpoint(),
140
+
}
141
+
142
+
rec, err := atproto.RepoGetRecord(ctx, c, "", collection, did, rkey)
143
+
if err != nil {
144
+
return err
145
+
}
146
+
147
+
post, ok := rec.Value.Val.(*bsky.FeedPost)
148
+
if !ok {
149
+
return fmt.Errorf("record we got back wasn't a post somehow")
150
+
}
151
+
152
+
buf := new(bytes.Buffer)
153
+
if err := post.MarshalCBOR(buf); err != nil {
154
+
return err
155
+
}
156
+
157
+
cc, err := cid.Decode(*rec.Cid)
158
+
if err != nil {
159
+
return err
160
+
}
161
+
162
+
return b.HandleCreatePost(ctx, repo, rkey, buf.Bytes(), cc)
163
+
}
164
+
165
+
func (b *PostgresBackend) fetchMissingFeedGenerator(ctx context.Context, uri string) error {
166
+
puri, err := syntax.ParseATURI(uri)
167
+
if err != nil {
168
+
return fmt.Errorf("invalid AT URI: %s", uri)
169
+
}
170
+
171
+
did := puri.Authority().String()
172
+
collection := puri.Collection().String()
173
+
rkey := puri.RecordKey().String()
174
+
b.AddRelevantDid(did)
175
+
176
+
repo, err := b.GetOrCreateRepo(ctx, did)
177
+
if err != nil {
178
+
return err
179
+
}
180
+
181
+
resp, err := b.dir.LookupDID(ctx, syntax.DID(did))
182
+
if err != nil {
183
+
return err
184
+
}
185
+
186
+
c := &xrpclib.Client{
187
+
Host: resp.PDSEndpoint(),
188
+
}
189
+
190
+
rec, err := atproto.RepoGetRecord(ctx, c, "", collection, did, rkey)
191
+
if err != nil {
192
+
return err
193
+
}
194
+
195
+
feedGen, ok := rec.Value.Val.(*bsky.FeedGenerator)
196
+
if !ok {
197
+
return fmt.Errorf("record we got back wasn't a feed generator somehow")
198
+
}
199
+
200
+
buf := new(bytes.Buffer)
201
+
if err := feedGen.MarshalCBOR(buf); err != nil {
202
+
return err
203
+
}
204
+
205
+
cc, err := cid.Decode(*rec.Cid)
206
+
if err != nil {
207
+
return err
208
+
}
209
+
210
+
return b.HandleCreateFeedGenerator(ctx, repo, rkey, buf.Bytes(), cc)
211
+
}
+6
-5
go.mod
+6
-5
go.mod
···
3
3
go 1.25.1
4
4
5
5
require (
6
-
github.com/bluesky-social/indigo v0.0.0-20250909204019-c5eaa30f683f
6
+
github.com/bluesky-social/indigo v0.0.0-20251009212240-20524de167fe
7
+
github.com/bluesky-social/jetstream v0.0.0-20251009222037-7d7efa58d7f1
7
8
github.com/gorilla/websocket v1.5.1
8
9
github.com/hashicorp/golang-lru/v2 v2.0.7
9
10
github.com/ipfs/go-cid v0.4.1
···
60
61
github.com/ipfs/go-metrics-interface v0.0.1 // indirect
61
62
github.com/ipfs/go-peertaskqueue v0.8.1 // indirect
62
63
github.com/ipfs/go-verifcid v0.0.3 // indirect
63
-
github.com/ipld/go-car v0.6.1-0.20230509095817-92d28eb23ba4 // indirect
64
+
github.com/ipld/go-car v0.6.2 // indirect
64
65
github.com/ipld/go-codec-dagpb v1.6.0 // indirect
65
66
github.com/ipld/go-ipld-prime v0.21.0 // indirect
66
67
github.com/jackc/pgpassfile v1.0.0 // indirect
···
69
70
github.com/jbenet/goprocess v0.1.4 // indirect
70
71
github.com/jinzhu/inflection v1.0.0 // indirect
71
72
github.com/jinzhu/now v1.1.5 // indirect
72
-
github.com/klauspost/compress v1.17.3 // indirect
73
+
github.com/klauspost/compress v1.17.9 // indirect
73
74
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
74
75
github.com/lestrrat-go/blackmagic v1.0.1 // indirect
75
76
github.com/lestrrat-go/httpcc v1.0.1 // indirect
···
91
92
github.com/orandin/slog-gorm v1.3.2 // indirect
92
93
github.com/polydawn/refmt v0.89.1-0.20221221234430-40501e09de1f // indirect
93
94
github.com/prometheus/client_model v0.6.1 // indirect
94
-
github.com/prometheus/common v0.48.0 // indirect
95
-
github.com/prometheus/procfs v0.12.0 // indirect
95
+
github.com/prometheus/common v0.54.0 // indirect
96
+
github.com/prometheus/procfs v0.15.1 // indirect
96
97
github.com/redis/go-redis/v9 v9.3.0 // indirect
97
98
github.com/russross/blackfriday/v2 v2.1.0 // indirect
98
99
github.com/segmentio/asm v1.2.0 // indirect
+12
-10
go.sum
+12
-10
go.sum
···
6
6
github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
7
7
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
8
8
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
9
-
github.com/bluesky-social/indigo v0.0.0-20250909204019-c5eaa30f683f h1:FugOoTzh0nCMTWGqNGsjttFWVPcwxaaGD3p/nE9V8qY=
10
-
github.com/bluesky-social/indigo v0.0.0-20250909204019-c5eaa30f683f/go.mod h1:n6QE1NDPFoi7PRbMUZmc2y7FibCqiVU4ePpsvhHUBR8=
9
+
github.com/bluesky-social/indigo v0.0.0-20251009212240-20524de167fe h1:VBhaqE5ewQgXbY5SfSWFZC/AwHFo7cHxZKFYi2ce9Yo=
10
+
github.com/bluesky-social/indigo v0.0.0-20251009212240-20524de167fe/go.mod h1:RuQVrCGm42QNsgumKaR6se+XkFKfCPNwdCiTvqKRUck=
11
+
github.com/bluesky-social/jetstream v0.0.0-20251009222037-7d7efa58d7f1 h1:ovcRKN1iXZnY5WApVg+0Hw2RkwMH0ziA7lSAA8vellU=
12
+
github.com/bluesky-social/jetstream v0.0.0-20251009222037-7d7efa58d7f1/go.mod h1:5PtGi4r/PjEVBBl+0xWuQn4mBEjr9h6xsfDBADS6cHs=
11
13
github.com/bradfitz/gomemcache v0.0.0-20230905024940-24af94b03874 h1:N7oVaKyGp8bttX0bfZGmcGkjz7DLQXhAn3DNd3T0ous=
12
14
github.com/bradfitz/gomemcache v0.0.0-20230905024940-24af94b03874/go.mod h1:r5xuitiExdLAJ09PR7vBVENGvp4ZuTBeWTGtxuX3K+c=
13
15
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
···
158
160
github.com/ipfs/go-peertaskqueue v0.8.1/go.mod h1:Oxxd3eaK279FxeydSPPVGHzbwVeHjatZ2GA8XD+KbPU=
159
161
github.com/ipfs/go-verifcid v0.0.3 h1:gmRKccqhWDocCRkC+a59g5QW7uJw5bpX9HWBevXa0zs=
160
162
github.com/ipfs/go-verifcid v0.0.3/go.mod h1:gcCtGniVzelKrbk9ooUSX/pM3xlH73fZZJDzQJRvOUw=
161
-
github.com/ipld/go-car v0.6.1-0.20230509095817-92d28eb23ba4 h1:oFo19cBmcP0Cmg3XXbrr0V/c+xU9U1huEZp8+OgBzdI=
162
-
github.com/ipld/go-car v0.6.1-0.20230509095817-92d28eb23ba4/go.mod h1:6nkFF8OmR5wLKBzRKi7/YFJpyYR7+oEn1DX+mMWnlLA=
163
+
github.com/ipld/go-car v0.6.2 h1:Hlnl3Awgnq8icK+ze3iRghk805lu8YNq3wlREDTF2qc=
164
+
github.com/ipld/go-car v0.6.2/go.mod h1:oEGXdwp6bmxJCZ+rARSkDliTeYnVzv3++eXajZ+Bmr8=
163
165
github.com/ipld/go-car/v2 v2.13.1 h1:KnlrKvEPEzr5IZHKTXLAEub+tPrzeAFQVRlSQvuxBO4=
164
166
github.com/ipld/go-car/v2 v2.13.1/go.mod h1:QkdjjFNGit2GIkpQ953KBwowuoukoM75nP/JI1iDJdo=
165
167
github.com/ipld/go-codec-dagpb v1.6.0 h1:9nYazfyu9B1p3NAgfVdpRco3Fs2nFC72DqVsMj6rOcc=
···
188
190
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
189
191
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
190
192
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
191
-
github.com/klauspost/compress v1.17.3 h1:qkRjuerhUU1EmXLYGkSH6EZL+vPSxIrYjLNAK4slzwA=
192
-
github.com/klauspost/compress v1.17.3/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
193
+
github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA=
194
+
github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
193
195
github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM=
194
196
github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
195
197
github.com/koron/go-ssdp v0.0.3 h1:JivLMY45N76b4p/vsWGOKewBQu6uf39y8l+AQ7sDKx8=
···
312
314
github.com/prometheus/client_golang v1.19.1/go.mod h1:mP78NwGzrVks5S2H6ab8+ZZGJLZUq1hoULYBAYBw1Ho=
313
315
github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E=
314
316
github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY=
315
-
github.com/prometheus/common v0.48.0 h1:QO8U2CdOzSn1BBsmXJXduaaW+dY/5QLjfB8svtSzKKE=
316
-
github.com/prometheus/common v0.48.0/go.mod h1:0/KsvlIEfPQCQ5I2iNSAWKPZziNCvRs5EC6ILDTlAPc=
317
-
github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo=
318
-
github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo=
317
+
github.com/prometheus/common v0.54.0 h1:ZlZy0BgJhTwVZUn7dLOkwCZHUkrAqd3WYtcFCWnM1D8=
318
+
github.com/prometheus/common v0.54.0/go.mod h1:/TQgMJP5CuVYveyT7n/0Ix8yLNNXy9yRSkhnLTHPDIQ=
319
+
github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc=
320
+
github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
319
321
github.com/redis/go-redis/v9 v9.0.0-rc.4/go.mod h1:Vo3EsyWnicKnSKCA7HhgnvnyA74wOA69Cd2Meli5mmA=
320
322
github.com/redis/go-redis/v9 v9.3.0 h1:RiVDjmig62jIWp7Kk4XVLs0hzV6pI3PyTnnL0cnn0u0=
321
323
github.com/redis/go-redis/v9 v9.3.0/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
+8
-8
handlers.go
+8
-8
handlers.go
···
146
146
}
147
147
148
148
if profile.Raw == nil || len(profile.Raw) == 0 {
149
-
s.addMissingProfile(ctx, accdid)
149
+
s.backend.TrackMissingRecord(accdid, false)
150
150
return e.JSON(404, map[string]any{
151
151
"error": "missing profile info for user",
152
152
})
···
307
307
}
308
308
309
309
if profile.Raw == nil || len(profile.Raw) == 0 {
310
-
s.addMissingProfile(ctx, r.Did)
310
+
s.backend.TrackMissingRecord(r.Did, false)
311
311
return &authorInfo{
312
312
Handle: resp.Handle.String(),
313
313
Did: r.Did,
···
379
379
380
380
uri := fmt.Sprintf("at://%s/app.bsky.feed.post/%s", r.Did, p.Rkey)
381
381
if len(p.Raw) == 0 || p.NotFound {
382
-
s.addMissingPost(ctx, uri)
382
+
s.backend.TrackMissingRecord(uri, false)
383
383
posts[ix] = postResponse{
384
384
Uri: uri,
385
385
Missing: true,
···
515
515
quotedPost, err := s.backend.GetPostByUri(ctx, quotedURI, "*")
516
516
if err != nil {
517
517
slog.Warn("failed to get quoted post", "uri", quotedURI, "error", err)
518
-
s.addMissingPost(ctx, quotedURI)
518
+
s.backend.TrackMissingRecord(quotedURI, false)
519
519
return s.buildQuoteFallback(quotedURI, quotedCid)
520
520
}
521
521
522
522
if quotedPost == nil || quotedPost.Raw == nil || len(quotedPost.Raw) == 0 || quotedPost.NotFound {
523
-
s.addMissingPost(ctx, quotedURI)
523
+
s.backend.TrackMissingRecord(quotedURI, false)
524
524
return s.buildQuoteFallback(quotedURI, quotedCid)
525
525
}
526
526
···
707
707
prof = &p
708
708
}
709
709
} else {
710
-
s.addMissingProfile(ctx, r.Did)
710
+
s.backend.TrackMissingRecord(r.Did, false)
711
711
}
712
712
713
713
users = append(users, engagementUser{
···
767
767
prof = &p
768
768
}
769
769
} else {
770
-
s.addMissingProfile(ctx, r.Did)
770
+
s.backend.TrackMissingRecord(r.Did, false)
771
771
}
772
772
773
773
users = append(users, engagementUser{
···
835
835
prof = &p
836
836
}
837
837
} else {
838
-
s.addMissingProfile(ctx, r.Did)
838
+
s.backend.TrackMissingRecord(r.Did, false)
839
839
}
840
840
841
841
users = append(users, engagementUser{
+16
-10
hydration/post.go
+16
-10
hydration/post.go
···
17
17
18
18
// PostInfo contains hydrated post information
19
19
type PostInfo struct {
20
+
ID uint
20
21
URI string
21
22
Cid string
22
23
Post *bsky.FeedPost
···
39
40
ctx, span := tracer.Start(ctx, "hydratePost")
40
41
defer span.End()
41
42
43
+
p, err := h.backend.GetPostByUri(ctx, uri, "*")
44
+
if err != nil {
45
+
return nil, err
46
+
}
47
+
48
+
return h.HydratePostDB(ctx, uri, p, viewerDID)
49
+
}
50
+
51
+
func (h *Hydrator) HydratePostDB(ctx context.Context, uri string, dbPost *models.Post, viewerDID string) (*PostInfo, error) {
42
52
autoFetch, _ := ctx.Value("auto-fetch").(bool)
43
53
44
54
authorDid := extractDIDFromURI(uri)
···
47
57
return nil, err
48
58
}
49
59
50
-
// Query post from database
51
-
var dbPost models.Post
52
-
if err := h.db.Raw(`SELECT * FROM posts WHERE author = ? AND rkey = ? `, r.ID, extractRkeyFromURI(uri)).Scan(&dbPost).Error; err != nil {
53
-
return nil, fmt.Errorf("failed to query post: %w", err)
54
-
}
55
-
56
60
if dbPost.NotFound || len(dbPost.Raw) == 0 {
57
61
if autoFetch {
58
62
h.AddMissingRecord(uri, true)
59
-
if err := h.db.Raw(`SELECT * FROM posts WHERE author = ? AND rkey = ? `, r.ID, extractRkeyFromURI(uri)).Scan(&dbPost).Error; err != nil {
63
+
if err := h.db.Raw(`SELECT * FROM posts WHERE author = ? AND rkey = ?`, r.ID, extractRkeyFromURI(uri)).Scan(&dbPost).Error; err != nil {
60
64
return nil, fmt.Errorf("failed to query post: %w", err)
61
65
}
62
66
if dbPost.NotFound || len(dbPost.Raw) == 0 {
···
75
79
76
80
var wg sync.WaitGroup
77
81
78
-
// Get author DID
79
-
80
-
authorDID := extractDIDFromURI(uri)
82
+
authorDID := r.Did
81
83
82
84
// Get engagement counts
83
85
var likes, reposts, replies int
···
121
123
wg.Wait()
122
124
123
125
info := &PostInfo{
126
+
ID: dbPost.ID,
124
127
URI: uri,
125
128
Cid: dbPost.Cid,
126
129
Post: &feedPost,
···
385
388
386
389
// hydrateEmbeddedRecord hydrates an embedded record (for quote posts, etc.)
387
390
func (h *Hydrator) hydrateEmbeddedRecord(ctx context.Context, uri string, viewerDID string) *bsky.EmbedRecord_View_Record {
391
+
ctx, span := tracer.Start(ctx, "hydrateEmbeddedRecord")
392
+
defer span.End()
393
+
388
394
// Check if it's a post URI
389
395
if !isPostURI(uri) {
390
396
// Could be a feed generator, list, labeler, or starter pack
+10
hydration/utils.go
+10
hydration/utils.go
···
5
5
"fmt"
6
6
7
7
"github.com/bluesky-social/indigo/atproto/syntax"
8
+
"github.com/whyrusleeping/market/models"
8
9
)
9
10
10
11
func (h *Hydrator) NormalizeUri(ctx context.Context, uri string) (string, error) {
···
27
28
28
29
return fmt.Sprintf("at://%s/%s/%s", did, puri.Collection().String(), puri.RecordKey().String()), nil
29
30
}
31
+
32
+
func (h *Hydrator) UriForPost(ctx context.Context, p *models.Post) (string, error) {
33
+
did, err := h.backend.DidFromID(ctx, p.Author)
34
+
if err != nil {
35
+
return "", err
36
+
}
37
+
38
+
return fmt.Sprintf("at://%s/app.bsky.feed.post/%s", did, p.Rkey), nil
39
+
}
+39
-103
main.go
+39
-103
main.go
···
3
3
import (
4
4
"bytes"
5
5
"context"
6
+
"encoding/json"
6
7
"fmt"
7
8
"log"
8
9
"log/slog"
···
19
20
"github.com/bluesky-social/indigo/atproto/identity"
20
21
"github.com/bluesky-social/indigo/atproto/identity/redisdir"
21
22
"github.com/bluesky-social/indigo/atproto/syntax"
22
-
"github.com/bluesky-social/indigo/cmd/relay/stream"
23
-
"github.com/bluesky-social/indigo/cmd/relay/stream/schedulers/parallel"
24
23
"github.com/bluesky-social/indigo/repo"
25
24
"github.com/bluesky-social/indigo/util/cliutil"
26
25
xrpclib "github.com/bluesky-social/indigo/xrpc"
27
-
"github.com/gorilla/websocket"
28
26
"github.com/ipfs/go-cid"
29
27
"github.com/jackc/pgx/v5/pgxpool"
30
28
"github.com/prometheus/client_golang/prometheus"
···
71
69
&cli.StringFlag{
72
70
Name: "redis-url",
73
71
},
72
+
&cli.StringFlag{
73
+
Name: "sync-config",
74
+
},
74
75
}
75
76
app.Action = func(cctx *cli.Context) error {
76
77
db, err := cliutil.SetupDatabase(cctx.String("db-url"), cctx.Int("max-db-connections"))
···
135
136
db.AutoMigrate(SequenceTracker{})
136
137
db.Exec("CREATE INDEX IF NOT EXISTS reposts_subject_idx ON reposts (subject)")
137
138
db.Exec("CREATE INDEX IF NOT EXISTS posts_reply_to_idx ON posts (reply_to)")
139
+
db.Exec("CREATE INDEX IF NOT EXISTS posts_in_thread_idx ON posts (in_thread)")
138
140
139
141
ctx := context.TODO()
140
142
···
199
201
client: cc,
200
202
dir: dir,
201
203
202
-
missingRecords: make(chan MissingRecord, 1024),
203
-
db: db,
204
+
db: db,
204
205
}
205
-
fmt.Println("MY DID: ", s.mydid)
206
206
207
-
pgb, err := backend.NewPostgresBackend(mydid, db, pool, cc, nil)
207
+
pgb, err := backend.NewPostgresBackend(mydid, db, pool, cc, dir)
208
208
if err != nil {
209
209
return err
210
210
}
···
241
241
http.ListenAndServe(":4445", nil)
242
242
}()
243
243
244
-
go s.missingRecordFetcher()
244
+
sc := SyncConfig{
245
+
Backends: []SyncBackend{
246
+
{
247
+
Type: "firehose",
248
+
Host: "bsky.network",
249
+
},
250
+
},
251
+
}
245
252
246
-
seqno, err := loadLastSeq(db, "firehose_seq")
247
-
if err != nil {
248
-
fmt.Println("failed to load sequence number, starting over", err)
253
+
if scfn := cctx.String("sync-config"); scfn != "" {
254
+
{
255
+
scfi, err := os.Open(scfn)
256
+
if err != nil {
257
+
return err
258
+
}
259
+
defer scfi.Close()
260
+
261
+
var lsc SyncConfig
262
+
if err := json.NewDecoder(scfi).Decode(&lsc); err != nil {
263
+
return err
264
+
}
265
+
sc = lsc
266
+
}
249
267
}
250
268
251
-
return s.startLiveTail(ctx, int(seqno), 10, 20)
269
+
/*
270
+
sc.Backends[0] = SyncBackend{
271
+
Type: "jetstream",
272
+
Host: "jetstream1.us-west.bsky.network",
273
+
}
274
+
*/
275
+
276
+
return s.StartSyncEngine(ctx, &sc)
277
+
252
278
}
253
279
254
280
app.RunAndExitOnError()
···
266
292
seqLk sync.Mutex
267
293
lastSeq int64
268
294
269
-
mpLk sync.Mutex
270
-
missingRecords chan MissingRecord
295
+
mpLk sync.Mutex
271
296
272
297
db *gorm.DB
273
298
}
···
275
300
func (s *Server) getXrpcClient() (*xrpclib.Client, error) {
276
301
// TODO: handle refreshing the token periodically
277
302
return s.client, nil
278
-
}
279
-
280
-
func (s *Server) startLiveTail(ctx context.Context, curs int, parWorkers, maxQ int) error {
281
-
slog.Info("starting live tail")
282
-
283
-
// Connect to the Relay websocket
284
-
urlStr := fmt.Sprintf("wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos?cursor=%d", curs)
285
-
286
-
d := websocket.DefaultDialer
287
-
con, _, err := d.Dial(urlStr, http.Header{
288
-
"User-Agent": []string{"market/0.0.1"},
289
-
})
290
-
if err != nil {
291
-
return fmt.Errorf("failed to connect to relay: %w", err)
292
-
}
293
-
294
-
var lelk sync.Mutex
295
-
lastEvent := time.Now()
296
-
297
-
go func() {
298
-
for range time.Tick(time.Second) {
299
-
lelk.Lock()
300
-
let := lastEvent
301
-
lelk.Unlock()
302
-
303
-
if time.Since(let) > time.Second*30 {
304
-
slog.Error("firehose connection timed out")
305
-
con.Close()
306
-
return
307
-
}
308
-
309
-
}
310
-
311
-
}()
312
-
313
-
var cclk sync.Mutex
314
-
var completeCursor int64
315
-
316
-
rsc := &stream.RepoStreamCallbacks{
317
-
RepoCommit: func(evt *atproto.SyncSubscribeRepos_Commit) error {
318
-
ctx := context.Background()
319
-
320
-
firehoseCursorGauge.WithLabelValues("ingest").Set(float64(evt.Seq))
321
-
322
-
s.seqLk.Lock()
323
-
if evt.Seq > s.lastSeq {
324
-
curs = int(evt.Seq)
325
-
s.lastSeq = evt.Seq
326
-
327
-
if evt.Seq%1000 == 0 {
328
-
if err := storeLastSeq(s.db, "firehose_seq", evt.Seq); err != nil {
329
-
fmt.Println("failed to store seqno: ", err)
330
-
}
331
-
}
332
-
}
333
-
s.seqLk.Unlock()
334
-
335
-
lelk.Lock()
336
-
lastEvent = time.Now()
337
-
lelk.Unlock()
338
-
339
-
if err := s.backend.HandleEvent(ctx, evt); err != nil {
340
-
return fmt.Errorf("handle event (%s,%d): %w", evt.Repo, evt.Seq, err)
341
-
}
342
-
343
-
cclk.Lock()
344
-
if evt.Seq > completeCursor {
345
-
completeCursor = evt.Seq
346
-
firehoseCursorGauge.WithLabelValues("complete").Set(float64(evt.Seq))
347
-
}
348
-
cclk.Unlock()
349
-
350
-
return nil
351
-
},
352
-
RepoInfo: func(info *atproto.SyncSubscribeRepos_Info) error {
353
-
return nil
354
-
},
355
-
// TODO: all the other event types
356
-
Error: func(errf *stream.ErrorFrame) error {
357
-
return fmt.Errorf("error frame: %s: %s", errf.Error, errf.Message)
358
-
},
359
-
}
360
-
361
-
sched := parallel.NewScheduler(parWorkers, maxQ, con.RemoteAddr().String(), rsc.EventHandler)
362
-
363
-
//s.eventScheduler = sched
364
-
//s.streamFinished = make(chan struct{})
365
-
366
-
return stream.HandleRepoStream(ctx, con, sched, slog.Default())
367
303
}
368
304
369
305
func (s *Server) resolveAccountIdent(ctx context.Context, acc string) (string, error) {
-234
missing.go
-234
missing.go
···
1
-
package main
2
-
3
-
import (
4
-
"bytes"
5
-
"context"
6
-
"fmt"
7
-
"log/slog"
8
-
9
-
"github.com/bluesky-social/indigo/api/atproto"
10
-
"github.com/bluesky-social/indigo/api/bsky"
11
-
"github.com/bluesky-social/indigo/atproto/syntax"
12
-
xrpclib "github.com/bluesky-social/indigo/xrpc"
13
-
"github.com/ipfs/go-cid"
14
-
)
15
-
16
-
type MissingRecordType string
17
-
18
-
const (
19
-
MissingRecordTypeProfile MissingRecordType = "profile"
20
-
MissingRecordTypePost MissingRecordType = "post"
21
-
MissingRecordTypeFeedGenerator MissingRecordType = "feedgenerator"
22
-
)
23
-
24
-
type MissingRecord struct {
25
-
Type MissingRecordType
26
-
Identifier string // DID for profiles, AT-URI for posts/feedgens
27
-
Wait bool
28
-
29
-
waitch chan struct{}
30
-
}
31
-
32
-
func (s *Server) addMissingRecord(ctx context.Context, rec MissingRecord) {
33
-
if rec.Wait {
34
-
rec.waitch = make(chan struct{})
35
-
}
36
-
37
-
select {
38
-
case s.missingRecords <- rec:
39
-
case <-ctx.Done():
40
-
}
41
-
42
-
if rec.Wait {
43
-
select {
44
-
case <-rec.waitch:
45
-
case <-ctx.Done():
46
-
}
47
-
}
48
-
}
49
-
50
-
// Legacy methods for backward compatibility
51
-
func (s *Server) addMissingProfile(ctx context.Context, did string) {
52
-
s.addMissingRecord(ctx, MissingRecord{
53
-
Type: MissingRecordTypeProfile,
54
-
Identifier: did,
55
-
})
56
-
}
57
-
58
-
func (s *Server) addMissingPost(ctx context.Context, uri string) {
59
-
slog.Info("adding missing post to fetch queue", "uri", uri)
60
-
s.addMissingRecord(ctx, MissingRecord{
61
-
Type: MissingRecordTypePost,
62
-
Identifier: uri,
63
-
})
64
-
}
65
-
66
-
func (s *Server) addMissingFeedGenerator(ctx context.Context, uri string) {
67
-
slog.Info("adding missing feed generator to fetch queue", "uri", uri)
68
-
s.addMissingRecord(ctx, MissingRecord{
69
-
Type: MissingRecordTypeFeedGenerator,
70
-
Identifier: uri,
71
-
})
72
-
}
73
-
74
-
func (s *Server) missingRecordFetcher() {
75
-
for rec := range s.missingRecords {
76
-
var err error
77
-
switch rec.Type {
78
-
case MissingRecordTypeProfile:
79
-
err = s.fetchMissingProfile(context.TODO(), rec.Identifier)
80
-
case MissingRecordTypePost:
81
-
err = s.fetchMissingPost(context.TODO(), rec.Identifier)
82
-
case MissingRecordTypeFeedGenerator:
83
-
err = s.fetchMissingFeedGenerator(context.TODO(), rec.Identifier)
84
-
default:
85
-
slog.Error("unknown missing record type", "type", rec.Type)
86
-
continue
87
-
}
88
-
89
-
if err != nil {
90
-
slog.Warn("failed to fetch missing record", "type", rec.Type, "identifier", rec.Identifier, "error", err)
91
-
}
92
-
93
-
if rec.Wait {
94
-
close(rec.waitch)
95
-
}
96
-
}
97
-
}
98
-
99
-
func (s *Server) fetchMissingProfile(ctx context.Context, did string) error {
100
-
s.backend.AddRelevantDid(did)
101
-
102
-
repo, err := s.backend.GetOrCreateRepo(ctx, did)
103
-
if err != nil {
104
-
return err
105
-
}
106
-
107
-
resp, err := s.dir.LookupDID(ctx, syntax.DID(did))
108
-
if err != nil {
109
-
return err
110
-
}
111
-
112
-
c := &xrpclib.Client{
113
-
Host: resp.PDSEndpoint(),
114
-
}
115
-
116
-
rec, err := atproto.RepoGetRecord(ctx, c, "", "app.bsky.actor.profile", did, "self")
117
-
if err != nil {
118
-
return err
119
-
}
120
-
121
-
prof, ok := rec.Value.Val.(*bsky.ActorProfile)
122
-
if !ok {
123
-
return fmt.Errorf("record we got back wasnt a profile somehow")
124
-
}
125
-
126
-
buf := new(bytes.Buffer)
127
-
if err := prof.MarshalCBOR(buf); err != nil {
128
-
return err
129
-
}
130
-
131
-
cc, err := cid.Decode(*rec.Cid)
132
-
if err != nil {
133
-
return err
134
-
}
135
-
136
-
return s.backend.HandleUpdateProfile(ctx, repo, "self", "", buf.Bytes(), cc)
137
-
}
138
-
139
-
func (s *Server) fetchMissingPost(ctx context.Context, uri string) error {
140
-
puri, err := syntax.ParseATURI(uri)
141
-
if err != nil {
142
-
return fmt.Errorf("invalid AT URI: %s", uri)
143
-
}
144
-
145
-
did := puri.Authority().String()
146
-
collection := puri.Collection().String()
147
-
rkey := puri.RecordKey().String()
148
-
149
-
s.backend.AddRelevantDid(did)
150
-
151
-
repo, err := s.backend.GetOrCreateRepo(ctx, did)
152
-
if err != nil {
153
-
return err
154
-
}
155
-
156
-
resp, err := s.dir.LookupDID(ctx, syntax.DID(did))
157
-
if err != nil {
158
-
return err
159
-
}
160
-
161
-
c := &xrpclib.Client{
162
-
Host: resp.PDSEndpoint(),
163
-
}
164
-
165
-
rec, err := atproto.RepoGetRecord(ctx, c, "", collection, did, rkey)
166
-
if err != nil {
167
-
return err
168
-
}
169
-
170
-
post, ok := rec.Value.Val.(*bsky.FeedPost)
171
-
if !ok {
172
-
return fmt.Errorf("record we got back wasn't a post somehow")
173
-
}
174
-
175
-
buf := new(bytes.Buffer)
176
-
if err := post.MarshalCBOR(buf); err != nil {
177
-
return err
178
-
}
179
-
180
-
cc, err := cid.Decode(*rec.Cid)
181
-
if err != nil {
182
-
return err
183
-
}
184
-
185
-
return s.backend.HandleCreatePost(ctx, repo, rkey, buf.Bytes(), cc)
186
-
}
187
-
188
-
func (s *Server) fetchMissingFeedGenerator(ctx context.Context, uri string) error {
189
-
puri, err := syntax.ParseATURI(uri)
190
-
if err != nil {
191
-
return fmt.Errorf("invalid AT URI: %s", uri)
192
-
}
193
-
194
-
did := puri.Authority().String()
195
-
collection := puri.Collection().String()
196
-
rkey := puri.RecordKey().String()
197
-
s.backend.AddRelevantDid(did)
198
-
199
-
repo, err := s.backend.GetOrCreateRepo(ctx, did)
200
-
if err != nil {
201
-
return err
202
-
}
203
-
204
-
resp, err := s.dir.LookupDID(ctx, syntax.DID(did))
205
-
if err != nil {
206
-
return err
207
-
}
208
-
209
-
c := &xrpclib.Client{
210
-
Host: resp.PDSEndpoint(),
211
-
}
212
-
213
-
rec, err := atproto.RepoGetRecord(ctx, c, "", collection, did, rkey)
214
-
if err != nil {
215
-
return err
216
-
}
217
-
218
-
feedGen, ok := rec.Value.Val.(*bsky.FeedGenerator)
219
-
if !ok {
220
-
return fmt.Errorf("record we got back wasn't a feed generator somehow")
221
-
}
222
-
223
-
buf := new(bytes.Buffer)
224
-
if err := feedGen.MarshalCBOR(buf); err != nil {
225
-
return err
226
-
}
227
-
228
-
cc, err := cid.Decode(*rec.Cid)
229
-
if err != nil {
230
-
return err
231
-
}
232
-
233
-
return s.backend.HandleCreateFeedGenerator(ctx, repo, rkey, buf.Bytes(), cc)
234
-
}
+8
sync-config-jetstream.json
+8
sync-config-jetstream.json
+281
sync.go
+281
sync.go
···
1
+
package main
2
+
3
+
import (
4
+
"context"
5
+
"fmt"
6
+
"log/slog"
7
+
"net/http"
8
+
"sync"
9
+
"time"
10
+
11
+
"github.com/bluesky-social/indigo/api/atproto"
12
+
"github.com/bluesky-social/indigo/cmd/relay/stream"
13
+
"github.com/bluesky-social/indigo/cmd/relay/stream/schedulers/parallel"
14
+
jsclient "github.com/bluesky-social/jetstream/pkg/client"
15
+
jsparallel "github.com/bluesky-social/jetstream/pkg/client/schedulers/parallel"
16
+
"github.com/bluesky-social/jetstream/pkg/models"
17
+
"github.com/gorilla/websocket"
18
+
)
19
+
20
+
type SyncConfig struct {
21
+
Backends []SyncBackend `json:"backends"`
22
+
}
23
+
24
+
type SyncBackend struct {
25
+
Type string `json:"type"`
26
+
Host string `json:"host"`
27
+
MaxWorkers int `json:"max_workers,omitempty"`
28
+
}
29
+
30
+
func (s *Server) StartSyncEngine(ctx context.Context, sc *SyncConfig) error {
31
+
for _, be := range sc.Backends {
32
+
switch be.Type {
33
+
case "firehose":
34
+
go s.runSyncFirehose(ctx, be)
35
+
case "jetstream":
36
+
go s.runSyncJetstream(ctx, be)
37
+
default:
38
+
return fmt.Errorf("unrecognized sync backend type: %q", be.Type)
39
+
}
40
+
}
41
+
42
+
<-ctx.Done()
43
+
return fmt.Errorf("exiting sync routine")
44
+
}
45
+
46
+
const failureTimeInterval = time.Second * 5
47
+
48
+
func (s *Server) runSyncFirehose(ctx context.Context, be SyncBackend) {
49
+
var failures int
50
+
for {
51
+
seqno, err := loadLastSeq(s.db, be.Host)
52
+
if err != nil {
53
+
fmt.Println("failed to load sequence number, starting over", err)
54
+
}
55
+
56
+
maxWorkers := 10
57
+
if be.MaxWorkers != 0 {
58
+
maxWorkers = be.MaxWorkers
59
+
}
60
+
61
+
start := time.Now()
62
+
if err := s.startLiveTail(ctx, be.Host, int(seqno), maxWorkers, 20); err != nil {
63
+
slog.Error("firehose connection lost", "host", be.Host, "error", err)
64
+
}
65
+
66
+
elapsed := time.Since(start)
67
+
68
+
if elapsed > failureTimeInterval {
69
+
failures = 0
70
+
continue
71
+
}
72
+
failures++
73
+
74
+
delay := delayForFailureCount(failures)
75
+
slog.Warn("retrying connection after delay", "host", be.Host, "delay", delay)
76
+
}
77
+
}
78
+
79
+
func (s *Server) runSyncJetstream(ctx context.Context, be SyncBackend) {
80
+
var failures int
81
+
for {
82
+
// Load last cursor (stored as sequence number in same table)
83
+
cursor, err := loadLastSeq(s.db, be.Host)
84
+
if err != nil {
85
+
slog.Warn("failed to load jetstream cursor, starting from live", "error", err)
86
+
cursor = 0
87
+
}
88
+
89
+
maxWorkers := 10
90
+
if be.MaxWorkers != 0 {
91
+
maxWorkers = be.MaxWorkers
92
+
}
93
+
94
+
start := time.Now()
95
+
if err := s.startJetstreamTail(ctx, be.Host, cursor, maxWorkers); err != nil {
96
+
slog.Error("jetstream connection lost", "host", be.Host, "error", err)
97
+
}
98
+
99
+
elapsed := time.Since(start)
100
+
101
+
if elapsed > failureTimeInterval {
102
+
failures = 0
103
+
continue
104
+
}
105
+
failures++
106
+
107
+
delay := delayForFailureCount(failures)
108
+
slog.Warn("retrying jetstream connection after delay", "host", be.Host, "delay", delay)
109
+
time.Sleep(delay)
110
+
}
111
+
}
112
+
113
+
func delayForFailureCount(n int) time.Duration {
114
+
if n < 5 {
115
+
return (time.Second * 5) + (time.Second * 2 * time.Duration(n))
116
+
}
117
+
118
+
return time.Second * 30
119
+
}
120
+
121
+
func (s *Server) startLiveTail(ctx context.Context, host string, curs int, parWorkers, maxQ int) error {
122
+
ctx, cancel := context.WithCancel(ctx)
123
+
defer cancel()
124
+
125
+
slog.Info("starting live tail")
126
+
127
+
// Connect to the Relay websocket
128
+
urlStr := fmt.Sprintf("wss://%s/xrpc/com.atproto.sync.subscribeRepos?cursor=%d", host, curs)
129
+
130
+
d := websocket.DefaultDialer
131
+
con, _, err := d.Dial(urlStr, http.Header{
132
+
"User-Agent": []string{"konbini/0.0.1"},
133
+
})
134
+
if err != nil {
135
+
return fmt.Errorf("failed to connect to relay: %w", err)
136
+
}
137
+
138
+
var lelk sync.Mutex
139
+
lastEvent := time.Now()
140
+
141
+
go func() {
142
+
tick := time.NewTicker(time.Second)
143
+
defer tick.Stop()
144
+
for {
145
+
select {
146
+
case <-tick.C:
147
+
lelk.Lock()
148
+
let := lastEvent
149
+
lelk.Unlock()
150
+
151
+
if time.Since(let) > time.Second*30 {
152
+
slog.Error("firehose connection timed out")
153
+
con.Close()
154
+
return
155
+
}
156
+
case <-ctx.Done():
157
+
return
158
+
}
159
+
}
160
+
}()
161
+
162
+
var cclk sync.Mutex
163
+
var completeCursor int64
164
+
165
+
rsc := &stream.RepoStreamCallbacks{
166
+
RepoCommit: func(evt *atproto.SyncSubscribeRepos_Commit) error {
167
+
ctx := context.Background()
168
+
169
+
firehoseCursorGauge.WithLabelValues("ingest").Set(float64(evt.Seq))
170
+
171
+
s.seqLk.Lock()
172
+
if evt.Seq > s.lastSeq {
173
+
curs = int(evt.Seq)
174
+
s.lastSeq = evt.Seq
175
+
176
+
if evt.Seq%1000 == 0 {
177
+
if err := storeLastSeq(s.db, host, evt.Seq); err != nil {
178
+
fmt.Println("failed to store seqno: ", err)
179
+
}
180
+
}
181
+
}
182
+
s.seqLk.Unlock()
183
+
184
+
lelk.Lock()
185
+
lastEvent = time.Now()
186
+
lelk.Unlock()
187
+
188
+
if err := s.backend.HandleEvent(ctx, evt); err != nil {
189
+
return fmt.Errorf("handle event (%s,%d): %w", evt.Repo, evt.Seq, err)
190
+
}
191
+
192
+
cclk.Lock()
193
+
if evt.Seq > completeCursor {
194
+
completeCursor = evt.Seq
195
+
firehoseCursorGauge.WithLabelValues("complete").Set(float64(evt.Seq))
196
+
}
197
+
cclk.Unlock()
198
+
199
+
return nil
200
+
},
201
+
RepoInfo: func(info *atproto.SyncSubscribeRepos_Info) error {
202
+
return nil
203
+
},
204
+
// TODO: all the other event types
205
+
Error: func(errf *stream.ErrorFrame) error {
206
+
return fmt.Errorf("error frame: %s: %s", errf.Error, errf.Message)
207
+
},
208
+
}
209
+
210
+
sched := parallel.NewScheduler(parWorkers, maxQ, con.RemoteAddr().String(), rsc.EventHandler)
211
+
212
+
return stream.HandleRepoStream(ctx, con, sched, slog.Default())
213
+
}
214
+
215
+
func (s *Server) startJetstreamTail(ctx context.Context, host string, cursor int64, parWorkers int) error {
216
+
ctx, cancel := context.WithCancel(ctx)
217
+
defer cancel()
218
+
219
+
slog.Info("starting jetstream tail", "host", host, "cursor", cursor)
220
+
221
+
// Create a scheduler for parallel processing
222
+
lastStored := int64(0)
223
+
sched := jsparallel.NewScheduler(
224
+
parWorkers,
225
+
host,
226
+
slog.Default(),
227
+
func(ctx context.Context, event *models.Event) error {
228
+
// Update cursor tracking
229
+
s.seqLk.Lock()
230
+
if event.TimeUS > s.lastSeq {
231
+
s.lastSeq = event.TimeUS
232
+
if event.TimeUS-lastStored > 1_000_000 {
233
+
// Store checkpoint periodically
234
+
if err := storeLastSeq(s.db, host, event.TimeUS); err != nil {
235
+
slog.Error("failed to store jetstream cursor", "error", err)
236
+
}
237
+
lastStored = event.TimeUS
238
+
}
239
+
}
240
+
s.seqLk.Unlock()
241
+
242
+
// Update metrics
243
+
firehoseCursorGauge.WithLabelValues("ingest").Set(float64(event.TimeUS))
244
+
245
+
// Convert Jetstream event to ATProto event format
246
+
if event.Commit != nil {
247
+
248
+
if err := s.backend.HandleEventJetstream(ctx, event); err != nil {
249
+
return fmt.Errorf("handle event (%s,%d): %w", event.Did, event.TimeUS, err)
250
+
}
251
+
252
+
firehoseCursorGauge.WithLabelValues("complete").Set(float64(event.TimeUS))
253
+
}
254
+
255
+
return nil
256
+
},
257
+
)
258
+
259
+
// Configure Jetstream client
260
+
config := jsclient.DefaultClientConfig()
261
+
config.WebsocketURL = fmt.Sprintf("wss://%s/subscribe", host)
262
+
263
+
// Prepare cursor pointer
264
+
var cursorPtr *int64
265
+
if cursor > 0 {
266
+
cursorPtr = &cursor
267
+
}
268
+
269
+
// Create and connect client
270
+
client, err := jsclient.NewClient(
271
+
config,
272
+
slog.Default(),
273
+
sched,
274
+
)
275
+
if err != nil {
276
+
return fmt.Errorf("create jetstream client: %w", err)
277
+
}
278
+
279
+
// Start reading from Jetstream
280
+
return client.ConnectAndRead(ctx, cursorPtr)
281
+
}
+11
-11
xrpc/feed/getPostThread.go
+11
-11
xrpc/feed/getPostThread.go
···
15
15
func HandleGetPostThread(c echo.Context, db *gorm.DB, hydrator *hydration.Hydrator) error {
16
16
uriParam := c.QueryParam("uri")
17
17
if uriParam == "" {
18
-
return c.JSON(http.StatusBadRequest, map[string]interface{}{
18
+
return c.JSON(http.StatusBadRequest, map[string]any{
19
19
"error": "InvalidRequest",
20
20
"message": "uri parameter is required",
21
21
})
···
27
27
// Hydrate the requested post
28
28
postInfo, err := hydrator.HydratePost(ctx, uriParam, viewer)
29
29
if err != nil {
30
-
return c.JSON(http.StatusNotFound, map[string]interface{}{
30
+
return c.JSON(http.StatusNotFound, map[string]any{
31
31
"error": "NotFound",
32
32
"message": "post not found",
33
33
})
···
74
74
uri: uri,
75
75
replyTo: tp.ReplyTo,
76
76
inThread: tp.InThread,
77
-
replies: []interface{}{},
77
+
replies: []any{},
78
78
}
79
79
}
80
80
···
98
98
}
99
99
100
100
if rootNode == nil {
101
-
return c.JSON(http.StatusNotFound, map[string]interface{}{
101
+
return c.JSON(http.StatusNotFound, map[string]any{
102
102
"error": "NotFound",
103
103
"message": "thread root not found",
104
104
})
···
107
107
// Build the response by traversing the tree
108
108
thread := buildThreadView(ctx, db, rootNode, postsByID, hydrator, viewer, nil)
109
109
110
-
return c.JSON(http.StatusOK, map[string]interface{}{
110
+
return c.JSON(http.StatusOK, map[string]any{
111
111
"thread": thread,
112
112
})
113
113
}
···
117
117
uri string
118
118
replyTo uint
119
119
inThread uint
120
-
replies []interface{}
120
+
replies []any
121
121
}
122
122
123
-
func buildThreadView(ctx context.Context, db *gorm.DB, node *threadPostNode, allNodes map[uint]*threadPostNode, hydrator *hydration.Hydrator, viewer string, parent interface{}) interface{} {
123
+
func buildThreadView(ctx context.Context, db *gorm.DB, node *threadPostNode, allNodes map[uint]*threadPostNode, hydrator *hydration.Hydrator, viewer string, parent any) any {
124
124
// Hydrate this post
125
125
postInfo, err := hydrator.HydratePost(ctx, node.uri, viewer)
126
126
if err != nil {
127
127
// Return a notFound post
128
-
return map[string]interface{}{
128
+
return map[string]any{
129
129
"$type": "app.bsky.feed.defs#notFoundPost",
130
130
"uri": node.uri,
131
131
}
···
134
134
// Hydrate author
135
135
authorInfo, err := hydrator.HydrateActor(ctx, postInfo.Author)
136
136
if err != nil {
137
-
return map[string]interface{}{
137
+
return map[string]any{
138
138
"$type": "app.bsky.feed.defs#notFoundPost",
139
139
"uri": node.uri,
140
140
}
141
141
}
142
142
143
143
// Build replies
144
-
var replies []interface{}
144
+
var replies []any
145
145
for _, replyNode := range node.replies {
146
146
if rn, ok := replyNode.(*threadPostNode); ok {
147
147
replyView := buildThreadView(ctx, db, rn, allNodes, hydrator, viewer, nil)
···
150
150
}
151
151
152
152
// Build the thread view post
153
-
var repliesForView interface{}
153
+
var repliesForView any
154
154
if len(replies) > 0 {
155
155
repliesForView = replies
156
156
}
+12
xrpc/notification/listNotifications.go
+12
xrpc/notification/listNotifications.go
···
131
131
cursorPtr = &cursor
132
132
}
133
133
134
+
var lastSeen time.Time
135
+
if err := db.Raw("SELECT seen_at FROM notification_seens WHERE repo = (select id from repos where did = ?)", viewer).Scan(&lastSeen).Error; err != nil {
136
+
return err
137
+
}
138
+
139
+
var lastSeenStr *string
140
+
if !lastSeen.IsZero() {
141
+
s := lastSeen.Format(time.RFC3339)
142
+
lastSeenStr = &s
143
+
}
144
+
134
145
output := &bsky.NotificationListNotifications_Output{
135
146
Notifications: notifications,
136
147
Cursor: cursorPtr,
148
+
SeenAt: lastSeenStr,
137
149
}
138
150
139
151
return c.JSON(http.StatusOK, output)
+159
-131
xrpc/unspecced/getPostThreadV2.go
+159
-131
xrpc/unspecced/getPostThreadV2.go
···
1
1
package unspecced
2
2
3
3
import (
4
+
"bytes"
4
5
"context"
5
6
"fmt"
6
7
"log/slog"
7
8
"net/http"
8
9
"strconv"
10
+
"sync"
9
11
10
12
"github.com/bluesky-social/indigo/api/bsky"
11
13
"github.com/labstack/echo/v4"
12
14
"github.com/whyrusleeping/konbini/hydration"
13
15
"github.com/whyrusleeping/konbini/views"
16
+
"github.com/whyrusleeping/market/models"
17
+
"go.opentelemetry.io/otel"
14
18
"gorm.io/gorm"
15
19
)
16
20
21
+
var tracer = otel.Tracer("xrpc/unspecced")
22
+
17
23
// HandleGetPostThreadV2 implements app.bsky.unspecced.getPostThreadV2
18
24
func HandleGetPostThreadV2(c echo.Context, db *gorm.DB, hydrator *hydration.Hydrator) error {
19
-
ctx := c.Request().Context()
25
+
ctx, span := tracer.Start(c.Request().Context(), "getPostThreadV2")
26
+
defer span.End()
20
27
ctx = context.WithValue(ctx, "auto-fetch", true)
21
28
22
29
// Parse parameters
···
69
76
})
70
77
}
71
78
72
-
// Determine the root post ID for the thread
73
-
rootPostID := anchorPostInfo.InThread
74
-
if rootPostID == 0 {
75
-
// This post is the root - get its ID
76
-
var postID uint
77
-
db.Raw(`
78
-
SELECT id FROM posts
79
-
WHERE author = (SELECT id FROM repos WHERE did = ?)
80
-
AND rkey = ?
81
-
`, extractDIDFromURI(anchorUri), extractRkeyFromURI(anchorUri)).Scan(&postID)
82
-
rootPostID = postID
79
+
threadID := anchorPostInfo.InThread
80
+
if threadID == 0 {
81
+
threadID = anchorPostInfo.ID
83
82
}
84
83
85
-
// Query all posts in this thread
86
-
type threadPostRow struct {
87
-
ID uint
88
-
Rkey string
89
-
ReplyTo uint
90
-
InThread uint
91
-
AuthorDid string
84
+
var threadPosts []*models.Post
85
+
if err := db.Raw("SELECT * FROM posts WHERE in_thread = ? OR id = ?", threadID, anchorPostInfo.ID).Scan(&threadPosts).Error; err != nil {
86
+
return err
92
87
}
93
-
var threadPosts []threadPostRow
94
-
db.Raw(`
95
-
SELECT p.id, p.rkey, p.reply_to, p.in_thread, r.did as author_did
96
-
FROM posts p
97
-
JOIN repos r ON r.id = p.author
98
-
WHERE (p.id = ? OR p.in_thread = ?)
99
-
AND p.not_found = false
100
-
ORDER BY p.created ASC
101
-
`, rootPostID, rootPostID).Scan(&threadPosts)
102
88
103
-
// Build a map of posts by ID
104
-
postsByID := make(map[uint]*threadNode)
105
-
for _, tp := range threadPosts {
106
-
uri := fmt.Sprintf("at://%s/app.bsky.feed.post/%s", tp.AuthorDid, tp.Rkey)
107
-
postsByID[tp.ID] = &threadNode{
108
-
id: tp.ID,
109
-
uri: uri,
110
-
replyTo: tp.ReplyTo,
111
-
inThread: tp.InThread,
112
-
children: []*threadNode{},
113
-
}
114
-
}
89
+
fmt.Println("GOT THREAD POSTS: ", len(threadPosts))
115
90
116
-
// Build parent-child relationships
117
-
for _, node := range postsByID {
118
-
if node.replyTo != 0 {
119
-
parent := postsByID[node.replyTo]
120
-
if parent != nil {
121
-
parent.children = append(parent.children, node)
122
-
}
123
-
}
91
+
treeNodes, err := buildThreadTree(ctx, hydrator, db, threadPosts)
92
+
if err != nil {
93
+
return fmt.Errorf("failed to construct tree: %w", err)
124
94
}
125
95
126
-
// Find the anchor node
127
-
anchorID := uint(0)
128
-
for id, node := range postsByID {
129
-
if node.uri == anchorUri {
130
-
anchorID = id
131
-
break
132
-
}
133
-
}
134
-
135
-
if anchorID == 0 {
136
-
return c.JSON(http.StatusNotFound, map[string]interface{}{
137
-
"error": "NotFound",
138
-
"message": "anchor post not found in thread",
139
-
})
140
-
}
141
-
142
-
anchorNode := postsByID[anchorID]
96
+
anchor := treeNodes[anchorPostInfo.ID]
143
97
144
98
// Build flat thread items list
145
99
var threadItems []*bsky.UnspeccedGetPostThreadV2_ThreadItem
···
147
101
148
102
// Add parents if requested
149
103
if above {
150
-
parents := collectParents(anchorNode, postsByID)
151
-
for i := len(parents) - 1; i >= 0; i-- {
152
-
depth := int64(-(len(parents) - i))
153
-
item := buildThreadItem(ctx, hydrator, parents[i], depth, viewer)
104
+
parent := anchor.parent
105
+
depth := int64(-1)
106
+
for parent != nil {
107
+
if parent.missing {
108
+
fmt.Println("Parent missing: ", depth)
109
+
item := &bsky.UnspeccedGetPostThreadV2_ThreadItem{
110
+
Depth: depth,
111
+
Uri: parent.uri,
112
+
Value: &bsky.UnspeccedGetPostThreadV2_ThreadItem_Value{
113
+
UnspeccedDefs_ThreadItemNotFound: &bsky.UnspeccedDefs_ThreadItemNotFound{
114
+
LexiconTypeID: "app.bsky.unspecced.defs#threadItemNotFound",
115
+
},
116
+
},
117
+
}
118
+
119
+
threadItems = append(threadItems, item)
120
+
break
121
+
}
122
+
123
+
item := buildThreadItem(ctx, hydrator, parent, depth, viewer)
154
124
if item != nil {
155
125
threadItems = append(threadItems, item)
156
126
}
127
+
128
+
parent = parent.parent
129
+
depth--
157
130
}
158
131
}
159
132
160
133
// Add anchor post (depth 0)
161
-
anchorItem := buildThreadItem(ctx, hydrator, anchorNode, 0, viewer)
134
+
anchorItem := buildThreadItem(ctx, hydrator, anchor, 0, viewer)
162
135
if anchorItem != nil {
163
136
threadItems = append(threadItems, anchorItem)
164
137
}
165
138
166
139
// Add replies below anchor
167
140
if below > 0 {
168
-
replies, hasMore := collectReplies(ctx, hydrator, anchorNode, 1, below, branchingFactor, sort, viewer)
141
+
replies, err := collectReplies(ctx, hydrator, anchor, 0, below, branchingFactor, sort, viewer)
142
+
if err != nil {
143
+
return err
144
+
}
169
145
threadItems = append(threadItems, replies...)
170
-
hasOtherReplies = hasMore
146
+
//hasOtherReplies = hasMore
171
147
}
172
148
173
149
return c.JSON(http.StatusOK, &bsky.UnspeccedGetPostThreadV2_Output{
···
176
152
})
177
153
}
178
154
179
-
type threadNode struct {
180
-
id uint
181
-
uri string
182
-
replyTo uint
183
-
inThread uint
184
-
children []*threadNode
185
-
}
155
+
func collectReplies(ctx context.Context, hydrator *hydration.Hydrator, curnode *threadTree, depth int64, below int64, branchingFactor int64, sort string, viewer string) ([]*bsky.UnspeccedGetPostThreadV2_ThreadItem, error) {
156
+
if below == 0 {
157
+
return nil, nil
158
+
}
186
159
187
-
func collectParents(node *threadNode, allNodes map[uint]*threadNode) []*threadNode {
188
-
var parents []*threadNode
189
-
current := node
190
-
for current.replyTo != 0 {
191
-
parent := allNodes[current.replyTo]
192
-
if parent == nil {
193
-
break
194
-
}
195
-
parents = append(parents, parent)
196
-
current = parent
160
+
type parThreadResults struct {
161
+
node *bsky.UnspeccedGetPostThreadV2_ThreadItem
162
+
children []*bsky.UnspeccedGetPostThreadV2_ThreadItem
197
163
}
198
-
return parents
199
-
}
164
+
165
+
results := make([]parThreadResults, len(curnode.children))
166
+
167
+
var wg sync.WaitGroup
168
+
for i := range curnode.children {
169
+
ix := i
170
+
wg.Go(func() {
171
+
child := curnode.children[ix]
172
+
173
+
results[ix].node = buildThreadItem(ctx, hydrator, child, depth+1, viewer)
174
+
if child.missing {
175
+
return
176
+
}
200
177
201
-
func collectReplies(ctx context.Context, hydrator *hydration.Hydrator, node *threadNode, currentDepth, maxDepth, branchingFactor int64, sort string, viewer string) ([]*bsky.UnspeccedGetPostThreadV2_ThreadItem, bool) {
202
-
var items []*bsky.UnspeccedGetPostThreadV2_ThreadItem
203
-
hasMore := false
178
+
sub, err := collectReplies(ctx, hydrator, child, depth+1, below-1, branchingFactor, sort, viewer)
179
+
if err != nil {
180
+
slog.Error("failed to collect replies", "node", child.uri, "error", err)
181
+
return
182
+
}
204
183
205
-
if currentDepth > maxDepth {
206
-
return items, false
184
+
results[ix].children = sub
185
+
})
207
186
}
208
187
209
-
// Sort children based on sort parameter
210
-
children := node.children
211
-
// TODO: Actually sort based on the sort parameter (newest/oldest/top)
212
-
// For now, just use the order we have
188
+
wg.Wait()
213
189
214
-
// Limit to branchingFactor
215
-
limit := int(branchingFactor)
216
-
if len(children) > limit {
217
-
hasMore = true
218
-
children = children[:limit]
190
+
var out []*bsky.UnspeccedGetPostThreadV2_ThreadItem
191
+
for _, res := range results {
192
+
out = append(out, res.node)
193
+
out = append(out, res.children...)
219
194
}
220
195
221
-
for _, child := range children {
222
-
item := buildThreadItem(ctx, hydrator, child, currentDepth, viewer)
223
-
if item != nil {
224
-
items = append(items, item)
196
+
return out, nil
197
+
}
225
198
226
-
// Recursively collect replies
227
-
if currentDepth < maxDepth {
228
-
childReplies, childHasMore := collectReplies(ctx, hydrator, child, currentDepth+1, maxDepth, branchingFactor, sort, viewer)
229
-
items = append(items, childReplies...)
230
-
if childHasMore {
231
-
hasMore = true
232
-
}
233
-
}
199
+
func buildThreadItem(ctx context.Context, hydrator *hydration.Hydrator, node *threadTree, depth int64, viewer string) *bsky.UnspeccedGetPostThreadV2_ThreadItem {
200
+
if node.missing {
201
+
return &bsky.UnspeccedGetPostThreadV2_ThreadItem{
202
+
Depth: depth,
203
+
Uri: node.uri,
204
+
Value: &bsky.UnspeccedGetPostThreadV2_ThreadItem_Value{
205
+
UnspeccedDefs_ThreadItemNotFound: &bsky.UnspeccedDefs_ThreadItemNotFound{
206
+
LexiconTypeID: "app.bsky.unspecced.defs#threadItemNotFound",
207
+
},
208
+
},
234
209
}
235
210
}
236
211
237
-
return items, hasMore
238
-
}
239
-
240
-
func buildThreadItem(ctx context.Context, hydrator *hydration.Hydrator, node *threadNode, depth int64, viewer string) *bsky.UnspeccedGetPostThreadV2_ThreadItem {
241
212
// Hydrate the post
242
-
postInfo, err := hydrator.HydratePost(ctx, node.uri, viewer)
213
+
postInfo, err := hydrator.HydratePostDB(ctx, node.uri, node.val, viewer)
243
214
if err != nil {
215
+
slog.Error("failed to hydrate post in thread item", "uri", node.uri, "error", err)
244
216
// Return not found item
245
217
return &bsky.UnspeccedGetPostThreadV2_ThreadItem{
246
218
Depth: depth,
···
256
228
// Hydrate author
257
229
authorInfo, err := hydrator.HydrateActor(ctx, postInfo.Author)
258
230
if err != nil {
231
+
slog.Error("failed to hydrate actor in thread item", "author", postInfo.Author, "error", err)
259
232
return &bsky.UnspeccedGetPostThreadV2_ThreadItem{
260
233
Depth: depth,
261
234
Uri: node.uri,
···
319
292
return string(parts)
320
293
}
321
294
322
-
func extractRkeyFromURI(uri string) string {
323
-
// URI format: at://did:plc:xxx/collection/rkey
324
-
if len(uri) < 5 || uri[:5] != "at://" {
325
-
return ""
295
+
type threadTree struct {
296
+
parent *threadTree
297
+
children []*threadTree
298
+
299
+
val *models.Post
300
+
301
+
missing bool
302
+
303
+
uri string
304
+
cid string
305
+
}
306
+
307
+
func buildThreadTree(ctx context.Context, hydrator *hydration.Hydrator, db *gorm.DB, posts []*models.Post) (map[uint]*threadTree, error) {
308
+
nodes := make(map[uint]*threadTree)
309
+
for _, p := range posts {
310
+
puri, err := hydrator.UriForPost(ctx, p)
311
+
if err != nil {
312
+
return nil, err
313
+
}
314
+
315
+
t := &threadTree{
316
+
val: p,
317
+
uri: puri,
318
+
}
319
+
320
+
nodes[p.ID] = t
326
321
}
327
-
// Find last slash
328
-
for i := len(uri) - 1; i >= 5; i-- {
329
-
if uri[i] == '/' {
330
-
return uri[i+1:]
322
+
323
+
missing := make(map[uint]*threadTree)
324
+
for _, node := range nodes {
325
+
if node.val.ReplyTo == 0 {
326
+
continue
331
327
}
328
+
329
+
pnode, ok := nodes[node.val.ReplyTo]
330
+
if !ok {
331
+
pnode = &threadTree{
332
+
missing: true,
333
+
}
334
+
missing[node.val.ReplyTo] = pnode
335
+
336
+
var bspost bsky.FeedPost
337
+
if err := bspost.UnmarshalCBOR(bytes.NewReader(node.val.Raw)); err != nil {
338
+
return nil, err
339
+
}
340
+
341
+
if bspost.Reply == nil || bspost.Reply.Parent == nil {
342
+
return nil, fmt.Errorf("node with parent had no parent in object")
343
+
}
344
+
345
+
pnode.uri = bspost.Reply.Parent.Uri
346
+
pnode.cid = bspost.Reply.Parent.Cid
347
+
348
+
/* Maybe we could force hydrate these?
349
+
hydrator.AddMissingRecord(puri, true)
350
+
*/
351
+
}
352
+
353
+
pnode.children = append(pnode.children, node)
354
+
node.parent = pnode
332
355
}
333
-
return ""
356
+
357
+
for k, v := range missing {
358
+
nodes[k] = v
359
+
}
360
+
361
+
return nodes, nil
334
362
}