// Source: "my pkgs" monorepo, branch `main` (320 lines, 11 kB).
import type { AtpAgent } from '@atproto/api';
import type { PlayRecord, Config } from '../types.js';
import { fetchRepoViaCAR, getPdsUrlFromAgent, getAgentToken } from '../utils/car-fetch.js';
import { formatDate, formatDateRange } from '../utils/helpers.js';
import * as ui from '../utils/ui.js';
import { log } from '../utils/logger.js';
import { isCacheValid, loadCache, saveCache, getCacheInfo } from '../utils/teal-cache.js';

/** A play record as read back from the repo: its AT-URI, content CID, and decoded value. */
interface ExistingRecord {
  uri: string;
  cid: string;
  value: PlayRecord;
}

/** Repo records that share one `createRecordKey` key (`key` is that shared key). */
interface DuplicateGroup {
  key: string;
  records: ExistingRecord[];
}

/**
 * Return the DID of the agent's authenticated session.
 *
 * @throws Error when the agent has no authenticated session.
 */
function requireSessionDid(agent: AtpAgent): string {
  const did = agent.session?.did;
  if (!did) {
    throw new Error('No authenticated session found');
  }
  return did;
}

/**
 * Download every record of `recordType` from the user's repo in one CAR export
 * and normalise each into an `ExistingRecord`.
 */
async function fetchPlayRecordsViaCAR(
  agent: AtpAgent,
  did: string,
  recordType: Config['RECORD_TYPE']
): Promise<ExistingRecord[]> {
  const pdsUrl = getPdsUrlFromAgent(agent);
  const token = await getAgentToken(agent);
  const carRecords = await fetchRepoViaCAR(pdsUrl, did, recordType, undefined, token);
  return carRecords.map((rec) => ({
    uri: rec.uri,
    cid: rec.cid,
    value: rec.value as PlayRecord,
  }));
}

/**
 * Fetch all existing play records from Teal via a single CAR export.
 * Uses com.atproto.sync.getRepo (sync namespace) — separate, generous
 * rate-limit envelope; burns zero AppView write-quota points.
 *
 * When the local cache for this DID is valid and `forceRefresh` is false, the
 * records are served from the cache instead. A network fetch rewrites the cache.
 *
 * @param agent - Authenticated agent; its session DID selects the repo.
 * @param config - Supplies `RECORD_TYPE`, the collection to read.
 * @param forceRefresh - Ignore the cache and always fetch.
 * @returns Map from `createRecordKey` key to record. Records sharing a key
 *   collapse to a single entry (the last one seen wins).
 * @throws Error when the agent has no authenticated session.
 */
export async function fetchExistingRecords(
  agent: AtpAgent,
  config: Config,
  forceRefresh: boolean = false
): Promise<Map<string, ExistingRecord>> {
  log.section('Checking Existing Records');
  const { RECORD_TYPE } = config;
  const did = requireSessionDid(agent);

  // Serve from cache when valid and not forcing a refresh
  if (!forceRefresh && isCacheValid(did)) {
    const cacheInfo = getCacheInfo(did);
    // Avoid crashing a log line if the cache metadata is incomplete.
    const ageText = cacheInfo.age?.toFixed(1) ?? '?';
    const countText = cacheInfo.records?.toLocaleString() ?? '?';
    log.info(`📂 Loading from cache (${ageText}h old, ${countText} records)...`);

    const cached = loadCache(did);
    if (cached) {
      const existingRecords = new Map<string, ExistingRecord>();
      for (const record of cached.values()) {
        const playRecord = record.value as PlayRecord;
        existingRecords.set(createRecordKey(playRecord), record as ExistingRecord);
      }
      log.success(`✓ Loaded ${existingRecords.size.toLocaleString()} records from cache`);
      log.blank();
      return existingRecords;
    }
    log.info('⚠️ Cache could not be loaded — falling back to CAR export...');
  }

  log.info(
    forceRefresh
      ? '🔄 Force refresh — fetching repo via CAR export...'
      : '📦 Fetching repo via CAR export (no rate-limit points consumed)...'
  );

  const carStart = Date.now();
  const records = await fetchPlayRecordsViaCAR(agent, did, RECORD_TYPE);
  const carElapsed = ((Date.now() - carStart) / 1000).toFixed(1);

  const existingRecords = new Map<string, ExistingRecord>();
  // The cache is keyed by AT-URI so every stored record is kept, including
  // duplicates that collapse onto one dedupe key in `existingRecords`.
  const cacheMap = new Map<string, ExistingRecord>();
  for (const entry of records) {
    existingRecords.set(createRecordKey(entry.value), entry);
    cacheMap.set(entry.uri, entry);
  }

  log.success(`✓ Loaded ${existingRecords.size.toLocaleString()} records via CAR in ${carElapsed}s`);
  saveCache(did, cacheMap);
  log.blank();
  return existingRecords;
}

