Implement basic incoming packet buffering

There is still room for improvement because of one unnecessary layer of copying.
This commit is contained in:
Ryan 2022-08-06 03:53:39 -07:00
parent fb9220bf58
commit c1857e1603

View file

@ -1,4 +1,4 @@
//! Reading and writing whole packets. //! Reading and writing packets.
use std::io::Read; use std::io::Read;
use std::time::Duration; use std::time::Duration;
@ -10,7 +10,7 @@ use cfb8::Cfb8;
use flate2::bufread::{ZlibDecoder, ZlibEncoder}; use flate2::bufread::{ZlibDecoder, ZlibEncoder};
use flate2::Compression; use flate2::Compression;
use log::{log_enabled, Level}; use log::{log_enabled, Level};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader};
use tokio::time::timeout; use tokio::time::timeout;
use super::packets::{DecodePacket, EncodePacket}; use super::packets::{DecodePacket, EncodePacket};
@ -128,7 +128,7 @@ impl<W: AsyncWrite + Unpin> Encoder<W> {
} }
pub struct Decoder<R> { pub struct Decoder<R> {
read: R, read: BufReader<R>,
buf: Vec<u8>, buf: Vec<u8>,
decompress_buf: Vec<u8>, decompress_buf: Vec<u8>,
compression_threshold: Option<u32>, compression_threshold: Option<u32>,
@ -139,7 +139,7 @@ pub struct Decoder<R> {
impl<R: AsyncRead + Unpin> Decoder<R> { impl<R: AsyncRead + Unpin> Decoder<R> {
pub fn new(read: R, timeout: Duration) -> Self { pub fn new(read: R, timeout: Duration) -> Self {
Self { Self {
read, read: BufReader::new(read),
buf: Vec::new(), buf: Vec::new(),
decompress_buf: Vec::new(), decompress_buf: Vec::new(),
compression_threshold: None, compression_threshold: None,
@ -181,7 +181,7 @@ impl<R: AsyncRead + Unpin> Decoder<R> {
// The length of the packet data once uncompressed (zero indicates no // The length of the packet data once uncompressed (zero indicates no
// compression). // compression).
let data_len = VarInt::decode(&mut packet_contents) let data_len = VarInt::decode(&mut packet_contents)
.context("reading data length (once uncompressed)")? .context("reading data length (once decompressed)")?
.0; .0;
ensure!( ensure!(
@ -193,13 +193,13 @@ impl<R: AsyncRead + Unpin> Decoder<R> {
let mut z = ZlibDecoder::new(&mut packet_contents); let mut z = ZlibDecoder::new(&mut packet_contents);
self.decompress_buf.resize(data_len as usize, 0); self.decompress_buf.resize(data_len as usize, 0);
z.read_exact(&mut self.decompress_buf) z.read_exact(&mut self.decompress_buf)
.context("uncompressing packet body")?; .context("decompressing packet body")?;
let mut uncompressed = self.decompress_buf.as_slice(); let mut decompressed = self.decompress_buf.as_slice();
let packet = P::decode_packet(&mut uncompressed) let packet = P::decode_packet(&mut decompressed)
.context("decoding packet after uncompressing")?; .context("decoding packet after decompressing")?;
ensure!( ensure!(
uncompressed.is_empty(), decompressed.is_empty(),
"packet contents were not read completely" "packet contents were not read completely"
); );
packet packet
@ -254,7 +254,7 @@ impl<R: AsyncRead + Unpin> Decoder<R> {
} }
pub fn into_inner(self) -> R { pub fn into_inner(self) -> R {
self.read self.read.into_inner()
} }
} }
@ -317,7 +317,7 @@ mod tests {
} }
async fn send_test_packet(w: &mut Encoder<TcpStream>) { async fn send_test_packet(w: &mut Encoder<TcpStream>) {
w.queue_packet(&TestPacket { w.write_packet(&TestPacket {
first: "abcdefghijklmnopqrstuvwxyz".into(), first: "abcdefghijklmnopqrstuvwxyz".into(),
second: vec![0x1234, 0xabcd], second: vec![0x1234, 0xabcd],
third: 0x1122334455667788, third: 0x1122334455667788,