my pkgs monorepo
1import type { AtpAgent } from '@atproto/api';
2import type { PlayRecord, Config } from '../types.js';
3import { fetchRepoViaCAR, getPdsUrlFromAgent, getAgentToken } from '../utils/car-fetch.js';
4import { formatDate, formatDateRange } from '../utils/helpers.js';
5import * as ui from '../utils/ui.js';
6import { log } from '../utils/logger.js';
7import { isCacheValid, loadCache, saveCache, getCacheInfo } from '../utils/teal-cache.js';
8
/**
 * A play record read back from the repo, together with the identifiers
 * needed to address it again.
 */
type ExistingRecord = {
  /** Record URI; its final `/`-separated segment is used as the rkey on delete. */
  uri: string;
  /** CID reported for the record by the CAR export. */
  cid: string;
  /** The decoded play record body. */
  value: PlayRecord;
};
14
/**
 * A set of stored records that all map to the same dedup key.
 */
type DuplicateGroup = {
  /** The shared key produced by `createRecordKey`. */
  key: string;
  /** Every record with that key, in the order they were found. */
  records: ExistingRecord[];
};
19
/**
 * Index records by their `createRecordKey` key.
 * When several records share a key, the one seen last wins.
 */
function indexByRecordKey(records: Iterable<ExistingRecord>): Map<string, ExistingRecord> {
  const index = new Map<string, ExistingRecord>();
  for (const record of records) {
    index.set(createRecordKey(record.value), record);
  }
  return index;
}

/** Suffix for a "Loaded N records" line when some records collapsed onto a shared key. */
function uniqueSuffix(total: number, unique: number): string {
  return unique === total ? '' : ` (${unique.toLocaleString()} unique)`;
}

/**
 * Fetch all existing play records from Teal via a single CAR export.
 * Uses com.atproto.sync.getRepo (sync namespace) — separate, generous
 * rate-limit envelope; burns zero AppView write-quota points.
 *
 * The local per-DID cache is used instead when `isCacheValid` reports it valid
 * and `forceRefresh` is false. Every CAR fetch rewrites that cache.
 *
 * @param agent - Authenticated agent; its session DID identifies the repo.
 * @param config - Provides `RECORD_TYPE`, passed through to the CAR fetch.
 * @param forceRefresh - When true, bypass the cache and re-download the repo.
 * @returns Records keyed by `createRecordKey`. Records that share a key collapse
 *   into one entry, so the map can be smaller than the number of stored records.
 * @throws Error if the agent has no authenticated session.
 */
