Part of the Aesthetic Computer monorepo (aesthetic.computer).
// Baker - Core tape processing logic
// Adapted from system/backend/tape-to-mp4.mjs

import { spawn } from 'child_process';
import { promises as fs } from 'fs';
import { tmpdir } from 'os';
import https from 'https';
import http from 'http';
import { join } from 'path';
import { randomBytes } from 'crypto';
import AdmZip from 'adm-zip';
import { S3Client, PutObjectCommand, GetObjectCommand } from '@aws-sdk/client-s3';
import { MongoClient } from 'mongodb';

// ---------------------------------------------------------------------------
// MongoDB connection (lazy singleton)
// ---------------------------------------------------------------------------
let mongoClient;
let db;

/**
 * Connect to MongoDB once and cache the handle.
 *
 * @returns {Promise<import('mongodb').Db|null>} The database handle, or null
 *   when MONGODB_CONNECTION_STRING / MONGODB_NAME are unset or the connection
 *   fails. The service degrades gracefully: bake history simply won't persist.
 */
async function connectMongo() {
  if (!mongoClient) {
    const mongoUri = process.env.MONGODB_CONNECTION_STRING;
    const dbName = process.env.MONGODB_NAME;

    if (!mongoUri || !dbName) {
      console.warn('⚠️ MongoDB not configured, bake history will not persist');
      return null;
    }

    try {
      mongoClient = await MongoClient.connect(mongoUri);
      db = mongoClient.db(dbName);
      console.log('✅ Connected to MongoDB for bake history');
    } catch (error) {
      console.error('❌ Failed to connect to MongoDB:', error.message);
      return null;
    }
  }
  return db;
}

// Initialize MongoDB on startup and set up change stream watcher.
// (Runs after module evaluation completes, so the state maps below exist.)
connectMongo().then((database) => {
  if (database) {
    watchForNewTapes();
  }
});

/**
 * Watch MongoDB for new tape inserts to show "incoming" bakes.
 * Reconnects itself 5s after a change-stream error.
 */
async function watchForNewTapes() {
  try {
    const collection = db.collection('tapes');
    const changeStream = collection.watch([
      { $match: { operationType: 'insert' } }
    ]);

    console.log('👀 Watching MongoDB for new tapes...');

    changeStream.on('change', (change) => {
      const tape = change.fullDocument;

      // Only track tapes that don't have MP4s yet (need processing)
      if (!tape.mp4Url && !tape.mp4Status && tape.code) {
        console.log(`📥 Incoming tape detected: ${tape.code}`);

        incomingBakes.set(tape.code, {
          code: tape.code,
          slug: tape.slug,
          mongoId: tape._id.toString(),
          detectedAt: Date.now(),
          status: 'incoming',
          details: 'Waiting for processing to start'
        });

        notifySubscribers();

        // Auto-remove from incoming after 60 seconds if not picked up
        setTimeout(() => {
          if (incomingBakes.has(tape.code) && !activeBakes.has(tape.code)) {
            console.log(`⏱️ Removing stale incoming bake: ${tape.code}`);
            incomingBakes.delete(tape.code);
            notifySubscribers();
          }
        }, 60000);
      }
    });

    changeStream.on('error', (error) => {
      console.error('❌ Change stream error:', error);
      // Attempt to reconnect after 5 seconds
      setTimeout(watchForNewTapes, 5000);
    });

  } catch (error) {
    console.error('❌ Failed to set up change stream:', error);
  }
}

// Initialize ffmpeg and ffprobe paths (resolved from PATH)
let ffmpegPath = 'ffmpeg';
let ffprobePath = 'ffprobe';

// ---------------------------------------------------------------------------
// S3 / DigitalOcean Spaces clients
// ---------------------------------------------------------------------------
const artSpacesClient = new S3Client({
  endpoint: process.env.ART_SPACES_ENDPOINT || 'https://sfo3.digitaloceanspaces.com',
  region: 'us-east-1', // Required but ignored by DigitalOcean
  credentials: {
    accessKeyId: process.env.ART_SPACES_KEY,
    secretAccessKey: process.env.ART_SPACES_SECRET,
  },
});

const atBlobsSpacesClient = new S3Client({
  endpoint: process.env.AT_BLOBS_SPACES_ENDPOINT || 'https://sfo3.digitaloceanspaces.com',
  region: 'us-east-1',
  credentials: {
    accessKeyId: process.env.AT_BLOBS_SPACES_KEY,
    secretAccessKey: process.env.AT_BLOBS_SPACES_SECRET,
  },
});

