ccs: tristar modbus wrapper: actually retry

Alex Janka 2025-01-15 12:05:41 +11:00
parent 667f51d375
commit 9bd1243129


@@ -62,10 +62,13 @@ impl<T> TryInsert for Option<T> {
 #[derive(Default, Debug)]
 struct Counters {
+    gateway: usize,
     timeout: usize,
     protocol: usize,
 }
+const MAX_GATEWAY_ERRORS: usize = 3;
 const MAX_TIMEOUTS: usize = 3;
 const MAX_PROTOCOL_ERRORS: usize = 3;
@@ -76,10 +79,14 @@ impl Counters {
     }
     const fn any_above_max(&self) -> bool {
-        self.timeout > MAX_TIMEOUTS || self.protocol > MAX_PROTOCOL_ERRORS
+        self.gateway > MAX_GATEWAY_ERRORS
+            || self.timeout > MAX_TIMEOUTS
+            || self.protocol > MAX_PROTOCOL_ERRORS
     }
 }
+const NUM_TRIES: usize = 3;
 impl ModbusTimeout {
     pub async fn new(transport_settings: crate::config::Transport) -> eyre::Result<Self> {
         let context = Some(connect(&transport_settings).await?);
@@ -90,52 +97,65 @@ impl ModbusTimeout {
         })
     }
-    async fn with_context<R, D>(
+    async fn with_context<R, D: Copy>(
         &mut self,
         f: ContextFn<R, D>,
         data: D,
     ) -> eyre::Result<Result<R, tokio_modbus::ExceptionCode>> {
-        if let Ok(context) = self
-            .context
-            .get_or_try_insert_with(async || connect(&self.transport_settings).await)
-            .await
-        {
-            let res = tokio::time::timeout(MODBUS_TIMEOUT, f(context, data)).await;
-            match res {
-                Ok(Ok(v)) => {
-                    self.counters.reset();
-                    return Ok(v);
+        let mut last_err = None;
+        for _ in 0..NUM_TRIES {
+            if let Ok(context) = self
+                .context
+                .get_or_try_insert_with(async || connect(&self.transport_settings).await)
+                .await
+            {
+                let res = tokio::time::timeout(MODBUS_TIMEOUT, f(context, data)).await;
+                match res {
+                    Ok(Ok(Err(e)))
+                        if e == tokio_modbus::ExceptionCode::GatewayTargetDevice
+                            || e == tokio_modbus::ExceptionCode::GatewayPathUnavailable =>
+                    {
+                        log::warn!("gateway error: {e:?}");
+                        last_err = Some(e.into());
+                        self.counters.gateway += 1;
+                    }
+                    Ok(Ok(v)) => {
+                        self.counters.reset();
+                        return Ok(v);
+                    }
+                    Ok(Err(tokio_modbus::Error::Protocol(e))) => {
+                        // protocol error
+                        log::warn!("protocol error: {e:?}");
+                        last_err = Some(e.into());
+                        self.counters.protocol += 1;
+                    }
+                    Ok(Err(tokio_modbus::Error::Transport(e))) => {
+                        // transport error
+                        log::warn!("reconnecting due to transport error: {e:?}");
+                        last_err = Some(e.into());
+                        self.context = None;
+                    }
+                    Err(_) => {
+                        // timeout
+                        last_err = Some(eyre::eyre!("timeout"));
+                        self.counters.timeout += 1;
+                    }
                 }
-                Ok(Err(tokio_modbus::Error::Protocol(e))) => {
-                    // protocol error
-                    log::warn!("protocol error: {e:?}");
-                    self.counters.protocol += 1;
-                }
-                Ok(Err(tokio_modbus::Error::Transport(e))) => {
-                    // transport error
-                    log::warn!("reconnecting due to transport error: {e:?}");
+                if self.counters.any_above_max() {
                     self.context = None;
+                    log::warn!(
+                        "reconnecting due to multiple errors without a successful operation: {:?}",
+                        self.counters
+                    );
+                    self.counters.reset();
                 }
-                Err(_) => {
-                    // timeout
-                    self.counters.timeout += 1;
-                }
+            } else {
+                // failed to reconnect
+                return Err(eyre::eyre!("failed to reconnect to controller"));
             }
-            if self.counters.any_above_max() {
-                self.context = None;
-                log::warn!(
-                    "reconnecting due to multiple errors without a successful operation: {:?}",
-                    self.counters
-                );
-                self.counters.reset();
-            }
-        } else {
-            // failed to reconnect
-            return Err(eyre::eyre!("failed to reconnect to controller"));
         }
-        Err(eyre::eyre!(":("))
+        Err(last_err.unwrap_or_else(|| eyre::eyre!("unknown last error????")))
     }
     pub async fn write_single_register(
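
The change above swaps the old single-attempt body of with_context for a bounded retry loop: each attempt either succeeds (counters reset), records a categorised failure (gateway, protocol, timeout) and forces a reconnect when a counter passes its limit, and once the attempts are exhausted the last recorded error is returned instead of the old placeholder eyre::eyre!(":("). A minimal standalone sketch of that retry shape, assuming nothing beyond what the diff shows (hypothetical names, synchronous and simplified, not code from this repository):

// Illustrative sketch (hypothetical names): bounded retries that remember
// the last error, mirroring the loop added to with_context above.
const NUM_TRIES: usize = 3;

fn retry<T, E>(mut attempt: impl FnMut() -> Result<T, E>) -> Result<T, E> {
    let mut last_err = None;
    for _ in 0..NUM_TRIES {
        match attempt() {
            // First success wins; the real wrapper also resets its error counters here.
            Ok(v) => return Ok(v),
            // Remember the most recent failure and try again.
            Err(e) => last_err = Some(e),
        }
    }
    // NUM_TRIES is non-zero, so at least one attempt ran and recorded an error.
    Err(last_err.expect("at least one attempt was made"))
}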