music on atproto
plyr.fm
1//! Database operations for the labeler.
2
3use chrono::{DateTime, Utc};
4use serde::{Deserialize, Serialize};
5use sqlx::{postgres::PgPoolOptions, FromRow, PgPool};
6
7use crate::admin::FlaggedTrack;
8use crate::labels::Label;
9
10/// Sensitive image record from the database.
11#[derive(Debug, Clone, FromRow, Serialize, Deserialize)]
12pub struct SensitiveImageRow {
13 pub id: i64,
14 /// R2 storage ID (for track/album artwork)
15 pub image_id: Option<String>,
16 /// Full URL (for external images like avatars)
17 pub url: Option<String>,
18 /// Why this image was flagged
19 pub reason: Option<String>,
20 /// When the image was flagged
21 pub flagged_at: DateTime<Utc>,
22 /// Admin who flagged it
23 pub flagged_by: Option<String>,
24}
25
26/// Review batch for mobile-friendly flag review.
27#[derive(Debug, Clone, FromRow, Serialize, Deserialize)]
28pub struct ReviewBatch {
29 pub id: String,
30 pub created_at: DateTime<Utc>,
31 pub expires_at: Option<DateTime<Utc>>,
32 /// Status: pending, completed.
33 pub status: String,
34 /// Who created this batch.
35 pub created_by: Option<String>,
36}
37
38/// A flag within a review batch.
39#[derive(Debug, Clone, FromRow, Serialize, Deserialize)]
40pub struct BatchFlag {
41 pub id: i64,
42 pub batch_id: String,
43 pub uri: String,
44 pub reviewed: bool,
45 pub reviewed_at: Option<DateTime<Utc>>,
46 /// Decision: approved, rejected, or null.
47 pub decision: Option<String>,
48}
49
50/// Type alias for context row from database query.
51type ContextRow = (
52 Option<i64>, // track_id
53 Option<String>, // track_title
54 Option<String>, // artist_handle
55 Option<String>, // artist_did
56 Option<f64>, // highest_score
57 Option<serde_json::Value>, // matches
58 Option<String>, // resolution_reason
59 Option<String>, // resolution_notes
60);
61
62/// Type alias for flagged track row from database query.
63type FlaggedRow = (
64 i64, // seq
65 String, // uri
66 String, // val
67 DateTime<Utc>, // cts
68 Option<i64>, // track_id
69 Option<String>, // track_title
70 Option<String>, // artist_handle
71 Option<String>, // artist_did
72 Option<f64>, // highest_score
73 Option<serde_json::Value>, // matches
74 Option<String>, // resolution_reason
75 Option<String>, // resolution_notes
76);
77
78/// Copyright match info stored alongside labels.
79#[derive(Debug, Clone, Serialize, Deserialize)]
80pub struct CopyrightMatch {
81 pub title: String,
82 pub artist: String,
83 pub score: f64,
84}
85
86/// Reason for resolving a false positive.
87#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
88#[serde(rename_all = "snake_case")]
89pub enum ResolutionReason {
90 /// Artist uploaded their own distributed music
91 OriginalArtist,
92 /// Artist has licensing/permission for the content
93 Licensed,
94 /// Fingerprint matcher produced a false match
95 FingerprintNoise,
96 /// Legal cover version or remix
97 CoverVersion,
98 /// Content was deleted from plyr.fm
99 ContentDeleted,
100 /// Other reason (see resolution_notes)
101 Other,
102}
103
104impl ResolutionReason {
105 /// Human-readable label for the reason.
106 pub fn label(&self) -> &'static str {
107 match self {
108 Self::OriginalArtist => "original artist",
109 Self::Licensed => "licensed",
110 Self::FingerprintNoise => "fingerprint noise",
111 Self::CoverVersion => "cover/remix",
112 Self::ContentDeleted => "content deleted",
113 Self::Other => "other",
114 }
115 }
116
117 /// Parse from string.
118 pub fn from_str(s: &str) -> Option<Self> {
119 match s {
120 "original_artist" => Some(Self::OriginalArtist),
121 "licensed" => Some(Self::Licensed),
122 "fingerprint_noise" => Some(Self::FingerprintNoise),
123 "cover_version" => Some(Self::CoverVersion),
124 "content_deleted" => Some(Self::ContentDeleted),
125 "other" => Some(Self::Other),
126 _ => None,
127 }
128 }
129}
130
131/// Context stored alongside a label for display in admin UI.
132#[derive(Debug, Clone, Serialize, Deserialize, Default)]
133pub struct LabelContext {
134 pub track_id: Option<i64>,
135 pub track_title: Option<String>,
136 pub artist_handle: Option<String>,
137 pub artist_did: Option<String>,
138 pub highest_score: Option<f64>,
139 pub matches: Option<Vec<CopyrightMatch>>,
140 /// Why the flag was resolved as false positive (set on resolution).
141 #[serde(skip_serializing_if = "Option::is_none")]
142 pub resolution_reason: Option<ResolutionReason>,
143 /// Additional notes about the resolution.
144 #[serde(skip_serializing_if = "Option::is_none")]
145 pub resolution_notes: Option<String>,
146}
147
148/// Database connection pool and operations.
149#[derive(Clone)]
150pub struct LabelDb {
151 pool: PgPool,
152}
153
154/// Stored label row from the database.
155#[derive(Debug, Clone, sqlx::FromRow)]
156pub struct LabelRow {
157 pub seq: i64,
158 pub src: String,
159 pub uri: String,
160 pub cid: Option<String>,
161 pub val: String,
162 pub neg: bool,
163 pub cts: DateTime<Utc>,
164 pub exp: Option<DateTime<Utc>>,
165 pub sig: Vec<u8>,
166}
167
168impl LabelDb {
169 /// Connect to the database.
170 pub async fn connect(database_url: &str) -> Result<Self, sqlx::Error> {
171 let pool = PgPoolOptions::new()
172 .max_connections(5)
173 .connect(database_url)
174 .await?;
175 Ok(Self { pool })
176 }
177
178 /// Run database migrations.
179 pub async fn migrate(&self) -> Result<(), sqlx::Error> {
180 sqlx::query(
181 r#"
182 CREATE TABLE IF NOT EXISTS labels (
183 id BIGSERIAL PRIMARY KEY,
184 seq BIGSERIAL UNIQUE NOT NULL,
185 src TEXT NOT NULL,
186 uri TEXT NOT NULL,
187 cid TEXT,
188 val TEXT NOT NULL,
189 neg BOOLEAN NOT NULL DEFAULT FALSE,
190 cts TIMESTAMPTZ NOT NULL,
191 exp TIMESTAMPTZ,
192 sig BYTEA NOT NULL,
193 created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
194 )
195 "#,
196 )
197 .execute(&self.pool)
198 .await?;
199
200 sqlx::query("CREATE INDEX IF NOT EXISTS idx_labels_uri ON labels(uri)")
201 .execute(&self.pool)
202 .await?;
203 sqlx::query("CREATE INDEX IF NOT EXISTS idx_labels_src ON labels(src)")
204 .execute(&self.pool)
205 .await?;
206 sqlx::query("CREATE INDEX IF NOT EXISTS idx_labels_seq ON labels(seq)")
207 .execute(&self.pool)
208 .await?;
209 sqlx::query("CREATE INDEX IF NOT EXISTS idx_labels_val ON labels(val)")
210 .execute(&self.pool)
211 .await?;
212
213 // Label context table for admin UI display
214 sqlx::query(
215 r#"
216 CREATE TABLE IF NOT EXISTS label_context (
217 id BIGSERIAL PRIMARY KEY,
218 uri TEXT NOT NULL UNIQUE,
219 track_title TEXT,
220 artist_handle TEXT,
221 artist_did TEXT,
222 highest_score DOUBLE PRECISION,
223 matches JSONB,
224 created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
225 )
226 "#,
227 )
228 .execute(&self.pool)
229 .await?;
230
231 sqlx::query("CREATE INDEX IF NOT EXISTS idx_label_context_uri ON label_context(uri)")
232 .execute(&self.pool)
233 .await?;
234
235 // Add resolution columns (migration-safe: only adds if missing)
236 sqlx::query("ALTER TABLE label_context ADD COLUMN IF NOT EXISTS resolution_reason TEXT")
237 .execute(&self.pool)
238 .await?;
239 sqlx::query("ALTER TABLE label_context ADD COLUMN IF NOT EXISTS resolution_notes TEXT")
240 .execute(&self.pool)
241 .await?;
242
243 // Sensitive images table for content moderation
244 sqlx::query(
245 r#"
246 CREATE TABLE IF NOT EXISTS sensitive_images (
247 id BIGSERIAL PRIMARY KEY,
248 image_id TEXT,
249 url TEXT,
250 reason TEXT,
251 flagged_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
252 flagged_by TEXT
253 )
254 "#,
255 )
256 .execute(&self.pool)
257 .await?;
258
259 sqlx::query("CREATE INDEX IF NOT EXISTS idx_sensitive_images_image_id ON sensitive_images(image_id)")
260 .execute(&self.pool)
261 .await?;
262 sqlx::query("CREATE INDEX IF NOT EXISTS idx_sensitive_images_url ON sensitive_images(url)")
263 .execute(&self.pool)
264 .await?;
265
266 // Review batches for mobile-friendly flag review
267 sqlx::query(
268 r#"
269 CREATE TABLE IF NOT EXISTS review_batches (
270 id TEXT PRIMARY KEY,
271 created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
272 expires_at TIMESTAMPTZ,
273 status TEXT NOT NULL DEFAULT 'pending',
274 created_by TEXT
275 )
276 "#,
277 )
278 .execute(&self.pool)
279 .await?;
280
281 // Flags within review batches
282 sqlx::query(
283 r#"
284 CREATE TABLE IF NOT EXISTS batch_flags (
285 id BIGSERIAL PRIMARY KEY,
286 batch_id TEXT NOT NULL REFERENCES review_batches(id) ON DELETE CASCADE,
287 uri TEXT NOT NULL,
288 reviewed BOOLEAN NOT NULL DEFAULT FALSE,
289 reviewed_at TIMESTAMPTZ,
290 decision TEXT,
291 UNIQUE(batch_id, uri)
292 )
293 "#,
294 )
295 .execute(&self.pool)
296 .await?;
297
298 sqlx::query("CREATE INDEX IF NOT EXISTS idx_batch_flags_batch_id ON batch_flags(batch_id)")
299 .execute(&self.pool)
300 .await?;
301
302 Ok(())
303 }
304
305 /// Store or update label context for a URI.
306 pub async fn store_context(
307 &self,
308 uri: &str,
309 context: &LabelContext,
310 ) -> Result<(), sqlx::Error> {
311 let matches_json = context
312 .matches
313 .as_ref()
314 .map(|m| serde_json::to_value(m).unwrap_or_default());
315 let reason_str = context
316 .resolution_reason
317 .map(|r| format!("{:?}", r).to_lowercase());
318
319 sqlx::query(
320 r#"
321 INSERT INTO label_context (uri, track_id, track_title, artist_handle, artist_did, highest_score, matches, resolution_reason, resolution_notes)
322 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
323 ON CONFLICT (uri) DO UPDATE SET
324 track_id = COALESCE(EXCLUDED.track_id, label_context.track_id),
325 track_title = COALESCE(EXCLUDED.track_title, label_context.track_title),
326 artist_handle = COALESCE(EXCLUDED.artist_handle, label_context.artist_handle),
327 artist_did = COALESCE(EXCLUDED.artist_did, label_context.artist_did),
328 highest_score = COALESCE(EXCLUDED.highest_score, label_context.highest_score),
329 matches = COALESCE(EXCLUDED.matches, label_context.matches),
330 resolution_reason = COALESCE(EXCLUDED.resolution_reason, label_context.resolution_reason),
331 resolution_notes = COALESCE(EXCLUDED.resolution_notes, label_context.resolution_notes)
332 "#,
333 )
334 .bind(uri)
335 .bind(context.track_id)
336 .bind(&context.track_title)
337 .bind(&context.artist_handle)
338 .bind(&context.artist_did)
339 .bind(context.highest_score)
340 .bind(matches_json)
341 .bind(reason_str)
342 .bind(&context.resolution_notes)
343 .execute(&self.pool)
344 .await?;
345
346 Ok(())
347 }
348
349 /// Store resolution reason for a URI (without overwriting other context).
350 pub async fn store_resolution(
351 &self,
352 uri: &str,
353 reason: ResolutionReason,
354 notes: Option<&str>,
355 ) -> Result<(), sqlx::Error> {
356 let reason_str = format!("{:?}", reason).to_lowercase();
357 sqlx::query(
358 r#"
359 INSERT INTO label_context (uri, resolution_reason, resolution_notes)
360 VALUES ($1, $2, $3)
361 ON CONFLICT (uri) DO UPDATE SET
362 resolution_reason = EXCLUDED.resolution_reason,
363 resolution_notes = EXCLUDED.resolution_notes
364 "#,
365 )
366 .bind(uri)
367 .bind(reason_str)
368 .bind(notes)
369 .execute(&self.pool)
370 .await?;
371
372 Ok(())
373 }
374
375 /// Get label context for a URI.
376 pub async fn get_context(&self, uri: &str) -> Result<Option<LabelContext>, sqlx::Error> {
377 let row: Option<ContextRow> = sqlx::query_as(
378 r#"
379 SELECT track_id, track_title, artist_handle, artist_did, highest_score, matches, resolution_reason, resolution_notes
380 FROM label_context
381 WHERE uri = $1
382 "#,
383 )
384 .bind(uri)
385 .fetch_optional(&self.pool)
386 .await?;
387
388 Ok(row.map(
389 |(
390 track_id,
391 track_title,
392 artist_handle,
393 artist_did,
394 highest_score,
395 matches,
396 resolution_reason,
397 resolution_notes,
398 )| {
399 LabelContext {
400 track_id,
401 track_title,
402 artist_handle,
403 artist_did,
404 highest_score,
405 matches: matches.and_then(|v| serde_json::from_value(v).ok()),
406 resolution_reason: resolution_reason
407 .and_then(|s| ResolutionReason::from_str(&s)),
408 resolution_notes,
409 }
410 },
411 ))
412 }
413
414 /// Store a signed label and return its sequence number.
415 pub async fn store_label(&self, label: &Label) -> Result<i64, sqlx::Error> {
416 let sig = label.sig.as_ref().map(|b| b.to_vec()).unwrap_or_default();
417 let cts: DateTime<Utc> = label.cts.parse().unwrap_or_else(|_| Utc::now());
418 let exp: Option<DateTime<Utc>> = label.exp.as_ref().and_then(|e| e.parse().ok());
419
420 let row = sqlx::query_scalar::<_, i64>(
421 r#"
422 INSERT INTO labels (src, uri, cid, val, neg, cts, exp, sig)
423 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
424 RETURNING seq
425 "#,
426 )
427 .bind(&label.src)
428 .bind(&label.uri)
429 .bind(&label.cid)
430 .bind(&label.val)
431 .bind(label.neg.unwrap_or(false))
432 .bind(cts)
433 .bind(exp)
434 .bind(sig)
435 .fetch_one(&self.pool)
436 .await?;
437
438 Ok(row)
439 }
440
441 /// Query labels matching URI patterns.
442 ///
443 /// Patterns can contain `*` as a wildcard (e.g., `at://did:plc:*`).
444 pub async fn query_labels(
445 &self,
446 uri_patterns: &[String],
447 sources: Option<&[String]>,
448 cursor: Option<&str>,
449 limit: i64,
450 ) -> Result<(Vec<LabelRow>, Option<String>), sqlx::Error> {
451 // Build dynamic query
452 let mut conditions = Vec::new();
453 let mut param_idx = 1;
454
455 // URI pattern matching
456 let uri_conditions: Vec<String> = uri_patterns
457 .iter()
458 .map(|p| {
459 let idx = param_idx;
460 param_idx += 1;
461 if p.contains('*') {
462 format!("uri LIKE ${}", idx)
463 } else {
464 format!("uri = ${}", idx)
465 }
466 })
467 .collect();
468
469 if !uri_conditions.is_empty() {
470 conditions.push(format!("({})", uri_conditions.join(" OR ")));
471 }
472
473 // Source filtering
474 if let Some(srcs) = sources {
475 if !srcs.is_empty() {
476 let placeholders: Vec<String> = srcs
477 .iter()
478 .map(|_| {
479 let idx = param_idx;
480 param_idx += 1;
481 format!("${}", idx)
482 })
483 .collect();
484 conditions.push(format!("src IN ({})", placeholders.join(", ")));
485 }
486 }
487
488 // Cursor for pagination
489 if cursor.is_some() {
490 conditions.push(format!("seq > ${}", param_idx));
491 }
492
493 let where_clause = if conditions.is_empty() {
494 String::new()
495 } else {
496 format!("WHERE {}", conditions.join(" AND "))
497 };
498
499 let query = format!(
500 r#"
501 SELECT seq, src, uri, cid, val, neg, cts, exp, sig
502 FROM labels
503 {}
504 ORDER BY seq ASC
505 LIMIT {}
506 "#,
507 where_clause,
508 limit + 1 // Fetch one extra to determine if there's more
509 );
510
511 // Build query with parameters
512 let mut q = sqlx::query_as::<_, LabelRow>(&query);
513
514 // Bind URI patterns (converting * to %)
515 for pattern in uri_patterns {
516 let sql_pattern = pattern.replace('*', "%");
517 q = q.bind(sql_pattern);
518 }
519
520 // Bind sources
521 if let Some(srcs) = sources {
522 for src in srcs {
523 q = q.bind(src);
524 }
525 }
526
527 // Bind cursor
528 if let Some(c) = cursor {
529 let cursor_seq: i64 = c.parse().unwrap_or(0);
530 q = q.bind(cursor_seq);
531 }
532
533 let mut rows: Vec<LabelRow> = q.fetch_all(&self.pool).await?;
534
535 // Determine next cursor
536 let next_cursor = if rows.len() > limit as usize {
537 rows.pop(); // Remove the extra row
538 rows.last().map(|r| r.seq.to_string())
539 } else {
540 None
541 };
542
543 Ok((rows, next_cursor))
544 }
545
546 /// Get labels since a sequence number (for subscribeLabels).
547 pub async fn get_labels_since(
548 &self,
549 cursor: i64,
550 limit: i64,
551 ) -> Result<Vec<LabelRow>, sqlx::Error> {
552 sqlx::query_as::<_, LabelRow>(
553 r#"
554 SELECT seq, src, uri, cid, val, neg, cts, exp, sig
555 FROM labels
556 WHERE seq > $1
557 ORDER BY seq ASC
558 LIMIT $2
559 "#,
560 )
561 .bind(cursor)
562 .bind(limit)
563 .fetch_all(&self.pool)
564 .await
565 }
566
567 /// Get the latest sequence number.
568 pub async fn get_latest_seq(&self) -> Result<i64, sqlx::Error> {
569 sqlx::query_scalar::<_, Option<i64>>("SELECT MAX(seq) FROM labels")
570 .fetch_one(&self.pool)
571 .await
572 .map(|s| s.unwrap_or(0))
573 }
574
575 /// Get URIs that have active (non-negated) copyright-violation labels.
576 ///
577 /// For each URI, checks if there's a negation label. Returns only those
578 /// that are still actively flagged.
579 pub async fn get_active_labels(&self, uris: &[String]) -> Result<Vec<String>, sqlx::Error> {
580 if uris.is_empty() {
581 return Ok(Vec::new());
582 }
583
584 // Get all negated URIs from our input set
585 let negated_uris: std::collections::HashSet<String> = sqlx::query_scalar::<_, String>(
586 r#"
587 SELECT DISTINCT uri
588 FROM labels
589 WHERE val = 'copyright-violation' AND neg = true AND uri = ANY($1)
590 "#,
591 )
592 .bind(uris)
593 .fetch_all(&self.pool)
594 .await?
595 .into_iter()
596 .collect();
597
598 // Get URIs that have a positive label and are not negated
599 let active_uris: Vec<String> = sqlx::query_scalar::<_, String>(
600 r#"
601 SELECT DISTINCT uri
602 FROM labels
603 WHERE val = 'copyright-violation' AND neg = false AND uri = ANY($1)
604 "#,
605 )
606 .bind(uris)
607 .fetch_all(&self.pool)
608 .await?
609 .into_iter()
610 .filter(|uri| !negated_uris.contains(uri))
611 .collect();
612
613 Ok(active_uris)
614 }
615
616 /// Get all copyright-violation labels with their resolution status and context.
617 ///
618 /// A label is resolved if there's a negation label for the same uri+val.
619 pub async fn get_pending_flags(&self) -> Result<Vec<FlaggedTrack>, sqlx::Error> {
620 // Get all copyright-violation labels with context via LEFT JOIN
621 let rows: Vec<FlaggedRow> = sqlx::query_as(
622 r#"
623 SELECT l.seq, l.uri, l.val, l.cts,
624 c.track_id, c.track_title, c.artist_handle, c.artist_did, c.highest_score, c.matches,
625 c.resolution_reason, c.resolution_notes
626 FROM labels l
627 LEFT JOIN label_context c ON l.uri = c.uri
628 WHERE l.val = 'copyright-violation' AND l.neg = false
629 ORDER BY l.seq DESC
630 "#,
631 )
632 .fetch_all(&self.pool)
633 .await?;
634
635 // Get all negation labels
636 let negated_uris: std::collections::HashSet<String> = sqlx::query_scalar::<_, String>(
637 r#"
638 SELECT DISTINCT uri
639 FROM labels
640 WHERE val = 'copyright-violation' AND neg = true
641 "#,
642 )
643 .fetch_all(&self.pool)
644 .await?
645 .into_iter()
646 .collect();
647
648 let tracks = rows
649 .into_iter()
650 .map(
651 |(
652 seq,
653 uri,
654 val,
655 cts,
656 track_id,
657 track_title,
658 artist_handle,
659 artist_did,
660 highest_score,
661 matches,
662 resolution_reason,
663 resolution_notes,
664 )| {
665 let context = if track_id.is_some()
666 || track_title.is_some()
667 || artist_handle.is_some()
668 || resolution_reason.is_some()
669 {
670 Some(LabelContext {
671 track_id,
672 track_title,
673 artist_handle,
674 artist_did,
675 highest_score,
676 matches: matches.and_then(|v| serde_json::from_value(v).ok()),
677 resolution_reason: resolution_reason
678 .and_then(|s| ResolutionReason::from_str(&s)),
679 resolution_notes,
680 })
681 } else {
682 None
683 };
684
685 FlaggedTrack {
686 seq,
687 uri: uri.clone(),
688 val,
689 created_at: cts.format("%Y-%m-%d %H:%M:%S").to_string(),
690 resolved: negated_uris.contains(&uri),
691 context,
692 }
693 },
694 )
695 .collect();
696
697 Ok(tracks)
698 }
699
700 // -------------------------------------------------------------------------
701 // Review batches
702 // -------------------------------------------------------------------------
703
704 /// Create a review batch with the given flags.
705 pub async fn create_batch(
706 &self,
707 id: &str,
708 uris: &[String],
709 created_by: Option<&str>,
710 ) -> Result<ReviewBatch, sqlx::Error> {
711 let batch = sqlx::query_as::<_, ReviewBatch>(
712 r#"
713 INSERT INTO review_batches (id, created_by)
714 VALUES ($1, $2)
715 RETURNING id, created_at, expires_at, status, created_by
716 "#,
717 )
718 .bind(id)
719 .bind(created_by)
720 .fetch_one(&self.pool)
721 .await?;
722
723 for uri in uris {
724 sqlx::query(
725 r#"
726 INSERT INTO batch_flags (batch_id, uri)
727 VALUES ($1, $2)
728 ON CONFLICT (batch_id, uri) DO NOTHING
729 "#,
730 )
731 .bind(id)
732 .bind(uri)
733 .execute(&self.pool)
734 .await?;
735 }
736
737 Ok(batch)
738 }
739
740 /// Get a batch by ID.
741 pub async fn get_batch(&self, id: &str) -> Result<Option<ReviewBatch>, sqlx::Error> {
742 sqlx::query_as::<_, ReviewBatch>(
743 r#"
744 SELECT id, created_at, expires_at, status, created_by
745 FROM review_batches
746 WHERE id = $1
747 "#,
748 )
749 .bind(id)
750 .fetch_optional(&self.pool)
751 .await
752 }
753
754 /// Get all flags in a batch with their context.
755 pub async fn get_batch_flags(&self, batch_id: &str) -> Result<Vec<FlaggedTrack>, sqlx::Error> {
756 let rows: Vec<FlaggedRow> = sqlx::query_as(
757 r#"
758 SELECT l.seq, l.uri, l.val, l.cts,
759 c.track_id, c.track_title, c.artist_handle, c.artist_did, c.highest_score, c.matches,
760 c.resolution_reason, c.resolution_notes
761 FROM batch_flags bf
762 JOIN labels l ON l.uri = bf.uri AND l.val = 'copyright-violation' AND l.neg = false
763 LEFT JOIN label_context c ON l.uri = c.uri
764 WHERE bf.batch_id = $1
765 ORDER BY l.seq DESC
766 "#,
767 )
768 .bind(batch_id)
769 .fetch_all(&self.pool)
770 .await?;
771
772 let batch_uris: Vec<String> = rows.iter().map(|r| r.1.clone()).collect();
773 let negated_uris: std::collections::HashSet<String> = if !batch_uris.is_empty() {
774 sqlx::query_scalar::<_, String>(
775 r#"
776 SELECT DISTINCT uri
777 FROM labels
778 WHERE val = 'copyright-violation' AND neg = true AND uri = ANY($1)
779 "#,
780 )
781 .bind(&batch_uris)
782 .fetch_all(&self.pool)
783 .await?
784 .into_iter()
785 .collect()
786 } else {
787 std::collections::HashSet::new()
788 };
789
790 let tracks = rows
791 .into_iter()
792 .map(
793 |(
794 seq,
795 uri,
796 val,
797 cts,
798 track_id,
799 track_title,
800 artist_handle,
801 artist_did,
802 highest_score,
803 matches,
804 resolution_reason,
805 resolution_notes,
806 )| {
807 let context = if track_id.is_some()
808 || track_title.is_some()
809 || artist_handle.is_some()
810 || resolution_reason.is_some()
811 {
812 Some(LabelContext {
813 track_id,
814 track_title,
815 artist_handle,
816 artist_did,
817 highest_score,
818 matches: matches.and_then(|v| serde_json::from_value(v).ok()),
819 resolution_reason: resolution_reason
820 .and_then(|s| ResolutionReason::from_str(&s)),
821 resolution_notes,
822 })
823 } else {
824 None
825 };
826
827 FlaggedTrack {
828 seq,
829 uri: uri.clone(),
830 val,
831 created_at: cts.format("%Y-%m-%d %H:%M:%S").to_string(),
832 resolved: negated_uris.contains(&uri),
833 context,
834 }
835 },
836 )
837 .collect();
838
839 Ok(tracks)
840 }
841
842 /// Update batch status.
843 pub async fn update_batch_status(&self, id: &str, status: &str) -> Result<bool, sqlx::Error> {
844 let result = sqlx::query("UPDATE review_batches SET status = $1 WHERE id = $2")
845 .bind(status)
846 .bind(id)
847 .execute(&self.pool)
848 .await?;
849 Ok(result.rows_affected() > 0)
850 }
851
852 /// Mark a flag in a batch as reviewed.
853 pub async fn mark_flag_reviewed(
854 &self,
855 batch_id: &str,
856 uri: &str,
857 decision: &str,
858 ) -> Result<bool, sqlx::Error> {
859 let result = sqlx::query(
860 r#"
861 UPDATE batch_flags
862 SET reviewed = true, reviewed_at = NOW(), decision = $1
863 WHERE batch_id = $2 AND uri = $3
864 "#,
865 )
866 .bind(decision)
867 .bind(batch_id)
868 .bind(uri)
869 .execute(&self.pool)
870 .await?;
871 Ok(result.rows_affected() > 0)
872 }
873
874 /// Get pending (non-reviewed) flags from a batch.
875 pub async fn get_batch_pending_uris(&self, batch_id: &str) -> Result<Vec<String>, sqlx::Error> {
876 sqlx::query_scalar::<_, String>(
877 r#"
878 SELECT uri FROM batch_flags
879 WHERE batch_id = $1 AND reviewed = false
880 "#,
881 )
882 .bind(batch_id)
883 .fetch_all(&self.pool)
884 .await
885 }
886
887 // -------------------------------------------------------------------------
888 // Sensitive images
889 // -------------------------------------------------------------------------
890
891 /// Get all sensitive images.
892 pub async fn get_sensitive_images(&self) -> Result<Vec<SensitiveImageRow>, sqlx::Error> {
893 sqlx::query_as::<_, SensitiveImageRow>(
894 "SELECT id, image_id, url, reason, flagged_at, flagged_by FROM sensitive_images ORDER BY flagged_at DESC",
895 )
896 .fetch_all(&self.pool)
897 .await
898 }
899
900 /// Add a sensitive image entry.
901 pub async fn add_sensitive_image(
902 &self,
903 image_id: Option<&str>,
904 url: Option<&str>,
905 reason: Option<&str>,
906 flagged_by: Option<&str>,
907 ) -> Result<i64, sqlx::Error> {
908 sqlx::query_scalar::<_, i64>(
909 r#"
910 INSERT INTO sensitive_images (image_id, url, reason, flagged_by)
911 VALUES ($1, $2, $3, $4)
912 RETURNING id
913 "#,
914 )
915 .bind(image_id)
916 .bind(url)
917 .bind(reason)
918 .bind(flagged_by)
919 .fetch_one(&self.pool)
920 .await
921 }
922
923 /// Remove a sensitive image entry by ID.
924 pub async fn remove_sensitive_image(&self, id: i64) -> Result<bool, sqlx::Error> {
925 let result = sqlx::query("DELETE FROM sensitive_images WHERE id = $1")
926 .bind(id)
927 .execute(&self.pool)
928 .await?;
929 Ok(result.rows_affected() > 0)
930 }
931}
932
933impl LabelRow {
934 /// Convert database row to Label struct.
935 pub fn to_label(&self) -> Label {
936 Label {
937 ver: Some(1),
938 src: self.src.clone(),
939 uri: self.uri.clone(),
940 cid: self.cid.clone(),
941 val: self.val.clone(),
942 neg: if self.neg { Some(true) } else { None },
943 cts: self.cts.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string(),
944 exp: self
945 .exp
946 .map(|e| e.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string()),
947 sig: Some(bytes::Bytes::from(self.sig.clone())),
948 }
949 }
950}
951
#[cfg(test)]
mod tests {
    use super::*;