const ART_BUCKET = process.env.ART_SPACES_BUCKET || 'art-aesthetic-computer';
const AT_BLOBS_BUCKET = process.env.AT_BLOBS_SPACES_BUCKET || 'at-blobs-aesthetic-computer';
const AT_BLOBS_CDN = process.env.AT_BLOBS_CDN || null; // Optional custom CDN domain
const CALLBACK_SECRET = process.env.CALLBACK_SECRET;

// ---------------------------------------------------------------------------
// In-memory status tracking
// ---------------------------------------------------------------------------
const MAX_RECENT_BAKES = 20;
const recentBakes = []; // Store last 20 completed bakes
const activeBakes = new Map(); // Currently processing bakes
const incomingBakes = new Map(); // Tapes waiting to be processed (from MongoDB watch)

// WebSocket subscribers (defined here, used throughout)
const subscribers = new Set();

/**
 * Populate `recentBakes` from the `oven-bakes` collection, enriching each
 * entry with the owner's @handle (for ATProto links) when resolvable.
 */
async function loadRecentBakes() {
  const database = await connectMongo();
  if (!database) return;

  try {
    const ovenBakes = database.collection('oven-bakes');
    const tapes = database.collection('tapes');
    const handles = database.collection('@handles');

    const bakes = await ovenBakes
      .find({})
      .sort({ completedAt: -1 })
      .limit(MAX_RECENT_BAKES)
      .toArray();

    // Enrich each bake with user handle for ATProto links
    for (const bake of bakes) {
      // Get tape to find user ID
      const tape = await tapes.findOne({ code: bake.code });
      if (tape && tape.user) {
        // Get handle for user
        const handleDoc = await handles.findOne({ _id: tape.user });
        if (handleDoc) {
          bake.userHandle = handleDoc.handle;
        }
      }
    }

    recentBakes.length = 0; // Clear existing
    recentBakes.push(...bakes);
    console.log(`📚 Loaded ${bakes.length} recent bakes from MongoDB`);
  } catch (error) {
    console.error('❌ Failed to load recent bakes:', error.message);
  }
}

/**
 * Clean up stale active bakes by checking against completed bakes in MongoDB.
 * A bake that appears in `oven-bakes` has finished even if this process never
 * received the completion webhook (e.g. after a restart).
 */
export async function cleanupStaleBakes() {
  const database = await connectMongo();
  if (!database) return;

  try {
    const collection = database.collection('oven-bakes');

    // Check each active bake to see if it's actually completed
    for (const [code, bake] of activeBakes.entries()) {
      const completed = await collection.findOne({ code: code });
      if (completed) {
        console.log(`🧹 Removing stale active bake: ${code} (found in completed)`);
        activeBakes.delete(code);

        // Add to recent if not already there
        if (!recentBakes.find(b => b.code === code)) {
          // Guard: completedAt may be absent on old records — fall back to
          // "now" so duration never becomes NaN.
          const completedAt = completed.completedAt?.getTime() ?? Date.now();
          recentBakes.unshift({
            ...bake,
            ...completed,
            success: completed.success,
            completedAt,
            duration: completedAt - bake.startTime
          });
          if (recentBakes.length > MAX_RECENT_BAKES) recentBakes.pop();
        }
      }
    }
  } catch (error) {
    console.error('❌ Failed to cleanup stale bakes:', error.message);
  }
}

/**
 * Load pending (unprocessed) tapes on startup to restore the queue after a
 * server restart. Only considers tapes created within the last hour.
 */
async function loadPendingTapes() {
  const database = await connectMongo();
  if (!database) return;

  try {
    const collection = database.collection('tapes');

    // Find tapes that need processing:
    // - Have no mp4Url (not yet processed)
    // - Were created in the last hour (not ancient)
    const oneHourAgo = new Date(Date.now() - 60 * 60 * 1000);

    const pendingTapes = await collection.find({
      mp4Url: { $exists: false },
      createdAt: { $gte: oneHourAgo }
    }).sort({ createdAt: 1 }).limit(10).toArray();

    if (pendingTapes.length > 0) {
      console.log(`📥 Restoring ${pendingTapes.length} pending tapes to queue...`);
      for (const tape of pendingTapes) {
        if (tape.code && !incomingBakes.has(tape.code)) {
          incomingBakes.set(tape.code, {
            code: tape.code,
            slug: tape.slug,
            mongoId: tape._id.toString(),
            detectedAt: new Date(tape.createdAt).getTime(),
            status: 'incoming',
            details: 'Restored after server restart'
          });
        }
      }
    }
  } catch (error) {
    console.error('❌ Failed to load pending tapes:', error.message);
  }
}

