import { count, desc, sql, gte, eq, isNotNull, or, notInArray } from 'drizzle-orm';
import { z } from 'zod';
import type { FastifyInstance } from 'fastify';
import type { NodeOAuthClient } from '@atproto/oauth-client-node';
import type { Database } from '../db/index.js';
import type { ValkeyClient } from '../cache/index.js';
import type { Env } from '../config.js';
import {
  profiles,
  linkedinImports,
  positions,
  education,
  skills,
  certifications,
} from '../db/schema/index.js';
import { userAppStats } from '../db/schema/user-app-stats.js';
import { getAppsRegistry } from '../lib/atproto-app-registry.js';
import { mapPdsHostToProvider } from '../lib/pds-provider.js';
import { COUNTRY_CENTROIDS, resolveCountryCode } from '../lib/country-centroids.js';
import { createAuthMiddleware } from '../middleware/auth.js';
import { createAdminMiddleware } from '../middleware/admin.js';

const CACHE_TTL = 300; // 5 minutes

/** Earliest day any gap-filled daily series starts at. */
const SITE_LAUNCH_DATE = '2026-03-04';

/**
 * Display labels for provider names returned by `mapPdsHostToProvider`.
 * Names not listed here are shown as-is.
 */
const PROVIDER_LABELS: Record<string, string> = {
  bluesky: 'Bluesky',
  blacksky: 'Blacksky',
  eurosky: 'Eurosky',
  northsky: 'Northsky',
  'selfhosted-social': 'selfhosted.social',
  selfhosted: 'Self-hosted',
};

/** Generate all YYYY-MM-DD strings from startDate to endDate (inclusive), stepping by UTC day. */
function allDatesBetween(startDate: string, endDate: string): string[] {
  const dates: string[] = [];
  const current = new Date(startDate + 'T00:00:00Z');
  const end = new Date(endDate + 'T00:00:00Z');
  while (current <= end) {
    dates.push(current.toISOString().slice(0, 10));
    current.setUTCDate(current.getUTCDate() + 1);
  }
  return dates;
}

/**
 * Fill missing dates in a sparse, date-keyed array with a default record.
 *
 * The result has one entry per UTC day from `days` days ago through today
 * (inclusive). When `days` is 0 the range starts at SITE_LAUNCH_DATE, and it never
 * starts earlier than SITE_LAUNCH_DATE. Rows whose `date` is outside the range are
 * not included in the result.
 *
 * @param rows - sparse rows; `date` must be a YYYY-MM-DD string to be matched
 * @param days - window length in days; 0 means "since launch"
 * @param defaults - field values for days that have no matching row
 * @returns one row per day in the range, in ascending date order
 */
function fillDateGaps<T extends { date: string }>(
  rows: T[],
  days: number,
  defaults: Omit<T, 'date'>,
): T[] {
  const today = new Date().toISOString().slice(0, 10);
  let startDate: string;
  if (days > 0) {
    const start = new Date();
    start.setUTCDate(start.getUTCDate() - days);
    startDate = start.toISOString().slice(0, 10);
  } else {
    startDate = SITE_LAUNCH_DATE;
  }
  if (startDate < SITE_LAUNCH_DATE) startDate = SITE_LAUNCH_DATE;
  const endDate = today;

  const lookup = new Map<string, T>(rows.map((r) => [r.date, r]));
  return allDatesBetween(startDate, endDate).map(
    (date) => lookup.get(date) ?? ({ date, ...defaults } as T),
  );
}

/**
 * SQL expression for `NOW() - N days`.
 *
 * The day count is inlined into an INTERVAL literal (not bound as a parameter),
 * so it is checked here to be a non-negative integer before being embedded.
 *
 * @throws RangeError when `days` is not a non-negative integer
 */
function daysAgoSql(days: number) {
  if (!Number.isInteger(days) || days < 0) {
    throw new RangeError(`Invalid day count: ${days}`);
  }
  return sql`NOW() - INTERVAL '${sql.raw(String(days))} days'`;
}

const querySchema = z.object({
  days: z.enum(['7', '30', '90', '0']).default('30'),
});

const latestSignupsSchema = z.object({
  limit: z.coerce.number().int().min(1).max(100).default(20),
  offset: z.coerce.number().int().min(0).default(0),
  filter: z.enum(['all', 'no-import', 'gt50', 'complete']).default('all'),
});

