A locally focused Bluesky AppView
1package main
2
3import (
4 "context"
5 "errors"
6 "fmt"
7 "strings"
8 "time"
9
10 "github.com/bluesky-social/indigo/api/atproto"
11 "github.com/bluesky-social/indigo/api/bsky"
12 "github.com/bluesky-social/indigo/atproto/syntax"
13 "github.com/bluesky-social/indigo/util"
14 "github.com/jackc/pgx/v5"
15 "github.com/jackc/pgx/v5/pgconn"
16 "github.com/whyrusleeping/market/models"
17 "gorm.io/gorm"
18 "gorm.io/gorm/clause"
19 "gorm.io/gorm/logger"
20)
21
22func (b *PostgresBackend) getOrCreateRepo(ctx context.Context, did string) (*Repo, error) {
23 r, ok := b.repoCache.Get(did)
24 if !ok {
25 b.reposLk.Lock()
26
27 r, ok = b.repoCache.Get(did)
28 if !ok {
29 r = &Repo{}
30 r.Did = did
31 b.repoCache.Add(did, r)
32 }
33
34 b.reposLk.Unlock()
35 }
36
37 r.Lk.Lock()
38 defer r.Lk.Unlock()
39 if r.Setup {
40 return r, nil
41 }
42
43 row := b.pgx.QueryRow(ctx, "SELECT id, created_at, did FROM repos WHERE did = $1", did)
44
45 err := row.Scan(&r.ID, &r.CreatedAt, &r.Did)
46 if err == nil {
47 // found it!
48 r.Setup = true
49 return r, nil
50 }
51
52 if err != pgx.ErrNoRows {
53 return nil, err
54 }
55
56 r.Did = did
57 if err := b.db.Create(r).Error; err != nil {
58 return nil, err
59 }
60
61 r.Setup = true
62
63 return r, nil
64}
65
66func (b *PostgresBackend) getOrCreateList(ctx context.Context, uri string) (*List, error) {
67 puri, err := util.ParseAtUri(uri)
68 if err != nil {
69 return nil, err
70 }
71
72 r, err := b.getOrCreateRepo(ctx, puri.Did)
73 if err != nil {
74 return nil, err
75 }
76
77 // TODO: needs upsert treatment when we actually find the list
78 var list List
79 if err := b.db.FirstOrCreate(&list, map[string]any{
80 "author": r.ID,
81 "rkey": puri.Rkey,
82 }).Error; err != nil {
83 return nil, err
84 }
85 return &list, nil
86}
87
// cachedPostInfo is the value stored in the post info cache, keyed by post
// URI. It holds just enough to reference a post without loading the full row.
type cachedPostInfo struct {
	// ID is the post's row ID.
	ID uint
	// Author is the row ID of the repo that authored the post.
	Author uint
}
92
93func (b *PostgresBackend) postIDForUri(ctx context.Context, uri string) (uint, error) {
94 // getPostByUri implicitly fills the cache
95 p, err := b.postInfoForUri(ctx, uri)
96 if err != nil {
97 return 0, err
98 }
99
100 return p.ID, nil
101}
102
103func (b *PostgresBackend) postInfoForUri(ctx context.Context, uri string) (cachedPostInfo, error) {
104 v, ok := b.postInfoCache.Get(uri)
105 if ok {
106 return v, nil
107 }
108
109 // getPostByUri implicitly fills the cache
110 p, err := b.getOrCreatePostBare(ctx, uri)
111 if err != nil {
112 return cachedPostInfo{}, err
113 }
114
115 return cachedPostInfo{ID: p.ID, Author: p.Author}, nil
116}
117
118func (b *PostgresBackend) tryLoadPostInfo(ctx context.Context, uid uint, rkey string) (*Post, error) {
119 var p Post
120 q := "SELECT id, author FROM posts WHERE author = $1 AND rkey = $2"
121 if err := b.pgx.QueryRow(ctx, q, uid, rkey).Scan(&p.ID, &p.Author); err != nil {
122 if errors.Is(err, pgx.ErrNoRows) {
123 return nil, nil
124 }
125 return nil, err
126 }
127
128 return &p, nil
129}
130
131func (b *PostgresBackend) getOrCreatePostBare(ctx context.Context, uri string) (*Post, error) {
132 puri, err := util.ParseAtUri(uri)
133 if err != nil {
134 return nil, err
135 }
136
137 r, err := b.getOrCreateRepo(ctx, puri.Did)
138 if err != nil {
139 return nil, err
140 }
141
142 post, err := b.tryLoadPostInfo(ctx, r.ID, puri.Rkey)
143 if err != nil {
144 return nil, err
145 }
146
147 if post == nil {
148 post = &Post{
149 Rkey: puri.Rkey,
150 Author: r.ID,
151 NotFound: true,
152 }
153
154 err := b.pgx.QueryRow(ctx, "INSERT INTO posts (rkey, author, not_found) VALUES ($1, $2, $3) RETURNING id", puri.Rkey, r.ID, true).Scan(&post.ID)
155 if err != nil {
156 pgErr, ok := err.(*pgconn.PgError)
157 if !ok || pgErr.Code != "23505" {
158 return nil, err
159 }
160
161 out, err := b.tryLoadPostInfo(ctx, r.ID, puri.Rkey)
162 if err != nil {
163 return nil, fmt.Errorf("got duplicate post and still couldnt find it: %w", err)
164 }
165 if out == nil {
166 return nil, fmt.Errorf("postgres is lying to us: %d %s", r.ID, puri.Rkey)
167 }
168
169 post = out
170 }
171
172 }
173
174 b.postInfoCache.Add(uri, cachedPostInfo{
175 ID: post.ID,
176 Author: post.Author,
177 })
178
179 return post, nil
180}
181
182func (b *PostgresBackend) getPostByUri(ctx context.Context, uri string, fields string) (*Post, error) {
183 puri, err := util.ParseAtUri(uri)
184 if err != nil {
185 return nil, err
186 }
187
188 r, err := b.getOrCreateRepo(ctx, puri.Did)
189 if err != nil {
190 return nil, err
191 }
192
193 q := "SELECT " + fields + " FROM posts WHERE author = ? AND rkey = ?"
194
195 var post Post
196 if err := b.db.Raw(q, r.ID, puri.Rkey).Scan(&post).Error; err != nil {
197 return nil, err
198 }
199
200 if post.ID == 0 {
201 post.Rkey = puri.Rkey
202 post.Author = r.ID
203 post.NotFound = true
204
205 if err := b.db.Session(&gorm.Session{
206 Logger: logger.Default.LogMode(logger.Silent),
207 }).Create(&post).Error; err != nil {
208 if !errors.Is(err, gorm.ErrDuplicatedKey) {
209 return nil, err
210 }
211 if err := b.db.Find(&post, "author = ? AND rkey = ?", r.ID, puri.Rkey).Error; err != nil {
212 return nil, fmt.Errorf("got duplicate post and still couldnt find it: %w", err)
213 }
214 }
215
216 }
217
218 b.postInfoCache.Add(uri, cachedPostInfo{
219 ID: post.ID,
220 Author: post.Author,
221 })
222
223 return &post, nil
224}
225
226func (b *PostgresBackend) revForRepo(rr *Repo) (string, error) {
227 lrev, ok := b.revCache.Get(rr.ID)
228 if ok {
229 return lrev, nil
230 }
231
232 var rev string
233 if err := b.pgx.QueryRow(context.TODO(), "SELECT COALESCE(rev, '') FROM sync_infos WHERE repo = $1", rr.ID).Scan(&rev); err != nil {
234 if errors.Is(err, pgx.ErrNoRows) {
235 return "", nil
236 }
237 return "", err
238 }
239
240 if rev != "" {
241 b.revCache.Add(rr.ID, rev)
242 }
243 return rev, nil
244}
245
246func (b *PostgresBackend) ensureFollowsScraped(ctx context.Context, user string) error {
247 r, err := b.getOrCreateRepo(ctx, user)
248 if err != nil {
249 return err
250 }
251
252 var si SyncInfo
253 if err := b.db.Find(&si, "repo = ?", r.ID).Error; err != nil {
254 return err
255 }
256
257 // not found
258 if si.Repo == 0 {
259 if err := b.db.Create(&SyncInfo{
260 Repo: r.ID,
261 }).Error; err != nil {
262 return err
263 }
264 }
265
266 if si.FollowsSynced {
267 return nil
268 }
269
270 var follows []Follow
271 var cursor string
272 for {
273 resp, err := atproto.RepoListRecords(ctx, b.s.client, "app.bsky.graph.follow", cursor, 100, b.s.mydid, false)
274 if err != nil {
275 return err
276 }
277
278 for _, rec := range resp.Records {
279 if fol, ok := rec.Value.Val.(*bsky.GraphFollow); ok {
280 fr, err := b.getOrCreateRepo(ctx, fol.Subject)
281 if err != nil {
282 return err
283 }
284
285 puri, err := syntax.ParseATURI(rec.Uri)
286 if err != nil {
287 return err
288 }
289
290 follows = append(follows, Follow{
291 Created: time.Now(),
292 Indexed: time.Now(),
293 Rkey: puri.RecordKey().String(),
294 Author: r.ID,
295 Subject: fr.ID,
296 })
297 }
298 }
299
300 if resp.Cursor == nil || len(resp.Records) == 0 {
301 break
302 }
303 cursor = *resp.Cursor
304 }
305
306 if err := b.db.Clauses(clause.OnConflict{DoNothing: true}).CreateInBatches(follows, 200).Error; err != nil {
307 return err
308 }
309
310 if err := b.db.Model(SyncInfo{}).Where("repo = ?", r.ID).Update("follows_synced", true).Error; err != nil {
311 return err
312 }
313
314 fmt.Println("Got follows: ", len(follows))
315
316 return nil
317}
318
319func (b *PostgresBackend) loadRelevantDids() error {
320 ctx := context.TODO()
321
322 if err := b.ensureFollowsScraped(ctx, b.s.mydid); err != nil {
323 return fmt.Errorf("failed to scrape follows: %w", err)
324 }
325
326 r, err := b.getOrCreateRepo(ctx, b.s.mydid)
327 if err != nil {
328 return err
329 }
330
331 var dids []string
332 if err := b.db.Raw("select did from follows left join repos on follows.subject = repos.id where follows.author = ?", r.ID).Scan(&dids).Error; err != nil {
333 return err
334 }
335
336 b.relevantDids[b.s.mydid] = true
337 for _, d := range dids {
338 fmt.Println("adding did: ", d)
339 b.relevantDids[d] = true
340 }
341
342 return nil
343}
344
// SyncInfo records per-repo synchronisation state.
type SyncInfo struct {
	// Repo is the row ID of the repo this state belongs to (indexed).
	Repo uint `gorm:"index"`
	// FollowsSynced is set once the repo's follow records have been imported.
	FollowsSynced bool
	// Rev is the stored revision string for the repo; empty when unknown.
	Rev string
}
350
351func (b *PostgresBackend) checkPostExists(ctx context.Context, repo *Repo, rkey string) (bool, error) {
352 var id uint
353 var notfound bool
354 if err := b.pgx.QueryRow(ctx, "SELECT id, not_found FROM posts WHERE author = $1 AND rkey = $2", repo.ID, rkey).Scan(&id, ¬found); err != nil {
355 if errors.Is(err, pgx.ErrNoRows) {
356 return false, nil
357 }
358 return false, err
359 }
360
361 if id != 0 && !notfound {
362 return true, nil
363 }
364
365 return false, nil
366}
367
368func (b *PostgresBackend) didIsRelevant(did string) bool {
369 b.rdLk.Lock()
370 defer b.rdLk.Unlock()
371 return b.relevantDids[did]
372}
373
374func (b *PostgresBackend) anyRelevantIdents(idents ...string) bool {
375 for _, id := range idents {
376 if strings.HasPrefix(id, "did:") {
377 if b.didIsRelevant(id) {
378 return true
379 }
380 } else if strings.HasPrefix(id, "at://") {
381 puri, err := syntax.ParseATURI(id)
382 if err != nil {
383 continue
384 }
385
386 if b.didIsRelevant(puri.Authority().String()) {
387 return true
388 }
389 }
390 }
391
392 return false
393}
394
395func (b *PostgresBackend) getRepoByID(ctx context.Context, id uint) (*models.Repo, error) {
396 var r models.Repo
397 if err := b.db.Find(&r, "id = ?", id).Error; err != nil {
398 return nil, err
399 }
400
401 return &r, nil
402}
403
404func (b *PostgresBackend) TrackMissingActor(did string) {
405 b.s.addMissingProfile(context.TODO(), did)
406}