export async function fetchExistingRecords(
  agent: AtpAgent,
  config: Config,
  forceRefresh: boolean = false
): Promise<Map<string, ExistingRecord>> {
  log.section('Checking Existing Records');
  const { RECORD_TYPE } = config;
  const did = agent.session?.did;

  if (!did) {
    throw new Error('No authenticated session found');
  }

  // Serve from cache when valid and not forcing a refresh.
  if (!forceRefresh && isCacheValid(did)) {
    const cacheInfo = getCacheInfo(did);
    // Print a placeholder rather than crash on a status line if metadata is absent.
    const ageText = cacheInfo.age?.toFixed(1) ?? '?';
    const countText = cacheInfo.records?.toLocaleString() ?? '?';
    log.info(`📂 Loading from cache (${ageText}h old, ${countText} records)...`);
    const cached = loadCache(did);
    if (cached) {
      const cachedRecords = Array.from(cached.values(), (record) => record as ExistingRecord);
      const existingRecords = indexByRecordKey(cachedRecords);
      log.success(
        `✓ Loaded ${cachedRecords.length.toLocaleString()} records from cache${uniqueSuffix(cachedRecords.length, existingRecords.size)}`
      );
      log.blank();
      return existingRecords;
    }
    // The cache was reported valid but could not be loaded: fall through to a fresh fetch.
  }

  if (forceRefresh) {
    log.info('🔄 Force refresh — fetching repo via CAR export...');
  } else {
    log.info('📦 Fetching repo via CAR export (no rate-limit points consumed)...');
  }

  const pdsUrl = getPdsUrlFromAgent(agent);
  const token = await getAgentToken(agent);
  const carStart = Date.now();
  const carRecords = await fetchRepoViaCAR(pdsUrl, did, RECORD_TYPE, undefined, token);
  const carElapsed = ((Date.now() - carStart) / 1000).toFixed(1);

  const fetched: ExistingRecord[] = carRecords.map((rec) => ({
    uri: rec.uri,
    cid: rec.cid,
    value: rec.value as PlayRecord,
  }));
  const existingRecords = indexByRecordKey(fetched);

  // The cache is keyed by URI (not by dedup key), so every fetched record is stored.
  const cacheMap = new Map<string, { uri: string; cid: string; value: any }>();
  for (const entry of fetched) {
    cacheMap.set(entry.uri, entry);
  }

  log.success(
    `✓ Loaded ${fetched.length.toLocaleString()} records via CAR in ${carElapsed}s${uniqueSuffix(fetched.length, existingRecords.size)}`
  );
  saveCache(did, cacheMap);
  log.blank();
  return existingRecords;
}
82
/**
 * Fetch ALL existing play records as an array (including duplicates) via CAR export.
 * Used by the deduplicate flow.
 *
 * A spinner is shown while fetching. If the fetch fails, the spinner is stopped
 * before the error propagates, so it does not keep animating over the error output.
 *
 * @param agent - Authenticated agent; its session DID identifies the repo.
 * @param config - Provides `RECORD_TYPE`, passed through to the CAR fetch.
 * @returns Every record returned by the CAR export, in export order.
 * @throws Error if the agent has no authenticated session; rethrows fetch errors.
 */
export async function fetchAllRecords(
  agent: AtpAgent,
  config: Config
): Promise<ExistingRecord[]> {
  const { RECORD_TYPE } = config;
  const did = agent.session?.did;

  if (!did) {
    throw new Error('No authenticated session found');
  }

  ui.startSpinner('📦 Fetching repo via CAR export...');

  try {
    const pdsUrl = getPdsUrlFromAgent(agent);
    const token = await getAgentToken(agent);
    const carRecords = await fetchRepoViaCAR(pdsUrl, did, RECORD_TYPE, undefined, token);
    const allRecords: ExistingRecord[] = carRecords.map((rec) => ({
      uri: rec.uri,
      cid: rec.cid,
      value: rec.value as PlayRecord,
    }));

    ui.succeedSpinner(`Found ${allRecords.length.toLocaleString()} records via CAR`);
    return allRecords;
  } catch (err: unknown) {
    // Don't leave the spinner running underneath the caller's error report.
    ui.stopSpinner();
    throw err;
  }
}
112
/**
 * Canonicalize one text component of a record key: Unicode NFC, lower-case, trimmed.
 * Missing values become the empty string.
 */
function normalizeKeyPart(text: string | null | undefined): string {
  return (text ?? '').normalize('NFC').toLowerCase().trim();
}

/**
 * Create a unique key for a play record based on its essential properties:
 * the first artist's name, the track name, and `playedTime`.
 *
 * Artist and track are NFC-normalized before case-folding. This makes names
 * that differ only in how accents are encoded (precomposed vs. combining
 * characters) produce the same key. `playedTime` is used verbatim.
 *
 * Records decoded from a repo are not validated, so a missing `artists` array
 * or `trackName` yields an empty component instead of throwing.
 *
 * @param record - The play record to key.
 * @returns A string of the form `artist|||track|||playedTime`.
 */
