a bare-bones limbo server in rust (mirror of https://github.com/xoogware/crawlspace)
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix(net): break player receive loop into separate thread

brwr.dev 5db47e30 3a527c8d

verified
+195 -181
+61 -55
src/net/io.rs
··· 17 17 * <https://www.gnu.org/licenses/>. 18 18 */ 19 19 20 - use std::{io::ErrorKind, net::SocketAddr, time::Duration}; 20 + use std::{io::ErrorKind, time::Duration}; 21 21 22 22 use bytes::BytesMut; 23 23 use color_eyre::eyre::{bail, Context, Result}; 24 24 use tokio::{ 25 25 io::{AsyncReadExt, AsyncWriteExt}, 26 - net::TcpStream, 26 + net::{ 27 + tcp::{OwnedReadHalf, OwnedWriteHalf}, 28 + TcpStream, 29 + }, 30 + sync::{Mutex, RwLock}, 27 31 }; 28 32 29 33 use crate::protocol::{self, ClientboundPacket, Frame, ServerboundPacket}; 30 34 31 35 #[derive(Debug)] 32 36 pub struct NetIo { 33 - stream: TcpStream, 34 - frame: Frame, 35 - decoder: protocol::Decoder, 36 - encoder: protocol::Encoder, 37 + pub peer_addr: String, 38 + pub connected: RwLock<bool>, 39 + read_half: Mutex<OwnedReadHalf>, 40 + write_half: Mutex<OwnedWriteHalf>, 41 + frame: Mutex<Frame>, 42 + decoder: Mutex<protocol::Decoder>, 43 + encoder: Mutex<protocol::Encoder>, 37 44 } 38 45 39 46 const BUF_SIZE: usize = 4096; ··· 50 57 ); 51 58 } 52 59 60 + let peer_addr = stream 61 + .peer_addr() 62 + .map_or("Unknown".to_owned(), |a| a.to_string()); 63 + let (read_half, write_half) = stream.into_split(); 64 + 53 65 Self { 54 - stream, 55 - frame: Frame { 66 + peer_addr, 67 + connected: RwLock::new(true), 68 + read_half: Mutex::new(read_half), 69 + write_half: Mutex::new(write_half), 70 + frame: Mutex::new(Frame { 56 71 id: -1, 57 72 body: BytesMut::new(), 58 - }, 59 - decoder: protocol::Decoder::new(), 60 - encoder: protocol::Encoder::new(), 73 + }), 74 + decoder: Mutex::new(protocol::Decoder::new()), 75 + encoder: Mutex::new(protocol::Encoder::new()), 61 76 } 62 77 } 63 78 64 - pub fn peer_addr(&self) -> Result<SocketAddr> { 65 - Ok(self.stream.peer_addr()?) 79 + pub async fn connected(&self) -> bool { 80 + let c = self.connected.read().await; 81 + *c 66 82 } 67 83 68 - pub async fn rx<'a, 'b, P>(&'a mut self) -> Result<P> 84 + pub async fn rx<'a, 'b, P>(&'a self) -> Result<Frame> 69 85 where 70 86 P: ServerboundPacket<'a>, 71 87 { 72 88 // TODO: maybe move this somewhere else? i don't know if a global timeout of 5 seconds per 73 89 // packet is realistic but for testing it's chill i suppose 74 90 tokio::time::timeout(Duration::from_secs(5), async move { 91 + let mut decoder = self.decoder.lock().await; 75 92 loop { 76 - if let Some(frame) = self 77 - .decoder 78 - .try_read_next() 79 - .context("failed try_read_next")? 80 - { 93 + if let Some(frame) = decoder.try_read_next().context("failed try_read_next")? { 81 94 if frame.id != P::ID { 82 95 debug!( 83 96 "Got packet ID {} while awaiting {}, discarding", ··· 87 100 continue; 88 101 } 89 102 90 - self.frame = frame; 91 - let r = self.frame.decode()?; 92 - debug!("Got packet {:?}", r); 93 - return Ok(r); 103 + return Ok(frame); 94 104 }; 95 105 96 - self.decoder.reserve_additional(BUF_SIZE); 97 - let mut buf = self.decoder.take_all(); 106 + decoder.reserve_additional(BUF_SIZE); 107 + let mut buf = decoder.take_all(); 98 108 99 - if self 100 - .stream 109 + let mut read_half = self.read_half.lock().await; 110 + if read_half 101 111 .read_buf(&mut buf) 102 112 .await 103 113 .context("failed read_buf")? 104 114 == 0 105 115 { 116 + let mut c = self.connected.write().await; 117 + *c = false; 106 118 return Err(std::io::Error::from(ErrorKind::UnexpectedEof).into()); 107 119 } 108 120 109 - self.decoder.add_bytes(buf); 121 + decoder.add_bytes(buf); 110 122 } 111 123 }) 112 124 .await? 113 125 } 114 126 115 - pub async fn tx<P>(&mut self, packet: &P) -> Result<()> 127 + pub async fn tx<P>(&self, packet: &P) -> Result<()> 116 128 where 117 129 P: ClientboundPacket, 118 130 { 119 131 trace!("Sending packet {:?}", packet); 120 - self.encoder.append_packet(packet)?; 121 - let bytes = self.encoder.take(); 132 + let mut encoder = self.encoder.lock().await; 133 + encoder.append_packet(packet)?; 134 + let bytes = encoder.take(); 122 135 trace!("raw packet is {} bytes", bytes.len()); 123 - Ok(self.stream.write_all(&bytes).await?) 136 + let mut writer = self.write_half.lock().await; 137 + Ok(writer.write_all(&bytes).await?) 124 138 } 125 139 126 - pub async fn tx_raw(&mut self, packet: &[u8]) -> Result<()> { 140 + pub async fn tx_raw(&self, packet: &[u8]) -> Result<()> { 127 141 trace!("Sending packet {:?}", packet); 128 - Ok(self.stream.write_all(packet).await?) 129 - } 130 - 131 - pub async fn flush(&mut self) -> Result<()> { 132 - self.stream.flush().await?; 133 - Ok(()) 142 + let mut writer = self.write_half.lock().await; 143 + Ok(writer.write_all(packet).await?) 134 144 } 135 145 136 - pub async fn rx_raw(&mut self) -> Result<Frame> { 137 - if let Some(frame) = self 138 - .decoder 139 - .try_read_next() 140 - .context("failed try_read_next")? 141 - { 146 + pub async fn rx_raw(&self) -> Result<Frame> { 147 + let mut decoder = self.decoder.lock().await; 148 + if let Some(frame) = decoder.try_read_next().context("failed try_read_next")? { 142 149 return Ok(frame); 143 150 }; 144 151 145 - self.decoder.reserve_additional(BUF_SIZE); 146 - let mut buf = self.decoder.take_all(); 152 + decoder.reserve_additional(BUF_SIZE); 153 + let mut buf = decoder.take_all(); 147 154 148 - if self 149 - .stream 150 - .read_buf(&mut buf) 151 - .await 152 - .context("failed read_buf")? 153 - == 0 154 155 { 155 - return Err(std::io::Error::from(ErrorKind::UnexpectedEof).into()); 156 + let mut reader = self.read_half.lock().await; 157 + if reader.read_buf(&mut buf).await.context("failed read_buf")? == 0 { 158 + let mut c = self.connected.write().await; 159 + *c = false; 160 + return Err(std::io::Error::from(ErrorKind::UnexpectedEof).into()); 161 + } 156 162 } 157 163 158 - self.decoder.add_bytes(buf); 164 + decoder.add_bytes(buf); 159 165 160 166 bail!("No packet available") 161 167 }
+120 -119
src/net/player.rs
··· 34 34 sync::{Mutex, OwnedSemaphorePermit, RwLock}, 35 35 time::{self, timeout, Instant}, 36 36 }; 37 + use tokio_util::sync::CancellationToken; 37 38 use uuid::Uuid; 38 39 39 40 use crate::{ ··· 63 64 pub struct Player { 64 65 pub id: u16, 65 66 _permit: OwnedSemaphorePermit, 66 - pub io: Mutex<NetIo>, 67 + pub io: NetIo, 68 + frame_queue: Mutex<Vec<Frame>>, 67 69 68 70 crawlstate: CrawlState, 69 71 packet_state: RwLock<PacketState>, ··· 98 100 ) -> Self { 99 101 Self(Arc::new(Player { 100 102 id, 101 - io: Mutex::new(NetIo::new(connection)), 103 + io: NetIo::new(connection), 104 + frame_queue: Mutex::new(Vec::new()), 102 105 _permit: permit, 103 106 104 107 crawlstate, ··· 123 126 124 127 pub async fn connect(&self) { 125 128 { 126 - let io = self.0.io.lock().await; 127 - let addy = io 128 - .peer_addr() 129 - .map_or("Unknown".to_string(), |a| a.to_string()); 130 - debug!("Handling new player (id {}) from {}", self.0.id, addy); 129 + debug!( 130 + "Handling new player (id {}) from {}", 131 + self.0.id, self.0.io.peer_addr 132 + ); 131 133 } 132 134 133 135 // crawlspace intentionally doesn't support legacy pings :3 ··· 156 158 157 159 async fn handshake(&self) -> Result<()> { 158 160 let state = self.0.crawlstate.clone(); 159 - let mut io = self.0.io.lock().await; 160 161 161 - let p = io.rx::<HandshakeS>().await?; 162 + let p = self.0.io.rx::<HandshakeS>().await?; 163 + let p = p.decode::<HandshakeS>()?; 162 164 163 165 if p.protocol_version.0 != state.version_number { 164 166 warn!( ··· 168 170 } 169 171 170 172 let next_state = p.next_state; 171 - drop(io); 172 173 173 174 let mut s = self.0.packet_state.write().await; 174 175 match next_state { ··· 189 190 } 190 191 191 192 async fn handle_status(&self) -> Result<()> { 192 - let mut io = self.0.io.lock().await; 193 - 194 - io.rx::<StatusRequestS>().await?; 193 + self.0.io.rx::<StatusRequestS>().await?; 195 194 let state = self.0.crawlstate.clone(); 196 195 197 196 let res = json!({ ··· 213 212 json_respose: &res.to_string(), 214 213 }; 215 214 216 - io.tx(&res).await?; 217 - let ping = io.rx::<Ping>().await?; 218 - io.tx(&ping).await?; 215 + self.0.io.tx(&res).await?; 216 + let ping: Ping = self.0.io.rx::<Ping>().await?.decode()?; 217 + 218 + self.0.io.tx(&ping).await?; 219 219 220 220 Ok(()) 221 221 } 222 222 223 223 async fn login(&self) -> Result<()> { 224 224 let state = self.0.crawlstate.clone(); 225 - let mut io = self.0.io.lock().await; 226 225 227 - let login = io.rx::<LoginStartS>().await?; 226 + let login = self.0.io.rx::<LoginStartS>().await?; 227 + let login: LoginStartS = login.decode()?; 228 228 229 229 // need to manually clone this or else the reference to self.io lives too long 230 230 // TODO: clean up lifetimes on encode/decode - possibly just clone strings? ··· 246 246 *own_uuid = Some(uuid); 247 247 } 248 248 249 - io.tx(&success).await?; 250 - io.rx::<LoginAckS>().await?; 249 + self.0.io.tx(&success).await?; 250 + self.0.io.rx::<LoginAckS>().await?; 251 251 252 252 let clientbound_known_packs = KnownPacksC::of_version(&state.version_name); 253 - io.tx(&clientbound_known_packs).await?; 253 + self.0.io.tx(&clientbound_known_packs).await?; 254 254 255 255 // TODO: maybe(?) actually handle this 256 - io.rx::<KnownPacksS>().await?; 256 + self.0.io.rx::<KnownPacksS>().await?; 257 257 258 - io.tx_raw(&state.registry_cache.encoded).await?; 258 + self.0.io.tx_raw(&state.registry_cache.encoded).await?; 259 259 260 - io.tx(&FinishConfigurationC).await?; 261 - io.rx::<FinishConfigurationAckS>().await?; 260 + self.0.io.tx(&FinishConfigurationC).await?; 261 + self.0.io.rx::<FinishConfigurationAckS>().await?; 262 262 263 263 Ok(()) 264 264 } ··· 282 282 *packet_state = PacketState::Play; 283 283 284 284 let state = self.0.crawlstate.clone(); 285 - let mut io = self.0.io.lock().await; 286 285 287 286 let max_players: i32 = state.max_players.try_into().unwrap_or(50); 288 287 ··· 308 307 enforces_secure_chat: false, 309 308 }; 310 309 311 - io.tx(&login).await?; 310 + self.0.io.tx(&login).await?; 312 311 313 - io.tx(&SetTickingStateC { 314 - tick_rate: 20.0, 315 - is_frozen: false, 316 - }) 317 - .await?; 318 - 319 - io.tx(&StepTicksC(10)).await?; 312 + self.0 313 + .io 314 + .tx(&SetTickingStateC { 315 + tick_rate: 20.0, 316 + is_frozen: false, 317 + }) 318 + .await?; 320 319 321 - drop(io); 320 + self.0.io.tx(&StepTicksC(10)).await?; 322 321 323 322 let spawnpoint = state.spawnpoint; 324 323 self.teleport_awaiting(spawnpoint.0, spawnpoint.1, spawnpoint.2, 0.0, 0.0) 325 324 .await?; 326 325 327 - let mut io = self.0.io.lock().await; 328 - 329 - io.tx(&SetBorderCenterC { 330 - x: spawnpoint.0, 331 - z: spawnpoint.2, 332 - }) 333 - .await?; 326 + self.0 327 + .io 328 + .tx(&SetBorderCenterC { 329 + x: spawnpoint.0, 330 + z: spawnpoint.2, 331 + }) 332 + .await?; 334 333 335 - io.tx(&SetBorderSizeC(state.border_radius as f64 * 2.0)) 334 + self.0 335 + .io 336 + .tx(&SetBorderSizeC(state.border_radius as f64 * 2.0)) 336 337 .await?; 337 338 338 - io.tx(&PlayerInfoUpdateC { 339 - players: &[PlayerStatus::for_player(self.uuid().await) 340 - .add_player("You're alone...", &[]) 341 - .update_listed(true)], 342 - }) 343 - .await?; 339 + self.0 340 + .io 341 + .tx(&PlayerInfoUpdateC { 342 + players: &[PlayerStatus::for_player(self.uuid().await) 343 + .add_player("You're alone...", &[]) 344 + .update_listed(true)], 345 + }) 346 + .await?; 344 347 345 348 let await_chunks = GameEventC::from(GameEvent::StartWaitingForLevelChunks); 346 - io.tx(&await_chunks).await?; 349 + self.0.io.tx(&await_chunks).await?; 347 350 348 351 let set_center = SetCenterChunkC { 349 352 x: VarInt(spawnpoint.0.floor() as i32 / 16), 350 353 y: VarInt(spawnpoint.2.floor() as i32 / 16), 351 354 }; 352 - io.tx(&set_center).await?; 353 - drop(io); 355 + self.0.io.tx(&set_center).await?; 354 356 355 357 // FIXME: GROSS LOL?????? this should(?) change ownership of the player to the server 356 358 // thread but realistically who knows burhhhh 357 359 state.player_send.send(self.clone()).await?; 360 + self.spawn_read_loop(); 358 361 359 362 Ok(()) 360 363 } 361 364 362 365 pub async fn handle_all_packets(&self) -> Result<()> { 363 - let Some(packets) = self.rx_all().await else { 364 - bail!("Connection for player {} was closed", self.id()) 366 + let packets = { 367 + let mut frame_queue = self.0.frame_queue.lock().await; 368 + std::mem::take(&mut *frame_queue) 365 369 }; 366 370 367 - debug!("player {} {:?}", self.id(), packets); 368 - self.handle_frames(packets).await 371 + for packet in packets { 372 + self.handle_frame(packet).await?; 373 + } 374 + 375 + Ok(()) 369 376 } 370 377 371 378 pub async fn keepalive(&self) -> Result<()> { ··· 392 399 } 393 400 } 394 401 395 - async fn rx_all(&self) -> Option<Vec<Frame>> { 396 - let mut io = self.0.io.lock().await; 397 - 398 - let mut frames = Vec::new(); 399 - 400 - loop { 401 - match io.rx_raw().await { 402 - Ok(frame) => frames.push(frame), 403 - Err(why) => match why.downcast_ref::<tokio::io::Error>().map(|e| e.kind()) { 404 - Some(tokio::io::ErrorKind::UnexpectedEof) => return None, 405 - _ => break, 406 - }, 407 - } 408 - } 409 - 410 - Some(frames) 411 - } 412 - 413 402 async fn ping(&self, id: i64) -> Result<()> { 414 - let mut io = self.0.io.lock().await; 415 403 let ka = KeepAliveC(id); 416 - io.tx(&ka).await?; 404 + self.0.io.tx(&ka).await?; 417 405 // TODO: check return keepalive, kick 418 406 Ok(()) 419 407 } ··· 438 426 }; 439 427 } 440 428 441 - let mut io = self.0.io.lock().await; 442 - 443 429 let tp = SynchronisePositionC::new(x, y, z, yaw, pitch); 444 430 { 445 431 let mut tp_state = self.0.tp_state.write().await; 446 432 // player will be given 5 (FIVE) SECONDS TO ACK!!!!! 447 433 *tp_state = TeleportState::Pending(tp.id, Instant::now()); 448 434 } 449 - io.tx(&tp).await?; 435 + self.0.io.tx(&tp).await?; 450 436 451 - let tp_ack = io.rx::<ConfirmTeleportS>().await?; 437 + let tp_ack = self.0.io.rx::<ConfirmTeleportS>().await?; 438 + let tp_ack = tp_ack.decode::<ConfirmTeleportS>()?; 452 439 453 440 match tokio::time::timeout(Duration::from_secs(5), self.confirm_teleport(tp_ack.id)).await { 454 441 Ok(Ok(())) => { ··· 506 493 } 507 494 } 508 495 509 - async fn handle_frames(&self, frames: Vec<Frame>) -> Result<()> { 510 - for frame in frames { 511 - match frame.id { 512 - SetPlayerPositionS::ID => { 513 - let packet: SetPlayerPositionS = frame.decode()?; 496 + fn spawn_read_loop(&self) { 497 + let player = self.clone(); 514 498 515 - let tp_state = self.0.tp_state.read().await; 516 - match *tp_state { 517 - TeleportState::Clear => { 518 - let mut entity = self.0.entity.write().await; 519 - entity.reposition(packet.x, packet.feet_y, packet.z); 520 - } 499 + tokio::spawn(async move { 500 + loop { 501 + match player.0.io.rx_raw().await { 502 + Ok(frame) => { 503 + let mut queue = player.0.frame_queue.lock().await; 504 + queue.push(frame); 505 + } 506 + Err(why) => match why.downcast_ref::<tokio::io::Error>().map(|e| e.kind()) { 507 + Some(tokio::io::ErrorKind::UnexpectedEof) => return, 521 508 _ => (), 509 + }, 510 + } 511 + } 512 + }); 513 + } 514 + 515 + async fn handle_frame(&self, frame: Frame) -> Result<()> { 516 + match frame.id { 517 + SetPlayerPositionS::ID => { 518 + let packet: SetPlayerPositionS = frame.decode()?; 519 + 520 + let tp_state = self.0.tp_state.read().await; 521 + match *tp_state { 522 + TeleportState::Clear => { 523 + let mut entity = self.0.entity.write().await; 524 + entity.reposition(packet.x, packet.feet_y, packet.z); 522 525 } 526 + _ => (), 523 527 } 528 + } 524 529 525 - SetPlayerPositionAndRotationS::ID => { 526 - let packet: SetPlayerPositionAndRotationS = frame.decode()?; 530 + SetPlayerPositionAndRotationS::ID => { 531 + let packet: SetPlayerPositionAndRotationS = frame.decode()?; 527 532 528 - let tp_state = self.0.tp_state.read().await; 529 - match *tp_state { 530 - TeleportState::Clear => { 531 - let mut entity = self.0.entity.write().await; 532 - entity.reposition(packet.x, packet.feet_y, packet.z); 533 - entity.rotate(packet.yaw, packet.pitch); 534 - } 535 - _ => (), 533 + let tp_state = self.0.tp_state.read().await; 534 + match *tp_state { 535 + TeleportState::Clear => { 536 + let mut entity = self.0.entity.write().await; 537 + entity.reposition(packet.x, packet.feet_y, packet.z); 538 + entity.rotate(packet.yaw, packet.pitch); 536 539 } 540 + _ => (), 537 541 } 542 + } 538 543 539 - ConfirmTeleportS::ID => { 540 - let packet: ConfirmTeleportS = frame.decode()?; 541 - self.check_teleports(Some(packet)).await?; 542 - } 544 + ConfirmTeleportS::ID => { 545 + let packet: ConfirmTeleportS = frame.decode()?; 546 + self.check_teleports(Some(packet)).await?; 547 + } 543 548 544 - UseItemOnS::ID => { 545 - let packet: UseItemOnS = frame.decode()?; 546 - self.handle_use_item(packet).await?; 547 - } 549 + UseItemOnS::ID => { 550 + let packet: UseItemOnS = frame.decode()?; 551 + self.handle_use_item(packet).await?; 552 + } 548 553 549 - id => { 550 - debug!( 551 - "Got packet with id {id} from player {}, ignoring", 552 - self.0.id 553 - ); 554 - } 554 + id => { 555 + debug!( 556 + "Got packet with id {id} from player {}, ignoring", 557 + self.0.id 558 + ); 555 559 } 556 560 } 557 561 ··· 567 571 title: "Hi".into(), 568 572 }; 569 573 570 - { 571 - let mut io = self.0.io.lock().await; 572 - io.tx(&OpenScreenC::from(&window)).await?; 573 - } 574 + self.0.io.tx(&OpenScreenC::from(&window)).await?; 574 575 575 576 { 576 577 let mut sw = self.0.window.write().await;
+14 -7
src/server/mod.rs
··· 20 20 pub mod ticker; 21 21 pub mod window; 22 22 23 - use std::{collections::HashMap, sync::Arc}; 23 + use std::{ 24 + collections::{HashMap, HashSet}, 25 + sync::Arc, 26 + }; 24 27 25 28 use color_eyre::eyre::Result; 26 29 use tokio::time::Instant; ··· 68 71 tokio::spawn(Self::send_world_to(p.clone(), self.world_cache.clone())); 69 72 } 70 73 71 - let mut invalid_players: Vec<u16> = Vec::new(); 74 + let mut invalid_players: HashSet<u16> = HashSet::new(); 72 75 73 76 for (id, player) in &self.players { 74 77 let _ = player.keepalive().await; ··· 77 80 Ok(()) => (), 78 81 Err(why) => { 79 82 error!("error handling packets for player {}: {why}", player.id()); 80 - invalid_players.push(*id); 83 + invalid_players.insert(*id); 81 84 continue; 82 85 } 83 86 } 84 87 88 + { 89 + if player.0.io.connected().await == false { 90 + invalid_players.insert(*id); 91 + } 92 + } 93 + 85 94 match player.check_teleports(None).await { 86 95 Err(TeleportError::TimedOut) | Err(TeleportError::WrongId(..)) => { 87 96 warn!("Player {} teleport failed, removing", player.0.id); 88 - invalid_players.push(*id); 97 + invalid_players.insert(*id); 89 98 } 90 99 _ => (), 91 100 } ··· 104 113 } 105 114 106 115 async fn send_world_to(player: SharedPlayer, world_cache: Arc<WorldCache>) -> Result<()> { 107 - let mut io = player.0.io.lock().await; 108 - 109 116 for packet in world_cache.encoded.iter() { 110 - io.tx_raw(packet).await?; 117 + player.0.io.tx_raw(packet).await?; 111 118 } 112 119 113 120 Ok(())