From 7d511f3c1235157d9f227fba2e271f455da7b2ae Mon Sep 17 00:00:00 2001 From: Alex Janka Date: Thu, 21 Mar 2024 09:36:35 +1100 Subject: [PATCH] simultaneous connection and better erroring --- Cargo.lock | 2 +- homekit-controller/src/lib.rs | 7 +- homekit-exporter/Cargo.toml | 2 +- homekit-exporter/packaging/PKGBUILD | 2 +- homekit-exporter/src/main.rs | 36 ++++----- homekit-exporter/src/server.rs | 114 +++++++++++++++++----------- 6 files changed, 92 insertions(+), 71 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9a03c44..7c3cec0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1143,7 +1143,7 @@ dependencies = [ [[package]] name = "homekit-exporter" -version = "0.4.1" +version = "0.5.0" dependencies = [ "clap", "env_logger", diff --git a/homekit-controller/src/lib.rs b/homekit-controller/src/lib.rs index 23af8ff..9fe209f 100644 --- a/homekit-controller/src/lib.rs +++ b/homekit-controller/src/lib.rs @@ -12,11 +12,10 @@ use x25519_dalek::{EphemeralSecret, PublicKey}; pub use crate::{ discovery::{spawn_discover_thread, MdnsDiscoveredList}, - pairing_data::{CharacteristicType, Data, ServiceType}, + pairing_data::{Accessory, CharacteristicType, Data, PythonPairingData, ServiceType}, }; use crate::{ homekit_http::{AccessorySocket, DiscoveryError}, - pairing_data::{Accessory, PythonPairingData}, tlv8::{decode, HomekitState, TlvEncodableData, TlvEncode, TlvError, TlvType}, }; @@ -43,7 +42,7 @@ impl PythonPairingData { Ok(connected_device) } - fn to_connection(&self, discovered: MdnsDiscoveredList) -> DeviceConnection { + pub fn to_connection(&self, discovered: MdnsDiscoveredList) -> DeviceConnection { DeviceConnection { accessories: Default::default(), discovered, @@ -79,7 +78,7 @@ pub struct DeviceConnection { } impl DeviceConnection { - async fn connect(&mut self) -> Result<(), HomekitError> { + pub async fn connect(&mut self) -> Result<(), HomekitError> { if let Some(mut socket) = self.socket.take() { socket.disconnect().await?; } diff --git a/homekit-exporter/Cargo.toml b/homekit-exporter/Cargo.toml index ee977d6..e413590 100644 --- a/homekit-exporter/Cargo.toml +++ b/homekit-exporter/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "homekit-exporter" -version = "0.4.1" +version = "0.5.0" edition = "2021" license = "Apache-2.0" description = "Prometheus exporter for HomeKit sensors" diff --git a/homekit-exporter/packaging/PKGBUILD b/homekit-exporter/packaging/PKGBUILD index cb96fdd..06403d3 100644 --- a/homekit-exporter/packaging/PKGBUILD +++ b/homekit-exporter/packaging/PKGBUILD @@ -1,7 +1,7 @@ # Maintainer: Alex Janka pkgname=homekit-logger -pkgver=0.4.1 +pkgver=0.5.0 pkgrel=1 pkgdesc="Prometheus exporter for HomeKit sensors" arch=('x86_64' 'aarch64') diff --git a/homekit-exporter/src/main.rs b/homekit-exporter/src/main.rs index 3732cbb..a8c826b 100644 --- a/homekit-exporter/src/main.rs +++ b/homekit-exporter/src/main.rs @@ -9,6 +9,7 @@ use homekit_controller::{ spawn_discover_thread, CharacteristicType, DeviceConnection, HomekitError, }; use server::launch; +use tokio::task::JoinSet; mod server; @@ -45,6 +46,7 @@ const MONITORED_CHARACTERISTICS: [CharacteristicType; 7] = [ async fn rocket() -> rocket::Rocket { env_logger::init(); let args = Args::parse(); + match args.command { Commands::Watch => match init(args.pairing_data).await { Ok(paired) => launch(paired, args.port), @@ -76,27 +78,25 @@ async fn init(pairing_data: PathBuf) -> Result tokio::time::sleep(Duration::from_secs(2)).await; if pairing_data.is_file() { let devices = homekit_controller::load(pairing_data)?; - let mut connected_devices = HashMap::new(); + let mut set = JoinSet::new(); for (k, v) in devices { - let mut num = 0; - loop { - match v.connect(&discovered).await { - Ok(v) => { - connected_devices.insert(k, v); - break; - } - Err(e) => { - num += 1; - log::error!("error connecting to {k}: {e:?}"); - if num > 10 { - log::error!("\t...not connecting"); - break; - } - } + let discovered = discovered.clone(); + set.spawn(async move { + let mut connection = v.to_connection(discovered.clone()); + + match tokio::time::timeout(Duration::from_secs(2), connection.connect()).await { + Ok(Err(e)) => log::error!("error connecting to {k}: {e:?}"), + Err(_) => log::error!("timeout connecting to {k}"), + _ => {} } - tokio::time::sleep(Duration::from_millis(1000)).await; - } + (k, connection) + }); } + let mut connected_devices = HashMap::new(); + while let Some(Ok((k, v))) = set.join_next().await { + connected_devices.insert(k, v); + } + if connected_devices.is_empty() { Err(HomekitError::NoSuccessfulConnections) } else { diff --git a/homekit-exporter/src/server.rs b/homekit-exporter/src/server.rs index e43bba5..478c48f 100644 --- a/homekit-exporter/src/server.rs +++ b/homekit-exporter/src/server.rs @@ -1,7 +1,7 @@ use homekit_controller::{Data, DeviceConnection}; use rocket::State; -use std::{collections::HashMap, ops::DerefMut}; -use tokio::sync::Mutex; +use std::{collections::HashMap, time::Duration}; +use tokio::{sync::Mutex, task::JoinSet}; use crate::MONITORED_CHARACTERISTICS; @@ -15,59 +15,81 @@ pub fn launch( .mount("/", routes![metrics]) } +const METRIC_GATHER_TIMEOUT_SECS: u64 = 3; + #[get("/metrics")] pub async fn metrics(state: &State>>) -> Option { - let mut s = String::new(); let mut state = state.lock().await; - let mut shown_types = Vec::new(); - for (name, connected) in state.deref_mut() { - connected.update_characteristics().await.ok()?; + let mut set = JoinSet::new(); + for (name, mut connected) in state.drain() { + set.spawn( + async move { + let mut return_string = String::new(); + let mut types_seen = Vec::new(); - for (aid, accessory) in &connected.accessories { - for service in accessory.services.values() { - for (cid, characteristic) in &service.characteristics { - if !MONITORED_CHARACTERISTICS.contains(&characteristic.characteristic_type) { - continue; - } - if let Some(value) = &characteristic.value { - if !shown_types.contains(&characteristic.characteristic_type) { - s.push_str( - format!("# TYPE {} gauge\n", characteristic.characteristic_type) - .as_str(), - ); - shown_types.push(characteristic.characteristic_type); - } - s.push_str( - format!( - "{}{{hub=\"{}\",service_type=\"{}\",accessory_name=\"{}\",accessory=\"{}.{}\",service=\"{}.{}.{}\"}} {}\n", - characteristic.characteristic_type, - name, - service.service_type, - accessory.name, - name, - aid, - name, - aid, - cid, - match value { - Data::Bool(v) => format!("{}", if *v { 1 } else { 0 }), - Data::Uint8(v) => format!("{v}"), - Data::Uint16(v) => format!("{v}"), - Data::Uint32(v) => format!("{v}"), - Data::Uint64(v) => format!("{v}"), - Data::Int(v) => format!("{v}"), - Data::Float(v) => format!("{v}"), - Data::String(v) => v.to_string(), - Data::Tlv8(v) => v.to_string(), - Data::Data(v) => v.to_string(), + match tokio::time::timeout(Duration::from_secs(METRIC_GATHER_TIMEOUT_SECS), connected.update_characteristics()).await{ + Ok(r) => r.ok()?, + Err(_) => log::info!("failed to update characteristics for {name}"), + } + + + for (aid, accessory) in &connected.accessories { + for service in accessory.services.values() { + for (cid, characteristic) in &service.characteristics { + if !MONITORED_CHARACTERISTICS.contains(&characteristic.characteristic_type) { + continue; + } + if let Some(value) = &characteristic.value { + if !types_seen.contains(&characteristic.characteristic_type) { + types_seen.push(characteristic.characteristic_type); } - ) - .as_str(), - ) + + return_string.push_str(format!( + "{}{{hub=\"{}\",service_type=\"{}\",accessory_name=\"{}\",accessory=\"{}.{}\",service=\"{}.{}.{}\"}} {}\n", + characteristic.characteristic_type, + name, + service.service_type, + accessory.name, + name, + aid, + name, + aid, + cid, + match value { + Data::Bool(v) => format!("{}", if *v { 1 } else { 0 }), + Data::Uint8(v) => format!("{v}"), + Data::Uint16(v) => format!("{v}"), + Data::Uint32(v) => format!("{v}"), + Data::Uint64(v) => format!("{v}"), + Data::Int(v) => format!("{v}"), + Data::Float(v) => format!("{v}"), + Data::String(v) => v.to_string(), + Data::Tlv8(v) => v.to_string(), + Data::Data(v) => v.to_string(), + } + ).as_str()); + } + } } } + + Some((name,connected,return_string,types_seen)) + } + ); + } + + let mut s = String::new(); + let mut shown_types = Vec::new(); + while let Some(Ok(Some((k, v, metrics, types)))) = set.join_next().await { + state.insert(k, v); + for c in types { + if !shown_types.contains(&c) { + s.push_str(format!("# TYPE {} gauge\n", c).as_str()); + shown_types.push(c); } } + s.push_str(&metrics); } + Some(s) }