export function createRecordKey(record: PlayRecord): string {
  const artist = normalizeKeyPart(record.artists?.[0]?.artistName);
  const track = normalizeKeyPart(record.trackName);
  return `${artist}|||${track}|||${record.playedTime}`;
}
121
/**
 * Drop repeated plays from an input batch before submission.
 *
 * Two records are duplicates when `createRecordKey` gives them the same key.
 * The first record seen for a key survives; later ones are dropped and counted.
 * Survivors keep their original relative order.
 *
 * @param records - The candidate records, in submission order.
 * @returns The surviving records and the number that were dropped.
 */
export function deduplicateInputRecords(records: PlayRecord[]): { unique: PlayRecord[]; duplicates: number } {
  const seenKeys = new Set<string>();

  const unique = records.filter((candidate) => {
    const key = createRecordKey(candidate);
    if (seenKeys.has(key)) {
      return false;
    }
    seenKeys.add(key);
    return true;
  });

  return { unique, duplicates: records.length - unique.length };
}
141
/**
 * Split Last.fm records into those not yet in Teal (returned) and those that
 * are already present (skipped), and log a summary of the split.
 *
 * A record counts as present when its `createRecordKey` key is a key of
 * `existingRecords`. When `log.getLevel()` is 0 or lower and something was
 * skipped, up to three skipped records are printed at debug level as examples.
 *
 * @param lastfmRecords - Candidate records, in import order.
 * @param existingRecords - Records already in Teal, keyed by `createRecordKey`.
 * @returns The records whose key is absent from `existingRecords`, in input order.
 */
export function filterNewRecords(
  lastfmRecords: PlayRecord[],
  existingRecords: Map<string, ExistingRecord>
): PlayRecord[] {
  log.section('Identifying New Records');

  const fresh: PlayRecord[] = [];
  const alreadyPresent: PlayRecord[] = [];
  for (const candidate of lastfmRecords) {
    const bucket = existingRecords.has(createRecordKey(candidate)) ? alreadyPresent : fresh;
    bucket.push(candidate);
  }

  log.info(`Total: ${lastfmRecords.length.toLocaleString()} records`);
  log.info(`Existing: ${alreadyPresent.length.toLocaleString()} already in Teal`);
  log.info(`New: ${fresh.length.toLocaleString()} to import`);
  log.blank();

  const debugEnabled = log.getLevel() <= 0;
  if (!debugEnabled || alreadyPresent.length === 0) {
    return fresh;
  }

  const examples = alreadyPresent.slice(0, 3);
  log.debug('Examples of existing records (skipped):');
  for (const [idx, sample] of examples.entries()) {
    log.debug(` ${idx + 1}. ${sample.artists[0]?.artistName} - ${sample.trackName}`);
    log.debug(` ${formatDate(sample.playedTime, true)}`);
  }

  const remaining = alreadyPresent.length - examples.length;
  if (remaining > 0) {
    log.debug(` ... and ${remaining.toLocaleString()} more`);
  }
  log.blank();

  return fresh;
}
182
/**
 * Get the earliest and latest `playedTime` across `records`.
 *
 * Uses a single linear pass instead of `Math.min(...times)`. Spreading an
 * array into call arguments throws `RangeError` once it exceeds the engine's
 * argument-count limit, which large exports can hit. Records whose
 * `playedTime` does not parse to a valid date are ignored rather than turning
 * the whole range into Invalid Date.
 *
 * @param records - Records to scan.
 * @returns The range, or `null` when no record has a valid timestamp.
 */
export function getRecordTimeRange(records: PlayRecord[]): { earliest: Date; latest: Date } | null {
  let earliest = Infinity;
  let latest = -Infinity;

  for (const record of records) {
    const time = new Date(record.playedTime).getTime();
    if (Number.isNaN(time)) {
      continue;
    }
    if (time < earliest) earliest = time;
    if (time > latest) latest = time;
  }

  // Still Infinity means no record had a parseable timestamp (or there were none).
  if (earliest === Infinity) {
    return null;
  }
  return { earliest: new Date(earliest), latest: new Date(latest) };
}
194
/**
 * Display sync statistics comparing the Last.fm export with what is already in Teal.
 *
 * Logs record counts and date ranges for both sides. It then logs how many
 * records are new, how many were skipped as already present, and the match
 * rate (the share of the export already present). For an empty export, the
 * match rate is shown as `n/a` instead of `NaN%`.
 *
 * @param lastfmRecords - All records from the Last.fm export.
 * @param existingRecords - Records already in Teal, keyed by `createRecordKey`.
 * @param newRecords - The subset of `lastfmRecords` that will be imported.
 */
