Fix missing tokio context on client disconnect

The `PlayPacketController` might spawn a tokio task, but it was not in a
tokio context when that happened. This is because tokio contexts are
thread-local, but rayon tasks run in their own thread pool.
This commit is contained in:
Ryan 2022-11-09 18:30:44 -08:00
parent bb43856c9e
commit 923fabe890
2 changed files with 14 additions and 4 deletions

View file

@ -88,8 +88,10 @@ struct SharedServerInner<C: Config> {
max_connections: usize, max_connections: usize,
incoming_capacity: usize, incoming_capacity: usize,
outgoing_capacity: usize, outgoing_capacity: usize,
/// The tokio handle used by the server.
tokio_handle: Handle, tokio_handle: Handle,
/// Store this here so we don't drop it. /// Holding a runtime handle is not enough to keep tokio working. We need
/// to store the runtime here so we don't drop it.
_tokio_runtime: Option<Runtime>, _tokio_runtime: Option<Runtime>,
dimensions: Vec<Dimension>, dimensions: Vec<Dimension>,
biomes: Vec<Biome>, biomes: Vec<Biome>,
@ -254,8 +256,8 @@ impl<C: Config> SharedServer<C> {
/// Consumes the configuration and starts the server. /// Consumes the configuration and starts the server.
/// ///
/// The function returns once the server has shut down, a runtime error /// This function blocks the current thread and returns once the server has shut
/// occurs, or the configuration is found to be invalid. /// down, a runtime error occurs, or the configuration is found to be invalid.
pub fn start_server<C: Config>(config: C, data: C::ServerState) -> ShutdownResult { pub fn start_server<C: Config>(config: C, data: C::ServerState) -> ShutdownResult {
let shared = setup_server(config) let shared = setup_server(config)
.context("failed to initialize server") .context("failed to initialize server")
@ -522,6 +524,7 @@ async fn handle_connection(
ctrl: ctrl.into_play_packet_controller( ctrl: ctrl.into_play_packet_controller(
server.0.incoming_capacity, server.0.incoming_capacity,
server.0.outgoing_capacity, server.0.outgoing_capacity,
server.tokio_handle().clone(),
), ),
}; };

View file

@ -3,6 +3,7 @@ use std::time::Duration;
use tokio::io; use tokio::io;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::runtime::Handle;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tokio::time::timeout; use tokio::time::timeout;
@ -91,6 +92,7 @@ where
mut self, mut self,
incoming_limit: usize, incoming_limit: usize,
outgoing_limit: usize, outgoing_limit: usize,
handle: Handle,
) -> PlayPacketController ) -> PlayPacketController
where where
R: Send + 'static, R: Send + 'static,
@ -144,6 +146,7 @@ where
recv: incoming_receiver, recv: incoming_receiver,
reader_task, reader_task,
writer_task: Some(writer_task), writer_task: Some(writer_task),
handle,
} }
} }
} }
@ -158,6 +161,7 @@ pub struct PlayPacketController {
recv: ByteReceiver, recv: ByteReceiver,
reader_task: JoinHandle<()>, reader_task: JoinHandle<()>,
writer_task: Option<JoinHandle<()>>, writer_task: Option<JoinHandle<()>>,
handle: Handle,
} }
impl PlayPacketController { impl PlayPacketController {
@ -214,8 +218,11 @@ impl Drop for PlayPacketController {
if let Some(writer_task) = self.writer_task.take() { if let Some(writer_task) = self.writer_task.take() {
if !writer_task.is_finished() { if !writer_task.is_finished() {
let _guard = self.handle.enter();
// Give any unsent packets a moment to send before we cut the connection. // Give any unsent packets a moment to send before we cut the connection.
tokio::spawn(timeout(Duration::from_secs(1), writer_task)); self.handle
.spawn(timeout(Duration::from_secs(1), writer_task));
} }
} }
} }