// Load recent bakes and pending queue on startup
loadRecentBakes();
loadPendingTapes();

/** Prepend a completed bake to the in-memory ring (capped at 20 entries). */
function addRecentBake(bake) {
  recentBakes.unshift(bake);
  if (recentBakes.length > MAX_RECENT_BAKES) recentBakes.pop();

  // Note: MongoDB persistence is handled by oven-complete webhook
}

/** Move a tape from the incoming queue into the active-bake map. */
function startBake(code, data) {
  // Remove from incoming if present
  if (incomingBakes.has(code)) {
    console.log(`📤 Moving ${code} from incoming to active`);
    incomingBakes.delete(code);
  }

  activeBakes.set(code, {
    code,
    startTime: Date.now(),
    status: 'downloading',
    ...data
  });
}

/**
 * Update the status of an active bake.
 *
 * @param {string} code - Tape code keying the active bake.
 * @param {string} status - New status label.
 * @param {object|string} [details] - Extra fields to merge onto the bake.
 *   A non-object (e.g. the free-text string sent by bakeStatusHandler) is
 *   stored under `details` instead of being Object.assign'ed, which would
 *   otherwise spread a string's characters onto the bake record.
 */
function updateBakeStatus(code, status, details = {}) {
  const bake = activeBakes.get(code);
  if (bake) {
    bake.status = status;
    bake.lastUpdate = Date.now();
    if (details && typeof details === 'object') {
      Object.assign(bake, details);
    } else if (details != null) {
      bake.details = details;
    }
  }
}

/** Move a bake from active to recent, recording outcome and duration. */
function completeBake(code, success, result = {}) {
  const bake = activeBakes.get(code);
  if (bake) {
    activeBakes.delete(code);
    addRecentBake({
      ...bake,
      success,
      completedAt: Date.now(),
      duration: Date.now() - bake.startTime,
      ...result
    });
  }
}

/**
 * Health check handler.
 */
export async function healthHandler(req, res) {
  res.json({
    status: 'ok',
    service: 'oven',
    timestamp: new Date().toISOString()
  });
}

/**
 * Main bake handler. Validates the request, acknowledges immediately, then
 * processes the tape in the background.
 */
export async function bakeHandler(req, res) {
  const { mongoId, slug, code, zipUrl, callbackUrl, callbackSecret, metadata } = req.body;

  console.log(`📥 Bake request received:`, {
    mongoId,
    slug,
    code,
    zipUrl,
    callbackUrl,
    hasSecret: !!callbackSecret,
    secretPreview: callbackSecret ? callbackSecret.substring(0, 10) + '...' : 'none',
    expectedSecretPreview: CALLBACK_SECRET ? CALLBACK_SECRET.substring(0, 10) + '...' : 'none'
  });

  // Validate request
  if (!mongoId || !slug || !code || !zipUrl || !callbackUrl) {
    console.error('❌ Missing required fields');
    return res.status(400).json({
      error: 'Missing required fields: mongoId, slug, code, zipUrl, callbackUrl'
    });
  }

  if (callbackSecret !== CALLBACK_SECRET) {
    // Never log the full secrets here — the previews above are enough to
    // debug a mismatch without leaking CALLBACK_SECRET into log storage.
    console.error('❌ Invalid callback secret');
    return res.status(401).json({ error: 'Invalid callback secret' });
  }

  console.log(`🔥 Starting bake for tape: ${slug} (${code})`);

  // Start tracking (keyed by code)
  startBake(code, { mongoId, slug, code, zipUrl, callbackUrl });
  notifySubscribers();

  // Respond immediately - processing happens in background
  res.json({
    status: 'accepted',
    slug,
    code,
    message: 'Baking started'
  });

  // Process asynchronously
  processTape({ mongoId, slug, code, zipUrl, callbackUrl, metadata })
    .catch(err => {
      console.error(`❌ Bake failed for ${slug}:`, err);
    });
}

