""" Plugin Plexamp - Récupère l'état de lecture depuis le serveur Plex Interroge /status/sessions sur le serveur Plex pour voir les sessions actives Configuration: - host: IP/hostname du serveur Plex (ex: 192.168.1.235) - port: Port du serveur Plex (défaut: 32400) - token: Token d'authentification Plex """ import requests from typing import Dict, Optional, Any class PlexampPlugin: """Plugin pour Plexamp via serveur Plex""" def __init__(self, config: dict): self.host = config.get('host', '192.168.1.235') self.port = config.get('port', 32400) self.token = config.get('token', '') self.config = config def get_id(self) -> str: return 'plexamp' def get_name(self) -> str: return 'Plexamp' def get_default_port(self) -> int: return 32400 def get_base_url(self) -> str: return f"http://{self.host}:{self.port}" def _get_headers(self) -> Dict[str, str]: """Headers pour l'API Plex""" return { 'X-Plex-Token': self.token, 'Accept': 'application/json' } def test_connection(self) -> Dict[str, Any]: """Teste la connexion au serveur Plex""" if not self.token: return { 'success': False, 'message': 'Token Plex non configuré. Ajoutez-le dans les paramètres.' } try: url = f"{self.get_base_url()}/status/sessions" response = requests.get(url, headers=self._get_headers(), timeout=5) if response.status_code == 401: return { 'success': False, 'message': 'Token invalide ou expiré.' } response.raise_for_status() return { 'success': True, 'message': f'Connecté au serveur Plex ({self.host}:{self.port})', 'version': 'Plex Media Server' } except requests.exceptions.ConnectionError: return { 'success': False, 'message': f'Impossible de se connecter à {self.get_base_url()}. Vérifiez l\'adresse du serveur.' } except requests.exceptions.Timeout: return { 'success': False, 'message': 'Timeout - Le serveur ne répond pas' } except Exception as e: return { 'success': False, 'message': f'Erreur: {str(e)}' } def get_now_playing(self) -> Optional[Dict[str, Any]]: """Récupère les infos de lecture en cours depuis le serveur Plex""" if not self.token: return None try: url = f"{self.get_base_url()}/status/sessions" response = requests.get(url, headers=self._get_headers(), timeout=5) response.raise_for_status() # Parser le JSON data = response.json() media_container = data.get('MediaContainer', {}) # Vérifier s'il y a des sessions actives if media_container.get('size', 0) == 0: return { 'playing': False, 'state': 'stopped' } # Chercher une session audio (Plexamp) metadata_list = media_container.get('Metadata', []) music_session = None for session in metadata_list: # Filtrer les sessions audio if session.get('type') == 'track': music_session = session break if not music_session: return { 'playing': False, 'state': 'stopped' } # Extraire les infos player = music_session.get('Player', {}) state = player.get('state', 'stopped') # Temps en millisecondes view_offset = music_session.get('viewOffset', 0) duration = music_session.get('duration', 0) result = { 'playing': state == 'playing', 'state': state, 'time': view_offset, 'duration': duration, 'progress': (view_offset / duration * 100) if duration > 0 else 0, 'title': music_session.get('title', ''), 'artist': music_session.get('grandparentTitle', ''), 'album': music_session.get('parentTitle', ''), 'year': music_session.get('parentYear', ''), 'thumb': music_session.get('thumb', ''), 'art': music_session.get('art', ''), 'player_name': player.get('title', ''), 'player_device': player.get('device', ''), 'session_key': music_session.get('sessionKey', ''), 'rating_key': music_session.get('ratingKey', ''), 'machine_id': player.get('machineIdentifier', '') } return result except Exception as e: print(f"Erreur Plexamp get_now_playing: {e}") return None def get_artwork_url(self, thumb_path: str) -> str: """Construit l'URL complète pour l'artwork""" if not thumb_path: return '' return f"{self.get_base_url()}{thumb_path}?X-Plex-Token={self.token}" # === Contrôles de lecture === def play(self) -> bool: """Reprend la lecture""" return self._send_command('play') def pause(self) -> bool: """Met en pause""" return self._send_command('pause') def play_pause(self) -> bool: """Toggle play/pause""" now_playing = self.get_now_playing() if now_playing and now_playing.get('playing'): return self.pause() else: return self.play() def next_track(self) -> bool: """Piste suivante""" return self._send_command('skipNext') def prev_track(self) -> bool: """Piste précédente""" return self._send_command('skipPrevious') def stop(self) -> bool: """Arrête la lecture""" return self._send_command('stop') def _send_command(self, command: str) -> bool: """Envoie une commande au player via le serveur Plex""" if not self.token: return False try: # Récupérer la session active pour avoir le machineIdentifier now_playing = self.get_now_playing() if not now_playing or not now_playing.get('machine_id'): print("Pas de session active trouvée") return False machine_id = now_playing['machine_id'] # Envoyer la commande au client url = f"{self.get_base_url()}/player/playback/{command}" params = { 'commandID': 1, 'type': 'music' } headers = self._get_headers() headers['X-Plex-Target-Client-Identifier'] = machine_id response = requests.get(url, headers=headers, params=params, timeout=5) return response.status_code == 200 except Exception as e: print(f"Erreur Plexamp command {command}: {e}") return False