From 186d8fc71ade9f2a8bd080179b670d521ab69ca4 Mon Sep 17 00:00:00 2001 From: Alex Janka Date: Thu, 16 Jan 2025 11:15:31 +1100 Subject: [PATCH] ccs: use tokio::sync::watch for tx to controllers --- .../src/controller.rs | 74 ++++++++----------- charge-controller-supervisor/src/main.rs | 38 ++++------ charge-controller-supervisor/src/web.rs | 21 ++++-- 3 files changed, 62 insertions(+), 71 deletions(-) diff --git a/charge-controller-supervisor/src/controller.rs b/charge-controller-supervisor/src/controller.rs index b7aaba1..422dfb2 100644 --- a/charge-controller-supervisor/src/controller.rs +++ b/charge-controller-supervisor/src/controller.rs @@ -8,8 +8,9 @@ pub struct Controller { interval: std::time::Duration, inner: ControllerInner, data: std::sync::Arc, - voltage_rx: Option>, - voltage_tx: Option, + follow_voltage: bool, + voltage_rx: tokio::sync::watch::Receiver, + voltage_tx: Option>, settings_last_read: Option, } @@ -53,16 +54,15 @@ pub enum ControllerSettings { #[derive(Clone, Copy, Debug)] pub enum VoltageCommand { + None, Set(f64), } impl Controller { pub async fn new( config: crate::config::ChargeControllerConfig, - ) -> eyre::Result<( - Self, - Option>, - )> { + voltage_rx: tokio::sync::watch::Receiver, + ) -> eyre::Result { let inner = match config.variant { crate::config::ChargeControllerVariant::Tristar => ControllerInner::Tristar( tristar::Tristar::new(&config.name, &config.transport).await?, @@ -81,25 +81,16 @@ impl Controller { let data = std::sync::Arc::new(ControllerData::new()); - let (voltage_tx, voltage_rx) = if config.follow_primary { - let (a, b) = tokio::sync::mpsc::unbounded_channel(); - (Some(a), Some(b)) - } else { - (None, None) - }; - - Ok(( - Self { - name: config.name, - interval: std::time::Duration::from_secs(config.watch_interval_seconds), - inner, - data, - voltage_rx, - voltage_tx: None, - settings_last_read: None, - }, - voltage_tx, - )) + Ok(Self { + name: config.name, + interval: std::time::Duration::from_secs(config.watch_interval_seconds), + inner, + data, + voltage_rx, + voltage_tx: None, + settings_last_read: None, + follow_voltage: config.follow_primary, + }) } pub fn get_data_ptr(&self) -> std::sync::Arc { @@ -121,7 +112,7 @@ impl Controller { target ); - tx.send_to_all(VoltageCommand::Set(target)); + tx.send(VoltageCommand::Set(target))?; } } @@ -149,9 +140,9 @@ impl Controller { &self.name } - pub fn set_tx_to_secondary(&mut self, tx: MultiTx) { + pub fn set_tx_to_secondary(&mut self, tx: tokio::sync::watch::Sender) { assert!( - self.voltage_rx.is_none(), + !self.follow_voltage, "trying to set {} as primary when it is also a secondary!", self.name ); @@ -159,14 +150,22 @@ impl Controller { self.voltage_tx = Some(tx); } - pub fn get_rx(&mut self) -> Option<&mut tokio::sync::mpsc::UnboundedReceiver> { - self.voltage_rx.as_mut() + pub fn get_rx(&mut self) -> &mut tokio::sync::watch::Receiver { + &mut self.voltage_rx } pub async fn process_command(&mut self, command: VoltageCommand) -> eyre::Result<()> { match command { VoltageCommand::Set(target_voltage) => { - self.inner.set_target_voltage(target_voltage).await + if self.follow_voltage { + self.inner.set_target_voltage(target_voltage).await + } else { + Ok(()) + } + } + VoltageCommand::None => { + // todo: disable voltage control + Ok(()) } } } @@ -178,19 +177,6 @@ impl Controller { } } -#[derive(Clone)] -pub struct MultiTx(pub Vec>); - -impl MultiTx { - pub fn send_to_all(&self, command: VoltageCommand) { - for sender in &self.0 { - if let Err(e) = sender.send(command) { - log::error!("failed to send command {command:?}: {e:?}"); - } - } - } -} - #[expect(clippy::large_enum_variant)] pub enum ControllerInner { Pl(pl::Pli), diff --git a/charge-controller-supervisor/src/main.rs b/charge-controller-supervisor/src/main.rs index 7d5b38a..da15ff0 100644 --- a/charge-controller-supervisor/src/main.rs +++ b/charge-controller-supervisor/src/main.rs @@ -108,29 +108,26 @@ async fn watch(args: Args) -> eyre::Result<()> { let mut controllers = Vec::new(); let mut map = std::collections::HashMap::new(); - let mut follow_voltage_tx = Vec::new(); + + let (voltage_tx, voltage_rx) = + tokio::sync::watch::channel(controller::VoltageCommand::None); for config in &config.charge_controllers { let n = config.name.clone(); - match controller::Controller::new(config.clone()).await { - Ok((v, voltage_tx)) => { + match controller::Controller::new(config.clone(), voltage_rx.clone()).await { + Ok(v) => { map.insert(n, v.get_data_ptr()); controllers.push(v); - if let Some(voltage_tx) = voltage_tx { - follow_voltage_tx.push(voltage_tx); - } } Err(e) => log::error!("couldn't connect to {}: {e:?}", n), } } - let follow_voltage_tx = controller::MultiTx(follow_voltage_tx); - if let Some(primary) = controllers .iter_mut() .find(|c| c.name() == config.primary_charge_controller) { - primary.set_tx_to_secondary(follow_voltage_tx.clone()); + primary.set_tx_to_secondary(voltage_tx.clone()); } drop(config); @@ -142,7 +139,7 @@ async fn watch(args: Args) -> eyre::Result<()> { ( storage::AllControllers::new(map), - follow_voltage_tx, + voltage_tx, controller_tasks, ) }; @@ -183,20 +180,17 @@ async fn run_loop(mut controller: controller::Controller) -> eyre::Result<()> { timeout.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); loop { - if let Some(rx) = controller.get_rx() { - tokio::select! { - _ = timeout.tick() => { - do_refresh(&mut controller).await; - } - Some(command) = rx.recv() => { - if let Err(e) = controller.process_command(command).await { - log::error!("controller {} failed to process command: {e}", controller.name()); - } + let rx = controller.get_rx(); + tokio::select! { + _ = timeout.tick() => { + do_refresh(&mut controller).await; + } + Ok(()) = rx.changed() => { + let command = *rx.borrow(); + if let Err(e) = controller.process_command(command).await { + log::error!("controller {} failed to process command: {e}", controller.name()); } } - } else { - timeout.tick().await; - do_refresh(&mut controller).await; } } } diff --git a/charge-controller-supervisor/src/web.rs b/charge-controller-supervisor/src/web.rs index f98f0bb..70cbc36 100644 --- a/charge-controller-supervisor/src/web.rs +++ b/charge-controller-supervisor/src/web.rs @@ -7,14 +7,14 @@ mod static_handler; pub struct ServerState { primary_name: String, data: AllControllers, - tx_to_controllers: crate::controller::MultiTx, + tx_to_controllers: tokio::sync::watch::Sender, } impl ServerState { pub fn new( primary_name: &impl ToString, data: AllControllers, - tx_to_controllers: crate::controller::MultiTx, + tx_to_controllers: tokio::sync::watch::Sender, ) -> Self { let primary_name = primary_name.to_string(); Self { @@ -200,20 +200,23 @@ async fn enable_control() { } #[post("/control/disable")] -async fn disable_control(state: &State) { +async fn disable_control(state: &State) -> Result<(), ServerError> { log::warn!("disabling control"); crate::config::write_to_config() .await .enable_secondary_control = false; state .tx_to_controllers - .send_to_all(crate::controller::VoltageCommand::Set(-1.0)); + .send(crate::controller::VoltageCommand::None)?; + Ok(()) } + enum ServerError { Prometheus, NotFound, InvalidPrimaryName, NoData, + ControllerTx, } impl From for ServerError { @@ -222,12 +225,20 @@ impl From for ServerError { } } +impl From> for ServerError { + fn from(_: tokio::sync::watch::error::SendError) -> Self { + Self::ControllerTx + } +} + impl<'a> rocket::response::Responder<'a, 'a> for ServerError { fn respond_to(self, _: &'a rocket::Request<'_>) -> rocket::response::Result<'a> { Err(match self { Self::NotFound => rocket::http::Status::NotFound, Self::InvalidPrimaryName => rocket::http::Status::ServiceUnavailable, - Self::NoData | Self::Prometheus => rocket::http::Status::InternalServerError, + Self::ControllerTx | Self::NoData | Self::Prometheus => { + rocket::http::Status::InternalServerError + } }) } }