/**
 * Process tape: download, convert, upload, callback.
 * Errors are reported to the callback URL; cleanup always runs.
 */
async function processTape({ mongoId, slug, code, zipUrl, callbackUrl, metadata }) {
  const workDir = join(tmpdir(), `tape-${code}-${Date.now()}`);

  try {
    updateBakeStatus(code, 'downloading');
    notifySubscribers();
    console.log(`📥 Downloading ZIP for ${code} (${slug})...`);
    const zipBuffer = await downloadZip(zipUrl);

    updateBakeStatus(code, 'extracting');
    notifySubscribers();
    console.log(`📦 Extracting to ${workDir}...`);
    await fs.mkdir(workDir, { recursive: true });
    await extractZip(zipBuffer, workDir);

    updateBakeStatus(code, 'processing');
    notifySubscribers();
    console.log(`📊 Reading timing data...`);
    const timing = await readTiming(workDir);

    console.log(`📸 Generating thumbnail...`);
    const thumbnailBuffer = await generateThumbnail(workDir);

    console.log(`🎬 Converting to MP4...`);
    const mp4Buffer = await framesToMp4(workDir, timing);

    updateBakeStatus(code, 'uploading');
    notifySubscribers();
    console.log(`☁️ Uploading to Spaces...`);
    const mp4Url = await uploadToSpaces(mp4Buffer, `tapes/${code}.mp4`);
    // Thumbnail generation is best-effort (returns null on failure) — don't
    // let a missing thumbnail fail an otherwise-successful bake.
    const thumbnailUrl = thumbnailBuffer
      ? await uploadToSpaces(thumbnailBuffer, `tapes/${code}-thumb.jpg`, 'image/jpeg')
      : null;

    console.log(`📞 Calling back to ${callbackUrl}...`);
    await postCallback({ mongoId, slug, code, mp4Url, thumbnailUrl, callbackUrl });

    console.log(`🧹 Cleaning up ${workDir}...`);
    await fs.rm(workDir, { recursive: true, force: true });

    console.log(`✅ Bake complete for ${code} (${slug})`);
    // Completion will be handled by webhook notification

  } catch (error) {
    console.error(`❌ Error processing ${code} (${slug}):`, error);

    // Notify callback of error
    try {
      await postCallback({
        mongoId,
        slug,
        code,
        callbackUrl,
        error: error.message
      });
    } catch (callbackError) {
      console.warn(`⚠️ Callback notification failed:`, callbackError.message);
    }

    // Error completion will be handled by webhook notification

    throw error;
  } finally {
    // Ensure cleanup (idempotent thanks to force: true)
    try {
      await fs.rm(workDir, { recursive: true, force: true });
    } catch (cleanupError) {
      console.warn(`⚠️ Cleanup failed for ${workDir}:`, cleanupError.message);
    }
  }
}

/**
 * Download ZIP from URL (supports both http and https with self-signed certs).
 *
 * @param {string} zipUrl - Absolute URL of the ZIP archive.
 * @param {number} [redirectsLeft=5] - Remaining redirects to follow; prevents
 *   an infinite loop on a redirect cycle.
 * @returns {Promise<Buffer>} The downloaded archive.
 */
