[WIP] music platform user data scraper
teal-fm atproto

add atproto submission to lfm

Natalie B 3d0e6dee 769791e2

Changed files
+260 -126
db
models
oauth
atproto
service
lastfm
musicbrainz
+51 -16
db/atproto.go
··· 109 109 } 110 110 111 111 // create or update the current user's ATproto session data. 112 - func (db *DB) SaveATprotoSession(tokenResp *oauth.TokenResponse, authserverIss string, dpopPrivateJWK jwk.Key) error { 113 - 112 + func (db *DB) SaveATprotoSession(tokenResp *oauth.TokenResponse, authserverIss string, dpopPrivateJWK jwk.Key, pdsUrl string) error { 113 + fmt.Printf("Saving session with PDS url %s", pdsUrl) 114 114 expiryTime := time.Now().Add(time.Second * time.Duration(tokenResp.ExpiresIn)) 115 115 now := time.Now() 116 116 ··· 130 130 atproto_token_type = ?, 131 131 atproto_authserver_nonce = ?, 132 132 atproto_dpop_private_jwk = ?, 133 + atproto_pds_url = ?, 134 + atproto_pds_nonce = ?, 133 135 updated_at = ? 134 136 WHERE atproto_did = ?`, 135 137 tokenResp.AccessToken, ··· 141 143 tokenResp.TokenType, 142 144 tokenResp.DpopAuthserverNonce, 143 145 string(dpopPrivateJWKBytes), 146 + pdsUrl, 147 + // will get set later 148 + "", 144 149 now, 145 150 tokenResp.Sub, 146 151 ) ··· 162 167 return nil 163 168 } 164 169 165 - func (db *DB) GetAtprotoSession(did string, ctx context.Context, oauthClient oauth.Client) (*oauth.TokenResponse, error) { 166 - var oauthSession oauth.TokenResponse 170 + func (db *DB) GetAtprotoSession(did string, ctx context.Context, oauthClient oauth.Client) (*models.ATprotoAuthSession, error) { 171 + var oauthSession models.ATprotoAuthSession 167 172 var authserverIss string 168 173 var jwkBytes string 169 174 170 175 err := db.QueryRow(` 171 - SELECT atproto_access_token, atproto_refresh_token, atproto_token_expiry, atproto_scope, atproto_sub, atproto_authserver_issuer, atproto_token_type, atproto_authserver_nonce, atproto_dpop_private_jwk 176 + SELECT id, atproto_did, atproto_pds_url, atproto_authserver_issuer, atproto_access_token, atproto_refresh_token, atproto_pds_nonce, atproto_authserver_nonce, atproto_dpop_private_jwk, atproto_token_expiry 172 177 FROM users 173 - WHERE atproto_did = ?`, 178 + WHERE atproto_did = ? OR id`, 174 179 did, 175 180 ).Scan( 181 + &oauthSession.ID, 182 + &oauthSession.DID, 183 + &oauthSession.PDSUrl, 184 + &authserverIss, 176 185 &oauthSession.AccessToken, 177 186 &oauthSession.RefreshToken, 178 - &oauthSession.ExpiresIn, 179 - &oauthSession.Scope, 180 - &oauthSession.Sub, 181 - &authserverIss, 182 - &oauthSession.TokenType, 183 - &oauthSession.DpopAuthserverNonce, 187 + &oauthSession.DpopPdsNonce, 188 + &oauthSession.DpopAuthServerNonce, 184 189 &jwkBytes, 190 + &oauthSession.TokenExpiry, 185 191 ) 186 192 187 193 if err != nil { ··· 191 197 privateJwk, err := helpers.ParseJWKFromBytes([]byte(jwkBytes)) 192 198 if err != nil { 193 199 return nil, fmt.Errorf("failed to parse DPoPPrivateJWK: %w", err) 200 + } else { 201 + // add jwk to the struct 202 + oauthSession.DpopPrivateJWK = privateJwk 194 203 } 195 204 205 + // printout the session details 206 + fmt.Printf("Session details from DB: %+v\n", oauthSession) 207 + 196 208 // if token is expired, refresh it 197 - if int64(oauthSession.ExpiresIn) < time.Now().Unix() { 209 + if time.Now().After(oauthSession.TokenExpiry) { 198 210 199 - resp, err := oauthClient.RefreshTokenRequest(ctx, oauthSession.RefreshToken, authserverIss, oauthSession.DpopAuthserverNonce, privateJwk) 211 + resp, err := oauthClient.RefreshTokenRequest(ctx, oauthSession.RefreshToken, authserverIss, oauthSession.DpopAuthServerNonce, privateJwk) 200 212 if err != nil { 201 213 return nil, err 202 214 } 203 215 204 - if err := db.SaveATprotoSession(resp, authserverIss, privateJwk); err != nil { 216 + if err := db.SaveATprotoSession(resp, authserverIss, privateJwk, oauthSession.PDSUrl); err != nil { 205 217 return nil, fmt.Errorf("failed to save refreshed token: %w", err) 206 218 } 207 219 208 - oauthSession = *resp 220 + oauthSession = models.ATprotoAuthSession{ 221 + ID: oauthSession.ID, 222 + DID: oauthSession.DID, 223 + PDSUrl: oauthSession.PDSUrl, 224 + AuthServerIssuer: authserverIss, 225 + AccessToken: resp.AccessToken, 226 + RefreshToken: resp.RefreshToken, 227 + DpopPdsNonce: oauthSession.DpopPdsNonce, 228 + DpopAuthServerNonce: resp.DpopAuthserverNonce, 229 + DpopPrivateJWK: privateJwk, 230 + TokenExpiry: time.Now().Add(time.Duration(resp.ExpiresIn) * time.Second), 231 + } 209 232 210 233 } 211 234 212 235 return &oauthSession, nil 213 236 } 237 + 238 + func AtpSessionToAuthArgs(sess *models.ATprotoAuthSession) *oauth.XrpcAuthedRequestArgs { 239 + fmt.Printf("DID: %s\nPDS URL: %s\nISS: %s\nAccess Token: %s\nNonce: %s\nPrivate JWK: %s\n", sess.DID, sess.PDSUrl, sess.AuthServerIssuer, sess.AccessToken, sess.DpopPdsNonce, sess.DpopPrivateJWK) 240 + return &oauth.XrpcAuthedRequestArgs{ 241 + Did: sess.DID, 242 + PdsUrl: sess.PDSUrl, 243 + Issuer: sess.AuthServerIssuer, 244 + AccessToken: sess.AccessToken, 245 + DpopPdsNonce: sess.DpopPdsNonce, 246 + DpopPrivateJwk: sess.DpopPrivateJWK, 247 + } 248 + }
+7 -3
db/db.go
··· 42 42 username TEXT, -- Made nullable, might not have username initially 43 43 email TEXT UNIQUE, -- Made nullable 44 44 atproto_did TEXT UNIQUE, -- Atproto DID (identifier) 45 + atproto_pds_url TEXT, 45 46 atproto_authserver_issuer TEXT, 46 47 atproto_access_token TEXT, -- Atproto access token 47 48 atproto_refresh_token TEXT, -- Atproto refresh token ··· 50 51 atproto_scope TEXT, -- Atproto token scope 51 52 atproto_token_type TEXT, -- Atproto token type 52 53 atproto_authserver_nonce TEXT, 54 + atproto_pds_nonce TEXT, 53 55 atproto_dpop_private_jwk TEXT, 54 56 spotify_id TEXT UNIQUE, -- Spotify specific ID 55 57 access_token TEXT, -- Spotify access token ··· 157 159 user := &models.User{} 158 160 159 161 err := db.QueryRow(` 160 - SELECT id, username, email, spotify_id, access_token, refresh_token, token_expiry, created_at, updated_at 162 + SELECT id, username, email, spotify_id, access_token, refresh_token, token_expiry, lastfm_username, created_at, updated_at 161 163 FROM users WHERE id = ?`, ID).Scan( 162 164 &user.ID, &user.Username, &user.Email, &user.SpotifyID, 163 165 &user.AccessToken, &user.RefreshToken, &user.TokenExpiry, 166 + &user.LastFMUsername, 164 167 &user.CreatedAt, &user.UpdatedAt) 165 168 166 169 if err == sql.ErrNoRows { ··· 178 181 user := &models.User{} 179 182 180 183 err := db.QueryRow(` 181 - SELECT id, username, email, spotify_id, access_token, refresh_token, token_expiry, created_at, updated_at 184 + SELECT id, username, email, spotify_id, access_token, refresh_token, token_expiry, lastfm_username, created_at, updated_at 182 185 FROM users WHERE spotify_id = ?`, spotifyID).Scan( 183 186 &user.ID, &user.Username, &user.Email, &user.SpotifyID, 184 187 &user.AccessToken, &user.RefreshToken, &user.TokenExpiry, 188 + &user.LastFMUsername, 185 189 &user.CreatedAt, &user.UpdatedAt) 186 190 187 191 if err == sql.ErrNoRows { ··· 439 443 return resultMap, nil 440 444 } 441 445 442 - func (db *DB) GetLastScrobbleTimestamp(userID int64) (*time.Time, error) { 446 + func (db *DB) GetLastKnownTimestamp(userID int64) (*time.Time, error) { 443 447 var lastTimestamp time.Time 444 448 err := db.QueryRow(` 445 449 SELECT timestamp
+4 -7
db/lfm.go
··· 15 15 16 16 func (db *DB) GetAllUsersWithLastFM() ([]*models.User, error) { 17 17 rows, err := db.Query(` 18 - SELECT id, username, email, spotify_id, access_token, refresh_token, token_expiry, created_at, updated_at, lastfm_username 18 + SELECT id, username, email, lastfm_username 19 19 FROM users 20 20 WHERE lastfm_username IS NOT NULL 21 21 ORDER BY id`) ··· 30 30 for rows.Next() { 31 31 user := &models.User{} 32 32 err := rows.Scan( 33 - &user.ID, &user.Username, &user.Email, &user.SpotifyID, 34 - &user.AccessToken, &user.RefreshToken, &user.TokenExpiry, 35 - &user.CreatedAt, &user.UpdatedAt, &user.LastFMUsername) 33 + &user.ID, &user.Username, &user.Email, &user.LastFMUsername) 36 34 if err != nil { 37 35 return nil, err 38 36 } ··· 44 42 45 43 func (db *DB) GetUserByLastFM(lastfmUsername string) (*models.User, error) { 46 44 row := db.QueryRow(` 47 - SELECT id, username, email, spotify_id, access_token, refresh_token, token_expiry, created_at, updated_at, lastfm_username 45 + SELECT id, username, email, atproto_did, created_at, updated_at, lastfm_username 48 46 FROM users 49 47 WHERE lastfm_username = ?`, lastfmUsername) 50 48 51 49 user := &models.User{} 52 50 err := row.Scan( 53 - &user.ID, &user.Username, &user.Email, &user.SpotifyID, 54 - &user.AccessToken, &user.RefreshToken, &user.TokenExpiry, 51 + &user.ID, &user.Username, &user.Email, &user.ATProtoDID, 55 52 &user.CreatedAt, &user.UpdatedAt, &user.LastFMUsername) 56 53 if err != nil { 57 54 return nil, err
+3 -2
main.go
··· 32 32 33 33 if isLoggedIn { 34 34 user, err := database.GetUserByID(userID) 35 + fmt.Printf("User: %+v\n", user) 35 36 if err == nil && user != nil && user.LastFMUsername != nil { 36 37 lastfmUsername = *user.LastFMUsername 37 38 } else if err != nil { ··· 360 361 361 362 mbService := musicbrainz.NewMusicBrainzService(database) 362 363 spotifyService := spotify.NewSpotifyService(database, atprotoService, mbService) 363 - lastfmService := lastfm.NewLastFMService(database, viper.GetString("lastfm.api_key"), mbService) 364 + lastfmService := lastfm.NewLastFMService(database, viper.GetString("lastfm.api_key"), mbService, atprotoService) 364 365 365 366 sessionManager := session.NewSessionManager(database) 366 367 oauthManager := oauth.NewOAuthServiceManager(sessionManager) ··· 409 410 trackerInterval := time.Duration(viper.GetInt("tracker.interval")) * time.Second 410 411 lastfmInterval := time.Duration(viper.GetInt("lastfm.interval_seconds")) * time.Second // Add config for Last.fm interval 411 412 if lastfmInterval <= 0 { 412 - lastfmInterval = 1 * time.Minute 413 + lastfmInterval = 30 * time.Second 413 414 } 414 415 415 416 if err := spotifyService.LoadAllUsers(); err != nil {
+46 -11
models/atproto.go
··· 1 1 package models 2 2 3 3 import ( 4 + "encoding/json" 5 + "fmt" 6 + "io" 4 7 "time" 5 8 6 9 "github.com/lestrrat-go/jwx/v2/jwk" 10 + cbg "github.com/whyrusleeping/cbor-gen" 7 11 ) 8 12 9 13 type ATprotoAuthData struct { ··· 17 21 DPoPPrivateJWK jwk.Key `json:"dpop_private_jwk"` 18 22 } 19 23 20 - type TealFmPlayLexicon struct { 21 - Type string `json:"$type"` 22 - Duration int `json:"duration"` 23 - TrackName string `json:"trackName"` 24 - PlayedTime time.Time `json:"playedTime"` 25 - ArtistMbIDs []string `json:"artistMbIds"` 26 - ArtistNames []string `json:"artistNames"` 27 - ReleaseMbID string `json:"releaseMbId"` 28 - ReleaseName string `json:"releaseName"` 29 - RecordingMbID string `json:"recordingMbId"` 30 - SubmissionClientAgent string `json:"submissionClientAgent"` 24 + type ATprotoAuthSession struct { 25 + ID string `json:"id"` 26 + DID string `json:"did"` 27 + PDSUrl string `json:"pds_url"` 28 + AuthServerIssuer string `json:"authserver_issuer"` 29 + AccessToken string `json:"access_token"` 30 + RefreshToken string `json:"refresh_token"` 31 + DpopPdsNonce string `json:"dpop_pds_nonce"` 32 + DpopAuthServerNonce string `json:"dpop_authserver_nonce"` 33 + DpopPrivateJWK jwk.Key `json:"dpop_private_jwk"` 34 + TokenExpiry time.Time `json:"expires_at"` 35 + } 36 + 37 + type TealFmFeedPlay struct { 38 + Type string `json:"$type"` 39 + Duration int `json:"duration"` 40 + TrackName string `json:"trackName"` 41 + PlayedTime string `json:"playedTime"` 42 + ArtistMbIDs []string `json:"artistMbIds"` 43 + ArtistNames []string `json:"artistNames"` 44 + ReleaseMBID string `json:"releaseMbId"` 45 + ReleaseName string `json:"releaseName"` 46 + RecordingMBID string `json:"recordingMbId"` 47 + SubmissionClientAgent string `json:"submissionClientAgent"` 48 + } 49 + 50 + func (tfmTrack TealFmFeedPlay) MarshalCBOR(w io.Writer) error { 51 + // in case the pointer is nil 52 + // if tfmTrack == nil { 53 + // return fmt.Errorf("cannot marshal nil TealFmFeedPlay") 54 + // } 55 + fmt.Println("Marshalling", tfmTrack) 56 + trackBytes, err := json.Marshal(tfmTrack) 57 + if err != nil { 58 + return fmt.Errorf("failed to marshal trackMap to bytes: %w", err) 59 + } 60 + 61 + cw := cbg.NewCborWriter(w) 62 + if err := cbg.WriteByteArray(cw, trackBytes); err != nil { 63 + return fmt.Errorf("failed to write bytes as CBOR: %w", err) 64 + } 65 + return nil 31 66 }
+41 -15
oauth/atproto/atproto.go
··· 12 12 "github.com/lestrrat-go/jwx/v2/jwk" 13 13 "github.com/teal-fm/piper/db" 14 14 "github.com/teal-fm/piper/models" 15 - // woof 16 15 ) 17 16 18 17 type ATprotoAuthService struct { 19 - client *oauth.Client 20 - jwks jwk.Key 21 - DB *db.DB 22 - clientId string 18 + client *oauth.Client 19 + jwks jwk.Key 20 + DB *db.DB 21 + clientId string 22 + callbackUrl string 23 + xrpc *oauth.XrpcClient 23 24 } 24 25 25 26 func NewATprotoAuthService(db *db.DB, jwks jwk.Key, clientId string, callbackUrl string) (*ATprotoAuthService, error) { ··· 32 33 if err != nil { 33 34 return nil, fmt.Errorf("failed to create atproto oauth client: %w", err) 34 35 } 35 - return &ATprotoAuthService{ 36 - client: cli, 37 - jwks: jwks, 38 - DB: db, 39 - clientId: clientId, 40 - }, nil 36 + svc := &ATprotoAuthService{ 37 + client: cli, 38 + jwks: jwks, 39 + callbackUrl: callbackUrl, 40 + DB: db, 41 + clientId: clientId, 42 + } 43 + svc.NewXrpcClient() 44 + return svc, nil 45 + } 46 + 47 + func (a *ATprotoAuthService) GetATProtoClient() (*oauth.Client, error) { 48 + if a.client != nil { 49 + return a.client, nil 50 + } 51 + 52 + if a.client == nil { 53 + cli, err := oauth.NewClient(oauth.ClientArgs{ 54 + ClientJwk: a.jwks, 55 + ClientId: a.clientId, 56 + RedirectUri: a.callbackUrl, 57 + }) 58 + if err != nil { 59 + return nil, fmt.Errorf("failed to create atproto oauth client: %w", err) 60 + } 61 + a.client = cli 62 + } 63 + 64 + return a.client, nil 41 65 } 42 66 43 67 func LoadJwks(jwksBytes []byte) (jwk.Key, error) { ··· 68 92 } 69 93 70 94 func (a *ATprotoAuthService) getLoginUrlAndSaveState(ctx context.Context, handle string) (*url.URL, error) { 71 - scope := "atproto" 95 + scope := "atproto transition:generic" 72 96 // resolve 73 97 ui, err := a.getUserInformation(ctx, handle) 74 98 if err != nil { 75 99 return nil, fmt.Errorf("failed to get user information for %s: %w", handle, err) 76 100 } 77 101 102 + fmt.Println("user info: ", ui.AuthServer, ui.AuthService) 103 + 78 104 // create a dpop jwk for this session 79 105 k, err := helpers.GenerateKey(nil) // Generate ephemeral DPoP key for this flow 80 106 if err != nil { ··· 91 117 data := &models.ATprotoAuthData{ 92 118 State: parResp.State, 93 119 DID: ui.DID, 94 - PDSUrl: ui.AuthServer, 120 + PDSUrl: ui.AuthService, 95 121 AuthServerIssuer: ui.AuthMeta.Issuer, 96 122 PKCEVerifier: parResp.PkceVerifier, 97 123 DPoPAuthServerNonce: parResp.DpopAuthserverNonce, ··· 123 149 func (a *ATprotoAuthService) HandleCallback(w http.ResponseWriter, r *http.Request) (int64, error) { 124 150 state := r.URL.Query().Get("state") 125 151 code := r.URL.Query().Get("code") 126 - issuer := r.URL.Query().Get("iss") // Issuer (PDS URL) is needed for token request 152 + issuer := r.URL.Query().Get("iss") // Issuer (auth base URL) is needed for token request 127 153 128 154 if state == "" || code == "" || issuer == "" { 129 155 errMsg := r.URL.Query().Get("error") ··· 164 190 return 0, fmt.Errorf("failed to find or create user") 165 191 } 166 192 167 - err = a.DB.SaveATprotoSession(resp, data.AuthServerIssuer, data.DPoPPrivateJWK) 193 + err = a.DB.SaveATprotoSession(resp, data.AuthServerIssuer, data.DPoPPrivateJWK, data.PDSUrl) 168 194 if err != nil { 169 195 log.Printf("ATProto Callback Error: Failed to save ATProto tokens for user %d (DID %s): %v", userID.ID, data.DID, err) 170 196 }
+107 -71
service/lastfm/lastfm.go
··· 3 3 import ( 4 4 "context" 5 5 "encoding/json" 6 + "errors" 6 7 "fmt" 7 8 "io" 8 9 "log" ··· 12 13 "sync" 13 14 "time" 14 15 16 + "github.com/bluesky-social/indigo/api/atproto" 17 + lexutil "github.com/bluesky-social/indigo/lex/util" 18 + "github.com/bluesky-social/indigo/xrpc" 15 19 "github.com/teal-fm/piper/db" 16 20 "github.com/teal-fm/piper/models" 21 + atprotoauth "github.com/teal-fm/piper/oauth/atproto" 22 + "github.com/teal-fm/piper/pkg/lex/teal" 17 23 "github.com/teal-fm/piper/service/musicbrainz" 18 24 "golang.org/x/time/rate" 19 25 ) ··· 23 29 defaultLimit = 1 // Default number of tracks to fetch per user 24 30 ) 25 31 26 - // Structs to represent the Last.fm API response for user.getrecenttracks 27 - type RecentTracksResponse struct { 28 - RecentTracks RecentTracks `json:"recenttracks"` 29 - } 30 - 31 - type RecentTracks struct { 32 - Tracks []Track `json:"track"` 33 - Attr TrackXMLAttr `json:"@attr"` 34 - } 35 - 36 - type Track struct { 37 - Artist Artist `json:"artist"` 38 - Streamable string `json:"streamable"` // Typically "0" or "1" 39 - Image []Image `json:"image"` 40 - MBID string `json:"mbid"` // MusicBrainz ID for the track 41 - Album Album `json:"album"` 42 - Name string `json:"name"` 43 - URL string `json:"url"` 44 - Date *TrackDate `json:"date,omitempty"` // Use pointer for optional fields 45 - Attr *struct { // Custom handling for @attr.nowplaying 46 - NowPlaying string `json:"nowplaying"` // Field name corrected to match struct tag 47 - } `json:"@attr,omitempty"` // This captures the @attr object within the track 48 - } 49 - 50 - type Artist struct { 51 - MBID string `json:"mbid"` // MusicBrainz ID for the artist 52 - Text string `json:"#text"` 53 - } 54 - 55 - type Image struct { 56 - Size string `json:"size"` // "small", "medium", "large", "extralarge" 57 - Text string `json:"#text"` // URL of the image 58 - } 59 - 60 - type Album struct { 61 - MBID string `json:"mbid"` // MusicBrainz ID for the album 62 - Text string `json:"#text"` // Album name 63 - } 64 - 65 - type TrackDate struct { 66 - UTS string `json:"uts"` // Unix timestamp string 67 - Text string `json:"#text"` // Human-readable date string 68 - } 69 - 70 - type TrackXMLAttr struct { 71 - User string `json:"user"` 72 - TotalPages string `json:"totalPages"` 73 - Page string `json:"page"` 74 - PerPage string `json:"perPage"` 75 - Total string `json:"total"` 76 - } 77 - 78 32 type LastFMService struct { 79 33 db *db.DB 80 34 httpClient *http.Client ··· 82 36 apiKey string 83 37 Usernames []string 84 38 musicBrainzService *musicbrainz.MusicBrainzService 39 + atprotoService *atprotoauth.ATprotoAuthService 85 40 lastSeenNowPlaying map[string]Track 86 41 mu sync.Mutex 87 42 } 88 43 89 - func NewLastFMService(db *db.DB, apiKey string, musicBrainzService *musicbrainz.MusicBrainzService) *LastFMService { 44 + func NewLastFMService(db *db.DB, apiKey string, musicBrainzService *musicbrainz.MusicBrainzService, atprotoService *atprotoauth.ATprotoAuthService) *LastFMService { 90 45 return &LastFMService{ 91 46 db: db, 92 47 httpClient: &http.Client{ 93 48 Timeout: 10 * time.Second, 94 49 }, 95 50 // Last.fm unofficial rate limit is ~5 requests per second 96 - limiter: rate.NewLimiter(rate.Every(200*time.Millisecond), 1), 97 - apiKey: apiKey, 98 - Usernames: make([]string, 0), 99 - // lastSeenTrackDate: make(map[string]time.Time), // Removed 51 + limiter: rate.NewLimiter(rate.Every(200*time.Millisecond), 1), 52 + apiKey: apiKey, 53 + Usernames: make([]string, 0), 54 + atprotoService: atprotoService, 100 55 musicBrainzService: musicBrainzService, 101 56 lastSeenNowPlaying: make(map[string]Track), 102 57 mu: sync.Mutex{}, ··· 111 66 } 112 67 usernames := make([]string, len(u)) 113 68 for i, user := range u { 114 - // Assuming the User struct has a LastFMUsername field 69 + // print out user stuff 115 70 if user.LastFMUsername != nil { // Check if the username is set 116 71 usernames[i] = *user.LastFMUsername 117 72 } else { 118 73 log.Printf("User ID %d has Last.fm enabled but no username set", user.ID) 119 - // Handle this case - maybe skip the user or log differently 120 74 } 121 75 } 122 76 123 - // Filter out empty usernames if any were added due to missing data 77 + // filter empty usernames (shouldn't happen?) 124 78 filteredUsernames := make([]string, 0, len(usernames)) 125 79 for _, name := range usernames { 126 80 if name != "" { ··· 279 233 } 280 234 281 235 // Process the fetched tracks 282 - if err := l.processTracks(uname, recentTracks.RecentTracks.Tracks); err != nil { 236 + if err := l.processTracks(ctx, uname, recentTracks.RecentTracks.Tracks); err != nil { 283 237 log.Printf("Error processing tracks for %s: %v", uname, err) 284 238 fetchErrors <- fmt.Errorf("process failed for %s: %w", uname, err) // Report error 285 239 } ··· 303 257 } 304 258 } 305 259 306 - func (l *LastFMService) processTracks(username string, tracks []Track) error { 260 + func (l *LastFMService) processTracks(ctx context.Context, username string, tracks []Track) error { 307 261 if l.db == nil { 308 262 return fmt.Errorf("database connection is nil") 309 263 } ··· 313 267 return fmt.Errorf("failed to get user ID for %s: %w", username, err) 314 268 } 315 269 316 - lastKnownTimestamp, err := l.db.GetLastScrobbleTimestamp(user.ID) 270 + lastKnownTimestamp, err := l.db.GetLastKnownTimestamp(user.ID) 317 271 if err != nil { 318 272 return fmt.Errorf("failed to get last scrobble timestamp for %s: %w", username, err) 319 273 } 320 274 321 - found := lastKnownTimestamp == nil 322 - if found { 275 + if lastKnownTimestamp == nil { 323 276 log.Printf("no previous scrobble timestamp found for user %s. processing latest track.", username) 324 277 } else { 325 278 log.Printf("last known scrobble for %s was at %s", username, lastKnownTimestamp.Format(time.RFC3339)) ··· 369 322 } 370 323 latestTrackTime := time.Unix(uts, 0) 371 324 372 - if found && lastKnownTimestamp.Equal(latestTrackTime) { 325 + // print both 326 + fmt.Printf("latestTrackTime: %s\n", latestTrackTime) 327 + fmt.Printf("lastKnownTimestamp: %s\n", lastKnownTimestamp) 328 + 329 + if lastKnownTimestamp != nil && lastKnownTimestamp.Equal(latestTrackTime) { 373 330 log.Printf("no new tracks to process for user %s.", username) 374 331 return nil 375 332 } ··· 387 344 } 388 345 trackTime := time.Unix(uts, 0) 389 346 390 - if lastKnownTimestamp != nil && trackTime.Before(*lastKnownTimestamp) { 347 + // before or at last known 348 + if lastKnownTimestamp != nil && (trackTime.Before(*lastKnownTimestamp) || trackTime.Equal(*lastKnownTimestamp)) { 391 349 if processedCount == 0 { 392 350 log.Printf("reached already known scrobbles for user %s (track time: %s, last known: %s).", 393 351 username, trackTime.Format(time.RFC3339), lastKnownTimestamp.Format(time.RFC3339)) ··· 404 362 Artist: []models.Artist{ 405 363 { 406 364 Name: track.Artist.Text, 407 - MBID: track.Artist.MBID, 408 365 }, 409 366 }, 367 + // this is submitted after the track has been scrobbled on LFM 368 + HasStamped: true, 410 369 } 411 370 412 371 hydratedTrack, err := musicbrainz.HydrateTrack(l.musicBrainzService, baseTrack) 413 372 if err != nil { 414 373 log.Printf("error hydrating track for user %s: %s - %s: %v", username, track.Artist.Text, track.Name, err) 415 - continue 374 + // we can use the track without MBIDs, it's still valid 375 + hydratedTrack = &baseTrack 416 376 } 417 - 418 377 l.db.SaveTrack(user.ID, hydratedTrack) 378 + log.Printf("Submitting track") 379 + err = l.SubmitTrackToPDS(*user.ATProtoDID, hydratedTrack, ctx) 380 + if err != nil { 381 + log.Printf("error submitting track for user %s: %s - %s: %v", username, track.Artist.Text, track.Name, err) 382 + } 419 383 processedCount++ 420 384 421 385 if trackTime.After(latestProcessedTime) { 422 386 latestProcessedTime = trackTime 423 387 } 424 388 425 - if found { 389 + if lastKnownTimestamp != nil { 426 390 break 427 391 } 428 392 } ··· 434 398 435 399 return nil 436 400 } 401 + 402 + func (l *LastFMService) SubmitTrackToPDS(did string, track *models.Track, ctx context.Context) error { 403 + client, err := l.atprotoService.GetATProtoClient() 404 + if err != nil || client == nil { 405 + return err 406 + } 407 + 408 + xrpcClient := l.atprotoService.GetXrpcClient() 409 + if xrpcClient == nil { 410 + return errors.New("xrpc client is kil") 411 + } 412 + 413 + // we check for client above 414 + sess, err := l.db.GetAtprotoSession(did, ctx, *client) 415 + if err != nil { 416 + return fmt.Errorf("Couldn't get Atproto session: %s", err) 417 + } 418 + 419 + // printout the session details 420 + fmt.Printf("Session details: %+v\n", sess) 421 + 422 + // horrible no good very bad for now 423 + artistArr := []string{} 424 + artistMbIdArr := []string{} 425 + for _, a := range track.Artist { 426 + artistArr = append(artistArr, a.Name) 427 + artistMbIdArr = append(artistMbIdArr, a.MBID) 428 + } 429 + 430 + var durationPtr *int64 431 + if track.DurationMs > 0 { 432 + durationSeconds := track.DurationMs / 1000 433 + durationPtr = &durationSeconds 434 + } 435 + 436 + playedTimeStr := track.Timestamp.Format(time.RFC3339) 437 + submissionAgent := "piper/v0.0.1" // TODO: get this from the environment on compilation 438 + 439 + // track -> tealfm track 440 + tfmTrack := teal.AlphaFeedPlay{ 441 + LexiconTypeID: "fm.teal.alpha.feed.play", // Assuming this is the correct Lexicon ID 442 + // tfm specifies duration in seconds 443 + Duration: durationPtr, // Pointer required 444 + TrackName: track.Name, 445 + // should be unix timestamp 446 + PlayedTime: &playedTimeStr, // Pointer required 447 + ArtistNames: artistArr, // Slice of strings is correct 448 + ArtistMbIds: artistMbIdArr, // Slice of strings is correct 449 + ReleaseMbId: &track.ReleaseMBID, // Pointer required 450 + ReleaseName: &track.Album, // Pointer required 451 + RecordingMbId: &track.RecordingMBID, // Pointer required 452 + SubmissionClientAgent: &submissionAgent, // Pointer required 453 + } 454 + 455 + input := atproto.RepoCreateRecord_Input{ 456 + Collection: "fm.teal.alpha.feed.play", 457 + Repo: sess.DID, 458 + Record: &lexutil.LexiconTypeDecoder{Val: &tfmTrack}, 459 + } 460 + 461 + authArgs := db.AtpSessionToAuthArgs(sess) 462 + fmt.Println(authArgs) 463 + 464 + var out atproto.RepoCreateRecord_Output 465 + if err := xrpcClient.Do(ctx, authArgs, xrpc.Procedure, "application/json", "com.atproto.repo.createRecord", nil, input, &out); err != nil { 466 + return err 467 + } 468 + 469 + // submit track to PDS 470 + 471 + return nil 472 + }
+1 -1
service/musicbrainz/musicbrainz.go
··· 175 175 defer resp.Body.Close() 176 176 177 177 if resp.StatusCode != http.StatusOK { 178 - // TODO: Consider reading body for detailed error message from MusicBrainz 178 + // TODO: read body for detailed error message 179 179 return nil, fmt.Errorf("MusicBrainz API request to %s returned status %d", endpoint, resp.StatusCode) 180 180 } 181 181