feat: add admin UI for reviewing copyright flags (#390)

* feat: add admin UI for reviewing copyright flags

adds /admin endpoint to moderation service with:
- simple HTML/JS UI for reviewing flagged tracks
- /admin/flags API to list copyright-violation labels
- /admin/resolve API to create negation labels (false positives)
- auth via existing X-Moderation-Key header

resolves #388

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

* refactor: modularize moderation service

breaks main.rs into focused modules:
- config.rs: environment config loading
- state.rs: AppState and AppError types
- auth.rs: authentication middleware
- audd.rs: AuDD fingerprinting types and handler
- xrpc.rs: ATProto labeler endpoints
- handlers.rs: health, landing, emit_label

main.rs now only composes routes (~100 lines vs ~700)

also fixes clippy warning in labels.rs (redundant closure)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

---------

Co-authored-by: Claude <noreply@anthropic.com>

authored by zzstoatzz.io Claude and committed by GitHub 7e334b38 d3ecaf81

+46 -2
moderation/Cargo.lock
··· 786 ] 787 788 [[package]] 789 name = "httparse" 790 version = "1.10.1" 791 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 833 "tokio", 834 "tokio-rustls", 835 "tower-service", 836 - "webpki-roots", 837 ] 838 839 [[package]] ··· 1171 checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" 1172 1173 [[package]] 1174 name = "mio" 1175 version = "1.1.0" 1176 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1202 "thiserror 2.0.17", 1203 "tokio", 1204 "tokio-stream", 1205 "tracing", 1206 "tracing-subscriber", 1207 ] ··· 1590 "wasm-bindgen", 1591 "wasm-bindgen-futures", 1592 "web-sys", 1593 - "webpki-roots", 1594 ] 1595 1596 [[package]] ··· 1935 "memchr", 1936 "once_cell", 1937 "percent-encoding", 1938 "serde", 1939 "serde_json", 1940 "sha2", ··· 1944 "tokio-stream", 1945 "tracing", 1946 "url", 1947 ] 1948 1949 [[package]] ··· 2327 dependencies = [ 2328 "bitflags", 2329 "bytes", 2330 "futures-util", 2331 "http", 2332 "http-body", 2333 "iri-string", 2334 "pin-project-lite", 2335 "tower", 2336 "tower-layer", 2337 "tower-service", 2338 ] 2339 2340 [[package]] ··· 2440 version = "1.19.0" 2441 source = "registry+https://github.com/rust-lang/crates.io-index" 2442 checksum = "562d481066bde0658276a35467c4af00bdc6ee726305698a55b86e61d7ad82bb" 2443 2444 [[package]] 2445 name = "unicode-bidi" ··· 2628 dependencies = [ 2629 "js-sys", 2630 "wasm-bindgen", 2631 ] 2632 2633 [[package]]
··· 786 ] 787 788 [[package]] 789 + name = "http-range-header" 790 + version = "0.4.2" 791 + source = "registry+https://github.com/rust-lang/crates.io-index" 792 + checksum = "9171a2ea8a68358193d15dd5d70c1c10a2afc3e7e4c5bc92bc9f025cebd7359c" 793 + 794 + [[package]] 795 name = "httparse" 796 version = "1.10.1" 797 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 839 "tokio", 840 "tokio-rustls", 841 "tower-service", 842 + "webpki-roots 1.0.4", 843 ] 844 845 [[package]] ··· 1177 checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" 1178 1179 [[package]] 1180 + name = "mime_guess" 1181 + version = "2.0.5" 1182 + source = "registry+https://github.com/rust-lang/crates.io-index" 1183 + checksum = "f7c44f8e672c00fe5308fa235f821cb4198414e1c77935c1ab6948d3fd78550e" 1184 + dependencies = [ 1185 + "mime", 1186 + "unicase", 1187 + ] 1188 + 1189 + [[package]] 1190 name = "mio" 1191 version = "1.1.0" 1192 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1218 "thiserror 2.0.17", 1219 "tokio", 1220 "tokio-stream", 1221 + "tower-http", 1222 "tracing", 1223 "tracing-subscriber", 1224 ] ··· 1607 "wasm-bindgen", 1608 "wasm-bindgen-futures", 1609 "web-sys", 1610 + "webpki-roots 1.0.4", 1611 ] 1612 1613 [[package]] ··· 1952 "memchr", 1953 "once_cell", 1954 "percent-encoding", 1955 + "rustls", 1956 "serde", 1957 "serde_json", 1958 "sha2", ··· 1962 "tokio-stream", 1963 "tracing", 1964 "url", 1965 + "webpki-roots 0.26.11", 1966 ] 1967 1968 [[package]] ··· 2346 dependencies = [ 2347 "bitflags", 2348 "bytes", 2349 + "futures-core", 2350 "futures-util", 2351 "http", 2352 "http-body", 2353 + "http-body-util", 2354 + "http-range-header", 2355 + "httpdate", 2356 "iri-string", 2357 + "mime", 2358 + "mime_guess", 2359 + "percent-encoding", 2360 "pin-project-lite", 2361 + "tokio", 2362 + "tokio-util", 2363 "tower", 2364 "tower-layer", 2365 "tower-service", 2366 + "tracing", 2367 ] 2368 2369 [[package]] ··· 2469 version = "1.19.0" 2470 source = "registry+https://github.com/rust-lang/crates.io-index" 2471 checksum = "562d481066bde0658276a35467c4af00bdc6ee726305698a55b86e61d7ad82bb" 2472 + 2473 + [[package]] 2474 + name = "unicase" 2475 + version = "2.8.1" 2476 + source = "registry+https://github.com/rust-lang/crates.io-index" 2477 + checksum = "75b844d17643ee918803943289730bec8aac480150456169e647ed0b576ba539" 2478 2479 [[package]] 2480 name = "unicode-bidi" ··· 2663 dependencies = [ 2664 "js-sys", 2665 "wasm-bindgen", 2666 + ] 2667 + 2668 + [[package]] 2669 + name = "webpki-roots" 2670 + version = "0.26.11" 2671 + source = "registry+https://github.com/rust-lang/crates.io-index" 2672 + checksum = "521bc38abb08001b01866da9f51eb7c5d647a19260e00054a8c7fd5f9e57f7a9" 2673 + dependencies = [ 2674 + "webpki-roots 1.0.4", 2675 ] 2676 2677 [[package]]
+1
moderation/Cargo.toml
··· 20 thiserror = "2.0" 21 tokio = { version = "1.40", features = ["rt-multi-thread", "macros", "signal", "sync"] } 22 tokio-stream = { version = "0.1", features = ["sync"] } 23 tracing = "0.1" 24 tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] } 25
··· 20 thiserror = "2.0" 21 tokio = { version = "1.40", features = ["rt-multi-thread", "macros", "signal", "sync"] } 22 tokio-stream = { version = "0.1", features = ["sync"] } 23 + tower-http = { version = "0.6", features = ["fs"] } 24 tracing = "0.1" 25 tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] } 26
+331
moderation/src/admin.rs
···
··· 1 + //! Admin API for reviewing and resolving copyright flags. 2 + 3 + use axum::{extract::State, response::Html, Json}; 4 + use serde::{Deserialize, Serialize}; 5 + 6 + use crate::state::{AppError, AppState}; 7 + 8 + /// A flagged track pending review. 9 + #[derive(Debug, Serialize)] 10 + pub struct FlaggedTrack { 11 + pub seq: i64, 12 + pub uri: String, 13 + pub val: String, 14 + pub created_at: String, 15 + /// If there's a negation label for this URI+val, it's been resolved. 16 + pub resolved: bool, 17 + } 18 + 19 + /// Response for listing flagged tracks. 20 + #[derive(Debug, Serialize)] 21 + pub struct ListFlaggedResponse { 22 + pub tracks: Vec<FlaggedTrack>, 23 + } 24 + 25 + /// Request to resolve (negate) a flag. 26 + #[derive(Debug, Deserialize)] 27 + pub struct ResolveRequest { 28 + pub uri: String, 29 + #[serde(default = "default_val")] 30 + pub val: String, 31 + } 32 + 33 + fn default_val() -> String { 34 + "copyright-violation".to_string() 35 + } 36 + 37 + /// Response after resolving a flag. 38 + #[derive(Debug, Serialize)] 39 + pub struct ResolveResponse { 40 + pub seq: i64, 41 + pub message: String, 42 + } 43 + 44 + /// List all flagged tracks (copyright-violation labels without negations). 45 + pub async fn list_flagged( 46 + State(state): State<AppState>, 47 + ) -> Result<Json<ListFlaggedResponse>, AppError> { 48 + let db = state.db.as_ref().ok_or(AppError::LabelerNotConfigured)?; 49 + 50 + let tracks = db.get_pending_flags().await?; 51 + 52 + Ok(Json(ListFlaggedResponse { tracks })) 53 + } 54 + 55 + /// Resolve (negate) a copyright flag, marking it as a false positive. 56 + pub async fn resolve_flag( 57 + State(state): State<AppState>, 58 + Json(request): Json<ResolveRequest>, 59 + ) -> Result<Json<ResolveResponse>, AppError> { 60 + let db = state.db.as_ref().ok_or(AppError::LabelerNotConfigured)?; 61 + let signer = state.signer.as_ref().ok_or(AppError::LabelerNotConfigured)?; 62 + 63 + tracing::info!(uri = %request.uri, val = %request.val, "resolving flag (creating negation)"); 64 + 65 + // Create a negation label 66 + let label = crate::labels::Label::new(signer.did(), &request.uri, &request.val).negated(); 67 + let label = signer.sign_label(label)?; 68 + 69 + let seq = db.store_label(&label).await?; 70 + 71 + // Broadcast to subscribers 72 + if let Some(tx) = &state.label_tx { 73 + let _ = tx.send((seq, label)); 74 + } 75 + 76 + Ok(Json(ResolveResponse { 77 + seq, 78 + message: format!("created negation label for {}", request.uri), 79 + })) 80 + } 81 + 82 + /// Serve the admin UI HTML. 83 + pub async fn admin_ui() -> Html<&'static str> { 84 + Html(ADMIN_HTML) 85 + } 86 + 87 + const ADMIN_HTML: &str = r##"<!DOCTYPE html> 88 + <html> 89 + <head> 90 + <meta charset="utf-8"> 91 + <meta name="viewport" content="width=device-width, initial-scale=1"> 92 + <title>plyr.fm moderation admin</title> 93 + <style> 94 + * { box-sizing: border-box; } 95 + body { 96 + font-family: system-ui, -apple-system, sans-serif; 97 + background: #0a0a0a; 98 + color: #e5e5e5; 99 + max-width: 900px; 100 + margin: 0 auto; 101 + padding: 20px; 102 + line-height: 1.6; 103 + } 104 + h1 { color: #fff; margin-bottom: 8px; } 105 + .subtitle { color: #888; margin-bottom: 24px; } 106 + .auth-form { 107 + background: #111; 108 + border: 1px solid #222; 109 + border-radius: 8px; 110 + padding: 20px; 111 + margin-bottom: 24px; 112 + } 113 + .auth-form input { 114 + background: #1a1a1a; 115 + border: 1px solid #333; 116 + color: #fff; 117 + padding: 8px 12px; 118 + border-radius: 4px; 119 + width: 300px; 120 + margin-right: 8px; 121 + } 122 + .auth-form button { 123 + background: #3b82f6; 124 + color: #fff; 125 + border: none; 126 + padding: 8px 16px; 127 + border-radius: 4px; 128 + cursor: pointer; 129 + } 130 + .auth-form button:hover { background: #2563eb; } 131 + .status { 132 + padding: 8px 12px; 133 + border-radius: 4px; 134 + margin-bottom: 16px; 135 + display: none; 136 + } 137 + .status.error { display: block; background: rgba(239, 68, 68, 0.2); color: #ef4444; } 138 + .status.success { display: block; background: rgba(34, 197, 94, 0.2); color: #22c55e; } 139 + table { 140 + width: 100%; 141 + border-collapse: collapse; 142 + } 143 + th, td { 144 + text-align: left; 145 + padding: 12px; 146 + border-bottom: 1px solid #222; 147 + } 148 + th { color: #888; font-weight: 500; } 149 + .uri { 150 + font-family: monospace; 151 + font-size: 0.85em; 152 + word-break: break-all; 153 + } 154 + .badge { 155 + display: inline-block; 156 + padding: 2px 8px; 157 + border-radius: 4px; 158 + font-size: 0.8em; 159 + } 160 + .badge.pending { background: rgba(234, 179, 8, 0.2); color: #eab308; } 161 + .badge.resolved { background: rgba(34, 197, 94, 0.2); color: #22c55e; } 162 + .resolve-btn { 163 + background: #f59e0b; 164 + color: #000; 165 + border: none; 166 + padding: 6px 12px; 167 + border-radius: 4px; 168 + cursor: pointer; 169 + font-size: 0.85em; 170 + } 171 + .resolve-btn:hover { background: #d97706; } 172 + .resolve-btn:disabled { background: #444; color: #888; cursor: not-allowed; } 173 + .empty { color: #666; text-align: center; padding: 40px; } 174 + .loading { color: #888; } 175 + .refresh-btn { 176 + background: #333; 177 + color: #fff; 178 + border: none; 179 + padding: 8px 16px; 180 + border-radius: 4px; 181 + cursor: pointer; 182 + margin-left: auto; 183 + } 184 + .refresh-btn:hover { background: #444; } 185 + .header-row { 186 + display: flex; 187 + align-items: center; 188 + margin-bottom: 16px; 189 + } 190 + .header-row h2 { margin: 0; } 191 + </style> 192 + </head> 193 + <body> 194 + <h1>moderation admin</h1> 195 + <p class="subtitle">review and resolve copyright flags</p> 196 + 197 + <div class="auth-form"> 198 + <input type="password" id="token" placeholder="moderation auth token"> 199 + <button onclick="authenticate()">authenticate</button> 200 + </div> 201 + 202 + <div id="status" class="status"></div> 203 + 204 + <div id="content" style="display: none;"> 205 + <div class="header-row"> 206 + <h2>flagged tracks</h2> 207 + <button class="refresh-btn" onclick="loadFlags()">refresh</button> 208 + </div> 209 + <table> 210 + <thead> 211 + <tr> 212 + <th>seq</th> 213 + <th>uri</th> 214 + <th>label</th> 215 + <th>status</th> 216 + <th>action</th> 217 + </tr> 218 + </thead> 219 + <tbody id="flags-table"> 220 + <tr><td colspan="5" class="loading">loading...</td></tr> 221 + </tbody> 222 + </table> 223 + </div> 224 + 225 + <script> 226 + let authToken = localStorage.getItem('moderation_token') || ''; 227 + 228 + if (authToken) { 229 + document.getElementById('token').value = '••••••••'; 230 + showContent(); 231 + loadFlags(); 232 + } 233 + 234 + function authenticate() { 235 + authToken = document.getElementById('token').value; 236 + localStorage.setItem('moderation_token', authToken); 237 + showContent(); 238 + loadFlags(); 239 + } 240 + 241 + function showContent() { 242 + document.getElementById('content').style.display = 'block'; 243 + } 244 + 245 + function showStatus(message, type) { 246 + const status = document.getElementById('status'); 247 + status.textContent = message; 248 + status.className = 'status ' + type; 249 + } 250 + 251 + async function loadFlags() { 252 + const tbody = document.getElementById('flags-table'); 253 + tbody.innerHTML = '<tr><td colspan="5" class="loading">loading...</td></tr>'; 254 + 255 + try { 256 + const res = await fetch('/admin/flags', { 257 + headers: { 'X-Moderation-Key': authToken } 258 + }); 259 + 260 + if (!res.ok) { 261 + if (res.status === 401) { 262 + showStatus('invalid token', 'error'); 263 + localStorage.removeItem('moderation_token'); 264 + return; 265 + } 266 + throw new Error('failed to load flags'); 267 + } 268 + 269 + const data = await res.json(); 270 + 271 + if (data.tracks.length === 0) { 272 + tbody.innerHTML = '<tr><td colspan="5" class="empty">no flagged tracks</td></tr>'; 273 + return; 274 + } 275 + 276 + tbody.innerHTML = data.tracks.map(track => ` 277 + <tr> 278 + <td>${track.seq}</td> 279 + <td class="uri">${escapeHtml(track.uri)}</td> 280 + <td><span class="badge pending">${escapeHtml(track.val)}</span></td> 281 + <td><span class="badge ${track.resolved ? 'resolved' : 'pending'}">${track.resolved ? 'resolved' : 'pending'}</span></td> 282 + <td> 283 + <button class="resolve-btn" 284 + onclick="resolveFlag('${escapeHtml(track.uri)}', '${escapeHtml(track.val)}')" 285 + ${track.resolved ? 'disabled' : ''}> 286 + ${track.resolved ? 'resolved' : 'mark false positive'} 287 + </button> 288 + </td> 289 + </tr> 290 + `).join(''); 291 + 292 + } catch (err) { 293 + tbody.innerHTML = `<tr><td colspan="5" class="empty">error: ${err.message}</td></tr>`; 294 + } 295 + } 296 + 297 + async function resolveFlag(uri, val) { 298 + try { 299 + const res = await fetch('/admin/resolve', { 300 + method: 'POST', 301 + headers: { 302 + 'Content-Type': 'application/json', 303 + 'X-Moderation-Key': authToken 304 + }, 305 + body: JSON.stringify({ uri, val }) 306 + }); 307 + 308 + if (!res.ok) { 309 + const data = await res.json(); 310 + throw new Error(data.message || 'failed to resolve'); 311 + } 312 + 313 + showStatus('flag resolved successfully', 'success'); 314 + loadFlags(); 315 + 316 + } catch (err) { 317 + showStatus(err.message, 'error'); 318 + } 319 + } 320 + 321 + function escapeHtml(str) { 322 + return str.replace(/&/g, '&amp;') 323 + .replace(/</g, '&lt;') 324 + .replace(/>/g, '&gt;') 325 + .replace(/"/g, '&quot;') 326 + .replace(/'/g, '&#039;'); 327 + } 328 + </script> 329 + </body> 330 + </html> 331 + "##;
+190
moderation/src/audd.rs
···
··· 1 + //! AuDD audio fingerprinting integration. 2 + 3 + use axum::{extract::State, Json}; 4 + use serde::{Deserialize, Serialize}; 5 + use tracing::info; 6 + 7 + use crate::state::{AppError, AppState}; 8 + 9 + // --- request/response types --- 10 + 11 + #[derive(Debug, Deserialize)] 12 + pub struct ScanRequest { 13 + pub audio_url: String, 14 + } 15 + 16 + #[derive(Debug, Serialize)] 17 + pub struct ScanResponse { 18 + pub matches: Vec<AuddMatch>, 19 + pub is_flagged: bool, 20 + pub highest_score: i32, 21 + pub raw_response: serde_json::Value, 22 + } 23 + 24 + #[derive(Debug, Serialize, Clone)] 25 + pub struct AuddMatch { 26 + pub artist: String, 27 + pub title: String, 28 + #[serde(skip_serializing_if = "Option::is_none")] 29 + pub album: Option<String>, 30 + pub score: i32, 31 + #[serde(skip_serializing_if = "Option::is_none")] 32 + pub isrc: Option<String>, 33 + #[serde(skip_serializing_if = "Option::is_none")] 34 + pub timecode: Option<String>, 35 + #[serde(skip_serializing_if = "Option::is_none")] 36 + pub offset_ms: Option<i64>, 37 + } 38 + 39 + // --- audd api types --- 40 + 41 + #[derive(Debug, Deserialize)] 42 + pub struct AuddResponse { 43 + pub status: Option<String>, 44 + pub result: Option<AuddResult>, 45 + } 46 + 47 + #[derive(Debug, Deserialize)] 48 + #[serde(untagged)] 49 + pub enum AuddResult { 50 + Groups(Vec<AuddGroup>), 51 + Single(AuddSong), 52 + } 53 + 54 + #[derive(Debug, Deserialize)] 55 + pub struct AuddGroup { 56 + pub offset: Option<serde_json::Value>, 57 + pub songs: Option<Vec<AuddSong>>, 58 + } 59 + 60 + #[derive(Debug, Deserialize)] 61 + #[allow(dead_code)] 62 + pub struct AuddSong { 63 + pub artist: Option<String>, 64 + pub title: Option<String>, 65 + pub album: Option<String>, 66 + pub score: Option<i32>, 67 + pub isrc: Option<String>, 68 + pub timecode: Option<String>, 69 + pub release_date: Option<String>, 70 + pub label: Option<String>, 71 + pub song_link: Option<String>, 72 + } 73 + 74 + // --- handler --- 75 + 76 + /// Scan audio for copyright matches via AuDD. 77 + pub async fn scan( 78 + State(state): State<AppState>, 79 + Json(request): Json<ScanRequest>, 80 + ) -> Result<Json<ScanResponse>, AppError> { 81 + info!(audio_url = %request.audio_url, "scanning audio"); 82 + 83 + let client = reqwest::Client::new(); 84 + let response = client 85 + .post(&state.audd_api_url) 86 + .form(&[ 87 + ("api_token", &state.audd_api_token), 88 + ("url", &request.audio_url), 89 + ("accurate_offsets", &"1".to_string()), 90 + ]) 91 + .send() 92 + .await 93 + .map_err(|e| AppError::Audd(format!("request failed: {e}")))?; 94 + 95 + let raw_response: serde_json::Value = response 96 + .json() 97 + .await 98 + .map_err(|e| AppError::Audd(format!("failed to parse response: {e}")))?; 99 + 100 + let audd_response: AuddResponse = serde_json::from_value(raw_response.clone()) 101 + .map_err(|e| AppError::Audd(format!("failed to parse audd response: {e}")))?; 102 + 103 + if audd_response.status.as_deref() == Some("error") { 104 + return Err(AppError::Audd(format!( 105 + "audd returned error: {}", 106 + raw_response 107 + ))); 108 + } 109 + 110 + let matches = extract_matches(&audd_response); 111 + let highest_score = matches.iter().map(|m| m.score).max().unwrap_or(0); 112 + let is_flagged = !matches.is_empty(); 113 + 114 + info!( 115 + match_count = matches.len(), 116 + highest_score, 117 + is_flagged, 118 + "scan complete" 119 + ); 120 + 121 + Ok(Json(ScanResponse { 122 + matches, 123 + is_flagged, 124 + highest_score, 125 + raw_response, 126 + })) 127 + } 128 + 129 + // --- helpers --- 130 + 131 + fn extract_matches(response: &AuddResponse) -> Vec<AuddMatch> { 132 + let Some(result) = &response.result else { 133 + return vec![]; 134 + }; 135 + 136 + match result { 137 + AuddResult::Groups(groups) => groups 138 + .iter() 139 + .flat_map(|group| { 140 + group 141 + .songs 142 + .as_ref() 143 + .map(|songs| { 144 + songs 145 + .iter() 146 + .map(|song| parse_song(song, group.offset.as_ref())) 147 + .collect::<Vec<_>>() 148 + }) 149 + .unwrap_or_default() 150 + }) 151 + .collect(), 152 + AuddResult::Single(song) => vec![parse_song(song, None)], 153 + } 154 + } 155 + 156 + fn parse_song(song: &AuddSong, offset: Option<&serde_json::Value>) -> AuddMatch { 157 + let offset_ms = offset.and_then(|v| match v { 158 + serde_json::Value::Number(n) => n.as_i64(), 159 + serde_json::Value::String(s) => parse_timecode_to_ms(s), 160 + _ => None, 161 + }); 162 + 163 + AuddMatch { 164 + artist: song.artist.clone().unwrap_or_else(|| "Unknown".to_string()), 165 + title: song.title.clone().unwrap_or_else(|| "Unknown".to_string()), 166 + album: song.album.clone(), 167 + score: song.score.unwrap_or(0), 168 + isrc: song.isrc.clone(), 169 + timecode: song.timecode.clone(), 170 + offset_ms, 171 + } 172 + } 173 + 174 + fn parse_timecode_to_ms(timecode: &str) -> Option<i64> { 175 + let parts: Vec<&str> = timecode.split(':').collect(); 176 + match parts.len() { 177 + 2 => { 178 + let mins: i64 = parts[0].parse().ok()?; 179 + let secs: i64 = parts[1].parse().ok()?; 180 + Some((mins * 60 + secs) * 1000) 181 + } 182 + 3 => { 183 + let hours: i64 = parts[0].parse().ok()?; 184 + let mins: i64 = parts[1].parse().ok()?; 185 + let secs: i64 = parts[2].parse().ok()?; 186 + Some((hours * 3600 + mins * 60 + secs) * 1000) 187 + } 188 + _ => None, 189 + } 190 + }
+43
moderation/src/auth.rs
···
··· 1 + //! Authentication middleware. 2 + 3 + use axum::{extract::Request, http::StatusCode, middleware::Next, response::Response}; 4 + use tracing::warn; 5 + 6 + /// Auth middleware that checks X-Moderation-Key header for protected endpoints. 7 + pub async fn auth_middleware( 8 + req: Request, 9 + next: Next, 10 + auth_token: Option<String>, 11 + ) -> Result<Response, StatusCode> { 12 + let path = req.uri().path(); 13 + 14 + // Public endpoints - no auth required 15 + if path == "/" 16 + || path == "/health" 17 + || path.starts_with("/xrpc/com.atproto.label.") 18 + { 19 + return Ok(next.run(req).await); 20 + } 21 + 22 + let Some(expected_token) = auth_token else { 23 + warn!("no MODERATION_AUTH_TOKEN set - accepting all requests"); 24 + return Ok(next.run(req).await); 25 + }; 26 + 27 + let token = req 28 + .headers() 29 + .get("X-Moderation-Key") 30 + .and_then(|v| v.to_str().ok()); 31 + 32 + match token { 33 + Some(t) if t == expected_token => Ok(next.run(req).await), 34 + Some(_) => { 35 + warn!("invalid auth token provided"); 36 + Err(StatusCode::UNAUTHORIZED) 37 + } 38 + None => { 39 + warn!("missing X-Moderation-Key header"); 40 + Err(StatusCode::UNAUTHORIZED) 41 + } 42 + } 43 + }
+44
moderation/src/config.rs
···
··· 1 + //! Configuration loading from environment variables. 2 + 3 + use anyhow::anyhow; 4 + use std::env; 5 + 6 + /// Service configuration loaded from environment. 7 + pub struct Config { 8 + pub host: String, 9 + pub port: u16, 10 + pub auth_token: Option<String>, 11 + pub audd_api_token: String, 12 + pub audd_api_url: String, 13 + pub database_url: Option<String>, 14 + pub labeler_did: Option<String>, 15 + pub labeler_signing_key: Option<String>, 16 + } 17 + 18 + impl Config { 19 + /// Load configuration from environment variables. 20 + pub fn from_env() -> anyhow::Result<Self> { 21 + Ok(Self { 22 + host: env::var("MODERATION_HOST").unwrap_or_else(|_| "0.0.0.0".to_string()), 23 + port: env::var("MODERATION_PORT") 24 + .ok() 25 + .and_then(|v| v.parse().ok()) 26 + .unwrap_or(8083), 27 + auth_token: env::var("MODERATION_AUTH_TOKEN").ok(), 28 + audd_api_token: env::var("MODERATION_AUDD_API_TOKEN") 29 + .map_err(|_| anyhow!("MODERATION_AUDD_API_TOKEN is required"))?, 30 + audd_api_url: env::var("MODERATION_AUDD_API_URL") 31 + .unwrap_or_else(|_| "https://enterprise.audd.io/".to_string()), 32 + database_url: env::var("MODERATION_DATABASE_URL").ok(), 33 + labeler_did: env::var("MODERATION_LABELER_DID").ok(), 34 + labeler_signing_key: env::var("MODERATION_LABELER_SIGNING_KEY").ok(), 35 + }) 36 + } 37 + 38 + /// Check if labeler is fully configured. 39 + pub fn labeler_enabled(&self) -> bool { 40 + self.database_url.is_some() 41 + && self.labeler_did.is_some() 42 + && self.labeler_signing_key.is_some() 43 + } 44 + }
+49
moderation/src/db.rs
··· 3 use chrono::{DateTime, Utc}; 4 use sqlx::{postgres::PgPoolOptions, PgPool}; 5 6 use crate::labels::Label; 7 8 /// Database connection pool and operations. ··· 231 .fetch_one(&self.pool) 232 .await 233 .map(|s| s.unwrap_or(0)) 234 } 235 } 236
··· 3 use chrono::{DateTime, Utc}; 4 use sqlx::{postgres::PgPoolOptions, PgPool}; 5 6 + use crate::admin::FlaggedTrack; 7 use crate::labels::Label; 8 9 /// Database connection pool and operations. ··· 232 .fetch_one(&self.pool) 233 .await 234 .map(|s| s.unwrap_or(0)) 235 + } 236 + 237 + /// Get all copyright-violation labels with their resolution status. 238 + /// 239 + /// A label is resolved if there's a negation label for the same uri+val. 240 + pub async fn get_pending_flags(&self) -> Result<Vec<FlaggedTrack>, sqlx::Error> { 241 + // Get all copyright-violation labels (non-negations) 242 + let rows = sqlx::query_as::<_, LabelRow>( 243 + r#" 244 + SELECT seq, src, uri, cid, val, neg, cts, exp, sig 245 + FROM labels 246 + WHERE val = 'copyright-violation' AND neg = false 247 + ORDER BY seq DESC 248 + "#, 249 + ) 250 + .fetch_all(&self.pool) 251 + .await?; 252 + 253 + // Get all negation labels for these URIs 254 + let uris: Vec<String> = rows.iter().map(|r| r.uri.clone()).collect(); 255 + let negated_uris: std::collections::HashSet<String> = if !uris.is_empty() { 256 + sqlx::query_scalar::<_, String>( 257 + r#" 258 + SELECT DISTINCT uri 259 + FROM labels 260 + WHERE val = 'copyright-violation' AND neg = true 261 + "#, 262 + ) 263 + .fetch_all(&self.pool) 264 + .await? 265 + .into_iter() 266 + .collect() 267 + } else { 268 + std::collections::HashSet::new() 269 + }; 270 + 271 + let tracks = rows 272 + .into_iter() 273 + .map(|r| FlaggedTrack { 274 + seq: r.seq, 275 + uri: r.uri.clone(), 276 + val: r.val, 277 + created_at: r.cts.format("%Y-%m-%d %H:%M:%S").to_string(), 278 + resolved: negated_uris.contains(&r.uri), 279 + }) 280 + .collect(); 281 + 282 + Ok(tracks) 283 } 284 } 285
+156
moderation/src/handlers.rs
···
··· 1 + //! HTTP request handlers for core endpoints. 2 + 3 + use axum::{extract::State, response::Html, Json}; 4 + use serde::{Deserialize, Serialize}; 5 + use tracing::info; 6 + 7 + use crate::labels::Label; 8 + use crate::state::{AppError, AppState}; 9 + 10 + // --- types --- 11 + 12 + #[derive(Debug, Serialize)] 13 + pub struct HealthResponse { 14 + pub status: &'static str, 15 + pub labeler_enabled: bool, 16 + } 17 + 18 + #[derive(Debug, Deserialize)] 19 + pub struct EmitLabelRequest { 20 + /// AT URI of the resource to label (e.g., at://did:plc:xxx/fm.plyr.track/abc123) 21 + pub uri: String, 22 + /// Label value (e.g., "copyright-violation") 23 + #[serde(default = "default_label_val")] 24 + pub val: String, 25 + /// Optional CID of specific version 26 + pub cid: Option<String>, 27 + /// If true, negate an existing label 28 + #[serde(default)] 29 + pub neg: bool, 30 + } 31 + 32 + fn default_label_val() -> String { 33 + "copyright-violation".to_string() 34 + } 35 + 36 + #[derive(Debug, Serialize)] 37 + pub struct EmitLabelResponse { 38 + pub seq: i64, 39 + pub label: Label, 40 + } 41 + 42 + // --- handlers --- 43 + 44 + /// Health check endpoint. 45 + pub async fn health(State(state): State<AppState>) -> Json<HealthResponse> { 46 + Json(HealthResponse { 47 + status: "ok", 48 + labeler_enabled: state.db.is_some(), 49 + }) 50 + } 51 + 52 + /// Landing page with service info. 53 + pub async fn landing(State(state): State<AppState>) -> Html<String> { 54 + let labeler_did = state 55 + .signer 56 + .as_ref() 57 + .map(|s| s.did().to_string()) 58 + .unwrap_or_else(|| "not configured".to_string()); 59 + 60 + Html(format!( 61 + r#"<!DOCTYPE html> 62 + <html> 63 + <head> 64 + <meta charset="utf-8"> 65 + <meta name="viewport" content="width=device-width, initial-scale=1"> 66 + <title>plyr.fm moderation</title> 67 + <style> 68 + body {{ 69 + font-family: system-ui, -apple-system, sans-serif; 70 + background: #0a0a0a; 71 + color: #e5e5e5; 72 + max-width: 600px; 73 + margin: 80px auto; 74 + padding: 20px; 75 + line-height: 1.6; 76 + }} 77 + h1 {{ color: #fff; margin-bottom: 8px; }} 78 + .subtitle {{ color: #888; margin-bottom: 32px; }} 79 + a {{ color: #3b82f6; }} 80 + code {{ 81 + background: #1a1a1a; 82 + padding: 2px 6px; 83 + border-radius: 4px; 84 + font-size: 0.9em; 85 + }} 86 + .endpoint {{ 87 + background: #111; 88 + border: 1px solid #222; 89 + border-radius: 8px; 90 + padding: 16px; 91 + margin: 12px 0; 92 + }} 93 + .endpoint-name {{ color: #10b981; font-family: monospace; }} 94 + </style> 95 + </head> 96 + <body> 97 + <h1>plyr.fm moderation</h1> 98 + <p class="subtitle">ATProto labeler for audio content moderation</p> 99 + 100 + <p>This service provides content labels for <a href="https://plyr.fm">plyr.fm</a>, 101 + the music streaming platform on ATProto.</p> 102 + 103 + <p><strong>Labeler DID:</strong> <code>{}</code></p> 104 + 105 + <h2>Endpoints</h2> 106 + 107 + <div class="endpoint"> 108 + <div class="endpoint-name">GET /xrpc/com.atproto.label.queryLabels</div> 109 + <p>Query labels by URI pattern</p> 110 + </div> 111 + 112 + <div class="endpoint"> 113 + <div class="endpoint-name">GET /xrpc/com.atproto.label.subscribeLabels</div> 114 + <p>WebSocket subscription for real-time label updates</p> 115 + </div> 116 + 117 + <p style="margin-top: 32px; color: #666;"> 118 + <a href="https://bsky.app/profile/moderation.plyr.fm">@moderation.plyr.fm</a> 119 + </p> 120 + </body> 121 + </html>"#, 122 + labeler_did 123 + )) 124 + } 125 + 126 + /// Emit a new label (internal API). 127 + pub async fn emit_label( 128 + State(state): State<AppState>, 129 + Json(request): Json<EmitLabelRequest>, 130 + ) -> Result<Json<EmitLabelResponse>, AppError> { 131 + let db = state.db.as_ref().ok_or(AppError::LabelerNotConfigured)?; 132 + let signer = state.signer.as_ref().ok_or(AppError::LabelerNotConfigured)?; 133 + 134 + info!(uri = %request.uri, val = %request.val, neg = request.neg, "emitting label"); 135 + 136 + // Create and sign the label 137 + let mut label = Label::new(signer.did(), &request.uri, &request.val); 138 + if let Some(cid) = request.cid { 139 + label = label.with_cid(cid); 140 + } 141 + if request.neg { 142 + label = label.negated(); 143 + } 144 + let label = signer.sign_label(label)?; 145 + 146 + // Store in database 147 + let seq = db.store_label(&label).await?; 148 + info!(seq, uri = %request.uri, "label stored"); 149 + 150 + // Broadcast to subscribers 151 + if let Some(tx) = &state.label_tx { 152 + let _ = tx.send((seq, label.clone())); 153 + } 154 + 155 + Ok(Json(EmitLabelResponse { seq, label })) 156 + }
+1 -1
moderation/src/labels.rs
··· 120 121 // Encode to DAG-CBOR 122 let cbor_bytes = 123 - serde_ipld_dagcbor::to_vec(&unsigned).map_err(|e| LabelError::Serialization(e))?; 124 125 // Sign with secp256k1 126 let signature: Signature = signing_key.sign(&cbor_bytes);
··· 120 121 // Encode to DAG-CBOR 122 let cbor_bytes = 123 + serde_ipld_dagcbor::to_vec(&unsigned).map_err(LabelError::Serialization)?; 124 125 // Sign with secp256k1 126 let signature: Signature = signing_key.sign(&cbor_bytes);
+29 -623
moderation/src/main.rs
··· 4 //! - AuDD audio fingerprinting for copyright detection 5 //! - ATProto labeler endpoints (queryLabels, subscribeLabels) 6 //! - Label emission for copyright violations 7 8 - use std::{env, net::SocketAddr, sync::Arc}; 9 10 use anyhow::anyhow; 11 use axum::{ 12 - extract::{ 13 - ws::{Message, WebSocket, WebSocketUpgrade}, 14 - Query, Request, State, 15 - }, 16 - http::StatusCode, 17 - middleware::{self, Next}, 18 - response::{IntoResponse, Response}, 19 routing::{get, post}, 20 - Json, Router, 21 }; 22 - use futures::StreamExt; 23 - use serde::{Deserialize, Serialize}; 24 use tokio::{net::TcpListener, sync::broadcast}; 25 - use tokio_stream::wrappers::BroadcastStream; 26 - use tracing::{error, info, warn}; 27 28 mod db; 29 mod labels; 30 - 31 - use db::LabelDb; 32 - use labels::{Label, LabelSigner}; 33 - 34 - // --- config --- 35 - 36 - struct Config { 37 - host: String, 38 - port: u16, 39 - auth_token: Option<String>, 40 - audd_api_token: String, 41 - audd_api_url: String, 42 - database_url: Option<String>, 43 - labeler_did: Option<String>, 44 - labeler_signing_key: Option<String>, 45 - } 46 - 47 - impl Config { 48 - fn from_env() -> anyhow::Result<Self> { 49 - Ok(Self { 50 - host: env::var("MODERATION_HOST").unwrap_or_else(|_| "0.0.0.0".to_string()), 51 - port: env::var("MODERATION_PORT") 52 - .ok() 53 - .and_then(|v| v.parse().ok()) 54 - .unwrap_or(8083), 55 - auth_token: env::var("MODERATION_AUTH_TOKEN").ok(), 56 - audd_api_token: env::var("MODERATION_AUDD_API_TOKEN") 57 - .map_err(|_| anyhow!("MODERATION_AUDD_API_TOKEN is required"))?, 58 - audd_api_url: env::var("MODERATION_AUDD_API_URL") 59 - .unwrap_or_else(|_| "https://enterprise.audd.io/".to_string()), 60 - database_url: env::var("MODERATION_DATABASE_URL").ok(), 61 - labeler_did: env::var("MODERATION_LABELER_DID").ok(), 62 - labeler_signing_key: env::var("MODERATION_LABELER_SIGNING_KEY").ok(), 63 - }) 64 - } 65 - 66 - fn labeler_enabled(&self) -> bool { 67 - self.database_url.is_some() 68 - && self.labeler_did.is_some() 69 - && self.labeler_signing_key.is_some() 70 - } 71 - } 72 - 73 - // --- request/response types --- 74 - 75 - #[derive(Debug, Deserialize)] 76 - struct ScanRequest { 77 - audio_url: String, 78 - } 79 - 80 - #[derive(Debug, Serialize)] 81 - struct ScanResponse { 82 - matches: Vec<AuddMatch>, 83 - is_flagged: bool, 84 - highest_score: i32, 85 - raw_response: serde_json::Value, 86 - } 87 - 88 - #[derive(Debug, Serialize, Clone)] 89 - struct AuddMatch { 90 - artist: String, 91 - title: String, 92 - #[serde(skip_serializing_if = "Option::is_none")] 93 - album: Option<String>, 94 - score: i32, 95 - #[serde(skip_serializing_if = "Option::is_none")] 96 - isrc: Option<String>, 97 - #[serde(skip_serializing_if = "Option::is_none")] 98 - timecode: Option<String>, 99 - #[serde(skip_serializing_if = "Option::is_none")] 100 - offset_ms: Option<i64>, 101 - } 102 - 103 - #[derive(Debug, Serialize)] 104 - struct HealthResponse { 105 - status: &'static str, 106 - labeler_enabled: bool, 107 - } 108 - 109 - // --- emit label request --- 110 - 111 - #[derive(Debug, Deserialize)] 112 - struct EmitLabelRequest { 113 - /// AT URI of the resource to label (e.g., at://did:plc:xxx/fm.plyr.track/abc123) 114 - uri: String, 115 - /// Label value (e.g., "copyright-violation") 116 - #[serde(default = "default_label_val")] 117 - val: String, 118 - /// Optional CID of specific version 119 - cid: Option<String>, 120 - /// If true, negate an existing label 121 - #[serde(default)] 122 - neg: bool, 123 - } 124 - 125 - fn default_label_val() -> String { 126 - "copyright-violation".to_string() 127 - } 128 - 129 - #[derive(Debug, Serialize)] 130 - struct EmitLabelResponse { 131 - seq: i64, 132 - label: Label, 133 - } 134 - 135 - // --- xrpc types --- 136 - 137 - #[derive(Debug, Deserialize)] 138 - #[serde(rename_all = "camelCase")] 139 - struct QueryLabelsParams { 140 - uri_patterns: String, // comma-separated 141 - sources: Option<String>, 142 - cursor: Option<String>, 143 - limit: Option<i64>, 144 - } 145 - 146 - #[derive(Debug, Serialize)] 147 - struct QueryLabelsResponse { 148 - cursor: Option<String>, 149 - labels: Vec<Label>, 150 - } 151 - 152 - #[derive(Debug, Deserialize)] 153 - struct SubscribeLabelsParams { 154 - cursor: Option<i64>, 155 - } 156 - 157 - // --- audd api types --- 158 - 159 - #[derive(Debug, Deserialize)] 160 - struct AuddResponse { 161 - status: Option<String>, 162 - result: Option<AuddResult>, 163 - } 164 - 165 - #[derive(Debug, Deserialize)] 166 - #[serde(untagged)] 167 - enum AuddResult { 168 - Groups(Vec<AuddGroup>), 169 - Single(AuddSong), 170 - } 171 172 - #[derive(Debug, Deserialize)] 173 - struct AuddGroup { 174 - offset: Option<serde_json::Value>, 175 - songs: Option<Vec<AuddSong>>, 176 - } 177 - 178 - #[derive(Debug, Deserialize)] 179 - #[allow(dead_code)] 180 - struct AuddSong { 181 - artist: Option<String>, 182 - title: Option<String>, 183 - album: Option<String>, 184 - score: Option<i32>, 185 - isrc: Option<String>, 186 - timecode: Option<String>, 187 - release_date: Option<String>, 188 - label: Option<String>, 189 - song_link: Option<String>, 190 - } 191 - 192 - // --- main --- 193 194 #[tokio::main] 195 async fn main() -> anyhow::Result<()> { ··· 198 .with_target(false) 199 .init(); 200 201 - let config = Config::from_env()?; 202 let auth_token = config.auth_token.clone(); 203 204 // Initialize labeler components if configured 205 let (db, signer, label_tx) = if config.labeler_enabled() { 206 - let db = LabelDb::connect(config.database_url.as_ref().unwrap()).await?; 207 db.migrate().await?; 208 info!("labeler database connected and migrated"); 209 210 - let signer = LabelSigner::from_hex( 211 config.labeler_signing_key.as_ref().unwrap(), 212 config.labeler_did.as_ref().unwrap(), 213 )?; 214 info!(did = %signer.did(), "labeler signer initialized"); 215 216 - let (tx, _) = broadcast::channel::<(i64, Label)>(1024); 217 (Some(db), Some(signer), Some(tx)) 218 } else { 219 warn!("labeler not configured - XRPC endpoints will return 503"); ··· 230 231 let app = Router::new() 232 // Landing page 233 - .route("/", get(landing)) 234 // Health check 235 - .route("/health", get(health)) 236 - // AuDD scanning (existing) 237 - .route("/scan", post(scan)) 238 // Label emission (internal API) 239 - .route("/emit-label", post(emit_label)) 240 // ATProto XRPC endpoints (public) 241 - .route("/xrpc/com.atproto.label.queryLabels", get(query_labels)) 242 .route( 243 "/xrpc/com.atproto.label.subscribeLabels", 244 - get(subscribe_labels), 245 ) 246 .layer(middleware::from_fn(move |req, next| { 247 - auth_middleware(req, next, auth_token.clone()) 248 })) 249 .with_state(state); 250 ··· 257 axum::serve(listener, app).await?; 258 Ok(()) 259 } 260 - 261 - // --- state --- 262 - 263 - #[derive(Clone)] 264 - struct AppState { 265 - audd_api_token: String, 266 - audd_api_url: String, 267 - db: Option<Arc<LabelDb>>, 268 - signer: Option<Arc<LabelSigner>>, 269 - label_tx: Option<broadcast::Sender<(i64, Label)>>, 270 - } 271 - 272 - // --- middleware --- 273 - 274 - async fn auth_middleware( 275 - req: Request, 276 - next: Next, 277 - auth_token: Option<String>, 278 - ) -> Result<Response, StatusCode> { 279 - let path = req.uri().path(); 280 - 281 - // Public endpoints - no auth required 282 - if path == "/" 283 - || path == "/health" 284 - || path.starts_with("/xrpc/com.atproto.label.") 285 - { 286 - return Ok(next.run(req).await); 287 - } 288 - 289 - let Some(expected_token) = auth_token else { 290 - warn!("no MODERATION_AUTH_TOKEN set - accepting all requests"); 291 - return Ok(next.run(req).await); 292 - }; 293 - 294 - let token = req 295 - .headers() 296 - .get("X-Moderation-Key") 297 - .and_then(|v| v.to_str().ok()); 298 - 299 - match token { 300 - Some(t) if t == expected_token => Ok(next.run(req).await), 301 - Some(_) => { 302 - warn!("invalid auth token provided"); 303 - Err(StatusCode::UNAUTHORIZED) 304 - } 305 - None => { 306 - warn!("missing X-Moderation-Key header"); 307 - Err(StatusCode::UNAUTHORIZED) 308 - } 309 - } 310 - } 311 - 312 - // --- handlers --- 313 - 314 - async fn health(State(state): State<AppState>) -> Json<HealthResponse> { 315 - Json(HealthResponse { 316 - status: "ok", 317 - labeler_enabled: state.db.is_some(), 318 - }) 319 - } 320 - 321 - async fn landing(State(state): State<AppState>) -> axum::response::Html<String> { 322 - let labeler_did = state 323 - .signer 324 - .as_ref() 325 - .map(|s| s.did().to_string()) 326 - .unwrap_or_else(|| "not configured".to_string()); 327 - 328 - axum::response::Html(format!( 329 - r#"<!DOCTYPE html> 330 - <html> 331 - <head> 332 - <meta charset="utf-8"> 333 - <meta name="viewport" content="width=device-width, initial-scale=1"> 334 - <title>plyr.fm moderation</title> 335 - <style> 336 - body {{ 337 - font-family: system-ui, -apple-system, sans-serif; 338 - background: #0a0a0a; 339 - color: #e5e5e5; 340 - max-width: 600px; 341 - margin: 80px auto; 342 - padding: 20px; 343 - line-height: 1.6; 344 - }} 345 - h1 {{ color: #fff; margin-bottom: 8px; }} 346 - .subtitle {{ color: #888; margin-bottom: 32px; }} 347 - a {{ color: #3b82f6; }} 348 - code {{ 349 - background: #1a1a1a; 350 - padding: 2px 6px; 351 - border-radius: 4px; 352 - font-size: 0.9em; 353 - }} 354 - .endpoint {{ 355 - background: #111; 356 - border: 1px solid #222; 357 - border-radius: 8px; 358 - padding: 16px; 359 - margin: 12px 0; 360 - }} 361 - .endpoint-name {{ color: #10b981; font-family: monospace; }} 362 - </style> 363 - </head> 364 - <body> 365 - <h1>plyr.fm moderation</h1> 366 - <p class="subtitle">ATProto labeler for audio content moderation</p> 367 - 368 - <p>This service provides content labels for <a href="https://plyr.fm">plyr.fm</a>, 369 - the music streaming platform on ATProto.</p> 370 - 371 - <p><strong>Labeler DID:</strong> <code>{}</code></p> 372 - 373 - <h2>Endpoints</h2> 374 - 375 - <div class="endpoint"> 376 - <div class="endpoint-name">GET /xrpc/com.atproto.label.queryLabels</div> 377 - <p>Query labels by URI pattern</p> 378 - </div> 379 - 380 - <div class="endpoint"> 381 - <div class="endpoint-name">GET /xrpc/com.atproto.label.subscribeLabels</div> 382 - <p>WebSocket subscription for real-time label updates</p> 383 - </div> 384 - 385 - <p style="margin-top: 32px; color: #666;"> 386 - <a href="https://bsky.app/profile/moderation.plyr.fm">@moderation.plyr.fm</a> 387 - </p> 388 - </body> 389 - </html>"#, 390 - labeler_did 391 - )) 392 - } 393 - 394 - async fn scan( 395 - State(state): State<AppState>, 396 - Json(request): Json<ScanRequest>, 397 - ) -> Result<Json<ScanResponse>, AppError> { 398 - info!(audio_url = %request.audio_url, "scanning audio"); 399 - 400 - let client = reqwest::Client::new(); 401 - let response = client 402 - .post(&state.audd_api_url) 403 - .form(&[ 404 - ("api_token", &state.audd_api_token), 405 - ("url", &request.audio_url), 406 - ("accurate_offsets", &"1".to_string()), 407 - ]) 408 - .send() 409 - .await 410 - .map_err(|e| AppError::Audd(format!("request failed: {e}")))?; 411 - 412 - let raw_response: serde_json::Value = response 413 - .json() 414 - .await 415 - .map_err(|e| AppError::Audd(format!("failed to parse response: {e}")))?; 416 - 417 - let audd_response: AuddResponse = serde_json::from_value(raw_response.clone()) 418 - .map_err(|e| AppError::Audd(format!("failed to parse audd response: {e}")))?; 419 - 420 - if audd_response.status.as_deref() == Some("error") { 421 - return Err(AppError::Audd(format!( 422 - "audd returned error: {}", 423 - raw_response 424 - ))); 425 - } 426 - 427 - let matches = extract_matches(&audd_response); 428 - let highest_score = matches.iter().map(|m| m.score).max().unwrap_or(0); 429 - let is_flagged = !matches.is_empty(); 430 - 431 - info!( 432 - match_count = matches.len(), 433 - highest_score, 434 - is_flagged, 435 - "scan complete" 436 - ); 437 - 438 - Ok(Json(ScanResponse { 439 - matches, 440 - is_flagged, 441 - highest_score, 442 - raw_response, 443 - })) 444 - } 445 - 446 - async fn emit_label( 447 - State(state): State<AppState>, 448 - Json(request): Json<EmitLabelRequest>, 449 - ) -> Result<Json<EmitLabelResponse>, AppError> { 450 - let db = state.db.as_ref().ok_or(AppError::LabelerNotConfigured)?; 451 - let signer = state.signer.as_ref().ok_or(AppError::LabelerNotConfigured)?; 452 - 453 - info!(uri = %request.uri, val = %request.val, neg = request.neg, "emitting label"); 454 - 455 - // Create and sign the label 456 - let mut label = Label::new(signer.did(), &request.uri, &request.val); 457 - if let Some(cid) = request.cid { 458 - label = label.with_cid(cid); 459 - } 460 - if request.neg { 461 - label = label.negated(); 462 - } 463 - let label = signer.sign_label(label)?; 464 - 465 - // Store in database 466 - let seq = db.store_label(&label).await?; 467 - info!(seq, uri = %request.uri, "label stored"); 468 - 469 - // Broadcast to subscribers 470 - if let Some(tx) = &state.label_tx { 471 - let _ = tx.send((seq, label.clone())); 472 - } 473 - 474 - Ok(Json(EmitLabelResponse { seq, label })) 475 - } 476 - 477 - async fn query_labels( 478 - State(state): State<AppState>, 479 - Query(params): Query<QueryLabelsParams>, 480 - ) -> Result<Json<QueryLabelsResponse>, AppError> { 481 - let db = state.db.as_ref().ok_or(AppError::LabelerNotConfigured)?; 482 - 483 - let uri_patterns: Vec<String> = params 484 - .uri_patterns 485 - .split(',') 486 - .map(|s| s.trim().to_string()) 487 - .collect(); 488 - let sources: Option<Vec<String>> = params 489 - .sources 490 - .map(|s| s.split(',').map(|s| s.trim().to_string()).collect()); 491 - let limit = params.limit.unwrap_or(50).min(250).max(1); 492 - 493 - let (rows, cursor) = db 494 - .query_labels( 495 - &uri_patterns, 496 - sources.as_deref(), 497 - params.cursor.as_deref(), 498 - limit, 499 - ) 500 - .await?; 501 - 502 - let labels: Vec<Label> = rows.iter().map(|r| r.to_label()).collect(); 503 - 504 - Ok(Json(QueryLabelsResponse { cursor, labels })) 505 - } 506 - 507 - async fn subscribe_labels( 508 - State(state): State<AppState>, 509 - Query(params): Query<SubscribeLabelsParams>, 510 - ws: WebSocketUpgrade, 511 - ) -> Result<Response, AppError> { 512 - let db = state.db.clone().ok_or(AppError::LabelerNotConfigured)?; 513 - let label_tx = state.label_tx.clone().ok_or(AppError::LabelerNotConfigured)?; 514 - 515 - Ok(ws.on_upgrade(move |socket| handle_subscribe(socket, db, label_tx, params.cursor))) 516 - } 517 - 518 - async fn handle_subscribe( 519 - mut socket: WebSocket, 520 - db: Arc<LabelDb>, 521 - label_tx: broadcast::Sender<(i64, Label)>, 522 - cursor: Option<i64>, 523 - ) { 524 - // If cursor provided, backfill from that point 525 - let start_seq = if let Some(c) = cursor { 526 - // Send historical labels first 527 - match db.get_labels_since(c, 1000).await { 528 - Ok(rows) => { 529 - for row in &rows { 530 - let msg = SubscribeLabelsMessage { 531 - seq: row.seq, 532 - labels: vec![row.to_label()], 533 - }; 534 - if let Ok(json) = serde_json::to_string(&msg) { 535 - if socket.send(Message::Text(json.into())).await.is_err() { 536 - return; 537 - } 538 - } 539 - } 540 - rows.last().map(|r| r.seq).unwrap_or(c) 541 - } 542 - Err(e) => { 543 - error!(error = %e, "failed to backfill labels"); 544 - return; 545 - } 546 - } 547 - } else { 548 - // Start from current position 549 - db.get_latest_seq().await.unwrap_or(0) 550 - }; 551 - 552 - // Subscribe to live updates 553 - let rx = label_tx.subscribe(); 554 - let mut stream = BroadcastStream::new(rx); 555 - 556 - let mut last_seq = start_seq; 557 - 558 - loop { 559 - tokio::select! { 560 - // Receive from broadcast 561 - Some(result) = stream.next() => { 562 - match result { 563 - Ok((seq, label)) => { 564 - if seq > last_seq { 565 - let msg = SubscribeLabelsMessage { 566 - seq, 567 - labels: vec![label], 568 - }; 569 - if let Ok(json) = serde_json::to_string(&msg) { 570 - if socket.send(Message::Text(json.into())).await.is_err() { 571 - break; 572 - } 573 - } 574 - last_seq = seq; 575 - } 576 - } 577 - Err(_) => continue, // Lagged, skip 578 - } 579 - } 580 - // Check for client disconnect 581 - msg = socket.recv() => { 582 - match msg { 583 - Some(Ok(Message::Close(_))) | None => break, 584 - Some(Ok(Message::Ping(data))) => { 585 - if socket.send(Message::Pong(data)).await.is_err() { 586 - break; 587 - } 588 - } 589 - _ => {} 590 - } 591 - } 592 - } 593 - } 594 - } 595 - 596 - #[derive(Serialize)] 597 - struct SubscribeLabelsMessage { 598 - seq: i64, 599 - labels: Vec<Label>, 600 - } 601 - 602 - fn extract_matches(response: &AuddResponse) -> Vec<AuddMatch> { 603 - let Some(result) = &response.result else { 604 - return vec![]; 605 - }; 606 - 607 - match result { 608 - AuddResult::Groups(groups) => groups 609 - .iter() 610 - .flat_map(|group| { 611 - group 612 - .songs 613 - .as_ref() 614 - .map(|songs| { 615 - songs 616 - .iter() 617 - .map(|song| parse_song(song, group.offset.as_ref())) 618 - .collect::<Vec<_>>() 619 - }) 620 - .unwrap_or_default() 621 - }) 622 - .collect(), 623 - AuddResult::Single(song) => vec![parse_song(song, None)], 624 - } 625 - } 626 - 627 - fn parse_song(song: &AuddSong, offset: Option<&serde_json::Value>) -> AuddMatch { 628 - let offset_ms = offset.and_then(|v| match v { 629 - serde_json::Value::Number(n) => n.as_i64(), 630 - serde_json::Value::String(s) => parse_timecode_to_ms(s), 631 - _ => None, 632 - }); 633 - 634 - AuddMatch { 635 - artist: song.artist.clone().unwrap_or_else(|| "Unknown".to_string()), 636 - title: song.title.clone().unwrap_or_else(|| "Unknown".to_string()), 637 - album: song.album.clone(), 638 - score: song.score.unwrap_or(0), 639 - isrc: song.isrc.clone(), 640 - timecode: song.timecode.clone(), 641 - offset_ms, 642 - } 643 - } 644 - 645 - fn parse_timecode_to_ms(timecode: &str) -> Option<i64> { 646 - let parts: Vec<&str> = timecode.split(':').collect(); 647 - match parts.len() { 648 - 2 => { 649 - let mins: i64 = parts[0].parse().ok()?; 650 - let secs: i64 = parts[1].parse().ok()?; 651 - Some((mins * 60 + secs) * 1000) 652 - } 653 - 3 => { 654 - let hours: i64 = parts[0].parse().ok()?; 655 - let mins: i64 = parts[1].parse().ok()?; 656 - let secs: i64 = parts[2].parse().ok()?; 657 - Some((hours * 3600 + mins * 60 + secs) * 1000) 658 - } 659 - _ => None, 660 - } 661 - } 662 - 663 - // --- errors --- 664 - 665 - #[derive(Debug, thiserror::Error)] 666 - enum AppError { 667 - #[error("audd error: {0}")] 668 - Audd(String), 669 - 670 - #[error("labeler not configured")] 671 - LabelerNotConfigured, 672 - 673 - #[error("label error: {0}")] 674 - Label(#[from] labels::LabelError), 675 - 676 - #[error("database error: {0}")] 677 - Database(#[from] sqlx::Error), 678 - } 679 - 680 - impl IntoResponse for AppError { 681 - fn into_response(self) -> Response { 682 - error!(error = %self, "request failed"); 683 - let (status, error_type) = match &self { 684 - AppError::Audd(_) => (StatusCode::BAD_GATEWAY, "AuddError"), 685 - AppError::LabelerNotConfigured => (StatusCode::SERVICE_UNAVAILABLE, "LabelerNotConfigured"), 686 - AppError::Label(_) => (StatusCode::INTERNAL_SERVER_ERROR, "LabelError"), 687 - AppError::Database(_) => (StatusCode::INTERNAL_SERVER_ERROR, "DatabaseError"), 688 - }; 689 - let body = serde_json::json!({ 690 - "error": error_type, 691 - "message": self.to_string() 692 - }); 693 - (status, Json(body)).into_response() 694 - } 695 - }
··· 4 //! - AuDD audio fingerprinting for copyright detection 5 //! - ATProto labeler endpoints (queryLabels, subscribeLabels) 6 //! - Label emission for copyright violations 7 + //! - Admin UI for reviewing and resolving flags 8 9 + use std::{net::SocketAddr, sync::Arc}; 10 11 use anyhow::anyhow; 12 use axum::{ 13 + middleware, 14 routing::{get, post}, 15 + Router, 16 }; 17 use tokio::{net::TcpListener, sync::broadcast}; 18 + use tracing::{info, warn}; 19 20 + mod admin; 21 + mod audd; 22 + mod config; 23 mod db; 24 + mod handlers; 25 mod labels; 26 + mod auth; 27 + mod state; 28 + mod xrpc; 29 30 + pub use state::{AppError, AppState}; 31 32 #[tokio::main] 33 async fn main() -> anyhow::Result<()> { ··· 36 .with_target(false) 37 .init(); 38 39 + let config = config::Config::from_env()?; 40 let auth_token = config.auth_token.clone(); 41 42 // Initialize labeler components if configured 43 let (db, signer, label_tx) = if config.labeler_enabled() { 44 + let db = db::LabelDb::connect(config.database_url.as_ref().unwrap()).await?; 45 db.migrate().await?; 46 info!("labeler database connected and migrated"); 47 48 + let signer = labels::LabelSigner::from_hex( 49 config.labeler_signing_key.as_ref().unwrap(), 50 config.labeler_did.as_ref().unwrap(), 51 )?; 52 info!(did = %signer.did(), "labeler signer initialized"); 53 54 + let (tx, _) = broadcast::channel::<(i64, labels::Label)>(1024); 55 (Some(db), Some(signer), Some(tx)) 56 } else { 57 warn!("labeler not configured - XRPC endpoints will return 503"); ··· 68 69 let app = Router::new() 70 // Landing page 71 + .route("/", get(handlers::landing)) 72 // Health check 73 + .route("/health", get(handlers::health)) 74 + // AuDD scanning 75 + .route("/scan", post(audd::scan)) 76 // Label emission (internal API) 77 + .route("/emit-label", post(handlers::emit_label)) 78 + // Admin UI and API 79 + .route("/admin", get(admin::admin_ui)) 80 + .route("/admin/flags", get(admin::list_flagged)) 81 + .route("/admin/resolve", post(admin::resolve_flag)) 82 // ATProto XRPC endpoints (public) 83 + .route("/xrpc/com.atproto.label.queryLabels", get(xrpc::query_labels)) 84 .route( 85 "/xrpc/com.atproto.label.subscribeLabels", 86 + get(xrpc::subscribe_labels), 87 ) 88 .layer(middleware::from_fn(move |req, next| { 89 + auth::auth_middleware(req, next, auth_token.clone()) 90 })) 91 .with_state(state); 92 ··· 99 axum::serve(listener, app).await?; 100 Ok(()) 101 }
+59
moderation/src/state.rs
···
··· 1 + //! Application state and error types. 2 + 3 + use std::sync::Arc; 4 + 5 + use axum::{ 6 + http::StatusCode, 7 + response::{IntoResponse, Response}, 8 + Json, 9 + }; 10 + use tokio::sync::broadcast; 11 + use tracing::error; 12 + 13 + use crate::db::LabelDb; 14 + use crate::labels::{Label, LabelError, LabelSigner}; 15 + 16 + /// Shared application state. 17 + #[derive(Clone)] 18 + pub struct AppState { 19 + pub audd_api_token: String, 20 + pub audd_api_url: String, 21 + pub db: Option<Arc<LabelDb>>, 22 + pub signer: Option<Arc<LabelSigner>>, 23 + pub label_tx: Option<broadcast::Sender<(i64, Label)>>, 24 + } 25 + 26 + /// Application error type. 27 + #[derive(Debug, thiserror::Error)] 28 + pub enum AppError { 29 + #[error("audd error: {0}")] 30 + Audd(String), 31 + 32 + #[error("labeler not configured")] 33 + LabelerNotConfigured, 34 + 35 + #[error("label error: {0}")] 36 + Label(#[from] LabelError), 37 + 38 + #[error("database error: {0}")] 39 + Database(#[from] sqlx::Error), 40 + } 41 + 42 + impl IntoResponse for AppError { 43 + fn into_response(self) -> Response { 44 + error!(error = %self, "request failed"); 45 + let (status, error_type) = match &self { 46 + AppError::Audd(_) => (StatusCode::BAD_GATEWAY, "AuddError"), 47 + AppError::LabelerNotConfigured => { 48 + (StatusCode::SERVICE_UNAVAILABLE, "LabelerNotConfigured") 49 + } 50 + AppError::Label(_) => (StatusCode::INTERNAL_SERVER_ERROR, "LabelError"), 51 + AppError::Database(_) => (StatusCode::INTERNAL_SERVER_ERROR, "DatabaseError"), 52 + }; 53 + let body = serde_json::json!({ 54 + "error": error_type, 55 + "message": self.to_string() 56 + }); 57 + (status, Json(body)).into_response() 58 + } 59 + }
+175
moderation/src/xrpc.rs
···
··· 1 + //! ATProto XRPC endpoints for the labeler protocol. 2 + 3 + use std::sync::Arc; 4 + 5 + use axum::{ 6 + extract::{ 7 + ws::{Message, WebSocket, WebSocketUpgrade}, 8 + Query, State, 9 + }, 10 + response::Response, 11 + Json, 12 + }; 13 + use futures::StreamExt; 14 + use serde::{Deserialize, Serialize}; 15 + use tokio::sync::broadcast; 16 + use tokio_stream::wrappers::BroadcastStream; 17 + use tracing::error; 18 + 19 + use crate::db::LabelDb; 20 + use crate::labels::Label; 21 + use crate::state::{AppError, AppState}; 22 + 23 + // --- types --- 24 + 25 + #[derive(Debug, Deserialize)] 26 + #[serde(rename_all = "camelCase")] 27 + pub struct QueryLabelsParams { 28 + pub uri_patterns: String, // comma-separated 29 + pub sources: Option<String>, 30 + pub cursor: Option<String>, 31 + pub limit: Option<i64>, 32 + } 33 + 34 + #[derive(Debug, Serialize)] 35 + pub struct QueryLabelsResponse { 36 + pub cursor: Option<String>, 37 + pub labels: Vec<Label>, 38 + } 39 + 40 + #[derive(Debug, Deserialize)] 41 + pub struct SubscribeLabelsParams { 42 + pub cursor: Option<i64>, 43 + } 44 + 45 + #[derive(Serialize)] 46 + struct SubscribeLabelsMessage { 47 + seq: i64, 48 + labels: Vec<Label>, 49 + } 50 + 51 + // --- handlers --- 52 + 53 + /// Query labels by URI pattern. 54 + pub async fn query_labels( 55 + State(state): State<AppState>, 56 + Query(params): Query<QueryLabelsParams>, 57 + ) -> Result<Json<QueryLabelsResponse>, AppError> { 58 + let db = state.db.as_ref().ok_or(AppError::LabelerNotConfigured)?; 59 + 60 + let uri_patterns: Vec<String> = params 61 + .uri_patterns 62 + .split(',') 63 + .map(|s| s.trim().to_string()) 64 + .collect(); 65 + let sources: Option<Vec<String>> = params 66 + .sources 67 + .map(|s| s.split(',').map(|s| s.trim().to_string()).collect()); 68 + let limit = params.limit.unwrap_or(50).clamp(1, 250); 69 + 70 + let (rows, cursor) = db 71 + .query_labels( 72 + &uri_patterns, 73 + sources.as_deref(), 74 + params.cursor.as_deref(), 75 + limit, 76 + ) 77 + .await?; 78 + 79 + let labels: Vec<Label> = rows.iter().map(|r| r.to_label()).collect(); 80 + 81 + Ok(Json(QueryLabelsResponse { cursor, labels })) 82 + } 83 + 84 + /// WebSocket subscription for real-time label updates. 85 + pub async fn subscribe_labels( 86 + State(state): State<AppState>, 87 + Query(params): Query<SubscribeLabelsParams>, 88 + ws: WebSocketUpgrade, 89 + ) -> Result<Response, AppError> { 90 + let db = state.db.clone().ok_or(AppError::LabelerNotConfigured)?; 91 + let label_tx = state 92 + .label_tx 93 + .clone() 94 + .ok_or(AppError::LabelerNotConfigured)?; 95 + 96 + Ok(ws.on_upgrade(move |socket| handle_subscribe(socket, db, label_tx, params.cursor))) 97 + } 98 + 99 + async fn handle_subscribe( 100 + mut socket: WebSocket, 101 + db: Arc<LabelDb>, 102 + label_tx: broadcast::Sender<(i64, Label)>, 103 + cursor: Option<i64>, 104 + ) { 105 + // If cursor provided, backfill from that point 106 + let start_seq = if let Some(c) = cursor { 107 + // Send historical labels first 108 + match db.get_labels_since(c, 1000).await { 109 + Ok(rows) => { 110 + for row in &rows { 111 + let msg = SubscribeLabelsMessage { 112 + seq: row.seq, 113 + labels: vec![row.to_label()], 114 + }; 115 + if let Ok(json) = serde_json::to_string(&msg) { 116 + if socket.send(Message::Text(json)).await.is_err() { 117 + return; 118 + } 119 + } 120 + } 121 + rows.last().map(|r| r.seq).unwrap_or(c) 122 + } 123 + Err(e) => { 124 + error!(error = %e, "failed to backfill labels"); 125 + return; 126 + } 127 + } 128 + } else { 129 + // Start from current position 130 + db.get_latest_seq().await.unwrap_or(0) 131 + }; 132 + 133 + // Subscribe to live updates 134 + let rx = label_tx.subscribe(); 135 + let mut stream = BroadcastStream::new(rx); 136 + 137 + let mut last_seq = start_seq; 138 + 139 + loop { 140 + tokio::select! { 141 + // Receive from broadcast 142 + Some(result) = stream.next() => { 143 + match result { 144 + Ok((seq, label)) => { 145 + if seq > last_seq { 146 + let msg = SubscribeLabelsMessage { 147 + seq, 148 + labels: vec![label], 149 + }; 150 + if let Ok(json) = serde_json::to_string(&msg) { 151 + if socket.send(Message::Text(json)).await.is_err() { 152 + break; 153 + } 154 + } 155 + last_seq = seq; 156 + } 157 + } 158 + Err(_) => continue, // Lagged, skip 159 + } 160 + } 161 + // Check for client disconnect 162 + msg = socket.recv() => { 163 + match msg { 164 + Some(Ok(Message::Close(_))) | None => break, 165 + Some(Ok(Message::Ping(data))) => { 166 + if socket.send(Message::Pong(data)).await.is_err() { 167 + break; 168 + } 169 + } 170 + _ => {} 171 + } 172 + } 173 + } 174 + } 175 + }