[WIP] music platform user data scraper
teal-fm atproto
32
fork

Configure Feed

Select the types of activity you want to include in your feed.

at bb519076bd1c6e39165c45ace1f16abe50aa47d2 471 lines 12 kB view raw
1package db 2 3import ( 4 "database/sql" 5 "encoding/json" 6 "fmt" 7 "os" 8 "path/filepath" 9 "time" 10 11 _ "github.com/mattn/go-sqlite3" 12 "github.com/teal-fm/piper/models" 13) 14 15type DB struct { 16 *sql.DB 17} 18 19func New(dbPath string) (*DB, error) { 20 dir := filepath.Dir(dbPath) 21 if dir != "." && dir != "/" { 22 os.MkdirAll(dir, 755) 23 } 24 25 db, err := sql.Open("sqlite3", dbPath) 26 if err != nil { 27 return nil, err 28 } 29 30 // Test the connection 31 if err = db.Ping(); err != nil { 32 return nil, err 33 } 34 35 return &DB{db}, nil 36} 37 38func (db *DB) Initialize() error { 39 _, err := db.Exec(` 40 CREATE TABLE IF NOT EXISTS users ( 41 id INTEGER PRIMARY KEY AUTOINCREMENT, 42 username TEXT, -- Made nullable, might not have username initially 43 email TEXT UNIQUE, -- Made nullable 44 atproto_did TEXT UNIQUE, -- Atproto DID (identifier) 45 atproto_pds_url TEXT, 46 atproto_authserver_issuer TEXT, 47 atproto_access_token TEXT, -- Atproto access token 48 atproto_refresh_token TEXT, -- Atproto refresh token 49 atproto_token_expiry TIMESTAMP, -- Atproto token expiry 50 atproto_sub TEXT, 51 atproto_scope TEXT, -- Atproto token scope 52 atproto_token_type TEXT, -- Atproto token type 53 atproto_authserver_nonce TEXT, 54 atproto_pds_nonce TEXT, 55 atproto_dpop_private_jwk TEXT, 56 spotify_id TEXT UNIQUE, -- Spotify specific ID 57 access_token TEXT, -- Spotify access token 58 refresh_token TEXT, -- Spotify refresh token 59 token_expiry TIMESTAMP, -- Spotify token expiry 60 lastfm_username TEXT, -- Last.fm username 61 created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, -- Use default 62 updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP -- Use default 63 )`) 64 if err != nil { 65 return err 66 } 67 68 _, err = db.Exec(` 69 CREATE TABLE IF NOT EXISTS tracks ( 70 id INTEGER PRIMARY KEY AUTOINCREMENT, 71 user_id INTEGER NOT NULL, 72 name TEXT NOT NULL, 73 recording_mbid TEXT, -- Added 74 artist TEXT NOT NULL, -- should be JSONB in PostgreSQL if we ever switch 75 album TEXT NOT NULL, 76 release_mbid TEXT, -- Added 77 url TEXT NOT NULL, 78 timestamp TIMESTAMP, 79 duration_ms INTEGER, 80 progress_ms INTEGER, 81 service_base_url TEXT, 82 isrc TEXT, 83 has_stamped BOOLEAN, 84 FOREIGN KEY (user_id) REFERENCES users(id) 85 )`) 86 if err != nil { 87 return err 88 } 89 90 _, err = db.Exec(` 91 CREATE TABLE IF NOT EXISTS atproto_auth_data ( 92 id INTEGER PRIMARY KEY AUTOINCREMENT, 93 state TEXT NOT NULL, 94 did TEXT, 95 pds_url TEXT NOT NULL, 96 authserver_issuer TEXT NOT NULL, 97 pkce_verifier TEXT NOT NULL, 98 dpop_authserver_nonce TEXT NOT NULL, 99 dpop_private_jwk TEXT NOT NULL, 100 created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP 101 )`) 102 if err != nil { 103 return err 104 } 105 106 // Add columns recording_mbid and release_mbid to tracks table if they don't exist 107 _, err = db.Exec(`ALTER TABLE tracks ADD COLUMN recording_mbid TEXT`) 108 if err != nil && err.Error() != "duplicate column name: recording_mbid" { 109 // Handle errors other than 'duplicate column' 110 return err 111 } 112 _, err = db.Exec(`ALTER TABLE tracks ADD COLUMN release_mbid TEXT`) 113 if err != nil && err.Error() != "duplicate column name: release_mbid" { 114 // Handle errors other than 'duplicate column' 115 return err 116 } 117 118 return nil 119} 120 121// create user without spotify id 122func (db *DB) CreateUser(user *models.User) (int64, error) { 123 now := time.Now().UTC() 124 125 result, err := db.Exec(` 126 INSERT INTO users (username, email, created_at, updated_at) 127 VALUES (?, ?, ?, ?)`, 128 user.Username, user.Email, now, now) 129 130 if err != nil { 131 return 0, err 132 } 133 134 return result.LastInsertId() 135} 136 137// add spotify session to user, returning the updated user 138func (db *DB) AddSpotifySession(userID int64, username, email, spotifyId, accessToken, refreshToken string, tokenExpiry time.Time) (*models.User, error) { 139 now := time.Now().UTC() 140 141 _, err := db.Exec(` 142 UPDATE users SET username = ?, email = ?, spotify_id = ?, access_token = ?, refresh_token = ?, token_expiry = ?, created_at = ?, updated_at = ? 143 WHERE id == ? 144 `, 145 username, email, spotifyId, accessToken, refreshToken, tokenExpiry, now, now, userID) 146 if err != nil { 147 return nil, err 148 } 149 150 user, err := db.GetUserByID(userID) 151 if err != nil { 152 return nil, err 153 } 154 155 return user, err 156} 157 158func (db *DB) GetUserByID(ID int64) (*models.User, error) { 159 user := &models.User{} 160 161 err := db.QueryRow(` 162 SELECT id, username, email, atproto_did, spotify_id, access_token, refresh_token, token_expiry, lastfm_username, created_at, updated_at 163 FROM users WHERE id = ?`, ID).Scan( 164 &user.ID, &user.Username, &user.Email, &user.ATProtoDID, &user.SpotifyID, 165 &user.AccessToken, &user.RefreshToken, &user.TokenExpiry, 166 &user.LastFMUsername, 167 &user.CreatedAt, &user.UpdatedAt) 168 169 if err == sql.ErrNoRows { 170 return nil, nil 171 } 172 173 if err != nil { 174 return nil, err 175 } 176 177 return user, nil 178} 179 180func (db *DB) GetUserBySpotifyID(spotifyID string) (*models.User, error) { 181 user := &models.User{} 182 183 err := db.QueryRow(` 184 SELECT id, username, email, spotify_id, access_token, refresh_token, token_expiry, lastfm_username, created_at, updated_at 185 FROM users WHERE spotify_id = ?`, spotifyID).Scan( 186 &user.ID, &user.Username, &user.Email, &user.SpotifyID, 187 &user.AccessToken, &user.RefreshToken, &user.TokenExpiry, 188 &user.LastFMUsername, 189 &user.CreatedAt, &user.UpdatedAt) 190 191 if err == sql.ErrNoRows { 192 return nil, nil 193 } 194 195 if err != nil { 196 return nil, err 197 } 198 199 return user, nil 200} 201 202func (db *DB) UpdateUserToken(userID int64, accessToken, refreshToken string, expiry time.Time) error { 203 now := time.Now().UTC() 204 205 _, err := db.Exec(` 206 UPDATE users 207 SET access_token = ?, refresh_token = ?, token_expiry = ?, updated_at = ? 208 WHERE id = ?`, 209 accessToken, refreshToken, expiry, now, userID) 210 211 return err 212} 213 214func (db *DB) SaveTrack(userID int64, track *models.Track) (int64, error) { 215 // marshal artist json 216 artistString := "" 217 if len(track.Artist) > 0 { 218 bytes, err := json.Marshal(track.Artist) 219 if err != nil { 220 return 0, err 221 } else { 222 artistString = string(bytes) 223 } 224 } 225 226 var trackID int64 227 228 err := db.QueryRow(` 229 INSERT INTO tracks (user_id, name, recording_mbid, artist, album, release_mbid, url, timestamp, duration_ms, progress_ms, service_base_url, isrc, has_stamped) 230 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 231 RETURNING id`, 232 userID, track.Name, track.RecordingMBID, artistString, track.Album, track.ReleaseMBID, track.URL, track.Timestamp, 233 track.DurationMs, track.ProgressMs, track.ServiceBaseUrl, track.ISRC, track.HasStamped).Scan(&trackID) 234 235 return trackID, err 236} 237 238func (db *DB) UpdateTrack(trackID int64, track *models.Track) error { 239 // marshal artist json 240 artistString := "" 241 if len(track.Artist) > 0 { 242 bytes, err := json.Marshal(track.Artist) 243 if err != nil { 244 return err 245 } else { 246 artistString = string(bytes) 247 } 248 } 249 250 _, err := db.Exec(` 251 UPDATE tracks 252 SET name = ?, 253 recording_mbid = ?, 254 artist = ?, 255 album = ?, 256 release_mbid = ?, 257 url = ?, 258 timestamp = ?, 259 duration_ms = ?, 260 progress_ms = ?, 261 service_base_url = ?, 262 isrc = ?, 263 has_stamped = ? 264 WHERE id = ?`, 265 track.Name, track.RecordingMBID, artistString, track.Album, track.ReleaseMBID, track.URL, track.Timestamp, 266 track.DurationMs, track.ProgressMs, track.ServiceBaseUrl, track.ISRC, track.HasStamped, 267 trackID) 268 269 return err 270} 271 272func (db *DB) GetRecentTracks(userID int64, limit int) ([]*models.Track, error) { 273 rows, err := db.Query(` 274 SELECT id, name, recording_mbid, artist, album, release_mbid, url, timestamp, duration_ms, progress_ms, service_base_url, isrc, has_stamped 275 FROM tracks 276 WHERE user_id = ? 277 ORDER BY timestamp DESC 278 LIMIT ?`, userID, limit) 279 280 if err != nil { 281 return nil, err 282 } 283 defer rows.Close() 284 285 var tracks []*models.Track 286 287 for rows.Next() { 288 var artistString string 289 track := &models.Track{} 290 err := rows.Scan( 291 &track.PlayID, 292 &track.Name, 293 &track.RecordingMBID, // Scan new field 294 &artistString, // scan to be unmarshaled later 295 &track.Album, 296 &track.ReleaseMBID, // Scan new field 297 &track.URL, 298 &track.Timestamp, 299 &track.DurationMs, 300 &track.ProgressMs, 301 &track.ServiceBaseUrl, 302 &track.ISRC, 303 &track.HasStamped, 304 ) 305 306 if err != nil { 307 return nil, err 308 } 309 310 // unmarshal artist json 311 var artists []models.Artist 312 err = json.Unmarshal([]byte(artistString), &artists) 313 if err != nil { 314 // fallback to previous format 315 artists = []models.Artist{{Name: artistString}} 316 } 317 track.Artist = artists 318 tracks = append(tracks, track) 319 } 320 321 return tracks, nil 322} 323 324// SpotifyQueryMapping maps Spotify sql query results to user structs 325func SpotifyQueryMapping(rows *sql.Rows) ([]*models.User, error) { 326 327 var users []*models.User 328 329 for rows.Next() { 330 user := &models.User{} 331 err := rows.Scan( 332 &user.ID, &user.Username, &user.Email, &user.SpotifyID, 333 &user.AccessToken, &user.RefreshToken, &user.TokenExpiry, 334 &user.CreatedAt, &user.UpdatedAt) 335 if err != nil { 336 return nil, err 337 } 338 users = append(users, user) 339 } 340 341 return users, nil 342} 343 344func (db *DB) GetUsersWithExpiredTokens() ([]*models.User, error) { 345 rows, err := db.Query(` 346 SELECT id, username, email, spotify_id, access_token, refresh_token, token_expiry, created_at, updated_at 347 FROM users 348 WHERE refresh_token IS NOT NULL AND token_expiry < ? 349 ORDER BY id`, time.Now().UTC()) 350 351 if err != nil { 352 return nil, err 353 } 354 defer rows.Close() 355 356 return SpotifyQueryMapping(rows) 357 358} 359 360func (db *DB) GetAllActiveUsers() ([]*models.User, error) { 361 rows, err := db.Query(` 362 SELECT id, username, email, spotify_id, access_token, refresh_token, token_expiry, created_at, updated_at 363 FROM users 364 WHERE access_token IS NOT NULL 365 ORDER BY id`) 366 367 if err != nil { 368 return nil, err 369 } 370 defer rows.Close() 371 372 return SpotifyQueryMapping(rows) 373} 374 375func (db *DB) GetAllActiveUsersWithUnExpiredTokens() ([]*models.User, error) { 376 rows, err := db.Query(` 377 SELECT id, username, email, spotify_id, access_token, refresh_token, token_expiry, created_at, updated_at 378 FROM users 379 WHERE access_token IS NOT NULL AND token_expiry > ? 380 ORDER BY id`, time.Now().UTC()) 381 382 if err != nil { 383 return nil, err 384 } 385 defer rows.Close() 386 387 return SpotifyQueryMapping(rows) 388} 389 390// debug to view current user's information 391// put everything in an 'any' type 392func (db *DB) DebugViewUserInformation(userID int64) (map[string]any, error) { 393 // Use Query instead of QueryRow to get access to column names and ensure only one row is processed 394 rows, err := db.Query(` 395 SELECT * 396 FROM users 397 WHERE id = ? LIMIT 1`, userID) 398 if err != nil { 399 return nil, fmt.Errorf("query failed: %w", err) 400 } 401 defer rows.Close() 402 403 // Get column names 404 cols, err := rows.Columns() 405 if err != nil { 406 return nil, fmt.Errorf("failed to get columns: %w", err) 407 } 408 409 // Check if there's a row to process 410 if !rows.Next() { 411 if err := rows.Err(); err != nil { 412 // Error during rows.Next() or preparing the result set 413 return nil, fmt.Errorf("error checking for row: %w", err) 414 } 415 // No rows found, which is a valid outcome but might be considered an error in some contexts. 416 // Returning sql.ErrNoRows is conventional. 417 return nil, sql.ErrNoRows 418 } 419 420 // Prepare scan arguments: pointers to interface{} slices 421 values := make([]any, len(cols)) 422 scanArgs := make([]any, len(cols)) 423 for i := range values { 424 scanArgs[i] = &values[i] 425 } 426 427 // Scan the row values 428 err = rows.Scan(scanArgs...) 429 if err != nil { 430 return nil, fmt.Errorf("failed to scan row: %w", err) 431 } 432 433 // Check for errors that might have occurred during iteration (after Scan) 434 if err := rows.Err(); err != nil { 435 return nil, fmt.Errorf("error after scanning row: %w", err) 436 } 437 438 // Create the result map 439 resultMap := make(map[string]any, len(cols)) 440 for i, colName := range cols { 441 val := values[i] 442 // SQLite often returns []byte for TEXT columns, convert to string for usability. 443 // Also handle potential nil values appropriately. 444 if b, ok := val.([]byte); ok { 445 resultMap[colName] = string(b) 446 } else { 447 resultMap[colName] = val // Keep nil as nil, numbers as numbers, etc. 448 } 449 } 450 451 return resultMap, nil 452} 453 454func (db *DB) GetLastKnownTimestamp(userID int64) (*time.Time, error) { 455 var lastTimestamp time.Time 456 err := db.QueryRow(` 457 SELECT timestamp 458 FROM tracks 459 WHERE user_id = ? 460 ORDER BY timestamp DESC 461 LIMIT 1`, userID).Scan(&lastTimestamp) 462 463 if err != nil { 464 if err == sql.ErrNoRows { 465 return nil, nil 466 } 467 return nil, fmt.Errorf("failed to query last scrobble timestamp for user %d: %w", userID, err) 468 } 469 470 return &lastTimestamp, nil 471}