A collection of Custom Bluesky Feeds, including Fresh Feeds, all under one roof

Compare changes

Choose any two refs to compare.

Changed files
+200 -89
pkg
auth
feeds
fresh
+15 -14
README.md
··· 1 1 # Rinds 2 2 A collection of feeds under one roof. 3 - 4 - I don't like Docker and I don't need to compile this, okay thanks! 3 + including Fresh Feeds 5 4 6 5 ## Running 7 - aint that hard, go install go, setup postgres, and generate the required feed definitions under your user account. you can use https://pdsls.dev to generate a `app.bsky.feed.generator` record. 8 - Set the `rkey` to the desired short url value. itll look like: 6 + Install go, setup postgres, and then generate the required feed definitions under your user account. You can use https://pdsls.dev to generate a `app.bsky.feed.generator` record. 7 + Set the `rkey` to the desired short url value. Itll look like: 9 8 ``` 10 9 https://bsky.app/profile/{you}/feed/{rkey} 11 10 ``` 12 11 for the contents you can use the example below 13 12 ``` 14 13 { 15 - "did": "did:web:${INSERT DID:WEB HERE}", 14 + "did": "did:web:${your service endpoint}", 16 15 "$type": "app.bsky.feed.generator", 17 16 "createdAt": "2025-01-21T11:33:02.396Z", 18 17 "description": "wowww very descriptive", ··· 20 19 } 21 20 ``` 22 21 22 + If you are specifically trying to run any of the preexisting feeds, make sure to use the rkey as defined in the /pkg/feeds/*/feed.go and main.go files 23 + 23 24 ## Env 24 25 You can check out `.env.example` for an example 25 - 26 + The port is for your feedgen serve, not your postgres port 26 27 27 28 ## Postgres 28 29 Be sure to set up `.env` correctly ··· 31 32 32 33 ## Index 33 34 You should start Postgres first 34 - Then go run the firehose ingester in 35 + Then go run the firehose indexder inside 35 36 ``` 36 37 cd ./indexer 37 38 ``` 38 - and go compile it 39 + And then go compile it 39 40 ``` 40 41 go build -o indexer ./indexer.go && export $(grep -v '^#' ./../.env | xargs) && ./indexer 41 42 ``` 42 - after it has been compiled, you can use `rerun.sh` to ensure it will automatically recover after failure 43 + Once compiled, you can use `rerun.sh` to ensure it will automatically recover after failure 43 44 44 45 ## Serve 45 46 Make sure the indexer (or at least Postgres) is running first: 46 47 ``` 47 48 go build -o feedgen cmd/main.go && export $(grep -v '^#' ./.env | xargs) && ./feedgen 48 49 ``` 49 - the logs are pretty verbose imo, fyi 50 + Logs are pretty verbose, just FYI. 50 51 51 52 ## Todo 52 53 - [ ] Faster Indexing ··· 67 68 - [ ] Fresh: Text Only 68 69 69 70 ## Architecture 70 - Based on [go-bsky-feed-generator](https://github.com/ericvolp12/go-bsky-feed-generator). Read the README in the linked repo for more info about how it all works. 71 + Based on [go-bsky-feed-generator](https://github.com/ericvolp12/go-bsky-feed-generator). Read the README in the linked repo for more info about how it all works. This repository differs from the template by not using the docker system at all. 71 72 72 - ### /feeds/static 73 + ### /pkg/feeds/static 73 74 Basic example feed from the template. Kept as a sanity check if all else seems to fail. 74 75 75 - ### /feeds/fresh 76 - Fresh feeds, all based around a shared Following feed builder and logic to set posts as viewed. May contain some remnant old references to the old name "rinds". 76 + ### /pkg/feeds/fresh 77 + Fresh feeds, all built around a shared Following feed builder and logic to set posts as viewed. May contain some remnant old references to the old name "rinds". 77 78
+1 -2
pkg/auth/auth.go
··· 115 115 accessToken := authHeaderParts[1] 116 116 117 117 parser := jwt.Parser{ 118 - ValidMethods: []string{es256k.SigningMethodES256K.Alg()}, 119 - SkipClaimsValidation: true, // IM SORRY I HAD TO, MY VPS IS ACTING STRANGE. I THINK ITS FINE 118 + ValidMethods: []string{es256k.SigningMethodES256K.Alg()}, 120 119 } 121 120 122 121 token, err := parser.ParseWithClaims(accessToken, claims, func(token *jwt.Token) (interface{}, error) {
+184 -73
pkg/feeds/fresh/feed.go
··· 30 30 type FeedType string 31 31 32 32 const ( 33 - Rinds FeedType = "rinds" 34 - Random FeedType = "random" 35 - Mnine FeedType = "mnine" 36 - Reposts FeedType = "reposts" 37 - OReplies FeedType = "oreplies" 33 + Rinds FeedType = "rinds" 34 + Random FeedType = "random" 35 + Mnine FeedType = "mnine" 36 + Reposts FeedType = "reposts" 37 + OReplies FeedType = "oreplies" 38 + followingRefreshThreshold = 12 * time.Hour 38 39 ) 39 40 40 41 type StaticFeed struct { ··· 139 140 hash = generateHash(userDID) 140 141 } else { 141 142 log.Println("Using existing hash") 143 + } 144 + 145 + // hacky stopgap following list refresh 146 + if err := sf.ensureRefreshStatusTableExists(ctx); err != nil { 147 + log.Printf("Could not ensure following_refresh_status table exists: %v", err) 148 + } else { 149 + sf.checkAndRefreshFollowers(ctx, userDID) 142 150 } 143 151 144 152 tableName := fmt.Sprintf("feedcache_%s_%s", userDID, hash) ··· 419 427 return hex.EncodeToString(hash[:])[:5] 420 428 } 421 429 422 - func getFollowers(ctx context.Context, db *sql.DB, userdid string) ([]string, error) { 423 - unquotedTableName := "follows_" + userdid 424 - tableName := pq.QuoteIdentifier(unquotedTableName) 425 - fmt.Printf("Checking for table: %s\n", tableName) // Debug log 430 + func (sf *StaticFeed) ensureRefreshStatusTableExists(ctx context.Context) error { 431 + query := ` 432 + CREATE TABLE IF NOT EXISTS following_refresh_status ( 433 + user_did TEXT PRIMARY KEY, 434 + last_fetched TIMESTAMPTZ, 435 + is_refreshing BOOLEAN DEFAULT FALSE 436 + ); 437 + ` 438 + _, err := sf.DB.ExecContext(ctx, query) 439 + if err != nil { 440 + return fmt.Errorf("error creating following_refresh_status table: %w", err) 441 + } 442 + return nil 443 + } 426 444 427 - // Check if cache table exists 428 - var exists bool 429 - query := "SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = $1)" 430 - fmt.Printf("Executing query: %s with parameter: %s\n", query, unquotedTableName) // Debug log 431 - err := db.QueryRowContext(ctx, query, unquotedTableName).Scan(&exists) 445 + func (sf *StaticFeed) checkAndRefreshFollowers(ctx context.Context, userDID string) { 446 + _, err := sf.DB.ExecContext(ctx, 447 + "INSERT INTO following_refresh_status (user_did) VALUES ($1) ON CONFLICT (user_did) DO NOTHING", userDID) 432 448 if err != nil { 433 - // Check for specific errors 434 - if err == sql.ErrNoRows { 435 - return nil, fmt.Errorf("table existence check returned no rows: %w", err) 436 - } 437 - if pqErr, ok := err.(*pq.Error); ok { 438 - return nil, fmt.Errorf("PostgreSQL error: %s, Code: %s", pqErr.Message, pqErr.Code) 439 - } 440 - return nil, fmt.Errorf("error checking cache table existence: %w", err) 449 + log.Printf("Error ensuring refresh status entry for %s: %v", userDID, err) 450 + return 451 + } 452 + 453 + var lastFetched sql.NullTime 454 + var isRefreshing bool 455 + 456 + err = sf.DB.QueryRowContext(ctx, 457 + "SELECT last_fetched, is_refreshing FROM following_refresh_status WHERE user_did = $1", userDID).Scan(&lastFetched, &isRefreshing) 458 + if err != nil { 459 + log.Printf("Error getting refresh status for %s: %v", userDID, err) 460 + return 461 + } 462 + 463 + if isRefreshing { 464 + log.Printf("Follower refresh for %s is already in progress. Skipping.", userDID) 465 + return 466 + } 467 + 468 + if !lastFetched.Valid || time.Since(lastFetched.Time) > followingRefreshThreshold { 469 + log.Printf("Follower list for %s is stale or has never been fetched. Triggering background refresh.", userDID) 470 + go sf.refreshFollowersAsync(userDID) 471 + return 441 472 } 442 473 443 - fmt.Printf("Table exists: %v\n", exists) // Debug log 474 + log.Printf("Follower list for %s is fresh enough (last fetched %v ago). Skipping refresh.", userDID, time.Since(lastFetched.Time)) 475 + } 444 476 445 - if exists { 446 - rows, err := db.QueryContext(ctx, fmt.Sprintf("SELECT follow FROM %s", tableName)) 477 + func (sf *StaticFeed) refreshFollowersAsync(userDID string) { 478 + ctx := context.Background() 479 + 480 + _, err := sf.DB.ExecContext(ctx, "UPDATE following_refresh_status SET is_refreshing = TRUE WHERE user_did = $1", userDID) 481 + if err != nil { 482 + log.Printf("Error setting refresh lock for %s: %v", userDID, err) 483 + return 484 + } 485 + 486 + defer func() { 487 + _, err := sf.DB.ExecContext(ctx, "UPDATE following_refresh_status SET is_refreshing = FALSE WHERE user_did = $1", userDID) 447 488 if err != nil { 448 - return nil, fmt.Errorf("error querying cache table: %w", err) 489 + log.Printf("CRITICAL: Error releasing refresh lock for %s: %v", userDID, err) 449 490 } 450 - defer rows.Close() 491 + }() 451 492 452 - var cachedFollows []string 453 - for rows.Next() { 454 - var follow string 455 - if err := rows.Scan(&follow); err != nil { 456 - return nil, fmt.Errorf("error scanning cached follow: %w", err) 457 - } 458 - cachedFollows = append(cachedFollows, follow) 459 - } 493 + err = fetchAndCacheFollowersFromAPI(ctx, sf.DB, userDID) 494 + if err != nil { 495 + log.Printf("Error refreshing followers for %s: %v", userDID, err) 496 + return 497 + } 460 498 461 - if err := rows.Err(); err != nil { 462 - return nil, fmt.Errorf("error iterating cached follows: %w", err) 463 - } 499 + _, err = sf.DB.ExecContext(ctx, "UPDATE following_refresh_status SET last_fetched = NOW() WHERE user_did = $1", userDID) 500 + if err != nil { 501 + log.Printf("Error updating last_fetched for %s: %v", userDID, err) 502 + } 464 503 465 - log.Println("Returning cached followers") 466 - return cachedFollows, nil 467 - } 504 + log.Printf("Successfully refreshed followers for %s.", userDID) 505 + } 468 506 469 - log.Println("Fetching followers from API") 507 + func fetchAndCacheFollowersFromAPI(ctx context.Context, db *sql.DB, userdid string) error { 508 + log.Printf("Fetching followers from API for %s", userdid) 470 509 var allDIDs []string 471 510 cursor := "" 472 511 ··· 474 513 apiURL := fmt.Sprintf("https://public.api.bsky.app/xrpc/app.bsky.graph.getFollows?actor=%s&cursor=%s", userdid, cursor) 475 514 resp, err := http.Get(apiURL) 476 515 if err != nil { 477 - log.Printf("Error making request: %v\n", err) 478 - return nil, fmt.Errorf("failed to make request: %v", err) 516 + return fmt.Errorf("failed to make request: %w", err) 479 517 } 480 518 defer resp.Body.Close() 481 519 482 520 body, err := ioutil.ReadAll(resp.Body) 483 521 if err != nil { 484 - log.Printf("Error reading response: %v\n", err) 485 - return nil, fmt.Errorf("failed to read response: %v", err) 522 + return fmt.Errorf("failed to read response: %w", err) 486 523 } 487 524 488 525 var response Response 489 526 if err := json.Unmarshal(body, &response); err != nil { 490 - log.Printf("Error unmarshalling JSON: %v\n", err) 491 - return nil, fmt.Errorf("failed to unmarshal JSON: %v", err) 527 + return fmt.Errorf("failed to unmarshal JSON: %w", err) 492 528 } 493 529 494 530 for _, follow := range response.Follows { ··· 501 537 cursor = response.Cursor 502 538 } 503 539 504 - // Drop the existing table if it exists 505 - log.Println("Dropping existing followers table if it exists") 506 - _, err = db.ExecContext(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s", tableName)) 540 + unquotedTableName := "follows_" + userdid 541 + tableName := pq.QuoteIdentifier(unquotedTableName) 542 + tempTableName := "follows_temp_" + userdid 543 + quotedTempTableName := pq.QuoteIdentifier(tempTableName) 544 + 545 + log.Println("Dropping existing temporary followers table if it exists") 546 + _, err := db.ExecContext(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s", quotedTempTableName)) 507 547 if err != nil { 508 - return nil, fmt.Errorf("error dropping existing cache table: %w", err) 548 + return fmt.Errorf("error dropping existing temp cache table: %w", err) 509 549 } 510 550 511 - // Cache the results in the database 512 - log.Println("Caching followers in the database") 513 - _, err = db.ExecContext(ctx, fmt.Sprintf(` 514 - CREATE TABLE %s ( 515 - follow TEXT UNIQUE 516 - ) 517 - `, tableName)) 551 + log.Println("Creating temporary followers table") 552 + _, err = db.ExecContext(ctx, fmt.Sprintf(`CREATE TABLE %s (follow TEXT UNIQUE)`, quotedTempTableName)) 518 553 if err != nil { 519 - return nil, fmt.Errorf("error creating cache table: %w", err) 554 + return fmt.Errorf("error creating temp cache table: %w", err) 520 555 } 521 556 522 - // Use a map to track unique follows 523 - followMap := make(map[string]struct{}) 524 - for _, follow := range allDIDs { 525 - if _, exists := followMap[follow]; !exists { 526 - followMap[follow] = struct{}{} 527 - _, err := db.ExecContext(ctx, fmt.Sprintf(` 528 - INSERT INTO %s (follow) 529 - VALUES ($1) 530 - ON CONFLICT (follow) DO NOTHING 531 - `, tableName), follow) 557 + if len(allDIDs) > 0 { 558 + txn, err := db.BeginTx(ctx, nil) 559 + if err != nil { 560 + return fmt.Errorf("failed to begin transaction: %w", err) 561 + } 562 + 563 + stmt, err := txn.Prepare(pq.CopyIn(tempTableName, "follow")) 564 + if err != nil { 565 + return fmt.Errorf("failed to prepare copy in: %w", err) 566 + } 567 + 568 + for _, did := range allDIDs { 569 + _, err = stmt.Exec(did) 532 570 if err != nil { 533 - return nil, fmt.Errorf("error inserting into cache table: %w", err) 571 + return fmt.Errorf("failed to exec copy in: %w", err) 534 572 } 535 573 } 574 + 575 + _, err = stmt.Exec() 576 + if err != nil { 577 + return fmt.Errorf("failed to finalize copy in: %w", err) 578 + } 579 + 580 + err = stmt.Close() 581 + if err != nil { 582 + return fmt.Errorf("failed to close statement: %w", err) 583 + } 584 + 585 + err = txn.Commit() 586 + if err != nil { 587 + return fmt.Errorf("failed to commit transaction: %w", err) 588 + } 536 589 } 537 590 538 - log.Println("Returning fetched followers") 539 - return allDIDs, nil 591 + log.Println("Atomically replacing old followers table with new one") 592 + tx, err := db.Begin() 593 + if err != nil { 594 + return fmt.Errorf("error beginning transaction for table rename: %w", err) 595 + } 596 + _, err = tx.Exec(fmt.Sprintf("DROP TABLE IF EXISTS %s", tableName)) 597 + if err != nil { 598 + tx.Rollback() 599 + return fmt.Errorf("error dropping old follows table in transaction: %w", err) 600 + } 601 + _, err = tx.Exec(fmt.Sprintf("ALTER TABLE %s RENAME TO %s", quotedTempTableName, pq.QuoteIdentifier(unquotedTableName))) 602 + if err != nil { 603 + tx.Rollback() 604 + return fmt.Errorf("error renaming temp table in transaction: %w", err) 605 + } 606 + err = tx.Commit() 607 + if err != nil { 608 + return fmt.Errorf("error committing transaction for table rename: %w", err) 609 + } 610 + 611 + log.Printf("Successfully cached %d followers for %s", len(allDIDs), userdid) 612 + return nil 613 + } 614 + 615 + func getFollowers(ctx context.Context, db *sql.DB, userdid string) ([]string, error) { 616 + unquotedTableName := "follows_" + userdid 617 + tableName := pq.QuoteIdentifier(unquotedTableName) 618 + 619 + var exists bool 620 + err := db.QueryRowContext(ctx, "SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = $1)", unquotedTableName).Scan(&exists) 621 + if err != nil { 622 + return nil, fmt.Errorf("error checking follows cache table existence for %s: %w", userdid, err) 623 + } 624 + 625 + if !exists { 626 + log.Printf("Follows cache table for %s does not exist. Waiting for background refresh to create it.", userdid) 627 + return []string{}, nil 628 + } 629 + 630 + rows, err := db.QueryContext(ctx, fmt.Sprintf("SELECT follow FROM %s", tableName)) 631 + if err != nil { 632 + return nil, fmt.Errorf("error querying cache table: %w", err) 633 + } 634 + defer rows.Close() 635 + 636 + var cachedFollows []string 637 + for rows.Next() { 638 + var follow string 639 + if err := rows.Scan(&follow); err != nil { 640 + return nil, fmt.Errorf("error scanning cached follow: %w", err) 641 + } 642 + cachedFollows = append(cachedFollows, follow) 643 + } 644 + 645 + if err := rows.Err(); err != nil { 646 + return nil, fmt.Errorf("error iterating cached follows: %w", err) 647 + } 648 + 649 + log.Printf("Returning %d cached followers for %s", len(cachedFollows), userdid) 650 + return cachedFollows, nil 540 651 } 541 652 542 653 func markPostsAsViewed(ctx context.Context, db *sql.DB, userDID string, posts []PostWithDate, smartReportingEnabled bool) error {