#!/usr/bin/env -S uv run --script --quiet --with-editable=backend
# /// script
# requires-python = ">=3.12"
# dependencies = [
#     "httpx",
#     "pydantic-settings",
#     "asyncpg",
#     "sqlalchemy[asyncio]",
# ]
# ///
"""migrate sensitive images from backend database to moderation service.

this script reads sensitive images from the backend database and creates
them in the moderation service. after migration, the backend will proxy
sensitive image requests to the moderation service.

usage:
    uv run scripts/migrate_sensitive_images.py --env prod --dry-run
    uv run scripts/migrate_sensitive_images.py --env prod

environment variables (set in .env or export):
    PROD_DATABASE_URL - production database connection string
    STAGING_DATABASE_URL - staging database connection string
    DEV_DATABASE_URL - development database connection string
    MODERATION_SERVICE_URL - URL of moderation service (used for --env prod)
    STAGING_MODERATION_URL - URL of moderation service for --env staging
    DEV_MODERATION_URL - URL of moderation service for --env dev
    MODERATION_AUTH_TOKEN - auth token for moderation service

exit status:
    0 when every image was migrated (or there was nothing to migrate),
    1 when the auth token is missing or at least one image failed.
"""

import argparse
import asyncio
from typing import Any, Literal

import httpx
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine

Environment = Literal["dev", "staging", "prod"]


class MigrationSettings(BaseSettings):
    """settings for migration script.

    every value is read from the process environment or from `.env`;
    unknown keys in `.env` are ignored.
    """

    model_config = SettingsConfigDict(
        env_file=".env",
        case_sensitive=False,
        extra="ignore",
    )

    dev_database_url: str = Field(default="", validation_alias="DEV_DATABASE_URL")
    staging_database_url: str = Field(
        default="", validation_alias="STAGING_DATABASE_URL"
    )
    prod_database_url: str = Field(default="", validation_alias="PROD_DATABASE_URL")

    # prod moderation service URL
    moderation_service_url: str = Field(
        default="https://moderation.plyr.fm",
        validation_alias="MODERATION_SERVICE_URL",
    )
    # declared as settings (rather than read with os.getenv) so that values
    # placed in .env are honoured exactly like the other variables
    dev_moderation_url: str = Field(
        default="http://localhost:8002",
        validation_alias="DEV_MODERATION_URL",
    )
    staging_moderation_url: str = Field(
        default="https://moderation-stg.plyr.fm",
        validation_alias="STAGING_MODERATION_URL",
    )
    moderation_auth_token: str = Field(
        default="", validation_alias="MODERATION_AUTH_TOKEN"
    )

    def get_database_url(self, env: Environment) -> str:
        """get database URL for environment.

        args:
            env: which environment's database to connect to.

        returns:
            the configured URL, rewritten to use the asyncpg driver when it
            has a bare `postgresql://` or `postgres://` scheme.

        raises:
            ValueError: if no URL is configured for `env`.
        """
        urls = {
            "dev": self.dev_database_url,
            "staging": self.staging_database_url,
            "prod": self.prod_database_url,
        }
        url = urls.get(env, "")
        if not url:
            raise ValueError(f"no database URL configured for {env}")
        # ensure asyncpg driver is used; `postgres://` is a common alias that
        # sqlalchemy does not accept, so it is rewritten as well
        for scheme in ("postgresql://", "postgres://"):
            if url.startswith(scheme):
                return "postgresql+asyncpg://" + url.removeprefix(scheme)
        return url

    def get_moderation_url(self, env: Environment) -> str:
        """get moderation service URL for environment."""
        if env == "dev":
            return self.dev_moderation_url
        if env == "staging":
            return self.staging_moderation_url
        return self.moderation_service_url


async def fetch_sensitive_images(db_url: str) -> list[dict[str, Any]]:
    """fetch all sensitive images from backend database.

    args:
        db_url: sqlalchemy async database URL.

    returns:
        one dict per row of `sensitive_images`, ordered by id, with keys
        `id`, `image_id`, `url`, `reason`, `flagged_at` (ISO-8601 string or
        None) and `flagged_by`.
    """
    engine = create_async_engine(db_url)

    try:
        # read-only query, so a plain connection is enough
        async with engine.connect() as conn:
            result = await conn.execute(
                text(
                    """
                    SELECT id, image_id, url, reason, flagged_at, flagged_by
                    FROM sensitive_images
                    ORDER BY id
                    """
                )
            )
            rows = result.fetchall()
    finally:
        # release the connection pool even when the query fails
        await engine.dispose()

    return [
        {
            "id": row[0],
            "image_id": row[1],
            "url": row[2],
            "reason": row[3],
            "flagged_at": row[4].isoformat() if row[4] else None,
            "flagged_by": row[5],
        }
        for row in rows
    ]


