220 lines
7.4 KiB
Python
220 lines
7.4 KiB
Python
"""
|
|
Plugin Plexamp - Récupère l'état de lecture depuis le serveur Plex
|
|
Interroge /status/sessions sur le serveur Plex pour voir les sessions actives
|
|
|
|
Configuration:
|
|
- host: IP/hostname du serveur Plex (ex: 192.168.1.235)
|
|
- port: Port du serveur Plex (défaut: 32400)
|
|
- token: Token d'authentification Plex
|
|
"""
|
|
import requests
|
|
from typing import Dict, Optional, Any
|
|
|
|
|
|
class PlexampPlugin:
|
|
"""Plugin pour Plexamp via serveur Plex"""
|
|
|
|
def __init__(self, config: dict):
|
|
self.host = config.get('host', '192.168.1.235')
|
|
self.port = config.get('port', 32400)
|
|
self.token = config.get('token', '')
|
|
self.config = config
|
|
|
|
def get_id(self) -> str:
|
|
return 'plexamp'
|
|
|
|
def get_name(self) -> str:
|
|
return 'Plexamp'
|
|
|
|
def get_default_port(self) -> int:
|
|
return 32400
|
|
|
|
def get_base_url(self) -> str:
|
|
return f"http://{self.host}:{self.port}"
|
|
|
|
def _get_headers(self) -> Dict[str, str]:
|
|
"""Headers pour l'API Plex"""
|
|
return {
|
|
'X-Plex-Token': self.token,
|
|
'Accept': 'application/json'
|
|
}
|
|
|
|
def test_connection(self) -> Dict[str, Any]:
|
|
"""Teste la connexion au serveur Plex"""
|
|
if not self.token:
|
|
return {
|
|
'success': False,
|
|
'message': 'Token Plex non configuré. Ajoutez-le dans les paramètres.'
|
|
}
|
|
|
|
try:
|
|
url = f"{self.get_base_url()}/status/sessions"
|
|
response = requests.get(url, headers=self._get_headers(), timeout=5)
|
|
|
|
if response.status_code == 401:
|
|
return {
|
|
'success': False,
|
|
'message': 'Token invalide ou expiré.'
|
|
}
|
|
|
|
response.raise_for_status()
|
|
|
|
return {
|
|
'success': True,
|
|
'message': f'Connecté au serveur Plex ({self.host}:{self.port})',
|
|
'version': 'Plex Media Server'
|
|
}
|
|
|
|
except requests.exceptions.ConnectionError:
|
|
return {
|
|
'success': False,
|
|
'message': f'Impossible de se connecter à {self.get_base_url()}. Vérifiez l\'adresse du serveur.'
|
|
}
|
|
except requests.exceptions.Timeout:
|
|
return {
|
|
'success': False,
|
|
'message': 'Timeout - Le serveur ne répond pas'
|
|
}
|
|
except Exception as e:
|
|
return {
|
|
'success': False,
|
|
'message': f'Erreur: {str(e)}'
|
|
}
|
|
|
|
def get_now_playing(self) -> Optional[Dict[str, Any]]:
|
|
"""Récupère les infos de lecture en cours depuis le serveur Plex"""
|
|
if not self.token:
|
|
return None
|
|
|
|
try:
|
|
url = f"{self.get_base_url()}/status/sessions"
|
|
response = requests.get(url, headers=self._get_headers(), timeout=5)
|
|
response.raise_for_status()
|
|
|
|
# Parser le JSON
|
|
data = response.json()
|
|
|
|
media_container = data.get('MediaContainer', {})
|
|
|
|
# Vérifier s'il y a des sessions actives
|
|
if media_container.get('size', 0) == 0:
|
|
return {
|
|
'playing': False,
|
|
'state': 'stopped'
|
|
}
|
|
|
|
# Chercher une session audio (Plexamp)
|
|
metadata_list = media_container.get('Metadata', [])
|
|
|
|
music_session = None
|
|
for session in metadata_list:
|
|
# Filtrer les sessions audio
|
|
if session.get('type') == 'track':
|
|
music_session = session
|
|
break
|
|
|
|
if not music_session:
|
|
return {
|
|
'playing': False,
|
|
'state': 'stopped'
|
|
}
|
|
|
|
# Extraire les infos
|
|
player = music_session.get('Player', {})
|
|
state = player.get('state', 'stopped')
|
|
|
|
# Temps en millisecondes
|
|
view_offset = music_session.get('viewOffset', 0)
|
|
duration = music_session.get('duration', 0)
|
|
|
|
result = {
|
|
'playing': state == 'playing',
|
|
'state': state,
|
|
'time': view_offset,
|
|
'duration': duration,
|
|
'progress': (view_offset / duration * 100) if duration > 0 else 0,
|
|
'title': music_session.get('title', ''),
|
|
'artist': music_session.get('grandparentTitle', ''),
|
|
'album': music_session.get('parentTitle', ''),
|
|
'year': music_session.get('parentYear', ''),
|
|
'thumb': music_session.get('thumb', ''),
|
|
'art': music_session.get('art', ''),
|
|
'player_name': player.get('title', ''),
|
|
'player_device': player.get('device', ''),
|
|
'session_key': music_session.get('sessionKey', ''),
|
|
'rating_key': music_session.get('ratingKey', ''),
|
|
'machine_id': player.get('machineIdentifier', '')
|
|
}
|
|
|
|
return result
|
|
|
|
except Exception as e:
|
|
print(f"Erreur Plexamp get_now_playing: {e}")
|
|
return None
|
|
|
|
def get_artwork_url(self, thumb_path: str) -> str:
|
|
"""Construit l'URL complète pour l'artwork"""
|
|
if not thumb_path:
|
|
return ''
|
|
return f"{self.get_base_url()}{thumb_path}?X-Plex-Token={self.token}"
|
|
|
|
# === Contrôles de lecture ===
|
|
|
|
def play(self) -> bool:
|
|
"""Reprend la lecture"""
|
|
return self._send_command('play')
|
|
|
|
def pause(self) -> bool:
|
|
"""Met en pause"""
|
|
return self._send_command('pause')
|
|
|
|
def play_pause(self) -> bool:
|
|
"""Toggle play/pause"""
|
|
now_playing = self.get_now_playing()
|
|
if now_playing and now_playing.get('playing'):
|
|
return self.pause()
|
|
else:
|
|
return self.play()
|
|
|
|
def next_track(self) -> bool:
|
|
"""Piste suivante"""
|
|
return self._send_command('skipNext')
|
|
|
|
def prev_track(self) -> bool:
|
|
"""Piste précédente"""
|
|
return self._send_command('skipPrevious')
|
|
|
|
def stop(self) -> bool:
|
|
"""Arrête la lecture"""
|
|
return self._send_command('stop')
|
|
|
|
def _send_command(self, command: str) -> bool:
|
|
"""Envoie une commande au player via le serveur Plex"""
|
|
if not self.token:
|
|
return False
|
|
|
|
try:
|
|
# Récupérer la session active pour avoir le machineIdentifier
|
|
now_playing = self.get_now_playing()
|
|
if not now_playing or not now_playing.get('machine_id'):
|
|
print("Pas de session active trouvée")
|
|
return False
|
|
|
|
machine_id = now_playing['machine_id']
|
|
|
|
# Envoyer la commande au client
|
|
url = f"{self.get_base_url()}/player/playback/{command}"
|
|
params = {
|
|
'commandID': 1,
|
|
'type': 'music'
|
|
}
|
|
headers = self._get_headers()
|
|
headers['X-Plex-Target-Client-Identifier'] = machine_id
|
|
|
|
response = requests.get(url, headers=headers, params=params, timeout=5)
|
|
return response.status_code == 200
|
|
|
|
except Exception as e:
|
|
print(f"Erreur Plexamp command {command}: {e}")
|
|
return False
|