From 740778ec41e51db715fcf742d5e3d3e8ca2fb871 Mon Sep 17 00:00:00 2001 From: Ryan Johnson Date: Tue, 13 Dec 2022 22:56:22 -0800 Subject: [PATCH] Update chunk data packet cache lazily (#170) Previously, the chunk data cache was regenerated at the end of each tick every time a block or biome was modified in a chunk. This causes performance problems in situations where chunks are being modified every tick, like the conway example or redstone machines. This changes things so that the chunk data packet is regenerated only when a client actually needs to load the chunk. This isn't perfect because the regeneration cannot happen in parallel, but the benefits outweigh the costs. (The chunk data packet _is_ generated in parallel when the chunk is first created. There is also some parallelism when clients are loading different chunks. The packet cache is guarded by a `Mutex`.) This has revealed that compression is surprisingly slow. This should be investigated later. --- src/chunk.rs | 228 +++++++++++++++++----------- src/client.rs | 21 ++- src/packet.rs | 6 +- src/player_list.rs | 10 +- src/server.rs | 9 +- src/world.rs | 7 +- valence_protocol/src/array.rs | 45 ++++++ valence_protocol/src/cache.rs | 144 ------------------ valence_protocol/src/lib.rs | 4 +- valence_protocol/src/packets/s2c.rs | 9 +- valence_protocol/src/var_int.rs | 1 + 11 files changed, 234 insertions(+), 250 deletions(-) create mode 100644 valence_protocol/src/array.rs delete mode 100644 valence_protocol/src/cache.rs diff --git a/src/chunk.rs b/src/chunk.rs index f735ab4..29711cc 100644 --- a/src/chunk.rs +++ b/src/chunk.rs @@ -11,6 +11,7 @@ use std::io::Write; use std::iter::FusedIterator; use std::mem; use std::ops::{Deref, DerefMut, Index, IndexMut}; +use std::sync::{Mutex, MutexGuard}; use entity_partition::PartitionCell; use paletted_container::PalettedContainer; @@ -21,7 +22,7 @@ use valence_nbt::compound; use valence_protocol::packets::s2c::play::{ BlockUpdate, ChunkDataAndUpdateLightEncode, 
UpdateSectionBlocksEncode, }; -use valence_protocol::{BlockPos, BlockState, Encode, VarInt, VarLong}; +use valence_protocol::{BlockPos, BlockState, Encode, LengthPrefixedArray, VarInt, VarLong}; use crate::biome::BiomeId; use crate::config::Config; @@ -40,14 +41,21 @@ pub struct Chunks { chunks: FxHashMap>, PartitionCell)>, dimension_height: i32, dimension_min_y: i32, - dummy_sky_light_mask: Box<[u64]>, - /// Sending sky light bits full of ones causes the vanilla client to lag + filler_sky_light_mask: Box<[u64]>, + /// Sending filler light data causes the vanilla client to lag /// less. Hopefully we can remove this in the future. - dummy_sky_light_arrays: Box<[(VarInt, [u8; 2048])]>, + filler_sky_light_arrays: Box<[LengthPrefixedArray]>, + biome_registry_len: usize, + compression_threshold: Option, } impl Chunks { - pub(crate) fn new(dimension_height: i32, dimension_min_y: i32) -> Self { + pub(crate) fn new( + dimension_height: i32, + dimension_min_y: i32, + biome_registry_len: usize, + compression_threshold: Option, + ) -> Self { let section_count = (dimension_height / 16 + 2) as usize; let mut sky_light_mask = vec![0; num::Integer::div_ceil(&section_count, &16)]; @@ -60,8 +68,10 @@ impl Chunks { chunks: FxHashMap::default(), dimension_height, dimension_min_y, - dummy_sky_light_mask: sky_light_mask.into(), - dummy_sky_light_arrays: vec![(VarInt(2048), [0xff; 2048]); section_count].into(), + filler_sky_light_mask: sky_light_mask.into(), + filler_sky_light_arrays: vec![LengthPrefixedArray([0xff; 2048]); section_count].into(), + biome_registry_len, + compression_threshold, } } @@ -259,15 +269,12 @@ impl Chunks { ) } - pub(crate) fn update_caches( - &mut self, - compression_threshold: Option, - biome_registry_len: usize, - ) { + pub(crate) fn update_caches(&mut self) { let min_y = self.dimension_min_y; self.chunks.par_iter_mut().for_each(|(&pos, (chunk, _))| { let Some(chunk) = chunk else { + // There is no chunk at this position. 
return; }; @@ -310,11 +317,11 @@ impl Chunks { let global_y = sect_y as i32 * 16 + (idx / (16 * 16)) as i32 + min_y; let global_z = pos.z * 16 + (idx / 16 % 16) as i32; - let mut writer = PacketWriter { - writer: &mut chunk.cached_update_packets, - threshold: compression_threshold, - scratch: &mut compression_scratch, - }; + let mut writer = PacketWriter::new( + &mut chunk.cached_update_packets, + self.compression_threshold, + &mut compression_scratch, + ); writer .write_packet(&BlockUpdate { @@ -345,11 +352,11 @@ impl Chunks { | (pos.z as i64 & 0x3fffff) << 20 | (sect_y as i64 + min_y.div_euclid(16) as i64) & 0xfffff; - let mut writer = PacketWriter { - writer: &mut chunk.cached_update_packets, - threshold: compression_threshold, - scratch: &mut compression_scratch, - }; + let mut writer = PacketWriter::new( + &mut chunk.cached_update_packets, + self.compression_threshold, + &mut compression_scratch, + ); writer .write_packet(&UpdateSectionBlocksEncode { @@ -367,66 +374,26 @@ impl Chunks { } } - // Regenerate the chunk data packet if the cache was invalidated. - if any_blocks_modified || chunk.any_biomes_modified || chunk.created_this_tick { - // TODO: build chunk data packet cache lazily. - + // Clear the cache if the cache was invalidated. 
+ if any_blocks_modified || chunk.any_biomes_modified { chunk.any_biomes_modified = false; - chunk.cached_init_packet.clear(); - - let mut scratch = vec![]; - - for sect in chunk.sections.iter() { - sect.non_air_count.encode(&mut scratch).unwrap(); - - sect.block_states - .encode_mc_format( - &mut scratch, - |b| b.to_raw().into(), - 4, - 8, - bit_width(BlockState::max_raw().into()), - ) - .unwrap(); - - sect.biomes - .encode_mc_format( - &mut scratch, - |b| b.0.into(), - 0, - 3, - bit_width(biome_registry_len - 1), - ) - .unwrap(); - } - - let mut writer = PacketWriter { - writer: &mut chunk.cached_init_packet, - threshold: compression_threshold, - scratch: &mut compression_scratch, - }; - - writer - .write_packet(&ChunkDataAndUpdateLightEncode { - chunk_x: pos.x, - chunk_z: pos.z, - heightmaps: &compound! { - // TODO: MOTION_BLOCKING heightmap - }, - blocks_and_biomes: &scratch, - block_entities: &[], - trust_edges: true, - sky_light_mask: &self.dummy_sky_light_mask, - block_light_mask: &[], - empty_sky_light_mask: &[], - empty_block_light_mask: &[], - sky_light_arrays: &self.dummy_sky_light_arrays, - block_light_arrays: &[], - }) - .unwrap(); + chunk.cached_init_packet.get_mut().unwrap().clear(); } - debug_assert!(!chunk.cached_init_packet.is_empty()); + // Initialize the chunk data cache on new chunks here so this work can be done + // in parallel. + if chunk.created_this_tick() { + debug_assert!(chunk.cached_init_packet.get_mut().unwrap().is_empty()); + + let _unused: MutexGuard<_> = chunk.get_chunk_data_packet( + &mut compression_scratch, + pos, + self.biome_registry_len, + &self.filler_sky_light_mask, + &self.filler_sky_light_arrays, + self.compression_threshold, + ); + } }); } @@ -692,13 +659,15 @@ pub struct LoadedChunk { pub state: C::ChunkState, sections: Box<[ChunkSection]>, // TODO: block_entities: BTreeMap, - // TODO: rebuild init packet lazily? 
- cached_init_packet: Vec, + cached_init_packet: Mutex>, cached_update_packets: Vec, /// If any of the biomes in this chunk were modified this tick. any_biomes_modified: bool, created_this_tick: bool, deleted: bool, + /// For debugging purposes. + #[cfg(debug_assertions)] + uuid: uuid::Uuid, } impl Deref for LoadedChunk { @@ -716,7 +685,7 @@ impl DerefMut for LoadedChunk { } /// A 16x16x16 meter volume of blocks, biomes, and light in a chunk. -#[derive(Clone)] +#[derive(Clone, Debug)] struct ChunkSection { block_states: PalettedContainer, /// Contains a set bit for every block that has been modified in this @@ -763,11 +732,13 @@ impl LoadedChunk { Self { state, sections: chunk.sections.into(), - cached_init_packet: vec![], + cached_init_packet: Mutex::new(vec![]), cached_update_packets: vec![], any_biomes_modified: false, created_this_tick: true, deleted: false, + #[cfg(debug_assertions)] + uuid: uuid::Uuid::from_u128(rand::random()), } } @@ -795,13 +766,98 @@ impl LoadedChunk { } /// Queues the chunk data packet for this chunk with the given position. - /// /// This will initialize the chunk for the client. pub(crate) fn write_chunk_data_packet( &self, mut writer: impl WritePacket, + scratch: &mut Vec, + pos: ChunkPos, + chunks: &Chunks, ) -> anyhow::Result<()> { - writer.write_bytes(&self.cached_init_packet) + #[cfg(debug_assertions)] + assert_eq!( + chunks[pos].uuid, self.uuid, + "chunks and/or position arguments are incorrect" + ); + + let bytes = self.get_chunk_data_packet( + scratch, + pos, + chunks.biome_registry_len, + &chunks.filler_sky_light_mask, + &chunks.filler_sky_light_arrays, + chunks.compression_threshold, + ); + + writer.write_bytes(&bytes) + } + + /// Gets the bytes of the cached chunk data packet, initializing the cache + /// if it is empty. 
+ fn get_chunk_data_packet( + &self, + scratch: &mut Vec, + pos: ChunkPos, + biome_registry_len: usize, + filler_sky_light_mask: &[u64], + filler_sky_light_arrays: &[LengthPrefixedArray], + compression_threshold: Option, + ) -> MutexGuard> { + let mut lck = self.cached_init_packet.lock().unwrap(); + + if lck.is_empty() { + scratch.clear(); + + for sect in self.sections.iter() { + sect.non_air_count.encode(&mut *scratch).unwrap(); + + sect.block_states + .encode_mc_format( + &mut *scratch, + |b| b.to_raw().into(), + 4, + 8, + bit_width(BlockState::max_raw().into()), + ) + .unwrap(); + + sect.biomes + .encode_mc_format( + &mut *scratch, + |b| b.0.into(), + 0, + 3, + bit_width(biome_registry_len - 1), + ) + .unwrap(); + } + + let mut compression_scratch = vec![]; + + let mut writer = + PacketWriter::new(&mut *lck, compression_threshold, &mut compression_scratch); + + writer + .write_packet(&ChunkDataAndUpdateLightEncode { + chunk_x: pos.x, + chunk_z: pos.z, + heightmaps: &compound! { + // TODO: MOTION_BLOCKING heightmap + }, + blocks_and_biomes: scratch, + block_entities: &[], + trust_edges: true, + sky_light_mask: filler_sky_light_mask, + block_light_mask: &[], + empty_sky_light_mask: &[], + empty_block_light_mask: &[], + sky_light_arrays: filler_sky_light_arrays, + block_light_arrays: &[], + }) + .unwrap(); + } + + lck } /// Queues block change packets for this chunk. @@ -922,7 +978,7 @@ impl Chunk for LoadedChunk { sect.biomes.optimize(); } - self.cached_init_packet.shrink_to_fit(); + self.cached_init_packet.get_mut().unwrap().shrink_to_fit(); self.cached_update_packets.shrink_to_fit(); } } diff --git a/src/client.rs b/src/client.rs index 07e7a64..1092fe1 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1129,7 +1129,12 @@ impl Client { } (true, false) => { // Chunk needs initialization. Send packet to load it. 
- chunk.write_chunk_data_packet(&mut *send)?; + chunk.write_chunk_data_packet( + &mut *send, + &mut self.scratch, + pos, + &old_world.chunks, + )?; // Don't assert that the chunk is already loaded in this case. // Chunks are allowed to be overwritten and their "created this @@ -1258,7 +1263,12 @@ impl Client { if let Some((chunk, cell)) = world.chunks.chunk_and_cell(pos) { if let Some(chunk) = chunk { if !chunk.deleted() { - chunk.write_chunk_data_packet(&mut *send)?; + chunk.write_chunk_data_packet( + &mut *send, + &mut self.scratch, + pos, + &world.chunks, + )?; #[cfg(debug_assertions)] assert!(self.loaded_chunks.insert(pos)); @@ -1327,7 +1337,12 @@ impl Client { if let Some((chunk, cell)) = world.chunks.chunk_and_cell(pos) { if let Some(chunk) = chunk { if !chunk.deleted() { - chunk.write_chunk_data_packet(&mut *send)?; + chunk.write_chunk_data_packet( + &mut *send, + &mut self.scratch, + pos, + &world.chunks, + )?; #[cfg(debug_assertions)] assert!(self.loaded_chunks.insert(pos)); diff --git a/src/packet.rs b/src/packet.rs index 57bef43..c89c4f8 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -24,9 +24,9 @@ impl WritePacket for &mut W { } pub struct PacketWriter<'a, W> { - pub writer: W, - pub threshold: Option, - pub scratch: &'a mut Vec, + writer: W, + threshold: Option, + scratch: &'a mut Vec, } impl<'a, W: Write> PacketWriter<'a, W> { diff --git a/src/player_list.rs b/src/player_list.rs index 8b7710b..3b0f787 100644 --- a/src/player_list.rs +++ b/src/player_list.rs @@ -80,11 +80,11 @@ impl PlayerLists { for pl in self.slab.iter_mut() { pl.cached_update_packets.clear(); - let mut writer = PacketWriter { - writer: &mut pl.cached_update_packets, - threshold: compression_threshold, - scratch: &mut scratch, - }; + let mut writer = PacketWriter::new( + &mut pl.cached_update_packets, + compression_threshold, + &mut scratch, + ); if !pl.removed.is_empty() { writer diff --git a/src/server.rs b/src/server.rs index 59691af..c09588d 100644 --- a/src/server.rs +++ 
b/src/server.rs @@ -200,6 +200,12 @@ impl SharedServer { &self.0.connection_mode } + /// Gets the compression threshold for packets. `None` indicates no + /// compression. + pub fn compression_threshold(&self) -> Option { + self.0.compression_threshold + } + /// Gets the maximum number of connections allowed to the server at once. pub fn max_connections(&self) -> usize { self.0.max_connections @@ -430,7 +436,6 @@ fn do_update_loop(server: &mut Server) -> ShutdownResult { let mut tick_start = Instant::now(); let shared = server.shared.clone(); - let biome_registry_len = shared.0.biomes.len(); let threshold = shared.0.compression_threshold; loop { @@ -471,7 +476,7 @@ fn do_update_loop(server: &mut Server) -> ShutdownResult { update_entity_partition(&mut server.entities, &mut server.worlds, threshold); for (_, world) in server.worlds.iter_mut() { - world.chunks.update_caches(threshold, biome_registry_len); + world.chunks.update_caches(); } server.player_lists.update_caches(threshold); diff --git a/src/world.rs b/src/world.rs index 6c6be22..35c73b8 100644 --- a/src/world.rs +++ b/src/world.rs @@ -53,7 +53,12 @@ impl Worlds { let (id, world) = self.slab.insert(World { state, - chunks: Chunks::new(dim.height, dim.min_y), + chunks: Chunks::new( + dim.height, + dim.min_y, + self.shared.biomes().len(), + self.shared.compression_threshold(), + ), dimension, deleted: false, }); diff --git a/valence_protocol/src/array.rs b/valence_protocol/src/array.rs new file mode 100644 index 0000000..85d56d6 --- /dev/null +++ b/valence_protocol/src/array.rs @@ -0,0 +1,45 @@ +use std::io::Write; + +use anyhow::ensure; + +use crate::{Decode, Encode, VarInt}; + +/// A fixed-size array encoded and decoded with a [`VarInt`] length prefix. +/// +/// This is used when the length of the array is known statically, but a +/// length prefix is needed anyway. 
+#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)] +#[repr(transparent)] +pub struct LengthPrefixedArray(pub [T; N]); + +impl Encode for LengthPrefixedArray { + fn encode(&self, mut w: impl Write) -> anyhow::Result<()> { + VarInt(N as i32).encode(&mut w)?; + self.0.encode(w) + } + + fn encoded_len(&self) -> usize { + VarInt(N as i32).encoded_len() + self.0.encoded_len() + } +} + +impl<'a, T: Decode<'a>, const N: usize> Decode<'a> for LengthPrefixedArray { + fn decode(r: &mut &'a [u8]) -> anyhow::Result { + let len = VarInt::decode(r)?.0; + ensure!(len == N as i32, "unexpected array length of {len}"); + + <[T; N]>::decode(r).map(LengthPrefixedArray) + } +} + +impl From<[T; N]> for LengthPrefixedArray { + fn from(value: [T; N]) -> Self { + Self(value) + } +} + +impl From> for [T; N] { + fn from(value: LengthPrefixedArray) -> Self { + value.0 + } +} diff --git a/valence_protocol/src/cache.rs b/valence_protocol/src/cache.rs deleted file mode 100644 index 67e3d22..0000000 --- a/valence_protocol/src/cache.rs +++ /dev/null @@ -1,144 +0,0 @@ -use std::io::Write; -use std::marker::PhantomData; -use std::mem; - -use anyhow::anyhow; - -use crate::{Decode, Encode, Result}; - -/// Contains both a value of `T` and an [`EncodedBuf`] to ensure the -/// buffer is updated when `T` is modified[^note]. -/// -/// The `Encode` implementation for `Cached` encodes only the contained -/// `EncodedBuf`. -/// -/// Use this type when you want `Encode` to be cached but you also need to read -/// from the value of `T`. If the value of `T` is write-only, consider using -/// `EncodedBuf` instead. -/// -/// [`EncodedBuf`]: EncodedBuf -/// [^note]: Assuming `T` does not use internal mutability. 
-#[derive(Debug)] -pub struct Cached { - val: T, - buf: EncodedBuf, -} - -impl Cached { - pub fn new(val: T) -> Self { - let buf = EncodedBuf::new(&val); - - Self { val, buf } - } - - pub fn get(&self) -> &T { - &self.val - } - - pub fn buf(&self) -> &EncodedBuf { - &self.buf - } - - /// Provides a mutable reference to the contained `T` for modification. The - /// buffer is re-encoded when the closure returns. - pub fn modify(&mut self, f: impl FnOnce(&mut T) -> U) -> U { - let u = f(&mut self.val); - self.buf.set(&self.val); - u - } - - pub fn replace(&mut self, new: T) -> T { - self.modify(|old| mem::replace(old, new)) - } - - pub fn into_inner(self) -> (T, EncodedBuf) { - (self.val, self.buf) - } -} - -impl Encode for Cached -where - T: Encode, -{ - fn encode(&self, w: impl Write) -> Result<()> { - self.buf.encode(w) - } -} - -/// The `Decode` implementation for `Cached` exists for the sake of -/// completeness, but you probably shouldn't need to use it. -impl<'a, T> Decode<'a> for Cached -where - T: Encode + Decode<'a>, -{ - fn decode(r: &mut &'a [u8]) -> Result { - let val = T::decode(r)?; - Ok(Self::new(val)) - } -} - -/// Caches the result of `T`'s [`Encode`] implementation into an owned buffer. -/// -/// This is useful for types with expensive [`Encode`] implementations such as -/// [`Text`] or [`Compound`]. It has little to no benefit for primitive types -/// such as `i32`, `VarInt`, `&str`, `&[u8]`, etc. 
-/// -/// # Examples -/// -/// ``` -/// use valence_protocol::{Encode, EncodedBuf}; -/// -/// let mut buf1 = vec![]; -/// let mut buf2 = vec![]; -/// -/// "hello".encode(&mut buf1).unwrap(); -/// -/// let cache = EncodedBuf::new("hello"); -/// cache.encode(&mut buf2).unwrap(); -/// -/// assert_eq!(buf1, buf2); -/// ``` -/// -/// [`Text`]: crate::text::Text -/// [`Compound`]: valence_nbt::Compound -#[derive(Debug)] -pub struct EncodedBuf { - buf: Vec, - res: Result<()>, - _marker: PhantomData T>, -} - -impl EncodedBuf { - pub fn new(t: &T) -> Self { - let mut buf = vec![]; - let res = t.encode(&mut buf); - - Self { - buf, - res, - _marker: PhantomData, - } - } - - pub fn set(&mut self, t: &T) { - self.buf.clear(); - self.res = t.encode(&mut self.buf); - } - - pub fn into_inner(self) -> Result> { - self.res.map(|()| self.buf) - } -} - -impl Encode for EncodedBuf { - fn encode(&self, mut w: impl Write) -> Result<()> { - match &self.res { - Ok(()) => Ok(w.write_all(&self.buf)?), - Err(e) => Err(anyhow!("{e:#}")), - } - } - - fn encoded_len(&self) -> usize { - self.buf.len() - } -} diff --git a/valence_protocol/src/lib.rs b/valence_protocol/src/lib.rs index 72588a2..8e0b26a 100644 --- a/valence_protocol/src/lib.rs +++ b/valence_protocol/src/lib.rs @@ -72,10 +72,10 @@ extern crate self as valence_protocol; use std::io::Write; pub use anyhow::{Error, Result}; +pub use array::LengthPrefixedArray; pub use block::{BlockFace, BlockKind, BlockState}; pub use block_pos::BlockPos; pub use byte_angle::ByteAngle; -pub use cache::{Cached, EncodedBuf}; pub use codec::*; pub use ident::Ident; pub use inventory::InventoryKind; @@ -98,12 +98,12 @@ pub const PROTOCOL_VERSION: i32 = 760; /// targets. 
pub const MINECRAFT_VERSION: &str = "1.19.2"; +mod array; pub mod block; mod block_pos; mod bounded; mod byte_angle; mod byte_counter; -mod cache; mod codec; pub mod enchant; pub mod entity_meta; diff --git a/valence_protocol/src/packets/s2c.rs b/valence_protocol/src/packets/s2c.rs index f545a37..358e3f1 100644 --- a/valence_protocol/src/packets/s2c.rs +++ b/valence_protocol/src/packets/s2c.rs @@ -16,6 +16,7 @@ use crate::types::{ use crate::username::Username; use crate::var_int::VarInt; use crate::var_long::VarLong; +use crate::LengthPrefixedArray; pub mod status { use super::*; @@ -313,8 +314,8 @@ pub mod play { pub block_light_mask: Vec, pub empty_sky_light_mask: Vec, pub empty_block_light_mask: Vec, - pub sky_light_arrays: Vec<(VarInt, [u8; 2048])>, - pub block_light_arrays: Vec<(VarInt, [u8; 2048])>, + pub sky_light_arrays: Vec>, + pub block_light_arrays: Vec>, } #[derive(Clone, Debug, Encode, Packet)] @@ -330,8 +331,8 @@ pub mod play { pub block_light_mask: &'a [u64], pub empty_sky_light_mask: &'a [u64], pub empty_block_light_mask: &'a [u64], - pub sky_light_arrays: &'a [(VarInt, [u8; 2048])], - pub block_light_arrays: &'a [(VarInt, [u8; 2048])], + pub sky_light_arrays: &'a [LengthPrefixedArray], + pub block_light_arrays: &'a [LengthPrefixedArray], } #[derive(Copy, Clone, Debug, Encode, Decode, Packet)] diff --git a/valence_protocol/src/var_int.rs b/valence_protocol/src/var_int.rs index 3489d8c..16b2539 100644 --- a/valence_protocol/src/var_int.rs +++ b/valence_protocol/src/var_int.rs @@ -50,6 +50,7 @@ impl Encode for VarInt { } } + #[inline] fn encoded_len(&self) -> usize { match self.0 { 0 => 1,