/**
 * Fetch ALL existing play records as an array (including duplicates) via CAR export.
 * Used by the deduplicate flow. Always hits the network; the cache is not consulted.
 *
 * @param agent - Authenticated agent; its session DID selects the repo.
 * @param config - Supplies `RECORD_TYPE`, the collection to read.
 * @returns Every record in the collection, in the order the CAR fetch yields them.
 * @throws Error when the agent has no authenticated session, or whatever the
 *   CAR fetch throws (the spinner is stopped first).
 */
export async function fetchAllRecords(
  agent: AtpAgent,
  config: Config
): Promise<ExistingRecord[]> {
  const did = requireSessionDid(agent);

  ui.startSpinner('📦 Fetching repo via CAR export...');
  let allRecords: ExistingRecord[];
  try {
    allRecords = await fetchPlayRecordsViaCAR(agent, did, config.RECORD_TYPE);
  } catch (err: unknown) {
    // Don't leave the spinner animating over the error output.
    ui.stopSpinner();
    throw err;
  }

  ui.succeedSpinner(`Found ${allRecords.length.toLocaleString()} records via CAR`);
  return allRecords;
}
/**
 * Create a unique key for a play record based on its essential properties.
 *
 * The key joins the first artist's name and the track name (each lower-cased
 * and trimmed) with the raw `playedTime` string, separated by `|||`. A record
 * with no artists contributes an empty artist component.
 */
export function createRecordKey(record: PlayRecord): string {
  const artist = (record.artists[0]?.artistName ?? '').toLowerCase().trim();
  const track = record.trackName.toLowerCase().trim();
  return `${artist}|||${track}|||${record.playedTime}`;
}

/**
 * Deduplicate input records before submission.
 * Keeps the first occurrence of each duplicate.
 *
 * @returns The unique records in original order, and how many were dropped.
 */
export function deduplicateInputRecords(records: PlayRecord[]): { unique: PlayRecord[]; duplicates: number } {
  const seen = new Map<string, PlayRecord>();
  let duplicates = 0;

  for (const record of records) {
    const key = createRecordKey(record);
    if (seen.has(key)) {
      duplicates++;
    } else {
      seen.set(key, record);
    }
  }

  return { unique: Array.from(seen.values()), duplicates };
}

/**
 * Filter out records that already exist in Teal.
 *
 * Logs a total / existing / new summary, plus a few skipped examples when the
 * logger level is at or below 0.
 *
 * @param lastfmRecords - Candidate records to import.
 * @param existingRecords - Map keyed by `createRecordKey` of records already in Teal.
 * @returns The candidates whose key is not in `existingRecords`, in input order.
 */
export function filterNewRecords(
  lastfmRecords: PlayRecord[],
  existingRecords: Map<string, ExistingRecord>
): PlayRecord[] {
  log.section('Identifying New Records');

  const newRecords: PlayRecord[] = [];
  const duplicates: PlayRecord[] = [];

  for (const record of lastfmRecords) {
    if (existingRecords.has(createRecordKey(record))) {
      duplicates.push(record);
    } else {
      newRecords.push(record);
    }
  }

  log.info(`Total: ${lastfmRecords.length.toLocaleString()} records`);
  log.info(`Existing: ${duplicates.length.toLocaleString()} already in Teal`);
  log.info(`New: ${newRecords.length.toLocaleString()} to import`);
  log.blank();

  // NOTE(review): level 0 presumably means "debug" in ../utils/logger — confirm.
  if (log.getLevel() <= 0 && duplicates.length > 0) {
    const exampleCount = Math.min(3, duplicates.length);
    log.debug('Examples of existing records (skipped):');
    duplicates.slice(0, exampleCount).forEach((record, i) => {
      log.debug(`  ${i + 1}. ${record.artists[0]?.artistName} - ${record.trackName}`);
      log.debug(`     ${formatDate(record.playedTime, true)}`);
    });
    if (duplicates.length > exampleCount) {
      log.debug(`  ... and ${(duplicates.length - exampleCount).toLocaleString()} more`);
    }
    log.blank();
  }

  return newRecords;
}

