A decentralized music tracking and discovery platform built on AT Protocol 🎵
listenbrainz spotify atproto lastfm musicbrainz scrobbling

refactors pull_data function

Changed files
+515 -482
crates
pgpull
src
+515 -482
crates/pgpull/src/lib.rs
··· 5 5 6 6 use anyhow::{Context, Error}; 7 7 use owo_colors::OwoColorize; 8 - use sqlx::postgres::PgPoolOptions; 8 + use sqlx::{postgres::PgPoolOptions, PgPool}; 9 9 10 - pub async fn pull_data() -> Result<(), Error> { 10 + const MAX_CONNECTIONS: u32 = 5; 11 + const BATCH_SIZE: usize = 1000; 12 + 13 + #[derive(Clone)] 14 + pub struct DatabasePools { 15 + pub source: PgPool, 16 + pub destination: PgPool, 17 + } 18 + 19 + async fn setup_database_pools() -> Result<DatabasePools, Error> { 11 20 if env::var("SOURCE_POSTGRES_URL").is_err() { 12 21 tracing::error!( 13 22 "SOURCE_POSTGRES_URL is not set. Please set it to your PostgreSQL connection string." ··· 15 24 std::process::exit(1); 16 25 } 17 26 18 - let pool = PgPoolOptions::new() 19 - .max_connections(5) 27 + let source = PgPoolOptions::new() 28 + .max_connections(MAX_CONNECTIONS) 20 29 .connect(&env::var("SOURCE_POSTGRES_URL")?) 21 30 .await?; 22 31 23 - let dest_pool = PgPoolOptions::new() 24 - .max_connections(5) 32 + let destination = PgPoolOptions::new() 33 + .max_connections(MAX_CONNECTIONS) 25 34 .connect(&env::var("XATA_POSTGRES_URL")?) 26 35 .await?; 27 36 28 - let pool_clone = pool.clone(); 29 - let dest_pool_clone = dest_pool.clone(); 30 - let album_sync = tokio::spawn(async move { 31 - let total_albums: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM albums") 32 - .fetch_one(&pool_clone) 33 - .await?; 34 - let total_albums = total_albums.0; 35 - tracing::info!(total = %total_albums.magenta(), "Total albums to sync"); 37 + Ok(DatabasePools { 38 + source, 39 + destination, 40 + }) 41 + } 36 42 37 - const BATCH_SIZE: usize = 1000; 43 + async fn sync_albums(pools: &DatabasePools) -> Result<(), Error> { 44 + let total_albums: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM albums") 45 + .fetch_one(&pools.source) 46 + .await?; 47 + let total_albums = total_albums.0; 48 + tracing::info!(total = %total_albums.magenta(), "Total albums to sync"); 38 49 39 - let start = 0; 40 - let mut i = 1; 50 + let start = 0; 51 + let mut i = 1; 41 52 42 - for offset in (start..total_albums).step_by(BATCH_SIZE) { 43 - let albums = 44 - repo::album::get_albums(&pool_clone, offset as i64, BATCH_SIZE as i64).await?; 45 - tracing::info!( 46 - offset = %offset.magenta(), 47 - end = %((offset + albums.len() as i64).min(total_albums)).magenta(), 48 - total = %total_albums.magenta(), 49 - "Fetched albums" 50 - ); 51 - for album in &albums { 52 - tracing::info!(title = %album.title.cyan(), i = %i.magenta(), total = %total_albums.magenta(), "Inserting album"); 53 - repo::album::insert_album(&dest_pool_clone, album).await?; 54 - i += 1; 55 - } 53 + for offset in (start..total_albums).step_by(BATCH_SIZE) { 54 + let albums = 55 + repo::album::get_albums(&pools.source, offset as i64, BATCH_SIZE as i64).await?; 56 + tracing::info!( 57 + offset = %offset.magenta(), 58 + end = %((offset + albums.len() as i64).min(total_albums)).magenta(), 59 + total = %total_albums.magenta(), 60 + "Fetched albums" 61 + ); 62 + for album in &albums { 63 + tracing::info!(title = %album.title.cyan(), i = %i.magenta(), total = %total_albums.magenta(), "Inserting album"); 64 + repo::album::insert_album(&pools.destination, album).await?; 65 + i += 1; 56 66 } 57 - Ok::<(), Error>(()) 58 - }); 67 + } 68 + Ok(()) 69 + } 59 70 60 - let pool_clone = pool.clone(); 61 - let dest_pool_clone = dest_pool.clone(); 62 - let artist_sync = tokio::spawn(async move { 63 - let total_artists: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM artists") 64 - .fetch_one(&pool_clone) 65 - .await?; 66 - let total_artists = total_artists.0; 67 - tracing::info!(total = %total_artists.magenta(), "Total artists to sync"); 71 + async fn sync_artists(pools: &DatabasePools) -> Result<(), Error> { 72 + let total_artists: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM artists") 73 + .fetch_one(&pools.source) 74 + .await?; 75 + let total_artists = total_artists.0; 76 + tracing::info!(total = %total_artists.magenta(), "Total artists to sync"); 68 77 69 - const BATCH_SIZE: usize = 1000; 78 + let start = 0; 79 + let mut i = 1; 70 80 71 - let start = 0; 72 - let mut i = 1; 81 + for offset in (start..total_artists).step_by(BATCH_SIZE) { 82 + let artists = 83 + repo::artist::get_artists(&pools.source, offset as i64, BATCH_SIZE as i64).await?; 84 + tracing::info!( 85 + offset = %offset.magenta(), 86 + end = %((offset + artists.len() as i64).min(total_artists)).magenta(), 87 + total = %total_artists.magenta(), 88 + "Fetched artists" 89 + ); 90 + for artist in &artists { 91 + tracing::info!(name = %artist.name.cyan(), i = %i.magenta(), total = %total_artists.magenta(), "Inserting artist"); 92 + repo::artist::insert_artist(&pools.destination, artist).await?; 93 + i += 1; 94 + } 95 + } 96 + Ok(()) 97 + } 73 98 74 - for offset in (start..total_artists).step_by(BATCH_SIZE) { 75 - let artists = 76 - repo::artist::get_artists(&pool_clone, offset as i64, BATCH_SIZE as i64).await?; 77 - tracing::info!( 78 - offset = %offset.magenta(), 79 - end = %((offset + artists.len() as i64).min(total_artists)).magenta(), 80 - total = %total_artists.magenta(), 81 - "Fetched artists" 82 - ); 83 - for artist in &artists { 84 - tracing::info!(name = %artist.name.cyan(), i = %i.magenta(), total = %total_artists.magenta(), "Inserting artist"); 85 - repo::artist::insert_artist(&dest_pool_clone, artist).await?; 86 - i += 1; 99 + async fn sync_tracks(pools: &DatabasePools) -> Result<(), Error> { 100 + let total_tracks: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM tracks") 101 + .fetch_one(&pools.source) 102 + .await?; 103 + let total_tracks = total_tracks.0; 104 + tracing::info!(total = %total_tracks.magenta(), "Total tracks to sync"); 105 + 106 + let start = 0; 107 + let mut i = 1; 108 + 109 + for offset in (start..total_tracks).step_by(BATCH_SIZE) { 110 + let tracks = 111 + repo::track::get_tracks(&pools.source, offset as i64, BATCH_SIZE as i64).await?; 112 + tracing::info!( 113 + offset = %offset.magenta(), 114 + end = %((offset + tracks.len() as i64).min(total_tracks)).magenta(), 115 + total = %total_tracks.magenta(), 116 + "Fetched tracks" 117 + ); 118 + 119 + for track in &tracks { 120 + tracing::info!(title = %track.title.cyan(), i = %i.magenta(), total = %total_tracks.magenta(), "Inserting track"); 121 + match repo::track::insert_track(&pools.destination, track).await { 122 + Ok(_) => {} 123 + Err(e) => { 124 + tracing::error!(error = %e, "Failed to insert track"); 125 + } 87 126 } 127 + i += 1; 88 128 } 89 - Ok::<(), Error>(()) 90 - }); 129 + } 130 + Ok(()) 131 + } 91 132 92 - let pool_clone = pool.clone(); 93 - let dest_pool_clone = dest_pool.clone(); 94 - let track_sync = tokio::spawn(async move { 95 - let total_tracks: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM tracks") 96 - .fetch_one(&pool_clone) 97 - .await?; 98 - let total_tracks = total_tracks.0; 99 - tracing::info!(total = %total_tracks.magenta(), "Total tracks to sync"); 133 + async fn sync_users(pools: &DatabasePools) -> Result<(), Error> { 134 + let total_users: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM users") 135 + .fetch_one(&pools.source) 136 + .await?; 137 + let total_users = total_users.0; 138 + tracing::info!(total = %total_users.magenta(), "Total users to sync"); 100 139 101 - const BATCH_SIZE: usize = 1000; 140 + let start = 0; 141 + let mut i = 1; 102 142 103 - let start = 0; 104 - let mut i = 1; 143 + for offset in (start..total_users).step_by(BATCH_SIZE) { 144 + let users = repo::user::get_users(&pools.source, offset as i64, BATCH_SIZE as i64).await?; 145 + tracing::info!( 146 + offset = %offset.magenta(), 147 + end = %((offset + users.len() as i64).min(total_users)).magenta(), 148 + total = %total_users.magenta(), 149 + "Fetched users" 150 + ); 105 151 106 - for offset in (start..total_tracks).step_by(BATCH_SIZE) { 107 - let tracks = 108 - repo::track::get_tracks(&pool_clone, offset as i64, BATCH_SIZE as i64).await?; 109 - tracing::info!( 110 - offset = %offset.magenta(), 111 - end = %((offset + tracks.len() as i64).min(total_tracks)).magenta(), 112 - total = %total_tracks.magenta(), 113 - "Fetched tracks" 114 - ); 115 - 116 - for track in &tracks { 117 - tracing::info!(title = %track.title.cyan(), i = %i.magenta(), total = %total_tracks.magenta(), "Inserting track"); 118 - match repo::track::insert_track(&dest_pool_clone, track).await { 119 - Ok(_) => {} 120 - Err(e) => { 121 - tracing::error!(error = %e, "Failed to insert track"); 122 - } 152 + for user in &users { 153 + tracing::info!(handle = %user.handle.cyan(), i = %i.magenta(), total = %total_users.magenta(), "Inserting user"); 154 + match repo::user::insert_user(&pools.destination, user).await { 155 + Ok(_) => {} 156 + Err(e) => { 157 + tracing::error!(error = %e, "Failed to insert user"); 123 158 } 124 - i += 1; 125 159 } 160 + i += 1; 126 161 } 127 - Ok::<(), Error>(()) 128 - }); 162 + } 163 + Ok(()) 164 + } 129 165 130 - let pool_clone = pool.clone(); 131 - let dest_pool_clone = dest_pool.clone(); 132 - let user_sync = tokio::spawn(async move { 133 - let total_users: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM users") 134 - .fetch_one(&pool_clone) 135 - .await?; 136 - let total_users = total_users.0; 137 - tracing::info!(total = %total_users.magenta(), "Total users to sync"); 166 + async fn sync_playlists(pools: &DatabasePools) -> Result<(), Error> { 167 + let total_playlists: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM playlists") 168 + .fetch_one(&pools.source) 169 + .await?; 170 + let total_playlists = total_playlists.0; 171 + tracing::info!(total = %total_playlists.magenta(), "Total playlists to sync"); 138 172 139 - const BATCH_SIZE: usize = 1000; 173 + let start = 0; 174 + let mut i = 1; 140 175 141 - let start = 0; 142 - let mut i = 1; 176 + for offset in (start..total_playlists).step_by(BATCH_SIZE) { 177 + let playlists = 178 + repo::playlist::get_playlists(&pools.source, offset as i64, BATCH_SIZE as i64).await?; 179 + tracing::info!( 180 + offset = %offset.magenta(), 181 + end = %((offset + playlists.len() as i64).min(total_playlists)).magenta(), 182 + total = %total_playlists.magenta(), 183 + "Fetched playlists" 184 + ); 143 185 144 - for offset in (start..total_users).step_by(BATCH_SIZE) { 145 - let users = 146 - repo::user::get_users(&pool_clone, offset as i64, BATCH_SIZE as i64).await?; 147 - tracing::info!( 148 - offset = %offset.magenta(), 149 - end = %((offset + users.len() as i64).min(total_users)).magenta(), 150 - total = %total_users.magenta(), 151 - "Fetched users" 152 - ); 153 - 154 - for user in &users { 155 - tracing::info!(handle = %user.handle.cyan(), i = %i.magenta(), total = %total_users.magenta(), "Inserting user"); 156 - match repo::user::insert_user(&dest_pool_clone, user).await { 157 - Ok(_) => {} 158 - Err(e) => { 159 - tracing::error!(error = %e, "Failed to insert user"); 160 - } 186 + for playlist in &playlists { 187 + tracing::info!(name = %playlist.name.cyan(), i = %i.magenta(), total = %total_playlists.magenta(), "Inserting playlist"); 188 + match repo::playlist::insert_playlist(&pools.destination, playlist).await { 189 + Ok(_) => {} 190 + Err(e) => { 191 + tracing::error!(error = %e, "Failed to insert playlist"); 161 192 } 162 - i += 1; 163 193 } 194 + i += 1; 164 195 } 165 - Ok::<(), Error>(()) 166 - }); 196 + } 197 + Ok(()) 198 + } 167 199 168 - let (album_sync, artist_sync, track_sync, user_sync) = 169 - tokio::join!(album_sync, artist_sync, track_sync, user_sync); 200 + async fn sync_loved_tracks(pools: &DatabasePools) -> Result<(), Error> { 201 + let total_loved_tracks: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM loved_tracks") 202 + .fetch_one(&pools.source) 203 + .await?; 204 + let total_loved_tracks = total_loved_tracks.0; 205 + tracing::info!(total = %total_loved_tracks.magenta(), "Total loved tracks to sync"); 206 + 207 + let start = 0; 208 + let mut i = 1; 170 209 171 - album_sync.context("Album sync task failed")??; 172 - artist_sync.context("Artist sync task failed")??; 173 - track_sync.context("Track sync task failed")??; 174 - user_sync.context("User sync task failed")??; 210 + for offset in (start..total_loved_tracks).step_by(BATCH_SIZE) { 211 + let loved_tracks = 212 + repo::loved_track::get_loved_tracks(&pools.source, offset as i64, BATCH_SIZE as i64) 213 + .await?; 214 + tracing::info!( 215 + offset = %offset.magenta(), 216 + end = %((offset + loved_tracks.len() as i64).min(total_loved_tracks)).magenta(), 217 + total = %total_loved_tracks.magenta(), 218 + "Fetched loved tracks" 219 + ); 175 220 176 - let pool_clone = pool.clone(); 177 - let dest_pool_clone = dest_pool.clone(); 178 - let playlist_sync = tokio::spawn(async move { 179 - let total_playlists: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM playlists") 180 - .fetch_one(&pool_clone) 181 - .await?; 182 - let total_playlists = total_playlists.0; 183 - tracing::info!(total = %total_playlists.magenta(), "Total playlists to sync"); 221 + for loved_track in &loved_tracks { 222 + tracing::info!(user_id = %loved_track.user_id.cyan(), track_id = %loved_track.track_id.magenta(), i = %i.magenta(), total = %total_loved_tracks.magenta(), "Inserting loved track"); 223 + match repo::loved_track::insert_loved_track(&pools.destination, loved_track).await { 224 + Ok(_) => {} 225 + Err(e) => { 226 + tracing::error!(error = %e, "Failed to insert loved track"); 227 + } 228 + } 229 + i += 1; 230 + } 231 + } 232 + Ok(()) 233 + } 184 234 185 - const BATCH_SIZE: usize = 1000; 235 + async fn sync_scrobbles(pools: &DatabasePools) -> Result<(), Error> { 236 + let total_scrobbles: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM scrobbles") 237 + .fetch_one(&pools.source) 238 + .await?; 239 + let total_scrobbles = total_scrobbles.0; 240 + tracing::info!(total = %total_scrobbles.magenta(), "Total scrobbles to sync"); 186 241 187 - let start = 0; 188 - let mut i = 1; 242 + let start = 0; 243 + let mut i = 1; 189 244 190 - for offset in (start..total_playlists).step_by(BATCH_SIZE) { 191 - let playlists = 192 - repo::playlist::get_playlists(&pool_clone, offset as i64, BATCH_SIZE as i64) 193 - .await?; 194 - tracing::info!( 195 - offset = %offset.magenta(), 196 - end = %((offset + playlists.len() as i64).min(total_playlists)).magenta(), 197 - total = %total_playlists.magenta(), 198 - "Fetched playlists" 199 - ); 245 + for offset in (start..total_scrobbles).step_by(BATCH_SIZE) { 246 + let scrobbles = 247 + repo::scrobble::get_scrobbles(&pools.source, offset as i64, BATCH_SIZE as i64).await?; 248 + tracing::info!( 249 + offset = %offset.magenta(), 250 + end = %((offset + scrobbles.len() as i64).min(total_scrobbles)).magenta(), 251 + total = %total_scrobbles.magenta(), 252 + "Fetched scrobbles" 253 + ); 200 254 201 - for playlist in &playlists { 202 - tracing::info!(name = %playlist.name.cyan(), i = %i.magenta(), total = %total_playlists.magenta(), "Inserting playlist"); 203 - match repo::playlist::insert_playlist(&dest_pool_clone, playlist).await { 204 - Ok(_) => {} 205 - Err(e) => { 206 - tracing::error!(error = %e, "Failed to insert playlist"); 207 - } 255 + for scrobble in &scrobbles { 256 + tracing::info!(user_id = %scrobble.user_id.cyan(), track_id = %scrobble.track_id.magenta(), i = %i.magenta(), total = %total_scrobbles.magenta(), "Inserting scrobble"); 257 + match repo::scrobble::insert_scrobble(&pools.destination, scrobble).await { 258 + Ok(_) => {} 259 + Err(e) => { 260 + tracing::error!(error = %e, "Failed to insert scrobble"); 208 261 } 209 - i += 1; 210 262 } 263 + i += 1; 211 264 } 212 - Ok::<(), Error>(()) 213 - }); 265 + } 266 + Ok(()) 267 + } 214 268 215 - let pool_clone = pool.clone(); 216 - let dest_pool_clone = dest_pool.clone(); 217 - let loved_track_sync = tokio::spawn(async move { 218 - let total_loved_tracks: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM loved_tracks") 219 - .fetch_one(&pool_clone) 220 - .await?; 221 - let total_loved_tracks = total_loved_tracks.0; 222 - tracing::info!(total = %total_loved_tracks.magenta(), "Total loved tracks to sync"); 269 + async fn sync_album_tracks(pools: &DatabasePools) -> Result<(), Error> { 270 + let total_album_tracks: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM album_tracks") 271 + .fetch_one(&pools.source) 272 + .await?; 273 + let total_album_tracks = total_album_tracks.0; 274 + tracing::info!(total = %total_album_tracks.magenta(), "Total album tracks to sync"); 223 275 224 - const BATCH_SIZE: usize = 1000; 276 + let start = 0; 277 + let mut i = 1; 225 278 226 - let start = 0; 227 - let mut i = 1; 228 - 229 - for offset in (start..total_loved_tracks).step_by(BATCH_SIZE) { 230 - let loved_tracks = 231 - repo::loved_track::get_loved_tracks(&pool_clone, offset as i64, BATCH_SIZE as i64) 232 - .await?; 233 - tracing::info!( 234 - offset = %offset.magenta(), 235 - end = %((offset + loved_tracks.len() as i64).min(total_loved_tracks)).magenta(), 236 - total = %total_loved_tracks.magenta(), 237 - "Fetched loved tracks" 238 - ); 279 + for offset in (start..total_album_tracks).step_by(BATCH_SIZE) { 280 + let album_tracks = 281 + repo::album::get_album_tracks(&pools.source, offset as i64, BATCH_SIZE as i64).await?; 282 + tracing::info!( 283 + offset = %offset.magenta(), 284 + end = %((offset + album_tracks.len() as i64).min(total_album_tracks)).magenta(), 285 + total = %total_album_tracks.magenta(), 286 + "Fetched album tracks" 287 + ); 239 288 240 - for loved_track in &loved_tracks { 241 - tracing::info!(user_id = %loved_track.user_id.cyan(), track_id = %loved_track.track_id.magenta(), i = %i.magenta(), total = %total_loved_tracks.magenta(), "Inserting loved track"); 242 - match repo::loved_track::insert_loved_track(&dest_pool_clone, loved_track).await { 243 - Ok(_) => {} 244 - Err(e) => { 245 - tracing::error!(error = %e, "Failed to insert loved track"); 246 - } 289 + for album_track in &album_tracks { 290 + tracing::info!(album_id = %album_track.album_id.cyan(), track_id = %album_track.track_id.magenta(), i = %i.magenta(), total = %total_album_tracks.magenta(), "Inserting album track"); 291 + match repo::album::insert_album_track(&pools.destination, album_track).await { 292 + Ok(_) => {} 293 + Err(e) => { 294 + tracing::error!(error = %e, "Failed to insert album track"); 247 295 } 248 - i += 1; 249 296 } 297 + i += 1; 250 298 } 251 - Ok::<(), Error>(()) 252 - }); 299 + } 300 + Ok(()) 301 + } 253 302 254 - let pool_clone = pool.clone(); 255 - let dest_pool_clone = dest_pool.clone(); 303 + async fn sync_artist_albums(pools: &DatabasePools) -> Result<(), Error> { 304 + let total_artist_albums: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM artist_albums") 305 + .fetch_one(&pools.source) 306 + .await?; 307 + let total_artist_albums = total_artist_albums.0; 308 + tracing::info!(total = %total_artist_albums.magenta(), "Total artist albums to sync"); 256 309 257 - let scrobble_sync = tokio::spawn(async move { 258 - let total_scrobbles: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM scrobbles") 259 - .fetch_one(&pool_clone) 260 - .await?; 261 - let total_scrobbles = total_scrobbles.0; 262 - tracing::info!(total = %total_scrobbles.magenta(), "Total scrobbles to sync"); 310 + let start = 0; 311 + let mut i = 1; 263 312 264 - const BATCH_SIZE: usize = 1000; 313 + for offset in (start..total_artist_albums).step_by(BATCH_SIZE) { 314 + let artist_albums = 315 + repo::artist::get_artist_albums(&pools.source, offset as i64, BATCH_SIZE as i64) 316 + .await?; 317 + tracing::info!( 318 + offset = %offset.magenta(), 319 + end = %((offset + artist_albums.len() as i64).min(total_artist_albums)).magenta(), 320 + total = %total_artist_albums.magenta(), 321 + "Fetched artist albums" 322 + ); 265 323 266 - let start = 0; 267 - let mut i = 1; 324 + for artist_album in &artist_albums { 325 + tracing::info!(artist_id = %artist_album.artist_id.cyan(), album_id = %artist_album.album_id.magenta(), i = %i.magenta(), total = %total_artist_albums.magenta(), "Inserting artist album"); 326 + match repo::artist::insert_artist_album(&pools.destination, artist_album).await { 327 + Ok(_) => {} 328 + Err(e) => { 329 + tracing::error!(error = %e, "Failed to insert artist album"); 330 + } 331 + } 332 + i += 1; 333 + } 334 + } 335 + Ok(()) 336 + } 268 337 269 - for offset in (start..total_scrobbles).step_by(BATCH_SIZE) { 270 - let scrobbles = 271 - repo::scrobble::get_scrobbles(&pool_clone, offset as i64, BATCH_SIZE as i64) 272 - .await?; 273 - tracing::info!( 274 - offset = %offset.magenta(), 275 - end = %((offset + scrobbles.len() as i64).min(total_scrobbles)).magenta(), 276 - total = %total_scrobbles.magenta(), 277 - "Fetched scrobbles" 278 - ); 338 + async fn sync_artist_tracks(pools: &DatabasePools) -> Result<(), Error> { 339 + let total_artist_tracks: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM artist_tracks") 340 + .fetch_one(&pools.source) 341 + .await?; 342 + let total_artist_tracks = total_artist_tracks.0; 343 + tracing::info!(total = %total_artist_tracks.magenta(), "Total artist tracks to sync"); 344 + 345 + let start = 0; 346 + let mut i = 1; 347 + 348 + for offset in (start..total_artist_tracks).step_by(BATCH_SIZE) { 349 + let artist_tracks = 350 + repo::artist::get_artist_tracks(&pools.source, offset as i64, BATCH_SIZE as i64) 351 + .await?; 352 + tracing::info!( 353 + offset = %offset.magenta(), 354 + end = %((offset + artist_tracks.len() as i64).min(total_artist_tracks)).magenta(), 355 + total = %total_artist_tracks.magenta(), 356 + "Fetched artist tracks" 357 + ); 279 358 280 - for scrobble in &scrobbles { 281 - tracing::info!(user_id = %scrobble.user_id.cyan(), track_id = %scrobble.track_id.magenta(), i = %i.magenta(), total = %total_scrobbles.magenta(), "Inserting scrobble"); 282 - match repo::scrobble::insert_scrobble(&dest_pool_clone, scrobble).await { 283 - Ok(_) => {} 284 - Err(e) => { 285 - tracing::error!(error = %e, "Failed to insert scrobble"); 286 - } 359 + for artist_track in &artist_tracks { 360 + tracing::info!(artist_id = %artist_track.artist_id.cyan(), track_id = %artist_track.track_id.magenta(), i = %i.magenta(), total = %total_artist_tracks.magenta(), "Inserting artist track"); 361 + match repo::artist::insert_artist_track(&pools.destination, artist_track).await { 362 + Ok(_) => {} 363 + Err(e) => { 364 + tracing::error!(error = %e, "Failed to insert artist track"); 287 365 } 288 - i += 1; 289 366 } 367 + i += 1; 290 368 } 291 - Ok::<(), Error>(()) 292 - }); 369 + } 370 + Ok(()) 371 + } 293 372 294 - let (loved_track_sync, playlist_sync, scrobble_sync) = 295 - tokio::join!(loved_track_sync, playlist_sync, scrobble_sync); 296 - loved_track_sync.context("Loved track sync task failed")??; 297 - playlist_sync.context("Playlist sync task failed")??; 298 - scrobble_sync.context("Scrobble sync task failed")??; 373 + async fn sync_playlist_tracks(pools: &DatabasePools) -> Result<(), Error> { 374 + let total_playlist_tracks: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM playlist_tracks") 375 + .fetch_one(&pools.source) 376 + .await?; 377 + let total_playlist_tracks = total_playlist_tracks.0; 378 + tracing::info!(total = %total_playlist_tracks.magenta(), "Total playlist tracks to sync"); 299 379 300 - let pool_clone = pool.clone(); 301 - let dest_pool_clone = dest_pool.clone(); 302 - let album_track_sync = tokio::spawn(async move { 303 - let total_album_tracks: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM album_tracks") 304 - .fetch_one(&pool_clone) 305 - .await?; 306 - let total_album_tracks = total_album_tracks.0; 307 - tracing::info!(total = %total_album_tracks.magenta(), "Total album tracks to sync"); 380 + let start = 0; 381 + let mut i = 1; 308 382 309 - const BATCH_SIZE: usize = 1000; 383 + for offset in (start..total_playlist_tracks).step_by(BATCH_SIZE) { 384 + let playlist_tracks = 385 + repo::playlist::get_playlist_tracks(&pools.source, offset as i64, BATCH_SIZE as i64) 386 + .await?; 387 + tracing::info!( 388 + offset = %offset.magenta(), 389 + end = %((offset + playlist_tracks.len() as i64).min(total_playlist_tracks)).magenta(), 390 + total = %total_playlist_tracks.magenta(), 391 + "Fetched playlist tracks" 392 + ); 310 393 311 - let start = 0; 312 - let mut i = 1; 313 - 314 - for offset in (start..total_album_tracks).step_by(BATCH_SIZE) { 315 - let album_tracks = 316 - repo::album::get_album_tracks(&pool_clone, offset as i64, BATCH_SIZE as i64) 317 - .await?; 318 - tracing::info!( 319 - offset = %offset.magenta(), 320 - end = %((offset + album_tracks.len() as i64).min(total_album_tracks)).magenta(), 321 - total = %total_album_tracks.magenta(), 322 - "Fetched album tracks" 323 - ); 324 - 325 - for album_track in &album_tracks { 326 - tracing::info!(album_id = %album_track.album_id.cyan(), track_id = %album_track.track_id.magenta(), i = %i.magenta(), total = %total_album_tracks.magenta(), "Inserting album track"); 327 - match repo::album::insert_album_track(&dest_pool_clone, album_track).await { 328 - Ok(_) => {} 329 - Err(e) => { 330 - tracing::error!(error = %e, "Failed to insert album track"); 331 - } 394 + for playlist_track in &playlist_tracks { 395 + tracing::info!(playlist_id = %playlist_track.playlist_id.cyan(), track_id = %playlist_track.track_id.magenta(), i = %i.magenta(), total = %total_playlist_tracks.magenta(), "Inserting playlist track"); 396 + match repo::playlist::insert_playlist_track(&pools.destination, playlist_track).await { 397 + Ok(_) => {} 398 + Err(e) => { 399 + tracing::error!(error = %e, "Failed to insert playlist track"); 332 400 } 333 - i += 1; 334 401 } 402 + i += 1; 335 403 } 336 - Ok::<(), Error>(()) 337 - }); 404 + } 405 + Ok(()) 406 + } 338 407 339 - let pool_clone = pool.clone(); 340 - let dest_pool_clone = dest_pool.clone(); 341 - let artist_album_sync = tokio::spawn(async move { 342 - let total_artist_albums: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM artist_albums") 343 - .fetch_one(&pool_clone) 344 - .await?; 345 - let total_artist_albums = total_artist_albums.0; 346 - tracing::info!(total = %total_artist_albums.magenta(), "Total artist albums to sync"); 408 + async fn sync_user_albums(pools: &DatabasePools) -> Result<(), Error> { 409 + let total_user_albums: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM user_albums") 410 + .fetch_one(&pools.source) 411 + .await?; 412 + let total_user_albums = total_user_albums.0; 413 + tracing::info!(total = %total_user_albums.magenta(), "Total user albums to sync"); 347 414 348 - const BATCH_SIZE: usize = 1000; 415 + let start = 0; 416 + let mut i = 1; 349 417 350 - let start = 0; 351 - let mut i = 1; 418 + for offset in (start..total_user_albums).step_by(BATCH_SIZE) { 419 + let user_albums = 420 + repo::album::get_user_albums(&pools.source, offset as i64, BATCH_SIZE as i64).await?; 421 + tracing::info!( 422 + offset = %offset.magenta(), 423 + end = %((offset + user_albums.len() as i64).min(total_user_albums)).magenta(), 424 + total = %total_user_albums.magenta(), 425 + "Fetched user albums" 426 + ); 352 427 353 - for offset in (start..total_artist_albums).step_by(BATCH_SIZE) { 354 - let artist_albums = 355 - repo::artist::get_artist_albums(&pool_clone, offset as i64, BATCH_SIZE as i64) 356 - .await?; 357 - tracing::info!( 358 - offset = %offset.magenta(), 359 - end = %((offset + artist_albums.len() as i64).min(total_artist_albums)).magenta(), 360 - total = %total_artist_albums.magenta(), 361 - "Fetched artist albums" 362 - ); 363 - 364 - for artist_album in &artist_albums { 365 - tracing::info!(artist_id = %artist_album.artist_id.cyan(), album_id = %artist_album.album_id.magenta(), i = %i.magenta(), total = %total_artist_albums.magenta(), "Inserting artist album"); 366 - match repo::artist::insert_artist_album(&dest_pool_clone, artist_album).await { 367 - Ok(_) => {} 368 - Err(e) => { 369 - tracing::error!(error = %e, "Failed to insert artist album"); 370 - } 428 + for user_album in &user_albums { 429 + tracing::info!(user_id = %user_album.user_id.cyan(), album_id = %user_album.album_id.magenta(), i = %i.magenta(), total = %total_user_albums.magenta(), "Inserting user album"); 430 + match repo::album::insert_user_album(&pools.destination, user_album).await { 431 + Ok(_) => {} 432 + Err(e) => { 433 + tracing::error!(error = %e, "Failed to insert user album"); 371 434 } 372 - i += 1; 373 435 } 436 + i += 1; 374 437 } 375 - Ok::<(), Error>(()) 376 - }); 438 + } 439 + Ok(()) 440 + } 377 441 378 - let pool_clone = pool.clone(); 379 - let dest_pool_clone = dest_pool.clone(); 380 - let artist_track_sync = tokio::spawn(async move { 381 - let total_artist_tracks: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM artist_tracks") 382 - .fetch_one(&pool_clone) 383 - .await?; 384 - let total_artist_tracks = total_artist_tracks.0; 385 - tracing::info!(total = %total_artist_tracks.magenta(), "Total artist tracks to sync"); 386 - const BATCH_SIZE: usize = 1000; 442 + async fn sync_user_artists(pools: &DatabasePools) -> Result<(), Error> { 443 + let total_user_artists: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM user_artists") 444 + .fetch_one(&pools.source) 445 + .await?; 446 + let total_user_artists = total_user_artists.0; 447 + tracing::info!(total = %total_user_artists.magenta(), "Total user artists to sync"); 387 448 388 - let start = 0; 389 - let mut i = 1; 449 + let start = 0; 450 + let mut i = 1; 390 451 391 - for offset in (start..total_artist_tracks).step_by(BATCH_SIZE) { 392 - let artist_tracks = 393 - repo::artist::get_artist_tracks(&pool_clone, offset as i64, BATCH_SIZE as i64) 394 - .await?; 395 - tracing::info!( 396 - offset = %offset.magenta(), 397 - end = %((offset + artist_tracks.len() as i64).min(total_artist_tracks)).magenta(), 398 - total = %total_artist_tracks.magenta(), 399 - "Fetched artist tracks" 400 - ); 452 + for offset in (start..total_user_artists).step_by(BATCH_SIZE) { 453 + let user_artists = 454 + repo::artist::get_user_artists(&pools.source, offset as i64, BATCH_SIZE as i64).await?; 455 + tracing::info!( 456 + offset = %offset.magenta(), 457 + end = %((offset + user_artists.len() as i64).min(total_user_artists)).magenta(), 458 + total = %total_user_artists.magenta(), 459 + "Fetched user artists" 460 + ); 401 461 402 - for artist_track in &artist_tracks { 403 - tracing::info!(artist_id = %artist_track.artist_id.cyan(), track_id = %artist_track.track_id.magenta(), i = %i.magenta(), total = %total_artist_tracks.magenta(), "Inserting artist track"); 404 - match repo::artist::insert_artist_track(&dest_pool_clone, artist_track).await { 405 - Ok(_) => {} 406 - Err(e) => { 407 - tracing::error!(error = %e, "Failed to insert artist track"); 408 - } 462 + for user_artist in &user_artists { 463 + tracing::info!(user_id = %user_artist.user_id.cyan(), artist_id = %user_artist.artist_id.magenta(), i = %i.magenta(), total = %total_user_artists.magenta(), "Inserting user artist"); 464 + match repo::artist::insert_user_artist(&pools.destination, user_artist).await { 465 + Ok(_) => {} 466 + Err(e) => { 467 + tracing::error!(error = %e, "Failed to insert user artist"); 409 468 } 410 - i += 1; 411 469 } 470 + i += 1; 412 471 } 413 - Ok::<(), Error>(()) 414 - }); 472 + } 473 + Ok(()) 474 + } 415 475 416 - let pool_clone = pool.clone(); 417 - let dest_pool_clone = dest_pool.clone(); 418 - let playlist_track_sync = tokio::spawn(async move { 419 - let total_playlist_tracks: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM playlist_tracks") 420 - .fetch_one(&pool_clone) 421 - .await?; 422 - let total_playlist_tracks = total_playlist_tracks.0; 423 - tracing::info!(total = %total_playlist_tracks.magenta(), "Total playlist tracks to sync"); 424 - 425 - const BATCH_SIZE: usize = 1000; 476 + async fn sync_user_tracks(pools: &DatabasePools) -> Result<(), Error> { 477 + let total_user_tracks: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM user_tracks") 478 + .fetch_one(&pools.source) 479 + .await?; 480 + let total_user_tracks = total_user_tracks.0; 481 + tracing::info!(total = %total_user_tracks.magenta(), "Total user tracks to sync"); 426 482 427 - let start = 0; 428 - let mut i = 1; 483 + let start = 0; 484 + let mut i = 1; 429 485 430 - for offset in (start..total_playlist_tracks).step_by(BATCH_SIZE) { 431 - let playlist_tracks = 432 - repo::playlist::get_playlist_tracks(&pool_clone, offset as i64, BATCH_SIZE as i64) 433 - .await?; 434 - tracing::info!( 435 - offset = %offset.magenta(), 436 - end = %((offset + playlist_tracks.len() as i64).min(total_playlist_tracks)).magenta(), 437 - total = %total_playlist_tracks.magenta(), 438 - "Fetched playlist tracks" 439 - ); 486 + for offset in (start..total_user_tracks).step_by(BATCH_SIZE) { 487 + let user_tracks = 488 + repo::track::get_user_tracks(&pools.source, offset as i64, BATCH_SIZE as i64).await?; 489 + tracing::info!( 490 + offset = %offset.magenta(), 491 + end = %((offset + user_tracks.len() as i64).min(total_user_tracks)).magenta(), 492 + total = %total_user_tracks.magenta(), 493 + "Fetched user tracks" 494 + ); 440 495 441 - for playlist_track in &playlist_tracks { 442 - tracing::info!(playlist_id = %playlist_track.playlist_id.cyan(), track_id = %playlist_track.track_id.magenta(), i = %i.magenta(), total = %total_playlist_tracks.magenta(), "Inserting playlist track"); 443 - match repo::playlist::insert_playlist_track(&dest_pool_clone, playlist_track).await 444 - { 445 - Ok(_) => {} 446 - Err(e) => { 447 - tracing::error!(error = %e, "Failed to insert playlist track"); 448 - } 496 + for user_track in &user_tracks { 497 + tracing::info!(user_id = %user_track.user_id.cyan(), track_id = %user_track.track_id.magenta(), i = %i.magenta(), total = %total_user_tracks.magenta(), "Inserting user track"); 498 + match repo::track::insert_user_track(&pools.destination, user_track).await { 499 + Ok(_) => {} 500 + Err(e) => { 501 + tracing::error!(error = %e, "Failed to insert user track"); 449 502 } 450 - i += 1; 451 503 } 504 + i += 1; 452 505 } 453 - Ok::<(), Error>(()) 454 - }); 506 + } 507 + Ok(()) 508 + } 455 509 456 - let pool_clone = pool.clone(); 457 - let dest_pool_clone = dest_pool.clone(); 458 - let user_album_sync = tokio::spawn(async move { 459 - let total_user_albums: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM user_albums") 460 - .fetch_one(&pool_clone) 461 - .await?; 462 - let total_user_albums = total_user_albums.0; 463 - tracing::info!(total = %total_user_albums.magenta(), "Total user albums to sync"); 464 - const BATCH_SIZE: usize = 1000; 510 + async fn sync_user_playlists(pools: &DatabasePools) -> Result<(), Error> { 511 + let total_user_playlists: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM user_playlists") 512 + .fetch_one(&pools.source) 513 + .await?; 514 + let total_user_playlists = total_user_playlists.0; 515 + tracing::info!(total = %total_user_playlists.magenta(), "Total user playlists to sync"); 465 516 466 - let start = 0; 467 - let mut i = 1; 517 + let start = 0; 518 + let mut i = 1; 468 519 469 - for offset in (start..total_user_albums).step_by(BATCH_SIZE) { 470 - let user_albums = 471 - repo::album::get_user_albums(&pool_clone, offset as i64, BATCH_SIZE as i64).await?; 472 - tracing::info!( 473 - offset = %offset.magenta(), 474 - end = %((offset + user_albums.len() as i64).min(total_user_albums)).magenta(), 475 - total = %total_user_albums.magenta(), 476 - "Fetched user albums" 477 - ); 520 + for offset in (start..total_user_playlists).step_by(BATCH_SIZE) { 521 + let user_playlists = 522 + repo::playlist::get_user_playlists(&pools.source, offset as i64, BATCH_SIZE as i64) 523 + .await?; 524 + tracing::info!( 525 + offset = %offset.magenta(), 526 + end = %((offset + user_playlists.len() as i64).min(total_user_playlists)).magenta(), 527 + total = %total_user_playlists.magenta(), 528 + "Fetched user playlists" 529 + ); 478 530 479 - for user_album in &user_albums { 480 - tracing::info!(user_id = %user_album.user_id.cyan(), album_id = %user_album.album_id.magenta(), i = %i.magenta(), total = %total_user_albums.magenta(), "Inserting user album"); 481 - match repo::album::insert_user_album(&dest_pool_clone, user_album).await { 482 - Ok(_) => {} 483 - Err(e) => { 484 - tracing::error!(error = %e, "Failed to insert user album"); 485 - } 531 + for user_playlist in &user_playlists { 532 + tracing::info!(user_id = %user_playlist.user_id.cyan(), playlist_id = %user_playlist.playlist_id.magenta(), i = %i.magenta(), total = %total_user_playlists.magenta(), "Inserting user playlist"); 533 + match repo::playlist::insert_user_playlist(&pools.destination, user_playlist).await { 534 + Ok(_) => {} 535 + Err(e) => { 536 + tracing::error!(error = %e, "Failed to insert user playlist"); 486 537 } 487 - i += 1; 488 538 } 539 + i += 1; 489 540 } 490 - Ok::<(), Error>(()) 541 + } 542 + Ok(()) 543 + } 544 + 545 + pub async fn pull_data() -> Result<(), Error> { 546 + let pools = setup_database_pools().await?; 547 + 548 + // Sync core entities first 549 + let album_sync = tokio::spawn({ 550 + let pools = pools.clone(); 551 + async move { sync_albums(&pools).await } 491 552 }); 492 553 493 - let pool_clone = pool.clone(); 494 - let dest_pool_clone = dest_pool.clone(); 495 - let user_artist_sync = tokio::spawn(async move { 496 - let total_user_artists: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM user_artists") 497 - .fetch_one(&pool_clone) 498 - .await?; 499 - let total_user_artists = total_user_artists.0; 500 - tracing::info!(total = %total_user_artists.magenta(), "Total user artists to sync"); 501 - const BATCH_SIZE: usize = 1000; 554 + let artist_sync = tokio::spawn({ 555 + let pools = pools.clone(); 556 + async move { sync_artists(&pools).await } 557 + }); 502 558 503 - let start = 0; 504 - let mut i = 1; 559 + let track_sync = tokio::spawn({ 560 + let pools = pools.clone(); 561 + async move { sync_tracks(&pools).await } 562 + }); 563 + 564 + let user_sync = tokio::spawn({ 565 + let pools = pools.clone(); 566 + async move { sync_users(&pools).await } 567 + }); 568 + 569 + let (album_sync, artist_sync, track_sync, user_sync) = 570 + tokio::join!(album_sync, artist_sync, track_sync, user_sync); 571 + 572 + album_sync.context("Album sync task failed")??; 573 + artist_sync.context("Artist sync task failed")??; 574 + track_sync.context("Track sync task failed")??; 575 + user_sync.context("User sync task failed")??; 576 + 577 + // Sync relationship entities 578 + let playlist_sync = tokio::spawn({ 579 + let pools = pools.clone(); 580 + async move { sync_playlists(&pools).await } 581 + }); 505 582 506 - for offset in (start..total_user_artists).step_by(BATCH_SIZE) { 507 - let user_artists = 508 - repo::artist::get_user_artists(&pool_clone, offset as i64, BATCH_SIZE as i64) 509 - .await?; 510 - tracing::info!( 511 - offset = %offset.magenta(), 512 - end = %((offset + user_artists.len() as i64).min(total_user_artists)).magenta(), 513 - total = %total_user_artists.magenta(), 514 - "Fetched user artists" 515 - ); 583 + let loved_track_sync = tokio::spawn({ 584 + let pools = pools.clone(); 585 + async move { sync_loved_tracks(&pools).await } 586 + }); 516 587 517 - for user_artist in &user_artists { 518 - tracing::info!(user_id = %user_artist.user_id.cyan(), artist_id = %user_artist.artist_id.magenta(), i = %i.magenta(), total = %total_user_artists.magenta(), "Inserting user artist"); 519 - match repo::artist::insert_user_artist(&dest_pool_clone, user_artist).await { 520 - Ok(_) => {} 521 - Err(e) => { 522 - tracing::error!(error = %e, "Failed to insert user artist"); 523 - } 524 - } 525 - i += 1; 526 - } 527 - } 528 - Ok::<(), Error>(()) 588 + let scrobble_sync = tokio::spawn({ 589 + let pools = pools.clone(); 590 + async move { sync_scrobbles(&pools).await } 529 591 }); 530 592 531 - let pool_clone = pool.clone(); 532 - let dest_pool_clone = dest_pool.clone(); 533 - let user_track_sync = tokio::spawn(async move { 534 - let total_user_tracks: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM user_tracks") 535 - .fetch_one(&pool_clone) 536 - .await?; 537 - let total_user_tracks = total_user_tracks.0; 538 - tracing::info!(total = %total_user_tracks.magenta(), "Total user tracks to sync"); 539 - const BATCH_SIZE: usize = 1000; 593 + let (loved_track_sync, playlist_sync, scrobble_sync) = 594 + tokio::join!(loved_track_sync, playlist_sync, scrobble_sync); 595 + loved_track_sync.context("Loved track sync task failed")??; 596 + playlist_sync.context("Playlist sync task failed")??; 597 + scrobble_sync.context("Scrobble sync task failed")??; 540 598 541 - let start = 0; 542 - let mut i = 1; 599 + // Sync junction tables 600 + let album_track_sync = tokio::spawn({ 601 + let pools = pools.clone(); 602 + async move { sync_album_tracks(&pools).await } 603 + }); 543 604 544 - for offset in (start..total_user_tracks).step_by(BATCH_SIZE) { 545 - let user_tracks = 546 - repo::track::get_user_tracks(&pool_clone, offset as i64, BATCH_SIZE as i64).await?; 547 - tracing::info!( 548 - offset = %offset.magenta(), 549 - end = %((offset + user_tracks.len() as i64).min(total_user_tracks)).magenta(), 550 - total = %total_user_tracks.magenta(), 551 - "Fetched user tracks" 552 - ); 605 + let artist_album_sync = tokio::spawn({ 606 + let pools = pools.clone(); 607 + async move { sync_artist_albums(&pools).await } 608 + }); 553 609 554 - for user_track in &user_tracks { 555 - tracing::info!(user_id = %user_track.user_id.cyan(), track_id = %user_track.track_id.magenta(), i = %i.magenta(), total = %total_user_tracks.magenta(), "Inserting user track"); 556 - match repo::track::insert_user_track(&dest_pool_clone, user_track).await { 557 - Ok(_) => {} 558 - Err(e) => { 559 - tracing::error!(error = %e, "Failed to insert user track"); 560 - } 561 - } 562 - i += 1; 563 - } 564 - } 565 - Ok::<(), Error>(()) 610 + let artist_track_sync = tokio::spawn({ 611 + let pools = pools.clone(); 612 + async move { sync_artist_tracks(&pools).await } 566 613 }); 567 614 568 - let pool_clone = pool.clone(); 569 - let dest_pool_clone = dest_pool.clone(); 570 - let user_playlist_sync = tokio::spawn(async move { 571 - let total_user_playlists: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM user_playlists") 572 - .fetch_one(&pool_clone) 573 - .await?; 615 + let playlist_track_sync = tokio::spawn({ 616 + let pools = pools.clone(); 617 + async move { sync_playlist_tracks(&pools).await } 618 + }); 574 619 575 - let total_user_playlists = total_user_playlists.0; 576 - tracing::info!(total = %total_user_playlists.magenta(), "Total user playlists to sync"); 577 - const BATCH_SIZE: usize = 1000; 620 + let user_album_sync = tokio::spawn({ 621 + let pools = pools.clone(); 622 + async move { sync_user_albums(&pools).await } 623 + }); 578 624 579 - let start = 0; 580 - let mut i = 1; 625 + let user_artist_sync = tokio::spawn({ 626 + let pools = pools.clone(); 627 + async move { sync_user_artists(&pools).await } 628 + }); 581 629 582 - for offset in (start..total_user_playlists).step_by(BATCH_SIZE) { 583 - let user_playlists = 584 - repo::playlist::get_user_playlists(&pool_clone, offset as i64, BATCH_SIZE as i64) 585 - .await?; 586 - tracing::info!( 587 - offset = %offset.magenta(), 588 - end = %((offset + user_playlists.len() as i64).min(total_user_playlists)).magenta(), 589 - total = %total_user_playlists.magenta(), 590 - "Fetched user playlists" 591 - ); 630 + let user_track_sync = tokio::spawn({ 631 + let pools = pools.clone(); 632 + async move { sync_user_tracks(&pools).await } 633 + }); 592 634 593 - for user_playlist in &user_playlists { 594 - tracing::info!(user_id = %user_playlist.user_id.cyan(), playlist_id = %user_playlist.playlist_id.magenta(), i = %i.magenta(), total = %total_user_playlists.magenta(), "Inserting user playlist"); 595 - match repo::playlist::insert_user_playlist(&dest_pool_clone, user_playlist).await { 596 - Ok(_) => {} 597 - Err(e) => { 598 - tracing::error!(error = %e, "Failed to insert user playlist"); 599 - } 600 - } 601 - i += 1; 602 - } 603 - } 604 - Ok::<(), Error>(()) 635 + let user_playlist_sync = tokio::spawn({ 636 + let pools = pools.clone(); 637 + async move { sync_user_playlists(&pools).await } 605 638 }); 606 639 607 640 let (