ccs: use tokio::sync::watch for tx to controllers

This commit is contained in:
Alex Janka 2025-01-16 11:15:31 +11:00
parent 05edfe6b84
commit 186d8fc71a
3 changed files with 62 additions and 71 deletions

View file

@ -8,8 +8,9 @@ pub struct Controller {
interval: std::time::Duration, interval: std::time::Duration,
inner: ControllerInner, inner: ControllerInner,
data: std::sync::Arc<ControllerData>, data: std::sync::Arc<ControllerData>,
voltage_rx: Option<tokio::sync::mpsc::UnboundedReceiver<VoltageCommand>>, follow_voltage: bool,
voltage_tx: Option<MultiTx>, voltage_rx: tokio::sync::watch::Receiver<VoltageCommand>,
voltage_tx: Option<tokio::sync::watch::Sender<VoltageCommand>>,
settings_last_read: Option<std::time::Instant>, settings_last_read: Option<std::time::Instant>,
} }
@ -53,16 +54,15 @@ pub enum ControllerSettings {
#[derive(Clone, Copy, Debug)] #[derive(Clone, Copy, Debug)]
pub enum VoltageCommand { pub enum VoltageCommand {
None,
Set(f64), Set(f64),
} }
impl Controller { impl Controller {
pub async fn new( pub async fn new(
config: crate::config::ChargeControllerConfig, config: crate::config::ChargeControllerConfig,
) -> eyre::Result<( voltage_rx: tokio::sync::watch::Receiver<VoltageCommand>,
Self, ) -> eyre::Result<Self> {
Option<tokio::sync::mpsc::UnboundedSender<VoltageCommand>>,
)> {
let inner = match config.variant { let inner = match config.variant {
crate::config::ChargeControllerVariant::Tristar => ControllerInner::Tristar( crate::config::ChargeControllerVariant::Tristar => ControllerInner::Tristar(
tristar::Tristar::new(&config.name, &config.transport).await?, tristar::Tristar::new(&config.name, &config.transport).await?,
@ -81,25 +81,16 @@ impl Controller {
let data = std::sync::Arc::new(ControllerData::new()); let data = std::sync::Arc::new(ControllerData::new());
let (voltage_tx, voltage_rx) = if config.follow_primary { Ok(Self {
let (a, b) = tokio::sync::mpsc::unbounded_channel(); name: config.name,
(Some(a), Some(b)) interval: std::time::Duration::from_secs(config.watch_interval_seconds),
} else { inner,
(None, None) data,
}; voltage_rx,
voltage_tx: None,
Ok(( settings_last_read: None,
Self { follow_voltage: config.follow_primary,
name: config.name, })
interval: std::time::Duration::from_secs(config.watch_interval_seconds),
inner,
data,
voltage_rx,
voltage_tx: None,
settings_last_read: None,
},
voltage_tx,
))
} }
pub fn get_data_ptr(&self) -> std::sync::Arc<ControllerData> { pub fn get_data_ptr(&self) -> std::sync::Arc<ControllerData> {
@ -121,7 +112,7 @@ impl Controller {
target target
); );
tx.send_to_all(VoltageCommand::Set(target)); tx.send(VoltageCommand::Set(target))?;
} }
} }
@ -149,9 +140,9 @@ impl Controller {
&self.name &self.name
} }
pub fn set_tx_to_secondary(&mut self, tx: MultiTx) { pub fn set_tx_to_secondary(&mut self, tx: tokio::sync::watch::Sender<VoltageCommand>) {
assert!( assert!(
self.voltage_rx.is_none(), !self.follow_voltage,
"trying to set {} as primary when it is also a secondary!", "trying to set {} as primary when it is also a secondary!",
self.name self.name
); );
@ -159,14 +150,22 @@ impl Controller {
self.voltage_tx = Some(tx); self.voltage_tx = Some(tx);
} }
pub fn get_rx(&mut self) -> Option<&mut tokio::sync::mpsc::UnboundedReceiver<VoltageCommand>> { pub fn get_rx(&mut self) -> &mut tokio::sync::watch::Receiver<VoltageCommand> {
self.voltage_rx.as_mut() &mut self.voltage_rx
} }
pub async fn process_command(&mut self, command: VoltageCommand) -> eyre::Result<()> { pub async fn process_command(&mut self, command: VoltageCommand) -> eyre::Result<()> {
match command { match command {
VoltageCommand::Set(target_voltage) => { VoltageCommand::Set(target_voltage) => {
self.inner.set_target_voltage(target_voltage).await if self.follow_voltage {
self.inner.set_target_voltage(target_voltage).await
} else {
Ok(())
}
}
VoltageCommand::None => {
// todo: disable voltage control
Ok(())
} }
} }
} }
@ -178,19 +177,6 @@ impl Controller {
} }
} }
#[derive(Clone)]
pub struct MultiTx(pub Vec<tokio::sync::mpsc::UnboundedSender<VoltageCommand>>);
impl MultiTx {
pub fn send_to_all(&self, command: VoltageCommand) {
for sender in &self.0 {
if let Err(e) = sender.send(command) {
log::error!("failed to send command {command:?}: {e:?}");
}
}
}
}
#[expect(clippy::large_enum_variant)] #[expect(clippy::large_enum_variant)]
pub enum ControllerInner { pub enum ControllerInner {
Pl(pl::Pli), Pl(pl::Pli),

View file

@ -108,29 +108,26 @@ async fn watch(args: Args) -> eyre::Result<()> {
let mut controllers = Vec::new(); let mut controllers = Vec::new();
let mut map = std::collections::HashMap::new(); let mut map = std::collections::HashMap::new();
let mut follow_voltage_tx = Vec::new();
let (voltage_tx, voltage_rx) =
tokio::sync::watch::channel(controller::VoltageCommand::None);
for config in &config.charge_controllers { for config in &config.charge_controllers {
let n = config.name.clone(); let n = config.name.clone();
match controller::Controller::new(config.clone()).await { match controller::Controller::new(config.clone(), voltage_rx.clone()).await {
Ok((v, voltage_tx)) => { Ok(v) => {
map.insert(n, v.get_data_ptr()); map.insert(n, v.get_data_ptr());
controllers.push(v); controllers.push(v);
if let Some(voltage_tx) = voltage_tx {
follow_voltage_tx.push(voltage_tx);
}
} }
Err(e) => log::error!("couldn't connect to {}: {e:?}", n), Err(e) => log::error!("couldn't connect to {}: {e:?}", n),
} }
} }
let follow_voltage_tx = controller::MultiTx(follow_voltage_tx);
if let Some(primary) = controllers if let Some(primary) = controllers
.iter_mut() .iter_mut()
.find(|c| c.name() == config.primary_charge_controller) .find(|c| c.name() == config.primary_charge_controller)
{ {
primary.set_tx_to_secondary(follow_voltage_tx.clone()); primary.set_tx_to_secondary(voltage_tx.clone());
} }
drop(config); drop(config);
@ -142,7 +139,7 @@ async fn watch(args: Args) -> eyre::Result<()> {
( (
storage::AllControllers::new(map), storage::AllControllers::new(map),
follow_voltage_tx, voltage_tx,
controller_tasks, controller_tasks,
) )
}; };
@ -183,20 +180,17 @@ async fn run_loop(mut controller: controller::Controller) -> eyre::Result<()> {
timeout.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); timeout.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop { loop {
if let Some(rx) = controller.get_rx() { let rx = controller.get_rx();
tokio::select! { tokio::select! {
_ = timeout.tick() => { _ = timeout.tick() => {
do_refresh(&mut controller).await; do_refresh(&mut controller).await;
} }
Some(command) = rx.recv() => { Ok(()) = rx.changed() => {
if let Err(e) = controller.process_command(command).await { let command = *rx.borrow();
log::error!("controller {} failed to process command: {e}", controller.name()); if let Err(e) = controller.process_command(command).await {
} log::error!("controller {} failed to process command: {e}", controller.name());
} }
} }
} else {
timeout.tick().await;
do_refresh(&mut controller).await;
} }
} }
} }

View file

@ -7,14 +7,14 @@ mod static_handler;
pub struct ServerState { pub struct ServerState {
primary_name: String, primary_name: String,
data: AllControllers, data: AllControllers,
tx_to_controllers: crate::controller::MultiTx, tx_to_controllers: tokio::sync::watch::Sender<crate::controller::VoltageCommand>,
} }
impl ServerState { impl ServerState {
pub fn new( pub fn new(
primary_name: &impl ToString, primary_name: &impl ToString,
data: AllControllers, data: AllControllers,
tx_to_controllers: crate::controller::MultiTx, tx_to_controllers: tokio::sync::watch::Sender<crate::controller::VoltageCommand>,
) -> Self { ) -> Self {
let primary_name = primary_name.to_string(); let primary_name = primary_name.to_string();
Self { Self {
@ -200,20 +200,23 @@ async fn enable_control() {
} }
#[post("/control/disable")] #[post("/control/disable")]
async fn disable_control(state: &State<ServerState>) { async fn disable_control(state: &State<ServerState>) -> Result<(), ServerError> {
log::warn!("disabling control"); log::warn!("disabling control");
crate::config::write_to_config() crate::config::write_to_config()
.await .await
.enable_secondary_control = false; .enable_secondary_control = false;
state state
.tx_to_controllers .tx_to_controllers
.send_to_all(crate::controller::VoltageCommand::Set(-1.0)); .send(crate::controller::VoltageCommand::None)?;
Ok(())
} }
enum ServerError { enum ServerError {
Prometheus, Prometheus,
NotFound, NotFound,
InvalidPrimaryName, InvalidPrimaryName,
NoData, NoData,
ControllerTx,
} }
impl From<prometheus::Error> for ServerError { impl From<prometheus::Error> for ServerError {
@ -222,12 +225,20 @@ impl From<prometheus::Error> for ServerError {
} }
} }
impl<T> From<tokio::sync::watch::error::SendError<T>> for ServerError {
fn from(_: tokio::sync::watch::error::SendError<T>) -> Self {
Self::ControllerTx
}
}
impl<'a> rocket::response::Responder<'a, 'a> for ServerError { impl<'a> rocket::response::Responder<'a, 'a> for ServerError {
fn respond_to(self, _: &'a rocket::Request<'_>) -> rocket::response::Result<'a> { fn respond_to(self, _: &'a rocket::Request<'_>) -> rocket::response::Result<'a> {
Err(match self { Err(match self {
Self::NotFound => rocket::http::Status::NotFound, Self::NotFound => rocket::http::Status::NotFound,
Self::InvalidPrimaryName => rocket::http::Status::ServiceUnavailable, Self::InvalidPrimaryName => rocket::http::Status::ServiceUnavailable,
Self::NoData | Self::Prometheus => rocket::http::Status::InternalServerError, Self::ControllerTx | Self::NoData | Self::Prometheus => {
rocket::http::Status::InternalServerError
}
}) })
} }
} }