From fb9220bf586b3581396d553003ad9bb0d5dea762 Mon Sep 17 00:00:00 2001 From: Ryan Date: Fri, 5 Aug 2022 23:10:25 -0700 Subject: [PATCH] Implement outgoing packet buffering --- src/client.rs | 8 ++++-- src/protocol_inner.rs | 14 +++++---- src/protocol_inner/codec.rs | 57 +++++++++++++++++++++++++------------ src/server.rs | 37 ++++++++++++++++++++---- 4 files changed, 83 insertions(+), 33 deletions(-) diff --git a/src/client.rs b/src/client.rs index e76617c..aaa5bfc 100644 --- a/src/client.rs +++ b/src/client.rs @@ -36,7 +36,7 @@ use crate::protocol_inner::packets::s2c::play::{ Rotate, RotateAndMoveRelative, S2cPlayPacket, UnloadChunk, UpdateSubtitle, UpdateTitle, }; use crate::protocol_inner::{BoundedInt, ByteAngle, Nbt, RawBytes, VarInt}; -use crate::server::{C2sPacketChannels, NewClientData, SharedServer}; +use crate::server::{C2sPacketChannels, NewClientData, S2cPlayMessage, SharedServer}; use crate::slotmap::{Key, SlotMap}; use crate::text::Text; use crate::util::{chunks_in_view_distance, is_chunk_in_view_distance}; @@ -1275,12 +1275,14 @@ impl Client { self.player_data.clear_modifications(); self.old_position = self.position; self.bits.set_created_this_tick(false); + + send_packet(&mut self.send, S2cPlayMessage::Flush); } } -type SendOpt = Option>; +type SendOpt = Option>; -fn send_packet(send_opt: &mut SendOpt, pkt: impl Into) { +fn send_packet(send_opt: &mut SendOpt, pkt: impl Into) { if let Some(send) = send_opt { match send.try_send(pkt.into()) { Err(TrySendError::Full(_)) => { diff --git a/src/protocol_inner.rs b/src/protocol_inner.rs index 0061a2e..558cb8b 100644 --- a/src/protocol_inner.rs +++ b/src/protocol_inner.rs @@ -1,9 +1,3 @@ -mod byte_angle; -pub mod codec; -pub mod packets; -mod var_int; -mod var_long; - use std::io::{Read, Write}; use std::mem; @@ -21,8 +15,16 @@ use vek::{Vec2, Vec3, Vec4}; use crate::entity::EntityId; +mod byte_angle; +pub mod codec; +pub mod packets; +mod var_int; +mod var_long; + /// Types that can be written to the Minecraft protocol. pub trait Encode { + /// This function must be pure. In other words, consecutive calls to + /// `encode` must write the exact same sequence of bytes. fn encode(&self, w: &mut impl Write) -> anyhow::Result<()>; } diff --git a/src/protocol_inner/codec.rs b/src/protocol_inner/codec.rs index 62cfb6b..9cc736a 100644 --- a/src/protocol_inner/codec.rs +++ b/src/protocol_inner/codec.rs @@ -1,4 +1,5 @@ -/// Reading and writing whole packets. +//! Reading and writing whole packets. + use std::io::Read; use std::time::Duration; @@ -36,24 +37,22 @@ impl Encoder { } } - pub async fn write_packet(&mut self, packet: &impl EncodePacket) -> anyhow::Result<()> { - timeout(self.timeout, self.write_packet_impl(packet)).await? - } - - async fn write_packet_impl(&mut self, packet: &impl EncodePacket) -> anyhow::Result<()> { - self.buf.clear(); + /// Queues a packet to be written to the writer. + /// + /// To write all queued packets, call [`Self::flush`]. + pub fn queue_packet(&mut self, packet: &(impl EncodePacket + ?Sized)) -> anyhow::Result<()> { + let start_len = self.buf.len(); packet.encode_packet(&mut self.buf)?; - let data_len = self.buf.len(); + let data_len = self.buf.len() - start_len; ensure!(data_len <= i32::MAX as usize, "bad packet data length"); if let Some(threshold) = self.compression_threshold { if data_len >= threshold as usize { - let mut z = ZlibEncoder::new(self.buf.as_slice(), Compression::best()); + let mut z = ZlibEncoder::new(&self.buf[start_len..], Compression::best()); - self.compress_buf.clear(); z.read_to_end(&mut self.compress_buf)?; let data_len_len = VarInt(data_len as i32).written_size(); @@ -61,18 +60,21 @@ impl Encoder { ensure!(packet_len <= MAX_PACKET_SIZE as usize, "bad packet length"); - self.buf.clear(); + self.buf.truncate(start_len); + VarInt(packet_len as i32).encode(&mut self.buf)?; VarInt(data_len as i32).encode(&mut self.buf)?; self.buf.extend_from_slice(&self.compress_buf); + self.compress_buf.clear(); } else { let packet_len = VarInt(0).written_size() + data_len; ensure!(packet_len <= MAX_PACKET_SIZE as usize, "bad packet length"); - self.buf.clear(); + self.buf.truncate(start_len); + VarInt(packet_len as i32).encode(&mut self.buf)?; - VarInt(0).encode(&mut self.buf)?; + VarInt(0).encode(&mut self.buf)?; // 0 for no compression. packet.encode_packet(&mut self.buf)?; } } else { @@ -80,19 +82,38 @@ impl Encoder { ensure!(packet_len <= MAX_PACKET_SIZE as usize, "bad packet length"); - self.buf.clear(); + self.buf.truncate(start_len); + VarInt(packet_len as i32).encode(&mut self.buf)?; packet.encode_packet(&mut self.buf)?; } - if let Some(cipher) = &mut self.cipher { - cipher.encrypt(&mut self.buf); + Ok(()) + } + + /// Writes all queued packets to the writer. + pub async fn flush(&mut self) -> anyhow::Result<()> { + if !self.buf.is_empty() { + if let Some(cipher) = &mut self.cipher { + cipher.encrypt(&mut self.buf); + } + + timeout(self.timeout, self.write.write_all(&self.buf)).await??; + self.buf.clear(); } - self.write.write_all(&self.buf).await?; Ok(()) } + /// Queue one packet and then flush the buffer. + pub async fn write_packet( + &mut self, + packet: &(impl EncodePacket + ?Sized), + ) -> anyhow::Result<()> { + self.queue_packet(packet)?; + self.flush().await + } + pub fn enable_encryption(&mut self, key: &[u8; 16]) { self.cipher = Some(NewCipher::new(key.into(), key.into())); } @@ -296,7 +317,7 @@ mod tests { } async fn send_test_packet(w: &mut Encoder) { - w.write_packet(&TestPacket { + w.queue_packet(&TestPacket { first: "abcdefghijklmnopqrstuvwxyz".into(), second: vec![0x1234, 0xabcd], third: 0x1122334455667788, diff --git a/src/server.rs b/src/server.rs index adf9213..e190a9f 100644 --- a/src/server.rs +++ b/src/server.rs @@ -137,8 +137,23 @@ struct NewClientMessage { /// The result type returned from [`start_server`]. pub type ShutdownResult = Result<(), Box>; -pub(crate) type S2cPacketChannels = (Sender, Receiver); -pub(crate) type C2sPacketChannels = (Sender, Receiver); +pub(crate) type S2cPacketChannels = (Sender, Receiver); +pub(crate) type C2sPacketChannels = (Sender, Receiver); + +/// Messages sent to packet encoders. +#[derive(Clone, Debug)] +pub(crate) enum S2cPlayMessage { + /// Queue a play packet for sending. + Queue(S2cPlayPacket), + /// Instructs the encoder to flush all queued packets to the TCP stream. + Flush, +} + +impl> From

for S2cPlayMessage { + fn from(pkt: P) -> Self { + Self::Queue(pkt.into()) + } +} impl SharedServer { /// Gets a reference to the config object used to start the server. @@ -758,10 +773,20 @@ async fn handle_play( let Codec { mut enc, mut dec } = c; tokio::spawn(async move { - while let Ok(pkt) = packet_rx.recv_async().await { - if let Err(e) = enc.write_packet(&pkt).await { - log::debug!("error while sending play packet: {e:#}"); - break; + while let Ok(msg) = packet_rx.recv_async().await { + match msg { + S2cPlayMessage::Queue(pkt) => { + if let Err(e) = enc.queue_packet(&pkt) { + log::debug!("error while queueing play packet: {e:#}"); + break; + } + } + S2cPlayMessage::Flush => { + if let Err(e) = enc.flush().await { + log::debug!("error while flushing packet queue: {e:#}"); + break; + } + } } } });