diff --git a/crates/valence_stresser/Cargo.toml b/crates/valence_stresser/Cargo.toml new file mode 100644 index 0000000..6af619a --- /dev/null +++ b/crates/valence_stresser/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "valence_stresser" +description = "A stresser for Valence Minecraft server framework development purposes." +authors = ["qualterz "] +version = "0.1.0" +edition = "2021" + +[dependencies] +anyhow = "1.0.69" +clap = { version = "4.1.4", features = ["derive"] } +tokio = { version = "1.25.0", features = ["full"] } +uuid = { version = "1.3.0", features = ["v4"] } +valence_protocol = { version = "0.1.0", path = "../valence_protocol", features = [ + "compression", +] } diff --git a/crates/valence_stresser/src/args.rs b/crates/valence_stresser/src/args.rs new file mode 100644 index 0000000..e7e33d7 --- /dev/null +++ b/crates/valence_stresser/src/args.rs @@ -0,0 +1,29 @@ +use clap::{arg, command, Parser}; + +#[derive(Parser)] +#[command(author, version, about)] +pub(crate) struct StresserArgs { + /// IPv4/IPv6/DNS address of a server. + #[arg(short = 't', long = "target")] + pub target_host: String, + + /// Number of sessions. + #[arg(short = 'c', long = "count")] + pub sessions_count: usize, + + /// Name prefix of sessions. + #[arg(default_value = "Stresser")] + #[arg(short = 'n', long = "name")] + pub name_prefix: String, + + /// Spawn cooldown of sessions in milliseconds. + /// The lower the value, the more frequently sessions are spawned. + #[arg(default_value = "10")] + #[arg(long = "cooldown")] + pub spawn_cooldown: u64, + + /// Read buffer size in bytes. + #[arg(default_value = "4096")] + #[arg(long = "read-buffer")] + pub read_buffer_size: usize, +} diff --git a/crates/valence_stresser/src/main.rs b/crates/valence_stresser/src/main.rs new file mode 100644 index 0000000..0d064d3 --- /dev/null +++ b/crates/valence_stresser/src/main.rs @@ -0,0 +1,44 @@ +use core::time::Duration; +use std::net::ToSocketAddrs; +use std::sync::Arc; + +use args::StresserArgs; +use clap::Parser; +use stresser::{make_session, SessionParams}; +use tokio::sync::Semaphore; + +mod args; +pub mod stresser; + +#[tokio::main] +async fn main() { + let args = StresserArgs::parse(); + + let target_addr = args.target_host.to_socket_addrs().unwrap().next().unwrap(); + + let mut session_index: usize = 0; + + let sema = Arc::new(Semaphore::new(args.sessions_count)); + + while let Ok(perm) = sema.clone().acquire_owned().await { + let session_name = format!("{}{}", args.name_prefix, session_index); + + tokio::spawn(async move { + let params = SessionParams { + socket_addr: target_addr, + session_name: session_name.as_str(), + read_buffer_size: args.read_buffer_size, + }; + + if let Err(err) = make_session(¶ms).await { + eprintln!("Session {session_name} interrupted with error: {err}") + }; + + drop(perm); + }); + + session_index += 1; + + tokio::time::sleep(Duration::from_millis(args.spawn_cooldown)).await; + } +} diff --git a/crates/valence_stresser/src/stresser.rs b/crates/valence_stresser/src/stresser.rs new file mode 100644 index 0000000..c7bae68 --- /dev/null +++ b/crates/valence_stresser/src/stresser.rs @@ -0,0 +1,151 @@ +use std::io::{self, ErrorKind}; +use std::net::SocketAddr; + +use anyhow::bail; +use tokio::io::AsyncWriteExt; +use tokio::net::TcpStream; +use valence_protocol::packets::c2s::handshake::Handshake; +use valence_protocol::packets::c2s::login::LoginStart; +use valence_protocol::packets::c2s::play::{ConfirmTeleport, KeepAliveC2s, SetPlayerPosition}; +use valence_protocol::packets::{C2sHandshakePacket, S2cLoginPacket, S2cPlayPacket}; +use valence_protocol::types::HandshakeNextState; +use valence_protocol::{PacketDecoder, PacketEncoder, Username, Uuid, VarInt, PROTOCOL_VERSION}; + +pub struct SessionParams<'a> { + pub socket_addr: SocketAddr, + pub session_name: &'a str, + pub read_buffer_size: usize, +} + +pub async fn make_session<'a>(params: &SessionParams<'a>) -> anyhow::Result<()> { + let sock_addr = params.socket_addr; + let sess_name = params.session_name; + let rb_size = params.read_buffer_size; + + let mut conn = match TcpStream::connect(sock_addr).await { + Ok(conn) => { + println!("{sess_name} connected"); + conn + } + Err(err) => { + println!("{sess_name} connection failed"); + return Err(err.into()); + } + }; + + _ = conn.set_nodelay(true); + + let mut dec = PacketDecoder::new(); + let mut enc = PacketEncoder::new(); + + let server_addr_str = sock_addr.ip().to_string().as_str().to_owned(); + + let handshake_pkt = C2sHandshakePacket::Handshake(Handshake { + protocol_version: VarInt::from(PROTOCOL_VERSION), + server_address: &server_addr_str, + server_port: sock_addr.port(), + next_state: HandshakeNextState::Login, + }); + + _ = enc.append_packet(&handshake_pkt); + + _ = enc.append_packet(&LoginStart { + username: Username::new(sess_name).unwrap(), + profile_id: Some(Uuid::new_v4()), + }); + + let write_buf = enc.take(); + conn.write_all(&write_buf).await?; + + loop { + dec.reserve(rb_size); + + let mut read_buf = dec.take_capacity(); + + conn.readable().await?; + + match conn.try_read_buf(&mut read_buf) { + Ok(0) => return Err(io::Error::from(ErrorKind::UnexpectedEof).into()), + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue, + Err(e) => return Err(e.into()), + Ok(_) => (), + }; + + dec.queue_bytes(read_buf); + + if let Ok(Some(pkt)) = dec.try_next_packet::() { + match pkt { + S2cLoginPacket::SetCompression(p) => { + let threshold = p.threshold.0 as u32; + + dec.set_compression(true); + enc.set_compression(Some(threshold)); + } + + S2cLoginPacket::LoginSuccess(_) => { + break; + } + + S2cLoginPacket::EncryptionRequest(_) => { + bail!("encryption not implemented"); + } + + _ => (), + } + } + } + + println!("{sess_name} logined"); + + loop { + while !dec.has_next_packet()? { + dec.reserve(rb_size); + + let mut read_buf = dec.take_capacity(); + + conn.readable().await?; + + match conn.try_read_buf(&mut read_buf) { + Ok(0) => return Err(io::Error::from(ErrorKind::UnexpectedEof).into()), + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue, + Err(e) => return Err(e.into()), + Ok(_) => (), + }; + + dec.queue_bytes(read_buf); + } + + match dec.try_next_packet::() { + Ok(None) => continue, + Ok(Some(pkt)) => match pkt { + S2cPlayPacket::KeepAliveS2c(p) => { + enc.clear(); + + _ = enc.append_packet(&KeepAliveC2s { id: p.id }); + conn.write_all(&enc.take()).await?; + + println!("{sess_name} keep alive") + } + + S2cPlayPacket::SynchronizePlayerPosition(p) => { + enc.clear(); + + _ = enc.append_packet(&ConfirmTeleport { + teleport_id: p.teleport_id, + }); + + _ = enc.append_packet(&SetPlayerPosition { + position: p.position, + on_ground: true, + }); + + conn.write_all(&enc.take()).await?; + + println!("{sess_name} spawned") + } + _ => (), + }, + Err(err) => return Err(err), + } + } +}