Store entities and clients per server rather than per world

This commit is contained in:
Ryan 2022-07-03 15:31:24 -07:00
parent 985ecf3922
commit 79cb4c159a
12 changed files with 349 additions and 311 deletions

View file

@ -10,8 +10,8 @@ use valence::client::{Event, GameMode};
use valence::config::{Config, ServerListPing};
use valence::text::Color;
use valence::{
async_trait, ident, Biome, BlockState, Client, Dimension, DimensionId, Server, ShutdownResult,
Text, TextFormat, WorldId, Worlds,
async_trait, ident, Biome, BlockState, Dimension, DimensionId, Server, SharedServer,
ShutdownResult, TextFormat,
};
pub fn main() -> ShutdownResult {
@ -57,6 +57,13 @@ impl Config for Game {
false
}
fn dimensions(&self) -> Vec<Dimension> {
vec![Dimension {
fixed_time: Some(6000),
..Dimension::default()
}]
}
fn biomes(&self) -> Vec<Biome> {
vec![Biome {
name: ident!("valence:default_biome"),
@ -65,14 +72,11 @@ impl Config for Game {
}]
}
fn dimensions(&self) -> Vec<Dimension> {
vec![Dimension {
fixed_time: Some(6000),
..Dimension::default()
}]
}
async fn server_list_ping(&self, _server: &Server, _remote_addr: SocketAddr) -> ServerListPing {
async fn server_list_ping(
&self,
_server: &SharedServer,
_remote_addr: SocketAddr,
) -> ServerListPing {
ServerListPing::Respond {
online_players: self.player_count.load(Ordering::SeqCst) as i32,
max_players: MAX_PLAYERS as i32,
@ -81,26 +85,8 @@ impl Config for Game {
}
}
fn join(
&self,
_server: &Server,
_client: &mut Client,
worlds: &mut Worlds,
) -> Result<WorldId, Text> {
if let Ok(_) = self
.player_count
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |count| {
(count < MAX_PLAYERS).then(|| count + 1)
})
{
Ok(worlds.iter().next().unwrap().0)
} else {
Err("The server is full!".into())
}
}
fn init(&self, _server: &Server, worlds: &mut Worlds) {
let world = worlds.create(DimensionId::default()).1;
fn init(&self, server: &mut Server) {
let world = server.worlds.create(DimensionId::default()).1;
world.meta.set_flat(true);
for chunk_z in -2..Integer::div_ceil(&(SIZE_X as i32), &16) + 2 {
@ -110,8 +96,8 @@ impl Config for Game {
}
}
fn update(&self, server: &Server, worlds: &mut Worlds) {
let world = worlds.iter_mut().next().unwrap().1;
fn update(&self, server: &mut Server) {
let (world_id, world) = server.worlds.iter_mut().next().unwrap();
let spawn_pos = [
SIZE_X as f64 / 2.0,
@ -119,10 +105,21 @@ impl Config for Game {
SIZE_Z as f64 / 2.0,
];
world.clients.retain(|_, client| {
if client.created_tick() == server.current_tick() {
client.set_game_mode(GameMode::Survival);
server.clients.retain(|_, client| {
if client.created_tick() == server.shared.current_tick() {
if self
.player_count
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |count| {
(count < MAX_PLAYERS).then_some(count + 1)
})
.is_err()
{
client.disconnect("The server is full!".color(Color::RED));
return false;
}
client.set_world(world_id);
client.set_game_mode(GameMode::Survival);
client.teleport(spawn_pos, 0.0, 0.0);
world.meta.player_list_mut().insert(
@ -148,7 +145,7 @@ impl Config for Game {
let State { board, board_buf } = &mut *self.state.lock().unwrap();
for (_, client) in world.clients.iter_mut() {
for (_, client) in server.clients.iter_mut() {
while let Some(event) = client.pop_event() {
match event {
Event::Digging(e) => {
@ -171,7 +168,7 @@ impl Config for Game {
}
}
if server.current_tick() % 4 != 0 {
if server.shared.current_tick() % 4 != 0 {
return;
}
@ -201,7 +198,7 @@ impl Config for Game {
mem::swap(board, board_buf);
let min_y = server.dimensions().next().unwrap().1.min_y;
let min_y = server.shared.dimensions().next().unwrap().1.min_y;
for chunk_x in 0..Integer::div_ceil(&SIZE_X, &16) {
for chunk_z in 0..Integer::div_ceil(&SIZE_Z, &16) {

View file

@ -9,8 +9,8 @@ use valence::config::{Config, ServerListPing};
use valence::text::Color;
use valence::util::to_yaw_and_pitch;
use valence::{
async_trait, Client, DimensionId, EntityId, EntityType, Server, ShutdownResult, Text,
TextFormat, WorldId, Worlds,
async_trait, DimensionId, EntityId, EntityType, Server, SharedServer, ShutdownResult,
TextFormat,
};
use vek::{Mat3, Vec3};
@ -45,7 +45,11 @@ impl Config for Game {
false
}
async fn server_list_ping(&self, _server: &Server, _remote_addr: SocketAddr) -> ServerListPing {
async fn server_list_ping(
&self,
_server: &SharedServer,
_remote_addr: SocketAddr,
) -> ServerListPing {
ServerListPing::Respond {
online_players: self.player_count.load(Ordering::SeqCst) as i32,
max_players: MAX_PLAYERS as i32,
@ -54,26 +58,8 @@ impl Config for Game {
}
}
fn join(
&self,
_server: &Server,
_client: &mut Client,
worlds: &mut Worlds,
) -> Result<WorldId, Text> {
if let Ok(_) = self
.player_count
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |count| {
(count < MAX_PLAYERS).then(|| count + 1)
})
{
Ok(worlds.iter().next().unwrap().0)
} else {
Err("The server is full!".into())
}
}
fn init(&self, _server: &Server, worlds: &mut Worlds) {
let world = worlds.create(DimensionId::default()).1;
fn init(&self, server: &mut Server) {
let (world_id, world) = server.worlds.create(DimensionId::default());
world.meta.set_flat(true);
let size = 5;
@ -84,17 +70,30 @@ impl Config for Game {
}
self.cows.lock().unwrap().extend((0..200).map(|_| {
let (id, e) = world.entities.create();
let (id, e) = server.entities.create();
e.set_world(world_id);
e.set_type(EntityType::Cow);
id
}));
}
fn update(&self, server: &Server, worlds: &mut Worlds) {
let world = worlds.iter_mut().next().unwrap().1;
fn update(&self, server: &mut Server) {
let (world_id, world) = server.worlds.iter_mut().next().unwrap();
world.clients.retain(|_, client| {
if client.created_tick() == server.current_tick() {
server.clients.retain(|_, client| {
if client.created_tick() == server.shared.current_tick() {
if self
.player_count
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |count| {
(count < MAX_PLAYERS).then_some(count + 1)
})
.is_err()
{
client.disconnect("The server is full!".color(Color::RED));
return false;
}
client.set_world(world_id);
client.set_game_mode(GameMode::Creative);
client.teleport([0.0, 200.0, 0.0], 0.0, 0.0);
@ -116,7 +115,7 @@ impl Config for Game {
}
});
let time = server.current_tick() as f64 / server.tick_rate() as f64;
let time = server.shared.current_tick() as f64 / server.shared.tick_rate() as f64;
let rot = Mat3::rotation_x(time * TAU * 0.1)
.rotated_y(time * TAU * 0.2)
@ -128,7 +127,7 @@ impl Config for Game {
let radius = 6.0 + ((time * TAU / 2.5).sin() + 1.0) / 2.0 * 10.0;
// TODO: use eye position.
let player_pos = world
let player_pos = server
.clients
.iter()
.next()
@ -136,7 +135,7 @@ impl Config for Game {
.unwrap_or_default();
for (cow_id, p) in cows.iter().cloned().zip(fibonacci_spiral(cow_count)) {
let cow = world.entities.get_mut(cow_id).expect("missing cow");
let cow = server.entities.get_mut(cow_id).expect("missing cow");
let rotated = p * rot;
let transformed = rotated * radius + [0.0, 100.0, 0.0];

View file

@ -11,8 +11,7 @@ use valence::config::{Config, ServerListPing};
use valence::text::Color;
use valence::util::chunks_in_view_distance;
use valence::{
async_trait, ChunkPos, Client, DimensionId, Server, ShutdownResult, Text, TextFormat, WorldId,
Worlds,
async_trait, ChunkPos, DimensionId, Server, SharedServer, ShutdownResult, TextFormat,
};
use vek::Lerp;
@ -57,7 +56,11 @@ impl Config for Game {
false
}
async fn server_list_ping(&self, _server: &Server, _remote_addr: SocketAddr) -> ServerListPing {
async fn server_list_ping(
&self,
_server: &SharedServer,
_remote_addr: SocketAddr,
) -> ServerListPing {
ServerListPing::Respond {
online_players: self.player_count.load(Ordering::SeqCst) as i32,
max_players: MAX_PLAYERS as i32,
@ -66,41 +69,35 @@ impl Config for Game {
}
}
fn join(
&self,
_server: &Server,
_client: &mut Client,
worlds: &mut Worlds,
) -> Result<WorldId, Text> {
if let Ok(_) = self
.player_count
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |count| {
(count < MAX_PLAYERS).then(|| count + 1)
})
{
Ok(worlds.iter().next().unwrap().0)
} else {
Err("The server is full!".into())
}
}
fn init(&self, _server: &Server, worlds: &mut Worlds) {
let (_, world) = worlds.create(DimensionId::default());
fn init(&self, server: &mut Server) {
let (_, world) = server.worlds.create(DimensionId::default());
world.meta.set_flat(true);
}
fn update(&self, server: &Server, worlds: &mut Worlds) {
let world = worlds.iter_mut().next().unwrap().1;
fn update(&self, server: &mut Server) {
let (world_id, world) = server.worlds.iter_mut().next().unwrap();
let mut chunks_to_unload = HashSet::<_>::from_iter(world.chunks.iter().map(|t| t.0));
world.clients.retain(|_, client| {
server.clients.retain(|_, client| {
if client.is_disconnected() {
self.player_count.fetch_sub(1, Ordering::SeqCst);
return false;
}
if client.created_tick() == server.current_tick() {
if client.created_tick() == server.shared.current_tick() {
if self
.player_count
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |count| {
(count < MAX_PLAYERS).then_some(count + 1)
})
.is_err()
{
client.disconnect("The server is full!".color(Color::RED));
return false;
}
client.set_world(world_id);
client.set_game_mode(GameMode::Creative);
client.set_max_view_distance(32);
client.teleport([0.0, 200.0, 0.0], 0.0, 0.0);
@ -115,6 +112,10 @@ impl Config for Game {
);
client.send_message("Welcome to the terrain example!".italic());
client.send_message(
"This demonstrates how to create infinite procedurally generated terrain."
.italic(),
);
}
let dist = client.view_distance();
@ -135,7 +136,10 @@ impl Config for Game {
}
world.chunks.par_iter_mut().for_each(|(pos, chunk)| {
if chunk.created_tick() == server.current_tick() {
if chunk.created_tick() != server.shared.current_tick() {
return;
}
for z in 0..16 {
for x in 0..16 {
let block_x = x as i64 + pos.x as i64 * 16;
@ -170,8 +174,7 @@ impl Config for Game {
);
if density > 0.55 {
if density > 0.7 && chunk.get_block_state(x, y + 1, z).is_air()
{
if density > 0.7 && chunk.get_block_state(x, y + 1, z).is_air() {
let upper = BlockState::TALL_GRASS
.set(PropName::Half, PropValue::Upper);
let lower = BlockState::TALL_GRASS
@ -187,7 +190,6 @@ impl Config for Game {
}
}
}
}
});
}
}
@ -257,8 +259,8 @@ fn has_terrain_at(g: &Game, x: i64, y: i64, z: i64) -> bool {
noise01(&g.hilly_noise, [x, y, z].map(|a| a as f64 / 400.0)).powi(2),
);
let lower = 15.0 + 75.0 * hilly;
let upper = lower + 125.0 * hilly;
let lower = 15.0 + 100.0 * hilly;
let upper = lower + 100.0 * hilly;
if y as f64 <= lower {
return true;
@ -266,7 +268,7 @@ fn has_terrain_at(g: &Game, x: i64, y: i64, z: i64) -> bool {
return false;
}
let density = (1.0 - lerpstep(lower, upper, y as f64)).sqrt();
let density = 1.0 - lerpstep(lower, upper, y as f64);
let n = fbm(
&g.density_noise,

View file

@ -13,16 +13,16 @@ use crate::protocol::packets::play::s2c::{
BlockUpdate, LevelChunkHeightmaps, LevelChunkWithLight, S2cPlayPacket, SectionBlocksUpdate,
};
use crate::protocol::{Encode, Nbt, VarInt, VarLong};
use crate::{BiomeId, BlockPos, ChunkPos, DimensionId, Server, Ticks};
use crate::{BiomeId, BlockPos, ChunkPos, DimensionId, SharedServer, Ticks};
pub struct Chunks {
chunks: HashMap<ChunkPos, Chunk>,
server: Server,
server: SharedServer,
dimension: DimensionId,
}
impl Chunks {
pub(crate) fn new(server: Server, dimension: DimensionId) -> Self {
pub(crate) fn new(server: SharedServer, dimension: DimensionId) -> Self {
Self {
chunks: HashMap::new(),
server,

View file

@ -31,8 +31,8 @@ use crate::server::C2sPacketChannels;
use crate::slotmap::{Key, SlotMap};
use crate::util::{chunks_in_view_distance, is_chunk_in_view_distance};
use crate::{
ident, BlockPos, ChunkPos, Chunks, DimensionId, Entities, EntityId, NewClientData, Server,
SpatialIndex, Text, Ticks, WorldMeta, LIBRARY_NAMESPACE,
ident, BlockPos, ChunkPos, DimensionId, Entities, EntityId, NewClientData, SharedServer, Text,
Ticks, WorldId, Worlds, LIBRARY_NAMESPACE,
};
pub struct Clients {
@ -44,7 +44,7 @@ impl Clients {
Self { sm: SlotMap::new() }
}
pub(crate) fn create(&mut self, client: Client) -> (ClientId, &mut Client) {
pub(crate) fn insert(&mut self, client: Client) -> (ClientId, &mut Client) {
let (id, client) = self.sm.insert(client);
(ClientId(id), client)
}
@ -98,6 +98,8 @@ pub struct Client {
uuid: Uuid,
username: String,
textures: Option<SignedPlayerTextures>,
new_world: Option<WorldId>,
old_world: Option<WorldId>,
on_ground: bool,
new_position: Vec3<f64>,
old_position: Vec3<f64>,
@ -143,7 +145,7 @@ pub struct Client {
impl Client {
pub(crate) fn new(
packet_channels: C2sPacketChannels,
server: &Server,
server: &SharedServer,
ncd: NewClientData,
) -> Self {
let (send, recv) = packet_channels;
@ -155,6 +157,8 @@ impl Client {
uuid: ncd.uuid,
username: ncd.username,
textures: ncd.textures,
new_world: None,
old_world: None,
on_ground: false,
new_position: Vec3::default(),
old_position: Vec3::default(),
@ -199,6 +203,14 @@ impl Client {
self.textures.as_ref()
}
pub fn world(&self) -> Option<WorldId> {
self.new_world
}
pub fn set_world(&mut self, world: WorldId) {
self.new_world = Some(world);
}
/// Sends a system message to the player.
pub fn send_message(&mut self, msg: impl Into<Text>) {
self.msgs_to_send.push(msg.into());
@ -554,26 +566,33 @@ impl Client {
}
}
pub(crate) fn update(
&mut self,
server: &Server,
entities: &Entities,
spatial_index: &SpatialIndex,
chunks: &Chunks,
meta: &WorldMeta,
) {
pub(crate) fn update(&mut self, shared: &SharedServer, entities: &Entities, worlds: &Worlds) {
// Mark the client as disconnected when appropriate.
if self.recv.is_disconnected() || self.send.as_ref().map_or(true, |s| s.is_disconnected()) {
self.send = None;
return;
}
let current_tick = server.current_tick();
let world = match self.new_world.and_then(|id| worlds.get(id)) {
Some(world) => world,
None => {
log::warn!(
"client {} is in an invalid world and must be disconnected",
self.username()
);
self.disconnect_no_reason();
return;
}
};
let current_tick = shared.current_tick();
// Send the join game packet and other initial packets. We defer this until now
// so that the user can set the client's location, game mode, etc.
if self.created_tick == current_tick {
meta.player_list()
world
.meta
.player_list()
.initial_packets(|pkt| self.send_packet(pkt));
self.send_packet(Login {
@ -581,16 +600,19 @@ impl Client {
is_hardcore: false, // TODO
gamemode: self.new_game_mode,
previous_gamemode: self.old_game_mode,
dimension_names: server
dimension_names: shared
.dimensions()
.map(|(id, _)| ident!("{LIBRARY_NAMESPACE}:dimension_{}", id.0))
.collect(),
registry_codec: Nbt(make_dimension_codec(server)),
registry_codec: Nbt(make_dimension_codec(shared)),
dimension_type_name: ident!(
"{LIBRARY_NAMESPACE}:dimension_type_{}",
meta.dimension().0
world.meta.dimension().0
),
dimension_name: ident!(
"{LIBRARY_NAMESPACE}:dimension_{}",
world.meta.dimension().0
),
dimension_name: ident!("{LIBRARY_NAMESPACE}:dimension_{}", meta.dimension().0),
hashed_seed: 0,
max_players: VarInt(0),
view_distance: BoundedInt(VarInt(self.new_max_view_distance as i32)),
@ -598,7 +620,7 @@ impl Client {
reduced_debug_info: false,
enable_respawn_screen: false,
is_debug: false,
is_flat: meta.is_flat(),
is_flat: world.meta.is_flat(),
last_death_location: self
.death_location
.map(|(id, pos)| (ident!("{LIBRARY_NAMESPACE}:dimension_{}", id.0), pos)),
@ -606,6 +628,13 @@ impl Client {
self.teleport(self.position(), self.yaw(), self.pitch());
} else {
if self.new_world != self.old_world {
self.loaded_entities.clear();
self.loaded_chunks.clear();
todo!("send respawn packet");
}
if self.old_game_mode != self.new_game_mode {
self.old_game_mode = self.new_game_mode;
self.send_packet(GameEvent {
@ -614,7 +643,10 @@ impl Client {
});
}
meta.player_list().diff_packets(|pkt| self.send_packet(pkt));
world
.meta
.player_list()
.diff_packets(|pkt| self.send_packet(pkt));
}
// Update the players spawn position (compass position)
@ -638,7 +670,7 @@ impl Client {
}
// Check if it's time to send another keepalive.
if current_tick % (server.tick_rate() * 8) == 0 {
if current_tick % (shared.tick_rate() * 8) == 0 {
if self.got_keepalive {
let id = rand::random();
self.send_packet(KeepAlive { id });
@ -671,7 +703,7 @@ impl Client {
}
}
let dimension = server.dimension(meta.dimension());
let dimension = shared.dimension(world.meta.dimension());
// Update existing chunks and unload those outside the view distance. Chunks
// that have been overwritten also need to be unloaded.
@ -680,7 +712,7 @@ impl Client {
// moves to an adjacent chunk and back to the original.
let cache = 2;
if let Some(chunk) = chunks.get(pos) {
if let Some(chunk) = world.chunks.get(pos) {
if is_chunk_in_view_distance(center, pos, view_dist + cache)
&& chunk.created_tick() != current_tick
{
@ -703,7 +735,7 @@ impl Client {
// Load new chunks within the view distance
for pos in chunks_in_view_distance(center, view_dist) {
if let Some(chunk) = chunks.get(pos) {
if let Some(chunk) = world.chunks.get(pos) {
if self.loaded_chunks.insert(pos) {
self.send_packet(chunk.chunk_data_packet(pos));
chunk.block_change_packets(pos, dimension.min_y, |pkt| self.send_packet(pkt));
@ -866,7 +898,7 @@ impl Client {
// Spawn new entities within the view distance.
let pos = self.position();
spatial_index.query::<_, _, ()>(
world.spatial_index.query::<_, _, ()>(
|bb| bb.projected_point(pos).distance(pos) <= view_dist as f64 * 16.0,
|id, _| {
if self.loaded_entities.insert(id) {
@ -888,6 +920,7 @@ impl Client {
);
self.old_position = self.new_position;
self.old_world = self.new_world;
}
}
@ -906,9 +939,9 @@ fn send_packet(send_opt: &mut Option<Sender<S2cPlayPacket>>, pkt: impl Into<S2cP
}
}
fn make_dimension_codec(server: &Server) -> RegistryCodec {
fn make_dimension_codec(shared: &SharedServer) -> RegistryCodec {
let mut dims = Vec::new();
for (id, dim) in server.dimensions() {
for (id, dim) in shared.dimensions() {
let id = id.0 as i32;
dims.push(DimensionTypeRegistryEntry {
name: ident!("{LIBRARY_NAMESPACE}:dimension_type_{id}"),
@ -918,7 +951,7 @@ fn make_dimension_codec(server: &Server) -> RegistryCodec {
}
let mut biomes = Vec::new();
for (id, biome) in server.biomes() {
for (id, biome) in shared.biomes() {
biomes.push(to_biome_registry_item(biome, id.0 as i32));
}

View file

@ -5,7 +5,7 @@ use std::panic::{RefUnwindSafe, UnwindSafe};
use async_trait::async_trait;
use tokio::runtime::Handle as TokioHandle;
use crate::{Biome, Client, Dimension, NewClientData, Server, Text, Ticks, WorldId, Worlds};
use crate::{Biome, Dimension, NewClientData, Server, SharedServer, Text, Ticks};
/// A trait containing callbacks which are invoked by the running Minecraft
/// server.
@ -151,7 +151,11 @@ pub trait Config: Any + Send + Sync + UnwindSafe + RefUnwindSafe {
///
/// # Default Implementation
/// The query is ignored.
async fn server_list_ping(&self, server: &Server, remote_addr: SocketAddr) -> ServerListPing {
async fn server_list_ping(
&self,
shared: &SharedServer,
remote_addr: SocketAddr,
) -> ServerListPing {
ServerListPing::Ignore
}
@ -163,31 +167,16 @@ pub trait Config: Any + Send + Sync + UnwindSafe + RefUnwindSafe {
///
/// This method is the appropriate place to perform asynchronous
/// operations such as database queries which may take some time to
/// complete. If you need access to the worlds on the server and don't need
/// async, see [`Config::join`].
/// complete.
///
/// This method is called from within a tokio runtime.
///
/// # Default Implementation
/// The client is allowed to join unconditionally.
async fn login(&self, server: &Server, ncd: &NewClientData) -> Result<(), Text> {
async fn login(&self, shared: &SharedServer, ncd: &NewClientData) -> Result<(), Text> {
Ok(())
}
/// Called after a successful [`Config::login`] to determine what world the
/// new client should join. If this method returns with `Err(reason)`, then
/// the client is immediately disconnected with the given reason.
///
/// If the returned [`WorldId`] is invalid, then the client is disconnected.
///
/// This method is called from within a tokio runtime.
fn join(
&self,
server: &Server,
client: &mut Client,
worlds: &mut Worlds,
) -> Result<WorldId, Text>;
/// Called after the server is created, but prior to accepting connections
/// and entering the update loop.
///
@ -195,7 +184,7 @@ pub trait Config: Any + Send + Sync + UnwindSafe + RefUnwindSafe {
/// no connections to the server will be made until this function returns.
///
/// This method is called from within a tokio runtime.
fn init(&self, server: &Server, worlds: &mut Worlds) {}
fn init(&self, server: &mut Server) {}
/// Called once at the beginning of every server update (also known as
/// a "tick").
@ -207,7 +196,7 @@ pub trait Config: Any + Send + Sync + UnwindSafe + RefUnwindSafe {
///
/// # Default Implementation
/// The default implementation does nothing.
fn update(&self, server: &Server, worlds: &mut Worlds);
fn update(&self, server: &mut Server);
}
/// The result of the [`server_list_ping`](Handler::server_list_ping) callback.

View file

@ -18,6 +18,7 @@ use crate::protocol::packets::play::s2c::{
use crate::protocol::{ByteAngle, RawBytes, VarInt};
use crate::slotmap::{Key, SlotMap};
use crate::util::aabb_from_bottom_and_size;
use crate::WorldId;
pub struct Entities {
sm: SlotMap<Entity>,
@ -56,6 +57,7 @@ impl Entities {
let (k, e) = self.sm.insert(Entity {
flags: EntityFlags(0),
meta: EntityMeta::new(EntityType::Marker),
world: None,
new_position: Vec3::default(),
old_position: Vec3::default(),
yaw: 0.0,
@ -174,6 +176,7 @@ impl EntityId {
pub struct Entity {
flags: EntityFlags,
meta: EntityMeta,
world: Option<WorldId>,
new_position: Vec3<f64>,
old_position: Vec3<f64>,
yaw: f32,
@ -226,6 +229,14 @@ impl Entity {
self.flags.set_type_modified(true);
}
pub fn world(&self) -> Option<WorldId> {
self.world
}
pub fn set_world(&mut self, world: impl Into<Option<WorldId>>) {
self.world = world.into();
}
/// Returns the position of this entity in the world it inhabits.
pub fn position(&self) -> Vec3<f64> {
self.new_position

View file

@ -43,7 +43,7 @@ pub use config::Config;
pub use dimension::{Dimension, DimensionId};
pub use entity::{Entities, Entity, EntityId, EntityType};
pub use ident::Ident;
pub use server::{start_server, NewClientData, Server, ShutdownResult};
pub use server::{start_server, NewClientData, Server, SharedServer, ShutdownResult};
pub use spatial_index::SpatialIndex;
pub use text::{Text, TextFormat};
pub use uuid::Uuid;

View file

@ -40,16 +40,24 @@ use crate::protocol::{BoundedArray, BoundedString, VarInt};
use crate::util::valid_username;
use crate::world::Worlds;
use crate::{
Biome, BiomeId, Client, Dimension, DimensionId, Ticks, PROTOCOL_VERSION, VERSION_NAME,
Biome, BiomeId, Client, Clients, Dimension, DimensionId, Entities, Ticks, PROTOCOL_VERSION,
VERSION_NAME,
};
pub struct Server {
pub shared: SharedServer,
pub clients: Clients,
pub entities: Entities,
pub worlds: Worlds,
}
/// A handle to a running Minecraft server containing state which is accessible
/// outside the update loop. Servers are internally refcounted and can be shared
/// between threads.
#[derive(Clone)]
pub struct Server(Arc<ServerInner>);
pub struct SharedServer(Arc<SharedServerInner>);
struct ServerInner {
struct SharedServerInner {
cfg: Box<dyn Config>,
address: SocketAddr,
tick_rate: Ticks,
@ -104,7 +112,7 @@ pub type ShutdownError = Box<dyn Error + Send + Sync + 'static>;
pub(crate) type S2cPacketChannels = (Sender<C2sPlayPacket>, Receiver<S2cPlayPacket>);
pub(crate) type C2sPacketChannels = (Sender<S2cPlayPacket>, Receiver<C2sPlayPacket>);
impl Server {
impl SharedServer {
pub fn config(&self) -> &(impl Config + ?Sized) {
self.0.cfg.as_ref()
}
@ -207,20 +215,25 @@ impl Server {
/// The function returns when the server has shut down, a runtime error
/// occurs, or the configuration is invalid.
pub fn start_server(config: impl Config) -> ShutdownResult {
let server = setup_server(config).map_err(ShutdownError::from)?;
let shared = setup_server(config).map_err(ShutdownError::from)?;
let _guard = server.tokio_handle().enter();
let _guard = shared.tokio_handle().enter();
let mut worlds = Worlds::new(server.clone());
let mut server = Server {
shared: shared.clone(),
clients: Clients::new(),
entities: Entities::new(),
worlds: Worlds::new(shared.clone()),
};
server.config().init(&server, &mut worlds);
shared.config().init(&mut server);
tokio::spawn(do_accept_loop(server.clone()));
tokio::spawn(do_accept_loop(shared));
do_update_loop(server, &mut worlds)
do_update_loop(&mut server)
}
fn setup_server(cfg: impl Config) -> anyhow::Result<Server> {
fn setup_server(cfg: impl Config) -> anyhow::Result<SharedServer> {
let max_connections = cfg.max_connections();
let address = cfg.address();
let tick_rate = cfg.tick_rate();
@ -320,7 +333,7 @@ fn setup_server(cfg: impl Config) -> anyhow::Result<Server> {
None => tokio_handle.unwrap(),
};
let server = ServerInner {
let server = SharedServerInner {
cfg: Box::new(cfg),
address,
tick_rate,
@ -343,33 +356,32 @@ fn setup_server(cfg: impl Config) -> anyhow::Result<Server> {
http_client: HttpClient::new(),
};
Ok(Server(Arc::new(server)))
Ok(SharedServer(Arc::new(server)))
}
fn do_update_loop(server: Server, worlds: &mut Worlds) -> ShutdownResult {
fn do_update_loop(server: &mut Server) -> ShutdownResult {
let mut tick_start = Instant::now();
let shared = server.shared.clone();
loop {
if let Some(res) = server.0.shutdown_result.lock().unwrap().take() {
if let Some(res) = shared.0.shutdown_result.lock().unwrap().take() {
return res;
}
while let Ok(msg) = server.0.new_clients_rx.try_recv() {
join_player(&server, worlds, msg);
while let Ok(msg) = shared.0.new_clients_rx.try_recv() {
join_player(server, msg);
}
// Get serverbound packets first so they are not dealt with a tick late.
worlds.par_iter_mut().for_each(|(_, world)| {
world.clients.par_iter_mut().for_each(|(_, client)| {
client.handle_serverbound_packets(&world.entities);
});
server.clients.par_iter_mut().for_each(|(_, client)| {
client.handle_serverbound_packets(&server.entities);
});
server.config().update(&server, worlds);
shared.config().update(server);
worlds.par_iter_mut().for_each(|(_, world)| {
server.worlds.par_iter_mut().for_each(|(id, world)| {
world.chunks.par_iter_mut().for_each(|(_, chunk)| {
if chunk.created_tick() == server.current_tick() {
if chunk.created_tick() == shared.current_tick() {
// Chunks created this tick can have their changes applied immediately because
// they have not been observed by clients yet. Clients will not have to be sent
// the block change packet in this case.
@ -377,20 +389,16 @@ fn do_update_loop(server: Server, worlds: &mut Worlds) -> ShutdownResult {
}
});
world.spatial_index.update(&world.entities);
world.clients.par_iter_mut().for_each(|(_, client)| {
client.update(
&server,
&world.entities,
&world.spatial_index,
&world.chunks,
&world.meta,
);
world.spatial_index.update(&server.entities, id);
});
world.entities.update();
server.clients.par_iter_mut().for_each(|(_, client)| {
client.update(&shared, &server.entities, &server.worlds);
});
server.entities.update();
server.worlds.par_iter_mut().for_each(|(_, world)| {
world.chunks.par_iter_mut().for_each(|(_, chunk)| {
chunk.apply_modifications();
});
@ -399,53 +407,43 @@ fn do_update_loop(server: Server, worlds: &mut Worlds) -> ShutdownResult {
});
// Sleep for the remainder of the tick.
let tick_duration = Duration::from_secs_f64((server.0.tick_rate as f64).recip());
let tick_duration = Duration::from_secs_f64((shared.0.tick_rate as f64).recip());
thread::sleep(tick_duration.saturating_sub(tick_start.elapsed()));
tick_start = Instant::now();
server.0.tick_counter.fetch_add(1, Ordering::SeqCst);
shared.0.tick_counter.fetch_add(1, Ordering::SeqCst);
}
}
fn join_player(server: &Server, worlds: &mut Worlds, msg: NewClientMessage) {
let (clientbound_tx, clientbound_rx) = flume::bounded(server.0.outgoing_packet_capacity);
let (serverbound_tx, serverbound_rx) = flume::bounded(server.0.incoming_packet_capacity);
fn join_player(server: &mut Server, msg: NewClientMessage) {
let (clientbound_tx, clientbound_rx) = flume::bounded(server.shared.0.outgoing_packet_capacity);
let (serverbound_tx, serverbound_rx) = flume::bounded(server.shared.0.incoming_packet_capacity);
let s2c_packet_channels: S2cPacketChannels = (serverbound_tx, clientbound_rx);
let c2s_packet_channels: C2sPacketChannels = (clientbound_tx, serverbound_rx);
let _ = msg.reply.send(s2c_packet_channels);
let mut client = Client::new(c2s_packet_channels, server, msg.ncd);
let client = Client::new(c2s_packet_channels, &server.shared, msg.ncd);
match server.0.cfg.join(server, &mut client, worlds) {
Ok(world_id) => {
if let Some(world) = worlds.get_mut(world_id) {
if world.entities.get_with_uuid(client.uuid()).is_none() {
world.clients.create(client);
if server.entities.get_with_uuid(client.uuid()).is_none() {
server.clients.insert(client);
} else {
log::warn!(
"client '{}' cannot join the server because their UUID ({}) conflicts \
with an existing entity",
"client '{}' cannot join the server because their UUID ({}) conflicts with an \
existing entity",
client.username(),
client.uuid()
);
}
} else {
log::warn!(
"client '{}' cannot join the server because the WorldId returned by \
Config::join is invalid.",
client.username()
);
}
}
Err(errmsg) => client.disconnect(errmsg),
}
}
type Codec = (Encoder<OwnedWriteHalf>, Decoder<OwnedReadHalf>);
struct Codec {
enc: Encoder<OwnedWriteHalf>,
dec: Decoder<OwnedReadHalf>,
}
async fn do_accept_loop(server: Server) {
async fn do_accept_loop(server: SharedServer) {
log::trace!("entering accept loop");
let listener = match TcpListener::bind(server.0.address).await {
@ -485,18 +483,21 @@ async fn do_accept_loop(server: Server) {
}
async fn handle_connection(
server: Server,
server: SharedServer,
stream: TcpStream,
remote_addr: SocketAddr,
) -> anyhow::Result<()> {
let timeout = Duration::from_secs(10);
let (read, write) = stream.into_split();
let mut c: Codec = (Encoder::new(write, timeout), Decoder::new(read, timeout));
let mut c = Codec {
enc: Encoder::new(write, timeout),
dec: Decoder::new(read, timeout),
};
// TODO: peek stream for 0xFE legacy ping
match c.1.read_packet::<Handshake>().await?.next_state {
match c.dec.read_packet::<Handshake>().await?.next_state {
HandshakeNextState::Status => handle_status(server, &mut c, remote_addr)
.await
.context("error during status"),
@ -513,11 +514,11 @@ async fn handle_connection(
}
async fn handle_status(
server: Server,
server: SharedServer,
c: &mut Codec,
remote_addr: SocketAddr,
) -> anyhow::Result<()> {
c.1.read_packet::<StatusRequest>().await?;
c.dec.read_packet::<StatusRequest>().await?;
match server.0.cfg.server_list_ping(&server, remote_addr).await {
ServerListPing::Respond {
@ -547,7 +548,8 @@ async fn handle_status(
.insert("favicon".to_string(), Value::String(buf));
}
c.0.write_packet(&StatusResponse {
c.enc
.write_packet(&StatusResponse {
json_response: json.to_string(),
})
.await?;
@ -555,30 +557,31 @@ async fn handle_status(
ServerListPing::Ignore => return Ok(()),
}
let PingRequest { payload } = c.1.read_packet().await?;
let PingRequest { payload } = c.dec.read_packet().await?;
c.0.write_packet(&PongResponse { payload }).await?;
c.enc.write_packet(&PongResponse { payload }).await?;
Ok(())
}
/// Handle the login process and return the new player's data if successful.
async fn handle_login(
server: &Server,
server: &SharedServer,
c: &mut Codec,
remote_addr: SocketAddr,
) -> anyhow::Result<Option<NewClientData>> {
let LoginStart {
username: BoundedString(username),
sig_data: _, // TODO
} = c.1.read_packet().await?;
} = c.dec.read_packet().await?;
ensure!(valid_username(&username), "invalid username '{username}'");
let (uuid, textures) = if server.0.online_mode {
let my_verify_token: [u8; 16] = rand::random();
c.0.write_packet(&EncryptionRequest {
c.enc
.write_packet(&EncryptionRequest {
server_id: Default::default(), // Always empty
public_key: server.0.public_key_der.to_vec(),
verify_token: my_verify_token.to_vec().into(),
@ -588,7 +591,7 @@ async fn handle_login(
let EncryptionResponse {
shared_secret: BoundedArray(encrypted_shared_secret),
token_or_sig,
} = c.1.read_packet().await?;
} = c.dec.read_packet().await?;
let shared_secret = server
.0
@ -618,8 +621,8 @@ async fn handle_login(
.try_into()
.context("shared secret has the wrong length")?;
c.0.enable_encryption(&crypt_key);
c.1.enable_encryption(&crypt_key);
c.enc.enable_encryption(&crypt_key);
c.dec.enable_encryption(&crypt_key);
#[derive(Debug, Deserialize)]
struct AuthResponse {
@ -667,13 +670,14 @@ async fn handle_login(
};
let compression_threshold = 256;
c.0.write_packet(&SetCompression {
c.enc
.write_packet(&SetCompression {
threshold: VarInt(compression_threshold as i32),
})
.await?;
c.0.enable_compression(compression_threshold);
c.1.enable_compression(compression_threshold);
c.enc.enable_compression(compression_threshold);
c.dec.enable_compression(compression_threshold);
let npd = NewClientData {
uuid,
@ -684,11 +688,14 @@ async fn handle_login(
if let Err(reason) = server.0.cfg.login(server, &npd).await {
log::info!("Disconnect at login: \"{reason}\"");
c.0.write_packet(&login::s2c::Disconnect { reason }).await?;
c.enc
.write_packet(&login::s2c::Disconnect { reason })
.await?;
return Ok(None);
}
c.0.write_packet(&LoginSuccess {
c.enc
.write_packet(&LoginSuccess {
uuid: npd.uuid,
username: npd.username.clone().into(),
properties: Vec::new(),
@ -698,7 +705,7 @@ async fn handle_login(
Ok(Some(npd))
}
async fn handle_play(server: &Server, c: Codec, ncd: NewClientData) -> anyhow::Result<()> {
async fn handle_play(server: &SharedServer, c: Codec, ncd: NewClientData) -> anyhow::Result<()> {
let (reply_tx, reply_rx) = oneshot::channel();
server
@ -715,11 +722,11 @@ async fn handle_play(server: &Server, c: Codec, ncd: NewClientData) -> anyhow::R
Err(_) => return Ok(()), // Server closed
};
let (mut encoder, mut decoder) = c;
let Codec { mut enc, mut dec } = c;
tokio::spawn(async move {
while let Ok(pkt) = packet_rx.recv_async().await {
if let Err(e) = encoder.write_packet(&pkt).await {
if let Err(e) = enc.write_packet(&pkt).await {
log::debug!("error while sending play packet: {e:#}");
break;
}
@ -727,7 +734,7 @@ async fn handle_play(server: &Server, c: Codec, ncd: NewClientData) -> anyhow::R
});
loop {
let pkt = decoder.read_packet().await?;
let pkt = dec.read_packet().await?;
if packet_tx.send_async(pkt).await.is_err() {
break;
}

View file

@ -2,7 +2,7 @@ use vek::{Aabb, Vec3};
use crate::bvh::Bvh;
pub use crate::bvh::TraverseStep;
use crate::{Entities, EntityId};
use crate::{Entities, EntityId, WorldId};
pub struct SpatialIndex {
bvh: Bvh<EntityId>,
@ -96,9 +96,13 @@ impl SpatialIndex {
)
}
pub(crate) fn update(&mut self, entities: &Entities) {
self.bvh
.build(entities.iter().map(|(id, e)| (id, e.hitbox())))
pub(crate) fn update(&mut self, entities: &Entities, id: WorldId) {
self.bvh.build(
entities
.iter()
.filter(|(_, e)| e.world() == Some(id))
.map(|(id, e)| (id, e.hitbox())),
)
}
}

View file

@ -66,8 +66,8 @@ pub fn to_yaw_and_pitch(d: Vec3<f64>) -> (f32, f32) {
(yaw, pitch)
}
// /// Takes yaw and pitch angles (in degrees) and returns a normalized direction
// /// vector.
// /// Takes yaw and pitch angles (in degrees) and returns a normalized
// direction /// vector.
// ///
// /// This function is the inverse of [`to_yaw_and_pitch`].
// pub fn from_yaw_and_pitch(yaw: f32, pitch: f32) -> Vec3<f64> {

View file

@ -4,18 +4,18 @@ use rayon::iter::ParallelIterator;
use crate::player_list::PlayerList;
use crate::slotmap::{Key, SlotMap};
use crate::{Chunks, Clients, DimensionId, Entities, Server, SpatialIndex};
use crate::{Chunks, DimensionId, SharedServer, SpatialIndex};
pub struct Worlds {
sm: SlotMap<World>,
server: Server,
server: SharedServer,
}
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct WorldId(Key);
impl Worlds {
pub(crate) fn new(server: Server) -> Self {
pub(crate) fn new(server: SharedServer) -> Self {
Self {
sm: SlotMap::new(),
server,
@ -24,8 +24,6 @@ impl Worlds {
pub fn create(&mut self, dim: DimensionId) -> (WorldId, &mut World) {
let (id, world) = self.sm.insert(World {
clients: Clients::new(),
entities: Entities::new(),
spatial_index: SpatialIndex::new(),
chunks: Chunks::new(self.server.clone(), dim),
meta: WorldMeta {
@ -81,8 +79,6 @@ impl Worlds {
}
pub struct World {
pub clients: Clients,
pub entities: Entities,
pub spatial_index: SpatialIndex,
pub chunks: Chunks,
pub meta: WorldMeta,