music on atproto
plyr.fm
1use std::{
2 env,
3 net::SocketAddr,
4 path::{Path, PathBuf},
5};
6
7use anyhow::anyhow;
8use axum::{
9 body::Body,
10 extract::{DefaultBodyLimit, Multipart, Query, Request},
11 http::{header, HeaderValue, StatusCode},
12 middleware::{self, Next},
13 response::{IntoResponse, Response},
14 routing::{get, post},
15 Json, Router,
16};
17use sanitize_filename::sanitize;
18use serde::Deserialize;
19use tempfile::TempDir;
20use tokio::{fs::File, io::AsyncWriteExt, net::TcpListener, process::Command};
21use tracing::{error, info, warn};
22
23#[derive(Debug, Deserialize, Default)]
24struct TranscodeParams {
25 target: Option<String>,
26}
27
28#[derive(Debug, serde::Serialize)]
29struct HealthResponse {
30 status: &'static str,
31}
32
33#[tokio::main]
34async fn main() -> anyhow::Result<()> {
35 tracing_subscriber::fmt()
36 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
37 .with_target(false)
38 .init();
39
40 let max_upload_bytes: usize = env::var("TRANSCODER_MAX_UPLOAD_BYTES")
41 .ok()
42 .and_then(|v| v.parse().ok())
43 .unwrap_or(512 * 1024 * 1024); // 512MB default
44
45 let auth_token = env::var("TRANSCODER_AUTH_TOKEN").ok();
46
47 let app = Router::new()
48 .route("/health", get(health))
49 .route("/transcode", post(transcode))
50 .layer(middleware::from_fn(move |req, next| {
51 auth_middleware(req, next, auth_token.clone())
52 }))
53 .layer(DefaultBodyLimit::max(max_upload_bytes));
54
55 let port: u16 = env::var("TRANSCODER_PORT")
56 .ok()
57 .and_then(|v| v.parse().ok())
58 .unwrap_or(8082);
59 let host = env::var("TRANSCODER_HOST").unwrap_or_else(|_| "127.0.0.1".to_string());
60 let addr: SocketAddr = format!("{}:{}", host, port)
61 .parse()
62 .map_err(|e| anyhow!("invalid bind addr: {e}"))?;
63 info!(%addr, max_upload_bytes, "transcoder listening");
64
65 let listener = TcpListener::bind(addr).await?;
66 axum::serve(listener, app).await?;
67 Ok(())
68}
69
70async fn auth_middleware(
71 req: Request,
72 next: Next,
73 auth_token: Option<String>,
74) -> Result<Response, StatusCode> {
75 // skip auth for health endpoint
76 if req.uri().path() == "/health" {
77 return Ok(next.run(req).await);
78 }
79
80 // if no auth token configured, allow all requests (local dev mode)
81 let Some(expected_token) = auth_token else {
82 warn!("no TRANSCODER_AUTH_TOKEN set - accepting all requests");
83 return Ok(next.run(req).await);
84 };
85
86 // check for X-Transcoder-Key header
87 let token = req
88 .headers()
89 .get("X-Transcoder-Key")
90 .and_then(|v| v.to_str().ok());
91
92 match token {
93 Some(t) if t == expected_token => Ok(next.run(req).await),
94 Some(_) => {
95 warn!("invalid auth token provided");
96 Err(StatusCode::UNAUTHORIZED)
97 }
98 None => {
99 warn!("missing X-Transcoder-Key header");
100 Err(StatusCode::UNAUTHORIZED)
101 }
102 }
103}
104
105async fn health() -> Json<HealthResponse> {
106 Json(HealthResponse { status: "ok" })
107}
108
109async fn transcode(
110 Query(params): Query<TranscodeParams>,
111 mut multipart: Multipart,
112) -> Result<Response, AppError> {
113 let target_ext = params.target.unwrap_or_else(|| "mp3".to_string());
114
115 let temp_dir =
116 tempfile::tempdir().map_err(|e| AppError::Io(format!("failed to create temp dir: {e}")))?;
117 let (input_path, original_name) = write_upload_to_disk(&mut multipart, &temp_dir).await?;
118
119 let output_path = temp_dir.path().join(format!("output.{}", target_ext));
120 run_ffmpeg(&input_path, &output_path, &target_ext).await?;
121
122 let bytes = tokio::fs::read(&output_path)
123 .await
124 .map_err(|e| AppError::Io(format!("failed to read output file: {e}")))?;
125
126 let media_type = match target_ext.as_str() {
127 "mp3" => "audio/mpeg",
128 "wav" => "audio/wav",
129 "m4a" => "audio/mp4",
130 other => {
131 info!(
132 target = other,
133 "unknown target format, defaulting to octet-stream"
134 );
135 "application/octet-stream"
136 }
137 };
138
139 let download_name = format!("{}.{}", original_name, target_ext);
140 let response = Response::builder()
141 .status(StatusCode::OK)
142 .header(header::CONTENT_TYPE, HeaderValue::from_static(media_type))
143 .header(
144 header::CONTENT_DISPOSITION,
145 HeaderValue::from_str(&format!("attachment; filename=\"{}\"", download_name))
146 .unwrap_or_else(|_| HeaderValue::from_static("attachment")),
147 )
148 .body(Body::from(bytes))
149 .map_err(|e| AppError::Http(e.to_string()))?;
150
151 Ok(response)
152}
153
154async fn write_upload_to_disk(
155 multipart: &mut Multipart,
156 temp_dir: &TempDir,
157) -> Result<(PathBuf, String), AppError> {
158 let mut file_path: Option<PathBuf> = None;
159 let mut original_name: Option<String> = None;
160
161 while let Some(mut field) = multipart
162 .next_field()
163 .await
164 .map_err(|e| AppError::BadRequest(format!("invalid multipart data: {e}")))?
165 {
166 if field.name() != Some("file") {
167 continue;
168 }
169
170 let filename = field
171 .file_name()
172 .map(|s| s.to_string())
173 .unwrap_or_else(|| "upload".to_string());
174 let sanitized_name = sanitize(&filename);
175 let ext = std::path::Path::new(&sanitized_name)
176 .extension()
177 .and_then(|s| s.to_str())
178 .unwrap_or("bin");
179 let path = temp_dir.path().join(format!("input.{}", ext));
180 let mut file = File::create(&path)
181 .await
182 .map_err(|e| AppError::Io(format!("failed to create temp file: {e}")))?;
183
184 while let Some(chunk) = field
185 .chunk()
186 .await
187 .map_err(|e| AppError::BadRequest(format!("failed to read upload chunk: {e}")))?
188 {
189 file.write_all(&chunk)
190 .await
191 .map_err(|e| AppError::Io(format!("failed to write chunk: {e}")))?;
192 }
193 file.flush()
194 .await
195 .map_err(|e| AppError::Io(format!("failed to flush file: {e}")))?;
196
197 file_path = Some(path);
198 original_name = Some(
199 std::path::Path::new(&sanitized_name)
200 .file_stem()
201 .and_then(|s| s.to_str())
202 .unwrap_or("track")
203 .to_string(),
204 );
205 break;
206 }
207
208 match (file_path, original_name) {
209 (Some(path), Some(name)) => Ok((path, name)),
210 _ => Err(AppError::BadRequest(
211 "multipart form must include a 'file' field".into(),
212 )),
213 }
214}
215
216async fn run_ffmpeg(input: &Path, output: &Path, target_ext: &str) -> Result<(), AppError> {
217 let mut cmd = Command::new("ffmpeg");
218 cmd.arg("-y").arg("-i").arg(input);
219
220 match target_ext {
221 "mp3" => {
222 cmd.args(["-acodec", "libmp3lame", "-b:a", "320k", "-ar", "44100"]);
223 }
224 "wav" => {
225 cmd.args(["-acodec", "pcm_s16le", "-ar", "44100"]);
226 }
227 "m4a" => {
228 cmd.args(["-acodec", "aac", "-b:a", "256k", "-ar", "44100"]);
229 }
230 other => {
231 return Err(AppError::BadRequest(format!(
232 "unsupported target format: {}",
233 other
234 )));
235 }
236 }
237
238 cmd.arg(output);
239
240 let output_res = cmd
241 .output()
242 .await
243 .map_err(|e| AppError::Ffmpeg(format!("failed to spawn ffmpeg: {e}")))?;
244
245 if !output_res.status.success() {
246 let stderr = String::from_utf8_lossy(&output_res.stderr).to_string();
247 error!(%stderr, "ffmpeg failed");
248 return Err(AppError::Ffmpeg(stderr));
249 }
250
251 Ok(())
252}
253
254#[derive(Debug, thiserror::Error)]
255enum AppError {
256 #[error("bad request: {0}")]
257 BadRequest(String),
258 #[error("io error: {0}")]
259 Io(String),
260 #[error("http error: {0}")]
261 Http(String),
262 #[error("ffmpeg error: {0}")]
263 Ffmpeg(String),
264}
265
266impl IntoResponse for AppError {
267 fn into_response(self) -> Response {
268 tracing::error!(error = %self, "request failed");
269 let status = match self {
270 AppError::BadRequest(_) => StatusCode::BAD_REQUEST,
271 AppError::Io(_) | AppError::Http(_) | AppError::Ffmpeg(_) => {
272 StatusCode::INTERNAL_SERVER_ERROR
273 }
274 };
275 let body = serde_json::json!({
276 "error": self.to_string(),
277 });
278 (status, Json(body)).into_response()
279 }
280}