feat(moderation): add batch review system with mobile-friendly UI (#672)

* feat(moderation): add batch review system with mobile-friendly UI

- Add review batch tables (review_batches, batch_flags) with migrations
- Add /admin/batches POST endpoint to create review batches
- Add /review/:id endpoints for auth-protected review UI
- Review page renders server-side HTML with embedded JS
- Same auth middleware as admin endpoints (X-Moderation-Key header)
- Update moderation_loop.py to create batches and send DM links
- Simplify loop: DM is now just a notification channel, not for parsing

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* feat(ci): add workflow dispatch for moderation loop

Allows triggering the moderation loop from GitHub Actions UI with:
- dry_run toggle (default: true for safety)
- limit input for testing with subset of flags
- env selector (prod/staging/dev)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>

authored by zzstoatzz.io Claude Opus 4.5 and committed by GitHub cdc942b6 bafa0f24

Changed files
+1124 -1
.github
moderation
scripts
+64
.github/workflows/run-moderation-loop.yml
# run moderation loop via workflow dispatch
#
# analyzes pending copyright flags, auto-resolves false positives,
# creates review batches for human review, sends DM notification
#
# required secrets:
#   MODERATION_SERVICE_URL - moderation service base URL
#   MODERATION_AUTH_TOKEN - X-Moderation-Key header value
#   ANTHROPIC_API_KEY - for flag analysis
#   NOTIFY_BOT_HANDLE - bluesky bot handle for DMs
#   NOTIFY_BOT_PASSWORD - bluesky bot app password
#   NOTIFY_RECIPIENT_HANDLE - who receives DM notifications

name: run moderation loop

on:
  workflow_dispatch:
    inputs:
      dry_run:
        description: "dry run (analyze only, don't resolve or send DMs)"
        type: boolean
        default: true
      limit:
        description: "max flags to process (leave empty for all)"
        type: string
        default: ""
      env:
        description: "environment (for DM header)"
        type: choice
        options:
          - prod
          - staging
          - dev
        default: prod

# the job only needs to read the repository; don't hand it a write-capable token
permissions:
  contents: read

jobs:
  run:
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v4

      - uses: astral-sh/setup-uv@v4

      - name: Run moderation loop
        # inputs are passed through the environment instead of being
        # interpolated into the script text, so a crafted `limit` value
        # cannot inject shell commands into a step that holds secrets
        run: |
          ARGS=()
          if [ "$DRY_RUN" = "true" ]; then
            ARGS+=(--dry-run)
          fi
          if [ -n "$LIMIT" ]; then
            # `limit` is a free-form string input; accept digits only
            case "$LIMIT" in
              *[!0-9]*)
                echo "limit must be a non-negative integer, got: $LIMIT" >&2
                exit 1
                ;;
            esac
            ARGS+=(--limit "$LIMIT")
          fi
          ARGS+=(--env "$TARGET_ENV")

          echo "Running: uv run scripts/moderation_loop.py ${ARGS[*]}"
          uv run scripts/moderation_loop.py "${ARGS[@]}"
        env:
          DRY_RUN: ${{ inputs.dry_run }}
          LIMIT: ${{ inputs.limit }}
          TARGET_ENV: ${{ inputs.env }}
          MODERATION_SERVICE_URL: ${{ secrets.MODERATION_SERVICE_URL }}
          MODERATION_AUTH_TOKEN: ${{ secrets.MODERATION_AUTH_TOKEN }}
          ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }}
          NOTIFY_BOT_HANDLE: ${{ secrets.NOTIFY_BOT_HANDLE }}
          NOTIFY_BOT_PASSWORD: ${{ secrets.NOTIFY_BOT_PASSWORD }}
          NOTIFY_RECIPIENT_HANDLE: ${{ secrets.NOTIFY_RECIPIENT_HANDLE }}
