simultaneous connection and better erroring

This commit is contained in:
Alex Janka 2024-03-21 09:36:35 +11:00
parent 9ff8317269
commit 7d511f3c12
6 changed files with 92 additions and 71 deletions

2
Cargo.lock generated
View file

@ -1143,7 +1143,7 @@ dependencies = [
[[package]] [[package]]
name = "homekit-exporter" name = "homekit-exporter"
version = "0.4.1" version = "0.5.0"
dependencies = [ dependencies = [
"clap", "clap",
"env_logger", "env_logger",

View file

@ -12,11 +12,10 @@ use x25519_dalek::{EphemeralSecret, PublicKey};
pub use crate::{ pub use crate::{
discovery::{spawn_discover_thread, MdnsDiscoveredList}, discovery::{spawn_discover_thread, MdnsDiscoveredList},
pairing_data::{CharacteristicType, Data, ServiceType}, pairing_data::{Accessory, CharacteristicType, Data, PythonPairingData, ServiceType},
}; };
use crate::{ use crate::{
homekit_http::{AccessorySocket, DiscoveryError}, homekit_http::{AccessorySocket, DiscoveryError},
pairing_data::{Accessory, PythonPairingData},
tlv8::{decode, HomekitState, TlvEncodableData, TlvEncode, TlvError, TlvType}, tlv8::{decode, HomekitState, TlvEncodableData, TlvEncode, TlvError, TlvType},
}; };
@ -43,7 +42,7 @@ impl PythonPairingData {
Ok(connected_device) Ok(connected_device)
} }
fn to_connection(&self, discovered: MdnsDiscoveredList) -> DeviceConnection { pub fn to_connection(&self, discovered: MdnsDiscoveredList) -> DeviceConnection {
DeviceConnection { DeviceConnection {
accessories: Default::default(), accessories: Default::default(),
discovered, discovered,
@ -79,7 +78,7 @@ pub struct DeviceConnection {
} }
impl DeviceConnection { impl DeviceConnection {
async fn connect(&mut self) -> Result<(), HomekitError> { pub async fn connect(&mut self) -> Result<(), HomekitError> {
if let Some(mut socket) = self.socket.take() { if let Some(mut socket) = self.socket.take() {
socket.disconnect().await?; socket.disconnect().await?;
} }

View file

@ -1,6 +1,6 @@
[package] [package]
name = "homekit-exporter" name = "homekit-exporter"
version = "0.4.1" version = "0.5.0"
edition = "2021" edition = "2021"
license = "Apache-2.0" license = "Apache-2.0"
description = "Prometheus exporter for HomeKit sensors" description = "Prometheus exporter for HomeKit sensors"

View file

@ -1,7 +1,7 @@
# Maintainer: Alex Janka <alex@alexjanka.com> # Maintainer: Alex Janka <alex@alexjanka.com>
pkgname=homekit-logger pkgname=homekit-logger
pkgver=0.4.1 pkgver=0.5.0
pkgrel=1 pkgrel=1
pkgdesc="Prometheus exporter for HomeKit sensors" pkgdesc="Prometheus exporter for HomeKit sensors"
arch=('x86_64' 'aarch64') arch=('x86_64' 'aarch64')

View file

@ -9,6 +9,7 @@ use homekit_controller::{
spawn_discover_thread, CharacteristicType, DeviceConnection, HomekitError, spawn_discover_thread, CharacteristicType, DeviceConnection, HomekitError,
}; };
use server::launch; use server::launch;
use tokio::task::JoinSet;
mod server; mod server;
@ -45,6 +46,7 @@ const MONITORED_CHARACTERISTICS: [CharacteristicType; 7] = [
async fn rocket() -> rocket::Rocket<rocket::Build> { async fn rocket() -> rocket::Rocket<rocket::Build> {
env_logger::init(); env_logger::init();
let args = Args::parse(); let args = Args::parse();
match args.command { match args.command {
Commands::Watch => match init(args.pairing_data).await { Commands::Watch => match init(args.pairing_data).await {
Ok(paired) => launch(paired, args.port), Ok(paired) => launch(paired, args.port),
@ -76,27 +78,25 @@ async fn init(pairing_data: PathBuf) -> Result<HashMap<String, DeviceConnection>
tokio::time::sleep(Duration::from_secs(2)).await; tokio::time::sleep(Duration::from_secs(2)).await;
if pairing_data.is_file() { if pairing_data.is_file() {
let devices = homekit_controller::load(pairing_data)?; let devices = homekit_controller::load(pairing_data)?;
let mut connected_devices = HashMap::new(); let mut set = JoinSet::new();
for (k, v) in devices { for (k, v) in devices {
let mut num = 0; let discovered = discovered.clone();
loop { set.spawn(async move {
match v.connect(&discovered).await { let mut connection = v.to_connection(discovered.clone());
Ok(v) => {
match tokio::time::timeout(Duration::from_secs(2), connection.connect()).await {
Ok(Err(e)) => log::error!("error connecting to {k}: {e:?}"),
Err(_) => log::error!("timeout connecting to {k}"),
_ => {}
}
(k, connection)
});
}
let mut connected_devices = HashMap::new();
while let Some(Ok((k, v))) = set.join_next().await {
connected_devices.insert(k, v); connected_devices.insert(k, v);
break;
}
Err(e) => {
num += 1;
log::error!("error connecting to {k}: {e:?}");
if num > 10 {
log::error!("\t...not connecting");
break;
}
}
}
tokio::time::sleep(Duration::from_millis(1000)).await;
}
} }
if connected_devices.is_empty() { if connected_devices.is_empty() {
Err(HomekitError::NoSuccessfulConnections) Err(HomekitError::NoSuccessfulConnections)
} else { } else {

View file

@ -1,7 +1,7 @@
use homekit_controller::{Data, DeviceConnection}; use homekit_controller::{Data, DeviceConnection};
use rocket::State; use rocket::State;
use std::{collections::HashMap, ops::DerefMut}; use std::{collections::HashMap, time::Duration};
use tokio::sync::Mutex; use tokio::{sync::Mutex, task::JoinSet};
use crate::MONITORED_CHARACTERISTICS; use crate::MONITORED_CHARACTERISTICS;
@ -15,13 +15,23 @@ pub fn launch(
.mount("/", routes![metrics]) .mount("/", routes![metrics])
} }
const METRIC_GATHER_TIMEOUT_SECS: u64 = 3;
#[get("/metrics")] #[get("/metrics")]
pub async fn metrics(state: &State<Mutex<HashMap<String, DeviceConnection>>>) -> Option<String> { pub async fn metrics(state: &State<Mutex<HashMap<String, DeviceConnection>>>) -> Option<String> {
let mut s = String::new();
let mut state = state.lock().await; let mut state = state.lock().await;
let mut shown_types = Vec::new(); let mut set = JoinSet::new();
for (name, connected) in state.deref_mut() { for (name, mut connected) in state.drain() {
connected.update_characteristics().await.ok()?; set.spawn(
async move {
let mut return_string = String::new();
let mut types_seen = Vec::new();
match tokio::time::timeout(Duration::from_secs(METRIC_GATHER_TIMEOUT_SECS), connected.update_characteristics()).await{
Ok(r) => r.ok()?,
Err(_) => log::info!("failed to update characteristics for {name}"),
}
for (aid, accessory) in &connected.accessories { for (aid, accessory) in &connected.accessories {
for service in accessory.services.values() { for service in accessory.services.values() {
@ -30,15 +40,11 @@ pub async fn metrics(state: &State<Mutex<HashMap<String, DeviceConnection>>>) ->
continue; continue;
} }
if let Some(value) = &characteristic.value { if let Some(value) = &characteristic.value {
if !shown_types.contains(&characteristic.characteristic_type) { if !types_seen.contains(&characteristic.characteristic_type) {
s.push_str( types_seen.push(characteristic.characteristic_type);
format!("# TYPE {} gauge\n", characteristic.characteristic_type)
.as_str(),
);
shown_types.push(characteristic.characteristic_type);
} }
s.push_str(
format!( return_string.push_str(format!(
"{}{{hub=\"{}\",service_type=\"{}\",accessory_name=\"{}\",accessory=\"{}.{}\",service=\"{}.{}.{}\"}} {}\n", "{}{{hub=\"{}\",service_type=\"{}\",accessory_name=\"{}\",accessory=\"{}.{}\",service=\"{}.{}.{}\"}} {}\n",
characteristic.characteristic_type, characteristic.characteristic_type,
name, name,
@ -61,13 +67,29 @@ pub async fn metrics(state: &State<Mutex<HashMap<String, DeviceConnection>>>) ->
Data::Tlv8(v) => v.to_string(), Data::Tlv8(v) => v.to_string(),
Data::Data(v) => v.to_string(), Data::Data(v) => v.to_string(),
} }
) ).as_str());
.as_str(),
)
} }
} }
} }
} }
Some((name,connected,return_string,types_seen))
} }
);
}
let mut s = String::new();
let mut shown_types = Vec::new();
while let Some(Ok(Some((k, v, metrics, types)))) = set.join_next().await {
state.insert(k, v);
for c in types {
if !shown_types.contains(&c) {
s.push_str(format!("# TYPE {} gauge\n", c).as_str());
shown_types.push(c);
}
}
s.push_str(&metrics);
}
Some(s) Some(s)
} }