Demonstrating core cloud concepts, starting with CaaS. Not for production use.
at main 6.6 kB view raw
1use dotenvy::dotenv; 2use futures_util::{SinkExt, StreamExt}; 3use serde::{Deserialize, Serialize}; 4use std::env; 5use std::time::Duration; 6use sysinfo::{Disks, System}; 7use tokio_tungstenite::{connect_async, tungstenite::Message}; 8use tracing::{error, info, instrument}; 9use tracing_subscriber::FmtSubscriber; 10use url::Url; 11 12#[derive(Debug, Serialize, Deserialize, Clone)] 13struct Job { 14 id: String, 15 payload: String, 16} 17 18#[derive(Debug, Serialize, Deserialize, Clone)] 19struct Bid { 20 job_id: String, 21 available_cpu_cores: u32, 22 available_ram_mb: u32, 23 available_storage_mb: u32, 24} 25 26#[derive(Debug, Serialize, Deserialize)] 27#[serde(tag = "type")] 28enum ServerMessage { 29 PingForBids { job_id: String }, 30 AssignJob { job: Job }, 31 Acknowledge, 32} 33 34#[derive(Debug, Serialize, Deserialize)] 35#[serde(tag = "type")] 36enum WorkerMessage { 37 BidResponse(Bid), 38} 39 40fn get_system_resources() -> (u32, u32, u32) { 41 let mut sys = System::new(); 42 sys.refresh_cpu_all(); 43 sys.refresh_memory(); 44 45 let cpu_cores = sys.cpus().len() as u32; 46 let available_ram_mb = (sys.available_memory() / (1024 * 1024)) as u32; 47 let available_storage_mb: u32; 48 49 let disks = Disks::new_with_refreshed_list(); 50 51 let exe_path = std::env::current_exe().unwrap_or_else(|_| std::path::PathBuf::from("/")); 52 53 let mut best_disk_mount: Option<&std::path::Path> = None; 54 let mut best_disk_space: u64 = 0; 55 56 for disk in disks.iter() { 57 let mount_point = disk.mount_point(); 58 if exe_path.starts_with(mount_point) { 59 if best_disk_mount.is_none() 60 || mount_point.as_os_str().len() > best_disk_mount.unwrap().as_os_str().len() 61 { 62 best_disk_mount = Some(mount_point); 63 best_disk_space = disk.available_space(); 64 } 65 } 66 } 67 68 if best_disk_mount.is_some() { 69 available_storage_mb = (best_disk_space / (1024 * 1024)) as u32; 70 } else { 71 let mut total_space: u64 = 0; 72 for disk in disks.iter() { 73 match disk.kind() { 74 sysinfo::DiskKind::HDD | sysinfo::DiskKind::SSD => { 75 total_space += disk.available_space(); 76 } 77 _ => {} 78 } 79 } 80 if total_space == 0 { 81 for disk in disks.iter() { 82 total_space += disk.available_space(); 83 } 84 } 85 available_storage_mb = (total_space / (1024 * 1024)) as u32; 86 } 87 88 (cpu_cores, available_ram_mb, available_storage_mb) 89} 90 91async fn spin_up_job(job: Job) { 92 info!("Spinning up job: {}", job.id); 93 tokio::time::sleep(Duration::from_secs(5)).await; 94 info!("Job {} completed. Payload: {}", job.id, job.payload); 95} 96 97#[tokio::main] 98#[instrument] 99async fn main() { 100 dotenv().ok(); 101 FmtSubscriber::builder() 102 .with_max_level(tracing::Level::INFO) 103 .init(); 104 105 let server_url = 106 env::var("CONTROL_PLANE_URL").unwrap_or_else(|_| "ws://127.0.0.1:8080".to_string()); 107 108 info!("Attempting to connect to job board at {}", server_url); 109 110 let url = Url::parse(&server_url).expect("Failed to parse server URL"); 111 112 loop { 113 match connect_async(&url).await { 114 Ok((ws_stream, _response)) => { 115 info!("Successfully connected to job board"); 116 let (mut write, mut read) = ws_stream.split(); 117 118 while let Some(msg) = read.next().await { 119 match msg { 120 Ok(Message::Text(text)) => { 121 match serde_json::from_str::<ServerMessage>(&text) { 122 Ok(server_msg) => match server_msg { 123 ServerMessage::PingForBids { job_id } => { 124 info!("Received bid request for job_id: {}", job_id); 125 let (cpu, ram, storage) = get_system_resources(); 126 let bid = Bid { 127 job_id: job_id.clone(), 128 available_cpu_cores: cpu, 129 available_ram_mb: ram, 130 available_storage_mb: storage, 131 }; 132 let response_msg = WorkerMessage::BidResponse(bid); 133 let response_json = serde_json::to_string(&response_msg) 134 .expect("Failed to serialize bid response"); 135 136 if let Err(e) = 137 write.send(Message::Text(response_json.clone())).await 138 { 139 error!("Failed to send bid response: {}", e); 140 break; 141 } 142 info!("Sent bid for job_id: {}", job_id); 143 } 144 ServerMessage::AssignJob { job } => { 145 info!("Won bid! Assigned job: {}", job.id); 146 tokio::spawn(spin_up_job(job)); 147 } 148 ServerMessage::Acknowledge => { 149 info!("Received Acknowledge from server."); 150 } 151 }, 152 Err(e) => { 153 error!( 154 "Failed to deserialize server message: {}. Raw text: {}", 155 e, text 156 ); 157 } 158 } 159 } 160 Ok(Message::Close(_)) => { 161 info!("Server closed the connection."); 162 break; 163 } 164 Err(e) => { 165 error!("Error receiving message: {}", e); 166 break; 167 } 168 _ => {} 169 } 170 } 171 } 172 Err(e) => { 173 error!("Failed to connect: {}", e); 174 } 175 } 176 info!("Disconnected. Reconnecting in 5 seconds..."); 177 tokio::time::sleep(Duration::from_secs(5)).await; 178 } 179} 180