2022-11-01 21:11:51 +11:00
|
|
|
use std::io::ErrorKind;
|
2023-04-09 05:55:31 +10:00
|
|
|
use std::sync::Arc;
|
|
|
|
use std::time::{Duration, Instant};
|
|
|
|
use std::{io, mem};
|
2022-11-01 21:11:51 +11:00
|
|
|
|
2023-02-12 04:51:53 +11:00
|
|
|
use anyhow::bail;
|
2023-04-09 05:55:31 +10:00
|
|
|
use bytes::{Buf, BytesMut};
|
2022-11-01 21:11:51 +11:00
|
|
|
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
|
2023-04-09 05:55:31 +10:00
|
|
|
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
|
2022-11-01 21:11:51 +11:00
|
|
|
use tokio::task::JoinHandle;
|
|
|
|
use tokio::time::timeout;
|
2023-04-09 05:55:31 +10:00
|
|
|
use tracing::{debug, warn};
|
|
|
|
use valence_protocol::decoder::{decode_packet, PacketDecoder};
|
|
|
|
use valence_protocol::encoder::PacketEncoder;
|
|
|
|
use valence_protocol::var_int::VarInt;
|
|
|
|
use valence_protocol::{Decode, Packet};
|
|
|
|
|
|
|
|
use crate::client::{ClientConnection, ReceivedPacket};
|
|
|
|
use crate::server::byte_channel::{byte_channel, ByteSender, TrySendError};
|
2023-02-12 04:51:53 +11:00
|
|
|
use crate::server::NewClientInfo;
|
2022-11-01 21:11:51 +11:00
|
|
|
|
2023-02-12 04:51:53 +11:00
|
|
|
pub(super) struct InitialConnection<R, W> {
|
2022-11-01 21:11:51 +11:00
|
|
|
reader: R,
|
|
|
|
writer: W,
|
|
|
|
enc: PacketEncoder,
|
|
|
|
dec: PacketDecoder,
|
2023-04-09 05:55:31 +10:00
|
|
|
frame: BytesMut,
|
2022-11-01 21:11:51 +11:00
|
|
|
timeout: Duration,
|
2022-11-29 22:37:32 +11:00
|
|
|
permit: OwnedSemaphorePermit,
|
2022-11-01 21:11:51 +11:00
|
|
|
}
|
|
|
|
|
|
|
|
/// Number of bytes of spare capacity reserved in the read buffer before each
/// read from the underlying stream.
const READ_BUF_SIZE: usize = 4096;
|
|
|
|
|
2023-02-12 04:51:53 +11:00
|
|
|
impl<R, W> InitialConnection<R, W>
|
2022-11-01 21:11:51 +11:00
|
|
|
where
|
|
|
|
R: AsyncRead + Unpin,
|
|
|
|
W: AsyncWrite + Unpin,
|
|
|
|
{
|
|
|
|
pub fn new(
|
|
|
|
reader: R,
|
|
|
|
writer: W,
|
|
|
|
enc: PacketEncoder,
|
|
|
|
dec: PacketDecoder,
|
|
|
|
timeout: Duration,
|
2022-11-29 22:37:32 +11:00
|
|
|
permit: OwnedSemaphorePermit,
|
2022-11-01 21:11:51 +11:00
|
|
|
) -> Self {
|
|
|
|
Self {
|
|
|
|
reader,
|
|
|
|
writer,
|
|
|
|
enc,
|
|
|
|
dec,
|
2023-04-09 05:55:31 +10:00
|
|
|
frame: BytesMut::new(),
|
2022-11-01 21:11:51 +11:00
|
|
|
timeout,
|
2022-11-29 22:37:32 +11:00
|
|
|
permit,
|
2022-11-01 21:11:51 +11:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-02-26 06:21:25 +11:00
|
|
|
pub async fn send_packet<'a, P>(&mut self, pkt: &P) -> anyhow::Result<()>
|
2022-11-01 21:11:51 +11:00
|
|
|
where
|
2023-02-26 06:21:25 +11:00
|
|
|
P: Packet<'a>,
|
2022-11-01 21:11:51 +11:00
|
|
|
{
|
|
|
|
self.enc.append_packet(pkt)?;
|
|
|
|
let bytes = self.enc.take();
|
|
|
|
timeout(self.timeout, self.writer.write_all(&bytes)).await??;
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2023-02-12 04:51:53 +11:00
|
|
|
pub async fn recv_packet<'a, P>(&'a mut self) -> anyhow::Result<P>
|
2022-11-01 21:11:51 +11:00
|
|
|
where
|
2023-02-26 06:21:25 +11:00
|
|
|
P: Packet<'a>,
|
2022-11-01 21:11:51 +11:00
|
|
|
{
|
|
|
|
timeout(self.timeout, async {
|
|
|
|
loop {
|
2023-04-09 05:55:31 +10:00
|
|
|
if let Some(frame) = self.dec.try_next_packet()? {
|
|
|
|
self.frame = frame;
|
|
|
|
|
|
|
|
return decode_packet(&self.frame);
|
2022-11-01 21:11:51 +11:00
|
|
|
}
|
|
|
|
|
|
|
|
self.dec.reserve(READ_BUF_SIZE);
|
|
|
|
let mut buf = self.dec.take_capacity();
|
|
|
|
|
|
|
|
if self.reader.read_buf(&mut buf).await? == 0 {
|
|
|
|
return Err(io::Error::from(ErrorKind::UnexpectedEof).into());
|
|
|
|
}
|
|
|
|
|
|
|
|
// This should always be an O(1) unsplit because we reserved space earlier and
|
2022-11-14 01:10:42 +11:00
|
|
|
// the call to `read_buf` shouldn't have grown the allocation.
|
2022-11-01 21:11:51 +11:00
|
|
|
self.dec.queue_bytes(buf);
|
|
|
|
}
|
|
|
|
})
|
|
|
|
.await?
|
|
|
|
}
|
|
|
|
|
|
|
|
#[allow(dead_code)]
|
|
|
|
pub fn set_compression(&mut self, threshold: Option<u32>) {
|
|
|
|
self.enc.set_compression(threshold);
|
2023-04-09 05:55:31 +10:00
|
|
|
self.dec.set_compression(threshold);
|
2022-11-01 21:11:51 +11:00
|
|
|
}
|
|
|
|
|
|
|
|
pub fn enable_encryption(&mut self, key: &[u8; 16]) {
|
|
|
|
self.enc.enable_encryption(key);
|
|
|
|
self.dec.enable_encryption(key);
|
|
|
|
}
|
|
|
|
|
2023-04-09 05:55:31 +10:00
|
|
|
pub fn into_client_args(
|
2022-11-01 21:11:51 +11:00
|
|
|
mut self,
|
2023-02-12 04:51:53 +11:00
|
|
|
info: NewClientInfo,
|
2022-11-01 21:11:51 +11:00
|
|
|
incoming_limit: usize,
|
|
|
|
outgoing_limit: usize,
|
2023-04-09 05:55:31 +10:00
|
|
|
) -> NewClientArgs
|
2022-11-01 21:11:51 +11:00
|
|
|
where
|
|
|
|
R: Send + 'static,
|
|
|
|
W: Send + 'static,
|
|
|
|
{
|
2023-04-09 05:55:31 +10:00
|
|
|
let (incoming_sender, incoming_receiver) = flume::unbounded();
|
|
|
|
|
|
|
|
let recv_sem = Arc::new(Semaphore::new(incoming_limit));
|
|
|
|
let recv_sem_clone = recv_sem.clone();
|
2022-11-01 21:11:51 +11:00
|
|
|
|
|
|
|
let reader_task = tokio::spawn(async move {
|
2023-04-09 05:55:31 +10:00
|
|
|
let mut buf = BytesMut::new();
|
|
|
|
|
2022-11-01 21:11:51 +11:00
|
|
|
loop {
|
2023-04-09 05:55:31 +10:00
|
|
|
let mut data = match self.dec.try_next_packet() {
|
|
|
|
Ok(Some(data)) => data,
|
|
|
|
Ok(None) => {
|
|
|
|
// Incomplete packet. Need more data.
|
2022-11-01 21:11:51 +11:00
|
|
|
|
2023-04-09 05:55:31 +10:00
|
|
|
buf.reserve(READ_BUF_SIZE);
|
|
|
|
match self.reader.read_buf(&mut buf).await {
|
|
|
|
Ok(0) => break, // Reader is at EOF.
|
|
|
|
Ok(_) => {}
|
|
|
|
Err(e) => {
|
|
|
|
debug!("error reading data from stream: {e}");
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
self.dec.queue_bytes(buf.split());
|
|
|
|
|
|
|
|
continue;
|
|
|
|
}
|
2022-11-01 21:11:51 +11:00
|
|
|
Err(e) => {
|
2023-04-09 05:55:31 +10:00
|
|
|
warn!("error decoding packet frame: {e:#}");
|
2022-11-01 21:11:51 +11:00
|
|
|
break;
|
|
|
|
}
|
2023-04-09 05:55:31 +10:00
|
|
|
};
|
|
|
|
|
|
|
|
let timestamp = Instant::now();
|
|
|
|
|
|
|
|
// Remove the packet ID from the front of the data.
|
|
|
|
let packet_id = {
|
|
|
|
let mut r = &data[..];
|
|
|
|
|
|
|
|
match VarInt::decode(&mut r) {
|
|
|
|
Ok(id) => {
|
|
|
|
data.advance(data.len() - r.len());
|
|
|
|
id.0
|
|
|
|
}
|
|
|
|
Err(e) => {
|
|
|
|
warn!("failed to decode packet ID: {e:#}");
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
// Estimate memory usage of this packet.
|
|
|
|
let cost = mem::size_of::<ReceivedPacket>() + data.len();
|
|
|
|
|
|
|
|
if cost > incoming_limit {
|
|
|
|
debug!(
|
|
|
|
cost,
|
|
|
|
incoming_limit,
|
|
|
|
"cost of received packet is greater than the incoming memory limit"
|
|
|
|
);
|
|
|
|
// We would never acquire enough permits, so we should exit instead of getting
|
|
|
|
// stuck.
|
|
|
|
break;
|
2022-11-01 21:11:51 +11:00
|
|
|
}
|
|
|
|
|
2023-04-09 05:55:31 +10:00
|
|
|
// Wait until there's enough space for this packet.
|
|
|
|
let Ok(permits) = recv_sem.acquire_many(cost as u32).await else {
|
|
|
|
// Semaphore closed.
|
|
|
|
break;
|
|
|
|
};
|
|
|
|
|
|
|
|
// The permits will be added back on the other side of the channel.
|
|
|
|
permits.forget();
|
|
|
|
|
|
|
|
let packet = ReceivedPacket {
|
|
|
|
timestamp,
|
|
|
|
id: packet_id,
|
|
|
|
data: data.freeze(),
|
|
|
|
};
|
|
|
|
|
|
|
|
if incoming_sender.try_send(packet).is_err() {
|
|
|
|
// Channel closed.
|
2022-11-01 21:11:51 +11:00
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
});
|
|
|
|
|
|
|
|
let (outgoing_sender, mut outgoing_receiver) = byte_channel(outgoing_limit);
|
|
|
|
|
|
|
|
let writer_task = tokio::spawn(async move {
|
|
|
|
loop {
|
|
|
|
let bytes = match outgoing_receiver.recv_async().await {
|
|
|
|
Ok(bytes) => bytes,
|
|
|
|
Err(e) => {
|
2022-11-17 13:22:44 +11:00
|
|
|
debug!("error receiving packet data: {e}");
|
2022-11-01 21:11:51 +11:00
|
|
|
break;
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
if let Err(e) = self.writer.write_all(&bytes).await {
|
2023-04-09 05:55:31 +10:00
|
|
|
debug!("error writing data to stream: {e}");
|
2022-11-01 21:11:51 +11:00
|
|
|
}
|
|
|
|
}
|
|
|
|
});
|
|
|
|
|
2023-04-09 05:55:31 +10:00
|
|
|
NewClientArgs {
|
2023-02-12 04:51:53 +11:00
|
|
|
info,
|
2023-04-09 05:55:31 +10:00
|
|
|
conn: Box::new(RealClientConnection {
|
2022-11-29 22:37:32 +11:00
|
|
|
send: outgoing_sender,
|
|
|
|
recv: incoming_receiver,
|
2023-04-09 05:55:31 +10:00
|
|
|
recv_sem: recv_sem_clone,
|
|
|
|
_client_permit: self.permit,
|
2022-11-29 22:37:32 +11:00
|
|
|
reader_task,
|
2023-02-12 04:51:53 +11:00
|
|
|
writer_task,
|
|
|
|
}),
|
2023-04-09 05:55:31 +10:00
|
|
|
enc: self.enc,
|
|
|
|
}
|
2022-11-01 21:11:51 +11:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-04-09 05:55:31 +10:00
|
|
|
pub struct NewClientArgs {
|
|
|
|
pub info: NewClientInfo,
|
|
|
|
pub conn: Box<dyn ClientConnection>,
|
|
|
|
pub enc: PacketEncoder,
|
|
|
|
}
|
|
|
|
|
2023-02-12 04:51:53 +11:00
|
|
|
struct RealClientConnection {
|
2022-11-01 21:11:51 +11:00
|
|
|
send: ByteSender,
|
2023-04-09 05:55:31 +10:00
|
|
|
recv: flume::Receiver<ReceivedPacket>,
|
|
|
|
/// Limits the amount of data queued in the `recv` channel. Each permit
|
|
|
|
/// represents one byte.
|
|
|
|
recv_sem: Arc<Semaphore>,
|
|
|
|
/// Limits the number of new clients that can connect to the server. Permit
|
|
|
|
/// is released when the connection is dropped.
|
|
|
|
_client_permit: OwnedSemaphorePermit,
|
2023-02-12 04:51:53 +11:00
|
|
|
reader_task: JoinHandle<()>,
|
|
|
|
writer_task: JoinHandle<()>,
|
2022-12-11 21:37:02 +11:00
|
|
|
}
|
|
|
|
|
2023-02-12 04:51:53 +11:00
|
|
|
impl Drop for RealClientConnection {
|
2022-11-01 21:11:51 +11:00
|
|
|
fn drop(&mut self) {
|
2023-02-12 04:51:53 +11:00
|
|
|
self.writer_task.abort();
|
|
|
|
self.reader_task.abort();
|
2022-11-01 21:11:51 +11:00
|
|
|
}
|
|
|
|
}
|
2022-11-29 22:37:32 +11:00
|
|
|
|
2023-02-12 04:51:53 +11:00
|
|
|
impl ClientConnection for RealClientConnection {
|
|
|
|
fn try_send(&mut self, bytes: BytesMut) -> anyhow::Result<()> {
|
|
|
|
match self.send.try_send(bytes) {
|
|
|
|
Ok(()) => Ok(()),
|
|
|
|
Err(TrySendError::Full(_)) => bail!(
|
|
|
|
"reached configured outgoing limit of {} bytes",
|
|
|
|
self.send.limit()
|
|
|
|
),
|
|
|
|
Err(TrySendError::Disconnected(_)) => bail!("client disconnected"),
|
|
|
|
}
|
2022-11-29 22:37:32 +11:00
|
|
|
}
|
|
|
|
|
2023-04-09 05:55:31 +10:00
|
|
|
fn try_recv(&mut self) -> anyhow::Result<Option<ReceivedPacket>> {
|
2022-11-29 22:37:32 +11:00
|
|
|
match self.recv.try_recv() {
|
2023-04-09 05:55:31 +10:00
|
|
|
Ok(packet) => {
|
|
|
|
let cost = mem::size_of::<ReceivedPacket>() + packet.data.len();
|
|
|
|
|
|
|
|
// Add the permits back that we removed eariler.
|
|
|
|
self.recv_sem.add_permits(cost);
|
|
|
|
|
|
|
|
Ok(Some(packet))
|
|
|
|
}
|
|
|
|
Err(flume::TryRecvError::Empty) => Ok(None),
|
|
|
|
Err(flume::TryRecvError::Disconnected) => bail!("client disconnected"),
|
2022-11-29 22:37:32 +11:00
|
|
|
}
|
|
|
|
}
|
2023-04-09 05:55:31 +10:00
|
|
|
|
|
|
|
fn len(&self) -> usize {
|
|
|
|
self.recv.len()
|
|
|
|
}
|
2022-11-29 22:37:32 +11:00
|
|
|
}
|