music on atproto
plyr.fm
1#!/usr/bin/env -S uv run --script --quiet --with-editable=backend
2# /// script
3# requires-python = ">=3.12"
4# dependencies = [
5# "httpx",
6# "pydantic-settings",
7# "asyncpg",
8# "sqlalchemy[asyncio]",
9# ]
10# ///
11"""migrate sensitive images from backend database to moderation service.
12
13this script reads sensitive images from the backend database and creates
14them in the moderation service. after migration, the backend will proxy
15sensitive image requests to the moderation service.
16
17usage:
18 uv run scripts/migrate_sensitive_images.py --env prod --dry-run
19 uv run scripts/migrate_sensitive_images.py --env prod
20
21environment variables (set in .env or export):
22 PROD_DATABASE_URL - production database connection string
23 STAGING_DATABASE_URL - staging database connection string
24 DEV_DATABASE_URL - development database connection string
25 MODERATION_SERVICE_URL - URL of moderation service for prod (dev/staging use DEV_MODERATION_URL / STAGING_MODERATION_URL)
26 MODERATION_AUTH_TOKEN - auth token for moderation service
27"""
28
29import argparse
30import asyncio
31import os
32from typing import Literal
33
34import httpx
35from pydantic import Field
36from pydantic_settings import BaseSettings, SettingsConfigDict
37from sqlalchemy import text
38from sqlalchemy.ext.asyncio import create_async_engine
39
# deployment environments this script can target
Environment = Literal["dev", "staging", "prod"]


class MigrationSettings(BaseSettings):
    """settings for the migration script.

    values come from environment variables and from a `.env` file in the
    working directory; pydantic-settings gives real environment variables
    priority over the `.env` file.
    """

    model_config = SettingsConfigDict(
        env_file=".env",
        case_sensitive=False,
        extra="ignore",
    )

    # backend database connection strings, one per environment
    dev_database_url: str = Field(default="", validation_alias="DEV_DATABASE_URL")
    staging_database_url: str = Field(
        default="", validation_alias="STAGING_DATABASE_URL"
    )
    prod_database_url: str = Field(default="", validation_alias="PROD_DATABASE_URL")

    # moderation service base URLs. dev/staging are declared as settings
    # (rather than read via os.getenv) so values in `.env` are honored too.
    dev_moderation_url: str = Field(
        default="http://localhost:8002",
        validation_alias="DEV_MODERATION_URL",
    )
    staging_moderation_url: str = Field(
        default="https://moderation-stg.plyr.fm",
        validation_alias="STAGING_MODERATION_URL",
    )
    moderation_service_url: str = Field(
        default="https://moderation.plyr.fm",
        validation_alias="MODERATION_SERVICE_URL",
    )
    moderation_auth_token: str = Field(
        default="", validation_alias="MODERATION_AUTH_TOKEN"
    )

    def get_database_url(self, env: Environment) -> str:
        """return the backend database URL for `env`, using the asyncpg driver.

        raises:
            ValueError: if no database URL is configured for `env`.
        """
        urls = {
            "dev": self.dev_database_url,
            "staging": self.staging_database_url,
            "prod": self.prod_database_url,
        }
        url = urls.get(env, "")
        if not url:
            raise ValueError(f"no database URL configured for {env}")
        # create_async_engine needs an async driver. also accept the common
        # "postgres://" alias, which SQLAlchemy does not recognize as a dialect.
        for scheme in ("postgresql://", "postgres://"):
            if url.startswith(scheme):
                return "postgresql+asyncpg://" + url[len(scheme):]
        return url

    def get_moderation_url(self, env: Environment) -> str:
        """return the moderation service base URL for `env`.

        dev and staging have their own settings; any other value (prod)
        uses MODERATION_SERVICE_URL.
        """
        urls = {
            "dev": self.dev_moderation_url,
            "staging": self.staging_moderation_url,
        }
        return urls.get(env, self.moderation_service_url)
89
90
async def fetch_sensitive_images(db_url: str) -> list[dict]:
    """fetch every row of the backend `sensitive_images` table, ordered by id.

    args:
        db_url: SQLAlchemy async database URL (e.g. postgresql+asyncpg://...).

    returns:
        one dict per row with keys id, image_id, url, reason, flagged_at and
        flagged_by; `flagged_at` is an ISO-8601 string, or None when NULL.
    """
    engine = create_async_engine(db_url)
    try:
        async with engine.begin() as conn:
            result = await conn.execute(
                text(
                    """
                    SELECT id, image_id, url, reason, flagged_at, flagged_by
                    FROM sensitive_images
                    ORDER BY id
                    """
                )
            )
            # access columns by name rather than by position in the SELECT
            rows = result.mappings().all()
    finally:
        # release pooled connections even when the query fails
        await engine.dispose()

    return [
        {
            "id": row["id"],
            "image_id": row["image_id"],
            "url": row["url"],
            "reason": row["reason"],
            "flagged_at": (
                row["flagged_at"].isoformat()
                if row["flagged_at"] is not None
                else None
            ),
            "flagged_by": row["flagged_by"],
        }
        for row in rows
    ]
