mirror of
https://github.com/italicsjenga/valence.git
synced 2025-01-25 21:26:32 +11:00
Minimal stresser implementation (#240)
<!-- Please make sure that your PR is aligned with the guidelines in CONTRIBUTING.md to the best of your ability. --> <!-- Good PRs have tests! Make sure you have sufficient test coverage. --> ## Description <!-- Describe the changes you've made. You may include any justification you want here. --> An implementation of a Minecraft server stresser for testing purposes. The potential of this pull request is to implement a minimal stresser binary package that would be bound to the local `valence_protocol` package, so it would be always up to date with the latest Valence Minecraft protocol implementation. The MVP version is going to be able concurrently connect headless clients to a target Minecraft server. ## Test Plan <!-- Explain how you tested your changes, and include any code that you used to test this. --> <!-- If there is an example that is sufficient to use in place of a playground, replace the playground section with a note that indicates this. --> <!-- <details> <summary>Playground</summary> ```rust PASTE YOUR PLAYGROUND CODE HERE ``` </details> --> <!-- You need to include steps regardless of whether or not you are using a playground. --> Steps: 1. Ensure that the connection mode is offline 2. Run `cargo run --example bench_players` or any other example 3. Run `cargo run --package valence_stresser -- --target 127.0.0.1:25565 --count 1000` 4. Monitor the `bench_players` output #### Related closes #211 --------- Co-authored-by: Carson McManus <dyc3@users.noreply.github.com>
This commit is contained in:
parent
0319635a8b
commit
1cd6be0781
4 changed files with 239 additions and 0 deletions
15
crates/valence_stresser/Cargo.toml
Normal file
15
crates/valence_stresser/Cargo.toml
Normal file
|
@ -0,0 +1,15 @@
|
|||
[package]
|
||||
name = "valence_stresser"
|
||||
description = "A stresser for Valence Minecraft server framework development purposes."
|
||||
authors = ["qualterz <qualterz@tutamail.com>"]
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
anyhow = "1.0.69"
|
||||
clap = { version = "4.1.4", features = ["derive"] }
|
||||
tokio = { version = "1.25.0", features = ["full"] }
|
||||
uuid = { version = "1.3.0", features = ["v4"] }
|
||||
valence_protocol = { version = "0.1.0", path = "../valence_protocol", features = [
|
||||
"compression",
|
||||
] }
|
29
crates/valence_stresser/src/args.rs
Normal file
29
crates/valence_stresser/src/args.rs
Normal file
|
@ -0,0 +1,29 @@
|
|||
use clap::{arg, command, Parser};
|
||||
|
||||
#[derive(Parser)]
|
||||
#[command(author, version, about)]
|
||||
pub(crate) struct StresserArgs {
|
||||
/// IPv4/IPv6/DNS address of a server.
|
||||
#[arg(short = 't', long = "target")]
|
||||
pub target_host: String,
|
||||
|
||||
/// Number of sessions.
|
||||
#[arg(short = 'c', long = "count")]
|
||||
pub sessions_count: usize,
|
||||
|
||||
/// Name prefix of sessions.
|
||||
#[arg(default_value = "Stresser")]
|
||||
#[arg(short = 'n', long = "name")]
|
||||
pub name_prefix: String,
|
||||
|
||||
/// Spawn cooldown of sessions in milliseconds.
|
||||
/// The lower the value, the more frequently sessions are spawned.
|
||||
#[arg(default_value = "10")]
|
||||
#[arg(long = "cooldown")]
|
||||
pub spawn_cooldown: u64,
|
||||
|
||||
/// Read buffer size in bytes.
|
||||
#[arg(default_value = "4096")]
|
||||
#[arg(long = "read-buffer")]
|
||||
pub read_buffer_size: usize,
|
||||
}
|
44
crates/valence_stresser/src/main.rs
Normal file
44
crates/valence_stresser/src/main.rs
Normal file
|
@ -0,0 +1,44 @@
|
|||
use core::time::Duration;
|
||||
use std::net::ToSocketAddrs;
|
||||
use std::sync::Arc;
|
||||
|
||||
use args::StresserArgs;
|
||||
use clap::Parser;
|
||||
use stresser::{make_session, SessionParams};
|
||||
use tokio::sync::Semaphore;
|
||||
|
||||
mod args;
|
||||
pub mod stresser;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
let args = StresserArgs::parse();
|
||||
|
||||
let target_addr = args.target_host.to_socket_addrs().unwrap().next().unwrap();
|
||||
|
||||
let mut session_index: usize = 0;
|
||||
|
||||
let sema = Arc::new(Semaphore::new(args.sessions_count));
|
||||
|
||||
while let Ok(perm) = sema.clone().acquire_owned().await {
|
||||
let session_name = format!("{}{}", args.name_prefix, session_index);
|
||||
|
||||
tokio::spawn(async move {
|
||||
let params = SessionParams {
|
||||
socket_addr: target_addr,
|
||||
session_name: session_name.as_str(),
|
||||
read_buffer_size: args.read_buffer_size,
|
||||
};
|
||||
|
||||
if let Err(err) = make_session(¶ms).await {
|
||||
eprintln!("Session {session_name} interrupted with error: {err}")
|
||||
};
|
||||
|
||||
drop(perm);
|
||||
});
|
||||
|
||||
session_index += 1;
|
||||
|
||||
tokio::time::sleep(Duration::from_millis(args.spawn_cooldown)).await;
|
||||
}
|
||||
}
|
151
crates/valence_stresser/src/stresser.rs
Normal file
151
crates/valence_stresser/src/stresser.rs
Normal file
|
@ -0,0 +1,151 @@
|
|||
use std::io::{self, ErrorKind};
|
||||
use std::net::SocketAddr;
|
||||
|
||||
use anyhow::bail;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio::net::TcpStream;
|
||||
use valence_protocol::packets::c2s::handshake::Handshake;
|
||||
use valence_protocol::packets::c2s::login::LoginStart;
|
||||
use valence_protocol::packets::c2s::play::{ConfirmTeleport, KeepAliveC2s, SetPlayerPosition};
|
||||
use valence_protocol::packets::{C2sHandshakePacket, S2cLoginPacket, S2cPlayPacket};
|
||||
use valence_protocol::types::HandshakeNextState;
|
||||
use valence_protocol::{PacketDecoder, PacketEncoder, Username, Uuid, VarInt, PROTOCOL_VERSION};
|
||||
|
||||
pub struct SessionParams<'a> {
|
||||
pub socket_addr: SocketAddr,
|
||||
pub session_name: &'a str,
|
||||
pub read_buffer_size: usize,
|
||||
}
|
||||
|
||||
pub async fn make_session<'a>(params: &SessionParams<'a>) -> anyhow::Result<()> {
|
||||
let sock_addr = params.socket_addr;
|
||||
let sess_name = params.session_name;
|
||||
let rb_size = params.read_buffer_size;
|
||||
|
||||
let mut conn = match TcpStream::connect(sock_addr).await {
|
||||
Ok(conn) => {
|
||||
println!("{sess_name} connected");
|
||||
conn
|
||||
}
|
||||
Err(err) => {
|
||||
println!("{sess_name} connection failed");
|
||||
return Err(err.into());
|
||||
}
|
||||
};
|
||||
|
||||
_ = conn.set_nodelay(true);
|
||||
|
||||
let mut dec = PacketDecoder::new();
|
||||
let mut enc = PacketEncoder::new();
|
||||
|
||||
let server_addr_str = sock_addr.ip().to_string().as_str().to_owned();
|
||||
|
||||
let handshake_pkt = C2sHandshakePacket::Handshake(Handshake {
|
||||
protocol_version: VarInt::from(PROTOCOL_VERSION),
|
||||
server_address: &server_addr_str,
|
||||
server_port: sock_addr.port(),
|
||||
next_state: HandshakeNextState::Login,
|
||||
});
|
||||
|
||||
_ = enc.append_packet(&handshake_pkt);
|
||||
|
||||
_ = enc.append_packet(&LoginStart {
|
||||
username: Username::new(sess_name).unwrap(),
|
||||
profile_id: Some(Uuid::new_v4()),
|
||||
});
|
||||
|
||||
let write_buf = enc.take();
|
||||
conn.write_all(&write_buf).await?;
|
||||
|
||||
loop {
|
||||
dec.reserve(rb_size);
|
||||
|
||||
let mut read_buf = dec.take_capacity();
|
||||
|
||||
conn.readable().await?;
|
||||
|
||||
match conn.try_read_buf(&mut read_buf) {
|
||||
Ok(0) => return Err(io::Error::from(ErrorKind::UnexpectedEof).into()),
|
||||
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
|
||||
Err(e) => return Err(e.into()),
|
||||
Ok(_) => (),
|
||||
};
|
||||
|
||||
dec.queue_bytes(read_buf);
|
||||
|
||||
if let Ok(Some(pkt)) = dec.try_next_packet::<S2cLoginPacket>() {
|
||||
match pkt {
|
||||
S2cLoginPacket::SetCompression(p) => {
|
||||
let threshold = p.threshold.0 as u32;
|
||||
|
||||
dec.set_compression(true);
|
||||
enc.set_compression(Some(threshold));
|
||||
}
|
||||
|
||||
S2cLoginPacket::LoginSuccess(_) => {
|
||||
break;
|
||||
}
|
||||
|
||||
S2cLoginPacket::EncryptionRequest(_) => {
|
||||
bail!("encryption not implemented");
|
||||
}
|
||||
|
||||
_ => (),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
println!("{sess_name} logined");
|
||||
|
||||
loop {
|
||||
while !dec.has_next_packet()? {
|
||||
dec.reserve(rb_size);
|
||||
|
||||
let mut read_buf = dec.take_capacity();
|
||||
|
||||
conn.readable().await?;
|
||||
|
||||
match conn.try_read_buf(&mut read_buf) {
|
||||
Ok(0) => return Err(io::Error::from(ErrorKind::UnexpectedEof).into()),
|
||||
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
|
||||
Err(e) => return Err(e.into()),
|
||||
Ok(_) => (),
|
||||
};
|
||||
|
||||
dec.queue_bytes(read_buf);
|
||||
}
|
||||
|
||||
match dec.try_next_packet::<S2cPlayPacket>() {
|
||||
Ok(None) => continue,
|
||||
Ok(Some(pkt)) => match pkt {
|
||||
S2cPlayPacket::KeepAliveS2c(p) => {
|
||||
enc.clear();
|
||||
|
||||
_ = enc.append_packet(&KeepAliveC2s { id: p.id });
|
||||
conn.write_all(&enc.take()).await?;
|
||||
|
||||
println!("{sess_name} keep alive")
|
||||
}
|
||||
|
||||
S2cPlayPacket::SynchronizePlayerPosition(p) => {
|
||||
enc.clear();
|
||||
|
||||
_ = enc.append_packet(&ConfirmTeleport {
|
||||
teleport_id: p.teleport_id,
|
||||
});
|
||||
|
||||
_ = enc.append_packet(&SetPlayerPosition {
|
||||
position: p.position,
|
||||
on_ground: true,
|
||||
});
|
||||
|
||||
conn.write_all(&enc.take()).await?;
|
||||
|
||||
println!("{sess_name} spawned")
|
||||
}
|
||||
_ => (),
|
||||
},
|
||||
Err(err) => return Err(err),
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Add table
Reference in a new issue