+47
-3
.github/workflows/check-rust.yml
+47
-3
.github/workflows/check-rust.yml
···
11
11
contents: read
12
12
13
13
jobs:
14
+
changes:
15
+
name: detect changes
16
+
runs-on: ubuntu-latest
17
+
outputs:
18
+
moderation: ${{ steps.filter.outputs.moderation }}
19
+
transcoder: ${{ steps.filter.outputs.transcoder }}
20
+
steps:
21
+
- uses: actions/checkout@v4
22
+
- uses: dorny/paths-filter@v3
23
+
id: filter
24
+
with:
25
+
filters: |
26
+
moderation:
27
+
- 'moderation/**'
28
+
- '.github/workflows/check-rust.yml'
29
+
transcoder:
30
+
- 'transcoder/**'
31
+
- '.github/workflows/check-rust.yml'
32
+
14
33
check:
15
34
name: cargo check
16
35
runs-on: ubuntu-latest
17
36
timeout-minutes: 15
37
+
needs: changes
18
38
19
39
strategy:
40
+
fail-fast: false
20
41
matrix:
21
-
service: [moderation, transcoder]
42
+
include:
43
+
- service: moderation
44
+
changed: ${{ needs.changes.outputs.moderation }}
45
+
- service: transcoder
46
+
changed: ${{ needs.changes.outputs.transcoder }}
22
47
23
48
steps:
24
49
- uses: actions/checkout@v4
50
+
if: matrix.changed == 'true'
25
51
26
52
- name: install rust toolchain
53
+
if: matrix.changed == 'true'
27
54
uses: dtolnay/rust-toolchain@stable
28
55
29
56
- name: cache cargo
57
+
if: matrix.changed == 'true'
30
58
uses: Swatinem/rust-cache@v2
31
59
with:
32
60
workspaces: ${{ matrix.service }}
33
61
34
62
- name: cargo check
63
+
if: matrix.changed == 'true'
35
64
working-directory: ${{ matrix.service }}
36
65
run: cargo check --release
37
66
67
+
- name: skip (no changes)
68
+
if: matrix.changed != 'true'
69
+
run: echo "skipping ${{ matrix.service }} - no changes"
70
+
38
71
docker-build:
39
72
name: docker build
40
73
runs-on: ubuntu-latest
41
74
timeout-minutes: 10
42
-
needs: check
75
+
needs: [changes, check]
43
76
44
77
strategy:
78
+
fail-fast: false
45
79
matrix:
46
-
service: [moderation, transcoder]
80
+
include:
81
+
- service: moderation
82
+
changed: ${{ needs.changes.outputs.moderation }}
83
+
- service: transcoder
84
+
changed: ${{ needs.changes.outputs.transcoder }}
47
85
48
86
steps:
49
87
- uses: actions/checkout@v4
88
+
if: matrix.changed == 'true'
50
89
51
90
- name: build docker image
91
+
if: matrix.changed == 'true'
52
92
working-directory: ${{ matrix.service }}
53
93
run: docker build -t ${{ matrix.service }}:ci-test .
94
+
95
+
- name: skip (no changes)
96
+
if: matrix.changed != 'true'
97
+
run: echo "skipping ${{ matrix.service }} - no changes"
+64
.github/workflows/run-moderation-loop.yml
+64
.github/workflows/run-moderation-loop.yml
···
1
+
# run moderation loop via workflow dispatch
2
+
#
3
+
# analyzes pending copyright flags, auto-resolves false positives,
4
+
# creates review batches for human review, sends DM notification
5
+
#
6
+
# required secrets:
7
+
# MODERATION_SERVICE_URL - moderation service base URL
8
+
# MODERATION_AUTH_TOKEN - X-Moderation-Key header value
9
+
# ANTHROPIC_API_KEY - for flag analysis
10
+
# NOTIFY_BOT_HANDLE - bluesky bot handle for DMs
11
+
# NOTIFY_BOT_PASSWORD - bluesky bot app password
12
+
# NOTIFY_RECIPIENT_HANDLE - who receives DM notifications
13
+
14
+
name: run moderation loop
15
+
16
+
on:
17
+
workflow_dispatch:
18
+
inputs:
19
+
dry_run:
20
+
description: "dry run (analyze only, don't resolve or send DMs)"
21
+
type: boolean
22
+
default: true
23
+
limit:
24
+
description: "max flags to process (leave empty for all)"
25
+
type: string
26
+
default: ""
27
+
env:
28
+
description: "environment (for DM header)"
29
+
type: choice
30
+
options:
31
+
- prod
32
+
- staging
33
+
- dev
34
+
default: prod
35
+
36
+
jobs:
37
+
run:
38
+
runs-on: ubuntu-latest
39
+
40
+
steps:
41
+
- uses: actions/checkout@v4
42
+
43
+
- uses: astral-sh/setup-uv@v4
44
+
45
+
- name: Run moderation loop
46
+
run: |
47
+
ARGS=""
48
+
if [ "${{ inputs.dry_run }}" = "true" ]; then
49
+
ARGS="$ARGS --dry-run"
50
+
fi
51
+
if [ -n "${{ inputs.limit }}" ]; then
52
+
ARGS="$ARGS --limit ${{ inputs.limit }}"
53
+
fi
54
+
ARGS="$ARGS --env ${{ inputs.env }}"
55
+
56
+
echo "Running: uv run scripts/moderation_loop.py $ARGS"
57
+
uv run scripts/moderation_loop.py $ARGS
58
+
env:
59
+
MODERATION_SERVICE_URL: ${{ secrets.MODERATION_SERVICE_URL }}
60
+
MODERATION_AUTH_TOKEN: ${{ secrets.MODERATION_AUTH_TOKEN }}
61
+
ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }}
62
+
NOTIFY_BOT_HANDLE: ${{ secrets.NOTIFY_BOT_HANDLE }}
63
+
NOTIFY_BOT_PASSWORD: ${{ secrets.NOTIFY_BOT_PASSWORD }}
64
+
NOTIFY_RECIPIENT_HANDLE: ${{ secrets.NOTIFY_RECIPIENT_HANDLE }}
+1
-1
backend/fly.staging.toml
+1
-1
backend/fly.staging.toml
···
44
44
# - AWS_ACCESS_KEY_ID (cloudflare R2)
45
45
# - AWS_SECRET_ACCESS_KEY (cloudflare R2)
46
46
# - OAUTH_ENCRYPTION_KEY (generate: python -c 'from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())')
47
-
# - DOCKET_URL (upstash redis: rediss://default:xxx@xxx.upstash.io:6379)
47
+
# - DOCKET_URL (self-hosted redis: redis://plyr-redis-stg.internal:6379)
+1
-1
backend/fly.toml
+1
-1
backend/fly.toml
···
39
39
# - AWS_ACCESS_KEY_ID (cloudflare R2)
40
40
# - AWS_SECRET_ACCESS_KEY (cloudflare R2)
41
41
# - OAUTH_ENCRYPTION_KEY (generate: python -c 'from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())')
42
-
# - DOCKET_URL (upstash redis: rediss://default:xxx@xxx.upstash.io:6379)
42
+
# - DOCKET_URL (self-hosted redis: redis://plyr-redis.internal:6379)
+12
-17
docs/backend/background-tasks.md
+12
-17
docs/backend/background-tasks.md
···
71
71
72
72
### production/staging
73
73
74
-
Redis instances are provisioned via Upstash (managed Redis):
74
+
Redis instances are self-hosted on Fly.io (redis:7-alpine):
75
75
76
-
| environment | instance | region |
77
-
|-------------|----------|--------|
78
-
| production | `plyr-redis-prd` | us-east-1 (near fly.io) |
79
-
| staging | `plyr-redis-stg` | us-east-1 |
76
+
| environment | fly app | region |
77
+
|-------------|---------|--------|
78
+
| production | `plyr-redis` | iad |
79
+
| staging | `plyr-redis-stg` | iad |
80
80
81
81
set `DOCKET_URL` in fly.io secrets:
82
82
```bash
83
-
flyctl secrets set DOCKET_URL=rediss://default:xxx@xxx.upstash.io:6379 -a relay-api
84
-
flyctl secrets set DOCKET_URL=rediss://default:xxx@xxx.upstash.io:6379 -a relay-api-staging
83
+
flyctl secrets set DOCKET_URL=redis://plyr-redis.internal:6379 -a relay-api
84
+
flyctl secrets set DOCKET_URL=redis://plyr-redis-stg.internal:6379 -a relay-api-staging
85
85
```
86
86
87
-
note: use `rediss://` (with double 's') for TLS connections to Upstash.
87
+
note: uses Fly internal networking (`.internal` domain), no TLS needed within private network.
88
88
89
89
## usage
90
90
···
134
134
135
135
## costs
136
136
137
-
**Upstash pricing** (pay-per-request):
138
-
- free tier: 10k commands/day
139
-
- pro: $0.2 per 100k commands + $0.25/GB storage
137
+
**self-hosted Redis on Fly.io** (fixed monthly):
138
+
- ~$2/month per instance (256MB shared-cpu VM)
139
+
- ~$4/month total for prod + staging
140
140
141
-
for plyr.fm's volume (~100 uploads/day), this stays well within free tier or costs $0-5/mo.
142
-
143
-
**tips to avoid surprise bills**:
144
-
- use **regional** (not global) replication
145
-
- set **max data limit** (256MB is plenty for a task queue)
146
-
- monitor usage in Upstash dashboard
141
+
this replaced Upstash pay-per-command pricing which was costing ~$75/month at scale (37M commands/month).
147
142
148
143
## fallback behavior
149
144
+2
-2
docs/deployment/environments.md
+2
-2
docs/deployment/environments.md
···
7
7
| environment | trigger | backend URL | database | redis | frontend | storage |
8
8
|-------------|---------|-------------|----------|-------|----------|---------|
9
9
| **development** | local | localhost:8001 | plyr-dev (neon) | localhost:6379 (docker) | localhost:5173 | audio-dev, images-dev (r2) |
10
-
| **staging** | push to main | api-stg.plyr.fm | plyr-stg (neon) | plyr-redis-stg (upstash) | stg.plyr.fm (main branch) | audio-staging, images-staging (r2) |
11
-
| **production** | github release | api.plyr.fm | plyr-prd (neon) | plyr-redis-prd (upstash) | plyr.fm (production-fe branch) | audio-prod, images-prod (r2) |
10
+
| **staging** | push to main | api-stg.plyr.fm | plyr-stg (neon) | plyr-redis-stg (fly.io) | stg.plyr.fm (main branch) | audio-staging, images-staging (r2) |
11
+
| **production** | github release | api.plyr.fm | plyr-prd (neon) | plyr-redis (fly.io) | plyr.fm (production-fe branch) | audio-prod, images-prod (r2) |
12
12
13
13
## workflow
14
14
+1
-1
moderation/Cargo.toml
+1
-1
moderation/Cargo.toml
···
6
6
[dependencies]
7
7
anyhow = "1.0"
8
8
axum = { version = "0.7", features = ["macros", "json", "ws"] }
9
+
rand = "0.8"
9
10
bytes = "1.0"
10
11
chrono = { version = "0.4", features = ["serde"] }
11
12
futures = "0.3"
···
25
26
tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] }
26
27
27
28
[dev-dependencies]
28
-
rand = "0.8"
+69
moderation/src/admin.rs
+69
moderation/src/admin.rs
···
137
137
pub message: String,
138
138
}
139
139
140
+
/// Request to create a review batch.
141
+
#[derive(Debug, Deserialize)]
142
+
pub struct CreateBatchRequest {
143
+
/// URIs to include. If empty, uses all pending flags.
144
+
#[serde(default)]
145
+
pub uris: Vec<String>,
146
+
/// Who created this batch.
147
+
pub created_by: Option<String>,
148
+
}
149
+
150
+
/// Response after creating a review batch.
151
+
#[derive(Debug, Serialize)]
152
+
pub struct CreateBatchResponse {
153
+
pub id: String,
154
+
pub url: String,
155
+
pub flag_count: usize,
156
+
}
157
+
140
158
/// List all flagged tracks - returns JSON for API, HTML for htmx.
141
159
pub async fn list_flagged(
142
160
State(state): State<AppState>,
···
325
343
Ok(Json(StoreContextResponse {
326
344
message: format!("context stored for {}", request.uri),
327
345
}))
346
+
}
347
+
348
+
/// Create a review batch from pending flags.
349
+
pub async fn create_batch(
350
+
State(state): State<AppState>,
351
+
Json(request): Json<CreateBatchRequest>,
352
+
) -> Result<Json<CreateBatchResponse>, AppError> {
353
+
let db = state.db.as_ref().ok_or(AppError::LabelerNotConfigured)?;
354
+
355
+
// Get URIs to include
356
+
let uris = if request.uris.is_empty() {
357
+
let pending = db.get_pending_flags().await?;
358
+
pending
359
+
.into_iter()
360
+
.filter(|t| !t.resolved)
361
+
.map(|t| t.uri)
362
+
.collect()
363
+
} else {
364
+
request.uris
365
+
};
366
+
367
+
if uris.is_empty() {
368
+
return Err(AppError::BadRequest("no flags to review".to_string()));
369
+
}
370
+
371
+
let id = generate_batch_id();
372
+
let flag_count = uris.len();
373
+
374
+
tracing::info!(
375
+
batch_id = %id,
376
+
flag_count = flag_count,
377
+
"creating review batch"
378
+
);
379
+
380
+
db.create_batch(&id, &uris, request.created_by.as_deref())
381
+
.await?;
382
+
383
+
let url = format!("/admin/review/{}", id);
384
+
385
+
Ok(Json(CreateBatchResponse { id, url, flag_count }))
386
+
}
387
+
388
+
/// Generate a short, URL-safe batch ID.
389
+
fn generate_batch_id() -> String {
390
+
use std::time::{SystemTime, UNIX_EPOCH};
391
+
let now = SystemTime::now()
392
+
.duration_since(UNIX_EPOCH)
393
+
.unwrap()
394
+
.as_millis();
395
+
let rand_part: u32 = rand::random();
396
+
format!("{:x}{:x}", (now as u64) & 0xFFFFFFFF, rand_part & 0xFFFF)
328
397
}
329
398
330
399
/// Add a sensitive image entry.
+5
-1
moderation/src/auth.rs
+5
-1
moderation/src/auth.rs
···
12
12
let path = req.uri().path();
13
13
14
14
// Public endpoints - no auth required
15
-
// Note: /admin serves HTML, auth is handled client-side for API calls
15
+
// Note: /admin and /admin/review/:id serve HTML, auth is handled client-side for API calls
16
16
// Static files must be public for admin UI CSS/JS to load
17
+
let is_review_page = path.starts_with("/admin/review/")
18
+
&& !path.ends_with("/data")
19
+
&& !path.ends_with("/submit");
17
20
if path == "/"
18
21
|| path == "/health"
19
22
|| path == "/sensitive-images"
20
23
|| path == "/admin"
24
+
|| is_review_page
21
25
|| path.starts_with("/static/")
22
26
|| path.starts_with("/xrpc/com.atproto.label.")
23
27
{
+251
moderation/src/db.rs
+251
moderation/src/db.rs
···
23
23
pub flagged_by: Option<String>,
24
24
}
25
25
26
+
/// Review batch for mobile-friendly flag review.
27
+
#[derive(Debug, Clone, FromRow, Serialize, Deserialize)]
28
+
pub struct ReviewBatch {
29
+
pub id: String,
30
+
pub created_at: DateTime<Utc>,
31
+
pub expires_at: Option<DateTime<Utc>>,
32
+
/// Status: pending, completed.
33
+
pub status: String,
34
+
/// Who created this batch.
35
+
pub created_by: Option<String>,
36
+
}
37
+
38
+
/// A flag within a review batch.
39
+
#[derive(Debug, Clone, FromRow, Serialize, Deserialize)]
40
+
pub struct BatchFlag {
41
+
pub id: i64,
42
+
pub batch_id: String,
43
+
pub uri: String,
44
+
pub reviewed: bool,
45
+
pub reviewed_at: Option<DateTime<Utc>>,
46
+
/// Decision: approved, rejected, or null.
47
+
pub decision: Option<String>,
48
+
}
49
+
26
50
/// Type alias for context row from database query.
27
51
type ContextRow = (
28
52
Option<i64>, // track_id
···
71
95
FingerprintNoise,
72
96
/// Legal cover version or remix
73
97
CoverVersion,
98
+
/// Content was deleted from plyr.fm
99
+
ContentDeleted,
74
100
/// Other reason (see resolution_notes)
75
101
Other,
76
102
}
···
83
109
Self::Licensed => "licensed",
84
110
Self::FingerprintNoise => "fingerprint noise",
85
111
Self::CoverVersion => "cover/remix",
112
+
Self::ContentDeleted => "content deleted",
86
113
Self::Other => "other",
87
114
}
88
115
}
···
94
121
"licensed" => Some(Self::Licensed),
95
122
"fingerprint_noise" => Some(Self::FingerprintNoise),
96
123
"cover_version" => Some(Self::CoverVersion),
124
+
"content_deleted" => Some(Self::ContentDeleted),
97
125
"other" => Some(Self::Other),
98
126
_ => None,
99
127
}
···
232
260
.execute(&self.pool)
233
261
.await?;
234
262
sqlx::query("CREATE INDEX IF NOT EXISTS idx_sensitive_images_url ON sensitive_images(url)")
263
+
.execute(&self.pool)
264
+
.await?;
265
+
266
+
// Review batches for mobile-friendly flag review
267
+
sqlx::query(
268
+
r#"
269
+
CREATE TABLE IF NOT EXISTS review_batches (
270
+
id TEXT PRIMARY KEY,
271
+
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
272
+
expires_at TIMESTAMPTZ,
273
+
status TEXT NOT NULL DEFAULT 'pending',
274
+
created_by TEXT
275
+
)
276
+
"#,
277
+
)
278
+
.execute(&self.pool)
279
+
.await?;
280
+
281
+
// Flags within review batches
282
+
sqlx::query(
283
+
r#"
284
+
CREATE TABLE IF NOT EXISTS batch_flags (
285
+
id BIGSERIAL PRIMARY KEY,
286
+
batch_id TEXT NOT NULL REFERENCES review_batches(id) ON DELETE CASCADE,
287
+
uri TEXT NOT NULL,
288
+
reviewed BOOLEAN NOT NULL DEFAULT FALSE,
289
+
reviewed_at TIMESTAMPTZ,
290
+
decision TEXT,
291
+
UNIQUE(batch_id, uri)
292
+
)
293
+
"#,
294
+
)
295
+
.execute(&self.pool)
296
+
.await?;
297
+
298
+
sqlx::query("CREATE INDEX IF NOT EXISTS idx_batch_flags_batch_id ON batch_flags(batch_id)")
235
299
.execute(&self.pool)
236
300
.await?;
237
301
···
631
695
.collect();
632
696
633
697
Ok(tracks)
698
+
}
699
+
700
+
// -------------------------------------------------------------------------
701
+
// Review batches
702
+
// -------------------------------------------------------------------------
703
+
704
+
/// Create a review batch with the given flags.
705
+
pub async fn create_batch(
706
+
&self,
707
+
id: &str,
708
+
uris: &[String],
709
+
created_by: Option<&str>,
710
+
) -> Result<ReviewBatch, sqlx::Error> {
711
+
let batch = sqlx::query_as::<_, ReviewBatch>(
712
+
r#"
713
+
INSERT INTO review_batches (id, created_by)
714
+
VALUES ($1, $2)
715
+
RETURNING id, created_at, expires_at, status, created_by
716
+
"#,
717
+
)
718
+
.bind(id)
719
+
.bind(created_by)
720
+
.fetch_one(&self.pool)
721
+
.await?;
722
+
723
+
for uri in uris {
724
+
sqlx::query(
725
+
r#"
726
+
INSERT INTO batch_flags (batch_id, uri)
727
+
VALUES ($1, $2)
728
+
ON CONFLICT (batch_id, uri) DO NOTHING
729
+
"#,
730
+
)
731
+
.bind(id)
732
+
.bind(uri)
733
+
.execute(&self.pool)
734
+
.await?;
735
+
}
736
+
737
+
Ok(batch)
738
+
}
739
+
740
+
/// Get a batch by ID.
741
+
pub async fn get_batch(&self, id: &str) -> Result<Option<ReviewBatch>, sqlx::Error> {
742
+
sqlx::query_as::<_, ReviewBatch>(
743
+
r#"
744
+
SELECT id, created_at, expires_at, status, created_by
745
+
FROM review_batches
746
+
WHERE id = $1
747
+
"#,
748
+
)
749
+
.bind(id)
750
+
.fetch_optional(&self.pool)
751
+
.await
752
+
}
753
+
754
+
/// Get all flags in a batch with their context.
755
+
pub async fn get_batch_flags(&self, batch_id: &str) -> Result<Vec<FlaggedTrack>, sqlx::Error> {
756
+
let rows: Vec<FlaggedRow> = sqlx::query_as(
757
+
r#"
758
+
SELECT l.seq, l.uri, l.val, l.cts,
759
+
c.track_id, c.track_title, c.artist_handle, c.artist_did, c.highest_score, c.matches,
760
+
c.resolution_reason, c.resolution_notes
761
+
FROM batch_flags bf
762
+
JOIN labels l ON l.uri = bf.uri AND l.val = 'copyright-violation' AND l.neg = false
763
+
LEFT JOIN label_context c ON l.uri = c.uri
764
+
WHERE bf.batch_id = $1
765
+
ORDER BY l.seq DESC
766
+
"#,
767
+
)
768
+
.bind(batch_id)
769
+
.fetch_all(&self.pool)
770
+
.await?;
771
+
772
+
let batch_uris: Vec<String> = rows.iter().map(|r| r.1.clone()).collect();
773
+
let negated_uris: std::collections::HashSet<String> = if !batch_uris.is_empty() {
774
+
sqlx::query_scalar::<_, String>(
775
+
r#"
776
+
SELECT DISTINCT uri
777
+
FROM labels
778
+
WHERE val = 'copyright-violation' AND neg = true AND uri = ANY($1)
779
+
"#,
780
+
)
781
+
.bind(&batch_uris)
782
+
.fetch_all(&self.pool)
783
+
.await?
784
+
.into_iter()
785
+
.collect()
786
+
} else {
787
+
std::collections::HashSet::new()
788
+
};
789
+
790
+
let tracks = rows
791
+
.into_iter()
792
+
.map(
793
+
|(
794
+
seq,
795
+
uri,
796
+
val,
797
+
cts,
798
+
track_id,
799
+
track_title,
800
+
artist_handle,
801
+
artist_did,
802
+
highest_score,
803
+
matches,
804
+
resolution_reason,
805
+
resolution_notes,
806
+
)| {
807
+
let context = if track_id.is_some()
808
+
|| track_title.is_some()
809
+
|| artist_handle.is_some()
810
+
|| resolution_reason.is_some()
811
+
{
812
+
Some(LabelContext {
813
+
track_id,
814
+
track_title,
815
+
artist_handle,
816
+
artist_did,
817
+
highest_score,
818
+
matches: matches.and_then(|v| serde_json::from_value(v).ok()),
819
+
resolution_reason: resolution_reason
820
+
.and_then(|s| ResolutionReason::from_str(&s)),
821
+
resolution_notes,
822
+
})
823
+
} else {
824
+
None
825
+
};
826
+
827
+
FlaggedTrack {
828
+
seq,
829
+
uri: uri.clone(),
830
+
val,
831
+
created_at: cts.format("%Y-%m-%d %H:%M:%S").to_string(),
832
+
resolved: negated_uris.contains(&uri),
833
+
context,
834
+
}
835
+
},
836
+
)
837
+
.collect();
838
+
839
+
Ok(tracks)
840
+
}
841
+
842
+
/// Update batch status.
843
+
pub async fn update_batch_status(&self, id: &str, status: &str) -> Result<bool, sqlx::Error> {
844
+
let result = sqlx::query("UPDATE review_batches SET status = $1 WHERE id = $2")
845
+
.bind(status)
846
+
.bind(id)
847
+
.execute(&self.pool)
848
+
.await?;
849
+
Ok(result.rows_affected() > 0)
850
+
}
851
+
852
+
/// Mark a flag in a batch as reviewed.
853
+
pub async fn mark_flag_reviewed(
854
+
&self,
855
+
batch_id: &str,
856
+
uri: &str,
857
+
decision: &str,
858
+
) -> Result<bool, sqlx::Error> {
859
+
let result = sqlx::query(
860
+
r#"
861
+
UPDATE batch_flags
862
+
SET reviewed = true, reviewed_at = NOW(), decision = $1
863
+
WHERE batch_id = $2 AND uri = $3
864
+
"#,
865
+
)
866
+
.bind(decision)
867
+
.bind(batch_id)
868
+
.bind(uri)
869
+
.execute(&self.pool)
870
+
.await?;
871
+
Ok(result.rows_affected() > 0)
872
+
}
873
+
874
+
/// Get pending (non-reviewed) flags from a batch.
875
+
pub async fn get_batch_pending_uris(&self, batch_id: &str) -> Result<Vec<String>, sqlx::Error> {
876
+
sqlx::query_scalar::<_, String>(
877
+
r#"
878
+
SELECT uri FROM batch_flags
879
+
WHERE batch_id = $1 AND reviewed = false
880
+
"#,
881
+
)
882
+
.bind(batch_id)
883
+
.fetch_all(&self.pool)
884
+
.await
634
885
}
635
886
636
887
// -------------------------------------------------------------------------
+6
moderation/src/main.rs
+6
moderation/src/main.rs
···
25
25
mod db;
26
26
mod handlers;
27
27
mod labels;
28
+
mod review;
28
29
mod state;
29
30
mod xrpc;
30
31
···
91
92
"/admin/sensitive-images/remove",
92
93
post(admin::remove_sensitive_image),
93
94
)
95
+
.route("/admin/batches", post(admin::create_batch))
96
+
// Review endpoints (under admin, auth protected)
97
+
.route("/admin/review/:id", get(review::review_page))
98
+
.route("/admin/review/:id/data", get(review::review_data))
99
+
.route("/admin/review/:id/submit", post(review::submit_review))
94
100
// Static files (CSS, JS for admin UI)
95
101
.nest_service("/static", ServeDir::new("static"))
96
102
// ATProto XRPC endpoints (public)
+526
moderation/src/review.rs
+526
moderation/src/review.rs
···
1
+
//! Review endpoints for batch flag review.
2
+
//!
3
+
//! These endpoints are behind the same auth as admin endpoints.
4
+
5
+
use axum::{
6
+
extract::{Path, State},
7
+
http::header::CONTENT_TYPE,
8
+
response::{IntoResponse, Response},
9
+
Json,
10
+
};
11
+
use serde::{Deserialize, Serialize};
12
+
13
+
use crate::admin::FlaggedTrack;
14
+
use crate::state::{AppError, AppState};
15
+
16
+
/// Response for review page data.
17
+
#[derive(Debug, Serialize)]
18
+
pub struct ReviewPageData {
19
+
pub batch_id: String,
20
+
pub flags: Vec<FlaggedTrack>,
21
+
pub status: String,
22
+
}
23
+
24
+
/// Request to submit review decisions.
25
+
#[derive(Debug, Deserialize)]
26
+
pub struct SubmitReviewRequest {
27
+
pub decisions: Vec<ReviewDecision>,
28
+
}
29
+
30
+
/// A single review decision.
31
+
#[derive(Debug, Deserialize)]
32
+
pub struct ReviewDecision {
33
+
pub uri: String,
34
+
/// "clear" (false positive), "defer" (acknowledge, no action), "confirm" (real violation)
35
+
pub decision: String,
36
+
}
37
+
38
+
/// Response after submitting review.
39
+
#[derive(Debug, Serialize)]
40
+
pub struct SubmitReviewResponse {
41
+
pub resolved_count: usize,
42
+
pub message: String,
43
+
}
44
+
45
+
/// Get review page HTML.
46
+
pub async fn review_page(
47
+
State(state): State<AppState>,
48
+
Path(batch_id): Path<String>,
49
+
) -> Result<Response, AppError> {
50
+
let db = state.db.as_ref().ok_or(AppError::LabelerNotConfigured)?;
51
+
52
+
let batch = db
53
+
.get_batch(&batch_id)
54
+
.await?
55
+
.ok_or(AppError::NotFound("batch not found".to_string()))?;
56
+
57
+
let flags = db.get_batch_flags(&batch_id).await?;
58
+
let html = render_review_page(&batch_id, &flags, &batch.status);
59
+
60
+
Ok(([(CONTENT_TYPE, "text/html; charset=utf-8")], html).into_response())
61
+
}
62
+
63
+
/// Get review data as JSON.
64
+
pub async fn review_data(
65
+
State(state): State<AppState>,
66
+
Path(batch_id): Path<String>,
67
+
) -> Result<Json<ReviewPageData>, AppError> {
68
+
let db = state.db.as_ref().ok_or(AppError::LabelerNotConfigured)?;
69
+
70
+
let batch = db
71
+
.get_batch(&batch_id)
72
+
.await?
73
+
.ok_or(AppError::NotFound("batch not found".to_string()))?;
74
+
75
+
let flags = db.get_batch_flags(&batch_id).await?;
76
+
77
+
Ok(Json(ReviewPageData {
78
+
batch_id,
79
+
flags,
80
+
status: batch.status,
81
+
}))
82
+
}
83
+
84
+
/// Submit review decisions.
85
+
pub async fn submit_review(
86
+
State(state): State<AppState>,
87
+
Path(batch_id): Path<String>,
88
+
Json(request): Json<SubmitReviewRequest>,
89
+
) -> Result<Json<SubmitReviewResponse>, AppError> {
90
+
let db = state.db.as_ref().ok_or(AppError::LabelerNotConfigured)?;
91
+
let signer = state
92
+
.signer
93
+
.as_ref()
94
+
.ok_or(AppError::LabelerNotConfigured)?;
95
+
96
+
let _batch = db
97
+
.get_batch(&batch_id)
98
+
.await?
99
+
.ok_or(AppError::NotFound("batch not found".to_string()))?;
100
+
101
+
let mut resolved_count = 0;
102
+
103
+
for decision in &request.decisions {
104
+
tracing::info!(
105
+
batch_id = %batch_id,
106
+
uri = %decision.uri,
107
+
decision = %decision.decision,
108
+
"processing review decision"
109
+
);
110
+
111
+
db.mark_flag_reviewed(&batch_id, &decision.uri, &decision.decision)
112
+
.await?;
113
+
114
+
match decision.decision.as_str() {
115
+
"clear" => {
116
+
// False positive - emit negation label to clear the flag
117
+
let label =
118
+
crate::labels::Label::new(signer.did(), &decision.uri, "copyright-violation")
119
+
.negated();
120
+
let label = signer.sign_label(label)?;
121
+
let seq = db.store_label(&label).await?;
122
+
123
+
db.store_resolution(
124
+
&decision.uri,
125
+
crate::db::ResolutionReason::FingerprintNoise,
126
+
Some("batch review: cleared"),
127
+
)
128
+
.await?;
129
+
130
+
if let Some(tx) = &state.label_tx {
131
+
let _ = tx.send((seq, label));
132
+
}
133
+
134
+
resolved_count += 1;
135
+
}
136
+
"defer" => {
137
+
// Acknowledge but take no action - flag stays active
138
+
// Just mark as reviewed in the batch, no label changes
139
+
tracing::info!(uri = %decision.uri, "deferred - no action taken");
140
+
}
141
+
"confirm" => {
142
+
// Real violation - flag stays active, could add enforcement later
143
+
tracing::info!(uri = %decision.uri, "confirmed as violation");
144
+
}
145
+
_ => {
146
+
tracing::warn!(uri = %decision.uri, decision = %decision.decision, "unknown decision type");
147
+
}
148
+
}
149
+
}
150
+
151
+
let pending = db.get_batch_pending_uris(&batch_id).await?;
152
+
if pending.is_empty() {
153
+
db.update_batch_status(&batch_id, "completed").await?;
154
+
}
155
+
156
+
Ok(Json(SubmitReviewResponse {
157
+
resolved_count,
158
+
message: format!(
159
+
"processed {} decisions, resolved {} flags",
160
+
request.decisions.len(),
161
+
resolved_count
162
+
),
163
+
}))
164
+
}
165
+
166
+
/// Render the review page.
167
+
fn render_review_page(batch_id: &str, flags: &[FlaggedTrack], status: &str) -> String {
168
+
let pending: Vec<_> = flags.iter().filter(|f| !f.resolved).collect();
169
+
let resolved: Vec<_> = flags.iter().filter(|f| f.resolved).collect();
170
+
171
+
let pending_cards: Vec<String> = pending.iter().map(|f| render_review_card(f)).collect();
172
+
let resolved_cards: Vec<String> = resolved.iter().map(|f| render_review_card(f)).collect();
173
+
174
+
let pending_html = if pending_cards.is_empty() {
175
+
"<div class=\"empty\">all flags reviewed!</div>".to_string()
176
+
} else {
177
+
pending_cards.join("\n")
178
+
};
179
+
180
+
let resolved_html = if resolved_cards.is_empty() {
181
+
String::new()
182
+
} else {
183
+
format!(
184
+
r#"<details class="resolved-section">
185
+
<summary>{} resolved</summary>
186
+
{}
187
+
</details>"#,
188
+
resolved_cards.len(),
189
+
resolved_cards.join("\n")
190
+
)
191
+
};
192
+
193
+
let status_badge = if status == "completed" {
194
+
r#"<span class="badge resolved">completed</span>"#
195
+
} else {
196
+
""
197
+
};
198
+
199
+
format!(
200
+
r#"<!DOCTYPE html>
201
+
<html lang="en">
202
+
<head>
203
+
<meta charset="utf-8">
204
+
<meta name="viewport" content="width=device-width, initial-scale=1">
205
+
<title>review batch - plyr.fm</title>
206
+
<link rel="stylesheet" href="/static/admin.css">
207
+
<style>{}</style>
208
+
</head>
209
+
<body>
210
+
<h1>plyr.fm moderation</h1>
211
+
<p class="subtitle">
212
+
<a href="/admin">← back to dashboard</a>
213
+
<span style="margin: 0 12px; color: var(--text-muted);">|</span>
214
+
batch review: {} pending {}
215
+
</p>
216
+
217
+
<div class="auth-section" id="auth-section">
218
+
<input type="password" id="auth-token" placeholder="auth token"
219
+
onkeyup="if(event.key==='Enter')authenticate()">
220
+
<button class="btn btn-primary" onclick="authenticate()">authenticate</button>
221
+
</div>
222
+
223
+
<form id="review-form" style="display: none;">
224
+
<div class="flags-list">
225
+
{}
226
+
</div>
227
+
228
+
{}
229
+
230
+
<div class="submit-bar">
231
+
<button type="submit" class="btn btn-primary" id="submit-btn" disabled>
232
+
submit decisions
233
+
</button>
234
+
</div>
235
+
</form>
236
+
237
+
<script>
238
+
const form = document.getElementById('review-form');
239
+
const submitBtn = document.getElementById('submit-btn');
240
+
const authSection = document.getElementById('auth-section');
241
+
const batchId = '{}';
242
+
243
+
let currentToken = '';
244
+
const decisions = {{}};
245
+
246
+
function authenticate() {{
247
+
const token = document.getElementById('auth-token').value;
248
+
if (token && token !== '••••••••') {{
249
+
localStorage.setItem('mod_token', token);
250
+
currentToken = token;
251
+
showReviewForm();
252
+
}}
253
+
}}
254
+
255
+
function showReviewForm() {{
256
+
authSection.style.display = 'none';
257
+
form.style.display = 'block';
258
+
}}
259
+
260
+
// Check for saved token on load
261
+
const savedToken = localStorage.getItem('mod_token');
262
+
if (savedToken) {{
263
+
currentToken = savedToken;
264
+
document.getElementById('auth-token').value = '••••••••';
265
+
showReviewForm();
266
+
}}
267
+
268
+
function updateSubmitBtn() {{
269
+
const count = Object.keys(decisions).length;
270
+
submitBtn.disabled = count === 0;
271
+
submitBtn.textContent = count > 0 ? `submit ${{count}} decision${{count > 1 ? 's' : ''}}` : 'submit decisions';
272
+
}}
273
+
274
+
function setDecision(uri, decision) {{
275
+
// Toggle off if clicking the same decision
276
+
if (decisions[uri] === decision) {{
277
+
delete decisions[uri];
278
+
const card = document.querySelector(`[data-uri="${{CSS.escape(uri)}}"]`);
279
+
if (card) card.classList.remove('decision-clear', 'decision-defer', 'decision-confirm');
280
+
}} else {{
281
+
decisions[uri] = decision;
282
+
const card = document.querySelector(`[data-uri="${{CSS.escape(uri)}}"]`);
283
+
if (card) {{
284
+
card.classList.remove('decision-clear', 'decision-defer', 'decision-confirm');
285
+
card.classList.add('decision-' + decision);
286
+
}}
287
+
}}
288
+
updateSubmitBtn();
289
+
}}
290
+
291
+
form.addEventListener('submit', async (e) => {{
292
+
e.preventDefault();
293
+
submitBtn.disabled = true;
294
+
submitBtn.textContent = 'submitting...';
295
+
296
+
try {{
297
+
const response = await fetch(`/admin/review/${{batchId}}/submit`, {{
298
+
method: 'POST',
299
+
headers: {{
300
+
'Content-Type': 'application/json',
301
+
'X-Moderation-Key': currentToken
302
+
}},
303
+
body: JSON.stringify({{
304
+
decisions: Object.entries(decisions).map(([uri, decision]) => ({{ uri, decision }}))
305
+
}})
306
+
}});
307
+
308
+
if (response.status === 401) {{
309
+
localStorage.removeItem('mod_token');
310
+
currentToken = '';
311
+
authSection.style.display = 'block';
312
+
form.style.display = 'none';
313
+
document.getElementById('auth-token').value = '';
314
+
alert('invalid token');
315
+
return;
316
+
}}
317
+
318
+
if (response.ok) {{
319
+
const result = await response.json();
320
+
alert(result.message);
321
+
location.reload();
322
+
}} else {{
323
+
const err = await response.json();
324
+
alert('error: ' + (err.message || 'unknown error'));
325
+
submitBtn.disabled = false;
326
+
updateSubmitBtn();
327
+
}}
328
+
}} catch (err) {{
329
+
alert('network error: ' + err.message);
330
+
submitBtn.disabled = false;
331
+
updateSubmitBtn();
332
+
}}
333
+
}});
334
+
</script>
335
+
</body>
336
+
</html>"#,
337
+
REVIEW_CSS,
338
+
pending.len(),
339
+
status_badge,
340
+
pending_html,
341
+
resolved_html,
342
+
html_escape(batch_id)
343
+
)
344
+
}
345
+
346
+
/// Render a single review card.
347
+
fn render_review_card(track: &FlaggedTrack) -> String {
348
+
let ctx = track.context.as_ref();
349
+
350
+
let title = ctx
351
+
.and_then(|c| c.track_title.as_deref())
352
+
.unwrap_or("unknown track");
353
+
let artist = ctx
354
+
.and_then(|c| c.artist_handle.as_deref())
355
+
.unwrap_or("unknown");
356
+
let track_id = ctx.and_then(|c| c.track_id);
357
+
358
+
let title_html = if let Some(id) = track_id {
359
+
format!(
360
+
r#"<a href="https://plyr.fm/track/{}" target="_blank">{}</a>"#,
361
+
id,
362
+
html_escape(title)
363
+
)
364
+
} else {
365
+
html_escape(title)
366
+
};
367
+
368
+
let matches_html = ctx
369
+
.and_then(|c| c.matches.as_ref())
370
+
.filter(|m| !m.is_empty())
371
+
.map(|matches| {
372
+
let items: Vec<String> = matches
373
+
.iter()
374
+
.take(3)
375
+
.map(|m| {
376
+
format!(
377
+
r#"<div class="match-item"><span class="title">{}</span> <span class="artist">by {}</span></div>"#,
378
+
html_escape(&m.title),
379
+
html_escape(&m.artist)
380
+
)
381
+
})
382
+
.collect();
383
+
format!(
384
+
r#"<div class="matches"><h4>potential matches</h4>{}</div>"#,
385
+
items.join("\n")
386
+
)
387
+
})
388
+
.unwrap_or_default();
389
+
390
+
let resolved_badge = if track.resolved {
391
+
r#"<span class="badge resolved">resolved</span>"#
392
+
} else {
393
+
r#"<span class="badge pending">pending</span>"#
394
+
};
395
+
396
+
let action_buttons = if !track.resolved {
397
+
format!(
398
+
r#"<div class="flag-actions">
399
+
<button type="button" class="btn btn-clear" onclick="setDecision('{}', 'clear')">clear</button>
400
+
<button type="button" class="btn btn-defer" onclick="setDecision('{}', 'defer')">defer</button>
401
+
<button type="button" class="btn btn-confirm" onclick="setDecision('{}', 'confirm')">confirm</button>
402
+
</div>"#,
403
+
html_escape(&track.uri),
404
+
html_escape(&track.uri),
405
+
html_escape(&track.uri)
406
+
)
407
+
} else {
408
+
String::new()
409
+
};
410
+
411
+
format!(
412
+
r#"<div class="flag-card{}" data-uri="{}">
413
+
<div class="flag-header">
414
+
<div class="track-info">
415
+
<h3>{}</h3>
416
+
<div class="artist">@{}</div>
417
+
</div>
418
+
<div class="flag-badges">
419
+
{}
420
+
</div>
421
+
</div>
422
+
{}
423
+
{}
424
+
</div>"#,
425
+
if track.resolved { " resolved" } else { "" },
426
+
html_escape(&track.uri),
427
+
title_html,
428
+
html_escape(artist),
429
+
resolved_badge,
430
+
matches_html,
431
+
action_buttons
432
+
)
433
+
}
434
+
435
+
fn html_escape(s: &str) -> String {
436
+
s.replace('&', "&")
437
+
.replace('<', "<")
438
+
.replace('>', ">")
439
+
.replace('"', """)
440
+
.replace('\'', "'")
441
+
}
442
+
443
+
/// Additional CSS for review page (supplements admin.css)
444
+
const REVIEW_CSS: &str = r#"
445
+
/* review page specific styles */
446
+
body { padding-bottom: 80px; }
447
+
448
+
.subtitle a {
449
+
color: var(--accent);
450
+
text-decoration: none;
451
+
}
452
+
.subtitle a:hover { text-decoration: underline; }
453
+
454
+
/* action buttons */
455
+
.btn-clear {
456
+
background: rgba(74, 222, 128, 0.15);
457
+
color: var(--success);
458
+
border: 1px solid rgba(74, 222, 128, 0.3);
459
+
}
460
+
.btn-clear:hover {
461
+
background: rgba(74, 222, 128, 0.25);
462
+
}
463
+
464
+
.btn-defer {
465
+
background: rgba(251, 191, 36, 0.15);
466
+
color: var(--warning);
467
+
border: 1px solid rgba(251, 191, 36, 0.3);
468
+
}
469
+
.btn-defer:hover {
470
+
background: rgba(251, 191, 36, 0.25);
471
+
}
472
+
473
+
.btn-confirm {
474
+
background: rgba(239, 68, 68, 0.15);
475
+
color: var(--error);
476
+
border: 1px solid rgba(239, 68, 68, 0.3);
477
+
}
478
+
.btn-confirm:hover {
479
+
background: rgba(239, 68, 68, 0.25);
480
+
}
481
+
482
+
/* card selection states */
483
+
.flag-card.decision-clear {
484
+
border-color: var(--success);
485
+
background: rgba(74, 222, 128, 0.05);
486
+
}
487
+
.flag-card.decision-defer {
488
+
border-color: var(--warning);
489
+
background: rgba(251, 191, 36, 0.05);
490
+
}
491
+
.flag-card.decision-confirm {
492
+
border-color: var(--error);
493
+
background: rgba(239, 68, 68, 0.05);
494
+
}
495
+
496
+
/* submit bar */
497
+
.submit-bar {
498
+
position: fixed;
499
+
bottom: 0;
500
+
left: 0;
501
+
right: 0;
502
+
padding: 16px 24px;
503
+
background: var(--bg-secondary);
504
+
border-top: 1px solid var(--border-subtle);
505
+
}
506
+
.submit-bar .btn {
507
+
width: 100%;
508
+
max-width: 900px;
509
+
margin: 0 auto;
510
+
display: block;
511
+
padding: 14px;
512
+
}
513
+
514
+
/* resolved section */
515
+
.resolved-section {
516
+
margin-top: 24px;
517
+
padding-top: 16px;
518
+
border-top: 1px solid var(--border-subtle);
519
+
}
520
+
.resolved-section summary {
521
+
cursor: pointer;
522
+
color: var(--text-tertiary);
523
+
font-size: 0.85rem;
524
+
margin-bottom: 12px;
525
+
}
526
+
"#;
+4
moderation/src/state.rs
+4
moderation/src/state.rs
···
35
35
#[error("bad request: {0}")]
36
36
BadRequest(String),
37
37
38
+
#[error("not found: {0}")]
39
+
NotFound(String),
40
+
38
41
#[error("label error: {0}")]
39
42
Label(#[from] LabelError),
40
43
···
54
57
(StatusCode::SERVICE_UNAVAILABLE, "LabelerNotConfigured")
55
58
}
56
59
AppError::BadRequest(_) => (StatusCode::BAD_REQUEST, "BadRequest"),
60
+
AppError::NotFound(_) => (StatusCode::NOT_FOUND, "NotFound"),
57
61
AppError::Label(_) => (StatusCode::INTERNAL_SERVER_ERROR, "LabelError"),
58
62
AppError::Database(_) => (StatusCode::INTERNAL_SERVER_ERROR, "DatabaseError"),
59
63
AppError::Io(_) => (StatusCode::INTERNAL_SERVER_ERROR, "IoError"),
+4
-6
scripts/docket_runs.py
+4
-6
scripts/docket_runs.py
···
38
38
url = os.environ.get("DOCKET_URL_STAGING")
39
39
if not url:
40
40
print("error: DOCKET_URL_STAGING not set")
41
-
print(
42
-
"hint: export DOCKET_URL_STAGING=rediss://default:xxx@xxx.upstash.io:6379"
43
-
)
41
+
print("hint: flyctl proxy 6380:6379 -a plyr-redis-stg")
42
+
print(" export DOCKET_URL_STAGING=redis://localhost:6380")
44
43
return 1
45
44
elif args.env == "production":
46
45
url = os.environ.get("DOCKET_URL_PRODUCTION")
47
46
if not url:
48
47
print("error: DOCKET_URL_PRODUCTION not set")
49
-
print(
50
-
"hint: export DOCKET_URL_PRODUCTION=rediss://default:xxx@xxx.upstash.io:6379"
51
-
)
48
+
print("hint: flyctl proxy 6381:6379 -a plyr-redis")
49
+
print(" export DOCKET_URL_PRODUCTION=redis://localhost:6381")
52
50
return 1
53
51
54
52
print(f"connecting to {args.env}...")
+348
scripts/moderation_loop.py
+348
scripts/moderation_loop.py
···
1
+
#!/usr/bin/env -S uv run --script --quiet
2
+
# /// script
3
+
# requires-python = ">=3.12"
4
+
# dependencies = [
5
+
# "pydantic-ai>=0.1.0",
6
+
# "anthropic",
7
+
# "httpx",
8
+
# "pydantic>=2.0",
9
+
# "pydantic-settings",
10
+
# "atproto>=0.0.55",
11
+
# "rich",
12
+
# ]
13
+
# ///
14
+
"""autonomous moderation loop for plyr.fm.
15
+
16
+
workflow:
17
+
1. fetch pending flags from moderation service
18
+
2. analyze each flag with LLM (FALSE_POSITIVE, VIOLATION, NEEDS_HUMAN)
19
+
3. auto-resolve false positives
20
+
4. create review batch for needs_human flags
21
+
5. send DM with link to review UI
22
+
23
+
the review UI handles human decisions - DM is just a notification channel.
24
+
"""
25
+
26
+
import argparse
27
+
import asyncio
28
+
from dataclasses import dataclass, field
29
+
from pathlib import Path
30
+
31
+
import httpx
32
+
from atproto import AsyncClient, models
33
+
from pydantic import BaseModel, Field
34
+
from pydantic_ai import Agent
35
+
from pydantic_ai.models.anthropic import AnthropicModel
36
+
from pydantic_settings import BaseSettings, SettingsConfigDict
37
+
from rich.console import Console
38
+
39
+
console = Console()
40
+
41
+
42
+
class LoopSettings(BaseSettings):
43
+
model_config = SettingsConfigDict(
44
+
env_file=Path(__file__).parent.parent / ".env",
45
+
case_sensitive=False,
46
+
extra="ignore",
47
+
)
48
+
moderation_service_url: str = Field(
49
+
default="https://moderation.plyr.fm", validation_alias="MODERATION_SERVICE_URL"
50
+
)
51
+
moderation_auth_token: str = Field(
52
+
default="", validation_alias="MODERATION_AUTH_TOKEN"
53
+
)
54
+
anthropic_api_key: str = Field(default="", validation_alias="ANTHROPIC_API_KEY")
55
+
anthropic_model: str = Field(
56
+
default="claude-sonnet-4-20250514", validation_alias="ANTHROPIC_MODEL"
57
+
)
58
+
bot_handle: str = Field(default="", validation_alias="NOTIFY_BOT_HANDLE")
59
+
bot_password: str = Field(default="", validation_alias="NOTIFY_BOT_PASSWORD")
60
+
recipient_handle: str = Field(
61
+
default="", validation_alias="NOTIFY_RECIPIENT_HANDLE"
62
+
)
63
+
64
+
65
+
class FlagAnalysis(BaseModel):
66
+
"""result of analyzing a single flag."""
67
+
68
+
uri: str
69
+
category: str = Field(description="FALSE_POSITIVE, VIOLATION, or NEEDS_HUMAN")
70
+
reason: str
71
+
72
+
73
+
@dataclass
74
+
class DMClient:
75
+
handle: str
76
+
password: str
77
+
recipient_handle: str
78
+
_client: AsyncClient = field(init=False, repr=False)
79
+
_dm_client: AsyncClient = field(init=False, repr=False)
80
+
_recipient_did: str = field(init=False, repr=False)
81
+
_convo_id: str = field(init=False, repr=False)
82
+
83
+
async def setup(self) -> None:
84
+
self._client = AsyncClient()
85
+
await self._client.login(self.handle, self.password)
86
+
self._dm_client = self._client.with_bsky_chat_proxy()
87
+
profile = await self._client.app.bsky.actor.get_profile(
88
+
{"actor": self.recipient_handle}
89
+
)
90
+
self._recipient_did = profile.did
91
+
convo = await self._dm_client.chat.bsky.convo.get_convo_for_members(
92
+
models.ChatBskyConvoGetConvoForMembers.Params(members=[self._recipient_did])
93
+
)
94
+
self._convo_id = convo.convo.id
95
+
96
+
async def get_messages(self, limit: int = 30) -> list[dict]:
97
+
response = await self._dm_client.chat.bsky.convo.get_messages(
98
+
models.ChatBskyConvoGetMessages.Params(convo_id=self._convo_id, limit=limit)
99
+
)
100
+
return [
101
+
{
102
+
"text": m.text,
103
+
"is_bot": m.sender.did != self._recipient_did,
104
+
"sent_at": getattr(m, "sent_at", None),
105
+
}
106
+
for m in response.messages
107
+
if hasattr(m, "text") and hasattr(m, "sender")
108
+
]
109
+
110
+
async def send(self, text: str) -> None:
111
+
await self._dm_client.chat.bsky.convo.send_message(
112
+
models.ChatBskyConvoSendMessage.Data(
113
+
convo_id=self._convo_id,
114
+
message=models.ChatBskyConvoDefs.MessageInput(text=text),
115
+
)
116
+
)
117
+
118
+
119
+
@dataclass
120
+
class PlyrClient:
121
+
"""client for checking track existence in plyr.fm."""
122
+
123
+
env: str = "prod"
124
+
_client: httpx.AsyncClient = field(init=False, repr=False)
125
+
126
+
def __post_init__(self) -> None:
127
+
base_url = {
128
+
"prod": "https://api.plyr.fm",
129
+
"staging": "https://api-stg.plyr.fm",
130
+
"dev": "http://localhost:8001",
131
+
}.get(self.env, "https://api.plyr.fm")
132
+
self._client = httpx.AsyncClient(base_url=base_url, timeout=10.0)
133
+
134
+
async def close(self) -> None:
135
+
await self._client.aclose()
136
+
137
+
async def track_exists(self, track_id: int) -> bool:
138
+
"""check if a track exists (returns False if 404)."""
139
+
try:
140
+
r = await self._client.get(f"/tracks/{track_id}")
141
+
return r.status_code == 200
142
+
except Exception:
143
+
return True # assume exists on error (don't accidentally delete labels)
144
+
145
+
146
+
@dataclass
147
+
class ModClient:
148
+
base_url: str
149
+
auth_token: str
150
+
_client: httpx.AsyncClient = field(init=False, repr=False)
151
+
152
+
def __post_init__(self) -> None:
153
+
self._client = httpx.AsyncClient(
154
+
base_url=self.base_url,
155
+
headers={"X-Moderation-Key": self.auth_token},
156
+
timeout=30.0,
157
+
)
158
+
159
+
async def close(self) -> None:
160
+
await self._client.aclose()
161
+
162
+
async def list_pending(self) -> list[dict]:
163
+
r = await self._client.get("/admin/flags", params={"filter": "pending"})
164
+
r.raise_for_status()
165
+
return r.json().get("tracks", [])
166
+
167
+
async def resolve(self, uri: str, reason: str, notes: str = "") -> None:
168
+
r = await self._client.post(
169
+
"/admin/resolve",
170
+
json={
171
+
"uri": uri,
172
+
"val": "copyright-violation",
173
+
"reason": reason,
174
+
"notes": notes,
175
+
},
176
+
)
177
+
r.raise_for_status()
178
+
179
+
async def create_batch(
180
+
self, uris: list[str], created_by: str | None = None
181
+
) -> dict:
182
+
"""create a review batch and return {id, url, flag_count}."""
183
+
r = await self._client.post(
184
+
"/admin/batches",
185
+
json={"uris": uris, "created_by": created_by},
186
+
)
187
+
r.raise_for_status()
188
+
return r.json()
189
+
190
+
191
+
def get_header(env: str) -> str:
192
+
return f"[PLYR-MOD:{env.upper()}]"
193
+
194
+
195
+
def create_flag_analyzer(api_key: str, model: str) -> Agent[None, list[FlagAnalysis]]:
196
+
from pydantic_ai.providers.anthropic import AnthropicProvider
197
+
198
+
return Agent(
199
+
model=AnthropicModel(model, provider=AnthropicProvider(api_key=api_key)),
200
+
output_type=list[FlagAnalysis],
201
+
system_prompt="""\
202
+
analyze each copyright flag. categorize as:
203
+
- FALSE_POSITIVE: fingerprint noise, uploader is the artist, unrelated matches
204
+
- VIOLATION: clearly copyrighted commercial content
205
+
- NEEDS_HUMAN: ambiguous, need human review
206
+
207
+
return a FlagAnalysis for each flag with uri, category, and brief reason.
208
+
""",
209
+
)
210
+
211
+
212
+
async def run_loop(
213
+
dry_run: bool = False, limit: int | None = None, env: str = "prod"
214
+
) -> None:
215
+
settings = LoopSettings()
216
+
for attr in [
217
+
"moderation_auth_token",
218
+
"anthropic_api_key",
219
+
"bot_handle",
220
+
"bot_password",
221
+
"recipient_handle",
222
+
]:
223
+
if not getattr(settings, attr):
224
+
console.print(f"[red]missing {attr}[/red]")
225
+
return
226
+
227
+
console.print(f"[bold]moderation loop[/bold] ({settings.anthropic_model})")
228
+
if dry_run:
229
+
console.print("[yellow]DRY RUN[/yellow]")
230
+
231
+
dm = DMClient(settings.bot_handle, settings.bot_password, settings.recipient_handle)
232
+
mod = ModClient(settings.moderation_service_url, settings.moderation_auth_token)
233
+
plyr = PlyrClient(env=env)
234
+
235
+
try:
236
+
await dm.setup()
237
+
238
+
# get pending flags
239
+
pending = await mod.list_pending()
240
+
if not pending:
241
+
console.print("[green]no pending flags[/green]")
242
+
return
243
+
244
+
console.print(f"[bold]{len(pending)} pending flags[/bold]")
245
+
246
+
# check for deleted tracks and auto-resolve them
247
+
console.print("[dim]checking for deleted tracks...[/dim]")
248
+
active_flags = []
249
+
deleted_count = 0
250
+
for flag in pending:
251
+
track_id = flag.get("context", {}).get("track_id")
252
+
if track_id and not await plyr.track_exists(track_id):
253
+
# track was deleted - resolve the flag
254
+
if not dry_run:
255
+
try:
256
+
await mod.resolve(
257
+
flag["uri"], "content_deleted", "track no longer exists"
258
+
)
259
+
console.print(
260
+
f" [yellow]⌫[/yellow] deleted: {flag['uri'][-40:]}"
261
+
)
262
+
deleted_count += 1
263
+
except Exception as e:
264
+
console.print(f" [red]✗[/red] {e}")
265
+
active_flags.append(flag)
266
+
else:
267
+
console.print(
268
+
f" [yellow]would resolve deleted:[/yellow] {flag['uri'][-40:]}"
269
+
)
270
+
deleted_count += 1
271
+
else:
272
+
active_flags.append(flag)
273
+
274
+
if deleted_count > 0:
275
+
console.print(f"[yellow]{deleted_count} deleted tracks resolved[/yellow]")
276
+
277
+
pending = active_flags
278
+
if not pending:
279
+
console.print("[green]all flags were for deleted tracks[/green]")
280
+
return
281
+
282
+
# analyze remaining flags
283
+
if limit:
284
+
pending = pending[:limit]
285
+
286
+
analyzer = create_flag_analyzer(
287
+
settings.anthropic_api_key, settings.anthropic_model
288
+
)
289
+
desc = "\n---\n".join(
290
+
f"URI: {f['uri']}\nTrack: {f.get('context', {}).get('track_title', '?')}\n"
291
+
f"Uploader: @{f.get('context', {}).get('artist_handle', '?')}\n"
292
+
f"Matches: {', '.join(m['artist'] for m in f.get('context', {}).get('matches', [])[:3])}"
293
+
for f in pending
294
+
)
295
+
result = await analyzer.run(f"analyze {len(pending)} flags:\n\n{desc}")
296
+
analyses = result.output
297
+
298
+
# auto-resolve false positives
299
+
auto = [a for a in analyses if a.category == "FALSE_POSITIVE"]
300
+
human = [a for a in analyses if a.category == "NEEDS_HUMAN"]
301
+
console.print(f"analysis: {len(auto)} auto-resolve, {len(human)} need human")
302
+
303
+
for a in auto:
304
+
if not dry_run:
305
+
try:
306
+
await mod.resolve(
307
+
a.uri, "fingerprint_noise", f"auto: {a.reason[:50]}"
308
+
)
309
+
console.print(f" [green]✓[/green] {a.uri[-40:]}")
310
+
except Exception as e:
311
+
console.print(f" [red]✗[/red] {e}")
312
+
313
+
# create batch and send link for needs_human (if any)
314
+
if human:
315
+
human_uris = [h.uri for h in human]
316
+
console.print(f"[dim]creating batch for {len(human_uris)} flags...[/dim]")
317
+
318
+
if not dry_run:
319
+
batch = await mod.create_batch(human_uris, created_by="moderation_loop")
320
+
full_url = f"{mod.base_url.rstrip('/')}{batch['url']}"
321
+
msg = (
322
+
f"{get_header(env)} {batch['flag_count']} need review:\n{full_url}"
323
+
)
324
+
await dm.send(msg)
325
+
console.print(f"[green]sent batch {batch['id']}[/green]")
326
+
else:
327
+
console.print(
328
+
f"[yellow]would create batch with {len(human_uris)} flags[/yellow]"
329
+
)
330
+
331
+
console.print("[bold]done[/bold]")
332
+
333
+
finally:
334
+
await mod.close()
335
+
await plyr.close()
336
+
337
+
338
+
def main() -> None:
339
+
parser = argparse.ArgumentParser()
340
+
parser.add_argument("--dry-run", action="store_true")
341
+
parser.add_argument("--limit", type=int, default=None)
342
+
parser.add_argument("--env", default="prod", choices=["dev", "staging", "prod"])
343
+
args = parser.parse_args()
344
+
asyncio.run(run_loop(dry_run=args.dry_run, limit=args.limit, env=args.env))
345
+
346
+
347
+
if __name__ == "__main__":
348
+
main()