+1 -1
moderation/Cargo.toml
··· 6 6 [dependencies] 7 7 anyhow = "1.0" 8 8 axum = { version = "0.7", features = ["macros", "json", "ws"] } 9 + rand = "0.8" 9 10 bytes = "1.0" 10 11 chrono = { version = "0.4", features = ["serde"] } 11 12 futures = "0.3" ··· 25 26 tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] } 26 27 27 28 [dev-dependencies] 28 - rand = "0.8"
+69
moderation/src/admin.rs
··· 137 137 pub message: String, 138 138 } 139 139 140 + /// Request to create a review batch. 141 + #[derive(Debug, Deserialize)] 142 + pub struct CreateBatchRequest { 143 + /// URIs to include. If empty, uses all pending flags. 144 + #[serde(default)] 145 + pub uris: Vec<String>, 146 + /// Who created this batch. 147 + pub created_by: Option<String>, 148 + } 149 + 150 + /// Response after creating a review batch. 151 + #[derive(Debug, Serialize)] 152 + pub struct CreateBatchResponse { 153 + pub id: String, 154 + pub url: String, 155 + pub flag_count: usize, 156 + } 157 + 140 158 /// List all flagged tracks - returns JSON for API, HTML for htmx. 141 159 pub async fn list_flagged( 142 160 State(state): State<AppState>, ··· 325 343 Ok(Json(StoreContextResponse { 326 344 message: format!("context stored for {}", request.uri), 327 345 })) 346 + } 347 + 348 + /// Create a review batch from pending flags. 349 + pub async fn create_batch( 350 + State(state): State<AppState>, 351 + Json(request): Json<CreateBatchRequest>, 352 + ) -> Result<Json<CreateBatchResponse>, AppError> { 353 + let db = state.db.as_ref().ok_or(AppError::LabelerNotConfigured)?; 354 + 355 + // Get URIs to include 356 + let uris = if request.uris.is_empty() { 357 + let pending = db.get_pending_flags().await?; 358 + pending 359 + .into_iter() 360 + .filter(|t| !t.resolved) 361 + .map(|t| t.uri) 362 + .collect() 363 + } else { 364 + request.uris 365 + }; 366 + 367 + if uris.is_empty() { 368 + return Err(AppError::BadRequest("no flags to review".to_string())); 369 + } 370 + 371 + let id = generate_batch_id(); 372 + let flag_count = uris.len(); 373 + 374 + tracing::info!( 375 + batch_id = %id, 376 + flag_count = flag_count, 377 + "creating review batch" 378 + ); 379 + 380 + db.create_batch(&id, &uris, request.created_by.as_deref()) 381 + .await?; 382 + 383 + let url = format!("/review/{}", id); 384 + 385 + Ok(Json(CreateBatchResponse { id, url, flag_count })) 386 + } 387 + 388 + /// Generate a short, 
URL-safe batch ID. 389 + fn generate_batch_id() -> String { 390 + use std::time::{SystemTime, UNIX_EPOCH}; 391 + let now = SystemTime::now() 392 + .duration_since(UNIX_EPOCH) 393 + .unwrap() 394 + .as_millis(); 395 + let rand_part: u32 = rand::random(); 396 + format!("{:x}{:x}", (now as u64) & 0xFFFFFFFF, rand_part & 0xFFFF) 328 397 } 329 398 330 399 /// Add a sensitive image entry.
+247
moderation/src/db.rs
··· 23 23 pub flagged_by: Option<String>, 24 24 } 25 25 26 + /// Review batch for mobile-friendly flag review. 27 + #[derive(Debug, Clone, FromRow, Serialize, Deserialize)] 28 + pub struct ReviewBatch { 29 + pub id: String, 30 + pub created_at: DateTime<Utc>, 31 + pub expires_at: Option<DateTime<Utc>>, 32 + /// Status: pending, completed. 33 + pub status: String, 34 + /// Who created this batch. 35 + pub created_by: Option<String>, 36 + } 37 + 38 + /// A flag within a review batch. 39 + #[derive(Debug, Clone, FromRow, Serialize, Deserialize)] 40 + pub struct BatchFlag { 41 + pub id: i64, 42 + pub batch_id: String, 43 + pub uri: String, 44 + pub reviewed: bool, 45 + pub reviewed_at: Option<DateTime<Utc>>, 46 + /// Decision: approved, rejected, or null. 47 + pub decision: Option<String>, 48 + } 49 + 26 50 /// Type alias for context row from database query. 27 51 type ContextRow = ( 28 52 Option<i64>, // track_id ··· 232 256 .execute(&self.pool) 233 257 .await?; 234 258 sqlx::query("CREATE INDEX IF NOT EXISTS idx_sensitive_images_url ON sensitive_images(url)") 259 + .execute(&self.pool) 260 + .await?; 261 + 262 + // Review batches for mobile-friendly flag review 263 + sqlx::query( 264 + r#" 265 + CREATE TABLE IF NOT EXISTS review_batches ( 266 + id TEXT PRIMARY KEY, 267 + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), 268 + expires_at TIMESTAMPTZ, 269 + status TEXT NOT NULL DEFAULT 'pending', 270 + created_by TEXT 271 + ) 272 + "#, 273 + ) 274 + .execute(&self.pool) 275 + .await?; 276 + 277 + // Flags within review batches 278 + sqlx::query( 279 + r#" 280 + CREATE TABLE IF NOT EXISTS batch_flags ( 281 + id BIGSERIAL PRIMARY KEY, 282 + batch_id TEXT NOT NULL REFERENCES review_batches(id) ON DELETE CASCADE, 283 + uri TEXT NOT NULL, 284 + reviewed BOOLEAN NOT NULL DEFAULT FALSE, 285 + reviewed_at TIMESTAMPTZ, 286 + decision TEXT, 287 + UNIQUE(batch_id, uri) 288 + ) 289 + "#, 290 + ) 291 + .execute(&self.pool) 292 + .await?; 293 + 294 + sqlx::query("CREATE INDEX IF NOT 
EXISTS idx_batch_flags_batch_id ON batch_flags(batch_id)") 235 295 .execute(&self.pool) 236 296 .await?; 237 297 ··· 631 691 .collect(); 632 692 633 693 Ok(tracks) 694 + } 695 + 696 + // ------------------------------------------------------------------------- 697 + // Review batches 698 + // ------------------------------------------------------------------------- 699 + 700 + /// Create a review batch with the given flags. 701 + pub async fn create_batch( 702 + &self, 703 + id: &str, 704 + uris: &[String], 705 + created_by: Option<&str>, 706 + ) -> Result<ReviewBatch, sqlx::Error> { 707 + let batch = sqlx::query_as::<_, ReviewBatch>( 708 + r#" 709 + INSERT INTO review_batches (id, created_by) 710 + VALUES ($1, $2) 711 + RETURNING id, created_at, expires_at, status, created_by 712 + "#, 713 + ) 714 + .bind(id) 715 + .bind(created_by) 716 + .fetch_one(&self.pool) 717 + .await?; 718 + 719 + for uri in uris { 720 + sqlx::query( 721 + r#" 722 + INSERT INTO batch_flags (batch_id, uri) 723 + VALUES ($1, $2) 724 + ON CONFLICT (batch_id, uri) DO NOTHING 725 + "#, 726 + ) 727 + .bind(id) 728 + .bind(uri) 729 + .execute(&self.pool) 730 + .await?; 731 + } 732 + 733 + Ok(batch) 734 + } 735 + 736 + /// Get a batch by ID. 737 + pub async fn get_batch(&self, id: &str) -> Result<Option<ReviewBatch>, sqlx::Error> { 738 + sqlx::query_as::<_, ReviewBatch>( 739 + r#" 740 + SELECT id, created_at, expires_at, status, created_by 741 + FROM review_batches 742 + WHERE id = $1 743 + "#, 744 + ) 745 + .bind(id) 746 + .fetch_optional(&self.pool) 747 + .await 748 + } 749 + 750 + /// Get all flags in a batch with their context. 
751 + pub async fn get_batch_flags(&self, batch_id: &str) -> Result<Vec<FlaggedTrack>, sqlx::Error> { 752 + let rows: Vec<FlaggedRow> = sqlx::query_as( 753 + r#" 754 + SELECT l.seq, l.uri, l.val, l.cts, 755 + c.track_id, c.track_title, c.artist_handle, c.artist_did, c.highest_score, c.matches, 756 + c.resolution_reason, c.resolution_notes 757 + FROM batch_flags bf 758 + JOIN labels l ON l.uri = bf.uri AND l.val = 'copyright-violation' AND l.neg = false 759 + LEFT JOIN label_context c ON l.uri = c.uri 760 + WHERE bf.batch_id = $1 761 + ORDER BY l.seq DESC 762 + "#, 763 + ) 764 + .bind(batch_id) 765 + .fetch_all(&self.pool) 766 + .await?; 767 + 768 + let batch_uris: Vec<String> = rows.iter().map(|r| r.1.clone()).collect(); 769 + let negated_uris: std::collections::HashSet<String> = if !batch_uris.is_empty() { 770 + sqlx::query_scalar::<_, String>( 771 + r#" 772 + SELECT DISTINCT uri 773 + FROM labels 774 + WHERE val = 'copyright-violation' AND neg = true AND uri = ANY($1) 775 + "#, 776 + ) 777 + .bind(&batch_uris) 778 + .fetch_all(&self.pool) 779 + .await? 
780 + .into_iter() 781 + .collect() 782 + } else { 783 + std::collections::HashSet::new() 784 + }; 785 + 786 + let tracks = rows 787 + .into_iter() 788 + .map( 789 + |( 790 + seq, 791 + uri, 792 + val, 793 + cts, 794 + track_id, 795 + track_title, 796 + artist_handle, 797 + artist_did, 798 + highest_score, 799 + matches, 800 + resolution_reason, 801 + resolution_notes, 802 + )| { 803 + let context = if track_id.is_some() 804 + || track_title.is_some() 805 + || artist_handle.is_some() 806 + || resolution_reason.is_some() 807 + { 808 + Some(LabelContext { 809 + track_id, 810 + track_title, 811 + artist_handle, 812 + artist_did, 813 + highest_score, 814 + matches: matches.and_then(|v| serde_json::from_value(v).ok()), 815 + resolution_reason: resolution_reason 816 + .and_then(|s| ResolutionReason::from_str(&s)), 817 + resolution_notes, 818 + }) 819 + } else { 820 + None 821 + }; 822 + 823 + FlaggedTrack { 824 + seq, 825 + uri: uri.clone(), 826 + val, 827 + created_at: cts.format("%Y-%m-%d %H:%M:%S").to_string(), 828 + resolved: negated_uris.contains(&uri), 829 + context, 830 + } 831 + }, 832 + ) 833 + .collect(); 834 + 835 + Ok(tracks) 836 + } 837 + 838 + /// Update batch status. 839 + pub async fn update_batch_status(&self, id: &str, status: &str) -> Result<bool, sqlx::Error> { 840 + let result = sqlx::query("UPDATE review_batches SET status = $1 WHERE id = $2") 841 + .bind(status) 842 + .bind(id) 843 + .execute(&self.pool) 844 + .await?; 845 + Ok(result.rows_affected() > 0) 846 + } 847 + 848 + /// Mark a flag in a batch as reviewed. 
849 + pub async fn mark_flag_reviewed( 850 + &self, 851 + batch_id: &str, 852 + uri: &str, 853 + decision: &str, 854 + ) -> Result<bool, sqlx::Error> { 855 + let result = sqlx::query( 856 + r#" 857 + UPDATE batch_flags 858 + SET reviewed = true, reviewed_at = NOW(), decision = $1 859 + WHERE batch_id = $2 AND uri = $3 860 + "#, 861 + ) 862 + .bind(decision) 863 + .bind(batch_id) 864 + .bind(uri) 865 + .execute(&self.pool) 866 + .await?; 867 + Ok(result.rows_affected() > 0) 868 + } 869 + 870 + /// Get pending (non-reviewed) flags from a batch. 871 + pub async fn get_batch_pending_uris(&self, batch_id: &str) -> Result<Vec<String>, sqlx::Error> { 872 + sqlx::query_scalar::<_, String>( 873 + r#" 874 + SELECT uri FROM batch_flags 875 + WHERE batch_id = $1 AND reviewed = false 876 + "#, 877 + ) 878 + .bind(batch_id) 879 + .fetch_all(&self.pool) 880 + .await 634 881 } 635 882 636 883 // -------------------------------------------------------------------------
+6
moderation/src/main.rs
··· 25 25 mod db; 26 26 mod handlers; 27 27 mod labels; 28 + mod review; 28 29 mod state; 29 30 mod xrpc; 30 31 ··· 91 92 "/admin/sensitive-images/remove", 92 93 post(admin::remove_sensitive_image), 93 94 ) 95 + .route("/admin/batches", post(admin::create_batch)) 96 + // Review endpoints (auth protected) 97 + .route("/review/:id", get(review::review_page)) 98 + .route("/review/:id/data", get(review::review_data)) 99 + .route("/review/:id/submit", post(review::submit_review)) 94 100 // Static files (CSS, JS for admin UI) 95 101 .nest_service("/static", ServeDir::new("static")) 96 102 // ATProto XRPC endpoints (public)
+453
moderation/src/review.rs
··· 1 + //! Review endpoints for batch flag review. 2 + //! 3 + //! These endpoints are behind the same auth as admin endpoints. 4 + 5 + use axum::{ 6 + extract::{Path, State}, 7 + http::header::CONTENT_TYPE, 8 + response::{IntoResponse, Response}, 9 + Json, 10 + }; 11 + use serde::{Deserialize, Serialize}; 12 + 13 + use crate::admin::FlaggedTrack; 14 + use crate::state::{AppError, AppState}; 15 + 16 + /// Response for review page data. 17 + #[derive(Debug, Serialize)] 18 + pub struct ReviewPageData { 19 + pub batch_id: String, 20 + pub flags: Vec<FlaggedTrack>, 21 + pub status: String, 22 + } 23 + 24 + /// Request to submit review decisions. 25 + #[derive(Debug, Deserialize)] 26 + pub struct SubmitReviewRequest { 27 + pub decisions: Vec<ReviewDecision>, 28 + } 29 + 30 + /// A single review decision. 31 + #[derive(Debug, Deserialize)] 32 + pub struct ReviewDecision { 33 + pub uri: String, 34 + pub decision: String, // "approved" or "rejected" 35 + } 36 + 37 + /// Response after submitting review. 38 + #[derive(Debug, Serialize)] 39 + pub struct SubmitReviewResponse { 40 + pub resolved_count: usize, 41 + pub message: String, 42 + } 43 + 44 + /// Get review page HTML. 45 + pub async fn review_page( 46 + State(state): State<AppState>, 47 + Path(batch_id): Path<String>, 48 + ) -> Result<Response, AppError> { 49 + let db = state.db.as_ref().ok_or(AppError::LabelerNotConfigured)?; 50 + 51 + let batch = db 52 + .get_batch(&batch_id) 53 + .await? 54 + .ok_or(AppError::NotFound("batch not found".to_string()))?; 55 + 56 + let flags = db.get_batch_flags(&batch_id).await?; 57 + let html = render_review_page(&batch_id, &flags, &batch.status); 58 + 59 + Ok(([(CONTENT_TYPE, "text/html; charset=utf-8")], html).into_response()) 60 + } 61 + 62 + /// Get review data as JSON. 
63 + pub async fn review_data( 64 + State(state): State<AppState>, 65 + Path(batch_id): Path<String>, 66 + ) -> Result<Json<ReviewPageData>, AppError> { 67 + let db = state.db.as_ref().ok_or(AppError::LabelerNotConfigured)?; 68 + 69 + let batch = db 70 + .get_batch(&batch_id) 71 + .await? 72 + .ok_or(AppError::NotFound("batch not found".to_string()))?; 73 + 74 + let flags = db.get_batch_flags(&batch_id).await?; 75 + 76 + Ok(Json(ReviewPageData { 77 + batch_id, 78 + flags, 79 + status: batch.status, 80 + })) 81 + } 82 + 83 + /// Submit review decisions. 84 + pub async fn submit_review( 85 + State(state): State<AppState>, 86 + Path(batch_id): Path<String>, 87 + Json(request): Json<SubmitReviewRequest>, 88 + ) -> Result<Json<SubmitReviewResponse>, AppError> { 89 + let db = state.db.as_ref().ok_or(AppError::LabelerNotConfigured)?; 90 + let signer = state 91 + .signer 92 + .as_ref() 93 + .ok_or(AppError::LabelerNotConfigured)?; 94 + 95 + let _batch = db 96 + .get_batch(&batch_id) 97 + .await? 
98 + .ok_or(AppError::NotFound("batch not found".to_string()))?; 99 + 100 + let mut resolved_count = 0; 101 + 102 + for decision in &request.decisions { 103 + tracing::info!( 104 + batch_id = %batch_id, 105 + uri = %decision.uri, 106 + decision = %decision.decision, 107 + "processing review decision" 108 + ); 109 + 110 + db.mark_flag_reviewed(&batch_id, &decision.uri, &decision.decision) 111 + .await?; 112 + 113 + if decision.decision == "approved" { 114 + let label = 115 + crate::labels::Label::new(signer.did(), &decision.uri, "copyright-violation") 116 + .negated(); 117 + let label = signer.sign_label(label)?; 118 + let seq = db.store_label(&label).await?; 119 + 120 + db.store_resolution( 121 + &decision.uri, 122 + crate::db::ResolutionReason::FingerprintNoise, 123 + Some("batch review"), 124 + ) 125 + .await?; 126 + 127 + if let Some(tx) = &state.label_tx { 128 + let _ = tx.send((seq, label)); 129 + } 130 + 131 + resolved_count += 1; 132 + } 133 + } 134 + 135 + let pending = db.get_batch_pending_uris(&batch_id).await?; 136 + if pending.is_empty() { 137 + db.update_batch_status(&batch_id, "completed").await?; 138 + } 139 + 140 + Ok(Json(SubmitReviewResponse { 141 + resolved_count, 142 + message: format!( 143 + "processed {} decisions, resolved {} flags", 144 + request.decisions.len(), 145 + resolved_count 146 + ), 147 + })) 148 + } 149 + 150 + /// Render the review page. 
151 + fn render_review_page(batch_id: &str, flags: &[FlaggedTrack], status: &str) -> String { 152 + let pending: Vec<_> = flags.iter().filter(|f| !f.resolved).collect(); 153 + let resolved: Vec<_> = flags.iter().filter(|f| f.resolved).collect(); 154 + 155 + let pending_cards: Vec<String> = pending.iter().map(|f| render_review_card(f)).collect(); 156 + let resolved_cards: Vec<String> = resolved.iter().map(|f| render_review_card(f)).collect(); 157 + 158 + let pending_html = if pending_cards.is_empty() { 159 + "<div class=\"empty\">all flags reviewed!</div>".to_string() 160 + } else { 161 + pending_cards.join("\n") 162 + }; 163 + 164 + let resolved_html = if resolved_cards.is_empty() { 165 + String::new() 166 + } else { 167 + format!( 168 + r#"<details class="resolved-section"> 169 + <summary>{} resolved</summary> 170 + {} 171 + </details>"#, 172 + resolved_cards.len(), 173 + resolved_cards.join("\n") 174 + ) 175 + }; 176 + 177 + let status_badge = if status == "completed" { 178 + r#"<span class="status-badge completed">completed</span>"# 179 + } else { 180 + "" 181 + }; 182 + 183 + format!( 184 + r#"<!DOCTYPE html> 185 + <html lang="en"> 186 + <head> 187 + <meta charset="utf-8"> 188 + <meta name="viewport" content="width=device-width, initial-scale=1"> 189 + <title>review batch - plyr.fm</title> 190 + <style>{}</style> 191 + </head> 192 + <body> 193 + <div class="container"> 194 + <header> 195 + <h1>plyr.fm moderation</h1> 196 + <div class="batch-info">{} pending {}</div> 197 + </header> 198 + 199 + <form id="review-form" class="review-form"> 200 + <div class="flags-list"> 201 + {} 202 + </div> 203 + 204 + {} 205 + 206 + <div class="submit-bar"> 207 + <button type="submit" class="btn-submit" id="submit-btn" disabled> 208 + submit decisions 209 + </button> 210 + </div> 211 + </form> 212 + </div> 213 + 214 + <script> 215 + const form = document.getElementById('review-form'); 216 + const submitBtn = document.getElementById('submit-btn'); 217 + const batchId = '{}'; 218 
+ 219 + const decisions = {{}}; 220 + 221 + function updateSubmitBtn() {{ 222 + const count = Object.keys(decisions).length; 223 + submitBtn.disabled = count === 0; 224 + submitBtn.textContent = count > 0 ? `submit ${{count}} decision${{count > 1 ? 's' : ''}}` : 'submit decisions'; 225 + }} 226 + 227 + function setDecision(uri, decision) {{ 228 + decisions[uri] = decision; 229 + const card = document.querySelector(`[data-uri="${{CSS.escape(uri)}}"]`); 230 + if (card) {{ 231 + card.classList.remove('approved', 'rejected'); 232 + card.classList.add(decision); 233 + }} 234 + updateSubmitBtn(); 235 + }} 236 + 237 + form.addEventListener('submit', async (e) => {{ 238 + e.preventDefault(); 239 + submitBtn.disabled = true; 240 + submitBtn.textContent = 'submitting...'; 241 + 242 + try {{ 243 + const response = await fetch(`/review/${{batchId}}/submit`, {{ 244 + method: 'POST', 245 + headers: {{ 'Content-Type': 'application/json' }}, 246 + body: JSON.stringify({{ 247 + decisions: Object.entries(decisions).map(([uri, decision]) => ({{ uri, decision }})) 248 + }}) 249 + }}); 250 + 251 + if (response.ok) {{ 252 + const result = await response.json(); 253 + alert(result.message); 254 + location.reload(); 255 + }} else {{ 256 + const err = await response.json(); 257 + alert('error: ' + (err.message || 'unknown error')); 258 + submitBtn.disabled = false; 259 + updateSubmitBtn(); 260 + }} 261 + }} catch (err) {{ 262 + alert('network error: ' + err.message); 263 + submitBtn.disabled = false; 264 + updateSubmitBtn(); 265 + }} 266 + }}); 267 + </script> 268 + </body> 269 + </html>"#, 270 + REVIEW_CSS, 271 + pending.len(), 272 + status_badge, 273 + pending_html, 274 + resolved_html, 275 + html_escape(batch_id) 276 + ) 277 + } 278 + 279 + /// Render a single review card. 
280 + fn render_review_card(track: &FlaggedTrack) -> String { 281 + let ctx = track.context.as_ref(); 282 + 283 + let title = ctx 284 + .and_then(|c| c.track_title.as_deref()) 285 + .unwrap_or("unknown track"); 286 + let artist = ctx 287 + .and_then(|c| c.artist_handle.as_deref()) 288 + .unwrap_or("unknown"); 289 + let track_id = ctx.and_then(|c| c.track_id); 290 + 291 + let title_html = if let Some(id) = track_id { 292 + format!( 293 + r#"<a href="https://plyr.fm/track/{}" target="_blank">{}</a>"#, 294 + id, 295 + html_escape(title) 296 + ) 297 + } else { 298 + html_escape(title) 299 + }; 300 + 301 + let matches_html = ctx 302 + .and_then(|c| c.matches.as_ref()) 303 + .filter(|m| !m.is_empty()) 304 + .map(|matches| { 305 + let items: Vec<String> = matches 306 + .iter() 307 + .take(2) 308 + .map(|m| { 309 + format!( 310 + r#"<span class="match">{} - {}</span>"#, 311 + html_escape(&m.title), 312 + html_escape(&m.artist) 313 + ) 314 + }) 315 + .collect(); 316 + format!(r#"<div class="matches">{}</div>"#, items.join("")) 317 + }) 318 + .unwrap_or_default(); 319 + 320 + let resolved_badge = if track.resolved { 321 + r#"<span class="badge resolved">resolved</span>"# 322 + } else { 323 + "" 324 + }; 325 + 326 + let action_buttons = if !track.resolved { 327 + format!( 328 + r#"<div class="actions"> 329 + <button type="button" class="btn-approve" onclick="setDecision('{}', 'approved')">approve</button> 330 + <button type="button" class="btn-reject" onclick="setDecision('{}', 'rejected')">reject</button> 331 + </div>"#, 332 + html_escape(&track.uri), 333 + html_escape(&track.uri) 334 + ) 335 + } else { 336 + String::new() 337 + }; 338 + 339 + format!( 340 + r#"<div class="review-card{}" data-uri="{}"> 341 + <div class="track-info"> 342 + <div class="title">{}</div> 343 + <div class="artist">@{}</div> 344 + {} 345 + </div> 346 + {} 347 + {} 348 + </div>"#, 349 + if track.resolved { " resolved" } else { "" }, 350 + html_escape(&track.uri), 351 + title_html, 352 + 
html_escape(artist), 353 + matches_html, 354 + resolved_badge, 355 + action_buttons 356 + ) 357 + } 358 + 359 + fn html_escape(s: &str) -> String { 360 + s.replace('&', "&amp;") 361 + .replace('<', "&lt;") 362 + .replace('>', "&gt;") 363 + .replace('"', "&quot;") 364 + .replace('\'', "&#039;") 365 + } 366 + 367 + const REVIEW_CSS: &str = r#" 368 + * { box-sizing: border-box; margin: 0; padding: 0; } 369 + 370 + body { 371 + font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif; 372 + background: #0a0a0a; 373 + color: #e0e0e0; 374 + min-height: 100vh; 375 + } 376 + 377 + .container { 378 + max-width: 600px; 379 + margin: 0 auto; 380 + padding: 16px; 381 + padding-bottom: 80px; 382 + } 383 + 384 + header { 385 + display: flex; 386 + justify-content: space-between; 387 + align-items: center; 388 + margin-bottom: 20px; 389 + padding-bottom: 12px; 390 + border-bottom: 1px solid #333; 391 + } 392 + 393 + h1 { font-size: 1.25rem; font-weight: 600; color: #fff; } 394 + .batch-info { font-size: 0.875rem; color: #888; } 395 + .status-badge { font-size: 0.7rem; background: #1a3a1a; color: #6d9; padding: 2px 6px; border-radius: 4px; margin-left: 8px; } 396 + .flags-list { display: flex; flex-direction: column; gap: 12px; } 397 + 398 + .review-card { 399 + background: #1a1a1a; 400 + border: 1px solid #333; 401 + border-radius: 8px; 402 + padding: 12px; 403 + transition: border-color 0.2s, background 0.2s; 404 + } 405 + 406 + .review-card.approved { border-color: #2d5a27; background: #1a2a18; } 407 + .review-card.rejected { border-color: #5a2727; background: #2a1818; } 408 + .review-card.resolved { opacity: 0.6; } 409 + .track-info { margin-bottom: 8px; } 410 + .title { font-weight: 600; font-size: 1rem; margin-bottom: 2px; } 411 + .title a { color: inherit; text-decoration: none; } 412 + .title a:hover { text-decoration: underline; } 413 + .artist { font-size: 0.875rem; color: #888; } 414 + .matches { margin-top: 6px; display: flex; flex-wrap: wrap; 
gap: 4px; } 415 + .match { font-size: 0.75rem; background: #2a2a2a; padding: 2px 6px; border-radius: 4px; color: #aaa; } 416 + .badge { display: inline-block; font-size: 0.7rem; padding: 2px 6px; border-radius: 4px; text-transform: uppercase; font-weight: 500; } 417 + .badge.resolved { background: #1a3a1a; color: #6d9; } 418 + .actions { display: flex; gap: 8px; margin-top: 10px; } 419 + .actions button { flex: 1; padding: 10px; border: none; border-radius: 6px; font-size: 0.875rem; font-weight: 500; cursor: pointer; transition: opacity 0.2s; } 420 + .btn-approve { background: #2d5a27; color: #fff; } 421 + .btn-reject { background: #5a2727; color: #fff; } 422 + .actions button:active { opacity: 0.8; } 423 + 424 + .submit-bar { 425 + position: fixed; 426 + bottom: 0; 427 + left: 0; 428 + right: 0; 429 + padding: 12px 16px; 430 + background: #111; 431 + border-top: 1px solid #333; 432 + } 433 + 434 + .btn-submit { 435 + width: 100%; 436 + max-width: 600px; 437 + margin: 0 auto; 438 + display: block; 439 + padding: 14px; 440 + background: #4a9eff; 441 + color: #fff; 442 + border: none; 443 + border-radius: 8px; 444 + font-size: 1rem; 445 + font-weight: 600; 446 + cursor: pointer; 447 + } 448 + 449 + .btn-submit:disabled { background: #333; color: #666; cursor: not-allowed; } 450 + .empty { text-align: center; padding: 40px 20px; color: #666; } 451 + .resolved-section { margin-top: 20px; border-top: 1px solid #333; padding-top: 16px; } 452 + .resolved-section summary { cursor: pointer; color: #888; font-size: 0.875rem; margin-bottom: 12px; } 453 + "#;
+4
moderation/src/state.rs
··· 35 35 #[error("bad request: {0}")] 36 36 BadRequest(String), 37 37 38 + #[error("not found: {0}")] 39 + NotFound(String), 40 + 38 41 #[error("label error: {0}")] 39 42 Label(#[from] LabelError), 40 43 ··· 54 57 (StatusCode::SERVICE_UNAVAILABLE, "LabelerNotConfigured") 55 58 } 56 59 AppError::BadRequest(_) => (StatusCode::BAD_REQUEST, "BadRequest"), 60 + AppError::NotFound(_) => (StatusCode::NOT_FOUND, "NotFound"), 57 61 AppError::Label(_) => (StatusCode::INTERNAL_SERVER_ERROR, "LabelError"), 58 62 AppError::Database(_) => (StatusCode::INTERNAL_SERVER_ERROR, "DatabaseError"), 59 63 AppError::Io(_) => (StatusCode::INTERNAL_SERVER_ERROR, "IoError"),
+280
scripts/moderation_loop.py
#!/usr/bin/env -S uv run --script --quiet
# /// script
# requires-python = ">=3.12"
# dependencies = [
#     "pydantic-ai>=0.1.0",
#     "anthropic",
#     "httpx",
#     "pydantic>=2.0",
#     "pydantic-settings",
#     "atproto>=0.0.55",
#     "rich",
# ]
# ///
"""autonomous moderation loop for plyr.fm.

workflow:
1. fetch pending flags from moderation service
2. analyze each flag with LLM (FALSE_POSITIVE, VIOLATION, NEEDS_HUMAN)
3. auto-resolve false positives
4. create review batch for needs_human flags
5. send DM with link to review UI

the review UI handles human decisions - DM is just a notification channel.
"""

import argparse
import asyncio
from dataclasses import dataclass, field
from pathlib import Path

import httpx
from atproto import AsyncClient, models
from pydantic import BaseModel, Field
from pydantic_ai import Agent
from pydantic_ai.models.anthropic import AnthropicModel
from pydantic_settings import BaseSettings, SettingsConfigDict
from rich.console import Console

console = Console()

# settings that must be non-empty before the loop does any work
_REQUIRED_SETTINGS = (
    "moderation_auth_token",
    "anthropic_api_key",
    "bot_handle",
    "bot_password",
    "recipient_handle",
)


class LoopSettings(BaseSettings):
    """configuration for the loop, read from the environment or a .env file.

    the .env file is looked up one directory above the one containing this script.
    every field has a default, so a missing value shows up as an empty string
    rather than a validation error; run_loop checks the required ones itself.
    """

    model_config = SettingsConfigDict(
        env_file=Path(__file__).parent.parent / ".env",
        case_sensitive=False,
        extra="ignore",
    )
    moderation_service_url: str = Field(
        default="https://moderation.plyr.fm", validation_alias="MODERATION_SERVICE_URL"
    )
    moderation_auth_token: str = Field(
        default="", validation_alias="MODERATION_AUTH_TOKEN"
    )
    anthropic_api_key: str = Field(default="", validation_alias="ANTHROPIC_API_KEY")
    anthropic_model: str = Field(
        default="claude-sonnet-4-20250514", validation_alias="ANTHROPIC_MODEL"
    )
    bot_handle: str = Field(default="", validation_alias="NOTIFY_BOT_HANDLE")
    bot_password: str = Field(default="", validation_alias="NOTIFY_BOT_PASSWORD")
    recipient_handle: str = Field(
        default="", validation_alias="NOTIFY_RECIPIENT_HANDLE"
    )


class FlagAnalysis(BaseModel):
    """result of analyzing a single flag."""

    uri: str
    category: str = Field(description="FALSE_POSITIVE, VIOLATION, or NEEDS_HUMAN")
    reason: str


@dataclass
class DMClient:
    """sends and reads direct messages in a single conversation with one recipient.

    setup() must be awaited before get_messages() or send(); it is what
    populates the underscore-prefixed fields below.
    """

    handle: str
    password: str
    recipient_handle: str
    # logged-in client, created in setup()
    _client: AsyncClient = field(init=False, repr=False)
    # the same session routed through the chat proxy, created in setup()
    _dm_client: AsyncClient = field(init=False, repr=False)
    # DID taken from the recipient's profile
    _recipient_did: str = field(init=False, repr=False)
    # id of the conversation with the recipient
    _convo_id: str = field(init=False, repr=False)

    async def setup(self) -> None:
        """log in, look up the recipient's DID and fetch the conversation id."""
        self._client = AsyncClient()
        await self._client.login(self.handle, self.password)
        self._dm_client = self._client.with_bsky_chat_proxy()
        profile = await self._client.app.bsky.actor.get_profile(
            {"actor": self.recipient_handle}
        )
        self._recipient_did = profile.did
        convo = await self._dm_client.chat.bsky.convo.get_convo_for_members(
            models.ChatBskyConvoGetConvoForMembers.Params(members=[self._recipient_did])
        )
        self._convo_id = convo.convo.id

    async def get_messages(self, limit: int = 30) -> list[dict]:
        """return up to `limit` messages from the conversation as plain dicts.

        each dict has "text", "is_bot" (true when the sender is not the
        recipient) and "sent_at" (None when the message has no such attribute).
        entries without both `text` and `sender` attributes are skipped.
        """
        response = await self._dm_client.chat.bsky.convo.get_messages(
            models.ChatBskyConvoGetMessages.Params(convo_id=self._convo_id, limit=limit)
        )
        return [
            {
                "text": m.text,
                "is_bot": m.sender.did != self._recipient_did,
                "sent_at": getattr(m, "sent_at", None),
            }
            for m in response.messages
            if hasattr(m, "text") and hasattr(m, "sender")
        ]

    async def send(self, text: str) -> None:
        """send `text` as a message in the conversation."""
        await self._dm_client.chat.bsky.convo.send_message(
            models.ChatBskyConvoSendMessage.Data(
                convo_id=self._convo_id,
                message=models.ChatBskyConvoDefs.MessageInput(text=text),
            )
        )


@dataclass
class ModClient:
    """thin http client for the moderation service admin endpoints.

    every request carries the auth token in the X-Moderation-Key header.
    call close() when done to release the underlying httpx client.
    """

    base_url: str
    auth_token: str
    _client: httpx.AsyncClient = field(init=False, repr=False)

    def __post_init__(self) -> None:
        self._client = httpx.AsyncClient(
            base_url=self.base_url,
            headers={"X-Moderation-Key": self.auth_token},
            timeout=30.0,
        )

    async def close(self) -> None:
        """close the underlying http client."""
        await self._client.aclose()

    async def list_pending(self) -> list[dict]:
        """fetch pending flags; returns the "tracks" list of the response, or [].

        raises httpx.HTTPStatusError on a non-success status.
        """
        r = await self._client.get("/admin/flags", params={"filter": "pending"})
        r.raise_for_status()
        return r.json().get("tracks", [])

    async def resolve(self, uri: str, reason: str, notes: str = "") -> None:
        """post a resolution for `uri` to /admin/resolve.

        raises httpx.HTTPStatusError on a non-success status.
        """
        r = await self._client.post(
            "/admin/resolve",
            json={
                "uri": uri,
                "val": "copyright-violation",
                "reason": reason,
                "notes": notes,
            },
        )
        r.raise_for_status()

    async def create_batch(
        self, uris: list[str], created_by: str | None = None
    ) -> dict:
        """create a review batch and return {id, url, flag_count}."""
        r = await self._client.post(
            "/admin/batches",
            json={"uris": uris, "created_by": created_by},
        )
        r.raise_for_status()
        return r.json()


def get_header(env: str) -> str:
    """return the DM prefix for `env`, e.g. "[PLYR-MOD:PROD]"."""
    return f"[PLYR-MOD:{env.upper()}]"


def create_flag_analyzer(api_key: str, model: str) -> Agent[None, list[FlagAnalysis]]:
    """build the agent that classifies flags into a list of FlagAnalysis."""
    from pydantic_ai.providers.anthropic import AnthropicProvider

    return Agent(
        model=AnthropicModel(model, provider=AnthropicProvider(api_key=api_key)),
        output_type=list[FlagAnalysis],
        system_prompt="""\
analyze each copyright flag. categorize as:
- FALSE_POSITIVE: fingerprint noise, uploader is the artist, unrelated matches
- VIOLATION: clearly copyrighted commercial content
- NEEDS_HUMAN: ambiguous, need human review

return a FlagAnalysis for each flag with uri, category, and brief reason.
""",
    )


def _describe_flag(flag: dict) -> str:
    """render one pending flag as the text block shown to the analyzer.

    a missing or null "context" / "matches" is treated as empty, and a match
    without an artist is shown as "?", so one sparse flag cannot abort the run.
    assumes each match is a dict - TODO confirm against the service response.
    """
    context = flag.get("context") or {}
    matches = context.get("matches") or []
    artists = ", ".join(str(m.get("artist") or "?") for m in matches[:3])
    return (
        f"URI: {flag['uri']}\nTrack: {context.get('track_title', '?')}\n"
        f"Uploader: @{context.get('artist_handle', '?')}\n"
        f"Matches: {artists}"
    )


def _known_analyses(
    analyses: list[FlagAnalysis], pending_uris: set[str]
) -> list[FlagAnalysis]:
    """keep the first analysis for each uri that was actually sent for analysis.

    the uris come back from the model, so anything not in `pending_uris`, or
    repeated, is discarded rather than acted on.
    """
    seen: set[str] = set()
    kept: list[FlagAnalysis] = []
    for analysis in analyses:
        if analysis.uri in pending_uris and analysis.uri not in seen:
            seen.add(analysis.uri)
            kept.append(analysis)
    return kept


def _with_category(analyses: list[FlagAnalysis], category: str) -> list[FlagAnalysis]:
    """select analyses whose category matches, ignoring case and outer whitespace."""
    return [a for a in analyses if a.category.strip().upper() == category]


async def run_loop(
    dry_run: bool = False, limit: int | None = None, env: str = "prod"
) -> None:
    """run one pass of the moderation loop.

    dry_run: analyze only; nothing is resolved, no batch is created, no DM is sent.
    limit: maximum number of pending flags to analyze; None means all of them.
    env: label used in the DM header.

    returns early, after printing the reason, when a required setting is
    missing or there is nothing to analyze.
    """
    settings = LoopSettings()
    # report every missing setting at once instead of stopping at the first
    missing = [attr for attr in _REQUIRED_SETTINGS if not getattr(settings, attr)]
    if missing:
        for attr in missing:
            console.print(f"[red]missing {attr}[/red]")
        return

    console.print(f"[bold]moderation loop[/bold] ({settings.anthropic_model})")
    if dry_run:
        console.print("[yellow]DRY RUN[/yellow]")

    dm = DMClient(settings.bot_handle, settings.bot_password, settings.recipient_handle)
    mod = ModClient(settings.moderation_service_url, settings.moderation_auth_token)

    try:
        # log in first so bad bot credentials fail before anything is resolved
        await dm.setup()

        # get pending flags
        pending = await mod.list_pending()
        if not pending:
            console.print("[green]no pending flags[/green]")
            return

        console.print(f"[bold]{len(pending)} pending flags[/bold]")

        # `is not None` so that --limit 0 means "none" instead of "all";
        # negative values are clamped rather than slicing from the end
        if limit is not None:
            pending = pending[: max(limit, 0)]
            if not pending:
                console.print("[yellow]limit leaves no flags to analyze[/yellow]")
                return

        # analyze flags
        analyzer = create_flag_analyzer(
            settings.anthropic_api_key, settings.anthropic_model
        )
        desc = "\n---\n".join(_describe_flag(f) for f in pending)
        result = await analyzer.run(f"analyze {len(pending)} flags:\n\n{desc}")

        # never act on a uri the model made up or repeated
        pending_uris = {f["uri"] for f in pending}
        analyses = _known_analyses(result.output, pending_uris)
        ignored = len(result.output) - len(analyses)
        if ignored:
            console.print(
                f"[yellow]ignored {ignored} analyses with unknown or duplicate uris[/yellow]"
            )
        unanalyzed = len(pending_uris) - len(analyses)
        if unanalyzed > 0:
            console.print(
                f"[yellow]{unanalyzed} flags were not analyzed and stay pending[/yellow]"
            )

        # auto-resolve false positives
        auto = _with_category(analyses, "FALSE_POSITIVE")
        human = _with_category(analyses, "NEEDS_HUMAN")
        violations = _with_category(analyses, "VIOLATION")
        console.print(f"analysis: {len(auto)} auto-resolve, {len(human)} need human")
        if violations:
            # no automatic action is taken on violations; make that visible
            console.print(
                f"[yellow]{len(violations)} flagged as violation, left pending[/yellow]"
            )

        if not dry_run:
            for a in auto:
                # best effort: one failed resolve must not stop the others
                try:
                    await mod.resolve(
                        a.uri, "fingerprint_noise", f"auto: {a.reason[:50]}"
                    )
                    console.print(f"  [green]✓[/green] {a.uri[-40:]}")
                except Exception as e:
                    console.print(f"  [red]✗[/red] {e}")

        # create batch and send link for needs_human (if any)
        if human:
            human_uris = [h.uri for h in human]
            console.print(f"[dim]creating batch for {len(human_uris)} flags...[/dim]")

            if not dry_run:
                batch = await mod.create_batch(human_uris, created_by="moderation_loop")
                msg = f"{get_header(env)} {batch['flag_count']} need review:\n{batch['url']}"
                await dm.send(msg)
                console.print(f"[green]sent batch {batch['id']}[/green]")
            else:
                console.print(
                    f"[yellow]would create batch with {len(human_uris)} flags[/yellow]"
                )

        console.print("[bold]done[/bold]")

    finally:
        await mod.close()


def main() -> None:
    """parse command-line arguments and run the loop once."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--limit", type=int, default=None)
    parser.add_argument("--env", default="prod", choices=["dev", "staging", "prod"])
    args = parser.parse_args()
    asyncio.run(run_loop(dry_run=args.dry_run, limit=args.limit, env=args.env))


if __name__ == "__main__":
    main()