Redesign packet processing and improve Client update procedure. (#146)

Closes #82
Closes #43
Closes #64

# Changes and Improvements
- Packet encoding/decoding happens within `Client` instead of being sent
over a channel first. This is better for performance and lays the
groundwork for #83.
- Reduce the amount of copying necessary by leveraging the `bytes` crate
and recent changes to `EncodePacket`. Performance is noticeably improved
with maximum players in the `rust-mc-bot` test going from 750 to 1050.
- Packet encoding/decoding code is decoupled from IO. This is easier to
understand and more suitable for a future protocol lib.
- Precise control over the number of bytes that are buffered for
sending/receiving. This is important for limiting maximum memory usage
correctly.
- "packet controllers" are introduced, which are convenient structures
for managing packet IO before and during the play state.
- `byte_channel` module is created to help implement the
`PlayPacketController`. This is essentially a channel of bytes
implemented with an `Arc<Mutex<BytesMut>>`.
- Error handling in the update procedure for clients was improved using
`anyhow::Result<()>` to exit as early as possible. The `client` module
is a bit cleaner as a result.
- The `LoginPlay` packet is always sent before all other play packets.
We no longer have to worry about the behavior of packets sent before
that packet. Most packet deferring performed currently can be
eliminated.
- The packet_inspector was rewritten in response to the above changes.
- Timeouts on IO operations behave better.

# Known Issues
- The packet_inspector now re-encodes packets rather than just decoding
them. This will cause problems when trying to use it with the vanilla
server because there are missing clientbound packets and other issues.
This will be fixed when the protocol module is moved to a separate
crate.
This commit is contained in:
Ryan Johnson 2022-11-01 03:11:51 -07:00 committed by GitHub
parent a20ed2ac21
commit f4714cf255
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
31 changed files with 1488 additions and 1059 deletions

View file

@ -21,7 +21,7 @@ base64 = "0.13.0"
bitfield-struct = "0.1.7"
bitvec = "1.0.1"
byteorder = "1.4.3"
cesu8 = "1.1.0"
bytes = "1.2.1"
cfb8 = "0.7.1"
flate2 = "1.0.24"
flume = "0.10.14"

View file

@ -45,11 +45,6 @@ impl Config for Game {
type ChunkState = ();
type PlayerListState = ();
fn max_connections(&self) -> usize {
// We want status pings to be successful even if the server is full.
MAX_PLAYERS + 64
}
fn dimensions(&self) -> Vec<Dimension> {
vec![Dimension {
fixed_time: Some(6000),

View file

@ -47,11 +47,6 @@ impl Config for Game {
type ChunkState = ();
type PlayerListState = ();
fn max_connections(&self) -> usize {
// We want status pings to be successful even if the server is full.
MAX_PLAYERS + 64
}
fn dimensions(&self) -> Vec<Dimension> {
vec![Dimension {
fixed_time: Some(6000),

View file

@ -55,11 +55,6 @@ impl Config for Game {
type ChunkState = ();
type PlayerListState = ();
fn max_connections(&self) -> usize {
// We want status pings to be successful even if the server is full.
MAX_PLAYERS + 64
}
fn dimensions(&self) -> Vec<Dimension> {
vec![Dimension {
fixed_time: Some(6000),

View file

@ -53,11 +53,6 @@ impl Config for Game {
type ChunkState = ();
type PlayerListState = ();
fn max_connections(&self) -> usize {
// We want status pings to be successful even if the server is full.
MAX_PLAYERS + 64
}
async fn server_list_ping(
&self,
_server: &SharedServer<Self>,

View file

@ -61,11 +61,6 @@ impl Config for Game {
type ChunkState = ();
type PlayerListState = ();
fn max_connections(&self) -> usize {
// We want status pings to be successful even if the server is full.
MAX_PLAYERS + 64
}
fn address(&self) -> SocketAddr {
SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 25565).into() // TODO remove
}

View file

@ -46,11 +46,6 @@ impl Config for Game {
type ChunkState = ();
type PlayerListState = ();
fn max_connections(&self) -> usize {
// We want status pings to be successful even if the server is full.
MAX_PLAYERS + 64
}
async fn server_list_ping(
&self,
_server: &SharedServer<Self>,

View file

@ -79,11 +79,6 @@ impl Config for Game {
type ChunkState = ();
type PlayerListState = ();
fn max_connections(&self) -> usize {
// We want status pings to be successful even if the server is full.
MAX_PLAYERS + 64
}
fn dimensions(&self) -> Vec<Dimension> {
vec![
Dimension {

View file

@ -47,11 +47,6 @@ impl Config for Game {
type ChunkState = ();
type PlayerListState = ();
fn max_connections(&self) -> usize {
// We want status pings to be successful even if the server is full.
MAX_PLAYERS + 64
}
async fn server_list_ping(
&self,
_server: &SharedServer<Self>,

View file

@ -55,11 +55,6 @@ impl Config for Game {
type ChunkState = ();
type PlayerListState = ();
fn max_connections(&self) -> usize {
// We want status pings to be successful even if the server is full.
MAX_PLAYERS + 64
}
fn dimensions(&self) -> Vec<Dimension> {
vec![Dimension {
fixed_time: Some(6000),

View file

@ -67,11 +67,6 @@ impl Config for Game {
type ChunkState = bool;
type PlayerListState = ();
fn max_connections(&self) -> usize {
// We want status pings to be successful even if the server is full.
MAX_PLAYERS + 64
}
async fn server_list_ping(
&self,
_server: &SharedServer<Self>,

View file

@ -49,11 +49,6 @@ impl Config for Game {
type ChunkState = ();
type PlayerListState = ();
fn max_connections(&self) -> usize {
// We want status pings to be successful even if the server is full.
MAX_PLAYERS + 64
}
async fn server_list_ping(
&self,
_server: &SharedServer<Self>,

View file

@ -55,11 +55,6 @@ impl Config for Game {
type ChunkState = bool;
type PlayerListState = ();
fn max_connections(&self) -> usize {
// We want status pings to be successful even if the server is full.
MAX_PLAYERS + 64
}
async fn server_list_ping(
&self,
_server: &SharedServer<Self>,

View file

@ -13,7 +13,9 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::Semaphore;
use valence::protocol::codec::Decoder;
use tokio::task::JoinHandle;
use tokio::time::timeout;
use valence::protocol::codec::{PacketDecoder, PacketEncoder};
use valence::protocol::packets::c2s::handshake::{Handshake, HandshakeNextState};
use valence::protocol::packets::c2s::login::{EncryptionResponse, LoginStart};
use valence::protocol::packets::c2s::play::C2sPlayPacket;
@ -22,7 +24,6 @@ use valence::protocol::packets::s2c::login::{LoginSuccess, S2cLoginPacket};
use valence::protocol::packets::s2c::play::S2cPlayPacket;
use valence::protocol::packets::s2c::status::{PingResponse, StatusResponse};
use valence::protocol::packets::{DecodePacket, EncodePacket, PacketName};
use valence::protocol::{Encode, VarInt};
#[derive(Parser, Clone, Debug)]
#[clap(author, version, about)]
@ -47,51 +48,70 @@ struct Cli {
timestamp: bool,
}
impl Cli {
fn print(&self, p: &(impl fmt::Debug + PacketName)) {
if let Some(r) = &self.regex {
if !r.is_match(p.packet_name()) {
struct State {
cli: Arc<Cli>,
enc: PacketEncoder,
dec: PacketDecoder,
read: OwnedReadHalf,
write: OwnedWriteHalf,
}
const TIMEOUT: Duration = Duration::from_secs(10);
impl State {
pub async fn rw_packet<P>(&mut self) -> anyhow::Result<P>
where
P: DecodePacket + EncodePacket,
{
timeout(TIMEOUT, async {
loop {
if let Some(pkt) = self.dec.try_next_packet()? {
self.enc.append_packet(&pkt)?;
let bytes = self.enc.take();
self.write.write_all(&bytes).await?;
self.print(&pkt);
return Ok(pkt);
}
self.dec.reserve(4096);
let mut buf = self.dec.take_capacity();
if self.read.read_buf(&mut buf).await? == 0 {
return Err(io::Error::from(ErrorKind::UnexpectedEof).into());
}
self.dec.queue_bytes(buf);
}
})
.await?
}
fn print<P>(&self, pkt: &P)
where
P: fmt::Debug + PacketName + ?Sized,
{
if let Some(r) = &self.cli.regex {
if !r.is_match(pkt.packet_name()) {
return;
}
}
if self.timestamp {
if self.cli.timestamp {
let now: DateTime<Utc> = Utc::now();
println!("{now} {p:#?}");
println!("{now} {pkt:#?}");
} else {
println!("{p:#?}");
println!("{pkt:#?}");
}
}
async fn rw_packet<P: DecodePacket + EncodePacket>(
&self,
read: &mut Decoder<OwnedReadHalf>,
write: &mut OwnedWriteHalf,
) -> anyhow::Result<P> {
let pkt = read.read_packet().await;
if let Ok(pkt) = &pkt {
self.print(pkt);
}
let mut len_buf = [0u8; VarInt::MAX_SIZE];
let len = VarInt(read.packet_buf().len() as i32);
len.encode(&mut len_buf.as_mut_slice())?;
write.write_all(&len_buf[..len.encoded_len()]).await?;
write.write_all(read.packet_buf()).await?;
pkt
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let cli = Cli::parse();
let cli = Arc::new(Cli::parse());
let sema = Arc::new(Semaphore::new(
cli.max_connections.unwrap_or(usize::MAX).min(100_000),
));
let sema = Arc::new(Semaphore::new(cli.max_connections.unwrap_or(100_000)));
eprintln!("Waiting for connections on {}", cli.client);
let listen = TcpListener::bind(cli.client).await?;
@ -100,6 +120,10 @@ async fn main() -> Result<(), Box<dyn Error>> {
let (client, remote_client_addr) = listen.accept().await?;
eprintln!("Accepted connection to {remote_client_addr}");
if let Err(e) = client.set_nodelay(true) {
eprintln!("Failed to set TCP_NODELAY: {e}");
}
let cli = cli.clone();
tokio::spawn(async move {
if let Err(e) = handle_connection(client, cli).await {
@ -114,45 +138,51 @@ async fn main() -> Result<(), Box<dyn Error>> {
Ok(())
}
async fn handle_connection(client: TcpStream, cli: Cli) -> anyhow::Result<()> {
async fn handle_connection(client: TcpStream, cli: Arc<Cli>) -> anyhow::Result<()> {
eprintln!("Connecting to {}", cli.server);
let server = TcpStream::connect(cli.server).await?;
let (client_read, mut client_write) = client.into_split();
let (server_read, mut server_write) = server.into_split();
if let Err(e) = server.set_nodelay(true) {
eprintln!("Failed to set TCP_NODELAY: {e}");
}
let timeout = Duration::from_secs(10);
let (client_read, client_write) = client.into_split();
let (server_read, server_write) = server.into_split();
let mut client_read = Decoder::new(client_read, timeout);
let mut s2c = State {
cli: cli.clone(),
enc: PacketEncoder::new(),
dec: PacketDecoder::new(),
read: server_read,
write: client_write,
};
let mut server_read = Decoder::new(server_read, timeout);
let mut c2s = State {
cli,
enc: PacketEncoder::new(),
dec: PacketDecoder::new(),
read: client_read,
write: server_write,
};
let handshake: Handshake = cli.rw_packet(&mut client_read, &mut server_write).await?;
let handshake: Handshake = c2s.rw_packet().await?;
match handshake.next_state {
HandshakeNextState::Status => {
cli.rw_packet::<StatusRequest>(&mut client_read, &mut server_write)
.await?;
cli.rw_packet::<StatusResponse>(&mut server_read, &mut client_write)
.await?;
c2s.rw_packet::<StatusRequest>().await?;
s2c.rw_packet::<StatusResponse>().await?;
c2s.rw_packet::<PingRequest>().await?;
s2c.rw_packet::<PingResponse>().await?;
cli.rw_packet::<PingRequest>(&mut client_read, &mut server_write)
.await?;
cli.rw_packet::<PingResponse>(&mut server_read, &mut client_write)
.await?;
Ok(())
}
HandshakeNextState::Login => {
cli.rw_packet::<LoginStart>(&mut client_read, &mut server_write)
.await?;
c2s.rw_packet::<LoginStart>().await?;
match cli
.rw_packet::<S2cLoginPacket>(&mut server_read, &mut client_write)
.await?
{
match s2c.rw_packet::<S2cLoginPacket>().await? {
S2cLoginPacket::EncryptionRequest(_) => {
cli.rw_packet::<EncryptionResponse>(&mut client_read, &mut server_write)
.await?;
c2s.rw_packet::<EncryptionResponse>().await?;
eprintln!(
"Encryption was enabled! Packet contents are inaccessible to the proxy. \
@ -160,17 +190,19 @@ async fn handle_connection(client: TcpStream, cli: Cli) -> anyhow::Result<()> {
);
return tokio::select! {
c2s = passthrough(client_read.into_inner(), server_write) => c2s,
s2c = passthrough(server_read.into_inner(), client_write) => s2c,
c2s_res = passthrough(c2s.read, c2s.write) => c2s_res,
s2c_res = passthrough(s2c.read, s2c.write) => s2c_res,
};
}
S2cLoginPacket::SetCompression(pkt) => {
let threshold = pkt.threshold.0 as u32;
client_read.enable_compression(threshold);
server_read.enable_compression(threshold);
cli.rw_packet::<LoginSuccess>(&mut server_read, &mut client_write)
.await?;
s2c.enc.set_compression(Some(threshold));
s2c.dec.set_compression(true);
c2s.enc.set_compression(Some(threshold));
c2s.dec.set_compression(true);
s2c.rw_packet::<LoginSuccess>().await?;
}
S2cLoginPacket::LoginSuccess(_) => {}
S2cLoginPacket::DisconnectLogin(_) => return Ok(()),
@ -179,50 +211,28 @@ async fn handle_connection(client: TcpStream, cli: Cli) -> anyhow::Result<()> {
}
}
let c2s = async {
let c2s_fut: JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
loop {
if let Err(e) = cli
.rw_packet::<C2sPlayPacket>(&mut client_read, &mut server_write)
.await
{
if let Some(e) = e.downcast_ref::<io::Error>() {
if e.kind() == ErrorKind::UnexpectedEof {
return Ok(());
c2s.rw_packet::<C2sPlayPacket>().await?;
}
}
eprintln!("Error while decoding serverbound packet: {e:#}");
}
}
};
});
let s2c = async {
let s2c_fut = async move {
loop {
if let Err(e) = cli
.rw_packet::<S2cPlayPacket>(&mut server_read, &mut client_write)
.await
{
if let Some(e) = e.downcast_ref::<io::Error>() {
if e.kind() == ErrorKind::UnexpectedEof {
return Ok(());
}
}
eprintln!("Error while decoding clientbound packet: {e:#}");
}
s2c.rw_packet::<S2cPlayPacket>().await?;
}
};
return tokio::select! {
c2s = c2s => c2s,
s2c = s2c => s2c,
};
tokio::select! {
c2s = c2s_fut => Ok(c2s??),
s2c = s2c_fut => s2c,
}
}
}
Ok(())
}
async fn passthrough(mut read: OwnedReadHalf, mut write: OwnedWriteHalf) -> anyhow::Result<()> {
let mut buf = vec![0u8; 4096].into_boxed_slice();
let mut buf = vec![0u8; 8192].into_boxed_slice();
loop {
let bytes_read = read.read(&mut buf).await?;
let bytes = &mut buf[..bytes_read];

View file

@ -10,25 +10,23 @@ In a separate terminal, start [rust-mc-bot](https://github.com/Eoghanmc22/rust-m
This command should connect 1000 clients to the server.
```shell
# In the rust-mc-bot directory
cargo r -r -- 127.0.0.1:25565 1000
# If rust-mc-bot was cloned in the performance_tests directory, do
cargo r -r -p rust-mc-bot -- 127.0.0.1:25565 1000
```
If the delta time is consistently >50ms, the server is running behind schedule.
Note:
# Flamegraph
To start capturing a [flamegraph](https://github.com/flamegraph-rs/flamegraph),
run the server like this:
```shell
CARGO_PROFILE_RELEASE_DEBUG=true cargo flamegraph -p players
# You can also try setting the `CARGO_PROFILE_RELEASE_DEBUG` environment variable to `true`.
cargo flamegraph -p players
```
Run rust-mc-bot as above, and then stop the server after a few seconds. Flamegraph will take its own sweet time to
generate a flamegraph.svg in the current directory. You can then open that file in your internet browser of choice.
NOTE: The indiscriminate use of `rayon` in Valence appears to have made the flamegraph basically unreadable. This
situation should change soon.
Run rust-mc-bot as above, and then stop the server after a few seconds. Flamegraph will generate a flamegraph.svg in the
current directory. You can then open that file in your internet browser of choice.

View file

@ -48,7 +48,7 @@ impl Config for Game {
ConnectionMode::Offline
}
fn outgoing_packet_capacity(&self) -> usize {
fn outgoing_capacity(&self) -> usize {
usize::MAX
}

View file

@ -22,9 +22,10 @@ use crate::block_pos::BlockPos;
pub use crate::chunk_pos::ChunkPos;
use crate::config::Config;
use crate::protocol::packets::s2c::play::{
BlockUpdate, ChunkDataAndUpdateLight, S2cPlayPacket, UpdateSectionBlocks,
BlockUpdate, ChunkDataAndUpdateLight, UpdateSectionBlocks,
};
use crate::protocol::{Encode, VarInt, VarLong};
use crate::server::PlayPacketController;
use crate::util::bits_needed;
mod paletted_container;
@ -598,8 +599,8 @@ impl<C: Config> LoadedChunk<C> {
&self,
pos: ChunkPos,
min_y: i32,
mut push_packet: impl FnMut(S2cPlayPacket),
) {
ctrl: &mut PlayPacketController,
) -> anyhow::Result<()> {
for (sect_y, sect) in self.sections.iter().enumerate() {
if sect.modified_blocks_count == 1 {
let (i, bits) = sect
@ -619,13 +620,10 @@ impl<C: Config> LoadedChunk<C> {
let global_y = sect_y as i32 * 16 + (idx / (16 * 16)) as i32 + min_y;
let global_z = pos.z * 16 + (idx / 16 % 16) as i32;
push_packet(
BlockUpdate {
ctrl.append_packet(&BlockUpdate {
location: BlockPos::new(global_x, global_y, global_z),
block_id: VarInt(block.to_raw() as _),
}
.into(),
);
})?;
} else if sect.modified_blocks_count > 1 {
let mut blocks = Vec::with_capacity(sect.modified_blocks_count.into());
@ -648,16 +646,15 @@ impl<C: Config> LoadedChunk<C> {
| (pos.z as i64 & 0x3fffff) << 20
| (sect_y as i64 + min_y.div_euclid(16) as i64) & 0xfffff;
push_packet(
UpdateSectionBlocks {
ctrl.append_packet(&UpdateSectionBlocks {
chunk_section_position,
invert_trust_edges: false,
blocks,
}
.into(),
);
})?;
}
}
Ok(())
}
fn update(&mut self) {

File diff suppressed because it is too large Load diff

View file

@ -10,6 +10,7 @@ use uuid::Uuid;
use crate::biome::Biome;
use crate::dimension::Dimension;
use crate::protocol::MAX_PACKET_SIZE;
use crate::server::{NewClientData, Server, SharedServer};
use crate::text::Text;
use crate::username::Username;
@ -47,7 +48,13 @@ pub trait Config: Sized + Send + Sync + 'static {
/// You will want this value to be somewhere above the maximum number of
/// players, since status pings should still succeed even when the server is
/// full.
fn max_connections(&self) -> usize;
///
/// # Default Implementation
///
/// Currently returns `1024`. This may change in a future version.
fn max_connections(&self) -> usize {
1024
}
/// Called once at startup to get the socket address the server will
/// be bound to.
@ -103,32 +110,34 @@ pub trait Config: Sized + Send + Sync + 'static {
false
}
/// Called once at startup to get the capacity of the buffer used to
/// hold incoming packets.
/// Called once at startup to get the maximum capacity (in bytes) of the
/// buffer used to hold incoming packet data.
///
/// A larger capacity reduces the chance that a client needs to be
/// disconnected due to a full buffer, but increases potential memory usage.
/// disconnected due to the buffer being full, but increases potential
/// memory usage.
///
/// # Default Implementation
///
/// An unspecified value is returned that should be adequate in most
/// situations.
fn incoming_packet_capacity(&self) -> usize {
64
fn incoming_capacity(&self) -> usize {
MAX_PACKET_SIZE as usize
}
/// Called once at startup to get the capacity of the buffer used to
/// hold outgoing packets.
/// Called once at startup to get the maximum capacity (in bytes) of the
/// buffer used to hold outgoing packets.
///
/// A larger capacity reduces the chance that a client needs to be
/// disconnected due to a full buffer, but increases potential memory usage.
/// disconnected due to the buffer being full, but increases potential
/// memory usage.
///
/// # Default Implementation
///
/// An unspecified value is returned that should be adequate in most
/// situations.
fn outgoing_packet_capacity(&self) -> usize {
2048
fn outgoing_capacity(&self) -> usize {
MAX_PACKET_SIZE as usize * 4
}
/// Called once at startup to get a handle to the tokio runtime the server
@ -387,9 +396,5 @@ where
type ChunkState = Ch;
type PlayerListState = P;
fn max_connections(&self) -> usize {
64
}
fn update(&self, _server: &mut Server<Self>) {}
}

View file

@ -14,9 +14,10 @@ use vek::{Aabb, Vec3};
use crate::config::Config;
use crate::entity::types::{Facing, PaintingKind, Pose};
use crate::protocol::packets::s2c::play::{
S2cPlayPacket, SetEntityMetadata, SetHeadRotation, SpawnEntity, SpawnExperienceOrb, SpawnPlayer,
SetEntityMetadata, SetHeadRotation, SpawnEntity, SpawnExperienceOrb, SpawnPlayer,
};
use crate::protocol::{ByteAngle, RawBytes, VarInt};
use crate::server::PlayPacketController;
use crate::slab_versioned::{Key, VersionedSlab};
use crate::util::aabb_from_bottom_and_size;
use crate::world::WorldId;
@ -741,10 +742,9 @@ impl<C: Config> Entity<C> {
pub(crate) fn spawn_packets(
&self,
this_id: EntityId,
mut push_packet: impl FnMut(S2cPlayPacket),
) {
let with_object_data = |data| {
S2cPlayPacket::from(SpawnEntity {
ctrl: &mut PlayPacketController,
) -> anyhow::Result<()> {
let with_object_data = |data| SpawnEntity {
entity_id: VarInt(this_id.to_network_id()),
object_uuid: self.uuid,
kind: VarInt(self.kind() as i32),
@ -754,58 +754,55 @@ impl<C: Config> Entity<C> {
head_yaw: ByteAngle::from_degrees(self.head_yaw),
data: VarInt(data),
velocity: velocity_to_packet_units(self.velocity),
})
};
match &self.variants {
TrackedData::Marker(_) => {}
TrackedData::ExperienceOrb(_) => push_packet(
SpawnExperienceOrb {
TrackedData::ExperienceOrb(_) => ctrl.append_packet(&SpawnExperienceOrb {
entity_id: VarInt(this_id.to_network_id()),
position: self.new_position,
count: 0, // TODO
}
.into(),
),
})?,
TrackedData::Player(_) => {
push_packet(
SpawnPlayer {
ctrl.append_packet(&SpawnPlayer {
entity_id: VarInt(this_id.to_network_id()),
player_uuid: self.uuid,
position: self.new_position,
yaw: ByteAngle::from_degrees(self.yaw),
pitch: ByteAngle::from_degrees(self.pitch),
}
.into(),
);
})?;
// Player spawn packet doesn't include head yaw for some reason.
push_packet(
SetHeadRotation {
ctrl.append_packet(&SetHeadRotation {
entity_id: VarInt(this_id.to_network_id()),
head_yaw: ByteAngle::from_degrees(self.head_yaw),
})?;
}
.into(),
);
TrackedData::ItemFrame(e) => ctrl.append_packet(&with_object_data(e.get_rotation()))?,
TrackedData::GlowItemFrame(e) => {
ctrl.append_packet(&with_object_data(e.get_rotation()))?
}
TrackedData::ItemFrame(e) => push_packet(with_object_data(e.get_rotation())),
TrackedData::GlowItemFrame(e) => push_packet(with_object_data(e.get_rotation())),
TrackedData::Painting(_) => push_packet(with_object_data(
TrackedData::Painting(_) => ctrl.append_packet(&with_object_data(
match ((self.yaw + 45.0).rem_euclid(360.0) / 90.0) as u8 {
0 => 3,
1 => 4,
2 => 2,
_ => 5,
},
)),
))?,
// TODO: set block state ID for falling block.
TrackedData::FallingBlock(_) => push_packet(with_object_data(1)),
TrackedData::FishingBobber(e) => push_packet(with_object_data(e.get_hook_entity_id())),
TrackedData::FallingBlock(_) => ctrl.append_packet(&with_object_data(1))?,
TrackedData::FishingBobber(e) => {
ctrl.append_packet(&with_object_data(e.get_hook_entity_id()))?
}
TrackedData::Warden(e) => {
push_packet(with_object_data((e.get_pose() == Pose::Emerging).into()))
ctrl.append_packet(&with_object_data((e.get_pose() == Pose::Emerging).into()))?
}
_ => push_packet(with_object_data(0)),
_ => ctrl.append_packet(&with_object_data(0))?,
}
Ok(())
}
}

View file

@ -10,10 +10,11 @@ use crate::client::GameMode;
use crate::config::Config;
use crate::player_textures::SignedPlayerTextures;
use crate::protocol::packets::s2c::play::{
PlayerInfo, PlayerListAddPlayer, S2cPlayPacket, SetTabListHeaderAndFooter,
PlayerInfo, PlayerListAddPlayer, SetTabListHeaderAndFooter,
};
use crate::protocol::packets::Property;
use crate::protocol::VarInt;
use crate::server::PlayPacketController;
use crate::slab_rc::{Key, SlabRc};
use crate::text::Text;
@ -246,7 +247,7 @@ impl<C: Config> PlayerList<C> {
self.entries.iter_mut().map(|(k, v)| (*k, v))
}
pub(crate) fn initial_packets(&self, mut push_packet: impl FnMut(S2cPlayPacket)) {
pub(crate) fn initial_packets(&self, ctrl: &mut PlayPacketController) -> anyhow::Result<()> {
let add_player: Vec<_> = self
.entries
.iter()
@ -272,23 +273,24 @@ impl<C: Config> PlayerList<C> {
.collect();
if !add_player.is_empty() {
push_packet(PlayerInfo::AddPlayer(add_player).into());
ctrl.append_packet(&PlayerInfo::AddPlayer(add_player))?;
}
if self.header != Text::default() || self.footer != Text::default() {
push_packet(
SetTabListHeaderAndFooter {
ctrl.append_packet(&SetTabListHeaderAndFooter {
header: self.header.clone(),
footer: self.footer.clone(),
}
.into(),
);
}
})?;
}
pub(crate) fn update_packets(&self, mut push_packet: impl FnMut(S2cPlayPacket)) {
Ok(())
}
pub(crate) fn update_packets(&self, ctrl: &mut PlayPacketController) -> anyhow::Result<()> {
if !self.removed.is_empty() {
push_packet(PlayerInfo::RemovePlayer(self.removed.iter().cloned().collect()).into());
ctrl.append_packet(&PlayerInfo::RemovePlayer(
self.removed.iter().cloned().collect(),
))?;
}
let mut add_player = Vec::new();
@ -334,34 +336,35 @@ impl<C: Config> PlayerList<C> {
}
if !add_player.is_empty() {
push_packet(PlayerInfo::AddPlayer(add_player).into());
ctrl.append_packet(&PlayerInfo::AddPlayer(add_player))?;
}
if !game_mode.is_empty() {
push_packet(PlayerInfo::UpdateGameMode(game_mode).into());
ctrl.append_packet(&PlayerInfo::UpdateGameMode(game_mode))?;
}
if !ping.is_empty() {
push_packet(PlayerInfo::UpdateLatency(ping).into());
ctrl.append_packet(&PlayerInfo::UpdateLatency(ping))?;
}
if !display_name.is_empty() {
push_packet(PlayerInfo::UpdateDisplayName(display_name).into());
ctrl.append_packet(&PlayerInfo::UpdateDisplayName(display_name))?;
}
if self.modified_header_or_footer {
push_packet(
SetTabListHeaderAndFooter {
ctrl.append_packet(&SetTabListHeaderAndFooter {
header: self.header.clone(),
footer: self.footer.clone(),
}
.into(),
);
}
})?;
}
pub(crate) fn clear_packets(&self, mut push_packet: impl FnMut(S2cPlayPacket)) {
push_packet(PlayerInfo::RemovePlayer(self.entries.keys().cloned().collect()).into());
Ok(())
}
pub(crate) fn clear_packets(&self, ctrl: &mut PlayPacketController) -> anyhow::Result<()> {
ctrl.append_packet(&PlayerInfo::RemovePlayer(
self.entries.keys().cloned().collect(),
))
}
}

View file

@ -6,12 +6,12 @@
//! While the protocol module is technically public API, its use is discouraged
//! and has thus been hidden from the documentation. You may find yourself
//! needing to use this module under the following circumstances:
//! - You want to send packets to clients manually using the [`send_packet`]
//! - You want to send packets to clients manually using the [`queue_packet`]
//! function.
//! - You are writing a proxy between the client and server.
//! - You are writing a Minecraft client.
//!
//! [`send_packet`]: crate::client::Client::send_packet
//! [`queue_packet`]: crate::client::Client::queue_packet
use std::borrow::Cow;
use std::io::{Read, Write};

View file

@ -1,341 +1,409 @@
//! Reading and writing packets.
use std::io::Read;
use std::time::Duration;
use aes::cipher::{AsyncStreamCipher, NewCipher};
use aes::Aes128;
use anyhow::{bail, ensure, Context};
use cfb8::cipher::{AsyncStreamCipher, NewCipher};
use bytes::{Buf, BufMut, BytesMut};
use cfb8::Cfb8;
use flate2::bufread::{ZlibDecoder, ZlibEncoder};
use flate2::bufread::ZlibDecoder;
use flate2::write::ZlibEncoder;
use flate2::Compression;
use log::{log_enabled, Level};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader};
use tokio::time::timeout;
use log::log_enabled;
use super::packets::{DecodePacket, EncodePacket};
use crate::protocol::packets::{DecodePacket, EncodePacket};
use crate::protocol::var_int::VarIntDecodeError;
use crate::protocol::{Decode, Encode, VarInt, MAX_PACKET_SIZE};
pub struct Encoder<W> {
write: W,
buf: Vec<u8>,
compress_buf: Vec<u8>,
compression_threshold: Option<u32>,
cipher: Option<Cipher>,
timeout: Duration,
}
impl<W: AsyncWrite + Unpin> Encoder<W> {
pub fn new(write: W, timeout: Duration) -> Self {
Self {
write,
buf: Vec::new(),
compress_buf: Vec::new(),
compression_threshold: None,
cipher: None,
timeout,
}
}
/// Queues a packet to be written to the writer.
///
/// To write all queued packets, call [`Self::flush`].
pub fn queue_packet(&mut self, packet: &(impl EncodePacket + ?Sized)) -> anyhow::Result<()> {
let start_len = self.buf.len();
packet.encode_packet(&mut self.buf)?;
let data_len = self.buf.len() - start_len;
ensure!(data_len <= i32::MAX as usize, "bad packet data length");
if let Some(threshold) = self.compression_threshold {
if data_len >= threshold as usize {
let mut z = ZlibEncoder::new(&self.buf[start_len..], Compression::best());
z.read_to_end(&mut self.compress_buf)?;
let data_len_len = VarInt(data_len as i32).encoded_len();
let packet_len = data_len_len + self.compress_buf.len();
ensure!(packet_len <= MAX_PACKET_SIZE as usize, "bad packet length");
self.buf.truncate(start_len);
VarInt(packet_len as i32).encode(&mut self.buf)?;
VarInt(data_len as i32).encode(&mut self.buf)?;
self.buf.extend_from_slice(&self.compress_buf);
self.compress_buf.clear();
} else {
let packet_len = VarInt(0).encoded_len() + data_len;
ensure!(packet_len <= MAX_PACKET_SIZE as usize, "bad packet length");
self.buf.truncate(start_len);
VarInt(packet_len as i32).encode(&mut self.buf)?;
VarInt(0).encode(&mut self.buf)?; // 0 for no compression.
packet.encode_packet(&mut self.buf)?;
}
} else {
let packet_len = data_len;
ensure!(packet_len <= MAX_PACKET_SIZE as usize, "bad packet length");
self.buf.truncate(start_len);
VarInt(packet_len as i32).encode(&mut self.buf)?;
packet.encode_packet(&mut self.buf)?;
}
Ok(())
}
/// Writes all queued packets to the writer.
pub async fn flush(&mut self) -> anyhow::Result<()> {
if !self.buf.is_empty() {
if let Some(cipher) = &mut self.cipher {
cipher.encrypt(&mut self.buf);
}
timeout(self.timeout, self.write.write_all(&self.buf)).await??;
self.buf.clear();
}
Ok(())
}
/// Queue one packet and then flush the buffer.
pub async fn write_packet(
&mut self,
packet: &(impl EncodePacket + ?Sized),
) -> anyhow::Result<()> {
self.queue_packet(packet)?;
self.flush().await
}
pub fn enable_encryption(&mut self, key: &[u8; 16]) {
self.cipher = Some(NewCipher::new(key.into(), key.into()));
}
pub fn enable_compression(&mut self, threshold: u32) {
self.compression_threshold = Some(threshold);
}
pub fn into_inner(self) -> W {
self.write
}
}
pub struct Decoder<R> {
read: BufReader<R>,
buf: Vec<u8>,
decompress_buf: Vec<u8>,
compression_threshold: Option<u32>,
cipher: Option<Cipher>,
timeout: Duration,
}
impl<R: AsyncRead + Unpin> Decoder<R> {
pub fn new(read: R, timeout: Duration) -> Self {
Self {
read: BufReader::new(read),
buf: Vec::new(),
decompress_buf: Vec::new(),
compression_threshold: None,
cipher: None,
timeout,
}
}
pub async fn read_packet<P: DecodePacket>(&mut self) -> anyhow::Result<P> {
timeout(self.timeout, self.read_packet_impl()).await?
}
async fn read_packet_impl<P: DecodePacket>(&mut self) -> anyhow::Result<P> {
let packet_len = self
.read_var_int_async()
.await
.context("reading packet length")?;
ensure!(
(0..=MAX_PACKET_SIZE).contains(&packet_len),
"invalid packet length of {packet_len}."
);
self.buf.resize(packet_len as usize, 0);
self.read
.read_exact(&mut self.buf)
.await
.context("reading packet body")?;
if let Some(cipher) = &mut self.cipher {
cipher.decrypt(&mut self.buf);
}
let mut packet_contents = self.buf.as_slice();
// Compression enabled?
let packet = if self.compression_threshold.is_some() {
// The length of the packet data once uncompressed (zero indicates no
// compression).
let data_len = VarInt::decode(&mut packet_contents)
.context("reading data length (once decompressed)")?
.0;
ensure!(
(0..=MAX_PACKET_SIZE).contains(&data_len),
"invalid packet data length of {data_len}."
);
if data_len != 0 {
let mut z = ZlibDecoder::new(&mut packet_contents);
self.decompress_buf.resize(data_len as usize, 0);
z.read_exact(&mut self.decompress_buf)
.context("decompressing packet body")?;
let mut decompressed = self.decompress_buf.as_slice();
let packet = P::decode_packet(&mut decompressed)
.context("decoding packet after decompressing")?;
ensure!(
decompressed.is_empty(),
"packet contents were not read completely ({} bytes remaining)",
decompressed.len()
);
packet
} else {
P::decode_packet(&mut packet_contents).context("decoding packet")?
}
} else {
P::decode_packet(&mut packet_contents).context("decoding packet")?
};
if !packet_contents.is_empty() {
if log_enabled!(Level::Debug) {
log::debug!("complete packet after partial decode: {packet:?}");
}
bail!(
"packet contents were not decoded completely ({} bytes remaining)",
packet_contents.len()
);
}
Ok(packet)
}
async fn read_var_int_async(&mut self) -> anyhow::Result<i32> {
let mut val = 0;
for i in 0..VarInt::MAX_SIZE {
let array = &mut [self.read.read_u8().await?];
if let Some(cipher) = &mut self.cipher {
cipher.decrypt(array);
}
let [byte] = *array;
val |= (byte as i32 & 0b01111111) << (i * 7);
if byte & 0b10000000 == 0 {
return Ok(val);
}
}
bail!("var int is too large")
}
pub fn enable_encryption(&mut self, key: &[u8; 16]) {
self.cipher = Some(NewCipher::new(key.into(), key.into()));
}
pub fn enable_compression(&mut self, threshold: u32) {
self.compression_threshold = Some(threshold);
}
pub fn packet_buf(&self) -> &[u8] {
&self.buf
}
pub fn into_inner(self) -> R {
self.read.into_inner()
}
}
/// The AES block cipher with a 128 bit key, using the CFB-8 mode of
/// operation.
type Cipher = Cfb8<Aes128>;
#[cfg(test)]
mod tests {
use std::net::SocketAddr;
use std::time::Duration;
#[derive(Default)]
pub struct PacketEncoder {
buf: BytesMut,
compress_buf: Vec<u8>,
compression_threshold: Option<u32>,
cipher: Option<Cipher>,
}
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::oneshot;
use super::*;
use crate::protocol::packets::test::TestPacket;
#[tokio::test]
async fn encode_decode() {
encode_decode_impl().await
impl PacketEncoder {
pub fn new() -> Self {
Self::default()
}
const CRYPT_KEY: [u8; 16] = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16];
const TIMEOUT: Duration = Duration::from_secs(3);
async fn encode_decode_impl() {
let (tx, rx) = oneshot::channel();
let t = tokio::spawn(listen(tx));
let stream = TcpStream::connect(rx.await.unwrap()).await.unwrap();
let mut encoder = Encoder::new(stream, TIMEOUT);
send_test_packet(&mut encoder).await;
encoder.enable_compression(10);
send_test_packet(&mut encoder).await;
encoder.enable_encryption(&CRYPT_KEY);
send_test_packet(&mut encoder).await;
send_test_packet(&mut encoder).await;
send_test_packet(&mut encoder).await;
t.await.unwrap()
pub fn append_packet<P>(&mut self, pkt: &P) -> anyhow::Result<()>
where
P: EncodePacket + ?Sized,
{
self.append_or_prepend_packet::<true>(pkt)
}
async fn listen(local_addr: oneshot::Sender<SocketAddr>) {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
local_addr.send(listener.local_addr().unwrap()).unwrap();
let stream = listener.accept().await.unwrap().0;
let mut decoder = Decoder::new(stream, TIMEOUT);
recv_test_packet(&mut decoder).await;
decoder.enable_compression(10);
recv_test_packet(&mut decoder).await;
decoder.enable_encryption(&CRYPT_KEY);
recv_test_packet(&mut decoder).await;
recv_test_packet(&mut decoder).await;
recv_test_packet(&mut decoder).await;
pub fn prepend_packet<P>(&mut self, pkt: &P) -> anyhow::Result<()>
where
P: EncodePacket + ?Sized,
{
self.append_or_prepend_packet::<false>(pkt)
}
async fn send_test_packet(w: &mut Encoder<TcpStream>) {
w.write_packet(&TestPacket {
first: "abcdefghijklmnopqrstuvwxyz".into(),
second: vec![0x1234, 0xabcd],
third: 0x1122334455667788,
})
.await
.unwrap();
fn append_or_prepend_packet<const APPEND: bool>(
&mut self,
pkt: &(impl EncodePacket + ?Sized),
) -> anyhow::Result<()> {
let data_len = pkt.encoded_packet_len();
if let Some(threshold) = self.compression_threshold {
if data_len >= threshold as usize {
let mut z = ZlibEncoder::new(&mut self.compress_buf, Compression::best());
pkt.encode_packet(&mut z)?;
drop(z);
let packet_len = VarInt(data_len as i32).encoded_len() + self.compress_buf.len();
ensure!(
packet_len <= MAX_PACKET_SIZE as usize,
"packet exceeds maximum length"
);
// BytesMut doesn't implement io::Write for some reason.
let mut writer = (&mut self.buf).writer();
if APPEND {
VarInt(packet_len as i32).encode(&mut writer)?;
VarInt(data_len as i32).encode(&mut writer)?;
self.buf.extend_from_slice(&self.compress_buf);
} else {
let mut slice = move_forward_by(
&mut self.buf,
VarInt(packet_len as i32).encoded_len() + packet_len,
);
VarInt(packet_len as i32).encode(&mut slice)?;
VarInt(data_len as i32).encode(&mut slice)?;
slice.copy_from_slice(&self.compress_buf);
}
async fn recv_test_packet(r: &mut Decoder<TcpStream>) {
let TestPacket {
first,
second,
third,
} = r.read_packet().await.unwrap();
self.compress_buf.clear();
} else {
let packet_len = VarInt(0).encoded_len() + data_len;
assert_eq!(&first, "abcdefghijklmnopqrstuvwxyz");
assert_eq!(&second, &[0x1234, 0xabcd]);
assert_eq!(third, 0x1122334455667788);
ensure!(
packet_len <= MAX_PACKET_SIZE as usize,
"packet exceeds maximum length"
);
let mut writer = (&mut self.buf).writer();
if APPEND {
VarInt(packet_len as i32).encode(&mut writer)?;
VarInt(0).encode(&mut writer)?; // 0 for no compression on this packet.
pkt.encode_packet(&mut writer)?;
} else {
let mut slice = move_forward_by(
&mut self.buf,
VarInt(packet_len as i32).encoded_len() + packet_len,
);
VarInt(packet_len as i32).encode(&mut slice)?;
VarInt(0).encode(&mut slice)?;
pkt.encode_packet(&mut slice)?;
debug_assert!(
slice.is_empty(),
"actual size of {} packet differs from reported size (actual = {}, \
reported = {})",
pkt.packet_name(),
data_len - slice.len(),
data_len,
);
}
}
} else {
let packet_len = data_len;
ensure!(
packet_len <= MAX_PACKET_SIZE as usize,
"packet exceeds maximum length"
);
if APPEND {
let mut writer = (&mut self.buf).writer();
VarInt(packet_len as i32).encode(&mut writer)?;
pkt.encode_packet(&mut writer)?;
} else {
let mut slice = move_forward_by(
&mut self.buf,
VarInt(packet_len as i32).encoded_len() + packet_len,
);
VarInt(packet_len as i32).encode(&mut slice)?;
pkt.encode_packet(&mut slice)?;
debug_assert!(
slice.is_empty(),
"actual size of {} packet differs from reported size (actual = {}, reported = \
{})",
pkt.packet_name(),
data_len - slice.len(),
data_len,
);
}
}
Ok(())
}
/// Takes all the packets written so far and encrypts them if encryption is
/// enabled.
pub fn take(&mut self) -> BytesMut {
if let Some(cipher) = &mut self.cipher {
cipher.encrypt(&mut self.buf);
}
self.buf.split()
}
pub fn set_compression(&mut self, threshold: Option<u32>) {
self.compression_threshold = threshold;
}
/// Enables encryption for all future packets **and any packets that have
/// not been [taken] yet.**
///
/// [taken]: Self::take
pub fn enable_encryption(&mut self, key: &[u8; 16]) {
assert!(self.cipher.is_none(), "encryption is already enabled");
self.cipher = Some(NewCipher::new(key.into(), key.into()));
}
}
/// Move the bytes in `bytes` forward by `count` bytes and return a
/// mutable reference to the new space at the front.
fn move_forward_by(bytes: &mut BytesMut, count: usize) -> &mut [u8] {
let len = bytes.len();
bytes.put_bytes(0, count);
bytes.copy_within(..len, count);
&mut bytes[..count]
}
#[derive(Default)]
pub struct PacketDecoder {
buf: BytesMut,
decompress_buf: Vec<u8>,
compression: bool,
cipher: Option<Cipher>,
}
impl PacketDecoder {
pub fn new() -> Self {
Self::default()
}
pub fn try_next_packet<P>(&mut self) -> anyhow::Result<Option<P>>
where
P: DecodePacket,
{
let mut r = &self.buf[..];
let packet_len = match VarInt::decode_partial(&mut r) {
Ok(len) => len,
Err(VarIntDecodeError::Incomplete) => return Ok(None),
Err(VarIntDecodeError::TooLarge) => bail!("malformed packet length VarInt"),
};
ensure!(
packet_len <= MAX_PACKET_SIZE,
"packet length of {packet_len} is out of bounds"
);
if r.len() < packet_len as usize {
return Ok(None);
}
r = &r[..packet_len as usize];
let packet = if self.compression {
let data_len = VarInt::decode(&mut r)?.0;
ensure!(
(0..MAX_PACKET_SIZE).contains(&data_len),
"decompressed packet length of {data_len} is out of bounds"
);
if data_len != 0 {
self.decompress_buf.clear();
self.decompress_buf.reserve_exact(data_len as usize);
let mut z = ZlibDecoder::new(r).take(data_len as u64);
z.read_to_end(&mut self.decompress_buf)
.context("decompressing packet")?;
r = &self.decompress_buf;
P::decode_packet(&mut r)?
} else {
P::decode_packet(&mut r)?
}
} else {
P::decode_packet(&mut r)?
};
if !r.is_empty() {
if log_enabled!(log::Level::Debug) {
log::debug!("packet after partial decode: {packet:?}");
}
bail!(
"packet contents were not read completely ({} bytes remain)",
r.len()
);
}
let total_packet_len = VarInt(packet_len).encoded_len() + packet_len as usize;
self.buf.advance(total_packet_len);
Ok(Some(packet))
}
pub fn set_compression(&mut self, compression: bool) {
self.compression = compression;
}
pub fn enable_encryption(&mut self, key: &[u8; 16]) {
assert!(self.cipher.is_none(), "encryption is already enabled");
let mut cipher = Cipher::new(key.into(), key.into());
// Don't forget to decrypt the data we already have.
cipher.decrypt(&mut self.buf);
self.cipher = Some(cipher);
}
pub fn queue_bytes(&mut self, mut bytes: BytesMut) {
if let Some(cipher) = &mut self.cipher {
cipher.decrypt(&mut bytes);
}
self.buf.unsplit(bytes);
}
pub fn queue_slice(&mut self, bytes: &[u8]) {
let len = self.buf.len();
self.buf.extend_from_slice(bytes);
if let Some(cipher) = &mut self.cipher {
cipher.decrypt(&mut self.buf[len..]);
}
}
pub fn queued_bytes(&self) -> &[u8] {
self.buf.as_ref()
}
pub fn take_capacity(&mut self) -> BytesMut {
self.buf.split_off(self.buf.len())
}
pub fn reserve(&mut self, additional: usize) {
self.buf.reserve(additional);
}
}
#[cfg(test)]
mod tests {
use std::io::Write;
use anyhow::Context;
use super::*;
use crate::protocol::packets::{DecodePacket, EncodePacket, PacketName};
use crate::protocol::{Decode, Encode};
const CRYPT_KEY: [u8; 16] = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16];
#[derive(Clone, PartialEq, Eq, Debug)]
struct TestPacket {
string: String,
vec_of_u16: Vec<u16>,
u64: u64,
}
impl PacketName for TestPacket {
fn packet_name(&self) -> &'static str {
"TestPacket"
}
}
impl EncodePacket for TestPacket {
fn encode_packet(&self, w: &mut impl Write) -> anyhow::Result<()> {
self.string.encode(w)?;
self.vec_of_u16.encode(w)?;
self.u64.encode(w)
}
fn encoded_packet_len(&self) -> usize {
self.string.encoded_len() + self.vec_of_u16.encoded_len() + self.u64.encoded_len()
}
}
impl DecodePacket for TestPacket {
fn decode_packet(r: &mut &[u8]) -> anyhow::Result<Self> {
Ok(TestPacket {
string: String::decode(r).context("decoding string field")?,
vec_of_u16: Vec::decode(r).context("decoding vec of u16 field")?,
u64: u64::decode(r).context("decoding u64 field")?,
})
}
}
impl TestPacket {
fn new(s: impl Into<String>) -> Self {
Self {
string: s.into(),
vec_of_u16: vec![0x1234, 0xabcd],
u64: 0x1122334455667788,
}
}
fn check(&self, s: impl AsRef<str>) {
assert_eq!(&self.string, s.as_ref());
assert_eq!(&self.vec_of_u16, &[0x1234, 0xabcd]);
assert_eq!(self.u64, 0x1122334455667788);
}
}
#[test]
fn packets_round_trip() {
let mut buf = BytesMut::new();
let mut enc = PacketEncoder::new();
enc.append_packet(&TestPacket::new("first")).unwrap();
enc.set_compression(Some(0));
enc.append_packet(&TestPacket::new("second")).unwrap();
buf.unsplit(enc.take());
enc.enable_encryption(&CRYPT_KEY);
enc.append_packet(&TestPacket::new("third")).unwrap();
enc.prepend_packet(&TestPacket::new("fourth")).unwrap();
buf.unsplit(enc.take());
let mut dec = PacketDecoder::new();
dec.queue_bytes(buf);
dec.try_next_packet::<TestPacket>()
.unwrap()
.unwrap()
.check("first");
dec.set_compression(true);
dec.try_next_packet::<TestPacket>()
.unwrap()
.unwrap()
.check("second");
dec.enable_encryption(&CRYPT_KEY);
dec.try_next_packet::<TestPacket>()
.unwrap()
.unwrap()
.check("fourth");
dec.try_next_packet::<TestPacket>()
.unwrap()
.unwrap()
.check("third");
}
}

View file

@ -39,6 +39,7 @@ pub trait PacketName {
pub trait EncodePacket: PacketName + fmt::Debug {
/// Writes a packet to the Minecraft protocol, including its packet ID.
fn encode_packet(&self, w: &mut impl Write) -> anyhow::Result<()>;
fn encoded_packet_len(&self) -> usize;
}
/// Trait for types that can be read from the Minecraft protocol as a complete
@ -334,6 +335,10 @@ macro_rules! def_packet_group {
VarInt($id).encode(w).context("failed to write packet ID")?;
self.encode(w)
}
fn encoded_packet_len(&self) -> usize {
VarInt($id).encoded_len() + self.encoded_len()
}
}
impl DecodePacket for $packet {
@ -395,6 +400,14 @@ macro_rules! def_packet_group {
)*
}
}
fn encoded_packet_len(&self) -> usize {
match self {
$(
Self::$packet(pkt) => VarInt($id).encoded_len() + pkt.encoded_len(),
)*
}
}
}
impl fmt::Debug for $group_name {

View file

@ -1,7 +1,8 @@
use std::io::Write;
use std::io::{Read, Write};
use anyhow::bail;
use byteorder::{ReadBytesExt, WriteBytesExt};
use thiserror::Error;
use crate::protocol::{Decode, Encode};
@ -13,12 +14,31 @@ impl VarInt {
/// The maximum number of bytes a VarInt could occupy when read from and
/// written to the Minecraft protocol.
pub const MAX_SIZE: usize = 5;
pub(crate) fn decode_partial(mut r: impl Read) -> Result<i32, VarIntDecodeError> {
let mut val = 0;
for i in 0..Self::MAX_SIZE {
let byte = r.read_u8().map_err(|_| VarIntDecodeError::Incomplete)?;
val |= (byte as i32 & 0b01111111) << (i * 7);
if byte & 0b10000000 == 0 {
return Ok(val);
}
}
Err(VarIntDecodeError::TooLarge)
}
}
#[derive(Copy, Clone, PartialEq, Eq, Debug, Error)]
pub(crate) enum VarIntDecodeError {
#[error("incomplete VarInt decode")]
Incomplete,
#[error("VarInt is too large")]
TooLarge,
}
impl Encode for VarInt {
fn encode(&self, w: &mut impl Write) -> anyhow::Result<()> {
// TODO: optimize this.
let mut val = self.0 as u32;
loop {
if val & 0b11111111111111111111111110000000 == 0 {

View file

@ -17,8 +17,6 @@ impl VarLong {
impl Encode for VarLong {
fn encode(&self, w: &mut impl Write) -> anyhow::Result<()> {
// TODO: optimize this.
let mut val = self.0 as u64;
loop {
if val & 0b1111111111111111111111111111111111111111111111111111111110000000 == 0 {

View file

@ -10,6 +10,7 @@ use std::{io, thread};
use anyhow::{ensure, Context};
use flume::{Receiver, Sender};
pub(crate) use packet_controller::PlayPacketController;
use rand::rngs::OsRng;
use rayon::iter::ParallelIterator;
use reqwest::Client as HttpClient;
@ -18,7 +19,7 @@ use serde_json::{json, Value};
use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
use tokio::net::{TcpListener, TcpStream};
use tokio::runtime::{Handle, Runtime};
use tokio::sync::{oneshot, Semaphore};
use tokio::sync::Semaphore;
use uuid::Uuid;
use valence_nbt::{compound, Compound, List};
@ -30,20 +31,21 @@ use crate::entity::Entities;
use crate::inventory::Inventories;
use crate::player_list::PlayerLists;
use crate::player_textures::SignedPlayerTextures;
use crate::protocol::codec::{Decoder, Encoder};
use crate::protocol::codec::{PacketDecoder, PacketEncoder};
use crate::protocol::packets::c2s::handshake::{Handshake, HandshakeNextState};
use crate::protocol::packets::c2s::login::LoginStart;
use crate::protocol::packets::c2s::play::C2sPlayPacket;
use crate::protocol::packets::c2s::status::{PingRequest, StatusRequest};
use crate::protocol::packets::s2c::login::{DisconnectLogin, LoginSuccess, SetCompression};
use crate::protocol::packets::s2c::play::S2cPlayPacket;
use crate::protocol::packets::s2c::status::{PingResponse, StatusResponse};
use crate::protocol::VarInt;
use crate::server::packet_controller::InitialPacketController;
use crate::username::Username;
use crate::world::Worlds;
use crate::{ident, Ticks, PROTOCOL_VERSION, VERSION_NAME};
mod byte_channel;
mod login;
mod packet_controller;
/// Contains the entire state of a running Minecraft server, accessible from
/// within the [update](crate::config::Config::update) loop.
@ -64,7 +66,7 @@ pub struct Server<C: Config> {
}
/// A handle to a Minecraft server containing the subset of functionality which
/// is accessible outside the [update][update] loop.
/// is accessible outside the [update] loop.
///
/// `SharedServer`s are internally refcounted and can
/// be shared between threads.
@ -84,8 +86,8 @@ struct SharedServerInner<C: Config> {
tick_rate: Ticks,
connection_mode: ConnectionMode,
max_connections: usize,
incoming_packet_capacity: usize,
outgoing_packet_capacity: usize,
incoming_capacity: usize,
outgoing_capacity: usize,
tokio_handle: Handle,
/// Store this here so we don't drop it.
_tokio_runtime: Option<Runtime>,
@ -116,6 +118,7 @@ struct SharedServerInner<C: Config> {
}
/// Contains information about a new client.
#[non_exhaustive]
pub struct NewClientData {
/// The UUID of the new client.
pub uuid: Uuid,
@ -130,31 +133,12 @@ pub struct NewClientData {
struct NewClientMessage {
ncd: NewClientData,
reply: oneshot::Sender<S2cPacketChannels>,
ctrl: PlayPacketController,
}
/// The result type returned from [`start_server`].
pub type ShutdownResult = Result<(), Box<dyn Error + Send + Sync + 'static>>;
pub(crate) type S2cPacketChannels = (Sender<C2sPlayPacket>, Receiver<S2cPlayMessage>);
pub(crate) type C2sPacketChannels = (Sender<S2cPlayMessage>, Receiver<C2sPlayPacket>);
/// Messages sent to packet encoders.
#[allow(clippy::large_enum_variant)]
#[derive(Clone, Debug)]
pub(crate) enum S2cPlayMessage {
/// Queue a play packet for sending.
Queue(S2cPlayPacket),
/// Instructs the encoder to flush all queued packets to the TCP stream.
Flush,
}
impl<P: Into<S2cPlayPacket>> From<P> for S2cPlayMessage {
fn from(pkt: P) -> Self {
Self::Queue(pkt.into())
}
}
impl<C: Config> SharedServer<C> {
/// Gets a reference to the config object used to start the server.
pub fn config(&self) -> &C {
@ -181,14 +165,14 @@ impl<C: Config> SharedServer<C> {
self.0.max_connections
}
/// Gets the configured incoming packet capacity.
pub fn incoming_packet_capacity(&self) -> usize {
self.0.incoming_packet_capacity
/// Gets the configured incoming capacity.
pub fn incoming_capacity(&self) -> usize {
self.0.incoming_capacity
}
/// Gets the configured outgoing incoming packet capacity.
pub fn outgoing_packet_capacity(&self) -> usize {
self.0.outgoing_packet_capacity
/// Gets the configured outgoing incoming capacity.
pub fn outgoing_capacity(&self) -> usize {
self.0.outgoing_capacity
}
/// Gets a handle to the tokio instance this server is using.
@ -259,13 +243,12 @@ impl<C: Config> SharedServer<C> {
///
/// You may want to disconnect all players with a message prior to calling
/// this function.
pub fn shutdown<R, E>(&self, res: R)
pub fn shutdown<E>(&self, res: Result<(), E>)
where
R: Into<Result<(), E>>,
E: Into<Box<dyn Error + Send + Sync + 'static>>,
{
self.0.connection_sema.close();
*self.0.shutdown_result.lock().unwrap() = Some(res.into().map_err(|e| e.into()));
*self.0.shutdown_result.lock().unwrap() = Some(res.map_err(|e| e.into()));
}
}
@ -306,14 +289,14 @@ fn setup_server<C: Config>(cfg: C) -> anyhow::Result<SharedServer<C>> {
let connection_mode = cfg.connection_mode();
let incoming_packet_capacity = cfg.incoming_packet_capacity();
let incoming_packet_capacity = cfg.incoming_capacity();
ensure!(
incoming_packet_capacity > 0,
"serverbound packet capacity must be nonzero"
);
let outgoing_packet_capacity = cfg.outgoing_packet_capacity();
let outgoing_packet_capacity = cfg.outgoing_capacity();
ensure!(
outgoing_packet_capacity > 0,
@ -334,7 +317,7 @@ fn setup_server<C: Config>(cfg: C) -> anyhow::Result<SharedServer<C>> {
rsa_der::public_key_to_der(&rsa_key.n().to_bytes_be(), &rsa_key.e().to_bytes_be())
.into_boxed_slice();
let (new_clients_tx, new_clients_rx) = flume::bounded(1);
let (new_clients_tx, new_clients_rx) = flume::bounded(1024);
let runtime = if tokio_handle.is_none() {
Some(Runtime::new()?)
@ -355,8 +338,8 @@ fn setup_server<C: Config>(cfg: C) -> anyhow::Result<SharedServer<C>> {
tick_rate,
connection_mode,
max_connections,
incoming_packet_capacity,
outgoing_packet_capacity,
incoming_capacity: incoming_packet_capacity,
outgoing_capacity: outgoing_packet_capacity,
tokio_handle,
_tokio_runtime: runtime,
dimensions,
@ -404,7 +387,7 @@ fn make_registry_codec(dimensions: &[Dimension], biomes: &[Biome]) -> Compound {
}
}
fn do_update_loop<C: Config>(server: &mut Server<C>) -> ShutdownResult {
fn do_update_loop(server: &mut Server<impl Config>) -> ShutdownResult {
let mut tick_start = Instant::now();
let shared = server.shared.clone();
@ -414,7 +397,9 @@ fn do_update_loop<C: Config>(server: &mut Server<C>) -> ShutdownResult {
}
while let Ok(msg) = shared.0.new_clients_rx.try_recv() {
join_player(server, msg);
server
.clients
.insert(Client::new(msg.ctrl, msg.ncd, Default::default()));
}
// Get serverbound packets first so they are not dealt with a tick late.
@ -456,26 +441,7 @@ fn do_update_loop<C: Config>(server: &mut Server<C>) -> ShutdownResult {
}
}
fn join_player<C: Config>(server: &mut Server<C>, msg: NewClientMessage) {
let (clientbound_tx, clientbound_rx) = flume::bounded(server.shared.0.outgoing_packet_capacity);
let (serverbound_tx, serverbound_rx) = flume::bounded(server.shared.0.incoming_packet_capacity);
let s2c_packet_channels: S2cPacketChannels = (serverbound_tx, clientbound_rx);
let c2s_packet_channels: C2sPacketChannels = (clientbound_tx, serverbound_rx);
let _ = msg.reply.send(s2c_packet_channels);
let client = Client::new(c2s_packet_channels, msg.ncd, C::ClientState::default());
server.clients.insert(client);
}
struct Codec {
enc: Encoder<OwnedWriteHalf>,
dec: Decoder<OwnedReadHalf>,
}
async fn do_accept_loop<C: Config>(server: SharedServer<C>) {
async fn do_accept_loop(server: SharedServer<impl Config>) {
log::trace!("entering accept loop");
let listener = match TcpListener::bind(server.0.address).await {
@ -517,22 +483,24 @@ async fn do_accept_loop<C: Config>(server: SharedServer<C>) {
}
}
async fn handle_connection<C: Config>(
server: SharedServer<C>,
async fn handle_connection(
server: SharedServer<impl Config>,
stream: TcpStream,
remote_addr: SocketAddr,
) -> anyhow::Result<()> {
let timeout = Duration::from_secs(10);
let (read, write) = stream.into_split();
let mut c = Codec {
enc: Encoder::new(write, timeout),
dec: Decoder::new(read, timeout),
};
let mut ctrl = InitialPacketController::new(
read,
write,
PacketEncoder::new(),
PacketDecoder::new(),
Duration::from_secs(5),
);
// TODO: peek stream for 0xFE legacy ping
let handshake: Handshake = c.dec.read_packet().await?;
let handshake: Handshake = ctrl.recv_packet().await?;
ensure!(
matches!(server.connection_mode(), ConnectionMode::BungeeCord)
@ -541,28 +509,37 @@ async fn handle_connection<C: Config>(
);
match handshake.next_state {
HandshakeNextState::Status => handle_status(server, &mut c, remote_addr, handshake)
HandshakeNextState::Status => handle_status(server, ctrl, remote_addr, handshake)
.await
.context("error during status"),
HandshakeNextState::Login => match handle_login(&server, &mut c, remote_addr, handshake)
HandshakeNextState::Login => match handle_login(&server, &mut ctrl, remote_addr, handshake)
.await
.context("error during login")?
{
Some(npd) => handle_play(&server, c, npd)
.await
.context("error during play"),
Some(ncd) => {
let msg = NewClientMessage {
ncd,
ctrl: ctrl.into_play_packet_controller(
server.0.incoming_capacity,
server.0.outgoing_capacity,
),
};
let _ = server.0.new_clients_tx.send_async(msg).await;
Ok(())
}
None => Ok(()),
},
}
}
async fn handle_status<C: Config>(
server: SharedServer<C>,
c: &mut Codec,
async fn handle_status(
server: SharedServer<impl Config>,
mut ctrl: InitialPacketController<OwnedReadHalf, OwnedWriteHalf>,
remote_addr: SocketAddr,
handshake: Handshake,
) -> anyhow::Result<()> {
c.dec.read_packet::<StatusRequest>().await?;
ctrl.recv_packet::<StatusRequest>().await?;
match server
.0
@ -598,8 +575,7 @@ async fn handle_status<C: Config>(
.insert("favicon".to_owned(), Value::String(buf));
}
c.enc
.write_packet(&StatusResponse {
ctrl.send_packet(&StatusResponse {
json_response: json.to_string(),
})
.await?;
@ -607,9 +583,9 @@ async fn handle_status<C: Config>(
ServerListPing::Ignore => return Ok(()),
}
let PingRequest { payload } = c.dec.read_packet().await?;
let PingRequest { payload } = ctrl.recv_packet().await?;
c.enc.write_packet(&PingResponse { payload }).await?;
ctrl.send_packet(&PingResponse { payload }).await?;
Ok(())
}
@ -617,7 +593,7 @@ async fn handle_status<C: Config>(
/// Handle the login process and return the new client's data if successful.
async fn handle_login(
server: &SharedServer<impl Config>,
c: &mut Codec,
ctrl: &mut InitialPacketController<OwnedReadHalf, OwnedWriteHalf>,
remote_addr: SocketAddr,
handshake: Handshake,
) -> anyhow::Result<Option<NewClientData>> {
@ -630,33 +606,30 @@ async fn handle_login(
username,
sig_data: _, // TODO
profile_id: _, // TODO
} = c.dec.read_packet().await?;
} = ctrl.recv_packet().await?;
let ncd = match server.connection_mode() {
ConnectionMode::Online => login::online(server, c, remote_addr, username).await?,
ConnectionMode::Online => login::online(server, ctrl, remote_addr, username).await?,
ConnectionMode::Offline => login::offline(remote_addr, username)?,
ConnectionMode::BungeeCord => login::bungeecord(&handshake.server_address, username)?,
ConnectionMode::Velocity { secret } => login::velocity(c, username, secret).await?,
ConnectionMode::Velocity { secret } => login::velocity(ctrl, username, secret).await?,
};
let compression_threshold = 256;
c.enc
.write_packet(&SetCompression {
ctrl.send_packet(&SetCompression {
threshold: VarInt(compression_threshold as i32),
})
.await?;
c.enc.enable_compression(compression_threshold);
c.dec.enable_compression(compression_threshold);
ctrl.set_compression(Some(compression_threshold));
if let Err(reason) = server.0.cfg.login(server, &ncd).await {
log::info!("Disconnect at login: \"{reason}\"");
c.enc.write_packet(&DisconnectLogin { reason }).await?;
ctrl.send_packet(&DisconnectLogin { reason }).await?;
return Ok(None);
}
c.enc
.write_packet(&LoginSuccess {
ctrl.send_packet(&LoginSuccess {
uuid: ncd.uuid,
username: ncd.username.clone(),
properties: Vec::new(),
@ -665,55 +638,3 @@ async fn handle_login(
Ok(Some(ncd))
}
async fn handle_play<C: Config>(
server: &SharedServer<C>,
c: Codec,
ncd: NewClientData,
) -> anyhow::Result<()> {
let (reply_tx, reply_rx) = oneshot::channel();
server
.0
.new_clients_tx
.send_async(NewClientMessage {
ncd,
reply: reply_tx,
})
.await?;
let (packet_tx, packet_rx) = match reply_rx.await {
Ok(res) => res,
Err(_) => return Ok(()), // Server closed
};
let Codec { mut enc, mut dec } = c;
tokio::spawn(async move {
while let Ok(msg) = packet_rx.recv_async().await {
match msg {
S2cPlayMessage::Queue(pkt) => {
if let Err(e) = enc.queue_packet(&pkt) {
log::debug!("error while queueing play packet: {e:#}");
break;
}
}
S2cPlayMessage::Flush => {
if let Err(e) = enc.flush().await {
log::debug!("error while flushing packet queue: {e:#}");
break;
}
}
}
}
});
loop {
let pkt = dec.read_packet().await?;
if packet_tx.send_async(pkt).await.is_err() {
break;
}
}
Ok(())
}

244
src/server/byte_channel.rs Normal file
View file

@ -0,0 +1,244 @@
#![allow(dead_code)]
use std::sync::{Arc, Mutex};
use bytes::BytesMut;
use thiserror::Error;
use tokio::sync::Notify;
pub fn byte_channel(limit: usize) -> (ByteSender, ByteReceiver) {
let shared = Arc::new(Shared {
mtx: Mutex::new(Inner {
bytes: BytesMut::new(),
disconnected: false,
}),
notify: Notify::new(),
limit,
});
let sender = ByteSender {
shared: shared.clone(),
};
let receiver = ByteReceiver { shared };
(sender, receiver)
}
pub struct ByteSender {
shared: Arc<Shared>,
}
pub struct ByteReceiver {
shared: Arc<Shared>,
}
struct Shared {
mtx: Mutex<Inner>,
notify: Notify,
limit: usize,
}
struct Inner {
bytes: BytesMut,
disconnected: bool,
}
impl ByteSender {
pub fn take_capacity(&mut self, additional: usize) -> BytesMut {
let mut lck = self.shared.mtx.lock().unwrap();
lck.bytes.reserve(additional);
let len = lck.bytes.len();
lck.bytes.split_off(len)
}
pub fn try_send(&mut self, mut bytes: BytesMut) -> Result<(), TrySendError> {
let mut lck = self.shared.mtx.lock().unwrap();
if lck.disconnected {
return Err(TrySendError::Disconnected(bytes));
}
if bytes.is_empty() {
return Ok(());
}
let available = self.shared.limit - lck.bytes.len();
if bytes.len() > available {
if available > 0 {
lck.bytes.unsplit(bytes.split_to(available));
self.shared.notify.notify_waiters();
}
return Err(TrySendError::Full(bytes));
}
lck.bytes.unsplit(bytes);
self.shared.notify.notify_waiters();
Ok(())
}
pub async fn send_async(&mut self, mut bytes: BytesMut) -> Result<(), SendError> {
loop {
{
let mut lck = self.shared.mtx.lock().unwrap();
if lck.disconnected {
return Err(SendError(bytes));
}
if bytes.is_empty() {
return Ok(());
}
let available = self.shared.limit - lck.bytes.len();
if bytes.len() <= available {
lck.bytes.unsplit(bytes);
self.shared.notify.notify_waiters();
return Ok(());
}
if available > 0 {
lck.bytes.unsplit(bytes.split_to(available));
self.shared.notify.notify_waiters();
}
}
self.shared.notify.notified().await;
}
}
pub fn is_disconnected(&self) -> bool {
self.shared.mtx.lock().unwrap().disconnected
}
}
/// Contains any excess bytes not sent.
#[derive(Clone, PartialEq, Eq, Debug, Error)]
pub enum TrySendError {
#[error("sender disconnected")]
Disconnected(BytesMut),
#[error("channel full")]
Full(BytesMut),
}
#[derive(Clone, PartialEq, Eq, Debug, Error)]
#[error("sender disconnected")]
pub struct SendError(pub BytesMut);
impl SendError {
pub fn into_inner(self) -> BytesMut {
self.0
}
}
impl ByteReceiver {
pub fn try_recv(&mut self) -> Result<BytesMut, TryRecvError> {
let mut lck = self.shared.mtx.lock().unwrap();
if !lck.bytes.is_empty() {
self.shared.notify.notify_waiters();
return Ok(lck.bytes.split());
}
if lck.disconnected {
return Err(TryRecvError::Disconnected);
}
Err(TryRecvError::Empty)
}
pub async fn recv_async(&mut self) -> Result<BytesMut, RecvError> {
loop {
{
let mut lck = self.shared.mtx.lock().unwrap();
if !lck.bytes.is_empty() {
self.shared.notify.notify_waiters();
return Ok(lck.bytes.split());
}
if lck.disconnected {
return Err(RecvError::Disconnected);
}
}
self.shared.notify.notified().await;
}
}
pub fn is_disconnected(&self) -> bool {
self.shared.mtx.lock().unwrap().disconnected
}
}
#[derive(Copy, Clone, PartialEq, Eq, Debug, Error)]
pub enum TryRecvError {
#[error("empty channel")]
Empty,
#[error("receiver disconnected")]
Disconnected,
}
#[derive(Copy, Clone, PartialEq, Eq, Debug, Error)]
pub enum RecvError {
#[error("receiver disconnected")]
Disconnected,
}
impl Drop for ByteSender {
fn drop(&mut self) {
self.shared.mtx.lock().unwrap().disconnected = true;
}
}
impl Drop for ByteReceiver {
fn drop(&mut self) {
self.shared.mtx.lock().unwrap().disconnected = true;
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn byte_channel_try() {
let (mut sender, mut receiver) = byte_channel(4);
assert_eq!(
sender.try_send("hello".as_bytes().into()),
Err(TrySendError::Full("o".as_bytes().into()))
);
assert_eq!(
receiver.try_recv().unwrap(),
BytesMut::from("hell".as_bytes())
);
}
#[tokio::test]
async fn byte_channel_async() {
let (mut sender, mut receiver) = byte_channel(4);
let t = tokio::spawn(async move {
let bytes = receiver.recv_async().await.unwrap();
assert_eq!(&bytes[..], b"hell");
let bytes = receiver.recv_async().await.unwrap();
assert_eq!(&bytes[..], b"o");
assert_eq!(receiver.try_recv(), Err(TryRecvError::Empty));
});
sender.send_async("hello".as_bytes().into()).await.unwrap();
t.await.unwrap();
assert!(sender.is_disconnected());
}
}

View file

@ -13,6 +13,7 @@ use rsa::PaddingScheme;
use serde::Deserialize;
use sha1::Sha1;
use sha2::{Digest, Sha256};
use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
use uuid::Uuid;
use crate::config::Config;
@ -26,7 +27,8 @@ use crate::protocol::packets::s2c::login::{
};
use crate::protocol::packets::Property;
use crate::protocol::{BoundedArray, Decode, RawBytes, VarInt};
use crate::server::{Codec, NewClientData, SharedServer};
use crate::server::packet_controller::InitialPacketController;
use crate::server::{NewClientData, SharedServer};
use crate::text::Text;
use crate::username::Username;
@ -34,14 +36,13 @@ use crate::username::Username;
/// [`ConnectionMode::Online`](crate::config::ConnectionMode).
pub(super) async fn online(
server: &SharedServer<impl Config>,
c: &mut Codec,
ctrl: &mut InitialPacketController<OwnedReadHalf, OwnedWriteHalf>,
remote_addr: SocketAddr,
username: Username<String>,
) -> anyhow::Result<NewClientData> {
let my_verify_token: [u8; 16] = rand::random();
c.enc
.write_packet(&EncryptionRequest {
ctrl.send_packet(&EncryptionRequest {
server_id: Default::default(), // Always empty
public_key: server.0.public_key_der.to_vec(),
verify_token: my_verify_token.to_vec().into(),
@ -51,7 +52,7 @@ pub(super) async fn online(
let EncryptionResponse {
shared_secret: BoundedArray(encrypted_shared_secret),
token_or_sig,
} = c.dec.read_packet().await?;
} = ctrl.recv_packet().await?;
let shared_secret = server
.0
@ -81,8 +82,7 @@ pub(super) async fn online(
.try_into()
.context("shared secret has the wrong length")?;
c.enc.enable_encryption(&crypt_key);
c.dec.enable_encryption(&crypt_key);
ctrl.enable_encryption(&crypt_key);
#[derive(Debug, Deserialize)]
struct AuthResponse {
@ -109,7 +109,7 @@ pub(super) async fn online(
StatusCode::OK => {}
StatusCode::NO_CONTENT => {
let reason = Text::translate("multiplayer.disconnect.unverified_username");
c.enc.write_packet(&DisconnectLogin { reason }).await?;
ctrl.send_packet(&DisconnectLogin { reason }).await?;
bail!("session server could not verify username");
}
status => {
@ -200,7 +200,7 @@ fn auth_digest(bytes: &[u8]) -> String {
}
pub(super) async fn velocity(
c: &mut Codec,
ctrl: &mut InitialPacketController<OwnedReadHalf, OwnedWriteHalf>,
username: Username<String>,
velocity_secret: &str,
) -> anyhow::Result<NewClientData> {
@ -210,8 +210,7 @@ pub(super) async fn velocity(
let message_id = 0;
// Send Player Info Request into the Plugin Channel
c.enc
.write_packet(&LoginPluginRequest {
ctrl.send_packet(&LoginPluginRequest {
message_id: VarInt(message_id),
channel: ident!("velocity:player_info"),
data: RawBytes(vec![VELOCITY_MIN_SUPPORTED_VERSION]),
@ -219,7 +218,7 @@ pub(super) async fn velocity(
.await?;
// Get Response
let plugin_response: LoginPluginResponse = c.dec.read_packet().await?;
let plugin_response: LoginPluginResponse = ctrl.recv_packet().await?;
ensure!(
plugin_response.message_id.0 == message_id,

View file

@ -0,0 +1,222 @@
use std::io::ErrorKind;
use std::time::Duration;
use tokio::io;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::task::JoinHandle;
use tokio::time::timeout;
use crate::protocol::codec::{PacketDecoder, PacketEncoder};
use crate::protocol::packets::{DecodePacket, EncodePacket};
use crate::server::byte_channel::{byte_channel, ByteReceiver, ByteSender, TryRecvError};
pub struct InitialPacketController<R, W> {
reader: R,
writer: W,
enc: PacketEncoder,
dec: PacketDecoder,
timeout: Duration,
}
const READ_BUF_SIZE: usize = 4096;
impl<R, W> InitialPacketController<R, W>
where
R: AsyncRead + Unpin,
W: AsyncWrite + Unpin,
{
pub fn new(
reader: R,
writer: W,
enc: PacketEncoder,
dec: PacketDecoder,
timeout: Duration,
) -> Self {
Self {
reader,
writer,
enc,
dec,
timeout,
}
}
pub async fn send_packet<P>(&mut self, pkt: &P) -> anyhow::Result<()>
where
P: EncodePacket + ?Sized,
{
self.enc.append_packet(pkt)?;
let bytes = self.enc.take();
timeout(self.timeout, self.writer.write_all(&bytes)).await??;
Ok(())
}
pub async fn recv_packet<P>(&mut self) -> anyhow::Result<P>
where
P: DecodePacket,
{
timeout(self.timeout, async {
loop {
if let Some(pkt) = self.dec.try_next_packet()? {
return Ok(pkt);
}
self.dec.reserve(READ_BUF_SIZE);
let mut buf = self.dec.take_capacity();
if self.reader.read_buf(&mut buf).await? == 0 {
return Err(io::Error::from(ErrorKind::UnexpectedEof).into());
}
// This should always be an O(1) unsplit because we reserved space earlier and
// the previous call to `read_buf` shouldn't have grown the allocation.
self.dec.queue_bytes(buf);
}
})
.await?
}
#[allow(dead_code)]
pub fn set_compression(&mut self, threshold: Option<u32>) {
self.enc.set_compression(threshold);
self.dec.set_compression(threshold.is_some());
}
pub fn enable_encryption(&mut self, key: &[u8; 16]) {
self.enc.enable_encryption(key);
self.dec.enable_encryption(key);
}
pub fn into_play_packet_controller(
mut self,
incoming_limit: usize,
outgoing_limit: usize,
) -> PlayPacketController
where
R: Send + 'static,
W: Send + 'static,
{
let (mut incoming_sender, incoming_receiver) = byte_channel(incoming_limit);
let reader_task = tokio::spawn(async move {
loop {
let mut buf = incoming_sender.take_capacity(READ_BUF_SIZE);
match self.reader.read_buf(&mut buf).await {
Ok(0) => break,
Err(e) => {
log::warn!("error reading packet data: {e}");
break;
}
_ => {}
}
// This should always be an O(1) unsplit because we reserved space earlier.
if let Err(e) = incoming_sender.send_async(buf).await {
log::warn!("error sending packet data: {e}");
break;
}
}
});
let (outgoing_sender, mut outgoing_receiver) = byte_channel(outgoing_limit);
let writer_task = tokio::spawn(async move {
loop {
let bytes = match outgoing_receiver.recv_async().await {
Ok(bytes) => bytes,
Err(e) => {
log::warn!("error receiving packet data: {e}");
break;
}
};
if let Err(e) = self.writer.write_all(&bytes).await {
log::warn!("error writing packet data: {e}");
}
}
});
PlayPacketController {
enc: self.enc,
dec: self.dec,
send: outgoing_sender,
recv: incoming_receiver,
reader_task,
writer_task: Some(writer_task),
}
}
}
/// A convenience structure for managing a pair of packet encoder/decoders and
/// the byte channels from which to send and receive the packet data during the
/// play state.
pub struct PlayPacketController {
enc: PacketEncoder,
dec: PacketDecoder,
send: ByteSender,
recv: ByteReceiver,
reader_task: JoinHandle<()>,
writer_task: Option<JoinHandle<()>>,
}
impl PlayPacketController {
pub fn append_packet<P>(&mut self, pkt: &P) -> anyhow::Result<()>
where
P: EncodePacket + ?Sized,
{
self.enc.append_packet(pkt)
}
pub fn prepend_packet<P>(&mut self, pkt: &P) -> anyhow::Result<()>
where
P: EncodePacket + ?Sized,
{
self.enc.prepend_packet(pkt)
}
pub fn try_next_packet<P>(&mut self) -> anyhow::Result<Option<P>>
where
P: DecodePacket,
{
self.dec.try_next_packet()
}
/// Returns true if the client is connected. Returns false otherwise.
pub fn try_recv(&mut self) -> bool {
match self.recv.try_recv() {
Ok(bytes) => {
self.dec.queue_bytes(bytes);
true
}
Err(TryRecvError::Empty) => true,
Err(TryRecvError::Disconnected) => false,
}
}
#[allow(dead_code)]
pub fn set_compression(&mut self, threshold: Option<u32>) {
self.enc.set_compression(threshold)
}
pub fn flush(&mut self) -> anyhow::Result<()> {
let bytes = self.enc.take();
self.send.try_send(bytes)?;
Ok(())
}
}
impl Drop for PlayPacketController {
fn drop(&mut self) {
self.reader_task.abort();
let _ = self.flush();
if let Some(writer_task) = self.writer_task.take() {
if !writer_task.is_finished() {
// Give any unsent packets a moment to send before we cut the connection.
tokio::spawn(timeout(Duration::from_secs(1), writer_task));
}
}
}
}

View file

@ -458,7 +458,8 @@ impl Encode for Text {
}
fn encoded_len(&self) -> usize {
todo!("remove Encode impl on text and come up with solution")
// TODO: This is obviously not ideal. This will be fixed later.
serde_json::to_string(self).map_or(0, |s| s.encoded_len())
}
}