Replace log crate with tracing (#157)

The `tracing` crate seems to be the go-to
logging/profiling/instrumentation solution nowadays. Perhaps in the
future we could use `tracing` for profiling instead of (or in addition
to) the `perf`-based `cargo flamegraph` command. This would sidestep the
issue of `rayon` polluting the output. I conducted an initial experiment
by adding some more spans but wasn't very happy with the result.

Log messages have also been improved. There is some additional context
and events are raised when clients are added/removed from the server.
This commit is contained in:
Ryan Johnson 2022-11-16 18:22:44 -08:00 committed by GitHub
parent ab2e1081ed
commit 4226201fed
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
23 changed files with 110 additions and 125 deletions

View file

@ -22,7 +22,6 @@ bytes = "1.2.1"
flume = "0.10.14" flume = "0.10.14"
futures = "0.3.24" futures = "0.3.24"
hmac = "0.12.1" hmac = "0.12.1"
log = "0.4.17"
num = "0.4.0" num = "0.4.0"
paste = "1.0.9" paste = "1.0.9"
rand = "0.8.5" rand = "0.8.5"
@ -34,6 +33,7 @@ serde_json = "1.0.85"
sha1 = "0.10.5" sha1 = "0.10.5"
sha2 = "0.10.6" sha2 = "0.10.6"
thiserror = "1.0.35" thiserror = "1.0.35"
tracing = "0.1.37"
url = { version = "2.2.2", features = ["serde"] } url = { version = "2.2.2", features = ["serde"] }
uuid = { version = "1.1.2", features = ["serde"] } uuid = { version = "1.1.2", features = ["serde"] }
valence_nbt = { version = "0.4.0", path = "valence_nbt" } valence_nbt = { version = "0.4.0", path = "valence_nbt" }
@ -51,7 +51,7 @@ default-features = false
features = ["rustls-tls", "json"] features = ["rustls-tls", "json"]
[dev-dependencies] [dev-dependencies]
env_logger = "0.9.1" tracing-subscriber = "0.3.16"
noise = "0.7.0" noise = "0.7.0"
[build-dependencies] [build-dependencies]

View file

@ -2,14 +2,10 @@ use std::iter;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use log::LevelFilter;
use valence::prelude::*; use valence::prelude::*;
pub fn main() -> ShutdownResult { pub fn main() -> ShutdownResult {
env_logger::Builder::new() tracing_subscriber::fmt().init();
.filter_module("valence", LevelFilter::Trace)
.parse_default_env()
.init();
valence::start_server( valence::start_server(
Game { Game {

View file

@ -1,16 +1,12 @@
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use log::LevelFilter;
use num::Integer; use num::Integer;
use valence::client::DiggingStatus; use valence::client::DiggingStatus;
use valence::prelude::*; use valence::prelude::*;
pub fn main() -> ShutdownResult { pub fn main() -> ShutdownResult {
env_logger::Builder::new() tracing_subscriber::fmt().init();
.filter_module("valence", LevelFilter::Trace)
.parse_default_env()
.init();
valence::start_server( valence::start_server(
Game { Game {

View file

@ -1,16 +1,12 @@
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use log::LevelFilter;
use num::Integer; use num::Integer;
use valence::prelude::*; use valence::prelude::*;
use valence_protocol::var_int::VarInt; use valence::protocol::VarInt;
pub fn main() -> ShutdownResult { pub fn main() -> ShutdownResult {
env_logger::Builder::new() tracing_subscriber::fmt().init();
.filter_module("valence", LevelFilter::Trace)
.parse_default_env()
.init();
valence::start_server( valence::start_server(
Game { Game {

View file

@ -1,14 +1,10 @@
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use log::LevelFilter;
use valence::prelude::*; use valence::prelude::*;
pub fn main() -> ShutdownResult { pub fn main() -> ShutdownResult {
env_logger::Builder::new() tracing_subscriber::fmt().init();
.filter_module("valence", LevelFilter::Trace)
.parse_default_env()
.init();
valence::start_server( valence::start_server(
Game { Game {

View file

@ -2,16 +2,12 @@ use std::mem;
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use log::LevelFilter;
use num::Integer; use num::Integer;
use rayon::prelude::*; use rayon::prelude::*;
use valence::prelude::*; use valence::prelude::*;
pub fn main() -> ShutdownResult { pub fn main() -> ShutdownResult {
env_logger::Builder::new() tracing_subscriber::fmt().init();
.filter_module("valence", LevelFilter::Trace)
.parse_default_env()
.init();
valence::start_server( valence::start_server(
Game { Game {

View file

@ -3,15 +3,11 @@ use std::f64::consts::TAU;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use log::LevelFilter;
use uuid::Uuid; use uuid::Uuid;
use valence::prelude::*; use valence::prelude::*;
pub fn main() -> ShutdownResult { pub fn main() -> ShutdownResult {
env_logger::Builder::new() tracing_subscriber::fmt().init();
.filter_module("valence", LevelFilter::Trace)
.parse_default_env()
.init();
valence::start_server( valence::start_server(
Game { Game {

View file

@ -1,14 +1,10 @@
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use log::LevelFilter;
use valence::prelude::*; use valence::prelude::*;
pub fn main() -> ShutdownResult { pub fn main() -> ShutdownResult {
env_logger::Builder::new() tracing_subscriber::fmt().init();
.filter_module("valence", LevelFilter::Trace)
.parse_default_env()
.init();
valence::start_server( valence::start_server(
Game { Game {

View file

@ -1,15 +1,11 @@
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use log::LevelFilter;
use valence::prelude::*; use valence::prelude::*;
use valence_protocol::entity_meta::{Facing, PaintingKind}; use valence_protocol::entity_meta::{Facing, PaintingKind};
pub fn main() -> ShutdownResult { pub fn main() -> ShutdownResult {
env_logger::Builder::new() tracing_subscriber::fmt().init();
.filter_module("valence", LevelFilter::Trace)
.parse_default_env()
.init();
valence::start_server( valence::start_server(
Game { Game {

View file

@ -1,16 +1,12 @@
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use log::LevelFilter;
use num::Integer; use num::Integer;
pub use valence::prelude::*; pub use valence::prelude::*;
use valence_protocol::types::ClickContainerMode; use valence_protocol::types::ClickContainerMode;
pub fn main() -> ShutdownResult { pub fn main() -> ShutdownResult {
env_logger::Builder::new() tracing_subscriber::fmt().init();
.filter_module("valence", LevelFilter::Trace)
.parse_default_env()
.init();
valence::start_server( valence::start_server(
Game { Game {

View file

@ -3,16 +3,12 @@ use std::net::SocketAddr;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{SystemTime, UNIX_EPOCH}; use std::time::{SystemTime, UNIX_EPOCH};
use log::LevelFilter;
use rand::seq::SliceRandom; use rand::seq::SliceRandom;
use rand::Rng; use rand::Rng;
use valence::prelude::*; use valence::prelude::*;
pub fn main() -> ShutdownResult { pub fn main() -> ShutdownResult {
env_logger::Builder::new() tracing_subscriber::fmt().init();
.filter_module("valence", LevelFilter::Trace)
.parse_default_env()
.init();
valence::start_server( valence::start_server(
Game { Game {

View file

@ -1,16 +1,12 @@
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use log::LevelFilter;
use valence::prelude::*; use valence::prelude::*;
use valence_protocol::packets::c2s::play::ResourcePackC2s; use valence_protocol::packets::c2s::play::ResourcePackC2s;
use valence_protocol::types::EntityInteraction; use valence_protocol::types::EntityInteraction;
pub fn main() -> ShutdownResult { pub fn main() -> ShutdownResult {
env_logger::Builder::new() tracing_subscriber::fmt().init();
.filter_module("valence", LevelFilter::Trace)
.parse_default_env()
.init();
valence::start_server( valence::start_server(
Game { Game {

View file

@ -2,17 +2,13 @@ use std::net::SocketAddr;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::SystemTime; use std::time::SystemTime;
use log::LevelFilter;
use noise::{NoiseFn, Seedable, SuperSimplex}; use noise::{NoiseFn, Seedable, SuperSimplex};
use rayon::iter::ParallelIterator; use rayon::iter::ParallelIterator;
pub use valence::prelude::*; pub use valence::prelude::*;
use vek::Lerp; use vek::Lerp;
pub fn main() -> ShutdownResult { pub fn main() -> ShutdownResult {
env_logger::Builder::new() tracing_subscriber::fmt().init();
.filter_module("valence", LevelFilter::Trace)
.parse_default_env()
.init();
let seconds_per_day = 86_400; let seconds_per_day = 86_400;

View file

@ -4,6 +4,6 @@ version = "0.1.0"
edition = "2021" edition = "2021"
[dependencies] [dependencies]
env_logger = "0.9.1" tracing-subscriber = "0.3.16"
log = "0.4.17"
valence = { path = "../.." } valence = { path = "../.." }

View file

@ -1,14 +1,10 @@
use std::net::SocketAddr; use std::net::SocketAddr;
use std::time::Instant; use std::time::Instant;
use log::LevelFilter;
use valence::prelude::*; use valence::prelude::*;
pub fn main() -> ShutdownResult { pub fn main() -> ShutdownResult {
env_logger::Builder::new() tracing_subscriber::fmt().init();
.filter_module("valence", LevelFilter::Trace)
.parse_default_env()
.init();
valence::start_server( valence::start_server(
Game, Game,
@ -119,8 +115,6 @@ impl Config for Game {
server.clients.retain(|_, client| { server.clients.retain(|_, client| {
if client.created_this_tick() { if client.created_this_tick() {
log::info!("Client \"{}\" joined", client.username());
client.spawn(world_id); client.spawn(world_id);
client.set_flat(true); client.set_flat(true);
client.teleport([0.0, 1.0, 0.0], 0.0, 0.0); client.teleport([0.0, 1.0, 0.0], 0.0, 0.0);
@ -152,8 +146,6 @@ impl Config for Game {
} }
if client.is_disconnected() { if client.is_disconnected() {
log::info!("Client \"{}\" disconnected", client.username());
if WITH_PLAYER_ENTITIES { if WITH_PLAYER_ENTITIES {
if let Some(id) = &server.state.player_list { if let Some(id) = &server.state.player_list {
server.player_lists.get_mut(id).remove(client.uuid()); server.player_lists.get_mut(id).remove(client.uuid());

View file

@ -3,6 +3,7 @@
use std::collections::HashSet; use std::collections::HashSet;
use anyhow::ensure; use anyhow::ensure;
use tracing::warn;
use valence_nbt::{compound, Compound}; use valence_nbt::{compound, Compound};
use valence_protocol::ident; use valence_protocol::ident;
use valence_protocol::ident::Ident; use valence_protocol::ident::Ident;
@ -157,7 +158,7 @@ pub(crate) fn validate_biomes(biomes: &[Biome]) -> anyhow::Result<()> {
} }
if !names.contains(&ident!("plains")) { if !names.contains(&ident!("plains")) {
log::warn!( warn!(
"A biome named \"plains\" is missing from the biome registry! Due to a bug in the \ "A biome named \"plains\" is missing from the biome registry! Due to a bug in the \
vanilla client, players may not be able to join the game!" vanilla client, players may not be able to join the game!"
); );

View file

@ -2,8 +2,7 @@ use std::array;
use std::io::Write; use std::io::Write;
use arrayvec::ArrayVec; use arrayvec::ArrayVec;
use valence_protocol::var_int::VarInt; use valence_protocol::{Encode, VarInt};
use valence_protocol::Encode;
use crate::chunk::{compact_u64s_len, encode_compact_u64s}; use crate::chunk::{compact_u64s_len, encode_compact_u64s};
use crate::util::bits_needed; use crate::util::bits_needed;

View file

@ -9,6 +9,7 @@ use anyhow::{bail, Context};
pub use bitfield_struct::bitfield; pub use bitfield_struct::bitfield;
pub use event::*; pub use event::*;
use rayon::iter::ParallelIterator; use rayon::iter::ParallelIterator;
use tracing::{error, info, warn};
use uuid::Uuid; use uuid::Uuid;
use valence_protocol::packets::c2s::play::ClientCommand; use valence_protocol::packets::c2s::play::ClientCommand;
use valence_protocol::packets::s2c::play::{ use valence_protocol::packets::s2c::play::{
@ -81,14 +82,24 @@ impl<C: Config> Clients<C> {
/// and the client is deleted. Otherwise, `None` is returned and the /// and the client is deleted. Otherwise, `None` is returned and the
/// function has no effect. /// function has no effect.
pub fn remove(&mut self, client: ClientId) -> Option<C::ClientState> { pub fn remove(&mut self, client: ClientId) -> Option<C::ClientState> {
self.slab.remove(client.0).map(|c| c.state) self.slab.remove(client.0).map(|c| {
info!(username = %c.username, uuid = %c.uuid, "removing client");
c.state
})
} }
/// Deletes all clients from the server for which `f` returns `false`. /// Deletes all clients from the server for which `f` returns `false`.
/// ///
/// All clients are visited in an unspecified order. /// All clients are visited in an unspecified order.
pub fn retain(&mut self, mut f: impl FnMut(ClientId, &mut Client<C>) -> bool) { pub fn retain(&mut self, mut f: impl FnMut(ClientId, &mut Client<C>) -> bool) {
self.slab.retain(|k, v| f(ClientId(k), v)) self.slab.retain(|k, v| {
if !f(ClientId(k), v) {
info!(username = %v.username, uuid = %v.uuid, "removing client");
false
} else {
true
}
})
} }
/// Returns the number of clients on the server. This includes clients for /// Returns the number of clients on the server. This includes clients for
@ -321,10 +332,10 @@ impl<C: Config> Client<C> {
{ {
if let Some(ctrl) = &mut self.ctrl { if let Some(ctrl) = &mut self.ctrl {
if let Err(e) = ctrl.append_packet(pkt) { if let Err(e) = ctrl.append_packet(pkt) {
log::warn!( warn!(
"failed to queue packet {} for client {}: {e:#}", username = %self.username,
pkt.packet_name(), uuid = %self.uuid,
&self.username "failed to queue packet: {e:#}"
); );
self.ctrl = None; self.ctrl = None;
} }
@ -817,7 +828,7 @@ impl<C: Config> Client<C> {
pub fn disconnect(&mut self, reason: impl Into<Text>) { pub fn disconnect(&mut self, reason: impl Into<Text>) {
if self.ctrl.is_some() { if self.ctrl.is_some() {
let txt = reason.into(); let txt = reason.into();
log::info!("disconnecting client '{}': \"{txt}\"", self.username); info!("disconnecting client '{}': \"{txt}\"", self.username);
self.queue_packet(&DisconnectPlay { reason: txt }); self.queue_packet(&DisconnectPlay { reason: txt });
@ -829,7 +840,7 @@ impl<C: Config> Client<C> {
/// displayed. /// displayed.
pub fn disconnect_no_reason(&mut self) { pub fn disconnect_no_reason(&mut self) {
if self.ctrl.is_some() { if self.ctrl.is_some() {
log::info!("disconnecting client '{}'", self.username); info!("disconnecting client '{}'", self.username);
self.ctrl = None; self.ctrl = None;
} }
} }
@ -859,7 +870,7 @@ impl<C: Config> Client<C> {
Ok(Some(pkt)) => { Ok(Some(pkt)) => {
let name = pkt.packet_name(); let name = pkt.packet_name();
if let Err(e) = self.handle_serverbound_packet(entities, pkt) { if let Err(e) = self.handle_serverbound_packet(entities, pkt) {
log::error!( error!(
"failed to handle {name} packet from client {}: {e:#}", "failed to handle {name} packet from client {}: {e:#}",
&self.username &self.username
); );
@ -871,7 +882,7 @@ impl<C: Config> Client<C> {
return; return;
} }
Err(e) => { Err(e) => {
log::error!( error!(
"failed to read next serverbound packet from client {}: {e:#}", "failed to read next serverbound packet from client {}: {e:#}",
&self.username &self.username
); );
@ -1175,7 +1186,11 @@ impl<C: Config> Client<C> {
) { ) {
Ok(()) => self.ctrl = Some(ctrl), Ok(()) => self.ctrl = Some(ctrl),
Err(e) => { Err(e) => {
log::warn!("error updating client '{}': {e:#}", &self.username); error!(
username = %self.username,
uuid = %self.uuid,
"error updating client: {e:#}"
);
} }
} }
} }
@ -1197,7 +1212,7 @@ impl<C: Config> Client<C> {
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let world = match worlds.get(self.world) { let world = match worlds.get(self.world) {
Some(world) => world, Some(world) => world,
None => bail!("client is in an invalid world and must be disconnected",), None => bail!("client is in an invalid world and must be disconnected"),
}; };
let current_tick = shared.current_tick(); let current_tick = shared.current_tick();

View file

@ -6,9 +6,8 @@ use std::collections::{HashMap, HashSet};
use bitfield_struct::bitfield; use bitfield_struct::bitfield;
use uuid::Uuid; use uuid::Uuid;
use valence_protocol::packets::s2c::play::{PlayerInfo, SetTabListHeaderAndFooter}; use valence_protocol::packets::s2c::play::{PlayerInfo, SetTabListHeaderAndFooter};
use valence_protocol::text::Text;
use valence_protocol::types::{GameMode, PlayerInfoAddPlayer, SignedProperty}; use valence_protocol::types::{GameMode, PlayerInfoAddPlayer, SignedProperty};
use valence_protocol::var_int::VarInt; use valence_protocol::{Text, VarInt};
use crate::config::Config; use crate::config::Config;
use crate::player_textures::SignedPlayerTextures; use crate::player_textures::SignedPlayerTextures;

View file

@ -20,6 +20,7 @@ use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
use tokio::net::{TcpListener, TcpStream}; use tokio::net::{TcpListener, TcpStream};
use tokio::runtime::{Handle, Runtime}; use tokio::runtime::{Handle, Runtime};
use tokio::sync::Semaphore; use tokio::sync::Semaphore;
use tracing::{error, info, info_span, instrument, trace, warn};
use uuid::Uuid; use uuid::Uuid;
use valence_nbt::{compound, Compound, List}; use valence_nbt::{compound, Compound, List};
use valence_protocol::packets::c2s::handshake::HandshakeOwned; use valence_protocol::packets::c2s::handshake::HandshakeOwned;
@ -63,6 +64,7 @@ pub struct Server<C: Config> {
pub worlds: Worlds<C>, pub worlds: Worlds<C>,
/// All of the player lists on the server. /// All of the player lists on the server.
pub player_lists: PlayerLists<C>, pub player_lists: PlayerLists<C>,
/// All of the inventories on the server.
pub inventories: Inventories, pub inventories: Inventories,
} }
@ -276,13 +278,14 @@ pub fn start_server<C: Config>(config: C, data: C::ServerState) -> ShutdownResul
inventories: Inventories::new(), inventories: Inventories::new(),
}; };
shared.config().init(&mut server); info_span!("configured_init").in_scope(|| shared.config().init(&mut server));
tokio::spawn(do_accept_loop(shared)); tokio::spawn(do_accept_loop(shared));
do_update_loop(&mut server) do_update_loop(&mut server)
} }
#[instrument(skip_all)]
fn setup_server<C: Config>(cfg: C) -> anyhow::Result<SharedServer<C>> { fn setup_server<C: Config>(cfg: C) -> anyhow::Result<SharedServer<C>> {
let max_connections = cfg.max_connections(); let max_connections = cfg.max_connections();
let address = cfg.address(); let address = cfg.address();
@ -381,7 +384,6 @@ fn make_registry_codec(dimensions: &[Dimension], biomes: &[Biome]) -> Compound {
.map(|(id, biome)| biome.to_biome_registry_item(id as i32)) .map(|(id, biome)| biome.to_biome_registry_item(id as i32))
.collect()) .collect())
} }
}, },
ident!("chat_type") => compound! { ident!("chat_type") => compound! {
"type" => ident!("chat_type"), "type" => ident!("chat_type"),
@ -392,25 +394,36 @@ fn make_registry_codec(dimensions: &[Dimension], biomes: &[Biome]) -> Compound {
fn do_update_loop(server: &mut Server<impl Config>) -> ShutdownResult { fn do_update_loop(server: &mut Server<impl Config>) -> ShutdownResult {
let mut tick_start = Instant::now(); let mut tick_start = Instant::now();
let mut current_tick = 0;
let shared = server.shared.clone(); let shared = server.shared.clone();
loop { loop {
let _span = info_span!("update_loop", tick = current_tick).entered();
if let Some(res) = shared.0.shutdown_result.lock().unwrap().take() { if let Some(res) = shared.0.shutdown_result.lock().unwrap().take() {
return res; return res;
} }
while let Ok(msg) = shared.0.new_clients_rx.try_recv() { while let Ok(msg) = shared.0.new_clients_rx.try_recv() {
info!(
username = %msg.ncd.username,
uuid = %msg.ncd.uuid,
ip = %msg.ncd.remote_addr,
"inserting client"
);
server server
.clients .clients
.insert(Client::new(msg.ctrl, msg.ncd, Default::default())); .insert(Client::new(msg.ctrl, msg.ncd, Default::default()));
} }
// Get serverbound packets first so they are not dealt with a tick late. // Get serverbound packets first so they are not dealt with a tick late.
server.clients.par_iter_mut().for_each(|(_, client)| { server.clients.par_iter_mut().for_each(|(_, client)| {
client.handle_serverbound_packets(&server.entities); client.handle_serverbound_packets(&server.entities);
}); });
shared.config().update(server); info_span!("configured_update").in_scope(|| shared.config().update(server));
server.worlds.par_iter_mut().for_each(|(id, world)| { server.worlds.par_iter_mut().for_each(|(id, world)| {
world.spatial_index.update(&server.entities, id); world.spatial_index.update(&server.entities, id);
@ -433,6 +446,7 @@ fn do_update_loop(server: &mut Server<impl Config>) -> ShutdownResult {
}); });
server.player_lists.update(); server.player_lists.update();
server.inventories.update(); server.inventories.update();
// Sleep for the remainder of the tick. // Sleep for the remainder of the tick.
@ -440,13 +454,12 @@ fn do_update_loop(server: &mut Server<impl Config>) -> ShutdownResult {
thread::sleep(tick_duration.saturating_sub(tick_start.elapsed())); thread::sleep(tick_duration.saturating_sub(tick_start.elapsed()));
tick_start = Instant::now(); tick_start = Instant::now();
shared.0.tick_counter.fetch_add(1, Ordering::SeqCst); current_tick = shared.0.tick_counter.fetch_add(1, Ordering::SeqCst) + 1;
} }
} }
#[instrument(skip_all)]
async fn do_accept_loop(server: SharedServer<impl Config>) { async fn do_accept_loop(server: SharedServer<impl Config>) {
log::trace!("entering accept loop");
let listener = match TcpListener::bind(server.0.address).await { let listener = match TcpListener::bind(server.0.address).await {
Ok(listener) => listener, Ok(listener) => listener,
Err(e) => { Err(e) => {
@ -461,23 +474,12 @@ async fn do_accept_loop(server: SharedServer<impl Config>) {
Ok((stream, remote_addr)) => { Ok((stream, remote_addr)) => {
let server = server.clone(); let server = server.clone();
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = stream.set_nodelay(true) { handle_connection(server, stream, remote_addr).await;
log::error!("failed to set TCP_NODELAY: {e}");
}
if let Err(e) = handle_connection(server, stream, remote_addr).await {
if let Some(e) = e.downcast_ref::<io::Error>() {
if e.kind() == io::ErrorKind::UnexpectedEof {
return;
}
}
log::error!("connection to {remote_addr} ended: {e:#}");
}
drop(permit); drop(permit);
}); });
} }
Err(e) => { Err(e) => {
log::error!("failed to accept incoming connection: {e}"); error!("failed to accept incoming connection: {e}");
} }
}, },
// Closed semaphore indicates server shutdown. // Closed semaphore indicates server shutdown.
@ -486,14 +488,21 @@ async fn do_accept_loop(server: SharedServer<impl Config>) {
} }
} }
#[instrument(skip(server, stream))]
async fn handle_connection( async fn handle_connection(
server: SharedServer<impl Config>, server: SharedServer<impl Config>,
stream: TcpStream, stream: TcpStream,
remote_addr: SocketAddr, remote_addr: SocketAddr,
) -> anyhow::Result<()> { ) {
trace!("handling connection");
if let Err(e) = stream.set_nodelay(true) {
error!("failed to set TCP_NODELAY: {e}");
}
let (read, write) = stream.into_split(); let (read, write) = stream.into_split();
let mut ctrl = InitialPacketController::new( let ctrl = InitialPacketController::new(
read, read,
write, write,
PacketEncoder::new(), PacketEncoder::new(),
@ -503,6 +512,23 @@ async fn handle_connection(
// TODO: peek stream for 0xFE legacy ping // TODO: peek stream for 0xFE legacy ping
if let Err(e) = handle_handshake(server, ctrl, remote_addr).await {
// EOF can happen if the client disconnects while joining, which isn't
// very erroneous.
if let Some(e) = e.downcast_ref::<io::Error>() {
if e.kind() == io::ErrorKind::UnexpectedEof {
return;
}
}
warn!("connection ended with error: {e:#}");
}
}
async fn handle_handshake(
server: SharedServer<impl Config>,
mut ctrl: InitialPacketController<OwnedReadHalf, OwnedWriteHalf>,
remote_addr: SocketAddr,
) -> anyhow::Result<()> {
let handshake = ctrl.recv_packet::<HandshakeOwned>().await?; let handshake = ctrl.recv_packet::<HandshakeOwned>().await?;
ensure!( ensure!(
@ -514,10 +540,10 @@ async fn handle_connection(
match handshake.next_state { match handshake.next_state {
HandshakeNextState::Status => handle_status(server, ctrl, remote_addr, handshake) HandshakeNextState::Status => handle_status(server, ctrl, remote_addr, handshake)
.await .await
.context("error during status"), .context("error handling status"),
HandshakeNextState::Login => match handle_login(&server, &mut ctrl, remote_addr, handshake) HandshakeNextState::Login => match handle_login(&server, &mut ctrl, remote_addr, handshake)
.await .await
.context("error during login")? .context("error handling login")?
{ {
Some(ncd) => { Some(ncd) => {
let msg = NewClientMessage { let msg = NewClientMessage {
@ -631,7 +657,7 @@ async fn handle_login(
} }
if let Err(reason) = server.0.cfg.login(server, &ncd).await { if let Err(reason) = server.0.cfg.login(server, &ncd).await {
log::info!("Disconnect at login: \"{reason}\""); info!("disconnect at login: \"{reason}\"");
ctrl.send_packet(&DisconnectLogin { reason }).await?; ctrl.send_packet(&DisconnectLogin { reason }).await?;
return Ok(None); return Ok(None);
} }

View file

@ -7,6 +7,7 @@ use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::runtime::Handle; use tokio::runtime::Handle;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tokio::time::timeout; use tokio::time::timeout;
use tracing::debug;
use valence_protocol::{Decode, Encode, Packet, PacketDecoder, PacketEncoder}; use valence_protocol::{Decode, Encode, Packet, PacketDecoder, PacketEncoder};
use crate::server::byte_channel::{byte_channel, ByteReceiver, ByteSender, TryRecvError}; use crate::server::byte_channel::{byte_channel, ByteReceiver, ByteSender, TryRecvError};
@ -129,7 +130,7 @@ where
match self.reader.read_buf(&mut buf).await { match self.reader.read_buf(&mut buf).await {
Ok(0) => break, Ok(0) => break,
Err(e) => { Err(e) => {
log::warn!("error reading packet data: {e}"); debug!("error reading packet data: {e}");
break; break;
} }
_ => {} _ => {}
@ -137,7 +138,7 @@ where
// This should always be an O(1) unsplit because we reserved space earlier. // This should always be an O(1) unsplit because we reserved space earlier.
if let Err(e) = incoming_sender.send_async(buf).await { if let Err(e) = incoming_sender.send_async(buf).await {
log::warn!("error sending packet data: {e}"); debug!("error sending packet data: {e}");
break; break;
} }
} }
@ -150,13 +151,13 @@ where
let bytes = match outgoing_receiver.recv_async().await { let bytes = match outgoing_receiver.recv_async().await {
Ok(bytes) => bytes, Ok(bytes) => bytes,
Err(e) => { Err(e) => {
log::warn!("error receiving packet data: {e}"); debug!("error receiving packet data: {e}");
break; break;
} }
}; };
if let Err(e) = self.writer.write_all(&bytes).await { if let Err(e) = self.writer.write_all(&bytes).await {
log::warn!("error writing packet data: {e}"); debug!("error writing packet data: {e}");
} }
} }
}); });

View file

@ -2,6 +2,7 @@ use std::iter::FusedIterator;
use std::num::NonZeroU32; use std::num::NonZeroU32;
use rayon::iter::{IntoParallelRefIterator, IntoParallelRefMutIterator, ParallelIterator}; use rayon::iter::{IntoParallelRefIterator, IntoParallelRefMutIterator, ParallelIterator};
use tracing::warn;
use crate::slab::Slab; use crate::slab::Slab;
@ -87,7 +88,7 @@ impl<T> VersionedSlab<T> {
pub fn insert_with(&mut self, f: impl FnOnce(Key) -> T) -> (Key, &mut T) { pub fn insert_with(&mut self, f: impl FnOnce(Key) -> T) -> (Key, &mut T) {
let version = self.version; let version = self.version;
self.version = NonZeroU32::new(version.get().wrapping_add(1)).unwrap_or_else(|| { self.version = NonZeroU32::new(version.get().wrapping_add(1)).unwrap_or_else(|| {
log::warn!("slab version overflow"); warn!("slab version overflow");
ONE ONE
}); });

View file

@ -114,16 +114,15 @@ mod raw_bytes;
pub mod text; pub mod text;
pub mod types; pub mod types;
pub mod username; pub mod username;
pub mod var_int; mod var_int;
pub mod var_long; mod var_long;
/// Used only by proc macros. Not public API. /// Used only by proc macros. Not public API.
#[doc(hidden)] #[doc(hidden)]
pub mod __private { pub mod __private {
pub use anyhow::{anyhow, bail, ensure, Context, Result}; pub use anyhow::{anyhow, bail, ensure, Context, Result};
pub use crate::var_int::VarInt; pub use crate::{Decode, DerivedPacketDecode, DerivedPacketEncode, Encode, VarInt};
pub use crate::{Decode, DerivedPacketDecode, DerivedPacketEncode, Encode};
} }
/// The maximum number of bytes in a single Minecraft packet. /// The maximum number of bytes in a single Minecraft packet.