ditch async-channel

This commit is contained in:
Alex Janka 2024-01-23 11:16:57 +11:00
parent 09887c2a09
commit b0b8cac319
4 changed files with 17 additions and 74 deletions

50
Cargo.lock generated
View file

@ -125,19 +125,6 @@ version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6" checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6"
[[package]]
name = "async-channel"
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ca33f4bc4ed1babef42cad36cc1f51fa88be00420404e5b1e80ab1b18f7678c"
dependencies = [
"concurrent-queue",
"event-listener",
"event-listener-strategy",
"futures-core",
"pin-project-lite",
]
[[package]] [[package]]
name = "async-stream" name = "async-stream"
version = "0.3.5" version = "0.3.5"
@ -428,15 +415,6 @@ dependencies = [
"yansi 0.5.1", "yansi 0.5.1",
] ]
[[package]]
name = "concurrent-queue"
version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d16048cd947b08fa32c24458a22f5dc5e835264f689f4f5653210c69fd107363"
dependencies = [
"crossbeam-utils",
]
[[package]] [[package]]
name = "convert_case" name = "convert_case"
version = "0.4.0" version = "0.4.0"
@ -656,27 +634,6 @@ dependencies = [
"windows-sys 0.52.0", "windows-sys 0.52.0",
] ]
[[package]]
name = "event-listener"
version = "4.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "67b215c49b2b248c855fb73579eb1f4f26c38ffdc12973e20e07b91d78d5646e"
dependencies = [
"concurrent-queue",
"parking",
"pin-project-lite",
]
[[package]]
name = "event-listener-strategy"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "958e4d70b6d5e81971bebec42271ec641e7ff4e170a6fa605f2b8a8b65cb97d3"
dependencies = [
"event-listener",
"pin-project-lite",
]
[[package]] [[package]]
name = "failure" name = "failure"
version = "0.1.8" version = "0.1.8"
@ -1587,12 +1544,6 @@ version = "3.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1b04fb49957986fdce4d6ee7a65027d55d4b6d2265e5848bbb507b58ccfdb6f" checksum = "c1b04fb49957986fdce4d6ee7a65027d55d4b6d2265e5848bbb507b58ccfdb6f"
[[package]]
name = "parking"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae"
[[package]] [[package]]
name = "parking_lot" name = "parking_lot"
version = "0.12.1" version = "0.12.1"
@ -2575,7 +2526,6 @@ dependencies = [
name = "tesla-charge-controller" name = "tesla-charge-controller"
version = "1.0.21" version = "1.0.21"
dependencies = [ dependencies = [
"async-channel",
"chrono", "chrono",
"clap 4.4.11", "clap 4.4.11",
"env_logger 0.10.1", "env_logger 0.10.1",

View file

@ -23,7 +23,6 @@ thiserror = "1.0"
rocket = { version = "0.5", features = ["json"] } rocket = { version = "0.5", features = ["json"] }
include_dir = "0.7" include_dir = "0.7"
chrono = "0.4" chrono = "0.4"
async-channel = "2.1"
metrics = "0.22" metrics = "0.22"
metrics-prometheus = "0.6.0" metrics-prometheus = "0.6.0"
prometheus = "0.13" prometheus = "0.13"

View file

@ -68,12 +68,11 @@ async fn main() {
.some_or_print_with("loading tesla interface") .some_or_print_with("loading tesla interface")
{ {
// build the channel that takes messages from the webserver thread to the api thread // build the channel that takes messages from the webserver thread to the api thread
let (api_requests, api_receiver) = async_channel::unbounded(); let (api_requests, mut api_receiver) = tokio::sync::mpsc::unbounded_channel();
// and to the pli thread // and to the pli thread
let (pli_requests, pli_receiver) = async_channel::unbounded(); let (pli_requests, mut pli_receiver) = tokio::sync::mpsc::unbounded_channel();
// and to the charge rate controller thread // and to the charge rate controller thread
let (tcrc_requests, tcrc_receiver) = async_channel::unbounded(); let (tcrc_requests, mut tcrc_receiver) = tokio::sync::mpsc::unbounded_channel();
// try to spawn the pli loop // try to spawn the pli loop
let pl_state = match { let pl_state = match {
let config = access_config(); let config = access_config();
@ -94,8 +93,8 @@ async fn main() {
tokio::select! { tokio::select! {
_ = interval.tick() => pli.refresh(), _ = interval.tick() => pli.refresh(),
message = pli_receiver.recv() => match message { message = pli_receiver.recv() => match message {
Ok(message) => pli.process_request(message), Some(message) => pli.process_request(message),
Err(e) => panic!("Error on PLI receive channel: {e:?}") None => panic!("PLI send channel dropped")
} }
} }
} }
@ -208,12 +207,12 @@ async fn main() {
} }
}, },
api_message = api_receiver.recv() => match api_message { api_message = api_receiver.recv() => match api_message {
Ok(message) => interface.process_request(message).await, Some(message) => interface.process_request(message).await,
Err(e) => panic!("Error on Tesla receive channel: {e:?}") None => panic!("Tesla send channel dropped")
}, },
tcrc_message = tcrc_receiver.recv() => match tcrc_message { tcrc_message = tcrc_receiver.recv() => match tcrc_message {
Ok(message) => tesla_charge_rate_controller.process_request(message), Some(message) => tesla_charge_rate_controller.process_request(message),
Err(e) => panic!("Error on TCRC receive channel: {e:?}") None => panic!("TCRC send channel dropped")
} }
} }
} }

