diff --git a/packet-inspector/src/main.rs b/packet-inspector/src/main.rs index a0bd085..5f70e7a 100644 --- a/packet-inspector/src/main.rs +++ b/packet-inspector/src/main.rs @@ -1,8 +1,9 @@ use std::error::Error; -use std::fmt; +use std::io::ErrorKind; use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; +use std::{fmt, io}; use anyhow::bail; use chrono::{DateTime, Utc}; @@ -11,7 +12,7 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf}; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::Semaphore; -use valence::protocol::codec::{Decoder, Encoder}; +use valence::protocol::codec::Decoder; use valence::protocol::packets::handshake::{Handshake, HandshakeNextState}; use valence::protocol::packets::login::c2s::{EncryptionResponse, LoginStart}; use valence::protocol::packets::login::s2c::{LoginSuccess, S2cLoginPacket}; @@ -20,6 +21,7 @@ use valence::protocol::packets::play::s2c::S2cPlayPacket; use valence::protocol::packets::status::c2s::{PingRequest, StatusRequest}; use valence::protocol::packets::status::s2c::{PongResponse, StatusResponse}; use valence::protocol::packets::{DecodePacket, EncodePacket}; +use valence::protocol::{Encode, VarInt}; #[derive(Parser, Clone, Debug)] #[clap(author, version, about)] @@ -35,7 +37,7 @@ struct Cli { #[clap(short, long)] max_connections: Option, - /// When enabled, prints a timestamp before each packet. + /// Print a timestamp before each packet. #[clap(short, long)] timestamp: bool, } @@ -53,12 +55,22 @@ impl Cli { async fn rw_packet( &self, read: &mut Decoder, - write: &mut Encoder, + write: &mut OwnedWriteHalf, ) -> anyhow::Result

{ - let pkt = read.read_packet().await?; - self.print(&pkt); - write.write_packet(&pkt).await?; - Ok(pkt) + let pkt = read.read_packet().await; + + if let Ok(pkt) = &pkt { + self.print(pkt); + } + + let mut len_buf = [0u8; VarInt::MAX_SIZE]; + let len = VarInt(read.packet_buf().len() as i32); + len.encode(&mut len_buf.as_mut_slice())?; + + write.write_all(&len_buf[..len.written_size()]).await?; + write.write_all(read.packet_buf()).await?; + + pkt } } @@ -96,16 +108,14 @@ async fn handle_connection(client: TcpStream, cli: Cli) -> anyhow::Result<()> { let server = TcpStream::connect(cli.server).await?; - let (client_read, client_write) = client.into_split(); - let (server_read, server_write) = server.into_split(); + let (client_read, mut client_write) = client.into_split(); + let (server_read, mut server_write) = server.into_split(); let timeout = Duration::from_secs(10); let mut client_read = Decoder::new(client_read, timeout); - let mut client_write = Encoder::new(client_write, timeout); let mut server_read = Decoder::new(server_read, timeout); - let mut server_write = Encoder::new(server_write, timeout); let handshake: Handshake = cli.rw_packet(&mut client_read, &mut server_write).await?; @@ -136,16 +146,14 @@ async fn handle_connection(client: TcpStream, cli: Cli) -> anyhow::Result<()> { eprintln!("Encryption was enabled! I can't see what's going on anymore."); return tokio::select! { - c2s = passthrough(client_read.into_inner(), server_write.into_inner()) => c2s, - s2c = passthrough(server_read.into_inner(), client_write.into_inner()) => s2c, + c2s = passthrough(client_read.into_inner(), server_write) => c2s, + s2c = passthrough(server_read.into_inner(), client_write) => s2c, }; } S2cLoginPacket::SetCompression(pkt) => { let threshold = pkt.threshold.0 as u32; client_read.enable_compression(threshold); - client_write.enable_compression(threshold); server_read.enable_compression(threshold); - server_write.enable_compression(threshold); cli.rw_packet::(&mut server_read, &mut client_write) .await?; @@ -159,15 +167,33 @@ async fn handle_connection(client: TcpStream, cli: Cli) -> anyhow::Result<()> { let c2s = async { loop { - cli.rw_packet::(&mut client_read, &mut server_write) - .await?; + if let Err(e) = cli + .rw_packet::(&mut client_read, &mut server_write) + .await + { + if let Some(e) = e.downcast_ref::() { + if e.kind() == ErrorKind::UnexpectedEof { + return Ok(()); + } + } + eprintln!("Error while decoding serverbound packet: {e:#}"); + } } }; let s2c = async { loop { - cli.rw_packet::(&mut server_read, &mut client_write) - .await?; + if let Err(e) = cli + .rw_packet::(&mut server_read, &mut client_write) + .await + { + if let Some(e) = e.downcast_ref::() { + if e.kind() == ErrorKind::UnexpectedEof { + return Ok(()); + } + } + eprintln!("Error while decoding clientbound packet: {e:#}"); + } } }; diff --git a/src/chunk.rs b/src/chunk.rs index d4faff6..0a7241c 100644 --- a/src/chunk.rs +++ b/src/chunk.rs @@ -1,7 +1,7 @@ // TODO: https://github.com/rust-lang/rust/issues/88581 for div_ceil -use std::collections::HashMap; use std::collections::hash_map::Entry; +use std::collections::HashMap; use std::io::Write; use std::iter::FusedIterator; @@ -39,7 +39,7 @@ impl Chunks { Entry::Occupied(mut oe) => { oe.insert(chunk); oe.into_mut() - }, + } Entry::Vacant(ve) => ve.insert(chunk), } } diff --git a/src/client/event.rs b/src/client/event.rs index 74aaba4..a632897 100644 --- a/src/client/event.rs +++ b/src/client/event.rs @@ -27,7 +27,7 @@ pub enum ClientEvent { StartSneaking, StopSneaking, StartSprinting, - StopSprinting, + StopSprinting, StartJumpWithHorse(u8), StopJumpWithHorse, LeaveBed, diff --git a/src/protocol/codec.rs b/src/protocol/codec.rs index ee00e16..1120f24 100644 --- a/src/protocol/codec.rs +++ b/src/protocol/codec.rs @@ -227,6 +227,10 @@ impl Decoder { self.compression_threshold = Some(threshold); } + pub fn packet_buf(&self) -> &[u8] { + &self.buf + } + pub fn into_inner(self) -> R { self.read }