diff --git a/src/chunk.rs b/src/chunk.rs index f735ab4..29711cc 100644 --- a/src/chunk.rs +++ b/src/chunk.rs @@ -11,6 +11,7 @@ use std::io::Write; use std::iter::FusedIterator; use std::mem; use std::ops::{Deref, DerefMut, Index, IndexMut}; +use std::sync::{Mutex, MutexGuard}; use entity_partition::PartitionCell; use paletted_container::PalettedContainer; @@ -21,7 +22,7 @@ use valence_nbt::compound; use valence_protocol::packets::s2c::play::{ BlockUpdate, ChunkDataAndUpdateLightEncode, UpdateSectionBlocksEncode, }; -use valence_protocol::{BlockPos, BlockState, Encode, VarInt, VarLong}; +use valence_protocol::{BlockPos, BlockState, Encode, LengthPrefixedArray, VarInt, VarLong}; use crate::biome::BiomeId; use crate::config::Config; @@ -40,14 +41,21 @@ pub struct Chunks { chunks: FxHashMap>, PartitionCell)>, dimension_height: i32, dimension_min_y: i32, - dummy_sky_light_mask: Box<[u64]>, - /// Sending sky light bits full of ones causes the vanilla client to lag + filler_sky_light_mask: Box<[u64]>, + /// Sending filler light data causes the vanilla client to lag /// less. Hopefully we can remove this in the future. - dummy_sky_light_arrays: Box<[(VarInt, [u8; 2048])]>, + filler_sky_light_arrays: Box<[LengthPrefixedArray]>, + biome_registry_len: usize, + compression_threshold: Option, } impl Chunks { - pub(crate) fn new(dimension_height: i32, dimension_min_y: i32) -> Self { + pub(crate) fn new( + dimension_height: i32, + dimension_min_y: i32, + biome_registry_len: usize, + compression_threshold: Option, + ) -> Self { let section_count = (dimension_height / 16 + 2) as usize; let mut sky_light_mask = vec![0; num::Integer::div_ceil(§ion_count, &16)]; @@ -60,8 +68,10 @@ impl Chunks { chunks: FxHashMap::default(), dimension_height, dimension_min_y, - dummy_sky_light_mask: sky_light_mask.into(), - dummy_sky_light_arrays: vec![(VarInt(2048), [0xff; 2048]); section_count].into(), + filler_sky_light_mask: sky_light_mask.into(), + filler_sky_light_arrays: vec![LengthPrefixedArray([0xff; 2048]); section_count].into(), + biome_registry_len, + compression_threshold, } } @@ -259,15 +269,12 @@ impl Chunks { ) } - pub(crate) fn update_caches( - &mut self, - compression_threshold: Option, - biome_registry_len: usize, - ) { + pub(crate) fn update_caches(&mut self) { let min_y = self.dimension_min_y; self.chunks.par_iter_mut().for_each(|(&pos, (chunk, _))| { let Some(chunk) = chunk else { + // There is no chunk at this position. return; }; @@ -310,11 +317,11 @@ impl Chunks { let global_y = sect_y as i32 * 16 + (idx / (16 * 16)) as i32 + min_y; let global_z = pos.z * 16 + (idx / 16 % 16) as i32; - let mut writer = PacketWriter { - writer: &mut chunk.cached_update_packets, - threshold: compression_threshold, - scratch: &mut compression_scratch, - }; + let mut writer = PacketWriter::new( + &mut chunk.cached_update_packets, + self.compression_threshold, + &mut compression_scratch, + ); writer .write_packet(&BlockUpdate { @@ -345,11 +352,11 @@ impl Chunks { | (pos.z as i64 & 0x3fffff) << 20 | (sect_y as i64 + min_y.div_euclid(16) as i64) & 0xfffff; - let mut writer = PacketWriter { - writer: &mut chunk.cached_update_packets, - threshold: compression_threshold, - scratch: &mut compression_scratch, - }; + let mut writer = PacketWriter::new( + &mut chunk.cached_update_packets, + self.compression_threshold, + &mut compression_scratch, + ); writer .write_packet(&UpdateSectionBlocksEncode { @@ -367,66 +374,26 @@ impl Chunks { } } - // Regenerate the chunk data packet if the cache was invalidated. - if any_blocks_modified || chunk.any_biomes_modified || chunk.created_this_tick { - // TODO: build chunk data packet cache lazily. - + // Clear the cache if the cache was invalidated. + if any_blocks_modified || chunk.any_biomes_modified { chunk.any_biomes_modified = false; - chunk.cached_init_packet.clear(); - - let mut scratch = vec![]; - - for sect in chunk.sections.iter() { - sect.non_air_count.encode(&mut scratch).unwrap(); - - sect.block_states - .encode_mc_format( - &mut scratch, - |b| b.to_raw().into(), - 4, - 8, - bit_width(BlockState::max_raw().into()), - ) - .unwrap(); - - sect.biomes - .encode_mc_format( - &mut scratch, - |b| b.0.into(), - 0, - 3, - bit_width(biome_registry_len - 1), - ) - .unwrap(); - } - - let mut writer = PacketWriter { - writer: &mut chunk.cached_init_packet, - threshold: compression_threshold, - scratch: &mut compression_scratch, - }; - - writer - .write_packet(&ChunkDataAndUpdateLightEncode { - chunk_x: pos.x, - chunk_z: pos.z, - heightmaps: &compound! { - // TODO: MOTION_BLOCKING heightmap - }, - blocks_and_biomes: &scratch, - block_entities: &[], - trust_edges: true, - sky_light_mask: &self.dummy_sky_light_mask, - block_light_mask: &[], - empty_sky_light_mask: &[], - empty_block_light_mask: &[], - sky_light_arrays: &self.dummy_sky_light_arrays, - block_light_arrays: &[], - }) - .unwrap(); + chunk.cached_init_packet.get_mut().unwrap().clear(); } - debug_assert!(!chunk.cached_init_packet.is_empty()); + // Initialize the chunk data cache on new chunks here so this work can be done + // in parallel. + if chunk.created_this_tick() { + debug_assert!(chunk.cached_init_packet.get_mut().unwrap().is_empty()); + + let _unused: MutexGuard<_> = chunk.get_chunk_data_packet( + &mut compression_scratch, + pos, + self.biome_registry_len, + &self.filler_sky_light_mask, + &self.filler_sky_light_arrays, + self.compression_threshold, + ); + } }); } @@ -692,13 +659,15 @@ pub struct LoadedChunk { pub state: C::ChunkState, sections: Box<[ChunkSection]>, // TODO: block_entities: BTreeMap, - // TODO: rebuild init packet lazily? - cached_init_packet: Vec, + cached_init_packet: Mutex>, cached_update_packets: Vec, /// If any of the biomes in this chunk were modified this tick. any_biomes_modified: bool, created_this_tick: bool, deleted: bool, + /// For debugging purposes. + #[cfg(debug_assertions)] + uuid: uuid::Uuid, } impl Deref for LoadedChunk { @@ -716,7 +685,7 @@ impl DerefMut for LoadedChunk { } /// A 16x16x16 meter volume of blocks, biomes, and light in a chunk. -#[derive(Clone)] +#[derive(Clone, Debug)] struct ChunkSection { block_states: PalettedContainer, /// Contains a set bit for every block that has been modified in this @@ -763,11 +732,13 @@ impl LoadedChunk { Self { state, sections: chunk.sections.into(), - cached_init_packet: vec![], + cached_init_packet: Mutex::new(vec![]), cached_update_packets: vec![], any_biomes_modified: false, created_this_tick: true, deleted: false, + #[cfg(debug_assertions)] + uuid: uuid::Uuid::from_u128(rand::random()), } } @@ -795,13 +766,98 @@ impl LoadedChunk { } /// Queues the chunk data packet for this chunk with the given position. - /// /// This will initialize the chunk for the client. pub(crate) fn write_chunk_data_packet( &self, mut writer: impl WritePacket, + scratch: &mut Vec, + pos: ChunkPos, + chunks: &Chunks, ) -> anyhow::Result<()> { - writer.write_bytes(&self.cached_init_packet) + #[cfg(debug_assertions)] + assert_eq!( + chunks[pos].uuid, self.uuid, + "chunks and/or position arguments are incorrect" + ); + + let bytes = self.get_chunk_data_packet( + scratch, + pos, + chunks.biome_registry_len, + &chunks.filler_sky_light_mask, + &chunks.filler_sky_light_arrays, + chunks.compression_threshold, + ); + + writer.write_bytes(&bytes) + } + + /// Gets the bytes of the cached chunk data packet, initializing the cache + /// if it is empty. + fn get_chunk_data_packet( + &self, + scratch: &mut Vec, + pos: ChunkPos, + biome_registry_len: usize, + filler_sky_light_mask: &[u64], + filler_sky_light_arrays: &[LengthPrefixedArray], + compression_threshold: Option, + ) -> MutexGuard> { + let mut lck = self.cached_init_packet.lock().unwrap(); + + if lck.is_empty() { + scratch.clear(); + + for sect in self.sections.iter() { + sect.non_air_count.encode(&mut *scratch).unwrap(); + + sect.block_states + .encode_mc_format( + &mut *scratch, + |b| b.to_raw().into(), + 4, + 8, + bit_width(BlockState::max_raw().into()), + ) + .unwrap(); + + sect.biomes + .encode_mc_format( + &mut *scratch, + |b| b.0.into(), + 0, + 3, + bit_width(biome_registry_len - 1), + ) + .unwrap(); + } + + let mut compression_scratch = vec![]; + + let mut writer = + PacketWriter::new(&mut *lck, compression_threshold, &mut compression_scratch); + + writer + .write_packet(&ChunkDataAndUpdateLightEncode { + chunk_x: pos.x, + chunk_z: pos.z, + heightmaps: &compound! { + // TODO: MOTION_BLOCKING heightmap + }, + blocks_and_biomes: scratch, + block_entities: &[], + trust_edges: true, + sky_light_mask: filler_sky_light_mask, + block_light_mask: &[], + empty_sky_light_mask: &[], + empty_block_light_mask: &[], + sky_light_arrays: filler_sky_light_arrays, + block_light_arrays: &[], + }) + .unwrap(); + } + + lck } /// Queues block change packets for this chunk. @@ -922,7 +978,7 @@ impl Chunk for LoadedChunk { sect.biomes.optimize(); } - self.cached_init_packet.shrink_to_fit(); + self.cached_init_packet.get_mut().unwrap().shrink_to_fit(); self.cached_update_packets.shrink_to_fit(); } } diff --git a/src/client.rs b/src/client.rs index 07e7a64..1092fe1 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1129,7 +1129,12 @@ impl Client { } (true, false) => { // Chunk needs initialization. Send packet to load it. - chunk.write_chunk_data_packet(&mut *send)?; + chunk.write_chunk_data_packet( + &mut *send, + &mut self.scratch, + pos, + &old_world.chunks, + )?; // Don't assert that the chunk is already loaded in this case. // Chunks are allowed to be overwritten and their "created this @@ -1258,7 +1263,12 @@ impl Client { if let Some((chunk, cell)) = world.chunks.chunk_and_cell(pos) { if let Some(chunk) = chunk { if !chunk.deleted() { - chunk.write_chunk_data_packet(&mut *send)?; + chunk.write_chunk_data_packet( + &mut *send, + &mut self.scratch, + pos, + &world.chunks, + )?; #[cfg(debug_assertions)] assert!(self.loaded_chunks.insert(pos)); @@ -1327,7 +1337,12 @@ impl Client { if let Some((chunk, cell)) = world.chunks.chunk_and_cell(pos) { if let Some(chunk) = chunk { if !chunk.deleted() { - chunk.write_chunk_data_packet(&mut *send)?; + chunk.write_chunk_data_packet( + &mut *send, + &mut self.scratch, + pos, + &world.chunks, + )?; #[cfg(debug_assertions)] assert!(self.loaded_chunks.insert(pos)); diff --git a/src/packet.rs b/src/packet.rs index 57bef43..c89c4f8 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -24,9 +24,9 @@ impl WritePacket for &mut W { } pub struct PacketWriter<'a, W> { - pub writer: W, - pub threshold: Option, - pub scratch: &'a mut Vec, + writer: W, + threshold: Option, + scratch: &'a mut Vec, } impl<'a, W: Write> PacketWriter<'a, W> { diff --git a/src/player_list.rs b/src/player_list.rs index 8b7710b..3b0f787 100644 --- a/src/player_list.rs +++ b/src/player_list.rs @@ -80,11 +80,11 @@ impl PlayerLists { for pl in self.slab.iter_mut() { pl.cached_update_packets.clear(); - let mut writer = PacketWriter { - writer: &mut pl.cached_update_packets, - threshold: compression_threshold, - scratch: &mut scratch, - }; + let mut writer = PacketWriter::new( + &mut pl.cached_update_packets, + compression_threshold, + &mut scratch, + ); if !pl.removed.is_empty() { writer diff --git a/src/server.rs b/src/server.rs index 59691af..c09588d 100644 --- a/src/server.rs +++ b/src/server.rs @@ -200,6 +200,12 @@ impl SharedServer { &self.0.connection_mode } + /// Gets the compression threshold for packets. `None` indicates no + /// compression. + pub fn compression_threshold(&self) -> Option { + self.0.compression_threshold + } + /// Gets the maximum number of connections allowed to the server at once. pub fn max_connections(&self) -> usize { self.0.max_connections @@ -430,7 +436,6 @@ fn do_update_loop(server: &mut Server) -> ShutdownResult { let mut tick_start = Instant::now(); let shared = server.shared.clone(); - let biome_registry_len = shared.0.biomes.len(); let threshold = shared.0.compression_threshold; loop { @@ -471,7 +476,7 @@ fn do_update_loop(server: &mut Server) -> ShutdownResult { update_entity_partition(&mut server.entities, &mut server.worlds, threshold); for (_, world) in server.worlds.iter_mut() { - world.chunks.update_caches(threshold, biome_registry_len); + world.chunks.update_caches(); } server.player_lists.update_caches(threshold); diff --git a/src/world.rs b/src/world.rs index 6c6be22..35c73b8 100644 --- a/src/world.rs +++ b/src/world.rs @@ -53,7 +53,12 @@ impl Worlds { let (id, world) = self.slab.insert(World { state, - chunks: Chunks::new(dim.height, dim.min_y), + chunks: Chunks::new( + dim.height, + dim.min_y, + self.shared.biomes().len(), + self.shared.compression_threshold(), + ), dimension, deleted: false, }); diff --git a/valence_protocol/src/array.rs b/valence_protocol/src/array.rs new file mode 100644 index 0000000..85d56d6 --- /dev/null +++ b/valence_protocol/src/array.rs @@ -0,0 +1,45 @@ +use std::io::Write; + +use anyhow::ensure; + +use crate::{Decode, Encode, VarInt}; + +/// A fixed-size array encoded and decoded with a [`VarInt`] length prefix. +/// +/// This is used when the length of the array is known statically, but a +/// length prefix is needed anyway. +#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)] +#[repr(transparent)] +pub struct LengthPrefixedArray(pub [T; N]); + +impl Encode for LengthPrefixedArray { + fn encode(&self, mut w: impl Write) -> anyhow::Result<()> { + VarInt(N as i32).encode(&mut w)?; + self.0.encode(w) + } + + fn encoded_len(&self) -> usize { + VarInt(N as i32).encoded_len() + self.0.encoded_len() + } +} + +impl<'a, T: Decode<'a>, const N: usize> Decode<'a> for LengthPrefixedArray { + fn decode(r: &mut &'a [u8]) -> anyhow::Result { + let len = VarInt::decode(r)?.0; + ensure!(len == N as i32, "unexpected array length of {len}"); + + <[T; N]>::decode(r).map(LengthPrefixedArray) + } +} + +impl From<[T; N]> for LengthPrefixedArray { + fn from(value: [T; N]) -> Self { + Self(value) + } +} + +impl From> for [T; N] { + fn from(value: LengthPrefixedArray) -> Self { + value.0 + } +} diff --git a/valence_protocol/src/cache.rs b/valence_protocol/src/cache.rs deleted file mode 100644 index 67e3d22..0000000 --- a/valence_protocol/src/cache.rs +++ /dev/null @@ -1,144 +0,0 @@ -use std::io::Write; -use std::marker::PhantomData; -use std::mem; - -use anyhow::anyhow; - -use crate::{Decode, Encode, Result}; - -/// Contains both a value of `T` and an [`EncodedBuf`] to ensure the -/// buffer is updated when `T` is modified[^note]. -/// -/// The `Encode` implementation for `Cached` encodes only the contained -/// `EncodedBuf`. -/// -/// Use this type when you want `Encode` to be cached but you also need to read -/// from the value of `T`. If the value of `T` is write-only, consider using -/// `EncodedBuf` instead. -/// -/// [`EncodedBuf`]: EncodedBuf -/// [^note]: Assuming `T` does not use internal mutability. -#[derive(Debug)] -pub struct Cached { - val: T, - buf: EncodedBuf, -} - -impl Cached { - pub fn new(val: T) -> Self { - let buf = EncodedBuf::new(&val); - - Self { val, buf } - } - - pub fn get(&self) -> &T { - &self.val - } - - pub fn buf(&self) -> &EncodedBuf { - &self.buf - } - - /// Provides a mutable reference to the contained `T` for modification. The - /// buffer is re-encoded when the closure returns. - pub fn modify(&mut self, f: impl FnOnce(&mut T) -> U) -> U { - let u = f(&mut self.val); - self.buf.set(&self.val); - u - } - - pub fn replace(&mut self, new: T) -> T { - self.modify(|old| mem::replace(old, new)) - } - - pub fn into_inner(self) -> (T, EncodedBuf) { - (self.val, self.buf) - } -} - -impl Encode for Cached -where - T: Encode, -{ - fn encode(&self, w: impl Write) -> Result<()> { - self.buf.encode(w) - } -} - -/// The `Decode` implementation for `Cached` exists for the sake of -/// completeness, but you probably shouldn't need to use it. -impl<'a, T> Decode<'a> for Cached -where - T: Encode + Decode<'a>, -{ - fn decode(r: &mut &'a [u8]) -> Result { - let val = T::decode(r)?; - Ok(Self::new(val)) - } -} - -/// Caches the result of `T`'s [`Encode`] implementation into an owned buffer. -/// -/// This is useful for types with expensive [`Encode`] implementations such as -/// [`Text`] or [`Compound`]. It has little to no benefit for primitive types -/// such as `i32`, `VarInt`, `&str`, `&[u8]`, etc. -/// -/// # Examples -/// -/// ``` -/// use valence_protocol::{Encode, EncodedBuf}; -/// -/// let mut buf1 = vec![]; -/// let mut buf2 = vec![]; -/// -/// "hello".encode(&mut buf1).unwrap(); -/// -/// let cache = EncodedBuf::new("hello"); -/// cache.encode(&mut buf2).unwrap(); -/// -/// assert_eq!(buf1, buf2); -/// ``` -/// -/// [`Text`]: crate::text::Text -/// [`Compound`]: valence_nbt::Compound -#[derive(Debug)] -pub struct EncodedBuf { - buf: Vec, - res: Result<()>, - _marker: PhantomData T>, -} - -impl EncodedBuf { - pub fn new(t: &T) -> Self { - let mut buf = vec![]; - let res = t.encode(&mut buf); - - Self { - buf, - res, - _marker: PhantomData, - } - } - - pub fn set(&mut self, t: &T) { - self.buf.clear(); - self.res = t.encode(&mut self.buf); - } - - pub fn into_inner(self) -> Result> { - self.res.map(|()| self.buf) - } -} - -impl Encode for EncodedBuf { - fn encode(&self, mut w: impl Write) -> Result<()> { - match &self.res { - Ok(()) => Ok(w.write_all(&self.buf)?), - Err(e) => Err(anyhow!("{e:#}")), - } - } - - fn encoded_len(&self) -> usize { - self.buf.len() - } -} diff --git a/valence_protocol/src/lib.rs b/valence_protocol/src/lib.rs index 72588a2..8e0b26a 100644 --- a/valence_protocol/src/lib.rs +++ b/valence_protocol/src/lib.rs @@ -72,10 +72,10 @@ extern crate self as valence_protocol; use std::io::Write; pub use anyhow::{Error, Result}; +pub use array::LengthPrefixedArray; pub use block::{BlockFace, BlockKind, BlockState}; pub use block_pos::BlockPos; pub use byte_angle::ByteAngle; -pub use cache::{Cached, EncodedBuf}; pub use codec::*; pub use ident::Ident; pub use inventory::InventoryKind; @@ -98,12 +98,12 @@ pub const PROTOCOL_VERSION: i32 = 760; /// targets. pub const MINECRAFT_VERSION: &str = "1.19.2"; +mod array; pub mod block; mod block_pos; mod bounded; mod byte_angle; mod byte_counter; -mod cache; mod codec; pub mod enchant; pub mod entity_meta; diff --git a/valence_protocol/src/packets/s2c.rs b/valence_protocol/src/packets/s2c.rs index f545a37..358e3f1 100644 --- a/valence_protocol/src/packets/s2c.rs +++ b/valence_protocol/src/packets/s2c.rs @@ -16,6 +16,7 @@ use crate::types::{ use crate::username::Username; use crate::var_int::VarInt; use crate::var_long::VarLong; +use crate::LengthPrefixedArray; pub mod status { use super::*; @@ -313,8 +314,8 @@ pub mod play { pub block_light_mask: Vec, pub empty_sky_light_mask: Vec, pub empty_block_light_mask: Vec, - pub sky_light_arrays: Vec<(VarInt, [u8; 2048])>, - pub block_light_arrays: Vec<(VarInt, [u8; 2048])>, + pub sky_light_arrays: Vec>, + pub block_light_arrays: Vec>, } #[derive(Clone, Debug, Encode, Packet)] @@ -330,8 +331,8 @@ pub mod play { pub block_light_mask: &'a [u64], pub empty_sky_light_mask: &'a [u64], pub empty_block_light_mask: &'a [u64], - pub sky_light_arrays: &'a [(VarInt, [u8; 2048])], - pub block_light_arrays: &'a [(VarInt, [u8; 2048])], + pub sky_light_arrays: &'a [LengthPrefixedArray], + pub block_light_arrays: &'a [LengthPrefixedArray], } #[derive(Copy, Clone, Debug, Encode, Decode, Packet)] diff --git a/valence_protocol/src/var_int.rs b/valence_protocol/src/var_int.rs index 3489d8c..16b2539 100644 --- a/valence_protocol/src/var_int.rs +++ b/valence_protocol/src/var_int.rs @@ -50,6 +50,7 @@ impl Encode for VarInt { } } + #[inline] fn encoded_len(&self) -> usize { match self.0 { 0 => 1,