audio streaming app plyr.fm
38
fork

Configure Feed

Select the types of activity you want to include in your feed.

at 2025.1205.175819 624 lines 20 kB view raw
1//! Database operations for the labeler. 2 3use chrono::{DateTime, Utc}; 4use serde::{Deserialize, Serialize}; 5use sqlx::{postgres::PgPoolOptions, PgPool}; 6 7use crate::admin::FlaggedTrack; 8use crate::labels::Label; 9 10/// Type alias for context row from database query. 11type ContextRow = ( 12 Option<i64>, // track_id 13 Option<String>, // track_title 14 Option<String>, // artist_handle 15 Option<String>, // artist_did 16 Option<f64>, // highest_score 17 Option<serde_json::Value>, // matches 18 Option<String>, // resolution_reason 19 Option<String>, // resolution_notes 20); 21 22/// Type alias for flagged track row from database query. 23type FlaggedRow = ( 24 i64, // seq 25 String, // uri 26 String, // val 27 DateTime<Utc>, // cts 28 Option<i64>, // track_id 29 Option<String>, // track_title 30 Option<String>, // artist_handle 31 Option<String>, // artist_did 32 Option<f64>, // highest_score 33 Option<serde_json::Value>, // matches 34 Option<String>, // resolution_reason 35 Option<String>, // resolution_notes 36); 37 38/// Copyright match info stored alongside labels. 39#[derive(Debug, Clone, Serialize, Deserialize)] 40pub struct CopyrightMatch { 41 pub title: String, 42 pub artist: String, 43 pub score: f64, 44} 45 46/// Reason for resolving a false positive. 47#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] 48#[serde(rename_all = "snake_case")] 49pub enum ResolutionReason { 50 /// Artist uploaded their own distributed music 51 OriginalArtist, 52 /// Artist has licensing/permission for the content 53 Licensed, 54 /// Fingerprint matcher produced a false match 55 FingerprintNoise, 56 /// Legal cover version or remix 57 CoverVersion, 58 /// Other reason (see resolution_notes) 59 Other, 60} 61 62impl ResolutionReason { 63 /// Human-readable label for the reason. 64 pub fn label(&self) -> &'static str { 65 match self { 66 Self::OriginalArtist => "original artist", 67 Self::Licensed => "licensed", 68 Self::FingerprintNoise => "fingerprint noise", 69 Self::CoverVersion => "cover/remix", 70 Self::Other => "other", 71 } 72 } 73 74 /// Parse from string. 75 pub fn from_str(s: &str) -> Option<Self> { 76 match s { 77 "original_artist" => Some(Self::OriginalArtist), 78 "licensed" => Some(Self::Licensed), 79 "fingerprint_noise" => Some(Self::FingerprintNoise), 80 "cover_version" => Some(Self::CoverVersion), 81 "other" => Some(Self::Other), 82 _ => None, 83 } 84 } 85} 86 87/// Context stored alongside a label for display in admin UI. 88#[derive(Debug, Clone, Serialize, Deserialize, Default)] 89pub struct LabelContext { 90 pub track_id: Option<i64>, 91 pub track_title: Option<String>, 92 pub artist_handle: Option<String>, 93 pub artist_did: Option<String>, 94 pub highest_score: Option<f64>, 95 pub matches: Option<Vec<CopyrightMatch>>, 96 /// Why the flag was resolved as false positive (set on resolution). 97 #[serde(skip_serializing_if = "Option::is_none")] 98 pub resolution_reason: Option<ResolutionReason>, 99 /// Additional notes about the resolution. 100 #[serde(skip_serializing_if = "Option::is_none")] 101 pub resolution_notes: Option<String>, 102} 103 104/// Database connection pool and operations. 105#[derive(Clone)] 106pub struct LabelDb { 107 pool: PgPool, 108} 109 110/// Stored label row from the database. 111#[derive(Debug, Clone, sqlx::FromRow)] 112pub struct LabelRow { 113 pub seq: i64, 114 pub src: String, 115 pub uri: String, 116 pub cid: Option<String>, 117 pub val: String, 118 pub neg: bool, 119 pub cts: DateTime<Utc>, 120 pub exp: Option<DateTime<Utc>>, 121 pub sig: Vec<u8>, 122} 123 124impl LabelDb { 125 /// Connect to the database. 126 pub async fn connect(database_url: &str) -> Result<Self, sqlx::Error> { 127 let pool = PgPoolOptions::new() 128 .max_connections(5) 129 .connect(database_url) 130 .await?; 131 Ok(Self { pool }) 132 } 133 134 /// Run database migrations. 135 pub async fn migrate(&self) -> Result<(), sqlx::Error> { 136 sqlx::query( 137 r#" 138 CREATE TABLE IF NOT EXISTS labels ( 139 id BIGSERIAL PRIMARY KEY, 140 seq BIGSERIAL UNIQUE NOT NULL, 141 src TEXT NOT NULL, 142 uri TEXT NOT NULL, 143 cid TEXT, 144 val TEXT NOT NULL, 145 neg BOOLEAN NOT NULL DEFAULT FALSE, 146 cts TIMESTAMPTZ NOT NULL, 147 exp TIMESTAMPTZ, 148 sig BYTEA NOT NULL, 149 created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() 150 ) 151 "#, 152 ) 153 .execute(&self.pool) 154 .await?; 155 156 sqlx::query("CREATE INDEX IF NOT EXISTS idx_labels_uri ON labels(uri)") 157 .execute(&self.pool) 158 .await?; 159 sqlx::query("CREATE INDEX IF NOT EXISTS idx_labels_src ON labels(src)") 160 .execute(&self.pool) 161 .await?; 162 sqlx::query("CREATE INDEX IF NOT EXISTS idx_labels_seq ON labels(seq)") 163 .execute(&self.pool) 164 .await?; 165 sqlx::query("CREATE INDEX IF NOT EXISTS idx_labels_val ON labels(val)") 166 .execute(&self.pool) 167 .await?; 168 169 // Label context table for admin UI display 170 sqlx::query( 171 r#" 172 CREATE TABLE IF NOT EXISTS label_context ( 173 id BIGSERIAL PRIMARY KEY, 174 uri TEXT NOT NULL UNIQUE, 175 track_title TEXT, 176 artist_handle TEXT, 177 artist_did TEXT, 178 highest_score DOUBLE PRECISION, 179 matches JSONB, 180 created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() 181 ) 182 "#, 183 ) 184 .execute(&self.pool) 185 .await?; 186 187 sqlx::query("CREATE INDEX IF NOT EXISTS idx_label_context_uri ON label_context(uri)") 188 .execute(&self.pool) 189 .await?; 190 191 // Add resolution columns (migration-safe: only adds if missing) 192 sqlx::query("ALTER TABLE label_context ADD COLUMN IF NOT EXISTS resolution_reason TEXT") 193 .execute(&self.pool) 194 .await?; 195 sqlx::query("ALTER TABLE label_context ADD COLUMN IF NOT EXISTS resolution_notes TEXT") 196 .execute(&self.pool) 197 .await?; 198 199 Ok(()) 200 } 201 202 /// Store or update label context for a URI. 203 pub async fn store_context( 204 &self, 205 uri: &str, 206 context: &LabelContext, 207 ) -> Result<(), sqlx::Error> { 208 let matches_json = context 209 .matches 210 .as_ref() 211 .map(|m| serde_json::to_value(m).unwrap_or_default()); 212 let reason_str = context 213 .resolution_reason 214 .map(|r| format!("{:?}", r).to_lowercase()); 215 216 sqlx::query( 217 r#" 218 INSERT INTO label_context (uri, track_id, track_title, artist_handle, artist_did, highest_score, matches, resolution_reason, resolution_notes) 219 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) 220 ON CONFLICT (uri) DO UPDATE SET 221 track_id = COALESCE(EXCLUDED.track_id, label_context.track_id), 222 track_title = COALESCE(EXCLUDED.track_title, label_context.track_title), 223 artist_handle = COALESCE(EXCLUDED.artist_handle, label_context.artist_handle), 224 artist_did = COALESCE(EXCLUDED.artist_did, label_context.artist_did), 225 highest_score = COALESCE(EXCLUDED.highest_score, label_context.highest_score), 226 matches = COALESCE(EXCLUDED.matches, label_context.matches), 227 resolution_reason = COALESCE(EXCLUDED.resolution_reason, label_context.resolution_reason), 228 resolution_notes = COALESCE(EXCLUDED.resolution_notes, label_context.resolution_notes) 229 "#, 230 ) 231 .bind(uri) 232 .bind(context.track_id) 233 .bind(&context.track_title) 234 .bind(&context.artist_handle) 235 .bind(&context.artist_did) 236 .bind(context.highest_score) 237 .bind(matches_json) 238 .bind(reason_str) 239 .bind(&context.resolution_notes) 240 .execute(&self.pool) 241 .await?; 242 243 Ok(()) 244 } 245 246 /// Store resolution reason for a URI (without overwriting other context). 247 pub async fn store_resolution( 248 &self, 249 uri: &str, 250 reason: ResolutionReason, 251 notes: Option<&str>, 252 ) -> Result<(), sqlx::Error> { 253 let reason_str = format!("{:?}", reason).to_lowercase(); 254 sqlx::query( 255 r#" 256 INSERT INTO label_context (uri, resolution_reason, resolution_notes) 257 VALUES ($1, $2, $3) 258 ON CONFLICT (uri) DO UPDATE SET 259 resolution_reason = EXCLUDED.resolution_reason, 260 resolution_notes = EXCLUDED.resolution_notes 261 "#, 262 ) 263 .bind(uri) 264 .bind(reason_str) 265 .bind(notes) 266 .execute(&self.pool) 267 .await?; 268 269 Ok(()) 270 } 271 272 /// Get label context for a URI. 273 pub async fn get_context(&self, uri: &str) -> Result<Option<LabelContext>, sqlx::Error> { 274 let row: Option<ContextRow> = sqlx::query_as( 275 r#" 276 SELECT track_id, track_title, artist_handle, artist_did, highest_score, matches, resolution_reason, resolution_notes 277 FROM label_context 278 WHERE uri = $1 279 "#, 280 ) 281 .bind(uri) 282 .fetch_optional(&self.pool) 283 .await?; 284 285 Ok(row.map( 286 |( 287 track_id, 288 track_title, 289 artist_handle, 290 artist_did, 291 highest_score, 292 matches, 293 resolution_reason, 294 resolution_notes, 295 )| { 296 LabelContext { 297 track_id, 298 track_title, 299 artist_handle, 300 artist_did, 301 highest_score, 302 matches: matches.and_then(|v| serde_json::from_value(v).ok()), 303 resolution_reason: resolution_reason 304 .and_then(|s| ResolutionReason::from_str(&s)), 305 resolution_notes, 306 } 307 }, 308 )) 309 } 310 311 /// Store a signed label and return its sequence number. 312 pub async fn store_label(&self, label: &Label) -> Result<i64, sqlx::Error> { 313 let sig = label.sig.as_ref().map(|b| b.to_vec()).unwrap_or_default(); 314 let cts: DateTime<Utc> = label.cts.parse().unwrap_or_else(|_| Utc::now()); 315 let exp: Option<DateTime<Utc>> = label.exp.as_ref().and_then(|e| e.parse().ok()); 316 317 let row = sqlx::query_scalar::<_, i64>( 318 r#" 319 INSERT INTO labels (src, uri, cid, val, neg, cts, exp, sig) 320 VALUES ($1, $2, $3, $4, $5, $6, $7, $8) 321 RETURNING seq 322 "#, 323 ) 324 .bind(&label.src) 325 .bind(&label.uri) 326 .bind(&label.cid) 327 .bind(&label.val) 328 .bind(label.neg.unwrap_or(false)) 329 .bind(cts) 330 .bind(exp) 331 .bind(sig) 332 .fetch_one(&self.pool) 333 .await?; 334 335 Ok(row) 336 } 337 338 /// Query labels matching URI patterns. 339 /// 340 /// Patterns can contain `*` as a wildcard (e.g., `at://did:plc:*`). 341 pub async fn query_labels( 342 &self, 343 uri_patterns: &[String], 344 sources: Option<&[String]>, 345 cursor: Option<&str>, 346 limit: i64, 347 ) -> Result<(Vec<LabelRow>, Option<String>), sqlx::Error> { 348 // Build dynamic query 349 let mut conditions = Vec::new(); 350 let mut param_idx = 1; 351 352 // URI pattern matching 353 let uri_conditions: Vec<String> = uri_patterns 354 .iter() 355 .map(|p| { 356 let idx = param_idx; 357 param_idx += 1; 358 if p.contains('*') { 359 format!("uri LIKE ${}", idx) 360 } else { 361 format!("uri = ${}", idx) 362 } 363 }) 364 .collect(); 365 366 if !uri_conditions.is_empty() { 367 conditions.push(format!("({})", uri_conditions.join(" OR "))); 368 } 369 370 // Source filtering 371 if let Some(srcs) = sources { 372 if !srcs.is_empty() { 373 let placeholders: Vec<String> = srcs 374 .iter() 375 .map(|_| { 376 let idx = param_idx; 377 param_idx += 1; 378 format!("${}", idx) 379 }) 380 .collect(); 381 conditions.push(format!("src IN ({})", placeholders.join(", "))); 382 } 383 } 384 385 // Cursor for pagination 386 if cursor.is_some() { 387 conditions.push(format!("seq > ${}", param_idx)); 388 } 389 390 let where_clause = if conditions.is_empty() { 391 String::new() 392 } else { 393 format!("WHERE {}", conditions.join(" AND ")) 394 }; 395 396 let query = format!( 397 r#" 398 SELECT seq, src, uri, cid, val, neg, cts, exp, sig 399 FROM labels 400 {} 401 ORDER BY seq ASC 402 LIMIT {} 403 "#, 404 where_clause, 405 limit + 1 // Fetch one extra to determine if there's more 406 ); 407 408 // Build query with parameters 409 let mut q = sqlx::query_as::<_, LabelRow>(&query); 410 411 // Bind URI patterns (converting * to %) 412 for pattern in uri_patterns { 413 let sql_pattern = pattern.replace('*', "%"); 414 q = q.bind(sql_pattern); 415 } 416 417 // Bind sources 418 if let Some(srcs) = sources { 419 for src in srcs { 420 q = q.bind(src); 421 } 422 } 423 424 // Bind cursor 425 if let Some(c) = cursor { 426 let cursor_seq: i64 = c.parse().unwrap_or(0); 427 q = q.bind(cursor_seq); 428 } 429 430 let mut rows: Vec<LabelRow> = q.fetch_all(&self.pool).await?; 431 432 // Determine next cursor 433 let next_cursor = if rows.len() > limit as usize { 434 rows.pop(); // Remove the extra row 435 rows.last().map(|r| r.seq.to_string()) 436 } else { 437 None 438 }; 439 440 Ok((rows, next_cursor)) 441 } 442 443 /// Get labels since a sequence number (for subscribeLabels). 444 pub async fn get_labels_since( 445 &self, 446 cursor: i64, 447 limit: i64, 448 ) -> Result<Vec<LabelRow>, sqlx::Error> { 449 sqlx::query_as::<_, LabelRow>( 450 r#" 451 SELECT seq, src, uri, cid, val, neg, cts, exp, sig 452 FROM labels 453 WHERE seq > $1 454 ORDER BY seq ASC 455 LIMIT $2 456 "#, 457 ) 458 .bind(cursor) 459 .bind(limit) 460 .fetch_all(&self.pool) 461 .await 462 } 463 464 /// Get the latest sequence number. 465 pub async fn get_latest_seq(&self) -> Result<i64, sqlx::Error> { 466 sqlx::query_scalar::<_, Option<i64>>("SELECT MAX(seq) FROM labels") 467 .fetch_one(&self.pool) 468 .await 469 .map(|s| s.unwrap_or(0)) 470 } 471 472 /// Get all copyright-violation labels with their resolution status and context. 473 /// 474 /// A label is resolved if there's a negation label for the same uri+val. 475 pub async fn get_pending_flags(&self) -> Result<Vec<FlaggedTrack>, sqlx::Error> { 476 // Get all copyright-violation labels with context via LEFT JOIN 477 let rows: Vec<FlaggedRow> = sqlx::query_as( 478 r#" 479 SELECT l.seq, l.uri, l.val, l.cts, 480 c.track_id, c.track_title, c.artist_handle, c.artist_did, c.highest_score, c.matches, 481 c.resolution_reason, c.resolution_notes 482 FROM labels l 483 LEFT JOIN label_context c ON l.uri = c.uri 484 WHERE l.val = 'copyright-violation' AND l.neg = false 485 ORDER BY l.seq DESC 486 "#, 487 ) 488 .fetch_all(&self.pool) 489 .await?; 490 491 // Get all negation labels 492 let negated_uris: std::collections::HashSet<String> = sqlx::query_scalar::<_, String>( 493 r#" 494 SELECT DISTINCT uri 495 FROM labels 496 WHERE val = 'copyright-violation' AND neg = true 497 "#, 498 ) 499 .fetch_all(&self.pool) 500 .await? 501 .into_iter() 502 .collect(); 503 504 let tracks = rows 505 .into_iter() 506 .map( 507 |( 508 seq, 509 uri, 510 val, 511 cts, 512 track_id, 513 track_title, 514 artist_handle, 515 artist_did, 516 highest_score, 517 matches, 518 resolution_reason, 519 resolution_notes, 520 )| { 521 let context = if track_id.is_some() 522 || track_title.is_some() 523 || artist_handle.is_some() 524 || resolution_reason.is_some() 525 { 526 Some(LabelContext { 527 track_id, 528 track_title, 529 artist_handle, 530 artist_did, 531 highest_score, 532 matches: matches.and_then(|v| serde_json::from_value(v).ok()), 533 resolution_reason: resolution_reason 534 .and_then(|s| ResolutionReason::from_str(&s)), 535 resolution_notes, 536 }) 537 } else { 538 None 539 }; 540 541 FlaggedTrack { 542 seq, 543 uri: uri.clone(), 544 val, 545 created_at: cts.format("%Y-%m-%d %H:%M:%S").to_string(), 546 resolved: negated_uris.contains(&uri), 547 context, 548 } 549 }, 550 ) 551 .collect(); 552 553 Ok(tracks) 554 } 555} 556 557impl LabelRow { 558 /// Convert database row to Label struct. 559 pub fn to_label(&self) -> Label { 560 Label { 561 ver: Some(1), 562 src: self.src.clone(), 563 uri: self.uri.clone(), 564 cid: self.cid.clone(), 565 val: self.val.clone(), 566 neg: if self.neg { Some(true) } else { None }, 567 cts: self.cts.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string(), 568 exp: self 569 .exp 570 .map(|e| e.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string()), 571 sig: Some(bytes::Bytes::from(self.sig.clone())), 572 } 573 } 574} 575 576#[cfg(test)] 577mod tests { 578 use super::*; 579 580 #[test] 581 fn test_resolution_reason_from_str() { 582 assert_eq!( 583 ResolutionReason::from_str("original_artist"), 584 Some(ResolutionReason::OriginalArtist) 585 ); 586 assert_eq!( 587 ResolutionReason::from_str("licensed"), 588 Some(ResolutionReason::Licensed) 589 ); 590 assert_eq!( 591 ResolutionReason::from_str("fingerprint_noise"), 592 Some(ResolutionReason::FingerprintNoise) 593 ); 594 assert_eq!( 595 ResolutionReason::from_str("cover_version"), 596 Some(ResolutionReason::CoverVersion) 597 ); 598 assert_eq!( 599 ResolutionReason::from_str("other"), 600 Some(ResolutionReason::Other) 601 ); 602 assert_eq!(ResolutionReason::from_str("invalid"), None); 603 } 604 605 #[test] 606 fn test_resolution_reason_labels() { 607 assert_eq!(ResolutionReason::OriginalArtist.label(), "original artist"); 608 assert_eq!(ResolutionReason::Licensed.label(), "licensed"); 609 assert_eq!( 610 ResolutionReason::FingerprintNoise.label(), 611 "fingerprint noise" 612 ); 613 assert_eq!(ResolutionReason::CoverVersion.label(), "cover/remix"); 614 assert_eq!(ResolutionReason::Other.label(), "other"); 615 } 616 617 #[test] 618 fn test_label_context_default() { 619 let ctx = LabelContext::default(); 620 assert!(ctx.track_title.is_none()); 621 assert!(ctx.resolution_reason.is_none()); 622 assert!(ctx.resolution_notes.is_none()); 623 } 624}