View file

@ -1,6 +1,5 @@
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use async_channel::Sender;
use rocket::{ use rocket::{
fairing::{Fairing, Info, Kind}, fairing::{Fairing, Info, Kind},
http::Header, http::Header,
@ -8,6 +7,7 @@ use rocket::{
Request, Response, State, Request, Response, State,
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::sync::mpsc::UnboundedSender;
use crate::{ use crate::{
api_interface::InterfaceRequest, api_interface::InterfaceRequest,
@ -26,9 +26,9 @@ pub struct ServerState {
pub car_state: Arc<RwLock<CarState>>, pub car_state: Arc<RwLock<CarState>>,
pub tcrc_state: Arc<RwLock<TcrcState>>, pub tcrc_state: Arc<RwLock<TcrcState>>,
pub pl_state: Option<Arc<RwLock<PlState>>>, pub pl_state: Option<Arc<RwLock<PlState>>>,
pub api_requests: Sender<InterfaceRequest>, pub api_requests: UnboundedSender<InterfaceRequest>,
pub pli_requests: Sender<PliRequest>, pub pli_requests: UnboundedSender<PliRequest>,
pub tcrc_requests: Sender<TcrcRequest>, pub tcrc_requests: UnboundedSender<TcrcRequest>,
} }
pub async fn launch_server(state: ServerState) { pub async fn launch_server(state: ServerState) {
@ -117,7 +117,7 @@ async fn control_state(state: &State<ServerState>) -> Result<Json<ControlState>,
#[post("/flash")] #[post("/flash")]
async fn flash(state: &State<ServerState>) { async fn flash(state: &State<ServerState>) {
let _ = state.api_requests.send(InterfaceRequest::FlashLights).await; let _ = state.api_requests.send(InterfaceRequest::FlashLights);
} }
#[post("/disable-control")] #[post("/disable-control")]
@ -125,7 +125,6 @@ async fn disable_control(state: &State<ServerState>) {
match state match state
.tcrc_requests .tcrc_requests
.send(TcrcRequest::DisableAutomaticControl) .send(TcrcRequest::DisableAutomaticControl)
.await
{ {
Ok(_) => {} Ok(_) => {}
Err(e) => log::error!("Error sending stop control request: {e:?}"), Err(e) => log::error!("Error sending stop control request: {e:?}"),
@ -137,7 +136,6 @@ async fn enable_control(state: &State<ServerState>) {
match state match state
.tcrc_requests .tcrc_requests
.send(TcrcRequest::EnableAutomaticControl) .send(TcrcRequest::EnableAutomaticControl)
.await
{ {
Ok(_) => {} Ok(_) => {}
Err(e) => log::error!("Error sending stop control request: {e:?}"), Err(e) => log::error!("Error sending stop control request: {e:?}"),
@ -191,22 +189,19 @@ fn metrics() -> Result<String, ServerError> {
#[get("/sync-time")] #[get("/sync-time")]
async fn sync_time(state: &State<ServerState>) -> String { async fn sync_time(state: &State<ServerState>) -> String {
let _ = state.pli_requests.send(PliRequest::SyncTime).await; let _ = state.pli_requests.send(PliRequest::SyncTime);
String::from("syncing time...") String::from("syncing time...")
} }
#[get("/read-ram/<address>")] #[get("/read-ram/<address>")]
async fn read_ram(address: u8, state: &State<ServerState>) -> String { async fn read_ram(address: u8, state: &State<ServerState>) -> String {
let _ = state.pli_requests.send(PliRequest::ReadRam(address)).await; let _ = state.pli_requests.send(PliRequest::ReadRam(address));
format!("reading at ram address {address}") format!("reading at ram address {address}")
} }
#[get("/read-eeprom/<address>")] #[get("/read-eeprom/<address>")]
async fn read_eeprom(address: u8, state: &State<ServerState>) -> String { async fn read_eeprom(address: u8, state: &State<ServerState>) -> String {
let _ = state let _ = state.pli_requests.send(PliRequest::ReadEeprom(address));
.pli_requests
.send(PliRequest::ReadEeprom(address))
.await;
format!("reading at eeprom address {address}") format!("reading at eeprom address {address}")
} }