275 lines
9.9 KiB
Python
275 lines
9.9 KiB
Python
"""
|
|
Plugin pour HWiNFO via RemoteHWInfo (Demion)
|
|
https://github.com/Demion/remotehwinfo
|
|
|
|
Format JSON RemoteHWInfo:
|
|
{
|
|
"hwinfo": {
|
|
"sensors": [
|
|
{"entryIndex": 0, "sensorNameUser": "System", ...},
|
|
{"entryIndex": 1, "sensorNameUser": "CPU [#0]: Intel Core i7", ...}
|
|
],
|
|
"readings": [
|
|
{"sensorIndex": 0, "labelUser": "Memory Load", "value": 45.5, "unit": "%", ...},
|
|
{"sensorIndex": 1, "labelUser": "CPU Temp", "value": 65.0, "unit": "°C", ...}
|
|
]
|
|
}
|
|
}
|
|
"""
|
|
import requests
|
|
from typing import Dict, List, Optional, Any
|
|
from .base import BasePlugin
|
|
|
|
|
|
class HWiNFOPlugin(BasePlugin):
|
|
"""Plugin pour HWiNFO via RemoteHWInfo"""
|
|
|
|
def __init__(self, config: dict):
|
|
super().__init__(config)
|
|
self.history = {}
|
|
self.max_history = 120
|
|
|
|
def get_id(self) -> str:
|
|
return 'hwinfo'
|
|
|
|
def get_name(self) -> str:
|
|
return 'HWiNFO'
|
|
|
|
def get_default_port(self) -> int:
|
|
return 60000
|
|
|
|
def get_base_url(self) -> str:
|
|
"""Retourne l'URL de l'API RemoteHWInfo"""
|
|
return f"http://{self.host}:{self.port}/json.json"
|
|
|
|
def test_connection(self) -> Dict[str, Any]:
|
|
"""Teste la connexion à RemoteHWInfo"""
|
|
try:
|
|
url = self.get_base_url()
|
|
response = requests.get(url, timeout=5)
|
|
response.raise_for_status()
|
|
|
|
data = response.json()
|
|
|
|
# Vérifier le format RemoteHWInfo
|
|
if 'hwinfo' not in data:
|
|
return {
|
|
'success': False,
|
|
'message': 'Format invalide. Vérifiez que RemoteHWInfo est lancé.'
|
|
}
|
|
|
|
hwinfo = data['hwinfo']
|
|
sensor_count = len(hwinfo.get('sensors', []))
|
|
reading_count = len(hwinfo.get('readings', []))
|
|
|
|
return {
|
|
'success': True,
|
|
'message': f'Connecté - {sensor_count} capteurs, {reading_count} lectures',
|
|
'version': 'RemoteHWInfo',
|
|
'sensor_count': reading_count
|
|
}
|
|
|
|
except requests.exceptions.ConnectionError:
|
|
return {
|
|
'success': False,
|
|
'message': f'Impossible de se connecter à {self.get_base_url()}. Vérifiez que RemoteHWInfo est lancé.'
|
|
}
|
|
except requests.exceptions.Timeout:
|
|
return {
|
|
'success': False,
|
|
'message': 'Timeout - Le serveur ne répond pas'
|
|
}
|
|
except Exception as e:
|
|
return {
|
|
'success': False,
|
|
'message': f'Erreur: {str(e)}'
|
|
}
|
|
|
|
def get_data(self) -> Optional[Dict]:
|
|
"""Récupère les données JSON depuis RemoteHWInfo"""
|
|
try:
|
|
url = self.get_base_url()
|
|
response = requests.get(url, timeout=5)
|
|
response.raise_for_status()
|
|
|
|
data = response.json()
|
|
|
|
if 'hwinfo' in data:
|
|
return data['hwinfo']
|
|
|
|
return None
|
|
|
|
except Exception as e:
|
|
print(f"Erreur HWiNFO get_data: {e}")
|
|
return None
|
|
|
|
def get_hierarchy(self) -> List[dict]:
|
|
"""Récupère la hiérarchie des capteurs pour l'admin"""
|
|
data = self.get_data()
|
|
if not data:
|
|
return []
|
|
|
|
return self._build_hierarchy(data)
|
|
|
|
def _build_hierarchy(self, data: Dict) -> List[dict]:
|
|
"""Construit la hiérarchie des capteurs"""
|
|
sensors = data.get('sensors', [])
|
|
readings = data.get('readings', [])
|
|
|
|
# Créer un dictionnaire des sensors par index
|
|
sensor_map = {}
|
|
for sensor in sensors:
|
|
idx = sensor.get('entryIndex', 0)
|
|
sensor_map[idx] = {
|
|
'name': sensor.get('sensorNameUser', sensor.get('sensorNameOriginal', 'Unknown')),
|
|
'value': '',
|
|
'id': '',
|
|
'level': 0,
|
|
'type': 'group',
|
|
'children': []
|
|
}
|
|
|
|
# Ajouter les readings aux sensors correspondants
|
|
for reading in readings:
|
|
sensor_idx = reading.get('sensorIndex', 0)
|
|
|
|
if sensor_idx not in sensor_map:
|
|
continue
|
|
|
|
label = reading.get('labelUser', reading.get('labelOriginal', 'Unknown'))
|
|
value = reading.get('value', 0)
|
|
unit = reading.get('unit', '')
|
|
|
|
# Générer un ID unique
|
|
sensor_id = self._generate_sensor_id(sensor_idx, reading.get('readingId', 0), label)
|
|
|
|
# Formater la valeur
|
|
if isinstance(value, float):
|
|
if value == int(value):
|
|
formatted_value = f"{int(value)} {unit}".strip()
|
|
else:
|
|
formatted_value = f"{value:.2f} {unit}".strip()
|
|
else:
|
|
formatted_value = f"{value} {unit}".strip()
|
|
|
|
reading_entry = {
|
|
'name': label,
|
|
'value': formatted_value,
|
|
'id': sensor_id,
|
|
'level': 1,
|
|
'type': self.get_sensor_type(unit, label),
|
|
'children': []
|
|
}
|
|
|
|
sensor_map[sensor_idx]['children'].append(reading_entry)
|
|
|
|
# Retourner seulement les sensors qui ont des readings
|
|
return [s for s in sensor_map.values() if s['children']]
|
|
|
|
def parse_sensors(self, data: Dict) -> List[Dict]:
|
|
"""Parse les données et retourne une liste plate de capteurs"""
|
|
sensors_list = []
|
|
|
|
if not data:
|
|
return sensors_list
|
|
|
|
sensors = data.get('sensors', [])
|
|
readings = data.get('readings', [])
|
|
|
|
# Créer un dictionnaire des sensors par index
|
|
sensor_names = {}
|
|
for sensor in sensors:
|
|
idx = sensor.get('entryIndex', 0)
|
|
sensor_names[idx] = sensor.get('sensorNameUser', sensor.get('sensorNameOriginal', 'Unknown'))
|
|
|
|
# Parser chaque reading
|
|
for reading in readings:
|
|
sensor_idx = reading.get('sensorIndex', 0)
|
|
hardware_name = sensor_names.get(sensor_idx, 'Unknown')
|
|
|
|
label = reading.get('labelUser', reading.get('labelOriginal', 'Unknown'))
|
|
value = reading.get('value', 0)
|
|
unit = reading.get('unit', '')
|
|
|
|
# Générer un ID unique
|
|
sensor_id = self._generate_sensor_id(sensor_idx, reading.get('readingId', 0), label)
|
|
|
|
# Formater la valeur
|
|
if isinstance(value, float):
|
|
if value == int(value):
|
|
formatted_value = f"{int(value)} {unit}".strip()
|
|
else:
|
|
formatted_value = f"{value:.2f} {unit}".strip()
|
|
else:
|
|
formatted_value = f"{value} {unit}".strip()
|
|
|
|
sensor_type = self.get_sensor_type(unit, label)
|
|
hardware_type = self._guess_hardware_type(hardware_name)
|
|
|
|
sensor = {
|
|
'id': sensor_id,
|
|
'name': label,
|
|
'value': formatted_value,
|
|
'raw_value': float(value) if value else 0.0,
|
|
'type': sensor_type,
|
|
'unit': unit,
|
|
'hardware': hardware_name,
|
|
'hardware_type': hardware_type,
|
|
'category': hardware_name,
|
|
'path': f"{hardware_name}/{label}",
|
|
}
|
|
sensors_list.append(sensor)
|
|
|
|
return sensors_list
|
|
|
|
def _generate_sensor_id(self, sensor_idx: int, reading_id: int, label: str) -> str:
|
|
"""Génère un ID unique pour un capteur"""
|
|
# Nettoyer le label pour l'ID
|
|
clean_label = label.replace(' ', '_').replace('[', '').replace(']', '')
|
|
clean_label = clean_label.replace('#', '').replace(':', '_').replace('/', '_')
|
|
return f"/hwinfo/{sensor_idx}_{reading_id}_{clean_label}"
|
|
|
|
def _guess_hardware_type(self, sensor_name: str) -> str:
|
|
"""Devine le type de hardware basé sur le nom du sensor"""
|
|
name_lower = sensor_name.lower()
|
|
|
|
if 'cpu' in name_lower or 'processor' in name_lower:
|
|
return 'CPU'
|
|
elif 'gpu' in name_lower or 'graphics' in name_lower or 'radeon' in name_lower or 'geforce' in name_lower:
|
|
return 'GPU'
|
|
elif 'memory' in name_lower or 'ram' in name_lower or 'dimm' in name_lower:
|
|
return 'RAM'
|
|
elif 'drive' in name_lower or 'disk' in name_lower or 'ssd' in name_lower or 'hdd' in name_lower or 's.m.a.r.t' in name_lower:
|
|
return 'Storage'
|
|
elif 'network' in name_lower or 'ethernet' in name_lower or 'wifi' in name_lower or 'réseau' in name_lower:
|
|
return 'Network'
|
|
elif 'system' in name_lower or 'système' in name_lower:
|
|
return 'System'
|
|
else:
|
|
return 'Other'
|
|
|
|
# === Gestion de l'historique ===
|
|
|
|
def update_history(self, sensors: List[Dict]):
|
|
"""Met à jour l'historique des capteurs pour les graphiques"""
|
|
for sensor in sensors:
|
|
sensor_id = sensor.get("id", "")
|
|
if not sensor_id:
|
|
continue
|
|
|
|
raw_value = sensor.get("raw_value", 0.0)
|
|
|
|
if sensor_id not in self.history:
|
|
self.history[sensor_id] = []
|
|
|
|
self.history[sensor_id].append(raw_value)
|
|
|
|
if len(self.history[sensor_id]) > self.max_history:
|
|
self.history[sensor_id] = self.history[sensor_id][-self.max_history:]
|
|
|
|
def get_history(self, sensor_id: str, count: int = 60) -> List[float]:
|
|
"""Récupère l'historique d'un capteur"""
|
|
if sensor_id not in self.history:
|
|
return []
|
|
return self.history[sensor_id][-count:]
|