    /// Every canonical identifier parses to its variant; unknown input yields `None`.
    #[test]
    fn test_resolution_reason_from_str() {
        assert_eq!(
            ResolutionReason::from_str("original_artist"),
            Some(ResolutionReason::OriginalArtist)
        );
        assert_eq!(
            ResolutionReason::from_str("licensed"),
            Some(ResolutionReason::Licensed)
        );
        assert_eq!(
            ResolutionReason::from_str("fingerprint_noise"),
            Some(ResolutionReason::FingerprintNoise)
        );
        assert_eq!(
            ResolutionReason::from_str("cover_version"),
            Some(ResolutionReason::CoverVersion)
        );
        assert_eq!(
            ResolutionReason::from_str("content_deleted"),
            Some(ResolutionReason::ContentDeleted)
        );
        assert_eq!(
            ResolutionReason::from_str("other"),
            Some(ResolutionReason::Other)
        );
        assert_eq!(ResolutionReason::from_str("invalid"), None);
    }

    /// Every variant has the expected human-readable label.
    #[test]
    fn test_resolution_reason_labels() {
        assert_eq!(ResolutionReason::OriginalArtist.label(), "original artist");
        assert_eq!(ResolutionReason::Licensed.label(), "licensed");
        assert_eq!(
            ResolutionReason::FingerprintNoise.label(),
            "fingerprint noise"
        );
        assert_eq!(ResolutionReason::CoverVersion.label(), "cover/remix");
        assert_eq!(ResolutionReason::ContentDeleted.label(), "content deleted");
        assert_eq!(ResolutionReason::Other.label(), "other");
    }

    /// A default context carries no title and no resolution data.
    #[test]
    fn test_label_context_default() {
        let ctx = LabelContext::default();
        assert!(ctx.track_title.is_none());
        assert!(ctx.resolution_reason.is_none());
        assert!(ctx.resolution_notes.is_none());
    }
}