mirror of
https://github.com/italicsjenga/valence.git
synced 2025-01-11 15:21:31 +11:00
Make the packet inspector error resistant
This commit is contained in:
parent
c4590a45fc
commit
0f8b906265
|
@ -1,8 +1,9 @@
|
|||
use std::error::Error;
|
||||
use std::fmt;
|
||||
use std::io::ErrorKind;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use std::{fmt, io};
|
||||
|
||||
use anyhow::bail;
|
||||
use chrono::{DateTime, Utc};
|
||||
|
@ -11,7 +12,7 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
|||
use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
|
||||
use tokio::net::{TcpListener, TcpStream};
|
||||
use tokio::sync::Semaphore;
|
||||
use valence::protocol::codec::{Decoder, Encoder};
|
||||
use valence::protocol::codec::Decoder;
|
||||
use valence::protocol::packets::handshake::{Handshake, HandshakeNextState};
|
||||
use valence::protocol::packets::login::c2s::{EncryptionResponse, LoginStart};
|
||||
use valence::protocol::packets::login::s2c::{LoginSuccess, S2cLoginPacket};
|
||||
|
@ -20,6 +21,7 @@ use valence::protocol::packets::play::s2c::S2cPlayPacket;
|
|||
use valence::protocol::packets::status::c2s::{PingRequest, StatusRequest};
|
||||
use valence::protocol::packets::status::s2c::{PongResponse, StatusResponse};
|
||||
use valence::protocol::packets::{DecodePacket, EncodePacket};
|
||||
use valence::protocol::{Encode, VarInt};
|
||||
|
||||
#[derive(Parser, Clone, Debug)]
|
||||
#[clap(author, version, about)]
|
||||
|
@ -35,7 +37,7 @@ struct Cli {
|
|||
#[clap(short, long)]
|
||||
max_connections: Option<usize>,
|
||||
|
||||
/// When enabled, prints a timestamp before each packet.
|
||||
/// Print a timestamp before each packet.
|
||||
#[clap(short, long)]
|
||||
timestamp: bool,
|
||||
}
|
||||
|
@ -53,12 +55,22 @@ impl Cli {
|
|||
async fn rw_packet<P: DecodePacket + EncodePacket>(
|
||||
&self,
|
||||
read: &mut Decoder<OwnedReadHalf>,
|
||||
write: &mut Encoder<OwnedWriteHalf>,
|
||||
write: &mut OwnedWriteHalf,
|
||||
) -> anyhow::Result<P> {
|
||||
let pkt = read.read_packet().await?;
|
||||
self.print(&pkt);
|
||||
write.write_packet(&pkt).await?;
|
||||
Ok(pkt)
|
||||
let pkt = read.read_packet().await;
|
||||
|
||||
if let Ok(pkt) = &pkt {
|
||||
self.print(pkt);
|
||||
}
|
||||
|
||||
let mut len_buf = [0u8; VarInt::MAX_SIZE];
|
||||
let len = VarInt(read.packet_buf().len() as i32);
|
||||
len.encode(&mut len_buf.as_mut_slice())?;
|
||||
|
||||
write.write_all(&len_buf[..len.written_size()]).await?;
|
||||
write.write_all(read.packet_buf()).await?;
|
||||
|
||||
pkt
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -96,16 +108,14 @@ async fn handle_connection(client: TcpStream, cli: Cli) -> anyhow::Result<()> {
|
|||
|
||||
let server = TcpStream::connect(cli.server).await?;
|
||||
|
||||
let (client_read, client_write) = client.into_split();
|
||||
let (server_read, server_write) = server.into_split();
|
||||
let (client_read, mut client_write) = client.into_split();
|
||||
let (server_read, mut server_write) = server.into_split();
|
||||
|
||||
let timeout = Duration::from_secs(10);
|
||||
|
||||
let mut client_read = Decoder::new(client_read, timeout);
|
||||
let mut client_write = Encoder::new(client_write, timeout);
|
||||
|
||||
let mut server_read = Decoder::new(server_read, timeout);
|
||||
let mut server_write = Encoder::new(server_write, timeout);
|
||||
|
||||
let handshake: Handshake = cli.rw_packet(&mut client_read, &mut server_write).await?;
|
||||
|
||||
|
@ -136,16 +146,14 @@ async fn handle_connection(client: TcpStream, cli: Cli) -> anyhow::Result<()> {
|
|||
eprintln!("Encryption was enabled! I can't see what's going on anymore.");
|
||||
|
||||
return tokio::select! {
|
||||
c2s = passthrough(client_read.into_inner(), server_write.into_inner()) => c2s,
|
||||
s2c = passthrough(server_read.into_inner(), client_write.into_inner()) => s2c,
|
||||
c2s = passthrough(client_read.into_inner(), server_write) => c2s,
|
||||
s2c = passthrough(server_read.into_inner(), client_write) => s2c,
|
||||
};
|
||||
}
|
||||
S2cLoginPacket::SetCompression(pkt) => {
|
||||
let threshold = pkt.threshold.0 as u32;
|
||||
client_read.enable_compression(threshold);
|
||||
client_write.enable_compression(threshold);
|
||||
server_read.enable_compression(threshold);
|
||||
server_write.enable_compression(threshold);
|
||||
|
||||
cli.rw_packet::<LoginSuccess>(&mut server_read, &mut client_write)
|
||||
.await?;
|
||||
|
@ -159,15 +167,33 @@ async fn handle_connection(client: TcpStream, cli: Cli) -> anyhow::Result<()> {
|
|||
|
||||
let c2s = async {
|
||||
loop {
|
||||
cli.rw_packet::<C2sPlayPacket>(&mut client_read, &mut server_write)
|
||||
.await?;
|
||||
if let Err(e) = cli
|
||||
.rw_packet::<C2sPlayPacket>(&mut client_read, &mut server_write)
|
||||
.await
|
||||
{
|
||||
if let Some(e) = e.downcast_ref::<io::Error>() {
|
||||
if e.kind() == ErrorKind::UnexpectedEof {
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
eprintln!("Error while decoding serverbound packet: {e:#}");
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let s2c = async {
|
||||
loop {
|
||||
cli.rw_packet::<S2cPlayPacket>(&mut server_read, &mut client_write)
|
||||
.await?;
|
||||
if let Err(e) = cli
|
||||
.rw_packet::<S2cPlayPacket>(&mut server_read, &mut client_write)
|
||||
.await
|
||||
{
|
||||
if let Some(e) = e.downcast_ref::<io::Error>() {
|
||||
if e.kind() == ErrorKind::UnexpectedEof {
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
eprintln!("Error while decoding clientbound packet: {e:#}");
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
// TODO: https://github.com/rust-lang/rust/issues/88581 for div_ceil
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::collections::HashMap;
|
||||
use std::io::Write;
|
||||
use std::iter::FusedIterator;
|
||||
|
||||
|
@ -39,7 +39,7 @@ impl Chunks {
|
|||
Entry::Occupied(mut oe) => {
|
||||
oe.insert(chunk);
|
||||
oe.into_mut()
|
||||
},
|
||||
}
|
||||
Entry::Vacant(ve) => ve.insert(chunk),
|
||||
}
|
||||
}
|
||||
|
|
|
@ -227,6 +227,10 @@ impl<R: AsyncRead + Unpin> Decoder<R> {
|
|||
self.compression_threshold = Some(threshold);
|
||||
}
|
||||
|
||||
pub fn packet_buf(&self) -> &[u8] {
|
||||
&self.buf
|
||||
}
|
||||
|
||||
pub fn into_inner(self) -> R {
|
||||
self.read
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue