A collection of Custom Bluesky Feeds, including Fresh Feeds, all under one roof

Compare changes

Choose any two refs to compare.

Changed files
+426 -96
cmd
indexer
pkg
auth
feeds
fresh
+17 -17
README.md
··· 1 1 # Rinds 2 2 A collection of feeds under one roof. 3 - 4 - I don't like Docker and I don't need to compile this, okay thanks! 3 + including Fresh Feeds 5 4 6 5 ## Running 7 - aint that hard, go install go, setup postgres, and generate the required feed definitions under your user account. you can use https://pdsls.dev to generate a `app.bsky.feed.generator` record. 8 - Set the `rkey` to the desired short url value. itll look like: 6 + Install go, setup postgres, and then generate the required feed definitions under your user account. You can use https://pdsls.dev to generate a `app.bsky.feed.generator` record. 7 + Set the `rkey` to the desired short url value. Itll look like: 9 8 ``` 10 9 https://bsky.app/profile/{you}/feed/{rkey} 11 10 ``` 12 11 for the contents you can use the example below 13 12 ``` 14 13 { 15 - "did": "did:web:${INSERT DID:WEB HERE}", 14 + "did": "did:web:${your service endpoint}", 16 15 "$type": "app.bsky.feed.generator", 17 16 "createdAt": "2025-01-21T11:33:02.396Z", 18 17 "description": "wowww very descriptive", ··· 20 19 } 21 20 ``` 22 21 22 + If you are specifically trying to run any of the preexisting feeds, make sure to use the rkey as defined in the /pkg/feeds/*/feed.go and main.go files 23 + 23 24 ## Env 24 25 You can check out `.env.example` for an example 25 - 26 + The port is for your feedgen serve, not your postgres port 26 27 27 28 ## Postgres 28 29 Be sure to set up `.env` correctly ··· 31 32 32 33 ## Index 33 34 You should start Postgres first 34 - Then go run the firehose ingester in 35 + Then go run the firehose indexder inside 35 36 ``` 36 37 cd ./indexer 37 38 ``` 38 - and go compile it 39 + And then go compile it 39 40 ``` 40 41 go build -o indexer ./indexer.go && export $(grep -v '^#' ./../.env | xargs) && ./indexer 41 42 ``` 42 - after it has been compiled, you can use `rerun.sh` to ensure it will automatically recover after failure 43 + Once compiled, you can use `rerun.sh` to ensure it will automatically recover after failure 43 44 44 45 ## Serve 45 46 Make sure the indexer (or at least Postgres) is running first: 46 47 ``` 47 48 go build -o feedgen cmd/main.go && export $(grep -v '^#' ./.env | xargs) && ./feedgen 48 49 ``` 49 - the logs are pretty verbose imo, fyi 50 + Logs are pretty verbose, just FYI. 50 51 51 52 ## Todo 52 53 - [ ] Faster Indexing 53 - - [ ] Proper Up-to-Date Following Indexing 54 + - [x] Proper Up-to-Date Following Indexing 54 55 - [x] Repost Indicators 55 - - [ ] Cache Timeouts 56 + - [x] Cache Timeouts 56 57 - [x] Likes 57 58 - [x] Posts 58 59 - [x] Feed Caches 59 - - [ ] Followings 60 60 - [ ] More Fresh Feed Variants 61 61 - [ ] unFresh 62 62 - [x] +9 hrs ··· 68 68 - [ ] Fresh: Text Only 69 69 70 70 ## Architecture 71 - Based on [go-bsky-feed-generator](https://github.com/ericvolp12/go-bsky-feed-generator). Read the README in the linked repo for more info about how it all works. 71 + Based on [go-bsky-feed-generator](https://github.com/ericvolp12/go-bsky-feed-generator). Read the README in the linked repo for more info about how it all works. This repository differs from the template by not using the docker system at all. 72 72 73 - ### /feeds/static 73 + ### /pkg/feeds/static 74 74 Basic example feed from the template. Kept as a sanity check if all else seems to fail. 75 75 76 - ### /feeds/fresh 77 - Fresh feeds, all based around a shared Following feed builder and logic to set posts as viewed. May contain some remnant old references to the old name "rinds". 76 + ### /pkg/feeds/fresh 77 + Fresh feeds, all built around a shared Following feed builder and logic to set posts as viewed. May contain some remnant old references to the old name "rinds". 78 78
+31
cmd/main.go
··· 121 121 false, 122 122 ) 123 123 124 + localrindsFeed, localrindsFeedAliases, err := freshfeeds.NewStaticFeed( 125 + ctx, 126 + feedActorDID, 127 + "localrinds-test", 128 + // This static post is the conversation that sparked this demo repo 129 + []string{"at://did:plc:mn45tewwnse5btfftvd3powc/app.bsky.feed.post/3kgjjhlsnoi2f"}, 130 + db, 131 + "localrinds-test", 132 + false, 133 + ) 134 + 124 135 randomFeed, randomFeedAliases, err := freshfeeds.NewStaticFeed( 125 136 ctx, 126 137 feedActorDID, ··· 208 219 // Add the static feed to the feed generator 209 220 feedRouter.AddFeed(staticFeedAliases, staticFeed) 210 221 222 + feedRouter.AddFeed(localrindsFeedAliases, localrindsFeed) 223 + 211 224 feedRouter.AddFeed(rindsFeedAliases, rindsFeed) 212 225 feedRouter.AddFeed(randomFeedAliases, randomFeed) 213 226 feedRouter.AddFeed(repostsFeedAliases, repostsFeed) ··· 241 254 ep := ginendpoints.NewEndpoints(feedRouter) 242 255 router.GET("/.well-known/did.json", ep.GetWellKnownDID) 243 256 router.GET("/xrpc/app.bsky.feed.describeFeedGenerator", ep.DescribeFeeds) 257 + // Root route: ASCII art and GitHub link 258 + router.GET("/", func(c *gin.Context) { 259 + c.Header("Content-Type", "text/plain; charset=utf-8") 260 + c.String(http.StatusOK, ` ____ _ _ 261 + | _ \(_)_ __ __| |___ 262 + | |_) | | '_ \ / _' / __| 263 + | _ <| | | | | (_| \__ \ 264 + |_| \_\_|_| |_|\__,_|___/ 265 + 266 + bsky feed generators by @whey.party 267 + 268 + Code: https://github.com/rimar1337/rinds 269 + Flagship Fresh Feeds Instance: https://bsky.app/profile/did:plc:mn45tewwnse5btfftvd3powc/feed/rinds 270 + 271 + Fresh Feeds icons by @pprmint.de 272 + Repository generated from https://github.com/ericvolp12/go-bsky-feed-generator 273 + `) 274 + }) 244 275 245 276 // Plug in Authentication Middleware 246 277 auther, err := auth.NewAuth(
+193 -4
indexer/indexer.go
··· 3 3 import ( 4 4 "context" 5 5 "database/sql" 6 + "encoding/json" 6 7 "fmt" 7 8 "log" 8 9 "os" ··· 13 14 _ "github.com/lib/pq" 14 15 ) 15 16 16 - const wsUrl = "wss://jetstream2.us-west.bsky.network/subscribe?wantedCollections=app.bsky.feed.post&wantedCollections=app.bsky.feed.repost&wantedCollections=app.bsky.feed.like" 17 + const wsUrl = "wss://jetstream2.us-west.bsky.network/subscribe?wantedCollections=app.bsky.feed.post&wantedCollections=app.bsky.feed.repost&wantedCollections=app.bsky.feed.like&wantedCollections=app.bsky.graph.follow" 17 18 18 19 type LikeMessage struct { 19 20 Did string `json:"did"` ··· 38 39 Reply *Reply `json:"reply,omitempty"` 39 40 } 40 41 42 + type FollowRecord struct { 43 + DID string `json:"subject"` 44 + createdAt string `json:"createdAt"` 45 + } 46 + 41 47 type Reply struct { 42 48 Parent ReplySubject `json:"parent"` 43 49 Root ReplySubject `json:"root"` ··· 46 52 type LikeSubject struct { 47 53 CID string `json:"cid"` 48 54 URI string `json:"uri"` 55 + Raw string // Stores raw string if not a JSON object 56 + } 57 + 58 + // This handles both string and object forms 59 + func (ls *LikeSubject) UnmarshalJSON(data []byte) error { 60 + // Try to unmarshal as object first 61 + var obj struct { 62 + CID string `json:"cid"` 63 + URI string `json:"uri"` 64 + } 65 + if err := json.Unmarshal(data, &obj); err == nil && obj.URI != "" { 66 + ls.CID = obj.CID 67 + ls.URI = obj.URI 68 + return nil 69 + } 70 + 71 + // If it's not a valid object, try to unmarshal as string 72 + var str string 73 + if err := json.Unmarshal(data, &str); err == nil { 74 + ls.Raw = str 75 + return nil 76 + } 77 + 78 + return fmt.Errorf("LikeSubject: invalid JSON: %s", string(data)) 49 79 } 50 80 51 81 type ReplySubject struct { ··· 58 88 var ( 59 89 postBatch []Post 60 90 likeBatch []Like 91 + followBatch []Follow 61 92 batchInsertSize = 1000 // Adjust the batch size as needed 62 93 batchInterval = 30 * time.Second // Flush every 30 seconds 63 94 ) ··· 77 108 RelDate int64 78 109 } 79 110 111 + type Follow struct { 112 + RelAuthor string 113 + followSubjectDID string 114 + } 115 + 116 + var trackedDIDsMap map[string]struct{} 117 + var trackedDIDs []string 118 + 119 + func initTrackedDIDsMap(dids []string) { 120 + trackedDIDsMap = make(map[string]struct{}, len(dids)) 121 + for _, did := range dids { 122 + trackedDIDsMap[did] = struct{}{} 123 + } 124 + } 125 + 80 126 func getLastCursor(db *sql.DB) int64 { 81 127 var lastCursor int64 82 128 err := db.QueryRow("SELECT lastCursor FROM cursor WHERE id = 1").Scan(&lastCursor) ··· 90 136 return lastCursor 91 137 } 92 138 139 + // Periodically refreshes the trackedDIDs and trackedDIDsMap from the database. 140 + func startTrackedDIDRefreshJob(db *sql.DB) { 141 + ticker := time.NewTicker(10 * time.Minute) 142 + defer ticker.Stop() 143 + 144 + for range ticker.C { 145 + updatedDIDs, err := getTrackedDIDs(context.Background(), db) 146 + if err != nil { 147 + log.Printf("Failed to refresh tracked DIDs: %v", err) 148 + continue 149 + } 150 + initTrackedDIDsMap(updatedDIDs) 151 + trackedDIDs = updatedDIDs 152 + log.Printf("Refreshed tracked DIDs: %v\n", trackedDIDs) 153 + } 154 + } 93 155 func main() { 94 156 // Connect to Postgres // Open the database connection 95 157 dbHost := os.Getenv("DB_HOST") ··· 118 180 // Retrieve the last cursor 119 181 lastCursor := getLastCursor(db) 120 182 183 + // initialize the tracked DIDs 184 + trackedDIDs, err = getTrackedDIDs(context.Background(), db) 185 + if err != nil { 186 + log.Fatalf("Failed to get tracked DIDs: %v", err) 187 + } 188 + log.Printf("Tracked DIDs: %v\n", trackedDIDs) 189 + initTrackedDIDsMap(trackedDIDs) 190 + 191 + go startBatchInsertFollowJob(db) 192 + 193 + go startTrackedDIDRefreshJob(db) 194 + 121 195 // If the cursor is older than 24 hours, skip it 122 196 if lastCursor > 0 { 123 197 cursorTime := time.UnixMicro(lastCursor) ··· 182 256 id SERIAL PRIMARY KEY, 183 257 rel_author TEXT NOT NULL, 184 258 post_uri TEXT NOT NULL, 185 - rel_date BIGINT NOT NULL 259 + rel_date BIGINT NOT NULL, 260 + UNIQUE(rel_author, post_uri) 186 261 ); 187 262 `) 188 263 if err != nil { ··· 211 286 } 212 287 } 213 288 func processMessage(db *sql.DB, msg LikeMessage) { 289 + // log.Print("Processing message...") 214 290 // Convert cursor to time 215 291 cursorTime := time.UnixMicro(msg.TimeUs) 216 292 ··· 244 320 deletePost(db, msg.Did, postUri, msg.TimeUs) 245 321 } 246 322 case "app.bsky.feed.repost": 323 + 247 324 if record.Subject.URI != "" { 248 325 if msg.Commit.Operation == "create" { 249 326 postBatch = append(postBatch, Post{msg.Did, record.Subject.URI, msg.TimeUs, true, repostUri, ""}) ··· 259 336 deleteLike(db, msg.Did, record.Subject.URI) 260 337 } 261 338 } 339 + case "app.bsky.graph.follow": 340 + _, tracked := trackedDIDsMap[msg.Did] 341 + if tracked { 342 + // log.Printf("Following found tracked DID: %s\n", msg.Did) 343 + if msg.Commit.Operation == "create" { 344 + // log.Printf("Following Create; doer: %s, subject: %s\n", msg.Did, record.Subject.Raw) 345 + followBatch = append(followBatch, Follow{msg.Did, record.Subject.Raw}) 346 + } else if msg.Commit.Operation == "delete" { 347 + // log.Printf("Following Delete; doer: %s, subject: %s\n", msg.Did, record.Subject.Raw) 348 + //log.Printf("Unfollowing: %s", msg.Commit.RKey) 349 + // Remove the DID from the tracked DIDs map 350 + deleteFollow(db, msg.Did, msg.Commit.RKey) 351 + } 352 + } 262 353 default: 263 354 //log.Printf("Unknown collection: %s", msg.Commit.Collection) 264 355 } ··· 290 381 } 291 382 } 292 383 384 + func deleteFollow(db *sql.DB, relAuthor, did string) { 385 + unquotedTableName := "follows_" + relAuthor 386 + tableName := pq.QuoteIdentifier(unquotedTableName) 387 + _, err := db.Exec(fmt.Sprintf(` 388 + DELETE FROM %s WHERE follow = $1; 389 + `, tableName), did) 390 + if err != nil { 391 + log.Printf("Error deleting follow: %v", err) 392 + } 393 + } 394 + 293 395 func startCleanupJob(db *sql.DB) { 294 396 ticker := time.NewTicker(1 * time.Hour) 295 397 defer ticker.Stop() ··· 347 449 _, err := stmt.Exec(post.RelAuthor, post.PostUri, post.RelDate, post.IsRepost, post.RepostUri, post.ReplyTo) 348 450 if err != nil { 349 451 log.Printf("Error executing statement: %v", err) 452 + log.Printf("Failed INSERT: %+v\nError: %v", post, err) 453 + os.Exit(1) // Exit on error 350 454 } 351 455 } 352 456 ··· 360 464 } 361 465 362 466 func startBatchInsertLikesJob(db *sql.DB) { 363 - ticker := time.NewTicker(1 * time.Second) 467 + ticker := time.NewTicker(batchInterval) 364 468 defer ticker.Stop() 365 469 366 470 for range ticker.C { 367 - if len(likeBatch) > 0 { 471 + if len(likeBatch) >= batchInsertSize { 368 472 batchInsertLikes(db) 369 473 } 370 474 } 371 475 } 372 476 477 + func startBatchInsertFollowJob(db *sql.DB) { 478 + ticker := time.NewTicker(batchInterval) 479 + defer ticker.Stop() 480 + 481 + for range ticker.C { 482 + if len(followBatch) >= 1 { 483 + batchInsertFollow(db) 484 + } 485 + } 486 + } 487 + func batchInsertFollow(db *sql.DB) { 488 + tx, err := db.Begin() 489 + if err != nil { 490 + log.Printf("Error starting transaction: %v", err) 491 + return 492 + } 493 + 494 + unquotedTableName := "follows_" + followBatch[0].RelAuthor 495 + tableName := pq.QuoteIdentifier(unquotedTableName) 496 + 497 + stmt, err := tx.Prepare(fmt.Sprintf(` 498 + INSERT INTO %s (follow) 499 + VALUES ($1) 500 + ON CONFLICT (follow) DO NOTHING 501 + `, tableName)) 502 + if err != nil { 503 + log.Printf("Error preparing statement: %v", err) 504 + return 505 + } 506 + defer stmt.Close() 507 + 508 + for _, follow := range followBatch { 509 + _, err := stmt.Exec(follow.followSubjectDID) 510 + if err != nil { 511 + log.Printf("Error executing statement: %v", err) 512 + log.Printf("Failed FOLLOW INSERT: %+v\nError: %v", follow, err) 513 + os.Exit(1) // Exit on error 514 + } 515 + } 516 + 517 + err = tx.Commit() 518 + if err != nil { 519 + log.Printf("Error committing transaction: %v", err) 520 + } 521 + 522 + // Clear the batch 523 + followBatch = followBatch[:0] 524 + } 525 + 373 526 func batchInsertLikes(db *sql.DB) { 374 527 tx, err := db.Begin() 375 528 if err != nil { ··· 392 545 _, err := stmt.Exec(like.RelAuthor, like.PostUri, like.RelDate) 393 546 if err != nil { 394 547 log.Printf("Error executing statement: %v", err) 548 + log.Printf("Failed LIKE INSERT: %+v\nError: %v", like, err) 549 + os.Exit(1) // Exit on error 395 550 } 396 551 } 397 552 ··· 472 627 473 628 return nil 474 629 } 630 + 631 + // ehhhhh why are we doing this 632 + func getTrackedDIDs(ctx context.Context, db *sql.DB) ([]string, error) { 633 + const prefix = "follows_" 634 + query := ` 635 + SELECT table_name 636 + FROM information_schema.tables 637 + WHERE table_name LIKE $1 638 + ` 639 + rows, err := db.QueryContext(ctx, query, prefix+"%") 640 + if err != nil { 641 + return nil, fmt.Errorf("error querying tracked follows tables: %w", err) 642 + } 643 + defer rows.Close() 644 + 645 + var trackedDIDs []string 646 + for rows.Next() { 647 + var tableName string 648 + if err := rows.Scan(&tableName); err != nil { 649 + return nil, fmt.Errorf("error scanning table name: %w", err) 650 + } 651 + // Strip prefix to get the DID 652 + if len(tableName) > len(prefix) { 653 + did := tableName[len(prefix):] 654 + trackedDIDs = append(trackedDIDs, did) 655 + } 656 + } 657 + 658 + if err := rows.Err(); err != nil { 659 + return nil, fmt.Errorf("error iterating tracked tables: %w", err) 660 + } 661 + 662 + return trackedDIDs, nil 663 + }
+1 -2
pkg/auth/auth.go
··· 115 115 accessToken := authHeaderParts[1] 116 116 117 117 parser := jwt.Parser{ 118 - ValidMethods: []string{es256k.SigningMethodES256K.Alg()}, 119 - SkipClaimsValidation: true, // IM SORRY I HAD TO, MY VPS IS ACTING STRANGE. I THINK ITS FINE 118 + ValidMethods: []string{es256k.SigningMethodES256K.Alg()}, 120 119 } 121 120 122 121 token, err := parser.ParseWithClaims(accessToken, claims, func(token *jwt.Token) (interface{}, error) {
+184 -73
pkg/feeds/fresh/feed.go
··· 30 30 type FeedType string 31 31 32 32 const ( 33 - Rinds FeedType = "rinds" 34 - Random FeedType = "random" 35 - Mnine FeedType = "mnine" 36 - Reposts FeedType = "reposts" 37 - OReplies FeedType = "oreplies" 33 + Rinds FeedType = "rinds" 34 + Random FeedType = "random" 35 + Mnine FeedType = "mnine" 36 + Reposts FeedType = "reposts" 37 + OReplies FeedType = "oreplies" 38 + followingRefreshThreshold = 12 * time.Hour 38 39 ) 39 40 40 41 type StaticFeed struct { ··· 139 140 hash = generateHash(userDID) 140 141 } else { 141 142 log.Println("Using existing hash") 143 + } 144 + 145 + // hacky stopgap following list refresh 146 + if err := sf.ensureRefreshStatusTableExists(ctx); err != nil { 147 + log.Printf("Could not ensure following_refresh_status table exists: %v", err) 148 + } else { 149 + sf.checkAndRefreshFollowers(ctx, userDID) 142 150 } 143 151 144 152 tableName := fmt.Sprintf("feedcache_%s_%s", userDID, hash) ··· 419 427 return hex.EncodeToString(hash[:])[:5] 420 428 } 421 429 422 - func getFollowers(ctx context.Context, db *sql.DB, userdid string) ([]string, error) { 423 - unquotedTableName := "follows_" + userdid 424 - tableName := pq.QuoteIdentifier(unquotedTableName) 425 - fmt.Printf("Checking for table: %s\n", tableName) // Debug log 430 + func (sf *StaticFeed) ensureRefreshStatusTableExists(ctx context.Context) error { 431 + query := ` 432 + CREATE TABLE IF NOT EXISTS following_refresh_status ( 433 + user_did TEXT PRIMARY KEY, 434 + last_fetched TIMESTAMPTZ, 435 + is_refreshing BOOLEAN DEFAULT FALSE 436 + ); 437 + ` 438 + _, err := sf.DB.ExecContext(ctx, query) 439 + if err != nil { 440 + return fmt.Errorf("error creating following_refresh_status table: %w", err) 441 + } 442 + return nil 443 + } 426 444 427 - // Check if cache table exists 428 - var exists bool 429 - query := "SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = $1)" 430 - fmt.Printf("Executing query: %s with parameter: %s\n", query, unquotedTableName) // Debug log 431 - err := db.QueryRowContext(ctx, query, unquotedTableName).Scan(&exists) 445 + func (sf *StaticFeed) checkAndRefreshFollowers(ctx context.Context, userDID string) { 446 + _, err := sf.DB.ExecContext(ctx, 447 + "INSERT INTO following_refresh_status (user_did) VALUES ($1) ON CONFLICT (user_did) DO NOTHING", userDID) 432 448 if err != nil { 433 - // Check for specific errors 434 - if err == sql.ErrNoRows { 435 - return nil, fmt.Errorf("table existence check returned no rows: %w", err) 436 - } 437 - if pqErr, ok := err.(*pq.Error); ok { 438 - return nil, fmt.Errorf("PostgreSQL error: %s, Code: %s", pqErr.Message, pqErr.Code) 439 - } 440 - return nil, fmt.Errorf("error checking cache table existence: %w", err) 449 + log.Printf("Error ensuring refresh status entry for %s: %v", userDID, err) 450 + return 451 + } 452 + 453 + var lastFetched sql.NullTime 454 + var isRefreshing bool 455 + 456 + err = sf.DB.QueryRowContext(ctx, 457 + "SELECT last_fetched, is_refreshing FROM following_refresh_status WHERE user_did = $1", userDID).Scan(&lastFetched, &isRefreshing) 458 + if err != nil { 459 + log.Printf("Error getting refresh status for %s: %v", userDID, err) 460 + return 461 + } 462 + 463 + if isRefreshing { 464 + log.Printf("Follower refresh for %s is already in progress. Skipping.", userDID) 465 + return 466 + } 467 + 468 + if !lastFetched.Valid || time.Since(lastFetched.Time) > followingRefreshThreshold { 469 + log.Printf("Follower list for %s is stale or has never been fetched. Triggering background refresh.", userDID) 470 + go sf.refreshFollowersAsync(userDID) 471 + return 441 472 } 442 473 443 - fmt.Printf("Table exists: %v\n", exists) // Debug log 474 + log.Printf("Follower list for %s is fresh enough (last fetched %v ago). Skipping refresh.", userDID, time.Since(lastFetched.Time)) 475 + } 444 476 445 - if exists { 446 - rows, err := db.QueryContext(ctx, fmt.Sprintf("SELECT follow FROM %s", tableName)) 477 + func (sf *StaticFeed) refreshFollowersAsync(userDID string) { 478 + ctx := context.Background() 479 + 480 + _, err := sf.DB.ExecContext(ctx, "UPDATE following_refresh_status SET is_refreshing = TRUE WHERE user_did = $1", userDID) 481 + if err != nil { 482 + log.Printf("Error setting refresh lock for %s: %v", userDID, err) 483 + return 484 + } 485 + 486 + defer func() { 487 + _, err := sf.DB.ExecContext(ctx, "UPDATE following_refresh_status SET is_refreshing = FALSE WHERE user_did = $1", userDID) 447 488 if err != nil { 448 - return nil, fmt.Errorf("error querying cache table: %w", err) 489 + log.Printf("CRITICAL: Error releasing refresh lock for %s: %v", userDID, err) 449 490 } 450 - defer rows.Close() 491 + }() 451 492 452 - var cachedFollows []string 453 - for rows.Next() { 454 - var follow string 455 - if err := rows.Scan(&follow); err != nil { 456 - return nil, fmt.Errorf("error scanning cached follow: %w", err) 457 - } 458 - cachedFollows = append(cachedFollows, follow) 459 - } 493 + err = fetchAndCacheFollowersFromAPI(ctx, sf.DB, userDID) 494 + if err != nil { 495 + log.Printf("Error refreshing followers for %s: %v", userDID, err) 496 + return 497 + } 460 498 461 - if err := rows.Err(); err != nil { 462 - return nil, fmt.Errorf("error iterating cached follows: %w", err) 463 - } 499 + _, err = sf.DB.ExecContext(ctx, "UPDATE following_refresh_status SET last_fetched = NOW() WHERE user_did = $1", userDID) 500 + if err != nil { 501 + log.Printf("Error updating last_fetched for %s: %v", userDID, err) 502 + } 464 503 465 - log.Println("Returning cached followers") 466 - return cachedFollows, nil 467 - } 504 + log.Printf("Successfully refreshed followers for %s.", userDID) 505 + } 468 506 469 - log.Println("Fetching followers from API") 507 + func fetchAndCacheFollowersFromAPI(ctx context.Context, db *sql.DB, userdid string) error { 508 + log.Printf("Fetching followers from API for %s", userdid) 470 509 var allDIDs []string 471 510 cursor := "" 472 511 ··· 474 513 apiURL := fmt.Sprintf("https://public.api.bsky.app/xrpc/app.bsky.graph.getFollows?actor=%s&cursor=%s", userdid, cursor) 475 514 resp, err := http.Get(apiURL) 476 515 if err != nil { 477 - log.Printf("Error making request: %v\n", err) 478 - return nil, fmt.Errorf("failed to make request: %v", err) 516 + return fmt.Errorf("failed to make request: %w", err) 479 517 } 480 518 defer resp.Body.Close() 481 519 482 520 body, err := ioutil.ReadAll(resp.Body) 483 521 if err != nil { 484 - log.Printf("Error reading response: %v\n", err) 485 - return nil, fmt.Errorf("failed to read response: %v", err) 522 + return fmt.Errorf("failed to read response: %w", err) 486 523 } 487 524 488 525 var response Response 489 526 if err := json.Unmarshal(body, &response); err != nil { 490 - log.Printf("Error unmarshalling JSON: %v\n", err) 491 - return nil, fmt.Errorf("failed to unmarshal JSON: %v", err) 527 + return fmt.Errorf("failed to unmarshal JSON: %w", err) 492 528 } 493 529 494 530 for _, follow := range response.Follows { ··· 501 537 cursor = response.Cursor 502 538 } 503 539 504 - // Drop the existing table if it exists 505 - log.Println("Dropping existing followers table if it exists") 506 - _, err = db.ExecContext(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s", tableName)) 540 + unquotedTableName := "follows_" + userdid 541 + tableName := pq.QuoteIdentifier(unquotedTableName) 542 + tempTableName := "follows_temp_" + userdid 543 + quotedTempTableName := pq.QuoteIdentifier(tempTableName) 544 + 545 + log.Println("Dropping existing temporary followers table if it exists") 546 + _, err := db.ExecContext(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s", quotedTempTableName)) 507 547 if err != nil { 508 - return nil, fmt.Errorf("error dropping existing cache table: %w", err) 548 + return fmt.Errorf("error dropping existing temp cache table: %w", err) 509 549 } 510 550 511 - // Cache the results in the database 512 - log.Println("Caching followers in the database") 513 - _, err = db.ExecContext(ctx, fmt.Sprintf(` 514 - CREATE TABLE %s ( 515 - follow TEXT UNIQUE 516 - ) 517 - `, tableName)) 551 + log.Println("Creating temporary followers table") 552 + _, err = db.ExecContext(ctx, fmt.Sprintf(`CREATE TABLE %s (follow TEXT UNIQUE)`, quotedTempTableName)) 518 553 if err != nil { 519 - return nil, fmt.Errorf("error creating cache table: %w", err) 554 + return fmt.Errorf("error creating temp cache table: %w", err) 520 555 } 521 556 522 - // Use a map to track unique follows 523 - followMap := make(map[string]struct{}) 524 - for _, follow := range allDIDs { 525 - if _, exists := followMap[follow]; !exists { 526 - followMap[follow] = struct{}{} 527 - _, err := db.ExecContext(ctx, fmt.Sprintf(` 528 - INSERT INTO %s (follow) 529 - VALUES ($1) 530 - ON CONFLICT (follow) DO NOTHING 531 - `, tableName), follow) 557 + if len(allDIDs) > 0 { 558 + txn, err := db.BeginTx(ctx, nil) 559 + if err != nil { 560 + return fmt.Errorf("failed to begin transaction: %w", err) 561 + } 562 + 563 + stmt, err := txn.Prepare(pq.CopyIn(tempTableName, "follow")) 564 + if err != nil { 565 + return fmt.Errorf("failed to prepare copy in: %w", err) 566 + } 567 + 568 + for _, did := range allDIDs { 569 + _, err = stmt.Exec(did) 532 570 if err != nil { 533 - return nil, fmt.Errorf("error inserting into cache table: %w", err) 571 + return fmt.Errorf("failed to exec copy in: %w", err) 534 572 } 535 573 } 574 + 575 + _, err = stmt.Exec() 576 + if err != nil { 577 + return fmt.Errorf("failed to finalize copy in: %w", err) 578 + } 579 + 580 + err = stmt.Close() 581 + if err != nil { 582 + return fmt.Errorf("failed to close statement: %w", err) 583 + } 584 + 585 + err = txn.Commit() 586 + if err != nil { 587 + return fmt.Errorf("failed to commit transaction: %w", err) 588 + } 536 589 } 537 590 538 - log.Println("Returning fetched followers") 539 - return allDIDs, nil 591 + log.Println("Atomically replacing old followers table with new one") 592 + tx, err := db.Begin() 593 + if err != nil { 594 + return fmt.Errorf("error beginning transaction for table rename: %w", err) 595 + } 596 + _, err = tx.Exec(fmt.Sprintf("DROP TABLE IF EXISTS %s", tableName)) 597 + if err != nil { 598 + tx.Rollback() 599 + return fmt.Errorf("error dropping old follows table in transaction: %w", err) 600 + } 601 + _, err = tx.Exec(fmt.Sprintf("ALTER TABLE %s RENAME TO %s", quotedTempTableName, pq.QuoteIdentifier(unquotedTableName))) 602 + if err != nil { 603 + tx.Rollback() 604 + return fmt.Errorf("error renaming temp table in transaction: %w", err) 605 + } 606 + err = tx.Commit() 607 + if err != nil { 608 + return fmt.Errorf("error committing transaction for table rename: %w", err) 609 + } 610 + 611 + log.Printf("Successfully cached %d followers for %s", len(allDIDs), userdid) 612 + return nil 613 + } 614 + 615 + func getFollowers(ctx context.Context, db *sql.DB, userdid string) ([]string, error) { 616 + unquotedTableName := "follows_" + userdid 617 + tableName := pq.QuoteIdentifier(unquotedTableName) 618 + 619 + var exists bool 620 + err := db.QueryRowContext(ctx, "SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = $1)", unquotedTableName).Scan(&exists) 621 + if err != nil { 622 + return nil, fmt.Errorf("error checking follows cache table existence for %s: %w", userdid, err) 623 + } 624 + 625 + if !exists { 626 + log.Printf("Follows cache table for %s does not exist. Waiting for background refresh to create it.", userdid) 627 + return []string{}, nil 628 + } 629 + 630 + rows, err := db.QueryContext(ctx, fmt.Sprintf("SELECT follow FROM %s", tableName)) 631 + if err != nil { 632 + return nil, fmt.Errorf("error querying cache table: %w", err) 633 + } 634 + defer rows.Close() 635 + 636 + var cachedFollows []string 637 + for rows.Next() { 638 + var follow string 639 + if err := rows.Scan(&follow); err != nil { 640 + return nil, fmt.Errorf("error scanning cached follow: %w", err) 641 + } 642 + cachedFollows = append(cachedFollows, follow) 643 + } 644 + 645 + if err := rows.Err(); err != nil { 646 + return nil, fmt.Errorf("error iterating cached follows: %w", err) 647 + } 648 + 649 + log.Printf("Returning %d cached followers for %s", len(cachedFollows), userdid) 650 + return cachedFollows, nil 540 651 } 541 652 542 653 func markPostsAsViewed(ctx context.Context, db *sql.DB, userDID string, posts []PostWithDate, smartReportingEnabled bool) error {