Implement outgoing packet buffering

This commit is contained in:
Ryan 2022-08-05 23:10:25 -07:00
parent 49d63a39c0
commit fb9220bf58
4 changed files with 83 additions and 33 deletions

View file

@ -36,7 +36,7 @@ use crate::protocol_inner::packets::s2c::play::{
Rotate, RotateAndMoveRelative, S2cPlayPacket, UnloadChunk, UpdateSubtitle, UpdateTitle, Rotate, RotateAndMoveRelative, S2cPlayPacket, UnloadChunk, UpdateSubtitle, UpdateTitle,
}; };
use crate::protocol_inner::{BoundedInt, ByteAngle, Nbt, RawBytes, VarInt}; use crate::protocol_inner::{BoundedInt, ByteAngle, Nbt, RawBytes, VarInt};
use crate::server::{C2sPacketChannels, NewClientData, SharedServer}; use crate::server::{C2sPacketChannels, NewClientData, S2cPlayMessage, SharedServer};
use crate::slotmap::{Key, SlotMap}; use crate::slotmap::{Key, SlotMap};
use crate::text::Text; use crate::text::Text;
use crate::util::{chunks_in_view_distance, is_chunk_in_view_distance}; use crate::util::{chunks_in_view_distance, is_chunk_in_view_distance};
@ -1275,12 +1275,14 @@ impl<C: Config> Client<C> {
self.player_data.clear_modifications(); self.player_data.clear_modifications();
self.old_position = self.position; self.old_position = self.position;
self.bits.set_created_this_tick(false); self.bits.set_created_this_tick(false);
send_packet(&mut self.send, S2cPlayMessage::Flush);
} }
} }
type SendOpt = Option<Sender<S2cPlayPacket>>; type SendOpt = Option<Sender<S2cPlayMessage>>;
fn send_packet(send_opt: &mut SendOpt, pkt: impl Into<S2cPlayPacket>) { fn send_packet(send_opt: &mut SendOpt, pkt: impl Into<S2cPlayMessage>) {
if let Some(send) = send_opt { if let Some(send) = send_opt {
match send.try_send(pkt.into()) { match send.try_send(pkt.into()) {
Err(TrySendError::Full(_)) => { Err(TrySendError::Full(_)) => {

View file

@ -1,9 +1,3 @@
mod byte_angle;
pub mod codec;
pub mod packets;
mod var_int;
mod var_long;
use std::io::{Read, Write}; use std::io::{Read, Write};
use std::mem; use std::mem;
@ -21,8 +15,16 @@ use vek::{Vec2, Vec3, Vec4};
use crate::entity::EntityId; use crate::entity::EntityId;
mod byte_angle;
pub mod codec;
pub mod packets;
mod var_int;
mod var_long;
/// Types that can be written to the Minecraft protocol. /// Types that can be written to the Minecraft protocol.
pub trait Encode { pub trait Encode {
/// This function must be pure. In other words, consecutive calls to
/// `encode` must write the exact same sequence of bytes.
fn encode(&self, w: &mut impl Write) -> anyhow::Result<()>; fn encode(&self, w: &mut impl Write) -> anyhow::Result<()>;
} }

View file

@ -1,4 +1,5 @@
/// Reading and writing whole packets. //! Reading and writing whole packets.
use std::io::Read; use std::io::Read;
use std::time::Duration; use std::time::Duration;
@ -36,24 +37,22 @@ impl<W: AsyncWrite + Unpin> Encoder<W> {
} }
} }
pub async fn write_packet(&mut self, packet: &impl EncodePacket) -> anyhow::Result<()> { /// Queues a packet to be written to the writer.
timeout(self.timeout, self.write_packet_impl(packet)).await? ///
} /// To write all queued packets, call [`Self::flush`].
pub fn queue_packet(&mut self, packet: &(impl EncodePacket + ?Sized)) -> anyhow::Result<()> {
async fn write_packet_impl(&mut self, packet: &impl EncodePacket) -> anyhow::Result<()> { let start_len = self.buf.len();
self.buf.clear();
packet.encode_packet(&mut self.buf)?; packet.encode_packet(&mut self.buf)?;
let data_len = self.buf.len(); let data_len = self.buf.len() - start_len;
ensure!(data_len <= i32::MAX as usize, "bad packet data length"); ensure!(data_len <= i32::MAX as usize, "bad packet data length");
if let Some(threshold) = self.compression_threshold { if let Some(threshold) = self.compression_threshold {
if data_len >= threshold as usize { if data_len >= threshold as usize {
let mut z = ZlibEncoder::new(self.buf.as_slice(), Compression::best()); let mut z = ZlibEncoder::new(&self.buf[start_len..], Compression::best());
self.compress_buf.clear();
z.read_to_end(&mut self.compress_buf)?; z.read_to_end(&mut self.compress_buf)?;
let data_len_len = VarInt(data_len as i32).written_size(); let data_len_len = VarInt(data_len as i32).written_size();
@ -61,18 +60,21 @@ impl<W: AsyncWrite + Unpin> Encoder<W> {
ensure!(packet_len <= MAX_PACKET_SIZE as usize, "bad packet length"); ensure!(packet_len <= MAX_PACKET_SIZE as usize, "bad packet length");
self.buf.clear(); self.buf.truncate(start_len);
VarInt(packet_len as i32).encode(&mut self.buf)?; VarInt(packet_len as i32).encode(&mut self.buf)?;
VarInt(data_len as i32).encode(&mut self.buf)?; VarInt(data_len as i32).encode(&mut self.buf)?;
self.buf.extend_from_slice(&self.compress_buf); self.buf.extend_from_slice(&self.compress_buf);
self.compress_buf.clear();
} else { } else {
let packet_len = VarInt(0).written_size() + data_len; let packet_len = VarInt(0).written_size() + data_len;
ensure!(packet_len <= MAX_PACKET_SIZE as usize, "bad packet length"); ensure!(packet_len <= MAX_PACKET_SIZE as usize, "bad packet length");
self.buf.clear(); self.buf.truncate(start_len);
VarInt(packet_len as i32).encode(&mut self.buf)?; VarInt(packet_len as i32).encode(&mut self.buf)?;
VarInt(0).encode(&mut self.buf)?; VarInt(0).encode(&mut self.buf)?; // 0 for no compression.
packet.encode_packet(&mut self.buf)?; packet.encode_packet(&mut self.buf)?;
} }
} else { } else {
@ -80,19 +82,38 @@ impl<W: AsyncWrite + Unpin> Encoder<W> {
ensure!(packet_len <= MAX_PACKET_SIZE as usize, "bad packet length"); ensure!(packet_len <= MAX_PACKET_SIZE as usize, "bad packet length");
self.buf.clear(); self.buf.truncate(start_len);
VarInt(packet_len as i32).encode(&mut self.buf)?; VarInt(packet_len as i32).encode(&mut self.buf)?;
packet.encode_packet(&mut self.buf)?; packet.encode_packet(&mut self.buf)?;
} }
Ok(())
}
/// Writes all queued packets to the writer.
pub async fn flush(&mut self) -> anyhow::Result<()> {
if !self.buf.is_empty() {
if let Some(cipher) = &mut self.cipher { if let Some(cipher) = &mut self.cipher {
cipher.encrypt(&mut self.buf); cipher.encrypt(&mut self.buf);
} }
self.write.write_all(&self.buf).await?; timeout(self.timeout, self.write.write_all(&self.buf)).await??;
self.buf.clear();
}
Ok(()) Ok(())
} }
/// Queue one packet and then flush the buffer.
pub async fn write_packet(
&mut self,
packet: &(impl EncodePacket + ?Sized),
) -> anyhow::Result<()> {
self.queue_packet(packet)?;
self.flush().await
}
pub fn enable_encryption(&mut self, key: &[u8; 16]) { pub fn enable_encryption(&mut self, key: &[u8; 16]) {
self.cipher = Some(NewCipher::new(key.into(), key.into())); self.cipher = Some(NewCipher::new(key.into(), key.into()));
} }
@ -296,7 +317,7 @@ mod tests {
} }
async fn send_test_packet(w: &mut Encoder<TcpStream>) { async fn send_test_packet(w: &mut Encoder<TcpStream>) {
w.write_packet(&TestPacket { w.queue_packet(&TestPacket {
first: "abcdefghijklmnopqrstuvwxyz".into(), first: "abcdefghijklmnopqrstuvwxyz".into(),
second: vec![0x1234, 0xabcd], second: vec![0x1234, 0xabcd],
third: 0x1122334455667788, third: 0x1122334455667788,

View file

@ -137,8 +137,23 @@ struct NewClientMessage {
/// The result type returned from [`start_server`]. /// The result type returned from [`start_server`].
pub type ShutdownResult = Result<(), Box<dyn Error + Send + Sync + 'static>>; pub type ShutdownResult = Result<(), Box<dyn Error + Send + Sync + 'static>>;
pub(crate) type S2cPacketChannels = (Sender<C2sPlayPacket>, Receiver<S2cPlayPacket>); pub(crate) type S2cPacketChannels = (Sender<C2sPlayPacket>, Receiver<S2cPlayMessage>);
pub(crate) type C2sPacketChannels = (Sender<S2cPlayPacket>, Receiver<C2sPlayPacket>); pub(crate) type C2sPacketChannels = (Sender<S2cPlayMessage>, Receiver<C2sPlayPacket>);
/// Messages sent to packet encoders.
#[derive(Clone, Debug)]
pub(crate) enum S2cPlayMessage {
/// Queue a play packet for sending.
Queue(S2cPlayPacket),
/// Instructs the encoder to flush all queued packets to the TCP stream.
Flush,
}
impl<P: Into<S2cPlayPacket>> From<P> for S2cPlayMessage {
fn from(pkt: P) -> Self {
Self::Queue(pkt.into())
}
}
impl<C: Config> SharedServer<C> { impl<C: Config> SharedServer<C> {
/// Gets a reference to the config object used to start the server. /// Gets a reference to the config object used to start the server.
@ -758,12 +773,22 @@ async fn handle_play<C: Config>(
let Codec { mut enc, mut dec } = c; let Codec { mut enc, mut dec } = c;
tokio::spawn(async move { tokio::spawn(async move {
while let Ok(pkt) = packet_rx.recv_async().await { while let Ok(msg) = packet_rx.recv_async().await {
if let Err(e) = enc.write_packet(&pkt).await { match msg {
log::debug!("error while sending play packet: {e:#}"); S2cPlayMessage::Queue(pkt) => {
if let Err(e) = enc.queue_packet(&pkt) {
log::debug!("error while queueing play packet: {e:#}");
break; break;
} }
} }
S2cPlayMessage::Flush => {
if let Err(e) = enc.flush().await {
log::debug!("error while flushing packet queue: {e:#}");
break;
}
}
}
}
}); });
loop { loop {