""" Plugin pour HWiNFO via RemoteHWInfo (Demion) https://github.com/Demion/remotehwinfo Format JSON RemoteHWInfo: { "hwinfo": { "sensors": [ {"entryIndex": 0, "sensorNameUser": "System", ...}, {"entryIndex": 1, "sensorNameUser": "CPU [#0]: Intel Core i7", ...} ], "readings": [ {"sensorIndex": 0, "labelUser": "Memory Load", "value": 45.5, "unit": "%", ...}, {"sensorIndex": 1, "labelUser": "CPU Temp", "value": 65.0, "unit": "°C", ...} ] } } """ import requests from typing import Dict, List, Optional, Any from .base import BasePlugin class HWiNFOPlugin(BasePlugin): """Plugin pour HWiNFO via RemoteHWInfo""" def __init__(self, config: dict): super().__init__(config) self.history = {} self.max_history = 120 def get_id(self) -> str: return 'hwinfo' def get_name(self) -> str: return 'HWiNFO' def get_default_port(self) -> int: return 60000 def get_base_url(self) -> str: """Retourne l'URL de l'API RemoteHWInfo""" return f"http://{self.host}:{self.port}/json.json" def test_connection(self) -> Dict[str, Any]: """Teste la connexion à RemoteHWInfo""" try: url = self.get_base_url() response = requests.get(url, timeout=5) response.raise_for_status() data = response.json() # Vérifier le format RemoteHWInfo if 'hwinfo' not in data: return { 'success': False, 'message': 'Format invalide. Vérifiez que RemoteHWInfo est lancé.' } hwinfo = data['hwinfo'] sensor_count = len(hwinfo.get('sensors', [])) reading_count = len(hwinfo.get('readings', [])) return { 'success': True, 'message': f'Connecté - {sensor_count} capteurs, {reading_count} lectures', 'version': 'RemoteHWInfo', 'sensor_count': reading_count } except requests.exceptions.ConnectionError: return { 'success': False, 'message': f'Impossible de se connecter à {self.get_base_url()}. Vérifiez que RemoteHWInfo est lancé.' } except requests.exceptions.Timeout: return { 'success': False, 'message': 'Timeout - Le serveur ne répond pas' } except Exception as e: return { 'success': False, 'message': f'Erreur: {str(e)}' } def get_data(self) -> Optional[Dict]: """Récupère les données JSON depuis RemoteHWInfo""" try: url = self.get_base_url() response = requests.get(url, timeout=5) response.raise_for_status() data = response.json() if 'hwinfo' in data: return data['hwinfo'] return None except Exception as e: print(f"Erreur HWiNFO get_data: {e}") return None def get_hierarchy(self) -> List[dict]: """Récupère la hiérarchie des capteurs pour l'admin""" data = self.get_data() if not data: return [] return self._build_hierarchy(data) def _build_hierarchy(self, data: Dict) -> List[dict]: """Construit la hiérarchie des capteurs""" sensors = data.get('sensors', []) readings = data.get('readings', []) # Créer un dictionnaire des sensors par index sensor_map = {} for sensor in sensors: idx = sensor.get('entryIndex', 0) sensor_map[idx] = { 'name': sensor.get('sensorNameUser', sensor.get('sensorNameOriginal', 'Unknown')), 'value': '', 'id': '', 'level': 0, 'type': 'group', 'children': [] } # Ajouter les readings aux sensors correspondants for reading in readings: sensor_idx = reading.get('sensorIndex', 0) if sensor_idx not in sensor_map: continue label = reading.get('labelUser', reading.get('labelOriginal', 'Unknown')) value = reading.get('value', 0) unit = reading.get('unit', '') # Générer un ID unique sensor_id = self._generate_sensor_id(sensor_idx, reading.get('readingId', 0), label) # Formater la valeur if isinstance(value, float): if value == int(value): formatted_value = f"{int(value)} {unit}".strip() else: formatted_value = f"{value:.2f} {unit}".strip() else: formatted_value = f"{value} {unit}".strip() reading_entry = { 'name': label, 'value': formatted_value, 'id': sensor_id, 'level': 1, 'type': self.get_sensor_type(unit, label), 'children': [] } sensor_map[sensor_idx]['children'].append(reading_entry) # Retourner seulement les sensors qui ont des readings return [s for s in sensor_map.values() if s['children']] def parse_sensors(self, data: Dict) -> List[Dict]: """Parse les données et retourne une liste plate de capteurs""" sensors_list = [] if not data: return sensors_list sensors = data.get('sensors', []) readings = data.get('readings', []) # Créer un dictionnaire des sensors par index sensor_names = {} for sensor in sensors: idx = sensor.get('entryIndex', 0) sensor_names[idx] = sensor.get('sensorNameUser', sensor.get('sensorNameOriginal', 'Unknown')) # Parser chaque reading for reading in readings: sensor_idx = reading.get('sensorIndex', 0) hardware_name = sensor_names.get(sensor_idx, 'Unknown') label = reading.get('labelUser', reading.get('labelOriginal', 'Unknown')) value = reading.get('value', 0) unit = reading.get('unit', '') # Générer un ID unique sensor_id = self._generate_sensor_id(sensor_idx, reading.get('readingId', 0), label) # Formater la valeur if isinstance(value, float): if value == int(value): formatted_value = f"{int(value)} {unit}".strip() else: formatted_value = f"{value:.2f} {unit}".strip() else: formatted_value = f"{value} {unit}".strip() sensor_type = self.get_sensor_type(unit, label) hardware_type = self._guess_hardware_type(hardware_name) sensor = { 'id': sensor_id, 'name': label, 'value': formatted_value, 'raw_value': float(value) if value else 0.0, 'type': sensor_type, 'unit': unit, 'hardware': hardware_name, 'hardware_type': hardware_type, 'category': hardware_name, 'path': f"{hardware_name}/{label}", } sensors_list.append(sensor) return sensors_list def _generate_sensor_id(self, sensor_idx: int, reading_id: int, label: str) -> str: """Génère un ID unique pour un capteur""" # Nettoyer le label pour l'ID clean_label = label.replace(' ', '_').replace('[', '').replace(']', '') clean_label = clean_label.replace('#', '').replace(':', '_').replace('/', '_') return f"/hwinfo/{sensor_idx}_{reading_id}_{clean_label}" def _guess_hardware_type(self, sensor_name: str) -> str: """Devine le type de hardware basé sur le nom du sensor""" name_lower = sensor_name.lower() if 'cpu' in name_lower or 'processor' in name_lower: return 'CPU' elif 'gpu' in name_lower or 'graphics' in name_lower or 'radeon' in name_lower or 'geforce' in name_lower: return 'GPU' elif 'memory' in name_lower or 'ram' in name_lower or 'dimm' in name_lower: return 'RAM' elif 'drive' in name_lower or 'disk' in name_lower or 'ssd' in name_lower or 'hdd' in name_lower or 's.m.a.r.t' in name_lower: return 'Storage' elif 'network' in name_lower or 'ethernet' in name_lower or 'wifi' in name_lower or 'réseau' in name_lower: return 'Network' elif 'system' in name_lower or 'système' in name_lower: return 'System' else: return 'Other' # === Gestion de l'historique === def update_history(self, sensors: List[Dict]): """Met à jour l'historique des capteurs pour les graphiques""" for sensor in sensors: sensor_id = sensor.get("id", "") if not sensor_id: continue raw_value = sensor.get("raw_value", 0.0) if sensor_id not in self.history: self.history[sensor_id] = [] self.history[sensor_id].append(raw_value) if len(self.history[sensor_id]) > self.max_history: self.history[sensor_id] = self.history[sensor_id][-self.max_history:] def get_history(self, sensor_id: str, count: int = 60) -> List[float]: """Récupère l'historique d'un capteur""" if sensor_id not in self.history: return [] return self.history[sensor_id][-count:]