Files
PC-Monitor/plugins/plexamp.py
2026-03-24 07:17:48 +01:00

220 lines
7.4 KiB
Python

"""
Plugin Plexamp - Récupère l'état de lecture depuis le serveur Plex
Interroge /status/sessions sur le serveur Plex pour voir les sessions actives
Configuration:
- host: IP/hostname du serveur Plex (ex: 192.168.1.235)
- port: Port du serveur Plex (défaut: 32400)
- token: Token d'authentification Plex
"""
import requests
from typing import Dict, Optional, Any
class PlexampPlugin:
    """Plexamp plugin that talks to a Plex Media Server.

    Playback state is read from the server's ``/status/sessions`` endpoint and
    playback commands are sent to ``/player/playback/<command>`` on the same
    server, addressed to the player through the
    ``X-Plex-Target-Client-Identifier`` header.

    Configuration keys read from ``config``:
        host:  IP/hostname of the Plex server (default ``192.168.1.235``).
        port:  port of the Plex server (default ``32400``).
        token: Plex authentication token (default empty string).
    """

    # Timeout, in seconds, applied to every HTTP request sent to the server.
    REQUEST_TIMEOUT = 5

    def __init__(self, config: dict):
        """Store the connection settings found in ``config``."""
        self.host = config.get('host', '192.168.1.235')
        self.port = config.get('port', 32400)
        self.token = config.get('token', '')
        self.config = config
        # Last commandID sent by this instance; see _next_command_id().
        self._command_id = 0

    def get_id(self) -> str:
        """Return the plugin identifier."""
        return 'plexamp'

    def get_name(self) -> str:
        """Return the display name of the plugin."""
        return 'Plexamp'

    def get_default_port(self) -> int:
        """Return the default Plex server port."""
        return 32400

    def get_base_url(self) -> str:
        """Return the ``http://host:port`` base URL of the Plex server."""
        return f"http://{self.host}:{self.port}"

    def _get_headers(self) -> Dict[str, str]:
        """Return the HTTP headers sent with every Plex API request."""
        return {
            'X-Plex-Token': self.token,
            'Accept': 'application/json'
        }

    def test_connection(self) -> Dict[str, Any]:
        """Check that the Plex server is reachable with the configured token.

        Returns:
            A dict with ``success`` (bool) and ``message`` (str); on success
            it also contains ``version``. Never raises: every failure is
            reported through ``success: False``.
        """
        if not self.token:
            return {
                'success': False,
                'message': 'Token Plex non configuré. Ajoutez-le dans les paramètres.'
            }
        try:
            url = f"{self.get_base_url()}/status/sessions"
            response = requests.get(
                url, headers=self._get_headers(), timeout=self.REQUEST_TIMEOUT
            )
            # 401 gets a dedicated message; other HTTP errors fall through to
            # raise_for_status() and the generic handler below.
            if response.status_code == 401:
                return {
                    'success': False,
                    'message': 'Token invalide ou expiré.'
                }
            response.raise_for_status()
            return {
                'success': True,
                'message': f'Connecté au serveur Plex ({self.host}:{self.port})',
                'version': 'Plex Media Server'
            }
        except requests.exceptions.ConnectionError:
            return {
                'success': False,
                'message': f"Impossible de se connecter à {self.get_base_url()}. Vérifiez l'adresse du serveur."
            }
        except requests.exceptions.Timeout:
            return {
                'success': False,
                'message': 'Timeout - Le serveur ne répond pas'
            }
        except Exception as e:
            return {
                'success': False,
                'message': f'Erreur: {str(e)}'
            }

    def get_now_playing(self) -> Optional[Dict[str, Any]]:
        """Fetch the current music playback state from the Plex server.

        Returns:
            ``None`` when no token is configured or the request/parsing fails
            (the error is printed); otherwise the dict built by
            ``_parse_sessions``.
        """
        if not self.token:
            return None
        try:
            url = f"{self.get_base_url()}/status/sessions"
            response = requests.get(
                url, headers=self._get_headers(), timeout=self.REQUEST_TIMEOUT
            )
            response.raise_for_status()
            return self._parse_sessions(response.json())
        except Exception as e:
            # Best-effort: callers treat None as "state unavailable".
            print(f"Erreur Plexamp get_now_playing: {e}")
            return None

    @staticmethod
    def _parse_sessions(data: Dict[str, Any]) -> Dict[str, Any]:
        """Turn a decoded ``/status/sessions`` payload into a playback dict.

        The first session whose ``type`` is ``track`` is used. When there is
        no session at all, or no track session, the result is
        ``{'playing': False, 'state': 'stopped'}``.

        Otherwise the result carries ``playing``, ``state``, ``time`` and
        ``duration`` (copied from ``viewOffset``/``duration``), ``progress``
        (percentage), track metadata and player identification fields.
        """
        media_container = data.get('MediaContainer', {})
        if media_container.get('size', 0) == 0:
            return {'playing': False, 'state': 'stopped'}

        # "or []" / "or 0" below tolerate keys that are present but null.
        sessions = media_container.get('Metadata') or []
        music_session = next(
            (entry for entry in sessions if entry.get('type') == 'track'),
            None,
        )
        if not music_session:
            return {'playing': False, 'state': 'stopped'}

        player = music_session.get('Player', {})
        state = player.get('state', 'stopped')
        view_offset = music_session.get('viewOffset') or 0
        duration = music_session.get('duration') or 0
        return {
            'playing': state == 'playing',
            'state': state,
            'time': view_offset,
            'duration': duration,
            # Guard against a zero duration to avoid dividing by zero.
            'progress': (view_offset / duration * 100) if duration > 0 else 0,
            'title': music_session.get('title', ''),
            'artist': music_session.get('grandparentTitle', ''),
            'album': music_session.get('parentTitle', ''),
            'year': music_session.get('parentYear', ''),
            'thumb': music_session.get('thumb', ''),
            'art': music_session.get('art', ''),
            'player_name': player.get('title', ''),
            'player_device': player.get('device', ''),
            'session_key': music_session.get('sessionKey', ''),
            'rating_key': music_session.get('ratingKey', ''),
            'machine_id': player.get('machineIdentifier', '')
        }

    def get_artwork_url(self, thumb_path: str) -> str:
        """Build the full, token-authenticated URL for an artwork path.

        Returns an empty string when ``thumb_path`` is empty. The token is
        appended with ``&`` when the path already carries a query string,
        with ``?`` otherwise, so the resulting URL stays well-formed.
        """
        if not thumb_path:
            return ''
        separator = '&' if '?' in thumb_path else '?'
        return f"{self.get_base_url()}{thumb_path}{separator}X-Plex-Token={self.token}"

    # === Playback controls ===

    def play(self) -> bool:
        """Resume playback."""
        return self._send_command('play')

    def pause(self) -> bool:
        """Pause playback."""
        return self._send_command('pause')

    def play_pause(self) -> bool:
        """Pause when a track is playing, otherwise resume playback."""
        now_playing = self.get_now_playing()
        command = 'pause' if now_playing and now_playing.get('playing') else 'play'
        # Hand the snapshot over so the session list is not fetched twice.
        return self._send_command(command, now_playing)

    def next_track(self) -> bool:
        """Skip to the next track."""
        return self._send_command('skipNext')

    def prev_track(self) -> bool:
        """Skip to the previous track."""
        return self._send_command('skipPrevious')

    def stop(self) -> bool:
        """Stop playback."""
        return self._send_command('stop')

    def _next_command_id(self) -> int:
        """Return the next commandID for this instance (1, 2, 3, ...).

        ``getattr`` keeps this working on instances created without
        ``__init__``.
        NOTE(review): the counter lives on the instance, so it restarts at 1
        if the host application builds a new plugin object per request.
        """
        self._command_id = getattr(self, '_command_id', 0) + 1
        return self._command_id

    def _send_command(self, command: str,
                      now_playing: Optional[Dict[str, Any]] = None) -> bool:
        """Send a playback command to the active player via the Plex server.

        Args:
            command: path segment appended to ``/player/playback/``
                (``play``, ``pause``, ``skipNext``, ...).
            now_playing: optional result of ``get_now_playing()`` to reuse;
                when ``None`` the current session is fetched here.

        Returns:
            ``True`` when the server answers with HTTP 200; ``False`` when no
            token is configured, no session with a machine identifier is
            found, or the request fails (the error is printed).
        """
        if not self.token:
            return False
        try:
            # The active session supplies the target player's machineIdentifier.
            if now_playing is None:
                now_playing = self.get_now_playing()
            if not now_playing or not now_playing.get('machine_id'):
                print("Pas de session active trouvée")
                return False
            machine_id = now_playing['machine_id']

            url = f"{self.get_base_url()}/player/playback/{command}"
            params = {
                # Incrementing ID instead of a constant 1.
                'commandID': self._next_command_id(),
                'type': 'music'
            }
            headers = self._get_headers()
            headers['X-Plex-Target-Client-Identifier'] = machine_id
            response = requests.get(
                url, headers=headers, params=params, timeout=self.REQUEST_TIMEOUT
            )
            return response.status_code == 200
        except Exception as e:
            print(f"Erreur Plexamp command {command}: {e}")
            return False