From c1857e1603242026d61230417b57ea1a7be3dbdf Mon Sep 17 00:00:00 2001 From: Ryan Date: Sat, 6 Aug 2022 03:53:39 -0700 Subject: [PATCH] Implement basic incoming packet buffering There is still room for improvement because of one unnecessary layer of copying. --- src/protocol_inner/codec.rs | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/src/protocol_inner/codec.rs b/src/protocol_inner/codec.rs index 9cc736a..2fb4d72 100644 --- a/src/protocol_inner/codec.rs +++ b/src/protocol_inner/codec.rs @@ -1,4 +1,4 @@ -//! Reading and writing whole packets. +//! Reading and writing packets. use std::io::Read; use std::time::Duration; @@ -10,7 +10,7 @@ use cfb8::Cfb8; use flate2::bufread::{ZlibDecoder, ZlibEncoder}; use flate2::Compression; use log::{log_enabled, Level}; -use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader}; use tokio::time::timeout; use super::packets::{DecodePacket, EncodePacket}; @@ -128,7 +128,7 @@ impl Encoder { } pub struct Decoder { - read: R, + read: BufReader, buf: Vec, decompress_buf: Vec, compression_threshold: Option, @@ -139,7 +139,7 @@ pub struct Decoder { impl Decoder { pub fn new(read: R, timeout: Duration) -> Self { Self { - read, + read: BufReader::new(read), buf: Vec::new(), decompress_buf: Vec::new(), compression_threshold: None, @@ -181,7 +181,7 @@ impl Decoder { // The length of the packet data once uncompressed (zero indicates no // compression). let data_len = VarInt::decode(&mut packet_contents) - .context("reading data length (once uncompressed)")? + .context("reading data length (once decompressed)")? .0; ensure!( @@ -193,13 +193,13 @@ impl Decoder { let mut z = ZlibDecoder::new(&mut packet_contents); self.decompress_buf.resize(data_len as usize, 0); z.read_exact(&mut self.decompress_buf) - .context("uncompressing packet body")?; + .context("decompressing packet body")?; - let mut uncompressed = self.decompress_buf.as_slice(); - let packet = P::decode_packet(&mut uncompressed) - .context("decoding packet after uncompressing")?; + let mut decompressed = self.decompress_buf.as_slice(); + let packet = P::decode_packet(&mut decompressed) + .context("decoding packet after decompressing")?; ensure!( - uncompressed.is_empty(), + decompressed.is_empty(), "packet contents were not read completely" ); packet @@ -254,7 +254,7 @@ impl Decoder { } pub fn into_inner(self) -> R { - self.read + self.read.into_inner() } } @@ -317,7 +317,7 @@ mod tests { } async fn send_test_packet(w: &mut Encoder) { - w.queue_packet(&TestPacket { + w.write_packet(&TestPacket { first: "abcdefghijklmnopqrstuvwxyz".into(), second: vec![0x1234, 0xabcd], third: 0x1122334455667788,