export function displaySyncStats(
  lastfmRecords: PlayRecord[],
  existingRecords: Map<string, ExistingRecord>,
  newRecords: PlayRecord[]
): void {
  const lastfmRange = getRecordTimeRange(lastfmRecords);
  const existingArray = Array.from(existingRecords.values()).map(r => r.value);
  const existingRange = getRecordTimeRange(existingArray);

  log.section('Sync Statistics');
  log.info(`Last.fm export: ${lastfmRecords.length.toLocaleString()} records`);
  if (lastfmRange) {
    log.info(` Range: ${formatDateRange(lastfmRange.earliest, lastfmRange.latest)}`);
  }
  log.blank();

  log.info(`Teal current: ${existingRecords.size.toLocaleString()} records`);
  if (existingRange) {
    log.info(` Range: ${formatDateRange(existingRange.earliest, existingRange.latest)}`);
  }
  log.blank();

  const total = lastfmRecords.length;
  log.info(`New to import: ${newRecords.length.toLocaleString()}`);
  log.info(`Duplicates: ${(total - newRecords.length).toLocaleString()} skipped`);
  // Guard the empty export: 0 / 0 would otherwise print "NaN%".
  const matchRate = total === 0 ? 'n/a' : `${((1 - newRecords.length / total) * 100).toFixed(1)}%`;
  log.info(`Match rate: ${matchRate}`);
  log.blank();
}
225
/**
 * Group stored records by `createRecordKey` and keep only the groups that
 * contain more than one record.
 *
 * Groups appear in the order their key was first encountered. Within each
 * group, records keep the order they had in `allRecords`.
 *
 * @param allRecords - Every stored record, duplicates included.
 * @returns One entry per key that occurs two or more times.
 */
export function findDuplicates(allRecords: ExistingRecord[]): DuplicateGroup[] {
  const buckets = new Map<string, ExistingRecord[]>();

  for (const entry of allRecords) {
    const key = createRecordKey(entry.value);
    const bucket = buckets.get(key);
    if (bucket === undefined) {
      buckets.set(key, [entry]);
    } else {
      bucket.push(entry);
    }
  }

  return Array.from(buckets, ([key, records]): DuplicateGroup => ({ key, records })).filter(
    (group) => group.records.length > 1
  );
}
245
/** Matches `at://<repo>/<collection>/<rkey>`, capturing the collection and rkey. */
const AT_RECORD_URI = /^at:\/\/[^/]+\/([^/]+)\/([^/]+)$/;

/**
 * Split a record URI into the collection and rkey needed by `deleteRecord`.
 * Returns `null` when the URI is not of the form `at://<repo>/<collection>/<rkey>`.
 */
function parseRecordUri(uri: string): { collection: string; rkey: string } | null {
  const [, collection, rkey] = AT_RECORD_URI.exec(uri) ?? [];
  return collection && rkey ? { collection, rkey } : null;
}

/**
 * Remove duplicate records from Teal, keeping only the first occurrence.
 *
 * Fetches every record via CAR export and groups them by `createRecordKey`.
 * In each group, the first record (in export order) is kept and the rest are
 * deleted, with a 100 ms pause after each delete attempt. The collection and
 * rkey for each delete come from the record's own URI. If the URI cannot be
 * parsed, the previous behaviour is used as a fallback: `value.$type` and the
 * last URI segment. A failed delete does not abort the run; failures are
 * logged at debug level, counted, and reported at the end.
 *
 * @param agent - Authenticated agent whose repo is cleaned.
 * @param config - Passed through to `fetchAllRecords`.
 * @param dryRun - When true, only report what would be removed.
 * @returns The number of duplicates found and the number actually deleted.
 * @throws Error if the agent has no authenticated session.
 */
