//! Database operations for the labeler.

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use sqlx::{postgres::PgPoolOptions, FromRow, PgPool};

use crate::admin::FlaggedTrack;
use crate::labels::Label;

/// Sensitive image record from the database.
///
/// Field names line up with the columns selected from `sensitive_images`.
#[derive(Debug, Clone, FromRow, Serialize, Deserialize)]
pub struct SensitiveImageRow {
    /// Row identifier (returned by `add_sensitive_image`, accepted by
    /// `remove_sensitive_image`).
    pub id: i64,
    /// R2 storage ID (for track/album artwork)
    pub image_id: Option<String>,
    /// Full URL (for external images like avatars)
    pub url: Option<String>,
    /// Why this image was flagged
    pub reason: Option<String>,
    /// When the image was flagged
    pub flagged_at: DateTime<Utc>,
    /// Admin who flagged it
    pub flagged_by: Option<String>,
}

/// Review batch for mobile-friendly flag review.
///
/// Field names line up with the columns of the `review_batches` table.
#[derive(Debug, Clone, FromRow, Serialize, Deserialize)]
pub struct ReviewBatch {
    /// Caller-supplied batch identifier (the table's primary key).
    pub id: String,
    /// When the batch row was inserted.
    pub created_at: DateTime<Utc>,
    /// Optional expiry timestamp; nothing in this file sets or checks it.
    pub expires_at: Option<DateTime<Utc>>,
    /// Status: pending, completed.
    pub status: String,
    /// Who created this batch.
    pub created_by: Option<String>,
}

/// A flag within a review batch.
///
/// Field names line up with the columns of the `batch_flags` table.
#[derive(Debug, Clone, FromRow, Serialize, Deserialize)]
pub struct BatchFlag {
    /// Row identifier.
    pub id: i64,
    /// Identifier of the owning review batch.
    pub batch_id: String,
    /// URI of the flagged record.
    pub uri: String,
    /// Whether the flag has been reviewed.
    pub reviewed: bool,
    /// When the flag was marked as reviewed, if it has been.
    pub reviewed_at: Option<DateTime<Utc>>,
    /// Decision: approved, rejected, or null.
    pub decision: Option<String>,
}

/// Type alias for context row from database query.
///
/// The element order must match the `SELECT` column order used by
/// `get_context`, because sqlx decodes tuples positionally.
type ContextRow = (
    Option<i64>,               // track_id
    Option<String>,            // track_title
    Option<String>,            // artist_handle
    Option<String>,            // artist_did
    Option<f64>,               // highest_score
    Option<serde_json::Value>, // matches
    Option<String>,            // resolution_reason
    Option<String>,            // resolution_notes
);