const unregisteredSchema = z.object({
  limit: z.coerce.number().int().min(1).max(100).default(10),
  offset: z.coerce.number().int().min(0).default(0),
});

interface SignupRow {
  date: string;
  count: number;
}

interface SignupEntry {
  date: string;
  count: number;
  cumulative: number;
}

interface SignupsResponse {
  totalUsers: number;
  signups: SignupEntry[];
}

/**
 * Register the admin statistics endpoints under `/api/admin/stats/*`.
 *
 * Every route requires an authenticated admin (auth + admin pre-handlers).
 * Responses are cached in Valkey for CACHE_TTL seconds when a client is given;
 * caching is best-effort, so cache failures are logged and the request is served
 * from the database.
 */
export function registerAdminStatsRoutes(
  app: FastifyInstance,
  db: Database,
  valkey: ValkeyClient | null,
  oauthClient: NodeOAuthClient | null,
  config: Env,
) {
  const requireAuth = createAuthMiddleware(oauthClient, db);
  const requireAdmin = createAdminMiddleware(config);

  /**
   * Best-effort cache read.
   *
   * Returns the parsed cached payload. Returns null when there is no cache client,
   * on a miss, or when the Valkey call or JSON parsing fails. Failures are logged
   * so the caller can fall through to the database.
   */
  async function readCache(key: string): Promise<unknown> {
    if (!valkey) return null;
    try {
      const cached = await valkey.get(key);
      return cached === null ? null : (JSON.parse(cached) as unknown);
    } catch (err) {
      app.log.warn({ err, key }, 'admin stats: cache read failed');
      return null;
    }
  }

  /** Best-effort cache write with CACHE_TTL; failures are logged, never thrown. */
  async function writeCache(key: string, value: unknown): Promise<void> {
    if (!valkey) return;
    try {
      await valkey.setex(key, CACHE_TTL, JSON.stringify(value));
    } catch (err) {
      app.log.warn({ err, key }, 'admin stats: cache write failed');
    }
  }

  app.get(
    '/api/admin/stats/signups',
    { preHandler: [requireAuth, requireAdmin] },
    async (request, reply) => {
      const parsed = querySchema.safeParse(request.query);
      if (!parsed.success) {
        return reply.status(400).send({ error: 'Invalid query', details: parsed.error.format() });
      }
      const days = Number(parsed.data.days);
      const cacheKey = `admin:stats:signups:${days}`;

      const cached = await readCache(cacheKey);
      if (cached !== null) {
        return reply.send(cached);
      }

      // null means all-time: no lower bound on createdAt.
      const windowStart = days > 0 ? daysAgoSql(days) : null;
      const signupDate = sql<string>`DATE(${profiles.createdAt})`;

      const [totalResult, dateRows, priorResult] = await Promise.all([
        // Total user count (always all-time)
        db.select({ value: count() }).from(profiles),
        // Signups grouped by date, limited to the window when one was requested
        db
          .select({ date: signupDate.as('date'), count: count().as('count') })
          .from(profiles)
          .where(windowStart ? gte(profiles.createdAt, windowStart) : undefined)
          .groupBy(signupDate)
          .orderBy(signupDate),
        // Users created before the window, so cumulative reflects the real total
        windowStart
          ? db
              .select({ value: count() })
              .from(profiles)
              .where(sql`${profiles.createdAt} < ${windowStart}`)
          : Promise.resolve([{ value: 0 }]),
      ]);

      const totalUsers = totalResult[0]?.value ?? 0;
      const signupRows: SignupRow[] = dateRows.map((r) => ({
        date: String(r.date),
        count: r.count,
      }));

      // Fill in missing dates with zero counts
      const filledRows = fillDateGaps(signupRows, days, { count: 0 });

      // fillDateGaps never starts before SITE_LAUNCH_DATE and drops rows dated
      // earlier than its first day. Fold those counts into the starting total so
      // the running cumulative does not under-count them.
      const firstDate = filledRows[0]?.date;
      const droppedBefore =
        firstDate === undefined
          ? 0
          : signupRows.reduce((sum, r) => (r.date < firstDate ? sum + r.count : sum), 0);

      // Build cumulative
      let running = (priorResult[0]?.value ?? 0) + droppedBefore;
      const signups: SignupEntry[] = filledRows.map((row) => {
        running += row.count;
        return { date: row.date, count: row.count, cumulative: running };
      });

      const response: SignupsResponse = { totalUsers, signups };
      await writeCache(cacheKey, response);
      return reply.send(response);
    },
  );

  app.get(
    '/api/admin/stats/latest-signups',
    { preHandler: [requireAuth, requireAdmin] },
    async (request, reply) => {
      const parsed = latestSignupsSchema.safeParse(request.query);
      if (!parsed.success) {
        return reply.status(400).send({ error: 'Invalid query', details: parsed.error.format() });
      }
      const { limit, offset, filter } = parsed.data;
      const cacheKey = `admin:stats:latest-signups:${filter}:${limit}:${offset}`;

      const cached = await readCache(cacheKey);
      if (cached !== null) {
        return reply.send(cached);
      }

      // Subquery: count of successful LinkedIn imports per user
      const importCountSq = db
        .select({ did: linkedinImports.did, cnt: count().as('import_cnt') })
        .from(linkedinImports)
        .where(eq(linkedinImports.success, true))
        .groupBy(linkedinImports.did)
        .as('import_counts');

      // Subqueries for profile completion counts. After the LEFT JOINs below, a
      // NULL cnt means the user has no rows in that table.
      const posCountSq = db
        .select({ did: positions.did, cnt: count().as('pos_cnt') })
        .from(positions)
        .groupBy(positions.did)
        .as('pos_counts');

      const eduCountSq = db
        .select({ did: education.did, cnt: count().as('edu_cnt') })
        .from(education)
        .groupBy(education.did)
        .as('edu_counts');

      const skillCountSq = db
        .select({ did: skills.did, cnt: count().as('skill_cnt') })
        .from(skills)
        .groupBy(skills.did)
        .as('skill_counts');

      const certCountSq = db
        .select({ did: certifications.did, cnt: count().as('cert_cnt') })
        .from(certifications)
        .groupBy(certifications.did)
        .as('cert_counts');

      // Number of satisfied completion criteria (0..6): non-empty headline,
      // non-empty about, and at least one position / education / skill /
      // certification row.
      const completionExpr = sql`(
        CASE WHEN ${profiles.headline} IS NOT NULL AND ${profiles.headline} != '' THEN 1 ELSE 0 END +
        CASE WHEN ${profiles.about} IS NOT NULL AND ${profiles.about} != '' THEN 1 ELSE 0 END +
        CASE WHEN ${posCountSq.cnt} IS NOT NULL THEN 1 ELSE 0 END +
        CASE WHEN ${eduCountSq.cnt} IS NOT NULL THEN 1 ELSE 0 END +
        CASE WHEN ${skillCountSq.cnt} IS NOT NULL THEN 1 ELSE 0 END +
        CASE WHEN ${certCountSq.cnt} IS NOT NULL THEN 1 ELSE 0 END
      )`;

      // Predicates keyed by the `filter` query value:
      // - all:       no predicate
      // - no-import: no successful LinkedIn import AND at most 3 of 6 criteria met
      // - gt50:      4 or 5 of 6 criteria met
      // - complete:  all 6 criteria met
      const filterMap = {
        all: null,
        'no-import': sql`${importCountSq.cnt} IS NULL AND ${completionExpr} <= 3`,
        gt50: sql`${completionExpr} > 3 AND ${completionExpr} < 6`,
        complete: sql`${completionExpr} = 6`,
      };
      const activeFilter = filterMap[filter] ?? null;

      const pageQuery = db
        .select({
          did: profiles.did,
          handle: profiles.handle,
          displayName: profiles.displayName,
          avatarUrl: profiles.avatarUrl,
          headline: profiles.headline,
          about: profiles.about,
          createdAt: profiles.createdAt,
          importCount: sql<number>`COALESCE(${importCountSq.cnt}, 0)`.as('import_count'),
          positionCount: sql<number>`COALESCE(${posCountSq.cnt}, 0)`.as('position_count'),
          educationCount: sql<number>`COALESCE(${eduCountSq.cnt}, 0)`.as('education_count'),
          skillCount: sql<number>`COALESCE(${skillCountSq.cnt}, 0)`.as('skill_count'),
          certificationCount: sql<number>`COALESCE(${certCountSq.cnt}, 0)`.as(
            'certification_count',
          ),
        })
        .from(profiles)
        .leftJoin(importCountSq, eq(profiles.did, importCountSq.did))
        .leftJoin(posCountSq, eq(profiles.did, posCountSq.did))
        .leftJoin(eduCountSq, eq(profiles.did, eduCountSq.did))
        .leftJoin(skillCountSq, eq(profiles.did, skillCountSq.did))
        .leftJoin(certCountSq, eq(profiles.did, certCountSq.did))
        .where(activeFilter ?? undefined)
        .orderBy(desc(profiles.createdAt))
        .limit(limit)
        .offset(offset);

      /** Total number of profiles matching the active filter (for pagination). */
      const countMatching = async (): Promise<number> => {
        if (!activeFilter) {
          const [countResult] = await db.select({ value: count() }).from(profiles);
          return countResult?.value ?? 0;
        }
        // The filter references the subquery columns, so the same joins are needed.
        const [countResult] = await db
          .select({ value: count() })
          .from(profiles)
          .leftJoin(importCountSq, eq(profiles.did, importCountSq.did))
          .leftJoin(posCountSq, eq(profiles.did, posCountSq.did))
          .leftJoin(eduCountSq, eq(profiles.did, eduCountSq.did))
          .leftJoin(skillCountSq, eq(profiles.did, skillCountSq.did))
          .leftJoin(certCountSq, eq(profiles.did, certCountSq.did))
          .where(activeFilter);
        return countResult?.value ?? 0;
      };

      const [totalCount, rows] = await Promise.all([countMatching(), pageQuery]);

      const users = rows.map((r) => ({
        did: r.did,
        handle: r.handle,
        displayName: r.displayName,
        avatarUrl: r.avatarUrl,
        createdAt: r.createdAt.toISOString(),
        hasImported: Number(r.importCount) > 0,
        profileCompletion: {
          hasHeadline: !!r.headline,
          hasAbout: !!r.about,
          // COALESCE results may arrive from the driver as strings; normalize.
          positionCount: Number(r.positionCount),
          educationCount: Number(r.educationCount),
          skillCount: Number(r.skillCount),
          certificationCount: Number(r.certificationCount),
        },
      }));

      const response = { users, total: totalCount };
      await writeCache(cacheKey, response);
      return reply.send(response);
    },
  );

  app.get(
    '/api/admin/stats/active-users',
    { preHandler: [requireAuth, requireAdmin] },
    async (request, reply) => {
      const parsed = querySchema.safeParse(request.query);
      if (!parsed.success) {
        return reply.status(400).send({ error: 'Invalid query', details: parsed.error.format() });
      }
      const days = Number(parsed.data.days);
      const cacheKey = `admin:stats:active-users:${days}`;

      const cached = await readCache(cacheKey);
      if (cached !== null) {
        return reply.send(cached);
      }

      // NOTE: each profile has a single lastActiveAt, so every user lands in
      // exactly one day/month bucket: the one containing their most recent
      // activity. These series are therefore "users last seen on this day/month",
      // not true DAU/MAU.
      // The all-time option (days = 0) is capped at the last 90 days for the daily series.
      const dauDays = days > 0 ? days : 90;
      const activeDate = sql<string>`DATE(${profiles.lastActiveAt})`;
      const activeMonth = sql<string>`TO_CHAR(${profiles.lastActiveAt}, 'YYYY-MM')`;

      const [dauRows, mauRows] = await Promise.all([
        // Daily buckets within the window (NULL lastActiveAt never satisfies >=)
        db
          .select({ date: activeDate.as('date'), count: count().as('count') })
          .from(profiles)
          .where(gte(profiles.lastActiveAt, daysAgoSql(dauDays)))
          .groupBy(activeDate)
          .orderBy(activeDate),
        // Monthly buckets for the last 12 months; the oldest month may be partial
        db
          .select({ month: activeMonth.as('month'), count: count().as('count') })
          .from(profiles)
          .where(sql`${profiles.lastActiveAt} >= NOW() - INTERVAL '12 months'`)
          .groupBy(activeMonth)
          .orderBy(activeMonth),
      ]);

      const dailySparse = dauRows.map((r) => ({ date: String(r.date), count: r.count }));
      const daily = fillDateGaps(dailySparse, dauDays, { count: 0 });
      const monthly = mauRows.map((r) => ({ month: String(r.month), count: r.count }));

      const response = { daily, monthly };
      await writeCache(cacheKey, response);
      return reply.send(response);
    },
  );

  app.get(
    '/api/admin/stats/pds-distribution',
    { preHandler: [requireAuth, requireAdmin] },
    async (_request, reply) => {
      const cacheKey = 'admin:stats:pds-distribution';

      const cached = await readCache(cacheKey);
      if (cached !== null) {
        return reply.send(cached);
      }

      const rows = await db
        .select({ pdsHost: profiles.pdsHost, count: count().as('count') })
        .from(profiles)
        .where(isNotNull(profiles.pdsHost))
        .groupBy(profiles.pdsHost);

      // Collapse individual hosts into provider groups via mapPdsHostToProvider,
      // then map provider names to display labels.
      const groups = new Map<string, number>();
      for (const row of rows) {
        const provider = mapPdsHostToProvider(row.pdsHost ?? '');
        const label = PROVIDER_LABELS[provider.name] ?? provider.name;
        groups.set(label, (groups.get(label) ?? 0) + row.count);
      }

      const slices = Array.from(groups, ([name, value]) => ({ name, value })).sort(
        (a, b) => b.value - a.value,
      );

      const response = { slices };
      await writeCache(cacheKey, response);
      return reply.send(response);
    },
  );

  app.get(
    '/api/admin/stats/linkedin-imports',
    { preHandler: [requireAuth, requireAdmin] },
    async (request, reply) => {
      const parsed = querySchema.safeParse(request.query);
      if (!parsed.success) {
        return reply.status(400).send({ error: 'Invalid query', details: parsed.error.format() });
      }
      const days = Number(parsed.data.days);
      const cacheKey = `admin:stats:linkedin-imports:${days}`;

      const cached = await readCache(cacheKey);
      if (cached !== null) {
        return reply.send(cached);
      }

      // The all-time option (days = 0) is capped at the last 90 days.
      const importDays = days > 0 ? days : 90;
      const importDate = sql<string>`DATE(${linkedinImports.createdAt})`;

      const rows = await db
        .select({
          date: importDate.as('date'),
          successCount: sql<number>`COUNT(*) FILTER (WHERE ${linkedinImports.success} = true)`.as(
            'success_count',
          ),
          failureCount: sql<number>`COUNT(*) FILTER (WHERE ${linkedinImports.success} = false)`.as(
            'failure_count',
          ),
          // Total imported items across all sections for the day
          totalItems: sql<number>`COALESCE(SUM(
            ${linkedinImports.positionCount} + ${linkedinImports.educationCount} +
            ${linkedinImports.skillCount} + ${linkedinImports.certificationCount} +
            ${linkedinImports.projectCount} + ${linkedinImports.volunteeringCount} +
            ${linkedinImports.publicationCount} + ${linkedinImports.courseCount} +
            ${linkedinImports.honorCount} + ${linkedinImports.languageCount}
          ), 0)`.as('total_items'),
        })
        .from(linkedinImports)
        .where(gte(linkedinImports.createdAt, daysAgoSql(importDays)))
        .groupBy(importDate)
        .orderBy(importDate);

      // Aggregates may arrive from the driver as strings; normalize to numbers.
      const dailySparse = rows.map((r) => ({
        date: String(r.date),
        successCount: Number(r.successCount),
        failureCount: Number(r.failureCount),
        totalItems: Number(r.totalItems),
      }));

      const daily = fillDateGaps(dailySparse, importDays, {
        successCount: 0,
        failureCount: 0,
        totalItems: 0,
      });

      // Summary covers only the days present in the filled series.
      const totalImports = daily.reduce((s, d) => s + d.successCount + d.failureCount, 0);
      const totalSuccess = daily.reduce((s, d) => s + d.successCount, 0);
      const totalItems = daily.reduce((s, d) => s + d.totalItems, 0);
      const successRate = totalImports > 0 ? Math.round((totalSuccess / totalImports) * 100) : 0;

      const response = { daily, summary: { totalImports, totalSuccess, totalItems, successRate } };
      await writeCache(cacheKey, response);
      return reply.send(response);
    },
  );

  app.get(
    '/api/admin/stats/user-locations',
    { preHandler: [requireAuth, requireAdmin] },
    async (_request, reply) => {
      const cacheKey = 'admin:stats:user-locations';

      const cached = await readCache(cacheKey);
      if (cached !== null) {
        return reply.send(cached);
      }

      // Aggregate profiles by country/city/coordinates
      const rows = await db
        .select({
          countryCode: profiles.countryCode,
          locationCountry: profiles.locationCountry,
          locationCity: profiles.locationCity,
          latitude: profiles.latitude,
          longitude: profiles.longitude,
          count: count().as('count'),
        })
        .from(profiles)
        .where(or(isNotNull(profiles.countryCode), isNotNull(profiles.locationCountry)))
        .groupBy(
          profiles.countryCode,
          profiles.locationCountry,
          profiles.locationCity,
          profiles.latitude,
          profiles.longitude,
        );

      interface LocationPoint {
        lat: number;
        lng: number;
        count: number;
        label: string;
      }

      // Merge rows into one point per (country code, lower-cased city) pair.
      // The first row seen for a key fixes that point's coordinates and label:
      // its stored latitude/longitude, or the country centroid when those are
      // missing. Rows whose country cannot be resolved to a known centroid are skipped.
      const pointMap = new Map<string, LocationPoint>();
      for (const row of rows) {
        const code = resolveCountryCode(row.countryCode, row.locationCountry);
        if (!code) continue;
        const centroid = COUNTRY_CENTROIDS[code];
        if (!centroid) continue;

        const city = row.locationCity?.trim() ?? '';
        // `||` rather than `??` so a blank stored country still gets a label.
        const country = row.locationCountry?.trim() || code;
        const key = `${code}:${city.toLowerCase()}`;
        const label = city ? `${city}, ${country}` : country;

        const existing = pointMap.get(key);
        if (existing) {
          existing.count += row.count;
        } else {
          const lat = row.latitude ?? centroid.lat;
          const lng = row.longitude ?? centroid.lng;
          pointMap.set(key, { lat, lng, count: row.count, label });
        }
      }

      const locations = Array.from(pointMap.values()).sort((a, b) => b.count - a.count);

      const response = { locations };
      await writeCache(cacheKey, response);
      return reply.send(response);
    },
  );

  app.get<{ Querystring: { limit?: string; offset?: string } }>(
    '/api/admin/stats/unregistered-collections',
    { preHandler: [requireAuth, requireAdmin] },
    async (request, reply) => {
      const parsed = unregisteredSchema.safeParse(request.query);
      if (!parsed.success) {
        return reply.status(400).send({ error: 'Invalid query', details: parsed.error.format() });
      }
      const { limit, offset } = parsed.data;
      const cacheKey = `admin:stats:unregistered-collections:${limit}:${offset}`;

      const cached = await readCache(cacheKey);
      if (cached !== null) {
        return reply.send(cached);
      }

      // Active app ids that are not in the app registry. Guard the empty-registry
      // case explicitly: some drizzle versions reject notInArray with [].
      const registeredIds = getAppsRegistry().map((e) => e.id);
      const notRegistered =
        registeredIds.length > 0 ? notInArray(userAppStats.appId, registeredIds) : sql`TRUE`;
      const whereClause = sql`${notRegistered} AND ${eq(userAppStats.isActive, true)}`;

      const [countRows, rows] = await Promise.all([
        db
          .select({
            value: sql<number>`COUNT(DISTINCT ${userAppStats.appId})`.as('total'),
          })
          .from(userAppStats)
          .where(whereClause),
        db
          .select({
            collection: userAppStats.appId,
            userCount: sql<number>`COUNT(DISTINCT ${userAppStats.did})`.as('user_count'),
            totalRecords: sql<number>`COALESCE(SUM(${userAppStats.recentCount}), 0)`.as(
              'total_records',
            ),
            latestSeenAt: sql<string | null>`MAX(${userAppStats.latestRecordAt})`.as(
              'latest_seen_at',
            ),
          })
          .from(userAppStats)
          .where(whereClause)
          .groupBy(userAppStats.appId)
          .orderBy(
            desc(sql`COUNT(DISTINCT ${userAppStats.did})`),
            desc(sql`COALESCE(SUM(${userAppStats.recentCount}), 0)`),
          )
          .limit(limit)
          .offset(offset),
      ]);

      const total = Number(countRows[0]?.value ?? 0);

      const collections = rows.map((r) => {
        // Namespace is the first two dot-separated segments (e.g. "app.bsky").
        const parts = r.collection.split('.');
        const namespace = parts.length >= 2 ? `${parts[0]}.${parts[1]}` : r.collection;
        return {
          collection: r.collection,
          namespace,
          userCount: Number(r.userCount),
          totalRecords: Number(r.totalRecords),
          latestSeenAt: r.latestSeenAt ?? null,
        };
      });

      const response = { collections, total };
      await writeCache(cacheKey, response);
      return reply.send(response);
    },
  );
}