status + not draining and refilling state hashmap

This commit is contained in:
Alex Janka 2024-04-10 09:22:59 +10:00
parent c780932300
commit c4782814bb
5 changed files with 185 additions and 22 deletions

79
Cargo.lock generated
View file

@ -36,6 +36,21 @@ dependencies = [
"memchr",
]
[[package]]
name = "android-tzdata"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0"
[[package]]
name = "android_system_properties"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311"
dependencies = [
"libc",
]
[[package]]
name = "anstream"
version = "0.6.11"
@ -489,6 +504,20 @@ dependencies = [
"zeroize",
]
[[package]]
name = "chrono"
version = "0.4.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a0d04d43504c61aa6c7531f1871dd0d418d91130162063b789da00fd7057a5e"
dependencies = [
"android-tzdata",
"iana-time-zone",
"js-sys",
"num-traits",
"wasm-bindgen",
"windows-targets 0.52.0",
]
[[package]]
name = "cipher"
version = "0.4.4"
@ -572,6 +601,12 @@ dependencies = [
"version_check",
]
[[package]]
name = "core-foundation-sys"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06ea2b9bc92be3c2baa9334a323ebca2d6f074ff852cd1d7b11064035cd3868f"
[[package]]
name = "cpufeatures"
version = "0.2.12"
@ -1143,8 +1178,9 @@ dependencies = [
[[package]]
name = "homekit-exporter"
version = "0.5.6"
version = "0.6.0"
dependencies = [
"chrono",
"clap",
"env_logger",
"futures-util",
@ -1230,6 +1266,29 @@ dependencies = [
"want",
]
[[package]]
name = "iana-time-zone"
version = "0.1.60"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7ffbb5a1b541ea2561f8c41c087286cc091e21e556a4f09a8f6cbf17b69b141"
dependencies = [
"android_system_properties",
"core-foundation-sys",
"iana-time-zone-haiku",
"js-sys",
"wasm-bindgen",
"windows-core",
]
[[package]]
name = "iana-time-zone-haiku"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f"
dependencies = [
"cc",
]
[[package]]
name = "indexmap"
version = "2.2.3"
@ -1473,6 +1532,15 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9"
[[package]]
name = "num-traits"
version = "0.2.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da0df0e5185db44f69b44f26786fe401b6c293d1907744beaa7fa62b2e5a517a"
dependencies = [
"autocfg",
]
[[package]]
name = "num_cpus"
version = "1.16.0"
@ -2624,6 +2692,15 @@ dependencies = [
"windows-targets 0.48.5",
]
[[package]]
name = "windows-core"
version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9"
dependencies = [
"windows-targets 0.52.0",
]
[[package]]
name = "windows-sys"
version = "0.48.0"

View file

@ -1,6 +1,6 @@
[package]
name = "homekit-exporter"
version = "0.5.6"
version = "0.6.0"
edition = "2021"
license = "Apache-2.0"
description = "Prometheus exporter for HomeKit sensors"
@ -21,3 +21,4 @@ homekit-controller = { path = "../homekit-controller" }
rocket = { version = "0.5", features = ["json"] }
mdns = "3.0.0"
futures-util = "0.3.30"
chrono = "0.4"

View file

@ -1,7 +1,7 @@
# Maintainer: Alex Janka <alex@alexjanka.com>
pkgname=homekit-logger
pkgver=0.5.6
pkgver=0.6.0
pkgrel=1
pkgdesc="Prometheus exporter for HomeKit sensors"
arch=('x86_64' 'aarch64')

View file

@ -3,6 +3,7 @@ extern crate rocket;
use std::{collections::HashMap, path::PathBuf, time::Duration};
use chrono::{DateTime, Local};
use clap::{Parser, Subcommand};
use futures_util::{pin_mut, StreamExt};
use homekit_controller::{
@ -82,7 +83,41 @@ async fn discover() -> Result<(), mdns::Error> {
Ok(())
}
async fn init(pairing_data: PathBuf) -> Result<HashMap<String, DeviceConnection>, HomekitError> {
struct DeviceConnectionWithTimestamp {
connection: DeviceConnection,
last_accessed: Option<DateTime<Local>>,
}
impl DeviceConnectionWithTimestamp {
async fn update_characteristics(&mut self) -> Result<(), HomekitError> {
let r = self.connection.update_characteristics().await;
if r.is_ok() {
self.last_accessed = Some(Local::now());
}
r
}
async fn connect(&mut self) -> Result<(), HomekitError> {
let r = self.connection.connect().await;
if r.is_ok() {
self.last_accessed = Some(Local::now());
}
r
}
}
impl From<DeviceConnection> for DeviceConnectionWithTimestamp {
fn from(connection: DeviceConnection) -> Self {
Self {
connection,
last_accessed: None,
}
}
}
async fn init(
pairing_data: PathBuf,
) -> Result<HashMap<String, DeviceConnectionWithTimestamp>, HomekitError> {
let discovered = spawn_discover_thread()?;
tokio::time::sleep(Duration::from_secs(2)).await;
if pairing_data.is_file() {
@ -91,7 +126,9 @@ async fn init(pairing_data: PathBuf) -> Result<HashMap<String, DeviceConnection>
for (k, v) in devices {
let discovered = discovered.clone();
set.spawn(async move {
let mut connection = v.to_connection(discovered.clone(), Some(k.clone()));
let mut connection = DeviceConnectionWithTimestamp::from(
v.to_connection(discovered.clone(), Some(k.clone())),
);
match tokio::time::timeout(Duration::from_secs(5), connection.connect()).await {
Ok(Err(e)) => log::error!("error connecting to {k}: {e:?}"),

View file

@ -1,35 +1,84 @@
use homekit_controller::{Data, DeviceConnection};
use chrono::{Local, TimeDelta};
use futures_util::{stream::FuturesUnordered, StreamExt};
use homekit_controller::Data;
use rocket::State;
use std::collections::HashMap;
use tokio::{sync::Mutex, task::JoinSet};
use std::fmt::Write;
use tokio::sync::Mutex;
use crate::MONITORED_CHARACTERISTICS;
use crate::{DeviceConnectionWithTimestamp, MONITORED_CHARACTERISTICS};
pub fn launch(
paired: HashMap<String, DeviceConnection>,
paired: HashMap<String, DeviceConnectionWithTimestamp>,
port: usize,
) -> rocket::Rocket<rocket::Build> {
rocket::build()
.configure(rocket::Config::figment().merge(("port", port)))
.manage(Mutex::new(paired))
.mount("/", routes![metrics])
.mount("/", routes![metrics, status])
}
#[get("/status")]
pub async fn status(
state: &State<Mutex<HashMap<String, DeviceConnectionWithTimestamp>>>,
) -> Option<String> {
let state = state.lock().await;
let mut out = String::new();
for (name, connection) in state.iter() {
writeln!(
out,
"{name}: {}",
connection
.last_accessed
.map(|v| format!(
"{} ({})",
Wrapped(Local::now().signed_duration_since(v)),
v.to_rfc2822()
))
.unwrap_or(String::from("No successful connection"))
)
.ok()?;
}
Some(out)
}
struct Wrapped(TimeDelta);
impl core::fmt::Display for Wrapped {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if self.0.num_days() > 0 {
write!(f, "{} days, ", self.0.num_days())?;
}
if self.0.num_hours() > 0 {
write!(f, "{} hours, ", self.0.num_hours() % 24)?;
}
if self.0.num_minutes() > 0 {
write!(f, "{} minutes, ", self.0.num_minutes() % 60)?;
}
write!(
f,
"{}.{:0>3} seconds ago",
self.0.num_seconds() % 60,
self.0.num_milliseconds() % 1000
)?;
Ok(())
}
}
#[get("/metrics")]
pub async fn metrics(state: &State<Mutex<HashMap<String, DeviceConnection>>>) -> Option<String> {
pub async fn metrics(
state: &State<Mutex<HashMap<String, DeviceConnectionWithTimestamp>>>,
) -> Option<String> {
let mut state = state.lock().await;
let mut set = JoinSet::new();
for (name, mut connected) in state.drain() {
set.spawn(async move {
let mut set = FuturesUnordered::new();
for (name, connected) in state.iter_mut() {
set.push(async move {
let mut return_string = String::new();
let mut types_seen = Vec::new();
if let Err(e) = connected.update_characteristics().await {
log::error!("updating characteristics for {name}: error {e:?}");
return (name, connected, None);
}
connected.update_characteristics().await.ok()?;
for (aid, accessory) in &connected.accessories {
for (aid, accessory) in &connected.connection.accessories {
for service in accessory.services.values() {
for (cid, characteristic) in &service.characteristics {
if !MONITORED_CHARACTERISTICS.contains(&characteristic.characteristic_type)
@ -69,15 +118,14 @@ pub async fn metrics(state: &State<Mutex<HashMap<String, DeviceConnection>>>) ->
}
}
(name, connected, Some((return_string, types_seen)))
Some((return_string, types_seen))
});
}
let mut types_string = String::new();
let mut values_string = String::new();
let mut shown_types = Vec::new();
while let Some(Ok((k, v, val))) = set.join_next().await {
state.insert(k, v);
while let Some(val) = set.next().await {
if let Some((metrics, types)) = val {
for c in types {
if !shown_types.contains(&c) {