export async function removeDuplicates(
  agent: AtpAgent,
  config: Config,
  dryRun: boolean = false
): Promise<{ totalDuplicates: number; recordsRemoved: number }> {
  ui.header('Checking for Duplicate Records');

  const allRecords = await fetchAllRecords(agent, config);

  ui.startSpinner('Analyzing records for duplicates...');
  const duplicateGroups = findDuplicates(allRecords);

  if (duplicateGroups.length === 0) {
    ui.succeedSpinner('No duplicates found!');
    return { totalDuplicates: 0, recordsRemoved: 0 };
  }

  ui.stopSpinner();

  const totalDuplicates = duplicateGroups.reduce((sum, group) => sum + (group.records.length - 1), 0);

  ui.warning(`Found ${duplicateGroups.length.toLocaleString()} duplicate groups (${totalDuplicates.toLocaleString()} records to remove)`);
  console.log('');

  const exampleCount = Math.min(5, duplicateGroups.length);
  ui.subheader('Examples of Duplicates:');
  for (let i = 0; i < exampleCount; i++) {
    const group = duplicateGroups[i];
    const firstRecord = group.records[0].value;
    console.log(` ${i + 1}. ${firstRecord.artists[0]?.artistName} - ${firstRecord.trackName}`);
    console.log(` ${formatDate(firstRecord.playedTime, true)} · ${group.records.length - 1} duplicate(s)`);
  }
  if (duplicateGroups.length > exampleCount) {
    console.log(` ... and ${duplicateGroups.length - exampleCount} more groups`);
  }
  console.log('');

  if (dryRun) {
    ui.info('DRY RUN: No records were removed.');
    return { totalDuplicates, recordsRemoved: 0 };
  }

  // fetchAllRecords already rejected a missing session. Re-check so deletes are
  // never sent with an empty repo identifier.
  const did = agent.session?.did;
  if (!did) {
    throw new Error('No authenticated session found');
  }

  console.log('');
  const progressBar = ui.createProgressBar(totalDuplicates, 'Removing duplicates');
  let recordsRemoved = 0;
  let failures = 0;
  let processed = 0;
  const startTime = Date.now();

  for (const group of duplicateGroups) {
    for (const record of group.records.slice(1)) {
      const target = parseRecordUri(record.uri);
      try {
        await agent.com.atproto.repo.deleteRecord({
          repo: did,
          collection: target?.collection ?? record.value.$type,
          rkey: target?.rkey ?? record.uri.split('/').pop()!,
        });
        recordsRemoved++;
      } catch (err: unknown) {
        // Best-effort: keep going, but record the failure instead of hiding it.
        failures++;
        log.debug(`Failed to delete ${record.uri}: ${err instanceof Error ? err.message : String(err)}`);
      }
      // Advance on every attempt so the bar reaches its total even when some deletes fail.
      processed++;
      const elapsed = (Date.now() - startTime) / 1000;
      progressBar.update(processed, { speed: processed / Math.max(elapsed, 0.1) });
      await new Promise(resolve => setTimeout(resolve, 100));
    }
  }

  progressBar.stop();
  console.log('');
  ui.success(`Removed ${recordsRemoved.toLocaleString()} duplicate records`);
  if (failures > 0) {
    ui.warning(`Failed to remove ${failures.toLocaleString()} duplicate records (run again to retry)`);
  }
  ui.info(`Kept ${duplicateGroups.length.toLocaleString()} unique records`);

  return { totalDuplicates, recordsRemoved };
}