+256
server/services/auto-backfill-follows.ts
+256
server/services/auto-backfill-follows.ts
···
106
106
// Step 2: Backfill profile info for all related users
107
107
await this.backfillProfileInfo(userDid);
108
108
109
+
// Step 3: Backfill posts from all followed users
110
+
await this.backfillFollowedUsersPosts(userDid);
111
+
109
112
// Update last backfill timestamp
110
113
await db
111
114
.insert(userSettings)
···
599
602
} catch (error) {
600
603
console.error(
601
604
`[AUTO_BACKFILL_FOLLOWS] Error backfilling profiles:`,
605
+
error
606
+
);
607
+
}
608
+
}
609
+
610
+
/**
611
+
* Backfill posts from all users that this user follows
612
+
* Fetches complete timeline for each followed user
613
+
*/
614
+
private async backfillFollowedUsersPosts(userDid: string): Promise<void> {
615
+
try {
616
+
console.log(
617
+
`[AUTO_BACKFILL_FOLLOWS] Starting posts backfill for followed users of ${userDid}`
618
+
);
619
+
620
+
// Get all users that this user follows
621
+
const followedUsers = await db.execute(
622
+
sql`
623
+
SELECT DISTINCT following_did as did
624
+
FROM ${follows}
625
+
WHERE follower_did = ${userDid}
626
+
`
627
+
);
628
+
629
+
const followedDids = followedUsers.rows.map((row: any) => row.did);
630
+
631
+
if (followedDids.length === 0) {
632
+
console.log(
633
+
`[AUTO_BACKFILL_FOLLOWS] User ${userDid} doesn't follow anyone yet`
634
+
);
635
+
return;
636
+
}
637
+
638
+
console.log(
639
+
`[AUTO_BACKFILL_FOLLOWS] Backfilling posts from ${followedDids.length} followed users`
640
+
);
641
+
642
+
const eventProcessor = new EventProcessor(storage);
643
+
eventProcessor.setSkipPdsFetching(true);
644
+
eventProcessor.setSkipDataCollectionCheck(true);
645
+
646
+
const { didResolver } = await import('./did-resolver');
647
+
let totalPostsFetched = 0;
648
+
let usersCompleted = 0;
649
+
let usersFailed = 0;
650
+
651
+
// Process each followed user
652
+
for (const followedDid of followedDids) {
653
+
try {
654
+
// Resolve their DID to find their PDS
655
+
const didDoc = await didResolver.resolveDID(followedDid);
656
+
if (!didDoc) {
657
+
usersFailed++;
658
+
continue;
659
+
}
660
+
661
+
const services = (didDoc as any).service || [];
662
+
const pdsService = services.find(
663
+
(s: any) =>
664
+
s.type === 'AtprotoPersonalDataServer' || s.id === '#atproto_pds'
665
+
);
666
+
667
+
if (!pdsService?.serviceEndpoint) {
668
+
usersFailed++;
669
+
continue;
670
+
}
671
+
672
+
const pdsAgent = new AtpAgent({
673
+
service: pdsService.serviceEndpoint,
674
+
});
675
+
676
+
// Fetch their posts
677
+
let postsFetched = 0;
678
+
let cursor: string | undefined;
679
+
680
+
do {
681
+
try {
682
+
const response = await pdsAgent.com.atproto.repo.listRecords({
683
+
repo: followedDid,
684
+
collection: 'app.bsky.feed.post',
685
+
limit: 100,
686
+
cursor: cursor,
687
+
});
688
+
689
+
// Process each post
690
+
for (const record of response.data.records) {
691
+
try {
692
+
const createdAt =
693
+
record.value?.createdAt || new Date().toISOString();
694
+
695
+
await eventProcessor.processCommit({
696
+
repo: followedDid,
697
+
ops: [
698
+
{
699
+
action: 'create',
700
+
path: `app.bsky.feed.post/${record.uri.split('/').pop()}`,
701
+
cid: record.cid,
702
+
record: record.value,
703
+
},
704
+
],
705
+
time: createdAt,
706
+
rev: '',
707
+
} as any);
708
+
709
+
postsFetched++;
710
+
totalPostsFetched++;
711
+
} catch (error: any) {
712
+
// Silently skip individual post errors (e.g., duplicates)
713
+
if (error?.code !== '23505') {
714
+
console.error(
715
+
`[AUTO_BACKFILL_FOLLOWS] Error processing post from ${followedDid}:`,
716
+
error.message
717
+
);
718
+
}
719
+
}
720
+
}
721
+
722
+
cursor = response.data.cursor;
723
+
} catch (error: any) {
724
+
console.error(
725
+
`[AUTO_BACKFILL_FOLLOWS] Error listing posts for ${followedDid}:`,
726
+
error.message
727
+
);
728
+
break;
729
+
}
730
+
} while (cursor);
731
+
732
+
usersCompleted++;
733
+
console.log(
734
+
`[AUTO_BACKFILL_FOLLOWS] User ${usersCompleted}/${followedDids.length}: Fetched ${postsFetched} posts from ${followedDid}`
735
+
);
736
+
} catch (error: any) {
737
+
console.error(
738
+
`[AUTO_BACKFILL_FOLLOWS] Error backfilling posts for ${followedDid}:`,
739
+
error.message
740
+
);
741
+
usersFailed++;
742
+
}
743
+
}
744
+
745
+
console.log(
746
+
`[AUTO_BACKFILL_FOLLOWS] Posts backfill complete: ${totalPostsFetched} posts from ${usersCompleted} users (${usersFailed} failed)`
747
+
);
748
+
} catch (error) {
749
+
console.error(
750
+
`[AUTO_BACKFILL_FOLLOWS] Error in backfillFollowedUsersPosts:`,
751
+
error
752
+
);
753
+
}
754
+
}
755
+
756
+
/**
757
+
* Backfill posts from a single user (called when following someone new)
758
+
*/
759
+
async backfillNewFollowPosts(followedDid: string): Promise<void> {
760
+
console.log(
761
+
`[AUTO_BACKFILL_FOLLOWS] Backfilling posts from newly followed user: ${followedDid}`
762
+
);
763
+
764
+
try {
765
+
const eventProcessor = new EventProcessor(storage);
766
+
eventProcessor.setSkipPdsFetching(true);
767
+
eventProcessor.setSkipDataCollectionCheck(true);
768
+
769
+
const { didResolver } = await import('./did-resolver');
770
+
771
+
// Resolve their DID to find their PDS
772
+
const didDoc = await didResolver.resolveDID(followedDid);
773
+
if (!didDoc) {
774
+
console.error(
775
+
`[AUTO_BACKFILL_FOLLOWS] Could not resolve DID ${followedDid}`
776
+
);
777
+
return;
778
+
}
779
+
780
+
const services = (didDoc as any).service || [];
781
+
const pdsService = services.find(
782
+
(s: any) =>
783
+
s.type === 'AtprotoPersonalDataServer' || s.id === '#atproto_pds'
784
+
);
785
+
786
+
if (!pdsService?.serviceEndpoint) {
787
+
console.error(
788
+
`[AUTO_BACKFILL_FOLLOWS] No PDS endpoint found for ${followedDid}`
789
+
);
790
+
return;
791
+
}
792
+
793
+
const pdsAgent = new AtpAgent({
794
+
service: pdsService.serviceEndpoint,
795
+
});
796
+
797
+
// Fetch their posts
798
+
let postsFetched = 0;
799
+
let cursor: string | undefined;
800
+
801
+
do {
802
+
try {
803
+
const response = await pdsAgent.com.atproto.repo.listRecords({
804
+
repo: followedDid,
805
+
collection: 'app.bsky.feed.post',
806
+
limit: 100,
807
+
cursor: cursor,
808
+
});
809
+
810
+
// Process each post
811
+
for (const record of response.data.records) {
812
+
try {
813
+
const createdAt =
814
+
record.value?.createdAt || new Date().toISOString();
815
+
816
+
await eventProcessor.processCommit({
817
+
repo: followedDid,
818
+
ops: [
819
+
{
820
+
action: 'create',
821
+
path: `app.bsky.feed.post/${record.uri.split('/').pop()}`,
822
+
cid: record.cid,
823
+
record: record.value,
824
+
},
825
+
],
826
+
time: createdAt,
827
+
rev: '',
828
+
} as any);
829
+
830
+
postsFetched++;
831
+
} catch (error: any) {
832
+
// Silently skip duplicates
833
+
if (error?.code !== '23505') {
834
+
console.error(
835
+
`[AUTO_BACKFILL_FOLLOWS] Error processing post from ${followedDid}:`,
836
+
error.message
837
+
);
838
+
}
839
+
}
840
+
}
841
+
842
+
cursor = response.data.cursor;
843
+
} catch (error: any) {
844
+
console.error(
845
+
`[AUTO_BACKFILL_FOLLOWS] Error listing posts for ${followedDid}:`,
846
+
error.message
847
+
);
848
+
break;
849
+
}
850
+
} while (cursor);
851
+
852
+
console.log(
853
+
`[AUTO_BACKFILL_FOLLOWS] Fetched ${postsFetched} posts from newly followed user ${followedDid}`
854
+
);
855
+
} catch (error) {
856
+
console.error(
857
+
`[AUTO_BACKFILL_FOLLOWS] Error backfilling new follow posts:`,
602
858
error
603
859
);
604
860
}
+64
server/services/event-processor.ts
+64
server/services/event-processor.ts
···
1719
1719
// Invalidate following list cache for the follower
1720
1720
await cacheService.invalidateUserFollowing(followerDid);
1721
1721
1722
+
// Trigger backfill of the followed user's posts (async, non-blocking)
1723
+
// Only backfill for users who have logged in (have sessions)
1724
+
this.triggerNewFollowBackfill(followerDid, followingDid);
1725
+
1722
1726
// Try to create notification if target user exists locally
1723
1727
const followingUser = await this.storage.getUser(followingDid);
1724
1728
if (followingUser) {
···
2009
2013
.catch((error) => {
2010
2014
smartConsole.error(
2011
2015
`[EVENT_PROCESSOR] Failed to import feed generator discovery:`,
2016
+
error
2017
+
);
2018
+
});
2019
+
}
2020
+
2021
+
/**
2022
+
* Trigger backfill of posts from a newly followed user
2023
+
* Only backfills if the follower has ever logged in (has sessions)
2024
+
*/
2025
+
private triggerNewFollowBackfill(
2026
+
followerDid: string,
2027
+
followingDid: string
2028
+
): void {
2029
+
// Check if follower has ever logged in (async, non-blocking)
2030
+
import('../../shared/schema')
2031
+
.then(({ sessions }) => {
2032
+
import('../db')
2033
+
.then(({ db }) => {
2034
+
db.query.sessions
2035
+
.findFirst({
2036
+
where: (s: any, { eq }: any) => eq(s.userDid, followerDid),
2037
+
})
2038
+
.then((session: any) => {
2039
+
if (session) {
2040
+
// User has logged in, backfill the followed user's posts
2041
+
smartConsole.log(
2042
+
`[EVENT_PROCESSOR] Triggering post backfill for new follow: ${followerDid} -> ${followingDid}`
2043
+
);
2044
+
2045
+
import('./auto-backfill-follows')
2046
+
.then(({ autoBackfillFollowsService }) => {
2047
+
autoBackfillFollowsService.backfillNewFollowPosts(
2048
+
followingDid
2049
+
);
2050
+
})
2051
+
.catch((error) => {
2052
+
smartConsole.error(
2053
+
`[EVENT_PROCESSOR] Failed to trigger follow backfill:`,
2054
+
error
2055
+
);
2056
+
});
2057
+
}
2058
+
})
2059
+
.catch((error: any) => {
2060
+
smartConsole.error(
2061
+
`[EVENT_PROCESSOR] Error checking session for follow backfill:`,
2062
+
error
2063
+
);
2064
+
});
2065
+
})
2066
+
.catch((error) => {
2067
+
smartConsole.error(
2068
+
`[EVENT_PROCESSOR] Failed to import db for follow backfill:`,
2069
+
error
2070
+
);
2071
+
});
2072
+
})
2073
+
.catch((error) => {
2074
+
smartConsole.error(
2075
+
`[EVENT_PROCESSOR] Failed to import schema for follow backfill:`,
2012
2076
error
2013
2077
);
2014
2078
});