120
121
def _build_payload(image: dict) -> dict:
    """select the fields of one backend row that are sent to the moderation service."""
    # NOTE(review): `flagged_at` and the backend `id` are not sent; confirm the
    # moderation API has no field for the original flag timestamp.
    return {
        "image_id": image["image_id"],
        "url": image["url"],
        "reason": image["reason"],
        "flagged_by": image["flagged_by"],
    }


async def migrate_to_moderation_service(
    images: list[dict],
    moderation_url: str,
    auth_token: str,
    dry_run: bool = False,
) -> tuple[int, int]:
    """POST each image to `{moderation_url}/admin/sensitive-images`.

    args:
        images: rows as returned by fetch_sensitive_images.
        moderation_url: base URL of the moderation service (a trailing
            slash is tolerated).
        auth_token: value sent in the X-Moderation-Key header.
        dry_run: only print the payloads; nothing is sent.

    returns:
        tuple of (success_count, error_count). failures are reported and
        skipped so one bad row does not abort the whole migration.
    """
    if dry_run:
        # nothing is sent, so no HTTP client is needed
        for image in images:
            print(f" [dry-run] would migrate: {_build_payload(image)}")
        return len(images), 0

    endpoint = f"{moderation_url.rstrip('/')}/admin/sensitive-images"
    headers = {"X-Moderation-Key": auth_token}
    success_count = 0
    error_count = 0

    async with httpx.AsyncClient(timeout=30.0) as client:
        for image in images:
            payload = _build_payload(image)
            try:
                response = await client.post(endpoint, json=payload, headers=headers)
                response.raise_for_status()
            except httpx.HTTPStatusError as e:
                # the exception text omits the body, which usually says why
                print(
                    f" ERROR migrating id={image['id']}: {e} ({e.response.text})"
                )
                error_count += 1
                continue
            except Exception as e:
                # best-effort: report (e.g. network errors) and keep going
                print(f" ERROR migrating id={image['id']}: {e}")
                error_count += 1
                continue

            # a 2xx means the service accepted the record, so count it as
            # migrated even if the body is unexpected; reporting it as an
            # error would invite a re-run that POSTs it again.
            success_count += 1
            try:
                moderation_id = response.json()["id"]
            except (ValueError, KeyError, TypeError):
                moderation_id = "?"
            print(f" migrated id={image['id']} -> moderation id={moderation_id}")

    return success_count, error_count
167
168
async def main() -> None:
    """command-line entry point: copy sensitive images for one environment.

    exits with status 1 (via SystemExit) when required configuration is
    missing or when any image fails to migrate, so shells and CI can tell
    a failed or partial migration from a successful one.
    """
    parser = argparse.ArgumentParser(
        description="migrate sensitive images to moderation service"
    )
    parser.add_argument(
        "--env",
        choices=["dev", "staging", "prod"],
        required=True,
        help="environment to migrate",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="show what would be migrated without making changes",
    )
    args = parser.parse_args()

    settings = MigrationSettings()

    print(f"migrating sensitive images for {args.env}")
    if args.dry_run:
        print("(dry run - no changes will be made)")

    # validate configuration up front, before querying the (possibly
    # production) database, instead of failing halfway through
    if not settings.moderation_auth_token and not args.dry_run:
        raise SystemExit("ERROR: MODERATION_AUTH_TOKEN not set")
    try:
        db_url = settings.get_database_url(args.env)
    except ValueError as e:
        raise SystemExit(f"ERROR: {e}") from None
    moderation_url = settings.get_moderation_url(args.env)

    # fetch from backend database
    print("\nfetching from backend database...")
    images = await fetch_sensitive_images(db_url)
    print(f"found {len(images)} sensitive images")

    if not images:
        print("nothing to migrate")
        return

    # migrate to moderation service
    print(f"\nmigrating to moderation service at {moderation_url}...")
    success, errors = await migrate_to_moderation_service(
        images,
        moderation_url,
        settings.moderation_auth_token,
        dry_run=args.dry_run,
    )

    print(f"\ndone: {success} migrated, {errors} errors")

    if errors:
        # non-zero exit so a partial migration is not mistaken for success
        raise SystemExit(1)

    if not args.dry_run:
        print(
            "\nnext steps:\n"
            " 1. verify data in moderation service: GET /sensitive-images\n"
            " 2. update backend to proxy to moderation service\n"
            " 3. optionally drop sensitive_images table from backend db"
        )
226
227
if __name__ == "__main__":
    # drive the async entry point on a dedicated event loop; the loop is
    # closed when the runner's context exits
    with asyncio.Runner() as runner:
        runner.run(main())