async function downloadZip(zipUrl, redirectsLeft = 5) {
  return new Promise((resolve, reject) => {
    const url = new URL(zipUrl);
    const isHttps = url.protocol === 'https:';
    const client = isHttps ? https : http;

    const options = {
      hostname: url.hostname,
      port: url.port || (isHttps ? 443 : 80),
      path: url.pathname + url.search,
      method: 'GET',
      rejectUnauthorized: false, // Accept self-signed certs in dev
    };

    // Set timeout for both connection and idle
    const timeoutMs = 120000; // 2 minutes
    let timeoutId;

    const req = client.request(options, (res) => {
      // Clear connection timeout, set idle timeout
      clearTimeout(timeoutId);
      timeoutId = setTimeout(() => {
        req.destroy();
        reject(new Error('Download timeout: no data received for 2 minutes'));
      }, timeoutMs);

      // Follow redirects (bounded, with relative Location support)
      if (res.statusCode >= 300 && res.statusCode < 400 && res.headers.location) {
        clearTimeout(timeoutId);
        res.resume(); // discard body, free the socket
        if (redirectsLeft <= 0) {
          reject(new Error(`Too many redirects downloading ZIP: ${zipUrl}`));
          return;
        }
        // Resolve relative redirects against the current URL
        const nextUrl = new URL(res.headers.location, zipUrl).toString();
        console.log(`  Following redirect to: ${res.headers.location}`);
        downloadZip(nextUrl, redirectsLeft - 1).then(resolve).catch(reject);
        return;
      }

      if (res.statusCode !== 200) {
        clearTimeout(timeoutId);
        res.resume();
        reject(new Error(`Failed to download ZIP: ${res.statusCode} ${res.statusMessage}`));
        return;
      }

      const chunks = [];
      res.on('data', (chunk) => {
        chunks.push(chunk);
        // Reset timeout on each data chunk
        clearTimeout(timeoutId);
        timeoutId = setTimeout(() => {
          req.destroy();
          reject(new Error('Download timeout: no data received for 2 minutes'));
        }, timeoutMs);
      });
      res.on('end', () => {
        clearTimeout(timeoutId);
        resolve(Buffer.concat(chunks));
      });
    });

    // Initial connection timeout
    timeoutId = setTimeout(() => {
      req.destroy();
      reject(new Error('Connection timeout: failed to connect within 2 minutes'));
    }, timeoutMs);

    req.on('error', (err) => {
      clearTimeout(timeoutId);
      reject(err);
    });
    req.end();
  });
}

/**
 * Extract ZIP to directory.
 */
async function extractZip(zipBuffer, workDir) {
  const zip = new AdmZip(zipBuffer);
  zip.extractAllTo(workDir, true);
}

/**
 * Read timing.json from the work directory.
 *
 * @returns {Promise<Array|null>} Per-frame timing entries, or null when the
 *   file is missing/empty (callers fall back to a default frame rate).
 */
async function readTiming(workDir) {
  const timingPath = join(workDir, 'timing.json');
  try {
    const timingData = await fs.readFile(timingPath, 'utf-8');
    const timing = JSON.parse(timingData);
    if (Array.isArray(timing) && timing.length > 0) {
      console.log(`  Found timing data for ${timing.length} frames`);
      return timing;
    }
    return null;
  } catch (error) {
    console.log(`  No timing.json found, using default frame rate`);
    return null;
  }
}

/**
 * Generate thumbnail from midpoint frame.
 * Best-effort: returns null on any failure so a bad thumbnail never fails
 * the whole bake.
 *
 * @returns {Promise<Buffer|null>} JPEG thumbnail buffer, or null.
 */
async function generateThumbnail(workDir) {
  try {
    const files = await fs.readdir(workDir);
    const frameFiles = files.filter(f => f.startsWith('frame-') && f.endsWith('.png')).sort();

    if (frameFiles.length === 0) {
      throw new Error('No frames found for thumbnail');
    }

    const midIndex = Math.floor(frameFiles.length / 2);
    const midFrame = frameFiles[midIndex];
    const framePath = join(workDir, midFrame);

    console.log(`  Using frame ${midIndex + 1}/${frameFiles.length}: ${midFrame}`);

    const sharp = (await import('sharp')).default;

    const image = sharp(framePath);
    const metadata = await image.metadata();

    // Scale 3x with nearest neighbor (keeps pixel-art frames crisp)
    const scaled3x = await image
      .resize(metadata.width * 3, metadata.height * 3, {
        kernel: 'nearest'
      })
      .toBuffer();

    // Fit to 512x512
    const thumbnail = await sharp(scaled3x)
      .resize(512, 512, {
        fit: 'contain',
        background: { r: 0, g: 0, b: 0, alpha: 0 }
      })
      .jpeg({ quality: 90 })
      .toBuffer();

    const sizeKB = (thumbnail.length / 1024).toFixed(2);
    console.log(`  Thumbnail: ${sizeKB} KB`);

    return thumbnail;
  } catch (error) {
    console.error(`  Thumbnail generation failed:`, error.message);
    return null;
  }
}

/**
 * Convert frames to MP4 using ffmpeg.
 *
 * Frame rate is derived from the soundtrack duration when one exists,
 * otherwise from the timing data, otherwise defaults to 60fps. The derived
 * rate is clamped to a finite value >= 1 so a zero-length soundtrack or
 * degenerate timing data can't hand ffmpeg an Infinity/NaN/0 rate.
 *
 * @returns {Promise<Buffer>} The encoded MP4.
 */