/// Type alias for flagged track row from database query.
63type FlaggedRow = ( 64 i64, // seq 65 String, // uri 66 String, // val 67 DateTime<Utc>, // cts 68 Option<i64>, // track_id 69 Option<String>, // track_title 70 Option<String>, // artist_handle 71 Option<String>, // artist_did 72 Option<f64>, // highest_score 73 Option<serde_json::Value>, // matches 74 Option<String>, // resolution_reason 75 Option<String>, // resolution_notes 76); 77 78/// Copyright match info stored alongside labels. 79#[derive(Debug, Clone, Serialize, Deserialize)] 80pub struct CopyrightMatch { 81 pub title: String, 82 pub artist: String, 83 pub score: f64, 84} 85 86/// Reason for resolving a false positive. 87#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] 88#[serde(rename_all = "snake_case")] 89pub enum ResolutionReason { 90 /// Artist uploaded their own distributed music 91 OriginalArtist, 92 /// Artist has licensing/permission for the content 93 Licensed, 94 /// Fingerprint matcher produced a false match 95 FingerprintNoise, 96 /// Legal cover version or remix 97 CoverVersion, 98 /// Content was deleted from plyr.fm 99 ContentDeleted, 100 /// Other reason (see resolution_notes) 101 Other, 102} 103 104impl ResolutionReason { 105 /// Human-readable label for the reason. 106 pub fn label(&self) -> &'static str { 107 match self { 108 Self::OriginalArtist => "original artist", 109 Self::Licensed => "licensed", 110 Self::FingerprintNoise => "fingerprint noise", 111 Self::CoverVersion => "cover/remix", 112 Self::ContentDeleted => "content deleted", 113 Self::Other => "other", 114 } 115 } 116 117 /// Parse from string. 
118 pub fn from_str(s: &str) -> Option<Self> { 119 match s { 120 "original_artist" => Some(Self::OriginalArtist), 121 "licensed" => Some(Self::Licensed), 122 "fingerprint_noise" => Some(Self::FingerprintNoise), 123 "cover_version" => Some(Self::CoverVersion), 124 "content_deleted" => Some(Self::ContentDeleted), 125 "other" => Some(Self::Other), 126 _ => None, 127 } 128 } 129} 130 131/// Context stored alongside a label for display in admin UI. 132#[derive(Debug, Clone, Serialize, Deserialize, Default)] 133pub struct LabelContext { 134 pub track_id: Option<i64>, 135 pub track_title: Option<String>, 136 pub artist_handle: Option<String>, 137 pub artist_did: Option<String>, 138 pub highest_score: Option<f64>, 139 pub matches: Option<Vec<CopyrightMatch>>, 140 /// Why the flag was resolved as false positive (set on resolution). 141 #[serde(skip_serializing_if = "Option::is_none")] 142 pub resolution_reason: Option<ResolutionReason>, 143 /// Additional notes about the resolution. 144 #[serde(skip_serializing_if = "Option::is_none")] 145 pub resolution_notes: Option<String>, 146} 147 148/// Database connection pool and operations. 149#[derive(Clone)] 150pub struct LabelDb { 151 pool: PgPool, 152} 153 154/// Stored label row from the database. 155#[derive(Debug, Clone, sqlx::FromRow)] 156pub struct LabelRow { 157 pub seq: i64, 158 pub src: String, 159 pub uri: String, 160 pub cid: Option<String>, 161 pub val: String, 162 pub neg: bool, 163 pub cts: DateTime<Utc>, 164 pub exp: Option<DateTime<Utc>>, 165 pub sig: Vec<u8>, 166} 167 168impl LabelDb { 169 /// Connect to the database. 170 pub async fn connect(database_url: &str) -> Result<Self, sqlx::Error> { 171 let pool = PgPoolOptions::new() 172 .max_connections(5) 173 .connect(database_url) 174 .await?; 175 Ok(Self { pool }) 176 } 177 178 /// Run database migrations. 
179 pub async fn migrate(&self) -> Result<(), sqlx::Error> { 180 sqlx::query( 181 r#" 182 CREATE TABLE IF NOT EXISTS labels ( 183 id BIGSERIAL PRIMARY KEY, 184 seq BIGSERIAL UNIQUE NOT NULL, 185 src TEXT NOT NULL, 186 uri TEXT NOT NULL, 187 cid TEXT, 188 val TEXT NOT NULL, 189 neg BOOLEAN NOT NULL DEFAULT FALSE, 190 cts TIMESTAMPTZ NOT NULL, 191 exp TIMESTAMPTZ, 192 sig BYTEA NOT NULL, 193 created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() 194 ) 195 "#, 196 ) 197 .execute(&self.pool) 198 .await?; 199 200 sqlx::query("CREATE INDEX IF NOT EXISTS idx_labels_uri ON labels(uri)") 201 .execute(&self.pool) 202 .await?; 203 sqlx::query("CREATE INDEX IF NOT EXISTS idx_labels_src ON labels(src)") 204 .execute(&self.pool) 205 .await?; 206 sqlx::query("CREATE INDEX IF NOT EXISTS idx_labels_seq ON labels(seq)") 207 .execute(&self.pool) 208 .await?; 209 sqlx::query("CREATE INDEX IF NOT EXISTS idx_labels_val ON labels(val)") 210 .execute(&self.pool) 211 .await?; 212 213 // Label context table for admin UI display 214 sqlx::query( 215 r#" 216 CREATE TABLE IF NOT EXISTS label_context ( 217 id BIGSERIAL PRIMARY KEY, 218 uri TEXT NOT NULL UNIQUE, 219 track_title TEXT, 220 artist_handle TEXT, 221 artist_did TEXT, 222 highest_score DOUBLE PRECISION, 223 matches JSONB, 224 created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() 225 ) 226 "#, 227 ) 228 .execute(&self.pool) 229 .await?; 230 231 sqlx::query("CREATE INDEX IF NOT EXISTS idx_label_context_uri ON label_context(uri)") 232 .execute(&self.pool) 233 .await?; 234 235 // Add resolution columns (migration-safe: only adds if missing) 236 sqlx::query("ALTER TABLE label_context ADD COLUMN IF NOT EXISTS resolution_reason TEXT") 237 .execute(&self.pool) 238 .await?; 239 sqlx::query("ALTER TABLE label_context ADD COLUMN IF NOT EXISTS resolution_notes TEXT") 240 .execute(&self.pool) 241 .await?; 242 243 // Sensitive images table for content moderation 244 sqlx::query( 245 r#" 246 CREATE TABLE IF NOT EXISTS sensitive_images ( 247 id BIGSERIAL PRIMARY KEY, 
248 image_id TEXT, 249 url TEXT, 250 reason TEXT, 251 flagged_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), 252 flagged_by TEXT 253 ) 254 "#, 255 ) 256 .execute(&self.pool) 257 .await?; 258 259 sqlx::query("CREATE INDEX IF NOT EXISTS idx_sensitive_images_image_id ON sensitive_images(image_id)") 260 .execute(&self.pool) 261 .await?; 262 sqlx::query("CREATE INDEX IF NOT EXISTS idx_sensitive_images_url ON sensitive_images(url)") 263 .execute(&self.pool) 264 .await?; 265 266 // Review batches for mobile-friendly flag review 267 sqlx::query( 268 r#" 269 CREATE TABLE IF NOT EXISTS review_batches ( 270 id TEXT PRIMARY KEY, 271 created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), 272 expires_at TIMESTAMPTZ, 273 status TEXT NOT NULL DEFAULT 'pending', 274 created_by TEXT 275 ) 276 "#, 277 ) 278 .execute(&self.pool) 279 .await?; 280 281 // Flags within review batches 282 sqlx::query( 283 r#" 284 CREATE TABLE IF NOT EXISTS batch_flags ( 285 id BIGSERIAL PRIMARY KEY, 286 batch_id TEXT NOT NULL REFERENCES review_batches(id) ON DELETE CASCADE, 287 uri TEXT NOT NULL, 288 reviewed BOOLEAN NOT NULL DEFAULT FALSE, 289 reviewed_at TIMESTAMPTZ, 290 decision TEXT, 291 UNIQUE(batch_id, uri) 292 ) 293 "#, 294 ) 295 .execute(&self.pool) 296 .await?; 297 298 sqlx::query("CREATE INDEX IF NOT EXISTS idx_batch_flags_batch_id ON batch_flags(batch_id)") 299 .execute(&self.pool) 300 .await?; 301 302 Ok(()) 303 } 304 305 /// Store or update label context for a URI. 
306 pub async fn store_context( 307 &self, 308 uri: &str, 309 context: &LabelContext, 310 ) -> Result<(), sqlx::Error> { 311 let matches_json = context 312 .matches 313 .as_ref() 314 .map(|m| serde_json::to_value(m).unwrap_or_default()); 315 let reason_str = context 316 .resolution_reason 317 .map(|r| format!("{:?}", r).to_lowercase()); 318 319 sqlx::query( 320 r#" 321 INSERT INTO label_context (uri, track_id, track_title, artist_handle, artist_did, highest_score, matches, resolution_reason, resolution_notes) 322 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) 323 ON CONFLICT (uri) DO UPDATE SET 324 track_id = COALESCE(EXCLUDED.track_id, label_context.track_id), 325 track_title = COALESCE(EXCLUDED.track_title, label_context.track_title), 326 artist_handle = COALESCE(EXCLUDED.artist_handle, label_context.artist_handle), 327 artist_did = COALESCE(EXCLUDED.artist_did, label_context.artist_did), 328 highest_score = COALESCE(EXCLUDED.highest_score, label_context.highest_score), 329 matches = COALESCE(EXCLUDED.matches, label_context.matches), 330 resolution_reason = COALESCE(EXCLUDED.resolution_reason, label_context.resolution_reason), 331 resolution_notes = COALESCE(EXCLUDED.resolution_notes, label_context.resolution_notes) 332 "#, 333 ) 334 .bind(uri) 335 .bind(context.track_id) 336 .bind(&context.track_title) 337 .bind(&context.artist_handle) 338 .bind(&context.artist_did) 339 .bind(context.highest_score) 340 .bind(matches_json) 341 .bind(reason_str) 342 .bind(&context.resolution_notes) 343 .execute(&self.pool) 344 .await?; 345 346 Ok(()) 347 } 348 349 /// Store resolution reason for a URI (without overwriting other context). 
350 pub async fn store_resolution( 351 &self, 352 uri: &str, 353 reason: ResolutionReason, 354 notes: Option<&str>, 355 ) -> Result<(), sqlx::Error> { 356 let reason_str = format!("{:?}", reason).to_lowercase(); 357 sqlx::query( 358 r#" 359 INSERT INTO label_context (uri, resolution_reason, resolution_notes) 360 VALUES ($1, $2, $3) 361 ON CONFLICT (uri) DO UPDATE SET 362 resolution_reason = EXCLUDED.resolution_reason, 363 resolution_notes = EXCLUDED.resolution_notes 364 "#, 365 ) 366 .bind(uri) 367 .bind(reason_str) 368 .bind(notes) 369 .execute(&self.pool) 370 .await?; 371 372 Ok(()) 373 } 374 375 /// Get label context for a URI. 376 pub async fn get_context(&self, uri: &str) -> Result<Option<LabelContext>, sqlx::Error> { 377 let row: Option<ContextRow> = sqlx::query_as( 378 r#" 379 SELECT track_id, track_title, artist_handle, artist_did, highest_score, matches, resolution_reason, resolution_notes 380 FROM label_context 381 WHERE uri = $1 382 "#, 383 ) 384 .bind(uri) 385 .fetch_optional(&self.pool) 386 .await?; 387 388 Ok(row.map( 389 |( 390 track_id, 391 track_title, 392 artist_handle, 393 artist_did, 394 highest_score, 395 matches, 396 resolution_reason, 397 resolution_notes, 398 )| { 399 LabelContext { 400 track_id, 401 track_title, 402 artist_handle, 403 artist_did, 404 highest_score, 405 matches: matches.and_then(|v| serde_json::from_value(v).ok()), 406 resolution_reason: resolution_reason 407 .and_then(|s| ResolutionReason::from_str(&s)), 408 resolution_notes, 409 } 410 }, 411 )) 412 } 413 414 /// Store a signed label and return its sequence number. 
415 pub async fn store_label(&self, label: &Label) -> Result<i64, sqlx::Error> { 416 let sig = label.sig.as_ref().map(|b| b.to_vec()).unwrap_or_default(); 417 let cts: DateTime<Utc> = label.cts.parse().unwrap_or_else(|_| Utc::now()); 418 let exp: Option<DateTime<Utc>> = label.exp.as_ref().and_then(|e| e.parse().ok()); 419 420 let row = sqlx::query_scalar::<_, i64>( 421 r#" 422 INSERT INTO labels (src, uri, cid, val, neg, cts, exp, sig) 423 VALUES ($1, $2, $3, $4, $5, $6, $7, $8) 424 RETURNING seq 425 "#, 426 ) 427 .bind(&label.src) 428 .bind(&label.uri) 429 .bind(&label.cid) 430 .bind(&label.val) 431 .bind(label.neg.unwrap_or(false)) 432 .bind(cts) 433 .bind(exp) 434 .bind(sig) 435 .fetch_one(&self.pool) 436 .await?; 437 438 Ok(row) 439 } 440 441 /// Query labels matching URI patterns. 442 /// 443 /// Patterns can contain `*` as a wildcard (e.g., `at://did:plc:*`). 444 pub async fn query_labels( 445 &self, 446 uri_patterns: &[String], 447 sources: Option<&[String]>, 448 cursor: Option<&str>, 449 limit: i64, 450 ) -> Result<(Vec<LabelRow>, Option<String>), sqlx::Error> { 451 // Build dynamic query 452 let mut conditions = Vec::new(); 453 let mut param_idx = 1; 454 455 // URI pattern matching 456 let uri_conditions: Vec<String> = uri_patterns 457 .iter() 458 .map(|p| { 459 let idx = param_idx; 460 param_idx += 1; 461 if p.contains('*') { 462 format!("uri LIKE ${}", idx) 463 } else { 464 format!("uri = ${}", idx) 465 } 466 }) 467 .collect(); 468 469 if !uri_conditions.is_empty() { 470 conditions.push(format!("({})", uri_conditions.join(" OR "))); 471 } 472 473 // Source filtering 474 if let Some(srcs) = sources { 475 if !srcs.is_empty() { 476 let placeholders: Vec<String> = srcs 477 .iter() 478 .map(|_| { 479 let idx = param_idx; 480 param_idx += 1; 481 format!("${}", idx) 482 }) 483 .collect(); 484 conditions.push(format!("src IN ({})", placeholders.join(", "))); 485 } 486 } 487 488 // Cursor for pagination 489 if cursor.is_some() { 490 conditions.push(format!("seq > 
${}", param_idx)); 491 } 492 493 let where_clause = if conditions.is_empty() { 494 String::new() 495 } else { 496 format!("WHERE {}", conditions.join(" AND ")) 497 }; 498 499 let query = format!( 500 r#" 501 SELECT seq, src, uri, cid, val, neg, cts, exp, sig 502 FROM labels 503 {} 504 ORDER BY seq ASC 505 LIMIT {} 506 "#, 507 where_clause, 508 limit + 1 // Fetch one extra to determine if there's more 509 ); 510 511 // Build query with parameters 512 let mut q = sqlx::query_as::<_, LabelRow>(&query); 513 514 // Bind URI patterns (converting * to %) 515 for pattern in uri_patterns { 516 let sql_pattern = pattern.replace('*', "%"); 517 q = q.bind(sql_pattern); 518 } 519 520 // Bind sources 521 if let Some(srcs) = sources { 522 for src in srcs { 523 q = q.bind(src); 524 } 525 } 526 527 // Bind cursor 528 if let Some(c) = cursor { 529 let cursor_seq: i64 = c.parse().unwrap_or(0); 530 q = q.bind(cursor_seq); 531 } 532 533 let mut rows: Vec<LabelRow> = q.fetch_all(&self.pool).await?; 534 535 // Determine next cursor 536 let next_cursor = if rows.len() > limit as usize { 537 rows.pop(); // Remove the extra row 538 rows.last().map(|r| r.seq.to_string()) 539 } else { 540 None 541 }; 542 543 Ok((rows, next_cursor)) 544 } 545 546 /// Get labels since a sequence number (for subscribeLabels). 547 pub async fn get_labels_since( 548 &self, 549 cursor: i64, 550 limit: i64, 551 ) -> Result<Vec<LabelRow>, sqlx::Error> { 552 sqlx::query_as::<_, LabelRow>( 553 r#" 554 SELECT seq, src, uri, cid, val, neg, cts, exp, sig 555 FROM labels 556 WHERE seq > $1 557 ORDER BY seq ASC 558 LIMIT $2 559 "#, 560 ) 561 .bind(cursor) 562 .bind(limit) 563 .fetch_all(&self.pool) 564 .await 565 } 566 567 /// Get the latest sequence number. 
568 pub async fn get_latest_seq(&self) -> Result<i64, sqlx::Error> { 569 sqlx::query_scalar::<_, Option<i64>>("SELECT MAX(seq) FROM labels") 570 .fetch_one(&self.pool) 571 .await 572 .map(|s| s.unwrap_or(0)) 573 } 574 575 /// Get URIs that have active (non-negated) copyright-violation labels. 576 /// 577 /// For each URI, checks if there's a negation label. Returns only those 578 /// that are still actively flagged. 579 pub async fn get_active_labels(&self, uris: &[String]) -> Result<Vec<String>, sqlx::Error> { 580 if uris.is_empty() { 581 return Ok(Vec::new()); 582 } 583 584 // Get all negated URIs from our input set 585 let negated_uris: std::collections::HashSet<String> = sqlx::query_scalar::<_, String>( 586 r#" 587 SELECT DISTINCT uri 588 FROM labels 589 WHERE val = 'copyright-violation' AND neg = true AND uri = ANY($1) 590 "#, 591 ) 592 .bind(uris) 593 .fetch_all(&self.pool) 594 .await? 595 .into_iter() 596 .collect(); 597 598 // Get URIs that have a positive label and are not negated 599 let active_uris: Vec<String> = sqlx::query_scalar::<_, String>( 600 r#" 601 SELECT DISTINCT uri 602 FROM labels 603 WHERE val = 'copyright-violation' AND neg = false AND uri = ANY($1) 604 "#, 605 ) 606 .bind(uris) 607 .fetch_all(&self.pool) 608 .await? 609 .into_iter() 610 .filter(|uri| !negated_uris.contains(uri)) 611 .collect(); 612 613 Ok(active_uris) 614 } 615 616 /// Get all copyright-violation labels with their resolution status and context. 617 /// 618 /// A label is resolved if there's a negation label for the same uri+val. 
619 pub async fn get_pending_flags(&self) -> Result<Vec<FlaggedTrack>, sqlx::Error> { 620 // Get all copyright-violation labels with context via LEFT JOIN 621 let rows: Vec<FlaggedRow> = sqlx::query_as( 622 r#" 623 SELECT l.seq, l.uri, l.val, l.cts, 624 c.track_id, c.track_title, c.artist_handle, c.artist_did, c.highest_score, c.matches, 625 c.resolution_reason, c.resolution_notes 626 FROM labels l 627 LEFT JOIN label_context c ON l.uri = c.uri 628 WHERE l.val = 'copyright-violation' AND l.neg = false 629 ORDER BY l.seq DESC 630 "#, 631 ) 632 .fetch_all(&self.pool) 633 .await?; 634 635 // Get all negation labels 636 let negated_uris: std::collections::HashSet<String> = sqlx::query_scalar::<_, String>( 637 r#" 638 SELECT DISTINCT uri 639 FROM labels 640 WHERE val = 'copyright-violation' AND neg = true 641 "#, 642 ) 643 .fetch_all(&self.pool) 644 .await? 645 .into_iter() 646 .collect(); 647 648 let tracks = rows 649 .into_iter() 650 .map( 651 |( 652 seq, 653 uri, 654 val, 655 cts, 656 track_id, 657 track_title, 658 artist_handle, 659 artist_did, 660 highest_score, 661 matches, 662 resolution_reason, 663 resolution_notes, 664 )| { 665 let context = if track_id.is_some() 666 || track_title.is_some() 667 || artist_handle.is_some() 668 || resolution_reason.is_some() 669 { 670 Some(LabelContext { 671 track_id, 672 track_title, 673 artist_handle, 674 artist_did, 675 highest_score, 676 matches: matches.and_then(|v| serde_json::from_value(v).ok()), 677 resolution_reason: resolution_reason 678 .and_then(|s| ResolutionReason::from_str(&s)), 679 resolution_notes, 680 }) 681 } else { 682 None 683 }; 684 685 FlaggedTrack { 686 seq, 687 uri: uri.clone(), 688 val, 689 created_at: cts.format("%Y-%m-%d %H:%M:%S").to_string(), 690 resolved: negated_uris.contains(&uri), 691 context, 692 } 693 }, 694 ) 695 .collect(); 696 697 Ok(tracks) 698 } 699 700 // ------------------------------------------------------------------------- 701 // Review batches 702 // 
------------------------------------------------------------------------- 703 704 /// Create a review batch with the given flags. 705 pub async fn create_batch( 706 &self, 707 id: &str, 708 uris: &[String], 709 created_by: Option<&str>, 710 ) -> Result<ReviewBatch, sqlx::Error> { 711 let batch = sqlx::query_as::<_, ReviewBatch>( 712 r#" 713 INSERT INTO review_batches (id, created_by) 714 VALUES ($1, $2) 715 RETURNING id, created_at, expires_at, status, created_by 716 "#, 717 ) 718 .bind(id) 719 .bind(created_by) 720 .fetch_one(&self.pool) 721 .await?; 722 723 for uri in uris { 724 sqlx::query( 725 r#" 726 INSERT INTO batch_flags (batch_id, uri) 727 VALUES ($1, $2) 728 ON CONFLICT (batch_id, uri) DO NOTHING 729 "#, 730 ) 731 .bind(id) 732 .bind(uri) 733 .execute(&self.pool) 734 .await?; 735 } 736 737 Ok(batch) 738 } 739 740 /// Get a batch by ID. 741 pub async fn get_batch(&self, id: &str) -> Result<Option<ReviewBatch>, sqlx::Error> { 742 sqlx::query_as::<_, ReviewBatch>( 743 r#" 744 SELECT id, created_at, expires_at, status, created_by 745 FROM review_batches 746 WHERE id = $1 747 "#, 748 ) 749 .bind(id) 750 .fetch_optional(&self.pool) 751 .await 752 } 753 754 /// Get all flags in a batch with their context. 
755 pub async fn get_batch_flags(&self, batch_id: &str) -> Result<Vec<FlaggedTrack>, sqlx::Error> { 756 let rows: Vec<FlaggedRow> = sqlx::query_as( 757 r#" 758 SELECT l.seq, l.uri, l.val, l.cts, 759 c.track_id, c.track_title, c.artist_handle, c.artist_did, c.highest_score, c.matches, 760 c.resolution_reason, c.resolution_notes 761 FROM batch_flags bf 762 JOIN labels l ON l.uri = bf.uri AND l.val = 'copyright-violation' AND l.neg = false 763 LEFT JOIN label_context c ON l.uri = c.uri 764 WHERE bf.batch_id = $1 765 ORDER BY l.seq DESC 766 "#, 767 ) 768 .bind(batch_id) 769 .fetch_all(&self.pool) 770 .await?; 771 772 let batch_uris: Vec<String> = rows.iter().map(|r| r.1.clone()).collect(); 773 let negated_uris: std::collections::HashSet<String> = if !batch_uris.is_empty() { 774 sqlx::query_scalar::<_, String>( 775 r#" 776 SELECT DISTINCT uri 777 FROM labels 778 WHERE val = 'copyright-violation' AND neg = true AND uri = ANY($1) 779 "#, 780 ) 781 .bind(&batch_uris) 782 .fetch_all(&self.pool) 783 .await? 
784 .into_iter() 785 .collect() 786 } else { 787 std::collections::HashSet::new() 788 }; 789 790 let tracks = rows 791 .into_iter() 792 .map( 793 |( 794 seq, 795 uri, 796 val, 797 cts, 798 track_id, 799 track_title, 800 artist_handle, 801 artist_did, 802 highest_score, 803 matches, 804 resolution_reason, 805 resolution_notes, 806 )| { 807 let context = if track_id.is_some() 808 || track_title.is_some() 809 || artist_handle.is_some() 810 || resolution_reason.is_some() 811 { 812 Some(LabelContext { 813 track_id, 814 track_title, 815 artist_handle, 816 artist_did, 817 highest_score, 818 matches: matches.and_then(|v| serde_json::from_value(v).ok()), 819 resolution_reason: resolution_reason 820 .and_then(|s| ResolutionReason::from_str(&s)), 821 resolution_notes, 822 }) 823 } else { 824 None 825 }; 826 827 FlaggedTrack { 828 seq, 829 uri: uri.clone(), 830 val, 831 created_at: cts.format("%Y-%m-%d %H:%M:%S").to_string(), 832 resolved: negated_uris.contains(&uri), 833 context, 834 } 835 }, 836 ) 837 .collect(); 838 839 Ok(tracks) 840 } 841 842 /// Update batch status. 843 pub async fn update_batch_status(&self, id: &str, status: &str) -> Result<bool, sqlx::Error> { 844 let result = sqlx::query("UPDATE review_batches SET status = $1 WHERE id = $2") 845 .bind(status) 846 .bind(id) 847 .execute(&self.pool) 848 .await?; 849 Ok(result.rows_affected() > 0) 850 } 851 852 /// Mark a flag in a batch as reviewed. 853 pub async fn mark_flag_reviewed( 854 &self, 855 batch_id: &str, 856 uri: &str, 857 decision: &str, 858 ) -> Result<bool, sqlx::Error> { 859 let result = sqlx::query( 860 r#" 861 UPDATE batch_flags 862 SET reviewed = true, reviewed_at = NOW(), decision = $1 863 WHERE batch_id = $2 AND uri = $3 864 "#, 865 ) 866 .bind(decision) 867 .bind(batch_id) 868 .bind(uri) 869 .execute(&self.pool) 870 .await?; 871 Ok(result.rows_affected() > 0) 872 } 873 874 /// Get pending (non-reviewed) flags from a batch. 
875 pub async fn get_batch_pending_uris(&self, batch_id: &str) -> Result<Vec<String>, sqlx::Error> { 876 sqlx::query_scalar::<_, String>( 877 r#" 878 SELECT uri FROM batch_flags 879 WHERE batch_id = $1 AND reviewed = false 880 "#, 881 ) 882 .bind(batch_id) 883 .fetch_all(&self.pool) 884 .await 885 } 886 887 // ------------------------------------------------------------------------- 888 // Sensitive images 889 // ------------------------------------------------------------------------- 890 891 /// Get all sensitive images. 892 pub async fn get_sensitive_images(&self) -> Result<Vec<SensitiveImageRow>, sqlx::Error> { 893 sqlx::query_as::<_, SensitiveImageRow>( 894 "SELECT id, image_id, url, reason, flagged_at, flagged_by FROM sensitive_images ORDER BY flagged_at DESC", 895 ) 896 .fetch_all(&self.pool) 897 .await 898 } 899 900 /// Add a sensitive image entry. 901 pub async fn add_sensitive_image( 902 &self, 903 image_id: Option<&str>, 904 url: Option<&str>, 905 reason: Option<&str>, 906 flagged_by: Option<&str>, 907 ) -> Result<i64, sqlx::Error> { 908 sqlx::query_scalar::<_, i64>( 909 r#" 910 INSERT INTO sensitive_images (image_id, url, reason, flagged_by) 911 VALUES ($1, $2, $3, $4) 912 RETURNING id 913 "#, 914 ) 915 .bind(image_id) 916 .bind(url) 917 .bind(reason) 918 .bind(flagged_by) 919 .fetch_one(&self.pool) 920 .await 921 } 922 923 /// Remove a sensitive image entry by ID. 924 pub async fn remove_sensitive_image(&self, id: i64) -> Result<bool, sqlx::Error> { 925 let result = sqlx::query("DELETE FROM sensitive_images WHERE id = $1") 926 .bind(id) 927 .execute(&self.pool) 928 .await?; 929 Ok(result.rows_affected() > 0) 930 } 931} 932 933impl LabelRow { 934 /// Convert database row to Label struct. 
pub fn to_label(&self) -> Label {
    // Timestamps are rendered with millisecond precision and a literal `Z`
    // suffix; `cts` and `exp` share the same format.
    const TIMESTAMP_FORMAT: &str = "%Y-%m-%dT%H:%M:%S%.3fZ";

    Label {
        ver: Some(1),
        src: self.src.clone(),
        uri: self.uri.clone(),
        cid: self.cid.clone(),
        val: self.val.clone(),
        // `neg` is only emitted for negations; a positive label leaves it unset.
        neg: self.neg.then_some(true),
        cts: self.cts.format(TIMESTAMP_FORMAT).to_string(),
        exp: self.exp.map(|e| e.format(TIMESTAMP_FORMAT).to_string()),
        sig: Some(bytes::Bytes::from(self.sig.clone())),
    }
}
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Every variant, so the tests below cannot silently skip one.
    const ALL_REASONS: [ResolutionReason; 6] = [
        ResolutionReason::OriginalArtist,
        ResolutionReason::Licensed,
        ResolutionReason::FingerprintNoise,
        ResolutionReason::CoverVersion,
        ResolutionReason::ContentDeleted,
        ResolutionReason::Other,
    ];

    #[test]
    fn test_resolution_reason_from_str() {
        assert_eq!(
            ResolutionReason::from_str("original_artist"),
            Some(ResolutionReason::OriginalArtist)
        );
        assert_eq!(
            ResolutionReason::from_str("licensed"),
            Some(ResolutionReason::Licensed)
        );
        assert_eq!(
            ResolutionReason::from_str("fingerprint_noise"),
            Some(ResolutionReason::FingerprintNoise)
        );
        assert_eq!(
            ResolutionReason::from_str("cover_version"),
            Some(ResolutionReason::CoverVersion)
        );
        assert_eq!(
            ResolutionReason::from_str("content_deleted"),
            Some(ResolutionReason::ContentDeleted)
        );
        assert_eq!(
            ResolutionReason::from_str("other"),
            Some(ResolutionReason::Other)
        );
        assert_eq!(ResolutionReason::from_str("invalid"), None);
        assert_eq!(ResolutionReason::from_str(""), None);
    }

    /// The serde name of each variant must be accepted by `from_str`, since
    /// both describe the same snake_case identifiers.
    #[test]
    fn test_resolution_reason_serde_names_parse() {
        for reason in ALL_REASONS.iter().copied() {
            let value = serde_json::to_value(reason).expect("unit variants serialize");
            let name = value.as_str().expect("unit variants serialize to a string");
            assert_eq!(ResolutionReason::from_str(name), Some(reason));
        }
    }

    #[test]
    fn test_resolution_reason_labels() {
        assert_eq!(ResolutionReason::OriginalArtist.label(), "original artist");
        assert_eq!(ResolutionReason::Licensed.label(), "licensed");
        assert_eq!(
            ResolutionReason::FingerprintNoise.label(),
            "fingerprint noise"
        );
        assert_eq!(ResolutionReason::CoverVersion.label(), "cover/remix");
        assert_eq!(ResolutionReason::ContentDeleted.label(), "content deleted");
        assert_eq!(ResolutionReason::Other.label(), "other");
    }

    #[test]
    fn test_label_context_default() {
        let ctx = LabelContext::default();
        assert!(ctx.track_title.is_none());
        assert!(ctx.resolution_reason.is_none());
        assert!(ctx.resolution_notes.is_none());
    }
}