+15
-14
README.md
+15
-14
README.md
···
1
1
# Rinds
2
2
A collection of feeds under one roof.
3
-
4
-
I don't like Docker and I don't need to compile this, okay thanks!
3
+
including Fresh Feeds
5
4
6
5
## Running
7
-
aint that hard, go install go, setup postgres, and generate the required feed definitions under your user account. you can use https://pdsls.dev to generate a `app.bsky.feed.generator` record.
8
-
Set the `rkey` to the desired short url value. itll look like:
6
+
Install go, setup postgres, and then generate the required feed definitions under your user account. You can use https://pdsls.dev to generate a `app.bsky.feed.generator` record.
7
+
Set the `rkey` to the desired short url value. Itll look like:
9
8
```
10
9
https://bsky.app/profile/{you}/feed/{rkey}
11
10
```
12
11
for the contents you can use the example below
13
12
```
14
13
{
15
-
"did": "did:web:${INSERT DID:WEB HERE}",
14
+
"did": "did:web:${your service endpoint}",
16
15
"$type": "app.bsky.feed.generator",
17
16
"createdAt": "2025-01-21T11:33:02.396Z",
18
17
"description": "wowww very descriptive",
···
20
19
}
21
20
```
22
21
22
+
If you are specifically trying to run any of the preexisting feeds, make sure to use the rkey as defined in the /pkg/feeds/*/feed.go and main.go files
23
+
23
24
## Env
24
25
You can check out `.env.example` for an example
25
-
26
+
The port is for your feedgen serve, not your postgres port
26
27
27
28
## Postgres
28
29
Be sure to set up `.env` correctly
···
31
32
32
33
## Index
33
34
You should start Postgres first
34
-
Then go run the firehose ingester in
35
+
Then go run the firehose indexder inside
35
36
```
36
37
cd ./indexer
37
38
```
38
-
and go compile it
39
+
And then go compile it
39
40
```
40
41
go build -o indexer ./indexer.go && export $(grep -v '^#' ./../.env | xargs) && ./indexer
41
42
```
42
-
after it has been compiled, you can use `rerun.sh` to ensure it will automatically recover after failure
43
+
Once compiled, you can use `rerun.sh` to ensure it will automatically recover after failure
43
44
44
45
## Serve
45
46
Make sure the indexer (or at least Postgres) is running first:
46
47
```
47
48
go build -o feedgen cmd/main.go && export $(grep -v '^#' ./.env | xargs) && ./feedgen
48
49
```
49
-
the logs are pretty verbose imo, fyi
50
+
Logs are pretty verbose, just FYI.
50
51
51
52
## Todo
52
53
- [ ] Faster Indexing
···
67
68
- [ ] Fresh: Text Only
68
69
69
70
## Architecture
70
-
Based on [go-bsky-feed-generator](https://github.com/ericvolp12/go-bsky-feed-generator). Read the README in the linked repo for more info about how it all works.
71
+
Based on [go-bsky-feed-generator](https://github.com/ericvolp12/go-bsky-feed-generator). Read the README in the linked repo for more info about how it all works. This repository differs from the template by not using the docker system at all.
71
72
72
-
### /feeds/static
73
+
### /pkg/feeds/static
73
74
Basic example feed from the template. Kept as a sanity check if all else seems to fail.
74
75
75
-
### /feeds/fresh
76
-
Fresh feeds, all based around a shared Following feed builder and logic to set posts as viewed. May contain some remnant old references to the old name "rinds".
76
+
### /pkg/feeds/fresh
77
+
Fresh feeds, all built around a shared Following feed builder and logic to set posts as viewed. May contain some remnant old references to the old name "rinds".
77
78
+1
-2
pkg/auth/auth.go
+1
-2
pkg/auth/auth.go
···
115
115
accessToken := authHeaderParts[1]
116
116
117
117
parser := jwt.Parser{
118
-
ValidMethods: []string{es256k.SigningMethodES256K.Alg()},
119
-
SkipClaimsValidation: true, // IM SORRY I HAD TO, MY VPS IS ACTING STRANGE. I THINK ITS FINE
118
+
ValidMethods: []string{es256k.SigningMethodES256K.Alg()},
120
119
}
121
120
122
121
token, err := parser.ParseWithClaims(accessToken, claims, func(token *jwt.Token) (interface{}, error) {
+184
-73
pkg/feeds/fresh/feed.go
+184
-73
pkg/feeds/fresh/feed.go
···
30
30
type FeedType string
31
31
32
32
const (
33
-
Rinds FeedType = "rinds"
34
-
Random FeedType = "random"
35
-
Mnine FeedType = "mnine"
36
-
Reposts FeedType = "reposts"
37
-
OReplies FeedType = "oreplies"
33
+
Rinds FeedType = "rinds"
34
+
Random FeedType = "random"
35
+
Mnine FeedType = "mnine"
36
+
Reposts FeedType = "reposts"
37
+
OReplies FeedType = "oreplies"
38
+
followingRefreshThreshold = 12 * time.Hour
38
39
)
39
40
40
41
type StaticFeed struct {
···
139
140
hash = generateHash(userDID)
140
141
} else {
141
142
log.Println("Using existing hash")
143
+
}
144
+
145
+
// hacky stopgap following list refresh
146
+
if err := sf.ensureRefreshStatusTableExists(ctx); err != nil {
147
+
log.Printf("Could not ensure following_refresh_status table exists: %v", err)
148
+
} else {
149
+
sf.checkAndRefreshFollowers(ctx, userDID)
142
150
}
143
151
144
152
tableName := fmt.Sprintf("feedcache_%s_%s", userDID, hash)
···
419
427
return hex.EncodeToString(hash[:])[:5]
420
428
}
421
429
422
-
func getFollowers(ctx context.Context, db *sql.DB, userdid string) ([]string, error) {
423
-
unquotedTableName := "follows_" + userdid
424
-
tableName := pq.QuoteIdentifier(unquotedTableName)
425
-
fmt.Printf("Checking for table: %s\n", tableName) // Debug log
430
+
func (sf *StaticFeed) ensureRefreshStatusTableExists(ctx context.Context) error {
431
+
query := `
432
+
CREATE TABLE IF NOT EXISTS following_refresh_status (
433
+
user_did TEXT PRIMARY KEY,
434
+
last_fetched TIMESTAMPTZ,
435
+
is_refreshing BOOLEAN DEFAULT FALSE
436
+
);
437
+
`
438
+
_, err := sf.DB.ExecContext(ctx, query)
439
+
if err != nil {
440
+
return fmt.Errorf("error creating following_refresh_status table: %w", err)
441
+
}
442
+
return nil
443
+
}
426
444
427
-
// Check if cache table exists
428
-
var exists bool
429
-
query := "SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = $1)"
430
-
fmt.Printf("Executing query: %s with parameter: %s\n", query, unquotedTableName) // Debug log
431
-
err := db.QueryRowContext(ctx, query, unquotedTableName).Scan(&exists)
445
+
func (sf *StaticFeed) checkAndRefreshFollowers(ctx context.Context, userDID string) {
446
+
_, err := sf.DB.ExecContext(ctx,
447
+
"INSERT INTO following_refresh_status (user_did) VALUES ($1) ON CONFLICT (user_did) DO NOTHING", userDID)
432
448
if err != nil {
433
-
// Check for specific errors
434
-
if err == sql.ErrNoRows {
435
-
return nil, fmt.Errorf("table existence check returned no rows: %w", err)
436
-
}
437
-
if pqErr, ok := err.(*pq.Error); ok {
438
-
return nil, fmt.Errorf("PostgreSQL error: %s, Code: %s", pqErr.Message, pqErr.Code)
439
-
}
440
-
return nil, fmt.Errorf("error checking cache table existence: %w", err)
449
+
log.Printf("Error ensuring refresh status entry for %s: %v", userDID, err)
450
+
return
451
+
}
452
+
453
+
var lastFetched sql.NullTime
454
+
var isRefreshing bool
455
+
456
+
err = sf.DB.QueryRowContext(ctx,
457
+
"SELECT last_fetched, is_refreshing FROM following_refresh_status WHERE user_did = $1", userDID).Scan(&lastFetched, &isRefreshing)
458
+
if err != nil {
459
+
log.Printf("Error getting refresh status for %s: %v", userDID, err)
460
+
return
461
+
}
462
+
463
+
if isRefreshing {
464
+
log.Printf("Follower refresh for %s is already in progress. Skipping.", userDID)
465
+
return
466
+
}
467
+
468
+
if !lastFetched.Valid || time.Since(lastFetched.Time) > followingRefreshThreshold {
469
+
log.Printf("Follower list for %s is stale or has never been fetched. Triggering background refresh.", userDID)
470
+
go sf.refreshFollowersAsync(userDID)
471
+
return
441
472
}
442
473
443
-
fmt.Printf("Table exists: %v\n", exists) // Debug log
474
+
log.Printf("Follower list for %s is fresh enough (last fetched %v ago). Skipping refresh.", userDID, time.Since(lastFetched.Time))
475
+
}
444
476
445
-
if exists {
446
-
rows, err := db.QueryContext(ctx, fmt.Sprintf("SELECT follow FROM %s", tableName))
477
+
func (sf *StaticFeed) refreshFollowersAsync(userDID string) {
478
+
ctx := context.Background()
479
+
480
+
_, err := sf.DB.ExecContext(ctx, "UPDATE following_refresh_status SET is_refreshing = TRUE WHERE user_did = $1", userDID)
481
+
if err != nil {
482
+
log.Printf("Error setting refresh lock for %s: %v", userDID, err)
483
+
return
484
+
}
485
+
486
+
defer func() {
487
+
_, err := sf.DB.ExecContext(ctx, "UPDATE following_refresh_status SET is_refreshing = FALSE WHERE user_did = $1", userDID)
447
488
if err != nil {
448
-
return nil, fmt.Errorf("error querying cache table: %w", err)
489
+
log.Printf("CRITICAL: Error releasing refresh lock for %s: %v", userDID, err)
449
490
}
450
-
defer rows.Close()
491
+
}()
451
492
452
-
var cachedFollows []string
453
-
for rows.Next() {
454
-
var follow string
455
-
if err := rows.Scan(&follow); err != nil {
456
-
return nil, fmt.Errorf("error scanning cached follow: %w", err)
457
-
}
458
-
cachedFollows = append(cachedFollows, follow)
459
-
}
493
+
err = fetchAndCacheFollowersFromAPI(ctx, sf.DB, userDID)
494
+
if err != nil {
495
+
log.Printf("Error refreshing followers for %s: %v", userDID, err)
496
+
return
497
+
}
460
498
461
-
if err := rows.Err(); err != nil {
462
-
return nil, fmt.Errorf("error iterating cached follows: %w", err)
463
-
}
499
+
_, err = sf.DB.ExecContext(ctx, "UPDATE following_refresh_status SET last_fetched = NOW() WHERE user_did = $1", userDID)
500
+
if err != nil {
501
+
log.Printf("Error updating last_fetched for %s: %v", userDID, err)
502
+
}
464
503
465
-
log.Println("Returning cached followers")
466
-
return cachedFollows, nil
467
-
}
504
+
log.Printf("Successfully refreshed followers for %s.", userDID)
505
+
}
468
506
469
-
log.Println("Fetching followers from API")
507
+
func fetchAndCacheFollowersFromAPI(ctx context.Context, db *sql.DB, userdid string) error {
508
+
log.Printf("Fetching followers from API for %s", userdid)
470
509
var allDIDs []string
471
510
cursor := ""
472
511
···
474
513
apiURL := fmt.Sprintf("https://public.api.bsky.app/xrpc/app.bsky.graph.getFollows?actor=%s&cursor=%s", userdid, cursor)
475
514
resp, err := http.Get(apiURL)
476
515
if err != nil {
477
-
log.Printf("Error making request: %v\n", err)
478
-
return nil, fmt.Errorf("failed to make request: %v", err)
516
+
return fmt.Errorf("failed to make request: %w", err)
479
517
}
480
518
defer resp.Body.Close()
481
519
482
520
body, err := ioutil.ReadAll(resp.Body)
483
521
if err != nil {
484
-
log.Printf("Error reading response: %v\n", err)
485
-
return nil, fmt.Errorf("failed to read response: %v", err)
522
+
return fmt.Errorf("failed to read response: %w", err)
486
523
}
487
524
488
525
var response Response
489
526
if err := json.Unmarshal(body, &response); err != nil {
490
-
log.Printf("Error unmarshalling JSON: %v\n", err)
491
-
return nil, fmt.Errorf("failed to unmarshal JSON: %v", err)
527
+
return fmt.Errorf("failed to unmarshal JSON: %w", err)
492
528
}
493
529
494
530
for _, follow := range response.Follows {
···
501
537
cursor = response.Cursor
502
538
}
503
539
504
-
// Drop the existing table if it exists
505
-
log.Println("Dropping existing followers table if it exists")
506
-
_, err = db.ExecContext(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s", tableName))
540
+
unquotedTableName := "follows_" + userdid
541
+
tableName := pq.QuoteIdentifier(unquotedTableName)
542
+
tempTableName := "follows_temp_" + userdid
543
+
quotedTempTableName := pq.QuoteIdentifier(tempTableName)
544
+
545
+
log.Println("Dropping existing temporary followers table if it exists")
546
+
_, err := db.ExecContext(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s", quotedTempTableName))
507
547
if err != nil {
508
-
return nil, fmt.Errorf("error dropping existing cache table: %w", err)
548
+
return fmt.Errorf("error dropping existing temp cache table: %w", err)
509
549
}
510
550
511
-
// Cache the results in the database
512
-
log.Println("Caching followers in the database")
513
-
_, err = db.ExecContext(ctx, fmt.Sprintf(`
514
-
CREATE TABLE %s (
515
-
follow TEXT UNIQUE
516
-
)
517
-
`, tableName))
551
+
log.Println("Creating temporary followers table")
552
+
_, err = db.ExecContext(ctx, fmt.Sprintf(`CREATE TABLE %s (follow TEXT UNIQUE)`, quotedTempTableName))
518
553
if err != nil {
519
-
return nil, fmt.Errorf("error creating cache table: %w", err)
554
+
return fmt.Errorf("error creating temp cache table: %w", err)
520
555
}
521
556
522
-
// Use a map to track unique follows
523
-
followMap := make(map[string]struct{})
524
-
for _, follow := range allDIDs {
525
-
if _, exists := followMap[follow]; !exists {
526
-
followMap[follow] = struct{}{}
527
-
_, err := db.ExecContext(ctx, fmt.Sprintf(`
528
-
INSERT INTO %s (follow)
529
-
VALUES ($1)
530
-
ON CONFLICT (follow) DO NOTHING
531
-
`, tableName), follow)
557
+
if len(allDIDs) > 0 {
558
+
txn, err := db.BeginTx(ctx, nil)
559
+
if err != nil {
560
+
return fmt.Errorf("failed to begin transaction: %w", err)
561
+
}
562
+
563
+
stmt, err := txn.Prepare(pq.CopyIn(tempTableName, "follow"))
564
+
if err != nil {
565
+
return fmt.Errorf("failed to prepare copy in: %w", err)
566
+
}
567
+
568
+
for _, did := range allDIDs {
569
+
_, err = stmt.Exec(did)
532
570
if err != nil {
533
-
return nil, fmt.Errorf("error inserting into cache table: %w", err)
571
+
return fmt.Errorf("failed to exec copy in: %w", err)
534
572
}
535
573
}
574
+
575
+
_, err = stmt.Exec()
576
+
if err != nil {
577
+
return fmt.Errorf("failed to finalize copy in: %w", err)
578
+
}
579
+
580
+
err = stmt.Close()
581
+
if err != nil {
582
+
return fmt.Errorf("failed to close statement: %w", err)
583
+
}
584
+
585
+
err = txn.Commit()
586
+
if err != nil {
587
+
return fmt.Errorf("failed to commit transaction: %w", err)
588
+
}
536
589
}
537
590
538
-
log.Println("Returning fetched followers")
539
-
return allDIDs, nil
591
+
log.Println("Atomically replacing old followers table with new one")
592
+
tx, err := db.Begin()
593
+
if err != nil {
594
+
return fmt.Errorf("error beginning transaction for table rename: %w", err)
595
+
}
596
+
_, err = tx.Exec(fmt.Sprintf("DROP TABLE IF EXISTS %s", tableName))
597
+
if err != nil {
598
+
tx.Rollback()
599
+
return fmt.Errorf("error dropping old follows table in transaction: %w", err)
600
+
}
601
+
_, err = tx.Exec(fmt.Sprintf("ALTER TABLE %s RENAME TO %s", quotedTempTableName, pq.QuoteIdentifier(unquotedTableName)))
602
+
if err != nil {
603
+
tx.Rollback()
604
+
return fmt.Errorf("error renaming temp table in transaction: %w", err)
605
+
}
606
+
err = tx.Commit()
607
+
if err != nil {
608
+
return fmt.Errorf("error committing transaction for table rename: %w", err)
609
+
}
610
+
611
+
log.Printf("Successfully cached %d followers for %s", len(allDIDs), userdid)
612
+
return nil
613
+
}
614
+
615
+
func getFollowers(ctx context.Context, db *sql.DB, userdid string) ([]string, error) {
616
+
unquotedTableName := "follows_" + userdid
617
+
tableName := pq.QuoteIdentifier(unquotedTableName)
618
+
619
+
var exists bool
620
+
err := db.QueryRowContext(ctx, "SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = $1)", unquotedTableName).Scan(&exists)
621
+
if err != nil {
622
+
return nil, fmt.Errorf("error checking follows cache table existence for %s: %w", userdid, err)
623
+
}
624
+
625
+
if !exists {
626
+
log.Printf("Follows cache table for %s does not exist. Waiting for background refresh to create it.", userdid)
627
+
return []string{}, nil
628
+
}
629
+
630
+
rows, err := db.QueryContext(ctx, fmt.Sprintf("SELECT follow FROM %s", tableName))
631
+
if err != nil {
632
+
return nil, fmt.Errorf("error querying cache table: %w", err)
633
+
}
634
+
defer rows.Close()
635
+
636
+
var cachedFollows []string
637
+
for rows.Next() {
638
+
var follow string
639
+
if err := rows.Scan(&follow); err != nil {
640
+
return nil, fmt.Errorf("error scanning cached follow: %w", err)
641
+
}
642
+
cachedFollows = append(cachedFollows, follow)
643
+
}
644
+
645
+
if err := rows.Err(); err != nil {
646
+
return nil, fmt.Errorf("error iterating cached follows: %w", err)
647
+
}
648
+
649
+
log.Printf("Returning %d cached followers for %s", len(cachedFollows), userdid)
650
+
return cachedFollows, nil
540
651
}
541
652
542
653
func markPostsAsViewed(ctx context.Context, db *sql.DB, userDID string, posts []PostWithDate, smartReportingEnabled bool) error {