async function framesToMp4(workDir, timing) {
  const outputPath = join(workDir, 'output.mp4');
  const soundtrackPath = join(workDir, 'soundtrack.wav');

  // Check for audio
  const hasSoundtrack = await fs.access(soundtrackPath).then(() => true).catch(() => false);

  let frameRate = 60; // default

  if (hasSoundtrack && timing && Array.isArray(timing) && timing.length > 0) {
    // Probe audio duration
    const audioDuration = await new Promise((resolve, reject) => {
      const ffprobe = spawn(ffprobePath, [
        '-v', 'error',
        '-show_entries', 'format=duration',
        '-of', 'default=noprint_wrappers=1:nokey=1',
        soundtrackPath
      ]);

      let output = '';
      ffprobe.stdout.on('data', (data) => output += data.toString());
      ffprobe.on('error', (err) => reject(err));
      ffprobe.on('close', (code) => {
        if (code === 0) {
          resolve(parseFloat(output.trim()));
        } else {
          reject(new Error('ffprobe failed'));
        }
      });
    });

    frameRate = Math.round(timing.length / audioDuration);
    console.log(`  ${timing.length} frames, ${audioDuration.toFixed(2)}s audio → ${frameRate}fps`);
  } else if (timing && Array.isArray(timing) && timing.length > 0) {
    const totalDuration = timing.reduce((sum, frame) => sum + frame.duration, 0);
    const avgFrameDuration = totalDuration / timing.length;
    frameRate = Math.round(1000 / avgFrameDuration);
    console.log(`  ${timing.length} frames, calculated ${frameRate}fps from timing`);
  }

  // Clamp: a 0s audio duration or zero/negative timing sums would otherwise
  // produce Infinity/NaN/0 and make ffmpeg reject or hang.
  if (!Number.isFinite(frameRate) || frameRate < 1) {
    console.warn(`  Derived frame rate invalid (${frameRate}); falling back to 60fps`);
    frameRate = 60;
  }

  const ffmpegArgs = [
    '-r', frameRate.toString(),
    '-i', join(workDir, 'frame-%05d.png'),
  ];

  if (hasSoundtrack) {
    console.log(`  Including soundtrack.wav`);
    ffmpegArgs.push('-i', soundtrackPath);
  }

  ffmpegArgs.push(
    // 3x nearest-neighbor upscale, then snap to even dimensions for yuv420p
    '-vf', 'scale=iw*3:ih*3:flags=neighbor,scale=trunc(iw/2)*2:trunc(ih/2)*2:flags=neighbor',
    '-c:v', 'libx264',
    '-pix_fmt', 'yuv420p',
  );

  if (hasSoundtrack) {
    ffmpegArgs.push('-c:a', 'aac', '-b:a', '128k');
  }

  ffmpegArgs.push(
    '-movflags', '+faststart',
    '-y',
    outputPath
  );

  console.log(`  Running ffmpeg at ${frameRate}fps`);

  return new Promise((resolve, reject) => {
    const ffmpeg = spawn(ffmpegPath, ffmpegArgs);

    let stderr = '';
    ffmpeg.stderr.on('data', (data) => {
      stderr += data.toString();
    });

    ffmpeg.on('error', (error) => {
      reject(new Error(`ffmpeg spawn error: ${error.message}`));
    });

    ffmpeg.on('close', async (code) => {
      if (code !== 0) {
        reject(new Error(`ffmpeg exited with code ${code}\n${stderr}`));
        return;
      }

      try {
        const mp4Buffer = await fs.readFile(outputPath);
        const sizeKB = (mp4Buffer.length / 1024).toFixed(2);
        console.log(`  MP4 created: ${sizeKB} KB`);
        resolve(mp4Buffer);
      } catch (error) {
        reject(new Error(`Failed to read MP4: ${error.message}`));
      }
    });
  });
}

/**
 * Upload buffer to Spaces.
 *
 * @param {Buffer} buffer - Content to upload (must be non-null).
 * @param {string} key - Object key within the bucket.
 * @param {string} [contentType='video/mp4'] - MIME type for the object.
 * @returns {Promise<string>} Public direct-Spaces URL of the uploaded object.
 */
