#!/usr/bin/env -S uv run --script --quiet
"""One-time migration script to copy audio files from old 'relay' bucket to new 'audio-prod' bucket.

This script:
1. Fetches all tracks from the production database
2. Identifies tracks with R2 URLs pointing to the old bucket
3. Copies files from old bucket to new bucket
4. Updates the r2_url column in the database

Usage:
    uv run python scripts/migrate_r2_bucket.py
"""

import asyncio
import logging

import boto3
from botocore.config import Config
from sqlalchemy import select, update
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine

from backend.config import settings
from backend.models.track import Track

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Old and new bucket configuration
OLD_BUCKET = "relay"
OLD_PUBLIC_URL = "https://pub-841ec0f5a7854eaab01292d44aca4820.r2.dev"
NEW_BUCKET = "audio-prod"
NEW_PUBLIC_URL = "https://pub-d4ed8a1e39d44dac85263d86ad5676fd.r2.dev"
R2_ENDPOINT_URL = "https://8feb33b5fb57ce2bc093bc6f4141f40a.r2.cloudflarestorage.com"


def _build_r2_client():
    """Create a boto3 S3 client pointed at the Cloudflare R2 endpoint.

    Checksum calculation/validation is relaxed to WHEN_REQUIRED because R2
    does not support the newer default integrity checksums.
    """
    return boto3.client(
        "s3",
        endpoint_url=R2_ENDPOINT_URL,
        aws_access_key_id=settings.storage.aws_access_key_id,
        aws_secret_access_key=settings.storage.aws_secret_access_key,
        config=Config(
            request_checksum_calculation="WHEN_REQUIRED",
            response_checksum_validation="WHEN_REQUIRED",
        ),
    )


def _copy_track_file(r2_client, old_key: str, new_key: str) -> bool:
    """Copy one object from the old bucket to the new bucket.

    Idempotent: if the object already exists in the new bucket the copy is
    skipped, so the migration can safely be re-run.

    Returns:
        True if the object is present in the new bucket afterwards,
        False if the source object is missing from the old bucket.

    Raises:
        botocore ClientError for any S3 failure other than a 404.
    """
    # Verify the source object exists before attempting the copy.
    try:
        r2_client.head_object(Bucket=OLD_BUCKET, Key=old_key)
    except r2_client.exceptions.ClientError as e:
        if e.response["Error"]["Code"] == "404":
            logger.error("  File not found in old bucket: %s", old_key)
            return False
        raise

    # Skip the copy when the destination already exists (re-run safety).
    try:
        r2_client.head_object(Bucket=NEW_BUCKET, Key=new_key)
        logger.info("  File already exists in new bucket, skipping copy")
    except r2_client.exceptions.ClientError as e:
        if e.response["Error"]["Code"] != "404":
            raise
        logger.info("  Copying file to new bucket...")
        r2_client.copy_object(
            Bucket=NEW_BUCKET,
            Key=new_key,
            CopySource={"Bucket": OLD_BUCKET, "Key": old_key},
        )
        logger.info("  File copied successfully")
    return True


async def main() -> None:
    """Run the R2 bucket migration.

    Finds every track whose r2_url points at the old public bucket URL,
    copies the backing object to the new bucket, and rewrites r2_url.
    Database changes are committed once, after the whole loop.
    """
    logger.info("Starting R2 bucket migration")

    # Fail fast with a clear message when credentials are missing.
    if not all(
        [settings.storage.aws_access_key_id, settings.storage.aws_secret_access_key]
    ):
        logger.error("AWS credentials not found in environment")
        logger.error(
            "Please set AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables"
        )
        return

    r2_client = _build_r2_client()

    # Create database session factory.
    engine = create_async_engine(settings.database.url)
    async_session = async_sessionmaker(engine, expire_on_commit=False)

    try:
        async with async_session() as session:
            # Fetch all tracks still pointing at the old bucket's public URL.
            result = await session.execute(
                select(Track).where(Track.r2_url.like(f"{OLD_PUBLIC_URL}%"))
            )
            tracks = result.scalars().all()

            if not tracks:
                logger.info("No tracks found with old bucket URLs")
                return

            logger.info("Found %d tracks to migrate", len(tracks))

            migrated_count = 0
            failed_count = 0

            for track in tracks:
                try:
                    # Extract the S3 key from the old URL.
                    # Format: {OLD_PUBLIC_URL}/audio/FILE_ID.EXT
                    # removeprefix (not str.replace) so a key that happens to
                    # repeat the prefix substring cannot be mangled.
                    old_key = track.r2_url.removeprefix(f"{OLD_PUBLIC_URL}/")
                    new_key = old_key  # Same key structure in new bucket

                    logger.info("Migrating track %s: %s", track.id, track.title)
                    logger.info("  Old: %s/%s", OLD_BUCKET, old_key)
                    logger.info("  New: %s/%s", NEW_BUCKET, new_key)

                    if not _copy_track_file(r2_client, old_key, new_key):
                        failed_count += 1
                        continue

                    # Update database row with the new public URL.
                    new_url = f"{NEW_PUBLIC_URL}/{new_key}"
                    await session.execute(
                        update(Track).where(Track.id == track.id).values(r2_url=new_url)
                    )
                    logger.info("  Updated database: %s", new_url)
                    migrated_count += 1

                except Exception as e:
                    # Best-effort per track: record the failure and keep going.
                    logger.error("  Failed to migrate track %s: %s", track.id, e)
                    failed_count += 1

            # Commit all database changes at once.
            await session.commit()

            logger.info("")
            logger.info("=" * 60)
            logger.info("Migration complete!")
            logger.info("  Migrated: %d", migrated_count)
            logger.info("  Failed: %d", failed_count)
            logger.info("=" * 60)
    finally:
        # Close the connection pool on every exit path (the original
        # leaked the engine, including on the early no-tracks return).
        await engine.dispose()


if __name__ == "__main__":
    asyncio.run(main())