diff --git a/charge-controller-supervisor/src/controller/tristar.rs b/charge-controller-supervisor/src/controller/tristar.rs index cbbf234..04b0583 100644 --- a/charge-controller-supervisor/src/controller/tristar.rs +++ b/charge-controller-supervisor/src/controller/tristar.rs @@ -1,6 +1,6 @@ +use modbus_wrapper::ModbusTimeout; use prometheus::core::{AtomicI64, GenericGauge}; use serde::{Deserialize, Serialize}; -use tokio_modbus::client::{Reader, Writer}; use crate::gauges::{ BATTERY_TEMP, BATTERY_VOLTAGE, CHARGE_STATE, HEATSINK_TEMP, INPUT_CURRENT, TARGET_VOLTAGE, @@ -55,57 +55,10 @@ pub struct Tristar { consecutive_errors: usize, scaling: Scaling, settings_last_read: Option, - transport_settings: crate::config::Transport, } -struct ModbusTimeout { - context: tokio_modbus::client::Context, - reconnect_required: bool, -} +mod modbus_wrapper; -const MODBUS_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(3); - -impl ModbusTimeout { - const fn new(context: tokio_modbus::client::Context) -> Self { - Self { - context, - reconnect_required: false, - } - } - - pub async fn write_single_register( - &mut self, - addr: tokio_modbus::Address, - word: u16, - ) -> eyre::Result<()> { - let r = tokio::time::timeout( - MODBUS_TIMEOUT, - self.context.write_single_register(addr, word), - ) - .await?; - if let Err(tokio_modbus::Error::Transport(_)) = &r { - self.reconnect_required = true; - } - r??; - Ok(()) - } - - pub async fn read_holding_registers( - &mut self, - addr: tokio_modbus::Address, - cnt: tokio_modbus::Quantity, - ) -> eyre::Result> { - let r = tokio::time::timeout( - MODBUS_TIMEOUT, - self.context.read_holding_registers(addr, cnt), - ) - .await?; - if let Err(tokio_modbus::Error::Transport(_)) = &r { - self.reconnect_required = true; - } - Ok(r??) - } -} #[derive(Debug, Clone, Serialize, Deserialize)] pub struct TristarSettings { network: Option, @@ -602,32 +555,14 @@ impl ChargeStateGauges { } } -async fn connect_modbus(transport: &crate::config::Transport) -> eyre::Result { - let slave = tokio_modbus::Slave(DEVICE_ID); - - let modbus = match transport { - crate::config::Transport::Serial { port, baud_rate } => { - let modbus_serial = - tokio_serial::SerialStream::open(&tokio_serial::new(port, *baud_rate))?; - tokio_modbus::client::rtu::attach_slave(modbus_serial, slave) - } - crate::config::Transport::Tcp { ip, port } => { - let modbus_tcp = tokio::net::TcpStream::connect((*ip, *port)).await?; - tokio_modbus::client::tcp::attach_slave(modbus_tcp, slave) - } - }; - - Ok(ModbusTimeout::new(modbus)) -} - impl Tristar { pub async fn new( friendly_name: &str, transport: &crate::config::Transport, ) -> eyre::Result { - let mut modbus = connect_modbus(transport).await?; + let mut modbus = ModbusTimeout::new(transport.clone()).await?; let scaling = { - let data = modbus.read_holding_registers(0x0000, 4).await?; + let data = modbus.read_holding_registers(0x0000, 4).await??; Scaling::from(&data) }; @@ -640,14 +575,10 @@ impl Tristar { consecutive_errors: 0, scaling, settings_last_read: None, - transport_settings: transport.clone(), }) } pub async fn refresh(&mut self) -> eyre::Result { - if self.modbus.reconnect_required { - self.modbus = connect_modbus(&self.transport_settings).await?; - } let new_state = self.get_data().await?; self.scaling = new_state.scaling; @@ -704,7 +635,7 @@ impl Tristar { let network = if let Ok(network_data) = self .modbus .read_holding_registers(NETWORK_DATA_ADDR_START, NETWORK_DATA_LENGTH) - .await + .await? { Some(NetworkSettings::from_buf(&network_data)?) } else { @@ -714,12 +645,12 @@ impl Tristar { let charge_data_1 = self .modbus .read_holding_registers(CHARGE_DATA_ADDR_START, 0x22) - .await?; + .await??; let charge_data_2 = self .modbus .read_holding_registers(CHARGE_DATA_ADDR_START + 0x80, 0x4e) - .await?; + .await??; let mut charge_data = vec![0; 0xCE]; charge_data[..0x22].copy_from_slice(&charge_data_1); @@ -742,7 +673,7 @@ impl Tristar { let scaled_voltage: u16 = self.scale_voltage(target_voltage); self.modbus .write_single_register(TristarRamAddress::VbRefSlave as u16, scaled_voltage) - .await?; + .await??; log::debug!( "tristar {} being set to voltage {target_voltage} (scaled: {scaled_voltage:#X?})", @@ -764,7 +695,7 @@ impl Tristar { let data = self .modbus .read_holding_registers(0x0000, RAM_DATA_SIZE + 1) - .await?; + .await??; Ok(TristarState::from_ram(&data)) } } diff --git a/charge-controller-supervisor/src/controller/tristar/modbus_wrapper.rs b/charge-controller-supervisor/src/controller/tristar/modbus_wrapper.rs new file mode 100644 index 0000000..913f583 --- /dev/null +++ b/charge-controller-supervisor/src/controller/tristar/modbus_wrapper.rs @@ -0,0 +1,178 @@ +pub struct ModbusTimeout { + context: Option, + transport_settings: crate::config::Transport, + counters: Counters, +} + +const MODBUS_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(3); + +async fn connect( + transport_settings: &crate::config::Transport, +) -> eyre::Result { + let slave = tokio_modbus::Slave(super::DEVICE_ID); + + let modbus = match transport_settings { + crate::config::Transport::Serial { port, baud_rate } => { + let modbus_serial = + tokio_serial::SerialStream::open(&tokio_serial::new(port, *baud_rate))?; + tokio_modbus::client::rtu::attach_slave(modbus_serial, slave) + } + crate::config::Transport::Tcp { ip, port } => { + let modbus_tcp = tokio::net::TcpStream::connect((*ip, *port)).await?; + tokio_modbus::client::tcp::attach_slave(modbus_tcp, slave) + } + }; + + Ok(modbus) +} + +type ModbusDeviceResult = Result; +type ModbusResult = Result, tokio_modbus::Error>; + +type ContextFuture<'a, R> = dyn std::future::Future> + Send + 'a; +type ContextFn = + fn(&mut tokio_modbus::client::Context, D) -> std::pin::Pin>>; + +trait TryInsert { + type T; + async fn get_or_try_insert_with(&mut self, f: F) -> Result<&mut Self::T, R> + where + Fut: std::future::Future>, + F: FnOnce() -> Fut; +} + +impl TryInsert for Option { + type T = T; + + async fn get_or_try_insert_with(&mut self, f: F) -> Result<&mut Self::T, R> + where + Fut: std::future::Future>, + F: FnOnce() -> Fut, + { + if self.is_none() { + let got = f().await?; + *self = Some(got); + } + + // a `None` variant for `self` would have been replaced by a `Some` variant + // in the code above, or the ? would have caused an early return + Ok(self.as_mut().unwrap()) + } +} + +#[derive(Default, Debug)] +struct Counters { + timeout: usize, + protocol: usize, +} + +const MAX_TIMEOUTS: usize = 3; + +const MAX_PROTOCOL_ERRORS: usize = 3; + +impl Counters { + fn reset(&mut self) { + *self = Self::default(); + } + + const fn any_above_max(&self) -> bool { + self.timeout > MAX_TIMEOUTS || self.protocol > MAX_PROTOCOL_ERRORS + } +} + +impl ModbusTimeout { + pub async fn new(transport_settings: crate::config::Transport) -> eyre::Result { + let context = Some(connect(&transport_settings).await?); + Ok(Self { + context, + transport_settings, + counters: Counters::default(), + }) + } + + async fn with_context( + &mut self, + f: ContextFn, + data: D, + ) -> eyre::Result> { + if let Ok(context) = self + .context + .get_or_try_insert_with(async || connect(&self.transport_settings).await) + .await + { + let res = tokio::time::timeout(MODBUS_TIMEOUT, f(context, data)).await; + match res { + Ok(Ok(v)) => { + self.counters.reset(); + return Ok(v); + } + + Ok(Err(tokio_modbus::Error::Protocol(e))) => { + // protocol error + log::warn!("protocol error: {e:?}"); + self.counters.protocol += 1; + } + Ok(Err(tokio_modbus::Error::Transport(e))) => { + // transport error + log::warn!("reconnecting due to transport error: {e:?}"); + self.context = None; + } + Err(_) => { + // timeout + self.counters.timeout += 1; + } + } + if self.counters.any_above_max() { + self.context = None; + log::warn!( + "reconnecting due to multiple errors without a successful operation: {:?}", + self.counters + ); + self.counters.reset(); + } + } else { + // failed to reconnect + return Err(eyre::eyre!("failed to reconnect to controller")); + } + + Err(eyre::eyre!(":(")) + } + + pub async fn write_single_register( + &mut self, + addr: tokio_modbus::Address, + word: u16, + ) -> eyre::Result> { + async fn write( + context: &mut tokio_modbus::client::Context, + addr: tokio_modbus::Address, + word: u16, + ) -> ModbusResult<()> { + use tokio_modbus::client::Writer; + context.write_single_register(addr, word).await + } + + let fut: ContextFn<(), _> = |context, (addr, word)| Box::pin(write(context, addr, word)); + let r = self.with_context(fut, (addr, word)).await?; + Ok(r) + } + + pub async fn read_holding_registers( + &mut self, + addr: tokio_modbus::Address, + cnt: tokio_modbus::Quantity, + ) -> eyre::Result>> { + async fn read( + context: &mut tokio_modbus::client::Context, + addr: tokio_modbus::Address, + cnt: tokio_modbus::Quantity, + ) -> ModbusResult> { + use tokio_modbus::client::Reader; + context.read_holding_registers(addr, cnt).await + } + + let fut: ContextFn<_, _> = |context, (addr, cnt)| Box::pin(read(context, addr, cnt)); + let res = self.with_context(fut, (addr, cnt)).await?; + Ok(res) + } +}