2022-11-01 21:11:51 +11:00
|
|
|
use std::io::ErrorKind;
|
|
|
|
use std::time::Duration;
|
|
|
|
|
2022-11-14 01:10:42 +11:00
|
|
|
use anyhow::Result;
|
2022-11-01 21:11:51 +11:00
|
|
|
use tokio::io;
|
|
|
|
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
|
2022-11-10 13:30:44 +11:00
|
|
|
use tokio::runtime::Handle;
|
2022-11-01 21:11:51 +11:00
|
|
|
use tokio::task::JoinHandle;
|
|
|
|
use tokio::time::timeout;
|
2022-11-17 13:22:44 +11:00
|
|
|
use tracing::debug;
|
2022-11-15 17:30:20 +11:00
|
|
|
use valence_protocol::{Decode, Encode, Packet, PacketDecoder, PacketEncoder};
|
2022-11-01 21:11:51 +11:00
|
|
|
|
|
|
|
use crate::server::byte_channel::{byte_channel, ByteReceiver, ByteSender, TryRecvError};
|
|
|
|
|
|
|
|
pub struct InitialPacketController<R, W> {
|
|
|
|
reader: R,
|
|
|
|
writer: W,
|
|
|
|
enc: PacketEncoder,
|
|
|
|
dec: PacketDecoder,
|
|
|
|
timeout: Duration,
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Amount of buffer capacity requested before each read from the connection
/// (passed to the decoder's `reserve` and to the byte channel's
/// `take_capacity`). Presumably measured in bytes.
const READ_BUF_SIZE: usize = 4096;
|
|
|
|
|
|
|
|
impl<R, W> InitialPacketController<R, W>
|
|
|
|
where
|
|
|
|
R: AsyncRead + Unpin,
|
|
|
|
W: AsyncWrite + Unpin,
|
|
|
|
{
|
|
|
|
pub fn new(
|
|
|
|
reader: R,
|
|
|
|
writer: W,
|
|
|
|
enc: PacketEncoder,
|
|
|
|
dec: PacketDecoder,
|
|
|
|
timeout: Duration,
|
|
|
|
) -> Self {
|
|
|
|
Self {
|
|
|
|
reader,
|
|
|
|
writer,
|
|
|
|
enc,
|
|
|
|
dec,
|
|
|
|
timeout,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-11-14 01:10:42 +11:00
|
|
|
pub async fn send_packet<P>(&mut self, pkt: &P) -> Result<()>
|
2022-11-01 21:11:51 +11:00
|
|
|
where
|
2022-11-14 01:10:42 +11:00
|
|
|
P: Encode + Packet + ?Sized,
|
2022-11-01 21:11:51 +11:00
|
|
|
{
|
|
|
|
self.enc.append_packet(pkt)?;
|
|
|
|
let bytes = self.enc.take();
|
|
|
|
timeout(self.timeout, self.writer.write_all(&bytes)).await??;
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2022-11-14 01:10:42 +11:00
|
|
|
pub async fn recv_packet<'a, P>(&'a mut self) -> Result<P>
|
2022-11-01 21:11:51 +11:00
|
|
|
where
|
2022-11-14 01:10:42 +11:00
|
|
|
P: Decode<'a> + Packet,
|
2022-11-01 21:11:51 +11:00
|
|
|
{
|
|
|
|
timeout(self.timeout, async {
|
2022-11-14 01:10:42 +11:00
|
|
|
while !self.dec.has_next_packet()? {
|
|
|
|
self.dec.reserve(READ_BUF_SIZE);
|
|
|
|
let mut buf = self.dec.take_capacity();
|
|
|
|
|
|
|
|
if self.reader.read_buf(&mut buf).await? == 0 {
|
|
|
|
return Err(io::Error::from(ErrorKind::UnexpectedEof).into());
|
|
|
|
}
|
|
|
|
|
|
|
|
// This should always be an O(1) unsplit because we reserved space earlier and
|
|
|
|
// the call to `read_buf` shouldn't have grown the allocation.
|
|
|
|
self.dec.queue_bytes(buf);
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok(self
|
|
|
|
.dec
|
|
|
|
.try_next_packet()?
|
|
|
|
.expect("decoder said it had another packet"))
|
|
|
|
|
|
|
|
// The following is what I want to write but can't due to borrow
|
|
|
|
// checker errors I don't understand.
|
|
|
|
/*
|
2022-11-01 21:11:51 +11:00
|
|
|
loop {
|
|
|
|
if let Some(pkt) = self.dec.try_next_packet()? {
|
|
|
|
return Ok(pkt);
|
|
|
|
}
|
|
|
|
|
|
|
|
self.dec.reserve(READ_BUF_SIZE);
|
|
|
|
let mut buf = self.dec.take_capacity();
|
|
|
|
|
|
|
|
if self.reader.read_buf(&mut buf).await? == 0 {
|
|
|
|
return Err(io::Error::from(ErrorKind::UnexpectedEof).into());
|
|
|
|
}
|
|
|
|
|
|
|
|
// This should always be an O(1) unsplit because we reserved space earlier and
|
2022-11-14 01:10:42 +11:00
|
|
|
// the call to `read_buf` shouldn't have grown the allocation.
|
2022-11-01 21:11:51 +11:00
|
|
|
self.dec.queue_bytes(buf);
|
|
|
|
}
|
2022-11-14 01:10:42 +11:00
|
|
|
*/
|
2022-11-01 21:11:51 +11:00
|
|
|
})
|
|
|
|
.await?
|
|
|
|
}
|
|
|
|
|
|
|
|
#[allow(dead_code)]
|
|
|
|
pub fn set_compression(&mut self, threshold: Option<u32>) {
|
|
|
|
self.enc.set_compression(threshold);
|
|
|
|
self.dec.set_compression(threshold.is_some());
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn enable_encryption(&mut self, key: &[u8; 16]) {
|
|
|
|
self.enc.enable_encryption(key);
|
|
|
|
self.dec.enable_encryption(key);
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn into_play_packet_controller(
|
|
|
|
mut self,
|
|
|
|
incoming_limit: usize,
|
|
|
|
outgoing_limit: usize,
|
2022-11-10 13:30:44 +11:00
|
|
|
handle: Handle,
|
2022-11-01 21:11:51 +11:00
|
|
|
) -> PlayPacketController
|
|
|
|
where
|
|
|
|
R: Send + 'static,
|
|
|
|
W: Send + 'static,
|
|
|
|
{
|
|
|
|
let (mut incoming_sender, incoming_receiver) = byte_channel(incoming_limit);
|
|
|
|
|
|
|
|
let reader_task = tokio::spawn(async move {
|
|
|
|
loop {
|
|
|
|
let mut buf = incoming_sender.take_capacity(READ_BUF_SIZE);
|
|
|
|
|
|
|
|
match self.reader.read_buf(&mut buf).await {
|
|
|
|
Ok(0) => break,
|
|
|
|
Err(e) => {
|
2022-11-17 13:22:44 +11:00
|
|
|
debug!("error reading packet data: {e}");
|
2022-11-01 21:11:51 +11:00
|
|
|
break;
|
|
|
|
}
|
|
|
|
_ => {}
|
|
|
|
}
|
|
|
|
|
|
|
|
// This should always be an O(1) unsplit because we reserved space earlier.
|
|
|
|
if let Err(e) = incoming_sender.send_async(buf).await {
|
2022-11-17 13:22:44 +11:00
|
|
|
debug!("error sending packet data: {e}");
|
2022-11-01 21:11:51 +11:00
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
});
|
|
|
|
|
|
|
|
let (outgoing_sender, mut outgoing_receiver) = byte_channel(outgoing_limit);
|
|
|
|
|
|
|
|
let writer_task = tokio::spawn(async move {
|
|
|
|
loop {
|
|
|
|
let bytes = match outgoing_receiver.recv_async().await {
|
|
|
|
Ok(bytes) => bytes,
|
|
|
|
Err(e) => {
|
2022-11-17 13:22:44 +11:00
|
|
|
debug!("error receiving packet data: {e}");
|
2022-11-01 21:11:51 +11:00
|
|
|
break;
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
if let Err(e) = self.writer.write_all(&bytes).await {
|
2022-11-17 13:22:44 +11:00
|
|
|
debug!("error writing packet data: {e}");
|
2022-11-01 21:11:51 +11:00
|
|
|
}
|
|
|
|
}
|
|
|
|
});
|
|
|
|
|
|
|
|
PlayPacketController {
|
|
|
|
enc: self.enc,
|
|
|
|
dec: self.dec,
|
|
|
|
send: outgoing_sender,
|
|
|
|
recv: incoming_receiver,
|
|
|
|
reader_task,
|
|
|
|
writer_task: Some(writer_task),
|
2022-11-10 13:30:44 +11:00
|
|
|
handle,
|
2022-11-01 21:11:51 +11:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// A convenience structure for managing a pair of packet encoder/decoders and
|
|
|
|
/// the byte channels from which to send and receive the packet data during the
|
|
|
|
/// play state.
|
|
|
|
pub struct PlayPacketController {
|
|
|
|
enc: PacketEncoder,
|
|
|
|
dec: PacketDecoder,
|
|
|
|
send: ByteSender,
|
|
|
|
recv: ByteReceiver,
|
|
|
|
reader_task: JoinHandle<()>,
|
|
|
|
writer_task: Option<JoinHandle<()>>,
|
2022-11-10 13:30:44 +11:00
|
|
|
handle: Handle,
|
2022-11-01 21:11:51 +11:00
|
|
|
}
|
|
|
|
|
|
|
|
impl PlayPacketController {
|
2022-11-14 01:10:42 +11:00
|
|
|
pub fn append_packet<P>(&mut self, pkt: &P) -> Result<()>
|
2022-11-01 21:11:51 +11:00
|
|
|
where
|
2022-11-14 01:10:42 +11:00
|
|
|
P: Encode + Packet + ?Sized,
|
2022-11-01 21:11:51 +11:00
|
|
|
{
|
|
|
|
self.enc.append_packet(pkt)
|
|
|
|
}
|
|
|
|
|
2022-11-14 01:10:42 +11:00
|
|
|
pub fn prepend_packet<P>(&mut self, pkt: &P) -> Result<()>
|
2022-11-01 21:11:51 +11:00
|
|
|
where
|
2022-11-14 01:10:42 +11:00
|
|
|
P: Encode + Packet + ?Sized,
|
2022-11-01 21:11:51 +11:00
|
|
|
{
|
|
|
|
self.enc.prepend_packet(pkt)
|
|
|
|
}
|
|
|
|
|
2022-11-14 01:10:42 +11:00
|
|
|
pub fn try_next_packet<'a, P>(&'a mut self) -> Result<Option<P>>
|
2022-11-01 21:11:51 +11:00
|
|
|
where
|
2022-11-14 01:10:42 +11:00
|
|
|
P: Decode<'a> + Packet,
|
2022-11-01 21:11:51 +11:00
|
|
|
{
|
|
|
|
self.dec.try_next_packet()
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Returns true if the client is connected. Returns false otherwise.
|
|
|
|
pub fn try_recv(&mut self) -> bool {
|
|
|
|
match self.recv.try_recv() {
|
|
|
|
Ok(bytes) => {
|
|
|
|
self.dec.queue_bytes(bytes);
|
|
|
|
true
|
|
|
|
}
|
|
|
|
Err(TryRecvError::Empty) => true,
|
|
|
|
Err(TryRecvError::Disconnected) => false,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[allow(dead_code)]
|
|
|
|
pub fn set_compression(&mut self, threshold: Option<u32>) {
|
|
|
|
self.enc.set_compression(threshold)
|
|
|
|
}
|
|
|
|
|
2022-11-14 01:10:42 +11:00
|
|
|
pub fn flush(&mut self) -> Result<()> {
|
2022-11-01 21:11:51 +11:00
|
|
|
let bytes = self.enc.take();
|
|
|
|
self.send.try_send(bytes)?;
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Drop for PlayPacketController {
|
|
|
|
fn drop(&mut self) {
|
|
|
|
self.reader_task.abort();
|
|
|
|
|
|
|
|
let _ = self.flush();
|
|
|
|
|
|
|
|
if let Some(writer_task) = self.writer_task.take() {
|
|
|
|
if !writer_task.is_finished() {
|
2022-11-10 13:30:44 +11:00
|
|
|
let _guard = self.handle.enter();
|
|
|
|
|
2022-11-01 21:11:51 +11:00
|
|
|
// Give any unsent packets a moment to send before we cut the connection.
|
2022-11-10 13:30:44 +11:00
|
|
|
self.handle
|
|
|
|
.spawn(timeout(Duration::from_secs(1), writer_task));
|
2022-11-01 21:11:51 +11:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|