+17
-17
README.md
+17
-17
README.md
···
1
1
# Rinds
2
2
A collection of feeds under one roof.
3
-
4
-
I don't like Docker and I don't need to compile this, okay thanks!
3
+
including Fresh Feeds
5
4
6
5
## Running
7
-
aint that hard, go install go, setup postgres, and generate the required feed definitions under your user account. you can use https://pdsls.dev to generate a `app.bsky.feed.generator` record.
8
-
Set the `rkey` to the desired short url value. itll look like:
6
+
Install go, setup postgres, and then generate the required feed definitions under your user account. You can use https://pdsls.dev to generate a `app.bsky.feed.generator` record.
7
+
Set the `rkey` to the desired short url value. Itll look like:
9
8
```
10
9
https://bsky.app/profile/{you}/feed/{rkey}
11
10
```
12
11
for the contents you can use the example below
13
12
```
14
13
{
15
-
"did": "did:web:${INSERT DID:WEB HERE}",
14
+
"did": "did:web:${your service endpoint}",
16
15
"$type": "app.bsky.feed.generator",
17
16
"createdAt": "2025-01-21T11:33:02.396Z",
18
17
"description": "wowww very descriptive",
···
20
19
}
21
20
```
22
21
22
+
If you are specifically trying to run any of the preexisting feeds, make sure to use the rkey as defined in the /pkg/feeds/*/feed.go and main.go files
23
+
23
24
## Env
24
25
You can check out `.env.example` for an example
25
-
26
+
The port is for your feedgen serve, not your postgres port
26
27
27
28
## Postgres
28
29
Be sure to set up `.env` correctly
···
31
32
32
33
## Index
33
34
You should start Postgres first
34
-
Then go run the firehose ingester in
35
+
Then go run the firehose indexder inside
35
36
```
36
37
cd ./indexer
37
38
```
38
-
and go compile it
39
+
And then go compile it
39
40
```
40
41
go build -o indexer ./indexer.go && export $(grep -v '^#' ./../.env | xargs) && ./indexer
41
42
```
42
-
after it has been compiled, you can use `rerun.sh` to ensure it will automatically recover after failure
43
+
Once compiled, you can use `rerun.sh` to ensure it will automatically recover after failure
43
44
44
45
## Serve
45
46
Make sure the indexer (or at least Postgres) is running first:
46
47
```
47
48
go build -o feedgen cmd/main.go && export $(grep -v '^#' ./.env | xargs) && ./feedgen
48
49
```
49
-
the logs are pretty verbose imo, fyi
50
+
Logs are pretty verbose, just FYI.
50
51
51
52
## Todo
52
53
- [ ] Faster Indexing
53
-
- [ ] Proper Up-to-Date Following Indexing
54
+
- [x] Proper Up-to-Date Following Indexing
54
55
- [x] Repost Indicators
55
-
- [ ] Cache Timeouts
56
+
- [x] Cache Timeouts
56
57
- [x] Likes
57
58
- [x] Posts
58
59
- [x] Feed Caches
59
-
- [ ] Followings
60
60
- [ ] More Fresh Feed Variants
61
61
- [ ] unFresh
62
62
- [x] +9 hrs
···
68
68
- [ ] Fresh: Text Only
69
69
70
70
## Architecture
71
-
Based on [go-bsky-feed-generator](https://github.com/ericvolp12/go-bsky-feed-generator). Read the README in the linked repo for more info about how it all works.
71
+
Based on [go-bsky-feed-generator](https://github.com/ericvolp12/go-bsky-feed-generator). Read the README in the linked repo for more info about how it all works. This repository differs from the template by not using the docker system at all.
72
72
73
-
### /feeds/static
73
+
### /pkg/feeds/static
74
74
Basic example feed from the template. Kept as a sanity check if all else seems to fail.
75
75
76
-
### /feeds/fresh
77
-
Fresh feeds, all based around a shared Following feed builder and logic to set posts as viewed. May contain some remnant old references to the old name "rinds".
76
+
### /pkg/feeds/fresh
77
+
Fresh feeds, all built around a shared Following feed builder and logic to set posts as viewed. May contain some remnant old references to the old name "rinds".
78
78
+5
-2
cmd/main.go
+5
-2
cmd/main.go
···
263
263
| _ <| | | | | (_| \__ \
264
264
|_| \_\_|_| |_|\__,_|___/
265
265
266
-
bsky feed generator
266
+
bsky feed generators by @whey.party
267
267
268
268
Code: https://github.com/rimar1337/rinds
269
-
Flagship instance: https://bsky.app/profile/did:plc:mn45tewwnse5btfftvd3powc/feed/rinds
269
+
Flagship Fresh Feeds Instance: https://bsky.app/profile/did:plc:mn45tewwnse5btfftvd3powc/feed/rinds
270
+
271
+
Fresh Feeds icons by @pprmint.de
272
+
Repository generated from https://github.com/ericvolp12/go-bsky-feed-generator
270
273
`)
271
274
})
272
275
+185
-1
indexer/indexer.go
+185
-1
indexer/indexer.go
···
3
3
import (
4
4
"context"
5
5
"database/sql"
6
+
"encoding/json"
6
7
"fmt"
7
8
"log"
8
9
"os"
···
13
14
_ "github.com/lib/pq"
14
15
)
15
16
16
-
const wsUrl = "wss://jetstream2.us-west.bsky.network/subscribe?wantedCollections=app.bsky.feed.post&wantedCollections=app.bsky.feed.repost&wantedCollections=app.bsky.feed.like"
17
+
const wsUrl = "wss://jetstream2.us-west.bsky.network/subscribe?wantedCollections=app.bsky.feed.post&wantedCollections=app.bsky.feed.repost&wantedCollections=app.bsky.feed.like&wantedCollections=app.bsky.graph.follow"
17
18
18
19
type LikeMessage struct {
19
20
Did string `json:"did"`
···
38
39
Reply *Reply `json:"reply,omitempty"`
39
40
}
40
41
42
+
type FollowRecord struct {
43
+
DID string `json:"subject"`
44
+
createdAt string `json:"createdAt"`
45
+
}
46
+
41
47
type Reply struct {
42
48
Parent ReplySubject `json:"parent"`
43
49
Root ReplySubject `json:"root"`
···
46
52
type LikeSubject struct {
47
53
CID string `json:"cid"`
48
54
URI string `json:"uri"`
55
+
Raw string // Stores raw string if not a JSON object
56
+
}
57
+
58
+
// This handles both string and object forms
59
+
func (ls *LikeSubject) UnmarshalJSON(data []byte) error {
60
+
// Try to unmarshal as object first
61
+
var obj struct {
62
+
CID string `json:"cid"`
63
+
URI string `json:"uri"`
64
+
}
65
+
if err := json.Unmarshal(data, &obj); err == nil && obj.URI != "" {
66
+
ls.CID = obj.CID
67
+
ls.URI = obj.URI
68
+
return nil
69
+
}
70
+
71
+
// If it's not a valid object, try to unmarshal as string
72
+
var str string
73
+
if err := json.Unmarshal(data, &str); err == nil {
74
+
ls.Raw = str
75
+
return nil
76
+
}
77
+
78
+
return fmt.Errorf("LikeSubject: invalid JSON: %s", string(data))
49
79
}
50
80
51
81
type ReplySubject struct {
···
58
88
var (
59
89
postBatch []Post
60
90
likeBatch []Like
91
+
followBatch []Follow
61
92
batchInsertSize = 1000 // Adjust the batch size as needed
62
93
batchInterval = 30 * time.Second // Flush every 30 seconds
63
94
)
···
77
108
RelDate int64
78
109
}
79
110
111
+
type Follow struct {
112
+
RelAuthor string
113
+
followSubjectDID string
114
+
}
115
+
116
+
var trackedDIDsMap map[string]struct{}
117
+
var trackedDIDs []string
118
+
119
+
func initTrackedDIDsMap(dids []string) {
120
+
trackedDIDsMap = make(map[string]struct{}, len(dids))
121
+
for _, did := range dids {
122
+
trackedDIDsMap[did] = struct{}{}
123
+
}
124
+
}
125
+
80
126
func getLastCursor(db *sql.DB) int64 {
81
127
var lastCursor int64
82
128
err := db.QueryRow("SELECT lastCursor FROM cursor WHERE id = 1").Scan(&lastCursor)
···
90
136
return lastCursor
91
137
}
92
138
139
+
// Periodically refreshes the trackedDIDs and trackedDIDsMap from the database.
140
+
func startTrackedDIDRefreshJob(db *sql.DB) {
141
+
ticker := time.NewTicker(10 * time.Minute)
142
+
defer ticker.Stop()
143
+
144
+
for range ticker.C {
145
+
updatedDIDs, err := getTrackedDIDs(context.Background(), db)
146
+
if err != nil {
147
+
log.Printf("Failed to refresh tracked DIDs: %v", err)
148
+
continue
149
+
}
150
+
initTrackedDIDsMap(updatedDIDs)
151
+
trackedDIDs = updatedDIDs
152
+
log.Printf("Refreshed tracked DIDs: %v\n", trackedDIDs)
153
+
}
154
+
}
93
155
func main() {
94
156
// Connect to Postgres // Open the database connection
95
157
dbHost := os.Getenv("DB_HOST")
···
118
180
// Retrieve the last cursor
119
181
lastCursor := getLastCursor(db)
120
182
183
+
// initialize the tracked DIDs
184
+
trackedDIDs, err = getTrackedDIDs(context.Background(), db)
185
+
if err != nil {
186
+
log.Fatalf("Failed to get tracked DIDs: %v", err)
187
+
}
188
+
log.Printf("Tracked DIDs: %v\n", trackedDIDs)
189
+
initTrackedDIDsMap(trackedDIDs)
190
+
191
+
go startBatchInsertFollowJob(db)
192
+
193
+
go startTrackedDIDRefreshJob(db)
194
+
121
195
// If the cursor is older than 24 hours, skip it
122
196
if lastCursor > 0 {
123
197
cursorTime := time.UnixMicro(lastCursor)
···
212
286
}
213
287
}
214
288
func processMessage(db *sql.DB, msg LikeMessage) {
289
+
// log.Print("Processing message...")
215
290
// Convert cursor to time
216
291
cursorTime := time.UnixMicro(msg.TimeUs)
217
292
···
245
320
deletePost(db, msg.Did, postUri, msg.TimeUs)
246
321
}
247
322
case "app.bsky.feed.repost":
323
+
248
324
if record.Subject.URI != "" {
249
325
if msg.Commit.Operation == "create" {
250
326
postBatch = append(postBatch, Post{msg.Did, record.Subject.URI, msg.TimeUs, true, repostUri, ""})
···
260
336
deleteLike(db, msg.Did, record.Subject.URI)
261
337
}
262
338
}
339
+
case "app.bsky.graph.follow":
340
+
_, tracked := trackedDIDsMap[msg.Did]
341
+
if tracked {
342
+
// log.Printf("Following found tracked DID: %s\n", msg.Did)
343
+
if msg.Commit.Operation == "create" {
344
+
// log.Printf("Following Create; doer: %s, subject: %s\n", msg.Did, record.Subject.Raw)
345
+
followBatch = append(followBatch, Follow{msg.Did, record.Subject.Raw})
346
+
} else if msg.Commit.Operation == "delete" {
347
+
// log.Printf("Following Delete; doer: %s, subject: %s\n", msg.Did, record.Subject.Raw)
348
+
//log.Printf("Unfollowing: %s", msg.Commit.RKey)
349
+
// Remove the DID from the tracked DIDs map
350
+
deleteFollow(db, msg.Did, msg.Commit.RKey)
351
+
}
352
+
}
263
353
default:
264
354
//log.Printf("Unknown collection: %s", msg.Commit.Collection)
265
355
}
···
291
381
}
292
382
}
293
383
384
+
func deleteFollow(db *sql.DB, relAuthor, did string) {
385
+
unquotedTableName := "follows_" + relAuthor
386
+
tableName := pq.QuoteIdentifier(unquotedTableName)
387
+
_, err := db.Exec(fmt.Sprintf(`
388
+
DELETE FROM %s WHERE follow = $1;
389
+
`, tableName), did)
390
+
if err != nil {
391
+
log.Printf("Error deleting follow: %v", err)
392
+
}
393
+
}
394
+
294
395
func startCleanupJob(db *sql.DB) {
295
396
ticker := time.NewTicker(1 * time.Hour)
296
397
defer ticker.Stop()
···
373
474
}
374
475
}
375
476
477
+
func startBatchInsertFollowJob(db *sql.DB) {
478
+
ticker := time.NewTicker(batchInterval)
479
+
defer ticker.Stop()
480
+
481
+
for range ticker.C {
482
+
if len(followBatch) >= 1 {
483
+
batchInsertFollow(db)
484
+
}
485
+
}
486
+
}
487
+
func batchInsertFollow(db *sql.DB) {
488
+
tx, err := db.Begin()
489
+
if err != nil {
490
+
log.Printf("Error starting transaction: %v", err)
491
+
return
492
+
}
493
+
494
+
unquotedTableName := "follows_" + followBatch[0].RelAuthor
495
+
tableName := pq.QuoteIdentifier(unquotedTableName)
496
+
497
+
stmt, err := tx.Prepare(fmt.Sprintf(`
498
+
INSERT INTO %s (follow)
499
+
VALUES ($1)
500
+
ON CONFLICT (follow) DO NOTHING
501
+
`, tableName))
502
+
if err != nil {
503
+
log.Printf("Error preparing statement: %v", err)
504
+
return
505
+
}
506
+
defer stmt.Close()
507
+
508
+
for _, follow := range followBatch {
509
+
_, err := stmt.Exec(follow.followSubjectDID)
510
+
if err != nil {
511
+
log.Printf("Error executing statement: %v", err)
512
+
log.Printf("Failed FOLLOW INSERT: %+v\nError: %v", follow, err)
513
+
os.Exit(1) // Exit on error
514
+
}
515
+
}
516
+
517
+
err = tx.Commit()
518
+
if err != nil {
519
+
log.Printf("Error committing transaction: %v", err)
520
+
}
521
+
522
+
// Clear the batch
523
+
followBatch = followBatch[:0]
524
+
}
525
+
376
526
func batchInsertLikes(db *sql.DB) {
377
527
tx, err := db.Begin()
378
528
if err != nil {
···
477
627
478
628
return nil
479
629
}
630
+
631
+
// ehhhhh why are we doing this
632
+
func getTrackedDIDs(ctx context.Context, db *sql.DB) ([]string, error) {
633
+
const prefix = "follows_"
634
+
query := `
635
+
SELECT table_name
636
+
FROM information_schema.tables
637
+
WHERE table_name LIKE $1
638
+
`
639
+
rows, err := db.QueryContext(ctx, query, prefix+"%")
640
+
if err != nil {
641
+
return nil, fmt.Errorf("error querying tracked follows tables: %w", err)
642
+
}
643
+
defer rows.Close()
644
+
645
+
var trackedDIDs []string
646
+
for rows.Next() {
647
+
var tableName string
648
+
if err := rows.Scan(&tableName); err != nil {
649
+
return nil, fmt.Errorf("error scanning table name: %w", err)
650
+
}
651
+
// Strip prefix to get the DID
652
+
if len(tableName) > len(prefix) {
653
+
did := tableName[len(prefix):]
654
+
trackedDIDs = append(trackedDIDs, did)
655
+
}
656
+
}
657
+
658
+
if err := rows.Err(); err != nil {
659
+
return nil, fmt.Errorf("error iterating tracked tables: %w", err)
660
+
}
661
+
662
+
return trackedDIDs, nil
663
+
}
+1
-2
pkg/auth/auth.go
+1
-2
pkg/auth/auth.go
···
115
115
accessToken := authHeaderParts[1]
116
116
117
117
parser := jwt.Parser{
118
-
ValidMethods: []string{es256k.SigningMethodES256K.Alg()},
119
-
SkipClaimsValidation: true, // IM SORRY I HAD TO, MY VPS IS ACTING STRANGE. I THINK ITS FINE
118
+
ValidMethods: []string{es256k.SigningMethodES256K.Alg()},
120
119
}
121
120
122
121
token, err := parser.ParseWithClaims(accessToken, claims, func(token *jwt.Token) (interface{}, error) {
+184
-73
pkg/feeds/fresh/feed.go
+184
-73
pkg/feeds/fresh/feed.go
···
30
30
type FeedType string
31
31
32
32
const (
33
-
Rinds FeedType = "rinds"
34
-
Random FeedType = "random"
35
-
Mnine FeedType = "mnine"
36
-
Reposts FeedType = "reposts"
37
-
OReplies FeedType = "oreplies"
33
+
Rinds FeedType = "rinds"
34
+
Random FeedType = "random"
35
+
Mnine FeedType = "mnine"
36
+
Reposts FeedType = "reposts"
37
+
OReplies FeedType = "oreplies"
38
+
followingRefreshThreshold = 12 * time.Hour
38
39
)
39
40
40
41
type StaticFeed struct {
···
139
140
hash = generateHash(userDID)
140
141
} else {
141
142
log.Println("Using existing hash")
143
+
}
144
+
145
+
// hacky stopgap following list refresh
146
+
if err := sf.ensureRefreshStatusTableExists(ctx); err != nil {
147
+
log.Printf("Could not ensure following_refresh_status table exists: %v", err)
148
+
} else {
149
+
sf.checkAndRefreshFollowers(ctx, userDID)
142
150
}
143
151
144
152
tableName := fmt.Sprintf("feedcache_%s_%s", userDID, hash)
···
419
427
return hex.EncodeToString(hash[:])[:5]
420
428
}
421
429
422
-
func getFollowers(ctx context.Context, db *sql.DB, userdid string) ([]string, error) {
423
-
unquotedTableName := "follows_" + userdid
424
-
tableName := pq.QuoteIdentifier(unquotedTableName)
425
-
fmt.Printf("Checking for table: %s\n", tableName) // Debug log
430
+
func (sf *StaticFeed) ensureRefreshStatusTableExists(ctx context.Context) error {
431
+
query := `
432
+
CREATE TABLE IF NOT EXISTS following_refresh_status (
433
+
user_did TEXT PRIMARY KEY,
434
+
last_fetched TIMESTAMPTZ,
435
+
is_refreshing BOOLEAN DEFAULT FALSE
436
+
);
437
+
`
438
+
_, err := sf.DB.ExecContext(ctx, query)
439
+
if err != nil {
440
+
return fmt.Errorf("error creating following_refresh_status table: %w", err)
441
+
}
442
+
return nil
443
+
}
426
444
427
-
// Check if cache table exists
428
-
var exists bool
429
-
query := "SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = $1)"
430
-
fmt.Printf("Executing query: %s with parameter: %s\n", query, unquotedTableName) // Debug log
431
-
err := db.QueryRowContext(ctx, query, unquotedTableName).Scan(&exists)
445
+
func (sf *StaticFeed) checkAndRefreshFollowers(ctx context.Context, userDID string) {
446
+
_, err := sf.DB.ExecContext(ctx,
447
+
"INSERT INTO following_refresh_status (user_did) VALUES ($1) ON CONFLICT (user_did) DO NOTHING", userDID)
432
448
if err != nil {
433
-
// Check for specific errors
434
-
if err == sql.ErrNoRows {
435
-
return nil, fmt.Errorf("table existence check returned no rows: %w", err)
436
-
}
437
-
if pqErr, ok := err.(*pq.Error); ok {
438
-
return nil, fmt.Errorf("PostgreSQL error: %s, Code: %s", pqErr.Message, pqErr.Code)
439
-
}
440
-
return nil, fmt.Errorf("error checking cache table existence: %w", err)
449
+
log.Printf("Error ensuring refresh status entry for %s: %v", userDID, err)
450
+
return
451
+
}
452
+
453
+
var lastFetched sql.NullTime
454
+
var isRefreshing bool
455
+
456
+
err = sf.DB.QueryRowContext(ctx,
457
+
"SELECT last_fetched, is_refreshing FROM following_refresh_status WHERE user_did = $1", userDID).Scan(&lastFetched, &isRefreshing)
458
+
if err != nil {
459
+
log.Printf("Error getting refresh status for %s: %v", userDID, err)
460
+
return
461
+
}
462
+
463
+
if isRefreshing {
464
+
log.Printf("Follower refresh for %s is already in progress. Skipping.", userDID)
465
+
return
466
+
}
467
+
468
+
if !lastFetched.Valid || time.Since(lastFetched.Time) > followingRefreshThreshold {
469
+
log.Printf("Follower list for %s is stale or has never been fetched. Triggering background refresh.", userDID)
470
+
go sf.refreshFollowersAsync(userDID)
471
+
return
441
472
}
442
473
443
-
fmt.Printf("Table exists: %v\n", exists) // Debug log
474
+
log.Printf("Follower list for %s is fresh enough (last fetched %v ago). Skipping refresh.", userDID, time.Since(lastFetched.Time))
475
+
}
444
476
445
-
if exists {
446
-
rows, err := db.QueryContext(ctx, fmt.Sprintf("SELECT follow FROM %s", tableName))
477
+
func (sf *StaticFeed) refreshFollowersAsync(userDID string) {
478
+
ctx := context.Background()
479
+
480
+
_, err := sf.DB.ExecContext(ctx, "UPDATE following_refresh_status SET is_refreshing = TRUE WHERE user_did = $1", userDID)
481
+
if err != nil {
482
+
log.Printf("Error setting refresh lock for %s: %v", userDID, err)
483
+
return
484
+
}
485
+
486
+
defer func() {
487
+
_, err := sf.DB.ExecContext(ctx, "UPDATE following_refresh_status SET is_refreshing = FALSE WHERE user_did = $1", userDID)
447
488
if err != nil {
448
-
return nil, fmt.Errorf("error querying cache table: %w", err)
489
+
log.Printf("CRITICAL: Error releasing refresh lock for %s: %v", userDID, err)
449
490
}
450
-
defer rows.Close()
491
+
}()
451
492
452
-
var cachedFollows []string
453
-
for rows.Next() {
454
-
var follow string
455
-
if err := rows.Scan(&follow); err != nil {
456
-
return nil, fmt.Errorf("error scanning cached follow: %w", err)
457
-
}
458
-
cachedFollows = append(cachedFollows, follow)
459
-
}
493
+
err = fetchAndCacheFollowersFromAPI(ctx, sf.DB, userDID)
494
+
if err != nil {
495
+
log.Printf("Error refreshing followers for %s: %v", userDID, err)
496
+
return
497
+
}
460
498
461
-
if err := rows.Err(); err != nil {
462
-
return nil, fmt.Errorf("error iterating cached follows: %w", err)
463
-
}
499
+
_, err = sf.DB.ExecContext(ctx, "UPDATE following_refresh_status SET last_fetched = NOW() WHERE user_did = $1", userDID)
500
+
if err != nil {
501
+
log.Printf("Error updating last_fetched for %s: %v", userDID, err)
502
+
}
464
503
465
-
log.Println("Returning cached followers")
466
-
return cachedFollows, nil
467
-
}
504
+
log.Printf("Successfully refreshed followers for %s.", userDID)
505
+
}
468
506
469
-
log.Println("Fetching followers from API")
507
+
func fetchAndCacheFollowersFromAPI(ctx context.Context, db *sql.DB, userdid string) error {
508
+
log.Printf("Fetching followers from API for %s", userdid)
470
509
var allDIDs []string
471
510
cursor := ""
472
511
···
474
513
apiURL := fmt.Sprintf("https://public.api.bsky.app/xrpc/app.bsky.graph.getFollows?actor=%s&cursor=%s", userdid, cursor)
475
514
resp, err := http.Get(apiURL)
476
515
if err != nil {
477
-
log.Printf("Error making request: %v\n", err)
478
-
return nil, fmt.Errorf("failed to make request: %v", err)
516
+
return fmt.Errorf("failed to make request: %w", err)
479
517
}
480
518
defer resp.Body.Close()
481
519
482
520
body, err := ioutil.ReadAll(resp.Body)
483
521
if err != nil {
484
-
log.Printf("Error reading response: %v\n", err)
485
-
return nil, fmt.Errorf("failed to read response: %v", err)
522
+
return fmt.Errorf("failed to read response: %w", err)
486
523
}
487
524
488
525
var response Response
489
526
if err := json.Unmarshal(body, &response); err != nil {
490
-
log.Printf("Error unmarshalling JSON: %v\n", err)
491
-
return nil, fmt.Errorf("failed to unmarshal JSON: %v", err)
527
+
return fmt.Errorf("failed to unmarshal JSON: %w", err)
492
528
}
493
529
494
530
for _, follow := range response.Follows {
···
501
537
cursor = response.Cursor
502
538
}
503
539
504
-
// Drop the existing table if it exists
505
-
log.Println("Dropping existing followers table if it exists")
506
-
_, err = db.ExecContext(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s", tableName))
540
+
unquotedTableName := "follows_" + userdid
541
+
tableName := pq.QuoteIdentifier(unquotedTableName)
542
+
tempTableName := "follows_temp_" + userdid
543
+
quotedTempTableName := pq.QuoteIdentifier(tempTableName)
544
+
545
+
log.Println("Dropping existing temporary followers table if it exists")
546
+
_, err := db.ExecContext(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s", quotedTempTableName))
507
547
if err != nil {
508
-
return nil, fmt.Errorf("error dropping existing cache table: %w", err)
548
+
return fmt.Errorf("error dropping existing temp cache table: %w", err)
509
549
}
510
550
511
-
// Cache the results in the database
512
-
log.Println("Caching followers in the database")
513
-
_, err = db.ExecContext(ctx, fmt.Sprintf(`
514
-
CREATE TABLE %s (
515
-
follow TEXT UNIQUE
516
-
)
517
-
`, tableName))
551
+
log.Println("Creating temporary followers table")
552
+
_, err = db.ExecContext(ctx, fmt.Sprintf(`CREATE TABLE %s (follow TEXT UNIQUE)`, quotedTempTableName))
518
553
if err != nil {
519
-
return nil, fmt.Errorf("error creating cache table: %w", err)
554
+
return fmt.Errorf("error creating temp cache table: %w", err)
520
555
}
521
556
522
-
// Use a map to track unique follows
523
-
followMap := make(map[string]struct{})
524
-
for _, follow := range allDIDs {
525
-
if _, exists := followMap[follow]; !exists {
526
-
followMap[follow] = struct{}{}
527
-
_, err := db.ExecContext(ctx, fmt.Sprintf(`
528
-
INSERT INTO %s (follow)
529
-
VALUES ($1)
530
-
ON CONFLICT (follow) DO NOTHING
531
-
`, tableName), follow)
557
+
if len(allDIDs) > 0 {
558
+
txn, err := db.BeginTx(ctx, nil)
559
+
if err != nil {
560
+
return fmt.Errorf("failed to begin transaction: %w", err)
561
+
}
562
+
563
+
stmt, err := txn.Prepare(pq.CopyIn(tempTableName, "follow"))
564
+
if err != nil {
565
+
return fmt.Errorf("failed to prepare copy in: %w", err)
566
+
}
567
+
568
+
for _, did := range allDIDs {
569
+
_, err = stmt.Exec(did)
532
570
if err != nil {
533
-
return nil, fmt.Errorf("error inserting into cache table: %w", err)
571
+
return fmt.Errorf("failed to exec copy in: %w", err)
534
572
}
535
573
}
574
+
575
+
_, err = stmt.Exec()
576
+
if err != nil {
577
+
return fmt.Errorf("failed to finalize copy in: %w", err)
578
+
}
579
+
580
+
err = stmt.Close()
581
+
if err != nil {
582
+
return fmt.Errorf("failed to close statement: %w", err)
583
+
}
584
+
585
+
err = txn.Commit()
586
+
if err != nil {
587
+
return fmt.Errorf("failed to commit transaction: %w", err)
588
+
}
536
589
}
537
590
538
-
log.Println("Returning fetched followers")
539
-
return allDIDs, nil
591
+
log.Println("Atomically replacing old followers table with new one")
592
+
tx, err := db.Begin()
593
+
if err != nil {
594
+
return fmt.Errorf("error beginning transaction for table rename: %w", err)
595
+
}
596
+
_, err = tx.Exec(fmt.Sprintf("DROP TABLE IF EXISTS %s", tableName))
597
+
if err != nil {
598
+
tx.Rollback()
599
+
return fmt.Errorf("error dropping old follows table in transaction: %w", err)
600
+
}
601
+
_, err = tx.Exec(fmt.Sprintf("ALTER TABLE %s RENAME TO %s", quotedTempTableName, pq.QuoteIdentifier(unquotedTableName)))
602
+
if err != nil {
603
+
tx.Rollback()
604
+
return fmt.Errorf("error renaming temp table in transaction: %w", err)
605
+
}
606
+
err = tx.Commit()
607
+
if err != nil {
608
+
return fmt.Errorf("error committing transaction for table rename: %w", err)
609
+
}
610
+
611
+
log.Printf("Successfully cached %d followers for %s", len(allDIDs), userdid)
612
+
return nil
613
+
}
614
+
615
+
func getFollowers(ctx context.Context, db *sql.DB, userdid string) ([]string, error) {
616
+
unquotedTableName := "follows_" + userdid
617
+
tableName := pq.QuoteIdentifier(unquotedTableName)
618
+
619
+
var exists bool
620
+
err := db.QueryRowContext(ctx, "SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = $1)", unquotedTableName).Scan(&exists)
621
+
if err != nil {
622
+
return nil, fmt.Errorf("error checking follows cache table existence for %s: %w", userdid, err)
623
+
}
624
+
625
+
if !exists {
626
+
log.Printf("Follows cache table for %s does not exist. Waiting for background refresh to create it.", userdid)
627
+
return []string{}, nil
628
+
}
629
+
630
+
rows, err := db.QueryContext(ctx, fmt.Sprintf("SELECT follow FROM %s", tableName))
631
+
if err != nil {
632
+
return nil, fmt.Errorf("error querying cache table: %w", err)
633
+
}
634
+
defer rows.Close()
635
+
636
+
var cachedFollows []string
637
+
for rows.Next() {
638
+
var follow string
639
+
if err := rows.Scan(&follow); err != nil {
640
+
return nil, fmt.Errorf("error scanning cached follow: %w", err)
641
+
}
642
+
cachedFollows = append(cachedFollows, follow)
643
+
}
644
+
645
+
if err := rows.Err(); err != nil {
646
+
return nil, fmt.Errorf("error iterating cached follows: %w", err)
647
+
}
648
+
649
+
log.Printf("Returning %d cached followers for %s", len(cachedFollows), userdid)
650
+
return cachedFollows, nil
540
651
}
541
652
542
653
func markPostsAsViewed(ctx context.Context, db *sql.DB, userDID string, posts []PostWithDate, smartReportingEnabled bool) error {