+184
-73
pkg/feeds/fresh/feed.go
+184
-73
pkg/feeds/fresh/feed.go
···
30
type FeedType string
31
32
const (
33
-
Rinds FeedType = "rinds"
34
-
Random FeedType = "random"
35
-
Mnine FeedType = "mnine"
36
-
Reposts FeedType = "reposts"
37
-
OReplies FeedType = "oreplies"
38
)
39
40
type StaticFeed struct {
···
139
hash = generateHash(userDID)
140
} else {
141
log.Println("Using existing hash")
142
}
143
144
tableName := fmt.Sprintf("feedcache_%s_%s", userDID, hash)
···
419
return hex.EncodeToString(hash[:])[:5]
420
}
421
422
-
func getFollowers(ctx context.Context, db *sql.DB, userdid string) ([]string, error) {
423
-
unquotedTableName := "follows_" + userdid
424
-
tableName := pq.QuoteIdentifier(unquotedTableName)
425
-
fmt.Printf("Checking for table: %s\n", tableName) // Debug log
426
427
-
// Check if cache table exists
428
-
var exists bool
429
-
query := "SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = $1)"
430
-
fmt.Printf("Executing query: %s with parameter: %s\n", query, unquotedTableName) // Debug log
431
-
err := db.QueryRowContext(ctx, query, unquotedTableName).Scan(&exists)
432
if err != nil {
433
-
// Check for specific errors
434
-
if err == sql.ErrNoRows {
435
-
return nil, fmt.Errorf("table existence check returned no rows: %w", err)
436
-
}
437
-
if pqErr, ok := err.(*pq.Error); ok {
438
-
return nil, fmt.Errorf("PostgreSQL error: %s, Code: %s", pqErr.Message, pqErr.Code)
439
-
}
440
-
return nil, fmt.Errorf("error checking cache table existence: %w", err)
441
}
442
443
-
fmt.Printf("Table exists: %v\n", exists) // Debug log
444
445
-
if exists {
446
-
rows, err := db.QueryContext(ctx, fmt.Sprintf("SELECT follow FROM %s", tableName))
447
if err != nil {
448
-
return nil, fmt.Errorf("error querying cache table: %w", err)
449
}
450
-
defer rows.Close()
451
452
-
var cachedFollows []string
453
-
for rows.Next() {
454
-
var follow string
455
-
if err := rows.Scan(&follow); err != nil {
456
-
return nil, fmt.Errorf("error scanning cached follow: %w", err)
457
-
}
458
-
cachedFollows = append(cachedFollows, follow)
459
-
}
460
461
-
if err := rows.Err(); err != nil {
462
-
return nil, fmt.Errorf("error iterating cached follows: %w", err)
463
-
}
464
465
-
log.Println("Returning cached followers")
466
-
return cachedFollows, nil
467
-
}
468
469
-
log.Println("Fetching followers from API")
470
var allDIDs []string
471
cursor := ""
472
···
474
apiURL := fmt.Sprintf("https://public.api.bsky.app/xrpc/app.bsky.graph.getFollows?actor=%s&cursor=%s", userdid, cursor)
475
resp, err := http.Get(apiURL)
476
if err != nil {
477
-
log.Printf("Error making request: %v\n", err)
478
-
return nil, fmt.Errorf("failed to make request: %v", err)
479
}
480
defer resp.Body.Close()
481
482
body, err := ioutil.ReadAll(resp.Body)
483
if err != nil {
484
-
log.Printf("Error reading response: %v\n", err)
485
-
return nil, fmt.Errorf("failed to read response: %v", err)
486
}
487
488
var response Response
489
if err := json.Unmarshal(body, &response); err != nil {
490
-
log.Printf("Error unmarshalling JSON: %v\n", err)
491
-
return nil, fmt.Errorf("failed to unmarshal JSON: %v", err)
492
}
493
494
for _, follow := range response.Follows {
···
501
cursor = response.Cursor
502
}
503
504
-
// Drop the existing table if it exists
505
-
log.Println("Dropping existing followers table if it exists")
506
-
_, err = db.ExecContext(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s", tableName))
507
if err != nil {
508
-
return nil, fmt.Errorf("error dropping existing cache table: %w", err)
509
}
510
511
-
// Cache the results in the database
512
-
log.Println("Caching followers in the database")
513
-
_, err = db.ExecContext(ctx, fmt.Sprintf(`
514
-
CREATE TABLE %s (
515
-
follow TEXT UNIQUE
516
-
)
517
-
`, tableName))
518
if err != nil {
519
-
return nil, fmt.Errorf("error creating cache table: %w", err)
520
}
521
522
-
// Use a map to track unique follows
523
-
followMap := make(map[string]struct{})
524
-
for _, follow := range allDIDs {
525
-
if _, exists := followMap[follow]; !exists {
526
-
followMap[follow] = struct{}{}
527
-
_, err := db.ExecContext(ctx, fmt.Sprintf(`
528
-
INSERT INTO %s (follow)
529
-
VALUES ($1)
530
-
ON CONFLICT (follow) DO NOTHING
531
-
`, tableName), follow)
532
if err != nil {
533
-
return nil, fmt.Errorf("error inserting into cache table: %w", err)
534
}
535
}
536
}
537
538
-
log.Println("Returning fetched followers")
539
-
return allDIDs, nil
540
}
541
542
func markPostsAsViewed(ctx context.Context, db *sql.DB, userDID string, posts []PostWithDate, smartReportingEnabled bool) error {
···
30
type FeedType string
31
32
const (
33
+
Rinds FeedType = "rinds"
34
+
Random FeedType = "random"
35
+
Mnine FeedType = "mnine"
36
+
Reposts FeedType = "reposts"
37
+
OReplies FeedType = "oreplies"
38
+
followingRefreshThreshold = 12 * time.Hour
39
)
40
41
type StaticFeed struct {
···
140
hash = generateHash(userDID)
141
} else {
142
log.Println("Using existing hash")
143
+
}
144
+
145
+
// hacky stopgap following list refresh
146
+
if err := sf.ensureRefreshStatusTableExists(ctx); err != nil {
147
+
log.Printf("Could not ensure following_refresh_status table exists: %v", err)
148
+
} else {
149
+
sf.checkAndRefreshFollowers(ctx, userDID)
150
}
151
152
tableName := fmt.Sprintf("feedcache_%s_%s", userDID, hash)
···
427
return hex.EncodeToString(hash[:])[:5]
428
}
429
430
+
func (sf *StaticFeed) ensureRefreshStatusTableExists(ctx context.Context) error {
431
+
query := `
432
+
CREATE TABLE IF NOT EXISTS following_refresh_status (
433
+
user_did TEXT PRIMARY KEY,
434
+
last_fetched TIMESTAMPTZ,
435
+
is_refreshing BOOLEAN DEFAULT FALSE
436
+
);
437
+
`
438
+
_, err := sf.DB.ExecContext(ctx, query)
439
+
if err != nil {
440
+
return fmt.Errorf("error creating following_refresh_status table: %w", err)
441
+
}
442
+
return nil
443
+
}
444
445
+
func (sf *StaticFeed) checkAndRefreshFollowers(ctx context.Context, userDID string) {
446
+
_, err := sf.DB.ExecContext(ctx,
447
+
"INSERT INTO following_refresh_status (user_did) VALUES ($1) ON CONFLICT (user_did) DO NOTHING", userDID)
448
if err != nil {
449
+
log.Printf("Error ensuring refresh status entry for %s: %v", userDID, err)
450
+
return
451
+
}
452
+
453
+
var lastFetched sql.NullTime
454
+
var isRefreshing bool
455
+
456
+
err = sf.DB.QueryRowContext(ctx,
457
+
"SELECT last_fetched, is_refreshing FROM following_refresh_status WHERE user_did = $1", userDID).Scan(&lastFetched, &isRefreshing)
458
+
if err != nil {
459
+
log.Printf("Error getting refresh status for %s: %v", userDID, err)
460
+
return
461
+
}
462
+
463
+
if isRefreshing {
464
+
log.Printf("Follower refresh for %s is already in progress. Skipping.", userDID)
465
+
return
466
+
}
467
+
468
+
if !lastFetched.Valid || time.Since(lastFetched.Time) > followingRefreshThreshold {
469
+
log.Printf("Follower list for %s is stale or has never been fetched. Triggering background refresh.", userDID)
470
+
go sf.refreshFollowersAsync(userDID)
471
+
return
472
}
473
474
+
log.Printf("Follower list for %s is fresh enough (last fetched %v ago). Skipping refresh.", userDID, time.Since(lastFetched.Time))
475
+
}
476
477
+
func (sf *StaticFeed) refreshFollowersAsync(userDID string) {
478
+
ctx := context.Background()
479
+
480
+
_, err := sf.DB.ExecContext(ctx, "UPDATE following_refresh_status SET is_refreshing = TRUE WHERE user_did = $1", userDID)
481
+
if err != nil {
482
+
log.Printf("Error setting refresh lock for %s: %v", userDID, err)
483
+
return
484
+
}
485
+
486
+
defer func() {
487
+
_, err := sf.DB.ExecContext(ctx, "UPDATE following_refresh_status SET is_refreshing = FALSE WHERE user_did = $1", userDID)
488
if err != nil {
489
+
log.Printf("CRITICAL: Error releasing refresh lock for %s: %v", userDID, err)
490
}
491
+
}()
492
493
+
err = fetchAndCacheFollowersFromAPI(ctx, sf.DB, userDID)
494
+
if err != nil {
495
+
log.Printf("Error refreshing followers for %s: %v", userDID, err)
496
+
return
497
+
}
498
499
+
_, err = sf.DB.ExecContext(ctx, "UPDATE following_refresh_status SET last_fetched = NOW() WHERE user_did = $1", userDID)
500
+
if err != nil {
501
+
log.Printf("Error updating last_fetched for %s: %v", userDID, err)
502
+
}
503
504
+
log.Printf("Successfully refreshed followers for %s.", userDID)
505
+
}
506
507
+
func fetchAndCacheFollowersFromAPI(ctx context.Context, db *sql.DB, userdid string) error {
508
+
log.Printf("Fetching followers from API for %s", userdid)
509
var allDIDs []string
510
cursor := ""
511
···
513
apiURL := fmt.Sprintf("https://public.api.bsky.app/xrpc/app.bsky.graph.getFollows?actor=%s&cursor=%s", userdid, cursor)
514
resp, err := http.Get(apiURL)
515
if err != nil {
516
+
return fmt.Errorf("failed to make request: %w", err)
517
}
518
defer resp.Body.Close()
519
520
body, err := ioutil.ReadAll(resp.Body)
521
if err != nil {
522
+
return fmt.Errorf("failed to read response: %w", err)
523
}
524
525
var response Response
526
if err := json.Unmarshal(body, &response); err != nil {
527
+
return fmt.Errorf("failed to unmarshal JSON: %w", err)
528
}
529
530
for _, follow := range response.Follows {
···
537
cursor = response.Cursor
538
}
539
540
+
unquotedTableName := "follows_" + userdid
541
+
tableName := pq.QuoteIdentifier(unquotedTableName)
542
+
tempTableName := "follows_temp_" + userdid
543
+
quotedTempTableName := pq.QuoteIdentifier(tempTableName)
544
+
545
+
log.Println("Dropping existing temporary followers table if it exists")
546
+
_, err := db.ExecContext(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s", quotedTempTableName))
547
if err != nil {
548
+
return fmt.Errorf("error dropping existing temp cache table: %w", err)
549
}
550
551
+
log.Println("Creating temporary followers table")
552
+
_, err = db.ExecContext(ctx, fmt.Sprintf(`CREATE TABLE %s (follow TEXT UNIQUE)`, quotedTempTableName))
553
if err != nil {
554
+
return fmt.Errorf("error creating temp cache table: %w", err)
555
}
556
557
+
if len(allDIDs) > 0 {
558
+
txn, err := db.BeginTx(ctx, nil)
559
+
if err != nil {
560
+
return fmt.Errorf("failed to begin transaction: %w", err)
561
+
}
562
+
563
+
stmt, err := txn.Prepare(pq.CopyIn(tempTableName, "follow"))
564
+
if err != nil {
565
+
return fmt.Errorf("failed to prepare copy in: %w", err)
566
+
}
567
+
568
+
for _, did := range allDIDs {
569
+
_, err = stmt.Exec(did)
570
if err != nil {
571
+
return fmt.Errorf("failed to exec copy in: %w", err)
572
}
573
}
574
+
575
+
_, err = stmt.Exec()
576
+
if err != nil {
577
+
return fmt.Errorf("failed to finalize copy in: %w", err)
578
+
}
579
+
580
+
err = stmt.Close()
581
+
if err != nil {
582
+
return fmt.Errorf("failed to close statement: %w", err)
583
+
}
584
+
585
+
err = txn.Commit()
586
+
if err != nil {
587
+
return fmt.Errorf("failed to commit transaction: %w", err)
588
+
}
589
}
590
591
+
log.Println("Atomically replacing old followers table with new one")
592
+
tx, err := db.Begin()
593
+
if err != nil {
594
+
return fmt.Errorf("error beginning transaction for table rename: %w", err)
595
+
}
596
+
_, err = tx.Exec(fmt.Sprintf("DROP TABLE IF EXISTS %s", tableName))
597
+
if err != nil {
598
+
tx.Rollback()
599
+
return fmt.Errorf("error dropping old follows table in transaction: %w", err)
600
+
}
601
+
_, err = tx.Exec(fmt.Sprintf("ALTER TABLE %s RENAME TO %s", quotedTempTableName, pq.QuoteIdentifier(unquotedTableName)))
602
+
if err != nil {
603
+
tx.Rollback()
604
+
return fmt.Errorf("error renaming temp table in transaction: %w", err)
605
+
}
606
+
err = tx.Commit()
607
+
if err != nil {
608
+
return fmt.Errorf("error committing transaction for table rename: %w", err)
609
+
}
610
+
611
+
log.Printf("Successfully cached %d followers for %s", len(allDIDs), userdid)
612
+
return nil
613
+
}
614
+
615
+
func getFollowers(ctx context.Context, db *sql.DB, userdid string) ([]string, error) {
616
+
unquotedTableName := "follows_" + userdid
617
+
tableName := pq.QuoteIdentifier(unquotedTableName)
618
+
619
+
var exists bool
620
+
err := db.QueryRowContext(ctx, "SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = $1)", unquotedTableName).Scan(&exists)
621
+
if err != nil {
622
+
return nil, fmt.Errorf("error checking follows cache table existence for %s: %w", userdid, err)
623
+
}
624
+
625
+
if !exists {
626
+
log.Printf("Follows cache table for %s does not exist. Waiting for background refresh to create it.", userdid)
627
+
return []string{}, nil
628
+
}
629
+
630
+
rows, err := db.QueryContext(ctx, fmt.Sprintf("SELECT follow FROM %s", tableName))
631
+
if err != nil {
632
+
return nil, fmt.Errorf("error querying cache table: %w", err)
633
+
}
634
+
defer rows.Close()
635
+
636
+
var cachedFollows []string
637
+
for rows.Next() {
638
+
var follow string
639
+
if err := rows.Scan(&follow); err != nil {
640
+
return nil, fmt.Errorf("error scanning cached follow: %w", err)
641
+
}
642
+
cachedFollows = append(cachedFollows, follow)
643
+
}
644
+
645
+
if err := rows.Err(); err != nil {
646
+
return nil, fmt.Errorf("error iterating cached follows: %w", err)
647
+
}
648
+
649
+
log.Printf("Returning %d cached followers for %s", len(cachedFollows), userdid)
650
+
return cachedFollows, nil
651
}
652
653
func markPostsAsViewed(ctx context.Context, db *sql.DB, userDID string, posts []PostWithDate, smartReportingEnabled bool) error {