/**
 * Get the time range covered by `records`.
 *
 * Uses a single pass instead of spreading every timestamp into Math.min/Math.max,
 * which throws a RangeError (call-stack overflow) for very large arrays.
 * Records whose `playedTime` does not parse as a date are ignored.
 *
 * @returns The earliest and latest play time, or null when no record has a valid date.
 */
export function getRecordTimeRange(records: PlayRecord[]): { earliest: Date; latest: Date } | null {
  let minTime = Infinity;
  let maxTime = -Infinity;

  for (const record of records) {
    const time = new Date(record.playedTime).getTime();
    if (Number.isNaN(time)) continue;
    if (time < minTime) minTime = time;
    if (time > maxTime) maxTime = time;
  }

  if (minTime === Infinity) return null;
  return { earliest: new Date(minTime), latest: new Date(maxTime) };
}

/**
 * Display sync statistics: record counts and date ranges for the Last.fm export
 * and for Teal, then how many records are new versus already present.
 */
export function displaySyncStats(
  lastfmRecords: PlayRecord[],
  existingRecords: Map<string, ExistingRecord>,
  newRecords: PlayRecord[]
): void {
  const lastfmRange = getRecordTimeRange(lastfmRecords);
  const existingRange = getRecordTimeRange(Array.from(existingRecords.values(), (r) => r.value));
  const skipped = lastfmRecords.length - newRecords.length;
  // An empty export would otherwise divide by zero and print "NaN%".
  const matchRate =
    lastfmRecords.length > 0
      ? `${((1 - newRecords.length / lastfmRecords.length) * 100).toFixed(1)}%`
      : 'n/a';

  log.section('Sync Statistics');
  log.info(`Last.fm export: ${lastfmRecords.length.toLocaleString()} records`);
  if (lastfmRange) {
    log.info(`  Range: ${formatDateRange(lastfmRange.earliest, lastfmRange.latest)}`);
  }
  log.blank();

  log.info(`Teal current: ${existingRecords.size.toLocaleString()} records`);
  if (existingRange) {
    log.info(`  Range: ${formatDateRange(existingRange.earliest, existingRange.latest)}`);
  }
  log.blank();

  log.info(`New to import: ${newRecords.length.toLocaleString()}`);
  log.info(`Duplicates: ${skipped.toLocaleString()} skipped`);
  log.info(`Match rate: ${matchRate}`);
  log.blank();
}
/**
 * Find duplicate records in the existing records.
 * Returns groups of duplicates (each group has 2+ records with the same key).
 * Groups and the records inside them keep the order of `allRecords`.
 */
export function findDuplicates(allRecords: ExistingRecord[]): DuplicateGroup[] {
  const keyGroups = new Map<string, ExistingRecord[]>();

  for (const record of allRecords) {
    const key = createRecordKey(record.value);
    const group = keyGroups.get(key);
    if (group) {
      group.push(record);
    } else {
      keyGroups.set(key, [record]);
    }
  }

  const duplicates: DuplicateGroup[] = [];
  for (const [key, records] of keyGroups) {
    if (records.length > 1) duplicates.push({ key, records });
  }
  return duplicates;
}

/** Pause between deletes (presumably to stay clear of PDS write rate limits). */
const DELETE_DELAY_MS = 100;

/** Matches a record AT-URI: `at://<repo>/<collection>/<rkey>`. */
const AT_URI_PATTERN = /^at:\/\/([^/]+)\/([^/]+)\/([^/]+)$/;

/**
 * Split a record AT-URI into the collection NSID and record key that
 * `com.atproto.repo.deleteRecord` needs. Returns null for any other shape.
 */
