Update chunk data packet cache lazily (#170)

Previously, the chunk data packet cache was regenerated at the end of
every tick in which a block or biome in the chunk was modified. This
caused performance problems in situations where chunks are modified
every tick, such as the `conway` example or redstone machines.

With this change, the chunk data packet is regenerated only when a
client actually needs to load the chunk. This isn't perfect, because
the regeneration cannot happen in parallel, but the benefits outweigh
the costs. (The chunk data packet _is_ generated in parallel when the
chunk is first created, and there is still some parallelism when
clients load different chunks. The packet cache is guarded by a
`Mutex`.)
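
A minimal sketch of the caching pattern, for orientation (the
`Mutex<Vec<u8>>` field and the shape of `get_chunk_data_packet` mirror
the actual change; `build_packet` and `invalidate` are illustrative
stand-ins, and the real method also takes the chunk position, biome
registry length, light data, and compression threshold):

    use std::sync::{Mutex, MutexGuard};

    struct LoadedChunk {
        /// Cached chunk data packet bytes. Empty means "not built yet".
        cached_init_packet: Mutex<Vec<u8>>,
    }

    impl LoadedChunk {
        /// Returns the cached packet bytes, building them on first use.
        /// Loads of different chunks still run in parallel; only
        /// concurrent loads of the same chunk serialize on the mutex.
        fn get_chunk_data_packet(&self) -> MutexGuard<Vec<u8>> {
            let mut bytes = self.cached_init_packet.lock().unwrap();
            if bytes.is_empty() {
                // The expensive work (encoding sections and biomes, then
                // optional compression) happens only when a client
                // actually loads this chunk.
                *bytes = self.build_packet();
            }
            bytes
        }

        /// Stand-in for the real section/biome encoding and packet framing.
        fn build_packet(&self) -> Vec<u8> {
            vec![0x20, 0x00]
        }
    }

    /// When a block or biome is modified, only the cache is cleared; the
    /// next client load rebuilds it. `&mut` access needs no locking.
    fn invalidate(chunk: &mut LoadedChunk) {
        chunk.cached_init_packet.get_mut().unwrap().clear();
    }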

This change also revealed that compression is surprisingly slow; that
should be investigated later.
Ryan Johnson authored on 2022-12-13 22:56:22 -08:00; committed by GitHub
parent 8b50e93b0e
commit 740778ec41
11 changed files with 234 additions and 250 deletions


@@ -11,6 +11,7 @@ use std::io::Write;
use std::iter::FusedIterator;
use std::mem;
use std::ops::{Deref, DerefMut, Index, IndexMut};
use std::sync::{Mutex, MutexGuard};
use entity_partition::PartitionCell;
use paletted_container::PalettedContainer;
@@ -21,7 +22,7 @@ use valence_nbt::compound;
use valence_protocol::packets::s2c::play::{
BlockUpdate, ChunkDataAndUpdateLightEncode, UpdateSectionBlocksEncode,
};
use valence_protocol::{BlockPos, BlockState, Encode, VarInt, VarLong};
use valence_protocol::{BlockPos, BlockState, Encode, LengthPrefixedArray, VarInt, VarLong};
use crate::biome::BiomeId;
use crate::config::Config;
@@ -40,14 +41,21 @@ pub struct Chunks<C: Config> {
chunks: FxHashMap<ChunkPos, (Option<LoadedChunk<C>>, PartitionCell)>,
dimension_height: i32,
dimension_min_y: i32,
dummy_sky_light_mask: Box<[u64]>,
/// Sending sky light bits full of ones causes the vanilla client to lag
filler_sky_light_mask: Box<[u64]>,
/// Sending filler light data causes the vanilla client to lag
/// less. Hopefully we can remove this in the future.
dummy_sky_light_arrays: Box<[(VarInt, [u8; 2048])]>,
filler_sky_light_arrays: Box<[LengthPrefixedArray<u8, 2048>]>,
biome_registry_len: usize,
compression_threshold: Option<u32>,
}
impl<C: Config> Chunks<C> {
pub(crate) fn new(dimension_height: i32, dimension_min_y: i32) -> Self {
pub(crate) fn new(
dimension_height: i32,
dimension_min_y: i32,
biome_registry_len: usize,
compression_threshold: Option<u32>,
) -> Self {
let section_count = (dimension_height / 16 + 2) as usize;
let mut sky_light_mask = vec![0; num::Integer::div_ceil(&section_count, &16)];
@@ -60,8 +68,10 @@ impl<C: Config> Chunks<C> {
chunks: FxHashMap::default(),
dimension_height,
dimension_min_y,
dummy_sky_light_mask: sky_light_mask.into(),
dummy_sky_light_arrays: vec![(VarInt(2048), [0xff; 2048]); section_count].into(),
filler_sky_light_mask: sky_light_mask.into(),
filler_sky_light_arrays: vec![LengthPrefixedArray([0xff; 2048]); section_count].into(),
biome_registry_len,
compression_threshold,
}
}
@@ -259,15 +269,12 @@ impl<C: Config> Chunks<C> {
)
}
pub(crate) fn update_caches(
&mut self,
compression_threshold: Option<u32>,
biome_registry_len: usize,
) {
pub(crate) fn update_caches(&mut self) {
let min_y = self.dimension_min_y;
self.chunks.par_iter_mut().for_each(|(&pos, (chunk, _))| {
let Some(chunk) = chunk else {
// There is no chunk at this position.
return;
};
@@ -310,11 +317,11 @@ impl<C: Config> Chunks<C> {
let global_y = sect_y as i32 * 16 + (idx / (16 * 16)) as i32 + min_y;
let global_z = pos.z * 16 + (idx / 16 % 16) as i32;
let mut writer = PacketWriter {
writer: &mut chunk.cached_update_packets,
threshold: compression_threshold,
scratch: &mut compression_scratch,
};
let mut writer = PacketWriter::new(
&mut chunk.cached_update_packets,
self.compression_threshold,
&mut compression_scratch,
);
writer
.write_packet(&BlockUpdate {
@@ -345,11 +352,11 @@ impl<C: Config> Chunks<C> {
| (pos.z as i64 & 0x3fffff) << 20
| (sect_y as i64 + min_y.div_euclid(16) as i64) & 0xfffff;
let mut writer = PacketWriter {
writer: &mut chunk.cached_update_packets,
threshold: compression_threshold,
scratch: &mut compression_scratch,
};
let mut writer = PacketWriter::new(
&mut chunk.cached_update_packets,
self.compression_threshold,
&mut compression_scratch,
);
writer
.write_packet(&UpdateSectionBlocksEncode {
@@ -367,66 +374,26 @@ impl<C: Config> Chunks<C> {
}
}
// Regenerate the chunk data packet if the cache was invalidated.
if any_blocks_modified || chunk.any_biomes_modified || chunk.created_this_tick {
// TODO: build chunk data packet cache lazily.
// Clear the cache if the cache was invalidated.
if any_blocks_modified || chunk.any_biomes_modified {
chunk.any_biomes_modified = false;
chunk.cached_init_packet.clear();
let mut scratch = vec![];
for sect in chunk.sections.iter() {
sect.non_air_count.encode(&mut scratch).unwrap();
sect.block_states
.encode_mc_format(
&mut scratch,
|b| b.to_raw().into(),
4,
8,
bit_width(BlockState::max_raw().into()),
)
.unwrap();
sect.biomes
.encode_mc_format(
&mut scratch,
|b| b.0.into(),
0,
3,
bit_width(biome_registry_len - 1),
)
.unwrap();
chunk.cached_init_packet.get_mut().unwrap().clear();
}
let mut writer = PacketWriter {
writer: &mut chunk.cached_init_packet,
threshold: compression_threshold,
scratch: &mut compression_scratch,
};
// Initialize the chunk data cache on new chunks here so this work can be done
// in parallel.
if chunk.created_this_tick() {
debug_assert!(chunk.cached_init_packet.get_mut().unwrap().is_empty());
writer
.write_packet(&ChunkDataAndUpdateLightEncode {
chunk_x: pos.x,
chunk_z: pos.z,
heightmaps: &compound! {
// TODO: MOTION_BLOCKING heightmap
},
blocks_and_biomes: &scratch,
block_entities: &[],
trust_edges: true,
sky_light_mask: &self.dummy_sky_light_mask,
block_light_mask: &[],
empty_sky_light_mask: &[],
empty_block_light_mask: &[],
sky_light_arrays: &self.dummy_sky_light_arrays,
block_light_arrays: &[],
})
.unwrap();
let _unused: MutexGuard<_> = chunk.get_chunk_data_packet(
&mut compression_scratch,
pos,
self.biome_registry_len,
&self.filler_sky_light_mask,
&self.filler_sky_light_arrays,
self.compression_threshold,
);
}
debug_assert!(!chunk.cached_init_packet.is_empty());
});
}
@@ -692,13 +659,15 @@ pub struct LoadedChunk<C: Config> {
pub state: C::ChunkState,
sections: Box<[ChunkSection]>,
// TODO: block_entities: BTreeMap<u32, BlockEntity>,
// TODO: rebuild init packet lazily?
cached_init_packet: Vec<u8>,
cached_init_packet: Mutex<Vec<u8>>,
cached_update_packets: Vec<u8>,
/// If any of the biomes in this chunk were modified this tick.
any_biomes_modified: bool,
created_this_tick: bool,
deleted: bool,
/// For debugging purposes.
#[cfg(debug_assertions)]
uuid: uuid::Uuid,
}
impl<C: Config> Deref for LoadedChunk<C> {
@@ -716,7 +685,7 @@ impl<C: Config> DerefMut for LoadedChunk<C> {
}
/// A 16x16x16 meter volume of blocks, biomes, and light in a chunk.
#[derive(Clone)]
#[derive(Clone, Debug)]
struct ChunkSection {
block_states: PalettedContainer<BlockState, SECTION_BLOCK_COUNT, { SECTION_BLOCK_COUNT / 2 }>,
/// Contains a set bit for every block that has been modified in this
@@ -763,11 +732,13 @@ impl<C: Config> LoadedChunk<C> {
Self {
state,
sections: chunk.sections.into(),
cached_init_packet: vec![],
cached_init_packet: Mutex::new(vec![]),
cached_update_packets: vec![],
any_biomes_modified: false,
created_this_tick: true,
deleted: false,
#[cfg(debug_assertions)]
uuid: uuid::Uuid::from_u128(rand::random()),
}
}
@@ -795,13 +766,98 @@ impl<C: Config> LoadedChunk<C> {
}
/// Queues the chunk data packet for this chunk with the given position.
///
/// This will initialize the chunk for the client.
pub(crate) fn write_chunk_data_packet(
&self,
mut writer: impl WritePacket,
scratch: &mut Vec<u8>,
pos: ChunkPos,
chunks: &Chunks<C>,
) -> anyhow::Result<()> {
writer.write_bytes(&self.cached_init_packet)
#[cfg(debug_assertions)]
assert_eq!(
chunks[pos].uuid, self.uuid,
"chunks and/or position arguments are incorrect"
);
let bytes = self.get_chunk_data_packet(
scratch,
pos,
chunks.biome_registry_len,
&chunks.filler_sky_light_mask,
&chunks.filler_sky_light_arrays,
chunks.compression_threshold,
);
writer.write_bytes(&bytes)
}
/// Gets the bytes of the cached chunk data packet, initializing the cache
/// if it is empty.
fn get_chunk_data_packet(
&self,
scratch: &mut Vec<u8>,
pos: ChunkPos,
biome_registry_len: usize,
filler_sky_light_mask: &[u64],
filler_sky_light_arrays: &[LengthPrefixedArray<u8, 2048>],
compression_threshold: Option<u32>,
) -> MutexGuard<Vec<u8>> {
let mut lck = self.cached_init_packet.lock().unwrap();
if lck.is_empty() {
scratch.clear();
for sect in self.sections.iter() {
sect.non_air_count.encode(&mut *scratch).unwrap();
sect.block_states
.encode_mc_format(
&mut *scratch,
|b| b.to_raw().into(),
4,
8,
bit_width(BlockState::max_raw().into()),
)
.unwrap();
sect.biomes
.encode_mc_format(
&mut *scratch,
|b| b.0.into(),
0,
3,
bit_width(biome_registry_len - 1),
)
.unwrap();
}
let mut compression_scratch = vec![];
let mut writer =
PacketWriter::new(&mut *lck, compression_threshold, &mut compression_scratch);
writer
.write_packet(&ChunkDataAndUpdateLightEncode {
chunk_x: pos.x,
chunk_z: pos.z,
heightmaps: &compound! {
// TODO: MOTION_BLOCKING heightmap
},
blocks_and_biomes: scratch,
block_entities: &[],
trust_edges: true,
sky_light_mask: filler_sky_light_mask,
block_light_mask: &[],
empty_sky_light_mask: &[],
empty_block_light_mask: &[],
sky_light_arrays: filler_sky_light_arrays,
block_light_arrays: &[],
})
.unwrap();
}
lck
}
/// Queues block change packets for this chunk.
@@ -922,7 +978,7 @@ impl<C: Config> Chunk for LoadedChunk<C> {
sect.biomes.optimize();
}
self.cached_init_packet.shrink_to_fit();
self.cached_init_packet.get_mut().unwrap().shrink_to_fit();
self.cached_update_packets.shrink_to_fit();
}
}


@@ -1129,7 +1129,12 @@ impl<C: Config> Client<C> {
}
(true, false) => {
// Chunk needs initialization. Send packet to load it.
chunk.write_chunk_data_packet(&mut *send)?;
chunk.write_chunk_data_packet(
&mut *send,
&mut self.scratch,
pos,
&old_world.chunks,
)?;
// Don't assert that the chunk is already loaded in this case.
// Chunks are allowed to be overwritten and their "created this
@@ -1258,7 +1263,12 @@ impl<C: Config> Client<C> {
if let Some((chunk, cell)) = world.chunks.chunk_and_cell(pos) {
if let Some(chunk) = chunk {
if !chunk.deleted() {
chunk.write_chunk_data_packet(&mut *send)?;
chunk.write_chunk_data_packet(
&mut *send,
&mut self.scratch,
pos,
&world.chunks,
)?;
#[cfg(debug_assertions)]
assert!(self.loaded_chunks.insert(pos));
@@ -1327,7 +1337,12 @@ impl<C: Config> Client<C> {
if let Some((chunk, cell)) = world.chunks.chunk_and_cell(pos) {
if let Some(chunk) = chunk {
if !chunk.deleted() {
chunk.write_chunk_data_packet(&mut *send)?;
chunk.write_chunk_data_packet(
&mut *send,
&mut self.scratch,
pos,
&world.chunks,
)?;
#[cfg(debug_assertions)]
assert!(self.loaded_chunks.insert(pos));


@@ -24,9 +24,9 @@ impl<W: WritePacket> WritePacket for &mut W {
}
pub struct PacketWriter<'a, W> {
pub writer: W,
pub threshold: Option<u32>,
pub scratch: &'a mut Vec<u8>,
writer: W,
threshold: Option<u32>,
scratch: &'a mut Vec<u8>,
}
impl<'a, W: Write> PacketWriter<'a, W> {


@@ -80,11 +80,11 @@ impl<C: Config> PlayerLists<C> {
for pl in self.slab.iter_mut() {
pl.cached_update_packets.clear();
let mut writer = PacketWriter {
writer: &mut pl.cached_update_packets,
threshold: compression_threshold,
scratch: &mut scratch,
};
let mut writer = PacketWriter::new(
&mut pl.cached_update_packets,
compression_threshold,
&mut scratch,
);
if !pl.removed.is_empty() {
writer


@@ -200,6 +200,12 @@ impl<C: Config> SharedServer<C> {
&self.0.connection_mode
}
/// Gets the compression threshold for packets. `None` indicates no
/// compression.
pub fn compression_threshold(&self) -> Option<u32> {
self.0.compression_threshold
}
/// Gets the maximum number of connections allowed to the server at once.
pub fn max_connections(&self) -> usize {
self.0.max_connections
@@ -430,7 +436,6 @@ fn do_update_loop(server: &mut Server<impl Config>) -> ShutdownResult {
let mut tick_start = Instant::now();
let shared = server.shared.clone();
let biome_registry_len = shared.0.biomes.len();
let threshold = shared.0.compression_threshold;
loop {
@@ -471,7 +476,7 @@ fn do_update_loop(server: &mut Server<impl Config>) -> ShutdownResult {
update_entity_partition(&mut server.entities, &mut server.worlds, threshold);
for (_, world) in server.worlds.iter_mut() {
world.chunks.update_caches(threshold, biome_registry_len);
world.chunks.update_caches();
}
server.player_lists.update_caches(threshold);


@@ -53,7 +53,12 @@ impl<C: Config> Worlds<C> {
let (id, world) = self.slab.insert(World {
state,
chunks: Chunks::new(dim.height, dim.min_y),
chunks: Chunks::new(
dim.height,
dim.min_y,
self.shared.biomes().len(),
self.shared.compression_threshold(),
),
dimension,
deleted: false,
});


@@ -0,0 +1,45 @@
use std::io::Write;
use anyhow::ensure;
use crate::{Decode, Encode, VarInt};
/// A fixed-size array encoded and decoded with a [`VarInt`] length prefix.
///
/// This is used when the length of the array is known statically, but a
/// length prefix is needed anyway.
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
#[repr(transparent)]
pub struct LengthPrefixedArray<T, const N: usize>(pub [T; N]);
impl<T: Encode, const N: usize> Encode for LengthPrefixedArray<T, N> {
fn encode(&self, mut w: impl Write) -> anyhow::Result<()> {
VarInt(N as i32).encode(&mut w)?;
self.0.encode(w)
}
fn encoded_len(&self) -> usize {
VarInt(N as i32).encoded_len() + self.0.encoded_len()
}
}
impl<'a, T: Decode<'a>, const N: usize> Decode<'a> for LengthPrefixedArray<T, N> {
fn decode(r: &mut &'a [u8]) -> anyhow::Result<Self> {
let len = VarInt::decode(r)?.0;
ensure!(len == N as i32, "unexpected array length of {len}");
<[T; N]>::decode(r).map(LengthPrefixedArray)
}
}
impl<T, const N: usize> From<[T; N]> for LengthPrefixedArray<T, N> {
fn from(value: [T; N]) -> Self {
Self(value)
}
}
impl<T, const N: usize> From<LengthPrefixedArray<T, N>> for [T; N] {
fn from(value: LengthPrefixedArray<T, N>) -> Self {
value.0
}
}
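
As a quick illustration of the wire format this type implies, a
round-trip sketch (assumes `anyhow` as a dependency and the crate-root
re-exports shown in this diff; `N = 4` is used only for brevity):

    use valence_protocol::{Decode, Encode, LengthPrefixedArray};

    fn roundtrip() -> anyhow::Result<()> {
        let arr: LengthPrefixedArray<u8, 4> = [1, 2, 3, 4].into();

        // Encodes as VarInt(4) followed by the four raw bytes.
        let mut buf = vec![];
        arr.encode(&mut buf)?;
        assert_eq!(buf, [4, 1, 2, 3, 4]);

        // Decoding rejects any length prefix other than N.
        let mut slice = buf.as_slice();
        let decoded = LengthPrefixedArray::<u8, 4>::decode(&mut slice)?;
        assert_eq!(decoded, arr);
        Ok(())
    }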


@@ -1,144 +0,0 @@
use std::io::Write;
use std::marker::PhantomData;
use std::mem;
use anyhow::anyhow;
use crate::{Decode, Encode, Result};
/// Contains both a value of `T` and an [`EncodedBuf<T>`] to ensure the
/// buffer is updated when `T` is modified[^note].
///
/// The `Encode` implementation for `Cached<T>` encodes only the contained
/// `EncodedBuf`.
///
/// Use this type when you want `Encode` to be cached but you also need to read
/// from the value of `T`. If the value of `T` is write-only, consider using
/// `EncodedBuf` instead.
///
/// [`EncodedBuf<T>`]: EncodedBuf
/// [^note]: Assuming `T` does not use internal mutability.
#[derive(Debug)]
pub struct Cached<T> {
val: T,
buf: EncodedBuf<T>,
}
impl<T: Encode> Cached<T> {
pub fn new(val: T) -> Self {
let buf = EncodedBuf::new(&val);
Self { val, buf }
}
pub fn get(&self) -> &T {
&self.val
}
pub fn buf(&self) -> &EncodedBuf<T> {
&self.buf
}
/// Provides a mutable reference to the contained `T` for modification. The
/// buffer is re-encoded when the closure returns.
pub fn modify<U>(&mut self, f: impl FnOnce(&mut T) -> U) -> U {
let u = f(&mut self.val);
self.buf.set(&self.val);
u
}
pub fn replace(&mut self, new: T) -> T {
self.modify(|old| mem::replace(old, new))
}
pub fn into_inner(self) -> (T, EncodedBuf<T>) {
(self.val, self.buf)
}
}
impl<T> Encode for Cached<T>
where
T: Encode,
{
fn encode(&self, w: impl Write) -> Result<()> {
self.buf.encode(w)
}
}
/// The `Decode` implementation for `Cached` exists for the sake of
/// completeness, but you probably shouldn't need to use it.
impl<'a, T> Decode<'a> for Cached<T>
where
T: Encode + Decode<'a>,
{
fn decode(r: &mut &'a [u8]) -> Result<Self> {
let val = T::decode(r)?;
Ok(Self::new(val))
}
}
/// Caches the result of `T`'s [`Encode`] implementation into an owned buffer.
///
/// This is useful for types with expensive [`Encode`] implementations such as
/// [`Text`] or [`Compound`]. It has little to no benefit for primitive types
/// such as `i32`, `VarInt`, `&str`, `&[u8]`, etc.
///
/// # Examples
///
/// ```
/// use valence_protocol::{Encode, EncodedBuf};
///
/// let mut buf1 = vec![];
/// let mut buf2 = vec![];
///
/// "hello".encode(&mut buf1).unwrap();
///
/// let cache = EncodedBuf::new("hello");
/// cache.encode(&mut buf2).unwrap();
///
/// assert_eq!(buf1, buf2);
/// ```
///
/// [`Text`]: crate::text::Text
/// [`Compound`]: valence_nbt::Compound
#[derive(Debug)]
pub struct EncodedBuf<T: ?Sized> {
buf: Vec<u8>,
res: Result<()>,
_marker: PhantomData<fn(T) -> T>,
}
impl<T: Encode + ?Sized> EncodedBuf<T> {
pub fn new(t: &T) -> Self {
let mut buf = vec![];
let res = t.encode(&mut buf);
Self {
buf,
res,
_marker: PhantomData,
}
}
pub fn set(&mut self, t: &T) {
self.buf.clear();
self.res = t.encode(&mut self.buf);
}
pub fn into_inner(self) -> Result<Vec<u8>> {
self.res.map(|()| self.buf)
}
}
impl<T: ?Sized> Encode for EncodedBuf<T> {
fn encode(&self, mut w: impl Write) -> Result<()> {
match &self.res {
Ok(()) => Ok(w.write_all(&self.buf)?),
Err(e) => Err(anyhow!("{e:#}")),
}
}
fn encoded_len(&self) -> usize {
self.buf.len()
}
}


@@ -72,10 +72,10 @@ extern crate self as valence_protocol;
use std::io::Write;
pub use anyhow::{Error, Result};
pub use array::LengthPrefixedArray;
pub use block::{BlockFace, BlockKind, BlockState};
pub use block_pos::BlockPos;
pub use byte_angle::ByteAngle;
pub use cache::{Cached, EncodedBuf};
pub use codec::*;
pub use ident::Ident;
pub use inventory::InventoryKind;
@@ -98,12 +98,12 @@ pub const PROTOCOL_VERSION: i32 = 760;
/// targets.
pub const MINECRAFT_VERSION: &str = "1.19.2";
mod array;
pub mod block;
mod block_pos;
mod bounded;
mod byte_angle;
mod byte_counter;
mod cache;
mod codec;
pub mod enchant;
pub mod entity_meta;


@@ -16,6 +16,7 @@ use crate::types::{
use crate::username::Username;
use crate::var_int::VarInt;
use crate::var_long::VarLong;
use crate::LengthPrefixedArray;
pub mod status {
use super::*;
@@ -313,8 +314,8 @@ pub mod play {
pub block_light_mask: Vec<u64>,
pub empty_sky_light_mask: Vec<u64>,
pub empty_block_light_mask: Vec<u64>,
pub sky_light_arrays: Vec<(VarInt, [u8; 2048])>,
pub block_light_arrays: Vec<(VarInt, [u8; 2048])>,
pub sky_light_arrays: Vec<LengthPrefixedArray<u8, 2048>>,
pub block_light_arrays: Vec<LengthPrefixedArray<u8, 2048>>,
}
#[derive(Clone, Debug, Encode, Packet)]
@@ -330,8 +331,8 @@ pub mod play {
pub block_light_mask: &'a [u64],
pub empty_sky_light_mask: &'a [u64],
pub empty_block_light_mask: &'a [u64],
pub sky_light_arrays: &'a [(VarInt, [u8; 2048])],
pub block_light_arrays: &'a [(VarInt, [u8; 2048])],
pub sky_light_arrays: &'a [LengthPrefixedArray<u8, 2048>],
pub block_light_arrays: &'a [LengthPrefixedArray<u8, 2048>],
}
#[derive(Copy, Clone, Debug, Encode, Decode, Packet)]


@@ -50,6 +50,7 @@ impl Encode for VarInt {
}
}
#[inline]
fn encoded_len(&self) -> usize {
match self.0 {
0 => 1,