# streaming uploads

**status**: implemented in PR #182
**date**: 2025-11-03

## overview

plyr.fm uses streaming uploads for audio files to maintain constant memory usage regardless of file size. this prevents out-of-memory errors when handling large files in constrained environments (fly.io shared-cpu VMs with 256MB RAM).

## problem (pre-implementation)

the original upload implementation loaded entire audio files into memory, creating OOM risk:

### original flow (memory intensive)
```python
# 1. read entire file into memory
content = file.read()  # 40MB WAV → 40MB in RAM

# 2. hash entire content in memory
file_id = hashlib.sha256(content).hexdigest()[:16]  # another 40MB

# 3. upload entire content
client.put_object(Body=content, ...)  # entire file in RAM
```

### memory profile
- single 40MB upload: ~80-120MB peak memory
- 3 concurrent uploads: ~240-360MB peak
- fly.io shared-cpu VM: 256MB total RAM
- **result**: OOM, worker restarts, service degradation

## solution: streaming approach (implemented)

### goals achieved
1. constant memory usage regardless of file size
2. maintained backward compatibility (same file_id generation)
3. supports both R2 and filesystem backends
4. no changes to upload endpoint API
5. proper test coverage added

### current flow (constant memory)
```python
# 1. compute hash in chunks (8MB at a time)
hasher = hashlib.sha256()
while chunk := file.read(8 * 1024 * 1024):
    hasher.update(chunk)
file_id = hasher.hexdigest()[:16]

# 2. stream upload to R2
file.seek(0)  # reset after hashing
client.upload_fileobj(Fileobj=file, Bucket=bucket, Key=key)
```

### memory profile (improved)
- single 40MB upload: ~10-16MB peak (just the chunk buffer)
- 3 concurrent uploads: ~30-48MB peak
- **result**: stable, no OOM risk

## implementation details

### 1. chunked hash utility

reusable utility for streaming hash calculation:

**location**: `src/backend/utilities/hashing.py`

```python
# actual implementation from src/backend/utilities/hashing.py
import hashlib
from typing import BinaryIO

# 8MB chunk size balances memory usage and performance
CHUNK_SIZE = 8 * 1024 * 1024


def hash_file_chunked(file_obj: BinaryIO, algorithm: str = "sha256") -> str:
    """compute hash by reading file in chunks.

    this prevents loading the entire file into memory, enabling constant
    memory usage regardless of file size.

    args:
        file_obj: file-like object to hash
        algorithm: hash algorithm (default: sha256)

    returns:
        hexadecimal digest string

    note:
        file pointer is reset to the beginning after hashing so subsequent
        operations (like upload) can read from the start
    """
    hasher = hashlib.new(algorithm)

    # ensure we start from the beginning
    file_obj.seek(0)

    # read and hash in chunks
    while chunk := file_obj.read(CHUNK_SIZE):
        hasher.update(chunk)

    # reset pointer for the next operation
    file_obj.seek(0)

    return hasher.hexdigest()
```
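as a quick illustration of the utility's contract (not part of the implementation itself), the snippet below hashes an in-memory file the way a caller would, truncates the digest to the 16-character file_id, and confirms the pointer is back at the start; the import path is an assumption based on the location above:

```python
# illustrative usage only — import path assumed from the src/backend layout above
import io

from backend.utilities.hashing import hash_file_chunked

# ~24MB of fake audio bytes held in memory; real callers pass FastAPI's UploadFile.file
fake_audio = io.BytesIO(b"\x00\x01\x02\x03" * (6 * 1024 * 1024))

digest = hash_file_chunked(fake_audio)  # reads 8MB at a time, never the full 24MB at once
file_id = digest[:16]                   # same truncation the storage backends use

assert fake_audio.tell() == 0           # pointer reset, ready for the streaming upload
print(file_id)
```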
### 2. R2 storage backend

**file**: `src/backend/storage/r2.py`

**implementation**:
- uses `hash_file_chunked()` for constant-memory hashing
- uses the `aioboto3` async client with `upload_fileobj()` for streaming uploads
- boto3's `upload_fileobj` automatically switches to multipart uploads above the transfer config's multipart threshold (8MB by default)
- supports both audio and image files

```python
# actual implementation (simplified)
async def save(self, file: BinaryIO, filename: str) -> str:
    """save media file to R2 using streaming upload.

    uses chunked hashing and aioboto3's upload_fileobj for constant
    memory usage regardless of file size.
    """
    # compute hash in chunks (constant memory)
    file_id = hash_file_chunked(file)[:16]

    # determine file extension and type
    ext = Path(filename).suffix.lower()

    # try audio format first
    audio_format = AudioFormat.from_extension(ext)
    if audio_format:
        key = f"audio/{file_id}{ext}"
        media_type = audio_format.media_type
        bucket = self.audio_bucket_name
    else:
        # handle image formats...
        pass

    # stream upload to R2 (constant memory, non-blocking)
    # file pointer already reset by hash_file_chunked
    async with self.async_session.client("s3", ...) as client:
        await client.upload_fileobj(
            Fileobj=file,
            Bucket=bucket,
            Key=key,
            ExtraArgs={"ContentType": media_type},
        )

    return file_id
```

### 3. filesystem storage backend

**file**: `src/backend/storage/filesystem.py`

**implementation**:
- uses `hash_file_chunked()` for constant-memory hashing
- uses `anyio` for async file I/O instead of blocking operations
- writes the file in chunks for constant memory usage
- supports both audio and image files

```python
# actual implementation (simplified)
async def save(self, file: BinaryIO, filename: str) -> str:
    """save media file using streaming write.

    uses chunked hashing and async file I/O for constant
    memory usage regardless of file size.
    """
    # compute hash in chunks (constant memory)
    file_id = hash_file_chunked(file)[:16]

    # determine file extension and type
    ext = Path(filename).suffix.lower()

    # try audio format first
    audio_format = AudioFormat.from_extension(ext)
    if audio_format:
        file_path = self.base_path / "audio" / f"{file_id}{ext}"
    else:
        # handle image formats...
        pass

    # write file using async I/O in chunks (constant memory, non-blocking)
    # file pointer already reset by hash_file_chunked
    async with await anyio.open_file(file_path, "wb") as dest:
        while True:
            chunk = file.read(CHUNK_SIZE)
            if not chunk:
                break
            await dest.write(chunk)

    return file_id
```

### 4. upload endpoint

**file**: `src/backend/api/tracks.py`

**implementation**: no changes required!

FastAPI's `UploadFile` already uses `SpooledTemporaryFile`:
- keeps small files (<1MB) in memory
- automatically spools larger files to disk
- provides the file-like interface that the streaming functions expect
- works seamlessly with both storage backends (a hypothetical sketch of this wiring follows)
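for orientation, here is a minimal, hypothetical sketch of how an endpoint like this hands the spooled file to a storage backend; the route body, the `FilesystemStorage` constructor, and the response shape are illustrative assumptions, not the actual contents of `src/backend/api/tracks.py`:

```python
# illustrative sketch — not the actual src/backend/api/tracks.py
from fastapi import APIRouter, File, Form, UploadFile

from backend.storage.filesystem import FilesystemStorage  # import path and constructor assumed

router = APIRouter()
storage = FilesystemStorage(base_path="/data/media")  # in production the backend is chosen by config


@router.post("/tracks/")
async def upload_track(
    title: str = Form(...),
    file: UploadFile = File(...),
) -> dict[str, str]:
    # UploadFile wraps a SpooledTemporaryFile: small uploads stay in memory, larger
    # ones spool to disk, and .file exposes the BinaryIO interface that
    # hash_file_chunked() and the backends' save() methods expect — nothing here
    # ever buffers the whole upload.
    file_id = await storage.save(file.file, file.filename or "upload.bin")
    return {"file_id": file_id, "title": title}
```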
## testing

### 1. unit tests for hash utility

**file**: `tests/utilities/test_hashing.py`

```python
import hashlib
import io

from backend.utilities.hashing import hash_file_chunked  # import path assumed from the src/ layout


def test_hash_file_chunked_correctness():
    """verify chunked hashing matches the standard one-shot approach."""
    # create test data
    test_data = b"test data" * 1000000  # ~9MB, spans more than one 8MB chunk

    # standard hash
    expected = hashlib.sha256(test_data).hexdigest()

    # chunked hash
    file_obj = io.BytesIO(test_data)
    actual = hash_file_chunked(file_obj)

    assert actual == expected


def test_hash_file_chunked_resets_pointer():
    """verify file pointer is reset after hashing."""
    file_obj = io.BytesIO(b"test data")
    hash_file_chunked(file_obj)
    assert file_obj.tell() == 0  # pointer at start
```

### 2. integration tests for uploads

**file**: `tests/api/test_tracks.py`

```python
# `client`, `create_test_audio_file`, and `upload_file` are test fixtures/helpers
async def test_upload_large_file_r2():
    """verify large file upload doesn't OOM."""
    # create 50MB test file
    large_file = create_test_audio_file(size_mb=50)

    # upload should succeed with constant memory
    response = await client.post(
        "/tracks/",
        files={"file": large_file},
        data={"title": "large track test"},
    )
    assert response.status_code == 200


async def test_concurrent_uploads():
    """verify multiple concurrent uploads don't OOM."""
    files = [create_test_audio_file(size_mb=30) for _ in range(3)]

    # all should succeed
    results = await asyncio.gather(
        *[upload_file(f) for f in files]
    )
    assert all(r.status_code == 200 for r in results)
```

### 3. memory profiling

manual testing with memory monitoring:

```bash
# monitor memory during upload
watch -n 1 'ps aux | grep uvicorn'

# upload a large file
curl -F "file=@test-50mb.wav" -F "title=test" http://localhost:8000/tracks/
```

expected results:
- memory stays under 50MB regardless of file size
- no memory spikes or gradual leaks
- consistent performance across multiple uploads

## deployment

implemented in PR #182 and deployed to production.

### validation results
- memory usage stays constant (~10-16MB per upload)
- file_id generation remains consistent (backward compatible)
- supports concurrent uploads without OOM
- both R2 and filesystem backends working correctly

## backward compatibility

successfully maintained during implementation:

### file_id generation
- hash algorithm: SHA256 (unchanged)
- truncation: 16 chars (unchanged)
- result: existing file_ids remain valid

### API contract
- endpoint: `POST /tracks/` (unchanged)
- parameters: title, file, album, features, image (unchanged)
- response: same structure (unchanged)
- result: no breaking changes for clients

## edge cases

### very large files (>100MB)
- boto3 automatically handles multipart upload
- filesystem streaming works for any size
- limited only by storage capacity, not RAM

### network failures during upload
- boto3 multipart upload can retry failed parts
- filesystem writes are atomic per chunk
- FastAPI handles connection errors

### concurrent uploads
- each upload uses an independent chunk buffer
- total memory = num_concurrent * CHUNK_SIZE
- 5 concurrent @ 8MB chunks = 40MB total, well within the 256MB limit (see the sketch below)
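the chunk-buffer arithmetic above can be sanity-checked with a few lines of python (only the per-chunk buffers are counted; baseline process memory is deliberately ignored here):

```python
# back-of-envelope check of the concurrent-upload memory math above
CHUNK_SIZE_MB = 8    # matches CHUNK_SIZE in src/backend/utilities/hashing.py
VM_RAM_MB = 256      # fly.io shared-cpu VM

for concurrent in (1, 3, 5, 10):
    chunk_buffers_mb = concurrent * CHUNK_SIZE_MB  # one chunk resident per in-flight upload
    print(
        f"{concurrent:2d} uploads ≈ {chunk_buffers_mb:3d}MB of chunk buffers "
        f"({chunk_buffers_mb / VM_RAM_MB:.0%} of the {VM_RAM_MB}MB VM)"
    )
```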
## observability

metrics tracked in Logfire:

1. upload duration - no regression from the streaming change
2. memory usage - stays under 50MB per upload
3. upload success rate - consistently >99%
4. concurrent upload handling - no degradation

## future optimizations

### potential improvements (not in scope for this PR)

1. **progressive hashing during upload**
   - hash chunks as they arrive instead of in a separate pass
   - saves one file iteration (a rough sketch appears in the appendix below)

2. **client-side chunked uploads**
   - browser sends the file in chunks
   - server assembles and validates
   - enables upload progress tracking

3. **parallel multipart upload**
   - split large files into parts
   - upload parts in parallel
   - faster for very large files (>100MB)

4. **deduplication before full upload**
   - send the hash first to check if the file already exists
   - skip the upload if a duplicate is found
   - saves bandwidth and storage

## references

- implementation: `src/backend/storage/r2.py`, `src/backend/storage/filesystem.py`
- utilities: `src/backend/utilities/hashing.py`
- tests: `tests/utilities/test_hashing.py`, `tests/api/test_tracks.py`
- PR: #182
- boto3 upload_fileobj: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/upload_fileobj.html
- FastAPI UploadFile: https://fastapi.tiangolo.com/tutorial/request-files/
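## appendix: progressive hashing sketch (future optimization 1)

to make the first future optimization concrete, here is a rough, hypothetical sketch of hashing chunks in the same pass that writes them out, so the file is only read once. the names and the destination interface are illustrative, not part of the current implementation; note that for R2 the object key depends on the final hash, so the object would still need to be staged under a temporary key and renamed or copied once hashing completes.

```python
# hypothetical sketch of future optimization 1 — not part of the current implementation
import hashlib
from typing import BinaryIO, Protocol


class AsyncWriter(Protocol):
    """anything with an async write(), e.g. a file opened via anyio.open_file."""

    async def write(self, data: bytes) -> object: ...


CHUNK_SIZE = 8 * 1024 * 1024  # same 8MB chunk size as hash_file_chunked


async def hash_while_writing(src: BinaryIO, dest: AsyncWriter) -> str:
    """read the source once: update the hash and write each chunk in a single pass."""
    hasher = hashlib.sha256()
    src.seek(0)
    while chunk := src.read(CHUNK_SIZE):
        hasher.update(chunk)     # progressive hash
        await dest.write(chunk)  # stream out without a second read pass
    return hasher.hexdigest()
```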