Demonstrating core cloud concepts, starting with CaaS. Not for production use.
1use dotenvy::dotenv;
2use futures_util::{SinkExt, StreamExt};
3use serde::{Deserialize, Serialize};
4use std::env;
5use std::time::Duration;
6use sysinfo::{Disks, System};
7use tokio_tungstenite::{connect_async, tungstenite::Message};
8use tracing::{error, info, instrument};
9use tracing_subscriber::FmtSubscriber;
10use url::Url;
11
12#[derive(Debug, Serialize, Deserialize, Clone)]
13struct Job {
14 id: String,
15 payload: String,
16}
17
18#[derive(Debug, Serialize, Deserialize, Clone)]
19struct Bid {
20 job_id: String,
21 available_cpu_cores: u32,
22 available_ram_mb: u32,
23 available_storage_mb: u32,
24}
25
26#[derive(Debug, Serialize, Deserialize)]
27#[serde(tag = "type")]
28enum ServerMessage {
29 PingForBids { job_id: String },
30 AssignJob { job: Job },
31 Acknowledge,
32}
33
34#[derive(Debug, Serialize, Deserialize)]
35#[serde(tag = "type")]
36enum WorkerMessage {
37 BidResponse(Bid),
38}
39
40fn get_system_resources() -> (u32, u32, u32) {
41 let mut sys = System::new();
42 sys.refresh_cpu_all();
43 sys.refresh_memory();
44
45 let cpu_cores = sys.cpus().len() as u32;
46 let available_ram_mb = (sys.available_memory() / (1024 * 1024)) as u32;
47 let available_storage_mb: u32;
48
49 let disks = Disks::new_with_refreshed_list();
50
51 let exe_path = std::env::current_exe().unwrap_or_else(|_| std::path::PathBuf::from("/"));
52
53 let mut best_disk_mount: Option<&std::path::Path> = None;
54 let mut best_disk_space: u64 = 0;
55
56 for disk in disks.iter() {
57 let mount_point = disk.mount_point();
58 if exe_path.starts_with(mount_point) {
59 if best_disk_mount.is_none()
60 || mount_point.as_os_str().len() > best_disk_mount.unwrap().as_os_str().len()
61 {
62 best_disk_mount = Some(mount_point);
63 best_disk_space = disk.available_space();
64 }
65 }
66 }
67
68 if best_disk_mount.is_some() {
69 available_storage_mb = (best_disk_space / (1024 * 1024)) as u32;
70 } else {
71 let mut total_space: u64 = 0;
72 for disk in disks.iter() {
73 match disk.kind() {
74 sysinfo::DiskKind::HDD | sysinfo::DiskKind::SSD => {
75 total_space += disk.available_space();
76 }
77 _ => {}
78 }
79 }
80 if total_space == 0 {
81 for disk in disks.iter() {
82 total_space += disk.available_space();
83 }
84 }
85 available_storage_mb = (total_space / (1024 * 1024)) as u32;
86 }
87
88 (cpu_cores, available_ram_mb, available_storage_mb)
89}
90
91async fn spin_up_job(job: Job) {
92 info!("Spinning up job: {}", job.id);
93 tokio::time::sleep(Duration::from_secs(5)).await;
94 info!("Job {} completed. Payload: {}", job.id, job.payload);
95}
96
97#[tokio::main]
98#[instrument]
99async fn main() {
100 dotenv().ok();
101 FmtSubscriber::builder()
102 .with_max_level(tracing::Level::INFO)
103 .init();
104
105 let server_url =
106 env::var("CONTROL_PLANE_URL").unwrap_or_else(|_| "ws://127.0.0.1:8080".to_string());
107
108 info!("Attempting to connect to job board at {}", server_url);
109
110 let url = Url::parse(&server_url).expect("Failed to parse server URL");
111
112 loop {
113 match connect_async(&url).await {
114 Ok((ws_stream, _response)) => {
115 info!("Successfully connected to job board");
116 let (mut write, mut read) = ws_stream.split();
117
118 while let Some(msg) = read.next().await {
119 match msg {
120 Ok(Message::Text(text)) => {
121 match serde_json::from_str::<ServerMessage>(&text) {
122 Ok(server_msg) => match server_msg {
123 ServerMessage::PingForBids { job_id } => {
124 info!("Received bid request for job_id: {}", job_id);
125 let (cpu, ram, storage) = get_system_resources();
126 let bid = Bid {
127 job_id: job_id.clone(),
128 available_cpu_cores: cpu,
129 available_ram_mb: ram,
130 available_storage_mb: storage,
131 };
132 let response_msg = WorkerMessage::BidResponse(bid);
133 let response_json = serde_json::to_string(&response_msg)
134 .expect("Failed to serialize bid response");
135
136 if let Err(e) =
137 write.send(Message::Text(response_json.clone())).await
138 {
139 error!("Failed to send bid response: {}", e);
140 break;
141 }
142 info!("Sent bid for job_id: {}", job_id);
143 }
144 ServerMessage::AssignJob { job } => {
145 info!("Won bid! Assigned job: {}", job.id);
146 tokio::spawn(spin_up_job(job));
147 }
148 ServerMessage::Acknowledge => {
149 info!("Received Acknowledge from server.");
150 }
151 },
152 Err(e) => {
153 error!(
154 "Failed to deserialize server message: {}. Raw text: {}",
155 e, text
156 );
157 }
158 }
159 }
160 Ok(Message::Close(_)) => {
161 info!("Server closed the connection.");
162 break;
163 }
164 Err(e) => {
165 error!("Error receiving message: {}", e);
166 break;
167 }
168 _ => {}
169 }
170 }
171 }
172 Err(e) => {
173 error!("Failed to connect: {}", e);
174 }
175 }
176 info!("Disconnected. Reconnecting in 5 seconds...");
177 tokio::time::sleep(Duration::from_secs(5)).await;
178 }
179}
180