Sifa professional network API (Fastify, AT Protocol, Jetstream) sifa.id/

Merge pull request #123 from singi-labs/feat/cross-app-activity

feat: cross-app activity API

authored by

Guido X Jansen and committed by
GitHub
483a71df 316bae87

+1694
+20
drizzle/0019_user_app_stats.sql
··· 1 + CREATE TABLE IF NOT EXISTS "user_app_stats" ( 2 + "did" text NOT NULL REFERENCES "profiles"("did") ON DELETE CASCADE, 3 + "app_id" text NOT NULL, 4 + "is_active" boolean NOT NULL DEFAULT false, 5 + "recent_count" integer NOT NULL DEFAULT 0, 6 + "latest_record_at" timestamptz, 7 + "refreshed_at" timestamptz NOT NULL DEFAULT now(), 8 + "visible" boolean NOT NULL DEFAULT true, 9 + "created_at" timestamptz NOT NULL DEFAULT now(), 10 + PRIMARY KEY ("did", "app_id") 11 + ); 12 + 13 + CREATE INDEX IF NOT EXISTS "idx_user_app_stats_app_count" ON "user_app_stats" ("app_id", "recent_count"); 14 + CREATE INDEX IF NOT EXISTS "idx_user_app_stats_refreshed" ON "user_app_stats" ("refreshed_at"); 15 + CREATE INDEX IF NOT EXISTS "idx_user_app_stats_visible" ON "user_app_stats" ("did", "visible", "recent_count"); 16 + 17 + CREATE TABLE IF NOT EXISTS "suppressed_dids" ( 18 + "did" text PRIMARY KEY, 19 + "requested_at" timestamptz NOT NULL DEFAULT now() 20 + );
+7
drizzle/meta/_journal.json
··· 134 134 "when": 1774317600000, 135 135 "tag": "0018_avatar_displayname_overrides", 136 136 "breakpoints": true 137 + }, 138 + { 139 + "idx": 19, 140 + "version": "7", 141 + "when": 1774404000000, 142 + "tag": "0019_user_app_stats", 143 + "breakpoints": true 137 144 } 138 145 ] 139 146 }
+2
src/db/schema/index.ts
··· 21 21 export { skillPositionLinks } from './skill-position-links.js'; 22 22 export { linkedinImports } from './linkedin-imports.js'; 23 23 export { featuredProfiles } from './featured-profiles.js'; 24 + export { userAppStats } from './user-app-stats.js'; 25 + export { suppressedDids } from './suppressed-dids.js';
+6
src/db/schema/suppressed-dids.ts
··· 1 + import { pgTable, text, timestamp } from 'drizzle-orm/pg-core'; 2 + 3 + export const suppressedDids = pgTable('suppressed_dids', { 4 + did: text('did').primaryKey(), 5 + requestedAt: timestamp('requested_at', { withTimezone: true }).notNull().defaultNow(), 6 + });
+24
src/db/schema/user-app-stats.ts
··· 1 + import { pgTable, text, timestamp, boolean, integer, primaryKey, index } from 'drizzle-orm/pg-core'; 2 + import { profiles } from './profiles.js'; 3 + 4 + export const userAppStats = pgTable( 5 + 'user_app_stats', 6 + { 7 + did: text('did') 8 + .notNull() 9 + .references(() => profiles.did, { onDelete: 'cascade' }), 10 + appId: text('app_id').notNull(), 11 + isActive: boolean('is_active').notNull().default(false), 12 + recentCount: integer('recent_count').notNull().default(0), 13 + latestRecordAt: timestamp('latest_record_at', { withTimezone: true }), 14 + refreshedAt: timestamp('refreshed_at', { withTimezone: true }).notNull().defaultNow(), 15 + visible: boolean('visible').notNull().default(true), 16 + createdAt: timestamp('created_at', { withTimezone: true }).notNull().defaultNow(), 17 + }, 18 + (table) => [ 19 + primaryKey({ columns: [table.did, table.appId] }), 20 + index('idx_user_app_stats_app_count').on(table.appId, table.recentCount), 21 + index('idx_user_app_stats_refreshed').on(table.refreshedAt), 22 + index('idx_user_app_stats_visible').on(table.did, table.visible, table.recentCount), 23 + ], 24 + );
+125
src/lib/atproto-app-registry.ts
··· 1 + export interface AppRegistryEntry { 2 + id: string; 3 + name: string; 4 + category: string; 5 + collectionPrefixes: string[]; 6 + scanCollections: string[]; 7 + urlPattern?: string; 8 + color: string; 9 + } 10 + 11 + const APP_REGISTRY: AppRegistryEntry[] = [ 12 + { 13 + id: 'bluesky', 14 + name: 'Bluesky', 15 + category: 'Posts', 16 + collectionPrefixes: ['app.bsky.feed'], 17 + scanCollections: ['app.bsky.feed.post'], 18 + urlPattern: 'https://bsky.app/profile/{handle}/post/{rkey}', 19 + color: 'sky', 20 + }, 21 + { 22 + id: 'tangled', 23 + name: 'Tangled', 24 + category: 'Code', 25 + collectionPrefixes: ['sh.tangled'], 26 + scanCollections: [], 27 + color: 'emerald', 28 + }, 29 + { 30 + id: 'smokesignal', 31 + name: 'Smoke Signal', 32 + category: 'Events', 33 + collectionPrefixes: ['events.smokesignal'], 34 + scanCollections: [], 35 + color: 'orange', 36 + }, 37 + { 38 + id: 'flashes', 39 + name: 'Flashes', 40 + category: 'Photos', 41 + collectionPrefixes: ['blue.flashes'], 42 + scanCollections: [], 43 + color: 'pink', 44 + }, 45 + { 46 + id: 'whitewind', 47 + name: 'Whitewind', 48 + category: 'Articles', 49 + collectionPrefixes: ['com.whtwnd'], 50 + scanCollections: ['com.whtwnd.blog.entry'], 51 + urlPattern: 'https://whtwnd.com/{handle}/{rkey}', 52 + color: 'slate', 53 + }, 54 + { 55 + id: 'frontpage', 56 + name: 'Frontpage', 57 + category: 'Links', 58 + collectionPrefixes: ['com.frontpage'], 59 + scanCollections: [], 60 + color: 'violet', 61 + }, 62 + { 63 + id: 'picosky', 64 + name: 'Picosky', 65 + category: 'Pages', 66 + collectionPrefixes: ['blue.picosky'], 67 + scanCollections: [], 68 + color: 'pink', 69 + }, 70 + { 71 + id: 'linkat', 72 + name: 'Linkat', 73 + category: 'Links', 74 + collectionPrefixes: ['blue.linkat'], 75 + scanCollections: [], 76 + color: 'emerald', 77 + }, 78 + { 79 + id: 'pastesphere', 80 + name: 'PasteSphere', 81 + category: 'Pastes', 82 + collectionPrefixes: ['com.pastesphere'], 83 + scanCollections: [], 84 + color: 'amber', 85 + }, 86 + ]; 87 + 88 + export const EXCLUDED_COLLECTIONS: string[] = [ 89 + 'app.bsky.feed.like', 90 + 'app.bsky.feed.repost', 91 + 'app.bsky.graph.follow', 92 + 'app.bsky.graph.block', 93 + 'app.bsky.graph.mute', 94 + 'app.bsky.graph.listitem', 95 + ]; 96 + 97 + export function getAppsRegistry(): AppRegistryEntry[] { 98 + return APP_REGISTRY; 99 + } 100 + 101 + export function getAppForCollection( 102 + collection: string, 103 + ): (AppRegistryEntry & { matchedPrefix: string }) | undefined { 104 + // Exact match on scanCollections first 105 + for (const entry of APP_REGISTRY) { 106 + if (entry.scanCollections.includes(collection)) { 107 + const matchedPrefix: string = 108 + entry.collectionPrefixes.find((p) => collection.startsWith(p)) ?? 109 + entry.collectionPrefixes[0] ?? 110 + collection; 111 + return { ...entry, matchedPrefix }; 112 + } 113 + } 114 + 115 + // Prefix match on collectionPrefixes 116 + for (const entry of APP_REGISTRY) { 117 + for (const prefix of entry.collectionPrefixes) { 118 + if (collection.startsWith(prefix)) { 119 + return { ...entry, matchedPrefix: prefix }; 120 + } 121 + } 122 + } 123 + 124 + return undefined; 125 + }
+491
src/routes/activity.ts
··· 1 + import type { FastifyInstance } from 'fastify'; 2 + import type { NodeOAuthClient } from '@atproto/oauth-client-node'; 3 + import { Agent } from '@atproto/api'; 4 + import { z } from 'zod'; 5 + import { eq, and } from 'drizzle-orm'; 6 + import type { Database } from '../db/index.js'; 7 + import type { ValkeyClient } from '../cache/index.js'; 8 + import { userAppStats } from '../db/schema/user-app-stats.js'; 9 + import { profiles } from '../db/schema/index.js'; 10 + import { createAuthMiddleware, getAuthContext } from '../middleware/auth.js'; 11 + import { getVisibleAppStats, isDidSuppressed, suppressDid } from '../services/app-stats.js'; 12 + import { getAppsRegistry, type AppRegistryEntry } from '../lib/atproto-app-registry.js'; 13 + import { resolvePdsHost } from '../lib/pds-provider.js'; 14 + 15 + const publicBskyAgent = new Agent('https://public.api.bsky.app'); 16 + 17 + const visibilitySchema = z.object({ 18 + appId: z.string().min(1).max(200), 19 + visible: z.boolean(), 20 + }); 21 + 22 + const suppressSchema = z.object({ 23 + handleOrDid: z.string().min(1).max(500), 24 + }); 25 + 26 + interface ActivityItem { 27 + uri: string; 28 + collection: string; 29 + rkey: string; 30 + record: unknown; 31 + appId: string; 32 + appName: string; 33 + category: string; 34 + indexedAt: string; 35 + } 36 + 37 + interface CompositeCursor { 38 + cursors: Record<string, string>; 39 + } 40 + 41 + /** 42 + * Resolve a handle-or-DID string to a DID. 43 + * Checks the local profiles table first, then falls back to Bluesky public API. 44 + */ 45 + async function resolveHandleOrDid(db: Database, handleOrDid: string): Promise<string | null> { 46 + const isDid = handleOrDid.startsWith('did:'); 47 + const normalized = isDid ? handleOrDid : handleOrDid.toLowerCase(); 48 + const condition = isDid ? eq(profiles.did, normalized) : eq(profiles.handle, normalized); 49 + 50 + const [profile] = await db.select().from(profiles).where(condition).limit(1); 51 + if (profile) return profile.did; 52 + 53 + // Fall back to Bluesky public API resolution 54 + try { 55 + const res = await publicBskyAgent.resolveHandle( 56 + { handle: normalized }, 57 + { signal: AbortSignal.timeout(3000) }, 58 + ); 59 + return res.data.did; 60 + } catch { 61 + return isDid ? normalized : null; 62 + } 63 + } 64 + 65 + /** 66 + * Determine which collection to fetch for a given registry entry. 67 + */ 68 + function getCollectionForApp(entry: AppRegistryEntry): string { 69 + if (entry.scanCollections.length > 0) { 70 + return entry.scanCollections[0] ?? entry.collectionPrefixes[0] ?? ''; 71 + } 72 + return entry.collectionPrefixes[0] ?? ''; 73 + } 74 + 75 + /** 76 + * Extract createdAt from a record, falling back to current time. 77 + */ 78 + function extractCreatedAt(record: unknown): string { 79 + if ( 80 + record !== null && 81 + typeof record === 'object' && 82 + 'createdAt' in record && 83 + typeof (record as Record<string, unknown>).createdAt === 'string' 84 + ) { 85 + return (record as Record<string, unknown>).createdAt as string; 86 + } 87 + return new Date().toISOString(); 88 + } 89 + 90 + /** 91 + * Extract rkey from an AT URI (at://did/collection/rkey). 92 + */ 93 + function extractRkey(uri: string): string { 94 + const parts = uri.split('/'); 95 + return parts[parts.length - 1] ?? ''; 96 + } 97 + 98 + /** 99 + * Fetch recent items from Bluesky via the public AppView API. 100 + */ 101 + async function fetchBlueskyItems(did: string, limit: number): Promise<ActivityItem[]> { 102 + const res = await publicBskyAgent.app.bsky.feed.getAuthorFeed( 103 + { actor: did, limit }, 104 + { signal: AbortSignal.timeout(3000) }, 105 + ); 106 + 107 + return res.data.feed 108 + .filter((item) => item.post.author.did === did) // exclude reposts from others 109 + .map((item) => ({ 110 + uri: item.post.uri, 111 + collection: 'app.bsky.feed.post', 112 + rkey: extractRkey(item.post.uri), 113 + record: item.post.record, 114 + appId: 'bluesky', 115 + appName: 'Bluesky', 116 + category: 'Posts', 117 + indexedAt: item.post.indexedAt, 118 + })); 119 + } 120 + 121 + /** 122 + * Fetch recent items from a PDS via listRecords. 123 + */ 124 + async function fetchPdsItems( 125 + pdsHost: string, 126 + did: string, 127 + collection: string, 128 + entry: AppRegistryEntry, 129 + limit: number, 130 + cursor?: string, 131 + ): Promise<{ items: ActivityItem[]; cursor: string | undefined }> { 132 + const agent = new Agent(`https://${pdsHost}`); 133 + const params: Record<string, unknown> = { 134 + repo: did, 135 + collection, 136 + limit, 137 + }; 138 + if (cursor) { 139 + params.cursor = cursor; 140 + } 141 + 142 + const res = await agent.com.atproto.repo.listRecords( 143 + params as { repo: string; collection: string; limit: number; cursor?: string }, 144 + { signal: AbortSignal.timeout(3000) }, 145 + ); 146 + 147 + const items: ActivityItem[] = res.data.records.map((rec) => ({ 148 + uri: rec.uri, 149 + collection, 150 + rkey: extractRkey(rec.uri), 151 + record: rec.value, 152 + appId: entry.id, 153 + appName: entry.name, 154 + category: entry.category, 155 + indexedAt: extractCreatedAt(rec.value), 156 + })); 157 + 158 + return { items, cursor: res.data.cursor }; 159 + } 160 + 161 + /** 162 + * Fetch Bluesky feed with cursor support for paginated endpoint. 163 + */ 164 + async function fetchBlueskyFeedPaginated( 165 + did: string, 166 + limit: number, 167 + cursor?: string, 168 + ): Promise<{ items: ActivityItem[]; cursor: string | undefined }> { 169 + const params: { actor: string; limit: number; cursor?: string } = { actor: did, limit }; 170 + if (cursor) { 171 + params.cursor = cursor; 172 + } 173 + 174 + const res = await publicBskyAgent.app.bsky.feed.getAuthorFeed(params, { 175 + signal: AbortSignal.timeout(3000), 176 + }); 177 + 178 + const items: ActivityItem[] = res.data.feed 179 + .filter((item) => item.post.author.did === did) 180 + .map((item) => ({ 181 + uri: item.post.uri, 182 + collection: 'app.bsky.feed.post', 183 + rkey: extractRkey(item.post.uri), 184 + record: item.post.record, 185 + appId: 'bluesky', 186 + appName: 'Bluesky', 187 + category: 'Posts', 188 + indexedAt: item.post.indexedAt, 189 + })); 190 + 191 + return { items, cursor: res.data.cursor }; 192 + } 193 + 194 + export function registerActivityRoutes( 195 + app: FastifyInstance, 196 + db: Database, 197 + oauthClient: NodeOAuthClient | null, 198 + valkey: ValkeyClient | null = null, 199 + ) { 200 + const requireAuth = createAuthMiddleware(oauthClient, db); 201 + 202 + // PUT /api/profile/activity-visibility -- toggle visibility of a specific app's activity 203 + app.put( 204 + '/api/profile/activity-visibility', 205 + { preHandler: requireAuth }, 206 + async (request, reply) => { 207 + const parsed = visibilitySchema.safeParse(request.body); 208 + if (!parsed.success) { 209 + return reply.status(400).send({ error: 'ValidationError', issues: parsed.error.issues }); 210 + } 211 + 212 + const { did } = getAuthContext(request); 213 + const { appId, visible } = parsed.data; 214 + 215 + const result = await db 216 + .update(userAppStats) 217 + .set({ visible }) 218 + .where(and(eq(userAppStats.did, did), eq(userAppStats.appId, appId))); 219 + 220 + if (result.rowCount === 0) { 221 + return reply.status(404).send({ 222 + error: 'NotFound', 223 + message: `No activity stats found for app: ${appId}`, 224 + }); 225 + } 226 + 227 + return reply.status(200).send({ ok: true }); 228 + }, 229 + ); 230 + 231 + // GET /api/activity/:handleOrDid/teaser -- 3-5 recent items for profile activity card 232 + app.get<{ Params: { handleOrDid: string } }>( 233 + '/api/activity/:handleOrDid/teaser', 234 + async (request, reply) => { 235 + const { handleOrDid } = request.params; 236 + 237 + const did = await resolveHandleOrDid(db, handleOrDid); 238 + if (!did) { 239 + return reply.status(404).send({ error: 'NotFound', message: 'User not found' }); 240 + } 241 + 242 + const suppressed = await isDidSuppressed(db, did); 243 + if (suppressed) { 244 + return reply.send({ items: [] }); 245 + } 246 + 247 + const stats = await getVisibleAppStats(db, did); 248 + const topApps = stats.slice(0, 3); 249 + 250 + if (topApps.length === 0) { 251 + return reply.send({ items: [] }); 252 + } 253 + 254 + // Check Valkey cache 255 + const cacheKey = `activity-teaser:${did}`; 256 + if (valkey) { 257 + try { 258 + const cached = await valkey.get(cacheKey); 259 + if (cached !== null) { 260 + return reply.send(JSON.parse(cached)); 261 + } 262 + } catch (err) { 263 + request.log.warn({ err, cacheKey }, 'valkey.get failed for activity teaser'); 264 + } 265 + } 266 + 267 + const pdsHost = await resolvePdsHost(did); 268 + const registry = getAppsRegistry(); 269 + 270 + // Build fetch promises for each top app 271 + const fetchPromises = topApps.map((stat) => { 272 + const entry = registry.find((e) => e.id === stat.appId); 273 + if (!entry) return Promise.resolve([]); 274 + 275 + if (entry.id === 'bluesky') { 276 + return fetchBlueskyItems(did, 2); 277 + } 278 + 279 + if (!pdsHost) return Promise.resolve([]); 280 + const collection = getCollectionForApp(entry); 281 + return fetchPdsItems(pdsHost, did, collection, entry, 2).then((r) => r.items); 282 + }); 283 + 284 + const results = await Promise.allSettled(fetchPromises); 285 + 286 + const allItems: ActivityItem[] = []; 287 + for (const result of results) { 288 + if (result.status === 'fulfilled' && Array.isArray(result.value)) { 289 + // Take up to 2 items per app 290 + allItems.push(...result.value.slice(0, 2)); 291 + } 292 + } 293 + 294 + // Sort by date descending, take up to 5 295 + allItems.sort((a, b) => new Date(b.indexedAt).getTime() - new Date(a.indexedAt).getTime()); 296 + const items = allItems.slice(0, 5); 297 + 298 + const responseBody = { items }; 299 + 300 + // Cache result 301 + if (valkey) { 302 + try { 303 + await valkey.set(cacheKey, JSON.stringify(responseBody), 'EX', 900); 304 + } catch (err) { 305 + request.log.warn({ err, cacheKey }, 'valkey.set failed for activity teaser'); 306 + } 307 + } 308 + 309 + return reply.send(responseBody); 310 + }, 311 + ); 312 + 313 + // GET /api/activity/:handleOrDid -- full paginated activity feed 314 + app.get<{ 315 + Params: { handleOrDid: string }; 316 + Querystring: { category?: string; limit?: string; cursor?: string }; 317 + }>('/api/activity/:handleOrDid', async (request, reply) => { 318 + const { handleOrDid } = request.params; 319 + const categoryParam = request.query.category ?? 'all'; 320 + const limitParam = Math.min(Math.max(parseInt(request.query.limit ?? '20', 10) || 20, 1), 50); 321 + const cursorParam = request.query.cursor ?? null; 322 + 323 + const did = await resolveHandleOrDid(db, handleOrDid); 324 + if (!did) { 325 + return reply.status(404).send({ error: 'NotFound', message: 'User not found' }); 326 + } 327 + 328 + const suppressed = await isDidSuppressed(db, did); 329 + if (suppressed) { 330 + return reply.send({ items: [], cursor: null, hasMore: false }); 331 + } 332 + 333 + const stats = await getVisibleAppStats(db, did); 334 + const registry = getAppsRegistry(); 335 + 336 + // Filter apps by category if specified 337 + let targetApps: { stat: (typeof stats)[number]; entry: AppRegistryEntry }[]; 338 + if (categoryParam === 'all') { 339 + targetApps = stats 340 + .slice(0, 5) 341 + .map((stat) => { 342 + const entry = registry.find((e) => e.id === stat.appId); 343 + return entry ? { stat, entry } : null; 344 + }) 345 + .filter((x): x is NonNullable<typeof x> => x !== null); 346 + } else { 347 + targetApps = stats 348 + .map((stat) => { 349 + const entry = registry.find((e) => e.id === stat.appId); 350 + return entry && entry.category === categoryParam ? { stat, entry } : null; 351 + }) 352 + .filter((x): x is NonNullable<typeof x> => x !== null); 353 + } 354 + 355 + if (targetApps.length === 0) { 356 + return reply.send({ items: [], cursor: null, hasMore: false }); 357 + } 358 + 359 + // Decode composite cursor 360 + let perCollectionCursors: Record<string, string> = {}; 361 + if (cursorParam) { 362 + try { 363 + const decoded = JSON.parse( 364 + Buffer.from(cursorParam, 'base64url').toString('utf-8'), 365 + ) as CompositeCursor; 366 + perCollectionCursors = decoded.cursors; 367 + } catch { 368 + return reply.status(400).send({ error: 'BadRequest', message: 'Invalid cursor' }); 369 + } 370 + } 371 + 372 + const pdsHost = await resolvePdsHost(did); 373 + 374 + // Per-app limit: fetch a bit more per source to allow merge-sorting 375 + const perAppLimit = Math.min(limitParam, 20); 376 + 377 + // Build fetch promises 378 + const fetchPromises = targetApps.map(({ entry }) => { 379 + const collection = getCollectionForApp(entry); 380 + const collectionCursor = perCollectionCursors[collection] ?? undefined; 381 + 382 + // Check Valkey cache for this collection+cursor combo 383 + const collCacheKey = `activity:${did}:${collection}:${collectionCursor ?? 'start'}`; 384 + 385 + const fetchFn = async (): Promise<{ 386 + items: ActivityItem[]; 387 + cursor: string | undefined; 388 + collection: string; 389 + }> => { 390 + // Try cache first 391 + if (valkey) { 392 + try { 393 + const cached = await valkey.get(collCacheKey); 394 + if (cached !== null) { 395 + return JSON.parse(cached) as { 396 + items: ActivityItem[]; 397 + cursor: string | undefined; 398 + collection: string; 399 + }; 400 + } 401 + } catch (err) { 402 + request.log.warn({ err, collCacheKey }, 'valkey.get failed for activity feed'); 403 + } 404 + } 405 + 406 + let result: { items: ActivityItem[]; cursor: string | undefined }; 407 + 408 + if (entry.id === 'bluesky') { 409 + result = await fetchBlueskyFeedPaginated(did, perAppLimit, collectionCursor); 410 + } else if (pdsHost) { 411 + result = await fetchPdsItems( 412 + pdsHost, 413 + did, 414 + collection, 415 + entry, 416 + perAppLimit, 417 + collectionCursor, 418 + ); 419 + } else { 420 + result = { items: [], cursor: undefined }; 421 + } 422 + 423 + const cacheValue = { ...result, collection }; 424 + 425 + // Cache the result 426 + if (valkey) { 427 + try { 428 + await valkey.set(collCacheKey, JSON.stringify(cacheValue), 'EX', 300); 429 + } catch (err) { 430 + request.log.warn({ err, collCacheKey }, 'valkey.set failed for activity feed'); 431 + } 432 + } 433 + 434 + return cacheValue; 435 + }; 436 + 437 + return fetchFn(); 438 + }); 439 + 440 + const results = await Promise.allSettled(fetchPromises); 441 + 442 + // Merge all items and build new composite cursor 443 + const allItems: ActivityItem[] = []; 444 + const newCursors: Record<string, string> = {}; 445 + 446 + for (const result of results) { 447 + if (result.status === 'fulfilled') { 448 + allItems.push(...result.value.items); 449 + if (result.value.cursor) { 450 + newCursors[result.value.collection] = result.value.cursor; 451 + } 452 + } 453 + } 454 + 455 + // Sort by date descending 456 + allItems.sort((a, b) => new Date(b.indexedAt).getTime() - new Date(a.indexedAt).getTime()); 457 + 458 + // Paginate 459 + const items = allItems.slice(0, limitParam); 460 + const hasMore = Object.keys(newCursors).length > 0; 461 + 462 + const compositeCursor = hasMore 463 + ? Buffer.from(JSON.stringify({ cursors: newCursors }), 'utf-8').toString('base64url') 464 + : null; 465 + 466 + return reply.send({ items, cursor: compositeCursor, hasMore }); 467 + }); 468 + 469 + // POST /api/privacy/suppress -- GDPR erasure endpoint (no auth required) 470 + app.post('/api/privacy/suppress', async (request, reply) => { 471 + const parsed = suppressSchema.safeParse(request.body); 472 + if (!parsed.success) { 473 + return reply.status(400).send({ error: 'ValidationError', issues: parsed.error.issues }); 474 + } 475 + 476 + const did = await resolveHandleOrDid(db, parsed.data.handleOrDid); 477 + if (!did) { 478 + return reply.status(404).send({ error: 'NotFound', message: 'User not found' }); 479 + } 480 + 481 + if (!valkey) { 482 + return reply 483 + .status(503) 484 + .send({ error: 'ServiceUnavailable', message: 'Cache not available' }); 485 + } 486 + 487 + await suppressDid(db, valkey, did); 488 + 489 + return reply.send({ ok: true }); 490 + }); 491 + }
+25
src/routes/apps.ts
··· 1 + import type { FastifyInstance } from 'fastify'; 2 + import type { ValkeyClient } from '../cache/index.js'; 3 + import { getAppsRegistry } from '../lib/atproto-app-registry.js'; 4 + 5 + const APPS_REGISTRY_KEY = 'apps:registry'; 6 + const APPS_REGISTRY_TTL = 86400; // 24 hours 7 + 8 + export function registerAppsRoutes(app: FastifyInstance, valkey: ValkeyClient | null) { 9 + app.get('/api/apps/registry', async (_request, reply) => { 10 + if (valkey) { 11 + const cached = await valkey.get(APPS_REGISTRY_KEY); 12 + if (cached !== null) { 13 + return reply.send(JSON.parse(cached)); 14 + } 15 + } 16 + 17 + const registry = getAppsRegistry(); 18 + 19 + if (valkey) { 20 + await valkey.setex(APPS_REGISTRY_KEY, APPS_REGISTRY_TTL, JSON.stringify(registry)); 21 + } 22 + 23 + return reply.send(registry); 24 + }); 25 + }
+16
src/routes/profile-write.ts
··· 51 51 sessions as sessionsTable, 52 52 oauthSessions as oauthSessionsTable, 53 53 } from '../db/schema/index.js'; 54 + import { scanUserApps } from '../services/pds-scanner.js'; 55 + import { upsertScanResults } from '../services/app-stats.js'; 56 + import { resolvePdsHost } from '../lib/pds-provider.js'; 54 57 55 58 const overrideSchema = z.object({ 56 59 headline: z.string().max(300).nullish(), ··· 952 955 } 953 956 954 957 app.log.info({ did, synced }, 'Profile synced from PDS'); 958 + 959 + // Scan cross-app activity (blocking at claim time so badges are ready immediately) 960 + try { 961 + const pdsHost = await resolvePdsHost(did); 962 + if (pdsHost) { 963 + const scanResults = await scanUserApps(`https://${pdsHost}`, did); 964 + await upsertScanResults(db, did, scanResults); 965 + } 966 + } catch (err) { 967 + app.log.warn({ err, did }, 'Cross-app activity scan failed during profile sync'); 968 + // Non-fatal — profile sync still succeeds even if activity scan fails 969 + } 970 + 955 971 return reply.send({ synced }); 956 972 }); 957 973
+35
src/routes/profile.ts
··· 27 27 import { isVerifiablePlatform } from '../services/verification.js'; 28 28 import { resolveProfileFields } from '../lib/resolve-profile.js'; 29 29 import { resolvePdsHost, mapPdsHostToProvider } from '../lib/pds-provider.js'; 30 + import { 31 + getVisibleAppStats, 32 + triggerRefreshIfStale, 33 + isDidSuppressed, 34 + } from '../services/app-stats.js'; 35 + import { getAppsRegistry } from '../lib/atproto-app-registry.js'; 30 36 31 37 export async function getMutualFollowCount(db: Database, did: string): Promise<number> { 32 38 // Raw SQL required: Drizzle ORM doesn't support self-join aggregate subqueries for mutual follow counting ··· 244 250 followingCount: 0, 245 251 connectionsCount: 0, 246 252 atprotoFollowersCount: bskyFollowers, 253 + activeApps: [], 247 254 inviteCount: inviteCountResult[0]?.value ?? 0, 248 255 pdsProvider: pdsHost ? mapPdsHostToProvider(pdsHost) : null, 249 256 claimed: false, ··· 334 341 const followersCount = followersResult[0]?.value ?? 0; 335 342 const followingCount = followingResult[0]?.value ?? 0; 336 343 344 + // Fetch active apps (cross-app activity) in parallel 345 + const [suppressed, visibleStats] = await Promise.all([ 346 + isDidSuppressed(db, profile.did), 347 + getVisibleAppStats(db, profile.did), 348 + ]); 349 + 350 + const registry = getAppsRegistry(); 351 + const activeApps = suppressed 352 + ? [] 353 + : visibleStats 354 + .filter((s) => s.isActive) 355 + .map((s) => { 356 + const registryEntry = registry.find((r) => r.id === s.appId); 357 + return { 358 + id: s.appId, 359 + name: registryEntry?.name ?? s.appId, 360 + category: registryEntry?.category ?? 'Other', 361 + recentCount: s.recentCount, 362 + latestRecordAt: s.latestRecordAt?.toISOString() ?? null, 363 + }; 364 + }); 365 + 337 366 // Assemble location display string from parts 338 367 const locationParts = [ 339 368 profile.locationCity, ··· 366 395 .catch((err: unknown) => { 367 396 request.log.warn({ err, did: profile.did }, 'Failed to cache pdsHost'); 368 397 }); 398 + } 399 + 400 + // Trigger background PDS scan if data is stale (fire-and-forget) 401 + if (valkey && pdsHost) { 402 + triggerRefreshIfStale(db, valkey, profile.did, pdsHost); 369 403 } 370 404 371 405 // Find primary external account for website display ··· 510 544 followersCount, 511 545 followingCount, 512 546 connectionsCount: connectionsCountResult, 547 + activeApps, 513 548 atprotoFollowersCount: atprotoFollowersCount ?? 0, 514 549 inviteCount: inviteCountResult[0]?.value ?? 0, 515 550 pdsProvider: pdsHost ? mapPdsHostToProvider(pdsHost) : null,
+4
src/server.ts
··· 45 45 import { createHonorIndexer } from './jetstream/indexers/honor.js'; 46 46 import { createLanguageIndexer } from './jetstream/indexers/language.js'; 47 47 import { createCursorManager } from './jetstream/cursor.js'; 48 + import { registerActivityRoutes } from './routes/activity.js'; 49 + import { registerAppsRoutes } from './routes/apps.js'; 48 50 import { registerFeaturedRoutes } from './routes/featured.js'; 49 51 import { startFeaturedProfileJob } from './services/featured-job.js'; 50 52 import { createBotAgent } from './services/bluesky-bot.js'; ··· 160 162 registerStatsRoutes(app, db, valkey); 161 163 registerAdminStatsRoutes(app, db, valkey, oauthClient, config); 162 164 registerLocationRoutes(app, config.GEONAMES_USERNAME); 165 + registerActivityRoutes(app, db, oauthClient, valkey); 166 + registerAppsRoutes(app, valkey); 163 167 registerFeaturedRoutes(app, db, valkey); 164 168 165 169 // Start Jetstream in non-test mode
+137
src/services/app-stats.ts
··· 1 + import { eq, and, desc, notInArray } from 'drizzle-orm'; 2 + import type { Database } from '../db/index.js'; 3 + import type { ValkeyClient } from '../cache/index.js'; 4 + import { userAppStats } from '../db/schema/user-app-stats.js'; 5 + import { suppressedDids } from '../db/schema/suppressed-dids.js'; 6 + import { scanUserApps, type AppScanResult } from './pds-scanner.js'; 7 + 8 + export interface AppStatRow { 9 + did: string; 10 + appId: string; 11 + isActive: boolean; 12 + recentCount: number; 13 + latestRecordAt: Date | null; 14 + refreshedAt: Date; 15 + visible: boolean; 16 + createdAt: Date; 17 + } 18 + 19 + const STALE_THRESHOLD_MS = 24 * 60 * 60 * 1000; // 24 hours 20 + const LOCK_TTL_SECONDS = 120; 21 + 22 + export async function getAppStatsForDid(db: Database, did: string): Promise<AppStatRow[]> { 23 + return db 24 + .select() 25 + .from(userAppStats) 26 + .where(eq(userAppStats.did, did)) 27 + .orderBy(desc(userAppStats.recentCount)); 28 + } 29 + 30 + export async function getVisibleAppStats(db: Database, did: string): Promise<AppStatRow[]> { 31 + return db 32 + .select() 33 + .from(userAppStats) 34 + .where(and(eq(userAppStats.did, did), eq(userAppStats.visible, true))) 35 + .orderBy(desc(userAppStats.recentCount)); 36 + } 37 + 38 + export async function upsertScanResults( 39 + db: Database, 40 + did: string, 41 + results: AppScanResult[], 42 + ): Promise<void> { 43 + if (results.length === 0) { 44 + // Delete all rows for this DID if no results 45 + await db.delete(userAppStats).where(eq(userAppStats.did, did)); 46 + return; 47 + } 48 + 49 + const now = new Date(); 50 + 51 + // Upsert each result 52 + for (const result of results) { 53 + await db 54 + .insert(userAppStats) 55 + .values({ 56 + did, 57 + appId: result.appId, 58 + isActive: result.isActive, 59 + recentCount: result.recentCount, 60 + latestRecordAt: result.latestRecordAt, 61 + refreshedAt: now, 62 + }) 63 + .onConflictDoUpdate({ 64 + target: [userAppStats.did, userAppStats.appId], 65 + set: { 66 + isActive: result.isActive, 67 + recentCount: result.recentCount, 68 + latestRecordAt: result.latestRecordAt, 69 + refreshedAt: now, 70 + }, 71 + }); 72 + } 73 + 74 + // Delete rows for apps not in the results 75 + const resultAppIds = results.map((r) => r.appId); 76 + await db 77 + .delete(userAppStats) 78 + .where(and(eq(userAppStats.did, did), notInArray(userAppStats.appId, resultAppIds))); 79 + } 80 + 81 + function isStale(rows: AppStatRow[]): boolean { 82 + if (rows.length === 0) return true; 83 + 84 + const now = Date.now(); 85 + return rows.every((row) => now - row.refreshedAt.getTime() >= STALE_THRESHOLD_MS); 86 + } 87 + 88 + export function triggerRefreshIfStale( 89 + db: Database, 90 + valkey: ValkeyClient, 91 + did: string, 92 + pdsHost: string, 93 + ): void { 94 + // Fire-and-forget — caller does not await 95 + void (async () => { 96 + // Check staleness 97 + const rows = await getAppStatsForDid(db, did); 98 + if (!isStale(rows)) return; 99 + 100 + // Acquire lock 101 + const lockKey = `pds-scan:${did}`; 102 + const locked = await valkey.set(lockKey, '1', 'EX', LOCK_TTL_SECONDS, 'NX'); 103 + if (locked === null) return; // another scan is running 104 + 105 + try { 106 + const results = await scanUserApps(pdsHost, did); 107 + await upsertScanResults(db, did, results); 108 + } catch (err) { 109 + console.error(`Background PDS scan failed for ${did}:`, err); 110 + // Release lock early on failure 111 + await valkey.del(lockKey).catch(() => {}); 112 + } 113 + })().catch((err) => { 114 + console.error(`Unexpected error in triggerRefreshIfStale for ${did}:`, err); 115 + }); 116 + } 117 + 118 + export async function isDidSuppressed(db: Database, did: string): Promise<boolean> { 119 + const rows = await db.select().from(suppressedDids).where(eq(suppressedDids.did, did)); 120 + return rows.length > 0; 121 + } 122 + 123 + export async function suppressDid(db: Database, valkey: ValkeyClient, did: string): Promise<void> { 124 + // Insert into suppressed list 125 + await db.insert(suppressedDids).values({ did }).onConflictDoNothing(); 126 + 127 + // Delete all stats for this DID 128 + await db.delete(userAppStats).where(eq(userAppStats.did, did)); 129 + 130 + // Clean up Valkey keys 131 + await valkey.del(`pds-scan:${did}`); 132 + await valkey.del(`activity-teaser:${did}`); 133 + const activityKeys = await valkey.keys(`activity:${did}:*`); 134 + if (activityKeys.length > 0) { 135 + await valkey.del(...activityKeys); 136 + } 137 + }
+149
src/services/pds-scanner.ts
··· 1 + import { Agent } from '@atproto/api'; 2 + import { getAppsRegistry, EXCLUDED_COLLECTIONS } from '../lib/atproto-app-registry.js'; 3 + 4 + export interface AppScanResult { 5 + appId: string; 6 + isActive: boolean; 7 + recentCount: number; 8 + latestRecordAt: Date | null; 9 + } 10 + 11 + const MAX_PAGES = 5; 12 + const PAGE_LIMIT = 100; 13 + const RECENT_WINDOW_DAYS = 90; 14 + const REQUEST_TIMEOUT_MS = 5000; 15 + 16 + function getRecentWindowStart(): Date { 17 + const d = new Date(); 18 + d.setDate(d.getDate() - RECENT_WINDOW_DAYS); 19 + return d; 20 + } 21 + 22 + function getCollectionsForApp(app: { 23 + scanCollections: string[]; 24 + collectionPrefixes: string[]; 25 + }): string[] { 26 + const collections = 27 + app.scanCollections.length > 0 28 + ? [...app.scanCollections] 29 + : app.collectionPrefixes.length > 0 30 + ? [app.collectionPrefixes[0] ?? ''].filter(Boolean) 31 + : []; 32 + 33 + return collections.filter((c) => !EXCLUDED_COLLECTIONS.includes(c)); 34 + } 35 + 36 + interface RecordValue { 37 + createdAt?: string; 38 + } 39 + 40 + async function scanCollection( 41 + agent: Agent, 42 + did: string, 43 + collection: string, 44 + ): Promise<{ recentCount: number; latestRecordAt: Date | null }> { 45 + const windowStart = getRecentWindowStart(); 46 + let recentCount = 0; 47 + let latestRecordAt: Date | null = null; 48 + let cursor: string | undefined; 49 + 50 + for (let page = 0; page < MAX_PAGES; page++) { 51 + const res = await agent.com.atproto.repo.listRecords( 52 + { 53 + repo: did, 54 + collection, 55 + limit: PAGE_LIMIT, 56 + cursor, 57 + }, 58 + { signal: AbortSignal.timeout(REQUEST_TIMEOUT_MS) }, 59 + ); 60 + 61 + const records = res.data.records; 62 + if (records.length === 0) break; 63 + 64 + let allOutsideWindow = true; 65 + 66 + for (const record of records) { 67 + const val = record.value as RecordValue; 68 + if (!val.createdAt) continue; 69 + 70 + const recordDate = new Date(val.createdAt); 71 + 72 + if (latestRecordAt === null || recordDate > latestRecordAt) { 73 + latestRecordAt = recordDate; 74 + } 75 + 76 + if (recordDate >= windowStart) { 77 + recentCount++; 78 + allOutsideWindow = false; 79 + } 80 + } 81 + 82 + if (allOutsideWindow) break; 83 + 84 + cursor = res.data.cursor; 85 + if (!cursor) break; 86 + } 87 + 88 + return { recentCount, latestRecordAt }; 89 + } 90 + 91 + async function scanApp( 92 + agent: Agent, 93 + did: string, 94 + appId: string, 95 + collections: string[], 96 + ): Promise<AppScanResult> { 97 + if (collections.length === 0) { 98 + return { appId, isActive: false, recentCount: 0, latestRecordAt: null }; 99 + } 100 + 101 + const collectionResults = await Promise.allSettled( 102 + collections.map((c) => scanCollection(agent, did, c)), 103 + ); 104 + 105 + let totalRecentCount = 0; 106 + let overallLatest: Date | null = null; 107 + 108 + for (const result of collectionResults) { 109 + if (result.status !== 'fulfilled') continue; 110 + totalRecentCount += result.value.recentCount; 111 + if ( 112 + result.value.latestRecordAt !== null && 113 + (overallLatest === null || result.value.latestRecordAt > overallLatest) 114 + ) { 115 + overallLatest = result.value.latestRecordAt; 116 + } 117 + } 118 + 119 + return { 120 + appId, 121 + isActive: totalRecentCount > 0, 122 + recentCount: totalRecentCount, 123 + latestRecordAt: overallLatest, 124 + }; 125 + } 126 + 127 + export async function scanUserApps(pdsUrl: string, did: string): Promise<AppScanResult[]> { 128 + const agent = new Agent(pdsUrl); 129 + const registry = getAppsRegistry(); 130 + 131 + const scanPromises = registry.map((app) => { 132 + const collections = getCollectionsForApp(app); 133 + return scanApp(agent, did, app.id, collections); 134 + }); 135 + 136 + const results = await Promise.allSettled(scanPromises); 137 + 138 + return results.map((result, i) => { 139 + if (result.status === 'fulfilled') { 140 + return result.value; 141 + } 142 + return { 143 + appId: registry[i]?.id ?? 'unknown', 144 + isActive: false, 145 + recentCount: 0, 146 + latestRecordAt: null, 147 + }; 148 + }); 149 + }
+79
tests/lib/atproto-app-registry.test.ts
··· 1 + import { describe, expect, it } from 'vitest'; 2 + import { 3 + EXCLUDED_COLLECTIONS, 4 + getAppForCollection, 5 + getAppsRegistry, 6 + } from '../../src/lib/atproto-app-registry.js'; 7 + 8 + describe('atproto-app-registry', () => { 9 + describe('getAppsRegistry', () => { 10 + it('returns all entries with required fields', () => { 11 + const registry = getAppsRegistry(); 12 + expect(registry.length).toBeGreaterThan(0); 13 + for (const entry of registry) { 14 + expect(entry.id).toBeTruthy(); 15 + expect(entry.name).toBeTruthy(); 16 + expect(entry.category).toBeTruthy(); 17 + expect(entry.collectionPrefixes.length).toBeGreaterThan(0); 18 + expect(entry.color).toBeTruthy(); 19 + expect(Array.isArray(entry.scanCollections)).toBe(true); 20 + } 21 + }); 22 + }); 23 + 24 + describe('getAppForCollection', () => { 25 + it('maps known collection to its app via scanCollections', () => { 26 + const result = getAppForCollection('app.bsky.feed.post'); 27 + expect(result).toBeDefined(); 28 + expect(result?.id).toBe('bluesky'); 29 + expect(result?.matchedPrefix).toBe('app.bsky.feed'); 30 + }); 31 + 32 + it('maps com.whtwnd.blog.entry to whitewind via scanCollections', () => { 33 + const result = getAppForCollection('com.whtwnd.blog.entry'); 34 + expect(result).toBeDefined(); 35 + expect(result?.id).toBe('whitewind'); 36 + expect(result?.matchedPrefix).toBe('com.whtwnd'); 37 + }); 38 + 39 + it('maps by prefix for unknown sub-collections', () => { 40 + const result = getAppForCollection('sh.tangled.some.new'); 41 + expect(result).toBeDefined(); 42 + expect(result?.id).toBe('tangled'); 43 + expect(result?.matchedPrefix).toBe('sh.tangled'); 44 + }); 45 + 46 + it('maps events.smokesignal.foo by prefix', () => { 47 + const result = getAppForCollection('events.smokesignal.foo'); 48 + expect(result).toBeDefined(); 49 + expect(result?.id).toBe('smokesignal'); 50 + }); 51 + 52 + it('returns undefined for unknown collections', () => { 53 + const result = getAppForCollection('com.unknown.something'); 54 + expect(result).toBeUndefined(); 55 + }); 56 + 57 + it('prefers scanCollections exact match over prefix match', () => { 58 + // app.bsky.feed.post should match bluesky via scanCollections, 59 + // not just prefix 60 + const result = getAppForCollection('app.bsky.feed.post'); 61 + expect(result?.id).toBe('bluesky'); 62 + }); 63 + }); 64 + 65 + describe('EXCLUDED_COLLECTIONS', () => { 66 + it('contains expected entries', () => { 67 + expect(EXCLUDED_COLLECTIONS).toContain('app.bsky.feed.like'); 68 + expect(EXCLUDED_COLLECTIONS).toContain('app.bsky.feed.repost'); 69 + expect(EXCLUDED_COLLECTIONS).toContain('app.bsky.graph.follow'); 70 + expect(EXCLUDED_COLLECTIONS).toContain('app.bsky.graph.block'); 71 + expect(EXCLUDED_COLLECTIONS).toContain('app.bsky.graph.mute'); 72 + expect(EXCLUDED_COLLECTIONS).toContain('app.bsky.graph.listitem'); 73 + }); 74 + 75 + it('has exactly 6 entries', () => { 76 + expect(EXCLUDED_COLLECTIONS).toHaveLength(6); 77 + }); 78 + }); 79 + });
+291
tests/services/app-stats.test.ts
··· 1 + import { describe, it, expect, vi, beforeEach } from 'vitest'; 2 + 3 + // Mock pds-scanner 4 + const mockScanUserApps = vi.fn(); 5 + vi.mock('../../src/services/pds-scanner.js', () => ({ 6 + scanUserApps: mockScanUserApps, 7 + })); 8 + 9 + // Mock drizzle-orm operators — return tagged objects so we can assert calls 10 + vi.mock('drizzle-orm', () => ({ 11 + eq: vi.fn((col, val) => ({ _op: 'eq', col, val })), 12 + and: vi.fn((...args: unknown[]) => ({ _op: 'and', args })), 13 + desc: vi.fn((col) => ({ _op: 'desc', col })), 14 + lt: vi.fn((col, val) => ({ _op: 'lt', col, val })), 15 + notInArray: vi.fn((col, vals) => ({ _op: 'notInArray', col, vals })), 16 + })); 17 + 18 + import type { AppScanResult } from '../../src/services/pds-scanner.js'; 19 + import type { AppStatRow } from '../../src/services/app-stats.js'; 20 + 21 + // ---- Mock DB builder ---- 22 + function createMockDb() { 23 + const mockDeleteWhere = vi.fn().mockResolvedValue(undefined); 24 + const mockDeleteObj = { where: mockDeleteWhere }; 25 + const mockOnConflictDoUpdate = vi.fn().mockResolvedValue(undefined); 26 + const mockOnConflictDoNothing = vi.fn().mockResolvedValue(undefined); 27 + const mockInsertValues = vi.fn().mockReturnValue({ 28 + onConflictDoUpdate: mockOnConflictDoUpdate, 29 + onConflictDoNothing: mockOnConflictDoNothing, 30 + }); 31 + const mockSelectWhere = vi.fn().mockReturnValue({ 32 + orderBy: vi.fn().mockResolvedValue([]), 33 + }); 34 + const mockSelectFrom = vi.fn().mockReturnValue({ 35 + where: mockSelectWhere, 36 + }); 37 + 38 + const db = { 39 + select: vi.fn().mockReturnValue({ from: mockSelectFrom }), 40 + insert: vi.fn().mockReturnValue({ values: mockInsertValues }), 41 + delete: vi.fn().mockReturnValue(mockDeleteObj), 42 + _mocks: { 43 + selectFrom: mockSelectFrom, 44 + selectWhere: mockSelectWhere, 45 + insertValues: mockInsertValues, 46 + onConflictDoUpdate: mockOnConflictDoUpdate, 47 + onConflictDoNothing: mockOnConflictDoNothing, 48 + deleteObj: mockDeleteObj, 49 + deleteWhere: mockDeleteWhere, 50 + }, 51 + } as unknown; 52 + 53 + return db as ReturnType<typeof createMockDb> & { 54 + select: ReturnType<typeof vi.fn>; 55 + insert: ReturnType<typeof vi.fn>; 56 + delete: ReturnType<typeof vi.fn>; 57 + _mocks: { 58 + selectFrom: ReturnType<typeof vi.fn>; 59 + selectWhere: ReturnType<typeof vi.fn>; 60 + insertValues: ReturnType<typeof vi.fn>; 61 + onConflictDoUpdate: ReturnType<typeof vi.fn>; 62 + onConflictDoNothing: ReturnType<typeof vi.fn>; 63 + deleteObj: { where: ReturnType<typeof vi.fn> }; 64 + deleteWhere: ReturnType<typeof vi.fn>; 65 + }; 66 + }; 67 + } 68 + 69 + // ---- Mock Valkey ---- 70 + function createMockValkey() { 71 + return { 72 + set: vi.fn(), 73 + del: vi.fn().mockResolvedValue(1), 74 + keys: vi.fn().mockResolvedValue([]), 75 + } as unknown as ReturnType<typeof createMockValkey> & { 76 + set: ReturnType<typeof vi.fn>; 77 + del: ReturnType<typeof vi.fn>; 78 + keys: ReturnType<typeof vi.fn>; 79 + }; 80 + } 81 + 82 + describe('app-stats service', () => { 83 + let db: ReturnType<typeof createMockDb>; 84 + let valkey: ReturnType<typeof createMockValkey>; 85 + 86 + beforeEach(() => { 87 + vi.clearAllMocks(); 88 + vi.useFakeTimers(); 89 + db = createMockDb(); 90 + valkey = createMockValkey(); 91 + }); 92 + 93 + afterEach(() => { 94 + vi.useRealTimers(); 95 + }); 96 + 97 + // We import dynamically after mocks are set up 98 + async function getModule() { 99 + return import('../../src/services/app-stats.js'); 100 + } 101 + 102 + describe('getVisibleAppStats', () => { 103 + it('returns only visible rows ordered by recentCount DESC', async () => { 104 + const { getVisibleAppStats } = await getModule(); 105 + 106 + const visibleRows: AppStatRow[] = [ 107 + { 108 + did: 'did:plc:test', 109 + appId: 'bluesky', 110 + isActive: true, 111 + recentCount: 42, 112 + latestRecordAt: new Date(), 113 + refreshedAt: new Date(), 114 + visible: true, 115 + createdAt: new Date(), 116 + }, 117 + { 118 + did: 'did:plc:test', 119 + appId: 'whitewind', 120 + isActive: true, 121 + recentCount: 5, 122 + latestRecordAt: new Date(), 123 + refreshedAt: new Date(), 124 + visible: true, 125 + createdAt: new Date(), 126 + }, 127 + ]; 128 + 129 + const mockOrderBy = vi.fn().mockResolvedValue(visibleRows); 130 + db._mocks.selectWhere.mockReturnValue({ orderBy: mockOrderBy }); 131 + 132 + const result = await getVisibleAppStats(db as never, 'did:plc:test'); 133 + 134 + expect(result).toEqual(visibleRows); 135 + expect(result[0]?.recentCount).toBeGreaterThan(result[1]?.recentCount ?? 0); 136 + expect(db.select).toHaveBeenCalled(); 137 + }); 138 + }); 139 + 140 + describe('upsertScanResults', () => { 141 + it('calls Drizzle upsert for each scan result', async () => { 142 + const { upsertScanResults } = await getModule(); 143 + 144 + const results: AppScanResult[] = [ 145 + { appId: 'bluesky', isActive: true, recentCount: 10, latestRecordAt: new Date() }, 146 + { appId: 'whitewind', isActive: false, recentCount: 0, latestRecordAt: null }, 147 + ]; 148 + 149 + await upsertScanResults(db as never, 'did:plc:test', results); 150 + 151 + // Should have called insert for the batch 152 + expect(db.insert).toHaveBeenCalled(); 153 + expect(db._mocks.insertValues).toHaveBeenCalled(); 154 + expect(db._mocks.onConflictDoUpdate).toHaveBeenCalled(); 155 + }); 156 + }); 157 + 158 + describe('triggerRefreshIfStale', () => { 159 + it('acquires Valkey lock before scanning', async () => { 160 + const { triggerRefreshIfStale } = await getModule(); 161 + 162 + // No rows exist — stale 163 + const mockOrderBy = vi.fn().mockResolvedValue([]); 164 + db._mocks.selectWhere.mockReturnValue({ orderBy: mockOrderBy }); 165 + 166 + // Lock acquired 167 + valkey.set.mockResolvedValue('OK'); 168 + mockScanUserApps.mockResolvedValue([]); 169 + 170 + triggerRefreshIfStale( 171 + db as never, 172 + valkey as never, 173 + 'did:plc:test', 174 + 'https://pds.example.com', 175 + ); 176 + 177 + // Let the microtask queue flush 178 + await vi.advanceTimersByTimeAsync(0); 179 + 180 + expect(valkey.set).toHaveBeenCalledWith( 181 + 'pds-scan:did:plc:test', 182 + expect.any(String), 183 + 'EX', 184 + 120, 185 + 'NX', 186 + ); 187 + expect(mockScanUserApps).toHaveBeenCalledWith('https://pds.example.com', 'did:plc:test'); 188 + }); 189 + 190 + it('skips if lock already held', async () => { 191 + const { triggerRefreshIfStale } = await getModule(); 192 + 193 + // No rows — stale 194 + const mockOrderBy = vi.fn().mockResolvedValue([]); 195 + db._mocks.selectWhere.mockReturnValue({ orderBy: mockOrderBy }); 196 + 197 + // Lock NOT acquired 198 + valkey.set.mockResolvedValue(null); 199 + 200 + triggerRefreshIfStale( 201 + db as never, 202 + valkey as never, 203 + 'did:plc:test', 204 + 'https://pds.example.com', 205 + ); 206 + 207 + await vi.advanceTimersByTimeAsync(0); 208 + 209 + expect(valkey.set).toHaveBeenCalled(); 210 + expect(mockScanUserApps).not.toHaveBeenCalled(); 211 + }); 212 + 213 + it('skips if data is fresh (refreshedAt < 24h old)', async () => { 214 + const { triggerRefreshIfStale } = await getModule(); 215 + 216 + const freshRow: AppStatRow = { 217 + did: 'did:plc:test', 218 + appId: 'bluesky', 219 + isActive: true, 220 + recentCount: 10, 221 + latestRecordAt: new Date(), 222 + refreshedAt: new Date(), // just now — fresh 223 + visible: true, 224 + createdAt: new Date(), 225 + }; 226 + 227 + const mockOrderBy = vi.fn().mockResolvedValue([freshRow]); 228 + db._mocks.selectWhere.mockReturnValue({ orderBy: mockOrderBy }); 229 + 230 + triggerRefreshIfStale( 231 + db as never, 232 + valkey as never, 233 + 'did:plc:test', 234 + 'https://pds.example.com', 235 + ); 236 + 237 + await vi.advanceTimersByTimeAsync(0); 238 + 239 + expect(valkey.set).not.toHaveBeenCalled(); 240 + expect(mockScanUserApps).not.toHaveBeenCalled(); 241 + }); 242 + }); 243 + 244 + describe('isDidSuppressed', () => { 245 + it('returns true when DID exists in suppressed table', async () => { 246 + const { isDidSuppressed } = await getModule(); 247 + 248 + const mockOrderBy = vi.fn().mockResolvedValue([{ did: 'did:plc:bad' }]); 249 + db._mocks.selectWhere.mockReturnValue({ orderBy: mockOrderBy }); 250 + // For suppressedDids, we use a simpler select pattern 251 + db._mocks.selectWhere.mockResolvedValue([{ did: 'did:plc:bad' }]); 252 + 253 + const result = await isDidSuppressed(db as never, 'did:plc:bad'); 254 + expect(result).toBe(true); 255 + }); 256 + 257 + it('returns false when DID is not suppressed', async () => { 258 + const { isDidSuppressed } = await getModule(); 259 + 260 + db._mocks.selectWhere.mockResolvedValue([]); 261 + 262 + const result = await isDidSuppressed(db as never, 'did:plc:good'); 263 + expect(result).toBe(false); 264 + }); 265 + }); 266 + 267 + describe('suppressDid', () => { 268 + it('inserts suppression, deletes stats, and clears Valkey keys', async () => { 269 + const { suppressDid } = await getModule(); 270 + 271 + valkey.keys.mockResolvedValue(['activity:did:plc:bad:bluesky']); 272 + 273 + await suppressDid(db as never, valkey as never, 'did:plc:bad'); 274 + 275 + // Should insert into suppressedDids 276 + expect(db.insert).toHaveBeenCalled(); 277 + expect(db._mocks.onConflictDoNothing).toHaveBeenCalled(); 278 + 279 + // Should delete userAppStats 280 + expect(db.delete).toHaveBeenCalled(); 281 + 282 + // Should delete Valkey keys 283 + expect(valkey.del).toHaveBeenCalledWith('pds-scan:did:plc:bad'); 284 + expect(valkey.del).toHaveBeenCalledWith('activity-teaser:did:plc:bad'); 285 + expect(valkey.keys).toHaveBeenCalledWith('activity:did:plc:bad:*'); 286 + }); 287 + }); 288 + }); 289 + 290 + // Need afterEach at module level for vitest 291 + import { afterEach } from 'vitest';
+283
tests/services/pds-scanner.test.ts
··· 1 + import { describe, it, expect, vi, beforeEach } from 'vitest'; 2 + import type { AppRegistryEntry } from '../../src/lib/atproto-app-registry.js'; 3 + 4 + // Mock the app registry 5 + vi.mock('../../src/lib/atproto-app-registry.js', () => ({ 6 + getAppsRegistry: vi.fn(), 7 + EXCLUDED_COLLECTIONS: ['app.bsky.feed.like', 'app.bsky.feed.repost'], 8 + })); 9 + 10 + // Mock @atproto/api 11 + const mockListRecords = vi.fn(); 12 + vi.mock('@atproto/api', () => { 13 + class MockAgent { 14 + com = { 15 + atproto: { 16 + repo: { 17 + listRecords: mockListRecords, 18 + }, 19 + }, 20 + }; 21 + } 22 + return { Agent: MockAgent }; 23 + }); 24 + 25 + import { scanUserApps } from '../../src/services/pds-scanner.js'; 26 + import { getAppsRegistry } from '../../src/lib/atproto-app-registry.js'; 27 + 28 + const mockedGetAppsRegistry = vi.mocked(getAppsRegistry); 29 + 30 + function daysAgo(n: number): string { 31 + const d = new Date(); 32 + d.setDate(d.getDate() - n); 33 + return d.toISOString(); 34 + } 35 + 36 + function makeRecord(createdAt: string, uri?: string) { 37 + return { 38 + uri: uri ?? 'at://did:plc:test/collection/rkey', 39 + cid: 'bafytest', 40 + value: { createdAt }, 41 + }; 42 + } 43 + 44 + function makeRegistry(overrides: Partial<AppRegistryEntry>[] = []): AppRegistryEntry[] { 45 + const defaults: AppRegistryEntry[] = [ 46 + { 47 + id: 'bluesky', 48 + name: 'Bluesky', 49 + category: 'Posts', 50 + collectionPrefixes: ['app.bsky.feed'], 51 + scanCollections: ['app.bsky.feed.post'], 52 + color: 'sky', 53 + }, 54 + { 55 + id: 'whitewind', 56 + name: 'Whitewind', 57 + category: 'Articles', 58 + collectionPrefixes: ['com.whtwnd'], 59 + scanCollections: ['com.whtwnd.blog.entry'], 60 + color: 'slate', 61 + }, 62 + ]; 63 + if (overrides.length > 0) { 64 + return overrides.map((o, i) => ({ 65 + ...(defaults[i % defaults.length] as AppRegistryEntry), 66 + ...o, 67 + })); 68 + } 69 + return defaults; 70 + } 71 + 72 + describe('scanUserApps', () => { 73 + beforeEach(() => { 74 + vi.clearAllMocks(); 75 + }); 76 + 77 + it('calls listRecords for each scan collection', async () => { 78 + mockedGetAppsRegistry.mockReturnValue(makeRegistry()); 79 + mockListRecords.mockResolvedValue({ 80 + data: { records: [], cursor: undefined }, 81 + }); 82 + 83 + await scanUserApps('https://pds.example.com', 'did:plc:test'); 84 + 85 + expect(mockListRecords).toHaveBeenCalledWith( 86 + expect.objectContaining({ 87 + repo: 'did:plc:test', 88 + collection: 'app.bsky.feed.post', 89 + limit: 100, 90 + }), 91 + expect.anything(), 92 + ); 93 + expect(mockListRecords).toHaveBeenCalledWith( 94 + expect.objectContaining({ 95 + repo: 'did:plc:test', 96 + collection: 'com.whtwnd.blog.entry', 97 + limit: 100, 98 + }), 99 + expect.anything(), 100 + ); 101 + }); 102 + 103 + it('counts records within the 90-day window', async () => { 104 + mockedGetAppsRegistry.mockReturnValue( 105 + makeRegistry([ 106 + { 107 + id: 'bluesky', 108 + name: 'Bluesky', 109 + category: 'Posts', 110 + collectionPrefixes: ['app.bsky.feed'], 111 + scanCollections: ['app.bsky.feed.post'], 112 + color: 'sky', 113 + }, 114 + ]), 115 + ); 116 + 117 + mockListRecords.mockResolvedValueOnce({ 118 + data: { 119 + records: [ 120 + makeRecord(daysAgo(1)), 121 + makeRecord(daysAgo(10)), 122 + makeRecord(daysAgo(89)), 123 + makeRecord(daysAgo(100)), // outside window 124 + ], 125 + cursor: undefined, 126 + }, 127 + }); 128 + 129 + const results = await scanUserApps('https://pds.example.com', 'did:plc:test'); 130 + const bluesky = results.find((r) => r.appId === 'bluesky'); 131 + 132 + expect(bluesky).toBeDefined(); 133 + expect(bluesky?.recentCount).toBe(3); 134 + expect(bluesky?.isActive).toBe(true); 135 + }); 136 + 137 + it('stops pagination when all records fall outside the 90-day window', async () => { 138 + mockedGetAppsRegistry.mockReturnValue( 139 + makeRegistry([ 140 + { 141 + id: 'bluesky', 142 + name: 'Bluesky', 143 + category: 'Posts', 144 + collectionPrefixes: ['app.bsky.feed'], 145 + scanCollections: ['app.bsky.feed.post'], 146 + color: 'sky', 147 + }, 148 + ]), 149 + ); 150 + 151 + // Page 1: some recent records + cursor 152 + mockListRecords.mockResolvedValueOnce({ 153 + data: { 154 + records: [makeRecord(daysAgo(5)), makeRecord(daysAgo(30))], 155 + cursor: 'page2', 156 + }, 157 + }); 158 + 159 + // Page 2: all records outside window 160 + mockListRecords.mockResolvedValueOnce({ 161 + data: { 162 + records: [makeRecord(daysAgo(100)), makeRecord(daysAgo(120))], 163 + cursor: 'page3', 164 + }, 165 + }); 166 + 167 + await scanUserApps('https://pds.example.com', 'did:plc:test'); 168 + 169 + // Should NOT have fetched page 3 170 + expect(mockListRecords).toHaveBeenCalledTimes(2); 171 + }); 172 + 173 + it('skips excluded collections', async () => { 174 + mockedGetAppsRegistry.mockReturnValue([ 175 + { 176 + id: 'bluesky-likes', 177 + name: 'Bluesky Likes', 178 + category: 'Posts', 179 + collectionPrefixes: ['app.bsky.feed'], 180 + scanCollections: ['app.bsky.feed.like'], // excluded 181 + color: 'sky', 182 + }, 183 + ]); 184 + 185 + mockListRecords.mockResolvedValue({ 186 + data: { records: [], cursor: undefined }, 187 + }); 188 + 189 + const results = await scanUserApps('https://pds.example.com', 'did:plc:test'); 190 + 191 + expect(mockListRecords).not.toHaveBeenCalled(); 192 + // Should still return a result for the app, but with zero counts 193 + const likesApp = results.find((r) => r.appId === 'bluesky-likes'); 194 + expect(likesApp).toBeDefined(); 195 + expect(likesApp?.recentCount).toBe(0); 196 + expect(likesApp?.isActive).toBe(false); 197 + }); 198 + 199 + it('does not fail the entire scan when one collection errors', async () => { 200 + mockedGetAppsRegistry.mockReturnValue(makeRegistry()); 201 + 202 + // Bluesky fails 203 + mockListRecords.mockImplementation(async (params: { collection: string }) => { 204 + if (params.collection === 'app.bsky.feed.post') { 205 + throw new Error('PDS timeout'); 206 + } 207 + return { 208 + data: { 209 + records: [makeRecord(daysAgo(5))], 210 + cursor: undefined, 211 + }, 212 + }; 213 + }); 214 + 215 + const results = await scanUserApps('https://pds.example.com', 'did:plc:test'); 216 + 217 + expect(results).toHaveLength(2); 218 + 219 + const bluesky = results.find((r) => r.appId === 'bluesky'); 220 + expect(bluesky?.isActive).toBe(false); 221 + expect(bluesky?.recentCount).toBe(0); 222 + 223 + const whitewind = results.find((r) => r.appId === 'whitewind'); 224 + expect(whitewind?.isActive).toBe(true); 225 + expect(whitewind?.recentCount).toBe(1); 226 + }); 227 + 228 + it('captures the most recent latestRecordAt', async () => { 229 + const recentDate = daysAgo(2); 230 + const olderDate = daysAgo(50); 231 + 232 + mockedGetAppsRegistry.mockReturnValue( 233 + makeRegistry([ 234 + { 235 + id: 'bluesky', 236 + name: 'Bluesky', 237 + category: 'Posts', 238 + collectionPrefixes: ['app.bsky.feed'], 239 + scanCollections: ['app.bsky.feed.post'], 240 + color: 'sky', 241 + }, 242 + ]), 243 + ); 244 + 245 + mockListRecords.mockResolvedValueOnce({ 246 + data: { 247 + records: [makeRecord(olderDate), makeRecord(recentDate)], 248 + cursor: undefined, 249 + }, 250 + }); 251 + 252 + const results = await scanUserApps('https://pds.example.com', 'did:plc:test'); 253 + const bluesky = results.find((r) => r.appId === 'bluesky'); 254 + 255 + expect(bluesky?.latestRecordAt).toEqual(new Date(recentDate)); 256 + }); 257 + 258 + it('uses first collectionPrefix when scanCollections is empty', async () => { 259 + mockedGetAppsRegistry.mockReturnValue([ 260 + { 261 + id: 'tangled', 262 + name: 'Tangled', 263 + category: 'Code', 264 + collectionPrefixes: ['sh.tangled'], 265 + scanCollections: [], 266 + color: 'emerald', 267 + }, 268 + ]); 269 + 270 + mockListRecords.mockResolvedValue({ 271 + data: { records: [], cursor: undefined }, 272 + }); 273 + 274 + await scanUserApps('https://pds.example.com', 'did:plc:test'); 275 + 276 + expect(mockListRecords).toHaveBeenCalledWith( 277 + expect.objectContaining({ 278 + collection: 'sh.tangled', 279 + }), 280 + expect.anything(), 281 + ); 282 + }); 283 + });