Update packet inspector and fix packet bugs (#172)

- Implement `encoded_len` for packet enums.
- Fix bug in `has_next_packet` causing a later `unwrap` to panic.
- Add robust `encoded_len` check in `PacketEncoder` for debug builds.
- Split packet inspector regex into separate "inclusive" and "exclusive"
arguments because the `regex` crate does not support negative lookahead.
- Better `Debug` impls for `Ident` and `Compound`.
- Remove read/write timeouts from packet inspector.
This commit is contained in:
Ryan Johnson 2022-12-16 08:23:48 -08:00 committed by GitHub
parent c73abacc98
commit d85b7f5e89
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 133 additions and 69 deletions

View file

@ -24,12 +24,11 @@ cargo r -r -p packet_inspector -- 127.0.0.1:25566 127.0.0.1:25565
The client must connect to `localhost:25566`. You should see the packets in `stdout`.
The third argument to the packet inspector is an optional regular expression compatible with
the [regex](https://docs.rs/regex/latest/regex/) crate. Packets with names that match the regex are printed while those
that don't are ignored. If the regex is not provided then the empty string is assumed and all packets are considered
matching.
The `-i` and `-e` flags accept a regex to filter packets according to their name. The `-i` regex includes matching
packets while the `-e` regex excludes matching packets.
If you're only interested in packets `Foo`, `Bar`, and `Baz`, you can use a regex such as `^(Foo|Bar|Baz)$`.
For instance, if you only want to print the packets `Foo`, `Bar`, and `Baz`, you can use a regex such
as `^(Foo|Bar|Baz)$` with the `-i` flag.
```sh
cargo r -r -p packet_inspector -- 127.0.0.1:25566 127.0.0.1:25565 '^(Foo|Bar|Baz)$'

View file

@ -2,7 +2,6 @@ use std::error::Error;
use std::io::ErrorKind;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use std::{fmt, io};
use anyhow::bail;
@ -14,7 +13,6 @@ use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::Semaphore;
use tokio::task::JoinHandle;
use tokio::time::timeout;
use valence_protocol::packets::c2s::handshake::Handshake;
use valence_protocol::packets::c2s::login::{EncryptionResponse, LoginStart};
use valence_protocol::packets::c2s::play::C2sPlayPacket;
@ -30,15 +28,22 @@ use valence_protocol::{Decode, Encode, Packet, PacketDecoder, PacketEncoder};
struct Cli {
/// The socket address to listen for connections on. This is the address
/// clients should connect to.
client: SocketAddr,
client_addr: SocketAddr,
/// The socket address the proxy will connect to. This is the address of the
/// server.
server: SocketAddr,
/// The optional regular expression to use on packet names. Packet names
server_addr: SocketAddr,
/// An optional regular expression to use on packet names. Packet names
/// matching the regex are printed while those that don't are ignored.
///
/// If no regex is provided, all packets are considered matching.
regex: Option<Regex>,
#[clap(short, long)]
include_regex: Option<Regex>,
/// An optional regular expression to use on packet names. Packet names
/// matching the regex are ignored while those are don't are printed.
///
/// If no regex is provided, all packets are not considered matching.
#[clap(short, long)]
exclude_regex: Option<Regex>,
/// The maximum number of connections allowed to the proxy. By default,
/// there is no limit.
#[clap(short, long)]
@ -56,48 +61,49 @@ struct State {
write: OwnedWriteHalf,
}
const TIMEOUT: Duration = Duration::from_secs(10);
impl State {
pub async fn rw_packet<'a, P>(&'a mut self) -> anyhow::Result<P>
where
P: Decode<'a> + Encode + Packet + fmt::Debug,
{
timeout(TIMEOUT, async {
while !self.dec.has_next_packet()? {
self.dec.reserve(4096);
let mut buf = self.dec.take_capacity();
while !self.dec.has_next_packet()? {
self.dec.reserve(4096);
let mut buf = self.dec.take_capacity();
if self.read.read_buf(&mut buf).await? == 0 {
return Err(io::Error::from(ErrorKind::UnexpectedEof).into());
}
self.dec.queue_bytes(buf);
if self.read.read_buf(&mut buf).await? == 0 {
return Err(io::Error::from(ErrorKind::UnexpectedEof).into());
}
let pkt: P = self.dec.try_next_packet()?.unwrap();
self.dec.queue_bytes(buf);
}
self.enc.append_packet(&pkt)?;
let pkt: P = self.dec.try_next_packet()?.unwrap();
let bytes = self.enc.take();
self.write.write_all(&bytes).await?;
self.enc.append_packet(&pkt)?;
if let Some(r) = &self.cli.regex {
if !r.is_match(pkt.packet_name()) {
return Ok(pkt);
}
let bytes = self.enc.take();
self.write.write_all(&bytes).await?;
if let Some(r) = &self.cli.include_regex {
if !r.is_match(pkt.packet_name()) {
return Ok(pkt);
}
}
if self.cli.timestamp {
let now: DateTime<Utc> = Utc::now();
println!("{now} {pkt:#?}");
} else {
println!("{pkt:#?}");
if let Some(r) = &self.cli.exclude_regex {
if r.is_match(pkt.packet_name()) {
return Ok(pkt);
}
}
Ok(pkt)
})
.await?
if self.cli.timestamp {
let now: DateTime<Utc> = Utc::now();
println!("{now} {pkt:#?}");
} else {
println!("{pkt:#?}");
}
Ok(pkt)
}
}
@ -107,8 +113,8 @@ async fn main() -> Result<(), Box<dyn Error>> {
let sema = Arc::new(Semaphore::new(cli.max_connections.unwrap_or(100_000)));
eprintln!("Waiting for connections on {}", cli.client);
let listen = TcpListener::bind(cli.client).await?;
eprintln!("Waiting for connections on {}", cli.client_addr);
let listen = TcpListener::bind(cli.client_addr).await?;
while let Ok(permit) = sema.clone().acquire_owned().await {
let (client, remote_client_addr) = listen.accept().await?;
@ -133,9 +139,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
}
async fn handle_connection(client: TcpStream, cli: Arc<Cli>) -> anyhow::Result<()> {
eprintln!("Connecting to {}", cli.server);
eprintln!("Connecting to {}", cli.server_addr);
let server = TcpStream::connect(cli.server).await?;
let server = TcpStream::connect(cli.server_addr).await?;
if let Err(e) = server.set_nodelay(true) {
eprintln!("Failed to set TCP_NODELAY: {e}");
@ -226,16 +232,15 @@ async fn handle_connection(client: TcpStream, cli: Arc<Cli>) -> anyhow::Result<(
}
async fn passthrough(mut read: OwnedReadHalf, mut write: OwnedWriteHalf) -> anyhow::Result<()> {
let mut buf = vec![0u8; 8192].into_boxed_slice();
let mut buf = Box::new([0u8; 8192]);
loop {
let bytes_read = read.read(&mut buf).await?;
let bytes_read = read.read(buf.as_mut_slice()).await?;
let bytes = &mut buf[..bytes_read];
if bytes.is_empty() {
break;
break Ok(());
}
write.write_all(bytes).await?;
}
Ok(())
}

View file

@ -79,7 +79,6 @@ where
Ok(self
.dec
.try_next_packet()?
// TODO: this panicked after a timeout.
.expect("decoder said it had another packet"))
// The following is what I want to write but can't due to borrow

View file

@ -1,4 +1,5 @@
use std::borrow::Borrow;
use std::fmt;
use std::hash::Hash;
use std::iter::FusedIterator;
use std::ops::{Index, IndexMut};
@ -7,7 +8,7 @@ use crate::to_binary_writer::encoded_len;
use crate::Value;
/// A map type with [`String`] keys and [`Value`] values.
#[derive(Clone, PartialEq, Default, Debug)]
#[derive(Clone, PartialEq, Default)]
pub struct Compound {
map: Map,
}
@ -34,6 +35,12 @@ impl Compound {
}
}
impl fmt::Debug for Compound {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.map.fmt(f)
}
}
impl Compound {
pub fn new() -> Self {
Self { map: Map::new() }

View file

@ -53,6 +53,24 @@ impl PacketEncoder {
) -> Result<()> {
let data_len = pkt.encoded_len();
#[cfg(debug_assertions)]
{
use crate::byte_counter::ByteCounter;
let mut counter = ByteCounter::new();
pkt.encode(&mut counter)?;
let actual = counter.0;
assert_eq!(
actual,
data_len,
"actual encoded size of {} packet differs from reported size (actual = {actual}, \
reported = {data_len})",
pkt.packet_name()
);
}
#[cfg(feature = "compression")]
if let Some(threshold) = self.compression_threshold {
use flate2::write::ZlibEncoder;
@ -112,15 +130,6 @@ impl PacketEncoder {
VarInt(packet_len as i32).encode(&mut slice)?;
VarInt(0).encode(&mut slice)?;
pkt.encode(&mut slice)?;
debug_assert!(
slice.is_empty(),
"actual size of {} packet differs from reported size (actual = {}, \
reported = {})",
pkt.packet_name(),
data_len - slice.len(),
data_len,
);
}
}
@ -293,7 +302,7 @@ impl PacketDecoder {
};
ensure!(
packet_len <= MAX_PACKET_SIZE,
(0..=MAX_PACKET_SIZE).contains(&packet_len),
"packet length of {packet_len} is out of bounds"
);
@ -353,7 +362,14 @@ impl PacketDecoder {
let mut r = &self.buf[self.cursor..];
match VarInt::decode_partial(&mut r) {
Ok(_) => Ok(true),
Ok(packet_len) => {
ensure!(
(0..=MAX_PACKET_SIZE).contains(&packet_len),
"packet length of {packet_len} is out of bounds"
);
Ok(r.len() >= packet_len as usize)
}
Err(VarIntDecodeError::Incomplete) => Ok(false),
Err(VarIntDecodeError::TooLarge) => bail!("malformed packet length VarInt"),
}

View file

@ -34,7 +34,7 @@ use crate::{nbt, Decode, Encode};
/// string is wrapped in `Ident` must return the same value.
///
/// [borrow]: std::borrow::Borrow::borrow
#[derive(Copy, Clone, Debug)]
#[derive(Copy, Clone)]
pub struct Ident<S> {
string: S,
path_start: usize,
@ -120,6 +120,12 @@ impl<'a, S: ?Sized> Ident<&'a S> {
}
}
impl<S: fmt::Debug> fmt::Debug for Ident<S> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
self.string.fmt(f)
}
}
impl<'a> From<Ident<&'a str>> for Ident<String> {
fn from(value: Ident<&'a str>) -> Self {
value.to_owned_ident()

View file

@ -53,7 +53,13 @@ macro_rules! packet_enum {
}
fn encoded_len(&self) -> usize {
todo!()
match self {
$(
Self::$packet(pkt) => {
pkt.encoded_len()
}
)*
}
}
}
@ -81,6 +87,16 @@ macro_rules! packet_enum {
}
}
}
impl<$enum_life> std::fmt::Debug for $enum_name<$enum_life> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
$(
Self::$packet(pkt) => pkt.fmt(f),
)*
}
}
}
};
// No lifetime on the enum in this case.
(
@ -122,7 +138,13 @@ macro_rules! packet_enum {
}
fn encoded_len(&self) -> usize {
todo!()
match self {
$(
Self::$packet(pkt) => {
pkt.encoded_len()
}
)*
}
}
}
@ -150,6 +172,16 @@ macro_rules! packet_enum {
}
}
}
impl std::fmt::Debug for $enum_name {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
$(
Self::$packet(pkt) => pkt.fmt(f),
)*
}
}
}
}
}

View file

@ -39,7 +39,7 @@ pub mod handshake {
}
packet_enum! {
#[derive(Clone, Debug)]
#[derive(Clone)]
C2sHandshakePacket<'a> {
Handshake<'a>
}
@ -60,7 +60,7 @@ pub mod status {
}
packet_enum! {
#[derive(Clone, Debug)]
#[derive(Clone)]
C2sStatusPacket {
StatusRequest,
PingRequest,
@ -94,7 +94,7 @@ pub mod login {
}
packet_enum! {
#[derive(Clone, Debug)]
#[derive(Clone)]
C2sLoginPacket<'a> {
LoginStart<'a>,
EncryptionResponse<'a>,
@ -495,7 +495,7 @@ pub mod play {
}
packet_enum! {
#[derive(Clone, Debug)]
#[derive(Clone)]
C2sPlayPacket<'a> {
ConfirmTeleport,
QueryBlockEntityTag,

View file

@ -34,7 +34,7 @@ pub mod status {
}
packet_enum! {
#[derive(Clone, Debug)]
#[derive(Clone)]
S2cStatusPacket<'a> {
StatusResponse<'a>,
PingResponse,
@ -82,7 +82,7 @@ pub mod login {
}
packet_enum! {
#[derive(Clone, Debug)]
#[derive(Clone)]
S2cLoginPacket<'a> {
DisconnectLogin,
EncryptionRequest<'a>,
@ -680,7 +680,7 @@ pub mod play {
}
packet_enum! {
#[derive(Clone, Debug)]
#[derive(Clone)]
S2cPlayPacket<'a> {
SpawnEntity,
SpawnExperienceOrb,