Sifa professional network API (Fastify, AT Protocol, Jetstream) sifa.id/

fix(suggestions): use PDS-native listRecords for follow import (#112)

The Bluesky follow import was silently failing because it used
agent.getFollows() (an AppView endpoint) through the authenticated
PDS agent. Switch to com.atproto.repo.listRecords which reads
app.bsky.graph.follow records directly from the user's PDS —
works with any PDS, no Bluesky AppView dependency.

Also adds POST /api/suggestions/sync endpoint so existing users
can trigger a re-import without logging out and back in.

authored by

Guido X Jansen and committed by
GitHub
29ee4eae afc8c2ec

+198 -102
+7 -38
src/oauth/routes.ts
··· 6 6 import { Agent } from '@atproto/api'; 7 7 import type { Database } from '../db/index.js'; 8 8 import { sessions, profiles, oauthSessions } from '../db/schema/index.js'; 9 - import { importBlueskyFollows } from '../services/bluesky-follows.js'; 10 - import { importTangledFollows } from '../services/tangled-follows.js'; 9 + import { fetchBlueskyFollowsFromPds, importBlueskyFollows } from '../services/bluesky-follows.js'; 10 + import { fetchTangledFollowsFromPds, importTangledFollows } from '../services/tangled-follows.js'; 11 11 12 12 const loginSchema = z.object({ 13 13 handle: z.string().min(1).max(253), ··· 175 175 } 176 176 177 177 // Import follows from Bluesky and Tangled (fire-and-forget, non-blocking) 178 + // Uses PDS-native listRecords — works with any PDS, no AppView dependency. 178 179 void (async () => { 179 180 try { 180 181 const agent = new Agent(session); 181 182 182 - // Import Bluesky follows 183 - const bskyFollows: Array<{ did: string; handle: string; createdAt: string }> = []; 184 - let cursor: string | undefined; 185 - do { 186 - const res = await agent.getFollows({ actor: did, limit: 100, cursor }); 187 - for (const f of res.data.follows) { 188 - bskyFollows.push({ 189 - did: f.did, 190 - handle: f.handle, 191 - createdAt: f.indexedAt ?? new Date().toISOString(), 192 - }); 193 - } 194 - cursor = res.data.cursor; 195 - } while (cursor); 183 + // Import Bluesky follows from PDS 184 + const bskyFollows = await fetchBlueskyFollowsFromPds(agent, did); 196 185 await importBlueskyFollows(db, did, bskyFollows); 197 186 app.log.info({ did, count: bskyFollows.length }, 'Imported Bluesky follows'); 198 187 199 - // Import Tangled follows (read from user's PDS repo) 188 + // Import Tangled follows from PDS 200 189 try { 201 - const tangledFollows: Array<{ did: string; handle: string; createdAt: string }> = []; 202 - let tangledCursor: string | undefined; 203 - do { 204 - const res = await agent.com.atproto.repo.listRecords({ 205 - repo: did, 206 - collection: 'sh.tangled.graph.follow', 207 - limit: 100, 208 - cursor: tangledCursor, 209 - }); 210 - for (const record of res.data.records) { 211 - const val = record.value as { subject?: string; createdAt?: string }; 212 - if (val.subject) { 213 - tangledFollows.push({ 214 - did: val.subject, 215 - handle: '', // We don't have the handle from PDS records 216 - createdAt: val.createdAt ?? new Date().toISOString(), 217 - }); 218 - } 219 - } 220 - tangledCursor = res.data.cursor; 221 - } while (tangledCursor); 190 + const tangledFollows = await fetchTangledFollowsFromPds(agent, did); 222 191 await importTangledFollows(db, did, tangledFollows); 223 192 app.log.info({ did, count: tangledFollows.length }, 'Imported Tangled follows'); 224 193 } catch (err) {
+41
src/routes/suggestions.ts
··· 1 1 import type { FastifyInstance } from 'fastify'; 2 2 import { z } from 'zod'; 3 + import { Agent } from '@atproto/api'; 3 4 import type { NodeOAuthClient } from '@atproto/oauth-client-node'; 4 5 import type { Database } from '../db/index.js'; 5 6 import { connections, profiles, suggestionDismissals, invites } from '../db/schema/index.js'; 6 7 import { and, eq, ne, notInArray, sql, count, gt } from 'drizzle-orm'; 7 8 import { createAuthMiddleware, getAuthContext } from '../middleware/auth.js'; 9 + import { fetchBlueskyFollowsFromPds, importBlueskyFollows } from '../services/bluesky-follows.js'; 10 + import { fetchTangledFollowsFromPds, importTangledFollows } from '../services/tangled-follows.js'; 8 11 9 12 const dismissSchema = z.object({ 10 13 subjectDid: z.string().startsWith('did:'), ··· 211 214 ); 212 215 213 216 return reply.status(200).send({ status: 'ok' }); 217 + }, 218 + ); 219 + 220 + // POST /api/suggestions/sync -- re-import follows from PDS 221 + app.post( 222 + '/api/suggestions/sync', 223 + { preHandler: requireAuth, config: { rateLimit: { max: 3, timeWindow: '1 minute' } } }, 224 + async (request, reply) => { 225 + const { did, session } = getAuthContext(request); 226 + const agent = new Agent(session); 227 + 228 + let blueskyCount = 0; 229 + let tangledCount = 0; 230 + 231 + // Import Bluesky follows 232 + try { 233 + const bskyFollows = await fetchBlueskyFollowsFromPds(agent, did); 234 + await importBlueskyFollows(db, did, bskyFollows); 235 + blueskyCount = bskyFollows.length; 236 + app.log.info({ did, count: blueskyCount }, 'Synced Bluesky follows'); 237 + } catch (err) { 238 + app.log.error({ err, did }, 'Bluesky follow sync failed'); 239 + } 240 + 241 + // Import Tangled follows 242 + try { 243 + const tangledFollows = await fetchTangledFollowsFromPds(agent, did); 244 + await importTangledFollows(db, did, tangledFollows); 245 + tangledCount = tangledFollows.length; 246 + app.log.info({ did, count: tangledCount }, 'Synced Tangled follows'); 247 + } catch (err) { 248 + app.log.debug({ err, did }, 'Tangled follow sync skipped or failed'); 249 + } 250 + 251 + return reply.send({ 252 + status: 'ok', 253 + imported: { bluesky: blueskyCount, tangled: tangledCount }, 254 + }); 214 255 }, 215 256 ); 216 257
+37 -12
src/services/bluesky-follows.ts
··· 1 + import type { Agent } from '@atproto/api'; 1 2 import type { Database } from '../db/index.js'; 2 3 import { connections } from '../db/schema/index.js'; 3 4 4 - export function mapBskyFollowToConnection( 5 - followerDid: string, 6 - follow: { did: string; handle: string; createdAt: string }, 7 - ) { 8 - return { 9 - followerDid, 10 - subjectDid: follow.did, 11 - source: 'bluesky' as const, 12 - createdAt: new Date(follow.createdAt), 13 - }; 5 + /** 6 + * Fetches Bluesky follows from the user's PDS using listRecords 7 + * (AT Protocol-native, works with any PDS — no Bluesky AppView dependency). 8 + */ 9 + export async function fetchBlueskyFollowsFromPds( 10 + agent: Agent, 11 + did: string, 12 + ): Promise<Array<{ did: string; createdAt: string }>> { 13 + const follows: Array<{ did: string; createdAt: string }> = []; 14 + let cursor: string | undefined; 15 + do { 16 + const res = await agent.com.atproto.repo.listRecords({ 17 + repo: did, 18 + collection: 'app.bsky.graph.follow', 19 + limit: 100, 20 + cursor, 21 + }); 22 + for (const record of res.data.records) { 23 + const val = record.value as { subject?: string; createdAt?: string }; 24 + if (val.subject) { 25 + follows.push({ 26 + did: val.subject, 27 + createdAt: val.createdAt ?? new Date().toISOString(), 28 + }); 29 + } 30 + } 31 + cursor = res.data.cursor; 32 + } while (cursor); 33 + return follows; 14 34 } 15 35 16 36 export async function importBlueskyFollows( 17 37 db: Database, 18 38 followerDid: string, 19 - follows: Array<{ did: string; handle: string; createdAt: string }>, 39 + follows: Array<{ did: string; createdAt: string }>, 20 40 ) { 21 - const rows = follows.map((f) => mapBskyFollowToConnection(followerDid, f)); 41 + const rows = follows.map((f) => ({ 42 + followerDid, 43 + subjectDid: f.did, 44 + source: 'bluesky' as const, 45 + createdAt: new Date(f.createdAt), 46 + })); 22 47 if (rows.length === 0) return; 23 48 24 49 await db.insert(connections).values(rows).onConflictDoNothing();
+36 -12
src/services/tangled-follows.ts
··· 1 + import type { Agent } from '@atproto/api'; 1 2 import type { Database } from '../db/index.js'; 2 3 import { connections } from '../db/schema/index.js'; 3 4 4 - export function mapTangledFollowToConnection( 5 - followerDid: string, 6 - follow: { did: string; handle: string; createdAt: string }, 7 - ) { 8 - return { 9 - followerDid, 10 - subjectDid: follow.did, 11 - source: 'tangled' as const, 12 - createdAt: new Date(follow.createdAt), 13 - }; 5 + /** 6 + * Fetches Tangled follows from the user's PDS using listRecords. 7 + */ 8 + export async function fetchTangledFollowsFromPds( 9 + agent: Agent, 10 + did: string, 11 + ): Promise<Array<{ did: string; createdAt: string }>> { 12 + const follows: Array<{ did: string; createdAt: string }> = []; 13 + let cursor: string | undefined; 14 + do { 15 + const res = await agent.com.atproto.repo.listRecords({ 16 + repo: did, 17 + collection: 'sh.tangled.graph.follow', 18 + limit: 100, 19 + cursor, 20 + }); 21 + for (const record of res.data.records) { 22 + const val = record.value as { subject?: string; createdAt?: string }; 23 + if (val.subject) { 24 + follows.push({ 25 + did: val.subject, 26 + createdAt: val.createdAt ?? new Date().toISOString(), 27 + }); 28 + } 29 + } 30 + cursor = res.data.cursor; 31 + } while (cursor); 32 + return follows; 14 33 } 15 34 16 35 export async function importTangledFollows( 17 36 db: Database, 18 37 followerDid: string, 19 - follows: Array<{ did: string; handle: string; createdAt: string }>, 38 + follows: Array<{ did: string; createdAt: string }>, 20 39 ) { 21 - const rows = follows.map((f) => mapTangledFollowToConnection(followerDid, f)); 40 + const rows = follows.map((f) => ({ 41 + followerDid, 42 + subjectDid: f.did, 43 + source: 'tangled' as const, 44 + createdAt: new Date(f.createdAt), 45 + })); 22 46 if (rows.length === 0) return; 23 47 24 48 await db.insert(connections).values(rows).onConflictDoNothing();
+42 -20
tests/services/bluesky-follows.test.ts
··· 1 - import { describe, it, expect } from 'vitest'; 2 - import { mapBskyFollowToConnection } from '../../src/services/bluesky-follows.js'; 1 + import { describe, it, expect, vi } from 'vitest'; 3 2 4 - describe('Bluesky follow import', () => { 5 - it('maps Bluesky follow to connection row', () => { 6 - const conn = mapBskyFollowToConnection('did:plc:follower', { 7 - did: 'did:plc:subject', 8 - handle: 'alice.bsky.social', 9 - createdAt: '2026-01-01T00:00:00Z', 10 - }); 11 - expect(conn.source).toBe('bluesky'); 12 - expect(conn.followerDid).toBe('did:plc:follower'); 13 - expect(conn.subjectDid).toBe('did:plc:subject'); 3 + // Mock drizzle to capture insert values 4 + const insertedValues: unknown[] = []; 5 + vi.mock('../../src/db/schema/index.js', () => ({ 6 + connections: 'connections_table', 7 + })); 8 + 9 + vi.mock('drizzle-orm/pg-core', async (importOriginal) => { 10 + const actual = await importOriginal<typeof import('drizzle-orm/pg-core')>(); 11 + return actual; 12 + }); 13 + 14 + const mockDb = { 15 + insert: vi.fn().mockReturnValue({ 16 + values: vi.fn().mockImplementation((rows: unknown[]) => { 17 + insertedValues.push(...rows); 18 + return { onConflictDoNothing: vi.fn() }; 19 + }), 20 + }), 21 + }; 22 + 23 + // Must import after mocks 24 + const { importBlueskyFollows } = await import('../../src/services/bluesky-follows.js'); 25 + 26 + describe('importBlueskyFollows', () => { 27 + it('maps follows to connection rows with source bluesky', async () => { 28 + insertedValues.length = 0; 29 + await importBlueskyFollows(mockDb as never, 'did:plc:follower', [ 30 + { did: 'did:plc:subject', createdAt: '2026-01-01T00:00:00Z' }, 31 + ]); 32 + 33 + expect(insertedValues).toHaveLength(1); 34 + const row = insertedValues[0] as Record<string, unknown>; 35 + expect(row.source).toBe('bluesky'); 36 + expect(row.followerDid).toBe('did:plc:follower'); 37 + expect(row.subjectDid).toBe('did:plc:subject'); 38 + expect(row.createdAt).toBeInstanceOf(Date); 39 + expect((row.createdAt as Date).toISOString()).toBe('2026-01-01T00:00:00.000Z'); 14 40 }); 15 41 16 - it('sets createdAt as Date from ISO string', () => { 17 - const conn = mapBskyFollowToConnection('did:plc:follower', { 18 - did: 'did:plc:subject', 19 - handle: 'alice.bsky.social', 20 - createdAt: '2026-01-01T00:00:00Z', 21 - }); 22 - expect(conn.createdAt).toBeInstanceOf(Date); 23 - expect(conn.createdAt.toISOString()).toBe('2026-01-01T00:00:00.000Z'); 42 + it('skips insert when follows array is empty', async () => { 43 + mockDb.insert.mockClear(); 44 + await importBlueskyFollows(mockDb as never, 'did:plc:follower', []); 45 + expect(mockDb.insert).not.toHaveBeenCalled(); 24 46 }); 25 47 });
+35 -20
tests/services/tangled-follows.test.ts
··· 1 - import { describe, it, expect } from 'vitest'; 2 - import { mapTangledFollowToConnection } from '../../src/services/tangled-follows.js'; 1 + import { describe, it, expect, vi } from 'vitest'; 3 2 4 - describe('Tangled follow import', () => { 5 - it('maps Tangled follow to connection row with source tangled', () => { 6 - const conn = mapTangledFollowToConnection('did:plc:follower', { 7 - did: 'did:plc:subject', 8 - handle: 'alice.tangled.sh', 9 - createdAt: '2026-01-15T12:00:00Z', 10 - }); 11 - expect(conn.source).toBe('tangled'); 12 - expect(conn.followerDid).toBe('did:plc:follower'); 13 - expect(conn.subjectDid).toBe('did:plc:subject'); 3 + const insertedValues: unknown[] = []; 4 + vi.mock('../../src/db/schema/index.js', () => ({ 5 + connections: 'connections_table', 6 + })); 7 + 8 + const mockDb = { 9 + insert: vi.fn().mockReturnValue({ 10 + values: vi.fn().mockImplementation((rows: unknown[]) => { 11 + insertedValues.push(...rows); 12 + return { onConflictDoNothing: vi.fn() }; 13 + }), 14 + }), 15 + }; 16 + 17 + const { importTangledFollows } = await import('../../src/services/tangled-follows.js'); 18 + 19 + describe('importTangledFollows', () => { 20 + it('maps follows to connection rows with source tangled', async () => { 21 + insertedValues.length = 0; 22 + await importTangledFollows(mockDb as never, 'did:plc:follower', [ 23 + { did: 'did:plc:subject', createdAt: '2026-01-15T12:00:00Z' }, 24 + ]); 25 + 26 + expect(insertedValues).toHaveLength(1); 27 + const row = insertedValues[0] as Record<string, unknown>; 28 + expect(row.source).toBe('tangled'); 29 + expect(row.followerDid).toBe('did:plc:follower'); 30 + expect(row.subjectDid).toBe('did:plc:subject'); 31 + expect(row.createdAt).toBeInstanceOf(Date); 32 + expect((row.createdAt as Date).toISOString()).toBe('2026-01-15T12:00:00.000Z'); 14 33 }); 15 34 16 - it('sets createdAt as Date from ISO string', () => { 17 - const conn = mapTangledFollowToConnection('did:plc:follower', { 18 - did: 'did:plc:subject', 19 - handle: 'bob.tangled.sh', 20 - createdAt: '2026-02-01T00:00:00Z', 21 - }); 22 - expect(conn.createdAt).toBeInstanceOf(Date); 23 - expect(conn.createdAt.toISOString()).toBe('2026-02-01T00:00:00.000Z'); 35 + it('skips insert when follows array is empty', async () => { 36 + mockDb.insert.mockClear(); 37 + await importTangledFollows(mockDb as never, 'did:plc:follower', []); 38 + expect(mockDb.insert).not.toHaveBeenCalled(); 24 39 }); 25 40 });