[WIP] music platform user data scraper
teal-fm
atproto
1package db
2
3import (
4 "database/sql"
5 "encoding/json"
6 "fmt"
7 "os"
8 "path/filepath"
9 "time"
10
11 _ "github.com/mattn/go-sqlite3"
12 "github.com/teal-fm/piper/models"
13)
14
// DB wraps the application's *sql.DB handle. All *sql.DB methods (Exec,
// Query, QueryRow, Close, ...) are promoted through the embedded field.
type DB struct {
	*sql.DB
}

// New opens the SQLite database at dbPath and verifies the connection with a
// ping.
//
// If dbPath has a parent directory other than "." or "/", it is created first
// with mode 0o755 (subject to the process umask). On any failure the returned
// *DB is nil, and a handle that was opened but failed its ping is closed so
// its connection pool is not leaked.
func New(dbPath string) (*DB, error) {
	dir := filepath.Dir(dbPath)
	if dir != "." && dir != "/" {
		// Must be an octal literal: decimal 755 is 0o1363, whose permission
		// bits (0o363) leave the owner unable to list the directory.
		if err := os.MkdirAll(dir, 0o755); err != nil {
			return nil, fmt.Errorf("creating database directory %q: %w", dir, err)
		}
	}

	db, err := sql.Open("sqlite3", dbPath)
	if err != nil {
		return nil, err
	}

	// sql.Open only validates its arguments; Ping actually connects.
	if err := db.Ping(); err != nil {
		_ = db.Close() // best effort; the ping error is the one worth reporting
		return nil, err
	}

	return &DB{db}, nil
}
37
38func (db *DB) Initialize() error {
39 _, err := db.Exec(`
40 CREATE TABLE IF NOT EXISTS users (
41 id INTEGER PRIMARY KEY AUTOINCREMENT,
42 username TEXT, -- Made nullable, might not have username initially
43 email TEXT UNIQUE, -- Made nullable
44 atproto_did TEXT UNIQUE, -- Atproto DID (identifier)
45 atproto_pds_url TEXT,
46 atproto_authserver_issuer TEXT,
47 atproto_access_token TEXT, -- Atproto access token
48 atproto_refresh_token TEXT, -- Atproto refresh token
49 atproto_token_expiry TIMESTAMP, -- Atproto token expiry
50 atproto_sub TEXT,
51 atproto_scope TEXT, -- Atproto token scope
52 atproto_token_type TEXT, -- Atproto token type
53 atproto_authserver_nonce TEXT,
54 atproto_pds_nonce TEXT,
55 atproto_dpop_private_jwk TEXT,
56 spotify_id TEXT UNIQUE, -- Spotify specific ID
57 access_token TEXT, -- Spotify access token
58 refresh_token TEXT, -- Spotify refresh token
59 token_expiry TIMESTAMP, -- Spotify token expiry
60 lastfm_username TEXT, -- Last.fm username
61 created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, -- Use default
62 updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP -- Use default
63 )`)
64 if err != nil {
65 return err
66 }
67
68 _, err = db.Exec(`
69 CREATE TABLE IF NOT EXISTS tracks (
70 id INTEGER PRIMARY KEY AUTOINCREMENT,
71 user_id INTEGER NOT NULL,
72 name TEXT NOT NULL,
73 recording_mbid TEXT, -- Added
74 artist TEXT NOT NULL, -- should be JSONB in PostgreSQL if we ever switch
75 album TEXT NOT NULL,
76 release_mbid TEXT, -- Added
77 url TEXT NOT NULL,
78 timestamp TIMESTAMP,
79 duration_ms INTEGER,
80 progress_ms INTEGER,
81 service_base_url TEXT,
82 isrc TEXT,
83 has_stamped BOOLEAN,
84 FOREIGN KEY (user_id) REFERENCES users(id)
85 )`)
86 if err != nil {
87 return err
88 }
89
90 _, err = db.Exec(`
91 CREATE TABLE IF NOT EXISTS atproto_auth_data (
92 id INTEGER PRIMARY KEY AUTOINCREMENT,
93 state TEXT NOT NULL,
94 did TEXT,
95 pds_url TEXT NOT NULL,
96 authserver_issuer TEXT NOT NULL,
97 pkce_verifier TEXT NOT NULL,
98 dpop_authserver_nonce TEXT NOT NULL,
99 dpop_private_jwk TEXT NOT NULL,
100 created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
101 )`)
102 if err != nil {
103 return err
104 }
105
106 // Add columns recording_mbid and release_mbid to tracks table if they don't exist
107 _, err = db.Exec(`ALTER TABLE tracks ADD COLUMN recording_mbid TEXT`)
108 if err != nil && err.Error() != "duplicate column name: recording_mbid" {
109 // Handle errors other than 'duplicate column'
110 return err
111 }
112 _, err = db.Exec(`ALTER TABLE tracks ADD COLUMN release_mbid TEXT`)
113 if err != nil && err.Error() != "duplicate column name: release_mbid" {
114 // Handle errors other than 'duplicate column'
115 return err
116 }
117
118 return nil
119}
120
121// create user without spotify id
122func (db *DB) CreateUser(user *models.User) (int64, error) {
123 now := time.Now().UTC()
124
125 result, err := db.Exec(`
126 INSERT INTO users (username, email, created_at, updated_at)
127 VALUES (?, ?, ?, ?)`,
128 user.Username, user.Email, now, now)
129
130 if err != nil {
131 return 0, err
132 }
133
134 return result.LastInsertId()
135}
136
137// add spotify session to user, returning the updated user
138func (db *DB) AddSpotifySession(userID int64, username, email, spotifyId, accessToken, refreshToken string, tokenExpiry time.Time) (*models.User, error) {
139 now := time.Now().UTC()
140
141 _, err := db.Exec(`
142 UPDATE users SET username = ?, email = ?, spotify_id = ?, access_token = ?, refresh_token = ?, token_expiry = ?, created_at = ?, updated_at = ?
143 WHERE id == ?
144 `,
145 username, email, spotifyId, accessToken, refreshToken, tokenExpiry, now, now, userID)
146 if err != nil {
147 return nil, err
148 }
149
150 user, err := db.GetUserByID(userID)
151 if err != nil {
152 return nil, err
153 }
154
155 return user, err
156}
157
158func (db *DB) GetUserByID(ID int64) (*models.User, error) {
159 user := &models.User{}
160
161 err := db.QueryRow(`
162 SELECT id, username, email, atproto_did, spotify_id, access_token, refresh_token, token_expiry, lastfm_username, created_at, updated_at
163 FROM users WHERE id = ?`, ID).Scan(
164 &user.ID, &user.Username, &user.Email, &user.ATProtoDID, &user.SpotifyID,
165 &user.AccessToken, &user.RefreshToken, &user.TokenExpiry,
166 &user.LastFMUsername,
167 &user.CreatedAt, &user.UpdatedAt)
168
169 if err == sql.ErrNoRows {
170 return nil, nil
171 }
172
173 if err != nil {
174 return nil, err
175 }
176
177 return user, nil
178}
179
180func (db *DB) GetUserBySpotifyID(spotifyID string) (*models.User, error) {
181 user := &models.User{}
182
183 err := db.QueryRow(`
184 SELECT id, username, email, spotify_id, access_token, refresh_token, token_expiry, lastfm_username, created_at, updated_at
185 FROM users WHERE spotify_id = ?`, spotifyID).Scan(
186 &user.ID, &user.Username, &user.Email, &user.SpotifyID,
187 &user.AccessToken, &user.RefreshToken, &user.TokenExpiry,
188 &user.LastFMUsername,
189 &user.CreatedAt, &user.UpdatedAt)
190
191 if err == sql.ErrNoRows {
192 return nil, nil
193 }
194
195 if err != nil {
196 return nil, err
197 }
198
199 return user, nil
200}
201
202func (db *DB) UpdateUserToken(userID int64, accessToken, refreshToken string, expiry time.Time) error {
203 now := time.Now().UTC()
204
205 _, err := db.Exec(`
206 UPDATE users
207 SET access_token = ?, refresh_token = ?, token_expiry = ?, updated_at = ?
208 WHERE id = ?`,
209 accessToken, refreshToken, expiry, now, userID)
210
211 return err
212}
213
214func (db *DB) SaveTrack(userID int64, track *models.Track) (int64, error) {
215 // marshal artist json
216 artistString := ""
217 if len(track.Artist) > 0 {
218 bytes, err := json.Marshal(track.Artist)
219 if err != nil {
220 return 0, err
221 } else {
222 artistString = string(bytes)
223 }
224 }
225
226 var trackID int64
227
228 err := db.QueryRow(`
229 INSERT INTO tracks (user_id, name, recording_mbid, artist, album, release_mbid, url, timestamp, duration_ms, progress_ms, service_base_url, isrc, has_stamped)
230 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
231 RETURNING id`,
232 userID, track.Name, track.RecordingMBID, artistString, track.Album, track.ReleaseMBID, track.URL, track.Timestamp,
233 track.DurationMs, track.ProgressMs, track.ServiceBaseUrl, track.ISRC, track.HasStamped).Scan(&trackID)
234
235 return trackID, err
236}
237
238func (db *DB) UpdateTrack(trackID int64, track *models.Track) error {
239 // marshal artist json
240 artistString := ""
241 if len(track.Artist) > 0 {
242 bytes, err := json.Marshal(track.Artist)
243 if err != nil {
244 return err
245 } else {
246 artistString = string(bytes)
247 }
248 }
249
250 _, err := db.Exec(`
251 UPDATE tracks
252 SET name = ?,
253 recording_mbid = ?,
254 artist = ?,
255 album = ?,
256 release_mbid = ?,
257 url = ?,
258 timestamp = ?,
259 duration_ms = ?,
260 progress_ms = ?,
261 service_base_url = ?,
262 isrc = ?,
263 has_stamped = ?
264 WHERE id = ?`,
265 track.Name, track.RecordingMBID, artistString, track.Album, track.ReleaseMBID, track.URL, track.Timestamp,
266 track.DurationMs, track.ProgressMs, track.ServiceBaseUrl, track.ISRC, track.HasStamped,
267 trackID)
268
269 return err
270}
271
272func (db *DB) GetRecentTracks(userID int64, limit int) ([]*models.Track, error) {
273 rows, err := db.Query(`
274 SELECT id, name, recording_mbid, artist, album, release_mbid, url, timestamp, duration_ms, progress_ms, service_base_url, isrc, has_stamped
275 FROM tracks
276 WHERE user_id = ?
277 ORDER BY timestamp DESC
278 LIMIT ?`, userID, limit)
279
280 if err != nil {
281 return nil, err
282 }
283 defer rows.Close()
284
285 var tracks []*models.Track
286
287 for rows.Next() {
288 var artistString string
289 track := &models.Track{}
290 err := rows.Scan(
291 &track.PlayID,
292 &track.Name,
293 &track.RecordingMBID, // Scan new field
294 &artistString, // scan to be unmarshaled later
295 &track.Album,
296 &track.ReleaseMBID, // Scan new field
297 &track.URL,
298 &track.Timestamp,
299 &track.DurationMs,
300 &track.ProgressMs,
301 &track.ServiceBaseUrl,
302 &track.ISRC,
303 &track.HasStamped,
304 )
305
306 if err != nil {
307 return nil, err
308 }
309
310 // unmarshal artist json
311 var artists []models.Artist
312 err = json.Unmarshal([]byte(artistString), &artists)
313 if err != nil {
314 // fallback to previous format
315 artists = []models.Artist{{Name: artistString}}
316 }
317 track.Artist = artists
318 tracks = append(tracks, track)
319 }
320
321 return tracks, nil
322}
323
324// SpotifyQueryMapping maps Spotify sql query results to user structs
325func SpotifyQueryMapping(rows *sql.Rows) ([]*models.User, error) {
326
327 var users []*models.User
328
329 for rows.Next() {
330 user := &models.User{}
331 err := rows.Scan(
332 &user.ID, &user.Username, &user.Email, &user.SpotifyID,
333 &user.AccessToken, &user.RefreshToken, &user.TokenExpiry,
334 &user.CreatedAt, &user.UpdatedAt)
335 if err != nil {
336 return nil, err
337 }
338 users = append(users, user)
339 }
340
341 return users, nil
342}
343
344func (db *DB) GetUsersWithExpiredTokens() ([]*models.User, error) {
345 rows, err := db.Query(`
346 SELECT id, username, email, spotify_id, access_token, refresh_token, token_expiry, created_at, updated_at
347 FROM users
348 WHERE refresh_token IS NOT NULL AND token_expiry < ?
349 ORDER BY id`, time.Now().UTC())
350
351 if err != nil {
352 return nil, err
353 }
354 defer rows.Close()
355
356 return SpotifyQueryMapping(rows)
357
358}
359
360func (db *DB) GetAllActiveUsers() ([]*models.User, error) {
361 rows, err := db.Query(`
362 SELECT id, username, email, spotify_id, access_token, refresh_token, token_expiry, created_at, updated_at
363 FROM users
364 WHERE access_token IS NOT NULL
365 ORDER BY id`)
366
367 if err != nil {
368 return nil, err
369 }
370 defer rows.Close()
371
372 return SpotifyQueryMapping(rows)
373}
374
375func (db *DB) GetAllActiveUsersWithUnExpiredTokens() ([]*models.User, error) {
376 rows, err := db.Query(`
377 SELECT id, username, email, spotify_id, access_token, refresh_token, token_expiry, created_at, updated_at
378 FROM users
379 WHERE access_token IS NOT NULL AND token_expiry > ?
380 ORDER BY id`, time.Now().UTC())
381
382 if err != nil {
383 return nil, err
384 }
385 defer rows.Close()
386
387 return SpotifyQueryMapping(rows)
388}
389
390// debug to view current user's information
391// put everything in an 'any' type
392func (db *DB) DebugViewUserInformation(userID int64) (map[string]any, error) {
393 // Use Query instead of QueryRow to get access to column names and ensure only one row is processed
394 rows, err := db.Query(`
395 SELECT *
396 FROM users
397 WHERE id = ? LIMIT 1`, userID)
398 if err != nil {
399 return nil, fmt.Errorf("query failed: %w", err)
400 }
401 defer rows.Close()
402
403 // Get column names
404 cols, err := rows.Columns()
405 if err != nil {
406 return nil, fmt.Errorf("failed to get columns: %w", err)
407 }
408
409 // Check if there's a row to process
410 if !rows.Next() {
411 if err := rows.Err(); err != nil {
412 // Error during rows.Next() or preparing the result set
413 return nil, fmt.Errorf("error checking for row: %w", err)
414 }
415 // No rows found, which is a valid outcome but might be considered an error in some contexts.
416 // Returning sql.ErrNoRows is conventional.
417 return nil, sql.ErrNoRows
418 }
419
420 // Prepare scan arguments: pointers to interface{} slices
421 values := make([]any, len(cols))
422 scanArgs := make([]any, len(cols))
423 for i := range values {
424 scanArgs[i] = &values[i]
425 }
426
427 // Scan the row values
428 err = rows.Scan(scanArgs...)
429 if err != nil {
430 return nil, fmt.Errorf("failed to scan row: %w", err)
431 }
432
433 // Check for errors that might have occurred during iteration (after Scan)
434 if err := rows.Err(); err != nil {
435 return nil, fmt.Errorf("error after scanning row: %w", err)
436 }
437
438 // Create the result map
439 resultMap := make(map[string]any, len(cols))
440 for i, colName := range cols {
441 val := values[i]
442 // SQLite often returns []byte for TEXT columns, convert to string for usability.
443 // Also handle potential nil values appropriately.
444 if b, ok := val.([]byte); ok {
445 resultMap[colName] = string(b)
446 } else {
447 resultMap[colName] = val // Keep nil as nil, numbers as numbers, etc.
448 }
449 }
450
451 return resultMap, nil
452}
453
454func (db *DB) GetLastKnownTimestamp(userID int64) (*time.Time, error) {
455 var lastTimestamp time.Time
456 err := db.QueryRow(`
457 SELECT timestamp
458 FROM tracks
459 WHERE user_id = ?
460 ORDER BY timestamp DESC
461 LIMIT 1`, userID).Scan(&lastTimestamp)
462
463 if err != nil {
464 if err == sql.ErrNoRows {
465 return nil, nil
466 }
467 return nil, fmt.Errorf("failed to query last scrobble timestamp for user %d: %w", userID, err)
468 }
469
470 return &lastTimestamp, nil
471}