def _created_id(response: httpx.Response) -> Any:
    """best-effort extraction of the created record id, for logging only."""
    try:
        body = response.json()
    except ValueError:
        return "<unknown>"
    if isinstance(body, dict):
        return body.get("id", "<unknown>")
    return "<unknown>"


async def migrate_to_moderation_service(
    images: list[dict],
    moderation_url: str,
    auth_token: str,
    dry_run: bool = False,
) -> tuple[int, int]:
    """migrate images to moderation service.

    args:
        images: rows as returned by `fetch_sensitive_images`.
        moderation_url: base URL of the moderation service.
        auth_token: value sent in the `X-Moderation-Key` header.
        dry_run: when true, print each payload instead of sending it.

    returns:
        tuple of (success_count, error_count)
    """
    success_count = 0
    error_count = 0

    headers = {"X-Moderation-Key": auth_token}
    # tolerate a trailing slash on the configured base URL
    endpoint = f"{moderation_url.rstrip('/')}/admin/sensitive-images"

    async with httpx.AsyncClient(timeout=30.0) as client:
        for image in images:
            # NOTE(review): `flagged_at` is fetched but not forwarded, so the
            # original timestamp is not carried over - confirm this is intended
            payload = {
                "image_id": image["image_id"],
                "url": image["url"],
                "reason": image["reason"],
                "flagged_by": image["flagged_by"],
            }

            if dry_run:
                print(f" [dry-run] would migrate: {payload}")
                success_count += 1
                continue

            try:
                response = await client.post(
                    endpoint,
                    json=payload,
                    headers=headers,
                )
                response.raise_for_status()
            except Exception as e:
                # deliberate per-row boundary: one bad row must not abort
                # the rest of the migration
                print(f" ERROR migrating id={image['id']}: {e}")
                error_count += 1
                continue

            # the record exists once the request succeeded; an unexpected
            # response body must not turn that into a reported failure
            print(
                f" migrated id={image['id']} -> moderation id={_created_id(response)}"
            )
            success_count += 1

    return success_count, error_count


async def main() -> None:
    """parse arguments, fetch sensitive images and migrate them.

    raises:
        SystemExit: with status 1 when the auth token is missing for a real
            run, or when at least one image failed to migrate.
        ValueError: if no database URL is configured for the environment.
    """
    parser = argparse.ArgumentParser(
        description="migrate sensitive images to moderation service"
    )
    parser.add_argument(
        "--env",
        choices=["dev", "staging", "prod"],
        required=True,
        help="environment to migrate",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="show what would be migrated without making changes",
    )
    args = parser.parse_args()

    settings = MigrationSettings()

    print(f"migrating sensitive images for {args.env}")
    if args.dry_run:
        print("(dry run - no changes will be made)")

    # fetch from backend database
    db_url = settings.get_database_url(args.env)
    print("\nfetching from backend database...")
    images = await fetch_sensitive_images(db_url)
    print(f"found {len(images)} sensitive images")

    if not images:
        print("nothing to migrate")
        return

    # migrate to moderation service
    moderation_url = settings.get_moderation_url(args.env)
    print(f"\nmigrating to moderation service at {moderation_url}...")

    if not settings.moderation_auth_token and not args.dry_run:
        print("ERROR: MODERATION_AUTH_TOKEN not set")
        # non-zero status so callers can tell nothing was migrated
        raise SystemExit(1)

    success, errors = await migrate_to_moderation_service(
        images,
        moderation_url,
        settings.moderation_auth_token,
        dry_run=args.dry_run,
    )

    print(f"\ndone: {success} migrated, {errors} errors")

    if errors:
        # partial migration: signal failure to the caller
        raise SystemExit(1)

    if not args.dry_run:
        print(
            "\nnext steps:\n"
            " 1. verify data in moderation service: GET /sensitive-images\n"
            " 2. update backend to proxy to moderation service\n"
            " 3. optionally drop sensitive_images table from backend db"
        )


if __name__ == "__main__":
    asyncio.run(main())