async function uploadToSpaces(buffer, key, contentType = 'video/mp4') {
  if (!buffer) {
    throw new Error('Cannot upload null buffer');
  }

  const command = new PutObjectCommand({
    Bucket: AT_BLOBS_BUCKET,
    Key: key,
    Body: buffer,
    ContentType: contentType,
    ACL: 'public-read',
  });

  await atBlobsSpacesClient.send(command);

  // Always use direct Spaces URL for webhook callbacks
  // The CDN URL (at-blobs.aesthetic.computer) might have auth restrictions
  const endpoint = process.env.AT_BLOBS_SPACES_ENDPOINT || 'https://sfo3.digitaloceanspaces.com';
  const region = endpoint.match(/https:\/\/([^.]+)\./)?.[1] || 'sfo3';
  const url = `https://${AT_BLOBS_BUCKET}.${region}.digitaloceanspaces.com/${key}`;

  console.log(`  Uploaded: ${url}`);
  return url;
}

/**
 * POST callback to Netlify with either the result URLs or an error message.
 */
async function postCallback({ mongoId, slug, code, mp4Url, thumbnailUrl, callbackUrl, error }) {
  const payload = {
    mongoId,
    slug,
    code,
    secret: CALLBACK_SECRET,
  };

  if (error) {
    payload.error = error;
  } else {
    payload.mp4Url = mp4Url;
    payload.thumbnailUrl = thumbnailUrl;
  }

  return new Promise((resolve, reject) => {
    const url = new URL(callbackUrl);
    const isHttps = url.protocol === 'https:';
    const client = isHttps ? https : http;
    const body = JSON.stringify(payload);

    const options = {
      hostname: url.hostname,
      port: url.port || (isHttps ? 443 : 80),
      path: url.pathname + url.search,
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'Content-Length': Buffer.byteLength(body),
      },
      rejectUnauthorized: false, // Accept self-signed certs in dev
    };

    const req = client.request(options, (res) => {
      // Drain the response body so the socket is released
      res.resume();

      if (res.statusCode < 200 || res.statusCode >= 300) {
        reject(new Error(`Callback failed: ${res.statusCode} ${res.statusMessage}`));
        return;
      }

      console.log(`  Callback successful`);
      resolve();
    });

    req.on('error', reject);
    req.write(body);
    req.end();
  });
}

/**
 * Register a subscriber callback for state-change notifications.
 * @returns {Function} Unsubscribe function.
 */
export function subscribeToUpdates(callback) {
  subscribers.add(callback);
  return () => subscribers.delete(callback);
}

/** Fire every registered subscriber callback. */
function notifySubscribers() {
  subscribers.forEach(cb => cb());
}

export function getActiveBakes() {
  return activeBakes;
}

export function getIncomingBakes() {
  return incomingBakes;
}

export function getRecentBakes() {
  return recentBakes;
}

/** HTTP handler: report incoming/active/recent bake state as JSON. */
export async function statusHandler(req, res) {
  // Clean up any stale active bakes before returning status
  await cleanupStaleBakes();

  res.json({
    incoming: Array.from(incomingBakes.values()),
    active: Array.from(activeBakes.values()),
    recent: recentBakes
  });
}

/**
 * Bake completion notification handler.
 * Called by oven-complete webhook to notify oven that processing finished.
 */
export function bakeCompleteHandler(req, res) {
  const { slug, code, success, mp4Url, thumbnailUrl, error, atprotoRkey } = req.body;

  if (!code) {
    return res.status(400).json({ error: 'Missing code' });
  }

  console.log(`🎬 Bake completion notification: ${slug} (${code}) - ${success ? 'success' : 'failed'}${atprotoRkey ? ' 🦋' : ''}`);

  // Move from active to recent (keyed by code)
  completeBake(code, success, { slug, code, mp4Url, thumbnailUrl, error, atprotoRkey });
  notifySubscribers();

  res.json({ status: 'ok' });
}

/**
 * Bake status update handler.
 * Called by oven-complete webhook for incremental progress updates.
 */
export function bakeStatusHandler(req, res) {
  const { code, status, details } = req.body;

  if (!code) {
    return res.status(400).json({ error: 'Missing code' });
  }

  console.log(`📊 Status update: ${code} - ${status}${details ? ': ' + details : ''}`);

  // Update the bake status (updateBakeStatus tolerates string details)
  updateBakeStatus(code, status, details);
  notifySubscribers();

  res.json({ status: 'ok' });
}