function parseRecordUri(uri: string): { collection: string; rkey: string } | null {
  const match = AT_URI_PATTERN.exec(uri);
  if (!match) return null;
  const [, , collection, rkey] = match;
  if (!collection || !rkey) return null;
  return { collection, rkey };
}

/**
 * Remove duplicate records from Teal, keeping only the first occurrence.
 *
 * Deletes run one at a time with a short pause between them. A failed delete
 * does not abort the run; failures are counted and reported at the end.
 *
 * @param agent - Authenticated agent; its session DID is the repo to clean.
 * @param config - Supplies `RECORD_TYPE` for the fetch.
 * @param dryRun - Report the duplicates without deleting anything.
 * @returns How many extra copies were found and how many were actually deleted.
 * @throws Error when the agent has no authenticated session.
 */
export async function removeDuplicates(
  agent: AtpAgent,
  config: Config,
  dryRun: boolean = false
): Promise<{ totalDuplicates: number; recordsRemoved: number }> {
  ui.header('Checking for Duplicate Records');

  const allRecords = await fetchAllRecords(agent, config);
  // Never fall back to an empty repo identifier for the deletes below.
  const did = agent.session?.did;
  if (!did) {
    throw new Error('No authenticated session found');
  }

  ui.startSpinner('Analyzing records for duplicates...');
  const duplicateGroups = findDuplicates(allRecords);

  if (duplicateGroups.length === 0) {
    ui.succeedSpinner('No duplicates found!');
    return { totalDuplicates: 0, recordsRemoved: 0 };
  }

  ui.stopSpinner();

  // Every record after the first in each group is an extra copy.
  const totalDuplicates = duplicateGroups.reduce((sum, group) => sum + (group.records.length - 1), 0);

  ui.warning(`Found ${duplicateGroups.length.toLocaleString()} duplicate groups (${totalDuplicates.toLocaleString()} records to remove)`);
  console.log('');

  const exampleCount = Math.min(5, duplicateGroups.length);
  ui.subheader('Examples of Duplicates:');
  for (let i = 0; i < exampleCount; i++) {
    const group = duplicateGroups[i]!;
    const firstRecord = group.records[0]!.value;
    console.log(`  ${i + 1}. ${firstRecord.artists[0]?.artistName} - ${firstRecord.trackName}`);
    console.log(`     ${formatDate(firstRecord.playedTime, true)} · ${group.records.length - 1} duplicate(s)`);
  }
  if (duplicateGroups.length > exampleCount) {
    console.log(`  ... and ${duplicateGroups.length - exampleCount} more groups`);
  }
  console.log('');

  if (dryRun) {
    ui.info('DRY RUN: No records were removed.');
    return { totalDuplicates, recordsRemoved: 0 };
  }

  console.log('');
  const progressBar = ui.createProgressBar(totalDuplicates, 'Removing duplicates');
  let recordsRemoved = 0;
  let processed = 0;
  let failed = 0;
  let firstError: string | undefined;
  const startTime = Date.now();

  for (const group of duplicateGroups) {
    // Keep the first record of each group; delete the rest.
    for (const record of group.records.slice(1)) {
      try {
        const target = parseRecordUri(record.uri);
        if (!target) {
          throw new Error(`Unrecognised record URI: ${record.uri}`);
        }
        await agent.com.atproto.repo.deleteRecord({
          repo: did,
          collection: target.collection,
          rkey: target.rkey,
        });
        recordsRemoved++;
      } catch (err: unknown) {
        // Best-effort: keep going, but remember the failure so it is reported.
        failed++;
        if (firstError === undefined) {
          firstError = err instanceof Error ? err.message : String(err);
        }
      }
      processed++;
      const elapsed = (Date.now() - startTime) / 1000;
      progressBar.update(processed, { speed: processed / Math.max(elapsed, 0.1) });
      await new Promise((resolve) => setTimeout(resolve, DELETE_DELAY_MS));
    }
  }

  progressBar.stop();
  console.log('');
  ui.success(`Removed ${recordsRemoved.toLocaleString()} duplicate records`);
  if (failed > 0) {
    ui.warning(`Failed to remove ${failed.toLocaleString()} record(s) (first error: ${firstError})`);
  }
  ui.info(`Kept ${duplicateGroups.length.toLocaleString()} unique records`);

  return { totalDuplicates, recordsRemoved };
}